Skip to content

Commit 73e7427

Browse files
committed
AMQP 0-9-1: Handle per-queue-type disk alarms
This covers both network and direct connections for 0-9-1. We store a set of the queue types which have been published into on both a channel and connection level since blocking is done on the connection level but only the channel knows what queue types have been published. Then when the published queue types or the set of alarms changes, the connection evaluates whether it is affected by the alarm. If not it may publish but once a channel publishes to an alarmed queue type the connection then blocks until the channel exits or the alarm clears.
1 parent 399ce7e commit 73e7427

File tree

4 files changed

+108
-26
lines changed

4 files changed

+108
-26
lines changed

deps/amqp_client/src/amqp_gen_connection.erl

Lines changed: 24 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
%% connection.block, connection.unblock handler
3333
block_handler,
3434
blocked_by = sets:new([{version, 2}]),
35+
queue_types_published = sets:new([{version, 2}]),
3536
closing = false %% #closing{} | false
3637
}).
3738

@@ -214,22 +215,18 @@ handle_cast({register_blocked_handler, HandlerPid},
214215
{noreply, State1};
215216
handle_cast({conserve_resources, Source, Conserve},
216217
#state{blocked_by = BlockedBy} = State) ->
217-
WasNotBlocked = sets:is_empty(BlockedBy),
218218
BlockedBy1 = case Conserve of
219219
true ->
220220
sets:add_element(Source, BlockedBy);
221221
false ->
222222
sets:del_element(Source, BlockedBy)
223223
end,
224224
State1 = State#state{blocked_by = BlockedBy1},
225-
case sets:is_empty(BlockedBy1) of
226-
true ->
227-
handle_method(#'connection.unblocked'{}, State1);
228-
false when WasNotBlocked ->
229-
handle_method(#'connection.blocked'{}, State1);
230-
false ->
231-
{noreply, State1}
232-
end.
225+
maybe_block(State, State1);
226+
handle_cast({channel_published_to_queue_type, _ChPid, QT},
227+
#state{queue_types_published = QTs} = State) ->
228+
State1 = State#state{queue_types_published = sets:add_element(QT, QTs)},
229+
maybe_block(State, State1).
233230

234231
%% @private
235232
handle_info({'DOWN', _, process, BlockHandler, Reason},
@@ -274,6 +271,24 @@ i(Item, #state{module = Mod, module_state = MState}) -> Mod:i(Item, MState).
274271
register_blocked_handler(Pid, HandlerPid) ->
275272
gen_server:cast(Pid, {register_blocked_handler, HandlerPid}).
276273

274+
maybe_block(State0, State1) ->
275+
WasBlocked = should_block(State0),
276+
case should_block(State1) of
277+
true when not WasBlocked ->
278+
handle_method(#'connection.blocked'{}, State1);
279+
false when WasBlocked ->
280+
handle_method(#'connection.unblocked'{}, State1);
281+
_ ->
282+
{noreply, State1}
283+
end.
284+
285+
should_block(#state{blocked_by = BlockedBy, queue_types_published = QTs}) ->
286+
lists:any(fun ({disk, QT}) ->
287+
sets:is_element(QT, QTs);
288+
(_Resource) ->
289+
true
290+
end, sets:to_list(BlockedBy)).
291+
277292
%%---------------------------------------------------------------------------
278293
%% Command handling
279294
%%---------------------------------------------------------------------------

deps/rabbit/src/rabbit_amqqueue.erl

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@
6767
-export([deactivate_limit_all/2]).
6868

6969
-export([prepend_extra_bcc/1]).
70-
-export([queue/1, queue_names/1]).
70+
-export([queue/1, queue_names/1, queue_types/1]).
7171

7272
-export([kill_queue/2, kill_queue/3, kill_queue_hard/2, kill_queue_hard/3]).
7373
-export([delete_transient_queues_on_node/1]).
@@ -2097,6 +2097,15 @@ queue_names(Queues) ->
20972097
amqqueue:get_name(Q)
20982098
end, Queues).
20992099

2100+
-spec queue_types([Q | {Q, route_infos()}]) ->
2101+
[rabbit_queue_type:queue_type()] when Q :: amqqueue:target().
2102+
queue_types(Queues) ->
2103+
lists:map(fun({Q, RouteInfos}) when is_map(RouteInfos) ->
2104+
amqqueue:get_type(Q);
2105+
(Q) ->
2106+
amqqueue:get_type(Q)
2107+
end, Queues).
2108+
21002109
-spec lookup_extra_bcc(amqqueue:target(), binary()) ->
21012110
[amqqueue:target()].
21022111
lookup_extra_bcc(Q, BCCName) ->

deps/rabbit/src/rabbit_channel.erl

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,8 @@
169169
delivery_flow :: flow | noflow,
170170
interceptor_state,
171171
queue_states,
172+
queue_types_published = sets:new([{version, 2}]) ::
173+
sets:set(rabbit_queue_type:queue_type()),
172174
tick_timer,
173175
publishing_mode = false :: boolean()
174176
}).
@@ -2092,14 +2094,16 @@ deliver_to_queues(XName,
20922094
{ok, QueueStates, Actions} ->
20932095
rabbit_global_counters:messages_routed(amqp091, length(Qs)),
20942096
QueueNames = rabbit_amqqueue:queue_names(Qs),
2097+
QueueTypes = rabbit_amqqueue:queue_types(Qs),
20952098
%% NB: the order here is important since basic.returns must be
20962099
%% sent before confirms.
20972100
ok = process_routing_mandatory(Mandatory, RoutedToQueues, Message, XName, State0),
20982101
MsgSeqNo = maps:get(correlation, Options, undefined),
20992102
State1 = process_routing_confirm(MsgSeqNo, QueueNames, XName, State0),
2103+
State2 = notify_published_queue_types(QueueTypes, State1),
21002104
%% Actions must be processed after registering confirms as actions may
21012105
%% contain rejections of publishes
2102-
State = handle_queue_actions(Actions, State1#ch{queue_states = QueueStates}),
2106+
State = handle_queue_actions(Actions, State2#ch{queue_states = QueueStates}),
21032107
case rabbit_event:stats_level(State, #ch.stats_timer) of
21042108
fine ->
21052109
?INCR_STATS(exchange_stats, XName, 1, publish),
@@ -2165,6 +2169,17 @@ process_routing_confirm(MsgSeqNo, QRefs, XName, State)
21652169
State#ch{unconfirmed =
21662170
rabbit_confirms:insert(MsgSeqNo, QRefs, XName, State#ch.unconfirmed)}.
21672171

2172+
notify_published_queue_types(QueueTypes,
2173+
#ch{cfg = #conf{conn_pid = ConnPid},
2174+
queue_types_published = QTs0} = State) ->
2175+
QTs = sets:union(QTs0, sets:from_list(QueueTypes, [{version, 2}])),
2176+
sets:fold(
2177+
fun(QT, ok) ->
2178+
gen_server:cast(ConnPid,
2179+
{channel_published_to_queue_type, self(), QT})
2180+
end, ok, sets:subtract(QTs, QTs0)),
2181+
State#ch{queue_types_published = QTs}.
2182+
21682183
confirm(MsgSeqNos, QRef, State = #ch{unconfirmed = UC}) ->
21692184
%% NOTE: if queue name does not exist here it's likely that the ref also
21702185
%% does not exist in unconfirmed messages.

deps/rabbit/src/rabbit_reader.erl

Lines changed: 58 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,14 @@
113113
%% a set of the reasons why we are
114114
%% blocked: {resource, memory}, {resource, disk}.
115115
%% More reasons can be added in the future.
116-
blocked_by,
116+
blocked_by = sets:new([{version, 2}]) ::
117+
sets:set(flow | {resource,
118+
rabbit_alarm:resource_alarm_source()}),
119+
%% the set of queue types which have been published to
120+
%% by channels on this connection, used for per-queue
121+
%% type disk alarm blocking
122+
queue_types_published = #{} :: #{ChannelPid :: pid() =>
123+
sets:set(rabbit_queue_type:queue_type())},
117124
%% true if received any publishes, false otherwise
118125
%% note that this will also be true when connection is
119126
%% already blocked
@@ -335,7 +342,6 @@ start_connection(Parent, HelperSups, RanchRef, Deb, Sock) ->
335342
throttle = #throttle{
336343
last_blocked_at = never,
337344
should_block = false,
338-
blocked_by = sets:new([{version, 2}]),
339345
connection_blocked_message_sent = false
340346
},
341347
proxy_socket = rabbit_net:maybe_get_proxy_socket(Sock)},
@@ -677,6 +683,14 @@ handle_other({'$gen_cast', {force_event_refresh, Ref}}, State)
677683
handle_other({'$gen_cast', {force_event_refresh, _Ref}}, State) ->
678684
%% Ignore, we will emit a created event once we start running.
679685
State;
686+
handle_other({'$gen_cast', {channel_published_to_queue_type, ChPid, QT}},
687+
#v1{throttle = Throttle0} = State0) ->
688+
QTs = maps:update_with(
689+
ChPid, fun(ChQTs) -> sets:add_element(QT, ChQTs) end,
690+
sets:from_list([QT], [{version, 2}]),
691+
Throttle0#throttle.queue_types_published),
692+
Throttle = Throttle0#throttle{queue_types_published = QTs},
693+
control_throttle(State0#v1{throttle = Throttle});
680694
handle_other(ensure_stats, State) ->
681695
ensure_stats_timer(State);
682696
handle_other(emit_stats, State) ->
@@ -1007,14 +1021,21 @@ is_over_node_channel_limit() ->
10071021
end
10081022
end.
10091023

1010-
channel_cleanup(ChPid, State = #v1{channel_count = ChannelCount}) ->
1024+
channel_cleanup(ChPid, State = #v1{channel_count = ChannelCount,
1025+
throttle = Throttle0} = State) ->
10111026
case get({ch_pid, ChPid}) of
1012-
undefined -> {undefined, State};
1013-
{Channel, MRef} -> credit_flow:peer_down(ChPid),
1014-
erase({channel, Channel}),
1015-
erase({ch_pid, ChPid}),
1016-
erlang:demonitor(MRef, [flush]),
1017-
{Channel, State#v1{channel_count = ChannelCount - 1}}
1027+
undefined ->
1028+
{undefined, State};
1029+
{Channel, MRef} ->
1030+
credit_flow:peer_down(ChPid),
1031+
erase({channel, Channel}),
1032+
erase({ch_pid, ChPid}),
1033+
erlang:demonitor(MRef, [flush]),
1034+
QT = maps:remove(ChPid,
1035+
Throttle0#throttle.queue_types_published),
1036+
Throttle = Throttle0#throttle{queue_types_published = QT},
1037+
{Channel, State#v1{channel_count = ChannelCount - 1,
1038+
throttle = Throttle}}
10181039
end.
10191040

10201041
all_channels() -> [ChPid || {{ch_pid, ChPid}, _ChannelMRef} <- get()].
@@ -1738,22 +1759,44 @@ update_last_blocked_at(Throttle) ->
17381759
connection_blocked_message_sent(
17391760
#throttle{connection_blocked_message_sent = BS}) -> BS.
17401761

1741-
should_send_blocked(Throttle = #throttle{blocked_by = Reasons}) ->
1762+
should_send_blocked(Throttle) ->
17421763
should_block(Throttle)
17431764
andalso
1744-
sets:size(sets:del_element(flow, Reasons)) =/= 0
1765+
do_throttle_reasons_apply(Throttle)
17451766
andalso
17461767
not connection_blocked_message_sent(Throttle).
17471768

1748-
should_send_unblocked(Throttle = #throttle{blocked_by = Reasons}) ->
1769+
should_send_unblocked(Throttle) ->
17491770
connection_blocked_message_sent(Throttle)
17501771
andalso
1751-
sets:size(sets:del_element(flow, Reasons)) == 0.
1772+
not do_throttle_reasons_apply(Throttle).
1773+
1774+
do_throttle_reasons_apply(#throttle{blocked_by = Reasons} = Throttle) ->
1775+
lists:any(
1776+
fun ({resource, disk}) ->
1777+
true;
1778+
({resource, memory}) ->
1779+
true;
1780+
({resource, {disk, QT}}) ->
1781+
has_published_to_queue_type(QT, Throttle);
1782+
(_) ->
1783+
%% Flow control should not send connection.blocked
1784+
false
1785+
end, sets:to_list(Reasons)).
1786+
1787+
has_published_to_queue_type(QT, #throttle{queue_types_published = QTs}) ->
1788+
rabbit_misc:maps_any(
1789+
fun(_ChPid, ChQT) -> sets:is_element(QT, ChQT) end, QTs).
17521790

17531791
%% Returns true if we have a reason to block
17541792
%% this connection.
1755-
has_reasons_to_block(#throttle{blocked_by = Reasons}) ->
1756-
sets:size(Reasons) > 0.
1793+
has_reasons_to_block(#throttle{blocked_by = Reasons} = Throttle) ->
1794+
lists:any(
1795+
fun ({resource, {disk, QType}}) ->
1796+
has_published_to_queue_type(QType, Throttle);
1797+
(_) ->
1798+
true
1799+
end, sets:to_list(Reasons)).
17571800

17581801
is_blocked_by_flow(#throttle{blocked_by = Reasons}) ->
17591802
sets:is_element(flow, Reasons).

0 commit comments

Comments
 (0)