diff options
author | Matthew Sackman <matthew@rabbitmq.com> | 2011-02-17 17:01:03 +0000 |
---|---|---|
committer | Matthew Sackman <matthew@rabbitmq.com> | 2011-02-17 17:01:03 +0000 |
commit | 1bafead0212d17e41198121a83ed44ea1bd506b8 (patch) | |
tree | 332d5ecf0914f1a3bbc5d4730dc63e07069d7bbe | |
parent | f3139c0e04b8056f8cc152aa9795ad522c90e882 (diff) | |
download | rabbitmq-server-1bafead0212d17e41198121a83ed44ea1bd506b8.tar.gz |
Maybe monitor queues on consume, maybe unmonitor on cancel
-rw-r--r-- | src/rabbit_channel.erl | 65 |
1 files changed, 44 insertions, 21 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index a6790b6c..346ec371 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -33,9 +33,9 @@ start_limiter_fun, transaction_id, tx_participants, next_tag, uncommitted_ack_q, unacked_message_q, user, virtual_host, most_recently_declared_queue, - consumer_mapping, blocking, queue_collector_pid, stats_timer, - confirm_enabled, publish_seqno, unconfirmed, confirmed, - capabilities}). + consumer_mapping, blocking, consumer_monitors, queue_collector_pid, + stats_timer, confirm_enabled, publish_seqno, unconfirmed, + confirmed, capabilities}). -define(MAX_PERMISSION_CACHE_SIZE, 12). @@ -171,6 +171,7 @@ init([Channel, ReaderPid, WriterPid, User, VHost, Capabilities, CollectorPid, most_recently_declared_queue = <<>>, consumer_mapping = dict:new(), blocking = dict:new(), + consumer_monitors = dict:new(), queue_collector_pid = CollectorPid, stats_timer = StatsTimer, confirm_enabled = false, @@ -646,9 +647,11 @@ handle_method(#'basic.consume'{queue = QueueNameBin, no_ack = NoAck, exclusive = ExclusiveConsume, nowait = NoWait}, - _, State = #ch{reader_pid = ReaderPid, - limiter_pid = LimiterPid, - consumer_mapping = ConsumerMapping }) -> + _, State = #ch{reader_pid = ReaderPid, + limiter_pid = LimiterPid, + consumer_mapping = ConsumerMapping, + consumer_monitors = ConsumerMonitors, + capabilities = Capabilities}) -> case dict:find(ConsumerTag, ConsumerMapping) of error -> QueueName = expand_queue_name_shortcut(QueueNameBin, State), @@ -665,18 +668,31 @@ handle_method(#'basic.consume'{queue = QueueNameBin, case rabbit_amqqueue:with_exclusive_access_or_die( QueueName, ReaderPid, fun (Q) -> - rabbit_amqqueue:basic_consume( - Q, NoAck, self(), LimiterPid, - ActualConsumerTag, ExclusiveConsume, - ok_msg(NoWait, #'basic.consume_ok'{ - consumer_tag = ActualConsumerTag})) + {rabbit_amqqueue:basic_consume( + Q, NoAck, self(), LimiterPid, + ActualConsumerTag, ExclusiveConsume, + ok_msg(NoWait, #'basic.consume_ok'{ + consumer_tag = ActualConsumerTag})), + Q#amqqueue.pid} end) of - ok -> + {ok, QPid} -> + {ConsumerMonitors1, MRef} = + case rabbit_misc:table_lookup( + Capabilities, + <<"consumer_death_notification">>) of + {bool, true} -> + MRef1 = erlang:monitor(process, QPid), + {dict:store(MRef1, ActualConsumerTag, + ConsumerMonitors), MRef1}; + _ -> + {ConsumerMonitors, undefined} + end, {noreply, State#ch{consumer_mapping = dict:store(ActualConsumerTag, - QueueName, - ConsumerMapping)}}; - {error, exclusive_consume_unavailable} -> + {QueueName, MRef}, + ConsumerMapping), + consumer_monitors = ConsumerMonitors1}}; + {{error, exclusive_consume_unavailable}, _QPid} -> rabbit_misc:protocol_error( access_refused, "~s in exclusive use", [rabbit_misc:rs(QueueName)]) @@ -689,16 +705,23 @@ handle_method(#'basic.consume'{queue = QueueNameBin, handle_method(#'basic.cancel'{consumer_tag = ConsumerTag, nowait = NoWait}, - _, State = #ch{consumer_mapping = ConsumerMapping }) -> + _, State = #ch{consumer_mapping = ConsumerMapping, + consumer_monitors = ConsumerMonitors}) -> OkMsg = #'basic.cancel_ok'{consumer_tag = ConsumerTag}, case dict:find(ConsumerTag, ConsumerMapping) of error -> %% Spec requires we ignore this situation. return_ok(State, NoWait, OkMsg); - {ok, QueueName} -> - NewState = State#ch{consumer_mapping = - dict:erase(ConsumerTag, - ConsumerMapping)}, + {ok, {QueueName, MRef}} -> + ConsumerMonitors1 = + case MRef of + undefined -> ConsumerMonitors; + _ -> true = erlang:demonitor(MRef), + dict:erase(MRef, ConsumerMonitors) + end, + NewState = State#ch{consumer_mapping = dict:erase(ConsumerTag, + ConsumerMapping), + consumer_monitors = ConsumerMonitors1}, case rabbit_amqqueue:with( QueueName, fun (Q) -> @@ -1208,7 +1231,7 @@ limit_queues(LPid, #ch{consumer_mapping = Consumers}) -> consumer_queues(Consumers) -> [QPid || QueueName <- sets:to_list( - dict:fold(fun (_ConsumerTag, QueueName, S) -> + dict:fold(fun (_ConsumerTag, {QueueName, _MRef}, S) -> sets:add_element(QueueName, S) end, sets:new(), Consumers)), case rabbit_amqqueue:lookup(QueueName) of |