diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2010-07-22 15:14:07 +0100 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2010-07-22 15:14:07 +0100 |
commit | 86a7ad787334356739079279880c6947c82b0c67 (patch) | |
tree | 0f7a82afe2a0648da0c0701d4cd5d9fbf96b2d1b | |
parent | 1926573db998244854f33acbddd50576104ec355 (diff) | |
download | rabbitmq-server-86a7ad787334356739079279880c6947c82b0c67.tar.gz |
replace use of dict with orddict for per-msg_store partitionsbug21673
which is much faster for small dicts
-rw-r--r-- | src/rabbit_variable_queue.erl | 35 |
1 files changed, 18 insertions, 17 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 8bff66af..92ffc511 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -971,10 +971,10 @@ purge_betas_and_deltas(State = #vqstate { q3 = Q3, remove_queue_entries(Fold, Q, IndexState) -> {GuidsByStore, Delivers, Acks} = - Fold(fun remove_queue_entries1/2, {dict:new(), [], []}, Q), - ok = dict:fold(fun (MsgStore, Guids, ok) -> - rabbit_msg_store:remove(MsgStore, Guids) - end, ok, GuidsByStore), + Fold(fun remove_queue_entries1/2, {orddict:new(), [], []}, Q), + ok = orddict:fold(fun (MsgStore, Guids, ok) -> + rabbit_msg_store:remove(MsgStore, Guids) + end, ok, GuidsByStore), rabbit_queue_index:ack(Acks, rabbit_queue_index:deliver(Delivers, IndexState)). @@ -984,8 +984,8 @@ remove_queue_entries1( index_on_disk = IndexOnDisk, is_persistent = IsPersistent }, {GuidsByStore, Delivers, Acks}) -> {case MsgOnDisk of - true -> rabbit_misc:dict_cons(find_msg_store(IsPersistent), Guid, - GuidsByStore); + true -> rabbit_misc:orddict_cons(find_msg_store(IsPersistent), Guid, + GuidsByStore); false -> GuidsByStore end, cons_if(IndexOnDisk andalso not IsDelivered, SeqId, Delivers), @@ -1084,19 +1084,20 @@ remove_pending_ack(KeepPersistent, State = #vqstate { pending_ack = PA, index_state = IndexState }) -> {SeqIds, GuidsByStore} = dict:fold(fun accumulate_ack/3, - {[], dict:new()}, PA), + {[], orddict:new()}, PA), State1 = State #vqstate { pending_ack = dict:new() }, case KeepPersistent of - true -> case dict:find(?TRANSIENT_MSG_STORE, GuidsByStore) of + true -> case orddict:find(?TRANSIENT_MSG_STORE, GuidsByStore) of error -> State1; {ok, Guids} -> ok = rabbit_msg_store:remove( ?TRANSIENT_MSG_STORE, Guids), State1 end; false -> IndexState1 = rabbit_queue_index:ack(SeqIds, IndexState), - ok = dict:fold(fun (MsgStore, Guids, ok) -> - rabbit_msg_store:remove(MsgStore, Guids) - end, ok, GuidsByStore), + ok = orddict:fold( + fun (MsgStore, Guids, ok) -> + rabbit_msg_store:remove(MsgStore, Guids) + end, ok, GuidsByStore), State1 #vqstate { index_state = IndexState1 } end. @@ -1111,12 +1112,12 @@ ack(MsgStoreFun, Fun, AckTags, State) -> {accumulate_ack(SeqId, AckEntry, Acc), Fun(AckEntry, State2 #vqstate { pending_ack = dict:erase(SeqId, PA) })} - end, {{[], dict:new()}, State}, AckTags), + end, {{[], orddict:new()}, State}, AckTags), IndexState1 = rabbit_queue_index:ack(SeqIds, IndexState), - ok = dict:fold(fun (MsgStore, Guids, ok) -> - MsgStoreFun(MsgStore, Guids) - end, ok, GuidsByStore), - PCount1 = PCount - case dict:find(?PERSISTENT_MSG_STORE, GuidsByStore) of + ok = orddict:fold(fun (MsgStore, Guids, ok) -> + MsgStoreFun(MsgStore, Guids) + end, ok, GuidsByStore), + PCount1 = PCount - case orddict:find(?PERSISTENT_MSG_STORE, GuidsByStore) of error -> 0; {ok, Guids} -> length(Guids) end, @@ -1129,7 +1130,7 @@ accumulate_ack(_SeqId, #msg_status { is_persistent = false, %% ASSERTIONS Acc; accumulate_ack(SeqId, {IsPersistent, Guid}, {SeqIdsAcc, Dict}) -> {cons_if(IsPersistent, SeqId, SeqIdsAcc), - rabbit_misc:dict_cons(find_msg_store(IsPersistent), Guid, Dict)}. + rabbit_misc:orddict_cons(find_msg_store(IsPersistent), Guid, Dict)}. %%---------------------------------------------------------------------------- %% Phase changes |