diff options
author | Alexandru Scvortov <alexandru@rabbitmq.com> | 2011-08-02 10:48:18 +0100 |
---|---|---|
committer | Alexandru Scvortov <alexandru@rabbitmq.com> | 2011-08-02 10:48:18 +0100 |
commit | 62b6ce3e78a42a69a336049e5872bf4686a8543a (patch) | |
tree | 7a55098c74ec108cdb5d094ed16ccd9eb661e06c /src/rabbit_limiter.erl | |
parent | c6248e4437c04032e9231d265181e2e87d615ef5 (diff) | |
parent | e2c57c78fcc0281eeb78dd1914287e539265244c (diff) | |
download | rabbitmq-server-62b6ce3e78a42a69a336049e5872bf4686a8543a.tar.gz |
merge default into bug23504
Diffstat (limited to 'src/rabbit_limiter.erl')
-rw-r--r-- | src/rabbit_limiter.erl | 33 |
1 files changed, 16 insertions, 17 deletions
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index 86ea7282..8f9ab032 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -49,7 +49,7 @@ -record(lim, {prefetch_count = 0, ch_pid, blocked = false, - queues = dict:new(), % QPid -> {MonitorRef, Notify} + queues = orddict:new(), % QPid -> {MonitorRef, Notify} volume = 0}). %% 'Notify' is a boolean that indicates whether a queue should be %% notified of a change in the limit or volume that may allow it to @@ -65,7 +65,7 @@ start_link(ChPid, UnackedMsgCount) -> limit(undefined, 0) -> ok; limit(LimiterPid, PrefetchCount) -> - gen_server2:call(LimiterPid, {limit, PrefetchCount}). + gen_server2:call(LimiterPid, {limit, PrefetchCount}, infinity). %% Ask the limiter whether the queue can deliver a message without %% breaching a limit @@ -120,9 +120,9 @@ init([ChPid, UnackedMsgCount]) -> prioritise_call(get_limit, _From, _State) -> 9; prioritise_call(_Msg, _From, _State) -> 0. -handle_call({can_send, _QPid, _AckRequired}, _From, +handle_call({can_send, QPid, _AckRequired}, _From, State = #lim{blocked = true}) -> - {reply, false, State}; + {reply, false, limit_queue(QPid, State)}; handle_call({can_send, QPid, AckRequired}, _From, State = #lim{volume = Volume}) -> case limit_reached(State) of @@ -196,31 +196,30 @@ limit_reached(#lim{prefetch_count = Limit, volume = Volume}) -> blocked(#lim{blocked = Blocked}) -> Blocked. remember_queue(QPid, State = #lim{queues = Queues}) -> - case dict:is_key(QPid, Queues) of + case orddict:is_key(QPid, Queues) of false -> MRef = erlang:monitor(process, QPid), - State#lim{queues = dict:store(QPid, {MRef, false}, Queues)}; + State#lim{queues = orddict:store(QPid, {MRef, false}, Queues)}; true -> State end. forget_queue(QPid, State = #lim{ch_pid = ChPid, queues = Queues}) -> - case dict:find(QPid, Queues) of - {ok, {MRef, _}} -> - true = erlang:demonitor(MRef), - ok = rabbit_amqqueue:unblock(QPid, ChPid), - State#lim{queues = dict:erase(QPid, Queues)}; - error -> State + case orddict:find(QPid, Queues) of + {ok, {MRef, _}} -> true = erlang:demonitor(MRef), + ok = rabbit_amqqueue:unblock(QPid, ChPid), + State#lim{queues = orddict:erase(QPid, Queues)}; + error -> State end. limit_queue(QPid, State = #lim{queues = Queues}) -> UpdateFun = fun ({MRef, _}) -> {MRef, true} end, - State#lim{queues = dict:update(QPid, UpdateFun, Queues)}. + State#lim{queues = orddict:update(QPid, UpdateFun, Queues)}. notify_queues(State = #lim{ch_pid = ChPid, queues = Queues}) -> {QList, NewQueues} = - dict:fold(fun (_QPid, {_, false}, Acc) -> Acc; - (QPid, {MRef, true}, {L, D}) -> - {[QPid | L], dict:store(QPid, {MRef, false}, D)} - end, {[], Queues}, Queues), + orddict:fold(fun (_QPid, {_, false}, Acc) -> Acc; + (QPid, {MRef, true}, {L, D}) -> + {[QPid | L], orddict:store(QPid, {MRef, false}, D)} + end, {[], Queues}, Queues), case length(QList) of 0 -> ok; L -> |