11"""Upload a batch of inferences to the Openlayer platform."""
22
33import time
4+ import logging
45import tempfile
56from typing import Optional
67
1213from ... import Openlayer
1314from ..._utils import maybe_transform
1415from ...types .inference_pipelines import data_stream_params
15- import asyncio
1616
17+ log : logging .Logger = logging .getLogger (__name__ )
1718
18- async def upload_batch_inferences_async (
19+
20+ def upload_batch_inferences (
1921 client : Openlayer ,
2022 inference_pipeline_id : str ,
2123 config : data_stream_params .Config ,
2224 dataset_df : Optional [pd .DataFrame ] = None ,
2325 dataset_path : Optional [str ] = None ,
2426 storage_type : Optional [StorageType ] = None ,
2527 merge : bool = False ,
26- verbose : bool = False ,
2728) -> None :
2829 """Uploads a batch of inferences to the Openlayer platform."""
2930 if dataset_df is None and dataset_path is None :
@@ -45,16 +46,9 @@ async def upload_batch_inferences_async(
4546 # writer
4647 if dataset_df is not None :
4748 temp_file_path = f"{ tmp_dir } /dataset.arrow"
48- if verbose :
49- print ("Converting DataFrame to pyarrow Table..." )
5049 pa_table = pa .Table .from_pandas (dataset_df )
5150 pa_schema = pa_table .schema
5251
53- if verbose :
54- print (
55- "Writing Arrow Table using RecordBatchStreamWriter to "
56- f"{ temp_file_path } "
57- )
5852 with pa .ipc .RecordBatchStreamWriter (temp_file_path , pa_schema ) as writer :
5953 writer .write_table (pa_table , max_chunksize = 16384 )
6054 else :
@@ -64,14 +58,15 @@ async def upload_batch_inferences_async(
6458 # camelCase the config
6559 config = maybe_transform (config , data_stream_params .Config )
6660
67- # Upload tarball to storage
68- if verbose :
69- print ("Uploading dataset to storage via presigned URL..." )
70- uploader .upload (
61+ # Upload file to Openlayer storage
62+ log .info ("Uploading file to Openlayer" )
63+ response = uploader .upload (
7164 file_path = temp_file_path ,
7265 object_name = object_name ,
7366 presigned_url_response = presigned_url_response ,
7467 )
68+ if response .status_code != 200 :
69+ raise ValueError (f"Failed to upload file to storage: { response .text } " )
7570
7671 # Notify the backend
7772 client .post (
@@ -83,30 +78,7 @@ async def upload_batch_inferences_async(
8378 "config" : config ,
8479 },
8580 )
86-
87-
88- def upload_batch_inferences (
89- client : Openlayer ,
90- inference_pipeline_id : str ,
91- config : data_stream_params .Config ,
92- dataset_df : Optional [pd .DataFrame ] = None ,
93- dataset_path : Optional [str ] = None ,
94- storage_type : Optional [StorageType ] = None ,
95- merge : bool = False ,
96- verbose : bool = False ,
97- ) -> None :
98- asyncio .run (
99- upload_batch_inferences_async (
100- client ,
101- inference_pipeline_id ,
102- config ,
103- dataset_df ,
104- dataset_path ,
105- storage_type ,
106- merge ,
107- verbose ,
108- )
109- )
81+ log .info ("Success! Uploaded batch inferences" )
11082
11183
11284def update_batch_inferences (
0 commit comments