summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2011-03-23 18:02:13 +0000
committerMatthew Sackman <matthew@rabbitmq.com>2011-03-23 18:02:13 +0000
commitfec6c5e2279bd726b27c76bc0b464c53a7f5525a (patch)
treeddb45c969e67fd7787507f0b89207278d4d53de7
parent3b89e0573c46e82557dc2592514907e2a6d0ae71 (diff)
parent330eb98c7bc0e3df4149807dba765263a06c2d3d (diff)
downloadrabbitmq-server-fec6c5e2279bd726b27c76bc0b464c53a7f5525a.tar.gz
Merging bug23425 to default
-rw-r--r--src/rabbit.erl5
-rw-r--r--src/rabbit_mnesia.erl111
-rw-r--r--src/rabbit_prelaunch.erl2
-rw-r--r--src/rabbit_queue_index.erl2
-rw-r--r--src/rabbit_upgrade.erl331
-rw-r--r--src/rabbit_upgrade_functions.erl12
-rw-r--r--src/rabbit_variable_queue.erl2
-rw-r--r--src/rabbit_version.erl172
8 files changed, 486 insertions, 151 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl
index c9a929ae..807e9e7d 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -192,7 +192,8 @@
%%----------------------------------------------------------------------------
prepare() ->
- ok = ensure_working_log_handlers().
+ ok = ensure_working_log_handlers(),
+ ok = rabbit_upgrade:maybe_upgrade_mnesia().
start() ->
try
@@ -233,6 +234,7 @@ rotate_logs(BinarySuffix) ->
start(normal, []) ->
case erts_version_check() of
ok ->
+ ok = rabbit_mnesia:delete_previously_running_nodes(),
{ok, SupPid} = rabbit_sup:start_link(),
true = register(rabbit, self()),
@@ -245,6 +247,7 @@ start(normal, []) ->
end.
stop(_State) ->
+ ok = rabbit_mnesia:record_running_nodes(),
terminated_ok = error_logger:delete_report_handler(rabbit_error_logger),
ok = rabbit_alarm:stop(),
ok = case rabbit_mnesia:is_clustered() of
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index 963d814e..fbcf07ae 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -18,9 +18,12 @@
-module(rabbit_mnesia).
-export([ensure_mnesia_dir/0, dir/0, status/0, init/0, is_db_empty/0,
- cluster/1, force_cluster/1, reset/0, force_reset/0,
+ cluster/1, force_cluster/1, reset/0, force_reset/0, init_db/3,
is_clustered/0, running_clustered_nodes/0, all_clustered_nodes/0,
- empty_ram_only_tables/0, copy_db/1, wait_for_tables/1]).
+ empty_ram_only_tables/0, copy_db/1, wait_for_tables/1,
+ create_cluster_nodes_config/1, read_cluster_nodes_config/0,
+ record_running_nodes/0, read_previously_running_nodes/0,
+ delete_previously_running_nodes/0, running_nodes_filename/0]).
-export([table_names/0]).
@@ -42,6 +45,7 @@
-spec(dir/0 :: () -> file:filename()).
-spec(ensure_mnesia_dir/0 :: () -> 'ok').
-spec(init/0 :: () -> 'ok').
+-spec(init_db/3 :: ([node()], boolean(), rabbit_misc:thunk('ok')) -> 'ok').
-spec(is_db_empty/0 :: () -> boolean()).
-spec(cluster/1 :: ([node()]) -> 'ok').
-spec(force_cluster/1 :: ([node()]) -> 'ok').
@@ -55,6 +59,12 @@
-spec(create_tables/0 :: () -> 'ok').
-spec(copy_db/1 :: (file:filename()) -> rabbit_types:ok_or_error(any())).
-spec(wait_for_tables/1 :: ([atom()]) -> 'ok').
+-spec(create_cluster_nodes_config/1 :: ([node()]) -> 'ok').
+-spec(read_cluster_nodes_config/0 :: () -> [node()]).
+-spec(record_running_nodes/0 :: () -> 'ok').
+-spec(read_previously_running_nodes/0 :: () -> [node()]).
+-spec(delete_previously_running_nodes/0 :: () -> 'ok').
+-spec(running_nodes_filename/0 :: () -> file:filename()).
-endif.
@@ -80,7 +90,8 @@ status() ->
init() ->
ensure_mnesia_running(),
ensure_mnesia_dir(),
- ok = init_db(read_cluster_nodes_config(), true),
+ ok = init_db(read_cluster_nodes_config(), true,
+ fun maybe_upgrade_local_or_record_desired/0),
ok.
is_db_empty() ->
@@ -102,7 +113,8 @@ cluster(ClusterNodes, Force) ->
ensure_mnesia_dir(),
rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia),
try
- ok = init_db(ClusterNodes, Force),
+ ok = init_db(ClusterNodes, Force,
+ fun maybe_upgrade_local_or_record_desired/0),
ok = create_cluster_nodes_config(ClusterNodes)
after
mnesia:stop()
@@ -367,11 +379,40 @@ delete_cluster_nodes_config() ->
FileName, Reason}})
end.
+running_nodes_filename() ->
+ filename:join(dir(), "nodes_running_at_shutdown").
+
+record_running_nodes() ->
+ FileName = running_nodes_filename(),
+ Nodes = running_clustered_nodes() -- [node()],
+ %% Don't check the result: we're shutting down anyway and this is
+ %% a best-effort-basis.
+ rabbit_misc:write_term_file(FileName, [Nodes]),
+ ok.
+
+read_previously_running_nodes() ->
+ FileName = running_nodes_filename(),
+ case rabbit_misc:read_term_file(FileName) of
+ {ok, [Nodes]} -> Nodes;
+ {error, enoent} -> [];
+ {error, Reason} -> throw({error, {cannot_read_previous_nodes_file,
+ FileName, Reason}})
+ end.
+
+delete_previously_running_nodes() ->
+ FileName = running_nodes_filename(),
+ case file:delete(FileName) of
+ ok -> ok;
+ {error, enoent} -> ok;
+ {error, Reason} -> throw({error, {cannot_delete_previous_nodes_file,
+ FileName, Reason}})
+ end.
+
%% Take a cluster node config and create the right kind of node - a
%% standalone disk node, or disk or ram node connected to the
%% specified cluster nodes. If Force is false, don't allow
%% connections to offline nodes.
-init_db(ClusterNodes, Force) ->
+init_db(ClusterNodes, Force, SecondaryPostMnesiaFun) ->
UClusterNodes = lists:usort(ClusterNodes),
ProperClusterNodes = UClusterNodes -- [node()],
case mnesia:change_config(extra_db_nodes, ProperClusterNodes) of
@@ -387,26 +428,21 @@ init_db(ClusterNodes, Force) ->
end;
true -> ok
end,
- case {Nodes, mnesia:system_info(use_dir), all_clustered_nodes()} of
- {[], true, [_]} ->
- %% True single disc node, attempt upgrade
- case rabbit_upgrade:maybe_upgrade() of
- ok -> ensure_schema_integrity();
- version_not_available -> schema_ok_or_move()
- end;
- {[], true, _} ->
- %% "Master" (i.e. without config) disc node in cluster,
- %% verify schema
- ensure_version_ok(rabbit_upgrade:read_version()),
- ensure_schema_integrity();
- {[], false, _} ->
+ case {Nodes, mnesia:system_info(use_dir)} of
+ {[], false} ->
%% Nothing there at all, start from scratch
ok = create_schema();
- {[AnotherNode|_], _, _} ->
+ {[], true} ->
+ %% We're the first node up
+ case rabbit_upgrade:maybe_upgrade_local() of
+ ok -> ensure_schema_integrity();
+ version_not_available -> ok = schema_ok_or_move()
+ end,
+ ok;
+ {[AnotherNode|_], _} ->
%% Subsequent node in cluster, catch up
- ensure_version_ok(rabbit_upgrade:read_version()),
ensure_version_ok(
- rpc:call(AnotherNode, rabbit_upgrade, read_version, [])),
+ rpc:call(AnotherNode, rabbit_version, recorded, [])),
IsDiskNode = ClusterNodes == [] orelse
lists:member(node(), ClusterNodes),
ok = wait_for_replicated_tables(),
@@ -415,7 +451,9 @@ init_db(ClusterNodes, Force) ->
true -> disc;
false -> ram
end),
- ensure_schema_integrity()
+ ok = SecondaryPostMnesiaFun(),
+ ensure_schema_integrity(),
+ ok
end;
{error, Reason} ->
%% one reason we may end up here is if we try to join
@@ -424,6 +462,14 @@ init_db(ClusterNodes, Force) ->
throw({error, {unable_to_join_cluster, ClusterNodes, Reason}})
end.
+maybe_upgrade_local_or_record_desired() ->
+ case rabbit_upgrade:maybe_upgrade_local() of
+ ok -> ok;
+ %% If we're just starting up a new node we won't have a
+ %% version
+ version_not_available -> ok = rabbit_version:record_desired()
+ end.
+
schema_ok_or_move() ->
case check_schema_integrity() of
ok ->
@@ -440,13 +486,13 @@ schema_ok_or_move() ->
end.
ensure_version_ok({ok, DiscVersion}) ->
- case rabbit_upgrade:desired_version() of
- DiscVersion -> ok;
- DesiredVersion -> throw({error, {schema_mismatch,
- DesiredVersion, DiscVersion}})
+ DesiredVersion = rabbit_version:desired(),
+ case rabbit_version:matches(DesiredVersion, DiscVersion) of
+ true -> ok;
+ false -> throw({error, {version_mismatch, DesiredVersion, DiscVersion}})
end;
ensure_version_ok({error, _}) ->
- ok = rabbit_upgrade:write_version().
+ ok = rabbit_version:record_desired().
create_schema() ->
mnesia:stop(),
@@ -456,7 +502,7 @@ create_schema() ->
cannot_start_mnesia),
ok = create_tables(),
ensure_schema_integrity(),
- ok = rabbit_upgrade:write_version().
+ ok = rabbit_version:record_desired().
move_db() ->
mnesia:stop(),
@@ -481,13 +527,8 @@ move_db() ->
ok.
copy_db(Destination) ->
- mnesia:stop(),
- case rabbit_misc:recursive_copy(dir(), Destination) of
- ok ->
- rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia);
- {error, E} ->
- {error, E}
- end.
+ ok = ensure_mnesia_not_running(),
+ rabbit_misc:recursive_copy(dir(), Destination).
create_tables() ->
lists:foreach(fun ({Tab, TabDef}) ->
diff --git a/src/rabbit_prelaunch.erl b/src/rabbit_prelaunch.erl
index 7bb8c0ea..8800e8d6 100644
--- a/src/rabbit_prelaunch.erl
+++ b/src/rabbit_prelaunch.erl
@@ -235,7 +235,7 @@ post_process_script(ScriptFile) ->
{error, {failed_to_load_script, Reason}}
end.
-process_entry(Entry = {apply,{application,start_boot,[rabbit,permanent]}}) ->
+process_entry(Entry = {apply,{application,start_boot,[mnesia,permanent]}}) ->
[{apply,{rabbit,prepare,[]}}, Entry];
process_entry(Entry) ->
[Entry].
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 33c5391b..367953b8 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -169,7 +169,7 @@
%%----------------------------------------------------------------------------
--rabbit_upgrade({add_queue_ttl, []}).
+-rabbit_upgrade({add_queue_ttl, local, []}).
-ifdef(use_specs).
diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl
index ebda5d03..5ec08330 100644
--- a/src/rabbit_upgrade.erl
+++ b/src/rabbit_upgrade.erl
@@ -16,7 +16,7 @@
-module(rabbit_upgrade).
--export([maybe_upgrade/0, read_version/0, write_version/0, desired_version/0]).
+-export([maybe_upgrade_mnesia/0, maybe_upgrade_local/0]).
-include("rabbit.hrl").
@@ -27,141 +27,260 @@
-ifdef(use_specs).
--type(step() :: atom()).
--type(version() :: [step()]).
-
--spec(maybe_upgrade/0 :: () -> 'ok' | 'version_not_available').
--spec(read_version/0 :: () -> rabbit_types:ok_or_error2(version(), any())).
--spec(write_version/0 :: () -> 'ok').
--spec(desired_version/0 :: () -> version()).
+-spec(maybe_upgrade_mnesia/0 :: () -> 'ok').
+-spec(maybe_upgrade_local/0 :: () -> 'ok' | 'version_not_available').
-endif.
%% -------------------------------------------------------------------
-%% Try to upgrade the schema. If no information on the existing schema
-%% could be found, do nothing. rabbit_mnesia:check_schema_integrity()
-%% will catch the problem.
-maybe_upgrade() ->
- case read_version() of
- {ok, CurrentHeads} ->
- with_upgrade_graph(
- fun (G) ->
- case unknown_heads(CurrentHeads, G) of
- [] -> case upgrades_to_apply(CurrentHeads, G) of
- [] -> ok;
- Upgrades -> apply_upgrades(Upgrades)
- end;
- Unknown -> throw({error,
- {future_upgrades_found, Unknown}})
- end
- end);
- {error, enoent} ->
- version_not_available
+%% The upgrade logic is quite involved, due to the existence of
+%% clusters.
+%%
+%% Firstly, we have two different types of upgrades to do: Mnesia and
+%% everythinq else. Mnesia upgrades must only be done by one node in
+%% the cluster (we treat a non-clustered node as a single-node
+%% cluster). This is the primary upgrader. The other upgrades need to
+%% be done by all nodes.
+%%
+%% The primary upgrader has to start first (and do its Mnesia
+%% upgrades). Secondary upgraders need to reset their Mnesia database
+%% and then rejoin the cluster. They can't do the Mnesia upgrades as
+%% well and then merge databases since the cookie for each table will
+%% end up different and the merge will fail.
+%%
+%% This in turn means that we need to determine whether we are the
+%% primary or secondary upgrader *before* Mnesia comes up. If we
+%% didn't then the secondary upgrader would try to start Mnesia, and
+%% either hang waiting for a node which is not yet up, or fail since
+%% its schema differs from the other nodes in the cluster.
+%%
+%% Also, the primary upgrader needs to start Mnesia to do its
+%% upgrades, but needs to forcibly load tables rather than wait for
+%% them (in case it was not the last node to shut down, in which case
+%% it would wait forever).
+%%
+%% This in turn means that maybe_upgrade_mnesia/0 has to be patched
+%% into the boot process by prelaunch before the mnesia application is
+%% started. By the time Mnesia is started the upgrades have happened
+%% (on the primary), or Mnesia has been reset (on the secondary) and
+%% rabbit_mnesia:init_db/2 can then make the node rejoin the cluster
+%% in the normal way.
+%%
+%% The non-mnesia upgrades are then triggered by
+%% rabbit_mnesia:init_db/2. Of course, it's possible for a given
+%% upgrade process to only require Mnesia upgrades, or only require
+%% non-Mnesia upgrades. In the latter case no Mnesia resets and
+%% reclusterings occur.
+%%
+%% The primary upgrader needs to be a disc node. Ideally we would like
+%% it to be the last disc node to shut down (since otherwise there's a
+%% risk of data loss). On each node we therefore record the disc nodes
+%% that were still running when we shut down. A disc node that knows
+%% other nodes were up when it shut down, or a ram node, will refuse
+%% to be the primary upgrader, and will thus not start when upgrades
+%% are needed.
+%%
+%% However, this is racy if several nodes are shut down at once. Since
+%% rabbit records the running nodes, and shuts down before mnesia, the
+%% race manifests as all disc nodes thinking they are not the primary
+%% upgrader. Therefore the user can remove the record of the last disc
+%% node to shut down to get things going again. This may lose any
+%% mnesia changes that happened after the node chosen as the primary
+%% upgrader was shut down.
+
+%% -------------------------------------------------------------------
+
+ensure_backup_taken() ->
+ case filelib:is_file(lock_filename()) of
+ false -> case filelib:is_dir(backup_dir()) of
+ false -> ok = take_backup();
+ _ -> ok
+ end;
+ true -> throw({error, previous_upgrade_failed})
end.
-read_version() ->
- case rabbit_misc:read_term_file(schema_filename()) of
- {ok, [Heads]} -> {ok, Heads};
- {error, _} = Err -> Err
+take_backup() ->
+ BackupDir = backup_dir(),
+ case rabbit_mnesia:copy_db(BackupDir) of
+ ok -> info("upgrades: Mnesia dir backed up to ~p~n",
+ [BackupDir]);
+ {error, E} -> throw({could_not_back_up_mnesia_dir, E})
end.
-write_version() ->
- ok = rabbit_misc:write_term_file(schema_filename(), [desired_version()]),
- ok.
+ensure_backup_removed() ->
+ case filelib:is_dir(backup_dir()) of
+ true -> ok = remove_backup();
+ _ -> ok
+ end.
-desired_version() ->
- with_upgrade_graph(fun (G) -> heads(G) end).
+remove_backup() ->
+ ok = rabbit_misc:recursive_delete([backup_dir()]),
+ info("upgrades: Mnesia backup removed~n", []).
-%% -------------------------------------------------------------------
+maybe_upgrade_mnesia() ->
+ AllNodes = rabbit_mnesia:all_clustered_nodes(),
+ case rabbit_version:upgrades_required(mnesia) of
+ {error, version_not_available} ->
+ case AllNodes of
+ [_] -> ok;
+ _ -> die("Cluster upgrade needed but upgrading from "
+ "< 2.1.1.~nUnfortunately you will need to "
+ "rebuild the cluster.", [])
+ end;
+ {error, _} = Err ->
+ throw(Err);
+ {ok, []} ->
+ ok;
+ {ok, Upgrades} ->
+ ensure_backup_taken(),
+ ok = case upgrade_mode(AllNodes) of
+ primary -> primary_upgrade(Upgrades, AllNodes);
+ secondary -> secondary_upgrade(AllNodes)
+ end
+ end.
-with_upgrade_graph(Fun) ->
- case rabbit_misc:build_acyclic_graph(
- fun vertices/2, fun edges/2,
- rabbit_misc:all_module_attributes(rabbit_upgrade)) of
- {ok, G} -> try
- Fun(G)
- after
- true = digraph:delete(G)
- end;
- {error, {vertex, duplicate, StepName}} ->
- throw({error, {duplicate_upgrade_step, StepName}});
- {error, {edge, {bad_vertex, StepName}, _From, _To}} ->
- throw({error, {dependency_on_unknown_upgrade_step, StepName}});
- {error, {edge, {bad_edge, StepNames}, _From, _To}} ->
- throw({error, {cycle_in_upgrade_steps, StepNames}})
+upgrade_mode(AllNodes) ->
+ case nodes_running(AllNodes) of
+ [] ->
+ AfterUs = rabbit_mnesia:read_previously_running_nodes(),
+ case {is_disc_node(), AfterUs} of
+ {true, []} ->
+ primary;
+ {true, _} ->
+ Filename = rabbit_mnesia:running_nodes_filename(),
+ die("Cluster upgrade needed but other disc nodes shut "
+ "down after this one.~nPlease first start the last "
+ "disc node to shut down.~n~nNote: if several disc "
+ "nodes were shut down simultaneously they may "
+ "all~nshow this message. In which case, remove "
+ "the lock file on one of them and~nstart that node. "
+ "The lock file on this node is:~n~n ~s ", [Filename]);
+ {false, _} ->
+ die("Cluster upgrade needed but this is a ram node.~n"
+ "Please first start the last disc node to shut down.",
+ [])
+ end;
+ [Another|_] ->
+ MyVersion = rabbit_version:desired_for_scope(mnesia),
+ ErrFun = fun (ClusterVersion) ->
+ %% The other node(s) are running an
+ %% unexpected version.
+ die("Cluster upgrade needed but other nodes are "
+ "running ~p~nand I want ~p",
+ [ClusterVersion, MyVersion])
+ end,
+ case rpc:call(Another, rabbit_version, desired_for_scope,
+ [mnesia]) of
+ {badrpc, {'EXIT', {undef, _}}} -> ErrFun(unknown_old_version);
+ {badrpc, Reason} -> ErrFun({unknown, Reason});
+ CV -> case rabbit_version:matches(
+ MyVersion, CV) of
+ true -> secondary;
+ false -> ErrFun(CV)
+ end
+ end
end.
-vertices(Module, Steps) ->
- [{StepName, {Module, StepName}} || {StepName, _Reqs} <- Steps].
+is_disc_node() ->
+ %% This is pretty ugly but we can't start Mnesia and ask it (will hang),
+ %% we can't look at the config file (may not include us even if we're a
+ %% disc node).
+ filelib:is_regular(filename:join(dir(), "rabbit_durable_exchange.DCD")).
+
+die(Msg, Args) ->
+ %% We don't throw or exit here since that gets thrown
+ %% straight out into do_boot, generating an erl_crash.dump
+ %% and displaying any error message in a confusing way.
+ error_logger:error_msg(Msg, Args),
+ io:format("~n~n****~n~n" ++ Msg ++ "~n~n****~n~n~n", Args),
+ error_logger:logfile(close),
+ halt(1).
+
+primary_upgrade(Upgrades, Nodes) ->
+ Others = Nodes -- [node()],
+ ok = apply_upgrades(
+ mnesia,
+ Upgrades,
+ fun () ->
+ force_tables(),
+ case Others of
+ [] -> ok;
+ _ -> info("mnesia upgrades: Breaking cluster~n", []),
+ [{atomic, ok} = mnesia:del_table_copy(schema, Node)
+ || Node <- Others]
+ end
+ end),
+ ok.
-edges(_Module, Steps) ->
- [{Require, StepName} || {StepName, Requires} <- Steps, Require <- Requires].
+force_tables() ->
+ [mnesia:force_load_table(T) || T <- rabbit_mnesia:table_names()].
-unknown_heads(Heads, G) ->
- [H || H <- Heads, digraph:vertex(G, H) =:= false].
+secondary_upgrade(AllNodes) ->
+ %% must do this before we wipe out schema
+ IsDiscNode = is_disc_node(),
+ rabbit_misc:ensure_ok(mnesia:delete_schema([node()]),
+ cannot_delete_schema),
+ %% Note that we cluster with all nodes, rather than all disc nodes
+ %% (as we can't know all disc nodes at this point). This is safe as
+ %% we're not writing the cluster config, just setting up Mnesia.
+ ClusterNodes = case IsDiscNode of
+ true -> AllNodes;
+ false -> AllNodes -- [node()]
+ end,
+ rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia),
+ ok = rabbit_mnesia:init_db(ClusterNodes, true, fun () -> ok end),
+ ok = rabbit_version:record_desired_for_scope(mnesia),
+ ok.
-upgrades_to_apply(Heads, G) ->
- %% Take all the vertices which can reach the known heads. That's
- %% everything we've already applied. Subtract that from all
- %% vertices: that's what we have to apply.
- Unsorted = sets:to_list(
- sets:subtract(
- sets:from_list(digraph:vertices(G)),
- sets:from_list(digraph_utils:reaching(Heads, G)))),
- %% Form a subgraph from that list and find a topological ordering
- %% so we can invoke them in order.
- [element(2, digraph:vertex(G, StepName)) ||
- StepName <- digraph_utils:topsort(digraph_utils:subgraph(G, Unsorted))].
+nodes_running(Nodes) ->
+ [N || N <- Nodes, node_running(N)].
-heads(G) ->
- lists:sort([V || V <- digraph:vertices(G), digraph:out_degree(G, V) =:= 0]).
+node_running(Node) ->
+ case rpc:call(Node, application, which_applications, []) of
+ {badrpc, _} -> false;
+ Apps -> lists:keysearch(rabbit, 1, Apps) =/= false
+ end.
%% -------------------------------------------------------------------
-apply_upgrades(Upgrades) ->
- LockFile = lock_filename(dir()),
- case rabbit_misc:lock_file(LockFile) of
- ok ->
- BackupDir = dir() ++ "-upgrade-backup",
- info("Upgrades: ~w to apply~n", [length(Upgrades)]),
- case rabbit_mnesia:copy_db(BackupDir) of
- ok ->
- %% We need to make the backup after creating the
- %% lock file so that it protects us from trying to
- %% overwrite the backup. Unfortunately this means
- %% the lock file exists in the backup too, which
- %% is not intuitive. Remove it.
- ok = file:delete(lock_filename(BackupDir)),
- info("Upgrades: Mnesia dir backed up to ~p~n", [BackupDir]),
- [apply_upgrade(Upgrade) || Upgrade <- Upgrades],
- info("Upgrades: All upgrades applied successfully~n", []),
- ok = write_version(),
- ok = rabbit_misc:recursive_delete([BackupDir]),
- info("Upgrades: Mnesia backup removed~n", []),
- ok = file:delete(LockFile);
- {error, E} ->
- %% If we can't backup, the upgrade hasn't started
- %% hence we don't need the lockfile since the real
- %% mnesia dir is the good one.
- ok = file:delete(LockFile),
- throw({could_not_back_up_mnesia_dir, E})
- end;
- {error, eexist} ->
- throw({error, previous_upgrade_failed})
+maybe_upgrade_local() ->
+ case rabbit_version:upgrades_required(local) of
+ {error, version_not_available} -> version_not_available;
+ {error, _} = Err -> throw(Err);
+ {ok, []} -> ensure_backup_removed(),
+ ok;
+ {ok, Upgrades} -> mnesia:stop(),
+ ensure_backup_taken(),
+ ok = apply_upgrades(local, Upgrades,
+ fun () -> ok end),
+ ensure_backup_removed(),
+ ok
end.
-apply_upgrade({M, F}) ->
- info("Upgrades: Applying ~w:~w~n", [M, F]),
+%% -------------------------------------------------------------------
+
+apply_upgrades(Scope, Upgrades, Fun) ->
+ ok = rabbit_misc:lock_file(lock_filename()),
+ info("~s upgrades: ~w to apply~n", [Scope, length(Upgrades)]),
+ rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia),
+ Fun(),
+ [apply_upgrade(Scope, Upgrade) || Upgrade <- Upgrades],
+ info("~s upgrades: All upgrades applied successfully~n", [Scope]),
+ ok = rabbit_version:record_desired_for_scope(Scope),
+ ok = file:delete(lock_filename()).
+
+apply_upgrade(Scope, {M, F}) ->
+ info("~s upgrades: Applying ~w:~w~n", [Scope, M, F]),
ok = apply(M, F, []).
%% -------------------------------------------------------------------
dir() -> rabbit_mnesia:dir().
-schema_filename() -> filename:join(dir(), ?VERSION_FILENAME).
-
+lock_filename() -> lock_filename(dir()).
lock_filename(Dir) -> filename:join(Dir, ?LOCK_FILENAME).
+backup_dir() -> dir() ++ "-upgrade-backup".
%% NB: we cannot use rabbit_log here since it may not have been
%% started yet
diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl
index b9dbe418..7567c29e 100644
--- a/src/rabbit_upgrade_functions.erl
+++ b/src/rabbit_upgrade_functions.erl
@@ -20,12 +20,12 @@
-compile([export_all]).
--rabbit_upgrade({remove_user_scope, []}).
--rabbit_upgrade({hash_passwords, []}).
--rabbit_upgrade({add_ip_to_listener, []}).
--rabbit_upgrade({internal_exchanges, []}).
--rabbit_upgrade({user_to_internal_user, [hash_passwords]}).
--rabbit_upgrade({topic_trie, []}).
+-rabbit_upgrade({remove_user_scope, mnesia, []}).
+-rabbit_upgrade({hash_passwords, mnesia, []}).
+-rabbit_upgrade({add_ip_to_listener, mnesia, []}).
+-rabbit_upgrade({internal_exchanges, mnesia, []}).
+-rabbit_upgrade({user_to_internal_user, mnesia, [hash_passwords]}).
+-rabbit_upgrade({topic_trie, mnesia, []}).
%% -------------------------------------------------------------------
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 14c36b12..7a1102e5 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -301,7 +301,7 @@
%%----------------------------------------------------------------------------
--rabbit_upgrade({multiple_routing_keys, []}).
+-rabbit_upgrade({multiple_routing_keys, local, []}).
-ifdef(use_specs).
diff --git a/src/rabbit_version.erl b/src/rabbit_version.erl
new file mode 100644
index 00000000..400abc10
--- /dev/null
+++ b/src/rabbit_version.erl
@@ -0,0 +1,172 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%%
+
+-module(rabbit_version).
+
+-export([recorded/0, matches/2, desired/0, desired_for_scope/1,
+ record_desired/0, record_desired_for_scope/1,
+ upgrades_required/1]).
+
+%% -------------------------------------------------------------------
+-ifdef(use_specs).
+
+-export_type([scope/0, step/0]).
+
+-type(scope() :: atom()).
+-type(scope_version() :: [atom()]).
+-type(step() :: {atom(), atom()}).
+
+-type(version() :: [atom()]).
+
+-spec(recorded/0 :: () -> rabbit_types:ok_or_error2(version(), any())).
+-spec(matches/2 :: ([A], [A]) -> boolean()).
+-spec(desired/0 :: () -> version()).
+-spec(desired_for_scope/1 :: (scope()) -> scope_version()).
+-spec(record_desired/0 :: () -> 'ok').
+-spec(record_desired_for_scope/1 ::
+ (scope()) -> rabbit_types:ok_or_error(any())).
+-spec(upgrades_required/1 ::
+ (scope()) -> rabbit_types:ok_or_error2([step()], any())).
+
+-endif.
+%% -------------------------------------------------------------------
+
+-define(VERSION_FILENAME, "schema_version").
+-define(SCOPES, [mnesia, local]).
+
+%% -------------------------------------------------------------------
+
+recorded() -> case rabbit_misc:read_term_file(schema_filename()) of
+ {ok, [V]} -> {ok, V};
+ {error, _} = Err -> Err
+ end.
+
+record(V) -> ok = rabbit_misc:write_term_file(schema_filename(), [V]).
+
+recorded_for_scope(Scope) ->
+ case recorded() of
+ {error, _} = Err ->
+ Err;
+ {ok, Version} ->
+ {ok, case lists:keysearch(Scope, 1, categorise_by_scope(Version)) of
+ false -> [];
+ {value, {Scope, SV1}} -> SV1
+ end}
+ end.
+
+record_for_scope(Scope, ScopeVersion) ->
+ case recorded() of
+ {error, _} = Err ->
+ Err;
+ {ok, Version} ->
+ Version1 = lists:keystore(Scope, 1, categorise_by_scope(Version),
+ {Scope, ScopeVersion}),
+ ok = record([Name || {_Scope, Names} <- Version1, Name <- Names])
+ end.
+
+%% -------------------------------------------------------------------
+
+matches(VerA, VerB) ->
+ lists:usort(VerA) =:= lists:usort(VerB).
+
+%% -------------------------------------------------------------------
+
+desired() -> [Name || Scope <- ?SCOPES, Name <- desired_for_scope(Scope)].
+
+desired_for_scope(Scope) -> with_upgrade_graph(fun heads/1, Scope).
+
+record_desired() -> record(desired()).
+
+record_desired_for_scope(Scope) ->
+ record_for_scope(Scope, desired_for_scope(Scope)).
+
+upgrades_required(Scope) ->
+ case recorded_for_scope(Scope) of
+ {error, enoent} ->
+ {error, version_not_available};
+ {ok, CurrentHeads} ->
+ with_upgrade_graph(
+ fun (G) ->
+ case unknown_heads(CurrentHeads, G) of
+ [] -> {ok, upgrades_to_apply(CurrentHeads, G)};
+ Unknown -> {error, {future_upgrades_found, Unknown}}
+ end
+ end, Scope)
+ end.
+
+%% -------------------------------------------------------------------
+
+with_upgrade_graph(Fun, Scope) ->
+ case rabbit_misc:build_acyclic_graph(
+ fun (Module, Steps) -> vertices(Module, Steps, Scope) end,
+ fun (Module, Steps) -> edges(Module, Steps, Scope) end,
+ rabbit_misc:all_module_attributes(rabbit_upgrade)) of
+ {ok, G} -> try
+ Fun(G)
+ after
+ true = digraph:delete(G)
+ end;
+ {error, {vertex, duplicate, StepName}} ->
+ throw({error, {duplicate_upgrade_step, StepName}});
+ {error, {edge, {bad_vertex, StepName}, _From, _To}} ->
+ throw({error, {dependency_on_unknown_upgrade_step, StepName}});
+ {error, {edge, {bad_edge, StepNames}, _From, _To}} ->
+ throw({error, {cycle_in_upgrade_steps, StepNames}})
+ end.
+
+vertices(Module, Steps, Scope0) ->
+ [{StepName, {Module, StepName}} || {StepName, Scope1, _Reqs} <- Steps,
+ Scope0 == Scope1].
+
+edges(_Module, Steps, Scope0) ->
+ [{Require, StepName} || {StepName, Scope1, Requires} <- Steps,
+ Require <- Requires,
+ Scope0 == Scope1].
+unknown_heads(Heads, G) ->
+ [H || H <- Heads, digraph:vertex(G, H) =:= false].
+
+upgrades_to_apply(Heads, G) ->
+ %% Take all the vertices which can reach the known heads. That's
+ %% everything we've already applied. Subtract that from all
+ %% vertices: that's what we have to apply.
+ Unsorted = sets:to_list(
+ sets:subtract(
+ sets:from_list(digraph:vertices(G)),
+ sets:from_list(digraph_utils:reaching(Heads, G)))),
+ %% Form a subgraph from that list and find a topological ordering
+ %% so we can invoke them in order.
+ [element(2, digraph:vertex(G, StepName)) ||
+ StepName <- digraph_utils:topsort(digraph_utils:subgraph(G, Unsorted))].
+
+heads(G) ->
+ lists:sort([V || V <- digraph:vertices(G), digraph:out_degree(G, V) =:= 0]).
+
+%% -------------------------------------------------------------------
+
+categorise_by_scope(Version) when is_list(Version) ->
+ Categorised =
+ [{Scope, Name} || {_Module, Attributes} <-
+ rabbit_misc:all_module_attributes(rabbit_upgrade),
+ {Name, Scope, _Requires} <- Attributes,
+ lists:member(Name, Version)],
+ orddict:to_list(
+ lists:foldl(fun ({Scope, Name}, CatVersion) ->
+ rabbit_misc:orddict_cons(Scope, Name, CatVersion)
+ end, orddict:new(), Categorised)).
+
+dir() -> rabbit_mnesia:dir().
+
+schema_filename() -> filename:join(dir(), ?VERSION_FILENAME).