Skip to content

Commit 57db4ec

Browse files
gustavocidornelas authored and whoseoyster committed
Completes OPEN-5872 Improve data streaming logic
1 parent 80d28ed commit 57db4ec

File tree

1 file changed

+67
-61
lines changed

1 file changed

+67
-61
lines changed

openlayer/services/data_streamer.py

Lines changed: 67 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@ def __init__(
2727
openlayer_project_name: Optional[str] = None,
2828
openlayer_inference_pipeline_name: Optional[str] = None,
2929
openlayer_inference_pipeline_id: Optional[str] = None,
30-
publish: bool = False,
3130
) -> None:
3231
self._openlayer_api_key = openlayer_api_key or utils.get_env_variable(
3332
"OPENLAYER_API_KEY"
@@ -44,7 +43,6 @@ def __init__(
4443
openlayer_inference_pipeline_id
4544
or utils.get_env_variable("OPENLAYER_INFERENCE_PIPELINE_ID")
4645
)
47-
self.publish = publish
4846

4947
# Lazy load the inference pipeline
5048
self.inference_pipeline = None
@@ -92,40 +90,39 @@ def _get_openlayer_attribute(
9290

9391
def _validate_attributes(self) -> None:
9492
"""Granular validation of the arguments."""
95-
if self.publish:
96-
if not self.openlayer_api_key:
97-
raise ValueError(
98-
"An Openlayer API key is required for publishing."
99-
" Please set it as environment variable named OPENLAYER_API_KEY."
100-
)
93+
if not self.openlayer_api_key:
94+
logger.error(
95+
"An Openlayer API key is required for publishing."
96+
" Please set it as environment variable named OPENLAYER_API_KEY."
97+
)
10198

102-
if (
103-
not self.openlayer_project_name
104-
and not self.openlayer_inference_pipeline_name
105-
and not self.openlayer_inference_pipeline_id
106-
):
107-
raise ValueError(
108-
"You must provide more information about the project and"
109-
" inference pipeline on Openlayer to publish data."
110-
" Please provide either: "
111-
" - the project name and inference pipeline name, or"
112-
" - the inference pipeline id."
113-
" You can set them as environment variables named"
114-
" OPENLAYER_PROJECT_NAME, OPENLAYER_INFERENCE_PIPELINE_NAME, "
115-
"and OPENLAYER_INFERENCE_PIPELINE_ID."
116-
)
99+
if (
100+
not self.openlayer_project_name
101+
and not self.openlayer_inference_pipeline_name
102+
and not self.openlayer_inference_pipeline_id
103+
):
104+
logger.error(
105+
"You must provide more information about the project and"
106+
" inference pipeline on Openlayer to publish data."
107+
" Please provide either: "
108+
" - the project name and inference pipeline name, or"
109+
" - the inference pipeline id."
110+
" You can set them as environment variables named"
111+
" OPENLAYER_PROJECT_NAME, OPENLAYER_INFERENCE_PIPELINE_NAME, "
112+
"and OPENLAYER_INFERENCE_PIPELINE_ID."
113+
)
117114

118-
if (
119-
self.openlayer_inference_pipeline_name
120-
and not self.openlayer_project_name
121-
and not self.openlayer_inference_pipeline_id
122-
):
123-
raise ValueError(
124-
"You must provide the Openlayer project name where the inference"
125-
" pipeline is located."
126-
" Please set it as the environment variable"
127-
" OPENLAYER_PROJECT_NAME."
128-
)
115+
if (
116+
self.openlayer_inference_pipeline_name
117+
and not self.openlayer_project_name
118+
and not self.openlayer_inference_pipeline_id
119+
):
120+
logger.error(
121+
"You must provide the Openlayer project name where the inference"
122+
" pipeline is located."
123+
" Please set it as the environment variable"
124+
" OPENLAYER_PROJECT_NAME."
125+
)
129126

130127
def stream_data(self, data: Dict[str, any], config: Dict[str, any]) -> None:
131128
"""Stream data to the Openlayer platform.
@@ -145,7 +142,7 @@ def _check_inference_pipeline_ready(self) -> None:
145142
if self.inference_pipeline is None:
146143
self._load_inference_pipeline()
147144
if self.inference_pipeline is None:
148-
raise ValueError(
145+
logger.error(
149146
"No inference pipeline found. Please provide the inference pipeline"
150147
" id or name."
151148
)
@@ -155,37 +152,46 @@ def _load_inference_pipeline(self) -> None:
155152
156153
If no platform/project information is provided, it is set to None.
157154
"""
158-
inference_pipeline = None
159-
client = openlayer.OpenlayerClient(
160-
api_key=self.openlayer_api_key, verbose=False
161-
)
162155

163-
# Prioritize the inference pipeline id over the name
164-
if self.openlayer_inference_pipeline_id:
165-
inference_pipeline = inference_pipelines.InferencePipeline(
166-
client=client,
167-
upload=None,
168-
json={"id": self.openlayer_inference_pipeline_id, "projectId": None},
169-
task_type=tasks.TaskType.LLM,
156+
inference_pipeline = None
157+
try:
158+
client = openlayer.OpenlayerClient(
159+
api_key=self.openlayer_api_key, verbose=False
170160
)
171-
elif self.openlayer_inference_pipeline_name:
172-
with utils.HidePrints():
173-
project = client.create_project(
174-
name=self.openlayer_project_name, task_type=tasks.TaskType.LLM
161+
162+
# Prioritize the inference pipeline id over the name
163+
if self.openlayer_inference_pipeline_id:
164+
inference_pipeline = inference_pipelines.InferencePipeline(
165+
client=client,
166+
upload=None,
167+
json={
168+
"id": self.openlayer_inference_pipeline_id,
169+
"projectId": None,
170+
},
171+
task_type=tasks.TaskType.LLM,
175172
)
176-
inference_pipeline = project.create_inference_pipeline(
177-
name=self.openlayer_inference_pipeline_name
173+
elif self.openlayer_inference_pipeline_name:
174+
with utils.HidePrints():
175+
project = client.create_project(
176+
name=self.openlayer_project_name, task_type=tasks.TaskType.LLM
177+
)
178+
inference_pipeline = project.create_inference_pipeline(
179+
name=self.openlayer_inference_pipeline_name
180+
)
181+
if inference_pipeline:
182+
logger.info(
183+
"Going to try to stream data to the inference pipeline with id %s.",
184+
inference_pipeline.id,
178185
)
179-
if inference_pipeline:
180-
logger.info(
181-
"Going to try to stream data to the inference pipeline with id %s.",
182-
inference_pipeline.id,
183-
)
184-
else:
185-
logger.warning(
186-
"No inference pipeline found. Data will not be streamed to Openlayer."
186+
else:
187+
logger.warning(
188+
"No inference pipeline found. Data will not be streamed to Openlayer."
189+
)
190+
self.inference_pipeline = inference_pipeline
191+
except Exception as exc:
192+
logger.error(
193+
"An error occurred while trying to load the inference pipeline: %s", exc
187194
)
188-
self.inference_pipeline = inference_pipeline
189195

190196
def publish_batch_data(self, df: pd.DataFrame, config: Dict[str, any]) -> None:
191197
"""Publish a batch of data to the Openlayer platform.

0 commit comments

Comments (0)