diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2011-09-29 21:56:41 +0100 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2011-09-29 21:56:41 +0100 |
commit | 2f2a55ad1913800db2876f2919f5c9a34f23a472 (patch) | |
tree | e10baf0aa3b41b1fbbe6f248b8e4b4ec6d9fcb07 | |
parent | 500fdc0d25f431aac42a9ef179ec4b5653c2167d (diff) | |
download | rabbitmq-server-2f2a55ad1913800db2876f2919f5c9a34f23a472.tar.gz |
inline ack/4
since there is now just one call site left
-rw-r--r-- | src/rabbit_variable_queue.erl | 57 |
1 files changed, 25 insertions, 32 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 73488832..e06f32bf 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -565,11 +565,32 @@ fetch(AckRequired, State) -> {Res, a(State3)} end. +ack([], State) -> + {[], State}; ack(AckTags, State) -> - {MsgIds, State1} = ack(fun msg_store_remove/3, - fun (_, State0) -> State0 end, - AckTags, State), - {MsgIds, a(State1)}. + {{IndexOnDiskSeqIds, MsgIdsByStore, AllMsgIds}, + State1 = #vqstate { index_state = IndexState, + msg_store_clients = MSCState, + persistent_count = PCount, + ack_out_counter = AckOutCount }} = + lists:foldl( + fun (SeqId, {Acc, State2 = #vqstate { pending_ack = PA, + ram_ack_index = RAI }}) -> + AckEntry = dict:fetch(SeqId, PA), + {accumulate_ack(SeqId, AckEntry, Acc), + State2 #vqstate { + pending_ack = dict:erase(SeqId, PA), + ram_ack_index = gb_trees:delete_any(SeqId, RAI)}} + end, {accumulate_ack_init(), State}, AckTags), + IndexState1 = rabbit_queue_index:ack(IndexOnDiskSeqIds, IndexState), + [ok = msg_store_remove(MSCState, IsPersistent, MsgIds) + || {IsPersistent, MsgIds} <- orddict:to_list(MsgIdsByStore)], + PCount1 = PCount - find_persistent_count(sum_msg_ids_by_store_to_len( + orddict:new(), MsgIdsByStore)), + {lists:reverse(AllMsgIds), + a(State1 #vqstate { index_state = IndexState1, + persistent_count = PCount1, + ack_out_counter = AckOutCount + length(AckTags) })}. requeue(AckTags, MsgPropsFun, #vqstate { delta = Delta, q3 = Q3, @@ -1216,34 +1237,6 @@ remove_pending_ack(KeepPersistent, State1 #vqstate { index_state = IndexState1 } end. -ack(_MsgStoreFun, _Fun, [], State) -> - {[], State}; -ack(MsgStoreFun, Fun, AckTags, State) -> - {{IndexOnDiskSeqIds, MsgIdsByStore, AllMsgIds}, - State1 = #vqstate { index_state = IndexState, - msg_store_clients = MSCState, - persistent_count = PCount, - ack_out_counter = AckOutCount }} = - lists:foldl( - fun (SeqId, {Acc, State2 = #vqstate { pending_ack = PA, - ram_ack_index = RAI }}) -> - AckEntry = dict:fetch(SeqId, PA), - {accumulate_ack(SeqId, AckEntry, Acc), - Fun(AckEntry, State2 #vqstate { - pending_ack = dict:erase(SeqId, PA), - ram_ack_index = - gb_trees:delete_any(SeqId, RAI)})} - end, {accumulate_ack_init(), State}, AckTags), - IndexState1 = rabbit_queue_index:ack(IndexOnDiskSeqIds, IndexState), - [ok = MsgStoreFun(MSCState, IsPersistent, MsgIds) - || {IsPersistent, MsgIds} <- orddict:to_list(MsgIdsByStore)], - PCount1 = PCount - find_persistent_count(sum_msg_ids_by_store_to_len( - orddict:new(), MsgIdsByStore)), - {lists:reverse(AllMsgIds), - State1 #vqstate { index_state = IndexState1, - persistent_count = PCount1, - ack_out_counter = AckOutCount + length(AckTags) }}. - accumulate_ack_init() -> {[], orddict:new(), []}. accumulate_ack(_SeqId, #msg_status { is_persistent = false, %% ASSERTIONS |