summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2010-05-07 15:40:14 +0100
committerMatthew Sackman <matthew@lshift.net>2010-05-07 15:40:14 +0100
commit1bb9b955955e5a31708b2a5cd1466f1d9b1bd24c (patch)
tree334009a6fb705ff50d51933012c6289760ac7366
parent70ec97af43e9d272b96d8970eef73c837146db95 (diff)
downloadrabbitmq-server-1bb9b955955e5a31708b2a5cd1466f1d9b1bd24c.tar.gz
Move the internal declare step into the queue process side of things
-rw-r--r--src/rabbit_amqqueue.erl63
-rw-r--r--src/rabbit_amqqueue_process.erl36
2 files changed, 55 insertions, 44 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 41799a92..2af67736 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -109,7 +109,7 @@
-spec(notify_sent/2 :: (pid(), pid()) -> 'ok').
-spec(unblock/2 :: (pid(), pid()) -> 'ok').
-spec(flush_all/2 :: ([pid()], pid()) -> 'ok').
--spec(internal_declare/2 :: (amqqueue(), boolean()) -> amqqueue()).
+-spec(internal_declare/2 :: (amqqueue(), boolean()) -> amqqueue() | 'not_found').
-spec(internal_delete/1 :: (queue_name()) -> 'ok' | not_found()).
-spec(maybe_run_queue_via_backing_queue/2 :: (pid(), (fun ((A) -> A))) -> 'ok').
-spec(update_ram_duration/1 :: (pid()) -> 'ok').
@@ -147,11 +147,8 @@ find_durable_queues() ->
recover_durable_queues(DurableQueues) ->
Qs = [start_queue_process(Q) || Q <- DurableQueues],
%% Issue inits to *all* the queues so that they all init at the same time
- [ok = gen_server2:cast(Q#amqqueue.pid, {init, true}) || Q <- Qs],
- [ok = gen_server2:call(Q#amqqueue.pid, sync, infinity) || Q <- Qs],
- rabbit_misc:execute_mnesia_transaction(
- fun () -> [ok = store_queue(Q) || Q <- Qs] end),
- Qs.
+ [Q || Q <- Qs,
+ gen_server2:call(Q#amqqueue.pid, {init, true}) == Q].
declare(QueueName, Durable, AutoDelete, Args) ->
Q = start_queue_process(#amqqueue{name = QueueName,
@@ -159,36 +156,34 @@ declare(QueueName, Durable, AutoDelete, Args) ->
auto_delete = AutoDelete,
arguments = Args,
pid = none}),
- ok = gen_server2:cast(Q#amqqueue.pid, {init, false}),
- ok = gen_server2:call(Q#amqqueue.pid, sync, infinity),
- internal_declare(Q, true).
-
-internal_declare(Q = #amqqueue{name = QueueName}, WantDefaultBinding) ->
- case rabbit_misc:execute_mnesia_transaction(
- fun () ->
- case mnesia:wread({rabbit_queue, QueueName}) of
- [] ->
- case mnesia:read(
- {rabbit_durable_queue, QueueName}) of
- [] -> ok = store_queue(Q),
- case WantDefaultBinding of
- true -> add_default_binding(Q);
- false -> ok
- end,
- Q;
- [_] -> not_found %% existing Q on stopped node
- end;
- [ExistingQ] ->
- ExistingQ
- end
- end) of
- not_found -> exit(Q#amqqueue.pid, shutdown),
- rabbit_misc:not_found(QueueName);
- Q -> Q;
- ExistingQ -> exit(Q#amqqueue.pid, shutdown),
- ExistingQ
+ case gen_server2:call(Q#amqqueue.pid, {init, false}) of
+ not_found -> rabbit_misc:not_found(QueueName);
+ Q1 -> Q1
end.
+internal_declare(Q = #amqqueue{name = QueueName}, Recover) ->
+ rabbit_misc:execute_mnesia_transaction(
+ fun () ->
+ case Recover of
+ true ->
+ ok = store_queue(Q),
+ Q;
+ false ->
+ case mnesia:wread({rabbit_queue, QueueName}) of
+ [] ->
+ case mnesia:read({rabbit_durable_queue,
+ QueueName}) of
+ [] -> ok = store_queue(Q),
+ ok = add_default_binding(Q),
+ Q;
+ [_] -> not_found %% Q exists on stopped node
+ end;
+ [ExistingQ] ->
+ ExistingQ
+ end
+ end
+ end).
+
store_queue(Q = #amqqueue{durable = true}) ->
ok = mnesia:write(rabbit_durable_queue, Q, write),
ok = mnesia:write(rabbit_queue, Q, write),
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 06712e9c..a4f9f3f6 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -103,7 +103,7 @@ init(Q) ->
process_flag(trap_exit, true),
{ok, BQ} = application:get_env(backing_queue_module),
- {ok, #q{q = Q,
+ {ok, #q{q = Q#amqqueue{pid = self()},
owner = none,
exclusive_consumer = none,
has_had_consumers = false,
@@ -121,9 +121,13 @@ terminate({shutdown, _}, State = #q{backing_queue = BQ}) ->
terminate_shutdown(fun (BQS) -> BQ:terminate(BQS) end, State);
terminate(_Reason, State = #q{backing_queue = BQ}) ->
%% FIXME: How do we cancel active subscriptions?
- State1 = terminate_shutdown(fun (BQS) -> BQ:delete_and_terminate(BQS) end,
- State),
- ok = rabbit_amqqueue:internal_delete(qname(State1)).
+ terminate_shutdown(fun (BQS) ->
+ BQS1 = BQ:delete_and_terminate(BQS),
+ %% don't care if the internal delete
+ %% doesn't return 'ok'.
+ rabbit_amqqueue:internal_delete(qname(State)),
+ BQS1
+ end, State).
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
@@ -133,10 +137,10 @@ code_change(_OldVsn, State, _Extra) ->
terminate_shutdown(Fun, State) ->
State1 = #q{backing_queue = BQ, backing_queue_state = BQS} =
stop_sync_timer(stop_rate_timer(State)),
+ ok = rabbit_memory_monitor:deregister(self()),
case BQS of
undefined -> State;
- _ -> ok = rabbit_memory_monitor:deregister(self()),
- BQS1 = lists:foldl(
+ _ -> BQS1 = lists:foldl(
fun (#cr{txn = none}, BQSN) ->
BQSN;
(#cr{txn = Txn}, BQSN) ->
@@ -714,16 +718,28 @@ handle_call({claim_queue, ReaderPid}, _From,
end;
handle_call({maybe_run_queue_via_backing_queue, Fun}, _From, State) ->
- reply(ok, maybe_run_queue_via_backing_queue(Fun, State)).
+ reply(ok, maybe_run_queue_via_backing_queue(Fun, State));
-handle_cast({init, Recover},
- State = #q{q = #amqqueue{name = QName, durable = IsDurable},
+handle_call({init, Recover}, From,
+ State = #q{q = Q = #amqqueue{name = QName, durable = IsDurable},
backing_queue = BQ, backing_queue_state = undefined}) ->
ok = file_handle_cache:register_callback(
rabbit_amqqueue, set_maximum_since_use, [self()]),
ok = rabbit_memory_monitor:register(
self(), {rabbit_amqqueue, set_ram_duration_target, [self()]}),
- noreply(State#q{backing_queue_state = BQ:init(QName, IsDurable, Recover)});
+ %% TODO: If we're exclusively owned && our owner isn't alive &&
+ %% Recover then we should BQ:init and then {stop, normal,
+ %% not_found, State}, relying on terminate to delete the queue.
+ case rabbit_amqqueue:internal_declare(Q, Recover) of
+ not_found ->
+ {stop, normal, not_found, State};
+ Q ->
+ gen_server2:reply(From, Q),
+ noreply(State#q{backing_queue_state =
+ BQ:init(QName, IsDurable, Recover)});
+ Q1 ->
+ {stop, normal, Q1, State}
+ end.
handle_cast({deliver, Txn, Message, ChPid}, State) ->
%% Asynchronous, non-"mandatory", non-"immediate" deliver mode.