199 changes: 175 additions & 24 deletions docs/scripts/uploadBenchmark.py
@@ -19,6 +19,7 @@

import asyncio
import datetime
import logging
import os
import shutil
import subprocess # nosec
@@ -38,13 +39,17 @@
# from opentelemetry.sdk.resources import SERVICE_NAME, Resource
# from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter

# trace.set_tracer_provider(
# TracerProvider(resource=Resource(attributes={SERVICE_NAME: "upload_benchmarking"}))
# )
# trace.get_tracer_provider().add_span_processor(BatchSpanProcessor(OTLPSpanExporter()))
# os.environ["OTEL_EXPORTER_OTLP_ENDPOINT"] = "https://ingest.us.signoz.cloud"
# os.environ["OTEL_EXPORTER_OTLP_HEADERS"] = "signoz-ingestion-key=<your key>"
# os.environ["OTEL_SERVICE_INSTANCE_ID"] = "local"

trace.set_tracer_provider(
TracerProvider(resource=Resource(attributes={SERVICE_NAME: "upload_benchmarking"}))
)
trace.get_tracer_provider().add_span_processor(BatchSpanProcessor(OTLPSpanExporter()))
tracer = trace.get_tracer("my_tracer")

PARENT_PROJECT = "syn$FILL_ME_IN"
PARENT_PROJECT = "syn70984427"
S3_BUCKET = "s3://$FILL_ME_IN"
S3_PROFILE = "$FILL_ME_IN"

@@ -104,18 +109,21 @@ def create_folder_structure(
print(f"total_size_of_files_bytes: {total_size_of_files_bytes}")
print(f"size_of_each_file_bytes: {size_of_each_file_bytes}")

chunk_size = MiB # size of each chunk in bytes

def create_files_in_current_dir(path_to_create_files: str) -> None:
for i in range(1, num_files_per_directory + 1):
chunk_size = MiB # size of each chunk in bytes
num_chunks = size_of_each_file_bytes // chunk_size
filename = os.path.join(path_to_create_files, f"file{i}.txt")
# when the file size is right, just modify the beginning to refresh the file
if (
os.path.isfile(filename)
and os.path.getsize(filename) == size_of_each_file_bytes
):
with open(filename, "r+b") as f:
f.seek(0)
f.write(os.urandom(chunk_size))
# if the file doesn't exist or the size is wrong, create it from scratch
else:
if os.path.isfile(filename):
os.remove(filename)
@@ -158,7 +166,7 @@ def cleanup(
["aws", "s3", "rm", S3_BUCKET, "--recursive", "--profile", S3_PROFILE]
) # nosec
if delete_synapse:
for child in syn.getChildren(PARENT_PROJECT, includeTypes=["folder"]):
for child in syn.getChildren(PARENT_PROJECT, includeTypes=["folder", "file"]):

Contributor Author:
Hmm, I added "file" here because I am uploading all files directly to a project. If this is not desired, I could revert.
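
For reference, a minimal sketch of that revert (restoring the folder-only cleanup), assuming the same syn client and PARENT_PROJECT constant used throughout this script:

    # Hypothetical revert: delete only folder children,
    # leaving project-level files in place.
    for child in syn.getChildren(PARENT_PROJECT, includeTypes=["folder"]):
        syn.delete(child["id"])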

syn.delete(child["id"])
syn.cache.purge(after_date=datetime.datetime(2021, 1, 1))

@@ -212,6 +220,7 @@ def execute_synapseutils_test(
manifestFile=manifest_path,
sendMessages=False,
)

print(
f"\nTime to sync to Synapse: {perf_counter() - time_before_syncToSynapse}"
)
@@ -277,6 +286,75 @@ def execute_walk_test(
)


def execute_walk_file_sequential(

Contributor Author:
@BryanFauble I modified the execute_walk_test_oop function, and I am going to test uploading files sequentially. Do you mind taking a look? I already tested it with small files to ensure that this function works and files can be uploaded.


Member:
Yeah this looks good to me!
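
A minimal sketch of how the new helper can be invoked on its own (a hypothetical standalone call; in this PR it is wired in via execute_test_suite further down), assuming the folder tree already exists on disk:

    # Hypothetical standalone run of the sequential-walk benchmark.
    execute_walk_file_sequential(
        path=os.path.expanduser("~/benchmarking3"),
        test_name="25_files_25MiB",
    )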

path: str,
test_name: str,
) -> None:
"""Execute the test that uses os.walk to sync all files/folders to synapse.

Arguments:
path: The path to the root directory
test_name: The name of the test to add to the span name
"""
with tracer.start_as_current_span(f"manual_walk__{test_name}"):
time_before_walking_tree = perf_counter()

# Create descriptive log file name with timestamp
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
log_file_path = os.path.expanduser(
f"~/upload_benchmark_{test_name}_{timestamp}.log"
)
with open(log_file_path, "a") as log_file:
log_file.write(f"Test: {test_name}\n")
start_time = datetime.datetime.now()
log_file.write(f"Start time: {start_time.strftime('%Y-%m-%d %H:%M:%S')}\n")

# Create a simple parent lookup
parents = {path: PARENT_PROJECT}

for directory_path, directory_names, file_names in os.walk(path):
# Create folders on Synapse first
for directory_name in directory_names:
folder_path = os.path.join(directory_path, directory_name)
parent_id = parents[directory_path]

new_folder = Folder(name=directory_name, parent_id=parent_id)
# Store each folder immediately and save its Synapse ID
stored_folder = asyncio.run(new_folder.store_async())
parents[folder_path] = stored_folder.id

# Upload files one by one
for filename in file_names:
filepath = os.path.join(directory_path, filename)
parent_id = parents[directory_path]

new_file = File(
path=filepath,
parent_id=parent_id,
annotations={
"annot1": "value1",
"annot2": 1,
"annot3": 1.2,
"annot4": True,
"annot5": "2020-01-01",
},
description="This is a Test File",
)
# Upload this single file immediately
asyncio.run(new_file.store_async())

# Write end time and duration to log file
with open(log_file_path, "a") as log_file:
end_time = datetime.datetime.now()
duration = perf_counter() - time_before_walking_tree
log_file.write(f"End time: {end_time.strftime('%Y-%m-%d %H:%M:%S')}\n")
log_file.write(f"Duration: {duration:.2f} seconds\n")
log_file.write("-" * 50 + "\n")
print(
f"\nTime to walk and sync tree sequentially: {perf_counter() - time_before_walking_tree}"
)


def execute_walk_test_oop(
path: str,
test_name: str,
@@ -290,6 +368,16 @@ def execute_walk_test_oop(
with tracer.start_as_current_span(f"manual_walk__{test_name}"):
time_before_walking_tree = perf_counter()

# Create descriptive log file name with timestamp
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
log_file_path = os.path.expanduser(
f"~/upload_benchmark_{test_name}_{timestamp}.log"
)
with open(log_file_path, "a") as log_file:
log_file.write(f"Test: {test_name}\n")
start_time = datetime.datetime.now()
log_file.write(f"Start time: {start_time.strftime('%Y-%m-%d %H:%M:%S')}\n")

root_project = Project(id=PARENT_PROJECT)
parents = {path: root_project}
for directory_path, directory_names, file_names in os.walk(path):
@@ -318,6 +406,14 @@
)
parent_container.files.append(new_file)
asyncio.run(root_project.store_async())

# Write end time and duration to log file
with open(log_file_path, "a") as log_file:
end_time = datetime.datetime.now()
duration = perf_counter() - time_before_walking_tree
log_file.write(f"End time: {end_time.strftime('%Y-%m-%d %H:%M:%S')}\n")
log_file.write(f"Duration: {duration:.2f} seconds\n")
log_file.write("-" * 50 + "\n")
print(
f"\nTime to walk and sync tree - OOP: {perf_counter() - time_before_walking_tree}"
)
@@ -361,15 +457,21 @@ def execute_test_suite(
# Cleanup can be changed to delete_local=True when we want to clear the files out.
# This can be kept as False to allow multiple tests with the same file/folder
# structure to reuse the files on disk.
cleanup(path=path, delete_synapse=True, delete_s3=True, delete_local=False)
cleanup(path=path, delete_synapse=True, delete_s3=False, delete_local=False)
_, total_files, _ = create_folder_structure(
path=path,
depth_of_directory_tree=depth_of_directory_tree,
num_sub_directories=num_sub_directories,
num_files_per_directory=num_files_per_directory,
total_size_of_files_mib=total_size_of_files_mib,
)
test_name = f"{total_files}_files_{total_size_of_files_mib}MiB"

if total_size_of_files_mib >= 1024:
test_name = f"{total_files}_files_{total_size_of_files_mib // 1024}GiB"
else:
test_name = f"{total_files}_files_{total_size_of_files_mib}MiB"

execute_walk_file_sequential(path, test_name)

# execute_synapseutils_test(path, test_name)

@@ -380,25 +482,27 @@ def execute_test_suite(
# execute_sync_to_s3(path, test_name)


syn = synapseclient.Synapse(debug=False)
root_path = os.path.expanduser("~/benchmarking")
syn = synapseclient.Synapse(debug=True, http_timeout_seconds=600)
synapseclient.Synapse.enable_open_telemetry()
root_path = os.path.expanduser("~/benchmarking3")

# Log-in with ~.synapseConfig `authToken`
syn.login()

print("25 Files - 25MiB")
# print("25 Files - 25MiB")
# 25 Files - 25MiB -----------------------------------------------------------------------
depth = 1
sub_directories = 1
files_per_directory = 25
size_mib = 25

execute_test_suite(
path=root_path,
depth_of_directory_tree=depth,
num_sub_directories=sub_directories,
num_files_per_directory=files_per_directory,
total_size_of_files_mib=size_mib,
)
# depth = 1
# sub_directories = 1
# files_per_directory = 25
# size_mib = 25

# execute_test_suite(
# path=root_path,
# depth_of_directory_tree=depth,
# num_sub_directories=sub_directories,
# num_files_per_directory=files_per_directory,
# total_size_of_files_mib=size_mib,
# )

# print("1 Files - 10MiB")
# ## 1 Files - 10MiB -----------------------------------------------------------------------
@@ -535,3 +639,50 @@ def execute_test_suite(
# num_files_per_directory=files_per_directory,
# total_size_of_files_mib=size_mib,
# )

# print("4 Files - 400GB")
# # 4 Files - 400GB -----------------------------------------------------------------------
# depth = 1
# sub_directories = 1
# files_per_directory = 4
# size_mib = 4 * 100 * 1024

# execute_test_suite(
# path=root_path,
# depth_of_directory_tree=depth,
# num_sub_directories=sub_directories,
# num_files_per_directory=files_per_directory,
# total_size_of_files_mib=size_mib,
# )


# print("45 File - 100GB")
# # 45 File - 100GB -----------------------------------------------------------------------
# depth = 1
# sub_directories = 1
# files_per_directory = 45
# size_mib = 45 * 100 * 1024

# execute_test_suite(
# path=root_path,
# depth_of_directory_tree=depth,
# num_sub_directories=sub_directories,
# num_files_per_directory=files_per_directory,
# total_size_of_files_mib=size_mib,
# )


# print("4 Files - 1MB")
# # 4 Files - 1MB -----------------------------------------------------------------------
# depth = 1
# sub_directories = 1
# files_per_directory = 4
# size_mib = 4 * 1024

# execute_test_suite(
# path=root_path,
# depth_of_directory_tree=depth,
# num_sub_directories=sub_directories,
# num_files_per_directory=files_per_directory,
# total_size_of_files_mib=size_mib,
# )