
Commit ef0e104

start operator for bigframes
1 parent e095dc1 commit ef0e104

File tree

1 file changed

+83 -0 lines changed

Lines changed: 83 additions & 0 deletions
@@ -0,0 +1,83 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Tested on Cloud Composer 3
#
# For local development:
#   pip install 'apache-airflow[google]==2.10.5'
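#
# One possible way to exercise this DAG locally (assuming the environment
# above is set up and gcloud credentials are available; exact commands vary
# by setup):
#   airflow db migrate
#   airflow dags test census_from_http_to_gcs_once 2025-06-30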

import datetime

from airflow import models
from airflow.operators import bash
from airflow.operators.python import (
    PythonVirtualenvOperator,
)


default_dag_args = {
    # The start_date describes when a DAG is valid / can be run. Set this to a
    # fixed point in time rather than dynamically, since it is evaluated every
    # time a DAG is parsed. See:
    # https://airflow.apache.org/faq.html#what-s-the-deal-with-start-date
    "start_date": datetime.datetime(2025, 6, 30),
}

GCS_LOCATION = "gs://us-central1-bigframes-orche-b70f2a52-bucket/data/us-census/cc-est2023-agesex-all.csv"

# Define a DAG (directed acyclic graph) of tasks.
# Any task you create within the context manager is automatically added to the
# DAG object.
with models.DAG(
    "census_from_http_to_gcs_once",
    schedule_interval="@once",
    default_args=default_dag_args,
) as dag:
    download = bash.BashOperator(
        task_id="download",
        bash_command="wget https://www2.census.gov/programs-surveys/popest/datasets/2020-2023/counties/asrh/cc-est2023-agesex-all.csv",
    )
    upload = bash.BashOperator(
        task_id="upload",
        bash_command=f"gcloud storage cp cc-est2023-agesex-all.csv {GCS_LOCATION}",
    )
    def callable_virtualenv():
        """
        Example function that will be performed in a virtual environment.

        Importing at the function level ensures that it will not attempt to
        import the library before it is installed.
        """
        import bigframes.pandas as bpd

        # Prevent the operator from accidentally downloading too many rows to
        # the client-side.
        bpd.options.compute.maximum_result_rows = 1000

        # TODO: read csv using bigquery engine
        # TODO: any sort of processing / cleanup?
        # TODO: some data validations (after cache())
        # TODO: write to destination table
    bf_to_gbq = PythonVirtualenvOperator(
        task_id="bf_to_gbq",
        python_callable=callable_virtualenv,
        requirements=["bigframes==2.9.0"],
        system_site_packages=False,
    )

    download >> upload >> bf_to_gbq
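
For reference, a minimal sketch of how the TODOs in callable_virtualenv might later be filled in with bigframes is shown below. It is not part of this commit: the destination table name us_census.cc_est2023_agesex_all is an invented placeholder, and the read_csv(engine="bigquery") / cache() / to_gbq() calls are ordinary bigframes 2.x usage rather than anything this change prescribes.

def callable_virtualenv():
    """Sketch of the finished callable: read, validate, and write the CSV."""
    import bigframes.pandas as bpd

    # Keep accidental client-side downloads small, as in the committed code.
    bpd.options.compute.maximum_result_rows = 1000

    # Read the uploaded CSV with the BigQuery engine so parsing happens
    # server-side rather than in the Composer worker.
    df = bpd.read_csv(
        "gs://us-central1-bigframes-orche-b70f2a52-bucket/data/us-census/cc-est2023-agesex-all.csv",
        engine="bigquery",
    )

    # Materialize the intermediate result, then run a cheap sanity check.
    df = df.cache()
    assert len(df) > 0, "expected the census CSV to contain rows"

    # Write to a destination table; the table name here is a placeholder.
    df.to_gbq("us_census.cc_est2023_agesex_all", if_exists="replace")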

0 commit comments
