diff options
Diffstat (limited to 'src/rabbit_limiter.erl')
-rw-r--r-- | src/rabbit_limiter.erl | 22 |
1 files changed, 16 insertions, 6 deletions
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index 252ba001..7d840861 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -80,12 +80,14 @@ start_link(ChPid, UnackedMsgCount) -> shutdown(undefined) -> ok; shutdown(LimiterPid) -> + true = unlink(LimiterPid), gen_server2:cast(LimiterPid, shutdown). limit(undefined, 0) -> ok; limit(LimiterPid, PrefetchCount) -> - gen_server2:call(LimiterPid, {limit, PrefetchCount}). + unlink_on_stopped(LimiterPid, + gen_server2:call(LimiterPid, {limit, PrefetchCount})). %% Ask the limiter whether the queue can deliver a message without %% breaching a limit @@ -123,7 +125,8 @@ block(LimiterPid) -> unblock(undefined) -> ok; unblock(LimiterPid) -> - gen_server2:call(LimiterPid, unblock, infinity). + unlink_on_stopped(LimiterPid, + gen_server2:call(LimiterPid, unblock, infinity)). %%---------------------------------------------------------------------------- %% gen_server callbacks @@ -169,10 +172,8 @@ handle_cast({ack, Count}, State = #lim{volume = Volume}) -> NewVolume = if Volume == 0 -> 0; true -> Volume - Count end, - case maybe_notify(State, State#lim{volume = NewVolume}) of - {cont, State1} -> {noreply, State1}; - {stop, State1} -> {stop, normal, State1} - end; + {cont, State1} = maybe_notify(State, State#lim{volume = NewVolume}), + {noreply, State1}; handle_cast({register, QPid}, State) -> {noreply, remember_queue(QPid, State)}; @@ -246,3 +247,12 @@ notify_queues(State = #lim{ch_pid = ChPid, queues = Queues}) -> ok end, State#lim{queues = NewQueues}. + +unlink_on_stopped(LimiterPid, stopped) -> + true = unlink(LimiterPid), + ok = receive {'EXIT', LimiterPid, _Reason} -> ok + after 0 -> ok + end, + stopped; +unlink_on_stopped(_LimiterPid, Result) -> + Result. |