4646import static com .rabbitmq .stream .impl .Utils .extractResponseCode ;
4747import static com .rabbitmq .stream .impl .Utils .formatConstant ;
4848import static java .lang .String .format ;
49+ import static java .lang .String .join ;
4950import static java .util .concurrent .TimeUnit .SECONDS ;
5051
5152import com .rabbitmq .stream .AuthenticationFailureException ;
@@ -421,10 +422,11 @@ private Map<String, String> peerProperties() {
421422 throw new StreamException ("Error when establishing stream connection" , request .error ());
422423 }
423424 } catch (StreamException e ) {
425+ outstandingRequests .remove (correlationId );
424426 throw e ;
425427 } catch (RuntimeException e ) {
426428 outstandingRequests .remove (correlationId );
427- throw new StreamException (e );
429+ throw new StreamException ("Error while trying to exchange peer properties" , e );
428430 }
429431 }
430432
@@ -484,9 +486,12 @@ private SaslAuthenticateResponse sendSaslAuthenticate(
484486 channel .writeAndFlush (bb );
485487 request .block ();
486488 return request .response .get ();
489+ } catch (StreamException e ) {
490+ outstandingRequests .remove (correlationId );
491+ throw e ;
487492 } catch (RuntimeException e ) {
488493 outstandingRequests .remove (correlationId );
489- throw new StreamException (e );
494+ throw new StreamException ("Error while trying to authenticate" , e );
490495 }
491496 }
492497
@@ -516,7 +521,7 @@ private Map<String, String> open(String virtualHost) {
516521 throw e ;
517522 } catch (RuntimeException e ) {
518523 outstandingRequests .remove (correlationId );
519- throw new StreamException (e );
524+ throw new StreamException ("Error during open command" , e );
520525 }
521526 }
522527
@@ -527,7 +532,7 @@ void send(byte[] content) {
527532 try {
528533 channel .writeAndFlush (bb ).sync ();
529534 } catch (InterruptedException e ) {
530- throw new StreamException (e );
535+ throw new StreamException ("Error while sending bytes" , e );
531536 }
532537 }
533538
@@ -555,9 +560,12 @@ private void sendClose(short code, String reason) {
555560 "Unexpected response code when closing: "
556561 + formatConstant (request .response .get ().getResponseCode ()));
557562 }
563+ } catch (StreamException e ) {
564+ outstandingRequests .remove (correlationId );
565+ throw e ;
558566 } catch (RuntimeException e ) {
559567 outstandingRequests .remove (correlationId );
560- throw new StreamException (e );
568+ throw new StreamException ("Error while closing connection" , e );
561569 }
562570 }
563571
@@ -575,9 +583,12 @@ private List<String> getSaslMechanisms() {
575583 channel .writeAndFlush (bb );
576584 request .block ();
577585 return request .response .get ();
586+ } catch (StreamException e ) {
587+ outstandingRequests .remove (correlationId );
588+ throw e ;
578589 } catch (RuntimeException e ) {
579590 outstandingRequests .remove (correlationId );
580- throw new StreamException (e );
591+ throw new StreamException ("Error while exchanging SASL mechanisms" , e );
581592 }
582593 }
583594
@@ -611,9 +622,12 @@ public Response create(String stream, Map<String, String> arguments) {
611622 channel .writeAndFlush (bb );
612623 request .block ();
613624 return request .response .get ();
625+ } catch (StreamException e ) {
626+ outstandingRequests .remove (correlationId );
627+ throw e ;
614628 } catch (RuntimeException e ) {
615629 outstandingRequests .remove (correlationId );
616- throw new StreamException (e );
630+ throw new StreamException (format ( "Error while creating stream '%s'" , stream ), e );
617631 }
618632 }
619633
@@ -656,9 +670,12 @@ public Response delete(String stream) {
656670 channel .writeAndFlush (bb );
657671 request .block ();
658672 return request .response .get ();
673+ } catch (StreamException e ) {
674+ outstandingRequests .remove (correlationId );
675+ throw e ;
659676 } catch (RuntimeException e ) {
660677 outstandingRequests .remove (correlationId );
661- throw new StreamException (e );
678+ throw new StreamException (format ( "Error while deleting stream '%s'" , stream ), e );
662679 }
663680 }
664681
@@ -688,9 +705,13 @@ public Map<String, StreamMetadata> metadata(String... streams) {
688705 channel .writeAndFlush (bb );
689706 request .block ();
690707 return request .response .get ();
708+ } catch (StreamException e ) {
709+ outstandingRequests .remove (correlationId );
710+ throw e ;
691711 } catch (RuntimeException e ) {
692712 outstandingRequests .remove (correlationId );
693- throw new StreamException (e );
713+ throw new StreamException (
714+ format ("Error while getting metadata for stream(s) '%s'" , join ("," , streams )), e );
694715 }
695716 }
696717
@@ -728,7 +749,8 @@ public Response declarePublisher(byte publisherId, String publisherReference, St
728749 throw e ;
729750 } catch (RuntimeException e ) {
730751 outstandingRequests .remove (correlationId );
731- throw new StreamException (e );
752+ throw new StreamException (
753+ format ("Error while declaring publisher for stream '%s'" , stream ), e );
732754 }
733755 }
734756
@@ -752,7 +774,7 @@ public Response deletePublisher(byte publisherId) {
752774 throw e ;
753775 } catch (RuntimeException e ) {
754776 outstandingRequests .remove (correlationId );
755- throw new StreamException (e );
777+ throw new StreamException ("Error while deleting publisher" , e );
756778 }
757779 }
758780
@@ -1090,7 +1112,8 @@ public Response subscribe(
10901112 throw e ;
10911113 } catch (RuntimeException e ) {
10921114 outstandingRequests .remove (correlationId );
1093- throw new StreamException (e );
1115+ throw new StreamException (
1116+ format ("Error while trying to subscribe to stream '%s'" , stream ), e );
10941117 }
10951118 }
10961119
@@ -1147,7 +1170,10 @@ public QueryOffsetResponse queryOffset(String reference, String stream) {
11471170 throw e ;
11481171 } catch (RuntimeException e ) {
11491172 outstandingRequests .remove (correlationId );
1150- throw new StreamException (e );
1173+ throw new StreamException (
1174+ format (
1175+ "Error while querying offset for reference '%s' on stream '%s'" , reference , stream ),
1176+ e );
11511177 }
11521178 }
11531179
@@ -1185,9 +1211,16 @@ public long queryPublisherSequence(String publisherReference, String stream) {
11851211 formatConstant (response .getResponseCode ()));
11861212 }
11871213 return response .getSequence ();
1214+ } catch (StreamException e ) {
1215+ outstandingRequests .remove (correlationId );
1216+ throw e ;
11881217 } catch (RuntimeException e ) {
11891218 outstandingRequests .remove (correlationId );
1190- throw new StreamException (e );
1219+ throw new StreamException (
1220+ format (
1221+ "Error while querying publisher sequence for '%s' on stream '%s'" ,
1222+ publisherReference , stream ),
1223+ e );
11911224 }
11921225 }
11931226
@@ -1206,9 +1239,12 @@ public Response unsubscribe(byte subscriptionId) {
12061239 channel .writeAndFlush (bb );
12071240 request .block ();
12081241 return request .response .get ();
1242+ } catch (StreamException e ) {
1243+ outstandingRequests .remove (correlationId );
1244+ throw e ;
12091245 } catch (RuntimeException e ) {
12101246 outstandingRequests .remove (correlationId );
1211- throw new StreamException (e );
1247+ throw new StreamException ("Error while unsubscribing" , e );
12121248 }
12131249 }
12141250
@@ -1330,9 +1366,16 @@ public List<String> route(String routingKey, String superStream) {
13301366 channel .writeAndFlush (bb );
13311367 request .block ();
13321368 return request .response .get ();
1369+ } catch (StreamException e ) {
1370+ outstandingRequests .remove (correlationId );
1371+ throw e ;
13331372 } catch (RuntimeException e ) {
13341373 outstandingRequests .remove (correlationId );
1335- throw new StreamException (e );
1374+ throw new StreamException (
1375+ format (
1376+ "Error while querying route for routing key '%s' on super stream '%s'" ,
1377+ routingKey , superStream ),
1378+ e );
13361379 }
13371380 }
13381381
@@ -1356,9 +1399,13 @@ public List<String> partitions(String superStream) {
13561399 channel .writeAndFlush (bb );
13571400 request .block ();
13581401 return request .response .get ();
1402+ } catch (StreamException e ) {
1403+ outstandingRequests .remove (correlationId );
1404+ throw e ;
13591405 } catch (RuntimeException e ) {
13601406 outstandingRequests .remove (correlationId );
1361- throw new StreamException (e );
1407+ throw new StreamException (
1408+ format ("Error while querying partitions for super stream '%s'" , superStream ), e );
13621409 }
13631410 }
13641411
@@ -1384,9 +1431,12 @@ List<FrameHandlerInfo> exchangeCommandVersions() {
13841431 channel .writeAndFlush (bb );
13851432 request .block ();
13861433 return request .response .get ();
1434+ } catch (StreamException e ) {
1435+ outstandingRequests .remove (correlationId );
1436+ throw e ;
13871437 } catch (RuntimeException e ) {
13881438 outstandingRequests .remove (correlationId );
1389- throw new StreamException (e );
1439+ throw new StreamException ("Error while exchanging command version" , e );
13901440 }
13911441 }
13921442
@@ -1409,9 +1459,13 @@ StreamStatsResponse streamStats(String stream) {
14091459 channel .writeAndFlush (bb );
14101460 request .block ();
14111461 return request .response .get ();
1462+ } catch (StreamException e ) {
1463+ outstandingRequests .remove (correlationId );
1464+ throw e ;
14121465 } catch (RuntimeException e ) {
14131466 outstandingRequests .remove (correlationId );
1414- throw new StreamException (e );
1467+ throw new StreamException (
1468+ format ("Error while querying statistics for stream '%s'" , stream ), e );
14151469 }
14161470 }
14171471
0 commit comments