diff options
author | Alexandru Scvortov <alexandru@rabbitmq.com> | 2011-09-15 11:25:28 +0100 |
---|---|---|
committer | Alexandru Scvortov <alexandru@rabbitmq.com> | 2011-09-15 11:25:28 +0100 |
commit | 8acdda1f848597d9a403e02aaffbfb270e0767bf (patch) | |
tree | e67971f71ef59b3277d8599a3fac13b49f90c1e4 | |
parent | 843ddc3518b731caec22115d7b34c2a6499e1750 (diff) | |
download | rabbitmq-server-8acdda1f848597d9a403e02aaffbfb270e0767bf.tar.gz |
consumer_monitors is now a set of QPids
-rw-r--r-- | src/rabbit_channel.erl | 44 |
1 files changed, 25 insertions, 19 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 861a3b3a..684c4206 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -191,7 +191,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, Protocol, User, VHost, most_recently_declared_queue = <<>>, consumer_mapping = dict:new(), blocking = gb_sets:new(), - consumer_monitors = dict:new(), + consumer_monitors = gb_sets:new(), queue_collector_pid = CollectorPid, stats_timer = StatsTimer, confirm_enabled = false, @@ -323,14 +323,13 @@ handle_info(emit_stats, State = #ch{stats_timer = StatsTimer}) -> noreply([ensure_stats_timer], State#ch{stats_timer = rabbit_event:reset_stats_timer(StatsTimer)}); -handle_info({'DOWN', MRef, process, QPid, Reason}, +handle_info({'DOWN', _MRef, process, QPid, Reason}, State = #ch{consumer_monitors = ConsumerMonitors}) -> State1 = handle_publishing_queue_down(QPid, Reason, State), noreply( - case dict:find(MRef, ConsumerMonitors) of - error -> State1; - {ok, ConsumerTag} -> handle_consuming_queue_down(MRef, ConsumerTag, - State1) + case gb_sets:is_member(QPid, ConsumerMonitors) of + false -> State1; + true -> handle_consuming_queue_down(QPid, State1) end); handle_info({'EXIT', _Pid, Reason}, State) -> @@ -780,7 +779,8 @@ handle_method(#'basic.cancel'{consumer_tag = ConsumerTag, {ok, Q = #amqqueue{pid = QPid}} -> ConsumerMonitors1 = case get({monitoring, QPid}) of undefined -> ConsumerMonitors; - MRef -> dict:erase(MRef, ConsumerMonitors) + _ -> gb_sets:delete(QPid, + ConsumerMonitors) end, NewState = State#ch{consumer_mapping = dict:erase(ConsumerTag, ConsumerMapping), @@ -1137,9 +1137,8 @@ consumer_monitor(ConsumerTag, Capabilities, <<"consumer_cancel_notify">>) of {bool, true} -> #amqqueue{pid = QPid} = dict:fetch(ConsumerTag, ConsumerMapping), - MRef = monitor_queue(QPid), - State#ch{consumer_monitors = - dict:store(MRef, ConsumerTag, ConsumerMonitors)}; + monitor_queue(QPid), + State#ch{consumer_monitors = gb_sets:add(QPid, ConsumerMonitors)}; _ -> State end. @@ -1151,8 +1150,7 @@ demonitor_queue(QPid, #ch{stats_timer = StatsTimer, case get({monitoring, QPid}) of undefined -> ok; MRef -> StatsEnabled = rabbit_event:stats_level(StatsTimer) =:= fine, - ConsumerMonitored = - dict:find(MRef, ConsumerMonitors) =/= error, + ConsumerMonitored = gb_sets:is_member(QPid, ConsumerMonitors), QueueBlocked = gb_sets:is_element(QPid, Blocking), ConfirmMonitored = gb_trees:is_defined(QPid, UQM), case StatsEnabled or ConsumerMonitored or @@ -1188,16 +1186,24 @@ handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed_qm = UQM}) -> State3 = SendFun(MXs, State2), queue_blocked(QPid, State3). -handle_consuming_queue_down(MRef, ConsumerTag, +handle_consuming_queue_down(QPid, State = #ch{consumer_mapping = ConsumerMapping, consumer_monitors = ConsumerMonitors, writer_pid = WriterPid}) -> - ConsumerMapping1 = dict:erase(ConsumerTag, ConsumerMapping), - ConsumerMonitors1 = dict:erase(MRef, ConsumerMonitors), - Cancel = #'basic.cancel'{consumer_tag = ConsumerTag, - nowait = true}, - ok = rabbit_writer:send_command(WriterPid, Cancel), - State#ch{consumer_mapping = ConsumerMapping1, + {ConsumerTags, ConsumerMapping1} = + dict:fold(fun (CTag, #amqqueue{pid = QPid0}, {CTags, CMap}) + when QPid =:= QPid0 -> + {[CTag | CTags], CMap}; + (CTag, Q, {CTags, CMap}) -> + {CTags, dict:store(CTag, Q, CMap)} + end, {[], dict:new()}, ConsumerMapping), + ConsumerMonitors1 = gb_sets:delete(QPid, ConsumerMonitors), + [begin + Cancel = #'basic.cancel'{consumer_tag = ConsumerTag, + nowait = true}, + ok = rabbit_writer:send_command(WriterPid, Cancel) + end || ConsumerTag <- ConsumerTags], + State#ch{consumer_mapping = ConsumerMapping1, consumer_monitors = ConsumerMonitors1}. binding_action(Fun, ExchangeNameBin, DestinationType, DestinationNameBin, |