@@ -55,10 +55,16 @@ final class Heartbeater {
5555
5656 private ScheduledFuture <?> future ;
5757
58+ private volatile long lastActivityTime ;
59+
5860 Heartbeater (FrameHandler frameHandler ) {
5961 this .frameHandler = frameHandler ;
6062 }
6163
64+ public void signalActivity () {
65+ this .lastActivityTime = System .nanoTime ();
66+ }
67+
6268 /**
6369 * Sets the heartbeat in seconds.
6470 */
@@ -77,7 +83,7 @@ public void setHeartbeat(int heartbeatSeconds) {
7783 if (heartbeatSeconds > 0 ) {
7884 ScheduledExecutorService executor = createExecutorIfNecessary ();
7985 ScheduledFuture <?> newFuture = executor .scheduleAtFixedRate (
80- new HeartbeatRunnable (), heartbeatSeconds ,
86+ new HeartbeatRunnable (heartbeatSeconds ), heartbeatSeconds ,
8187 heartbeatSeconds , TimeUnit .SECONDS );
8288
8389 synchronized (this .monitor ) {
@@ -121,9 +127,21 @@ public void shutdown() {
121127
122128 private class HeartbeatRunnable implements Runnable {
123129
130+ private static final long NANOS_IN_SECOND = 1000 * 1000 * 1000 ;
131+
132+ private final long heartbeatNanos ;
133+
134+ private HeartbeatRunnable (int heartbeatSeconds ) {
135+ this .heartbeatNanos = NANOS_IN_SECOND * heartbeatSeconds ;
136+ }
137+
124138 public void run () {
125139 try {
126- frameHandler .writeFrame (new Frame (AMQP .FRAME_HEARTBEAT , 0 ));
140+ long now = System .nanoTime ();
141+
142+ if (now > (lastActivityTime + this .heartbeatNanos )) {
143+ frameHandler .writeFrame (new Frame (AMQP .FRAME_HEARTBEAT , 0 ));
144+ }
127145 } catch (IOException e ) {
128146 // ignore
129147 }
0 commit comments