Skip to content

Commit c437ddf

Browse files
authored
[GEN-1667] Delete Table Rows Using Filtered DataFrame (#1254)
* add new param to delete_rows so rows can be deleted with a filtered dataframe
1 parent d9a7fef commit c437ddf

File tree

4 files changed

+228
-18
lines changed

4 files changed

+228
-18
lines changed

docs/tutorials/python/table.md

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -198,12 +198,18 @@ Qux2,4,203001,204001,+,False
198198

199199
## 5. Deleting Table rows & Tables
200200

201-
* Deleting specific rows - Query for the rows you want to delete and call syn.delete on the results
201+
* Deleting specific rows - Query for the rows you want to delete and call delete_rows on the results
202202

203203
```python
204204
table.delete_rows(query=f"SELECT * FROM {table.id} WHERE Strand = '+'")
205205
```
206206

207+
* Or deleting rows based on a dataframe, where the ROW_ID and ROW_VERSION columns specify the rows to be deleted from the table. In this example, rows 2 and 3 are deleted. See this document that describes the expected columns of the dataframe: <https://rest-docs.synapse.org/rest/org/sagebionetworks/repo/model/table/Row.html>. Note: The ROW_VERSION begins at 1 upon row creation and increases by one with every subsequent update.
208+
209+
```python
210+
table.delete_rows(df = pd.DataFrame({"ROW_ID": [2, 3], "ROW_VERSION": [1, 1]}))
211+
```
212+
207213
* Deleting the whole table will deletes the whole table and all rows
208214

209215
```python

synapseclient/models/mixins/table_components.py

Lines changed: 76 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -4049,31 +4049,38 @@ class TableDeleteRowMixin:
40494049

40504050
async def delete_rows_async(
40514051
self,
4052-
query: str,
4052+
query: Optional[str] = None,
4053+
df: Optional[DATA_FRAME_TYPE] = None,
40534054
*,
40544055
job_timeout: int = 600,
40554056
synapse_client: Optional[Synapse] = None,
40564057
) -> DATA_FRAME_TYPE:
40574058
"""
4058-
Delete rows from a table given a query to select rows. The query at a
4059-
minimum must select the `ROW_ID` and `ROW_VERSION` columns. If you want to
4059+
Delete rows from a table given a query or a pandas dataframe to select rows.
4060+
The query at a minimum must select the `ROW_ID` and `ROW_VERSION` columns. If you want to
40604061
inspect the data that will be deleted ahead of time you may use the
40614062
`.query` method to get the data.
4062-
4063+
The dataframe must at least contain the `ROW_ID` and `ROW_VERSION` columns. And `ROW_ETAG` column is also required
4064+
if the entity is one of the following: `EntityView`, `Dataset`, `DatasetCollection`, or `SubmissionView`.
4065+
If both query and df are provided, the query will be used.
40634066
40644067
Arguments:
40654068
query: The query to select the rows to delete. The query at a minimum
40664069
must select the `ROW_ID` and `ROW_VERSION` columns. See this document
40674070
that describes the expected syntax of the query:
40684071
<https://rest-docs.synapse.org/rest/org/sagebionetworks/repo/web/controller/TableExamples.html>
4072+
df: A pandas dataframe that contains the rows to delete. The dataframe must at least contain the `ROW_ID` and `ROW_VERSION` columns.
4073+
If the entity is one of the following: `EntityView`, `Dataset`, `DatasetCollection`, or `SubmissionView` then the dataframe must also contain the `ROW_ETAG` column.
4074+
See this document that describes the expected columns of the dataframe:
4075+
<https://rest-docs.synapse.org/rest/org/sagebionetworks/repo/model/table/Row.html>
40694076
job_timeout: The amount of time to wait for table updates to complete
40704077
before a `SynapseTimeoutError` is thrown. The default is 600 seconds.
40714078
synapse_client: If not passed in and caching was not disabled by
40724079
`Synapse.allow_client_caching(False)` this will use the last created
40734080
instance from the Synapse class constructor.
40744081
40754082
Returns:
4076-
The results of your query for the rows that were deleted from the table.
4083+
The results of your query or dataframe for the rows that were deleted from the table.
40774084
40784085
Example: Selecting a row to delete
40794086
This example shows how you may select a row to delete from a table.
@@ -4109,17 +4116,73 @@ async def main():
41094116
41104117
asyncio.run(main())
41114118
```
4119+
4120+
Example: Selecting rows to delete using a dataframe
4121+
This example shows how you may select a row to delete from a table based on a dataframe.
4122+
4123+
```python
4124+
import asyncio
4125+
import pandas as pd
4126+
from synapseclient import Synapse
4127+
from synapseclient.models import Table # Also works with `Dataset`
4128+
4129+
syn = Synapse()
4130+
syn.login()
4131+
4132+
# Creating a pandas dataframe that contains the rows to delete.
4133+
# In this example, we create a dataframe that specifies the first two rows of the table for deletion.
4134+
# Assuming no changes have been made to the table so the ROW_VERSION is 1.
4135+
4136+
df = pd.DataFrame({"ROW_ID": [1, 2], "ROW_VERSION": [1, 1]})
4137+
async def main():
4138+
await Table(id="syn1234").delete_rows_async(df=df)
4139+
4140+
asyncio.run(main())
4141+
```
41124142
"""
41134143
client = Synapse.get_client(synapse_client=synapse_client)
4114-
results_from_query = await self.query_async(query=query, synapse_client=client)
4115-
client.logger.info(
4116-
f"Found {len(results_from_query)} rows to delete for given query: {query}"
4117-
)
4118-
4144+
# check if both query and df are None
4145+
if query is None and df is None:
4146+
raise ValueError("Either query or df must be provided.")
4147+
if query is not None:
4148+
rows_to_delete = await self.query_async(query=query, synapse_client=client)
4149+
client.logger.info(
4150+
f"Found {len(rows_to_delete)} rows to delete for given query: {query}"
4151+
)
4152+
elif df is not None:
4153+
rows_to_delete = df
4154+
if (
4155+
"ROW_ID" not in rows_to_delete.columns
4156+
or "ROW_VERSION" not in rows_to_delete.columns
4157+
):
4158+
raise ValueError(
4159+
"The dataframe must contain the 'ROW_ID' and 'ROW_VERSION' columns."
4160+
)
4161+
# check the validity of the ROW_ID and ROW_VERSION columns
4162+
existing_rows = await self.query_async(
4163+
query=f"SELECT ROW_ID, ROW_VERSION FROM {self.id}",
4164+
synapse_client=client,
4165+
)
4166+
# check if all ROW_ID and ROW_VERSION pair in the dataframe exist in the table
4167+
merged = df.merge(
4168+
existing_rows, on=["ROW_ID", "ROW_VERSION"], how="left", indicator=True
4169+
)
4170+
if not all(merged["_merge"] == "both"):
4171+
discrepant_idx = merged.loc[merged["_merge"] != "both"].index
4172+
raise ValueError(
4173+
f"Rows with the following ROW_ID and ROW_VERSION pairs were not found in table {self.id}: {', '.join(map(str, discrepant_idx))}."
4174+
)
4175+
client.logger.info(
4176+
f"Received {len(rows_to_delete)} rows to delete for given dataframe."
4177+
)
41194178
if self.__class__.__name__ in CLASSES_THAT_CONTAIN_ROW_ETAG:
4120-
filtered_columns = results_from_query[["ROW_ID", "ROW_VERSION", "ROW_ETAG"]]
4179+
if "ROW_ETAG" not in rows_to_delete.columns:
4180+
raise ValueError(
4181+
f"The dataframe must contain the 'ROW_ETAG' column when deleting rows from a {self.__class__.__name__}."
4182+
)
4183+
filtered_columns = rows_to_delete[["ROW_ID", "ROW_VERSION", "ROW_ETAG"]]
41214184
else:
4122-
filtered_columns = results_from_query[["ROW_ID", "ROW_VERSION"]]
4185+
filtered_columns = rows_to_delete[["ROW_ID", "ROW_VERSION"]]
41234186

41244187
filepath = f"{tempfile.mkdtemp()}/{self.id}_upload_{uuid.uuid4()}.csv"
41254188
try:
@@ -4138,7 +4201,7 @@ async def main():
41384201
entity_id=self.id, changes=[upload_request]
41394202
).send_job_and_wait_async(synapse_client=client, timeout=job_timeout)
41404203

4141-
return results_from_query
4204+
return rows_to_delete
41424205

41434206

41444207
def infer_column_type_from_data(values: DATA_FRAME_TYPE) -> List[Column]:

tests/integration/synapseclient/models/async/test_table_async.py

Lines changed: 42 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1683,7 +1683,7 @@ def init(self, syn: Synapse, schedule_for_cleanup: Callable[..., None]) -> None:
16831683
self.syn = syn
16841684
self.schedule_for_cleanup = schedule_for_cleanup
16851685

1686-
async def test_delete_single_row(self, project_model: Project) -> None:
1686+
async def test_delete_single_row_via_query(self, project_model: Project) -> None:
16871687
# GIVEN a table in Synapse
16881688
table_name = str(uuid.uuid4())
16891689
table = Table(
@@ -1720,7 +1720,7 @@ async def test_delete_single_row(self, project_model: Project) -> None:
17201720
# AND only 2 rows should exist on the table
17211721
assert len(results) == 2
17221722

1723-
async def test_delete_multiple_rows(self, project_model: Project) -> None:
1723+
async def test_delete_multiple_rows_via_query(self, project_model: Project) -> None:
17241724
# GIVEN a table in Synapse
17251725
table_name = str(uuid.uuid4())
17261726
table = Table(
@@ -1757,7 +1757,7 @@ async def test_delete_multiple_rows(self, project_model: Project) -> None:
17571757
# AND only 1 row should exist on the table
17581758
assert len(results) == 1
17591759

1760-
async def test_delete_no_rows(self, project_model: Project) -> None:
1760+
async def test_delete_no_rows_via_query(self, project_model: Project) -> None:
17611761
# GIVEN a table in Synapse
17621762
table_name = str(uuid.uuid4())
17631763
table = Table(
@@ -1793,6 +1793,45 @@ async def test_delete_no_rows(self, project_model: Project) -> None:
17931793
# AND 3 rows should exist on the table
17941794
assert len(results) == 3
17951795

1796+
async def test_delete_multiple_rows_via_dataframe(
1797+
self, project_model: Project
1798+
) -> None:
1799+
# GIVEN a table in Synapse
1800+
table_name = str(uuid.uuid4())
1801+
table = Table(
1802+
name=table_name,
1803+
parent_id=project_model.id,
1804+
columns=[Column(name="column_string", column_type=ColumnType.STRING)],
1805+
)
1806+
table = await table.store_async(synapse_client=self.syn)
1807+
self.schedule_for_cleanup(table.id)
1808+
1809+
# AND data for a column already stored in Synapse
1810+
data_for_table = pd.DataFrame({"column_string": ["value1", "value2", "value3"]})
1811+
await table.store_rows_async(
1812+
values=data_for_table, schema_storage_strategy=None, synapse_client=self.syn
1813+
)
1814+
# Get the ROW_ID and ROW_VERSION for the data we just added
1815+
# WHEN I delete rows from the table using a dataframe
1816+
await table.delete_rows_async(
1817+
df=pd.DataFrame({"ROW_ID": [2, 3], "ROW_VERSION": [1, 1]}),
1818+
synapse_client=self.syn,
1819+
)
1820+
1821+
# AND I query the table
1822+
results = await query_async(
1823+
f"SELECT * FROM {table.id}", synapse_client=self.syn
1824+
)
1825+
1826+
# THEN the data in the columns should match
1827+
pd.testing.assert_series_equal(
1828+
results["column_string"],
1829+
pd.DataFrame({"column_string": ["value1"]})["column_string"],
1830+
)
1831+
1832+
# AND only 1 row should exist on the table
1833+
assert len(results) == 1
1834+
17961835

17971836
class TestColumnModifications:
17981837
@pytest.fixture(autouse=True, scope="function")

tests/unit/synapseclient/mixins/unit_test_table_components.py

Lines changed: 103 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -751,6 +751,7 @@ async def test_delete_with_name_and_parent_id(self):
751751
async def test_delete_with_no_id_or_name_and_parent_id(self):
752752
# GIVEN a TestClass instance with no id or name and parent_id
753753
test_instance = self.ClassForTest()
754+
test_instance.__name__ = ""
754755

755756
with pytest.raises(
756757
ValueError,
@@ -1791,7 +1792,7 @@ class ClassForTest(TableDeleteRowMixin, QueryMixin):
17911792
name: Optional[str] = "test_table"
17921793
columns: Dict[str, Column] = field(default_factory=dict)
17931794

1794-
async def test_delete_rows_async(self):
1795+
async def test_delete_rows_async_via_query(self):
17951796
# GIVEN a TestClass instance
17961797
test_instance = self.ClassForTest()
17971798
with (
@@ -1811,12 +1812,17 @@ async def test_delete_rows_async(self):
18111812
entity_id=test_instance.id, changes=[]
18121813
),
18131814
) as mock_send_job_and_wait_async,
1815+
patch.object(self.syn.logger, "info") as mock_logger_info,
18141816
):
18151817
# WHEN I call delete_rows_async
18161818
result = await test_instance.delete_rows_async(
18171819
query=self.fake_query, synapse_client=self.syn
18181820
)
18191821

1822+
# THEN mock_logger_info should be called
1823+
mock_logger_info.assert_called_once_with(
1824+
f"Found 2 rows to delete for given query: {self.fake_query}"
1825+
)
18201826
# THEN mock_query_async should be called
18211827
mock_query_async.assert_awaited_once_with(
18221828
query=self.fake_query, synapse_client=self.syn
@@ -1834,6 +1840,102 @@ async def test_delete_rows_async(self):
18341840
pd.DataFrame({"ROW_ID": ["A", "B"], "ROW_VERSION": [1, 2]})
18351841
)
18361842

1843+
async def test_delete_rows_async_via_dataframe_pass(self):
1844+
# GIVEN a TestClass instance
1845+
test_instance = self.ClassForTest()
1846+
df = pd.DataFrame({"ROW_ID": ["A"], "ROW_VERSION": [1]})
1847+
with (
1848+
patch(
1849+
"synapseclient.models.mixins.table_components.QueryMixin.query_async",
1850+
return_value=pd.DataFrame(
1851+
{"ROW_ID": ["A", "B"], "ROW_VERSION": [1, 2]}
1852+
),
1853+
) as mock_query_async,
1854+
patch(
1855+
"synapseclient.models.mixins.table_components.multipart_upload_file_async",
1856+
return_value="fake_file_handle_id",
1857+
) as mock_multipart_upload_file_async,
1858+
patch(
1859+
SEND_JOB_AND_WAIT_ASYNC_PATCH,
1860+
return_value=TableUpdateTransaction(
1861+
entity_id=test_instance.id, changes=[]
1862+
),
1863+
) as mock_send_job_and_wait_async,
1864+
patch.object(self.syn.logger, "info") as mock_logger_info,
1865+
):
1866+
# WHEN I call delete_rows_async
1867+
result = await test_instance.delete_rows_async(
1868+
df=df, synapse_client=self.syn
1869+
)
1870+
1871+
# THEN mock_logger_info should be called
1872+
mock_logger_info.assert_called_once_with(
1873+
f"Received 1 rows to delete for given dataframe."
1874+
)
1875+
# AND mock_multipart_upload_file_async should be called
1876+
mock_multipart_upload_file_async.assert_awaited_once()
1877+
# AND mock_send_job_and_wait_async should be called
1878+
mock_send_job_and_wait_async.assert_awaited_once_with(
1879+
synapse_client=self.syn,
1880+
timeout=600,
1881+
)
1882+
1883+
# AND the result should be the expected dataframe object
1884+
assert result.equals(pd.DataFrame({"ROW_ID": ["A"], "ROW_VERSION": [1]}))
1885+
1886+
@pytest.mark.parametrize(
1887+
"df, error_msg",
1888+
[
1889+
(
1890+
pd.DataFrame(columns=["ROW_ID"]), # Missing ROW_VERSION column
1891+
"The dataframe must contain the 'ROW_ID' and 'ROW_VERSION' columns.",
1892+
),
1893+
(
1894+
pd.DataFrame(columns=["ROW_VERSION"]), # Missing ROW_ID column
1895+
"The dataframe must contain the 'ROW_ID' and 'ROW_VERSION' columns.",
1896+
),
1897+
(
1898+
pd.DataFrame(columns=["INVALID_COL", "ROW_VERSION"]), # Invalid column
1899+
"The dataframe must contain the 'ROW_ID' and 'ROW_VERSION' columns.",
1900+
),
1901+
(
1902+
pd.DataFrame(columns=["ROW_ID", "INVALID_COL"]), # Invalid column
1903+
"The dataframe must contain the 'ROW_ID' and 'ROW_VERSION' columns.",
1904+
),
1905+
(
1906+
pd.DataFrame(columns=["INVALID_COL1", "INVALID_COL2"]), # Both invalid
1907+
"The dataframe must contain the 'ROW_ID' and 'ROW_VERSION' columns.",
1908+
),
1909+
(
1910+
pd.DataFrame(
1911+
{"ROW_ID": ["C", "D"], "ROW_VERSION": [2, 2]}
1912+
), # Both invalid
1913+
"Rows with the following ROW_ID and ROW_VERSION pairs were not found in table syn123: 0, 1.",
1914+
),
1915+
],
1916+
)
1917+
async def test_delete_rows_via_dataframe_fail(self, df, error_msg):
1918+
# GIVEN a TestClass instance
1919+
test_instance = self.ClassForTest()
1920+
1921+
# WHEN I call delete_rows_async
1922+
with (
1923+
patch(
1924+
"synapseclient.models.mixins.table_components.QueryMixin.query_async",
1925+
return_value=pd.DataFrame(
1926+
{"ROW_ID": ["A", "B"], "ROW_VERSION": [1, 2]}
1927+
),
1928+
) as mock_query_async,
1929+
patch.object(self.syn.logger, "info") as mock_logger_info,
1930+
):
1931+
with pytest.raises(ValueError, match=error_msg):
1932+
result = await test_instance.delete_rows_async(
1933+
df=df, synapse_client=self.syn
1934+
)
1935+
1936+
# THEN mock_logger_info should not be called
1937+
mock_logger_info.assert_not_called()
1938+
18371939

18381940
class TestQueryTableCsv:
18391941
"""Test suite for the _query_table_csv function."""

0 commit comments

Comments
 (0)