diff options
author | Matthew Sackman <matthew@rabbitmq.com> | 2011-03-16 16:03:39 +0000 |
---|---|---|
committer | Matthew Sackman <matthew@rabbitmq.com> | 2011-03-16 16:03:39 +0000 |
commit | 3c302660253185a11505ff33c7196b9d120df8c0 (patch) | |
tree | f8f43f9c4009ce51e9d37c4a4fe3170849e37de9 | |
parent | 916f0d7414bdab0ce0b28be5c2f8b61af461b5ba (diff) | |
parent | 22007275cb3d133e047a291510d716b23fe05dfb (diff) | |
download | rabbitmq-server-3c302660253185a11505ff33c7196b9d120df8c0.tar.gz |
Merging default into bug23425
-rw-r--r-- | src/rabbit.erl | 1 | ||||
-rw-r--r-- | src/rabbit_mnesia.erl | 84 | ||||
-rw-r--r-- | src/rabbit_prelaunch.erl | 2 | ||||
-rw-r--r-- | src/rabbit_queue_index.erl | 2 | ||||
-rw-r--r-- | src/rabbit_upgrade.erl | 307 | ||||
-rw-r--r-- | src/rabbit_upgrade_functions.erl | 12 | ||||
-rw-r--r-- | src/rabbit_variable_queue.erl | 2 |
7 files changed, 339 insertions, 71 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl index c9a929ae..b1d88a52 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -204,6 +204,7 @@ start() -> end. stop() -> + ok = rabbit_mnesia:record_running_disc_nodes(), ok = rabbit_misc:stop_applications(?APPS). stop_and_halt() -> diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index 66436920..e61f5fce 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -18,9 +18,12 @@ -module(rabbit_mnesia). -export([ensure_mnesia_dir/0, dir/0, status/0, init/0, is_db_empty/0, - cluster/1, force_cluster/1, reset/0, force_reset/0, + cluster/1, force_cluster/1, reset/0, force_reset/0, init_db/2, is_clustered/0, running_clustered_nodes/0, all_clustered_nodes/0, - empty_ram_only_tables/0, copy_db/1, wait_for_tables/1]). + empty_ram_only_tables/0, copy_db/1, wait_for_tables/1, + create_cluster_nodes_config/1, read_cluster_nodes_config/0, + record_running_disc_nodes/0, read_previously_running_disc_nodes/0, + delete_previously_running_disc_nodes/0, running_nodes_filename/0]). -export([table_names/0]). @@ -42,6 +45,7 @@ -spec(dir/0 :: () -> file:filename()). -spec(ensure_mnesia_dir/0 :: () -> 'ok'). -spec(init/0 :: () -> 'ok'). +-spec(init_db/2 :: ([node()], boolean()) -> 'ok'). -spec(is_db_empty/0 :: () -> boolean()). -spec(cluster/1 :: ([node()]) -> 'ok'). -spec(force_cluster/1 :: ([node()]) -> 'ok'). @@ -55,6 +59,12 @@ -spec(create_tables/0 :: () -> 'ok'). -spec(copy_db/1 :: (file:filename()) -> rabbit_types:ok_or_error(any())). -spec(wait_for_tables/1 :: ([atom()]) -> 'ok'). +-spec(create_cluster_nodes_config/1 :: ([node()]) -> 'ok'). +-spec(read_cluster_nodes_config/0 :: () -> [node()]). +-spec(record_running_disc_nodes/0 :: () -> 'ok'). +-spec(read_previously_running_disc_nodes/0 :: () -> [node()]). +-spec(delete_previously_running_disc_nodes/0 :: () -> 'ok'). +-spec(running_nodes_filename/0 :: () -> file:filename()). -endif. @@ -367,6 +377,38 @@ delete_cluster_nodes_config() -> FileName, Reason}}) end. +running_nodes_filename() -> + dir() ++ "/nodes_running_at_shutdown". + +record_running_disc_nodes() -> + FileName = running_nodes_filename(), + Nodes = sets:to_list( + sets:intersection( + sets:from_list(nodes_of_type(disc_copies)), + sets:from_list(running_clustered_nodes()))) -- [node()], + %% Don't check the result: we're shutting down anyway and this is + %% a best-effort-basis. + rabbit_misc:write_term_file(FileName, [Nodes]), + ok. + +read_previously_running_disc_nodes() -> + FileName = running_nodes_filename(), + case rabbit_misc:read_term_file(FileName) of + {ok, [Nodes]} -> Nodes; + {error, enoent} -> []; + {error, Reason} -> throw({error, {cannot_read_previous_nodes_file, + FileName, Reason}}) + end. + +delete_previously_running_disc_nodes() -> + FileName = running_nodes_filename(), + case file:delete(FileName) of + ok -> ok; + {error, enoent} -> ok; + {error, Reason} -> throw({error, {cannot_delete_previous_nodes_file, + FileName, Reason}}) + end. + %% Take a cluster node config and create the right kind of node - a %% standalone disk node, or disk or ram node connected to the %% specified cluster nodes. If Force is false, don't allow @@ -387,24 +429,18 @@ init_db(ClusterNodes, Force) -> end; true -> ok end, - case {Nodes, mnesia:system_info(use_dir), all_clustered_nodes()} of - {[], true, [_]} -> - %% True single disc node, attempt upgrade - case rabbit_upgrade:maybe_upgrade() of + case {Nodes, mnesia:system_info(use_dir)} of + {[], false} -> + %% Nothing there at all, start from scratch + ok = create_schema(); + {[], true} -> + %% We're the first node up + case rabbit_upgrade:maybe_upgrade_local() of ok -> ensure_schema_integrity(); version_not_available -> schema_ok_or_move() end; - {[], true, _} -> - %% "Master" (i.e. without config) disc node in cluster, - %% verify schema - ensure_version_ok(rabbit_upgrade:read_version()), - ensure_schema_integrity(); - {[], false, _} -> - %% Nothing there at all, start from scratch - ok = create_schema(); - {[AnotherNode|_], _, _} -> + {[AnotherNode|_], _} -> %% Subsequent node in cluster, catch up - ensure_version_ok(rabbit_upgrade:read_version()), ensure_version_ok( rpc:call(AnotherNode, rabbit_upgrade, read_version, [])), IsDiskNode = ClusterNodes == [] orelse @@ -415,6 +451,14 @@ init_db(ClusterNodes, Force) -> true -> disc; false -> ram end), + case rabbit_upgrade:maybe_upgrade_local() of + ok -> + ok; + %% If we're just starting up a new node we won't have + %% a version + version_not_available -> + ok = rabbit_upgrade:write_version() + end, ensure_schema_integrity() end; {error, Reason} -> @@ -481,13 +525,7 @@ move_db() -> ok. copy_db(Destination) -> - mnesia:stop(), - case rabbit_misc:recursive_copy(dir(), Destination) of - ok -> - rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia); - {error, E} -> - {error, E} - end. + rabbit_misc:recursive_copy(dir(), Destination). create_tables() -> lists:foreach(fun ({Tab, TabDef}) -> diff --git a/src/rabbit_prelaunch.erl b/src/rabbit_prelaunch.erl index 7bb8c0ea..92ad6a24 100644 --- a/src/rabbit_prelaunch.erl +++ b/src/rabbit_prelaunch.erl @@ -237,6 +237,8 @@ post_process_script(ScriptFile) -> process_entry(Entry = {apply,{application,start_boot,[rabbit,permanent]}}) -> [{apply,{rabbit,prepare,[]}}, Entry]; +process_entry(Entry = {apply,{application,start_boot,[mnesia,permanent]}}) -> + [{apply,{rabbit_upgrade,maybe_upgrade_mnesia,[]}}, Entry]; process_entry(Entry) -> [Entry]. diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 33c5391b..367953b8 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -169,7 +169,7 @@ %%---------------------------------------------------------------------------- --rabbit_upgrade({add_queue_ttl, []}). +-rabbit_upgrade({add_queue_ttl, local, []}). -ifdef(use_specs). diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl index ebda5d03..f1134cfa 100644 --- a/src/rabbit_upgrade.erl +++ b/src/rabbit_upgrade.erl @@ -16,68 +16,290 @@ -module(rabbit_upgrade). --export([maybe_upgrade/0, read_version/0, write_version/0, desired_version/0]). +-export([maybe_upgrade_mnesia/0, maybe_upgrade_local/0]). +-export([read_version/0, write_version/0, desired_version/0, + desired_version/1]). -include("rabbit.hrl"). -define(VERSION_FILENAME, "schema_version"). -define(LOCK_FILENAME, "schema_upgrade_lock"). +-define(SCOPES, [mnesia, local]). %% ------------------------------------------------------------------- -ifdef(use_specs). -type(step() :: atom()). --type(version() :: [step()]). +-type(version() :: [{scope(), [step()]}]). +-type(scope() :: 'mnesia' | 'local'). --spec(maybe_upgrade/0 :: () -> 'ok' | 'version_not_available'). +-spec(maybe_upgrade_mnesia/0 :: () -> 'ok'). +-spec(maybe_upgrade_local/0 :: () -> 'ok' | 'version_not_available'). -spec(read_version/0 :: () -> rabbit_types:ok_or_error2(version(), any())). -spec(write_version/0 :: () -> 'ok'). -spec(desired_version/0 :: () -> version()). +-spec(desired_version/1 :: (scope()) -> [step()]). -endif. %% ------------------------------------------------------------------- -%% Try to upgrade the schema. If no information on the existing schema -%% could be found, do nothing. rabbit_mnesia:check_schema_integrity() -%% will catch the problem. -maybe_upgrade() -> - case read_version() of - {ok, CurrentHeads} -> - with_upgrade_graph( - fun (G) -> - case unknown_heads(CurrentHeads, G) of - [] -> case upgrades_to_apply(CurrentHeads, G) of - [] -> ok; - Upgrades -> apply_upgrades(Upgrades) - end; - Unknown -> throw({error, - {future_upgrades_found, Unknown}}) - end - end); - {error, enoent} -> - version_not_available +%% The upgrade logic is quite involved, due to the existence of +%% clusters. +%% +%% Firstly, we have two different types of upgrades to do: Mnesia and +%% everythinq else. Mnesia upgrades must only be done by one node in +%% the cluster (we treat a non-clustered node as a single-node +%% cluster). This is the primary upgrader. The other upgrades need to +%% be done by all nodes. +%% +%% The primary upgrader has to start first (and do its Mnesia +%% upgrades). Secondary upgraders need to reset their Mnesia database +%% and then rejoin the cluster. They can't do the Mnesia upgrades as +%% well and then merge databases since the cookie for each table will +%% end up different and the merge will fail. +%% +%% This in turn means that we need to determine whether we are the +%% primary or secondary upgrader *before* Mnesia comes up. If we +%% didn't then the secondary upgrader would try to start Mnesia, and +%% either hang waiting for a node which is not yet up, or fail since +%% its schema differs from the other nodes in the cluster. +%% +%% Also, the primary upgrader needs to start Mnesia to do its +%% upgrades, but needs to forcibly load tables rather than wait for +%% them (in case it was not the last node to shut down, in which case +%% it would wait forever). +%% +%% This in turn means that maybe_upgrade_mnesia/0 has to be patched +%% into the boot process by prelaunch before the mnesia application is +%% started. By the time Mnesia is started the upgrades have happened +%% (on the primary), or Mnesia has been reset (on the secondary) and +%% rabbit_mnesia:init_db/2 can then make the node rejoin the cluster +%% in the normal way. +%% +%% The non-mnesia upgrades are then triggered by +%% rabbit_mnesia:init_db/2. Of course, it's possible for a given +%% upgrade process to only require Mnesia upgrades, or only require +%% non-Mnesia upgrades. In the latter case no Mnesia resets and +%% reclusterings occur. +%% +%% The primary upgrader needs to be a disc node. Ideally we would like +%% it to be the last disc node to shut down (since otherwise there's a +%% risk of data loss). On each node we therefore record the disc nodes +%% that were still running when we shut down. A disc node that knows +%% other nodes were up when it shut down, or a ram node, will refuse +%% to be the primary upgrader, and will thus not start when upgrades +%% are needed. +%% +%% However, this is racy if several nodes are shut down at once. Since +%% rabbit records the running nodes, and shuts down before mnesia, the +%% race manifests as all disc nodes thinking they are not the primary +%% upgrader. Therefore the user can remove the record of the last disc +%% node to shut down to get things going again. This may lose any +%% mnesia changes that happened after the node chosen as the primary +%% upgrader was shut down. + +%% ------------------------------------------------------------------- + +maybe_upgrade_mnesia() -> + AllNodes = rabbit_mnesia:all_clustered_nodes(), + case upgrades_required(mnesia) of + version_not_available -> + rabbit:prepare(), %% Ensure we have logs for this + case AllNodes of + [_] -> ok; + _ -> die("Cluster upgrade needed but upgrading from " + "< 2.1.1.~nUnfortunately you will need to " + "rebuild the cluster.", []) + end; + [] -> + ok; + Upgrades -> + rabbit:prepare(), %% Ensure we have logs for this + case upgrade_mode(AllNodes) of + primary -> primary_upgrade(Upgrades, AllNodes); + secondary -> secondary_upgrade(AllNodes) + end + end, + ok = rabbit_mnesia:delete_previously_running_disc_nodes(). + +upgrade_mode(AllNodes) -> + case nodes_running(AllNodes) of + [] -> + AfterUs = rabbit_mnesia:read_previously_running_disc_nodes(), + case {is_disc_node(), AfterUs} of + {true, []} -> + primary; + {true, _} -> + Filename = rabbit_mnesia:running_nodes_filename(), + die("Cluster upgrade needed but other disc nodes shut " + "down after this one.~nPlease first start the last " + "disc node to shut down.~nThe disc nodes that were " + "still running when this one shut down are:~n~n" + " ~p~n~nNote: if several disc nodes were shut down " + "simultaneously they may all~nshow this message. " + "In which case, remove the lock file on one of them " + "and~nstart that node. The lock file on this node " + "is:~n~n ~s ", + [AfterUs, Filename]); + {false, _} -> + die("Cluster upgrade needed but this is a ram node.~n" + "Please first start the last disc node to shut down.", + []) + end; + [Another|_] -> + ClusterVersion = + case rpc:call(Another, + rabbit_upgrade, desired_version, [mnesia]) of + {badrpc, {'EXIT', {undef, _}}} -> unknown_old_version; + {badrpc, Reason} -> {unknown, Reason}; + V -> V + end, + case desired_version(mnesia) of + ClusterVersion -> + %% The other node(s) have upgraded already, I am not the + %% upgrader + secondary; + MyVersion -> + %% The other node(s) are running an unexpected version. + die("Cluster upgrade needed but other nodes are " + "running ~p~nand I want ~p", + [ClusterVersion, MyVersion]) + end + end. + +is_disc_node() -> + %% This is pretty ugly but we can't start Mnesia and ask it (will hang), + %% we can't look at the config file (may not include us even if we're a + %% disc node). + filelib:is_regular(rabbit_mnesia:dir() ++ "/rabbit_durable_exchange.DCD"). + +die(Msg, Args) -> + %% We don't throw or exit here since that gets thrown + %% straight out into do_boot, generating an erl_crash.dump + %% and displaying any error message in a confusing way. + error_logger:error_msg(Msg, Args), + io:format("~n~n****~n~n" ++ Msg ++ "~n~n****~n~n~n", Args), + error_logger:logfile(close), + halt(1). + +primary_upgrade(Upgrades, Nodes) -> + Others = Nodes -- [node()], + apply_upgrades( + mnesia, + Upgrades, + fun () -> + rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia), + force_tables(), + case Others of + [] -> ok; + _ -> info("mnesia upgrades: Breaking cluster~n", []), + [{atomic, ok} = mnesia:del_table_copy(schema, Node) + || Node <- Others] + end + end), + ok. + +force_tables() -> + [mnesia:force_load_table(T) || T <- rabbit_mnesia:table_names()]. + +secondary_upgrade(AllNodes) -> + rabbit_misc:ensure_ok(mnesia:delete_schema([node()]), + cannot_delete_schema), + %% Note that we cluster with all nodes, rather than all disc nodes + %% (as we can't know all disc nodes at this point). This is safe as + %% we're not writing the cluster config, just setting up Mnesia. + ClusterNodes = case is_disc_node() of + true -> AllNodes; + false -> AllNodes -- [node()] + end, + rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia), + ok = rabbit_mnesia:init_db(ClusterNodes, true), + ok = write_version(mnesia), + ok. + +nodes_running(Nodes) -> + [N || N <- Nodes, node_running(N)]. + +node_running(Node) -> + case rpc:call(Node, application, which_applications, []) of + {badrpc, _} -> false; + Apps -> lists:keysearch(rabbit, 1, Apps) =/= false + end. + +%% ------------------------------------------------------------------- + +maybe_upgrade_local() -> + case upgrades_required(local) of + version_not_available -> version_not_available; + [] -> ok; + Upgrades -> apply_upgrades(local, Upgrades, + fun() -> ok end) end. read_version() -> case rabbit_misc:read_term_file(schema_filename()) of - {ok, [Heads]} -> {ok, Heads}; + {ok, [V]} -> {ok, V}; {error, _} = Err -> Err end. +read_version(Scope) -> + case read_version() of + {error, _} = E -> E; + {ok, V} -> {ok, filter_by_scope(Scope, V)} + end. + write_version() -> ok = rabbit_misc:write_term_file(schema_filename(), [desired_version()]), ok. +write_version(Scope) -> + {ok, V0} = read_version(), + V = flatten([case S of + Scope -> desired_version(S); + _ -> filter_by_scope(S, V0) + end || S <- ?SCOPES]), + ok = rabbit_misc:write_term_file(schema_filename(), [V]), + ok. + desired_version() -> - with_upgrade_graph(fun (G) -> heads(G) end). + flatten([desired_version(Scope) || Scope <- ?SCOPES]). + +desired_version(Scope) -> + with_upgrade_graph(fun (G) -> heads(G) end, Scope). + +flatten(LoL) -> + lists:sort(lists:append(LoL)). + +filter_by_scope(Scope, Versions) -> + with_upgrade_graph( + fun(G) -> + ScopeVs = digraph:vertices(G), + [V || V <- Versions, lists:member(V, ScopeVs)] + end, Scope). %% ------------------------------------------------------------------- -with_upgrade_graph(Fun) -> +upgrades_required(Scope) -> + case read_version(Scope) of + {ok, CurrentHeads} -> + with_upgrade_graph( + fun (G) -> + case unknown_heads(CurrentHeads, G) of + [] -> upgrades_to_apply(CurrentHeads, G); + Unknown -> throw({error, + {future_upgrades_found, Unknown}}) + end + end, Scope); + {error, enoent} -> + version_not_available + end. + +with_upgrade_graph(Fun, Scope) -> case rabbit_misc:build_acyclic_graph( - fun vertices/2, fun edges/2, + fun (Module, Steps) -> vertices(Module, Steps, Scope) end, + fun (Module, Steps) -> edges(Module, Steps, Scope) end, rabbit_misc:all_module_attributes(rabbit_upgrade)) of {ok, G} -> try Fun(G) @@ -92,12 +314,14 @@ with_upgrade_graph(Fun) -> throw({error, {cycle_in_upgrade_steps, StepNames}}) end. -vertices(Module, Steps) -> - [{StepName, {Module, StepName}} || {StepName, _Reqs} <- Steps]. - -edges(_Module, Steps) -> - [{Require, StepName} || {StepName, Requires} <- Steps, Require <- Requires]. +vertices(Module, Steps, Scope0) -> + [{StepName, {Module, StepName}} || {StepName, Scope1, _Reqs} <- Steps, + Scope0 == Scope1]. +edges(_Module, Steps, Scope0) -> + [{Require, StepName} || {StepName, Scope1, Requires} <- Steps, + Require <- Requires, + Scope0 == Scope1]. unknown_heads(Heads, G) -> [H || H <- Heads, digraph:vertex(G, H) =:= false]. @@ -119,12 +343,12 @@ heads(G) -> %% ------------------------------------------------------------------- -apply_upgrades(Upgrades) -> +apply_upgrades(Scope, Upgrades, Fun) -> LockFile = lock_filename(dir()), case rabbit_misc:lock_file(LockFile) of ok -> BackupDir = dir() ++ "-upgrade-backup", - info("Upgrades: ~w to apply~n", [length(Upgrades)]), + info("~s upgrades: ~w to apply~n", [Scope, length(Upgrades)]), case rabbit_mnesia:copy_db(BackupDir) of ok -> %% We need to make the backup after creating the @@ -133,12 +357,15 @@ apply_upgrades(Upgrades) -> %% the lock file exists in the backup too, which %% is not intuitive. Remove it. ok = file:delete(lock_filename(BackupDir)), - info("Upgrades: Mnesia dir backed up to ~p~n", [BackupDir]), - [apply_upgrade(Upgrade) || Upgrade <- Upgrades], - info("Upgrades: All upgrades applied successfully~n", []), - ok = write_version(), + info("~s upgrades: Mnesia dir backed up to ~p~n", + [Scope, BackupDir]), + Fun(), + [apply_upgrade(Scope, Upgrade) || Upgrade <- Upgrades], + info("~s upgrades: All upgrades applied successfully~n", + [Scope]), + ok = write_version(Scope), ok = rabbit_misc:recursive_delete([BackupDir]), - info("Upgrades: Mnesia backup removed~n", []), + info("~s upgrades: Mnesia backup removed~n", [Scope]), ok = file:delete(LockFile); {error, E} -> %% If we can't backup, the upgrade hasn't started @@ -151,8 +378,8 @@ apply_upgrades(Upgrades) -> throw({error, previous_upgrade_failed}) end. -apply_upgrade({M, F}) -> - info("Upgrades: Applying ~w:~w~n", [M, F]), +apply_upgrade(Scope, {M, F}) -> + info("~s upgrades: Applying ~w:~w~n", [Scope, M, F]), ok = apply(M, F, []). %% ------------------------------------------------------------------- diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl index b9dbe418..7567c29e 100644 --- a/src/rabbit_upgrade_functions.erl +++ b/src/rabbit_upgrade_functions.erl @@ -20,12 +20,12 @@ -compile([export_all]). --rabbit_upgrade({remove_user_scope, []}). --rabbit_upgrade({hash_passwords, []}). --rabbit_upgrade({add_ip_to_listener, []}). --rabbit_upgrade({internal_exchanges, []}). --rabbit_upgrade({user_to_internal_user, [hash_passwords]}). --rabbit_upgrade({topic_trie, []}). +-rabbit_upgrade({remove_user_scope, mnesia, []}). +-rabbit_upgrade({hash_passwords, mnesia, []}). +-rabbit_upgrade({add_ip_to_listener, mnesia, []}). +-rabbit_upgrade({internal_exchanges, mnesia, []}). +-rabbit_upgrade({user_to_internal_user, mnesia, [hash_passwords]}). +-rabbit_upgrade({topic_trie, mnesia, []}). %% ------------------------------------------------------------------- diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 1b29756b..6e3460c5 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -298,7 +298,7 @@ %%---------------------------------------------------------------------------- --rabbit_upgrade({multiple_routing_keys, []}). +-rabbit_upgrade({multiple_routing_keys, local, []}). -ifdef(use_specs). |