199 changes: 175 additions & 24 deletions docs/scripts/uploadBenchmark.py
@@ -19,6 +19,7 @@

import asyncio
import datetime
+import logging
import os
import shutil
import subprocess # nosec
@@ -38,13 +39,17 @@
# from opentelemetry.sdk.resources import SERVICE_NAME, Resource
# from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter

# trace.set_tracer_provider(
# TracerProvider(resource=Resource(attributes={SERVICE_NAME: "upload_benchmarking"}))
# )
# trace.get_tracer_provider().add_span_processor(BatchSpanProcessor(OTLPSpanExporter()))
# os.environ["OTEL_EXPORTER_OTLP_ENDPOINT"] = "https://ingest.us.signoz.cloud"
# os.environ["OTEL_EXPORTER_OTLP_HEADERS"] = "signoz-ingestion-key=<your key>"
# os.environ["OTEL_SERVICE_INSTANCE_ID"] = "local"

trace.set_tracer_provider(
TracerProvider(resource=Resource(attributes={SERVICE_NAME: "upload_benchmarking"}))
)
trace.get_tracer_provider().add_span_processor(BatchSpanProcessor(OTLPSpanExporter()))
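# Spans are exported in batches over OTLP; the endpoint and auth headers can be
# supplied via the OTEL_* environment variables shown (commented out) above.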
tracer = trace.get_tracer("my_tracer")

PARENT_PROJECT = "syn$FILL_ME_IN"
PARENT_PROJECT = "syn70984427"
S3_BUCKET = "s3://$FILL_ME_IN"
S3_PROFILE = "$FILL_ME_IN"

@@ -104,18 +109,21 @@ def create_folder_structure(
print(f"total_size_of_files_bytes: {total_size_of_files_bytes}")
print(f"size_of_each_file_bytes: {size_of_each_file_bytes}")

+chunk_size = MiB # size of each chunk in bytes

def create_files_in_current_dir(path_to_create_files: str) -> None:
for i in range(1, num_files_per_directory + 1):
-chunk_size = MiB # size of each chunk in bytes
num_chunks = size_of_each_file_bytes // chunk_size
filename = os.path.join(path_to_create_files, f"file{i}.txt")
+# when the file size is right, just modify the beginning to refresh the file
if (
os.path.isfile(filename)
and os.path.getsize(filename) == size_of_each_file_bytes
):
with open(filename, "r+b") as f:
f.seek(0)
f.write(os.urandom(chunk_size))
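# Rewriting only the first chunk with fresh random bytes changes the file's
# checksum, so Synapse sees new content without regenerating the whole file.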
+# if the file doesn't exist or the size is wrong, create it from scratch
else:
if os.path.isfile(filename):
os.remove(filename)
@@ -158,7 +166,7 @@ def cleanup(
["aws", "s3", "rm", S3_BUCKET, "--recursive", "--profile", S3_PROFILE]
) # nosec
if delete_synapse:
for child in syn.getChildren(PARENT_PROJECT, includeTypes=["folder"]):
for child in syn.getChildren(PARENT_PROJECT, includeTypes=["folder", "file"]):
> **Contributor Author:** Hmm, I added "file" here because I am uploading all files directly to a project. If this is not desired, I can revert.

syn.delete(child["id"])
syn.cache.purge(after_date=datetime.datetime(2021, 1, 1))

@@ -212,6 +220,7 @@ def execute_synapseutils_test(
manifestFile=manifest_path,
sendMessages=False,
)

print(
f"\nTime to sync to Synapse: {perf_counter() - time_before_syncToSynapse}"
)
@@ -277,6 +286,75 @@ def execute_walk_test(
)


def execute_walk_file_sequential(
path: str,
test_name: str,
) -> None:
"""Execute the test that uses os.walk to sync all files/folders to synapse.

Arguments:
path: The path to the root directory
test_name: The name of the test to add to the span name
"""
with tracer.start_as_current_span(f"manual_walk__{test_name}"):
time_before_walking_tree = perf_counter()

# Create descriptive log file name with timestamp
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
log_file_path = os.path.expanduser(
f"~/upload_benchmark_{test_name}_{timestamp}.log"
)
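# Append mode: the same log file is reopened at the end of the run to
# record the end time and duration.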
with open(log_file_path, "a") as log_file:
log_file.write(f"Test: {test_name}\n")
start_time = datetime.datetime.now()
log_file.write(f"Start time: {start_time.strftime('%Y-%m-%d %H:%M:%S')}\n")

# Create a simple parent lookup
parents = {path: PARENT_PROJECT}

for directory_path, directory_names, file_names in os.walk(path):
# Create folders on Synapse first
for directory_name in directory_names:
folder_path = os.path.join(directory_path, directory_name)
parent_id = parents[directory_path]

new_folder = Folder(name=directory_name, parent_id=parent_id)
# Store each folder immediately and save its Synapse ID
stored_folder = asyncio.run(new_folder.store_async())
parents[folder_path] = stored_folder.id
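# os.walk is top-down, so a directory is always stored (and its Synapse
# ID recorded) before any of its children are visited.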

# Upload files one by one
for filename in file_names:
filepath = os.path.join(directory_path, filename)
parent_id = parents[directory_path]

new_file = File(
path=filepath,
parent_id=parent_id,
annotations={
"annot1": "value1",
"annot2": 1,
"annot3": 1.2,
"annot4": True,
"annot5": "2020-01-01",
},
description="This is a Test File",
)
# Upload this single file immediately
asyncio.run(new_file.store_async())
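# asyncio.run creates a fresh event loop per call, so files are uploaded
# strictly one at a time; this is the sequential baseline to compare
# against the batched OOP test below.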

# Write end time and duration to log file
with open(log_file_path, "a") as log_file:
end_time = datetime.datetime.now()
duration = perf_counter() - time_before_walking_tree
log_file.write(f"End time: {end_time.strftime('%Y-%m-%d %H:%M:%S')}\n")
log_file.write(f"Duration: {duration:.2f} seconds\n")
log_file.write("-" * 50 + "\n")
print(
f"\nTime to walk and sync tree sequentially: {perf_counter() - time_before_walking_tree}"
)


def execute_walk_test_oop(
path: str,
test_name: str,
@@ -290,6 +368,16 @@ def execute_walk_test_oop(
with tracer.start_as_current_span(f"manual_walk__{test_name}"):
time_before_walking_tree = perf_counter()

# Create descriptive log file name with timestamp
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
log_file_path = os.path.expanduser(
f"~/upload_benchmark_{test_name}_{timestamp}.log"
)
with open(log_file_path, "a") as log_file:
log_file.write(f"Test: {test_name}\n")
start_time = datetime.datetime.now()
log_file.write(f"Start time: {start_time.strftime('%Y-%m-%d %H:%M:%S')}\n")

root_project = Project(id=PARENT_PROJECT)
parents = {path: root_project}
for directory_path, directory_names, file_names in os.walk(path):
@@ -318,6 +406,14 @@ def execute_walk_test_oop(
)
parent_container.files.append(new_file)
asyncio.run(root_project.store_async())
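# Unlike the sequential test, the whole tree is built in memory first and
# stored with a single async call, leaving batching of uploads to the client.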

# Write end time and duration to log file
with open(log_file_path, "a") as log_file:
end_time = datetime.datetime.now()
duration = perf_counter() - time_before_walking_tree
log_file.write(f"End time: {end_time.strftime('%Y-%m-%d %H:%M:%S')}\n")
log_file.write(f"Duration: {duration:.2f} seconds\n")
log_file.write("-" * 50 + "\n")
print(
f"\nTime to walk and sync tree - OOP: {perf_counter() - time_before_walking_tree}"
)
@@ -361,15 +457,21 @@ def execute_test_suite(
# Cleanup can be changed to delete_local=True when we want to clear the files out
# This can be kept as False to allow multiple tests with the same file/folder
# structure to re-use the files on Disk.
-cleanup(path=path, delete_synapse=True, delete_s3=True, delete_local=False)
+cleanup(path=path, delete_synapse=True, delete_s3=False, delete_local=False)
_, total_files, _ = create_folder_structure(
path=path,
depth_of_directory_tree=depth_of_directory_tree,
num_sub_directories=num_sub_directories,
num_files_per_directory=num_files_per_directory,
total_size_of_files_mib=total_size_of_files_mib,
)
test_name = f"{total_files}_files_{total_size_of_files_mib}MiB"

if total_size_of_files_mib >= 1024:
test_name = f"{total_files}_files_{total_size_of_files_mib // 1024}GiB"
else:
test_name = f"{total_files}_files_{total_size_of_files_mib}MiB"

execute_walk_file_sequential(path, test_name)

# execute_synapseutils_test(path, test_name)

@@ -380,25 +482,27 @@ def execute_test_suite(
# execute_sync_to_s3(path, test_name)


-syn = synapseclient.Synapse(debug=False)
-root_path = os.path.expanduser("~/benchmarking")
+syn = synapseclient.Synapse(debug=True, http_timeout_seconds=600)
> **Member:** Since we found that a higher default http_timeout_seconds was a bit more stable, is that a case for raising the default to a higher value, @linglp?
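A sketch of the call-site pattern in question (600 is simply the value this benchmark found more stable; `http_timeout_seconds` is the keyword already used in this diff, and whether the library default should change is the open question):

```python
import synapseclient

# Hypothetical opt-in: callers doing large multipart uploads pass a longer
# HTTP timeout explicitly instead of relying on the library default.
syn = synapseclient.Synapse(debug=True, http_timeout_seconds=600)
syn.login()  # authenticates with the authToken from ~/.synapseConfig
```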

+synapseclient.Synapse.enable_open_telemetry()
+root_path = os.path.expanduser("~/benchmarking3")

# Log in with the ~/.synapseConfig `authToken`
syn.login()

print("25 Files - 25MiB")
# print("25 Files - 25MiB")
# 25 Files - 25MiB -----------------------------------------------------------------------
-depth = 1
-sub_directories = 1
-files_per_directory = 25
-size_mib = 25
-
-execute_test_suite(
-    path=root_path,
-    depth_of_directory_tree=depth,
-    num_sub_directories=sub_directories,
-    num_files_per_directory=files_per_directory,
-    total_size_of_files_mib=size_mib,
-)
+# depth = 1
+# sub_directories = 1
+# files_per_directory = 25
+# size_mib = 25
+
+# execute_test_suite(
+#     path=root_path,
+#     depth_of_directory_tree=depth,
+#     num_sub_directories=sub_directories,
+#     num_files_per_directory=files_per_directory,
+#     total_size_of_files_mib=size_mib,
+# )

# print("1 Files - 10MiB")
# ## 1 Files - 10MiB -----------------------------------------------------------------------
@@ -535,3 +639,50 @@ def execute_test_suite(
# num_files_per_directory=files_per_directory,
# total_size_of_files_mib=size_mib,
# )

# print("4 Files - 400GB")
# # 4 Files - 400GB -----------------------------------------------------------------------
# depth = 1
# sub_directories = 1
# files_per_directory = 4
# size_mib = 4 * 100 * 1024

# execute_test_suite(
# path=root_path,
# depth_of_directory_tree=depth,
# num_sub_directories=sub_directories,
# num_files_per_directory=files_per_directory,
# total_size_of_files_mib=size_mib,
# )


# print("45 File - 100GB")
# # 45 File - 100GB -----------------------------------------------------------------------
# depth = 1
# sub_directories = 1
# files_per_directory = 45
# size_mib = 45 * 100 * 1024

# execute_test_suite(
# path=root_path,
# depth_of_directory_tree=depth,
# num_sub_directories=sub_directories,
# num_files_per_directory=files_per_directory,
# total_size_of_files_mib=size_mib,
# )


# print("4 Files - 1MB")
# # 4 Files - 1MB -----------------------------------------------------------------------
# depth = 1
# sub_directories = 1
# files_per_directory = 4
# size_mib = 4 * 1024

# execute_test_suite(
# path=root_path,
# depth_of_directory_tree=depth,
# num_sub_directories=sub_directories,
# num_files_per_directory=files_per_directory,
# total_size_of_files_mib=size_mib,
# )
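
The per-run logs written above have a fixed plain-text shape (`Test:`, `Start time:`, `End time:`, `Duration: N.NN seconds`, then a dashed separator). A minimal sketch for collecting the recorded durations afterwards; the glob pattern and regexes assume exactly that format:

```python
import glob
import os
import re

# Scan the timestamped benchmark logs written by the functions above and
# print the duration recorded in each run.
for log_path in sorted(glob.glob(os.path.expanduser("~/upload_benchmark_*.log"))):
    with open(log_path) as f:
        content = f.read()
    test = re.search(r"^Test: (.+)$", content, re.MULTILINE)
    duration = re.search(r"^Duration: ([\d.]+) seconds$", content, re.MULTILINE)
    if test and duration:
        name = os.path.basename(log_path)
        print(f"{name}: {test.group(1)} took {float(duration.group(1)):.2f}s")
```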