summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2010-11-04 13:41:19 +0000
committerSimon MacMullen <simon@rabbitmq.com>2010-11-04 13:41:19 +0000
commit56385940ddec8e087edc64dbaf3d7c0fe6934a3f (patch)
tree8ec6128cb7df900008c3fd2b494a83ce6a20c764
parent22f0202b25d7922c11002a734d50c9fec24c8614 (diff)
downloadrabbitmq-server-56385940ddec8e087edc64dbaf3d7c0fe6934a3f.tar.gz
Check the version we want matches the one a remote node is running before joining the cluster.
-rw-r--r--src/rabbit_mnesia.erl20
-rw-r--r--src/rabbit_upgrade.erl52
2 files changed, 49 insertions, 23 deletions
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index 4663e175..35be1700 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -377,7 +377,7 @@ init_db(ClusterNodes, Force) ->
{[], true, [_]} ->
%% True single disc node, attempt upgrade
wait_for_tables(),
- rabbit_upgrade:maybe_upgrade(dir()),
+ rabbit_upgrade:maybe_upgrade(),
case check_schema_integrity() of
ok ->
ok;
@@ -403,8 +403,19 @@ init_db(ClusterNodes, Force) ->
{[], false, _} ->
%% First RAM node in cluster, start from scratch
ok = create_schema();
- {[_|_], _, _} ->
+ {[AnotherNode|_], _, _} ->
%% Subsequent node in cluster, catch up
+ LocalVersion = rabbit_upgrade:desired_version(),
+ {ok, RemoteVersion} = rpc:call(
+ AnotherNode,
+ rabbit_upgrade, read_version, []),
+ case LocalVersion of
+ RemoteVersion ->
+ ok;
+ _ ->
+ exit({schema_mismatch, LocalVersion, RemoteVersion})
+ end,
+ ok = rabbit_upgrade:write_version(),
IsDiskNode = ClusterNodes == [] orelse
lists:member(node(), ClusterNodes),
ok = wait_for_replicated_tables(),
@@ -413,8 +424,7 @@ init_db(ClusterNodes, Force) ->
true -> disc;
false -> ram
end),
- ok = ensure_schema_integrity(),
- ok = rabbit_upgrade:write_version(dir())
+ ok = ensure_schema_integrity()
end;
{error, Reason} ->
%% one reason we may end up here is if we try to join
@@ -433,7 +443,7 @@ create_schema() ->
ok = create_tables(),
ok = ensure_schema_integrity(),
ok = wait_for_tables(),
- ok = rabbit_upgrade:write_version(dir()).
+ ok = rabbit_upgrade:write_version().
move_db() ->
mnesia:stop(),
diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl
index 53731639..9262ebab 100644
--- a/src/rabbit_upgrade.erl
+++ b/src/rabbit_upgrade.erl
@@ -21,7 +21,7 @@
-module(rabbit_upgrade).
--export([maybe_upgrade/1, write_version/1]).
+-export([maybe_upgrade/0, read_version/0, write_version/0, desired_version/0]).
-include("rabbit.hrl").
@@ -32,8 +32,11 @@
-ifdef(use_specs).
--spec(maybe_upgrade/1 :: (file:filename()) -> 'ok').
--spec(write_version/1 :: (file:filename()) -> 'ok').
+-spec(maybe_upgrade/0 :: () -> 'ok').
+-spec(read_version/0 ::
+ () -> {'ok', [any()]} | rabbit_types:error(any())).
+-spec(write_version/0 :: () -> 'ok').
+-spec(desired_version/0 :: () -> [any()]).
-endif.
@@ -42,15 +45,15 @@
%% Try to upgrade the schema. If no information on the existing schema could
%% be found, do nothing. rabbit_mnesia:check_schema_integrity() will catch the
%% problem.
-maybe_upgrade(Dir) ->
- case rabbit_misc:read_term_file(schema_filename(Dir)) of
- {ok, [CurrentHeads]} ->
+maybe_upgrade() ->
+ case read_version() of
+ {ok, CurrentHeads} ->
G = load_graph(),
case unknown_heads(CurrentHeads, G) of
[] ->
case upgrades_to_apply(CurrentHeads, G) of
[] -> ok;
- Upgrades -> apply_upgrades(Upgrades, Dir)
+ Upgrades -> apply_upgrades(Upgrades)
end;
Unknown ->
[warn("Data store has had future upgrade ~w applied." ++
@@ -62,11 +65,21 @@ maybe_upgrade(Dir) ->
ok
end.
-write_version(Dir) ->
+read_version() ->
+ case rabbit_misc:read_term_file(schema_filename()) of
+ {ok, [Heads]} -> {ok, Heads};
+ {error, E} -> {error, E}
+ end.
+
+write_version() ->
+ ok = rabbit_misc:write_term_file(schema_filename(), [desired_version()]),
+ ok.
+
+desired_version() ->
G = load_graph(),
- ok = rabbit_misc:write_term_file(schema_filename(Dir), [heads(G)]),
+ Version = heads(G),
true = digraph:delete(G),
- ok.
+ Version.
%% -------------------------------------------------------------------
@@ -103,12 +116,12 @@ upgrades_to_apply(Heads, G) ->
|| StepName <- digraph_utils:topsort(digraph_utils:subgraph(G, Unsorted))].
heads(G) ->
- [V || V <- digraph:vertices(G), digraph:out_degree(G, V) =:= 0].
+ lists:sort([V || V <- digraph:vertices(G), digraph:out_degree(G, V) =:= 0]).
%% -------------------------------------------------------------------
-apply_upgrades(Upgrades, Dir) ->
- LockFile = lock_filename(Dir),
+apply_upgrades(Upgrades) ->
+ LockFile = lock_filename(),
case file:read_file_info(LockFile) of
{error, enoent} ->
info("Upgrades: ~w to apply~n", [length(Upgrades)]),
@@ -116,7 +129,7 @@ apply_upgrades(Upgrades, Dir) ->
ok = file:close(Lock),
[apply_upgrade(Upgrade) || Upgrade <- Upgrades],
info("Upgrades: All applied~n", []),
- ok = write_version(Dir),
+ ok = write_version(),
ok = file:delete(LockFile);
{ok, _FI} ->
exit(previous_upgrade_failed);
@@ -130,11 +143,11 @@ apply_upgrade({M, F}) ->
%% -------------------------------------------------------------------
-schema_filename(Dir) ->
- filename:join(Dir, ?VERSION_FILENAME).
+schema_filename() ->
+ filename:join(dir(), ?VERSION_FILENAME).
-lock_filename(Dir) ->
- filename:join(Dir, ?LOCK_FILENAME).
+lock_filename() ->
+ filename:join(dir(), ?LOCK_FILENAME).
%% NB: we cannot use rabbit_log here since it may not have been started yet
info(Msg, Args) ->
@@ -142,3 +155,6 @@ info(Msg, Args) ->
warn(Msg, Args) ->
error_logger:warning_msg(Msg, Args).
+
+dir() ->
+ rabbit_mnesia:dir().