diff options
-rw-r--r-- | src/rabbit.erl | 5 | ||||
-rw-r--r-- | src/rabbit_mnesia.erl | 111 | ||||
-rw-r--r-- | src/rabbit_prelaunch.erl | 2 | ||||
-rw-r--r-- | src/rabbit_queue_index.erl | 2 | ||||
-rw-r--r-- | src/rabbit_upgrade.erl | 331 | ||||
-rw-r--r-- | src/rabbit_upgrade_functions.erl | 12 | ||||
-rw-r--r-- | src/rabbit_variable_queue.erl | 2 | ||||
-rw-r--r-- | src/rabbit_version.erl | 172 |
8 files changed, 486 insertions, 151 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl index c9a929ae..807e9e7d 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -192,7 +192,8 @@ %%---------------------------------------------------------------------------- prepare() -> - ok = ensure_working_log_handlers(). + ok = ensure_working_log_handlers(), + ok = rabbit_upgrade:maybe_upgrade_mnesia(). start() -> try @@ -233,6 +234,7 @@ rotate_logs(BinarySuffix) -> start(normal, []) -> case erts_version_check() of ok -> + ok = rabbit_mnesia:delete_previously_running_nodes(), {ok, SupPid} = rabbit_sup:start_link(), true = register(rabbit, self()), @@ -245,6 +247,7 @@ start(normal, []) -> end. stop(_State) -> + ok = rabbit_mnesia:record_running_nodes(), terminated_ok = error_logger:delete_report_handler(rabbit_error_logger), ok = rabbit_alarm:stop(), ok = case rabbit_mnesia:is_clustered() of diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index 963d814e..fbcf07ae 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -18,9 +18,12 @@ -module(rabbit_mnesia). -export([ensure_mnesia_dir/0, dir/0, status/0, init/0, is_db_empty/0, - cluster/1, force_cluster/1, reset/0, force_reset/0, + cluster/1, force_cluster/1, reset/0, force_reset/0, init_db/3, is_clustered/0, running_clustered_nodes/0, all_clustered_nodes/0, - empty_ram_only_tables/0, copy_db/1, wait_for_tables/1]). + empty_ram_only_tables/0, copy_db/1, wait_for_tables/1, + create_cluster_nodes_config/1, read_cluster_nodes_config/0, + record_running_nodes/0, read_previously_running_nodes/0, + delete_previously_running_nodes/0, running_nodes_filename/0]). -export([table_names/0]). @@ -42,6 +45,7 @@ -spec(dir/0 :: () -> file:filename()). -spec(ensure_mnesia_dir/0 :: () -> 'ok'). -spec(init/0 :: () -> 'ok'). +-spec(init_db/3 :: ([node()], boolean(), rabbit_misc:thunk('ok')) -> 'ok'). -spec(is_db_empty/0 :: () -> boolean()). -spec(cluster/1 :: ([node()]) -> 'ok'). -spec(force_cluster/1 :: ([node()]) -> 'ok'). @@ -55,6 +59,12 @@ -spec(create_tables/0 :: () -> 'ok'). -spec(copy_db/1 :: (file:filename()) -> rabbit_types:ok_or_error(any())). -spec(wait_for_tables/1 :: ([atom()]) -> 'ok'). +-spec(create_cluster_nodes_config/1 :: ([node()]) -> 'ok'). +-spec(read_cluster_nodes_config/0 :: () -> [node()]). +-spec(record_running_nodes/0 :: () -> 'ok'). +-spec(read_previously_running_nodes/0 :: () -> [node()]). +-spec(delete_previously_running_nodes/0 :: () -> 'ok'). +-spec(running_nodes_filename/0 :: () -> file:filename()). -endif. @@ -80,7 +90,8 @@ status() -> init() -> ensure_mnesia_running(), ensure_mnesia_dir(), - ok = init_db(read_cluster_nodes_config(), true), + ok = init_db(read_cluster_nodes_config(), true, + fun maybe_upgrade_local_or_record_desired/0), ok. is_db_empty() -> @@ -102,7 +113,8 @@ cluster(ClusterNodes, Force) -> ensure_mnesia_dir(), rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia), try - ok = init_db(ClusterNodes, Force), + ok = init_db(ClusterNodes, Force, + fun maybe_upgrade_local_or_record_desired/0), ok = create_cluster_nodes_config(ClusterNodes) after mnesia:stop() @@ -367,11 +379,40 @@ delete_cluster_nodes_config() -> FileName, Reason}}) end. +running_nodes_filename() -> + filename:join(dir(), "nodes_running_at_shutdown"). + +record_running_nodes() -> + FileName = running_nodes_filename(), + Nodes = running_clustered_nodes() -- [node()], + %% Don't check the result: we're shutting down anyway and this is + %% a best-effort-basis. + rabbit_misc:write_term_file(FileName, [Nodes]), + ok. + +read_previously_running_nodes() -> + FileName = running_nodes_filename(), + case rabbit_misc:read_term_file(FileName) of + {ok, [Nodes]} -> Nodes; + {error, enoent} -> []; + {error, Reason} -> throw({error, {cannot_read_previous_nodes_file, + FileName, Reason}}) + end. + +delete_previously_running_nodes() -> + FileName = running_nodes_filename(), + case file:delete(FileName) of + ok -> ok; + {error, enoent} -> ok; + {error, Reason} -> throw({error, {cannot_delete_previous_nodes_file, + FileName, Reason}}) + end. + %% Take a cluster node config and create the right kind of node - a %% standalone disk node, or disk or ram node connected to the %% specified cluster nodes. If Force is false, don't allow %% connections to offline nodes. -init_db(ClusterNodes, Force) -> +init_db(ClusterNodes, Force, SecondaryPostMnesiaFun) -> UClusterNodes = lists:usort(ClusterNodes), ProperClusterNodes = UClusterNodes -- [node()], case mnesia:change_config(extra_db_nodes, ProperClusterNodes) of @@ -387,26 +428,21 @@ init_db(ClusterNodes, Force) -> end; true -> ok end, - case {Nodes, mnesia:system_info(use_dir), all_clustered_nodes()} of - {[], true, [_]} -> - %% True single disc node, attempt upgrade - case rabbit_upgrade:maybe_upgrade() of - ok -> ensure_schema_integrity(); - version_not_available -> schema_ok_or_move() - end; - {[], true, _} -> - %% "Master" (i.e. without config) disc node in cluster, - %% verify schema - ensure_version_ok(rabbit_upgrade:read_version()), - ensure_schema_integrity(); - {[], false, _} -> + case {Nodes, mnesia:system_info(use_dir)} of + {[], false} -> %% Nothing there at all, start from scratch ok = create_schema(); - {[AnotherNode|_], _, _} -> + {[], true} -> + %% We're the first node up + case rabbit_upgrade:maybe_upgrade_local() of + ok -> ensure_schema_integrity(); + version_not_available -> ok = schema_ok_or_move() + end, + ok; + {[AnotherNode|_], _} -> %% Subsequent node in cluster, catch up - ensure_version_ok(rabbit_upgrade:read_version()), ensure_version_ok( - rpc:call(AnotherNode, rabbit_upgrade, read_version, [])), + rpc:call(AnotherNode, rabbit_version, recorded, [])), IsDiskNode = ClusterNodes == [] orelse lists:member(node(), ClusterNodes), ok = wait_for_replicated_tables(), @@ -415,7 +451,9 @@ init_db(ClusterNodes, Force) -> true -> disc; false -> ram end), - ensure_schema_integrity() + ok = SecondaryPostMnesiaFun(), + ensure_schema_integrity(), + ok end; {error, Reason} -> %% one reason we may end up here is if we try to join @@ -424,6 +462,14 @@ init_db(ClusterNodes, Force) -> throw({error, {unable_to_join_cluster, ClusterNodes, Reason}}) end. +maybe_upgrade_local_or_record_desired() -> + case rabbit_upgrade:maybe_upgrade_local() of + ok -> ok; + %% If we're just starting up a new node we won't have a + %% version + version_not_available -> ok = rabbit_version:record_desired() + end. + schema_ok_or_move() -> case check_schema_integrity() of ok -> @@ -440,13 +486,13 @@ schema_ok_or_move() -> end. ensure_version_ok({ok, DiscVersion}) -> - case rabbit_upgrade:desired_version() of - DiscVersion -> ok; - DesiredVersion -> throw({error, {schema_mismatch, - DesiredVersion, DiscVersion}}) + DesiredVersion = rabbit_version:desired(), + case rabbit_version:matches(DesiredVersion, DiscVersion) of + true -> ok; + false -> throw({error, {version_mismatch, DesiredVersion, DiscVersion}}) end; ensure_version_ok({error, _}) -> - ok = rabbit_upgrade:write_version(). + ok = rabbit_version:record_desired(). create_schema() -> mnesia:stop(), @@ -456,7 +502,7 @@ create_schema() -> cannot_start_mnesia), ok = create_tables(), ensure_schema_integrity(), - ok = rabbit_upgrade:write_version(). + ok = rabbit_version:record_desired(). move_db() -> mnesia:stop(), @@ -481,13 +527,8 @@ move_db() -> ok. copy_db(Destination) -> - mnesia:stop(), - case rabbit_misc:recursive_copy(dir(), Destination) of - ok -> - rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia); - {error, E} -> - {error, E} - end. + ok = ensure_mnesia_not_running(), + rabbit_misc:recursive_copy(dir(), Destination). create_tables() -> lists:foreach(fun ({Tab, TabDef}) -> diff --git a/src/rabbit_prelaunch.erl b/src/rabbit_prelaunch.erl index 7bb8c0ea..8800e8d6 100644 --- a/src/rabbit_prelaunch.erl +++ b/src/rabbit_prelaunch.erl @@ -235,7 +235,7 @@ post_process_script(ScriptFile) -> {error, {failed_to_load_script, Reason}} end. -process_entry(Entry = {apply,{application,start_boot,[rabbit,permanent]}}) -> +process_entry(Entry = {apply,{application,start_boot,[mnesia,permanent]}}) -> [{apply,{rabbit,prepare,[]}}, Entry]; process_entry(Entry) -> [Entry]. diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 33c5391b..367953b8 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -169,7 +169,7 @@ %%---------------------------------------------------------------------------- --rabbit_upgrade({add_queue_ttl, []}). +-rabbit_upgrade({add_queue_ttl, local, []}). -ifdef(use_specs). diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl index ebda5d03..5ec08330 100644 --- a/src/rabbit_upgrade.erl +++ b/src/rabbit_upgrade.erl @@ -16,7 +16,7 @@ -module(rabbit_upgrade). --export([maybe_upgrade/0, read_version/0, write_version/0, desired_version/0]). +-export([maybe_upgrade_mnesia/0, maybe_upgrade_local/0]). -include("rabbit.hrl"). @@ -27,141 +27,260 @@ -ifdef(use_specs). --type(step() :: atom()). --type(version() :: [step()]). - --spec(maybe_upgrade/0 :: () -> 'ok' | 'version_not_available'). --spec(read_version/0 :: () -> rabbit_types:ok_or_error2(version(), any())). --spec(write_version/0 :: () -> 'ok'). --spec(desired_version/0 :: () -> version()). +-spec(maybe_upgrade_mnesia/0 :: () -> 'ok'). +-spec(maybe_upgrade_local/0 :: () -> 'ok' | 'version_not_available'). -endif. %% ------------------------------------------------------------------- -%% Try to upgrade the schema. If no information on the existing schema -%% could be found, do nothing. rabbit_mnesia:check_schema_integrity() -%% will catch the problem. -maybe_upgrade() -> - case read_version() of - {ok, CurrentHeads} -> - with_upgrade_graph( - fun (G) -> - case unknown_heads(CurrentHeads, G) of - [] -> case upgrades_to_apply(CurrentHeads, G) of - [] -> ok; - Upgrades -> apply_upgrades(Upgrades) - end; - Unknown -> throw({error, - {future_upgrades_found, Unknown}}) - end - end); - {error, enoent} -> - version_not_available +%% The upgrade logic is quite involved, due to the existence of +%% clusters. +%% +%% Firstly, we have two different types of upgrades to do: Mnesia and +%% everythinq else. Mnesia upgrades must only be done by one node in +%% the cluster (we treat a non-clustered node as a single-node +%% cluster). This is the primary upgrader. The other upgrades need to +%% be done by all nodes. +%% +%% The primary upgrader has to start first (and do its Mnesia +%% upgrades). Secondary upgraders need to reset their Mnesia database +%% and then rejoin the cluster. They can't do the Mnesia upgrades as +%% well and then merge databases since the cookie for each table will +%% end up different and the merge will fail. +%% +%% This in turn means that we need to determine whether we are the +%% primary or secondary upgrader *before* Mnesia comes up. If we +%% didn't then the secondary upgrader would try to start Mnesia, and +%% either hang waiting for a node which is not yet up, or fail since +%% its schema differs from the other nodes in the cluster. +%% +%% Also, the primary upgrader needs to start Mnesia to do its +%% upgrades, but needs to forcibly load tables rather than wait for +%% them (in case it was not the last node to shut down, in which case +%% it would wait forever). +%% +%% This in turn means that maybe_upgrade_mnesia/0 has to be patched +%% into the boot process by prelaunch before the mnesia application is +%% started. By the time Mnesia is started the upgrades have happened +%% (on the primary), or Mnesia has been reset (on the secondary) and +%% rabbit_mnesia:init_db/2 can then make the node rejoin the cluster +%% in the normal way. +%% +%% The non-mnesia upgrades are then triggered by +%% rabbit_mnesia:init_db/2. Of course, it's possible for a given +%% upgrade process to only require Mnesia upgrades, or only require +%% non-Mnesia upgrades. In the latter case no Mnesia resets and +%% reclusterings occur. +%% +%% The primary upgrader needs to be a disc node. Ideally we would like +%% it to be the last disc node to shut down (since otherwise there's a +%% risk of data loss). On each node we therefore record the disc nodes +%% that were still running when we shut down. A disc node that knows +%% other nodes were up when it shut down, or a ram node, will refuse +%% to be the primary upgrader, and will thus not start when upgrades +%% are needed. +%% +%% However, this is racy if several nodes are shut down at once. Since +%% rabbit records the running nodes, and shuts down before mnesia, the +%% race manifests as all disc nodes thinking they are not the primary +%% upgrader. Therefore the user can remove the record of the last disc +%% node to shut down to get things going again. This may lose any +%% mnesia changes that happened after the node chosen as the primary +%% upgrader was shut down. + +%% ------------------------------------------------------------------- + +ensure_backup_taken() -> + case filelib:is_file(lock_filename()) of + false -> case filelib:is_dir(backup_dir()) of + false -> ok = take_backup(); + _ -> ok + end; + true -> throw({error, previous_upgrade_failed}) end. -read_version() -> - case rabbit_misc:read_term_file(schema_filename()) of - {ok, [Heads]} -> {ok, Heads}; - {error, _} = Err -> Err +take_backup() -> + BackupDir = backup_dir(), + case rabbit_mnesia:copy_db(BackupDir) of + ok -> info("upgrades: Mnesia dir backed up to ~p~n", + [BackupDir]); + {error, E} -> throw({could_not_back_up_mnesia_dir, E}) end. -write_version() -> - ok = rabbit_misc:write_term_file(schema_filename(), [desired_version()]), - ok. +ensure_backup_removed() -> + case filelib:is_dir(backup_dir()) of + true -> ok = remove_backup(); + _ -> ok + end. -desired_version() -> - with_upgrade_graph(fun (G) -> heads(G) end). +remove_backup() -> + ok = rabbit_misc:recursive_delete([backup_dir()]), + info("upgrades: Mnesia backup removed~n", []). -%% ------------------------------------------------------------------- +maybe_upgrade_mnesia() -> + AllNodes = rabbit_mnesia:all_clustered_nodes(), + case rabbit_version:upgrades_required(mnesia) of + {error, version_not_available} -> + case AllNodes of + [_] -> ok; + _ -> die("Cluster upgrade needed but upgrading from " + "< 2.1.1.~nUnfortunately you will need to " + "rebuild the cluster.", []) + end; + {error, _} = Err -> + throw(Err); + {ok, []} -> + ok; + {ok, Upgrades} -> + ensure_backup_taken(), + ok = case upgrade_mode(AllNodes) of + primary -> primary_upgrade(Upgrades, AllNodes); + secondary -> secondary_upgrade(AllNodes) + end + end. -with_upgrade_graph(Fun) -> - case rabbit_misc:build_acyclic_graph( - fun vertices/2, fun edges/2, - rabbit_misc:all_module_attributes(rabbit_upgrade)) of - {ok, G} -> try - Fun(G) - after - true = digraph:delete(G) - end; - {error, {vertex, duplicate, StepName}} -> - throw({error, {duplicate_upgrade_step, StepName}}); - {error, {edge, {bad_vertex, StepName}, _From, _To}} -> - throw({error, {dependency_on_unknown_upgrade_step, StepName}}); - {error, {edge, {bad_edge, StepNames}, _From, _To}} -> - throw({error, {cycle_in_upgrade_steps, StepNames}}) +upgrade_mode(AllNodes) -> + case nodes_running(AllNodes) of + [] -> + AfterUs = rabbit_mnesia:read_previously_running_nodes(), + case {is_disc_node(), AfterUs} of + {true, []} -> + primary; + {true, _} -> + Filename = rabbit_mnesia:running_nodes_filename(), + die("Cluster upgrade needed but other disc nodes shut " + "down after this one.~nPlease first start the last " + "disc node to shut down.~n~nNote: if several disc " + "nodes were shut down simultaneously they may " + "all~nshow this message. In which case, remove " + "the lock file on one of them and~nstart that node. " + "The lock file on this node is:~n~n ~s ", [Filename]); + {false, _} -> + die("Cluster upgrade needed but this is a ram node.~n" + "Please first start the last disc node to shut down.", + []) + end; + [Another|_] -> + MyVersion = rabbit_version:desired_for_scope(mnesia), + ErrFun = fun (ClusterVersion) -> + %% The other node(s) are running an + %% unexpected version. + die("Cluster upgrade needed but other nodes are " + "running ~p~nand I want ~p", + [ClusterVersion, MyVersion]) + end, + case rpc:call(Another, rabbit_version, desired_for_scope, + [mnesia]) of + {badrpc, {'EXIT', {undef, _}}} -> ErrFun(unknown_old_version); + {badrpc, Reason} -> ErrFun({unknown, Reason}); + CV -> case rabbit_version:matches( + MyVersion, CV) of + true -> secondary; + false -> ErrFun(CV) + end + end end. -vertices(Module, Steps) -> - [{StepName, {Module, StepName}} || {StepName, _Reqs} <- Steps]. +is_disc_node() -> + %% This is pretty ugly but we can't start Mnesia and ask it (will hang), + %% we can't look at the config file (may not include us even if we're a + %% disc node). + filelib:is_regular(filename:join(dir(), "rabbit_durable_exchange.DCD")). + +die(Msg, Args) -> + %% We don't throw or exit here since that gets thrown + %% straight out into do_boot, generating an erl_crash.dump + %% and displaying any error message in a confusing way. + error_logger:error_msg(Msg, Args), + io:format("~n~n****~n~n" ++ Msg ++ "~n~n****~n~n~n", Args), + error_logger:logfile(close), + halt(1). + +primary_upgrade(Upgrades, Nodes) -> + Others = Nodes -- [node()], + ok = apply_upgrades( + mnesia, + Upgrades, + fun () -> + force_tables(), + case Others of + [] -> ok; + _ -> info("mnesia upgrades: Breaking cluster~n", []), + [{atomic, ok} = mnesia:del_table_copy(schema, Node) + || Node <- Others] + end + end), + ok. -edges(_Module, Steps) -> - [{Require, StepName} || {StepName, Requires} <- Steps, Require <- Requires]. +force_tables() -> + [mnesia:force_load_table(T) || T <- rabbit_mnesia:table_names()]. -unknown_heads(Heads, G) -> - [H || H <- Heads, digraph:vertex(G, H) =:= false]. +secondary_upgrade(AllNodes) -> + %% must do this before we wipe out schema + IsDiscNode = is_disc_node(), + rabbit_misc:ensure_ok(mnesia:delete_schema([node()]), + cannot_delete_schema), + %% Note that we cluster with all nodes, rather than all disc nodes + %% (as we can't know all disc nodes at this point). This is safe as + %% we're not writing the cluster config, just setting up Mnesia. + ClusterNodes = case IsDiscNode of + true -> AllNodes; + false -> AllNodes -- [node()] + end, + rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia), + ok = rabbit_mnesia:init_db(ClusterNodes, true, fun () -> ok end), + ok = rabbit_version:record_desired_for_scope(mnesia), + ok. -upgrades_to_apply(Heads, G) -> - %% Take all the vertices which can reach the known heads. That's - %% everything we've already applied. Subtract that from all - %% vertices: that's what we have to apply. - Unsorted = sets:to_list( - sets:subtract( - sets:from_list(digraph:vertices(G)), - sets:from_list(digraph_utils:reaching(Heads, G)))), - %% Form a subgraph from that list and find a topological ordering - %% so we can invoke them in order. - [element(2, digraph:vertex(G, StepName)) || - StepName <- digraph_utils:topsort(digraph_utils:subgraph(G, Unsorted))]. +nodes_running(Nodes) -> + [N || N <- Nodes, node_running(N)]. -heads(G) -> - lists:sort([V || V <- digraph:vertices(G), digraph:out_degree(G, V) =:= 0]). +node_running(Node) -> + case rpc:call(Node, application, which_applications, []) of + {badrpc, _} -> false; + Apps -> lists:keysearch(rabbit, 1, Apps) =/= false + end. %% ------------------------------------------------------------------- -apply_upgrades(Upgrades) -> - LockFile = lock_filename(dir()), - case rabbit_misc:lock_file(LockFile) of - ok -> - BackupDir = dir() ++ "-upgrade-backup", - info("Upgrades: ~w to apply~n", [length(Upgrades)]), - case rabbit_mnesia:copy_db(BackupDir) of - ok -> - %% We need to make the backup after creating the - %% lock file so that it protects us from trying to - %% overwrite the backup. Unfortunately this means - %% the lock file exists in the backup too, which - %% is not intuitive. Remove it. - ok = file:delete(lock_filename(BackupDir)), - info("Upgrades: Mnesia dir backed up to ~p~n", [BackupDir]), - [apply_upgrade(Upgrade) || Upgrade <- Upgrades], - info("Upgrades: All upgrades applied successfully~n", []), - ok = write_version(), - ok = rabbit_misc:recursive_delete([BackupDir]), - info("Upgrades: Mnesia backup removed~n", []), - ok = file:delete(LockFile); - {error, E} -> - %% If we can't backup, the upgrade hasn't started - %% hence we don't need the lockfile since the real - %% mnesia dir is the good one. - ok = file:delete(LockFile), - throw({could_not_back_up_mnesia_dir, E}) - end; - {error, eexist} -> - throw({error, previous_upgrade_failed}) +maybe_upgrade_local() -> + case rabbit_version:upgrades_required(local) of + {error, version_not_available} -> version_not_available; + {error, _} = Err -> throw(Err); + {ok, []} -> ensure_backup_removed(), + ok; + {ok, Upgrades} -> mnesia:stop(), + ensure_backup_taken(), + ok = apply_upgrades(local, Upgrades, + fun () -> ok end), + ensure_backup_removed(), + ok end. -apply_upgrade({M, F}) -> - info("Upgrades: Applying ~w:~w~n", [M, F]), +%% ------------------------------------------------------------------- + +apply_upgrades(Scope, Upgrades, Fun) -> + ok = rabbit_misc:lock_file(lock_filename()), + info("~s upgrades: ~w to apply~n", [Scope, length(Upgrades)]), + rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia), + Fun(), + [apply_upgrade(Scope, Upgrade) || Upgrade <- Upgrades], + info("~s upgrades: All upgrades applied successfully~n", [Scope]), + ok = rabbit_version:record_desired_for_scope(Scope), + ok = file:delete(lock_filename()). + +apply_upgrade(Scope, {M, F}) -> + info("~s upgrades: Applying ~w:~w~n", [Scope, M, F]), ok = apply(M, F, []). %% ------------------------------------------------------------------- dir() -> rabbit_mnesia:dir(). -schema_filename() -> filename:join(dir(), ?VERSION_FILENAME). - +lock_filename() -> lock_filename(dir()). lock_filename(Dir) -> filename:join(Dir, ?LOCK_FILENAME). +backup_dir() -> dir() ++ "-upgrade-backup". %% NB: we cannot use rabbit_log here since it may not have been %% started yet diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl index b9dbe418..7567c29e 100644 --- a/src/rabbit_upgrade_functions.erl +++ b/src/rabbit_upgrade_functions.erl @@ -20,12 +20,12 @@ -compile([export_all]). --rabbit_upgrade({remove_user_scope, []}). --rabbit_upgrade({hash_passwords, []}). --rabbit_upgrade({add_ip_to_listener, []}). --rabbit_upgrade({internal_exchanges, []}). --rabbit_upgrade({user_to_internal_user, [hash_passwords]}). --rabbit_upgrade({topic_trie, []}). +-rabbit_upgrade({remove_user_scope, mnesia, []}). +-rabbit_upgrade({hash_passwords, mnesia, []}). +-rabbit_upgrade({add_ip_to_listener, mnesia, []}). +-rabbit_upgrade({internal_exchanges, mnesia, []}). +-rabbit_upgrade({user_to_internal_user, mnesia, [hash_passwords]}). +-rabbit_upgrade({topic_trie, mnesia, []}). %% ------------------------------------------------------------------- diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 14c36b12..7a1102e5 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -301,7 +301,7 @@ %%---------------------------------------------------------------------------- --rabbit_upgrade({multiple_routing_keys, []}). +-rabbit_upgrade({multiple_routing_keys, local, []}). -ifdef(use_specs). diff --git a/src/rabbit_version.erl b/src/rabbit_version.erl new file mode 100644 index 00000000..400abc10 --- /dev/null +++ b/src/rabbit_version.erl @@ -0,0 +1,172 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is VMware, Inc. +%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% + +-module(rabbit_version). + +-export([recorded/0, matches/2, desired/0, desired_for_scope/1, + record_desired/0, record_desired_for_scope/1, + upgrades_required/1]). + +%% ------------------------------------------------------------------- +-ifdef(use_specs). + +-export_type([scope/0, step/0]). + +-type(scope() :: atom()). +-type(scope_version() :: [atom()]). +-type(step() :: {atom(), atom()}). + +-type(version() :: [atom()]). + +-spec(recorded/0 :: () -> rabbit_types:ok_or_error2(version(), any())). +-spec(matches/2 :: ([A], [A]) -> boolean()). +-spec(desired/0 :: () -> version()). +-spec(desired_for_scope/1 :: (scope()) -> scope_version()). +-spec(record_desired/0 :: () -> 'ok'). +-spec(record_desired_for_scope/1 :: + (scope()) -> rabbit_types:ok_or_error(any())). +-spec(upgrades_required/1 :: + (scope()) -> rabbit_types:ok_or_error2([step()], any())). + +-endif. +%% ------------------------------------------------------------------- + +-define(VERSION_FILENAME, "schema_version"). +-define(SCOPES, [mnesia, local]). + +%% ------------------------------------------------------------------- + +recorded() -> case rabbit_misc:read_term_file(schema_filename()) of + {ok, [V]} -> {ok, V}; + {error, _} = Err -> Err + end. + +record(V) -> ok = rabbit_misc:write_term_file(schema_filename(), [V]). + +recorded_for_scope(Scope) -> + case recorded() of + {error, _} = Err -> + Err; + {ok, Version} -> + {ok, case lists:keysearch(Scope, 1, categorise_by_scope(Version)) of + false -> []; + {value, {Scope, SV1}} -> SV1 + end} + end. + +record_for_scope(Scope, ScopeVersion) -> + case recorded() of + {error, _} = Err -> + Err; + {ok, Version} -> + Version1 = lists:keystore(Scope, 1, categorise_by_scope(Version), + {Scope, ScopeVersion}), + ok = record([Name || {_Scope, Names} <- Version1, Name <- Names]) + end. + +%% ------------------------------------------------------------------- + +matches(VerA, VerB) -> + lists:usort(VerA) =:= lists:usort(VerB). + +%% ------------------------------------------------------------------- + +desired() -> [Name || Scope <- ?SCOPES, Name <- desired_for_scope(Scope)]. + +desired_for_scope(Scope) -> with_upgrade_graph(fun heads/1, Scope). + +record_desired() -> record(desired()). + +record_desired_for_scope(Scope) -> + record_for_scope(Scope, desired_for_scope(Scope)). + +upgrades_required(Scope) -> + case recorded_for_scope(Scope) of + {error, enoent} -> + {error, version_not_available}; + {ok, CurrentHeads} -> + with_upgrade_graph( + fun (G) -> + case unknown_heads(CurrentHeads, G) of + [] -> {ok, upgrades_to_apply(CurrentHeads, G)}; + Unknown -> {error, {future_upgrades_found, Unknown}} + end + end, Scope) + end. + +%% ------------------------------------------------------------------- + +with_upgrade_graph(Fun, Scope) -> + case rabbit_misc:build_acyclic_graph( + fun (Module, Steps) -> vertices(Module, Steps, Scope) end, + fun (Module, Steps) -> edges(Module, Steps, Scope) end, + rabbit_misc:all_module_attributes(rabbit_upgrade)) of + {ok, G} -> try + Fun(G) + after + true = digraph:delete(G) + end; + {error, {vertex, duplicate, StepName}} -> + throw({error, {duplicate_upgrade_step, StepName}}); + {error, {edge, {bad_vertex, StepName}, _From, _To}} -> + throw({error, {dependency_on_unknown_upgrade_step, StepName}}); + {error, {edge, {bad_edge, StepNames}, _From, _To}} -> + throw({error, {cycle_in_upgrade_steps, StepNames}}) + end. + +vertices(Module, Steps, Scope0) -> + [{StepName, {Module, StepName}} || {StepName, Scope1, _Reqs} <- Steps, + Scope0 == Scope1]. + +edges(_Module, Steps, Scope0) -> + [{Require, StepName} || {StepName, Scope1, Requires} <- Steps, + Require <- Requires, + Scope0 == Scope1]. +unknown_heads(Heads, G) -> + [H || H <- Heads, digraph:vertex(G, H) =:= false]. + +upgrades_to_apply(Heads, G) -> + %% Take all the vertices which can reach the known heads. That's + %% everything we've already applied. Subtract that from all + %% vertices: that's what we have to apply. + Unsorted = sets:to_list( + sets:subtract( + sets:from_list(digraph:vertices(G)), + sets:from_list(digraph_utils:reaching(Heads, G)))), + %% Form a subgraph from that list and find a topological ordering + %% so we can invoke them in order. + [element(2, digraph:vertex(G, StepName)) || + StepName <- digraph_utils:topsort(digraph_utils:subgraph(G, Unsorted))]. + +heads(G) -> + lists:sort([V || V <- digraph:vertices(G), digraph:out_degree(G, V) =:= 0]). + +%% ------------------------------------------------------------------- + +categorise_by_scope(Version) when is_list(Version) -> + Categorised = + [{Scope, Name} || {_Module, Attributes} <- + rabbit_misc:all_module_attributes(rabbit_upgrade), + {Name, Scope, _Requires} <- Attributes, + lists:member(Name, Version)], + orddict:to_list( + lists:foldl(fun ({Scope, Name}, CatVersion) -> + rabbit_misc:orddict_cons(Scope, Name, CatVersion) + end, orddict:new(), Categorised)). + +dir() -> rabbit_mnesia:dir(). + +schema_filename() -> filename:join(dir(), ?VERSION_FILENAME). |