summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2014-09-15 17:08:37 +0100
committerSimon MacMullen <simon@rabbitmq.com>2014-09-15 17:08:37 +0100
commit181fc1d53b4219f1171ffd657dcbc415870f3210 (patch)
tree9b93c46e0cc312ba55fe646e20a89f6aa3395917
parentf27156117a7d26f9f13f3a90db1b280c5a7597c9 (diff)
parentc44594e78d2bfb455d4cfac5f2517a0ec8ed94f1 (diff)
downloadrabbitmq-server-181fc1d53b4219f1171ffd657dcbc415870f3210.tar.gz
Merge in default
-rw-r--r--include/rabbit.hrl3
-rw-r--r--src/rabbit_amqqueue.erl41
-rw-r--r--src/rabbit_amqqueue_process.erl23
-rw-r--r--src/rabbit_autoheal.erl37
-rw-r--r--src/rabbit_mirror_queue_master.erl3
-rw-r--r--src/rabbit_mirror_queue_misc.erl3
-rw-r--r--src/rabbit_node_monitor.erl4
-rw-r--r--src/rabbit_tests.erl14
-rw-r--r--src/rabbit_upgrade_functions.erl18
9 files changed, 94 insertions, 52 deletions
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index 5e41ea93..74e165cd 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -55,7 +55,8 @@
down_slave_nodes, %% durable
policy, %% durable, implicit update as above
gm_pids, %% transient
- decorators}). %% transient, recalculated as above
+ decorators, %% transient, recalculated as above
+ state}). %% durable (have we crashed?)
-record(exchange_serial, {name, next}).
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 34f231f8..e3833e7c 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -274,7 +274,8 @@ declare(QueueName, Durable, AutoDelete, Args, Owner, Node) ->
slave_pids = [],
sync_slave_pids = [],
down_slave_nodes = [],
- gm_pids = []})),
+ gm_pids = [],
+ state = live})),
Node = rabbit_mirror_queue_misc:initial_queue_node(Q, Node),
gen_server2:call(
rabbit_amqqueue_sup_sup:start_queue_process(Node, Q, declare),
@@ -282,7 +283,10 @@ declare(QueueName, Durable, AutoDelete, Args, Owner, Node) ->
internal_declare(Q, true) ->
rabbit_misc:execute_mnesia_tx_with_tail(
- fun () -> ok = store_queue(Q), rabbit_misc:const(Q) end);
+ fun () ->
+ ok = store_queue(Q#amqqueue{state = live}),
+ rabbit_misc:const(Q)
+ end);
internal_declare(Q = #amqqueue{name = QueueName}, false) ->
rabbit_misc:execute_mnesia_tx_with_tail(
fun () ->
@@ -290,7 +294,8 @@ internal_declare(Q = #amqqueue{name = QueueName}, false) ->
[] ->
case not_found_or_absent(QueueName) of
not_found -> Q1 = rabbit_policy:set(Q),
- ok = store_queue(Q1),
+ Q2 = Q1#amqqueue{state = live},
+ ok = store_queue(Q2),
B = add_default_binding(Q1),
fun () -> B(), Q1 end;
{absent, _Q, _} = R -> rabbit_misc:const(R)
@@ -383,6 +388,8 @@ not_found_or_absent_dirty(Name) ->
with(Name, F, E) ->
case lookup(Name) of
+ {ok, Q = #amqqueue{state = crashed}} ->
+ E({absent, Q, crashed});
{ok, Q = #amqqueue{pid = QPid}} ->
%% We check is_process_alive(QPid) in case we receive a
%% nodedown (for example) in F() that has nothing to do
@@ -392,11 +399,8 @@ with(Name, F, E) ->
%% the retry loop.
rabbit_misc:with_exit_handler(
fun () -> false = rabbit_misc:is_process_alive(QPid),
- case crashed_or_recovering(Q) of
- crashed -> E({absent, Q, crashed});
- recovering -> timer:sleep(25),
- with(Name, F, E)
- end
+ timer:sleep(25),
+ with(Name, F, E)
end, fun () -> F(Q) end);
{error, not_found} ->
E(not_found_or_absent_dirty(Name))
@@ -409,20 +413,6 @@ with_or_die(Name, F) ->
({absent, Q, Reason}) -> rabbit_misc:absent(Q, Reason)
end).
-%% TODO we could say we are crashed when we mean recovering if we
-%% happen to call in the middle of a crash-failover. We could try to
-%% figure out whether that's happening by looking for the supervisor -
-%% but we'd need some additional book keeping to know what it is. And
-%% it will just mean a temporary glitch while crashing, which is
-%% fairly tolerable.
-crashed_or_recovering(#amqqueue{pid = QPid, slave_pids = []}) ->
- case lists:member(node(QPid), [node() | nodes()]) of
- true -> crashed;
- false -> recovering
- end;
-crashed_or_recovering(_Q) ->
- recovering.
-
assert_equivalence(#amqqueue{durable = Durable,
auto_delete = AutoDelete} = Q,
Durable, AutoDelete, RequiredArgs, Owner) ->
@@ -646,10 +636,10 @@ delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty) ->
delegate:call(QPid, {delete, IfUnused, IfEmpty}).
delete_crashed(#amqqueue{ pid = QPid } = Q) ->
- rpc:call(node(QPid), ?MODULE, delete_crashed_internal, [Q]).
+ ok = rpc:call(node(QPid), ?MODULE, delete_crashed_internal, [Q]).
delete_crashed_internal(#amqqueue{ name = QName }) ->
- {ok, BQ} = application:get_env(backing_queue_module),
+ {ok, BQ} = application:get_env(rabbit, backing_queue_module),
BQ:delete_crashed(QName),
ok = internal_delete(QName).
@@ -843,7 +833,8 @@ immutable(Q) -> Q#amqqueue{pid = none,
down_slave_nodes = none,
gm_pids = none,
policy = none,
- decorators = none}.
+ decorators = none,
+ state = none}.
deliver([], _Delivery, _Flow) ->
%% /dev/null optimisation
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 2f2a94f4..fcf17381 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -234,19 +234,16 @@ terminate({shutdown, _} = R, State = #q{backing_queue = BQ}) ->
terminate_shutdown(fun (BQS) -> BQ:terminate(R, BQS) end, State);
terminate(normal, State) -> %% delete case
terminate_shutdown(terminate_delete(true, normal, State), State);
-terminate(_Reason, State = #q{q = #amqqueue{name = Name}}) ->
- terminate_shutdown(
- fun (BQS) ->
- rabbit_event:if_enabled(
- State, #q.stats_timer,
- fun () ->
- rabbit_event:notify(queue_stats, [{name, Name},
- {state, crashed}])
- end),
- %% If we crashed don't try to clean up the BQS, probably
- %% best to leave it.
- BQS
- end, State).
+%% If we crashed don't try to clean up the BQS, probably best to leave it.
+terminate(_Reason, State = #q{q = Q}) ->
+ terminate_shutdown(fun (BQS) ->
+ Q2 = Q#amqqueue{state = crashed},
+ rabbit_misc:execute_mnesia_transaction(
+ fun() ->
+ rabbit_amqqueue:store_queue(Q2)
+ end),
+ BQS
+ end, State).
terminate_delete(EmitStats, Reason,
State = #q{q = #amqqueue{name = QName},
diff --git a/src/rabbit_autoheal.erl b/src/rabbit_autoheal.erl
index c5237d34..13df1662 100644
--- a/src/rabbit_autoheal.erl
+++ b/src/rabbit_autoheal.erl
@@ -54,6 +54,10 @@
%% - we are the winner and are waiting for all losing nodes to stop
%% before telling them they can restart
%%
+%% {leader_waiting, OutstandingStops}
+%% - we are the leader, and have already assigned the winner and losers.
+%% We are neither but need to ignore further requests to autoheal.
+%%
%% restarting
%% - we are restarting. Of course the node monitor immediately dies
%% then so this state does not last long. We therefore send the
@@ -83,8 +87,7 @@ enabled() ->
%% stopped - all nodes can now start again
rabbit_down(Node, {winner_waiting, [Node], Notify}) ->
rabbit_log:info("Autoheal: final node has stopped, starting...~n",[]),
- notify_safe(Notify),
- not_healing;
+ winner_finish(Notify);
rabbit_down(Node, {winner_waiting, WaitFor, Notify}) ->
{winner_waiting, WaitFor -- [Node], Notify};
@@ -106,8 +109,7 @@ node_down(Node, {winner_waiting, _, Notify}) ->
rabbit_log:info("Autoheal: aborting - ~p went down~n", [Node]),
%% Make sure any nodes waiting for us start - it won't necessarily
%% heal the partition but at least they won't get stuck.
- notify_safe(Notify),
- not_healing;
+ winner_finish(Notify);
node_down(Node, _State) ->
rabbit_log:info("Autoheal: aborting - ~p went down~n", [Node]),
@@ -154,14 +156,14 @@ handle_msg({become_winner, Losers},
not_healing, _Partitions) ->
rabbit_log:info("Autoheal: I am the winner, waiting for ~p to stop~n",
[Losers]),
- {winner_waiting, Losers, Losers};
+ filter_already_down_losers(Losers, Losers);
handle_msg({become_winner, Losers},
{winner_waiting, WaitFor, Notify}, _Partitions) ->
rabbit_log:info("Autoheal: I am the winner, waiting additionally for "
"~p to stop~n", [Losers]),
- {winner_waiting, lists:usort(Losers ++ WaitFor),
- lists:usort(Losers ++ Notify)};
+ filter_already_down_losers(lists:usort(Losers ++ WaitFor),
+ lists:usort(Losers ++ Notify));
handle_msg({winner_is, Winner},
not_healing, _Partitions) ->
@@ -188,8 +190,9 @@ handle_msg(_, restarting, _Partitions) ->
send(Node, Msg) -> {?SERVER, Node} ! {autoheal_msg, Msg}.
-notify_safe(Notify) ->
- [{rabbit_outside_app_process, N} ! autoheal_safe_to_start || N <- Notify].
+winner_finish(Notify) ->
+ [{rabbit_outside_app_process, N} ! autoheal_safe_to_start || N <- Notify],
+ not_healing.
make_decision(AllPartitions) ->
Sorted = lists:sort([{partition_value(P), P} || P <- AllPartitions]),
@@ -225,3 +228,19 @@ all_partitions([{Node, CantSee} | Rest], Partitions) ->
_ -> [A, B | Others]
end,
all_partitions(Rest, Partitions1).
+
+%% We could have received and ignored DOWN messages from some losers
+%% before becoming the winner - check for already down nodes.
+filter_already_down_losers(WaitFor, Notify) ->
+ WaitFor2 = rabbit_node_monitor:alive_rabbit_nodes(WaitFor),
+ case WaitFor of
+ WaitFor2 -> ok;
+ _ -> rabbit_log:info("Autoheal: ~p already down~n",
+ [WaitFor -- WaitFor2])
+ end,
+ case WaitFor2 of
+ [] -> rabbit_log:info(
+ "Autoheal: final node has stopped, starting...~n",[]),
+ winner_finish(Notify);
+ _ -> {winner_waiting, WaitFor2, Notify}
+ end.
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index 1bea8042..5ce22271 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -110,7 +110,8 @@ init_with_existing_bq(Q = #amqqueue{name = QName}, BQ, BQS) ->
[Q1 = #amqqueue{gm_pids = GMPids}]
= mnesia:read({rabbit_queue, QName}),
ok = rabbit_amqqueue:store_queue(
- Q1#amqqueue{gm_pids = [{GM, Self} | GMPids]})
+ Q1#amqqueue{gm_pids = [{GM, Self} | GMPids],
+ state = live})
end),
{_MNode, SNodes} = rabbit_mirror_queue_misc:suggested_queue_nodes(Q),
%% We need synchronous add here (i.e. do not return until the
diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl
index aec6f93d..826b6927 100644
--- a/src/rabbit_mirror_queue_misc.erl
+++ b/src/rabbit_mirror_queue_misc.erl
@@ -243,7 +243,8 @@ store_updated_slaves(Q = #amqqueue{pid = MPid,
SSPids1 = [SSPid || SSPid <- SSPids, lists:member(SSPid, SPids)],
DSNs1 = DSNs -- [node(P) || P <- [MPid | SPids]],
Q1 = Q#amqqueue{sync_slave_pids = SSPids1,
- down_slave_nodes = DSNs1},
+ down_slave_nodes = DSNs1,
+ state = live},
ok = rabbit_amqqueue:store_queue(Q1),
%% Wake it up so that we emit a stats event
rabbit_amqqueue:notify_policy_changed(Q1),
diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl
index 7eaeaf50..e1ff9ffd 100644
--- a/src/rabbit_node_monitor.erl
+++ b/src/rabbit_node_monitor.erl
@@ -32,7 +32,8 @@
code_change/3]).
%% Utils
--export([all_rabbit_nodes_up/0, run_outside_applications/1, ping_all/0]).
+-export([all_rabbit_nodes_up/0, run_outside_applications/1, ping_all/0,
+ alive_rabbit_nodes/1]).
-define(SERVER, ?MODULE).
-define(RABBIT_UP_RPC_TIMEOUT, 2000).
@@ -66,6 +67,7 @@
-spec(all_rabbit_nodes_up/0 :: () -> boolean()).
-spec(run_outside_applications/1 :: (fun (() -> any())) -> pid()).
-spec(ping_all/0 :: () -> 'ok').
+-spec(alive_rabbit_nodes/1 :: ([node()]) -> [node()]).
-endif.
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 7018bffe..ee052c32 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -29,9 +29,19 @@
-define(PERSISTENT_MSG_STORE, msg_store_persistent).
-define(TRANSIENT_MSG_STORE, msg_store_transient).
-define(CLEANUP_QUEUE_NAME, <<"cleanup-queue">>).
--define(TIMEOUT, 5000).
+-define(TIMEOUT, 30000).
all_tests() ->
+ try
+ all_tests0()
+ catch
+ Type:Error ->
+ rabbit_misc:format(
+ "Tests failed~nError: {~p, ~p}~nStack trace:~n~p~n",
+ [Type, Error, erlang:get_stacktrace()])
+ end.
+
+all_tests0() ->
ok = setup_cluster(),
ok = truncate:test(),
ok = supervisor2_tests:test_all(),
@@ -2892,6 +2902,8 @@ test_queue_recover() ->
rabbit_amqqueue:declare(test_queue(), true, false, [], none),
publish_and_confirm(Q, <<>>, Count),
+ [{_, SupPid, _, _}] = supervisor:which_children(rabbit_amqqueue_sup_sup),
+ exit(SupPid, kill),
exit(QPid, kill),
MRef = erlang:monitor(process, QPid),
receive {'DOWN', MRef, process, QPid, _Info} -> ok
diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl
index 1104f373..9f6dc21a 100644
--- a/src/rabbit_upgrade_functions.erl
+++ b/src/rabbit_upgrade_functions.erl
@@ -49,6 +49,7 @@
-rabbit_upgrade({internal_system_x, mnesia, [exchange_decorators]}).
-rabbit_upgrade({cluster_name, mnesia, [runtime_parameters]}).
-rabbit_upgrade({down_slave_nodes, mnesia, [queue_decorators]}).
+-rabbit_upgrade({queue_state, mnesia, [down_slave_nodes]}).
%% -------------------------------------------------------------------
@@ -80,6 +81,7 @@
-spec(internal_system_x/0 :: () -> 'ok').
-spec(cluster_name/0 :: () -> 'ok').
-spec(down_slave_nodes/0 :: () -> 'ok').
+-spec(queue_state/0 :: () -> 'ok').
-endif.
@@ -400,6 +402,22 @@ down_slave_nodes(Table) ->
[name, durable, auto_delete, exclusive_owner, arguments, pid, slave_pids,
sync_slave_pids, down_slave_nodes, policy, gm_pids, decorators]).
+queue_state() ->
+ ok = queue_state(rabbit_queue),
+ ok = queue_state(rabbit_durable_queue).
+
+queue_state(Table) ->
+ transform(
+ Table,
+ fun ({amqqueue, Name, Durable, AutoDelete, ExclusiveOwner, Arguments,
+ Pid, SlavePids, SyncSlavePids, DSN, Policy, GmPids, Decorators}) ->
+ {amqqueue, Name, Durable, AutoDelete, ExclusiveOwner, Arguments,
+ Pid, SlavePids, SyncSlavePids, DSN, Policy, GmPids, Decorators,
+ live}
+ end,
+ [name, durable, auto_delete, exclusive_owner, arguments, pid, slave_pids,
+ sync_slave_pids, down_slave_nodes, policy, gm_pids, decorators, state]).
+
%%--------------------------------------------------------------------
transform(TableName, Fun, FieldList) ->