Skip to content

Commit aca1b1f

Browse files
author
Simon MacMullen
committed
Merge bug22947 into default.
2 parents b450b96 + df77dfc commit aca1b1f

19 files changed

+206
-204
lines changed

build.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ test.javac.out=${build.out}/test/classes
1313
test.src.home=test/src
1414
java-jvm-1.4=c:/Program Files/java/j2re1.4.2_14/bin/java
1515
sibling.codegen.dir=../rabbitmq-codegen/
16-
spec.version=0.8
16+
spec.version=0.9.1
1717
bundle.out=${build.out}/bundle
1818
javadoc.out=build/doc/api
1919
python.bin=python

codegen.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,7 @@ def printHeader():
150150
public static class PROTOCOL {"""
151151
print " public static final int MAJOR = %i;" % spec.major
152152
print " public static final int MINOR = %i;" % spec.minor
153+
print " public static final int REVISION = %i;" % spec.revision
153154
print " public static final int PORT = %i;" % spec.port
154155
print " }"
155156

src/com/rabbitmq/client/BasicProperties.java

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -114,12 +114,6 @@ public interface BasicProperties {
114114
*/
115115
public abstract String getAppId();
116116

117-
/**
118-
* Retrieve the value in the clusterId field.
119-
* @return clusterId field, or null if the field has not been set.
120-
*/
121-
public abstract String getClusterId();
122-
123117
/**
124118
* Set the contentType field, or null indicating the field is not set
125119
* @param contentType the value to set the field to
@@ -198,9 +192,4 @@ public interface BasicProperties {
198192
*/
199193
public abstract void setAppId(String appId);
200194

201-
/**
202-
* Set the clusterId field, or null indicating the field is not set
203-
* @param clusterId the value to set the field to
204-
*/
205-
public abstract void setClusterId(String clusterId);
206195
}

src/com/rabbitmq/client/Channel.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -478,7 +478,7 @@ Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, b
478478
* @see com.rabbitmq.client.AMQP.Basic.Consume
479479
* @see com.rabbitmq.client.AMQP.Basic.ConsumeOk
480480
* @see #basicAck
481-
* @see #basicConsume(String,boolean, String,boolean,boolean, Consumer)
481+
* @see #basicConsume(String,boolean, String,boolean,boolean, Map, Consumer)
482482
*/
483483
String basicConsume(String queue, Consumer callback) throws IOException;
484484

@@ -492,7 +492,7 @@ Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, b
492492
* @throws java.io.IOException if an error is encountered
493493
* @see com.rabbitmq.client.AMQP.Basic.Consume
494494
* @see com.rabbitmq.client.AMQP.Basic.ConsumeOk
495-
* @see #basicConsume(String,boolean, String,boolean,boolean, Consumer)
495+
* @see #basicConsume(String,boolean, String,boolean,boolean, Map, Consumer)
496496
*/
497497
String basicConsume(String queue, boolean noAck, Consumer callback) throws IOException;
498498

@@ -506,7 +506,7 @@ Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, b
506506
* @throws java.io.IOException if an error is encountered
507507
* @see com.rabbitmq.client.AMQP.Basic.Consume
508508
* @see com.rabbitmq.client.AMQP.Basic.ConsumeOk
509-
* @see #basicConsume(String,boolean, String,boolean,boolean, Consumer)
509+
* @see #basicConsume(String,boolean, String,boolean,boolean, Map, Consumer)
510510
*/
511511
String basicConsume(String queue, boolean noAck, String consumerTag, Consumer callback) throws IOException;
512512

@@ -524,7 +524,7 @@ Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, b
524524
* @see com.rabbitmq.client.AMQP.Basic.Consume
525525
* @see com.rabbitmq.client.AMQP.Basic.ConsumeOk
526526
*/
527-
String basicConsume(String queue, boolean noAck, String consumerTag, boolean noLocal, boolean exclusive, Consumer callback) throws IOException;
527+
String basicConsume(String queue, boolean noAck, String consumerTag, boolean noLocal, boolean exclusive, Map<String, Object> filter, Consumer callback) throws IOException;
528528

529529
/**
530530
* Cancel a consumer. Calls the consumer's {@link Consumer#handleCancelOk}

src/com/rabbitmq/client/Connection.java

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -98,12 +98,6 @@ public interface Connection extends ShutdownNotifier { // rename to AMQPConnecti
9898
*/
9999
Map<String, Object> getClientProperties();
100100

101-
/**
102-
* Retrieve the known hosts.
103-
* @return an array of addresses for all hosts that came back in the initial {@link com.rabbitmq.client.AMQP.Connection.OpenOk} open-ok method
104-
*/
105-
Address[] getKnownHosts();
106-
107101
/**
108102
* Retrieve the server properties.
109103
* @return a map of the server properties. This typically includes the product name and version of the server.

src/com/rabbitmq/client/ConnectionFactory.java

Lines changed: 14 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -361,53 +361,25 @@ protected void configureSocket(Socket socket) throws IOException{
361361
socket.setTcpNoDelay(true);
362362
}
363363

364-
private Connection newConnection(Address[] addresses,
365-
int maxRedirects,
366-
Map<Address,Integer> redirectAttempts)
364+
/**
365+
* Create a new broker connection
366+
* @param addrs an array of known broker addresses (hostname/port pairs) to try in order
367+
* @return an interface to the connection
368+
* @throws IOException if it encounters a problem
369+
*/
370+
public Connection newConnection(Address[] addrs)
367371
throws IOException
368372
{
369373
IOException lastException = null;
370-
for (Address address : addresses) {
371-
Address[] lastKnownAddresses = new Address[0];
374+
for (Address addr : addrs) {
372375
try {
373-
while(true) {
374-
FrameHandler frameHandler = createFrameHandler(address);
375-
Integer redirectCount = redirectAttempts.get(address);
376-
if (redirectCount == null)
377-
redirectCount = 0;
378-
boolean allowRedirects = redirectCount < maxRedirects;
379-
try {
380-
AMQConnection conn = new AMQConnection(this,
381-
frameHandler);
382-
conn.start(!allowRedirects);
383-
return conn;
384-
} catch (RedirectException e) {
385-
if (!allowRedirects) {
386-
//this should never happen with a well-behaved server
387-
throw new IOException("server ignored 'insist'");
388-
} else {
389-
redirectAttempts.put(address, redirectCount+1);
390-
lastKnownAddresses = e.getKnownAddresses();
391-
address = e.getAddress();
392-
//TODO: we may want to log redirection attempts.
393-
}
394-
}
395-
}
376+
FrameHandler frameHandler = createFrameHandler(addr);
377+
AMQConnection conn = new AMQConnection(this,
378+
frameHandler);
379+
conn.start();
380+
return conn;
396381
} catch (IOException e) {
397382
lastException = e;
398-
if (lastKnownAddresses.length > 0) {
399-
// If there aren't any, don't bother trying, since
400-
// a recursive call with empty lastKnownAddresses
401-
// will cause our lastException to be stomped on
402-
// by an uninformative IOException. See bug 16273.
403-
try {
404-
return newConnection(lastKnownAddresses,
405-
maxRedirects,
406-
redirectAttempts);
407-
} catch (IOException e1) {
408-
lastException = e1;
409-
}
410-
}
411383
}
412384
}
413385

@@ -425,10 +397,7 @@ private Connection newConnection(Address[] addresses,
425397
*/
426398
public Connection newConnection() throws IOException {
427399
return newConnection(new Address[] {
428-
new Address(getHost(), getPort())
429-
},
430-
0,
431-
new HashMap<Address,Integer>());
400+
new Address(getHost(), getPort())});
432401
}
433402

434403

src/com/rabbitmq/client/RedirectException.java

Lines changed: 0 additions & 73 deletions
This file was deleted.

src/com/rabbitmq/client/impl/AMQConnection.java

Lines changed: 5 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,6 @@
4646
import com.rabbitmq.client.Connection;
4747
import com.rabbitmq.client.ConnectionFactory;
4848
import com.rabbitmq.client.MissedHeartbeatException;
49-
import com.rabbitmq.client.RedirectException;
5049
import com.rabbitmq.client.ShutdownSignalException;
5150
import com.rabbitmq.utility.BlockingCell;
5251
import com.rabbitmq.utility.Utility;
@@ -155,9 +154,6 @@ public void ensureIsOpen()
155154
*/
156155
private int _heartbeat;
157156

158-
/** Hosts retrieved from the connection.open-ok */
159-
private Address[] _knownHosts;
160-
161157
private final String _username, _password, _virtualHost;
162158
private final int _requestedChannelMax, _requestedFrameMax, _requestedHeartbeat;
163159
private final Map<String, Object> _clientProperties;
@@ -175,11 +171,6 @@ public int getPort() {
175171
return _frameHandler.getPort();
176172
}
177173

178-
/** {@inheritDoc} */
179-
public Address[] getKnownHosts() {
180-
return _knownHosts;
181-
}
182-
183174
public FrameHandler getFrameHandler(){
184175
return _frameHandler;
185176
}
@@ -236,12 +227,10 @@ public AMQConnection(ConnectionFactory factory,
236227
* Connection.Start/.StartOk, Connection.Tune/.TuneOk, and then
237228
* calls Connection.Open and waits for the OpenOk. Sets heartbeat
238229
* and frame max values after tuning has taken place.
239-
* @param insist true if broker redirects are disallowed
240-
* @throws RedirectException if the server is redirecting us to a different host/port
241230
* @throws java.io.IOException if an error is encountered
242231
*/
243-
public void start(boolean insist)
244-
throws IOException, RedirectException
232+
public void start()
233+
throws IOException
245234
{
246235
// Make sure that the first thing we do is to send the header,
247236
// which should cause any socket errors to show up for us, rather
@@ -316,19 +305,11 @@ public void start(boolean insist)
316305
_channel0.transmit(new AMQImpl.Connection.TuneOk(channelMax,
317306
frameMax,
318307
heartbeat));
319-
308+
// 0.9.1: insist [on not being redirected] is deprecated, but
309+
// still in generated code; just pass a dummy value here
320310
Method res = _channel0.exnWrappingRpc(new AMQImpl.Connection.Open(_virtualHost,
321311
"",
322-
insist)).getMethod();
323-
if (res instanceof AMQP.Connection.Redirect) {
324-
AMQP.Connection.Redirect redirect = (AMQP.Connection.Redirect) res;
325-
throw new RedirectException(Address.parseAddress(redirect.getHost()),
326-
Address.parseAddresses(redirect.getKnownHosts()));
327-
} else {
328-
AMQP.Connection.OpenOk openOk = (AMQP.Connection.OpenOk) res;
329-
_knownHosts = Address.parseAddresses(openOk.getKnownHosts());
330-
}
331-
312+
false)).getMethod();
332313
return;
333314
}
334315

src/com/rabbitmq/client/impl/ChannelN.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ public class ChannelN extends AMQChannel implements com.rabbitmq.client.Channel
7676
* and this field can be deleted.
7777
*/
7878
@Deprecated
79-
private static final int TICKET = 1;
79+
private static final int TICKET = 0;
8080

8181
/**
8282
* Map from consumer tag to {@link Consumer} instance.
@@ -672,12 +672,12 @@ public String basicConsume(String queue, boolean noAck, String consumerTag,
672672
Consumer callback)
673673
throws IOException
674674
{
675-
return basicConsume(queue, noAck, consumerTag, false, false, callback);
675+
return basicConsume(queue, noAck, consumerTag, false, false, null, callback);
676676
}
677677

678678
/** Public API - {@inheritDoc} */
679679
public String basicConsume(String queue, boolean noAck, String consumerTag,
680-
boolean noLocal, boolean exclusive,
680+
boolean noLocal, boolean exclusive, Map<String, Object> filter,
681681
final Consumer callback)
682682
throws IOException
683683
{
@@ -702,7 +702,7 @@ public String transformReply(AMQCommand replyCommand) {
702702

703703
rpc(new Basic.Consume(TICKET, queue, consumerTag,
704704
noLocal, noAck, exclusive,
705-
false),
705+
false, filter),
706706
k);
707707

708708
try {

src/com/rabbitmq/client/impl/Frame.java

Lines changed: 26 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -173,12 +173,32 @@ public static void protocolVersionMismatch(DataInputStream is) throws IOExceptio
173173
}
174174

175175
try {
176-
int transportHigh = is.readUnsignedByte();
177-
int transportLow = is.readUnsignedByte();
178-
int serverMajor = is.readUnsignedByte();
179-
int serverMinor = is.readUnsignedByte();
180-
x = new MalformedFrameException("AMQP protocol version mismatch; we are version " + AMQP.PROTOCOL.MAJOR + "-" + AMQP.PROTOCOL.MINOR
181-
+ ", server is " + serverMajor + "-" + serverMinor + " with transport " + transportHigh + "." + transportLow);
176+
int[] signature = new int[4];
177+
178+
for (int i = 0; i < 4; i++) {
179+
signature[i] = is.readUnsignedByte();
180+
}
181+
182+
if (signature[0] == 1 &&
183+
signature[1] == 1 &&
184+
signature[2] == 8 &&
185+
signature[3] == 0) {
186+
x = new MalformedFrameException("AMQP protocol version mismatch; we are version " +
187+
AMQP.PROTOCOL.MAJOR + "-" + AMQP.PROTOCOL.MINOR + "-" + AMQP.PROTOCOL.REVISION +
188+
", server is 0-8");
189+
}
190+
else {
191+
String sig = "";
192+
for (int i = 0; i < 4; i++) {
193+
if (i != 0) sig += ",";
194+
sig += signature[i];
195+
}
196+
197+
x = new MalformedFrameException("AMQP protocol version mismatch; we are version " +
198+
AMQP.PROTOCOL.MAJOR + "-" + AMQP.PROTOCOL.MINOR + "-" + AMQP.PROTOCOL.REVISION +
199+
", server sent signature " + sig);
200+
}
201+
182202
} catch (IOException ex) {
183203
x = new MalformedFrameException("Invalid AMQP protocol header from server");
184204
}

0 commit comments

Comments
 (0)