summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2011-03-16 17:30:30 +0000
committerMatthew Sackman <matthew@rabbitmq.com>2011-03-16 17:30:30 +0000
commitae2e8ee3a60753439654ea6feef90ca7df3a3096 (patch)
treee1b17db5a8b0d5afd9cb0f44135be4800c36d5e0
parent3c302660253185a11505ff33c7196b9d120df8c0 (diff)
downloadrabbitmq-server-ae2e8ee3a60753439654ea6feef90ca7df3a3096.tar.gz
Abstract and rewrite schema_version handling functions
-rw-r--r--src/rabbit_mnesia.erl18
-rw-r--r--src/rabbit_upgrade.erl96
-rw-r--r--src/rabbit_version.erl103
3 files changed, 143 insertions, 74 deletions
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index e61f5fce..fa442c9c 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -442,7 +442,7 @@ init_db(ClusterNodes, Force) ->
{[AnotherNode|_], _} ->
%% Subsequent node in cluster, catch up
ensure_version_ok(
- rpc:call(AnotherNode, rabbit_upgrade, read_version, [])),
+ rpc:call(AnotherNode, rabbit_version, read, [])),
IsDiskNode = ClusterNodes == [] orelse
lists:member(node(), ClusterNodes),
ok = wait_for_replicated_tables(),
@@ -457,7 +457,8 @@ init_db(ClusterNodes, Force) ->
%% If we're just starting up a new node we won't have
%% a version
version_not_available ->
- ok = rabbit_upgrade:write_version()
+ ok = rabbit_version:write(
+ rabbit_upgrade:desired_version())
end,
ensure_schema_integrity()
end;
@@ -484,13 +485,14 @@ schema_ok_or_move() ->
end.
ensure_version_ok({ok, DiscVersion}) ->
- case rabbit_upgrade:desired_version() of
- DiscVersion -> ok;
- DesiredVersion -> throw({error, {schema_mismatch,
- DesiredVersion, DiscVersion}})
+ DesiredVersion = rabbit_upgrade:desired_version(),
+ case rabbit_version:'=~='(DesiredVersion, DiscVersion) of
+ true -> ok;
+ false -> throw({error, {schema_mismatch,
+ DesiredVersion, DiscVersion}})
end;
ensure_version_ok({error, _}) ->
- ok = rabbit_upgrade:write_version().
+ ok = rabbit_version:write(rabbit_upgrade:desired_version()).
create_schema() ->
mnesia:stop(),
@@ -500,7 +502,7 @@ create_schema() ->
cannot_start_mnesia),
ok = create_tables(),
ok = ensure_schema_integrity(),
- ok = rabbit_upgrade:write_version().
+ ok = rabbit_version:write(rabbit_upgrade:desired_version()).
move_db() ->
mnesia:stop(),
diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl
index f1134cfa..7a4a4fd8 100644
--- a/src/rabbit_upgrade.erl
+++ b/src/rabbit_upgrade.erl
@@ -17,8 +17,7 @@
-module(rabbit_upgrade).
-export([maybe_upgrade_mnesia/0, maybe_upgrade_local/0]).
--export([read_version/0, write_version/0, desired_version/0,
- desired_version/1]).
+-export([desired_version/0]).
-include("rabbit.hrl").
@@ -30,16 +29,9 @@
-ifdef(use_specs).
--type(step() :: atom()).
--type(version() :: [{scope(), [step()]}]).
--type(scope() :: 'mnesia' | 'local').
-
-spec(maybe_upgrade_mnesia/0 :: () -> 'ok').
-spec(maybe_upgrade_local/0 :: () -> 'ok' | 'version_not_available').
--spec(read_version/0 :: () -> rabbit_types:ok_or_error2(version(), any())).
--spec(write_version/0 :: () -> 'ok').
--spec(desired_version/0 :: () -> version()).
--spec(desired_version/1 :: (scope()) -> [step()]).
+-spec(desired_version/0 :: () -> rabbit_version:version()).
-endif.
@@ -173,7 +165,7 @@ is_disc_node() ->
%% This is pretty ugly but we can't start Mnesia and ask it (will hang),
%% we can't look at the config file (may not include us even if we're a
%% disc node).
- filelib:is_regular(rabbit_mnesia:dir() ++ "/rabbit_durable_exchange.DCD").
+ filelib:is_regular(filename:join(dir(), "rabbit_durable_exchange.DCD")).
die(Msg, Args) ->
%% We don't throw or exit here since that gets thrown
@@ -216,7 +208,7 @@ secondary_upgrade(AllNodes) ->
end,
rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia),
ok = rabbit_mnesia:init_db(ClusterNodes, true),
- ok = write_version(mnesia),
+ ok = write_desired_scope_version(mnesia),
ok.
nodes_running(Nodes) ->
@@ -238,63 +230,37 @@ maybe_upgrade_local() ->
fun() -> ok end)
end.
-read_version() ->
- case rabbit_misc:read_term_file(schema_filename()) of
- {ok, [V]} -> {ok, V};
- {error, _} = Err -> Err
- end.
-
-read_version(Scope) ->
- case read_version() of
- {error, _} = E -> E;
- {ok, V} -> {ok, filter_by_scope(Scope, V)}
- end.
-
-write_version() ->
- ok = rabbit_misc:write_term_file(schema_filename(), [desired_version()]),
- ok.
-
-write_version(Scope) ->
- {ok, V0} = read_version(),
- V = flatten([case S of
- Scope -> desired_version(S);
- _ -> filter_by_scope(S, V0)
- end || S <- ?SCOPES]),
- ok = rabbit_misc:write_term_file(schema_filename(), [V]),
- ok.
-
-desired_version() ->
- flatten([desired_version(Scope) || Scope <- ?SCOPES]).
+desired_version() -> [{Scope, desired_version(Scope)} || Scope <- ?SCOPES].
-desired_version(Scope) ->
- with_upgrade_graph(fun (G) -> heads(G) end, Scope).
+desired_version(Scope) -> with_upgrade_graph(fun (G) -> heads(G) end, Scope).
-flatten(LoL) ->
- lists:sort(lists:append(LoL)).
-
-filter_by_scope(Scope, Versions) ->
- with_upgrade_graph(
- fun(G) ->
- ScopeVs = digraph:vertices(G),
- [V || V <- Versions, lists:member(V, ScopeVs)]
- end, Scope).
+write_desired_scope_version(Scope) ->
+ ok = rabbit_version:with_scope_version(
+ Scope,
+ fun ({error, Error}) ->
+ throw({error, {can_not_read_version_to_write_it, Error}})
+ end,
+ fun (_SV) -> {desired_version(Scope), ok} end).
%% -------------------------------------------------------------------
upgrades_required(Scope) ->
- case read_version(Scope) of
- {ok, CurrentHeads} ->
- with_upgrade_graph(
- fun (G) ->
- case unknown_heads(CurrentHeads, G) of
- [] -> upgrades_to_apply(CurrentHeads, G);
- Unknown -> throw({error,
- {future_upgrades_found, Unknown}})
- end
- end, Scope);
- {error, enoent} ->
- version_not_available
- end.
+ rabbit_version:with_scope_version(
+ Scope,
+ fun ({error, enoent}) -> version_not_available end,
+ fun (CurrentHeads) ->
+ {CurrentHeads,
+ with_upgrade_graph(
+ fun (G) ->
+ case unknown_heads(CurrentHeads, G) of
+ [] ->
+ upgrades_to_apply(CurrentHeads, G);
+ Unknown ->
+ throw({error,
+ {future_upgrades_found, Unknown}})
+ end
+ end, Scope)}
+ end).
with_upgrade_graph(Fun, Scope) ->
case rabbit_misc:build_acyclic_graph(
@@ -363,7 +329,7 @@ apply_upgrades(Scope, Upgrades, Fun) ->
[apply_upgrade(Scope, Upgrade) || Upgrade <- Upgrades],
info("~s upgrades: All upgrades applied successfully~n",
[Scope]),
- ok = write_version(Scope),
+ ok = write_desired_scope_version(Scope),
ok = rabbit_misc:recursive_delete([BackupDir]),
info("~s upgrades: Mnesia backup removed~n", [Scope]),
ok = file:delete(LockFile);
@@ -386,8 +352,6 @@ apply_upgrade(Scope, {M, F}) ->
dir() -> rabbit_mnesia:dir().
-schema_filename() -> filename:join(dir(), ?VERSION_FILENAME).
-
lock_filename(Dir) -> filename:join(Dir, ?LOCK_FILENAME).
%% NB: we cannot use rabbit_log here since it may not have been
diff --git a/src/rabbit_version.erl b/src/rabbit_version.erl
new file mode 100644
index 00000000..c88d57fe
--- /dev/null
+++ b/src/rabbit_version.erl
@@ -0,0 +1,103 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%%
+
+-module(rabbit_version).
+
+-export([read/0, write/1, with_scope_version/3, '=~='/2]).
+
+%% -------------------------------------------------------------------
+-ifdef(use_specs).
+
+-export_type([step/0, version/0, scope/0]).
+
+-type(step() :: atom()).
+-type(version() :: [{scope(), [step()]}]).
+-type(scope() :: atom()).
+
+-spec(read/0 :: () -> rabbit_types:ok_or_error2(version(), any())).
+-spec(write/1 :: (version()) -> 'ok').
+-spec(with_scope_version/3 ::
+ (scope(),
+ fun (({'error', any()}) -> E),
+ fun (([step()]) -> {[step()], A})) -> E | A).
+-spec('=~='/2 :: (version(), version()) -> boolean()).
+
+-endif.
+%% -------------------------------------------------------------------
+
+-define(VERSION_FILENAME, "schema_version").
+
+%% -------------------------------------------------------------------
+
+read() ->
+ case rabbit_misc:read_term_file(schema_filename()) of
+ {ok, [V]} -> {ok, categorise_by_scope(V)};
+ {error, _} = Err -> Err
+ end.
+
+write(Version) ->
+ V = [Name || {_Scope, Names} <- Version, Name <- Names],
+ ok = rabbit_misc:write_term_file(schema_filename(), [V]).
+
+with_scope_version(Scope, ErrorHandler, Fun) ->
+ case read() of
+ {error, _} = Err ->
+ ErrorHandler(Err);
+ {ok, Version} ->
+ SV = case lists:keysearch(Scope, 1, Version) of
+ false -> [];
+ {value, {Scope, SV1}} -> SV1
+ end,
+ {SV2, Result} = Fun(SV),
+ ok = case SV =:= SV2 of
+ true -> ok;
+ false -> write(lists:keystore(Scope, 1, Version,
+ {Scope, SV2}))
+ end,
+ Result
+ end.
+
+'=~='(VerA, VerB) ->
+ matches(lists:usort(VerA), lists:usort(VerB)).
+
+%% -------------------------------------------------------------------
+
+matches([], []) ->
+ true;
+matches([{Scope, SV}|VerA], [{Scope, SV}|VerB]) ->
+ matches(VerA, VerB);
+matches([{Scope, SVA}|VerA], [{Scope, SVB}|VerB]) ->
+ case {lists:usort(SVA), lists:usort(SVB)} of
+ {SV, SV} -> matches(VerA, VerB);
+ _ -> false
+ end;
+matches(_VerA, _VerB) ->
+ false.
+
+categorise_by_scope(Heads) when is_list(Heads) ->
+ Categorised =
+ [{Scope, Name} || {_Module, Attributes} <-
+ rabbit_misc:all_module_attributes(rabbit_upgrade),
+ {Name, Scope, _Requires} <- Attributes,
+ lists:member(Name, Heads)],
+ orddict:to_list(
+ lists:foldl(fun ({Scope, Name}, Version) ->
+ rabbit_misc:orddict_cons(Scope, Name, Version)
+ end, orddict:new(), Categorised)).
+
+dir() -> rabbit_mnesia:dir().
+
+schema_filename() -> filename:join(dir(), ?VERSION_FILENAME).