Skip to content

Commit d8b19d3

Browse files
committed
AMQP 1.0: Handle per-queue-type disk alarms
1 parent d288f74 commit d8b19d3

File tree

3 files changed

+147
-34
lines changed

3 files changed

+147
-34
lines changed

deps/rabbit/src/rabbit_amqp_session.erl

Lines changed: 65 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -381,6 +381,8 @@
381381
stashed_eol = [] :: [rabbit_amqqueue:name()],
382382

383383
queue_states = rabbit_queue_type:init() :: rabbit_queue_type:state(),
384+
queue_types_published = sets:new([{version, 2}]) ::
385+
sets:set(rabbit_queue_type:queue_type()),
384386
permission_cache = [] :: permission_cache(),
385387
topic_permission_cache = [] :: topic_permission_cache()
386388
}).
@@ -448,9 +450,14 @@ init({ReaderPid, WriterPid, ChannelNum, MaxFrameSize, User, Vhost, ContainerId,
448450
true = is_valid_max(MaxLinkCredit),
449451
true = is_valid_max(MaxQueueCredit),
450452
true = is_valid_max(MaxIncomingWindow),
451-
IncomingWindow = case sets:is_empty(Alarms) of
452-
true -> MaxIncomingWindow;
453-
false -> 0
453+
InResourceAlarm = sets:fold(fun ({disk, _}, Acc) ->
454+
Acc;
455+
(_, _) ->
456+
true
457+
end, false, Alarms),
458+
IncomingWindow = case InResourceAlarm of
459+
true -> 0;
460+
false -> MaxIncomingWindow
454461
end,
455462
NextOutgoingId = ?INITIAL_OUTGOING_TRANSFER_ID,
456463

@@ -582,36 +589,17 @@ handle_cast({queue_event, _, _} = QEvent, State0) ->
582589
log_error_and_close_session(Error, State0)
583590
end;
584591
handle_cast({conserve_resources, Alarm, Conserve},
585-
#state{incoming_window = IncomingWindow0,
586-
cfg = #cfg{resource_alarms = Alarms0,
587-
incoming_window_margin = Margin0,
592+
#state{cfg = #cfg{resource_alarms = Alarms0,
588593
writer_pid = WriterPid,
589-
channel_num = Ch,
590-
max_incoming_window = MaxIncomingWindow
594+
channel_num = Ch
591595
} = Cfg
592596
} = State0) ->
593597
Alarms = case Conserve of
594598
true -> sets:add_element(Alarm, Alarms0);
595599
false -> sets:del_element(Alarm, Alarms0)
596600
end,
597-
{SendFlow, IncomingWindow, Margin} =
598-
case {sets:is_empty(Alarms0), sets:is_empty(Alarms)} of
599-
{true, false} ->
600-
%% Alarm kicked in.
601-
%% Notify the client to not send us any more TRANSFERs. Since we decrase
602-
%% our incoming window dynamically, there might be incoming in-flight
603-
%% TRANSFERs. So, let's be lax and allow for some excess TRANSFERs.
604-
{true, 0, MaxIncomingWindow};
605-
{false, true} ->
606-
%% All alarms cleared.
607-
%% Notify the client that it can resume sending us TRANSFERs.
608-
{true, MaxIncomingWindow, 0};
609-
_ ->
610-
{false, IncomingWindow0, Margin0}
611-
end,
612-
State = State0#state{incoming_window = IncomingWindow,
613-
cfg = Cfg#cfg{resource_alarms = Alarms,
614-
incoming_window_margin = Margin}},
601+
State1 = State0#state{cfg = Cfg#cfg{resource_alarms = Alarms}},
602+
{SendFlow, State} = check_resource_alarm(State0, State1),
615603
case SendFlow of
616604
true ->
617605
Flow = session_flow_fields(#'v1_0.flow'{}, State),
@@ -637,6 +625,41 @@ handle_cast({reset_authz, User}, #state{cfg = Cfg} = State0) ->
637625
handle_cast(shutdown, State) ->
638626
{stop, normal, State}.
639627

628+
is_in_resource_alarm(#state{cfg = #cfg{resource_alarms = Alarms},
629+
queue_types_published = QTs}) ->
630+
sets:fold(
631+
fun ({disk, QT}, Acc) ->
632+
Acc orelse sets:is_element(QT, QTs);
633+
(_, _) ->
634+
true
635+
end, false, Alarms).
636+
637+
check_resource_alarm(State0,
638+
#state{incoming_window = IncomingWindow0,
639+
cfg = #cfg{incoming_window_margin = Margin0,
640+
max_incoming_window = MaxIncomingWindow
641+
} = Cfg} = State1) ->
642+
WasBlocked = is_in_resource_alarm(State0),
643+
IsBlocked = is_in_resource_alarm(State1),
644+
{SendFlow, IncomingWindow, Margin} =
645+
case IsBlocked of
646+
true when not WasBlocked ->
647+
%% Alarm kicked in.
648+
%% Notify the client to not send us any more TRANSFERs. Since we decrase
649+
%% our incoming window dynamically, there might be incoming in-flight
650+
%% TRANSFERs. So, let's be lax and allow for some excess TRANSFERs.
651+
{true, 0, MaxIncomingWindow};
652+
false when WasBlocked ->
653+
%% All alarms cleared.
654+
%% Notify the client that it can resume sending us TRANSFERs.
655+
{true, MaxIncomingWindow, 0};
656+
_ ->
657+
{false, IncomingWindow0, Margin0}
658+
end,
659+
State = State1#state{incoming_window = IncomingWindow,
660+
cfg = Cfg#cfg{incoming_window_margin = Margin}},
661+
{SendFlow, State}.
662+
640663
log_error_and_close_session(
641664
Error, State = #state{cfg = #cfg{reader_pid = ReaderPid,
642665
writer_pid = WriterPid,
@@ -1940,7 +1963,6 @@ session_flow_control_received_transfer(
19401963
incoming_window = InWindow0,
19411964
remote_outgoing_window = RemoteOutgoingWindow,
19421965
cfg = #cfg{incoming_window_margin = Margin,
1943-
resource_alarms = Alarms,
19441966
max_incoming_window = MaxIncomingWindow}
19451967
} = State) ->
19461968
InWindow1 = InWindow0 - 1,
@@ -1954,7 +1976,7 @@ session_flow_control_received_transfer(
19541976
ok
19551977
end,
19561978
{Flows, InWindow} = case InWindow1 =< (MaxIncomingWindow div 2) andalso
1957-
sets:is_empty(Alarms) of
1979+
not is_in_resource_alarm(State) of
19581980
true ->
19591981
%% We've reached halfway and there are no
19601982
%% disk or memory alarm, open the window.
@@ -2371,6 +2393,7 @@ incoming_link_transfer(
23712393
multi_transfer_msg = MultiTransfer
23722394
} = Link0,
23732395
State0 = #state{queue_states = QStates0,
2396+
queue_types_published = QTs0,
23742397
permission_cache = PermCache0,
23752398
topic_permission_cache = TopicPermCache0,
23762399
cfg = #cfg{user = User = #user{username = Username},
@@ -2414,19 +2437,26 @@ incoming_link_transfer(
24142437
Qs = rabbit_amqqueue:prepend_extra_bcc(Qs0),
24152438
case rabbit_queue_type:deliver(Qs, Mc, Opts, QStates0) of
24162439
{ok, QStates, Actions} ->
2440+
QTs1 = sets:from_list(rabbit_amqqueue:queue_types(Qs),
2441+
[{version, 2}]),
2442+
QTs = sets:union(QTs0, QTs1),
24172443
State1 = State0#state{queue_states = QStates,
2444+
queue_types_published = QTs,
24182445
permission_cache = PermCache,
24192446
topic_permission_cache = TopicPermCache},
24202447
%% Confirms must be registered before processing actions
24212448
%% because actions may contain rejections of publishes.
24222449
{U, Reply0} = process_routing_confirm(
24232450
Qs, Settled, DeliveryId, U0),
2424-
State = handle_queue_actions(Actions, State1),
2451+
State2 = handle_queue_actions(Actions, State1),
2452+
{SendAlarmFlow, State} = check_resource_alarm(
2453+
State0, State2),
24252454
DeliveryCount = add(DeliveryCount0, 1),
24262455
Credit1 = Credit0 - 1,
24272456
{Credit, Reply1} = maybe_grant_link_credit(
24282457
Credit1, MaxLinkCredit,
2429-
DeliveryCount, map_size(U), Handle),
2458+
DeliveryCount, map_size(U), Handle,
2459+
SendAlarmFlow),
24302460
Reply = Reply0 ++ Reply1,
24312461
Link = Link0#incoming_link{
24322462
delivery_count = DeliveryCount,
@@ -2461,7 +2491,8 @@ incoming_link_transfer(
24612491
Credit1 = Credit0 - 1,
24622492
{Credit, Reply0} = maybe_grant_link_credit(
24632493
Credit1, MaxLinkCredit,
2464-
DeliveryCount, map_size(U0), Handle),
2494+
DeliveryCount, map_size(U0), Handle,
2495+
false),
24652496
Reply = [Disposition | Reply0],
24662497
Link = Link0#incoming_link{
24672498
delivery_count = DeliveryCount,
@@ -2574,8 +2605,9 @@ rejected(DeliveryId, Error) ->
25742605
settled = true,
25752606
state = #'v1_0.rejected'{error = Error}}.
25762607

