summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2010-11-09 16:41:39 +0000
committerSimon MacMullen <simon@rabbitmq.com>2010-11-09 16:41:39 +0000
commit6d7741caeac453498ead442805a5c46bbd9312c3 (patch)
treeff39b980a66128343a16a490baf19302f7b99e41
parent7ed10703eeea9379736e32cec751f23c3393aa4c (diff)
parent0cd23736f28817385eba70dfea045567da0e6194 (diff)
downloadrabbitmq-server-6d7741caeac453498ead442805a5c46bbd9312c3.tar.gz
Merged from default
-rw-r--r--src/rabbit.erl87
-rw-r--r--src/rabbit_misc.erl44
-rw-r--r--src/rabbit_mnesia.erl92
-rw-r--r--src/rabbit_upgrade.erl160
-rw-r--r--src/rabbit_upgrade_functions.erl73
5 files changed, 369 insertions, 87 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 8c36a9f0..a1dd2c2e 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -301,49 +301,38 @@ run_boot_step({StepName, Attributes}) ->
ok
end.
-module_attributes(Module) ->
- case catch Module:module_info(attributes) of
- {'EXIT', {undef, [{Module, module_info, _} | _]}} ->
- io:format("WARNING: module ~p not found, so not scanned for boot steps.~n",
- [Module]),
- [];
- {'EXIT', Reason} ->
- exit(Reason);
- V ->
- V
- end.
-
boot_steps() ->
- AllApps = [App || {App, _, _} <- application:loaded_applications()],
- Modules = lists:usort(
- lists:append([Modules
- || {ok, Modules} <-
- [application:get_key(App, modules)
- || App <- AllApps]])),
- UnsortedSteps =
- lists:flatmap(fun (Module) ->
- [{StepName, Attributes}
- || {rabbit_boot_step, [{StepName, Attributes}]}
- <- module_attributes(Module)]
- end, Modules),
- sort_boot_steps(UnsortedSteps).
+ sort_boot_steps(rabbit_misc:all_module_attributes(rabbit_boot_step)).
+
+vertices(_Module, Steps) ->
+ [{StepName, {StepName, Atts}} || {StepName, Atts} <- Steps].
+
+edges(_Module, Steps) ->
+ [case Key of
+ requires -> {StepName, OtherStep};
+ enables -> {OtherStep, StepName}
+ end || {StepName, Atts} <- Steps,
+ {Key, OtherStep} <- Atts,
+ Key =:= requires orelse Key =:= enables].
+
+graph_build_error({vertex, duplicate, StepName}) ->
+ boot_error("Duplicate boot step name: ~w~n", [StepName]);
+graph_build_error({edge, Reason, From, To}) ->
+ boot_error(
+ "Could not add boot step dependency of ~w on ~w:~n~s",
+ [To, From,
+ case Reason of
+ {bad_vertex, V} ->
+ io_lib:format("Boot step not registered: ~w~n", [V]);
+ {bad_edge, [First | Rest]} ->
+ [io_lib:format("Cyclic dependency: ~w", [First]),
+ [io_lib:format(" depends on ~w", [Next]) || Next <- Rest],
+ io_lib:format(" depends on ~w~n", [First])]
+ end]).
sort_boot_steps(UnsortedSteps) ->
- G = digraph:new([acyclic]),
-
- %% Add vertices, with duplicate checking.
- [case digraph:vertex(G, StepName) of
- false -> digraph:add_vertex(G, StepName, Step);
- _ -> boot_error("Duplicate boot step name: ~w~n", [StepName])
- end || Step = {StepName, _Attrs} <- UnsortedSteps],
-
- %% Add edges, detecting cycles and missing vertices.
- lists:foreach(fun ({StepName, Attributes}) ->
- [add_boot_step_dep(G, StepName, PrecedingStepName)
- || {requires, PrecedingStepName} <- Attributes],
- [add_boot_step_dep(G, SucceedingStepName, StepName)
- || {enables, SucceedingStepName} <- Attributes]
- end, UnsortedSteps),
+ G = rabbit_misc:build_acyclic_graph(
+ fun vertices/2, fun edges/2, fun graph_build_error/1, UnsortedSteps),
%% Use topological sort to find a consistent ordering (if there is
%% one, otherwise fail).
@@ -365,24 +354,6 @@ sort_boot_steps(UnsortedSteps) ->
[MissingFunctions])
end.
-add_boot_step_dep(G, RunsSecond, RunsFirst) ->
- case digraph:add_edge(G, RunsSecond, RunsFirst) of
- {error, Reason} ->
- boot_error("Could not add boot step dependency of ~w on ~w:~n~s",
- [RunsSecond, RunsFirst,
- case Reason of
- {bad_vertex, V} ->
- io_lib:format("Boot step not registered: ~w~n", [V]);
- {bad_edge, [First | Rest]} ->
- [io_lib:format("Cyclic dependency: ~w", [First]),
- [io_lib:format(" depends on ~w", [Next])
- || Next <- Rest],
- io_lib:format(" depends on ~w~n", [First])]
- end]);
- _ ->
- ok
- end.
-
%%---------------------------------------------------------------------------
log_location(Type) ->
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index e5c30c06..128fca60 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -64,6 +64,7 @@
-export([recursive_delete/1, dict_cons/3, orddict_cons/3,
unlink_and_capture_exit/1]).
-export([get_options/2]).
+-export([all_module_attributes/1, build_acyclic_graph/4]).
-export([now_ms/0]).
-import(mnesia).
@@ -185,6 +186,7 @@
-spec(unlink_and_capture_exit/1 :: (pid()) -> 'ok').
-spec(get_options/2 :: ([optdef()], [string()])
-> {[string()], [{string(), any()}]}).
+-spec(all_module_attributes/1 :: (atom()) -> dict:dictionary()).
-spec(now_ms/0 :: () -> non_neg_integer()).
-endif.
@@ -726,3 +728,45 @@ get_flag(_, []) ->
now_ms() ->
timer:now_diff(now(), {0,0,0}) div 1000.
+
+module_attributes(Module) ->
+ case catch Module:module_info(attributes) of
+ {'EXIT', {undef, [{Module, module_info, _} | _]}} ->
+ io:format("WARNING: module ~p not found, so not scanned for boot steps.~n",
+ [Module]),
+ [];
+ {'EXIT', Reason} ->
+ exit(Reason);
+ V ->
+ V
+ end.
+
+all_module_attributes(Name) ->
+ Modules =
+ lists:usort(
+ lists:append(
+ [Modules || {App, _, _} <- application:loaded_applications(),
+ {ok, Modules} <- [application:get_key(App, modules)]])),
+ lists:foldl(
+ fun (Module, Acc) ->
+ case lists:append([Atts || {N, Atts} <- module_attributes(Module),
+ N =:= Name]) of
+ [] -> Acc;
+ Atts -> [{Module, Atts} | Acc]
+ end
+ end, [], Modules).
+
+
+build_acyclic_graph(VertexFun, EdgeFun, ErrorFun, Graph) ->
+ G = digraph:new([acyclic]),
+ [ case digraph:vertex(G, Vertex) of
+ false -> digraph:add_vertex(G, Vertex, Label);
+ _ -> ErrorFun({vertex, duplicate, Vertex})
+ end || {Module, Atts} <- Graph,
+ {Vertex, Label} <- VertexFun(Module, Atts) ],
+ [ case digraph:add_edge(G, From, To) of
+ {error, E} -> ErrorFun({edge, E, From, To});
+ _ -> ok
+ end || {Module, Atts} <- Graph,
+ {From, To} <- EdgeFun(Module, Atts) ],
+ G.
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index 8de2f0d6..9bb70143 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -44,9 +44,6 @@
-include("rabbit.hrl").
--define(SCHEMA_VERSION_SET, []).
--define(SCHEMA_VERSION_FILENAME, "schema_version").
-
%%----------------------------------------------------------------------------
-ifdef(use_specs).
@@ -94,9 +91,6 @@ init() ->
ok = ensure_mnesia_running(),
ok = ensure_mnesia_dir(),
ok = init_db(read_cluster_nodes_config(), true),
- ok = rabbit_misc:write_term_file(filename:join(
- dir(), ?SCHEMA_VERSION_FILENAME),
- [?SCHEMA_VERSION_SET]),
ok.
is_db_empty() ->
@@ -378,28 +372,30 @@ init_db(ClusterNodes, Force) ->
end;
_ -> ok
end,
- case Nodes of
- [] ->
- case mnesia:system_info(use_dir) of
- true ->
- case check_schema_integrity() of
- ok ->
- ok;
- {error, Reason} ->
- %% NB: we cannot use rabbit_log here since
- %% it may not have been started yet
- error_logger:warning_msg(
- "schema integrity check failed: ~p~n"
- "moving database to backup location "
- "and recreating schema from scratch~n",
- [Reason]),
- ok = move_db(),
- ok = create_schema()
- end;
- false ->
- ok = create_schema()
+ case {Nodes, mnesia:system_info(use_dir),
+ mnesia:system_info(db_nodes)} of
+ {[], true, [_]} ->
+ %% True single disc node, attempt upgrade
+ wait_for_tables(),
+ case rabbit_upgrade:maybe_upgrade() of
+ ok ->
+ schema_ok_or_exit();
+ version_not_available ->
+ schema_ok_or_move()
end;
- [_|_] ->
+ {[], true, _} ->
+ %% "Master" (i.e. without config) disc node in cluster,
+ %% verify schema
+ wait_for_tables(),
+ version_ok_or_exit(rabbit_upgrade:read_version()),
+ schema_ok_or_exit();
+ {[], false, _} ->
+ %% First RAM node in cluster, start from scratch
+ ok = create_schema();
+ {[AnotherNode|_], _, _} ->
+ %% Subsequent node in cluster, catch up
+ version_ok_or_exit(
+ rpc:call(AnotherNode, rabbit_upgrade, read_version, [])),
IsDiskNode = ClusterNodes == [] orelse
lists:member(node(), ClusterNodes),
ok = wait_for_replicated_tables(),
@@ -408,7 +404,7 @@ init_db(ClusterNodes, Force) ->
true -> disc;
false -> ram
end),
- ok = ensure_schema_integrity()
+ schema_ok_or_exit()
end;
{error, Reason} ->
%% one reason we may end up here is if we try to join
@@ -418,6 +414,43 @@ init_db(ClusterNodes, Force) ->
ClusterNodes, Reason}})
end.
+schema_ok_or_move() ->
+ case check_schema_integrity() of
+ ok ->
+ ok;
+ {error, Reason} ->
+ %% NB: we cannot use rabbit_log here since it may not have been
+ %% started yet
+ error_logger:warning_msg("schema integrity check failed: ~p~n"
+ "moving database to backup location "
+ "and recreating schema from scratch~n",
+ [Reason]),
+ ok = move_db(),
+ ok = create_schema()
+ end.
+
+version_ok_or_exit(V) ->
+ DesiredVersion = rabbit_upgrade:desired_version(),
+ case V of
+ {ok, DiscVersion} ->
+ case DesiredVersion of
+ DiscVersion ->
+ ok;
+ _ ->
+ exit({schema_mismatch, DesiredVersion, DiscVersion})
+ end;
+ {error, _} ->
+ ok = rabbit_upgrade:write_version()
+ end.
+
+schema_ok_or_exit() ->
+ case check_schema_integrity() of
+ ok ->
+ ok;
+ {error, Reason} ->
+ exit({schema_invalid, Reason})
+ end.
+
create_schema() ->
mnesia:stop(),
rabbit_misc:ensure_ok(mnesia:create_schema([node()]),
@@ -426,7 +459,8 @@ create_schema() ->
cannot_start_mnesia),
ok = create_tables(),
ok = ensure_schema_integrity(),
- ok = wait_for_tables().
+ ok = wait_for_tables(),
+ ok = rabbit_upgrade:write_version().
move_db() ->
mnesia:stop(),
diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl
new file mode 100644
index 00000000..37bac90d
--- /dev/null
+++ b/src/rabbit_upgrade.erl
@@ -0,0 +1,160 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are Rabbit Technologies Ltd.
+%%
+%% Copyright (C) 2010 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(rabbit_upgrade).
+
+-export([maybe_upgrade/0, read_version/0, write_version/0, desired_version/0]).
+
+-include("rabbit.hrl").
+
+-define(VERSION_FILENAME, "schema_version").
+-define(LOCK_FILENAME, "schema_upgrade_lock").
+
+%% -------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-spec(maybe_upgrade/0 :: () -> 'ok' | 'version_not_available').
+-spec(read_version/0 ::
+ () -> {'ok', [any()]} | rabbit_types:error(any())).
+-spec(write_version/0 :: () -> 'ok').
+-spec(desired_version/0 :: () -> [any()]).
+
+-endif.
+
+%% -------------------------------------------------------------------
+
+%% Try to upgrade the schema. If no information on the existing schema could
+%% be found, do nothing. rabbit_mnesia:check_schema_integrity() will catch the
+%% problem.
+maybe_upgrade() ->
+ case read_version() of
+ {ok, CurrentHeads} ->
+ G = load_graph(),
+ case unknown_heads(CurrentHeads, G) of
+ [] ->
+ case upgrades_to_apply(CurrentHeads, G) of
+ [] -> ok;
+ Upgrades -> apply_upgrades(Upgrades)
+ end;
+ Unknown ->
+ [warn("Data store has had future upgrade ~w applied." ++
+ " Will not upgrade.~n", [U]) || U <- Unknown]
+ end,
+ true = digraph:delete(G),
+ ok;
+ {error, enoent} ->
+ version_not_available
+ end.
+
+read_version() ->
+ case rabbit_misc:read_term_file(schema_filename()) of
+ {ok, [Heads]} -> {ok, Heads};
+ {error, E} -> {error, E}
+ end.
+
+write_version() ->
+ ok = rabbit_misc:write_term_file(schema_filename(), [desired_version()]),
+ ok.
+
+desired_version() ->
+ G = load_graph(),
+ Version = heads(G),
+ true = digraph:delete(G),
+ Version.
+
+%% -------------------------------------------------------------------
+
+load_graph() ->
+ Upgrades = rabbit_misc:all_module_attributes(rabbit_upgrade),
+ rabbit_misc:build_acyclic_graph(
+ fun vertices/2, fun edges/2, fun graph_build_error/1, Upgrades).
+
+vertices(Module, Steps) ->
+ [{StepName, {Module, StepName}} || {StepName, _Reqs} <- Steps].
+
+edges(_Module, Steps) ->
+ [{Require, StepName} || {StepName, Requires} <- Steps, Require <- Requires].
+
+graph_build_error({vertex, duplicate, StepName}) ->
+ exit({duplicate_upgrade, StepName});
+graph_build_error({edge, E, From, To}) ->
+ exit({E, From, To}).
+
+unknown_heads(Heads, G) ->
+ [H || H <- Heads, digraph:vertex(G, H) =:= false].
+
+upgrades_to_apply(Heads, G) ->
+ %% Take all the vertices which can reach the known heads. That's
+ %% everything we've already applied. Subtract that from all
+ %% vertices: that's what we have to apply.
+ Unsorted = sets:to_list(
+ sets:subtract(
+ sets:from_list(digraph:vertices(G)),
+ sets:from_list(digraph_utils:reaching(Heads, G)))),
+ %% Form a subgraph from that list and find a topological ordering
+ %% so we can invoke them in order.
+ [element(2, digraph:vertex(G, StepName))
+ || StepName <- digraph_utils:topsort(digraph_utils:subgraph(G, Unsorted))].
+
+heads(G) ->
+ lists:sort([V || V <- digraph:vertices(G), digraph:out_degree(G, V) =:= 0]).
+
+%% -------------------------------------------------------------------
+
+apply_upgrades(Upgrades) ->
+ LockFile = lock_filename(),
+ case file:read_file_info(LockFile) of
+ {error, enoent} ->
+ info("Upgrades: ~w to apply~n", [length(Upgrades)]),
+ {ok, Lock} = file:open(LockFile, write),
+ ok = file:close(Lock),
+ [apply_upgrade(Upgrade) || Upgrade <- Upgrades],
+ info("Upgrades: All applied~n", []),
+ ok = write_version(),
+ ok = file:delete(LockFile);
+ {ok, _FI} ->
+ exit(previous_upgrade_failed);
+ {error, _} = Error ->
+ exit(Error)
+ end.
+
+apply_upgrade({M, F}) ->
+ info("Upgrades: Applying ~w:~w~n", [M, F]),
+ ok = apply(M, F, []).
+
+%% -------------------------------------------------------------------
+
+schema_filename() ->
+ filename:join(dir(), ?VERSION_FILENAME).
+
+lock_filename() ->
+ filename:join(dir(), ?LOCK_FILENAME).
+
+%% NB: we cannot use rabbit_log here since it may not have been started yet
+info(Msg, Args) ->
+ error_logger:info_msg(Msg, Args).
+
+warn(Msg, Args) ->
+ error_logger:warning_msg(Msg, Args).
+
+dir() ->
+ rabbit_mnesia:dir().
diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl
new file mode 100644
index 00000000..d4add455
--- /dev/null
+++ b/src/rabbit_upgrade_functions.erl
@@ -0,0 +1,73 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are Rabbit Technologies Ltd.
+%%
+%% Copyright (C) 2010 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+-module(rabbit_upgrade_functions).
+
+-include("rabbit.hrl").
+
+-compile([export_all]).
+
+-rabbit_upgrade({test_add_column, []}).
+-rabbit_upgrade({test_remove_column, [test_add_column]}).
+-rabbit_upgrade({remove_user_scope, []}).
+
+%% -------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-spec(remove_user_scope/0 :: () -> 'ok').
+-spec(test_add_column/0 :: () -> 'ok').
+-spec(test_remove_column/0 :: () -> 'ok').
+
+-endif.
+
+%%--------------------------------------------------------------------
+
+remove_user_scope() ->
+ {atomic, ok} = mnesia:transform_table(
+ rabbit_user_permission,
+ fun (Perm = #user_permission{
+ permission = {permission,
+ _Scope, Conf, Write, Read}}) ->
+ Perm#user_permission{
+ permission = #permission{configure = Conf,
+ write = Write,
+ read = Read}}
+ end,
+ record_info(fields, user_permission)),
+ ok.
+
+test_add_column() ->
+ {atomic, ok} = mnesia:transform_table(
+ rabbit_user,
+ fun ({user, Username, Password, Admin}) ->
+ {user, Username, Password, Admin, something_else}
+ end,
+ [username, password, is_admin, something]),
+ ok.
+
+test_remove_column() ->
+ {atomic, ok} = mnesia:transform_table(
+ rabbit_user,
+ fun ({user, Username, Password, Admin, _SomethingElse}) ->
+ {user, Username, Password, Admin}
+ end,
+ record_info(fields, user)),
+ ok.