44import com .acuity .iot .dsa .dslink .protocol .responder .DSResponder ;
55import com .acuity .iot .dsa .dslink .transport .DSTransport ;
66import java .io .IOException ;
7- import java .util .LinkedList ;
8- import java .util .List ;
7+ import java .util .concurrent .ConcurrentLinkedQueue ;
98import org .iot .dsa .conn .DSConnection ;
109import org .iot .dsa .conn .DSIConnected ;
1110import org .iot .dsa .dslink .DSIRequester ;
@@ -49,8 +48,8 @@ public abstract class DSSession extends DSNode implements DSIConnected {
4948 private int messageId = 0 ;
5049 private int nextMessage = 1 ;
5150 private final Object outgoingMutex = new Object ();
52- private List <OutboundMessage > outgoingRequests = new LinkedList <OutboundMessage >();
53- private List <OutboundMessage > outgoingResponses = new LinkedList <OutboundMessage >();
51+ private ConcurrentLinkedQueue <OutboundMessage > outgoingRequests = new ConcurrentLinkedQueue <OutboundMessage >();
52+ private ConcurrentLinkedQueue <OutboundMessage > outgoingResponses = new ConcurrentLinkedQueue <OutboundMessage >();
5453 private DSInfo requesterAllowed = getInfo (REQUESTER_ALLOWED );
5554 private ReadThread readThread ;
5655 private WriteThread writeThread ;
@@ -78,10 +77,8 @@ public void enqueueOutgoingRequest(OutboundMessage arg) {
7877 if (!isRequesterAllowed ()) {
7978 throw new IllegalStateException ("Requester not allowed" );
8079 }
81- synchronized (outgoingMutex ) {
82- outgoingRequests .add (arg );
83- outgoingMutex .notify ();
84- }
80+ outgoingRequests .add (arg );
81+ notifyOutgoing ();
8582 }
8683 }
8784
@@ -90,10 +87,8 @@ public void enqueueOutgoingRequest(OutboundMessage arg) {
9087 */
9188 public void enqueueOutgoingResponse (OutboundMessage arg ) {
9289 if (connected ) {
93- synchronized (outgoingMutex ) {
94- outgoingResponses .add (arg );
95- outgoingMutex .notify ();
96- }
90+ outgoingResponses .add (arg );
91+ notifyOutgoing ();
9792 }
9893 }
9994
@@ -175,24 +170,14 @@ protected void declareDefaults() {
175170 * Can return null.
176171 */
177172 protected OutboundMessage dequeueOutgoingRequest () {
178- synchronized (outgoingMutex ) {
179- if (!outgoingRequests .isEmpty ()) {
180- return outgoingRequests .remove (0 );
181- }
182- }
183- return null ;
173+ return outgoingRequests .poll ();
184174 }
185175
186176 /**
187177 * Can return null.
188178 */
189179 protected OutboundMessage dequeueOutgoingResponse () {
190- synchronized (outgoingMutex ) {
191- if (!outgoingResponses .isEmpty ()) {
192- return outgoingResponses .remove (0 );
193- }
194- }
195- return null ;
180+ return outgoingResponses .poll ();
196181 }
197182
198183 /**
@@ -247,10 +232,18 @@ protected boolean hasSomethingToSend() {
247232 return false ;
248233 }
249234 if (!outgoingResponses .isEmpty ()) {
250- return true ;
235+ for (OutboundMessage msg : outgoingResponses ) {
236+ if (msg .canWrite (this )) {
237+ return true ;
238+ }
239+ }
251240 }
252241 if (!outgoingRequests .isEmpty ()) {
253- return true ;
242+ for (OutboundMessage msg : outgoingRequests ) {
243+ if (msg .canWrite (this )) {
244+ return true ;
245+ }
246+ }
254247 }
255248 return false ;
256249 }
@@ -289,11 +282,9 @@ protected void onConnected() {
289282 * Clear the outgoing queues and waits for the the read and write threads to exit.
290283 */
291284 protected void onDisconnected () {
292- synchronized (outgoingMutex ) {
293- outgoingRequests .clear ();
294- outgoingResponses .clear ();
295- outgoingMutex .notifyAll ();
296- }
285+ outgoingRequests .clear ();
286+ outgoingResponses .clear ();
287+ notifyOutgoing ();
297288 try {
298289 readThread .join ();
299290 } catch (Exception x ) {
@@ -322,15 +313,11 @@ protected void onDisconnecting() {
322313 }
323314
324315 protected void requeueOutgoingRequest (OutboundMessage arg ) {
325- synchronized (outgoingMutex ) {
326- outgoingRequests .add (arg );
327- }
316+ outgoingRequests .add (arg );
328317 }
329318
330319 protected void requeueOutgoingResponse (OutboundMessage arg ) {
331- synchronized (outgoingMutex ) {
332- outgoingResponses .add (arg );
333- }
320+ outgoingResponses .add (arg );
334321 }
335322
336323 /**
@@ -396,7 +383,8 @@ private void waitForAcks(long timeout) {
396383 warn (getPath (), x );
397384 }
398385 if ((System .currentTimeMillis () - start ) > timeout ) {
399- debug (debug () ? String .format ("witForAcks timeout (%s / %s)" ,ackRcvd ,messageId )
386+ debug (debug () ? String
387+ .format ("waitForAcks timeout (%s / %s)" , ackRcvd , messageId )
400388 : null );
401389 break ;
402390 }
0 commit comments