summaryrefslogtreecommitdiff
path: root/src/rabbit_channel.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/rabbit_channel.erl')
-rw-r--r--src/rabbit_channel.erl34
1 files changed, 21 insertions, 13 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 6c04f4cd..dc37959b 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -734,7 +734,7 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
no_ack = NoAck,
exclusive = ExclusiveConsume,
nowait = NoWait,
- arguments = Arguments},
+ arguments = Args},
_, State = #ch{conn_pid = ConnPid,
limiter = Limiter,
consumer_mapping = ConsumerMapping}) ->
@@ -755,12 +755,13 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
case rabbit_amqqueue:with_exclusive_access_or_die(
QueueName, ConnPid,
fun (Q) ->
+ {CreditArgs, OtherArgs} = parse_credit_args(Args),
{rabbit_amqqueue:basic_consume(
Q, NoAck, self(),
rabbit_limiter:pid(Limiter),
rabbit_limiter:is_active(Limiter),
ActualConsumerTag, ExclusiveConsume,
- parse_credit_args(Arguments),
+ CreditArgs, OtherArgs,
ok_msg(NoWait, #'basic.consume_ok'{
consumer_tag = ActualConsumerTag})),
Q}
@@ -934,7 +935,7 @@ handle_method(#'exchange.delete'{exchange = ExchangeNameBin,
check_configure_permitted(ExchangeName, State),
case rabbit_exchange:delete(ExchangeName, IfUnused) of
{error, not_found} ->
- rabbit_misc:not_found(ExchangeName);
+ return_ok(State, NoWait, #'exchange.delete_ok'{});
{error, in_use} ->
precondition_failed("~s in use", [rabbit_misc:rs(ExchangeName)]);
ok ->
@@ -961,7 +962,7 @@ handle_method(#'exchange.unbind'{destination = DestinationNameBin,
handle_method(#'queue.declare'{queue = QueueNameBin,
passive = false,
- durable = Durable,
+ durable = DurableDeclare,
exclusive = ExclusiveDeclare,
auto_delete = AutoDelete,
nowait = NoWait,
@@ -973,6 +974,7 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
true -> ConnPid;
false -> none
end,
+ Durable = DurableDeclare andalso not ExclusiveDeclare,
ActualNameBin = case QueueNameBin of
<<>> -> rabbit_guid:binary(rabbit_guid:gen_secure(),
"amq.gen");
@@ -1052,9 +1054,15 @@ handle_method(#'queue.delete'{queue = QueueNameBin,
_, State = #ch{conn_pid = ConnPid}) ->
QueueName = expand_queue_name_shortcut(QueueNameBin, State),
check_configure_permitted(QueueName, State),
- case rabbit_amqqueue:with_exclusive_access_or_die(
- QueueName, ConnPid,
- fun (Q) -> rabbit_amqqueue:delete(Q, IfUnused, IfEmpty) end) of
+ case rabbit_amqqueue:with(
+ QueueName,
+ fun (Q) ->
+ rabbit_amqqueue:check_exclusive_access(Q, ConnPid),
+ rabbit_amqqueue:delete(Q, IfUnused, IfEmpty)
+ end,
+ fun (not_found) -> {ok, 0};
+ ({absent, Q}) -> rabbit_misc:absent(Q)
+ end) of
{error, in_use} ->
precondition_failed("~s in use", [rabbit_misc:rs(QueueName)]);
{error, not_empty} ->
@@ -1246,12 +1254,12 @@ handle_delivering_queue_down(QPid, State = #ch{delivering_queues = DQ}) ->
parse_credit_args(Arguments) ->
case rabbit_misc:table_lookup(Arguments, <<"x-credit">>) of
- {table, T} -> case {rabbit_misc:table_lookup(T, <<"credit">>),
- rabbit_misc:table_lookup(T, <<"drain">>)} of
- {{long, Credit}, {boolean, Drain}} -> {Credit, Drain};
- _ -> none
- end;
- undefined -> none
+ {table, T} -> {case {rabbit_misc:table_lookup(T, <<"credit">>),
+ rabbit_misc:table_lookup(T, <<"drain">>)} of
+ {{long, Credit}, {bool, Drain}} -> {Credit, Drain};
+ _ -> none
+ end, lists:keydelete(<<"x-credit">>, 1, Arguments)};
+ undefined -> {none, Arguments}
end.
binding_action(Fun, ExchangeNameBin, DestinationType, DestinationNameBin,