diff options
author | Matthew Sackman <matthew@lshift.net> | 2010-03-02 16:07:13 +0000 |
---|---|---|
committer | Matthew Sackman <matthew@lshift.net> | 2010-03-02 16:07:13 +0000 |
commit | a5bddb9ebdc68ea26ad44c71009b084dbf6e648e (patch) | |
tree | c82111bd8211d2be78691e79d32b63af3b64333f /src/rabbit_limiter.erl | |
parent | 56c564c9627d621108ebb5bb378943940e5e5d5e (diff) | |
download | rabbitmq-server-a5bddb9ebdc68ea26ad44c71009b084dbf6e648e.tar.gz |
Problem: if the limiter is blocked due to a client flow, and the client then issues a qos = 0, we can't have the limiter exiting. Thus limit needs to become a call, plus associated changes. Note that we *have* to know whether or not the limiter is about to stop - if we just wait for the exit signal then there's a race with a new qos setting being sent to an expiring limiter and the limiter thus getting lost. Also, the channel *must* be responsible for telling the queues to forget the limiter, otherwise again there's a race between a new limiter starting up and telling the queues about it, and the old limiter dying and telling the queues they're unlimited. This is why the limit_queues function cannot be moved into the limiter - the channel must drive this.
Diffstat (limited to 'src/rabbit_limiter.erl')
-rw-r--r-- | src/rabbit_limiter.erl | 17 |
1 files changed, 10 insertions, 7 deletions
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index c9f8183f..d998499d 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -47,7 +47,7 @@ -spec(start_link/2 :: (pid(), non_neg_integer()) -> pid()). -spec(shutdown/1 :: (maybe_pid()) -> 'ok'). --spec(limit/2 :: (maybe_pid(), non_neg_integer()) -> 'ok'). +-spec(limit/2 :: (maybe_pid(), non_neg_integer()) -> 'ok' | 'stopped'). -spec(can_send/3 :: (maybe_pid(), pid(), boolean()) -> boolean()). -spec(ack/2 :: (maybe_pid(), non_neg_integer()) -> 'ok'). -spec(register/2 :: (maybe_pid(), pid()) -> 'ok'). @@ -77,13 +77,12 @@ start_link(ChPid, UnackedMsgCount) -> shutdown(undefined) -> ok; shutdown(LimiterPid) -> - unlink(LimiterPid), gen_server2:cast(LimiterPid, shutdown). limit(undefined, 0) -> ok; limit(LimiterPid, PrefetchCount) -> - gen_server2:cast(LimiterPid, {limit, PrefetchCount}). + gen_server2:call(LimiterPid, {limit, PrefetchCount}). %% Ask the limiter whether the queue can deliver a message without %% breaching a limit @@ -130,14 +129,18 @@ handle_call({can_send, QPid, AckRequired}, _From, end; handle_call(get_limit, _From, State = #lim{prefetch_count = PrefetchCount}) -> - {reply, PrefetchCount, State}. + {reply, PrefetchCount, State}; + +handle_call({limit, PrefetchCount}, _From, State) -> + State1 = maybe_notify(State, State#lim{prefetch_count = PrefetchCount}), + case PrefetchCount == 0 of + true -> {stop, normal, stopped, State1}; + false -> {reply, ok, State1} + end. handle_cast(shutdown, State) -> {stop, normal, State}; -handle_cast({limit, PrefetchCount}, State) -> - {noreply, maybe_notify(State, State#lim{prefetch_count = PrefetchCount})}; - handle_cast({ack, Count}, State = #lim{volume = Volume}) -> NewVolume = if Volume == 0 -> 0; true -> Volume - Count |