Skip to content
This repository was archived by the owner on Nov 11, 2022. It is now read-only.

Commit 1b7954e

Browse files
authored
Merge pull request #479 from mizitch/backport-sortvalues
Add SortValues
2 parents 3e65582 + 9e27591 commit 1b7954e

File tree

13 files changed

+1706
-0
lines changed

13 files changed

+1706
-0
lines changed

contrib/sorter/README.md

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
#Sorter
2+
This module provides the SortValues transform, which takes a `PCollection<KV<K, Iterable<KV<K2, V>>>>` and produces a `PCollection<KV<K, Iterable<KV<K2, V>>>>` where, for each primary key `K` the paired `Iterable<KV<K2, V>>` has been sorted by the byte encoding of secondary key (`K2`). It will efficiently and scalably sort the iterables, even if they are large (do not fit in memory).
3+
4+
##Caveats
5+
* This transform performs value-only sorting; the iterable accompanying each key is sorted, but *there is no relationship between different keys*, as Dataflow does not support any defined relationship between different elements in a PCollection.
6+
* Each `Iterable<KV<K2, V>>` is sorted on a single worker using local memory and disk. This means that `SortValues` may be a performance and/or scalability bottleneck when used in different pipelines. For example, users are discouraged from using `SortValues` on a `PCollection` of a single element to globally sort a large `PCollection`.
7+
8+
##Options
9+
* The user can customize the temporary location used if sorting requires spilling to disk and the maximum amount of memory to use by creating a custom instance of `BufferedExternalSorter.Options` to pass into `SortValues.create`.
10+
11+
##Using `SortValues`
12+
~~~~
13+
PCollection<KV<String, KV<String, Integer>>> input = ...
14+
15+
// Group by primary key, bringing <SecondaryKey, Value> pairs for the same key together.
16+
PCollection<KV<String, Iterable<KV<String, Integer>>>> grouped =
17+
input.apply(GroupByKey.<String, KV<String, Integer>>create());
18+
19+
// For every primary key, sort the iterable of <SecondaryKey, Value> pairs by secondary key.
20+
PCollection<KV<String, Iterable<KV<String, Integer>>>> groupedAndSorted =
21+
grouped.apply(
22+
SortValues.<String, String, Integer>create(new BufferedExternalSorter.Options()));
23+
~~~~

contrib/sorter/pom.xml

