
Commit 28dfb8c

feat: add async batch uploads & improve client-side upload latency
1 parent a9cf640 commit 28dfb8c

File tree

4 files changed: +61 -23 lines

pyproject.toml

Lines changed: 2 additions & 0 deletions

@@ -16,7 +16,9 @@ dependencies = [
     "sniffio",
     "cached-property; python_version < '3.8'",
     "pandas; python_version >= '3.7'",
+    "pyarrow>=11.0.0",
     "pyyaml>=6.0",
+    "requests_toolbelt>=1.0.0",
 ]
 requires-python = ">= 3.7"
 classifiers = [
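
The two new dependencies support the new upload path: pyarrow backs the Arrow IPC writer introduced in batch_inferences.py below, and requests_toolbelt is commonly used for streaming multipart uploads. A minimal sketch of what MultipartEncoder makes possible, assuming the SDK's Uploader uses it (the Uploader internals are not part of this diff; the URL and field names are hypothetical):

import requests
from requests_toolbelt import MultipartEncoder

def stream_file(presigned_url: str, file_path: str) -> requests.Response:
    # MultipartEncoder reads the file lazily, so large datasets are streamed
    # rather than buffered in memory.
    encoder = MultipartEncoder(
        fields={"file": ("dataset.arrow", open(file_path, "rb"))}
    )
    return requests.post(
        presigned_url,
        data=encoder,
        headers={"Content-Type": encoder.content_type},
    )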

src/openlayer/lib/data/__init__.py

Lines changed: 6 additions & 1 deletion

@@ -4,9 +4,14 @@
     "StorageType",
     "upload_reference_dataframe",
     "upload_batch_inferences",
+    "upload_batch_inferences_async",
     "update_batch_inferences",
 ]

 from ._upload import StorageType
-from .batch_inferences import update_batch_inferences, upload_batch_inferences
+from .batch_inferences import (
+    update_batch_inferences,
+    upload_batch_inferences,
+    upload_batch_inferences_async,
+)
 from .reference_dataset import upload_reference_dataframe
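
With the widened import and the new __all__ entry, both entry points are importable from the same namespace:

# Both the blocking and the awaitable entry points now come from openlayer.lib.data.
from openlayer.lib.data import (
    upload_batch_inferences,        # blocking; wraps the coroutine below
    upload_batch_inferences_async,  # coroutine added by this commit
)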

src/openlayer/lib/data/_upload.py

Lines changed: 0 additions & 1 deletion

@@ -5,7 +5,6 @@
 """

 import os
-import shutil
 from enum import Enum
 from typing import Optional

src/openlayer/lib/data/batch_inferences.py

Lines changed: 53 additions & 21 deletions

@@ -1,30 +1,29 @@
 """Upload a batch of inferences to the Openlayer platform."""

-import os
 import time
-import shutil
-import tarfile
 import tempfile
 from typing import Optional

 import httpx
 import pandas as pd
+import pyarrow as pa

 from . import StorageType, _upload
-from .. import utils
 from ... import Openlayer
 from ..._utils import maybe_transform
 from ...types.inference_pipelines import data_stream_params
+import asyncio


-def upload_batch_inferences(
+async def upload_batch_inferences_async(
     client: Openlayer,
     inference_pipeline_id: str,
     config: data_stream_params.Config,
     dataset_df: Optional[pd.DataFrame] = None,
     dataset_path: Optional[str] = None,
     storage_type: Optional[StorageType] = None,
     merge: bool = False,
+    verbose: bool = False,
 ) -> None:
     """Uploads a batch of inferences to the Openlayer platform."""
     if dataset_df is None and dataset_path is None:
@@ -33,7 +32,7 @@ def upload_batch_inferences(
         raise ValueError("Only one of dataset_df or dataset_path should be provided.")

     uploader = _upload.Uploader(client, storage_type)
-    object_name = f"batch_data_{time.time()}_{inference_pipeline_id}.tar.gz"
+    object_name = f"batch_data_{time.time()}_{inference_pipeline_id}.arrow"

     # Fetch presigned url
     presigned_url_response = client.storage.presigned_url.create(
@@ -42,26 +41,34 @@ def upload_batch_inferences(

     # Write dataset and config to temp directory
     with tempfile.TemporaryDirectory() as tmp_dir:
-        temp_file_path = f"{tmp_dir}/dataset.csv"
+        # If a DataFrame is provided, convert it to an Arrow Table and write it
+        # using the IPC writer
         if dataset_df is not None:
-            dataset_df.to_csv(temp_file_path, index=False)
-        else:
-            shutil.copy(dataset_path, temp_file_path)
+            temp_file_path = f"{tmp_dir}/dataset.arrow"
+            if verbose:
+                print("Converting DataFrame to pyarrow Table...")
+            pa_table = pa.Table.from_pandas(dataset_df)
+            pa_schema = pa_table.schema

-        # Copy relevant files to tmp dir
-        config["label"] = "production"
-        utils.write_yaml(
-            maybe_transform(config, data_stream_params.Config),
-            f"{tmp_dir}/dataset_config.yaml",
-        )
+            if verbose:
+                print(
+                    "Writing Arrow Table using RecordBatchStreamWriter to "
+                    f"{temp_file_path}"
+                )
+            with pa.ipc.RecordBatchStreamWriter(temp_file_path, pa_schema) as writer:
+                writer.write_table(pa_table, max_chunksize=16384)
+        else:
+            object_name = f"batch_data_{time.time()}_{inference_pipeline_id}.csv"
+            temp_file_path = dataset_path

-        tar_file_path = os.path.join(tmp_dir, object_name)
-        with tarfile.open(tar_file_path, mode="w:gz") as tar:
-            tar.add(tmp_dir, arcname=os.path.basename("monitoring_data"))
+        # camelCase the config
+        config = maybe_transform(config, data_stream_params.Config)

-        # Upload to storage
+        # Upload dataset to storage via presigned URL
+        if verbose:
+            print("Uploading dataset to storage via presigned URL...")
         uploader.upload(
-            file_path=tar_file_path,
+            file_path=temp_file_path,
             object_name=object_name,
             presigned_url_response=presigned_url_response,
         )
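
This hunk replaces the CSV-plus-tarball pipeline with a single Arrow IPC stream file, which is presumably the source of the latency win in the commit message: no CSV serialization and no gzip step. A self-contained sketch of the same round trip (sample data and file name are made up):

import pandas as pd
import pyarrow as pa

df = pd.DataFrame({"output": [0.1, 0.9], "label": [0, 1]})
table = pa.Table.from_pandas(df)

# Write the table as a stream of record batches, chunked as in the diff above.
with pa.ipc.RecordBatchStreamWriter("dataset.arrow", table.schema) as writer:
    writer.write_table(table, max_chunksize=16384)

# Read it back to verify the round trip.
with pa.OSFile("dataset.arrow", "rb") as source:
    restored = pa.ipc.open_stream(source).read_all().to_pandas()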
@@ -73,10 +80,35 @@ def upload_batch_inferences(
         body={
             "storageUri": presigned_url_response.storage_uri,
             "performDataMerge": merge,
+            "config": config,
         },
     )


+def upload_batch_inferences(
+    client: Openlayer,
+    inference_pipeline_id: str,
+    config: data_stream_params.Config,
+    dataset_df: Optional[pd.DataFrame] = None,
+    dataset_path: Optional[str] = None,
+    storage_type: Optional[StorageType] = None,
+    merge: bool = False,
+    verbose: bool = False,
+) -> None:
+    asyncio.run(
+        upload_batch_inferences_async(
+            client,
+            inference_pipeline_id,
+            config,
+            dataset_df,
+            dataset_path,
+            storage_type,
+            merge,
+            verbose,
+        )
+    )
+
+
 def update_batch_inferences(
     client: Openlayer,
     inference_pipeline_id: str,
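
End to end, a caller would use the new API roughly as follows. The pipeline ID and config keys are placeholders (data_stream_params.Config defines the real schema), and the client is assumed to pick up its API key from the environment:

import pandas as pd
from openlayer import Openlayer
from openlayer.lib.data import upload_batch_inferences

client = Openlayer()  # assumed to read the API key from the environment
df = pd.DataFrame({"input": ["a", "b"], "output": [0.2, 0.8]})

upload_batch_inferences(
    client=client,
    inference_pipeline_id="YOUR_PIPELINE_ID",  # placeholder
    config={"output_column_name": "output"},   # illustrative keys only
    dataset_df=df,
    verbose=True,
)

Inside an already-running event loop, await upload_batch_inferences_async(...) directly instead of the wrapper, since asyncio.run cannot be nested.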
