@@ -1097,12 +1097,13 @@ def send_stream_data(
         elif stream_config_file_path is not None:
             stream_config = utils.read_yaml(stream_config_file_path)
 
-        stream_config["label"] = "production"
+        stream_config_to_validate = dict(stream_config)
+        stream_config_to_validate["label"] = "production"
 
         # Validate stream of data
         stream_validator = dataset_validators.get_validator(
             task_type=task_type,
-            dataset_config=stream_config,
+            dataset_config=stream_config_to_validate,
             dataset_config_file_path=stream_config_file_path,
             dataset_df=stream_df,
         )
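This hunk adopts a copy-before-mutate pattern: dict() takes a shallow copy of the user's config, so injecting the "production" label for validation never leaks back into the caller's dict. A minimal self-contained sketch of the pattern, with an invented config:

# Sketch of the copy-before-mutate pattern; the config keys are made up.
stream_config = {"classNames": ["cat", "dog"]}  # hypothetical user config

stream_config_to_validate = dict(stream_config)  # shallow copy
stream_config_to_validate["label"] = "production"

assert "label" not in stream_config              # caller's dict untouched
assert stream_config_to_validate["label"] == "production"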
@@ -1115,13 +1116,9 @@ def send_stream_data(
             ) from None
 
         # Load dataset config and augment with defaults
-        stream_data = DatasetSchema().load(
-            {"task_type": task_type.value, **stream_config}
-        )
+        stream_data = dict(stream_config)
 
         # Add default columns if not present
-        if stream_data.get("columnNames") is None:
-            stream_data["columnNames"] = list(stream_df.columns)
         columns_to_add = {"timestampColumnName", "inferenceIdColumnName"}
         for column in columns_to_add:
             if stream_data.get(column) is None:
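Here the DatasetSchema load is replaced with a plain dict copy and the columnNames fallback is dropped, leaving only the fill-if-missing loop. The hunk ends before the loop body's assignment, so the default value in this sketch is purely illustrative:

# Hedged sketch of the fill-if-missing loop; the assigned default is a
# placeholder, since the hunk ends before the real assignment.
stream_config = {"timestampColumnName": "ts"}  # hypothetical user config
stream_data = dict(stream_config)              # plain copy, no schema load

columns_to_add = {"timestampColumnName", "inferenceIdColumnName"}
for column in columns_to_add:
    if stream_data.get(column) is None:
        stream_data[column] = f"default_{column}"  # illustrative default only

assert stream_data["timestampColumnName"] == "ts"  # user value preserved
assert stream_data["inferenceIdColumnName"] == "default_inferenceIdColumnName"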
@@ -1131,10 +1128,12 @@ def send_stream_data(
 
 
         body = {
-            "datasetConfig": stream_data,
-            "dataset": stream_df.to_dict(orient="records"),
+            "config": stream_data,
+            "rows": stream_df.to_dict(orient="records"),
         }
-
+
+        print("This is the body!")
+        print(body)
         self.api.post_request(
             endpoint=f"inference-pipelines/{inference_pipeline_id}/data-stream",
             body=body,
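The payload keys are renamed from datasetConfig/dataset to config/rows; the rows come from pandas' DataFrame.to_dict(orient="records"), which emits one dict per row. A sketch of the resulting body shape, with invented data and config:

# Sketch of the payload shape after the key rename; sample data is made up.
import pandas as pd

stream_df = pd.DataFrame({"feature": [0.1, 0.2], "prediction": [1, 0]})
stream_data = {"timestampColumnName": "ts"}  # hypothetical augmented config

body = {
    "config": stream_data,
    "rows": stream_df.to_dict(orient="records"),  # one dict per DataFrame row
}
# body["rows"] == [{"feature": 0.1, "prediction": 1},
#                  {"feature": 0.2, "prediction": 0}]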
@@ -1182,11 +1181,6 @@ def publish_batch_data(
                 "Make sure to fix all of the issues listed above before the upload.",
             ) from None
 
-        # Load dataset config and augment with defaults
-        batch_data = DatasetSchema().load(
-            {"task_type": task_type.value, **batch_config}
-        )
-
         # Add default columns if not present
         if batch_data.get("columnNames") is None:
             batch_data["columnNames"] = list(batch_df.columns)
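publish_batch_data keeps the columnNames fallback that infers names directly from the DataFrame when the config omits them. A self-contained sketch, with hypothetical column names:

# Sketch of the columnNames fallback; the sample frame is invented.
import pandas as pd

batch_df = pd.DataFrame({"age": [34], "label": [1]})
batch_data = {}  # hypothetical config with no explicit column names

if batch_data.get("columnNames") is None:
    batch_data["columnNames"] = list(batch_df.columns)

assert batch_data["columnNames"] == ["age", "label"]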