Lines changed: 237 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,237 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<!--~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
3+
~ Copyright (C) 2016 Google Inc.
4+
~
5+
~ Licensed under the Apache License, Version 2.0 (the "License"); you may not
6+
~ use this file except in compliance with the License. You may obtain a copy of
7+
~ the License at
8+
~
9+
~ http://www.apache.org/licenses/LICENSE-2.0
10+
~
11+
~ Unless required by applicable law or agreed to in writing, software
12+
~ distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13+
~ WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14+
~ License for the specific language governing permissions and limitations under
15+
~ the License.
16+
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~-->
17+
<project xmlns="http://maven.apache.org/POM/4.0.0"
18+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
19+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
20+
<modelVersion>4.0.0</modelVersion>
21+
22+
<groupId>com.google.cloud.dataflow</groupId>
23+
<artifactId>google-cloud-dataflow-java-contrib-sorter</artifactId>
24+
<name>Google Cloud Dataflow Sorter Library</name>
25+
<description>Library to sort data from within Dataflow pipelines.</description>
26+
<version>0.0.1-SNAPSHOT</version>
27+
<packaging>jar</packaging>
28+
29+
<licenses>
30+
<license>
31+
<name>Apache License, Version 2.0</name>
32+
<url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
33+
<distribution>repo</distribution>
34+
</license>
35+
</licenses>
36+
37+
<properties>
38+
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
39+
<google-cloud-dataflow-version>[1.2.0,2.0.0)</google-cloud-dataflow-version>
40+
<hadoop.version>2.7.1</hadoop.version>
41+
</properties>
42+
43+
<build>
44+
<plugins>
45+
<plugin>
46+
<groupId>org.apache.maven.plugins</groupId>
47+
<artifactId>maven-compiler-plugin</artifactId>
48+
<version>3.2</version>
49+
<configuration>
50+
<source>1.7</source>
51+
<target>1.7</target>
52+
</configuration>
53+
</plugin>
54+
55+
<plugin>
56+
<groupId>org.apache.maven.plugins</groupId>
57+
<artifactId>maven-checkstyle-plugin</artifactId>
58+
<version>2.17</version>
59+
<dependencies>
60+
<dependency>
61+
<groupId>com.puppycrawl.tools</groupId>
62+
<artifactId>checkstyle</artifactId>
63+
<version>6.19</version>
64+
</dependency>
65+
</dependencies>
66+
<configuration>
67+
<configLocation>../../checkstyle.xml</configLocation>
68+
<consoleOutput>true</consoleOutput>
69+
<failOnViolation>true</failOnViolation>
70+
<includeTestSourceDirectory>true</includeTestSourceDirectory>
71+
</configuration>
72+
<executions>
73+
<execution>
74+
<goals>
75+
<goal>check</goal>
76+
</goals>
77+
</execution>
78+
</executions>
79+
</plugin>
80+
81+
<!-- Source plugin for generating source and test-source JARs. -->
82+
<plugin>
83+
<groupId>org.apache.maven.plugins</groupId>
84+
<artifactId>maven-source-plugin</artifactId>
85+
<version>2.4</version>
86+
<executions>
87+
<execution>
88+
<id>attach-sources</id>
89+
<phase>compile</phase>
90+
<goals>
91+
<goal>jar</goal>
92+
</goals>
93+
</execution>
94+
<execution>
95+
<id>attach-test-sources</id>
96+
<phase>test-compile</phase>
97+
<goals>
98+
<goal>test-jar</goal>
99+
</goals>
100+
</execution>
101+
</executions>
102+
</plugin>
103+
104+
<plugin>
105+
<groupId>org.apache.maven.plugins</groupId>
106+
<artifactId>maven-javadoc-plugin</artifactId>
107+
<version>2.10.3</version>
108+
<configuration>
109+
<windowtitle>Google Cloud Dataflow Sorter Contrib</windowtitle>
110+
<doctitle>Google Cloud Dataflow Sorter Contrib</doctitle>
111+
112+
<subpackages>com.google.cloud.dataflow.contrib.sorter</subpackages>
113+
<use>false</use>
114+
<bottom><![CDATA[<br>]]></bottom>
115+
116+
<offlineLinks>
117+
<offlineLink>
118+
<url>https://cloud.google.com/dataflow/java-sdk/JavaDoc/</url>
119+
<location>${basedir}/../../javadoc/dataflow-sdk-docs</location>
120+
</offlineLink>
121+
<offlineLink>
122+
<url>http://docs.guava-libraries.googlecode.com/git-history/release18/javadoc/</url>
123+
<location>${basedir}/../../javadoc/guava-docs</location>
124+
</offlineLink>
125+
</offlineLinks>
126+
</configuration>
127+
<executions>
128+
<execution>
129+
<goals>
130+
<goal>jar</goal>
131+
</goals>
132+
<phase>package</phase>
133+
</execution>
134+
</executions>
135+
</plugin>
136+
137+
<!-- Shading Hadoop dependency so that users may use their own version
138+
of Hadoop without interference from this module. -->
139+
<plugin>
140+
<groupId>org.apache.maven.plugins</groupId>
141+
<artifactId>maven-shade-plugin</artifactId>
142+
<executions>
143+
<execution>
144+
<id>bundle-and-repackage</id>
145+
<phase>package</phase>
146+
<goals>
147+
<goal>shade</goal>
148+
</goals>
149+
<configuration>
150+
<shadeTestJar>true</shadeTestJar>
151+
<artifactSet>
152+
<includes>
153+
<include>org.apache.hadoop:hadoop-mapreduce-client-core</include>
154+
<include>org.apache.hadoop:hadoop-common</include>
155+
<include>com.google.guava:guava</include>
156+
</includes>
157+
</artifactSet>
158+
<filters>
159+
<filter>
160+
<artifact>*:*</artifact>
161+
<excludes>
162+
<exclude>META-INF/*.SF</exclude>
163+
<exclude>META-INF/*.DSA</exclude>
164+
<exclude>META-INF/*.RSA</exclude>
165+
</excludes>
166+
</filter>
167+
</filters>
168+
<relocations>
169+
<relocation>
170+
<pattern>org.apache.hadoop</pattern>
171+
<shadedPattern>com.google.cloud.dataflow.repackaged.org.apache.hadoop</shadedPattern>
172+
</relocation>
173+
<relocation>
174+
<pattern>com.google.common</pattern>
175+
<shadedPattern>com.google.cloud.dataflow.repackaged.com.google.common</shadedPattern>
176+
</relocation>
177+
<relocation>
178+
<pattern>com.google.thirdparty</pattern>
179+
<shadedPattern>com.google.cloud.dataflow.repackaged.com.google.thirdparty</shadedPattern>
180+
</relocation>
181+
</relocations>
182+
</configuration>
183+
</execution>
184+
</executions>
185+
</plugin>
186+
187+
</plugins>
188+
</build>
189+
190+
<dependencies>
191+
<dependency>
192+
<groupId>com.google.cloud.dataflow</groupId>
193+
<artifactId>google-cloud-dataflow-java-sdk-all</artifactId>
194+
<version>${google-cloud-dataflow-version}</version>
195+
</dependency>
196+
197+
<dependency>
198+
<groupId>org.apache.hadoop</groupId>
199+
<artifactId>hadoop-mapreduce-client-core</artifactId>
200+
<version>${hadoop.version}</version>
201+
</dependency>
202+
203+
<dependency>
204+
<groupId>org.apache.hadoop</groupId>
205+
<artifactId>hadoop-common</artifactId>
206+
<version>${hadoop.version}</version>
207+
</dependency>
208+
209+
<dependency>
210+
<groupId>com.google.guava</groupId>
211+
<artifactId>guava</artifactId>
212+
<version>19.0</version>
213+
</dependency>
214+
215+
<!-- test dependencies -->
216+
<dependency>
217+
<groupId>org.hamcrest</groupId>
218+
<artifactId>hamcrest-all</artifactId>
219+
<version>1.3</version>
220+
<scope>test</scope>
221+
</dependency>
222+
223+
<dependency>
224+
<groupId>org.mockito</groupId>
225+
<artifactId>mockito-all</artifactId>
226+
<version>1.10.19</version>
227+
<scope>test</scope>
228+
</dependency>
229+
230+
<dependency>
231+
<groupId>junit</groupId>
232+
<artifactId>junit</artifactId>
233+
<version>4.11</version>
234+
<scope>test</scope>
235+
</dependency>
236+
</dependencies>
237+
</project>
Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
/*
2+
* Copyright (C) 2016 Google Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
17+
package com.google.cloud.dataflow.contrib.sorter;
18+
19+
import static com.google.common.base.Preconditions.checkArgument;
20+
21+
import com.google.cloud.dataflow.sdk.values.KV;
22+
import java.io.IOException;
23+
import java.io.Serializable;
24+
25+
/**
26+
* {@link Sorter} that will use in memory sorting until the values can't fit into memory and will
27+
* then fall back to external sorting.
28+
*/
29+
public class BufferedExternalSorter implements Sorter {
30+
/** Contains configuration for the sorter. */
31+
public static class Options implements Serializable {
32+
private String tempLocation = "/tmp";
33+
private int memoryMB = 100;
34+
35+
/** Sets the path to a temporary location where the sorter writes intermediate files. */
36+
public void setTempLocation(String tempLocation) {
37+
checkArgument(
38+
!tempLocation.startsWith("gs://"),
39+
"BufferedExternalSorter does not support GCS temporary location");
40+
41+
this.tempLocation = tempLocation;
42+
}
43+
44+
/** Returns the configured temporary location. */
45+
public String getTempLocation() {
46+
return tempLocation;
47+
}
48+
49+
/**
50+
* Sets the size of the memory buffer in megabytes. This controls both the buffer for initial in
51+
* memory sorting and the buffer used when external sorting. Must be greater than zero.
52+
*/
53+
public void setMemoryMB(int memoryMB) {
54+
checkArgument(memoryMB > 0, "memoryMB must be greater than zero");
55+
this.memoryMB = memoryMB;
56+
}
57+
58+
/** Returns the configured size of the memory buffer. */
59+
public int getMemoryMB() {
60+
return memoryMB;
61+
}
62+
}
63+
64+
private ExternalSorter externalSorter;
65+
private InMemorySorter inMemorySorter;
66+
67+
boolean inMemorySorterFull;
68+
69+
BufferedExternalSorter(ExternalSorter externalSorter, InMemorySorter inMemorySorter) {
70+
this.externalSorter = externalSorter;
71+
this.inMemorySorter = inMemorySorter;
72+
}
73+
74+
public static BufferedExternalSorter create(Options options) {
75+
ExternalSorter.Options externalSorterOptions = new ExternalSorter.Options();
76+
externalSorterOptions.setMemoryMB(options.getMemoryMB());
77+
externalSorterOptions.setTempLocation(options.getTempLocation());
78+
79+
InMemorySorter.Options inMemorySorterOptions = new InMemorySorter.Options();
80+
inMemorySorterOptions.setMemoryMB(options.getMemoryMB());
81+
82+
return new BufferedExternalSorter(
83+
ExternalSorter.create(externalSorterOptions), InMemorySorter.create(inMemorySorterOptions));
84+
}
85+
86+
@Override
87+
public void add(KV<byte[], byte[]> record) throws IOException {
88+
if (!inMemorySorterFull) {
89+
if (inMemorySorter.addIfRoom(record)) {
90+
return;
91+
} else {
92+
// Flushing contents of in memory sorter to external sorter so we can rely on external
93+
// from here on out
94+
inMemorySorterFull = true;
95+
transferToExternalSorter();
96+
}
97+
}
98+
99+
// In memory sorter is full, so put in external sorter instead
100+
externalSorter.add(record);
101+
}
102+
103+
/**
104+
* Transfers all of the records loaded so far into the in memory sorter over to the external
105+
* sorter.
106+
*/
107+
private void transferToExternalSorter() throws IOException {
108+
for (KV<byte[], byte[]> record : inMemorySorter.sort()) {
109+
externalSorter.add(record);
110+
}
111+
// Allow in memory sorter and its contents to be garbage collected
112+
inMemorySorter = null;
113+
}
114+
115+
@Override
116+
public Iterable<KV<byte[], byte[]>> sort() throws IOException {
117+
if (!inMemorySorterFull) {
118+
return inMemorySorter.sort();
119+
} else {
120+
return externalSorter.sort();
121+
}
122+
}
123+
}

0 commit comments

Comments
 (0)