diff options
-rw-r--r-- | src/rabbit.erl | 341 | ||||
-rw-r--r-- | src/rabbit_alarm.erl | 10 | ||||
-rw-r--r-- | src/rabbit_error_logger.erl | 6 | ||||
-rw-r--r-- | src/rabbit_networking.erl | 23 | ||||
-rw-r--r-- | src/rabbit_plugin_activator.erl | 25 | ||||
-rw-r--r-- | src/rabbit_sup.erl | 11 |
6 files changed, 274 insertions, 142 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl index f8c3ef28..136b47ca 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -39,6 +39,115 @@ -export([log_location/1]). +%%--------------------------------------------------------------------------- +%% Boot steps. +-export([maybe_insert_default_data/0]). + +-rabbit_boot_step({codec_correctness_check, + [{description, "codec correctness check"}, + {mfa, {rabbit_binary_generator, + check_empty_content_body_frame_size, + []}}]}). + +-rabbit_boot_step({database, + [{mfa, {rabbit_mnesia, init, []}}, + {pre, kernel_ready}]}). + +-rabbit_boot_step({rabbit_exchange_type, + [{description, "exchange type registry"}, + {mfa, {rabbit_sup, start_child, [rabbit_exchange_type]}}, + {pre, kernel_ready}]}). + +-rabbit_boot_step({rabbit_log, + [{description, "logging server"}, + {mfa, {rabbit_sup, start_child, [rabbit_log]}}, + {pre, kernel_ready}]}). + +-rabbit_boot_step({rabbit_hooks, + [{description, "internal event notification system"}, + {mfa, {rabbit_hooks, start, []}}, + {pre, kernel_ready}]}). + +-rabbit_boot_step({kernel_ready, + [{description, "kernel ready"}]}). + +-rabbit_boot_step({rabbit_alarm, + [{description, "alarm handler"}, + {mfa, {rabbit_alarm, start, []}}, + {post, kernel_ready}, + {pre, core_initialized}]}). + +-rabbit_boot_step({rabbit_amqqueue_sup, + [{description, "queue supervisor"}, + {mfa, {rabbit_amqqueue, start, []}}, + {post, kernel_ready}, + {pre, core_initialized}]}). + +-rabbit_boot_step({rabbit_router, + [{description, "cluster router"}, + {mfa, {rabbit_sup, start_child, [rabbit_router]}}, + {post, kernel_ready}, + {pre, core_initialized}]}). + +-rabbit_boot_step({rabbit_node_monitor, + [{description, "node monitor"}, + {mfa, {rabbit_sup, start_child, [rabbit_node_monitor]}}, + {post, kernel_ready}, + {post, rabbit_amqqueue_sup}, + {pre, core_initialized}]}). + +-rabbit_boot_step({rabbit_exchange_events, + [{description, "exchange event notifier"}, + {mfa, {rabbit_sup, start_child, [rabbit_exchange_events]}}, + {post, kernel_ready}, + {pre, core_initialized}]}). + +-rabbit_boot_step({core_initialized, + [{description, "core initialized"}]}). + +-rabbit_boot_step({empty_db_check, + [{description, "empty DB check"}, + {mfa, {?MODULE, maybe_insert_default_data, []}}, + {post, core_initialized}]}). + +-rabbit_boot_step({exchange_recovery, + [{description, "exchange recovery"}, + {mfa, {rabbit_exchange, recover, []}}, + {post, empty_db_check}]}). + +-rabbit_boot_step({queue_recovery, + [{description, "queue recovery"}, + {mfa, {rabbit_amqqueue, recover, []}}, + {post, exchange_recovery}]}). + +-rabbit_boot_step({persister, + [{mfa, {rabbit_sup, start_child, [rabbit_persister]}}, + {post, queue_recovery}]}). + +-rabbit_boot_step({guid_generator, + [{description, "guid generator"}, + {mfa, {rabbit_sup, start_child, [rabbit_guid]}}, + {post, persister}, + {pre, routing_ready}]}). + +-rabbit_boot_step({routing_ready, + [{description, "message delivery logic ready"}]}). + +-rabbit_boot_step({log_relay, + [{description, "error log relay"}, + {mfa, {rabbit_error_logger, boot, []}}, + {post, routing_ready}]}). + +-rabbit_boot_step({networking, + [{mfa, {rabbit_networking, boot, []}}, + {post, log_relay}, + {pre, networking_listening}]}). + +-rabbit_boot_step({networking_listening, + [{description, "network listeners available"}]}). + +%%--------------------------------------------------------------------------- + -import(application). -import(mnesia). -import(lists). @@ -79,7 +188,7 @@ prepare() -> start() -> try ok = prepare(), - ok = rabbit_misc:start_applications(?APPS) + ok = rabbit_misc:start_applications(?APPS) after %%give the error loggers some time to catch up timer:sleep(100) @@ -115,102 +224,10 @@ rotate_logs(BinarySuffix) -> %%-------------------------------------------------------------------- start(normal, []) -> - {ok, SupPid} = rabbit_sup:start_link(), print_banner(), - - HookModules = discover_static_hooks(startup_hook), - - lists:foreach( - fun ({Phase, Msg, Thunk}) -> - io:format("starting ~-20s ...", [Msg]), - ok = run_static_hooks(HookModules, startup_hook, {pre, Phase}), - Thunk(), - ok = run_static_hooks(HookModules, startup_hook, {post, Phase}), - io:format("done~n"); - ({Phase, Msg, M, F, A}) -> - io:format("starting ~-20s ...", [Msg]), - ok = run_static_hooks(HookModules, startup_hook, {pre, Phase}), - apply(M, F, A), - ok = run_static_hooks(HookModules, startup_hook, {post, Phase}), - io:format("done~n") - end, - [{database, "database", - fun () -> ok = rabbit_mnesia:init() end}, - {core_processes, "core processes", - fun () -> - ok = start_child(rabbit_exchange_type), - ok = start_child(rabbit_log), - ok = rabbit_hooks:start(), - - ok = rabbit_binary_generator: - check_empty_content_body_frame_size(), - - ok = rabbit_alarm:start(), - - {ok, MemoryWatermark} = - application:get_env(vm_memory_high_watermark), - ok = case MemoryWatermark == 0 of - true -> - ok; - false -> - start_child(vm_memory_monitor, [MemoryWatermark]) - end, - - ok = rabbit_amqqueue:start(), - - ok = start_child(rabbit_router), - ok = start_child(rabbit_node_monitor), - ok = start_child(rabbit_exchange_events) - end}, - {recovery, "recovery", - fun () -> - ok = maybe_insert_default_data(), - ok = rabbit_exchange:recover(), - ok = rabbit_amqqueue:recover() - end}, - {persister, "persister", - fun () -> - ok = start_child(rabbit_persister) - end}, - {guid_generator, "guid generator", - fun () -> - ok = start_child(rabbit_guid) - end}, - {builtin_applications, "builtin applications", - fun () -> - {ok, DefaultVHost} = application:get_env(default_vhost), - ok = error_logger:add_report_handler( - rabbit_error_logger, [DefaultVHost]), - ok = start_builtin_amq_applications() - end}, - {tcp_listeners, "TCP listeners", - fun () -> - ok = rabbit_networking:start(), - {ok, TcpListeners} = application:get_env(tcp_listeners), - lists:foreach( - fun ({Host, Port}) -> - ok = rabbit_networking:start_tcp_listener(Host, Port) - end, - TcpListeners) - end}, - {ssl_listeners, "SSL listeners", - fun () -> - case application:get_env(ssl_listeners) of - {ok, []} -> - ok; - {ok, SslListeners} -> - ok = rabbit_misc:start_applications([crypto, ssl]), - - {ok, SslOpts} = application:get_env(ssl_options), - - [rabbit_networking:start_ssl_listener - (Host, Port, SslOpts) || {Host, Port} <- SslListeners], - ok - end - end}]), - + [ok = run_boot_step(Step) || Step <- boot_steps()], io:format("~nbroker running~n"), {ok, SupPid}. @@ -224,10 +241,108 @@ stop(_State) -> end, ok. -%--------------------------------------------------------------------------- +%%--------------------------------------------------------------------------- + +boot_error(Format, Args) -> + io:format("BOOT ERROR: " ++ Format, Args), + error_logger:error_msg(Format, Args), + timer:sleep(1000), + exit({?MODULE, failure_during_boot}). + +run_boot_step({StepName, Attributes}) -> + Description = case lists:keysearch(description, 1, Attributes) of + {value, {_, D}} -> D; + false -> StepName + end, + case [MFA || {mfa, MFA} <- Attributes] of + [] -> + io:format("progress -- ~s~n", [Description]); + MFAs -> + io:format("starting ~-40s ...", [Description]), + [case catch apply(M,F,A) of + {'EXIT', Reason} -> + boot_error("FAILED~nReason: ~p~n", [Reason]); + ok -> + ok + end || {M,F,A} <- MFAs], + io:format("done~n"), + ok + end. + +boot_steps() -> + AllApps = [App || {App, _, _} <- application:loaded_applications()], + Modules = lists:usort( + lists:append([Modules + || {ok, Modules} <- + [application:get_key(App, modules) + || App <- AllApps]])), + UnsortedSteps = + lists:flatmap(fun (Module) -> + [{StepName, Attributes} + || {rabbit_boot_step, [{StepName, Attributes}]} + <- Module:module_info(attributes)] + end, Modules), + sort_boot_steps(UnsortedSteps). + +sort_boot_steps(UnsortedSteps) -> + G = digraph:new([acyclic]), + + %% Add vertices, with duplicate checking. + [case digraph:vertex(G, StepName) of + false -> digraph:add_vertex(G, StepName, Step); + _ -> boot_error("Duplicate boot step name: ~w~n", [StepName]) + end || Step = {StepName, _Attrs} <- UnsortedSteps], + + %% Add edges, detecting cycles and missing vertices. + lists:foreach(fun ({StepName, Attributes}) -> + [add_boot_step_dep(G, StepName, PrecedingStepName) + || {post, PrecedingStepName} <- Attributes], + [add_boot_step_dep(G, SucceedingStepName, StepName) + || {pre, SucceedingStepName} <- Attributes] + end, UnsortedSteps), + + %% Use topological sort to find a consistent ordering (if there is + %% one, otherwise fail). + SortedStepsRev = [begin + {StepName, Step} = digraph:vertex(G, StepName), + Step + end || StepName <- digraph_utils:topsort(G)], + SortedSteps = lists:reverse(SortedStepsRev), + + digraph:delete(G), + + %% Check that all mentioned {M,F,A} triples are exported. + case [{StepName, {M,F,A}} + || {StepName, Attributes} <- SortedSteps, + {mfa, {M,F,A}} <- Attributes, + not erlang:function_exported(M, F, length(A))] of + [] -> SortedSteps; + MissingFunctions -> boot_error("Boot step functions not exported: ~p~n", + [MissingFunctions]) + end. + +add_boot_step_dep(G, RunsSecond, RunsFirst) -> + case digraph:add_edge(G, RunsSecond, RunsFirst) of + {error, Reason} -> + boot_error("Could not add boot step dependency of ~w on ~w:~n~s", + [RunsSecond, RunsFirst, + case Reason of + {bad_vertex, V} -> + io_lib:format("Boot step not registered: ~w~n", [V]); + {bad_edge, [First | Rest]} -> + [io_lib:format("Cyclic dependency: ~w", [First]), + [io_lib:format(" depends on ~w", [Next]) + || Next <- Rest], + io_lib:format(" depends on ~w~n", [First])] + end]); + _ -> + ok + end. + +%%--------------------------------------------------------------------------- log_location(Type) -> - case application:get_env(Type, case Type of + case application:get_env(Type, case Type of kernel -> error_logger; sasl -> sasl_error_logger end) of @@ -283,15 +398,6 @@ print_banner() -> lists:foreach(fun ({K, V}) -> io:format(Format, [K, V]) end, Settings), io:nl(). -start_child(Mod) -> - start_child(Mod, []). - -start_child(Mod, Args) -> - {ok,_} = supervisor:start_child(rabbit_sup, - {Mod, {Mod, start_link, Args}, - transient, 100, worker, [Mod]}), - ok. - ensure_working_log_handlers() -> Handlers = gen_event:which_handlers(error_logger), ok = ensure_working_log_handler(error_logger_file_h, @@ -317,7 +423,7 @@ ensure_working_log_handler(OldFHandler, NewFHandler, TTYHandler, throw({error, {cannot_log_to_tty, TTYHandler, not_installed}}) end; - _ -> case lists:member(NewFHandler, Handlers) of + _ -> case lists:member(NewFHandler, Handlers) of true -> ok; false -> case rotate_logs(LogLocation, "", OldFHandler, NewFHandler) of @@ -349,12 +455,6 @@ insert_default_data() -> DefaultReadPerm), ok. -start_builtin_amq_applications() -> - %%TODO: we may want to create a separate supervisor for these so - %%they don't bring down the entire app when they die and fail to - %%restart - ok. - rotate_logs(File, Suffix, Handler) -> rotate_logs(File, Suffix, Handler, Handler). @@ -377,20 +477,3 @@ log_rotation_result(ok, {error, SaslLogError}) -> {error, {cannot_rotate_sasl_logs, SaslLogError}}; log_rotation_result(ok, ok) -> ok. - -discover_static_hooks(Hook) -> - %% App files don't let us stick arbitrary keys in, so we do - %% something a bit icky here and go for "convention over - %% configuration", choosing to examine modules with names starting - %% with 'rabbit_static_hook_' to see if they have appropriate - %% exported hook functions. - [M || {App, _, _} <- application:loaded_applications(), - M <- begin {ok, Ms} = application:get_key(App, modules), Ms end, - case atom_to_list(M) of "rabbit_static_hook_" ++ _ -> true; _ -> false end, - {module, M} == code:load_file(M), - erlang:function_exported(M, Hook, 1)]. - -run_static_hooks(HookModules, Hook, Event) -> - ok = lists:foreach(fun (M) -> - {M, Hook, ok} = {M, Hook, M:Hook(Event)} - end, HookModules). diff --git a/src/rabbit_alarm.erl b/src/rabbit_alarm.erl index 9a639ed4..534409aa 100644 --- a/src/rabbit_alarm.erl +++ b/src/rabbit_alarm.erl @@ -54,7 +54,15 @@ %%---------------------------------------------------------------------------- start() -> - ok = alarm_handler:add_alarm_handler(?MODULE, []). + ok = alarm_handler:add_alarm_handler(?MODULE, []), + {ok, MemoryWatermark} = application:get_env(vm_memory_high_watermark), + ok = case MemoryWatermark == 0 of + true -> + ok; + false -> + rabbit_sup:start_child(vm_memory_monitor, [MemoryWatermark]) + end, + ok. stop() -> ok = alarm_handler:delete_alarm_handler(?MODULE). diff --git a/src/rabbit_error_logger.erl b/src/rabbit_error_logger.erl index 297ed5aa..9651ae12 100644 --- a/src/rabbit_error_logger.erl +++ b/src/rabbit_error_logger.erl @@ -37,8 +37,14 @@ -behaviour(gen_event). +-export([boot/0]). + -export([init/1, terminate/2, code_change/3, handle_call/2, handle_event/2, handle_info/2]). +boot() -> + {ok, DefaultVHost} = application:get_env(default_vhost), + ok = error_logger:add_report_handler(?MODULE, [DefaultVHost]). + init([DefaultVHost]) -> #exchange{} = rabbit_exchange:declare( rabbit_misc:r(DefaultVHost, exchange, ?LOG_EXCH_NAME), diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl index 3a0f9240..84658a85 100644 --- a/src/rabbit_networking.erl +++ b/src/rabbit_networking.erl @@ -31,7 +31,7 @@ -module(rabbit_networking). --export([start/0, start_tcp_listener/2, start_ssl_listener/3, +-export([boot/0, start/0, start_tcp_listener/2, start_ssl_listener/3, stop_tcp_listener/2, on_node_down/1, active_listeners/0, node_listeners/1, connections/0, connection_info/1, connection_info/2, connection_info_all/0, @@ -82,6 +82,27 @@ %%---------------------------------------------------------------------------- +boot() -> + ok = start(), + ok = boot_tcp(), + ok = boot_ssl(). + +boot_tcp() -> + {ok, TcpListeners} = application:get_env(tcp_listeners), + [ok = start_tcp_listener(Host, Port) || {Host, Port} <- TcpListeners], + ok. + +boot_ssl() -> + case application:get_env(ssl_listeners) of + {ok, []} -> + ok; + {ok, SslListeners} -> + ok = rabbit_misc:start_applications([crypto, ssl]), + {ok, SslOpts} = application:get_env(ssl_options), + [start_ssl_listener(Host, Port, SslOpts) || {Host, Port} <- SslListeners], + ok + end. + start() -> {ok,_} = supervisor:start_child( rabbit_sup, diff --git a/src/rabbit_plugin_activator.erl b/src/rabbit_plugin_activator.erl index 9f787920..4fcfab78 100644 --- a/src/rabbit_plugin_activator.erl +++ b/src/rabbit_plugin_activator.erl @@ -96,12 +96,20 @@ start() -> {ok, Module, Warnings} -> %% This gets lots of spurious no-source warnings when we %% have .ez files, so we want to supress them to prevent - %% hiding real issues. + %% hiding real issues. On Ubuntu, we also get warnings + %% about kernel/stdlib sources being out of date, which we + %% also ignore for the same reason. WarningStr = Module:format_warning( [W || W <- Warnings, case W of {warning, {source_not_found, _}} -> false; - _ -> true + {warning, {obj_out_of_date, {_,_,WApp,_,_}}} + when WApp == mnesia; + WApp == stdlib; + WApp == kernel; + WApp == sasl; + WApp == os_mon -> false; + _ -> true end]), case length(WarningStr) of 0 -> ok; @@ -222,7 +230,7 @@ expand_dependencies(Current, [Next|Rest]) -> post_process_script(ScriptFile) -> case file:consult(ScriptFile) of {ok, [{script, Name, Entries}]} -> - NewEntries = process_entries(Entries), + NewEntries = lists:flatmap(fun process_entry/1, Entries), case file:open(ScriptFile, [write]) of {ok, Fd} -> io:format(Fd, "%% script generated at ~w ~w~n~p.~n", @@ -236,13 +244,10 @@ post_process_script(ScriptFile) -> {error, {failed_to_load_script, Reason}} end. -process_entries([]) -> - []; -process_entries([Entry = {apply,{application,start_boot,[stdlib,permanent]}} | - Rest]) -> - [Entry, {apply,{rabbit,prepare,[]}} | Rest]; -process_entries([Entry|Rest]) -> - [Entry | process_entries(Rest)]. +process_entry(Entry = {apply,{application,start_boot,[stdlib,permanent]}}) -> + [Entry, {apply,{rabbit,prepare,[]}}]; +process_entry(Entry) -> + [Entry]. error(Fmt, Args) -> io:format("ERROR: " ++ Fmt ++ "~n", Args), diff --git a/src/rabbit_sup.erl b/src/rabbit_sup.erl index 730d7909..ef32544c 100644 --- a/src/rabbit_sup.erl +++ b/src/rabbit_sup.erl @@ -33,7 +33,7 @@ -behaviour(supervisor). --export([start_link/0]). +-export([start_link/0, start_child/1, start_child/2]). -export([init/1]). @@ -42,5 +42,14 @@ start_link() -> supervisor:start_link({local, ?SERVER}, ?MODULE, []). +start_child(Mod) -> + start_child(Mod, []). + +start_child(Mod, Args) -> + {ok, _} = supervisor:start_child(?SERVER, + {Mod, {Mod, start_link, Args}, + transient, 100, worker, [Mod]}), + ok. + init([]) -> {ok, {{one_for_one, 10, 10}, []}}. |