summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2011-02-17 17:01:03 +0000
committerMatthew Sackman <matthew@rabbitmq.com>2011-02-17 17:01:03 +0000
commit1bafead0212d17e41198121a83ed44ea1bd506b8 (patch)
tree332d5ecf0914f1a3bbc5d4730dc63e07069d7bbe
parentf3139c0e04b8056f8cc152aa9795ad522c90e882 (diff)
downloadrabbitmq-server-1bafead0212d17e41198121a83ed44ea1bd506b8.tar.gz
Maybe monitor queues on consume, maybe unmonitor on cancel
-rw-r--r--src/rabbit_channel.erl65
1 files changed, 44 insertions, 21 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index a6790b6c..346ec371 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -33,9 +33,9 @@
start_limiter_fun, transaction_id, tx_participants, next_tag,
uncommitted_ack_q, unacked_message_q,
user, virtual_host, most_recently_declared_queue,
- consumer_mapping, blocking, queue_collector_pid, stats_timer,
- confirm_enabled, publish_seqno, unconfirmed, confirmed,
- capabilities}).
+ consumer_mapping, blocking, consumer_monitors, queue_collector_pid,
+ stats_timer, confirm_enabled, publish_seqno, unconfirmed,
+ confirmed, capabilities}).
-define(MAX_PERMISSION_CACHE_SIZE, 12).
@@ -171,6 +171,7 @@ init([Channel, ReaderPid, WriterPid, User, VHost, Capabilities, CollectorPid,
most_recently_declared_queue = <<>>,
consumer_mapping = dict:new(),
blocking = dict:new(),
+ consumer_monitors = dict:new(),
queue_collector_pid = CollectorPid,
stats_timer = StatsTimer,
confirm_enabled = false,
@@ -646,9 +647,11 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
no_ack = NoAck,
exclusive = ExclusiveConsume,
nowait = NoWait},
- _, State = #ch{reader_pid = ReaderPid,
- limiter_pid = LimiterPid,
- consumer_mapping = ConsumerMapping }) ->
+ _, State = #ch{reader_pid = ReaderPid,
+ limiter_pid = LimiterPid,
+ consumer_mapping = ConsumerMapping,
+ consumer_monitors = ConsumerMonitors,
+ capabilities = Capabilities}) ->
case dict:find(ConsumerTag, ConsumerMapping) of
error ->
QueueName = expand_queue_name_shortcut(QueueNameBin, State),
@@ -665,18 +668,31 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
case rabbit_amqqueue:with_exclusive_access_or_die(
QueueName, ReaderPid,
fun (Q) ->
- rabbit_amqqueue:basic_consume(
- Q, NoAck, self(), LimiterPid,
- ActualConsumerTag, ExclusiveConsume,
- ok_msg(NoWait, #'basic.consume_ok'{
- consumer_tag = ActualConsumerTag}))
+ {rabbit_amqqueue:basic_consume(
+ Q, NoAck, self(), LimiterPid,
+ ActualConsumerTag, ExclusiveConsume,
+ ok_msg(NoWait, #'basic.consume_ok'{
+ consumer_tag = ActualConsumerTag})),
+ Q#amqqueue.pid}
end) of
- ok ->
+ {ok, QPid} ->
+ {ConsumerMonitors1, MRef} =
+ case rabbit_misc:table_lookup(
+ Capabilities,
+ <<"consumer_death_notification">>) of
+ {bool, true} ->
+ MRef1 = erlang:monitor(process, QPid),
+ {dict:store(MRef1, ActualConsumerTag,
+ ConsumerMonitors), MRef1};
+ _ ->
+ {ConsumerMonitors, undefined}
+ end,
{noreply, State#ch{consumer_mapping =
dict:store(ActualConsumerTag,
- QueueName,
- ConsumerMapping)}};
- {error, exclusive_consume_unavailable} ->
+ {QueueName, MRef},
+ ConsumerMapping),
+ consumer_monitors = ConsumerMonitors1}};
+ {{error, exclusive_consume_unavailable}, _QPid} ->
rabbit_misc:protocol_error(
access_refused, "~s in exclusive use",
[rabbit_misc:rs(QueueName)])
@@ -689,16 +705,23 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
handle_method(#'basic.cancel'{consumer_tag = ConsumerTag,
nowait = NoWait},
- _, State = #ch{consumer_mapping = ConsumerMapping }) ->
+ _, State = #ch{consumer_mapping = ConsumerMapping,
+ consumer_monitors = ConsumerMonitors}) ->
OkMsg = #'basic.cancel_ok'{consumer_tag = ConsumerTag},
case dict:find(ConsumerTag, ConsumerMapping) of
error ->
%% Spec requires we ignore this situation.
return_ok(State, NoWait, OkMsg);
- {ok, QueueName} ->
- NewState = State#ch{consumer_mapping =
- dict:erase(ConsumerTag,
- ConsumerMapping)},
+ {ok, {QueueName, MRef}} ->
+ ConsumerMonitors1 =
+ case MRef of
+ undefined -> ConsumerMonitors;
+ _ -> true = erlang:demonitor(MRef),
+ dict:erase(MRef, ConsumerMonitors)
+ end,
+ NewState = State#ch{consumer_mapping = dict:erase(ConsumerTag,
+ ConsumerMapping),
+ consumer_monitors = ConsumerMonitors1},
case rabbit_amqqueue:with(
QueueName,
fun (Q) ->
@@ -1208,7 +1231,7 @@ limit_queues(LPid, #ch{consumer_mapping = Consumers}) ->
consumer_queues(Consumers) ->
[QPid || QueueName <-
sets:to_list(
- dict:fold(fun (_ConsumerTag, QueueName, S) ->
+ dict:fold(fun (_ConsumerTag, {QueueName, _MRef}, S) ->
sets:add_element(QueueName, S)
end, sets:new(), Consumers)),
case rabbit_amqqueue:lookup(QueueName) of