Skip to content

Commit ded3ba9

Browse files
committed
rabbit_stream_reader: Block during stream queue-type disk alarm
1 parent 4340016 commit ded3ba9

File tree

2 files changed

+79
-45
lines changed

2 files changed

+79
-45
lines changed

deps/rabbitmq_stream/src/rabbit_stream_reader.erl

Lines changed: 75 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,12 @@ init([KeepaliveSup,
162162
DeliverVersion = ?VERSION_1,
163163
RequestTimeout = application:get_env(rabbitmq_stream,
164164
request_timeout, 60_000),
165+
Sources = rabbit_alarm:register(self(),
166+
{?MODULE, resource_alarm, []}),
167+
Alarms = sets:from_list([S || S <- Sources,
168+
S =:= disk orelse
169+
S =:= ?STREAM_DISK_ALARM],
170+
[{version, 2}]),
165171
Connection =
166172
#stream_connection{name =
167173
rabbit_data_coercion:to_binary(ConnStr),
@@ -181,7 +187,7 @@ init([KeepaliveSup,
181187
authentication_state = none,
182188
connection_step = tcp_connected,
183189
frame_max = FrameMax,
184-
resource_alarm = false,
190+
resource_alarms = Alarms,
185191
send_file_oct = SendFileOct,
186192
transport = ConnTransport,
187193
proxy_socket =
@@ -192,11 +198,10 @@ init([KeepaliveSup,
192198
deliver_version = DeliverVersion},
193199
State =
194200
#stream_connection_state{consumers = #{},
195-
blocked = false,
201+
blocked = not sets:is_empty(Alarms),
196202
data =
197203
rabbit_stream_core:init(undefined)},
198204
Transport:setopts(RealSocket, [{active, once}]),
199-
_ = rabbit_alarm:register(self(), {?MODULE, resource_alarm, []}),
200205
ConnectionNegotiationStepTimeout =
201206
application:get_env(rabbitmq_stream,
202207
connection_negotiation_step_timeout,
@@ -423,6 +428,8 @@ handle_info(Msg,
423428
#statem_data{transport = Transport,
424429
connection =
425430
#stream_connection{socket = S,
431+
resource_alarms =
432+
ResourceAlarms,
426433
connection_step =
427434
PreviousConnectionStep} =
428435
Connection,
@@ -451,12 +458,18 @@ handle_info(Msg,
451458
{Error, S, Reason} ->
452459
?LOG_WARNING("Socket error ~tp [~w]", [Reason, S]),
453460
stop;
454-
{resource_alarm, IsThereAlarm} ->
461+
{resource_alarm, Source, Conserve} ->
462+
ResourceAlarms1 = case Conserve of
463+
true ->
464+
sets:add_element(Source, ResourceAlarms);
465+
false ->
466+
sets:del_element(Source, ResourceAlarms)
467+
end,
455468
{keep_state,
456469
StatemData#statem_data{connection =
457-
Connection#stream_connection{resource_alarm
470+
Connection#stream_connection{resource_alarms
458471
=
459-
IsThereAlarm},
472+
ResourceAlarms1},
460473
connection_state =
461474
State#stream_connection_state{blocked =
462475
true}}};
@@ -502,8 +515,9 @@ invalid_transition(Transport, Socket, From, To) ->
502515
-spec resource_alarm(pid(),
503516
rabbit_alarm:resource_alarm_source(),
504517
rabbit_alarm:resource_alert()) -> ok.
505-
resource_alarm(ConnectionPid, disk, {_, Conserve, _}) ->
506-
ConnectionPid ! {resource_alarm, Conserve},
518+
resource_alarm(ConnectionPid, Source, {_, Conserve, _})
519+
when Source =:= disk orelse Source =:= ?STREAM_DISK_ALARM ->
520+
ConnectionPid ! {resource_alarm, Source, Conserve},
507521
ok;
508522
resource_alarm(_ConnectionPid, _Resource, _Alert) ->
509523
ok.
@@ -525,17 +539,12 @@ should_unblock(#stream_connection{publishers = Publishers}, _)
525539
%% always unblock a connection without publishers
526540
true;
527541
should_unblock(#stream_connection{credits = Credits,
528-
resource_alarm = ResourceAlarm},
542+
resource_alarms = ResourceAlarms},
529543
#configuration{credits_required_for_unblocking =
530544
CreditsRequiredForUnblocking}) ->
531-
case {ResourceAlarm,
532-
has_enough_credits_to_unblock(Credits, CreditsRequiredForUnblocking)}
533-
of
534-
{true, _} ->
535-
false;
536-
{false, EnoughCreditsToUnblock} ->
537-
EnoughCreditsToUnblock
538-
end.
545+
sets:is_empty(ResourceAlarms) andalso
546+
has_enough_credits_to_unblock(Credits,
547+
CreditsRequiredForUnblocking).
539548

