summaryrefslogtreecommitdiff
path: root/src/rabbit_limiter.erl
diff options
context:
space:
mode:
authorAlexandru Scvortov <alexandru@rabbitmq.com>2011-08-02 10:48:18 +0100
committerAlexandru Scvortov <alexandru@rabbitmq.com>2011-08-02 10:48:18 +0100
commit62b6ce3e78a42a69a336049e5872bf4686a8543a (patch)
tree7a55098c74ec108cdb5d094ed16ccd9eb661e06c /src/rabbit_limiter.erl
parentc6248e4437c04032e9231d265181e2e87d615ef5 (diff)
parente2c57c78fcc0281eeb78dd1914287e539265244c (diff)
downloadrabbitmq-server-62b6ce3e78a42a69a336049e5872bf4686a8543a.tar.gz
merge default into bug23504
Diffstat (limited to 'src/rabbit_limiter.erl')
-rw-r--r--src/rabbit_limiter.erl33
1 files changed, 16 insertions, 17 deletions
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl
index 86ea7282..8f9ab032 100644
--- a/src/rabbit_limiter.erl
+++ b/src/rabbit_limiter.erl
@@ -49,7 +49,7 @@
-record(lim, {prefetch_count = 0,
ch_pid,
blocked = false,
- queues = dict:new(), % QPid -> {MonitorRef, Notify}
+ queues = orddict:new(), % QPid -> {MonitorRef, Notify}
volume = 0}).
%% 'Notify' is a boolean that indicates whether a queue should be
%% notified of a change in the limit or volume that may allow it to
@@ -65,7 +65,7 @@ start_link(ChPid, UnackedMsgCount) ->
limit(undefined, 0) ->
ok;
limit(LimiterPid, PrefetchCount) ->
- gen_server2:call(LimiterPid, {limit, PrefetchCount}).
+ gen_server2:call(LimiterPid, {limit, PrefetchCount}, infinity).
%% Ask the limiter whether the queue can deliver a message without
%% breaching a limit
@@ -120,9 +120,9 @@ init([ChPid, UnackedMsgCount]) ->
prioritise_call(get_limit, _From, _State) -> 9;
prioritise_call(_Msg, _From, _State) -> 0.
-handle_call({can_send, _QPid, _AckRequired}, _From,
+handle_call({can_send, QPid, _AckRequired}, _From,
State = #lim{blocked = true}) ->
- {reply, false, State};
+ {reply, false, limit_queue(QPid, State)};
handle_call({can_send, QPid, AckRequired}, _From,
State = #lim{volume = Volume}) ->
case limit_reached(State) of
@@ -196,31 +196,30 @@ limit_reached(#lim{prefetch_count = Limit, volume = Volume}) ->
blocked(#lim{blocked = Blocked}) -> Blocked.
remember_queue(QPid, State = #lim{queues = Queues}) ->
- case dict:is_key(QPid, Queues) of
+ case orddict:is_key(QPid, Queues) of
false -> MRef = erlang:monitor(process, QPid),
- State#lim{queues = dict:store(QPid, {MRef, false}, Queues)};
+ State#lim{queues = orddict:store(QPid, {MRef, false}, Queues)};
true -> State
end.
forget_queue(QPid, State = #lim{ch_pid = ChPid, queues = Queues}) ->
- case dict:find(QPid, Queues) of
- {ok, {MRef, _}} ->
- true = erlang:demonitor(MRef),
- ok = rabbit_amqqueue:unblock(QPid, ChPid),
- State#lim{queues = dict:erase(QPid, Queues)};
- error -> State
+ case orddict:find(QPid, Queues) of
+ {ok, {MRef, _}} -> true = erlang:demonitor(MRef),
+ ok = rabbit_amqqueue:unblock(QPid, ChPid),
+ State#lim{queues = orddict:erase(QPid, Queues)};
+ error -> State
end.
limit_queue(QPid, State = #lim{queues = Queues}) ->
UpdateFun = fun ({MRef, _}) -> {MRef, true} end,
- State#lim{queues = dict:update(QPid, UpdateFun, Queues)}.
+ State#lim{queues = orddict:update(QPid, UpdateFun, Queues)}.
notify_queues(State = #lim{ch_pid = ChPid, queues = Queues}) ->
{QList, NewQueues} =
- dict:fold(fun (_QPid, {_, false}, Acc) -> Acc;
- (QPid, {MRef, true}, {L, D}) ->
- {[QPid | L], dict:store(QPid, {MRef, false}, D)}
- end, {[], Queues}, Queues),
+ orddict:fold(fun (_QPid, {_, false}, Acc) -> Acc;
+ (QPid, {MRef, true}, {L, D}) ->
+ {[QPid | L], orddict:store(QPid, {MRef, false}, D)}
+ end, {[], Queues}, Queues),
case length(QList) of
0 -> ok;
L ->