diff options
author | Alexandru Scvortov <alexandru@rabbitmq.com> | 2011-09-13 12:11:00 +0100 |
---|---|---|
committer | Alexandru Scvortov <alexandru@rabbitmq.com> | 2011-09-13 12:11:00 +0100 |
commit | b1626fa03a6c7c6be73bccdcccd83f133f1b5c2d (patch) | |
tree | 607df03b173667a0a4cbe78b9e81a1f27119d750 | |
parent | 7a44f77d4435564f11158ca48ab578edc2c5ac7d (diff) | |
download | rabbitmq-server-b1626fa03a6c7c6be73bccdcccd83f133f1b5c2d.tar.gz |
merge stats and confirm monitors
-rw-r--r-- | src/rabbit_channel.erl | 59 |
1 files changed, 35 insertions, 24 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 4acda156..fde44c4d 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -552,20 +552,20 @@ process_confirms(MsgSeqNos, QPid, Nack, State = #ch{unconfirmed_mq = UMQ, fun(MsgSeqNo, {_MXs, UMQ0, _UQM} = Acc) -> case gb_trees:lookup(MsgSeqNo, UMQ0) of {value, XQ} -> remove_unconfirmed(MsgSeqNo, QPid, XQ, - Acc, Nack); + Acc, Nack, State); none -> Acc end end, {[], UMQ, UQM}, MsgSeqNos), {MXs, State#ch{unconfirmed_mq = UMQ1, unconfirmed_qm = UQM1}}. -remove_unconfirmed(MsgSeqNo, QPid, {XName, Qs}, {MXs, UMQ, UQM}, Nack) -> +remove_unconfirmed(MsgSeqNo, QPid, {XName, Qs}, {MXs, UMQ, UQM}, Nack, State) -> UQM1 = case gb_trees:lookup(QPid, UQM) of - {value, {MRef, MsgSeqNos}} -> + {value, MsgSeqNos} -> MsgSeqNos1 = gb_sets:delete(MsgSeqNo, MsgSeqNos), case gb_sets:is_empty(MsgSeqNos1) of - true -> erlang:demonitor(MRef), + true -> maybe_demonitor(QPid, State), gb_trees:delete(QPid, UQM); - false -> gb_trees:update(QPid, {MRef, MsgSeqNos1}, UQM) + false -> gb_trees:update(QPid, MsgSeqNos1, UQM) end; none -> UQM @@ -1121,6 +1121,30 @@ handle_method(_MethodRecord, _Content, _State) -> %%---------------------------------------------------------------------------- +maybe_monitor(QPid) -> + case get({monitoring, QPid}) of + undefined -> MRef = erlang:monitor(process, QPid), + put({monitoring, QPid}, MRef), + ok; + _ -> ok + end. + +confirm_maybe_monitor(QPid, UQM) -> + case gb_trees:is_defined(QPid, UQM) of + true -> ok; + false -> maybe_monitor(QPid) + end. + +maybe_demonitor(QPid, #ch{stats_timer = StatsTimer}) -> + case get({monitoring, QPid}) of + undefined -> ok; + MRef -> StatsEnabled = rabbit_event:stats_level(StatsTimer) =:= fine, + if not StatsEnabled -> true = erlang:demonitor(MRef); + true -> ok + end, + ok + end. + monitor_consumer(ConsumerTag, State = #ch{consumer_mapping = ConsumerMapping, consumer_monitors = ConsumerMonitors, capabilities = Capabilities}) -> @@ -1138,16 +1162,10 @@ monitor_consumer(ConsumerTag, State = #ch{consumer_mapping = ConsumerMapping, State end. -monitor_confirm_queue(QPid, UQM) -> - case gb_trees:is_defined(QPid, UQM) of - true -> undefined; - false -> erlang:monitor(process, QPid) - end. - handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed_qm = UQM}) -> MsgSeqNos = case gb_trees:lookup(QPid, UQM) of - {value, {_MRef, MsgSet}} -> gb_sets:to_list(MsgSet); - none -> [] + {value, MsgSet} -> gb_sets:to_list(MsgSet); + none -> [] end, %% We remove the MsgSeqNos from UQM before calling %% process_confirms to prevent each MsgSeqNo being removed from @@ -1364,12 +1382,12 @@ process_routing_result(routed, QPids, XName, MsgSeqNo, _, State) -> UQM1 = lists:foldl( fun (QPid, UQM2) -> case gb_trees:lookup(QPid, UQM2) of - {value, {MRef, MsgSeqNos}} -> + {value, MsgSeqNos} -> MsgSeqNos1 = gb_sets:insert(MsgSeqNo, MsgSeqNos), - gb_trees:update(QPid, {MRef, MsgSeqNos1}, UQM2); + gb_trees:update(QPid, MsgSeqNos1, UQM2); none -> - MRef = monitor_confirm_queue(QPid, UQM2), - gb_trees:insert(QPid, {MRef, SingletonSet}, UQM2) + ok = confirm_maybe_monitor(QPid, UQM2), + gb_trees:insert(QPid, SingletonSet, UQM2) end end, UQM, QPids), State#ch{unconfirmed_mq = UMQ1, unconfirmed_qm = UQM1}. @@ -1494,13 +1512,6 @@ incr_stats(QPid, Inc, Measure) when is_pid(QPid) -> incr_stats(X, Inc, Measure) -> update_measures(exchange_stats, X, Inc, Measure). -maybe_monitor(QPid) -> - case get({monitoring, QPid}) of - undefined -> erlang:monitor(process, QPid), - put({monitoring, QPid}, true); - _ -> ok - end. - update_measures(Type, QX, Inc, Measure) -> Measures = case get({Type, QX}) of undefined -> []; |