summaryrefslogtreecommitdiff
path: root/src/rabbit_channel.erl
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2010-05-27 14:44:22 +0100
committerMatthew Sackman <matthew@rabbitmq.com>2010-05-27 14:44:22 +0100
commit4e19085635ef3abe3c269af2b86b19a71720f99b (patch)
tree408c392857053fabcc6d7051aa5191fcb15c0d1d /src/rabbit_channel.erl
parentaa89b97aaa40a9a495e5979373f6f02537380d44 (diff)
parentbb58efaff44c4bb39f00eb1f591b8baa979ee2cd (diff)
downloadrabbitmq-server-4e19085635ef3abe3c269af2b86b19a71720f99b.tar.gz
Merging default into bug 21824
Diffstat (limited to 'src/rabbit_channel.erl')
-rw-r--r--src/rabbit_channel.erl35
1 files changed, 23 insertions, 12 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index f23b6d9c..66326396 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -35,7 +35,7 @@
-behaviour(gen_server2).
--export([start_link/5, do/2, do/3, shutdown/1]).
+-export([start_link/6, do/2, do/3, shutdown/1]).
-export([send_command/2, deliver/4, conserve_memory/2, flushed/2]).
-export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]).
@@ -46,7 +46,7 @@
transaction_id, tx_participants, next_tag,
uncommitted_ack_q, unacked_message_q,
username, virtual_host, most_recently_declared_queue,
- consumer_mapping, blocking}).
+ consumer_mapping, blocking, queue_collector_pid}).
-define(MAX_PERMISSION_CACHE_SIZE, 12).
@@ -66,8 +66,8 @@
-ifdef(use_specs).
--spec(start_link/5 ::
- (channel_number(), pid(), pid(), username(), vhost()) -> pid()).
+-spec(start_link/6 ::
+ (channel_number(), pid(), pid(), username(), vhost(), pid()) -> pid()).
-spec(do/2 :: (pid(), amqp_method()) -> 'ok').
-spec(do/3 :: (pid(), amqp_method(), maybe(content())) -> 'ok').
-spec(shutdown/1 :: (pid()) -> 'ok').
@@ -86,10 +86,10 @@
%%----------------------------------------------------------------------------
-start_link(Channel, ReaderPid, WriterPid, Username, VHost) ->
+start_link(Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid) ->
{ok, Pid} = gen_server2:start_link(
?MODULE, [Channel, ReaderPid, WriterPid,
- Username, VHost], []),
+ Username, VHost, CollectorPid], []),
Pid.
do(Pid, Method) ->
@@ -135,7 +135,7 @@ info_all(Items) ->
%%---------------------------------------------------------------------------
-init([Channel, ReaderPid, WriterPid, Username, VHost]) ->
+init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid]) ->
process_flag(trap_exit, true),
link(WriterPid),
ok = pg_local:join(rabbit_channels, self()),
@@ -153,7 +153,8 @@ init([Channel, ReaderPid, WriterPid, Username, VHost]) ->
virtual_host = VHost,
most_recently_declared_queue = <<>>,
consumer_mapping = dict:new(),
- blocking = dict:new()},
+ blocking = dict:new(),
+ queue_collector_pid = CollectorPid},
hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
@@ -691,10 +692,11 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
durable = Durable,
exclusive = ExclusiveDeclare,
auto_delete = AutoDelete,
- nowait = NoWait,
- arguments = Args},
- _, State = #ch{virtual_host = VHostPath,
- reader_pid = ReaderPid}) ->
+ nowait = NoWait,
+ arguments = Args},
+ _, State = #ch{virtual_host = VHostPath,
+ reader_pid = ReaderPid,
+ queue_collector_pid = CollectorPid}) ->
Owner = case ExclusiveDeclare of
true -> ReaderPid;
false -> none
@@ -710,6 +712,15 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
%% semantically equivalant.
#amqqueue{exclusive_owner = Owner} ->
check_configure_permitted(QueueName, State),
+ %% We need to notify the reader within the channel
+ %% process so that we can be sure there are no
+ %% outstanding exclusive queues being declared as the
+ %% connection shuts down.
+ case Owner of
+ none -> ok;
+ _ -> ok = rabbit_reader_queue_collector:register_exclusive_queue(
+ CollectorPid, Q)
+ end,
Q;
%% exclusivity trumps non-equivalence arbitrarily
#amqqueue{} ->