summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2010-09-30 12:08:52 +0100
committerMatthew Sackman <matthew@rabbitmq.com>2010-09-30 12:08:52 +0100
commit2ad6c7cf437d5f7b351a757e0b1b1a158f5b7e66 (patch)
tree8b27c133486aa349b4ddc6217f9df90618940258
parentc5cbc173bac6c24ec5c386ea742bd40e9d539993 (diff)
downloadrabbitmq-server-2ad6c7cf437d5f7b351a757e0b1b1a158f5b7e66.tar.gz
Fixed
-rw-r--r--src/rabbit_variable_queue.erl65
1 files changed, 41 insertions, 24 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 30d3a8ae..ffd8013b 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -472,23 +472,28 @@ delete_and_terminate(State) ->
a(State2 #vqstate { index_state = IndexState1,
msg_store_clients = undefined }).
-purge(State = #vqstate { q4 = Q4, index_state = IndexState, len = Len }) ->
+purge(State = #vqstate { q4 = Q4, index_state = IndexState, len = Len,
+ persistent_count = PCount }) ->
%% TODO: when there are no pending acks, which is a common case,
%% we could simply wipe the qi instead of issuing delivers and
%% acks for all the messages.
- IndexState1 = remove_queue_entries(fun rabbit_misc:queue_fold/3, Q4,
- IndexState),
- State1 = #vqstate { q1 = Q1, index_state = IndexState2 } =
- purge_betas_and_deltas(State #vqstate { q4 = queue:new(),
+ {LensByStore, IndexState1} =
+ remove_queue_entries(fun rabbit_misc:queue_fold/3, Q4,
+ orddict:new(), IndexState),
+ {LensByStore1, State1 = #vqstate { q1 = Q1, index_state = IndexState2 }} =
+ purge_betas_and_deltas(LensByStore,
+ State #vqstate { q4 = queue:new(),
index_state = IndexState1 }),
- IndexState3 = remove_queue_entries(fun rabbit_misc:queue_fold/3, Q1,
- IndexState2),
+ {LensByStore2, IndexState3} =
+ remove_queue_entries(fun rabbit_misc:queue_fold/3, Q1,
+ LensByStore1, IndexState2),
+ PCount1 = PCount - find_persistent_count(fun (X) -> X end, LensByStore2),
{Len, a(State1 #vqstate { q1 = queue:new(),
index_state = IndexState3,
len = 0,
ram_msg_count = 0,
ram_index_count = 0,
- persistent_count = 0 })}.
+ persistent_count = PCount1 })}.
publish(Msg, State) ->
{_SeqId, State1} = publish(Msg, false, false, State),
@@ -957,26 +962,29 @@ tx_commit_index(State = #vqstate { on_sync = #sync {
reduce_memory_use(
State1 #vqstate { index_state = IndexState1, on_sync = ?BLANK_SYNC }).
-purge_betas_and_deltas(State = #vqstate { q3 = Q3,
- index_state = IndexState }) ->
+purge_betas_and_deltas(LensByStore, State = #vqstate { index_state = IndexState,
+ q3 = Q3 }) ->
case bpqueue:is_empty(Q3) of
- true -> State;
- false -> IndexState1 = remove_queue_entries(fun beta_fold/3, Q3,
- IndexState),
- purge_betas_and_deltas(
- maybe_deltas_to_betas(
- State #vqstate { q3 = bpqueue:new(),
- index_state = IndexState1 }))
+ true -> {LensByStore, State};
+ false -> {LensByStore1, IndexState1} =
+ remove_queue_entries(fun beta_fold/3, Q3, LensByStore,
+ IndexState),
+ purge_betas_and_deltas(LensByStore1,
+ maybe_deltas_to_betas(
+ State #vqstate {
+ index_state = IndexState1,
+ q3 = bpqueue:new() }))
end.
-remove_queue_entries(Fold, Q, IndexState) ->
+remove_queue_entries(Fold, Q, LensByStore, IndexState) ->
{GuidsByStore, Delivers, Acks} =
Fold(fun remove_queue_entries1/2, {orddict:new(), [], []}, Q),
ok = orddict:fold(fun (MsgStore, Guids, ok) ->
rabbit_msg_store:remove(MsgStore, Guids)
end, ok, GuidsByStore),
- rabbit_queue_index:ack(Acks,
- rabbit_queue_index:deliver(Delivers, IndexState)).
+ {sum_guids_by_store_to_len(LensByStore, GuidsByStore),
+ rabbit_queue_index:ack(Acks,
+ rabbit_queue_index:deliver(Delivers, IndexState))}.
remove_queue_entries1(
#msg_status { guid = Guid, seq_id = SeqId,
@@ -991,6 +999,12 @@ remove_queue_entries1(
cons_if(IndexOnDisk andalso not IsDelivered, SeqId, Delivers),
cons_if(IndexOnDisk, SeqId, Acks)}.
+sum_guids_by_store_to_len(LensByStore, GuidsByStore) ->
+ orddict:fold(
+ fun (MsgStore, Guids, Acc) ->
+ orddict:update_counter(MsgStore, length(Guids), LensByStore)
+ end, LensByStore, GuidsByStore).
+
%%----------------------------------------------------------------------------
%% Internal gubbins for publishing
%%----------------------------------------------------------------------------
@@ -1117,10 +1131,7 @@ ack(MsgStoreFun, Fun, AckTags, State) ->
ok = orddict:fold(fun (MsgStore, Guids, ok) ->
MsgStoreFun(MsgStore, Guids)
end, ok, GuidsByStore),
- PCount1 = PCount - case orddict:find(?PERSISTENT_MSG_STORE, GuidsByStore) of
- error -> 0;
- {ok, Guids} -> length(Guids)
- end,
+ PCount1 = PCount - find_persistent_count(fun erlang:length/1, GuidsByStore),
State1 #vqstate { index_state = IndexState1,
persistent_count = PCount1 }.
@@ -1132,6 +1143,12 @@ accumulate_ack(SeqId, {IsPersistent, Guid}, {SeqIdsAcc, Dict}) ->
{cons_if(IsPersistent, SeqId, SeqIdsAcc),
rabbit_misc:orddict_cons(find_msg_store(IsPersistent), Guid, Dict)}.
+find_persistent_count(Fun, ByStore) ->
+ case orddict:find(?PERSISTENT_MSG_STORE, ByStore) of
+ error -> 0;
+ {ok, Value} -> Fun(Value)
+ end.
+
%%----------------------------------------------------------------------------
%% Phase changes
%%----------------------------------------------------------------------------