From fe3a8699396d5ea3d9e4d0f67ab411adbf9a24d5 Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Tue, 4 Jan 2011 13:39:08 +0000 Subject: Sketch of how clustered upgrades might work. --- src/rabbit_mnesia.erl | 81 ++++++++++++++++++++++++++++------------ src/rabbit_queue_index.erl | 2 +- src/rabbit_upgrade.erl | 52 ++++++++++++++++---------- src/rabbit_upgrade_functions.erl | 33 ++++++++++++++-- 4 files changed, 120 insertions(+), 48 deletions(-) diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index 11f5e410..2550bdd4 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -35,7 +35,7 @@ -export([ensure_mnesia_dir/0, dir/0, status/0, init/0, is_db_empty/0, cluster/1, force_cluster/1, reset/0, force_reset/0, is_clustered/0, running_clustered_nodes/0, all_clustered_nodes/0, - empty_ram_only_tables/0, copy_db/1]). + forget_other_nodes/0, empty_ram_only_tables/0, copy_db/1]). -export([table_names/0]). @@ -66,6 +66,7 @@ -spec(is_clustered/0 :: () -> boolean()). -spec(running_clustered_nodes/0 :: () -> [node()]). -spec(all_clustered_nodes/0 :: () -> [node()]). +-spec(forget_other_nodes/0 :: () -> 'ok'). -spec(empty_ram_only_tables/0 :: () -> 'ok'). -spec(create_tables/0 :: () -> 'ok'). -spec(copy_db/1 :: (file:filename()) -> rabbit_types:ok_or_error(any())). @@ -126,8 +127,8 @@ cluster(ClusterNodes, Force) -> %% return node to its virgin state, where it is not member of any %% cluster, has no cluster configuration, no local database, and no %% persisted messages -reset() -> reset(false). -force_reset() -> reset(true). +reset() -> reset(all). +force_reset() -> reset(force_all). is_clustered() -> RunningNodes = running_clustered_nodes(), @@ -139,6 +140,10 @@ all_clustered_nodes() -> running_clustered_nodes() -> mnesia:system_info(running_db_nodes). +forget_other_nodes() -> + Nodes = all_clustered_nodes() -- [node()], + [{atomic, ok} = mnesia:del_table_copy(schema, Node) || Node <- Nodes]. + empty_ram_only_tables() -> Node = node(), lists:foreach( @@ -385,32 +390,54 @@ init_db(ClusterNodes, Force) -> {[], true, [_]} -> %% True single disc node, attempt upgrade ok = wait_for_tables(), - case rabbit_upgrade:maybe_upgrade() of + case rabbit_upgrade:maybe_upgrade([mnesia, local]) of ok -> ensure_schema_ok(); version_not_available -> schema_ok_or_move() end; {[], true, _} -> %% "Master" (i.e. without config) disc node in cluster, - %% verify schema + %% do upgrade ok = wait_for_tables(), - ensure_version_ok(rabbit_upgrade:read_version()), - ensure_schema_ok(); + case rabbit_upgrade:maybe_upgrade([mnesia, local]) of + ok -> ensure_schema_ok(); + version_not_available -> schema_ok_or_move() + end; {[], false, _} -> %% Nothing there at all, start from scratch ok = create_schema(); {[AnotherNode|_], _, _} -> %% Subsequent node in cluster, catch up - ensure_version_ok(rabbit_upgrade:read_version()), - ensure_version_ok( - rpc:call(AnotherNode, rabbit_upgrade, read_version, [])), IsDiskNode = ClusterNodes == [] orelse lists:member(node(), ClusterNodes), - ok = wait_for_replicated_tables(), - ok = create_local_table_copy(schema, disc_copies), - ok = create_local_table_copies(case IsDiskNode of - true -> disc; - false -> ram - end), + case IsDiskNode of + true -> + %% TODO test this branch ;) + %% TODO don't just reset every time we start up! + mnesia:stop(), + reset(mnesia), + mnesia:start(), + %% TODO what should we ensure? + %% ensure_version_ok(rabbit_upgrade:read_version()), + %% ensure_version_ok( + %% rpc:call(AnotherNode, rabbit_upgrade, read_version, [])), + %% TODO needed? + ok = wait_for_replicated_tables(), + ok = create_local_table_copy(schema, disc_copies), + ok = create_local_table_copies(disc); + false -> + ok = wait_for_replicated_tables(), + %% TODO can we live without this on disc? + ok = create_local_table_copy(schema, disc_copies), + ok = create_local_table_copies(ram), + case rabbit_upgrade:maybe_upgrade([local]) of + ok -> + ok; + %% If we're just starting up a new node + %% we won't have a version + version_not_available -> + ok = rabbit_upgrade:write_version() + end + end, ensure_schema_ok() end; {error, Reason} -> @@ -563,12 +590,15 @@ wait_for_tables(TableNames) -> throw({error, {failed_waiting_for_tables, Reason}}) end. -reset(Force) -> +%% Mode: force_all - get rid of everything unconditionally +%% all - get rid of everything, conditional on Mnesia working +%% mnesia - just get rid of Mnesia, leave everything else +reset(Mode) -> ok = ensure_mnesia_not_running(), Node = node(), - case Force of - true -> ok; - false -> + case Mode of + force_all -> ok; + _ -> ok = ensure_mnesia_dir(), rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia), {Nodes, RunningNodes} = @@ -583,9 +613,14 @@ reset(Force) -> rabbit_misc:ensure_ok(mnesia:delete_schema([Node]), cannot_delete_schema) end, - ok = delete_cluster_nodes_config(), - %% remove persisted messages and any other garbage we find - ok = rabbit_misc:recursive_delete(filelib:wildcard(dir() ++ "/*")), + case Mode of + mnesia -> + ok; + _ -> + ok = delete_cluster_nodes_config(), + %% remove persisted messages and any other garbage we find + ok = rabbit_misc:recursive_delete(filelib:wildcard(dir() ++ "/*")) + end, ok. leave_cluster([], _) -> ok; diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 76c0a4ef..6adcd8b0 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -182,7 +182,7 @@ %%---------------------------------------------------------------------------- --rabbit_upgrade({add_queue_ttl, []}). +-rabbit_upgrade({add_queue_ttl, local, []}). -ifdef(use_specs). diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl index 97a07514..dee08f48 100644 --- a/src/rabbit_upgrade.erl +++ b/src/rabbit_upgrade.erl @@ -21,7 +21,7 @@ -module(rabbit_upgrade). --export([maybe_upgrade/0, read_version/0, write_version/0, desired_version/0]). +-export([maybe_upgrade/1, read_version/0, write_version/0, desired_version/0]). -include("rabbit.hrl"). @@ -33,9 +33,10 @@ -ifdef(use_specs). -type(step() :: atom()). +-type(scope() :: 'mnesia' | 'local'). -type(version() :: [step()]). --spec(maybe_upgrade/0 :: () -> 'ok' | 'version_not_available'). +-spec(maybe_upgrade/1 :: ([scope()]) -> 'ok' | 'version_not_available'). -spec(read_version/0 :: () -> rabbit_types:ok_or_error2(version(), any())). -spec(write_version/0 :: () -> 'ok'). -spec(desired_version/0 :: () -> version()). @@ -47,24 +48,28 @@ %% Try to upgrade the schema. If no information on the existing schema %% could be found, do nothing. rabbit_mnesia:check_schema_integrity() %% will catch the problem. -maybe_upgrade() -> +maybe_upgrade(Scopes) -> case read_version() of {ok, CurrentHeads} -> with_upgrade_graph( - fun (G) -> - case unknown_heads(CurrentHeads, G) of - [] -> case upgrades_to_apply(CurrentHeads, G) of - [] -> ok; - Upgrades -> apply_upgrades(Upgrades) - end; - Unknown -> throw({error, - {future_upgrades_found, Unknown}}) - end - end); + fun (G) -> maybe_upgrade_graph(CurrentHeads, Scopes, G) end); {error, enoent} -> version_not_available end. +maybe_upgrade_graph(CurrentHeads, Scopes, G) -> + case unknown_heads(CurrentHeads, G) of + [] -> + case upgrades_to_apply(CurrentHeads, Scopes, G) of + [] -> + ok; + Upgrades -> + apply_upgrades(Upgrades, lists:member(mnesia, Scopes)) + end; + Unknown -> + throw({error, {future_upgrades_found, Unknown}}) + end. + read_version() -> case rabbit_misc:read_term_file(schema_filename()) of {ok, [Heads]} -> {ok, Heads}; @@ -98,16 +103,17 @@ with_upgrade_graph(Fun) -> end. vertices(Module, Steps) -> - [{StepName, {Module, StepName}} || {StepName, _Reqs} <- Steps]. + [{StepName, {Scope, {Module, StepName}}} || + {StepName, Scope, _Reqs} <- Steps]. edges(_Module, Steps) -> - [{Require, StepName} || {StepName, Requires} <- Steps, Require <- Requires]. - + [{Require, StepName} || {StepName, _Scope, Requires} <- Steps, + Require <- Requires]. unknown_heads(Heads, G) -> [H || H <- Heads, digraph:vertex(G, H) =:= false]. -upgrades_to_apply(Heads, G) -> +upgrades_to_apply(Heads, Scopes, G) -> %% Take all the vertices which can reach the known heads. That's %% everything we've already applied. Subtract that from all %% vertices: that's what we have to apply. @@ -117,15 +123,17 @@ upgrades_to_apply(Heads, G) -> sets:from_list(digraph_utils:reaching(Heads, G)))), %% Form a subgraph from that list and find a topological ordering %% so we can invoke them in order. - [element(2, digraph:vertex(G, StepName)) || - StepName <- digraph_utils:topsort(digraph_utils:subgraph(G, Unsorted))]. + Sorted = [element(2, digraph:vertex(G, StepName)) || + StepName <- digraph_utils:topsort(digraph_utils:subgraph(G, Unsorted))], + %% Only return the upgrades for the appropriate scopes + [Upgrade || {Scope, Upgrade} <- Sorted, lists:member(Scope, Scopes)]. heads(G) -> lists:sort([V || V <- digraph:vertices(G), digraph:out_degree(G, V) =:= 0]). %% ------------------------------------------------------------------- -apply_upgrades(Upgrades) -> +apply_upgrades(Upgrades, ForgetOthers) -> LockFile = lock_filename(dir()), case rabbit_misc:lock_file(LockFile) of ok -> @@ -140,6 +148,10 @@ apply_upgrades(Upgrades) -> %% is not intuitive. Remove it. ok = file:delete(lock_filename(BackupDir)), info("Upgrades: Mnesia dir backed up to ~p~n", [BackupDir]), + case ForgetOthers of + true -> rabbit_mnesia:forget_other_nodes(); + _ -> ok + end, [apply_upgrade(Upgrade) || Upgrade <- Upgrades], info("Upgrades: All upgrades applied successfully~n", []), ok = write_version(), diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl index 7848c848..43e468ff 100644 --- a/src/rabbit_upgrade_functions.erl +++ b/src/rabbit_upgrade_functions.erl @@ -24,10 +24,14 @@ -compile([export_all]). --rabbit_upgrade({remove_user_scope, []}). --rabbit_upgrade({hash_passwords, []}). --rabbit_upgrade({add_ip_to_listener, []}). --rabbit_upgrade({internal_exchanges, []}). +-rabbit_upgrade({remove_user_scope, mnesia, []}). +-rabbit_upgrade({hash_passwords, mnesia, []}). +-rabbit_upgrade({add_ip_to_listener, mnesia, []}). +-rabbit_upgrade({internal_exchanges, mnesia, []}). + +-rabbit_upgrade({one, mnesia, []}). +-rabbit_upgrade({two, local, [one]}). +-rabbit_upgrade({three, mnesia, [two]}). %% ------------------------------------------------------------------- @@ -85,6 +89,27 @@ internal_exchanges() -> || T <- Tables ], ok. +one() -> + mnesia( + rabbit_user, + fun ({user, Username, Hash, IsAdmin}) -> + {user, Username, Hash, IsAdmin, foo} + end, + [username, password_hash, is_admin, extra]). + +two() -> + ok = rabbit_misc:write_term_file(filename:join(rabbit_mnesia:dir(), "test"), + [test]). + +three() -> + mnesia( + rabbit_user, + fun ({user, Username, Hash, IsAdmin, _}) -> + {user, Username, Hash, IsAdmin} + end, + [username, password_hash, is_admin]). + + %%-------------------------------------------------------------------- mnesia(TableName, Fun, FieldList) -> -- cgit v1.2.1 From 0b093ecb559424e2b2c7809cba5dc2cbdfab710c Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Tue, 4 Jan 2011 14:18:51 +0000 Subject: These two cases are the same. --- src/rabbit_mnesia.erl | 19 ++++++------------- 1 file changed, 6 insertions(+), 13 deletions(-) diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index 2550bdd4..f1e007a1 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -386,26 +386,19 @@ init_db(ClusterNodes, Force) -> end; true -> ok end, - case {Nodes, mnesia:system_info(use_dir), all_clustered_nodes()} of - {[], true, [_]} -> - %% True single disc node, attempt upgrade + case {Nodes, mnesia:system_info(use_dir)} of + {[], true} -> + %% True single disc node, or master" (i.e. without + %% config) disc node in cluster, attempt upgrade ok = wait_for_tables(), case rabbit_upgrade:maybe_upgrade([mnesia, local]) of ok -> ensure_schema_ok(); version_not_available -> schema_ok_or_move() end; - {[], true, _} -> - %% "Master" (i.e. without config) disc node in cluster, - %% do upgrade - ok = wait_for_tables(), - case rabbit_upgrade:maybe_upgrade([mnesia, local]) of - ok -> ensure_schema_ok(); - version_not_available -> schema_ok_or_move() - end; - {[], false, _} -> + {[], false} -> %% Nothing there at all, start from scratch ok = create_schema(); - {[AnotherNode|_], _, _} -> + {[AnotherNode|_], _} -> %% Subsequent node in cluster, catch up IsDiskNode = ClusterNodes == [] orelse lists:member(node(), ClusterNodes), -- cgit v1.2.1 From 50a9fc4fb471d68225090f0b0fe39ead5110012b Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Tue, 4 Jan 2011 16:09:18 +0000 Subject: Make disc node reclustering work, various cleanups. --- src/rabbit_mnesia.erl | 94 +++++++++++++++++++++++--------------------------- src/rabbit_upgrade.erl | 24 ++++++------- 2 files changed, 53 insertions(+), 65 deletions(-) diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index f1e007a1..e5929f86 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -35,7 +35,7 @@ -export([ensure_mnesia_dir/0, dir/0, status/0, init/0, is_db_empty/0, cluster/1, force_cluster/1, reset/0, force_reset/0, is_clustered/0, running_clustered_nodes/0, all_clustered_nodes/0, - forget_other_nodes/0, empty_ram_only_tables/0, copy_db/1]). + empty_ram_only_tables/0, copy_db/1]). -export([table_names/0]). @@ -66,7 +66,6 @@ -spec(is_clustered/0 :: () -> boolean()). -spec(running_clustered_nodes/0 :: () -> [node()]). -spec(all_clustered_nodes/0 :: () -> [node()]). --spec(forget_other_nodes/0 :: () -> 'ok'). -spec(empty_ram_only_tables/0 :: () -> 'ok'). -spec(create_tables/0 :: () -> 'ok'). -spec(copy_db/1 :: (file:filename()) -> rabbit_types:ok_or_error(any())). @@ -127,8 +126,8 @@ cluster(ClusterNodes, Force) -> %% return node to its virgin state, where it is not member of any %% cluster, has no cluster configuration, no local database, and no %% persisted messages -reset() -> reset(all). -force_reset() -> reset(force_all). +reset() -> reset(false). +force_reset() -> reset(true). is_clustered() -> RunningNodes = running_clustered_nodes(), @@ -388,10 +387,11 @@ init_db(ClusterNodes, Force) -> end, case {Nodes, mnesia:system_info(use_dir)} of {[], true} -> - %% True single disc node, or master" (i.e. without + %% True single disc node, or "master" (i.e. without %% config) disc node in cluster, attempt upgrade ok = wait_for_tables(), - case rabbit_upgrade:maybe_upgrade([mnesia, local]) of + case rabbit_upgrade:maybe_upgrade( + [mnesia, local], fun forget_other_nodes/0) of ok -> ensure_schema_ok(); version_not_available -> schema_ok_or_move() end; @@ -400,37 +400,27 @@ init_db(ClusterNodes, Force) -> ok = create_schema(); {[AnotherNode|_], _} -> %% Subsequent node in cluster, catch up - IsDiskNode = ClusterNodes == [] orelse - lists:member(node(), ClusterNodes), - case IsDiskNode of - true -> - %% TODO test this branch ;) - %% TODO don't just reset every time we start up! - mnesia:stop(), - reset(mnesia), - mnesia:start(), - %% TODO what should we ensure? - %% ensure_version_ok(rabbit_upgrade:read_version()), - %% ensure_version_ok( - %% rpc:call(AnotherNode, rabbit_upgrade, read_version, [])), - %% TODO needed? - ok = wait_for_replicated_tables(), - ok = create_local_table_copy(schema, disc_copies), - ok = create_local_table_copies(disc); - false -> - ok = wait_for_replicated_tables(), - %% TODO can we live without this on disc? - ok = create_local_table_copy(schema, disc_copies), - ok = create_local_table_copies(ram), - case rabbit_upgrade:maybe_upgrade([local]) of - ok -> - ok; - %% If we're just starting up a new node - %% we won't have a version - version_not_available -> - ok = rabbit_upgrade:write_version() - end + %% TODO what should we ensure? + %% ensure_version_ok(rabbit_upgrade:read_version()), + %% ensure_version_ok( + %% rpc:call(AnotherNode, rabbit_upgrade, read_version, [])), + Type = case ClusterNodes == [] orelse + lists:member(node(), ClusterNodes) of + true -> disc; + false -> ram + end, + case rabbit_upgrade:maybe_upgrade( + [local], reset_fun(ProperClusterNodes)) of + ok -> + ok; + %% If we're just starting up a new node + %% we won't have a version + version_not_available -> + ok = rabbit_upgrade:write_version() end, + ok = wait_for_replicated_tables(), + ok = create_local_table_copy(schema, disc_copies), + ok = create_local_table_copies(Type), ensure_schema_ok() end; {error, Reason} -> @@ -470,6 +460,16 @@ ensure_schema_ok() -> {error, Reason} -> throw({error, {schema_invalid, Reason}}) end. +reset_fun(ProperClusterNodes) -> + fun() -> + mnesia:stop(), + rabbit_misc:ensure_ok(mnesia:delete_schema([node()]), + cannot_delete_schema), + rabbit_misc:ensure_ok(mnesia:start(), + cannot_start_mnesia), + {ok, _} = mnesia:change_config(extra_db_nodes, ProperClusterNodes) + end. + create_schema() -> mnesia:stop(), rabbit_misc:ensure_ok(mnesia:create_schema([node()]), @@ -583,15 +583,12 @@ wait_for_tables(TableNames) -> throw({error, {failed_waiting_for_tables, Reason}}) end. -%% Mode: force_all - get rid of everything unconditionally -%% all - get rid of everything, conditional on Mnesia working -%% mnesia - just get rid of Mnesia, leave everything else -reset(Mode) -> +reset(Force) -> ok = ensure_mnesia_not_running(), Node = node(), - case Mode of - force_all -> ok; - _ -> + case Force of + true -> ok; + false -> ok = ensure_mnesia_dir(), rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia), {Nodes, RunningNodes} = @@ -606,14 +603,9 @@ reset(Mode) -> rabbit_misc:ensure_ok(mnesia:delete_schema([Node]), cannot_delete_schema) end, - case Mode of - mnesia -> - ok; - _ -> - ok = delete_cluster_nodes_config(), - %% remove persisted messages and any other garbage we find - ok = rabbit_misc:recursive_delete(filelib:wildcard(dir() ++ "/*")) - end, + ok = delete_cluster_nodes_config(), + %% remove persisted messages and any other garbage we find + ok = rabbit_misc:recursive_delete(filelib:wildcard(dir() ++ "/*")), ok. leave_cluster([], _) -> ok; diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl index dee08f48..7e59faaf 100644 --- a/src/rabbit_upgrade.erl +++ b/src/rabbit_upgrade.erl @@ -21,7 +21,7 @@ -module(rabbit_upgrade). --export([maybe_upgrade/1, read_version/0, write_version/0, desired_version/0]). +-export([maybe_upgrade/2, read_version/0, write_version/0, desired_version/0]). -include("rabbit.hrl"). @@ -36,7 +36,8 @@ -type(scope() :: 'mnesia' | 'local'). -type(version() :: [step()]). --spec(maybe_upgrade/1 :: ([scope()]) -> 'ok' | 'version_not_available'). +%% TODO update +%%-spec(maybe_upgrade/1 :: ([scope()]) -> 'ok' | 'version_not_available'). -spec(read_version/0 :: () -> rabbit_types:ok_or_error2(version(), any())). -spec(write_version/0 :: () -> 'ok'). -spec(desired_version/0 :: () -> version()). @@ -48,23 +49,21 @@ %% Try to upgrade the schema. If no information on the existing schema %% could be found, do nothing. rabbit_mnesia:check_schema_integrity() %% will catch the problem. -maybe_upgrade(Scopes) -> +maybe_upgrade(Scopes, Fun) -> case read_version() of {ok, CurrentHeads} -> with_upgrade_graph( - fun (G) -> maybe_upgrade_graph(CurrentHeads, Scopes, G) end); + fun (G) -> maybe_upgrade_graph(CurrentHeads, Scopes, Fun, G) end); {error, enoent} -> version_not_available end. -maybe_upgrade_graph(CurrentHeads, Scopes, G) -> +maybe_upgrade_graph(CurrentHeads, Scopes, Fun, G) -> case unknown_heads(CurrentHeads, G) of [] -> case upgrades_to_apply(CurrentHeads, Scopes, G) of - [] -> - ok; - Upgrades -> - apply_upgrades(Upgrades, lists:member(mnesia, Scopes)) + [] -> ok; + Upgrades -> apply_upgrades(Upgrades, Fun) end; Unknown -> throw({error, {future_upgrades_found, Unknown}}) @@ -133,7 +132,7 @@ heads(G) -> %% ------------------------------------------------------------------- -apply_upgrades(Upgrades, ForgetOthers) -> +apply_upgrades(Upgrades, Fun) -> LockFile = lock_filename(dir()), case rabbit_misc:lock_file(LockFile) of ok -> @@ -148,10 +147,7 @@ apply_upgrades(Upgrades, ForgetOthers) -> %% is not intuitive. Remove it. ok = file:delete(lock_filename(BackupDir)), info("Upgrades: Mnesia dir backed up to ~p~n", [BackupDir]), - case ForgetOthers of - true -> rabbit_mnesia:forget_other_nodes(); - _ -> ok - end, + Fun(), [apply_upgrade(Upgrade) || Upgrade <- Upgrades], info("Upgrades: All upgrades applied successfully~n", []), ok = write_version(), -- cgit v1.2.1 From 34ca7a82d250748ea59d92aa499cb562c8332ae4 Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Tue, 4 Jan 2011 16:12:07 +0000 Subject: Revert arbitrary difference from default. --- src/rabbit_mnesia.erl | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index e5929f86..d8086b56 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -404,11 +404,8 @@ init_db(ClusterNodes, Force) -> %% ensure_version_ok(rabbit_upgrade:read_version()), %% ensure_version_ok( %% rpc:call(AnotherNode, rabbit_upgrade, read_version, [])), - Type = case ClusterNodes == [] orelse - lists:member(node(), ClusterNodes) of - true -> disc; - false -> ram - end, + IsDiskNode = ClusterNodes == [] orelse + lists:member(node(), ClusterNodes), case rabbit_upgrade:maybe_upgrade( [local], reset_fun(ProperClusterNodes)) of ok -> @@ -420,7 +417,10 @@ init_db(ClusterNodes, Force) -> end, ok = wait_for_replicated_tables(), ok = create_local_table_copy(schema, disc_copies), - ok = create_local_table_copies(Type), + ok = create_local_table_copies(case IsDiskNode of + true -> disc; + false -> ram + end), ensure_schema_ok() end; {error, Reason} -> -- cgit v1.2.1 From 8aaef521a855a3df1223e3b1abeafe204b1e58b6 Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Tue, 4 Jan 2011 16:24:59 +0000 Subject: Fix spec --- src/rabbit_mnesia.erl | 6 ++++-- src/rabbit_upgrade.erl | 6 +++--- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index d8086b56..11e9a178 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -141,7 +141,8 @@ running_clustered_nodes() -> forget_other_nodes() -> Nodes = all_clustered_nodes() -- [node()], - [{atomic, ok} = mnesia:del_table_copy(schema, Node) || Node <- Nodes]. + [{atomic, ok} = mnesia:del_table_copy(schema, Node) || Node <- Nodes], + ok. empty_ram_only_tables() -> Node = node(), @@ -467,7 +468,8 @@ reset_fun(ProperClusterNodes) -> cannot_delete_schema), rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia), - {ok, _} = mnesia:change_config(extra_db_nodes, ProperClusterNodes) + {ok, _} = mnesia:change_config(extra_db_nodes, ProperClusterNodes), + ok end. create_schema() -> diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl index 7e59faaf..48c00d69 100644 --- a/src/rabbit_upgrade.erl +++ b/src/rabbit_upgrade.erl @@ -36,8 +36,8 @@ -type(scope() :: 'mnesia' | 'local'). -type(version() :: [step()]). -%% TODO update -%%-spec(maybe_upgrade/1 :: ([scope()]) -> 'ok' | 'version_not_available'). +-spec(maybe_upgrade/2 :: ([scope()], fun (() -> 'ok')) + -> 'ok' | 'version_not_available'). -spec(read_version/0 :: () -> rabbit_types:ok_or_error2(version(), any())). -spec(write_version/0 :: () -> 'ok'). -spec(desired_version/0 :: () -> version()). @@ -147,7 +147,7 @@ apply_upgrades(Upgrades, Fun) -> %% is not intuitive. Remove it. ok = file:delete(lock_filename(BackupDir)), info("Upgrades: Mnesia dir backed up to ~p~n", [BackupDir]), - Fun(), + ok = Fun(), [apply_upgrade(Upgrade) || Upgrade <- Upgrades], info("Upgrades: All upgrades applied successfully~n", []), ok = write_version(), -- cgit v1.2.1 From 00dd61ca4b2372d698225ea3e58a932bdd1baffc Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Tue, 4 Jan 2011 16:34:23 +0000 Subject: Check our version matches the remote version. --- src/rabbit_mnesia.erl | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index 11e9a178..82e2a30e 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -401,10 +401,8 @@ init_db(ClusterNodes, Force) -> ok = create_schema(); {[AnotherNode|_], _} -> %% Subsequent node in cluster, catch up - %% TODO what should we ensure? - %% ensure_version_ok(rabbit_upgrade:read_version()), - %% ensure_version_ok( - %% rpc:call(AnotherNode, rabbit_upgrade, read_version, [])), + ensure_version_ok( + rpc:call(AnotherNode, rabbit_upgrade, read_version, [])), IsDiskNode = ClusterNodes == [] orelse lists:member(node(), ClusterNodes), case rabbit_upgrade:maybe_upgrade( -- cgit v1.2.1 From cec5a2c8548dcc6c7a7ad44c7b72361adca1fccb Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Thu, 6 Jan 2011 17:50:21 +0000 Subject: Decide the node to do mnesia upgrades based on which was the last disc node to shut down. Blow up with a hopefully helpful error message if the "wrong" disc node is started first. This works; you can now upgrade a disc-only cluster. --- src/rabbit_mnesia.erl | 115 ++++++++++++++++++++++++++++++++++--------------- src/rabbit_upgrade.erl | 18 ++++---- 2 files changed, 91 insertions(+), 42 deletions(-) diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index 82e2a30e..49d04116 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -43,6 +43,8 @@ %% other mnesia-using Erlang applications, such as ejabberd -export([create_tables/0]). +-define(EXAMPLE_RABBIT_TABLE, rabbit_durable_exchange). + -include("rabbit.hrl"). %%---------------------------------------------------------------------------- @@ -164,7 +166,7 @@ nodes_of_type(Type) -> %% Specifically, we check whether a certain table, which we know %% will be written to disk on a disc node, is stored on disk or in %% RAM. - mnesia:table_info(rabbit_durable_exchange, Type). + mnesia:table_info(?EXAMPLE_RABBIT_TABLE, Type). table_definitions() -> [{rabbit_user, @@ -387,40 +389,50 @@ init_db(ClusterNodes, Force) -> true -> ok end, case {Nodes, mnesia:system_info(use_dir)} of - {[], true} -> - %% True single disc node, or "master" (i.e. without - %% config) disc node in cluster, attempt upgrade - ok = wait_for_tables(), - case rabbit_upgrade:maybe_upgrade( - [mnesia, local], fun forget_other_nodes/0) of - ok -> ensure_schema_ok(); - version_not_available -> schema_ok_or_move() - end; {[], false} -> %% Nothing there at all, start from scratch ok = create_schema(); - {[AnotherNode|_], _} -> - %% Subsequent node in cluster, catch up - ensure_version_ok( - rpc:call(AnotherNode, rabbit_upgrade, read_version, [])), - IsDiskNode = ClusterNodes == [] orelse - lists:member(node(), ClusterNodes), - case rabbit_upgrade:maybe_upgrade( - [local], reset_fun(ProperClusterNodes)) of - ok -> - ok; - %% If we're just starting up a new node - %% we won't have a version - version_not_available -> - ok = rabbit_upgrade:write_version() - end, - ok = wait_for_replicated_tables(), - ok = create_local_table_copy(schema, disc_copies), - ok = create_local_table_copies(case IsDiskNode of - true -> disc; - false -> ram - end), - ensure_schema_ok() + {_, _} -> + DiscNodes = mnesia:table_info(schema, disc_copies), + case are_we_upgrader(DiscNodes) of + true -> + %% True single disc node, or last disc + %% node in cluster to shut down, attempt + %% upgrade + ok = wait_for_tables(), + case rabbit_upgrade:maybe_upgrade( + [mnesia, local], + fun () -> ok end, + fun forget_other_nodes/0) of + ok -> ensure_schema_ok(); + version_not_available -> schema_ok_or_move() + end; + false -> + %% Subsequent node in cluster, catch up + %% TODO how to do this? + %% ensure_version_ok( + %% rpc:call(AnotherNode, rabbit_upgrade, read_version, [])), + IsDiskNode = ClusterNodes == [] orelse + lists:member(node(), ClusterNodes), + case rabbit_upgrade:maybe_upgrade( + [local], + ensure_nodes_running_fun(DiscNodes), + reset_fun(DiscNodes -- [node()])) of + ok -> + ok; + %% If we're just starting up a new node + %% we won't have a version + version_not_available -> + ok = rabbit_upgrade:write_version() + end, + ok = wait_for_replicated_tables(), + ok = create_local_table_copy(schema, disc_copies), + ok = create_local_table_copies(case IsDiskNode of + true -> disc; + false -> ram + end), + ensure_schema_ok() + end end; {error, Reason} -> %% one reason we may end up here is if we try to join @@ -459,17 +471,52 @@ ensure_schema_ok() -> {error, Reason} -> throw({error, {schema_invalid, Reason}}) end. -reset_fun(ProperClusterNodes) -> +ensure_nodes_running_fun(Nodes) -> + fun() -> + case nodes_running(Nodes) of + [] -> + exit("Cluster upgrade needed. The first node you start " + "should be the last node to be shut down."); + _ -> + ok + end + end. + +reset_fun(Nodes) -> fun() -> mnesia:stop(), rabbit_misc:ensure_ok(mnesia:delete_schema([node()]), cannot_delete_schema), rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia), - {ok, _} = mnesia:change_config(extra_db_nodes, ProperClusterNodes), + {ok, _} = mnesia:change_config(extra_db_nodes, Nodes), ok end. +%% Were we the last node in the cluster to shut down or is there no cluster? +%% The answer to this is yes if: +%% * We are our canonical source for reading a table +%% - If the canonical source is "nowhere" or another node, we are out of date +%% * No other nodes are running Mnesia and have finished booting Rabbit. +%% - Since any node will be its own canonical source once the cluster is up. + +are_we_upgrader(Nodes) -> + Where = mnesia:table_info(?EXAMPLE_RABBIT_TABLE, where_to_read), + Node = node(), + case {Where, nodes_running(Nodes)} of + {Node, []} -> true; + {_, _} -> false + end. + +nodes_running(Nodes) -> + [N || N <- Nodes, node_running(N)]. + +node_running(Node) -> + case rpc:call(Node, application, which_applications, []) of + {badrpc, _} -> false; + Apps -> lists:keysearch(rabbit, 1, Apps) =/= false + end. + create_schema() -> mnesia:stop(), rabbit_misc:ensure_ok(mnesia:create_schema([node()]), diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl index 48c00d69..c852a0f9 100644 --- a/src/rabbit_upgrade.erl +++ b/src/rabbit_upgrade.erl @@ -21,7 +21,7 @@ -module(rabbit_upgrade). --export([maybe_upgrade/2, read_version/0, write_version/0, desired_version/0]). +-export([maybe_upgrade/3, read_version/0, write_version/0, desired_version/0]). -include("rabbit.hrl"). @@ -36,7 +36,7 @@ -type(scope() :: 'mnesia' | 'local'). -type(version() :: [step()]). --spec(maybe_upgrade/2 :: ([scope()], fun (() -> 'ok')) +-spec(maybe_upgrade/3 :: ([scope()], fun (() -> 'ok'), fun (() -> 'ok')) -> 'ok' | 'version_not_available'). -spec(read_version/0 :: () -> rabbit_types:ok_or_error2(version(), any())). -spec(write_version/0 :: () -> 'ok'). @@ -49,21 +49,22 @@ %% Try to upgrade the schema. If no information on the existing schema %% could be found, do nothing. rabbit_mnesia:check_schema_integrity() %% will catch the problem. -maybe_upgrade(Scopes, Fun) -> +maybe_upgrade(Scopes, GuardFun, UpgradeFun) -> case read_version() of {ok, CurrentHeads} -> with_upgrade_graph( - fun (G) -> maybe_upgrade_graph(CurrentHeads, Scopes, Fun, G) end); + fun (G) -> maybe_upgrade_graph(CurrentHeads, Scopes, + GuardFun, UpgradeFun, G) end); {error, enoent} -> version_not_available end. -maybe_upgrade_graph(CurrentHeads, Scopes, Fun, G) -> +maybe_upgrade_graph(CurrentHeads, Scopes, GuardFun, UpgradeFun, G) -> case unknown_heads(CurrentHeads, G) of [] -> case upgrades_to_apply(CurrentHeads, Scopes, G) of [] -> ok; - Upgrades -> apply_upgrades(Upgrades, Fun) + Upgrades -> apply_upgrades(Upgrades, GuardFun, UpgradeFun) end; Unknown -> throw({error, {future_upgrades_found, Unknown}}) @@ -132,7 +133,8 @@ heads(G) -> %% ------------------------------------------------------------------- -apply_upgrades(Upgrades, Fun) -> +apply_upgrades(Upgrades, GuardFun, UpgradeFun) -> + GuardFun(), LockFile = lock_filename(dir()), case rabbit_misc:lock_file(LockFile) of ok -> @@ -147,7 +149,7 @@ apply_upgrades(Upgrades, Fun) -> %% is not intuitive. Remove it. ok = file:delete(lock_filename(BackupDir)), info("Upgrades: Mnesia dir backed up to ~p~n", [BackupDir]), - ok = Fun(), + ok = UpgradeFun(), [apply_upgrade(Upgrade) || Upgrade <- Upgrades], info("Upgrades: All upgrades applied successfully~n", []), ok = write_version(), -- cgit v1.2.1 From 9ab02c62b7edda1a097912b1f0194788df15f2ff Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Fri, 7 Jan 2011 11:54:35 +0000 Subject: Ironically our dummy upgrades now need to be upgraded. --- src/rabbit_upgrade_functions.erl | 27 ++++++++++++++------------- 1 file changed, 14 insertions(+), 13 deletions(-) diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl index 8fee70af..1806c40f 100644 --- a/src/rabbit_upgrade_functions.erl +++ b/src/rabbit_upgrade_functions.erl @@ -91,11 +91,21 @@ internal_exchanges() -> || T <- Tables ], ok. +user_to_internal_user() -> + mnesia( + rabbit_user, + fun({user, Username, PasswordHash, IsAdmin}) -> + {internal_user, Username, PasswordHash, IsAdmin} + end, + [username, password_hash, is_admin], internal_user). + + + one() -> mnesia( rabbit_user, - fun ({user, Username, Hash, IsAdmin}) -> - {user, Username, Hash, IsAdmin, foo} + fun ({internal_user, Username, Hash, IsAdmin}) -> + {internal_user, Username, Hash, IsAdmin, foo} end, [username, password_hash, is_admin, extra]). @@ -106,20 +116,11 @@ two() -> three() -> mnesia( rabbit_user, - fun ({user, Username, Hash, IsAdmin, _}) -> - {user, Username, Hash, IsAdmin} + fun ({internal_user, Username, Hash, IsAdmin, _}) -> + {internal_user, Username, Hash, IsAdmin} end, [username, password_hash, is_admin]). - -user_to_internal_user() -> - mnesia( - rabbit_user, - fun({user, Username, PasswordHash, IsAdmin}) -> - {internal_user, Username, PasswordHash, IsAdmin} - end, - [username, password_hash, is_admin], internal_user). - %%-------------------------------------------------------------------- mnesia(TableName, Fun, FieldList) -> -- cgit v1.2.1 From d235fbe0db6c709860e8fa19d7917484ca902d2e Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Fri, 7 Jan 2011 12:07:12 +0000 Subject: Refactor a bit, reinstate ensure_version_ok check. --- src/rabbit_mnesia.erl | 86 +++++++++++++++++++++++++++------------------------ 1 file changed, 45 insertions(+), 41 deletions(-) diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index c536c64f..9ea1be28 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -393,46 +393,7 @@ init_db(ClusterNodes, Force) -> %% Nothing there at all, start from scratch ok = create_schema(); {_, _} -> - DiscNodes = mnesia:table_info(schema, disc_copies), - case are_we_upgrader(DiscNodes) of - true -> - %% True single disc node, or last disc - %% node in cluster to shut down, attempt - %% upgrade - ok = wait_for_tables(), - case rabbit_upgrade:maybe_upgrade( - [mnesia, local], - fun () -> ok end, - fun forget_other_nodes/0) of - ok -> ensure_schema_ok(); - version_not_available -> schema_ok_or_move() - end; - false -> - %% Subsequent node in cluster, catch up - %% TODO how to do this? - %% ensure_version_ok( - %% rpc:call(AnotherNode, rabbit_upgrade, read_version, [])), - IsDiskNode = ClusterNodes == [] orelse - lists:member(node(), ClusterNodes), - case rabbit_upgrade:maybe_upgrade( - [local], - ensure_nodes_running_fun(DiscNodes), - reset_fun(DiscNodes -- [node()])) of - ok -> - ok; - %% If we're just starting up a new node - %% we won't have a version - version_not_available -> - ok = rabbit_upgrade:write_version() - end, - ok = wait_for_replicated_tables(), - ok = create_local_table_copy(schema, disc_copies), - ok = create_local_table_copies(case IsDiskNode of - true -> disc; - false -> ram - end), - ensure_schema_ok() - end + ok = setup_existing_node(ClusterNodes, Nodes) end; {error, Reason} -> %% one reason we may end up here is if we try to join @@ -441,6 +402,49 @@ init_db(ClusterNodes, Force) -> throw({error, {unable_to_join_cluster, ClusterNodes, Reason}}) end. +setup_existing_node(ClusterNodes, Nodes) -> + DiscNodes = mnesia:table_info(schema, disc_copies), + case are_we_upgrader(DiscNodes) of + true -> + %% True single disc node, or last disc node in cluster to + %% shut down, attempt upgrade + ok = wait_for_tables(), + case rabbit_upgrade:maybe_upgrade( + [mnesia, local], fun () -> ok end, + fun forget_other_nodes/0) of + ok -> ensure_schema_ok(); + version_not_available -> schema_ok_or_move() + end; + false -> + %% Subsequent node in cluster, catch up + case Nodes of + [AnotherNode|_] -> + ensure_version_ok( + rpc:call(AnotherNode, rabbit_upgrade, read_version, [])); + [] -> + ok + end, + IsDiskNode = ClusterNodes == [] orelse + lists:member(node(), ClusterNodes), + case rabbit_upgrade:maybe_upgrade( + [local], ensure_nodes_running_fun(DiscNodes), + reset_fun(DiscNodes -- [node()])) of + ok -> + ok; + %% If we're just starting up a new node we won't have + %% a version + version_not_available -> + ok = rabbit_upgrade:write_version() + end, + ok = wait_for_replicated_tables(), + ok = create_local_table_copy(schema, disc_copies), + ok = create_local_table_copies(case IsDiskNode of + true -> disc; + false -> ram + end), + ensure_schema_ok() + end. + schema_ok_or_move() -> case check_schema_integrity() of ok -> @@ -476,7 +480,7 @@ ensure_nodes_running_fun(Nodes) -> case nodes_running(Nodes) of [] -> exit("Cluster upgrade needed. The first node you start " - "should be the last node to be shut down."); + "should be the last disc node to be shut down."); _ -> ok end -- cgit v1.2.1 From d1e659c8536e4bdd855d881eb2b1b6ea7def180a Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Fri, 7 Jan 2011 13:23:21 +0000 Subject: Cosmetic --- src/rabbit_mnesia.erl | 27 ++++++++++++++++----------- src/rabbit_upgrade.erl | 2 +- 2 files changed, 17 insertions(+), 12 deletions(-) diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index 9ea1be28..ca84b29e 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -407,11 +407,10 @@ setup_existing_node(ClusterNodes, Nodes) -> case are_we_upgrader(DiscNodes) of true -> %% True single disc node, or last disc node in cluster to - %% shut down, attempt upgrade + %% shut down, attempt upgrade if necessary ok = wait_for_tables(), - case rabbit_upgrade:maybe_upgrade( - [mnesia, local], fun () -> ok end, - fun forget_other_nodes/0) of + case rabbit_upgrade:maybe_upgrade([mnesia, local], fun () -> ok end, + fun forget_other_nodes/0) of ok -> ensure_schema_ok(); version_not_available -> schema_ok_or_move() end; @@ -427,7 +426,8 @@ setup_existing_node(ClusterNodes, Nodes) -> IsDiskNode = ClusterNodes == [] orelse lists:member(node(), ClusterNodes), case rabbit_upgrade:maybe_upgrade( - [local], ensure_nodes_running_fun(DiscNodes), + [local], + ensure_nodes_running_fun(DiscNodes), reset_fun(DiscNodes -- [node()])) of ok -> ok; @@ -475,9 +475,9 @@ ensure_schema_ok() -> {error, Reason} -> throw({error, {schema_invalid, Reason}}) end. -ensure_nodes_running_fun(Nodes) -> +ensure_nodes_running_fun(DiscNodes) -> fun() -> - case nodes_running(Nodes) of + case nodes_running(DiscNodes) of [] -> exit("Cluster upgrade needed. The first node you start " "should be the last disc node to be shut down."); @@ -486,23 +486,28 @@ ensure_nodes_running_fun(Nodes) -> end end. -reset_fun(Nodes) -> +reset_fun(OtherNodes) -> fun() -> mnesia:stop(), rabbit_misc:ensure_ok(mnesia:delete_schema([node()]), cannot_delete_schema), rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia), - {ok, _} = mnesia:change_config(extra_db_nodes, Nodes), + {ok, _} = mnesia:change_config(extra_db_nodes, OtherNodes), ok end. %% Were we the last node in the cluster to shut down or is there no cluster? %% The answer to this is yes if: %% * We are our canonical source for reading a table -%% - If the canonical source is "nowhere" or another node, we are out of date +%% - If the canonical source is "nowhere" or another node, we are out +%% of date +%% and %% * No other nodes are running Mnesia and have finished booting Rabbit. -%% - Since any node will be its own canonical source once the cluster is up. +%% - Since any node will be its own canonical source once the cluster +%% is up, but just having Mnesia running is not enough - that node +%% could be halfway through starting (and deciding it is the upgrader +%% too) are_we_upgrader(Nodes) -> Where = mnesia:table_info(?EXAMPLE_RABBIT_TABLE, where_to_read), diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl index c852a0f9..3a78dd7f 100644 --- a/src/rabbit_upgrade.erl +++ b/src/rabbit_upgrade.erl @@ -134,7 +134,7 @@ heads(G) -> %% ------------------------------------------------------------------- apply_upgrades(Upgrades, GuardFun, UpgradeFun) -> - GuardFun(), + ok = GuardFun(), LockFile = lock_filename(dir()), case rabbit_misc:lock_file(LockFile) of ok -> -- cgit v1.2.1 From af1a5fa2320b99d421f84c09e1fa8e2594ba3950 Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Mon, 10 Jan 2011 17:25:13 +0000 Subject: Move the upgrade tests earlier in the boot process. This doesn't work either, just committing it in order not to lose it. --- src/rabbit_mnesia.erl | 107 +++++++++++++++++++++++++------------------------ src/rabbit_upgrade.erl | 53 +++++++++++++----------- 2 files changed, 84 insertions(+), 76 deletions(-) diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index ca84b29e..a11347ff 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -94,6 +94,7 @@ status() -> {running_nodes, running_clustered_nodes()}]. init() -> + ok = maybe_reset_for_upgrades(), ok = ensure_mnesia_running(), ok = ensure_mnesia_dir(), ok = init_db(read_cluster_nodes_config(), true), @@ -141,11 +142,6 @@ all_clustered_nodes() -> running_clustered_nodes() -> mnesia:system_info(running_db_nodes). -forget_other_nodes() -> - Nodes = all_clustered_nodes() -- [node()], - [{atomic, ok} = mnesia:del_table_copy(schema, Node) || Node <- Nodes], - ok. - empty_ram_only_tables() -> Node = node(), lists:foreach( @@ -404,17 +400,17 @@ init_db(ClusterNodes, Force) -> setup_existing_node(ClusterNodes, Nodes) -> DiscNodes = mnesia:table_info(schema, disc_copies), - case are_we_upgrader(DiscNodes) of - true -> - %% True single disc node, or last disc node in cluster to - %% shut down, attempt upgrade if necessary + Node = node(), + case upgrader(DiscNodes) of + Node -> + %% True single disc node, or upgrader node - attempt + %% upgrade if necessary ok = wait_for_tables(), - case rabbit_upgrade:maybe_upgrade([mnesia, local], fun () -> ok end, - fun forget_other_nodes/0) of + case rabbit_upgrade:maybe_upgrade([mnesia, local]) of ok -> ensure_schema_ok(); version_not_available -> schema_ok_or_move() end; - false -> + _ -> %% Subsequent node in cluster, catch up case Nodes of [AnotherNode|_] -> @@ -423,12 +419,8 @@ setup_existing_node(ClusterNodes, Nodes) -> [] -> ok end, - IsDiskNode = ClusterNodes == [] orelse - lists:member(node(), ClusterNodes), - case rabbit_upgrade:maybe_upgrade( - [local], - ensure_nodes_running_fun(DiscNodes), - reset_fun(DiscNodes -- [node()])) of + ok = wait_for_tables(), + case rabbit_upgrade:maybe_upgrade([local]) of ok -> ok; %% If we're just starting up a new node we won't have @@ -436,13 +428,21 @@ setup_existing_node(ClusterNodes, Nodes) -> version_not_available -> ok = rabbit_upgrade:write_version() end, + IsDiskNode = ClusterNodes == [] orelse + lists:member(node(), ClusterNodes), ok = wait_for_replicated_tables(), ok = create_local_table_copy(schema, disc_copies), ok = create_local_table_copies(case IsDiskNode of true -> disc; false -> ram end), - ensure_schema_ok() + ensure_schema_ok(), + %% If we're just starting up a new node we won't have + %% a version + case rabbit_upgrade:read_version() of + {error, _} -> rabbit_upgrade:write_version(); + _ -> ok + end end. schema_ok_or_move() -> @@ -475,50 +475,48 @@ ensure_schema_ok() -> {error, Reason} -> throw({error, {schema_invalid, Reason}}) end. -ensure_nodes_running_fun(DiscNodes) -> - fun() -> - case nodes_running(DiscNodes) of - [] -> - exit("Cluster upgrade needed. The first node you start " - "should be the last disc node to be shut down."); +maybe_reset_for_upgrades() -> + case rabbit_upgrade:upgrade_required([mnesia]) of + true -> + DiscNodes = all_clustered_nodes(), + Upgrader = upgrader(DiscNodes), + case node() of + Upgrader -> + reset_for_primary_upgrade(DiscNodes); _ -> - ok - end + reset_for_non_primary_upgrade(Upgrader, DiscNodes) + end; + false -> + ok end. -reset_fun(OtherNodes) -> - fun() -> +reset_for_primary_upgrade(DiscNodes) -> + Others = DiscNodes -- [node()], + ensure_mnesia_running(), + force_tables(), + [{atomic, ok} = mnesia:del_table_copy(schema, Node) || Node <- Others], + ok. + +reset_for_non_primary_upgrade(Upgrader, DiscNodes) -> + case node_running(Upgrader) of + false -> + exit(lists:flatten( + io_lib:format( + "Cluster upgrade needed. Please start node ~s first", + [Upgrader]))); + true -> + OtherNodes = DiscNodes -- [node()], mnesia:stop(), rabbit_misc:ensure_ok(mnesia:delete_schema([node()]), cannot_delete_schema), - rabbit_misc:ensure_ok(mnesia:start(), - cannot_start_mnesia), + mnesia:start(), {ok, _} = mnesia:change_config(extra_db_nodes, OtherNodes), ok end. -%% Were we the last node in the cluster to shut down or is there no cluster? -%% The answer to this is yes if: -%% * We are our canonical source for reading a table -%% - If the canonical source is "nowhere" or another node, we are out -%% of date -%% and -%% * No other nodes are running Mnesia and have finished booting Rabbit. -%% - Since any node will be its own canonical source once the cluster -%% is up, but just having Mnesia running is not enough - that node -%% could be halfway through starting (and deciding it is the upgrader -%% too) - -are_we_upgrader(Nodes) -> - Where = mnesia:table_info(?EXAMPLE_RABBIT_TABLE, where_to_read), - Node = node(), - case {Where, nodes_running(Nodes)} of - {Node, []} -> true; - {_, _} -> false - end. - -nodes_running(Nodes) -> - [N || N <- Nodes, node_running(N)]. +upgrader(Nodes) -> + [Upgrader|_] = lists:usort(Nodes), + Upgrader. node_running(Node) -> case rpc:call(Node, application, which_applications, []) of @@ -639,6 +637,9 @@ wait_for_tables(TableNames) -> throw({error, {failed_waiting_for_tables, Reason}}) end. +force_tables() -> + [mnesia:force_load_table(T) || T <- table_names()]. + reset(Force) -> ok = ensure_mnesia_not_running(), Node = node(), diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl index 3a78dd7f..260f85a1 100644 --- a/src/rabbit_upgrade.erl +++ b/src/rabbit_upgrade.erl @@ -21,7 +21,8 @@ -module(rabbit_upgrade). --export([maybe_upgrade/3, read_version/0, write_version/0, desired_version/0]). +-export([maybe_upgrade/1, upgrade_required/1]). +-export([read_version/0, write_version/0, desired_version/0]). -include("rabbit.hrl"). @@ -36,8 +37,8 @@ -type(scope() :: 'mnesia' | 'local'). -type(version() :: [step()]). --spec(maybe_upgrade/3 :: ([scope()], fun (() -> 'ok'), fun (() -> 'ok')) - -> 'ok' | 'version_not_available'). +-spec(maybe_upgrade/1 :: ([scope()]) -> 'ok' | 'version_not_available'). +-spec(upgrade_required/1 :: ([scope()]) -> boolean()). -spec(read_version/0 :: () -> rabbit_types:ok_or_error2(version(), any())). -spec(write_version/0 :: () -> 'ok'). -spec(desired_version/0 :: () -> version()). @@ -49,25 +50,18 @@ %% Try to upgrade the schema. If no information on the existing schema %% could be found, do nothing. rabbit_mnesia:check_schema_integrity() %% will catch the problem. -maybe_upgrade(Scopes, GuardFun, UpgradeFun) -> - case read_version() of - {ok, CurrentHeads} -> - with_upgrade_graph( - fun (G) -> maybe_upgrade_graph(CurrentHeads, Scopes, - GuardFun, UpgradeFun, G) end); - {error, enoent} -> - version_not_available +maybe_upgrade(Scopes) -> + case upgrades_required(Scopes) of + version_not_available -> version_not_available; + [] -> ok; + Upgrades -> apply_upgrades(Upgrades) end. -maybe_upgrade_graph(CurrentHeads, Scopes, GuardFun, UpgradeFun, G) -> - case unknown_heads(CurrentHeads, G) of - [] -> - case upgrades_to_apply(CurrentHeads, Scopes, G) of - [] -> ok; - Upgrades -> apply_upgrades(Upgrades, GuardFun, UpgradeFun) - end; - Unknown -> - throw({error, {future_upgrades_found, Unknown}}) +upgrade_required(Scopes) -> + case upgrades_required(Scopes) of + version_not_available -> false; + [] -> false; + _ -> true end. read_version() -> @@ -85,6 +79,21 @@ desired_version() -> %% ------------------------------------------------------------------- +upgrades_required(Scopes) -> + case read_version() of + {ok, CurrentHeads} -> + with_upgrade_graph( + fun (G) -> + case unknown_heads(CurrentHeads, G) of + [] -> upgrades_to_apply(CurrentHeads, Scopes, G); + Unknown -> throw({error, + {future_upgrades_found, Unknown}}) + end + end); + {error, enoent} -> + version_not_available + end. + with_upgrade_graph(Fun) -> case rabbit_misc:build_acyclic_graph( fun vertices/2, fun edges/2, @@ -133,8 +142,7 @@ heads(G) -> %% ------------------------------------------------------------------- -apply_upgrades(Upgrades, GuardFun, UpgradeFun) -> - ok = GuardFun(), +apply_upgrades(Upgrades) -> LockFile = lock_filename(dir()), case rabbit_misc:lock_file(LockFile) of ok -> @@ -149,7 +157,6 @@ apply_upgrades(Upgrades, GuardFun, UpgradeFun) -> %% is not intuitive. Remove it. ok = file:delete(lock_filename(BackupDir)), info("Upgrades: Mnesia dir backed up to ~p~n", [BackupDir]), - ok = UpgradeFun(), [apply_upgrade(Upgrade) || Upgrade <- Upgrades], info("Upgrades: All upgrades applied successfully~n", []), ok = write_version(), -- cgit v1.2.1 From 0fef8fdcc755596782543d432a7103d5c7dd90fc Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Tue, 11 Jan 2011 12:44:43 +0000 Subject: Holy %$*! it works. Still needs tidying up somewhat... --- src/rabbit_mnesia.erl | 99 ++++++++---------------------------------------- src/rabbit_prelaunch.erl | 4 +- src/rabbit_upgrade.erl | 77 +++++++++++++++++++++++++++++++------ 3 files changed, 82 insertions(+), 98 deletions(-) diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index a11347ff..345ca82a 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -35,7 +35,7 @@ -export([ensure_mnesia_dir/0, dir/0, status/0, init/0, is_db_empty/0, cluster/1, force_cluster/1, reset/0, force_reset/0, is_clustered/0, running_clustered_nodes/0, all_clustered_nodes/0, - empty_ram_only_tables/0, copy_db/1]). + empty_ram_only_tables/0, copy_db/1, create_cluster_nodes_config/1]). -export([table_names/0]). @@ -94,7 +94,6 @@ status() -> {running_nodes, running_clustered_nodes()}]. init() -> - ok = maybe_reset_for_upgrades(), ok = ensure_mnesia_running(), ok = ensure_mnesia_dir(), ok = init_db(read_cluster_nodes_config(), true), @@ -399,35 +398,19 @@ init_db(ClusterNodes, Force) -> end. setup_existing_node(ClusterNodes, Nodes) -> - DiscNodes = mnesia:table_info(schema, disc_copies), - Node = node(), - case upgrader(DiscNodes) of - Node -> - %% True single disc node, or upgrader node - attempt - %% upgrade if necessary + case Nodes of + [] -> + %% We're the first node up ok = wait_for_tables(), - case rabbit_upgrade:maybe_upgrade([mnesia, local]) of + case rabbit_upgrade:maybe_upgrade([local]) of ok -> ensure_schema_ok(); version_not_available -> schema_ok_or_move() end; - _ -> + [AnotherNode|_] -> %% Subsequent node in cluster, catch up - case Nodes of - [AnotherNode|_] -> - ensure_version_ok( - rpc:call(AnotherNode, rabbit_upgrade, read_version, [])); - [] -> - ok - end, + ensure_version_ok( + rpc:call(AnotherNode, rabbit_upgrade, read_version, [])), ok = wait_for_tables(), - case rabbit_upgrade:maybe_upgrade([local]) of - ok -> - ok; - %% If we're just starting up a new node we won't have - %% a version - version_not_available -> - ok = rabbit_upgrade:write_version() - end, IsDiskNode = ClusterNodes == [] orelse lists:member(node(), ClusterNodes), ok = wait_for_replicated_tables(), @@ -436,13 +419,13 @@ setup_existing_node(ClusterNodes, Nodes) -> true -> disc; false -> ram end), - ensure_schema_ok(), - %% If we're just starting up a new node we won't have - %% a version - case rabbit_upgrade:read_version() of - {error, _} -> rabbit_upgrade:write_version(); - _ -> ok - end + case rabbit_upgrade:maybe_upgrade([local]) of + ok -> ok; + %% If we're just starting up a new node we won't have + %% a version + version_not_available -> ok = rabbit_upgrade:write_version() + end, + ensure_schema_ok() end. schema_ok_or_move() -> @@ -475,55 +458,6 @@ ensure_schema_ok() -> {error, Reason} -> throw({error, {schema_invalid, Reason}}) end. -maybe_reset_for_upgrades() -> - case rabbit_upgrade:upgrade_required([mnesia]) of - true -> - DiscNodes = all_clustered_nodes(), - Upgrader = upgrader(DiscNodes), - case node() of - Upgrader -> - reset_for_primary_upgrade(DiscNodes); - _ -> - reset_for_non_primary_upgrade(Upgrader, DiscNodes) - end; - false -> - ok - end. - -reset_for_primary_upgrade(DiscNodes) -> - Others = DiscNodes -- [node()], - ensure_mnesia_running(), - force_tables(), - [{atomic, ok} = mnesia:del_table_copy(schema, Node) || Node <- Others], - ok. - -reset_for_non_primary_upgrade(Upgrader, DiscNodes) -> - case node_running(Upgrader) of - false -> - exit(lists:flatten( - io_lib:format( - "Cluster upgrade needed. Please start node ~s first", - [Upgrader]))); - true -> - OtherNodes = DiscNodes -- [node()], - mnesia:stop(), - rabbit_misc:ensure_ok(mnesia:delete_schema([node()]), - cannot_delete_schema), - mnesia:start(), - {ok, _} = mnesia:change_config(extra_db_nodes, OtherNodes), - ok - end. - -upgrader(Nodes) -> - [Upgrader|_] = lists:usort(Nodes), - Upgrader. - -node_running(Node) -> - case rpc:call(Node, application, which_applications, []) of - {badrpc, _} -> false; - Apps -> lists:keysearch(rabbit, 1, Apps) =/= false - end. - create_schema() -> mnesia:stop(), rabbit_misc:ensure_ok(mnesia:create_schema([node()]), @@ -637,9 +571,6 @@ wait_for_tables(TableNames) -> throw({error, {failed_waiting_for_tables, Reason}}) end. -force_tables() -> - [mnesia:force_load_table(T) || T <- table_names()]. - reset(Force) -> ok = ensure_mnesia_not_running(), Node = node(), diff --git a/src/rabbit_prelaunch.erl b/src/rabbit_prelaunch.erl index 8ae45abd..c5ee63ba 100644 --- a/src/rabbit_prelaunch.erl +++ b/src/rabbit_prelaunch.erl @@ -250,8 +250,8 @@ post_process_script(ScriptFile) -> {error, {failed_to_load_script, Reason}} end. -process_entry(Entry = {apply,{application,start_boot,[rabbit,permanent]}}) -> - [{apply,{rabbit,prepare,[]}}, Entry]; +process_entry(Entry = {apply,{application,start_boot,[mnesia,permanent]}}) -> + [{apply,{rabbit_upgrade,maybe_upgrade_mnesia,[]}}, Entry]; process_entry(Entry) -> [Entry]. diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl index 260f85a1..9f9e8806 100644 --- a/src/rabbit_upgrade.erl +++ b/src/rabbit_upgrade.erl @@ -21,7 +21,7 @@ -module(rabbit_upgrade). --export([maybe_upgrade/1, upgrade_required/1]). +-export([maybe_upgrade_mnesia/0, maybe_upgrade/1]). -export([read_version/0, write_version/0, desired_version/0]). -include("rabbit.hrl"). @@ -37,8 +37,8 @@ -type(scope() :: 'mnesia' | 'local'). -type(version() :: [step()]). +-spec(maybe_upgrade_mnesia/0 :: () -> 'ok'). -spec(maybe_upgrade/1 :: ([scope()]) -> 'ok' | 'version_not_available'). --spec(upgrade_required/1 :: ([scope()]) -> boolean()). -spec(read_version/0 :: () -> rabbit_types:ok_or_error2(version(), any())). -spec(write_version/0 :: () -> 'ok'). -spec(desired_version/0 :: () -> version()). @@ -47,9 +47,69 @@ %% ------------------------------------------------------------------- -%% Try to upgrade the schema. If no information on the existing schema -%% could be found, do nothing. rabbit_mnesia:check_schema_integrity() -%% will catch the problem. +maybe_upgrade_mnesia() -> + rabbit:prepare(), + case upgrades_required([mnesia]) of + Upgrades = [_|_] -> + DiscNodes = rabbit_mnesia:all_clustered_nodes(), + Upgrader = upgrader(DiscNodes), + case node() of + Upgrader -> + primary_upgrade(Upgrades, DiscNodes); + _ -> + non_primary_upgrade(Upgrader, DiscNodes) + end; + [] -> + ok; + version_not_available -> + ok + end. + +upgrader(Nodes) -> + [Upgrader|_] = lists:usort(Nodes), + Upgrader. + +primary_upgrade(Upgrades, DiscNodes) -> + Others = DiscNodes -- [node()], + %% TODO this should happen after backing up! + rabbit_misc:ensure_ok(mnesia:start(), + cannot_start_mnesia), + force_tables(), + [{atomic, ok} = mnesia:del_table_copy(schema, Node) || Node <- Others], + apply_upgrades(Upgrades), + ok. + +force_tables() -> + [mnesia:force_load_table(T) || T <- rabbit_mnesia:table_names()]. + +non_primary_upgrade(Upgrader, DiscNodes) -> + case node_running(Upgrader) of + false -> + Msg = "~n~n * Cluster upgrade needed. Please start node ~s " + "first. * ~n~n~n", + Args = [Upgrader], + %% We don't throw or exit here since that gets thrown + %% straight out into do_boot, generating an erl_crash.dump + %% and displaying any error message in a confusing way. + error_logger:error_msg(Msg, Args), + io:format(Msg, Args), + error_logger:logfile(close), + halt(1); + true -> + rabbit_misc:ensure_ok(mnesia:delete_schema([node()]), + cannot_delete_schema), + ok = rabbit_mnesia:create_cluster_nodes_config(DiscNodes), + ok + end. + +node_running(Node) -> + case rpc:call(Node, application, which_applications, []) of + {badrpc, _} -> false; + Apps -> lists:keysearch(rabbit, 1, Apps) =/= false + end. + +%% ------------------------------------------------------------------- + maybe_upgrade(Scopes) -> case upgrades_required(Scopes) of version_not_available -> version_not_available; @@ -57,13 +117,6 @@ maybe_upgrade(Scopes) -> Upgrades -> apply_upgrades(Upgrades) end. -upgrade_required(Scopes) -> - case upgrades_required(Scopes) of - version_not_available -> false; - [] -> false; - _ -> true - end. - read_version() -> case rabbit_misc:read_term_file(schema_filename()) of {ok, [Heads]} -> {ok, Heads}; -- cgit v1.2.1 From a153921362e59e87f5052e5ce80f765425777b59 Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Tue, 11 Jan 2011 13:34:02 +0000 Subject: One DAG per scope. --- src/rabbit_mnesia.erl | 4 +- src/rabbit_upgrade.erl | 82 +++++++++++++++++++++------------------- src/rabbit_upgrade_functions.erl | 4 +- 3 files changed, 48 insertions(+), 42 deletions(-) diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index 345ca82a..997b12d4 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -402,7 +402,7 @@ setup_existing_node(ClusterNodes, Nodes) -> [] -> %% We're the first node up ok = wait_for_tables(), - case rabbit_upgrade:maybe_upgrade([local]) of + case rabbit_upgrade:maybe_upgrade(local) of ok -> ensure_schema_ok(); version_not_available -> schema_ok_or_move() end; @@ -419,7 +419,7 @@ setup_existing_node(ClusterNodes, Nodes) -> true -> disc; false -> ram end), - case rabbit_upgrade:maybe_upgrade([local]) of + case rabbit_upgrade:maybe_upgrade(local) of ok -> ok; %% If we're just starting up a new node we won't have %% a version diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl index 9f9e8806..4bdff65a 100644 --- a/src/rabbit_upgrade.erl +++ b/src/rabbit_upgrade.erl @@ -28,17 +28,18 @@ -define(VERSION_FILENAME, "schema_version"). -define(LOCK_FILENAME, "schema_upgrade_lock"). +-define(SCOPES, [mnesia, local]). %% ------------------------------------------------------------------- -ifdef(use_specs). -type(step() :: atom()). --type(scope() :: 'mnesia' | 'local'). -type(version() :: [step()]). +-type(scope() :: 'mnesia' | 'local'). -spec(maybe_upgrade_mnesia/0 :: () -> 'ok'). --spec(maybe_upgrade/1 :: ([scope()]) -> 'ok' | 'version_not_available'). +-spec(maybe_upgrade/1 :: (scope()) -> 'ok' | 'version_not_available'). -spec(read_version/0 :: () -> rabbit_types:ok_or_error2(version(), any())). -spec(write_version/0 :: () -> 'ok'). -spec(desired_version/0 :: () -> version()). @@ -49,8 +50,8 @@ maybe_upgrade_mnesia() -> rabbit:prepare(), - case upgrades_required([mnesia]) of - Upgrades = [_|_] -> + case upgrades_required(mnesia) of + [_|_] = Upgrades -> DiscNodes = rabbit_mnesia:all_clustered_nodes(), Upgrader = upgrader(DiscNodes), case node() of @@ -72,8 +73,7 @@ upgrader(Nodes) -> primary_upgrade(Upgrades, DiscNodes) -> Others = DiscNodes -- [node()], %% TODO this should happen after backing up! - rabbit_misc:ensure_ok(mnesia:start(), - cannot_start_mnesia), + rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia), force_tables(), [{atomic, ok} = mnesia:del_table_copy(schema, Node) || Node <- Others], apply_upgrades(Upgrades), @@ -110,8 +110,8 @@ node_running(Node) -> %% ------------------------------------------------------------------- -maybe_upgrade(Scopes) -> - case upgrades_required(Scopes) of +maybe_upgrade(Scope) -> + case upgrades_required(Scope) of version_not_available -> version_not_available; [] -> ok; Upgrades -> apply_upgrades(Upgrades) @@ -128,34 +128,41 @@ write_version() -> ok. desired_version() -> - with_upgrade_graph(fun (G) -> heads(G) end). + lists:append( + [with_upgrade_graph(fun (_, G) -> heads(G) end, Scope, []) + || Scope <- ?SCOPES]). %% ------------------------------------------------------------------- -upgrades_required(Scopes) -> +upgrades_required(Scope) -> case read_version() of {ok, CurrentHeads} -> - with_upgrade_graph( - fun (G) -> - case unknown_heads(CurrentHeads, G) of - [] -> upgrades_to_apply(CurrentHeads, Scopes, G); - Unknown -> throw({error, - {future_upgrades_found, Unknown}}) - end - end); + with_upgrade_graph(fun upgrades_to_apply/2, Scope, CurrentHeads); {error, enoent} -> version_not_available end. -with_upgrade_graph(Fun) -> +with_upgrade_graph(Fun, Scope, CurrentHeads) -> + G0 = make_graph(Scope), + Gs = [G0|[make_graph(S) || S <- ?SCOPES -- [Scope]]], + try + Known = lists:append([digraph:vertices(G) || G <- Gs]), + case unknown_heads(CurrentHeads, Known) of + [] -> ok; + Unknown -> throw({error, {future_upgrades_found, Unknown}}) + end, + Fun(CurrentHeads, G0) + after + [true = digraph:delete(G) || G <- Gs] + end. + +make_graph(Scope) -> case rabbit_misc:build_acyclic_graph( - fun vertices/2, fun edges/2, + fun (Module, Steps) -> vertices(Module, Steps, Scope) end, + fun (Module, Steps) -> edges(Module, Steps, Scope) end, rabbit_misc:all_module_attributes(rabbit_upgrade)) of - {ok, G} -> try - Fun(G) - after - true = digraph:delete(G) - end; + {ok, G} -> + G; {error, {vertex, duplicate, StepName}} -> throw({error, {duplicate_upgrade_step, StepName}}); {error, {edge, {bad_vertex, StepName}, _From, _To}} -> @@ -164,18 +171,19 @@ with_upgrade_graph(Fun) -> throw({error, {cycle_in_upgrade_steps, StepNames}}) end. -vertices(Module, Steps) -> - [{StepName, {Scope, {Module, StepName}}} || - {StepName, Scope, _Reqs} <- Steps]. +vertices(Module, Steps, Scope0) -> + [{StepName, {Module, StepName}} || {StepName, Scope1, _Reqs} <- Steps, + Scope0 == Scope1]. -edges(_Module, Steps) -> - [{Require, StepName} || {StepName, _Scope, Requires} <- Steps, - Require <- Requires]. +edges(_Module, Steps, Scope0) -> + [{Require, StepName} || {StepName, Scope1, Requires} <- Steps, + Require <- Requires, + Scope0 == Scope1]. -unknown_heads(Heads, G) -> - [H || H <- Heads, digraph:vertex(G, H) =:= false]. +unknown_heads(Heads, Known) -> + lists:filter(fun(H) -> not lists:member(H, Known) end, Heads). -upgrades_to_apply(Heads, Scopes, G) -> +upgrades_to_apply(Heads, G) -> %% Take all the vertices which can reach the known heads. That's %% everything we've already applied. Subtract that from all %% vertices: that's what we have to apply. @@ -185,10 +193,8 @@ upgrades_to_apply(Heads, Scopes, G) -> sets:from_list(digraph_utils:reaching(Heads, G)))), %% Form a subgraph from that list and find a topological ordering %% so we can invoke them in order. - Sorted = [element(2, digraph:vertex(G, StepName)) || - StepName <- digraph_utils:topsort(digraph_utils:subgraph(G, Unsorted))], - %% Only return the upgrades for the appropriate scopes - [Upgrade || {Scope, Upgrade} <- Sorted, lists:member(Scope, Scopes)]. + [element(2, digraph:vertex(G, StepName)) || + StepName <- digraph_utils:topsort(digraph_utils:subgraph(G, Unsorted))]. heads(G) -> lists:sort([V || V <- digraph:vertices(G), digraph:out_degree(G, V) =:= 0]). diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl index 4068b090..b9b46f9a 100644 --- a/src/rabbit_upgrade_functions.erl +++ b/src/rabbit_upgrade_functions.erl @@ -31,8 +31,8 @@ -rabbit_upgrade({user_to_internal_user, mnesia, [hash_passwords]}). -rabbit_upgrade({one, mnesia, [user_to_internal_user]}). --rabbit_upgrade({two, local, [one]}). --rabbit_upgrade({three, mnesia, [two]}). +-rabbit_upgrade({two, mnesia, [one]}). +-rabbit_upgrade({three, local, []}). %% ------------------------------------------------------------------- -- cgit v1.2.1 From 70c6ce665144f6d85a160e842c4cdfe543865ef4 Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Tue, 11 Jan 2011 13:44:41 +0000 Subject: Break the cluster *after* taking the backup. --- src/rabbit_mnesia.erl | 9 +-------- src/rabbit_upgrade.erl | 19 ++++++++++++------- src/rabbit_upgrade_functions.erl | 8 ++++---- 3 files changed, 17 insertions(+), 19 deletions(-) diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index 997b12d4..26fda4e9 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -492,14 +492,7 @@ move_db() -> ok. copy_db(Destination) -> - mnesia:stop(), - case rabbit_misc:recursive_copy(dir(), Destination) of - ok -> - rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia), - ok = wait_for_tables(); - {error, E} -> - {error, E} - end. + rabbit_misc:recursive_copy(dir(), Destination). create_tables() -> lists:foreach(fun ({Tab, TabDef}) -> diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl index 4bdff65a..d0fdbf08 100644 --- a/src/rabbit_upgrade.erl +++ b/src/rabbit_upgrade.erl @@ -72,11 +72,15 @@ upgrader(Nodes) -> primary_upgrade(Upgrades, DiscNodes) -> Others = DiscNodes -- [node()], - %% TODO this should happen after backing up! - rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia), - force_tables(), - [{atomic, ok} = mnesia:del_table_copy(schema, Node) || Node <- Others], - apply_upgrades(Upgrades), + apply_upgrades( + Upgrades, + fun () -> + info("Upgrades: Breaking cluster~n", []), + rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia), + force_tables(), + [{atomic, ok} = mnesia:del_table_copy(schema, Node) + || Node <- Others] + end), ok. force_tables() -> @@ -114,7 +118,7 @@ maybe_upgrade(Scope) -> case upgrades_required(Scope) of version_not_available -> version_not_available; [] -> ok; - Upgrades -> apply_upgrades(Upgrades) + Upgrades -> apply_upgrades(Upgrades, fun() -> ok end) end. read_version() -> @@ -201,7 +205,7 @@ heads(G) -> %% ------------------------------------------------------------------- -apply_upgrades(Upgrades) -> +apply_upgrades(Upgrades, Fun) -> LockFile = lock_filename(dir()), case rabbit_misc:lock_file(LockFile) of ok -> @@ -216,6 +220,7 @@ apply_upgrades(Upgrades) -> %% is not intuitive. Remove it. ok = file:delete(lock_filename(BackupDir)), info("Upgrades: Mnesia dir backed up to ~p~n", [BackupDir]), + Fun(), [apply_upgrade(Upgrade) || Upgrade <- Upgrades], info("Upgrades: All upgrades applied successfully~n", []), ok = write_version(), diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl index b9b46f9a..151b498d 100644 --- a/src/rabbit_upgrade_functions.erl +++ b/src/rabbit_upgrade_functions.erl @@ -110,10 +110,6 @@ one() -> [username, password_hash, is_admin, extra]). two() -> - ok = rabbit_misc:write_term_file(filename:join(rabbit_mnesia:dir(), "test"), - [test]). - -three() -> mnesia( rabbit_user, fun ({internal_user, Username, Hash, IsAdmin, _}) -> @@ -121,6 +117,10 @@ three() -> end, [username, password_hash, is_admin]). +three() -> + ok = rabbit_misc:write_term_file(filename:join(rabbit_mnesia:dir(), "test"), + [test]). + %%-------------------------------------------------------------------- mnesia(TableName, Fun, FieldList) -> -- cgit v1.2.1 From 31cef377dff7bdfce6bff9b802ad0dd22d3341a1 Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Tue, 11 Jan 2011 15:03:29 +0000 Subject: Store the version as an orddict keyed on different scopes, and thus don't assert that everything is done after the first upgrade. --- src/rabbit_upgrade.erl | 98 ++++++++++++++++++++++++++++++++------------------ 1 file changed, 63 insertions(+), 35 deletions(-) diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl index d0fdbf08..9d6263fe 100644 --- a/src/rabbit_upgrade.erl +++ b/src/rabbit_upgrade.erl @@ -35,7 +35,7 @@ -ifdef(use_specs). -type(step() :: atom()). --type(version() :: [step()]). +-type(version() :: [{scope(), [step()]}]). -type(scope() :: 'mnesia' | 'local'). -spec(maybe_upgrade_mnesia/0 :: () -> 'ok'). @@ -73,9 +73,10 @@ upgrader(Nodes) -> primary_upgrade(Upgrades, DiscNodes) -> Others = DiscNodes -- [node()], apply_upgrades( + mnesia, Upgrades, fun () -> - info("Upgrades: Breaking cluster~n", []), + info("mnesia upgrades: Breaking cluster~n", []), rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia), force_tables(), [{atomic, ok} = mnesia:del_table_copy(schema, Node) @@ -118,55 +119,80 @@ maybe_upgrade(Scope) -> case upgrades_required(Scope) of version_not_available -> version_not_available; [] -> ok; - Upgrades -> apply_upgrades(Upgrades, fun() -> ok end) + Upgrades -> apply_upgrades(Scope, Upgrades, + fun() -> ok end) end. read_version() -> case rabbit_misc:read_term_file(schema_filename()) of - {ok, [Heads]} -> {ok, Heads}; + {ok, [V]} -> case orddict:find(mnesia, V) of + error -> {ok, convert_old_version(V)}; + _ -> {ok, V} + end; {error, _} = Err -> Err end. +read_version(Scope) -> + case read_version() of + {error, _} = E -> E; + {ok, V} -> {ok, orddict:fetch(Scope, V)} + end. + write_version() -> ok = rabbit_misc:write_term_file(schema_filename(), [desired_version()]), ok. +write_version(Scope) -> + {ok, V0} = read_version(), + V = orddict:store(Scope, desired_version(Scope), V0), + ok = rabbit_misc:write_term_file(schema_filename(), [V]), + ok. + desired_version() -> - lists:append( - [with_upgrade_graph(fun (_, G) -> heads(G) end, Scope, []) - || Scope <- ?SCOPES]). + lists:foldl( + fun (Scope, Acc) -> + orddict:store(Scope, desired_version(Scope), Acc) + end, + orddict:new(), ?SCOPES). + +desired_version(Scope) -> + with_upgrade_graph(fun (G) -> heads(G) end, Scope). + +convert_old_version(Heads) -> + Locals = [add_queue_ttl], + V0 = orddict:new(), + V1 = orddict:store(mnesia, Heads -- Locals, V0), + orddict:store(local, + lists:filter(fun(H) -> lists:member(H, Locals) end, Heads), + V1). %% ------------------------------------------------------------------- upgrades_required(Scope) -> - case read_version() of + case read_version(Scope) of {ok, CurrentHeads} -> - with_upgrade_graph(fun upgrades_to_apply/2, Scope, CurrentHeads); + with_upgrade_graph( + fun (G) -> + case unknown_heads(CurrentHeads, G) of + [] -> upgrades_to_apply(CurrentHeads, G); + Unknown -> throw({error, + {future_upgrades_found, Unknown}}) + end + end, Scope); {error, enoent} -> version_not_available end. -with_upgrade_graph(Fun, Scope, CurrentHeads) -> - G0 = make_graph(Scope), - Gs = [G0|[make_graph(S) || S <- ?SCOPES -- [Scope]]], - try - Known = lists:append([digraph:vertices(G) || G <- Gs]), - case unknown_heads(CurrentHeads, Known) of - [] -> ok; - Unknown -> throw({error, {future_upgrades_found, Unknown}}) - end, - Fun(CurrentHeads, G0) - after - [true = digraph:delete(G) || G <- Gs] - end. - -make_graph(Scope) -> +with_upgrade_graph(Fun, Scope) -> case rabbit_misc:build_acyclic_graph( fun (Module, Steps) -> vertices(Module, Steps, Scope) end, fun (Module, Steps) -> edges(Module, Steps, Scope) end, rabbit_misc:all_module_attributes(rabbit_upgrade)) of - {ok, G} -> - G; + {ok, G} -> try + Fun(G) + after + true = digraph:delete(G) + end; {error, {vertex, duplicate, StepName}} -> throw({error, {duplicate_upgrade_step, StepName}}); {error, {edge, {bad_vertex, StepName}, _From, _To}} -> @@ -205,12 +231,12 @@ heads(G) -> %% ------------------------------------------------------------------- -apply_upgrades(Upgrades, Fun) -> +apply_upgrades(Scope, Upgrades, Fun) -> LockFile = lock_filename(dir()), case rabbit_misc:lock_file(LockFile) of ok -> BackupDir = dir() ++ "-upgrade-backup", - info("Upgrades: ~w to apply~n", [length(Upgrades)]), + info("~s upgrades: ~w to apply~n", [Scope, length(Upgrades)]), case rabbit_mnesia:copy_db(BackupDir) of ok -> %% We need to make the backup after creating the @@ -219,13 +245,15 @@ apply_upgrades(Upgrades, Fun) -> %% the lock file exists in the backup too, which %% is not intuitive. Remove it. ok = file:delete(lock_filename(BackupDir)), - info("Upgrades: Mnesia dir backed up to ~p~n", [BackupDir]), + info("~s upgrades: Mnesia dir backed up to ~p~n", + [Scope, BackupDir]), Fun(), - [apply_upgrade(Upgrade) || Upgrade <- Upgrades], - info("Upgrades: All upgrades applied successfully~n", []), - ok = write_version(), + [apply_upgrade(Scope, Upgrade) || Upgrade <- Upgrades], + info("~s upgrades: All upgrades applied successfully~n", + [Scope]), + ok = write_version(Scope), ok = rabbit_misc:recursive_delete([BackupDir]), - info("Upgrades: Mnesia backup removed~n", []), + info("~s upgrades: Mnesia backup removed~n", [Scope]), ok = file:delete(LockFile); {error, E} -> %% If we can't backup, the upgrade hasn't started @@ -238,8 +266,8 @@ apply_upgrades(Upgrades, Fun) -> throw({error, previous_upgrade_failed}) end. -apply_upgrade({M, F}) -> - info("Upgrades: Applying ~w:~w~n", [M, F]), +apply_upgrade(Scope, {M, F}) -> + info("~s upgrades: Applying ~w:~w~n", [Scope, M, F]), ok = apply(M, F, []). %% ------------------------------------------------------------------- -- cgit v1.2.1 From 2a43f15e16c5ab4c47827efc6e361b3badc69fba Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Tue, 11 Jan 2011 15:09:19 +0000 Subject: Note that we've upgraded here --- src/rabbit_upgrade.erl | 1 + 1 file changed, 1 insertion(+) diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl index 9d6263fe..9ce9b385 100644 --- a/src/rabbit_upgrade.erl +++ b/src/rabbit_upgrade.erl @@ -104,6 +104,7 @@ non_primary_upgrade(Upgrader, DiscNodes) -> rabbit_misc:ensure_ok(mnesia:delete_schema([node()]), cannot_delete_schema), ok = rabbit_mnesia:create_cluster_nodes_config(DiscNodes), + write_version(mnesia), ok end. -- cgit v1.2.1 From 8d1365c898057bec83c45201d99c7ca8d5815e3a Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Tue, 11 Jan 2011 15:11:44 +0000 Subject: Remove test upgrades --- src/rabbit_upgrade_functions.erl | 26 -------------------------- 1 file changed, 26 deletions(-) diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl index 151b498d..d2ef31b9 100644 --- a/src/rabbit_upgrade_functions.erl +++ b/src/rabbit_upgrade_functions.erl @@ -30,10 +30,6 @@ -rabbit_upgrade({internal_exchanges, mnesia, []}). -rabbit_upgrade({user_to_internal_user, mnesia, [hash_passwords]}). --rabbit_upgrade({one, mnesia, [user_to_internal_user]}). --rabbit_upgrade({two, mnesia, [one]}). --rabbit_upgrade({three, local, []}). - %% ------------------------------------------------------------------- -ifdef(use_specs). @@ -99,28 +95,6 @@ user_to_internal_user() -> end, [username, password_hash, is_admin], internal_user). - - -one() -> - mnesia( - rabbit_user, - fun ({internal_user, Username, Hash, IsAdmin}) -> - {internal_user, Username, Hash, IsAdmin, foo} - end, - [username, password_hash, is_admin, extra]). - -two() -> - mnesia( - rabbit_user, - fun ({internal_user, Username, Hash, IsAdmin, _}) -> - {internal_user, Username, Hash, IsAdmin} - end, - [username, password_hash, is_admin]). - -three() -> - ok = rabbit_misc:write_term_file(filename:join(rabbit_mnesia:dir(), "test"), - [test]). - %%-------------------------------------------------------------------- mnesia(TableName, Fun, FieldList) -> -- cgit v1.2.1 From fc4e251b01f64cc28a30bf902eb36ab68e144aaa Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Tue, 11 Jan 2011 15:25:28 +0000 Subject: Minimise difference with default. --- src/rabbit_mnesia.erl | 61 +++++++++++++++++++++++---------------------------- 1 file changed, 28 insertions(+), 33 deletions(-) diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index 26fda4e9..e63e5de2 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -387,8 +387,34 @@ init_db(ClusterNodes, Force) -> {[], false} -> %% Nothing there at all, start from scratch ok = create_schema(); - {_, _} -> - ok = setup_existing_node(ClusterNodes, Nodes) + {[], _} -> + %% We're the first node up + ok = wait_for_tables(), + case rabbit_upgrade:maybe_upgrade(local) of + ok -> ensure_schema_ok(); + version_not_available -> schema_ok_or_move() + end; + {[AnotherNode|_], _} -> + %% Subsequent node in cluster, catch up + ensure_version_ok( + rpc:call(AnotherNode, rabbit_upgrade, read_version, [])), + ok = wait_for_tables(), + IsDiskNode = ClusterNodes == [] orelse + lists:member(node(), ClusterNodes), + ok = wait_for_replicated_tables(), + ok = create_local_table_copy(schema, disc_copies), + ok = create_local_table_copies(case IsDiskNode of + true -> disc; + false -> ram + end), + case rabbit_upgrade:maybe_upgrade(local) of + ok -> ok; + %% If we're just starting up a new node we won't have + %% a version + version_not_available -> + ok = rabbit_upgrade:write_version() + end, + ensure_schema_ok() end; {error, Reason} -> %% one reason we may end up here is if we try to join @@ -397,37 +423,6 @@ init_db(ClusterNodes, Force) -> throw({error, {unable_to_join_cluster, ClusterNodes, Reason}}) end. -setup_existing_node(ClusterNodes, Nodes) -> - case Nodes of - [] -> - %% We're the first node up - ok = wait_for_tables(), - case rabbit_upgrade:maybe_upgrade(local) of - ok -> ensure_schema_ok(); - version_not_available -> schema_ok_or_move() - end; - [AnotherNode|_] -> - %% Subsequent node in cluster, catch up - ensure_version_ok( - rpc:call(AnotherNode, rabbit_upgrade, read_version, [])), - ok = wait_for_tables(), - IsDiskNode = ClusterNodes == [] orelse - lists:member(node(), ClusterNodes), - ok = wait_for_replicated_tables(), - ok = create_local_table_copy(schema, disc_copies), - ok = create_local_table_copies(case IsDiskNode of - true -> disc; - false -> ram - end), - case rabbit_upgrade:maybe_upgrade(local) of - ok -> ok; - %% If we're just starting up a new node we won't have - %% a version - version_not_available -> ok = rabbit_upgrade:write_version() - end, - ensure_schema_ok() - end. - schema_ok_or_move() -> case check_schema_integrity() of ok -> -- cgit v1.2.1 From 49174fa2a3610b6158fe70744935b0bf885a1e9e Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Tue, 11 Jan 2011 17:03:56 +0000 Subject: Revert this to the old version that we want. --- src/rabbit_upgrade.erl | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl index 9ce9b385..23dd416a 100644 --- a/src/rabbit_upgrade.erl +++ b/src/rabbit_upgrade.erl @@ -211,8 +211,8 @@ edges(_Module, Steps, Scope0) -> Require <- Requires, Scope0 == Scope1]. -unknown_heads(Heads, Known) -> - lists:filter(fun(H) -> not lists:member(H, Known) end, Heads). +unknown_heads(Heads, G) -> + [H || H <- Heads, digraph:vertex(G, H) =:= false]. upgrades_to_apply(Heads, G) -> %% Take all the vertices which can reach the known heads. That's -- cgit v1.2.1 From d9235728acd857cea1240ab84f64dfa16bfdff54 Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Wed, 12 Jan 2011 12:01:09 +0000 Subject: rabbit_mnesia:all_clustered_nodes/0 does not return disc nodes only. Duh. But we can do better anyway: allow any disc node to do the upgrade. --- src/rabbit_mnesia.erl | 5 ++- src/rabbit_upgrade.erl | 99 +++++++++++++++++++++++++++++++++----------------- 2 files changed, 70 insertions(+), 34 deletions(-) diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index e63e5de2..47e68c87 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -35,7 +35,8 @@ -export([ensure_mnesia_dir/0, dir/0, status/0, init/0, is_db_empty/0, cluster/1, force_cluster/1, reset/0, force_reset/0, is_clustered/0, running_clustered_nodes/0, all_clustered_nodes/0, - empty_ram_only_tables/0, copy_db/1, create_cluster_nodes_config/1]). + empty_ram_only_tables/0, copy_db/1, + create_cluster_nodes_config/1, read_cluster_nodes_config/0]). -export([table_names/0]). @@ -71,6 +72,8 @@ -spec(empty_ram_only_tables/0 :: () -> 'ok'). -spec(create_tables/0 :: () -> 'ok'). -spec(copy_db/1 :: (file:filename()) -> rabbit_types:ok_or_error(any())). +-spec(create_cluster_nodes_config/1 :: ([node()]) -> 'ok'). +-spec(read_cluster_nodes_config/0 :: () -> [node()]). -endif. diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl index 23dd416a..dcbffd03 100644 --- a/src/rabbit_upgrade.erl +++ b/src/rabbit_upgrade.erl @@ -22,7 +22,8 @@ -module(rabbit_upgrade). -export([maybe_upgrade_mnesia/0, maybe_upgrade/1]). --export([read_version/0, write_version/0, desired_version/0]). +-export([read_version/0, write_version/0, desired_version/0, + desired_version/1]). -include("rabbit.hrl"). @@ -43,6 +44,7 @@ -spec(read_version/0 :: () -> rabbit_types:ok_or_error2(version(), any())). -spec(write_version/0 :: () -> 'ok'). -spec(desired_version/0 :: () -> version()). +-spec(desired_version/1 :: (scope()) -> [step()]). -endif. @@ -52,13 +54,10 @@ maybe_upgrade_mnesia() -> rabbit:prepare(), case upgrades_required(mnesia) of [_|_] = Upgrades -> - DiscNodes = rabbit_mnesia:all_clustered_nodes(), - Upgrader = upgrader(DiscNodes), - case node() of - Upgrader -> - primary_upgrade(Upgrades, DiscNodes); - _ -> - non_primary_upgrade(Upgrader, DiscNodes) + Nodes = rabbit_mnesia:all_clustered_nodes(), + case am_i_upgrader(Nodes) of + true -> primary_upgrade(Upgrades, Nodes); + false -> non_primary_upgrade(Nodes) end; [] -> ok; @@ -66,12 +65,57 @@ maybe_upgrade_mnesia() -> ok end. -upgrader(Nodes) -> - [Upgrader|_] = lists:usort(Nodes), - Upgrader. +am_i_upgrader(Nodes) -> + Running = nodes_running(Nodes), + case Running of + [] -> + case am_i_disc_node() of + true -> + true; + false -> + die("Cluster upgrade needed but this is a ram node.~n " + "Please start any of the disc nodes first.", []) + end; + [Another|_] -> + ClusterVersion = + case rpc:call(Another, + rabbit_upgrade, desired_version, [mnesia]) of + {badrpc, {'EXIT', {undef, _}}} -> unknown_old_version; + {badrpc, Reason} -> {unknown, Reason}; + V -> V + end, + case desired_version(mnesia) of + ClusterVersion -> + %% The other node(s) have upgraded already, I am not the + %% upgrader + false; + MyVersion -> + %% The other node(s) are running an unexpected version. + die("Cluster upgrade needed but other nodes are " + "running ~p~n" + "and I want ~p", [ClusterVersion, MyVersion]) + end + end. + +am_i_disc_node() -> + %% The cluster config does not list all disc nodes, but it will list us + %% if we're one. + case rabbit_mnesia:read_cluster_nodes_config() of + [] -> true; + DiscNodes -> lists:member(node(), DiscNodes) + end. -primary_upgrade(Upgrades, DiscNodes) -> - Others = DiscNodes -- [node()], +die(Msg, Args) -> + %% We don't throw or exit here since that gets thrown + %% straight out into do_boot, generating an erl_crash.dump + %% and displaying any error message in a confusing way. + error_logger:error_msg(Msg, Args), + io:format("~n~n** " ++ Msg ++ " **~n~n~n", Args), + error_logger:logfile(close), + halt(1). + +primary_upgrade(Upgrades, Nodes) -> + Others = Nodes -- [node()], apply_upgrades( mnesia, Upgrades, @@ -87,26 +131,15 @@ primary_upgrade(Upgrades, DiscNodes) -> force_tables() -> [mnesia:force_load_table(T) || T <- rabbit_mnesia:table_names()]. -non_primary_upgrade(Upgrader, DiscNodes) -> - case node_running(Upgrader) of - false -> - Msg = "~n~n * Cluster upgrade needed. Please start node ~s " - "first. * ~n~n~n", - Args = [Upgrader], - %% We don't throw or exit here since that gets thrown - %% straight out into do_boot, generating an erl_crash.dump - %% and displaying any error message in a confusing way. - error_logger:error_msg(Msg, Args), - io:format(Msg, Args), - error_logger:logfile(close), - halt(1); - true -> - rabbit_misc:ensure_ok(mnesia:delete_schema([node()]), - cannot_delete_schema), - ok = rabbit_mnesia:create_cluster_nodes_config(DiscNodes), - write_version(mnesia), - ok - end. +non_primary_upgrade(Nodes) -> + rabbit_misc:ensure_ok(mnesia:delete_schema([node()]), + cannot_delete_schema), + ok = rabbit_mnesia:create_cluster_nodes_config(Nodes), + write_version(mnesia), + ok. + +nodes_running(Nodes) -> + [N || N <- Nodes, node_running(N)]. node_running(Node) -> case rpc:call(Node, application, which_applications, []) of -- cgit v1.2.1 From 81fd88b601cfa099f052f4270317248c6f870e72 Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Wed, 12 Jan 2011 12:43:25 +0000 Subject: Remove pointless differences from default. --- src/rabbit_mnesia.erl | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index 47e68c87..6523a036 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -44,8 +44,6 @@ %% other mnesia-using Erlang applications, such as ejabberd -export([create_tables/0]). --define(EXAMPLE_RABBIT_TABLE, rabbit_durable_exchange). - -include("rabbit.hrl"). %%---------------------------------------------------------------------------- @@ -164,7 +162,7 @@ nodes_of_type(Type) -> %% Specifically, we check whether a certain table, which we know %% will be written to disk on a disc node, is stored on disk or in %% RAM. - mnesia:table_info(?EXAMPLE_RABBIT_TABLE, Type). + mnesia:table_info(rabbit_durable_exchange, Type). table_definitions() -> [{rabbit_user, @@ -401,7 +399,6 @@ init_db(ClusterNodes, Force) -> %% Subsequent node in cluster, catch up ensure_version_ok( rpc:call(AnotherNode, rabbit_upgrade, read_version, [])), - ok = wait_for_tables(), IsDiskNode = ClusterNodes == [] orelse lists:member(node(), ClusterNodes), ok = wait_for_replicated_tables(), -- cgit v1.2.1 From d19649eec7e0eb34fbf16b906d36a713b9737c5b Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Wed, 12 Jan 2011 12:54:12 +0000 Subject: Detect old-style versions properly. --- src/rabbit_upgrade.erl | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl index dcbffd03..a570df4a 100644 --- a/src/rabbit_upgrade.erl +++ b/src/rabbit_upgrade.erl @@ -159,9 +159,9 @@ maybe_upgrade(Scope) -> read_version() -> case rabbit_misc:read_term_file(schema_filename()) of - {ok, [V]} -> case orddict:find(mnesia, V) of - error -> {ok, convert_old_version(V)}; - _ -> {ok, V} + {ok, [V]} -> case is_orddict(V) of + false -> {ok, convert_old_version(V)}; + true -> {ok, V} end; {error, _} = Err -> Err end. @@ -315,3 +315,9 @@ lock_filename(Dir) -> filename:join(Dir, ?LOCK_FILENAME). %% NB: we cannot use rabbit_log here since it may not have been %% started yet info(Msg, Args) -> error_logger:info_msg(Msg, Args). + +%% This doesn't check it's ordered but that's not needed for our purposes +is_orddict(Thing) -> + is_list(Thing) andalso + lists:all(fun(Item) -> is_tuple(Item) andalso size(Item) == 2 end, + Thing). -- cgit v1.2.1 From 87e8c18729974033ecef50a6b91b336e04189a15 Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Wed, 12 Jan 2011 13:05:53 +0000 Subject: And fix again. --- src/rabbit_upgrade.erl | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl index a570df4a..4bf8d661 100644 --- a/src/rabbit_upgrade.erl +++ b/src/rabbit_upgrade.erl @@ -159,7 +159,7 @@ maybe_upgrade(Scope) -> read_version() -> case rabbit_misc:read_term_file(schema_filename()) of - {ok, [V]} -> case is_orddict(V) of + {ok, [V]} -> case is_new_version(V) of false -> {ok, convert_old_version(V)}; true -> {ok, V} end; @@ -316,8 +316,8 @@ lock_filename(Dir) -> filename:join(Dir, ?LOCK_FILENAME). %% started yet info(Msg, Args) -> error_logger:info_msg(Msg, Args). -%% This doesn't check it's ordered but that's not needed for our purposes -is_orddict(Thing) -> - is_list(Thing) andalso +is_new_version(Version) -> + is_list(Version) andalso + length(Version) > 0 andalso lists:all(fun(Item) -> is_tuple(Item) andalso size(Item) == 2 end, - Thing). + Version). -- cgit v1.2.1 From 3821445acd31339a98af2ab0508f092ec06332d2 Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Wed, 12 Jan 2011 13:31:41 +0000 Subject: Don't display a cluster-related message on a single node. --- src/rabbit_upgrade.erl | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl index 4bf8d661..2c4dad87 100644 --- a/src/rabbit_upgrade.erl +++ b/src/rabbit_upgrade.erl @@ -120,11 +120,16 @@ primary_upgrade(Upgrades, Nodes) -> mnesia, Upgrades, fun () -> - info("mnesia upgrades: Breaking cluster~n", []), rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia), force_tables(), - [{atomic, ok} = mnesia:del_table_copy(schema, Node) - || Node <- Others] + case Others of + [] -> + ok; + _ -> + info("mnesia upgrades: Breaking cluster~n", []), + [{atomic, ok} = mnesia:del_table_copy(schema, Node) + || Node <- Others] + end end), ok. -- cgit v1.2.1 From e45219e2eea0ef94646518a122dedf6f39fadc2f Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Wed, 12 Jan 2011 14:03:49 +0000 Subject: Break the bad news rather than just timing out wait_for_tables as we traditionally have done. --- src/rabbit_upgrade.erl | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl index 2c4dad87..53ed99d3 100644 --- a/src/rabbit_upgrade.erl +++ b/src/rabbit_upgrade.erl @@ -52,9 +52,9 @@ maybe_upgrade_mnesia() -> rabbit:prepare(), + Nodes = rabbit_mnesia:all_clustered_nodes(), case upgrades_required(mnesia) of [_|_] = Upgrades -> - Nodes = rabbit_mnesia:all_clustered_nodes(), case am_i_upgrader(Nodes) of true -> primary_upgrade(Upgrades, Nodes); false -> non_primary_upgrade(Nodes) @@ -62,7 +62,15 @@ maybe_upgrade_mnesia() -> [] -> ok; version_not_available -> - ok + case Nodes of + [_] -> + ok; + _ -> + die("Cluster upgrade needed but upgrading from < 2.1.1.~n" + " Unfortunately you will need to rebuild the " + "cluster.", + []) + end end. am_i_upgrader(Nodes) -> -- cgit v1.2.1 From 90d3914c6aab0b510c42000d00615d5c51ec4345 Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Wed, 12 Jan 2011 14:43:32 +0000 Subject: Cosmetic. --- src/rabbit_mnesia.erl | 3 ++- src/rabbit_upgrade.erl | 34 ++++++++++++++-------------------- 2 files changed, 16 insertions(+), 21 deletions(-) diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index 6523a036..ee6ede35 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -408,7 +408,8 @@ init_db(ClusterNodes, Force) -> false -> ram end), case rabbit_upgrade:maybe_upgrade(local) of - ok -> ok; + ok -> + ok; %% If we're just starting up a new node we won't have %% a version version_not_available -> diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl index 53ed99d3..6df881fd 100644 --- a/src/rabbit_upgrade.erl +++ b/src/rabbit_upgrade.erl @@ -63,13 +63,10 @@ maybe_upgrade_mnesia() -> ok; version_not_available -> case Nodes of - [_] -> - ok; - _ -> - die("Cluster upgrade needed but upgrading from < 2.1.1.~n" - " Unfortunately you will need to rebuild the " - "cluster.", - []) + [_] -> ok; + _ -> die("Cluster upgrade needed but upgrading from " + "< 2.1.1.~n Unfortunately you will need to " + "rebuild the cluster.", []) end end. @@ -78,11 +75,10 @@ am_i_upgrader(Nodes) -> case Running of [] -> case am_i_disc_node() of - true -> - true; - false -> - die("Cluster upgrade needed but this is a ram node.~n " - "Please start any of the disc nodes first.", []) + true -> true; + false -> die("Cluster upgrade needed but this is a ram " + "node.~n Please start any of the disc nodes " + "first.", []) end; [Another|_] -> ClusterVersion = @@ -100,8 +96,8 @@ am_i_upgrader(Nodes) -> MyVersion -> %% The other node(s) are running an unexpected version. die("Cluster upgrade needed but other nodes are " - "running ~p~n" - "and I want ~p", [ClusterVersion, MyVersion]) + "running ~p~nand I want ~p", + [ClusterVersion, MyVersion]) end end. @@ -131,12 +127,10 @@ primary_upgrade(Upgrades, Nodes) -> rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia), force_tables(), case Others of - [] -> - ok; - _ -> - info("mnesia upgrades: Breaking cluster~n", []), - [{atomic, ok} = mnesia:del_table_copy(schema, Node) - || Node <- Others] + [] -> ok; + _ -> info("mnesia upgrades: Breaking cluster~n", []), + [{atomic, ok} = mnesia:del_table_copy(schema, Node) + || Node <- Others] end end), ok. -- cgit v1.2.1 From 5f6b9f8881f55d67775df4db00cb513a037d649d Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Wed, 26 Jan 2011 12:32:34 +0000 Subject: Change the new version format from: [{local, [...]}, {mnesia, [...]}]. to: [{rabbit, [{local, [...]}, {mnesia, [...]}]}]. This is to allow for future work allowing plugins to own upgrades (that can be ignored if the plugin is uninstalled), without having to change the format *again*. --- src/rabbit_upgrade.erl | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl index b222845d..f279029a 100644 --- a/src/rabbit_upgrade.erl +++ b/src/rabbit_upgrade.erl @@ -163,7 +163,8 @@ read_version() -> case rabbit_misc:read_term_file(schema_filename()) of {ok, [V]} -> case is_new_version(V) of false -> {ok, convert_old_version(V)}; - true -> {ok, V} + true -> [{rabbit, RV}] = V, + {ok, RV} end; {error, _} = Err -> Err end. @@ -175,13 +176,14 @@ read_version(Scope) -> end. write_version() -> - ok = rabbit_misc:write_term_file(schema_filename(), [desired_version()]), + ok = rabbit_misc:write_term_file(schema_filename(), + [[{rabbit, desired_version()}]]), ok. write_version(Scope) -> {ok, V0} = read_version(), V = orddict:store(Scope, desired_version(Scope), V0), - ok = rabbit_misc:write_term_file(schema_filename(), [V]), + ok = rabbit_misc:write_term_file(schema_filename(), [[{rabbit, V}]]), ok. desired_version() -> -- cgit v1.2.1 From caea05b408f238891410107431b3b0994e02ae66 Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Mon, 7 Feb 2011 16:05:02 +0000 Subject: Just depend on "erlang". --- packaging/debs/Debian/debian/control | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/packaging/debs/Debian/debian/control b/packaging/debs/Debian/debian/control index 02da0cc6..b01d38b3 100644 --- a/packaging/debs/Debian/debian/control +++ b/packaging/debs/Debian/debian/control @@ -7,10 +7,7 @@ Standards-Version: 3.8.0 Package: rabbitmq-server Architecture: all -# erlang-inets is not a strict dependency, but it's needed to allow -# the installation of plugins that use mochiweb. Ideally it would be a -# "Recommends" instead, but gdebi does not install those. -Depends: erlang-base (>= 1:12.b.3) | erlang-base-hipe (>= 1:12.b.3), erlang-ssl | erlang-nox (<< 1:13.b-dfsg1-1), erlang-os-mon | erlang-nox (<< 1:13.b-dfsg1-1), erlang-mnesia | erlang-nox (<< 1:13.b-dfsg1-1), erlang-inets | erlang-nox (<< 1:13.b-dfsg1-1), adduser, logrotate, ${misc:Depends} +Depends: erlang (>= 1:12.b.3), adduser, logrotate, ${misc:Depends} Description: An AMQP server written in Erlang RabbitMQ is an implementation of AMQP, the emerging standard for high performance enterprise messaging. The RabbitMQ server is a robust and -- cgit v1.2.1 From 0b4ffb33067b778ebbe30fd2c4b0b9f9160c18c3 Mon Sep 17 00:00:00 2001 From: Matthew Sackman Date: Mon, 14 Feb 2011 13:08:43 +0000 Subject: Be explicit where we can be --- src/rabbit_mnesia.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index f7befebc..e7da6a43 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -373,7 +373,7 @@ init_db(ClusterNodes, Force) -> {[], false} -> %% Nothing there at all, start from scratch ok = create_schema(); - {[], _} -> + {[], true} -> %% We're the first node up ok = wait_for_tables(), case rabbit_upgrade:maybe_upgrade(local) of -- cgit v1.2.1 From 1c45e13da167b1cc01992521089efda440d29f65 Mon Sep 17 00:00:00 2001 From: Matthew Sackman Date: Mon, 14 Feb 2011 13:39:05 +0000 Subject: Cosmetic --- src/rabbit_upgrade.erl | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl index f279029a..bd3e829c 100644 --- a/src/rabbit_upgrade.erl +++ b/src/rabbit_upgrade.erl @@ -49,19 +49,19 @@ maybe_upgrade_mnesia() -> rabbit:prepare(), Nodes = rabbit_mnesia:all_clustered_nodes(), case upgrades_required(mnesia) of - [_|_] = Upgrades -> - case am_i_upgrader(Nodes) of - true -> primary_upgrade(Upgrades, Nodes); - false -> non_primary_upgrade(Nodes) - end; - [] -> - ok; version_not_available -> case Nodes of [_] -> ok; _ -> die("Cluster upgrade needed but upgrading from " "< 2.1.1.~n Unfortunately you will need to " "rebuild the cluster.", []) + end; + [] -> + ok; + Upgrades -> + case am_i_upgrader(Nodes) of + true -> primary_upgrade(Upgrades, Nodes); + false -> non_primary_upgrade(Nodes) end end. -- cgit v1.2.1 From 650217882d88c150663e17b8e5a9a8ce4f59f9a4 Mon Sep 17 00:00:00 2001 From: Matthew Sackman Date: Mon, 14 Feb 2011 13:59:39 +0000 Subject: inlining --- src/rabbit_upgrade.erl | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl index bd3e829c..c8d2ae87 100644 --- a/src/rabbit_upgrade.erl +++ b/src/rabbit_upgrade.erl @@ -66,8 +66,7 @@ maybe_upgrade_mnesia() -> end. am_i_upgrader(Nodes) -> - Running = nodes_running(Nodes), - case Running of + case nodes_running(Nodes) of [] -> case am_i_disc_node() of true -> true; -- cgit v1.2.1 From c34c4592aaf59da9771a6a9a51de076d11da90a8 Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Wed, 16 Feb 2011 16:46:39 +0000 Subject: Revert where rabbit:prepare happens. --- src/rabbit_prelaunch.erl | 2 ++ src/rabbit_upgrade.erl | 1 - 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/src/rabbit_prelaunch.erl b/src/rabbit_prelaunch.erl index 612aec80..3283e8fd 100644 --- a/src/rabbit_prelaunch.erl +++ b/src/rabbit_prelaunch.erl @@ -235,6 +235,8 @@ post_process_script(ScriptFile) -> {error, {failed_to_load_script, Reason}} end. +process_entry(Entry = {apply,{application,start_boot,[rabbit,permanent]}}) -> + [{apply,{rabbit,prepare,[]}}, Entry]; process_entry(Entry = {apply,{application,start_boot,[mnesia,permanent]}}) -> [{apply,{rabbit_upgrade,maybe_upgrade_mnesia,[]}}, Entry]; process_entry(Entry) -> diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl index c8d2ae87..73b9bb0e 100644 --- a/src/rabbit_upgrade.erl +++ b/src/rabbit_upgrade.erl @@ -46,7 +46,6 @@ %% ------------------------------------------------------------------- maybe_upgrade_mnesia() -> - rabbit:prepare(), Nodes = rabbit_mnesia:all_clustered_nodes(), case upgrades_required(mnesia) of version_not_available -> -- cgit v1.2.1 From 6cae135624ca1ae276ec89066593fb11683021d5 Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Wed, 16 Feb 2011 16:49:07 +0000 Subject: Rename --- src/rabbit_upgrade.erl | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl index 73b9bb0e..da735b83 100644 --- a/src/rabbit_upgrade.erl +++ b/src/rabbit_upgrade.erl @@ -58,17 +58,17 @@ maybe_upgrade_mnesia() -> [] -> ok; Upgrades -> - case am_i_upgrader(Nodes) of - true -> primary_upgrade(Upgrades, Nodes); - false -> non_primary_upgrade(Nodes) + case upgrade_mode(Nodes) of + primary -> primary_upgrade(Upgrades, Nodes); + secondary -> non_primary_upgrade(Nodes) end end. -am_i_upgrader(Nodes) -> +upgrade_mode(Nodes) -> case nodes_running(Nodes) of [] -> case am_i_disc_node() of - true -> true; + true -> primary; false -> die("Cluster upgrade needed but this is a ram " "node.~n Please start any of the disc nodes " "first.", []) @@ -85,7 +85,7 @@ am_i_upgrader(Nodes) -> ClusterVersion -> %% The other node(s) have upgraded already, I am not the %% upgrader - false; + secondary; MyVersion -> %% The other node(s) are running an unexpected version. die("Cluster upgrade needed but other nodes are " -- cgit v1.2.1 From b978524f5b06030cda66634a9e17cdca7dcb4fb7 Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Wed, 16 Feb 2011 17:08:08 +0000 Subject: Prose --- src/rabbit_upgrade.erl | 41 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 41 insertions(+) diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl index da735b83..0fdb973b 100644 --- a/src/rabbit_upgrade.erl +++ b/src/rabbit_upgrade.erl @@ -45,6 +45,47 @@ %% ------------------------------------------------------------------- +%% The upgrade logic is quite involved, due to the existence of +%% clusters. +%% +%% Firstly, we have two different types of upgrades to do: Mnesia and +%% everythinq else. Mnesia upgrades need to only be done by one node +%% in the cluster (we treat a non-clustered node as a single-node +%% cluster). This is the primary upgrader. The other upgrades need to +%% be done by all nodes. +%% +%% The primary upgrader has to start first (and do its Mnesia +%% upgrades). Secondary upgraders need to reset their Mnesia database +%% and then rejoin the cluster. They can't do the Mnesia upgrades as +%% well and then merge databases since the cookie for each table will +%% end up different and the merge will fail. +%% +%% This in turn means that we need to determine whether we are the +%% primary or secondary upgrader *before* Mnesia comes up. If we +%% didn't then the secondary upgrader would try to start Mnesia, and +%% either hang waiting for a node which is not yet up, or fail since +%% its schema differs from the other nodes in the cluster. +%% +%% Also, the primary upgrader needs to start Mnesia to do its +%% upgrades, but needs to forcibly load tables rather than wait for +%% them (in case it was not the last node to shut down, in which case +%% it would wait forever). +%% +%% This in turn means that maybe_upgrade_mnesia/0 has to be patched +%% into the boot process by prelaunch before the mnesia application is +%% started. By the time Mnesia is started the upgrades have happened +%% (on the primary), or Mnesia has been reset (on the secondary) and +%% rabbit_mnesia:init_db/2 can then make the node rejoin the clister +%% in the normal way. +%% +%% The non-mnesia upgrades are then triggered by +%% rabbit_mnesia:init_db/2. Of course, it's possible for a given +%% upgrade process to only require Mnesia upgrades, or only require +%% non-Mnesia upgrades. In the latter case no Mnesia resets and +%% reclusterings occur. + +%% ------------------------------------------------------------------- + maybe_upgrade_mnesia() -> Nodes = rabbit_mnesia:all_clustered_nodes(), case upgrades_required(mnesia) of -- cgit v1.2.1 From ea73a62e8b8a86883fb8683d7f61a5693a519f46 Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Wed, 16 Feb 2011 17:58:58 +0000 Subject: (Untested) Record the nodes that were up when we shut down. --- src/rabbit.erl | 1 + src/rabbit_mnesia.erl | 36 +++++++++++++++++++++++++++++++++++- src/rabbit_upgrade.erl | 46 +++++++++++++++++++++++++++++++++++++--------- 3 files changed, 73 insertions(+), 10 deletions(-) diff --git a/src/rabbit.erl b/src/rabbit.erl index 1beed5c1..ffb6610d 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -203,6 +203,7 @@ start() -> end. stop() -> + rabbit_mnesia:record_running_disc_nodes(), ok = rabbit_misc:stop_applications(?APPS). stop_and_halt() -> diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index e7da6a43..3f7fc0d8 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -21,7 +21,9 @@ cluster/1, force_cluster/1, reset/0, force_reset/0, is_clustered/0, running_clustered_nodes/0, all_clustered_nodes/0, empty_ram_only_tables/0, copy_db/1, - create_cluster_nodes_config/1, read_cluster_nodes_config/0]). + create_cluster_nodes_config/1, read_cluster_nodes_config/0, + record_running_disc_nodes/0, read_previous_run_disc_nodes/0, + delete_previous_run_disc_nodes/0, running_nodes_filename/0]). -export([table_names/0]). @@ -57,6 +59,10 @@ -spec(copy_db/1 :: (file:filename()) -> rabbit_types:ok_or_error(any())). -spec(create_cluster_nodes_config/1 :: ([node()]) -> 'ok'). -spec(read_cluster_nodes_config/0 :: () -> [node()]). +-spec(record_running_disc_nodes/0 :: () -> 'ok'). +-spec(read_previous_run_disc_nodes/0 :: () -> [node()]). +-spec(delete_previous_run_disc_nodes/0 :: () -> 'ok'). +-spec(running_nodes_filename/0 :: () -> file:filename()). -endif. @@ -349,6 +355,34 @@ delete_cluster_nodes_config() -> FileName, Reason}}) end. +running_nodes_filename() -> + dir() ++ "/nodes_running_at_shutdown". + +record_running_disc_nodes() -> + FileName = running_nodes_filename(), + Nodes = rabbit_mnesia:nodes_of_type(disc_copies) -- [node()], + %% Don't check the result: we're shutting down anyway and this is + %% a best-effort-basis. + rabbit_misc:write_term_file(FileName, [Nodes]). + +read_previous_run_disc_nodes() -> + FileName = running_nodes_filename(), + case rabbit_misc:read_term_file(FileName) of + {ok, [Nodes]} -> Nodes; + {error, enoent} -> []; + {error, Reason} -> throw({error, {cannot_read_previous_nodes_file, + FileName, Reason}}) + end. + +delete_previous_run_disc_nodes() -> + FileName = running_nodes_filename(), + case file:delete(FileName) of + ok -> ok; + {error, enoent} -> ok; + {error, Reason} -> throw({error, {cannot_delete_previous_nodes_file, + FileName, Reason}}) + end. + %% Take a cluster node config and create the right kind of node - a %% standalone disk node, or disk or ram node connected to the %% specified cluster nodes. If Force is false, don't allow diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl index 0fdb973b..23770686 100644 --- a/src/rabbit_upgrade.erl +++ b/src/rabbit_upgrade.erl @@ -49,8 +49,8 @@ %% clusters. %% %% Firstly, we have two different types of upgrades to do: Mnesia and -%% everythinq else. Mnesia upgrades need to only be done by one node -%% in the cluster (we treat a non-clustered node as a single-node +%% everythinq else. Mnesia upgrades must only be done by one node in +%% the cluster (we treat a non-clustered node as a single-node %% cluster). This is the primary upgrader. The other upgrades need to %% be done by all nodes. %% @@ -75,7 +75,7 @@ %% into the boot process by prelaunch before the mnesia application is %% started. By the time Mnesia is started the upgrades have happened %% (on the primary), or Mnesia has been reset (on the secondary) and -%% rabbit_mnesia:init_db/2 can then make the node rejoin the clister +%% rabbit_mnesia:init_db/2 can then make the node rejoin the cluster %% in the normal way. %% %% The non-mnesia upgrades are then triggered by @@ -83,6 +83,22 @@ %% upgrade process to only require Mnesia upgrades, or only require %% non-Mnesia upgrades. In the latter case no Mnesia resets and %% reclusterings occur. +%% +%% The primary upgrader needs to be a disc node. Ideally we would like +%% it to be the last disc node to shut down (since otherwise there's a +%% risk of data loss). On each node we therefore record the disc nodes +%% that were still running when we shut down. A disc node that knows +%% other nodes were up when it shut down, or a ram node, will refuse +%% to be the primary upgrader, and will thus not start when upgrades +%% are needed. +%% +%% However, this is racy if several nodes are shut down at once. Since +%% rabbit records the running nodes, and shuts down before mnesia, the +%% race manifests as all disc nodes thinking they are not the primary +%% upgrader. Therefore the user can remove the record of the last disc +%% node to shut down to get things going again. This may lose any +%% mnesia changes that happened after the node chosen as the primary +%% upgrader was shut down. %% ------------------------------------------------------------------- @@ -103,16 +119,28 @@ maybe_upgrade_mnesia() -> primary -> primary_upgrade(Upgrades, Nodes); secondary -> non_primary_upgrade(Nodes) end - end. + end, + ok = rabbit_mnesia:delete_previous_run_disc_nodes(). upgrade_mode(Nodes) -> case nodes_running(Nodes) of [] -> - case am_i_disc_node() of - true -> primary; - false -> die("Cluster upgrade needed but this is a ram " - "node.~n Please start any of the disc nodes " - "first.", []) + AfterUs = rabbit_mnesia:read_previous_run_disc_nodes(), + case {am_i_disc_node(), AfterUs} of + {true, []} -> + primary; + {true, _} -> + Filename = rabbit_mnesia:running_nodes_filename(), + die("Cluster upgrade needed but other disc nodes shut " + "down after this one.~n Please start one of the " + "disc nodes: ~p first.~n~n Note: if several disc " + "nodes were shut down simultaneously they may all " + "show this message. In which case, remove ~s on one " + "of them and start that.", [AfterUs, Filename]); + {false, _} -> + die("Cluster upgrade needed but this is a ram " + "node.~n Please start one of the disc nodes: " + "~p first.", [AfterUs]) end; [Another|_] -> ClusterVersion = -- cgit v1.2.1 From b30f89113d57a303c52739712408440d75605532 Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Thu, 17 Feb 2011 10:11:45 +0000 Subject: Oops, that's not exported. --- src/rabbit_mnesia.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index 3f7fc0d8..8acb0b02 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -360,7 +360,7 @@ running_nodes_filename() -> record_running_disc_nodes() -> FileName = running_nodes_filename(), - Nodes = rabbit_mnesia:nodes_of_type(disc_copies) -- [node()], + Nodes = nodes_of_type(disc_copies) -- [node()], %% Don't check the result: we're shutting down anyway and this is %% a best-effort-basis. rabbit_misc:write_term_file(FileName, [Nodes]). -- cgit v1.2.1 From 11a1cc6eac6bd4cde6bd971763348d0384ec2520 Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Thu, 17 Feb 2011 10:39:45 +0000 Subject: Fix our idea of which nodes were running when we shut down. --- src/rabbit_mnesia.erl | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index 8acb0b02..367eb6f8 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -360,7 +360,10 @@ running_nodes_filename() -> record_running_disc_nodes() -> FileName = running_nodes_filename(), - Nodes = nodes_of_type(disc_copies) -- [node()], + Nodes = sets:to_list( + sets:intersection( + sets:from_list(nodes_of_type(disc_copies)), + sets:from_list(running_clustered_nodes()))) -- [node()], %% Don't check the result: we're shutting down anyway and this is %% a best-effort-basis. rabbit_misc:write_term_file(FileName, [Nodes]). -- cgit v1.2.1 From a20039bacdf9f9ca06b82e7673a6a423318fb269 Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Thu, 17 Feb 2011 10:45:21 +0000 Subject: Make the error messages more readable. --- src/rabbit_upgrade.erl | 24 ++++++++++++++---------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl index 23770686..0c2e4bce 100644 --- a/src/rabbit_upgrade.erl +++ b/src/rabbit_upgrade.erl @@ -109,7 +109,7 @@ maybe_upgrade_mnesia() -> case Nodes of [_] -> ok; _ -> die("Cluster upgrade needed but upgrading from " - "< 2.1.1.~n Unfortunately you will need to " + "< 2.1.1.~nUnfortunately you will need to " "rebuild the cluster.", []) end; [] -> @@ -132,15 +132,19 @@ upgrade_mode(Nodes) -> {true, _} -> Filename = rabbit_mnesia:running_nodes_filename(), die("Cluster upgrade needed but other disc nodes shut " - "down after this one.~n Please start one of the " - "disc nodes: ~p first.~n~n Note: if several disc " - "nodes were shut down simultaneously they may all " - "show this message. In which case, remove ~s on one " - "of them and start that.", [AfterUs, Filename]); + "down after this one.~nPlease first start the last " + "disc node to shut down.~nThe disc nodes that were " + "still running when this one shut down are:~n~n" + " ~p~n~nNote: if several disc nodes were shut down " + "simultaneously they may all~nshow this message. " + "In which case, remove the lock file on one of them " + "and~nstart that node. The lock file on this node " + "is:~n~n ~s ", + [AfterUs, Filename]); {false, _} -> - die("Cluster upgrade needed but this is a ram " - "node.~n Please start one of the disc nodes: " - "~p first.", [AfterUs]) + die("Cluster upgrade needed but this is a ram node.~n" + "Please first start the last disc node to shut down.", + []) end; [Another|_] -> ClusterVersion = @@ -176,7 +180,7 @@ die(Msg, Args) -> %% straight out into do_boot, generating an erl_crash.dump %% and displaying any error message in a confusing way. error_logger:error_msg(Msg, Args), - io:format("~n~n** " ++ Msg ++ " **~n~n~n", Args), + io:format("~n~n****~n~n" ++ Msg ++ "~n~n****~n~n~n", Args), error_logger:logfile(close), halt(1). -- cgit v1.2.1 From f673f3919cad23798116ca2f63de64a5b36b03b4 Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Thu, 17 Feb 2011 10:58:18 +0000 Subject: Retain ram-nodeness when upgrading. --- src/rabbit_upgrade.erl | 25 +++++++++++++------------ 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl index 0c2e4bce..56dab3e9 100644 --- a/src/rabbit_upgrade.erl +++ b/src/rabbit_upgrade.erl @@ -103,10 +103,11 @@ %% ------------------------------------------------------------------- maybe_upgrade_mnesia() -> - Nodes = rabbit_mnesia:all_clustered_nodes(), + AllNodes = rabbit_mnesia:all_clustered_nodes(), + KnownDiscNodes = rabbit_mnesia:read_cluster_nodes_config(), case upgrades_required(mnesia) of version_not_available -> - case Nodes of + case AllNodes of [_] -> ok; _ -> die("Cluster upgrade needed but upgrading from " "< 2.1.1.~nUnfortunately you will need to " @@ -115,18 +116,18 @@ maybe_upgrade_mnesia() -> [] -> ok; Upgrades -> - case upgrade_mode(Nodes) of - primary -> primary_upgrade(Upgrades, Nodes); - secondary -> non_primary_upgrade(Nodes) + case upgrade_mode(AllNodes, KnownDiscNodes) of + primary -> primary_upgrade(Upgrades, AllNodes); + secondary -> secondary_upgrade(KnownDiscNodes) end end, ok = rabbit_mnesia:delete_previous_run_disc_nodes(). -upgrade_mode(Nodes) -> - case nodes_running(Nodes) of +upgrade_mode(AllNodes, KnownDiscNodes) -> + case nodes_running(AllNodes) of [] -> AfterUs = rabbit_mnesia:read_previous_run_disc_nodes(), - case {am_i_disc_node(), AfterUs} of + case {am_i_disc_node(KnownDiscNodes), AfterUs} of {true, []} -> primary; {true, _} -> @@ -167,10 +168,10 @@ upgrade_mode(Nodes) -> end end. -am_i_disc_node() -> +am_i_disc_node(KnownDiscNodes) -> %% The cluster config does not list all disc nodes, but it will list us %% if we're one. - case rabbit_mnesia:read_cluster_nodes_config() of + case KnownDiscNodes of [] -> true; DiscNodes -> lists:member(node(), DiscNodes) end. @@ -204,10 +205,10 @@ primary_upgrade(Upgrades, Nodes) -> force_tables() -> [mnesia:force_load_table(T) || T <- rabbit_mnesia:table_names()]. -non_primary_upgrade(Nodes) -> +secondary_upgrade(KnownDiscNodes) -> rabbit_misc:ensure_ok(mnesia:delete_schema([node()]), cannot_delete_schema), - ok = rabbit_mnesia:create_cluster_nodes_config(Nodes), + ok = rabbit_mnesia:create_cluster_nodes_config(KnownDiscNodes), write_version(mnesia), ok. -- cgit v1.2.1 From a25d080a27495b7306a282086e3e2c1ccb7d86be Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Tue, 22 Feb 2011 11:15:28 +0000 Subject: Make sure logging is working if we're about to actually do something. --- src/rabbit_upgrade.erl | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl index 9f33fd03..dd19de19 100644 --- a/src/rabbit_upgrade.erl +++ b/src/rabbit_upgrade.erl @@ -107,6 +107,7 @@ maybe_upgrade_mnesia() -> KnownDiscNodes = rabbit_mnesia:read_cluster_nodes_config(), case upgrades_required(mnesia) of version_not_available -> + rabbit:prepare(), %% Ensure we have logs for this case AllNodes of [_] -> ok; _ -> die("Cluster upgrade needed but upgrading from " @@ -116,6 +117,7 @@ maybe_upgrade_mnesia() -> [] -> ok; Upgrades -> + rabbit:prepare(), %% Ensure we have logs for this case upgrade_mode(AllNodes, KnownDiscNodes) of primary -> primary_upgrade(Upgrades, AllNodes); secondary -> secondary_upgrade(KnownDiscNodes) -- cgit v1.2.1 From bd6e51846b5fbbb6d407f0f1482b054563e1cecc Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Tue, 22 Feb 2011 12:50:49 +0000 Subject: Don't look at the cluster config, it is not trustworthy (for what we want). --- src/rabbit_mnesia.erl | 2 +- src/rabbit_upgrade.erl | 33 +++++++++++++++++++-------------- 2 files changed, 20 insertions(+), 15 deletions(-) diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index 97c4d11e..68654e46 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -18,7 +18,7 @@ -module(rabbit_mnesia). -export([ensure_mnesia_dir/0, dir/0, status/0, init/0, is_db_empty/0, - cluster/1, force_cluster/1, reset/0, force_reset/0, + cluster/1, force_cluster/1, reset/0, force_reset/0, init_db/2, is_clustered/0, running_clustered_nodes/0, all_clustered_nodes/0, empty_ram_only_tables/0, copy_db/1, wait_for_tables/1, create_cluster_nodes_config/1, read_cluster_nodes_config/0, diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl index dd19de19..f1f0d6d3 100644 --- a/src/rabbit_upgrade.erl +++ b/src/rabbit_upgrade.erl @@ -104,7 +104,6 @@ maybe_upgrade_mnesia() -> AllNodes = rabbit_mnesia:all_clustered_nodes(), - KnownDiscNodes = rabbit_mnesia:read_cluster_nodes_config(), case upgrades_required(mnesia) of version_not_available -> rabbit:prepare(), %% Ensure we have logs for this @@ -118,18 +117,18 @@ maybe_upgrade_mnesia() -> ok; Upgrades -> rabbit:prepare(), %% Ensure we have logs for this - case upgrade_mode(AllNodes, KnownDiscNodes) of + case upgrade_mode(AllNodes) of primary -> primary_upgrade(Upgrades, AllNodes); - secondary -> secondary_upgrade(KnownDiscNodes) + secondary -> secondary_upgrade(AllNodes) end end, ok = rabbit_mnesia:delete_previous_run_disc_nodes(). -upgrade_mode(AllNodes, KnownDiscNodes) -> +upgrade_mode(AllNodes) -> case nodes_running(AllNodes) of [] -> AfterUs = rabbit_mnesia:read_previous_run_disc_nodes(), - case {am_i_disc_node(KnownDiscNodes), AfterUs} of + case {am_i_disc_node(), AfterUs} of {true, []} -> primary; {true, _} -> @@ -170,13 +169,11 @@ upgrade_mode(AllNodes, KnownDiscNodes) -> end end. -am_i_disc_node(KnownDiscNodes) -> - %% The cluster config does not list all disc nodes, but it will list us - %% if we're one. - case KnownDiscNodes of - [] -> true; - DiscNodes -> lists:member(node(), DiscNodes) - end. +am_i_disc_node() -> + %% This is pretty ugly but we can't start Mnesia and ask it (will hang), + %% we can't look at the config file (may not include us even if we're a + %% disc node). + filelib:is_regular(rabbit_mnesia:dir() ++ "/rabbit_durable_exchange.DCD"). die(Msg, Args) -> %% We don't throw or exit here since that gets thrown @@ -207,10 +204,18 @@ primary_upgrade(Upgrades, Nodes) -> force_tables() -> [mnesia:force_load_table(T) || T <- rabbit_mnesia:table_names()]. -secondary_upgrade(KnownDiscNodes) -> +secondary_upgrade(AllNodes) -> rabbit_misc:ensure_ok(mnesia:delete_schema([node()]), cannot_delete_schema), - ok = rabbit_mnesia:create_cluster_nodes_config(KnownDiscNodes), + %% Note that we cluster with all nodes, rather than all disc nodes + %% (as we can't know all disc nodes at this point). This is safe as + %% we're not writing the cluster config, just setting up Mnesia. + ClusterNodes = case am_i_disc_node() of + true -> AllNodes; + false -> AllNodes -- [node()] + end, + rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia), + rabbit_mnesia:init_db(ClusterNodes, true), write_version(mnesia), ok. -- cgit v1.2.1 From 13bbf692083e6ab07f771b797333f695dc18db32 Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Mon, 7 Mar 2011 18:00:11 +0000 Subject: Explain the tuple here. --- src/rabbit_upgrade.erl | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl index f1f0d6d3..dd253468 100644 --- a/src/rabbit_upgrade.erl +++ b/src/rabbit_upgrade.erl @@ -242,6 +242,8 @@ read_version() -> case rabbit_misc:read_term_file(schema_filename()) of {ok, [V]} -> case is_new_version(V) of false -> {ok, convert_old_version(V)}; + %% Write in this format for future expansion; + %% we want to allow plugins to own upgrades. true -> [{rabbit, RV}] = V, {ok, RV} end; -- cgit v1.2.1 From 4d806324368f812c4831724f868079f3a2835892 Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Mon, 7 Mar 2011 18:06:53 +0000 Subject: Variety of small QA-related tweaks. --- src/rabbit.erl | 2 +- src/rabbit_mnesia.erl | 7 ++++--- src/rabbit_upgrade.erl | 20 ++++++++++---------- 3 files changed, 15 insertions(+), 14 deletions(-) diff --git a/src/rabbit.erl b/src/rabbit.erl index 21c1452f..e3288eaf 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -204,7 +204,7 @@ start() -> end. stop() -> - rabbit_mnesia:record_running_disc_nodes(), + ok = rabbit_mnesia:record_running_disc_nodes(), ok = rabbit_misc:stop_applications(?APPS). stop_and_halt() -> diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index 33e8764c..30083cc0 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -387,7 +387,8 @@ record_running_disc_nodes() -> sets:from_list(running_clustered_nodes()))) -- [node()], %% Don't check the result: we're shutting down anyway and this is %% a best-effort-basis. - rabbit_misc:write_term_file(FileName, [Nodes]). + rabbit_misc:write_term_file(FileName, [Nodes]), + ok. read_previous_run_disc_nodes() -> FileName = running_nodes_filename(), @@ -433,7 +434,7 @@ init_db(ClusterNodes, Force) -> ok = create_schema(); {[], true} -> %% We're the first node up - case rabbit_upgrade:maybe_upgrade(local) of + case rabbit_upgrade:maybe_upgrade_local() of ok -> ensure_schema_integrity(); version_not_available -> schema_ok_or_move() end; @@ -449,7 +450,7 @@ init_db(ClusterNodes, Force) -> true -> disc; false -> ram end), - case rabbit_upgrade:maybe_upgrade(local) of + case rabbit_upgrade:maybe_upgrade_local() of ok -> ok; %% If we're just starting up a new node we won't have diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl index dd253468..e466eb87 100644 --- a/src/rabbit_upgrade.erl +++ b/src/rabbit_upgrade.erl @@ -16,7 +16,7 @@ -module(rabbit_upgrade). --export([maybe_upgrade_mnesia/0, maybe_upgrade/1]). +-export([maybe_upgrade_mnesia/0, maybe_upgrade_local/0]). -export([read_version/0, write_version/0, desired_version/0, desired_version/1]). @@ -35,7 +35,7 @@ -type(scope() :: 'mnesia' | 'local'). -spec(maybe_upgrade_mnesia/0 :: () -> 'ok'). --spec(maybe_upgrade/1 :: (scope()) -> 'ok' | 'version_not_available'). +-spec(maybe_upgrade_local/0 :: () -> 'ok' | 'version_not_available'). -spec(read_version/0 :: () -> rabbit_types:ok_or_error2(version(), any())). -spec(write_version/0 :: () -> 'ok'). -spec(desired_version/0 :: () -> version()). @@ -128,7 +128,7 @@ upgrade_mode(AllNodes) -> case nodes_running(AllNodes) of [] -> AfterUs = rabbit_mnesia:read_previous_run_disc_nodes(), - case {am_i_disc_node(), AfterUs} of + case {is_disc_node(), AfterUs} of {true, []} -> primary; {true, _} -> @@ -169,7 +169,7 @@ upgrade_mode(AllNodes) -> end end. -am_i_disc_node() -> +is_disc_node() -> %% This is pretty ugly but we can't start Mnesia and ask it (will hang), %% we can't look at the config file (may not include us even if we're a %% disc node). @@ -210,13 +210,13 @@ secondary_upgrade(AllNodes) -> %% Note that we cluster with all nodes, rather than all disc nodes %% (as we can't know all disc nodes at this point). This is safe as %% we're not writing the cluster config, just setting up Mnesia. - ClusterNodes = case am_i_disc_node() of + ClusterNodes = case is_disc_node() of true -> AllNodes; false -> AllNodes -- [node()] end, rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia), - rabbit_mnesia:init_db(ClusterNodes, true), - write_version(mnesia), + ok = rabbit_mnesia:init_db(ClusterNodes, true), + ok = write_version(mnesia), ok. nodes_running(Nodes) -> @@ -230,11 +230,11 @@ node_running(Node) -> %% ------------------------------------------------------------------- -maybe_upgrade(Scope) -> - case upgrades_required(Scope) of +maybe_upgrade_local() -> + case upgrades_required(local) of version_not_available -> version_not_available; [] -> ok; - Upgrades -> apply_upgrades(Scope, Upgrades, + Upgrades -> apply_upgrades(local, Upgrades, fun() -> ok end) end. -- cgit v1.2.1 From 165c1d3f25a44c91650556a68ba725239f1f8d12 Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Mon, 7 Mar 2011 18:21:36 +0000 Subject: Spec, rename functions. --- src/rabbit_mnesia.erl | 13 +++++++------ src/rabbit_upgrade.erl | 4 ++-- 2 files changed, 9 insertions(+), 8 deletions(-) diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index 30083cc0..eb92e9fe 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -22,8 +22,8 @@ is_clustered/0, running_clustered_nodes/0, all_clustered_nodes/0, empty_ram_only_tables/0, copy_db/1, wait_for_tables/1, create_cluster_nodes_config/1, read_cluster_nodes_config/0, - record_running_disc_nodes/0, read_previous_run_disc_nodes/0, - delete_previous_run_disc_nodes/0, running_nodes_filename/0]). + record_running_disc_nodes/0, read_previously_running_disc_nodes/0, + delete_previously_running_disc_nodes/0, running_nodes_filename/0]). -export([table_names/0]). @@ -45,6 +45,7 @@ -spec(dir/0 :: () -> file:filename()). -spec(ensure_mnesia_dir/0 :: () -> 'ok'). -spec(init/0 :: () -> 'ok'). +-spec(init_db/2 :: ([node()], boolean()) -> 'ok'). -spec(is_db_empty/0 :: () -> boolean()). -spec(cluster/1 :: ([node()]) -> 'ok'). -spec(force_cluster/1 :: ([node()]) -> 'ok'). @@ -61,8 +62,8 @@ -spec(create_cluster_nodes_config/1 :: ([node()]) -> 'ok'). -spec(read_cluster_nodes_config/0 :: () -> [node()]). -spec(record_running_disc_nodes/0 :: () -> 'ok'). --spec(read_previous_run_disc_nodes/0 :: () -> [node()]). --spec(delete_previous_run_disc_nodes/0 :: () -> 'ok'). +-spec(read_previously_running_disc_nodes/0 :: () -> [node()]). +-spec(delete_previously_running_disc_nodes/0 :: () -> 'ok'). -spec(running_nodes_filename/0 :: () -> file:filename()). -endif. @@ -390,7 +391,7 @@ record_running_disc_nodes() -> rabbit_misc:write_term_file(FileName, [Nodes]), ok. -read_previous_run_disc_nodes() -> +read_previously_running_disc_nodes() -> FileName = running_nodes_filename(), case rabbit_misc:read_term_file(FileName) of {ok, [Nodes]} -> Nodes; @@ -399,7 +400,7 @@ read_previous_run_disc_nodes() -> FileName, Reason}}) end. -delete_previous_run_disc_nodes() -> +delete_previously_running_disc_nodes() -> FileName = running_nodes_filename(), case file:delete(FileName) of ok -> ok; diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl index e466eb87..0a821878 100644 --- a/src/rabbit_upgrade.erl +++ b/src/rabbit_upgrade.erl @@ -122,12 +122,12 @@ maybe_upgrade_mnesia() -> secondary -> secondary_upgrade(AllNodes) end end, - ok = rabbit_mnesia:delete_previous_run_disc_nodes(). + ok = rabbit_mnesia:delete_previously_running_disc_nodes(). upgrade_mode(AllNodes) -> case nodes_running(AllNodes) of [] -> - AfterUs = rabbit_mnesia:read_previous_run_disc_nodes(), + AfterUs = rabbit_mnesia:read_previously_running_disc_nodes(), case {is_disc_node(), AfterUs} of {true, []} -> primary; -- cgit v1.2.1 From f6d550f49e9e90a551ecd20e80d405068db7d781 Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Mon, 7 Mar 2011 18:23:56 +0000 Subject: Simpler is_new_version/1. --- src/rabbit_upgrade.erl | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl index 0a821878..f59dbdfe 100644 --- a/src/rabbit_upgrade.erl +++ b/src/rabbit_upgrade.erl @@ -401,7 +401,8 @@ lock_filename(Dir) -> filename:join(Dir, ?LOCK_FILENAME). info(Msg, Args) -> error_logger:info_msg(Msg, Args). is_new_version(Version) -> - is_list(Version) andalso - length(Version) > 0 andalso - lists:all(fun(Item) -> is_tuple(Item) andalso size(Item) == 2 end, - Version). + try + orddict:size(Version) > 0 + catch error:badarg -> + false + end. -- cgit v1.2.1 From 41ce63c6dab98f8ce0b80bfcd52d86e1a53ef23d Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Tue, 8 Mar 2011 14:11:11 +0000 Subject: Don't change the version file format --- src/rabbit_upgrade.erl | 49 ++++++++++++++++++------------------------------- 1 file changed, 18 insertions(+), 31 deletions(-) diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl index f59dbdfe..8113bad8 100644 --- a/src/rabbit_upgrade.erl +++ b/src/rabbit_upgrade.erl @@ -240,50 +240,44 @@ maybe_upgrade_local() -> read_version() -> case rabbit_misc:read_term_file(schema_filename()) of - {ok, [V]} -> case is_new_version(V) of - false -> {ok, convert_old_version(V)}; - %% Write in this format for future expansion; - %% we want to allow plugins to own upgrades. - true -> [{rabbit, RV}] = V, - {ok, RV} - end; + {ok, [V]} -> {ok, V}; {error, _} = Err -> Err end. read_version(Scope) -> case read_version() of {error, _} = E -> E; - {ok, V} -> {ok, orddict:fetch(Scope, V)} + {ok, V} -> {ok, filter_by_scope(Scope, V)} end. write_version() -> - ok = rabbit_misc:write_term_file(schema_filename(), - [[{rabbit, desired_version()}]]), + ok = rabbit_misc:write_term_file(schema_filename(), [desired_version()]), ok. write_version(Scope) -> {ok, V0} = read_version(), - V = orddict:store(Scope, desired_version(Scope), V0), - ok = rabbit_misc:write_term_file(schema_filename(), [[{rabbit, V}]]), + V = flatten([case S of + Scope -> desired_version(S); + _ -> filter_by_scope(S, V0) + end || S <- ?SCOPES]), + ok = rabbit_misc:write_term_file(schema_filename(), [V]), ok. desired_version() -> - lists:foldl( - fun (Scope, Acc) -> - orddict:store(Scope, desired_version(Scope), Acc) - end, - orddict:new(), ?SCOPES). + flatten([desired_version(Scope) || Scope <- ?SCOPES]). desired_version(Scope) -> with_upgrade_graph(fun (G) -> heads(G) end, Scope). -convert_old_version(Heads) -> - Locals = [add_queue_ttl], - V0 = orddict:new(), - V1 = orddict:store(mnesia, Heads -- Locals, V0), - orddict:store(local, - lists:filter(fun(H) -> lists:member(H, Locals) end, Heads), - V1). +flatten(LoL) -> + lists:sort(lists:flatten(LoL)). + +filter_by_scope(Scope, Versions) -> + with_upgrade_graph( + fun(G) -> + ScopeVs = digraph:vertices(G), + [V || V <- Versions, lists:member(V, ScopeVs)] + end, Scope). %% ------------------------------------------------------------------- @@ -399,10 +393,3 @@ lock_filename(Dir) -> filename:join(Dir, ?LOCK_FILENAME). %% NB: we cannot use rabbit_log here since it may not have been %% started yet info(Msg, Args) -> error_logger:info_msg(Msg, Args). - -is_new_version(Version) -> - try - orddict:size(Version) > 0 - catch error:badarg -> - false - end. -- cgit v1.2.1 From b610ff9f65d1289904f34a49a242fe843db094cc Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Tue, 8 Mar 2011 14:19:40 +0000 Subject: Use lists:append/1. --- src/rabbit_upgrade.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl index 8113bad8..1284d229 100644 --- a/src/rabbit_upgrade.erl +++ b/src/rabbit_upgrade.erl @@ -270,7 +270,7 @@ desired_version(Scope) -> with_upgrade_graph(fun (G) -> heads(G) end, Scope). flatten(LoL) -> - lists:sort(lists:flatten(LoL)). + lists:sort(lists:append(LoL)). filter_by_scope(Scope, Versions) -> with_upgrade_graph( -- cgit v1.2.1 From 22007275cb3d133e047a291510d716b23fe05dfb Mon Sep 17 00:00:00 2001 From: Matthew Sackman Date: Wed, 9 Mar 2011 11:17:27 +0000 Subject: Correct upgrade step --- src/rabbit_variable_queue.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index d1307b85..c75ecf86 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -294,7 +294,7 @@ %%---------------------------------------------------------------------------- --rabbit_upgrade({multiple_routing_keys, []}). +-rabbit_upgrade({multiple_routing_keys, local, []}). -ifdef(use_specs). -- cgit v1.2.1 From cebd128e876c49e6d7e91da3ccc10aba1bb3c5b3 Mon Sep 17 00:00:00 2001 From: Rob Harrop Date: Fri, 11 Mar 2011 14:06:21 +0000 Subject: Add timestamps to error_logger messages --- src/rabbit_error_logger.erl | 3 ++- src/rabbit_misc.erl | 8 +++++++- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/src/rabbit_error_logger.erl b/src/rabbit_error_logger.erl index 0120f0d6..33dfcef9 100644 --- a/src/rabbit_error_logger.erl +++ b/src/rabbit_error_logger.erl @@ -69,6 +69,7 @@ publish(_Other, _Format, _Data, _State) -> publish1(RoutingKey, Format, Data, LogExch) -> {ok, _RoutingRes, _DeliveredQPids} = rabbit_basic:publish(LogExch, RoutingKey, false, false, none, - #'P_basic'{content_type = <<"text/plain">>}, + #'P_basic'{content_type = <<"text/plain">>, + timestamp = rabbit_misc:timestamp()}, list_to_binary(io_lib:format(Format, Data))), ok. diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index e79a58a1..713498c8 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -52,7 +52,7 @@ unlink_and_capture_exit/1]). -export([get_options/2]). -export([all_module_attributes/1, build_acyclic_graph/3]). --export([now_ms/0]). +-export([now_ms/0, timestamp/0]). -export([lock_file/1]). -export([const_ok/1, const/1]). -export([ntoa/1, ntoab/1]). @@ -190,6 +190,7 @@ {bad_edge, [digraph:vertex()]}), digraph:vertex(), digraph:vertex()})). -spec(now_ms/0 :: () -> non_neg_integer()). +-spec(timestamp/0 ::() -> non_neg_integer()). -spec(lock_file/1 :: (file:filename()) -> rabbit_types:ok_or_error('eexist')). -spec(const_ok/1 :: (any()) -> 'ok'). -spec(const/1 :: (A) -> const(A)). @@ -199,6 +200,7 @@ -endif. +-define(EPOCH, {{1970, 1, 1}, {0, 0, 0}}). %%---------------------------------------------------------------------------- method_record_type(Record) -> @@ -791,6 +793,10 @@ get_flag(_, []) -> now_ms() -> timer:now_diff(now(), {0,0,0}) div 1000. +timestamp() -> + calendar:datetime_to_gregorian_seconds(erlang:universaltime()) - + calendar:datetime_to_gregorian_seconds(?EPOCH). + module_attributes(Module) -> case catch Module:module_info(attributes) of {'EXIT', {undef, [{Module, module_info, _} | _]}} -> -- cgit v1.2.1 From ae2e8ee3a60753439654ea6feef90ca7df3a3096 Mon Sep 17 00:00:00 2001 From: Matthew Sackman Date: Wed, 16 Mar 2011 17:30:30 +0000 Subject: Abstract and rewrite schema_version handling functions --- src/rabbit_mnesia.erl | 18 +++++---- src/rabbit_upgrade.erl | 96 ++++++++++++++------------------------------- src/rabbit_version.erl | 103 +++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 143 insertions(+), 74 deletions(-) create mode 100644 src/rabbit_version.erl diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index e61f5fce..fa442c9c 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -442,7 +442,7 @@ init_db(ClusterNodes, Force) -> {[AnotherNode|_], _} -> %% Subsequent node in cluster, catch up ensure_version_ok( - rpc:call(AnotherNode, rabbit_upgrade, read_version, [])), + rpc:call(AnotherNode, rabbit_version, read, [])), IsDiskNode = ClusterNodes == [] orelse lists:member(node(), ClusterNodes), ok = wait_for_replicated_tables(), @@ -457,7 +457,8 @@ init_db(ClusterNodes, Force) -> %% If we're just starting up a new node we won't have %% a version version_not_available -> - ok = rabbit_upgrade:write_version() + ok = rabbit_version:write( + rabbit_upgrade:desired_version()) end, ensure_schema_integrity() end; @@ -484,13 +485,14 @@ schema_ok_or_move() -> end. ensure_version_ok({ok, DiscVersion}) -> - case rabbit_upgrade:desired_version() of - DiscVersion -> ok; - DesiredVersion -> throw({error, {schema_mismatch, - DesiredVersion, DiscVersion}}) + DesiredVersion = rabbit_upgrade:desired_version(), + case rabbit_version:'=~='(DesiredVersion, DiscVersion) of + true -> ok; + false -> throw({error, {schema_mismatch, + DesiredVersion, DiscVersion}}) end; ensure_version_ok({error, _}) -> - ok = rabbit_upgrade:write_version(). + ok = rabbit_version:write(rabbit_upgrade:desired_version()). create_schema() -> mnesia:stop(), @@ -500,7 +502,7 @@ create_schema() -> cannot_start_mnesia), ok = create_tables(), ok = ensure_schema_integrity(), - ok = rabbit_upgrade:write_version(). + ok = rabbit_version:write(rabbit_upgrade:desired_version()). move_db() -> mnesia:stop(), diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl index f1134cfa..7a4a4fd8 100644 --- a/src/rabbit_upgrade.erl +++ b/src/rabbit_upgrade.erl @@ -17,8 +17,7 @@ -module(rabbit_upgrade). -export([maybe_upgrade_mnesia/0, maybe_upgrade_local/0]). --export([read_version/0, write_version/0, desired_version/0, - desired_version/1]). +-export([desired_version/0]). -include("rabbit.hrl"). @@ -30,16 +29,9 @@ -ifdef(use_specs). --type(step() :: atom()). --type(version() :: [{scope(), [step()]}]). --type(scope() :: 'mnesia' | 'local'). - -spec(maybe_upgrade_mnesia/0 :: () -> 'ok'). -spec(maybe_upgrade_local/0 :: () -> 'ok' | 'version_not_available'). --spec(read_version/0 :: () -> rabbit_types:ok_or_error2(version(), any())). --spec(write_version/0 :: () -> 'ok'). --spec(desired_version/0 :: () -> version()). --spec(desired_version/1 :: (scope()) -> [step()]). +-spec(desired_version/0 :: () -> rabbit_version:version()). -endif. @@ -173,7 +165,7 @@ is_disc_node() -> %% This is pretty ugly but we can't start Mnesia and ask it (will hang), %% we can't look at the config file (may not include us even if we're a %% disc node). - filelib:is_regular(rabbit_mnesia:dir() ++ "/rabbit_durable_exchange.DCD"). + filelib:is_regular(filename:join(dir(), "rabbit_durable_exchange.DCD")). die(Msg, Args) -> %% We don't throw or exit here since that gets thrown @@ -216,7 +208,7 @@ secondary_upgrade(AllNodes) -> end, rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia), ok = rabbit_mnesia:init_db(ClusterNodes, true), - ok = write_version(mnesia), + ok = write_desired_scope_version(mnesia), ok. nodes_running(Nodes) -> @@ -238,63 +230,37 @@ maybe_upgrade_local() -> fun() -> ok end) end. -read_version() -> - case rabbit_misc:read_term_file(schema_filename()) of - {ok, [V]} -> {ok, V}; - {error, _} = Err -> Err - end. - -read_version(Scope) -> - case read_version() of - {error, _} = E -> E; - {ok, V} -> {ok, filter_by_scope(Scope, V)} - end. - -write_version() -> - ok = rabbit_misc:write_term_file(schema_filename(), [desired_version()]), - ok. - -write_version(Scope) -> - {ok, V0} = read_version(), - V = flatten([case S of - Scope -> desired_version(S); - _ -> filter_by_scope(S, V0) - end || S <- ?SCOPES]), - ok = rabbit_misc:write_term_file(schema_filename(), [V]), - ok. - -desired_version() -> - flatten([desired_version(Scope) || Scope <- ?SCOPES]). +desired_version() -> [{Scope, desired_version(Scope)} || Scope <- ?SCOPES]. -desired_version(Scope) -> - with_upgrade_graph(fun (G) -> heads(G) end, Scope). +desired_version(Scope) -> with_upgrade_graph(fun (G) -> heads(G) end, Scope). -flatten(LoL) -> - lists:sort(lists:append(LoL)). - -filter_by_scope(Scope, Versions) -> - with_upgrade_graph( - fun(G) -> - ScopeVs = digraph:vertices(G), - [V || V <- Versions, lists:member(V, ScopeVs)] - end, Scope). +write_desired_scope_version(Scope) -> + ok = rabbit_version:with_scope_version( + Scope, + fun ({error, Error}) -> + throw({error, {can_not_read_version_to_write_it, Error}}) + end, + fun (_SV) -> {desired_version(Scope), ok} end). %% ------------------------------------------------------------------- upgrades_required(Scope) -> - case read_version(Scope) of - {ok, CurrentHeads} -> - with_upgrade_graph( - fun (G) -> - case unknown_heads(CurrentHeads, G) of - [] -> upgrades_to_apply(CurrentHeads, G); - Unknown -> throw({error, - {future_upgrades_found, Unknown}}) - end - end, Scope); - {error, enoent} -> - version_not_available - end. + rabbit_version:with_scope_version( + Scope, + fun ({error, enoent}) -> version_not_available end, + fun (CurrentHeads) -> + {CurrentHeads, + with_upgrade_graph( + fun (G) -> + case unknown_heads(CurrentHeads, G) of + [] -> + upgrades_to_apply(CurrentHeads, G); + Unknown -> + throw({error, + {future_upgrades_found, Unknown}}) + end + end, Scope)} + end). with_upgrade_graph(Fun, Scope) -> case rabbit_misc:build_acyclic_graph( @@ -363,7 +329,7 @@ apply_upgrades(Scope, Upgrades, Fun) -> [apply_upgrade(Scope, Upgrade) || Upgrade <- Upgrades], info("~s upgrades: All upgrades applied successfully~n", [Scope]), - ok = write_version(Scope), + ok = write_desired_scope_version(Scope), ok = rabbit_misc:recursive_delete([BackupDir]), info("~s upgrades: Mnesia backup removed~n", [Scope]), ok = file:delete(LockFile); @@ -386,8 +352,6 @@ apply_upgrade(Scope, {M, F}) -> dir() -> rabbit_mnesia:dir(). -schema_filename() -> filename:join(dir(), ?VERSION_FILENAME). - lock_filename(Dir) -> filename:join(Dir, ?LOCK_FILENAME). %% NB: we cannot use rabbit_log here since it may not have been diff --git a/src/rabbit_version.erl b/src/rabbit_version.erl new file mode 100644 index 00000000..c88d57fe --- /dev/null +++ b/src/rabbit_version.erl @@ -0,0 +1,103 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is VMware, Inc. +%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% + +-module(rabbit_version). + +-export([read/0, write/1, with_scope_version/3, '=~='/2]). + +%% ------------------------------------------------------------------- +-ifdef(use_specs). + +-export_type([step/0, version/0, scope/0]). + +-type(step() :: atom()). +-type(version() :: [{scope(), [step()]}]). +-type(scope() :: atom()). + +-spec(read/0 :: () -> rabbit_types:ok_or_error2(version(), any())). +-spec(write/1 :: (version()) -> 'ok'). +-spec(with_scope_version/3 :: + (scope(), + fun (({'error', any()}) -> E), + fun (([step()]) -> {[step()], A})) -> E | A). +-spec('=~='/2 :: (version(), version()) -> boolean()). + +-endif. +%% ------------------------------------------------------------------- + +-define(VERSION_FILENAME, "schema_version"). + +%% ------------------------------------------------------------------- + +read() -> + case rabbit_misc:read_term_file(schema_filename()) of + {ok, [V]} -> {ok, categorise_by_scope(V)}; + {error, _} = Err -> Err + end. + +write(Version) -> + V = [Name || {_Scope, Names} <- Version, Name <- Names], + ok = rabbit_misc:write_term_file(schema_filename(), [V]). + +with_scope_version(Scope, ErrorHandler, Fun) -> + case read() of + {error, _} = Err -> + ErrorHandler(Err); + {ok, Version} -> + SV = case lists:keysearch(Scope, 1, Version) of + false -> []; + {value, {Scope, SV1}} -> SV1 + end, + {SV2, Result} = Fun(SV), + ok = case SV =:= SV2 of + true -> ok; + false -> write(lists:keystore(Scope, 1, Version, + {Scope, SV2})) + end, + Result + end. + +'=~='(VerA, VerB) -> + matches(lists:usort(VerA), lists:usort(VerB)). + +%% ------------------------------------------------------------------- + +matches([], []) -> + true; +matches([{Scope, SV}|VerA], [{Scope, SV}|VerB]) -> + matches(VerA, VerB); +matches([{Scope, SVA}|VerA], [{Scope, SVB}|VerB]) -> + case {lists:usort(SVA), lists:usort(SVB)} of + {SV, SV} -> matches(VerA, VerB); + _ -> false + end; +matches(_VerA, _VerB) -> + false. + +categorise_by_scope(Heads) when is_list(Heads) -> + Categorised = + [{Scope, Name} || {_Module, Attributes} <- + rabbit_misc:all_module_attributes(rabbit_upgrade), + {Name, Scope, _Requires} <- Attributes, + lists:member(Name, Heads)], + orddict:to_list( + lists:foldl(fun ({Scope, Name}, Version) -> + rabbit_misc:orddict_cons(Scope, Name, Version) + end, orddict:new(), Categorised)). + +dir() -> rabbit_mnesia:dir(). + +schema_filename() -> filename:join(dir(), ?VERSION_FILENAME). -- cgit v1.2.1 From 7f05a48a9e4ebbd62d41e3be3d514f689318abeb Mon Sep 17 00:00:00 2001 From: Matthew Sackman Date: Wed, 16 Mar 2011 18:02:15 +0000 Subject: english --- src/rabbit_upgrade.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl index 7a4a4fd8..9b2ffa28 100644 --- a/src/rabbit_upgrade.erl +++ b/src/rabbit_upgrade.erl @@ -238,7 +238,7 @@ write_desired_scope_version(Scope) -> ok = rabbit_version:with_scope_version( Scope, fun ({error, Error}) -> - throw({error, {can_not_read_version_to_write_it, Error}}) + throw({error, {cannot_read_version_to_write_it, Error}}) end, fun (_SV) -> {desired_version(Scope), ok} end). -- cgit v1.2.1 From f10c0b62e57dd2e5d0f2dd877e03dfd699298cc9 Mon Sep 17 00:00:00 2001 From: Matthew Sackman Date: Wed, 16 Mar 2011 18:13:19 +0000 Subject: ordering --- src/rabbit_version.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/rabbit_version.erl b/src/rabbit_version.erl index c88d57fe..8c577f9c 100644 --- a/src/rabbit_version.erl +++ b/src/rabbit_version.erl @@ -23,9 +23,9 @@ -export_type([step/0, version/0, scope/0]). +-type(scope() :: atom()). -type(step() :: atom()). -type(version() :: [{scope(), [step()]}]). --type(scope() :: atom()). -spec(read/0 :: () -> rabbit_types:ok_or_error2(version(), any())). -spec(write/1 :: (version()) -> 'ok'). -- cgit v1.2.1 From 221433535cd1551a83132d0a8d46440dd12ea433 Mon Sep 17 00:00:00 2001 From: Matthew Sackman Date: Thu, 17 Mar 2011 00:52:11 +0000 Subject: incorporate qa feedback. The version.erl api is rather nice now: the version itself is entirely opaque - whilst it can be read, there's nothing provided to decompose it at all. --- src/rabbit_mnesia.erl | 9 ++- src/rabbit_upgrade.erl | 116 +++++++---------------------------- src/rabbit_version.erl | 160 +++++++++++++++++++++++++++++++++++-------------- 3 files changed, 139 insertions(+), 146 deletions(-) diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index fa442c9c..4902cfeb 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -457,8 +457,7 @@ init_db(ClusterNodes, Force) -> %% If we're just starting up a new node we won't have %% a version version_not_available -> - ok = rabbit_version:write( - rabbit_upgrade:desired_version()) + ok = rabbit_version:write_desired_version() end, ensure_schema_integrity() end; @@ -485,14 +484,14 @@ schema_ok_or_move() -> end. ensure_version_ok({ok, DiscVersion}) -> - DesiredVersion = rabbit_upgrade:desired_version(), + DesiredVersion = rabbit_version:desired_version(), case rabbit_version:'=~='(DesiredVersion, DiscVersion) of true -> ok; false -> throw({error, {schema_mismatch, DesiredVersion, DiscVersion}}) end; ensure_version_ok({error, _}) -> - ok = rabbit_version:write(rabbit_upgrade:desired_version()). + ok = rabbit_version:write_desired_version(). create_schema() -> mnesia:stop(), @@ -502,7 +501,7 @@ create_schema() -> cannot_start_mnesia), ok = create_tables(), ok = ensure_schema_integrity(), - ok = rabbit_version:write(rabbit_upgrade:desired_version()). + ok = rabbit_version:write_desired_version(). move_db() -> mnesia:stop(), diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl index 9b2ffa28..9347cc53 100644 --- a/src/rabbit_upgrade.erl +++ b/src/rabbit_upgrade.erl @@ -17,13 +17,11 @@ -module(rabbit_upgrade). -export([maybe_upgrade_mnesia/0, maybe_upgrade_local/0]). --export([desired_version/0]). -include("rabbit.hrl"). -define(VERSION_FILENAME, "schema_version"). -define(LOCK_FILENAME, "schema_upgrade_lock"). --define(SCOPES, [mnesia, local]). %% ------------------------------------------------------------------- @@ -31,7 +29,6 @@ -spec(maybe_upgrade_mnesia/0 :: () -> 'ok'). -spec(maybe_upgrade_local/0 :: () -> 'ok' | 'version_not_available'). --spec(desired_version/0 :: () -> rabbit_version:version()). -endif. @@ -96,8 +93,8 @@ maybe_upgrade_mnesia() -> AllNodes = rabbit_mnesia:all_clustered_nodes(), - case upgrades_required(mnesia) of - version_not_available -> + case rabbit_version:upgrades_required(mnesia) of + {error, version_not_available} -> rabbit:prepare(), %% Ensure we have logs for this case AllNodes of [_] -> ok; @@ -105,9 +102,11 @@ maybe_upgrade_mnesia() -> "< 2.1.1.~nUnfortunately you will need to " "rebuild the cluster.", []) end; - [] -> + {error, _} = Err -> + throw(Err); + {ok, []} -> ok; - Upgrades -> + {ok, Upgrades} -> rabbit:prepare(), %% Ensure we have logs for this case upgrade_mode(AllNodes) of primary -> primary_upgrade(Upgrades, AllNodes); @@ -142,18 +141,19 @@ upgrade_mode(AllNodes) -> end; [Another|_] -> ClusterVersion = - case rpc:call(Another, - rabbit_upgrade, desired_version, [mnesia]) of + case rpc:call(Another, rabbit_version, desired_scope_version, + [mnesia]) of {badrpc, {'EXIT', {undef, _}}} -> unknown_old_version; {badrpc, Reason} -> {unknown, Reason}; V -> V end, - case desired_version(mnesia) of - ClusterVersion -> + MyVersion = rabbit_version:desired_scope_version(mnesia), + case rabbit_version:'=~='(ClusterVersion, MyVersion) of + true -> %% The other node(s) have upgraded already, I am not the %% upgrader secondary; - MyVersion -> + false -> %% The other node(s) are running an unexpected version. die("Cluster upgrade needed but other nodes are " "running ~p~nand I want ~p", @@ -208,7 +208,7 @@ secondary_upgrade(AllNodes) -> end, rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia), ok = rabbit_mnesia:init_db(ClusterNodes, true), - ok = write_desired_scope_version(mnesia), + ok = rabbit_version:write_desired_scope_version(mnesia), ok. nodes_running(Nodes) -> @@ -223,90 +223,14 @@ node_running(Node) -> %% ------------------------------------------------------------------- maybe_upgrade_local() -> - case upgrades_required(local) of - version_not_available -> version_not_available; - [] -> ok; - Upgrades -> apply_upgrades(local, Upgrades, - fun() -> ok end) + case rabbit_version:upgrades_required(local) of + {error, version_not_available} -> version_not_available; + {error, _} = Err -> throw(Err); + {ok, []} -> ok; + {ok, Upgrades} -> apply_upgrades(local, Upgrades, + fun () -> ok end) end. -desired_version() -> [{Scope, desired_version(Scope)} || Scope <- ?SCOPES]. - -desired_version(Scope) -> with_upgrade_graph(fun (G) -> heads(G) end, Scope). - -write_desired_scope_version(Scope) -> - ok = rabbit_version:with_scope_version( - Scope, - fun ({error, Error}) -> - throw({error, {cannot_read_version_to_write_it, Error}}) - end, - fun (_SV) -> {desired_version(Scope), ok} end). - -%% ------------------------------------------------------------------- - -upgrades_required(Scope) -> - rabbit_version:with_scope_version( - Scope, - fun ({error, enoent}) -> version_not_available end, - fun (CurrentHeads) -> - {CurrentHeads, - with_upgrade_graph( - fun (G) -> - case unknown_heads(CurrentHeads, G) of - [] -> - upgrades_to_apply(CurrentHeads, G); - Unknown -> - throw({error, - {future_upgrades_found, Unknown}}) - end - end, Scope)} - end). - -with_upgrade_graph(Fun, Scope) -> - case rabbit_misc:build_acyclic_graph( - fun (Module, Steps) -> vertices(Module, Steps, Scope) end, - fun (Module, Steps) -> edges(Module, Steps, Scope) end, - rabbit_misc:all_module_attributes(rabbit_upgrade)) of - {ok, G} -> try - Fun(G) - after - true = digraph:delete(G) - end; - {error, {vertex, duplicate, StepName}} -> - throw({error, {duplicate_upgrade_step, StepName}}); - {error, {edge, {bad_vertex, StepName}, _From, _To}} -> - throw({error, {dependency_on_unknown_upgrade_step, StepName}}); - {error, {edge, {bad_edge, StepNames}, _From, _To}} -> - throw({error, {cycle_in_upgrade_steps, StepNames}}) - end. - -vertices(Module, Steps, Scope0) -> - [{StepName, {Module, StepName}} || {StepName, Scope1, _Reqs} <- Steps, - Scope0 == Scope1]. - -edges(_Module, Steps, Scope0) -> - [{Require, StepName} || {StepName, Scope1, Requires} <- Steps, - Require <- Requires, - Scope0 == Scope1]. -unknown_heads(Heads, G) -> - [H || H <- Heads, digraph:vertex(G, H) =:= false]. - -upgrades_to_apply(Heads, G) -> - %% Take all the vertices which can reach the known heads. That's - %% everything we've already applied. Subtract that from all - %% vertices: that's what we have to apply. - Unsorted = sets:to_list( - sets:subtract( - sets:from_list(digraph:vertices(G)), - sets:from_list(digraph_utils:reaching(Heads, G)))), - %% Form a subgraph from that list and find a topological ordering - %% so we can invoke them in order. - [element(2, digraph:vertex(G, StepName)) || - StepName <- digraph_utils:topsort(digraph_utils:subgraph(G, Unsorted))]. - -heads(G) -> - lists:sort([V || V <- digraph:vertices(G), digraph:out_degree(G, V) =:= 0]). - %% ------------------------------------------------------------------- apply_upgrades(Scope, Upgrades, Fun) -> @@ -329,7 +253,7 @@ apply_upgrades(Scope, Upgrades, Fun) -> [apply_upgrade(Scope, Upgrade) || Upgrade <- Upgrades], info("~s upgrades: All upgrades applied successfully~n", [Scope]), - ok = write_desired_scope_version(Scope), + ok = rabbit_version:write_desired_scope_version(Scope), ok = rabbit_misc:recursive_delete([BackupDir]), info("~s upgrades: Mnesia backup removed~n", [Scope]), ok = file:delete(LockFile); diff --git a/src/rabbit_version.erl b/src/rabbit_version.erl index 8c577f9c..2d7ba8e4 100644 --- a/src/rabbit_version.erl +++ b/src/rabbit_version.erl @@ -16,86 +16,156 @@ -module(rabbit_version). --export([read/0, write/1, with_scope_version/3, '=~='/2]). +-export([read/0, '=~='/2, desired_version/0, desired_scope_version/1, + write_desired_version/0, write_desired_scope_version/1, + upgrades_required/1]). %% ------------------------------------------------------------------- -ifdef(use_specs). --export_type([step/0, version/0, scope/0]). +-export_type([scope/0, step/0, scope_version/0]). -type(scope() :: atom()). --type(step() :: atom()). --type(version() :: [{scope(), [step()]}]). +-type(scope_version() :: [atom()]). +-type(step() :: {atom(), atom()}). + +-type(version() :: [atom()]). -spec(read/0 :: () -> rabbit_types:ok_or_error2(version(), any())). --spec(write/1 :: (version()) -> 'ok'). --spec(with_scope_version/3 :: - (scope(), - fun (({'error', any()}) -> E), - fun (([step()]) -> {[step()], A})) -> E | A). -spec('=~='/2 :: (version(), version()) -> boolean()). +-spec(desired_version/0 :: () -> version()). +-spec(desired_scope_version/1 :: (scope()) -> scope_version()). +-spec(write_desired_version/0 :: () -> 'ok'). +-spec(write_desired_scope_version/1 :: + (scope()) -> rabbit_types:ok_or_error(any())). +-spec(upgrades_required/1 :: + (scope()) -> rabbit_types:ok_or_error2([step()], any())). -endif. %% ------------------------------------------------------------------- -define(VERSION_FILENAME, "schema_version"). +-define(SCOPES, [mnesia, local]). %% ------------------------------------------------------------------- -read() -> - case rabbit_misc:read_term_file(schema_filename()) of - {ok, [V]} -> {ok, categorise_by_scope(V)}; - {error, _} = Err -> Err - end. +read() -> case rabbit_misc:read_term_file(schema_filename()) of + {ok, [V]} -> {ok, V}; + {error, _} = Err -> Err + end. -write(Version) -> - V = [Name || {_Scope, Names} <- Version, Name <- Names], - ok = rabbit_misc:write_term_file(schema_filename(), [V]). +write(V) -> ok = rabbit_misc:write_term_file(schema_filename(), [V]). -with_scope_version(Scope, ErrorHandler, Fun) -> +read_scope_version(Scope) -> case read() of {error, _} = Err -> - ErrorHandler(Err); + Err; {ok, Version} -> - SV = case lists:keysearch(Scope, 1, Version) of + {ok, case lists:keysearch(Scope, 1, categorise_by_scope(Version)) of false -> []; {value, {Scope, SV1}} -> SV1 - end, - {SV2, Result} = Fun(SV), - ok = case SV =:= SV2 of - true -> ok; - false -> write(lists:keystore(Scope, 1, Version, - {Scope, SV2})) - end, - Result + end} end. +write_scope_version(Scope, ScopeVersion) -> + case read() of + {error, _} = Err -> + Err; + {ok, Version} -> + Version1 = lists:keystore(Scope, 1, categorise_by_scope(Version), + {Scope, ScopeVersion}), + ok = write([Name || {_Scope, Names} <- Version1, Name <- Names]) + end. + +%% ------------------------------------------------------------------- + '=~='(VerA, VerB) -> - matches(lists:usort(VerA), lists:usort(VerB)). + lists:usort(VerA) =:= lists:usort(VerB). + +%% ------------------------------------------------------------------- + +desired_version() -> + [Name || Scope <- ?SCOPES, Name <- desired_scope_version(Scope)]. + +desired_scope_version(Scope) -> with_upgrade_graph(fun heads/1, Scope). + +write_desired_version() -> write(desired_version()). + +write_desired_scope_version(Scope) -> + write_scope_version(Scope, desired_scope_version(Scope)). + +upgrades_required(Scope) -> + case read_scope_version(Scope) of + {error, enoent} -> + {error, version_not_available}; + {ok, CurrentHeads} -> + with_upgrade_graph( + fun (G) -> + case unknown_heads(CurrentHeads, G) of + [] -> {ok, upgrades_to_apply(CurrentHeads, G)}; + Unknown -> {error, {future_upgrades_found, Unknown}} + end + end, Scope) + end. + +%% ------------------------------------------------------------------- + +with_upgrade_graph(Fun, Scope) -> + case rabbit_misc:build_acyclic_graph( + fun (Module, Steps) -> vertices(Module, Steps, Scope) end, + fun (Module, Steps) -> edges(Module, Steps, Scope) end, + rabbit_misc:all_module_attributes(rabbit_upgrade)) of + {ok, G} -> try + Fun(G) + after + true = digraph:delete(G) + end; + {error, {vertex, duplicate, StepName}} -> + throw({error, {duplicate_upgrade_step, StepName}}); + {error, {edge, {bad_vertex, StepName}, _From, _To}} -> + throw({error, {dependency_on_unknown_upgrade_step, StepName}}); + {error, {edge, {bad_edge, StepNames}, _From, _To}} -> + throw({error, {cycle_in_upgrade_steps, StepNames}}) + end. + +vertices(Module, Steps, Scope0) -> + [{StepName, {Module, StepName}} || {StepName, Scope1, _Reqs} <- Steps, + Scope0 == Scope1]. + +edges(_Module, Steps, Scope0) -> + [{Require, StepName} || {StepName, Scope1, Requires} <- Steps, + Require <- Requires, + Scope0 == Scope1]. +unknown_heads(Heads, G) -> + [H || H <- Heads, digraph:vertex(G, H) =:= false]. + +upgrades_to_apply(Heads, G) -> + %% Take all the vertices which can reach the known heads. That's + %% everything we've already applied. Subtract that from all + %% vertices: that's what we have to apply. + Unsorted = sets:to_list( + sets:subtract( + sets:from_list(digraph:vertices(G)), + sets:from_list(digraph_utils:reaching(Heads, G)))), + %% Form a subgraph from that list and find a topological ordering + %% so we can invoke them in order. + [element(2, digraph:vertex(G, StepName)) || + StepName <- digraph_utils:topsort(digraph_utils:subgraph(G, Unsorted))]. + +heads(G) -> + lists:sort([V || V <- digraph:vertices(G), digraph:out_degree(G, V) =:= 0]). %% ------------------------------------------------------------------- -matches([], []) -> - true; -matches([{Scope, SV}|VerA], [{Scope, SV}|VerB]) -> - matches(VerA, VerB); -matches([{Scope, SVA}|VerA], [{Scope, SVB}|VerB]) -> - case {lists:usort(SVA), lists:usort(SVB)} of - {SV, SV} -> matches(VerA, VerB); - _ -> false - end; -matches(_VerA, _VerB) -> - false. - -categorise_by_scope(Heads) when is_list(Heads) -> +categorise_by_scope(Version) when is_list(Version) -> Categorised = [{Scope, Name} || {_Module, Attributes} <- rabbit_misc:all_module_attributes(rabbit_upgrade), {Name, Scope, _Requires} <- Attributes, - lists:member(Name, Heads)], + lists:member(Name, Version)], orddict:to_list( - lists:foldl(fun ({Scope, Name}, Version) -> - rabbit_misc:orddict_cons(Scope, Name, Version) + lists:foldl(fun ({Scope, Name}, CatVersion) -> + rabbit_misc:orddict_cons(Scope, Name, CatVersion) end, orddict:new(), Categorised)). dir() -> rabbit_mnesia:dir(). -- cgit v1.2.1 From 5a390fde517e6f8539f75199b357d064d8c11541 Mon Sep 17 00:00:00 2001 From: Matthew Sackman Date: Thu, 17 Mar 2011 09:17:27 +0000 Subject: renamings --- src/rabbit_mnesia.erl | 10 +++++----- src/rabbit_upgrade.erl | 8 ++++---- src/rabbit_version.erl | 51 +++++++++++++++++++++++++------------------------- 3 files changed, 34 insertions(+), 35 deletions(-) diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index 4902cfeb..c598fbb9 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -442,7 +442,7 @@ init_db(ClusterNodes, Force) -> {[AnotherNode|_], _} -> %% Subsequent node in cluster, catch up ensure_version_ok( - rpc:call(AnotherNode, rabbit_version, read, [])), + rpc:call(AnotherNode, rabbit_version, recorded, [])), IsDiskNode = ClusterNodes == [] orelse lists:member(node(), ClusterNodes), ok = wait_for_replicated_tables(), @@ -457,7 +457,7 @@ init_db(ClusterNodes, Force) -> %% If we're just starting up a new node we won't have %% a version version_not_available -> - ok = rabbit_version:write_desired_version() + ok = rabbit_version:record_desired() end, ensure_schema_integrity() end; @@ -484,14 +484,14 @@ schema_ok_or_move() -> end. ensure_version_ok({ok, DiscVersion}) -> - DesiredVersion = rabbit_version:desired_version(), + DesiredVersion = rabbit_version:desired(), case rabbit_version:'=~='(DesiredVersion, DiscVersion) of true -> ok; false -> throw({error, {schema_mismatch, DesiredVersion, DiscVersion}}) end; ensure_version_ok({error, _}) -> - ok = rabbit_version:write_desired_version(). + ok = rabbit_version:record_desired(). create_schema() -> mnesia:stop(), @@ -501,7 +501,7 @@ create_schema() -> cannot_start_mnesia), ok = create_tables(), ok = ensure_schema_integrity(), - ok = rabbit_version:write_desired_version(). + ok = rabbit_version:record_desired(). move_db() -> mnesia:stop(), diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl index 9347cc53..b4e1191e 100644 --- a/src/rabbit_upgrade.erl +++ b/src/rabbit_upgrade.erl @@ -141,13 +141,13 @@ upgrade_mode(AllNodes) -> end; [Another|_] -> ClusterVersion = - case rpc:call(Another, rabbit_version, desired_scope_version, + case rpc:call(Another, rabbit_version, desired_for_scope, [mnesia]) of {badrpc, {'EXIT', {undef, _}}} -> unknown_old_version; {badrpc, Reason} -> {unknown, Reason}; V -> V end, - MyVersion = rabbit_version:desired_scope_version(mnesia), + MyVersion = rabbit_version:desired_for_scope(mnesia), case rabbit_version:'=~='(ClusterVersion, MyVersion) of true -> %% The other node(s) have upgraded already, I am not the @@ -208,7 +208,7 @@ secondary_upgrade(AllNodes) -> end, rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia), ok = rabbit_mnesia:init_db(ClusterNodes, true), - ok = rabbit_version:write_desired_scope_version(mnesia), + ok = rabbit_version:record_desired_for_scope(mnesia), ok. nodes_running(Nodes) -> @@ -253,7 +253,7 @@ apply_upgrades(Scope, Upgrades, Fun) -> [apply_upgrade(Scope, Upgrade) || Upgrade <- Upgrades], info("~s upgrades: All upgrades applied successfully~n", [Scope]), - ok = rabbit_version:write_desired_scope_version(Scope), + ok = rabbit_version:record_desired_for_scope(Scope), ok = rabbit_misc:recursive_delete([BackupDir]), info("~s upgrades: Mnesia backup removed~n", [Scope]), ok = file:delete(LockFile); diff --git a/src/rabbit_version.erl b/src/rabbit_version.erl index 2d7ba8e4..e079df4a 100644 --- a/src/rabbit_version.erl +++ b/src/rabbit_version.erl @@ -16,14 +16,14 @@ -module(rabbit_version). --export([read/0, '=~='/2, desired_version/0, desired_scope_version/1, - write_desired_version/0, write_desired_scope_version/1, +-export([recorded/0, '=~='/2, desired/0, desired_for_scope/1, + record_desired/0, record_desired_for_scope/1, upgrades_required/1]). %% ------------------------------------------------------------------- -ifdef(use_specs). --export_type([scope/0, step/0, scope_version/0]). +-export_type([scope/0, step/0]). -type(scope() :: atom()). -type(scope_version() :: [atom()]). @@ -31,12 +31,12 @@ -type(version() :: [atom()]). --spec(read/0 :: () -> rabbit_types:ok_or_error2(version(), any())). --spec('=~='/2 :: (version(), version()) -> boolean()). --spec(desired_version/0 :: () -> version()). --spec(desired_scope_version/1 :: (scope()) -> scope_version()). --spec(write_desired_version/0 :: () -> 'ok'). --spec(write_desired_scope_version/1 :: +-spec(recorded/0 :: () -> rabbit_types:ok_or_error2(version(), any())). +-spec('=~='/2 :: ([A], [A]) -> boolean()). +-spec(desired/0 :: () -> version()). +-spec(desired_for_scope/1 :: (scope()) -> scope_version()). +-spec(record_desired/0 :: () -> 'ok'). +-spec(record_desired_for_scope/1 :: (scope()) -> rabbit_types:ok_or_error(any())). -spec(upgrades_required/1 :: (scope()) -> rabbit_types:ok_or_error2([step()], any())). @@ -49,15 +49,15 @@ %% ------------------------------------------------------------------- -read() -> case rabbit_misc:read_term_file(schema_filename()) of - {ok, [V]} -> {ok, V}; - {error, _} = Err -> Err - end. +recorded() -> case rabbit_misc:read_term_file(schema_filename()) of + {ok, [V]} -> {ok, V}; + {error, _} = Err -> Err + end. -write(V) -> ok = rabbit_misc:write_term_file(schema_filename(), [V]). +record(V) -> ok = rabbit_misc:write_term_file(schema_filename(), [V]). -read_scope_version(Scope) -> - case read() of +recorded_for_scope(Scope) -> + case recorded() of {error, _} = Err -> Err; {ok, Version} -> @@ -67,14 +67,14 @@ read_scope_version(Scope) -> end} end. -write_scope_version(Scope, ScopeVersion) -> - case read() of +record_for_scope(Scope, ScopeVersion) -> + case recorded() of {error, _} = Err -> Err; {ok, Version} -> Version1 = lists:keystore(Scope, 1, categorise_by_scope(Version), {Scope, ScopeVersion}), - ok = write([Name || {_Scope, Names} <- Version1, Name <- Names]) + ok = record([Name || {_Scope, Names} <- Version1, Name <- Names]) end. %% ------------------------------------------------------------------- @@ -84,18 +84,17 @@ write_scope_version(Scope, ScopeVersion) -> %% ------------------------------------------------------------------- -desired_version() -> - [Name || Scope <- ?SCOPES, Name <- desired_scope_version(Scope)]. +desired() -> [Name || Scope <- ?SCOPES, Name <- desired_for_scope(Scope)]. -desired_scope_version(Scope) -> with_upgrade_graph(fun heads/1, Scope). +desired_for_scope(Scope) -> with_upgrade_graph(fun heads/1, Scope). -write_desired_version() -> write(desired_version()). +record_desired() -> record(desired()). -write_desired_scope_version(Scope) -> - write_scope_version(Scope, desired_scope_version(Scope)). +record_desired_for_scope(Scope) -> + record_for_scope(Scope, desired_for_scope(Scope)). upgrades_required(Scope) -> - case read_scope_version(Scope) of + case recorded_for_scope(Scope) of {error, enoent} -> {error, version_not_available}; {ok, CurrentHeads} -> -- cgit v1.2.1 From 349c24621ca359b5e6deac9d43ed8cefd0616152 Mon Sep 17 00:00:00 2001 From: Matthew Sackman Date: Thu, 17 Mar 2011 09:22:03 +0000 Subject: If we have version_not_available, then it makes sense to have version_mismatch, not schema_mismatch --- src/rabbit_mnesia.erl | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index c598fbb9..867da779 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -487,8 +487,7 @@ ensure_version_ok({ok, DiscVersion}) -> DesiredVersion = rabbit_version:desired(), case rabbit_version:'=~='(DesiredVersion, DiscVersion) of true -> ok; - false -> throw({error, {schema_mismatch, - DesiredVersion, DiscVersion}}) + false -> throw({error, {version_mismatch, DesiredVersion, DiscVersion}}) end; ensure_version_ok({error, _}) -> ok = rabbit_version:record_desired(). -- cgit v1.2.1 From 753447e36efb88eb1580a93c5331894d93d1621c Mon Sep 17 00:00:00 2001 From: Matthew Sackman Date: Thu, 17 Mar 2011 14:41:03 +0000 Subject: Make sure we record if an exchange is actually deleted... --- src/rabbit_binding.erl | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl index 7ddb7814..1a9cbde1 100644 --- a/src/rabbit_binding.erl +++ b/src/rabbit_binding.erl @@ -335,12 +335,13 @@ maybe_auto_delete(XName, Bindings, Deletions) -> [] -> add_deletion(XName, {undefined, not_deleted, Bindings}, Deletions); [X] -> - add_deletion(XName, {X, not_deleted, Bindings}, - case rabbit_exchange:maybe_auto_delete(X) of - not_deleted -> Deletions; - {deleted, Deletions1} -> combine_deletions( - Deletions, Deletions1) - end) + case rabbit_exchange:maybe_auto_delete(X) of + not_deleted -> + add_deletion(XName, {X, not_deleted, Bindings}, Deletions); + {deleted, Deletions1} -> + add_deletion(XName, {X, deleted, Bindings}, + combine_deletions(Deletions, Deletions1)) + end end. delete_forward_routes(Route) -> -- cgit v1.2.1 From 8449616cc83d64477ad4fc69921c1e227f7be3a1 Mon Sep 17 00:00:00 2001 From: Matthew Sackman Date: Thu, 17 Mar 2011 14:53:30 +0000 Subject: Make code prettier --- src/rabbit_binding.erl | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl index 1a9cbde1..6167790e 100644 --- a/src/rabbit_binding.erl +++ b/src/rabbit_binding.erl @@ -331,18 +331,18 @@ group_bindings_fold(Fun, SrcName, Acc, Removed, Bindings) -> group_bindings_fold(Fun, Fun(SrcName, Bindings, Acc), Removed). maybe_auto_delete(XName, Bindings, Deletions) -> - case mnesia:read({rabbit_exchange, XName}) of - [] -> - add_deletion(XName, {undefined, not_deleted, Bindings}, Deletions); - [X] -> - case rabbit_exchange:maybe_auto_delete(X) of - not_deleted -> - add_deletion(XName, {X, not_deleted, Bindings}, Deletions); - {deleted, Deletions1} -> - add_deletion(XName, {X, deleted, Bindings}, - combine_deletions(Deletions, Deletions1)) - end - end. + {Entry, Deletions1} = + case mnesia:read({rabbit_exchange, XName}) of + [] -> {{undefined, not_deleted, Bindings}, Deletions}; + [X] -> case rabbit_exchange:maybe_auto_delete(X) of + not_deleted -> + {{X, not_deleted, Bindings}, Deletions}; + {deleted, Deletions2} -> + {{X, deleted, Bindings}, + combine_deletions(Deletions, Deletions2)} + end + end, + add_deletion(XName, Entry, Deletions1). delete_forward_routes(Route) -> ok = mnesia:delete_object(rabbit_route, Route, write), -- cgit v1.2.1 From d350c5ce08dff4f0ff64d5d294c00b0866932121 Mon Sep 17 00:00:00 2001 From: Matthew Sackman Date: Thu, 17 Mar 2011 16:26:25 +0000 Subject: rabbit:stop() is not always called when rabbit is stopping. E.g. q(). doesn't invoke rabbit:stop/0. It does invoke rabbit:stop/1 though. --- src/rabbit.erl | 2 +- src/rabbit_mnesia.erl | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/rabbit.erl b/src/rabbit.erl index b1d88a52..5f88b997 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -204,7 +204,6 @@ start() -> end. stop() -> - ok = rabbit_mnesia:record_running_disc_nodes(), ok = rabbit_misc:stop_applications(?APPS). stop_and_halt() -> @@ -246,6 +245,7 @@ start(normal, []) -> end. stop(_State) -> + ok = rabbit_mnesia:record_running_disc_nodes(), terminated_ok = error_logger:delete_report_handler(rabbit_error_logger), ok = rabbit_alarm:stop(), ok = case rabbit_mnesia:is_clustered() of diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index 867da779..4d3267a2 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -378,7 +378,7 @@ delete_cluster_nodes_config() -> end. running_nodes_filename() -> - dir() ++ "/nodes_running_at_shutdown". + filename:join(dir(), "nodes_running_at_shutdown"). record_running_disc_nodes() -> FileName = running_nodes_filename(), -- cgit v1.2.1 From 544081a948a2ecc2e114dfb81aaf268cf10d966d Mon Sep 17 00:00:00 2001 From: Matthew Sackman Date: Thu, 17 Mar 2011 16:37:54 +0000 Subject: Improve symmetry: if we write the running_disc_nodes on rabbit shutdown, we should nuke it on rabbit startup. This then means that the prelaunch thingy is always run with the previously_running_disc_nodes file present. I believe this makes no semantic changes, but the improved symmetry is worth having --- src/rabbit.erl | 1 + src/rabbit_upgrade.erl | 3 +-- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/rabbit.erl b/src/rabbit.erl index 5f88b997..1361d0f4 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -233,6 +233,7 @@ rotate_logs(BinarySuffix) -> start(normal, []) -> case erts_version_check() of ok -> + ok = rabbit_mnesia:delete_previously_running_disc_nodes(), {ok, SupPid} = rabbit_sup:start_link(), true = register(rabbit, self()), diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl index b4e1191e..20f53da2 100644 --- a/src/rabbit_upgrade.erl +++ b/src/rabbit_upgrade.erl @@ -112,8 +112,7 @@ maybe_upgrade_mnesia() -> primary -> primary_upgrade(Upgrades, AllNodes); secondary -> secondary_upgrade(AllNodes) end - end, - ok = rabbit_mnesia:delete_previously_running_disc_nodes(). + end. upgrade_mode(AllNodes) -> case nodes_running(AllNodes) of -- cgit v1.2.1 From b13bd327e6d58bfe4fdeb8f8c14f666f493fbe54 Mon Sep 17 00:00:00 2001 From: Matthew Sackman Date: Thu, 17 Mar 2011 16:53:15 +0000 Subject: Can't call =~= with non-version args, thus shuffle things around a bit. End up saving a line. --- src/rabbit_upgrade.erl | 33 ++++++++++++++++----------------- 1 file changed, 16 insertions(+), 17 deletions(-) diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl index 20f53da2..875d971a 100644 --- a/src/rabbit_upgrade.erl +++ b/src/rabbit_upgrade.erl @@ -139,24 +139,23 @@ upgrade_mode(AllNodes) -> []) end; [Another|_] -> - ClusterVersion = - case rpc:call(Another, rabbit_version, desired_for_scope, - [mnesia]) of - {badrpc, {'EXIT', {undef, _}}} -> unknown_old_version; - {badrpc, Reason} -> {unknown, Reason}; - V -> V - end, MyVersion = rabbit_version:desired_for_scope(mnesia), - case rabbit_version:'=~='(ClusterVersion, MyVersion) of - true -> - %% The other node(s) have upgraded already, I am not the - %% upgrader - secondary; - false -> - %% The other node(s) are running an unexpected version. - die("Cluster upgrade needed but other nodes are " - "running ~p~nand I want ~p", - [ClusterVersion, MyVersion]) + ErrFun = fun (ClusterVersion) -> + %% The other node(s) are running an + %% unexpected version. + die("Cluster upgrade needed but other nodes are " + "running ~p~nand I want ~p", + [ClusterVersion, MyVersion]) + end, + case rpc:call(Another, rabbit_version, desired_for_scope, + [mnesia]) of + {badrpc, {'EXIT', {undef, _}}} -> ErrFun(unknown_old_version); + {badrpc, Reason} -> ErrFun({unknown, Reason}); + CV -> case rabbit_version:'=~='( + MyVersion, CV) of + true -> secondary; + false -> ErrFun(CV) + end end end. -- cgit v1.2.1 From 3ae4322d27ee90b19d774418c43fd7e8a0b75ac4 Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Fri, 18 Mar 2011 12:15:51 +0000 Subject: Ensure mnesia is stopped for the local upgrade backup. --- src/rabbit_upgrade.erl | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl index b4e1191e..2c31e602 100644 --- a/src/rabbit_upgrade.erl +++ b/src/rabbit_upgrade.erl @@ -182,7 +182,6 @@ primary_upgrade(Upgrades, Nodes) -> mnesia, Upgrades, fun () -> - rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia), force_tables(), case Others of [] -> ok; @@ -227,7 +226,8 @@ maybe_upgrade_local() -> {error, version_not_available} -> version_not_available; {error, _} = Err -> throw(Err); {ok, []} -> ok; - {ok, Upgrades} -> apply_upgrades(local, Upgrades, + {ok, Upgrades} -> mnesia:stop(), + apply_upgrades(local, Upgrades, fun () -> ok end) end. @@ -249,6 +249,7 @@ apply_upgrades(Scope, Upgrades, Fun) -> ok = file:delete(lock_filename(BackupDir)), info("~s upgrades: Mnesia dir backed up to ~p~n", [Scope, BackupDir]), + rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia), Fun(), [apply_upgrade(Scope, Upgrade) || Upgrade <- Upgrades], info("~s upgrades: All upgrades applied successfully~n", -- cgit v1.2.1 From 82ea108bc5c4f17283f0b0080f7dfcf9baea123d Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Fri, 18 Mar 2011 13:38:20 +0000 Subject: Take a single backup before any upgrade, remove it when we're all clear. --- src/rabbit_upgrade.erl | 106 ++++++++++++++++++++++++++++++++----------------- 1 file changed, 69 insertions(+), 37 deletions(-) diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl index c061cd49..d56b50b2 100644 --- a/src/rabbit_upgrade.erl +++ b/src/rabbit_upgrade.erl @@ -16,7 +16,8 @@ -module(rabbit_upgrade). --export([maybe_upgrade_mnesia/0, maybe_upgrade_local/0]). +-export([maybe_backup/0, maybe_upgrade_mnesia/0, maybe_upgrade_local/0, + maybe_remove_backup/0]). -include("rabbit.hrl"). @@ -27,8 +28,10 @@ -ifdef(use_specs). +-spec(maybe_backup/0 :: () -> 'ok'). -spec(maybe_upgrade_mnesia/0 :: () -> 'ok'). -spec(maybe_upgrade_local/0 :: () -> 'ok' | 'version_not_available'). +-spec(maybe_remove_backup/0 :: () -> 'ok'). -endif. @@ -91,11 +94,66 @@ %% ------------------------------------------------------------------- +maybe_backup() -> + case backup_required() of + true -> backup(); + _ -> ok + end. + +backup() -> + rabbit:prepare(), %% Ensure we have logs for this + LockFile = lock_filename(dir()), + case rabbit_misc:lock_file(LockFile) of + ok -> + BackupDir = backup_dir(), + case rabbit_mnesia:copy_db(BackupDir) of + ok -> + %% We need to make the backup after creating the + %% lock file so that it protects us from trying to + %% overwrite the backup. Unfortunately this means + %% the lock file exists in the backup too, which + %% is not intuitive. Remove it. + ok = file:delete(lock_filename(BackupDir)), + info("upgrades: Mnesia dir backed up to ~p~n", [BackupDir]); + {error, E} -> + %% If we can't backup, the upgrade hasn't started + %% hence we don't need the lockfile since the real + %% mnesia dir is the good one. + ok = file:delete(LockFile), + throw({could_not_back_up_mnesia_dir, E}) + end; + {error, eexist} -> + throw({error, previous_upgrade_failed}) + end. + + +maybe_remove_backup() -> + case file:read_file_info(backup_dir()) of + {ok, _} -> remove_backup(); + _ -> ok + end. + +remove_backup() -> + LockFile = lock_filename(dir()), + BackupDir = backup_dir(), + ok = rabbit_misc:recursive_delete([BackupDir]), + info("upgrades: Mnesia backup removed~n", []), + ok = file:delete(LockFile). + +backup_required() -> + case {rabbit_version:upgrades_required(mnesia), + rabbit_version:upgrades_required(local)} of + {{ok, []}, {ok, []}} -> false; + {_, {ok, _}} -> true; + {{ok, _}, _} -> true; + _ -> false + end. + maybe_upgrade_mnesia() -> + maybe_backup(), AllNodes = rabbit_mnesia:all_clustered_nodes(), case rabbit_version:upgrades_required(mnesia) of {error, version_not_available} -> - rabbit:prepare(), %% Ensure we have logs for this case AllNodes of [_] -> ok; _ -> die("Cluster upgrade needed but upgrading from " @@ -227,45 +285,18 @@ maybe_upgrade_local() -> {ok, Upgrades} -> mnesia:stop(), apply_upgrades(local, Upgrades, fun () -> ok end) - end. + end, + maybe_remove_backup(). %% ------------------------------------------------------------------- apply_upgrades(Scope, Upgrades, Fun) -> - LockFile = lock_filename(dir()), - case rabbit_misc:lock_file(LockFile) of - ok -> - BackupDir = dir() ++ "-upgrade-backup", - info("~s upgrades: ~w to apply~n", [Scope, length(Upgrades)]), - case rabbit_mnesia:copy_db(BackupDir) of - ok -> - %% We need to make the backup after creating the - %% lock file so that it protects us from trying to - %% overwrite the backup. Unfortunately this means - %% the lock file exists in the backup too, which - %% is not intuitive. Remove it. - ok = file:delete(lock_filename(BackupDir)), - info("~s upgrades: Mnesia dir backed up to ~p~n", - [Scope, BackupDir]), - rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia), - Fun(), - [apply_upgrade(Scope, Upgrade) || Upgrade <- Upgrades], - info("~s upgrades: All upgrades applied successfully~n", - [Scope]), - ok = rabbit_version:record_desired_for_scope(Scope), - ok = rabbit_misc:recursive_delete([BackupDir]), - info("~s upgrades: Mnesia backup removed~n", [Scope]), - ok = file:delete(LockFile); - {error, E} -> - %% If we can't backup, the upgrade hasn't started - %% hence we don't need the lockfile since the real - %% mnesia dir is the good one. - ok = file:delete(LockFile), - throw({could_not_back_up_mnesia_dir, E}) - end; - {error, eexist} -> - throw({error, previous_upgrade_failed}) - end. + info("~s upgrades: ~w to apply~n", [Scope, length(Upgrades)]), + rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia), + Fun(), + [apply_upgrade(Scope, Upgrade) || Upgrade <- Upgrades], + info("~s upgrades: All upgrades applied successfully~n", [Scope]), + ok = rabbit_version:record_desired_for_scope(Scope). apply_upgrade(Scope, {M, F}) -> info("~s upgrades: Applying ~w:~w~n", [Scope, M, F]), @@ -276,6 +307,7 @@ apply_upgrade(Scope, {M, F}) -> dir() -> rabbit_mnesia:dir(). lock_filename(Dir) -> filename:join(Dir, ?LOCK_FILENAME). +backup_dir() -> dir() ++ "-upgrade-backup". %% NB: we cannot use rabbit_log here since it may not have been %% started yet -- cgit v1.2.1 From 5f295dc115d1d93428377051530f79ca26064c20 Mon Sep 17 00:00:00 2001 From: Matthew Sackman Date: Fri, 18 Mar 2011 17:57:04 +0000 Subject: Well I thought =~= was beautiful and appropriately approximate to == --- src/rabbit_mnesia.erl | 2 +- src/rabbit_upgrade.erl | 2 +- src/rabbit_version.erl | 6 +++--- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index 4d3267a2..869f09a1 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -485,7 +485,7 @@ schema_ok_or_move() -> ensure_version_ok({ok, DiscVersion}) -> DesiredVersion = rabbit_version:desired(), - case rabbit_version:'=~='(DesiredVersion, DiscVersion) of + case rabbit_version:matches(DesiredVersion, DiscVersion) of true -> ok; false -> throw({error, {version_mismatch, DesiredVersion, DiscVersion}}) end; diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl index d56b50b2..866f20ee 100644 --- a/src/rabbit_upgrade.erl +++ b/src/rabbit_upgrade.erl @@ -209,7 +209,7 @@ upgrade_mode(AllNodes) -> [mnesia]) of {badrpc, {'EXIT', {undef, _}}} -> ErrFun(unknown_old_version); {badrpc, Reason} -> ErrFun({unknown, Reason}); - CV -> case rabbit_version:'=~='( + CV -> case rabbit_version:matches( MyVersion, CV) of true -> secondary; false -> ErrFun(CV) diff --git a/src/rabbit_version.erl b/src/rabbit_version.erl index e079df4a..400abc10 100644 --- a/src/rabbit_version.erl +++ b/src/rabbit_version.erl @@ -16,7 +16,7 @@ -module(rabbit_version). --export([recorded/0, '=~='/2, desired/0, desired_for_scope/1, +-export([recorded/0, matches/2, desired/0, desired_for_scope/1, record_desired/0, record_desired_for_scope/1, upgrades_required/1]). @@ -32,7 +32,7 @@ -type(version() :: [atom()]). -spec(recorded/0 :: () -> rabbit_types:ok_or_error2(version(), any())). --spec('=~='/2 :: ([A], [A]) -> boolean()). +-spec(matches/2 :: ([A], [A]) -> boolean()). -spec(desired/0 :: () -> version()). -spec(desired_for_scope/1 :: (scope()) -> scope_version()). -spec(record_desired/0 :: () -> 'ok'). @@ -79,7 +79,7 @@ record_for_scope(Scope, ScopeVersion) -> %% ------------------------------------------------------------------- -'=~='(VerA, VerB) -> +matches(VerA, VerB) -> lists:usort(VerA) =:= lists:usort(VerB). %% ------------------------------------------------------------------- -- cgit v1.2.1 From 50f18ad821cf3e68d9fa9c67eaa2f72106b4aa84 Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Mon, 21 Mar 2011 14:00:31 +0000 Subject: Various QA tidyups, and stop exporting the backup / remove backup functions. --- src/rabbit_upgrade.erl | 40 ++++++++++++++++++---------------------- 1 file changed, 18 insertions(+), 22 deletions(-) diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl index d56b50b2..09530f38 100644 --- a/src/rabbit_upgrade.erl +++ b/src/rabbit_upgrade.erl @@ -16,8 +16,7 @@ -module(rabbit_upgrade). --export([maybe_backup/0, maybe_upgrade_mnesia/0, maybe_upgrade_local/0, - maybe_remove_backup/0]). +-export([maybe_upgrade_mnesia/0, maybe_upgrade_local/0]). -include("rabbit.hrl"). @@ -28,10 +27,8 @@ -ifdef(use_specs). --spec(maybe_backup/0 :: () -> 'ok'). -spec(maybe_upgrade_mnesia/0 :: () -> 'ok'). -spec(maybe_upgrade_local/0 :: () -> 'ok' | 'version_not_available'). --spec(maybe_remove_backup/0 :: () -> 'ok'). -endif. @@ -94,13 +91,13 @@ %% ------------------------------------------------------------------- -maybe_backup() -> +maybe_take_backup() -> case backup_required() of - true -> backup(); + true -> take_backup(); _ -> ok end. -backup() -> +take_backup() -> rabbit:prepare(), %% Ensure we have logs for this LockFile = lock_filename(dir()), case rabbit_misc:lock_file(LockFile) of @@ -128,17 +125,15 @@ backup() -> maybe_remove_backup() -> - case file:read_file_info(backup_dir()) of + case filelib:is_dir(backup_dir()) of {ok, _} -> remove_backup(); _ -> ok end. remove_backup() -> - LockFile = lock_filename(dir()), - BackupDir = backup_dir(), - ok = rabbit_misc:recursive_delete([BackupDir]), + ok = rabbit_misc:recursive_delete([backup_dir()]), info("upgrades: Mnesia backup removed~n", []), - ok = file:delete(LockFile). + ok = file:delete(lock_filename(dir())). backup_required() -> case {rabbit_version:upgrades_required(mnesia), @@ -150,7 +145,7 @@ backup_required() -> end. maybe_upgrade_mnesia() -> - maybe_backup(), + maybe_take_backup(), AllNodes = rabbit_mnesia:all_clustered_nodes(), case rabbit_version:upgrades_required(mnesia) of {error, version_not_available} -> @@ -278,15 +273,16 @@ node_running(Node) -> %% ------------------------------------------------------------------- maybe_upgrade_local() -> - case rabbit_version:upgrades_required(local) of - {error, version_not_available} -> version_not_available; - {error, _} = Err -> throw(Err); - {ok, []} -> ok; - {ok, Upgrades} -> mnesia:stop(), - apply_upgrades(local, Upgrades, - fun () -> ok end) - end, - maybe_remove_backup(). + Res = case rabbit_version:upgrades_required(local) of + {error, version_not_available} -> version_not_available; + {error, _} = Err -> throw(Err); + {ok, []} -> ok; + {ok, Upgrades} -> mnesia:stop(), + apply_upgrades(local, Upgrades, + fun () -> ok end) + end, + maybe_remove_backup(), + Res. %% ------------------------------------------------------------------- -- cgit v1.2.1 From b38be006e69e96cdd2e81929b874cd43bad0b9f0 Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Mon, 21 Mar 2011 14:22:51 +0000 Subject: maybe_remove_backup is safe when returning version_not_available since we would not have taken a backup in the first place. However, this is not exactly obvious, so let's not do that. --- src/rabbit_upgrade.erl | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl index b9c7b8dc..73c9ee2b 100644 --- a/src/rabbit_upgrade.erl +++ b/src/rabbit_upgrade.erl @@ -273,16 +273,15 @@ node_running(Node) -> %% ------------------------------------------------------------------- maybe_upgrade_local() -> - Res = case rabbit_version:upgrades_required(local) of - {error, version_not_available} -> version_not_available; - {error, _} = Err -> throw(Err); - {ok, []} -> ok; - {ok, Upgrades} -> mnesia:stop(), - apply_upgrades(local, Upgrades, - fun () -> ok end) - end, - maybe_remove_backup(), - Res. + case rabbit_version:upgrades_required(local) of + {error, version_not_available} -> version_not_available; + {error, _} = Err -> throw(Err); + {ok, []} -> maybe_remove_backup(); + {ok, Upgrades} -> mnesia:stop(), + apply_upgrades(local, Upgrades, + fun () -> ok end), + maybe_remove_backup() + end. %% ------------------------------------------------------------------- -- cgit v1.2.1 From 7f13bc65ab2ea9a1c712990781a80f225c2188e9 Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Mon, 21 Mar 2011 14:28:34 +0000 Subject: Oops. --- src/rabbit_upgrade.erl | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl index 73c9ee2b..e84e1f7b 100644 --- a/src/rabbit_upgrade.erl +++ b/src/rabbit_upgrade.erl @@ -126,8 +126,8 @@ take_backup() -> maybe_remove_backup() -> case filelib:is_dir(backup_dir()) of - {ok, _} -> remove_backup(); - _ -> ok + true -> ok = remove_backup(); + _ -> ok end. remove_backup() -> -- cgit v1.2.1 From 18c265d38bd490cd421ce05b29551e65b5b82747 Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Mon, 21 Mar 2011 16:42:36 +0000 Subject: Don't try to determine whether a backup is needed before doing anything, take it as needed. This inverts the backup and the lock file - the backup now comes first and the lock file is only used to defend apply_upgrades/3. --- src/rabbit_upgrade.erl | 58 ++++++++++++++++---------------------------------- 1 file changed, 18 insertions(+), 40 deletions(-) diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl index e84e1f7b..0a7e4a37 100644 --- a/src/rabbit_upgrade.erl +++ b/src/rabbit_upgrade.erl @@ -91,39 +91,24 @@ %% ------------------------------------------------------------------- -maybe_take_backup() -> - case backup_required() of - true -> take_backup(); - _ -> ok +ensure_backup() -> + case filelib:is_file(lock_filename()) of + false -> case filelib:is_dir(backup_dir()) of + false -> ok = take_backup(); + _ -> ok + end; + true -> throw({error, previous_upgrade_failed}) end. take_backup() -> rabbit:prepare(), %% Ensure we have logs for this - LockFile = lock_filename(dir()), - case rabbit_misc:lock_file(LockFile) of - ok -> - BackupDir = backup_dir(), - case rabbit_mnesia:copy_db(BackupDir) of - ok -> - %% We need to make the backup after creating the - %% lock file so that it protects us from trying to - %% overwrite the backup. Unfortunately this means - %% the lock file exists in the backup too, which - %% is not intuitive. Remove it. - ok = file:delete(lock_filename(BackupDir)), - info("upgrades: Mnesia dir backed up to ~p~n", [BackupDir]); - {error, E} -> - %% If we can't backup, the upgrade hasn't started - %% hence we don't need the lockfile since the real - %% mnesia dir is the good one. - ok = file:delete(LockFile), - throw({could_not_back_up_mnesia_dir, E}) - end; - {error, eexist} -> - throw({error, previous_upgrade_failed}) + BackupDir = backup_dir(), + case rabbit_mnesia:copy_db(BackupDir) of + ok -> info("upgrades: Mnesia dir backed up to ~p~n", + [BackupDir]); + {error, E} -> throw({could_not_back_up_mnesia_dir, E}) end. - maybe_remove_backup() -> case filelib:is_dir(backup_dir()) of true -> ok = remove_backup(); @@ -132,20 +117,9 @@ maybe_remove_backup() -> remove_backup() -> ok = rabbit_misc:recursive_delete([backup_dir()]), - info("upgrades: Mnesia backup removed~n", []), - ok = file:delete(lock_filename(dir())). - -backup_required() -> - case {rabbit_version:upgrades_required(mnesia), - rabbit_version:upgrades_required(local)} of - {{ok, []}, {ok, []}} -> false; - {_, {ok, _}} -> true; - {{ok, _}, _} -> true; - _ -> false - end. + info("upgrades: Mnesia backup removed~n", []). maybe_upgrade_mnesia() -> - maybe_take_backup(), AllNodes = rabbit_mnesia:all_clustered_nodes(), case rabbit_version:upgrades_required(mnesia) of {error, version_not_available} -> @@ -286,12 +260,15 @@ maybe_upgrade_local() -> %% ------------------------------------------------------------------- apply_upgrades(Scope, Upgrades, Fun) -> + ensure_backup(), + ok = rabbit_misc:lock_file(lock_filename()), info("~s upgrades: ~w to apply~n", [Scope, length(Upgrades)]), rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia), Fun(), [apply_upgrade(Scope, Upgrade) || Upgrade <- Upgrades], info("~s upgrades: All upgrades applied successfully~n", [Scope]), - ok = rabbit_version:record_desired_for_scope(Scope). + ok = rabbit_version:record_desired_for_scope(Scope), + ok = file:delete(lock_filename()). apply_upgrade(Scope, {M, F}) -> info("~s upgrades: Applying ~w:~w~n", [Scope, M, F]), @@ -301,6 +278,7 @@ apply_upgrade(Scope, {M, F}) -> dir() -> rabbit_mnesia:dir(). +lock_filename() -> lock_filename(dir()). lock_filename(Dir) -> filename:join(Dir, ?LOCK_FILENAME). backup_dir() -> dir() ++ "-upgrade-backup". -- cgit v1.2.1 From e90061b37554d9acc9601ccdc64fb80cf5141901 Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Mon, 21 Mar 2011 16:51:34 +0000 Subject: When upgrading a secondary node we call init_db twice: once early to force a cluster rejoin (at which point we are not ready to do local upgrades, e.g. fhc is not running) and then once at the regular time. Deal with that. --- src/rabbit_mnesia.erl | 29 +++++++++++++++++------------ src/rabbit_upgrade.erl | 2 +- 2 files changed, 18 insertions(+), 13 deletions(-) diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index 869f09a1..c1f8a22f 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -18,7 +18,7 @@ -module(rabbit_mnesia). -export([ensure_mnesia_dir/0, dir/0, status/0, init/0, is_db_empty/0, - cluster/1, force_cluster/1, reset/0, force_reset/0, init_db/2, + cluster/1, force_cluster/1, reset/0, force_reset/0, init_db/3, is_clustered/0, running_clustered_nodes/0, all_clustered_nodes/0, empty_ram_only_tables/0, copy_db/1, wait_for_tables/1, create_cluster_nodes_config/1, read_cluster_nodes_config/0, @@ -45,7 +45,7 @@ -spec(dir/0 :: () -> file:filename()). -spec(ensure_mnesia_dir/0 :: () -> 'ok'). -spec(init/0 :: () -> 'ok'). --spec(init_db/2 :: ([node()], boolean()) -> 'ok'). +-spec(init_db/3 :: ([node()], boolean(), boolean()) -> 'ok'). -spec(is_db_empty/0 :: () -> boolean()). -spec(cluster/1 :: ([node()]) -> 'ok'). -spec(force_cluster/1 :: ([node()]) -> 'ok'). @@ -90,7 +90,7 @@ status() -> init() -> ok = ensure_mnesia_running(), ok = ensure_mnesia_dir(), - ok = init_db(read_cluster_nodes_config(), true), + ok = init_db(read_cluster_nodes_config(), true, true), ok. is_db_empty() -> @@ -112,7 +112,7 @@ cluster(ClusterNodes, Force) -> ok = ensure_mnesia_dir(), rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia), try - ok = init_db(ClusterNodes, Force), + ok = init_db(ClusterNodes, Force, true), ok = create_cluster_nodes_config(ClusterNodes) after mnesia:stop() @@ -413,7 +413,7 @@ delete_previously_running_disc_nodes() -> %% standalone disk node, or disk or ram node connected to the %% specified cluster nodes. If Force is false, don't allow %% connections to offline nodes. -init_db(ClusterNodes, Force) -> +init_db(ClusterNodes, Force, DoLocalUpgrades) -> UClusterNodes = lists:usort(ClusterNodes), ProperClusterNodes = UClusterNodes -- [node()], case mnesia:change_config(extra_db_nodes, ProperClusterNodes) of @@ -451,13 +451,18 @@ init_db(ClusterNodes, Force) -> true -> disc; false -> ram end), - case rabbit_upgrade:maybe_upgrade_local() of - ok -> - ok; - %% If we're just starting up a new node we won't have - %% a version - version_not_available -> - ok = rabbit_version:record_desired() + case DoLocalUpgrades of + true -> + case rabbit_upgrade:maybe_upgrade_local() of + ok -> + ok; + %% If we're just starting up a new + %% node we won't have a version + version_not_available -> + ok = rabbit_version:record_desired() + end; + false -> + ok end, ensure_schema_integrity() end; diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl index 0a7e4a37..6959208b 100644 --- a/src/rabbit_upgrade.erl +++ b/src/rabbit_upgrade.erl @@ -231,7 +231,7 @@ secondary_upgrade(AllNodes) -> false -> AllNodes -- [node()] end, rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia), - ok = rabbit_mnesia:init_db(ClusterNodes, true), + ok = rabbit_mnesia:init_db(ClusterNodes, true, false), ok = rabbit_version:record_desired_for_scope(mnesia), ok. -- cgit v1.2.1 From 49025c80d9eb23f59615f6a92522d48aee5bbd3a Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Mon, 21 Mar 2011 16:54:09 +0000 Subject: Better name, vertical space. --- src/rabbit_mnesia.erl | 24 +++++++++++------------- 1 file changed, 11 insertions(+), 13 deletions(-) diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index c1f8a22f..47df1148 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -413,7 +413,7 @@ delete_previously_running_disc_nodes() -> %% standalone disk node, or disk or ram node connected to the %% specified cluster nodes. If Force is false, don't allow %% connections to offline nodes. -init_db(ClusterNodes, Force, DoLocalUpgrades) -> +init_db(ClusterNodes, Force, DoSecondaryLocalUpgrades) -> UClusterNodes = lists:usort(ClusterNodes), ProperClusterNodes = UClusterNodes -- [node()], case mnesia:change_config(extra_db_nodes, ProperClusterNodes) of @@ -451,18 +451,16 @@ init_db(ClusterNodes, Force, DoLocalUpgrades) -> true -> disc; false -> ram end), - case DoLocalUpgrades of - true -> - case rabbit_upgrade:maybe_upgrade_local() of - ok -> - ok; - %% If we're just starting up a new - %% node we won't have a version - version_not_available -> - ok = rabbit_version:record_desired() - end; - false -> - ok + case DoSecondaryLocalUpgrades of + true -> case rabbit_upgrade:maybe_upgrade_local() of + ok -> + ok; + %% If we're just starting up a new + %% node we won't have a version + version_not_available -> + ok = rabbit_version:record_desired() + end; + false -> ok end, ensure_schema_integrity() end; -- cgit v1.2.1 From f1d46d7b616b8cb325ff4f6e7f02569fc0e9f5f7 Mon Sep 17 00:00:00 2001 From: Matthew Sackman Date: Mon, 21 Mar 2011 17:56:34 +0000 Subject: Add test --- src/gm_speed_test.erl | 82 +++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 82 insertions(+) create mode 100644 src/gm_speed_test.erl diff --git a/src/gm_speed_test.erl b/src/gm_speed_test.erl new file mode 100644 index 00000000..defb0f29 --- /dev/null +++ b/src/gm_speed_test.erl @@ -0,0 +1,82 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is VMware, Inc. +%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% + +-module(gm_speed_test). + +-export([test/3]). +-export([joined/2, members_changed/3, handle_msg/3, terminate/2]). +-export([wile_e_coyote/2]). + +-behaviour(gm). + +-include("gm_specs.hrl"). + +%% callbacks + +joined(Owner, _Members) -> + Owner ! joined, + ok. + +members_changed(_Owner, _Births, _Deaths) -> + ok. + +handle_msg(Owner, _From, ping) -> + Owner ! ping, + ok. + +terminate(Owner, _Reason) -> + Owner ! terminated, + ok. + +%% other + +wile_e_coyote(Time, WriteUnit) -> + {ok, Pid} = gm:start_link(?MODULE, ?MODULE, self()), + receive joined -> ok end, + timer:sleep(1000), %% wait for all to join + timer:send_after(Time, stop), + Start = now(), + {Sent, Received} = loop(Pid, WriteUnit, 0, 0), + End = now(), + ok = gm:leave(Pid), + receive terminated -> ok end, + Elapsed = timer:now_diff(End, Start) / 1000000, + io:format("Sending rate: ~p msgs/sec~nReceiving rate: ~p msgs/sec~n~n", + [Sent/Elapsed, Received/Elapsed]), + ok. + +loop(Pid, WriteUnit, Sent, Received) -> + case read(Received) of + {stop, Received1} -> {Sent, Received1}; + {ok, Received1} -> ok = write(Pid, WriteUnit), + loop(Pid, WriteUnit, Sent + WriteUnit, Received1) + end. + +read(Count) -> + receive + ping -> read(Count + 1); + stop -> {stop, Count} + after 5 -> + {ok, Count} + end. + +write(_Pid, 0) -> ok; +write(Pid, N) -> ok = gm:broadcast(Pid, ping), + write(Pid, N - 1). + +test(Time, WriteUnit, Nodes) -> + ok = gm:create_tables(), + [spawn(Node, ?MODULE, wile_e_coyote, [Time, WriteUnit]) || Node <- Nodes]. -- cgit v1.2.1 From 0cca73f99636dd92c176a8caa54014651f58e25f Mon Sep 17 00:00:00 2001 From: Matthew Sackman Date: Mon, 21 Mar 2011 17:57:54 +0000 Subject: Introduce batching (again - same diff as 5f7d8d07f94f) --- src/gm.erl | 134 ++++++++++++++++++++++++++++++++++++++++++------------------- 1 file changed, 92 insertions(+), 42 deletions(-) diff --git a/src/gm.erl b/src/gm.erl index 8cf22581..5b3623cf 100644 --- a/src/gm.erl +++ b/src/gm.erl @@ -376,15 +376,16 @@ confirmed_broadcast/2, group_members/1]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, - code_change/3, prioritise_info/2]). + code_change/3, prioritise_cast/2, prioritise_info/2]). -export([behaviour_info/1]). --export([table_definitions/0]). +-export([table_definitions/0, flush/1]). -define(GROUP_TABLE, gm_group). -define(HIBERNATE_AFTER_MIN, 1000). -define(DESIRED_HIBERNATE, 10000). +-define(BROADCAST_TIMER, 25). -define(SETS, ordsets). -define(DICT, orddict). @@ -398,7 +399,9 @@ pub_count, members_state, callback_args, - confirms + confirms, + broadcast_buffer, + broadcast_timer }). -record(gm_group, { name, version, members }). @@ -508,21 +511,26 @@ confirmed_broadcast(Server, Msg) -> group_members(Server) -> gen_server2:call(Server, group_members, infinity). +flush(Server) -> + gen_server2:cast(Server, flush). + init([GroupName, Module, Args]) -> random:seed(now()), gen_server2:cast(self(), join), Self = self(), - {ok, #state { self = Self, - left = {Self, undefined}, - right = {Self, undefined}, - group_name = GroupName, - module = Module, - view = undefined, - pub_count = 0, - members_state = undefined, - callback_args = Args, - confirms = queue:new() }, hibernate, + {ok, #state { self = Self, + left = {Self, undefined}, + right = {Self, undefined}, + group_name = GroupName, + module = Module, + view = undefined, + pub_count = 0, + members_state = undefined, + callback_args = Args, + confirms = queue:new(), + broadcast_buffer = [], + broadcast_timer = undefined }, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. @@ -620,7 +628,11 @@ handle_cast(join, State = #state { self = Self, {Module:joined(Args, all_known_members(View)), State1}); handle_cast(leave, State) -> - {stop, normal, State}. + {stop, normal, State}; + +handle_cast(flush, State) -> + noreply( + flush_broadcast_buffer(State #state { broadcast_timer = undefined })). handle_info({'DOWN', MRef, process, _Pid, _Reason}, @@ -662,14 +674,17 @@ handle_info({'DOWN', MRef, process, _Pid, _Reason}, end. -terminate(Reason, #state { module = Module, - callback_args = Args }) -> +terminate(Reason, State = #state { module = Module, + callback_args = Args }) -> + flush_broadcast_buffer(State), Module:terminate(Args, Reason). code_change(_OldVsn, State, _Extra) -> {ok, State}. +prioritise_cast(flush, _State) -> 1; +prioritise_cast(_ , _State) -> 0. prioritise_info({'DOWN', _MRef, process, _Pid, _Reason}, _State) -> 1; prioritise_info(_ , _State) -> 0. @@ -782,33 +797,62 @@ handle_msg({activity, _NotLeft, _Activity}, State) -> noreply(State) -> - {noreply, State, hibernate}. + {noreply, ensure_broadcast_timer(State), hibernate}. reply(Reply, State) -> - {reply, Reply, State, hibernate}. - -internal_broadcast(Msg, From, State = #state { self = Self, - pub_count = PubCount, - members_state = MembersState, - module = Module, - confirms = Confirms, - callback_args = Args }) -> - PubMsg = {PubCount, Msg}, - Activity = activity_cons(Self, [PubMsg], [], activity_nil()), - ok = maybe_send_activity(activity_finalise(Activity), State), - MembersState1 = - with_member( - fun (Member = #member { pending_ack = PA }) -> - Member #member { pending_ack = queue:in(PubMsg, PA) } - end, Self, MembersState), + {reply, Reply, ensure_broadcast_timer(State), hibernate}. + +ensure_broadcast_timer(State = #state { broadcast_buffer = [], + broadcast_timer = undefined }) -> + State; +ensure_broadcast_timer(State = #state { broadcast_buffer = [], + broadcast_timer = TRef }) -> + timer:cancel(TRef), + State #state { broadcast_timer = undefined }; +ensure_broadcast_timer(State = #state { broadcast_timer = undefined }) -> + {ok, TRef} = timer:apply_after(?BROADCAST_TIMER, ?MODULE, flush, [self()]), + State #state { broadcast_timer = TRef }; +ensure_broadcast_timer(State) -> + State. + +internal_broadcast(Msg, From, State = #state { self = Self, + pub_count = PubCount, + module = Module, + confirms = Confirms, + callback_args = Args, + broadcast_buffer = Buffer }) -> + Result = Module:handle_msg(Args, Self, Msg), + Buffer1 = [{PubCount, Msg} | Buffer], Confirms1 = case From of none -> Confirms; _ -> queue:in({PubCount, From}, Confirms) end, - handle_callback_result({Module:handle_msg(Args, Self, Msg), - State #state { pub_count = PubCount + 1, - members_state = MembersState1, - confirms = Confirms1 }}). + State1 = State #state { pub_count = PubCount + 1, + confirms = Confirms1, + broadcast_buffer = Buffer1 }, + case From =/= none of + true -> + handle_callback_result({Result, flush_broadcast_buffer(State1)}); + false -> + handle_callback_result( + {Result, State1 #state { broadcast_buffer = Buffer1 }}) + end. + +flush_broadcast_buffer(State = #state { broadcast_buffer = [] }) -> + State; +flush_broadcast_buffer(State = #state { self = Self, + members_state = MembersState, + broadcast_buffer = Buffer }) -> + Pubs = lists:reverse(Buffer), + Activity = activity_cons(Self, Pubs, [], activity_nil()), + ok = maybe_send_activity(activity_finalise(Activity), State), + MembersState1 = with_member( + fun (Member = #member { pending_ack = PA }) -> + PA1 = queue:join(PA, queue:from_list(Pubs)), + Member #member { pending_ack = PA1 } + end, Self, MembersState), + State #state { members_state = MembersState1, + broadcast_buffer = [] }. %% --------------------------------------------------------------------------- @@ -1093,16 +1137,22 @@ maybe_monitor(Self, Self) -> maybe_monitor(Other, _Self) -> erlang:monitor(process, Other). -check_neighbours(State = #state { self = Self, - left = Left, - right = Right, - view = View }) -> +check_neighbours(State = #state { self = Self, + left = Left, + right = Right, + view = View, + broadcast_buffer = Buffer }) -> #view_member { left = VLeft, right = VRight } = fetch_view_member(Self, View), Ver = view_version(View), Left1 = ensure_neighbour(Ver, Self, Left, VLeft), Right1 = ensure_neighbour(Ver, Self, Right, VRight), - State1 = State #state { left = Left1, right = Right1 }, + Buffer1 = case Right1 of + {Self, undefined} -> []; + _ -> Buffer + end, + State1 = State #state { left = Left1, right = Right1, + broadcast_buffer = Buffer1 }, ok = maybe_send_catchup(Right, State1), State1. -- cgit v1.2.1 From 2aeb64f3ce2bf0f0dec90e23b61578ead79781df Mon Sep 17 00:00:00 2001 From: Matthew Sackman Date: Tue, 22 Mar 2011 12:39:04 +0000 Subject: clarify documentation (thanks Emile) --- src/rabbit_variable_queue.erl | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 1b29756b..14c36b12 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -150,10 +150,13 @@ %% responsive. %% %% In the queue we keep track of both messages that are pending -%% delivery and messages that are pending acks. This ensures that -%% purging (deleting the former) and deletion (deleting the former and -%% the latter) are both cheap and do require any scanning through qi -%% segments. +%% delivery and messages that are pending acks. In the event of a +%% queue purge, we only need to load qi segments if the queue has +%% elements in deltas (i.e. it came under significant memory +%% pressure). In the event of a queue deletion, in addition to the +%% preceding, by keeping track of pending acks in RAM, we do not need +%% to search through qi segments looking for messages that are yet to +%% be acknowledged. %% %% Pending acks are recorded in memory either as the tuple {SeqId, %% MsgId, MsgProps} (tuple-form) or as the message itself (message- -- cgit v1.2.1 From 22a202104ae661bdda9ed87977d9f03e1df6f240 Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Tue, 22 Mar 2011 12:39:48 +0000 Subject: Switch to erlang-nox. --- packaging/debs/Debian/debian/control | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packaging/debs/Debian/debian/control b/packaging/debs/Debian/debian/control index b01d38b3..45f5c5c4 100644 --- a/packaging/debs/Debian/debian/control +++ b/packaging/debs/Debian/debian/control @@ -7,7 +7,7 @@ Standards-Version: 3.8.0 Package: rabbitmq-server Architecture: all -Depends: erlang (>= 1:12.b.3), adduser, logrotate, ${misc:Depends} +Depends: erlang-nox (>= 1:12.b.3), adduser, logrotate, ${misc:Depends} Description: An AMQP server written in Erlang RabbitMQ is an implementation of AMQP, the emerging standard for high performance enterprise messaging. The RabbitMQ server is a robust and -- cgit v1.2.1 From eeb1e5597036ba4464221aa934be00310df5668c Mon Sep 17 00:00:00 2001 From: Matthew Sackman Date: Tue, 22 Mar 2011 15:02:33 +0000 Subject: Enforce a bunch of returns --- src/rabbit_msg_store.erl | 46 ++++++++++++++++++++++++---------------------- 1 file changed, 24 insertions(+), 22 deletions(-) diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index 2b162f9d..bb26de64 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -850,16 +850,16 @@ terminate(_Reason, State = #msstate { index_state = IndexState, State1 = case CurHdl of undefined -> State; _ -> State2 = internal_sync(State), - file_handle_cache:close(CurHdl), + ok = file_handle_cache:close(CurHdl), State2 end, State3 = close_all_handles(State1), - store_file_summary(FileSummaryEts, Dir), - [ets:delete(T) || + ok = store_file_summary(FileSummaryEts, Dir), + [true = ets:delete(T) || T <- [FileSummaryEts, DedupCacheEts, FileHandlesEts, CurFileCacheEts]], IndexModule:terminate(IndexState), - store_recovery_terms([{client_refs, dict:fetch_keys(Clients)}, - {index_module, IndexModule}], Dir), + ok = store_recovery_terms([{client_refs, dict:fetch_keys(Clients)}, + {index_module, IndexModule}], Dir), State3 #msstate { index_state = undefined, current_file_handle = undefined }. @@ -912,13 +912,16 @@ internal_sync(State = #msstate { current_file_handle = CurHdl, false -> [{CRef, MsgIds} | NS] end end, [], CTM), - case {Syncs, CGs} of - {[], []} -> ok; - _ -> file_handle_cache:sync(CurHdl) - end, + ok = case {Syncs, CGs} of + {[], []} -> ok; + _ -> file_handle_cache:sync(CurHdl) + end, [K() || K <- lists:reverse(Syncs)], - [client_confirm(CRef, MsgIds, written, State1) || {CRef, MsgIds} <- CGs], - State1 #msstate { cref_to_msg_ids = dict:new(), on_sync = [] }. + State2 = lists:foldl( + fun ({CRef, MsgIds}, StateN) -> + client_confirm(CRef, MsgIds, written, StateN) + end, State1, CGs), + State2 #msstate { on_sync = [] }. write_action({true, not_found}, _MsgId, State) -> {ignore, undefined, State}; @@ -1466,7 +1469,7 @@ recover_file_summary(false, _Dir) -> recover_file_summary(true, Dir) -> Path = filename:join(Dir, ?FILE_SUMMARY_FILENAME), case ets:file2tab(Path) of - {ok, Tid} -> file:delete(Path), + {ok, Tid} -> ok = file:delete(Path), {true, Tid}; {error, _Error} -> recover_file_summary(false, Dir) end. @@ -1533,9 +1536,7 @@ scan_file_for_valid_messages(Dir, FileName) -> Hdl, filelib:file_size( form_filename(Dir, FileName)), fun scan_fun/2, []), - %% if something really bad has happened, - %% the close could fail, but ignore - file_handle_cache:close(Hdl), + ok = file_handle_cache:close(Hdl), Valid; {error, enoent} -> {ok, [], 0}; {error, Reason} -> {error, {unable_to_scan_file, FileName, Reason}} @@ -1971,32 +1972,33 @@ copy_messages(WorkList, InitOffset, FinalOffset, SourceHdl, DestinationHdl, force_recovery(BaseDir, Store) -> Dir = filename:join(BaseDir, atom_to_list(Store)), - file:delete(filename:join(Dir, ?CLEAN_FILENAME)), + ok = file:delete(filename:join(Dir, ?CLEAN_FILENAME)), recover_crashed_compactions(BaseDir), ok. foreach_file(D, Fun, Files) -> - [Fun(filename:join(D, File)) || File <- Files]. + [ok = Fun(filename:join(D, File)) || File <- Files]. foreach_file(D1, D2, Fun, Files) -> - [Fun(filename:join(D1, File), filename:join(D2, File)) || File <- Files]. + [ok = Fun(filename:join(D1, File), filename:join(D2, File)) || File <- Files]. transform_dir(BaseDir, Store, TransformFun) -> Dir = filename:join(BaseDir, atom_to_list(Store)), TmpDir = filename:join(Dir, ?TRANSFORM_TMP), TransformFile = fun (A, B) -> transform_msg_file(A, B, TransformFun) end, + CopyFile = fun (Src, Dst) -> {ok, _Bytes} = file:copy(Src, Dst), ok end, case filelib:is_dir(TmpDir) of true -> throw({error, transform_failed_previously}); false -> FileList = list_sorted_file_names(Dir, ?FILE_EXTENSION), foreach_file(Dir, TmpDir, TransformFile, FileList), foreach_file(Dir, fun file:delete/1, FileList), - foreach_file(TmpDir, Dir, fun file:copy/2, FileList), + foreach_file(TmpDir, Dir, CopyFile, FileList), foreach_file(TmpDir, fun file:delete/1, FileList), ok = file:del_dir(TmpDir) end. transform_msg_file(FileOld, FileNew, TransformFun) -> - rabbit_misc:ensure_parent_dirs_exist(FileNew), + ok = rabbit_misc:ensure_parent_dirs_exist(FileNew), {ok, RefOld} = file_handle_cache:open(FileOld, [raw, binary, read], []), {ok, RefNew} = file_handle_cache:open(FileNew, [raw, binary, write], [{write_buffer, @@ -2009,6 +2011,6 @@ transform_msg_file(FileOld, FileNew, TransformFun) -> {ok, _} = rabbit_msg_file:append(RefNew, MsgId, MsgNew), ok end, ok), - file_handle_cache:close(RefOld), - file_handle_cache:close(RefNew), + ok = file_handle_cache:close(RefOld), + ok = file_handle_cache:close(RefNew), ok. -- cgit v1.2.1 From 5fd7264796fbe35dbc7562b1cfc7ef09c4a3f3fb Mon Sep 17 00:00:00 2001 From: Matthew Sackman Date: Tue, 22 Mar 2011 17:44:39 +0000 Subject: Renaming bits and pieces for consistency and checking a few more return values, plus other minor fixes --- src/rabbit_mnesia.erl | 1 + src/rabbit_upgrade.erl | 39 ++++++++++++++++++++------------------- 2 files changed, 21 insertions(+), 19 deletions(-) diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index 47df1148..75e6eeed 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -528,6 +528,7 @@ move_db() -> ok. copy_db(Destination) -> + ok = ensure_mnesia_not_running(), rabbit_misc:recursive_copy(dir(), Destination). create_tables() -> diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl index 6959208b..39a42ef2 100644 --- a/src/rabbit_upgrade.erl +++ b/src/rabbit_upgrade.erl @@ -91,7 +91,7 @@ %% ------------------------------------------------------------------- -ensure_backup() -> +ensure_backup_taken() -> case filelib:is_file(lock_filename()) of false -> case filelib:is_dir(backup_dir()) of false -> ok = take_backup(); @@ -109,7 +109,7 @@ take_backup() -> {error, E} -> throw({could_not_back_up_mnesia_dir, E}) end. -maybe_remove_backup() -> +ensure_backup_removed() -> case filelib:is_dir(backup_dir()) of true -> ok = remove_backup(); _ -> ok @@ -135,6 +135,7 @@ maybe_upgrade_mnesia() -> ok; {ok, Upgrades} -> rabbit:prepare(), %% Ensure we have logs for this + ok = ensure_backup_taken(), case upgrade_mode(AllNodes) of primary -> primary_upgrade(Upgrades, AllNodes); secondary -> secondary_upgrade(AllNodes) @@ -203,18 +204,18 @@ die(Msg, Args) -> primary_upgrade(Upgrades, Nodes) -> Others = Nodes -- [node()], - apply_upgrades( - mnesia, - Upgrades, - fun () -> - force_tables(), - case Others of - [] -> ok; - _ -> info("mnesia upgrades: Breaking cluster~n", []), - [{atomic, ok} = mnesia:del_table_copy(schema, Node) - || Node <- Others] - end - end), + ok = apply_upgrades( + mnesia, + Upgrades, + fun () -> + force_tables(), + case Others of + [] -> ok; + _ -> info("mnesia upgrades: Breaking cluster~n", []), + [{atomic, ok} = mnesia:del_table_copy(schema, Node) + || Node <- Others] + end + end), ok. force_tables() -> @@ -250,17 +251,17 @@ maybe_upgrade_local() -> case rabbit_version:upgrades_required(local) of {error, version_not_available} -> version_not_available; {error, _} = Err -> throw(Err); - {ok, []} -> maybe_remove_backup(); + {ok, []} -> ok = ensure_backup_removed(); {ok, Upgrades} -> mnesia:stop(), - apply_upgrades(local, Upgrades, - fun () -> ok end), - maybe_remove_backup() + ok = ensure_backup_taken(), + ok = apply_upgrades(local, Upgrades, + fun () -> ok end), + ok = ensure_backup_removed() end. %% ------------------------------------------------------------------- apply_upgrades(Scope, Upgrades, Fun) -> - ensure_backup(), ok = rabbit_misc:lock_file(lock_filename()), info("~s upgrades: ~w to apply~n", [Scope, length(Upgrades)]), rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia), -- cgit v1.2.1 From 020f72fbafe0a7d62ced75093a01e2d5239ae7ab Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Tue, 22 Mar 2011 17:52:19 +0000 Subject: cosmetic(ish): no need to match the return of ensure_* --- src/rabbit_mnesia.erl | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index 66436920..963d814e 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -78,8 +78,8 @@ status() -> {running_nodes, running_clustered_nodes()}]. init() -> - ok = ensure_mnesia_running(), - ok = ensure_mnesia_dir(), + ensure_mnesia_running(), + ensure_mnesia_dir(), ok = init_db(read_cluster_nodes_config(), true), ok. @@ -98,8 +98,8 @@ force_cluster(ClusterNodes) -> %% node. If Force is false, only connections to online nodes are %% allowed. cluster(ClusterNodes, Force) -> - ok = ensure_mnesia_not_running(), - ok = ensure_mnesia_dir(), + ensure_mnesia_not_running(), + ensure_mnesia_dir(), rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia), try ok = init_db(ClusterNodes, Force), @@ -455,7 +455,7 @@ create_schema() -> rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia), ok = create_tables(), - ok = ensure_schema_integrity(), + ensure_schema_integrity(), ok = rabbit_upgrade:write_version(). move_db() -> @@ -476,7 +476,7 @@ move_db() -> {error, Reason} -> throw({error, {cannot_backup_mnesia, MnesiaDir, BackupDir, Reason}}) end, - ok = ensure_mnesia_dir(), + ensure_mnesia_dir(), rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia), ok. @@ -561,12 +561,12 @@ wait_for_tables(TableNames) -> end. reset(Force) -> - ok = ensure_mnesia_not_running(), + ensure_mnesia_not_running(), Node = node(), case Force of true -> ok; false -> - ok = ensure_mnesia_dir(), + ensure_mnesia_dir(), rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia), {Nodes, RunningNodes} = try -- cgit v1.2.1 From 60f50338ac0d19486a77ada8e3f7987a47449f25 Mon Sep 17 00:00:00 2001 From: Alexandru Scvortov Date: Wed, 23 Mar 2011 10:10:31 +0000 Subject: 2.4.0 changelog entries for debian and fedora --- packaging/RPMS/Fedora/rabbitmq-server.spec | 3 +++ packaging/debs/Debian/debian/changelog | 6 ++++++ 2 files changed, 9 insertions(+) diff --git a/packaging/RPMS/Fedora/rabbitmq-server.spec b/packaging/RPMS/Fedora/rabbitmq-server.spec index ae9b2059..45af770a 100644 --- a/packaging/RPMS/Fedora/rabbitmq-server.spec +++ b/packaging/RPMS/Fedora/rabbitmq-server.spec @@ -120,6 +120,9 @@ done rm -rf %{buildroot} %changelog +* Tue Mar 22 2011 Alexandru Scvortov 2.4.0-1 +- New Upstream Release + * Thu Feb 3 2011 simon@rabbitmq.com 2.3.1-1 - New Upstream Release diff --git a/packaging/debs/Debian/debian/changelog b/packaging/debs/Debian/debian/changelog index 12165dc0..2ca5074f 100644 --- a/packaging/debs/Debian/debian/changelog +++ b/packaging/debs/Debian/debian/changelog @@ -1,3 +1,9 @@ +rabbitmq-server (2.4.0-1) lucid; urgency=low + + * New Upstream Release + + -- Alexandru Scvortov Tue, 22 Mar 2011 17:34:31 +0000 + rabbitmq-server (2.3.1-1) lucid; urgency=low * New Upstream Release -- cgit v1.2.1 -- cgit v1.2.1 From 8b16025be7faf2a5a4d4e403d2150a97e03994be Mon Sep 17 00:00:00 2001 From: Matthew Sackman Date: Wed, 23 Mar 2011 11:53:59 +0000 Subject: New decree is that you're not meant to match against ensure_stuff calls --- src/rabbit_mnesia.erl | 8 +++++--- src/rabbit_upgrade.erl | 10 ++++++---- 2 files changed, 11 insertions(+), 7 deletions(-) diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index ff1b8c97..6ba9e60a 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -437,8 +437,9 @@ init_db(ClusterNodes, Force, DoSecondaryLocalUpgrades) -> %% We're the first node up case rabbit_upgrade:maybe_upgrade_local() of ok -> ensure_schema_integrity(); - version_not_available -> schema_ok_or_move() - end; + version_not_available -> ok = schema_ok_or_move() + end, + ok; {[AnotherNode|_], _} -> %% Subsequent node in cluster, catch up ensure_version_ok( @@ -462,7 +463,8 @@ init_db(ClusterNodes, Force, DoSecondaryLocalUpgrades) -> end; false -> ok end, - ensure_schema_integrity() + ensure_schema_integrity(), + ok end; {error, Reason} -> %% one reason we may end up here is if we try to join diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl index 39a42ef2..87a22363 100644 --- a/src/rabbit_upgrade.erl +++ b/src/rabbit_upgrade.erl @@ -135,7 +135,7 @@ maybe_upgrade_mnesia() -> ok; {ok, Upgrades} -> rabbit:prepare(), %% Ensure we have logs for this - ok = ensure_backup_taken(), + ensure_backup_taken(), case upgrade_mode(AllNodes) of primary -> primary_upgrade(Upgrades, AllNodes); secondary -> secondary_upgrade(AllNodes) @@ -251,12 +251,14 @@ maybe_upgrade_local() -> case rabbit_version:upgrades_required(local) of {error, version_not_available} -> version_not_available; {error, _} = Err -> throw(Err); - {ok, []} -> ok = ensure_backup_removed(); + {ok, []} -> ensure_backup_removed(), + ok; {ok, Upgrades} -> mnesia:stop(), - ok = ensure_backup_taken(), + ensure_backup_taken(), ok = apply_upgrades(local, Upgrades, fun () -> ok end), - ok = ensure_backup_removed() + ensure_backup_removed(), + ok end. %% ------------------------------------------------------------------- -- cgit v1.2.1 From 2a4f51d39b3f291a7cd7e8e9f084cee8386a8712 Mon Sep 17 00:00:00 2001 From: Matthew Sackman Date: Wed, 23 Mar 2011 12:11:01 +0000 Subject: sort out how often and when we do the rabbit:prepare (set up log handlers), and actually make that do the mnesia upgrade. --- src/rabbit.erl | 3 ++- src/rabbit_prelaunch.erl | 4 +--- src/rabbit_upgrade.erl | 10 ++++------ 3 files changed, 7 insertions(+), 10 deletions(-) diff --git a/src/rabbit.erl b/src/rabbit.erl index 1361d0f4..c7d0d905 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -192,7 +192,8 @@ %%---------------------------------------------------------------------------- prepare() -> - ok = ensure_working_log_handlers(). + ok = ensure_working_log_handlers(), + ok = rabbit_upgrade:maybe_upgrade_mnesia(). start() -> try diff --git a/src/rabbit_prelaunch.erl b/src/rabbit_prelaunch.erl index 92ad6a24..8800e8d6 100644 --- a/src/rabbit_prelaunch.erl +++ b/src/rabbit_prelaunch.erl @@ -235,10 +235,8 @@ post_process_script(ScriptFile) -> {error, {failed_to_load_script, Reason}} end. -process_entry(Entry = {apply,{application,start_boot,[rabbit,permanent]}}) -> - [{apply,{rabbit,prepare,[]}}, Entry]; process_entry(Entry = {apply,{application,start_boot,[mnesia,permanent]}}) -> - [{apply,{rabbit_upgrade,maybe_upgrade_mnesia,[]}}, Entry]; + [{apply,{rabbit,prepare,[]}}, Entry]; process_entry(Entry) -> [Entry]. diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl index 87a22363..f2d38a93 100644 --- a/src/rabbit_upgrade.erl +++ b/src/rabbit_upgrade.erl @@ -101,7 +101,6 @@ ensure_backup_taken() -> end. take_backup() -> - rabbit:prepare(), %% Ensure we have logs for this BackupDir = backup_dir(), case rabbit_mnesia:copy_db(BackupDir) of ok -> info("upgrades: Mnesia dir backed up to ~p~n", @@ -134,12 +133,11 @@ maybe_upgrade_mnesia() -> {ok, []} -> ok; {ok, Upgrades} -> - rabbit:prepare(), %% Ensure we have logs for this ensure_backup_taken(), - case upgrade_mode(AllNodes) of - primary -> primary_upgrade(Upgrades, AllNodes); - secondary -> secondary_upgrade(AllNodes) - end + ok = case upgrade_mode(AllNodes) of + primary -> primary_upgrade(Upgrades, AllNodes); + secondary -> secondary_upgrade(AllNodes) + end end. upgrade_mode(AllNodes) -> -- cgit v1.2.1 From 02a4098c915add7c5f9b9002cf5ff0d6783e091d Mon Sep 17 00:00:00 2001 From: Matthew Sackman Date: Wed, 23 Mar 2011 14:40:57 +0000 Subject: Detect discnodeishness prior to suffering disclessness --- src/rabbit_upgrade.erl | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl index f2d38a93..85f6e88c 100644 --- a/src/rabbit_upgrade.erl +++ b/src/rabbit_upgrade.erl @@ -220,12 +220,14 @@ force_tables() -> [mnesia:force_load_table(T) || T <- rabbit_mnesia:table_names()]. secondary_upgrade(AllNodes) -> + %% must do this before we wipe out schema + IsDiscNode = is_disc_node(), rabbit_misc:ensure_ok(mnesia:delete_schema([node()]), cannot_delete_schema), %% Note that we cluster with all nodes, rather than all disc nodes %% (as we can't know all disc nodes at this point). This is safe as %% we're not writing the cluster config, just setting up Mnesia. - ClusterNodes = case is_disc_node() of + ClusterNodes = case IsDiscNode of true -> AllNodes; false -> AllNodes -- [node()] end, -- cgit v1.2.1 From 2cb7c4257df8c2ae2407779a4e4ca8b09b6b9782 Mon Sep 17 00:00:00 2001 From: Rob Harrop Date: Wed, 23 Mar 2011 15:34:23 +0000 Subject: Switched to now_ms() --- src/rabbit_error_logger.erl | 3 ++- src/rabbit_misc.erl | 8 +------- 2 files changed, 3 insertions(+), 8 deletions(-) diff --git a/src/rabbit_error_logger.erl b/src/rabbit_error_logger.erl index 33dfcef9..5f53e430 100644 --- a/src/rabbit_error_logger.erl +++ b/src/rabbit_error_logger.erl @@ -70,6 +70,7 @@ publish1(RoutingKey, Format, Data, LogExch) -> {ok, _RoutingRes, _DeliveredQPids} = rabbit_basic:publish(LogExch, RoutingKey, false, false, none, #'P_basic'{content_type = <<"text/plain">>, - timestamp = rabbit_misc:timestamp()}, + timestamp = + rabbit_misc:now_ms() div 1000}, list_to_binary(io_lib:format(Format, Data))), ok. diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 713498c8..e79a58a1 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -52,7 +52,7 @@ unlink_and_capture_exit/1]). -export([get_options/2]). -export([all_module_attributes/1, build_acyclic_graph/3]). --export([now_ms/0, timestamp/0]). +-export([now_ms/0]). -export([lock_file/1]). -export([const_ok/1, const/1]). -export([ntoa/1, ntoab/1]). @@ -190,7 +190,6 @@ {bad_edge, [digraph:vertex()]}), digraph:vertex(), digraph:vertex()})). -spec(now_ms/0 :: () -> non_neg_integer()). --spec(timestamp/0 ::() -> non_neg_integer()). -spec(lock_file/1 :: (file:filename()) -> rabbit_types:ok_or_error('eexist')). -spec(const_ok/1 :: (any()) -> 'ok'). -spec(const/1 :: (A) -> const(A)). @@ -200,7 +199,6 @@ -endif. --define(EPOCH, {{1970, 1, 1}, {0, 0, 0}}). %%---------------------------------------------------------------------------- method_record_type(Record) -> @@ -793,10 +791,6 @@ get_flag(_, []) -> now_ms() -> timer:now_diff(now(), {0,0,0}) div 1000. -timestamp() -> - calendar:datetime_to_gregorian_seconds(erlang:universaltime()) - - calendar:datetime_to_gregorian_seconds(?EPOCH). - module_attributes(Module) -> case catch Module:module_info(attributes) of {'EXIT', {undef, [{Module, module_info, _} | _]}} -> -- cgit v1.2.1 From c63dcaa034093cd1dc217c06c102127d18ac524f Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Wed, 23 Mar 2011 16:05:04 +0000 Subject: Record all nodes, don't list them when we refuse to start. --- src/rabbit.erl | 4 ++-- src/rabbit_mnesia.erl | 21 +++++++++------------ src/rabbit_upgrade.erl | 15 ++++++--------- 3 files changed, 17 insertions(+), 23 deletions(-) diff --git a/src/rabbit.erl b/src/rabbit.erl index 1361d0f4..e60886fa 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -233,7 +233,7 @@ rotate_logs(BinarySuffix) -> start(normal, []) -> case erts_version_check() of ok -> - ok = rabbit_mnesia:delete_previously_running_disc_nodes(), + ok = rabbit_mnesia:delete_previously_running_nodes(), {ok, SupPid} = rabbit_sup:start_link(), true = register(rabbit, self()), @@ -246,7 +246,7 @@ start(normal, []) -> end. stop(_State) -> - ok = rabbit_mnesia:record_running_disc_nodes(), + ok = rabbit_mnesia:record_running_nodes(), terminated_ok = error_logger:delete_report_handler(rabbit_error_logger), ok = rabbit_alarm:stop(), ok = case rabbit_mnesia:is_clustered() of diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index 47df1148..e661e5e3 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -22,8 +22,8 @@ is_clustered/0, running_clustered_nodes/0, all_clustered_nodes/0, empty_ram_only_tables/0, copy_db/1, wait_for_tables/1, create_cluster_nodes_config/1, read_cluster_nodes_config/0, - record_running_disc_nodes/0, read_previously_running_disc_nodes/0, - delete_previously_running_disc_nodes/0, running_nodes_filename/0]). + record_running_nodes/0, read_previously_running_nodes/0, + delete_previously_running_nodes/0, running_nodes_filename/0]). -export([table_names/0]). @@ -61,9 +61,9 @@ -spec(wait_for_tables/1 :: ([atom()]) -> 'ok'). -spec(create_cluster_nodes_config/1 :: ([node()]) -> 'ok'). -spec(read_cluster_nodes_config/0 :: () -> [node()]). --spec(record_running_disc_nodes/0 :: () -> 'ok'). --spec(read_previously_running_disc_nodes/0 :: () -> [node()]). --spec(delete_previously_running_disc_nodes/0 :: () -> 'ok'). +-spec(record_running_nodes/0 :: () -> 'ok'). +-spec(read_previously_running_nodes/0 :: () -> [node()]). +-spec(delete_previously_running_nodes/0 :: () -> 'ok'). -spec(running_nodes_filename/0 :: () -> file:filename()). -endif. @@ -380,18 +380,15 @@ delete_cluster_nodes_config() -> running_nodes_filename() -> filename:join(dir(), "nodes_running_at_shutdown"). -record_running_disc_nodes() -> +record_running_nodes() -> FileName = running_nodes_filename(), - Nodes = sets:to_list( - sets:intersection( - sets:from_list(nodes_of_type(disc_copies)), - sets:from_list(running_clustered_nodes()))) -- [node()], + Nodes = running_clustered_nodes() -- [node()], %% Don't check the result: we're shutting down anyway and this is %% a best-effort-basis. rabbit_misc:write_term_file(FileName, [Nodes]), ok. -read_previously_running_disc_nodes() -> +read_previously_running_nodes() -> FileName = running_nodes_filename(), case rabbit_misc:read_term_file(FileName) of {ok, [Nodes]} -> Nodes; @@ -400,7 +397,7 @@ read_previously_running_disc_nodes() -> FileName, Reason}}) end. -delete_previously_running_disc_nodes() -> +delete_previously_running_nodes() -> FileName = running_nodes_filename(), case file:delete(FileName) of ok -> ok; diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl index 6959208b..244be522 100644 --- a/src/rabbit_upgrade.erl +++ b/src/rabbit_upgrade.erl @@ -144,7 +144,7 @@ maybe_upgrade_mnesia() -> upgrade_mode(AllNodes) -> case nodes_running(AllNodes) of [] -> - AfterUs = rabbit_mnesia:read_previously_running_disc_nodes(), + AfterUs = rabbit_mnesia:read_previously_running_nodes(), case {is_disc_node(), AfterUs} of {true, []} -> primary; @@ -152,14 +152,11 @@ upgrade_mode(AllNodes) -> Filename = rabbit_mnesia:running_nodes_filename(), die("Cluster upgrade needed but other disc nodes shut " "down after this one.~nPlease first start the last " - "disc node to shut down.~nThe disc nodes that were " - "still running when this one shut down are:~n~n" - " ~p~n~nNote: if several disc nodes were shut down " - "simultaneously they may all~nshow this message. " - "In which case, remove the lock file on one of them " - "and~nstart that node. The lock file on this node " - "is:~n~n ~s ", - [AfterUs, Filename]); + "disc node to shut down.~n~nNote: if several disc " + "nodes were shut down simultaneously they may " + "all~nshow this message. In which case, remove " + "the lock file on one of them and~nstart that node. " + "The lock file on this node is:~n~n ~s ", [Filename]); {false, _} -> die("Cluster upgrade needed but this is a ram node.~n" "Please first start the last disc node to shut down.", -- cgit v1.2.1 From 4bb07e06818a4986507685eda2dff36ab56687c5 Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Wed, 23 Mar 2011 16:16:23 +0000 Subject: Explain --- src/rabbit_error_logger.erl | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/rabbit_error_logger.erl b/src/rabbit_error_logger.erl index 5f53e430..4b13033e 100644 --- a/src/rabbit_error_logger.erl +++ b/src/rabbit_error_logger.erl @@ -70,6 +70,9 @@ publish1(RoutingKey, Format, Data, LogExch) -> {ok, _RoutingRes, _DeliveredQPids} = rabbit_basic:publish(LogExch, RoutingKey, false, false, none, #'P_basic'{content_type = <<"text/plain">>, + %% NB: 0-9-1 says it's a "64 bit POSIX + %% timestamp". That's second + %% resolution, not millisecond. timestamp = rabbit_misc:now_ms() div 1000}, list_to_binary(io_lib:format(Format, Data))), -- cgit v1.2.1 From 77400eaae417d65c9a2556d9281a44a2d521342c Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Wed, 23 Mar 2011 16:26:33 +0000 Subject: cosmetic --- src/rabbit_error_logger.erl | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/src/rabbit_error_logger.erl b/src/rabbit_error_logger.erl index 4b13033e..3fb0817a 100644 --- a/src/rabbit_error_logger.erl +++ b/src/rabbit_error_logger.erl @@ -67,13 +67,12 @@ publish(_Other, _Format, _Data, _State) -> ok. publish1(RoutingKey, Format, Data, LogExch) -> + %% 0-9-1 says the timestamp is a "64 bit POSIX timestamp". That's + %% second resolution, not millisecond. + Timestamp = rabbit_misc:now_ms() div 1000, {ok, _RoutingRes, _DeliveredQPids} = rabbit_basic:publish(LogExch, RoutingKey, false, false, none, #'P_basic'{content_type = <<"text/plain">>, - %% NB: 0-9-1 says it's a "64 bit POSIX - %% timestamp". That's second - %% resolution, not millisecond. - timestamp = - rabbit_misc:now_ms() div 1000}, + timestamp = Timestamp}, list_to_binary(io_lib:format(Format, Data))), ok. -- cgit v1.2.1 From 3b89e0573c46e82557dc2592514907e2a6d0ae71 Mon Sep 17 00:00:00 2001 From: Matthew Sackman Date: Wed, 23 Mar 2011 16:50:28 +0000 Subject: ARGH! Trailing line --- src/rabbit_misc.erl | 1 - 1 file changed, 1 deletion(-) diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index e79a58a1..2e9563cf 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -871,4 +871,3 @@ is_process_alive(Pid) -> true -> true; _ -> false end. - -- cgit v1.2.1 From 21ac2b8a105560ab59b62c42d9ce6ad05ea9f34d Mon Sep 17 00:00:00 2001 From: Matthew Sackman Date: Wed, 23 Mar 2011 17:08:19 +0000 Subject: Abstract out continuation --- src/rabbit_mnesia.erl | 29 ++++++++++++++--------------- src/rabbit_upgrade.erl | 2 +- 2 files changed, 15 insertions(+), 16 deletions(-) diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index 9ca52327..8bc89880 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -45,7 +45,7 @@ -spec(dir/0 :: () -> file:filename()). -spec(ensure_mnesia_dir/0 :: () -> 'ok'). -spec(init/0 :: () -> 'ok'). --spec(init_db/3 :: ([node()], boolean(), boolean()) -> 'ok'). +-spec(init_db/3 :: ([node()], boolean(), rabbit_misc:thunk('ok')) -> 'ok'). -spec(is_db_empty/0 :: () -> boolean()). -spec(cluster/1 :: ([node()]) -> 'ok'). -spec(force_cluster/1 :: ([node()]) -> 'ok'). @@ -90,7 +90,8 @@ status() -> init() -> ensure_mnesia_running(), ensure_mnesia_dir(), - ok = init_db(read_cluster_nodes_config(), true, true), + ok = init_db(read_cluster_nodes_config(), true, + fun maybe_upgrade_local_or_record_desired/0), ok. is_db_empty() -> @@ -112,7 +113,7 @@ cluster(ClusterNodes, Force) -> ensure_mnesia_dir(), rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia), try - ok = init_db(ClusterNodes, Force, true), + ok = init_db(ClusterNodes, Force, fun () -> ok end), ok = create_cluster_nodes_config(ClusterNodes) after mnesia:stop() @@ -410,7 +411,7 @@ delete_previously_running_nodes() -> %% standalone disk node, or disk or ram node connected to the %% specified cluster nodes. If Force is false, don't allow %% connections to offline nodes. -init_db(ClusterNodes, Force, DoSecondaryLocalUpgrades) -> +init_db(ClusterNodes, Force, SecondaryPostMnesiaFun) -> UClusterNodes = lists:usort(ClusterNodes), ProperClusterNodes = UClusterNodes -- [node()], case mnesia:change_config(extra_db_nodes, ProperClusterNodes) of @@ -449,17 +450,7 @@ init_db(ClusterNodes, Force, DoSecondaryLocalUpgrades) -> true -> disc; false -> ram end), - case DoSecondaryLocalUpgrades of - true -> case rabbit_upgrade:maybe_upgrade_local() of - ok -> - ok; - %% If we're just starting up a new - %% node we won't have a version - version_not_available -> - ok = rabbit_version:record_desired() - end; - false -> ok - end, + ok = SecondaryPostMnesiaFun(), ensure_schema_integrity(), ok end; @@ -470,6 +461,14 @@ init_db(ClusterNodes, Force, DoSecondaryLocalUpgrades) -> throw({error, {unable_to_join_cluster, ClusterNodes, Reason}}) end. +maybe_upgrade_local_or_record_desired() -> + case rabbit_upgrade:maybe_upgrade_local() of + ok -> ok; + %% If we're just starting up a new node we won't have a + %% version + version_not_available -> ok = rabbit_version:record_desired() + end. + schema_ok_or_move() -> case check_schema_integrity() of ok -> diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl index 3981b173..5ec08330 100644 --- a/src/rabbit_upgrade.erl +++ b/src/rabbit_upgrade.erl @@ -229,7 +229,7 @@ secondary_upgrade(AllNodes) -> false -> AllNodes -- [node()] end, rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia), - ok = rabbit_mnesia:init_db(ClusterNodes, true, false), + ok = rabbit_mnesia:init_db(ClusterNodes, true, fun () -> ok end), ok = rabbit_version:record_desired_for_scope(mnesia), ok. -- cgit v1.2.1 From 330eb98c7bc0e3df4149807dba765263a06c2d3d Mon Sep 17 00:00:00 2001 From: Matthew Sackman Date: Wed, 23 Mar 2011 17:23:12 +0000 Subject: Turns out it's very important that we do write the schema_version when call mnesia:cluster --- src/rabbit_mnesia.erl | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index 8bc89880..fbcf07ae 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -113,7 +113,8 @@ cluster(ClusterNodes, Force) -> ensure_mnesia_dir(), rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia), try - ok = init_db(ClusterNodes, Force, fun () -> ok end), + ok = init_db(ClusterNodes, Force, + fun maybe_upgrade_local_or_record_desired/0), ok = create_cluster_nodes_config(ClusterNodes) after mnesia:stop() -- cgit v1.2.1 From 5d51177b297d5425741b808fb6f78a2712a0376e Mon Sep 17 00:00:00 2001 From: Matthew Sackman Date: Wed, 23 Mar 2011 18:06:16 +0000 Subject: cough --- src/rabbit_upgrade.erl | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl index 5ec08330..a2abb1e5 100644 --- a/src/rabbit_upgrade.erl +++ b/src/rabbit_upgrade.erl @@ -64,11 +64,11 @@ %% into the boot process by prelaunch before the mnesia application is %% started. By the time Mnesia is started the upgrades have happened %% (on the primary), or Mnesia has been reset (on the secondary) and -%% rabbit_mnesia:init_db/2 can then make the node rejoin the cluster +%% rabbit_mnesia:init_db/3 can then make the node rejoin the cluster %% in the normal way. %% %% The non-mnesia upgrades are then triggered by -%% rabbit_mnesia:init_db/2. Of course, it's possible for a given +%% rabbit_mnesia:init_db/3. Of course, it's possible for a given %% upgrade process to only require Mnesia upgrades, or only require %% non-Mnesia upgrades. In the latter case no Mnesia resets and %% reclusterings occur. -- cgit v1.2.1 From 129628e9f1c9a9d8dc0662de2cc7c50459d622d3 Mon Sep 17 00:00:00 2001 From: Vlad Alexandru Ionescu Date: Wed, 23 Mar 2011 20:12:03 +0000 Subject: removing trap_exit flag in rabbit_channel --- src/rabbit_channel.erl | 1 - 1 file changed, 1 deletion(-) diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 0c12614c..5099bf3f 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -156,7 +156,6 @@ ready_for_close(Pid) -> init([Channel, ReaderPid, WriterPid, ConnPid, Protocol, User, VHost, Capabilities, CollectorPid, StartLimiterFun]) -> - process_flag(trap_exit, true), ok = pg_local:join(rabbit_channels, self()), StatsTimer = rabbit_event:init_stats_timer(), State = #ch{state = starting, -- cgit v1.2.1