|
17 | 17 | import static com.rabbitmq.stream.impl.Utils.exceptionMessage; |
18 | 18 | import static com.rabbitmq.stream.impl.Utils.formatConstant; |
19 | 19 | import static com.rabbitmq.stream.impl.Utils.namedRunnable; |
| 20 | +import static java.lang.String.format; |
20 | 21 | import static java.util.concurrent.TimeUnit.SECONDS; |
21 | 22 |
|
22 | | -import com.rabbitmq.stream.Address; |
23 | | -import com.rabbitmq.stream.AddressResolver; |
24 | | -import com.rabbitmq.stream.BackOffDelayPolicy; |
25 | | -import com.rabbitmq.stream.Codec; |
26 | | -import com.rabbitmq.stream.ConsumerBuilder; |
27 | | -import com.rabbitmq.stream.Environment; |
28 | | -import com.rabbitmq.stream.MessageHandler; |
| 23 | +import com.rabbitmq.stream.*; |
29 | 24 | import com.rabbitmq.stream.MessageHandler.Context; |
30 | | -import com.rabbitmq.stream.NoOffsetException; |
31 | | -import com.rabbitmq.stream.OffsetSpecification; |
32 | | -import com.rabbitmq.stream.ProducerBuilder; |
33 | | -import com.rabbitmq.stream.StreamCreator; |
34 | | -import com.rabbitmq.stream.StreamException; |
35 | | -import com.rabbitmq.stream.StreamStats; |
36 | | -import com.rabbitmq.stream.SubscriptionListener; |
37 | 25 | import com.rabbitmq.stream.compression.CompressionCodecFactory; |
38 | 26 | import com.rabbitmq.stream.impl.Client.ClientParameters; |
39 | 27 | import com.rabbitmq.stream.impl.Client.ShutdownListener; |
@@ -515,12 +503,35 @@ public StreamStats queryStreamStats(String stream) { |
515 | 503 | response.getResponseCode(), |
516 | 504 | stream, |
517 | 505 | () -> |
518 | | - "Error while querying stream info: " |
| 506 | + "Error while querying stream stats: " |
519 | 507 | + formatConstant(response.getResponseCode()) |
520 | 508 | + "."); |
521 | 509 | } |
522 | 510 | } |
523 | 511 |
|
| 512 | + @Override |
| 513 | + public boolean streamExists(String stream) { |
| 514 | + checkNotClosed(); |
| 515 | + this.maybeInitializeLocator(); |
| 516 | + StreamStatsResponse response = |
| 517 | + locatorOperation( |
| 518 | + Utils.namedFunction( |
| 519 | + client -> client.streamStats(stream), "Query stream stats on stream '%s'", stream)); |
| 520 | + if (response.isOk()) { |
| 521 | + return true; |
| 522 | + } else if (response.getResponseCode() == Constants.RESPONSE_CODE_STREAM_DOES_NOT_EXIST) { |
| 523 | + return false; |
| 524 | + } else { |
| 525 | + throw convertCodeToException( |
| 526 | + response.getResponseCode(), |
| 527 | + stream, |
| 528 | + () -> |
| 529 | + format( |
| 530 | + "Unexpected result when checking if stream '%s' exists: %s.", |
| 531 | + stream, formatConstant(response.getResponseCode()))); |
| 532 | + } |
| 533 | + } |
| 534 | + |
524 | 535 | private static class DefaultStreamStats implements StreamStats { |
525 | 536 |
|
526 | 537 | private final LongSupplier firstOffsetSupplier, committedOffsetSupplier; |
|
0 commit comments