
Commit 1b0ce23

Parthibwhoseoyster authored and committed
OPEN-5555: Added methods for sending stream data
1 parent 7b6c8ac commit 1b0ce23

File tree

3 files changed: +146 -1 lines changed


CHANGELOG.md

Lines changed: 1 addition & 0 deletions
@@ -20,6 +20,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
 * Warnings if the dependencies from the `requirement_txt_file` and current environment are inconsistent.
 * Paths to custom SSL certificates can now be modified by altering `openlayer.api.VERIFY_REQUESTS`. The value can either be True (default), False, or a path to a certificate.
 * Ability to check for goal statuses through the API.
+* New method `send_stream_data` for inference pipelines, used for real-time streaming of small amounts of data.
 
 ### Changed
 
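For a concrete sense of the streaming pattern this entry describes, here is a minimal sketch (assuming an `inference_pipeline` object and a stream `config` were already set up as in the docstring further below; `serve_requests` is a hypothetical stand-in for a model-serving loop, not part of this commit):

    import pandas as pd

    # Stream each small chunk of inferences to Openlayer as it is produced.
    # `serve_requests` is a hypothetical serving loop, shown for illustration.
    for features, predictions in serve_requests():
        chunk = pd.DataFrame(features)
        chunk["prediction"] = predictions
        inference_pipeline.send_stream_data(
            stream_df=chunk,
            stream_config=config,
        )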

openlayer/__init__.py

Lines changed: 70 additions & 1 deletion
@@ -1046,7 +1046,7 @@ def upload_reference_dataset(
             storage_uri_key="referenceDatasetUri",
             method="PUT",
         )
-        print("Referece dataset uploaded!")
+        print("Reference dataset uploaded!")
 
     def upload_reference_dataframe(
         self,
@@ -1073,6 +1073,75 @@ def upload_reference_dataframe(
             dataset_config_file_path=dataset_config_file_path,
             task_type=task_type,
         )
+
+    def send_stream_data(
+        self,
+        inference_pipeline_id: str,
+        task_type: TaskType,
+        stream_df: pd.DataFrame,
+        stream_config: Optional[Dict[str, Any]] = None,
+        stream_config_file_path: Optional[str] = None,
+        verbose: bool = True,
+    ) -> None:
+        """Publishes a stream of production data to the Openlayer platform."""
+        if stream_config is None and stream_config_file_path is None:
+            raise ValueError(
+                "Either `stream_config` or `stream_config_file_path` must be provided."
+            )
+        if stream_config_file_path is not None and not os.path.exists(
+            stream_config_file_path
+        ):
+            raise exceptions.OpenlayerValidationError(
+                f"Stream config file path {stream_config_file_path} does not exist."
+            ) from None
+        elif stream_config_file_path is not None:
+            stream_config = utils.read_yaml(stream_config_file_path)
+
+        stream_config["label"] = "production"
+
+        # Validate the stream of data
+        stream_validator = dataset_validators.get_validator(
+            task_type=task_type,
+            dataset_config=stream_config,
+            dataset_config_file_path=stream_config_file_path,
+            dataset_df=stream_df,
+        )
+        failed_validations = stream_validator.validate()
+
+        if failed_validations:
+            raise exceptions.OpenlayerValidationError(
+                "There are issues with the stream of data and its config. \n"
+                "Make sure to fix all of the issues listed above before the upload.",
+            ) from None
+
+        # Load the dataset config and augment it with defaults
+        stream_data = DatasetSchema().load(
+            {"task_type": task_type.value, **stream_config}
+        )
+
+        # Add default columns if not present
+        if stream_data.get("columnNames") is None:
+            stream_data["columnNames"] = list(stream_df.columns)
+        columns_to_add = {"timestampColumnName", "inferenceIdColumnName"}
+        for column in columns_to_add:
+            if stream_data.get(column) is None:
+                stream_data, stream_df = self._add_default_column(
+                    config=stream_data, df=stream_df, column_name=column
+                )
+
+
+        body = {
+            "datasetConfig": stream_data,
+            "dataset": stream_df.to_dict(orient="records"),
+        }
+
+        self.api.post_request(
+            endpoint=f"inference-pipelines/{inference_pipeline_id}/data-stream",
+            body=body,
+        )
+
+        if verbose:
+            print("Stream published!")
 
     def publish_batch_data(
         self,
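The method above validates the stream, augments the config with defaults, and POSTs the result to the pipeline's `data-stream` endpoint. A rough sketch of the request body it builds, assuming a two-row tabular stream (all values illustrative; the shape follows the `body = {...}` construction and the `to_dict(orient="records")` call above):

    # Illustrative request body; keys mirror what send_stream_data assembles.
    body = {
        "datasetConfig": {
            "label": "production",  # set unconditionally by send_stream_data
            "columnNames": ["feature_a", "prediction"],  # from list(stream_df.columns)
            "timestampColumnName": "timestamp",  # added as a default if absent
            "inferenceIdColumnName": "inference_id",  # added as a default if absent
            # ...plus the task-specific keys from the user's stream config...
        },
        "dataset": [
            {"feature_a": 0.3, "prediction": 1},
            {"feature_a": 0.7, "prediction": 0},
        ],
    }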

openlayer/inference_pipelines.py

Lines changed: 75 additions & 0 deletions
@@ -238,6 +238,81 @@ def upload_reference_dataframe(
             task_type=self.taskType,
             **kwargs,
         )
+
+    def send_stream_data(self, *args, **kwargs):
+        """Publishes a stream of production data to the Openlayer platform.
+
+        Parameters
+        ----------
+        stream_df : pd.DataFrame
+            Dataframe containing the stream of production data.
+        stream_config : Dict[str, Any], optional
+            Dictionary containing the stream configuration. This is not needed if
+            ``stream_config_file_path`` is provided.
+
+            .. admonition:: What's in the config?
+
+                The configuration for a stream of data depends on the :obj:`TaskType`.
+                Refer to the `How to write dataset configs guides <https://docs.openlayer.com/docs/tabular-classification-dataset-config>`_
+                for details. These configurations are the same for development data
+                and streams of production data.
+
+        stream_config_file_path : str
+            Path to the configuration YAML file. This is not needed if
+            ``stream_config`` is provided.
+
+            .. admonition:: What's in the config file?
+
+                The configuration for a stream of data depends on the :obj:`TaskType`.
+                Refer to the `How to write dataset configs guides <https://docs.openlayer.com/docs/tabular-classification-dataset-config>`_
+                for details. These configurations are the same for development data
+                and streams of production data.
+
+        Notes
+        -----
+        Production data usually has a column with the inference timestamps. This
+        column is specified in the ``timestampColumnName`` of the stream config,
+        and it should contain timestamps in the **UNIX format in seconds**.
+
+        Production data also usually has a column with the prediction IDs. This
+        column is specified in the ``inferenceIdColumnName`` of the stream config.
+        It is particularly important when the ground truths are not available at
+        inference time and are updated later.
+
+        If the above are not provided, **Openlayer will generate inference IDs and
+        use the current time as the inference timestamp**.
+
+        Examples
+        --------
+        **Related guide**: `How to set up monitoring <https://docs.openlayer.com/docs/set-up-monitoring>`_.
+
+        First, instantiate the client and retrieve an existing inference pipeline:
+
+        >>> import openlayer
+        >>>
+        >>> client = openlayer.OpenlayerClient('YOUR_API_KEY_HERE')
+        >>>
+        >>> project = client.load_project(name="Churn prediction")
+        >>>
+        >>> inference_pipeline = project.load_inference_pipeline(
+        ...     name="XGBoost model inference pipeline",
+        ... )
+
+        With the ``InferencePipeline`` object retrieved, you can publish a stream
+        of production data -- in this example, stored in a pandas dataframe
+        called ``df`` -- with:
+
+        >>> inference_pipeline.send_stream_data(
+        ...     stream_df=df,
+        ...     stream_config=config,
+        ... )
+        """
+        return self.client.send_stream_data(
+            *args,
+            inference_pipeline_id=self.id,
+            task_type=self.taskType,
+            **kwargs,
+        )
 
     def publish_batch_data(self, *args, **kwargs):
         """Publishes a batch of production data to the Openlayer platform.
