summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2010-07-22 15:14:07 +0100
committerMatthias Radestock <matthias@rabbitmq.com>2010-07-22 15:14:07 +0100
commit86a7ad787334356739079279880c6947c82b0c67 (patch)
tree0f7a82afe2a0648da0c0701d4cd5d9fbf96b2d1b
parent1926573db998244854f33acbddd50576104ec355 (diff)
downloadrabbitmq-server-bug21673.tar.gz
replace use of dict with orddict for per-msg_store partitionsbug21673
which is much faster for small dicts
-rw-r--r--src/rabbit_variable_queue.erl35
1 files changed, 18 insertions, 17 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 8bff66af..92ffc511 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -971,10 +971,10 @@ purge_betas_and_deltas(State = #vqstate { q3 = Q3,
remove_queue_entries(Fold, Q, IndexState) ->
{GuidsByStore, Delivers, Acks} =
- Fold(fun remove_queue_entries1/2, {dict:new(), [], []}, Q),
- ok = dict:fold(fun (MsgStore, Guids, ok) ->
- rabbit_msg_store:remove(MsgStore, Guids)
- end, ok, GuidsByStore),
+ Fold(fun remove_queue_entries1/2, {orddict:new(), [], []}, Q),
+ ok = orddict:fold(fun (MsgStore, Guids, ok) ->
+ rabbit_msg_store:remove(MsgStore, Guids)
+ end, ok, GuidsByStore),
rabbit_queue_index:ack(Acks,
rabbit_queue_index:deliver(Delivers, IndexState)).
@@ -984,8 +984,8 @@ remove_queue_entries1(
index_on_disk = IndexOnDisk, is_persistent = IsPersistent },
{GuidsByStore, Delivers, Acks}) ->
{case MsgOnDisk of
- true -> rabbit_misc:dict_cons(find_msg_store(IsPersistent), Guid,
- GuidsByStore);
+ true -> rabbit_misc:orddict_cons(find_msg_store(IsPersistent), Guid,
+ GuidsByStore);
false -> GuidsByStore
end,
cons_if(IndexOnDisk andalso not IsDelivered, SeqId, Delivers),
@@ -1084,19 +1084,20 @@ remove_pending_ack(KeepPersistent,
State = #vqstate { pending_ack = PA,
index_state = IndexState }) ->
{SeqIds, GuidsByStore} = dict:fold(fun accumulate_ack/3,
- {[], dict:new()}, PA),
+ {[], orddict:new()}, PA),
State1 = State #vqstate { pending_ack = dict:new() },
case KeepPersistent of
- true -> case dict:find(?TRANSIENT_MSG_STORE, GuidsByStore) of
+ true -> case orddict:find(?TRANSIENT_MSG_STORE, GuidsByStore) of
error -> State1;
{ok, Guids} -> ok = rabbit_msg_store:remove(
?TRANSIENT_MSG_STORE, Guids),
State1
end;
false -> IndexState1 = rabbit_queue_index:ack(SeqIds, IndexState),
- ok = dict:fold(fun (MsgStore, Guids, ok) ->
- rabbit_msg_store:remove(MsgStore, Guids)
- end, ok, GuidsByStore),
+ ok = orddict:fold(
+ fun (MsgStore, Guids, ok) ->
+ rabbit_msg_store:remove(MsgStore, Guids)
+ end, ok, GuidsByStore),
State1 #vqstate { index_state = IndexState1 }
end.
@@ -1111,12 +1112,12 @@ ack(MsgStoreFun, Fun, AckTags, State) ->
{accumulate_ack(SeqId, AckEntry, Acc),
Fun(AckEntry, State2 #vqstate {
pending_ack = dict:erase(SeqId, PA) })}
- end, {{[], dict:new()}, State}, AckTags),
+ end, {{[], orddict:new()}, State}, AckTags),
IndexState1 = rabbit_queue_index:ack(SeqIds, IndexState),
- ok = dict:fold(fun (MsgStore, Guids, ok) ->
- MsgStoreFun(MsgStore, Guids)
- end, ok, GuidsByStore),
- PCount1 = PCount - case dict:find(?PERSISTENT_MSG_STORE, GuidsByStore) of
+ ok = orddict:fold(fun (MsgStore, Guids, ok) ->
+ MsgStoreFun(MsgStore, Guids)
+ end, ok, GuidsByStore),
+ PCount1 = PCount - case orddict:find(?PERSISTENT_MSG_STORE, GuidsByStore) of
error -> 0;
{ok, Guids} -> length(Guids)
end,
@@ -1129,7 +1130,7 @@ accumulate_ack(_SeqId, #msg_status { is_persistent = false, %% ASSERTIONS
Acc;
accumulate_ack(SeqId, {IsPersistent, Guid}, {SeqIdsAcc, Dict}) ->
{cons_if(IsPersistent, SeqId, SeqIdsAcc),
- rabbit_misc:dict_cons(find_msg_store(IsPersistent), Guid, Dict)}.
+ rabbit_misc:orddict_cons(find_msg_store(IsPersistent), Guid, Dict)}.
%%----------------------------------------------------------------------------
%% Phase changes