diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/disk_monitor.erl | 185 | ||||
-rw-r--r-- | src/rabbit_alarm.erl | 93 | ||||
-rw-r--r-- | src/rabbit_misc.erl | 9 | ||||
-rw-r--r-- | src/rabbit_reader.erl | 34 | ||||
-rw-r--r-- | src/vm_memory_monitor.erl | 18 |
5 files changed, 278 insertions, 61 deletions
diff --git a/src/disk_monitor.erl b/src/disk_monitor.erl new file mode 100644 index 00000000..03014b50 --- /dev/null +++ b/src/disk_monitor.erl @@ -0,0 +1,185 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is VMware, Inc. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +%% + +-module(disk_monitor). + +-behaviour(gen_server). + +-export([start_link/1]). + +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +-export([get_disk_free_limit/0, set_disk_free_limit/1, get_check_interval/0, + set_check_interval/1, get_disk_free/0]). + +-define(SERVER, ?MODULE). +-define(DEFAULT_DISK_CHECK_INTERVAL, 60000). + +-record(state, {dir, + limit, + timeout, + timer, + alarmed + }). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-spec(start_link/1 :: ({'absolute', integer()} | {'mem_relative', float()}) + -> rabbit_types:ok_pid_or_error()). +-spec(get_disk_free_limit/0 :: () -> integer()). +-spec(set_disk_free_limit/1 :: (integer()) -> 'ok'). +-spec(get_check_interval/0 :: () -> integer()). +-spec(set_check_interval/1 :: (integer()) -> 'ok'). +-spec(get_disk_free/0 :: () -> (integer() | 'unknown')). + +-endif. + +%%---------------------------------------------------------------------------- +%% Public API +%%---------------------------------------------------------------------------- + +get_disk_free_limit() -> + gen_server:call(?MODULE, get_disk_free_limit, infinity). + +set_disk_free_limit(Limit) -> + gen_server:call(?MODULE, {set_disk_free_limit, Limit}, infinity). + +get_check_interval() -> + gen_server:call(?MODULE, get_check_interval, infinity). + +set_check_interval(Interval) -> + gen_server:call(?MODULE, {set_check_interval, Interval}, infinity). + +get_disk_free() -> + gen_server:call(?MODULE, get_disk_free, infinity). + +%%---------------------------------------------------------------------------- +%% gen_server callbacks +%%---------------------------------------------------------------------------- + +start_link(Args) -> + gen_server:start_link({local, ?SERVER}, ?MODULE, [Args], []). + +init([Limit]) -> + TRef = start_timer(?DEFAULT_DISK_CHECK_INTERVAL), + State = #state { dir = dir(), + limit = Limit, + timeout = ?DEFAULT_DISK_CHECK_INTERVAL, + timer = TRef, + alarmed = false}, + {ok, internal_update(State)}. + +handle_call(get_disk_free_limit, _From, State) -> + {reply, State#state.limit, State}; + +handle_call({set_disk_free_limit, Limit}, _From, State) -> + State1 = State#state { limit = Limit }, + error_logger:info_msg("Disk free limit changed to ~p bytes.~n", [Limit]), + {reply, ok, internal_update(State1)}; + +handle_call(get_check_interval, _From, State) -> + {reply, State#state.timeout, State}; + +handle_call({set_check_interval, Timeout}, _From, State) -> + {ok, cancel} = timer:cancel(State#state.timer), + {reply, ok, State#state{timeout = Timeout, timer = start_timer(Timeout)}}; + +handle_call(get_disk_free, _From, State = #state { dir = Dir }) -> + {reply, get_disk_free(Dir), State}; + +handle_call(_Request, _From, State) -> + {noreply, State}. + +handle_cast(_Request, State) -> + {noreply, State}. + +handle_info(update, State) -> + {noreply, internal_update(State)}; + +handle_info(_Info, State) -> + {noreply, State}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%---------------------------------------------------------------------------- +%% Server Internals +%%---------------------------------------------------------------------------- + +% the partition / drive containing this directory will be monitored +dir() -> rabbit_mnesia:dir(). + +internal_update(State = #state { limit = Limit, + dir = Dir, + alarmed = Alarmed}) -> + CurrentFreeBytes = get_disk_free(Dir), + LimitBytes = case Limit of + {absolute, L} -> + L; + {mem_relative, R} -> + round(R * vm_memory_monitor:get_total_memory()) + end, +error_logger:info_msg("disk monitor comparing ~p and ~p~n",[CurrentFreeBytes,LimitBytes]), + NewAlarmed = CurrentFreeBytes < LimitBytes, + case {Alarmed, NewAlarmed} of + {false, true} -> + emit_update_info(exceeded, CurrentFreeBytes, LimitBytes), + alarm_handler:set_alarm({{resource_limit, disk, node()}, []}); + {true, false} -> + emit_update_info(obeyed, CurrentFreeBytes, LimitBytes), + alarm_handler:clear_alarm({resource_limit, disk, node()}); + _ -> + ok + end, + State #state {alarmed = NewAlarmed}. + +get_disk_free(Dir) -> + get_disk_free(Dir, os:type()). + +get_disk_free(Dir, {unix, Sun}) + when Sun =:= sunos; Sun =:= sunos4; Sun =:= solaris -> + parse_free_unix(rabbit_misc:os_cmd("/usr/bin/df -k " ++ Dir)); +get_disk_free(Dir, {unix, _}) -> + parse_free_unix(rabbit_misc:os_cmd("/bin/df -kP " ++ Dir)); +get_disk_free(Dir, {win32, _}) -> + parse_free_win32(rabbit_misc:os_cmd("dir /-C /W " ++ Dir)); +get_disk_free(_, _) -> + unknown. + +parse_free_unix(CommandResult) -> + [_, Stats | _] = string:tokens(CommandResult, "\n"), + [_FS, _Total, _Used, Free | _] = string:tokens(Stats, " \t"), + list_to_integer(Free) * 1024. + +parse_free_win32(CommandResult) -> + LastLine = lists:last(string:tokens(CommandResult, "\r\n")), + [_, _Dir, Free, "bytes", "free"] = string:tokens(LastLine, " "), + list_to_integer(Free). + +emit_update_info(State, CurrentFree, Limit) -> + error_logger:info_msg( + "Disk free space limit now ~p. Free bytes:~p Limit:~p~n", + [State, CurrentFree, Limit]). + +start_timer(Timeout) -> + {ok, TRef} = timer:send_interval(Timeout, update), + TRef. diff --git a/src/rabbit_alarm.erl b/src/rabbit_alarm.erl index 187ec1ab..c95966b5 100644 --- a/src/rabbit_alarm.erl +++ b/src/rabbit_alarm.erl @@ -23,7 +23,7 @@ -export([init/1, handle_call/2, handle_event/2, handle_info/2, terminate/2, code_change/3]). --export([remote_conserve_memory/2]). %% Internal use only +-export([remote_conserve_resources/3]). %% Internal use only -record(alarms, {alertees, alarmed_nodes}). @@ -45,6 +45,14 @@ start() -> ok = alarm_handler:add_alarm_handler(?MODULE, []), {ok, MemoryWatermark} = application:get_env(vm_memory_high_watermark), rabbit_sup:start_restartable_child(vm_memory_monitor, [MemoryWatermark]), + + {ok, DiskLimit} = application:get_env(disk_free_limit), + rabbit_sup:start_restartable_child(disk_monitor, [DiskLimit]), + case disk_monitor:get_disk_free() of + Number when is_integer(Number) -> ok; + _ -> error_logger:warning_msg("Disabling disk free space monitoring~n"), + ok = rabbit_sup:stop_child(disk_monitor_sup) + end, ok. stop() -> @@ -61,41 +69,41 @@ on_node_down(Node) -> gen_event:notify(alarm_handler, {node_down, Node}). %% Can't use alarm_handler:{set,clear}_alarm because that doesn't %% permit notifying a remote node. -remote_conserve_memory(Pid, true) -> +remote_conserve_resources(Pid, Source, true) -> gen_event:notify({alarm_handler, node(Pid)}, - {set_alarm, {{vm_memory_high_watermark, node()}, []}}); -remote_conserve_memory(Pid, false) -> + {set_alarm, {{resource_limit, Source, node()}, []}}); +remote_conserve_resources(Pid, Source, false) -> gen_event:notify({alarm_handler, node(Pid)}, - {clear_alarm, {vm_memory_high_watermark, node()}}). + {clear_alarm, {resource_limit, Source, node()}}). %%---------------------------------------------------------------------------- init([]) -> {ok, #alarms{alertees = dict:new(), - alarmed_nodes = sets:new()}}. + alarmed_nodes = dict:new()}}. handle_call({register, Pid, HighMemMFA}, State) -> - {ok, 0 < sets:size(State#alarms.alarmed_nodes), + {ok, 0 < dict:size(State#alarms.alarmed_nodes), internal_register(Pid, HighMemMFA, State)}; handle_call(_Request, State) -> {ok, not_understood, State}. -handle_event({set_alarm, {{vm_memory_high_watermark, Node}, []}}, State) -> - {ok, maybe_alert(fun sets:add_element/2, Node, State)}; +handle_event({set_alarm, {{resource_limit, Source, Node}, []}}, State) -> + {ok, maybe_alert(fun dict:append/3, Node, Source, State)}; -handle_event({clear_alarm, {vm_memory_high_watermark, Node}}, State) -> - {ok, maybe_alert(fun sets:del_element/2, Node, State)}; +handle_event({clear_alarm, {resource_limit, Source, Node}}, State) -> + {ok, maybe_alert(fun dict_unappend/3, Node, Source, State)}; handle_event({node_up, Node}, State) -> %% Must do this via notify and not call to avoid possible deadlock. ok = gen_event:notify( {alarm_handler, Node}, - {register, self(), {?MODULE, remote_conserve_memory, []}}), + {register, self(), {?MODULE, remote_conserve_resources, []}}), {ok, State}; handle_event({node_down, Node}, State) -> - {ok, maybe_alert(fun sets:del_element/2, Node, State)}; + {ok, maybe_alert(fun dict_unappend/3, Node, '$all_values', State)}; handle_event({register, Pid, HighMemMFA}, State) -> {ok, internal_register(Pid, HighMemMFA, State)}; @@ -103,7 +111,7 @@ handle_event({register, Pid, HighMemMFA}, State) -> handle_event(_Event, State) -> {ok, State}. -handle_info({'DOWN', _MRef, process, Pid, _Reason}, +handle_info({'DOWN', _MRef, process, Pid, _Source}, State = #alarms{alertees = Alertees}) -> {ok, State#alarms{alertees = dict:erase(Pid, Alertees)}}; @@ -118,34 +126,55 @@ code_change(_OldVsn, State, _Extra) -> %%---------------------------------------------------------------------------- -maybe_alert(SetFun, Node, State = #alarms{alarmed_nodes = AN, - alertees = Alertees}) -> - AN1 = SetFun(Node, AN), - BeforeSz = sets:size(AN), - AfterSz = sets:size(AN1), +dict_unappend(Key, '$all_values', Dict) -> + dict:erase(Key, Dict); +dict_unappend(Key, Val, Dict) -> + case lists:delete(Val, dict:fetch(Key, Dict)) of + [] -> dict:erase(Key, Dict); + X -> dict:store(Key, X, Dict) + end. + +count_dict_values(Val, Dict) -> + dict:fold(fun (_Node, List, Count) -> + Count + case lists:member(Val, List) of + true -> 1; + false -> 0 + end + end, 0, Dict). + +maybe_alert(UpdateFun, Node, Source, + State = #alarms{alarmed_nodes = AN, + alertees = Alertees}) -> + AN1 = UpdateFun(Node, Source, AN), + BeforeSz = count_dict_values(Source, AN), + AfterSz = count_dict_values(Source, AN1), + %% If we have changed our alarm state, inform the remotes. IsLocal = Node =:= node(), - if IsLocal andalso BeforeSz < AfterSz -> ok = alert_remote(true, Alertees); - IsLocal andalso BeforeSz > AfterSz -> ok = alert_remote(false, Alertees); - true -> ok + if IsLocal andalso BeforeSz < AfterSz -> + ok = alert_remote({true, Alertees, Source}); + IsLocal andalso BeforeSz > AfterSz -> + ok = alert_remote({false, Alertees, Source}); + true -> + ok end, %% If the overall alarm state has changed, inform the locals. - case {BeforeSz, AfterSz} of - {0, 1} -> ok = alert_local(true, Alertees); - {1, 0} -> ok = alert_local(false, Alertees); + case {dict:size(AN), dict:size(AN1)} of + {0, 1} -> ok = alert_local({true, Alertees, Source}); + {1, 0} -> ok = alert_local({false, Alertees, Source}); {_, _} -> ok end, State#alarms{alarmed_nodes = AN1}. -alert_local(Alert, Alertees) -> alert(Alert, Alertees, fun erlang:'=:='/2). +alert_local(AlertSpec) -> alert(AlertSpec, fun erlang:'=:='/2). -alert_remote(Alert, Alertees) -> alert(Alert, Alertees, fun erlang:'=/='/2). +alert_remote(AlertSpec) -> alert(AlertSpec, fun erlang:'=/='/2). -alert(Alert, Alertees, NodeComparator) -> +alert({Alert, Alertees, Source}, NodeComparator) -> Node = node(), dict:fold(fun (Pid, {M, F, A}, ok) -> case NodeComparator(Node, node(Pid)) of - true -> apply(M, F, A ++ [Pid, Alert]); + true -> apply(M, F, A ++ [Pid, Source, Alert]); false -> ok end end, ok, Alertees). @@ -153,9 +182,9 @@ alert(Alert, Alertees, NodeComparator) -> internal_register(Pid, {M, F, A} = HighMemMFA, State = #alarms{alertees = Alertees}) -> _MRef = erlang:monitor(process, Pid), - case sets:is_element(node(), State#alarms.alarmed_nodes) of - true -> ok = apply(M, F, A ++ [Pid, true]); - false -> ok + case dict:find(node(), State#alarms.alarmed_nodes) of + {ok, Sources} -> [apply(M, F, A ++ [Pid, R, true]) || R <- Sources]; + error -> ok end, NewAlertees = dict:store(Pid, HighMemMFA, Alertees), State#alarms{alertees = NewAlertees}. diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index ddf7f574..39409d2f 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -60,6 +60,7 @@ -export([append_rpc_all_nodes/4]). -export([multi_call/2]). -export([quit/1]). +-export([os_cmd/1]). %%---------------------------------------------------------------------------- @@ -204,6 +205,7 @@ -spec(multi_call/2 :: ([pid()], any()) -> {[{pid(), any()}], [{pid(), any()}]}). -spec(quit/1 :: (integer() | string()) -> no_return()). +-spec(os_cmd/1 :: (string()) -> string()). -endif. @@ -914,3 +916,10 @@ quit(Status) -> {unix, _} -> halt(Status); {win32, _} -> init:stop(Status) end. + +os_cmd(Command) -> + Exec = hd(string:tokens(Command, " ")), + case os:find_executable(Exec) of + false -> throw({command_not_found, Exec}); + _ -> os:cmd(Command) + end. diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 47e796dc..5d08d28c 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -25,7 +25,7 @@ -export([init/4, mainloop/2]). --export([conserve_memory/2, server_properties/1]). +-export([conserve_resources/3, server_properties/1]). -define(HANDSHAKE_TIMEOUT, 10). -define(NORMAL_TIMEOUT, 3). @@ -38,7 +38,7 @@ -record(v1, {parent, sock, connection, callback, recv_len, pending_recv, connection_state, queue_collector, heartbeater, stats_timer, channel_sup_sup_pid, start_heartbeat_fun, buf, buf_len, - auth_mechanism, auth_state, conserve_memory, + auth_mechanism, auth_state, conserve_resources, last_blocked_by, last_blocked_at}). -define(STATISTICS_KEYS, [pid, recv_oct, recv_cnt, send_oct, send_cnt, @@ -71,7 +71,7 @@ -spec(info/2 :: (pid(), rabbit_types:info_keys()) -> rabbit_types:infos()). -spec(force_event_refresh/1 :: (pid()) -> 'ok'). -spec(shutdown/2 :: (pid(), string()) -> 'ok'). --spec(conserve_memory/2 :: (pid(), boolean()) -> 'ok'). +-spec(conserve_resources/3 :: (pid(), atom(), boolean()) -> 'ok'). -spec(server_properties/1 :: (rabbit_types:protocol()) -> rabbit_framing:amqp_table()). @@ -133,8 +133,8 @@ info(Pid, Items) -> force_event_refresh(Pid) -> gen_server:cast(Pid, force_event_refresh). -conserve_memory(Pid, Conserve) -> - Pid ! {conserve_memory, Conserve}, +conserve_resources(Pid, _Source, Conserve) -> + Pid ! {conserve_resources, Conserve}, ok. server_properties(Protocol) -> @@ -218,7 +218,7 @@ start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb, buf_len = 0, auth_mechanism = none, auth_state = none, - conserve_memory = false, + conserve_resources = false, last_blocked_by = none, last_blocked_at = never}, try @@ -276,8 +276,8 @@ mainloop(Deb, State = #v1{sock = Sock, buf = Buf, buf_len = BufLen}) -> {other, Other} -> handle_other(Other, Deb, State) end. -handle_other({conserve_memory, Conserve}, Deb, State) -> - recvloop(Deb, control_throttle(State#v1{conserve_memory = Conserve})); +handle_other({conserve_resources, Conserve}, Deb, State) -> + recvloop(Deb, control_throttle(State#v1{conserve_resources = Conserve})); handle_other({channel_closing, ChPid}, Deb, State) -> ok = rabbit_channel:ready_for_close(ChPid), channel_cleanup(ChPid), @@ -358,8 +358,8 @@ terminate(Explanation, State) when ?IS_RUNNING(State) -> terminate(_Explanation, State) -> {force, State}. -control_throttle(State = #v1{connection_state = CS, - conserve_memory = Mem}) -> +control_throttle(State = #v1{connection_state = CS, + conserve_resources = Mem}) -> case {CS, Mem orelse credit_flow:blocked()} of {running, true} -> State#v1{connection_state = blocking}; {blocking, false} -> State#v1{connection_state = running}; @@ -377,9 +377,9 @@ maybe_block(State = #v1{connection_state = blocking}) -> maybe_block(State) -> State. -update_last_blocked_by(State = #v1{conserve_memory = true}) -> - State#v1{last_blocked_by = mem}; -update_last_blocked_by(State = #v1{conserve_memory = false}) -> +update_last_blocked_by(State = #v1{conserve_resources = true}) -> + State#v1{last_blocked_by = resource}; +update_last_blocked_by(State = #v1{conserve_resources = false}) -> State#v1{last_blocked_by = flow}. close_connection(State = #v1{queue_collector = Collector, @@ -701,11 +701,11 @@ handle_method0(#'connection.open'{virtual_host = VHostPath}, ok = rabbit_access_control:check_vhost_access(User, VHostPath), NewConnection = Connection#connection{vhost = VHostPath}, ok = send_on_channel0(Sock, #'connection.open_ok'{}, Protocol), - Conserve = rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}), + Conserve = rabbit_alarm:register(self(), {?MODULE, conserve_resources, []}), State1 = control_throttle( - State#v1{connection_state = running, - connection = NewConnection, - conserve_memory = Conserve}), + State#v1{connection_state = running, + connection = NewConnection, + conserve_resources = Conserve}), rabbit_event:notify(connection_created, [{type, network} | infos(?CREATION_EVENT_KEYS, State1)]), diff --git a/src/vm_memory_monitor.erl b/src/vm_memory_monitor.erl index fca55f02..436eaee4 100644 --- a/src/vm_memory_monitor.erl +++ b/src/vm_memory_monitor.erl @@ -181,10 +181,10 @@ internal_update(State = #state { memory_limit = MemLimit, case {Alarmed, NewAlarmed} of {false, true} -> emit_update_info(set, MemUsed, MemLimit), - alarm_handler:set_alarm({{vm_memory_high_watermark, node()}, []}); + alarm_handler:set_alarm({{resource_limit, memory, node()}, []}); {true, false} -> emit_update_info(clear, MemUsed, MemLimit), - alarm_handler:clear_alarm({vm_memory_high_watermark, node()}); + alarm_handler:clear_alarm({resource_limit, memory, node()}); _ -> ok end, @@ -223,19 +223,13 @@ get_mem_limit(MemFraction, TotalMemory) -> %%---------------------------------------------------------------------------- %% Internal Helpers %%---------------------------------------------------------------------------- -cmd(Command) -> - Exec = hd(string:tokens(Command, " ")), - case os:find_executable(Exec) of - false -> throw({command_not_found, Exec}); - _ -> os:cmd(Command) - end. %% get_total_memory(OS) -> Total %% Windows and Freebsd code based on: memsup:get_memory_usage/1 %% Original code was part of OTP and released under "Erlang Public License". get_total_memory({unix,darwin}) -> - File = cmd("/usr/bin/vm_stat"), + File = rabbit_misc:os_cmd("/usr/bin/vm_stat"), Lines = string:tokens(File, "\n"), Dict = dict:from_list(lists:map(fun parse_line_mach/1, Lines)), [PageSize, Inactive, Active, Free, Wired] = @@ -284,13 +278,13 @@ get_total_memory({unix, linux}) -> dict:fetch('MemTotal', Dict); get_total_memory({unix, sunos}) -> - File = cmd("/usr/sbin/prtconf"), + File = rabbit_misc:os_cmd("/usr/sbin/prtconf"), Lines = string:tokens(File, "\n"), Dict = dict:from_list(lists:map(fun parse_line_sunos/1, Lines)), dict:fetch('Memory size', Dict); get_total_memory({unix, aix}) -> - File = cmd("/usr/bin/vmstat -v"), + File = rabbit_misc:os_cmd("/usr/bin/vmstat -v"), Lines = string:tokens(File, "\n"), Dict = dict:from_list(lists:map(fun parse_line_aix/1, Lines)), dict:fetch('memory pages', Dict) * 4096; @@ -352,7 +346,7 @@ parse_line_aix(Line) -> end}. sysctl(Def) -> - list_to_integer(cmd("/sbin/sysctl -n " ++ Def) -- "\n"). + list_to_integer(rabbit_misc:os_cmd("/sbin/sysctl -n " ++ Def) -- "\n"). %% file:read_file does not work on files in /proc as it seems to get %% the size of the file first and then read that many bytes. But files |