diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2010-11-09 16:41:39 +0000 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2010-11-09 16:41:39 +0000 |
commit | 6d7741caeac453498ead442805a5c46bbd9312c3 (patch) | |
tree | ff39b980a66128343a16a490baf19302f7b99e41 | |
parent | 7ed10703eeea9379736e32cec751f23c3393aa4c (diff) | |
parent | 0cd23736f28817385eba70dfea045567da0e6194 (diff) | |
download | rabbitmq-server-6d7741caeac453498ead442805a5c46bbd9312c3.tar.gz |
Merged from default
-rw-r--r-- | src/rabbit.erl | 87 | ||||
-rw-r--r-- | src/rabbit_misc.erl | 44 | ||||
-rw-r--r-- | src/rabbit_mnesia.erl | 92 | ||||
-rw-r--r-- | src/rabbit_upgrade.erl | 160 | ||||
-rw-r--r-- | src/rabbit_upgrade_functions.erl | 73 |
5 files changed, 369 insertions, 87 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl index 8c36a9f0..a1dd2c2e 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -301,49 +301,38 @@ run_boot_step({StepName, Attributes}) -> ok end. -module_attributes(Module) -> - case catch Module:module_info(attributes) of - {'EXIT', {undef, [{Module, module_info, _} | _]}} -> - io:format("WARNING: module ~p not found, so not scanned for boot steps.~n", - [Module]), - []; - {'EXIT', Reason} -> - exit(Reason); - V -> - V - end. - boot_steps() -> - AllApps = [App || {App, _, _} <- application:loaded_applications()], - Modules = lists:usort( - lists:append([Modules - || {ok, Modules} <- - [application:get_key(App, modules) - || App <- AllApps]])), - UnsortedSteps = - lists:flatmap(fun (Module) -> - [{StepName, Attributes} - || {rabbit_boot_step, [{StepName, Attributes}]} - <- module_attributes(Module)] - end, Modules), - sort_boot_steps(UnsortedSteps). + sort_boot_steps(rabbit_misc:all_module_attributes(rabbit_boot_step)). + +vertices(_Module, Steps) -> + [{StepName, {StepName, Atts}} || {StepName, Atts} <- Steps]. + +edges(_Module, Steps) -> + [case Key of + requires -> {StepName, OtherStep}; + enables -> {OtherStep, StepName} + end || {StepName, Atts} <- Steps, + {Key, OtherStep} <- Atts, + Key =:= requires orelse Key =:= enables]. + +graph_build_error({vertex, duplicate, StepName}) -> + boot_error("Duplicate boot step name: ~w~n", [StepName]); +graph_build_error({edge, Reason, From, To}) -> + boot_error( + "Could not add boot step dependency of ~w on ~w:~n~s", + [To, From, + case Reason of + {bad_vertex, V} -> + io_lib:format("Boot step not registered: ~w~n", [V]); + {bad_edge, [First | Rest]} -> + [io_lib:format("Cyclic dependency: ~w", [First]), + [io_lib:format(" depends on ~w", [Next]) || Next <- Rest], + io_lib:format(" depends on ~w~n", [First])] + end]). sort_boot_steps(UnsortedSteps) -> - G = digraph:new([acyclic]), - - %% Add vertices, with duplicate checking. - [case digraph:vertex(G, StepName) of - false -> digraph:add_vertex(G, StepName, Step); - _ -> boot_error("Duplicate boot step name: ~w~n", [StepName]) - end || Step = {StepName, _Attrs} <- UnsortedSteps], - - %% Add edges, detecting cycles and missing vertices. - lists:foreach(fun ({StepName, Attributes}) -> - [add_boot_step_dep(G, StepName, PrecedingStepName) - || {requires, PrecedingStepName} <- Attributes], - [add_boot_step_dep(G, SucceedingStepName, StepName) - || {enables, SucceedingStepName} <- Attributes] - end, UnsortedSteps), + G = rabbit_misc:build_acyclic_graph( + fun vertices/2, fun edges/2, fun graph_build_error/1, UnsortedSteps), %% Use topological sort to find a consistent ordering (if there is %% one, otherwise fail). @@ -365,24 +354,6 @@ sort_boot_steps(UnsortedSteps) -> [MissingFunctions]) end. -add_boot_step_dep(G, RunsSecond, RunsFirst) -> - case digraph:add_edge(G, RunsSecond, RunsFirst) of - {error, Reason} -> - boot_error("Could not add boot step dependency of ~w on ~w:~n~s", - [RunsSecond, RunsFirst, - case Reason of - {bad_vertex, V} -> - io_lib:format("Boot step not registered: ~w~n", [V]); - {bad_edge, [First | Rest]} -> - [io_lib:format("Cyclic dependency: ~w", [First]), - [io_lib:format(" depends on ~w", [Next]) - || Next <- Rest], - io_lib:format(" depends on ~w~n", [First])] - end]); - _ -> - ok - end. - %%--------------------------------------------------------------------------- log_location(Type) -> diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index e5c30c06..128fca60 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -64,6 +64,7 @@ -export([recursive_delete/1, dict_cons/3, orddict_cons/3, unlink_and_capture_exit/1]). -export([get_options/2]). +-export([all_module_attributes/1, build_acyclic_graph/4]). -export([now_ms/0]). -import(mnesia). @@ -185,6 +186,7 @@ -spec(unlink_and_capture_exit/1 :: (pid()) -> 'ok'). -spec(get_options/2 :: ([optdef()], [string()]) -> {[string()], [{string(), any()}]}). +-spec(all_module_attributes/1 :: (atom()) -> dict:dictionary()). -spec(now_ms/0 :: () -> non_neg_integer()). -endif. @@ -726,3 +728,45 @@ get_flag(_, []) -> now_ms() -> timer:now_diff(now(), {0,0,0}) div 1000. + +module_attributes(Module) -> + case catch Module:module_info(attributes) of + {'EXIT', {undef, [{Module, module_info, _} | _]}} -> + io:format("WARNING: module ~p not found, so not scanned for boot steps.~n", + [Module]), + []; + {'EXIT', Reason} -> + exit(Reason); + V -> + V + end. + +all_module_attributes(Name) -> + Modules = + lists:usort( + lists:append( + [Modules || {App, _, _} <- application:loaded_applications(), + {ok, Modules} <- [application:get_key(App, modules)]])), + lists:foldl( + fun (Module, Acc) -> + case lists:append([Atts || {N, Atts} <- module_attributes(Module), + N =:= Name]) of + [] -> Acc; + Atts -> [{Module, Atts} | Acc] + end + end, [], Modules). + + +build_acyclic_graph(VertexFun, EdgeFun, ErrorFun, Graph) -> + G = digraph:new([acyclic]), + [ case digraph:vertex(G, Vertex) of + false -> digraph:add_vertex(G, Vertex, Label); + _ -> ErrorFun({vertex, duplicate, Vertex}) + end || {Module, Atts} <- Graph, + {Vertex, Label} <- VertexFun(Module, Atts) ], + [ case digraph:add_edge(G, From, To) of + {error, E} -> ErrorFun({edge, E, From, To}); + _ -> ok + end || {Module, Atts} <- Graph, + {From, To} <- EdgeFun(Module, Atts) ], + G. diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index 8de2f0d6..9bb70143 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -44,9 +44,6 @@ -include("rabbit.hrl"). --define(SCHEMA_VERSION_SET, []). --define(SCHEMA_VERSION_FILENAME, "schema_version"). - %%---------------------------------------------------------------------------- -ifdef(use_specs). @@ -94,9 +91,6 @@ init() -> ok = ensure_mnesia_running(), ok = ensure_mnesia_dir(), ok = init_db(read_cluster_nodes_config(), true), - ok = rabbit_misc:write_term_file(filename:join( - dir(), ?SCHEMA_VERSION_FILENAME), - [?SCHEMA_VERSION_SET]), ok. is_db_empty() -> @@ -378,28 +372,30 @@ init_db(ClusterNodes, Force) -> end; _ -> ok end, - case Nodes of - [] -> - case mnesia:system_info(use_dir) of - true -> - case check_schema_integrity() of - ok -> - ok; - {error, Reason} -> - %% NB: we cannot use rabbit_log here since - %% it may not have been started yet - error_logger:warning_msg( - "schema integrity check failed: ~p~n" - "moving database to backup location " - "and recreating schema from scratch~n", - [Reason]), - ok = move_db(), - ok = create_schema() - end; - false -> - ok = create_schema() + case {Nodes, mnesia:system_info(use_dir), + mnesia:system_info(db_nodes)} of + {[], true, [_]} -> + %% True single disc node, attempt upgrade + wait_for_tables(), + case rabbit_upgrade:maybe_upgrade() of + ok -> + schema_ok_or_exit(); + version_not_available -> + schema_ok_or_move() end; - [_|_] -> + {[], true, _} -> + %% "Master" (i.e. without config) disc node in cluster, + %% verify schema + wait_for_tables(), + version_ok_or_exit(rabbit_upgrade:read_version()), + schema_ok_or_exit(); + {[], false, _} -> + %% First RAM node in cluster, start from scratch + ok = create_schema(); + {[AnotherNode|_], _, _} -> + %% Subsequent node in cluster, catch up + version_ok_or_exit( + rpc:call(AnotherNode, rabbit_upgrade, read_version, [])), IsDiskNode = ClusterNodes == [] orelse lists:member(node(), ClusterNodes), ok = wait_for_replicated_tables(), @@ -408,7 +404,7 @@ init_db(ClusterNodes, Force) -> true -> disc; false -> ram end), - ok = ensure_schema_integrity() + schema_ok_or_exit() end; {error, Reason} -> %% one reason we may end up here is if we try to join @@ -418,6 +414,43 @@ init_db(ClusterNodes, Force) -> ClusterNodes, Reason}}) end. +schema_ok_or_move() -> + case check_schema_integrity() of + ok -> + ok; + {error, Reason} -> + %% NB: we cannot use rabbit_log here since it may not have been + %% started yet + error_logger:warning_msg("schema integrity check failed: ~p~n" + "moving database to backup location " + "and recreating schema from scratch~n", + [Reason]), + ok = move_db(), + ok = create_schema() + end. + +version_ok_or_exit(V) -> + DesiredVersion = rabbit_upgrade:desired_version(), + case V of + {ok, DiscVersion} -> + case DesiredVersion of + DiscVersion -> + ok; + _ -> + exit({schema_mismatch, DesiredVersion, DiscVersion}) + end; + {error, _} -> + ok = rabbit_upgrade:write_version() + end. + +schema_ok_or_exit() -> + case check_schema_integrity() of + ok -> + ok; + {error, Reason} -> + exit({schema_invalid, Reason}) + end. + create_schema() -> mnesia:stop(), rabbit_misc:ensure_ok(mnesia:create_schema([node()]), @@ -426,7 +459,8 @@ create_schema() -> cannot_start_mnesia), ok = create_tables(), ok = ensure_schema_integrity(), - ok = wait_for_tables(). + ok = wait_for_tables(), + ok = rabbit_upgrade:write_version(). move_db() -> mnesia:stop(), diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl new file mode 100644 index 00000000..37bac90d --- /dev/null +++ b/src/rabbit_upgrade.erl @@ -0,0 +1,160 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are Rabbit Technologies Ltd. +%% +%% Copyright (C) 2010 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_upgrade). + +-export([maybe_upgrade/0, read_version/0, write_version/0, desired_version/0]). + +-include("rabbit.hrl"). + +-define(VERSION_FILENAME, "schema_version"). +-define(LOCK_FILENAME, "schema_upgrade_lock"). + +%% ------------------------------------------------------------------- + +-ifdef(use_specs). + +-spec(maybe_upgrade/0 :: () -> 'ok' | 'version_not_available'). +-spec(read_version/0 :: + () -> {'ok', [any()]} | rabbit_types:error(any())). +-spec(write_version/0 :: () -> 'ok'). +-spec(desired_version/0 :: () -> [any()]). + +-endif. + +%% ------------------------------------------------------------------- + +%% Try to upgrade the schema. If no information on the existing schema could +%% be found, do nothing. rabbit_mnesia:check_schema_integrity() will catch the +%% problem. +maybe_upgrade() -> + case read_version() of + {ok, CurrentHeads} -> + G = load_graph(), + case unknown_heads(CurrentHeads, G) of + [] -> + case upgrades_to_apply(CurrentHeads, G) of + [] -> ok; + Upgrades -> apply_upgrades(Upgrades) + end; + Unknown -> + [warn("Data store has had future upgrade ~w applied." ++ + " Will not upgrade.~n", [U]) || U <- Unknown] + end, + true = digraph:delete(G), + ok; + {error, enoent} -> + version_not_available + end. + +read_version() -> + case rabbit_misc:read_term_file(schema_filename()) of + {ok, [Heads]} -> {ok, Heads}; + {error, E} -> {error, E} + end. + +write_version() -> + ok = rabbit_misc:write_term_file(schema_filename(), [desired_version()]), + ok. + +desired_version() -> + G = load_graph(), + Version = heads(G), + true = digraph:delete(G), + Version. + +%% ------------------------------------------------------------------- + +load_graph() -> + Upgrades = rabbit_misc:all_module_attributes(rabbit_upgrade), + rabbit_misc:build_acyclic_graph( + fun vertices/2, fun edges/2, fun graph_build_error/1, Upgrades). + +vertices(Module, Steps) -> + [{StepName, {Module, StepName}} || {StepName, _Reqs} <- Steps]. + +edges(_Module, Steps) -> + [{Require, StepName} || {StepName, Requires} <- Steps, Require <- Requires]. + +graph_build_error({vertex, duplicate, StepName}) -> + exit({duplicate_upgrade, StepName}); +graph_build_error({edge, E, From, To}) -> + exit({E, From, To}). + +unknown_heads(Heads, G) -> + [H || H <- Heads, digraph:vertex(G, H) =:= false]. + +upgrades_to_apply(Heads, G) -> + %% Take all the vertices which can reach the known heads. That's + %% everything we've already applied. Subtract that from all + %% vertices: that's what we have to apply. + Unsorted = sets:to_list( + sets:subtract( + sets:from_list(digraph:vertices(G)), + sets:from_list(digraph_utils:reaching(Heads, G)))), + %% Form a subgraph from that list and find a topological ordering + %% so we can invoke them in order. + [element(2, digraph:vertex(G, StepName)) + || StepName <- digraph_utils:topsort(digraph_utils:subgraph(G, Unsorted))]. + +heads(G) -> + lists:sort([V || V <- digraph:vertices(G), digraph:out_degree(G, V) =:= 0]). + +%% ------------------------------------------------------------------- + +apply_upgrades(Upgrades) -> + LockFile = lock_filename(), + case file:read_file_info(LockFile) of + {error, enoent} -> + info("Upgrades: ~w to apply~n", [length(Upgrades)]), + {ok, Lock} = file:open(LockFile, write), + ok = file:close(Lock), + [apply_upgrade(Upgrade) || Upgrade <- Upgrades], + info("Upgrades: All applied~n", []), + ok = write_version(), + ok = file:delete(LockFile); + {ok, _FI} -> + exit(previous_upgrade_failed); + {error, _} = Error -> + exit(Error) + end. + +apply_upgrade({M, F}) -> + info("Upgrades: Applying ~w:~w~n", [M, F]), + ok = apply(M, F, []). + +%% ------------------------------------------------------------------- + +schema_filename() -> + filename:join(dir(), ?VERSION_FILENAME). + +lock_filename() -> + filename:join(dir(), ?LOCK_FILENAME). + +%% NB: we cannot use rabbit_log here since it may not have been started yet +info(Msg, Args) -> + error_logger:info_msg(Msg, Args). + +warn(Msg, Args) -> + error_logger:warning_msg(Msg, Args). + +dir() -> + rabbit_mnesia:dir(). diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl new file mode 100644 index 00000000..d4add455 --- /dev/null +++ b/src/rabbit_upgrade_functions.erl @@ -0,0 +1,73 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are Rabbit Technologies Ltd. +%% +%% Copyright (C) 2010 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% +-module(rabbit_upgrade_functions). + +-include("rabbit.hrl"). + +-compile([export_all]). + +-rabbit_upgrade({test_add_column, []}). +-rabbit_upgrade({test_remove_column, [test_add_column]}). +-rabbit_upgrade({remove_user_scope, []}). + +%% ------------------------------------------------------------------- + +-ifdef(use_specs). + +-spec(remove_user_scope/0 :: () -> 'ok'). +-spec(test_add_column/0 :: () -> 'ok'). +-spec(test_remove_column/0 :: () -> 'ok'). + +-endif. + +%%-------------------------------------------------------------------- + +remove_user_scope() -> + {atomic, ok} = mnesia:transform_table( + rabbit_user_permission, + fun (Perm = #user_permission{ + permission = {permission, + _Scope, Conf, Write, Read}}) -> + Perm#user_permission{ + permission = #permission{configure = Conf, + write = Write, + read = Read}} + end, + record_info(fields, user_permission)), + ok. + +test_add_column() -> + {atomic, ok} = mnesia:transform_table( + rabbit_user, + fun ({user, Username, Password, Admin}) -> + {user, Username, Password, Admin, something_else} + end, + [username, password, is_admin, something]), + ok. + +test_remove_column() -> + {atomic, ok} = mnesia:transform_table( + rabbit_user, + fun ({user, Username, Password, Admin, _SomethingElse}) -> + {user, Username, Password, Admin} + end, + record_info(fields, user)), + ok. |