@@ -1,5 +1,7 @@
 #!/usr/bin/env python
 # -*- coding: utf-8; -*-
+import copy
+import os

 # Copyright (c) 2023 Oracle and/or its affiliates.
 # Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/
@@ -41,6 +43,7 @@
 from ads.feature_engineering.feature_type import datetime

 logger = logging.getLogger(__name__)
+logger.setLevel(logging.INFO)


 def get_execution_engine_type(
@@ -117,6 +120,92 @@ def validate_delta_format_parameters(
        raise Exception("version number cannot be negative")


+def show_ingestion_summary(
+    entity_id: str,
+    entity_type: EntityType = EntityType.FEATURE_GROUP,
+    error_details: str = None,
+):
+    """
+    Displays an ingestion summary table with the given entity type and error details.
+
+    Args:
+        entity_id (str): The id of the entity that was ingested.
+        entity_type (EntityType, optional): The type of entity being ingested. Defaults to EntityType.FEATURE_GROUP.
+        error_details (str, optional): Details of any errors that occurred during ingestion. Defaults to None.
+    """
+    from tabulate import tabulate
+
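+    # Single summary row; any error detail marks the ingestion as "Failed".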
+    table_headers = ["entity_id", "entity_type", "ingestion_status", "error_details"]
+    ingestion_status = "Failed" if error_details else "Succeeded"
+
+    table_values = [
+        entity_id,
+        entity_type.value,
+        ingestion_status,
+        error_details if error_details else "None",
+    ]
+
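+    # Render the one-row summary as a fancy_grid table and log it at INFO level.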
+    logger.info(
+        "Ingestion Summary \n"
+        + tabulate(
+            [table_values],
+            headers=table_headers,
+            tablefmt="fancy_grid",
+            numalign="center",
+            stralign="center",
+        )
+    )
+
+
+def show_validation_summary(ingestion_status: str, validation_output, expectation_type):
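+    """
+    Logs validation summary tables for the given expectation run.
+
+    Args:
+        ingestion_status (str): Overall ingestion status reported alongside the suite statistics.
+        validation_output: Validation result carrying "statistics" and "results" keys (Great Expectations-style).
+        expectation_type: Type of the expectation suite that was evaluated.
+    """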
+    from tabulate import tabulate
+
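+    # First table: suite-level statistics (e.g. counts of evaluated and
+    # successful expectations) plus the overall ingestion status.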
+    statistics = validation_output["statistics"]
+
+    table_headers = (
+        ["expectation_type"] + list(statistics.keys()) + ["ingestion_status"]
+    )
+
+    table_values = [expectation_type] + list(statistics.values()) + [ingestion_status]
+
+    logger.info(
+        "Validation Summary \n"
+        + tabulate(
+            [table_values],
+            headers=table_headers,
+            tablefmt="fancy_grid",
+            numalign="center",
+            stralign="center",
+        )
+    )
+
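+    # Second table: one row per expectation rule, with the batch_id kwarg
+    # dropped from the arguments for readability.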
+    rule_table_headers = ["rule_type", "arguments", "status"]
+
+    rule_table_values = [
+        [
+            rule_output["expectation_config"].get("expectation_type"),
+            {
+                key: value
+                for key, value in rule_output["expectation_config"]["kwargs"].items()
+                if key != "batch_id"
+            },
+            rule_output.get("success"),
+        ]
+        for rule_output in validation_output["results"]
+    ]
+
+    logger.info(
+        "Validation Rules Summary \n"
+        + tabulate(
+            rule_table_values,
+            headers=rule_table_headers,
+            tablefmt="fancy_grid",
+            numalign="center",
+            stralign="center",
+        )
+    )
+
+
 def get_features(
     output_columns: List[dict],
     parent_id: str,
@@ -154,8 +243,10 @@ def get_features(
     return features


-def get_schema_from_pandas_df(df: pd.DataFrame):
-    spark = SparkSessionSingleton().get_spark_session()
+def get_schema_from_pandas_df(df: pd.DataFrame, feature_store_id: str):
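+    # The feature store id is used to look up the metastore backing the
+    # shared Spark session used for the pandas-to-Spark conversion.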
+    spark = SparkSessionSingleton(
+        get_metastore_id(feature_store_id)
+    ).get_spark_session()
     converted_df = spark.createDataFrame(df)
     return get_schema_from_spark_df(converted_df)

@@ -174,27 +265,29 @@ def get_schema_from_spark_df(df: DataFrame):
     return schema_details


-def get_schema_from_df(data_frame: Union[DataFrame, pd.DataFrame]) -> List[dict]:
+def get_schema_from_df(
+    data_frame: Union[DataFrame, pd.DataFrame], feature_store_id: str
+) -> List[dict]:
178271 """
179272 Given a DataFrame, returns a list of dictionaries that describe its schema.
180273 If the DataFrame is a pandas DataFrame, it uses pandas methods to get the schema.
181274 If it's a PySpark DataFrame, it uses PySpark methods to get the schema.
182275 """
183276 if isinstance (data_frame , pd .DataFrame ):
184- return get_schema_from_pandas_df (data_frame )
277+ return get_schema_from_pandas_df (data_frame , feature_store_id )
185278 else :
186279 return get_schema_from_spark_df (data_frame )
187280
188281
 def get_input_features_from_df(
-    data_frame: Union[DataFrame, pd.DataFrame]
+    data_frame: Union[DataFrame, pd.DataFrame], feature_store_id: str
 ) -> List[FeatureDetail]:
     """
     Given a DataFrame, returns a list of FeatureDetail objects that represent its input features.
     Each FeatureDetail object contains information about a single input feature, such as its name, data type, and
     whether it's categorical or numerical.
     """
-    schema_details = get_schema_from_df(data_frame)
+    schema_details = get_schema_from_df(data_frame, feature_store_id)
     feature_details = []

     for schema_detail in schema_details:
@@ -263,7 +356,7 @@ def largest_matching_subset_of_primary_keys(left_feature_group, right_feature_gr


 def convert_pandas_datatype_with_schema(
-        raw_feature_details: List[dict], input_df: pd.DataFrame
+    raw_feature_details: List[dict], input_df: pd.DataFrame
 ) -> pd.DataFrame:
     feature_detail_map = {}
     columns_to_remove = []
@@ -280,21 +373,25 @@ def convert_pandas_datatype_with_schema(
                 .where(pd.notnull(input_df[column]), None)
             )
         else:
-            logger.warning("column" + column + "doesn't exist in the input feature details")
+            logger.warning(
+                "Column " + column + " doesn't exist in the input feature details"
+            )
             columns_to_remove.append(column)
-    return input_df.drop(columns=columns_to_remove)
+    return input_df.drop(columns=columns_to_remove)


 def convert_spark_dataframe_with_schema(
-        raw_feature_details: List[dict], input_df: DataFrame
+    raw_feature_details: List[dict], input_df: DataFrame
 ) -> DataFrame:
     feature_detail_map = {}
     columns_to_remove = []
     for feature_details in raw_feature_details:
         feature_detail_map[feature_details.get("name")] = feature_details
     for column in input_df.columns:
         if column not in feature_detail_map.keys():
-            logger.warning("column" + column + "doesn't exist in the input feature details")
+            logger.warning(
+                "Column " + column + " doesn't exist in the input feature details"
+            )
             columns_to_remove.append(column)

     return input_df.drop(*columns_to_remove)
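
A minimal usage sketch for the reworked helpers (illustrative only: the sample DataFrame and the "my_feature_store_ocid" value are hypothetical, and it assumes the feature store resolves to a metastore via get_metastore_id):

    import pandas as pd

    df = pd.DataFrame({"user_id": [1, 2], "age": [31, 47]})

    # feature_store_id now scopes the Spark session used for schema
    # inference when the input is a pandas DataFrame.
    for feature in get_input_features_from_df(df, "my_feature_store_ocid"):
        print(feature)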