1+ import ast
2+ import json
3+
4+ from ads .feature_store .dataset import Dataset
5+ from ads .feature_store .statistics_config import StatisticsConfig
6+ from tests .integration .feature_store .test_base import FeatureStoreTestCase
7+ from ads .feature_store .feature_group import FeatureGroup
8+
9+
class TestPartitioningForFeatureGroupAndDataset(FeatureStoreTestCase):
    """Contains integration tests for partitioning of feature groups and datasets."""

    @staticmethod
    def _materialized_partition_keys(history_df) -> list:
        """Extract the partition keys recorded by the most recent materialisation.

        Parameters
        ----------
        history_df :
            Spark DataFrame returned by ``history()`` on a materialised
            feature group or dataset; row 0 is the latest commit.

        Returns
        -------
        list
            Partition key names of the latest commit (empty when the
            resource was materialised without partitioning).
        """
        latest_commit = json.loads(history_df.toJSON().collect()[0])
        # Delta encodes "partitionBy" as a string-serialized Python list,
        # e.g. "['class']" — decode it safely with literal_eval.
        return ast.literal_eval(
            latest_commit.get("operationParameters").get("partitionBy")
        )

    def define_feature_group_resource_with_partitioning(
        self, entity_id, feature_store_id, partitioning_keys
    ) -> "FeatureGroup":
        """Build (without creating) a feature group using *partitioning_keys*.

        Statistics are disabled so the test exercises partitioning only.
        """
        feature_group_resource = (
            FeatureGroup()
            .with_description("feature group with statistics disabled")
            .with_compartment_id(self.COMPARTMENT_ID)
            .with_name(self.get_name("petals2"))
            .with_entity_id(entity_id)
            .with_feature_store_id(feature_store_id)
            .with_primary_keys([])
            .with_partition_keys(partitioning_keys)
            .with_input_feature_details(self.INPUT_FEATURE_DETAILS)
            .with_statistics_config(False)
        )
        return feature_group_resource

    def define_dataset_resource_with_partitioning(
        self, entity_id, feature_store_id, feature_group_name, partitioning_keys
    ) -> "Dataset":
        """Build (without creating) a dataset over *feature_group_name* with *partitioning_keys*."""
        name = self.get_name("petals_ds")
        dataset_resource = (
            Dataset()
            .with_description("dataset description")
            .with_compartment_id(self.COMPARTMENT_ID)
            .with_name(name)
            .with_entity_id(entity_id)
            .with_feature_store_id(feature_store_id)
            .with_query(f"SELECT * FROM `{entity_id}`.{feature_group_name}")
            .with_statistics_config(
                StatisticsConfig(True, columns=["sepal_length", "petal_width"])
            )
            .with_partition_keys(partitioning_keys)
        )
        return dataset_resource

    def test_feature_group_materialization_with_partitioning_keys(self):
        """Partition keys supplied to a feature group must be applied on materialisation."""
        fs = self.define_feature_store_resource().create()
        assert fs.oci_fs.id

        entity = self.create_entity_resource(fs)
        assert entity.oci_fs_entity.id

        fg = self.define_feature_group_resource_with_partitioning(
            entity.oci_fs_entity.id, fs.oci_fs.id, ["class"]
        ).create()
        assert fg.oci_feature_group.id

        fg.materialise(self.data)

        materialized_partition_keys = self._materialized_partition_keys(fg.history())
        assert len(materialized_partition_keys) == 1
        assert materialized_partition_keys[0] == "class"

        self.clean_up_feature_group(fg)
        self.clean_up_entity(entity)
        self.clean_up_feature_store(fs)

    def test_feature_group_materialization_without_partitioning_keys(self):
        """A feature group created without partition keys must materialise unpartitioned."""
        fs = self.define_feature_store_resource().create()
        assert fs.oci_fs.id

        entity = self.create_entity_resource(fs)
        assert entity.oci_fs_entity.id

        fg = self.define_feature_group_resource_with_partitioning(
            entity.oci_fs_entity.id, fs.oci_fs.id, None
        ).create()
        assert fg.oci_feature_group.id

        fg.materialise(self.data)

        materialized_partition_keys = self._materialized_partition_keys(fg.history())
        assert len(materialized_partition_keys) == 0

        self.clean_up_feature_group(fg)
        self.clean_up_entity(entity)
        self.clean_up_feature_store(fs)

    def test_dataset_materialization_with_partitioning_keys(self):
        """Partition keys supplied to a dataset must be applied on materialisation."""
        fs = self.define_feature_store_resource().create()
        assert fs.oci_fs.id

        entity = self.create_entity_resource(fs)
        assert entity.oci_fs_entity.id

        fg = self.define_feature_group_resource(
            entity.oci_fs_entity.id, fs.oci_fs.id
        ).create()

        assert fg.oci_feature_group.id
        fg.materialise(self.data)

        dataset = self.define_dataset_resource_with_partitioning(
            entity.oci_fs_entity.id, fs.oci_fs.id, fg.oci_feature_group.name, ["class"]
        ).create()
        assert dataset.oci_dataset.id

        dataset.materialise()

        materialized_partition_keys = self._materialized_partition_keys(
            dataset.history()
        )
        assert len(materialized_partition_keys) == 1
        assert materialized_partition_keys[0] == "class"

        self.clean_up_dataset(dataset)
        self.clean_up_feature_group(fg)
        self.clean_up_entity(entity)
        self.clean_up_feature_store(fs)

    def test_dataset_materialization_without_partitioning_keys(self):
        """A dataset created without partition keys must materialise unpartitioned."""
        fs = self.define_feature_store_resource().create()
        assert fs.oci_fs.id

        entity = self.create_entity_resource(fs)
        assert entity.oci_fs_entity.id

        fg = self.define_feature_group_resource(
            entity.oci_fs_entity.id, fs.oci_fs.id
        ).create()

        assert fg.oci_feature_group.id
        fg.materialise(self.data)

        dataset = self.define_dataset_resource_with_partitioning(
            entity.oci_fs_entity.id, fs.oci_fs.id, fg.oci_feature_group.name, None
        ).create()
        assert dataset.oci_dataset.id

        dataset.materialise()

        materialized_partition_keys = self._materialized_partition_keys(
            dataset.history()
        )
        assert len(materialized_partition_keys) == 0

        self.clean_up_dataset(dataset)
        self.clean_up_feature_group(fg)
        self.clean_up_entity(entity)
        self.clean_up_feature_store(fs)