summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2010-03-03 17:37:53 +0000
committerMatthew Sackman <matthew@lshift.net>2010-03-03 17:37:53 +0000
commite3571ea6e641200ff18bd59d314ae84cc5e139aa (patch)
tree6b98c3e2fa83283a1f9dedc153c8eb0d03baf3bc
parent369cd5a52308dc68eef2ef54a1dfbc1e36a7906b (diff)
downloadrabbitmq-server-e3571ea6e641200ff18bd59d314ae84cc5e139aa.tar.gz
Better handle the limiter exiting
-rw-r--r--src/rabbit_channel.erl2
-rw-r--r--src/rabbit_limiter.erl22
2 files changed, 16 insertions, 8 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 1bfda6d4..ed49ee3b 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -223,8 +223,6 @@ handle_info({'EXIT', WriterPid, Reason = {writer, send_failed, _Error}},
State = #ch{writer_pid = WriterPid}) ->
State#ch.reader_pid ! {channel_exit, State#ch.channel, Reason},
{stop, normal, State};
-handle_info({'EXIT', _OldLimiterPid, normal}, State) ->
- {noreply, State};
handle_info({'EXIT', _Pid, Reason}, State) ->
{stop, Reason, State};
handle_info({'DOWN', _MRef, process, QPid, _Reason}, State) ->
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl
index 4cb8725b..9ef79e19 100644
--- a/src/rabbit_limiter.erl
+++ b/src/rabbit_limiter.erl
@@ -80,12 +80,14 @@ start_link(ChPid, UnackedMsgCount) ->
shutdown(undefined) ->
ok;
shutdown(LimiterPid) ->
+ true = unlink(LimiterPid),
gen_server2:cast(LimiterPid, shutdown).
limit(undefined, 0) ->
ok;
limit(LimiterPid, PrefetchCount) ->
- gen_server2:call(LimiterPid, {limit, PrefetchCount}).
+ unlink_on_stopped(LimiterPid,
+ gen_server2:call(LimiterPid, {limit, PrefetchCount})).
%% Ask the limiter whether the queue can deliver a message without
%% breaching a limit
@@ -123,7 +125,8 @@ block(LimiterPid) ->
unblock(undefined) ->
ok;
unblock(LimiterPid) ->
- gen_server2:call(LimiterPid, unblock, infinity).
+ unlink_on_stopped(LimiterPid,
+ gen_server2:call(LimiterPid, unblock, infinity)).
%%----------------------------------------------------------------------------
%% gen_server callbacks
@@ -169,10 +172,8 @@ handle_cast({ack, Count}, State = #lim{volume = Volume}) ->
NewVolume = if Volume == 0 -> 0;
true -> Volume - Count
end,
- case maybe_notify(State, State#lim{volume = NewVolume}) of
- {cont, State1} -> {noreply, State1};
- {stop, State1} -> {stop, normal, State1}
- end;
+ {cont, State1} = maybe_notify(State, State#lim{volume = NewVolume}),
+ {noreply, State1};
handle_cast({register, QPid}, State) ->
{noreply, remember_queue(QPid, State)};
@@ -246,3 +247,12 @@ notify_queues(State = #lim{ch_pid = ChPid, queues = Queues}) ->
ok
end,
State#lim{queues = NewQueues}.
+
+unlink_on_stopped(LimiterPid, stopped) ->
+ true = unlink(LimiterPid),
+ ok = receive {'EXIT', LimiterPid, _Reason} -> ok
+ after 0 -> ok
+ end,
+ stopped;
+unlink_on_stopped(_LimiterPid, Result) ->
+ Result.