diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2012-03-29 17:43:13 +0100 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2012-03-29 17:43:13 +0100 |
commit | 7f0e98504cb462893490bd85e167dee56d220177 (patch) | |
tree | e77ab494ad384cce32313f0ac7bfb3f6c1855e19 | |
parent | 1e5845ac32aa86f04bfe75ab161ab9113490c72e (diff) | |
parent | 1c8b17fd830b8f38663e6767df6c006a712671d6 (diff) | |
download | rabbitmq-server-7f0e98504cb462893490bd85e167dee56d220177.tar.gz |
Merge bug 23463
-rw-r--r-- | Makefile | 8 | ||||
-rw-r--r-- | docs/rabbitmqctl.1.xml | 4 | ||||
-rw-r--r-- | ebin/rabbit_app.in | 1 | ||||
-rw-r--r-- | src/rabbit.erl | 6 | ||||
-rw-r--r-- | src/rabbit_alarm.erl | 89 | ||||
-rw-r--r-- | src/rabbit_control.erl | 2 | ||||
-rw-r--r-- | src/rabbit_disk_monitor.erl | 196 | ||||
-rw-r--r-- | src/rabbit_misc.erl | 9 | ||||
-rw-r--r-- | src/rabbit_reader.erl | 34 | ||||
-rw-r--r-- | src/rabbit_sup.erl | 24 | ||||
-rw-r--r-- | src/vm_memory_monitor.erl | 4 |
11 files changed, 309 insertions, 68 deletions
@@ -216,12 +216,12 @@ start-rabbit-on-node: all stop-rabbit-on-node: all echo "rabbit:stop()." | $(ERL_CALL) -set-memory-alarm: all - echo "alarm_handler:set_alarm({{vm_memory_high_watermark, node()}, []})." | \ +set-resource-alarm: all + echo "alarm_handler:set_alarm({{resource_limit, $(SOURCE), node()}, []})." | \ $(ERL_CALL) -clear-memory-alarm: all - echo "alarm_handler:clear_alarm({vm_memory_high_watermark, node()})." | \ +clear-resource-alarm: all + echo "alarm_handler:clear_alarm({resource_limit, $(SOURCE), node()})." | \ $(ERL_CALL) stop-node: diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml index ee07ebfe..62a5b6d3 100644 --- a/docs/rabbitmqctl.1.xml +++ b/docs/rabbitmqctl.1.xml @@ -1074,8 +1074,8 @@ <varlistentry> <term>last_blocked_by</term> <listitem><para>The reason for which this connection - was last blocked. One of 'mem' - due to a memory - alarm, 'flow' - due to internal flow control, or + was last blocked. One of 'resource' - due to a memory + or disk alarm, 'flow' - due to internal flow control, or 'none' if the connection was never blocked.</para></listitem> </varlistentry> diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in index fd19051d..b7d14f20 100644 --- a/ebin/rabbit_app.in +++ b/ebin/rabbit_app.in @@ -19,6 +19,7 @@ {ssl_listeners, []}, {ssl_options, []}, {vm_memory_high_watermark, 0.4}, + {disk_free_limit, {mem_relative, 1.0}}, {msg_store_index_module, rabbit_msg_store_ets_index}, {backing_queue_module, rabbit_variable_queue}, {frame_max, 131072}, diff --git a/src/rabbit.erl b/src/rabbit.erl index dd5fb89c..eec7e34e 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -327,7 +327,11 @@ status() -> [{vm_memory_high_watermark, {vm_memory_monitor, get_vm_memory_high_watermark, []}}, {vm_memory_limit, {vm_memory_monitor, - get_memory_limit, []}}]), + get_memory_limit, []}}, + {disk_free_limit, {rabbit_disk_monitor, + get_disk_free_limit, []}}, + {disk_free, {rabbit_disk_monitor, + get_disk_free, []}}]), S3 = rabbit_misc:with_exit_handler( fun () -> [] end, fun () -> [{file_descriptors, file_handle_cache:info()}] end), diff --git a/src/rabbit_alarm.erl b/src/rabbit_alarm.erl index 187ec1ab..04e0c141 100644 --- a/src/rabbit_alarm.erl +++ b/src/rabbit_alarm.erl @@ -23,7 +23,7 @@ -export([init/1, handle_call/2, handle_event/2, handle_info/2, terminate/2, code_change/3]). --export([remote_conserve_memory/2]). %% Internal use only +-export([remote_conserve_resources/3]). %% Internal use only -record(alarms, {alertees, alarmed_nodes}). @@ -45,6 +45,9 @@ start() -> ok = alarm_handler:add_alarm_handler(?MODULE, []), {ok, MemoryWatermark} = application:get_env(vm_memory_high_watermark), rabbit_sup:start_restartable_child(vm_memory_monitor, [MemoryWatermark]), + + {ok, DiskLimit} = application:get_env(disk_free_limit), + rabbit_sup:start_restartable_child(rabbit_disk_monitor, [DiskLimit]), ok. stop() -> @@ -61,41 +64,41 @@ on_node_down(Node) -> gen_event:notify(alarm_handler, {node_down, Node}). %% Can't use alarm_handler:{set,clear}_alarm because that doesn't %% permit notifying a remote node. -remote_conserve_memory(Pid, true) -> +remote_conserve_resources(Pid, Source, true) -> gen_event:notify({alarm_handler, node(Pid)}, - {set_alarm, {{vm_memory_high_watermark, node()}, []}}); -remote_conserve_memory(Pid, false) -> + {set_alarm, {{resource_limit, Source, node()}, []}}); +remote_conserve_resources(Pid, Source, false) -> gen_event:notify({alarm_handler, node(Pid)}, - {clear_alarm, {vm_memory_high_watermark, node()}}). + {clear_alarm, {resource_limit, Source, node()}}). %%---------------------------------------------------------------------------- init([]) -> {ok, #alarms{alertees = dict:new(), - alarmed_nodes = sets:new()}}. + alarmed_nodes = dict:new()}}. handle_call({register, Pid, HighMemMFA}, State) -> - {ok, 0 < sets:size(State#alarms.alarmed_nodes), + {ok, 0 < dict:size(State#alarms.alarmed_nodes), internal_register(Pid, HighMemMFA, State)}; handle_call(_Request, State) -> {ok, not_understood, State}. -handle_event({set_alarm, {{vm_memory_high_watermark, Node}, []}}, State) -> - {ok, maybe_alert(fun sets:add_element/2, Node, State)}; +handle_event({set_alarm, {{resource_limit, Source, Node}, []}}, State) -> + {ok, maybe_alert(fun dict:append/3, Node, Source, State)}; -handle_event({clear_alarm, {vm_memory_high_watermark, Node}}, State) -> - {ok, maybe_alert(fun sets:del_element/2, Node, State)}; +handle_event({clear_alarm, {resource_limit, Source, Node}}, State) -> + {ok, maybe_alert(fun dict_unappend/3, Node, Source, State)}; handle_event({node_up, Node}, State) -> %% Must do this via notify and not call to avoid possible deadlock. ok = gen_event:notify( {alarm_handler, Node}, - {register, self(), {?MODULE, remote_conserve_memory, []}}), + {register, self(), {?MODULE, remote_conserve_resources, []}}), {ok, State}; handle_event({node_down, Node}, State) -> - {ok, maybe_alert(fun sets:del_element/2, Node, State)}; + {ok, maybe_alert(fun dict_unappend_all/3, Node, [], State)}; handle_event({register, Pid, HighMemMFA}, State) -> {ok, internal_register(Pid, HighMemMFA, State)}; @@ -118,34 +121,58 @@ code_change(_OldVsn, State, _Extra) -> %%---------------------------------------------------------------------------- -maybe_alert(SetFun, Node, State = #alarms{alarmed_nodes = AN, - alertees = Alertees}) -> - AN1 = SetFun(Node, AN), - BeforeSz = sets:size(AN), - AfterSz = sets:size(AN1), +dict_unappend_all(Key, _Val, Dict) -> + dict:erase(Key, Dict). + +dict_unappend(Key, Val, Dict) -> + case lists:delete(Val, dict:fetch(Key, Dict)) of + [] -> dict:erase(Key, Dict); + X -> dict:store(Key, X, Dict) + end. + +count_dict_values(Val, Dict) -> + dict:fold(fun (_Node, List, Count) -> + Count + case lists:member(Val, List) of + true -> 1; + false -> 0 + end + end, 0, Dict). + +maybe_alert(UpdateFun, Node, Source, + State = #alarms{alarmed_nodes = AN, + alertees = Alertees}) -> + AN1 = UpdateFun(Node, Source, AN), + BeforeSz = count_dict_values(Source, AN), + AfterSz = count_dict_values(Source, AN1), + %% If we have changed our alarm state, inform the remotes. IsLocal = Node =:= node(), - if IsLocal andalso BeforeSz < AfterSz -> ok = alert_remote(true, Alertees); - IsLocal andalso BeforeSz > AfterSz -> ok = alert_remote(false, Alertees); - true -> ok + if IsLocal andalso BeforeSz < AfterSz -> + ok = alert_remote(true, Alertees, Source); + IsLocal andalso BeforeSz > AfterSz -> + ok = alert_remote(false, Alertees, Source); + true -> + ok end, %% If the overall alarm state has changed, inform the locals. - case {BeforeSz, AfterSz} of - {0, 1} -> ok = alert_local(true, Alertees); - {1, 0} -> ok = alert_local(false, Alertees); + case {dict:size(AN), dict:size(AN1)} of + {0, 1} -> ok = alert_local(true, Alertees, Source); + {1, 0} -> ok = alert_local(false, Alertees, Source); {_, _} -> ok end, State#alarms{alarmed_nodes = AN1}. -alert_local(Alert, Alertees) -> alert(Alert, Alertees, fun erlang:'=:='/2). +alert_local(Alert, Alertees, _Source) -> + alert(Alertees, [Alert], fun erlang:'=:='/2). -alert_remote(Alert, Alertees) -> alert(Alert, Alertees, fun erlang:'=/='/2). +alert_remote(Alert, Alertees, Source) -> + alert(Alertees, [Source, Alert], fun erlang:'=/='/2). -alert(Alert, Alertees, NodeComparator) -> +alert(Alertees, AlertArg, NodeComparator) -> Node = node(), dict:fold(fun (Pid, {M, F, A}, ok) -> case NodeComparator(Node, node(Pid)) of - true -> apply(M, F, A ++ [Pid, Alert]); + true -> apply(M, F, A ++ [Pid] ++ AlertArg); false -> ok end end, ok, Alertees). @@ -153,9 +180,9 @@ alert(Alert, Alertees, NodeComparator) -> internal_register(Pid, {M, F, A} = HighMemMFA, State = #alarms{alertees = Alertees}) -> _MRef = erlang:monitor(process, Pid), - case sets:is_element(node(), State#alarms.alarmed_nodes) of - true -> ok = apply(M, F, A ++ [Pid, true]); - false -> ok + case dict:find(node(), State#alarms.alarmed_nodes) of + {ok, _Sources} -> apply(M, F, A ++ [Pid, true]); + error -> ok end, NewAlertees = dict:store(Pid, HighMemMFA, Alertees), State#alarms{alertees = NewAlertees}. diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl index 6a775adf..51f88c8f 100644 --- a/src/rabbit_control.erl +++ b/src/rabbit_control.erl @@ -328,7 +328,7 @@ action(set_vm_memory_high_watermark, Node, [Arg], _Opts, Inform) -> 0 -> Arg ++ ".0"; _ -> Arg end), - Inform("Setting memory threshhold on ~p to ~p", [Node, Frac]), + Inform("Setting memory threshold on ~p to ~p", [Node, Frac]), rpc_call(Node, vm_memory_monitor, set_vm_memory_high_watermark, [Frac]); action(set_permissions, Node, [Username, CPerm, WPerm, RPerm], Opts, Inform) -> diff --git a/src/rabbit_disk_monitor.erl b/src/rabbit_disk_monitor.erl new file mode 100644 index 00000000..831d5290 --- /dev/null +++ b/src/rabbit_disk_monitor.erl @@ -0,0 +1,196 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is VMware, Inc. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +%% + +-module(rabbit_disk_monitor). + +-behaviour(gen_server). + +-export([start_link/1]). + +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +-export([get_disk_free_limit/0, set_disk_free_limit/1, get_check_interval/0, + set_check_interval/1, get_disk_free/0]). + +-define(SERVER, ?MODULE). +-define(DEFAULT_DISK_CHECK_INTERVAL, 60000). + +-record(state, {dir, + limit, + timeout, + timer, + alarmed + }). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-type(disk_free_limit() :: (integer() | {'mem_relative', float()})). +-spec(start_link/1 :: (disk_free_limit()) -> rabbit_types:ok_pid_or_error()). +-spec(get_disk_free_limit/0 :: () -> integer()). +-spec(set_disk_free_limit/1 :: (disk_free_limit()) -> 'ok'). +-spec(get_check_interval/0 :: () -> integer()). +-spec(set_check_interval/1 :: (integer()) -> 'ok'). +-spec(get_disk_free/0 :: () -> (integer() | 'unknown')). + +-endif. + +%%---------------------------------------------------------------------------- +%% Public API +%%---------------------------------------------------------------------------- + +get_disk_free_limit() -> + gen_server:call(?MODULE, get_disk_free_limit, infinity). + +set_disk_free_limit(Limit) -> + gen_server:call(?MODULE, {set_disk_free_limit, Limit}, infinity). + +get_check_interval() -> + gen_server:call(?MODULE, get_check_interval, infinity). + +set_check_interval(Interval) -> + gen_server:call(?MODULE, {set_check_interval, Interval}, infinity). + +get_disk_free() -> + gen_server:call(?MODULE, get_disk_free, infinity). + +%%---------------------------------------------------------------------------- +%% gen_server callbacks +%%---------------------------------------------------------------------------- + +start_link(Args) -> + gen_server:start_link({local, ?SERVER}, ?MODULE, [Args], []). + +init([Limit]) -> + TRef = start_timer(?DEFAULT_DISK_CHECK_INTERVAL), + Dir = dir(), + State = #state { dir = Dir, + timeout = ?DEFAULT_DISK_CHECK_INTERVAL, + timer = TRef, + alarmed = false}, + case {catch get_disk_free(Dir), + vm_memory_monitor:get_total_memory()} of + {N1, N2} when is_integer(N1), is_integer(N2) -> + {ok, set_disk_limits(State, Limit)}; + _ -> + rabbit_log:info("Disabling disk free space monitoring " + "on unsupported platform~n"), + {stop, unsupported_platform} + end. + +handle_call(get_disk_free_limit, _From, State) -> + {reply, interpret_limit(State#state.limit), State}; + +handle_call({set_disk_free_limit, Limit}, _From, State) -> + {reply, ok, set_disk_limits(State, Limit)}; + +handle_call(get_check_interval, _From, State) -> + {reply, State#state.timeout, State}; + +handle_call({set_check_interval, Timeout}, _From, State) -> + {ok, cancel} = timer:cancel(State#state.timer), + {reply, ok, State#state{timeout = Timeout, timer = start_timer(Timeout)}}; + +handle_call(get_disk_free, _From, State = #state { dir = Dir }) -> + {reply, get_disk_free(Dir), State}; + +handle_call(_Request, _From, State) -> + {noreply, State}. + +handle_cast(_Request, State) -> + {noreply, State}. + +handle_info(update, State) -> + {noreply, internal_update(State)}; + +handle_info(_Info, State) -> + {noreply, State}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%---------------------------------------------------------------------------- +%% Server Internals +%%---------------------------------------------------------------------------- + +% the partition / drive containing this directory will be monitored +dir() -> rabbit_mnesia:dir(). + +set_disk_limits(State, Limit) -> + State1 = State#state { limit = Limit }, + rabbit_log:info("Disk free limit set to ~pMB~n", + [trunc(interpret_limit(Limit) / 1048576)]), + internal_update(State1). + +internal_update(State = #state { limit = Limit, + dir = Dir, + alarmed = Alarmed}) -> + CurrentFreeBytes = get_disk_free(Dir), + LimitBytes = interpret_limit(Limit), + NewAlarmed = CurrentFreeBytes < LimitBytes, + case {Alarmed, NewAlarmed} of + {false, true} -> + emit_update_info("exceeded", CurrentFreeBytes, LimitBytes), + alarm_handler:set_alarm({{resource_limit, disk, node()}, []}); + {true, false} -> + emit_update_info("below limit", CurrentFreeBytes, LimitBytes), + alarm_handler:clear_alarm({resource_limit, disk, node()}); + _ -> + ok + end, + State #state {alarmed = NewAlarmed}. + +get_disk_free(Dir) -> + get_disk_free(Dir, os:type()). + +get_disk_free(Dir, {unix, Sun}) + when Sun =:= sunos; Sun =:= sunos4; Sun =:= solaris -> + parse_free_unix(rabbit_misc:os_cmd("/usr/bin/df -k " ++ Dir)); +get_disk_free(Dir, {unix, _}) -> + parse_free_unix(rabbit_misc:os_cmd("/bin/df -kP " ++ Dir)); +get_disk_free(Dir, {win32, _}) -> + parse_free_win32(rabbit_misc:os_cmd("dir /-C /W " ++ Dir)); +get_disk_free(_, _) -> + unknown. + +parse_free_unix(CommandResult) -> + [_, Stats | _] = string:tokens(CommandResult, "\n"), + [_FS, _Total, _Used, Free | _] = string:tokens(Stats, " \t"), + list_to_integer(Free) * 1024. + +parse_free_win32(CommandResult) -> + LastLine = lists:last(string:tokens(CommandResult, "\r\n")), + [_, _Dir, Free, "bytes", "free"] = string:tokens(LastLine, " "), + list_to_integer(Free). + +interpret_limit({mem_relative, R}) -> + round(R * vm_memory_monitor:get_total_memory()); +interpret_limit(L) -> + L. + +emit_update_info(State, CurrentFree, Limit) -> + rabbit_log:info( + "Disk free space limit now ~s. Free bytes:~p Limit:~p~n", + [State, CurrentFree, Limit]). + +start_timer(Timeout) -> + {ok, TRef} = timer:send_interval(Timeout, update), + TRef. diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index ddf7f574..39409d2f 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -60,6 +60,7 @@ -export([append_rpc_all_nodes/4]). -export([multi_call/2]). -export([quit/1]). +-export([os_cmd/1]). %%---------------------------------------------------------------------------- @@ -204,6 +205,7 @@ -spec(multi_call/2 :: ([pid()], any()) -> {[{pid(), any()}], [{pid(), any()}]}). -spec(quit/1 :: (integer() | string()) -> no_return()). +-spec(os_cmd/1 :: (string()) -> string()). -endif. @@ -914,3 +916,10 @@ quit(Status) -> {unix, _} -> halt(Status); {win32, _} -> init:stop(Status) end. + +os_cmd(Command) -> + Exec = hd(string:tokens(Command, " ")), + case os:find_executable(Exec) of + false -> throw({command_not_found, Exec}); + _ -> os:cmd(Command) + end. diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 47e796dc..5e9e78d3 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -25,7 +25,7 @@ -export([init/4, mainloop/2]). --export([conserve_memory/2, server_properties/1]). +-export([conserve_resources/2, server_properties/1]). -define(HANDSHAKE_TIMEOUT, 10). -define(NORMAL_TIMEOUT, 3). @@ -38,7 +38,7 @@ -record(v1, {parent, sock, connection, callback, recv_len, pending_recv, connection_state, queue_collector, heartbeater, stats_timer, channel_sup_sup_pid, start_heartbeat_fun, buf, buf_len, - auth_mechanism, auth_state, conserve_memory, + auth_mechanism, auth_state, conserve_resources, last_blocked_by, last_blocked_at}). -define(STATISTICS_KEYS, [pid, recv_oct, recv_cnt, send_oct, send_cnt, @@ -71,7 +71,7 @@ -spec(info/2 :: (pid(), rabbit_types:info_keys()) -> rabbit_types:infos()). -spec(force_event_refresh/1 :: (pid()) -> 'ok'). -spec(shutdown/2 :: (pid(), string()) -> 'ok'). --spec(conserve_memory/2 :: (pid(), boolean()) -> 'ok'). +-spec(conserve_resources/2 :: (pid(), boolean()) -> 'ok'). -spec(server_properties/1 :: (rabbit_types:protocol()) -> rabbit_framing:amqp_table()). @@ -133,8 +133,8 @@ info(Pid, Items) -> force_event_refresh(Pid) -> gen_server:cast(Pid, force_event_refresh). -conserve_memory(Pid, Conserve) -> - Pid ! {conserve_memory, Conserve}, +conserve_resources(Pid, Conserve) -> + Pid ! {conserve_resources, Conserve}, ok. server_properties(Protocol) -> @@ -218,7 +218,7 @@ start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb, buf_len = 0, auth_mechanism = none, auth_state = none, - conserve_memory = false, + conserve_resources = false, last_blocked_by = none, last_blocked_at = never}, try @@ -276,8 +276,8 @@ mainloop(Deb, State = #v1{sock = Sock, buf = Buf, buf_len = BufLen}) -> {other, Other} -> handle_other(Other, Deb, State) end. -handle_other({conserve_memory, Conserve}, Deb, State) -> - recvloop(Deb, control_throttle(State#v1{conserve_memory = Conserve})); +handle_other({conserve_resources, Conserve}, Deb, State) -> + recvloop(Deb, control_throttle(State#v1{conserve_resources = Conserve})); handle_other({channel_closing, ChPid}, Deb, State) -> ok = rabbit_channel:ready_for_close(ChPid), channel_cleanup(ChPid), @@ -358,8 +358,8 @@ terminate(Explanation, State) when ?IS_RUNNING(State) -> terminate(_Explanation, State) -> {force, State}. -control_throttle(State = #v1{connection_state = CS, - conserve_memory = Mem}) -> +control_throttle(State = #v1{connection_state = CS, + conserve_resources = Mem}) -> case {CS, Mem orelse credit_flow:blocked()} of {running, true} -> State#v1{connection_state = blocking}; {blocking, false} -> State#v1{connection_state = running}; @@ -377,9 +377,9 @@ maybe_block(State = #v1{connection_state = blocking}) -> maybe_block(State) -> State. -update_last_blocked_by(State = #v1{conserve_memory = true}) -> - State#v1{last_blocked_by = mem}; -update_last_blocked_by(State = #v1{conserve_memory = false}) -> +update_last_blocked_by(State = #v1{conserve_resources = true}) -> + State#v1{last_blocked_by = resource}; +update_last_blocked_by(State = #v1{conserve_resources = false}) -> State#v1{last_blocked_by = flow}. close_connection(State = #v1{queue_collector = Collector, @@ -701,11 +701,11 @@ handle_method0(#'connection.open'{virtual_host = VHostPath}, ok = rabbit_access_control:check_vhost_access(User, VHostPath), NewConnection = Connection#connection{vhost = VHostPath}, ok = send_on_channel0(Sock, #'connection.open_ok'{}, Protocol), - Conserve = rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}), + Conserve = rabbit_alarm:register(self(), {?MODULE, conserve_resources, []}), State1 = control_throttle( - State#v1{connection_state = running, - connection = NewConnection, - conserve_memory = Conserve}), + State#v1{connection_state = running, + connection = NewConnection, + conserve_resources = Conserve}), rabbit_event:notify(connection_created, [{type, network} | infos(?CREATION_EVENT_KEYS, State1)]), diff --git a/src/rabbit_sup.erl b/src/rabbit_sup.erl index 0965e3b3..bf2b4798 100644 --- a/src/rabbit_sup.erl +++ b/src/rabbit_sup.erl @@ -52,22 +52,20 @@ start_child(Mod, Args) -> start_child(Mod, Mod, Args). start_child(ChildId, Mod, Args) -> - {ok, _} = supervisor:start_child(?SERVER, - {ChildId, {Mod, start_link, Args}, - transient, ?MAX_WAIT, worker, [Mod]}), - ok. + child_reply(supervisor:start_child(?SERVER, + {ChildId, {Mod, start_link, Args}, + transient, ?MAX_WAIT, worker, [Mod]})). start_restartable_child(Mod) -> start_restartable_child(Mod, []). start_restartable_child(Mod, Args) -> Name = list_to_atom(atom_to_list(Mod) ++ "_sup"), - {ok, _} = supervisor:start_child( - ?SERVER, - {Name, {rabbit_restartable_sup, start_link, - [Name, {Mod, start_link, Args}]}, - transient, infinity, supervisor, [rabbit_restartable_sup]}), - ok. + child_reply(supervisor:start_child( + ?SERVER, + {Name, {rabbit_restartable_sup, start_link, + [Name, {Mod, start_link, Args}]}, + transient, infinity, supervisor, [rabbit_restartable_sup]})). stop_child(ChildId) -> case supervisor:terminate_child(?SERVER, ChildId) of @@ -77,3 +75,9 @@ stop_child(ChildId) -> init([]) -> {ok, {{one_for_all, 0, 1}, []}}. + + +%%---------------------------------------------------------------------------- + +child_reply({ok, _}) -> ok; +child_reply(X) -> X. diff --git a/src/vm_memory_monitor.erl b/src/vm_memory_monitor.erl index fca55f02..fb184d1a 100644 --- a/src/vm_memory_monitor.erl +++ b/src/vm_memory_monitor.erl @@ -181,10 +181,10 @@ internal_update(State = #state { memory_limit = MemLimit, case {Alarmed, NewAlarmed} of {false, true} -> emit_update_info(set, MemUsed, MemLimit), - alarm_handler:set_alarm({{vm_memory_high_watermark, node()}, []}); + alarm_handler:set_alarm({{resource_limit, memory, node()}, []}); {true, false} -> emit_update_info(clear, MemUsed, MemLimit), - alarm_handler:clear_alarm({vm_memory_high_watermark, node()}); + alarm_handler:clear_alarm({resource_limit, memory, node()}); _ -> ok end, |