diff options
author | Matthew Sackman <matthew@rabbitmq.com> | 2010-12-14 12:24:21 +0000 |
---|---|---|
committer | Matthew Sackman <matthew@rabbitmq.com> | 2010-12-14 12:24:21 +0000 |
commit | 6fc0feb3e036f45a226e2e323cb018758a5e8db7 (patch) | |
tree | 5e0d4fb15d24cd8ba3a4f40ace12028a1faf50b2 /src | |
parent | 88282eb072f38d71f26f9cb6d663685ef095bfd5 (diff) | |
download | rabbitmq-server-6fc0feb3e036f45a226e2e323cb018758a5e8db7.tar.gz |
Simplify code surrounding the return from BQ:ack
Diffstat (limited to 'src')
-rw-r--r-- | src/rabbit_backing_queue.erl | 2 | ||||
-rw-r--r-- | src/rabbit_variable_queue.erl | 56 |
2 files changed, 29 insertions, 29 deletions
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index 352e76fd..8603d8d7 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -77,7 +77,7 @@ behaviour_info(callbacks) -> {fetch, 2}, %% Acktags supplied are for messages which can now be forgotten - %% about. + %% about. Must return 1 guid per Ack, in the same order as Acks. {ack, 2}, %% A publish, but in the context of a transaction. diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 88c61d57..565c61e7 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -1311,10 +1311,8 @@ record_pending_ack(#msg_status { seq_id = SeqId, ack_in_counter = AckInCount}) -> {AckEntry, RAI1} = case MsgOnDisk of - true -> - {{IsPersistent, Guid, MsgProps}, RAI}; - false -> - {MsgStatus, gb_trees:insert(SeqId, Guid, RAI)} + true -> {{IsPersistent, Guid, MsgProps}, RAI}; + false -> {MsgStatus, gb_trees:insert(SeqId, Guid, RAI)} end, PA1 = dict:store(SeqId, AckEntry, PA), State #vqstate { pending_ack = PA1, @@ -1325,8 +1323,8 @@ remove_pending_ack(KeepPersistent, State = #vqstate { pending_ack = PA, index_state = IndexState, msg_store_clients = MSCState }) -> - {SeqIds, GuidsByStore} = dict:fold(fun accumulate_ack/3, - {[], orddict:new()}, PA), + {PersistentSeqIds, GuidsByStore, _AllGuids} = + dict:fold(fun accumulate_ack/3, accumulate_ack_init(), PA), State1 = State #vqstate { pending_ack = dict:new(), ram_ack_index = gb_trees:empty() }, case KeepPersistent of @@ -1336,18 +1334,17 @@ remove_pending_ack(KeepPersistent, Guids), State1 end; - false -> IndexState1 = rabbit_queue_index:ack(SeqIds, IndexState), - ok = orddict:fold( - fun (IsPersistent, Guids, ok) -> - msg_store_remove(MSCState, IsPersistent, Guids) - end, ok, GuidsByStore), + false -> IndexState1 = + rabbit_queue_index:ack(PersistentSeqIds, IndexState), + [ok = msg_store_remove(MSCState, IsPersistent, Guids) + || {IsPersistent, Guids} <- orddict:to_list(GuidsByStore)], State1 #vqstate { index_state = IndexState1 } end. ack(_MsgStoreFun, _Fun, [], State) -> {[], State}; ack(MsgStoreFun, Fun, AckTags, State) -> - {{SeqIds, GuidsByStore}, + {{PersistentSeqIds, GuidsByStore, AllGuids}, State1 = #vqstate { index_state = IndexState, msg_store_clients = MSCState, persistent_count = PCount, @@ -1361,27 +1358,30 @@ ack(MsgStoreFun, Fun, AckTags, State) -> pending_ack = dict:erase(SeqId, PA), ram_ack_index = gb_trees:delete_any(SeqId, RAI)})} - end, {{[], orddict:new()}, State}, AckTags), - IndexState1 = rabbit_queue_index:ack(SeqIds, IndexState), - AckdGuids = lists:append( - orddict:fold( - fun (IsPersistent, Guids, Gs) -> - MsgStoreFun(MSCState, IsPersistent, Guids), - [Guids | Gs] - end, [], GuidsByStore)), + end, {accumulate_ack_init(), State}, AckTags), + IndexState1 = rabbit_queue_index:ack(PersistentSeqIds, IndexState), + [ok = MsgStoreFun(MSCState, IsPersistent, Guids) + || {IsPersistent, Guids} <- orddict:to_list(GuidsByStore)], PCount1 = PCount - find_persistent_count(sum_guids_by_store_to_len( orddict:new(), GuidsByStore)), - {AckdGuids, State1 #vqstate { index_state = IndexState1, - persistent_count = PCount1, - ack_out_counter = AckOutCount + length(AckTags) }}. + {lists:reverse(AllGuids), + State1 #vqstate { index_state = IndexState1, + persistent_count = PCount1, + ack_out_counter = AckOutCount + length(AckTags) }}. + +accumulate_ack_init() -> {[], orddict:new(), []}. accumulate_ack(_SeqId, #msg_status { is_persistent = false, %% ASSERTIONS msg_on_disk = false, - index_on_disk = false }, Acc) -> - Acc; -accumulate_ack(SeqId, {IsPersistent, Guid, _MsgProps}, {SeqIdsAcc, Dict}) -> - {cons_if(IsPersistent, SeqId, SeqIdsAcc), - rabbit_misc:orddict_cons(IsPersistent, Guid, Dict)}. + index_on_disk = false, + guid = Guid }, + {PersistentSeqIdsAcc, GuidsByStore, AllGuids}) -> + {PersistentSeqIdsAcc, GuidsByStore, [Guid | AllGuids]}; +accumulate_ack(SeqId, {IsPersistent, Guid, _MsgProps}, + {PersistentSeqIdsAcc, GuidsByStore, AllGuids}) -> + {cons_if(IsPersistent, SeqId, PersistentSeqIdsAcc), + rabbit_misc:orddict_cons(IsPersistent, Guid, GuidsByStore), + [Guid | AllGuids]}. find_persistent_count(LensByStore) -> case orddict:find(true, LensByStore) of |