summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlexandru Scvortov <alexandru@rabbitmq.com>2011-09-13 13:57:49 +0100
committerAlexandru Scvortov <alexandru@rabbitmq.com>2011-09-13 13:57:49 +0100
commitd146bcbd9069a3c93b45bfcbf43d72ccc56896bc (patch)
treee2a7af986f11fbfe77c2d29bdbf8fe7678b26100
parentead26e578cc05d20f8de272b8dfff23439eeb836 (diff)
downloadrabbitmq-server-d146bcbd9069a3c93b45bfcbf43d72ccc56896bc.tar.gz
merge flow and (consumer, confirm, starts) monitors
-rw-r--r--src/rabbit_channel.erl37
1 files changed, 20 insertions, 17 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index bcbb78d4..fc3d2c9a 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -190,7 +190,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, Protocol, User, VHost,
virtual_host = VHost,
most_recently_declared_queue = <<>>,
consumer_mapping = dict:new(),
- blocking = dict:new(),
+ blocking = gb_sets:new(),
consumer_monitors = dict:new(),
queue_collector_pid = CollectorPid,
stats_timer = StatsTimer,
@@ -516,17 +516,18 @@ check_name(_Kind, NameBin) ->
NameBin.
queue_blocked(QPid, State = #ch{blocking = Blocking}) ->
- case dict:find(QPid, Blocking) of
- error -> State;
- {ok, MRef} -> true = erlang:demonitor(MRef),
- Blocking1 = dict:erase(QPid, Blocking),
- ok = case dict:size(Blocking1) of
- 0 -> rabbit_writer:send_command(
- State#ch.writer_pid,
- #'channel.flow_ok'{active = false});
- _ -> ok
- end,
- State#ch{blocking = Blocking1}
+ case gb_sets:is_element(QPid, Blocking) of
+ false -> State;
+ true -> Blocking1 = gb_sets:delete(QPid, Blocking),
+ ok = case gb_sets:size(Blocking1) of
+ 0 -> rabbit_writer:send_command(
+ State#ch.writer_pid,
+ #'channel.flow_ok'{active = false});
+ _ -> ok
+ end,
+ State1 = State#ch{blocking = Blocking1},
+ maybe_demonitor(QPid, State1),
+ State1
end.
record_confirm(undefined, _, State) ->
@@ -1109,10 +1110,9 @@ handle_method(#'channel.flow'{active = false}, _,
ok = rabbit_limiter:block(Limiter1),
case consumer_queues(Consumers) of
[] -> {reply, #'channel.flow_ok'{active = false}, State1};
- QPids -> Queues = [{QPid, erlang:monitor(process, QPid)} ||
- QPid <- QPids],
+ QPids -> Queues = [begin maybe_monitor(QPid), QPid end || QPid <- QPids],
ok = rabbit_amqqueue:flush_all(QPids, self()),
- {noreply, State1#ch{blocking = dict:from_list(Queues)}}
+ {noreply, State1#ch{blocking = gb_sets:from_list(Queues)}}
end;
handle_method(_MethodRecord, _Content, _State) ->
@@ -1151,12 +1151,15 @@ consumer_maybe_monitor(ConsumerTag, State = #ch{consumer_mapping = ConsumerMappi
end.
maybe_demonitor(QPid, #ch{stats_timer = StatsTimer,
- consumer_monitors = ConsumerMonitors}) ->
+ consumer_monitors = ConsumerMonitors,
+ blocking = Blocking}) ->
case get({monitoring, QPid}) of
undefined -> ok;
MRef -> StatsEnabled = rabbit_event:stats_level(StatsTimer) =:= fine,
ConsumerMonitored = dict:find(MRef, ConsumerMonitors) =/= error,
- case not StatsEnabled and not ConsumerMonitored of
+ QueueBlocked = gb_sets:is_element(QPid, Blocking),
+ case not StatsEnabled and not ConsumerMonitored and
+ not QueueBlocked of
true -> true = erlang:demonitor(MRef);
false -> ok
end,