3636}
3737
3838GCS_LOCATION = "gs://us-central1-bigframes-orche-b70f2a52-bucket/data/us-census/cc-est2024-agesex-all.csv"
39+ BIGQUERY_DESTINATION = "swast-scratch.airflow_demo.us_census_by_county2020_to_present"
3940
4041# Define a DAG (directed acyclic graph) of tasks.
4142# Any task you create within the context manager is automatically added to the
@@ -89,14 +90,8 @@ def callable_virtualenv():
8990 # parsing the file.
9091 df = bpd .read_csv (GCS_LOCATION , engine = "bigquery" )
9192
92- # TODO: any sort of processing / cleanup?
93- # The key for YEAR is as follows:
94- # 1 = 4/1/2020 population estimates base
95- # 2 = 7/1/2020 population estimate
96- # 3 = 7/1/2021 population estimate
97- # 4 = 7/1/2022 population estimate
98- # 5 = 7/1/2023 population estimate
99- # 6 = 7/1/2024 population estimate
93+ # Perform preprocessing. For example, you can map some coded data
94+ # into a form that is easier to understand.
10095 df_dates = df .assign (
10196 ESTIMATE_DATE = df ["YEAR" ].case_when (
10297 caselist = [
@@ -110,6 +105,8 @@ def callable_virtualenv():
110105 ]
111106 ),
112107 ).drop (columns = ["YEAR" ])
108+
109+ # TODO(developer): Add additional processing and cleanup as needed.
113110
114111 # One of the benefits of using BigQuery DataFrames in your operators is
115112 # that it makes it easy to perform data validations.
@@ -118,10 +115,20 @@ def callable_virtualenv():
118115 # complicated, it hints to BigQuery DataFrames to run those first and
119116 # avoid duplicating work.
120117 df_dates .cache ()
118+ row_count , column_count = df_dates .shape
119+ assert row_count > 0
120+ assert column_count > 0
121121 assert not df_dates ["ESTIMATE_DATE" ].hasnans
122122
123- # Now that we have validated the data is as expected, it should be safe
124- # to write to the final destination table.
123+ # TODO(developer): Add additional validations as needed.
124+
125+ # Now that you have validated the data, it should be safe to write
126+ # to the final destination table.
127+ df_dates .to_gbq (
128+ BIGQUERY_DESTINATION ,
129+ if_exists = "replace" ,
130+ clustering_columns = ["ESTIMATE_DATE" , "STATE" , "COUNTY" ],
131+ )
125132 finally :
126133 # Closing the session is optional. Any temporary tables created
127134 # should be automatically cleaned up when the BigQuery Session
0 commit comments