summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlexandru Scvortov <alexandru@rabbitmq.com>2011-09-15 14:49:48 +0100
committerAlexandru Scvortov <alexandru@rabbitmq.com>2011-09-15 14:49:48 +0100
commit4d5c11c51bbc8d456c26878ace2b06cdece40248 (patch)
tree2a543792c0e9faed682d70d9c2931e019d6aec98
parent3a6381d2d1c79542f61d96b3c86d20b691bff514 (diff)
downloadrabbitmq-server-4d5c11c51bbc8d456c26878ace2b06cdece40248.tar.gz
rename consumer_monitors to queue_consumers
-rw-r--r--src/rabbit_channel.erl61
1 files changed, 32 insertions, 29 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 2da6605e..9b5d7ec0 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -36,7 +36,7 @@
limiter, tx_status, next_tag,
unacked_message_q, uncommitted_message_q, uncommitted_ack_q,
user, virtual_host, most_recently_declared_queue,
- consumer_mapping, blocking, consumer_monitors, queue_collector_pid,
+ consumer_mapping, blocking, queue_consumers, queue_collector_pid,
stats_timer, confirm_enabled, publish_seqno, unconfirmed_mq,
unconfirmed_qm, confirmed, capabilities, trace_state,
queue_monitors}).
@@ -192,7 +192,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, Protocol, User, VHost,
most_recently_declared_queue = <<>>,
consumer_mapping = dict:new(),
blocking = gb_sets:new(),
- consumer_monitors = dict:new(),
+ queue_consumers = dict:new(),
queue_collector_pid = CollectorPid,
stats_timer = StatsTimer,
confirm_enabled = false,
@@ -770,8 +770,8 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
handle_method(#'basic.cancel'{consumer_tag = ConsumerTag,
nowait = NoWait},
- _, State = #ch{consumer_mapping = ConsumerMapping,
- consumer_monitors = ConsumerMonitors}) ->
+ _, State = #ch{consumer_mapping = ConsumerMapping,
+ queue_consumers = QueueConsumers}) ->
OkMsg = #'basic.cancel_ok'{consumer_tag = ConsumerTag},
case dict:find(ConsumerTag, ConsumerMapping) of
error ->
@@ -779,16 +779,19 @@ handle_method(#'basic.cancel'{consumer_tag = ConsumerTag,
return_ok(State, NoWait, OkMsg);
{ok, Q = #amqqueue{pid = QPid}} ->
ConsumerMapping1 = dict:erase(ConsumerTag, ConsumerMapping),
- ConsumerMonitors1 =
- case dict:find(QPid, ConsumerMonitors) of
- error -> ConsumerMonitors;
+ QueueConsumers1 =
+ case dict:find(QPid, QueueConsumers) of
+ error -> QueueConsumers;
{ok, CTags} -> case gb_sets:size(CTags) of
- 1 -> dict:erase(QPid, ConsumerMonitors);
- _ -> dict:store(QPid, gb_sets:delete(ConsumerTag, CTags), ConsumerMonitors)
+ 1 -> dict:erase(QPid, QueueConsumers);
+ _ -> dict:store(QPid,
+ gb_sets:delete(ConsumerTag,
+ CTags),
+ QueueConsumers)
end
end,
- State1 = State#ch{consumer_mapping = ConsumerMapping1,
- consumer_monitors = ConsumerMonitors1},
+ State1 = State#ch{consumer_mapping = ConsumerMapping1,
+ queue_consumers = QueueConsumers1},
State2 = demonitor_queue(Q, State1),
%% In order to ensure that no more messages are sent to
%% the consumer after the cancel_ok has been sent, we get
@@ -1144,19 +1147,19 @@ monitor_queue(QPid, State = #ch{queue_monitors = QueueMonitors}) ->
end.
consumer_monitor(ConsumerTag,
- State = #ch{consumer_mapping = ConsumerMapping,
- consumer_monitors = ConsumerMonitors,
- capabilities = Capabilities}) ->
+ State = #ch{consumer_mapping = ConsumerMapping,
+ queue_consumers = QueueConsumers,
+ capabilities = Capabilities}) ->
case rabbit_misc:table_lookup(
Capabilities, <<"consumer_cancel_notify">>) of
{bool, true} ->
#amqqueue{pid = QPid} = dict:fetch(ConsumerTag, ConsumerMapping),
- ConsumerMonitors1 =
+ QueueConsumers1 =
dict:update(QPid,
fun (CTags) -> gb_sets:insert(ConsumerTag, CTags) end,
gb_sets:singleton(ConsumerTag),
- ConsumerMonitors),
- monitor_queue(QPid, State#ch{consumer_monitors = ConsumerMonitors1});
+ QueueConsumers),
+ monitor_queue(QPid, State#ch{queue_consumers = QueueConsumers1});
_ ->
State
end.
@@ -1172,12 +1175,12 @@ demonitor_queue(QPid, State = #ch{queue_monitors = QueueMonitors}) ->
end
end.
-queue_monitor_needed(QPid, #ch{stats_timer = StatsTimer,
- consumer_monitors = ConsumerMonitors,
- blocking = Blocking,
- unconfirmed_qm = UQM}) ->
+queue_monitor_needed(QPid, #ch{stats_timer = StatsTimer,
+ queue_consumers = QueueConsumers,
+ blocking = Blocking,
+ unconfirmed_qm = UQM}) ->
StatsEnabled = rabbit_event:stats_level(StatsTimer) =:= fine,
- ConsumerMonitored = dict:is_key(QPid, ConsumerMonitors),
+ ConsumerMonitored = dict:is_key(QPid, QueueConsumers),
QueueBlocked = gb_sets:is_element(QPid, Blocking),
ConfirmMonitored = gb_trees:is_defined(QPid, UQM),
StatsEnabled or ConsumerMonitored or QueueBlocked or ConfirmMonitored.
@@ -1205,14 +1208,14 @@ handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed_qm = UQM}) ->
SendFun(MXs, State2).
handle_consuming_queue_down(QPid,
- State = #ch{consumer_mapping = ConsumerMapping,
- consumer_monitors = ConsumerMonitors,
- writer_pid = WriterPid}) ->
- ConsumerTags = case dict:find(QPid, ConsumerMonitors) of
+ State = #ch{consumer_mapping = ConsumerMapping,
+ queue_consumers = QueueConsumers,
+ writer_pid = WriterPid}) ->
+ ConsumerTags = case dict:find(QPid, QueueConsumers) of
error -> gb_sets:new();
{ok, CTags} -> CTags
end,
- ConsumerMonitors1 = dict:erase(QPid, ConsumerMonitors),
+ QueueConsumers1 = dict:erase(QPid, QueueConsumers),
ConsumerMapping1 = gb_sets:fold(fun (CTag, CMap) -> dict:erase(CTag, CMap) end,
ConsumerMapping, ConsumerTags),
[begin
@@ -1220,8 +1223,8 @@ handle_consuming_queue_down(QPid,
nowait = true},
ok = rabbit_writer:send_command(WriterPid, Cancel)
end || ConsumerTag <- gb_sets:to_list(ConsumerTags)],
- State#ch{consumer_mapping = ConsumerMapping1,
- consumer_monitors = ConsumerMonitors1}.
+ State#ch{consumer_mapping = ConsumerMapping1,
+ queue_consumers = QueueConsumers1}.
binding_action(Fun, ExchangeNameBin, DestinationType, DestinationNameBin,
RoutingKey, Arguments, ReturnMethod, NoWait,