From 95a32d339a916a96d74e41ffbb2ca2e912857b85 Mon Sep 17 00:00:00 2001 From: Michael Davis Date: Fri, 31 Oct 2025 15:22:12 -0400 Subject: [PATCH 01/18] Allow configuring osiris data_dir in Cuttlefish config This is the same as the `raft.data_dir` option but for Osiris' data directory. Configuring this in Cuttlefish is nicer than the existing `$RABBITMQ_STREAM_DIR` environment variable way of changing the dir. --- deps/rabbit/priv/schema/rabbit.schema | 13 +++++++++++++ .../test/config_schema_SUITE_data/rabbit.snippets | 10 ++++++++++ 2 files changed, 23 insertions(+) diff --git a/deps/rabbit/priv/schema/rabbit.schema b/deps/rabbit/priv/schema/rabbit.schema index c589f21a61f..6ddb986f0a6 100644 --- a/deps/rabbit/priv/schema/rabbit.schema +++ b/deps/rabbit/priv/schema/rabbit.schema @@ -2776,6 +2776,19 @@ fun(Conf) -> end end}. +{mapping, "stream.data_dir", "osiris.data_dir", [ + {datatype, string} +]}. + +{translation, "osiris.data_dir", + fun(Conf) -> + case cuttlefish:conf_get("stream.data_dir", Conf, undefined) of + undefined -> cuttlefish:unset(); + Val -> Val + end + end +}. + {mapping, "stream.read_ahead", "rabbit.stream_read_ahead", [{datatype, {enum, [true, false]}}]}. diff --git a/deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets b/deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets index 5a4ffc6c5df..f5e7ba93039 100644 --- a/deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets +++ b/deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets @@ -1273,6 +1273,16 @@ credential_validator.regexp = ^abc\\d+", [{rabbit, [ {stream_read_ahead_limit, "8KiB"} ]}], + []}, + + %% + %% Stream data dir + %% + {stream_data_dir, + "stream.data_dir = /data/rabbitmq/stream", + [{osiris, [ + {data_dir, "/data/rabbitmq/stream"} + ]}], []} ]. From a5f1487977465f562eb4e2fe7adb969bad6cf90d Mon Sep 17 00:00:00 2001 From: Michael Davis Date: Fri, 31 Oct 2025 15:21:49 -0400 Subject: [PATCH 02/18] Allow configuring classic queue data dir in Cuttlefish config Same as the parent commit but for classic queue data directories. --- deps/rabbit/priv/schema/rabbit.schema | 14 ++++++++++++++ deps/rabbit/src/rabbit_vhost.erl | 9 +++++++-- .../test/config_schema_SUITE_data/rabbit.snippets | 10 ++++++++++ 3 files changed, 31 insertions(+), 2 deletions(-) diff --git a/deps/rabbit/priv/schema/rabbit.schema b/deps/rabbit/priv/schema/rabbit.schema index 6ddb986f0a6..3de3b81b882 100644 --- a/deps/rabbit/priv/schema/rabbit.schema +++ b/deps/rabbit/priv/schema/rabbit.schema @@ -2620,6 +2620,20 @@ end}. {datatype, {enum, [true, false]}} ]}. +%% Classic queue data directory +{mapping, "classic_queue.data_dir", "rabbit.classic_queue_data_dir", [ + {datatype, string} +]}. + +{translation, "rabbit.classic_queue_data_dir", + fun(Conf) -> + case cuttlefish:conf_get("classic_queue.data_dir", Conf, undefined) of + undefined -> cuttlefish:unset(); + Val -> Val + end + end +}. + %% %% Backing queue version %% diff --git a/deps/rabbit/src/rabbit_vhost.erl b/deps/rabbit/src/rabbit_vhost.erl index f6321e881dc..40f976c25e3 100644 --- a/deps/rabbit/src/rabbit_vhost.erl +++ b/deps/rabbit/src/rabbit_vhost.erl @@ -679,8 +679,13 @@ msg_store_dir_wildcard() -> rabbit_data_coercion:to_list(filename:join([msg_store_dir_base(), "*"])). msg_store_dir_base() -> - Dir = rabbit:data_dir(), - filename:join([Dir, "msg_stores", "vhosts"]). + case application:get_env(rabbit, classic_queue_data_dir) of + {ok, Dir} -> + Dir; + undefined -> + Dir = rabbit:data_dir(), + filename:join([Dir, "msg_stores", "vhosts"]) + end. 
config_file_path(VHost) -> VHostDir = msg_store_dir_path(VHost), diff --git a/deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets b/deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets index f5e7ba93039..6471314074e 100644 --- a/deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets +++ b/deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets @@ -1111,6 +1111,16 @@ credential_validator.regexp = ^abc\\d+", [], []}, + %% + %% Classic queue data dir + %% + {classic_queue_data_dir, + "classic_queue.data_dir = /data/rabbitmq/classic", + [{rabbit, [ + {classic_queue_data_dir, "/data/rabbitmq/classic"} + ]}], + []}, + %% %% Quorum queue %% From 8d6122a323e00f3d3e1d328276dbafd4474b82f5 Mon Sep 17 00:00:00 2001 From: Michael Davis Date: Fri, 24 Oct 2025 14:00:40 -0400 Subject: [PATCH 03/18] rabbit_stream_queue: Enable recovery after registering queue type --- deps/rabbit/src/rabbit_stream_queue.erl | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/deps/rabbit/src/rabbit_stream_queue.erl b/deps/rabbit/src/rabbit_stream_queue.erl index 95f8d6b2266..62ab2b22280 100644 --- a/deps/rabbit/src/rabbit_stream_queue.erl +++ b/deps/rabbit/src/rabbit_stream_queue.erl @@ -126,7 +126,8 @@ [queue, <<"stream">>, ?MODULE]}}, {cleanup, {rabbit_registry, unregister, [queue, <<"stream">>]}}, - {requires, rabbit_registry} + {requires, rabbit_registry}, + {enables, recovery} ]}). -type client() :: #stream_client{}. From 283aa51a0c3627cfa53f3df1d5ee132e21678a29 Mon Sep 17 00:00:00 2001 From: Michael Davis Date: Tue, 21 Oct 2025 17:38:49 -0400 Subject: [PATCH 04/18] rabbit_alarm: Prefer maps to dicts This is not a functional change, just a refactor to eliminate dicts and use maps instead. This cleans up some helper functions like dict_append/3, and we can use map comprehensions in some places to avoid intermediary lists. --- deps/rabbit/src/rabbit_alarm.erl | 84 +++++++++++++++----------------- 1 file changed, 38 insertions(+), 46 deletions(-) diff --git a/deps/rabbit/src/rabbit_alarm.erl b/deps/rabbit/src/rabbit_alarm.erl index 879fdae8189..84c5188f159 100644 --- a/deps/rabbit/src/rabbit_alarm.erl +++ b/deps/rabbit/src/rabbit_alarm.erl @@ -42,9 +42,9 @@ %%---------------------------------------------------------------------------- --record(alarms, {alertees :: dict:dict(pid(), rabbit_types:mfargs()), - alarmed_nodes :: dict:dict(node(), [resource_alarm_source()]), - alarms :: [alarm()]}). +-record(alarms, {alertees = #{} :: #{pid() => rabbit_types:mfargs()}, + alarmed_nodes = #{} :: #{node() => [resource_alarm_source()]}, + alarms = [] :: [alarm()]}). -export_type([alarm/0]). -type local_alarm() :: 'file_descriptor_limit'. @@ -90,7 +90,7 @@ stop() -> ok. %% called like this: `apply(M, F, A ++ [Pid, Source, Alert])', where `Source' %% has the type of resource_alarm_source() and `Alert' has the type of resource_alert(). --spec register(pid(), rabbit_types:mfargs()) -> [atom()]. +-spec register(pid(), rabbit_types:mfargs()) -> [resource_alarm_source()]. register(Pid, AlertMFA) -> gen_event:call(?SERVER, ?MODULE, {register, Pid, AlertMFA}, infinity). @@ -177,12 +177,10 @@ remote_conserve_resources(Pid, Source, {false, _, _}) -> %%---------------------------------------------------------------------------- init([]) -> - {ok, #alarms{alertees = dict:new(), - alarmed_nodes = dict:new(), - alarms = []}}. + {ok, #alarms{}}. 
handle_call({register, Pid, AlertMFA}, State = #alarms{alarmed_nodes = AN}) -> - {ok, lists:usort(lists:append([V || {_, V} <- dict:to_list(AN)])), + {ok, lists:usort(lists:append(maps:values(AN))), internal_register(Pid, AlertMFA, State)}; handle_call(get_alarms, State) -> @@ -236,14 +234,11 @@ handle_event({node_up, Node}, State) -> {ok, State}; handle_event({node_down, Node}, #alarms{alarmed_nodes = AN} = State) -> - AlarmsForDeadNode = case dict:find(Node, AN) of - {ok, V} -> V; - error -> [] - end, + AlarmsForDeadNode = maps:get(Node, AN, []), {ok, lists:foldr(fun(Source, AccState) -> ?LOG_WARNING("~ts resource limit alarm cleared for dead node ~tp", [Source, Node]), - maybe_alert(fun dict_unappend/3, Node, Source, false, AccState) + maybe_alert(fun map_unappend/3, Node, Source, false, AccState) end, State, AlarmsForDeadNode)}; handle_event({register, Pid, AlertMFA}, State) -> @@ -254,7 +249,7 @@ handle_event(_Event, State) -> handle_info({'DOWN', _MRef, process, Pid, _Reason}, State = #alarms{alertees = Alertees}) -> - {ok, State#alarms{alertees = dict:erase(Pid, Alertees)}}; + {ok, State#alarms{alertees = maps:remove(Pid, Alertees)}}; handle_info(_Info, State) -> {ok, State}. @@ -267,22 +262,14 @@ code_change(_OldVsn, State, _Extra) -> %%---------------------------------------------------------------------------- -dict_append(Key, Val, Dict) -> - L = case dict:find(Key, Dict) of - {ok, V} -> V; - error -> [] - end, - dict:store(Key, lists:usort([Val|L]), Dict). - -dict_unappend(Key, Val, Dict) -> - L = case dict:find(Key, Dict) of - {ok, V} -> V; - error -> [] - end, +map_append(Key, Val, Map) -> + maps:update_with(Key, fun(Vs) -> [Val | Vs] end, [Val], Map). +map_unappend(Key, Val, Map) -> + L = maps:get(Key, Map, []), case lists:delete(Val, L) of - [] -> dict:erase(Key, Dict); - X -> dict:store(Key, X, Dict) + [] -> maps:remove(Key, Map); + X -> Map#{Key := X} end. maybe_alert(UpdateFun, Node, Source, WasAlertAdded, @@ -290,7 +277,10 @@ maybe_alert(UpdateFun, Node, Source, WasAlertAdded, alertees = Alertees}) -> AN1 = UpdateFun(Node, Source, AN), %% Is alarm for Source still set on any node? - StillHasAlerts = lists:any(fun ({_Node, NodeAlerts}) -> lists:member(Source, NodeAlerts) end, dict:to_list(AN1)), + StillHasAlerts = rabbit_misc:maps_any( + fun(_Node, NodeAlerts) -> + lists:member(Source, NodeAlerts) + end, AN1), case StillHasAlerts of true -> ok; false -> ?LOG_WARNING("~ts resource limit alarm cleared across the cluster", [Source]) @@ -311,22 +301,24 @@ alert_remote(Alert, Alertees, Source) -> alert(Alertees, Source, Alert, NodeComparator) -> Node = node(), - dict:fold(fun (Pid, {M, F, A}, ok) -> - case NodeComparator(Node, node(Pid)) of - true -> apply(M, F, A ++ [Pid, Source, Alert]); - false -> ok - end - end, ok, Alertees). + maps:foreach(fun (Pid, {M, F, A}) -> + case NodeComparator(Node, node(Pid)) of + true -> apply(M, F, A ++ [Pid, Source, Alert]); + false -> ok + end + end, Alertees). internal_register(Pid, {M, F, A} = AlertMFA, State = #alarms{alertees = Alertees}) -> _MRef = erlang:monitor(process, Pid), - _ = case dict:find(node(), State#alarms.alarmed_nodes) of - {ok, Sources} -> [apply(M, F, A ++ [Pid, R, {true, true, node()}]) || R <- Sources]; - error -> ok + Node = node(), + _ = case State#alarms.alarmed_nodes of + #{Node := Sources} -> + [apply(M, F, A ++ [Pid, R, {true, true, node()}]) || R <- Sources]; + _ -> + ok end, - NewAlertees = dict:store(Pid, AlertMFA, Alertees), - State#alarms{alertees = NewAlertees}. 
+ State#alarms{alertees = Alertees#{Pid => AlertMFA}}. handle_set_resource_alarm(Source, Node, State) -> ?LOG_WARNING( @@ -335,7 +327,7 @@ handle_set_resource_alarm(Source, Node, State) -> "*** Publishers will be blocked until this alarm clears ***~n" "**********************************************************~n", [Source, Node]), - {ok, maybe_alert(fun dict_append/3, Node, Source, true, State)}. + {ok, maybe_alert(fun map_append/3, Node, Source, true, State)}. handle_set_alarm({file_descriptor_limit, []}, State) -> ?LOG_WARNING( @@ -351,7 +343,7 @@ handle_set_alarm(Alarm, State) -> handle_clear_resource_alarm(Source, Node, State) -> ?LOG_WARNING("~ts resource limit alarm cleared on node ~tp", [Source, Node]), - {ok, maybe_alert(fun dict_unappend/3, Node, Source, false, State)}. + {ok, maybe_alert(fun map_unappend/3, Node, Source, false, State)}. handle_clear_alarm(file_descriptor_limit, State) -> ?LOG_WARNING("file descriptor limit alarm cleared~n"), @@ -361,14 +353,14 @@ handle_clear_alarm(Alarm, State) -> {ok, State}. is_node_alarmed(Source, Node, #alarms{alarmed_nodes = AN}) -> - case dict:find(Node, AN) of - {ok, Sources} -> + case AN of + #{Node := Sources} -> lists:member(Source, Sources); - error -> + _ -> false end. compute_alarms(#alarms{alarms = Alarms, alarmed_nodes = AN}) -> Alarms ++ [ {{resource_limit, Source, Node}, []} - || {Node, Sources} <- dict:to_list(AN), Source <- Sources ]. + || Node := Sources <- AN, Source <- Sources ]. From d73d45d5842daed56905adac18c851f45654080f Mon Sep 17 00:00:00 2001 From: Michael Davis Date: Tue, 16 Sep 2025 10:36:35 -0400 Subject: [PATCH 05/18] rabbit_env: Enable disksup in os_mon but set threshold to 1.0 Previously we set `start_disksup` to `false` to avoid OTP's automatic monitoring of disk space. `disksup`'s gen_server starts a port (which runs `df` on Unix) which measures disk usage and sets an alarm through OTP's `alarm_handler` when usage exceeds the configured `disk_almost_full_threshold`. We can set this threshold to 1.0 to effectively turn off disksup's monitoring (i.e. the alarm will never be set). By enabling disksup we have access to `get_disk_data/0` and `get_disk_info/0,1` which can be used to replace the copied versions in `rabbit_disk_monitor`. --- deps/rabbit_common/src/rabbit_env.erl | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/deps/rabbit_common/src/rabbit_env.erl b/deps/rabbit_common/src/rabbit_env.erl index e3c551ae9c0..bc70a8e3128 100644 --- a/deps/rabbit_common/src/rabbit_env.erl +++ b/deps/rabbit_common/src/rabbit_env.erl @@ -306,7 +306,12 @@ context_to_app_env_vars1( [{kernel, inet_default_connect_options, [{nodelay, true}]}, {sasl, errlog_type, error}, {os_mon, start_cpu_sup, false}, - {os_mon, start_disksup, false}, + %% Start disksup but configure the threshold high enough that it will + %% never alarm. `disksup' must be started to call `get_disk_info/0,1' + %% and `get_disk_data/0' but we don't want it polluting the logs with + %% its alarms. Alarming is done by `rabbit_disk_monitor' instead. + {os_mon, start_disksup, true}, + {os_mon, disk_almost_full_threshold, 1.0}, {os_mon, start_memsup, false}, {mnesia, dir, DataDir}, {ra, data_dir, QuorumQueueDir}, From 74e0151785fe2e06f79fbb9dd7ac726ec100093e Mon Sep 17 00:00:00 2001 From: Michael Davis Date: Tue, 16 Sep 2025 12:07:59 -0400 Subject: [PATCH 06/18] rabbit_disk_monitor: Use disksup to determine available bytes `disksup` now exposes the calculation for available disk space for a given path using the same `df` mechanism on Unix. 
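For reference, a minimal shell sketch of what `disksup:get_disk_info/1` reports
for a path (total and available space are in KiB, capacity is the used
percentage; the mount point and numbers below are illustrative, and the error
case reports zeros):

    1> disksup:get_disk_info("/data").
    [{"/data",104857600,52428800,50}]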
We can use this directly and drop the custom code which reimplements that. --- deps/rabbit/src/rabbit_disk_monitor.erl | 195 ++----------------- deps/rabbit/test/unit_disk_monitor_SUITE.erl | 4 +- 2 files changed, 18 insertions(+), 181 deletions(-) diff --git a/deps/rabbit/src/rabbit_disk_monitor.erl b/deps/rabbit/src/rabbit_disk_monitor.erl index 292bce853d7..12f8ae29d29 100644 --- a/deps/rabbit/src/rabbit_disk_monitor.erl +++ b/deps/rabbit/src/rabbit_disk_monitor.erl @@ -15,11 +15,6 @@ %% watermark (configurable either as an absolute value or %% relative to the memory limit). %% -%% Disk monitoring is done by shelling out to /usr/bin/df -%% instead of related built-in OTP functions because currently -%% this is the most reliable way of determining free disk space -%% for the partition our internal database is on. -%% %% Update interval is dynamically calculated assuming disk %% space is being filled at FAST_RATE. @@ -65,11 +60,7 @@ %% on start-up retries, %% Interval between retries - interval, - %% Operating system in use - os, - %% Port running sh to execute df commands - port + interval }). %%---------------------------------------------------------------------------- @@ -134,18 +125,9 @@ init([Limit]) -> State1 = set_min_check_interval(?DEFAULT_MIN_DISK_CHECK_INTERVAL, State0), State2 = set_max_check_interval(?DEFAULT_MAX_DISK_CHECK_INTERVAL, State1), - OS = os:type(), - Port = case OS of - {unix, _} -> - start_portprogram(); - {win32, _OSname} -> - not_used - end, - State3 = State2#state{port=Port, os=OS}, - - State4 = enable(State3), + State3 = enable(State2), - {ok, State4}. + {ok, State3}. handle_call({set_disk_free_limit, _}, _From, #state{enabled = false} = State) -> ?LOG_INFO("Cannot set disk free limit: " @@ -210,43 +192,6 @@ code_change(_OldVsn, State, _Extra) -> %% Internal functions %%---------------------------------------------------------------------------- -start_portprogram() -> - Args = ["-s", "rabbit_disk_monitor"], - Opts = [stream, stderr_to_stdout, {args, Args}], - erlang:open_port({spawn_executable, "/bin/sh"}, Opts). - -run_port_cmd(Cmd0, Port) -> - %% Insert a carriage return, ^M or ASCII 13, after the command, - %% to indicate end of output - Cmd1 = io_lib:format("~ts < /dev/null; echo \"\^M\"~n", [Cmd0]), - Cmd2 = rabbit_data_coercion:to_utf8_binary(Cmd1), - Port ! {self(), {command, [Cmd2, 10]}}, % The 10 at the end is a newline - get_reply(Port, []). - -get_reply(Port, O) -> - receive - {Port, {data, N}} -> - case newline(N, O) of - {ok, Str} -> - Str; - {more, Acc} -> - get_reply(Port, Acc) - end; - {'EXIT', Port, Reason} -> - exit({port_died, Reason}) - end. - -% Character 13 is ^M or carriage return -newline([13|_], B) -> - {ok, lists:reverse(B)}; -newline([H|T], B) -> - newline(T, [H|B]); -newline([], B) -> - {more, B}. - -find_cmd(Cmd) -> - os:find_executable(Cmd). - safe_ets_lookup(Key, Default) -> try case ets:lookup(?ETS_NAME, Key) of @@ -281,10 +226,8 @@ set_disk_limits(State, Limit0) -> internal_update(State = #state{limit = Limit, dir = Dir, - alarmed = Alarmed, - os = OS, - port = Port}) -> - CurrentFree = get_disk_free(Dir, OS, Port), + alarmed = Alarmed}) -> + CurrentFree = get_disk_free(Dir), %% note: 'NaN' is considered to be less than a number NewAlarmed = CurrentFree < Limit, case {Alarmed, NewAlarmed} of @@ -300,103 +243,16 @@ internal_update(State = #state{limit = Limit, ets:insert(?ETS_NAME, {disk_free, CurrentFree}), State#state{alarmed = NewAlarmed, actual = CurrentFree}. 
-get_disk_free(Dir, {unix, Sun}, Port) - when Sun =:= sunos; Sun =:= sunos4; Sun =:= solaris -> - Df = find_cmd("df"), - parse_free_unix(run_port_cmd(Df ++ " -k '" ++ Dir ++ "'", Port)); -get_disk_free(Dir, {unix, _}, Port) -> - Df = find_cmd("df"), - parse_free_unix(run_port_cmd(Df ++ " -kP '" ++ Dir ++ "'", Port)); -get_disk_free(Dir, {win32, _}, not_used) -> - % Dir: - % "c:/Users/username/AppData/Roaming/RabbitMQ/db/rabbit2@username-z01-mnesia" - case win32_get_drive_letter(Dir) of - error -> - ?LOG_WARNING("Expected the mnesia directory absolute " - "path to start with a drive letter like " - "'C:'. The path is: '~tp'", [Dir]), - {ok, Free} = win32_get_disk_free_dir(Dir), - Free; - DriveLetter -> - % Note: yes, "$\s" is the $char sequence for an ASCII space - F = fun([D, $:, $\\, $\s | _]) when D =:= DriveLetter -> - true; - (_) -> false - end, - % Note: we can use os_mon_sysinfo:get_disk_info/1 after the following is fixed: - % https://github.com/erlang/otp/issues/6156 - try - % Note: DriveInfoStr is in this format - % "C:\\ DRIVE_FIXED 720441434112 1013310287872 720441434112\n" - Lines = os_mon_sysinfo:get_disk_info(), - [DriveInfoStr] = lists:filter(F, Lines), - [DriveLetter, $:, $\\, $\s | DriveInfo] = DriveInfoStr, - - % https://docs.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-getdiskfreespaceexa - % lib/os_mon/c_src/win32sysinfo.c: - % if (fpGetDiskFreeSpaceEx(drive,&availbytes,&totbytes,&totbytesfree)){ - % sprintf(answer,"%s DRIVE_FIXED %I64u %I64u %I64u\n",drive,availbytes,totbytes,totbytesfree); - ["DRIVE_FIXED", FreeBytesAvailableToCallerStr, - _TotalNumberOfBytesStr, _TotalNumberOfFreeBytesStr] = string:tokens(DriveInfo, " "), - list_to_integer(FreeBytesAvailableToCallerStr) - catch _:{timeout, _}:_ -> - %% could not compute the result - 'NaN'; - _:Reason:_ -> - ?LOG_WARNING("Free disk space monitoring failed to retrieve the amount of available space: ~p", [Reason]), - %% could not compute the result - 'NaN' - end - end. - -parse_free_unix(Str) -> - case string:tokens(Str, "\n") of - [_, S | _] -> case string:tokens(S, " \t") of - [_, _, _, Free | _] -> list_to_integer(Free) * 1024; - _ -> exit({unparseable, Str}) - end; - _ -> exit({unparseable, Str}) - end. - -win32_get_drive_letter([DriveLetter, $:, $/ | _]) when (DriveLetter >= $a andalso DriveLetter =< $z) -> - % Note: os_mon_sysinfo returns drives with uppercase letters, so uppercase it here - DriveLetter - 32; -win32_get_drive_letter([DriveLetter, $:, $/ | _]) when (DriveLetter >= $A andalso DriveLetter =< $Z) -> - DriveLetter; -win32_get_drive_letter(_) -> - error. - -win32_get_disk_free_dir(Dir) -> - %% On Windows, the Win32 API enforces a limit of 260 characters - %% (MAX_PATH). If we call `dir` with a path longer than that, it - %% fails with "File not found". Starting with Windows 10 version - %% 1607, this limit was removed, but the administrator has to - %% configure that. - %% - %% NTFS supports paths up to 32767 characters. Therefore, paths - %% longer than 260 characters exist but they are "inaccessible" to - %% `dir`. - %% - %% A workaround is to tell the Win32 API to not parse a path and - %% just pass it raw to the underlying filesystem. To do this, the - %% path must be prepended with "\\?\". That's what we do here. - %% - %% However, the underlying filesystem may not support forward - %% slashes transparently, as the Win32 API does. Therefore, we - %% convert all forward slashes to backslashes. 
- %% - %% See the following page to learn more about this: - %% https://ss64.com/nt/syntax-filenames.html - RawDir = "\\\\?\\" ++ string:replace(Dir, "/", "\\", all), - case run_os_cmd("dir /-C /W \"" ++ RawDir ++ "\"") of - {error, Error} -> - exit({unparseable, Error}); - CommandResult -> - LastLine0 = lists:last(string:tokens(CommandResult, "\r\n")), - LastLine1 = lists:reverse(LastLine0), - {match, [Free]} = re:run(LastLine1, "(\\d+)", - [{capture, all_but_first, list}]), - {ok, list_to_integer(lists:reverse(Free))} +-spec get_disk_free(file:filename_all()) -> + AvailableBytes :: non_neg_integer() | 'NaN'. +get_disk_free(Dir) -> + case disksup:get_disk_info(Dir) of + [{D, 0, 0, 0, 0}] when D =:= Dir orelse D =:= "none" -> + 'NaN'; + [{_MountPoint, _TotalKiB, AvailableKiB, _Capacity}] -> + AvailableKiB * 1024; + _DiskInfo -> + 'NaN' end. interpret_limit({mem_relative, Relative}) @@ -437,8 +293,8 @@ interval(#state{limit = Limit, enable(#state{retries = 0} = State) -> ?LOG_ERROR("Free disk space monitor failed to start!"), State; -enable(#state{dir = Dir, os = OS, port = Port} = State) -> - enable_handle_disk_free(catch get_disk_free(Dir, OS, Port), State). +enable(#state{dir = Dir} = State) -> + enable_handle_disk_free(get_disk_free(Dir), State). enable_handle_disk_free(DiskFree, State) when is_integer(DiskFree) -> enable_handle_total_memory(catch vm_memory_monitor:get_total_memory(), DiskFree, State); @@ -461,20 +317,3 @@ enable_handle_total_memory(Error, _DiskFree, #state{interval = Interval, retries [Retries, Error]), erlang:send_after(Interval, self(), try_enable), State#state{enabled = false}. - -run_os_cmd(Cmd) -> - Pid = self(), - Ref = make_ref(), - CmdFun = fun() -> - CmdResult = rabbit_misc:os_cmd(Cmd), - Pid ! {Pid, Ref, CmdResult} - end, - CmdPid = spawn(CmdFun), - receive - {Pid, Ref, CmdResult} -> - CmdResult - after 5000 -> - exit(CmdPid, kill), - ?LOG_ERROR("Command timed out: '~ts'", [Cmd]), - {error, timeout} - end. diff --git a/deps/rabbit/test/unit_disk_monitor_SUITE.erl b/deps/rabbit/test/unit_disk_monitor_SUITE.erl index 3058cc904eb..2a795f01376 100644 --- a/deps/rabbit/test/unit_disk_monitor_SUITE.erl +++ b/deps/rabbit/test/unit_disk_monitor_SUITE.erl @@ -9,9 +9,7 @@ -include_lib("eunit/include/eunit.hrl"). --compile(export_all). - --define(TIMEOUT, 30000). +-compile([nowarn_export_all, export_all]). all() -> [ From 94b0a83abd4d136eb3c03da594f5a81e2d069698 Mon Sep 17 00:00:00 2001 From: Michael Davis Date: Thu, 16 Oct 2025 10:06:09 -0400 Subject: [PATCH 07/18] rabbit.schema: Add config options for per-queue-type disk limits --- deps/rabbit/priv/schema/rabbit.schema | 67 +++++++++++++++++++ .../config_schema_SUITE_data/rabbit.snippets | 20 ++++++ 2 files changed, 87 insertions(+) diff --git a/deps/rabbit/priv/schema/rabbit.schema b/deps/rabbit/priv/schema/rabbit.schema index 3de3b81b882..0c35f32e97e 100644 --- a/deps/rabbit/priv/schema/rabbit.schema +++ b/deps/rabbit/priv/schema/rabbit.schema @@ -1238,6 +1238,73 @@ fun(Conf) -> end end}. +%% Per-queue-type / per-mount disk alarms +{mapping, "disk_free_limits.$num.name", "rabbit.disk_free_limits", [ + {datatype, [binary]} +]}. +{mapping, "disk_free_limits.$num.mount", "rabbit.disk_free_limits", [ + {datatype, [string]} +]}. +{mapping, "disk_free_limits.$num.limit", "rabbit.disk_free_limits", [ + {datatype, [integer, string]}, + {validators, ["is_supported_information_unit"]} +]}. +{mapping, "disk_free_limits.$num.queue_types", "rabbit.disk_free_limits", [ + {datatype, [binary]} +]}. 
+ +{translation, "rabbit.disk_free_limits", +fun(Conf) -> + case cuttlefish_variable:filter_by_prefix("disk_free_limits", Conf) of + [] -> + cuttlefish:unset(); + Settings -> + Ls = lists:foldl( + fun ({["disk_free_limits", Num, Key0], Value0}, Acc) -> + Idx = case string:to_integer(Num) of + {N, []} -> N; + _ -> cuttlefish:invalid(lists:flatten(io_lib:format("~p could not be parsed as a number", [Num]))) + end, + Key = case Key0 of + "name" -> name; + "mount" -> mount; + "limit" -> limit; + "queue_types" -> queue_types; + _ -> cuttlefish:invalid(lists:flatten(io_lib:format("~p is invalid", [Key0]))) + end, + Value = case Key of + queue_types -> string:split(Value0, ","); + _ -> Value0 + end, + maps:update_with( + Idx, + fun (#{Key := ExistingValue} = Limit) -> + cuttlefish:warn( + io_lib:format("Disk limit ~b has duplicate setting ~ts, " + "using ~tp instead of ~tp", + [Idx, Key, Value, ExistingValue])), + Limit#{Key := Value}; + (Limit) -> + Limit#{Key => Value} + end, #{Key => Value}, Acc); + (Other, _Acc) -> + cuttlefish:invalid( + lists:flatten(io_lib:format("~p is invalid", [Other]))) + end, #{}, Settings), + maps:fold( + fun(_Idx, #{name := Name}, Names) -> + case sets:is_element(Name, Names) of + true -> + cuttlefish:invalid( + lists:flatten(io_lib:format("name ~ts is used by multiple mounts", [Name]))); + false -> + sets:add_element(Name, Names) + end + end, sets:new([{version, 2}]), Ls), + Ls + end +end}. + %% %% Clustering %% ===================== diff --git a/deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets b/deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets index 6471314074e..36cab3e370b 100644 --- a/deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets +++ b/deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets @@ -467,6 +467,26 @@ tcp_listen_options.exit_on_close = false", "total_memory_available_override_value = 1024MB", [{rabbit,[{total_memory_available_override_value, "1024MB"}]}], []}, + {disk_free_limits_per_mount, + "disk_free_limits.1.name = messaging + disk_free_limits.1.mount = /data/queues + disk_free_limits.1.limit = 2GB + disk_free_limits.1.queue_types = classic,quorum + + disk_free_limits.2.name = streaming + disk_free_limits.2.mount = /data/streams + disk_free_limits.2.limit = 2GB + disk_free_limits.2.queue_types = stream", + [{rabbit,[{disk_free_limits, + #{1 => #{name => <<"messaging">>, + mount => "/data/queues", + limit => "2GB", + queue_types => [<<"classic">>, <<"quorum">>]}, + 2 => #{name => <<"streaming">>, + mount => "/data/streams", + limit => "2GB", + queue_types => [<<"stream">>]}}}]}], + []}, {ranch_connection_max, "ranch_connection_max = 999", [{rabbit,[{ranch_connection_max, 999}]}], From b3b5cc4465f7ac37b8602d85317126a4068da3c4 Mon Sep 17 00:00:00 2001 From: Michael Davis Date: Tue, 21 Oct 2025 16:04:09 -0400 Subject: [PATCH 08/18] rabbit_disk_monitor: Monitor per-queue-type mounts --- deps/rabbit/src/rabbit_disk_monitor.erl | 330 +++++++++++++++++++----- 1 file changed, 263 insertions(+), 67 deletions(-) diff --git a/deps/rabbit/src/rabbit_disk_monitor.erl b/deps/rabbit/src/rabbit_disk_monitor.erl index 12f8ae29d29..c4415b2f213 100644 --- a/deps/rabbit/src/rabbit_disk_monitor.erl +++ b/deps/rabbit/src/rabbit_disk_monitor.erl @@ -28,23 +28,38 @@ -export([get_disk_free_limit/0, set_disk_free_limit/1, get_min_check_interval/0, set_min_check_interval/1, get_max_check_interval/0, set_max_check_interval/1, - get_disk_free/0, set_enabled/1]). + get_disk_free/0, get_mount_free/0, set_enabled/1]). -define(SERVER, ?MODULE). 
-define(ETS_NAME, ?MODULE). +-define(MOUNT_ETS_NAME, rabbit_disk_monitor_per_mount). -define(DEFAULT_MIN_DISK_CHECK_INTERVAL, 100). -define(DEFAULT_MAX_DISK_CHECK_INTERVAL, 10000). -define(DEFAULT_DISK_FREE_LIMIT, 50000000). %% 250MB/s i.e. 250kB/ms -define(FAST_RATE, (250 * 1000)). +-record(mount, + {%% name set in configuration + name :: binary(), + %% number set in configuration, used to order the disks in the UI + precedence :: integer(), + %% minimum bytes available + limit :: non_neg_integer(), + %% detected available disk space in bytes + available = 'NaN' :: non_neg_integer() | 'NaN', + %% set of queue types which should be blocked if the limit is exceeded + queue_types :: sets:set(rabbit_queue_type:queue_type())}). + -record(state, { - %% monitor partition on which this directory resides + %% monitor partition on which the data directory resides dir, %% configured limit in bytes limit, %% last known free disk space amount in bytes actual, + %% extra file systems to monitor mapped to the queue types to + mounts = #{} :: mounts(), %% minimum check interval min_interval, %% maximum check interval @@ -67,6 +82,19 @@ -type disk_free_limit() :: integer() | {'absolute', integer()} | string() | {'mem_relative', float() | integer()}. +-type mounts() :: #{file:filename() => #mount{}}. + +%%---------------------------------------------------------------------------- + +%% This needs to wait until the recovery phase so that queue types have a +%% chance to register themselves. +-rabbit_boot_step({monitor_mounts, + [{description, "monitor per-queue-type mounts"}, + {mfa, {gen_server, call, + [?MODULE, monitor_mounts]}}, + {requires, recovery}, + {enables, routing_ready}]}). + %%---------------------------------------------------------------------------- %% Public API %%---------------------------------------------------------------------------- @@ -99,6 +127,28 @@ set_max_check_interval(Interval) -> get_disk_free() -> safe_ets_lookup(disk_free, 'NaN'). +-spec get_mount_free() -> + [#{name := binary(), + available := non_neg_integer() | 'NaN', + limit := pos_integer()}]. +get_mount_free() -> + Ms0 = try + ets:tab2list(?MOUNT_ETS_NAME) + catch + error:badarg -> + [] + end, + Ms = lists:sort( + fun(#mount{precedence = A}, #mount{precedence = B}) -> + %% ascending + A < B + end, Ms0), + [#{name => Name, + available => Available, + limit => Limit} || #mount{name = Name, + available = Available, + limit = Limit} <- Ms]. + -spec set_enabled(string()) -> 'ok'. set_enabled(Enabled) -> gen_server:call(?MODULE, {set_enabled, Enabled}). @@ -112,12 +162,12 @@ start_link(Args) -> gen_server:start_link({local, ?SERVER}, ?MODULE, [Args], []). 
init([Limit]) -> - Dir = dir(), {ok, Retries} = application:get_env(rabbit, disk_monitor_failure_retries), {ok, Interval} = application:get_env(rabbit, disk_monitor_failure_retry_interval), ?ETS_NAME = ets:new(?ETS_NAME, [protected, set, named_table]), - State0 = #state{dir = Dir, - alarmed = false, + ?MOUNT_ETS_NAME = ets:new(?MOUNT_ETS_NAME, [protected, set, named_table, + {keypos, #mount.name}]), + State0 = #state{alarmed = false, enabled = true, limit = Limit, retries = Retries, @@ -166,6 +216,15 @@ handle_call({set_enabled, _Enabled = false}, _From, State = #state{enabled = fal ?LOG_INFO("Free disk space monitor was already disabled"), {reply, ok, State#state{enabled = false}}; +handle_call(monitor_mounts, _From, State) -> + case State of + #state{enabled = true} -> + State1 = State#state{mounts = mounts()}, + {reply, ok, internal_update(State1)}; + #state{enabled = false} -> + {reply, ok, State} + end; + handle_call(_Request, _From, State) -> {noreply, State}. @@ -205,9 +264,6 @@ safe_ets_lookup(Key, Default) -> Default end. -% the partition / drive containing this directory will be monitored -dir() -> rabbit:data_dir(). - set_min_check_interval(MinInterval, State) -> ets:insert(?ETS_NAME, {min_check_interval, MinInterval}), State#state{min_interval = MinInterval}. @@ -224,36 +280,86 @@ set_disk_limits(State, Limit0) -> ets:insert(?ETS_NAME, {disk_free_limit, Limit}), internal_update(State1). -internal_update(State = #state{limit = Limit, - dir = Dir, - alarmed = Alarmed}) -> - CurrentFree = get_disk_free(Dir), +internal_update(#state{limit = DataDirLimit, + dir = Dir, + mounts = Mounts, + alarmed = Alarmed} = State) -> + DiskFree = get_disk_free(State), + DataDirFree = maps:get(Dir, DiskFree, 'NaN'), %% note: 'NaN' is considered to be less than a number - NewAlarmed = CurrentFree < Limit, + NewAlarmed = DataDirFree < DataDirLimit, case {Alarmed, NewAlarmed} of {false, true} -> - emit_update_info("insufficient", CurrentFree, Limit), + emit_update_info("insufficient", DataDirFree, DataDirLimit), rabbit_alarm:set_alarm({{resource_limit, disk, node()}, []}); {true, false} -> - emit_update_info("sufficient", CurrentFree, Limit), + emit_update_info("sufficient", DataDirFree, DataDirLimit), rabbit_alarm:clear_alarm({resource_limit, disk, node()}); _ -> ok end, - ets:insert(?ETS_NAME, {disk_free, CurrentFree}), - State#state{alarmed = NewAlarmed, actual = CurrentFree}. - --spec get_disk_free(file:filename_all()) -> - AvailableBytes :: non_neg_integer() | 'NaN'. -get_disk_free(Dir) -> - case disksup:get_disk_info(Dir) of - [{D, 0, 0, 0, 0}] when D =:= Dir orelse D =:= "none" -> - 'NaN'; - [{_MountPoint, _TotalKiB, AvailableKiB, _Capacity}] -> - AvailableKiB * 1024; - _DiskInfo -> - 'NaN' - end. 
+ ets:insert(?ETS_NAME, {disk_free, DataDirFree}), + + NewMounts = maps:map( + fun(Path, M) -> + Available = maps:get(Path, DiskFree, 'NaN'), + M#mount{available = Available} + end, Mounts), + ets:insert(?MOUNT_ETS_NAME, [M || _Path := M <- NewMounts]), + + AlarmedMs = alarmed_mounts(Mounts), + NewAlarmedMs = alarmed_mounts(NewMounts), + + NewlyClearedMs = sets:subtract(AlarmedMs, NewAlarmedMs), + NewlyAlarmedMs = sets:subtract(NewAlarmedMs, AlarmedMs), + + lists:foreach( + fun(Path) -> + #mount{name = Name, + limit = Limit, + available = Available} = maps:get(Path, NewMounts), + emit_update_info(Name, "insufficient", Available, Limit) + end, lists:sort(sets:to_list(NewlyAlarmedMs))), + %% TODO: rabbit_alarm:set_alarm/1 for affected queue types + lists:foreach( + fun(Path) -> + #mount{name = Name, + limit = Limit, + available = Available} = maps:get(Path, NewMounts), + emit_update_info(Name, "sufficient", Available, Limit) + end, lists:sort(sets:to_list(NewlyClearedMs))), + %% TODO: rabbit_alarm:clear_alarm/1 for affected queue types + + State#state{alarmed = NewAlarmed, + actual = DataDirFree, + mounts = NewMounts}. + +emit_update_info(StateStr, CurrentFree, Limit) -> + ?LOG_INFO( + "Free disk space is ~ts. Free bytes: ~b. Limit: ~b", + [StateStr, CurrentFree, Limit]). +emit_update_info(MountPoint, StateStr, CurrentFree, Limit) -> + ?LOG_INFO( + "Free space of disk '~ts' is ~ts. Free bytes: ~b. Limit: ~b", + [MountPoint, StateStr, CurrentFree, Limit]). + +-spec alarmed_mounts(mounts()) -> sets:set(file:filename()). +alarmed_mounts(Mounts) -> + maps:fold( + fun (Path, #mount{available = Available, + limit = Limit}, Acc) when Available < Limit -> + sets:add_element(Path, Acc); + (_Path, _Mount, Acc) -> + Acc + end, sets:new([{version, 2}]), Mounts). + +-spec get_disk_free(#state{}) -> + #{file:filename() => AvailableB :: non_neg_integer()}. +get_disk_free(#state{dir = DataDir, mounts = Mounts}) -> + #{Mount => AvailableKiB * 1024 || + {Mount, Total, AvailableKiB, Capacity} <- disksup:get_disk_info(), + {Total, AvailableKiB, Capacity} =/= {0, 0, 0}, + Mount =:= DataDir orelse is_map_key(Mount, Mounts)}. interpret_limit({mem_relative, Relative}) when is_number(Relative) -> @@ -269,51 +375,141 @@ interpret_limit(Absolute) -> ?DEFAULT_DISK_FREE_LIMIT end. -emit_update_info(StateStr, CurrentFree, Limit) -> - ?LOG_INFO( - "Free disk space is ~ts. Free bytes: ~b. Limit: ~b", - [StateStr, CurrentFree, Limit]). - start_timer(State) -> State#state{timer = erlang:send_after(interval(State), self(), update)}. -interval(#state{alarmed = true, - max_interval = MaxInterval}) -> - MaxInterval; -interval(#state{actual = 'NaN', - max_interval = MaxInterval}) -> - MaxInterval; -interval(#state{limit = Limit, - actual = Actual, +interval(#state{actual = DataDirAvailable, + limit = DataDirLimit, + mounts = Mounts, min_interval = MinInterval, max_interval = MaxInterval}) -> - IdealInterval = 2 * (Actual - Limit) / ?FAST_RATE, + DataDirGap = case DataDirAvailable of + N when is_integer(N) -> + N - DataDirLimit; + _ -> + 1_000_000_000 + end, + SmallestGap = maps:fold( + fun (_Path, #mount{available = A, limit = L}, Min) + when is_integer(A) -> + erlang:min(A - L, Min); + (_Path, _Mount, Min) -> + Min + end, DataDirGap, Mounts), + IdealInterval = 2 * SmallestGap / ?FAST_RATE, trunc(erlang:max(MinInterval, erlang:min(MaxInterval, IdealInterval))). +-spec mounts() -> mounts(). 
+mounts() -> + case application:get_env(rabbit, disk_free_limits) of + {ok, Limits} -> + maps:fold( + fun(Prec, #{name := Name, + mount := Path, + limit := Limit0, + queue_types := QTs0}, Acc) -> + Res = rabbit_resource_monitor_misc:parse_information_unit( + Limit0), + case Res of + {ok, Limit} -> + {Known, Unknown} = resolve_queue_types(QTs0), + case Unknown of + [_ | _] -> + ?LOG_WARNING( + "Unknown queue types configured for " + "disk '~ts': ~ts", + [Name, lists:join(", ", Unknown)]), + ok; + _ -> + ok + end, + case Known of + [] -> + ?LOG_ERROR("No known queue types " + "configured for disk '~ts'. " + "The disk will not be " + "monitored for free " + "disk space.", [Name]), + Acc; + _ -> + QTs = sets:from_list(Known, + [{version, 2}]), + Mount = #mount{name = Name, + precedence = Prec, + limit = Limit, + queue_types = QTs}, + Acc#{Path => Mount} + end; + {error, parse_error} -> + ?LOG_ERROR("Unable to parse free disk limit " + "'~ts' for disk '~ts'. The disk will " + "not be monitored for free space.", + [Limit0, Name]), + Acc + end + end, #{}, Limits); + undefined -> + #{} + end. + +resolve_queue_types(QTs) -> + resolve_queue_types(QTs, {[], []}). + +resolve_queue_types([], Acc) -> + Acc; +resolve_queue_types([QT | Rest], {Known, Unknown}) -> + case rabbit_registry:lookup_type_module(queue, QT) of + {ok, TypeModule} -> + resolve_queue_types(Rest, {[TypeModule | Known], Unknown}); + {error, not_found} -> + resolve_queue_types(Rest, {Known, [QT | Unknown]}) + end. + enable(#state{retries = 0} = State) -> ?LOG_ERROR("Free disk space monitor failed to start!"), State; -enable(#state{dir = Dir} = State) -> - enable_handle_disk_free(get_disk_free(Dir), State). - -enable_handle_disk_free(DiskFree, State) when is_integer(DiskFree) -> - enable_handle_total_memory(catch vm_memory_monitor:get_total_memory(), DiskFree, State); -enable_handle_disk_free(Error, #state{interval = Interval, retries = Retries} = State) -> - ?LOG_WARNING("Free disk space monitor encountered an error " - "(e.g. failed to parse output from OS tools). " - "Retries left: ~b Error:~n~tp", - [Retries, Error]), - erlang:send_after(Interval, self(), try_enable), - State#state{enabled = false}. - -enable_handle_total_memory(TotalMemory, DiskFree, #state{limit = Limit} = State) when is_integer(TotalMemory) -> - ?LOG_INFO("Enabling free disk space monitoring " - "(disk free space: ~b, total memory: ~b)", [DiskFree, TotalMemory]), - start_timer(set_disk_limits(State, Limit)); -enable_handle_total_memory(Error, _DiskFree, #state{interval = Interval, retries = Retries} = State) -> - ?LOG_WARNING("Free disk space monitor encountered an error " - "retrieving total memory. " - "Retries left: ~b Error:~n~tp", - [Retries, Error]), - erlang:send_after(Interval, self(), try_enable), - State#state{enabled = false}. +enable(#state{dir = undefined, + interval = Interval, + retries = Retries} = State) -> + case resolve_data_dir() of + {ok, MountPoint} -> + enable(State#state{dir = MountPoint}); + {error, Reason} -> + ?LOG_WARNING("Free disk space monitor encounter an error " + "resolving the data directory '~ts'. 
Retries left: " + "~b Error:~n~tp", + [rabbit:data_dir(), Retries, Reason]), + erlang:send_after(Interval, self(), try_enable), + State#state{enabled = false} + end; +enable(#state{dir = Dir, + retries = Retries, + interval = Interval, + limit = Limit} = State) -> + DiskFree = get_disk_free(State), + case vm_memory_monitor:get_total_memory() of + TotalMemory when is_integer(TotalMemory) -> + ?LOG_INFO("Enabling free disk space monitoring (data dir free " + "space: ~b, total memory: ~b)", + [maps:get(Dir, DiskFree, unknown), TotalMemory]), + start_timer(set_disk_limits(State, Limit)); + unknown -> + ?LOG_WARNING("Free disk space monitor could not determine total " + "memory. Retries left: ~b", [Retries]), + erlang:send_after(Interval, self(), try_enable), + State#state{enabled = false} + end. + +resolve_data_dir() -> + case disksup:get_disk_info(rabbit:data_dir()) of + [{"none", 0, 0, 0}] -> + {error, disksup_not_available}; + [{MountPoint, 0, 0, 0}] -> + {error, {cannot_determine_space, MountPoint}}; + [{MountPoint, _TotalKiB, _AvailableKiB, _Capacity}] -> + {ok, MountPoint}; + [] -> + {error, no_disk_info}; + [_ | _] = Infos -> + {error, {multiple_disks, length(Infos)}} + end. From 40d2d5e65425f3ebc0d54d32dc0e038e77b87c62 Mon Sep 17 00:00:00 2001 From: Michael Davis Date: Thu, 23 Oct 2025 12:56:29 -0400 Subject: [PATCH 09/18] rabbit_alarm: Add a helper to format resource alarm sources --- deps/rabbit/src/rabbit_alarm.erl | 30 +++++++++++++++--------------- deps/rabbit/src/rabbit_reader.erl | 9 +++------ 2 files changed, 18 insertions(+), 21 deletions(-) diff --git a/deps/rabbit/src/rabbit_alarm.erl b/deps/rabbit/src/rabbit_alarm.erl index 84c5188f159..52cd78db9e1 100644 --- a/deps/rabbit/src/rabbit_alarm.erl +++ b/deps/rabbit/src/rabbit_alarm.erl @@ -24,6 +24,7 @@ -export([start_link/0, start/0, stop/0, register/2, set_alarm/1, clear_alarm/1, get_alarms/0, get_alarms/1, get_local_alarms/0, get_local_alarms/1, on_node_up/1, on_node_down/1, + format_resource_alarm_source/1, format_as_map/1, format_as_maps/1, is_local/1]). -export([init/1, handle_call/2, handle_event/2, handle_info/2, @@ -124,25 +125,23 @@ is_local({file_descriptor_limit, _}) -> true; is_local({{resource_limit, _Resource, Node}, _}) when Node =:= node() -> true; is_local({{resource_limit, _Resource, Node}, _}) when Node =/= node() -> false. +-spec format_resource_alarm_source(resource_alarm_source()) -> iodata(). +format_resource_alarm_source(disk) -> + ?DISK_SPACE_RESOURCE; +format_resource_alarm_source(memory) -> + ?MEMORY_RESOURCE; +format_resource_alarm_source(Unknown) -> + io_lib:format("~w", [Unknown]). + -spec format_as_map(alarm()) -> #{binary() => term()}. format_as_map(file_descriptor_limit) -> #{ <<"resource">> => ?FILE_DESCRIPTOR_RESOURCE, <<"node">> => node() }; -format_as_map({resource_limit, disk, Node}) -> - #{ - <<"resource">> => ?DISK_SPACE_RESOURCE, - <<"node">> => Node - }; -format_as_map({resource_limit, memory, Node}) -> - #{ - <<"resource">> => ?MEMORY_RESOURCE, - <<"node">> => Node - }; format_as_map({resource_limit, Limit, Node}) -> #{ - <<"resource">> => rabbit_data_coercion:to_binary(Limit), + <<"resource">> => iolist_to_binary(format_resource_alarm_source(Limit)), <<"node">> => Node }. 
@@ -237,7 +236,7 @@ handle_event({node_down, Node}, #alarms{alarmed_nodes = AN} = State) -> AlarmsForDeadNode = maps:get(Node, AN, []), {ok, lists:foldr(fun(Source, AccState) -> ?LOG_WARNING("~ts resource limit alarm cleared for dead node ~tp", - [Source, Node]), + [format_resource_alarm_source(Source), Node]), maybe_alert(fun map_unappend/3, Node, Source, false, AccState) end, State, AlarmsForDeadNode)}; @@ -283,7 +282,8 @@ maybe_alert(UpdateFun, Node, Source, WasAlertAdded, end, AN1), case StillHasAlerts of true -> ok; - false -> ?LOG_WARNING("~ts resource limit alarm cleared across the cluster", [Source]) + false -> ?LOG_WARNING("~ts resource limit alarm cleared across the cluster", + [format_resource_alarm_source(Source)]) end, Alert = {WasAlertAdded, StillHasAlerts, Node}, case node() of @@ -326,7 +326,7 @@ handle_set_resource_alarm(Source, Node, State) -> "**********************************************************~n" "*** Publishers will be blocked until this alarm clears ***~n" "**********************************************************~n", - [Source, Node]), + [format_resource_alarm_source(Source), Node]), {ok, maybe_alert(fun map_append/3, Node, Source, true, State)}. handle_set_alarm({file_descriptor_limit, []}, State) -> @@ -342,7 +342,7 @@ handle_set_alarm(Alarm, State) -> handle_clear_resource_alarm(Source, Node, State) -> ?LOG_WARNING("~ts resource limit alarm cleared on node ~tp", - [Source, Node]), + [format_resource_alarm_source(Source), Node]), {ok, maybe_alert(fun map_unappend/3, Node, Source, false, State)}. handle_clear_alarm(file_descriptor_limit, State) -> diff --git a/deps/rabbit/src/rabbit_reader.erl b/deps/rabbit/src/rabbit_reader.erl index fe3e74c7b92..d43a7f3a9fc 100644 --- a/deps/rabbit/src/rabbit_reader.erl +++ b/deps/rabbit/src/rabbit_reader.erl @@ -1727,14 +1727,11 @@ send_error_on_channel0_and_close(Channel, Protocol, Reason, State) -> blocked_by_message(#throttle{blocked_by = Reasons}) -> %% we don't want to report internal flow as a reason here since %% it is entirely transient - Reasons1 = sets:del_element(flow, Reasons), - RStr = string:join([format_blocked_by(R) || R <- sets:to_list(Reasons1)], " & "), + RStr = lists:join([rabbit_alarm:format_resource_alarm_source(R) || + {resource, R} <- sets:to_list(Reasons)], + " & "), list_to_binary(rabbit_misc:format("low on ~ts", [RStr])). -format_blocked_by({resource, memory}) -> "memory"; -format_blocked_by({resource, disk}) -> "disk"; -format_blocked_by({resource, disc}) -> "disk". - update_last_blocked_at(Throttle) -> Throttle#throttle{last_blocked_at = erlang:monotonic_time()}. From 399ce7e56509ff8a7d37e8da30f18a0e395639fe Mon Sep 17 00:00:00 2001 From: Michael Davis Date: Thu, 23 Oct 2025 16:50:58 -0400 Subject: [PATCH 10/18] Set per-queue-type disk alarms for configured mounts This introduces a new variant of `rabbit_alarm:resource_alarm_source()`: `{disk, QueueType}` which triggers when the configured mount for queue type(s) fall under their limit of available space. --- deps/rabbit/src/rabbit_alarm.erl | 7 ++++- deps/rabbit/src/rabbit_disk_monitor.erl | 28 +++++++++++++++++-- ...theus_rabbitmq_alarm_metrics_collector.erl | 17 +++++++++-- 3 files changed, 46 insertions(+), 6 deletions(-) diff --git a/deps/rabbit/src/rabbit_alarm.erl b/deps/rabbit/src/rabbit_alarm.erl index 52cd78db9e1..f4af0dae2d7 100644 --- a/deps/rabbit/src/rabbit_alarm.erl +++ b/deps/rabbit/src/rabbit_alarm.erl @@ -49,7 +49,10 @@ -export_type([alarm/0]). -type local_alarm() :: 'file_descriptor_limit'. 
--type resource_alarm_source() :: 'disk' | 'memory'. +-type resource_alarm_source() :: + memory + | disk + | {disk, rabbit_queue_type:queue_type()}. -type resource_alarm() :: {resource_limit, resource_alarm_source(), node()}. -type alarm() :: local_alarm() | resource_alarm(). -type resource_alert() :: {WasAlarmSetForNode :: boolean(), @@ -128,6 +131,8 @@ is_local({{resource_limit, _Resource, Node}, _}) when Node =/= node() -> false. -spec format_resource_alarm_source(resource_alarm_source()) -> iodata(). format_resource_alarm_source(disk) -> ?DISK_SPACE_RESOURCE; +format_resource_alarm_source({disk, QueueType}) -> + io_lib:format("disk for queue type '~ts'", [QueueType]); format_resource_alarm_source(memory) -> ?MEMORY_RESOURCE; format_resource_alarm_source(Unknown) -> diff --git a/deps/rabbit/src/rabbit_disk_monitor.erl b/deps/rabbit/src/rabbit_disk_monitor.erl index c4415b2f213..505711abe20 100644 --- a/deps/rabbit/src/rabbit_disk_monitor.erl +++ b/deps/rabbit/src/rabbit_disk_monitor.erl @@ -308,10 +308,14 @@ internal_update(#state{limit = DataDirLimit, ets:insert(?MOUNT_ETS_NAME, [M || _Path := M <- NewMounts]), AlarmedMs = alarmed_mounts(Mounts), + AlarmedQTs = alarmed_queue_types(Mounts), NewAlarmedMs = alarmed_mounts(NewMounts), + NewAlarmedQTs = alarmed_queue_types(NewMounts), NewlyClearedMs = sets:subtract(AlarmedMs, NewAlarmedMs), + NewlyClearedQTs = sets:subtract(AlarmedQTs, NewAlarmedQTs), NewlyAlarmedMs = sets:subtract(NewAlarmedMs, AlarmedMs), + NewlyAlarmedQTs = sets:subtract(NewAlarmedQTs, AlarmedQTs), lists:foreach( fun(Path) -> @@ -320,7 +324,11 @@ internal_update(#state{limit = DataDirLimit, available = Available} = maps:get(Path, NewMounts), emit_update_info(Name, "insufficient", Available, Limit) end, lists:sort(sets:to_list(NewlyAlarmedMs))), - %% TODO: rabbit_alarm:set_alarm/1 for affected queue types + lists:foreach( + fun(QT) -> + Alarm = {resource_limit, {disk, QT}, node()}, + rabbit_alarm:set_alarm({Alarm, []}) + end, lists:sort(sets:to_list(NewlyAlarmedQTs))), lists:foreach( fun(Path) -> #mount{name = Name, @@ -328,7 +336,11 @@ internal_update(#state{limit = DataDirLimit, available = Available} = maps:get(Path, NewMounts), emit_update_info(Name, "sufficient", Available, Limit) end, lists:sort(sets:to_list(NewlyClearedMs))), - %% TODO: rabbit_alarm:clear_alarm/1 for affected queue types + lists:foreach( + fun(QT) -> + Alarm = {resource_limit, {disk, QT}, node()}, + rabbit_alarm:clear_alarm(Alarm) + end, lists:sort(sets:to_list(NewlyClearedQTs))), State#state{alarmed = NewAlarmed, actual = DataDirFree, @@ -353,6 +365,18 @@ alarmed_mounts(Mounts) -> Acc end, sets:new([{version, 2}]), Mounts). +-spec alarmed_queue_types(mounts()) -> + sets:set(module()). +alarmed_queue_types(MountPoints) -> + maps:fold( + fun (_Path, #mount{available = Available, + limit = Limit, + queue_types = QTs}, Acc) when Available < Limit -> + sets:union(QTs, Acc); + (_Path, _Mount, Acc) -> + Acc + end, sets:new([{version, 2}]), MountPoints). + -spec get_disk_free(#state{}) -> #{file:filename() => AvailableB :: non_neg_integer()}. 
get_disk_free(#state{dir = DataDir, mounts = Mounts}) -> diff --git a/deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_alarm_metrics_collector.erl b/deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_alarm_metrics_collector.erl index 284ff73c9fc..911b9add6fd 100644 --- a/deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_alarm_metrics_collector.erl +++ b/deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_alarm_metrics_collector.erl @@ -35,11 +35,13 @@ collect_mf(_Registry, Callback) -> Alarms = rabbit_alarm:get_local_alarms(500), %% TODO: figure out timeout ActiveAlarms = lists:foldl(fun ({{resource_limit, disk, _}, _}, Acc) -> - maps:put(disk_limit, 1, Acc); + Acc#{disk_limit => 1}; + ({{resource_limit, {disk, QT}, _}, _}, Acc) -> + Acc#{{disk, QT} => 1}; ({{resource_limit, memory, _}, _}, Acc) -> - maps:put(memory_limit, 1, Acc); + Acc#{memory_limit => 1}; ({file_descriptor_limit, _}, Acc) -> - maps:put(file_descriptor_limit, 1, Acc) + Acc#{file_descriptor_limit => 1} end, #{}, Alarms), @@ -58,6 +60,15 @@ collect_mf(_Registry, Callback) -> <<"is 1 if VM memory watermark alarm is in effect">>, untyped, [untyped_metric(maps:get(memory_limit, ActiveAlarms, 0))])), + + Callback(create_mf(?METRIC_NAME(<<"queue_type_free_disk_space_watermark">>), + <<"is 1 if the queue type disk-space alarm is in effect">>, + untyped, + [prometheus_model_helpers:untyped_metric( + #{queue_type => QT}, + maps:get({disk, QT}, ActiveAlarms, 0)) || + {_, QT} <- rabbit_registry:lookup_all(queue)])), + ok catch exit:{timeout, _} -> From 73e7427643ccc0f411e8ea058a657d6db5136f73 Mon Sep 17 00:00:00 2001 From: Michael Davis Date: Thu, 23 Oct 2025 17:53:33 -0400 Subject: [PATCH 11/18] AMQP 0-9-1: Handle per-queue-type disk alarms This covers both network and direct connections for 0-9-1. We store a set of the queue types which have been published into on both a channel and connection level since blocking is done on the connection level but only the channel knows what queue types have been published. Then when the published queue types or the set of alarms changes, the connection evaluates whether it is affected by the alarm. If not it may publish but once a channel publishes to an alarmed queue type the connection then blocks until the channel exits or the alarm clears. --- deps/amqp_client/src/amqp_gen_connection.erl | 33 ++++++--- deps/rabbit/src/rabbit_amqqueue.erl | 11 ++- deps/rabbit/src/rabbit_channel.erl | 17 ++++- deps/rabbit/src/rabbit_reader.erl | 73 ++++++++++++++++---- 4 files changed, 108 insertions(+), 26 deletions(-) diff --git a/deps/amqp_client/src/amqp_gen_connection.erl b/deps/amqp_client/src/amqp_gen_connection.erl index 886a06d45f0..15e39fdd46d 100644 --- a/deps/amqp_client/src/amqp_gen_connection.erl +++ b/deps/amqp_client/src/amqp_gen_connection.erl @@ -32,6 +32,7 @@ %% connection.block, connection.unblock handler block_handler, blocked_by = sets:new([{version, 2}]), + queue_types_published = sets:new([{version, 2}]), closing = false %% #closing{} | false }). 
@@ -214,7 +215,6 @@ handle_cast({register_blocked_handler, HandlerPid}, {noreply, State1}; handle_cast({conserve_resources, Source, Conserve}, #state{blocked_by = BlockedBy} = State) -> - WasNotBlocked = sets:is_empty(BlockedBy), BlockedBy1 = case Conserve of true -> sets:add_element(Source, BlockedBy); @@ -222,14 +222,11 @@ handle_cast({conserve_resources, Source, Conserve}, sets:del_element(Source, BlockedBy) end, State1 = State#state{blocked_by = BlockedBy1}, - case sets:is_empty(BlockedBy1) of - true -> - handle_method(#'connection.unblocked'{}, State1); - false when WasNotBlocked -> - handle_method(#'connection.blocked'{}, State1); - false -> - {noreply, State1} - end. + maybe_block(State, State1); +handle_cast({channel_published_to_queue_type, _ChPid, QT}, + #state{queue_types_published = QTs} = State) -> + State1 = State#state{queue_types_published = sets:add_element(QT, QTs)}, + maybe_block(State, State1). %% @private handle_info({'DOWN', _, process, BlockHandler, Reason}, @@ -274,6 +271,24 @@ i(Item, #state{module = Mod, module_state = MState}) -> Mod:i(Item, MState). register_blocked_handler(Pid, HandlerPid) -> gen_server:cast(Pid, {register_blocked_handler, HandlerPid}). +maybe_block(State0, State1) -> + WasBlocked = should_block(State0), + case should_block(State1) of + true when not WasBlocked -> + handle_method(#'connection.blocked'{}, State1); + false when WasBlocked -> + handle_method(#'connection.unblocked'{}, State1); + _ -> + {noreply, State1} + end. + +should_block(#state{blocked_by = BlockedBy, queue_types_published = QTs}) -> + lists:any(fun ({disk, QT}) -> + sets:is_element(QT, QTs); + (_Resource) -> + true + end, sets:to_list(BlockedBy)). + %%--------------------------------------------------------------------------- %% Command handling %%--------------------------------------------------------------------------- diff --git a/deps/rabbit/src/rabbit_amqqueue.erl b/deps/rabbit/src/rabbit_amqqueue.erl index 1c8a706a627..f35d6574e9f 100644 --- a/deps/rabbit/src/rabbit_amqqueue.erl +++ b/deps/rabbit/src/rabbit_amqqueue.erl @@ -67,7 +67,7 @@ -export([deactivate_limit_all/2]). -export([prepend_extra_bcc/1]). --export([queue/1, queue_names/1]). +-export([queue/1, queue_names/1, queue_types/1]). -export([kill_queue/2, kill_queue/3, kill_queue_hard/2, kill_queue_hard/3]). -export([delete_transient_queues_on_node/1]). @@ -2097,6 +2097,15 @@ queue_names(Queues) -> amqqueue:get_name(Q) end, Queues). +-spec queue_types([Q | {Q, route_infos()}]) -> + [rabbit_queue_type:queue_type()] when Q :: amqqueue:target(). +queue_types(Queues) -> + lists:map(fun({Q, RouteInfos}) when is_map(RouteInfos) -> + amqqueue:get_type(Q); + (Q) -> + amqqueue:get_type(Q) + end, Queues). + -spec lookup_extra_bcc(amqqueue:target(), binary()) -> [amqqueue:target()]. lookup_extra_bcc(Q, BCCName) -> diff --git a/deps/rabbit/src/rabbit_channel.erl b/deps/rabbit/src/rabbit_channel.erl index 3c7c865fcb0..3d592da5c66 100644 --- a/deps/rabbit/src/rabbit_channel.erl +++ b/deps/rabbit/src/rabbit_channel.erl @@ -169,6 +169,8 @@ delivery_flow :: flow | noflow, interceptor_state, queue_states, + queue_types_published = sets:new([{version, 2}]) :: + sets:set(rabbit_queue_type:queue_type()), tick_timer, publishing_mode = false :: boolean() }). 
@@ -2092,14 +2094,16 @@ deliver_to_queues(XName, {ok, QueueStates, Actions} -> rabbit_global_counters:messages_routed(amqp091, length(Qs)), QueueNames = rabbit_amqqueue:queue_names(Qs), + QueueTypes = rabbit_amqqueue:queue_types(Qs), %% NB: the order here is important since basic.returns must be %% sent before confirms. ok = process_routing_mandatory(Mandatory, RoutedToQueues, Message, XName, State0), MsgSeqNo = maps:get(correlation, Options, undefined), State1 = process_routing_confirm(MsgSeqNo, QueueNames, XName, State0), + State2 = notify_published_queue_types(QueueTypes, State1), %% Actions must be processed after registering confirms as actions may %% contain rejections of publishes - State = handle_queue_actions(Actions, State1#ch{queue_states = QueueStates}), + State = handle_queue_actions(Actions, State2#ch{queue_states = QueueStates}), case rabbit_event:stats_level(State, #ch.stats_timer) of fine -> ?INCR_STATS(exchange_stats, XName, 1, publish), @@ -2165,6 +2169,17 @@ process_routing_confirm(MsgSeqNo, QRefs, XName, State) State#ch{unconfirmed = rabbit_confirms:insert(MsgSeqNo, QRefs, XName, State#ch.unconfirmed)}. +notify_published_queue_types(QueueTypes, + #ch{cfg = #conf{conn_pid = ConnPid}, + queue_types_published = QTs0} = State) -> + QTs = sets:union(QTs0, sets:from_list(QueueTypes, [{version, 2}])), + sets:fold( + fun(QT, ok) -> + gen_server:cast(ConnPid, + {channel_published_to_queue_type, self(), QT}) + end, ok, sets:subtract(QTs, QTs0)), + State#ch{queue_types_published = QTs}. + confirm(MsgSeqNos, QRef, State = #ch{unconfirmed = UC}) -> %% NOTE: if queue name does not exist here it's likely that the ref also %% does not exist in unconfirmed messages. diff --git a/deps/rabbit/src/rabbit_reader.erl b/deps/rabbit/src/rabbit_reader.erl index d43a7f3a9fc..0f5c517fd76 100644 --- a/deps/rabbit/src/rabbit_reader.erl +++ b/deps/rabbit/src/rabbit_reader.erl @@ -113,7 +113,14 @@ %% a set of the reasons why we are %% blocked: {resource, memory}, {resource, disk}. %% More reasons can be added in the future. - blocked_by, + blocked_by = sets:new([{version, 2}]) :: + sets:set(flow | {resource, + rabbit_alarm:resource_alarm_source()}), + %% the set of queue types which have been published to + %% by channels on this connection, used for per-queue + %% type disk alarm blocking + queue_types_published = #{} :: #{ChannelPid :: pid() => + sets:set(rabbit_queue_type:queue_type())}, %% true if received any publishes, false otherwise %% note that this will also be true when connection is %% already blocked @@ -335,7 +342,6 @@ start_connection(Parent, HelperSups, RanchRef, Deb, Sock) -> throttle = #throttle{ last_blocked_at = never, should_block = false, - blocked_by = sets:new([{version, 2}]), connection_blocked_message_sent = false }, proxy_socket = rabbit_net:maybe_get_proxy_socket(Sock)}, @@ -677,6 +683,14 @@ handle_other({'$gen_cast', {force_event_refresh, Ref}}, State) handle_other({'$gen_cast', {force_event_refresh, _Ref}}, State) -> %% Ignore, we will emit a created event once we start running. 
State; +handle_other({'$gen_cast', {channel_published_to_queue_type, ChPid, QT}}, + #v1{throttle = Throttle0} = State0) -> + QTs = maps:update_with( + ChPid, fun(ChQTs) -> sets:add_element(QT, ChQTs) end, + sets:from_list([QT], [{version, 2}]), + Throttle0#throttle.queue_types_published), + Throttle = Throttle0#throttle{queue_types_published = QTs}, + control_throttle(State0#v1{throttle = Throttle}); handle_other(ensure_stats, State) -> ensure_stats_timer(State); handle_other(emit_stats, State) -> @@ -1007,14 +1021,21 @@ is_over_node_channel_limit() -> end end. -channel_cleanup(ChPid, State = #v1{channel_count = ChannelCount}) -> +channel_cleanup(ChPid, State = #v1{channel_count = ChannelCount, + throttle = Throttle0} = State) -> case get({ch_pid, ChPid}) of - undefined -> {undefined, State}; - {Channel, MRef} -> credit_flow:peer_down(ChPid), - erase({channel, Channel}), - erase({ch_pid, ChPid}), - erlang:demonitor(MRef, [flush]), - {Channel, State#v1{channel_count = ChannelCount - 1}} + undefined -> + {undefined, State}; + {Channel, MRef} -> + credit_flow:peer_down(ChPid), + erase({channel, Channel}), + erase({ch_pid, ChPid}), + erlang:demonitor(MRef, [flush]), + QT = maps:remove(ChPid, + Throttle0#throttle.queue_types_published), + Throttle = Throttle0#throttle{queue_types_published = QT}, + {Channel, State#v1{channel_count = ChannelCount - 1, + throttle = Throttle}} end. all_channels() -> [ChPid || {{ch_pid, ChPid}, _ChannelMRef} <- get()]. @@ -1738,22 +1759,44 @@ update_last_blocked_at(Throttle) -> connection_blocked_message_sent( #throttle{connection_blocked_message_sent = BS}) -> BS. -should_send_blocked(Throttle = #throttle{blocked_by = Reasons}) -> +should_send_blocked(Throttle) -> should_block(Throttle) andalso - sets:size(sets:del_element(flow, Reasons)) =/= 0 + do_throttle_reasons_apply(Throttle) andalso not connection_blocked_message_sent(Throttle). -should_send_unblocked(Throttle = #throttle{blocked_by = Reasons}) -> +should_send_unblocked(Throttle) -> connection_blocked_message_sent(Throttle) andalso - sets:size(sets:del_element(flow, Reasons)) == 0. + not do_throttle_reasons_apply(Throttle). + +do_throttle_reasons_apply(#throttle{blocked_by = Reasons} = Throttle) -> + lists:any( + fun ({resource, disk}) -> + true; + ({resource, memory}) -> + true; + ({resource, {disk, QT}}) -> + has_published_to_queue_type(QT, Throttle); + (_) -> + %% Flow control should not send connection.blocked + false + end, sets:to_list(Reasons)). + +has_published_to_queue_type(QT, #throttle{queue_types_published = QTs}) -> + rabbit_misc:maps_any( + fun(_ChPid, ChQT) -> sets:is_element(QT, ChQT) end, QTs). %% Returns true if we have a reason to block %% this connection. -has_reasons_to_block(#throttle{blocked_by = Reasons}) -> - sets:size(Reasons) > 0. +has_reasons_to_block(#throttle{blocked_by = Reasons} = Throttle) -> + lists:any( + fun ({resource, {disk, QType}}) -> + has_published_to_queue_type(QType, Throttle); + (_) -> + true + end, sets:to_list(Reasons)). is_blocked_by_flow(#throttle{blocked_by = Reasons}) -> sets:is_element(flow, Reasons). From be380c2f8cd9b963ed1ebef6bab75ab959d3e82d Mon Sep 17 00:00:00 2001 From: Michael Davis Date: Thu, 30 Oct 2025 16:58:59 -0400 Subject: [PATCH 12/18] prometheus: Add core metrics per-mount for free space and limit This adds two gauge metrics which are emitted per configured mount, one for available bytes and the other for the low watermark. The label `"disk="` is attached to both gauges to distinguish which mount the gauge applies to. 
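For illustration only (the mount name and numbers below are made up rather
than taken from a real node), a scrape of the Prometheus endpoint for a mount
configured under the name "streaming" is expected to include lines of the
form:

    rabbitmq_mount_space_available_bytes{disk="streaming"} 123456789012
    rabbitmq_mount_space_available_limit_bytes{disk="streaming"} 2000000000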
---
 deps/rabbitmq_prometheus/metrics.md                        |  9 +++++++++
 .../prometheus_rabbitmq_core_metrics_collector.erl         | 10 ++++++++++
 2 files changed, 19 insertions(+)

diff --git a/deps/rabbitmq_prometheus/metrics.md b/deps/rabbitmq_prometheus/metrics.md
index 7f61b0d3af9..980d8292925 100644
--- a/deps/rabbitmq_prometheus/metrics.md
+++ b/deps/rabbitmq_prometheus/metrics.md
@@ -136,6 +136,15 @@ These metrics are specific to the stream protocol.
 | rabbitmq_process_resident_memory_bytes | Memory used in bytes |
 | rabbitmq_resident_memory_limit_bytes | Memory high watermark in bytes |
 
+### Per-mount disk space
+
+| Metric | Description |
+| --- | --- |
+| rabbitmq_mount_space_available_bytes | Disk space available in bytes on configured mount |
+| rabbitmq_mount_space_available_limit_bytes | Disk space available low watermark in bytes on configured mount |
+
+Both metrics carry a `disk` label identifying the configured mount name that was measured.
+
 ### Connections
 
 | Metric | Description |
diff --git a/deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_core_metrics_collector.erl b/deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_core_metrics_collector.erl
index 7f6ed70d56d..a508a1d775f 100644
--- a/deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_core_metrics_collector.erl
+++ b/deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_core_metrics_collector.erl
@@ -85,6 +85,10 @@
         {2, undefined, erlang_net_ticktime_seconds, gauge, "Inter-node heartbeat interval", net_ticktime},
         {2, ?MILLISECOND, erlang_uptime_seconds, gauge, "Node uptime", uptime}
     ]},
+    {mount_metrics, [
+        {2, undefined, mount_space_available_bytes, gauge, "Disk space available in bytes on configured mount"},
+        {3, undefined, mount_space_available_limit_bytes, gauge, "Disk space available low watermark in bytes on configured mount"}
+    ]},
 
     {node_persister_metrics, [
         {2, undefined, io_read_ops_total, counter, "Total number of I/O read operations", io_read_count},
@@ -856,6 +860,12 @@ get_data(exchange_names, _, _) ->
               Label = <<"vhost=\"", VHost/binary, "\",exchange=\"", Name/binary, "\",type=\"", (atom_to_binary(EType))/binary, "\"">>,
               [{Label, 1}|Acc]
       end, [], rabbit_exchange:list());
+get_data(mount_metrics, _, _) ->
+    [{<<"disk=\"", Name/binary, "\"">>, Available, Limit}
+     || #{name := Name,
+          available := Available,
+          limit := Limit} <- rabbit_disk_monitor:get_mount_free(),
+        Available =/= 'NaN'];
 get_data(Table, _, _) ->
     ets:tab2list(Table).
 
From 984e841e76872ec42e3d74ae09e0028e5f04951a Mon Sep 17 00:00:00 2001
From: Michael Davis
Date: Fri, 31 Oct 2025 11:51:09 -0400
Subject: [PATCH 13/18] management: Expose current mount stats in API and UI

This adds the configured mounts, if there are any, to the API JSON response
for the `/api/nodes` and `/api/nodes/{name}` endpoints and to the overview
and node-detail UI pages.

Time series data is not collected for these metrics; that should be scraped
from the Prometheus endpoint instead.
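As a rough sketch of the payload shape (the key names follow this patch while
the mount name and numbers are invented), each node object returned by
`GET /api/nodes` and `GET /api/nodes/{name}` gains a `mount_stats` array along
these lines:

    "mount_stats": [
      {"name": "streaming", "available": 123456789012, "limit": 2000000000}
    ]

The exact set of keys per entry mirrors whatever
`rabbit_disk_monitor:get_mount_free/0` returns for that mount.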
--- .../rabbitmq_management/priv/www/js/global.js | 3 +- .../priv/www/js/tmpl/node.ejs | 16 ++++++++++ .../priv/www/js/tmpl/overview.ejs | 32 +++++++++++++++++++ .../src/rabbit_mgmt_db.erl | 3 +- .../src/rabbit_mgmt_data.erl | 13 ++++---- 5 files changed, 59 insertions(+), 8 deletions(-) diff --git a/deps/rabbitmq_management/priv/www/js/global.js b/deps/rabbitmq_management/priv/www/js/global.js index 60715bbb449..e346bd75f02 100644 --- a/deps/rabbitmq_management/priv/www/js/global.js +++ b/deps/rabbitmq_management/priv/www/js/global.js @@ -141,7 +141,8 @@ var ALL_COLUMNS = {'Statistics': [['file_descriptors', 'File descriptors', true], ['erlang_processes', 'Erlang processes', true], ['memory', 'Memory', true], - ['disk_space', 'Disk space', true]], + ['disk_space', 'Disk space', true], + ['mount_space', 'Other disks', true]], 'General': [['uptime', 'Uptime', true], ['cores', 'Cores', true], ['info', 'Info', true], diff --git a/deps/rabbitmq_management/priv/www/js/tmpl/node.ejs b/deps/rabbitmq_management/priv/www/js/tmpl/node.ejs index e1739b9415f..e67c71f1d20 100644 --- a/deps/rabbitmq_management/priv/www/js/tmpl/node.ejs +++ b/deps/rabbitmq_management/priv/www/js/tmpl/node.ejs @@ -131,6 +131,22 @@ <% } %> +<% + for (var i = 0; i < node.mount_stats.length; i++) { + var mount = node.mount_stats[i]; +%> + + + <%= fmt_string(mount.name) %> disk space + + +<%= node_stat_bar('available', 'limit', 'low watermark', mount, fmt_bytes_axis, + mount.available < mount.limit ? 'red' : 'green', + mount.available < mount.limit ? 'disk_free-alarm' : null, + true) %> + + +<% } %> diff --git a/deps/rabbitmq_management/priv/www/js/tmpl/overview.ejs b/deps/rabbitmq_management/priv/www/js/tmpl/overview.ejs index ac152cbfc67..f374660f40e 100644 --- a/deps/rabbitmq_management/priv/www/js/tmpl/overview.ejs +++ b/deps/rabbitmq_management/priv/www/js/tmpl/overview.ejs @@ -93,6 +93,24 @@ <% if (show_column('overview', 'disk_space')) { %> Disk space <% } %> + <% if (show_column('overview', 'mount_space')) { %> + <% + var other_disk_names = []; + var unique_disk_names = {}; + for (var i = 0; i < nodes.length; i++) { + for (var j = 0; j < nodes[i].mount_stats.length; j++) { + var name = nodes[i].mount_stats[j].name; + if (!Object.hasOwnProperty(unique_disk_names, name)) { + unique_disk_names[name] = true; + other_disk_names.push(name); + } + } + } + %> + <% for (var i = 0; i < other_disk_names.length; i++) { %> + <%= fmt_string(other_disk_names[i]) %> disk space + <% } %> + <% } %> <% if (show_column('overview', 'uptime')) { %> Uptime <% } %> @@ -180,6 +198,20 @@ <% } %> <% } %> + <% if (show_column('overview', 'mount_space')) { %> + <% for (var i = 0; i < other_disk_names.length; i++) { + var mount = node.mount_stats.find((m) => m.name == other_disk_names[i]); + %> + + <% if (mount) { %> + <%= node_stat_bar('available', 'limit', 'low watermark', + mount, fmt_bytes_axis, + mount.available < mount.limit ? 
'red' : 'green', + null, true) %> + <% } %> + + <% } %> + <% } %> <% if (show_column('overview', 'uptime')) { %> <%= fmt_uptime(node.uptime) %> <% } %> diff --git a/deps/rabbitmq_management/src/rabbit_mgmt_db.erl b/deps/rabbitmq_management/src/rabbit_mgmt_db.erl index 890022a1aab..f6fe6e96549 100644 --- a/deps/rabbitmq_management/src/rabbit_mgmt_db.erl +++ b/deps/rabbitmq_management/src/rabbit_mgmt_db.erl @@ -661,7 +661,8 @@ node_stats(Ranges, Objs, Interval) -> StatsD = [{cluster_links, NodeNodeStats}], MgmtStats = maps:get(mgmt_stats, NData), Details = augment_details(Obj, []), % augmentation needs to be node local - combine(Props, Obj) ++ Details ++ Stats ++ StatsD ++ MgmtStats + MountStats = [{mount_stats, maps:get(mount_stats, NData, [])}], + combine(Props, Obj) ++ Details ++ Stats ++ StatsD ++ MgmtStats ++ MountStats end || Obj <- Objs]. combine(New, Old) -> diff --git a/deps/rabbitmq_management_agent/src/rabbit_mgmt_data.erl b/deps/rabbitmq_management_agent/src/rabbit_mgmt_data.erl index 0e66b3e1de1..7cfc5572f12 100644 --- a/deps/rabbitmq_management_agent/src/rabbit_mgmt_data.erl +++ b/deps/rabbitmq_management_agent/src/rabbit_mgmt_data.erl @@ -108,14 +108,15 @@ vhost_data(Ranges, Id) -> node_data(Ranges, Id) -> maps:from_list( - [{mgmt_stats, mgmt_queue_length_stats(Id)}] ++ - [{node_node_metrics, node_node_metrics()}] ++ - node_raw_detail_stats_data(Ranges, Id) ++ - [raw_message_data(node_coarse_stats, + [{mgmt_stats, mgmt_queue_length_stats(Id)}, + {node_node_metrics, node_node_metrics()}, + {node_stats, lookup_element(node_stats, Id)}, + {mount_stats, [maps:to_list(M) || M <- rabbit_disk_monitor:get_mount_free()]}, + raw_message_data(node_coarse_stats, pick_range(coarse_node_stats, Ranges), Id), raw_message_data(node_persister_stats, - pick_range(coarse_node_stats, Ranges), Id), - {node_stats, lookup_element(node_stats, Id)}] ++ + pick_range(coarse_node_stats, Ranges), Id)] ++ + node_raw_detail_stats_data(Ranges, Id) ++ node_connection_churn_rates_data(Ranges, Id)). overview_data(_Pid, User, Ranges, VHosts) -> From c67693e9710cccfdd35564da79c140bf5263f5c8 Mon Sep 17 00:00:00 2001 From: Michael Davis Date: Fri, 31 Oct 2025 12:33:56 -0400 Subject: [PATCH 14/18] rabbit_disk_monitor: Add config to tune polling interval The polling interval (min, max and fast-rate) should be tuned for use on different hardware. For example high-end machines with strong network bandwidth should be tuning the fast-rate higher so that disk space is checked more often, as with stronger resources the disk space could fill up more rapidly than the default 250MB/sec predicts. --- deps/rabbit/priv/schema/rabbit.schema | 14 ++++++++++++++ deps/rabbit/src/rabbit_disk_monitor.erl | 16 +++++++++++----- .../config_schema_SUITE_data/rabbit.snippets | 9 +++++++++ 3 files changed, 34 insertions(+), 5 deletions(-) diff --git a/deps/rabbit/priv/schema/rabbit.schema b/deps/rabbit/priv/schema/rabbit.schema index 0c35f32e97e..b5651a318bc 100644 --- a/deps/rabbit/priv/schema/rabbit.schema +++ b/deps/rabbit/priv/schema/rabbit.schema @@ -1238,6 +1238,20 @@ fun(Conf) -> end end}. +%% Tuning of disk monitor polling parameters +{mapping, "disk_monitor.fast_rate", "rabbit.disk_monitor_fast_rate", [ + %% Unit: KB/second, for example 250_000 for 250MB/sec. + {datatype, [integer]} +]}. +{mapping, "disk_monitor.min_interval", "rabbit.disk_monitor_min_interval", [ + %% Unit: milliseconds. + {datatype, [integer]} +]}. +{mapping, "disk_monitor.max_interval", "rabbit.disk_monitor_max_interval", [ + %% Unit: milliseconds. 
+  {datatype, [integer]}
+]}.
+
 %% Per-queue-type / per-mount disk alarms
 {mapping, "disk_free_limits.$num.name", "rabbit.disk_free_limits", [
   {datatype, [binary]}
diff --git a/deps/rabbit/src/rabbit_disk_monitor.erl b/deps/rabbit/src/rabbit_disk_monitor.erl
index 505711abe20..fb73170cd31 100644
--- a/deps/rabbit/src/rabbit_disk_monitor.erl
+++ b/deps/rabbit/src/rabbit_disk_monitor.erl
@@ -36,8 +36,6 @@
 -define(DEFAULT_MIN_DISK_CHECK_INTERVAL, 100).
 -define(DEFAULT_MAX_DISK_CHECK_INTERVAL, 10000).
 -define(DEFAULT_DISK_FREE_LIMIT, 50000000).
-%% 250MB/s i.e. 250kB/ms
--define(FAST_RATE, (250 * 1000)).
 
 -record(mount, {%% name set in configuration
@@ -164,6 +162,10 @@ start_link(Args) ->
 init([Limit]) ->
     {ok, Retries} = application:get_env(rabbit, disk_monitor_failure_retries),
     {ok, Interval} = application:get_env(rabbit, disk_monitor_failure_retry_interval),
+    MinInterval = application:get_env(rabbit, disk_monitor_min_interval,
+                                      ?DEFAULT_MIN_DISK_CHECK_INTERVAL),
+    MaxInterval = application:get_env(rabbit, disk_monitor_max_interval,
+                                      ?DEFAULT_MAX_DISK_CHECK_INTERVAL),
     ?ETS_NAME = ets:new(?ETS_NAME, [protected, set, named_table]),
     ?MOUNT_ETS_NAME = ets:new(?MOUNT_ETS_NAME, [protected, set, named_table,
                                                 {keypos, #mount.name}]),
@@ -172,8 +174,8 @@ init([Limit]) ->
                     limit = Limit,
                     retries = Retries,
                     interval = Interval},
-    State1 = set_min_check_interval(?DEFAULT_MIN_DISK_CHECK_INTERVAL, State0),
-    State2 = set_max_check_interval(?DEFAULT_MAX_DISK_CHECK_INTERVAL, State1),
+    State1 = set_min_check_interval(MinInterval, State0),
+    State2 = set_max_check_interval(MaxInterval, State1),
 
     State3 = enable(State2),
 
@@ -420,9 +422,13 @@ interval(#state{actual = DataDirAvailable,
                   (_Path, _Mount, Min) ->
                        Min
               end, DataDirGap, Mounts),
-    IdealInterval = 2 * SmallestGap / ?FAST_RATE,
+    IdealInterval = 2 * SmallestGap / fast_rate(),
     trunc(erlang:max(MinInterval, erlang:min(MaxInterval, IdealInterval))).
 
+fast_rate() ->
+    %% 250MB/s i.e. 250kB/ms
+    application:get_env(rabbit, disk_monitor_fast_rate, 250_000).
+
 -spec mounts() -> mounts().
 mounts() ->
     case application:get_env(rabbit, disk_free_limits) of
diff --git a/deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets b/deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets
index 36cab3e370b..76ea2656a4e 100644
--- a/deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets
+++ b/deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets
@@ -179,6 +179,15 @@ ssl_options.fail_if_no_peer_cert = true",
    "disk_free_limit.absolute = 2P",
    [{rabbit,[{disk_free_limit, "2P"}]}],
    []},
+  %% Disk monitor polling
+  {disk_monitor_tune_polling_parameters,
+   "disk_monitor.fast_rate = 1000000 # 1 GB/sec
+    disk_monitor.min_interval = 50
+    disk_monitor.max_interval = 20000",
+   [{rabbit,[{disk_monitor_fast_rate, 1_000_000},
+             {disk_monitor_min_interval, 50},
+             {disk_monitor_max_interval, 20_000}]}],
+   []},
 
   {default_users,
    "
From e3fb813fcd1b8b2232b88f0a43f8c883b7787cad Mon Sep 17 00:00:00 2001
From: Michael Davis
Date: Fri, 31 Oct 2025 13:13:23 -0400
Subject: [PATCH 15/18] CLI: Extend set_disk_free_limit to set mount limits

With this change you can say:

    rabbitmqctl set_disk_free_limit mount Streaming 2GiB

This applies the limit only to the "Streaming" mount.
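For reference, on the target node the new clause is expected to boil down to
a call roughly equivalent to the sketch below (the byte value assumes the CLI
scales 2GiB to bytes before making the RPC):

    rabbit_disk_monitor:set_disk_free_limit(<<"Streaming">>, 2147483648).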
--- deps/rabbit/src/rabbit_disk_monitor.erl | 23 ++++++++++ .../commands/set_disk_free_limit_command.ex | 46 ++++++++++++++++++- 2 files changed, 67 insertions(+), 2 deletions(-) diff --git a/deps/rabbit/src/rabbit_disk_monitor.erl b/deps/rabbit/src/rabbit_disk_monitor.erl index fb73170cd31..9d78e953c4e 100644 --- a/deps/rabbit/src/rabbit_disk_monitor.erl +++ b/deps/rabbit/src/rabbit_disk_monitor.erl @@ -26,6 +26,7 @@ terminate/2, code_change/3]). -export([get_disk_free_limit/0, set_disk_free_limit/1, + set_disk_free_limit/2, get_min_check_interval/0, set_min_check_interval/1, get_max_check_interval/0, set_max_check_interval/1, get_disk_free/0, get_mount_free/0, set_enabled/1]). @@ -105,6 +106,11 @@ get_disk_free_limit() -> set_disk_free_limit(Limit) -> gen_server:call(?MODULE, {set_disk_free_limit, Limit}). +-spec set_disk_free_limit(MountName :: binary(), integer()) -> 'ok'. +set_disk_free_limit(MountName, Limit) + when is_binary(MountName) andalso is_integer(Limit) -> + gen_server:call(?MODULE, {set_disk_free_limit, MountName, Limit}). + -spec get_min_check_interval() -> integer(). get_min_check_interval() -> safe_ets_lookup(min_check_interval, ?DEFAULT_MIN_DISK_CHECK_INTERVAL). @@ -189,6 +195,23 @@ handle_call({set_disk_free_limit, _}, _From, #state{enabled = false} = State) -> handle_call({set_disk_free_limit, Limit}, _From, State) -> {reply, ok, set_disk_limits(State, Limit)}; +handle_call({set_disk_free_limit, Name, Limit}, _From, + #state{mounts = Mounts0} = State) -> + MatchingMount = lists:search( + fun({_Path, #mount{name = N}}) -> + Name =:= N + end, maps:to_list(Mounts0)), + case MatchingMount of + {value, {Path, Mount}} -> + ?LOG_INFO("Updated disk free limit of mount '~ts'", [Name]), + Mounts = Mounts0#{Path := Mount#mount{limit = Limit}}, + {reply, ok, State#state{mounts = Mounts}}; + false -> + ?LOG_WARNING("Cannot set disk free limit for mount '~ts' since " + "the name does not match any known mounts.", [Name]), + {reply, ok, State} + end; + handle_call(get_max_check_interval, _From, State) -> {reply, State#state.max_interval, State}; diff --git a/deps/rabbitmq_cli/lib/rabbitmq/cli/ctl/commands/set_disk_free_limit_command.ex b/deps/rabbitmq_cli/lib/rabbitmq/cli/ctl/commands/set_disk_free_limit_command.ex index efa36cf1bc4..b4a987453f6 100644 --- a/deps/rabbitmq_cli/lib/rabbitmq/cli/ctl/commands/set_disk_free_limit_command.ex +++ b/deps/rabbitmq_cli/lib/rabbitmq/cli/ctl/commands/set_disk_free_limit_command.ex @@ -24,6 +24,14 @@ defmodule RabbitMQ.CLI.Ctl.Commands.SetDiskFreeLimitCommand do {:validation_failure, :too_many_args} end + def validate(["mount" | _] = args, _) when length(args) < 3 do + {:validation_failure, :not_enough_args} + end + + def validate(["mount" | _] = args, _) when length(args) > 3 do + {:validation_failure, :too_many_args} + end + def validate([limit], _) do case Integer.parse(limit) do {_, ""} -> @@ -47,12 +55,41 @@ defmodule RabbitMQ.CLI.Ctl.Commands.SetDiskFreeLimitCommand do end end + def validate(["mount", _name, limit], _) do + case Integer.parse(limit) do + {_, ""} -> + :ok + + {limit_val, units} -> + case memory_unit_absolute(limit_val, units) do + scaled_limit when is_integer(scaled_limit) -> :ok + _ -> {:validation_failure, :bad_argument} + end + + _ -> + {:validation_failure, :bad_argument} + end + end + def validate([_ | rest], _) when length(rest) > 0 do {:validation_failure, :too_many_args} end use RabbitMQ.CLI.Core.RequiresRabbitAppRunning + def run(["mount", mount_name, limit], %{node: node_name}) do + limit = + case 
Integer.parse(limit) do + {limit, ""} -> limit + {limit, units} -> + case memory_unit_absolute(limit, units) do + scaled_limit when is_integer(scaled_limit) -> + scaled_limit + end + end + make_rpc_call(node_name, [mount_name, limit]) + end + def run(["mem_relative", _] = args, opts) do set_disk_free_limit_relative(args, opts) end @@ -70,6 +107,10 @@ defmodule RabbitMQ.CLI.Ctl.Commands.SetDiskFreeLimitCommand do use RabbitMQ.CLI.DefaultOutput + def banner(["mount", mount, limit], %{node: node_name}) do + "Setting disk free limit for mount #{mount} on #{node_name} to #{limit} ..." + end + def banner(["mem_relative", arg], %{node: node_name}) do "Setting disk free limit on #{node_name} to #{arg} times the total RAM ..." end @@ -77,12 +118,13 @@ defmodule RabbitMQ.CLI.Ctl.Commands.SetDiskFreeLimitCommand do def banner([arg], %{node: node_name}), do: "Setting disk free limit on #{node_name} to #{arg} bytes ..." - def usage, do: "set_disk_free_limit | mem_relative " + def usage, do: "set_disk_free_limit | mem_relative | mount " def usage_additional() do [ ["", "New limit as an absolute value with units, e.g. 1GB"], - ["mem_relative ", "New limit as a fraction of total memory reported by the OS"] + ["mem_relative ", "New limit as a fraction of total memory reported by the OS"], + ["mount ", "New limit for the given mount name as an absolute value with units, e.g. 1GB"] ] end From f1b36bb8d2f68216faa4e6732f9ef97f98047338 Mon Sep 17 00:00:00 2001 From: Michael Davis Date: Tue, 4 Nov 2025 18:01:40 -0500 Subject: [PATCH 16/18] rabbit_stream_reader: Block during stream queue-type disk alarm --- .../src/rabbit_stream_reader.erl | 119 +++++++++++------- .../src/rabbit_stream_reader.hrl | 5 +- 2 files changed, 79 insertions(+), 45 deletions(-) diff --git a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl index 575d587a6d3..d231204deb4 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl @@ -162,6 +162,12 @@ init([KeepaliveSup, DeliverVersion = ?VERSION_1, RequestTimeout = application:get_env(rabbitmq_stream, request_timeout, 60_000), + Sources = rabbit_alarm:register(self(), + {?MODULE, resource_alarm, []}), + Alarms = sets:from_list([S || S <- Sources, + S =:= disk orelse + S =:= ?STREAM_DISK_ALARM], + [{version, 2}]), Connection = #stream_connection{name = rabbit_data_coercion:to_binary(ConnStr), @@ -181,7 +187,7 @@ init([KeepaliveSup, authentication_state = none, connection_step = tcp_connected, frame_max = FrameMax, - resource_alarm = false, + resource_alarms = Alarms, send_file_oct = SendFileOct, transport = ConnTransport, proxy_socket = @@ -192,11 +198,10 @@ init([KeepaliveSup, deliver_version = DeliverVersion}, State = #stream_connection_state{consumers = #{}, - blocked = false, + blocked = not sets:is_empty(Alarms), data = rabbit_stream_core:init(undefined)}, Transport:setopts(RealSocket, [{active, once}]), - _ = rabbit_alarm:register(self(), {?MODULE, resource_alarm, []}), ConnectionNegotiationStepTimeout = application:get_env(rabbitmq_stream, connection_negotiation_step_timeout, @@ -423,6 +428,8 @@ handle_info(Msg, #statem_data{transport = Transport, connection = #stream_connection{socket = S, + resource_alarms = + ResourceAlarms, connection_step = PreviousConnectionStep} = Connection, @@ -451,12 +458,18 @@ handle_info(Msg, {Error, S, Reason} -> ?LOG_WARNING("Socket error ~tp [~w]", [Reason, S]), stop; - {resource_alarm, IsThereAlarm} -> + {resource_alarm, Source, Conserve} -> 
+ ResourceAlarms1 = case Conserve of + true -> + sets:add_element(Source, ResourceAlarms); + false -> + sets:del_element(Source, ResourceAlarms) + end, {keep_state, StatemData#statem_data{connection = - Connection#stream_connection{resource_alarm + Connection#stream_connection{resource_alarms = - IsThereAlarm}, + ResourceAlarms1}, connection_state = State#stream_connection_state{blocked = true}}}; @@ -502,8 +515,9 @@ invalid_transition(Transport, Socket, From, To) -> -spec resource_alarm(pid(), rabbit_alarm:resource_alarm_source(), rabbit_alarm:resource_alert()) -> ok. -resource_alarm(ConnectionPid, disk, {_, Conserve, _}) -> - ConnectionPid ! {resource_alarm, Conserve}, +resource_alarm(ConnectionPid, Source, {_, Conserve, _}) + when Source =:= disk orelse Source =:= ?STREAM_DISK_ALARM -> + ConnectionPid ! {resource_alarm, Source, Conserve}, ok; resource_alarm(_ConnectionPid, _Resource, _Alert) -> ok. @@ -525,17 +539,12 @@ should_unblock(#stream_connection{publishers = Publishers}, _) %% always unblock a connection without publishers true; should_unblock(#stream_connection{credits = Credits, - resource_alarm = ResourceAlarm}, + resource_alarms = ResourceAlarms}, #configuration{credits_required_for_unblocking = CreditsRequiredForUnblocking}) -> - case {ResourceAlarm, - has_enough_credits_to_unblock(Credits, CreditsRequiredForUnblocking)} - of - {true, _} -> - false; - {false, EnoughCreditsToUnblock} -> - EnoughCreditsToUnblock - end. + sets:is_empty(ResourceAlarms) andalso + has_enough_credits_to_unblock(Credits, + CreditsRequiredForUnblocking). init_credit(CreditReference, Credits) -> atomics:put(CreditReference, 1, Credits). @@ -624,12 +633,13 @@ close_immediately(Transport, S) -> open(enter, _OldState, _StateData) -> keep_state_and_data; -open(info, {resource_alarm, IsThereAlarm}, +open(info, {resource_alarm, Source, Conserve}, #statem_data{transport = Transport, connection = #stream_connection{socket = S, name = ConnectionName, credits = Credits, + resource_alarms = ResourceAlarms, heartbeater = Heartbeater} = Connection, connection_state = @@ -638,6 +648,13 @@ open(info, {resource_alarm, IsThereAlarm}, #configuration{credits_required_for_unblocking = CreditsRequiredForUnblocking}} = StatemData) -> + ResourceAlarms1 = case Conserve of + true -> + sets:add_element(Source, ResourceAlarms); + false -> + sets:del_element(Source, ResourceAlarms) + end, + IsThereAlarm = not sets:is_empty(ResourceAlarms1), ?LOG_DEBUG("Connection ~tp received resource alarm. Alarm " "on? 
~tp", [ConnectionName, IsThereAlarm]), @@ -668,8 +685,8 @@ open(info, {resource_alarm, IsThereAlarm}, end, {keep_state, StatemData#statem_data{connection = - Connection#stream_connection{resource_alarm = - IsThereAlarm}, + Connection#stream_connection{resource_alarms = + ResourceAlarms1}, connection_state = State#stream_connection_state{blocked = NewBlockedState}}}; @@ -1190,15 +1207,23 @@ close_sent(info, {tcp_error, S, Reason}, #statem_data{}) -> "[~w] [~w]", [Reason, S, self()]), stop; -close_sent(info, {resource_alarm, IsThereAlarm}, +close_sent(info, {resource_alarm, Source, Conserve}, StatemData = #statem_data{connection = Connection}) -> + ResourceAlarms = Connection#stream_connection.resource_alarms, + ResourceAlarms1 = case Conserve of + true -> + sets:add_element(Source, ResourceAlarms); + false -> + sets:del_element(Source, ResourceAlarms) + end, + IsThereAlarm = not sets:is_empty(ResourceAlarms1), ?LOG_WARNING("Stream protocol connection ignored a resource " "alarm ~tp in state ~ts", [IsThereAlarm, ?FUNCTION_NAME]), {keep_state, StatemData#statem_data{connection = - Connection#stream_connection{resource_alarm = - IsThereAlarm}}}; + Connection#stream_connection{resource_alarms = + ResourceAlarms1}}}; close_sent(info, Msg, _StatemData) -> ?LOG_WARNING("Ignored unknown message ~tp in state ~ts", [Msg, ?FUNCTION_NAME]), @@ -1550,13 +1575,15 @@ notify_auth_result(Username, [P || {_, V} = P <- EventProps, V =/= '']). handle_frame_post_auth(Transport, - #stream_connection{resource_alarm = true} = Connection0, + #stream_connection{resource_alarms = + ResourceAlarms} = Connection0, State, {request, CorrelationId, {declare_publisher, PublisherId, _WriterRef, - Stream}}) -> + Stream}}) + when map_size(ResourceAlarms) =/= 0 -> ?LOG_INFO("Cannot create publisher ~tp on stream ~tp, connection " "is blocked because of resource alarm", [PublisherId, Stream]), @@ -1574,10 +1601,12 @@ handle_frame_post_auth(Transport, host = Host, auth_mechanism = Auth_Mechanism, authentication_state = AuthState, - resource_alarm = false} = C1, + resource_alarms = ResourceAlarms + } = C1, S1, {request, CorrelationId, - {sasl_authenticate, NewMechanism, NewSaslBin}}) -> + {sasl_authenticate, NewMechanism, NewSaslBin}}) + when map_size(ResourceAlarms) =:= 0 -> ?LOG_DEBUG("Received sasl_authenticate for username '~ts'", [Username]), {Connection1, State1} = @@ -1656,33 +1685,35 @@ handle_frame_post_auth(Transport, {Connection1, State1}; handle_frame_post_auth(Transport, #stream_connection{user = User, - resource_alarm = false} = C, + resource_alarms = ResourceAlarms + } = C, State, {request, CorrelationId, {declare_publisher, _PublisherId, WriterRef, S}}) - when ?IS_INVALID_REF(WriterRef) -> - {Code, Counter} = case check_write_permitted(stream_r(S, C), User) of - ok -> - {?RESPONSE_CODE_PRECONDITION_FAILED, ?PRECONDITION_FAILED}; - error -> - {?RESPONSE_CODE_ACCESS_REFUSED, ?ACCESS_REFUSED} - end, - response(Transport, - C, - declare_publisher, - CorrelationId, - Code), - increase_protocol_counter(Counter), - {C, State}; + when ?IS_INVALID_REF(WriterRef) andalso map_size(ResourceAlarms) =:= 0 -> + {Code, Counter} = case check_write_permitted(stream_r(S, C), User) of + ok -> + {?RESPONSE_CODE_PRECONDITION_FAILED, ?PRECONDITION_FAILED}; + error -> + {?RESPONSE_CODE_ACCESS_REFUSED, ?ACCESS_REFUSED} + end, + response(Transport, + C, + declare_publisher, + CorrelationId, + Code), + increase_protocol_counter(Counter), + {C, State}; handle_frame_post_auth(Transport, #stream_connection{user = User, publishers = 
Publishers0, publisher_to_ids = RefIds0, - resource_alarm = false} = + resource_alarms = ResourceAlarms} = Connection0, State, {request, CorrelationId, - {declare_publisher, PublisherId, WriterRef, Stream}}) -> + {declare_publisher, PublisherId, WriterRef, Stream}}) + when map_size(ResourceAlarms) =:= 0 -> case check_write_permitted(stream_r(Stream, Connection0), User) diff --git a/deps/rabbitmq_stream/src/rabbit_stream_reader.hrl b/deps/rabbitmq_stream/src/rabbit_stream_reader.hrl index 0f572322189..5775dcc835a 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_reader.hrl +++ b/deps/rabbitmq_stream/src/rabbit_stream_reader.hrl @@ -24,6 +24,9 @@ authenticating | authenticated | tuning | tuned | opened | failure | closing | close_sent | closing_done. +-define(STREAM_DISK_ALARM, {disk, rabbit_stream_queue}). +%% subset of rabbit_alarm:resource_alarm_source() +-type blocked_resource() :: disk | ?STREAM_DISK_ALARM. -record(publisher, {publisher_id :: publisher_id(), @@ -86,7 +89,7 @@ client_properties = #{} :: #{binary() => binary()}, monitors = #{} :: #{reference() => {pid(), stream()}}, stats_timer :: undefined | rabbit_event:state(), - resource_alarm :: boolean(), + resource_alarms :: sets:set(blocked_resource()), send_file_oct :: atomics:atomics_ref(), % number of bytes sent with send_file (for metrics) transport :: tcp | ssl, From d288f74a9c5fa40d125e9df93f7e77ba377fa02b Mon Sep 17 00:00:00 2001 From: Michael Davis Date: Mon, 10 Nov 2025 14:15:54 -0500 Subject: [PATCH 17/18] MQTT: Handle per-queue-type disk alarms This includes regular MQTT and MQTT-over-WebSockets. --- .../src/rabbit_mqtt_processor.erl | 27 ++++++++++++++----- deps/rabbitmq_mqtt/src/rabbit_mqtt_reader.erl | 5 ++-- .../src/rabbit_web_mqtt_handler.erl | 5 ++-- 3 files changed, 25 insertions(+), 12 deletions(-) diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl index e20c0b3267c..7512faf8048 100644 --- a/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl +++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl @@ -99,6 +99,8 @@ -record(state, {cfg :: #cfg{}, queue_states = rabbit_queue_type:init() :: rabbit_queue_type:state(), + queue_types_published = sets:new([{version, 2}]) :: + sets:set(rabbit_queue_type:queue_type()), %% Packet IDs published to queues but not yet confirmed. unacked_client_pubs = rabbit_mqtt_confirms:init() :: rabbit_mqtt_confirms:state(), %% Packet IDs published to MQTT subscribers but not yet acknowledged. @@ -1697,14 +1699,19 @@ deliver_to_queues(Message, Options, RoutedToQNames, State0 = #state{queue_states = QStates0, + queue_types_published = QTs0, cfg = #cfg{proto_ver = ProtoVer}}) -> Qs0 = rabbit_db_queue:get_targets(RoutedToQNames), Qs = rabbit_amqqueue:prepend_extra_bcc(Qs0), case rabbit_queue_type:deliver(Qs, Message, Options, QStates0) of {ok, QStates, Actions} -> rabbit_global_counters:messages_routed(ProtoVer, length(Qs)), - State = process_routing_confirm(Options, Qs, - State0#state{queue_states = QStates}), + QTs1 = sets:from_list(rabbit_amqqueue:queue_types(Qs), + [{version, 2}]), + QTs = sets:union(QTs0, QTs1), + State1 = State0#state{queue_states = QStates, + queue_types_published = QTs}, + State = process_routing_confirm(Options, Qs, State1), %% Actions must be processed after registering confirms as actions may %% contain rejections of publishes. {ok, handle_queue_actions(Actions, State)}; @@ -2362,10 +2369,18 @@ is_socket_busy(Socket) -> false end. --spec throttle(boolean(), state()) -> boolean(). 
-throttle(Conserve, #state{queues_soft_limit_exceeded = QSLE, - cfg = #cfg{published = Published}}) -> - Conserve andalso Published orelse +-spec throttle(sets:set(rabbit_alarm:resource_alarm_source()), state()) -> + boolean(). +throttle(BlockedBy, #state{queues_soft_limit_exceeded = QSLE, + queue_types_published = QTs, + cfg = #cfg{published = Published}}) -> + Alarmed = sets:fold( + fun ({disk, QT}, Acc) -> + Acc orelse sets:is_element(QT, QTs); + (_, _) -> + true + end, false, BlockedBy), + Alarmed andalso Published orelse not sets:is_empty(QSLE) orelse credit_flow:blocked(). diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_reader.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_reader.erl index de047e55a3f..5d455997082 100644 --- a/deps/rabbitmq_mqtt/src/rabbit_mqtt_reader.erl +++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_reader.erl @@ -428,10 +428,9 @@ control_throttle(State = #state{connection_state = ConnState, proc_state = PState, keepalive = KState }) -> - Conserve = not sets:is_empty(BlockedBy), Throttle = case PState of - connect_packet_unprocessed -> Conserve; - _ -> rabbit_mqtt_processor:throttle(Conserve, PState) + connect_packet_unprocessed -> not sets:is_empty(BlockedBy); + _ -> rabbit_mqtt_processor:throttle(BlockedBy, PState) end, case {ConnState, Throttle} of {running, true} -> diff --git a/deps/rabbitmq_web_mqtt/src/rabbit_web_mqtt_handler.erl b/deps/rabbitmq_web_mqtt/src/rabbit_web_mqtt_handler.erl index a5290d7ff6e..1f4c5b12882 100644 --- a/deps/rabbitmq_web_mqtt/src/rabbit_web_mqtt_handler.erl +++ b/deps/rabbitmq_web_mqtt/src/rabbit_web_mqtt_handler.erl @@ -414,10 +414,9 @@ control_throttle(State = #state{connection_state = ConnState, proc_state = PState, keepalive = KState }) -> - Conserve = not sets:is_empty(BlockedBy), Throttle = case PState of - connect_packet_unprocessed -> Conserve; - _ -> rabbit_mqtt_processor:throttle(Conserve, PState) + connect_packet_unprocessed -> not sets:is_empty(BlockedBy); + _ -> rabbit_mqtt_processor:throttle(BlockedBy, PState) end, case {ConnState, Throttle} of {running, true} -> From d8b19d35825980b934cef204cd2a440f04360a5b Mon Sep 17 00:00:00 2001 From: Michael Davis Date: Mon, 10 Nov 2025 14:17:10 -0500 Subject: [PATCH 18/18] AMQP 1.0: Handle per-queue-type disk alarms --- deps/rabbit/src/rabbit_amqp_session.erl | 98 ++++++++++++------- deps/rabbit/test/amqp_client_SUITE.erl | 79 ++++++++++++++- .../src/rabbit_ct_broker_helpers.erl | 4 + 3 files changed, 147 insertions(+), 34 deletions(-) diff --git a/deps/rabbit/src/rabbit_amqp_session.erl b/deps/rabbit/src/rabbit_amqp_session.erl index b8e6ce82f5d..1195d3222f5 100644 --- a/deps/rabbit/src/rabbit_amqp_session.erl +++ b/deps/rabbit/src/rabbit_amqp_session.erl @@ -381,6 +381,8 @@ stashed_eol = [] :: [rabbit_amqqueue:name()], queue_states = rabbit_queue_type:init() :: rabbit_queue_type:state(), + queue_types_published = sets:new([{version, 2}]) :: + sets:set(rabbit_queue_type:queue_type()), permission_cache = [] :: permission_cache(), topic_permission_cache = [] :: topic_permission_cache() }). 
@@ -448,9 +450,14 @@ init({ReaderPid, WriterPid, ChannelNum, MaxFrameSize, User, Vhost, ContainerId, true = is_valid_max(MaxLinkCredit), true = is_valid_max(MaxQueueCredit), true = is_valid_max(MaxIncomingWindow), - IncomingWindow = case sets:is_empty(Alarms) of - true -> MaxIncomingWindow; - false -> 0 + InResourceAlarm = sets:fold(fun ({disk, _}, Acc) -> + Acc; + (_, _) -> + true + end, false, Alarms), + IncomingWindow = case InResourceAlarm of + true -> 0; + false -> MaxIncomingWindow end, NextOutgoingId = ?INITIAL_OUTGOING_TRANSFER_ID, @@ -582,36 +589,17 @@ handle_cast({queue_event, _, _} = QEvent, State0) -> log_error_and_close_session(Error, State0) end; handle_cast({conserve_resources, Alarm, Conserve}, - #state{incoming_window = IncomingWindow0, - cfg = #cfg{resource_alarms = Alarms0, - incoming_window_margin = Margin0, + #state{cfg = #cfg{resource_alarms = Alarms0, writer_pid = WriterPid, - channel_num = Ch, - max_incoming_window = MaxIncomingWindow + channel_num = Ch } = Cfg } = State0) -> Alarms = case Conserve of true -> sets:add_element(Alarm, Alarms0); false -> sets:del_element(Alarm, Alarms0) end, - {SendFlow, IncomingWindow, Margin} = - case {sets:is_empty(Alarms0), sets:is_empty(Alarms)} of - {true, false} -> - %% Alarm kicked in. - %% Notify the client to not send us any more TRANSFERs. Since we decrase - %% our incoming window dynamically, there might be incoming in-flight - %% TRANSFERs. So, let's be lax and allow for some excess TRANSFERs. - {true, 0, MaxIncomingWindow}; - {false, true} -> - %% All alarms cleared. - %% Notify the client that it can resume sending us TRANSFERs. - {true, MaxIncomingWindow, 0}; - _ -> - {false, IncomingWindow0, Margin0} - end, - State = State0#state{incoming_window = IncomingWindow, - cfg = Cfg#cfg{resource_alarms = Alarms, - incoming_window_margin = Margin}}, + State1 = State0#state{cfg = Cfg#cfg{resource_alarms = Alarms}}, + {SendFlow, State} = check_resource_alarm(State0, State1), case SendFlow of true -> Flow = session_flow_fields(#'v1_0.flow'{}, State), @@ -637,6 +625,41 @@ handle_cast({reset_authz, User}, #state{cfg = Cfg} = State0) -> handle_cast(shutdown, State) -> {stop, normal, State}. +is_in_resource_alarm(#state{cfg = #cfg{resource_alarms = Alarms}, + queue_types_published = QTs}) -> + sets:fold( + fun ({disk, QT}, Acc) -> + Acc orelse sets:is_element(QT, QTs); + (_, _) -> + true + end, false, Alarms). + +check_resource_alarm(State0, + #state{incoming_window = IncomingWindow0, + cfg = #cfg{incoming_window_margin = Margin0, + max_incoming_window = MaxIncomingWindow + } = Cfg} = State1) -> + WasBlocked = is_in_resource_alarm(State0), + IsBlocked = is_in_resource_alarm(State1), + {SendFlow, IncomingWindow, Margin} = + case IsBlocked of + true when not WasBlocked -> + %% Alarm kicked in. + %% Notify the client to not send us any more TRANSFERs. Since we decrase + %% our incoming window dynamically, there might be incoming in-flight + %% TRANSFERs. So, let's be lax and allow for some excess TRANSFERs. + {true, 0, MaxIncomingWindow}; + false when WasBlocked -> + %% All alarms cleared. + %% Notify the client that it can resume sending us TRANSFERs. + {true, MaxIncomingWindow, 0}; + _ -> + {false, IncomingWindow0, Margin0} + end, + State = State1#state{incoming_window = IncomingWindow, + cfg = Cfg#cfg{incoming_window_margin = Margin}}, + {SendFlow, State}. 
+ log_error_and_close_session( Error, State = #state{cfg = #cfg{reader_pid = ReaderPid, writer_pid = WriterPid, @@ -1940,7 +1963,6 @@ session_flow_control_received_transfer( incoming_window = InWindow0, remote_outgoing_window = RemoteOutgoingWindow, cfg = #cfg{incoming_window_margin = Margin, - resource_alarms = Alarms, max_incoming_window = MaxIncomingWindow} } = State) -> InWindow1 = InWindow0 - 1, @@ -1954,7 +1976,7 @@ session_flow_control_received_transfer( ok end, {Flows, InWindow} = case InWindow1 =< (MaxIncomingWindow div 2) andalso - sets:is_empty(Alarms) of + not is_in_resource_alarm(State) of true -> %% We've reached halfway and there are no %% disk or memory alarm, open the window. @@ -2371,6 +2393,7 @@ incoming_link_transfer( multi_transfer_msg = MultiTransfer } = Link0, State0 = #state{queue_states = QStates0, + queue_types_published = QTs0, permission_cache = PermCache0, topic_permission_cache = TopicPermCache0, cfg = #cfg{user = User = #user{username = Username}, @@ -2414,19 +2437,26 @@ incoming_link_transfer( Qs = rabbit_amqqueue:prepend_extra_bcc(Qs0), case rabbit_queue_type:deliver(Qs, Mc, Opts, QStates0) of {ok, QStates, Actions} -> + QTs1 = sets:from_list(rabbit_amqqueue:queue_types(Qs), + [{version, 2}]), + QTs = sets:union(QTs0, QTs1), State1 = State0#state{queue_states = QStates, + queue_types_published = QTs, permission_cache = PermCache, topic_permission_cache = TopicPermCache}, %% Confirms must be registered before processing actions %% because actions may contain rejections of publishes. {U, Reply0} = process_routing_confirm( Qs, Settled, DeliveryId, U0), - State = handle_queue_actions(Actions, State1), + State2 = handle_queue_actions(Actions, State1), + {SendAlarmFlow, State} = check_resource_alarm( + State0, State2), DeliveryCount = add(DeliveryCount0, 1), Credit1 = Credit0 - 1, {Credit, Reply1} = maybe_grant_link_credit( Credit1, MaxLinkCredit, - DeliveryCount, map_size(U), Handle), + DeliveryCount, map_size(U), Handle, + SendAlarmFlow), Reply = Reply0 ++ Reply1, Link = Link0#incoming_link{ delivery_count = DeliveryCount, @@ -2461,7 +2491,8 @@ incoming_link_transfer( Credit1 = Credit0 - 1, {Credit, Reply0} = maybe_grant_link_credit( Credit1, MaxLinkCredit, - DeliveryCount, map_size(U0), Handle), + DeliveryCount, map_size(U0), Handle, + false), Reply = [Disposition | Reply0], Link = Link0#incoming_link{ delivery_count = DeliveryCount, @@ -2574,8 +2605,9 @@ rejected(DeliveryId, Error) -> settled = true, state = #'v1_0.rejected'{error = Error}}. 
-maybe_grant_link_credit(Credit, MaxLinkCredit, DeliveryCount, NumUnconfirmed, Handle) -> - case grant_link_credit(Credit, MaxLinkCredit, NumUnconfirmed) of +maybe_grant_link_credit(Credit, MaxLinkCredit, DeliveryCount, NumUnconfirmed, + Handle, AlarmFlow) -> + case grant_link_credit(Credit, MaxLinkCredit, NumUnconfirmed) orelse AlarmFlow of true -> {MaxLinkCredit, [flow(Handle, DeliveryCount, MaxLinkCredit)]}; false -> diff --git a/deps/rabbit/test/amqp_client_SUITE.erl b/deps/rabbit/test/amqp_client_SUITE.erl index 9287b10d805..cdd470aae65 100644 --- a/deps/rabbit/test/amqp_client_SUITE.erl +++ b/deps/rabbit/test/amqp_client_SUITE.erl @@ -116,6 +116,7 @@ groups() -> resource_alarm_before_session_begin, resource_alarm_after_session_begin, resource_alarm_send_many, + per_queue_type_disk_alarm, max_message_size_client_to_server, max_message_size_server_to_client, global_counters, @@ -225,7 +226,13 @@ init_per_suite(Config) -> rabbit_ct_helpers:log_environment(), rabbit_ct_helpers:merge_app_env( Config, {rabbit, [{quorum_tick_interval, 1000}, - {stream_tick_interval, 1000} + {stream_tick_interval, 1000}, + %% Imaginary mount-point for per-queue-type disk alarms + {disk_free_limits, + #{1 => #{name => <<"streaming">>, + mount => "/does/not/exist", + limit => "2GB", + queue_types => [<<"stream">>]}}} ]}). end_per_suite(Config) -> @@ -3263,6 +3270,76 @@ auth_attempt_metrics(Config) -> ?assertEqual(0, proplists:get_value(auth_attempts_failed, Attempt2)), ?assertEqual(1, proplists:get_value(auth_attempts_succeeded, Attempt2)). +per_queue_type_disk_alarm(Config) -> + Prefix = atom_to_binary(?FUNCTION_NAME), + Resource = {disk, rabbit_stream_queue}, + CQ = <>, + SQ = <>, + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config), + #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = CQ}), + #'queue.declare_ok'{} = amqp_channel:call( + Ch, #'queue.declare'{ + queue = SQ, + durable = true, + arguments = [{<<"x-queue-type">>, longstr, <<"stream">>}]}), + + OpnConf = connection_config(Config), + {ok, Connection} = amqp10_client:open_connection(OpnConf), + + %% Set the alarm for the stream queue type. + ok = rabbit_ct_broker_helpers:set_alarm(Config, 0, Resource), + + %% Attach one sender to the CQ and one to the SQ. + {ok, Session1} = amqp10_client:begin_session_sync(Connection), + {ok, Sender1} = amqp10_client:attach_sender_link( + Session1, <>, + rabbitmq_amqp_address:queue(CQ), unsettled), + {ok, Session2} = amqp10_client:begin_session_sync(Connection), + {ok, Sender2} = amqp10_client:attach_sender_link( + Session2, <>, + rabbitmq_amqp_address:queue(SQ), unsettled), + + %% Both senders initially have link and session credit. + ok = wait_for_credit(Sender1), + ok = wait_for_credit(Sender2), + Tag1 = <<"tag1">>, + Msg1 = amqp10_msg:new(Tag1, <<"m1">>, false), + ?assertEqual(ok, + amqp10_client:send_msg(Sender1, Msg1)), + ok = wait_for_accepted(Tag1), + ?assertEqual(ok, + amqp10_client:send_msg(Sender2, Msg1)), + ok = wait_for_accepted(Tag1), + + %% Once the SQ sender has delivered to a stream, it becomes blocked by + %% session flow control. + Tag2 = <<"tag2">>, + Msg2 = amqp10_msg:new(Tag2, <<"m2">>, false), + ?assertEqual(ok, + amqp10_client:send_msg(Sender1, Msg2)), + ok = wait_for_accepted(Tag2), + ?assertEqual({error, remote_incoming_window_exceeded}, + amqp10_client:send_msg(Sender2, Msg2)), + + %% Clear the alarm and the SQ sender can then send transfers. 
+ ok = rabbit_ct_broker_helpers:clear_alarm(Config, 0, Resource), + Tag3 = <<"tag3">>, + Msg3 = amqp10_msg:new(Tag3, <<"m3">>, false), + ?assertEqual(ok, + amqp10_client:send_msg(Sender1, Msg3)), + ok = wait_for_accepted(Tag3), + ?assertEqual(ok, + amqp10_client:send_msg(Sender2, Msg3)), + ok = wait_for_accepted(Tag3), + + ok = amqp10_client:detach_link(Sender1), + ok = end_session_sync(Session1), + ok = amqp10_client:detach_link(Sender2), + ok = end_session_sync(Session2), + #'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = CQ}), + #'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = SQ}), + ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch). + max_message_size_client_to_server(Config) -> DefaultMaxMessageSize = rpc(Config, persistent_term, get, [max_message_size]), %% Limit the server to only accept messages up to 2KB. diff --git a/deps/rabbitmq_ct_helpers/src/rabbit_ct_broker_helpers.erl b/deps/rabbitmq_ct_helpers/src/rabbit_ct_broker_helpers.erl index 3e745433097..1332c251e8f 100644 --- a/deps/rabbitmq_ct_helpers/src/rabbit_ct_broker_helpers.erl +++ b/deps/rabbitmq_ct_helpers/src/rabbit_ct_broker_helpers.erl @@ -1815,6 +1815,8 @@ set_alarm(Config, Node, file_descriptor_limit = Resource) -> set_alarm(Config, Node, memory = Resource) -> rpc(Config, Node, rabbit_alarm, set_alarm, [{{resource_limit, Resource, Node}, []}]); set_alarm(Config, Node, disk = Resource) -> + rpc(Config, Node, rabbit_alarm, set_alarm, [{{resource_limit, Resource, Node}, []}]); +set_alarm(Config, Node, {disk, QueueType} = Resource) when is_atom(QueueType) -> rpc(Config, Node, rabbit_alarm, set_alarm, [{{resource_limit, Resource, Node}, []}]). get_alarms(Config, Node) -> @@ -1828,6 +1830,8 @@ clear_alarm(Config, Node, file_descriptor_limit = Resource) -> clear_alarm(Config, Node, memory = Resource) -> rpc(Config, Node, rabbit_alarm, clear_alarm, [{resource_limit, Resource, Node}]); clear_alarm(Config, Node, disk = Resource) -> + rpc(Config, Node, rabbit_alarm, clear_alarm, [{resource_limit, Resource, Node}]); +clear_alarm(Config, Node, {disk, QueueType} = Resource) when is_atom(QueueType) -> rpc(Config, Node, rabbit_alarm, clear_alarm, [{resource_limit, Resource, Node}]). clear_all_alarms(Config, Node) ->