diff options
author | Matthew Sackman <matthew@lshift.net> | 2010-03-03 17:38:54 +0000 |
---|---|---|
committer | Matthew Sackman <matthew@lshift.net> | 2010-03-03 17:38:54 +0000 |
commit | 696b375da9cf26dbabead3d9abd1640f15ff0321 (patch) | |
tree | fd3c0180d5b72ed9f712ed51d130ca86314a61e0 | |
parent | 22a15f8b78591107381617f1ed882fb1c93a1334 (diff) | |
parent | e3571ea6e641200ff18bd59d314ae84cc5e139aa (diff) | |
download | rabbitmq-server-bug22423.tar.gz |
Merging headsbug22423
-rw-r--r-- | src/rabbit_channel.erl | 2 | ||||
-rw-r--r-- | src/rabbit_limiter.erl | 22 |
2 files changed, 16 insertions, 8 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 314ca826..41085fb7 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -222,8 +222,6 @@ handle_info({'EXIT', WriterPid, Reason = {writer, send_failed, _Error}}, State = #ch{writer_pid = WriterPid}) -> State#ch.reader_pid ! {channel_exit, State#ch.channel, Reason}, {stop, normal, State}; -handle_info({'EXIT', _OldLimiterPid, normal}, State) -> - {noreply, State}; handle_info({'EXIT', _Pid, Reason}, State) -> {stop, Reason, State}; handle_info({'DOWN', _MRef, process, QPid, _Reason}, State) -> diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index 252ba001..7d840861 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -80,12 +80,14 @@ start_link(ChPid, UnackedMsgCount) -> shutdown(undefined) -> ok; shutdown(LimiterPid) -> + true = unlink(LimiterPid), gen_server2:cast(LimiterPid, shutdown). limit(undefined, 0) -> ok; limit(LimiterPid, PrefetchCount) -> - gen_server2:call(LimiterPid, {limit, PrefetchCount}). + unlink_on_stopped(LimiterPid, + gen_server2:call(LimiterPid, {limit, PrefetchCount})). %% Ask the limiter whether the queue can deliver a message without %% breaching a limit @@ -123,7 +125,8 @@ block(LimiterPid) -> unblock(undefined) -> ok; unblock(LimiterPid) -> - gen_server2:call(LimiterPid, unblock, infinity). + unlink_on_stopped(LimiterPid, + gen_server2:call(LimiterPid, unblock, infinity)). %%---------------------------------------------------------------------------- %% gen_server callbacks @@ -169,10 +172,8 @@ handle_cast({ack, Count}, State = #lim{volume = Volume}) -> NewVolume = if Volume == 0 -> 0; true -> Volume - Count end, - case maybe_notify(State, State#lim{volume = NewVolume}) of - {cont, State1} -> {noreply, State1}; - {stop, State1} -> {stop, normal, State1} - end; + {cont, State1} = maybe_notify(State, State#lim{volume = NewVolume}), + {noreply, State1}; handle_cast({register, QPid}, State) -> {noreply, remember_queue(QPid, State)}; @@ -246,3 +247,12 @@ notify_queues(State = #lim{ch_pid = ChPid, queues = Queues}) -> ok end, State#lim{queues = NewQueues}. + +unlink_on_stopped(LimiterPid, stopped) -> + true = unlink(LimiterPid), + ok = receive {'EXIT', LimiterPid, _Reason} -> ok + after 0 -> ok + end, + stopped; +unlink_on_stopped(_LimiterPid, Result) -> + Result. |