summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2014-01-07 11:17:33 +0000
committerSimon MacMullen <simon@rabbitmq.com>2014-01-07 11:17:33 +0000
commit948e3e59cda4c80056dc27cff8b2d6ce542aaa37 (patch)
tree1e1c3e7536c45ec7dcf2228eff3602f5add31698
parent982f61fe0fb6cac8dc46d9f6ca027763a9fb4545 (diff)
downloadrabbitmq-server-948e3e59cda4c80056dc27cff8b2d6ce542aaa37.tar.gz
Notify on channel down.
-rw-r--r--src/rabbit_amqqueue_process.erl10
1 files changed, 10 insertions, 0 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 886db8d1..94cfa619 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -701,6 +701,12 @@ remove_consumers(ChPid, Queue, QName) ->
true
end, Queue).
+channel_consumers(ChPid, Queue) ->
+ priority_queue:fold(
+ fun ({CP, #consumer{tag = CTag}}, _, Acc) when CP =:= ChPid ->
+ [CTag | Acc]
+ end, [], Queue).
+
possibly_unblock(State, ChPid, Update) ->
case lookup_ch(ChPid) of
not_found -> State;
@@ -760,6 +766,10 @@ handle_ch_down(DownPid, State = #q{active_consumers = AC,
end,
State2 = State1#q{active_consumers = AC1,
exclusive_consumer = Holder1},
+ [notify_decorators(basic_cancel, [{consumer_tag, CTag}], State2) ||
+ CTag <- channel_consumers(ChPid, AC)],
+ [notify_decorators(basic_cancel, [{consumer_tag, CTag}], State2) ||
+ CTag <- channel_consumers(ChPid, Blocked)],
case should_auto_delete(State2) of
true -> {stop, State2};
false -> {ok, requeue_and_run(queue:to_list(ChAckTags),