44import com .acuity .iot .dsa .dslink .protocol .responder .DSResponder ;
55import com .acuity .iot .dsa .dslink .transport .DSTransport ;
66import java .io .IOException ;
7- import java .util .LinkedList ;
8- import java .util .List ;
7+ import java .util .concurrent .ConcurrentLinkedQueue ;
98import org .iot .dsa .conn .DSConnection ;
109import org .iot .dsa .conn .DSIConnected ;
1110import org .iot .dsa .dslink .DSIRequester ;
@@ -49,8 +48,8 @@ public abstract class DSSession extends DSNode implements DSIConnected {
4948 private int messageId = 0 ;
5049 private int nextMessage = 1 ;
5150 private final Object outgoingMutex = new Object ();
52- private List <OutboundMessage > outgoingRequests = new LinkedList <OutboundMessage >();
53- private List <OutboundMessage > outgoingResponses = new LinkedList <OutboundMessage >();
51+ private ConcurrentLinkedQueue <OutboundMessage > outgoingRequests = new ConcurrentLinkedQueue <OutboundMessage >();
52+ private ConcurrentLinkedQueue <OutboundMessage > outgoingResponses = new ConcurrentLinkedQueue <OutboundMessage >();
5453 private DSInfo requesterAllowed = getInfo (REQUESTER_ALLOWED );
5554 private ReadThread readThread ;
5655 private WriteThread writeThread ;
@@ -78,10 +77,8 @@ public void enqueueOutgoingRequest(OutboundMessage arg) {
7877 if (!isRequesterAllowed ()) {
7978 throw new IllegalStateException ("Requester not allowed" );
8079 }
81- synchronized (outgoingMutex ) {
82- outgoingRequests .add (arg );
83- outgoingMutex .notify ();
84- }
80+ outgoingRequests .add (arg );
81+ notifyOutgoing ();
8582 }
8683 }
8784
@@ -90,10 +87,8 @@ public void enqueueOutgoingRequest(OutboundMessage arg) {
9087 */
9188 public void enqueueOutgoingResponse (OutboundMessage arg ) {
9289 if (connected ) {
93- synchronized (outgoingMutex ) {
94- outgoingResponses .add (arg );
95- outgoingMutex .notify ();
96- }
90+ outgoingResponses .add (arg );
91+ notifyOutgoing ();
9792 }
9893 }
9994
@@ -175,24 +170,14 @@ protected void declareDefaults() {
175170 * Can return null.
176171 */
177172 protected OutboundMessage dequeueOutgoingRequest () {
178- synchronized (outgoingMutex ) {
179- if (!outgoingRequests .isEmpty ()) {
180- return outgoingRequests .remove (0 );
181- }
182- }
183- return null ;
173+ return outgoingRequests .poll ();
184174 }
185175
186176 /**
187177 * Can return null.
188178 */
189179 protected OutboundMessage dequeueOutgoingResponse () {
190- synchronized (outgoingMutex ) {
191- if (!outgoingResponses .isEmpty ()) {
192- return outgoingResponses .remove (0 );
193- }
194- }
195- return null ;
180+ return outgoingResponses .poll ();
196181 }
197182
198183 /**
@@ -240,20 +225,25 @@ protected boolean hasAckToSend() {
240225 * Override point, this returns the result of hasMessagesToSend.
241226 */
242227 protected boolean hasSomethingToSend () {
243- if (ackToSend >= 0 ) {
244- return true ;
245- }
246- if (hasPingToSend ()) {
228+ if (hasAckToSend () || hasPingToSend ()) {
247229 return true ;
248230 }
249231 if (waitingForAcks ()) {
250232 return false ;
251233 }
252234 if (!outgoingResponses .isEmpty ()) {
253- return true ;
235+ for (OutboundMessage msg : outgoingResponses ) {
236+ if (msg .canWrite (this )) {
237+ return true ;
238+ }
239+ }
254240 }
255241 if (!outgoingRequests .isEmpty ()) {
256- return true ;
242+ for (OutboundMessage msg : outgoingRequests ) {
243+ if (msg .canWrite (this )) {
244+ return true ;
245+ }
246+ }
257247 }
258248 return false ;
259249 }
@@ -282,25 +272,19 @@ protected void onConnected() {
282272 connected = true ;
283273 lastTimeRecv = lastTimeSend = System .currentTimeMillis ();
284274 readThread = new ReadThread (getConnection ().getLink ().getLinkName () + " Reader" );
285- writeThread = new WriteThread (getConnection ().getLink ().getLinkName () + " Writer" );
286275 readThread .start ();
276+ Thread .yield ();
277+ writeThread = new WriteThread (getConnection ().getLink ().getLinkName () + " Writer" );
287278 writeThread .start ();
288279 }
289280
290281 /**
291282 * Clear the outgoing queues and waits for the the read and write threads to exit.
292283 */
293284 protected void onDisconnected () {
294- synchronized (outgoingMutex ) {
295- outgoingRequests .clear ();
296- outgoingResponses .clear ();
297- outgoingMutex .notifyAll ();
298- }
299- try {
300- writeThread .join ();
301- } catch (Exception x ) {
302- debug (getPath (), x );
303- }
285+ outgoingRequests .clear ();
286+ outgoingResponses .clear ();
287+ notifyOutgoing ();
304288 try {
305289 readThread .join ();
306290 } catch (Exception x ) {
@@ -319,18 +303,21 @@ protected void onDisconnecting() {
319303 }
320304 connected = false ;
321305 notifyOutgoing ();
306+ try {
307+ writeThread .join ();
308+ } catch (Exception x ) {
309+ debug (getPath (), x );
310+ }
311+ //Attempt to exit cleanly, try to get acks for sent messages.
312+ waitForAcks (1000 );
322313 }
323314
324315 protected void requeueOutgoingRequest (OutboundMessage arg ) {
325- synchronized (outgoingMutex ) {
326- outgoingRequests .add (arg );
327- }
316+ outgoingRequests .add (arg );
328317 }
329318
330319 protected void requeueOutgoingResponse (OutboundMessage arg ) {
331- synchronized (outgoingMutex ) {
332- outgoingResponses .add (arg );
333- }
320+ outgoingResponses .add (arg );
334321 }
335322
336323 /**
@@ -385,6 +372,26 @@ private void verifyLastSend() throws IOException {
385372 }
386373 }
387374
375+ /* Try to exit cleanly, wait for all acks for sent messages. */
376+ private void waitForAcks (long timeout ) {
377+ long start = System .currentTimeMillis ();
378+ synchronized (outgoingMutex ) {
379+ while (getMissingAcks () > 0 ) {
380+ try {
381+ outgoingMutex .wait (500 );
382+ } catch (InterruptedException x ) {
383+ warn (getPath (), x );
384+ }
385+ if ((System .currentTimeMillis () - start ) > timeout ) {
386+ debug (debug () ? String
387+ .format ("waitForAcks timeout (%s / %s)" , ackRcvd , messageId )
388+ : null );
389+ break ;
390+ }
391+ }
392+ }
393+ }
394+
388395 ///////////////////////////////////////////////////////////////////////////
389396 // Inner Classes
390397 ///////////////////////////////////////////////////////////////////////////
@@ -400,6 +407,7 @@ private class ReadThread extends Thread {
400407 }
401408
402409 public void run () {
410+ debug ("Enter DSSession.ReadThread" );
403411 DSLinkConnection conn = getConnection ();
404412 try {
405413 while (connected ) {
@@ -415,6 +423,7 @@ public void run() {
415423 conn .connDown (DSException .makeMessage (x ));
416424 }
417425 }
426+ debug ("Exit DSSession.ReadThread" );
418427 }
419428 }
420429
@@ -430,6 +439,7 @@ private class WriteThread extends Thread {
430439
431440 public void run () {
432441 DSLinkConnection conn = getConnection ();
442+ debug ("Enter DSSession.WriteThread" );
433443 try {
434444 while (connected ) {
435445 verifyLastRead ();
@@ -454,6 +464,7 @@ public void run() {
454464 conn .connDown (DSException .makeMessage (x ));
455465 }
456466 }
467+ debug ("Exit DSSession.WriteThread" );
457468 }
458469 }
459470
0 commit comments