diff options
author | Alexandru Scvortov <alexandru@rabbitmq.com> | 2011-09-15 14:49:48 +0100 |
---|---|---|
committer | Alexandru Scvortov <alexandru@rabbitmq.com> | 2011-09-15 14:49:48 +0100 |
commit | 4d5c11c51bbc8d456c26878ace2b06cdece40248 (patch) | |
tree | 2a543792c0e9faed682d70d9c2931e019d6aec98 | |
parent | 3a6381d2d1c79542f61d96b3c86d20b691bff514 (diff) | |
download | rabbitmq-server-4d5c11c51bbc8d456c26878ace2b06cdece40248.tar.gz |
rename consumer_monitors to queue_consumers
-rw-r--r-- | src/rabbit_channel.erl | 61 |
1 files changed, 32 insertions, 29 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 2da6605e..9b5d7ec0 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -36,7 +36,7 @@ limiter, tx_status, next_tag, unacked_message_q, uncommitted_message_q, uncommitted_ack_q, user, virtual_host, most_recently_declared_queue, - consumer_mapping, blocking, consumer_monitors, queue_collector_pid, + consumer_mapping, blocking, queue_consumers, queue_collector_pid, stats_timer, confirm_enabled, publish_seqno, unconfirmed_mq, unconfirmed_qm, confirmed, capabilities, trace_state, queue_monitors}). @@ -192,7 +192,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, Protocol, User, VHost, most_recently_declared_queue = <<>>, consumer_mapping = dict:new(), blocking = gb_sets:new(), - consumer_monitors = dict:new(), + queue_consumers = dict:new(), queue_collector_pid = CollectorPid, stats_timer = StatsTimer, confirm_enabled = false, @@ -770,8 +770,8 @@ handle_method(#'basic.consume'{queue = QueueNameBin, handle_method(#'basic.cancel'{consumer_tag = ConsumerTag, nowait = NoWait}, - _, State = #ch{consumer_mapping = ConsumerMapping, - consumer_monitors = ConsumerMonitors}) -> + _, State = #ch{consumer_mapping = ConsumerMapping, + queue_consumers = QueueConsumers}) -> OkMsg = #'basic.cancel_ok'{consumer_tag = ConsumerTag}, case dict:find(ConsumerTag, ConsumerMapping) of error -> @@ -779,16 +779,19 @@ handle_method(#'basic.cancel'{consumer_tag = ConsumerTag, return_ok(State, NoWait, OkMsg); {ok, Q = #amqqueue{pid = QPid}} -> ConsumerMapping1 = dict:erase(ConsumerTag, ConsumerMapping), - ConsumerMonitors1 = - case dict:find(QPid, ConsumerMonitors) of - error -> ConsumerMonitors; + QueueConsumers1 = + case dict:find(QPid, QueueConsumers) of + error -> QueueConsumers; {ok, CTags} -> case gb_sets:size(CTags) of - 1 -> dict:erase(QPid, ConsumerMonitors); - _ -> dict:store(QPid, gb_sets:delete(ConsumerTag, CTags), ConsumerMonitors) + 1 -> dict:erase(QPid, QueueConsumers); + _ -> dict:store(QPid, + gb_sets:delete(ConsumerTag, + CTags), + QueueConsumers) end end, - State1 = State#ch{consumer_mapping = ConsumerMapping1, - consumer_monitors = ConsumerMonitors1}, + State1 = State#ch{consumer_mapping = ConsumerMapping1, + queue_consumers = QueueConsumers1}, State2 = demonitor_queue(Q, State1), %% In order to ensure that no more messages are sent to %% the consumer after the cancel_ok has been sent, we get @@ -1144,19 +1147,19 @@ monitor_queue(QPid, State = #ch{queue_monitors = QueueMonitors}) -> end. consumer_monitor(ConsumerTag, - State = #ch{consumer_mapping = ConsumerMapping, - consumer_monitors = ConsumerMonitors, - capabilities = Capabilities}) -> + State = #ch{consumer_mapping = ConsumerMapping, + queue_consumers = QueueConsumers, + capabilities = Capabilities}) -> case rabbit_misc:table_lookup( Capabilities, <<"consumer_cancel_notify">>) of {bool, true} -> #amqqueue{pid = QPid} = dict:fetch(ConsumerTag, ConsumerMapping), - ConsumerMonitors1 = + QueueConsumers1 = dict:update(QPid, fun (CTags) -> gb_sets:insert(ConsumerTag, CTags) end, gb_sets:singleton(ConsumerTag), - ConsumerMonitors), - monitor_queue(QPid, State#ch{consumer_monitors = ConsumerMonitors1}); + QueueConsumers), + monitor_queue(QPid, State#ch{queue_consumers = QueueConsumers1}); _ -> State end. @@ -1172,12 +1175,12 @@ demonitor_queue(QPid, State = #ch{queue_monitors = QueueMonitors}) -> end end. -queue_monitor_needed(QPid, #ch{stats_timer = StatsTimer, - consumer_monitors = ConsumerMonitors, - blocking = Blocking, - unconfirmed_qm = UQM}) -> +queue_monitor_needed(QPid, #ch{stats_timer = StatsTimer, + queue_consumers = QueueConsumers, + blocking = Blocking, + unconfirmed_qm = UQM}) -> StatsEnabled = rabbit_event:stats_level(StatsTimer) =:= fine, - ConsumerMonitored = dict:is_key(QPid, ConsumerMonitors), + ConsumerMonitored = dict:is_key(QPid, QueueConsumers), QueueBlocked = gb_sets:is_element(QPid, Blocking), ConfirmMonitored = gb_trees:is_defined(QPid, UQM), StatsEnabled or ConsumerMonitored or QueueBlocked or ConfirmMonitored. @@ -1205,14 +1208,14 @@ handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed_qm = UQM}) -> SendFun(MXs, State2). handle_consuming_queue_down(QPid, - State = #ch{consumer_mapping = ConsumerMapping, - consumer_monitors = ConsumerMonitors, - writer_pid = WriterPid}) -> - ConsumerTags = case dict:find(QPid, ConsumerMonitors) of + State = #ch{consumer_mapping = ConsumerMapping, + queue_consumers = QueueConsumers, + writer_pid = WriterPid}) -> + ConsumerTags = case dict:find(QPid, QueueConsumers) of error -> gb_sets:new(); {ok, CTags} -> CTags end, - ConsumerMonitors1 = dict:erase(QPid, ConsumerMonitors), + QueueConsumers1 = dict:erase(QPid, QueueConsumers), ConsumerMapping1 = gb_sets:fold(fun (CTag, CMap) -> dict:erase(CTag, CMap) end, ConsumerMapping, ConsumerTags), [begin @@ -1220,8 +1223,8 @@ handle_consuming_queue_down(QPid, nowait = true}, ok = rabbit_writer:send_command(WriterPid, Cancel) end || ConsumerTag <- gb_sets:to_list(ConsumerTags)], - State#ch{consumer_mapping = ConsumerMapping1, - consumer_monitors = ConsumerMonitors1}. + State#ch{consumer_mapping = ConsumerMapping1, + queue_consumers = QueueConsumers1}. binding_action(Fun, ExchangeNameBin, DestinationType, DestinationNameBin, RoutingKey, Arguments, ReturnMethod, NoWait, |