Skip to content

Commit 34eea92

Browse files
committed
Make route return multiple streams
1 parent ee040af commit 34eea92

File tree

7 files changed

+65
-38
lines changed

7 files changed

+65
-38
lines changed

src/main/java/com/rabbitmq/stream/impl/Client.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1240,7 +1240,7 @@ int getPort() {
12401240
return port;
12411241
}
12421242

1243-
public String route(String routingKey, String superStream) {
1243+
public List<String> route(String routingKey, String superStream) {
12441244
if (routingKey == null || superStream == null) {
12451245
throw new IllegalArgumentException("routing key and stream must not be null");
12461246
}
@@ -1263,7 +1263,7 @@ public String route(String routingKey, String superStream) {
12631263
bb.writeBytes(routingKey.getBytes(StandardCharsets.UTF_8));
12641264
bb.writeShort(superStream.length());
12651265
bb.writeBytes(superStream.getBytes(StandardCharsets.UTF_8));
1266-
OutstandingRequest<String> request = new OutstandingRequest<>(this.rpcTimeout);
1266+
OutstandingRequest<List<String>> request = new OutstandingRequest<>(this.rpcTimeout);
12671267
outstandingRequests.put(correlationId, request);
12681268
channel.writeAndFlush(bb);
12691269
request.block();

src/main/java/com/rabbitmq/stream/impl/HashRoutingStrategy.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,20 +14,20 @@
1414
package com.rabbitmq.stream.impl;
1515

1616
import com.rabbitmq.stream.Message;
17+
import java.util.Collections;
1718
import java.util.List;
1819
import java.util.concurrent.CopyOnWriteArrayList;
1920
import java.util.function.Function;
2021
import java.util.function.ToIntFunction;
22+
import java.util.stream.Collectors;
2123

2224
class HashRoutingStrategy implements RoutingStrategy {
2325

2426
private final Function<Message, String> routingKeyExtractor;
2527

2628
private final StreamEnvironment env;
2729

28-
private final String superStream;
29-
30-
private final List<String> partitions;
30+
private final List<List<String>> partitions;
3131

3232
private final ToIntFunction<String> hash;
3333

@@ -38,15 +38,16 @@ class HashRoutingStrategy implements RoutingStrategy {
3838
ToIntFunction<String> hash) {
3939
this.routingKeyExtractor = routingKeyExtractor;
4040
this.env = env;
41-
this.superStream = superStream;
4241
// TODO use async retry to get locator
4342
List<String> ps = this.env.locator().partitions(superStream);
44-
this.partitions = new CopyOnWriteArrayList<>(ps);
43+
this.partitions =
44+
new CopyOnWriteArrayList<>(
45+
ps.stream().map(Collections::singletonList).collect(Collectors.toList()));
4546
this.hash = hash;
4647
}
4748

4849
@Override
49-
public String route(Message message) {
50+
public List<String> route(Message message) {
5051
String routingKey = routingKeyExtractor.apply(message);
5152
int hashValue = hash.applyAsInt(routingKey);
5253
return this.partitions.get((hashValue & 0x7FFFFFFF) % this.partitions.size());

src/main/java/com/rabbitmq/stream/impl/RoutingKeyRoutingStrategy.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
package com.rabbitmq.stream.impl;
1515

1616
import com.rabbitmq.stream.Message;
17+
import java.util.List;
1718
import java.util.Map;
1819
import java.util.concurrent.ConcurrentHashMap;
1920
import java.util.function.Function;
@@ -22,7 +23,7 @@ class RoutingKeyRoutingStrategy implements RoutingStrategy {
2223

2324
private final Function<Message, String> routingKeyExtractor;
2425

25-
private final Map<String, String> routingKeysToStreams = new ConcurrentHashMap<>();
26+
private final Map<String, List<String>> routingKeysToStreams = new ConcurrentHashMap<>();
2627

2728
private final StreamEnvironment env;
2829

@@ -36,15 +37,15 @@ class RoutingKeyRoutingStrategy implements RoutingStrategy {
3637
}
3738

3839
@Override
39-
public String route(Message message) {
40+
public List<String> route(Message message) {
4041
String routingKey = this.routingKeyExtractor.apply(message);
41-
String stream =
42+
List<String> streams =
4243
routingKeysToStreams.computeIfAbsent(
4344
routingKey,
4445
routingKey1 -> {
4546
// TODO retry on locator lookup
4647
return env.locator().route(routingKey1, superStream);
4748
});
48-
return stream;
49+
return streams;
4950
}
5051
}

src/main/java/com/rabbitmq/stream/impl/RoutingStrategy.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,9 @@
1414
package com.rabbitmq.stream.impl;
1515

1616
import com.rabbitmq.stream.Message;
17+
import java.util.List;
1718

18-
public interface RoutingStrategy {
19+
interface RoutingStrategy {
1920

20-
String route(Message message);
21+
List<String> route(Message message);
2122
}

src/main/java/com/rabbitmq/stream/impl/ServerFrameHandler.java

Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -879,28 +879,33 @@ int doHandle(Client client, ChannelHandlerContext ctx, ByteBuf message) {
879879
int read = 4;
880880
short responseCode = message.readShort();
881881
read += 2;
882-
short streamSize = message.readShort();
883-
read += 2;
884-
String stream;
885-
if (streamSize == -1) {
886-
stream = null;
882+
int streamCount = message.readInt();
883+
read += 4;
884+
List<String> streams;
885+
if (streamCount == 0) {
886+
streams = Collections.emptyList();
887887
} else {
888-
byte[] bytes = new byte[streamSize];
889-
message.readBytes(bytes);
890-
stream = new String(bytes, StandardCharsets.UTF_8);
891-
read += stream.length();
888+
streams = new ArrayList<>(streamCount);
889+
for (int i = 0; i < streamCount; i++) {
890+
String stream = readString(message);
891+
read += (2 + stream.length());
892+
streams.add(stream);
893+
}
892894
}
893895

894896
if (responseCode != RESPONSE_CODE_OK) {
895897
LOGGER.info("Route returned error: {}", Utils.formatConstant(responseCode));
896898
}
897899

898-
OutstandingRequest<String> outstandingRequest =
899-
remove(client.outstandingRequests, correlationId, String.class);
900+
OutstandingRequest<List<String>> outstandingRequest =
901+
remove(
902+
client.outstandingRequests,
903+
correlationId,
904+
new ParameterizedTypeReference<List<String>>() {});
900905
if (outstandingRequest == null) {
901906
LOGGER.warn("Could not find outstanding request with correlation ID {}", correlationId);
902907
} else {
903-
outstandingRequest.response().set(stream);
908+
outstandingRequest.response().set(streams);
904909
outstandingRequest.countDown();
905910
}
906911
return read;

src/main/java/com/rabbitmq/stream/impl/SuperStreamProducer.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -85,11 +85,13 @@ public long getLastPublishingId() {
8585
public void send(Message message, ConfirmationHandler confirmationHandler) {
8686
// TODO handle when the stream is not found (no partition found for the message)
8787
// and call the confirmation handler with a failure
88-
String stream = this.routingStrategy.route(message);
89-
Producer producer =
90-
producers.computeIfAbsent(
91-
stream, stream1 -> producerBuilder.duplicate().stream(stream1).build());
92-
producer.send(message, confirmationHandler);
88+
List<String> streams = this.routingStrategy.route(message);
89+
for (String stream : streams) {
90+
Producer producer =
91+
producers.computeIfAbsent(
92+
stream, stream1 -> producerBuilder.duplicate().stream(stream1).build());
93+
producer.send(message, confirmationHandler);
94+
}
9395
}
9496

9597
@Override

src/test/java/com/rabbitmq/stream/impl/RoutePartitionsTest.java

Lines changed: 24 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -52,8 +52,8 @@ void tearDown() throws Exception {
5252
}
5353

5454
@Test
55-
void routeShouldReturnNullWhenExchangeDoesNotExist() {
56-
assertThat(cf.get().route("", UUID.randomUUID().toString())).isNull();
55+
void routeShouldReturnEmptyListWhenExchangeDoesNotExist() {
56+
assertThat(cf.get().route("", UUID.randomUUID().toString())).isEmpty();
5757
}
5858

5959
@Test
@@ -66,8 +66,8 @@ void routeShouldReturnNullWhenNoStreamForRoutingKey() throws Exception {
6666
declareSuperStreamTopology(connection, superStream, partitions);
6767

6868
Client client = cf.get();
69-
assertThat(client.route("0", superStream)).isEqualTo(superStream + "-0");
70-
assertThat(client.route("42", superStream)).isNull();
69+
assertThat(client.route("0", superStream)).hasSize(1).contains(superStream + "-0");
70+
assertThat(client.route("42", superStream)).isEmpty();
7171
}
7272

7373
@Test
@@ -88,8 +88,25 @@ void routeTopologyWithPartitionCount() throws Exception {
8888
.hasSize(partitions)
8989
.containsExactlyInAnyOrderElementsOf(
9090
IntStream.range(0, partitions).mapToObj(i -> superStream + "-" + i).collect(toList()));
91-
assertThat(client.route("0", superStream)).isEqualTo(superStream + "-0");
92-
assertThat(client.route("1", superStream)).isEqualTo(superStream + "-1");
93-
assertThat(client.route("2", superStream)).isEqualTo(superStream + "-2");
91+
assertThat(client.route("0", superStream)).hasSize(1).contains(superStream + "-0");
92+
assertThat(client.route("1", superStream)).hasSize(1).contains(superStream + "-1");
93+
assertThat(client.route("2", superStream)).hasSize(1).contains(superStream + "-2");
94+
}
95+
96+
@Test
97+
void routeReturnsMultipleStreamsIfMultipleBindingsForSameKey() throws Exception {
98+
declareSuperStreamTopology(connection, superStream, 3);
99+
connection.createChannel().queueBind(superStream + "-1", superStream, "0");
100+
Client client = cf.get();
101+
List<String> streams = client.partitions(superStream);
102+
assertThat(streams)
103+
.hasSize(partitions + 1)
104+
.contains(
105+
IntStream.range(0, partitions)
106+
.mapToObj(i -> superStream + "-" + i)
107+
.toArray(String[]::new));
108+
assertThat(client.route("0", superStream))
109+
.hasSize(2)
110+
.contains(superStream + "-0", superStream + "-1");
94111
}
95112
}

0 commit comments

Comments
 (0)