diff options
author | Matthew Sackman <matthew@rabbitmq.com> | 2011-03-16 17:30:30 +0000 |
---|---|---|
committer | Matthew Sackman <matthew@rabbitmq.com> | 2011-03-16 17:30:30 +0000 |
commit | ae2e8ee3a60753439654ea6feef90ca7df3a3096 (patch) | |
tree | e1b17db5a8b0d5afd9cb0f44135be4800c36d5e0 | |
parent | 3c302660253185a11505ff33c7196b9d120df8c0 (diff) | |
download | rabbitmq-server-ae2e8ee3a60753439654ea6feef90ca7df3a3096.tar.gz |
Abstract and rewrite schema_version handling functions
-rw-r--r-- | src/rabbit_mnesia.erl | 18 | ||||
-rw-r--r-- | src/rabbit_upgrade.erl | 96 | ||||
-rw-r--r-- | src/rabbit_version.erl | 103 |
3 files changed, 143 insertions, 74 deletions
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index e61f5fce..fa442c9c 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -442,7 +442,7 @@ init_db(ClusterNodes, Force) -> {[AnotherNode|_], _} -> %% Subsequent node in cluster, catch up ensure_version_ok( - rpc:call(AnotherNode, rabbit_upgrade, read_version, [])), + rpc:call(AnotherNode, rabbit_version, read, [])), IsDiskNode = ClusterNodes == [] orelse lists:member(node(), ClusterNodes), ok = wait_for_replicated_tables(), @@ -457,7 +457,8 @@ init_db(ClusterNodes, Force) -> %% If we're just starting up a new node we won't have %% a version version_not_available -> - ok = rabbit_upgrade:write_version() + ok = rabbit_version:write( + rabbit_upgrade:desired_version()) end, ensure_schema_integrity() end; @@ -484,13 +485,14 @@ schema_ok_or_move() -> end. ensure_version_ok({ok, DiscVersion}) -> - case rabbit_upgrade:desired_version() of - DiscVersion -> ok; - DesiredVersion -> throw({error, {schema_mismatch, - DesiredVersion, DiscVersion}}) + DesiredVersion = rabbit_upgrade:desired_version(), + case rabbit_version:'=~='(DesiredVersion, DiscVersion) of + true -> ok; + false -> throw({error, {schema_mismatch, + DesiredVersion, DiscVersion}}) end; ensure_version_ok({error, _}) -> - ok = rabbit_upgrade:write_version(). + ok = rabbit_version:write(rabbit_upgrade:desired_version()). create_schema() -> mnesia:stop(), @@ -500,7 +502,7 @@ create_schema() -> cannot_start_mnesia), ok = create_tables(), ok = ensure_schema_integrity(), - ok = rabbit_upgrade:write_version(). + ok = rabbit_version:write(rabbit_upgrade:desired_version()). move_db() -> mnesia:stop(), diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl index f1134cfa..7a4a4fd8 100644 --- a/src/rabbit_upgrade.erl +++ b/src/rabbit_upgrade.erl @@ -17,8 +17,7 @@ -module(rabbit_upgrade). -export([maybe_upgrade_mnesia/0, maybe_upgrade_local/0]). --export([read_version/0, write_version/0, desired_version/0, - desired_version/1]). +-export([desired_version/0]). -include("rabbit.hrl"). @@ -30,16 +29,9 @@ -ifdef(use_specs). --type(step() :: atom()). --type(version() :: [{scope(), [step()]}]). --type(scope() :: 'mnesia' | 'local'). - -spec(maybe_upgrade_mnesia/0 :: () -> 'ok'). -spec(maybe_upgrade_local/0 :: () -> 'ok' | 'version_not_available'). --spec(read_version/0 :: () -> rabbit_types:ok_or_error2(version(), any())). --spec(write_version/0 :: () -> 'ok'). --spec(desired_version/0 :: () -> version()). --spec(desired_version/1 :: (scope()) -> [step()]). +-spec(desired_version/0 :: () -> rabbit_version:version()). -endif. @@ -173,7 +165,7 @@ is_disc_node() -> %% This is pretty ugly but we can't start Mnesia and ask it (will hang), %% we can't look at the config file (may not include us even if we're a %% disc node). - filelib:is_regular(rabbit_mnesia:dir() ++ "/rabbit_durable_exchange.DCD"). + filelib:is_regular(filename:join(dir(), "rabbit_durable_exchange.DCD")). die(Msg, Args) -> %% We don't throw or exit here since that gets thrown @@ -216,7 +208,7 @@ secondary_upgrade(AllNodes) -> end, rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia), ok = rabbit_mnesia:init_db(ClusterNodes, true), - ok = write_version(mnesia), + ok = write_desired_scope_version(mnesia), ok. nodes_running(Nodes) -> @@ -238,63 +230,37 @@ maybe_upgrade_local() -> fun() -> ok end) end. -read_version() -> - case rabbit_misc:read_term_file(schema_filename()) of - {ok, [V]} -> {ok, V}; - {error, _} = Err -> Err - end. - -read_version(Scope) -> - case read_version() of - {error, _} = E -> E; - {ok, V} -> {ok, filter_by_scope(Scope, V)} - end. - -write_version() -> - ok = rabbit_misc:write_term_file(schema_filename(), [desired_version()]), - ok. - -write_version(Scope) -> - {ok, V0} = read_version(), - V = flatten([case S of - Scope -> desired_version(S); - _ -> filter_by_scope(S, V0) - end || S <- ?SCOPES]), - ok = rabbit_misc:write_term_file(schema_filename(), [V]), - ok. - -desired_version() -> - flatten([desired_version(Scope) || Scope <- ?SCOPES]). +desired_version() -> [{Scope, desired_version(Scope)} || Scope <- ?SCOPES]. -desired_version(Scope) -> - with_upgrade_graph(fun (G) -> heads(G) end, Scope). +desired_version(Scope) -> with_upgrade_graph(fun (G) -> heads(G) end, Scope). -flatten(LoL) -> - lists:sort(lists:append(LoL)). - -filter_by_scope(Scope, Versions) -> - with_upgrade_graph( - fun(G) -> - ScopeVs = digraph:vertices(G), - [V || V <- Versions, lists:member(V, ScopeVs)] - end, Scope). +write_desired_scope_version(Scope) -> + ok = rabbit_version:with_scope_version( + Scope, + fun ({error, Error}) -> + throw({error, {can_not_read_version_to_write_it, Error}}) + end, + fun (_SV) -> {desired_version(Scope), ok} end). %% ------------------------------------------------------------------- upgrades_required(Scope) -> - case read_version(Scope) of - {ok, CurrentHeads} -> - with_upgrade_graph( - fun (G) -> - case unknown_heads(CurrentHeads, G) of - [] -> upgrades_to_apply(CurrentHeads, G); - Unknown -> throw({error, - {future_upgrades_found, Unknown}}) - end - end, Scope); - {error, enoent} -> - version_not_available - end. + rabbit_version:with_scope_version( + Scope, + fun ({error, enoent}) -> version_not_available end, + fun (CurrentHeads) -> + {CurrentHeads, + with_upgrade_graph( + fun (G) -> + case unknown_heads(CurrentHeads, G) of + [] -> + upgrades_to_apply(CurrentHeads, G); + Unknown -> + throw({error, + {future_upgrades_found, Unknown}}) + end + end, Scope)} + end). with_upgrade_graph(Fun, Scope) -> case rabbit_misc:build_acyclic_graph( @@ -363,7 +329,7 @@ apply_upgrades(Scope, Upgrades, Fun) -> [apply_upgrade(Scope, Upgrade) || Upgrade <- Upgrades], info("~s upgrades: All upgrades applied successfully~n", [Scope]), - ok = write_version(Scope), + ok = write_desired_scope_version(Scope), ok = rabbit_misc:recursive_delete([BackupDir]), info("~s upgrades: Mnesia backup removed~n", [Scope]), ok = file:delete(LockFile); @@ -386,8 +352,6 @@ apply_upgrade(Scope, {M, F}) -> dir() -> rabbit_mnesia:dir(). -schema_filename() -> filename:join(dir(), ?VERSION_FILENAME). - lock_filename(Dir) -> filename:join(Dir, ?LOCK_FILENAME). %% NB: we cannot use rabbit_log here since it may not have been diff --git a/src/rabbit_version.erl b/src/rabbit_version.erl new file mode 100644 index 00000000..c88d57fe --- /dev/null +++ b/src/rabbit_version.erl @@ -0,0 +1,103 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is VMware, Inc. +%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% + +-module(rabbit_version). + +-export([read/0, write/1, with_scope_version/3, '=~='/2]). + +%% ------------------------------------------------------------------- +-ifdef(use_specs). + +-export_type([step/0, version/0, scope/0]). + +-type(step() :: atom()). +-type(version() :: [{scope(), [step()]}]). +-type(scope() :: atom()). + +-spec(read/0 :: () -> rabbit_types:ok_or_error2(version(), any())). +-spec(write/1 :: (version()) -> 'ok'). +-spec(with_scope_version/3 :: + (scope(), + fun (({'error', any()}) -> E), + fun (([step()]) -> {[step()], A})) -> E | A). +-spec('=~='/2 :: (version(), version()) -> boolean()). + +-endif. +%% ------------------------------------------------------------------- + +-define(VERSION_FILENAME, "schema_version"). + +%% ------------------------------------------------------------------- + +read() -> + case rabbit_misc:read_term_file(schema_filename()) of + {ok, [V]} -> {ok, categorise_by_scope(V)}; + {error, _} = Err -> Err + end. + +write(Version) -> + V = [Name || {_Scope, Names} <- Version, Name <- Names], + ok = rabbit_misc:write_term_file(schema_filename(), [V]). + +with_scope_version(Scope, ErrorHandler, Fun) -> + case read() of + {error, _} = Err -> + ErrorHandler(Err); + {ok, Version} -> + SV = case lists:keysearch(Scope, 1, Version) of + false -> []; + {value, {Scope, SV1}} -> SV1 + end, + {SV2, Result} = Fun(SV), + ok = case SV =:= SV2 of + true -> ok; + false -> write(lists:keystore(Scope, 1, Version, + {Scope, SV2})) + end, + Result + end. + +'=~='(VerA, VerB) -> + matches(lists:usort(VerA), lists:usort(VerB)). + +%% ------------------------------------------------------------------- + +matches([], []) -> + true; +matches([{Scope, SV}|VerA], [{Scope, SV}|VerB]) -> + matches(VerA, VerB); +matches([{Scope, SVA}|VerA], [{Scope, SVB}|VerB]) -> + case {lists:usort(SVA), lists:usort(SVB)} of + {SV, SV} -> matches(VerA, VerB); + _ -> false + end; +matches(_VerA, _VerB) -> + false. + +categorise_by_scope(Heads) when is_list(Heads) -> + Categorised = + [{Scope, Name} || {_Module, Attributes} <- + rabbit_misc:all_module_attributes(rabbit_upgrade), + {Name, Scope, _Requires} <- Attributes, + lists:member(Name, Heads)], + orddict:to_list( + lists:foldl(fun ({Scope, Name}, Version) -> + rabbit_misc:orddict_cons(Scope, Name, Version) + end, orddict:new(), Categorised)). + +dir() -> rabbit_mnesia:dir(). + +schema_filename() -> filename:join(dir(), ?VERSION_FILENAME). |