summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlexandru Scvortov <alexandru@rabbitmq.com>2011-09-15 11:25:28 +0100
committerAlexandru Scvortov <alexandru@rabbitmq.com>2011-09-15 11:25:28 +0100
commit8acdda1f848597d9a403e02aaffbfb270e0767bf (patch)
treee67971f71ef59b3277d8599a3fac13b49f90c1e4
parent843ddc3518b731caec22115d7b34c2a6499e1750 (diff)
downloadrabbitmq-server-8acdda1f848597d9a403e02aaffbfb270e0767bf.tar.gz
consumer_monitors is now a set of QPids
-rw-r--r--src/rabbit_channel.erl44
1 files changed, 25 insertions, 19 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 861a3b3a..684c4206 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -191,7 +191,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, Protocol, User, VHost,
most_recently_declared_queue = <<>>,
consumer_mapping = dict:new(),
blocking = gb_sets:new(),
- consumer_monitors = dict:new(),
+ consumer_monitors = gb_sets:new(),
queue_collector_pid = CollectorPid,
stats_timer = StatsTimer,
confirm_enabled = false,
@@ -323,14 +323,13 @@ handle_info(emit_stats, State = #ch{stats_timer = StatsTimer}) ->
noreply([ensure_stats_timer],
State#ch{stats_timer = rabbit_event:reset_stats_timer(StatsTimer)});
-handle_info({'DOWN', MRef, process, QPid, Reason},
+handle_info({'DOWN', _MRef, process, QPid, Reason},
State = #ch{consumer_monitors = ConsumerMonitors}) ->
State1 = handle_publishing_queue_down(QPid, Reason, State),
noreply(
- case dict:find(MRef, ConsumerMonitors) of
- error -> State1;
- {ok, ConsumerTag} -> handle_consuming_queue_down(MRef, ConsumerTag,
- State1)
+ case gb_sets:is_member(QPid, ConsumerMonitors) of
+ false -> State1;
+ true -> handle_consuming_queue_down(QPid, State1)
end);
handle_info({'EXIT', _Pid, Reason}, State) ->
@@ -780,7 +779,8 @@ handle_method(#'basic.cancel'{consumer_tag = ConsumerTag,
{ok, Q = #amqqueue{pid = QPid}} ->
ConsumerMonitors1 = case get({monitoring, QPid}) of
undefined -> ConsumerMonitors;
- MRef -> dict:erase(MRef, ConsumerMonitors)
+ _ -> gb_sets:delete(QPid,
+ ConsumerMonitors)
end,
NewState = State#ch{consumer_mapping = dict:erase(ConsumerTag,
ConsumerMapping),
@@ -1137,9 +1137,8 @@ consumer_monitor(ConsumerTag,
Capabilities, <<"consumer_cancel_notify">>) of
{bool, true} ->
#amqqueue{pid = QPid} = dict:fetch(ConsumerTag, ConsumerMapping),
- MRef = monitor_queue(QPid),
- State#ch{consumer_monitors =
- dict:store(MRef, ConsumerTag, ConsumerMonitors)};
+ monitor_queue(QPid),
+ State#ch{consumer_monitors = gb_sets:add(QPid, ConsumerMonitors)};
_ ->
State
end.
@@ -1151,8 +1150,7 @@ demonitor_queue(QPid, #ch{stats_timer = StatsTimer,
case get({monitoring, QPid}) of
undefined -> ok;
MRef -> StatsEnabled = rabbit_event:stats_level(StatsTimer) =:= fine,
- ConsumerMonitored =
- dict:find(MRef, ConsumerMonitors) =/= error,
+ ConsumerMonitored = gb_sets:is_member(QPid, ConsumerMonitors),
QueueBlocked = gb_sets:is_element(QPid, Blocking),
ConfirmMonitored = gb_trees:is_defined(QPid, UQM),
case StatsEnabled or ConsumerMonitored or
@@ -1188,16 +1186,24 @@ handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed_qm = UQM}) ->
State3 = SendFun(MXs, State2),
queue_blocked(QPid, State3).
-handle_consuming_queue_down(MRef, ConsumerTag,
+handle_consuming_queue_down(QPid,
State = #ch{consumer_mapping = ConsumerMapping,
consumer_monitors = ConsumerMonitors,
writer_pid = WriterPid}) ->
- ConsumerMapping1 = dict:erase(ConsumerTag, ConsumerMapping),
- ConsumerMonitors1 = dict:erase(MRef, ConsumerMonitors),
- Cancel = #'basic.cancel'{consumer_tag = ConsumerTag,
- nowait = true},
- ok = rabbit_writer:send_command(WriterPid, Cancel),
- State#ch{consumer_mapping = ConsumerMapping1,
+ {ConsumerTags, ConsumerMapping1} =
+ dict:fold(fun (CTag, #amqqueue{pid = QPid0}, {CTags, CMap})
+ when QPid =:= QPid0 ->
+ {[CTag | CTags], CMap};
+ (CTag, Q, {CTags, CMap}) ->
+ {CTags, dict:store(CTag, Q, CMap)}
+ end, {[], dict:new()}, ConsumerMapping),
+ ConsumerMonitors1 = gb_sets:delete(QPid, ConsumerMonitors),
+ [begin
+ Cancel = #'basic.cancel'{consumer_tag = ConsumerTag,
+ nowait = true},
+ ok = rabbit_writer:send_command(WriterPid, Cancel)
+ end || ConsumerTag <- ConsumerTags],
+ State#ch{consumer_mapping = ConsumerMapping1,
consumer_monitors = ConsumerMonitors1}.
binding_action(Fun, ExchangeNameBin, DestinationType, DestinationNameBin,