diff --git a/docs/serverless/utils/rp_upload.md b/docs/serverless/utils/rp_upload.md index 9a826507..3560fdb9 100644 --- a/docs/serverless/utils/rp_upload.md +++ b/docs/serverless/utils/rp_upload.md @@ -4,6 +4,14 @@ The upload utility provides functions to upload files and in-memory objects to a *Note: The upload utility utilizes the Virtual-hosted-style URL with the bucket name in the host name. For example, `https: // bucket-name.s3.amazonaws.com`.* +## Requirements + +The upload utility requires [boto3](https://pypi.org/project/boto3/) for S3 functionality. boto3 is lazy-loaded to minimize initial import time and memory footprint. + +If boto3 is not installed or S3 credentials are not configured when you use the S3 upload features, files are saved to local disk instead: +- `upload_image()` saves to the `simulated_uploaded/` directory +- `upload_file_to_bucket()` and `upload_in_memory_object()` save to the `local_upload/` directory + ## Bucket Credentials You can set your S3 bucket credentials in the following ways: diff --git a/runpod/serverless/__init__.py b/runpod/serverless/__init__.py index c3b3c19a..1b6b88df 100644 --- a/runpod/serverless/__init__.py +++ b/runpod/serverless/__init__.py @@ -14,7 +14,6 @@ from ..version import __version__ as runpod_version from . 
import worker -from .modules import rp_fastapi from .modules.rp_logger import RunPodLogger from .modules.rp_progress import progress_update @@ -155,6 +154,7 @@ def start(config: Dict[str, Any]): if config["rp_args"]["rp_serve_api"]: log.info("Starting API server.") + from .modules import rp_fastapi api_server = rp_fastapi.WorkerAPI(config) api_server.start_uvicorn( @@ -166,6 +166,7 @@ def start(config: Dict[str, Any]): if realtime_port: log.info(f"Starting API server for realtime on port {realtime_port}.") + from .modules import rp_fastapi api_server = rp_fastapi.WorkerAPI(config) api_server.start_uvicorn( diff --git a/runpod/serverless/modules/rp_scale.py b/runpod/serverless/modules/rp_scale.py index 5c7d79cc..f8a63bca 100644 --- a/runpod/serverless/modules/rp_scale.py +++ b/runpod/serverless/modules/rp_scale.py @@ -195,7 +195,7 @@ async def get_jobs(self, session: ClientSession): except TooManyRequests: log.debug( - f"JobScaler.get_jobs | Too many requests. Debounce for 5 seconds." + "JobScaler.get_jobs | Too many requests. Debounce for 5 seconds." 
) await asyncio.sleep(5) # debounce for 5 seconds except asyncio.CancelledError: diff --git a/runpod/serverless/utils/rp_upload.py b/runpod/serverless/utils/rp_upload.py index 3f1c5af5..d4e9a015 100644 --- a/runpod/serverless/utils/rp_upload.py +++ b/runpod/serverless/utils/rp_upload.py @@ -10,20 +10,75 @@ import threading import time import uuid -from typing import Optional, Tuple +from typing import TYPE_CHECKING, Optional, Tuple from urllib.parse import urlparse -import boto3 -from boto3 import session -from boto3.s3.transfer import TransferConfig -from botocore.config import Config from tqdm_loggable.auto import tqdm +if TYPE_CHECKING: + from boto3.s3.transfer import TransferConfig + from botocore.client import BaseClient + logger = logging.getLogger("runpod upload utility") FMT = "%(filename)-20s:%(lineno)-4d %(asctime)s %(message)s" logging.basicConfig(level=logging.INFO, format=FMT, handlers=[logging.StreamHandler()]) +def _import_boto3_dependencies(): + """ + Lazy-load boto3 dependencies. + Returns tuple of (session, TransferConfig, Config) or raises ImportError. + """ + try: + from boto3 import session + from boto3.s3.transfer import TransferConfig + from botocore.config import Config + return session, TransferConfig, Config + except ImportError as e: + raise ImportError( + "boto3 is required for S3 upload functionality. " + "Install with: pip install boto3" + ) from e + + +def _save_to_local_fallback( + file_name: str, + source_path: Optional[str] = None, + file_data: Optional[bytes] = None, + directory: str = "local_upload" +) -> str: + """ + Save file to local directory as fallback when S3 is unavailable. 
+ + Args: + file_name: Name of the file to save + source_path: Path to source file to copy (for file-based uploads) + file_data: Bytes to write (for in-memory uploads) + directory: Local directory to save to (default: 'local_upload') + + Returns: + Path to the saved local file + """ + logger.warning( + f"No bucket endpoint set, saving to disk folder '{directory}'. " + "If this is a live endpoint, please reference: " + "https://github.com/runpod/runpod-python/blob/main/docs/serverless/utils/rp_upload.md" + ) + + os.makedirs(directory, exist_ok=True) + local_upload_location = f"{directory}/{file_name}" + + if source_path: + shutil.copyfile(source_path, local_upload_location) + elif file_data is not None: + with open(local_upload_location, "wb") as file_output: + file_output.write(file_data) + else: + raise ValueError("Either source_path or file_data must be provided") + + return local_upload_location + + def extract_region_from_url(endpoint_url): """ Extracts the region from the endpoint URL. @@ -43,12 +98,20 @@ def extract_region_from_url(endpoint_url): # --------------------------- S3 Bucket Connection --------------------------- # def get_boto_client( bucket_creds: Optional[dict] = None, -) -> Tuple[ - boto3.client, TransferConfig -]: # pragma: no cover # pylint: disable=line-too-long +) -> Tuple[Optional["BaseClient"], Optional["TransferConfig"]]: """ Returns a boto3 client and transfer config for the bucket. + Lazy-loads boto3 to reduce initial import time. """ + try: + session, TransferConfig, Config = _import_boto3_dependencies() + except ImportError: + logger.warning( + "boto3 not installed. S3 upload functionality disabled. 
" + "Install with: pip install boto3" + ) + return None, None + bucket_session = session.Session() boto_config = Config( @@ -111,18 +174,13 @@ def upload_image( output = input_file.read() if boto_client is None: - # Save the output to a file - print("No bucket endpoint set, saving to disk folder 'simulated_uploaded'") - print("If this is a live endpoint, please reference the following:") - print( - "https://github.com/runpod/runpod-python/blob/main/docs/serverless/utils/rp_upload.md" - ) # pylint: disable=line-too-long - - os.makedirs("simulated_uploaded", exist_ok=True) - sim_upload_location = f"simulated_uploaded/{image_name}{file_extension}" - - with open(sim_upload_location, "wb") as file_output: - file_output.write(output) + # Save the output to a file using fallback helper + file_name_with_ext = f"{image_name}{file_extension}" + sim_upload_location = _save_to_local_fallback( + file_name_with_ext, + file_data=output, + directory="simulated_uploaded" + ) if results_list is not None: results_list[result_index] = sim_upload_location @@ -180,6 +238,15 @@ def bucket_upload(job_id, file_list, bucket_creds): # pragma: no cover """ Uploads files to bucket storage. """ + try: + session, _, Config = _import_boto3_dependencies() + except ImportError: + logger.error( + "boto3 not installed. Cannot upload to S3 bucket. 
" + "Install with: pip install boto3" + ) + raise + temp_bucket_session = session.Session() temp_boto_config = Config( @@ -231,17 +298,7 @@ def upload_file_to_bucket( key = f"{prefix}/{file_name}" if prefix else file_name if boto_client is None: - print("No bucket endpoint set, saving to disk folder 'local_upload'") - print("If this is a live endpoint, please reference the following:") - print( - "https://github.com/runpod/runpod-python/blob/main/docs/serverless/utils/rp_upload.md" - ) # pylint: disable=line-too-long - - os.makedirs("local_upload", exist_ok=True) - local_upload_location = f"local_upload/{file_name}" - shutil.copyfile(file_location, local_upload_location) - - return local_upload_location + return _save_to_local_fallback(file_name, source_path=file_location) file_size = os.path.getsize(file_location) with tqdm( @@ -285,6 +342,9 @@ def upload_in_memory_object( key = f"{prefix}/{file_name}" if prefix else file_name + if boto_client is None: + return _save_to_local_fallback(file_name, file_data=file_data) + file_size = len(file_data) with tqdm( total=file_size, unit="B", unit_scale=True, desc=file_name diff --git a/tests/test_serverless/test_utils/test_upload.py b/tests/test_serverless/test_utils/test_upload.py index a67ada8c..3e3abe53 100644 --- a/tests/test_serverless/test_utils/test_upload.py +++ b/tests/test_serverless/test_utils/test_upload.py @@ -32,6 +32,15 @@ def setUp(self) -> None: def tearDown(self): os.environ = self.original_environ + def test_import_boto3_dependencies_missing(self): + """ + Tests _import_boto3_dependencies when boto3 is not available + """ + with patch("builtins.__import__", side_effect=ImportError("No module named 'boto3'")): + with self.assertRaises(ImportError) as context: + rp_upload._import_boto3_dependencies() + self.assertIn("boto3 is required for S3 upload functionality", str(context.exception)) + def test_get_boto_client(self): """ Tests get_boto_client @@ -39,9 +48,9 @@ def test_get_boto_client(self): # Define 
the bucket credentials bucket_creds = BUCKET_CREDENTIALS - # Mock boto3.session.Session + # Mock boto3 imports (now lazy-loaded inside the function) with patch("boto3.session.Session") as mock_session, patch( - "runpod.serverless.utils.rp_upload.TransferConfig" + "boto3.s3.transfer.TransferConfig" ) as mock_transfer_config: mock_session.return_value.client.return_value = self.mock_boto_client mock_transfer_config.return_value = self.mock_transfer_config @@ -110,8 +119,9 @@ def test_get_boto_client_environ(self): importlib.reload(rp_upload) + # Mock boto3 imports (now lazy-loaded inside the function) with patch("boto3.session.Session") as mock_session, patch( - "runpod.serverless.utils.rp_upload.TransferConfig" + "boto3.s3.transfer.TransferConfig" ) as mock_transfer_config: mock_session.return_value.client.return_value = self.mock_boto_client mock_transfer_config.return_value = self.mock_transfer_config @@ -178,9 +188,46 @@ def test_upload_image_s3(self, mock_open, mock_get_boto_client): mock_boto_client.generate_presigned_url.assert_called_once() +class TestLocalFallback(unittest.TestCase): + """Tests for _save_to_local_fallback helper function""" + + @patch("os.makedirs") + def test_save_to_local_fallback_invalid_args(self, mock_makedirs): + """ + Tests _save_to_local_fallback raises ValueError when neither source_path nor file_data provided + """ + with self.assertRaises(ValueError) as context: + rp_upload._save_to_local_fallback("test.txt") + self.assertIn("Either source_path or file_data must be provided", str(context.exception)) + + class TestUploadUtility(unittest.TestCase): """Tests for upload utility""" + @patch("runpod.serverless.utils.rp_upload.get_boto_client") + @patch("os.path.exists") + @patch("shutil.copyfile") + @patch("os.makedirs") + def test_upload_file_to_bucket_fallback( + self, mock_makedirs, mock_copyfile, mock_exists, mock_get_boto_client + ): + """ + Tests upload_file_to_bucket fallback when boto_client is None + """ + # Mock 
get_boto_client to return None + mock_get_boto_client.return_value = (None, None) + mock_exists.return_value = True + + file_name = "example.txt" + file_location = "/path/to/file.txt" + + result = upload_file_to_bucket(file_name, file_location) + + # Check fallback behavior + assert result == "local_upload/example.txt" + mock_makedirs.assert_called_once_with("local_upload", exist_ok=True) + mock_copyfile.assert_called_once_with(file_location, "local_upload/example.txt") + @patch("runpod.serverless.utils.rp_upload.get_boto_client") def test_upload_file_to_bucket(self, mock_get_boto_client): """ @@ -220,6 +267,29 @@ def test_upload_file_to_bucket(self, mock_get_boto_client): ExpiresIn=604800, ) + @patch("runpod.serverless.utils.rp_upload.get_boto_client") + @patch("builtins.open", new_callable=unittest.mock.mock_open) + @patch("os.makedirs") + def test_upload_in_memory_object_fallback( + self, mock_makedirs, mock_open_file, mock_get_boto_client + ): + """ + Tests upload_in_memory_object fallback when boto_client is None + """ + # Mock get_boto_client to return None + mock_get_boto_client.return_value = (None, None) + + file_name = "example.txt" + file_data = b"This is test data." 
+ + result = upload_in_memory_object(file_name, file_data) + + # Check fallback behavior + assert result == "local_upload/example.txt" + mock_makedirs.assert_called_once_with("local_upload", exist_ok=True) + mock_open_file.assert_called_once_with("local_upload/example.txt", "wb") + mock_open_file().write.assert_called_once_with(file_data) + @patch("runpod.serverless.utils.rp_upload.get_boto_client") def test_upload_in_memory_object(self, mock_get_boto_client): """ diff --git a/tests/test_serverless/test_worker.py b/tests/test_serverless/test_worker.py index 19f03388..e1fd743f 100644 --- a/tests/test_serverless/test_worker.py +++ b/tests/test_serverless/test_worker.py @@ -84,13 +84,13 @@ def test_local_api(self): with patch( "argparse.ArgumentParser.parse_known_args" ) as mock_parse_known_args, patch( - "runpod.serverless.rp_fastapi" - ) as mock_fastapi: + "runpod.serverless.modules.rp_fastapi.WorkerAPI" + ) as mock_worker_api: mock_parse_known_args.return_value = known_args, [] runpod.serverless.start({"handler": self.mock_handler}) - assert mock_fastapi.WorkerAPI.called + assert mock_worker_api.called @patch("runpod.serverless.log") @patch("runpod.serverless.sys.exit") @@ -544,7 +544,7 @@ def test_start_sets_excepthook(self, _, __): assert sys.excepthook == _handle_uncaught_exception @patch("runpod.serverless.signal.signal") - @patch("runpod.serverless.rp_fastapi.WorkerAPI.start_uvicorn") + @patch("runpod.serverless.modules.rp_fastapi.WorkerAPI.start_uvicorn") @patch("runpod.serverless._set_config_args") def test_start_does_not_set_excepthook(self, mock_set_config_args, _, __): mock_set_config_args.return_value = self.config