2577-
maybe_grant_link_credit(Credit, MaxLinkCredit, DeliveryCount, NumUnconfirmed, Handle) ->
2578-
case grant_link_credit(Credit, MaxLinkCredit, NumUnconfirmed) of
2608+
maybe_grant_link_credit(Credit, MaxLinkCredit, DeliveryCount, NumUnconfirmed,
2609+
Handle, AlarmFlow) ->
2610+
case grant_link_credit(Credit, MaxLinkCredit, NumUnconfirmed) orelse AlarmFlow of
25792611
true ->
25802612
{MaxLinkCredit, [flow(Handle, DeliveryCount, MaxLinkCredit)]};
25812613
false ->

deps/rabbit/test/amqp_client_SUITE.erl

Lines changed: 78 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,7 @@ groups() ->
116116
resource_alarm_before_session_begin,
117117
resource_alarm_after_session_begin,
118118
resource_alarm_send_many,
119+
per_queue_type_disk_alarm,
119120
max_message_size_client_to_server,
120121
max_message_size_server_to_client,
121122
global_counters,
@@ -225,7 +226,13 @@ init_per_suite(Config) ->
225226
rabbit_ct_helpers:log_environment(),
226227
rabbit_ct_helpers:merge_app_env(
227228
Config, {rabbit, [{quorum_tick_interval, 1000},
228-
{stream_tick_interval, 1000}
229+
{stream_tick_interval, 1000},
230+
%% Imaginary mount-point for per-queue-type disk alarms
231+
{disk_free_limits,
232+
#{1 => #{name => <<"streaming">>,
233+
mount => "/does/not/exist",
234+
limit => "2GB",
235+
queue_types => [<<"stream">>]}}}
229236
]}).
230237

