diff options
author | Matthew Sackman <matthew@rabbitmq.com> | 2010-11-18 12:23:23 +0000 |
---|---|---|
committer | Matthew Sackman <matthew@rabbitmq.com> | 2010-11-18 12:23:23 +0000 |
commit | 5f7e24198de005dd24ef20f73299a6848c15dcde (patch) | |
tree | e115e89a2177341ad07b00e1837d25ad7f17b7af /src | |
parent | 631f130edc3f8fbb704e1c1390925b8d7009b103 (diff) | |
parent | dc4749f6cf83be2b83021e86724ed7ff92ce44d3 (diff) | |
download | rabbitmq-server-5f7e24198de005dd24ef20f73299a6848c15dcde.tar.gz |
One head is better than two
Diffstat (limited to 'src')
-rw-r--r-- | src/rabbit.erl | 81 | ||||
-rw-r--r-- | src/rabbit_exchange.erl | 15 | ||||
-rw-r--r-- | src/rabbit_misc.erl | 46 | ||||
-rw-r--r-- | src/rabbit_mnesia.erl | 31 | ||||
-rw-r--r-- | src/rabbit_upgrade.erl | 64 |
5 files changed, 128 insertions, 109 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl index a1dd2c2e..61a3a53d 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -291,11 +291,10 @@ run_boot_step({StepName, Attributes}) -> io:format("-- ~s~n", [Description]); MFAs -> io:format("starting ~-60s ...", [Description]), - [case catch apply(M,F,A) of - {'EXIT', Reason} -> - boot_error("FAILED~nReason: ~p~n", [Reason]); - ok -> - ok + [try + apply(M,F,A) + catch + _:Reason -> boot_error("FAILED~nReason: ~p~n", [Reason]) end || {M,F,A} <- MFAs], io:format("done~n"), ok @@ -315,43 +314,43 @@ edges(_Module, Steps) -> {Key, OtherStep} <- Atts, Key =:= requires orelse Key =:= enables]. -graph_build_error({vertex, duplicate, StepName}) -> - boot_error("Duplicate boot step name: ~w~n", [StepName]); -graph_build_error({edge, Reason, From, To}) -> - boot_error( - "Could not add boot step dependency of ~w on ~w:~n~s", - [To, From, - case Reason of - {bad_vertex, V} -> - io_lib:format("Boot step not registered: ~w~n", [V]); - {bad_edge, [First | Rest]} -> - [io_lib:format("Cyclic dependency: ~w", [First]), - [io_lib:format(" depends on ~w", [Next]) || Next <- Rest], - io_lib:format(" depends on ~w~n", [First])] - end]). - sort_boot_steps(UnsortedSteps) -> - G = rabbit_misc:build_acyclic_graph( - fun vertices/2, fun edges/2, fun graph_build_error/1, UnsortedSteps), - - %% Use topological sort to find a consistent ordering (if there is - %% one, otherwise fail). - SortedStepsRev = [begin - {StepName, Step} = digraph:vertex(G, StepName), - Step - end || StepName <- digraph_utils:topsort(G)], - SortedSteps = lists:reverse(SortedStepsRev), - - digraph:delete(G), - - %% Check that all mentioned {M,F,A} triples are exported. - case [{StepName, {M,F,A}} - || {StepName, Attributes} <- SortedSteps, - {mfa, {M,F,A}} <- Attributes, - not erlang:function_exported(M, F, length(A))] of - [] -> SortedSteps; - MissingFunctions -> boot_error("Boot step functions not exported: ~p~n", - [MissingFunctions]) + case rabbit_misc:build_acyclic_graph(fun vertices/2, fun edges/2, + UnsortedSteps) of + {ok, G} -> + %% Use topological sort to find a consistent ordering (if + %% there is one, otherwise fail). + SortedSteps = lists:reverse( + [begin + {StepName, Step} = digraph:vertex(G, StepName), + Step + end || StepName <- digraph_utils:topsort(G)]), + digraph:delete(G), + %% Check that all mentioned {M,F,A} triples are exported. + case [{StepName, {M,F,A}} || + {StepName, Attributes} <- SortedSteps, + {mfa, {M,F,A}} <- Attributes, + not erlang:function_exported(M, F, length(A))] of + [] -> SortedSteps; + MissingFunctions -> boot_error( + "Boot step functions not exported: ~p~n", + [MissingFunctions]) + end; + {error, {vertex, duplicate, StepName}} -> + boot_error("Duplicate boot step name: ~w~n", [StepName]); + {error, {edge, Reason, From, To}} -> + boot_error( + "Could not add boot step dependency of ~w on ~w:~n~s", + [To, From, + case Reason of + {bad_vertex, V} -> + io_lib:format("Boot step not registered: ~w~n", [V]); + {bad_edge, [First | Rest]} -> + [io_lib:format("Cyclic dependency: ~w", [First]), + [io_lib:format(" depends on ~w", [Next]) || + Next <- Rest], + io_lib:format(" depends on ~w~n", [First])] + end]) end. %%--------------------------------------------------------------------------- diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index 61a24388..4c0f341f 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -228,7 +228,7 @@ info_all(VHostPath, Items) -> map(VHostPath, fun (X) -> info(X, Items) end). publish(X = #exchange{name = XName}, Delivery) -> rabbit_router:deliver( - route(Delivery, {queue:from_list([X]), sets:from_list([XName]), []}), + route(Delivery, {queue:from_list([X]), XName, []}), Delivery). route(Delivery, {WorkList, SeenXs, QNames}) -> @@ -252,13 +252,22 @@ process_alternate(_X, Results) -> Results. process_route(#resource{kind = exchange} = XName, + {_WorkList, XName, _QNames} = Acc) -> + Acc; +process_route(#resource{kind = exchange} = XName, + {WorkList, #resource{kind = exchange} = SeenX, QNames}) -> + {case lookup(XName) of + {ok, X} -> queue:in(X, WorkList); + {error, not_found} -> WorkList + end, gb_sets:from_list([SeenX, XName]), QNames}; +process_route(#resource{kind = exchange} = XName, {WorkList, SeenXs, QNames} = Acc) -> - case sets:is_element(XName, SeenXs) of + case gb_sets:is_element(XName, SeenXs) of true -> Acc; false -> {case lookup(XName) of {ok, X} -> queue:in(X, WorkList); {error, not_found} -> WorkList - end, sets:add_element(XName, SeenXs), QNames} + end, gb_sets:add_element(XName, SeenXs), QNames} end; process_route(#resource{kind = queue} = QName, {WorkList, SeenXs, QNames}) -> diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 9290508f..230f4db5 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -64,7 +64,7 @@ -export([recursive_delete/1, dict_cons/3, orddict_cons/3, unlink_and_capture_exit/1]). -export([get_options/2]). --export([all_module_attributes/1, build_acyclic_graph/4]). +-export([all_module_attributes/1, build_acyclic_graph/3]). -export([now_ms/0]). -import(mnesia). @@ -86,10 +86,9 @@ :: rabbit_types:channel_exit() | rabbit_types:connection_exit()). -type(digraph_label() :: term()). -type(graph_vertex_fun() :: - fun ((atom(), [term()]) -> {digraph:vertex(), digraph_label()})). + fun ((atom(), [term()]) -> [{digraph:vertex(), digraph_label()}])). -type(graph_edge_fun() :: - fun ((atom(), [term()]) -> {digraph:vertex(), digraph:vertex()})). --type(graph_error_fun() :: fun ((any()) -> any() | no_return())). + fun ((atom(), [term()]) -> [{digraph:vertex(), digraph:vertex()}])). -spec(method_record_type/1 :: (rabbit_framing:amqp_method_record()) -> rabbit_framing:amqp_method_name()). @@ -190,9 +189,13 @@ -spec(get_options/2 :: ([optdef()], [string()]) -> {[string()], [{string(), any()}]}). -spec(all_module_attributes/1 :: (atom()) -> [{atom(), [term()]}]). --spec(build_acyclic_graph/4 :: (graph_vertex_fun(), graph_edge_fun(), - graph_error_fun(), [{atom(), [term()]}]) -> - digraph()). +-spec(build_acyclic_graph/3 :: + (graph_vertex_fun(), graph_edge_fun(), [{atom(), [term()]}]) + -> rabbit_types:ok_or_error2(digraph(), + {'vertex', 'duplicate', digraph:vertex()} | + {'edge', ({bad_vertex, digraph:vertex()} | + {bad_edge, [digraph:vertex()]}), + digraph:vertex(), digraph:vertex()})). -spec(now_ms/0 :: () -> non_neg_integer()). -endif. @@ -760,16 +763,21 @@ all_module_attributes(Name) -> end, [], Modules). -build_acyclic_graph(VertexFun, EdgeFun, ErrorFun, Graph) -> +build_acyclic_graph(VertexFun, EdgeFun, Graph) -> G = digraph:new([acyclic]), - [ case digraph:vertex(G, Vertex) of - false -> digraph:add_vertex(G, Vertex, Label); - _ -> ErrorFun({vertex, duplicate, Vertex}) - end || {Module, Atts} <- Graph, - {Vertex, Label} <- VertexFun(Module, Atts) ], - [ case digraph:add_edge(G, From, To) of - {error, E} -> ErrorFun({edge, E, From, To}); - _ -> ok - end || {Module, Atts} <- Graph, - {From, To} <- EdgeFun(Module, Atts) ], - G. + try + [case digraph:vertex(G, Vertex) of + false -> digraph:add_vertex(G, Vertex, Label); + _ -> ok = throw({graph_error, {vertex, duplicate, Vertex}}) + end || {Module, Atts} <- Graph, + {Vertex, Label} <- VertexFun(Module, Atts)], + [case digraph:add_edge(G, From, To) of + {error, E} -> throw({graph_error, {edge, E, From, To}}); + _ -> ok + end || {Module, Atts} <- Graph, + {From, To} <- EdgeFun(Module, Atts)], + {ok, G} + catch {graph_error, Reason} -> + true = digraph:delete(G), + {error, Reason} + end. diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index da81f884..cb3251c7 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -378,7 +378,7 @@ init_db(ClusterNodes, Force) -> wait_for_tables(), case rabbit_upgrade:maybe_upgrade() of ok -> - schema_ok_or_exit(); + ensure_schema_ok(); version_not_available -> schema_ok_or_move() end; @@ -386,15 +386,15 @@ init_db(ClusterNodes, Force) -> %% "Master" (i.e. without config) disc node in cluster, %% verify schema wait_for_tables(), - version_ok_or_exit(rabbit_upgrade:read_version()), - schema_ok_or_exit(); + ensure_version_ok(rabbit_upgrade:read_version()), + ensure_schema_ok(); {[], false, _} -> %% First RAM node in cluster, start from scratch ok = create_schema(); {[AnotherNode|_], _, _} -> %% Subsequent node in cluster, catch up - version_ok_or_exit(rabbit_upgrade:read_version()), - version_ok_or_exit( + ensure_version_ok(rabbit_upgrade:read_version()), + ensure_version_ok( rpc:call(AnotherNode, rabbit_upgrade, read_version, [])), IsDiskNode = ClusterNodes == [] orelse lists:member(node(), ClusterNodes), @@ -404,7 +404,7 @@ init_db(ClusterNodes, Force) -> true -> disc; false -> ram end), - schema_ok_or_exit() + ensure_schema_ok() end; {error, Reason} -> %% one reason we may end up here is if we try to join @@ -428,22 +428,19 @@ schema_ok_or_move() -> ok = create_schema() end. -version_ok_or_exit({ok, DiscVersion}) -> +ensure_version_ok({ok, DiscVersion}) -> case rabbit_upgrade:desired_version() of - DiscVersion -> - ok; - DesiredVersion -> - exit({schema_mismatch, DesiredVersion, DiscVersion}) + DiscVersion -> ok; + DesiredVersion -> throw({error, {schema_mismatch, + DesiredVersion, DiscVersion}}) end; -version_ok_or_exit({error, _}) -> +ensure_version_ok({error, _}) -> ok = rabbit_upgrade:write_version(). -schema_ok_or_exit() -> +ensure_schema_ok() -> case check_schema_integrity() of - ok -> - ok; - {error, Reason} -> - exit({schema_invalid, Reason}) + ok -> ok; + {error, Reason} -> throw({error, {schema_invalid, Reason}}) end. create_schema() -> diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl index 9522227e..27a94f6f 100644 --- a/src/rabbit_upgrade.erl +++ b/src/rabbit_upgrade.erl @@ -32,11 +32,13 @@ -ifdef(use_specs). +-type(step() :: atom()). +-type(version() :: [step()]). + -spec(maybe_upgrade/0 :: () -> 'ok' | 'version_not_available'). --spec(read_version/0 :: - () -> {'ok', [any()]} | rabbit_types:error(any())). +-spec(read_version/0 :: () -> rabbit_types:ok_or_error2(version(), any())). -spec(write_version/0 :: () -> 'ok'). --spec(desired_version/0 :: () -> [atom()]). +-spec(desired_version/0 :: () -> version()). -endif. @@ -48,18 +50,17 @@ maybe_upgrade() -> case read_version() of {ok, CurrentHeads} -> - G = load_graph(), - case unknown_heads(CurrentHeads, G) of - [] -> - case upgrades_to_apply(CurrentHeads, G) of - [] -> ok; - Upgrades -> apply_upgrades(Upgrades) - end; - Unknown -> - exit({future_upgrades_found, Unknown}) - end, - true = digraph:delete(G), - ok; + with_upgrade_graph( + fun (G) -> + case unknown_heads(CurrentHeads, G) of + [] -> case upgrades_to_apply(CurrentHeads, G) of + [] -> ok; + Upgrades -> apply_upgrades(Upgrades) + end; + Unknown -> throw({error, + {future_upgrades_found, Unknown}}) + end + end); {error, enoent} -> version_not_available end. @@ -75,17 +76,26 @@ write_version() -> ok. desired_version() -> - G = load_graph(), - Version = heads(G), - true = digraph:delete(G), - Version. + with_upgrade_graph(fun (G) -> heads(G) end). %% ------------------------------------------------------------------- -load_graph() -> - Upgrades = rabbit_misc:all_module_attributes(rabbit_upgrade), - rabbit_misc:build_acyclic_graph( - fun vertices/2, fun edges/2, fun graph_build_error/1, Upgrades). +with_upgrade_graph(Fun) -> + case rabbit_misc:build_acyclic_graph( + fun vertices/2, fun edges/2, + rabbit_misc:all_module_attributes(rabbit_upgrade)) of + {ok, G} -> try + Fun(G) + after + true = digraph:delete(G) + end; + {error, {vertex, duplicate, StepName}} -> + throw({error, {duplicate_upgrade_step, StepName}}); + {error, {edge, {bad_vertex, StepName}, _From, _To}} -> + throw({error, {dependency_on_unknown_upgrade_step, StepName}}); + {error, {edge, {bad_edge, StepNames}, _From, _To}} -> + throw({error, {cycle_in_upgrade_steps, StepNames}}) + end. vertices(Module, Steps) -> [{StepName, {Module, StepName}} || {StepName, _Reqs} <- Steps]. @@ -93,10 +103,6 @@ vertices(Module, Steps) -> edges(_Module, Steps) -> [{Require, StepName} || {StepName, Requires} <- Steps, Require <- Requires]. -graph_build_error({vertex, duplicate, StepName}) -> - exit({duplicate_upgrade, StepName}); -graph_build_error({edge, E, From, To}) -> - exit({E, From, To}). unknown_heads(Heads, G) -> [H || H <- Heads, digraph:vertex(G, H) =:= false]. @@ -130,9 +136,9 @@ apply_upgrades(Upgrades) -> ok = write_version(), ok = file:delete(LockFile); {error, eexist} -> - exit(previous_upgrade_failed); + throw({error, previous_upgrade_failed}); {error, _} = Error -> - exit(Error) + throw(Error) end. apply_upgrade({M, F}) -> |