Skip to content

Commit 6dc214e

Browse files
Merge pull request #14926 from rabbitmq/qq-drop-head-order
Fix quorum queue `drop-head` dead letter order
2 parents 1262b01 + 59fdd73 commit 6dc214e

File tree

4 files changed

+99
-33
lines changed

4 files changed

+99
-33
lines changed

deps/rabbit/src/rabbit_fifo.erl

Lines changed: 37 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -411,16 +411,16 @@ apply(#{index := Index,
411411
Effects2 = [reply_log_effect(RaftIdx, MsgId, Header,
412412
messages_ready(State4), From)
413413
| Effects1],
414-
{State, _DroppedMsg, Effects} =
415-
evaluate_limit(Index, false, State0, State4, Effects2),
414+
{State, Effects} = evaluate_limit(Index, State0,
415+
State4, Effects2),
416416
{State, '$ra_no_reply', Effects};
417417
{nochange, _ExpiredMsg = true, State2, Effects0} ->
418418
%% All ready messages expired.
419419
State3 = State2#?STATE{consumers =
420420
maps:remove(ConsumerId,
421421
State2#?STATE.consumers)},
422-
{State, _, Effects} = evaluate_limit(Index, false, State0,
423-
State3, Effects0),
422+
{State, Effects} = evaluate_limit(Index, State0,
423+
State3, Effects0),
424424
{State, {dequeue, empty}, Effects}
425425
end
426426
end;
@@ -515,8 +515,7 @@ apply(#{index := Index}, #purge{},
515515
},
516516
Effects0 = [{aux, force_checkpoint}, garbage_collection],
517517
Reply = {purge, NumReady},
518-
{State, _, Effects} = evaluate_limit(Index, false, State0,
519-
State1, Effects0),
518+
{State, Effects} = evaluate_limit(Index, State0, State1, Effects0),
520519
{State, Reply, Effects};
521520
apply(#{index := _Idx}, #garbage_collection{}, State) ->
522521
{State, ok, [{aux, garbage_collection}]};
@@ -1662,7 +1661,6 @@ combine_effects([{mod_call,
16621661
combine_effects(New, Old) ->
16631662
New ++ Old.
16641663

1665-
16661664
maybe_set_msg_ttl(Msg, RaCmdTs, Header,
16671665
#?STATE{cfg = #cfg{msg_ttl = MsgTTL}}) ->
16681666
case mc:is(Msg) of
@@ -1994,10 +1992,8 @@ checkout(#{index := Index} = Meta,
19941992
State2 = State1#?STATE{msg_cache = undefined,
19951993
dlx = DlxState},
19961994
Effects2 = DlxDeliveryEffects ++ Effects1,
1997-
case evaluate_limit(Index, false, OldState, State2, Effects2) of
1998-
{State, _, Effects} ->
1999-
{State, Reply, Effects}
2000-
end.
1995+
{State, Effects} = evaluate_limit(Index, OldState, State2, Effects2),
1996+
{State, Reply, Effects}.
20011997

20021998
checkout0(Meta, {success, ConsumerKey, MsgId,
20031999
?MSG(_, _) = Msg, ExpiredMsg, State, Effects},
@@ -2014,29 +2010,37 @@ checkout0(_Meta, {_Activity, ExpiredMsg, State0, Effects0}, SendAcc) ->
20142010
Effects = add_delivery_effects(Effects0, SendAcc, State0),
20152011
{State0, ExpiredMsg, lists:reverse(Effects)}.
20162012

2017-
evaluate_limit(_Index, Result,
2018-
#?STATE{cfg = #cfg{max_length = undefined,
2019-
max_bytes = undefined}},
2020-
#?STATE{cfg = #cfg{max_length = undefined,
2021-
max_bytes = undefined}} = State,
2022-
Effects) ->
2023-
{State, Result, Effects};
2024-
evaluate_limit(_Index, Result, _BeforeState,
2025-
#?STATE{cfg = #cfg{max_length = undefined,
2026-
max_bytes = undefined},
2027-
enqueuers = Enqs0} = State0,
2028-
Effects0) ->
2013+
evaluate_limit(Idx, State1, State2, OuterEffects) ->
2014+
case evaluate_limit0(Idx, State1, State2, []) of
2015+
{State, []} ->
2016+
{State, OuterEffects};
2017+
{State, Effects} ->
2018+
{State, OuterEffects ++ lists:reverse(Effects)}
2019+
end.
2020+
2021+
evaluate_limit0(_Index,
2022+
#?STATE{cfg = #cfg{max_length = undefined,
2023+
max_bytes = undefined}},
2024+
#?STATE{cfg = #cfg{max_length = undefined,
2025+
max_bytes = undefined}} = State,
2026+
Effects) ->
2027+
{State, Effects};
2028+
evaluate_limit0(_Index, _BeforeState,
2029+
#?STATE{cfg = #cfg{max_length = undefined,
2030+
max_bytes = undefined},
2031+
enqueuers = Enqs0} = State0,
2032+
Effects0) ->
20292033
%% max_length and/or max_bytes policies have just been deleted
20302034
{Enqs, Effects} = unblock_enqueuers(Enqs0, Effects0),
2031-
{State0#?STATE{enqueuers = Enqs}, Result, Effects};
2032-
evaluate_limit(Index, Result, BeforeState,
2033-
#?STATE{cfg = #cfg{overflow_strategy = Strategy},
2034-
enqueuers = Enqs0} = State0,
2035-
Effects0) ->
2035+
{State0#?STATE{enqueuers = Enqs}, Effects};
2036+
evaluate_limit0(Index, BeforeState,
2037+
#?STATE{cfg = #cfg{overflow_strategy = Strategy},
2038+
enqueuers = Enqs0} = State0,
2039+
Effects0) ->
20362040
case is_over_limit(State0) of
20372041
true when Strategy == drop_head ->
20382042
{State, Effects} = drop_head(State0, Effects0),
2039-
evaluate_limit(Index, true, BeforeState, State, Effects);
2043+
evaluate_limit0(Index, BeforeState, State, Effects);
20402044
true when Strategy == reject_publish ->
20412045
%% generate send_msg effect for each enqueuer to let them know
20422046
%% they need to block
@@ -2050,7 +2054,7 @@ evaluate_limit(Index, Result, BeforeState,
20502054
(_P, _E, Acc) ->
20512055
Acc
20522056
end, {Enqs0, Effects0}, Enqs0),
2053-
{State0#?STATE{enqueuers = Enqs}, Result, Effects};
2057+
{State0#?STATE{enqueuers = Enqs}, Effects};
20542058
false when Strategy == reject_publish ->
20552059
%% TODO: optimise as this case gets called for every command
20562060
%% pretty much
@@ -2059,12 +2063,12 @@ evaluate_limit(Index, Result, BeforeState,
20592063
{false, true} ->
20602064
%% we have moved below the lower limit
20612065
{Enqs, Effects} = unblock_enqueuers(Enqs0, Effects0),
2062-
{State0#?STATE{enqueuers = Enqs}, Result, Effects};
2066+
{State0#?STATE{enqueuers = Enqs}, Effects};
20632067
_ ->
2064-
{State0, Result, Effects0}
2068+
{State0, Effects0}
20652069
end;
20662070
false ->
2067-
{State0, Result, Effects0}
2071+
{State0, Effects0}
20682072
end.
20692073

20702074
unblock_enqueuers(Enqs0, Effects0) ->

deps/rabbit/test/quorum_queue_SUITE.erl

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,7 @@ all_tests() ->
167167
subscribe_redelivery_count,
168168
message_bytes_metrics,
169169
queue_length_limit_drop_head,
170+
queue_length_bytes_limit_drop_head,
170171
queue_length_limit_reject_publish,
171172
queue_length_limit_policy_cleared,
172173
subscribe_redelivery_limit,
@@ -3669,6 +3670,50 @@ queue_length_limit_drop_head(Config) ->
36693670
amqp_channel:call(Ch, #'basic.get'{queue = QQ,
36703671
no_ack = true})).
36713672

3673+
queue_length_bytes_limit_drop_head(Config) ->
3674+
[Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
3675+
3676+
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
3677+
QQ = ?config(queue_name, Config),
3678+
DLQ = <<"dead letter queue">>,
3679+
3680+
?assertEqual({'queue.declare_ok', DLQ, 0, 0},
3681+
declare(Ch, DLQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
3682+
?assertEqual({'queue.declare_ok', QQ, 0, 0},
3683+
declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>},
3684+
{<<"x-overflow">>, longstr, <<"drop-head">>},
3685+
{<<"x-max-length-bytes">>, long, 1000},
3686+
{<<"x-dead-letter-exchange">>, longstr, <<>>},
3687+
{<<"x-dead-letter-routing-key">>, longstr, DLQ}])),
3688+
3689+
LargePayload = binary:copy(<<"x">>, 1500),
3690+
ok = amqp_channel:cast(Ch,
3691+
#'basic.publish'{routing_key = QQ},
3692+
#amqp_msg{payload = <<"m1">>}),
3693+
ok = amqp_channel:cast(Ch,
3694+
#'basic.publish'{routing_key = QQ},
3695+
#amqp_msg{payload = <<"m2">>}),
3696+
ok = amqp_channel:cast(Ch,
3697+
#'basic.publish'{routing_key = QQ},
3698+
#amqp_msg{payload = LargePayload}),
3699+
wait_for_consensus(QQ, Config),
3700+
wait_for_consensus(DLQ, Config),
3701+
RaName = ra_name(DLQ),
3702+
wait_for_messages_ready(Servers, RaName, 3),
3703+
?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = <<"m1">>}},
3704+
amqp_channel:call(Ch, #'basic.get'{queue = DLQ,
3705+
no_ack = true})),
3706+
?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = <<"m2">>}},
3707+
amqp_channel:call(Ch, #'basic.get'{queue = DLQ,
3708+
no_ack = true})),
3709+
?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = LargePayload}},
3710+
amqp_channel:call(Ch, #'basic.get'{queue = DLQ,
3711+
no_ack = true})),
3712+
3713+
[?assertEqual(#'queue.delete_ok'{message_count = 0},
3714+
amqp_channel:call(Ch, #'queue.delete'{queue = Q}))
3715+
|| Q <- [QQ, DLQ]].
3716+
36723717
queue_length_limit_reject_publish(Config) ->
36733718
[Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
36743719

release-notes/4.2.1.md

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
## RabbitMQ 4.2.1
2+
3+
RabbitMQ `4.2.1` is a maintenance release in the `4.2.x` [release series](https://www.rabbitmq.com/release-information).
4+
5+
### Core Server
6+
7+
#### Bug Fixes
8+
9+
* Quorum queue at-most-once dead lettering for the overflow behaviour `drop-head` now happens in the correct order.
10+
11+
GitHub issue: [#14926](https://github.com/rabbitmq/rabbitmq-server/pull/14926)

release-notes/4.3.0.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,12 @@ compared to other versions.
5757

5858
### Core Server
5959

60+
#### Bug Fixes
61+
62+
* Quorum queue at-most-once dead lettering for the overflow behaviour `drop-head` now happens in the correct order.
63+
64+
GitHub issue: [#14926](https://github.com/rabbitmq/rabbitmq-server/pull/14926)
65+
6066
#### Enhancements
6167

6268
TBD

0 commit comments

Comments
 (0)