summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlexandru Scvortov <alexandru@rabbitmq.com>2011-09-13 13:35:32 +0100
committerAlexandru Scvortov <alexandru@rabbitmq.com>2011-09-13 13:35:32 +0100
commitead26e578cc05d20f8de272b8dfff23439eeb836 (patch)
treee934b34cd5da74de341e6c5cce5688e9f22b9591
parentb1626fa03a6c7c6be73bccdcccd83f133f1b5c2d (diff)
downloadrabbitmq-server-ead26e578cc05d20f8de272b8dfff23439eeb836.tar.gz
merge consumer and (confirms, stats) monitors
-rw-r--r--src/rabbit_channel.erl74
1 files changed, 37 insertions, 37 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index fde44c4d..bcbb78d4 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -275,7 +275,7 @@ handle_cast(terminate, State) ->
handle_cast({command, #'basic.consume_ok'{consumer_tag = ConsumerTag} = Msg},
State = #ch{writer_pid = WriterPid}) ->
ok = rabbit_writer:send_command(WriterPid, Msg),
- noreply(monitor_consumer(ConsumerTag, State));
+ noreply(consumer_maybe_monitor(ConsumerTag, State));
handle_cast({command, Msg}, State = #ch{writer_pid = WriterPid}) ->
ok = rabbit_writer:send_command(WriterPid, Msg),
@@ -563,8 +563,10 @@ remove_unconfirmed(MsgSeqNo, QPid, {XName, Qs}, {MXs, UMQ, UQM}, Nack, State) ->
{value, MsgSeqNos} ->
MsgSeqNos1 = gb_sets:delete(MsgSeqNo, MsgSeqNos),
case gb_sets:is_empty(MsgSeqNos1) of
- true -> maybe_demonitor(QPid, State),
- gb_trees:delete(QPid, UQM);
+ true -> UQM2 = gb_trees:delete(QPid, UQM),
+ maybe_demonitor(QPid,
+ State#ch{unconfirmed_qm = UQM2}),
+ UQM2;
false -> gb_trees:update(QPid, MsgSeqNos1, UQM)
end;
none ->
@@ -747,12 +749,11 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
end) of
{ok, Q} ->
State1 = State#ch{consumer_mapping =
- dict:store(ActualConsumerTag,
- {Q, undefined},
+ dict:store(ActualConsumerTag, Q,
ConsumerMapping)},
{noreply,
case NoWait of
- true -> monitor_consumer(ActualConsumerTag, State1);
+ true -> consumer_maybe_monitor(ActualConsumerTag, State1);
false -> State1
end};
{{error, exclusive_consume_unavailable}, _Q} ->
@@ -775,16 +776,15 @@ handle_method(#'basic.cancel'{consumer_tag = ConsumerTag,
error ->
%% Spec requires we ignore this situation.
return_ok(State, NoWait, OkMsg);
- {ok, {Q, MRef}} ->
- ConsumerMonitors1 =
- case MRef of
- undefined -> ConsumerMonitors;
- _ -> true = erlang:demonitor(MRef),
- dict:erase(MRef, ConsumerMonitors)
- end,
+ {ok, Q = #amqqueue{pid = QPid}} ->
+ ConsumerMonitors1 = case get({monitoring, QPid}) of
+ undefined -> ConsumerMonitors;
+ MRef -> dict:erase(MRef, ConsumerMonitors)
+ end,
NewState = State#ch{consumer_mapping = dict:erase(ConsumerTag,
ConsumerMapping),
consumer_monitors = ConsumerMonitors1},
+ ok = maybe_demonitor(Q, NewState),
%% In order to ensure that no more messages are sent to
%% the consumer after the cancel_ok has been sent, we get
%% the queue process to send the cancel_ok on our
@@ -1125,43 +1125,44 @@ maybe_monitor(QPid) ->
case get({monitoring, QPid}) of
undefined -> MRef = erlang:monitor(process, QPid),
put({monitoring, QPid}, MRef),
- ok;
- _ -> ok
+ MRef;
+ MRef -> MRef
end.
confirm_maybe_monitor(QPid, UQM) ->
case gb_trees:is_defined(QPid, UQM) of
true -> ok;
- false -> maybe_monitor(QPid)
- end.
-
-maybe_demonitor(QPid, #ch{stats_timer = StatsTimer}) ->
- case get({monitoring, QPid}) of
- undefined -> ok;
- MRef -> StatsEnabled = rabbit_event:stats_level(StatsTimer) =:= fine,
- if not StatsEnabled -> true = erlang:demonitor(MRef);
- true -> ok
- end,
- ok
+ false -> maybe_monitor(QPid),
+ ok
end.
-monitor_consumer(ConsumerTag, State = #ch{consumer_mapping = ConsumerMapping,
- consumer_monitors = ConsumerMonitors,
- capabilities = Capabilities}) ->
+consumer_maybe_monitor(ConsumerTag, State = #ch{consumer_mapping = ConsumerMapping,
+ consumer_monitors = ConsumerMonitors,
+ capabilities = Capabilities}) ->
case rabbit_misc:table_lookup(
Capabilities, <<"consumer_cancel_notify">>) of
{bool, true} ->
- {#amqqueue{pid = QPid} = Q, undefined} =
- dict:fetch(ConsumerTag, ConsumerMapping),
- MRef = erlang:monitor(process, QPid),
- State#ch{consumer_mapping =
- dict:store(ConsumerTag, {Q, MRef}, ConsumerMapping),
- consumer_monitors =
+ #amqqueue{pid = QPid} = dict:fetch(ConsumerTag, ConsumerMapping),
+ MRef = maybe_monitor(QPid),
+ State#ch{consumer_monitors =
dict:store(MRef, ConsumerTag, ConsumerMonitors)};
_ ->
State
end.
+maybe_demonitor(QPid, #ch{stats_timer = StatsTimer,
+ consumer_monitors = ConsumerMonitors}) ->
+ case get({monitoring, QPid}) of
+ undefined -> ok;
+ MRef -> StatsEnabled = rabbit_event:stats_level(StatsTimer) =:= fine,
+ ConsumerMonitored = dict:find(MRef, ConsumerMonitors) =/= error,
+ case not StatsEnabled and not ConsumerMonitored of
+ true -> true = erlang:demonitor(MRef);
+ false -> ok
+ end,
+ ok
+ end.
+
handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed_qm = UQM}) ->
MsgSeqNos = case gb_trees:lookup(QPid, UQM) of
{value, MsgSet} -> gb_sets:to_list(MsgSet);
@@ -1332,8 +1333,7 @@ limit_queues(Limiter, #ch{consumer_mapping = Consumers}) ->
consumer_queues(Consumers) ->
lists:usort([QPid ||
- {_Key, {#amqqueue{pid = QPid}, _MRef}}
- <- dict:to_list(Consumers)]).
+ {_Key, #amqqueue{pid = QPid}} <- dict:to_list(Consumers)]).
%% tell the limiter about the number of acks that have been received
%% for messages delivered to subscribed consumers, but not acks for