[GEN-2381] Pandas handling of nullable cells #1272
base: develop
Changes from all commits: 2efe8e3, bb9a5fd, 3abf87e, e3de13a
```diff
@@ -1756,7 +1756,6 @@ def _construct_partial_rows_for_upsert(
     for col in primary_keys[1:]:
         matching_conditions &= chunk_to_check_for_upsert[col] == getattr(row, col)
     matching_row = chunk_to_check_for_upsert.loc[matching_conditions]
-
     # Determines which cells need to be updated
     for column in chunk_to_check_for_upsert.columns:
         if len(matching_row[column].values) > 1:
```
```diff
@@ -1770,18 +1769,61 @@ def _construct_partial_rows_for_upsert(
     column_id = entity.columns[column].id
     column_type = entity.columns[column].column_type
     cell_value = matching_row[column].values[0]
-    if not hasattr(row, column) or cell_value != getattr(row, column):
-        if (isinstance(cell_value, list) and len(cell_value) > 0) or not isna(
-            cell_value
-        ):
-            partial_change_values[
-                column_id
-            ] = _convert_pandas_row_to_python_types(
-                cell=cell_value, column_type=column_type
-            )
+    # Safely compare values, handling pandas NA and arrays
+    row_value = getattr(row, column) if hasattr(row, column) else None
+    values_differ = False
+
+    if not hasattr(row, column):
+        values_differ = True
+    else:
+        # Helper to check if value is NA (handles both scalars and arrays)
+        try:
+            cell_is_na = isna(cell_value)
+            # If isna returns an array, check if all elements are NA
+            if hasattr(cell_is_na, "__iter__"):
+                # convert np.bool_ to bool
+                cell_is_na = bool(cell_is_na.all())
+        except (TypeError, ValueError):
+            cell_is_na = False
+
+        try:
+            row_is_na = isna(row_value)
+            # If isna returns an array, check if all elements are NA
+            if hasattr(row_is_na, "__iter__"):
+                # convert np.bool_ to bool
+                row_is_na = bool(row_is_na.all())
+        except (TypeError, ValueError):
+            row_is_na = False
+
+        if cell_is_na and row_is_na:
+            # Both are NA, no change needed
+            values_differ = False
+        elif cell_is_na or row_is_na:
+            # One is NA, the other is not
+            values_differ = True
+        else:
+            # Neither is NA, safe to compare
+            try:
+                values_differ = cell_value != row_value
+                # Handle array comparison result
+                if hasattr(values_differ, "__iter__"):
+                    # convert np.bool_ to bool
+                    values_differ = bool(values_differ.any())
+            except (TypeError, ValueError):
+                # If comparison fails, assume they differ
+                values_differ = True
+    if values_differ:
+        if (
+            isinstance(cell_value, list) and len(cell_value) > 0
+        ) or not cell_is_na:
+            partial_change_values[column_id] = (
+                _convert_pandas_row_to_python_types(
+                    cell=cell_value, column_type=column_type
+                )
+            )
+        else:
+            partial_change_values[column_id] = None

 if partial_change_values:
     partial_change = PartialRow(
         row_id=row.ROW_ID,
```

Contributor (on the new NA-handling block): Do you know why this part is needed even after using […]
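For context on why the new comparison block is so defensive: `pandas.isna` returns a scalar bool for scalars but an array of bools for list-likes, and pandas' `NA` raises on truthiness checks. A minimal sketch (illustrative, not part of the PR):

```python
# Illustrative only: the isna() behaviors the block above has to handle.
import numpy as np
import pandas as pd

print(pd.isna(3.14))           # False          (scalar bool)
print(pd.isna(pd.NA))          # True           (scalar bool)
print(pd.isna([1.0, np.nan]))  # [False  True]  (ndarray, not a bool)

# Branching on the array form raises, hence the .all()/.any() collapsing:
try:
    if pd.isna(np.array([1.0, np.nan])):
        pass
except ValueError as err:
    print(err)  # truth value of an array with more than one element is ambiguous

# Comparing against pd.NA propagates NA, so a bare `cell != row` check can
# also blow up in boolean context, which the try/except guards against:
try:
    bool(pd.NA != 5)
except TypeError as err:
    print(err)  # boolean value of NA is ambiguous
```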
```diff
@@ -1981,11 +2023,11 @@ async def _upsert_rows_async(
         )

     if isinstance(values, dict):
-        values = DataFrame(values)
+        values = DataFrame(values).convert_dtypes()
     elif isinstance(values, str):
-        values = csv_to_pandas_df(filepath=values, **kwargs)
+        values = csv_to_pandas_df(filepath=values, **kwargs).convert_dtypes()
     elif isinstance(values, DataFrame):
-        pass
+        values = values.convert_dtypes()
     else:
         raise ValueError(
             "Don't know how to make tables from values of type %s." % type(values)
```

Contributor: Is the plan to also add this everywhere, so in the query function […]

Member (Author): Yes, I think it should be added there. @danlu1, if you wanted to take the changes from here.

Contributor: I can take a closer look at the code. I think either adding `.convert_dtypes` here or right after `read_csv` in `csv_to_pandas_df` would work.
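For readers unfamiliar with `convert_dtypes()`: it re-infers each column to pandas' nullable extension dtypes, so missing values stay `pd.NA` instead of coercing integer or boolean columns to `float64`/`object`. A quick sketch (illustrative, not from the diff):

```python
# Illustrative only: the effect of convert_dtypes() on columns with NAs.
import pandas as pd

df = pd.DataFrame({"count": [1, 2, None], "flag": [True, None, False]})
print(df.dtypes.to_dict())
# {'count': dtype('float64'), 'flag': dtype('O')} -- NaN forced the coercion

converted = df.convert_dtypes()
print(converted.dtypes.to_dict())
# {'count': Int64Dtype(), 'flag': BooleanDtype()} -- nullable dtypes
print(converted["count"].iloc[2])  # <NA>, not nan
```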
```diff
@@ -2747,18 +2789,38 @@ async def main():

     date_columns = []
     list_columns = []
+    list_column_types = {}
     dtype = {}

     if result.headers is not None:
         for column in result.headers:
-            if column.column_type == "STRING":
-                # we want to identify string columns so that pandas doesn't try to
-                # automatically parse strings in a string column to other data types
+            if column.column_type in (
+                "STRING",
+                "LINK",
+                "MEDIUMTEXT",
+                "LARGETEXT",
+                "ENTITYID",
+                "SUBMISSIONID",
+                "EVALUATIONID",
+                "USERID",
+                "FILEHANDLEID",
+            ):
+                # String-based columns (including text types and ID types) should be
+                # explicitly typed to prevent pandas from automatically converting
+                # values to other types (e.g., 'syn123' to numeric)
                 dtype[column.name] = str
+            elif column.column_type == "JSON":
+                # JSON columns are also stored as lists in the CSV and need to be
+                # parsed with json.loads
+                list_columns.append(column.name)
+                list_column_types[column.name] = column.column_type
             elif column.column_type in LIST_COLUMN_TYPES:
                 list_columns.append(column.name)
+                list_column_types[column.name] = column.column_type
             elif column.column_type == "DATE" and convert_to_datetime:
                 date_columns.append(column.name)
+            # Note: DOUBLE, INTEGER, and BOOLEAN types are handled by pandas'
+            # default type inference and do not need explicit dtype specifications

     return csv_to_pandas_df(
         filepath=csv_path,
```
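As a side note on why the explicit `dtype` map matters (illustrative, with a made-up column): without it, `read_csv` infers types per column, so an ID column that happens to contain only digits silently becomes numeric:

```python
# Illustrative only: type inference vs. an explicit dtype=str for ID columns.
import io
import pandas as pd

csv_text = "USERID\n3401922\n3409573\n"
inferred = pd.read_csv(io.StringIO(csv_text))
typed = pd.read_csv(io.StringIO(csv_text), dtype={"USERID": str})

print(inferred["USERID"].dtype)  # int64  -- IDs mangled into numbers
print(typed["USERID"].dtype)     # object -- kept as strings, as intended
```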
```diff
@@ -2768,6 +2830,8 @@ async def main():
         row_id_and_version_in_index=False,
         date_columns=date_columns if date_columns else None,
         list_columns=list_columns if list_columns else None,
+        list_column_types=list_column_types if list_column_types else None,
+        dtype=dtype,
         **kwargs,
     )
```
```diff
@@ -3470,14 +3534,18 @@ async def main():

     original_values = values
     if isinstance(values, dict):
-        values = DataFrame(values)
+        values = DataFrame(values).convert_dtypes()
     elif (
         isinstance(values, str)
         and schema_storage_strategy == SchemaStorageStrategy.INFER_FROM_DATA
     ):
-        values = csv_to_pandas_df(filepath=values, **(read_csv_kwargs or {}))
-    elif isinstance(values, DataFrame) or isinstance(values, str):
-        # We don't need to convert a DF, and CSVs will be uploaded as is
-        pass
+        values = csv_to_pandas_df(
+            filepath=values, **(read_csv_kwargs or {})
+        ).convert_dtypes()
+    elif isinstance(values, DataFrame):
+        values = values.convert_dtypes()
+    elif isinstance(values, str):
+        # CSVs will be uploaded as is
+        pass
     else:
         raise ValueError(
```
```diff
@@ -4350,6 +4418,7 @@ def csv_to_pandas_df(
     lines_to_skip: int = 0,
     date_columns: Optional[List[str]] = None,
     list_columns: Optional[List[str]] = None,
+    list_column_types: Optional[Dict[str, str]] = None,
     row_id_and_version_in_index: bool = True,
     dtype: Optional[Dict[str, Any]] = None,
     **kwargs,
```
```diff
@@ -4376,6 +4445,9 @@ def csv_to_pandas_df(
             it will be used instead of this `lines_to_skip` argument.
         date_columns: The names of the date columns in the file
         list_columns: The names of the list columns in the file
+        list_column_types: A dictionary mapping list column names to their Synapse
+            column types (e.g., 'INTEGER_LIST', 'USERID_LIST'). Used to
+            properly convert items within lists to their correct types.
         row_id_and_version_in_index: Whether the file contains rowId and
             version in the index, Defaults to `True`.
         dtype: The data type for the file, Defaults to `None`.
```
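A usage sketch of the extended signature (the file path and column names here are hypothetical, invented for illustration):

```python
# Hypothetical call against the csv_to_pandas_df signature shown above.
df = csv_to_pandas_df(
    filepath="query_results.csv",
    list_columns=["scores", "reviewers"],
    list_column_types={"scores": "INTEGER_LIST", "reviewers": "USERID_LIST"},
    row_id_and_version_in_index=False,
)
```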
```diff
@@ -4414,13 +4486,56 @@ def csv_to_pandas_df(
     # parse date columns if exists
     if date_columns:
         df = _convert_df_date_cols_to_datetime(df, date_columns)
-    # Turn list columns into lists
+    # Turn list columns into lists and convert items to their proper types
     if list_columns:
         for col in list_columns:
             # Fill NA values with empty lists, it must be a string for json.loads to work
             df.fillna({col: "[]"}, inplace=True)
             df[col] = df[col].apply(json.loads)
+
+            # Convert list items to their proper types based on column type
+            if list_column_types and col in list_column_types:
+                column_type = list_column_types[col]
+                if column_type == "INTEGER_LIST":
+                    # Convert items to int
+                    df[col] = df[col].apply(
+                        lambda x: (
+                            [int(item) for item in x] if isinstance(x, list) else x
+                        )
+                    )
+                elif column_type == "USERID_LIST":
+                    # USERID items should be strings
+                    df[col] = df[col].apply(
+                        lambda x: (
+                            [str(item) for item in x] if isinstance(x, list) else x
+                        )
+                    )
+                elif column_type == "BOOLEAN_LIST":
+                    # Convert items to bool
+                    df[col] = df[col].apply(
+                        lambda x: (
+                            [bool(item) for item in x] if isinstance(x, list) else x
+                        )
+                    )
+                elif column_type == "DATE_LIST":
+                    # Date items are already handled by json.loads as they come as numbers
+                    pass
+                elif column_type == "ENTITYID_LIST":
+                    # ENTITYID items should remain as strings
+                    df[col] = df[col].apply(
+                        lambda x: (
+                            [str(item) for item in x] if isinstance(x, list) else x
+                        )
+                    )
+                elif column_type == "STRING_LIST":
+                    # String items should remain as strings
+                    df[col] = df[col].apply(
+                        lambda x: (
+                            [str(item) for item in x] if isinstance(x, list) else x
+                        )
+                    )
+                # JSON type doesn't need item conversion as it preserves types from json.loads

     if (
         row_id_and_version_in_index
         and "ROW_ID" in df.columns
```

Contributor (on the `int(item)` cast for `INTEGER_LIST`): This might not work if the list has NAs. I ran into […]

Contributor (on the `bool(item)` cast for `BOOLEAN_LIST`): `bool(np.nan)` outputs `True`, and `bool(pd.NA)` errors out.
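Both review comments are easy to reproduce, and an NA-aware cast is one possible fix. A sketch (illustrative; `cast_list_items` is a hypothetical helper, not part of this PR):

```python
# Illustrative only: why per-item int()/bool() casts are fragile with NAs,
# plus a hypothetical NA-aware variant.
import numpy as np
import pandas as pd

print(bool(np.nan))          # True -- silently wrong for a BOOLEAN_LIST
try:
    bool(pd.NA)
except TypeError as err:
    print(err)               # boolean value of NA is ambiguous
try:
    int(float("nan"))
except ValueError as err:
    print(err)               # cannot convert float NaN to integer

def cast_list_items(value, cast):
    """Hypothetical fix: keep NA items as None instead of casting them."""
    if not isinstance(value, list):
        return value
    return [None if pd.isna(item) else cast(item) for item in value]

print(cast_list_items([1.0, np.nan, 3.0], int))  # [1, None, 3]
```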
Contributor: Is the scope of this, for the upsert rows part, just limited to when you want to use a DataFrame to directly upsert rows to the Synapse table? Would upserting from a CSV be unaffected?

Author: When trying to upsert from a CSV, we need to read it into a DataFrame in order for us to do the comparison that finds the cells of data which changed.
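Schematically, then, all three input forms converge on a nullable-dtype DataFrame before the cell-level diff. A sketch under stated assumptions (`normalize_for_upsert` is a hypothetical name, and `pd.read_csv` stands in for the client's CSV reader):

```python
# Schematic sketch of the behavior described above, not the client's actual API.
import pandas as pd

def normalize_for_upsert(values) -> pd.DataFrame:
    """Coerce dict / CSV path / DataFrame input to a nullable-dtype DataFrame."""
    if isinstance(values, dict):
        return pd.DataFrame(values).convert_dtypes()
    if isinstance(values, str):
        # CSV path: read into a DataFrame so changed cells can be detected
        return pd.read_csv(values).convert_dtypes()
    if isinstance(values, pd.DataFrame):
        return values.convert_dtypes()
    raise ValueError(f"Don't know how to make tables from values of type {type(values)}.")
```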