diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2010-09-30 20:37:22 +0100 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2010-09-30 20:37:22 +0100 |
commit | 8012c6d91be14f3f500de1a5dc8396ef40becbe6 (patch) | |
tree | ec5a001044c6bd4484071fd49c109dfccf1a1cad | |
parent | 73e62384d08f01542abce06fb17b21d2dcb6928b (diff) | |
parent | 8bcaca0ccb9f329be82d588da595647a517e89c7 (diff) | |
download | rabbitmq-server-8012c6d91be14f3f500de1a5dc8396ef40becbe6.tar.gz |
merge bug23190 into default
-rw-r--r-- | src/rabbit_variable_queue.erl | 67 |
1 files changed, 44 insertions, 23 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 30d3a8ae..cbc71bcc 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -472,23 +472,30 @@ delete_and_terminate(State) -> a(State2 #vqstate { index_state = IndexState1, msg_store_clients = undefined }). -purge(State = #vqstate { q4 = Q4, index_state = IndexState, len = Len }) -> +purge(State = #vqstate { q4 = Q4, + index_state = IndexState, + len = Len, + persistent_count = PCount }) -> %% TODO: when there are no pending acks, which is a common case, %% we could simply wipe the qi instead of issuing delivers and %% acks for all the messages. - IndexState1 = remove_queue_entries(fun rabbit_misc:queue_fold/3, Q4, - IndexState), - State1 = #vqstate { q1 = Q1, index_state = IndexState2 } = - purge_betas_and_deltas(State #vqstate { q4 = queue:new(), + {LensByStore, IndexState1} = remove_queue_entries( + fun rabbit_misc:queue_fold/3, Q4, + orddict:new(), IndexState), + {LensByStore1, State1 = #vqstate { q1 = Q1, index_state = IndexState2 }} = + purge_betas_and_deltas(LensByStore, + State #vqstate { q4 = queue:new(), index_state = IndexState1 }), - IndexState3 = remove_queue_entries(fun rabbit_misc:queue_fold/3, Q1, - IndexState2), + {LensByStore2, IndexState3} = remove_queue_entries( + fun rabbit_misc:queue_fold/3, Q1, + LensByStore1, IndexState2), + PCount1 = PCount - find_persistent_count(LensByStore2), {Len, a(State1 #vqstate { q1 = queue:new(), index_state = IndexState3, len = 0, ram_msg_count = 0, ram_index_count = 0, - persistent_count = 0 })}. + persistent_count = PCount1 })}. publish(Msg, State) -> {_SeqId, State1} = publish(Msg, false, false, State), @@ -957,26 +964,30 @@ tx_commit_index(State = #vqstate { on_sync = #sync { reduce_memory_use( State1 #vqstate { index_state = IndexState1, on_sync = ?BLANK_SYNC }). -purge_betas_and_deltas(State = #vqstate { q3 = Q3, +purge_betas_and_deltas(LensByStore, + State = #vqstate { q3 = Q3, index_state = IndexState }) -> case bpqueue:is_empty(Q3) of - true -> State; - false -> IndexState1 = remove_queue_entries(fun beta_fold/3, Q3, - IndexState), - purge_betas_and_deltas( - maybe_deltas_to_betas( - State #vqstate { q3 = bpqueue:new(), - index_state = IndexState1 })) + true -> {LensByStore, State}; + false -> {LensByStore1, IndexState1} = remove_queue_entries( + fun beta_fold/3, Q3, + LensByStore, IndexState), + purge_betas_and_deltas(LensByStore1, + maybe_deltas_to_betas( + State #vqstate { + q3 = bpqueue:new(), + index_state = IndexState1 })) end. -remove_queue_entries(Fold, Q, IndexState) -> +remove_queue_entries(Fold, Q, LensByStore, IndexState) -> {GuidsByStore, Delivers, Acks} = Fold(fun remove_queue_entries1/2, {orddict:new(), [], []}, Q), ok = orddict:fold(fun (MsgStore, Guids, ok) -> rabbit_msg_store:remove(MsgStore, Guids) end, ok, GuidsByStore), - rabbit_queue_index:ack(Acks, - rabbit_queue_index:deliver(Delivers, IndexState)). + {sum_guids_by_store_to_len(LensByStore, GuidsByStore), + rabbit_queue_index:ack(Acks, + rabbit_queue_index:deliver(Delivers, IndexState))}. remove_queue_entries1( #msg_status { guid = Guid, seq_id = SeqId, @@ -991,6 +1002,12 @@ remove_queue_entries1( cons_if(IndexOnDisk andalso not IsDelivered, SeqId, Delivers), cons_if(IndexOnDisk, SeqId, Acks)}. +sum_guids_by_store_to_len(LensByStore, GuidsByStore) -> + orddict:fold( + fun (MsgStore, Guids, LensByStore1) -> + orddict:update_counter(MsgStore, length(Guids), LensByStore1) + end, LensByStore, GuidsByStore). + %%---------------------------------------------------------------------------- %% Internal gubbins for publishing %%---------------------------------------------------------------------------- @@ -1117,10 +1134,8 @@ ack(MsgStoreFun, Fun, AckTags, State) -> ok = orddict:fold(fun (MsgStore, Guids, ok) -> MsgStoreFun(MsgStore, Guids) end, ok, GuidsByStore), - PCount1 = PCount - case orddict:find(?PERSISTENT_MSG_STORE, GuidsByStore) of - error -> 0; - {ok, Guids} -> length(Guids) - end, + PCount1 = PCount - find_persistent_count(sum_guids_by_store_to_len( + orddict:new(), GuidsByStore)), State1 #vqstate { index_state = IndexState1, persistent_count = PCount1 }. @@ -1132,6 +1147,12 @@ accumulate_ack(SeqId, {IsPersistent, Guid}, {SeqIdsAcc, Dict}) -> {cons_if(IsPersistent, SeqId, SeqIdsAcc), rabbit_misc:orddict_cons(find_msg_store(IsPersistent), Guid, Dict)}. +find_persistent_count(LensByStore) -> + case orddict:find(?PERSISTENT_MSG_STORE, LensByStore) of + error -> 0; + {ok, Len} -> Len + end. + %%---------------------------------------------------------------------------- %% Phase changes %%---------------------------------------------------------------------------- |