From fe76518a13ba6e8e41eb0d5d5d69155d24384d25 Mon Sep 17 00:00:00 2001 From: Matthew Sackman Date: Wed, 30 Jun 2010 12:35:37 +0100 Subject: Refactoring and cosmetics --- src/rabbit_channel.erl | 70 +++++++++++++++++++++++--------------------------- 1 file changed, 32 insertions(+), 38 deletions(-) diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 703a0690..179a9a9d 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -284,17 +284,8 @@ terminating(Reason, State = #ch{channel = Channel, reader_pid = Reader}) -> Reader ! {channel_exit, Channel, Reason}, State#ch{state = terminating}. -return_queue_declare_ok(Q = #amqqueue{name = #resource{name = ActualName}}, - NoWait, MessageCount, ConsumerCount, Register, Owner, - State = #ch{queue_collector_pid = CollectorPid}) -> - %% We need to notify the reader within the channel process so that - %% we can be sure there are no outstanding exclusive queues being - %% declared as the connection shuts down. - ok = case Register andalso is_pid(Owner) of - true -> rabbit_reader_queue_collector:register_exclusive_queue( - CollectorPid, Q); - false -> ok - end, +return_queue_declare_ok(#resource{name = ActualName}, + NoWait, MessageCount, ConsumerCount, State) -> NewState = State#ch{most_recently_declared_queue = ActualName}, case NoWait of true -> {noreply, NewState}; @@ -714,21 +705,20 @@ handle_method(#'exchange.delete'{exchange = ExchangeNameBin, return_ok(State, NoWait, #'exchange.delete_ok'{}) end; -handle_method(#'queue.declare'{queue = QueueNameBin, - passive = false, - durable = Durable, - exclusive = ExclusiveDeclare, - auto_delete = AutoDelete, - nowait = NoWait, - arguments = Args} = Declare, +handle_method(Declare = #'queue.declare'{queue = QueueNameBin, + passive = false, + durable = Durable, + exclusive = ExclusiveDeclare, + auto_delete = AutoDelete, + nowait = NoWait, + arguments = Args}, _, State = #ch{virtual_host = VHostPath, - reader_pid = ReaderPid}) -> + reader_pid = ReaderPid, + queue_collector_pid = CollectorPid}) -> Owner = case ExclusiveDeclare of true -> ReaderPid; false -> none end, - %% We use this in both branches, because queue_declare may yet return an - %% existing queue. ActualNameBin = case QueueNameBin of <<>> -> rabbit_guid:binstring_guid("amq.gen"); Other -> check_name('queue', Other) @@ -737,25 +727,30 @@ handle_method(#'queue.declare'{queue = QueueNameBin, check_configure_permitted(QueueName, State), case rabbit_amqqueue:with(QueueName, fun (Q) -> {rabbit_amqqueue:stat(Q), Q} end) of - {{ok, _ActualName, MessageCount, ConsumerCount}, - #amqqueue{name = QueueName, durable = Durable1, - auto_delete = AutoDelete1} = Q1} + {{ok, QueueName, MessageCount, ConsumerCount}, + #amqqueue{durable = Durable1, auto_delete = AutoDelete1} = Q} when Durable =:= Durable1, AutoDelete =:= AutoDelete1 -> - check_exclusive_access(Q1, Owner, strict), - return_queue_declare_ok(Q1, NoWait, MessageCount, ConsumerCount, - false, Owner, State); - {{ok, _ActualName, _MessageCount, _ConsumerCount}, - #amqqueue{name = QueueName}} -> + check_exclusive_access(Q, Owner, strict), + return_queue_declare_ok(QueueName, NoWait, MessageCount, + ConsumerCount, State); + {{ok, QueueName, _MessageCount, _ConsumerCount}, #amqqueue{}} -> rabbit_misc:protocol_error( precondition_failed, "parameters for ~s not equivalent", [rabbit_misc:rs(QueueName)]); {error, not_found} -> case rabbit_amqqueue:declare(QueueName, Durable, AutoDelete, Args, Owner) of - {new, Q1 = #amqqueue{}} -> - return_queue_declare_ok(Q1, NoWait, 0, 0, true, Owner, - State); - {existing, _Q1} -> + {new, Q = #amqqueue{}} -> + %% We need to notify the reader within the channel + %% process so that we can be sure there are no + %% outstanding exclusive queues being declared as + %% the connection shuts down. + ok = case Owner of + none -> ok; + _ -> rabbit_reader_queue_collector:register_exclusive_queue(CollectorPid, Q) + end, + return_queue_declare_ok(QueueName, NoWait, 0, 0, State); + {existing, _Q} -> %% must have been created between the stat and the %% declare. Loop around again. handle_method(Declare, undefined, State) @@ -769,13 +764,12 @@ handle_method(#'queue.declare'{queue = QueueNameBin, reader_pid = ReaderPid}) -> QueueName = rabbit_misc:r(VHostPath, queue, QueueNameBin), check_configure_permitted(QueueName, State), - {{ok, _ActualName, MessageCount, ConsumerCount}, - #amqqueue{name = QueueName} = Q} = + {{ok, QueueName, MessageCount, ConsumerCount}, #amqqueue{} = Q} = rabbit_amqqueue:with_or_die( - QueueName, fun (Q1) -> {rabbit_amqqueue:stat(Q1), Q1} end), + QueueName, fun (Q) -> {rabbit_amqqueue:stat(Q), Q} end), check_exclusive_access(Q, ReaderPid, lax), - return_queue_declare_ok(Q, NoWait, MessageCount, ConsumerCount, - false, none, State); + return_queue_declare_ok(QueueName, NoWait, MessageCount, ConsumerCount, + State); handle_method(#'queue.delete'{queue = QueueNameBin, if_unused = IfUnused, -- cgit v1.2.1