diff options
author | Michael Bridgen <mikeb@lshift.net> | 2009-10-22 17:17:05 +0100 |
---|---|---|
committer | Michael Bridgen <mikeb@lshift.net> | 2009-10-22 17:17:05 +0100 |
commit | c44103847c904f85fc0070fd3f1d93e966286e78 (patch) | |
tree | c017e65845abe6792b3bb0c50617da7e59b26de1 | |
parent | d94469f247495de0e455889a759eca8df43d2e03 (diff) | |
download | rabbitmq-server-c44103847c904f85fc0070fd3f1d93e966286e78.tar.gz |
- keep track of the owner in the amqqueue record. We need this so we
can check equivalence on queue.declare
- speaking of which, implement the "equivalent" rule of queue.declare
(bug 21832)
- queues are exclusive on creation, and don't change owner, so get rid
of claim_queue
- also check exclusive in the passive version of queue.declare; this
is explicitly mandated by the spec (queue.declare, exclusive)
- bug 21385: exclusive queues are deleted when their owners go away,
as per the spec. This didn't actually change implementation -- it
was wrong for 0-8
-rw-r--r-- | include/rabbit.hrl | 13 | ||||
-rw-r--r-- | src/rabbit_amqqueue.erl | 12 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 55 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 72 |
4 files changed, 70 insertions, 82 deletions
diff --git a/include/rabbit.hrl b/include/rabbit.hrl index c94965f9..f8ff4778 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -51,7 +51,7 @@ -record(exchange, {name, type, durable, arguments}). --record(amqqueue, {name, durable, auto_delete, arguments, pid}). +-record(amqqueue, {name, durable, auto_delete, exclusive_owner = none, arguments, pid}). %% mnesia doesn't like unary records, so we add a dummy 'value' field -record(route, {binding, value = const}). @@ -102,11 +102,12 @@ write :: regexp(), read :: regexp()}). -type(amqqueue() :: - #amqqueue{name :: queue_name(), - durable :: boolean(), - auto_delete :: boolean(), - arguments :: amqp_table(), - pid :: maybe(pid())}). + #amqqueue{name :: queue_name(), + durable :: boolean(), + auto_delete :: boolean(), + exclusive_owner :: maybe(pid()), + arguments :: amqp_table(), + pid :: maybe(pid())}). -type(exchange() :: #exchange{name :: exchange_name(), type :: exchange_type(), diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 1a5e82d7..52c54754 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -31,13 +31,12 @@ -module(rabbit_amqqueue). --export([start/0, recover/0, declare/4, delete/3, purge/1]). +-export([start/0, recover/0, declare/5, delete/3, purge/1]). -export([internal_declare/2, internal_delete/1]). -export([pseudo_queue/2]). -export([lookup/1, with/2, with_or_die/2, stat/1, stat_all/0, deliver/2, redeliver/2, requeue/3, ack/4]). -export([list/1, info/1, info/2, info_all/1, info_all/2]). --export([claim_queue/2]). -export([basic_get/3, basic_consume/8, basic_cancel/4]). -export([notify_sent/2, unblock/2]). -export([commit_all/2, rollback_all/2, notify_down_all/2, limit_all/3]). @@ -63,7 +62,7 @@ -spec(start/0 :: () -> 'ok'). -spec(recover/0 :: () -> 'ok'). --spec(declare/4 :: (queue_name(), boolean(), boolean(), amqp_table()) -> +-spec(declare/5 :: (queue_name(), boolean(), boolean(), amqp_table(), maybe(pid())) -> amqqueue()). -spec(lookup/1 :: (queue_name()) -> {'ok', amqqueue()} | not_found()). -spec(with/2 :: (queue_name(), qfun(A)) -> A | not_found()). @@ -91,7 +90,6 @@ -spec(rollback_all/2 :: ([pid()], txn()) -> ok_or_errors()). -spec(notify_down_all/2 :: ([pid()], pid()) -> ok_or_errors()). -spec(limit_all/3 :: ([pid()], pid(), pid() | 'undefined') -> ok_or_errors()). --spec(claim_queue/2 :: (amqqueue(), pid()) -> 'ok' | 'locked'). -spec(basic_get/3 :: (amqqueue(), pid(), boolean()) -> {'ok', non_neg_integer(), msg()} | 'empty'). -spec(basic_consume/8 :: @@ -151,11 +149,12 @@ recover_durable_queues() -> end)), ok. -declare(QueueName, Durable, AutoDelete, Args) -> +declare(QueueName, Durable, AutoDelete, Args, Owner) -> Q = start_queue_process(#amqqueue{name = QueueName, durable = Durable, auto_delete = AutoDelete, arguments = Args, + exclusive_owner = Owner, pid = none}), internal_declare(Q, true). @@ -286,9 +285,6 @@ limit_all(QPids, ChPid, LimiterPid) -> fun (QPid) -> gen_server2:cast(QPid, {limit, ChPid, LimiterPid}) end, QPids). -claim_queue(#amqqueue{pid = QPid}, ReaderPid) -> - gen_server2:call(QPid, {claim_queue, ReaderPid}, infinity). - basic_get(#amqqueue{pid = QPid}, ChPid, NoAck) -> gen_server2:call(QPid, {basic_get, ChPid, NoAck}, infinity). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index fe2e8509..3a867f86 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -49,7 +49,6 @@ % Queue's state -record(q, {q, - owner, exclusive_consumer, has_had_consumers, next_msg_id, @@ -95,8 +94,11 @@ start_link(Q) -> init(Q) -> ?LOGDEBUG("Queue starting - ~p~n", [Q]), + case Q#amqqueue.exclusive_owner of + none -> ok; + ReaderPid -> erlang:monitor(process, ReaderPid) + end, {ok, #q{q = Q, - owner = none, exclusive_consumer = none, has_had_consumers = false, next_msg_id = 1, @@ -331,9 +333,9 @@ cancel_holder(ChPid, ConsumerTag, {ChPid, ConsumerTag}) -> cancel_holder(_ChPid, _ConsumerTag, Holder) -> Holder. -check_queue_owner(none, _) -> ok; -check_queue_owner({ReaderPid, _}, ReaderPid) -> ok; -check_queue_owner({_, _}, _) -> mismatch. +check_queue_owner(none, _) -> ok; +check_queue_owner(ReaderPid, ReaderPid) -> ok; +check_queue_owner(_, _) -> mismatch. check_exclusive_access({_ChPid, _ConsumerTag}, _ExclusiveConsume, _State) -> in_use; @@ -607,7 +609,7 @@ handle_call({basic_get, ChPid, NoAck}, _From, handle_call({basic_consume, NoAck, ReaderPid, ChPid, LimiterPid, ConsumerTag, ExclusiveConsume, OkMsg}, - _From, State = #q{owner = Owner, + _From, State = #q{q = #amqqueue{exclusive_owner = Owner}, exclusive_consumer = ExistingHolder}) -> case check_queue_owner(Owner, ReaderPid) of mismatch -> @@ -705,29 +707,7 @@ handle_call({delete, IfUnused, IfEmpty}, _From, handle_call(purge, _From, State = #q{message_buffer = MessageBuffer}) -> ok = purge_message_buffer(qname(State), MessageBuffer), reply({ok, queue:len(MessageBuffer)}, - State#q{message_buffer = queue:new()}); - -handle_call({claim_queue, ReaderPid}, _From, State = #q{owner = Owner, - exclusive_consumer = Holder}) -> - case Owner of - none -> - case check_exclusive_access(Holder, true, State) of - in_use -> - %% FIXME: Is this really the right answer? What if - %% an active consumer's reader is actually the - %% claiming pid? Should that be allowed? In order - %% to check, we'd need to hold not just the ch - %% pid for each consumer, but also its reader - %% pid... - reply(locked, State); - ok -> - reply(ok, State#q{owner = {ReaderPid, erlang:monitor(process, ReaderPid)}}) - end; - {ReaderPid, _MonitorRef} -> - reply(ok, State); - _ -> - reply(locked, State) - end. + State#q{message_buffer = queue:new()}). handle_cast({deliver, Txn, Message, ChPid}, State) -> %% Asynchronous, non-"mandatory", non-"immediate" deliver mode. @@ -799,19 +779,10 @@ handle_cast({limit, ChPid, LimiterPid}, State) -> C#cr{limiter_pid = LimiterPid, is_limit_active = NewLimited} end)). -handle_info({'DOWN', MonitorRef, process, DownPid, _Reason}, - State = #q{owner = {DownPid, MonitorRef}}) -> - %% We know here that there are no consumers on this queue that are - %% owned by other pids than the one that just went down, so since - %% exclusive in some sense implies autodelete, we delete the queue - %% here. The other way of implementing the "exclusive implies - %% autodelete" feature is to actually set autodelete when an - %% exclusive declaration is seen, but this has the problem that - %% the python tests rely on the queue not going away after a - %% basic.cancel when the queue was declared exclusive and - %% nonautodelete. - NewState = State#q{owner = none}, - {stop, normal, NewState}; +handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, + State = #q{q= #amqqueue{ exclusive_owner = DownPid}}) -> + %% Exclusively owned queues must disappear with their owner. + {stop, normal, State}; handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) -> handle_ch_down(DownPid, State); diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 26db0777..e54ba4ed 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -617,25 +617,37 @@ handle_method(#'queue.declare'{queue = QueueNameBin, arguments = Args}, _, State = #ch { virtual_host = VHostPath, reader_pid = ReaderPid }) -> - %% FIXME: atomic create&claim - Finish = - fun (Q) -> - if ExclusiveDeclare -> - case rabbit_amqqueue:claim_queue(Q, ReaderPid) of - locked -> - %% AMQP 0-8 doesn't say which - %% exception to use, so we mimic QPid - %% here. - rabbit_misc:protocol_error( - resource_locked, - "cannot obtain exclusive access to locked ~s", - [rabbit_misc:rs(Q#amqqueue.name)]); - ok -> ok - end; - true -> - ok - end, - Q + Owner = case ExclusiveDeclare of + true -> ReaderPid; + false -> none + end, + %% We use this in both branches, because queue_declare may yet return an + %% existing queue. + Finish = + fun(Q) -> + case Q of + %% "equivalent" rule. NB: we don't pay attention to + %% anything in the arguments table, so for the sake of the + %% "equivalent" rule, all tables of arguments are + %% semantically equivalant. + Matched = #amqqueue{name = QueueName, + durable = Durable, %% i.e., as supplied + exclusive_owner = Owner, + auto_delete = AutoDelete %% i.e,. as supplied + } -> + check_configure_permitted(QueueName, State), + Matched; + %% exclusivity trumps non-equivalence arbitrarily + #amqqueue{name = QueueName, exclusive_owner = ExclusiveOwner} + when ExclusiveOwner =/= Owner -> + rabbit_misc:protocol_error(resource_locked, + "cannot obtain exclusive access to locked ~s", + [rabbit_misc:rs(QueueName)]); + #amqqueue{name = QueueName} -> + rabbit_misc:protocol_error(channel_error, + "parameters for ~s not equivalent", + [rabbit_misc:rs(QueueName)]) + end end, Q = case rabbit_amqqueue:with( rabbit_misc:r(VHostPath, queue, QueueNameBin), @@ -648,21 +660,29 @@ handle_method(#'queue.declare'{queue = QueueNameBin, end, QueueName = rabbit_misc:r(VHostPath, queue, ActualNameBin), check_configure_permitted(QueueName, State), - Finish(rabbit_amqqueue:declare(QueueName, - Durable, AutoDelete, Args)); - Other = #amqqueue{name = QueueName} -> - check_configure_permitted(QueueName, State), - Other + Finish(rabbit_amqqueue:declare(QueueName, Durable, AutoDelete, Args, Owner)); + Found -> Found end, return_queue_declare_ok(State, NoWait, Q); handle_method(#'queue.declare'{queue = QueueNameBin, passive = true, nowait = NoWait}, - _, State = #ch{ virtual_host = VHostPath }) -> + _, State = #ch{ virtual_host = VHostPath, + reader_pid = ReaderPid }) -> QueueName = rabbit_misc:r(VHostPath, queue, QueueNameBin), check_configure_permitted(QueueName, State), - Q = rabbit_amqqueue:with_or_die(QueueName, fun (Q) -> Q end), + CheckExclusive = + fun(Q) -> + case Q of + #amqqueue{ exclusive_owner = none} -> Q; + #amqqueue{ exclusive_owner = ReaderPid } -> Q; + _ -> rabbit_misc:protocol_error(resource_locked, + "cannot obtain exclusive access to locked ~s", + [rabbit_misc:rs(Q#amqqueue.name)]) + end + end, + Q = rabbit_amqqueue:with_or_die(QueueName, CheckExclusive), return_queue_declare_ok(State, NoWait, Q); handle_method(#'queue.delete'{queue = QueueNameBin, |