@@ -411,16 +411,16 @@ apply(#{index := Index,
411411 Effects2 = [reply_log_effect (RaftIdx , MsgId , Header ,
412412 messages_ready (State4 ), From )
413413 | Effects1 ],
414- {State , _DroppedMsg , Effects } =
415- evaluate_limit ( Index , false , State0 , State4 , Effects2 ),
414+ {State , Effects } = evaluate_limit ( Index , State0 ,
415+ State4 , Effects2 ),
416416 {State , '$ra_no_reply' , Effects };
417417 {nochange , _ExpiredMsg = true , State2 , Effects0 } ->
418418 % % All ready messages expired.
419419 State3 = State2 #? STATE {consumers =
420420 maps :remove (ConsumerId ,
421421 State2 #? STATE .consumers )},
422- {State , _ , Effects } = evaluate_limit (Index , false , State0 ,
423- State3 , Effects0 ),
422+ {State , Effects } = evaluate_limit (Index , State0 ,
423+ State3 , Effects0 ),
424424 {State , {dequeue , empty }, Effects }
425425 end
426426 end ;
@@ -515,8 +515,7 @@ apply(#{index := Index}, #purge{},
515515 },
516516 Effects0 = [{aux , force_checkpoint }, garbage_collection ],
517517 Reply = {purge , NumReady },
518- {State , _ , Effects } = evaluate_limit (Index , false , State0 ,
519- State1 , Effects0 ),
518+ {State , Effects } = evaluate_limit (Index , State0 , State1 , Effects0 ),
520519 {State , Reply , Effects };
521520apply (#{index := _Idx }, # garbage_collection {}, State ) ->
522521 {State , ok , [{aux , garbage_collection }]};
@@ -1993,10 +1992,8 @@ checkout(#{index := Index} = Meta,
19931992 State2 = State1 #? STATE {msg_cache = undefined ,
19941993 dlx = DlxState },
19951994 Effects2 = DlxDeliveryEffects ++ Effects1 ,
1996- case evaluate_limit (Index , false , OldState , State2 , Effects2 ) of
1997- {State , _ , Effects } ->
1998- {State , Reply , Effects }
1999- end .
1995+ {State , Effects } = evaluate_limit (Index , OldState , State2 , Effects2 ),
1996+ {State , Reply , Effects }.
20001997
20011998checkout0 (Meta , {success , ConsumerKey , MsgId ,
20021999 ? MSG (_ , _ ) = Msg , ExpiredMsg , State , Effects },
@@ -2013,37 +2010,37 @@ checkout0(_Meta, {_Activity, ExpiredMsg, State0, Effects0}, SendAcc) ->
20132010 Effects = add_delivery_effects (Effects0 , SendAcc , State0 ),
20142011 {State0 , ExpiredMsg , lists :reverse (Effects )}.
20152012
2016- evaluate_limit (_Index , Result ,
2017- #? STATE {cfg = # cfg {max_length = undefined ,
2018- max_bytes = undefined }},
2019- #? STATE {cfg = # cfg {max_length = undefined ,
2020- max_bytes = undefined }} = State ,
2021- Effects ) ->
2022- {State , Result , Effects };
2023- evaluate_limit (_Index , Result , _BeforeState ,
2024- #? STATE {cfg = # cfg {max_length = undefined ,
2025- max_bytes = undefined },
2026- enqueuers = Enqs0 } = State0 ,
2027- Effects0 ) ->
2013+ evaluate_limit (Idx , State1 , State2 , OuterEffects ) ->
2014+ case evaluate_limit0 (Idx , State1 , State2 , []) of
2015+ {State , []} ->
2016+ {State , OuterEffects };
2017+ {State , Effects } ->
2018+ {State , OuterEffects ++ lists :reverse (Effects )}
2019+ end .
2020+
2021+ evaluate_limit0 (_Index ,
2022+ #? STATE {cfg = # cfg {max_length = undefined ,
2023+ max_bytes = undefined }},
2024+ #? STATE {cfg = # cfg {max_length = undefined ,
2025+ max_bytes = undefined }} = State ,
2026+ Effects ) ->
2027+ {State , Effects };
2028+ evaluate_limit0 (_Index , _BeforeState ,
2029+ #? STATE {cfg = # cfg {max_length = undefined ,
2030+ max_bytes = undefined },
2031+ enqueuers = Enqs0 } = State0 ,
2032+ Effects0 ) ->
20282033 % % max_length and/or max_bytes policies have just been deleted
20292034 {Enqs , Effects } = unblock_enqueuers (Enqs0 , Effects0 ),
2030- {State0 #? STATE {enqueuers = Enqs }, Result , Effects };
2031- evaluate_limit (Index , Result , BeforeState ,
2032- #? STATE {cfg = # cfg {overflow_strategy = Strategy ,
2033- dead_letter_handler = DLH },
2034- enqueuers = Enqs0 } = State0 ,
2035- Effects0 ) ->
2035+ {State0 #? STATE {enqueuers = Enqs }, Effects };
2036+ evaluate_limit0 (Index , BeforeState ,
2037+ #? STATE {cfg = # cfg {overflow_strategy = Strategy },
2038+ enqueuers = Enqs0 } = State0 ,
2039+ Effects0 ) ->
20362040 case is_over_limit (State0 ) of
20372041 true when Strategy == drop_head ->
20382042 {State , Effects } = drop_head (State0 , Effects0 ),
2039- evaluate_limit (Index , true , BeforeState , State , Effects );
2040- false when Strategy == drop_head andalso
2041- Result =:= true andalso
2042- element (1 , DLH ) =:= at_most_once ->
2043- % % At most once dead letter in the correct order.
2044- Dropped = BeforeState #? STATE .messages_total - State0 #? STATE .messages_total ,
2045- {LogEffects , Effects } = lists :split (Dropped , Effects0 ),
2046- {State0 , Result , Effects ++ lists :reverse (LogEffects )};
2043+ evaluate_limit0 (Index , BeforeState , State , Effects );
20472044 true when Strategy == reject_publish ->
20482045 % % generate send_msg effect for each enqueuer to let them know
20492046 % % they need to block
@@ -2057,7 +2054,7 @@ evaluate_limit(Index, Result, BeforeState,
20572054 (_P , _E , Acc ) ->
20582055 Acc
20592056 end , {Enqs0 , Effects0 }, Enqs0 ),
2060- {State0 #? STATE {enqueuers = Enqs }, Result , Effects };
2057+ {State0 #? STATE {enqueuers = Enqs }, Effects };
20612058 false when Strategy == reject_publish ->
20622059 % % TODO: optimise as this case gets called for every command
20632060 % % pretty much
@@ -2066,12 +2063,12 @@ evaluate_limit(Index, Result, BeforeState,
20662063 {false , true } ->
20672064 % % we have moved below the lower limit
20682065 {Enqs , Effects } = unblock_enqueuers (Enqs0 , Effects0 ),
2069- {State0 #? STATE {enqueuers = Enqs }, Result , Effects };
2066+ {State0 #? STATE {enqueuers = Enqs }, Effects };
20702067 _ ->
2071- {State0 , Result , Effects0 }
2068+ {State0 , Effects0 }
20722069 end ;
20732070 false ->
2074- {State0 , Result , Effects0 }
2071+ {State0 , Effects0 }
20752072 end .
20762073
20772074unblock_enqueuers (Enqs0 , Effects0 ) ->
0 commit comments