diff options
author | Matthew Sackman <matthew@rabbitmq.com> | 2010-06-29 16:07:32 +0100 |
---|---|---|
committer | Matthew Sackman <matthew@rabbitmq.com> | 2010-06-29 16:07:32 +0100 |
commit | 3e14d8d9d0ae179ac11a1ebb0b177830171593e2 (patch) | |
tree | 61a3a45a0e61487530df1767acedc4f0013a27da | |
parent | 1f1f7c80d75505c79912971d7b794a5add70b4a3 (diff) | |
download | rabbitmq-server-3e14d8d9d0ae179ac11a1ebb0b177830171593e2.tar.gz |
Rework queue declaration to avoid a race condition that could result in incorrect values in the queue.declare_ok
-rw-r--r-- | src/rabbit_amqqueue.erl | 2 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 4 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 89 |
3 files changed, 54 insertions, 41 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index eebcfcb9..f0e536b5 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -66,7 +66,7 @@ -spec(start/0 :: () -> 'ok'). -spec(declare/5 :: (queue_name(), boolean(), boolean(), amqp_table(), - maybe(pid())) -> amqqueue()). + maybe(pid())) -> {'new' | 'existing', amqqueue()}). -spec(lookup/1 :: (queue_name()) -> {'ok', amqqueue()} | not_found()). -spec(with/2 :: (queue_name(), qfun(A)) -> A | not_found()). -spec(with_or_die/2 :: (queue_name(), qfun(A)) -> A). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 5fdf0ffa..70e6e755 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -137,7 +137,7 @@ declare(Recover, From, backing_queue = BQ, backing_queue_state = undefined}) -> case rabbit_amqqueue:internal_declare(Q, Recover) of not_found -> {stop, normal, not_found, State}; - Q -> gen_server2:reply(From, Q), + Q -> gen_server2:reply(From, {new, Q}), ok = file_handle_cache:register_callback( rabbit_amqqueue, set_maximum_since_use, [self()]), @@ -146,7 +146,7 @@ declare(Recover, From, set_ram_duration_target, [self()]}), BQS = BQ:init(QName, IsDurable, Recover), noreply(State#q{backing_queue_state = BQS}); - Q1 -> {stop, normal, Q1, State} + Q1 -> {stop, normal, {existing, Q1}, State} end. terminate_shutdown(Fun, State) -> diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 8649ecc7..4a741be7 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -284,17 +284,23 @@ terminating(Reason, State = #ch{channel = Channel, reader_pid = Reader}) -> Reader ! {channel_exit, Channel, Reason}, State#ch{state = terminating}. -return_queue_declare_ok(State, NoWait, Q) -> - NewState = State#ch{most_recently_declared_queue = - (Q#amqqueue.name)#resource.name}, +return_queue_declare_ok(Q = #amqqueue{name = #resource{name = ActualName}}, + NoWait, MessageCount, ConsumerCount, Register, Owner, + State = #ch{queue_collector_pid = CollectorPid}) -> + %% We need to notify the reader within the channel process so that + %% we can be sure there are no outstanding exclusive queues being + %% declared as the connection shuts down. + ok = case Register andalso is_pid(Owner) of + true -> rabbit_reader_queue_collector:register_exclusive_queue( + CollectorPid, Q); + false -> ok + end, + NewState = State#ch{most_recently_declared_queue = ActualName}, case NoWait of - true -> {noreply, NewState}; + true -> + {noreply, NewState}; false -> - {ok, ActualName, MessageCount, ConsumerCount} = - rabbit_misc:with_exit_handler( - fun () -> {ok, Q#amqqueue.name, 0, 0} end, - fun () -> rabbit_amqqueue:stat(Q) end), - Reply = #'queue.declare_ok'{queue = ActualName#resource.name, + Reply = #'queue.declare_ok'{queue = ActualName, message_count = MessageCount, consumer_count = ConsumerCount}, {reply, Reply, NewState} @@ -716,10 +722,9 @@ handle_method(#'queue.declare'{queue = QueueNameBin, exclusive = ExclusiveDeclare, auto_delete = AutoDelete, nowait = NoWait, - arguments = Args}, + arguments = Args} = Declare, _, State = #ch{virtual_host = VHostPath, - reader_pid = ReaderPid, - queue_collector_pid = CollectorPid}) -> + reader_pid = ReaderPid}) -> Owner = case ExclusiveDeclare of true -> ReaderPid; false -> none @@ -731,30 +736,33 @@ handle_method(#'queue.declare'{queue = QueueNameBin, Other -> check_name('queue', Other) end, QueueName = rabbit_misc:r(VHostPath, queue, ActualNameBin), - Q = case rabbit_amqqueue:declare(QueueName, Durable, AutoDelete, - Args, Owner) of - #amqqueue{name = QueueName, - durable = Durable1, - auto_delete = AutoDelete1} = Q1 - when Durable =:= Durable1, AutoDelete =:= AutoDelete1 -> - check_exclusive_access(Q1, Owner, strict), - check_configure_permitted(QueueName, State), - %% We need to notify the reader within the channel - %% process so that we can be sure there are no - %% outstanding exclusive queues being declared as the - %% connection shuts down. - case Owner of - none -> ok; - _ -> ok = rabbit_reader_queue_collector:register_exclusive_queue(CollectorPid, Q1) - end, - Q1; - %% non-equivalence trumps exclusivity arbitrarily - #amqqueue{name = QueueName} -> - rabbit_misc:protocol_error( - precondition_failed, "parameters for ~s not equivalent", - [rabbit_misc:rs(QueueName)]) - end, - return_queue_declare_ok(State, NoWait, Q); + case rabbit_amqqueue:with(QueueName, + fun (Q) -> {rabbit_amqqueue:stat(Q), Q} end) of + {{ok, _ActualName, MessageCount1, ConsumerCount1}, + #amqqueue{name = QueueName, durable = Durable1, + auto_delete = AutoDelete1} = Q1} + when Durable =:= Durable1, AutoDelete =:= AutoDelete1 -> + check_exclusive_access(Q1, Owner, strict), + check_configure_permitted(QueueName, State), + return_queue_declare_ok(Q1, NoWait, MessageCount1, ConsumerCount1, + false, Owner, State); + {{ok, _ActualName, _MessageCount1, _ConsumerCount1}, + #amqqueue{name = QueueName}} -> + rabbit_misc:protocol_error( + precondition_failed, "parameters for ~s not equivalent", + [rabbit_misc:rs(QueueName)]); + {error, not_found} -> + case rabbit_amqqueue:declare(QueueName, Durable, AutoDelete, + Args, Owner) of + {new, Q1 = #amqqueue{}} -> + return_queue_declare_ok(Q1, NoWait, 0, 0, true, Owner, + State); + {existing, _Q1} -> + %% must have been created between the stat and the + %% declare. Loop around again. + handle_method(Declare, undefined, State) + end + end; handle_method(#'queue.declare'{queue = QueueNameBin, passive = true, @@ -762,9 +770,14 @@ handle_method(#'queue.declare'{queue = QueueNameBin, _, State = #ch{virtual_host = VHostPath, reader_pid = ReaderPid}) -> QueueName = rabbit_misc:r(VHostPath, queue, QueueNameBin), + {{ok, _ActualName, MessageCount, ConsumerCount}, + #amqqueue{name = QueueName} = Q} = + rabbit_amqqueue:with_or_die( + QueueName, fun (Q1) -> {rabbit_amqqueue:stat(Q1), Q1} end), check_configure_permitted(QueueName, State), - Q = with_exclusive_access_or_die(QueueName, ReaderPid, fun (Q) -> Q end), - return_queue_declare_ok(State, NoWait, Q); + check_exclusive_access(Q, ReaderPid, lax), + return_queue_declare_ok(Q, NoWait, MessageCount, ConsumerCount, + false, none, State); handle_method(#'queue.delete'{queue = QueueNameBin, if_unused = IfUnused, |