3939import java .util .concurrent .ScheduledFuture ;
4040import java .io .IOException ;
4141
42+ import static java .util .concurrent .TimeUnit .*;
43+
4244/**
4345 * Manages heartbeats for a {@link AMQConnection}.
4446 * <p/>
@@ -69,7 +71,7 @@ public void signalActivity() {
6971 * Sets the heartbeat in seconds.
7072 */
7173 public void setHeartbeat (int heartbeatSeconds ) {
72- ScheduledFuture <?> previousFuture = null ;
74+ ScheduledFuture <?> previousFuture ;
7375 synchronized (this .monitor ) {
7476 previousFuture = this .future ;
7577 this .future = null ;
@@ -79,12 +81,14 @@ public void setHeartbeat(int heartbeatSeconds) {
7981 previousFuture .cancel (true );
8082 }
8183
82-
8384 if (heartbeatSeconds > 0 ) {
85+ // wake every heartbeatSeconds / 2 to avoid the worst case
86+ // where the last activity comes just after the last heartbeat
87+ long interval = SECONDS .toMillis (heartbeatSeconds ) / 2 ;
8488 ScheduledExecutorService executor = createExecutorIfNecessary ();
89+ Runnable task = new HeartbeatRunnable (interval );
8590 ScheduledFuture <?> newFuture = executor .scheduleAtFixedRate (
86- new HeartbeatRunnable (heartbeatSeconds ), heartbeatSeconds ,
87- heartbeatSeconds , TimeUnit .SECONDS );
91+ task , interval , interval , TimeUnit .MILLISECONDS );
8892
8993 synchronized (this .monitor ) {
9094 this .future = newFuture ;
@@ -127,19 +131,17 @@ public void shutdown() {
127131
128132 private class HeartbeatRunnable implements Runnable {
129133
130- private static final long NANOS_IN_SECOND = 1000 * 1000 * 1000 ;
131-
132134 private final long heartbeatNanos ;
133135
134- private HeartbeatRunnable (int heartbeatSeconds ) {
135- this .heartbeatNanos = NANOS_IN_SECOND * heartbeatSeconds ;
136+ private HeartbeatRunnable (long heartbeatMillis ) {
137+ this .heartbeatNanos = MILLISECONDS . toNanos ( heartbeatMillis ) ;
136138 }
137139
138140 public void run () {
139141 try {
140142 long now = System .nanoTime ();
141143
142- if (now > (lastActivityTime + this .heartbeatNanos )) {
144+ if (now > (lastActivityTime + this .heartbeatNanos )) {
143145 frameHandler .writeFrame (new Frame (AMQP .FRAME_HEARTBEAT , 0 ));
144146 }
145147 } catch (IOException e ) {
0 commit comments