diff options
author | Alexandru Scvortov <alexandru@rabbitmq.com> | 2011-09-14 21:28:10 +0100 |
---|---|---|
committer | Alexandru Scvortov <alexandru@rabbitmq.com> | 2011-09-14 21:28:10 +0100 |
commit | e9abddfc8550f335f77d74377c9f364b33393bc7 (patch) | |
tree | 15d85d07075e53da33872405e96f62c52c276365 | |
parent | bb416385996c9cfcb5bd7c7c017c13d366703e5a (diff) | |
parent | e33c9c1550898dacd6fc97bd3ae9c55a455dac92 (diff) | |
download | rabbitmq-server-e9abddfc8550f335f77d74377c9f364b33393bc7.tar.gz |
merge from default + cosmetic
-rw-r--r-- | src/rabbit_channel.erl | 127 |
1 files changed, 72 insertions, 55 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index dfe84644..a0d7fe79 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -190,7 +190,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, Protocol, User, VHost, virtual_host = VHost, most_recently_declared_queue = <<>>, consumer_mapping = dict:new(), - blocking = dict:new(), + blocking = gb_sets:new(), consumer_monitors = dict:new(), queue_collector_pid = CollectorPid, stats_timer = StatsTimer, @@ -275,7 +275,7 @@ handle_cast(terminate, State) -> handle_cast({command, #'basic.consume_ok'{consumer_tag = ConsumerTag} = Msg}, State = #ch{writer_pid = WriterPid}) -> ok = rabbit_writer:send_command(WriterPid, Msg), - noreply(monitor_consumer(ConsumerTag, State)); + noreply(consumer_monitor(ConsumerTag, State)); handle_cast({command, Msg}, State = #ch{writer_pid = WriterPid}) -> ok = rabbit_writer:send_command(WriterPid, Msg), @@ -325,12 +325,12 @@ handle_info(emit_stats, State = #ch{stats_timer = StatsTimer}) -> handle_info({'DOWN', MRef, process, QPid, Reason}, State = #ch{consumer_monitors = ConsumerMonitors}) -> + State1 = handle_publishing_queue_down(QPid, Reason, State), noreply( case dict:find(MRef, ConsumerMonitors) of - error -> - handle_publishing_queue_down(QPid, Reason, State); - {ok, ConsumerTag} -> - handle_consuming_queue_down(MRef, ConsumerTag, State) + error -> State1; + {ok, ConsumerTag} -> handle_consuming_queue_down(MRef, ConsumerTag, + State1) end); handle_info({'EXIT', _Pid, Reason}, State) -> @@ -516,17 +516,18 @@ check_name(_Kind, NameBin) -> NameBin. queue_blocked(QPid, State = #ch{blocking = Blocking}) -> - case dict:find(QPid, Blocking) of - error -> State; - {ok, MRef} -> true = erlang:demonitor(MRef), - Blocking1 = dict:erase(QPid, Blocking), - ok = case dict:size(Blocking1) of - 0 -> rabbit_writer:send_command( - State#ch.writer_pid, - #'channel.flow_ok'{active = false}); - _ -> ok - end, - State#ch{blocking = Blocking1} + case gb_sets:is_element(QPid, Blocking) of + false -> State; + true -> Blocking1 = gb_sets:delete(QPid, Blocking), + ok = case gb_sets:is_empty(Blocking1) of + true -> rabbit_writer:send_command( + State#ch.writer_pid, + #'channel.flow_ok'{active = false}); + false -> ok + end, + State1 = State#ch{blocking = Blocking1}, + demonitor_queue(QPid, State1), + State1 end. record_confirm(undefined, _, State) -> @@ -552,18 +553,21 @@ process_confirms(MsgSeqNos, QPid, Nack, State = #ch{unconfirmed_mq = UMQ, fun(MsgSeqNo, {_MXs, UMQ0, _UQM} = Acc) -> case gb_trees:lookup(MsgSeqNo, UMQ0) of {value, XQ} -> remove_unconfirmed(MsgSeqNo, QPid, XQ, - Acc, Nack); + Acc, Nack, State); none -> Acc end end, {[], UMQ, UQM}, MsgSeqNos), {MXs, State#ch{unconfirmed_mq = UMQ1, unconfirmed_qm = UQM1}}. -remove_unconfirmed(MsgSeqNo, QPid, {XName, Qs}, {MXs, UMQ, UQM}, Nack) -> +remove_unconfirmed(MsgSeqNo, QPid, {XName, Qs}, {MXs, UMQ, UQM}, Nack, State) -> UQM1 = case gb_trees:lookup(QPid, UQM) of {value, MsgSeqNos} -> MsgSeqNos1 = gb_sets:delete(MsgSeqNo, MsgSeqNos), case gb_sets:is_empty(MsgSeqNos1) of - true -> gb_trees:delete(QPid, UQM); + true -> UQM2 = gb_trees:delete(QPid, UQM), + demonitor_queue(QPid, + State#ch{unconfirmed_qm = UQM2}), + UQM2; false -> gb_trees:update(QPid, MsgSeqNos1, UQM) end; none -> @@ -746,12 +750,11 @@ handle_method(#'basic.consume'{queue = QueueNameBin, end) of {ok, Q} -> State1 = State#ch{consumer_mapping = - dict:store(ActualConsumerTag, - {Q, undefined}, + dict:store(ActualConsumerTag, Q, ConsumerMapping)}, {noreply, case NoWait of - true -> monitor_consumer(ActualConsumerTag, State1); + true -> consumer_monitor(ActualConsumerTag, State1); false -> State1 end}; {{error, exclusive_consume_unavailable}, _Q} -> @@ -774,16 +777,15 @@ handle_method(#'basic.cancel'{consumer_tag = ConsumerTag, error -> %% Spec requires we ignore this situation. return_ok(State, NoWait, OkMsg); - {ok, {Q, MRef}} -> - ConsumerMonitors1 = - case MRef of - undefined -> ConsumerMonitors; - _ -> true = erlang:demonitor(MRef), - dict:erase(MRef, ConsumerMonitors) - end, + {ok, Q = #amqqueue{pid = QPid}} -> + ConsumerMonitors1 = case get({monitoring, QPid}) of + undefined -> ConsumerMonitors; + MRef -> dict:erase(MRef, ConsumerMonitors) + end, NewState = State#ch{consumer_mapping = dict:erase(ConsumerTag, ConsumerMapping), consumer_monitors = ConsumerMonitors1}, + ok = demonitor_queue(Q, NewState), %% In order to ensure that no more messages are sent to %% the consumer after the cancel_ok has been sent, we get %% the queue process to send the cancel_ok on our @@ -1108,10 +1110,9 @@ handle_method(#'channel.flow'{active = false}, _, ok = rabbit_limiter:block(Limiter1), case consumer_queues(Consumers) of [] -> {reply, #'channel.flow_ok'{active = false}, State1}; - QPids -> Queues = [{QPid, erlang:monitor(process, QPid)} || - QPid <- QPids], + QPids -> Queues = [begin monitor_queue(QPid), QPid end || QPid <- QPids], ok = rabbit_amqqueue:flush_all(QPids, self()), - {noreply, State1#ch{blocking = dict:from_list(Queues)}} + {noreply, State1#ch{blocking = gb_sets:from_list(Queues)}} end; handle_method(_MethodRecord, _Content, _State) -> @@ -1120,25 +1121,49 @@ handle_method(_MethodRecord, _Content, _State) -> %%---------------------------------------------------------------------------- -monitor_consumer(ConsumerTag, State = #ch{consumer_mapping = ConsumerMapping, - consumer_monitors = ConsumerMonitors, - capabilities = Capabilities}) -> +monitor_queue(QPid) -> + case get({monitoring, QPid}) of + undefined -> MRef = erlang:monitor(process, QPid), + put({monitoring, QPid}, MRef), + MRef; + MRef -> MRef + end. + +consumer_monitor(ConsumerTag, + State = #ch{consumer_mapping = ConsumerMapping, + consumer_monitors = ConsumerMonitors, + capabilities = Capabilities}) -> case rabbit_misc:table_lookup( Capabilities, <<"consumer_cancel_notify">>) of {bool, true} -> - {#amqqueue{pid = QPid} = Q, undefined} = - dict:fetch(ConsumerTag, ConsumerMapping), - MRef = erlang:monitor(process, QPid), - State#ch{consumer_mapping = - dict:store(ConsumerTag, {Q, MRef}, ConsumerMapping), - consumer_monitors = + #amqqueue{pid = QPid} = dict:fetch(ConsumerTag, ConsumerMapping), + MRef = monitor_queue(QPid), + State#ch{consumer_monitors = dict:store(MRef, ConsumerTag, ConsumerMonitors)}; _ -> State end. +demonitor_queue(QPid, #ch{stats_timer = StatsTimer, + consumer_monitors = ConsumerMonitors, + blocking = Blocking}) -> + case get({monitoring, QPid}) of + undefined -> ok; + MRef -> StatsEnabled = rabbit_event:stats_level(StatsTimer) =:= fine, + ConsumerMonitored = + dict:find(MRef, ConsumerMonitors) =/= error, + QueueBlocked = gb_sets:is_element(QPid, Blocking), + case not StatsEnabled and not ConsumerMonitored and + not QueueBlocked of + true -> true = erlang:demonitor(MRef), + erase({monitoring, QPid}); + false -> ok + end, + ok + end. + handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed_qm = UQM}) -> - MsgSeqNos = case gb_trees:lookup(QPid, UQM) of + MsgSeqNos = case gb_trees:lookup(QPid, UQM) of {value, MsgSet} -> gb_sets:to_list(MsgSet); none -> [] end, @@ -1307,8 +1332,7 @@ limit_queues(Limiter, #ch{consumer_mapping = Consumers}) -> consumer_queues(Consumers) -> lists:usort([QPid || - {_Key, {#amqqueue{pid = QPid}, _MRef}} - <- dict:to_list(Consumers)]). + {_Key, #amqqueue{pid = QPid}} <- dict:to_list(Consumers)]). %% tell the limiter about the number of acks that have been received %% for messages delivered to subscribed consumers, but not acks for @@ -1356,12 +1380,12 @@ process_routing_result(routed, QPids, XName, MsgSeqNo, _, State) -> SingletonSet = gb_sets:singleton(MsgSeqNo), UQM1 = lists:foldl( fun (QPid, UQM2) -> - maybe_monitor(QPid), case gb_trees:lookup(QPid, UQM2) of {value, MsgSeqNos} -> MsgSeqNos1 = gb_sets:insert(MsgSeqNo, MsgSeqNos), gb_trees:update(QPid, MsgSeqNos1, UQM2); none -> + monitor_queue(QPid), gb_trees:insert(QPid, SingletonSet, UQM2) end end, UQM, QPids), @@ -1479,21 +1503,14 @@ maybe_incr_stats(QXIncs, Measure, #ch{stats_timer = StatsTimer}) -> end. incr_stats({QPid, _} = QX, Inc, Measure) -> - maybe_monitor(QPid), + monitor_queue(QPid), update_measures(queue_exchange_stats, QX, Inc, Measure); incr_stats(QPid, Inc, Measure) when is_pid(QPid) -> - maybe_monitor(QPid), + monitor_queue(QPid), update_measures(queue_stats, QPid, Inc, Measure); incr_stats(X, Inc, Measure) -> update_measures(exchange_stats, X, Inc, Measure). -maybe_monitor(QPid) -> - case get({monitoring, QPid}) of - undefined -> erlang:monitor(process, QPid), - put({monitoring, QPid}, true); - _ -> ok - end. - update_measures(Type, QX, Inc, Measure) -> Measures = case get({Type, QX}) of undefined -> []; |