summaryrefslogtreecommitdiff
path: root/src/rabbit_channel.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/rabbit_channel.erl')
-rw-r--r--src/rabbit_channel.erl119
1 files changed, 57 insertions, 62 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index eeab1fb4..66326396 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -299,13 +299,24 @@ check_write_permitted(Resource, #ch{ username = Username}) ->
check_read_permitted(Resource, #ch{ username = Username}) ->
check_resource_access(Username, Resource, read).
-exclusive_access_or_locked(ReaderPid, Q) ->
- case Q of
- #amqqueue{ exclusive_owner = none} -> Q;
- #amqqueue{ exclusive_owner = ReaderPid } -> Q;
- _ -> rabbit_misc:protocol_error(resource_locked,
- "cannot obtain exclusive access to locked ~s",
- [rabbit_misc:rs(Q#amqqueue.name)])
+with_exclusive_access_or_die(QName, ReaderPid, F) ->
+ case rabbit_amqqueue:with_or_die(
+ QName, fun(Q) -> case Q of
+ #amqqueue{exclusive_owner = none} ->
+ F(Q);
+ #amqqueue{exclusive_owner = ReaderPid} ->
+ F(Q);
+ _ ->
+ {error, wrong_exclusive_owner}
+ end
+ end) of
+ {error, wrong_exclusive_owner} ->
+ rabbit_misc:protocol_error(
+ resource_locked,
+ "cannot obtain exclusive access to locked ~s",
+ [rabbit_misc:rs(QName)]);
+ Else ->
+ Else
end.
expand_queue_name_shortcut(<<>>, #ch{ most_recently_declared_queue = <<>> }) ->
@@ -493,12 +504,11 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
%% In order to ensure that the consume_ok gets sent before
%% any messages are sent to the consumer, we get the queue
%% process to send the consume_ok on our behalf.
- case rabbit_amqqueue:with_or_die(
- QueueName,
+ case with_exclusive_access_or_die(
+ QueueName, ReaderPid,
fun (Q) ->
rabbit_amqqueue:basic_consume(
- exclusive_access_or_locked(ReaderPid, Q),
- NoAck, self(), LimiterPid,
+ Q, NoAck, self(), LimiterPid,
ActualConsumerTag, ExclusiveConsume,
ok_msg(NoWait, #'basic.consume_ok'{
consumer_tag = ActualConsumerTag}))
@@ -677,16 +687,16 @@ handle_method(#'exchange.delete'{exchange = ExchangeNameBin,
return_ok(State, NoWait, #'exchange.delete_ok'{})
end;
-handle_method(#'queue.declare'{queue = QueueNameBin,
- passive = false,
- durable = Durable,
- exclusive = ExclusiveDeclare,
+handle_method(#'queue.declare'{queue = QueueNameBin,
+ passive = false,
+ durable = Durable,
+ exclusive = ExclusiveDeclare,
auto_delete = AutoDelete,
nowait = NoWait,
arguments = Args},
- _, State = #ch { virtual_host = VHostPath,
- reader_pid = ReaderPid,
- queue_collector_pid = CollectorPid }) ->
+ _, State = #ch{virtual_host = VHostPath,
+ reader_pid = ReaderPid,
+ queue_collector_pid = CollectorPid}) ->
Owner = case ExclusiveDeclare of
true -> ReaderPid;
false -> none
@@ -694,17 +704,13 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
%% We use this in both branches, because queue_declare may yet return an
%% existing queue.
Finish =
- fun(Q) ->
+ fun(Q = #amqqueue{name = QueueName}) ->
case Q of
%% "equivalent" rule. NB: we don't pay attention to
%% anything in the arguments table, so for the sake of the
%% "equivalent" rule, all tables of arguments are
%% semantically equivalant.
- Matched = #amqqueue{name = QueueName,
- durable = Durable, %% i.e., as supplied
- exclusive_owner = Owner,
- auto_delete = AutoDelete %% i.e,. as supplied
- } ->
+ #amqqueue{exclusive_owner = Owner} ->
check_configure_permitted(QueueName, State),
%% We need to notify the reader within the channel
%% process so that we can be sure there are no
@@ -712,21 +718,17 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
%% connection shuts down.
case Owner of
none -> ok;
- _ -> rabbit_reader_queue_collector:register_exclusive_queue(
- CollectorPid, Matched)
+ _ -> ok = rabbit_reader_queue_collector:register_exclusive_queue(
+ CollectorPid, Q)
end,
- Matched;
+ Q;
%% exclusivity trumps non-equivalence arbitrarily
- #amqqueue{name = QueueName, exclusive_owner = ExclusiveOwner}
- when ExclusiveOwner =/= Owner ->
- rabbit_misc:protocol_error(resource_locked,
- "cannot obtain exclusive access to locked ~s",
- [rabbit_misc:rs(QueueName)]);
- #amqqueue{name = QueueName} ->
- rabbit_misc:protocol_error(channel_error,
- "parameters for ~s not equivalent",
- [rabbit_misc:rs(QueueName)])
- end
+ #amqqueue{} ->
+ rabbit_misc:protocol_error(
+ resource_locked,
+ "cannot obtain exclusive access to locked ~s",
+ [rabbit_misc:rs(QueueName)])
+ end
end,
Q = case rabbit_amqqueue:with(
rabbit_misc:r(VHostPath, queue, QueueNameBin),
@@ -738,37 +740,33 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
Other -> check_name('queue', Other)
end,
QueueName = rabbit_misc:r(VHostPath, queue, ActualNameBin),
- check_configure_permitted(QueueName, State),
- Finish(rabbit_amqqueue:declare(QueueName, Durable, AutoDelete, Args, Owner));
- Found -> Found
+ Finish(rabbit_amqqueue:declare(QueueName, Durable, AutoDelete,
+ Args, Owner));
+ #amqqueue{} = Other ->
+ Other
end,
return_queue_declare_ok(State, NoWait, Q);
-handle_method(#'queue.declare'{queue = QueueNameBin,
+handle_method(#'queue.declare'{queue = QueueNameBin,
passive = true,
- nowait = NoWait},
- _, State = #ch{ virtual_host = VHostPath,
- reader_pid = ReaderPid }) ->
+ nowait = NoWait},
+ _, State = #ch{virtual_host = VHostPath,
+ reader_pid = ReaderPid}) ->
QueueName = rabbit_misc:r(VHostPath, queue, QueueNameBin),
check_configure_permitted(QueueName, State),
- CheckExclusive = fun(Q) -> exclusive_access_or_locked(ReaderPid, Q) end,
- Q = rabbit_amqqueue:with_or_die(QueueName, CheckExclusive),
+ Q = with_exclusive_access_or_die(QueueName, ReaderPid, fun(Q) -> Q end),
return_queue_declare_ok(State, NoWait, Q);
handle_method(#'queue.delete'{queue = QueueNameBin,
if_unused = IfUnused,
if_empty = IfEmpty,
- nowait = NoWait
- },
- _, State = #ch{ reader_pid = ReaderPid }) ->
+ nowait = NoWait},
+ _, State = #ch{reader_pid = ReaderPid}) ->
QueueName = expand_queue_name_shortcut(QueueNameBin, State),
check_configure_permitted(QueueName, State),
- case rabbit_amqqueue:with_or_die(
- QueueName,
- fun (Q) ->
- rabbit_amqqueue:delete(exclusive_access_or_locked(ReaderPid, Q),
- IfUnused, IfEmpty)
- end) of
+ case with_exclusive_access_or_die(
+ QueueName, ReaderPid,
+ fun (Q) -> rabbit_amqqueue:delete(Q, IfUnused, IfEmpty) end) of
{error, in_use} ->
rabbit_misc:protocol_error(
precondition_failed, "~s in use", [rabbit_misc:rs(QueueName)]);
@@ -800,15 +798,12 @@ handle_method(#'queue.unbind'{queue = QueueNameBin,
handle_method(#'queue.purge'{queue = QueueNameBin,
nowait = NoWait},
- _, State = #ch{ reader_pid = ReaderPid }) ->
+ _, State = #ch{reader_pid = ReaderPid}) ->
QueueName = expand_queue_name_shortcut(QueueNameBin, State),
check_read_permitted(QueueName, State),
- {ok, PurgedMessageCount} = rabbit_amqqueue:with_or_die(
- QueueName,
- fun (Q) ->
- exclusive_access_or_locked(ReaderPid, Q),
- rabbit_amqqueue:purge(Q)
- end),
+ {ok, PurgedMessageCount} = with_exclusive_access_or_die(
+ QueueName, ReaderPid,
+ fun (Q) -> rabbit_amqqueue:purge(Q) end),
return_ok(State, NoWait,
#'queue.purge_ok'{message_count = PurgedMessageCount});