diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2014-08-11 15:58:41 +0100 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2014-08-11 15:58:41 +0100 |
commit | 6fc0cf7c0007441e3bcebb49f193b15c7831c0aa (patch) | |
tree | 59d6592e9160ba318f8eb2ebc25061fbb92f9c10 | |
parent | f23eb8eff46008e46dc5d8e55ea17911cf162f82 (diff) | |
download | rabbitmq-server-6fc0cf7c0007441e3bcebb49f193b15c7831c0aa.tar.gz |
Simplify maintenance of persistent_count on ack and purge:
* Don't parameterise remove_queue_entries/5 with a fold function, there's
only one call site.
* Get remove_queue_entries/4 / purge_betas_and_deltas/3 to update persistent
count directly, not removed counts by store.
* Update persistent count in one place in remove_pending_ack/2, not many
places in ack/2.
* Remove a couple of no-longer needed helpers.
-rw-r--r-- | src/rabbit_variable_queue.erl | 79 |
1 files changed, 34 insertions, 45 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 03b99562..42013ba0 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -502,24 +502,24 @@ purge(State = #vqstate { q4 = Q4, %% TODO: when there are no pending acks, which is a common case, %% we could simply wipe the qi instead of issuing delivers and %% acks for all the messages. - {LensByStore, IndexState1} = remove_queue_entries( - fun ?QUEUE:foldl/3, Q4, - orddict:new(), IndexState, MSCState), - {LensByStore1, State1 = #vqstate { q1 = Q1, - index_state = IndexState2, - msg_store_clients = MSCState1 }} = - purge_betas_and_deltas(LensByStore, - State #vqstate { q4 = ?QUEUE:new(), - index_state = IndexState1 }), - {LensByStore2, IndexState3} = remove_queue_entries( - fun ?QUEUE:foldl/3, Q1, - LensByStore1, IndexState2, MSCState1), - PCount1 = PCount - find_persistent_count(LensByStore2), + {PCount1, IndexState1} = + remove_queue_entries(Q4, PCount, IndexState, MSCState), + + {PCount2, State1 = #vqstate { q1 = Q1, + index_state = IndexState2, + msg_store_clients = MSCState1 }} = + purge_betas_and_deltas( + PCount1, State #vqstate { q4 = ?QUEUE:new(), + index_state = IndexState1 }), + + {PCount3, IndexState3} = + remove_queue_entries(Q1, PCount2, IndexState2, MSCState1), + {Len, a(State1 #vqstate { q1 = ?QUEUE:new(), index_state = IndexState3, len = 0, ram_msg_count = 0, - persistent_count = PCount1 })}. + persistent_count = PCount3 })}. purge_acks(State) -> a(purge_pending_ack(false, State)). @@ -638,7 +638,6 @@ ack([SeqId], State) -> index_on_disk = IndexOnDisk }, State1 = #vqstate { index_state = IndexState, msg_store_clients = MSCState, - persistent_count = PCount, ack_out_counter = AckOutCount }} = remove_pending_ack(SeqId, State), IndexState1 = case IndexOnDisk of @@ -649,16 +648,13 @@ ack([SeqId], State) -> true -> ok = msg_store_remove(MSCState, IsPersistent, [MsgId]); false -> ok end, - PCount1 = PCount - one_if(IsPersistent), {[MsgId], a(State1 #vqstate { index_state = IndexState1, - persistent_count = PCount1, ack_out_counter = AckOutCount + 1 })}; ack(AckTags, State) -> {{IndexOnDiskSeqIds, MsgIdsByStore, AllMsgIds}, State1 = #vqstate { index_state = IndexState, msg_store_clients = MSCState, - persistent_count = PCount, ack_out_counter = AckOutCount }} = lists:foldl( fun (SeqId, {Acc, State2}) -> @@ -668,11 +664,8 @@ ack(AckTags, State) -> IndexState1 = rabbit_queue_index:ack(IndexOnDiskSeqIds, IndexState), [ok = msg_store_remove(MSCState, IsPersistent, MsgIds) || {IsPersistent, MsgIds} <- orddict:to_list(MsgIdsByStore)], - PCount1 = PCount - find_persistent_count(sum_msg_ids_by_store_to_len( - orddict:new(), MsgIdsByStore)), {lists:reverse(AllMsgIds), a(State1 #vqstate { index_state = IndexState1, - persistent_count = PCount1, ack_out_counter = AckOutCount + length(AckTags) })}. requeue(AckTags, #vqstate { delta = Delta, @@ -1174,29 +1167,31 @@ remove(AckRequired, MsgStatus = #msg_status { len = Len - 1, persistent_count = PCount1})}. -purge_betas_and_deltas(LensByStore, +purge_betas_and_deltas(PCount, State = #vqstate { q3 = Q3, index_state = IndexState, msg_store_clients = MSCState }) -> case ?QUEUE:is_empty(Q3) of - true -> {LensByStore, State}; - false -> {LensByStore1, IndexState1} = - remove_queue_entries(fun ?QUEUE:foldl/3, Q3, - LensByStore, IndexState, MSCState), - purge_betas_and_deltas(LensByStore1, + true -> {PCount, State}; + false -> {PCount1, IndexState1} = remove_queue_entries( + Q3, PCount, IndexState, MSCState), + purge_betas_and_deltas(PCount1, maybe_deltas_to_betas( State #vqstate { q3 = ?QUEUE:new(), index_state = IndexState1 })) end. -remove_queue_entries(Fold, Q, LensByStore, IndexState, MSCState) -> +remove_queue_entries(Q, PCount, IndexState, MSCState) -> {MsgIdsByStore, Delivers, Acks} = - Fold(fun remove_queue_entries1/2, {orddict:new(), [], []}, Q), + ?QUEUE:foldl(fun remove_queue_entries1/2, {orddict:new(), [], []}, Q), ok = orddict:fold(fun (IsPersistent, MsgIds, ok) -> msg_store_remove(MSCState, IsPersistent, MsgIds) end, ok, MsgIdsByStore), - {sum_msg_ids_by_store_to_len(LensByStore, MsgIdsByStore), + {PCount - case orddict:find(true, MsgIdsByStore) of + error -> 0; + {ok, Ids} -> length(Ids) + end, rabbit_queue_index:ack(Acks, rabbit_queue_index:deliver(Delivers, IndexState))}. @@ -1212,12 +1207,6 @@ remove_queue_entries1( cons_if(IndexOnDisk andalso not IsDelivered, SeqId, Delivers), cons_if(IndexOnDisk, SeqId, Acks)}. -sum_msg_ids_by_store_to_len(LensByStore, MsgIdsByStore) -> - orddict:fold( - fun (IsPersistent, MsgIds, LensByStore1) -> - orddict:update_counter(IsPersistent, length(MsgIds), LensByStore1) - end, LensByStore, MsgIdsByStore). - %%---------------------------------------------------------------------------- %% Internal gubbins for publishing %%---------------------------------------------------------------------------- @@ -1289,8 +1278,14 @@ lookup_pending_ack(SeqId, #vqstate { ram_pending_ack = RPA, none -> gb_trees:get(SeqId, DPA) end. -remove_pending_ack(SeqId, State = #vqstate { ram_pending_ack = RPA, - disk_pending_ack = DPA }) -> +remove_pending_ack(SeqId, State) -> + {MsgStatus, State1 = #vqstate { persistent_count = PCount }} = + remove_pending_ack0(SeqId, State), + PCount1 = PCount - one_if(MsgStatus#msg_status.is_persistent), + {MsgStatus, State1 # vqstate{ persistent_count = PCount1 }}. + +remove_pending_ack0(SeqId, State = #vqstate { ram_pending_ack = RPA, + disk_pending_ack = DPA }) -> case gb_trees:lookup(SeqId, RPA) of {value, V} -> RPA1 = gb_trees:delete(SeqId, RPA), {V, State #vqstate { ram_pending_ack = RPA1 }}; @@ -1340,12 +1335,6 @@ accumulate_ack(#msg_status { seq_id = SeqId, end, [MsgId | AllMsgIds]}. -find_persistent_count(LensByStore) -> - case orddict:find(true, LensByStore) of - error -> 0; - {ok, Len} -> Len - end. - %%---------------------------------------------------------------------------- %% Internal plumbing for confirms (aka publisher acks) %%---------------------------------------------------------------------------- @@ -1441,7 +1430,7 @@ delta_merge(SeqIds, Delta, MsgIds, State) -> %% Mostly opposite of record_pending_ack/2 msg_from_pending_ack(SeqId, State) -> {#msg_status { msg_props = MsgProps } = MsgStatus, State1} = - remove_pending_ack(SeqId, State), + remove_pending_ack0(SeqId, State), {MsgStatus #msg_status { msg_props = MsgProps #message_properties { needs_confirming = false } }, State1}. |