summaryrefslogtreecommitdiff
path: root/src/rabbit_limiter.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/rabbit_limiter.erl')
-rw-r--r--src/rabbit_limiter.erl22
1 files changed, 16 insertions, 6 deletions
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl
index 252ba001..7d840861 100644
--- a/src/rabbit_limiter.erl
+++ b/src/rabbit_limiter.erl
@@ -80,12 +80,14 @@ start_link(ChPid, UnackedMsgCount) ->
shutdown(undefined) ->
ok;
shutdown(LimiterPid) ->
+ true = unlink(LimiterPid),
gen_server2:cast(LimiterPid, shutdown).
limit(undefined, 0) ->
ok;
limit(LimiterPid, PrefetchCount) ->
- gen_server2:call(LimiterPid, {limit, PrefetchCount}).
+ unlink_on_stopped(LimiterPid,
+ gen_server2:call(LimiterPid, {limit, PrefetchCount})).
%% Ask the limiter whether the queue can deliver a message without
%% breaching a limit
@@ -123,7 +125,8 @@ block(LimiterPid) ->
unblock(undefined) ->
ok;
unblock(LimiterPid) ->
- gen_server2:call(LimiterPid, unblock, infinity).
+ unlink_on_stopped(LimiterPid,
+ gen_server2:call(LimiterPid, unblock, infinity)).
%%----------------------------------------------------------------------------
%% gen_server callbacks
@@ -169,10 +172,8 @@ handle_cast({ack, Count}, State = #lim{volume = Volume}) ->
NewVolume = if Volume == 0 -> 0;
true -> Volume - Count
end,
- case maybe_notify(State, State#lim{volume = NewVolume}) of
- {cont, State1} -> {noreply, State1};
- {stop, State1} -> {stop, normal, State1}
- end;
+ {cont, State1} = maybe_notify(State, State#lim{volume = NewVolume}),
+ {noreply, State1};
handle_cast({register, QPid}, State) ->
{noreply, remember_queue(QPid, State)};
@@ -246,3 +247,12 @@ notify_queues(State = #lim{ch_pid = ChPid, queues = Queues}) ->
ok
end,
State#lim{queues = NewQueues}.
+
+unlink_on_stopped(LimiterPid, stopped) ->
+ true = unlink(LimiterPid),
+ ok = receive {'EXIT', LimiterPid, _Reason} -> ok
+ after 0 -> ok
+ end,
+ stopped;
+unlink_on_stopped(_LimiterPid, Result) ->
+ Result.