Skip to content

Commit 40398b9

Browse files
Merge pull request #14938 from rabbitmq/mergify/bp/v4.2.x/pr-14926
Fix quorum queue `drop-head` dead letter order (backport #14926)
2 parents 1eff3e4 + ee10f11 commit 40398b9

File tree

3 files changed

+93
-33
lines changed

3 files changed

+93
-33
lines changed

deps/rabbit/src/rabbit_fifo.erl

Lines changed: 37 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -411,16 +411,16 @@ apply(#{index := Index,
411411
Effects2 = [reply_log_effect(RaftIdx, MsgId, Header,
412412
messages_ready(State4), From)
413413
| Effects1],
414-
{State, _DroppedMsg, Effects} =
415-
evaluate_limit(Index, false, State0, State4, Effects2),
414+
{State, Effects} = evaluate_limit(Index, State0,
415+
State4, Effects2),
416416
{State, '$ra_no_reply', Effects};
417417
{nochange, _ExpiredMsg = true, State2, Effects0} ->
418418
%% All ready messages expired.
419419
State3 = State2#?STATE{consumers =
420420
maps:remove(ConsumerId,
421421
State2#?STATE.consumers)},
422-
{State, _, Effects} = evaluate_limit(Index, false, State0,
423-
State3, Effects0),
422+
{State, Effects} = evaluate_limit(Index, State0,
423+
State3, Effects0),
424424
{State, {dequeue, empty}, Effects}
425425
end
426426
end;
@@ -515,8 +515,7 @@ apply(#{index := Index}, #purge{},
515515
},
516516
Effects0 = [{aux, force_checkpoint}, garbage_collection],
517517
Reply = {purge, NumReady},
518-
{State, _, Effects} = evaluate_limit(Index, false, State0,
519-
State1, Effects0),
518+
{State, Effects} = evaluate_limit(Index, State0, State1, Effects0),
520519
{State, Reply, Effects};
521520
apply(#{index := _Idx}, #garbage_collection{}, State) ->
522521
{State, ok, [{aux, garbage_collection}]};
@@ -1662,7 +1661,6 @@ combine_effects([{mod_call,
16621661
combine_effects(New, Old) ->
16631662
New ++ Old.
16641663

1665-
16661664
maybe_set_msg_ttl(Msg, RaCmdTs, Header,
16671665
#?STATE{cfg = #cfg{msg_ttl = MsgTTL}}) ->
16681666
case mc:is(Msg) of
@@ -1994,10 +1992,8 @@ checkout(#{index := Index} = Meta,
19941992
State2 = State1#?STATE{msg_cache = undefined,
19951993
dlx = DlxState},
19961994
Effects2 = DlxDeliveryEffects ++ Effects1,
1997-
case evaluate_limit(Index, false, OldState, State2, Effects2) of
1998-
{State, _, Effects} ->
1999-
{State, Reply, Effects}
2000-
end.
1995+
{State, Effects} = evaluate_limit(Index, OldState, State2, Effects2),
1996+
{State, Reply, Effects}.
20011997

20021998
checkout0(Meta, {success, ConsumerKey, MsgId,
20031999
?MSG(_, _) = Msg, ExpiredMsg, State, Effects},
@@ -2014,29 +2010,37 @@ checkout0(_Meta, {_Activity, ExpiredMsg, State0, Effects0}, SendAcc) ->
20142010
Effects = add_delivery_effects(Effects0, SendAcc, State0),
20152011
{State0, ExpiredMsg, lists:reverse(Effects)}.
20162012

2017-
evaluate_limit(_Index, Result,
2018-
#?STATE{cfg = #cfg{max_length = undefined,
2019-
max_bytes = undefined}},
2020-
#?STATE{cfg = #cfg{max_length = undefined,
2021-
max_bytes = undefined}} = State,
2022-
Effects) ->
2023-
{State, Result, Effects};
2024-
evaluate_limit(_Index, Result, _BeforeState,
2025-
#?STATE{cfg = #cfg{max_length = undefined,
2026-
max_bytes = undefined},
2027-
enqueuers = Enqs0} = State0,
2028-
Effects0) ->
2013+
evaluate_limit(Idx, State1, State2, OuterEffects) ->
2014+
case evaluate_limit0(Idx, State1, State2, []) of
2015+
{State, []} ->
2016+
{State, OuterEffects};
2017+
{State, Effects} ->
2018+
{State, OuterEffects ++ lists:reverse(Effects)}
2019+
end.
2020+
2021+
evaluate_limit0(_Index,
2022+
#?STATE{cfg = #cfg{max_length = undefined,
2023+
max_bytes = undefined}},
2024+
#?STATE{cfg = #cfg{max_length = undefined,
2025+
max_bytes = undefined}} = State,
2026+
Effects) ->
2027+
{State, Effects};
2028+
evaluate_limit0(_Index, _BeforeState,
2029+
#?STATE{cfg = #cfg{max_length = undefined,
2030+
max_bytes = undefined},
2031+
enqueuers = Enqs0} = State0,
2032+
Effects0) ->
20292033
%% max_length and/or max_bytes policies have just been deleted
20302034
{Enqs, Effects} = unblock_enqueuers(Enqs0, Effects0),
2031-
{State0#?STATE{enqueuers = Enqs}, Result, Effects};
2032-
evaluate_limit(Index, Result, BeforeState,
2033-
#?STATE{cfg = #cfg{overflow_strategy = Strategy},
2034-
enqueuers = Enqs0} = State0,
2035-
Effects0) ->
2035+
{State0#?STATE{enqueuers = Enqs}, Effects};
2036+
evaluate_limit0(Index, BeforeState,
2037+
#?STATE{cfg = #cfg{overflow_strategy = Strategy},
2038+
enqueuers = Enqs0} = State0,
2039+
Effects0) ->
20362040
case is_over_limit(State0) of
20372041
true when Strategy == drop_head ->
20382042
{State, Effects} = drop_head(State0, Effects0),
2039-
evaluate_limit(Index, true, BeforeState, State, Effects);
2043+
evaluate_limit0(Index, BeforeState, State, Effects);
20402044
true when Strategy == reject_publish ->
20412045
%% generate send_msg effect for each enqueuer to let them know
20422046
%% they need to block
@@ -2050,7 +2054,7 @@ evaluate_limit(Index, Result, BeforeState,
20502054
(_P, _E, Acc) ->
20512055
Acc
20522056
end, {Enqs0, Effects0}, Enqs0),
2053-
{State0#?STATE{enqueuers = Enqs}, Result, Effects};
2057+
{State0#?STATE{enqueuers = Enqs}, Effects};
20542058
false when Strategy == reject_publish ->
20552059
%% TODO: optimise as this case gets called for every command
20562060
%% pretty much
@@ -2059,12 +2063,12 @@ evaluate_limit(Index, Result, BeforeState,
20592063
{false, true} ->
20602064
%% we have moved below the lower limit
20612065
{Enqs, Effects} = unblock_enqueuers(Enqs0, Effects0),
2062-
{State0#?STATE{enqueuers = Enqs}, Result, Effects};
2066+
{State0#?STATE{enqueuers = Enqs}, Effects};
20632067
_ ->
2064-
{State0, Result, Effects0}
2068+
{State0, Effects0}
20652069
end;
20662070
false ->
2067-
{State0, Result, Effects0}
2071+
{State0, Effects0}
20682072
end.
20692073

20702074
unblock_enqueuers(Enqs0, Effects0) ->

deps/rabbit/test/quorum_queue_SUITE.erl

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,7 @@ all_tests() ->
167167
subscribe_redelivery_count,
168168
message_bytes_metrics,
169169
queue_length_limit_drop_head,
170+
queue_length_bytes_limit_drop_head,
170171
queue_length_limit_reject_publish,
171172
queue_length_limit_policy_cleared,
172173
subscribe_redelivery_limit,
@@ -3697,6 +3698,50 @@ queue_length_limit_drop_head(Config) ->
36973698
amqp_channel:call(Ch, #'basic.get'{queue = QQ,
36983699
no_ack = true})).
36993700

3701+
queue_length_bytes_limit_drop_head(Config) ->
3702+
[Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
3703+
3704+
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
3705+
QQ = ?config(queue_name, Config),
3706+
DLQ = <<"dead letter queue">>,
3707+
3708+
?assertEqual({'queue.declare_ok', DLQ, 0, 0},
3709+
declare(Ch, DLQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
3710+
?assertEqual({'queue.declare_ok', QQ, 0, 0},
3711+
declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>},
3712+
{<<"x-overflow">>, longstr, <<"drop-head">>},
3713+
{<<"x-max-length-bytes">>, long, 1000},
3714+
{<<"x-dead-letter-exchange">>, longstr, <<>>},
3715+
{<<"x-dead-letter-routing-key">>, longstr, DLQ}])),
3716+
3717+
LargePayload = binary:copy(<<"x">>, 1500),
3718+
ok = amqp_channel:cast(Ch,
3719+
#'basic.publish'{routing_key = QQ},
3720+
#amqp_msg{payload = <<"m1">>}),
3721+
ok = amqp_channel:cast(Ch,
3722+
#'basic.publish'{routing_key = QQ},
3723+
#amqp_msg{payload = <<"m2">>}),
3724+
ok = amqp_channel:cast(Ch,
3725+
#'basic.publish'{routing_key = QQ},
3726+
#amqp_msg{payload = LargePayload}),
3727+
wait_for_consensus(QQ, Config),
3728+
wait_for_consensus(DLQ, Config),
3729+
RaName = ra_name(DLQ),
3730+
wait_for_messages_ready(Servers, RaName, 3),
3731+
?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = <<"m1">>}},
3732+
amqp_channel:call(Ch, #'basic.get'{queue = DLQ,
3733+
no_ack = true})),
3734+
?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = <<"m2">>}},
3735+
amqp_channel:call(Ch, #'basic.get'{queue = DLQ,
3736+
no_ack = true})),
3737+
?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = LargePayload}},
3738+
amqp_channel:call(Ch, #'basic.get'{queue = DLQ,
3739+
no_ack = true})),
3740+
3741+
[?assertEqual(#'queue.delete_ok'{message_count = 0},
3742+
amqp_channel:call(Ch, #'queue.delete'{queue = Q}))
3743+
|| Q <- [QQ, DLQ]].
3744+
37003745
queue_length_limit_reject_publish(Config) ->
37013746
[Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
37023747

release-notes/4.2.1.md

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
## RabbitMQ 4.2.1
2+
3+
RabbitMQ `4.2.1` is a maintenance release in the `4.2.x` [release series](https://www.rabbitmq.com/release-information).
4+
5+
### Core Server
6+
7+
#### Bug Fixes
8+
9+
* Quorum queue at-most-once dead lettering for the overflow behaviour `drop-head` now happens in the correct order.
10+
11+
GitHub issue: [#14926](https://github.com/rabbitmq/rabbitmq-server/pull/14926)

0 commit comments

Comments
 (0)