Skip to content

Commit e7213c7

Browse files
feat: add partitioning and clustering parameters to the to_gbq function (#949)
* add partitioning and clustering to the to_gbq function * feat: add partitioning and clustering to the to_gbq function * added tests * fixed documentation error * run nox -r -s format lint --------- Co-authored-by: Tim Sweña (Swast) <swast@google.com>
1 parent ff95c02 commit e7213c7

File tree

2 files changed

+137
-3
lines changed

2 files changed

+137
-3
lines changed

pandas_gbq/gbq.py

Lines changed: 84 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,11 @@
66
from datetime import datetime
77
import logging
88
import re
9+
import typing
910
import warnings
1011

12+
import pandas
13+
1114
from pandas_gbq.contexts import Context # noqa - backward compatible export
1215
from pandas_gbq.contexts import context
1316
from pandas_gbq.exceptions import ( # noqa - backward compatible export
@@ -380,6 +383,14 @@ def to_gbq(
380383
progress_bar=True,
381384
credentials=None,
382385
api_method: str = "default",
386+
clustering_columns: typing.Union[
387+
pandas.core.indexes.base.Index, typing.Iterable[typing.Hashable]
388+
] = (),
389+
time_partitioning_column: typing.Optional[str] = None,
390+
time_partitioning_type: typing.Optional[str] = "DAY",
391+
time_partitioning_expiration_ms: typing.Optional[int] = None,
392+
range_partitioning_column: typing.Optional[str] = None,
393+
range_partitioning_range: typing.Optional[dict] = None,
383394
verbose=None,
384395
private_key=None,
385396
auth_redirect_uri=None,
@@ -406,6 +417,18 @@ def to_gbq(
406417
destination_table : str
407418
Name of table to be written, in the form ``dataset.tablename`` or
408419
``project.dataset.tablename``.
420+
clustering_columns : pandas.Index | Iterable[Hashable], optional
421+
Specifies the columns for clustering in the BigQuery table.
422+
time_partitioning_column : str, optional
423+
Specifies the column for time-based partitioning in the BigQuery table.
424+
time_partitioning_type : str, default 'DAY'
425+
Specifies the type of time-based partitioning.
426+
time_partitioning_expiration_ms : int, optional
427+
Specifies the milliseconds for time-based partitioning expiration.
428+
range_partitioning_column : str, optional
429+
Specifies the column for range-based partitioning in the BigQuery table.
430+
range_partitioning_range : dict, optional
431+
Specifies the range for range-based partitioning; expects the keys ``start``, ``end``, and ``interval``.
409432
project_id : str, optional
410433
Google Cloud Platform project ID. Optional when available from
411434
the environment.
@@ -610,7 +633,16 @@ def to_gbq(
610633
location=location,
611634
credentials=connector.credentials,
612635
)
613-
table_connector.create(table_id, table_schema)
636+
table_connector.create(
637+
table_id,
638+
table_schema,
639+
clustering_columns=clustering_columns,
640+
time_partitioning_column=time_partitioning_column,
641+
time_partitioning_type=time_partitioning_type,
642+
time_partitioning_expiration_ms=time_partitioning_expiration_ms,
643+
range_partitioning_column=range_partitioning_column,
644+
range_partitioning_range=range_partitioning_range,
645+
)
614646
else:
615647
if if_exists == "append":
616648
# Convert original schema (the schema that already exists) to pandas-gbq API format
@@ -731,7 +763,17 @@ def exists(self, table_id):
731763
except self.http_error as ex:
732764
self.process_http_error(ex)
733765

734-
def create(self, table_id, schema):
766+
def create(
767+
self,
768+
table_id,
769+
schema,
770+
clustering_columns=None,
771+
time_partitioning_column=None,
772+
time_partitioning_type="DAY",
773+
time_partitioning_expiration_ms=None,
774+
range_partitioning_column=None,
775+
range_partitioning_range=None,
776+
):
735777
"""Create a table in Google BigQuery given a table and schema
736778
737779
Parameters
@@ -741,8 +783,27 @@ def create(self, table_id, schema):
741783
schema : str
742784
Use the generate_bq_schema to generate your table schema from a
743785
dataframe.
786+
clustering_columns : list, optional
787+
List of columns to cluster the table on.
788+
time_partitioning_column : str, optional
789+
Column to partition the table on.
790+
time_partitioning_type : str, default 'DAY'
791+
Type of time partitioning.
792+
time_partitioning_expiration_ms : int, optional
793+
Expiration time for the partitioning.
794+
range_partitioning_column : str, optional
795+
Column to use for range-based partitioning.
796+
range_partitioning_range : dict, optional
797+
Range for the partitioning, as a dict with ``start``, ``end``, and ``interval`` keys.
744798
"""
745-
from google.cloud.bigquery import DatasetReference, Table, TableReference
799+
from google.cloud.bigquery import (
800+
DatasetReference,
801+
PartitionRange,
802+
RangePartitioning,
803+
Table,
804+
TableReference,
805+
TimePartitioning,
806+
)
746807

747808
if self.exists(table_id):
748809
raise TableCreationError("Table {0} already exists".format(table_id))
@@ -762,6 +823,26 @@ def create(self, table_id, schema):
762823
table = Table(table_ref)
763824
table.schema = pandas_gbq.schema.to_google_cloud_bigquery(schema)
764825

826+
if clustering_columns:
827+
table.clustering_fields = list(clustering_columns)
828+
829+
if time_partitioning_column:
830+
table.time_partitioning = TimePartitioning(
831+
type_=time_partitioning_type,
832+
field=time_partitioning_column,
833+
expiration_ms=time_partitioning_expiration_ms,
834+
)
835+
836+
if range_partitioning_column and range_partitioning_range:
837+
table.range_partitioning = RangePartitioning(
838+
field=range_partitioning_column,
839+
range_=PartitionRange(
840+
start=range_partitioning_range["start"],
841+
end=range_partitioning_range["end"],
842+
interval=range_partitioning_range["interval"],
843+
),
844+
)
845+
765846
try:
766847
self.client.create_table(table)
767848
except self.http_error as ex:

tests/unit/test_to_gbq.py

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -271,3 +271,56 @@ def custom_import_module_side_effect(name, package=None):
271271
create_user_agent()
272272
== f"pandas-{pd.__version__} jupyter bigquery_jupyter_plugin"
273273
)
274+
275+
276+
def test_to_gbq_with_clustering(mock_bigquery_client):
277+
mock_bigquery_client.get_table.side_effect = google.api_core.exceptions.NotFound(
278+
"my_table"
279+
)
280+
gbq.to_gbq(
281+
DataFrame([[1]]),
282+
"my_dataset.my_table",
283+
project_id="1234",
284+
clustering_columns=["col_a", "col_b"],
285+
)
286+
mock_bigquery_client.create_table.assert_called_with(mock.ANY)
287+
table = mock_bigquery_client.create_table.call_args[0][0]
288+
assert table.clustering_fields == ["col_a", "col_b"]
289+
290+
291+
def test_to_gbq_with_time_partitioning(mock_bigquery_client):
292+
mock_bigquery_client.get_table.side_effect = google.api_core.exceptions.NotFound(
293+
"my_table"
294+
)
295+
gbq.to_gbq(
296+
DataFrame([[1]]),
297+
"my_dataset.my_table",
298+
project_id="1234",
299+
time_partitioning_column="time_col",
300+
time_partitioning_type="DAY",
301+
time_partitioning_expiration_ms=100,
302+
)
303+
mock_bigquery_client.create_table.assert_called_with(mock.ANY)
304+
table = mock_bigquery_client.create_table.call_args[0][0]
305+
assert table.time_partitioning.type_ == "DAY"
306+
assert table.time_partitioning.field == "time_col"
307+
assert table.time_partitioning.expiration_ms == 100
308+
309+
310+
def test_to_gbq_with_range_partitioning(mock_bigquery_client):
311+
mock_bigquery_client.get_table.side_effect = google.api_core.exceptions.NotFound(
312+
"my_table"
313+
)
314+
gbq.to_gbq(
315+
DataFrame([[1]]),
316+
"my_dataset.my_table",
317+
project_id="1234",
318+
range_partitioning_column="range_col",
319+
range_partitioning_range={"start": 0, "end": 100, "interval": 10},
320+
)
321+
mock_bigquery_client.create_table.assert_called_with(mock.ANY)
322+
table = mock_bigquery_client.create_table.call_args[0][0]
323+
assert table.range_partitioning.field == "range_col"
324+
assert table.range_partitioning.range_.start == 0
325+
assert table.range_partitioning.range_.end == 100
326+
assert table.range_partitioning.range_.interval == 10

0 commit comments

Comments
 (0)