Skip to content

Commit cc3fb53

Browse files
committed
refactored code for streaming changes
1 parent 3873b99 commit cc3fb53

File tree

2 files changed

+5
-48
lines changed

2 files changed

+5
-48
lines changed

ads/feature_store/common/enums.py

Lines changed: 0 additions & 18 deletions
Original file line number | Diff line number | Diff line change
@@ -79,24 +79,6 @@ class StreamIngestionMode(Enum):
7979
APPEND = "APPEND"
8080
UPDATE = "UPDATE"
8181

82-
class StreamingIngestionMode(Enum):
83-
"""
84-
An enumeration that represents the supported Ingestion Mode in feature store.
85-
86-
Attributes:
87-
OVERWRITE (str): Ingestion mode to overwrite the data in the system.
88-
APPEND (str): Ingestion mode to append the data in the system.
89-
UPSERT (str): Ingestion mode to insert and update the data in the system.
90-
91-
Methods:
92-
None
93-
"""
94-
95-
APPEND = "APPEND"
96-
DEFAULT = "DEFAULT"
97-
UPSERT = "UPSERT"
98-
99-
10082
class JoinType(Enum):
10183
"""Enumeration of supported SQL join types.
10284

ads/feature_store/execution_strategy/spark/spark_execution.py

Lines changed: 5 additions & 30 deletions
Original file line number | Diff line number | Diff line change
@@ -210,26 +210,6 @@ def _validate_expectation(expectation_type, validation_output: dict):
210210
if error_message:
211211
raise Exception(error_message)
212212

213-
@classmethod
214-
def is_streaming_dataframe(cls, data_frame):
215-
"""
216-
Check if the provided DataFrame is a Spark Streaming DataFrame.
217-
218-
Args:
219-
data_frame (DataFrame): The DataFrame to check.
220-
221-
Returns:
222-
bool: True if it's a Spark Streaming DataFrame, False otherwise.
223-
"""
224-
if isinstance(data_frame, pd.DataFrame):
225-
return False
226-
elif isinstance(data_frame, DataFrame):
227-
return data_frame.isStreaming
228-
else:
229-
raise ValueError(
230-
"Invalid DataFrame type. Expected Pandas or Spark DataFrame."
231-
)
232-
233213
def _save_offline_dataframe(
234214
self, data_frame, feature_group, feature_group_job: FeatureGroupJob
235215
):
@@ -332,17 +312,12 @@ def _save_offline_dataframe(
332312
)
333313

334314
logger.info(f"output features for the FeatureGroup: {output_features}")
335-
# Compute Feature Statistics
336315

337-
if self.is_streaming_dataframe(data_frame):
338-
logger.warning(
339-
"Stats skipped: Streaming DataFrames are not supported for statistics."
340-
)
341-
else:
342-
feature_statistics = StatisticsService.compute_stats_with_mlm(
343-
statistics_config=feature_group.oci_feature_group.statistics_config,
344-
input_df=featured_data,
345-
)
316+
# Compute Feature Statistics
317+
feature_statistics = StatisticsService.compute_stats_with_mlm(
318+
statistics_config=feature_group.oci_feature_group.statistics_config,
319+
input_df=featured_data,
320+
)
346321

347322
except Exception as ex:
348323
error_details = str(ex)

0 commit comments

Comments
 (0)