summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTim Watson <tim@rabbitmq.com>2012-11-13 11:37:30 +0000
committerTim Watson <tim@rabbitmq.com>2012-11-13 11:37:30 +0000
commite6098fa2bc32b10883c79fe1772fe9a4c24c1ee1 (patch)
treeb5eb67df5ef5a9c247c784061cb23a62264dc14f
parentfcd59347940d894cae4351783307357758c3525a (diff)
parent79a6cb25847fa11f5fce627a2c08390ab84b1463 (diff)
downloadrabbitmq-server-e6098fa2bc32b10883c79fe1772fe9a4c24c1ee1.tar.gz
merge default into bug25178
-rw-r--r--src/rabbit.erl5
-rw-r--r--src/rabbit_amqqueue.erl22
-rw-r--r--src/rabbit_amqqueue_process.erl42
-rw-r--r--src/rabbit_tests.erl2
4 files changed, 47 insertions, 24 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 66adcca3..f3d31b22 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -576,7 +576,10 @@ boot_delegate() ->
rabbit_sup:start_supervisor_child(delegate_sup, [Count]).
recover() ->
- rabbit_binding:recover(rabbit_exchange:recover(), rabbit_amqqueue:start()).
+ Qs = rabbit_amqqueue:recover(),
+ ok = rabbit_binding:recover(rabbit_exchange:recover(),
+ [QName || #amqqueue{name = QName} <- Qs]),
+ rabbit_amqqueue:start(Qs).
maybe_insert_default_data() ->
case rabbit_table:is_empty() of
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 922951be..9fb453c1 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -16,7 +16,8 @@
-module(rabbit_amqqueue).
--export([start/0, stop/0, declare/5, delete_immediately/1, delete/3, purge/1]).
+-export([recover/0, stop/0, start/1, declare/5,
+ delete_immediately/1, delete/3, purge/1]).
-export([pseudo_queue/2]).
-export([lookup/1, not_found_or_absent/1, with/2, with/3, with_or_die/2,
assert_equivalence/5,
@@ -64,8 +65,9 @@
{'absent', rabbit_types:amqqueue()}).
-type(not_found_or_absent() :: 'not_found' |
{'absent', rabbit_types:amqqueue()}).
--spec(start/0 :: () -> [name()]).
+-spec(recover/0 :: () -> [rabbit_types:amqqueue()]).
-spec(stop/0 :: () -> 'ok').
+-spec(start/1 :: ([rabbit_types:amqqueue()]) -> 'ok').
-spec(declare/5 ::
(name(), boolean(), boolean(),
rabbit_framing:amqp_table(), rabbit_types:maybe(pid()))
@@ -179,7 +181,7 @@
-define(CONSUMER_INFO_KEYS,
[queue_name, channel_pid, consumer_tag, ack_required]).
-start() ->
+recover() ->
%% Clear out remnants of old incarnation, in case we restarted
%% faster than other nodes handled DOWN messages from us.
on_node_down(node()),
@@ -199,6 +201,14 @@ stop() ->
{ok, BQ} = application:get_env(rabbit, backing_queue_module),
ok = BQ:stop().
+start(Qs) ->
+ %% At this point all recovered queues and their bindings are
+ %% visible to routing, so now it is safe for them to complete
+ %% their initialisation (which may involve interacting with other
+ %% queues).
+ [Pid ! {self(), go} || #amqqueue{pid = Pid} <- Qs],
+ ok.
+
find_durable_queues() ->
Node = node(),
%% TODO: use dirty ops instead
@@ -211,8 +221,8 @@ find_durable_queues() ->
recover_durable_queues(DurableQueues) ->
Qs = [start_queue_process(node(), Q) || Q <- DurableQueues],
- [QName || Q = #amqqueue{name = QName, pid = Pid} <- Qs,
- gen_server2:call(Pid, {init, true}, infinity) == {new, Q}].
+ [Q || Q = #amqqueue{pid = Pid} <- Qs,
+ gen_server2:call(Pid, {init, self()}, infinity) == {new, Q}].
declare(QueueName, Durable, AutoDelete, Args, Owner) ->
ok = check_declare_arguments(QueueName, Args),
@@ -227,7 +237,7 @@ declare(QueueName, Durable, AutoDelete, Args, Owner) ->
gm_pids = []}),
{Node, _MNodes} = rabbit_mirror_queue_misc:suggested_queue_nodes(Q0),
Q1 = start_queue_process(Node, Q0),
- gen_server2:call(Q1#amqqueue.pid, {init, false}, infinity).
+ gen_server2:call(Q1#amqqueue.pid, {init, new}, infinity).
internal_declare(Q, true) ->
rabbit_misc:execute_mnesia_tx_with_tail(
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 43fe3578..92b00db0 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -195,7 +195,7 @@ code_change(_OldVsn, State, _Extra) ->
declare(Recover, From, State = #q{q = Q,
backing_queue = BQ,
backing_queue_state = undefined}) ->
- case rabbit_amqqueue:internal_declare(Q, Recover) of
+ case rabbit_amqqueue:internal_declare(Q, Recover =/= new) of
#amqqueue{} = Q1 ->
case matches(Recover, Q, Q1) of
true ->
@@ -206,6 +206,7 @@ declare(Recover, From, State = #q{q = Q,
self(), {rabbit_amqqueue,
set_ram_duration_target, [self()]}),
BQS = bq_init(BQ, Q, Recover),
+ recovery_barrier(Recover),
State1 = process_args(State#q{backing_queue_state = BQS}),
rabbit_event:notify(queue_created,
infos(?CREATION_EVENT_KEYS, State1)),
@@ -219,25 +220,34 @@ declare(Recover, From, State = #q{q = Q,
{stop, normal, Err, State}
end.
-matches(true, Q, Q) -> true;
-matches(true, _Q, _Q1) -> false;
-matches(false, Q1, Q2) ->
+matches(new, Q1, Q2) ->
%% i.e. not policy
- Q1#amqqueue.name =:= Q2#amqqueue.name andalso
- Q1#amqqueue.durable =:= Q2#amqqueue.durable andalso
- Q1#amqqueue.auto_delete =:= Q2#amqqueue.auto_delete andalso
+ Q1#amqqueue.name =:= Q2#amqqueue.name andalso
+ Q1#amqqueue.durable =:= Q2#amqqueue.durable andalso
+ Q1#amqqueue.auto_delete =:= Q2#amqqueue.auto_delete andalso
Q1#amqqueue.exclusive_owner =:= Q2#amqqueue.exclusive_owner andalso
- Q1#amqqueue.arguments =:= Q2#amqqueue.arguments andalso
- Q1#amqqueue.pid =:= Q2#amqqueue.pid andalso
- Q1#amqqueue.slave_pids =:= Q2#amqqueue.slave_pids.
+ Q1#amqqueue.arguments =:= Q2#amqqueue.arguments andalso
+ Q1#amqqueue.pid =:= Q2#amqqueue.pid andalso
+ Q1#amqqueue.slave_pids =:= Q2#amqqueue.slave_pids;
+matches(_, Q, Q) -> true;
+matches(_, _Q, _Q1) -> false.
bq_init(BQ, Q, Recover) ->
Self = self(),
- BQ:init(Q, Recover,
+ BQ:init(Q, Recover =/= new,
fun (Mod, Fun) ->
rabbit_amqqueue:run_backing_queue(Self, Mod, Fun)
end).
+recovery_barrier(new) ->
+ ok;
+recovery_barrier(BarrierPid) ->
+ MRef = erlang:monitor(process, BarrierPid),
+ receive
+ {BarrierPid, go} -> erlang:demonitor(MRef, [flush]);
+ {'DOWN', MRef, process, _, _} -> ok
+ end.
+
process_args(State = #q{q = #amqqueue{arguments = Arguments}}) ->
lists:foldl(
fun({Arg, Fun}, State1) ->
@@ -247,9 +257,9 @@ process_args(State = #q{q = #amqqueue{arguments = Arguments}}) ->
end
end, State,
[{<<"x-expires">>, fun init_expires/2},
- {<<"x-message-ttl">>, fun init_ttl/2},
{<<"x-dead-letter-exchange">>, fun init_dlx/2},
- {<<"x-dead-letter-routing-key">>, fun init_dlx_routing_key/2}]).
+ {<<"x-dead-letter-routing-key">>, fun init_dlx_routing_key/2},
+ {<<"x-message-ttl">>, fun init_ttl/2}]).
init_expires(Expires, State) -> ensure_expiry_timer(State#q{expires = Expires}).
@@ -1001,9 +1011,9 @@ handle_call({init, Recover}, From,
q = #amqqueue{name = QName} = Q} = State,
gen_server2:reply(From, not_found),
case Recover of
- true -> ok;
- _ -> rabbit_log:warning(
- "Queue ~p exclusive owner went away~n", [QName])
+ new -> rabbit_log:warning(
+ "Queue ~p exclusive owner went away~n", [QName]);
+ _ -> ok
end,
BQS = bq_init(BQ, Q, Recover),
%% Rely on terminate to delete the queue.
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index f802a5a0..8a24d388 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -2524,7 +2524,7 @@ test_queue_recover() ->
after 10000 -> exit(timeout_waiting_for_queue_death)
end,
rabbit_amqqueue:stop(),
- rabbit_amqqueue:start(),
+ rabbit_amqqueue:start(rabbit_amqqueue:recover()),
rabbit_amqqueue:with_or_die(
QName,
fun (Q1 = #amqqueue { pid = QPid1 }) ->