1212# See the License for the specific language governing permissions and
1313# limitations under the License.
1414
15- # Tested on Cloud Composer 3
16- #
17- # For local development:
18- # pip install 'apache-airflow[google]==2.10.5'
15+ """
16+ An example DAG for loading data from the US Census using BigQuery DataFrames
17+ (aka bigframes). This DAG uses PythonVirtualenvOperator for environments where
18+ bigframes can't be installed for use from PythonOperator.
19+
20+ I have tested this DAG on Cloud Composer 3 with Apache Airflow 2.10.5.
21+
22+ For local development:
23+
24+ pip install 'apache-airflow[google]==2.10.5' bigframes
25+ """
1926
2027
2128import datetime
3643}
3744
3845GCS_LOCATION = "gs://us-central1-bigframes-orche-b70f2a52-bucket/data/us-census/cc-est2024-agesex-all.csv"
39- BIGQUERY_DESTINATION = "swast-scratch.airflow_demo.us_census_by_county2020_to_present"
4046
4147# Define a DAG (directed acyclic graph) of tasks.
4248# Any task you create within the context manager is automatically added to the
4349# DAG object.
4450with models .DAG (
45- "census_from_http_to_gcs_once " ,
51+ "census_from_http_to_bigquery_once " ,
4652 schedule_interval = "@once" ,
4753 default_args = default_dag_args ,
4854) as dag :
49- download = bash .BashOperator (
50- task_id = "download " ,
55+ download_upload = bash .BashOperator (
56+ task_id = "download_upload " ,
5157 # See
5258 # https://www.census.gov/data/tables/time-series/demo/popest/2020s-counties-detail.html
5359 # for file paths and methodologies.
54- bash_command = "wget https://www2.census.gov/programs-surveys/popest/datasets/2020-2024/counties/asrh/cc-est2024-agesex-all.csv" ,
55- )
56- upload = bash .BashOperator (
57- task_id = "upload" ,
58- bash_command = f"gcloud storage cp cc-est2024-agesex-all.csv { GCS_LOCATION } " ,
60+ bash_command = f"""
61+ wget https://www2.census.gov/programs-surveys/popest/datasets/2020-2024/counties/asrh/cc-est2024-agesex-all.csv -P ~;
62+ gcloud storage cp ~/cc-est2024-agesex-all.csv { GCS_LOCATION }
63+ """ ,
5964 )
6065
6166 def callable_virtualenv ():
@@ -65,10 +70,16 @@ def callable_virtualenv():
6570 Importing at the module level ensures that it will not attempt to import the
6671 library before it is installed.
6772 """
73+ import datetime
74+
75+ import bigframes .pandas as bpd
76+
77+ BIGQUERY_DESTINATION = "swast-scratch.airflow_demo.us_census_by_county2020_to_present"
78+ GCS_LOCATION = "gs://us-central1-bigframes-orche-b70f2a52-bucket/data/us-census/cc-est2024-agesex-all.csv"
79+
6880 #=============================
6981 # Setup bigframes
7082 #=============================
71- import bigframes .pandas as bpd
7283
7384 # Recommended: Partial ordering mode enables the best performance.
7485 bpd .options .bigquery .ordering_mode = "partial"
@@ -144,4 +155,4 @@ def callable_virtualenv():
144155 )
145156
146157
147- download >> upload >> bf_to_gbq
158+ download_upload >> bf_to_gbq
0 commit comments