summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichael Bridgen <mikeb@lshift.net>2009-10-22 17:17:05 +0100
committerMichael Bridgen <mikeb@lshift.net>2009-10-22 17:17:05 +0100
commitc44103847c904f85fc0070fd3f1d93e966286e78 (patch)
treec017e65845abe6792b3bb0c50617da7e59b26de1
parentd94469f247495de0e455889a759eca8df43d2e03 (diff)
downloadrabbitmq-server-c44103847c904f85fc0070fd3f1d93e966286e78.tar.gz
- keep track of the owner in the amqqueue record. We need this so we
can check equivalence on queue.declare - speaking of which, implement the "equivalent" rule of queue.declare (bug 21832) - queues are exclusive on creation, and don't change owner, so get rid of claim_queue - also check exclusive in the passive version of queue.declare; this is explicitly mandated by the spec (queue.declare, exclusive) - bug 21385: exclusive queues are deleted when their owners go away, as per the spec. This didn't actually change implementation -- it was wrong for 0-8
-rw-r--r--include/rabbit.hrl13
-rw-r--r--src/rabbit_amqqueue.erl12
-rw-r--r--src/rabbit_amqqueue_process.erl55
-rw-r--r--src/rabbit_channel.erl72
4 files changed, 70 insertions, 82 deletions
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index c94965f9..f8ff4778 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -51,7 +51,7 @@
-record(exchange, {name, type, durable, arguments}).
--record(amqqueue, {name, durable, auto_delete, arguments, pid}).
+-record(amqqueue, {name, durable, auto_delete, exclusive_owner = none, arguments, pid}).
%% mnesia doesn't like unary records, so we add a dummy 'value' field
-record(route, {binding, value = const}).
@@ -102,11 +102,12 @@
write :: regexp(),
read :: regexp()}).
-type(amqqueue() ::
- #amqqueue{name :: queue_name(),
- durable :: boolean(),
- auto_delete :: boolean(),
- arguments :: amqp_table(),
- pid :: maybe(pid())}).
+ #amqqueue{name :: queue_name(),
+ durable :: boolean(),
+ auto_delete :: boolean(),
+ exclusive_owner :: maybe(pid()),
+ arguments :: amqp_table(),
+ pid :: maybe(pid())}).
-type(exchange() ::
#exchange{name :: exchange_name(),
type :: exchange_type(),
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 1a5e82d7..52c54754 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -31,13 +31,12 @@
-module(rabbit_amqqueue).
--export([start/0, recover/0, declare/4, delete/3, purge/1]).
+-export([start/0, recover/0, declare/5, delete/3, purge/1]).
-export([internal_declare/2, internal_delete/1]).
-export([pseudo_queue/2]).
-export([lookup/1, with/2, with_or_die/2,
stat/1, stat_all/0, deliver/2, redeliver/2, requeue/3, ack/4]).
-export([list/1, info/1, info/2, info_all/1, info_all/2]).
--export([claim_queue/2]).
-export([basic_get/3, basic_consume/8, basic_cancel/4]).
-export([notify_sent/2, unblock/2]).
-export([commit_all/2, rollback_all/2, notify_down_all/2, limit_all/3]).
@@ -63,7 +62,7 @@
-spec(start/0 :: () -> 'ok').
-spec(recover/0 :: () -> 'ok').
--spec(declare/4 :: (queue_name(), boolean(), boolean(), amqp_table()) ->
+-spec(declare/5 :: (queue_name(), boolean(), boolean(), amqp_table(), maybe(pid())) ->
amqqueue()).
-spec(lookup/1 :: (queue_name()) -> {'ok', amqqueue()} | not_found()).
-spec(with/2 :: (queue_name(), qfun(A)) -> A | not_found()).
@@ -91,7 +90,6 @@
-spec(rollback_all/2 :: ([pid()], txn()) -> ok_or_errors()).
-spec(notify_down_all/2 :: ([pid()], pid()) -> ok_or_errors()).
-spec(limit_all/3 :: ([pid()], pid(), pid() | 'undefined') -> ok_or_errors()).
--spec(claim_queue/2 :: (amqqueue(), pid()) -> 'ok' | 'locked').
-spec(basic_get/3 :: (amqqueue(), pid(), boolean()) ->
{'ok', non_neg_integer(), msg()} | 'empty').
-spec(basic_consume/8 ::
@@ -151,11 +149,12 @@ recover_durable_queues() ->
end)),
ok.
-declare(QueueName, Durable, AutoDelete, Args) ->
+declare(QueueName, Durable, AutoDelete, Args, Owner) ->
Q = start_queue_process(#amqqueue{name = QueueName,
durable = Durable,
auto_delete = AutoDelete,
arguments = Args,
+ exclusive_owner = Owner,
pid = none}),
internal_declare(Q, true).
@@ -286,9 +285,6 @@ limit_all(QPids, ChPid, LimiterPid) ->
fun (QPid) -> gen_server2:cast(QPid, {limit, ChPid, LimiterPid}) end,
QPids).
-claim_queue(#amqqueue{pid = QPid}, ReaderPid) ->
- gen_server2:call(QPid, {claim_queue, ReaderPid}, infinity).
-
basic_get(#amqqueue{pid = QPid}, ChPid, NoAck) ->
gen_server2:call(QPid, {basic_get, ChPid, NoAck}, infinity).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index fe2e8509..3a867f86 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -49,7 +49,6 @@
% Queue's state
-record(q, {q,
- owner,
exclusive_consumer,
has_had_consumers,
next_msg_id,
@@ -95,8 +94,11 @@ start_link(Q) ->
init(Q) ->
?LOGDEBUG("Queue starting - ~p~n", [Q]),
+ case Q#amqqueue.exclusive_owner of
+ none -> ok;
+ ReaderPid -> erlang:monitor(process, ReaderPid)
+ end,
{ok, #q{q = Q,
- owner = none,
exclusive_consumer = none,
has_had_consumers = false,
next_msg_id = 1,
@@ -331,9 +333,9 @@ cancel_holder(ChPid, ConsumerTag, {ChPid, ConsumerTag}) ->
cancel_holder(_ChPid, _ConsumerTag, Holder) ->
Holder.
-check_queue_owner(none, _) -> ok;
-check_queue_owner({ReaderPid, _}, ReaderPid) -> ok;
-check_queue_owner({_, _}, _) -> mismatch.
+check_queue_owner(none, _) -> ok;
+check_queue_owner(ReaderPid, ReaderPid) -> ok;
+check_queue_owner(_, _) -> mismatch.
check_exclusive_access({_ChPid, _ConsumerTag}, _ExclusiveConsume, _State) ->
in_use;
@@ -607,7 +609,7 @@ handle_call({basic_get, ChPid, NoAck}, _From,
handle_call({basic_consume, NoAck, ReaderPid, ChPid, LimiterPid,
ConsumerTag, ExclusiveConsume, OkMsg},
- _From, State = #q{owner = Owner,
+ _From, State = #q{q = #amqqueue{exclusive_owner = Owner},
exclusive_consumer = ExistingHolder}) ->
case check_queue_owner(Owner, ReaderPid) of
mismatch ->
@@ -705,29 +707,7 @@ handle_call({delete, IfUnused, IfEmpty}, _From,
handle_call(purge, _From, State = #q{message_buffer = MessageBuffer}) ->
ok = purge_message_buffer(qname(State), MessageBuffer),
reply({ok, queue:len(MessageBuffer)},
- State#q{message_buffer = queue:new()});
-
-handle_call({claim_queue, ReaderPid}, _From, State = #q{owner = Owner,
- exclusive_consumer = Holder}) ->
- case Owner of
- none ->
- case check_exclusive_access(Holder, true, State) of
- in_use ->
- %% FIXME: Is this really the right answer? What if
- %% an active consumer's reader is actually the
- %% claiming pid? Should that be allowed? In order
- %% to check, we'd need to hold not just the ch
- %% pid for each consumer, but also its reader
- %% pid...
- reply(locked, State);
- ok ->
- reply(ok, State#q{owner = {ReaderPid, erlang:monitor(process, ReaderPid)}})
- end;
- {ReaderPid, _MonitorRef} ->
- reply(ok, State);
- _ ->
- reply(locked, State)
- end.
+ State#q{message_buffer = queue:new()}).
handle_cast({deliver, Txn, Message, ChPid}, State) ->
%% Asynchronous, non-"mandatory", non-"immediate" deliver mode.
@@ -799,19 +779,10 @@ handle_cast({limit, ChPid, LimiterPid}, State) ->
C#cr{limiter_pid = LimiterPid, is_limit_active = NewLimited}
end)).
-handle_info({'DOWN', MonitorRef, process, DownPid, _Reason},
- State = #q{owner = {DownPid, MonitorRef}}) ->
- %% We know here that there are no consumers on this queue that are
- %% owned by other pids than the one that just went down, so since
- %% exclusive in some sense implies autodelete, we delete the queue
- %% here. The other way of implementing the "exclusive implies
- %% autodelete" feature is to actually set autodelete when an
- %% exclusive declaration is seen, but this has the problem that
- %% the python tests rely on the queue not going away after a
- %% basic.cancel when the queue was declared exclusive and
- %% nonautodelete.
- NewState = State#q{owner = none},
- {stop, normal, NewState};
+handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason},
+ State = #q{q= #amqqueue{ exclusive_owner = DownPid}}) ->
+ %% Exclusively owned queues must disappear with their owner.
+ {stop, normal, State};
handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) ->
handle_ch_down(DownPid, State);
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 26db0777..e54ba4ed 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -617,25 +617,37 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
arguments = Args},
_, State = #ch { virtual_host = VHostPath,
reader_pid = ReaderPid }) ->
- %% FIXME: atomic create&claim
- Finish =
- fun (Q) ->
- if ExclusiveDeclare ->
- case rabbit_amqqueue:claim_queue(Q, ReaderPid) of
- locked ->
- %% AMQP 0-8 doesn't say which
- %% exception to use, so we mimic QPid
- %% here.
- rabbit_misc:protocol_error(
- resource_locked,
- "cannot obtain exclusive access to locked ~s",
- [rabbit_misc:rs(Q#amqqueue.name)]);
- ok -> ok
- end;
- true ->
- ok
- end,
- Q
+ Owner = case ExclusiveDeclare of
+ true -> ReaderPid;
+ false -> none
+ end,
+ %% We use this in both branches, because queue_declare may yet return an
+ %% existing queue.
+ Finish =
+ fun(Q) ->
+ case Q of
+ %% "equivalent" rule. NB: we don't pay attention to
+ %% anything in the arguments table, so for the sake of the
+ %% "equivalent" rule, all tables of arguments are
+ %% semantically equivalant.
+ Matched = #amqqueue{name = QueueName,
+ durable = Durable, %% i.e., as supplied
+ exclusive_owner = Owner,
+ auto_delete = AutoDelete %% i.e,. as supplied
+ } ->
+ check_configure_permitted(QueueName, State),
+ Matched;
+ %% exclusivity trumps non-equivalence arbitrarily
+ #amqqueue{name = QueueName, exclusive_owner = ExclusiveOwner}
+ when ExclusiveOwner =/= Owner ->
+ rabbit_misc:protocol_error(resource_locked,
+ "cannot obtain exclusive access to locked ~s",
+ [rabbit_misc:rs(QueueName)]);
+ #amqqueue{name = QueueName} ->
+ rabbit_misc:protocol_error(channel_error,
+ "parameters for ~s not equivalent",
+ [rabbit_misc:rs(QueueName)])
+ end
end,
Q = case rabbit_amqqueue:with(
rabbit_misc:r(VHostPath, queue, QueueNameBin),
@@ -648,21 +660,29 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
end,
QueueName = rabbit_misc:r(VHostPath, queue, ActualNameBin),
check_configure_permitted(QueueName, State),
- Finish(rabbit_amqqueue:declare(QueueName,
- Durable, AutoDelete, Args));
- Other = #amqqueue{name = QueueName} ->
- check_configure_permitted(QueueName, State),
- Other
+ Finish(rabbit_amqqueue:declare(QueueName, Durable, AutoDelete, Args, Owner));
+ Found -> Found
end,
return_queue_declare_ok(State, NoWait, Q);
handle_method(#'queue.declare'{queue = QueueNameBin,
passive = true,
nowait = NoWait},
- _, State = #ch{ virtual_host = VHostPath }) ->
+ _, State = #ch{ virtual_host = VHostPath,
+ reader_pid = ReaderPid }) ->
QueueName = rabbit_misc:r(VHostPath, queue, QueueNameBin),
check_configure_permitted(QueueName, State),
- Q = rabbit_amqqueue:with_or_die(QueueName, fun (Q) -> Q end),
+ CheckExclusive =
+ fun(Q) ->
+ case Q of
+ #amqqueue{ exclusive_owner = none} -> Q;
+ #amqqueue{ exclusive_owner = ReaderPid } -> Q;
+ _ -> rabbit_misc:protocol_error(resource_locked,
+ "cannot obtain exclusive access to locked ~s",
+ [rabbit_misc:rs(Q#amqqueue.name)])
+ end
+ end,
+ Q = rabbit_amqqueue:with_or_die(QueueName, CheckExclusive),
return_queue_declare_ok(State, NoWait, Q);
handle_method(#'queue.delete'{queue = QueueNameBin,