-
Notifications
You must be signed in to change notification settings - Fork 73
[SYNPY-1591] test upload speed of uploading large files to synapse #1264
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
linglp
wants to merge
15
commits into
develop
Choose a base branch
from
synpy-1591
base: develop
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from 5 commits
Commits
Show all changes
15 commits
Select commit
Hold shift + click to select a range
734bf1c
set up a proof of concept
7bb6676
change position of the timer
61e41f3
update script
5565daa
update script to better log
f613064
update function to upload files sequentially
042fab8
clean up script
aae09bb
clean up script
21c3b06
run black
386534d
add to benchmarking page
2a80d3e
update docstring
6ca60f4
update format
8b1b7a8
update format
486c806
add warning message
f9a599a
add warning to tutorial
af5eec5
added failed test attempts result
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -19,6 +19,7 @@ | |
|
|
||
| import asyncio | ||
| import datetime | ||
| import logging | ||
| import os | ||
| import shutil | ||
| import subprocess # nosec | ||
|
|
@@ -38,13 +39,17 @@ | |
| # from opentelemetry.sdk.resources import SERVICE_NAME, Resource | ||
| # from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter | ||
|
|
||
| # trace.set_tracer_provider( | ||
| # TracerProvider(resource=Resource(attributes={SERVICE_NAME: "upload_benchmarking"})) | ||
| # ) | ||
| # trace.get_tracer_provider().add_span_processor(BatchSpanProcessor(OTLPSpanExporter())) | ||
| # os.environ["OTEL_EXPORTER_OTLP_ENDPOINT"] = "https://ingest.us.signoz.cloud" | ||
| # os.environ["OTEL_EXPORTER_OTLP_HEADERS"] = "signoz-ingestion-key=<your key>" | ||
| # os.environ["OTEL_SERVICE_INSTANCE_ID"] = "local" | ||
|
|
||
| trace.set_tracer_provider( | ||
| TracerProvider(resource=Resource(attributes={SERVICE_NAME: "upload_benchmarking"})) | ||
| ) | ||
| trace.get_tracer_provider().add_span_processor(BatchSpanProcessor(OTLPSpanExporter())) | ||
| tracer = trace.get_tracer("my_tracer") | ||
|
|
||
| PARENT_PROJECT = "syn$FILL_ME_IN" | ||
| PARENT_PROJECT = "syn70984427" | ||
| S3_BUCKET = "s3://$FILL_ME_IN" | ||
| S3_PROFILE = "$FILL_ME_IN" | ||
|
|
||
|
|
@@ -104,18 +109,21 @@ def create_folder_structure( | |
| print(f"total_size_of_files_bytes: {total_size_of_files_bytes}") | ||
| print(f"size_of_each_file_bytes: {size_of_each_file_bytes}") | ||
|
|
||
| chunk_size = MiB # size of each chunk in bytes | ||
|
|
||
| def create_files_in_current_dir(path_to_create_files: str) -> None: | ||
| for i in range(1, num_files_per_directory + 1): | ||
| chunk_size = MiB # size of each chunk in bytes | ||
| num_chunks = size_of_each_file_bytes // chunk_size | ||
| filename = os.path.join(path_to_create_files, f"file{i}.txt") | ||
| # when the file size is right, just modify the beginning to refresh the file | ||
| if ( | ||
| os.path.isfile(filename) | ||
| and os.path.getsize(filename) == size_of_each_file_bytes | ||
| ): | ||
| with open(filename, "r+b") as f: | ||
| f.seek(0) | ||
| f.write(os.urandom(chunk_size)) | ||
| # if the file doesn't exist or the size is wrong, create it from scratch | ||
| else: | ||
| if os.path.isfile(filename): | ||
| os.remove(filename) | ||
|
|
@@ -158,7 +166,7 @@ def cleanup( | |
| ["aws", "s3", "rm", S3_BUCKET, "--recursive", "--profile", S3_PROFILE] | ||
| ) # nosec | ||
| if delete_synapse: | ||
| for child in syn.getChildren(PARENT_PROJECT, includeTypes=["folder"]): | ||
| for child in syn.getChildren(PARENT_PROJECT, includeTypes=["folder", "file"]): | ||
| syn.delete(child["id"]) | ||
| syn.cache.purge(after_date=datetime.datetime(2021, 1, 1)) | ||
|
|
||
|
|
@@ -212,6 +220,7 @@ def execute_synapseutils_test( | |
| manifestFile=manifest_path, | ||
| sendMessages=False, | ||
| ) | ||
|
|
||
| print( | ||
| f"\nTime to sync to Synapse: {perf_counter() - time_before_syncToSynapse}" | ||
| ) | ||
|
|
@@ -277,6 +286,75 @@ def execute_walk_test( | |
| ) | ||
|
|
||
|
|
||
| def execute_walk_file_sequential( | ||
linglp marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| path: str, | ||
| test_name: str, | ||
| ) -> None: | ||
| """Execute the test that uses os.walk to sync all files/folders to synapse. | ||
|
|
||
| Arguments: | ||
| path: The path to the root directory | ||
| test_name: The name of the test to add to the span name | ||
| """ | ||
| with tracer.start_as_current_span(f"manual_walk__{test_name}"): | ||
| time_before_walking_tree = perf_counter() | ||
|
|
||
| # Create descriptive log file name with timestamp | ||
| timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") | ||
| log_file_path = os.path.expanduser( | ||
| f"~/upload_benchmark_{test_name}_{timestamp}.log" | ||
| ) | ||
| with open(log_file_path, "a") as log_file: | ||
| log_file.write(f"Test: {test_name}\n") | ||
| start_time = datetime.datetime.now() | ||
| log_file.write(f"Start time: {start_time.strftime('%Y-%m-%d %H:%M:%S')}\n") | ||
|
|
||
| # Create a simple parent lookup | ||
| parents = {path: PARENT_PROJECT} | ||
|
|
||
| for directory_path, directory_names, file_names in os.walk(path): | ||
| # Create folders on Synapse first | ||
| for directory_name in directory_names: | ||
| folder_path = os.path.join(directory_path, directory_name) | ||
| parent_id = parents[directory_path] | ||
|
|
||
| new_folder = Folder(name=directory_name, parent_id=parent_id) | ||
| # Store each folder immediately and save its Synapse ID | ||
| stored_folder = asyncio.run(new_folder.store_async()) | ||
| parents[folder_path] = stored_folder.id | ||
|
|
||
| # Upload files one by one | ||
| for filename in file_names: | ||
| filepath = os.path.join(directory_path, filename) | ||
| parent_id = parents[directory_path] | ||
|
|
||
| new_file = File( | ||
| path=filepath, | ||
| parent_id=parent_id, | ||
| annotations={ | ||
| "annot1": "value1", | ||
| "annot2": 1, | ||
| "annot3": 1.2, | ||
| "annot4": True, | ||
| "annot5": "2020-01-01", | ||
| }, | ||
| description="This is a Test File", | ||
| ) | ||
| # Upload this single file immediately | ||
| asyncio.run(new_file.store_async()) | ||
|
|
||
| # Write end time and duration to log file | ||
| with open(log_file_path, "a") as log_file: | ||
| end_time = datetime.datetime.now() | ||
| duration = perf_counter() - time_before_walking_tree | ||
| log_file.write(f"End time: {end_time.strftime('%Y-%m-%d %H:%M:%S')}\n") | ||
| log_file.write(f"Duration: {duration:.2f} seconds\n") | ||
| log_file.write("-" * 50 + "\n") | ||
| print( | ||
| f"\nTime to walk and sync tree sequentially: {perf_counter() - time_before_walking_tree}" | ||
| ) | ||
|
|
||
|
|
||
| def execute_walk_test_oop( | ||
| path: str, | ||
| test_name: str, | ||
|
|
@@ -290,6 +368,16 @@ def execute_walk_test_oop( | |
| with tracer.start_as_current_span(f"manual_walk__{test_name}"): | ||
| time_before_walking_tree = perf_counter() | ||
|
|
||
| # Create descriptive log file name with timestamp | ||
| timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") | ||
| log_file_path = os.path.expanduser( | ||
| f"~/upload_benchmark_{test_name}_{timestamp}.log" | ||
| ) | ||
| with open(log_file_path, "a") as log_file: | ||
| log_file.write(f"Test: {test_name}\n") | ||
| start_time = datetime.datetime.now() | ||
| log_file.write(f"Start time: {start_time.strftime('%Y-%m-%d %H:%M:%S')}\n") | ||
|
|
||
| root_project = Project(id=PARENT_PROJECT) | ||
| parents = {path: root_project} | ||
| for directory_path, directory_names, file_names in os.walk(path): | ||
|
|
@@ -318,6 +406,14 @@ def execute_walk_test_oop( | |
| ) | ||
| parent_container.files.append(new_file) | ||
| asyncio.run(root_project.store_async()) | ||
|
|
||
| # Write end time and duration to log file | ||
| with open(log_file_path, "a") as log_file: | ||
| end_time = datetime.datetime.now() | ||
| duration = perf_counter() - time_before_walking_tree | ||
| log_file.write(f"End time: {end_time.strftime('%Y-%m-%d %H:%M:%S')}\n") | ||
| log_file.write(f"Duration: {duration:.2f} seconds\n") | ||
| log_file.write("-" * 50 + "\n") | ||
| print( | ||
| f"\nTime to walk and sync tree - OOP: {perf_counter() - time_before_walking_tree}" | ||
| ) | ||
|
|
@@ -361,15 +457,21 @@ def execute_test_suite( | |
| # Cleanup can be changed to delete_local=True when we want to clear the files out | ||
| # This can be kept as False to allow multiple tests with the same file/folder | ||
| # structure to re-use the files on Disk. | ||
| cleanup(path=path, delete_synapse=True, delete_s3=True, delete_local=False) | ||
| cleanup(path=path, delete_synapse=True, delete_s3=False, delete_local=False) | ||
| _, total_files, _ = create_folder_structure( | ||
| path=path, | ||
| depth_of_directory_tree=depth_of_directory_tree, | ||
| num_sub_directories=num_sub_directories, | ||
| num_files_per_directory=num_files_per_directory, | ||
| total_size_of_files_mib=total_size_of_files_mib, | ||
| ) | ||
| test_name = f"{total_files}_files_{total_size_of_files_mib}MiB" | ||
|
|
||
| if total_size_of_files_mib >= 1024: | ||
| test_name = f"{total_files}_files_{total_size_of_files_mib // 1024}GiB" | ||
| else: | ||
| test_name = f"{total_files}_files_{total_size_of_files_mib}MiB" | ||
|
|
||
| execute_walk_file_sequential(path, test_name) | ||
|
|
||
| # execute_synapseutils_test(path, test_name) | ||
|
|
||
|
|
@@ -380,25 +482,27 @@ def execute_test_suite( | |
| # execute_sync_to_s3(path, test_name) | ||
|
|
||
|
|
||
| syn = synapseclient.Synapse(debug=False) | ||
| root_path = os.path.expanduser("~/benchmarking") | ||
| syn = synapseclient.Synapse(debug=True, http_timeout_seconds=600) | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Since we found that a higher default |
||
| synapseclient.Synapse.enable_open_telemetry() | ||
| root_path = os.path.expanduser("~/benchmarking3") | ||
|
|
||
| # Log-in with ~.synapseConfig `authToken` | ||
| syn.login() | ||
|
|
||
| print("25 Files - 25MiB") | ||
| # print("25 Files - 25MiB") | ||
| # 25 Files - 25MiB ----------------------------------------------------------------------- | ||
| depth = 1 | ||
| sub_directories = 1 | ||
| files_per_directory = 25 | ||
| size_mib = 25 | ||
|
|
||
| execute_test_suite( | ||
| path=root_path, | ||
| depth_of_directory_tree=depth, | ||
| num_sub_directories=sub_directories, | ||
| num_files_per_directory=files_per_directory, | ||
| total_size_of_files_mib=size_mib, | ||
| ) | ||
| # depth = 1 | ||
| # sub_directories = 1 | ||
| # files_per_directory = 25 | ||
| # size_mib = 25 | ||
|
|
||
| # execute_test_suite( | ||
| # path=root_path, | ||
| # depth_of_directory_tree=depth, | ||
| # num_sub_directories=sub_directories, | ||
| # num_files_per_directory=files_per_directory, | ||
| # total_size_of_files_mib=size_mib, | ||
| # ) | ||
|
|
||
| # print("1 Files - 10MiB") | ||
| # ## 1 Files - 10MiB ----------------------------------------------------------------------- | ||
|
|
@@ -535,3 +639,50 @@ def execute_test_suite( | |
| # num_files_per_directory=files_per_directory, | ||
| # total_size_of_files_mib=size_mib, | ||
| # ) | ||
|
|
||
| # print("4 Files - 400GB") | ||
| # # 4 Files - 400GB ----------------------------------------------------------------------- | ||
| # depth = 1 | ||
| # sub_directories = 1 | ||
| # files_per_directory = 4 | ||
| # size_mib = 4 * 100 * 1024 | ||
|
|
||
| # execute_test_suite( | ||
| # path=root_path, | ||
| # depth_of_directory_tree=depth, | ||
| # num_sub_directories=sub_directories, | ||
| # num_files_per_directory=files_per_directory, | ||
| # total_size_of_files_mib=size_mib, | ||
| # ) | ||
|
|
||
|
|
||
| # print("45 Files - 4500GB") | ||
| # # 45 Files - 4500GB (100GB per file) ----------------------------------------------------------------------- | ||
| # depth = 1 | ||
| # sub_directories = 1 | ||
| # files_per_directory = 45 | ||
| # size_mib = 45 * 100 * 1024 | ||
|
|
||
| # execute_test_suite( | ||
| # path=root_path, | ||
| # depth_of_directory_tree=depth, | ||
| # num_sub_directories=sub_directories, | ||
| # num_files_per_directory=files_per_directory, | ||
| # total_size_of_files_mib=size_mib, | ||
| # ) | ||
|
|
||
|
|
||
| # print("4 Files - 4GB") | ||
| # # 4 Files - 4GB (1GB per file) ----------------------------------------------------------------------- | ||
| # depth = 1 | ||
| # sub_directories = 1 | ||
| # files_per_directory = 4 | ||
| # size_mib = 4 * 1024 | ||
|
|
||
| # execute_test_suite( | ||
| # path=root_path, | ||
| # depth_of_directory_tree=depth, | ||
| # num_sub_directories=sub_directories, | ||
| # num_files_per_directory=files_per_directory, | ||
| # total_size_of_files_mib=size_mib, | ||
| # ) | ||
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hmm I added "file" here because I am uploading all files directly to a project. If this is not desired, I could revert.