diff options
author | Matthias Radestock <matthias@lshift.net> | 2010-01-19 21:45:38 +0000 |
---|---|---|
committer | Matthias Radestock <matthias@lshift.net> | 2010-01-19 21:45:38 +0000 |
commit | b974a4a86e8ae233a80ed10a85fea00f7a3321d4 (patch) | |
tree | d0ab27671b1e55929f5297762a23c63a37a0eeae | |
parent | 1b13d8c1784a6724d4a64e26eb53b95d3f40a40e (diff) | |
parent | 3da3fe70f3ca2fcb8debbc23a7a92447b83f555b (diff) | |
download | rabbitmq-server-b974a4a86e8ae233a80ed10a85fea00f7a3321d4.tar.gz |
merge v1_7 into default
-rw-r--r-- | src/rabbit.erl | 306 | ||||
-rw-r--r-- | src/rabbit_alarm.erl | 10 | ||||
-rw-r--r-- | src/rabbit_error_logger.erl | 6 | ||||
-rw-r--r-- | src/rabbit_misc.erl | 7 | ||||
-rw-r--r-- | src/rabbit_networking.erl | 23 | ||||
-rw-r--r-- | src/rabbit_plugin_activator.erl | 25 | ||||
-rw-r--r-- | src/rabbit_sup.erl | 11 |
7 files changed, 269 insertions, 119 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl index c6dde385..5314540f 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -39,6 +39,104 @@ -export([log_location/1]). +%%--------------------------------------------------------------------------- +%% Boot steps. +-export([maybe_insert_default_data/0]). + +-rabbit_boot_step({codec_correctness_check, + [{description, "codec correctness check"}, + {mfa, {rabbit_binary_generator, + check_empty_content_body_frame_size, + []}}]}). + +-rabbit_boot_step({database, + [{mfa, {rabbit_mnesia, init, []}}, + {pre, kernel_ready}]}). + +-rabbit_boot_step({rabbit_log, + [{description, "logging server"}, + {mfa, {rabbit_sup, start_child, [rabbit_log]}}, + {pre, kernel_ready}]}). + +-rabbit_boot_step({rabbit_hooks, + [{description, "internal event notification system"}, + {mfa, {rabbit_hooks, start, []}}, + {pre, kernel_ready}]}). + +-rabbit_boot_step({kernel_ready, + [{description, "kernel ready"}]}). + +-rabbit_boot_step({rabbit_alarm, + [{description, "alarm handler"}, + {mfa, {rabbit_alarm, start, []}}, + {post, kernel_ready}, + {pre, core_initialized}]}). + +-rabbit_boot_step({rabbit_amqqueue_sup, + [{description, "queue supervisor"}, + {mfa, {rabbit_amqqueue, start, []}}, + {post, kernel_ready}, + {pre, core_initialized}]}). + +-rabbit_boot_step({rabbit_router, + [{description, "cluster router"}, + {mfa, {rabbit_sup, start_child, [rabbit_router]}}, + {post, kernel_ready}, + {pre, core_initialized}]}). + +-rabbit_boot_step({rabbit_node_monitor, + [{description, "node monitor"}, + {mfa, {rabbit_sup, start_child, [rabbit_node_monitor]}}, + {post, kernel_ready}, + {post, rabbit_amqqueue_sup}, + {pre, core_initialized}]}). + +-rabbit_boot_step({core_initialized, + [{description, "core initialized"}]}). + +-rabbit_boot_step({empty_db_check, + [{description, "empty DB check"}, + {mfa, {?MODULE, maybe_insert_default_data, []}}, + {post, core_initialized}]}). + +-rabbit_boot_step({exchange_recovery, + [{description, "exchange recovery"}, + {mfa, {rabbit_exchange, recover, []}}, + {post, empty_db_check}]}). + +-rabbit_boot_step({queue_recovery, + [{description, "queue recovery"}, + {mfa, {rabbit_amqqueue, recover, []}}, + {post, exchange_recovery}]}). + +-rabbit_boot_step({persister, + [{mfa, {rabbit_sup, start_child, [rabbit_persister]}}, + {post, queue_recovery}]}). + +-rabbit_boot_step({guid_generator, + [{description, "guid generator"}, + {mfa, {rabbit_sup, start_child, [rabbit_guid]}}, + {post, persister}, + {pre, routing_ready}]}). + +-rabbit_boot_step({routing_ready, + [{description, "message delivery logic ready"}]}). + +-rabbit_boot_step({log_relay, + [{description, "error log relay"}, + {mfa, {rabbit_error_logger, boot, []}}, + {post, routing_ready}]}). + +-rabbit_boot_step({networking, + [{mfa, {rabbit_networking, boot, []}}, + {post, log_relay}, + {pre, networking_listening}]}). + +-rabbit_boot_step({networking_listening, + [{description, "network listeners available"}]}). + +%%--------------------------------------------------------------------------- + -import(application). -import(mnesia). -import(lists). @@ -79,7 +177,7 @@ prepare() -> start() -> try ok = prepare(), - ok = rabbit_misc:start_applications(?APPS) + ok = rabbit_misc:start_applications(?APPS) after %%give the error loggers some time to catch up timer:sleep(100) @@ -115,98 +213,15 @@ rotate_logs(BinarySuffix) -> %%-------------------------------------------------------------------- start(normal, []) -> - {ok, SupPid} = rabbit_sup:start_link(), print_banner(), - - lists:foreach( - fun ({Msg, Thunk}) -> - io:format("starting ~-20s ...", [Msg]), - Thunk(), - io:format("done~n"); - ({Msg, M, F, A}) -> - io:format("starting ~-20s ...", [Msg]), - apply(M, F, A), - io:format("done~n") - end, - [{"database", - fun () -> ok = rabbit_mnesia:init() end}, - {"core processes", - fun () -> - ok = start_child(rabbit_log), - ok = rabbit_hooks:start(), - - ok = rabbit_binary_generator: - check_empty_content_body_frame_size(), - - ok = rabbit_alarm:start(), - - {ok, MemoryWatermark} = - application:get_env(vm_memory_high_watermark), - ok = case MemoryWatermark == 0 of - true -> - ok; - false -> - start_child(vm_memory_monitor, [MemoryWatermark]) - end, - - ok = rabbit_amqqueue:start(), - - ok = start_child(rabbit_router), - ok = start_child(rabbit_node_monitor) - end}, - {"recovery", - fun () -> - ok = maybe_insert_default_data(), - ok = rabbit_exchange:recover(), - ok = rabbit_amqqueue:recover() - end}, - {"persister", - fun () -> - ok = start_child(rabbit_persister) - end}, - {"guid generator", - fun () -> - ok = start_child(rabbit_guid) - end}, - {"builtin applications", - fun () -> - {ok, DefaultVHost} = application:get_env(default_vhost), - ok = error_logger:add_report_handler( - rabbit_error_logger, [DefaultVHost]), - ok = start_builtin_amq_applications() - end}, - {"TCP listeners", - fun () -> - ok = rabbit_networking:start(), - {ok, TcpListeners} = application:get_env(tcp_listeners), - lists:foreach( - fun ({Host, Port}) -> - ok = rabbit_networking:start_tcp_listener(Host, Port) - end, - TcpListeners) - end}, - {"SSL listeners", - fun () -> - case application:get_env(ssl_listeners) of - {ok, []} -> - ok; - {ok, SslListeners} -> - ok = rabbit_misc:start_applications([crypto, ssl]), - - {ok, SslOpts} = application:get_env(ssl_options), - - [rabbit_networking:start_ssl_listener - (Host, Port, SslOpts) || {Host, Port} <- SslListeners], - ok - end - end}]), - + [ok = run_boot_step(Step) || Step <- boot_steps()], io:format("~nbroker running~n"), {ok, SupPid}. + stop(_State) -> terminated_ok = error_logger:delete_report_handler(rabbit_error_logger), ok = rabbit_alarm:stop(), @@ -216,10 +231,108 @@ stop(_State) -> end, ok. -%--------------------------------------------------------------------------- +%%--------------------------------------------------------------------------- + +boot_error(Format, Args) -> + io:format("BOOT ERROR: " ++ Format, Args), + error_logger:error_msg(Format, Args), + timer:sleep(1000), + exit({?MODULE, failure_during_boot}). + +run_boot_step({StepName, Attributes}) -> + Description = case lists:keysearch(description, 1, Attributes) of + {value, {_, D}} -> D; + false -> StepName + end, + case [MFA || {mfa, MFA} <- Attributes] of + [] -> + io:format("progress -- ~s~n", [Description]); + MFAs -> + io:format("starting ~-40s ...", [Description]), + [case catch apply(M,F,A) of + {'EXIT', Reason} -> + boot_error("FAILED~nReason: ~p~n", [Reason]); + ok -> + ok + end || {M,F,A} <- MFAs], + io:format("done~n"), + ok + end. + +boot_steps() -> + AllApps = [App || {App, _, _} <- application:loaded_applications()], + Modules = lists:usort( + lists:append([Modules + || {ok, Modules} <- + [application:get_key(App, modules) + || App <- AllApps]])), + UnsortedSteps = + lists:flatmap(fun (Module) -> + [{StepName, Attributes} + || {rabbit_boot_step, [{StepName, Attributes}]} + <- Module:module_info(attributes)] + end, Modules), + sort_boot_steps(UnsortedSteps). + +sort_boot_steps(UnsortedSteps) -> + G = digraph:new([acyclic]), + + %% Add vertices, with duplicate checking. + [case digraph:vertex(G, StepName) of + false -> digraph:add_vertex(G, StepName, Step); + _ -> boot_error("Duplicate boot step name: ~w~n", [StepName]) + end || Step = {StepName, _Attrs} <- UnsortedSteps], + + %% Add edges, detecting cycles and missing vertices. + lists:foreach(fun ({StepName, Attributes}) -> + [add_boot_step_dep(G, StepName, PrecedingStepName) + || {post, PrecedingStepName} <- Attributes], + [add_boot_step_dep(G, SucceedingStepName, StepName) + || {pre, SucceedingStepName} <- Attributes] + end, UnsortedSteps), + + %% Use topological sort to find a consistent ordering (if there is + %% one, otherwise fail). + SortedStepsRev = [begin + {StepName, Step} = digraph:vertex(G, StepName), + Step + end || StepName <- digraph_utils:topsort(G)], + SortedSteps = lists:reverse(SortedStepsRev), + + digraph:delete(G), + + %% Check that all mentioned {M,F,A} triples are exported. + case [{StepName, {M,F,A}} + || {StepName, Attributes} <- SortedSteps, + {mfa, {M,F,A}} <- Attributes, + not erlang:function_exported(M, F, length(A))] of + [] -> SortedSteps; + MissingFunctions -> boot_error("Boot step functions not exported: ~p~n", + [MissingFunctions]) + end. + +add_boot_step_dep(G, RunsSecond, RunsFirst) -> + case digraph:add_edge(G, RunsSecond, RunsFirst) of + {error, Reason} -> + boot_error("Could not add boot step dependency of ~w on ~w:~n~s", + [RunsSecond, RunsFirst, + case Reason of + {bad_vertex, V} -> + io_lib:format("Boot step not registered: ~w~n", [V]); + {bad_edge, [First | Rest]} -> + [io_lib:format("Cyclic dependency: ~w", [First]), + [io_lib:format(" depends on ~w", [Next]) + || Next <- Rest], + io_lib:format(" depends on ~w~n", [First])] + end]); + _ -> + ok + end. + +%%--------------------------------------------------------------------------- log_location(Type) -> - case application:get_env(Type, case Type of + case application:get_env(Type, case Type of kernel -> error_logger; sasl -> sasl_error_logger end) of @@ -275,15 +388,6 @@ print_banner() -> lists:foreach(fun ({K, V}) -> io:format(Format, [K, V]) end, Settings), io:nl(). -start_child(Mod) -> - start_child(Mod, []). - -start_child(Mod, Args) -> - {ok,_} = supervisor:start_child(rabbit_sup, - {Mod, {Mod, start_link, Args}, - transient, 100, worker, [Mod]}), - ok. - ensure_working_log_handlers() -> Handlers = gen_event:which_handlers(error_logger), ok = ensure_working_log_handler(error_logger_file_h, @@ -309,7 +413,7 @@ ensure_working_log_handler(OldFHandler, NewFHandler, TTYHandler, throw({error, {cannot_log_to_tty, TTYHandler, not_installed}}) end; - _ -> case lists:member(NewFHandler, Handlers) of + _ -> case lists:member(NewFHandler, Handlers) of true -> ok; false -> case rotate_logs(LogLocation, "", OldFHandler, NewFHandler) of @@ -341,12 +445,6 @@ insert_default_data() -> DefaultReadPerm), ok. -start_builtin_amq_applications() -> - %%TODO: we may want to create a separate supervisor for these so - %%they don't bring down the entire app when they die and fail to - %%restart - ok. - rotate_logs(File, Suffix, Handler) -> rotate_logs(File, Suffix, Handler, Handler). diff --git a/src/rabbit_alarm.erl b/src/rabbit_alarm.erl index 9a639ed4..534409aa 100644 --- a/src/rabbit_alarm.erl +++ b/src/rabbit_alarm.erl @@ -54,7 +54,15 @@ %%---------------------------------------------------------------------------- start() -> - ok = alarm_handler:add_alarm_handler(?MODULE, []). + ok = alarm_handler:add_alarm_handler(?MODULE, []), + {ok, MemoryWatermark} = application:get_env(vm_memory_high_watermark), + ok = case MemoryWatermark == 0 of + true -> + ok; + false -> + rabbit_sup:start_child(vm_memory_monitor, [MemoryWatermark]) + end, + ok. stop() -> ok = alarm_handler:delete_alarm_handler(?MODULE). diff --git a/src/rabbit_error_logger.erl b/src/rabbit_error_logger.erl index b28574b7..b9bd71b7 100644 --- a/src/rabbit_error_logger.erl +++ b/src/rabbit_error_logger.erl @@ -37,8 +37,14 @@ -behaviour(gen_event). +-export([boot/0]). + -export([init/1, terminate/2, code_change/3, handle_call/2, handle_event/2, handle_info/2]). +boot() -> + {ok, DefaultVHost} = application:get_env(default_vhost), + ok = error_logger:add_report_handler(?MODULE, [DefaultVHost]). + init([DefaultVHost]) -> #exchange{} = rabbit_exchange:declare( rabbit_misc:r(DefaultVHost, exchange, ?LOG_EXCH_NAME), diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 9762619f..0866da3f 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -97,7 +97,7 @@ -spec(enable_cover/1 :: (string()) -> ok_or_error()). -spec(report_cover/1 :: (string()) -> 'ok'). -spec(throw_on_error/2 :: - (atom(), thunk({error, any()} | {ok, A} | A)) -> A). + (atom(), thunk({error, any()} | {ok, A} | A)) -> A). -spec(with_exit_handler/2 :: (thunk(A), thunk(A)) -> A). -spec(filter_exit_map/2 :: (fun ((A) -> B), [A]) -> [B]). -spec(with_user/2 :: (username(), thunk(A)) -> A). @@ -340,6 +340,9 @@ intersperse(Sep, [E|T]) -> [E, Sep | intersperse(Sep, T)]. %% This is a modified version of Luke Gorrie's pmap - %% http://lukego.livejournal.com/6753.html - that doesn't care about %% the order in which results are received. +%% +%% WARNING: This is is deliberately lightweight rather than robust -- if F +%% throws, upmap will hang forever, so make sure F doesn't throw! upmap(F, L) -> Parent = self(), Ref = make_ref(), @@ -428,7 +431,7 @@ append_file(File, _, Suffix) -> ensure_parent_dirs_exist(Filename) -> case filelib:ensure_dir(Filename) of ok -> ok; - {error, Reason} -> + {error, Reason} -> throw({error, {cannot_create_parent_dirs, Filename, Reason}}) end. diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl index 3a0f9240..84658a85 100644 --- a/src/rabbit_networking.erl +++ b/src/rabbit_networking.erl @@ -31,7 +31,7 @@ -module(rabbit_networking). --export([start/0, start_tcp_listener/2, start_ssl_listener/3, +-export([boot/0, start/0, start_tcp_listener/2, start_ssl_listener/3, stop_tcp_listener/2, on_node_down/1, active_listeners/0, node_listeners/1, connections/0, connection_info/1, connection_info/2, connection_info_all/0, @@ -82,6 +82,27 @@ %%---------------------------------------------------------------------------- +boot() -> + ok = start(), + ok = boot_tcp(), + ok = boot_ssl(). + +boot_tcp() -> + {ok, TcpListeners} = application:get_env(tcp_listeners), + [ok = start_tcp_listener(Host, Port) || {Host, Port} <- TcpListeners], + ok. + +boot_ssl() -> + case application:get_env(ssl_listeners) of + {ok, []} -> + ok; + {ok, SslListeners} -> + ok = rabbit_misc:start_applications([crypto, ssl]), + {ok, SslOpts} = application:get_env(ssl_options), + [start_ssl_listener(Host, Port, SslOpts) || {Host, Port} <- SslListeners], + ok + end. + start() -> {ok,_} = supervisor:start_child( rabbit_sup, diff --git a/src/rabbit_plugin_activator.erl b/src/rabbit_plugin_activator.erl index 9f787920..4fcfab78 100644 --- a/src/rabbit_plugin_activator.erl +++ b/src/rabbit_plugin_activator.erl @@ -96,12 +96,20 @@ start() -> {ok, Module, Warnings} -> %% This gets lots of spurious no-source warnings when we %% have .ez files, so we want to supress them to prevent - %% hiding real issues. + %% hiding real issues. On Ubuntu, we also get warnings + %% about kernel/stdlib sources being out of date, which we + %% also ignore for the same reason. WarningStr = Module:format_warning( [W || W <- Warnings, case W of {warning, {source_not_found, _}} -> false; - _ -> true + {warning, {obj_out_of_date, {_,_,WApp,_,_}}} + when WApp == mnesia; + WApp == stdlib; + WApp == kernel; + WApp == sasl; + WApp == os_mon -> false; + _ -> true end]), case length(WarningStr) of 0 -> ok; @@ -222,7 +230,7 @@ expand_dependencies(Current, [Next|Rest]) -> post_process_script(ScriptFile) -> case file:consult(ScriptFile) of {ok, [{script, Name, Entries}]} -> - NewEntries = process_entries(Entries), + NewEntries = lists:flatmap(fun process_entry/1, Entries), case file:open(ScriptFile, [write]) of {ok, Fd} -> io:format(Fd, "%% script generated at ~w ~w~n~p.~n", @@ -236,13 +244,10 @@ post_process_script(ScriptFile) -> {error, {failed_to_load_script, Reason}} end. -process_entries([]) -> - []; -process_entries([Entry = {apply,{application,start_boot,[stdlib,permanent]}} | - Rest]) -> - [Entry, {apply,{rabbit,prepare,[]}} | Rest]; -process_entries([Entry|Rest]) -> - [Entry | process_entries(Rest)]. +process_entry(Entry = {apply,{application,start_boot,[stdlib,permanent]}}) -> + [Entry, {apply,{rabbit,prepare,[]}}]; +process_entry(Entry) -> + [Entry]. error(Fmt, Args) -> io:format("ERROR: " ++ Fmt ++ "~n", Args), diff --git a/src/rabbit_sup.erl b/src/rabbit_sup.erl index 730d7909..ef32544c 100644 --- a/src/rabbit_sup.erl +++ b/src/rabbit_sup.erl @@ -33,7 +33,7 @@ -behaviour(supervisor). --export([start_link/0]). +-export([start_link/0, start_child/1, start_child/2]). -export([init/1]). @@ -42,5 +42,14 @@ start_link() -> supervisor:start_link({local, ?SERVER}, ?MODULE, []). +start_child(Mod) -> + start_child(Mod, []). + +start_child(Mod, Args) -> + {ok, _} = supervisor:start_child(?SERVER, + {Mod, {Mod, start_link, Args}, + transient, 100, worker, [Mod]}), + ok. + init([]) -> {ok, {{one_for_one, 10, 10}, []}}. |