Skip to content

Commit d288f74

Browse files
committed
MQTT: Handle per-queue-type disk alarms
This includes regular MQTT and MQTT-over-WebSockets.
1 parent f1b36bb commit d288f74

File tree

3 files changed

+25
-12
lines changed

3 files changed

+25
-12
lines changed

deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,8 @@
9999
-record(state,
100100
{cfg :: #cfg{},
101101
queue_states = rabbit_queue_type:init() :: rabbit_queue_type:state(),
102+
queue_types_published = sets:new([{version, 2}]) ::
103+
sets:set(rabbit_queue_type:queue_type()),
102104
%% Packet IDs published to queues but not yet confirmed.
103105
unacked_client_pubs = rabbit_mqtt_confirms:init() :: rabbit_mqtt_confirms:state(),
104106
%% Packet IDs published to MQTT subscribers but not yet acknowledged.
@@ -1697,14 +1699,19 @@ deliver_to_queues(Message,
16971699
Options,
16981700
RoutedToQNames,
16991701
State0 = #state{queue_states = QStates0,
1702+
queue_types_published = QTs0,
17001703
cfg = #cfg{proto_ver = ProtoVer}}) ->
17011704
Qs0 = rabbit_db_queue:get_targets(RoutedToQNames),
17021705
Qs = rabbit_amqqueue:prepend_extra_bcc(Qs0),
17031706
case rabbit_queue_type:deliver(Qs, Message, Options, QStates0) of
17041707
{ok, QStates, Actions} ->
17051708
rabbit_global_counters:messages_routed(ProtoVer, length(Qs)),
1706-
State = process_routing_confirm(Options, Qs,
1707-
State0#state{queue_states = QStates}),
1709+
QTs1 = sets:from_list(rabbit_amqqueue:queue_types(Qs),
1710+
[{version, 2}]),
1711+
QTs = sets:union(QTs0, QTs1),
1712+
State1 = State0#state{queue_states = QStates,
1713+
queue_types_published = QTs},
1714+
State = process_routing_confirm(Options, Qs, State1),
17081715
%% Actions must be processed after registering confirms as actions may
17091716
%% contain rejections of publishes.
17101717
{ok, handle_queue_actions(Actions, State)};
@@ -2362,10 +2369,18 @@ is_socket_busy(Socket) ->
23622369
false
23632370
end.
23642371

2365-
-spec throttle(boolean(), state()) -> boolean().
2366-
throttle(Conserve, #state{queues_soft_limit_exceeded = QSLE,
2367-
cfg = #cfg{published = Published}}) ->
2368-
Conserve andalso Published orelse
2372+
-spec throttle(sets:set(rabbit_alarm:resource_alarm_source()), state()) ->
2373+
boolean().
2374+
throttle(BlockedBy, #state{queues_soft_limit_exceeded = QSLE,
2375+
queue_types_published = QTs,
2376+
cfg = #cfg{published = Published}}) ->
2377+
Alarmed = sets:fold(
2378+
fun ({disk, QT}, Acc) ->
2379+
Acc orelse sets:is_element(QT, QTs);
2380+
(_, _) ->
2381+
true
2382+
end, false, BlockedBy),
2383+
Alarmed andalso Published orelse
23692384
not sets:is_empty(QSLE) orelse
23702385
credit_flow:blocked().
23712386

deps/rabbitmq_mqtt/src/rabbit_mqtt_reader.erl

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -428,10 +428,9 @@ control_throttle(State = #state{connection_state = ConnState,
428428
proc_state = PState,
429429
keepalive = KState
430430
}) ->
431-
Conserve = not sets:is_empty(BlockedBy),
432431
Throttle = case PState of
433-
connect_packet_unprocessed -> Conserve;
434-
_ -> rabbit_mqtt_processor:throttle(Conserve, PState)
432+
connect_packet_unprocessed -> not sets:is_empty(BlockedBy);
433+
_ -> rabbit_mqtt_processor:throttle(BlockedBy, PState)
435434
end,
436435
case {ConnState, Throttle} of
437436
{running, true} ->

deps/rabbitmq_web_mqtt/src/rabbit_web_mqtt_handler.erl

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -414,10 +414,9 @@ control_throttle(State = #state{connection_state = ConnState,
414414
proc_state = PState,
415415
keepalive = KState
416416
}) ->
417-
Conserve = not sets:is_empty(BlockedBy),
418417
Throttle = case PState of
419-
connect_packet_unprocessed -> Conserve;
420-
_ -> rabbit_mqtt_processor:throttle(Conserve, PState)
418+
connect_packet_unprocessed -> not sets:is_empty(BlockedBy);
419+
_ -> rabbit_mqtt_processor:throttle(BlockedBy, PState)
421420
end,
422421
case {ConnState, Throttle} of
423422
{running, true} ->

0 commit comments

Comments
 (0)