From ac86eb6af8730fdd24e484a689bfb9ad32934067 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean-S=C3=A9bastien=20P=C3=A9dron?= Date: Wed, 5 Nov 2025 21:40:12 +0100 Subject: [PATCH 1/2] khepri --- rabbitmq-components.mk | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/rabbitmq-components.mk b/rabbitmq-components.mk index 61891fc7b63..9384550b734 100644 --- a/rabbitmq-components.mk +++ b/rabbitmq-components.mk @@ -46,7 +46,8 @@ dep_credentials_obfuscation = hex 3.5.0 dep_cuttlefish = hex 3.6.0 dep_gen_batch_server = hex 0.8.8 dep_jose = hex 1.11.10 -dep_khepri = hex 0.17.2 +#dep_khepri = hex 0.17.2 +dep_khepri = git https://github.com/rabbitmq/khepri.git skip-keep_while-on-children-when-parent-created dep_khepri_mnesia_migration = hex 0.8.0 dep_meck = hex 1.0.0 dep_osiris = git https://github.com/rabbitmq/osiris v1.10.1 From 3a75b6c9d46565be2d274d38c1c220d2167046f2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean-S=C3=A9bastien=20P=C3=A9dron?= Date: Wed, 5 Nov 2025 19:46:14 +0100 Subject: [PATCH 2/2] rabbit_db: Eliminate the `delete_queue` Khepri transaction ... by using `keep_while` conditions on bindings and auto-delete exchanges. [Why] The `delete_queue` transaction's anonymous function has to be extracted by Horus, like any Khepri transaction. This is an expensive operation, but Horus uses caching to avoid most work after the first extraction. The problem is when there are many concurrent executions of the same transaction, before it has been executed once: the cache is not hot and Horus has to extract the same transaction many times in parallel currently. An example of this situation is when there are massive disconnections from RabbitMQ clients that trigger massive queue deletions. This can put a lot of load on RabbitMQ. [How] This patch removes the entire transaction. Instead, it uses `keep_while` conditions on bindings and auto-delete exchanges to let Khepri handle the deletion of semantically related tree nodes. 
RabbitMQ just has to make a simple "delete this queue" command. --- deps/rabbit/src/rabbit_db_binding.erl | 54 ++++++++++++++++++++-- deps/rabbit/src/rabbit_db_exchange.erl | 19 +++++++- deps/rabbit/src/rabbit_db_queue.erl | 62 +++++++++++++++++--------- 3 files changed, 109 insertions(+), 26 deletions(-) diff --git a/deps/rabbit/src/rabbit_db_binding.erl b/deps/rabbit/src/rabbit_db_binding.erl index 2584a725e79..c7ea403b7f2 100644 --- a/deps/rabbit/src/rabbit_db_binding.erl +++ b/deps/rabbit/src/rabbit_db_binding.erl @@ -35,7 +35,8 @@ has_for_source_in_mnesia/1, has_for_source_in_khepri/1, match_source_and_destination_in_khepri_tx/2, - clear_in_khepri/0 + clear_in_khepri/0, + khepri_ret_to_deletions/2 ]). -export([ @@ -201,6 +202,14 @@ create_in_khepri(#binding{source = SrcName, case ChecksFun(Src, Dst) of ok -> RoutePath = khepri_route_path(Binding), + DstPath = case DstName of + #resource{kind = queue} -> + rabbit_db_queue:khepri_queue_path(DstName); + #resource{kind = exchange} -> + rabbit_db_exchange:khepri_exchange_path(DstName) + end, + KeepWhile = #{DstPath => #if_node_exists{}}, + PutOptions = #{keep_while => KeepWhile}, MaybeSerial = rabbit_exchange:serialise_events(Src), Serial = rabbit_khepri:transaction( fun() -> @@ -210,11 +219,17 @@ create_in_khepri(#binding{source = SrcName, true -> already_exists; false -> - ok = khepri_tx:put(RoutePath, sets:add_element(Binding, Set)), + ok = khepri_tx:put( + RoutePath, + sets:add_element(Binding, Set), + PutOptions), serial_in_khepri(MaybeSerial, Src) end; _ -> - ok = khepri_tx:put(RoutePath, sets:add_element(Binding, sets:new([{version, 2}]))), + ok = khepri_tx:put( + RoutePath, + sets:add_element(Binding, sets:new([{version, 2}])), + PutOptions), serial_in_khepri(MaybeSerial, Src) end end, rw), @@ -906,6 +921,7 @@ delete_for_destination_in_khepri(#resource{virtual_host = VHost, kind = Kind, na Name, ?KHEPRI_WILDCARD_STAR), %% RoutingKey {ok, BindingsMap} = khepri_tx_adv:delete_many(Pattern), + % 
logger:alert("BindingsMap = ~p", [BindingsMap]), Bindings = maps:fold( fun(Path, Props, Acc) -> case {Path, Props} of @@ -920,6 +936,38 @@ delete_for_destination_in_khepri(#resource{virtual_host = VHost, kind = Kind, na rabbit_binding:group_bindings_fold(fun maybe_auto_delete_exchange_in_khepri/4, lists:keysort(#binding.source, Bindings), OnlyDurable). +khepri_ret_to_deletions(Deleted, OnlyDurable) -> + Bindings0 = maps:fold( + fun(Path, Props, Acc) -> + case {Path, Props} of + {?RABBITMQ_KHEPRI_ROUTE_PATH( + _VHost, _SrcName, _Kind, _Name, _RoutingKey), + #{data := Set}} -> + sets:to_list(Set) ++ Acc; + {_, _} -> + Acc + end + end, [], Deleted), + Bindings1 = lists:keysort(#binding.source, Bindings0), + rabbit_binding:group_bindings_fold( + fun(XName, Bindings, Deletions, _OnlyDurable) -> + ExchangePath = rabbit_db_exchange:khepri_exchange_path(XName), + case Deleted of + #{ExchangePath := #{data := X}} -> + rabbit_binding:add_deletion( + XName, X, deleted, Bindings, Deletions); + _ -> + case rabbit_db_exchange:get(XName) of + {ok, X} -> + rabbit_binding:add_deletion( + XName, X, not_deleted, Bindings, Deletions); + _ -> + Deletions + end + end + end, + Bindings1, OnlyDurable). + %% ------------------------------------------------------------------- %% delete_transient_for_destination_in_mnesia(). 
%% ------------------------------------------------------------------- diff --git a/deps/rabbit/src/rabbit_db_exchange.erl b/deps/rabbit/src/rabbit_db_exchange.erl index 548ab78f696..981c47bc372 100644 --- a/deps/rabbit/src/rabbit_db_exchange.erl +++ b/deps/rabbit/src/rabbit_db_exchange.erl @@ -414,7 +414,24 @@ create_or_get_in_khepri(#exchange{name = XName} = X) -> Path0, [#if_any{conditions = [#if_node_exists{exists = false}, #if_has_payload{has_payload = false}]}]), - case rabbit_khepri:put(Path1, X) of + Options = case X of + #exchange{name = #resource{virtual_host = VHost, + name = Name}, + auto_delete = true} -> + Path = rabbit_db_binding:khepri_route_path( + VHost, + Name, + _Kind = ?KHEPRI_WILDCARD_STAR, + _DstName = ?KHEPRI_WILDCARD_STAR, + _RoutingKey = ?KHEPRI_WILDCARD_STAR), + KeepWhile = #{Path => #if_all{conditions = + [#if_node_exists{}, + #if_has_data{}]}}, + #{keep_while => KeepWhile}; + _ -> + #{} + end, + case rabbit_khepri:put(Path1, X, Options) of ok -> {new, X}; {error, {khepri, mismatching_node, #{node_props := #{data := ExistingX}}}} -> diff --git a/deps/rabbit/src/rabbit_db_queue.erl b/deps/rabbit/src/rabbit_db_queue.erl index 2fe9fbccf2f..cd625dcfe5c 100644 --- a/deps/rabbit/src/rabbit_db_queue.erl +++ b/deps/rabbit/src/rabbit_db_queue.erl @@ -411,28 +411,46 @@ delete_in_khepri(QueueName) -> delete_in_khepri(QueueName, false). delete_in_khepri(QueueName, OnlyDurable) -> - rabbit_khepri:transaction( - fun () -> - Path = khepri_queue_path(QueueName), - UsesUniformWriteRet = try - khepri_tx:does_api_comply_with(uniform_write_ret) - catch - error:undef -> - false - end, - case khepri_tx_adv:delete(Path) of - {ok, #{Path := #{data := _}}} when UsesUniformWriteRet -> - %% we want to execute some things, as decided by rabbit_exchange, - %% after the transaction. 
- rabbit_db_binding:delete_for_destination_in_khepri(QueueName, OnlyDurable); - {ok, #{data := _}} when not UsesUniformWriteRet -> - %% we want to execute some things, as decided by rabbit_exchange, - %% after the transaction. - rabbit_db_binding:delete_for_destination_in_khepri(QueueName, OnlyDurable); - {ok, _} -> - ok - end - end, rw). + Path = khepri_queue_path(QueueName), + FeatureFlag = true, + case FeatureFlag of + true -> + case khepri_adv:delete(Path) of + {ok, #{Path := #{data := _}} = Deleted} -> + %% we want to execute some things, as decided by + %% rabbit_exchange, after the transaction. + rabbit_db_binding:khepri_ret_to_deletions( + Deleted, OnlyDurable); + {ok, _} -> + ok; + {error, _} = Error -> + Error + end; + false -> + UsesUniformWriteRet = try + khepri_tx:does_api_comply_with(uniform_write_ret) + catch + error:undef -> + false + end, + rabbit_khepri:transaction( + fun () -> + Ret1 = khepri_tx_adv:delete(Path), + % logger:alert("Deleted queue ret = ~p", [Ret1]), + case Ret1 of + {ok, #{Path := #{data := _}}} when UsesUniformWriteRet -> + %% we want to execute some things, as decided by rabbit_exchange, + %% after the transaction. + rabbit_db_binding:delete_for_destination_in_khepri(QueueName, OnlyDurable); + {ok, #{data := _}} when not UsesUniformWriteRet -> + %% we want to execute some things, as decided by rabbit_exchange, + %% after the transaction. + rabbit_db_binding:delete_for_destination_in_khepri(QueueName, OnlyDurable); + {ok, _} -> + ok + end + end, rw) + end. %% ------------------------------------------------------------------- %% internal_delete().