Draft
Changes from all commits
18 commits
95a32d3
Allow configuring osiris data_dir in Cuttlefish config
the-mikedavis Oct 31, 2025
a5f1487
Allow configuring classic queue data dir in Cuttlefish config
the-mikedavis Oct 31, 2025
8d6122a
rabbit_stream_queue: Enable recovery after registering queue type
the-mikedavis Oct 24, 2025
283aa51
rabbit_alarm: Prefer maps to dicts
the-mikedavis Oct 21, 2025
d73d45d
rabbit_env: Enable disksup in os_mon but set threshold to 1.0
the-mikedavis Sep 16, 2025
74e0151
rabbit_disk_monitor: Use disksup to determine available bytes
the-mikedavis Sep 16, 2025
94b0a83
rabbit.schema: Add config options for per-queue-type disk limits
the-mikedavis Oct 16, 2025
b3b5cc4
rabbit_disk_monitor: Monitor per-queue-type mounts
the-mikedavis Oct 21, 2025
40d2d5e
rabbit_alarm: Add a helper to format resource alarm sources
the-mikedavis Oct 23, 2025
399ce7e
Set per-queue-type disk alarms for configured mounts
the-mikedavis Oct 23, 2025
73e7427
AMQP 0-9-1: Handle per-queue-type disk alarms
the-mikedavis Oct 23, 2025
be380c2
prometheus: Add core metrics per-mount for free space and limit
the-mikedavis Oct 30, 2025
984e841
management: Expose current mount stats in API and UI
the-mikedavis Oct 31, 2025
c67693e
rabbit_disk_monitor: Add config to tune polling interval
the-mikedavis Oct 31, 2025
e3fb813
CLI: Extend set_disk_free_limit to set mount limits
the-mikedavis Oct 31, 2025
f1b36bb
rabbit_stream_reader: Block during stream queue-type disk alarm
the-mikedavis Nov 4, 2025
d288f74
MQTT: Handle per-queue-type disk alarms
the-mikedavis Nov 10, 2025
d8b19d3
AMQP 1.0: Handle per-queue-type disk alarms
the-mikedavis Nov 10, 2025
33 changes: 24 additions & 9 deletions deps/amqp_client/src/amqp_gen_connection.erl
@@ -32,6 +32,7 @@
%% connection.block, connection.unblock handler
block_handler,
blocked_by = sets:new([{version, 2}]),
queue_types_published = sets:new([{version, 2}]),
closing = false %% #closing{} | false
}).

@@ -214,22 +215,18 @@ handle_cast({register_blocked_handler, HandlerPid},
{noreply, State1};
handle_cast({conserve_resources, Source, Conserve},
#state{blocked_by = BlockedBy} = State) ->
WasNotBlocked = sets:is_empty(BlockedBy),
BlockedBy1 = case Conserve of
true ->
sets:add_element(Source, BlockedBy);
false ->
sets:del_element(Source, BlockedBy)
end,
State1 = State#state{blocked_by = BlockedBy1},
case sets:is_empty(BlockedBy1) of
true ->
handle_method(#'connection.unblocked'{}, State1);
false when WasNotBlocked ->
handle_method(#'connection.blocked'{}, State1);
false ->
{noreply, State1}
end.
maybe_block(State, State1);
handle_cast({channel_published_to_queue_type, _ChPid, QT},
the-mikedavis (Collaborator, Author) commented on this line:

    This feature might need a feature flag. Here, for direct connections, if old client code is used on a newer server, it would error after publishing since it isn't expecting this cast. I think it would be unlikely to happen in practice, but the mixed-version test suite will probably run into this.

#state{queue_types_published = QTs} = State) ->
State1 = State#state{queue_types_published = sets:add_element(QT, QTs)},
maybe_block(State, State1).

%% @private
handle_info({'DOWN', _, process, BlockHandler, Reason},
@@ -274,6 +271,24 @@ i(Item, #state{module = Mod, module_state = MState}) -> Mod:i(Item, MState).
register_blocked_handler(Pid, HandlerPid) ->
gen_server:cast(Pid, {register_blocked_handler, HandlerPid}).

maybe_block(State0, State1) ->
WasBlocked = should_block(State0),
case should_block(State1) of
true when not WasBlocked ->
handle_method(#'connection.blocked'{}, State1);
false when WasBlocked ->
handle_method(#'connection.unblocked'{}, State1);
_ ->
{noreply, State1}
end.

should_block(#state{blocked_by = BlockedBy, queue_types_published = QTs}) ->
lists:any(fun ({disk, QT}) ->
sets:is_element(QT, QTs);
(_Resource) ->
true
end, sets:to_list(BlockedBy)).

%%---------------------------------------------------------------------------
%% Command handling
%%---------------------------------------------------------------------------
108 changes: 108 additions & 0 deletions deps/rabbit/priv/schema/rabbit.schema
@@ -1238,6 +1238,87 @@ fun(Conf) ->
end
end}.

%% Tuning of disk monitor polling parameters
{mapping, "disk_monitor.fast_rate", "rabbit.disk_monitor_fast_rate", [
%% Unit: KB/second, for example 250_000 for 250MB/sec.
{datatype, [integer]}
]}.
{mapping, "disk_monitor.min_interval", "rabbit.disk_monitor_min_interval", [
%% Unit: milliseconds.
{datatype, [integer]}
]}.
{mapping, "disk_monitor.max_interval", "rabbit.disk_monitor_max_interval", [
%% Unit: milliseconds.
{datatype, [integer]}
]}.
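For illustration, the three polling mappings above would be set in rabbitmq.conf roughly as follows; the values are made-up examples, not defaults taken from this diff:

    disk_monitor.fast_rate = 250000
    disk_monitor.min_interval = 500
    disk_monitor.max_interval = 10000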

%% Per-queue-type / per-mount disk alarms
{mapping, "disk_free_limits.$num.name", "rabbit.disk_free_limits", [
{datatype, [binary]}
]}.
{mapping, "disk_free_limits.$num.mount", "rabbit.disk_free_limits", [
{datatype, [string]}
]}.
{mapping, "disk_free_limits.$num.limit", "rabbit.disk_free_limits", [
{datatype, [integer, string]},
{validators, ["is_supported_information_unit"]}
]}.
{mapping, "disk_free_limits.$num.queue_types", "rabbit.disk_free_limits", [
{datatype, [binary]}
]}.

{translation, "rabbit.disk_free_limits",
fun(Conf) ->
case cuttlefish_variable:filter_by_prefix("disk_free_limits", Conf) of
[] ->
cuttlefish:unset();
Settings ->
Ls = lists:foldl(
fun ({["disk_free_limits", Num, Key0], Value0}, Acc) ->
Idx = case string:to_integer(Num) of
{N, []} -> N;
_ -> cuttlefish:invalid(lists:flatten(io_lib:format("~p could not be parsed as a number", [Num])))
end,
Key = case Key0 of
"name" -> name;
"mount" -> mount;
"limit" -> limit;
"queue_types" -> queue_types;
_ -> cuttlefish:invalid(lists:flatten(io_lib:format("~p is invalid", [Key0])))
end,
Value = case Key of
queue_types -> string:split(Value0, ",");
_ -> Value0
end,
maps:update_with(
Idx,
fun (#{Key := ExistingValue} = Limit) ->
cuttlefish:warn(
io_lib:format("Disk limit ~b has duplicate setting ~ts, "
"using ~tp instead of ~tp",
[Idx, Key, Value, ExistingValue])),
Limit#{Key := Value};
(Limit) ->
Limit#{Key => Value}
end, #{Key => Value}, Acc);
(Other, _Acc) ->
cuttlefish:invalid(
lists:flatten(io_lib:format("~p is invalid", [Other])))
end, #{}, Settings),
maps:fold(
fun(_Idx, #{name := Name}, Names) ->
case sets:is_element(Name, Names) of
true ->
cuttlefish:invalid(
lists:flatten(io_lib:format("name ~ts is used by multiple mounts", [Name])));
false ->
sets:add_element(Name, Names)
end
end, sets:new([{version, 2}]), Ls),
Ls
end
end}.
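As a sketch of how the disk_free_limits mappings and translation above fit together, one numbered group in rabbitmq.conf might look like this; the name, mount point, limit and queue type are illustrative assumptions, and the exact queue-type identifiers accepted are not shown in this hunk:

    disk_free_limits.1.name = streams
    disk_free_limits.1.mount = /var/lib/rabbitmq/streams
    disk_free_limits.1.limit = 5GB
    disk_free_limits.1.queue_types = stream

Per the translation, each numbered group is folded into one map keyed by name, mount, limit and queue_types, duplicate keys within a group produce a warning, and reusing the same name across groups is rejected.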

%%
%% Clustering
%% =====================
@@ -2620,6 +2701,20 @@ end}.
{datatype, {enum, [true, false]}}
]}.

%% Classic queue data directory
{mapping, "classic_queue.data_dir", "rabbit.classic_queue_data_dir", [
{datatype, string}
]}.

{translation, "rabbit.classic_queue_data_dir",
fun(Conf) ->
case cuttlefish:conf_get("classic_queue.data_dir", Conf, undefined) of
undefined -> cuttlefish:unset();
Val -> Val
end
end
}.
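A minimal usage sketch for the new classic queue setting, with a hypothetical path:

    classic_queue.data_dir = /var/lib/rabbitmq/classic-queues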

%%
%% Backing queue version
%%
@@ -2776,6 +2871,19 @@ fun(Conf) ->
end
end}.

{mapping, "stream.data_dir", "osiris.data_dir", [
{datatype, string}
]}.

{translation, "osiris.data_dir",
fun(Conf) ->
case cuttlefish:conf_get("stream.data_dir", Conf, undefined) of
undefined -> cuttlefish:unset();
Val -> Val
end
end
}.
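Similarly, a hedged example of the stream setting, which the translation above forwards to osiris.data_dir (the path is hypothetical):

    stream.data_dir = /var/lib/rabbitmq/streams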

{mapping, "stream.read_ahead", "rabbit.stream_read_ahead",
[{datatype, {enum, [true, false]}}]}.
