3434import org .reactivestreams .Subscriber ;
3535import reactor .core .publisher .Flux ;
3636import reactor .core .publisher .Mono ;
37+ import reactor .util .function .Tuple2 ;
3738
3839import java .nio .ByteBuffer ;
3940import java .util .Date ;
4041import java .util .Map ;
4142import java .util .concurrent .atomic .AtomicBoolean ;
42- import java .util .concurrent .atomic .AtomicInteger ;
43- import java .util .concurrent .atomic .AtomicLong ;
44- import java .util .function .Consumer ;
4543import java .util .function .Function ;
4644
4745import static com .mongodb .ReadPreference .primary ;
4846import static com .mongodb .assertions .Assertions .notNull ;
4947
50-
5148/**
5249 * <p>This class is not part of the public API and may be removed or changed at any time</p>
5350 */
@@ -98,31 +95,22 @@ public BsonValue getId() {
9895
9996 @ Override
10097 public void subscribe (final Subscriber <? super Void > s ) {
101- Mono .< Void > create ( sink -> {
98+ Mono .deferContextual ( ctx -> {
10299 AtomicBoolean terminated = new AtomicBoolean (false );
103- sink .onCancel (() -> createCancellationMono (terminated ).subscribe ());
104-
105- Consumer <Throwable > errorHandler = e -> createCancellationMono (terminated )
106- .doOnError (i -> sink .error (e ))
107- .doOnSuccess (i -> sink .error (e ))
108- .subscribe ();
109-
110- Consumer <Long > saveFileDataMono = l -> createSaveFileDataMono (terminated , l )
111- .doOnError (errorHandler )
112- .doOnSuccess (i -> sink .success ())
113- .subscribe ();
114-
115- Consumer <Void > saveChunksMono = i -> createSaveChunksMono (terminated )
116- .doOnError (errorHandler )
117- .doOnSuccess (saveFileDataMono )
118- .subscribe ();
119-
120- createCheckAndCreateIndexesMono ()
121- .doOnError (errorHandler )
122- .doOnSuccess (saveChunksMono )
123- .subscribe ();
124- })
125- .subscribe (s );
100+ return createCheckAndCreateIndexesMono ()
101+ .then (createSaveChunksMono (terminated ))
102+ .flatMap (lengthInBytes -> createSaveFileDataMono (terminated , lengthInBytes ))
103+ .onErrorResume (originalError ->
104+ createCancellationMono (terminated )
105+ .onErrorMap (cancellationError -> {
106+ // Timeout exception might occur during cancellation. It gets suppressed.
107+ originalError .addSuppressed (cancellationError );
108+ return originalError ;
109+ })
110+ .then (Mono .error (originalError )))
111+ .doOnCancel (() -> createCancellationMono (terminated ).contextWrite (ctx ).subscribe ())
112+ .then ();
113+ }).subscribe (s );
126114 }
127115
128116 public GridFSUploadPublisher <ObjectId > withObjectId () {
@@ -156,28 +144,14 @@ private Mono<Void> createCheckAndCreateIndexesMono() {
156144 } else {
157145 findPublisher = collection .find ();
158146 }
159- AtomicBoolean collectionExists = new AtomicBoolean (false );
147+ return Mono .from (findPublisher .projection (PROJECTION ).first ())
148+ .switchIfEmpty (Mono .defer (() ->
149+ checkAndCreateIndex (filesCollection .withReadPreference (primary ()), FILES_INDEX )
150+ .then (checkAndCreateIndex (chunksCollection .withReadPreference (primary ()), CHUNKS_INDEX ))
151+ .then (Mono .fromCallable (Document ::new ))
152+ ))
153+ .then ();
160154
161- return Mono .create (sink -> Mono .from (findPublisher .projection (PROJECTION ).first ())
162- .subscribe (
163- d -> collectionExists .set (true ),
164- sink ::error ,
165- () -> {
166- if (collectionExists .get ()) {
167- sink .success ();
168- } else {
169- checkAndCreateIndex (filesCollection .withReadPreference (primary ()), FILES_INDEX )
170- .doOnError (sink ::error )
171- .doOnSuccess (i -> {
172- checkAndCreateIndex (chunksCollection .withReadPreference (primary ()), CHUNKS_INDEX )
173- .doOnError (sink ::error )
174- .doOnSuccess (sink ::success )
175- .subscribe ();
176- })
177- .subscribe ();
178- }
179- })
180- );
181155 }
182156
183157 private <T > Mono <Boolean > hasIndex (final MongoCollection <T > collection , final Document index ) {
@@ -189,29 +163,23 @@ private <T> Mono<Boolean> hasIndex(final MongoCollection<T> collection, final Do
189163 }
190164
191165 return Flux .from (listIndexesPublisher )
192- .collectList ()
193- .map (indexes -> {
194- boolean hasIndex = false ;
195- for (Document result : indexes ) {
196- Document indexDoc = result .get ("key" , new Document ());
197- for (final Map .Entry <String , Object > entry : indexDoc .entrySet ()) {
198- if (entry .getValue () instanceof Number ) {
199- entry .setValue (((Number ) entry .getValue ()).intValue ());
200- }
201- }
202- if (indexDoc .equals (index )) {
203- hasIndex = true ;
204- break ;
166+ .filter ((result ) -> {
167+ Document indexDoc = result .get ("key" , new Document ());
168+ for (final Map .Entry <String , Object > entry : indexDoc .entrySet ()) {
169+ if (entry .getValue () instanceof Number ) {
170+ entry .setValue (((Number ) entry .getValue ()).intValue ());
205171 }
206172 }
207- return hasIndex ;
208- });
173+ return indexDoc .equals (index );
174+ })
175+ .take (1 )
176+ .hasElements ();
209177 }
210178
211179 private <T > Mono <Void > checkAndCreateIndex (final MongoCollection <T > collection , final Document index ) {
212180 return hasIndex (collection , index ).flatMap (hasIndex -> {
213181 if (!hasIndex ) {
214- return createIndexMono (collection , index ).flatMap ( s -> Mono . empty () );
182+ return createIndexMono (collection , index ).then ( );
215183 } else {
216184 return Mono .empty ();
217185 }
@@ -223,14 +191,14 @@ private <T> Mono<String> createIndexMono(final MongoCollection<T> collection, fi
223191 }
224192
225193 private Mono <Long > createSaveChunksMono (final AtomicBoolean terminated ) {
226- return Mono .create (sink -> {
227- AtomicLong lengthInBytes = new AtomicLong (0 );
228- AtomicInteger chunkIndex = new AtomicInteger (0 );
229- new ResizingByteBufferFlux (source , chunkSizeBytes )
230- .flatMap ((Function <ByteBuffer , Publisher <InsertOneResult >>) byteBuffer -> {
194+ return new ResizingByteBufferFlux (source , chunkSizeBytes )
195+ .index ()
196+ .flatMap ((Function <Tuple2 <Long , ByteBuffer >, Publisher <Integer >>) indexAndBuffer -> {
231197 if (terminated .get ()) {
232198 return Mono .empty ();
233199 }
200+ Long index = indexAndBuffer .getT1 ();
201+ ByteBuffer byteBuffer = indexAndBuffer .getT2 ();
234202 byte [] byteArray = new byte [byteBuffer .remaining ()];
235203 if (byteBuffer .hasArray ()) {
236204 System .arraycopy (byteBuffer .array (), byteBuffer .position (), byteArray , 0 , byteBuffer .remaining ());
@@ -240,18 +208,19 @@ private Mono<Long> createSaveChunksMono(final AtomicBoolean terminated) {
240208 byteBuffer .reset ();
241209 }
242210 Binary data = new Binary (byteArray );
243- lengthInBytes .addAndGet (data .length ());
244211
245212 Document chunkDocument = new Document ("files_id" , fileId )
246- .append ("n" , chunkIndex . getAndIncrement ())
213+ .append ("n" , index . intValue ())
247214 .append ("data" , data );
248215
249- return clientSession == null ? chunksCollection .insertOne (chunkDocument )
216+ Publisher <InsertOneResult > insertOnePublisher = clientSession == null
217+ ? chunksCollection .insertOne (chunkDocument )
250218 : chunksCollection .insertOne (clientSession , chunkDocument );
219+
220+ return Mono .from (insertOnePublisher ).thenReturn (data .length ());
251221 })
252- .subscribe (null , sink ::error , () -> sink .success (lengthInBytes .get ()));
253- });
254- }
222+ .reduce (0L , Long ::sum );
223+ }
255224
256225 private Mono <InsertOneResult > createSaveFileDataMono (final AtomicBoolean terminated , final long lengthInBytes ) {
257226 if (terminated .compareAndSet (false , true )) {
0 commit comments