Skip to content

Commit d81f860

Browse files
authored
datatype - infer schema changes and integration tests (#229)
2 parents 62840d3 + 8b3e996 commit d81f860

File tree

15 files changed

+1464
-67
lines changed

15 files changed

+1464
-67
lines changed

ads/common/oci_mixin.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -726,7 +726,7 @@ def update_from_oci_model(
726726
for attr in self.swagger_types.keys():
727727
if (
728728
hasattr(oci_model_instance, attr)
729-
and getattr(oci_model_instance, attr)
729+
and getattr(oci_model_instance, attr) is not None
730730
and (
731731
not hasattr(self, attr)
732732
or not getattr(self, attr)

ads/feature_store/common/enums.py

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -261,17 +261,40 @@ class FeatureType(Enum):
261261
"""
262262

263263
STRING = "STRING"
264+
SHORT = "SHORT"
264265
INTEGER = "INTEGER"
266+
LONG = "LONG"
265267
FLOAT = "FLOAT"
266268
DOUBLE = "DOUBLE"
267269
BOOLEAN = "BOOLEAN"
268270
DATE = "DATE"
269271
TIMESTAMP = "TIMESTAMP"
270272
DECIMAL = "DECIMAL"
271273
BINARY = "BINARY"
272-
ARRAY = "ARRAY"
273-
MAP = "MAP"
274-
STRUCT = "STRUCT"
274+
BYTE = "BYTE"
275+
STRING_ARRAY = "STRING_ARRAY"
276+
INTEGER_ARRAY = "INTEGER_ARRAY"
277+
SHORT_ARRAY = "SHORT_ARRAY"
278+
LONG_ARRAY = "LONG_ARRAY"
279+
FLOAT_ARRAY = "FLOAT_ARRAY"
280+
DOUBLE_ARRAY = "DOUBLE_ARRAY"
281+
BINARY_ARRAY = "BINARY_ARRAY"
282+
DATE_ARRAY = "DATE_ARRAY"
283+
TIMESTAMP_ARRAY = "TIMESTAMP_ARRAY"
284+
BYTE_ARRAY = "BYTE_ARRAY"
285+
BOOLEAN_ARRAY = "BOOLEAN_ARRAY"
286+
STRING_STRING_MAP = "STRING_STRING_MAP"
287+
STRING_INTEGER_MAP = "STRING_INTEGER_MAP"
288+
STRING_SHORT_MAP = "STRING_SHORT_MAP"
289+
STRING_LONG_MAP = "STRING_LONG_MAP"
290+
STRING_FLOAT_MAP = "STRING_FLOAT_MAP"
291+
STRING_DOUBLE_MAP = "STRING_DOUBLE_MAP"
292+
STRING_TIMESTAMP_MAP = "STRING_TIMESTAMP_MAP"
293+
STRING_DATE_MAP = "STRING_DATE_MAP"
294+
STRING_BYTE_MAP = "STRING_BYTE_MAP"
295+
STRING_BINARY_MAP = "STRING_BINARY_MAP"
296+
STRING_BOOLEAN_MAP = "STRING_BOOLEAN_MAP"
297+
UNKNOWN = "UNKNOWN"
275298

276299

277300
class EntityType(Enum):

ads/feature_store/common/utils/feature_schema_mapper.py

Lines changed: 171 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,11 @@
66

77
from typing import List
88

9+
import numpy as np
10+
import pandas as pd
11+
912
from ads.common.decorator.runtime_dependency import OptionalDependency
13+
from ads.feature_store.common.enums import FeatureType
1014

1115
try:
1216
from pyspark.sql.types import *
@@ -25,65 +29,153 @@ def map_spark_type_to_feature_type(spark_type):
2529
:return:
2630
"""
2731
spark_type_to_feature_type = {
28-
StringType(): "string",
29-
IntegerType(): "integer",
30-
FloatType(): "float",
31-
DoubleType(): "double",
32-
BooleanType(): "boolean",
33-
DateType(): "date",
34-
TimestampType(): "timestamp",
35-
DecimalType(): "decimal",
36-
BinaryType(): "binary",
37-
ArrayType(StringType()): "array",
38-
MapType(StringType(), StringType()): "map",
39-
StructType(): "struct",
40-
ByteType(): "byte",
41-
ShortType(): "short",
42-
LongType(): "long",
32+
StringType(): FeatureType.STRING,
33+
IntegerType(): FeatureType.INTEGER,
34+
ShortType(): FeatureType.SHORT,
35+
LongType(): FeatureType.LONG,
36+
FloatType(): FeatureType.FLOAT,
37+
DoubleType(): FeatureType.DOUBLE,
38+
BooleanType(): FeatureType.BOOLEAN,
39+
DateType(): FeatureType.DATE,
40+
TimestampType(): FeatureType.TIMESTAMP,
41+
BinaryType(): FeatureType.BINARY,
42+
ByteType(): FeatureType.BYTE,
43+
ArrayType(StringType()): FeatureType.STRING_ARRAY,
44+
ArrayType(IntegerType()): FeatureType.INTEGER_ARRAY,
45+
ArrayType(LongType()): FeatureType.LONG_ARRAY,
46+
ArrayType(FloatType()): FeatureType.FLOAT_ARRAY,
47+
ArrayType(DoubleType()): FeatureType.DOUBLE_ARRAY,
48+
ArrayType(BinaryType()): FeatureType.BINARY_ARRAY,
49+
ArrayType(DateType()): FeatureType.DATE_ARRAY,
50+
ArrayType(TimestampType()): FeatureType.TIMESTAMP_ARRAY,
51+
ArrayType(ByteType()): FeatureType.BYTE_ARRAY,
52+
ArrayType(BooleanType()): FeatureType.BOOLEAN_ARRAY,
53+
ArrayType(ShortType()): FeatureType.SHORT_ARRAY,
54+
MapType(StringType(), StringType()): FeatureType.STRING_STRING_MAP,
55+
MapType(StringType(), IntegerType()): FeatureType.STRING_INTEGER_MAP,
56+
MapType(StringType(), ShortType()): FeatureType.STRING_SHORT_MAP,
57+
MapType(StringType(), LongType()): FeatureType.STRING_LONG_MAP,
58+
MapType(StringType(), FloatType()): FeatureType.STRING_FLOAT_MAP,
59+
MapType(StringType(), DoubleType()): FeatureType.STRING_DOUBLE_MAP,
60+
MapType(StringType(), TimestampType()): FeatureType.STRING_TIMESTAMP_MAP,
61+
MapType(StringType(), DateType()): FeatureType.STRING_DATE_MAP,
62+
MapType(StringType(), BinaryType()): FeatureType.STRING_BINARY_MAP,
63+
MapType(StringType(), ByteType()): FeatureType.STRING_BYTE_MAP,
64+
MapType(StringType(), BooleanType()): FeatureType.STRING_BOOLEAN_MAP,
4365
}
44-
45-
return spark_type_to_feature_type.get(spark_type).upper()
66+
if spark_type in spark_type_to_feature_type:
67+
return spark_type_to_feature_type.get(spark_type)
68+
else:
69+
return FeatureType.UNKNOWN
70+
71+
72+
def map_pandas_type_to_feature_type(feature_name, values):
    """Infer the feature type of a single pandas column.

    Columns whose dtype name is ``object`` are inspected value by value, using
    the Python type name of each element; every other dtype is resolved
    directly from its dtype name.

    :param feature_name: column name, used only to build error messages
    :param values: column data; must expose ``dtype`` and be iterable
    :return: the inferred ``FeatureType`` member
    :raises TypeError: if a value is a list/ndarray, if the column mixes
        several recognised types, or if no supported type can be inferred
    """
    pandas_type = str(values.dtype)
    inferred_dtype = FeatureType.UNKNOWN
    # Compare by value: ``is`` checks object identity, which is not guaranteed
    # for a string built at runtime, so ``is "object"`` could skip this branch.
    if pandas_type == "object":
        for row in values:
            if isinstance(row, (list, np.ndarray)):
                raise TypeError(f"object of type {type(row)} not supported")
            current_dtype = map_pandas_basic_type_to_feature_type(
                type(row).__name__
            )
            if current_dtype is FeatureType.UNKNOWN:
                # Values of an unrecognised type (e.g. None) neither set nor
                # contradict the inferred type.
                continue
            if inferred_dtype is FeatureType.UNKNOWN:
                inferred_dtype = current_dtype
            elif current_dtype != inferred_dtype:
                raise TypeError(
                    f"Input feature '{feature_name}' has mixed types, {current_dtype} and {inferred_dtype}. "
                    f"That is not allowed. "
                )
    else:
        inferred_dtype = map_pandas_basic_type_to_feature_type(pandas_type)

    if inferred_dtype is FeatureType.UNKNOWN:
        raise TypeError(
            f"Input feature '{feature_name}' has type {str(pandas_type)} which is not supported"
        )
    return inferred_dtype
46100

47101

48-
def map_pandas_type_to_feature_type(pandas_type):
102+
def map_pandas_basic_type_to_feature_type(pandas_type):
    """Translate a pandas dtype name or Python type name into a feature type.

    :param pandas_type: type name such as ``"int64"``, ``"str"`` or ``"date"``
    :return: the matching ``FeatureType`` member, or ``FeatureType.UNKNOWN``
        when the name is not in the lookup table
    """
    # TODO: uint64 values above the signed 64-bit range can't be mapped to LongType
    feature_type_by_name = {
        # text
        "str": FeatureType.STRING,
        "string": FeatureType.STRING,
        # signed integers
        "int": FeatureType.INTEGER,
        "int8": FeatureType.INTEGER,
        "int16": FeatureType.INTEGER,
        "int32": FeatureType.LONG,
        "int64": FeatureType.LONG,
        # unsigned integers
        "uint8": FeatureType.INTEGER,
        "uint16": FeatureType.INTEGER,
        "uint32": FeatureType.LONG,
        "uint64": FeatureType.LONG,
        # floating point
        "float": FeatureType.FLOAT,
        "float16": FeatureType.FLOAT,
        "float32": FeatureType.DOUBLE,
        "float64": FeatureType.DOUBLE,
        # temporal
        "datetime64[ns]": FeatureType.TIMESTAMP,
        "datetime64[ns, UTC]": FeatureType.TIMESTAMP,
        "timedelta64[ns]": FeatureType.LONG,
        # everything else
        "bool": FeatureType.BOOLEAN,
        "Decimal": FeatureType.DECIMAL,
        "date": FeatureType.DATE,
    }
    return feature_type_by_name.get(pandas_type, FeatureType.UNKNOWN)
61134

62135

63136
def map_feature_type_to_spark_type(feature_type):
    """Returns the Spark Type for a particular feature type.

    :param feature_type: a ``FeatureType`` member or its string value; the
        ``FeatureType(...)`` lookup raises ``ValueError`` for anything else
    :return: Spark Type, or the string ``"UNKNOWN"`` when the feature type has
        no Spark equivalent in the table below
    """
    feature_type_in = FeatureType(feature_type)
    # Built per call because the pyspark types come from an optional import.
    spark_types = {
        # scalar types
        FeatureType.STRING: StringType(),
        FeatureType.SHORT: ShortType(),
        FeatureType.INTEGER: IntegerType(),
        FeatureType.LONG: LongType(),
        FeatureType.FLOAT: FloatType(),
        FeatureType.DOUBLE: DoubleType(),
        FeatureType.BOOLEAN: BooleanType(),
        FeatureType.DATE: DateType(),
        FeatureType.TIMESTAMP: TimestampType(),
        FeatureType.DECIMAL: DecimalType(),
        FeatureType.BINARY: BinaryType(),
        FeatureType.BYTE: ByteType(),
        # array types
        FeatureType.STRING_ARRAY: ArrayType(StringType()),
        FeatureType.INTEGER_ARRAY: ArrayType(IntegerType()),
        FeatureType.SHORT_ARRAY: ArrayType(ShortType()),
        FeatureType.LONG_ARRAY: ArrayType(LongType()),
        FeatureType.FLOAT_ARRAY: ArrayType(FloatType()),
        FeatureType.DOUBLE_ARRAY: ArrayType(DoubleType()),
        FeatureType.BINARY_ARRAY: ArrayType(BinaryType()),
        FeatureType.DATE_ARRAY: ArrayType(DateType()),
        FeatureType.BOOLEAN_ARRAY: ArrayType(BooleanType()),
        FeatureType.TIMESTAMP_ARRAY: ArrayType(TimestampType()),
        # Previously missing although the Spark -> feature type mapping
        # produces it; without it the round trip fell through to "UNKNOWN".
        FeatureType.BYTE_ARRAY: ArrayType(ByteType()),
        # string-keyed map types
        FeatureType.STRING_STRING_MAP: MapType(StringType(), StringType()),
        FeatureType.STRING_INTEGER_MAP: MapType(StringType(), IntegerType()),
        FeatureType.STRING_SHORT_MAP: MapType(StringType(), ShortType()),
        FeatureType.STRING_LONG_MAP: MapType(StringType(), LongType()),
        FeatureType.STRING_FLOAT_MAP: MapType(StringType(), FloatType()),
        FeatureType.STRING_DOUBLE_MAP: MapType(StringType(), DoubleType()),
        FeatureType.STRING_DATE_MAP: MapType(StringType(), DateType()),
        FeatureType.STRING_TIMESTAMP_MAP: MapType(StringType(), TimestampType()),
        FeatureType.STRING_BOOLEAN_MAP: MapType(StringType(), BooleanType()),
        # Same round-trip gap as BYTE_ARRAY above.
        FeatureType.STRING_BYTE_MAP: MapType(StringType(), ByteType()),
        FeatureType.STRING_BINARY_MAP: MapType(StringType(), BinaryType()),
    }
    # NOTE(review): the fallback is a plain string, not a Spark type; it is
    # kept for backward compatibility — confirm callers handle it.
    return spark_types.get(feature_type_in, "UNKNOWN")
87179

88180

89181
def get_raw_data_source_schema(raw_feature_details: List[dict]):
@@ -94,6 +186,7 @@ def get_raw_data_source_schema(raw_feature_details: List[dict]):
94186
95187
Returns:
96188
StructType: Spark schema.
189+
:param raw_feature_details:
97190
"""
98191
# Initialize the schema
99192
features_schema = StructType()
@@ -113,3 +206,40 @@ def get_raw_data_source_schema(raw_feature_details: List[dict]):
113206
features_schema.add(feature_name, feature_type, is_nullable)
114207

115208
return features_schema
209+
210+
211+
def map_feature_type_to_pandas(feature_type):
    """Return the pandas dtype used to represent a feature type.

    :param feature_type: a ``FeatureType`` member or its string value
    :return: ``str`` for string features, otherwise a pandas dtype name
    :raises TypeError: if the feature type has no pandas mapping here
    """
    pandas_dtype_by_feature_type = {
        FeatureType.STRING: str,
        FeatureType.INTEGER: "int32",
        FeatureType.LONG: "int64",
        FeatureType.FLOAT: "float32",
        FeatureType.DOUBLE: "float64",
        FeatureType.BOOLEAN: "bool",
        FeatureType.TIMESTAMP: "datetime64[ns]",
        # DECIMAL and DATE values are kept as Python objects
        FeatureType.DECIMAL: "object",
        FeatureType.DATE: "object",
    }
    pandas_dtype = pandas_dtype_by_feature_type.get(FeatureType(feature_type))
    if pandas_dtype is None:
        raise TypeError(f"Feature Type {feature_type} is not supported for pandas")
    return pandas_dtype
228+
229+
230+
def convert_pandas_datatype_with_schema(
    raw_feature_details: List[dict], input_df: pd.DataFrame
):
    """Cast dataframe columns in place to the dtypes declared in the schema.

    Columns of ``input_df`` with no entry in ``raw_feature_details`` are left
    untouched. Nothing is returned; ``input_df`` itself is modified.

    :param raw_feature_details: feature descriptions; each dict is read for
        its ``"name"`` and ``"featureType"`` keys
    :param input_df: dataframe whose matching columns are converted
    """
    # Index the schema by feature name; a later duplicate name wins.
    details_by_name = {
        details.get("name"): details for details in raw_feature_details
    }
    for column in input_df.columns:
        if column not in details_by_name:
            continue
        target_dtype = map_feature_type_to_pandas(
            details_by_name[column].get("featureType")
        )
        converted = input_df[column].astype(target_dtype)
        # The mask comes from the not-yet-reassigned column, so entries that
        # were null before the cast are replaced with None afterwards.
        input_df[column] = converted.where(pd.notnull(input_df[column]), None)

ads/feature_store/common/utils/utility.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -159,9 +159,7 @@ def get_schema_from_pandas_df(df: pd.DataFrame):
159159
for order_number, field in enumerate(df.columns, start=1):
160160
details = {
161161
"name": field,
162-
"feature_type": FeatureType(
163-
map_pandas_type_to_feature_type(str(df[field].dtype))
164-
),
162+
"feature_type": map_pandas_type_to_feature_type(field, df[field]),
165163
"order_number": order_number,
166164
}
167165

@@ -176,7 +174,7 @@ def get_schema_from_spark_df(df: DataFrame):
176174
for order_number, field in enumerate(df.schema.fields, start=1):
177175
details = {
178176
"name": field.name,
179-
"feature_type": FeatureType(map_spark_type_to_feature_type(field.dataType)),
177+
"feature_type": map_spark_type_to_feature_type(field.dataType),
180178
"order_number": order_number,
181179
}
182180
schema_details.append(details)

ads/feature_store/execution_strategy/engine/spark_engine.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,7 @@ def get_columns_from_table(self, table_name: str):
138138
target_table_columns.append(
139139
{
140140
"name": field.name,
141-
"featureType": map_spark_type_to_feature_type(field.dataType),
141+
"featureType": map_spark_type_to_feature_type(field.dataType).value,
142142
}
143143
)
144144
return target_table_columns

ads/feature_store/execution_strategy/spark/spark_execution.py

Lines changed: 5 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
)
2929
from ads.feature_store.common.spark_session_singleton import SparkSessionSingleton
3030
from ads.feature_store.common.utils.feature_schema_mapper import (
31-
get_raw_data_source_schema,
31+
convert_pandas_datatype_with_schema,
3232
)
3333
from ads.feature_store.common.utils.transformation_utils import TransformationUtils
3434
from ads.feature_store.data_validation.great_expectation import ExpectationService
@@ -177,16 +177,9 @@ def _save_offline_dataframe(
177177
database = feature_group.entity_id
178178
self.spark_engine.create_database(database)
179179

180-
if data_frame is None:
181-
raw_schema = get_raw_data_source_schema(
182-
feature_group.input_feature_details
183-
)
184-
elif isinstance(data_frame, pd.DataFrame):
185-
raw_schema = self.spark_engine.convert_from_pandas_to_spark_dataframe(
186-
data_frame
187-
).schema
188-
else:
189-
raw_schema = data_frame.schema
180+
if isinstance(data_frame, pd.DataFrame):
181+
if not feature_group.is_infer_schema:
182+
convert_pandas_datatype_with_schema(feature_group.input_feature_details, data_frame)
190183

191184
# TODO: Get event timestamp column and apply filtering basis from and to timestamp
192185

@@ -223,7 +216,7 @@ def _save_offline_dataframe(
223216
target_table,
224217
feature_group.primary_keys,
225218
feature_group_job.ingestion_mode,
226-
raw_schema,
219+
featured_data.schema,
227220
feature_group_job.feature_option_details,
228221
)
229222

0 commit comments

Comments
 (0)