summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2014-08-11 15:58:41 +0100
committerSimon MacMullen <simon@rabbitmq.com>2014-08-11 15:58:41 +0100
commit6fc0cf7c0007441e3bcebb49f193b15c7831c0aa (patch)
tree59d6592e9160ba318f8eb2ebc25061fbb92f9c10
parentf23eb8eff46008e46dc5d8e55ea17911cf162f82 (diff)
downloadrabbitmq-server-6fc0cf7c0007441e3bcebb49f193b15c7831c0aa.tar.gz
Simplify maintenance of persistent_count on ack and purge:
* Don't parameterise remove_queue_entries/5 with a fold function, there's only one call site. * Get remove_queue_entries/4 / purge_betas_and_deltas/3 to update persistent count directly, not removed counts by store. * Update persistent count in one place in remove_pending_ack/2, not many places in ack/2. * Remove a couple of no-longer needed helpers.
-rw-r--r--src/rabbit_variable_queue.erl79
1 files changed, 34 insertions, 45 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 03b99562..42013ba0 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -502,24 +502,24 @@ purge(State = #vqstate { q4 = Q4,
%% TODO: when there are no pending acks, which is a common case,
%% we could simply wipe the qi instead of issuing delivers and
%% acks for all the messages.
- {LensByStore, IndexState1} = remove_queue_entries(
- fun ?QUEUE:foldl/3, Q4,
- orddict:new(), IndexState, MSCState),
- {LensByStore1, State1 = #vqstate { q1 = Q1,
- index_state = IndexState2,
- msg_store_clients = MSCState1 }} =
- purge_betas_and_deltas(LensByStore,
- State #vqstate { q4 = ?QUEUE:new(),
- index_state = IndexState1 }),
- {LensByStore2, IndexState3} = remove_queue_entries(
- fun ?QUEUE:foldl/3, Q1,
- LensByStore1, IndexState2, MSCState1),
- PCount1 = PCount - find_persistent_count(LensByStore2),
+ {PCount1, IndexState1} =
+ remove_queue_entries(Q4, PCount, IndexState, MSCState),
+
+ {PCount2, State1 = #vqstate { q1 = Q1,
+ index_state = IndexState2,
+ msg_store_clients = MSCState1 }} =
+ purge_betas_and_deltas(
+ PCount1, State #vqstate { q4 = ?QUEUE:new(),
+ index_state = IndexState1 }),
+
+ {PCount3, IndexState3} =
+ remove_queue_entries(Q1, PCount2, IndexState2, MSCState1),
+
{Len, a(State1 #vqstate { q1 = ?QUEUE:new(),
index_state = IndexState3,
len = 0,
ram_msg_count = 0,
- persistent_count = PCount1 })}.
+ persistent_count = PCount3 })}.
purge_acks(State) -> a(purge_pending_ack(false, State)).
@@ -638,7 +638,6 @@ ack([SeqId], State) ->
index_on_disk = IndexOnDisk },
State1 = #vqstate { index_state = IndexState,
msg_store_clients = MSCState,
- persistent_count = PCount,
ack_out_counter = AckOutCount }} =
remove_pending_ack(SeqId, State),
IndexState1 = case IndexOnDisk of
@@ -649,16 +648,13 @@ ack([SeqId], State) ->
true -> ok = msg_store_remove(MSCState, IsPersistent, [MsgId]);
false -> ok
end,
- PCount1 = PCount - one_if(IsPersistent),
{[MsgId],
a(State1 #vqstate { index_state = IndexState1,
- persistent_count = PCount1,
ack_out_counter = AckOutCount + 1 })};
ack(AckTags, State) ->
{{IndexOnDiskSeqIds, MsgIdsByStore, AllMsgIds},
State1 = #vqstate { index_state = IndexState,
msg_store_clients = MSCState,
- persistent_count = PCount,
ack_out_counter = AckOutCount }} =
lists:foldl(
fun (SeqId, {Acc, State2}) ->
@@ -668,11 +664,8 @@ ack(AckTags, State) ->
IndexState1 = rabbit_queue_index:ack(IndexOnDiskSeqIds, IndexState),
[ok = msg_store_remove(MSCState, IsPersistent, MsgIds)
|| {IsPersistent, MsgIds} <- orddict:to_list(MsgIdsByStore)],
- PCount1 = PCount - find_persistent_count(sum_msg_ids_by_store_to_len(
- orddict:new(), MsgIdsByStore)),
{lists:reverse(AllMsgIds),
a(State1 #vqstate { index_state = IndexState1,
- persistent_count = PCount1,
ack_out_counter = AckOutCount + length(AckTags) })}.
requeue(AckTags, #vqstate { delta = Delta,
@@ -1174,29 +1167,31 @@ remove(AckRequired, MsgStatus = #msg_status {
len = Len - 1,
persistent_count = PCount1})}.
-purge_betas_and_deltas(LensByStore,
+purge_betas_and_deltas(PCount,
State = #vqstate { q3 = Q3,
index_state = IndexState,
msg_store_clients = MSCState }) ->
case ?QUEUE:is_empty(Q3) of
- true -> {LensByStore, State};
- false -> {LensByStore1, IndexState1} =
- remove_queue_entries(fun ?QUEUE:foldl/3, Q3,
- LensByStore, IndexState, MSCState),
- purge_betas_and_deltas(LensByStore1,
+ true -> {PCount, State};
+ false -> {PCount1, IndexState1} = remove_queue_entries(
+ Q3, PCount, IndexState, MSCState),
+ purge_betas_and_deltas(PCount1,
maybe_deltas_to_betas(
State #vqstate {
q3 = ?QUEUE:new(),
index_state = IndexState1 }))
end.
-remove_queue_entries(Fold, Q, LensByStore, IndexState, MSCState) ->
+remove_queue_entries(Q, PCount, IndexState, MSCState) ->
{MsgIdsByStore, Delivers, Acks} =
- Fold(fun remove_queue_entries1/2, {orddict:new(), [], []}, Q),
+ ?QUEUE:foldl(fun remove_queue_entries1/2, {orddict:new(), [], []}, Q),
ok = orddict:fold(fun (IsPersistent, MsgIds, ok) ->
msg_store_remove(MSCState, IsPersistent, MsgIds)
end, ok, MsgIdsByStore),
- {sum_msg_ids_by_store_to_len(LensByStore, MsgIdsByStore),
+ {PCount - case orddict:find(true, MsgIdsByStore) of
+ error -> 0;
+ {ok, Ids} -> length(Ids)
+ end,
rabbit_queue_index:ack(Acks,
rabbit_queue_index:deliver(Delivers, IndexState))}.
@@ -1212,12 +1207,6 @@ remove_queue_entries1(
cons_if(IndexOnDisk andalso not IsDelivered, SeqId, Delivers),
cons_if(IndexOnDisk, SeqId, Acks)}.
-sum_msg_ids_by_store_to_len(LensByStore, MsgIdsByStore) ->
- orddict:fold(
- fun (IsPersistent, MsgIds, LensByStore1) ->
- orddict:update_counter(IsPersistent, length(MsgIds), LensByStore1)
- end, LensByStore, MsgIdsByStore).
-
%%----------------------------------------------------------------------------
%% Internal gubbins for publishing
%%----------------------------------------------------------------------------
@@ -1289,8 +1278,14 @@ lookup_pending_ack(SeqId, #vqstate { ram_pending_ack = RPA,
none -> gb_trees:get(SeqId, DPA)
end.
-remove_pending_ack(SeqId, State = #vqstate { ram_pending_ack = RPA,
- disk_pending_ack = DPA }) ->
+remove_pending_ack(SeqId, State) ->
+ {MsgStatus, State1 = #vqstate { persistent_count = PCount }} =
+ remove_pending_ack0(SeqId, State),
+ PCount1 = PCount - one_if(MsgStatus#msg_status.is_persistent),
+ {MsgStatus, State1 # vqstate{ persistent_count = PCount1 }}.
+
+remove_pending_ack0(SeqId, State = #vqstate { ram_pending_ack = RPA,
+ disk_pending_ack = DPA }) ->
case gb_trees:lookup(SeqId, RPA) of
{value, V} -> RPA1 = gb_trees:delete(SeqId, RPA),
{V, State #vqstate { ram_pending_ack = RPA1 }};
@@ -1340,12 +1335,6 @@ accumulate_ack(#msg_status { seq_id = SeqId,
end,
[MsgId | AllMsgIds]}.
-find_persistent_count(LensByStore) ->
- case orddict:find(true, LensByStore) of
- error -> 0;
- {ok, Len} -> Len
- end.
-
%%----------------------------------------------------------------------------
%% Internal plumbing for confirms (aka publisher acks)
%%----------------------------------------------------------------------------
@@ -1441,7 +1430,7 @@ delta_merge(SeqIds, Delta, MsgIds, State) ->
%% Mostly opposite of record_pending_ack/2
msg_from_pending_ack(SeqId, State) ->
{#msg_status { msg_props = MsgProps } = MsgStatus, State1} =
- remove_pending_ack(SeqId, State),
+ remove_pending_ack0(SeqId, State),
{MsgStatus #msg_status {
msg_props = MsgProps #message_properties { needs_confirming = false } },
State1}.