diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2014-01-07 11:17:33 +0000 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2014-01-07 11:17:33 +0000 |
commit | 948e3e59cda4c80056dc27cff8b2d6ce542aaa37 (patch) | |
tree | 1e1c3e7536c45ec7dcf2228eff3602f5add31698 | |
parent | 982f61fe0fb6cac8dc46d9f6ca027763a9fb4545 (diff) | |
download | rabbitmq-server-948e3e59cda4c80056dc27cff8b2d6ce542aaa37.tar.gz |
Notify on channel down.
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 10 |
1 files changed, 10 insertions, 0 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 886db8d1..94cfa619 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -701,6 +701,12 @@ remove_consumers(ChPid, Queue, QName) -> true end, Queue). +channel_consumers(ChPid, Queue) -> + priority_queue:fold( + fun ({CP, #consumer{tag = CTag}}, _, Acc) when CP =:= ChPid -> + [CTag | Acc] + end, [], Queue). + possibly_unblock(State, ChPid, Update) -> case lookup_ch(ChPid) of not_found -> State; @@ -760,6 +766,10 @@ handle_ch_down(DownPid, State = #q{active_consumers = AC, end, State2 = State1#q{active_consumers = AC1, exclusive_consumer = Holder1}, + [notify_decorators(basic_cancel, [{consumer_tag, CTag}], State2) || + CTag <- channel_consumers(ChPid, AC)], + [notify_decorators(basic_cancel, [{consumer_tag, CTag}], State2) || + CTag <- channel_consumers(ChPid, Blocked)], case should_auto_delete(State2) of true -> {stop, State2}; false -> {ok, requeue_and_run(queue:to_list(ChAckTags), |