diff options
author | Matthew Sackman <matthew@lshift.net> | 2010-05-07 15:40:14 +0100 |
---|---|---|
committer | Matthew Sackman <matthew@lshift.net> | 2010-05-07 15:40:14 +0100 |
commit | 1bb9b955955e5a31708b2a5cd1466f1d9b1bd24c (patch) | |
tree | 334009a6fb705ff50d51933012c6289760ac7366 | |
parent | 70ec97af43e9d272b96d8970eef73c837146db95 (diff) | |
download | rabbitmq-server-1bb9b955955e5a31708b2a5cd1466f1d9b1bd24c.tar.gz |
Move the internal declare step into the queue process side of things
-rw-r--r-- | src/rabbit_amqqueue.erl | 63 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 36 |
2 files changed, 55 insertions, 44 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 41799a92..2af67736 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -109,7 +109,7 @@ -spec(notify_sent/2 :: (pid(), pid()) -> 'ok'). -spec(unblock/2 :: (pid(), pid()) -> 'ok'). -spec(flush_all/2 :: ([pid()], pid()) -> 'ok'). --spec(internal_declare/2 :: (amqqueue(), boolean()) -> amqqueue()). +-spec(internal_declare/2 :: (amqqueue(), boolean()) -> amqqueue() | 'not_found'). -spec(internal_delete/1 :: (queue_name()) -> 'ok' | not_found()). -spec(maybe_run_queue_via_backing_queue/2 :: (pid(), (fun ((A) -> A))) -> 'ok'). -spec(update_ram_duration/1 :: (pid()) -> 'ok'). @@ -147,11 +147,8 @@ find_durable_queues() -> recover_durable_queues(DurableQueues) -> Qs = [start_queue_process(Q) || Q <- DurableQueues], %% Issue inits to *all* the queues so that they all init at the same time - [ok = gen_server2:cast(Q#amqqueue.pid, {init, true}) || Q <- Qs], - [ok = gen_server2:call(Q#amqqueue.pid, sync, infinity) || Q <- Qs], - rabbit_misc:execute_mnesia_transaction( - fun () -> [ok = store_queue(Q) || Q <- Qs] end), - Qs. + [Q || Q <- Qs, + gen_server2:call(Q#amqqueue.pid, {init, true}) == Q]. declare(QueueName, Durable, AutoDelete, Args) -> Q = start_queue_process(#amqqueue{name = QueueName, @@ -159,36 +156,34 @@ declare(QueueName, Durable, AutoDelete, Args) -> auto_delete = AutoDelete, arguments = Args, pid = none}), - ok = gen_server2:cast(Q#amqqueue.pid, {init, false}), - ok = gen_server2:call(Q#amqqueue.pid, sync, infinity), - internal_declare(Q, true). - -internal_declare(Q = #amqqueue{name = QueueName}, WantDefaultBinding) -> - case rabbit_misc:execute_mnesia_transaction( - fun () -> - case mnesia:wread({rabbit_queue, QueueName}) of - [] -> - case mnesia:read( - {rabbit_durable_queue, QueueName}) of - [] -> ok = store_queue(Q), - case WantDefaultBinding of - true -> add_default_binding(Q); - false -> ok - end, - Q; - [_] -> not_found %% existing Q on stopped node - end; - [ExistingQ] -> - ExistingQ - end - end) of - not_found -> exit(Q#amqqueue.pid, shutdown), - rabbit_misc:not_found(QueueName); - Q -> Q; - ExistingQ -> exit(Q#amqqueue.pid, shutdown), - ExistingQ + case gen_server2:call(Q#amqqueue.pid, {init, false}) of + not_found -> rabbit_misc:not_found(QueueName); + Q1 -> Q1 end. +internal_declare(Q = #amqqueue{name = QueueName}, Recover) -> + rabbit_misc:execute_mnesia_transaction( + fun () -> + case Recover of + true -> + ok = store_queue(Q), + Q; + false -> + case mnesia:wread({rabbit_queue, QueueName}) of + [] -> + case mnesia:read({rabbit_durable_queue, + QueueName}) of + [] -> ok = store_queue(Q), + ok = add_default_binding(Q), + Q; + [_] -> not_found %% Q exists on stopped node + end; + [ExistingQ] -> + ExistingQ + end + end + end). + store_queue(Q = #amqqueue{durable = true}) -> ok = mnesia:write(rabbit_durable_queue, Q, write), ok = mnesia:write(rabbit_queue, Q, write), diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 06712e9c..a4f9f3f6 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -103,7 +103,7 @@ init(Q) -> process_flag(trap_exit, true), {ok, BQ} = application:get_env(backing_queue_module), - {ok, #q{q = Q, + {ok, #q{q = Q#amqqueue{pid = self()}, owner = none, exclusive_consumer = none, has_had_consumers = false, @@ -121,9 +121,13 @@ terminate({shutdown, _}, State = #q{backing_queue = BQ}) -> terminate_shutdown(fun (BQS) -> BQ:terminate(BQS) end, State); terminate(_Reason, State = #q{backing_queue = BQ}) -> %% FIXME: How do we cancel active subscriptions? - State1 = terminate_shutdown(fun (BQS) -> BQ:delete_and_terminate(BQS) end, - State), - ok = rabbit_amqqueue:internal_delete(qname(State1)). + terminate_shutdown(fun (BQS) -> + BQS1 = BQ:delete_and_terminate(BQS), + %% don't care if the internal delete + %% doesn't return 'ok'. + rabbit_amqqueue:internal_delete(qname(State)), + BQS1 + end, State). code_change(_OldVsn, State, _Extra) -> {ok, State}. @@ -133,10 +137,10 @@ code_change(_OldVsn, State, _Extra) -> terminate_shutdown(Fun, State) -> State1 = #q{backing_queue = BQ, backing_queue_state = BQS} = stop_sync_timer(stop_rate_timer(State)), + ok = rabbit_memory_monitor:deregister(self()), case BQS of undefined -> State; - _ -> ok = rabbit_memory_monitor:deregister(self()), - BQS1 = lists:foldl( + _ -> BQS1 = lists:foldl( fun (#cr{txn = none}, BQSN) -> BQSN; (#cr{txn = Txn}, BQSN) -> @@ -714,16 +718,28 @@ handle_call({claim_queue, ReaderPid}, _From, end; handle_call({maybe_run_queue_via_backing_queue, Fun}, _From, State) -> - reply(ok, maybe_run_queue_via_backing_queue(Fun, State)). + reply(ok, maybe_run_queue_via_backing_queue(Fun, State)); -handle_cast({init, Recover}, - State = #q{q = #amqqueue{name = QName, durable = IsDurable}, +handle_call({init, Recover}, From, + State = #q{q = Q = #amqqueue{name = QName, durable = IsDurable}, backing_queue = BQ, backing_queue_state = undefined}) -> ok = file_handle_cache:register_callback( rabbit_amqqueue, set_maximum_since_use, [self()]), ok = rabbit_memory_monitor:register( self(), {rabbit_amqqueue, set_ram_duration_target, [self()]}), - noreply(State#q{backing_queue_state = BQ:init(QName, IsDurable, Recover)}); + %% TODO: If we're exclusively owned && our owner isn't alive && + %% Recover then we should BQ:init and then {stop, normal, + %% not_found, State}, relying on terminate to delete the queue. + case rabbit_amqqueue:internal_declare(Q, Recover) of + not_found -> + {stop, normal, not_found, State}; + Q -> + gen_server2:reply(From, Q), + noreply(State#q{backing_queue_state = + BQ:init(QName, IsDurable, Recover)}); + Q1 -> + {stop, normal, Q1, State} + end. handle_cast({deliver, Txn, Message, ChPid}, State) -> %% Asynchronous, non-"mandatory", non-"immediate" deliver mode. |