@@ -39,7 +39,6 @@ public class DSInboundInvoke extends DSInboundRequest
3939 private static final int STATE_UPDATES = 2 ;
4040 private static final int STATE_CLOSE_PENDING = 3 ;
4141 private static final int STATE_CLOSED = 4 ;
42- private static final int STATE_RAW = 10 ;
4342
4443 ///////////////////////////////////////////////////////////////////////////
4544 // Instance Fields
@@ -93,8 +92,7 @@ public void close(Exception reason) {
9392 return ;
9493 }
9594 closeReason = reason ;
96- state = STATE_CLOSE_PENDING ;
97- enqueueResponse ();
95+ close ();
9896 }
9997
10098 /**
@@ -155,7 +153,8 @@ public void run() {
155153 setPath (path .getPath ());
156154 result = responder .onInvoke (this );
157155 if (result instanceof RawActionResult ) {
158- state = STATE_RAW ;
156+ state = STATE_UPDATES ;
157+ return ;
159158 }
160159 } else {
161160 DSInfo info = path .getTargetInfo ();
@@ -194,7 +193,11 @@ public void send(DSList row) {
194193 enqueueUpdate (new Update (row ));
195194 }
196195
196+ /**
197+ * For the broker to use as a pass-thru mechanism.
198+ */
197199 public void sendRaw (DSMap raw ) {
200+ state = STATE_UPDATES ;
198201 enqueueUpdate (new Update (raw ));
199202 }
200203
@@ -219,9 +222,6 @@ public boolean write(DSSession session, MessageWriter writer) {
219222 }
220223 writeBegin (writer );
221224 switch (state ) {
222- case STATE_RAW :
223- writeRaw (writer );
224- break ;
225225 case STATE_INIT :
226226 writeColumns (writer );
227227 writeInitialResults (writer );
@@ -306,14 +306,11 @@ private void doClose() {
306306 if (result == null ) {
307307 return ;
308308 }
309- DSRuntime .run (new Runnable () {
310- @ Override
311- public void run () {
312- try {
313- result .onClose ();
314- } catch (Exception x ) {
315- error (getPath (), x );
316- }
309+ DSRuntime .run (() -> {
310+ try {
311+ result .onClose ();
312+ } catch (Exception x ) {
313+ error (getPath (), x );
317314 }
318315 });
319316 }
@@ -437,29 +434,17 @@ private void writeInitialResults(MessageWriter writer) {
437434 }
438435 }
439436
440- private void writeRaw (MessageWriter writer ) {
437+ /**
438+ * Only called by writeUpdates.
439+ */
440+ private void writeRaw (Update update , MessageWriter writer ) {
441441 DSIWriter out = writer .getWriter ();
442- Update update = updateHead ; //peak ahead
443- if (update == null ) {
444- return ;
445- }
446442 DSMap map ;
447- DSResponder responder = getResponder ();
448- while (true ) {
449- update = dequeueUpdate ();
450- map = update .raw ;
451- map .remove ("rid" );
452- for (DSMap .Entry e : map ) {
453- out .key (e .getKey ());
454- out .value (e .getValue ());
455- }
456- if (updateHead == null ) {
457- break ;
458- }
459- if (responder .shouldEndMessage ()) {
460- enqueueResponse ();
461- break ;
462- }
443+ map = update .raw ;
444+ map .remove ("rid" );
445+ for (DSMap .Entry e : map ) {
446+ out .key (e .getKey ());
447+ out .value (e .getValue ());
463448 }
464449 String stream = map .get ("stream" , "" );
465450 if (stream .equals ("closed" )) {
@@ -468,11 +453,15 @@ private void writeRaw(MessageWriter writer) {
468453 }
469454
470455 private void writeUpdates (MessageWriter writer ) {
471- DSIWriter out = writer .getWriter ();
472456 Update update = updateHead ; //peak ahead
473457 if (update == null ) {
474458 return ;
475459 }
460+ if (update .isRaw ()) {
461+ writeRaw (update , writer );
462+ return ;
463+ }
464+ DSIWriter out = writer .getWriter ();
476465 if (update .type != null ) {
477466 out .key ("meta" )
478467 .beginMap ()
@@ -494,7 +483,8 @@ private void writeUpdates(MessageWriter writer) {
494483 if ((updateHead == null ) || (updateHead .type != null )) {
495484 break ;
496485 }
497- if (responder .shouldEndMessage ()) {
486+ //raw messaging won't be mixed with normal messaging, but best to be safe
487+ if (updateHead .isRaw () || responder .shouldEndMessage ()) {
498488 enqueueResponse ();
499489 break ;
500490 }
@@ -569,6 +559,10 @@ protected static class Update {
569559 this .type = type ;
570560 }
571561
562+ boolean isRaw () {
563+ return type == UpdateType .RAW ;
564+ }
565+
572566 String typeKey () {
573567 if ((type == UpdateType .INSERT ) || (type == UpdateType .REPLACE )) {
574568 return "modify" ;
0 commit comments