Skip to content

Commit ee4e196

Browse files
committed
Get rid of read retry in NIO for plain connection
A FrameBuilder class has been introduced to be able to read a frame in several network reads. The frame builder keeps track of the partial frame state. This avoids pulling from the network when the channel doesn't come back with any value from a network read, which is against the NIO philosophy. References #319
1 parent 638c5c1 commit ee4e196

File tree

5 files changed

+251
-13
lines changed

5 files changed

+251
-13
lines changed
Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
// Copyright (c) 2007-Present Pivotal Software, Inc. All rights reserved.
2+
//
3+
// This software, the RabbitMQ Java client library, is triple-licensed under the
4+
// Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2
5+
// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see
6+
// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL,
7+
// please see LICENSE-APACHE2.
8+
//
9+
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
10+
// either express or implied. See the LICENSE file for specific language governing
11+
// rights and limitations of this software.
12+
//
13+
// If you have any questions regarding licensing, please contact us at
14+
// info@rabbitmq.com.
15+
16+
package com.rabbitmq.client.impl.nio;
17+
18+
import com.rabbitmq.client.AMQP;
19+
import com.rabbitmq.client.MalformedFrameException;
20+
import com.rabbitmq.client.impl.Frame;
21+
22+
import java.io.IOException;
23+
import java.nio.ByteBuffer;
24+
import java.nio.channels.ReadableByteChannel;
25+
26+
/**
27+
*
28+
*/
29+
public class FrameBuilder {
30+
31+
private static final int PAYLOAD_OFFSET = 1 /* type */ + 2 /* channel */ + 4 /* payload size */;
32+
33+
private final ReadableByteChannel channel;
34+
35+
private final ByteBuffer buffer;
36+
37+
private int frameType;
38+
private int frameChannel;
39+
private byte [] framePayload;
40+
41+
private int bytesRead = 0;
42+
43+
private final int [] frameBuffer = new int[3];
44+
45+
public FrameBuilder(ReadableByteChannel channel, ByteBuffer buffer) {
46+
this.channel = channel;
47+
this.buffer = buffer;
48+
}
49+
50+
public Frame readFrame() throws IOException {
51+
while(readFromNetworkIfNecessary()) {
52+
if (bytesRead == 0) {
53+
// type
54+
// FIXME check first byte isn't 'A' and thus a header indicating protocol version mismatch
55+
frameType = readFromBuffer();
56+
} else if (bytesRead == 1) {
57+
// channel 1/2
58+
frameBuffer[0] = readFromBuffer();
59+
} else if (bytesRead == 2) {
60+
// channel 2/2
61+
frameChannel = (frameBuffer[0] << 8) + (readFromBuffer() << 0);
62+
} else if (bytesRead == 3) {
63+
// payload size 1/4
64+
frameBuffer[0] = readFromBuffer();
65+
} else if (bytesRead == 4) {
66+
// payload size 2/4
67+
frameBuffer[1] = readFromBuffer();
68+
} else if (bytesRead == 5) {
69+
// payload size 3/4
70+
frameBuffer[2] = readFromBuffer();
71+
} else if (bytesRead == 6) {
72+
// payload size 4/4
73+
int framePayloadSize = ((frameBuffer[0] << 24) + (frameBuffer[1] << 16) + (frameBuffer[2] << 8) + (readFromBuffer() << 0));
74+
framePayload = new byte[framePayloadSize];
75+
} else if (bytesRead >= PAYLOAD_OFFSET && bytesRead < framePayload.length + PAYLOAD_OFFSET) {
76+
framePayload[bytesRead - PAYLOAD_OFFSET] = (byte) readFromBuffer();
77+
} else if (bytesRead == framePayload.length + PAYLOAD_OFFSET) {
78+
int frameEndMarker = readFromBuffer();
79+
if (frameEndMarker != AMQP.FRAME_END) {
80+
throw new MalformedFrameException("Bad frame end marker: " + frameEndMarker);
81+
}
82+
bytesRead = 0;
83+
return new Frame(frameType, frameChannel, framePayload);
84+
} else {
85+
throw new IllegalStateException("Number of read bytes incorrect: " + bytesRead);
86+
}
87+
bytesRead++;
88+
}
89+
return null;
90+
}
91+
92+
private int read() throws IOException {
93+
return NioHelper.read(channel, buffer);
94+
}
95+
96+
private int readFromBuffer() {
97+
return buffer.get() & 0xff;
98+
}
99+
100+
private boolean readFromNetworkIfNecessary() throws IOException {
101+
if(!buffer.hasRemaining()) {
102+
buffer.clear();
103+
int read = read();
104+
buffer.flip();
105+
if (read > 0) {
106+
return true;
107+
} else {
108+
return false;
109+
}
110+
} else {
111+
return true;
112+
}
113+
}
114+
}

