@@ -77,10 +77,10 @@ def __init__(self, metastore_id: str = None):
         self._jvm = self._spark_context._jvm
 
     def ingest_feature_definition(
-        self,
-        feature_group: "FeatureGroup",
-        feature_group_job: FeatureGroupJob,
-        dataframe,
+        self,
+        feature_group: "FeatureGroup",
+        feature_group_job: FeatureGroupJob,
+        dataframe,
     ):
         try:
             self._save_offline_dataframe(dataframe, feature_group, feature_group_job)
@@ -94,7 +94,7 @@ def ingest_dataset(self, dataset, dataset_job: DatasetJob):
             raise SparkExecutionException(e).with_traceback(e.__traceback__)
 
     def delete_feature_definition(
-        self, feature_group: "FeatureGroup", feature_group_job: FeatureGroupJob
+        self, feature_group: "FeatureGroup", feature_group_job: FeatureGroupJob
     ):
         """
         Deletes a feature definition from the system.
@@ -188,7 +188,7 @@ def _validate_expectation(expectation_type, validation_output: dict):
             raise Exception(error_message)
 
     def _save_offline_dataframe(
-        self, data_frame, feature_group, feature_group_job: FeatureGroupJob
+        self, data_frame, feature_group, feature_group_job: FeatureGroupJob
     ):
         """Ingest a dataframe into the feature store system. This handles both Spark and pandas
         dataframes; for pandas, the transformed frame is converted to Spark and written to the Delta table.
@@ -211,6 +211,7 @@ def _save_offline_dataframe(
         feature_statistics = None
         validation_output = None
         output_features = []
+        version = 2  # after MLM upgrade
 
         try:
             # Create database in hive metastore if it does not exist
@@ -312,6 +313,7 @@ def _save_offline_dataframe(
312313 "validation_output" : str (validation_output ) if validation_output else None ,
313314 "commit_id" : "commit_id" ,
314315 "feature_statistics" : feature_statistics ,
316+ "version" : version
315317 }
316318
317319 self ._update_job_and_parent_details (
@@ -460,7 +462,7 @@ def _save_dataset_input(self, dataset, dataset_job: DatasetJob):
 
     @staticmethod
     def _update_job_and_parent_details(
-        parent_entity, job_entity, output_features=None, output_details=None
+        parent_entity, job_entity, output_features=None, output_details=None
     ):
         """
         Updates the parent and job entities with relevant details.
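
The one substantive change in this commit is the new `version` marker: it is initialized next to the other bookkeeping variables in `_save_offline_dataframe` and recorded in the `output_details` dict that `_update_job_and_parent_details` attaches to the job entity. A minimal, runnable sketch of that flow follows; the entity objects and their attribute names are hypothetical stand-ins, and only the dict keys and the helper's signature come from the hunks above.

    from types import SimpleNamespace

    def _update_job_and_parent_details(
        parent_entity, job_entity, output_features=None, output_details=None
    ):
        # Signature copied from the @staticmethod in the last hunk; the body
        # is an illustrative stub, not the library's implementation.
        job_entity.job_output_details = output_details  # hypothetical attribute
        parent_entity.output_feature_details = output_features  # hypothetical attribute

    version = 2  # after MLM upgrade (the constant this commit introduces)

    output_details = {
        "validation_output": None,  # str(validation_output) when validation ran
        "commit_id": "commit_id",
        "feature_statistics": None,
        "version": version,  # the field this commit adds
    }

    # Hypothetical stand-ins for the FeatureGroup and FeatureGroupJob entities.
    feature_group = SimpleNamespace()
    feature_group_job = SimpleNamespace()

    _update_job_and_parent_details(
        feature_group, feature_group_job, output_features=[], output_details=output_details
    )
    assert feature_group_job.job_output_details["version"] == 2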