Skip to content
8 changes: 8 additions & 0 deletions docs/serverless/utils/rp_upload.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,14 @@ The upload utility provides functions to upload files and in-memory objects to a

*Note: The upload utility utilizes the Virtual-hosted-style URL with the bucket name in the host name. For example, `https: // bucket-name.s3.amazonaws.com`.*

## Requirements

The upload utility requires [boto3](https://pypi.org/project/boto3/) for S3 functionality. boto3 is lazy-loaded to minimize initial import time and memory footprint.

If you attempt to use S3 upload features without boto3 installed or S3 credentials are not configured, files will be saved to local disk instead:
- `upload_image()` saves to `simulated_uploaded/` directory
- `upload_file_to_bucket()` and `upload_in_memory_object()` save to `local_upload/` directory

## Bucket Credentials

You can set your S3 bucket credentials in the following ways:
Expand Down
2 changes: 1 addition & 1 deletion runpod/serverless/modules/rp_scale.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ async def get_jobs(self, session: ClientSession):

except TooManyRequests:
log.debug(
f"JobScaler.get_jobs | Too many requests. Debounce for 5 seconds."
"JobScaler.get_jobs | Too many requests. Debounce for 5 seconds."
)
await asyncio.sleep(5) # debounce for 5 seconds
except asyncio.CancelledError:
Expand Down
76 changes: 60 additions & 16 deletions runpod/serverless/utils/rp_upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,33 @@
import threading
import time
import uuid
from typing import Optional, Tuple
from typing import Any, Optional, Tuple
from urllib.parse import urlparse

import boto3
from boto3 import session
from boto3.s3.transfer import TransferConfig
from botocore.config import Config
from tqdm_loggable.auto import tqdm

logger = logging.getLogger("runpod upload utility")
FMT = "%(filename)-20s:%(lineno)-4d %(asctime)s %(message)s"
logging.basicConfig(level=logging.INFO, format=FMT, handlers=[logging.StreamHandler()])


def _import_boto3_dependencies():
"""
Lazy-load boto3 dependencies.
Returns tuple of (session, TransferConfig, Config) or raises ImportError.
"""
try:
from boto3 import session
from boto3.s3.transfer import TransferConfig
from botocore.config import Config
return session, TransferConfig, Config
except ImportError as e:
raise ImportError(
"boto3 is required for S3 upload functionality. "
"Install with: pip install boto3"
) from e


def extract_region_from_url(endpoint_url):
"""
Extracts the region from the endpoint URL.
Expand All @@ -43,12 +56,20 @@ def extract_region_from_url(endpoint_url):
# --------------------------- S3 Bucket Connection --------------------------- #
def get_boto_client(
bucket_creds: Optional[dict] = None,
) -> Tuple[
boto3.client, TransferConfig
]: # pragma: no cover # pylint: disable=line-too-long
) -> Tuple[Any, Any]: # pragma: no cover # pylint: disable=line-too-long
"""
Returns a boto3 client and transfer config for the bucket.
Lazy-loads boto3 to reduce initial import time.
"""
try:
session, TransferConfig, Config = _import_boto3_dependencies()
except ImportError:
logger.warning(
"boto3 not installed. S3 upload functionality disabled. "
"Install with: pip install boto3"
)
return None, None

bucket_session = session.Session()

boto_config = Config(
Expand Down Expand Up @@ -112,11 +133,11 @@ def upload_image(

if boto_client is None:
# Save the output to a file
print("No bucket endpoint set, saving to disk folder 'simulated_uploaded'")
print("If this is a live endpoint, please reference the following:")
print(
logger.warning(
"No bucket endpoint set, saving to disk folder 'simulated_uploaded'. "
"If this is a live endpoint, please reference: "
"https://github.com/runpod/runpod-python/blob/main/docs/serverless/utils/rp_upload.md"
) # pylint: disable=line-too-long
)

os.makedirs("simulated_uploaded", exist_ok=True)
sim_upload_location = f"simulated_uploaded/{image_name}{file_extension}"
Expand Down Expand Up @@ -180,6 +201,15 @@ def bucket_upload(job_id, file_list, bucket_creds): # pragma: no cover
"""
Uploads files to bucket storage.
"""
try:
session, _, Config = _import_boto3_dependencies()
except ImportError:
logger.error(
"boto3 not installed. Cannot upload to S3 bucket. "
"Install with: pip install boto3"
)
raise

temp_bucket_session = session.Session()

temp_boto_config = Config(
Expand Down Expand Up @@ -231,11 +261,11 @@ def upload_file_to_bucket(
key = f"{prefix}/{file_name}" if prefix else file_name

if boto_client is None:
print("No bucket endpoint set, saving to disk folder 'local_upload'")
print("If this is a live endpoint, please reference the following:")
print(
logger.warning(
"No bucket endpoint set, saving to disk folder 'local_upload'. "
"If this is a live endpoint, please reference: "
"https://github.com/runpod/runpod-python/blob/main/docs/serverless/utils/rp_upload.md"
) # pylint: disable=line-too-long
)

os.makedirs("local_upload", exist_ok=True)
local_upload_location = f"local_upload/{file_name}"
Expand Down Expand Up @@ -285,6 +315,20 @@ def upload_in_memory_object(

key = f"{prefix}/{file_name}" if prefix else file_name

if boto_client is None:
logger.warning(
"No bucket endpoint set, saving to disk folder 'local_upload'. "
"If this is a live endpoint, please reference: "
"https://github.com/runpod/runpod-python/blob/main/docs/serverless/utils/rp_upload.md"
)

os.makedirs("local_upload", exist_ok=True)
local_upload_location = f"local_upload/{file_name}"
with open(local_upload_location, "wb") as file_output:
file_output.write(file_data)

return local_upload_location

file_size = len(file_data)
with tqdm(
total=file_size, unit="B", unit_scale=True, desc=file_name
Expand Down
7 changes: 4 additions & 3 deletions tests/test_serverless/test_utils/test_upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@ def test_get_boto_client(self):
# Define the bucket credentials
bucket_creds = BUCKET_CREDENTIALS

# Mock boto3.session.Session
# Mock boto3 imports (now lazy-loaded inside the function)
with patch("boto3.session.Session") as mock_session, patch(
"runpod.serverless.utils.rp_upload.TransferConfig"
"boto3.s3.transfer.TransferConfig"
) as mock_transfer_config:
mock_session.return_value.client.return_value = self.mock_boto_client
mock_transfer_config.return_value = self.mock_transfer_config
Expand Down Expand Up @@ -110,8 +110,9 @@ def test_get_boto_client_environ(self):

importlib.reload(rp_upload)

# Mock boto3 imports (now lazy-loaded inside the function)
with patch("boto3.session.Session") as mock_session, patch(
"runpod.serverless.utils.rp_upload.TransferConfig"
"boto3.s3.transfer.TransferConfig"
) as mock_transfer_config:
mock_session.return_value.client.return_value = self.mock_boto_client
mock_transfer_config.return_value = self.mock_transfer_config
Expand Down
Loading