src/main/java/com/rabbitmq/client/impl/nio/NioLoop.java

Lines changed: 19 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -149,22 +149,29 @@ public void run() {
149149
state.prepareForReadSequence();
150150

151151
while (state.continueReading()) {
152-
Frame frame = Frame.readFrom(inputStream);
152+
final Frame frame;
153+
if (state.frameBuilder == null) {
154+
frame = Frame.readFrom(inputStream);
155+
} else {
156+
frame = state.frameBuilder.readFrame();
157+
}
153158

154-
try {
155-
boolean noProblem = state.getConnection().handleReadFrame(frame);
156-
if (noProblem && (!state.getConnection().isRunning() || state.getConnection().hasBrokerInitiatedShutdown())) {
157-
// looks like the frame was Close-Ok or Close
158-
dispatchShutdownToConnection(state);
159+
if (frame != null) {
160+
try {
161+
boolean noProblem = state.getConnection().handleReadFrame(frame);
162+
if (noProblem && (!state.getConnection().isRunning() || state.getConnection().hasBrokerInitiatedShutdown())) {
163+
// looks like the frame was Close-Ok or Close
164+
dispatchShutdownToConnection(state);
165+
key.cancel();
166+
break;
167+
}
168+
} catch (Throwable ex) {
169+
// problem during frame processing, tell connection, and
170+
// we can stop for this channel
171+
handleIoError(state, ex);
159172
key.cancel();
160173
break;
161174
}
162-
} catch (Throwable ex) {
163-
// problem during frame processing, tell connection, and
164-
// we can stop for this channel
165-
handleIoError(state, ex);
166-
key.cancel();
167-
break;
168175
}
169176
}
170177

src/main/java/com/rabbitmq/client/impl/nio/SocketChannelFrameHandlerState.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,8 @@ public class SocketChannelFrameHandlerState {
7777

7878
final DataInputStream inputStream;
7979

80+
final FrameBuilder frameBuilder;
81+
8082
public SocketChannelFrameHandlerState(SocketChannel channel, NioLoopContext nioLoopsState, NioParams nioParams, SSLEngine sslEngine) {
8183
this.channel = channel;
8284
this.readSelectorState = nioLoopsState.readSelectorState;
@@ -98,6 +100,8 @@ public SocketChannelFrameHandlerState(SocketChannel channel, NioLoopContext nioL
98100
new ByteBufferInputStream(channel, plainIn)
99101
);
100102

103+
this.frameBuilder = new FrameBuilder(channel, plainIn);
104+
101105
} else {
102106
this.ssl = true;
103107
this.plainOut = ByteBuffer.allocate(sslEngine.getSession().getApplicationBufferSize());
@@ -111,6 +115,8 @@ public SocketChannelFrameHandlerState(SocketChannel channel, NioLoopContext nioL
111115
this.inputStream = new DataInputStream(
112116
new SslEngineByteBufferInputStream(sslEngine, plainIn, cipherIn, channel)
113117
);
118+
119+
this.frameBuilder = null;
114120
}
115121

116122
}

src/test/java/com/rabbitmq/client/test/ClientTests.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package com.rabbitmq.client.test;
1818

19+
import com.rabbitmq.client.impl.nio.FrameBuilder;
1920
import com.rabbitmq.utility.IntAllocatorTests;
2021
import org.junit.runner.RunWith;
2122
import org.junit.runners.Suite;
@@ -49,7 +50,8 @@
4950
ConnectionFactoryTest.class,
5051
RecoveryAwareAMQConnectionFactoryTest.class,
5152
RpcTest.class,
52-
RecoveryDelayHandlerTest.class
53+
RecoveryDelayHandlerTest.class,
54+
FrameBuilder.class
5355
})
5456
public class ClientTests {
5557

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
// Copyright (c) 2007-Present Pivotal Software, Inc. All rights reserved.
2+
//
3+
// This software, the RabbitMQ Java client library, is triple-licensed under the
4+
// Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2
5+
// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see
6+
// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL,
7+
// please see LICENSE-APACHE2.
8+
//
9+
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
10+
// either express or implied. See the LICENSE file for specific language governing
11+
// rights and limitations of this software.
12+
//
13+
// If you have any questions regarding licensing, please contact us at
14+
// info@rabbitmq.com.
15+
16+
package com.rabbitmq.client.test;
17+
18+
import com.rabbitmq.client.AMQP;
19+
import com.rabbitmq.client.impl.Frame;
20+
import com.rabbitmq.client.impl.nio.FrameBuilder;
21+
import org.junit.Test;
22+
import org.junit.runner.RunWith;
23+
import org.mockito.Mock;
24+
import org.mockito.junit.MockitoJUnitRunner;
25+
26+
import java.io.IOException;
27+
import java.nio.ByteBuffer;
28+
import java.nio.channels.ReadableByteChannel;
29+
30+
import static org.hamcrest.Matchers.is;
31+
import static org.hamcrest.Matchers.notNullValue;
32+
import static org.hamcrest.Matchers.nullValue;
33+
import static org.junit.Assert.assertThat;
34+
35+
/**
36+
*
37+
*/
38+
@RunWith(MockitoJUnitRunner.class)
39+
public class FrameBuilderTest {
40+
41+
@Mock
42+
ReadableByteChannel channel;
43+
44+
ByteBuffer buffer;
45+
46+
FrameBuilder builder;
47+
48+
@Test
49+
public void buildFrameInOneGo() throws IOException {
50+
buffer = ByteBuffer.wrap(new byte[]{1, 0, 0, 0, 0, 0, 3, 1, 2, 3, end()});
51+
builder = new FrameBuilder(channel, buffer);
52+
Frame frame = builder.readFrame();
53+
assertThat(frame, notNullValue());
54+
assertThat(frame.type, is(1));
55+
assertThat(frame.channel, is(0));
56+
assertThat(frame.getPayload().length, is(3));
57+
}
58+
59+
@Test
60+
public void buildFramesInOneGo() throws IOException {
61+
byte[] frameContent = new byte[]{1, 0, 0, 0, 0, 0, 3, 1, 2, 3, end()};
62+
int nbFrames = 13;
63+
byte[] frames = new byte[frameContent.length * nbFrames];
64+
for(int i = 0; i < nbFrames; i++) {
65+
for (int j = 0; j < frameContent.length; j++) {
66+
frames[i * frameContent.length + j] = frameContent[j];
67+
}
68+
}
69+
buffer = ByteBuffer.wrap(frames);
70+
builder = new FrameBuilder(channel, buffer);
71+
int frameCount = 0;
72+
Frame frame;
73+
while ((frame = builder.readFrame()) != null) {
74+
assertThat(frame, notNullValue());
75+
assertThat(frame.type, is(1));
76+
assertThat(frame.channel, is(0));
77+
assertThat(frame.getPayload().length, is(3));
78+
frameCount++;
79+
}
80+
assertThat(frameCount, is(nbFrames));
81+
}
82+
83+
@Test
84+
public void buildFrameInSeveralCalls() throws IOException {
85+
buffer = ByteBuffer.wrap(new byte[]{1, 0, 0, 0, 0, 0, 3, 1, 2});
86+
builder = new FrameBuilder(channel, buffer);
87+
Frame frame = builder.readFrame();
88+
assertThat(frame, nullValue());
89+
90+
buffer.clear();
91+
buffer.put(b(3)).put(end());
92+
buffer.flip();
93+
94+
frame = builder.readFrame();
95+
assertThat(frame, notNullValue());
96+
assertThat(frame.type, is(1));
97+
assertThat(frame.channel, is(0));
98+
assertThat(frame.getPayload().length, is(3));
99+
}
100+
101+
byte b(int b) {
102+
return (byte) b;
103+
}
104+
105+
byte end() {
106+
return (byte) AMQP.FRAME_END;
107+
}
108+
109+
}

0 commit comments

Comments
 (0)