summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit.erl341
-rw-r--r--src/rabbit_alarm.erl10
-rw-r--r--src/rabbit_error_logger.erl6
-rw-r--r--src/rabbit_networking.erl23
-rw-r--r--src/rabbit_plugin_activator.erl25
-rw-r--r--src/rabbit_sup.erl11
6 files changed, 274 insertions, 142 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl
index f8c3ef28..136b47ca 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -39,6 +39,115 @@
-export([log_location/1]).
+%%---------------------------------------------------------------------------
+%% Boot steps.
+-export([maybe_insert_default_data/0]).
+
+-rabbit_boot_step({codec_correctness_check,
+ [{description, "codec correctness check"},
+ {mfa, {rabbit_binary_generator,
+ check_empty_content_body_frame_size,
+ []}}]}).
+
+-rabbit_boot_step({database,
+ [{mfa, {rabbit_mnesia, init, []}},
+ {pre, kernel_ready}]}).
+
+-rabbit_boot_step({rabbit_exchange_type,
+ [{description, "exchange type registry"},
+ {mfa, {rabbit_sup, start_child, [rabbit_exchange_type]}},
+ {pre, kernel_ready}]}).
+
+-rabbit_boot_step({rabbit_log,
+ [{description, "logging server"},
+ {mfa, {rabbit_sup, start_child, [rabbit_log]}},
+ {pre, kernel_ready}]}).
+
+-rabbit_boot_step({rabbit_hooks,
+ [{description, "internal event notification system"},
+ {mfa, {rabbit_hooks, start, []}},
+ {pre, kernel_ready}]}).
+
+-rabbit_boot_step({kernel_ready,
+ [{description, "kernel ready"}]}).
+
+-rabbit_boot_step({rabbit_alarm,
+ [{description, "alarm handler"},
+ {mfa, {rabbit_alarm, start, []}},
+ {post, kernel_ready},
+ {pre, core_initialized}]}).
+
+-rabbit_boot_step({rabbit_amqqueue_sup,
+ [{description, "queue supervisor"},
+ {mfa, {rabbit_amqqueue, start, []}},
+ {post, kernel_ready},
+ {pre, core_initialized}]}).
+
+-rabbit_boot_step({rabbit_router,
+ [{description, "cluster router"},
+ {mfa, {rabbit_sup, start_child, [rabbit_router]}},
+ {post, kernel_ready},
+ {pre, core_initialized}]}).
+
+-rabbit_boot_step({rabbit_node_monitor,
+ [{description, "node monitor"},
+ {mfa, {rabbit_sup, start_child, [rabbit_node_monitor]}},
+ {post, kernel_ready},
+ {post, rabbit_amqqueue_sup},
+ {pre, core_initialized}]}).
+
+-rabbit_boot_step({rabbit_exchange_events,
+ [{description, "exchange event notifier"},
+ {mfa, {rabbit_sup, start_child, [rabbit_exchange_events]}},
+ {post, kernel_ready},
+ {pre, core_initialized}]}).
+
+-rabbit_boot_step({core_initialized,
+ [{description, "core initialized"}]}).
+
+-rabbit_boot_step({empty_db_check,
+ [{description, "empty DB check"},
+ {mfa, {?MODULE, maybe_insert_default_data, []}},
+ {post, core_initialized}]}).
+
+-rabbit_boot_step({exchange_recovery,
+ [{description, "exchange recovery"},
+ {mfa, {rabbit_exchange, recover, []}},
+ {post, empty_db_check}]}).
+
+-rabbit_boot_step({queue_recovery,
+ [{description, "queue recovery"},
+ {mfa, {rabbit_amqqueue, recover, []}},
+ {post, exchange_recovery}]}).
+
+-rabbit_boot_step({persister,
+ [{mfa, {rabbit_sup, start_child, [rabbit_persister]}},
+ {post, queue_recovery}]}).
+
+-rabbit_boot_step({guid_generator,
+ [{description, "guid generator"},
+ {mfa, {rabbit_sup, start_child, [rabbit_guid]}},
+ {post, persister},
+ {pre, routing_ready}]}).
+
+-rabbit_boot_step({routing_ready,
+ [{description, "message delivery logic ready"}]}).
+
+-rabbit_boot_step({log_relay,
+ [{description, "error log relay"},
+ {mfa, {rabbit_error_logger, boot, []}},
+ {post, routing_ready}]}).
+
+-rabbit_boot_step({networking,
+ [{mfa, {rabbit_networking, boot, []}},
+ {post, log_relay},
+ {pre, networking_listening}]}).
+
+-rabbit_boot_step({networking_listening,
+ [{description, "network listeners available"}]}).
+
+%%---------------------------------------------------------------------------
+
-import(application).
-import(mnesia).
-import(lists).
@@ -79,7 +188,7 @@ prepare() ->
start() ->
try
ok = prepare(),
- ok = rabbit_misc:start_applications(?APPS)
+ ok = rabbit_misc:start_applications(?APPS)
after
%%give the error loggers some time to catch up
timer:sleep(100)
@@ -115,102 +224,10 @@ rotate_logs(BinarySuffix) ->
%%--------------------------------------------------------------------
start(normal, []) ->
-
{ok, SupPid} = rabbit_sup:start_link(),
print_banner(),
-
- HookModules = discover_static_hooks(startup_hook),
-
- lists:foreach(
- fun ({Phase, Msg, Thunk}) ->
- io:format("starting ~-20s ...", [Msg]),
- ok = run_static_hooks(HookModules, startup_hook, {pre, Phase}),
- Thunk(),
- ok = run_static_hooks(HookModules, startup_hook, {post, Phase}),
- io:format("done~n");
- ({Phase, Msg, M, F, A}) ->
- io:format("starting ~-20s ...", [Msg]),
- ok = run_static_hooks(HookModules, startup_hook, {pre, Phase}),
- apply(M, F, A),
- ok = run_static_hooks(HookModules, startup_hook, {post, Phase}),
- io:format("done~n")
- end,
- [{database, "database",
- fun () -> ok = rabbit_mnesia:init() end},
- {core_processes, "core processes",
- fun () ->
- ok = start_child(rabbit_exchange_type),
- ok = start_child(rabbit_log),
- ok = rabbit_hooks:start(),
-
- ok = rabbit_binary_generator:
- check_empty_content_body_frame_size(),
-
- ok = rabbit_alarm:start(),
-
- {ok, MemoryWatermark} =
- application:get_env(vm_memory_high_watermark),
- ok = case MemoryWatermark == 0 of
- true ->
- ok;
- false ->
- start_child(vm_memory_monitor, [MemoryWatermark])
- end,
-
- ok = rabbit_amqqueue:start(),
-
- ok = start_child(rabbit_router),
- ok = start_child(rabbit_node_monitor),
- ok = start_child(rabbit_exchange_events)
- end},
- {recovery, "recovery",
- fun () ->
- ok = maybe_insert_default_data(),
- ok = rabbit_exchange:recover(),
- ok = rabbit_amqqueue:recover()
- end},
- {persister, "persister",
- fun () ->
- ok = start_child(rabbit_persister)
- end},
- {guid_generator, "guid generator",
- fun () ->
- ok = start_child(rabbit_guid)
- end},
- {builtin_applications, "builtin applications",
- fun () ->
- {ok, DefaultVHost} = application:get_env(default_vhost),
- ok = error_logger:add_report_handler(
- rabbit_error_logger, [DefaultVHost]),
- ok = start_builtin_amq_applications()
- end},
- {tcp_listeners, "TCP listeners",
- fun () ->
- ok = rabbit_networking:start(),
- {ok, TcpListeners} = application:get_env(tcp_listeners),
- lists:foreach(
- fun ({Host, Port}) ->
- ok = rabbit_networking:start_tcp_listener(Host, Port)
- end,
- TcpListeners)
- end},
- {ssl_listeners, "SSL listeners",
- fun () ->
- case application:get_env(ssl_listeners) of
- {ok, []} ->
- ok;
- {ok, SslListeners} ->
- ok = rabbit_misc:start_applications([crypto, ssl]),
-
- {ok, SslOpts} = application:get_env(ssl_options),
-
- [rabbit_networking:start_ssl_listener
- (Host, Port, SslOpts) || {Host, Port} <- SslListeners],
- ok
- end
- end}]),
-
+ [ok = run_boot_step(Step) || Step <- boot_steps()],
io:format("~nbroker running~n"),
{ok, SupPid}.
@@ -224,10 +241,108 @@ stop(_State) ->
end,
ok.
-%---------------------------------------------------------------------------
+%%---------------------------------------------------------------------------
+
+boot_error(Format, Args) ->
+ io:format("BOOT ERROR: " ++ Format, Args),
+ error_logger:error_msg(Format, Args),
+ timer:sleep(1000),
+ exit({?MODULE, failure_during_boot}).
+
+run_boot_step({StepName, Attributes}) ->
+ Description = case lists:keysearch(description, 1, Attributes) of
+ {value, {_, D}} -> D;
+ false -> StepName
+ end,
+ case [MFA || {mfa, MFA} <- Attributes] of
+ [] ->
+ io:format("progress -- ~s~n", [Description]);
+ MFAs ->
+ io:format("starting ~-40s ...", [Description]),
+ [case catch apply(M,F,A) of
+ {'EXIT', Reason} ->
+ boot_error("FAILED~nReason: ~p~n", [Reason]);
+ ok ->
+ ok
+ end || {M,F,A} <- MFAs],
+ io:format("done~n"),
+ ok
+ end.
+
+boot_steps() ->
+ AllApps = [App || {App, _, _} <- application:loaded_applications()],
+ Modules = lists:usort(
+ lists:append([Modules
+ || {ok, Modules} <-
+ [application:get_key(App, modules)
+ || App <- AllApps]])),
+ UnsortedSteps =
+ lists:flatmap(fun (Module) ->
+ [{StepName, Attributes}
+ || {rabbit_boot_step, [{StepName, Attributes}]}
+ <- Module:module_info(attributes)]
+ end, Modules),
+ sort_boot_steps(UnsortedSteps).
+
+sort_boot_steps(UnsortedSteps) ->
+ G = digraph:new([acyclic]),
+
+ %% Add vertices, with duplicate checking.
+ [case digraph:vertex(G, StepName) of
+ false -> digraph:add_vertex(G, StepName, Step);
+ _ -> boot_error("Duplicate boot step name: ~w~n", [StepName])
+ end || Step = {StepName, _Attrs} <- UnsortedSteps],
+
+ %% Add edges, detecting cycles and missing vertices.
+ lists:foreach(fun ({StepName, Attributes}) ->
+ [add_boot_step_dep(G, StepName, PrecedingStepName)
+ || {post, PrecedingStepName} <- Attributes],
+ [add_boot_step_dep(G, SucceedingStepName, StepName)
+ || {pre, SucceedingStepName} <- Attributes]
+ end, UnsortedSteps),
+
+ %% Use topological sort to find a consistent ordering (if there is
+ %% one, otherwise fail).
+ SortedStepsRev = [begin
+ {StepName, Step} = digraph:vertex(G, StepName),
+ Step
+ end || StepName <- digraph_utils:topsort(G)],
+ SortedSteps = lists:reverse(SortedStepsRev),
+
+ digraph:delete(G),
+
+ %% Check that all mentioned {M,F,A} triples are exported.
+ case [{StepName, {M,F,A}}
+ || {StepName, Attributes} <- SortedSteps,
+ {mfa, {M,F,A}} <- Attributes,
+ not erlang:function_exported(M, F, length(A))] of
+ [] -> SortedSteps;
+ MissingFunctions -> boot_error("Boot step functions not exported: ~p~n",
+ [MissingFunctions])
+ end.
+
+add_boot_step_dep(G, RunsSecond, RunsFirst) ->
+ case digraph:add_edge(G, RunsSecond, RunsFirst) of
+ {error, Reason} ->
+ boot_error("Could not add boot step dependency of ~w on ~w:~n~s",
+ [RunsSecond, RunsFirst,
+ case Reason of
+ {bad_vertex, V} ->
+ io_lib:format("Boot step not registered: ~w~n", [V]);
+ {bad_edge, [First | Rest]} ->
+ [io_lib:format("Cyclic dependency: ~w", [First]),
+ [io_lib:format(" depends on ~w", [Next])
+ || Next <- Rest],
+ io_lib:format(" depends on ~w~n", [First])]
+ end]);
+ _ ->
+ ok
+ end.
+
+%%---------------------------------------------------------------------------
log_location(Type) ->
- case application:get_env(Type, case Type of
+ case application:get_env(Type, case Type of
kernel -> error_logger;
sasl -> sasl_error_logger
end) of
@@ -283,15 +398,6 @@ print_banner() ->
lists:foreach(fun ({K, V}) -> io:format(Format, [K, V]) end, Settings),
io:nl().
-start_child(Mod) ->
- start_child(Mod, []).
-
-start_child(Mod, Args) ->
- {ok,_} = supervisor:start_child(rabbit_sup,
- {Mod, {Mod, start_link, Args},
- transient, 100, worker, [Mod]}),
- ok.
-
ensure_working_log_handlers() ->
Handlers = gen_event:which_handlers(error_logger),
ok = ensure_working_log_handler(error_logger_file_h,
@@ -317,7 +423,7 @@ ensure_working_log_handler(OldFHandler, NewFHandler, TTYHandler,
throw({error, {cannot_log_to_tty,
TTYHandler, not_installed}})
end;
- _ -> case lists:member(NewFHandler, Handlers) of
+ _ -> case lists:member(NewFHandler, Handlers) of
true -> ok;
false -> case rotate_logs(LogLocation, "",
OldFHandler, NewFHandler) of
@@ -349,12 +455,6 @@ insert_default_data() ->
DefaultReadPerm),
ok.
-start_builtin_amq_applications() ->
- %%TODO: we may want to create a separate supervisor for these so
- %%they don't bring down the entire app when they die and fail to
- %%restart
- ok.
-
rotate_logs(File, Suffix, Handler) ->
rotate_logs(File, Suffix, Handler, Handler).
@@ -377,20 +477,3 @@ log_rotation_result(ok, {error, SaslLogError}) ->
{error, {cannot_rotate_sasl_logs, SaslLogError}};
log_rotation_result(ok, ok) ->
ok.
-
-discover_static_hooks(Hook) ->
- %% App files don't let us stick arbitrary keys in, so we do
- %% something a bit icky here and go for "convention over
- %% configuration", choosing to examine modules with names starting
- %% with 'rabbit_static_hook_' to see if they have appropriate
- %% exported hook functions.
- [M || {App, _, _} <- application:loaded_applications(),
- M <- begin {ok, Ms} = application:get_key(App, modules), Ms end,
- case atom_to_list(M) of "rabbit_static_hook_" ++ _ -> true; _ -> false end,
- {module, M} == code:load_file(M),
- erlang:function_exported(M, Hook, 1)].
-
-run_static_hooks(HookModules, Hook, Event) ->
- ok = lists:foreach(fun (M) ->
- {M, Hook, ok} = {M, Hook, M:Hook(Event)}
- end, HookModules).
diff --git a/src/rabbit_alarm.erl b/src/rabbit_alarm.erl
index 9a639ed4..534409aa 100644
--- a/src/rabbit_alarm.erl
+++ b/src/rabbit_alarm.erl
@@ -54,7 +54,15 @@
%%----------------------------------------------------------------------------
start() ->
- ok = alarm_handler:add_alarm_handler(?MODULE, []).
+ ok = alarm_handler:add_alarm_handler(?MODULE, []),
+ {ok, MemoryWatermark} = application:get_env(vm_memory_high_watermark),
+ ok = case MemoryWatermark == 0 of
+ true ->
+ ok;
+ false ->
+ rabbit_sup:start_child(vm_memory_monitor, [MemoryWatermark])
+ end,
+ ok.
stop() ->
ok = alarm_handler:delete_alarm_handler(?MODULE).
diff --git a/src/rabbit_error_logger.erl b/src/rabbit_error_logger.erl
index 297ed5aa..9651ae12 100644
--- a/src/rabbit_error_logger.erl
+++ b/src/rabbit_error_logger.erl
@@ -37,8 +37,14 @@
-behaviour(gen_event).
+-export([boot/0]).
+
-export([init/1, terminate/2, code_change/3, handle_call/2, handle_event/2, handle_info/2]).
+boot() ->
+ {ok, DefaultVHost} = application:get_env(default_vhost),
+ ok = error_logger:add_report_handler(?MODULE, [DefaultVHost]).
+
init([DefaultVHost]) ->
#exchange{} = rabbit_exchange:declare(
rabbit_misc:r(DefaultVHost, exchange, ?LOG_EXCH_NAME),
diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl
index 3a0f9240..84658a85 100644
--- a/src/rabbit_networking.erl
+++ b/src/rabbit_networking.erl
@@ -31,7 +31,7 @@
-module(rabbit_networking).
--export([start/0, start_tcp_listener/2, start_ssl_listener/3,
+-export([boot/0, start/0, start_tcp_listener/2, start_ssl_listener/3,
stop_tcp_listener/2, on_node_down/1, active_listeners/0,
node_listeners/1, connections/0, connection_info/1,
connection_info/2, connection_info_all/0,
@@ -82,6 +82,27 @@
%%----------------------------------------------------------------------------
+boot() ->
+ ok = start(),
+ ok = boot_tcp(),
+ ok = boot_ssl().
+
+boot_tcp() ->
+ {ok, TcpListeners} = application:get_env(tcp_listeners),
+ [ok = start_tcp_listener(Host, Port) || {Host, Port} <- TcpListeners],
+ ok.
+
+boot_ssl() ->
+ case application:get_env(ssl_listeners) of
+ {ok, []} ->
+ ok;
+ {ok, SslListeners} ->
+ ok = rabbit_misc:start_applications([crypto, ssl]),
+ {ok, SslOpts} = application:get_env(ssl_options),
+ [start_ssl_listener(Host, Port, SslOpts) || {Host, Port} <- SslListeners],
+ ok
+ end.
+
start() ->
{ok,_} = supervisor:start_child(
rabbit_sup,
diff --git a/src/rabbit_plugin_activator.erl b/src/rabbit_plugin_activator.erl
index 9f787920..4fcfab78 100644
--- a/src/rabbit_plugin_activator.erl
+++ b/src/rabbit_plugin_activator.erl
@@ -96,12 +96,20 @@ start() ->
{ok, Module, Warnings} ->
%% This gets lots of spurious no-source warnings when we
%% have .ez files, so we want to supress them to prevent
- %% hiding real issues.
+ %% hiding real issues. On Ubuntu, we also get warnings
+ %% about kernel/stdlib sources being out of date, which we
+ %% also ignore for the same reason.
WarningStr = Module:format_warning(
[W || W <- Warnings,
case W of
{warning, {source_not_found, _}} -> false;
- _ -> true
+ {warning, {obj_out_of_date, {_,_,WApp,_,_}}}
+ when WApp == mnesia;
+ WApp == stdlib;
+ WApp == kernel;
+ WApp == sasl;
+ WApp == os_mon -> false;
+ _ -> true
end]),
case length(WarningStr) of
0 -> ok;
@@ -222,7 +230,7 @@ expand_dependencies(Current, [Next|Rest]) ->
post_process_script(ScriptFile) ->
case file:consult(ScriptFile) of
{ok, [{script, Name, Entries}]} ->
- NewEntries = process_entries(Entries),
+ NewEntries = lists:flatmap(fun process_entry/1, Entries),
case file:open(ScriptFile, [write]) of
{ok, Fd} ->
io:format(Fd, "%% script generated at ~w ~w~n~p.~n",
@@ -236,13 +244,10 @@ post_process_script(ScriptFile) ->
{error, {failed_to_load_script, Reason}}
end.
-process_entries([]) ->
- [];
-process_entries([Entry = {apply,{application,start_boot,[stdlib,permanent]}} |
- Rest]) ->
- [Entry, {apply,{rabbit,prepare,[]}} | Rest];
-process_entries([Entry|Rest]) ->
- [Entry | process_entries(Rest)].
+process_entry(Entry = {apply,{application,start_boot,[stdlib,permanent]}}) ->
+ [Entry, {apply,{rabbit,prepare,[]}}];
+process_entry(Entry) ->
+ [Entry].
error(Fmt, Args) ->
io:format("ERROR: " ++ Fmt ++ "~n", Args),
diff --git a/src/rabbit_sup.erl b/src/rabbit_sup.erl
index 730d7909..ef32544c 100644
--- a/src/rabbit_sup.erl
+++ b/src/rabbit_sup.erl
@@ -33,7 +33,7 @@
-behaviour(supervisor).
--export([start_link/0]).
+-export([start_link/0, start_child/1, start_child/2]).
-export([init/1]).
@@ -42,5 +42,14 @@
start_link() ->
supervisor:start_link({local, ?SERVER}, ?MODULE, []).
+start_child(Mod) ->
+ start_child(Mod, []).
+
+start_child(Mod, Args) ->
+ {ok, _} = supervisor:start_child(?SERVER,
+ {Mod, {Mod, start_link, Args},
+ transient, 100, worker, [Mod]}),
+ ok.
+
init([]) ->
{ok, {{one_for_one, 10, 10}, []}}.