231238
end_per_suite(Config) ->
@@ -3263,6 +3270,76 @@ auth_attempt_metrics(Config) ->
32633270
?assertEqual(0, proplists:get_value(auth_attempts_failed, Attempt2)),
32643271
?assertEqual(1, proplists:get_value(auth_attempts_succeeded, Attempt2)).
32653272

3273+
per_queue_type_disk_alarm(Config) ->
3274+
Prefix = atom_to_binary(?FUNCTION_NAME),
3275+
Resource = {disk, rabbit_stream_queue},
3276+
CQ = <<Prefix/binary, "-classic">>,
3277+
SQ = <<Prefix/binary, "-stream">>,
3278+
{Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config),
3279+
#'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = CQ}),
3280+
#'queue.declare_ok'{} = amqp_channel:call(
3281+
Ch, #'queue.declare'{
3282+
queue = SQ,
3283+
durable = true,
3284+
arguments = [{<<"x-queue-type">>, longstr, <<"stream">>}]}),
3285+
3286+
OpnConf = connection_config(Config),
3287+
{ok, Connection} = amqp10_client:open_connection(OpnConf),
3288+
3289+
%% Set the alarm for the stream queue type.
3290+
ok = rabbit_ct_broker_helpers:set_alarm(Config, 0, Resource),
3291+
3292+
%% Attach one sender to the CQ and one to the SQ.
3293+
{ok, Session1} = amqp10_client:begin_session_sync(Connection),
3294+
{ok, Sender1} = amqp10_client:attach_sender_link(
3295+
Session1, <<Prefix/binary, "-cq-sender">>,
3296+
rabbitmq_amqp_address:queue(CQ), unsettled),
3297+
{ok, Session2} = amqp10_client:begin_session_sync(Connection),
3298+
{ok, Sender2} = amqp10_client:attach_sender_link(
3299+
Session2, <<Prefix/binary, "-sq-sender">>,
3300+
rabbitmq_amqp_address:queue(SQ), unsettled),
3301+
3302+
%% Both senders initially have link and session credit.
3303+
ok = wait_for_credit(Sender1),
3304+
ok = wait_for_credit(Sender2),
3305+
Tag1 = <<"tag1">>,
3306+
Msg1 = amqp10_msg:new(Tag1, <<"m1">>, false),
3307+
?assertEqual(ok,
3308+
amqp10_client:send_msg(Sender1, Msg1)),
3309+
ok = wait_for_accepted(Tag1),
3310+
?assertEqual(ok,
3311+
amqp10_client:send_msg(Sender2, Msg1)),
3312+
ok = wait_for_accepted(Tag1),
3313+
3314+
%% Once the SQ sender has delivered to a stream, it becomes blocked by
3315+
%% session flow control.
3316+
Tag2 = <<"tag2">>,
3317+
Msg2 = amqp10_msg:new(Tag2, <<"m2">>, false),
3318+
?assertEqual(ok,
3319+
amqp10_client:send_msg(Sender1, Msg2)),
3320+
ok = wait_for_accepted(Tag2),
3321+
?assertEqual({error, remote_incoming_window_exceeded},
3322+
amqp10_client:send_msg(Sender2, Msg2)),
3323+
3324+
%% Clear the alarm and the SQ sender can then send transfers.
3325+
ok = rabbit_ct_broker_helpers:clear_alarm(Config, 0, Resource),
3326+
Tag3 = <<"tag3">>,
3327+
Msg3 = amqp10_msg:new(Tag3, <<"m3">>, false),
3328+
?assertEqual(ok,
3329+
amqp10_client:send_msg(Sender1, Msg3)),
3330+
ok = wait_for_accepted(Tag3),
3331+
?assertEqual(ok,
3332+
amqp10_client:send_msg(Sender2, Msg3)),
3333+
ok = wait_for_accepted(Tag3),
3334+
3335+
ok = amqp10_client:detach_link(Sender1),
3336+
ok = end_session_sync(Session1),
3337+
ok = amqp10_client:detach_link(Sender2),
3338+
ok = end_session_sync(Session2),
3339+
#'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = CQ}),
3340+
#'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = SQ}),
3341+
ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch).
3342+
32663343
max_message_size_client_to_server(Config) ->
32673344
DefaultMaxMessageSize = rpc(Config, persistent_term, get, [max_message_size]),
32683345
%% Limit the server to only accept messages up to 2KB.

