summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2010-06-29 16:07:32 +0100
committerMatthew Sackman <matthew@rabbitmq.com>2010-06-29 16:07:32 +0100
commit3e14d8d9d0ae179ac11a1ebb0b177830171593e2 (patch)
tree61a3a45a0e61487530df1767acedc4f0013a27da
parent1f1f7c80d75505c79912971d7b794a5add70b4a3 (diff)
downloadrabbitmq-server-3e14d8d9d0ae179ac11a1ebb0b177830171593e2.tar.gz
Rework queue declaration to avoid a race condition that could result in incorrect values in the queue.declare_ok
-rw-r--r--src/rabbit_amqqueue.erl2
-rw-r--r--src/rabbit_amqqueue_process.erl4
-rw-r--r--src/rabbit_channel.erl89
3 files changed, 54 insertions, 41 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index eebcfcb9..f0e536b5 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -66,7 +66,7 @@
-spec(start/0 :: () -> 'ok').
-spec(declare/5 :: (queue_name(), boolean(), boolean(), amqp_table(),
- maybe(pid())) -> amqqueue()).
+ maybe(pid())) -> {'new' | 'existing', amqqueue()}).
-spec(lookup/1 :: (queue_name()) -> {'ok', amqqueue()} | not_found()).
-spec(with/2 :: (queue_name(), qfun(A)) -> A | not_found()).
-spec(with_or_die/2 :: (queue_name(), qfun(A)) -> A).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 5fdf0ffa..70e6e755 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -137,7 +137,7 @@ declare(Recover, From,
backing_queue = BQ, backing_queue_state = undefined}) ->
case rabbit_amqqueue:internal_declare(Q, Recover) of
not_found -> {stop, normal, not_found, State};
- Q -> gen_server2:reply(From, Q),
+ Q -> gen_server2:reply(From, {new, Q}),
ok = file_handle_cache:register_callback(
rabbit_amqqueue, set_maximum_since_use,
[self()]),
@@ -146,7 +146,7 @@ declare(Recover, From,
set_ram_duration_target, [self()]}),
BQS = BQ:init(QName, IsDurable, Recover),
noreply(State#q{backing_queue_state = BQS});
- Q1 -> {stop, normal, Q1, State}
+ Q1 -> {stop, normal, {existing, Q1}, State}
end.
terminate_shutdown(Fun, State) ->
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 8649ecc7..4a741be7 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -284,17 +284,23 @@ terminating(Reason, State = #ch{channel = Channel, reader_pid = Reader}) ->
Reader ! {channel_exit, Channel, Reason},
State#ch{state = terminating}.
-return_queue_declare_ok(State, NoWait, Q) ->
- NewState = State#ch{most_recently_declared_queue =
- (Q#amqqueue.name)#resource.name},
+return_queue_declare_ok(Q = #amqqueue{name = #resource{name = ActualName}},
+ NoWait, MessageCount, ConsumerCount, Register, Owner,
+ State = #ch{queue_collector_pid = CollectorPid}) ->
+ %% We need to notify the reader within the channel process so that
+ %% we can be sure there are no outstanding exclusive queues being
+ %% declared as the connection shuts down.
+ ok = case Register andalso is_pid(Owner) of
+ true -> rabbit_reader_queue_collector:register_exclusive_queue(
+ CollectorPid, Q);
+ false -> ok
+ end,
+ NewState = State#ch{most_recently_declared_queue = ActualName},
case NoWait of
- true -> {noreply, NewState};
+ true ->
+ {noreply, NewState};
false ->
- {ok, ActualName, MessageCount, ConsumerCount} =
- rabbit_misc:with_exit_handler(
- fun () -> {ok, Q#amqqueue.name, 0, 0} end,
- fun () -> rabbit_amqqueue:stat(Q) end),
- Reply = #'queue.declare_ok'{queue = ActualName#resource.name,
+ Reply = #'queue.declare_ok'{queue = ActualName,
message_count = MessageCount,
consumer_count = ConsumerCount},
{reply, Reply, NewState}
@@ -716,10 +722,9 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
exclusive = ExclusiveDeclare,
auto_delete = AutoDelete,
nowait = NoWait,
- arguments = Args},
+ arguments = Args} = Declare,
_, State = #ch{virtual_host = VHostPath,
- reader_pid = ReaderPid,
- queue_collector_pid = CollectorPid}) ->
+ reader_pid = ReaderPid}) ->
Owner = case ExclusiveDeclare of
true -> ReaderPid;
false -> none
@@ -731,30 +736,33 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
Other -> check_name('queue', Other)
end,
QueueName = rabbit_misc:r(VHostPath, queue, ActualNameBin),
- Q = case rabbit_amqqueue:declare(QueueName, Durable, AutoDelete,
- Args, Owner) of
- #amqqueue{name = QueueName,
- durable = Durable1,
- auto_delete = AutoDelete1} = Q1
- when Durable =:= Durable1, AutoDelete =:= AutoDelete1 ->
- check_exclusive_access(Q1, Owner, strict),
- check_configure_permitted(QueueName, State),
- %% We need to notify the reader within the channel
- %% process so that we can be sure there are no
- %% outstanding exclusive queues being declared as the
- %% connection shuts down.
- case Owner of
- none -> ok;
- _ -> ok = rabbit_reader_queue_collector:register_exclusive_queue(CollectorPid, Q1)
- end,
- Q1;
- %% non-equivalence trumps exclusivity arbitrarily
- #amqqueue{name = QueueName} ->
- rabbit_misc:protocol_error(
- precondition_failed, "parameters for ~s not equivalent",
- [rabbit_misc:rs(QueueName)])
- end,
- return_queue_declare_ok(State, NoWait, Q);
+ case rabbit_amqqueue:with(QueueName,
+ fun (Q) -> {rabbit_amqqueue:stat(Q), Q} end) of
+ {{ok, _ActualName, MessageCount1, ConsumerCount1},
+ #amqqueue{name = QueueName, durable = Durable1,
+ auto_delete = AutoDelete1} = Q1}
+ when Durable =:= Durable1, AutoDelete =:= AutoDelete1 ->
+ check_exclusive_access(Q1, Owner, strict),
+ check_configure_permitted(QueueName, State),
+ return_queue_declare_ok(Q1, NoWait, MessageCount1, ConsumerCount1,
+ false, Owner, State);
+ {{ok, _ActualName, _MessageCount1, _ConsumerCount1},
+ #amqqueue{name = QueueName}} ->
+ rabbit_misc:protocol_error(
+ precondition_failed, "parameters for ~s not equivalent",
+ [rabbit_misc:rs(QueueName)]);
+ {error, not_found} ->
+ case rabbit_amqqueue:declare(QueueName, Durable, AutoDelete,
+ Args, Owner) of
+ {new, Q1 = #amqqueue{}} ->
+ return_queue_declare_ok(Q1, NoWait, 0, 0, true, Owner,
+ State);
+ {existing, _Q1} ->
+ %% must have been created between the stat and the
+ %% declare. Loop around again.
+ handle_method(Declare, undefined, State)
+ end
+ end;
handle_method(#'queue.declare'{queue = QueueNameBin,
passive = true,
@@ -762,9 +770,14 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
_, State = #ch{virtual_host = VHostPath,
reader_pid = ReaderPid}) ->
QueueName = rabbit_misc:r(VHostPath, queue, QueueNameBin),
+ {{ok, _ActualName, MessageCount, ConsumerCount},
+ #amqqueue{name = QueueName} = Q} =
+ rabbit_amqqueue:with_or_die(
+ QueueName, fun (Q1) -> {rabbit_amqqueue:stat(Q1), Q1} end),
check_configure_permitted(QueueName, State),
- Q = with_exclusive_access_or_die(QueueName, ReaderPid, fun (Q) -> Q end),
- return_queue_declare_ok(State, NoWait, Q);
+ check_exclusive_access(Q, ReaderPid, lax),
+ return_queue_declare_ok(Q, NoWait, MessageCount, ConsumerCount,
+ false, none, State);
handle_method(#'queue.delete'{queue = QueueNameBin,
if_unused = IfUnused,