157 changes: 136 additions & 21 deletions synapseclient/models/mixins/table_components.py
Contributor:

Is the scope of this, for the upsert rows part, limited to when you use a DataFrame to directly upsert rows to the Synapse Table? Would upserting from a CSV be unaffected?

Member Author:

When upserting from a CSV we need to read it into a DataFrame so that we can do the comparison to find the cells of data which changed.
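For illustration, a toy sketch (hypothetical column names, not the PR's code) of why the CSV ends up in a DataFrame: the comparison needs per-cell access on both sides.

import io
import pandas as pd

# Hypothetical data: 'id' is the primary key, 'score' is a data column.
incoming = pd.read_csv(io.StringIO("id,score\n1,10\n2,20")).convert_dtypes()
stored = pd.DataFrame({"id": [1, 2], "score": [10, 25]}).convert_dtypes()

merged = incoming.merge(stored, on="id", suffixes=("_new", "_old"))
changed = merged[merged["score_new"] != merged["score_old"]]
print(changed)  # only id=2 differs, so only that cell would be upserted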

@@ -1756,7 +1756,6 @@ def _construct_partial_rows_for_upsert(
for col in primary_keys[1:]:
matching_conditions &= chunk_to_check_for_upsert[col] == getattr(row, col)
matching_row = chunk_to_check_for_upsert.loc[matching_conditions]

# Determines which cells need to be updated
for column in chunk_to_check_for_upsert.columns:
if len(matching_row[column].values) > 1:
@@ -1770,18 +1769,61 @@
column_id = entity.columns[column].id
column_type = entity.columns[column].column_type
cell_value = matching_row[column].values[0]
if not hasattr(row, column) or cell_value != getattr(row, column):
if (isinstance(cell_value, list) and len(cell_value) > 0) or not isna(
cell_value
):
partial_change_values[
column_id
] = _convert_pandas_row_to_python_types(
cell=cell_value, column_type=column_type

Contributor:

Do you know why this part is needed even after using .convert_dtypes()? Dan mentioned that the Synapse Tables upload results were OK after adding convert_dtypes() in her investigation. Is this because itertuples coerces values into native pandas dtypes, so we need the explicit handling of NAs below?

Member Author:

convert_dtypes doesn't handle arrays of values, and the comparison was running into issues when checking for differences in arrays that contain null values.
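To make that concrete, a minimal sketch of the failure mode (hypothetical values, assuming numpy and pandas):

import numpy as np
from pandas import isna

cell_value = np.array([1.0, np.nan, 3.0])  # an array cell from a *_LIST column
mask = isna(cell_value)                    # array([False,  True, False]), elementwise

# `if isna(cell_value):` raises ValueError (the truth value of an array with
# more than one element is ambiguous), and `cell_value != other` is likewise
# elementwise, hence the bool(.all()) / bool(.any()) reductions in the code below.
print(bool(mask.all()))                    # False: not every element is NA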

# Safely compare values, handling pandas NA and arrays
row_value = getattr(row, column) if hasattr(row, column) else None
values_differ = False

# Check whether the cell is NA (handles both scalars and arrays). Computed
# before the hasattr() branch so it is always defined when used further down.
try:
cell_is_na = isna(cell_value)
# If isna returns an array, check if all elements are NA
if hasattr(cell_is_na, "__iter__"):
# convert np.bool_ to bool
cell_is_na = bool(cell_is_na.all())
except (TypeError, ValueError):
cell_is_na = False

if not hasattr(row, column):
values_differ = True
else:

try:
row_is_na = isna(row_value)
# If isna returns an array, check if all elements are NA
if hasattr(row_is_na, "__iter__"):
# convert np.bool_ to bool
row_is_na = bool(row_is_na.all())
except (TypeError, ValueError):
row_is_na = False

if cell_is_na and row_is_na:
# Both are NA, no change needed
values_differ = False
elif cell_is_na or row_is_na:
# One is NA, the other is not
values_differ = True
else:
# Neither is NA, safe to compare
try:
values_differ = cell_value != row_value
# Handle array comparison result
if hasattr(values_differ, "__iter__"):
# convert np.bool_ to bool
values_differ = bool(values_differ.any())
except (TypeError, ValueError):
# If comparison fails, assume they differ
values_differ = True
if values_differ:
if (
isinstance(cell_value, list) and len(cell_value) > 0
) or not cell_is_na:
partial_change_values[column_id] = (
_convert_pandas_row_to_python_types(
cell=cell_value, column_type=column_type
)
)
else:
partial_change_values[column_id] = None

if partial_change_values:
partial_change = PartialRow(
row_id=row.ROW_ID,
@@ -1981,11 +2023,11 @@ async def _upsert_rows_async(
)

if isinstance(values, dict):
values = DataFrame(values)
values = DataFrame(values).convert_dtypes()
elif isinstance(values, str):
values = csv_to_pandas_df(filepath=values, **kwargs)
values = csv_to_pandas_df(filepath=values, **kwargs).convert_dtypes()
Contributor:

Is the plan to also add this everywhere, e.g. in the query function?

Member Author:

Yes, it should be added there I think. @danlu1, you could take the changes from here if you wanted.

Contributor:

I can take a closer look at the code. I think adding .convert_dtypes either here or right after read_csv in csv_to_pandas_df would work.
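For reference, a small sketch of what convert_dtypes changes (toy data, not from the PR):

import io
import pandas as pd

df = pd.read_csv(io.StringIO("a,b\n1,x\n,y"))
print(df.dtypes)        # a: float64 (the missing value forces float), b: object

df = df.convert_dtypes()
print(df.dtypes)        # a: Int64 (nullable), b: string
print(df["a"].iloc[1])  # <NA> rather than nan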

elif isinstance(values, DataFrame):
pass
values = values.convert_dtypes()
else:
raise ValueError(
"Don't know how to make tables from values of type %s." % type(values)
@@ -2747,18 +2789,38 @@ async def main():

date_columns = []
list_columns = []
list_column_types = {}
dtype = {}

if result.headers is not None:
for column in result.headers:
if column.column_type == "STRING":
# we want to identify string columns so that pandas doesn't try to
# automatically parse strings in a string column to other data types
if column.column_type in (
"STRING",
"LINK",
"MEDIUMTEXT",
"LARGETEXT",
"ENTITYID",
"SUBMISSIONID",
"EVALUATIONID",
"USERID",
"FILEHANDLEID",
):
# String-based columns (including text types and ID types) should be
# explicitly typed to prevent pandas from automatically converting
# values to other types (e.g., 'syn123' to numeric)
dtype[column.name] = str
elif column.column_type == "JSON":
# JSON columns are also stored as lists in the CSV and need to be
# parsed with json.loads
list_columns.append(column.name)
list_column_types[column.name] = column.column_type
elif column.column_type in LIST_COLUMN_TYPES:
list_columns.append(column.name)
list_column_types[column.name] = column.column_type
elif column.column_type == "DATE" and convert_to_datetime:
date_columns.append(column.name)
# Note: DOUBLE, INTEGER, and BOOLEAN types are handled by pandas'
# default type inference and do not need explicit dtype specifications

return csv_to_pandas_df(
filepath=csv_path,
@@ -2768,6 +2830,8 @@
row_id_and_version_in_index=False,
date_columns=date_columns if date_columns else None,
list_columns=list_columns if list_columns else None,
list_column_types=list_column_types if list_column_types else None,
dtype=dtype,
**kwargs,
)
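A toy illustration (not from the PR) of why the string-backed columns need an explicit dtype, using a leading-zero ID as the example:

import io
import pandas as pd

raw = "id\n0123"
print(pd.read_csv(io.StringIO(raw))["id"].iloc[0])                     # 123, coerced to int
print(pd.read_csv(io.StringIO(raw), dtype={"id": str})["id"].iloc[0])  # '0123', preserved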

@@ -3470,14 +3534,18 @@ async def main():

original_values = values
if isinstance(values, dict):
values = DataFrame(values)
values = DataFrame(values).convert_dtypes()
elif (
isinstance(values, str)
and schema_storage_strategy == SchemaStorageStrategy.INFER_FROM_DATA
):
values = csv_to_pandas_df(filepath=values, **(read_csv_kwargs or {}))
elif isinstance(values, DataFrame) or isinstance(values, str):
# We don't need to convert a DF, and CSVs will be uploaded as is
values = csv_to_pandas_df(
filepath=values, **(read_csv_kwargs or {})
).convert_dtypes()
elif isinstance(values, DataFrame):
values = values.convert_dtypes()
elif isinstance(values, str):
# CSVs will be uploaded as is
pass
else:
raise ValueError(
@@ -4350,6 +4418,7 @@ def csv_to_pandas_df(
lines_to_skip: int = 0,
date_columns: Optional[List[str]] = None,
list_columns: Optional[List[str]] = None,
list_column_types: Optional[Dict[str, str]] = None,
row_id_and_version_in_index: bool = True,
dtype: Optional[Dict[str, Any]] = None,
**kwargs,
@@ -4376,6 +4445,9 @@
it will be used instead of this `lines_to_skip` argument.
date_columns: The names of the date columns in the file
list_columns: The names of the list columns in the file
list_column_types: A dictionary mapping list column names to their Synapse
column types (e.g., 'INTEGER_LIST', 'USERID_LIST'). Used to
properly convert items within lists to their correct types.
row_id_and_version_in_index: Whether the file contains rowId and
version in the index. Defaults to `True`.
dtype: The data type for the file. Defaults to `None`.
@@ -4414,13 +4486,56 @@
# parse date columns if exists
if date_columns:
df = _convert_df_date_cols_to_datetime(df, date_columns)
# Turn list columns into lists
# Turn list columns into lists and convert items to their proper types
if list_columns:
for col in list_columns:
# Fill NA values with the string "[]"; it must be a string for json.loads to work
df.fillna({col: "[]"}, inplace=True)
df[col] = df[col].apply(json.loads)

# Convert list items to their proper types based on column type
if list_column_types and col in list_column_types:
column_type = list_column_types[col]
if column_type == "INTEGER_LIST":
# Convert items to int
df[col] = df[col].apply(
lambda x: (
[int(item) for item in x] if isinstance(x, list) else x
Contributor:

This might not work if the list has NAs. I ran into ValueError: cannot convert float NaN to integer when the NA is np.nan, and TypeError: int() argument must be a string, a bytes-like object or a real number, not 'NAType' when the NA is pd.NA.
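One possible NA-safe variant (a sketch, not the PR's code; it maps NA items to None rather than raising):

from pandas import isna

def to_int_list(x):
    # NA-safe INTEGER_LIST conversion: int(np.nan) and int(pd.NA) both raise,
    # so NA items are kept as None instead.
    if not isinstance(x, list):
        return x
    return [None if isna(item) else int(item) for item in x]

print(to_int_list([1.0, float("nan"), 3.0]))  # [1, None, 3]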

)
)
elif column_type == "USERID_LIST":
# USERID items should be strings
df[col] = df[col].apply(
lambda x: (
[str(item) for item in x] if isinstance(x, list) else x
)
)
elif column_type == "BOOLEAN_LIST":
# Convert items to bool
df[col] = df[col].apply(
lambda x: (
[bool(item) for item in x] if isinstance(x, list) else x
Contributor:

bool(np.nan) outputs True, and bool(pd.NA) errors out with TypeError: boolean value of NA is ambiguous.
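A matching NA-safe sketch for the boolean case (again, not the PR's code):

from pandas import isna

def to_bool_list(x):
    # bool(np.nan) silently yields True and bool(pd.NA) raises, so NA items
    # are mapped to None before calling bool().
    if not isinstance(x, list):
        return x
    return [None if isna(item) else bool(item) for item in x]

print(to_bool_list([True, float("nan"), 0]))  # [True, None, False]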

)
)
elif column_type == "DATE_LIST":
# Date items are already handled by json.loads as they come as numbers
pass
elif column_type == "ENTITYID_LIST":
# ENTITYID items should remain as strings
df[col] = df[col].apply(
lambda x: (
[str(item) for item in x] if isinstance(x, list) else x
)
)
elif column_type == "STRING_LIST":
# String items should remain as strings
df[col] = df[col].apply(
lambda x: (
[str(item) for item in x] if isinstance(x, list) else x
)
)
# JSON type doesn't need item conversion as it preserves types from json.loads

if (
row_id_and_version_in_index
and "ROW_ID" in df.columns