diff options
author | Matthew Sackman <matthew@rabbitmq.com> | 2010-05-27 14:44:22 +0100 |
---|---|---|
committer | Matthew Sackman <matthew@rabbitmq.com> | 2010-05-27 14:44:22 +0100 |
commit | 4e19085635ef3abe3c269af2b86b19a71720f99b (patch) | |
tree | 408c392857053fabcc6d7051aa5191fcb15c0d1d /src/rabbit_channel.erl | |
parent | aa89b97aaa40a9a495e5979373f6f02537380d44 (diff) | |
parent | bb58efaff44c4bb39f00eb1f591b8baa979ee2cd (diff) | |
download | rabbitmq-server-4e19085635ef3abe3c269af2b86b19a71720f99b.tar.gz |
Merging default into bug 21824
Diffstat (limited to 'src/rabbit_channel.erl')
-rw-r--r-- | src/rabbit_channel.erl | 35 |
1 files changed, 23 insertions, 12 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index f23b6d9c..66326396 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -35,7 +35,7 @@ -behaviour(gen_server2). --export([start_link/5, do/2, do/3, shutdown/1]). +-export([start_link/6, do/2, do/3, shutdown/1]). -export([send_command/2, deliver/4, conserve_memory/2, flushed/2]). -export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]). @@ -46,7 +46,7 @@ transaction_id, tx_participants, next_tag, uncommitted_ack_q, unacked_message_q, username, virtual_host, most_recently_declared_queue, - consumer_mapping, blocking}). + consumer_mapping, blocking, queue_collector_pid}). -define(MAX_PERMISSION_CACHE_SIZE, 12). @@ -66,8 +66,8 @@ -ifdef(use_specs). --spec(start_link/5 :: - (channel_number(), pid(), pid(), username(), vhost()) -> pid()). +-spec(start_link/6 :: + (channel_number(), pid(), pid(), username(), vhost(), pid()) -> pid()). -spec(do/2 :: (pid(), amqp_method()) -> 'ok'). -spec(do/3 :: (pid(), amqp_method(), maybe(content())) -> 'ok'). -spec(shutdown/1 :: (pid()) -> 'ok'). @@ -86,10 +86,10 @@ %%---------------------------------------------------------------------------- -start_link(Channel, ReaderPid, WriterPid, Username, VHost) -> +start_link(Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid) -> {ok, Pid} = gen_server2:start_link( ?MODULE, [Channel, ReaderPid, WriterPid, - Username, VHost], []), + Username, VHost, CollectorPid], []), Pid. do(Pid, Method) -> @@ -135,7 +135,7 @@ info_all(Items) -> %%--------------------------------------------------------------------------- -init([Channel, ReaderPid, WriterPid, Username, VHost]) -> +init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid]) -> process_flag(trap_exit, true), link(WriterPid), ok = pg_local:join(rabbit_channels, self()), @@ -153,7 +153,8 @@ init([Channel, ReaderPid, WriterPid, Username, VHost]) -> virtual_host = VHost, most_recently_declared_queue = <<>>, consumer_mapping = dict:new(), - blocking = dict:new()}, + blocking = dict:new(), + queue_collector_pid = CollectorPid}, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. @@ -691,10 +692,11 @@ handle_method(#'queue.declare'{queue = QueueNameBin, durable = Durable, exclusive = ExclusiveDeclare, auto_delete = AutoDelete, - nowait = NoWait, - arguments = Args}, - _, State = #ch{virtual_host = VHostPath, - reader_pid = ReaderPid}) -> + nowait = NoWait, + arguments = Args}, + _, State = #ch{virtual_host = VHostPath, + reader_pid = ReaderPid, + queue_collector_pid = CollectorPid}) -> Owner = case ExclusiveDeclare of true -> ReaderPid; false -> none @@ -710,6 +712,15 @@ handle_method(#'queue.declare'{queue = QueueNameBin, %% semantically equivalant. #amqqueue{exclusive_owner = Owner} -> check_configure_permitted(QueueName, State), + %% We need to notify the reader within the channel + %% process so that we can be sure there are no + %% outstanding exclusive queues being declared as the + %% connection shuts down. + case Owner of + none -> ok; + _ -> ok = rabbit_reader_queue_collector:register_exclusive_queue( + CollectorPid, Q) + end, Q; %% exclusivity trumps non-equivalence arbitrarily #amqqueue{} -> |