540549
init_credit(CreditReference, Credits) ->
541550
atomics:put(CreditReference, 1, Credits).
@@ -624,12 +633,13 @@ close_immediately(Transport, S) ->
624633

625634
open(enter, _OldState, _StateData) ->
626635
keep_state_and_data;
627-
open(info, {resource_alarm, IsThereAlarm},
636+
open(info, {resource_alarm, Source, Conserve},
628637
#statem_data{transport = Transport,
629638
connection =
630639
#stream_connection{socket = S,
631640
name = ConnectionName,
632641
credits = Credits,
642+
resource_alarms = ResourceAlarms,
633643
heartbeater = Heartbeater} =
634644
Connection,
635645
connection_state =
@@ -638,6 +648,13 @@ open(info, {resource_alarm, IsThereAlarm},
638648
#configuration{credits_required_for_unblocking =
639649
CreditsRequiredForUnblocking}} =
640650
StatemData) ->
651+
ResourceAlarms1 = case Conserve of
652+
true ->
653+
sets:add_element(Source, ResourceAlarms);
654+
false ->
655+
sets:del_element(Source, ResourceAlarms)
656+
end,
657+
IsThereAlarm = not sets:is_empty(ResourceAlarms1),
641658
?LOG_DEBUG("Connection ~tp received resource alarm. Alarm "
642659
"on? ~tp",
643660
[ConnectionName, IsThereAlarm]),
@@ -668,8 +685,8 @@ open(info, {resource_alarm, IsThereAlarm},
668685
end,
669686
{keep_state,
670687
StatemData#statem_data{connection =
671-
Connection#stream_connection{resource_alarm =
672-
IsThereAlarm},
688+
Connection#stream_connection{resource_alarms =
689+
ResourceAlarms1},
673690
connection_state =
674691
State#stream_connection_state{blocked =
675692
NewBlockedState}}};
@@ -1190,15 +1207,23 @@ close_sent(info, {tcp_error, S, Reason}, #statem_data{}) ->
11901207
"[~w] [~w]",
11911208
[Reason, S, self()]),
11921209
stop;
1193-
close_sent(info, {resource_alarm, IsThereAlarm},
1210+
close_sent(info, {resource_alarm, Source, Conserve},
11941211
StatemData = #statem_data{connection = Connection}) ->
1212+
ResourceAlarms = Connection#stream_connection.resource_alarms,
1213+
ResourceAlarms1 = case Conserve of
1214+
true ->
1215+
sets:add_element(Source, ResourceAlarms);
1216+
false ->
1217+
sets:del_element(Source, ResourceAlarms)
1218+
end,
1219+
IsThereAlarm = not sets:is_empty(ResourceAlarms1),
11951220
?LOG_WARNING("Stream protocol connection ignored a resource "
11961221
"alarm ~tp in state ~ts",
11971222
[IsThereAlarm, ?FUNCTION_NAME]),
11981223
{keep_state,
11991224
StatemData#statem_data{connection =
1200-
Connection#stream_connection{resource_alarm =
1201-
IsThereAlarm}}};
1225+
Connection#stream_connection{resource_alarms =
1226+
ResourceAlarms1}}};
12021227
close_sent(info, Msg, _StatemData) ->
12031228
?LOG_WARNING("Ignored unknown message ~tp in state ~ts",
12041229
[Msg, ?FUNCTION_NAME]),
@@ -1550,13 +1575,15 @@ notify_auth_result(Username,
15501575
[P || {_, V} = P <- EventProps, V =/= '']).
15511576

