11package com .acuity .iot .dsa .dslink .protocol ;
22
33import com .acuity .iot .dsa .dslink .protocol .message .OutboundMessage ;
4+ import com .acuity .iot .dsa .dslink .protocol .responder .DSResponder ;
45import com .acuity .iot .dsa .dslink .transport .DSTransport ;
56import java .io .IOException ;
67import java .util .LinkedList ;
@@ -21,18 +22,20 @@ public abstract class DSSession extends DSNode {
2122 // Class Fields
2223 ///////////////////////////////////////////////////////////////////////////
2324
24- private static final int MAX_MSG_ID = 2147483647 ;
25+ private static final int MAX_MSG_ID = Integer . MAX_VALUE ;
2526 private static final long MSG_TIMEOUT = 60000 ;
2627
2728 ///////////////////////////////////////////////////////////////////////////
2829 // Instance Fields
2930 ///////////////////////////////////////////////////////////////////////////
3031
32+ private int ackRcvd = -1 ;
33+ private int ackToSend = -1 ;
3134 private boolean connected = false ;
3235 private DSLinkConnection connection ;
33- private long lastRecv ;
34- private long lastSend ;
35- private int nextAck = - 1 ;
36+ private long lastTimeRecv ;
37+ private long lastTimeSend ;
38+ private int messageId = 0 ;
3639 private int nextMessage = 1 ;
3740 private Object outgoingMutex = new Object ();
3841 private List <OutboundMessage > outgoingRequests = new LinkedList <OutboundMessage >();
@@ -51,7 +54,7 @@ public DSSession(DSLinkConnection connection) {
5154 }
5255
5356 ///////////////////////////////////////////////////////////////////////////
54- // Methods
57+ // Public Methods
5558 ///////////////////////////////////////////////////////////////////////////
5659
5760 /**
@@ -96,37 +99,37 @@ public void enqueueOutgoingResponse(OutboundMessage arg) {
9699 }
97100 }
98101
99- public DSLinkConnection getConnection () {
100- return connection ;
101- }
102-
103102 /**
104- * Last ack received from the broker.
103+ * Last ack received from the broker, or -1 .
105104 */
106- public abstract long getLastAckRcvd ();
105+ public int getAckRcvd () {
106+ return ackRcvd ;
107+ }
107108
108109 /**
109- * The next ack id, or -1.
110+ * The next ack id to send , or -1.
110111 */
111- public synchronized int getNextAck () {
112- int ret = nextAck ;
113- nextAck = -1 ;
112+ public synchronized int getAckToSend () {
113+ int ret = ackToSend ;
114+ ackToSend = -1 ;
114115 return ret ;
115116 }
116117
118+ public DSLinkConnection getConnection () {
119+ return connection ;
120+ }
121+
117122 /**
118- * Returns the next new message id .
123+ * The current (last) message ID generated .
119124 */
120- public synchronized int getNextMessageId () {
121- int ret = nextMessage ;
122- if (++nextMessage > MAX_MSG_ID ) {
123- nextMessage = 1 ;
124- }
125- return ret ;
125+ public int getMessageId () {
126+ return messageId ;
126127 }
127128
128129 public abstract DSIRequester getRequester ();
129130
131+ public abstract DSResponder getResponder ();
132+
130133 public DSTransport getTransport () {
131134 return getConnection ().getTransport ();
132135 }
@@ -161,13 +164,13 @@ public void onDisconnect() {
161164 * implementation. A separate thread is spun off to manage writing.
162165 */
163166 public void run () {
164- lastRecv = lastSend = System .currentTimeMillis ();
167+ lastTimeRecv = lastTimeSend = System .currentTimeMillis ();
165168 new WriteThread (getConnection ().getLink ().getLinkName () + " Writer" ).start ();
166169 while (connected ) {
167170 try {
168171 verifyLastSend ();
169172 doRecvMessage ();
170- lastRecv = System .currentTimeMillis ();
173+ lastTimeRecv = System .currentTimeMillis ();
171174 } catch (Exception x ) {
172175 getTransport ().close ();
173176 if (connected ) {
@@ -178,16 +181,6 @@ public void run() {
178181 }
179182 }
180183
181- /**
182- * Call for each incoming message id that needs to be acked.
183- */
184- public synchronized void setNextAck (int nextAck ) {
185- if (nextAck > 0 ) {
186- this .nextAck = nextAck ;
187- notifyOutgoing ();
188- }
189- }
190-
191184 /**
192185 * Called when the broker signifies that requests are allowed.
193186 */
@@ -197,6 +190,10 @@ public void setRequesterAllowed() {
197190
198191 public abstract boolean shouldEndMessage ();
199192
193+ ///////////////////////////////////////////////////////////////////////////
194+ // Protected Methods
195+ ///////////////////////////////////////////////////////////////////////////
196+
200197 /**
201198 * Can return null.
202199 */
@@ -238,8 +235,23 @@ protected String getLogName() {
238235 return "Session" ;
239236 }
240237
238+ protected int getMissingAcks () {
239+ return messageId - ackRcvd - 1 ;
240+ }
241+
242+ /**
243+ * Returns the next new message id.
244+ */
245+ protected synchronized int getNextMessageId () {
246+ messageId = nextMessage ;
247+ if (++nextMessage > MAX_MSG_ID ) {
248+ nextMessage = 1 ;
249+ }
250+ return messageId ;
251+ }
252+
241253 protected boolean hasAckToSend () {
242- return nextAck > 0 ;
254+ return ackToSend > 0 ;
243255 }
244256
245257 protected boolean hasOutgoingRequests () {
@@ -254,9 +266,12 @@ protected boolean hasOutgoingResponses() {
254266 * Override point, this returns the result of hasMessagesToSend.
255267 */
256268 protected boolean hasSomethingToSend () {
257- if (nextAck > 0 ) {
269+ if (ackToSend >= 0 ) {
258270 return true ;
259271 }
272+ if (waitingForAcks ()) {
273+ return false ;
274+ }
260275 if (!outgoingResponses .isEmpty ()) {
261276 return true ;
262277 }
@@ -279,14 +294,63 @@ protected void notifyOutgoing() {
279294 }
280295 }
281296
297+ protected int numOutgoingRequests () {
298+ return outgoingRequests .size ();
299+ }
300+
301+ protected int numOutgoingResponses () {
302+ return outgoingResponses .size ();
303+ }
304+
305+ protected void requeueOutgoingRequest (OutboundMessage arg ) {
306+ synchronized (outgoingMutex ) {
307+ outgoingRequests .add (arg );
308+ }
309+ }
310+
311+ protected void requeueOutgoingResponse (OutboundMessage arg ) {
312+ synchronized (outgoingMutex ) {
313+ outgoingResponses .add (arg );
314+ }
315+ }
316+
317+ /**
318+ * Call for each incoming message id that needs to be acked.
319+ */
320+ protected void setAckRcvd (int ackRcvd ) {
321+ if (ackRcvd < this .ackRcvd ) {
322+ warn (warn () ? String .format ("Ack rcvd %s < last %s" , ackRcvd , this .ackRcvd ) : null );
323+ }
324+ this .ackRcvd = ackRcvd ;
325+ notifyOutgoing ();
326+ }
327+
328+ /**
329+ * Call for each incoming message id that needs to be acked.
330+ */
331+ protected void setAckToSend (int ackToSend ) {
332+ if (ackToSend > 0 ) {
333+ this .ackToSend = ackToSend ;
334+ notifyOutgoing ();
335+ }
336+ }
337+
338+ protected boolean waitingForAcks () {
339+ return getMissingAcks () > 8 ;
340+ }
341+
342+ ///////////////////////////////////////////////////////////////////////////
343+ // Package / Private Methods
344+ ///////////////////////////////////////////////////////////////////////////
345+
282346 private void verifyLastRead () throws IOException {
283- if ((System .currentTimeMillis () - lastRecv ) > MSG_TIMEOUT ) {
347+ if ((System .currentTimeMillis () - lastTimeRecv ) > MSG_TIMEOUT ) {
284348 throw new IOException ("No message received in " + MSG_TIMEOUT + "ms" );
285349 }
286350 }
287351
288352 private void verifyLastSend () throws IOException {
289- if ((System .currentTimeMillis () - lastSend ) > MSG_TIMEOUT ) {
353+ if ((System .currentTimeMillis () - lastTimeSend ) > MSG_TIMEOUT ) {
290354 throw new IOException ("No message sent in " + MSG_TIMEOUT + "ms" );
291355 }
292356 }
@@ -320,7 +384,7 @@ public void run() {
320384 }
321385 }
322386 doSendMessage ();
323- lastSend = System .currentTimeMillis ();
387+ lastTimeSend = System .currentTimeMillis ();
324388 }
325389 } catch (Exception x ) {
326390 if (connected ) {
0 commit comments