1515
1616package software .amazon .awssdk .services .s3 .internal .multipart ;
1717
18+ import static software .amazon .awssdk .services .s3 .internal .multipart .MultipartUploadHelper .contentLengthMismatchForPart ;
19+ import static software .amazon .awssdk .services .s3 .internal .multipart .MultipartUploadHelper .partNumMismatch ;
1820import static software .amazon .awssdk .services .s3 .multipart .S3MultipartExecutionAttribute .JAVA_PROGRESS_LISTENER ;
1921
2022import java .util .Collection ;
2123import java .util .HashMap ;
2224import java .util .Map ;
25+ import java .util .Optional ;
2326import java .util .concurrent .CompletableFuture ;
2427import java .util .concurrent .ConcurrentLinkedQueue ;
2528import java .util .concurrent .atomic .AtomicBoolean ;
3235import software .amazon .awssdk .annotations .SdkInternalApi ;
3336import software .amazon .awssdk .core .async .AsyncRequestBody ;
3437import software .amazon .awssdk .core .async .listener .PublisherListener ;
38+ import software .amazon .awssdk .core .exception .SdkClientException ;
3539import software .amazon .awssdk .services .s3 .model .CompleteMultipartUploadResponse ;
3640import software .amazon .awssdk .services .s3 .model .CompletedPart ;
3741import software .amazon .awssdk .services .s3 .model .PutObjectRequest ;
@@ -54,10 +58,10 @@ public class KnownContentLengthAsyncRequestBodySubscriber implements Subscriber<
5458 private final AtomicBoolean failureActionInitiated = new AtomicBoolean (false );
5559 private final AtomicInteger partNumber = new AtomicInteger (1 );
5660 private final MultipartUploadHelper multipartUploadHelper ;
57- private final long contentLength ;
61+ private final long totalSize ;
5862 private final long partSize ;
59- private final int partCount ;
60- private final int numExistingParts ;
63+ private final int expectedNumParts ;
64+ private final int existingNumParts ;
6165 private final String uploadId ;
6266 private final Collection <CompletableFuture <CompletedPart >> futures = new ConcurrentLinkedQueue <>();
6367 private final PutObjectRequest putObjectRequest ;
@@ -77,25 +81,21 @@ public class KnownContentLengthAsyncRequestBodySubscriber implements Subscriber<
7781 KnownContentLengthAsyncRequestBodySubscriber (MpuRequestContext mpuRequestContext ,
7882 CompletableFuture <PutObjectResponse > returnFuture ,
7983 MultipartUploadHelper multipartUploadHelper ) {
80- this .contentLength = mpuRequestContext .contentLength ();
84+ this .totalSize = mpuRequestContext .contentLength ();
8185 this .partSize = mpuRequestContext .partSize ();
82- this .partCount = determinePartCount ( contentLength , partSize );
86+ this .expectedNumParts = mpuRequestContext . expectedNumParts ( );
8387 this .putObjectRequest = mpuRequestContext .request ().left ();
8488 this .returnFuture = returnFuture ;
8589 this .uploadId = mpuRequestContext .uploadId ();
8690 this .existingParts = mpuRequestContext .existingParts () == null ? new HashMap <>() : mpuRequestContext .existingParts ();
87- this .numExistingParts = NumericUtils .saturatedCast (mpuRequestContext .numPartsCompleted ());
88- this .completedParts = new AtomicReferenceArray <>(partCount );
91+ this .existingNumParts = NumericUtils .saturatedCast (mpuRequestContext .numPartsCompleted ());
92+ this .completedParts = new AtomicReferenceArray <>(expectedNumParts );
8993 this .multipartUploadHelper = multipartUploadHelper ;
9094 this .progressListener = putObjectRequest .overrideConfiguration ().map (c -> c .executionAttributes ()
9195 .getAttribute (JAVA_PROGRESS_LISTENER ))
9296 .orElseGet (PublisherListener ::noOp );
9397 }
9498
95- private int determinePartCount (long contentLength , long partSize ) {
96- return (int ) Math .ceil (contentLength / (double ) partSize );
97- }
98-
9999 public S3ResumeToken pause () {
100100 isPaused = true ;
101101
@@ -119,8 +119,8 @@ public S3ResumeToken pause() {
119119 return S3ResumeToken .builder ()
120120 .uploadId (uploadId )
121121 .partSize (partSize )
122- .totalNumParts ((long ) partCount )
123- .numPartsCompleted (numPartsCompleted + numExistingParts )
122+ .totalNumParts ((long ) expectedNumParts )
123+ .numPartsCompleted (numPartsCompleted + existingNumParts )
124124 .build ();
125125 }
126126
@@ -145,21 +145,32 @@ public void onSubscribe(Subscription s) {
145145
146146 @ Override
147147 public void onNext (AsyncRequestBody asyncRequestBody ) {
148- if (isPaused ) {
148+ if (isPaused || isDone ) {
149149 return ;
150150 }
151151
152- if ( existingParts . containsKey ( partNumber .get ())) {
153- partNumber . getAndIncrement ();
152+ int currentPartNum = partNumber .getAndIncrement ();
153+ if ( existingParts . containsKey ( currentPartNum )) {
154154 asyncRequestBody .subscribe (new CancelledSubscriber <>());
155155 subscription .request (1 );
156156 asyncRequestBody .contentLength ().ifPresent (progressListener ::subscriberOnNext );
157157 return ;
158158 }
159159
160+ Optional <SdkClientException > sdkClientException = validatePart (asyncRequestBody , currentPartNum );
161+ if (sdkClientException .isPresent ()) {
162+ multipartUploadHelper .failRequestsElegantly (futures ,
163+ sdkClientException .get (),
164+ uploadId ,
165+ returnFuture ,
166+ putObjectRequest );
167+ subscription .cancel ();
168+ return ;
169+ }
170+
160171 asyncRequestBodyInFlight .incrementAndGet ();
161172 UploadPartRequest uploadRequest = SdkPojoConversionUtils .toUploadPartRequest (putObjectRequest ,
162- partNumber . getAndIncrement () ,
173+ currentPartNum ,
163174 uploadId );
164175
165176 Consumer <CompletedPart > completedPartConsumer = completedPart -> completedParts .set (completedPart .partNumber () - 1 ,
@@ -179,6 +190,39 @@ public void onNext(AsyncRequestBody asyncRequestBody) {
179190 subscription .request (1 );
180191 }
181192
193+ private Optional <SdkClientException > validatePart (AsyncRequestBody asyncRequestBody , int currentPartNum ) {
194+ if (!asyncRequestBody .contentLength ().isPresent ()) {
195+ return Optional .of (MultipartUploadHelper .contentLengthMissingForPart (currentPartNum ));
196+ }
197+
198+ Long currentPartSize = asyncRequestBody .contentLength ().get ();
199+
200+ if (currentPartNum > expectedNumParts ) {
201+ return Optional .of (partNumMismatch (expectedNumParts , currentPartNum ));
202+ }
203+
204+ if (currentPartNum == expectedNumParts ) {
205+ return validateLastPartSize (currentPartSize );
206+ }
207+
208+ if (currentPartSize != partSize ) {
209+ return Optional .of (contentLengthMismatchForPart (partSize , currentPartSize ));
210+ }
211+ return Optional .empty ();
212+ }
213+
214+ private Optional <SdkClientException > validateLastPartSize (Long currentPartSize ) {
215+ long remainder = totalSize % partSize ;
216+ long expectedLastPartSize = remainder == 0 ? partSize : remainder ;
217+ if (currentPartSize != expectedLastPartSize ) {
218+ return Optional .of (
219+ SdkClientException .create ("Content length of the last part must be equal to the "
220+ + "expected last part size. Expected: " + expectedLastPartSize
221+ + ", Actual: " + currentPartSize ));
222+ }
223+ return Optional .empty ();
224+ }
225+
182226 private boolean shouldFailRequest () {
183227 return failureActionInitiated .compareAndSet (false , true ) && !isPaused ;
184228 }
@@ -187,6 +231,7 @@ private boolean shouldFailRequest() {
187231 public void onError (Throwable t ) {
188232 log .debug (() -> "Received onError " , t );
189233 if (failureActionInitiated .compareAndSet (false , true )) {
234+ isDone = true ;
190235 multipartUploadHelper .failRequestsElegantly (futures , t , uploadId , returnFuture , putObjectRequest );
191236 }
192237 }
@@ -203,6 +248,7 @@ public void onComplete() {
203248 private void completeMultipartUploadIfFinished (int requestsInFlight ) {
204249 if (isDone && requestsInFlight == 0 && completedMultipartInitiated .compareAndSet (false , true )) {
205250 CompletedPart [] parts ;
251+
206252 if (existingParts .isEmpty ()) {
207253 parts =
208254 IntStream .range (0 , completedParts .length ())
@@ -212,15 +258,23 @@ private void completeMultipartUploadIfFinished(int requestsInFlight) {
212258 // List of CompletedParts needs to be in ascending order
213259 parts = mergeCompletedParts ();
214260 }
261+
262+ int actualNumParts = partNumber .get () - 1 ;
263+ if (actualNumParts != expectedNumParts ) {
264+ SdkClientException exception = partNumMismatch (expectedNumParts , actualNumParts );
265+ multipartUploadHelper .failRequestsElegantly (futures , exception , uploadId , returnFuture , putObjectRequest );
266+ return ;
267+ }
268+
215269 completeMpuFuture = multipartUploadHelper .completeMultipartUpload (returnFuture , uploadId , parts , putObjectRequest ,
216- contentLength );
270+ totalSize );
217271 }
218272 }
219273
220274 private CompletedPart [] mergeCompletedParts () {
221- CompletedPart [] merged = new CompletedPart [partCount ];
275+ CompletedPart [] merged = new CompletedPart [expectedNumParts ];
222276 int currPart = 1 ;
223- while (currPart < partCount + 1 ) {
277+ while (currPart < expectedNumParts + 1 ) {
224278 CompletedPart completedPart = existingParts .containsKey (currPart ) ? existingParts .get (currPart ) :
225279 completedParts .get (currPart - 1 );
226280 merged [currPart - 1 ] = completedPart ;
0 commit comments