diff options
Diffstat (limited to 'src/rabbit_variable_queue.erl')
-rw-r--r-- | src/rabbit_variable_queue.erl | 39 |
1 files changed, 21 insertions, 18 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 0bfec2fd..209e5252 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -16,13 +16,12 @@ -module(rabbit_variable_queue). --export([init/3, terminate/2, delete_and_terminate/2, - purge/1, publish/4, publish_delivered/5, drain_confirmed/1, +-export([init/3, terminate/2, delete_and_terminate/2, purge/1, + publish/4, publish_delivered/5, drain_confirmed/1, dropwhile/3, fetch/2, ack/2, requeue/2, len/1, is_empty/1, - set_ram_duration_target/2, ram_duration/1, - needs_timeout/1, timeout/1, handle_pre_hibernate/1, - status/1, invoke/3, is_duplicate/2, discard/3, - multiple_routing_keys/0, fold/3]). + set_ram_duration_target/2, ram_duration/1, needs_timeout/1, + timeout/1, handle_pre_hibernate/1, status/1, invoke/3, + is_duplicate/2, discard/3, multiple_routing_keys/0, fold/3]). -export([start/1, stop/0]). @@ -579,23 +578,27 @@ drain_confirmed(State = #vqstate { confirmed = C }) -> confirmed = gb_sets:new() }} end. -dropwhile(Pred, MsgFun, State) -> +dropwhile(Pred, AckRequired, State) -> dropwhile(Pred, AckRequired, State, []). + +dropwhile(Pred, AckRequired, State, Msgs) -> + End = fun(S) when AckRequired -> {lists:reverse(Msgs), S}; + (S) -> {undefined, S} + end, case queue_out(State) of {empty, State1} -> - a(State1); + End(a(State1)); {{value, MsgStatus = #msg_status { msg_props = MsgProps }}, State1} -> - case {Pred(MsgProps), MsgFun} of - {true, undefined} -> + case {Pred(MsgProps), AckRequired} of + {true, true} -> + {MsgStatus1, State2} = read_msg(MsgStatus, State1), + {{Msg, _, AckTag, _}, State3} = + internal_fetch(true, MsgStatus1, State2), + dropwhile(Pred, AckRequired, State3, [{Msg, AckTag} | Msgs]); + {true, false} -> {_, State2} = internal_fetch(false, MsgStatus, State1), - dropwhile(Pred, MsgFun, State2); - {true, _} -> - {{_, _, AckTag, _}, State2} = - internal_fetch(true, MsgStatus, State1), - {MsgStatus, State3} = read_msg(MsgStatus, State2), - MsgFun(MsgStatus#msg_status.msg, AckTag), - dropwhile(Pred, MsgFun, State3); + dropwhile(Pred, AckRequired, State2, undefined); {false, _} -> - a(in_r(MsgStatus, State1)) + End(a(in_r(MsgStatus, State1))) end end. |