diff options
author | Alexandru Scvortov <alexandru@rabbitmq.com> | 2011-09-13 13:35:32 +0100 |
---|---|---|
committer | Alexandru Scvortov <alexandru@rabbitmq.com> | 2011-09-13 13:35:32 +0100 |
commit | ead26e578cc05d20f8de272b8dfff23439eeb836 (patch) | |
tree | e934b34cd5da74de341e6c5cce5688e9f22b9591 | |
parent | b1626fa03a6c7c6be73bccdcccd83f133f1b5c2d (diff) | |
download | rabbitmq-server-ead26e578cc05d20f8de272b8dfff23439eeb836.tar.gz |
merge consumer and (confirms, stats) monitors
-rw-r--r-- | src/rabbit_channel.erl | 74 |
1 files changed, 37 insertions, 37 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index fde44c4d..bcbb78d4 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -275,7 +275,7 @@ handle_cast(terminate, State) -> handle_cast({command, #'basic.consume_ok'{consumer_tag = ConsumerTag} = Msg}, State = #ch{writer_pid = WriterPid}) -> ok = rabbit_writer:send_command(WriterPid, Msg), - noreply(monitor_consumer(ConsumerTag, State)); + noreply(consumer_maybe_monitor(ConsumerTag, State)); handle_cast({command, Msg}, State = #ch{writer_pid = WriterPid}) -> ok = rabbit_writer:send_command(WriterPid, Msg), @@ -563,8 +563,10 @@ remove_unconfirmed(MsgSeqNo, QPid, {XName, Qs}, {MXs, UMQ, UQM}, Nack, State) -> {value, MsgSeqNos} -> MsgSeqNos1 = gb_sets:delete(MsgSeqNo, MsgSeqNos), case gb_sets:is_empty(MsgSeqNos1) of - true -> maybe_demonitor(QPid, State), - gb_trees:delete(QPid, UQM); + true -> UQM2 = gb_trees:delete(QPid, UQM), + maybe_demonitor(QPid, + State#ch{unconfirmed_qm = UQM2}), + UQM2; false -> gb_trees:update(QPid, MsgSeqNos1, UQM) end; none -> @@ -747,12 +749,11 @@ handle_method(#'basic.consume'{queue = QueueNameBin, end) of {ok, Q} -> State1 = State#ch{consumer_mapping = - dict:store(ActualConsumerTag, - {Q, undefined}, + dict:store(ActualConsumerTag, Q, ConsumerMapping)}, {noreply, case NoWait of - true -> monitor_consumer(ActualConsumerTag, State1); + true -> consumer_maybe_monitor(ActualConsumerTag, State1); false -> State1 end}; {{error, exclusive_consume_unavailable}, _Q} -> @@ -775,16 +776,15 @@ handle_method(#'basic.cancel'{consumer_tag = ConsumerTag, error -> %% Spec requires we ignore this situation. return_ok(State, NoWait, OkMsg); - {ok, {Q, MRef}} -> - ConsumerMonitors1 = - case MRef of - undefined -> ConsumerMonitors; - _ -> true = erlang:demonitor(MRef), - dict:erase(MRef, ConsumerMonitors) - end, + {ok, Q = #amqqueue{pid = QPid}} -> + ConsumerMonitors1 = case get({monitoring, QPid}) of + undefined -> ConsumerMonitors; + MRef -> dict:erase(MRef, ConsumerMonitors) + end, NewState = State#ch{consumer_mapping = dict:erase(ConsumerTag, ConsumerMapping), consumer_monitors = ConsumerMonitors1}, + ok = maybe_demonitor(Q, NewState), %% In order to ensure that no more messages are sent to %% the consumer after the cancel_ok has been sent, we get %% the queue process to send the cancel_ok on our @@ -1125,43 +1125,44 @@ maybe_monitor(QPid) -> case get({monitoring, QPid}) of undefined -> MRef = erlang:monitor(process, QPid), put({monitoring, QPid}, MRef), - ok; - _ -> ok + MRef; + MRef -> MRef end. confirm_maybe_monitor(QPid, UQM) -> case gb_trees:is_defined(QPid, UQM) of true -> ok; - false -> maybe_monitor(QPid) - end. - -maybe_demonitor(QPid, #ch{stats_timer = StatsTimer}) -> - case get({monitoring, QPid}) of - undefined -> ok; - MRef -> StatsEnabled = rabbit_event:stats_level(StatsTimer) =:= fine, - if not StatsEnabled -> true = erlang:demonitor(MRef); - true -> ok - end, - ok + false -> maybe_monitor(QPid), + ok end. -monitor_consumer(ConsumerTag, State = #ch{consumer_mapping = ConsumerMapping, - consumer_monitors = ConsumerMonitors, - capabilities = Capabilities}) -> +consumer_maybe_monitor(ConsumerTag, State = #ch{consumer_mapping = ConsumerMapping, + consumer_monitors = ConsumerMonitors, + capabilities = Capabilities}) -> case rabbit_misc:table_lookup( Capabilities, <<"consumer_cancel_notify">>) of {bool, true} -> - {#amqqueue{pid = QPid} = Q, undefined} = - dict:fetch(ConsumerTag, ConsumerMapping), - MRef = erlang:monitor(process, QPid), - State#ch{consumer_mapping = - dict:store(ConsumerTag, {Q, MRef}, ConsumerMapping), - consumer_monitors = + #amqqueue{pid = QPid} = dict:fetch(ConsumerTag, ConsumerMapping), + MRef = maybe_monitor(QPid), + State#ch{consumer_monitors = dict:store(MRef, ConsumerTag, ConsumerMonitors)}; _ -> State end. +maybe_demonitor(QPid, #ch{stats_timer = StatsTimer, + consumer_monitors = ConsumerMonitors}) -> + case get({monitoring, QPid}) of + undefined -> ok; + MRef -> StatsEnabled = rabbit_event:stats_level(StatsTimer) =:= fine, + ConsumerMonitored = dict:find(MRef, ConsumerMonitors) =/= error, + case not StatsEnabled and not ConsumerMonitored of + true -> true = erlang:demonitor(MRef); + false -> ok + end, + ok + end. + handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed_qm = UQM}) -> MsgSeqNos = case gb_trees:lookup(QPid, UQM) of {value, MsgSet} -> gb_sets:to_list(MsgSet); @@ -1332,8 +1333,7 @@ limit_queues(Limiter, #ch{consumer_mapping = Consumers}) -> consumer_queues(Consumers) -> lists:usort([QPid || - {_Key, {#amqqueue{pid = QPid}, _MRef}} - <- dict:to_list(Consumers)]). + {_Key, #amqqueue{pid = QPid}} <- dict:to_list(Consumers)]). %% tell the limiter about the number of acks that have been received %% for messages delivered to subscribed consumers, but not acks for |