Skip to content

Commit e2b4e26

Browse files
committed
Unacknowledged Opcode based writes aren't supported
JAVA-2241
1 parent c39037e commit e2b4e26

File tree

8 files changed

+180
-75
lines changed

8 files changed

+180
-75
lines changed

driver-core/src/main/com/mongodb/operation/BaseWriteOperation.java

Lines changed: 37 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,7 @@
4646
import static com.mongodb.operation.OperationHelper.AsyncCallableWithConnection;
4747
import static com.mongodb.operation.OperationHelper.CallableWithConnection;
4848
import static com.mongodb.operation.OperationHelper.LOGGER;
49-
import static com.mongodb.operation.OperationHelper.bypassDocumentValidationNotSupported;
50-
import static com.mongodb.operation.OperationHelper.getBypassDocumentValidationException;
49+
import static com.mongodb.operation.OperationHelper.checkBypassDocumentValidationIsSupported;
5150
import static com.mongodb.operation.OperationHelper.releasingCallback;
5251
import static com.mongodb.operation.OperationHelper.serverIsAtLeastVersionTwoDotSix;
5352
import static com.mongodb.operation.OperationHelper.withConnection;
@@ -135,9 +134,7 @@ public WriteConcernResult execute(final WriteBinding binding) {
135134
@Override
136135
public WriteConcernResult call(final Connection connection) {
137136
try {
138-
if (bypassDocumentValidationNotSupported(bypassDocumentValidation, writeConcern, connection.getDescription())) {
139-
throw getBypassDocumentValidationException();
140-
}
137+
checkBypassDocumentValidationIsSupported(connection, bypassDocumentValidation, writeConcern);
141138
if (writeConcern.isAcknowledged() && serverIsAtLeastVersionTwoDotSix(connection.getDescription())) {
142139
return translateBulkWriteResult(executeCommandProtocol(connection));
143140
} else {
@@ -155,36 +152,45 @@ public void executeAsync(final AsyncWriteBinding binding, final SingleResultCall
155152
withConnection(binding, new AsyncCallableWithConnection() {
156153
@Override
157154
public void call(final AsyncConnection connection, final Throwable t) {
158-
SingleResultCallback<WriteConcernResult> errHandlingCallback = errorHandlingCallback(callback, LOGGER);
155+
final SingleResultCallback<WriteConcernResult> errHandlingCallback = errorHandlingCallback(callback, LOGGER);
159156
if (t != null) {
160157
errHandlingCallback.onResult(null, t);
161-
} else if (bypassDocumentValidationNotSupported(bypassDocumentValidation, writeConcern, connection.getDescription())) {
162-
releasingCallback(errHandlingCallback, connection).onResult(null, getBypassDocumentValidationException());
163158
} else {
164-
final SingleResultCallback<WriteConcernResult> wrappedCallback = releasingCallback(errHandlingCallback, connection);
165-
if (writeConcern.isAcknowledged() && serverIsAtLeastVersionTwoDotSix(connection.getDescription())) {
166-
executeCommandProtocolAsync(connection, new SingleResultCallback<BulkWriteResult>() {
167-
@Override
168-
public void onResult(final BulkWriteResult result, final Throwable t) {
169-
if (t != null) {
170-
wrappedCallback.onResult(null, translateException(t));
171-
} else {
172-
wrappedCallback.onResult(translateBulkWriteResult(result), null);
159+
checkBypassDocumentValidationIsSupported(connection, bypassDocumentValidation, writeConcern,
160+
new AsyncCallableWithConnection() {
161+
@Override
162+
public void call(final AsyncConnection connection, final Throwable t1) {
163+
if (t1 != null) {
164+
releasingCallback(errHandlingCallback, connection).onResult(null, t1);
165+
} else {
166+
final SingleResultCallback<WriteConcernResult> wrappedCallback =
167+
releasingCallback(errHandlingCallback, connection);
168+
if (writeConcern.isAcknowledged() && serverIsAtLeastVersionTwoDotSix(connection.getDescription())) {
169+
executeCommandProtocolAsync(connection, new SingleResultCallback<BulkWriteResult>() {
170+
@Override
171+
public void onResult(final BulkWriteResult result, final Throwable t) {
172+
if (t != null) {
173+
wrappedCallback.onResult(null, translateException(t));
174+
} else {
175+
wrappedCallback.onResult(translateBulkWriteResult(result), null);
176+
}
177+
}
178+
});
179+
} else {
180+
executeProtocolAsync(connection, new SingleResultCallback<WriteConcernResult>() {
181+
@Override
182+
public void onResult(final WriteConcernResult result, final Throwable t) {
183+
if (t != null) {
184+
wrappedCallback.onResult(null, translateException(t));
185+
} else {
186+
wrappedCallback.onResult(result, null);
187+
}
188+
}
189+
});
190+
}
191+
}
173192
}
174-
}
175-
});
176-
} else {
177-
executeProtocolAsync(connection, new SingleResultCallback<WriteConcernResult>() {
178-
@Override
179-
public void onResult(final WriteConcernResult result, final Throwable t) {
180-
if (t != null) {
181-
wrappedCallback.onResult(null, translateException(t));
182-
} else {
183-
wrappedCallback.onResult(result, null);
184-
}
185-
}
186-
});
187-
}
193+
});
188194
}
189195
}
190196
});

driver-core/src/main/com/mongodb/operation/DeleteOperation.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -65,14 +65,14 @@ public List<DeleteRequest> getDeleteRequests() {
6565

6666
@Override
6767
protected WriteConcernResult executeProtocol(final Connection connection) {
68-
checkValidWriteRequestCollations(connection, deleteRequests);
68+
checkValidWriteRequestCollations(connection, deleteRequests, getWriteConcern());
6969
return connection.delete(getNamespace(), isOrdered(), getWriteConcern(), deleteRequests);
7070
}
7171

7272
@Override
7373
protected void executeProtocolAsync(final AsyncConnection connection,
7474
final SingleResultCallback<WriteConcernResult> callback) {
75-
checkValidWriteRequestCollations(connection, deleteRequests, new AsyncCallableWithConnection(){
75+
checkValidWriteRequestCollations(connection, deleteRequests, getWriteConcern(), new AsyncCallableWithConnection(){
7676
@Override
7777
public void call(final AsyncConnection connection, final Throwable t) {
7878
if (t != null) {
@@ -86,13 +86,13 @@ public void call(final AsyncConnection connection, final Throwable t) {
8686

8787
@Override
8888
protected BulkWriteResult executeCommandProtocol(final Connection connection) {
89-
checkValidWriteRequestCollations(connection, deleteRequests);
89+
checkValidWriteRequestCollations(connection, deleteRequests, getWriteConcern());
9090
return connection.deleteCommand(getNamespace(), isOrdered(), getWriteConcern(), deleteRequests);
9191
}
9292

9393
@Override
9494
protected void executeCommandProtocolAsync(final AsyncConnection connection, final SingleResultCallback<BulkWriteResult> callback) {
95-
checkValidWriteRequestCollations(connection, deleteRequests, new AsyncCallableWithConnection(){
95+
checkValidWriteRequestCollations(connection, deleteRequests, getWriteConcern(), new AsyncCallableWithConnection(){
9696
@Override
9797
public void call(final AsyncConnection connection, final Throwable t) {
9898
if (t != null) {

driver-core/src/main/com/mongodb/operation/MixedBulkWriteOperation.java

Lines changed: 18 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -58,9 +58,7 @@
5858
import static com.mongodb.operation.OperationHelper.AsyncCallableWithConnection;
5959
import static com.mongodb.operation.OperationHelper.CallableWithConnection;
6060
import static com.mongodb.operation.OperationHelper.LOGGER;
61-
import static com.mongodb.operation.OperationHelper.bypassDocumentValidationNotSupported;
62-
import static com.mongodb.operation.OperationHelper.checkValidWriteRequestCollations;
63-
import static com.mongodb.operation.OperationHelper.getBypassDocumentValidationException;
61+
import static com.mongodb.operation.OperationHelper.checkValidBypassDocumentValidationAndWriteRequestCollations;
6462
import static com.mongodb.operation.OperationHelper.releasingCallback;
6563
import static com.mongodb.operation.OperationHelper.withConnection;
6664
import static java.lang.String.format;
@@ -170,12 +168,8 @@ public BulkWriteResult execute(final WriteBinding binding) {
170168
return withConnection(binding, new CallableWithConnection<BulkWriteResult>() {
171169
@Override
172170
public BulkWriteResult call(final Connection connection) {
173-
if (bypassDocumentValidationNotSupported(bypassDocumentValidation, writeConcern, connection.getDescription())) {
174-
throw getBypassDocumentValidationException();
175-
}
176-
177-
checkValidWriteRequestCollations(connection, writeRequests);
178-
171+
OperationHelper.checkValidWriteRequests(connection, bypassDocumentValidation, writeRequests,
172+
writeConcern);
179173
BulkWriteBatchCombiner bulkWriteBatchCombiner = new BulkWriteBatchCombiner(connection.getDescription().getServerAddress(),
180174
ordered, writeConcern);
181175
for (Run run : getRunGenerator(connection.getDescription())) {
@@ -207,23 +201,21 @@ public void call(final AsyncConnection connection, final Throwable t) {
207201
if (t != null) {
208202
errHandlingCallback.onResult(null, t);
209203
} else {
210-
checkValidWriteRequestCollations(connection, writeRequests, new AsyncCallableWithConnection() {
211-
@Override
212-
public void call(final AsyncConnection connection, final Throwable t) {
213-
if (t != null) {
214-
releasingCallback(errHandlingCallback, connection).onResult(null, t);
215-
} else if (bypassDocumentValidationNotSupported(bypassDocumentValidation, writeConcern,
216-
connection.getDescription())) {
217-
releasingCallback(errHandlingCallback, connection).onResult(null, getBypassDocumentValidationException());
218-
} else {
219-
Iterator<Run> runs = getRunGenerator(connection.getDescription()).iterator();
220-
executeRunsAsync(runs, connection, new BulkWriteBatchCombiner(connection.getDescription()
221-
.getServerAddress(),
222-
ordered, writeConcern),
223-
errHandlingCallback);
224-
}
225-
}
226-
});
204+
checkValidWriteRequests(connection, bypassDocumentValidation, writeRequests,
205+
writeConcern, new AsyncCallableWithConnection() {
206+
@Override
207+
public void call(final AsyncConnection connection, final Throwable t) {
208+
if (t != null) {
209+
releasingCallback(errHandlingCallback, connection).onResult(null, t);
210+
} else {
211+
Iterator<Run> runs = getRunGenerator(connection.getDescription()).iterator();
212+
executeRunsAsync(runs, connection,
213+
new BulkWriteBatchCombiner(connection.getDescription().getServerAddress(), ordered,
214+
writeConcern),
215+
errHandlingCallback);
216+
}
217+
}
218+
});
227219
}
228220
}
229221
});

0 commit comments

Comments
 (0)