diff options
Diffstat (limited to 'src/rabbit_channel.erl')
-rw-r--r-- | src/rabbit_channel.erl | 142 |
1 files changed, 70 insertions, 72 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index f4434ade..1d91494b 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -284,20 +284,15 @@ terminating(Reason, State = #ch{channel = Channel, reader_pid = Reader}) -> Reader ! {channel_exit, Channel, Reason}, State#ch{state = terminating}. -return_queue_declare_ok(State, NoWait, Q) -> - NewState = State#ch{most_recently_declared_queue = - (Q#amqqueue.name)#resource.name}, +return_queue_declare_ok(#resource{name = ActualName}, + NoWait, MessageCount, ConsumerCount, State) -> + NewState = State#ch{most_recently_declared_queue = ActualName}, case NoWait of true -> {noreply, NewState}; - false -> - {ok, ActualName, MessageCount, ConsumerCount} = - rabbit_misc:with_exit_handler( - fun () -> {ok, Q#amqqueue.name, 0, 0} end, - fun () -> rabbit_amqqueue:stat(Q) end), - Reply = #'queue.declare_ok'{queue = ActualName#resource.name, - message_count = MessageCount, - consumer_count = ConsumerCount}, - {reply, Reply, NewState} + false -> Reply = #'queue.declare_ok'{queue = ActualName, + message_count = MessageCount, + consumer_count = ConsumerCount}, + {reply, Reply, NewState} end. check_resource_access(Username, Resource, Perm) -> @@ -344,7 +339,7 @@ with_exclusive_access_or_die(QName, ReaderPid, F) -> expand_queue_name_shortcut(<<>>, #ch{ most_recently_declared_queue = <<>> }) -> rabbit_misc:protocol_error( - not_allowed, "no previously declared queue", []); + not_found, "no previously declared queue", []); expand_queue_name_shortcut(<<>>, #ch{ virtual_host = VHostPath, most_recently_declared_queue = MRDQ }) -> rabbit_misc:r(VHostPath, queue, MRDQ); @@ -354,7 +349,7 @@ expand_queue_name_shortcut(QueueNameBin, #ch{ virtual_host = VHostPath }) -> expand_routing_key_shortcut(<<>>, <<>>, #ch{ most_recently_declared_queue = <<>> }) -> rabbit_misc:protocol_error( - not_allowed, "no previously declared queue", []); + not_found, "no previously declared queue", []); expand_routing_key_shortcut(<<>>, <<>>, #ch{ most_recently_declared_queue = MRDQ }) -> MRDQ; @@ -447,13 +442,9 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, routed -> ok; unroutable -> - %% FIXME: 312 should be replaced by the ?NO_ROUTE - %% definition, when we move to >=0-9 - ok = basic_return(Message, WriterPid, 312, <<"unroutable">>); + ok = basic_return(Message, WriterPid, no_route); not_delivered -> - %% FIXME: 313 should be replaced by the ?NO_CONSUMERS - %% definition, when we move to >=0-9 - ok = basic_return(Message, WriterPid, 313, <<"not_delivered">>) + ok = basic_return(Message, WriterPid, no_consumers) end, {noreply, case TxnKey of none -> State; @@ -608,9 +599,8 @@ handle_method(#'basic.qos'{prefetch_count = PrefetchCount}, end, {reply, #'basic.qos_ok'{}, State#ch{limiter_pid = LimiterPid2}}; -handle_method(#'basic.recover'{requeue = true}, - _, State = #ch{ transaction_id = none, - unacked_message_q = UAMQ }) -> +handle_method(#'basic.recover_async'{requeue = true}, + _, State = #ch{ unacked_message_q = UAMQ }) -> ok = fold_per_queue( fun (QPid, MsgIds, ok) -> %% The Qpid python test suite incorrectly assumes @@ -620,12 +610,12 @@ handle_method(#'basic.recover'{requeue = true}, rabbit_amqqueue:requeue( QPid, lists:reverse(MsgIds), self()) end, ok, UAMQ), - %% No answer required, apparently! + %% No answer required - basic.recover is the newer, synchronous + %% variant of this method {noreply, State#ch{unacked_message_q = queue:new()}}; -handle_method(#'basic.recover'{requeue = false}, - _, State = #ch{ transaction_id = none, - writer_pid = WriterPid, +handle_method(#'basic.recover_async'{requeue = false}, + _, State = #ch{ writer_pid = WriterPid, unacked_message_q = UAMQ }) -> ok = rabbit_misc:queue_fold( fun ({_DeliveryTag, none, _Msg}, ok) -> @@ -645,12 +635,17 @@ handle_method(#'basic.recover'{requeue = false}, WriterPid, false, ConsumerTag, DeliveryTag, {QName, QPid, MsgId, true, Message}) end, ok, UAMQ), - %% No answer required, apparently! + %% No answer required - basic.recover is the newer, synchronous + %% variant of this method {noreply, State}; -handle_method(#'basic.recover'{}, _, _State) -> - rabbit_misc:protocol_error( - not_allowed, "attempt to recover a transactional channel",[]); +handle_method(#'basic.recover'{requeue = Requeue}, Content, State) -> + {noreply, State2 = #ch{writer_pid = WriterPid}} = + handle_method(#'basic.recover_async'{requeue = Requeue}, + Content, + State), + ok = rabbit_writer:send_command(WriterPid, #'basic.recover_ok'{}), + {noreply, State2}; handle_method(#'exchange.declare'{exchange = ExchangeNameBin, type = TypeNameBin, @@ -716,7 +711,7 @@ handle_method(#'queue.declare'{queue = QueueNameBin, exclusive = ExclusiveDeclare, auto_delete = AutoDelete, nowait = NoWait, - arguments = Args}, + arguments = Args} = Declare, _, State = #ch{virtual_host = VHostPath, reader_pid = ReaderPid, queue_collector_pid = CollectorPid}) -> @@ -724,46 +719,43 @@ handle_method(#'queue.declare'{queue = QueueNameBin, true -> ReaderPid; false -> none end, - %% We use this in both branches, because queue_declare may yet return an - %% existing queue. - Finish = fun (#amqqueue{name = QueueName, - durable = Durable1, - auto_delete = AutoDelete1} = Q) - when Durable =:= Durable1, AutoDelete =:= AutoDelete1 -> - check_exclusive_access(Q, Owner, strict), - check_configure_permitted(QueueName, State), - %% We need to notify the reader within the channel - %% process so that we can be sure there are no - %% outstanding exclusive queues being declared as the - %% connection shuts down. - case Owner of - none -> ok; - _ -> ok = rabbit_reader_queue_collector:register_exclusive_queue(CollectorPid, Q) - end, - Q; - %% non-equivalence trumps exclusivity arbitrarily - (#amqqueue{name = QueueName}) -> - rabbit_misc:protocol_error( - precondition_failed, - "parameters for ~s not equivalent", - [rabbit_misc:rs(QueueName)]) - end, - Q = case rabbit_amqqueue:with( - rabbit_misc:r(VHostPath, queue, QueueNameBin), - Finish) of - {error, not_found} -> - ActualNameBin = - case QueueNameBin of + ActualNameBin = case QueueNameBin of <<>> -> rabbit_guid:binstring_guid("amq.gen"); Other -> check_name('queue', Other) end, - QueueName = rabbit_misc:r(VHostPath, queue, ActualNameBin), - Finish(rabbit_amqqueue:declare(QueueName, Durable, AutoDelete, - Args, Owner)); - #amqqueue{} = Other -> - Other - end, - return_queue_declare_ok(State, NoWait, Q); + QueueName = rabbit_misc:r(VHostPath, queue, ActualNameBin), + check_configure_permitted(QueueName, State), + case rabbit_amqqueue:with(QueueName, + fun (Q) -> {rabbit_amqqueue:stat(Q), Q} end) of + {{ok, QueueName, MessageCount, ConsumerCount}, + #amqqueue{durable = Durable1, auto_delete = AutoDelete1} = Q} + when Durable =:= Durable1, AutoDelete =:= AutoDelete1 -> + check_exclusive_access(Q, Owner, strict), + return_queue_declare_ok(QueueName, NoWait, MessageCount, + ConsumerCount, State); + {{ok, QueueName, _MessageCount, _ConsumerCount}, #amqqueue{}} -> + rabbit_misc:protocol_error( + precondition_failed, "parameters for ~s not equivalent", + [rabbit_misc:rs(QueueName)]); + {error, not_found} -> + case rabbit_amqqueue:declare(QueueName, Durable, AutoDelete, + Args, Owner) of + {new, Q = #amqqueue{}} -> + %% We need to notify the reader within the channel + %% process so that we can be sure there are no + %% outstanding exclusive queues being declared as + %% the connection shuts down. + ok = case Owner of + none -> ok; + _ -> rabbit_reader_queue_collector:register_exclusive_queue(CollectorPid, Q) + end, + return_queue_declare_ok(QueueName, NoWait, 0, 0, State); + {existing, _Q} -> + %% must have been created between the stat and the + %% declare. Loop around again. + handle_method(Declare, none, State) + end + end; handle_method(#'queue.declare'{queue = QueueNameBin, passive = true, @@ -772,8 +764,12 @@ handle_method(#'queue.declare'{queue = QueueNameBin, reader_pid = ReaderPid}) -> QueueName = rabbit_misc:r(VHostPath, queue, QueueNameBin), check_configure_permitted(QueueName, State), - Q = with_exclusive_access_or_die(QueueName, ReaderPid, fun (Q) -> Q end), - return_queue_declare_ok(State, NoWait, Q); + {{ok, QueueName, MessageCount, ConsumerCount}, #amqqueue{} = Q} = + rabbit_amqqueue:with_or_die( + QueueName, fun (Q) -> {rabbit_amqqueue:stat(Q), Q} end), + check_exclusive_access(Q, ReaderPid, lax), + return_queue_declare_ok(QueueName, NoWait, MessageCount, ConsumerCount, + State); handle_method(#'queue.delete'{queue = QueueNameBin, if_unused = IfUnused, @@ -946,7 +942,9 @@ binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments, basic_return(#basic_message{exchange_name = ExchangeName, routing_key = RoutingKey, content = Content}, - WriterPid, ReplyCode, ReplyText) -> + WriterPid, Reason) -> + {_Close, ReplyCode, ReplyText} = + rabbit_framing:lookup_amqp_exception(Reason), ok = rabbit_writer:send_command( WriterPid, #'basic.return'{reply_code = ReplyCode, |