summaryrefslogtreecommitdiff
path: root/src/rabbit_variable_queue.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/rabbit_variable_queue.erl')
-rw-r--r--src/rabbit_variable_queue.erl54
1 files changed, 28 insertions, 26 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 34a28afe..811017d9 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -18,7 +18,7 @@
-export([init/3, terminate/2, delete_and_terminate/2,
purge/1, publish/4, publish_delivered/5, drain_confirmed/1,
- dropwhile/4, fetch/2, ack/4, requeue/2, len/1, is_empty/1,
+ dropwhile/3, fetch/2, ack/3, requeue/2, len/1, is_empty/1,
set_ram_duration_target/2, ram_duration/1,
needs_timeout/1, timeout/1, handle_pre_hibernate/1,
status/1, invoke/3, is_duplicate/2, discard/3,
@@ -581,19 +581,18 @@ drain_confirmed(State = #vqstate { confirmed = C }) ->
confirmed = gb_sets:new() }}
end.
-dropwhile(Pred, MsgFun, MsgSeqNo, State) ->
+dropwhile(Pred, MsgFun, State) ->
case queue_out(State) of
{empty, State1} ->
- {MsgSeqNo, a(State1)};
+ a(State1);
{{value, MsgStatus = #msg_status { msg_props = MsgProps }}, State1} ->
case Pred(MsgProps) of
true ->
- {MsgSeqNo1, State2} =
- MsgFun(read_msg_callback(MsgStatus), MsgSeqNo, State1),
+ State2 = MsgFun(read_msg_callback(MsgStatus), undefined, State1),
{_, State3} = internal_fetch(false, MsgStatus, State2),
- dropwhile(Pred, MsgFun, MsgSeqNo1, State3);
+ dropwhile(Pred, MsgFun, State3);
false ->
- {MsgSeqNo, a(in_r(MsgStatus, State1))}
+ a(in_r(MsgStatus, State1))
end
end.
@@ -626,39 +625,42 @@ read_msg_callback1(MsgId, IsPersistent,
msg_store_read(MSCState, IsPersistent, MsgId),
{Msg, State #vqstate { msg_store_clients = MSCState1 }}.
-ack([], _Fun, MsgSeqNo, State) ->
- {[], MsgSeqNo, State};
+ack([], _Fun, State) ->
+ {[], State};
-ack(AckTags, MsgFun, MsgSeqNo, State) ->
+ack(AckTags, undefined, State) ->
{{IndexOnDiskSeqIds, MsgIdsByStore, AllMsgIds},
- {MsgSeqNo2,
- State1 = #vqstate { index_state = IndexState,
- msg_store_clients = MSCState,
- persistent_count = PCount,
- ack_out_counter = AckOutCount }}} =
+ State1 = #vqstate { index_state = IndexState,
+ msg_store_clients = MSCState,
+ persistent_count = PCount,
+ ack_out_counter = AckOutCount }} =
lists:foldl(
- fun (SeqId, {Acc, {MsgSeqNo1, State2 = #vqstate{pending_ack = PA}}}) ->
- AckEntry = gb_trees:get(SeqId, PA),
+ fun (SeqId, {Acc, State2}) ->
{MsgStatus, State3} = remove_pending_ack(SeqId, State2),
- {accumulate_ack(MsgStatus, Acc),
- MsgFun(read_msg_callback(AckEntry), MsgSeqNo1, State3)}
- end, {accumulate_ack_init(), {MsgSeqNo, State}}, AckTags),
+ {accumulate_ack(MsgStatus, Acc), State3}
+ end, {accumulate_ack_init(), State}, AckTags),
IndexState1 = rabbit_queue_index:ack(IndexOnDiskSeqIds, IndexState),
[ok = msg_store_remove(MSCState, IsPersistent, MsgIds)
|| {IsPersistent, MsgIds} <- orddict:to_list(MsgIdsByStore)],
PCount1 = PCount - find_persistent_count(sum_msg_ids_by_store_to_len(
orddict:new(), MsgIdsByStore)),
{lists:reverse(AllMsgIds),
- MsgSeqNo2,
a(State1 #vqstate { index_state = IndexState1,
persistent_count = PCount1,
- ack_out_counter = AckOutCount + length(AckTags) })}.
+ ack_out_counter = AckOutCount + length(AckTags) })};
+
+ack(AckTags, MsgFun, State = #vqstate{pending_ack = PA}) ->
+ [begin
+ AckEntry = gb_trees:get(SeqId, PA),
+ MsgFun(read_msg_callback(AckEntry), SeqId, State)
+ end || SeqId <- AckTags],
+ {[], State}.
requeue(AckTags, #vqstate { delta = Delta,
- q3 = Q3,
- q4 = Q4,
- in_counter = InCounter,
- len = Len } = State) ->
+ q3 = Q3,
+ q4 = Q4,
+ in_counter = InCounter,
+ len = Len } = State) ->
{SeqIds, Q4a, MsgIds, State1} = queue_merge(lists:sort(AckTags), Q4, [],
beta_limit(Q3),
fun publish_alpha/2, State),