From 32d7b91b120d375a0cea04318deaababeb69bb03 Mon Sep 17 00:00:00 2001 From: "Paul J. Davis" Date: Mon, 24 Jul 2017 12:03:20 -0500 Subject: Reduce message passing through rexi_server There's no need to have rexi_server handle every process exit. There's already an exception handler and workers can report on their own exit reasons selectively. This also changes job tracking into a concurrent ets update so if an RPC worker doesn't die in rexi code then rexi_server will only handle half of the message passing load since we no longer have to handle process exits. --- src/rexi/src/rexi_server.erl | 129 +++++++++++++++++++++---------------------- 1 file changed, 62 insertions(+), 67 deletions(-) diff --git a/src/rexi/src/rexi_server.erl b/src/rexi/src/rexi_server.erl index 4b0362843..ab9cba763 100644 --- a/src/rexi/src/rexi_server.erl +++ b/src/rexi/src/rexi_server.erl @@ -16,30 +16,33 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --export([start_link/1, init_p/2, init_p/3]). +-export([start_link/1, init_p/5]). -record(job, { - client::reference(), - worker::reference(), + client_ref::reference(), client_pid::pid(), worker_pid::pid() }). --record(error, { - reason, - stack -}). - -record(st, { - workers = ets:new(workers, [private, {keypos, #job.worker}]), - clients = ets:new(clients, [private, {keypos, #job.client}]) + clients = ets:new(clients, [ + public, + {write_concurrency, true}, + {keypos, #job.client_ref} + ]), + workers = ets:new(workers, [ + public, + {write_concurrency, true}, + {keypos, #job.worker_pid} + ]) }). start_link(ServerId) -> gen_server:start_link({local, ServerId}, ?MODULE, [], []). init([]) -> + process_flag(trap_exit, true), {ok, #st{}}. @@ -51,83 +54,70 @@ handle_cast({doit, From, MFA}, St) -> handle_cast({doit, From, undefined, MFA}, St); handle_cast({doit, {ClientPid, ClientRef} = From, Nonce, MFA}, State) -> - {LocalPid, Ref} = spawn_monitor(?MODULE, init_p, [From, MFA, Nonce]), - Job = #job{ - client = ClientRef, - worker = Ref, + LocalPid = spawn_link(?MODULE, init_p, [self(), State, From, MFA, Nonce]), + add_job(State, #job{ + client_ref = ClientRef, client_pid = ClientPid, worker_pid = LocalPid - }, - {noreply, add_job(Job, State)}; - - -handle_cast({kill, FromRef}, #st{clients = Clients} = St) -> - case find_worker(FromRef, Clients) of - #job{worker = KeyRef, worker_pid = Pid} = Job -> - erlang:demonitor(KeyRef), - exit(Pid, kill), - {noreply, remove_job(Job, St)}; - false -> - {noreply, St} + }), + {noreply, State}; + + +handle_cast({kill, ClientRef}, #st{clients = Clients} = St) -> + case ets:lookup(Clients, ClientRef) of + [#job{worker_pid = Pid} = Job] -> + erlang:unlink(Pid), + exit(Pid, kill), + {noreply, remove_job(St, Job)}; + [] -> + {noreply, St} end; handle_cast(_, St) -> couch_log:notice("rexi_server ignored_cast", []), {noreply, St}. -handle_info({'DOWN', Ref, process, _, normal}, #st{workers=Workers} = St) -> - case find_worker(Ref, Workers) of - #job{} = Job -> - {noreply, remove_job(Job, St)}; - false -> - {noreply, St} - end; - -handle_info({'DOWN', Ref, process, Pid, Error}, #st{workers=Workers} = St) -> - case find_worker(Ref, Workers) of - #job{worker_pid=Pid, worker=Ref, client_pid=CPid, client=CRef} =Job -> - Msg = case Error of - #error{reason = {_Class, Reason}, stack = Stack} -> - {Reason, Stack}; - Else -> - Else - end, - notify_caller({CPid, CRef}, Msg), - {noreply, remove_job(Job, St)}; - false -> - {noreply, St} +handle_info({'EXIT', Pid, Error}, #st{workers = Workers} = St) -> + case ets:lookup(Workers, Pid) of + [#job{client_pid = CPid, client_ref = CRef} = Job] -> + notify_caller({CPid, CRef}, Error), + {noreply, remove_job(St, Job)}; + [] -> + {noreply, St} end; handle_info(_Info, St) -> {noreply, St}. terminate(_Reason, St) -> - ets:foldl(fun(#job{worker_pid=Pid},_) -> exit(Pid,kill) end, nil, - St#st.workers), + ets:foldl(fun(#job{worker_pid = Pid},_) -> + exit(Pid,kill) + end, nil, St#st.workers), ok. code_change(_OldVsn, #st{}=State, _Extra) -> {ok, State}. -init_p(From, MFA) -> - init_p(From, MFA, undefined). - %% @doc initializes a process started by rexi_server. --spec init_p({pid(), reference()}, {atom(), atom(), list()}, +-spec init_p(pid(), #st{}, {pid(), reference()}, {atom(), atom(), list()}, string() | undefined) -> any(). -init_p(From, {M,F,A}, Nonce) -> +init_p(Parent, State, From, {M,F,A}, Nonce) -> put(rexi_from, From), put('$initial_call', {M,F,length(A)}), put(nonce, Nonce), - try apply(M, F, A) catch exit:normal -> ok; Class:Reason -> - Stack = clean_stack(), - Args = [M, F, length(A), Class, Reason, Stack], - couch_log:error("rexi_server ~s:~s/~b :: ~p:~p ~100p", Args), - exit(#error{ - reason = {Class, Reason}, - stack = Stack - }) - end. + try + apply(M, F, A) + catch + exit:normal -> + remove_worker(State, self()); + Class:Reason -> + Stack = clean_stack(), + Args = [M, F, length(A), Class, Reason, Stack], + couch_log:error("rexi_server ~s:~s/~b :: ~p:~p ~100p", Args), + notify_caller(From, {Reason, Stack}), + remove_worker(State, self()) + end, + unlink(Parent). %% internal @@ -135,18 +125,23 @@ clean_stack() -> lists:map(fun({M,F,A}) when is_list(A) -> {M,F,length(A)}; (X) -> X end, erlang:get_stacktrace()). -add_job(Job, #st{workers = Workers, clients = Clients} = State) -> +add_job(#st{workers = Workers, clients = Clients} = State, Job) -> ets:insert(Workers, Job), ets:insert(Clients, Job), State. -remove_job(Job, #st{workers = Workers, clients = Clients} = State) -> +remove_job(#st{workers = Workers, clients = Clients} = State, Job) -> ets:delete_object(Workers, Job), ets:delete_object(Clients, Job), State. -find_worker(Ref, Tab) -> - case ets:lookup(Tab, Ref) of [] -> false; [Worker] -> Worker end. +remove_worker(#st{workers = Workers} = State, Pid) -> + case ets:lookup(Workers, Pid) of + [Job] -> + remove_job(State, Job); + [] -> + State + end. notify_caller({Caller, Ref}, Reason) -> rexi_utils:send(Caller, {Ref, {rexi_EXIT, Reason}}). -- cgit v1.2.1