Skip to content

Commit 056333e

Browse files
author
Rob Harrop
committed
moved heartbeating into a separate thread
1 parent c062a6c commit 056333e

File tree

2 files changed

+146
-21
lines changed

2 files changed

+146
-21
lines changed

src/com/rabbitmq/client/impl/AMQConnection.java

Lines changed: 14 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,9 @@
3737
import java.util.Map;
3838
import java.util.HashMap;
3939
import java.util.concurrent.TimeoutException;
40+
import java.util.concurrent.ScheduledExecutorService;
41+
import java.util.concurrent.Executors;
42+
import java.util.concurrent.TimeUnit;
4043

4144
import com.rabbitmq.client.AMQP;
4245
import com.rabbitmq.client.Address;
@@ -122,6 +125,9 @@ public static Map<String, Object> defaultClientProperties() {
122125
/** Flag indicating whether the client received Connection.Close message from the broker */
123126
private boolean _brokerInitiatedShutdown = false;
124127

128+
/** Manages heartbeats for this connection */
129+
private final Heartbeater heartbeater;
130+
125131
/**
126132
* Protected API - respond, in the driver thread, to a ShutdownSignal.
127133
* @param channel the channel to disconnect
@@ -211,6 +217,7 @@ public AMQConnection(ConnectionFactory factory,
211217
_clientProperties = new HashMap<String, Object>(factory.getClientProperties());
212218

213219
this.factory = factory;
220+
this.heartbeater = new Heartbeater(frameHandler);
214221
_frameHandler = frameHandler;
215222
_running = true;
216223
_frameMax = 0;
@@ -349,10 +356,11 @@ public int getHeartbeat() {
349356
*/
350357
public void setHeartbeat(int heartbeat) {
351358
try {
359+
this.heartbeater.setHeartbeat(heartbeat);
360+
352361
// Divide by four to make the maximum unwanted delay in
353362
// sending a timeout be less than a quarter of the
354363
// timeout setting.
355-
_heartbeat = heartbeat;
356364
_frameHandler.setTimeout(heartbeat * 1000 / 4);
357365
} catch (SocketException se) {
358366
// should do more here?
@@ -416,7 +424,7 @@ private class MainLoop extends Thread {
416424
try {
417425
while (_running) {
418426
Frame frame = readFrame();
419-
maybeSendHeartbeat();
427+
420428
if (frame != null) {
421429
_missedHeartbeats = 0;
422430
if (frame.type == AMQP.FRAME_HEARTBEAT) {
@@ -459,25 +467,6 @@ private class MainLoop extends Thread {
459467
}
460468
}
461469

462-
private static final long NANOS_IN_SECOND = 1000 * 1000 * 1000;
463-
464-
/**
465-
* Private API - Checks lastActivityTime and heartbeat, sending a
466-
* heartbeat frame if conditions are right.
467-
*/
468-
public void maybeSendHeartbeat() throws IOException {
469-
if (_heartbeat == 0) {
470-
// No heartbeating.
471-
return;
472-
}
473-
474-
long now = System.nanoTime();
475-
if (now > (_lastActivityTime + (_heartbeat * NANOS_IN_SECOND))) {
476-
_lastActivityTime = now;
477-
writeFrame(new Frame(AMQP.FRAME_HEARTBEAT, 0));
478-
}
479-
}
480-
481470
/**
482471
* Private API - Called when a frame-read operation times out. Checks to
483472
* see if too many heartbeats have been missed, and if so, throws
@@ -607,6 +596,10 @@ public ShutdownSignalException shutdown(Object reason,
607596
if (isOpen())
608597
_shutdownCause = sse;
609598
}
599+
600+
// stop any heartbeating
601+
this.heartbeater.shutdown();
602+
610603
_channel0.processShutdownSignal(sse, !initiatedByApplication, notifyRpc);
611604
_channelManager.handleSignal(sse);
612605
return sse;
Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
// The contents of this file are subject to the Mozilla Public License
2+
// Version 1.1 (the "License"); you may not use this file except in
3+
// compliance with the License. You may obtain a copy of the License at
4+
// http://www.mozilla.org/MPL/
5+
//
6+
// Software distributed under the License is distributed on an "AS IS"
7+
// basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
8+
// License for the specific language governing rights and limitations
9+
// under the License.
10+
//
11+
// The Original Code is RabbitMQ.
12+
//
13+
// The Initial Developers of the Original Code are LShift Ltd,
14+
// Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
15+
//
16+
// Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
17+
// Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
18+
// are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
19+
// Technologies LLC, and Rabbit Technologies Ltd.
20+
//
21+
// Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
22+
// Ltd. Portions created by Cohesive Financial Technologies LLC are
23+
// Copyright (C) 2007-2010 Cohesive Financial Technologies
24+
// LLC. Portions created by Rabbit Technologies Ltd are Copyright
25+
// (C) 2007-2010 Rabbit Technologies Ltd.
26+
//
27+
// All Rights Reserved.
28+
//
29+
// Contributor(s): ______________________________________.
30+
//
31+
32+
package com.rabbitmq.client.impl;
33+
34+
import com.rabbitmq.client.AMQP;
35+
36+
import java.util.concurrent.ScheduledExecutorService;
37+
import java.util.concurrent.Executors;
38+
import java.util.concurrent.TimeUnit;
39+
import java.util.concurrent.ScheduledFuture;
40+
import java.io.IOException;
41+
42+
/**
43+
* Manages heartbeats for a {@link AMQConnection}.
44+
* <p/>
45+
* Heartbeats are sent in a dedicated thread that is separate
46+
* from the main loop thread used for the connection.
47+
*/
48+
final class Heartbeater {
49+
50+
private final Object monitor = new Object();
51+
52+
private final FrameHandler frameHandler;
53+
54+
private ScheduledExecutorService executor;
55+
56+
private ScheduledFuture<?> future;
57+
58+
Heartbeater(FrameHandler frameHandler) {
59+
this.frameHandler = frameHandler;
60+
}
61+
62+
/**
63+
* Sets the heartbeat in seconds.
64+
*/
65+
public void setHeartbeat(int heartbeatSeconds) {
66+
ScheduledFuture<?> previousFuture = null;
67+
synchronized (this.monitor) {
68+
previousFuture = this.future;
69+
this.future = null;
70+
}
71+
72+
if (previousFuture != null) {
73+
previousFuture.cancel(true);
74+
}
75+
76+
77+
if (heartbeatSeconds > 0) {
78+
ScheduledExecutorService executor = createExecutorIfNecessary();
79+
ScheduledFuture<?> newFuture = executor.scheduleAtFixedRate(
80+
new HeartbeatRunnable(), heartbeatSeconds,
81+
heartbeatSeconds, TimeUnit.SECONDS);
82+
83+
synchronized (this.monitor) {
84+
this.future = newFuture;
85+
}
86+
}
87+
88+
}
89+
90+
private ScheduledExecutorService createExecutorIfNecessary() {
91+
synchronized (this.monitor) {
92+
if (this.executor == null) {
93+
this.executor = Executors.newSingleThreadScheduledExecutor();
94+
}
95+
return this.executor;
96+
}
97+
}
98+
99+
/**
100+
* Shutdown the heartbeat process, if any.
101+
*/
102+
public void shutdown() {
103+
ScheduledFuture<?> future;
104+
ScheduledExecutorService executor;
105+
106+
synchronized (this.monitor) {
107+
future = this.future;
108+
executor = this.executor;
109+
this.future = null;
110+
this.executor = null;
111+
}
112+
113+
if (future != null) {
114+
future.cancel(true);
115+
}
116+
117+
if (executor != null) {
118+
executor.shutdown();
119+
}
120+
}
121+
122+
private class HeartbeatRunnable implements Runnable {
123+
124+
public void run() {
125+
try {
126+
frameHandler.writeFrame(new Frame(AMQP.FRAME_HEARTBEAT, 0));
127+
} catch (IOException e) {
128+
// ignore
129+
}
130+
}
131+
}
132+
}

0 commit comments

Comments
 (0)