summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2011-09-29 21:56:41 +0100
committerMatthias Radestock <matthias@rabbitmq.com>2011-09-29 21:56:41 +0100
commit2f2a55ad1913800db2876f2919f5c9a34f23a472 (patch)
treee10baf0aa3b41b1fbbe6f248b8e4b4ec6d9fcb07
parent500fdc0d25f431aac42a9ef179ec4b5653c2167d (diff)
downloadrabbitmq-server-2f2a55ad1913800db2876f2919f5c9a34f23a472.tar.gz
inline ack/4
since there is now just one call site left
-rw-r--r--src/rabbit_variable_queue.erl57
1 files changed, 25 insertions, 32 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 73488832..e06f32bf 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -565,11 +565,32 @@ fetch(AckRequired, State) ->
{Res, a(State3)}
end.
+ack([], State) ->
+ {[], State};
ack(AckTags, State) ->
- {MsgIds, State1} = ack(fun msg_store_remove/3,
- fun (_, State0) -> State0 end,
- AckTags, State),
- {MsgIds, a(State1)}.
+ {{IndexOnDiskSeqIds, MsgIdsByStore, AllMsgIds},
+ State1 = #vqstate { index_state = IndexState,
+ msg_store_clients = MSCState,
+ persistent_count = PCount,
+ ack_out_counter = AckOutCount }} =
+ lists:foldl(
+ fun (SeqId, {Acc, State2 = #vqstate { pending_ack = PA,
+ ram_ack_index = RAI }}) ->
+ AckEntry = dict:fetch(SeqId, PA),
+ {accumulate_ack(SeqId, AckEntry, Acc),
+ State2 #vqstate {
+ pending_ack = dict:erase(SeqId, PA),
+ ram_ack_index = gb_trees:delete_any(SeqId, RAI)}}
+ end, {accumulate_ack_init(), State}, AckTags),
+ IndexState1 = rabbit_queue_index:ack(IndexOnDiskSeqIds, IndexState),
+ [ok = msg_store_remove(MSCState, IsPersistent, MsgIds)
+ || {IsPersistent, MsgIds} <- orddict:to_list(MsgIdsByStore)],
+ PCount1 = PCount - find_persistent_count(sum_msg_ids_by_store_to_len(
+ orddict:new(), MsgIdsByStore)),
+ {lists:reverse(AllMsgIds),
+ a(State1 #vqstate { index_state = IndexState1,
+ persistent_count = PCount1,
+ ack_out_counter = AckOutCount + length(AckTags) })}.
requeue(AckTags, MsgPropsFun, #vqstate { delta = Delta,
q3 = Q3,
@@ -1216,34 +1237,6 @@ remove_pending_ack(KeepPersistent,
State1 #vqstate { index_state = IndexState1 }
end.
-ack(_MsgStoreFun, _Fun, [], State) ->
- {[], State};
-ack(MsgStoreFun, Fun, AckTags, State) ->
- {{IndexOnDiskSeqIds, MsgIdsByStore, AllMsgIds},
- State1 = #vqstate { index_state = IndexState,
- msg_store_clients = MSCState,
- persistent_count = PCount,
- ack_out_counter = AckOutCount }} =
- lists:foldl(
- fun (SeqId, {Acc, State2 = #vqstate { pending_ack = PA,
- ram_ack_index = RAI }}) ->
- AckEntry = dict:fetch(SeqId, PA),
- {accumulate_ack(SeqId, AckEntry, Acc),
- Fun(AckEntry, State2 #vqstate {
- pending_ack = dict:erase(SeqId, PA),
- ram_ack_index =
- gb_trees:delete_any(SeqId, RAI)})}
- end, {accumulate_ack_init(), State}, AckTags),
- IndexState1 = rabbit_queue_index:ack(IndexOnDiskSeqIds, IndexState),
- [ok = MsgStoreFun(MSCState, IsPersistent, MsgIds)
- || {IsPersistent, MsgIds} <- orddict:to_list(MsgIdsByStore)],
- PCount1 = PCount - find_persistent_count(sum_msg_ids_by_store_to_len(
- orddict:new(), MsgIdsByStore)),
- {lists:reverse(AllMsgIds),
- State1 #vqstate { index_state = IndexState1,
- persistent_count = PCount1,
- ack_out_counter = AckOutCount + length(AckTags) }}.
-
accumulate_ack_init() -> {[], orddict:new(), []}.
accumulate_ack(_SeqId, #msg_status { is_persistent = false, %% ASSERTIONS