summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2010-11-18 12:23:23 +0000
committerMatthew Sackman <matthew@rabbitmq.com>2010-11-18 12:23:23 +0000
commit5f7e24198de005dd24ef20f73299a6848c15dcde (patch)
treee115e89a2177341ad07b00e1837d25ad7f17b7af
parent631f130edc3f8fbb704e1c1390925b8d7009b103 (diff)
parentdc4749f6cf83be2b83021e86724ed7ff92ce44d3 (diff)
downloadrabbitmq-server-5f7e24198de005dd24ef20f73299a6848c15dcde.tar.gz
One head is better than two
-rw-r--r--src/rabbit.erl81
-rw-r--r--src/rabbit_exchange.erl15
-rw-r--r--src/rabbit_misc.erl46
-rw-r--r--src/rabbit_mnesia.erl31
-rw-r--r--src/rabbit_upgrade.erl64
5 files changed, 128 insertions, 109 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl
index a1dd2c2e..61a3a53d 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -291,11 +291,10 @@ run_boot_step({StepName, Attributes}) ->
io:format("-- ~s~n", [Description]);
MFAs ->
io:format("starting ~-60s ...", [Description]),
- [case catch apply(M,F,A) of
- {'EXIT', Reason} ->
- boot_error("FAILED~nReason: ~p~n", [Reason]);
- ok ->
- ok
+ [try
+ apply(M,F,A)
+ catch
+ _:Reason -> boot_error("FAILED~nReason: ~p~n", [Reason])
end || {M,F,A} <- MFAs],
io:format("done~n"),
ok
@@ -315,43 +314,43 @@ edges(_Module, Steps) ->
{Key, OtherStep} <- Atts,
Key =:= requires orelse Key =:= enables].
-graph_build_error({vertex, duplicate, StepName}) ->
- boot_error("Duplicate boot step name: ~w~n", [StepName]);
-graph_build_error({edge, Reason, From, To}) ->
- boot_error(
- "Could not add boot step dependency of ~w on ~w:~n~s",
- [To, From,
- case Reason of
- {bad_vertex, V} ->
- io_lib:format("Boot step not registered: ~w~n", [V]);
- {bad_edge, [First | Rest]} ->
- [io_lib:format("Cyclic dependency: ~w", [First]),
- [io_lib:format(" depends on ~w", [Next]) || Next <- Rest],
- io_lib:format(" depends on ~w~n", [First])]
- end]).
-
sort_boot_steps(UnsortedSteps) ->
- G = rabbit_misc:build_acyclic_graph(
- fun vertices/2, fun edges/2, fun graph_build_error/1, UnsortedSteps),
-
- %% Use topological sort to find a consistent ordering (if there is
- %% one, otherwise fail).
- SortedStepsRev = [begin
- {StepName, Step} = digraph:vertex(G, StepName),
- Step
- end || StepName <- digraph_utils:topsort(G)],
- SortedSteps = lists:reverse(SortedStepsRev),
-
- digraph:delete(G),
-
- %% Check that all mentioned {M,F,A} triples are exported.
- case [{StepName, {M,F,A}}
- || {StepName, Attributes} <- SortedSteps,
- {mfa, {M,F,A}} <- Attributes,
- not erlang:function_exported(M, F, length(A))] of
- [] -> SortedSteps;
- MissingFunctions -> boot_error("Boot step functions not exported: ~p~n",
- [MissingFunctions])
+ case rabbit_misc:build_acyclic_graph(fun vertices/2, fun edges/2,
+ UnsortedSteps) of
+ {ok, G} ->
+ %% Use topological sort to find a consistent ordering (if
+ %% there is one, otherwise fail).
+ SortedSteps = lists:reverse(
+ [begin
+ {StepName, Step} = digraph:vertex(G, StepName),
+ Step
+ end || StepName <- digraph_utils:topsort(G)]),
+ digraph:delete(G),
+ %% Check that all mentioned {M,F,A} triples are exported.
+ case [{StepName, {M,F,A}} ||
+ {StepName, Attributes} <- SortedSteps,
+ {mfa, {M,F,A}} <- Attributes,
+ not erlang:function_exported(M, F, length(A))] of
+ [] -> SortedSteps;
+ MissingFunctions -> boot_error(
+ "Boot step functions not exported: ~p~n",
+ [MissingFunctions])
+ end;
+ {error, {vertex, duplicate, StepName}} ->
+ boot_error("Duplicate boot step name: ~w~n", [StepName]);
+ {error, {edge, Reason, From, To}} ->
+ boot_error(
+ "Could not add boot step dependency of ~w on ~w:~n~s",
+ [To, From,
+ case Reason of
+ {bad_vertex, V} ->
+ io_lib:format("Boot step not registered: ~w~n", [V]);
+ {bad_edge, [First | Rest]} ->
+ [io_lib:format("Cyclic dependency: ~w", [First]),
+ [io_lib:format(" depends on ~w", [Next]) ||
+ Next <- Rest],
+ io_lib:format(" depends on ~w~n", [First])]
+ end])
end.
%%---------------------------------------------------------------------------
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index 61a24388..4c0f341f 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -228,7 +228,7 @@ info_all(VHostPath, Items) -> map(VHostPath, fun (X) -> info(X, Items) end).
publish(X = #exchange{name = XName}, Delivery) ->
rabbit_router:deliver(
- route(Delivery, {queue:from_list([X]), sets:from_list([XName]), []}),
+ route(Delivery, {queue:from_list([X]), XName, []}),
Delivery).
route(Delivery, {WorkList, SeenXs, QNames}) ->
@@ -252,13 +252,22 @@ process_alternate(_X, Results) ->
Results.
process_route(#resource{kind = exchange} = XName,
+ {_WorkList, XName, _QNames} = Acc) ->
+ Acc;
+process_route(#resource{kind = exchange} = XName,
+ {WorkList, #resource{kind = exchange} = SeenX, QNames}) ->
+ {case lookup(XName) of
+ {ok, X} -> queue:in(X, WorkList);
+ {error, not_found} -> WorkList
+ end, gb_sets:from_list([SeenX, XName]), QNames};
+process_route(#resource{kind = exchange} = XName,
{WorkList, SeenXs, QNames} = Acc) ->
- case sets:is_element(XName, SeenXs) of
+ case gb_sets:is_element(XName, SeenXs) of
true -> Acc;
false -> {case lookup(XName) of
{ok, X} -> queue:in(X, WorkList);
{error, not_found} -> WorkList
- end, sets:add_element(XName, SeenXs), QNames}
+ end, gb_sets:add_element(XName, SeenXs), QNames}
end;
process_route(#resource{kind = queue} = QName,
{WorkList, SeenXs, QNames}) ->
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 9290508f..230f4db5 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -64,7 +64,7 @@
-export([recursive_delete/1, dict_cons/3, orddict_cons/3,
unlink_and_capture_exit/1]).
-export([get_options/2]).
--export([all_module_attributes/1, build_acyclic_graph/4]).
+-export([all_module_attributes/1, build_acyclic_graph/3]).
-export([now_ms/0]).
-import(mnesia).
@@ -86,10 +86,9 @@
:: rabbit_types:channel_exit() | rabbit_types:connection_exit()).
-type(digraph_label() :: term()).
-type(graph_vertex_fun() ::
- fun ((atom(), [term()]) -> {digraph:vertex(), digraph_label()})).
+ fun ((atom(), [term()]) -> [{digraph:vertex(), digraph_label()}])).
-type(graph_edge_fun() ::
- fun ((atom(), [term()]) -> {digraph:vertex(), digraph:vertex()})).
--type(graph_error_fun() :: fun ((any()) -> any() | no_return())).
+ fun ((atom(), [term()]) -> [{digraph:vertex(), digraph:vertex()}])).
-spec(method_record_type/1 :: (rabbit_framing:amqp_method_record())
-> rabbit_framing:amqp_method_name()).
@@ -190,9 +189,13 @@
-spec(get_options/2 :: ([optdef()], [string()])
-> {[string()], [{string(), any()}]}).
-spec(all_module_attributes/1 :: (atom()) -> [{atom(), [term()]}]).
--spec(build_acyclic_graph/4 :: (graph_vertex_fun(), graph_edge_fun(),
- graph_error_fun(), [{atom(), [term()]}]) ->
- digraph()).
+-spec(build_acyclic_graph/3 ::
+ (graph_vertex_fun(), graph_edge_fun(), [{atom(), [term()]}])
+ -> rabbit_types:ok_or_error2(digraph(),
+ {'vertex', 'duplicate', digraph:vertex()} |
+ {'edge', ({bad_vertex, digraph:vertex()} |
+ {bad_edge, [digraph:vertex()]}),
+ digraph:vertex(), digraph:vertex()})).
-spec(now_ms/0 :: () -> non_neg_integer()).
-endif.
@@ -760,16 +763,21 @@ all_module_attributes(Name) ->
end, [], Modules).
-build_acyclic_graph(VertexFun, EdgeFun, ErrorFun, Graph) ->
+build_acyclic_graph(VertexFun, EdgeFun, Graph) ->
G = digraph:new([acyclic]),
- [ case digraph:vertex(G, Vertex) of
- false -> digraph:add_vertex(G, Vertex, Label);
- _ -> ErrorFun({vertex, duplicate, Vertex})
- end || {Module, Atts} <- Graph,
- {Vertex, Label} <- VertexFun(Module, Atts) ],
- [ case digraph:add_edge(G, From, To) of
- {error, E} -> ErrorFun({edge, E, From, To});
- _ -> ok
- end || {Module, Atts} <- Graph,
- {From, To} <- EdgeFun(Module, Atts) ],
- G.
+ try
+ [case digraph:vertex(G, Vertex) of
+ false -> digraph:add_vertex(G, Vertex, Label);
+ _ -> ok = throw({graph_error, {vertex, duplicate, Vertex}})
+ end || {Module, Atts} <- Graph,
+ {Vertex, Label} <- VertexFun(Module, Atts)],
+ [case digraph:add_edge(G, From, To) of
+ {error, E} -> throw({graph_error, {edge, E, From, To}});
+ _ -> ok
+ end || {Module, Atts} <- Graph,
+ {From, To} <- EdgeFun(Module, Atts)],
+ {ok, G}
+ catch {graph_error, Reason} ->
+ true = digraph:delete(G),
+ {error, Reason}
+ end.
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index da81f884..cb3251c7 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -378,7 +378,7 @@ init_db(ClusterNodes, Force) ->
wait_for_tables(),
case rabbit_upgrade:maybe_upgrade() of
ok ->
- schema_ok_or_exit();
+ ensure_schema_ok();
version_not_available ->
schema_ok_or_move()
end;
@@ -386,15 +386,15 @@ init_db(ClusterNodes, Force) ->
%% "Master" (i.e. without config) disc node in cluster,
%% verify schema
wait_for_tables(),
- version_ok_or_exit(rabbit_upgrade:read_version()),
- schema_ok_or_exit();
+ ensure_version_ok(rabbit_upgrade:read_version()),
+ ensure_schema_ok();
{[], false, _} ->
%% First RAM node in cluster, start from scratch
ok = create_schema();
{[AnotherNode|_], _, _} ->
%% Subsequent node in cluster, catch up
- version_ok_or_exit(rabbit_upgrade:read_version()),
- version_ok_or_exit(
+ ensure_version_ok(rabbit_upgrade:read_version()),
+ ensure_version_ok(
rpc:call(AnotherNode, rabbit_upgrade, read_version, [])),
IsDiskNode = ClusterNodes == [] orelse
lists:member(node(), ClusterNodes),
@@ -404,7 +404,7 @@ init_db(ClusterNodes, Force) ->
true -> disc;
false -> ram
end),
- schema_ok_or_exit()
+ ensure_schema_ok()
end;
{error, Reason} ->
%% one reason we may end up here is if we try to join
@@ -428,22 +428,19 @@ schema_ok_or_move() ->
ok = create_schema()
end.
-version_ok_or_exit({ok, DiscVersion}) ->
+ensure_version_ok({ok, DiscVersion}) ->
case rabbit_upgrade:desired_version() of
- DiscVersion ->
- ok;
- DesiredVersion ->
- exit({schema_mismatch, DesiredVersion, DiscVersion})
+ DiscVersion -> ok;
+ DesiredVersion -> throw({error, {schema_mismatch,
+ DesiredVersion, DiscVersion}})
end;
-version_ok_or_exit({error, _}) ->
+ensure_version_ok({error, _}) ->
ok = rabbit_upgrade:write_version().
-schema_ok_or_exit() ->
+ensure_schema_ok() ->
case check_schema_integrity() of
- ok ->
- ok;
- {error, Reason} ->
- exit({schema_invalid, Reason})
+ ok -> ok;
+ {error, Reason} -> throw({error, {schema_invalid, Reason}})
end.
create_schema() ->
diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl
index 9522227e..27a94f6f 100644
--- a/src/rabbit_upgrade.erl
+++ b/src/rabbit_upgrade.erl
@@ -32,11 +32,13 @@
-ifdef(use_specs).
+-type(step() :: atom()).
+-type(version() :: [step()]).
+
-spec(maybe_upgrade/0 :: () -> 'ok' | 'version_not_available').
--spec(read_version/0 ::
- () -> {'ok', [any()]} | rabbit_types:error(any())).
+-spec(read_version/0 :: () -> rabbit_types:ok_or_error2(version(), any())).
-spec(write_version/0 :: () -> 'ok').
--spec(desired_version/0 :: () -> [atom()]).
+-spec(desired_version/0 :: () -> version()).
-endif.
@@ -48,18 +50,17 @@
maybe_upgrade() ->
case read_version() of
{ok, CurrentHeads} ->
- G = load_graph(),
- case unknown_heads(CurrentHeads, G) of
- [] ->
- case upgrades_to_apply(CurrentHeads, G) of
- [] -> ok;
- Upgrades -> apply_upgrades(Upgrades)
- end;
- Unknown ->
- exit({future_upgrades_found, Unknown})
- end,
- true = digraph:delete(G),
- ok;
+ with_upgrade_graph(
+ fun (G) ->
+ case unknown_heads(CurrentHeads, G) of
+ [] -> case upgrades_to_apply(CurrentHeads, G) of
+ [] -> ok;
+ Upgrades -> apply_upgrades(Upgrades)
+ end;
+ Unknown -> throw({error,
+ {future_upgrades_found, Unknown}})
+ end
+ end);
{error, enoent} ->
version_not_available
end.
@@ -75,17 +76,26 @@ write_version() ->
ok.
desired_version() ->
- G = load_graph(),
- Version = heads(G),
- true = digraph:delete(G),
- Version.
+ with_upgrade_graph(fun (G) -> heads(G) end).
%% -------------------------------------------------------------------
-load_graph() ->
- Upgrades = rabbit_misc:all_module_attributes(rabbit_upgrade),
- rabbit_misc:build_acyclic_graph(
- fun vertices/2, fun edges/2, fun graph_build_error/1, Upgrades).
+with_upgrade_graph(Fun) ->
+ case rabbit_misc:build_acyclic_graph(
+ fun vertices/2, fun edges/2,
+ rabbit_misc:all_module_attributes(rabbit_upgrade)) of
+ {ok, G} -> try
+ Fun(G)
+ after
+ true = digraph:delete(G)
+ end;
+ {error, {vertex, duplicate, StepName}} ->
+ throw({error, {duplicate_upgrade_step, StepName}});
+ {error, {edge, {bad_vertex, StepName}, _From, _To}} ->
+ throw({error, {dependency_on_unknown_upgrade_step, StepName}});
+ {error, {edge, {bad_edge, StepNames}, _From, _To}} ->
+ throw({error, {cycle_in_upgrade_steps, StepNames}})
+ end.
vertices(Module, Steps) ->
[{StepName, {Module, StepName}} || {StepName, _Reqs} <- Steps].
@@ -93,10 +103,6 @@ vertices(Module, Steps) ->
edges(_Module, Steps) ->
[{Require, StepName} || {StepName, Requires} <- Steps, Require <- Requires].
-graph_build_error({vertex, duplicate, StepName}) ->
- exit({duplicate_upgrade, StepName});
-graph_build_error({edge, E, From, To}) ->
- exit({E, From, To}).
unknown_heads(Heads, G) ->
[H || H <- Heads, digraph:vertex(G, H) =:= false].
@@ -130,9 +136,9 @@ apply_upgrades(Upgrades) ->
ok = write_version(),
ok = file:delete(LockFile);
{error, eexist} ->
- exit(previous_upgrade_failed);
+ throw({error, previous_upgrade_failed});
{error, _} = Error ->
- exit(Error)
+ throw(Error)
end.
apply_upgrade({M, F}) ->