@@ -162,6 +162,12 @@ init([KeepaliveSup,
162162 DeliverVersion = ? VERSION_1 ,
163163 RequestTimeout = application :get_env (rabbitmq_stream ,
164164 request_timeout , 60_000 ),
165+ Sources = rabbit_alarm :register (self (),
166+ {? MODULE , resource_alarm , []}),
167+ Alarms = sets :from_list ([S || S <- Sources ,
168+ S =:= disk orelse
169+ S =:= ? STREAM_DISK_ALARM ],
170+ [{version , 2 }]),
165171 Connection =
166172 # stream_connection {name =
167173 rabbit_data_coercion :to_binary (ConnStr ),
@@ -181,7 +187,7 @@ init([KeepaliveSup,
181187 authentication_state = none ,
182188 connection_step = tcp_connected ,
183189 frame_max = FrameMax ,
184- resource_alarm = false ,
190+ resource_alarms = Alarms ,
185191 send_file_oct = SendFileOct ,
186192 transport = ConnTransport ,
187193 proxy_socket =
@@ -192,11 +198,10 @@ init([KeepaliveSup,
192198 deliver_version = DeliverVersion },
193199 State =
194200 # stream_connection_state {consumers = #{},
195- blocked = false ,
201+ blocked = not sets : is_empty ( Alarms ) ,
196202 data =
197203 rabbit_stream_core :init (undefined )},
198204 Transport :setopts (RealSocket , [{active , once }]),
199- _ = rabbit_alarm :register (self (), {? MODULE , resource_alarm , []}),
200205 ConnectionNegotiationStepTimeout =
201206 application :get_env (rabbitmq_stream ,
202207 connection_negotiation_step_timeout ,
@@ -423,6 +428,8 @@ handle_info(Msg,
423428 # statem_data {transport = Transport ,
424429 connection =
425430 # stream_connection {socket = S ,
431+ resource_alarms =
432+ ResourceAlarms ,
426433 connection_step =
427434 PreviousConnectionStep } =
428435 Connection ,
@@ -451,12 +458,18 @@ handle_info(Msg,
451458 {Error , S , Reason } ->
452459 ? LOG_WARNING (" Socket error ~tp [~w ]" , [Reason , S ]),
453460 stop ;
454- {resource_alarm , IsThereAlarm } ->
461+ {resource_alarm , Source , Conserve } ->
462+ ResourceAlarms1 = case Conserve of
463+ true ->
464+ sets :add_element (Source , ResourceAlarms );
465+ false ->
466+ sets :del_element (Source , ResourceAlarms )
467+ end ,
455468 {keep_state ,
456469 StatemData # statem_data {connection =
457- Connection # stream_connection {resource_alarm
470+ Connection # stream_connection {resource_alarms
458471 =
459- IsThereAlarm },
472+ ResourceAlarms1 },
460473 connection_state =
461474 State # stream_connection_state {blocked =
462475 true }}};
@@ -502,8 +515,9 @@ invalid_transition(Transport, Socket, From, To) ->
502515-spec resource_alarm (pid (),
503516 rabbit_alarm :resource_alarm_source (),
504517 rabbit_alarm :resource_alert ()) -> ok .
505- resource_alarm (ConnectionPid , disk , {_ , Conserve , _ }) ->
506- ConnectionPid ! {resource_alarm , Conserve },
518+ resource_alarm (ConnectionPid , Source , {_ , Conserve , _ })
519+ when Source =:= disk orelse Source =:= ? STREAM_DISK_ALARM ->
520+ ConnectionPid ! {resource_alarm , Source , Conserve },
507521 ok ;
508522resource_alarm (_ConnectionPid , _Resource , _Alert ) ->
509523 ok .
@@ -525,17 +539,12 @@ should_unblock(#stream_connection{publishers = Publishers}, _)
525539 % % always unblock a connection without publishers
526540 true ;
527541should_unblock (# stream_connection {credits = Credits ,
528- resource_alarm = ResourceAlarm },
542+ resource_alarms = ResourceAlarms },
529543 # configuration {credits_required_for_unblocking =
530544 CreditsRequiredForUnblocking }) ->
531- case {ResourceAlarm ,
532- has_enough_credits_to_unblock (Credits , CreditsRequiredForUnblocking )}
533- of
534- {true , _ } ->
535- false ;
536- {false , EnoughCreditsToUnblock } ->
537- EnoughCreditsToUnblock
538- end .
545+ sets :is_empty (ResourceAlarms ) andalso
546+ has_enough_credits_to_unblock (Credits ,
547+ CreditsRequiredForUnblocking ).
539548
540549init_credit (CreditReference , Credits ) ->
541550 atomics :put (CreditReference , 1 , Credits ).
@@ -624,12 +633,13 @@ close_immediately(Transport, S) ->
624633
625634open (enter , _OldState , _StateData ) ->
626635 keep_state_and_data ;
627- open (info , {resource_alarm , IsThereAlarm },
636+ open (info , {resource_alarm , Source , Conserve },
628637 # statem_data {transport = Transport ,
629638 connection =
630639 # stream_connection {socket = S ,
631640 name = ConnectionName ,
632641 credits = Credits ,
642+ resource_alarms = ResourceAlarms ,
633643 heartbeater = Heartbeater } =
634644 Connection ,
635645 connection_state =
@@ -638,6 +648,13 @@ open(info, {resource_alarm, IsThereAlarm},
638648 # configuration {credits_required_for_unblocking =
639649 CreditsRequiredForUnblocking }} =
640650 StatemData ) ->
651+ ResourceAlarms1 = case Conserve of
652+ true ->
653+ sets :add_element (Source , ResourceAlarms );
654+ false ->
655+ sets :del_element (Source , ResourceAlarms )
656+ end ,
657+ IsThereAlarm = not sets :is_empty (ResourceAlarms1 ),
641658 ? LOG_DEBUG (" Connection ~tp received resource alarm. Alarm "
642659 " on? ~tp " ,
643660 [ConnectionName , IsThereAlarm ]),
@@ -668,8 +685,8 @@ open(info, {resource_alarm, IsThereAlarm},
668685 end ,
669686 {keep_state ,
670687 StatemData # statem_data {connection =
671- Connection # stream_connection {resource_alarm =
672- IsThereAlarm },
688+ Connection # stream_connection {resource_alarms =
689+ ResourceAlarms1 },
673690 connection_state =
674691 State # stream_connection_state {blocked =
675692 NewBlockedState }}};
@@ -1190,15 +1207,23 @@ close_sent(info, {tcp_error, S, Reason}, #statem_data{}) ->
11901207 " [~w ] [~w ]" ,
11911208 [Reason , S , self ()]),
11921209 stop ;
1193- close_sent (info , {resource_alarm , IsThereAlarm },
1210+ close_sent (info , {resource_alarm , Source , Conserve },
11941211 StatemData = # statem_data {connection = Connection }) ->
1212+ ResourceAlarms = Connection # stream_connection .resource_alarms ,
1213+ ResourceAlarms1 = case Conserve of
1214+ true ->
1215+ sets :add_element (Source , ResourceAlarms );
1216+ false ->
1217+ sets :del_element (Source , ResourceAlarms )
1218+ end ,
1219+ IsThereAlarm = not sets :is_empty (ResourceAlarms1 ),
11951220 ? LOG_WARNING (" Stream protocol connection ignored a resource "
11961221 " alarm ~tp in state ~ts " ,
11971222 [IsThereAlarm , ? FUNCTION_NAME ]),
11981223 {keep_state ,
11991224 StatemData # statem_data {connection =
1200- Connection # stream_connection {resource_alarm =
1201- IsThereAlarm }}};
1225+ Connection # stream_connection {resource_alarms =
1226+ ResourceAlarms1 }}};
12021227close_sent (info , Msg , _StatemData ) ->
12031228 ? LOG_WARNING (" Ignored unknown message ~tp in state ~ts " ,
12041229 [Msg , ? FUNCTION_NAME ]),
@@ -1550,13 +1575,15 @@ notify_auth_result(Username,
15501575 [P || {_ , V } = P <- EventProps , V =/= '' ]).
15511576
15521577handle_frame_post_auth (Transport ,
1553- # stream_connection {resource_alarm = true } = Connection0 ,
1578+ # stream_connection {resource_alarms =
1579+ ResourceAlarms } = Connection0 ,
15541580 State ,
15551581 {request , CorrelationId ,
15561582 {declare_publisher ,
15571583 PublisherId ,
15581584 _WriterRef ,
1559- Stream }}) ->
1585+ Stream }})
1586+ when map_size (ResourceAlarms ) =/= 0 ->
15601587 ? LOG_INFO (" Cannot create publisher ~tp on stream ~tp , connection "
15611588 " is blocked because of resource alarm" ,
15621589 [PublisherId , Stream ]),
@@ -1574,10 +1601,12 @@ handle_frame_post_auth(Transport,
15741601 host = Host ,
15751602 auth_mechanism = Auth_Mechanism ,
15761603 authentication_state = AuthState ,
1577- resource_alarm = false } = C1 ,
1604+ resource_alarms = ResourceAlarms
1605+ } = C1 ,
15781606 S1 ,
15791607 {request , CorrelationId ,
1580- {sasl_authenticate , NewMechanism , NewSaslBin }}) ->
1608+ {sasl_authenticate , NewMechanism , NewSaslBin }})
1609+ when map_size (ResourceAlarms ) =:= 0 ->
15811610 ? LOG_DEBUG (" Received sasl_authenticate for username '~ts '" , [Username ]),
15821611
15831612 {Connection1 , State1 } =
@@ -1656,33 +1685,35 @@ handle_frame_post_auth(Transport,
16561685 {Connection1 , State1 };
16571686handle_frame_post_auth (Transport ,
16581687 # stream_connection {user = User ,
1659- resource_alarm = false } = C ,
1688+ resource_alarms = ResourceAlarms
1689+ } = C ,
16601690 State ,
16611691 {request , CorrelationId ,
16621692 {declare_publisher , _PublisherId , WriterRef , S }})
1663- when ? IS_INVALID_REF (WriterRef ) ->
1664- {Code , Counter } = case check_write_permitted (stream_r (S , C ), User ) of
1665- ok ->
1666- {? RESPONSE_CODE_PRECONDITION_FAILED , ? PRECONDITION_FAILED };
1667- error ->
1668- {? RESPONSE_CODE_ACCESS_REFUSED , ? ACCESS_REFUSED }
1669- end ,
1670- response (Transport ,
1671- C ,
1672- declare_publisher ,
1673- CorrelationId ,
1674- Code ),
1675- increase_protocol_counter (Counter ),
1676- {C , State };
1693+ when ? IS_INVALID_REF (WriterRef ) andalso map_size ( ResourceAlarms ) =:= 0 ->
1694+ {Code , Counter } = case check_write_permitted (stream_r (S , C ), User ) of
1695+ ok ->
1696+ {? RESPONSE_CODE_PRECONDITION_FAILED , ? PRECONDITION_FAILED };
1697+ error ->
1698+ {? RESPONSE_CODE_ACCESS_REFUSED , ? ACCESS_REFUSED }
1699+ end ,
1700+ response (Transport ,
1701+ C ,
1702+ declare_publisher ,
1703+ CorrelationId ,
1704+ Code ),
1705+ increase_protocol_counter (Counter ),
1706+ {C , State };
16771707handle_frame_post_auth (Transport ,
16781708 # stream_connection {user = User ,
16791709 publishers = Publishers0 ,
16801710 publisher_to_ids = RefIds0 ,
1681- resource_alarm = false } =
1711+ resource_alarms = ResourceAlarms } =
16821712 Connection0 ,
16831713 State ,
16841714 {request , CorrelationId ,
1685- {declare_publisher , PublisherId , WriterRef , Stream }}) ->
1715+ {declare_publisher , PublisherId , WriterRef , Stream }})
1716+ when map_size (ResourceAlarms ) =:= 0 ->
16861717 case check_write_permitted (stream_r (Stream ,
16871718 Connection0 ),
16881719 User )
0 commit comments