summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEmile Joubert <emile@rabbitmq.com>2012-07-25 13:50:00 +0100
committerEmile Joubert <emile@rabbitmq.com>2012-07-25 13:50:00 +0100
commit0a5477e7801e57754f312715a8f784745ff9a3d9 (patch)
tree67aa47e55dc8517ded254c1a2ff4a0fab6d46842
parentcdfc1b7b26f6a874ff24cedbc1d733057f33dfd7 (diff)
parent78c5faed5c2b8f11371028f3857ce935b9c8eb75 (diff)
downloadrabbitmq-server-0a5477e7801e57754f312715a8f784745ff9a3d9.tar.gz
Merged bug24971 into default
-rw-r--r--Makefile16
-rwxr-xr-xcheck_xref291
-rw-r--r--packaging/windows-exe/rabbitmq_nsi.in6
-rw-r--r--src/file_handle_cache.erl27
-rw-r--r--src/rabbit.erl13
-rw-r--r--src/rabbit_alarm.erl81
-rw-r--r--src/rabbit_amqqueue.erl21
-rw-r--r--src/rabbit_amqqueue_process.erl2
-rw-r--r--src/rabbit_channel.erl72
-rw-r--r--src/rabbit_control_main.erl13
-rw-r--r--src/rabbit_disk_monitor.erl4
-rw-r--r--src/rabbit_exchange_type.erl8
-rw-r--r--src/rabbit_mirror_queue_misc.erl48
-rw-r--r--src/rabbit_mirror_queue_slave.erl44
-rw-r--r--src/rabbit_misc.erl34
-rw-r--r--src/rabbit_nodes.erl4
-rw-r--r--src/rabbit_prelaunch.erl6
-rw-r--r--src/rabbit_reader.erl237
-rw-r--r--src/vm_memory_monitor.erl31
19 files changed, 693 insertions, 265 deletions
diff --git a/Makefile b/Makefile
index 0e3960dc..f3729cfa 100644
--- a/Makefile
+++ b/Makefile
@@ -103,7 +103,7 @@ endif
all: $(TARGETS)
-.PHONY: plugins
+.PHONY: plugins check-xref
ifneq "$(PLUGINS_SRC_DIR)" ""
plugins:
[ -d "$(PLUGINS_SRC_DIR)/rabbitmq-server" ] || ln -s "$(CURDIR)" "$(PLUGINS_SRC_DIR)/rabbitmq-server"
@@ -111,9 +111,19 @@ plugins:
PLUGINS_SRC_DIR="" $(MAKE) -C "$(PLUGINS_SRC_DIR)" plugins-dist PLUGINS_DIST_DIR="$(CURDIR)/$(PLUGINS_DIR)" VERSION=$(VERSION)
echo "Put your EZs here and use rabbitmq-plugins to enable them." > $(PLUGINS_DIR)/README
rm -f $(PLUGINS_DIR)/rabbit_common*.ez
+
+# add -q to remove printout of warnings....
+check-xref: $(BEAM_TARGETS) $(PLUGINS_DIR)
+ rm -rf lib
+ ./check_xref $(PLUGINS_DIR) -q
+
else
plugins:
# Not building plugins
+
+check-xref:
+ $(info xref checks are disabled)
+
endif
$(DEPS_FILE): $(SOURCES) $(INCLUDES)
@@ -217,11 +227,11 @@ stop-rabbit-on-node: all
echo "rabbit:stop()." | $(ERL_CALL)
set-resource-alarm: all
- echo "alarm_handler:set_alarm({{resource_limit, $(SOURCE), node()}, []})." | \
+ echo "rabbit_alarm:set_alarm({{resource_limit, $(SOURCE), node()}, []})." | \
$(ERL_CALL)
clear-resource-alarm: all
- echo "alarm_handler:clear_alarm({resource_limit, $(SOURCE), node()})." | \
+ echo "rabbit_alarm:clear_alarm({resource_limit, $(SOURCE), node()})." | \
$(ERL_CALL)
stop-node:
diff --git a/check_xref b/check_xref
new file mode 100755
index 00000000..8f65f3b1
--- /dev/null
+++ b/check_xref
@@ -0,0 +1,291 @@
+#!/usr/bin/env escript
+%% -*- erlang -*-
+-mode(compile).
+
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2010-2012 VMware, Inc. All rights reserved.
+%%
+
+main(["-h"]) ->
+ io:format("usage: check_xref PluginDirectory (options)~n"
+ "options:~n"
+ " -q - quiet mode (only prints errors)~n"
+ " -X - disables all filters~n");
+main([PluginsDir|Argv]) ->
+ put({?MODULE, quiet}, lists:member("-q", Argv)),
+ put({?MODULE, no_filters}, lists:member("-X", Argv)),
+
+ {ok, Cwd} = file:get_cwd(),
+ code:add_pathz(filename:join(Cwd, "ebin")),
+ LibDir = filename:join(Cwd, "lib"),
+ case filelib:is_dir(LibDir) of
+ false -> ok;
+ true -> os:cmd("rm -rf " ++ LibDir)
+ end,
+ Rc = try
+ check(Cwd, PluginsDir, LibDir, checks())
+ catch
+ _:Err ->
+ io:format(user, "failed: ~p~n", [Err]),
+ 1
+ end,
+ shutdown(Rc, LibDir).
+
+shutdown(Rc, LibDir) ->
+ os:cmd("rm -rf " ++ LibDir),
+ erlang:halt(Rc).
+
+check(Cwd, PluginsDir, LibDir, Checks) ->
+ {ok, Plugins} = file:list_dir(PluginsDir),
+ ok = file:make_dir(LibDir),
+ [begin
+ Source = filename:join(PluginsDir, Plugin),
+ Target = filename:join(LibDir, Plugin),
+ IsExternal = external_dependency(Plugin),
+ AppN = case IsExternal of
+ true -> filename:join(LibDir, unmangle_name(Plugin));
+ false -> filename:join(
+ LibDir, filename:basename(Plugin, ".ez"))
+ end,
+
+ report(info, "mkdir -p ~s~n", [Target]),
+ filelib:ensure_dir(Target),
+
+ report(info, "cp ~s ~s~n", [Source, Target]),
+ {ok, _} = file:copy(Source, Target),
+
+ report(info, "unzip -d ~s ~s~n", [LibDir, Target]),
+ {ok, _} = zip:unzip(Target, [{cwd, LibDir}]),
+
+ UnpackDir = filename:join(LibDir, filename:basename(Target, ".ez")),
+ report(info, "mv ~s ~s~n", [UnpackDir, AppN]),
+ ok = file:rename(UnpackDir, AppN),
+
+ code:add_patha(filename:join(AppN, "ebin")),
+ case IsExternal of
+ true -> App = list_to_atom(hd(string:tokens(filename:basename(AppN),
+ "-"))),
+ report(info, "loading ~p~n", [App]),
+ application:load(App),
+ store_third_party(App);
+ _ -> ok
+ end
+ end || Plugin <- Plugins,
+ lists:suffix(".ez", Plugin)],
+
+ RabbitAppEbin = filename:join([LibDir, "rabbit", "ebin"]),
+ filelib:ensure_dir(filename:join(RabbitAppEbin, "foo")),
+ {ok, Beams} = file:list_dir("ebin"),
+ [{ok, _} = file:copy(filename:join("ebin", Beam),
+ filename:join(RabbitAppEbin, Beam)) || Beam <- Beams],
+ xref:start(?MODULE),
+ xref:set_default(?MODULE, [{verbose, false}, {warnings, false}]),
+ xref:set_library_path(?MODULE, code:get_path()),
+ xref:add_release(?MODULE, Cwd, {name, rabbit}),
+ store_unresolved_calls(),
+ Results = lists:flatten([perform_analysis(Q) || Q <- Checks]),
+ report(Results).
+
+%%
+%% Analysis
+%%
+
+perform_analysis({Query, Description, Severity}) ->
+ perform_analysis({Query, Description, Severity, fun(_) -> false end});
+perform_analysis({Query, Description, Severity, Filter}) ->
+ report_progress("Checking whether any code ~s "
+ "(~s)~n", [Description, Query]),
+ case analyse(Query) of
+ {ok, Analysis} ->
+ [filter(Result, Filter) ||
+ Result <- process_analysis(Query, Description,
+ Severity, Analysis)];
+ {error, Module, Reason} ->
+ {analysis_error, {Module, Reason}}
+ end.
+
+partition(Results) ->
+ lists:partition(fun({{_, L}, _}) -> L =:= error end, Results).
+
+analyse(Query) when is_atom(Query) ->
+ xref:analyse(?MODULE, Query, [{verbose, false}]);
+analyse(Query) when is_list(Query) ->
+ xref:q(?MODULE, Query).
+
+process_analysis(Query, Tag, Severity, Analysis) when is_atom(Query) ->
+ [{{Tag, Severity}, MFA} || MFA <- Analysis];
+process_analysis(Query, Tag, Severity, Analysis) when is_list(Query) ->
+ [{{Tag, Severity}, Result} || Result <- Analysis].
+
+checks() ->
+ [{"(XXL)(Lin) ((XC - UC) || (XU - X - B))",
+ "has call to undefined function(s)",
+ error, filters()},
+ {"(Lin) (L - LU)", "has unused local function(s)",
+ error, filters()},
+ {"(Lin) (LU * (X - XU))",
+ "has exported function(s) only used locally",
+ warning, filters()},
+ {"(Lin) (DF * (XU + LU))", "used deprecated function(s)",
+ warning, filters()}].
+% {"(Lin) (X - XU)", "possibly unused export",
+% warning, fun filter_unused/1}].
+
+%%
+%% noise filters (can be disabled with -X) - strip uninteresting analyses
+%%
+
+filter(Result, Filter) ->
+ case Filter(Result) of
+ false -> Result;
+ true -> [] %% NB: this gets flattened out later on....
+ end.
+
+filters() ->
+ case get({?MODULE, no_filters}) of
+ true -> fun(_) -> false end;
+ _ -> filter_chain([fun is_unresolved_call/1, fun is_callback/1,
+ fun is_unused/1, fun is_irrelevant/1])
+ end.
+
+filter_chain(FnChain) ->
+ fun(AnalysisResult) ->
+ lists:foldl(fun(F, false) -> F(cleanup(AnalysisResult));
+ (_F, true) -> true
+ end, false, FnChain)
+ end.
+
+cleanup({{_, _},{{{{_,_,_}=MFA1,_},{{_,_,_}=MFA2,_}},_}}) -> {MFA1, MFA2};
+cleanup({{_, _},{{{_,_,_}=MFA1,_},{{_,_,_}=MFA2,_}}}) -> {MFA1, MFA2};
+cleanup({{_, _},{{_,_,_}=MFA1,{_,_,_}=MFA2},_}) -> {MFA1, MFA2};
+cleanup({{_, _},{{_,_,_}=MFA1,{_,_,_}=MFA2}}) -> {MFA1, MFA2};
+cleanup({{_, _}, {_,_,_}=MFA}) -> MFA;
+cleanup({{_, _}, {{_,_,_}=MFA,_}}) -> MFA;
+cleanup({{_,_,_}=MFA, {_,_,_}}) -> MFA;
+cleanup({{_,_,_}=MFA, {_,_,_},_}) -> MFA;
+cleanup(Other) -> Other.
+
+is_irrelevant({{M,_,_}, {_,_,_}}) ->
+ is_irrelevant(M);
+is_irrelevant({M,_,_}) ->
+ is_irrelevant(M);
+is_irrelevant(Mod) when is_atom(Mod) ->
+ lists:member(Mod, get({?MODULE, third_party})).
+
+is_unused({{_,_,_}=MFA, {_,_,_}}) ->
+ is_unused(MFA);
+is_unused({M,_F,_A}) ->
+ lists:suffix("_tests", atom_to_list(M));
+is_unused(_) ->
+ false.
+
+is_unresolved_call({_, F, A}) ->
+ UC = get({?MODULE, unresolved_calls}),
+ sets:is_element({'$M_EXPR', F, A}, UC);
+is_unresolved_call(_) ->
+ false.
+
+%% TODO: cache this....
+is_callback({M,_,_}=MFA) ->
+ Attributes = M:module_info(attributes),
+ Behaviours = proplists:append_values(behaviour, Attributes),
+ {_, Callbacks} = lists:foldl(fun acc_behaviours/2, {M, []}, Behaviours),
+ lists:member(MFA, Callbacks);
+is_callback(_) ->
+ false.
+
+acc_behaviours(B, {M, CB}=Acc) ->
+ case catch(B:behaviour_info(callbacks)) of
+ [{_,_} | _] = Callbacks ->
+ {M, CB ++ [{M, F, A} || {F,A} <- Callbacks]};
+ _ ->
+ Acc
+ end.
+
+%%
+%% reporting/output
+%%
+
+report(Results) ->
+ [report_failures(F) || F <- Results],
+ {Errors, Warnings} = partition(Results),
+ report(info, "Completed: ~p errors, ~p warnings~n",
+ [length(Errors), length(Warnings)]),
+ case length(Errors) > 0 of
+ true -> 1;
+ false -> 0
+ end.
+
+report_failures({analysis_error, {Mod, Reason}}) ->
+ report(error, "~s:0 Analysis Error: ~p~n", [source_file(Mod), Reason]);
+report_failures({{Tag, Level}, {{{{M,_,_},L},{{M2,F2,A2},_}},_}}) ->
+ report(Level, "~s:~w ~s ~p:~p/~p~n",
+ [source_file(M), L, Tag, M2, F2, A2]);
+report_failures({{Tag, Level}, {{M,F,A},L}}) ->
+ report(Level, "~s:~w ~s ~p:~p/~p~n", [source_file(M), L, Tag, M, F, A]);
+report_failures({{Tag, Level}, {M,F,A}}) ->
+ report(Level, "~s:unknown ~s ~p:~p/~p~n", [source_file(M), Tag, M, F, A]);
+report_failures(Term) ->
+ report(error, "Ignoring ~p~n", [Term]),
+ ok.
+
+report_progress(Fmt, Args) ->
+ report(info, Fmt, Args).
+
+report(Level, Fmt, Args) ->
+ case {get({?MODULE, quiet}), Level} of
+ {true, error} -> do_report(lookup_prefix(Level), Fmt, Args);
+ {false, _} -> do_report(lookup_prefix(Level), Fmt, Args);
+ _ -> ok
+ end.
+
+do_report(Prefix, Fmt, Args) ->
+ io:format(Prefix ++ Fmt, Args).
+
+lookup_prefix(error) -> "ERROR: ";
+lookup_prefix(warning) -> "WARNING: ";
+lookup_prefix(info) -> "INFO: ".
+
+source_file(M) ->
+ proplists:get_value(source, M:module_info(compile)).
+
+%%
+%% setup/code-path/file-system ops
+%%
+
+store_third_party(App) ->
+ {ok, AppConfig} = application:get_all_key(App),
+ case get({?MODULE, third_party}) of
+ undefined ->
+ put({?MODULE, third_party},
+ proplists:get_value(modules, AppConfig));
+ Modules ->
+ put({?MODULE, third_party},
+ proplists:get_value(modules, AppConfig) ++ Modules)
+ end.
+
+%% TODO: this ought not to be maintained in such a fashion
+external_dependency(Path) ->
+ lists:any(fun(P) -> lists:prefix(P, Path) end,
+ ["mochiweb", "webmachine", "rfc4627", "eldap"]).
+
+unmangle_name(Path) ->
+ [Name, Vsn | _] = re:split(Path, "-", [{return, list}]),
+ string:join([Name, Vsn], "-").
+
+store_unresolved_calls() ->
+ {ok, UCFull} = analyse("UC"),
+ UC = [MFA || {_, {_,_,_} = MFA} <- UCFull],
+ put({?MODULE, unresolved_calls}, sets:from_list(UC)).
diff --git a/packaging/windows-exe/rabbitmq_nsi.in b/packaging/windows-exe/rabbitmq_nsi.in
index 91510991..f5257040 100644
--- a/packaging/windows-exe/rabbitmq_nsi.in
+++ b/packaging/windows-exe/rabbitmq_nsi.in
@@ -101,7 +101,9 @@ Section "RabbitMQ Service" RabbitService
ExpandEnvStrings $0 %COMSPEC%
ExecWait '"$0" /C "$INSTDIR\rabbitmq_server-%%VERSION%%\sbin\rabbitmq-service.bat" install'
ExecWait '"$0" /C "$INSTDIR\rabbitmq_server-%%VERSION%%\sbin\rabbitmq-service.bat" start'
- CopyFiles "$WINDIR\.erlang.cookie" "$PROFILE\.erlang.cookie"
+ ReadEnvStr $1 "HOMEDRIVE"
+ ReadEnvStr $2 "HOMEPATH"
+ CopyFiles "$WINDIR\.erlang.cookie" "$1$2\.erlang.cookie"
SectionEnd
;--------------------------------
@@ -234,4 +236,4 @@ Function findErlang
System::Call 'Kernel32::SetEnvironmentVariableA(t, t) i("ERLANG_HOME", "$0").r0'
${EndIf}
-FunctionEnd \ No newline at end of file
+FunctionEnd
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl
index 13ee4249..68c095d2 100644
--- a/src/file_handle_cache.erl
+++ b/src/file_handle_cache.erl
@@ -150,8 +150,8 @@
info/0, info/1]).
-export([ulimit/0]).
--export([start_link/0, init/1, handle_call/3, handle_cast/2, handle_info/2,
- terminate/2, code_change/3, prioritise_cast/2]).
+-export([start_link/0, start_link/2, init/1, handle_call/3, handle_cast/2,
+ handle_info/2, terminate/2, code_change/3, prioritise_cast/2]).
-define(SERVER, ?MODULE).
-define(RESERVED_FOR_OTHERS, 100).
@@ -195,7 +195,9 @@
obtain_count,
obtain_pending,
clients,
- timer_ref
+ timer_ref,
+ alarm_set,
+ alarm_clear
}).
-record(cstate,
@@ -268,7 +270,11 @@
%%----------------------------------------------------------------------------
start_link() ->
- gen_server2:start_link({local, ?SERVER}, ?MODULE, [], [{timeout, infinity}]).
+ start_link(fun alarm_handler:set_alarm/1, fun alarm_handler:clear_alarm/1).
+
+start_link(AlarmSet, AlarmClear) ->
+ gen_server2:start_link({local, ?SERVER}, ?MODULE, [AlarmSet, AlarmClear],
+ [{timeout, infinity}]).
register_callback(M, F, A)
when is_atom(M) andalso is_atom(F) andalso is_list(A) ->
@@ -806,7 +812,7 @@ i(Item, _) -> throw({bad_argument, Item}).
%% gen_server2 callbacks
%%----------------------------------------------------------------------------
-init([]) ->
+init([AlarmSet, AlarmClear]) ->
Limit = case application:get_env(file_handles_high_watermark) of
{ok, Watermark} when (is_integer(Watermark) andalso
Watermark > 0) ->
@@ -830,7 +836,9 @@ init([]) ->
obtain_count = 0,
obtain_pending = pending_new(),
clients = Clients,
- timer_ref = undefined }}.
+ timer_ref = undefined,
+ alarm_set = AlarmSet,
+ alarm_clear = AlarmClear }}.
prioritise_cast(Msg, _State) ->
case Msg of
@@ -1026,10 +1034,11 @@ obtain_limit_reached(#fhc_state { obtain_limit = Limit,
obtain_count = Count}) ->
Limit =/= infinity andalso Count >= Limit.
-adjust_alarm(OldState, NewState) ->
+adjust_alarm(OldState = #fhc_state { alarm_set = AlarmSet,
+ alarm_clear = AlarmClear }, NewState) ->
case {obtain_limit_reached(OldState), obtain_limit_reached(NewState)} of
- {false, true} -> alarm_handler:set_alarm({file_descriptor_limit, []});
- {true, false} -> alarm_handler:clear_alarm(file_descriptor_limit);
+ {false, true} -> AlarmSet({file_descriptor_limit, []});
+ {true, false} -> AlarmClear(file_descriptor_limit);
_ -> ok
end,
NewState.
diff --git a/src/rabbit.erl b/src/rabbit.erl
index fda489fe..ed258c71 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -20,7 +20,8 @@
-export([start/0, boot/0, stop/0,
stop_and_halt/0, await_startup/0, status/0, is_running/0,
- is_running/1, environment/0, rotate_logs/1, force_event_refresh/0]).
+ is_running/1, environment/0, rotate_logs/1, force_event_refresh/0,
+ start_fhc/0]).
-export([start/2, stop/1]).
@@ -53,8 +54,7 @@
-rabbit_boot_step({file_handle_cache,
[{description, "file handle cache server"},
- {mfa, {rabbit_sup, start_restartable_child,
- [file_handle_cache]}},
+ {mfa, {rabbit, start_fhc, []}},
{requires, pre_boot},
{enables, worker_pool}]}).
@@ -730,3 +730,10 @@ config_files() ->
[File] <- Files];
error -> []
end.
+
+%% We don't want this in fhc since it references rabbit stuff. And we can't put
+%% this in the bootstep directly.
+start_fhc() ->
+ rabbit_sup:start_restartable_child(
+ file_handle_cache,
+ [fun rabbit_alarm:set_alarm/1, fun rabbit_alarm:clear_alarm/1]).
diff --git a/src/rabbit_alarm.erl b/src/rabbit_alarm.erl
index d16d90a4..e6625b2b 100644
--- a/src/rabbit_alarm.erl
+++ b/src/rabbit_alarm.erl
@@ -18,22 +18,28 @@
-behaviour(gen_event).
--export([start/0, stop/0, register/2, on_node_up/1, on_node_down/1]).
+-export([start_link/0, start/0, stop/0, register/2, set_alarm/1,
+ clear_alarm/1, get_alarms/0, on_node_up/1, on_node_down/1]).
-export([init/1, handle_call/2, handle_event/2, handle_info/2,
terminate/2, code_change/3]).
-export([remote_conserve_resources/3]). %% Internal use only
--record(alarms, {alertees, alarmed_nodes}).
+-define(SERVER, ?MODULE).
+
+-record(alarms, {alertees, alarmed_nodes, alarms}).
%%----------------------------------------------------------------------------
-ifdef(use_specs).
+-spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()).
-spec(start/0 :: () -> 'ok').
-spec(stop/0 :: () -> 'ok').
-spec(register/2 :: (pid(), rabbit_types:mfargs()) -> boolean()).
+-spec(set_alarm/1 :: (any()) -> 'ok').
+-spec(clear_alarm/1 :: (any()) -> 'ok').
-spec(on_node_up/1 :: (node()) -> 'ok').
-spec(on_node_down/1 :: (node()) -> 'ok').
@@ -41,59 +47,70 @@
%%----------------------------------------------------------------------------
+start_link() ->
+ gen_event:start_link({local, ?SERVER}).
+
start() ->
- ok = alarm_handler:add_alarm_handler(?MODULE, []),
+ ok = rabbit_sup:start_restartable_child(?MODULE),
+ ok = gen_event:add_handler(?SERVER, ?MODULE, []),
{ok, MemoryWatermark} = application:get_env(vm_memory_high_watermark),
- rabbit_sup:start_restartable_child(vm_memory_monitor, [MemoryWatermark]),
-
+ rabbit_sup:start_restartable_child(
+ vm_memory_monitor, [MemoryWatermark, fun rabbit_alarm:set_alarm/1,
+ fun rabbit_alarm:clear_alarm/1]),
{ok, DiskLimit} = application:get_env(disk_free_limit),
rabbit_sup:start_restartable_child(rabbit_disk_monitor, [DiskLimit]),
ok.
-stop() ->
- ok = alarm_handler:delete_alarm_handler(?MODULE).
+stop() -> ok.
register(Pid, HighMemMFA) ->
- gen_event:call(alarm_handler, ?MODULE,
- {register, Pid, HighMemMFA},
+ gen_event:call(?SERVER, ?MODULE, {register, Pid, HighMemMFA},
infinity).
-on_node_up(Node) -> gen_event:notify(alarm_handler, {node_up, Node}).
+set_alarm(Alarm) -> gen_event:notify(?SERVER, {set_alarm, Alarm}).
+clear_alarm(Alarm) -> gen_event:notify(?SERVER, {clear_alarm, Alarm}).
+
+get_alarms() -> gen_event:call(?SERVER, ?MODULE, get_alarms, infinity).
-on_node_down(Node) -> gen_event:notify(alarm_handler, {node_down, Node}).
+on_node_up(Node) -> gen_event:notify(?SERVER, {node_up, Node}).
+on_node_down(Node) -> gen_event:notify(?SERVER, {node_down, Node}).
-%% Can't use alarm_handler:{set,clear}_alarm because that doesn't
-%% permit notifying a remote node.
remote_conserve_resources(Pid, Source, true) ->
- gen_event:notify({alarm_handler, node(Pid)},
+ gen_event:notify({?SERVER, node(Pid)},
{set_alarm, {{resource_limit, Source, node()}, []}});
remote_conserve_resources(Pid, Source, false) ->
- gen_event:notify({alarm_handler, node(Pid)},
+ gen_event:notify({?SERVER, node(Pid)},
{clear_alarm, {resource_limit, Source, node()}}).
+
%%----------------------------------------------------------------------------
init([]) ->
{ok, #alarms{alertees = dict:new(),
- alarmed_nodes = dict:new()}}.
+ alarmed_nodes = dict:new(),
+ alarms = []}}.
handle_call({register, Pid, HighMemMFA}, State) ->
{ok, 0 < dict:size(State#alarms.alarmed_nodes),
internal_register(Pid, HighMemMFA, State)};
+handle_call(get_alarms, State = #alarms{alarms = Alarms}) ->
+ {ok, Alarms, State};
+
handle_call(_Request, State) ->
{ok, not_understood, State}.
-handle_event({set_alarm, {{resource_limit, Source, Node}, []}}, State) ->
- {ok, maybe_alert(fun dict:append/3, Node, Source, State)};
+handle_event({set_alarm, Alarm}, State = #alarms{alarms = Alarms}) ->
+ handle_set_alarm(Alarm, State#alarms{alarms = [Alarm|Alarms]});
-handle_event({clear_alarm, {resource_limit, Source, Node}}, State) ->
- {ok, maybe_alert(fun dict_unappend/3, Node, Source, State)};
+handle_event({clear_alarm, Alarm}, State = #alarms{alarms = Alarms}) ->
+ handle_clear_alarm(Alarm, State#alarms{alarms = lists:keydelete(Alarm, 1,
+ Alarms)});
handle_event({node_up, Node}, State) ->
%% Must do this via notify and not call to avoid possible deadlock.
ok = gen_event:notify(
- {alarm_handler, Node},
+ {?SERVER, Node},
{register, self(), {?MODULE, remote_conserve_resources, []}}),
{ok, State};
@@ -186,3 +203,25 @@ internal_register(Pid, {M, F, A} = HighMemMFA,
end,
NewAlertees = dict:store(Pid, HighMemMFA, Alertees),
State#alarms{alertees = NewAlertees}.
+
+handle_set_alarm({{resource_limit, Source, Node}, []}, State) ->
+ rabbit_log:warning("~s resource limit alarm set on node ~p~n",
+ [Source, Node]),
+ {ok, maybe_alert(fun dict:append/3, Node, Source, State)};
+handle_set_alarm({file_descriptor_limit, []}, State) ->
+ rabbit_log:warning("file descriptor limit alarm set~n"),
+ {ok, State};
+handle_set_alarm(Alarm, State) ->
+ rabbit_log:warning("alarm '~p' set~n", [Alarm]),
+ {ok, State}.
+
+handle_clear_alarm({resource_limit, Source, Node}, State) ->
+ rabbit_log:warning("~s resource limit alarm cleared on node ~p~n",
+ [Source, Node]),
+ {ok, maybe_alert(fun dict_unappend/3, Node, Source, State)};
+handle_clear_alarm(file_descriptor_limit, State) ->
+ rabbit_log:warning("file descriptor limit alarm cleared~n"),
+ {ok, State};
+handle_clear_alarm(Alarm, State) ->
+ rabbit_log:warning("alarm '~p' cleared~n", [Alarm]),
+ {ok, State}.
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index afbaea65..d82ac266 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -599,7 +599,7 @@ on_node_down(Node) ->
slave_pids = []}
<- mnesia:table(rabbit_queue),
node(Pid) == Node andalso
- not is_process_alive(Pid)])),
+ not rabbit_misc:is_process_alive(Pid)])),
{Qs, Dels} = lists:unzip(QsDels),
T = rabbit_binding:process_deletions(
lists:foldl(fun rabbit_binding:combine_deletions/2,
@@ -672,13 +672,18 @@ qpids(Qs) -> lists:append([[QPid | SPids] ||
#amqqueue{pid = QPid, slave_pids = SPids} <- Qs]).
safe_delegate_call_ok(F, Pids) ->
- case delegate:invoke(Pids, fun (Pid) ->
- rabbit_misc:with_exit_handler(
- fun () -> ok end,
- fun () -> F(Pid) end)
- end) of
- {_, []} -> ok;
- {_, Bad} -> {error, Bad}
+ {_, Bads} = delegate:invoke(Pids, fun (Pid) ->
+ rabbit_misc:with_exit_handler(
+ fun () -> ok end,
+ fun () -> F(Pid) end)
+ end),
+ case lists:filter(fun ({_Pid, {exit, {R, _}, _}}) ->
+ rabbit_misc:is_abnormal_exit(R);
+ ({_Pid, _}) ->
+ false
+ end, Bads) of
+ [] -> ok;
+ Bads1 -> {error, Bads1}
end.
delegate_call(Pid, Msg) ->
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 8933de87..388af413 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -788,7 +788,7 @@ handle_queue_down(QPid, Reason, State = #q{queue_monitors = QMons,
unconfirmed = UC}) ->
case pmon:is_monitored(QPid, QMons) of
false -> noreply(State);
- true -> case rabbit_misc:is_abnormal_termination(Reason) of
+ true -> case rabbit_misc:is_abnormal_exit(Reason) of
true -> {Lost, _UC1} = dtree:take_all(QPid, UC),
QNameS = rabbit_misc:rs(qname(State)),
rabbit_log:warning("DLQ ~p for ~s died with "
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 22c6a223..69fe0edc 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -267,7 +267,7 @@ handle_cast({method, Method, Content, Flow},
catch
exit:Reason = #amqp_error{} ->
MethodName = rabbit_misc:method_record_type(Method),
- send_exception(Reason#amqp_error{method = MethodName}, State);
+ handle_exception(Reason#amqp_error{method = MethodName}, State);
_:Reason ->
{stop, {Reason, erlang:get_stacktrace()}, State}
end;
@@ -400,24 +400,29 @@ return_ok(State, false, Msg) -> {reply, Msg, State}.
ok_msg(true, _Msg) -> undefined;
ok_msg(false, Msg) -> Msg.
-send_exception(Reason, State = #ch{protocol = Protocol,
- channel = Channel,
- writer_pid = WriterPid,
- reader_pid = ReaderPid,
- conn_pid = ConnPid}) ->
- {CloseChannel, CloseMethod} =
- rabbit_binary_generator:map_exception(Channel, Reason, Protocol),
- rabbit_log:error("connection ~p, channel ~p - error:~n~p~n",
- [ConnPid, Channel, Reason]),
+handle_exception(Reason, State = #ch{protocol = Protocol,
+ channel = Channel,
+ writer_pid = WriterPid,
+ reader_pid = ReaderPid,
+ conn_pid = ConnPid}) ->
%% something bad's happened: notify_queues may not be 'ok'
{_Result, State1} = notify_queues(State),
- case CloseChannel of
- Channel -> ok = rabbit_writer:send_command(WriterPid, CloseMethod),
- {noreply, State1};
- _ -> ReaderPid ! {channel_exit, Channel, Reason},
- {stop, normal, State1}
+ case rabbit_binary_generator:map_exception(Channel, Reason, Protocol) of
+ {Channel, CloseMethod} ->
+ rabbit_log:error("connection ~p, channel ~p - soft error:~n~p~n",
+ [ConnPid, Channel, Reason]),
+ ok = rabbit_writer:send_command(WriterPid, CloseMethod),
+ {noreply, State1};
+ {0, _} ->
+ ReaderPid ! {channel_exit, Channel, Reason},
+ {stop, normal, State1}
end.
+precondition_failed(Format) -> precondition_failed(Format, []).
+
+precondition_failed(Format, Params) ->
+ rabbit_misc:protocol_error(precondition_failed, Format, Params).
+
return_queue_declare_ok(#resource{name = ActualName},
NoWait, MessageCount, ConsumerCount, State) ->
return_ok(State#ch{most_recently_declared_queue = ActualName}, NoWait,
@@ -461,9 +466,9 @@ check_user_id_header(#'P_basic'{user_id = Username},
ok;
check_user_id_header(#'P_basic'{user_id = Claimed},
#ch{user = #user{username = Actual}}) ->
- rabbit_misc:protocol_error(
- precondition_failed, "user_id property set to '~s' but "
- "authenticated user was '~s'", [Claimed, Actual]).
+ precondition_failed(
+ "user_id property set to '~s' but authenticated user was '~s'",
+ [Claimed, Actual]).
check_internal_exchange(#exchange{name = Name, internal = true}) ->
rabbit_misc:protocol_error(access_refused,
@@ -625,8 +630,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
State1#ch{uncommitted_message_q = NewTMQ}
end};
{error, Reason} ->
- rabbit_misc:protocol_error(precondition_failed,
- "invalid message: ~p", [Reason])
+ precondition_failed("invalid message: ~p", [Reason])
end;
handle_method(#'basic.nack'{delivery_tag = DeliveryTag,
@@ -881,8 +885,7 @@ handle_method(#'exchange.delete'{exchange = ExchangeNameBin,
{error, not_found} ->
rabbit_misc:not_found(ExchangeName);
{error, in_use} ->
- rabbit_misc:protocol_error(
- precondition_failed, "~s in use", [rabbit_misc:rs(ExchangeName)]);
+ precondition_failed("~s in use", [rabbit_misc:rs(ExchangeName)]);
ok ->
return_ok(State, NoWait, #'exchange.delete_ok'{})
end;
@@ -980,11 +983,9 @@ handle_method(#'queue.delete'{queue = QueueNameBin,
QueueName, ConnPid,
fun (Q) -> rabbit_amqqueue:delete(Q, IfUnused, IfEmpty) end) of
{error, in_use} ->
- rabbit_misc:protocol_error(
- precondition_failed, "~s in use", [rabbit_misc:rs(QueueName)]);
+ precondition_failed("~s in use", [rabbit_misc:rs(QueueName)]);
{error, not_empty} ->
- rabbit_misc:protocol_error(
- precondition_failed, "~s not empty", [rabbit_misc:rs(QueueName)]);
+ precondition_failed("~s not empty", [rabbit_misc:rs(QueueName)]);
{ok, PurgedMessageCount} ->
return_ok(State, NoWait,
#'queue.delete_ok'{message_count = PurgedMessageCount})
@@ -1019,15 +1020,13 @@ handle_method(#'queue.purge'{queue = QueueNameBin,
#'queue.purge_ok'{message_count = PurgedMessageCount});
handle_method(#'tx.select'{}, _, #ch{confirm_enabled = true}) ->
- rabbit_misc:protocol_error(
- precondition_failed, "cannot switch from confirm to tx mode", []);
+ precondition_failed("cannot switch from confirm to tx mode");
handle_method(#'tx.select'{}, _, State) ->
{reply, #'tx.select_ok'{}, State#ch{tx_status = in_progress}};
handle_method(#'tx.commit'{}, _, #ch{tx_status = none}) ->
- rabbit_misc:protocol_error(
- precondition_failed, "channel is not transactional", []);
+ precondition_failed("channel is not transactional");
handle_method(#'tx.commit'{}, _,
State = #ch{uncommitted_message_q = TMQ,
@@ -1041,8 +1040,7 @@ handle_method(#'tx.commit'{}, _,
{noreply, maybe_complete_tx(new_tx(State1#ch{tx_status = committing}))};
handle_method(#'tx.rollback'{}, _, #ch{tx_status = none}) ->
- rabbit_misc:protocol_error(
- precondition_failed, "channel is not transactional", []);
+ precondition_failed("channel is not transactional");
handle_method(#'tx.rollback'{}, _, State = #ch{unacked_message_q = UAMQ,
uncommitted_acks = TAL,
@@ -1052,8 +1050,7 @@ handle_method(#'tx.rollback'{}, _, State = #ch{unacked_message_q = UAMQ,
{reply, #'tx.rollback_ok'{}, new_tx(State#ch{unacked_message_q = UAMQ1})};
handle_method(#'confirm.select'{}, _, #ch{tx_status = in_progress}) ->
- rabbit_misc:protocol_error(
- precondition_failed, "cannot switch from tx to confirm mode", []);
+ precondition_failed("cannot switch from tx to confirm mode");
handle_method(#'confirm.select'{nowait = NoWait}, _, State) ->
return_ok(State#ch{confirm_enabled = true},
@@ -1119,7 +1116,7 @@ monitor_delivering_queue(false, QPid, State = #ch{queue_monitors = QMons,
delivering_queues = sets:add_element(QPid, DQ)}.
handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed = UC}) ->
- case rabbit_misc:is_abnormal_termination(Reason) of
+ case rabbit_misc:is_abnormal_exit(Reason) of
true -> {MXs, UC1} = dtree:take_all(QPid, UC),
send_nacks(MXs, State#ch{unconfirmed = UC1});
false -> {MXs, UC1} = dtree:take(QPid, UC),
@@ -1263,8 +1260,7 @@ collect_acks(ToAcc, PrefixAcc, Q, DeliveryTag, Multiple) ->
QTail, DeliveryTag, Multiple)
end;
{empty, _} ->
- rabbit_misc:protocol_error(
- precondition_failed, "unknown delivery tag ~w", [DeliveryTag])
+ precondition_failed("unknown delivery tag ~w", [DeliveryTag])
end.
ack(Acked, State) ->
@@ -1423,7 +1419,7 @@ complete_tx(State = #ch{tx_status = committing}) ->
ok = rabbit_writer:send_command(State#ch.writer_pid, #'tx.commit_ok'{}),
State#ch{tx_status = in_progress};
complete_tx(State = #ch{tx_status = failed}) ->
- {noreply, State1} = send_exception(
+ {noreply, State1} = handle_exception(
rabbit_misc:amqp_error(
precondition_failed, "partial tx completion", [],
'tx.commit'),
diff --git a/src/rabbit_control_main.erl b/src/rabbit_control_main.erl
index b23088cc..0dda32f1 100644
--- a/src/rabbit_control_main.erl
+++ b/src/rabbit_control_main.erl
@@ -190,11 +190,11 @@ print_report(Node, {Descr, Module, InfoFun, KeysFun}, VHostArg) ->
print_report0(Node, {Module, InfoFun, KeysFun}, VHostArg).
print_report0(Node, {Module, InfoFun, KeysFun}, VHostArg) ->
- case Results = rpc_call(Node, Module, InfoFun, VHostArg) of
- [_|_] -> InfoItems = rpc_call(Node, Module, KeysFun, []),
- display_row([atom_to_list(I) || I <- InfoItems]),
- display_info_list(Results, InfoItems);
- _ -> ok
+ case rpc_call(Node, Module, InfoFun, VHostArg) of
+ [_|_] = Results -> InfoItems = rpc_call(Node, Module, KeysFun, []),
+ display_row([atom_to_list(I) || I <- InfoItems]),
+ display_info_list(Results, InfoItems);
+ _ -> ok
end,
io:nl().
@@ -432,14 +432,13 @@ action(list_parameters, Node, Args = [], _Opts, Inform) ->
rabbit_runtime_parameters:info_keys());
action(report, Node, _Args, _Opts, Inform) ->
- io:format("Reporting server status on ~p~n~n", [erlang:universaltime()]),
+ Inform("Reporting server status on ~p~n~n", [erlang:universaltime()]),
[begin ok = action(Action, N, [], [], Inform), io:nl() end ||
N <- unsafe_rpc(Node, rabbit_mnesia, running_clustered_nodes, []),
Action <- [status, cluster_status, environment]],
VHosts = unsafe_rpc(Node, rabbit_vhost, list, []),
[print_report(Node, Q) || Q <- ?GLOBAL_QUERIES],
[print_report(Node, Q, [V]) || Q <- ?VHOST_QUERIES, V <- VHosts],
- io:format("End of server status report~n"),
ok;
action(eval, Node, [Expr], _Opts, _Inform) ->
diff --git a/src/rabbit_disk_monitor.erl b/src/rabbit_disk_monitor.erl
index 58375abb..e72181c0 100644
--- a/src/rabbit_disk_monitor.erl
+++ b/src/rabbit_disk_monitor.erl
@@ -149,10 +149,10 @@ internal_update(State = #state { limit = Limit,
case {Alarmed, NewAlarmed} of
{false, true} ->
emit_update_info("exceeded", CurrentFreeBytes, LimitBytes),
- alarm_handler:set_alarm({{resource_limit, disk, node()}, []});
+ rabbit_alarm:set_alarm({{resource_limit, disk, node()}, []});
{true, false} ->
emit_update_info("below limit", CurrentFreeBytes, LimitBytes),
- alarm_handler:clear_alarm({resource_limit, disk, node()});
+ rabbit_alarm:clear_alarm({resource_limit, disk, node()});
_ ->
ok
end,
diff --git a/src/rabbit_exchange_type.erl b/src/rabbit_exchange_type.erl
index e6470b72..9a793aab 100644
--- a/src/rabbit_exchange_type.erl
+++ b/src/rabbit_exchange_type.erl
@@ -54,13 +54,13 @@
%% called when comparing exchanges for equivalence - should return ok or
%% exit with #amqp_error{}
--callback assert_args_equivalence (rabbit_types:exchange(),
- rabbit_framing:amqp_table()) ->
+-callback assert_args_equivalence(rabbit_types:exchange(),
+ rabbit_framing:amqp_table()) ->
'ok' | rabbit_types:connection_exit().
%% called when the policy attached to this exchange changes.
--callback policy_changed (
- serial(), rabbit_types:exchange(), rabbit_types:exchange()) -> 'ok'.
+-callback policy_changed(serial(), rabbit_types:exchange(),
+ rabbit_types:exchange()) -> 'ok'.
-else.
diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl
index 180677fe..ba62a734 100644
--- a/src/rabbit_mirror_queue_misc.erl
+++ b/src/rabbit_mirror_queue_misc.erl
@@ -62,7 +62,9 @@ remove_from_queue(QueueName, DeadPids) ->
slave_pids = SPids }] ->
[QPid1 | SPids1] = Alive =
[Pid || Pid <- [QPid | SPids],
- not lists:member(node(Pid), DeadNodes)],
+ not lists:member(node(Pid),
+ DeadNodes) orelse
+ rabbit_misc:is_process_alive(Pid)],
case {{QPid, SPids}, {QPid1, SPids1}} of
{Same, Same} ->
{ok, QPid1, []};
@@ -134,22 +136,40 @@ add_mirror(Queue, MirrorNode) ->
Queue,
fun (#amqqueue { name = Name, pid = QPid, slave_pids = SPids } = Q) ->
case [Pid || Pid <- [QPid | SPids], node(Pid) =:= MirrorNode] of
- [] -> case rabbit_mirror_queue_slave_sup:start_child(
- MirrorNode, [Q]) of
- {ok, undefined} -> %% Already running
- ok;
- {ok, SPid} ->
- rabbit_log:info(
- "Adding mirror of ~s on node ~p: ~p~n",
- [rabbit_misc:rs(Name), MirrorNode, SPid]),
- ok;
- Other ->
- Other
- end;
- [_] -> {error, {queue_already_mirrored_on_node, MirrorNode}}
+ [] ->
+ start_child(Name, MirrorNode, Q);
+ [SPid] ->
+ case rabbit_misc:is_process_alive(SPid) of
+ true ->
+ {error,{queue_already_mirrored_on_node,
+ MirrorNode}};
+ false ->
+ start_child(Name, MirrorNode, Q)
+ end
end
end).
+start_child(Name, MirrorNode, Q) ->
+ case rabbit_mirror_queue_slave_sup:start_child(MirrorNode, [Q]) of
+ {ok, undefined} ->
+ %% this means the mirror process was
+ %% already running on the given node.
+ ok;
+ {ok, SPid} ->
+ rabbit_log:info("Adding mirror of ~s on node ~p: ~p~n",
+ [rabbit_misc:rs(Name), MirrorNode, SPid]),
+ ok;
+ {error, {{stale_master_pid, StalePid}, _}} ->
+ rabbit_log:warning("Detected stale HA master while adding "
+ "mirror of ~s on node ~p: ~p~n",
+ [rabbit_misc:rs(Name), MirrorNode, StalePid]),
+ ok;
+ {error, {{duplicate_live_master, _}=Err, _}} ->
+ throw(Err);
+ Other ->
+ Other
+ end.
+
if_mirrored_queue(Queue, Fun) ->
rabbit_amqqueue:with(
Queue, fun (#amqqueue { arguments = Args } = Q) ->
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 60d3e027..c4ae307c 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -101,19 +101,10 @@ info(QPid) ->
init(#amqqueue { name = QueueName } = Q) ->
Self = self(),
Node = node(),
- case rabbit_misc:execute_mnesia_transaction(
- fun () ->
- [Q1 = #amqqueue { pid = QPid, slave_pids = MPids }] =
- mnesia:read({rabbit_queue, QueueName}),
- case [Pid || Pid <- [QPid | MPids], node(Pid) =:= Node] of
- [] -> MPids1 = MPids ++ [Self],
- ok = rabbit_amqqueue:store_queue(
- Q1 #amqqueue { slave_pids = MPids1 }),
- {new, QPid};
- [SPid] -> true = rabbit_misc:is_process_alive(SPid),
- existing
- end
- end) of
+ case rabbit_misc:execute_mnesia_transaction(fun() ->
+ init_it(Self, Node,
+ QueueName)
+ end) of
{new, MPid} ->
process_flag(trap_exit, true), %% amqqueue_process traps exits too.
{ok, GM} = gm:start_link(QueueName, ?MODULE, [self()]),
@@ -150,10 +141,37 @@ init(#amqqueue { name = QueueName } = Q) ->
{ok, State, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN,
?DESIRED_HIBERNATE}};
+ {stale, StalePid} ->
+ {stop, {stale_master_pid, StalePid}};
+ duplicate_live_master ->
+ {stop, {duplicate_live_master, Node}};
existing ->
ignore
end.
+init_it(Self, Node, QueueName) ->
+ [Q1 = #amqqueue { pid = QPid, slave_pids = MPids }] =
+ mnesia:read({rabbit_queue, QueueName}),
+ case [Pid || Pid <- [QPid | MPids], node(Pid) =:= Node] of
+ [] ->
+ MPids1 = MPids ++ [Self],
+ ok = rabbit_amqqueue:store_queue(Q1#amqqueue{slave_pids=MPids1}),
+ {new, QPid};
+ [QPid] ->
+ case rabbit_misc:is_process_alive(QPid) of
+ true -> duplicate_live_master;
+ false -> {stale, QPid}
+ end;
+ [SPid] ->
+ case rabbit_misc:is_process_alive(SPid) of
+ true -> existing;
+ false -> MPids1 = (MPids -- [SPid]) ++ [Self],
+ ok = rabbit_amqqueue:store_queue(
+ Q1#amqqueue{ slave_pids = MPids1 }),
+ {new, QPid}
+ end
+ end.
+
handle_call({deliver, Delivery = #delivery { immediate = true }},
From, State) ->
%% It is safe to reply 'false' here even if a) we've not seen the
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index d41aa09b..25a51d22 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -29,14 +29,14 @@
-export([enable_cover/1, report_cover/1]).
-export([start_cover/1]).
-export([confirm_to_sender/2]).
--export([throw_on_error/2, with_exit_handler/2, filter_exit_map/2]).
--export([is_abnormal_termination/1]).
+-export([throw_on_error/2, with_exit_handler/2, is_abnormal_exit/1,
+ filter_exit_map/2]).
-export([with_user/2, with_user_and_vhost/3]).
-export([execute_mnesia_transaction/1]).
-export([execute_mnesia_transaction/2]).
-export([execute_mnesia_tx_with_tail/1]).
-export([ensure_ok/2]).
--export([tcp_name/3]).
+-export([tcp_name/3, format_inet_error/1]).
-export([upmap/2, map_in_order/2]).
-export([table_filter/3]).
-export([dirty_read_all/1, dirty_foreach_key/2, dirty_dump_log/1]).
@@ -61,6 +61,11 @@
-export([os_cmd/1]).
-export([gb_sets_difference/2]).
+%% Horrible macro to use in guards
+-define(IS_BENIGN_EXIT(R),
+ R =:= noproc; R =:= noconnection; R =:= nodedown; R =:= normal;
+ R =:= shutdown).
+
%%----------------------------------------------------------------------------
-ifdef(use_specs).
@@ -137,8 +142,8 @@
-spec(throw_on_error/2 ::
(atom(), thunk(rabbit_types:error(any()) | {ok, A} | A)) -> A).
-spec(with_exit_handler/2 :: (thunk(A), thunk(A)) -> A).
+-spec(is_abnormal_exit/1 :: (any()) -> boolean()).
-spec(filter_exit_map/2 :: (fun ((A) -> B), [A]) -> [B]).
--spec(is_abnormal_termination/1 :: (any()) -> boolean()).
-spec(with_user/2 :: (rabbit_types:username(), thunk(A)) -> A).
-spec(with_user_and_vhost/3 ::
(rabbit_types:username(), rabbit_types:vhost(), thunk(A))
@@ -152,6 +157,7 @@
-spec(tcp_name/3 ::
(atom(), inet:ip_address(), rabbit_networking:ip_port())
-> atom()).
+-spec(format_inet_error/1 :: (atom()) -> string()).
-spec(upmap/2 :: (fun ((A) -> B), [A]) -> [B]).
-spec(map_in_order/2 :: (fun ((A) -> B), [A]) -> [B]).
-spec(table_filter/3:: (fun ((A) -> boolean()), fun ((A, boolean()) -> 'ok'),
@@ -423,13 +429,14 @@ with_exit_handler(Handler, Thunk) ->
try
Thunk()
catch
- exit:{R, _} when R =:= noproc; R =:= nodedown;
- R =:= normal; R =:= shutdown ->
- Handler();
- exit:{{R, _}, _} when R =:= nodedown; R =:= shutdown ->
- Handler()
+ exit:{R, _} when ?IS_BENIGN_EXIT(R) -> Handler();
+ exit:{{R, _}, _} when ?IS_BENIGN_EXIT(R) -> Handler()
end.
+is_abnormal_exit(R) when ?IS_BENIGN_EXIT(R) -> false;
+is_abnormal_exit({R, _}) when ?IS_BENIGN_EXIT(R) -> false;
+is_abnormal_exit(_) -> true.
+
filter_exit_map(F, L) ->
Ref = make_ref(),
lists:filter(fun (R) -> R =/= Ref end,
@@ -437,11 +444,6 @@ filter_exit_map(F, L) ->
fun () -> Ref end,
fun () -> F(I) end) || I <- L]).
-is_abnormal_termination(Reason)
- when Reason =:= noproc; Reason =:= noconnection;
- Reason =:= normal; Reason =:= shutdown -> false;
-is_abnormal_termination({shutdown, _}) -> false;
-is_abnormal_termination(_) -> true.
with_user(Username, Thunk) ->
fun () ->
@@ -510,6 +512,10 @@ tcp_name(Prefix, IPAddress, Port)
list_to_atom(
format("~w_~s:~w", [Prefix, inet_parse:ntoa(IPAddress), Port])).
+format_inet_error(address) -> "cannot connect to host/port";
+format_inet_error(timeout) -> "timed out";
+format_inet_error(Error) -> inet:format_error(Error).
+
%% This is a modified version of Luke Gorrie's pmap -
%% http://lukego.livejournal.com/6753.html - that doesn't care about
%% the order in which results are received.
diff --git a/src/rabbit_nodes.erl b/src/rabbit_nodes.erl
index 1c23632d..c8d77b0f 100644
--- a/src/rabbit_nodes.erl
+++ b/src/rabbit_nodes.erl
@@ -70,8 +70,8 @@ diagnostics0() ->
diagnostics_host(Host) ->
case names(Host) of
{error, EpmdReason} ->
- {"- unable to connect to epmd on ~s: ~w",
- [Host, EpmdReason]};
+ {"- unable to connect to epmd on ~s: ~w (~s)",
+ [Host, EpmdReason, rabbit_misc:format_inet_error(EpmdReason)]};
{ok, NamePorts} ->
{"- ~s: ~p",
[Host, [{list_to_atom(Name), Port} ||
diff --git a/src/rabbit_prelaunch.erl b/src/rabbit_prelaunch.erl
index d56211b5..b0454435 100644
--- a/src/rabbit_prelaunch.erl
+++ b/src/rabbit_prelaunch.erl
@@ -67,9 +67,5 @@ duplicate_node_check(NodeStr) ->
{error, EpmdReason} ->
rabbit_misc:quit("epmd error for host ~p: ~p (~s)~n",
[NodeHost, EpmdReason,
- case EpmdReason of
- address -> "unable to establish tcp connection";
- timeout -> "timed out establishing tcp connection";
- _ -> inet:format_error(EpmdReason)
- end])
+ rabbit_misc:format_inet_error(EpmdReason)])
end.
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 61868cc1..19dac70c 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -173,6 +173,8 @@ server_capabilities(rabbit_framing_amqp_0_9_1) ->
server_capabilities(_) ->
[].
+%%--------------------------------------------------------------------------
+
log(Level, Fmt, Args) -> rabbit_log:log(connection, Level, Fmt, Args).
inet_op(F) -> rabbit_misc:throw_on_error(inet_error, F).
@@ -311,7 +313,7 @@ handle_other(handshake_timeout, Deb, State)
mainloop(Deb, State);
handle_other(handshake_timeout, _Deb, State) ->
throw({handshake_timeout, State#v1.callback});
-handle_other(timeout, Deb, State = #v1{connection_state = closed}) ->
+handle_other(heartbeat_timeout, Deb, State = #v1{connection_state = closed}) ->
mainloop(Deb, State);
handle_other(heartbeat_timeout, _Deb, #v1{connection_state = S}) ->
throw({heartbeat_timeout, S});
@@ -353,9 +355,9 @@ switch_callback(State, Callback, Length) ->
State#v1{callback = Callback, recv_len = Length}.
terminate(Explanation, State) when ?IS_RUNNING(State) ->
- {normal, send_exception(State, 0,
- rabbit_misc:amqp_error(
- connection_forced, Explanation, [], none))};
+ {normal, handle_exception(State, 0,
+ rabbit_misc:amqp_error(
+ connection_forced, Explanation, [], none))};
terminate(_Explanation, State) ->
{force, State}.
@@ -383,6 +385,9 @@ update_last_blocked_by(State = #v1{conserve_resources = true}) ->
update_last_blocked_by(State = #v1{conserve_resources = false}) ->
State#v1{last_blocked_by = flow}.
+%%--------------------------------------------------------------------------
+%% error handling / termination
+
close_connection(State = #v1{queue_collector = Collector,
connection = #connection{
timeout_sec = TimeoutSec}}) ->
@@ -406,24 +411,10 @@ handle_dependent_exit(ChPid, Reason, State) ->
{_Channel, controlled} ->
maybe_close(control_throttle(State));
{Channel, uncontrolled} ->
- log(error, "AMQP connection ~p, channel ~p - error:~n~p~n",
- [self(), Channel, Reason]),
maybe_close(handle_exception(control_throttle(State),
Channel, Reason))
end.
-channel_cleanup(ChPid) ->
- case get({ch_pid, ChPid}) of
- undefined -> undefined;
- {Channel, MRef} -> credit_flow:peer_down(ChPid),
- erase({channel, Channel}),
- erase({ch_pid, ChPid}),
- erlang:demonitor(MRef, [flush]),
- Channel
- end.
-
-all_channels() -> [ChPid || {{ch_pid, ChPid}, _ChannelMRef} <- get()].
-
terminate_channels() ->
NChannels =
length([rabbit_channel:shutdown(ChPid) || ChPid <- all_channels()]),
@@ -477,6 +468,80 @@ maybe_close(State) ->
termination_kind(normal) -> controlled;
termination_kind(_) -> uncontrolled.
+handle_exception(State = #v1{connection_state = closed}, Channel, Reason) ->
+ log(error, "AMQP connection ~p (~p), channel ~p - error:~n~p~n",
+ [self(), closed, Channel, Reason]),
+ State;
+handle_exception(State = #v1{connection = #connection{protocol = Protocol},
+ connection_state = CS},
+ Channel, Reason)
+ when ?IS_RUNNING(State) orelse CS =:= closing ->
+ log(error, "AMQP connection ~p (~p), channel ~p - error:~n~p~n",
+ [self(), CS, Channel, Reason]),
+ {0, CloseMethod} =
+ rabbit_binary_generator:map_exception(Channel, Reason, Protocol),
+ terminate_channels(),
+ State1 = close_connection(State),
+ ok = send_on_channel0(State1#v1.sock, CloseMethod, Protocol),
+ State1;
+handle_exception(State, Channel, Reason) ->
+ %% We don't trust the client at this point - force them to wait
+ %% for a bit so they can't DOS us with repeated failed logins etc.
+ timer:sleep(?SILENT_CLOSE_DELAY * 1000),
+ throw({handshake_error, State#v1.connection_state, Channel, Reason}).
+
+frame_error(Error, Type, Channel, Payload, State) ->
+ {Str, Bin} = payload_snippet(Payload),
+ handle_exception(State, Channel,
+ rabbit_misc:amqp_error(frame_error,
+ "type ~p, ~s octets = ~p: ~p",
+ [Type, Str, Bin, Error], none)).
+
+unexpected_frame(Type, Channel, Payload, State) ->
+ {Str, Bin} = payload_snippet(Payload),
+ handle_exception(State, Channel,
+ rabbit_misc:amqp_error(unexpected_frame,
+ "type ~p, ~s octets = ~p",
+ [Type, Str, Bin], none)).
+
+payload_snippet(Payload) when size(Payload) =< 16 ->
+ {"all", Payload};
+payload_snippet(<<Snippet:16/binary, _/binary>>) ->
+ {"first 16", Snippet}.
+
+%%--------------------------------------------------------------------------
+
+create_channel(Channel, State) ->
+ #v1{sock = Sock, queue_collector = Collector,
+ channel_sup_sup_pid = ChanSupSup,
+ connection = #connection{protocol = Protocol,
+ frame_max = FrameMax,
+ user = User,
+ vhost = VHost,
+ capabilities = Capabilities}} = State,
+ {ok, _ChSupPid, {ChPid, AState}} =
+ rabbit_channel_sup_sup:start_channel(
+ ChanSupSup, {tcp, Sock, Channel, FrameMax, self(), name(Sock),
+ Protocol, User, VHost, Capabilities, Collector}),
+ MRef = erlang:monitor(process, ChPid),
+ put({ch_pid, ChPid}, {Channel, MRef}),
+ put({channel, Channel}, {ChPid, AState}),
+ {ChPid, AState}.
+
+channel_cleanup(ChPid) ->
+ case get({ch_pid, ChPid}) of
+ undefined -> undefined;
+ {Channel, MRef} -> credit_flow:peer_down(ChPid),
+ erase({channel, Channel}),
+ erase({ch_pid, ChPid}),
+ erlang:demonitor(MRef, [flush]),
+ Channel
+ end.
+
+all_channels() -> [ChPid || {{ch_pid, ChPid}, _ChannelMRef} <- get()].
+
+%%--------------------------------------------------------------------------
+
handle_frame(Type, 0, Payload,
State = #v1{connection_state = CS,
connection = #connection{protocol = Protocol}})
@@ -492,34 +557,43 @@ handle_frame(_Type, _Channel, _Payload, State = #v1{connection_state = CS})
handle_frame(Type, 0, Payload,
State = #v1{connection = #connection{protocol = Protocol}}) ->
case rabbit_command_assembler:analyze_frame(Type, Payload, Protocol) of
- error -> throw({unknown_frame, 0, Type, Payload});
+ error -> frame_error(unknown_frame, Type, 0, Payload, State);
heartbeat -> State;
{method, MethodName, FieldsBin} ->
handle_method0(MethodName, FieldsBin, State);
- Other -> throw({unexpected_frame_on_channel0, Other})
+ _Other -> unexpected_frame(Type, 0, Payload, State)
end;
handle_frame(Type, Channel, Payload,
- State = #v1{connection = #connection{protocol = Protocol}}) ->
+ State = #v1{connection = #connection{protocol = Protocol}})
+ when ?IS_RUNNING(State) ->
case rabbit_command_assembler:analyze_frame(Type, Payload, Protocol) of
- error -> throw({unknown_frame, Channel, Type, Payload});
- heartbeat -> throw({unexpected_heartbeat_frame, Channel});
- AnalyzedFrame -> process_frame(AnalyzedFrame, Channel, State)
- end.
+ error -> frame_error(unknown_frame, Type, Channel, Payload, State);
+ heartbeat -> unexpected_frame(Type, Channel, Payload, State);
+ Frame -> process_frame(Frame, Channel, State)
+ end;
+handle_frame(Type, Channel, Payload, State) ->
+ unexpected_frame(Type, Channel, Payload, State).
process_frame(Frame, Channel, State) ->
- case get({channel, Channel}) of
- {ChPid, AState} ->
- case process_channel_frame(Frame, ChPid, AState) of
- {ok, NewAState} -> put({channel, Channel}, {ChPid, NewAState}),
- post_process_frame(Frame, ChPid, State);
- {error, Reason} -> handle_exception(State, Channel, Reason)
- end;
- undefined when ?IS_RUNNING(State) ->
- ok = create_channel(Channel, State),
- process_frame(Frame, Channel, State);
- undefined ->
- throw({channel_frame_while_starting,
- Channel, State#v1.connection_state, Frame})
+ {ChPid, AState} = case get({channel, Channel}) of
+ undefined -> create_channel(Channel, State);
+ Other -> Other
+ end,
+ case process_channel_frame(Frame, ChPid, AState) of
+ {ok, NewAState} -> put({channel, Channel}, {ChPid, NewAState}),
+ post_process_frame(Frame, ChPid, State);
+ {error, Reason} -> handle_exception(State, Channel, Reason)
+ end.
+
+process_channel_frame(Frame, ChPid, AState) ->
+ case rabbit_command_assembler:process(Frame, AState) of
+ {ok, NewAState} -> {ok, NewAState};
+ {ok, Method, NewAState} -> rabbit_channel:do(ChPid, Method),
+ {ok, NewAState};
+ {ok, Method, Content, NewAState} -> rabbit_channel:do_flow(
+ ChPid, Method, Content),
+ {ok, NewAState};
+ {error, Reason} -> {error, Reason}
end.
post_process_frame({method, 'channel.close_ok', _}, ChPid, State) ->
@@ -536,19 +610,20 @@ post_process_frame({method, MethodName, _}, _ChPid,
post_process_frame(_Frame, _ChPid, State) ->
control_throttle(State).
+%%--------------------------------------------------------------------------
+
handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32>>, State) ->
ensure_stats_timer(
switch_callback(State, {frame_payload, Type, Channel, PayloadSize},
PayloadSize + 1));
-handle_input({frame_payload, Type, Channel, PayloadSize},
- PayloadAndMarker, State) ->
- case PayloadAndMarker of
- <<Payload:PayloadSize/binary, ?FRAME_END>> ->
- switch_callback(handle_frame(Type, Channel, Payload, State),
- frame_header, 7);
- _ ->
- throw({bad_payload, Type, Channel, PayloadSize, PayloadAndMarker})
+handle_input({frame_payload, Type, Channel, PayloadSize}, Data, State) ->
+ <<Payload:PayloadSize/binary, EndMarker>> = Data,
+ case EndMarker of
+ ?FRAME_END -> State1 = handle_frame(Type, Channel, Payload, State),
+ switch_callback(State1, frame_header, 7);
+ _ -> frame_error({invalid_frame_end_marker, EndMarker},
+ Type, Channel, Payload, State)
end;
%% The two rules pertaining to version negotiation:
@@ -619,24 +694,14 @@ ensure_stats_timer(State) ->
handle_method0(MethodName, FieldsBin,
State = #v1{connection = #connection{protocol = Protocol}}) ->
- HandleException =
- fun(R) ->
- case ?IS_RUNNING(State) of
- true -> send_exception(State, 0, R);
- %% We don't trust the client at this point - force
- %% them to wait for a bit so they can't DOS us with
- %% repeated failed logins etc.
- false -> timer:sleep(?SILENT_CLOSE_DELAY * 1000),
- throw({channel0_error, State#v1.connection_state, R})
- end
- end,
try
handle_method0(Protocol:decode_method_fields(MethodName, FieldsBin),
State)
catch exit:#amqp_error{method = none} = Reason ->
- HandleException(Reason#amqp_error{method = MethodName});
+ handle_exception(State, 0, Reason#amqp_error{method = MethodName});
Type:Reason ->
- HandleException({Type, Reason, MethodName, erlang:get_stacktrace()})
+ Stack = erlang:get_stacktrace(),
+ handle_exception(State, 0, {Type, Reason, MethodName, Stack})
end.
handle_method0(#'connection.start_ok'{mechanism = Mechanism,
@@ -838,8 +903,8 @@ i(SockStat, #v1{sock = Sock}) when SockStat =:= recv_oct;
SockStat =:= send_oct;
SockStat =:= send_cnt;
SockStat =:= send_pend ->
- socket_info(fun () -> rabbit_net:getstat(Sock, [SockStat]) end,
- fun ([{_, I}]) -> I end);
+ socket_info(fun (S) -> rabbit_net:getstat(S, [SockStat]) end,
+ fun ([{_, I}]) -> I end, Sock);
i(state, #v1{connection_state = S}) ->
S;
i(last_blocked_by, #v1{last_blocked_by = By}) ->
@@ -875,10 +940,7 @@ i(Item, #v1{}) ->
throw({bad_argument, Item}).
socket_info(Get, Select, Sock) ->
- socket_info(fun() -> Get(Sock) end, Select).
-
-socket_info(Get, Select) ->
- case Get() of
+ case Get(Sock) of
{ok, T} -> Select(T);
{error, _} -> ''
end.
@@ -901,51 +963,6 @@ cert_info(F, Sock) ->
{ok, Cert} -> list_to_binary(F(Cert))
end.
-%%--------------------------------------------------------------------------
-
-create_channel(Channel, State) ->
- #v1{sock = Sock, queue_collector = Collector,
- channel_sup_sup_pid = ChanSupSup,
- connection = #connection{protocol = Protocol,
- frame_max = FrameMax,
- user = User,
- vhost = VHost,
- capabilities = Capabilities}} = State,
- {ok, _ChSupPid, {ChPid, AState}} =
- rabbit_channel_sup_sup:start_channel(
- ChanSupSup, {tcp, Sock, Channel, FrameMax, self(), name(Sock),
- Protocol, User, VHost, Capabilities, Collector}),
- MRef = erlang:monitor(process, ChPid),
- put({ch_pid, ChPid}, {Channel, MRef}),
- put({channel, Channel}, {ChPid, AState}),
- ok.
-
-process_channel_frame(Frame, ChPid, AState) ->
- case rabbit_command_assembler:process(Frame, AState) of
- {ok, NewAState} -> {ok, NewAState};
- {ok, Method, NewAState} -> rabbit_channel:do(ChPid, Method),
- {ok, NewAState};
- {ok, Method, Content, NewAState} -> rabbit_channel:do_flow(
- ChPid, Method, Content),
- {ok, NewAState};
- {error, Reason} -> {error, Reason}
- end.
-
-handle_exception(State = #v1{connection_state = closed}, _Channel, _Reason) ->
- State;
-handle_exception(State, Channel, Reason) ->
- send_exception(State, Channel, Reason).
-
-send_exception(State = #v1{connection = #connection{protocol = Protocol}},
- Channel, Reason) ->
- {0, CloseMethod} =
- rabbit_binary_generator:map_exception(Channel, Reason, Protocol),
- terminate_channels(),
- State1 = close_connection(State),
- ok = rabbit_writer:internal_send_command(
- State1#v1.sock, 0, CloseMethod, Protocol),
- State1.
-
emit_stats(State) ->
rabbit_event:notify(connection_stats, infos(?STATISTICS_KEYS, State)),
rabbit_event:reset_stats_timer(State, #v1.stats_timer).
diff --git a/src/vm_memory_monitor.erl b/src/vm_memory_monitor.erl
index fb184d1a..df5f73e7 100644
--- a/src/vm_memory_monitor.erl
+++ b/src/vm_memory_monitor.erl
@@ -27,7 +27,7 @@
-behaviour(gen_server).
--export([start_link/1]).
+-export([start_link/1, start_link/3]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
@@ -51,7 +51,9 @@
memory_limit,
timeout,
timer,
- alarmed
+ alarmed,
+ alarm_set,
+ alarm_clear
}).
%%----------------------------------------------------------------------------
@@ -59,6 +61,8 @@
-ifdef(use_specs).
-spec(start_link/1 :: (float()) -> rabbit_types:ok_pid_or_error()).
+-spec(start_link/3 :: (float(), fun ((any()) -> 'ok'),
+ fun ((any()) -> 'ok')) -> rabbit_types:ok_pid_or_error()).
-spec(get_total_memory/0 :: () -> (non_neg_integer() | 'unknown')).
-spec(get_vm_limit/0 :: () -> non_neg_integer()).
-spec(get_check_interval/0 :: () -> non_neg_integer()).
@@ -99,14 +103,21 @@ get_memory_limit() ->
%% gen_server callbacks
%%----------------------------------------------------------------------------
-start_link(Args) ->
- gen_server:start_link({local, ?SERVER}, ?MODULE, [Args], []).
+start_link(MemFraction) ->
+ start_link(MemFraction,
+ fun alarm_handler:set_alarm/1, fun alarm_handler:clear_alarm/1).
-init([MemFraction]) ->
+start_link(MemFraction, AlarmSet, AlarmClear) ->
+ gen_server:start_link({local, ?SERVER}, ?MODULE,
+ [MemFraction, AlarmSet, AlarmClear], []).
+
+init([MemFraction, AlarmSet, AlarmClear]) ->
TRef = start_timer(?DEFAULT_MEMORY_CHECK_INTERVAL),
State = #state { timeout = ?DEFAULT_MEMORY_CHECK_INTERVAL,
timer = TRef,
- alarmed = false},
+ alarmed = false,
+ alarm_set = AlarmSet,
+ alarm_clear = AlarmClear },
{ok, set_mem_limits(State, MemFraction)}.
handle_call(get_vm_memory_high_watermark, _From, State) ->
@@ -175,16 +186,18 @@ set_mem_limits(State, MemFraction) ->
memory_limit = MemLim }).
internal_update(State = #state { memory_limit = MemLimit,
- alarmed = Alarmed}) ->
+ alarmed = Alarmed,
+ alarm_set = AlarmSet,
+ alarm_clear = AlarmClear }) ->
MemUsed = erlang:memory(total),
NewAlarmed = MemUsed > MemLimit,
case {Alarmed, NewAlarmed} of
{false, true} ->
emit_update_info(set, MemUsed, MemLimit),
- alarm_handler:set_alarm({{resource_limit, memory, node()}, []});
+ AlarmSet({{resource_limit, memory, node()}, []});
{true, false} ->
emit_update_info(clear, MemUsed, MemLimit),
- alarm_handler:clear_alarm({resource_limit, memory, node()});
+ AlarmClear({resource_limit, memory, node()});
_ ->
ok
end,