summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2012-03-29 17:43:13 +0100
committerSimon MacMullen <simon@rabbitmq.com>2012-03-29 17:43:13 +0100
commit7f0e98504cb462893490bd85e167dee56d220177 (patch)
treee77ab494ad384cce32313f0ac7bfb3f6c1855e19
parent1e5845ac32aa86f04bfe75ab161ab9113490c72e (diff)
parent1c8b17fd830b8f38663e6767df6c006a712671d6 (diff)
downloadrabbitmq-server-7f0e98504cb462893490bd85e167dee56d220177.tar.gz
Merge bug 23463
-rw-r--r--Makefile8
-rw-r--r--docs/rabbitmqctl.1.xml4
-rw-r--r--ebin/rabbit_app.in1
-rw-r--r--src/rabbit.erl6
-rw-r--r--src/rabbit_alarm.erl89
-rw-r--r--src/rabbit_control.erl2
-rw-r--r--src/rabbit_disk_monitor.erl196
-rw-r--r--src/rabbit_misc.erl9
-rw-r--r--src/rabbit_reader.erl34
-rw-r--r--src/rabbit_sup.erl24
-rw-r--r--src/vm_memory_monitor.erl4
11 files changed, 309 insertions, 68 deletions
diff --git a/Makefile b/Makefile
index e8975856..d16cd4d0 100644
--- a/Makefile
+++ b/Makefile
@@ -216,12 +216,12 @@ start-rabbit-on-node: all
stop-rabbit-on-node: all
echo "rabbit:stop()." | $(ERL_CALL)
-set-memory-alarm: all
- echo "alarm_handler:set_alarm({{vm_memory_high_watermark, node()}, []})." | \
+set-resource-alarm: all
+ echo "alarm_handler:set_alarm({{resource_limit, $(SOURCE), node()}, []})." | \
$(ERL_CALL)
-clear-memory-alarm: all
- echo "alarm_handler:clear_alarm({vm_memory_high_watermark, node()})." | \
+clear-resource-alarm: all
+ echo "alarm_handler:clear_alarm({resource_limit, $(SOURCE), node()})." | \
$(ERL_CALL)
stop-node:
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml
index ee07ebfe..62a5b6d3 100644
--- a/docs/rabbitmqctl.1.xml
+++ b/docs/rabbitmqctl.1.xml
@@ -1074,8 +1074,8 @@
<varlistentry>
<term>last_blocked_by</term>
<listitem><para>The reason for which this connection
- was last blocked. One of 'mem' - due to a memory
- alarm, 'flow' - due to internal flow control, or
+ was last blocked. One of 'resource' - due to a memory
+ or disk alarm, 'flow' - due to internal flow control, or
'none' if the connection was never
blocked.</para></listitem>
</varlistentry>
diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in
index fd19051d..b7d14f20 100644
--- a/ebin/rabbit_app.in
+++ b/ebin/rabbit_app.in
@@ -19,6 +19,7 @@
{ssl_listeners, []},
{ssl_options, []},
{vm_memory_high_watermark, 0.4},
+ {disk_free_limit, {mem_relative, 1.0}},
{msg_store_index_module, rabbit_msg_store_ets_index},
{backing_queue_module, rabbit_variable_queue},
{frame_max, 131072},
diff --git a/src/rabbit.erl b/src/rabbit.erl
index dd5fb89c..eec7e34e 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -327,7 +327,11 @@ status() ->
[{vm_memory_high_watermark, {vm_memory_monitor,
get_vm_memory_high_watermark, []}},
{vm_memory_limit, {vm_memory_monitor,
- get_memory_limit, []}}]),
+ get_memory_limit, []}},
+ {disk_free_limit, {rabbit_disk_monitor,
+ get_disk_free_limit, []}},
+ {disk_free, {rabbit_disk_monitor,
+ get_disk_free, []}}]),
S3 = rabbit_misc:with_exit_handler(
fun () -> [] end,
fun () -> [{file_descriptors, file_handle_cache:info()}] end),
diff --git a/src/rabbit_alarm.erl b/src/rabbit_alarm.erl
index 187ec1ab..04e0c141 100644
--- a/src/rabbit_alarm.erl
+++ b/src/rabbit_alarm.erl
@@ -23,7 +23,7 @@
-export([init/1, handle_call/2, handle_event/2, handle_info/2,
terminate/2, code_change/3]).
--export([remote_conserve_memory/2]). %% Internal use only
+-export([remote_conserve_resources/3]). %% Internal use only
-record(alarms, {alertees, alarmed_nodes}).
@@ -45,6 +45,9 @@ start() ->
ok = alarm_handler:add_alarm_handler(?MODULE, []),
{ok, MemoryWatermark} = application:get_env(vm_memory_high_watermark),
rabbit_sup:start_restartable_child(vm_memory_monitor, [MemoryWatermark]),
+
+ {ok, DiskLimit} = application:get_env(disk_free_limit),
+ rabbit_sup:start_restartable_child(rabbit_disk_monitor, [DiskLimit]),
ok.
stop() ->
@@ -61,41 +64,41 @@ on_node_down(Node) -> gen_event:notify(alarm_handler, {node_down, Node}).
%% Can't use alarm_handler:{set,clear}_alarm because that doesn't
%% permit notifying a remote node.
-remote_conserve_memory(Pid, true) ->
+remote_conserve_resources(Pid, Source, true) ->
gen_event:notify({alarm_handler, node(Pid)},
- {set_alarm, {{vm_memory_high_watermark, node()}, []}});
-remote_conserve_memory(Pid, false) ->
+ {set_alarm, {{resource_limit, Source, node()}, []}});
+remote_conserve_resources(Pid, Source, false) ->
gen_event:notify({alarm_handler, node(Pid)},
- {clear_alarm, {vm_memory_high_watermark, node()}}).
+ {clear_alarm, {resource_limit, Source, node()}}).
%%----------------------------------------------------------------------------
init([]) ->
{ok, #alarms{alertees = dict:new(),
- alarmed_nodes = sets:new()}}.
+ alarmed_nodes = dict:new()}}.
handle_call({register, Pid, HighMemMFA}, State) ->
- {ok, 0 < sets:size(State#alarms.alarmed_nodes),
+ {ok, 0 < dict:size(State#alarms.alarmed_nodes),
internal_register(Pid, HighMemMFA, State)};
handle_call(_Request, State) ->
{ok, not_understood, State}.
-handle_event({set_alarm, {{vm_memory_high_watermark, Node}, []}}, State) ->
- {ok, maybe_alert(fun sets:add_element/2, Node, State)};
+handle_event({set_alarm, {{resource_limit, Source, Node}, []}}, State) ->
+ {ok, maybe_alert(fun dict:append/3, Node, Source, State)};
-handle_event({clear_alarm, {vm_memory_high_watermark, Node}}, State) ->
- {ok, maybe_alert(fun sets:del_element/2, Node, State)};
+handle_event({clear_alarm, {resource_limit, Source, Node}}, State) ->
+ {ok, maybe_alert(fun dict_unappend/3, Node, Source, State)};
handle_event({node_up, Node}, State) ->
%% Must do this via notify and not call to avoid possible deadlock.
ok = gen_event:notify(
{alarm_handler, Node},
- {register, self(), {?MODULE, remote_conserve_memory, []}}),
+ {register, self(), {?MODULE, remote_conserve_resources, []}}),
{ok, State};
handle_event({node_down, Node}, State) ->
- {ok, maybe_alert(fun sets:del_element/2, Node, State)};
+ {ok, maybe_alert(fun dict_unappend_all/3, Node, [], State)};
handle_event({register, Pid, HighMemMFA}, State) ->
{ok, internal_register(Pid, HighMemMFA, State)};
@@ -118,34 +121,58 @@ code_change(_OldVsn, State, _Extra) ->
%%----------------------------------------------------------------------------
-maybe_alert(SetFun, Node, State = #alarms{alarmed_nodes = AN,
- alertees = Alertees}) ->
- AN1 = SetFun(Node, AN),
- BeforeSz = sets:size(AN),
- AfterSz = sets:size(AN1),
+dict_unappend_all(Key, _Val, Dict) ->
+ dict:erase(Key, Dict).
+
+dict_unappend(Key, Val, Dict) ->
+ case lists:delete(Val, dict:fetch(Key, Dict)) of
+ [] -> dict:erase(Key, Dict);
+ X -> dict:store(Key, X, Dict)
+ end.
+
+count_dict_values(Val, Dict) ->
+ dict:fold(fun (_Node, List, Count) ->
+ Count + case lists:member(Val, List) of
+ true -> 1;
+ false -> 0
+ end
+ end, 0, Dict).
+
+maybe_alert(UpdateFun, Node, Source,
+ State = #alarms{alarmed_nodes = AN,
+ alertees = Alertees}) ->
+ AN1 = UpdateFun(Node, Source, AN),
+ BeforeSz = count_dict_values(Source, AN),
+ AfterSz = count_dict_values(Source, AN1),
+
%% If we have changed our alarm state, inform the remotes.
IsLocal = Node =:= node(),
- if IsLocal andalso BeforeSz < AfterSz -> ok = alert_remote(true, Alertees);
- IsLocal andalso BeforeSz > AfterSz -> ok = alert_remote(false, Alertees);
- true -> ok
+ if IsLocal andalso BeforeSz < AfterSz ->
+ ok = alert_remote(true, Alertees, Source);
+ IsLocal andalso BeforeSz > AfterSz ->
+ ok = alert_remote(false, Alertees, Source);
+ true ->
+ ok
end,
%% If the overall alarm state has changed, inform the locals.
- case {BeforeSz, AfterSz} of
- {0, 1} -> ok = alert_local(true, Alertees);
- {1, 0} -> ok = alert_local(false, Alertees);
+ case {dict:size(AN), dict:size(AN1)} of
+ {0, 1} -> ok = alert_local(true, Alertees, Source);
+ {1, 0} -> ok = alert_local(false, Alertees, Source);
{_, _} -> ok
end,
State#alarms{alarmed_nodes = AN1}.
-alert_local(Alert, Alertees) -> alert(Alert, Alertees, fun erlang:'=:='/2).
+alert_local(Alert, Alertees, _Source) ->
+ alert(Alertees, [Alert], fun erlang:'=:='/2).
-alert_remote(Alert, Alertees) -> alert(Alert, Alertees, fun erlang:'=/='/2).
+alert_remote(Alert, Alertees, Source) ->
+ alert(Alertees, [Source, Alert], fun erlang:'=/='/2).
-alert(Alert, Alertees, NodeComparator) ->
+alert(Alertees, AlertArg, NodeComparator) ->
Node = node(),
dict:fold(fun (Pid, {M, F, A}, ok) ->
case NodeComparator(Node, node(Pid)) of
- true -> apply(M, F, A ++ [Pid, Alert]);
+ true -> apply(M, F, A ++ [Pid] ++ AlertArg);
false -> ok
end
end, ok, Alertees).
@@ -153,9 +180,9 @@ alert(Alert, Alertees, NodeComparator) ->
internal_register(Pid, {M, F, A} = HighMemMFA,
State = #alarms{alertees = Alertees}) ->
_MRef = erlang:monitor(process, Pid),
- case sets:is_element(node(), State#alarms.alarmed_nodes) of
- true -> ok = apply(M, F, A ++ [Pid, true]);
- false -> ok
+ case dict:find(node(), State#alarms.alarmed_nodes) of
+ {ok, _Sources} -> apply(M, F, A ++ [Pid, true]);
+ error -> ok
end,
NewAlertees = dict:store(Pid, HighMemMFA, Alertees),
State#alarms{alertees = NewAlertees}.
diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl
index 6a775adf..51f88c8f 100644
--- a/src/rabbit_control.erl
+++ b/src/rabbit_control.erl
@@ -328,7 +328,7 @@ action(set_vm_memory_high_watermark, Node, [Arg], _Opts, Inform) ->
0 -> Arg ++ ".0";
_ -> Arg
end),
- Inform("Setting memory threshhold on ~p to ~p", [Node, Frac]),
+ Inform("Setting memory threshold on ~p to ~p", [Node, Frac]),
rpc_call(Node, vm_memory_monitor, set_vm_memory_high_watermark, [Frac]);
action(set_permissions, Node, [Username, CPerm, WPerm, RPerm], Opts, Inform) ->
diff --git a/src/rabbit_disk_monitor.erl b/src/rabbit_disk_monitor.erl
new file mode 100644
index 00000000..831d5290
--- /dev/null
+++ b/src/rabbit_disk_monitor.erl
@@ -0,0 +1,196 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%%
+
+-module(rabbit_disk_monitor).
+
+-behaviour(gen_server).
+
+-export([start_link/1]).
+
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+ terminate/2, code_change/3]).
+
+-export([get_disk_free_limit/0, set_disk_free_limit/1, get_check_interval/0,
+ set_check_interval/1, get_disk_free/0]).
+
+-define(SERVER, ?MODULE).
+-define(DEFAULT_DISK_CHECK_INTERVAL, 60000).
+
+-record(state, {dir,
+ limit,
+ timeout,
+ timer,
+ alarmed
+ }).
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-type(disk_free_limit() :: (integer() | {'mem_relative', float()})).
+-spec(start_link/1 :: (disk_free_limit()) -> rabbit_types:ok_pid_or_error()).
+-spec(get_disk_free_limit/0 :: () -> integer()).
+-spec(set_disk_free_limit/1 :: (disk_free_limit()) -> 'ok').
+-spec(get_check_interval/0 :: () -> integer()).
+-spec(set_check_interval/1 :: (integer()) -> 'ok').
+-spec(get_disk_free/0 :: () -> (integer() | 'unknown')).
+
+-endif.
+
+%%----------------------------------------------------------------------------
+%% Public API
+%%----------------------------------------------------------------------------
+
+get_disk_free_limit() ->
+ gen_server:call(?MODULE, get_disk_free_limit, infinity).
+
+set_disk_free_limit(Limit) ->
+ gen_server:call(?MODULE, {set_disk_free_limit, Limit}, infinity).
+
+get_check_interval() ->
+ gen_server:call(?MODULE, get_check_interval, infinity).
+
+set_check_interval(Interval) ->
+ gen_server:call(?MODULE, {set_check_interval, Interval}, infinity).
+
+get_disk_free() ->
+ gen_server:call(?MODULE, get_disk_free, infinity).
+
+%%----------------------------------------------------------------------------
+%% gen_server callbacks
+%%----------------------------------------------------------------------------
+
+start_link(Args) ->
+ gen_server:start_link({local, ?SERVER}, ?MODULE, [Args], []).
+
+init([Limit]) ->
+ TRef = start_timer(?DEFAULT_DISK_CHECK_INTERVAL),
+ Dir = dir(),
+ State = #state { dir = Dir,
+ timeout = ?DEFAULT_DISK_CHECK_INTERVAL,
+ timer = TRef,
+ alarmed = false},
+ case {catch get_disk_free(Dir),
+ vm_memory_monitor:get_total_memory()} of
+ {N1, N2} when is_integer(N1), is_integer(N2) ->
+ {ok, set_disk_limits(State, Limit)};
+ _ ->
+ rabbit_log:info("Disabling disk free space monitoring "
+ "on unsupported platform~n"),
+ {stop, unsupported_platform}
+ end.
+
+handle_call(get_disk_free_limit, _From, State) ->
+ {reply, interpret_limit(State#state.limit), State};
+
+handle_call({set_disk_free_limit, Limit}, _From, State) ->
+ {reply, ok, set_disk_limits(State, Limit)};
+
+handle_call(get_check_interval, _From, State) ->
+ {reply, State#state.timeout, State};
+
+handle_call({set_check_interval, Timeout}, _From, State) ->
+ {ok, cancel} = timer:cancel(State#state.timer),
+ {reply, ok, State#state{timeout = Timeout, timer = start_timer(Timeout)}};
+
+handle_call(get_disk_free, _From, State = #state { dir = Dir }) ->
+ {reply, get_disk_free(Dir), State};
+
+handle_call(_Request, _From, State) ->
+ {noreply, State}.
+
+handle_cast(_Request, State) ->
+ {noreply, State}.
+
+handle_info(update, State) ->
+ {noreply, internal_update(State)};
+
+handle_info(_Info, State) ->
+ {noreply, State}.
+
+terminate(_Reason, _State) ->
+ ok.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+%%----------------------------------------------------------------------------
+%% Server Internals
+%%----------------------------------------------------------------------------
+
+% the partition / drive containing this directory will be monitored
+dir() -> rabbit_mnesia:dir().
+
+set_disk_limits(State, Limit) ->
+ State1 = State#state { limit = Limit },
+ rabbit_log:info("Disk free limit set to ~pMB~n",
+ [trunc(interpret_limit(Limit) / 1048576)]),
+ internal_update(State1).
+
+internal_update(State = #state { limit = Limit,
+ dir = Dir,
+ alarmed = Alarmed}) ->
+ CurrentFreeBytes = get_disk_free(Dir),
+ LimitBytes = interpret_limit(Limit),
+ NewAlarmed = CurrentFreeBytes < LimitBytes,
+ case {Alarmed, NewAlarmed} of
+ {false, true} ->
+ emit_update_info("exceeded", CurrentFreeBytes, LimitBytes),
+ alarm_handler:set_alarm({{resource_limit, disk, node()}, []});
+ {true, false} ->
+ emit_update_info("below limit", CurrentFreeBytes, LimitBytes),
+ alarm_handler:clear_alarm({resource_limit, disk, node()});
+ _ ->
+ ok
+ end,
+ State #state {alarmed = NewAlarmed}.
+
+get_disk_free(Dir) ->
+ get_disk_free(Dir, os:type()).
+
+get_disk_free(Dir, {unix, Sun})
+ when Sun =:= sunos; Sun =:= sunos4; Sun =:= solaris ->
+ parse_free_unix(rabbit_misc:os_cmd("/usr/bin/df -k " ++ Dir));
+get_disk_free(Dir, {unix, _}) ->
+ parse_free_unix(rabbit_misc:os_cmd("/bin/df -kP " ++ Dir));
+get_disk_free(Dir, {win32, _}) ->
+ parse_free_win32(rabbit_misc:os_cmd("dir /-C /W " ++ Dir));
+get_disk_free(_, _) ->
+ unknown.
+
+parse_free_unix(CommandResult) ->
+ [_, Stats | _] = string:tokens(CommandResult, "\n"),
+ [_FS, _Total, _Used, Free | _] = string:tokens(Stats, " \t"),
+ list_to_integer(Free) * 1024.
+
+parse_free_win32(CommandResult) ->
+ LastLine = lists:last(string:tokens(CommandResult, "\r\n")),
+ [_, _Dir, Free, "bytes", "free"] = string:tokens(LastLine, " "),
+ list_to_integer(Free).
+
+interpret_limit({mem_relative, R}) ->
+ round(R * vm_memory_monitor:get_total_memory());
+interpret_limit(L) ->
+ L.
+
+emit_update_info(State, CurrentFree, Limit) ->
+ rabbit_log:info(
+ "Disk free space limit now ~s. Free bytes:~p Limit:~p~n",
+ [State, CurrentFree, Limit]).
+
+start_timer(Timeout) ->
+ {ok, TRef} = timer:send_interval(Timeout, update),
+ TRef.
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index ddf7f574..39409d2f 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -60,6 +60,7 @@
-export([append_rpc_all_nodes/4]).
-export([multi_call/2]).
-export([quit/1]).
+-export([os_cmd/1]).
%%----------------------------------------------------------------------------
@@ -204,6 +205,7 @@
-spec(multi_call/2 ::
([pid()], any()) -> {[{pid(), any()}], [{pid(), any()}]}).
-spec(quit/1 :: (integer() | string()) -> no_return()).
+-spec(os_cmd/1 :: (string()) -> string()).
-endif.
@@ -914,3 +916,10 @@ quit(Status) ->
{unix, _} -> halt(Status);
{win32, _} -> init:stop(Status)
end.
+
+os_cmd(Command) ->
+ Exec = hd(string:tokens(Command, " ")),
+ case os:find_executable(Exec) of
+ false -> throw({command_not_found, Exec});
+ _ -> os:cmd(Command)
+ end.
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 47e796dc..5e9e78d3 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -25,7 +25,7 @@
-export([init/4, mainloop/2]).
--export([conserve_memory/2, server_properties/1]).
+-export([conserve_resources/2, server_properties/1]).
-define(HANDSHAKE_TIMEOUT, 10).
-define(NORMAL_TIMEOUT, 3).
@@ -38,7 +38,7 @@
-record(v1, {parent, sock, connection, callback, recv_len, pending_recv,
connection_state, queue_collector, heartbeater, stats_timer,
channel_sup_sup_pid, start_heartbeat_fun, buf, buf_len,
- auth_mechanism, auth_state, conserve_memory,
+ auth_mechanism, auth_state, conserve_resources,
last_blocked_by, last_blocked_at}).
-define(STATISTICS_KEYS, [pid, recv_oct, recv_cnt, send_oct, send_cnt,
@@ -71,7 +71,7 @@
-spec(info/2 :: (pid(), rabbit_types:info_keys()) -> rabbit_types:infos()).
-spec(force_event_refresh/1 :: (pid()) -> 'ok').
-spec(shutdown/2 :: (pid(), string()) -> 'ok').
--spec(conserve_memory/2 :: (pid(), boolean()) -> 'ok').
+-spec(conserve_resources/2 :: (pid(), boolean()) -> 'ok').
-spec(server_properties/1 :: (rabbit_types:protocol()) ->
rabbit_framing:amqp_table()).
@@ -133,8 +133,8 @@ info(Pid, Items) ->
force_event_refresh(Pid) ->
gen_server:cast(Pid, force_event_refresh).
-conserve_memory(Pid, Conserve) ->
- Pid ! {conserve_memory, Conserve},
+conserve_resources(Pid, Conserve) ->
+ Pid ! {conserve_resources, Conserve},
ok.
server_properties(Protocol) ->
@@ -218,7 +218,7 @@ start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb,
buf_len = 0,
auth_mechanism = none,
auth_state = none,
- conserve_memory = false,
+ conserve_resources = false,
last_blocked_by = none,
last_blocked_at = never},
try
@@ -276,8 +276,8 @@ mainloop(Deb, State = #v1{sock = Sock, buf = Buf, buf_len = BufLen}) ->
{other, Other} -> handle_other(Other, Deb, State)
end.
-handle_other({conserve_memory, Conserve}, Deb, State) ->
- recvloop(Deb, control_throttle(State#v1{conserve_memory = Conserve}));
+handle_other({conserve_resources, Conserve}, Deb, State) ->
+ recvloop(Deb, control_throttle(State#v1{conserve_resources = Conserve}));
handle_other({channel_closing, ChPid}, Deb, State) ->
ok = rabbit_channel:ready_for_close(ChPid),
channel_cleanup(ChPid),
@@ -358,8 +358,8 @@ terminate(Explanation, State) when ?IS_RUNNING(State) ->
terminate(_Explanation, State) ->
{force, State}.
-control_throttle(State = #v1{connection_state = CS,
- conserve_memory = Mem}) ->
+control_throttle(State = #v1{connection_state = CS,
+ conserve_resources = Mem}) ->
case {CS, Mem orelse credit_flow:blocked()} of
{running, true} -> State#v1{connection_state = blocking};
{blocking, false} -> State#v1{connection_state = running};
@@ -377,9 +377,9 @@ maybe_block(State = #v1{connection_state = blocking}) ->
maybe_block(State) ->
State.
-update_last_blocked_by(State = #v1{conserve_memory = true}) ->
- State#v1{last_blocked_by = mem};
-update_last_blocked_by(State = #v1{conserve_memory = false}) ->
+update_last_blocked_by(State = #v1{conserve_resources = true}) ->
+ State#v1{last_blocked_by = resource};
+update_last_blocked_by(State = #v1{conserve_resources = false}) ->
State#v1{last_blocked_by = flow}.
close_connection(State = #v1{queue_collector = Collector,
@@ -701,11 +701,11 @@ handle_method0(#'connection.open'{virtual_host = VHostPath},
ok = rabbit_access_control:check_vhost_access(User, VHostPath),
NewConnection = Connection#connection{vhost = VHostPath},
ok = send_on_channel0(Sock, #'connection.open_ok'{}, Protocol),
- Conserve = rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}),
+ Conserve = rabbit_alarm:register(self(), {?MODULE, conserve_resources, []}),
State1 = control_throttle(
- State#v1{connection_state = running,
- connection = NewConnection,
- conserve_memory = Conserve}),
+ State#v1{connection_state = running,
+ connection = NewConnection,
+ conserve_resources = Conserve}),
rabbit_event:notify(connection_created,
[{type, network} |
infos(?CREATION_EVENT_KEYS, State1)]),
diff --git a/src/rabbit_sup.erl b/src/rabbit_sup.erl
index 0965e3b3..bf2b4798 100644
--- a/src/rabbit_sup.erl
+++ b/src/rabbit_sup.erl
@@ -52,22 +52,20 @@ start_child(Mod, Args) ->
start_child(Mod, Mod, Args).
start_child(ChildId, Mod, Args) ->
- {ok, _} = supervisor:start_child(?SERVER,
- {ChildId, {Mod, start_link, Args},
- transient, ?MAX_WAIT, worker, [Mod]}),
- ok.
+ child_reply(supervisor:start_child(?SERVER,
+ {ChildId, {Mod, start_link, Args},
+ transient, ?MAX_WAIT, worker, [Mod]})).
start_restartable_child(Mod) ->
start_restartable_child(Mod, []).
start_restartable_child(Mod, Args) ->
Name = list_to_atom(atom_to_list(Mod) ++ "_sup"),
- {ok, _} = supervisor:start_child(
- ?SERVER,
- {Name, {rabbit_restartable_sup, start_link,
- [Name, {Mod, start_link, Args}]},
- transient, infinity, supervisor, [rabbit_restartable_sup]}),
- ok.
+ child_reply(supervisor:start_child(
+ ?SERVER,
+ {Name, {rabbit_restartable_sup, start_link,
+ [Name, {Mod, start_link, Args}]},
+ transient, infinity, supervisor, [rabbit_restartable_sup]})).
stop_child(ChildId) ->
case supervisor:terminate_child(?SERVER, ChildId) of
@@ -77,3 +75,9 @@ stop_child(ChildId) ->
init([]) ->
{ok, {{one_for_all, 0, 1}, []}}.
+
+
+%%----------------------------------------------------------------------------
+
+child_reply({ok, _}) -> ok;
+child_reply(X) -> X.
diff --git a/src/vm_memory_monitor.erl b/src/vm_memory_monitor.erl
index fca55f02..fb184d1a 100644
--- a/src/vm_memory_monitor.erl
+++ b/src/vm_memory_monitor.erl
@@ -181,10 +181,10 @@ internal_update(State = #state { memory_limit = MemLimit,
case {Alarmed, NewAlarmed} of
{false, true} ->
emit_update_info(set, MemUsed, MemLimit),
- alarm_handler:set_alarm({{vm_memory_high_watermark, node()}, []});
+ alarm_handler:set_alarm({{resource_limit, memory, node()}, []});
{true, false} ->
emit_update_info(clear, MemUsed, MemLimit),
- alarm_handler:clear_alarm({vm_memory_high_watermark, node()});
+ alarm_handler:clear_alarm({resource_limit, memory, node()});
_ ->
ok
end,