Skip to content

Commit 59e3a00

Browse files
authored
Surface errors when files are modified during request (#6405)
* Ensure that file modification exceptions in FileAsyncRequestBody are propogated correctly. * Update exception thrown to SdkClientException, fix tests * Handle errors in constructor * Add more test cases * PR cleanups
1 parent 87666a0 commit 59e3a00

File tree

4 files changed

+195
-34
lines changed

4 files changed

+195
-34
lines changed
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"type": "bugfix",
3+
"category": "AWS SDK for Java v2",
4+
"contributor": "",
5+
"description": "Ensure that file modification exceptions in AsyncRequestBody#fromFile are propagated correctly."
6+
}

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/FileAsyncRequestBody.java

Lines changed: 115 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import software.amazon.awssdk.core.async.AsyncRequestBodySplitConfiguration;
3737
import software.amazon.awssdk.core.async.CloseableAsyncRequestBody;
3838
import software.amazon.awssdk.core.async.SdkPublisher;
39+
import software.amazon.awssdk.core.exception.SdkClientException;
3940
import software.amazon.awssdk.core.internal.util.Mimetype;
4041
import software.amazon.awssdk.core.internal.util.NoopSubscription;
4142
import software.amazon.awssdk.utils.Logger;
@@ -71,6 +72,8 @@ public final class FileAsyncRequestBody implements AsyncRequestBody {
7172
private final int chunkSizeInBytes;
7273
private final long position;
7374
private final long numBytesToRead;
75+
private FileTime modifiedTimeAtStart;
76+
private Long sizeAtStart;
7477

7578
private FileAsyncRequestBody(DefaultBuilder builder) {
7679
this.path = builder.path;
@@ -79,6 +82,27 @@ private FileAsyncRequestBody(DefaultBuilder builder) {
7982
this.position = builder.position == null ? 0 : Validate.isNotNegative(builder.position, "position");
8083
this.numBytesToRead = builder.numBytesToRead == null ? fileLength - this.position :
8184
Validate.isNotNegative(builder.numBytesToRead, "numBytesToRead");
85+
if (builder.modifiedTimeAtStart != null) {
86+
this.modifiedTimeAtStart = builder.modifiedTimeAtStart;
87+
} else {
88+
try {
89+
this.modifiedTimeAtStart = Files.getLastModifiedTime(path);
90+
} catch (IOException e) {
91+
log.debug(() -> "Failed to get last modified time for path " + path, e);
92+
this.modifiedTimeAtStart = null;
93+
}
94+
}
95+
96+
if (builder.sizeAtStart != null) {
97+
this.sizeAtStart = builder.sizeAtStart;
98+
} else {
99+
try {
100+
this.sizeAtStart = Files.size(path);
101+
} catch (IOException e) {
102+
log.debug(() -> "Failed to get file size for path " + path, e);
103+
this.sizeAtStart = null;
104+
}
105+
}
82106
}
83107

84108
@Override
@@ -112,6 +136,14 @@ public long numBytesToRead() {
112136
return numBytesToRead;
113137
}
114138

139+
public FileTime modifiedTimeAtStart() {
140+
return modifiedTimeAtStart;
141+
}
142+
143+
public Long sizeAtStart() {
144+
return sizeAtStart;
145+
}
146+
115147
@Override
116148
public Optional<Long> contentLength() {
117149
return Optional.of(numBytesToRead);
@@ -131,7 +163,7 @@ public void subscribe(Subscriber<? super ByteBuffer> s) {
131163
// We need to synchronize here because the subscriber could call
132164
// request() from within onSubscribe which would potentially
133165
// trigger onNext before onSubscribe is finished.
134-
Subscription subscription = new FileSubscription(channel, s);
166+
Subscription subscription = new FileSubscription(channel, s, modifiedTimeAtStart, sizeAtStart);
135167

136168
synchronized (subscription) {
137169
s.onSubscribe(subscription);
@@ -203,6 +235,20 @@ public interface Builder extends SdkBuilder<Builder, FileAsyncRequestBody> {
203235
* @return The builder for method chaining.
204236
*/
205237
Builder numBytesToRead(Long numBytesToRead);
238+
239+
/**
240+
* Optional - sets the file modified time at the start of the request.
241+
* @param modifiedTimeAtStart initial file modification time
242+
* @return The builder for method chaining.
243+
*/
244+
Builder modifiedTimeAtStart(FileTime modifiedTimeAtStart);
245+
246+
/**
247+
* Optional - sets the file size in bytes at the start of the request.
248+
* @param sizeAtStart initial file size at start.
249+
* @return The builder for method chaining.
250+
*/
251+
Builder sizeAtStart(Long sizeAtStart);
206252
}
207253

208254
private static final class DefaultBuilder implements Builder {
@@ -211,6 +257,8 @@ private static final class DefaultBuilder implements Builder {
211257
private Path path;
212258
private Integer chunkSizeInBytes;
213259
private Long numBytesToRead;
260+
private FileTime modifiedTimeAtStart;
261+
private Long sizeAtStart;
214262

215263
@Override
216264
public Builder path(Path path) {
@@ -240,6 +288,18 @@ public Builder numBytesToRead(Long numBytesToRead) {
240288
return this;
241289
}
242290

291+
@Override
292+
public Builder modifiedTimeAtStart(FileTime modifiedTimeAtStart) {
293+
this.modifiedTimeAtStart = modifiedTimeAtStart;
294+
return this;
295+
}
296+
297+
@Override
298+
public Builder sizeAtStart(Long sizeAtStart) {
299+
this.sizeAtStart = sizeAtStart;
300+
return this;
301+
}
302+
243303
public void setChunkSizeInBytes(Integer chunkSizeInBytes) {
244304
chunkSizeInBytes(chunkSizeInBytes);
245305
}
@@ -267,13 +327,23 @@ private final class FileSubscription implements Subscription {
267327
private final Object lock = new Object();
268328

269329
private FileSubscription(AsynchronousFileChannel inputChannel,
270-
Subscriber<? super ByteBuffer> subscriber) throws IOException {
330+
Subscriber<? super ByteBuffer> subscriber,
331+
FileTime modifiedTimeAtStart, Long sizeAtStart) throws IOException {
271332
this.inputChannel = inputChannel;
272333
this.subscriber = subscriber;
273-
this.sizeAtStart = inputChannel.size();
274-
this.modifiedTimeAtStart = Files.getLastModifiedTime(path);
275334
this.remainingBytes = new AtomicLong(numBytesToRead);
276335
this.currentPosition = new AtomicLong(position);
336+
if (sizeAtStart != null) {
337+
this.sizeAtStart = sizeAtStart;
338+
} else {
339+
this.sizeAtStart = Files.size(path);
340+
}
341+
342+
if (modifiedTimeAtStart != null) {
343+
this.modifiedTimeAtStart = modifiedTimeAtStart;
344+
} else {
345+
this.modifiedTimeAtStart = Files.getLastModifiedTime(path);
346+
}
277347
}
278348

279349
@Override
@@ -338,12 +408,21 @@ public void completed(Integer result, ByteBuffer attachment) {
338408

339409
int readBytes = attachment.remaining();
340410
currentPosition.addAndGet(readBytes);
341-
remainingBytes.addAndGet(-readBytes);
411+
long remaining = remainingBytes.addAndGet(-readBytes);
412+
413+
// we need to validate the file is unchanged before providing the last bytes to subscriber
414+
// the subscriber (eg: NettyRequestExecutor) may cancel subscription once all expected bytes have
415+
// been received. Validating here ensures errors are correctly signaled.
416+
if (remaining == 0) {
417+
closeFile();
418+
if (!validateFileUnchangedAndSignalErrors()) {
419+
return;
420+
}
421+
}
342422

343423
signalOnNext(attachment);
344424

345-
if (remainingBytes.get() == 0) {
346-
closeFile();
425+
if (remaining == 0) {
347426
signalOnComplete();
348427
}
349428

@@ -391,42 +470,49 @@ private void signalOnNext(ByteBuffer attachment) {
391470
}
392471

393472
private void signalOnComplete() {
473+
if (!validateFileUnchangedAndSignalErrors()) {
474+
return;
475+
}
476+
477+
synchronized (this) {
478+
if (!done) {
479+
done = true;
480+
subscriber.onComplete();
481+
}
482+
}
483+
}
484+
485+
private boolean validateFileUnchangedAndSignalErrors() {
394486
try {
395487
long sizeAtEnd = Files.size(path);
396488
if (sizeAtStart != sizeAtEnd) {
397-
signalOnError(new IOException("File size changed after reading started. Initial size: " + sizeAtStart + ". "
398-
+ "Current size: " + sizeAtEnd));
399-
return;
489+
signalOnError(SdkClientException.create("File size changed after reading started. Initial size: "
490+
+ sizeAtStart + ". Current size: " + sizeAtEnd));
491+
return false;
400492
}
401493

402494
if (remainingBytes.get() > 0) {
403-
signalOnError(new IOException("Fewer bytes were read than were expected, was the file modified after "
404-
+ "reading started?"));
405-
return;
495+
signalOnError(SdkClientException.create("Fewer bytes were read than were expected, was the file modified "
496+
+ "after reading started?"));
497+
return false;
406498
}
407499

408500
FileTime modifiedTimeAtEnd = Files.getLastModifiedTime(path);
409501
if (modifiedTimeAtStart.compareTo(modifiedTimeAtEnd) != 0) {
410-
signalOnError(new IOException("File last-modified time changed after reading started. Initial modification "
411-
+ "time: " + modifiedTimeAtStart + ". Current modification time: " +
412-
modifiedTimeAtEnd));
413-
return;
502+
signalOnError(SdkClientException.create("File last-modified time changed after reading started. "
503+
+ "Initial modification time: " + modifiedTimeAtStart
504+
+ ". Current modification time: " + modifiedTimeAtEnd));
505+
return false;
414506
}
415507
} catch (NoSuchFileException e) {
416-
signalOnError(new IOException("Unable to check file status after read. Was the file deleted or were its "
417-
+ "permissions changed?", e));
418-
return;
508+
signalOnError(SdkClientException.create("Unable to check file status after read. Was the file deleted"
509+
+ " or were its permissions changed?", e));
510+
return false;
419511
} catch (IOException e) {
420-
signalOnError(new IOException("Unable to check file status after read.", e));
421-
return;
422-
}
423-
424-
synchronized (this) {
425-
if (!done) {
426-
done = true;
427-
subscriber.onComplete();
428-
}
512+
signalOnError(SdkClientException.create("Unable to check file status after read.", e));
513+
return false;
429514
}
515+
return true;
430516
}
431517

432518
private void signalOnError(Throwable t) {

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/FileAsyncRequestBodySplitHelper.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
import java.nio.ByteBuffer;
1919
import java.nio.file.Path;
20+
import java.nio.file.attribute.FileTime;
2021
import java.util.Optional;
2122
import java.util.concurrent.atomic.AtomicBoolean;
2223
import java.util.concurrent.atomic.AtomicInteger;
@@ -53,6 +54,8 @@ public final class FileAsyncRequestBodySplitHelper {
5354

5455
private AtomicInteger numAsyncRequestBodiesInFlight = new AtomicInteger(0);
5556
private AtomicInteger chunkIndex = new AtomicInteger(0);
57+
private final FileTime modifiedTimeAtStart;
58+
private final long sizeAtStart;
5659

5760
public FileAsyncRequestBodySplitHelper(FileAsyncRequestBody asyncRequestBody,
5861
AsyncRequestBodySplitConfiguration splitConfiguration) {
@@ -70,6 +73,8 @@ public FileAsyncRequestBodySplitHelper(FileAsyncRequestBody asyncRequestBody,
7073
splitConfiguration.bufferSizeInBytes();
7174
this.bufferPerAsyncRequestBody = Math.min(asyncRequestBody.chunkSizeInBytes(),
7275
NumericUtils.saturatedCast(totalBufferSize));
76+
this.modifiedTimeAtStart = asyncRequestBody.modifiedTimeAtStart();
77+
this.sizeAtStart = asyncRequestBody.sizeAtStart();
7378
}
7479

7580
public SdkPublisher<AsyncRequestBody> split() {
@@ -134,6 +139,8 @@ private AsyncRequestBody newFileAsyncRequestBody(SimplePublisher<AsyncRequestBod
134139
.position(position)
135140
.numBytesToRead(numBytesToReadForThisChunk)
136141
.chunkSizeInBytes(bufferPerAsyncRequestBody)
142+
.modifiedTimeAtStart(modifiedTimeAtStart)
143+
.sizeAtStart(sizeAtStart)
137144
.build();
138145
return new FileAsyncRequestBodyWrapper(fileAsyncRequestBody, simplePublisher);
139146
}

0 commit comments

Comments
 (0)