diff --git a/docs/scripts/uploadBenchmark.py b/docs/scripts/uploadBenchmark.py
index 8a6205b5d..d90a8411c 100644
--- a/docs/scripts/uploadBenchmark.py
+++ b/docs/scripts/uploadBenchmark.py
@@ -19,6 +19,7 @@
 
 import asyncio
 import datetime
+import logging
 import os
 import shutil
 import subprocess  # nosec
@@ -38,13 +39,17 @@
 # from opentelemetry.sdk.resources import SERVICE_NAME, Resource
 # from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
 
-# trace.set_tracer_provider(
-#     TracerProvider(resource=Resource(attributes={SERVICE_NAME: "upload_benchmarking"}))
-# )
-# trace.get_tracer_provider().add_span_processor(BatchSpanProcessor(OTLPSpanExporter()))
+# os.environ["OTEL_EXPORTER_OTLP_ENDPOINT"] = "https://ingest.us.signoz.cloud"
+# os.environ["OTEL_EXPORTER_OTLP_HEADERS"] = "signoz-ingestion-key="
+# os.environ["OTEL_SERVICE_INSTANCE_ID"] = "local"
+
+trace.set_tracer_provider(
+    TracerProvider(resource=Resource(attributes={SERVICE_NAME: "upload_benchmarking"}))
+)
+trace.get_tracer_provider().add_span_processor(BatchSpanProcessor(OTLPSpanExporter()))
 
 tracer = trace.get_tracer("my_tracer")
 
-PARENT_PROJECT = "syn$FILL_ME_IN"
+PARENT_PROJECT = "syn70984427"
 S3_BUCKET = "s3://$FILL_ME_IN"
 S3_PROFILE = "$FILL_ME_IN"
@@ -104,11 +109,13 @@ def create_folder_structure(
     print(f"total_size_of_files_bytes: {total_size_of_files_bytes}")
     print(f"size_of_each_file_bytes: {size_of_each_file_bytes}")
 
+    chunk_size = MiB  # size of each chunk in bytes
+
     def create_files_in_current_dir(path_to_create_files: str) -> None:
         for i in range(1, num_files_per_directory + 1):
-            chunk_size = MiB  # size of each chunk in bytes
             num_chunks = size_of_each_file_bytes // chunk_size
             filename = os.path.join(path_to_create_files, f"file{i}.txt")
+            # when the file size is right, just modify the beginning to refresh the file
             if (
                 os.path.isfile(filename)
                 and os.path.getsize(filename) == size_of_each_file_bytes
@@ -116,6 +123,7 @@ def create_files_in_current_dir(path_to_create_files: str) -> None:
                 with open(filename, "r+b") as f:
                     f.seek(0)
                     f.write(os.urandom(chunk_size))
+            # if the file doesn't exist or the size is wrong, create it from scratch
             else:
                 if os.path.isfile(filename):
                     os.remove(filename)
@@ -158,7 +166,7 @@ def cleanup(
             ["aws", "s3", "rm", S3_BUCKET, "--recursive", "--profile", S3_PROFILE]
         )  # nosec
     if delete_synapse:
-        for child in syn.getChildren(PARENT_PROJECT, includeTypes=["folder"]):
+        for child in syn.getChildren(PARENT_PROJECT, includeTypes=["folder", "file"]):
             syn.delete(child["id"])
         syn.cache.purge(after_date=datetime.datetime(2021, 1, 1))
 
@@ -212,6 +220,7 @@ def execute_synapseutils_test(
             manifestFile=manifest_path,
             sendMessages=False,
         )
+
         print(
             f"\nTime to sync to Synapse: {perf_counter() - time_before_syncToSynapse}"
         )
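Note on the create_folder_structure changes above: chunk_size is hoisted out of the per-file loop, and the two new comments mark the two branches of the file-generation logic. A minimal standalone sketch of that refresh-or-recreate pattern, assuming a hypothetical helper name refresh_or_create (not part of the script):

    import os

    MiB = 1024 * 1024

    def refresh_or_create(filename: str, size_bytes: int, chunk_size: int = MiB) -> None:
        # hypothetical helper sketching the pattern used in create_folder_structure
        if os.path.isfile(filename) and os.path.getsize(filename) == size_bytes:
            # File already has the right size: overwrite only the first chunk so
            # the content changes without rewriting the whole file.
            with open(filename, "r+b") as f:
                f.seek(0)
                f.write(os.urandom(chunk_size))
        else:
            # Missing or wrong-sized file: recreate it chunk by chunk so the
            # full payload never has to sit in memory at once.
            if os.path.isfile(filename):
                os.remove(filename)
            with open(filename, "wb") as f:
                for _ in range(size_bytes // chunk_size):
                    f.write(os.urandom(chunk_size))

Refreshing just the leading chunk presumably gives Synapse new content (a new MD5) on every run without the cost of regenerating hundreds of GiB on disk.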
@@ -277,6 +286,75 @@ def execute_walk_test(
         )
 
 
+def execute_walk_file_sequential(
+    path: str,
+    test_name: str,
+) -> None:
+    """Execute the test that uses os.walk to upload all files/folders to Synapse one at a time.
+
+    Arguments:
+        path: The path to the root directory
+        test_name: The name of the test to add to the span name
+    """
+    with tracer.start_as_current_span(f"manual_walk__{test_name}"):
+        time_before_walking_tree = perf_counter()
+
+        # Create descriptive log file name with timestamp
+        timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
+        log_file_path = os.path.expanduser(
+            f"~/upload_benchmark_{test_name}_{timestamp}.log"
+        )
+        with open(log_file_path, "a") as log_file:
+            log_file.write(f"Test: {test_name}\n")
+            start_time = datetime.datetime.now()
+            log_file.write(f"Start time: {start_time.strftime('%Y-%m-%d %H:%M:%S')}\n")
+
+        # Create a simple parent lookup
+        parents = {path: PARENT_PROJECT}
+
+        for directory_path, directory_names, file_names in os.walk(path):
+            # Create folders on Synapse first
+            for directory_name in directory_names:
+                folder_path = os.path.join(directory_path, directory_name)
+                parent_id = parents[directory_path]
+
+                new_folder = Folder(name=directory_name, parent_id=parent_id)
+                # Store each folder immediately and save its Synapse ID
+                stored_folder = asyncio.run(new_folder.store_async())
+                parents[folder_path] = stored_folder.id
+
+            # Upload files one by one
+            for filename in file_names:
+                filepath = os.path.join(directory_path, filename)
+                parent_id = parents[directory_path]
+
+                new_file = File(
+                    path=filepath,
+                    parent_id=parent_id,
+                    annotations={
+                        "annot1": "value1",
+                        "annot2": 1,
+                        "annot3": 1.2,
+                        "annot4": True,
+                        "annot5": "2020-01-01",
+                    },
+                    description="This is a Test File",
+                )
+                # Upload this single file immediately
+                asyncio.run(new_file.store_async())
+
+        # Write end time and duration to log file
+        with open(log_file_path, "a") as log_file:
+            end_time = datetime.datetime.now()
+            duration = perf_counter() - time_before_walking_tree
+            log_file.write(f"End time: {end_time.strftime('%Y-%m-%d %H:%M:%S')}\n")
+            log_file.write(f"Duration: {duration:.2f} seconds\n")
+            log_file.write("-" * 50 + "\n")
+        print(
+            f"\nTime to walk and sync tree sequentially: {perf_counter() - time_before_walking_tree}"
+        )
+
+
 def execute_walk_test_oop(
     path: str,
     test_name: str,
@@ -290,6 +368,16 @@ ) -> None:
     with tracer.start_as_current_span(f"manual_walk__{test_name}"):
         time_before_walking_tree = perf_counter()
 
+        # Create descriptive log file name with timestamp
+        timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
+        log_file_path = os.path.expanduser(
+            f"~/upload_benchmark_{test_name}_{timestamp}.log"
+        )
+        with open(log_file_path, "a") as log_file:
+            log_file.write(f"Test: {test_name}\n")
+            start_time = datetime.datetime.now()
+            log_file.write(f"Start time: {start_time.strftime('%Y-%m-%d %H:%M:%S')}\n")
+
         root_project = Project(id=PARENT_PROJECT)
         parents = {path: root_project}
         for directory_path, directory_names, file_names in os.walk(path):
@@ -318,6 +406,14 @@ ) -> None:
                 )
                 parent_container.files.append(new_file)
         asyncio.run(root_project.store_async())
+
+        # Write end time and duration to log file
+        with open(log_file_path, "a") as log_file:
+            end_time = datetime.datetime.now()
+            duration = perf_counter() - time_before_walking_tree
+            log_file.write(f"End time: {end_time.strftime('%Y-%m-%d %H:%M:%S')}\n")
+            log_file.write(f"Duration: {duration:.2f} seconds\n")
+            log_file.write("-" * 50 + "\n")
         print(
             f"\nTime to walk and sync tree - OOP: {perf_counter() - time_before_walking_tree}"
        )
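Both execute_walk_file_sequential and execute_walk_test_oop now carry identical start/end log-file blocks. If the duplication grows, one option is a context manager; a sketch under the assumption that the log format stays exactly as above (benchmark_log is a hypothetical helper, not part of this change):

    import datetime
    import os
    from contextlib import contextmanager
    from time import perf_counter

    @contextmanager
    def benchmark_log(test_name: str):
        # Append a start record, run the benchmark body, then append
        # end time and duration, mirroring the blocks in both walk tests.
        timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
        log_file_path = os.path.expanduser(
            f"~/upload_benchmark_{test_name}_{timestamp}.log"
        )
        start = perf_counter()
        with open(log_file_path, "a") as log_file:
            log_file.write(f"Test: {test_name}\n")
            log_file.write(f"Start time: {datetime.datetime.now():%Y-%m-%d %H:%M:%S}\n")
        try:
            yield
        finally:
            with open(log_file_path, "a") as log_file:
                log_file.write(f"End time: {datetime.datetime.now():%Y-%m-%d %H:%M:%S}\n")
                log_file.write(f"Duration: {perf_counter() - start:.2f} seconds\n")
                log_file.write("-" * 50 + "\n")

Each test body would then run inside "with benchmark_log(test_name):", and the timing bookkeeping would live in one place.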
@@ -361,7 +457,7 @@ def execute_test_suite(
     # Cleanup can be changed to delete_local=True when we want to clear the files out
     # This can be kept as False to allow multiple tests with the same file/folder
     # structure to re-use the files on Disk.
-    cleanup(path=path, delete_synapse=True, delete_s3=True, delete_local=False)
+    cleanup(path=path, delete_synapse=True, delete_s3=False, delete_local=False)
     _, total_files, _ = create_folder_structure(
         path=path,
         depth_of_directory_tree=depth_of_directory_tree,
@@ -369,7 +465,13 @@ def execute_test_suite(
         num_files_per_directory=num_files_per_directory,
         total_size_of_files_mib=total_size_of_files_mib,
     )
-    test_name = f"{total_files}_files_{total_size_of_files_mib}MiB"
+
+    if total_size_of_files_mib >= 1024:
+        test_name = f"{total_files}_files_{total_size_of_files_mib // 1024}GiB"
+    else:
+        test_name = f"{total_files}_files_{total_size_of_files_mib}MiB"
+
+    execute_walk_file_sequential(path, test_name)
 
     # execute_synapseutils_test(path, test_name)
 
@@ -380,25 +482,27 @@
     # execute_sync_to_s3(path, test_name)
 
 
-syn = synapseclient.Synapse(debug=False)
-root_path = os.path.expanduser("~/benchmarking")
+syn = synapseclient.Synapse(debug=True, http_timeout_seconds=600)
+synapseclient.Synapse.enable_open_telemetry()
+root_path = os.path.expanduser("~/benchmarking3")
+
 # Log-in with ~.synapseConfig `authToken`
 syn.login()
 
-print("25 Files - 25MiB")
+# print("25 Files - 25MiB")
 # 25 Files - 25MiB -----------------------------------------------------------------------
-depth = 1
-sub_directories = 1
-files_per_directory = 25
-size_mib = 25
-
-execute_test_suite(
-    path=root_path,
-    depth_of_directory_tree=depth,
-    num_sub_directories=sub_directories,
-    num_files_per_directory=files_per_directory,
-    total_size_of_files_mib=size_mib,
-)
+# depth = 1
+# sub_directories = 1
+# files_per_directory = 25
+# size_mib = 25
+
+# execute_test_suite(
+#     path=root_path,
+#     depth_of_directory_tree=depth,
+#     num_sub_directories=sub_directories,
+#     num_files_per_directory=files_per_directory,
+#     total_size_of_files_mib=size_mib,
+# )
 
 # print("1 Files - 10MiB")
 # ## 1 Files - 10MiB -----------------------------------------------------------------------
@@ -535,3 +639,50 @@
 #     num_files_per_directory=files_per_directory,
 #     total_size_of_files_mib=size_mib,
 # )
+
+# print("4 Files - 400GiB")
+# # 4 Files - 400GiB -----------------------------------------------------------------------
+# depth = 1
+# sub_directories = 1
+# files_per_directory = 4
+# size_mib = 4 * 100 * 1024
+
+# execute_test_suite(
+#     path=root_path,
+#     depth_of_directory_tree=depth,
+#     num_sub_directories=sub_directories,
+#     num_files_per_directory=files_per_directory,
+#     total_size_of_files_mib=size_mib,
+# )
+
+
+# print("45 Files - 4500GiB")
+# # 45 Files - 4500GiB --------------------------------------------------------------------
+# depth = 1
+# sub_directories = 1
+# files_per_directory = 45
+# size_mib = 45 * 100 * 1024
+
+# execute_test_suite(
+#     path=root_path,
+#     depth_of_directory_tree=depth,
+#     num_sub_directories=sub_directories,
+#     num_files_per_directory=files_per_directory,
+#     total_size_of_files_mib=size_mib,
+# )
+
+
+# print("4 Files - 4GiB")
+# # 4 Files - 4GiB ------------------------------------------------------------------------
+# depth = 1
+# sub_directories = 1
+# files_per_directory = 4
+# size_mib = 4 * 1024
+
+# execute_test_suite(
+#     path=root_path,
+#     depth_of_directory_tree=depth,
+#     num_sub_directories=sub_directories,
+#     num_files_per_directory=files_per_directory,
+#     total_size_of_files_mib=size_mib,
+# )
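The commented-out scenarios at the bottom all repeat the same five assignments plus one execute_test_suite call. A hedged sketch of a table-driven alternative (SCENARIOS is hypothetical; the last field is the total size in MiB across all files, matching total_size_of_files_mib):

    # (label, depth, sub_directories, files_per_directory, total size in MiB)
    SCENARIOS = [
        ("25 Files - 25MiB", 1, 1, 25, 25),
        ("4 Files - 400GiB", 1, 1, 4, 4 * 100 * 1024),
        ("45 Files - 4500GiB", 1, 1, 45, 45 * 100 * 1024),
        ("4 Files - 4GiB", 1, 1, 4, 4 * 1024),
    ]

    for label, depth, sub_directories, files_per_directory, size_mib in SCENARIOS:
        print(label)
        execute_test_suite(
            path=root_path,
            depth_of_directory_tree=depth,
            num_sub_directories=sub_directories,
            num_files_per_directory=files_per_directory,
            total_size_of_files_mib=size_mib,
        )

Enabling or disabling a scenario would then be a one-line change to the table rather than commenting a whole block in or out.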