
Commit ffc0b86

Adding Iceberg REST Catalog Examples for Dataflow Documentation (#10149)
* Add Iceberg REST Catalog Examples
* add start and end tags
* Add integration tests
* Fix tests
* Fix Style
* Fix variable
* Rename file
* fix write time
* Test Assert
* Pass gcp project id
* Create Scoped creds
* Trigger tests
* Add Auth header
* Fix Kafka test
1 parent 7524578 commit ffc0b86

File tree

5 files changed (+413, -7 lines)


dataflow/snippets/pom.xml

Lines changed: 6 additions & 1 deletion
@@ -37,7 +37,7 @@
     <maven.compiler.source>11</maven.compiler.source>
     <maven.compiler.target>11</maven.compiler.target>
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-    <apache_beam.version>2.62.0</apache_beam.version>
+    <apache_beam.version>2.67.0</apache_beam.version>
     <slf4j.version>2.0.12</slf4j.version>
     <parquet.version>1.14.0</parquet.version>
     <iceberg.version>1.4.2</iceberg.version>
@@ -155,6 +155,11 @@
       <artifactId>iceberg-data</artifactId>
       <version>${iceberg.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.iceberg</groupId>
+      <artifactId>iceberg-gcp</artifactId>
+      <version>${iceberg.version}</version>
+    </dependency>
 
     <!-- Kafka -->
     <dependency>
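A plausible reading of these two changes (inferred from the examples below, not stated in the commit): the Beam upgrade to 2.67.0 supplies the Managed Iceberg CDC source (Managed.ICEBERG_CDC) that the new read example uses, and iceberg-gcp adds Iceberg's GCP integrations, such as Cloud Storage file I/O for a BigLake-backed warehouse.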
dataflow/snippets/src/main/java/com/example/dataflow/ApacheIcebergCdcRead.java

Lines changed: 169 additions & 0 deletions

@@ -0,0 +1,169 @@
/*
 * Copyright 2024 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.example.dataflow;

// [START dataflow_apache_iceberg_cdc_read]
import com.google.auth.oauth2.GoogleCredentials;
import com.google.common.collect.ImmutableMap;
import java.io.IOException;
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.RowCoder;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.joda.time.Duration;

/**
 * A streaming pipeline that reads CDC events from an Iceberg table, aggregates user clicks, and
 * writes the results to another Iceberg table.
 *
 * <p>This pipeline can be used to process the output of {@link
 * ApacheIcebergRestCatalogStreamingWrite}.
 */
public class ApacheIcebergCdcRead {

  // Schema for the source table containing click events.
  public static final Schema SOURCE_SCHEMA =
      Schema.builder().addStringField("user_id").addInt64Field("click_count").build();

  // Schema for the destination table containing aggregated click counts.
  public static final Schema DESTINATION_SCHEMA =
      Schema.builder().addStringField("user_id").addInt64Field("total_clicks").build();

  /** Pipeline options for this example. */
  public interface Options extends GcpOptions {
    @Description("The source Iceberg table to read CDC events from")
    @Validation.Required
    String getSourceTable();

    void setSourceTable(String sourceTable);

    @Description("The destination Iceberg table to write aggregated results to")
    @Validation.Required
    String getDestinationTable();

    void setDestinationTable(String destinationTable);

    @Description("Warehouse location for the Iceberg catalog")
    @Validation.Required
    String getWarehouse();

    void setWarehouse(String warehouse);

    @Description("The URI for the REST catalog")
    @Default.String("https://biglake.googleapis.com/iceberg/v1beta/restcatalog")
    String getCatalogUri();

    void setCatalogUri(String value);

    @Description("The name of the Iceberg catalog")
    @Validation.Required
    String getCatalogName();

    void setCatalogName(String catalogName);
  }

  public static void main(String[] args) throws IOException {
    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);

    Map<String, String> catalogProps =
        ImmutableMap.<String, String>builder()
            .put("type", "rest")
            .put("uri", options.getCatalogUri())
            .put("warehouse", options.getWarehouse())
            .put("header.x-goog-user-project", options.getProject())
            .put(
                "header.Authorization",
                "Bearer "
                    + GoogleCredentials.getApplicationDefault()
                        .createScoped("https://www.googleapis.com/auth/cloud-platform")
                        .refreshAccessToken()
                        .getTokenValue())
            .put("rest-metrics-reporting-enabled", "false")
            .build();

    Pipeline p = Pipeline.create(options);

    // Configure the Iceberg CDC read
    Map<String, Object> icebergReadConfig =
        ImmutableMap.<String, Object>builder()
            .put("table", options.getSourceTable())
            .put("catalog_name", options.getCatalogName())
            .put("catalog_properties", catalogProps)
            .put("streaming", Boolean.TRUE)
            .put("poll_interval_seconds", 20)
            .build();

    PCollection<Row> cdcEvents =
        p.apply("ReadFromIceberg", Managed.read(Managed.ICEBERG_CDC).withConfig(icebergReadConfig))
            .getSinglePCollection()
            .setRowSchema(SOURCE_SCHEMA);

    PCollection<Row> aggregatedRows =
        cdcEvents
            .apply("ApplyWindow", Window.into(FixedWindows.of(Duration.standardSeconds(30))))
            .apply(
                "ExtractUserAndCount",
                MapElements.into(
                        TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.longs()))
                    .via(
                        row -> {
                          String userId = row.getString("user_id");
                          Long clickCount = row.getInt64("click_count");
                          return KV.of(userId, clickCount == null ? 0L : clickCount);
                        }))
            .apply("SumClicksPerUser", Sum.longsPerKey())
            .apply(
                "FormatToRow",
                MapElements.into(TypeDescriptors.rows())
                    .via(
                        kv ->
                            Row.withSchema(DESTINATION_SCHEMA)
                                .withFieldValue("user_id", kv.getKey())
                                .withFieldValue("total_clicks", kv.getValue())
                                .build()))
            .setCoder(RowCoder.of(DESTINATION_SCHEMA));

    // Configure the Iceberg write
    Map<String, Object> icebergWriteConfig =
        ImmutableMap.<String, Object>builder()
            .put("table", options.getDestinationTable())
            .put("catalog_properties", catalogProps)
            .put("catalog_name", options.getCatalogName())
            .put("triggering_frequency_seconds", 30)
            .build();

    aggregatedRows.apply(
        "WriteToIceberg", Managed.write(Managed.ICEBERG).withConfig(icebergWriteConfig));

    p.run();
  }
}
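A usage sketch, not part of the commit: the CDC pipeline above could be launched from Java roughly as follows. Every flag value is a hypothetical placeholder, and the Dataflow runner is assumed to be on the classpath.

package com.example.dataflow;

public class LaunchCdcReadExample {
  public static void main(String[] args) throws java.io.IOException {
    // Invoke the sample with placeholder values; --catalogUri is omitted
    // because it defaults to the BigLake REST catalog endpoint.
    ApacheIcebergCdcRead.main(
        new String[] {
          "--project=my-project",                         // hypothetical GCP project
          "--region=us-central1",                         // hypothetical Dataflow region
          "--runner=DataflowRunner",
          "--sourceTable=my_namespace.clicks",            // hypothetical source table
          "--destinationTable=my_namespace.click_totals", // hypothetical destination table
          "--warehouse=gs://my-bucket/warehouse",         // hypothetical single-region bucket
          "--catalogName=my_catalog"                      // hypothetical catalog name
        });
  }
}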
dataflow/snippets/src/main/java/com/example/dataflow/ApacheIcebergRestCatalogStreamingWrite.java

Lines changed: 139 additions & 0 deletions

@@ -0,0 +1,139 @@
/*
 * Copyright 2024 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.example.dataflow;

// [START dataflow_apache_iceberg_rest_catalog_streaming_write]
import com.google.auth.oauth2.GoogleCredentials;
import com.google.common.collect.ImmutableMap;
import java.io.IOException;
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.RowCoder;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.joda.time.Duration;

/**
 * A streaming pipeline that writes data to an Iceberg table using the REST catalog.
 *
 * <p>This example demonstrates writing to an Iceberg table backed by the BigLake Metastore. For
 * more information, see the documentation at
 * https://cloud.google.com/bigquery/docs/iceberg-biglake-metastore.
 */
public class ApacheIcebergRestCatalogStreamingWrite {

  // The schema for the generated records.
  public static final Schema SCHEMA =
      Schema.builder().addStringField("user_id").addInt64Field("click_count").build();

  /** Pipeline options for this example. */
  public interface Options extends GcpOptions, StreamingOptions {
    @Description(
        "Warehouse location where the table's data will be written to. "
            + "BigLake only supports Single Region buckets")
    @Validation.Required
    String getWarehouse();

    void setWarehouse(String warehouse);

    @Description("The URI for the REST catalog")
    @Validation.Required
    @Default.String("https://biglake.googleapis.com/iceberg/v1beta/restcatalog")
    String getCatalogUri();

    void setCatalogUri(String value);

    @Description("The name of the table to write to")
    @Validation.Required
    String getIcebergTable();

    void setIcebergTable(String value);

    @Description("The name of the Apache Iceberg catalog")
    @Validation.Required
    String getCatalogName();

    void setCatalogName(String catalogName);
  }

  /**
   * The main entry point for the pipeline.
   *
   * @param args Command-line arguments
   * @throws IOException If there is an issue with Google Credentials
   */
  public static void main(String[] args) throws IOException {
    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
    options.setStreaming(true);

    // Note: The token expires in 1 hour. Users may need to re-run the pipeline.
    // Future updates to Iceberg and the BigLake Metastore will support token refreshing.
    Map<String, String> catalogProps =
        ImmutableMap.<String, String>builder()
            .put("type", "rest")
            .put("uri", options.getCatalogUri())
            .put("warehouse", options.getWarehouse())
            .put("header.x-goog-user-project", options.getProject())
            .put(
                "header.Authorization",
                "Bearer "
                    + GoogleCredentials.getApplicationDefault()
                        .createScoped("https://www.googleapis.com/auth/cloud-platform")
                        .refreshAccessToken()
                        .getTokenValue())
            .put("rest-metrics-reporting-enabled", "false")
            .build();

    Map<String, Object> icebergWriteConfig =
        ImmutableMap.<String, Object>builder()
            .put("table", options.getIcebergTable())
            .put("catalog_properties", catalogProps)
            .put("catalog_name", options.getCatalogName())
            .put("triggering_frequency_seconds", 20)
            .build();

    Pipeline p = Pipeline.create(options);

    p.apply(
            "GenerateSequence",
            GenerateSequence.from(0).withRate(1, Duration.standardSeconds(5)))
        .apply(
            "ConvertToRows",
            MapElements.into(TypeDescriptors.rows())
                .via(
                    i ->
                        Row.withSchema(SCHEMA)
                            .withFieldValue("user_id", "user-" + (i % 10))
                            .withFieldValue("click_count", i % 100)
                            .build()))
        .setCoder(RowCoder.of(SCHEMA))
        .apply("WriteToIceberg", Managed.write(Managed.ICEBERG).withConfig(icebergWriteConfig));

    p.run();
  }
}
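Likewise, a hypothetical launcher for the streaming write, with the same caveats (placeholder values only, Dataflow runner assumed on the classpath; note the in-code comment above about the one-hour token lifetime):

package com.example.dataflow;

public class LaunchStreamingWriteExample {
  public static void main(String[] args) throws java.io.IOException {
    // Invoke the streaming write sample with placeholder values.
    ApacheIcebergRestCatalogStreamingWrite.main(
        new String[] {
          "--project=my-project",                 // hypothetical GCP project
          "--region=us-central1",                 // hypothetical Dataflow region
          "--runner=DataflowRunner",
          "--icebergTable=my_namespace.clicks",   // hypothetical destination table
          "--warehouse=gs://my-bucket/warehouse", // hypothetical single-region bucket
          "--catalogName=my_catalog"              // hypothetical catalog name
        });
  }
}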

dataflow/snippets/src/main/java/com/example/dataflow/KafkaRead.java

Lines changed: 1 addition & 1 deletion
@@ -67,7 +67,7 @@ public static Pipeline createPipeline(Options options) {
     ImmutableMap<String, Object> config = ImmutableMap.<String, Object>builder()
         .put("bootstrap_servers", options.getBootstrapServer())
         .put("topic", options.getTopic())
-        .put("data_format", "RAW")
+        .put("format", "RAW")
         .put("max_read_time_seconds", 15)
         .put("auto_offset_reset_config", "earliest")
         .build();
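This key rename tracks the Managed Kafka read configuration in newer Beam releases, which expect "format" rather than "data_format"; presumably this is the "Fix Kafka test" item in the commit message, since the old key would no longer validate after the upgrade to Beam 2.67.0.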
