summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--lib/kernel/src/Makefile3
-rw-r--r--lib/kernel/src/kernel.app.src2
-rw-r--r--lib/kernel/src/logger.erl51
-rw-r--r--lib/kernel/src/logger_h_common.erl19
-rw-r--r--lib/kernel/src/logger_h_common.hrl29
-rw-r--r--lib/kernel/src/logger_olp.erl213
-rw-r--r--lib/kernel/src/logger_proxy.erl152
-rw-r--r--lib/kernel/src/logger_server.erl20
-rw-r--r--lib/kernel/src/logger_sup.erl4
-rw-r--r--lib/kernel/test/logger.cover1
10 files changed, 392 insertions, 102 deletions
diff --git a/lib/kernel/src/Makefile b/lib/kernel/src/Makefile
index c076726bf4..c3fee02334 100644
--- a/lib/kernel/src/Makefile
+++ b/lib/kernel/src/Makefile
@@ -119,6 +119,7 @@ MODULES = \
logger_filters \
logger_formatter \
logger_olp \
+ logger_proxy \
logger_server \
logger_simple_h \
logger_sup \
@@ -280,6 +281,8 @@ $(EBIN)/logger_config.beam: logger_internal.hrl ../include/logger.hrl
$(EBIN)/logger_disk_log_h.beam: logger_h_common.hrl logger_internal.hrl ../include/logger.hrl ../include/file.hrl
$(EBIN)/logger_filters.beam: logger_internal.hrl ../include/logger.hrl
$(EBIN)/logger_formatter.beam: logger_internal.hrl ../include/logger.hrl
+$(EBIN)/logger_olp.beam: logger_h_common.hrl logger_internal.hrl
+$(EBIN)/logger_proxy.beam: logger_internal.hrl
$(EBIN)/logger_server.beam: logger_internal.hrl ../include/logger.hrl
$(EBIN)/logger_simple_h.beam: logger_internal.hrl ../include/logger.hrl
$(EBIN)/logger_std_h.beam: logger_h_common.hrl logger_internal.hrl ../include/logger.hrl ../include/file.hrl
diff --git a/lib/kernel/src/kernel.app.src b/lib/kernel/src/kernel.app.src
index fe073621c8..a1d9e8e215 100644
--- a/lib/kernel/src/kernel.app.src
+++ b/lib/kernel/src/kernel.app.src
@@ -68,6 +68,8 @@
logger_formatter,
logger_h_common,
logger_handler_watcher,
+ logger_olp,
+ logger_proxy,
logger_server,
logger_simple_h,
logger_std_h,
diff --git a/lib/kernel/src/logger.erl b/lib/kernel/src/logger.erl
index 6762998d4f..1611d489e6 100644
--- a/lib/kernel/src/logger.erl
+++ b/lib/kernel/src/logger.erl
@@ -672,6 +672,17 @@ init_kernel_handlers(Env) ->
%% This function is responsible for resolving the handler config
%% and then starting the correct handlers. This is done after the
%% kernel supervisor tree has been started as it needs the logger_sup.
+add_handlers(kernel) ->
+ Env = get_logger_env(kernel),
+ case get_proxy_opts(Env) of
+ undefined ->
+ add_handlers(kernel,Env);
+ Opts ->
+ case logger_olp:set_opts(logger_proxy,Opts) of
+ ok -> add_handlers(kernel,Env);
+ {error, Reason} -> {error,{bad_proxy_config,Reason}}
+ end
+ end;
add_handlers(App) when is_atom(App) ->
add_handlers(App,get_logger_env(App));
add_handlers(HandlerConfig) ->
@@ -729,6 +740,8 @@ check_logger_config(kernel,[{filters,_,_}|Env]) ->
check_logger_config(kernel,Env);
check_logger_config(kernel,[{module_level,_,_}|Env]) ->
check_logger_config(kernel,Env);
+check_logger_config(kernel,[{proxy,_}|Env]) ->
+ check_logger_config(kernel,Env);
check_logger_config(_,Bad) ->
throw(Bad).
@@ -784,6 +797,13 @@ get_primary_filters(Env) ->
_ -> throw({multiple_filters,Env})
end.
+get_proxy_opts(Env) ->
+ case [P || P={proxy,_} <- Env] of
+ [{proxy,Opts}] -> Opts;
+ [] -> undefined;
+ _ -> throw({multiple_proxies,Env})
+ end.
+
%% This function looks at the kernel logger environment
%% and updates it so that the correct logger is configured
init_default_config(Type,Env) when Type==standard_io;
@@ -878,42 +898,43 @@ log_allowed(Location,Level,Msg,Meta0) when is_map(Meta0) ->
%% (function or macro).
Meta = add_default_metadata(
maps:merge(Location,maps:merge(proc_meta(),Meta0))),
+ Tid = tid(),
case node(maps:get(gl,Meta)) of
Node when Node=/=node() ->
- log_remote(Node,Level,Msg,Meta),
- do_log_allowed(Level,Msg,Meta);
+ log_remote(Node,Level,Msg,Meta,Tid),
+ do_log_allowed(Level,Msg,Meta,Tid);
_ ->
- do_log_allowed(Level,Msg,Meta)
+ do_log_allowed(Level,Msg,Meta,Tid)
end.
-do_log_allowed(Level,{Format,Args}=Msg,Meta)
+do_log_allowed(Level,{Format,Args}=Msg,Meta,Tid)
when ?IS_LEVEL(Level),
is_list(Format),
is_list(Args),
is_map(Meta) ->
- logger_backend:log_allowed(#{level=>Level,msg=>Msg,meta=>Meta},tid());
-do_log_allowed(Level,Report,Meta)
+ logger_backend:log_allowed(#{level=>Level,msg=>Msg,meta=>Meta},Tid);
+do_log_allowed(Level,Report,Meta,Tid)
when ?IS_LEVEL(Level),
?IS_REPORT(Report),
is_map(Meta) ->
logger_backend:log_allowed(#{level=>Level,msg=>{report,Report},meta=>Meta},
- tid());
-do_log_allowed(Level,String,Meta)
+ Tid);
+do_log_allowed(Level,String,Meta,Tid)
when ?IS_LEVEL(Level),
?IS_STRING(String),
is_map(Meta) ->
logger_backend:log_allowed(#{level=>Level,msg=>{string,String},meta=>Meta},
- tid()).
+ Tid).
tid() ->
ets:whereis(?LOGGER_TABLE).
-log_remote(Node,Level,{Format,Args},Meta) ->
- log_remote(Node,{log,Level,Format,Args,Meta});
-log_remote(Node,Level,Msg,Meta) ->
- log_remote(Node,{log,Level,Msg,Meta}).
+log_remote(Node,Level,{Format,Args},Meta,Tid) ->
+ log_remote(Node,{log,Level,Format,Args,Meta},Tid);
+log_remote(Node,Level,Msg,Meta,Tid) ->
+ log_remote(Node,{log,Level,Msg,Meta},Tid).
-log_remote(Node,Request) ->
- {logger,Node} ! Request,
+log_remote(Node,Request,Tid) ->
+ logger_proxy:log(logger_server:get_proxy_ref(Tid),{remote,Node,Request}),
ok.
add_default_metadata(Meta) ->
diff --git a/lib/kernel/src/logger_h_common.erl b/lib/kernel/src/logger_h_common.erl
index dd8ace8249..6f55c5997d 100644
--- a/lib/kernel/src/logger_h_common.erl
+++ b/lib/kernel/src/logger_h_common.erl
@@ -178,7 +178,7 @@ changing_config(SetOrUpdate,
log(LogEvent, Config = #{config := #{olp:=Olp}}) ->
%% if the handler has crashed, we must drop this event
%% and hope the handler restarts so we can try again
- true = logger_olp:is_alive(Olp),
+ true = is_process_alive(logger_olp:get_pid(Olp)),
Bin = log_to_binary(LogEvent, Config),
logger_olp:load(Olp,Bin).
@@ -206,8 +206,9 @@ start(OlpOpts0, #{id := Name, module:=Module, config:=HConfig} = Config0) ->
type => worker,
modules => [?MODULE]},
case supervisor:start_child(logger_sup, ChildSpec) of
- {ok,Pid,{Olp,OlpOpts}} ->
+ {ok,Pid,Olp} ->
ok = logger_handler_watcher:register_handler(Name,Pid),
+ OlpOpts = logger_olp:get_opts(Olp),
{ok,Config0#{config=>(maps:merge(HConfig,OlpOpts))#{olp=>Olp}}};
{error,{Reason,Ch}} when is_tuple(Ch), element(1,Ch)==child ->
{error,Reason};
@@ -246,14 +247,14 @@ handle_load(Bin, #{id:=Name,
ctrl_sync_count := CtrlSync}=State) ->
if CtrlSync==0 ->
{_,HS1} = Module:write(Name, sync, Bin, HandlerState),
- {ok,State#{handler_state => HS1,
- ctrl_sync_count => ?CONTROLLER_SYNC_INTERVAL,
- last_op=>write}};
+ State#{handler_state => HS1,
+ ctrl_sync_count => ?CONTROLLER_SYNC_INTERVAL,
+ last_op=>write};
true ->
{_,HS1} = Module:write(Name, async, Bin, HandlerState),
- {ok,State#{handler_state => HS1,
- ctrl_sync_count => CtrlSync-1,
- last_op=>write}}
+ State#{handler_state => HS1,
+ ctrl_sync_count => CtrlSync-1,
+ last_op=>write}
end.
handle_call(filesync, _From, State = #{id := Name,
@@ -295,7 +296,7 @@ handle_info(Info, #{id := Name, module := Module,
{noreply,State#{handler_state => Module:handle_info(Name,Info,HandlerState)}}.
terminate(overloaded=Reason, #{id:=Name}=State) ->
- log_handler_info(Name, "Handler ~p overloaded and stopping", [Name], State),
+ _ = log_handler_info(Name,"Handler ~p overloaded and stopping",[Name],State),
do_terminate(Reason,State),
ConfigResult = logger:get_handler_config(Name),
case ConfigResult of
diff --git a/lib/kernel/src/logger_h_common.hrl b/lib/kernel/src/logger_h_common.hrl
index 261b0a6246..f2c2dc2a4e 100644
--- a/lib/kernel/src/logger_h_common.hrl
+++ b/lib/kernel/src/logger_h_common.hrl
@@ -206,12 +206,16 @@
%%% officially released (as some counters will grow very large
%%% over time).
-%%-define(SAVE_STATS, true).
+%% -define(SAVE_STATS, true).
-ifdef(SAVE_STATS).
-define(merge_with_stats(STATE),
- STATE#{flushes => 0, flushed => 0, drops => 0,
- burst_drops => 0, casts => 0, calls => 0,
- max_qlen => 0, max_time => 0}).
+ begin
+ TIME = ?timestamp(),
+ STATE#{start => TIME, time => {TIME,0},
+ flushes => 0, flushed => 0, drops => 0,
+ burst_drops => 0, casts => 0, calls => 0,
+ writes => 0, max_qlen => 0, max_time => 0,
+ freq => {TIME,0,0}} end).
-define(update_max_qlen(QLEN, STATE),
begin #{max_qlen := QLEN0} = STATE,
@@ -234,13 +238,28 @@
-define(update_other(OTHER, VAR, INCVAL, STATE),
begin #{OTHER := VAR} = STATE,
STATE#{OTHER => VAR+INCVAL} end).
-
+
+ -define(update_freq(TIME,STATE),
+ begin
+ case STATE of
+ #{freq := {START, 49, _}} ->
+ STATE#{freq => {TIME, 0, trunc(1000000*50/(?diff_time(TIME,START)))}};
+ #{freq := {START, N, FREQ}} ->
+ STATE#{freq => {START, N+1, FREQ}}
+ end end).
+
+ -define(update_time(TIME,STATE),
+ begin #{start := START} = STATE,
+ STATE#{time => {TIME,trunc((?diff_time(TIME,START))/1000000)}} end).
+
-else. % DEFAULT!
-define(merge_with_stats(STATE), STATE).
-define(update_max_qlen(_QLEN, STATE), STATE).
-define(update_calls_or_casts(_CALL_OR_CAST, _INC, STATE), STATE).
-define(update_max_time(_TIME, STATE), STATE).
-define(update_other(_OTHER, _VAR, _INCVAL, STATE), STATE).
+ -define(update_freq(_TIME, STATE), STATE).
+ -define(update_time(_TIME, STATE), STATE).
-endif.
%%%-----------------------------------------------------------------
diff --git a/lib/kernel/src/logger_olp.erl b/lib/kernel/src/logger_olp.erl
index 6b76c78c73..7c6a1a8547 100644
--- a/lib/kernel/src/logger_olp.erl
+++ b/lib/kernel/src/logger_olp.erl
@@ -25,47 +25,62 @@
%% API
-export([start_link/4, load/2, info/1, reset/1, stop/1, restart/1,
- set_opts/2, get_opts/1, get_default_opts/0, is_alive/1,
- call/2, cast/2]).
+ set_opts/2, get_opts/1, get_default_opts/0, get_pid/1,
+ call/2, cast/2, get_ref/0, get_ref/1]).
%% gen_server and proc_lib callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
-%%%-----------------------------------------------------------------
-%% -define(CONFIG_KEYS,[sync_mode_qlen,
-%% drop_mode_qlen,
-%% flush_qlen,
-%% burst_limit_enable,
-%% burst_limit_max_count,
-%% burst_limit_window_time,
-%% overload_kill_enable,
-%% overload_kill_qlen,
-%% overload_kill_mem_size,
-%% overload_kill_restart_after]).
+-define(OPT_KEYS,[sync_mode_qlen,
+ drop_mode_qlen,
+ flush_qlen,
+ burst_limit_enable,
+ burst_limit_max_count,
+ burst_limit_window_time,
+ overload_kill_enable,
+ overload_kill_qlen,
+ overload_kill_mem_size,
+ overload_kill_restart_after]).
+
+-export_type([olp_ref/0, options/0]).
+
+-opaque olp_ref() :: {atom(),pid(),ets:tid()}.
+
+-type options() :: #{sync_mode_qlen => integer(),
+ drop_mode_qlen => integer(),
+ flush_qlen => integer(),
+ burst_limit_enable => boolean(),
+ burst_limit_max_count => integer(),
+ burst_limit_window_time => integer(),
+ overload_kill_enable => boolean(),
+ overload_kill_qlen => integer(),
+ overload_kill_mem_size => integer(),
+ overload_kill_restart_after => integer()}.
%%%-----------------------------------------------------------------
%%% API
-%-spec start_link(Name,Module,Args,Options) -> {ok,Pid,Olp} | {error,Reason}.
+-spec start_link(Name,Module,Args,Options) -> {ok,Pid,Olp} | {error,Reason} when
+ Name :: atom(),
+ Module :: module(),
+ Args :: term(),
+ Options :: options(),
+ Pid :: pid(),
+ Olp :: olp_ref(),
+ Reason :: term().
start_link(Name,Module,Args,Options0) when is_map(Options0) ->
Options = maps:merge(get_default_opts(),Options0),
case check_opts(Options) of
ok ->
- case proc_lib:start_link(?MODULE,init,
- [[Name,Module,Args,Options]]) of
- {ok,Pid,Olp} ->
- {ok,Pid,{Olp,Options}};
- Error ->
- Error
- end;
+ proc_lib:start_link(?MODULE,init,[[Name,Module,Args,Options]]);
Error ->
Error
end.
-is_alive({_Name,Pid,_ModeRef}) ->
- is_process_alive(Pid).
-
+-spec load(Olp, Msg) -> ok when
+ Olp :: olp_ref(),
+ Msg :: term().
load({_Name,Pid,ModeRef},Msg) ->
%% If the process is getting overloaded, the message will be
%% synchronous instead of asynchronous (slows down the tempo of a
@@ -92,22 +107,36 @@ load({_Name,Pid,ModeRef},Msg) ->
end,
ok.
+-spec info(Olp) -> map() | {error, busy} when
+ Olp :: atom() | pid() | olp_ref().
info(Olp) ->
call(Olp, info).
+-spec reset(Olp) -> ok | {error, busy} when
+ Olp :: atom() | pid() | olp_ref().
reset(Olp) ->
call(Olp, reset).
+-spec stop(Olp) -> ok when
+ Olp :: atom() | pid() | olp_ref().
stop({_Name,Pid,_ModRef}) ->
+ stop(Pid);
+stop(Pid) ->
_ = gen_server:call(Pid, stop),
ok.
-set_opts({_Name,Pid,_ModRef}, Opts) ->
- gen_server:call(Pid, {set_opts,Opts}).
+-spec set_opts(Olp, Opts) -> ok | {error,term()} | {error, busy} when
+ Olp :: atom() | pid() | olp_ref(),
+ Opts :: options().
+set_opts(Olp, Opts) ->
+ call(Olp, {set_opts,Opts}).
-get_opts({_Name,Pid,_ModRef}) ->
- gen_server:call(Pid, get_opts).
+-spec get_opts(Olp) -> options() | {error, busy} when
+ Olp :: atom() | pid() | olp_ref().
+get_opts(Olp) ->
+ call(Olp, get_opts).
+-spec get_default_opts() -> options().
get_default_opts() ->
#{sync_mode_qlen => ?SYNC_MODE_QLEN,
drop_mode_qlen => ?DROP_MODE_QLEN,
@@ -120,11 +149,30 @@ get_default_opts() ->
overload_kill_mem_size => ?OVERLOAD_KILL_MEM_SIZE,
overload_kill_restart_after => ?OVERLOAD_KILL_RESTART_AFTER}.
+-spec restart(fun(() -> any())) -> ok.
restart(Fun) ->
- erlang:display(restarting),
- erlang:display(_ = Fun()),
+ Result =
+ try Fun()
+ catch C:R:S ->
+ {error,{restart_failed,Fun,C,R,S}}
+ end,
+ ?LOG_INTERNAL(debug,[{logger_olp,restart},
+ {result,Result}]),
ok.
+-spec get_ref() -> olp_ref().
+get_ref() ->
+ get(olp_ref).
+
+-spec get_ref(PidOrName) -> olp_ref() | {error, busy} when
+ PidOrName :: pid() | atom().
+get_ref(PidOrName) ->
+ call(PidOrName,get_ref).
+
+-spec get_pid(olp_ref()) -> pid().
+get_pid({_Name,Pid,_ModeRef}) ->
+ Pid.
+
%%%===================================================================
%%% gen_server callbacks
%%%===================================================================
@@ -136,13 +184,15 @@ init([Name,Module,Args,Options]) ->
?init_test_hooks(),
?start_observation(Name),
- try Module:init(Args) of
- {ok,CBState} ->
- try ets:new(Name, [public]) of
- ModeRef ->
+ try ets:new(Name, [public]) of
+ ModeRef ->
+ OlpRef = {Name,self(),ModeRef},
+ put(olp_ref,OlpRef),
+ try Module:init(Args) of
+ {ok,CBState} ->
?set_mode(ModeRef, async),
T0 = ?timestamp(),
- proc_lib:init_ack({ok,self(),{Name,self(),ModeRef}}),
+ proc_lib:init_ack({ok,self(),OlpRef}),
%% Storing options in state to avoid copying
%% (sending) the option data with each message
State0 = ?merge_with_stats(
@@ -156,15 +206,17 @@ init([Name,Module,Args,Options]) ->
burst_msg_count => 0,
cb_state => CBState}),
State = reset_restart_flag(State0),
- gen_server:enter_loop(?MODULE, [], State)
+ gen_server:enter_loop(?MODULE, [], State);
+ Error ->
+ _ = ets:delete(ModeRef),
+ unregister(Name),
+ proc_lib:init_ack(Error)
catch
_:Error ->
+ _ = ets:delete(ModeRef),
unregister(Name),
proc_lib:init_ack(Error)
- end;
- Error ->
- unregister(Name),
- proc_lib:init_ack(Error)
+ end
catch
_:Error ->
unregister(Name),
@@ -177,8 +229,11 @@ handle_call({'$olp_load', Msg}, _From, State) ->
%% Result == ok | dropped
{reply,Result, State1};
+handle_call(get_ref,_From,#{id:=Name,mode_ref:=ModeRef}=State) ->
+ {reply,{Name,self(),ModeRef},State};
+
handle_call({set_opts,Opts0},_From,State) ->
- Opts = maps:merge(get_default_opts(),Opts0),
+ Opts = maps:merge(maps:with(?OPT_KEYS,State),Opts0),
case check_opts(Opts) of
ok ->
{reply, ok, maps:merge(State,Opts)};
@@ -186,6 +241,9 @@ handle_call({set_opts,Opts0},_From,State) ->
{reply, Error, State}
end;
+handle_call(get_opts,_From,State) ->
+ {reply, maps:with(?OPT_KEYS,State), State};
+
handle_call(info, _From, State) ->
{reply, State, State};
@@ -214,7 +272,6 @@ handle_call(Msg, From, #{module:=Module,cb_state:=CBState}=State) ->
%% This is the asynchronous load event.
handle_cast({'$olp_load', Msg}, State) ->
{_Result,State1} = do_load(Msg, cast, State),
- %% Result == ok | dropped
{noreply,State1};
handle_cast(Msg, #{module:=Module, cb_state:=CBState} = State) ->
@@ -230,7 +287,10 @@ handle_info(Msg, #{module := Module, cb_state := CBState} = State) ->
{noreply,CBState1} ->
{noreply,State#{cb_state=>CBState1}};
{noreply,CBState1,Timeout} ->
- {noreply,State#{cb_state=>CBState1},Timeout}
+ {noreply,State#{cb_state=>CBState1},Timeout};
+ {load,CBState1} ->
+ {_,State1} = do_load(Msg, cast, State#{cb_state=>CBState1}),
+ {noreply,State1}
end.
terminate({shutdown,{overloaded,_QLen,_Mem}},
@@ -242,12 +302,11 @@ terminate({shutdown,{overloaded,_QLen,_Mem}},
case try_callback_call(Module,terminate,[overloaded,CBState],ok) of
{ok,Fun} when is_function(Fun,0), is_integer(RestartAfter) ->
set_restart_flag(State),
- timer:apply_after(RestartAfter,?MODULE,restart,[Fun]),
+ _ = timer:apply_after(RestartAfter,?MODULE,restart,[Fun]),
ok;
_ ->
ok
- end,
- ok;
+ end;
terminate(Reason, #{id:=Name, module:=Module, cb_state:=CBState}) ->
_ = try_callback_call(Module,terminate,[Reason,CBState],ok),
unregister(Name),
@@ -259,6 +318,8 @@ code_change(_OldVsn, State, _Extra) ->
%%%-----------------------------------------------------------------
%%% Internal functions
+-spec call(Olp, term()) -> term() | {error,busy} when
+ Olp :: atom() | pid() | olp_ref().
call({_Name, Pid, _ModeRef},Msg) ->
call(Pid, Msg);
call(Server, Msg) ->
@@ -268,6 +329,7 @@ call(Server, Msg) ->
_:{timeout,_} -> {error,busy}
end.
+-spec cast(olp_ref(),term()) -> ok.
cast({_Name, Pid, _ModeRef},Msg) ->
gen_server:cast(Pid, Msg).
@@ -276,26 +338,27 @@ cast({_Name, Pid, _ModeRef},Msg) ->
%% before LogWindowSize events have been handled
do_load(Msg, CallOrCast, State) ->
T1 = ?timestamp(),
+ State1 = ?update_time(T1,State),
%% check if the process is getting overloaded, or if it's
%% recovering from overload (the check must be done for each
%% event to react quickly to large bursts of events and
%% to ensure that the handler can never end up in drop mode
%% with an empty mailbox, which would stop operation)
- {Mode1,QLen,Mem,State1} = check_load(State),
+ {Mode1,QLen,Mem,State2} = check_load(State1),
%% kill the handler if it can't keep up with the load
- kill_if_choked(QLen, Mem, State1),
+ kill_if_choked(QLen, Mem, State2),
if Mode1 == flush ->
- flush(T1, State1);
+ flush(T1, State2);
true ->
- handle_load(Mode1, T1, Msg, CallOrCast, State1)
+ handle_load(Mode1, T1, Msg, CallOrCast, State2)
end.
%% this function is called by do_load/3 after an overload check
%% has been performed, where QLen > FlushQLen
-flush(T1, State=#{id := _Name, last_load_ts := T0, mode_ref := ModeRef}) ->
+flush(T1, State=#{id := _Name, mode := Mode, last_load_ts := T0, mode_ref := ModeRef}) ->
%% flush load messages in the mailbox (a limited number in order
%% to not cause long delays)
NewFlushed = flush_load(?FLUSH_MAX_N),
@@ -306,17 +369,18 @@ flush(T1, State=#{id := _Name, last_load_ts := T0, mode_ref := ModeRef}) ->
%% because of the receive loop when flushing messages, the
%% handler will be scheduled out often and the mailbox could
%% grow very large, so we'd better check the queue again here
- {_,_QLen1} = process_info(self(), message_queue_len),
- ?observe(_Name,{max_qlen,_QLen1}),
+ {_,QLen1} = process_info(self(), message_queue_len),
+ ?observe(_Name,{max_qlen,QLen1}),
%% Add 1 for the current log event
?observe(_Name,{flushed,NewFlushed+1}),
State2 = ?update_max_time(?diff_time(T1,T0),State1),
- State3 = ?update_max_qlen(_QLen1,State2),
+ State3 = ?update_max_qlen(QLen1,State2),
+ State4 = maybe_notify_mode_change(async,QLen1,State3),
{dropped,?update_other(flushed,FLUSHED,NewFlushed,
- State3#{mode => ?set_mode(ModeRef,async),
- last_qlen => 0,
+ State4#{mode => ?change_mode(ModeRef,Mode,async),
+ last_qlen => QLen1,
last_load_ts => T1})}.
%% this function is called to actually handle the message
@@ -334,7 +398,7 @@ handle_load(Mode, T1, Msg, _CallOrCast,
{Result,LastQLen1,CBState1} =
if DoWrite ->
?observe(_Name,{_CallOrCast,1}),
- {ok,CBS} = try_callback_call(Module,handle_load,[Msg,CBState]),
+ CBS = try_callback_call(Module,handle_load,[Msg,CBState]),
{ok,element(2, process_info(self(), message_queue_len)),CBS};
true ->
?observe(_Name,{flushed,1}),
@@ -353,9 +417,10 @@ handle_load(Mode, T1, Msg, _CallOrCast,
State3 =
if (LastQLen1 < ?FILESYNC_OK_QLEN) andalso
(Time > ?IDLE_DETECT_TIME_USEC) ->
- S = notify(idle,State2),
- S#{mode => ?change_mode(ModeRef, Mode, async),
- burst_msg_count => 0};
+ S1 = notify(idle,State2),
+ S2 = maybe_notify_mode_change(async,LastQLen1,S1),
+ S2#{mode => ?change_mode(ModeRef, Mode, async),
+ burst_msg_count => 0};
true ->
State2#{mode => Mode}
end,
@@ -365,7 +430,14 @@ handle_load(Mode, T1, Msg, _CallOrCast,
?update_max_time(Time,
State5#{last_qlen := LastQLen1,
last_load_ts => T1}),
- {Result,State6}.
+ State7 = case Result of
+ ok ->
+ S = ?update_freq(T1,State6),
+ ?update_other(writes,WRITES,1,S);
+ _ ->
+ State6
+ end,
+ {Result,State7}.
%%%-----------------------------------------------------------------
@@ -452,7 +524,7 @@ check_load(State = #{id:=_Name, mode_ref := ModeRef, mode := Mode,
QLen >= DropModeQLen ->
%% Note that drop mode will force load messages to
%% be dropped on the client side (never sent to
- %% the handler).
+ %% the olp process).
IncDrops = if Mode == drop -> 0; true -> 1 end,
{?change_mode(ModeRef, Mode, drop), IncDrops,0};
QLen >= SyncModeQLen ->
@@ -461,10 +533,11 @@ check_load(State = #{id:=_Name, mode_ref := ModeRef, mode := Mode,
{?change_mode(ModeRef, Mode, async), 0,0}
end,
State1 = ?update_other(drops,DROPS,_NewDrops,State),
- State2 = maybe_notify_mode_change(Mode1,State1),
+ State2 = ?update_max_qlen(QLen,State1),
+ State3 = maybe_notify_mode_change(Mode1,QLen,State2),
{Mode1, QLen, Mem,
?update_other(flushes,FLUSHES,_NewFlushes,
- State2#{last_qlen => QLen})}.
+ State3#{last_qlen => QLen})}.
limit_burst(#{burst_limit_enable := false}=State) ->
{true,State};
@@ -517,7 +590,11 @@ flush_load(N, Limit) ->
flush_load(N+1, Limit);
{'$gen_call',{Pid,MRef},{'$olp_load',_}} ->
Pid ! {MRef, dropped},
- flush_load(N+1, Limit)
+ flush_load(N+1, Limit);
+ {log,_,_,_,_} ->
+ flush_load(N+1, Limit);
+ {log,_,_,_} ->
+ flush_load(N+1, Limit)
after
0 -> N
end.
@@ -528,13 +605,13 @@ overload_levels_ok(Options) ->
FQL = maps:get(flush_qlen, Options, ?FLUSH_QLEN),
(DMQL > 1) andalso (SMQL =< DMQL) andalso (DMQL =< FQL).
-maybe_notify_mode_change(drop,#{mode:=Mode0}=State)
+maybe_notify_mode_change(drop,_QLen,#{mode:=Mode0}=State)
when Mode0=/=drop ->
notify({mode_change,Mode0,drop},State);
-maybe_notify_mode_change(Mode1,#{mode:=drop}=State)
+maybe_notify_mode_change(Mode1,_QLen,#{mode:=drop}=State)
when Mode1==async; Mode1==sync ->
notify({mode_change,drop,Mode1},State);
-maybe_notify_mode_change(_,State) ->
+maybe_notify_mode_change(_,_,State) ->
State.
notify(Note,#{module:=Module,cb_state:=CBState}=State) ->
diff --git a/lib/kernel/src/logger_proxy.erl b/lib/kernel/src/logger_proxy.erl
new file mode 100644
index 0000000000..f89891bff0
--- /dev/null
+++ b/lib/kernel/src/logger_proxy.erl
@@ -0,0 +1,152 @@
+%%
+%% %CopyrightBegin%
+%%
+%% Copyright Ericsson AB 2017-2018. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%
+%% %CopyrightEnd%
+%%
+-module(logger_proxy).
+
+%% API
+-export([start_link/0, restart/0, log/2, child_spec/0]).
+
+%% logger_olp callbacks
+-export([init/1, handle_load/2, handle_info/2, terminate/2,
+ notify/2]).
+
+-include("logger_internal.hrl").
+
+-define(SERVER,?MODULE).
+
+%%%-----------------------------------------------------------------
+%%% API
+-spec log(Olp, RemoteLog) -> ok when
+ Olp :: logger_olp:olp_ref(),
+ RemoteLog :: {remote,node(),LogEvent},
+ LogEvent :: {log,Level,Format,Args,Meta} |
+ {log,Level,StringOrReport,Meta},
+ Level :: logger:level(),
+ Format :: io:format(),
+ Args :: list(term()),
+ StringOrReport :: unicode:chardata() | logger:report(),
+ Meta :: logger:metadata().
+log(Olp, RemoteLog) ->
+ case logger_olp:get_pid(Olp) =:= self() of
+ true ->
+ %% This happens when the log event comes from the
+ %% emulator, and the group leader is on a remote node.
+ _ = handle_load(RemoteLog, no_state),
+ ok;
+ false ->
+ logger_olp:load(Olp, RemoteLog)
+ end.
+
+%% Called by supervisor
+-spec start_link() -> {ok,pid(),logger_olp:olp_ref()} | {error,term()}.
+start_link() ->
+ %% Notice that sync_mode is only used when logging to remote node,
+ %% i.e. when the log/2 API function is called.
+ %%
+ %% When receiving log events from the emulator or from a remote
+ %% node, the log event is sent as a message to this process, and
+ %% thus received directly in handle_info/2. This means that the
+ %% mode (async/sync/drop) is not read before the message is
+ %% sent. Thus sync mode is never entered, and drop mode is
+ %% implemented by setting the system_logger flag to undefined (see
+ %% notify/2)
+ %%
+ %% Burst limit is disabled, since this is only a proxy and we
+ %% don't want to limit bursts twice (here and in the handler).
+ Opts = #{sync_mode_qlen=>500,
+ drop_mode_qlen=>1000,
+ flush_qlen=>5000,
+ burst_limit_enable=>false},
+ logger_olp:start_link(?SERVER,?MODULE,[],Opts).
+
+%% Fun used for restarting this process after it has been killed due
+%% to overload (must set overload_kill_enable=>true in opts)
+restart() ->
+ case supervisor:start_child(logger_sup, child_spec()) of
+ {ok,_Pid,Olp} ->
+ {ok,Olp};
+ {error,{Reason,Ch}} when is_tuple(Ch), element(1,Ch)==child ->
+ {error,Reason};
+ Error ->
+ Error
+ end.
+
+%% Called internally and by logger_sup
+child_spec() ->
+ Name = ?SERVER,
+ #{id => Name,
+ start => {?MODULE, start_link, []},
+ restart => temporary,
+ shutdown => 2000,
+ type => worker,
+ modules => [?MODULE]}.
+
+%%%===================================================================
+%%% gen_server callbacks
+%%%===================================================================
+
+init([]) ->
+ process_flag(trap_exit, true),
+ _ = erlang:system_flag(system_logger,self()),
+ logger_server:set_proxy_ref(logger_olp:get_ref()),
+ {ok,no_state}.
+
+%% Log event to send to the node where the group leader of it's client resides
+handle_load({remote,Node,Log},State) ->
+ %% If the connection is overloaded (send_nosuspend returns false),
+ %% we drop the message.
+ _ = erlang:send_nosuspend({?SERVER,Node},Log),
+ State;
+%% Log event to log on this node
+handle_load({log,Level,Format,Args,Meta},State) ->
+ try_log([Level,Format,Args,Meta]),
+ State;
+handle_load({log,Level,Report,Meta},State) ->
+ try_log([Level,Report,Meta]),
+ State.
+
+%% Log event sent to this process e.g. from the emulator - it is really load
+handle_info(Log,State) when is_tuple(Log), element(1,Log)==log ->
+ {load,State}.
+
+terminate(overloaded, _State) ->
+ _ = erlang:system_flag(system_logger,undefined),
+ {ok,fun ?MODULE:restart/0};
+terminate(_Reason, _State) ->
+ _ = erlang:system_flag(system_logger,whereis(logger)),
+ ok.
+
+notify({mode_change,_Mode0,drop},State) ->
+ _ = erlang:system_flag(system_logger,undefined),
+ State;
+notify({mode_change,drop,_Mode1},State) ->
+ _ = erlang:system_flag(system_logger,self()),
+ State;
+notify(_Note,State) ->
+ State.
+
+%%%-----------------------------------------------------------------
+%%% Internal functions
+try_log(Args) ->
+ try apply(logger,log,Args)
+ catch C:R:S ->
+ ?LOG_INTERNAL(debug,[{?MODULE,log_failed},
+ {log,Args},
+ {reason,{C,R,S}}])
+ end.
diff --git a/lib/kernel/src/logger_server.erl b/lib/kernel/src/logger_server.erl
index b7735dbcf7..c58edf51f8 100644
--- a/lib/kernel/src/logger_server.erl
+++ b/lib/kernel/src/logger_server.erl
@@ -22,7 +22,7 @@
-behaviour(gen_server).
%% API
--export([start_link/0,
+-export([start_link/0, set_proxy_ref/1, get_proxy_ref/1,
add_handler/3, remove_handler/1,
add_filter/2, remove_filter/2,
set_module_level/2, unset_module_level/0,
@@ -43,7 +43,7 @@
-define(SERVER, logger).
-define(LOGGER_SERVER_TAG, '$logger_cb_process').
--record(state, {tid, async_req, async_req_queue}).
+-record(state, {tid, async_req, async_req_queue, remote_logger}).
%%%===================================================================
%%% API
@@ -52,6 +52,14 @@
start_link() ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
+-spec set_proxy_ref(logger_olp:olp_ref()) -> ok.
+set_proxy_ref(ProxyRef) ->
+ call({set_proxy_ref,ProxyRef}).
+
+-spec get_proxy_ref(ets:tid()) -> logger_olp:olp_ref().
+get_proxy_ref(Tid) ->
+ ets:lookup_element(Tid,proxy_ref,2).
+
add_handler(Id,Module,Config0) ->
try {check_id(Id),check_mod(Module)} of
{ok,ok} ->
@@ -311,7 +319,10 @@ handle_call({set_module_level,Modules,Level}, _From, #state{tid=Tid}=State) ->
{reply,Reply,State};
handle_call({unset_module_level,Modules}, _From, #state{tid=Tid}=State) ->
Reply = logger_config:unset_module_level(Tid,Modules),
- {reply,Reply,State}.
+ {reply,Reply,State};
+handle_call({set_proxy_ref,ProxyRef},_From,#state{tid=Tid}=State) ->
+ true = ets:insert(Tid,{proxy_ref,ProxyRef}),
+ {reply,ok,State}.
handle_cast({async_req_reply,_Ref,_Reply} = Reply,State) ->
call_h_reply(Reply,State);
@@ -357,7 +368,7 @@ terminate(_Reason, _State) ->
%%%===================================================================
%%% Internal functions
%%%===================================================================
-call(Request) ->
+call(Request) when is_tuple(Request) ->
Action = element(1,Request),
case get(?LOGGER_SERVER_TAG) of
true when
@@ -369,6 +380,7 @@ call(Request) ->
gen_server:call(?SERVER,Request,?DEFAULT_LOGGER_CALL_TIMEOUT)
end.
+
do_add_filter(Tid,Id,{FId,_} = Filter) ->
case logger_config:get(Tid,Id) of
{ok,Config} ->
diff --git a/lib/kernel/src/logger_sup.erl b/lib/kernel/src/logger_sup.erl
index 3d6f482e20..9ea8558a16 100644
--- a/lib/kernel/src/logger_sup.erl
+++ b/lib/kernel/src/logger_sup.erl
@@ -50,7 +50,9 @@ init([]) ->
start => {logger_handler_watcher, start_link, []},
shutdown => brutal_kill},
- {ok, {SupFlags, [Watcher]}}.
+ Proxy = logger_proxy:child_spec(),
+
+ {ok, {SupFlags, [Watcher,Proxy]}}.
%%%===================================================================
%%% Internal functions
diff --git a/lib/kernel/test/logger.cover b/lib/kernel/test/logger.cover
index a9ef81903d..9691aa295e 100644
--- a/lib/kernel/test/logger.cover
+++ b/lib/kernel/test/logger.cover
@@ -9,6 +9,7 @@
logger_handler_watcher,
logger_h_common,
logger_olp,
+ logger_proxy,
logger_server,
logger_simple_h,
logger_std_h,