diff --git a/synapseclient/models/mixins/table_components.py b/synapseclient/models/mixins/table_components.py
index 3bb9237ce..9c5fd9ef2 100644
--- a/synapseclient/models/mixins/table_components.py
+++ b/synapseclient/models/mixins/table_components.py
@@ -1756,7 +1756,6 @@ def _construct_partial_rows_for_upsert(
         for col in primary_keys[1:]:
             matching_conditions &= chunk_to_check_for_upsert[col] == getattr(row, col)
         matching_row = chunk_to_check_for_upsert.loc[matching_conditions]
-        # Determines which cells need to be updated
 
         for column in chunk_to_check_for_upsert.columns:
             if len(matching_row[column].values) > 1:
@@ -1770,18 +1769,61 @@ def _construct_partial_rows_for_upsert(
             column_id = entity.columns[column].id
             column_type = entity.columns[column].column_type
             cell_value = matching_row[column].values[0]
-            if not hasattr(row, column) or cell_value != getattr(row, column):
-                if (isinstance(cell_value, list) and len(cell_value) > 0) or not isna(
-                    cell_value
-                ):
-                    partial_change_values[
-                        column_id
-                    ] = _convert_pandas_row_to_python_types(
-                        cell=cell_value, column_type=column_type
+
+            # Safely compare values, handling pandas NA and arrays
+            row_value = getattr(row, column) if hasattr(row, column) else None
+            values_differ = False
+
+            # Check whether the cell value is NA (handles both scalars and arrays);
+            # computed up front because it is also needed below when the row lacks
+            # the column entirely
+            try:
+                cell_is_na = isna(cell_value)
+                # If isna returns an array, check if all elements are NA
+                if hasattr(cell_is_na, "__iter__"):
+                    # convert np.bool_ to bool
+                    cell_is_na = bool(cell_is_na.all())
+            except (TypeError, ValueError):
+                cell_is_na = False
+
+            if not hasattr(row, column):
+                values_differ = True
+            else:
+                try:
+                    row_is_na = isna(row_value)
+                    # If isna returns an array, check if all elements are NA
+                    if hasattr(row_is_na, "__iter__"):
+                        # convert np.bool_ to bool
+                        row_is_na = bool(row_is_na.all())
+                except (TypeError, ValueError):
+                    row_is_na = False
+
+                if cell_is_na and row_is_na:
+                    # Both are NA, no change needed
+                    values_differ = False
+                elif cell_is_na or row_is_na:
+                    # One is NA, the other is not
+                    values_differ = True
+                else:
+                    # Neither is NA, safe to compare
+                    try:
+                        values_differ = cell_value != row_value
+                        # Handle array comparison result
+                        if hasattr(values_differ, "__iter__"):
+                            # convert np.bool_ to bool
+                            values_differ = bool(values_differ.any())
+                    except (TypeError, ValueError):
+                        # If comparison fails, assume they differ
+                        values_differ = True
+
+            if values_differ:
+                if (
+                    isinstance(cell_value, list) and len(cell_value) > 0
+                ) or not cell_is_na:
+                    partial_change_values[column_id] = (
+                        _convert_pandas_row_to_python_types(
+                            cell=cell_value, column_type=column_type
+                        )
                     )
                 else:
                     partial_change_values[column_id] = None
-
         if partial_change_values:
             partial_change = PartialRow(
                 row_id=row.ROW_ID,
@@ -1981,11 +2023,11 @@ async def _upsert_rows_async(
         )
 
     if isinstance(values, dict):
-        values = DataFrame(values)
+        values = DataFrame(values).convert_dtypes()
     elif isinstance(values, str):
-        values = csv_to_pandas_df(filepath=values, **kwargs)
+        values = csv_to_pandas_df(filepath=values, **kwargs).convert_dtypes()
     elif isinstance(values, DataFrame):
-        pass
+        values = values.convert_dtypes()
     else:
         raise ValueError(
             "Don't know how to make tables from values of type %s."
             % type(values)
@@ -2747,18 +2789,38 @@ async def main():
     date_columns = []
     list_columns = []
+    list_column_types = {}
     dtype = {}
     if result.headers is not None:
         for column in result.headers:
-            if column.column_type == "STRING":
-                # we want to identify string columns so that pandas doesn't try to
-                # automatically parse strings in a string column to other data types
+            if column.column_type in (
+                "STRING",
+                "LINK",
+                "MEDIUMTEXT",
+                "LARGETEXT",
+                "ENTITYID",
+                "SUBMISSIONID",
+                "EVALUATIONID",
+                "USERID",
+                "FILEHANDLEID",
+            ):
+                # String-based columns (including text types and ID types) should be
+                # explicitly typed to prevent pandas from automatically converting
+                # values to other types (e.g., 'syn123' to numeric)
                 dtype[column.name] = str
+            elif column.column_type == "JSON":
+                # JSON columns are also stored as lists in the CSV and need to be
+                # parsed with json.loads
+                list_columns.append(column.name)
+                list_column_types[column.name] = column.column_type
             elif column.column_type in LIST_COLUMN_TYPES:
                 list_columns.append(column.name)
+                list_column_types[column.name] = column.column_type
             elif column.column_type == "DATE" and convert_to_datetime:
                 date_columns.append(column.name)
+    # Note: DOUBLE, INTEGER, and BOOLEAN types are handled by pandas'
+    # default type inference and do not need explicit dtype specifications
 
     return csv_to_pandas_df(
         filepath=csv_path,
@@ -2768,6 +2830,8 @@ async def main():
         row_id_and_version_in_index=False,
         date_columns=date_columns if date_columns else None,
         list_columns=list_columns if list_columns else None,
+        list_column_types=list_column_types if list_column_types else None,
+        dtype=dtype,
         **kwargs,
     )
@@ -3470,14 +3534,18 @@ async def main():
         original_values = values
 
         if isinstance(values, dict):
-            values = DataFrame(values)
+            values = DataFrame(values).convert_dtypes()
         elif (
            isinstance(values, str)
            and schema_storage_strategy == SchemaStorageStrategy.INFER_FROM_DATA
         ):
-            values = csv_to_pandas_df(filepath=values, **(read_csv_kwargs or {}))
-        elif isinstance(values, DataFrame) or isinstance(values, str):
-            # We don't need to convert a DF, and CSVs will be uploaded as is
+            values = csv_to_pandas_df(
+                filepath=values, **(read_csv_kwargs or {})
+            ).convert_dtypes()
+        elif isinstance(values, DataFrame):
+            values = values.convert_dtypes()
+        elif isinstance(values, str):
+            # CSVs will be uploaded as is
             pass
         else:
             raise ValueError(
@@ -4350,6 +4418,7 @@ def csv_to_pandas_df(
     lines_to_skip: int = 0,
     date_columns: Optional[List[str]] = None,
     list_columns: Optional[List[str]] = None,
+    list_column_types: Optional[Dict[str, str]] = None,
     row_id_and_version_in_index: bool = True,
     dtype: Optional[Dict[str, Any]] = None,
     **kwargs,
@@ -4376,6 +4445,9 @@
             it will be used instead of this `lines_to_skip` argument.
         date_columns: The names of the date columns in the file
         list_columns: The names of the list columns in the file
+        list_column_types: A dictionary mapping list column names to their Synapse
+            column types (e.g., 'INTEGER_LIST', 'USERID_LIST'). Used to
+            properly convert items within lists to their correct types.
         row_id_and_version_in_index: Whether the file contains rowId and version
             in the index, Defaults to `True`.
         dtype: The data type for the file, Defaults to `None`.
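For context on the dtype mapping built above: without an explicit str dtype, pandas will coerce ID-like values while reading a CSV. A minimal standalone sketch of the pitfall (hypothetical column names and values, not part of the patch):

    import io
    import pandas as pd

    csv_data = "column_userid,column_double\n3350396,1.5\n,2.5\n"

    # Without a dtype hint, the empty cell makes pandas infer float64 for the
    # whole column, silently turning the ID 3350396 into 3350396.0.
    inferred = pd.read_csv(io.StringIO(csv_data))
    print(inferred["column_userid"].tolist())  # [3350396.0, nan]

    # With dtype=str (what the patch now passes for string-based Synapse
    # columns) the ID survives verbatim; the missing cell stays NaN.
    typed = pd.read_csv(io.StringIO(csv_data), dtype={"column_userid": str})
    print(typed["column_userid"].tolist())  # ['3350396', nan]
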
@@ -4414,13 +4486,56 @@ def csv_to_pandas_df(
     # parse date columns if exists
     if date_columns:
         df = _convert_df_date_cols_to_datetime(df, date_columns)
-    # Turn list columns into lists
+    # Turn list columns into lists and convert items to their proper types
     if list_columns:
         for col in list_columns:
             # Fill NA values with empty lists, it must be a string for json.loads to work
             df.fillna({col: "[]"}, inplace=True)
             df[col] = df[col].apply(json.loads)
+            # Convert list items to their proper types based on column type
+            if list_column_types and col in list_column_types:
+                column_type = list_column_types[col]
+                if column_type == "INTEGER_LIST":
+                    # Convert items to int
+                    df[col] = df[col].apply(
+                        lambda x: (
+                            [int(item) for item in x] if isinstance(x, list) else x
+                        )
+                    )
+                elif column_type == "USERID_LIST":
+                    # USERID items should be strings
+                    df[col] = df[col].apply(
+                        lambda x: (
+                            [str(item) for item in x] if isinstance(x, list) else x
+                        )
+                    )
+                elif column_type == "BOOLEAN_LIST":
+                    # Convert items to bool
+                    df[col] = df[col].apply(
+                        lambda x: (
+                            [bool(item) for item in x] if isinstance(x, list) else x
+                        )
+                    )
+                elif column_type == "DATE_LIST":
+                    # Date items are already handled by json.loads as they come as numbers
+                    pass
+                elif column_type == "ENTITYID_LIST":
+                    # ENTITYID items should remain as strings
+                    df[col] = df[col].apply(
+                        lambda x: (
+                            [str(item) for item in x] if isinstance(x, list) else x
+                        )
+                    )
+                elif column_type == "STRING_LIST":
+                    # String items should remain as strings
+                    df[col] = df[col].apply(
+                        lambda x: (
+                            [str(item) for item in x] if isinstance(x, list) else x
+                        )
+                    )
+                # JSON type doesn't need item conversion as it preserves types from json.loads
+
     if (
         row_id_and_version_in_index
         and "ROW_ID" in df.columns
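The list handling above round-trips JSON-encoded CSV cells. A short sketch of why the per-type item pass matters, using hypothetical values (json.loads yields ints, while USERID values are treated as strings elsewhere in this codebase):

    import json
    import pandas as pd

    df = pd.DataFrame({"column_user_id_list": ["[3350396, 3350397]", None]})

    # NA cells must become the string "[]" before json.loads can parse them.
    df.fillna({"column_user_id_list": "[]"}, inplace=True)
    df["column_user_id_list"] = df["column_user_id_list"].apply(json.loads)

    # json.loads produced lists of ints; a USERID_LIST column then gets a
    # str() pass over its items, mirroring the branch added above.
    df["column_user_id_list"] = df["column_user_id_list"].apply(
        lambda x: [str(item) for item in x] if isinstance(x, list) else x
    )
    print(df["column_user_id_list"].tolist())  # [['3350396', '3350397'], []]
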
diff --git a/tests/integration/synapseclient/models/async/test_table_async.py b/tests/integration/synapseclient/models/async/test_table_async.py
index 5416f1836..83624cf05 100644
--- a/tests/integration/synapseclient/models/async/test_table_async.py
+++ b/tests/integration/synapseclient/models/async/test_table_async.py
@@ -1293,9 +1293,7 @@ async def test_upsert_with_large_data_and_batching(
         # AND multiple batch jobs should have been created due to batching settings
         assert spy_send_job.call_count == 7  # More batches due to small size settings
 
-    async def test_upsert_all_data_types(
-        self, mocker: MockerFixture, project_model: Project
-    ) -> None:
+    async def test_upsert_all_data_types(self, project_model: Project) -> None:
        """Test upserting all supported data types to ensure type compatibility."""
         # GIVEN a table in Synapse with all data types
         table_name = str(uuid.uuid4())
@@ -1351,60 +1349,60 @@ async def test_upsert_all_data_types(
             evaluation, file.id, name="Submission 1", submitterAlias="My Team"
         )
 
-        # GIVEN initial data with all data types
+        # GIVEN initial data with all data types, including random null values
         initial_data = pd.DataFrame(
             {
                 # Basic types
                 "column_string": ["value1", "value2", "value3"],
-                "column_double": [1.1, 2.2, 3.3],
-                "column_integer": [1, 2, 3],
-                "column_boolean": [True, True, True],
+                "column_double": [1.1, None, 2.2],
+                "column_integer": [1, None, 3],
+                "column_boolean": [True, None, True],
                 "column_date": [
                     utils.to_unix_epoch_time("2021-01-01"),
-                    utils.to_unix_epoch_time("2021-01-02"),
+                    None,
                     utils.to_unix_epoch_time("2021-01-03"),
                 ],
                 # Reference types
                 "column_filehandleid": [
                     file.file_handle.id,
-                    file.file_handle.id,
+                    None,
                     file.file_handle.id,
                 ],
-                "column_entityid": [file.id, file.id, file.id],
+                "column_entityid": [file.id, None, file.id],
                 "column_submissionid": [
                     submission.id,
-                    submission.id,
+                    None,
                     submission.id,
                 ],
                 "column_evaluationid": [
                     evaluation.id,
-                    evaluation.id,
+                    None,
                     evaluation.id,
                 ],
                 # Text types
                 "column_link": [
                     "https://www.synapse.org/Profile:",
-                    "https://www.synapse.org/Profile:",
+                    None,
                     "https://www.synapse.org/Profile:",
                 ],
-                "column_mediumtext": ["value1", "value2", "value3"],
-                "column_largetext": ["value1", "value2", "value3"],
+                "column_mediumtext": ["value1", None, "value3"],
+                "column_largetext": ["value1", None, "value3"],
                 # User IDs
                 "column_userid": [
                     self.syn.credentials.owner_id,
-                    self.syn.credentials.owner_id,
+                    None,
                     self.syn.credentials.owner_id,
                 ],
                 # List types
                 "column_string_LIST": [
                     ["value1", "value2"],
-                    ["value3", "value4"],
+                    None,
                     ["value5", "value6"],
                 ],
-                "column_integer_LIST": [[1, 2], [3, 4], [5, 6]],
+                "column_integer_LIST": [[1, 2], None, [5, 6]],
                 "column_boolean_LIST": [
                     [True, False],
-                    [True, False],
+                    None,
                     [True, False],
                 ],
                 "column_date_LIST": [
@@ -1412,10 +1410,7 @@ async def test_upsert_all_data_types(
                     [
                         utils.to_unix_epoch_time("2021-01-01"),
                         utils.to_unix_epoch_time("2021-01-02"),
                     ],
-                    [
-                        utils.to_unix_epoch_time("2021-01-03"),
-                        utils.to_unix_epoch_time("2021-01-04"),
-                    ],
+                    None,
                     [
                         utils.to_unix_epoch_time("2021-01-05"),
                         utils.to_unix_epoch_time("2021-01-06"),
@@ -1423,18 +1418,18 @@ async def test_upsert_all_data_types(
                 ],
                 "column_entity_id_list": [
                     [file.id, file.id],
-                    [file.id, file.id],
+                    None,
                     [file.id, file.id],
                 ],
                 "column_user_id_list": [
                     [self.syn.credentials.owner_id, self.syn.credentials.owner_id],
-                    [self.syn.credentials.owner_id, self.syn.credentials.owner_id],
+                    None,
                     [self.syn.credentials.owner_id, self.syn.credentials.owner_id],
                 ],
                 # JSON type
                 "column_json": [
                     {"key1": "value1", "key2": 2},
-                    {"key3": "value3", "key4": 4},
+                    None,
                     {"key5": "value5", "key6": 6},
                 ],
             }
@@ -1447,6 +1442,123 @@ async def test_upsert_all_data_types(
             synapse_client=self.syn,
         )
 
+        # THEN verify the initial data was stored correctly
+        results_after_insert = await query_async(
+            f"SELECT * FROM {table.id}",
+            synapse_client=self.syn,
+            include_row_id_and_row_version=False,
+        )
+
+        # Verify data types and values match for all columns
+        assert len(results_after_insert) == 3
+
+        # Row 0 - all non-null values
+        assert results_after_insert["column_string"][0] == "value1"
+        assert results_after_insert["column_double"][0] == 1.1
+        assert results_after_insert["column_integer"][0] == 1
+        assert results_after_insert["column_boolean"][0] is True
+        assert results_after_insert["column_date"][0] == utils.to_unix_epoch_time(
+            "2021-01-01"
+        )
+        assert results_after_insert["column_filehandleid"][0] == file.file_handle.id
+        assert results_after_insert["column_entityid"][0] == file.id
+        assert results_after_insert["column_submissionid"][0] == submission.id
+        assert results_after_insert["column_evaluationid"][0] == evaluation.id
+        assert (
+            results_after_insert["column_link"][0]
+            == "https://www.synapse.org/Profile:"
+        )
+        assert results_after_insert["column_mediumtext"][0] == "value1"
+        assert results_after_insert["column_largetext"][0] == "value1"
+        assert (
+            results_after_insert["column_userid"][0]
+            == self.syn.credentials.owner_id
+        )
+        assert results_after_insert["column_string_LIST"][0] == ["value1", "value2"]
+        assert results_after_insert["column_integer_LIST"][0] == [1, 2]
+        assert results_after_insert["column_boolean_LIST"][0] == [True, False]
+        assert results_after_insert["column_date_LIST"][0] == [
+            utils.to_unix_epoch_time("2021-01-01"),
+            utils.to_unix_epoch_time("2021-01-02"),
+        ]
+        assert results_after_insert["column_entity_id_list"][0] == [
+            file.id,
+            file.id,
+        ]
+        assert results_after_insert["column_user_id_list"][0] == [
+            self.syn.credentials.owner_id,
+            self.syn.credentials.owner_id,
+        ]
+        assert results_after_insert["column_json"][0] == {
+            "key1": "value1",
+            "key2": 2,
+        }
+
+        assert results_after_insert["column_string"][1] == "value2"
+
+        # Row 1 - all null values
+        assert pd.isna(results_after_insert["column_double"][1])
+        assert pd.isna(results_after_insert["column_integer"][1])
+        assert pd.isna(results_after_insert["column_boolean"][1])
+        assert pd.isna(results_after_insert["column_date"][1])
+        assert pd.isna(results_after_insert["column_filehandleid"][1])
+        assert pd.isna(results_after_insert["column_entityid"][1])
+        assert pd.isna(results_after_insert["column_submissionid"][1])
+        assert pd.isna(results_after_insert["column_evaluationid"][1])
+        assert pd.isna(results_after_insert["column_link"][1])
+        assert pd.isna(results_after_insert["column_mediumtext"][1])
+        assert pd.isna(results_after_insert["column_largetext"][1])
+        assert pd.isna(results_after_insert["column_userid"][1])
+        assert len(results_after_insert["column_string_LIST"][1]) == 0
+        assert len(results_after_insert["column_integer_LIST"][1]) == 0
+        assert len(results_after_insert["column_boolean_LIST"][1]) == 0
+        assert len(results_after_insert["column_date_LIST"][1]) == 0
+        assert len(results_after_insert["column_entity_id_list"][1]) == 0
+        assert len(results_after_insert["column_user_id_list"][1]) == 0
+        assert len(results_after_insert["column_json"][1]) == 0
+
+        # Row 2 - all non-null values
+        assert results_after_insert["column_string"][2] == "value3"
+        assert results_after_insert["column_double"][2] == 2.2
+        assert results_after_insert["column_integer"][2] == 3
+        assert results_after_insert["column_boolean"][2] is True
+        assert results_after_insert["column_date"][2] == utils.to_unix_epoch_time(
+            "2021-01-03"
+        )
+        assert results_after_insert["column_filehandleid"][2] == file.file_handle.id
+        assert results_after_insert["column_entityid"][2] == file.id
+        assert results_after_insert["column_submissionid"][2] == submission.id
+        assert results_after_insert["column_evaluationid"][2] == evaluation.id
+        assert (
+            results_after_insert["column_link"][2]
+            == "https://www.synapse.org/Profile:"
+        )
+        assert results_after_insert["column_mediumtext"][2] == "value3"
+        assert results_after_insert["column_largetext"][2] == "value3"
+        assert (
+            results_after_insert["column_userid"][2]
+            == self.syn.credentials.owner_id
+        )
+        assert results_after_insert["column_string_LIST"][2] == ["value5", "value6"]
+        assert results_after_insert["column_integer_LIST"][2] == [5, 6]
+        assert results_after_insert["column_boolean_LIST"][2] == [True, False]
+        assert results_after_insert["column_date_LIST"][2] == [
+            utils.to_unix_epoch_time("2021-01-05"),
+            utils.to_unix_epoch_time("2021-01-06"),
+        ]
+        assert results_after_insert["column_entity_id_list"][2] == [
+            file.id,
+            file.id,
+        ]
+        assert results_after_insert["column_user_id_list"][2] == [
+            self.syn.credentials.owner_id,
+            self.syn.credentials.owner_id,
+        ]
+        assert results_after_insert["column_json"][2] == {
+            "key5": "value5",
+            "key6": 6,
+        }
+
         # Create a second test file to update references
         path2 = utils.make_bogus_data_file()
         self.schedule_for_cleanup(path2)
@@ -1454,60 +1566,60 @@ async def test_upsert_all_data_types(
         file2 = File(parent_id=project_model.id, path=path2).store(
             synapse_client=self.syn
         )
 
-        # WHEN I upsert with updated data for all types
+        # WHEN I upsert with updated data for all types, including null values
         updated_data = pd.DataFrame(
             {
                 # Basic types with updated values
                 "column_string": ["value1", "value2", "value3"],
-                "column_double": [11.2, 22.3, 33.4],
-                "column_integer": [11, 22, 33],
-                "column_boolean": [False, False, False],
+                "column_double": [11.2, None, 33.4],
+                "column_integer": [11, None, 33],
+                "column_boolean": [False, None, False],
                 "column_date": [
                     utils.to_unix_epoch_time("2022-01-01"),
-                    utils.to_unix_epoch_time("2022-01-02"),
+                    None,
                     utils.to_unix_epoch_time("2022-01-03"),
                 ],
                 # Updated references
                 "column_filehandleid": [
-                    int(file2.file_handle.id),
-                    int(file2.file_handle.id),
-                    int(file2.file_handle.id),
+                    file2.file_handle.id,
+                    None,
+                    file2.file_handle.id,
                 ],
-                "column_entityid": [file2.id, file2.id, file2.id],
+                "column_entityid": [file2.id, None, file2.id],
                 "column_submissionid": [
-                    int(submission.id),
-                    int(submission.id),
-                    int(submission.id),
+                    submission.id,
+                    None,
+                    submission.id,
                 ],
                 "column_evaluationid": [
-                    int(evaluation.id),
-                    int(evaluation.id),
-                    int(evaluation.id),
+                    evaluation.id,
+                    None,
+                    evaluation.id,
                 ],
                 # Updated text
                 "column_link": [
                     "https://www.synapse.org/",
-                    "https://www.synapse.org/",
+                    None,
                     "https://www.synapse.org/",
                 ],
-                "column_mediumtext": ["value11", "value22", "value33"],
-                "column_largetext": ["value11", "value22", "value33"],
+                "column_mediumtext": ["value11", None, "value33"],
+                "column_largetext": ["value11", None, "value33"],
                 # User IDs
                 "column_userid": [
-                    int(self.syn.credentials.owner_id),
-                    int(self.syn.credentials.owner_id),
-                    int(self.syn.credentials.owner_id),
+                    self.syn.credentials.owner_id,
+                    None,
+                    self.syn.credentials.owner_id,
                 ],
                 # Updated list types
                 "column_string_LIST": [
                     ["value11", "value22"],
-                    ["value33", "value44"],
+                    None,
                     ["value55", "value66"],
                 ],
-                "column_integer_LIST": [[11, 22], [33, 44], [55, 66]],
+                "column_integer_LIST": [[11, 22], None, [55, 66]],
                 "column_boolean_LIST": [
                     [False, True],
-                    [False, True],
+                    None,
                     [False, True],
                 ],
                 "column_date_LIST": [
@@ -1515,10 +1627,7 @@ async def test_upsert_all_data_types(
                     [
                         utils.to_unix_epoch_time("2022-01-01"),
                         utils.to_unix_epoch_time("2022-01-02"),
                     ],
-                    [
-                        utils.to_unix_epoch_time("2022-01-03"),
-                        utils.to_unix_epoch_time("2022-01-04"),
-                    ],
+                    None,
                     [
                         utils.to_unix_epoch_time("2022-01-05"),
                         utils.to_unix_epoch_time("2022-01-06"),
@@ -1526,27 +1635,24 @@ async def test_upsert_all_data_types(
                 ],
                 "column_entity_id_list": [
                     [file2.id, file2.id],
-                    [file2.id, file2.id],
+                    None,
                     [file2.id, file2.id],
                 ],
                 "column_user_id_list": [
                     [
-                        int(self.syn.credentials.owner_id),
-                        int(self.syn.credentials.owner_id),
-                    ],
-                    [
-                        int(self.syn.credentials.owner_id),
-                        int(self.syn.credentials.owner_id),
+                        self.syn.credentials.owner_id,
+                        self.syn.credentials.owner_id,
                     ],
+                    None,
                     [
-                        int(self.syn.credentials.owner_id),
-                        int(self.syn.credentials.owner_id),
+                        self.syn.credentials.owner_id,
+                        self.syn.credentials.owner_id,
                     ],
                 ],
                 # JSON
                 "column_json": [
                     json.dumps({"key11": "value11", "key22": 22}),
-                    json.dumps({"key33": "value33", "key44": 44}),
+                    None,
                     json.dumps({"key55": "value55", "key66": 66}),
                 ],
             }
@@ -1566,13 +1672,59 @@ async def test_upsert_all_data_types(
             include_row_id_and_row_version=False,
         )
 
-        # Check that all values were updated correctly
-        # Convert to JSON for easy comparison
-        original_as_string = updated_data.to_json()
-        modified_as_string = results.to_json()
-        assert original_as_string == modified_as_string
+        # Verify the upserted data matches expected values and handles nulls correctly
+        assert len(results) == 3
+
+        # Check string column (primary key)
+        assert results["column_string"][0] == "value1"
+        assert results["column_string"][1] == "value2"
+        assert results["column_string"][2] == "value3"
+
+        # Check numeric types with null
+        assert results["column_double"][0] == 11.2
+        assert pd.isna(results["column_double"][1])
+        assert results["column_double"][2] == 33.4
+
+        assert results["column_integer"][0] == 11
+        assert pd.isna(results["column_integer"][1])
+        assert results["column_integer"][2] == 33
+
+        # Check boolean with null
+        assert results["column_boolean"][0] is False
+        assert pd.isna(results["column_boolean"][1])
+        assert results["column_boolean"][2] is False
+
+        # Check date with null
+        assert results["column_date"][0] == utils.to_unix_epoch_time("2022-01-01")
+        assert pd.isna(results["column_date"][1])
+        assert results["column_date"][2] == utils.to_unix_epoch_time("2022-01-03")
+
+        # Check reference types with nulls
+        assert results["column_filehandleid"][0] == file2.file_handle.id
+        assert pd.isna(results["column_filehandleid"][1])
+
+        assert results["column_entityid"][0] == file2.id
+        assert pd.isna(results["column_entityid"][1])
+
+        # Check text types with nulls
+        assert results["column_mediumtext"][0] == "value11"
+        assert pd.isna(results["column_mediumtext"][1])
+        assert results["column_mediumtext"][2] == "value33"
+
+        # Check list types with nulls
+        assert results["column_string_LIST"][0] == ["value11", "value22"]
+        assert len(results["column_string_LIST"][1]) == 0
+        assert results["column_string_LIST"][2] == ["value55", "value66"]
 
-        # WHEN I upsert with multiple primary keys
+        assert results["column_integer_LIST"][0] == [11, 22]
+        assert len(results["column_integer_LIST"][1]) == 0
+
+        # Check JSON with null
+        assert results["column_json"][0] == {"key11": "value11", "key22": 22}
+        assert len(results["column_json"][1]) == 0
+        assert results["column_json"][2] == {"key55": "value55", "key66": 66}
+
+        # WHEN I upsert with multiple primary keys and null values
         multi_key_data = pd.DataFrame(
             {
                 # Just using a subset of columns for this test case
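The multi-key upsert exercised in the next hunk reduces to a chained boolean mask over the primary key columns, mirroring the matching loop in _construct_partial_rows_for_upsert. A sketch of the idea with made-up data:

    import pandas as pd

    df = pd.DataFrame(
        {"column_string": ["this", "is", "updated"], "column_double": [1.1, 2.2, 3.3]}
    )
    primary_keys = ["column_string", "column_double"]
    target = {"column_string": "is", "column_double": 2.2}

    # AND together one equality mask per primary key, then locate the row.
    mask = df[primary_keys[0]] == target[primary_keys[0]]
    for col in primary_keys[1:]:
        mask &= df[col] == target[col]
    print(df.loc[mask])  # exactly one row matches on both keys
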
@@ -1586,43 +1738,43 @@ async def test_upsert_all_data_types(
                     utils.to_unix_epoch_time("2021-01-03"),
                 ],
                 "column_filehandleid": [
-                    int(file.file_handle.id),
-                    int(file.file_handle.id),
-                    int(file.file_handle.id),
+                    file.file_handle.id,
+                    None,
+                    file.file_handle.id,
                 ],
-                "column_entityid": [file.id, file.id, file.id],
+                "column_entityid": [file.id, None, file.id],
                 "column_submissionid": [
-                    int(submission.id),
-                    int(submission.id),
-                    int(submission.id),
+                    submission.id,
+                    None,
+                    submission.id,
                 ],
                 "column_evaluationid": [
-                    int(evaluation.id),
-                    int(evaluation.id),
-                    int(evaluation.id),
+                    evaluation.id,
+                    None,
+                    evaluation.id,
                 ],
                 "column_link": [
                     "https://www.synapse.org/",
-                    "https://www.synapse.org/",
+                    None,
                     "https://www.synapse.org/",
                 ],
-                "column_mediumtext": ["updated1", "updated2", "updated3"],
-                "column_largetext": ["largetext1", "largetext2", "largetext3"],
+                "column_mediumtext": ["updated1", None, "updated3"],
+                "column_largetext": ["largetext1", None, "largetext3"],
                 "column_userid": [
-                    int(self.syn.credentials.owner_id),
-                    int(self.syn.credentials.owner_id),
-                    int(self.syn.credentials.owner_id),
+                    self.syn.credentials.owner_id,
+                    None,
+                    self.syn.credentials.owner_id,
                 ],
                 # Simplified list data
                 "column_string_LIST": [
                     ["a", "b"],
-                    ["c", "d"],
+                    None,
                     ["e", "f"],
                 ],
-                "column_integer_LIST": [[9, 8], [7, 6], [5, 4]],
+                "column_integer_LIST": [[9, 8], None, [5, 4]],
                 "column_boolean_LIST": [
                     [True, True],
-                    [True, True],
+                    None,
                     [True, True],
                 ],
                 "column_date_LIST": [
@@ -1630,10 +1782,7 @@ async def test_upsert_all_data_types(
                     [
                         utils.to_unix_epoch_time("2023-01-01"),
                         utils.to_unix_epoch_time("2023-01-02"),
                     ],
-                    [
-                        utils.to_unix_epoch_time("2023-01-03"),
-                        utils.to_unix_epoch_time("2023-01-04"),
-                    ],
+                    None,
                     [
                         utils.to_unix_epoch_time("2023-01-05"),
                         utils.to_unix_epoch_time("2023-01-06"),
@@ -1641,26 +1790,23 @@ async def test_upsert_all_data_types(
                 ],
                 "column_entity_id_list": [
                     [file.id, file.id],
-                    [file.id, file.id],
+                    None,
                     [file.id, file.id],
                 ],
                 "column_user_id_list": [
                     [
-                        int(self.syn.credentials.owner_id),
-                        int(self.syn.credentials.owner_id),
+                        self.syn.credentials.owner_id,
+                        self.syn.credentials.owner_id,
                     ],
+                    None,
                     [
-                        int(self.syn.credentials.owner_id),
-                        int(self.syn.credentials.owner_id),
-                    ],
-                    [
-                        int(self.syn.credentials.owner_id),
-                        int(self.syn.credentials.owner_id),
+                        self.syn.credentials.owner_id,
+                        self.syn.credentials.owner_id,
                     ],
                 ],
                 "column_json": [
                     json.dumps({"final1": "value1"}),
-                    json.dumps({"final2": "value2"}),
+                    None,
                     json.dumps({"final3": "value3"}),
                 ],
             }
@@ -1690,6 +1836,57 @@ async def test_upsert_all_data_types(
 
         # We should have more rows now (original 3 + 3 new ones)
         assert len(results_after_multi_key) == 6
 
+        # Verify that null values are properly handled in the newly inserted rows
+        # Find the rows with the new string values
+        new_rows = results_after_multi_key[
+            results_after_multi_key["column_string"].isin(["this", "is", "updated"])
+        ]
+        assert len(new_rows) == 3
+
+        for _, row in new_rows.iterrows():
+            if row["column_string"] == "this":
+                assert row["column_double"] == 1.1
+                assert row["column_integer"] == 1
+                assert row["column_boolean"] is True
+                assert row["column_date"] == utils.to_unix_epoch_time("2021-01-01")
+                assert row["column_filehandleid"] == file.file_handle.id
+                assert row["column_entityid"] == file.id
+                assert row["column_mediumtext"] == "updated1"
+                assert row["column_largetext"] == "largetext1"
+                assert row["column_userid"] == self.syn.credentials.owner_id
+                assert row["column_string_LIST"] == ["a", "b"]
+                assert row["column_integer_LIST"] == [9, 8]
+                assert row["column_boolean_LIST"] == [True, True]
+                assert row["column_date_LIST"] == [
+                    utils.to_unix_epoch_time("2023-01-01"),
+                    utils.to_unix_epoch_time("2023-01-02"),
+                ]
+                assert row["column_json"] == {"final1": "value1"}
+            elif row["column_string"] == "is":
+                assert row["column_double"] == 2.2
+                assert row["column_integer"] == 2
+                assert row["column_boolean"] is True
+                assert row["column_date"] == utils.to_unix_epoch_time("2021-01-02")
+                assert pd.isna(row["column_filehandleid"])
+                assert pd.isna(row["column_entityid"])
+                assert pd.isna(row["column_mediumtext"])
+                assert pd.isna(row["column_largetext"])
+                assert pd.isna(row["column_userid"])
+                assert len(row["column_string_LIST"]) == 0
+                assert len(row["column_integer_LIST"]) == 0
+                assert len(row["column_boolean_LIST"]) == 0
+                assert len(row["column_date_LIST"]) == 0
+                assert len(row["column_json"]) == 0
+            elif row["column_string"] == "updated":
+                assert row["column_double"] == 3.3
+                assert row["column_integer"] == 3
+                assert row["column_boolean"] is True
+                assert row["column_date"] == utils.to_unix_epoch_time("2021-01-03")
+                assert row["column_filehandleid"] == file.file_handle.id
+                assert row["column_entityid"] == file.id
+                assert row["column_mediumtext"] == "updated3"
+                assert row["column_largetext"] == "largetext3"
+
     finally:
         # Clean up
         self.syn.delete(evaluation)
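Both integration tests lean on None surviving in integer and boolean columns. With NumPy-backed dtypes, a None coerces an int column to float64 (NaN) and a bool column to object, which is why the patch calls convert_dtypes() before upserting. A quick illustration:

    import pandas as pd

    df = pd.DataFrame(
        {"column_integer": [1, None, 3], "column_boolean": [True, None, True]}
    )
    print(df.dtypes.tolist())  # [dtype('float64'), dtype('O')]

    # convert_dtypes() switches to pandas' nullable extension dtypes, so the
    # ints stay ints and the missing entries become pd.NA instead of nan.
    nullable = df.convert_dtypes()
    print(nullable.dtypes.tolist())  # [Int64Dtype(), BooleanDtype()]
    print(nullable["column_integer"][1])  # <NA>
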
diff --git a/tests/integration/synapseclient/models/synchronous/test_table.py b/tests/integration/synapseclient/models/synchronous/test_table.py
index 1ba4b88b5..cc2dd26af 100644
--- a/tests/integration/synapseclient/models/synchronous/test_table.py
+++ b/tests/integration/synapseclient/models/synchronous/test_table.py
@@ -1239,9 +1239,7 @@ async def test_upsert_with_large_data_and_batching(
         # AND multiple batch jobs should have been created due to batching settings
         assert spy_send_job.call_count == 7  # More batches due to small size settings
 
-    async def test_upsert_all_data_types(
-        self, mocker: MockerFixture, project_model: Project
-    ) -> None:
+    def test_upsert_all_data_types(self, project_model: Project) -> None:
        """Test upserting all supported data types to ensure type compatibility."""
         # GIVEN a table in Synapse with all data types
         table_name = str(uuid.uuid4())
@@ -1297,60 +1295,60 @@ async def test_upsert_all_data_types(
             evaluation, file.id, name="Submission 1", submitterAlias="My Team"
         )
 
-        # GIVEN initial data with all data types
+        # GIVEN initial data with all data types, including random null values
         initial_data = pd.DataFrame(
             {
                 # Basic types
                 "column_string": ["value1", "value2", "value3"],
-                "column_double": [1.1, 2.2, 3.3],
-                "column_integer": [1, 2, 3],
-                "column_boolean": [True, True, True],
+                "column_double": [1.1, None, 2.2],
+                "column_integer": [1, None, 3],
+                "column_boolean": [True, None, True],
                 "column_date": [
                     utils.to_unix_epoch_time("2021-01-01"),
-                    utils.to_unix_epoch_time("2021-01-02"),
+                    None,
                     utils.to_unix_epoch_time("2021-01-03"),
                 ],
                 # Reference types
                 "column_filehandleid": [
                     file.file_handle.id,
-                    file.file_handle.id,
+                    None,
                     file.file_handle.id,
                 ],
-                "column_entityid": [file.id, file.id, file.id],
+                "column_entityid": [file.id, None, file.id],
                 "column_submissionid": [
                     submission.id,
-                    submission.id,
+                    None,
                     submission.id,
                 ],
                 "column_evaluationid": [
                     evaluation.id,
-                    evaluation.id,
+                    None,
                     evaluation.id,
                 ],
                 # Text types
                 "column_link": [
                     "https://www.synapse.org/Profile:",
-                    "https://www.synapse.org/Profile:",
+                    None,
                     "https://www.synapse.org/Profile:",
                 ],
-                "column_mediumtext": ["value1", "value2", "value3"],
-                "column_largetext": ["value1", "value2", "value3"],
+                "column_mediumtext": ["value1", None, "value3"],
+                "column_largetext": ["value1", None, "value3"],
                 # User IDs
                 "column_userid": [
                     self.syn.credentials.owner_id,
-                    self.syn.credentials.owner_id,
+                    None,
                     self.syn.credentials.owner_id,
                 ],
                 # List types
                 "column_string_LIST": [
                     ["value1", "value2"],
-                    ["value3", "value4"],
+                    None,
                     ["value5", "value6"],
                 ],
-                "column_integer_LIST": [[1, 2], [3, 4], [5, 6]],
+                "column_integer_LIST": [[1, 2], None, [5, 6]],
                 "column_boolean_LIST": [
                     [True, False],
-                    [True, False],
+                    None,
                     [True, False],
                 ],
                 "column_date_LIST": [
@@ -1358,10 +1356,7 @@ async def test_upsert_all_data_types(
                     [
                         utils.to_unix_epoch_time("2021-01-01"),
                         utils.to_unix_epoch_time("2021-01-02"),
                     ],
-                    [
-                        utils.to_unix_epoch_time("2021-01-03"),
-                        utils.to_unix_epoch_time("2021-01-04"),
-                    ],
+                    None,
                     [
                         utils.to_unix_epoch_time("2021-01-05"),
                         utils.to_unix_epoch_time("2021-01-06"),
@@ -1369,18 +1364,18 @@ async def test_upsert_all_data_types(
                 ],
                 "column_entity_id_list": [
                     [file.id, file.id],
-                    [file.id, file.id],
+                    None,
                     [file.id, file.id],
                 ],
                 "column_user_id_list": [
                     [self.syn.credentials.owner_id, self.syn.credentials.owner_id],
-                    [self.syn.credentials.owner_id, self.syn.credentials.owner_id],
+                    None,
                     [self.syn.credentials.owner_id, self.syn.credentials.owner_id],
                 ],
                 # JSON type
                 "column_json": [
                     {"key1": "value1", "key2": 2},
-                    {"key3": "value3", "key4": 4},
+                    None,
                     {"key5": "value5", "key6": 6},
                 ],
             }
@@ -1393,67 +1388,184 @@ async def test_upsert_all_data_types(
             synapse_client=self.syn,
         )
 
+        # THEN verify the initial data was stored correctly
+        results_after_insert = query(
+            f"SELECT * FROM {table.id}",
+            synapse_client=self.syn,
+            include_row_id_and_row_version=False,
+        )
+
+        # Verify data types and values match for all columns
+        assert len(results_after_insert) == 3
+
+        # Row 0 - all non-null values
+        assert results_after_insert["column_string"][0] == "value1"
+        assert results_after_insert["column_double"][0] == 1.1
+        assert results_after_insert["column_integer"][0] == 1
+        assert results_after_insert["column_boolean"][0] is True
+        assert results_after_insert["column_date"][0] == utils.to_unix_epoch_time(
+            "2021-01-01"
+        )
+        assert results_after_insert["column_filehandleid"][0] == file.file_handle.id
+        assert results_after_insert["column_entityid"][0] == file.id
+        assert results_after_insert["column_submissionid"][0] == submission.id
+        assert results_after_insert["column_evaluationid"][0] == evaluation.id
+        assert (
+            results_after_insert["column_link"][0]
+            == "https://www.synapse.org/Profile:"
+        )
+        assert results_after_insert["column_mediumtext"][0] == "value1"
+        assert results_after_insert["column_largetext"][0] == "value1"
+        assert (
+            results_after_insert["column_userid"][0]
+            == self.syn.credentials.owner_id
+        )
+        assert results_after_insert["column_string_LIST"][0] == ["value1", "value2"]
+        assert results_after_insert["column_integer_LIST"][0] == [1, 2]
+        assert results_after_insert["column_boolean_LIST"][0] == [True, False]
+        assert results_after_insert["column_date_LIST"][0] == [
+            utils.to_unix_epoch_time("2021-01-01"),
+            utils.to_unix_epoch_time("2021-01-02"),
+        ]
+        assert results_after_insert["column_entity_id_list"][0] == [
+            file.id,
+            file.id,
+        ]
+        assert results_after_insert["column_user_id_list"][0] == [
+            self.syn.credentials.owner_id,
+            self.syn.credentials.owner_id,
+        ]
+        assert results_after_insert["column_json"][0] == {
+            "key1": "value1",
+            "key2": 2,
+        }
+
+        assert results_after_insert["column_string"][1] == "value2"
+
+        # Row 1 - all null values
+        assert pd.isna(results_after_insert["column_double"][1])
+        assert pd.isna(results_after_insert["column_integer"][1])
+        assert pd.isna(results_after_insert["column_boolean"][1])
+        assert pd.isna(results_after_insert["column_date"][1])
+        assert pd.isna(results_after_insert["column_filehandleid"][1])
+        assert pd.isna(results_after_insert["column_entityid"][1])
+        assert pd.isna(results_after_insert["column_submissionid"][1])
+        assert pd.isna(results_after_insert["column_evaluationid"][1])
+        assert pd.isna(results_after_insert["column_link"][1])
+        assert pd.isna(results_after_insert["column_mediumtext"][1])
+        assert pd.isna(results_after_insert["column_largetext"][1])
+        assert pd.isna(results_after_insert["column_userid"][1])
+        assert len(results_after_insert["column_string_LIST"][1]) == 0
+        assert len(results_after_insert["column_integer_LIST"][1]) == 0
+        assert len(results_after_insert["column_boolean_LIST"][1]) == 0
+        assert len(results_after_insert["column_date_LIST"][1]) == 0
+        assert len(results_after_insert["column_entity_id_list"][1]) == 0
+        assert len(results_after_insert["column_user_id_list"][1]) == 0
+        assert len(results_after_insert["column_json"][1]) == 0
+
+        # Row 2 - all non-null values
+        assert results_after_insert["column_string"][2] == "value3"
+        assert results_after_insert["column_double"][2] == 2.2
+        assert results_after_insert["column_integer"][2] == 3
+        assert results_after_insert["column_boolean"][2] is True
+        assert results_after_insert["column_date"][2] == utils.to_unix_epoch_time(
+            "2021-01-03"
+        )
+        assert results_after_insert["column_filehandleid"][2] == file.file_handle.id
+        assert results_after_insert["column_entityid"][2] == file.id
+        assert results_after_insert["column_submissionid"][2] == submission.id
+        assert results_after_insert["column_evaluationid"][2] == evaluation.id
+        assert (
+            results_after_insert["column_link"][2]
+            == "https://www.synapse.org/Profile:"
+        )
+        assert results_after_insert["column_mediumtext"][2] == "value3"
+        assert results_after_insert["column_largetext"][2] == "value3"
+        assert (
+            results_after_insert["column_userid"][2]
+            == self.syn.credentials.owner_id
+        )
+        assert results_after_insert["column_string_LIST"][2] == ["value5", "value6"]
+        assert results_after_insert["column_integer_LIST"][2] == [5, 6]
+        assert results_after_insert["column_boolean_LIST"][2] == [True, False]
+        assert results_after_insert["column_date_LIST"][2] == [
+            utils.to_unix_epoch_time("2021-01-05"),
+            utils.to_unix_epoch_time("2021-01-06"),
+        ]
+        assert results_after_insert["column_entity_id_list"][2] == [
+            file.id,
+            file.id,
+        ]
+        assert results_after_insert["column_user_id_list"][2] == [
+            self.syn.credentials.owner_id,
+            self.syn.credentials.owner_id,
+        ]
+        assert results_after_insert["column_json"][2] == {
+            "key5": "value5",
+            "key6": 6,
+        }
+
         # Create a second test file to update references
         path2 = utils.make_bogus_data_file()
         self.schedule_for_cleanup(path2)
-        file2 = File(parent_id=project_model.id, path=path2).store(
+        file2: File = File(parent_id=project_model.id, path=path2).store(
             synapse_client=self.syn
         )
 
-        # WHEN I upsert with updated data for all types
+        # WHEN I upsert with updated data for all types, including null values
         updated_data = pd.DataFrame(
             {
                 # Basic types with updated values
                 "column_string": ["value1", "value2", "value3"],
-                "column_double": [11.2, 22.3, 33.4],
-                "column_integer": [11, 22, 33],
-                "column_boolean": [False, False, False],
+                "column_double": [11.2, None, 33.4],
+                "column_integer": [11, None, 33],
+                "column_boolean": [False, None, False],
                 "column_date": [
                     utils.to_unix_epoch_time("2022-01-01"),
-                    utils.to_unix_epoch_time("2022-01-02"),
+                    None,
                     utils.to_unix_epoch_time("2022-01-03"),
                 ],
                 # Updated references
                 "column_filehandleid": [
-                    int(file2.file_handle.id),
-                    int(file2.file_handle.id),
-                    int(file2.file_handle.id),
+                    file2.file_handle.id,
+                    None,
+                    file2.file_handle.id,
                 ],
-                "column_entityid": [file2.id, file2.id, file2.id],
+                "column_entityid": [file2.id, None, file2.id],
                 "column_submissionid": [
-                    int(submission.id),
-                    int(submission.id),
-                    int(submission.id),
+                    submission.id,
+                    None,
+                    submission.id,
                 ],
                 "column_evaluationid": [
-                    int(evaluation.id),
-                    int(evaluation.id),
-                    int(evaluation.id),
+                    evaluation.id,
+                    None,
+                    evaluation.id,
                 ],
                 # Updated text
                 "column_link": [
                     "https://www.synapse.org/",
-                    "https://www.synapse.org/",
+                    None,
                     "https://www.synapse.org/",
                 ],
-                "column_mediumtext": ["value11", "value22", "value33"],
-                "column_largetext": ["value11", "value22", "value33"],
+                "column_mediumtext": ["value11", None, "value33"],
+                "column_largetext": ["value11", None, "value33"],
                 # User IDs
                 "column_userid": [
-                    int(self.syn.credentials.owner_id),
-                    int(self.syn.credentials.owner_id),
-                    int(self.syn.credentials.owner_id),
+                    self.syn.credentials.owner_id,
+                    None,
+                    self.syn.credentials.owner_id,
                 ],
                 # Updated list types
                 "column_string_LIST": [
                     ["value11", "value22"],
-                    ["value33", "value44"],
+                    None,
                     ["value55", "value66"],
                 ],
-                "column_integer_LIST": [[11, 22], [33, 44], [55, 66]],
+                "column_integer_LIST": [[11, 22], None, [55, 66]],
                 "column_boolean_LIST": [
                     [False, True],
-                    [False, True],
+                    None,
                     [False, True],
                 ],
                 "column_date_LIST": [
@@ -1461,10 +1573,7 @@ async def test_upsert_all_data_types(
                     [
                         utils.to_unix_epoch_time("2022-01-01"),
                         utils.to_unix_epoch_time("2022-01-02"),
                     ],
-                    [
-                        utils.to_unix_epoch_time("2022-01-03"),
-                        utils.to_unix_epoch_time("2022-01-04"),
-                    ],
+                    None,
                     [
                         utils.to_unix_epoch_time("2022-01-05"),
                         utils.to_unix_epoch_time("2022-01-06"),
@@ -1472,27 +1581,24 @@ async def test_upsert_all_data_types(
                 ],
                 "column_entity_id_list": [
                     [file2.id, file2.id],
-                    [file2.id, file2.id],
+                    None,
                     [file2.id, file2.id],
                 ],
                 "column_user_id_list": [
                     [
-                        int(self.syn.credentials.owner_id),
-                        int(self.syn.credentials.owner_id),
-                    ],
-                    [
-                        int(self.syn.credentials.owner_id),
-                        int(self.syn.credentials.owner_id),
+                        self.syn.credentials.owner_id,
+                        self.syn.credentials.owner_id,
                     ],
+                    None,
                     [
-                        int(self.syn.credentials.owner_id),
-                        int(self.syn.credentials.owner_id),
+                        self.syn.credentials.owner_id,
+                        self.syn.credentials.owner_id,
                     ],
                 ],
                 # JSON
                 "column_json": [
                     json.dumps({"key11": "value11", "key22": 22}),
-                    json.dumps({"key33": "value33", "key44": 44}),
+                    None,
                     json.dumps({"key55": "value55", "key66": 66}),
                 ],
             }
@@ -1512,13 +1618,59 @@ async def test_upsert_all_data_types(
             include_row_id_and_row_version=False,
         )
 
-        # Check that all values were updated correctly
-        # Convert to JSON for easy comparison
-        original_as_string = updated_data.to_json()
-        modified_as_string = results.to_json()
-        assert original_as_string == modified_as_string
+        # Verify the upserted data matches expected values and handles nulls correctly
+        assert len(results) == 3
+
+        # Check string column (primary key)
+        assert results["column_string"][0] == "value1"
+        assert results["column_string"][1] == "value2"
+        assert results["column_string"][2] == "value3"
+
+        # Check numeric types with null
+        assert results["column_double"][0] == 11.2
+        assert pd.isna(results["column_double"][1])
+        assert results["column_double"][2] == 33.4
+
+        assert results["column_integer"][0] == 11
+        assert pd.isna(results["column_integer"][1])
+        assert results["column_integer"][2] == 33
+
+        # Check boolean with null
+        assert results["column_boolean"][0] is False
+        assert pd.isna(results["column_boolean"][1])
+        assert results["column_boolean"][2] is False
+
+        # Check date with null
+        assert results["column_date"][0] == utils.to_unix_epoch_time("2022-01-01")
+        assert pd.isna(results["column_date"][1])
+        assert results["column_date"][2] == utils.to_unix_epoch_time("2022-01-03")
+
+        # Check reference types with nulls
+        assert results["column_filehandleid"][0] == file2.file_handle.id
+        assert pd.isna(results["column_filehandleid"][1])
+
+        assert results["column_entityid"][0] == file2.id
+        assert pd.isna(results["column_entityid"][1])
+
+        # Check text types with nulls
+        assert results["column_mediumtext"][0] == "value11"
+        assert pd.isna(results["column_mediumtext"][1])
+        assert results["column_mediumtext"][2] == "value33"
+
+        # Check list types with nulls
+        assert results["column_string_LIST"][0] == ["value11", "value22"]
+        assert len(results["column_string_LIST"][1]) == 0
+        assert results["column_string_LIST"][2] == ["value55", "value66"]
 
-        # WHEN I upsert with multiple primary keys
+        assert results["column_integer_LIST"][0] == [11, 22]
+        assert len(results["column_integer_LIST"][1]) == 0
+
+        # Check JSON with null
+        assert results["column_json"][0] == {"key11": "value11", "key22": 22}
+        assert len(results["column_json"][1]) == 0
+        assert results["column_json"][2] == {"key55": "value55", "key66": 66}
+
+        # WHEN I upsert with multiple primary keys and null values
         multi_key_data = pd.DataFrame(
             {
                 # Just using a subset of columns for this test case
@@ -1532,43 +1684,43 @@ async def test_upsert_all_data_types(
                     utils.to_unix_epoch_time("2021-01-03"),
                 ],
                 "column_filehandleid": [
-                    int(file.file_handle.id),
-                    int(file.file_handle.id),
-                    int(file.file_handle.id),
+                    file.file_handle.id,
+                    None,
+                    file.file_handle.id,
                 ],
-                "column_entityid": [file.id, file.id, file.id],
+                "column_entityid": [file.id, None, file.id],
                 "column_submissionid": [
-                    int(submission.id),
-                    int(submission.id),
-                    int(submission.id),
+                    submission.id,
+                    None,
+                    submission.id,
                 ],
                 "column_evaluationid": [
-                    int(evaluation.id),
-                    int(evaluation.id),
-                    int(evaluation.id),
+                    evaluation.id,
+                    None,
+                    evaluation.id,
                 ],
                 "column_link": [
                     "https://www.synapse.org/",
-                    "https://www.synapse.org/",
+                    None,
                     "https://www.synapse.org/",
                 ],
-                "column_mediumtext": ["updated1", "updated2", "updated3"],
-                "column_largetext": ["largetext1", "largetext2", "largetext3"],
+                "column_mediumtext": ["updated1", None, "updated3"],
+                "column_largetext": ["largetext1", None, "largetext3"],
                 "column_userid": [
-                    int(self.syn.credentials.owner_id),
-                    int(self.syn.credentials.owner_id),
-                    int(self.syn.credentials.owner_id),
+                    self.syn.credentials.owner_id,
+                    None,
+                    self.syn.credentials.owner_id,
                 ],
                 # Simplified list data
                 "column_string_LIST": [
                     ["a", "b"],
-                    ["c", "d"],
+                    None,
                     ["e", "f"],
                 ],
-                "column_integer_LIST": [[9, 8], [7, 6], [5, 4]],
+                "column_integer_LIST": [[9, 8], None, [5, 4]],
                 "column_boolean_LIST": [
                     [True, True],
-                    [True, True],
+                    None,
                     [True, True],
                 ],
                 "column_date_LIST": [
@@ -1576,10 +1728,7 @@ async def test_upsert_all_data_types(
                     [
                         utils.to_unix_epoch_time("2023-01-01"),
                         utils.to_unix_epoch_time("2023-01-02"),
                     ],
-                    [
-                        utils.to_unix_epoch_time("2023-01-03"),
-                        utils.to_unix_epoch_time("2023-01-04"),
-                    ],
+                    None,
                     [
                         utils.to_unix_epoch_time("2023-01-05"),
                         utils.to_unix_epoch_time("2023-01-06"),
@@ -1587,26 +1736,23 @@ async def test_upsert_all_data_types(
                 ],
                 "column_entity_id_list": [
                     [file.id, file.id],
-                    [file.id, file.id],
+                    None,
                     [file.id, file.id],
                 ],
                 "column_user_id_list": [
                     [
-                        int(self.syn.credentials.owner_id),
-                        int(self.syn.credentials.owner_id),
+                        self.syn.credentials.owner_id,
+                        self.syn.credentials.owner_id,
                     ],
+                    None,
                     [
-                        int(self.syn.credentials.owner_id),
-                        int(self.syn.credentials.owner_id),
-                    ],
-                    [
-                        int(self.syn.credentials.owner_id),
-                        int(self.syn.credentials.owner_id),
+                        self.syn.credentials.owner_id,
+                        self.syn.credentials.owner_id,
                     ],
                 ],
                 "column_json": [
                     json.dumps({"final1": "value1"}),
-                    json.dumps({"final2": "value2"}),
+                    None,
                     json.dumps({"final3": "value3"}),
                 ],
             }
@@ -1636,6 +1782,57 @@ async def test_upsert_all_data_types(
 
         # We should have more rows now (original 3 + 3 new ones)
         assert len(results_after_multi_key) == 6
 
+        # Verify that null values are properly handled in the newly inserted rows
+        # Find the rows with the new string values
+        new_rows = results_after_multi_key[
+            results_after_multi_key["column_string"].isin(["this", "is", "updated"])
+        ]
+        assert len(new_rows) == 3
+
+        for _, row in new_rows.iterrows():
+            if row["column_string"] == "this":
+                assert row["column_double"] == 1.1
+                assert row["column_integer"] == 1
+                assert row["column_boolean"] is True
+                assert row["column_date"] == utils.to_unix_epoch_time("2021-01-01")
+                assert row["column_filehandleid"] == file.file_handle.id
+                assert row["column_entityid"] == file.id
+                assert row["column_mediumtext"] == "updated1"
+                assert row["column_largetext"] == "largetext1"
+                assert row["column_userid"] == self.syn.credentials.owner_id
+                assert row["column_string_LIST"] == ["a", "b"]
+                assert row["column_integer_LIST"] == [9, 8]
+                assert row["column_boolean_LIST"] == [True, True]
+                assert row["column_date_LIST"] == [
+                    utils.to_unix_epoch_time("2023-01-01"),
+                    utils.to_unix_epoch_time("2023-01-02"),
+                ]
+                assert row["column_json"] == {"final1": "value1"}
+            elif row["column_string"] == "is":
+                assert row["column_double"] == 2.2
+                assert row["column_integer"] == 2
+                assert row["column_boolean"] is True
+                assert row["column_date"] == utils.to_unix_epoch_time("2021-01-02")
+                assert pd.isna(row["column_filehandleid"])
+                assert pd.isna(row["column_entityid"])
+                assert pd.isna(row["column_mediumtext"])
+                assert pd.isna(row["column_largetext"])
+                assert pd.isna(row["column_userid"])
+                assert len(row["column_string_LIST"]) == 0
+                assert len(row["column_integer_LIST"]) == 0
+                assert len(row["column_boolean_LIST"]) == 0
+                assert len(row["column_date_LIST"]) == 0
+                assert len(row["column_json"]) == 0
+            elif row["column_string"] == "updated":
+                assert row["column_double"] == 3.3
+                assert row["column_integer"] == 3
+                assert row["column_boolean"] is True
+                assert row["column_date"] == utils.to_unix_epoch_time("2021-01-03")
+                assert row["column_filehandleid"] == file.file_handle.id
+                assert row["column_entityid"] == file.id
+                assert row["column_mediumtext"] == "updated3"
+                assert row["column_largetext"] == "largetext3"
+
     finally:
         # Clean up
         self.syn.delete(evaluation)
upsert (with changes) + chunk_to_check_for_upsert = pd.DataFrame( + { + "col1": ["A", "B"], + "col2": [1, 20], # Changed values + } + ) + + primary_keys = ["col1"] + contains_etag = False + wait_for_eventually_consistent_view = False + + # WHEN I call _construct_partial_rows_for_upsert + ( + rows_to_update, + indexes_with_changes, + indexes_without_changes, + syn_id_and_etags, + ) = _construct_partial_rows_for_upsert( + entity=test_instance, + results=results, + chunk_to_check_for_upsert=chunk_to_check_for_upsert, + primary_keys=primary_keys, + contains_etag=contains_etag, + wait_for_eventually_consistent_view=wait_for_eventually_consistent_view, + ) + + # THEN I expect rows to be updated + assert len(rows_to_update) == 1 + assert len(indexes_with_changes) == 1 + assert len(indexes_without_changes) == 1 + assert len(syn_id_and_etags) == 0 + + # Verify the second row update + assert rows_to_update[0].row_id == "row2" + assert rows_to_update[0].etag is None + assert len(rows_to_update[0].values) == 1 + assert rows_to_update[0].values[0]["key"] == "id2" + assert rows_to_update[0].values[0]["value"] == 20 + + # verify first row without changes + assert indexes_without_changes[0] == 0 + + def test_construct_partial_rows_for_upsert_single_value_column_no_na_without_changes( + self, + ): + # GIVEN an entity with single value columns without NA values where values don't change + test_instance = self.ClassForTest( + id="syn123", + columns={ + "col1": Column(name="col1", column_type=ColumnType.STRING, id="id1"), + "col2": Column(name="col2", column_type=ColumnType.INTEGER, id="id2"), + }, + ) + + results = pd.DataFrame( + { + "ROW_ID": ["row1", "row2"], + "col1": ["A", "B"], + "col2": [1, 2], + } + ) + + chunk_to_check_for_upsert = pd.DataFrame( + { + "col1": ["A", "B"], + "col2": [1, 2], # Same values, no changes + } + ) + + primary_keys = ["col1"] + contains_etag = False + wait_for_eventually_consistent_view = False + + # WHEN I call _construct_partial_rows_for_upsert + ( + rows_to_update, + indexes_with_changes, + indexes_without_changes, + syn_id_and_etags, + ) = _construct_partial_rows_for_upsert( + entity=test_instance, + results=results, + chunk_to_check_for_upsert=chunk_to_check_for_upsert, + primary_keys=primary_keys, + contains_etag=contains_etag, + wait_for_eventually_consistent_view=wait_for_eventually_consistent_view, + ) + + # THEN I expect no rows to be updated + assert len(rows_to_update) == 0 + assert len(indexes_with_changes) == 0 + assert len(indexes_without_changes) == 2 + assert len(syn_id_and_etags) == 0 + + def test_construct_partial_rows_for_upsert_single_value_no_na_with_etag(self): + # GIVEN an entity with single value columns without NA values and results containing ROW_ETAG + test_instance = self.ClassForTest( + id="syn123", + columns={ + "col1": Column(name="col1", column_type=ColumnType.STRING, id="id1"), + "col2": Column(name="col2", column_type=ColumnType.INTEGER, id="id2"), + }, + ) + + results = pd.DataFrame( + { + "ROW_ID": ["row1"], + "ROW_ETAG": ["etag1"], + "id": ["syn123"], + "col1": ["A"], + "col2": [1], + } + ) + + chunk_to_check_for_upsert = pd.DataFrame( + { + "col1": ["A"], + "col2": [10], # Changed value + } + ) + + primary_keys = ["col1"] + contains_etag = True + wait_for_eventually_consistent_view = True + + # WHEN I call _construct_partial_rows_for_upsert + ( + rows_to_update, + indexes_with_changes, + indexes_without_changes, + syn_id_and_etags, + ) = _construct_partial_rows_for_upsert( + entity=test_instance, + results=results, + 
chunk_to_check_for_upsert=chunk_to_check_for_upsert, + primary_keys=primary_keys, + contains_etag=contains_etag, + wait_for_eventually_consistent_view=wait_for_eventually_consistent_view, + ) + + # THEN I expect the row to be updated with etag + assert len(rows_to_update) == 1 + assert rows_to_update[0].row_id == "row1" + assert rows_to_update[0].etag == "etag1" + assert len(indexes_with_changes) == 1 + assert indexes_with_changes[0] == 0 + assert len(indexes_without_changes) == 0 + assert rows_to_update[0].values[0]["key"] == "id2" + assert rows_to_update[0].values[0]["value"] == 10 + assert len(syn_id_and_etags) == 1 + assert syn_id_and_etags["syn123"] == "etag1" + + def test_construct_partial_rows_for_upsert_single_value_column_with_na_values_changes( + self, + ): + # GIVEN an entity with columns and dataframes containing NA values and values change + test_instance = self.ClassForTest( + id="syn123", + columns={ + "col1": Column(name="col1", column_type=ColumnType.STRING, id="id1"), + "col2": Column(name="col2", column_type=ColumnType.INTEGER, id="id2"), + }, + ) + + results = pd.DataFrame( + { + "ROW_ID": ["row1", "row2"], + "col1": ["A", "B"], + "col2": [1, pd.NA], + } + ) + + chunk_to_check_for_upsert = pd.DataFrame( + { + "col1": ["A", "B"], + "col2": [ + pd.NA, + pd.NA, + ], # row2 shouldn't be updated since it both cell and row are NA + } + ) + + primary_keys = ["col1"] + contains_etag = False + wait_for_eventually_consistent_view = False + + # WHEN I call _construct_partial_rows_for_upsert + ( + rows_to_update, + indexes_with_changes, + indexes_without_changes, + syn_id_and_etags, + ) = _construct_partial_rows_for_upsert( + entity=test_instance, + results=results, + chunk_to_check_for_upsert=chunk_to_check_for_upsert, + primary_keys=primary_keys, + contains_etag=contains_etag, + wait_for_eventually_consistent_view=wait_for_eventually_consistent_view, + ) + + # Verify the first row update + assert rows_to_update[0].row_id == "row1" + assert rows_to_update[0].etag is None + assert len(rows_to_update[0].values) == 1 + assert rows_to_update[0].values[0]["key"] == "id2" + assert rows_to_update[0].values[0]["value"] == None + assert len(syn_id_and_etags) == 0 + + def test_construct_partial_rows_for_upsert_with_list_column__no_na_changes(self): + # GIVEN an entity with a list column without NA values where values change + test_instance = self.ClassForTest( + id="syn123", + columns={ + "col1": Column(name="col1", column_type=ColumnType.STRING, id="id1"), + "col2": Column( + name="col2", column_type=ColumnType.STRING_LIST, id="id2" + ), + }, + ) + + results = pd.DataFrame( + { + "ROW_ID": ["row1", "row2"], + "col1": ["A", "B"], + "col2": [["item1", "item2"], ["item3", "item4"]], + } + ) + + chunk_to_check_for_upsert = pd.DataFrame( + { + "col1": ["A", "B"], + "col2": [["item1", "item3"], ["item3", "item4"]], # Changed list value + } + ) + + primary_keys = ["col1"] + contains_etag = False + wait_for_eventually_consistent_view = False + + # WHEN I call _construct_partial_rows_for_upsert + ( + rows_to_update, + indexes_with_changes, + indexes_without_changes, + syn_id_and_etags, + ) = _construct_partial_rows_for_upsert( + entity=test_instance, + results=results, + chunk_to_check_for_upsert=chunk_to_check_for_upsert, + primary_keys=primary_keys, + contains_etag=contains_etag, + wait_for_eventually_consistent_view=wait_for_eventually_consistent_view, + ) + + # THEN I expect the row to be updated + assert len(rows_to_update) == 1 + assert rows_to_update[0].row_id == "row1" + assert 
len(indexes_with_changes) == 1 + assert indexes_with_changes[0] == 0 + assert rows_to_update[0].values[0]["key"] == "id2" + assert rows_to_update[0].values[0]["value"] == ["item1", "item3"] + + # Verify second row is not tracked since it has no changes + assert len(indexes_without_changes) == 1 + assert indexes_without_changes[0] == 1 + assert len(syn_id_and_etags) == 0 + + def test_construct_partial_rows_for_upsert_with_list_column_with_na_values_changes( + self, + ): + # GIVEN an entity with a List column with NA values where values change + test_instance = self.ClassForTest( + id="syn123", + columns={ + "col1": Column(name="col1", column_type=ColumnType.STRING, id="id1"), + "col2": Column( + name="col2", column_type=ColumnType.STRING_LIST, id="id2" + ), + }, + ) + + results = pd.DataFrame( + { + "ROW_ID": ["row1", "row2"], + "col1": ["A", "B"], + "col2": [["item1", "item2"], [pd.NA, "item4"]], # row2 has NA + } + ) + + chunk_to_check_for_upsert = pd.DataFrame( + { + "col1": ["A", "B"], + "col2": [ + ["item1", "item3"], + ["item3", "item4"], + ], # row 1 and 2 both change + } + ) + primary_keys = ["col1"] + contains_etag = False + wait_for_eventually_consistent_view = False + + # WHEN I call _construct_partial_rows_for_upsert + ( + rows_to_update, + indexes_with_changes, + indexes_without_changes, + syn_id_and_etags, + ) = _construct_partial_rows_for_upsert( + entity=test_instance, + results=results, + chunk_to_check_for_upsert=chunk_to_check_for_upsert, + primary_keys=primary_keys, + contains_etag=contains_etag, + wait_for_eventually_consistent_view=wait_for_eventually_consistent_view, + ) + + # THEN I expect both rows to be updated (value to NA, and NA to value) + assert len(rows_to_update) == 2 + assert len(indexes_with_changes) == 2 + assert len(indexes_without_changes) == 0 + assert len(syn_id_and_etags) == 0 + + # Verify first row: list value changes to NA + assert rows_to_update[0].row_id == "row1" + assert rows_to_update[0].values[0]["key"] == "id2" + assert rows_to_update[0].values[0]["value"] == ["item1", "item3"] + + # Verify second row: NA changes to list value + assert rows_to_update[1].row_id == "row2" + assert rows_to_update[1].values[0]["key"] == "id2" + assert rows_to_update[1].values[0]["value"] == ["item3", "item4"] + + def test_construct_partial_rows_for_upsert_with_list_column_with_na_values_no_changes( + self, + ): + # GIVEN an entity with a LIST column where values don't change + test_instance = self.ClassForTest( + id="syn123", + columns={ + "col1": Column(name="col1", column_type=ColumnType.STRING, id="id1"), + "col2": Column( + name="col2", column_type=ColumnType.STRING_LIST, id="id2" + ), + }, + ) + + results = pd.DataFrame( + { + "ROW_ID": ["row1"], + "col1": ["A"], + "col2": [["item1", "item2", pd.NA]], + } + ) + + chunk_to_check_for_upsert = pd.DataFrame( + { + "col1": ["A"], + "col2": [["item1", "item2", pd.NA]], # Same list value + } + ) + + primary_keys = ["col1"] + contains_etag = False + wait_for_eventually_consistent_view = False + + # WHEN I call _construct_partial_rows_for_upsert + ( + rows_to_update, + indexes_with_changes, + indexes_without_changes, + syn_id_and_etags, + ) = _construct_partial_rows_for_upsert( + entity=test_instance, + results=results, + chunk_to_check_for_upsert=chunk_to_check_for_upsert, + primary_keys=primary_keys, + contains_etag=contains_etag, + wait_for_eventually_consistent_view=wait_for_eventually_consistent_view, + ) + + # THEN I expect no rows to be updated + assert len(rows_to_update) == 0 + assert 
+
+    def test_construct_partial_rows_for_upsert_with_list_column_with_na_values_no_changes(
+        self,
+    ):
+        # GIVEN an entity with a list column where values do not change
+        test_instance = self.ClassForTest(
+            id="syn123",
+            columns={
+                "col1": Column(name="col1", column_type=ColumnType.STRING, id="id1"),
+                "col2": Column(
+                    name="col2", column_type=ColumnType.STRING_LIST, id="id2"
+                ),
+            },
+        )
+
+        results = pd.DataFrame(
+            {
+                "ROW_ID": ["row1"],
+                "col1": ["A"],
+                "col2": [["item1", "item2", pd.NA]],
+            }
+        )
+
+        chunk_to_check_for_upsert = pd.DataFrame(
+            {
+                "col1": ["A"],
+                "col2": [["item1", "item2", pd.NA]],  # Same list value
+            }
+        )
+
+        primary_keys = ["col1"]
+        contains_etag = False
+        wait_for_eventually_consistent_view = False
+
+        # WHEN I call _construct_partial_rows_for_upsert
+        (
+            rows_to_update,
+            indexes_with_changes,
+            indexes_without_changes,
+            syn_id_and_etags,
+        ) = _construct_partial_rows_for_upsert(
+            entity=test_instance,
+            results=results,
+            chunk_to_check_for_upsert=chunk_to_check_for_upsert,
+            primary_keys=primary_keys,
+            contains_etag=contains_etag,
+            wait_for_eventually_consistent_view=wait_for_eventually_consistent_view,
+        )
+
+        # THEN I expect no rows to be updated
+        assert len(rows_to_update) == 0
+        assert len(indexes_with_changes) == 0
+        assert len(indexes_without_changes) == 1
+        assert indexes_without_changes[0] == 0
+        assert len(syn_id_and_etags) == 0
+
+    def test_construct_partial_rows_for_upsert_with_numpy_array_comparison_no_na_changes(
+        self,
+    ):
+        # GIVEN an entity whose cell values are numpy arrays without NA values,
+        # where values change
+        test_instance = self.ClassForTest(
+            id="syn123",
+            columns={
+                "col1": Column(name="col1", column_type=ColumnType.STRING, id="id1"),
+                "col2": Column(
+                    name="col2", column_type=ColumnType.INTEGER_LIST, id="id2"
+                ),
+            },
+        )
+
+        # Create dataframes with numpy arrays
+        results = pd.DataFrame(
+            {
+                "ROW_ID": ["row1", "row2"],
+                "col1": ["A", "B"],
+                "col2": [np.array([1, 2, 3]), np.array([4, 5, 6])],
+            }
+        )
+
+        chunk_to_check_for_upsert = pd.DataFrame(
+            {
+                "col1": ["A", "B"],
+                "col2": [
+                    np.array([1, 2, 4]),
+                    np.array([4, 5, 6]),
+                ],  # Changed array value
+            }
+        )
+
+        primary_keys = ["col1"]
+        contains_etag = False
+        wait_for_eventually_consistent_view = False
+
+        # WHEN I call _construct_partial_rows_for_upsert
+        (
+            rows_to_update,
+            indexes_with_changes,
+            indexes_without_changes,
+            syn_id_and_etags,
+        ) = _construct_partial_rows_for_upsert(
+            entity=test_instance,
+            results=results,
+            chunk_to_check_for_upsert=chunk_to_check_for_upsert,
+            primary_keys=primary_keys,
+            contains_etag=contains_etag,
+            wait_for_eventually_consistent_view=wait_for_eventually_consistent_view,
+        )
+
+        # THEN I expect only the first row to be updated (the element-wise
+        # numpy comparison detects the changed array)
+        assert len(rows_to_update) == 1
+        assert len(indexes_with_changes) == 1
+        assert rows_to_update[0].row_id == "row1"
+        assert rows_to_update[0].values[0]["key"] == "id2"
+        assert rows_to_update[0].values[0]["value"] == [
+            np.int64(1),
+            np.int64(2),
+            np.int64(4),
+        ]
+        assert len(indexes_without_changes) == 1
+        assert len(syn_id_and_etags) == 0
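+
+    # Note: with numpy arrays, `!=` is element-wise and returns a boolean
+    # array rather than a bool, so the comparison result has to be collapsed
+    # with .any() to decide whether the cell changed. The test above covers
+    # clean arrays; the test below covers arrays containing pd.NA, where even
+    # that collapse can fail.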
"row1" + assert rows_to_update[0].values[0]["key"] == "id2" + assert rows_to_update[0].values[0]["value"] == [ + np.int64(1), + np.int64(2), + np.int64(4), + ] + assert len(indexes_without_changes) == 1 + assert len(syn_id_and_etags) == 0 + + def test_construct_partial_rows_for_upsert_with_numpy_array_comparison_with_na_changes( + self, + ): + # GIVEN an entity with numpy arrays that might contain NA values where values change + import numpy as np + + test_instance = self.ClassForTest( + id="syn123", + columns={ + "col1": Column(name="col1", column_type=ColumnType.STRING, id="id1"), + "col2": Column( + name="col2", column_type=ColumnType.INTEGER_LIST, id="id2" + ), + }, + ) + + # Test with arrays containing pd.NA where values change + results = pd.DataFrame( + { + "ROW_ID": ["row1", "row2"], + "col1": ["A", "B"], + "col2": [np.array([1, 2, pd.NA]), np.array([4, 5, 6])], + } + ) + + chunk_to_check_for_upsert = pd.DataFrame( + { + "col1": ["A", "B"], + "col2": [ + np.array([1, 2, pd.NA]), + np.array([4, pd.NA, 6]), + ], # row 2 changes + } + ) + + primary_keys = ["col1"] + contains_etag = False + wait_for_eventually_consistent_view = False + + # WHEN I call _construct_partial_rows_for_upsert + # This should handle the pd.NA comparison gracefully + ( + rows_to_update, + indexes_with_changes, + indexes_without_changes, + syn_id_and_etags, + ) = _construct_partial_rows_for_upsert( + entity=test_instance, + results=results, + chunk_to_check_for_upsert=chunk_to_check_for_upsert, + primary_keys=primary_keys, + contains_etag=contains_etag, + wait_for_eventually_consistent_view=wait_for_eventually_consistent_view, + ) + + # THEN the function should handle this without crashing + assert len(rows_to_update) == 2 + assert len(indexes_with_changes) == 2 + assert rows_to_update[0].row_id == "row1" + assert rows_to_update[0].values[0]["key"] == "id2" + assert rows_to_update[0].values[0]["value"] == [1, 2, pd.NA] + assert indexes_with_changes[0] == 0 + assert rows_to_update[1].row_id == "row2" + assert rows_to_update[1].values[0]["key"] == "id2" + assert rows_to_update[1].values[0]["value"] == [4, pd.NA, 6] + assert len(indexes_without_changes) == 0 + assert len(syn_id_and_etags) == 0 + + def test_construct_partial_rows_for_upsert_with_multiple_primary_keys(self): + # GIVEN an entity with columns and multiple primary keys + test_instance = self.ClassForTest( + id="syn123", + columns={ + "col1": Column(name="col1", column_type=ColumnType.STRING, id="id1"), + "col2": Column(name="col2", column_type=ColumnType.STRING, id="id2"), + "col3": Column(name="col3", column_type=ColumnType.INTEGER, id="id3"), + }, + ) + + results = pd.DataFrame( + { + "ROW_ID": ["row1"], + "col1": ["A"], + "col2": ["B"], + "col3": [1], + } + ) + + chunk_to_check_for_upsert = pd.DataFrame( + { + "col1": ["A"], + "col2": ["B"], + "col3": [10], # Changed value + } + ) + + primary_keys = ["col1", "col2"] + contains_etag = False + wait_for_eventually_consistent_view = False + + # WHEN I call _construct_partial_rows_for_upsert + ( + rows_to_update, + indexes_with_changes, + indexes_without_changes, + syn_id_and_etags, + ) = _construct_partial_rows_for_upsert( + entity=test_instance, + results=results, + chunk_to_check_for_upsert=chunk_to_check_for_upsert, + primary_keys=primary_keys, + contains_etag=contains_etag, + wait_for_eventually_consistent_view=wait_for_eventually_consistent_view, + ) + + # THEN I expect the row to be updated + assert len(rows_to_update) == 1 + assert rows_to_update[0].row_id == "row1" + assert 
+
+    def test_construct_partial_rows_for_upsert_with_multiple_primary_keys(self):
+        # GIVEN an entity with columns and multiple primary keys
+        test_instance = self.ClassForTest(
+            id="syn123",
+            columns={
+                "col1": Column(name="col1", column_type=ColumnType.STRING, id="id1"),
+                "col2": Column(name="col2", column_type=ColumnType.STRING, id="id2"),
+                "col3": Column(name="col3", column_type=ColumnType.INTEGER, id="id3"),
+            },
+        )
+
+        results = pd.DataFrame(
+            {
+                "ROW_ID": ["row1"],
+                "col1": ["A"],
+                "col2": ["B"],
+                "col3": [1],
+            }
+        )
+
+        chunk_to_check_for_upsert = pd.DataFrame(
+            {
+                "col1": ["A"],
+                "col2": ["B"],
+                "col3": [10],  # Changed value
+            }
+        )
+
+        primary_keys = ["col1", "col2"]
+        contains_etag = False
+        wait_for_eventually_consistent_view = False
+
+        # WHEN I call _construct_partial_rows_for_upsert
+        (
+            rows_to_update,
+            indexes_with_changes,
+            indexes_without_changes,
+            syn_id_and_etags,
+        ) = _construct_partial_rows_for_upsert(
+            entity=test_instance,
+            results=results,
+            chunk_to_check_for_upsert=chunk_to_check_for_upsert,
+            primary_keys=primary_keys,
+            contains_etag=contains_etag,
+            wait_for_eventually_consistent_view=wait_for_eventually_consistent_view,
+        )
+
+        # THEN I expect the row to be updated
+        assert len(rows_to_update) == 1
+        assert rows_to_update[0].row_id == "row1"
+        assert rows_to_update[0].values[0]["key"] == "id3"
+        assert rows_to_update[0].values[0]["value"] == 10
+        assert len(indexes_with_changes) == 1
+        assert indexes_with_changes[0] == 0
+        assert len(indexes_without_changes) == 0
+        assert len(syn_id_and_etags) == 0
+
+    def test_construct_partial_rows_for_upsert_with_column_not_in_entity(self):
+        # GIVEN an entity and upsert data containing a column that is not in the
+        # entity; changes to that column should be ignored
+        test_instance = self.ClassForTest(
+            id="syn123",
+            columns={
+                "col1": Column(name="col1", column_type=ColumnType.STRING, id="id1"),
+            },
+        )
+
+        results = pd.DataFrame(
+            {
+                "ROW_ID": ["row1"],
+                "col1": ["A"],
+            }
+        )
+
+        chunk_to_check_for_upsert = pd.DataFrame(
+            {
+                "col1": ["A"],
+                "col2": [10],  # Column not in entity.columns
+            }
+        )
+
+        primary_keys = ["col1"]
+        contains_etag = False
+        wait_for_eventually_consistent_view = False
+
+        # WHEN I call _construct_partial_rows_for_upsert
+        (
+            rows_to_update,
+            indexes_with_changes,
+            indexes_without_changes,
+            syn_id_and_etags,
+        ) = _construct_partial_rows_for_upsert(
+            entity=test_instance,
+            results=results,
+            chunk_to_check_for_upsert=chunk_to_check_for_upsert,
+            primary_keys=primary_keys,
+            contains_etag=contains_etag,
+            wait_for_eventually_consistent_view=wait_for_eventually_consistent_view,
+        )
+
+        # THEN I expect no rows to be updated (col2 is ignored)
+        assert len(rows_to_update) == 0
+        assert len(indexes_with_changes) == 0
+        assert len(indexes_without_changes) == 1
+        assert indexes_without_changes[0] == 0
+        assert len(syn_id_and_etags) == 0
+
+    def test_construct_partial_rows_for_upsert_with_wait_for_eventually_consistent_view(
+        self,
+    ):
+        # GIVEN an entity with columns and results containing id and ROW_ETAG
+        test_instance = self.ClassForTest(
+            id="syn123",
+            columns={
+                "col1": Column(name="col1", column_type=ColumnType.STRING, id="id1"),
+                "col2": Column(name="col2", column_type=ColumnType.INTEGER, id="id2"),
+            },
+        )
+
+        results = pd.DataFrame(
+            {
+                "ROW_ID": ["row1"],
+                "ROW_ETAG": ["etag1"],
+                "id": ["syn456"],
+                "col1": ["A"],
+                "col2": [1],
+            }
+        )
+
+        chunk_to_check_for_upsert = pd.DataFrame(
+            {
+                "col1": ["A"],
+                "col2": [10],  # Changed value
+            }
+        )
+
+        primary_keys = ["col1"]
+        contains_etag = True
+        wait_for_eventually_consistent_view = True
+
+        # WHEN I call _construct_partial_rows_for_upsert
+        (
+            rows_to_update,
+            indexes_with_changes,
+            indexes_without_changes,
+            syn_id_and_etags,
+        ) = _construct_partial_rows_for_upsert(
+            entity=test_instance,
+            results=results,
+            chunk_to_check_for_upsert=chunk_to_check_for_upsert,
+            primary_keys=primary_keys,
+            contains_etag=contains_etag,
+            wait_for_eventually_consistent_view=wait_for_eventually_consistent_view,
+        )
+
+        # THEN I expect the row to be updated and syn_id_and_etags to be populated
+        assert len(rows_to_update) == 1
+        assert rows_to_update[0].row_id == "row1"
+        assert rows_to_update[0].etag == "etag1"
+        assert len(syn_id_and_etags) == 1
+        assert syn_id_and_etags["syn456"] == "etag1"
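+
+    # Note: with wait_for_eventually_consistent_view=True, the returned
+    # syn_id_and_etags mapping is keyed by each row's Synapse id from the
+    # results' `id` column ("syn456" above) rather than by the entity's own
+    # id, presumably so that each updated row can be waited on individually.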
+
 
 class TestQuery:
     """Test suite for the Query.to_synapse_request method."""
@@ -1455,6 +2230,8 @@ async def test_query_async(self):
             row_id_and_version_in_index=False,
             date_columns=None,
             list_columns=None,
+            dtype={"col1": str},
+            list_column_types=None,
         )
 
         # AND the result should match expected DataFrame
@@ -1542,6 +2319,12 @@ async def test_query_async_with_date_and_list_columns(self):
             row_id_and_version_in_index=False,
             date_columns=["date_col"],  # Should contain the DATE column
             list_columns=["list_col"],  # Should contain the STRING_LIST column
+            dtype={
+                "string_col": str,
+            },
+            list_column_types={
+                "list_col": ColumnType.STRING_LIST,
+            },
         )
 
         # AND the result should match expected DataFrame