From 44529d912f0b502bc6e11621c6115cd069fe8088 Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Tue, 8 Jul 2025 07:35:04 -0400 Subject: [PATCH 1/7] Add field to dataframe join to indicate if we should keep duplicate keys --- python/datafusion/dataframe.py | 32 +++++++++++++++++++----------- python/tests/test_dataframe.py | 1 - src/dataframe.rs | 36 +++++++++++++++++++++++++++++++++- 3 files changed, 56 insertions(+), 13 deletions(-) diff --git a/python/datafusion/dataframe.py b/python/datafusion/dataframe.py index c6ff7eda5..3c956c963 100644 --- a/python/datafusion/dataframe.py +++ b/python/datafusion/dataframe.py @@ -774,6 +774,7 @@ def join( left_on: None = None, right_on: None = None, join_keys: None = None, + keep_duplicate_keys: bool = False, ) -> DataFrame: ... @overload @@ -786,6 +787,7 @@ def join( left_on: str | Sequence[str], right_on: str | Sequence[str], join_keys: tuple[list[str], list[str]] | None = None, + keep_duplicate_keys: bool = False, ) -> DataFrame: ... @overload @@ -798,6 +800,7 @@ def join( join_keys: tuple[list[str], list[str]], left_on: None = None, right_on: None = None, + keep_duplicate_keys: bool = False, ) -> DataFrame: ... def join( @@ -809,6 +812,7 @@ def join( left_on: str | Sequence[str] | None = None, right_on: str | Sequence[str] | None = None, join_keys: tuple[list[str], list[str]] | None = None, + keep_duplicate_keys: bool = False, ) -> DataFrame: """Join this :py:class:`DataFrame` with another :py:class:`DataFrame`. @@ -821,11 +825,23 @@ def join( "right", "full", "semi", "anti". left_on: Join column of the left dataframe. right_on: Join column of the right dataframe. + keep_duplicate_keys: When False, the columns from the right DataFrame + that have identical names in the ``on`` fields to the left DataFrame + will be dropped. join_keys: Tuple of two lists of column names to join on. [Deprecated] Returns: DataFrame after join. """ + if join_keys is not None: + warnings.warn( + "`join_keys` is deprecated, use `on` or `left_on` with `right_on`", + category=DeprecationWarning, + stacklevel=2, + ) + left_on = join_keys[0] + right_on = join_keys[1] + # This check is to prevent breaking API changes where users prior to # DF 43.0.0 would pass the join_keys as a positional argument instead # of a keyword argument. @@ -836,18 +852,10 @@ def join( and isinstance(on[1], list) ): # We know this is safe because we've checked the types - join_keys = on # type: ignore[assignment] + left_on = on[0] + right_on = on[1] on = None - if join_keys is not None: - warnings.warn( - "`join_keys` is deprecated, use `on` or `left_on` with `right_on`", - category=DeprecationWarning, - stacklevel=2, - ) - left_on = join_keys[0] - right_on = join_keys[1] - if on is not None: if left_on is not None or right_on is not None: error_msg = "`left_on` or `right_on` should not provided with `on`" @@ -866,7 +874,9 @@ def join( if isinstance(right_on, str): right_on = [right_on] - return DataFrame(self.df.join(right.df, how, left_on, right_on)) + return DataFrame( + self.df.join(right.df, how, left_on, right_on, keep_duplicate_keys) + ) def join_on( self, diff --git a/python/tests/test_dataframe.py b/python/tests/test_dataframe.py index 101dfc5b2..c29270f66 100644 --- a/python/tests/test_dataframe.py +++ b/python/tests/test_dataframe.py @@ -647,7 +647,6 @@ def test_unnest_without_nulls(nested_df): assert result.column(1) == pa.array([7, 8, 8, 9, 9, 9]) -@pytest.mark.filterwarnings("ignore:`join_keys`:DeprecationWarning") def test_join(): ctx = SessionContext() diff --git a/src/dataframe.rs b/src/dataframe.rs index a93aa0185..862f4d411 100644 --- a/src/dataframe.rs +++ b/src/dataframe.rs @@ -629,6 +629,7 @@ impl PyDataFrame { how: &str, left_on: Vec, right_on: Vec, + keep_duplicate_keys: bool, ) -> PyDataFusionResult { let join_type = match how { "inner" => JoinType::Inner, @@ -647,13 +648,46 @@ impl PyDataFrame { let left_keys = left_on.iter().map(|s| s.as_ref()).collect::>(); let right_keys = right_on.iter().map(|s| s.as_ref()).collect::>(); - let df = self.df.as_ref().clone().join( + let mut df = self.df.as_ref().clone().join( right.df.as_ref().clone(), join_type, &left_keys, &right_keys, None, )?; + + if !keep_duplicate_keys { + let mutual_keys = left_keys + .iter() + .zip(right_keys.iter()) + .filter(|(l, r)| l == r) + .map(|(key, _)| *key) + .collect::>(); + + let fields_to_drop = mutual_keys + .iter() + .map(|name| { + df.logical_plan() + .schema() + .qualified_fields_with_unqualified_name(name) + }) + .filter(|r| r.len() == 2) + .map(|r| r[1]) + .collect::>(); + + let expr: Vec = df + .logical_plan() + .schema() + .fields() + .into_iter() + .enumerate() + .map(|(idx, _)| df.logical_plan().schema().qualified_field(idx)) + .filter(|(qualifier, f)| !fields_to_drop.contains(&(*qualifier, f))) + .map(|(qualifier, field)| Expr::Column(Column::from((qualifier, field)))) + .collect(); + df = df.select(expr)?; + } + Ok(Self::new(df)) } From 1a5587cfaf896fd51aeba24db0b537f1a09dcd61 Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Tue, 8 Jul 2025 07:37:16 -0400 Subject: [PATCH 2/7] Suppress expected warning --- python/tests/test_sql.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/python/tests/test_sql.py b/python/tests/test_sql.py index 8f57992d1..8d1f30096 100644 --- a/python/tests/test_sql.py +++ b/python/tests/test_sql.py @@ -157,6 +157,9 @@ def test_register_parquet(ctx, tmp_path): assert result.to_pydict() == {"cnt": [100]} +@pytest.mark.filterwarnings( + "ignore:using literals for table_partition_cols data types:DeprecationWarning" +) @pytest.mark.parametrize( ("path_to_str", "legacy_data_type"), [(True, False), (False, False), (False, True)] ) From 9882dd9a69e0063951d89378abd51c20c063e8e9 Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Tue, 8 Jul 2025 07:37:43 -0400 Subject: [PATCH 3/7] Minor: small tables rendered way too large --- python/datafusion/dataframe_formatter.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/datafusion/dataframe_formatter.py b/python/datafusion/dataframe_formatter.py index 4082ff4ec..bb53d323e 100644 --- a/python/datafusion/dataframe_formatter.py +++ b/python/datafusion/dataframe_formatter.py @@ -370,7 +370,7 @@ def _build_table_container_start(self) -> list[str]: f"max-height: {self.max_height}px; overflow: auto; border: " '1px solid #ccc;">' ) - html.append('') + html.append('
') return html def _build_table_header(self, schema: Any) -> list[str]: From 4f9f19095a2dda0e1e46902a3891ab16b087177f Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Fri, 7 Nov 2025 15:03:38 -0500 Subject: [PATCH 4/7] Rename from keep_duplicate_keys to drop_duplicate_keys --- python/datafusion/dataframe.py | 12 ++++++------ src/dataframe.rs | 4 ++-- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/python/datafusion/dataframe.py b/python/datafusion/dataframe.py index 3c956c963..b3b48e963 100644 --- a/python/datafusion/dataframe.py +++ b/python/datafusion/dataframe.py @@ -774,7 +774,7 @@ def join( left_on: None = None, right_on: None = None, join_keys: None = None, - keep_duplicate_keys: bool = False, + drop_duplicate_keys: bool = True, ) -> DataFrame: ... @overload @@ -787,7 +787,7 @@ def join( left_on: str | Sequence[str], right_on: str | Sequence[str], join_keys: tuple[list[str], list[str]] | None = None, - keep_duplicate_keys: bool = False, + drop_duplicate_keys: bool = True, ) -> DataFrame: ... @overload @@ -800,7 +800,7 @@ def join( join_keys: tuple[list[str], list[str]], left_on: None = None, right_on: None = None, - keep_duplicate_keys: bool = False, + drop_duplicate_keys: bool = True, ) -> DataFrame: ... def join( @@ -812,7 +812,7 @@ def join( left_on: str | Sequence[str] | None = None, right_on: str | Sequence[str] | None = None, join_keys: tuple[list[str], list[str]] | None = None, - keep_duplicate_keys: bool = False, + drop_duplicate_keys: bool = True, ) -> DataFrame: """Join this :py:class:`DataFrame` with another :py:class:`DataFrame`. @@ -825,7 +825,7 @@ def join( "right", "full", "semi", "anti". left_on: Join column of the left dataframe. right_on: Join column of the right dataframe. - keep_duplicate_keys: When False, the columns from the right DataFrame + drop_duplicate_keys: When True, the columns from the right DataFrame that have identical names in the ``on`` fields to the left DataFrame will be dropped. join_keys: Tuple of two lists of column names to join on. [Deprecated] @@ -875,7 +875,7 @@ def join( right_on = [right_on] return DataFrame( - self.df.join(right.df, how, left_on, right_on, keep_duplicate_keys) + self.df.join(right.df, how, left_on, right_on, drop_duplicate_keys) ) def join_on( diff --git a/src/dataframe.rs b/src/dataframe.rs index 862f4d411..187bb0acf 100644 --- a/src/dataframe.rs +++ b/src/dataframe.rs @@ -629,7 +629,7 @@ impl PyDataFrame { how: &str, left_on: Vec, right_on: Vec, - keep_duplicate_keys: bool, + drop_duplicate_keys: bool, ) -> PyDataFusionResult { let join_type = match how { "inner" => JoinType::Inner, @@ -656,7 +656,7 @@ impl PyDataFrame { None, )?; - if !keep_duplicate_keys { + if drop_duplicate_keys { let mutual_keys = left_keys .iter() .zip(right_keys.iter()) From 1e7f87461052593d5db9335e4328ed74b03d9b64 Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Fri, 7 Nov 2025 15:18:28 -0500 Subject: [PATCH 5/7] Add unit tests for dropping duplicate keys or not --- python/tests/test_dataframe.py | 27 ++++++++++++++++++++------- 1 file changed, 20 insertions(+), 7 deletions(-) diff --git a/python/tests/test_dataframe.py b/python/tests/test_dataframe.py index c29270f66..8292e258e 100644 --- a/python/tests/test_dataframe.py +++ b/python/tests/test_dataframe.py @@ -663,25 +663,38 @@ def test_join(): df1 = ctx.create_dataframe([[batch]], "r") df2 = df.join(df1, on="a", how="inner") - df2.show() df2 = df2.sort(column("l.a")) table = pa.Table.from_batches(df2.collect()) expected = {"a": [1, 2], "c": [8, 10], "b": [4, 5]} assert table.to_pydict() == expected - df2 = df.join(df1, left_on="a", right_on="a", how="inner") - df2.show() + # Test the default behavior for dropping duplicate keys + # Since we may have a duplicate column name and pa.Table() + # hides the fact, instead we need to explicitly check the + # resultant arrays. + df2 = df.join(df1, left_on="a", right_on="a", how="inner", drop_duplicate_keys=True) df2 = df2.sort(column("l.a")) - table = pa.Table.from_batches(df2.collect()) + result = df2.collect()[0] + assert result.num_columns == 3 + assert result.column(0) == pa.array([1, 2], pa.int64()) + assert result.column(1) == pa.array([4, 5], pa.int64()) + assert result.column(2) == pa.array([8, 10], pa.int64()) - expected = {"a": [1, 2], "c": [8, 10], "b": [4, 5]} - assert table.to_pydict() == expected + df2 = df.join( + df1, left_on="a", right_on="a", how="inner", drop_duplicate_keys=False + ) + df2 = df2.sort(column("l.a")) + result = df2.collect()[0] + assert result.num_columns == 4 + assert result.column(0) == pa.array([1, 2], pa.int64()) + assert result.column(1) == pa.array([4, 5], pa.int64()) + assert result.column(2) == pa.array([1, 2], pa.int64()) + assert result.column(3) == pa.array([8, 10], pa.int64()) # Verify we don't make a breaking change to pre-43.0.0 # where users would pass join_keys as a positional argument df2 = df.join(df1, (["a"], ["a"]), how="inner") - df2.show() df2 = df2.sort(column("l.a")) table = pa.Table.from_batches(df2.collect()) From c1b9dc48d108722d4a43b24e7b4b5234e98bb317 Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Fri, 7 Nov 2025 15:30:53 -0500 Subject: [PATCH 6/7] Update online docs --- .../user-guide/common-operations/joins.rst | 34 ++++++++++++++++++- 1 file changed, 33 insertions(+), 1 deletion(-) diff --git a/docs/source/user-guide/common-operations/joins.rst b/docs/source/user-guide/common-operations/joins.rst index 40d922150..75524751e 100644 --- a/docs/source/user-guide/common-operations/joins.rst +++ b/docs/source/user-guide/common-operations/joins.rst @@ -101,4 +101,36 @@ the right table. .. ipython:: python - left.join(right, left_on="customer_id", right_on="id", how="anti") \ No newline at end of file + left.join(right, left_on="customer_id", right_on="id", how="anti") + +Duplicate Keys +-------------- + +It is common to join two DataFrames on a common column name. Starting in +version 51.0.0, ``datafusion-python``` will now drop duplicate column names by +default. This reduces problems with ambiguous column selection after joins. +You can disable this feature by setting the parameter ``drop_duplicate_keys`` +to ``False``. + +.. ipython:: python + + left = ctx.from_pydict( + { + "id": [1, 2, 3], + "customer": ["Alice", "Bob", "Charlie"], + } + ) + + right = ctx.from_pylist([ + {"id": 1, "name": "CityCabs"}, + {"id": 2, "name": "MetroRide"}, + {"id": 5, "name": "UrbanGo"}, + ]) + + left.join(right, "id", how="inner") + +In contrast to the above example, if we wish to get both columns: + +.. ipython:: python + + left.join(right, "id", "inner", drop_duplicate_keys=False) From 60cc49acf50a6e664cb36af13ba83706b7f20bc2 Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Sat, 8 Nov 2025 09:40:58 -0500 Subject: [PATCH 7/7] Update docs/source/user-guide/common-operations/joins.rst Co-authored-by: kosiew --- docs/source/user-guide/common-operations/joins.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/source/user-guide/common-operations/joins.rst b/docs/source/user-guide/common-operations/joins.rst index 75524751e..035d7488d 100644 --- a/docs/source/user-guide/common-operations/joins.rst +++ b/docs/source/user-guide/common-operations/joins.rst @@ -133,4 +133,4 @@ In contrast to the above example, if we wish to get both columns: .. ipython:: python - left.join(right, "id", "inner", drop_duplicate_keys=False) + left.join(right, "id", how="inner", drop_duplicate_keys=False)