 from ads.feature_store.feature_group import FeatureGroup
 from ads.feature_store.common.enums import TransformationMode
 from tests.integration.feature_store.test_base import FeatureStoreTestCase
+from ads.feature_store.common.spark_session_singleton import SparkSessionSingleton


 class TestFeatureStoreTransformation(FeatureStoreTestCase):
@@ -18,7 +19,7 @@ class TestFeatureStoreTransformation(FeatureStoreTestCase):
     valid_spark_queries = [
         "SELECT requisitionId, length(title) As title_word_count,"
         " CASE When length(title) > 0 Then 0 Else 1 End As empty_title,"
-        " length(description) As description_word_count," \
+        " length(description) As description_word_count,"
         " length(designation) As designation_word_count FROM DATA_SOURCE_INPUT",
         "SELECT user_id, credit_score FROM DATA_SOURCE_INPUT",
         "SELECT country, city, zipcode, state FROM DATA_SOURCE_INPUT WHERE state in ('PR', 'AZ', 'FL') order by state",
@@ -104,7 +105,19 @@ def test_transformation_query_with_feature_group_job(self):
         )
         assert fg.oci_feature_group.id

-        fg.materialise(self.data)
+        # convert pandas to spark dataframe to run SPARK SQL transformation mode
+        spark = SparkSessionSingleton().get_spark_session()
+        spark_df = spark.createDataFrame(self.data)
+        # get item count
+        item_count = spark_df.count()
+        # materialise to delta table
+        fg.materialise(spark_df)
+        # read dataframe
+        df = fg.select().read()
+        # assert dataframe
+        assert df
+        # assert count
+        assert df.count() == item_count

         self.clean_up_feature_group(fg)
         self.clean_up_transformation(transformation)
0 commit comments