15521577
handle_frame_post_auth(Transport,
1553-
#stream_connection{resource_alarm = true} = Connection0,
1578+
#stream_connection{resource_alarms =
1579+
ResourceAlarms} = Connection0,
15541580
State,
15551581
{request, CorrelationId,
15561582
{declare_publisher,
15571583
PublisherId,
15581584
_WriterRef,
1559-
Stream}}) ->
1585+
Stream}})
1586+
when map_size(ResourceAlarms) =/= 0 ->
15601587
?LOG_INFO("Cannot create publisher ~tp on stream ~tp, connection "
15611588
"is blocked because of resource alarm",
15621589
[PublisherId, Stream]),
@@ -1574,10 +1601,12 @@ handle_frame_post_auth(Transport,
15741601
host = Host,
15751602
auth_mechanism = Auth_Mechanism,
15761603
authentication_state = AuthState,
1577-
resource_alarm = false} = C1,
1604+
resource_alarms = ResourceAlarms
1605+
} = C1,
15781606
S1,
15791607
{request, CorrelationId,
1580-
{sasl_authenticate, NewMechanism, NewSaslBin}}) ->
1608+
{sasl_authenticate, NewMechanism, NewSaslBin}})
1609+
when map_size(ResourceAlarms) =:= 0 ->
15811610
?LOG_DEBUG("Received sasl_authenticate for username '~ts'", [Username]),
15821611

15831612
{Connection1, State1} =
@@ -1656,33 +1685,35 @@ handle_frame_post_auth(Transport,
16561685
{Connection1, State1};
16571686
handle_frame_post_auth(Transport,
16581687
#stream_connection{user = User,
1659-
resource_alarm = false} = C,
1688+
resource_alarms = ResourceAlarms
1689+
} = C,
16601690
State,
16611691
{request, CorrelationId,
16621692
{declare_publisher, _PublisherId, WriterRef, S}})
1663-
when ?IS_INVALID_REF(WriterRef) ->
1664-
{Code, Counter} = case check_write_permitted(stream_r(S, C), User) of
1665-
ok ->
1666-
{?RESPONSE_CODE_PRECONDITION_FAILED, ?PRECONDITION_FAILED};
1667-
error ->
1668-
{?RESPONSE_CODE_ACCESS_REFUSED, ?ACCESS_REFUSED}
1669-
end,
1670-
response(Transport,
1671-
C,
1672-
declare_publisher,
1673-
CorrelationId,
1674-
Code),
1675-
increase_protocol_counter(Counter),
1676-
{C, State};
1693+
when ?IS_INVALID_REF(WriterRef) andalso map_size(ResourceAlarms) =:= 0 ->
1694+
{Code, Counter} = case check_write_permitted(stream_r(S, C), User) of
1695+
ok ->
1696+
{?RESPONSE_CODE_PRECONDITION_FAILED, ?PRECONDITION_FAILED};
1697+
error ->
1698+
{?RESPONSE_CODE_ACCESS_REFUSED, ?ACCESS_REFUSED}
1699+
end,
1700+
response(Transport,
1701+
C,
1702+
declare_publisher,
1703+
CorrelationId,
1704+
Code),
1705+
increase_protocol_counter(Counter),
1706+
{C, State};
16771707
handle_frame_post_auth(Transport,
16781708
#stream_connection{user = User,
16791709
publishers = Publishers0,
16801710
publisher_to_ids = RefIds0,
1681-
resource_alarm = false} =
1711+
resource_alarms = ResourceAlarms} =
16821712
Connection0,
16831713
State,
16841714
{request, CorrelationId,
1685-
{declare_publisher, PublisherId, WriterRef, Stream}}) ->
1715+
{declare_publisher, PublisherId, WriterRef, Stream}})
1716+
when map_size(ResourceAlarms) =:= 0 ->
16861717
case check_write_permitted(stream_r(Stream,
16871718
Connection0),
16881719
User)

deps/rabbitmq_stream/src/rabbit_stream_reader.hrl

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,9 @@
2424
authenticating | authenticated | tuning |
2525
tuned | opened | failure |
2626
closing | close_sent | closing_done.
27+
-define(STREAM_DISK_ALARM, {disk, rabbit_stream_queue}).
28+
%% subset of rabbit_alarm:resource_alarm_source()
29+
-type blocked_resource() :: disk | ?STREAM_DISK_ALARM.
2730

2831
-record(publisher,
2932
{publisher_id :: publisher_id(),
@@ -86,7 +89,7 @@
8689
client_properties = #{} :: #{binary() => binary()},
8790
monitors = #{} :: #{reference() => {pid(), stream()}},
8891
stats_timer :: undefined | rabbit_event:state(),
89-
resource_alarm :: boolean(),
92+
resource_alarms :: sets:set(blocked_resource()),
9093
send_file_oct ::
9194
atomics:atomics_ref(), % number of bytes sent with send_file (for metrics)
9295
transport :: tcp | ssl,

0 commit comments

Comments
 (0)