summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2010-01-25 14:40:21 +0000
committerMatthew Sackman <matthew@lshift.net>2010-01-25 14:40:21 +0000
commit1fba7a766887416c037f6b91d3f69823e78febbc (patch)
tree362d48d85bad28218f06f8add0a5afbbdb54bd68
parentee7914b92d9c190efbd5726c8946a96f28721d6a (diff)
parentf4f262feca4ca84fe3c76c220376361d4adf7744 (diff)
downloadrabbitmq-server-1fba7a766887416c037f6b91d3f69823e78febbc.tar.gz
Merged v1_7 into default
-rw-r--r--.hgignore1
-rw-r--r--packaging/macports/Makefile1
-rw-r--r--src/rabbit.erl306
-rw-r--r--src/rabbit_alarm.erl10
-rw-r--r--src/rabbit_error_logger.erl6
-rw-r--r--src/rabbit_misc.erl7
-rw-r--r--src/rabbit_networking.erl23
-rw-r--r--src/rabbit_plugin_activator.erl25
-rw-r--r--src/rabbit_sup.erl11
9 files changed, 271 insertions, 119 deletions
diff --git a/.hgignore b/.hgignore
index ccd0b09f..fd096dda 100644
--- a/.hgignore
+++ b/.hgignore
@@ -19,6 +19,7 @@ syntax: regexp
^packaging/RPMS/Fedora/(BUILD|RPMS|SOURCES|SPECS|SRPMS)$
^packaging/debs/Debian/rabbitmq-server_.*\.(dsc|(diff|tar)\.gz|deb|changes)$
^packaging/debs/apt-repository/debian$
+^packaging/macports/macports$
^packaging/generic-unix/rabbitmq-server-generic-unix-.*\.tar\.gz$
^packaging/windows/rabbitmq-server-windows-.*\.zip$
diff --git a/packaging/macports/Makefile b/packaging/macports/Makefile
index 53d27f9b..4db305eb 100644
--- a/packaging/macports/Makefile
+++ b/packaging/macports/Makefile
@@ -29,6 +29,7 @@ $(DEST)/Portfile: Portfile.in
done >checksums.sed
sed -e "s|@VERSION@|$(VERSION)|g;s|@BASE_URL@|$(REAL_WEB_URL)|g" \
-f checksums.sed <$^ >$@
+ rm checksums.sed
macports: dirs $(DEST)/Portfile
for f in rabbitmq-asroot-script-wrapper rabbitmq-script-wrapper ; do \
diff --git a/src/rabbit.erl b/src/rabbit.erl
index c6dde385..88b8e7a4 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -39,6 +39,104 @@
-export([log_location/1]).
+%%---------------------------------------------------------------------------
+%% Boot steps.
+-export([maybe_insert_default_data/0]).
+
+-rabbit_boot_step({codec_correctness_check,
+ [{description, "codec correctness check"},
+ {mfa, {rabbit_binary_generator,
+ check_empty_content_body_frame_size,
+ []}}]}).
+
+-rabbit_boot_step({database,
+ [{mfa, {rabbit_mnesia, init, []}},
+ {pre, kernel_ready}]}).
+
+-rabbit_boot_step({rabbit_log,
+ [{description, "logging server"},
+ {mfa, {rabbit_sup, start_child, [rabbit_log]}},
+ {pre, kernel_ready}]}).
+
+-rabbit_boot_step({rabbit_hooks,
+ [{description, "internal event notification system"},
+ {mfa, {rabbit_hooks, start, []}},
+ {pre, kernel_ready}]}).
+
+-rabbit_boot_step({kernel_ready,
+ [{description, "kernel ready"}]}).
+
+-rabbit_boot_step({rabbit_alarm,
+ [{description, "alarm handler"},
+ {mfa, {rabbit_alarm, start, []}},
+ {post, kernel_ready},
+ {pre, core_initialized}]}).
+
+-rabbit_boot_step({rabbit_amqqueue_sup,
+ [{description, "queue supervisor"},
+ {mfa, {rabbit_amqqueue, start, []}},
+ {post, kernel_ready},
+ {pre, core_initialized}]}).
+
+-rabbit_boot_step({rabbit_router,
+ [{description, "cluster router"},
+ {mfa, {rabbit_sup, start_child, [rabbit_router]}},
+ {post, kernel_ready},
+ {pre, core_initialized}]}).
+
+-rabbit_boot_step({rabbit_node_monitor,
+ [{description, "node monitor"},
+ {mfa, {rabbit_sup, start_child, [rabbit_node_monitor]}},
+ {post, kernel_ready},
+ {post, rabbit_amqqueue_sup},
+ {pre, core_initialized}]}).
+
+-rabbit_boot_step({core_initialized,
+ [{description, "core initialized"}]}).
+
+-rabbit_boot_step({empty_db_check,
+ [{description, "empty DB check"},
+ {mfa, {?MODULE, maybe_insert_default_data, []}},
+ {post, core_initialized}]}).
+
+-rabbit_boot_step({exchange_recovery,
+ [{description, "exchange recovery"},
+ {mfa, {rabbit_exchange, recover, []}},
+ {post, empty_db_check}]}).
+
+-rabbit_boot_step({queue_recovery,
+ [{description, "queue recovery"},
+ {mfa, {rabbit_amqqueue, recover, []}},
+ {post, exchange_recovery}]}).
+
+-rabbit_boot_step({persister,
+ [{mfa, {rabbit_sup, start_child, [rabbit_persister]}},
+ {post, queue_recovery}]}).
+
+-rabbit_boot_step({guid_generator,
+ [{description, "guid generator"},
+ {mfa, {rabbit_sup, start_child, [rabbit_guid]}},
+ {post, persister},
+ {pre, routing_ready}]}).
+
+-rabbit_boot_step({routing_ready,
+ [{description, "message delivery logic ready"}]}).
+
+-rabbit_boot_step({log_relay,
+ [{description, "error log relay"},
+ {mfa, {rabbit_error_logger, boot, []}},
+ {post, routing_ready}]}).
+
+-rabbit_boot_step({networking,
+ [{mfa, {rabbit_networking, boot, []}},
+ {post, log_relay},
+ {pre, networking_listening}]}).
+
+-rabbit_boot_step({networking_listening,
+ [{description, "network listeners available"}]}).
+
+%%---------------------------------------------------------------------------
+
-import(application).
-import(mnesia).
-import(lists).
@@ -79,7 +177,7 @@ prepare() ->
start() ->
try
ok = prepare(),
- ok = rabbit_misc:start_applications(?APPS)
+ ok = rabbit_misc:start_applications(?APPS)
after
%%give the error loggers some time to catch up
timer:sleep(100)
@@ -115,98 +213,15 @@ rotate_logs(BinarySuffix) ->
%%--------------------------------------------------------------------
start(normal, []) ->
-
{ok, SupPid} = rabbit_sup:start_link(),
print_banner(),
-
- lists:foreach(
- fun ({Msg, Thunk}) ->
- io:format("starting ~-20s ...", [Msg]),
- Thunk(),
- io:format("done~n");
- ({Msg, M, F, A}) ->
- io:format("starting ~-20s ...", [Msg]),
- apply(M, F, A),
- io:format("done~n")
- end,
- [{"database",
- fun () -> ok = rabbit_mnesia:init() end},
- {"core processes",
- fun () ->
- ok = start_child(rabbit_log),
- ok = rabbit_hooks:start(),
-
- ok = rabbit_binary_generator:
- check_empty_content_body_frame_size(),
-
- ok = rabbit_alarm:start(),
-
- {ok, MemoryWatermark} =
- application:get_env(vm_memory_high_watermark),
- ok = case MemoryWatermark == 0 of
- true ->
- ok;
- false ->
- start_child(vm_memory_monitor, [MemoryWatermark])
- end,
-
- ok = rabbit_amqqueue:start(),
-
- ok = start_child(rabbit_router),
- ok = start_child(rabbit_node_monitor)
- end},
- {"recovery",
- fun () ->
- ok = maybe_insert_default_data(),
- ok = rabbit_exchange:recover(),
- ok = rabbit_amqqueue:recover()
- end},
- {"persister",
- fun () ->
- ok = start_child(rabbit_persister)
- end},
- {"guid generator",
- fun () ->
- ok = start_child(rabbit_guid)
- end},
- {"builtin applications",
- fun () ->
- {ok, DefaultVHost} = application:get_env(default_vhost),
- ok = error_logger:add_report_handler(
- rabbit_error_logger, [DefaultVHost]),
- ok = start_builtin_amq_applications()
- end},
- {"TCP listeners",
- fun () ->
- ok = rabbit_networking:start(),
- {ok, TcpListeners} = application:get_env(tcp_listeners),
- lists:foreach(
- fun ({Host, Port}) ->
- ok = rabbit_networking:start_tcp_listener(Host, Port)
- end,
- TcpListeners)
- end},
- {"SSL listeners",
- fun () ->
- case application:get_env(ssl_listeners) of
- {ok, []} ->
- ok;
- {ok, SslListeners} ->
- ok = rabbit_misc:start_applications([crypto, ssl]),
-
- {ok, SslOpts} = application:get_env(ssl_options),
-
- [rabbit_networking:start_ssl_listener
- (Host, Port, SslOpts) || {Host, Port} <- SslListeners],
- ok
- end
- end}]),
-
+ [ok = run_boot_step(Step) || Step <- boot_steps()],
io:format("~nbroker running~n"),
{ok, SupPid}.
+
stop(_State) ->
terminated_ok = error_logger:delete_report_handler(rabbit_error_logger),
ok = rabbit_alarm:stop(),
@@ -216,10 +231,108 @@ stop(_State) ->
end,
ok.
-%---------------------------------------------------------------------------
+%%---------------------------------------------------------------------------
+
+boot_error(Format, Args) ->
+ io:format("BOOT ERROR: " ++ Format, Args),
+ error_logger:error_msg(Format, Args),
+ timer:sleep(1000),
+ exit({?MODULE, failure_during_boot}).
+
+run_boot_step({StepName, Attributes}) ->
+ Description = case lists:keysearch(description, 1, Attributes) of
+ {value, {_, D}} -> D;
+ false -> StepName
+ end,
+ case [MFA || {mfa, MFA} <- Attributes] of
+ [] ->
+ io:format("-- ~s~n", [Description]);
+ MFAs ->
+ io:format("starting ~-40s ...", [Description]),
+ [case catch apply(M,F,A) of
+ {'EXIT', Reason} ->
+ boot_error("FAILED~nReason: ~p~n", [Reason]);
+ ok ->
+ ok
+ end || {M,F,A} <- MFAs],
+ io:format("done~n"),
+ ok
+ end.
+
+boot_steps() ->
+ AllApps = [App || {App, _, _} <- application:loaded_applications()],
+ Modules = lists:usort(
+ lists:append([Modules
+ || {ok, Modules} <-
+ [application:get_key(App, modules)
+ || App <- AllApps]])),
+ UnsortedSteps =
+ lists:flatmap(fun (Module) ->
+ [{StepName, Attributes}
+ || {rabbit_boot_step, [{StepName, Attributes}]}
+ <- Module:module_info(attributes)]
+ end, Modules),
+ sort_boot_steps(UnsortedSteps).
+
+sort_boot_steps(UnsortedSteps) ->
+ G = digraph:new([acyclic]),
+
+ %% Add vertices, with duplicate checking.
+ [case digraph:vertex(G, StepName) of
+ false -> digraph:add_vertex(G, StepName, Step);
+ _ -> boot_error("Duplicate boot step name: ~w~n", [StepName])
+ end || Step = {StepName, _Attrs} <- UnsortedSteps],
+
+ %% Add edges, detecting cycles and missing vertices.
+ lists:foreach(fun ({StepName, Attributes}) ->
+ [add_boot_step_dep(G, StepName, PrecedingStepName)
+ || {post, PrecedingStepName} <- Attributes],
+ [add_boot_step_dep(G, SucceedingStepName, StepName)
+ || {pre, SucceedingStepName} <- Attributes]
+ end, UnsortedSteps),
+
+ %% Use topological sort to find a consistent ordering (if there is
+ %% one, otherwise fail).
+ SortedStepsRev = [begin
+ {StepName, Step} = digraph:vertex(G, StepName),
+ Step
+ end || StepName <- digraph_utils:topsort(G)],
+ SortedSteps = lists:reverse(SortedStepsRev),
+
+ digraph:delete(G),
+
+ %% Check that all mentioned {M,F,A} triples are exported.
+ case [{StepName, {M,F,A}}
+ || {StepName, Attributes} <- SortedSteps,
+ {mfa, {M,F,A}} <- Attributes,
+ not erlang:function_exported(M, F, length(A))] of
+ [] -> SortedSteps;
+ MissingFunctions -> boot_error("Boot step functions not exported: ~p~n",
+ [MissingFunctions])
+ end.
+
+add_boot_step_dep(G, RunsSecond, RunsFirst) ->
+ case digraph:add_edge(G, RunsSecond, RunsFirst) of
+ {error, Reason} ->
+ boot_error("Could not add boot step dependency of ~w on ~w:~n~s",
+ [RunsSecond, RunsFirst,
+ case Reason of
+ {bad_vertex, V} ->
+ io_lib:format("Boot step not registered: ~w~n", [V]);
+ {bad_edge, [First | Rest]} ->
+ [io_lib:format("Cyclic dependency: ~w", [First]),
+ [io_lib:format(" depends on ~w", [Next])
+ || Next <- Rest],
+ io_lib:format(" depends on ~w~n", [First])]
+ end]);
+ _ ->
+ ok
+ end.
+
+%%---------------------------------------------------------------------------
log_location(Type) ->
- case application:get_env(Type, case Type of
+ case application:get_env(Type, case Type of
kernel -> error_logger;
sasl -> sasl_error_logger
end) of
@@ -275,15 +388,6 @@ print_banner() ->
lists:foreach(fun ({K, V}) -> io:format(Format, [K, V]) end, Settings),
io:nl().
-start_child(Mod) ->
- start_child(Mod, []).
-
-start_child(Mod, Args) ->
- {ok,_} = supervisor:start_child(rabbit_sup,
- {Mod, {Mod, start_link, Args},
- transient, 100, worker, [Mod]}),
- ok.
-
ensure_working_log_handlers() ->
Handlers = gen_event:which_handlers(error_logger),
ok = ensure_working_log_handler(error_logger_file_h,
@@ -309,7 +413,7 @@ ensure_working_log_handler(OldFHandler, NewFHandler, TTYHandler,
throw({error, {cannot_log_to_tty,
TTYHandler, not_installed}})
end;
- _ -> case lists:member(NewFHandler, Handlers) of
+ _ -> case lists:member(NewFHandler, Handlers) of
true -> ok;
false -> case rotate_logs(LogLocation, "",
OldFHandler, NewFHandler) of
@@ -341,12 +445,6 @@ insert_default_data() ->
DefaultReadPerm),
ok.
-start_builtin_amq_applications() ->
- %%TODO: we may want to create a separate supervisor for these so
- %%they don't bring down the entire app when they die and fail to
- %%restart
- ok.
-
rotate_logs(File, Suffix, Handler) ->
rotate_logs(File, Suffix, Handler, Handler).
diff --git a/src/rabbit_alarm.erl b/src/rabbit_alarm.erl
index 9a639ed4..534409aa 100644
--- a/src/rabbit_alarm.erl
+++ b/src/rabbit_alarm.erl
@@ -54,7 +54,15 @@
%%----------------------------------------------------------------------------
start() ->
- ok = alarm_handler:add_alarm_handler(?MODULE, []).
+ ok = alarm_handler:add_alarm_handler(?MODULE, []),
+ {ok, MemoryWatermark} = application:get_env(vm_memory_high_watermark),
+ ok = case MemoryWatermark == 0 of
+ true ->
+ ok;
+ false ->
+ rabbit_sup:start_child(vm_memory_monitor, [MemoryWatermark])
+ end,
+ ok.
stop() ->
ok = alarm_handler:delete_alarm_handler(?MODULE).
diff --git a/src/rabbit_error_logger.erl b/src/rabbit_error_logger.erl
index b28574b7..b9bd71b7 100644
--- a/src/rabbit_error_logger.erl
+++ b/src/rabbit_error_logger.erl
@@ -37,8 +37,14 @@
-behaviour(gen_event).
+-export([boot/0]).
+
-export([init/1, terminate/2, code_change/3, handle_call/2, handle_event/2, handle_info/2]).
+boot() ->
+ {ok, DefaultVHost} = application:get_env(default_vhost),
+ ok = error_logger:add_report_handler(?MODULE, [DefaultVHost]).
+
init([DefaultVHost]) ->
#exchange{} = rabbit_exchange:declare(
rabbit_misc:r(DefaultVHost, exchange, ?LOG_EXCH_NAME),
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 9762619f..0866da3f 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -97,7 +97,7 @@
-spec(enable_cover/1 :: (string()) -> ok_or_error()).
-spec(report_cover/1 :: (string()) -> 'ok').
-spec(throw_on_error/2 ::
- (atom(), thunk({error, any()} | {ok, A} | A)) -> A).
+ (atom(), thunk({error, any()} | {ok, A} | A)) -> A).
-spec(with_exit_handler/2 :: (thunk(A), thunk(A)) -> A).
-spec(filter_exit_map/2 :: (fun ((A) -> B), [A]) -> [B]).
-spec(with_user/2 :: (username(), thunk(A)) -> A).
@@ -340,6 +340,9 @@ intersperse(Sep, [E|T]) -> [E, Sep | intersperse(Sep, T)].
%% This is a modified version of Luke Gorrie's pmap -
%% http://lukego.livejournal.com/6753.html - that doesn't care about
%% the order in which results are received.
+%%
+%% WARNING: This is is deliberately lightweight rather than robust -- if F
+%% throws, upmap will hang forever, so make sure F doesn't throw!
upmap(F, L) ->
Parent = self(),
Ref = make_ref(),
@@ -428,7 +431,7 @@ append_file(File, _, Suffix) ->
ensure_parent_dirs_exist(Filename) ->
case filelib:ensure_dir(Filename) of
ok -> ok;
- {error, Reason} ->
+ {error, Reason} ->
throw({error, {cannot_create_parent_dirs, Filename, Reason}})
end.
diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl
index 3a0f9240..84658a85 100644
--- a/src/rabbit_networking.erl
+++ b/src/rabbit_networking.erl
@@ -31,7 +31,7 @@
-module(rabbit_networking).
--export([start/0, start_tcp_listener/2, start_ssl_listener/3,
+-export([boot/0, start/0, start_tcp_listener/2, start_ssl_listener/3,
stop_tcp_listener/2, on_node_down/1, active_listeners/0,
node_listeners/1, connections/0, connection_info/1,
connection_info/2, connection_info_all/0,
@@ -82,6 +82,27 @@
%%----------------------------------------------------------------------------
+boot() ->
+ ok = start(),
+ ok = boot_tcp(),
+ ok = boot_ssl().
+
+boot_tcp() ->
+ {ok, TcpListeners} = application:get_env(tcp_listeners),
+ [ok = start_tcp_listener(Host, Port) || {Host, Port} <- TcpListeners],
+ ok.
+
+boot_ssl() ->
+ case application:get_env(ssl_listeners) of
+ {ok, []} ->
+ ok;
+ {ok, SslListeners} ->
+ ok = rabbit_misc:start_applications([crypto, ssl]),
+ {ok, SslOpts} = application:get_env(ssl_options),
+ [start_ssl_listener(Host, Port, SslOpts) || {Host, Port} <- SslListeners],
+ ok
+ end.
+
start() ->
{ok,_} = supervisor:start_child(
rabbit_sup,
diff --git a/src/rabbit_plugin_activator.erl b/src/rabbit_plugin_activator.erl
index 9f787920..4fcfab78 100644
--- a/src/rabbit_plugin_activator.erl
+++ b/src/rabbit_plugin_activator.erl
@@ -96,12 +96,20 @@ start() ->
{ok, Module, Warnings} ->
%% This gets lots of spurious no-source warnings when we
%% have .ez files, so we want to supress them to prevent
- %% hiding real issues.
+ %% hiding real issues. On Ubuntu, we also get warnings
+ %% about kernel/stdlib sources being out of date, which we
+ %% also ignore for the same reason.
WarningStr = Module:format_warning(
[W || W <- Warnings,
case W of
{warning, {source_not_found, _}} -> false;
- _ -> true
+ {warning, {obj_out_of_date, {_,_,WApp,_,_}}}
+ when WApp == mnesia;
+ WApp == stdlib;
+ WApp == kernel;
+ WApp == sasl;
+ WApp == os_mon -> false;
+ _ -> true
end]),
case length(WarningStr) of
0 -> ok;
@@ -222,7 +230,7 @@ expand_dependencies(Current, [Next|Rest]) ->
post_process_script(ScriptFile) ->
case file:consult(ScriptFile) of
{ok, [{script, Name, Entries}]} ->
- NewEntries = process_entries(Entries),
+ NewEntries = lists:flatmap(fun process_entry/1, Entries),
case file:open(ScriptFile, [write]) of
{ok, Fd} ->
io:format(Fd, "%% script generated at ~w ~w~n~p.~n",
@@ -236,13 +244,10 @@ post_process_script(ScriptFile) ->
{error, {failed_to_load_script, Reason}}
end.
-process_entries([]) ->
- [];
-process_entries([Entry = {apply,{application,start_boot,[stdlib,permanent]}} |
- Rest]) ->
- [Entry, {apply,{rabbit,prepare,[]}} | Rest];
-process_entries([Entry|Rest]) ->
- [Entry | process_entries(Rest)].
+process_entry(Entry = {apply,{application,start_boot,[stdlib,permanent]}}) ->
+ [Entry, {apply,{rabbit,prepare,[]}}];
+process_entry(Entry) ->
+ [Entry].
error(Fmt, Args) ->
io:format("ERROR: " ++ Fmt ++ "~n", Args),
diff --git a/src/rabbit_sup.erl b/src/rabbit_sup.erl
index 730d7909..ef32544c 100644
--- a/src/rabbit_sup.erl
+++ b/src/rabbit_sup.erl
@@ -33,7 +33,7 @@
-behaviour(supervisor).
--export([start_link/0]).
+-export([start_link/0, start_child/1, start_child/2]).
-export([init/1]).
@@ -42,5 +42,14 @@
start_link() ->
supervisor:start_link({local, ?SERVER}, ?MODULE, []).
+start_child(Mod) ->
+ start_child(Mod, []).
+
+start_child(Mod, Args) ->
+ {ok, _} = supervisor:start_child(?SERVER,
+ {Mod, {Mod, start_link, Args},
+ transient, 100, worker, [Mod]}),
+ ok.
+
init([]) ->
{ok, {{one_for_one, 10, 10}, []}}.