diff options
author | Alexandru Scvortov <alexandru@rabbitmq.com> | 2011-09-13 13:57:49 +0100 |
---|---|---|
committer | Alexandru Scvortov <alexandru@rabbitmq.com> | 2011-09-13 13:57:49 +0100 |
commit | d146bcbd9069a3c93b45bfcbf43d72ccc56896bc (patch) | |
tree | e2a7af986f11fbfe77c2d29bdbf8fe7678b26100 | |
parent | ead26e578cc05d20f8de272b8dfff23439eeb836 (diff) | |
download | rabbitmq-server-d146bcbd9069a3c93b45bfcbf43d72ccc56896bc.tar.gz |
merge flow and (consumer, confirm, starts) monitors
-rw-r--r-- | src/rabbit_channel.erl | 37 |
1 files changed, 20 insertions, 17 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index bcbb78d4..fc3d2c9a 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -190,7 +190,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, Protocol, User, VHost, virtual_host = VHost, most_recently_declared_queue = <<>>, consumer_mapping = dict:new(), - blocking = dict:new(), + blocking = gb_sets:new(), consumer_monitors = dict:new(), queue_collector_pid = CollectorPid, stats_timer = StatsTimer, @@ -516,17 +516,18 @@ check_name(_Kind, NameBin) -> NameBin. queue_blocked(QPid, State = #ch{blocking = Blocking}) -> - case dict:find(QPid, Blocking) of - error -> State; - {ok, MRef} -> true = erlang:demonitor(MRef), - Blocking1 = dict:erase(QPid, Blocking), - ok = case dict:size(Blocking1) of - 0 -> rabbit_writer:send_command( - State#ch.writer_pid, - #'channel.flow_ok'{active = false}); - _ -> ok - end, - State#ch{blocking = Blocking1} + case gb_sets:is_element(QPid, Blocking) of + false -> State; + true -> Blocking1 = gb_sets:delete(QPid, Blocking), + ok = case gb_sets:size(Blocking1) of + 0 -> rabbit_writer:send_command( + State#ch.writer_pid, + #'channel.flow_ok'{active = false}); + _ -> ok + end, + State1 = State#ch{blocking = Blocking1}, + maybe_demonitor(QPid, State1), + State1 end. record_confirm(undefined, _, State) -> @@ -1109,10 +1110,9 @@ handle_method(#'channel.flow'{active = false}, _, ok = rabbit_limiter:block(Limiter1), case consumer_queues(Consumers) of [] -> {reply, #'channel.flow_ok'{active = false}, State1}; - QPids -> Queues = [{QPid, erlang:monitor(process, QPid)} || - QPid <- QPids], + QPids -> Queues = [begin maybe_monitor(QPid), QPid end || QPid <- QPids], ok = rabbit_amqqueue:flush_all(QPids, self()), - {noreply, State1#ch{blocking = dict:from_list(Queues)}} + {noreply, State1#ch{blocking = gb_sets:from_list(Queues)}} end; handle_method(_MethodRecord, _Content, _State) -> @@ -1151,12 +1151,15 @@ consumer_maybe_monitor(ConsumerTag, State = #ch{consumer_mapping = ConsumerMappi end. maybe_demonitor(QPid, #ch{stats_timer = StatsTimer, - consumer_monitors = ConsumerMonitors}) -> + consumer_monitors = ConsumerMonitors, + blocking = Blocking}) -> case get({monitoring, QPid}) of undefined -> ok; MRef -> StatsEnabled = rabbit_event:stats_level(StatsTimer) =:= fine, ConsumerMonitored = dict:find(MRef, ConsumerMonitors) =/= error, - case not StatsEnabled and not ConsumerMonitored of + QueueBlocked = gb_sets:is_element(QPid, Blocking), + case not StatsEnabled and not ConsumerMonitored and + not QueueBlocked of true -> true = erlang:demonitor(MRef); false -> ok end, |