-
Notifications
You must be signed in to change notification settings - Fork 73
[SYNPY-1591] test upload speed of uploading large files to synapse #1264
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.
Already on GitHub? Sign in to your account
base: develop
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -19,6 +19,7 @@ | |
|
|
||
| import asyncio | ||
| import datetime | ||
| import logging | ||
| import os | ||
| import shutil | ||
| import subprocess # nosec | ||
|
|
@@ -38,13 +39,17 @@ | |
| # from opentelemetry.sdk.resources import SERVICE_NAME, Resource | ||
| # from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter | ||
|
|
||
| # trace.set_tracer_provider( | ||
| # TracerProvider(resource=Resource(attributes={SERVICE_NAME: "upload_benchmarking"})) | ||
| # ) | ||
| # trace.get_tracer_provider().add_span_processor(BatchSpanProcessor(OTLPSpanExporter())) | ||
| # os.environ["OTEL_EXPORTER_OTLP_ENDPOINT"] = "https://ingest.us.signoz.cloud" | ||
| # os.environ["OTEL_EXPORTER_OTLP_HEADERS"] = "signoz-ingestion-key=<your key>" | ||
| # os.environ["OTEL_SERVICE_INSTANCE_ID"] = "local" | ||
|
|
||
| trace.set_tracer_provider( | ||
| TracerProvider(resource=Resource(attributes={SERVICE_NAME: "upload_benchmarking"})) | ||
| ) | ||
| trace.get_tracer_provider().add_span_processor(BatchSpanProcessor(OTLPSpanExporter())) | ||
| tracer = trace.get_tracer("my_tracer") | ||
|
|
||
| PARENT_PROJECT = "syn$FILL_ME_IN" | ||
| PARENT_PROJECT = "syn70984427" | ||
| S3_BUCKET = "s3://$FILL_ME_IN" | ||
| S3_PROFILE = "$FILL_ME_IN" | ||
|
|
||
|
|
@@ -104,18 +109,21 @@ def create_folder_structure( | |
| print(f"total_size_of_files_bytes: {total_size_of_files_bytes}") | ||
| print(f"size_of_each_file_bytes: {size_of_each_file_bytes}") | ||
|
|
||
| chunk_size = MiB # size of each chunk in bytes | ||
|
|
||
| def create_files_in_current_dir(path_to_create_files: str) -> None: | ||
| for i in range(1, num_files_per_directory + 1): | ||
| chunk_size = MiB # size of each chunk in bytes | ||
| num_chunks = size_of_each_file_bytes // chunk_size | ||
| filename = os.path.join(path_to_create_files, f"file{i}.txt") | ||
| # when the file size is right, just modify the beginning to refresh the file | ||
| if ( | ||
| os.path.isfile(filename) | ||
| and os.path.getsize(filename) == size_of_each_file_bytes | ||
| ): | ||
| with open(filename, "r+b") as f: | ||
| f.seek(0) | ||
| f.write(os.urandom(chunk_size)) | ||
| # if the file doesn't exist or the size is wrong, create it from scratch | ||
| else: | ||
| if os.path.isfile(filename): | ||
| os.remove(filename) | ||
|
|
@@ -158,7 +166,7 @@ def cleanup( | |
| ["aws", "s3", "rm", S3_BUCKET, "--recursive", "--profile", S3_PROFILE] | ||
| ) # nosec | ||
| if delete_synapse: | ||
| for child in syn.getChildren(PARENT_PROJECT, includeTypes=["folder"]): | ||
| for child in syn.getChildren(PARENT_PROJECT, includeTypes=["folder", "file"]): | ||
| syn.delete(child["id"]) | ||
| syn.cache.purge(after_date=datetime.datetime(2021, 1, 1)) | ||
|
|
||
|
|
@@ -212,6 +220,7 @@ def execute_synapseutils_test( | |
| manifestFile=manifest_path, | ||
| sendMessages=False, | ||
| ) | ||
|
|
||
| print( | ||
| f"\nTime to sync to Synapse: {perf_counter() - time_before_syncToSynapse}" | ||
| ) | ||
|
|
@@ -277,6 +286,75 @@ def execute_walk_test( | |
| ) | ||
|
|
||
|
|
||
| def execute_walk_file_sequential( | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @BryanFauble I modified the
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah this looks good to me! |
||
| path: str, | ||
| test_name: str, | ||
| ) -> None: | ||
| """Execute the test that uses os.walk to sync all files/folders to synapse. | ||
|
|
||
| Arguments: | ||
| path: The path to the root directory | ||
| test_name: The name of the test to add to the span name | ||
| """ | ||
| with tracer.start_as_current_span(f"manual_walk__{test_name}"): | ||
| time_before_walking_tree = perf_counter() | ||
|
|
||
| # Create descriptive log file name with timestamp | ||
| timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") | ||
| log_file_path = os.path.expanduser( | ||
| f"~/upload_benchmark_{test_name}_{timestamp}.log" | ||
| ) | ||
| with open(log_file_path, "a") as log_file: | ||
| log_file.write(f"Test: {test_name}\n") | ||
| start_time = datetime.datetime.now() | ||
| log_file.write(f"Start time: {start_time.strftime('%Y-%m-%d %H:%M:%S')}\n") | ||
|
|
||
| # Create a simple parent lookup | ||
| parents = {path: PARENT_PROJECT} | ||
|
|
||
| for directory_path, directory_names, file_names in os.walk(path): | ||
| # Create folders on Synapse first | ||
| for directory_name in directory_names: | ||
| folder_path = os.path.join(directory_path, directory_name) | ||
| parent_id = parents[directory_path] | ||
|
|
||
| new_folder = Folder(name=directory_name, parent_id=parent_id) | ||
| # Store each folder immediately and save its Synapse ID | ||
| stored_folder = asyncio.run(new_folder.store_async()) | ||
| parents[folder_path] = stored_folder.id | ||
|
|
||
| # Upload files one by one | ||
| for filename in file_names: | ||
| filepath = os.path.join(directory_path, filename) | ||
| parent_id = parents[directory_path] | ||
|
|
||
| new_file = File( | ||
| path=filepath, | ||
| parent_id=parent_id, | ||
| annotations={ | ||
| "annot1": "value1", | ||
| "annot2": 1, | ||
| "annot3": 1.2, | ||
| "annot4": True, | ||
| "annot5": "2020-01-01", | ||
| }, | ||
| description="This is a Test File", | ||
| ) | ||
| # Upload this single file immediately | ||
| asyncio.run(new_file.store_async()) | ||
|
|
||
| # Write end time and duration to log file | ||
| with open(log_file_path, "a") as log_file: | ||
| end_time = datetime.datetime.now() | ||
| duration = perf_counter() - time_before_walking_tree | ||
| log_file.write(f"End time: {end_time.strftime('%Y-%m-%d %H:%M:%S')}\n") | ||
| log_file.write(f"Duration: {duration:.2f} seconds\n") | ||
| log_file.write("-" * 50 + "\n") | ||
| print( | ||
| f"\nTime to walk and sync tree sequentially: {perf_counter() - time_before_walking_tree}" | ||
| ) | ||
|
|
||
|
|
||
| def execute_walk_test_oop( | ||
| path: str, | ||
| test_name: str, | ||
|
|
@@ -290,6 +368,16 @@ def execute_walk_test_oop( | |
| with tracer.start_as_current_span(f"manual_walk__{test_name}"): | ||
| time_before_walking_tree = perf_counter() | ||
|
|
||
| # Create descriptive log file name with timestamp | ||
| timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") | ||
| log_file_path = os.path.expanduser( | ||
| f"~/upload_benchmark_{test_name}_{timestamp}.log" | ||
| ) | ||
| with open(log_file_path, "a") as log_file: | ||
| log_file.write(f"Test: {test_name}\n") | ||
| start_time = datetime.datetime.now() | ||
| log_file.write(f"Start time: {start_time.strftime('%Y-%m-%d %H:%M:%S')}\n") | ||
|
|
||
| root_project = Project(id=PARENT_PROJECT) | ||
| parents = {path: root_project} | ||
| for directory_path, directory_names, file_names in os.walk(path): | ||
|
|
@@ -318,6 +406,14 @@ def execute_walk_test_oop( | |
| ) | ||
| parent_container.files.append(new_file) | ||
| asyncio.run(root_project.store_async()) | ||
|
|
||
| # Write end time and duration to log file | ||
| with open(log_file_path, "a") as log_file: | ||
| end_time = datetime.datetime.now() | ||
| duration = perf_counter() - time_before_walking_tree | ||
| log_file.write(f"End time: {end_time.strftime('%Y-%m-%d %H:%M:%S')}\n") | ||
| log_file.write(f"Duration: {duration:.2f} seconds\n") | ||
| log_file.write("-" * 50 + "\n") | ||
| print( | ||
| f"\nTime to walk and sync tree - OOP: {perf_counter() - time_before_walking_tree}" | ||
| ) | ||
|
|
@@ -361,15 +457,21 @@ def execute_test_suite( | |
| # Cleanup can be changed to delete_local=True when we want to clear the files out | ||
| # This can be kept as False to allow multiple tests with the same file/folder | ||
| # structure to re-use the files on Disk. | ||
| cleanup(path=path, delete_synapse=True, delete_s3=True, delete_local=False) | ||
| cleanup(path=path, delete_synapse=True, delete_s3=False, delete_local=False) | ||
| _, total_files, _ = create_folder_structure( | ||
| path=path, | ||
| depth_of_directory_tree=depth_of_directory_tree, | ||
| num_sub_directories=num_sub_directories, | ||
| num_files_per_directory=num_files_per_directory, | ||
| total_size_of_files_mib=total_size_of_files_mib, | ||
| ) | ||
| test_name = f"{total_files}_files_{total_size_of_files_mib}MiB" | ||
|
|
||
| if total_size_of_files_mib >= 1024: | ||
| test_name = f"{total_files}_files_{total_size_of_files_mib // 1024}GiB" | ||
| else: | ||
| test_name = f"{total_files}_files_{total_size_of_files_mib}MiB" | ||
|
|
||
| execute_walk_file_sequential(path, test_name) | ||
|
|
||
| # execute_synapseutils_test(path, test_name) | ||
|
|
||
|
|
@@ -380,25 +482,27 @@ def execute_test_suite( | |
| # execute_sync_to_s3(path, test_name) | ||
|
|
||
|
|
||
| syn = synapseclient.Synapse(debug=False) | ||
| root_path = os.path.expanduser("~/benchmarking") | ||
| syn = synapseclient.Synapse(debug=True, http_timeout_seconds=600) | ||
| synapseclient.Synapse.enable_open_telemetry() | ||
| root_path = os.path.expanduser("~/benchmarking3") | ||
|
|
||
| # Log-in with ~.synapseConfig `authToken` | ||
| syn.login() | ||
|
|
||
| print("25 Files - 25MiB") | ||
| # print("25 Files - 25MiB") | ||
| # 25 Files - 25MiB ----------------------------------------------------------------------- | ||
| depth = 1 | ||
| sub_directories = 1 | ||
| files_per_directory = 25 | ||
| size_mib = 25 | ||
|
|
||
| execute_test_suite( | ||
| path=root_path, | ||
| depth_of_directory_tree=depth, | ||
| num_sub_directories=sub_directories, | ||
| num_files_per_directory=files_per_directory, | ||
| total_size_of_files_mib=size_mib, | ||
| ) | ||
| # depth = 1 | ||
| # sub_directories = 1 | ||
| # files_per_directory = 25 | ||
| # size_mib = 25 | ||
|
|
||
| # execute_test_suite( | ||
| # path=root_path, | ||
| # depth_of_directory_tree=depth, | ||
| # num_sub_directories=sub_directories, | ||
| # num_files_per_directory=files_per_directory, | ||
| # total_size_of_files_mib=size_mib, | ||
| # ) | ||
|
|
||
| # print("1 Files - 10MiB") | ||
| # ## 1 Files - 10MiB ----------------------------------------------------------------------- | ||
|
|
@@ -535,3 +639,50 @@ def execute_test_suite( | |
| # num_files_per_directory=files_per_directory, | ||
| # total_size_of_files_mib=size_mib, | ||
| # ) | ||
|
|
||
| # print("4 Files - 400GB") | ||
| # # 4 Files - 400GB ----------------------------------------------------------------------- | ||
| # depth = 1 | ||
| # sub_directories = 1 | ||
| # files_per_directory = 4 | ||
| # size_mib = 4 * 100 * 1024 | ||
|
|
||
| # execute_test_suite( | ||
| # path=root_path, | ||
| # depth_of_directory_tree=depth, | ||
| # num_sub_directories=sub_directories, | ||
| # num_files_per_directory=files_per_directory, | ||
| # total_size_of_files_mib=size_mib, | ||
| # ) | ||
|
|
||
|
|
||
| # print("45 File - 100GB") | ||
| # # 45 File - 100GB ----------------------------------------------------------------------- | ||
| # depth = 1 | ||
| # sub_directories = 1 | ||
| # files_per_directory = 45 | ||
| # size_mib = 45 * 100 * 1024 | ||
|
|
||
| # execute_test_suite( | ||
| # path=root_path, | ||
| # depth_of_directory_tree=depth, | ||
| # num_sub_directories=sub_directories, | ||
| # num_files_per_directory=files_per_directory, | ||
| # total_size_of_files_mib=size_mib, | ||
| # ) | ||
|
|
||
|
|
||
| # print("4 Files - 1MB") | ||
| # # 4 Files - 1MB ----------------------------------------------------------------------- | ||
| # depth = 1 | ||
| # sub_directories = 1 | ||
| # files_per_directory = 4 | ||
| # size_mib = 4 * 1024 | ||
|
|
||
| # execute_test_suite( | ||
| # path=root_path, | ||
| # depth_of_directory_tree=depth, | ||
| # num_sub_directories=sub_directories, | ||
| # num_files_per_directory=files_per_directory, | ||
| # total_size_of_files_mib=size_mib, | ||
| # ) | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hmm I added "file" here because I am uploading all files directly to a project. If this is not desired, I could revert.