diff options
Diffstat (limited to 'src/rabbit_channel.erl')
-rw-r--r-- | src/rabbit_channel.erl | 119 |
1 files changed, 57 insertions, 62 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index eeab1fb4..66326396 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -299,13 +299,24 @@ check_write_permitted(Resource, #ch{ username = Username}) -> check_read_permitted(Resource, #ch{ username = Username}) -> check_resource_access(Username, Resource, read). -exclusive_access_or_locked(ReaderPid, Q) -> - case Q of - #amqqueue{ exclusive_owner = none} -> Q; - #amqqueue{ exclusive_owner = ReaderPid } -> Q; - _ -> rabbit_misc:protocol_error(resource_locked, - "cannot obtain exclusive access to locked ~s", - [rabbit_misc:rs(Q#amqqueue.name)]) +with_exclusive_access_or_die(QName, ReaderPid, F) -> + case rabbit_amqqueue:with_or_die( + QName, fun(Q) -> case Q of + #amqqueue{exclusive_owner = none} -> + F(Q); + #amqqueue{exclusive_owner = ReaderPid} -> + F(Q); + _ -> + {error, wrong_exclusive_owner} + end + end) of + {error, wrong_exclusive_owner} -> + rabbit_misc:protocol_error( + resource_locked, + "cannot obtain exclusive access to locked ~s", + [rabbit_misc:rs(QName)]); + Else -> + Else end. expand_queue_name_shortcut(<<>>, #ch{ most_recently_declared_queue = <<>> }) -> @@ -493,12 +504,11 @@ handle_method(#'basic.consume'{queue = QueueNameBin, %% In order to ensure that the consume_ok gets sent before %% any messages are sent to the consumer, we get the queue %% process to send the consume_ok on our behalf. - case rabbit_amqqueue:with_or_die( - QueueName, + case with_exclusive_access_or_die( + QueueName, ReaderPid, fun (Q) -> rabbit_amqqueue:basic_consume( - exclusive_access_or_locked(ReaderPid, Q), - NoAck, self(), LimiterPid, + Q, NoAck, self(), LimiterPid, ActualConsumerTag, ExclusiveConsume, ok_msg(NoWait, #'basic.consume_ok'{ consumer_tag = ActualConsumerTag})) @@ -677,16 +687,16 @@ handle_method(#'exchange.delete'{exchange = ExchangeNameBin, return_ok(State, NoWait, #'exchange.delete_ok'{}) end; -handle_method(#'queue.declare'{queue = QueueNameBin, - passive = false, - durable = Durable, - exclusive = ExclusiveDeclare, +handle_method(#'queue.declare'{queue = QueueNameBin, + passive = false, + durable = Durable, + exclusive = ExclusiveDeclare, auto_delete = AutoDelete, nowait = NoWait, arguments = Args}, - _, State = #ch { virtual_host = VHostPath, - reader_pid = ReaderPid, - queue_collector_pid = CollectorPid }) -> + _, State = #ch{virtual_host = VHostPath, + reader_pid = ReaderPid, + queue_collector_pid = CollectorPid}) -> Owner = case ExclusiveDeclare of true -> ReaderPid; false -> none @@ -694,17 +704,13 @@ handle_method(#'queue.declare'{queue = QueueNameBin, %% We use this in both branches, because queue_declare may yet return an %% existing queue. Finish = - fun(Q) -> + fun(Q = #amqqueue{name = QueueName}) -> case Q of %% "equivalent" rule. NB: we don't pay attention to %% anything in the arguments table, so for the sake of the %% "equivalent" rule, all tables of arguments are %% semantically equivalant. - Matched = #amqqueue{name = QueueName, - durable = Durable, %% i.e., as supplied - exclusive_owner = Owner, - auto_delete = AutoDelete %% i.e,. as supplied - } -> + #amqqueue{exclusive_owner = Owner} -> check_configure_permitted(QueueName, State), %% We need to notify the reader within the channel %% process so that we can be sure there are no @@ -712,21 +718,17 @@ handle_method(#'queue.declare'{queue = QueueNameBin, %% connection shuts down. case Owner of none -> ok; - _ -> rabbit_reader_queue_collector:register_exclusive_queue( - CollectorPid, Matched) + _ -> ok = rabbit_reader_queue_collector:register_exclusive_queue( + CollectorPid, Q) end, - Matched; + Q; %% exclusivity trumps non-equivalence arbitrarily - #amqqueue{name = QueueName, exclusive_owner = ExclusiveOwner} - when ExclusiveOwner =/= Owner -> - rabbit_misc:protocol_error(resource_locked, - "cannot obtain exclusive access to locked ~s", - [rabbit_misc:rs(QueueName)]); - #amqqueue{name = QueueName} -> - rabbit_misc:protocol_error(channel_error, - "parameters for ~s not equivalent", - [rabbit_misc:rs(QueueName)]) - end + #amqqueue{} -> + rabbit_misc:protocol_error( + resource_locked, + "cannot obtain exclusive access to locked ~s", + [rabbit_misc:rs(QueueName)]) + end end, Q = case rabbit_amqqueue:with( rabbit_misc:r(VHostPath, queue, QueueNameBin), @@ -738,37 +740,33 @@ handle_method(#'queue.declare'{queue = QueueNameBin, Other -> check_name('queue', Other) end, QueueName = rabbit_misc:r(VHostPath, queue, ActualNameBin), - check_configure_permitted(QueueName, State), - Finish(rabbit_amqqueue:declare(QueueName, Durable, AutoDelete, Args, Owner)); - Found -> Found + Finish(rabbit_amqqueue:declare(QueueName, Durable, AutoDelete, + Args, Owner)); + #amqqueue{} = Other -> + Other end, return_queue_declare_ok(State, NoWait, Q); -handle_method(#'queue.declare'{queue = QueueNameBin, +handle_method(#'queue.declare'{queue = QueueNameBin, passive = true, - nowait = NoWait}, - _, State = #ch{ virtual_host = VHostPath, - reader_pid = ReaderPid }) -> + nowait = NoWait}, + _, State = #ch{virtual_host = VHostPath, + reader_pid = ReaderPid}) -> QueueName = rabbit_misc:r(VHostPath, queue, QueueNameBin), check_configure_permitted(QueueName, State), - CheckExclusive = fun(Q) -> exclusive_access_or_locked(ReaderPid, Q) end, - Q = rabbit_amqqueue:with_or_die(QueueName, CheckExclusive), + Q = with_exclusive_access_or_die(QueueName, ReaderPid, fun(Q) -> Q end), return_queue_declare_ok(State, NoWait, Q); handle_method(#'queue.delete'{queue = QueueNameBin, if_unused = IfUnused, if_empty = IfEmpty, - nowait = NoWait - }, - _, State = #ch{ reader_pid = ReaderPid }) -> + nowait = NoWait}, + _, State = #ch{reader_pid = ReaderPid}) -> QueueName = expand_queue_name_shortcut(QueueNameBin, State), check_configure_permitted(QueueName, State), - case rabbit_amqqueue:with_or_die( - QueueName, - fun (Q) -> - rabbit_amqqueue:delete(exclusive_access_or_locked(ReaderPid, Q), - IfUnused, IfEmpty) - end) of + case with_exclusive_access_or_die( + QueueName, ReaderPid, + fun (Q) -> rabbit_amqqueue:delete(Q, IfUnused, IfEmpty) end) of {error, in_use} -> rabbit_misc:protocol_error( precondition_failed, "~s in use", [rabbit_misc:rs(QueueName)]); @@ -800,15 +798,12 @@ handle_method(#'queue.unbind'{queue = QueueNameBin, handle_method(#'queue.purge'{queue = QueueNameBin, nowait = NoWait}, - _, State = #ch{ reader_pid = ReaderPid }) -> + _, State = #ch{reader_pid = ReaderPid}) -> QueueName = expand_queue_name_shortcut(QueueNameBin, State), check_read_permitted(QueueName, State), - {ok, PurgedMessageCount} = rabbit_amqqueue:with_or_die( - QueueName, - fun (Q) -> - exclusive_access_or_locked(ReaderPid, Q), - rabbit_amqqueue:purge(Q) - end), + {ok, PurgedMessageCount} = with_exclusive_access_or_die( + QueueName, ReaderPid, + fun (Q) -> rabbit_amqqueue:purge(Q) end), return_ok(State, NoWait, #'queue.purge_ok'{message_count = PurgedMessageCount}); |