Skip to content

Commit 70d8915

Browse files
ansdmergify[bot]
authored andcommitted
Drop large number of messages more efficiently
This commit at-most-once dead letters a huge number of quorum queue messages due to overflow behaviour `drop-head` more efficiently by avoiding repeatedly creating huge lists. A huge number of messages gets dropped if for example: * when the `max-length(-bytes)' is changed via a policy update, or * a very large message was just enqueued causing many small messages being dead lettered due to `max-length-bytes` being exceeded. (cherry picked from commit 422cc2e)
1 parent 117652b commit 70d8915

File tree

1 file changed

+22
-17
lines changed

1 file changed

+22
-17
lines changed

deps/rabbit/src/rabbit_fifo.erl

Lines changed: 22 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1639,30 +1639,27 @@ drop_head(#?STATE{ra_indexes = Indexes0} = State0, Effects) ->
16391639
#?STATE{cfg = #cfg{dead_letter_handler = DLH},
16401640
dlx = DlxState} = State = State3,
16411641
{_, DlxEffects} = rabbit_fifo_dlx:discard([Msg], maxlen, DLH, DlxState),
1642-
{State, add_drop_head_effects(DlxEffects, Effects)};
1642+
{State, combine_effects(DlxEffects, Effects)};
16431643
empty ->
16441644
{State0, Effects}
16451645
end.
16461646

1647-
add_drop_head_effects([{mod_call,
1648-
rabbit_global_counters,
1649-
messages_dead_lettered,
1650-
[Reason, rabbit_quorum_queue, Type, NewLen]}],
1651-
[{mod_call,
1652-
rabbit_global_counters,
1653-
messages_dead_lettered,
1654-
[Reason, rabbit_quorum_queue, Type, PrevLen]} | Rem]) ->
1655-
%% combine global counter update effects to avoid bulding a huge list of
1656-
%% effects if many messages are dropped at the same time as could happen
1657-
%% when the `max_length' is changed via a configuration update.
1647+
%% combine global counter update effects to avoid bulding a huge list of
1648+
%% effects if many messages are dropped at the same time as could happen
1649+
%% when the `max_length' is changed via a configuration update.
1650+
combine_effects([{mod_call,
1651+
rabbit_global_counters,
1652+
messages_dead_lettered,
1653+
[Reason, rabbit_quorum_queue, Type, NewLen]}],
1654+
[{mod_call,
1655+
rabbit_global_counters,
1656+
messages_dead_lettered,
1657+
[Reason, rabbit_quorum_queue, Type, PrevLen]} | Rem]) ->
16581658
[{mod_call,
16591659
rabbit_global_counters,
16601660
messages_dead_lettered,
16611661
[Reason, rabbit_quorum_queue, Type, PrevLen + NewLen]} | Rem];
1662-
add_drop_head_effects([{log, _, _}] = DlxEffs, Effs) ->
1663-
%% dead letter in the correct order
1664-
Effs ++ DlxEffs;
1665-
add_drop_head_effects(New, Old) ->
1662+
combine_effects(New, Old) ->
16661663
New ++ Old.
16671664

16681665
maybe_set_msg_ttl(Msg, RaCmdTs, Header,
@@ -2032,13 +2029,21 @@ evaluate_limit(_Index, Result, _BeforeState,
20322029
{Enqs, Effects} = unblock_enqueuers(Enqs0, Effects0),
20332030
{State0#?STATE{enqueuers = Enqs}, Result, Effects};
20342031
evaluate_limit(Index, Result, BeforeState,
2035-
#?STATE{cfg = #cfg{overflow_strategy = Strategy},
2032+
#?STATE{cfg = #cfg{overflow_strategy = Strategy,
2033+
dead_letter_handler = DLH},
20362034
enqueuers = Enqs0} = State0,
20372035
Effects0) ->
20382036
case is_over_limit(State0) of
20392037
true when Strategy == drop_head ->
20402038
{State, Effects} = drop_head(State0, Effects0),
20412039
evaluate_limit(Index, true, BeforeState, State, Effects);
2040+
false when Strategy == drop_head andalso
2041+
Result =:= true andalso
2042+
element(1, DLH) =:= at_most_once ->
2043+
%% At most once dead letter in the correct order.
2044+
Dropped = BeforeState#?STATE.messages_total - State0#?STATE.messages_total,
2045+
{LogEffects, Effects} = lists:split(Dropped, Effects0),
2046+
{State0, Result, Effects ++ lists:reverse(LogEffects)};
20422047
true when Strategy == reject_publish ->
20432048
%% generate send_msg effect for each enqueuer to let them know
20442049
%% they need to block

0 commit comments

Comments
 (0)