diff options
Diffstat (limited to 'deps/rabbit/src/rabbit_sysmon_handler.erl')
-rw-r--r-- | deps/rabbit/src/rabbit_sysmon_handler.erl | 235 |
1 files changed, 235 insertions, 0 deletions
diff --git a/deps/rabbit/src/rabbit_sysmon_handler.erl b/deps/rabbit/src/rabbit_sysmon_handler.erl new file mode 100644 index 0000000000..8f7298ed6e --- /dev/null +++ b/deps/rabbit/src/rabbit_sysmon_handler.erl @@ -0,0 +1,235 @@ +%% Copyright (c) 2011 Basho Technologies, Inc. All Rights Reserved. +%% Copyright (c) 2018-2020 VMware, Inc. or its affiliates. All rights reserved. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% https://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. + +%% @doc A custom event handler to the `sysmon_handler' application's +%% `system_monitor' event manager. +%% +%% This module attempts to discover more information about a process +%% that generates a system_monitor event. + +-module(rabbit_sysmon_handler). + +-behaviour(gen_event). + +%% API +-export([add_handler/0]). + +%% gen_event callbacks +-export([init/1, handle_event/2, handle_call/2, + handle_info/2, terminate/2, code_change/3]). + +-record(state, {timer_ref :: reference() | undefined}). + +-define(INACTIVITY_TIMEOUT, 5000). + +%%%=================================================================== +%%% gen_event callbacks +%%%=================================================================== + +add_handler() -> + %% Vulnerable to race conditions (installing handler multiple + %% times), but risk is zero in the common OTP app startup case. + case lists:member(?MODULE, gen_event:which_handlers(sysmon_handler)) of + true -> + ok; + false -> + sysmon_handler_filter:add_custom_handler(?MODULE, []) + end. + +%%%=================================================================== +%%% gen_event callbacks +%%%=================================================================== + +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% Whenever a new event handler is added to an event manager, +%% this function is called to initialize the event handler. +%% +%% @spec init(Args) -> {ok, State} +%% @end +%%-------------------------------------------------------------------- +init([]) -> + {ok, #state{}, hibernate}. + +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% Whenever an event manager receives an event sent using +%% gen_event:notify/2 or gen_event:sync_notify/2, this function is +%% called for each installed event handler to handle the event. +%% +%% @spec handle_event(Event, State) -> +%% {ok, State} | +%% {swap_handler, Args1, State1, Mod2, Args2} | +%% remove_handler +%% @end +%%-------------------------------------------------------------------- +handle_event({monitor, Pid, Type, _Info}, + State=#state{timer_ref=TimerRef}) when Pid == self() -> + %% Reset the inactivity timeout + NewTimerRef = reset_timer(TimerRef), + maybe_collect_garbage(Type), + {ok, State#state{timer_ref=NewTimerRef}}; +handle_event({monitor, PidOrPort, Type, Info}, State=#state{timer_ref=TimerRef}) -> + %% Reset the inactivity timeout + NewTimerRef = reset_timer(TimerRef), + {Fmt, Args} = format_pretty_proc_or_port_info(PidOrPort), + rabbit_log:warning("~p ~w ~w " ++ Fmt ++ " ~w", [?MODULE, Type, PidOrPort] ++ Args ++ [Info]), + {ok, State#state{timer_ref=NewTimerRef}}; +handle_event({suppressed, Type, Info}, State=#state{timer_ref=TimerRef}) -> + %% Reset the inactivity timeout + NewTimerRef = reset_timer(TimerRef), + rabbit_log:debug("~p encountered a suppressed event of type ~w: ~w", [?MODULE, Type, Info]), + {ok, State#state{timer_ref=NewTimerRef}}; +handle_event(Event, State=#state{timer_ref=TimerRef}) -> + NewTimerRef = reset_timer(TimerRef), + rabbit_log:warning("~p unhandled event: ~p", [?MODULE, Event]), + {ok, State#state{timer_ref=NewTimerRef}}. + +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% Whenever an event manager receives a request sent using +%% gen_event:call/3,4, this function is called for the specified +%% event handler to handle the request. +%% +%% @spec handle_call(Request, State) -> +%% {ok, Reply, State} | +%% {swap_handler, Reply, Args1, State1, Mod2, Args2} | +%% {remove_handler, Reply} +%% @end +%%-------------------------------------------------------------------- +handle_call(_Call, State) -> + Reply = not_supported, + {ok, Reply, State}. + +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% This function is called for each installed event handler when +%% an event manager receives any other message than an event or a +%% synchronous request (or a system message). +%% +%% @spec handle_info(Info, State) -> +%% {ok, State} | +%% {swap_handler, Args1, State1, Mod2, Args2} | +%% remove_handler +%% @end +%%-------------------------------------------------------------------- +handle_info(inactivity_timeout, State) -> + %% No events have arrived for the timeout period + %% so hibernate to free up resources. + {ok, State, hibernate}; +handle_info(Info, State) -> + rabbit_log:info("handle_info got ~p", [Info]), + {ok, State}. + +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% Whenever an event handler is deleted from an event manager, this +%% function is called. It should be the opposite of Module:init/1 and +%% do any necessary cleaning up. +%% +%% @spec terminate(Reason, State) -> void() +%% @end +%%-------------------------------------------------------------------- +terminate(_Reason, _State) -> + ok. + +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% Convert process state when code is changed +%% +%% @spec code_change(OldVsn, State, Extra) -> {ok, NewState} +%% @end +%%-------------------------------------------------------------------- +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== + +format_pretty_proc_or_port_info(PidOrPort) -> + try + case get_pretty_proc_or_port_info(PidOrPort) of + undefined -> + {"", []}; + Res -> + Res + end + catch C:E:S -> + {"Pid ~w, ~W ~W at ~w\n", + [PidOrPort, C, 20, E, 20, S]} + end. + +get_pretty_proc_or_port_info(Pid) when is_pid(Pid) -> + Infos = [registered_name, initial_call, current_function, message_queue_len], + case process_info(Pid, Infos) of + undefined -> + undefined; + [] -> + undefined; + [{registered_name, RN0}, ICT1, {_, CF}, {_, MQL}] -> + ICT = case proc_lib:translate_initial_call(Pid) of + {proc_lib, init_p, 5} -> % not by proc_lib, see docs + ICT1; + ICT2 -> + {initial_call, ICT2} + end, + RNL = if RN0 == [] -> []; + true -> [{name, RN0}] + end, + {"~w", [RNL ++ [ICT, CF, {message_queue_len, MQL}]]} + end; +get_pretty_proc_or_port_info(Port) when is_port(Port) -> + PortInfo = erlang:port_info(Port), + {value, {name, Name}, PortInfo2} = lists:keytake(name, 1, PortInfo), + QueueSize = [erlang:port_info(Port, queue_size)], + Connected = case proplists:get_value(connected, PortInfo2) of + undefined -> + []; + ConnectedPid -> + case proc_lib:translate_initial_call(ConnectedPid) of + {proc_lib, init_p, 5} -> % not by proc_lib, see docs + []; + ICT -> + [{initial_call, ICT}] + end + end, + {"name ~s ~w", [Name, lists:append([PortInfo2, QueueSize, Connected])]}. + + +%% @doc If the message type is due to a large heap warning +%% and the source is ourself, go ahead and collect garbage +%% to avoid the death spiral. +-spec maybe_collect_garbage(atom()) -> ok. +maybe_collect_garbage(large_heap) -> + erlang:garbage_collect(), + ok; +maybe_collect_garbage(_) -> + ok. + +-spec reset_timer(undefined | reference()) -> reference(). +reset_timer(undefined) -> + erlang:send_after(?INACTIVITY_TIMEOUT, self(), inactivity_timeout); +reset_timer(TimerRef) -> + _ = erlang:cancel_timer(TimerRef), + reset_timer(undefined). |