Skip to content
This repository was archived by the owner on Nov 11, 2022. It is now read-only.

Commit c737a68

Browse files
sammcveety authored and dhalperi committed
Backport TextIO ValueProvider changes [BEAM-551] (#499)
* Initial backport
* Initial backport
* Initial backport
* Fixups
* Fixups
1 parent 2dd04b0 commit c737a68

File tree

11 files changed

+223
-52
lines changed

11 files changed

+223
-52
lines changed

sdk/src/main/java/com/google/cloud/dataflow/sdk/io/CompressedSource.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -236,7 +236,7 @@ public CompressedSource<T> withDecompression(DecompressingChannelFactory channel
236236
*/
237237
private CompressedSource(
238238
FileBasedSource<T> sourceDelegate, DecompressingChannelFactory channelFactory) {
239-
super(sourceDelegate.getFileOrPatternSpec(), Long.MAX_VALUE);
239+
super(sourceDelegate.getFileOrPatternSpecProvider(), Long.MAX_VALUE);
240240
this.sourceDelegate = sourceDelegate;
241241
this.channelFactory = channelFactory;
242242
}

sdk/src/main/java/com/google/cloud/dataflow/sdk/io/FileBasedSink.java

Lines changed: 37 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
import com.google.cloud.dataflow.sdk.coders.Coder;
2323
import com.google.cloud.dataflow.sdk.coders.SerializableCoder;
2424
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
25+
import com.google.cloud.dataflow.sdk.options.ValueProvider;
26+
import com.google.cloud.dataflow.sdk.options.ValueProvider.StaticValueProvider;
2527
import com.google.cloud.dataflow.sdk.transforms.display.DisplayData;
2628
import com.google.cloud.dataflow.sdk.util.FileIOChannelFactory;
2729
import com.google.cloud.dataflow.sdk.util.GcsIOChannelFactory;
@@ -71,7 +73,7 @@ public abstract class FileBasedSink<T> extends Sink<T> {
7173
/**
7274
* Base filename for final output files.
7375
*/
74-
protected final String baseOutputFilename;
76+
protected final ValueProvider<String> baseOutputFilename;
7577

7678
/**
7779
* The extension to be used for the final output files.
@@ -88,7 +90,7 @@ public abstract class FileBasedSink<T> extends Sink<T> {
8890
* Construct a FileBasedSink with the given base output filename and extension.
8991
*/
9092
public FileBasedSink(String baseOutputFilename, String extension) {
91-
this(baseOutputFilename, extension, ShardNameTemplate.INDEX_OF_MAX);
93+
this(StaticValueProvider.of(baseOutputFilename), extension, ShardNameTemplate.INDEX_OF_MAX);
9294
}
9395

9496
/**
@@ -98,6 +100,17 @@ public FileBasedSink(String baseOutputFilename, String extension) {
98100
* <p>See {@link ShardNameTemplate} for a description of file naming templates.
99101
*/
100102
public FileBasedSink(String baseOutputFilename, String extension, String fileNamingTemplate) {
103+
this(StaticValueProvider.of(baseOutputFilename), extension, fileNamingTemplate);
104+
}
105+
106+
/**
107+
* Construct a FileBasedSink with the given base output filename, extension, and file naming
108+
* template.
109+
*
110+
* <p>See {@link ShardNameTemplate} for a description of file naming templates.
111+
*/
112+
public FileBasedSink(ValueProvider<String> baseOutputFilename,
113+
String extension, String fileNamingTemplate) {
101114
this.baseOutputFilename = baseOutputFilename;
102115
this.extension = extension;
103116
this.fileNamingTemplate = fileNamingTemplate;
@@ -107,6 +120,13 @@ public FileBasedSink(String baseOutputFilename, String extension, String fileNam
107120
* Returns the base output filename for this file based sink.
108121
*/
109122
public String getBaseOutputFilename() {
123+
return baseOutputFilename.get();
124+
}
125+
126+
/**
127+
* Returns the {@link ValueProvider} holding the base output filename for this file based sink.
128+
*/
129+
public ValueProvider<String> getBaseOutputFilenameProvider() {
110130
return baseOutputFilename;
111131
}
112132

@@ -130,7 +150,9 @@ public void populateDisplayData(DisplayData.Builder builder) {
130150
super.populateDisplayData(builder);
131151

132152
String fileNamePattern = String.format("%s%s%s",
133-
baseOutputFilename, fileNamingTemplate, getFileExtension(extension));
153+
baseOutputFilename.isAccessible()
154+
? baseOutputFilename.get() : baseOutputFilename.toString(),
155+
fileNamingTemplate, getFileExtension(extension));
134156
builder.add(DisplayData.item("fileNamePattern", fileNamePattern)
135157
.withLabel("File Name Pattern"));
136158
}
@@ -220,7 +242,7 @@ public enum TemporaryFileRetention {
220242
/**
221243
* Base filename used for temporary output files. Default is the baseOutputFilename.
222244
*/
223-
protected final String baseTemporaryFilename;
245+
protected final ValueProvider<String> baseTemporaryFilename;
224246

225247
/**
226248
* Name separator for temporary files. Temporary files will be named
@@ -243,7 +265,7 @@ protected static final String buildTemporaryFilename(String prefix, String suffi
243265
* @param sink the FileBasedSink that will be used to configure this write operation.
244266
*/
245267
public FileBasedWriteOperation(FileBasedSink<T> sink) {
246-
this(sink, sink.baseOutputFilename);
268+
this(sink, sink.getBaseOutputFilenameProvider(), TemporaryFileRetention.REMOVE);
247269
}
248270

249271
/**
@@ -253,7 +275,7 @@ public FileBasedWriteOperation(FileBasedSink<T> sink) {
253275
* @param baseTemporaryFilename the base filename to be used for temporary output files.
254276
*/
255277
public FileBasedWriteOperation(FileBasedSink<T> sink, String baseTemporaryFilename) {
256-
this(sink, baseTemporaryFilename, TemporaryFileRetention.REMOVE);
278+
this(sink, StaticValueProvider.of(baseTemporaryFilename), TemporaryFileRetention.REMOVE);
257279
}
258280

259281
/**
@@ -265,6 +287,12 @@ public FileBasedWriteOperation(FileBasedSink<T> sink, String baseTemporaryFilena
265287
*/
266288
public FileBasedWriteOperation(FileBasedSink<T> sink, String baseTemporaryFilename,
267289
TemporaryFileRetention temporaryFileRetention) {
290+
this(sink, StaticValueProvider.of(baseTemporaryFilename), temporaryFileRetention);
291+
}
292+
293+
private FileBasedWriteOperation(FileBasedSink<T> sink,
294+
ValueProvider<String> baseTemporaryFilename,
295+
TemporaryFileRetention temporaryFileRetention) {
268296
this.sink = sink;
269297
this.baseTemporaryFilename = baseTemporaryFilename;
270298
this.temporaryFileRetention = temporaryFileRetention;
@@ -360,7 +388,7 @@ protected final List<String> copyToOutputFiles(List<String> filenames, PipelineO
360388
protected final List<String> generateDestinationFilenames(int numFiles) {
361389
List<String> destFilenames = new ArrayList<>();
362390
String extension = getSink().extension;
363-
String baseOutputFilename = getSink().baseOutputFilename;
391+
String baseOutputFilename = getSink().baseOutputFilename.get();
364392
String fileNamingTemplate = getSink().fileNamingTemplate;
365393

366394
String suffix = getFileExtension(extension);
@@ -395,7 +423,7 @@ protected final void removeTemporaryFiles(PipelineOptions options) throws IOExce
395423
*/
396424
protected final void removeTemporaryFiles(
397425
Collection<String> knownFiles, PipelineOptions options) throws IOException {
398-
String pattern = buildTemporaryFilename(baseTemporaryFilename, "*");
426+
String pattern = buildTemporaryFilename(baseTemporaryFilename.get(), "*");
399427
LOG.debug("Finding temporary bundle output files matching {}.", pattern);
400428
FileOperations fileOperations = FileOperationsFactory.getFileOperations(pattern, options);
401429
IOChannelFactory factory = IOChannelUtils.getFactory(pattern);
@@ -508,7 +536,7 @@ protected void writeFooter() throws Exception {}
508536
public final void open(String uId) throws Exception {
509537
this.id = uId;
510538
filename = FileBasedWriteOperation.buildTemporaryFilename(
511-
getWriteOperation().baseTemporaryFilename, uId);
539+
getWriteOperation().baseTemporaryFilename.get(), uId);
512540
LOG.debug("Opening {}.", filename);
513541
channel = IOChannelUtils.create(filename, mimeType);
514542
try {

sdk/src/main/java/com/google/cloud/dataflow/sdk/io/FileBasedSource.java

Lines changed: 43 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
import static com.google.common.base.Preconditions.checkState;
1919

2020
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
21+
import com.google.cloud.dataflow.sdk.options.ValueProvider;
22+
import com.google.cloud.dataflow.sdk.options.ValueProvider.StaticValueProvider;
2123
import com.google.cloud.dataflow.sdk.transforms.display.DisplayData;
2224
import com.google.cloud.dataflow.sdk.util.IOChannelFactory;
2325
import com.google.cloud.dataflow.sdk.util.IOChannelUtils;
@@ -73,7 +75,7 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
7375
// Package-private for testing.
7476
static final int THREAD_POOL_SIZE = 128;
7577

76-
private final String fileOrPatternSpec;
78+
private final ValueProvider<String> fileOrPatternSpec;
7779
private final Mode mode;
7880

7981
/**
@@ -95,6 +97,16 @@ public enum Mode {
9597
* @param minBundleSize minimum bundle size in bytes.
9698
*/
9799
public FileBasedSource(String fileOrPatternSpec, long minBundleSize) {
100+
this(StaticValueProvider.of(fileOrPatternSpec), minBundleSize);
101+
}
102+
103+
104+
/**
105+
* Create a {@code FileBasedSource} based on a file or a file pattern specification.
106+
* Same as the {@code String} constructor, but accepting a {@link ValueProvider}
107+
* to allow for runtime configuration of the source.
108+
*/
109+
public FileBasedSource(ValueProvider<String> fileOrPatternSpec, long minBundleSize) {
98110
super(0, Long.MAX_VALUE, minBundleSize);
99111
mode = Mode.FILEPATTERN;
100112
this.fileOrPatternSpec = fileOrPatternSpec;
@@ -120,10 +132,14 @@ public FileBasedSource(String fileName, long minBundleSize,
120132
long startOffset, long endOffset) {
121133
super(startOffset, endOffset, minBundleSize);
122134
mode = Mode.SINGLE_FILE_OR_SUBRANGE;
123-
this.fileOrPatternSpec = fileName;
135+
this.fileOrPatternSpec = StaticValueProvider.of(fileName);
124136
}
125137

126138
public final String getFileOrPatternSpec() {
139+
return fileOrPatternSpec.get();
140+
}
141+
142+
public final ValueProvider<String> getFileOrPatternSpecProvider() {
127143
return fileOrPatternSpec;
128144
}
129145

@@ -142,7 +158,9 @@ public final FileBasedSource<T> createSourceForSubrange(long start, long end) {
142158
+ " of the subrange cannot be larger than the end offset value " + getEndOffset()
143159
+ " of the parent source");
144160

145-
FileBasedSource<T> source = createForSubrangeOfFile(fileOrPatternSpec, start, end);
161+
checkState(fileOrPatternSpec.isAccessible(),
162+
"Subrange creation should only happen at execution time.");
163+
FileBasedSource<T> source = createForSubrangeOfFile(fileOrPatternSpec.get(), start, end);
146164
if (start > 0 || end != Long.MAX_VALUE) {
147165
checkArgument(source.getMode() == Mode.SINGLE_FILE_OR_SUBRANGE,
148166
"Source created for the range [" + start + "," + end + ")"
@@ -180,19 +198,21 @@ public final long getEstimatedSizeBytes(PipelineOptions options) throws Exceptio
180198
// we perform the size estimation of files and file patterns using the interface provided by
181199
// IOChannelFactory.
182200

183-
IOChannelFactory factory = IOChannelUtils.getFactory(fileOrPatternSpec);
184201
if (mode == Mode.FILEPATTERN) {
202+
checkState(fileOrPatternSpec.isAccessible(),
203+
"Size estimation should be done at execution time.");
204+
IOChannelFactory factory = IOChannelUtils.getFactory(fileOrPatternSpec.get());
185205
// TODO Implement a more efficient parallel/batch size estimation mechanism for file patterns.
186206
long startTime = System.currentTimeMillis();
187207
long totalSize = 0;
188-
Collection<String> inputs = factory.match(fileOrPatternSpec);
208+
Collection<String> inputs = factory.match(fileOrPatternSpec.get());
189209
if (inputs.size() <= MAX_NUMBER_OF_FILES_FOR_AN_EXACT_STAT) {
190210
totalSize = getExactTotalSizeOfFiles(inputs, factory);
191-
LOG.debug("Size estimation of all files of pattern " + fileOrPatternSpec + " took "
211+
LOG.debug("Size estimation of all files of pattern " + fileOrPatternSpec.get() + " took "
192212
+ (System.currentTimeMillis() - startTime) + " ms");
193213
} else {
194214
totalSize = getEstimatedSizeOfFilesBySampling(inputs, factory);
195-
LOG.debug("Size estimation of pattern " + fileOrPatternSpec + " by sampling took "
215+
LOG.debug("Size estimation of pattern " + fileOrPatternSpec.get() + " by sampling took "
196216
+ (System.currentTimeMillis() - startTime) + " ms");
197217
}
198218
return totalSize;
@@ -261,7 +281,7 @@ private static long getEstimatedSizeOfFilesBySampling(
261281
@Override
262282
public void populateDisplayData(DisplayData.Builder builder) {
263283
super.populateDisplayData(builder);
264-
builder.add(DisplayData.item("filePattern", getFileOrPatternSpec())
284+
builder.add(DisplayData.item("filePattern", getFileOrPatternSpecProvider())
265285
.withLabel("File Pattern"));
266286
}
267287

@@ -294,7 +314,9 @@ public final List<? extends FileBasedSource<T>> splitIntoBundles(
294314
ListeningExecutorService service =
295315
MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(THREAD_POOL_SIZE));
296316
try {
297-
for (final String file : FileBasedSource.expandFilePattern(fileOrPatternSpec)) {
317+
checkState(fileOrPatternSpec.isAccessible(),
318+
"Bundle splitting should only happen at execution time.");
319+
for (final String file : FileBasedSource.expandFilePattern(fileOrPatternSpec.get())) {
298320
futures.add(createFutureForFileSplit(file, desiredBundleSizeBytes, options, service));
299321
}
300322
List<? extends FileBasedSource<T>> splitResults =
@@ -334,8 +356,10 @@ protected boolean isSplittable() throws Exception {
334356
// We split a file-based source into subranges only if the file is efficiently seekable.
335357
// If a file is not efficiently seekable it would be highly inefficient to create and read a
336358
// source based on a subrange of that file.
337-
IOChannelFactory factory = IOChannelUtils.getFactory(fileOrPatternSpec);
338-
return factory.isReadSeekEfficient(fileOrPatternSpec);
359+
checkState(fileOrPatternSpec.isAccessible(),
360+
"isSplittable should only be called at runtime.");
361+
IOChannelFactory factory = IOChannelUtils.getFactory(fileOrPatternSpec.get());
362+
return factory.isReadSeekEfficient(fileOrPatternSpec.get());
339363
}
340364

341365
@Override
@@ -345,7 +369,7 @@ public final BoundedReader<T> createReader(PipelineOptions options) throws IOExc
345369

346370
if (mode == Mode.FILEPATTERN) {
347371
long startTime = System.currentTimeMillis();
348-
Collection<String> files = FileBasedSource.expandFilePattern(fileOrPatternSpec);
372+
Collection<String> files = FileBasedSource.expandFilePattern(fileOrPatternSpec.get());
349373
List<FileBasedReader<T>> fileReaders = new ArrayList<>();
350374
for (String fileName : files) {
351375
long endOffset;
@@ -373,9 +397,9 @@ public final BoundedReader<T> createReader(PipelineOptions options) throws IOExc
373397
public String toString() {
374398
switch (mode) {
375399
case FILEPATTERN:
376-
return fileOrPatternSpec;
400+
return fileOrPatternSpec.toString();
377401
case SINGLE_FILE_OR_SUBRANGE:
378-
return fileOrPatternSpec + " range " + super.toString();
402+
return fileOrPatternSpec.toString() + " range " + super.toString();
379403
default:
380404
throw new IllegalStateException("Unexpected mode: " + mode);
381405
}
@@ -407,8 +431,8 @@ public final long getMaxEndOffset(PipelineOptions options) throws Exception {
407431
throw new IllegalArgumentException("Cannot determine the exact end offset of a file pattern");
408432
}
409433
if (getEndOffset() == Long.MAX_VALUE) {
410-
IOChannelFactory factory = IOChannelUtils.getFactory(fileOrPatternSpec);
411-
return factory.getSizeBytes(fileOrPatternSpec);
434+
IOChannelFactory factory = IOChannelUtils.getFactory(fileOrPatternSpec.get());
435+
return factory.getSizeBytes(fileOrPatternSpec.get());
412436
} else {
413437
return getEndOffset();
414438
}
@@ -480,8 +504,9 @@ public synchronized FileBasedSource<T> getCurrentSource() {
480504
@Override
481505
protected final boolean startImpl() throws IOException {
482506
FileBasedSource<T> source = getCurrentSource();
483-
IOChannelFactory factory = IOChannelUtils.getFactory(source.getFileOrPatternSpec());
484-
this.channel = factory.open(source.getFileOrPatternSpec());
507+
IOChannelFactory factory = IOChannelUtils.getFactory(
508+
source.getFileOrPatternSpecProvider().get());
509+
this.channel = factory.open(source.getFileOrPatternSpecProvider().get());
485510

486511
if (channel instanceof SeekableByteChannel) {
487512
SeekableByteChannel seekChannel = (SeekableByteChannel) channel;

0 commit comments

Comments
 (0)