summaryrefslogtreecommitdiff
path: root/src/rabbit_limiter.erl
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2010-03-02 16:07:13 +0000
committerMatthew Sackman <matthew@lshift.net>2010-03-02 16:07:13 +0000
commita5bddb9ebdc68ea26ad44c71009b084dbf6e648e (patch)
treec82111bd8211d2be78691e79d32b63af3b64333f /src/rabbit_limiter.erl
parent56c564c9627d621108ebb5bb378943940e5e5d5e (diff)
downloadrabbitmq-server-a5bddb9ebdc68ea26ad44c71009b084dbf6e648e.tar.gz
Problem: if the limiter is blocked due to a client flow, and the client then issues a qos = 0, we can't have the limiter exiting. Thus limit needs to become a call, plus associated changes. Note that we *have* to know whether or not the limiter is about to stop - if we just wait for the exit signal then there's a race with a new qos setting being sent to an expiring limiter and the limiter thus getting lost. Also, the channel *must* be responsible for telling the queues to forget the limiter, otherwise again there's a race between a new limiter starting up and telling the queues about it, and the old limiter dying and telling the queues they're unlimited. This is why the limit_queues function cannot be moved into the limiter - the channel must drive this.
Diffstat (limited to 'src/rabbit_limiter.erl')
-rw-r--r--src/rabbit_limiter.erl17
1 files changed, 10 insertions, 7 deletions
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl
index c9f8183f..d998499d 100644
--- a/src/rabbit_limiter.erl
+++ b/src/rabbit_limiter.erl
@@ -47,7 +47,7 @@
-spec(start_link/2 :: (pid(), non_neg_integer()) -> pid()).
-spec(shutdown/1 :: (maybe_pid()) -> 'ok').
--spec(limit/2 :: (maybe_pid(), non_neg_integer()) -> 'ok').
+-spec(limit/2 :: (maybe_pid(), non_neg_integer()) -> 'ok' | 'stopped').
-spec(can_send/3 :: (maybe_pid(), pid(), boolean()) -> boolean()).
-spec(ack/2 :: (maybe_pid(), non_neg_integer()) -> 'ok').
-spec(register/2 :: (maybe_pid(), pid()) -> 'ok').
@@ -77,13 +77,12 @@ start_link(ChPid, UnackedMsgCount) ->
shutdown(undefined) ->
ok;
shutdown(LimiterPid) ->
- unlink(LimiterPid),
gen_server2:cast(LimiterPid, shutdown).
limit(undefined, 0) ->
ok;
limit(LimiterPid, PrefetchCount) ->
- gen_server2:cast(LimiterPid, {limit, PrefetchCount}).
+ gen_server2:call(LimiterPid, {limit, PrefetchCount}).
%% Ask the limiter whether the queue can deliver a message without
%% breaching a limit
@@ -130,14 +129,18 @@ handle_call({can_send, QPid, AckRequired}, _From,
end;
handle_call(get_limit, _From, State = #lim{prefetch_count = PrefetchCount}) ->
- {reply, PrefetchCount, State}.
+ {reply, PrefetchCount, State};
+
+handle_call({limit, PrefetchCount}, _From, State) ->
+ State1 = maybe_notify(State, State#lim{prefetch_count = PrefetchCount}),
+ case PrefetchCount == 0 of
+ true -> {stop, normal, stopped, State1};
+ false -> {reply, ok, State1}
+ end.
handle_cast(shutdown, State) ->
{stop, normal, State};
-handle_cast({limit, PrefetchCount}, State) ->
- {noreply, maybe_notify(State, State#lim{prefetch_count = PrefetchCount})};
-
handle_cast({ack, Count}, State = #lim{volume = Volume}) ->
NewVolume = if Volume == 0 -> 0;
true -> Volume - Count