Commit c80f7d3

DataType Changes, validation output transpose
1 parent 432955f commit c80f7d3

15 files changed: +235 additions, -167 deletions

ads/feature_store/common/enums.py

Lines changed: 1 addition & 0 deletions
@@ -295,6 +295,7 @@ class FeatureType(Enum):
     STRING_BINARY_MAP = "STRING_BINARY_MAP"
     STRING_BOOLEAN_MAP = "STRING_BOOLEAN_MAP"
     UNKNOWN = "UNKNOWN"
+    COMPLEX = "COMPLEX"


 class EntityType(Enum):

ads/feature_store/common/spark_session_singleton.py

Lines changed: 2 additions & 1 deletion
@@ -75,7 +75,8 @@ def __init__(self, metastore_id: str = None):
                 "spark.hadoop.oracle.dcat.metastore.id", metastore_id
             ).config(
                 "spark.sql.warehouse.dir", metastore.default_managed_table_location
-            )
+            )\
+            .config("spark.driver.memory", "16G")

         if developer_enabled():
             # Configure spark session with delta jars only in developer mode. In other cases,
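For context, a minimal sketch of the builder chain this hunk produces; the metastore values are placeholders and the app name is invented, and only the spark.driver.memory line is new:

    from pyspark.sql import SparkSession

    spark = (
        SparkSession.builder.appName("feature-store")  # illustrative name
        .config("spark.hadoop.oracle.dcat.metastore.id", "<metastore_ocid>")  # placeholder
        .config("spark.sql.warehouse.dir", "<managed_table_location>")  # placeholder
        .config("spark.driver.memory", "16G")  # new: larger driver heap
        .getOrCreate()
    )

Since spark.driver.memory is only honored when the driver JVM launches, setting it through the builder should take effect here because the singleton creates the session once, before any Spark work runs.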

ads/feature_store/common/utils/feature_schema_mapper.py

Lines changed: 11 additions & 19 deletions
@@ -71,7 +71,7 @@ def map_spark_type_to_feature_type(spark_type):
     if spark_type in spark_type_to_feature_type:
         return spark_type_to_feature_type.get(spark_type)
     else:
-        return FeatureType.UNKNOWN
+        return FeatureType.COMPLEX


 def map_pandas_type_to_feature_type(feature_name, values):

@@ -180,7 +180,7 @@ def map_feature_type_to_spark_type(feature_type):
     if feature_type_in in spark_types:
         return spark_types.get(feature_type_in)
     else:
-        return "UNKNOWN"
+        return "COMPLEX"


 def get_raw_data_source_schema(raw_feature_details: List[dict]):

@@ -225,30 +225,22 @@ def map_feature_type_to_pandas(feature_type):
         FeatureType.INTEGER: "int32",
         FeatureType.DECIMAL: "object",
         FeatureType.DATE: "object",
+        FeatureType.STRING_ARRAY: "object",
+        FeatureType.INTEGER_ARRAY: "object",
+        FeatureType.LONG_ARRAY: "object",
+        FeatureType.FLOAT_ARRAY: "object",
+        FeatureType.DOUBLE_ARRAY: "object",
+        FeatureType.TIMESTAMP_ARRAY: "object",
+        FeatureType.BOOLEAN_ARRAY: "object",
+        # FeatureType.DECIMAL_ARRAY: "object",
+        FeatureType.DATE_ARRAY: "object",
     }
     if feature_type_in in supported_feature_type:
         return supported_feature_type.get(feature_type_in)
     else:
         raise TypeError(f"Feature Type {feature_type} is not supported for pandas")


-def convert_pandas_datatype_with_schema(
-    raw_feature_details: List[dict], input_df: pd.DataFrame
-):
-    feature_detail_map = {}
-    for feature_details in raw_feature_details:
-        feature_detail_map[feature_details.get("name")] = feature_details
-    for column in input_df.columns:
-        if column in feature_detail_map.keys():
-            feature_details = feature_detail_map[column]
-            feature_type = feature_details.get("featureType")
-            pandas_type = map_feature_type_to_pandas(feature_type)
-            input_df[column] = (
-                input_df[column]
-                .astype(pandas_type)
-                .where(pd.notnull(input_df[column]), None)
-            )
-

 def map_spark_type_to_stats_data_type(spark_type):
     """Maps the spark data types to MLM library data types

ads/feature_store/common/utils/utility.py

Lines changed: 47 additions & 13 deletions
@@ -11,11 +11,12 @@
 from ads.common.decorator.runtime_dependency import OptionalDependency
 from ads.feature_store.common.utils.feature_schema_mapper import (
     map_spark_type_to_feature_type,
-    map_pandas_type_to_feature_type,
+    map_feature_type_to_pandas,
 )
 from ads.feature_store.feature import Feature, DatasetFeature
 from ads.feature_store.feature_group_expectation import Rule, Expectation
 from ads.feature_store.input_feature_detail import FeatureDetail
+from ads.feature_store.common.spark_session_singleton import SparkSessionSingleton

 try:
     from pyspark.pandas import DataFrame

@@ -154,18 +155,9 @@ def get_features(


 def get_schema_from_pandas_df(df: pd.DataFrame):
-    schema_details = []
-
-    for order_number, field in enumerate(df.columns, start=1):
-        details = {
-            "name": field,
-            "feature_type": map_pandas_type_to_feature_type(field, df[field]),
-            "order_number": order_number,
-        }
-
-        schema_details.append(details)
-
-    return schema_details
+    spark = SparkSessionSingleton().get_spark_session()
+    converted_df = spark.createDataFrame(df)
+    return get_schema_from_spark_df(converted_df)


 def get_schema_from_spark_df(df: DataFrame):

@@ -268,3 +260,45 @@ def largest_matching_subset_of_primary_keys(left_feature_group, right_feature_gr
     common_keys = left_primary_keys.intersection(right_primary_keys)

     return common_keys
+
+
+def convert_pandas_datatype_with_schema(
+    raw_feature_details: List[dict], input_df: pd.DataFrame
+):
+    feature_detail_map = {}
+    columns_to_remove = []
+    for feature_details in raw_feature_details:
+        feature_detail_map[feature_details.get("name")] = feature_details
+    for column in input_df.columns:
+        if column in feature_detail_map.keys():
+            feature_details = feature_detail_map[column]
+            feature_type = feature_details.get("featureType")
+            pandas_type = map_feature_type_to_pandas(feature_type)
+            input_df[column] = (
+                input_df[column]
+                .astype(pandas_type)
+                .where(pd.notnull(input_df[column]), None)
+            )
+        else:
+            logger.warning("Column " + column + " doesn't exist in the input feature details")
+            columns_to_remove.append(column)
+    return input_df.drop(columns=columns_to_remove)
+
+
+def validate_spark_dataframe_schema(raw_feature_details: List[dict], input_df: DataFrame):
+    feature_detail_map = {}
+    columns_to_remove = []
+    for feature_details in raw_feature_details:
+        feature_detail_map[feature_details.get("name")] = feature_details
+    for column in input_df.columns:
+        if column not in feature_detail_map.keys():
+            logger.warning("Column " + column + " doesn't exist in the input feature details")
+            columns_to_remove.append(column)
+
+    return input_df.drop(*columns_to_remove)
+
+
+def validate_input_feature_details(input_feature_details, data_frame):
+    if isinstance(data_frame, pd.DataFrame):
+        return convert_pandas_datatype_with_schema(input_feature_details, data_frame)
+    return validate_spark_dataframe_schema(input_feature_details, data_frame)
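A sketch of how the new entry point behaves, assuming feature details shaped like the {"name": ..., "featureType": ...} dicts consumed above; the column names, values, and featureType spelling are illustrative:

    import pandas as pd

    from ads.feature_store.common.utils.utility import validate_input_feature_details

    feature_details = [{"name": "age", "featureType": "INTEGER"}]
    df = pd.DataFrame({"age": [25, 37], "stray": ["x", "y"]})

    # pandas input: "age" is cast via map_feature_type_to_pandas, while "stray",
    # absent from the feature details, is warned about and dropped.
    validated = validate_input_feature_details(feature_details, df)
    print(validated.columns.tolist())  # expected: ['age']

A Spark DataFrame takes the other branch: no casting is attempted, but unknown columns are likewise logged and dropped via input_df.drop(*columns_to_remove).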

ads/feature_store/docs/source/feature_group.rst

Lines changed: 10 additions & 23 deletions
@@ -175,10 +175,6 @@ Statistics Results
 ==================
 You can call the ``get_statistics()`` method of the FeatureGroup instance to fetch validation results for a specific ingestion job.

-.. note::
-
-    PyDeequ is a Python API for Deequ, a library built on top of Apache Spark for defining "unit tests for data", which measure data quality in large datasets.
-
 .. code-block:: python3

     # Fetch stats results for a feature group job

@@ -196,26 +192,17 @@ With a FeatureGroup instance, we can get the last feature group job details usin

     # Fetch validation results for a feature group
     feature_group_job = feature_group.get_last_job()
-    df = feature_group_job.get_validation().to_pandas()
-    df.show()
+    feature_group_job.get_validation_output_df()

 Get features
 =============
-You can call the ``get_features_dataframe()`` method of the FeatureGroup instance to fetch features in a feature group
+You can call the ``get_features_df()`` method of the FeatureGroup instance to fetch features in a feature group

 .. code-block:: python3

     # Fetch features for a feature group
-    df = feature_group.get_features_dataframe()
-
-Get input schema details
-==========================
-You can call the ``get_input_schema_dataframe()`` method of the FeatureGroup instance to fetch input schema details of a feature group
-
-.. code-block:: python3
+    df = feature_group.get_features_df()

-    # Fetch features for a feature group
-    df = feature_group.get_input_schema_dataframe()

 Filter
 ======

@@ -363,31 +350,31 @@ The data will be stored in a data type native to each store. There is an option
      - STRING
      - Textual data
    * - ArrayType(IntegerType())
-     - object (list), object (np.ndarray) - not supported
+     - object (list), object (np.ndarray)
      - INTEGER_ARRAY
      - List of values
    * - ArrayType(LongType())
-     - object (list), object (np.ndarray) - not supported
+     - object (list), object (np.ndarray)
      - LONG_ARRAY
      - List of values
    * - ArrayType(FloatType())
-     - object (list), object (np.ndarray) - not supported
+     - object (list), object (np.ndarray)
      - FLOAT_ARRAY
      - List of values
    * - ArrayType(DoubleType())
-     - object (list), object (np.ndarray) - not supported
+     - object (list), object (np.ndarray)
      - DOUBLE_ARRAY
      - List of values
    * - ArrayType(BinaryType())
-     - object (list), object (np.ndarray) - not supported
+     - object (list), object (np.ndarray)
      - BINARY_ARRAY
      - List of values
    * - ArrayType(DateType())
-     - object (list), object (np.ndarray) - not supported
+     - object (list), object (np.ndarray)
      - DATE_ARRAY
      - List of values
    * - ArrayType(TimestampType())
-     - object (list), object (np.ndarray) - not supported
+     - object (list), object (np.ndarray)
      - TIMESTAMP_ARRAY
      - List of values
    * - StructType
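A small illustration of what the relaxed rows permit (column names invented): a pandas column holding Python lists can now be ingested as an array feature type instead of being rejected as unsupported.

    import pandas as pd

    df = pd.DataFrame({
        "scores": [[1, 2], [3]],         # object (list) -> LONG_ARRAY (inferred via Spark)
        "weights": [[0.1, 0.2], [0.3]],  # object (list) -> DOUBLE_ARRAY
    })

With schema inference now routed through spark.createDataFrame (see the utility.py change above), such list columns infer as Spark ArrayType columns and map onto the *_ARRAY feature types in the table.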

ads/feature_store/execution_strategy/spark/spark_execution.py

Lines changed: 5 additions & 8 deletions
@@ -27,9 +27,6 @@
     EntityType,
 )
 from ads.feature_store.common.spark_session_singleton import SparkSessionSingleton
-from ads.feature_store.common.utils.feature_schema_mapper import (
-    convert_pandas_datatype_with_schema,
-)
 from ads.feature_store.common.utils.transformation_utils import TransformationUtils
 from ads.feature_store.data_validation.great_expectation import ExpectationService
 from ads.feature_store.dataset_job import DatasetJob

@@ -41,6 +38,9 @@
 from ads.feature_store.transformation import Transformation

 from ads.feature_store.feature_statistics.statistics_service import StatisticsService
+from ads.feature_store.common.utils.utility import (
+    validate_input_feature_details
+)

 logger = logging.getLogger(__name__)

@@ -177,11 +177,8 @@ def _save_offline_dataframe(
         database = feature_group.entity_id
         self.spark_engine.create_database(database)

-        if isinstance(data_frame, pd.DataFrame):
-            if not feature_group.is_infer_schema:
-                convert_pandas_datatype_with_schema(
-                    feature_group.input_feature_details, data_frame
-                )
+        if not feature_group.is_infer_schema:
+            data_frame = validate_input_feature_details(feature_group.input_feature_details, data_frame)

         # TODO: Get event timestamp column and apply filtering basis from and to timestamp
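A note on the rewritten guard: the old call mutated the pandas frame in place and discarded the return value, whereas the new helpers drop unknown columns and return a new frame (both pandas drop(columns=...) and Spark drop(*cols) are non-mutating), so the result must be reassigned to data_frame. Validation now also covers Spark inputs rather than only pandas.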
ads/feature_store/feature_group_job.py

Lines changed: 1 addition & 1 deletion
@@ -177,7 +177,7 @@ def get_validation_output_df(self) -> "pandas.DataFrame":
         )

         # Convert Python object to Pandas DataFrame
-        validation_output_df = pandas.json_normalize(validation_output_json)
+        validation_output_df = pandas.json_normalize(validation_output_json).transpose()

         # return the validation output DataFrame
         return validation_output_df
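A toy illustration of what the transpose buys (the payload is invented; real output comes from the validation run): json_normalize flattens the nested result into a single very wide row, and transpose() pivots it into one labeled row per field, which is far easier to scan.

    import pandas as pd

    validation_output_json = {
        "success": True,
        "statistics": {"evaluated_expectations": 2, "successful_expectations": 2},
    }
    wide = pd.json_normalize(validation_output_json)  # 1 row x N columns
    tall = wide.transpose()                           # N rows x 1 column
    print(tall)
    #                                        0
    # success                             True
    # statistics.evaluated_expectations      2
    # statistics.successful_expectations     2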

ads/feature_store/validation_output.py

Lines changed: 1 addition & 1 deletion
@@ -22,7 +22,7 @@ def to_pandas(self) -> pd.DataFrame:
             The validation output information as a pandas DataFrame.
         """
         if self.content:
-            profile_result = pd.json_normalize(self.content)
+            profile_result = pd.json_normalize(self.content).transpose()
             return profile_result

     @property

tests/integration/feature_store/test_dataset_validations.py

Lines changed: 1 addition & 1 deletion
@@ -55,7 +55,7 @@ def test_dataset_validation_operations(self):
         assert dataset.oci_dataset.id

         dataset.materialise()
-        df = dataset.get_validation_output().to_pandas()
+        df = dataset.get_validation_output().to_pandas().T
         assert df is not None
         assert "success" in df.columns
         assert True in df["success"].values
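The added .T is the counterpart of the transpose above: to_pandas() now returns the field-per-row frame, so the test transposes back to the row-oriented view in which "success" is a column and the existing assertions still hold.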
