diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2014-09-15 17:08:37 +0100 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2014-09-15 17:08:37 +0100 |
commit | 181fc1d53b4219f1171ffd657dcbc415870f3210 (patch) | |
tree | 9b93c46e0cc312ba55fe646e20a89f6aa3395917 | |
parent | f27156117a7d26f9f13f3a90db1b280c5a7597c9 (diff) | |
parent | c44594e78d2bfb455d4cfac5f2517a0ec8ed94f1 (diff) | |
download | rabbitmq-server-181fc1d53b4219f1171ffd657dcbc415870f3210.tar.gz |
Merge in default
-rw-r--r-- | include/rabbit.hrl | 3 | ||||
-rw-r--r-- | src/rabbit_amqqueue.erl | 41 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 23 | ||||
-rw-r--r-- | src/rabbit_autoheal.erl | 37 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_master.erl | 3 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_misc.erl | 3 | ||||
-rw-r--r-- | src/rabbit_node_monitor.erl | 4 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 14 | ||||
-rw-r--r-- | src/rabbit_upgrade_functions.erl | 18 |
9 files changed, 94 insertions, 52 deletions
diff --git a/include/rabbit.hrl b/include/rabbit.hrl index 5e41ea93..74e165cd 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -55,7 +55,8 @@ down_slave_nodes, %% durable policy, %% durable, implicit update as above gm_pids, %% transient - decorators}). %% transient, recalculated as above + decorators, %% transient, recalculated as above + state}). %% durable (have we crashed?) -record(exchange_serial, {name, next}). diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 34f231f8..e3833e7c 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -274,7 +274,8 @@ declare(QueueName, Durable, AutoDelete, Args, Owner, Node) -> slave_pids = [], sync_slave_pids = [], down_slave_nodes = [], - gm_pids = []})), + gm_pids = [], + state = live})), Node = rabbit_mirror_queue_misc:initial_queue_node(Q, Node), gen_server2:call( rabbit_amqqueue_sup_sup:start_queue_process(Node, Q, declare), @@ -282,7 +283,10 @@ declare(QueueName, Durable, AutoDelete, Args, Owner, Node) -> internal_declare(Q, true) -> rabbit_misc:execute_mnesia_tx_with_tail( - fun () -> ok = store_queue(Q), rabbit_misc:const(Q) end); + fun () -> + ok = store_queue(Q#amqqueue{state = live}), + rabbit_misc:const(Q) + end); internal_declare(Q = #amqqueue{name = QueueName}, false) -> rabbit_misc:execute_mnesia_tx_with_tail( fun () -> @@ -290,7 +294,8 @@ internal_declare(Q = #amqqueue{name = QueueName}, false) -> [] -> case not_found_or_absent(QueueName) of not_found -> Q1 = rabbit_policy:set(Q), - ok = store_queue(Q1), + Q2 = Q1#amqqueue{state = live}, + ok = store_queue(Q2), B = add_default_binding(Q1), fun () -> B(), Q1 end; {absent, _Q, _} = R -> rabbit_misc:const(R) @@ -383,6 +388,8 @@ not_found_or_absent_dirty(Name) -> with(Name, F, E) -> case lookup(Name) of + {ok, Q = #amqqueue{state = crashed}} -> + E({absent, Q, crashed}); {ok, Q = #amqqueue{pid = QPid}} -> %% We check is_process_alive(QPid) in case we receive a %% nodedown (for example) in F() that has nothing to do @@ -392,11 +399,8 @@ with(Name, F, E) -> %% the retry loop. rabbit_misc:with_exit_handler( fun () -> false = rabbit_misc:is_process_alive(QPid), - case crashed_or_recovering(Q) of - crashed -> E({absent, Q, crashed}); - recovering -> timer:sleep(25), - with(Name, F, E) - end + timer:sleep(25), + with(Name, F, E) end, fun () -> F(Q) end); {error, not_found} -> E(not_found_or_absent_dirty(Name)) @@ -409,20 +413,6 @@ with_or_die(Name, F) -> ({absent, Q, Reason}) -> rabbit_misc:absent(Q, Reason) end). -%% TODO we could say we are crashed when we mean recovering if we -%% happen to call in the middle of a crash-failover. We could try to -%% figure out whether that's happening by looking for the supervisor - -%% but we'd need some additional book keeping to know what it is. And -%% it will just mean a temporary glitch while crashing, which is -%% fairly tolerable. -crashed_or_recovering(#amqqueue{pid = QPid, slave_pids = []}) -> - case lists:member(node(QPid), [node() | nodes()]) of - true -> crashed; - false -> recovering - end; -crashed_or_recovering(_Q) -> - recovering. - assert_equivalence(#amqqueue{durable = Durable, auto_delete = AutoDelete} = Q, Durable, AutoDelete, RequiredArgs, Owner) -> @@ -646,10 +636,10 @@ delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty) -> delegate:call(QPid, {delete, IfUnused, IfEmpty}). delete_crashed(#amqqueue{ pid = QPid } = Q) -> - rpc:call(node(QPid), ?MODULE, delete_crashed_internal, [Q]). + ok = rpc:call(node(QPid), ?MODULE, delete_crashed_internal, [Q]). delete_crashed_internal(#amqqueue{ name = QName }) -> - {ok, BQ} = application:get_env(backing_queue_module), + {ok, BQ} = application:get_env(rabbit, backing_queue_module), BQ:delete_crashed(QName), ok = internal_delete(QName). @@ -843,7 +833,8 @@ immutable(Q) -> Q#amqqueue{pid = none, down_slave_nodes = none, gm_pids = none, policy = none, - decorators = none}. + decorators = none, + state = none}. deliver([], _Delivery, _Flow) -> %% /dev/null optimisation diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 2f2a94f4..fcf17381 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -234,19 +234,16 @@ terminate({shutdown, _} = R, State = #q{backing_queue = BQ}) -> terminate_shutdown(fun (BQS) -> BQ:terminate(R, BQS) end, State); terminate(normal, State) -> %% delete case terminate_shutdown(terminate_delete(true, normal, State), State); -terminate(_Reason, State = #q{q = #amqqueue{name = Name}}) -> - terminate_shutdown( - fun (BQS) -> - rabbit_event:if_enabled( - State, #q.stats_timer, - fun () -> - rabbit_event:notify(queue_stats, [{name, Name}, - {state, crashed}]) - end), - %% If we crashed don't try to clean up the BQS, probably - %% best to leave it. - BQS - end, State). +%% If we crashed don't try to clean up the BQS, probably best to leave it. +terminate(_Reason, State = #q{q = Q}) -> + terminate_shutdown(fun (BQS) -> + Q2 = Q#amqqueue{state = crashed}, + rabbit_misc:execute_mnesia_transaction( + fun() -> + rabbit_amqqueue:store_queue(Q2) + end), + BQS + end, State). terminate_delete(EmitStats, Reason, State = #q{q = #amqqueue{name = QName}, diff --git a/src/rabbit_autoheal.erl b/src/rabbit_autoheal.erl index c5237d34..13df1662 100644 --- a/src/rabbit_autoheal.erl +++ b/src/rabbit_autoheal.erl @@ -54,6 +54,10 @@ %% - we are the winner and are waiting for all losing nodes to stop %% before telling them they can restart %% +%% {leader_waiting, OutstandingStops} +%% - we are the leader, and have already assigned the winner and losers. +%% We are neither but need to ignore further requests to autoheal. +%% %% restarting %% - we are restarting. Of course the node monitor immediately dies %% then so this state does not last long. We therefore send the @@ -83,8 +87,7 @@ enabled() -> %% stopped - all nodes can now start again rabbit_down(Node, {winner_waiting, [Node], Notify}) -> rabbit_log:info("Autoheal: final node has stopped, starting...~n",[]), - notify_safe(Notify), - not_healing; + winner_finish(Notify); rabbit_down(Node, {winner_waiting, WaitFor, Notify}) -> {winner_waiting, WaitFor -- [Node], Notify}; @@ -106,8 +109,7 @@ node_down(Node, {winner_waiting, _, Notify}) -> rabbit_log:info("Autoheal: aborting - ~p went down~n", [Node]), %% Make sure any nodes waiting for us start - it won't necessarily %% heal the partition but at least they won't get stuck. - notify_safe(Notify), - not_healing; + winner_finish(Notify); node_down(Node, _State) -> rabbit_log:info("Autoheal: aborting - ~p went down~n", [Node]), @@ -154,14 +156,14 @@ handle_msg({become_winner, Losers}, not_healing, _Partitions) -> rabbit_log:info("Autoheal: I am the winner, waiting for ~p to stop~n", [Losers]), - {winner_waiting, Losers, Losers}; + filter_already_down_losers(Losers, Losers); handle_msg({become_winner, Losers}, {winner_waiting, WaitFor, Notify}, _Partitions) -> rabbit_log:info("Autoheal: I am the winner, waiting additionally for " "~p to stop~n", [Losers]), - {winner_waiting, lists:usort(Losers ++ WaitFor), - lists:usort(Losers ++ Notify)}; + filter_already_down_losers(lists:usort(Losers ++ WaitFor), + lists:usort(Losers ++ Notify)); handle_msg({winner_is, Winner}, not_healing, _Partitions) -> @@ -188,8 +190,9 @@ handle_msg(_, restarting, _Partitions) -> send(Node, Msg) -> {?SERVER, Node} ! {autoheal_msg, Msg}. -notify_safe(Notify) -> - [{rabbit_outside_app_process, N} ! autoheal_safe_to_start || N <- Notify]. +winner_finish(Notify) -> + [{rabbit_outside_app_process, N} ! autoheal_safe_to_start || N <- Notify], + not_healing. make_decision(AllPartitions) -> Sorted = lists:sort([{partition_value(P), P} || P <- AllPartitions]), @@ -225,3 +228,19 @@ all_partitions([{Node, CantSee} | Rest], Partitions) -> _ -> [A, B | Others] end, all_partitions(Rest, Partitions1). + +%% We could have received and ignored DOWN messages from some losers +%% before becoming the winner - check for already down nodes. +filter_already_down_losers(WaitFor, Notify) -> + WaitFor2 = rabbit_node_monitor:alive_rabbit_nodes(WaitFor), + case WaitFor of + WaitFor2 -> ok; + _ -> rabbit_log:info("Autoheal: ~p already down~n", + [WaitFor -- WaitFor2]) + end, + case WaitFor2 of + [] -> rabbit_log:info( + "Autoheal: final node has stopped, starting...~n",[]), + winner_finish(Notify); + _ -> {winner_waiting, WaitFor2, Notify} + end. diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index 1bea8042..5ce22271 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -110,7 +110,8 @@ init_with_existing_bq(Q = #amqqueue{name = QName}, BQ, BQS) -> [Q1 = #amqqueue{gm_pids = GMPids}] = mnesia:read({rabbit_queue, QName}), ok = rabbit_amqqueue:store_queue( - Q1#amqqueue{gm_pids = [{GM, Self} | GMPids]}) + Q1#amqqueue{gm_pids = [{GM, Self} | GMPids], + state = live}) end), {_MNode, SNodes} = rabbit_mirror_queue_misc:suggested_queue_nodes(Q), %% We need synchronous add here (i.e. do not return until the diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl index aec6f93d..826b6927 100644 --- a/src/rabbit_mirror_queue_misc.erl +++ b/src/rabbit_mirror_queue_misc.erl @@ -243,7 +243,8 @@ store_updated_slaves(Q = #amqqueue{pid = MPid, SSPids1 = [SSPid || SSPid <- SSPids, lists:member(SSPid, SPids)], DSNs1 = DSNs -- [node(P) || P <- [MPid | SPids]], Q1 = Q#amqqueue{sync_slave_pids = SSPids1, - down_slave_nodes = DSNs1}, + down_slave_nodes = DSNs1, + state = live}, ok = rabbit_amqqueue:store_queue(Q1), %% Wake it up so that we emit a stats event rabbit_amqqueue:notify_policy_changed(Q1), diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl index 7eaeaf50..e1ff9ffd 100644 --- a/src/rabbit_node_monitor.erl +++ b/src/rabbit_node_monitor.erl @@ -32,7 +32,8 @@ code_change/3]). %% Utils --export([all_rabbit_nodes_up/0, run_outside_applications/1, ping_all/0]). +-export([all_rabbit_nodes_up/0, run_outside_applications/1, ping_all/0, + alive_rabbit_nodes/1]). -define(SERVER, ?MODULE). -define(RABBIT_UP_RPC_TIMEOUT, 2000). @@ -66,6 +67,7 @@ -spec(all_rabbit_nodes_up/0 :: () -> boolean()). -spec(run_outside_applications/1 :: (fun (() -> any())) -> pid()). -spec(ping_all/0 :: () -> 'ok'). +-spec(alive_rabbit_nodes/1 :: ([node()]) -> [node()]). -endif. diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 7018bffe..ee052c32 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -29,9 +29,19 @@ -define(PERSISTENT_MSG_STORE, msg_store_persistent). -define(TRANSIENT_MSG_STORE, msg_store_transient). -define(CLEANUP_QUEUE_NAME, <<"cleanup-queue">>). --define(TIMEOUT, 5000). +-define(TIMEOUT, 30000). all_tests() -> + try + all_tests0() + catch + Type:Error -> + rabbit_misc:format( + "Tests failed~nError: {~p, ~p}~nStack trace:~n~p~n", + [Type, Error, erlang:get_stacktrace()]) + end. + +all_tests0() -> ok = setup_cluster(), ok = truncate:test(), ok = supervisor2_tests:test_all(), @@ -2892,6 +2902,8 @@ test_queue_recover() -> rabbit_amqqueue:declare(test_queue(), true, false, [], none), publish_and_confirm(Q, <<>>, Count), + [{_, SupPid, _, _}] = supervisor:which_children(rabbit_amqqueue_sup_sup), + exit(SupPid, kill), exit(QPid, kill), MRef = erlang:monitor(process, QPid), receive {'DOWN', MRef, process, QPid, _Info} -> ok diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl index 1104f373..9f6dc21a 100644 --- a/src/rabbit_upgrade_functions.erl +++ b/src/rabbit_upgrade_functions.erl @@ -49,6 +49,7 @@ -rabbit_upgrade({internal_system_x, mnesia, [exchange_decorators]}). -rabbit_upgrade({cluster_name, mnesia, [runtime_parameters]}). -rabbit_upgrade({down_slave_nodes, mnesia, [queue_decorators]}). +-rabbit_upgrade({queue_state, mnesia, [down_slave_nodes]}). %% ------------------------------------------------------------------- @@ -80,6 +81,7 @@ -spec(internal_system_x/0 :: () -> 'ok'). -spec(cluster_name/0 :: () -> 'ok'). -spec(down_slave_nodes/0 :: () -> 'ok'). +-spec(queue_state/0 :: () -> 'ok'). -endif. @@ -400,6 +402,22 @@ down_slave_nodes(Table) -> [name, durable, auto_delete, exclusive_owner, arguments, pid, slave_pids, sync_slave_pids, down_slave_nodes, policy, gm_pids, decorators]). +queue_state() -> + ok = queue_state(rabbit_queue), + ok = queue_state(rabbit_durable_queue). + +queue_state(Table) -> + transform( + Table, + fun ({amqqueue, Name, Durable, AutoDelete, ExclusiveOwner, Arguments, + Pid, SlavePids, SyncSlavePids, DSN, Policy, GmPids, Decorators}) -> + {amqqueue, Name, Durable, AutoDelete, ExclusiveOwner, Arguments, + Pid, SlavePids, SyncSlavePids, DSN, Policy, GmPids, Decorators, + live} + end, + [name, durable, auto_delete, exclusive_owner, arguments, pid, slave_pids, + sync_slave_pids, down_slave_nodes, policy, gm_pids, decorators, state]). + %%-------------------------------------------------------------------- transform(TableName, Fun, FieldList) -> |