Skip to content

Commit c39037e

Browse files
committed
Fix Async GridFS Download to Stream small buffer bug
When using a smaller than chunksize buffer, need to check the internal buffer has been read before trying to get the next chunk. JAVA-2294
1 parent 87e7664 commit c39037e

File tree

2 files changed

+40
-10
lines changed

2 files changed

+40
-10
lines changed

driver-async/src/main/com/mongodb/async/client/gridfs/GridFSDownloadStreamImpl.java

Lines changed: 21 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,7 @@ public void onResult(final Integer result, final Throwable t) {
141141
private void checkAndFetchResults(final int amountRead, final ByteBuffer dst, final SingleResultCallback<Integer> callback) {
142142
if (currentPosition == fileInfo.getLength() || dst.remaining() == 0) {
143143
callback.onResult(amountRead, null);
144-
} else if (!resultsQueue.isEmpty()) {
144+
} else if (hasResultsToProcess()) {
145145
processResults(amountRead, dst, callback);
146146
} else if (cursor == null) {
147147
chunksCollection.find(new Document("files_id", fileInfo.getId())
@@ -181,23 +181,27 @@ public void onResult(final List<Document> result, final Throwable t) {
181181
private void processResults(final int previousAmountRead, final ByteBuffer dst, final SingleResultCallback<Integer> callback) {
182182
try {
183183
int amountRead = previousAmountRead;
184-
while (currentPosition < fileInfo.getLength() && dst.remaining() > 0 && !resultsQueue.isEmpty()) {
185-
if (buffer == null || bufferOffset == buffer.length) {
184+
int amountToCopy = dst.remaining();
185+
while (currentPosition < fileInfo.getLength() && amountToCopy > 0) {
186+
187+
if (getBufferFromResultsQueue()) {
186188
buffer = getBufferFromChunk(resultsQueue.poll(), chunkIndex);
187189
bufferOffset = 0;
188190
chunkIndex += 1;
189191
}
190192

191-
int amountToCopy = dst.remaining();
192193
if (amountToCopy > buffer.length - bufferOffset) {
193194
amountToCopy = buffer.length - bufferOffset;
194195
}
195-
dst.put(buffer, bufferOffset, amountToCopy);
196-
bufferOffset += amountToCopy;
197-
currentPosition += amountToCopy;
198-
amountRead += amountToCopy;
199-
}
200196

197+
if (amountToCopy > 0) {
198+
dst.put(buffer, bufferOffset, amountToCopy);
199+
bufferOffset += amountToCopy;
200+
currentPosition += amountToCopy;
201+
amountRead += amountToCopy;
202+
amountToCopy = dst.remaining();
203+
}
204+
}
201205
checkAndFetchResults(amountRead, dst, callback);
202206
} catch (MongoGridFSException e) {
203207
callback.onResult(null, e);
@@ -259,6 +263,14 @@ private byte[] getBufferFromChunk(final Document chunk, final int expectedChunkI
259263
return data;
260264
}
261265

266+
private boolean getBufferFromResultsQueue() {
267+
return !resultsQueue.isEmpty() && (buffer == null || bufferOffset == buffer.length);
268+
}
269+
270+
private boolean hasResultsToProcess() {
271+
return !resultsQueue.isEmpty() || (buffer != null && bufferOffset < buffer.length);
272+
}
273+
262274
private <A> boolean tryGetReadingLock(final SingleResultCallback<A> callback) {
263275
if (checkClosed()) {
264276
callbackClosedException(callback);

driver-async/src/test/functional/com/mongodb/async/client/gridfs/GridFSBucketSmokeTestSpecification.groovy

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -226,7 +226,7 @@ class GridFSBucketSmokeTestSpecification extends FunctionalSpecification {
226226
given:
227227
def contentBytes = new byte[9000];
228228
new SecureRandom().nextBytes(contentBytes);
229-
def bufferSize = 3000
229+
def bufferSize = 2000
230230
def options = new GridFSUploadOptions().chunkSizeBytes(4000)
231231

232232
when:
@@ -266,6 +266,24 @@ class GridFSBucketSmokeTestSpecification extends FunctionalSpecification {
266266
fileBuffer.put(byteBuffer.array())
267267
totalRead += read
268268
read = run(downloadStream.&read, byteBuffer.clear())
269+
then:
270+
read == bufferSize
271+
272+
when:
273+
fileBuffer.put(byteBuffer.array())
274+
totalRead += read
275+
read = run(downloadStream.&read, byteBuffer.clear())
276+
277+
then:
278+
read == 1000
279+
280+
when:
281+
def remaining = new byte[read]
282+
byteBuffer.flip()
283+
byteBuffer.get(remaining, 0, 1000)
284+
fileBuffer.put(remaining)
285+
totalRead += read
286+
read = run(downloadStream.&read, byteBuffer.clear())
269287

270288
then:
271289
read == -1

0 commit comments

Comments
 (0)