diff options
author | Tim Watson <tim@rabbitmq.com> | 2012-11-13 11:37:30 +0000 |
---|---|---|
committer | Tim Watson <tim@rabbitmq.com> | 2012-11-13 11:37:30 +0000 |
commit | e6098fa2bc32b10883c79fe1772fe9a4c24c1ee1 (patch) | |
tree | b5eb67df5ef5a9c247c784061cb23a62264dc14f | |
parent | fcd59347940d894cae4351783307357758c3525a (diff) | |
parent | 79a6cb25847fa11f5fce627a2c08390ab84b1463 (diff) | |
download | rabbitmq-server-e6098fa2bc32b10883c79fe1772fe9a4c24c1ee1.tar.gz |
merge default into bug25178
-rw-r--r-- | src/rabbit.erl | 5 | ||||
-rw-r--r-- | src/rabbit_amqqueue.erl | 22 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 42 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 2 |
4 files changed, 47 insertions, 24 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl index 66adcca3..f3d31b22 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -576,7 +576,10 @@ boot_delegate() -> rabbit_sup:start_supervisor_child(delegate_sup, [Count]). recover() -> - rabbit_binding:recover(rabbit_exchange:recover(), rabbit_amqqueue:start()). + Qs = rabbit_amqqueue:recover(), + ok = rabbit_binding:recover(rabbit_exchange:recover(), + [QName || #amqqueue{name = QName} <- Qs]), + rabbit_amqqueue:start(Qs). maybe_insert_default_data() -> case rabbit_table:is_empty() of diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 922951be..9fb453c1 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -16,7 +16,8 @@ -module(rabbit_amqqueue). --export([start/0, stop/0, declare/5, delete_immediately/1, delete/3, purge/1]). +-export([recover/0, stop/0, start/1, declare/5, + delete_immediately/1, delete/3, purge/1]). -export([pseudo_queue/2]). -export([lookup/1, not_found_or_absent/1, with/2, with/3, with_or_die/2, assert_equivalence/5, @@ -64,8 +65,9 @@ {'absent', rabbit_types:amqqueue()}). -type(not_found_or_absent() :: 'not_found' | {'absent', rabbit_types:amqqueue()}). --spec(start/0 :: () -> [name()]). +-spec(recover/0 :: () -> [rabbit_types:amqqueue()]). -spec(stop/0 :: () -> 'ok'). +-spec(start/1 :: ([rabbit_types:amqqueue()]) -> 'ok'). -spec(declare/5 :: (name(), boolean(), boolean(), rabbit_framing:amqp_table(), rabbit_types:maybe(pid())) @@ -179,7 +181,7 @@ -define(CONSUMER_INFO_KEYS, [queue_name, channel_pid, consumer_tag, ack_required]). -start() -> +recover() -> %% Clear out remnants of old incarnation, in case we restarted %% faster than other nodes handled DOWN messages from us. on_node_down(node()), @@ -199,6 +201,14 @@ stop() -> {ok, BQ} = application:get_env(rabbit, backing_queue_module), ok = BQ:stop(). +start(Qs) -> + %% At this point all recovered queues and their bindings are + %% visible to routing, so now it is safe for them to complete + %% their initialisation (which may involve interacting with other + %% queues). + [Pid ! {self(), go} || #amqqueue{pid = Pid} <- Qs], + ok. + find_durable_queues() -> Node = node(), %% TODO: use dirty ops instead @@ -211,8 +221,8 @@ find_durable_queues() -> recover_durable_queues(DurableQueues) -> Qs = [start_queue_process(node(), Q) || Q <- DurableQueues], - [QName || Q = #amqqueue{name = QName, pid = Pid} <- Qs, - gen_server2:call(Pid, {init, true}, infinity) == {new, Q}]. + [Q || Q = #amqqueue{pid = Pid} <- Qs, + gen_server2:call(Pid, {init, self()}, infinity) == {new, Q}]. declare(QueueName, Durable, AutoDelete, Args, Owner) -> ok = check_declare_arguments(QueueName, Args), @@ -227,7 +237,7 @@ declare(QueueName, Durable, AutoDelete, Args, Owner) -> gm_pids = []}), {Node, _MNodes} = rabbit_mirror_queue_misc:suggested_queue_nodes(Q0), Q1 = start_queue_process(Node, Q0), - gen_server2:call(Q1#amqqueue.pid, {init, false}, infinity). + gen_server2:call(Q1#amqqueue.pid, {init, new}, infinity). internal_declare(Q, true) -> rabbit_misc:execute_mnesia_tx_with_tail( diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 43fe3578..92b00db0 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -195,7 +195,7 @@ code_change(_OldVsn, State, _Extra) -> declare(Recover, From, State = #q{q = Q, backing_queue = BQ, backing_queue_state = undefined}) -> - case rabbit_amqqueue:internal_declare(Q, Recover) of + case rabbit_amqqueue:internal_declare(Q, Recover =/= new) of #amqqueue{} = Q1 -> case matches(Recover, Q, Q1) of true -> @@ -206,6 +206,7 @@ declare(Recover, From, State = #q{q = Q, self(), {rabbit_amqqueue, set_ram_duration_target, [self()]}), BQS = bq_init(BQ, Q, Recover), + recovery_barrier(Recover), State1 = process_args(State#q{backing_queue_state = BQS}), rabbit_event:notify(queue_created, infos(?CREATION_EVENT_KEYS, State1)), @@ -219,25 +220,34 @@ declare(Recover, From, State = #q{q = Q, {stop, normal, Err, State} end. -matches(true, Q, Q) -> true; -matches(true, _Q, _Q1) -> false; -matches(false, Q1, Q2) -> +matches(new, Q1, Q2) -> %% i.e. not policy - Q1#amqqueue.name =:= Q2#amqqueue.name andalso - Q1#amqqueue.durable =:= Q2#amqqueue.durable andalso - Q1#amqqueue.auto_delete =:= Q2#amqqueue.auto_delete andalso + Q1#amqqueue.name =:= Q2#amqqueue.name andalso + Q1#amqqueue.durable =:= Q2#amqqueue.durable andalso + Q1#amqqueue.auto_delete =:= Q2#amqqueue.auto_delete andalso Q1#amqqueue.exclusive_owner =:= Q2#amqqueue.exclusive_owner andalso - Q1#amqqueue.arguments =:= Q2#amqqueue.arguments andalso - Q1#amqqueue.pid =:= Q2#amqqueue.pid andalso - Q1#amqqueue.slave_pids =:= Q2#amqqueue.slave_pids. + Q1#amqqueue.arguments =:= Q2#amqqueue.arguments andalso + Q1#amqqueue.pid =:= Q2#amqqueue.pid andalso + Q1#amqqueue.slave_pids =:= Q2#amqqueue.slave_pids; +matches(_, Q, Q) -> true; +matches(_, _Q, _Q1) -> false. bq_init(BQ, Q, Recover) -> Self = self(), - BQ:init(Q, Recover, + BQ:init(Q, Recover =/= new, fun (Mod, Fun) -> rabbit_amqqueue:run_backing_queue(Self, Mod, Fun) end). +recovery_barrier(new) -> + ok; +recovery_barrier(BarrierPid) -> + MRef = erlang:monitor(process, BarrierPid), + receive + {BarrierPid, go} -> erlang:demonitor(MRef, [flush]); + {'DOWN', MRef, process, _, _} -> ok + end. + process_args(State = #q{q = #amqqueue{arguments = Arguments}}) -> lists:foldl( fun({Arg, Fun}, State1) -> @@ -247,9 +257,9 @@ process_args(State = #q{q = #amqqueue{arguments = Arguments}}) -> end end, State, [{<<"x-expires">>, fun init_expires/2}, - {<<"x-message-ttl">>, fun init_ttl/2}, {<<"x-dead-letter-exchange">>, fun init_dlx/2}, - {<<"x-dead-letter-routing-key">>, fun init_dlx_routing_key/2}]). + {<<"x-dead-letter-routing-key">>, fun init_dlx_routing_key/2}, + {<<"x-message-ttl">>, fun init_ttl/2}]). init_expires(Expires, State) -> ensure_expiry_timer(State#q{expires = Expires}). @@ -1001,9 +1011,9 @@ handle_call({init, Recover}, From, q = #amqqueue{name = QName} = Q} = State, gen_server2:reply(From, not_found), case Recover of - true -> ok; - _ -> rabbit_log:warning( - "Queue ~p exclusive owner went away~n", [QName]) + new -> rabbit_log:warning( + "Queue ~p exclusive owner went away~n", [QName]); + _ -> ok end, BQS = bq_init(BQ, Q, Recover), %% Rely on terminate to delete the queue. diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index f802a5a0..8a24d388 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -2524,7 +2524,7 @@ test_queue_recover() -> after 10000 -> exit(timeout_waiting_for_queue_death) end, rabbit_amqqueue:stop(), - rabbit_amqqueue:start(), + rabbit_amqqueue:start(rabbit_amqqueue:recover()), rabbit_amqqueue:with_or_die( QName, fun (Q1 = #amqqueue { pid = QPid1 }) -> |