
Commit d41e61e

add some basic preprocessing and validations
1 parent ef0e104 commit d41e61e

File tree

1 file changed: +68, -11 lines


2025/census-data-airflow-bigframes/census_to_bigquery_venv.py

Lines changed: 68 additions & 11 deletions
@@ -35,7 +35,7 @@
     "start_date": datetime.datetime(2025, 6, 30),
 }
 
-GCS_LOCATION = "gs://us-central1-bigframes-orche-b70f2a52-bucket/data/us-census/cc-est2023-agesex-all.csv"
+GCS_LOCATION = "gs://us-central1-bigframes-orche-b70f2a52-bucket/data/us-census/cc-est2024-agesex-all.csv"
 
 # Define a DAG (directed acyclic graph) of tasks.
 # Any task you create within the context manager is automatically added to the
@@ -47,11 +47,14 @@
 ) as dag:
     download = bash.BashOperator(
         task_id="download",
-        bash_command="wget https://www2.census.gov/programs-surveys/popest/datasets/2020-2023/counties/asrh/cc-est2023-agesex-all.csv",
+        # See
+        # https://www.census.gov/data/tables/time-series/demo/popest/2020s-counties-detail.html
+        # for file paths and methodologies.
+        bash_command="wget https://www2.census.gov/programs-surveys/popest/datasets/2020-2024/counties/asrh/cc-est2024-agesex-all.csv",
     )
     upload = bash.BashOperator(
         task_id="upload",
-        bash_command=f"gcloud storage cp cc-est2023-agesex-all.csv {GCS_LOCATION}",
+        bash_command=f"gcloud storage cp cc-est2024-agesex-all.csv {GCS_LOCATION}",
     )
 
     def callable_virtualenv():
@@ -61,21 +64,75 @@ def callable_virtualenv():
         Importing at the module level ensures that it will not attempt to import the
         library before it is installed.
         """
+        # =============================
+        # Setup bigframes
+        # =============================
         import bigframes.pandas as bpd
 
-        # Prevent the operator from accidentally downloading too many rows to
-        # the client-side.
-        bpd.options.compute.maximum_result_rows = 1000
+        # Recommended: Partial ordering mode enables the best performance.
+        bpd.options.bigquery.ordering_mode = "partial"
 
-        # TODO: read csv using bigquery engine
-        # TODO: any sort of processing / cleanup?
-        # TODO: some data validations (after cache())
-        # TODO: write to destination table
+        # Recommended: Fail the operator if it accidentally downloads too many
+        # rows to the client side from BigQuery. This can prevent your operator
+        # from using too much memory.
+        bpd.options.compute.maximum_result_rows = 10_000
+
+        # Optional. An explicit project ID is not needed if the project can be
+        # determined from the environment, such as in Cloud Composer, Google
+        # Compute Engine, or if authenticated with the gcloud application-default
+        # commands.
+        # bpd.options.bigquery.project = "my-project-id"
+
+        try:
+            # By loading with the BigQuery engine, you can avoid having to read
+            # the file into memory. This is because BigQuery is responsible for
+            # parsing the file.
+            df = bpd.read_csv(GCS_LOCATION, engine="bigquery")
+
+            # Processing / cleanup: convert the YEAR code into an explicit
+            # estimate date. The key for YEAR is as follows:
+            # 1 = 4/1/2020 population estimates base
+            # 2 = 7/1/2020 population estimate
+            # 3 = 7/1/2021 population estimate
+            # 4 = 7/1/2022 population estimate
+            # 5 = 7/1/2023 population estimate
+            # 6 = 7/1/2024 population estimate
+            df_dates = df.assign(
+                ESTIMATE_DATE=df["YEAR"].case_when(
+                    caselist=[
+                        (df["YEAR"].eq(1), datetime.date(2020, 4, 1)),
+                        (df["YEAR"].eq(2), datetime.date(2020, 7, 1)),
+                        (df["YEAR"].eq(3), datetime.date(2021, 7, 1)),
+                        (df["YEAR"].eq(4), datetime.date(2022, 7, 1)),
+                        (df["YEAR"].eq(5), datetime.date(2023, 7, 1)),
+                        (df["YEAR"].eq(6), datetime.date(2024, 7, 1)),
+                        (True, None),
+                    ]
+                ),
+            ).drop(columns=["YEAR"])
+
+            # One of the benefits of using BigQuery DataFrames in your operators is
+            # that it makes it easy to perform data validations.
+            #
+            # Note: cache() is optional, but if any of the preprocessing above is
+            # complicated, it hints to BigQuery DataFrames to run those steps first
+            # and avoid duplicating work.
+            df_dates.cache()
+            assert not df_dates["ESTIMATE_DATE"].hasnans
+
+            # Now that we have validated the data is as expected, it should be safe
+            # to write to the final destination table.
+        finally:
+            # Closing the session is optional. Any temporary tables created
+            # should be automatically cleaned up when the BigQuery session
+            # closes after 24 hours, but closing the session explicitly can help
+            # save on storage costs.
+            bpd.close_session()
 
     bf_to_gbq = PythonVirtualenvOperator(
         task_id="bf_to_gbq",
         python_callable=callable_virtualenv,
-        requirements=["bigframes==2.9.0"],
+        requirements=["bigframes==2.10.0"],
         system_site_packages=False,
     )

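The new code validates ESTIMATE_DATE but stops at the comment about writing to the final destination table; the write itself is not part of this commit. A minimal sketch of what that last step might look like inside the same try block, assuming a hypothetical destination table ID and using DataFrame.to_gbq:

            # Hypothetical final step (not part of this commit): write the
            # validated DataFrame to BigQuery. The table ID is illustrative only.
            df_dates.to_gbq(
                "my-project-id.us_census.cc_est2024_agesex_all",
                if_exists="replace",  # overwrite the table on each DAG run
            )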
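The diff touches only the operator definitions, so the dependency chain between the three tasks is not visible here. Assuming the rest of the file uses Airflow's usual bit-shift dependency syntax, the wiring inside the with block would look roughly like this:

    # Assumed task ordering (not shown in this diff): fetch the CSV, copy it to
    # GCS, then run the BigQuery DataFrames job in the isolated virtualenv.
    download >> upload >> bf_to_gbq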