summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlexandru Scvortov <alexandru@rabbitmq.com>2011-09-13 12:11:00 +0100
committerAlexandru Scvortov <alexandru@rabbitmq.com>2011-09-13 12:11:00 +0100
commitb1626fa03a6c7c6be73bccdcccd83f133f1b5c2d (patch)
tree607df03b173667a0a4cbe78b9e81a1f27119d750
parent7a44f77d4435564f11158ca48ab578edc2c5ac7d (diff)
downloadrabbitmq-server-b1626fa03a6c7c6be73bccdcccd83f133f1b5c2d.tar.gz
merge stats and confirm monitors
-rw-r--r--src/rabbit_channel.erl59
1 files changed, 35 insertions, 24 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 4acda156..fde44c4d 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -552,20 +552,20 @@ process_confirms(MsgSeqNos, QPid, Nack, State = #ch{unconfirmed_mq = UMQ,
fun(MsgSeqNo, {_MXs, UMQ0, _UQM} = Acc) ->
case gb_trees:lookup(MsgSeqNo, UMQ0) of
{value, XQ} -> remove_unconfirmed(MsgSeqNo, QPid, XQ,
- Acc, Nack);
+ Acc, Nack, State);
none -> Acc
end
end, {[], UMQ, UQM}, MsgSeqNos),
{MXs, State#ch{unconfirmed_mq = UMQ1, unconfirmed_qm = UQM1}}.
-remove_unconfirmed(MsgSeqNo, QPid, {XName, Qs}, {MXs, UMQ, UQM}, Nack) ->
+remove_unconfirmed(MsgSeqNo, QPid, {XName, Qs}, {MXs, UMQ, UQM}, Nack, State) ->
UQM1 = case gb_trees:lookup(QPid, UQM) of
- {value, {MRef, MsgSeqNos}} ->
+ {value, MsgSeqNos} ->
MsgSeqNos1 = gb_sets:delete(MsgSeqNo, MsgSeqNos),
case gb_sets:is_empty(MsgSeqNos1) of
- true -> erlang:demonitor(MRef),
+ true -> maybe_demonitor(QPid, State),
gb_trees:delete(QPid, UQM);
- false -> gb_trees:update(QPid, {MRef, MsgSeqNos1}, UQM)
+ false -> gb_trees:update(QPid, MsgSeqNos1, UQM)
end;
none ->
UQM
@@ -1121,6 +1121,30 @@ handle_method(_MethodRecord, _Content, _State) ->
%%----------------------------------------------------------------------------
+maybe_monitor(QPid) ->
+ case get({monitoring, QPid}) of
+ undefined -> MRef = erlang:monitor(process, QPid),
+ put({monitoring, QPid}, MRef),
+ ok;
+ _ -> ok
+ end.
+
+confirm_maybe_monitor(QPid, UQM) ->
+ case gb_trees:is_defined(QPid, UQM) of
+ true -> ok;
+ false -> maybe_monitor(QPid)
+ end.
+
+maybe_demonitor(QPid, #ch{stats_timer = StatsTimer}) ->
+ case get({monitoring, QPid}) of
+ undefined -> ok;
+ MRef -> StatsEnabled = rabbit_event:stats_level(StatsTimer) =:= fine,
+ if not StatsEnabled -> true = erlang:demonitor(MRef);
+ true -> ok
+ end,
+ ok
+ end.
+
monitor_consumer(ConsumerTag, State = #ch{consumer_mapping = ConsumerMapping,
consumer_monitors = ConsumerMonitors,
capabilities = Capabilities}) ->
@@ -1138,16 +1162,10 @@ monitor_consumer(ConsumerTag, State = #ch{consumer_mapping = ConsumerMapping,
State
end.
-monitor_confirm_queue(QPid, UQM) ->
- case gb_trees:is_defined(QPid, UQM) of
- true -> undefined;
- false -> erlang:monitor(process, QPid)
- end.
-
handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed_qm = UQM}) ->
MsgSeqNos = case gb_trees:lookup(QPid, UQM) of
- {value, {_MRef, MsgSet}} -> gb_sets:to_list(MsgSet);
- none -> []
+ {value, MsgSet} -> gb_sets:to_list(MsgSet);
+ none -> []
end,
%% We remove the MsgSeqNos from UQM before calling
%% process_confirms to prevent each MsgSeqNo being removed from
@@ -1364,12 +1382,12 @@ process_routing_result(routed, QPids, XName, MsgSeqNo, _, State) ->
UQM1 = lists:foldl(
fun (QPid, UQM2) ->
case gb_trees:lookup(QPid, UQM2) of
- {value, {MRef, MsgSeqNos}} ->
+ {value, MsgSeqNos} ->
MsgSeqNos1 = gb_sets:insert(MsgSeqNo, MsgSeqNos),
- gb_trees:update(QPid, {MRef, MsgSeqNos1}, UQM2);
+ gb_trees:update(QPid, MsgSeqNos1, UQM2);
none ->
- MRef = monitor_confirm_queue(QPid, UQM2),
- gb_trees:insert(QPid, {MRef, SingletonSet}, UQM2)
+ ok = confirm_maybe_monitor(QPid, UQM2),
+ gb_trees:insert(QPid, SingletonSet, UQM2)
end
end, UQM, QPids),
State#ch{unconfirmed_mq = UMQ1, unconfirmed_qm = UQM1}.
@@ -1494,13 +1512,6 @@ incr_stats(QPid, Inc, Measure) when is_pid(QPid) ->
incr_stats(X, Inc, Measure) ->
update_measures(exchange_stats, X, Inc, Measure).
-maybe_monitor(QPid) ->
- case get({monitoring, QPid}) of
- undefined -> erlang:monitor(process, QPid),
- put({monitoring, QPid}, true);
- _ -> ok
- end.
-
update_measures(Type, QX, Inc, Measure) ->
Measures = case get({Type, QX}) of
undefined -> [];