8 changes: 8 additions & 0 deletions docs/serverless/utils/rp_upload.md
@@ -4,6 +4,14 @@ The upload utility provides functions to upload files and in-memory objects to a

*Note: The upload utility utilizes the Virtual-hosted-style URL with the bucket name in the host name. For example, `https://bucket-name.s3.amazonaws.com`.*

## Requirements

The upload utility requires [boto3](https://pypi.org/project/boto3/) for S3 functionality. boto3 is lazy-loaded to minimize initial import time and memory footprint.

If you attempt to use the S3 upload features without boto3 installed, or without S3 credentials configured, files are saved to local disk instead (see the sketch below):
- `upload_image()` saves to the `simulated_uploaded/` directory
- `upload_file_to_bucket()` and `upload_in_memory_object()` save to the `local_upload/` directory
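
For illustration, a minimal sketch of the fallback behavior, assuming boto3 is not installed (or no bucket credentials are set):

```python
from runpod.serverless.utils.rp_upload import upload_in_memory_object

# With no S3 client available, the call does not raise; it writes the bytes
# to the local_upload/ directory and returns that local path instead of a
# presigned URL.
path = upload_in_memory_object("example.txt", b"hello from the fallback path")
print(path)  # local_upload/example.txt
```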

## Bucket Credentials

You can set your S3 bucket credentials in the following ways:
3 changes: 2 additions & 1 deletion runpod/serverless/__init__.py
@@ -14,7 +14,6 @@

from ..version import __version__ as runpod_version
from . import worker
from .modules import rp_fastapi
from .modules.rp_logger import RunPodLogger
from .modules.rp_progress import progress_update

@@ -155,6 +154,7 @@ def start(config: Dict[str, Any]):

if config["rp_args"]["rp_serve_api"]:
log.info("Starting API server.")
from .modules import rp_fastapi
api_server = rp_fastapi.WorkerAPI(config)

api_server.start_uvicorn(
@@ -166,6 +166,7 @@ def start(config: Dict[str, Any]):

if realtime_port:
log.info(f"Starting API server for realtime on port {realtime_port}.")
from .modules import rp_fastapi
api_server = rp_fastapi.WorkerAPI(config)

api_server.start_uvicorn(
2 changes: 1 addition & 1 deletion runpod/serverless/modules/rp_scale.py
@@ -195,7 +195,7 @@ async def get_jobs(self, session: ClientSession):

except TooManyRequests:
log.debug(
f"JobScaler.get_jobs | Too many requests. Debounce for 5 seconds."
"JobScaler.get_jobs | Too many requests. Debounce for 5 seconds."
)
await asyncio.sleep(5) # debounce for 5 seconds
except asyncio.CancelledError:
122 changes: 91 additions & 31 deletions runpod/serverless/utils/rp_upload.py
@@ -10,20 +10,75 @@
import threading
import time
import uuid
from typing import Optional, Tuple
from typing import TYPE_CHECKING, Optional, Tuple
from urllib.parse import urlparse

import boto3
from boto3 import session
from boto3.s3.transfer import TransferConfig
from botocore.config import Config
from tqdm_loggable.auto import tqdm

if TYPE_CHECKING:
from boto3.s3.transfer import TransferConfig
from botocore.client import BaseClient

logger = logging.getLogger("runpod upload utility")
FMT = "%(filename)-20s:%(lineno)-4d %(asctime)s %(message)s"
logging.basicConfig(level=logging.INFO, format=FMT, handlers=[logging.StreamHandler()])


def _import_boto3_dependencies():
"""
Lazy-load boto3 dependencies.
Returns tuple of (session, TransferConfig, Config) or raises ImportError.
"""
try:
from boto3 import session
from boto3.s3.transfer import TransferConfig
from botocore.config import Config
return session, TransferConfig, Config
except ImportError as e:
raise ImportError(
"boto3 is required for S3 upload functionality. "
"Install with: pip install boto3"
) from e


def _save_to_local_fallback(
file_name: str,
source_path: Optional[str] = None,
file_data: Optional[bytes] = None,
directory: str = "local_upload"
) -> str:
"""
Save file to local directory as fallback when S3 is unavailable.

Args:
file_name: Name of the file to save
source_path: Path to source file to copy (for file-based uploads)
file_data: Bytes to write (for in-memory uploads)
directory: Local directory to save to (default: 'local_upload')

Returns:
Path to the saved local file
"""
logger.warning(
f"No bucket endpoint set, saving to disk folder '{directory}'. "
"If this is a live endpoint, please reference: "
"https://github.com/runpod/runpod-python/blob/main/docs/serverless/utils/rp_upload.md"
)

os.makedirs(directory, exist_ok=True)
local_upload_location = f"{directory}/{file_name}"

if source_path:
shutil.copyfile(source_path, local_upload_location)
elif file_data is not None:
with open(local_upload_location, "wb") as file_output:
file_output.write(file_data)
else:
raise ValueError("Either source_path or file_data must be provided")

return local_upload_location


def extract_region_from_url(endpoint_url):
"""
Extracts the region from the endpoint URL.
@@ -43,12 +98,20 @@ def extract_region_from_url(endpoint_url):
# --------------------------- S3 Bucket Connection --------------------------- #
def get_boto_client(
bucket_creds: Optional[dict] = None,
) -> Tuple[
boto3.client, TransferConfig
]: # pragma: no cover # pylint: disable=line-too-long
) -> Tuple[Optional["BaseClient"], Optional["TransferConfig"]]:
"""
Returns a boto3 client and transfer config for the bucket.
Lazy-loads boto3 to reduce initial import time.
"""
try:
session, TransferConfig, Config = _import_boto3_dependencies()
except ImportError:
logger.warning(
"boto3 not installed. S3 upload functionality disabled. "
"Install with: pip install boto3"
)
return None, None

bucket_session = session.Session()

boto_config = Config(
@@ -111,18 +174,13 @@ def upload_image(
output = input_file.read()

if boto_client is None:
# Save the output to a file
print("No bucket endpoint set, saving to disk folder 'simulated_uploaded'")
print("If this is a live endpoint, please reference the following:")
print(
"https://github.com/runpod/runpod-python/blob/main/docs/serverless/utils/rp_upload.md"
) # pylint: disable=line-too-long

os.makedirs("simulated_uploaded", exist_ok=True)
sim_upload_location = f"simulated_uploaded/{image_name}{file_extension}"

with open(sim_upload_location, "wb") as file_output:
file_output.write(output)
# Save the output to a file using fallback helper
file_name_with_ext = f"{image_name}{file_extension}"
sim_upload_location = _save_to_local_fallback(
file_name_with_ext,
file_data=output,
directory="simulated_uploaded"
)

if results_list is not None:
results_list[result_index] = sim_upload_location
@@ -180,6 +238,15 @@ def bucket_upload(job_id, file_list, bucket_creds):  # pragma: no cover
"""
Uploads files to bucket storage.
"""
try:
session, _, Config = _import_boto3_dependencies()
except ImportError:
logger.error(
"boto3 not installed. Cannot upload to S3 bucket. "
"Install with: pip install boto3"
)
raise

temp_bucket_session = session.Session()

temp_boto_config = Config(
@@ -231,17 +298,7 @@ def upload_file_to_bucket(
key = f"{prefix}/{file_name}" if prefix else file_name

if boto_client is None:
print("No bucket endpoint set, saving to disk folder 'local_upload'")
print("If this is a live endpoint, please reference the following:")
print(
"https://github.com/runpod/runpod-python/blob/main/docs/serverless/utils/rp_upload.md"
) # pylint: disable=line-too-long

os.makedirs("local_upload", exist_ok=True)
local_upload_location = f"local_upload/{file_name}"
shutil.copyfile(file_location, local_upload_location)

return local_upload_location
return _save_to_local_fallback(file_name, source_path=file_location)

file_size = os.path.getsize(file_location)
with tqdm(
@@ -285,6 +342,9 @@ def upload_in_memory_object(

key = f"{prefix}/{file_name}" if prefix else file_name

if boto_client is None:
return _save_to_local_fallback(file_name, file_data=file_data)

file_size = len(file_data)
with tqdm(
total=file_size, unit="B", unit_scale=True, desc=file_name
76 changes: 73 additions & 3 deletions tests/test_serverless/test_utils/test_upload.py
@@ -32,16 +32,25 @@ def setUp(self) -> None:
def tearDown(self):
os.environ = self.original_environ

def test_import_boto3_dependencies_missing(self):
"""
Tests _import_boto3_dependencies when boto3 is not available
"""
with patch("builtins.__import__", side_effect=ImportError("No module named 'boto3'")):
with self.assertRaises(ImportError) as context:
rp_upload._import_boto3_dependencies()
self.assertIn("boto3 is required for S3 upload functionality", str(context.exception))

def test_get_boto_client(self):
"""
Tests get_boto_client
"""
# Define the bucket credentials
bucket_creds = BUCKET_CREDENTIALS

# Mock boto3.session.Session
# Mock boto3 imports (now lazy-loaded inside the function)
with patch("boto3.session.Session") as mock_session, patch(
"runpod.serverless.utils.rp_upload.TransferConfig"
"boto3.s3.transfer.TransferConfig"
) as mock_transfer_config:
mock_session.return_value.client.return_value = self.mock_boto_client
mock_transfer_config.return_value = self.mock_transfer_config
@@ -110,8 +119,9 @@ def test_get_boto_client_environ(self):

importlib.reload(rp_upload)

# Mock boto3 imports (now lazy-loaded inside the function)
with patch("boto3.session.Session") as mock_session, patch(
"runpod.serverless.utils.rp_upload.TransferConfig"
"boto3.s3.transfer.TransferConfig"
) as mock_transfer_config:
mock_session.return_value.client.return_value = self.mock_boto_client
mock_transfer_config.return_value = self.mock_transfer_config
@@ -178,9 +188,46 @@ def test_upload_image_s3(self, mock_open, mock_get_boto_client):
mock_boto_client.generate_presigned_url.assert_called_once()


class TestLocalFallback(unittest.TestCase):
"""Tests for _save_to_local_fallback helper function"""

@patch("os.makedirs")
def test_save_to_local_fallback_invalid_args(self, mock_makedirs):
"""
Tests _save_to_local_fallback raises ValueError when neither source_path nor file_data provided
"""
with self.assertRaises(ValueError) as context:
rp_upload._save_to_local_fallback("test.txt")
self.assertIn("Either source_path or file_data must be provided", str(context.exception))


class TestUploadUtility(unittest.TestCase):
"""Tests for upload utility"""

@patch("runpod.serverless.utils.rp_upload.get_boto_client")
@patch("os.path.exists")
@patch("shutil.copyfile")
@patch("os.makedirs")
def test_upload_file_to_bucket_fallback(
self, mock_makedirs, mock_copyfile, mock_exists, mock_get_boto_client
):
"""
Tests upload_file_to_bucket fallback when boto_client is None
"""
# Mock get_boto_client to return None
mock_get_boto_client.return_value = (None, None)
mock_exists.return_value = True

file_name = "example.txt"
file_location = "/path/to/file.txt"

result = upload_file_to_bucket(file_name, file_location)

# Check fallback behavior
assert result == "local_upload/example.txt"
mock_makedirs.assert_called_once_with("local_upload", exist_ok=True)
mock_copyfile.assert_called_once_with(file_location, "local_upload/example.txt")

@patch("runpod.serverless.utils.rp_upload.get_boto_client")
def test_upload_file_to_bucket(self, mock_get_boto_client):
"""
@@ -220,6 +267,29 @@ def test_upload_file_to_bucket(self, mock_get_boto_client):
ExpiresIn=604800,
)

@patch("runpod.serverless.utils.rp_upload.get_boto_client")
@patch("builtins.open", new_callable=unittest.mock.mock_open)
@patch("os.makedirs")
def test_upload_in_memory_object_fallback(
self, mock_makedirs, mock_open_file, mock_get_boto_client
):
"""
Tests upload_in_memory_object fallback when boto_client is None
"""
# Mock get_boto_client to return None
mock_get_boto_client.return_value = (None, None)

file_name = "example.txt"
file_data = b"This is test data."

result = upload_in_memory_object(file_name, file_data)

# Check fallback behavior
assert result == "local_upload/example.txt"
mock_makedirs.assert_called_once_with("local_upload", exist_ok=True)
mock_open_file.assert_called_once_with("local_upload/example.txt", "wb")
mock_open_file().write.assert_called_once_with(file_data)

@patch("runpod.serverless.utils.rp_upload.get_boto_client")
def test_upload_in_memory_object(self, mock_get_boto_client):
"""
8 changes: 4 additions & 4 deletions tests/test_serverless/test_worker.py
@@ -84,13 +84,13 @@ def test_local_api(self):
with patch(
"argparse.ArgumentParser.parse_known_args"
) as mock_parse_known_args, patch(
"runpod.serverless.rp_fastapi"
) as mock_fastapi:
"runpod.serverless.modules.rp_fastapi.WorkerAPI"
) as mock_worker_api:

mock_parse_known_args.return_value = known_args, []
runpod.serverless.start({"handler": self.mock_handler})

assert mock_fastapi.WorkerAPI.called
assert mock_worker_api.called

@patch("runpod.serverless.log")
@patch("runpod.serverless.sys.exit")
@@ -544,7 +544,7 @@ def test_start_sets_excepthook(self, _, __):
assert sys.excepthook == _handle_uncaught_exception

@patch("runpod.serverless.signal.signal")
@patch("runpod.serverless.rp_fastapi.WorkerAPI.start_uvicorn")
@patch("runpod.serverless.modules.rp_fastapi.WorkerAPI.start_uvicorn")
@patch("runpod.serverless._set_config_args")
def test_start_does_not_set_excepthook(self, mock_set_config_args, _, __):
mock_set_config_args.return_value = self.config