diff options
author | Alexandru Scvortov <alexandru@rabbitmq.com> | 2010-12-21 18:31:49 +0000 |
---|---|---|
committer | Alexandru Scvortov <alexandru@rabbitmq.com> | 2010-12-21 18:31:49 +0000 |
commit | f45fa72f76d72aca74e87347dfd8de6ff3522800 (patch) | |
tree | 1464e0d8310c3544ef55a41e0397a970b0321e1f /src/rabbit_variable_queue.erl | |
parent | 5647d8a9e4e0d3d8c27cb72941ab3f33dde5048e (diff) | |
download | rabbitmq-server-f45fa72f76d72aca74e87347dfd8de6ff3522800.tar.gz |
simplify variable_queue:ack's return format
Diffstat (limited to 'src/rabbit_variable_queue.erl')
-rw-r--r-- | src/rabbit_variable_queue.erl | 71 |
1 files changed, 32 insertions, 39 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 6264255b..aa6d3a87 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -654,11 +654,9 @@ internal_fetch(AckRequired, MsgStatus = #msg_status { persistent_count = PCount1 })}. ack(AckTags, State) -> - {Guids, State1} = - ack(fun msg_store_remove/3, - fun (_, State0) -> State0 end, - AckTags, State), - {Guids, a(State1)}. + a(ack(fun msg_store_remove/3, + fun (_, State0) -> State0 end, + AckTags, State)). tx_publish(Txn, Msg = #basic_message { is_persistent = IsPersistent }, MsgProps, State = #vqstate { durable = IsDurable, @@ -708,23 +706,22 @@ tx_commit(Txn, Fun, MsgPropsFun, end)}. requeue(AckTags, MsgPropsFun, State) -> - {_Guids, State1} = - ack(fun msg_store_release/3, - fun (#msg_status { msg = Msg, msg_props = MsgProps }, State1) -> - {_SeqId, State2} = publish(Msg, MsgPropsFun(MsgProps), - true, false, State1), - State2; - ({IsPersistent, Guid, MsgProps}, State1) -> - #vqstate { msg_store_clients = MSCState } = State1, - {{ok, Msg = #basic_message{}}, MSCState1} = - msg_store_read(MSCState, IsPersistent, Guid), - State2 = State1 #vqstate { msg_store_clients = MSCState1 }, - {_SeqId, State3} = publish(Msg, MsgPropsFun(MsgProps), - true, true, State2), - State3 - end, - AckTags, State), - a(reduce_memory_use(State1)). + a(reduce_memory_use( + ack(fun msg_store_release/3, + fun (#msg_status { msg = Msg, msg_props = MsgProps }, State1) -> + {_SeqId, State2} = publish(Msg, MsgPropsFun(MsgProps), + true, false, State1), + State2; + ({IsPersistent, Guid, MsgProps}, State1) -> + #vqstate { msg_store_clients = MSCState } = State1, + {{ok, Msg = #basic_message{}}, MSCState1} = + msg_store_read(MSCState, IsPersistent, Guid), + State2 = State1 #vqstate { msg_store_clients = MSCState1 }, + {_SeqId, State3} = publish(Msg, MsgPropsFun(MsgProps), + true, true, State2), + State3 + end, + AckTags, State))). len(#vqstate { len = Len }) -> Len. @@ -1156,7 +1153,6 @@ tx_commit_index(State = #vqstate { on_sync = #sync { durable = IsDurable }) -> PAcks = lists:append(SPAcks), Acks = lists:append(SAcks), - {_Guids, NewState} = ack(Acks, State), Pubs = [{Msg, Fun(MsgProps)} || {Fun, PubsN} <- lists:reverse(SPubs), {Msg, MsgProps} <- lists:reverse(PubsN)], {SeqIds, State1 = #vqstate { index_state = IndexState }} = @@ -1168,7 +1164,7 @@ tx_commit_index(State = #vqstate { on_sync = #sync { {SeqId, State3} = publish(Msg, MsgProps, false, IsPersistent1, State2), {cons_if(IsPersistent1, SeqId, SeqIdsAcc), State3} - end, {PAcks, NewState}, Pubs), + end, {PAcks, ack(Acks, State)}, Pubs), IndexState1 = rabbit_queue_index:sync(SeqIds, IndexState), [ Fun() || Fun <- lists:reverse(SFuns) ], reduce_memory_use( @@ -1319,7 +1315,7 @@ remove_pending_ack(KeepPersistent, State = #vqstate { pending_ack = PA, index_state = IndexState, msg_store_clients = MSCState }) -> - {PersistentSeqIds, GuidsByStore, _AllGuids} = + {PersistentSeqIds, GuidsByStore} = dict:fold(fun accumulate_ack/3, accumulate_ack_init(), PA), State1 = State #vqstate { pending_ack = dict:new(), ram_ack_index = gb_trees:empty() }, @@ -1338,9 +1334,9 @@ remove_pending_ack(KeepPersistent, end. ack(_MsgStoreFun, _Fun, [], State) -> - {[], State}; + State; ack(MsgStoreFun, Fun, AckTags, State) -> - {{PersistentSeqIds, GuidsByStore, AllGuids}, + {{PersistentSeqIds, GuidsByStore}, State1 = #vqstate { index_state = IndexState, msg_store_clients = MSCState, persistent_count = PCount, @@ -1360,24 +1356,21 @@ ack(MsgStoreFun, Fun, AckTags, State) -> || {IsPersistent, Guids} <- orddict:to_list(GuidsByStore)], PCount1 = PCount - find_persistent_count(sum_guids_by_store_to_len( orddict:new(), GuidsByStore)), - {lists:reverse(AllGuids), - State1 #vqstate { index_state = IndexState1, - persistent_count = PCount1, - ack_out_counter = AckOutCount + length(AckTags) }}. + State1 #vqstate { index_state = IndexState1, + persistent_count = PCount1, + ack_out_counter = AckOutCount + length(AckTags) }. -accumulate_ack_init() -> {[], orddict:new(), []}. +accumulate_ack_init() -> {[], orddict:new()}. accumulate_ack(_SeqId, #msg_status { is_persistent = false, %% ASSERTIONS msg_on_disk = false, - index_on_disk = false, - guid = Guid }, - {PersistentSeqIdsAcc, GuidsByStore, AllGuids}) -> - {PersistentSeqIdsAcc, GuidsByStore, [Guid | AllGuids]}; + index_on_disk = false }, + {PersistentSeqIdsAcc, GuidsByStore}) -> + {PersistentSeqIdsAcc, GuidsByStore}; accumulate_ack(SeqId, {IsPersistent, Guid, _MsgProps}, - {PersistentSeqIdsAcc, GuidsByStore, AllGuids}) -> + {PersistentSeqIdsAcc, GuidsByStore}) -> {cons_if(IsPersistent, SeqId, PersistentSeqIdsAcc), - rabbit_misc:orddict_cons(IsPersistent, Guid, GuidsByStore), - [Guid | AllGuids]}. + rabbit_misc:orddict_cons(IsPersistent, Guid, GuidsByStore)}. find_persistent_count(LensByStore) -> case orddict:find(true, LensByStore) of |