File tree Expand file tree Collapse file tree 1 file changed +22
-23
lines changed
driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal Expand file tree Collapse file tree 1 file changed +22
-23
lines changed Original file line number Diff line number Diff line change @@ -87,29 +87,28 @@ private void recurseCursor(){
8787 batchCursor .setBatchSize (calculateBatchSize (sink .requestedFromDownstream ()));
8888 Mono .from (batchCursor .next (() -> sink .isCancelled ()))
8989 .doOnCancel (this ::closeCursor )
90- .doOnError ((e ) -> {
91- try {
92- closeCursor ();
93- } finally {
94- sink .error (e );
95- }
96- })
97- .doOnSuccess (results -> {
98- if (!results .isEmpty ()) {
99- results
100- .stream ()
101- .filter (Objects ::nonNull )
102- .forEach (sink ::next );
103- calculateDemand (-results .size ());
104- }
105- if (batchCursor .isClosed ()) {
106- sink .complete ();
107- } else {
108- inProgress .set (false );
109- recurseCursor ();
110- }
111- })
112- .subscribe ();
90+ .subscribe (results -> {
91+ if (!results .isEmpty ()) {
92+ results
93+ .stream ()
94+ .filter (Objects ::nonNull )
95+ .forEach (sink ::next );
96+ calculateDemand (-results .size ());
97+ }
98+ if (batchCursor .isClosed ()) {
99+ sink .complete ();
100+ } else {
101+ inProgress .set (false );
102+ recurseCursor ();
103+ }
104+ },
105+ e -> {
106+ try {
107+ closeCursor ();
108+ } finally {
109+ sink .error (e );
110+ }
111+ });
113112 }
114113 }
115114 }
You can’t perform that action at this time.
0 commit comments