diff options
author | Matthew Sackman <matthew@rabbitmq.com> | 2010-09-30 12:08:52 +0100 |
---|---|---|
committer | Matthew Sackman <matthew@rabbitmq.com> | 2010-09-30 12:08:52 +0100 |
commit | 2ad6c7cf437d5f7b351a757e0b1b1a158f5b7e66 (patch) | |
tree | 8b27c133486aa349b4ddc6217f9df90618940258 | |
parent | c5cbc173bac6c24ec5c386ea742bd40e9d539993 (diff) | |
download | rabbitmq-server-2ad6c7cf437d5f7b351a757e0b1b1a158f5b7e66.tar.gz |
Fixed
-rw-r--r-- | src/rabbit_variable_queue.erl | 65 |
1 files changed, 41 insertions, 24 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 30d3a8ae..ffd8013b 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -472,23 +472,28 @@ delete_and_terminate(State) -> a(State2 #vqstate { index_state = IndexState1, msg_store_clients = undefined }). -purge(State = #vqstate { q4 = Q4, index_state = IndexState, len = Len }) -> +purge(State = #vqstate { q4 = Q4, index_state = IndexState, len = Len, + persistent_count = PCount }) -> %% TODO: when there are no pending acks, which is a common case, %% we could simply wipe the qi instead of issuing delivers and %% acks for all the messages. - IndexState1 = remove_queue_entries(fun rabbit_misc:queue_fold/3, Q4, - IndexState), - State1 = #vqstate { q1 = Q1, index_state = IndexState2 } = - purge_betas_and_deltas(State #vqstate { q4 = queue:new(), + {LensByStore, IndexState1} = + remove_queue_entries(fun rabbit_misc:queue_fold/3, Q4, + orddict:new(), IndexState), + {LensByStore1, State1 = #vqstate { q1 = Q1, index_state = IndexState2 }} = + purge_betas_and_deltas(LensByStore, + State #vqstate { q4 = queue:new(), index_state = IndexState1 }), - IndexState3 = remove_queue_entries(fun rabbit_misc:queue_fold/3, Q1, - IndexState2), + {LensByStore2, IndexState3} = + remove_queue_entries(fun rabbit_misc:queue_fold/3, Q1, + LensByStore1, IndexState2), + PCount1 = PCount - find_persistent_count(fun (X) -> X end, LensByStore2), {Len, a(State1 #vqstate { q1 = queue:new(), index_state = IndexState3, len = 0, ram_msg_count = 0, ram_index_count = 0, - persistent_count = 0 })}. + persistent_count = PCount1 })}. publish(Msg, State) -> {_SeqId, State1} = publish(Msg, false, false, State), @@ -957,26 +962,29 @@ tx_commit_index(State = #vqstate { on_sync = #sync { reduce_memory_use( State1 #vqstate { index_state = IndexState1, on_sync = ?BLANK_SYNC }). -purge_betas_and_deltas(State = #vqstate { q3 = Q3, - index_state = IndexState }) -> +purge_betas_and_deltas(LensByStore, State = #vqstate { index_state = IndexState, + q3 = Q3 }) -> case bpqueue:is_empty(Q3) of - true -> State; - false -> IndexState1 = remove_queue_entries(fun beta_fold/3, Q3, - IndexState), - purge_betas_and_deltas( - maybe_deltas_to_betas( - State #vqstate { q3 = bpqueue:new(), - index_state = IndexState1 })) + true -> {LensByStore, State}; + false -> {LensByStore1, IndexState1} = + remove_queue_entries(fun beta_fold/3, Q3, LensByStore, + IndexState), + purge_betas_and_deltas(LensByStore1, + maybe_deltas_to_betas( + State #vqstate { + index_state = IndexState1, + q3 = bpqueue:new() })) end. -remove_queue_entries(Fold, Q, IndexState) -> +remove_queue_entries(Fold, Q, LensByStore, IndexState) -> {GuidsByStore, Delivers, Acks} = Fold(fun remove_queue_entries1/2, {orddict:new(), [], []}, Q), ok = orddict:fold(fun (MsgStore, Guids, ok) -> rabbit_msg_store:remove(MsgStore, Guids) end, ok, GuidsByStore), - rabbit_queue_index:ack(Acks, - rabbit_queue_index:deliver(Delivers, IndexState)). + {sum_guids_by_store_to_len(LensByStore, GuidsByStore), + rabbit_queue_index:ack(Acks, + rabbit_queue_index:deliver(Delivers, IndexState))}. remove_queue_entries1( #msg_status { guid = Guid, seq_id = SeqId, @@ -991,6 +999,12 @@ remove_queue_entries1( cons_if(IndexOnDisk andalso not IsDelivered, SeqId, Delivers), cons_if(IndexOnDisk, SeqId, Acks)}. +sum_guids_by_store_to_len(LensByStore, GuidsByStore) -> + orddict:fold( + fun (MsgStore, Guids, Acc) -> + orddict:update_counter(MsgStore, length(Guids), LensByStore) + end, LensByStore, GuidsByStore). + %%---------------------------------------------------------------------------- %% Internal gubbins for publishing %%---------------------------------------------------------------------------- @@ -1117,10 +1131,7 @@ ack(MsgStoreFun, Fun, AckTags, State) -> ok = orddict:fold(fun (MsgStore, Guids, ok) -> MsgStoreFun(MsgStore, Guids) end, ok, GuidsByStore), - PCount1 = PCount - case orddict:find(?PERSISTENT_MSG_STORE, GuidsByStore) of - error -> 0; - {ok, Guids} -> length(Guids) - end, + PCount1 = PCount - find_persistent_count(fun erlang:length/1, GuidsByStore), State1 #vqstate { index_state = IndexState1, persistent_count = PCount1 }. @@ -1132,6 +1143,12 @@ accumulate_ack(SeqId, {IsPersistent, Guid}, {SeqIdsAcc, Dict}) -> {cons_if(IsPersistent, SeqId, SeqIdsAcc), rabbit_misc:orddict_cons(find_msg_store(IsPersistent), Guid, Dict)}. +find_persistent_count(Fun, ByStore) -> + case orddict:find(?PERSISTENT_MSG_STORE, ByStore) of + error -> 0; + {ok, Value} -> Fun(Value) + end. + %%---------------------------------------------------------------------------- %% Phase changes %%---------------------------------------------------------------------------- |