diff options
author | Simon MacMullen <simon@lshift.net> | 2010-05-20 13:59:01 +0100 |
---|---|---|
committer | Simon MacMullen <simon@lshift.net> | 2010-05-20 13:59:01 +0100 |
commit | f223711ad70b62f21c5cdbdff2d4472fe1959629 (patch) | |
tree | 3a74f2df4a25b258e4ffd07a58707fe091b0d7a2 | |
parent | 7691d5ad3340077af860cecad258a46565bfe346 (diff) | |
download | rabbitmq-server-f223711ad70b62f21c5cdbdff2d4472fe1959629.tar.gz |
Cherry-pick 9f4bf96b07a3f6e066792d92a1c3252a1d98842c.
-rw-r--r-- | include/rabbit.hrl | 13 | ||||
-rw-r--r-- | src/rabbit_amqqueue.erl | 12 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 61 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 72 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 4 |
5 files changed, 74 insertions, 88 deletions
diff --git a/include/rabbit.hrl b/include/rabbit.hrl index 145f6104..cf33b6fd 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -51,7 +51,7 @@ -record(exchange, {name, type, durable, auto_delete, arguments}). --record(amqqueue, {name, durable, auto_delete, arguments, pid}). +-record(amqqueue, {name, durable, auto_delete, exclusive_owner = none, arguments, pid}). %% mnesia doesn't like unary records, so we add a dummy 'value' field -record(route, {binding, value = const}). @@ -104,11 +104,12 @@ write :: regexp(), read :: regexp()}). -type(amqqueue() :: - #amqqueue{name :: queue_name(), - durable :: boolean(), - auto_delete :: boolean(), - arguments :: amqp_table(), - pid :: maybe(pid())}). + #amqqueue{name :: queue_name(), + durable :: boolean(), + auto_delete :: boolean(), + exclusive_owner :: maybe(pid()), + arguments :: amqp_table(), + pid :: maybe(pid())}). -type(exchange() :: #exchange{name :: exchange_name(), type :: exchange_type(), diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 7b88c45d..3a9ff074 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -31,7 +31,7 @@ -module(rabbit_amqqueue). --export([start/0, declare/4, delete/3, purge/1]). +-export([start/0, declare/5, delete/3, purge/1]). -export([internal_declare/2, internal_delete/1, maybe_run_queue_via_backing_queue/2, update_ram_duration/1, set_ram_duration_target/2, @@ -41,7 +41,6 @@ stat/1, stat_all/0, deliver/2, requeue/3, ack/4]). -export([list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]). -export([consumers/1, consumers_all/1]). --export([claim_queue/2]). -export([basic_get/3, basic_consume/8, basic_cancel/4]). -export([notify_sent/2, unblock/2, flush_all/2]). -export([commit_all/3, rollback_all/3, notify_down_all/2, limit_all/3]). @@ -66,7 +65,7 @@ 'ok' | {'error', [{'error' | 'exit' | 'throw', any()}]}). -spec(start/0 :: () -> 'ok'). --spec(declare/4 :: (queue_name(), boolean(), boolean(), amqp_table()) -> +-spec(declare/5 :: (queue_name(), boolean(), boolean(), amqp_table(), maybe(pid())) -> amqqueue()). -spec(lookup/1 :: (queue_name()) -> {'ok', amqqueue()} | not_found()). -spec(with/2 :: (queue_name(), qfun(A)) -> A | not_found()). @@ -97,7 +96,6 @@ -spec(rollback_all/3 :: ([pid()], txn(), pid()) -> 'ok'). -spec(notify_down_all/2 :: ([pid()], pid()) -> ok_or_errors()). -spec(limit_all/3 :: ([pid()], pid(), pid() | 'undefined') -> ok_or_errors()). --spec(claim_queue/2 :: (amqqueue(), pid()) -> 'ok' | 'locked'). -spec(basic_get/3 :: (amqqueue(), pid(), boolean()) -> {'ok', non_neg_integer(), qmsg()} | 'empty'). -spec(basic_consume/8 :: @@ -148,11 +146,12 @@ recover_durable_queues(DurableQueues) -> Qs = [start_queue_process(Q) || Q <- DurableQueues], [Q || Q <- Qs, gen_server2:call(Q#amqqueue.pid, {init, true}) == Q]. -declare(QueueName, Durable, AutoDelete, Args) -> +declare(QueueName, Durable, AutoDelete, Args, Owner) -> Q = start_queue_process(#amqqueue{name = QueueName, durable = Durable, auto_delete = AutoDelete, arguments = Args, + exclusive_owner = Owner, pid = none}), case gen_server2:call(Q#amqqueue.pid, {init, false}) of not_found -> rabbit_misc:not_found(QueueName); @@ -298,9 +297,6 @@ limit_all(QPids, ChPid, LimiterPid) -> gen_server2:cast(QPid, {limit, ChPid, LimiterPid}) end). -claim_queue(#amqqueue{pid = QPid}, ReaderPid) -> - delegate_call(QPid, {claim_queue, ReaderPid}, infinity). - basic_get(#amqqueue{pid = QPid}, ChPid, NoAck) -> delegate_call(QPid, {basic_get, ChPid, NoAck}, infinity). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index f12e1b70..b23ccbe5 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -50,7 +50,6 @@ % Queue's state -record(q, {q, - owner, exclusive_consumer, has_had_consumers, backing_queue, @@ -104,7 +103,6 @@ init(Q) -> {ok, BQ} = application:get_env(backing_queue_module), {ok, #q{q = Q#amqqueue{pid = self()}, - owner = none, exclusive_consumer = none, has_had_consumers = false, backing_queue = BQ, @@ -433,9 +431,9 @@ cancel_holder(ChPid, ConsumerTag, {ChPid, ConsumerTag}) -> cancel_holder(_ChPid, _ConsumerTag, Holder) -> Holder. -check_queue_owner(none, _) -> ok; -check_queue_owner({ReaderPid, _}, ReaderPid) -> ok; -check_queue_owner({_, _}, _) -> mismatch. +check_queue_owner(none, _) -> ok; +check_queue_owner(ReaderPid, ReaderPid) -> ok; +check_queue_owner(_, _) -> mismatch. check_exclusive_access({_ChPid, _ConsumerTag}, _ExclusiveConsume, _State) -> in_use; @@ -488,9 +486,9 @@ i(auto_delete, #q{q = #amqqueue{auto_delete = AutoDelete}}) -> AutoDelete; i(arguments, #q{q = #amqqueue{arguments = Arguments}}) -> Arguments; i(pid, _) -> self(); -i(owner_pid, #q{owner = none}) -> +i(owner_pid, #q{q = #amqqueue{exclusive_owner = none}}) -> ''; -i(owner_pid, #q{owner = {ReaderPid, _MonitorRef}}) -> +i(owner_pid, #q{q = #amqqueue{exclusive_owner = ReaderPid}}) -> ReaderPid; i(exclusive_consumer_pid, #q{exclusive_consumer = none}) -> ''; @@ -520,8 +518,13 @@ i(Item, _) -> %--------------------------------------------------------------------------- handle_call({init, Recover}, From, - State = #q{q = Q = #amqqueue{name = QName, durable = IsDurable}, + State = #q{q = Q = #amqqueue{name = QName, durable = IsDurable, + exclusive_owner = ExclusiveOwner}, backing_queue = BQ, backing_queue_state = undefined}) -> + case ExclusiveOwner of + none -> ok; + ReaderPid -> erlang:monitor(process, ReaderPid) + end, %% TODO: If we're exclusively owned && our owner isn't alive && %% Recover then we should BQ:init and then {stop, normal, %% not_found, State}, relying on terminate to delete the queue. @@ -615,7 +618,7 @@ handle_call({basic_get, ChPid, NoAck}, _From, handle_call({basic_consume, NoAck, ReaderPid, ChPid, LimiterPid, ConsumerTag, ExclusiveConsume, OkMsg}, - _From, State = #q{owner = Owner, + _From, State = #q{q = #amqqueue{exclusive_owner = Owner}, exclusive_consumer = ExistingHolder}) -> case check_queue_owner(Owner, ReaderPid) of mismatch -> @@ -713,29 +716,6 @@ handle_call(purge, _From, State = #q{backing_queue = BQ, {Count, BQS1} = BQ:purge(BQS), reply({ok, Count}, State#q{backing_queue_state = BQS1}); -handle_call({claim_queue, ReaderPid}, _From, - State = #q{owner = Owner, exclusive_consumer = Holder}) -> - case Owner of - none -> - case check_exclusive_access(Holder, true, State) of - in_use -> - %% FIXME: Is this really the right answer? What if - %% an active consumer's reader is actually the - %% claiming pid? Should that be allowed? In order - %% to check, we'd need to hold not just the ch - %% pid for each consumer, but also its reader - %% pid... - reply(locked, State); - ok -> - MonitorRef = erlang:monitor(process, ReaderPid), - reply(ok, State#q{owner = {ReaderPid, MonitorRef}}) - end; - {ReaderPid, _MonitorRef} -> - reply(ok, State); - _ -> - reply(locked, State) - end; - handle_call({maybe_run_queue_via_backing_queue, Fun}, _From, State) -> reply(ok, maybe_run_queue_via_backing_queue(Fun, State)). @@ -825,19 +805,10 @@ handle_cast({set_maximum_since_use, Age}, State) -> ok = file_handle_cache:set_maximum_since_use(Age), noreply(State). -handle_info({'DOWN', MonitorRef, process, DownPid, _Reason}, - State = #q{owner = {DownPid, MonitorRef}}) -> - %% We know here that there are no consumers on this queue that are - %% owned by other pids than the one that just went down, so since - %% exclusive in some sense implies autodelete, we delete the queue - %% here. The other way of implementing the "exclusive implies - %% autodelete" feature is to actually set autodelete when an - %% exclusive declaration is seen, but this has the problem that - %% the python tests rely on the queue not going away after a - %% basic.cancel when the queue was declared exclusive and - %% nonautodelete. - NewState = State#q{owner = none}, - {stop, normal, NewState}; +handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, + State = #q{q= #amqqueue{ exclusive_owner = DownPid}}) -> + %% Exclusively owned queues must disappear with their owner. + {stop, normal, State}; handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) -> case handle_ch_down(DownPid, State) of {ok, NewState} -> noreply(NewState); diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index a48db9c8..5a06f513 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -683,25 +683,37 @@ handle_method(#'queue.declare'{queue = QueueNameBin, arguments = Args}, _, State = #ch { virtual_host = VHostPath, reader_pid = ReaderPid }) -> - %% FIXME: atomic create&claim - Finish = - fun (Q) -> - if ExclusiveDeclare -> - case rabbit_amqqueue:claim_queue(Q, ReaderPid) of - locked -> - %% AMQP 0-8 doesn't say which - %% exception to use, so we mimic QPid - %% here. - rabbit_misc:protocol_error( - resource_locked, - "cannot obtain exclusive access to locked ~s", - [rabbit_misc:rs(Q#amqqueue.name)]); - ok -> ok - end; - true -> - ok - end, - Q + Owner = case ExclusiveDeclare of + true -> ReaderPid; + false -> none + end, + %% We use this in both branches, because queue_declare may yet return an + %% existing queue. + Finish = + fun(Q) -> + case Q of + %% "equivalent" rule. NB: we don't pay attention to + %% anything in the arguments table, so for the sake of the + %% "equivalent" rule, all tables of arguments are + %% semantically equivalant. + Matched = #amqqueue{name = QueueName, + durable = Durable, %% i.e., as supplied + exclusive_owner = Owner, + auto_delete = AutoDelete %% i.e,. as supplied + } -> + check_configure_permitted(QueueName, State), + Matched; + %% exclusivity trumps non-equivalence arbitrarily + #amqqueue{name = QueueName, exclusive_owner = ExclusiveOwner} + when ExclusiveOwner =/= Owner -> + rabbit_misc:protocol_error(resource_locked, + "cannot obtain exclusive access to locked ~s", + [rabbit_misc:rs(QueueName)]); + #amqqueue{name = QueueName} -> + rabbit_misc:protocol_error(channel_error, + "parameters for ~s not equivalent", + [rabbit_misc:rs(QueueName)]) + end end, Q = case rabbit_amqqueue:with( rabbit_misc:r(VHostPath, queue, QueueNameBin), @@ -714,21 +726,29 @@ handle_method(#'queue.declare'{queue = QueueNameBin, end, QueueName = rabbit_misc:r(VHostPath, queue, ActualNameBin), check_configure_permitted(QueueName, State), - Finish(rabbit_amqqueue:declare(QueueName, - Durable, AutoDelete, Args)); - Other = #amqqueue{name = QueueName} -> - check_configure_permitted(QueueName, State), - Other + Finish(rabbit_amqqueue:declare(QueueName, Durable, AutoDelete, Args, Owner)); + Found -> Found end, return_queue_declare_ok(State, NoWait, Q); handle_method(#'queue.declare'{queue = QueueNameBin, passive = true, nowait = NoWait}, - _, State = #ch{ virtual_host = VHostPath }) -> + _, State = #ch{ virtual_host = VHostPath, + reader_pid = ReaderPid }) -> QueueName = rabbit_misc:r(VHostPath, queue, QueueNameBin), check_configure_permitted(QueueName, State), - Q = rabbit_amqqueue:with_or_die(QueueName, fun (Q) -> Q end), + CheckExclusive = + fun(Q) -> + case Q of + #amqqueue{ exclusive_owner = none} -> Q; + #amqqueue{ exclusive_owner = ReaderPid } -> Q; + _ -> rabbit_misc:protocol_error(resource_locked, + "cannot obtain exclusive access to locked ~s", + [rabbit_misc:rs(Q#amqqueue.name)]) + end + end, + Q = rabbit_amqqueue:with_or_die(QueueName, CheckExclusive), return_queue_declare_ok(State, NoWait, Q); handle_method(#'queue.delete'{queue = QueueNameBin, diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 76ebd982..8064d06b 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -746,16 +746,14 @@ test_user_management() -> passed. test_server_status() -> - %% create a few things so there is some useful information to list Writer = spawn(fun () -> receive shutdown -> ok end end), Ch = rabbit_channel:start_link(1, self(), Writer, <<"user">>, <<"/">>), [Q, Q2] = [#amqqueue{} = rabbit_amqqueue:declare( rabbit_misc:r(<<"/">>, queue, Name), - false, false, []) || + false, false, [], none) || Name <- [<<"foo">>, <<"bar">>]], - ok = rabbit_amqqueue:claim_queue(Q, self()), ok = rabbit_amqqueue:basic_consume(Q, true, self(), Ch, undefined, <<"ctag">>, true, undefined), |