diff options
author | Emile Joubert <emile@rabbitmq.com> | 2011-09-29 10:14:36 +0100 |
---|---|---|
committer | Emile Joubert <emile@rabbitmq.com> | 2011-09-29 10:14:36 +0100 |
commit | fa79e2edcab3f749d3b1ce45234d0f0547cc7eca (patch) | |
tree | 33c57781dbad071fb12288f45672c71cba689e41 | |
parent | 3a61cb9d66fabd330f9f9dacc66cef9a7f88e645 (diff) | |
download | rabbitmq-server-fa79e2edcab3f749d3b1ce45234d0f0547cc7eca.tar.gz |
Only ack messages when necessary
-rw-r--r-- | src/rabbit_variable_queue.erl | 21 |
1 files changed, 11 insertions, 10 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index df8bb312..d11aa639 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -1169,7 +1169,7 @@ remove_pending_ack(KeepPersistent, State = #vqstate { pending_ack = PA, index_state = IndexState, msg_store_clients = MSCState }) -> - {MsgIdsByStore, _AllMsgIds} = + {IndexOnDiskSeqIds, MsgIdsByStore, _AllMsgIds} = dict:fold(fun accumulate_ack/3, accumulate_ack_init(), PA), State1 = State #vqstate { pending_ack = dict:new(), ram_ack_index = gb_trees:empty() }, @@ -1181,7 +1181,7 @@ remove_pending_ack(KeepPersistent, State1 end; false -> IndexState1 = - rabbit_queue_index:ack(dict:fetch_keys(PA), IndexState), + rabbit_queue_index:ack(IndexOnDiskSeqIds, IndexState), [ok = msg_store_remove(MSCState, IsPersistent, MsgIds) || {IsPersistent, MsgIds} <- orddict:to_list(MsgIdsByStore)], State1 #vqstate { index_state = IndexState1 } @@ -1190,7 +1190,7 @@ remove_pending_ack(KeepPersistent, ack(_MsgStoreFun, _Fun, [], State) -> {[], State}; ack(MsgStoreFun, Fun, AckTags, State) -> - {{MsgIdsByStore, AllMsgIds}, + {{IndexOnDiskSeqIds, MsgIdsByStore, AllMsgIds}, State1 = #vqstate { index_state = IndexState, msg_store_clients = MSCState, persistent_count = PCount, @@ -1205,7 +1205,7 @@ ack(MsgStoreFun, Fun, AckTags, State) -> ram_ack_index = gb_trees:delete_any(SeqId, RAI)})} end, {accumulate_ack_init(), State}, AckTags), - IndexState1 = rabbit_queue_index:ack(AckTags, IndexState), + IndexState1 = rabbit_queue_index:ack(IndexOnDiskSeqIds, IndexState), [ok = MsgStoreFun(MSCState, IsPersistent, MsgIds) || {IsPersistent, MsgIds} <- orddict:to_list(MsgIdsByStore)], PCount1 = PCount - find_persistent_count(sum_msg_ids_by_store_to_len( @@ -1215,17 +1215,18 @@ ack(MsgStoreFun, Fun, AckTags, State) -> persistent_count = PCount1, ack_out_counter = AckOutCount + length(AckTags) }}. -accumulate_ack_init() -> {orddict:new(), []}. +accumulate_ack_init() -> {[], orddict:new(), []}. accumulate_ack(_SeqId, #msg_status { is_persistent = false, %% ASSERTIONS msg_on_disk = false, index_on_disk = false, msg_id = MsgId }, - {MsgIdsByStore, AllMsgIds}) -> - {MsgIdsByStore, [MsgId | AllMsgIds]}; -accumulate_ack(_SeqId, {IsPersistent, MsgId, _MsgProps, _IndexOnDisk}, - {MsgIdsByStore, AllMsgIds}) -> - {rabbit_misc:orddict_cons(IsPersistent, MsgId, MsgIdsByStore), + {IndexOnDiskSeqIdsAcc, MsgIdsByStore, AllMsgIds}) -> + {IndexOnDiskSeqIdsAcc, MsgIdsByStore, [MsgId | AllMsgIds]}; +accumulate_ack(SeqId, {IsPersistent, MsgId, _MsgProps, IndexOnDisk}, + {IndexOnDiskSeqIdsAcc, MsgIdsByStore, AllMsgIds}) -> + {cons_if(IndexOnDisk, SeqId, IndexOnDiskSeqIdsAcc), + rabbit_misc:orddict_cons(IsPersistent, MsgId, MsgIdsByStore), [MsgId | AllMsgIds]}. find_persistent_count(LensByStore) -> |