summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@lshift.net>2010-05-20 13:59:01 +0100
committerSimon MacMullen <simon@lshift.net>2010-05-20 13:59:01 +0100
commitf223711ad70b62f21c5cdbdff2d4472fe1959629 (patch)
tree3a74f2df4a25b258e4ffd07a58707fe091b0d7a2
parent7691d5ad3340077af860cecad258a46565bfe346 (diff)
downloadrabbitmq-server-f223711ad70b62f21c5cdbdff2d4472fe1959629.tar.gz
Cherry-pick 9f4bf96b07a3f6e066792d92a1c3252a1d98842c.
-rw-r--r--include/rabbit.hrl13
-rw-r--r--src/rabbit_amqqueue.erl12
-rw-r--r--src/rabbit_amqqueue_process.erl61
-rw-r--r--src/rabbit_channel.erl72
-rw-r--r--src/rabbit_tests.erl4
5 files changed, 74 insertions, 88 deletions
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index 145f6104..cf33b6fd 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -51,7 +51,7 @@
-record(exchange, {name, type, durable, auto_delete, arguments}).
--record(amqqueue, {name, durable, auto_delete, arguments, pid}).
+-record(amqqueue, {name, durable, auto_delete, exclusive_owner = none, arguments, pid}).
%% mnesia doesn't like unary records, so we add a dummy 'value' field
-record(route, {binding, value = const}).
@@ -104,11 +104,12 @@
write :: regexp(),
read :: regexp()}).
-type(amqqueue() ::
- #amqqueue{name :: queue_name(),
- durable :: boolean(),
- auto_delete :: boolean(),
- arguments :: amqp_table(),
- pid :: maybe(pid())}).
+ #amqqueue{name :: queue_name(),
+ durable :: boolean(),
+ auto_delete :: boolean(),
+ exclusive_owner :: maybe(pid()),
+ arguments :: amqp_table(),
+ pid :: maybe(pid())}).
-type(exchange() ::
#exchange{name :: exchange_name(),
type :: exchange_type(),
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 7b88c45d..3a9ff074 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -31,7 +31,7 @@
-module(rabbit_amqqueue).
--export([start/0, declare/4, delete/3, purge/1]).
+-export([start/0, declare/5, delete/3, purge/1]).
-export([internal_declare/2, internal_delete/1,
maybe_run_queue_via_backing_queue/2,
update_ram_duration/1, set_ram_duration_target/2,
@@ -41,7 +41,6 @@
stat/1, stat_all/0, deliver/2, requeue/3, ack/4]).
-export([list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]).
-export([consumers/1, consumers_all/1]).
--export([claim_queue/2]).
-export([basic_get/3, basic_consume/8, basic_cancel/4]).
-export([notify_sent/2, unblock/2, flush_all/2]).
-export([commit_all/3, rollback_all/3, notify_down_all/2, limit_all/3]).
@@ -66,7 +65,7 @@
'ok' | {'error', [{'error' | 'exit' | 'throw', any()}]}).
-spec(start/0 :: () -> 'ok').
--spec(declare/4 :: (queue_name(), boolean(), boolean(), amqp_table()) ->
+-spec(declare/5 :: (queue_name(), boolean(), boolean(), amqp_table(), maybe(pid())) ->
amqqueue()).
-spec(lookup/1 :: (queue_name()) -> {'ok', amqqueue()} | not_found()).
-spec(with/2 :: (queue_name(), qfun(A)) -> A | not_found()).
@@ -97,7 +96,6 @@
-spec(rollback_all/3 :: ([pid()], txn(), pid()) -> 'ok').
-spec(notify_down_all/2 :: ([pid()], pid()) -> ok_or_errors()).
-spec(limit_all/3 :: ([pid()], pid(), pid() | 'undefined') -> ok_or_errors()).
--spec(claim_queue/2 :: (amqqueue(), pid()) -> 'ok' | 'locked').
-spec(basic_get/3 :: (amqqueue(), pid(), boolean()) ->
{'ok', non_neg_integer(), qmsg()} | 'empty').
-spec(basic_consume/8 ::
@@ -148,11 +146,12 @@ recover_durable_queues(DurableQueues) ->
Qs = [start_queue_process(Q) || Q <- DurableQueues],
[Q || Q <- Qs, gen_server2:call(Q#amqqueue.pid, {init, true}) == Q].
-declare(QueueName, Durable, AutoDelete, Args) ->
+declare(QueueName, Durable, AutoDelete, Args, Owner) ->
Q = start_queue_process(#amqqueue{name = QueueName,
durable = Durable,
auto_delete = AutoDelete,
arguments = Args,
+ exclusive_owner = Owner,
pid = none}),
case gen_server2:call(Q#amqqueue.pid, {init, false}) of
not_found -> rabbit_misc:not_found(QueueName);
@@ -298,9 +297,6 @@ limit_all(QPids, ChPid, LimiterPid) ->
gen_server2:cast(QPid, {limit, ChPid, LimiterPid})
end).
-claim_queue(#amqqueue{pid = QPid}, ReaderPid) ->
- delegate_call(QPid, {claim_queue, ReaderPid}, infinity).
-
basic_get(#amqqueue{pid = QPid}, ChPid, NoAck) ->
delegate_call(QPid, {basic_get, ChPid, NoAck}, infinity).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index f12e1b70..b23ccbe5 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -50,7 +50,6 @@
% Queue's state
-record(q, {q,
- owner,
exclusive_consumer,
has_had_consumers,
backing_queue,
@@ -104,7 +103,6 @@ init(Q) ->
{ok, BQ} = application:get_env(backing_queue_module),
{ok, #q{q = Q#amqqueue{pid = self()},
- owner = none,
exclusive_consumer = none,
has_had_consumers = false,
backing_queue = BQ,
@@ -433,9 +431,9 @@ cancel_holder(ChPid, ConsumerTag, {ChPid, ConsumerTag}) ->
cancel_holder(_ChPid, _ConsumerTag, Holder) ->
Holder.
-check_queue_owner(none, _) -> ok;
-check_queue_owner({ReaderPid, _}, ReaderPid) -> ok;
-check_queue_owner({_, _}, _) -> mismatch.
+check_queue_owner(none, _) -> ok;
+check_queue_owner(ReaderPid, ReaderPid) -> ok;
+check_queue_owner(_, _) -> mismatch.
check_exclusive_access({_ChPid, _ConsumerTag}, _ExclusiveConsume, _State) ->
in_use;
@@ -488,9 +486,9 @@ i(auto_delete, #q{q = #amqqueue{auto_delete = AutoDelete}}) -> AutoDelete;
i(arguments, #q{q = #amqqueue{arguments = Arguments}}) -> Arguments;
i(pid, _) ->
self();
-i(owner_pid, #q{owner = none}) ->
+i(owner_pid, #q{q = #amqqueue{exclusive_owner = none}}) ->
'';
-i(owner_pid, #q{owner = {ReaderPid, _MonitorRef}}) ->
+i(owner_pid, #q{q = #amqqueue{exclusive_owner = ReaderPid}}) ->
ReaderPid;
i(exclusive_consumer_pid, #q{exclusive_consumer = none}) ->
'';
@@ -520,8 +518,13 @@ i(Item, _) ->
%---------------------------------------------------------------------------
handle_call({init, Recover}, From,
- State = #q{q = Q = #amqqueue{name = QName, durable = IsDurable},
+ State = #q{q = Q = #amqqueue{name = QName, durable = IsDurable,
+ exclusive_owner = ExclusiveOwner},
backing_queue = BQ, backing_queue_state = undefined}) ->
+ case ExclusiveOwner of
+ none -> ok;
+ ReaderPid -> erlang:monitor(process, ReaderPid)
+ end,
%% TODO: If we're exclusively owned && our owner isn't alive &&
%% Recover then we should BQ:init and then {stop, normal,
%% not_found, State}, relying on terminate to delete the queue.
@@ -615,7 +618,7 @@ handle_call({basic_get, ChPid, NoAck}, _From,
handle_call({basic_consume, NoAck, ReaderPid, ChPid, LimiterPid,
ConsumerTag, ExclusiveConsume, OkMsg},
- _From, State = #q{owner = Owner,
+ _From, State = #q{q = #amqqueue{exclusive_owner = Owner},
exclusive_consumer = ExistingHolder}) ->
case check_queue_owner(Owner, ReaderPid) of
mismatch ->
@@ -713,29 +716,6 @@ handle_call(purge, _From, State = #q{backing_queue = BQ,
{Count, BQS1} = BQ:purge(BQS),
reply({ok, Count}, State#q{backing_queue_state = BQS1});
-handle_call({claim_queue, ReaderPid}, _From,
- State = #q{owner = Owner, exclusive_consumer = Holder}) ->
- case Owner of
- none ->
- case check_exclusive_access(Holder, true, State) of
- in_use ->
- %% FIXME: Is this really the right answer? What if
- %% an active consumer's reader is actually the
- %% claiming pid? Should that be allowed? In order
- %% to check, we'd need to hold not just the ch
- %% pid for each consumer, but also its reader
- %% pid...
- reply(locked, State);
- ok ->
- MonitorRef = erlang:monitor(process, ReaderPid),
- reply(ok, State#q{owner = {ReaderPid, MonitorRef}})
- end;
- {ReaderPid, _MonitorRef} ->
- reply(ok, State);
- _ ->
- reply(locked, State)
- end;
-
handle_call({maybe_run_queue_via_backing_queue, Fun}, _From, State) ->
reply(ok, maybe_run_queue_via_backing_queue(Fun, State)).
@@ -825,19 +805,10 @@ handle_cast({set_maximum_since_use, Age}, State) ->
ok = file_handle_cache:set_maximum_since_use(Age),
noreply(State).
-handle_info({'DOWN', MonitorRef, process, DownPid, _Reason},
- State = #q{owner = {DownPid, MonitorRef}}) ->
- %% We know here that there are no consumers on this queue that are
- %% owned by other pids than the one that just went down, so since
- %% exclusive in some sense implies autodelete, we delete the queue
- %% here. The other way of implementing the "exclusive implies
- %% autodelete" feature is to actually set autodelete when an
- %% exclusive declaration is seen, but this has the problem that
- %% the python tests rely on the queue not going away after a
- %% basic.cancel when the queue was declared exclusive and
- %% nonautodelete.
- NewState = State#q{owner = none},
- {stop, normal, NewState};
+handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason},
+ State = #q{q= #amqqueue{ exclusive_owner = DownPid}}) ->
+ %% Exclusively owned queues must disappear with their owner.
+ {stop, normal, State};
handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) ->
case handle_ch_down(DownPid, State) of
{ok, NewState} -> noreply(NewState);
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index a48db9c8..5a06f513 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -683,25 +683,37 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
arguments = Args},
_, State = #ch { virtual_host = VHostPath,
reader_pid = ReaderPid }) ->
- %% FIXME: atomic create&claim
- Finish =
- fun (Q) ->
- if ExclusiveDeclare ->
- case rabbit_amqqueue:claim_queue(Q, ReaderPid) of
- locked ->
- %% AMQP 0-8 doesn't say which
- %% exception to use, so we mimic QPid
- %% here.
- rabbit_misc:protocol_error(
- resource_locked,
- "cannot obtain exclusive access to locked ~s",
- [rabbit_misc:rs(Q#amqqueue.name)]);
- ok -> ok
- end;
- true ->
- ok
- end,
- Q
+ Owner = case ExclusiveDeclare of
+ true -> ReaderPid;
+ false -> none
+ end,
+ %% We use this in both branches, because queue_declare may yet return an
+ %% existing queue.
+ Finish =
+ fun(Q) ->
+ case Q of
+ %% "equivalent" rule. NB: we don't pay attention to
+ %% anything in the arguments table, so for the sake of the
+ %% "equivalent" rule, all tables of arguments are
+ %% semantically equivalant.
+ Matched = #amqqueue{name = QueueName,
+ durable = Durable, %% i.e., as supplied
+ exclusive_owner = Owner,
+ auto_delete = AutoDelete %% i.e,. as supplied
+ } ->
+ check_configure_permitted(QueueName, State),
+ Matched;
+ %% exclusivity trumps non-equivalence arbitrarily
+ #amqqueue{name = QueueName, exclusive_owner = ExclusiveOwner}
+ when ExclusiveOwner =/= Owner ->
+ rabbit_misc:protocol_error(resource_locked,
+ "cannot obtain exclusive access to locked ~s",
+ [rabbit_misc:rs(QueueName)]);
+ #amqqueue{name = QueueName} ->
+ rabbit_misc:protocol_error(channel_error,
+ "parameters for ~s not equivalent",
+ [rabbit_misc:rs(QueueName)])
+ end
end,
Q = case rabbit_amqqueue:with(
rabbit_misc:r(VHostPath, queue, QueueNameBin),
@@ -714,21 +726,29 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
end,
QueueName = rabbit_misc:r(VHostPath, queue, ActualNameBin),
check_configure_permitted(QueueName, State),
- Finish(rabbit_amqqueue:declare(QueueName,
- Durable, AutoDelete, Args));
- Other = #amqqueue{name = QueueName} ->
- check_configure_permitted(QueueName, State),
- Other
+ Finish(rabbit_amqqueue:declare(QueueName, Durable, AutoDelete, Args, Owner));
+ Found -> Found
end,
return_queue_declare_ok(State, NoWait, Q);
handle_method(#'queue.declare'{queue = QueueNameBin,
passive = true,
nowait = NoWait},
- _, State = #ch{ virtual_host = VHostPath }) ->
+ _, State = #ch{ virtual_host = VHostPath,
+ reader_pid = ReaderPid }) ->
QueueName = rabbit_misc:r(VHostPath, queue, QueueNameBin),
check_configure_permitted(QueueName, State),
- Q = rabbit_amqqueue:with_or_die(QueueName, fun (Q) -> Q end),
+ CheckExclusive =
+ fun(Q) ->
+ case Q of
+ #amqqueue{ exclusive_owner = none} -> Q;
+ #amqqueue{ exclusive_owner = ReaderPid } -> Q;
+ _ -> rabbit_misc:protocol_error(resource_locked,
+ "cannot obtain exclusive access to locked ~s",
+ [rabbit_misc:rs(Q#amqqueue.name)])
+ end
+ end,
+ Q = rabbit_amqqueue:with_or_die(QueueName, CheckExclusive),
return_queue_declare_ok(State, NoWait, Q);
handle_method(#'queue.delete'{queue = QueueNameBin,
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 76ebd982..8064d06b 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -746,16 +746,14 @@ test_user_management() ->
passed.
test_server_status() ->
-
%% create a few things so there is some useful information to list
Writer = spawn(fun () -> receive shutdown -> ok end end),
Ch = rabbit_channel:start_link(1, self(), Writer, <<"user">>, <<"/">>),
[Q, Q2] = [#amqqueue{} = rabbit_amqqueue:declare(
rabbit_misc:r(<<"/">>, queue, Name),
- false, false, []) ||
+ false, false, [], none) ||
Name <- [<<"foo">>, <<"bar">>]],
- ok = rabbit_amqqueue:claim_queue(Q, self()),
ok = rabbit_amqqueue:basic_consume(Q, true, self(), Ch, undefined,
<<"ctag">>, true, undefined),