summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPaul J. Davis <paul.joseph.davis@gmail.com>2017-07-24 12:03:20 -0500
committerPaul J. Davis <paul.joseph.davis@gmail.com>2017-07-24 12:03:20 -0500
commit32d7b91b120d375a0cea04318deaababeb69bb03 (patch)
treec5ed26401e36037a5c5663bb83b48a46507c497b
parent5c478ee9ee867bbe2f4deb23cf0350983160dd5e (diff)
downloadcouchdb-optimize-rexi.tar.gz
Reduce message passing through rexi_serveroptimize-rexi
There's no need to have rexi_server handle every process exit. There's already an exception handler and workers can report on their own exit reasons selectively. This also changes job tracking into a concurrent ets update so if an RPC worker doesn't die in rexi code then rexi_server will only handle half of the message passing load since we no longer have to handle process exits.
-rw-r--r--src/rexi/src/rexi_server.erl129
1 files changed, 62 insertions, 67 deletions
diff --git a/src/rexi/src/rexi_server.erl b/src/rexi/src/rexi_server.erl
index 4b0362843..ab9cba763 100644
--- a/src/rexi/src/rexi_server.erl
+++ b/src/rexi/src/rexi_server.erl
@@ -16,30 +16,33 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3]).
--export([start_link/1, init_p/2, init_p/3]).
+-export([start_link/1, init_p/5]).
-record(job, {
- client::reference(),
- worker::reference(),
+ client_ref::reference(),
client_pid::pid(),
worker_pid::pid()
}).
--record(error, {
- reason,
- stack
-}).
-
-record(st, {
- workers = ets:new(workers, [private, {keypos, #job.worker}]),
- clients = ets:new(clients, [private, {keypos, #job.client}])
+ clients = ets:new(clients, [
+ public,
+ {write_concurrency, true},
+ {keypos, #job.client_ref}
+ ]),
+ workers = ets:new(workers, [
+ public,
+ {write_concurrency, true},
+ {keypos, #job.worker_pid}
+ ])
}).
start_link(ServerId) ->
gen_server:start_link({local, ServerId}, ?MODULE, [], []).
init([]) ->
+ process_flag(trap_exit, true),
{ok, #st{}}.
@@ -51,83 +54,70 @@ handle_cast({doit, From, MFA}, St) ->
handle_cast({doit, From, undefined, MFA}, St);
handle_cast({doit, {ClientPid, ClientRef} = From, Nonce, MFA}, State) ->
- {LocalPid, Ref} = spawn_monitor(?MODULE, init_p, [From, MFA, Nonce]),
- Job = #job{
- client = ClientRef,
- worker = Ref,
+ LocalPid = spawn_link(?MODULE, init_p, [self(), State, From, MFA, Nonce]),
+ add_job(State, #job{
+ client_ref = ClientRef,
client_pid = ClientPid,
worker_pid = LocalPid
- },
- {noreply, add_job(Job, State)};
-
-
-handle_cast({kill, FromRef}, #st{clients = Clients} = St) ->
- case find_worker(FromRef, Clients) of
- #job{worker = KeyRef, worker_pid = Pid} = Job ->
- erlang:demonitor(KeyRef),
- exit(Pid, kill),
- {noreply, remove_job(Job, St)};
- false ->
- {noreply, St}
+ }),
+ {noreply, State};
+
+
+handle_cast({kill, ClientRef}, #st{clients = Clients} = St) ->
+ case ets:lookup(Clients, ClientRef) of
+ [#job{worker_pid = Pid} = Job] ->
+ erlang:unlink(Pid),
+ exit(Pid, kill),
+ {noreply, remove_job(St, Job)};
+ [] ->
+ {noreply, St}
end;
handle_cast(_, St) ->
couch_log:notice("rexi_server ignored_cast", []),
{noreply, St}.
-handle_info({'DOWN', Ref, process, _, normal}, #st{workers=Workers} = St) ->
- case find_worker(Ref, Workers) of
- #job{} = Job ->
- {noreply, remove_job(Job, St)};
- false ->
- {noreply, St}
- end;
-
-handle_info({'DOWN', Ref, process, Pid, Error}, #st{workers=Workers} = St) ->
- case find_worker(Ref, Workers) of
- #job{worker_pid=Pid, worker=Ref, client_pid=CPid, client=CRef} =Job ->
- Msg = case Error of
- #error{reason = {_Class, Reason}, stack = Stack} ->
- {Reason, Stack};
- Else ->
- Else
- end,
- notify_caller({CPid, CRef}, Msg),
- {noreply, remove_job(Job, St)};
- false ->
- {noreply, St}
+handle_info({'EXIT', Pid, Error}, #st{workers = Workers} = St) ->
+ case ets:lookup(Workers, Pid) of
+ [#job{client_pid = CPid, client_ref = CRef} = Job] ->
+ notify_caller({CPid, CRef}, Error),
+ {noreply, remove_job(St, Job)};
+ [] ->
+ {noreply, St}
end;
handle_info(_Info, St) ->
{noreply, St}.
terminate(_Reason, St) ->
- ets:foldl(fun(#job{worker_pid=Pid},_) -> exit(Pid,kill) end, nil,
- St#st.workers),
+ ets:foldl(fun(#job{worker_pid = Pid},_) ->
+ exit(Pid,kill)
+ end, nil, St#st.workers),
ok.
code_change(_OldVsn, #st{}=State, _Extra) ->
{ok, State}.
-init_p(From, MFA) ->
- init_p(From, MFA, undefined).
-
%% @doc initializes a process started by rexi_server.
--spec init_p({pid(), reference()}, {atom(), atom(), list()},
+-spec init_p(pid(), #st{}, {pid(), reference()}, {atom(), atom(), list()},
string() | undefined) -> any().
-init_p(From, {M,F,A}, Nonce) ->
+init_p(Parent, State, From, {M,F,A}, Nonce) ->
put(rexi_from, From),
put('$initial_call', {M,F,length(A)}),
put(nonce, Nonce),
- try apply(M, F, A) catch exit:normal -> ok; Class:Reason ->
- Stack = clean_stack(),
- Args = [M, F, length(A), Class, Reason, Stack],
- couch_log:error("rexi_server ~s:~s/~b :: ~p:~p ~100p", Args),
- exit(#error{
- reason = {Class, Reason},
- stack = Stack
- })
- end.
+ try
+ apply(M, F, A)
+ catch
+ exit:normal ->
+ remove_worker(State, self());
+ Class:Reason ->
+ Stack = clean_stack(),
+ Args = [M, F, length(A), Class, Reason, Stack],
+ couch_log:error("rexi_server ~s:~s/~b :: ~p:~p ~100p", Args),
+ notify_caller(From, {Reason, Stack}),
+ remove_worker(State, self())
+ end,
+ unlink(Parent).
%% internal
@@ -135,18 +125,23 @@ clean_stack() ->
lists:map(fun({M,F,A}) when is_list(A) -> {M,F,length(A)}; (X) -> X end,
erlang:get_stacktrace()).
-add_job(Job, #st{workers = Workers, clients = Clients} = State) ->
+add_job(#st{workers = Workers, clients = Clients} = State, Job) ->
ets:insert(Workers, Job),
ets:insert(Clients, Job),
State.
-remove_job(Job, #st{workers = Workers, clients = Clients} = State) ->
+remove_job(#st{workers = Workers, clients = Clients} = State, Job) ->
ets:delete_object(Workers, Job),
ets:delete_object(Clients, Job),
State.
-find_worker(Ref, Tab) ->
- case ets:lookup(Tab, Ref) of [] -> false; [Worker] -> Worker end.
+remove_worker(#st{workers = Workers} = State, Pid) ->
+ case ets:lookup(Workers, Pid) of
+ [Job] ->
+ remove_job(State, Job);
+ [] ->
+ State
+ end.
notify_caller({Caller, Ref}, Reason) ->
rexi_utils:send(Caller, {Ref, {rexi_EXIT, Reason}}).