summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2010-03-03 17:38:54 +0000
committerMatthew Sackman <matthew@lshift.net>2010-03-03 17:38:54 +0000
commit696b375da9cf26dbabead3d9abd1640f15ff0321 (patch)
treefd3c0180d5b72ed9f712ed51d130ca86314a61e0
parent22a15f8b78591107381617f1ed882fb1c93a1334 (diff)
parente3571ea6e641200ff18bd59d314ae84cc5e139aa (diff)
downloadrabbitmq-server-bug22423.tar.gz
Merging headsbug22423
-rw-r--r--src/rabbit_channel.erl2
-rw-r--r--src/rabbit_limiter.erl22
2 files changed, 16 insertions, 8 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 314ca826..41085fb7 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -222,8 +222,6 @@ handle_info({'EXIT', WriterPid, Reason = {writer, send_failed, _Error}},
State = #ch{writer_pid = WriterPid}) ->
State#ch.reader_pid ! {channel_exit, State#ch.channel, Reason},
{stop, normal, State};
-handle_info({'EXIT', _OldLimiterPid, normal}, State) ->
- {noreply, State};
handle_info({'EXIT', _Pid, Reason}, State) ->
{stop, Reason, State};
handle_info({'DOWN', _MRef, process, QPid, _Reason}, State) ->
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl
index 252ba001..7d840861 100644
--- a/src/rabbit_limiter.erl
+++ b/src/rabbit_limiter.erl
@@ -80,12 +80,14 @@ start_link(ChPid, UnackedMsgCount) ->
shutdown(undefined) ->
ok;
shutdown(LimiterPid) ->
+ true = unlink(LimiterPid),
gen_server2:cast(LimiterPid, shutdown).
limit(undefined, 0) ->
ok;
limit(LimiterPid, PrefetchCount) ->
- gen_server2:call(LimiterPid, {limit, PrefetchCount}).
+ unlink_on_stopped(LimiterPid,
+ gen_server2:call(LimiterPid, {limit, PrefetchCount})).
%% Ask the limiter whether the queue can deliver a message without
%% breaching a limit
@@ -123,7 +125,8 @@ block(LimiterPid) ->
unblock(undefined) ->
ok;
unblock(LimiterPid) ->
- gen_server2:call(LimiterPid, unblock, infinity).
+ unlink_on_stopped(LimiterPid,
+ gen_server2:call(LimiterPid, unblock, infinity)).
%%----------------------------------------------------------------------------
%% gen_server callbacks
@@ -169,10 +172,8 @@ handle_cast({ack, Count}, State = #lim{volume = Volume}) ->
NewVolume = if Volume == 0 -> 0;
true -> Volume - Count
end,
- case maybe_notify(State, State#lim{volume = NewVolume}) of
- {cont, State1} -> {noreply, State1};
- {stop, State1} -> {stop, normal, State1}
- end;
+ {cont, State1} = maybe_notify(State, State#lim{volume = NewVolume}),
+ {noreply, State1};
handle_cast({register, QPid}, State) ->
{noreply, remember_queue(QPid, State)};
@@ -246,3 +247,12 @@ notify_queues(State = #lim{ch_pid = ChPid, queues = Queues}) ->
ok
end,
State#lim{queues = NewQueues}.
+
+unlink_on_stopped(LimiterPid, stopped) ->
+ true = unlink(LimiterPid),
+ ok = receive {'EXIT', LimiterPid, _Reason} -> ok
+ after 0 -> ok
+ end,
+ stopped;
+unlink_on_stopped(_LimiterPid, Result) ->
+ Result.