summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_variable_queue.erl35
1 files changed, 14 insertions, 21 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 8998c0e8..b8771600 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -565,31 +565,24 @@ dropwhile(Pred, State) ->
dropwhile1(Pred, State) ->
internal_queue_out(
- fun(MsgStatus = #msg_status { msg_props = MsgProps, msg = Msg,
- index_on_disk = IndexOnDisk },
- State1 = #vqstate { q3 = Q3, q4 = Q4,
- ram_index_count = RamIndexCount }) ->
+ fun(MsgStatus = #msg_status { msg_props = MsgProps }, State1) ->
case Pred(MsgProps) of
- true ->
- {_, State2} = internal_fetch(false, MsgStatus, State1),
- dropwhile1(Pred, State2);
- false ->
- {ok,
- case Msg of
- undefined ->
- true = queue:is_empty(Q4), %% ASSERTION
- Q3a = bpqueue:in_r(IndexOnDisk, MsgStatus, Q3),
- RamIndexCount1 =
- RamIndexCount + one_if(not IndexOnDisk),
- State1 #vqstate {
- q3 = Q3a, ram_index_count = RamIndexCount1 };
- _ ->
- Q4a = queue:in_r(MsgStatus, Q4),
- State1 #vqstate { q4 = Q4a }
- end}
+ true -> {_, State2} = internal_fetch(false, MsgStatus,
+ State1),
+ dropwhile1(Pred, State2);
+ false -> {ok, in_r(MsgStatus, State1)}
end
end, State).
+in_r(MsgStatus = #msg_status { msg = undefined, index_on_disk = IndexOnDisk },
+ State = #vqstate { q3 = Q3, q4 = Q4, ram_index_count = RamIndexCount }) ->
+ true = queue:is_empty(Q4), %% ASSERTION
+ State #vqstate {
+ q3 = bpqueue:in_r(IndexOnDisk, MsgStatus, Q3),
+ ram_index_count = RamIndexCount + one_if(not IndexOnDisk) };
+in_r(MsgStatus, State = #vqstate { q4 = Q4 }) ->
+ State #vqstate { q4 = queue:in_r(MsgStatus, Q4) }.
+
fetch(AckRequired, State) ->
internal_queue_out(
fun(MsgStatus, State1) ->