summaryrefslogtreecommitdiff
path: root/src/rabbit_variable_queue.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/rabbit_variable_queue.erl')
-rw-r--r--src/rabbit_variable_queue.erl39
1 files changed, 21 insertions, 18 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 0bfec2fd..209e5252 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -16,13 +16,12 @@
-module(rabbit_variable_queue).
--export([init/3, terminate/2, delete_and_terminate/2,
- purge/1, publish/4, publish_delivered/5, drain_confirmed/1,
+-export([init/3, terminate/2, delete_and_terminate/2, purge/1,
+ publish/4, publish_delivered/5, drain_confirmed/1,
dropwhile/3, fetch/2, ack/2, requeue/2, len/1, is_empty/1,
- set_ram_duration_target/2, ram_duration/1,
- needs_timeout/1, timeout/1, handle_pre_hibernate/1,
- status/1, invoke/3, is_duplicate/2, discard/3,
- multiple_routing_keys/0, fold/3]).
+ set_ram_duration_target/2, ram_duration/1, needs_timeout/1,
+ timeout/1, handle_pre_hibernate/1, status/1, invoke/3,
+ is_duplicate/2, discard/3, multiple_routing_keys/0, fold/3]).
-export([start/1, stop/0]).
@@ -579,23 +578,27 @@ drain_confirmed(State = #vqstate { confirmed = C }) ->
confirmed = gb_sets:new() }}
end.
-dropwhile(Pred, MsgFun, State) ->
+dropwhile(Pred, AckRequired, State) -> dropwhile(Pred, AckRequired, State, []).
+
+dropwhile(Pred, AckRequired, State, Msgs) ->
+ End = fun(S) when AckRequired -> {lists:reverse(Msgs), S};
+ (S) -> {undefined, S}
+ end,
case queue_out(State) of
{empty, State1} ->
- a(State1);
+ End(a(State1));
{{value, MsgStatus = #msg_status { msg_props = MsgProps }}, State1} ->
- case {Pred(MsgProps), MsgFun} of
- {true, undefined} ->
+ case {Pred(MsgProps), AckRequired} of
+ {true, true} ->
+ {MsgStatus1, State2} = read_msg(MsgStatus, State1),
+ {{Msg, _, AckTag, _}, State3} =
+ internal_fetch(true, MsgStatus1, State2),
+ dropwhile(Pred, AckRequired, State3, [{Msg, AckTag} | Msgs]);
+ {true, false} ->
{_, State2} = internal_fetch(false, MsgStatus, State1),
- dropwhile(Pred, MsgFun, State2);
- {true, _} ->
- {{_, _, AckTag, _}, State2} =
- internal_fetch(true, MsgStatus, State1),
- {MsgStatus, State3} = read_msg(MsgStatus, State2),
- MsgFun(MsgStatus#msg_status.msg, AckTag),
- dropwhile(Pred, MsgFun, State3);
+ dropwhile(Pred, AckRequired, State2, undefined);
{false, _} ->
- a(in_r(MsgStatus, State1))
+ End(a(in_r(MsgStatus, State1)))
end
end.