Skip to content

Commit 898531f

Browse files
committed
flush after every command rather than every frame
1 parent 445b5a0 commit 898531f

File tree

5 files changed

+23
-1
lines changed

5 files changed

+23
-1
lines changed

src/com/rabbitmq/client/impl/AMQCommand.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,9 @@ public void transmit(AMQChannel channel) throws IOException {
183183
connection.writeFrame(frame);
184184
}
185185
}
186+
187+
connection.flush();
188+
186189
}
187190

188191
@Override public String toString() {

src/com/rabbitmq/client/impl/AMQConnection.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -409,6 +409,13 @@ public void writeFrame(Frame f) throws IOException {
409409
_heartbeatSender.signalActivity();
410410
}
411411

412+
/**
413+
* Public API - flush the output buffers
414+
*/
415+
public void flush() throws IOException {
416+
_frameHandler.flush();
417+
}
418+
412419
private static int negotiatedMaxValue(int clientValue, int serverValue) {
413420
return (clientValue == 0 || serverValue == 0) ?
414421
Math.max(clientValue, serverValue) :

src/com/rabbitmq/client/impl/FrameHandler.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,13 @@ public interface FrameHandler {
7272
*/
7373
void writeFrame(Frame frame) throws IOException;
7474

75+
/**
76+
* Flush the underlying data connection.
77+
*
78+
* @throws IOException if there is a problem accessing the connection
79+
*/
80+
void flush() throws IOException;
81+
7582
/**
7683
* Close the underlying data connection (complaint not permitted).
7784
*/

src/com/rabbitmq/client/impl/HeartbeatSender.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,7 @@ public void run() {
130130

131131
if (now > (lastActivityTime + this.heartbeatNanos)) {
132132
frameHandler.writeFrame(new Frame(AMQP.FRAME_HEARTBEAT, 0));
133+
frameHandler.flush();
133134
}
134135
} catch (IOException e) {
135136
// ignore

src/com/rabbitmq/client/impl/SocketFrameHandler.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -152,12 +152,16 @@ public Frame readFrame() throws IOException {
152152
public void writeFrame(Frame frame) throws IOException {
153153
synchronized (_outputStream) {
154154
frame.writeTo(_outputStream);
155-
_outputStream.flush();
156155
}
157156
}
158157

158+
public void flush() throws IOException {
159+
_outputStream.flush();
160+
}
161+
159162
public void close() {
160163
try {
164+
flush();
161165
_socket.close();
162166
} catch (IOException ioe) {
163167
// Ignore.

0 commit comments

Comments
 (0)