deps/rabbitmq_ct_helpers/src/rabbit_ct_broker_helpers.erl

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1815,6 +1815,8 @@ set_alarm(Config, Node, file_descriptor_limit = Resource) ->
18151815
set_alarm(Config, Node, memory = Resource) ->
18161816
rpc(Config, Node, rabbit_alarm, set_alarm, [{{resource_limit, Resource, Node}, []}]);
18171817
set_alarm(Config, Node, disk = Resource) ->
1818+
rpc(Config, Node, rabbit_alarm, set_alarm, [{{resource_limit, Resource, Node}, []}]);
1819+
set_alarm(Config, Node, {disk, QueueType} = Resource) when is_atom(QueueType) ->
18181820
rpc(Config, Node, rabbit_alarm, set_alarm, [{{resource_limit, Resource, Node}, []}]).
18191821

18201822
get_alarms(Config, Node) ->
@@ -1828,6 +1830,8 @@ clear_alarm(Config, Node, file_descriptor_limit = Resource) ->
18281830
clear_alarm(Config, Node, memory = Resource) ->
18291831
rpc(Config, Node, rabbit_alarm, clear_alarm, [{resource_limit, Resource, Node}]);
18301832
clear_alarm(Config, Node, disk = Resource) ->
1833+
rpc(Config, Node, rabbit_alarm, clear_alarm, [{resource_limit, Resource, Node}]);
1834+
clear_alarm(Config, Node, {disk, QueueType} = Resource) when is_atom(QueueType) ->
18311835
rpc(Config, Node, rabbit_alarm, clear_alarm, [{resource_limit, Resource, Node}]).
18321836

18331837
clear_all_alarms(Config, Node) ->

0 commit comments

Comments
 (0)