summaryrefslogtreecommitdiff
path: root/src/rabbit_channel.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/rabbit_channel.erl')
-rw-r--r--src/rabbit_channel.erl142
1 files changed, 70 insertions, 72 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index f4434ade..1d91494b 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -284,20 +284,15 @@ terminating(Reason, State = #ch{channel = Channel, reader_pid = Reader}) ->
Reader ! {channel_exit, Channel, Reason},
State#ch{state = terminating}.
-return_queue_declare_ok(State, NoWait, Q) ->
- NewState = State#ch{most_recently_declared_queue =
- (Q#amqqueue.name)#resource.name},
+return_queue_declare_ok(#resource{name = ActualName},
+ NoWait, MessageCount, ConsumerCount, State) ->
+ NewState = State#ch{most_recently_declared_queue = ActualName},
case NoWait of
true -> {noreply, NewState};
- false ->
- {ok, ActualName, MessageCount, ConsumerCount} =
- rabbit_misc:with_exit_handler(
- fun () -> {ok, Q#amqqueue.name, 0, 0} end,
- fun () -> rabbit_amqqueue:stat(Q) end),
- Reply = #'queue.declare_ok'{queue = ActualName#resource.name,
- message_count = MessageCount,
- consumer_count = ConsumerCount},
- {reply, Reply, NewState}
+ false -> Reply = #'queue.declare_ok'{queue = ActualName,
+ message_count = MessageCount,
+ consumer_count = ConsumerCount},
+ {reply, Reply, NewState}
end.
check_resource_access(Username, Resource, Perm) ->
@@ -344,7 +339,7 @@ with_exclusive_access_or_die(QName, ReaderPid, F) ->
expand_queue_name_shortcut(<<>>, #ch{ most_recently_declared_queue = <<>> }) ->
rabbit_misc:protocol_error(
- not_allowed, "no previously declared queue", []);
+ not_found, "no previously declared queue", []);
expand_queue_name_shortcut(<<>>, #ch{ virtual_host = VHostPath,
most_recently_declared_queue = MRDQ }) ->
rabbit_misc:r(VHostPath, queue, MRDQ);
@@ -354,7 +349,7 @@ expand_queue_name_shortcut(QueueNameBin, #ch{ virtual_host = VHostPath }) ->
expand_routing_key_shortcut(<<>>, <<>>,
#ch{ most_recently_declared_queue = <<>> }) ->
rabbit_misc:protocol_error(
- not_allowed, "no previously declared queue", []);
+ not_found, "no previously declared queue", []);
expand_routing_key_shortcut(<<>>, <<>>,
#ch{ most_recently_declared_queue = MRDQ }) ->
MRDQ;
@@ -447,13 +442,9 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
routed ->
ok;
unroutable ->
- %% FIXME: 312 should be replaced by the ?NO_ROUTE
- %% definition, when we move to >=0-9
- ok = basic_return(Message, WriterPid, 312, <<"unroutable">>);
+ ok = basic_return(Message, WriterPid, no_route);
not_delivered ->
- %% FIXME: 313 should be replaced by the ?NO_CONSUMERS
- %% definition, when we move to >=0-9
- ok = basic_return(Message, WriterPid, 313, <<"not_delivered">>)
+ ok = basic_return(Message, WriterPid, no_consumers)
end,
{noreply, case TxnKey of
none -> State;
@@ -608,9 +599,8 @@ handle_method(#'basic.qos'{prefetch_count = PrefetchCount},
end,
{reply, #'basic.qos_ok'{}, State#ch{limiter_pid = LimiterPid2}};
-handle_method(#'basic.recover'{requeue = true},
- _, State = #ch{ transaction_id = none,
- unacked_message_q = UAMQ }) ->
+handle_method(#'basic.recover_async'{requeue = true},
+ _, State = #ch{ unacked_message_q = UAMQ }) ->
ok = fold_per_queue(
fun (QPid, MsgIds, ok) ->
%% The Qpid python test suite incorrectly assumes
@@ -620,12 +610,12 @@ handle_method(#'basic.recover'{requeue = true},
rabbit_amqqueue:requeue(
QPid, lists:reverse(MsgIds), self())
end, ok, UAMQ),
- %% No answer required, apparently!
+ %% No answer required - basic.recover is the newer, synchronous
+ %% variant of this method
{noreply, State#ch{unacked_message_q = queue:new()}};
-handle_method(#'basic.recover'{requeue = false},
- _, State = #ch{ transaction_id = none,
- writer_pid = WriterPid,
+handle_method(#'basic.recover_async'{requeue = false},
+ _, State = #ch{ writer_pid = WriterPid,
unacked_message_q = UAMQ }) ->
ok = rabbit_misc:queue_fold(
fun ({_DeliveryTag, none, _Msg}, ok) ->
@@ -645,12 +635,17 @@ handle_method(#'basic.recover'{requeue = false},
WriterPid, false, ConsumerTag, DeliveryTag,
{QName, QPid, MsgId, true, Message})
end, ok, UAMQ),
- %% No answer required, apparently!
+ %% No answer required - basic.recover is the newer, synchronous
+ %% variant of this method
{noreply, State};
-handle_method(#'basic.recover'{}, _, _State) ->
- rabbit_misc:protocol_error(
- not_allowed, "attempt to recover a transactional channel",[]);
+handle_method(#'basic.recover'{requeue = Requeue}, Content, State) ->
+ {noreply, State2 = #ch{writer_pid = WriterPid}} =
+ handle_method(#'basic.recover_async'{requeue = Requeue},
+ Content,
+ State),
+ ok = rabbit_writer:send_command(WriterPid, #'basic.recover_ok'{}),
+ {noreply, State2};
handle_method(#'exchange.declare'{exchange = ExchangeNameBin,
type = TypeNameBin,
@@ -716,7 +711,7 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
exclusive = ExclusiveDeclare,
auto_delete = AutoDelete,
nowait = NoWait,
- arguments = Args},
+ arguments = Args} = Declare,
_, State = #ch{virtual_host = VHostPath,
reader_pid = ReaderPid,
queue_collector_pid = CollectorPid}) ->
@@ -724,46 +719,43 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
true -> ReaderPid;
false -> none
end,
- %% We use this in both branches, because queue_declare may yet return an
- %% existing queue.
- Finish = fun (#amqqueue{name = QueueName,
- durable = Durable1,
- auto_delete = AutoDelete1} = Q)
- when Durable =:= Durable1, AutoDelete =:= AutoDelete1 ->
- check_exclusive_access(Q, Owner, strict),
- check_configure_permitted(QueueName, State),
- %% We need to notify the reader within the channel
- %% process so that we can be sure there are no
- %% outstanding exclusive queues being declared as the
- %% connection shuts down.
- case Owner of
- none -> ok;
- _ -> ok = rabbit_reader_queue_collector:register_exclusive_queue(CollectorPid, Q)
- end,
- Q;
- %% non-equivalence trumps exclusivity arbitrarily
- (#amqqueue{name = QueueName}) ->
- rabbit_misc:protocol_error(
- precondition_failed,
- "parameters for ~s not equivalent",
- [rabbit_misc:rs(QueueName)])
- end,
- Q = case rabbit_amqqueue:with(
- rabbit_misc:r(VHostPath, queue, QueueNameBin),
- Finish) of
- {error, not_found} ->
- ActualNameBin =
- case QueueNameBin of
+ ActualNameBin = case QueueNameBin of
<<>> -> rabbit_guid:binstring_guid("amq.gen");
Other -> check_name('queue', Other)
end,
- QueueName = rabbit_misc:r(VHostPath, queue, ActualNameBin),
- Finish(rabbit_amqqueue:declare(QueueName, Durable, AutoDelete,
- Args, Owner));
- #amqqueue{} = Other ->
- Other
- end,
- return_queue_declare_ok(State, NoWait, Q);
+ QueueName = rabbit_misc:r(VHostPath, queue, ActualNameBin),
+ check_configure_permitted(QueueName, State),
+ case rabbit_amqqueue:with(QueueName,
+ fun (Q) -> {rabbit_amqqueue:stat(Q), Q} end) of
+ {{ok, QueueName, MessageCount, ConsumerCount},
+ #amqqueue{durable = Durable1, auto_delete = AutoDelete1} = Q}
+ when Durable =:= Durable1, AutoDelete =:= AutoDelete1 ->
+ check_exclusive_access(Q, Owner, strict),
+ return_queue_declare_ok(QueueName, NoWait, MessageCount,
+ ConsumerCount, State);
+ {{ok, QueueName, _MessageCount, _ConsumerCount}, #amqqueue{}} ->
+ rabbit_misc:protocol_error(
+ precondition_failed, "parameters for ~s not equivalent",
+ [rabbit_misc:rs(QueueName)]);
+ {error, not_found} ->
+ case rabbit_amqqueue:declare(QueueName, Durable, AutoDelete,
+ Args, Owner) of
+ {new, Q = #amqqueue{}} ->
+ %% We need to notify the reader within the channel
+ %% process so that we can be sure there are no
+ %% outstanding exclusive queues being declared as
+ %% the connection shuts down.
+ ok = case Owner of
+ none -> ok;
+ _ -> rabbit_reader_queue_collector:register_exclusive_queue(CollectorPid, Q)
+ end,
+ return_queue_declare_ok(QueueName, NoWait, 0, 0, State);
+ {existing, _Q} ->
+ %% must have been created between the stat and the
+ %% declare. Loop around again.
+ handle_method(Declare, none, State)
+ end
+ end;
handle_method(#'queue.declare'{queue = QueueNameBin,
passive = true,
@@ -772,8 +764,12 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
reader_pid = ReaderPid}) ->
QueueName = rabbit_misc:r(VHostPath, queue, QueueNameBin),
check_configure_permitted(QueueName, State),
- Q = with_exclusive_access_or_die(QueueName, ReaderPid, fun (Q) -> Q end),
- return_queue_declare_ok(State, NoWait, Q);
+ {{ok, QueueName, MessageCount, ConsumerCount}, #amqqueue{} = Q} =
+ rabbit_amqqueue:with_or_die(
+ QueueName, fun (Q) -> {rabbit_amqqueue:stat(Q), Q} end),
+ check_exclusive_access(Q, ReaderPid, lax),
+ return_queue_declare_ok(QueueName, NoWait, MessageCount, ConsumerCount,
+ State);
handle_method(#'queue.delete'{queue = QueueNameBin,
if_unused = IfUnused,
@@ -946,7 +942,9 @@ binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments,
basic_return(#basic_message{exchange_name = ExchangeName,
routing_key = RoutingKey,
content = Content},
- WriterPid, ReplyCode, ReplyText) ->
+ WriterPid, Reason) ->
+ {_Close, ReplyCode, ReplyText} =
+ rabbit_framing:lookup_amqp_exception(Reason),
ok = rabbit_writer:send_command(
WriterPid,
#'basic.return'{reply_code = ReplyCode,