2323import com .mongodb .client .result .DeleteResult ;
2424import com .mongodb .diagnostics .logging .Logger ;
2525import com .mongodb .diagnostics .logging .Loggers ;
26+ import com .mongodb .session .ClientSession ;
2627import org .bson .BsonValue ;
2728import org .bson .Document ;
2829import org .bson .types .Binary ;
3940
4041final class GridFSUploadStreamImpl implements GridFSUploadStream {
4142 private static final Logger LOGGER = Loggers .getLogger ("client.gridfs" );
43+ private final ClientSession clientSession ;
4244 private final MongoCollection <GridFSFile > filesCollection ;
4345 private final MongoCollection <Document > chunksCollection ;
4446 private final BsonValue fileId ;
@@ -63,9 +65,10 @@ final class GridFSUploadStreamImpl implements GridFSUploadStream {
6365 private int chunkIndex ;
6466 /* accessed only when writing */
6567
66- GridFSUploadStreamImpl (final MongoCollection <GridFSFile > filesCollection , final MongoCollection <Document > chunksCollection ,
67- final BsonValue fileId , final String filename , final int chunkSizeBytes , final Document metadata ,
68- final GridFSIndexCheck indexCheck ) {
68+ GridFSUploadStreamImpl (final ClientSession clientSession , final MongoCollection <GridFSFile > filesCollection ,
69+ final MongoCollection <Document > chunksCollection , final BsonValue fileId , final String filename ,
70+ final int chunkSizeBytes , final Document metadata , final GridFSIndexCheck indexCheck ) {
71+ this .clientSession = clientSession ;
6972 this .filesCollection = notNull ("files collection" , filesCollection );
7073 this .chunksCollection = notNull ("chunks collection" , chunksCollection );
7174 this .fileId = notNull ("File Id" , fileId );
@@ -99,13 +102,20 @@ public void abort(final SingleResultCallback<Void> callback) {
99102 if (!takeWritingLock (errHandlingCallback )) {
100103 return ;
101104 }
102- chunksCollection .deleteMany (new Document ("files_id" , fileId ), new SingleResultCallback <DeleteResult >() {
105+
106+ SingleResultCallback <DeleteResult > deleteCallback = new SingleResultCallback <DeleteResult >() {
103107 @ Override
104108 public void onResult (final DeleteResult result , final Throwable t ) {
105109 releaseWritingLock ();
106110 errHandlingCallback .onResult (null , t );
107111 }
108- });
112+ };
113+
114+ if (clientSession != null ) {
115+ chunksCollection .deleteMany (clientSession , new Document ("files_id" , fileId ), deleteCallback );
116+ } else {
117+ chunksCollection .deleteMany (new Document ("files_id" , fileId ), deleteCallback );
118+ }
109119 }
110120
111121 @ Override
@@ -167,21 +177,27 @@ public void onResult(final Void result, final Throwable t) {
167177 GridFSFile gridFSFile = new GridFSFile (fileId , filename , lengthInBytes , chunkSizeBytes , new Date (),
168178 toHex (md5 .digest ()), metadata );
169179
170- filesCollection . insertOne ( gridFSFile , new SingleResultCallback <Void >() {
180+ SingleResultCallback < Void > insertCallback = new SingleResultCallback <Void >() {
171181 @ Override
172182 public void onResult (final Void result , final Throwable t ) {
173183 buffer = null ;
174184 releaseWritingLock ();
175185 errHandlingCallback .onResult (result , t );
176186 }
177- });
187+ };
188+
189+ if (clientSession != null ) {
190+ filesCollection .insertOne (clientSession , gridFSFile , insertCallback );
191+ } else {
192+ filesCollection .insertOne (gridFSFile , insertCallback );
193+ }
178194 }
179195 }
180196 });
181197 }
182198
183199 private void write (final int amount , final ByteBuffer src , final SingleResultCallback <Integer > callback ) {
184- if (!takeWritingLock (callback )){
200+ if (!takeWritingLock (callback )) {
185201 return ;
186202 }
187203
@@ -234,21 +250,25 @@ private void writeChunk(final SingleResultCallback<Void> callback) {
234250 if (md5 == null ) {
235251 callback .onResult (null , new MongoGridFSException ("No MD5 message digest available, cannot upload file" ));
236252 } else if (bufferOffset > 0 ) {
237- chunksCollection .insertOne (new Document ("files_id" , fileId ).append ("n" , chunkIndex ).append ("data" , getData ()),
238- new SingleResultCallback <Void >() {
239- @ Override
240- public void onResult (final Void result , final Throwable t ) {
241- if (t != null ) {
242- callback .onResult (null , t );
243- } else {
244- md5 .update (buffer );
245- chunkIndex ++;
246- bufferOffset = 0 ;
247- callback .onResult (null , null );
248- }
249- }
253+ Document insertDocument = new Document ("files_id" , fileId ).append ("n" , chunkIndex ).append ("data" , getData ());
254+ SingleResultCallback <Void > insertCallback = new SingleResultCallback <Void >() {
255+ @ Override
256+ public void onResult (final Void result , final Throwable t ) {
257+ if (t != null ) {
258+ callback .onResult (null , t );
259+ } else {
260+ md5 .update (buffer );
261+ chunkIndex ++;
262+ bufferOffset = 0 ;
263+ callback .onResult (null , null );
250264 }
251- );
265+ }
266+ };
267+ if (clientSession != null ) {
268+ chunksCollection .insertOne (clientSession , insertDocument , insertCallback );
269+ } else {
270+ chunksCollection .insertOne (insertDocument , insertCallback );
271+ }
252272 } else {
253273 callback .onResult (null , null );
254274 }
0 commit comments