
Commit 3873b99

code changes for streaming dataframe
1 parent a14b4e3 commit 3873b99

File tree

6 files changed: +343 −24 lines changed


ads/feature_store/common/enums.py

Lines changed: 29 additions & 0 deletions
@@ -67,6 +67,35 @@ class IngestionMode(Enum):
     DEFAULT = "DEFAULT"
     UPSERT = "UPSERT"
 
+class StreamIngestionMode(Enum):
+    """
+    Enumeration for stream ingestion modes.
+
+    - `COMPLETE`: Represents complete stream ingestion where the entire dataset is replaced.
+    - `APPEND`: Represents appending new data to the existing dataset.
+    - `UPDATE`: Represents updating existing data in the dataset.
+    """
+    COMPLETE = "COMPLETE"
+    APPEND = "APPEND"
+    UPDATE = "UPDATE"
+
+class StreamingIngestionMode(Enum):
+    """
+    An enumeration that represents the supported Ingestion Mode in feature store.
+
+    Attributes:
+        OVERWRITE (str): Ingestion mode to overwrite the data in the system.
+        APPEND (str): Ingestion mode to append the data in the system.
+        UPSERT (str): Ingestion mode to insert and update the data in the system.
+
+    Methods:
+        None
+    """
+
+    APPEND = "APPEND"
+    DEFAULT = "DEFAULT"
+    UPSERT = "UPSERT"
+
 
 class JoinType(Enum):
     """Enumeration of supported SQL join types.

ads/feature_store/execution_strategy/delta_lake/delta_lake_service.py

Lines changed: 42 additions & 0 deletions
@@ -57,6 +57,18 @@ def write_dataframe_to_delta_lake(
             None.
         """
         logger.info(f"target table name {target_table_name}")
+        # query = (
+        #     dataflow_output.writeStream.outputMode("append")
+        #     .format("delta")
+        #     .option(
+        #         "checkpointLocation",
+        #         "/Users/yogeshkumawat/Desktop/Github-Oracle/accelerated-data-science/TestYogi/streaming",
+        #     )
+        #     .toTable(target_table_name)
+        # )
+        #
+        # query.awaitTermination()
+
         if (
             self.spark_engine.is_delta_table_exists(target_table_name)
             and ingestion_mode.upper() == IngestionMode.UPSERT.value
@@ -341,3 +353,33 @@ def __get_insert_update_query_expression(feature_data_source_columns, table_name
 
         logger.info(f"get_insert_update_query_expression {feature_data_update_set}")
         return feature_data_update_set
+
+    def write_stream_dataframe_to_delta_lake(
+        self,
+        stream_dataframe,
+        target_table,
+        output_mode,
+        query_name,
+        await_termination,
+        timeout,
+        checkpoint_dir,
+        feature_option_details,
+    ):
+        query = (
+            stream_dataframe
+            .writeStream
+            .outputMode(output_mode)
+            .format("delta")
+            .option(
+                "checkpointLocation",
+                checkpoint_dir,
+            )
+            .options(**self.get_delta_write_config(feature_option_details))
+            .queryName(query_name)
+            .toTable(target_table)
+        )
+
+        if await_termination:
+            query.awaitTermination(timeout)
+
+        return query
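
For orientation, a self-contained sketch of the writeStream pattern that the new write_stream_dataframe_to_delta_lake method wraps. The rate source, checkpoint path, and target table name are illustrative assumptions rather than values from the commit, and a Spark session with Delta Lake support on the classpath is assumed:

    from pyspark.sql import SparkSession

    # Assumed local session with Delta Lake already configured.
    spark = SparkSession.builder.appName("stream-demo").getOrCreate()

    # A toy streaming source; a real feature group would supply its own dataframe.
    stream_df = spark.readStream.format("rate").option("rowsPerSecond", 5).load()

    query = (
        stream_df.writeStream
        .outputMode("append")  # matches StreamingIngestionMode.APPEND
        .format("delta")
        .option("checkpointLocation", "/tmp/checkpoints/stream_demo")  # illustrative path
        .queryName("stream_demo")
        .toTable("feature_store_db.stream_demo")  # illustrative target table
    )

    query.awaitTermination(60)  # wait up to 60 seconds, as the wrapper does with `timeout`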

ads/feature_store/execution_strategy/engine/spark_engine.py

Lines changed: 19 additions & 10 deletions
@@ -8,6 +8,7 @@
 from datetime import datetime
 
 from ads.common.decorator.runtime_dependency import OptionalDependency
+from ads.feature_store.common.utils.utility import get_schema_from_spark_dataframe, get_schema_from_spark_df
 
 try:
     from pyspark.sql import SparkSession
@@ -42,10 +43,10 @@ def __init__(self, metastore_id: str = None, spark_session: SparkSession = None)
         )
 
     def get_time_version_data(
-        self,
-        delta_table_name: str,
-        version_number: int = None,
-        timestamp: datetime = None,
+            self,
+            delta_table_name: str,
+            version_number: int = None,
+            timestamp: datetime = None,
     ):
         split_db_name = delta_table_name.split(".")
 
@@ -103,10 +104,10 @@ def _read_delta_table(self, delta_table_path: str, read_options: Dict):
         return df
 
     def sql(
-        self,
-        query: str,
-        dataframe_type: DataFrameType = DataFrameType.SPARK,
-        is_online: bool = False,
+            self,
+            query: str,
+            dataframe_type: DataFrameType = DataFrameType.SPARK,
+            is_online: bool = False,
     ):
         """Execute SQL command on the offline or online feature store database
 
@@ -186,19 +187,27 @@ def get_tables_from_database(self, database):
 
         return permanent_tables
 
-    def get_columns_from_table(self, table_name: str):
+    def get_output_columns_from_table_or_dataframe(self, table_name: str = None, dataframe=None):
         """Returns the column(features) along with type from the given table.
 
         Args:
             table_name(str): A string specifying the name of table name for which columns should be returned.
+            dataframe: Dataframe containing the transformed dataframe.
 
         Returns:
             List[{"name": "<feature_name>","featureType": "<feature_type>"}]
             Returns the List of dictionary of column with name and type from the given table.
+
         """
+        if table_name is None and dataframe is None:
+            raise ValueError("Either 'table_name' or 'dataframe' must be provided to retrieve output columns.")
+
+        if dataframe is not None:
+            feature_data_target = dataframe
+        else:
+            feature_data_target = self.spark.sql(f"SELECT * FROM {table_name} LIMIT 1")
 
         target_table_columns = []
-        feature_data_target = self.spark.sql(f"SELECT * FROM {table_name} LIMIT 1")
 
         for field in feature_data_target.schema.fields:
             target_table_columns.append(
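
To make the new dataframe branch concrete, here is a small sketch (with an illustrative dataframe, not from the commit) of reading name/type pairs straight off a Spark DataFrame's schema, mirroring the documented return format when no table name is given:

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("schema-demo").getOrCreate()

    # Illustrative transformed dataframe standing in for a feature group's output.
    df = spark.createDataFrame([(1, "a", 2.5)], ["id", "code", "score"])

    # Same idea as the new dataframe branch: no table round-trip, just schema fields.
    columns = [
        {"name": field.name, "featureType": str(field.dataType)}
        for field in df.schema.fields
    ]
    print(columns)
    # e.g. [{'name': 'id', 'featureType': 'LongType()'}, ...]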

ads/feature_store/execution_strategy/execution_strategy.py

Lines changed: 13 additions & 0 deletions
@@ -42,6 +42,19 @@ def ingest_feature_definition(
         """
         pass
 
+    @abstractmethod
+    def ingest_feature_definition_stream(
+        self,
+        feature_group,
+        feature_group_job: FeatureGroupJob,
+        dataframe,
+        query_name,
+        await_termination,
+        timeout,
+        checkpoint_dir,
+    ):
+        pass
+
     @abstractmethod
     def ingest_dataset(self, dataset, dataset_job: DatasetJob):
         """
