summaryrefslogtreecommitdiff
path: root/src/rabbit_limiter.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/rabbit_limiter.erl')
-rw-r--r--src/rabbit_limiter.erl33
1 files changed, 16 insertions, 17 deletions
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl
index 86ea7282..8f9ab032 100644
--- a/src/rabbit_limiter.erl
+++ b/src/rabbit_limiter.erl
@@ -49,7 +49,7 @@
-record(lim, {prefetch_count = 0,
ch_pid,
blocked = false,
- queues = dict:new(), % QPid -> {MonitorRef, Notify}
+ queues = orddict:new(), % QPid -> {MonitorRef, Notify}
volume = 0}).
%% 'Notify' is a boolean that indicates whether a queue should be
%% notified of a change in the limit or volume that may allow it to
@@ -65,7 +65,7 @@ start_link(ChPid, UnackedMsgCount) ->
limit(undefined, 0) ->
ok;
limit(LimiterPid, PrefetchCount) ->
- gen_server2:call(LimiterPid, {limit, PrefetchCount}).
+ gen_server2:call(LimiterPid, {limit, PrefetchCount}, infinity).
%% Ask the limiter whether the queue can deliver a message without
%% breaching a limit
@@ -120,9 +120,9 @@ init([ChPid, UnackedMsgCount]) ->
prioritise_call(get_limit, _From, _State) -> 9;
prioritise_call(_Msg, _From, _State) -> 0.
-handle_call({can_send, _QPid, _AckRequired}, _From,
+handle_call({can_send, QPid, _AckRequired}, _From,
State = #lim{blocked = true}) ->
- {reply, false, State};
+ {reply, false, limit_queue(QPid, State)};
handle_call({can_send, QPid, AckRequired}, _From,
State = #lim{volume = Volume}) ->
case limit_reached(State) of
@@ -196,31 +196,30 @@ limit_reached(#lim{prefetch_count = Limit, volume = Volume}) ->
blocked(#lim{blocked = Blocked}) -> Blocked.
remember_queue(QPid, State = #lim{queues = Queues}) ->
- case dict:is_key(QPid, Queues) of
+ case orddict:is_key(QPid, Queues) of
false -> MRef = erlang:monitor(process, QPid),
- State#lim{queues = dict:store(QPid, {MRef, false}, Queues)};
+ State#lim{queues = orddict:store(QPid, {MRef, false}, Queues)};
true -> State
end.
forget_queue(QPid, State = #lim{ch_pid = ChPid, queues = Queues}) ->
- case dict:find(QPid, Queues) of
- {ok, {MRef, _}} ->
- true = erlang:demonitor(MRef),
- ok = rabbit_amqqueue:unblock(QPid, ChPid),
- State#lim{queues = dict:erase(QPid, Queues)};
- error -> State
+ case orddict:find(QPid, Queues) of
+ {ok, {MRef, _}} -> true = erlang:demonitor(MRef),
+ ok = rabbit_amqqueue:unblock(QPid, ChPid),
+ State#lim{queues = orddict:erase(QPid, Queues)};
+ error -> State
end.
limit_queue(QPid, State = #lim{queues = Queues}) ->
UpdateFun = fun ({MRef, _}) -> {MRef, true} end,
- State#lim{queues = dict:update(QPid, UpdateFun, Queues)}.
+ State#lim{queues = orddict:update(QPid, UpdateFun, Queues)}.
notify_queues(State = #lim{ch_pid = ChPid, queues = Queues}) ->
{QList, NewQueues} =
- dict:fold(fun (_QPid, {_, false}, Acc) -> Acc;
- (QPid, {MRef, true}, {L, D}) ->
- {[QPid | L], dict:store(QPid, {MRef, false}, D)}
- end, {[], Queues}, Queues),
+ orddict:fold(fun (_QPid, {_, false}, Acc) -> Acc;
+ (QPid, {MRef, true}, {L, D}) ->
+ {[QPid | L], orddict:store(QPid, {MRef, false}, D)}
+ end, {[], Queues}, Queues),
case length(QList) of
0 -> ok;
L ->