diff options
author | Nick Vatamaniuc <vatamane@gmail.com> | 2021-11-20 01:00:08 -0500 |
---|---|---|
committer | Nick Vatamaniuc <nickva@users.noreply.github.com> | 2021-11-22 17:31:31 -0500 |
commit | b78ccf18cb4ed6e183ed0bf4e5cbe40d7a7dc493 (patch) | |
tree | 82158f0b6c7e97e6955bf0c558aac6eb0329b410 /src/rexi | |
parent | 6e87e43fae23647b281ede250ad9f1a68a8f1cde (diff) | |
download | couchdb-b78ccf18cb4ed6e183ed0bf4e5cbe40d7a7dc493.tar.gz |
Apply erlfmt formatting to source tree
These exceptions from main were ported over to 3.x
```
--- a/src/chttpd/src/chttpd.erl
+++ b/src/chttpd/src/chttpd.erl
@@ -491,6 +491,7 @@ extract_cookie(#httpd{mochi_req = MochiReq}) ->
end.
%%% end hack
+%% erlfmt-ignore
set_auth_handlers() ->
AuthenticationDefault = "{chttpd_auth, cookie_authentication_handler},
```
```
--- a/src/couch/src/couch_debug.erl
+++ b/src/couch/src/couch_debug.erl
@@ -49,6 +49,7 @@ help() ->
].
-spec help(Function :: atom()) -> ok.
+%% erlfmt-ignore
help(opened_files) ->
```
Diffstat (limited to 'src/rexi')
-rw-r--r-- | src/rexi/src/rexi.erl | 79 | ||||
-rw-r--r-- | src/rexi/src/rexi_app.erl | 1 | ||||
-rw-r--r-- | src/rexi/src/rexi_buffer.erl | 29 | ||||
-rw-r--r-- | src/rexi/src/rexi_monitor.erl | 24 | ||||
-rw-r--r-- | src/rexi/src/rexi_server.erl | 132 | ||||
-rw-r--r-- | src/rexi/src/rexi_server_mon.erl | 54 | ||||
-rw-r--r-- | src/rexi/src/rexi_server_sup.erl | 3 | ||||
-rw-r--r-- | src/rexi/src/rexi_sup.erl | 87 | ||||
-rw-r--r-- | src/rexi/src/rexi_utils.erl | 102 |
9 files changed, 263 insertions, 248 deletions
diff --git a/src/rexi/src/rexi.erl b/src/rexi/src/rexi.erl index 170503b7c..fb1763e51 100644 --- a/src/rexi/src/rexi.erl +++ b/src/rexi/src/rexi.erl @@ -30,8 +30,8 @@ stop() -> application:stop(rexi). restart() -> - stop(), start(). - + stop(), + start(). %% @equiv cast(Node, self(), MFA) -spec cast(node(), {atom(), atom(), list()}) -> reference(). @@ -86,22 +86,34 @@ kill_all(NodeRefs) when is_list(NodeRefs) -> %% configure the cluster to send kill_all messages. case config:get_boolean("rexi", "use_kill_all", false) of true -> - PerNodeMap = lists:foldl(fun({Node, Ref}, Acc) -> - maps:update_with(Node, fun(Refs) -> - [Ref | Refs] - end, [Ref], Acc) - end, #{}, NodeRefs), - maps:map(fun(Node, Refs) -> - ServerPid = rexi_utils:server_pid(Node), - rexi_utils:send(ServerPid, cast_msg({kill_all, Refs})) - end, PerNodeMap); + PerNodeMap = lists:foldl( + fun({Node, Ref}, Acc) -> + maps:update_with( + Node, + fun(Refs) -> + [Ref | Refs] + end, + [Ref], + Acc + ) + end, + #{}, + NodeRefs + ), + maps:map( + fun(Node, Refs) -> + ServerPid = rexi_utils:server_pid(Node), + rexi_utils:send(ServerPid, cast_msg({kill_all, Refs})) + end, + PerNodeMap + ); false -> lists:foreach(fun({Node, Ref}) -> kill(Node, Ref) end, NodeRefs) end, ok. %% @equiv async_server_call(Server, self(), Request) --spec async_server_call(pid() | {atom(),node()}, any()) -> reference(). +-spec async_server_call(pid() | {atom(), node()}, any()) -> reference(). async_server_call(Server, Request) -> async_server_call(Server, self(), Request). @@ -110,17 +122,17 @@ async_server_call(Server, Request) -> %% function acts more like cast() than call() in that the server process %% is not monitored. Clients who want to know if the server is alive should %% monitor it themselves before calling this function. --spec async_server_call(pid() | {atom(),node()}, pid(), any()) -> reference(). +-spec async_server_call(pid() | {atom(), node()}, pid(), any()) -> reference(). async_server_call(Server, Caller, Request) -> Ref = make_ref(), - rexi_utils:send(Server, {'$gen_call', {Caller,Ref}, Request}), + rexi_utils:send(Server, {'$gen_call', {Caller, Ref}, Request}), Ref. %% @doc convenience function to reply to the original rexi Caller. -spec reply(any()) -> any(). reply(Reply) -> {Caller, Ref} = get(rexi_from), - erlang:send(Caller, {Ref,Reply}). + erlang:send(Caller, {Ref, Reply}). %% @equiv sync_reply(Reply, 300000) sync_reply(Reply) -> @@ -133,9 +145,10 @@ sync_reply(Reply) -> sync_reply(Reply, Timeout) -> {Caller, Ref} = get(rexi_from), Tag = make_ref(), - erlang:send(Caller, {Ref, {self(),Tag}, Reply}), - receive {Tag, Response} -> - Response + erlang:send(Caller, {Ref, {self(), Tag}, Reply}), + receive + {Tag, Response} -> + Response after Timeout -> timeout end. @@ -174,7 +187,7 @@ stream_init(Timeout) -> %% sending messages. The `From` should be the value provided by %% the worker in the rexi_STREAM_INIT message. -spec stream_start({pid(), any()}) -> ok. -stream_start({Pid, _Tag}=From) when is_pid(Pid) -> +stream_start({Pid, _Tag} = From) when is_pid(Pid) -> gen_server:reply(From, rexi_STREAM_START). %% @doc Cancel a worker stream @@ -184,7 +197,7 @@ stream_start({Pid, _Tag}=From) when is_pid(Pid) -> %% The `From` should be the value provided by the worker in the %% rexi_STREAM_INIT message. -spec stream_cancel({pid(), any()}) -> ok. -stream_cancel({Pid, _Tag}=From) when is_pid(Pid) -> +stream_cancel({Pid, _Tag} = From) when is_pid(Pid) -> gen_server:reply(From, rexi_STREAM_CANCEL). %% @equiv stream(Msg, 100, 300000) @@ -202,13 +215,14 @@ stream(Msg, Limit) -> stream(Msg, Limit, Timeout) -> try maybe_wait(Limit, Timeout) of {ok, Count} -> - put(rexi_unacked, Count+1), + put(rexi_unacked, Count + 1), {Caller, Ref} = get(rexi_from), erlang:send(Caller, {Ref, self(), Msg}), ok - catch throw:timeout -> - couch_stats:increment_counter([rexi, streams, timeout, stream]), - exit(timeout) + catch + throw:timeout -> + couch_stats:increment_counter([rexi, streams, timeout, stream]), + exit(timeout) end. %% @equiv stream2(Msg, 5, 300000) @@ -230,13 +244,14 @@ stream2(Msg, Limit, Timeout) -> maybe_init_stream(Timeout), try maybe_wait(Limit, Timeout) of {ok, Count} -> - put(rexi_unacked, Count+1), + put(rexi_unacked, Count + 1), {Caller, Ref} = get(rexi_from), erlang:send(Caller, {Ref, self(), Msg}), ok - catch throw:timeout -> - couch_stats:increment_counter([rexi, streams, timeout, stream]), - exit(timeout) + catch + throw:timeout -> + couch_stats:increment_counter([rexi, streams, timeout, stream]), + exit(timeout) end. %% @equiv stream_last(Msg, 300000) @@ -259,14 +274,12 @@ stream_ack(Client) -> stream_ack(Client, N) -> erlang:send(Client, {rexi_ack, N}). - %% Sends a ping message to the coordinator. This is for long running %% operations on a node that could exceed the rexi timeout -ping() -> +ping() -> {Caller, _} = get(rexi_from), erlang:send(Caller, {rexi, '$rexi_ping'}). - %% internal functions %% cast_msg(Msg) -> {'$gen_cast', Msg}. @@ -304,7 +317,7 @@ maybe_wait(Limit, Timeout) -> wait_for_ack(Count, Timeout) -> receive - {rexi_ack, N} -> drain_acks(Count-N) + {rexi_ack, N} -> drain_acks(Count - N) after Timeout -> couch_stats:increment_counter([rexi, streams, timeout, wait_for_ack]), throw(timeout) @@ -314,7 +327,7 @@ drain_acks(Count) when Count < 0 -> erlang:error(mismatched_rexi_ack); drain_acks(Count) -> receive - {rexi_ack, N} -> drain_acks(Count-N) + {rexi_ack, N} -> drain_acks(Count - N) after 0 -> {ok, Count} end. diff --git a/src/rexi/src/rexi_app.erl b/src/rexi/src/rexi_app.erl index 0f1e892b5..61e7886e1 100644 --- a/src/rexi/src/rexi_app.erl +++ b/src/rexi/src/rexi_app.erl @@ -14,7 +14,6 @@ -behaviour(application). -export([start/2, stop/1]). - start(_Type, StartArgs) -> rexi_sup:start_link(StartArgs). diff --git a/src/rexi/src/rexi_buffer.erl b/src/rexi/src/rexi_buffer.erl index d16dc8ba3..7f0079f03 100644 --- a/src/rexi/src/rexi_buffer.erl +++ b/src/rexi/src/rexi_buffer.erl @@ -15,10 +15,16 @@ -vsn(1). % gen_server callbacks --export([init/1, handle_call/3, handle_cast/2, handle_info/2, - terminate/2, code_change/3]). +-export([ + init/1, + handle_call/3, + handle_cast/2, + handle_info/2, + terminate/2, + code_change/3 +]). --export ([ +-export([ send/2, start_link/1 ]). @@ -37,7 +43,6 @@ send(Dest, Msg) -> Server = list_to_atom(lists:concat([rexi_buffer, "_", get_node(Dest)])), gen_server:cast(Server, {deliver, Dest, Msg}). - init(_) -> %% TODO Leverage os_mon to discover available memory in the system Max = list_to_integer(config:get("rexi", "buffer_count", "2000")), @@ -45,7 +50,6 @@ init(_) -> handle_call(erase_buffer, _From, State) -> {reply, ok, State#state{buffer = queue:new(), count = 0}, 0}; - handle_call(get_buffered_count, _From, State) -> {reply, State#state.count, State, 0}. @@ -53,19 +57,19 @@ handle_cast({deliver, Dest, Msg}, #state{buffer = Q, count = C} = State) -> couch_stats:increment_counter([rexi, buffered]), Q2 = queue:in({Dest, Msg}, Q), case should_drop(State) of - true -> - couch_stats:increment_counter([rexi, dropped]), + true -> + couch_stats:increment_counter([rexi, dropped]), {noreply, State#state{buffer = queue:drop(Q2)}, 0}; - false -> - {noreply, State#state{buffer = Q2, count = C+1}, 0} + false -> + {noreply, State#state{buffer = Q2, count = C + 1}, 0} end. -handle_info(timeout, #state{sender = nil, buffer = {[],[]}, count = 0}=State) -> +handle_info(timeout, #state{sender = nil, buffer = {[], []}, count = 0} = State) -> {noreply, State}; handle_info(timeout, #state{sender = nil, count = C} = State) when C > 0 -> #state{buffer = Q, count = C} = State, {{value, {Dest, Msg}}, Q2} = queue:out_r(Q), - NewState = State#state{buffer = Q2, count = C-1}, + NewState = State#state{buffer = Q2, count = C - 1}, case erlang:send(Dest, Msg, [noconnect, nosuspend]) of ok when C =:= 1 -> % We just sent the last queued messsage, we'll use this opportunity @@ -82,7 +86,6 @@ handle_info(timeout, #state{sender = nil, count = C} = State) when C > 0 -> handle_info(timeout, State) -> % Waiting on a sender to return {noreply, State}; - handle_info({'DOWN', Ref, _, Pid, _}, #state{sender = {Pid, Ref}} = State) -> {noreply, State#state{sender = nil}, 0}. @@ -91,7 +94,7 @@ terminate(_Reason, _State) -> code_change(_OldVsn, {state, Buffer, Sender, Count}, _Extra) -> Max = list_to_integer(config:get("rexi", "buffer_count", "2000")), - {ok, #state{buffer=Buffer, sender=Sender, count=Count, max_count=Max}}; + {ok, #state{buffer = Buffer, sender = Sender, count = Count, max_count = Max}}; code_change(_OldVsn, State, _Extra) -> {ok, State}. diff --git a/src/rexi/src/rexi_monitor.erl b/src/rexi/src/rexi_monitor.erl index f90ec5160..7fe66db71 100644 --- a/src/rexi/src/rexi_monitor.erl +++ b/src/rexi/src/rexi_monitor.erl @@ -14,16 +14,17 @@ -export([start/1, stop/1]). -export([wait_monitors/1]). - %% @doc spawn_links a process which monitors the supplied list of items and %% returns the process ID. If a monitored process exits, the caller will %% receive a {rexi_DOWN, MonitoringPid, DeadPid, Reason} message. --spec start([pid() | atom() | {atom(),node()}]) -> pid(). +-spec start([pid() | atom() | {atom(), node()}]) -> pid(). start(Procs) -> Parent = self(), Nodes = [node() | nodes()], - {Mon, Skip} = lists:partition(fun(P) -> should_monitor(P, Nodes) end, - Procs), + {Mon, Skip} = lists:partition( + fun(P) -> should_monitor(P, Nodes) end, + Procs + ), spawn_link(fun() -> [notify_parent(Parent, P, noconnect) || P <- Skip], [erlang:monitor(process, P) || P <- Mon], @@ -50,16 +51,17 @@ should_monitor({_, Node}, Nodes) -> wait_monitors(Parent) -> receive - {'DOWN', _, process, Pid, Reason} -> - notify_parent(Parent, Pid, Reason), - ?MODULE:wait_monitors(Parent); - {Parent, shutdown} -> - ok + {'DOWN', _, process, Pid, Reason} -> + notify_parent(Parent, Pid, Reason), + ?MODULE:wait_monitors(Parent); + {Parent, shutdown} -> + ok end. flush_down_messages() -> - receive {rexi_DOWN, _, _, _} -> - flush_down_messages() + receive + {rexi_DOWN, _, _, _} -> + flush_down_messages() after 0 -> ok end. diff --git a/src/rexi/src/rexi_server.erl b/src/rexi/src/rexi_server.erl index 4cd9ce66e..47c128d7b 100644 --- a/src/rexi/src/rexi_server.erl +++ b/src/rexi/src/rexi_server.erl @@ -13,8 +13,14 @@ -module(rexi_server). -behaviour(gen_server). -vsn(1). --export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, - code_change/3]). +-export([ + init/1, + handle_call/3, + handle_cast/2, + handle_info/2, + terminate/2, + code_change/3 +]). -export([start_link/1, init_p/2, init_p/3]). @@ -24,10 +30,10 @@ -include_lib("rexi/include/rexi.hrl"). -record(job, { - client::reference(), - worker::reference(), - client_pid::pid(), - worker_pid::pid() + client :: reference(), + worker :: reference(), + client_pid :: pid(), + worker_pid :: pid() }). -record(st, { @@ -47,30 +53,27 @@ init([]) -> handle_call(get_errors, _From, #st{errors = Errors} = St) -> {reply, {ok, lists:reverse(queue:to_list(Errors))}, St}; - handle_call(get_last_error, _From, #st{errors = Errors} = St) -> try {reply, {ok, queue:get_r(Errors)}, St} - catch error:empty -> - {reply, {error, empty}, St} + catch + error:empty -> + {reply, {error, empty}, St} end; - -handle_call({set_error_limit, N}, _From, #st{error_count=Len, errors=Q} = St) -> - if N < Len -> - {NewQ, _} = queue:split(N, Q); - true -> - NewQ = Q +handle_call({set_error_limit, N}, _From, #st{error_count = Len, errors = Q} = St) -> + if + N < Len -> + {NewQ, _} = queue:split(N, Q); + true -> + NewQ = Q end, NewLen = queue:len(NewQ), - {reply, ok, St#st{error_limit=N, error_count=NewLen, errors=NewQ}}; - + {reply, ok, St#st{error_limit = N, error_count = NewLen, errors = NewQ}}; handle_call(_Request, _From, St) -> {reply, ignored, St}. - handle_cast({doit, From, MFA}, St) -> handle_cast({doit, From, undefined, MFA}, St); - handle_cast({doit, {ClientPid, ClientRef} = From, Nonce, MFA}, State) -> {LocalPid, Ref} = spawn_monitor(?MODULE, init_p, [From, MFA, Nonce]), Job = #job{ @@ -80,60 +83,61 @@ handle_cast({doit, {ClientPid, ClientRef} = From, Nonce, MFA}, State) -> worker_pid = LocalPid }, {noreply, add_job(Job, State)}; - - handle_cast({kill, FromRef}, St) -> kill_worker(FromRef, St), {noreply, St}; - handle_cast({kill_all, FromRefs}, St) -> lists:foreach(fun(FromRef) -> kill_worker(FromRef, St) end, FromRefs), {noreply, St}; - handle_cast(_, St) -> couch_log:notice("rexi_server ignored_cast", []), {noreply, St}. -handle_info({'DOWN', Ref, process, _, normal}, #st{workers=Workers} = St) -> +handle_info({'DOWN', Ref, process, _, normal}, #st{workers = Workers} = St) -> case find_worker(Ref, Workers) of - #job{} = Job -> - {noreply, remove_job(Job, St)}; - false -> - {noreply, St} + #job{} = Job -> + {noreply, remove_job(Job, St)}; + false -> + {noreply, St} end; - -handle_info({'DOWN', Ref, process, Pid, Error}, #st{workers=Workers} = St) -> +handle_info({'DOWN', Ref, process, Pid, Error}, #st{workers = Workers} = St) -> case find_worker(Ref, Workers) of - #job{worker_pid=Pid, worker=Ref, client_pid=CPid, client=CRef} =Job -> - case Error of #error{reason = {_Class, Reason}, stack = Stack} -> - notify_caller({CPid, CRef}, {Reason, Stack}), - St1 = save_error(Error, St), - {noreply, remove_job(Job, St1)}; - _ -> - notify_caller({CPid, CRef}, Error), - {noreply, remove_job(Job, St)} - end; - false -> - {noreply, St} + #job{worker_pid = Pid, worker = Ref, client_pid = CPid, client = CRef} = Job -> + case Error of + #error{reason = {_Class, Reason}, stack = Stack} -> + notify_caller({CPid, CRef}, {Reason, Stack}), + St1 = save_error(Error, St), + {noreply, remove_job(Job, St1)}; + _ -> + notify_caller({CPid, CRef}, Error), + {noreply, remove_job(Job, St)} + end; + false -> + {noreply, St} end; - handle_info(_Info, St) -> {noreply, St}. terminate(_Reason, St) -> - ets:foldl(fun(#job{worker_pid=Pid},_) -> exit(Pid,kill) end, nil, - St#st.workers), + ets:foldl( + fun(#job{worker_pid = Pid}, _) -> exit(Pid, kill) end, + nil, + St#st.workers + ), ok. -code_change(_OldVsn, #st{}=State, _Extra) -> +code_change(_OldVsn, #st{} = State, _Extra) -> {ok, State}. init_p(From, MFA) -> init_p(From, MFA, undefined). %% @doc initializes a process started by rexi_server. --spec init_p({pid(), reference()}, {atom(), atom(), list()}, - string() | undefined) -> any(). +-spec init_p( + {pid(), reference()}, + {atom(), atom(), list()}, + string() | undefined +) -> any(). init_p(From, {M,F,A}, Nonce) -> put(rexi_from, From), put('$initial_call', {M,F,length(A)}), @@ -158,13 +162,19 @@ init_p(From, {M,F,A}, Nonce) -> save_error(_E, #st{error_limit = 0} = St) -> St; -save_error(E, #st{errors=Q, error_limit=L, error_count=C} = St) when C >= L -> +save_error(E, #st{errors = Q, error_limit = L, error_count = C} = St) when C >= L -> St#st{errors = queue:in(E, queue:drop(Q))}; -save_error(E, #st{errors=Q, error_count=C} = St) -> - St#st{errors = queue:in(E, Q), error_count = C+1}. +save_error(E, #st{errors = Q, error_count = C} = St) -> + St#st{errors = queue:in(E, Q), error_count = C + 1}. clean_stack(S) -> - lists:map(fun({M,F,A}) when is_list(A) -> {M,F,length(A)}; (X) -> X end, S). + lists:map( + fun + ({M, F, A}) when is_list(A) -> {M, F, length(A)}; + (X) -> X + end, + S + ). add_job(Job, #st{workers = Workers, clients = Clients} = State) -> ets:insert(Workers, Job), @@ -177,19 +187,21 @@ remove_job(Job, #st{workers = Workers, clients = Clients} = State) -> State. find_worker(Ref, Tab) -> - case ets:lookup(Tab, Ref) of [] -> false; [Worker] -> Worker end. + case ets:lookup(Tab, Ref) of + [] -> false; + [Worker] -> Worker + end. notify_caller({Caller, Ref}, Reason) -> rexi_utils:send(Caller, {Ref, {rexi_EXIT, Reason}}). - kill_worker(FromRef, #st{clients = Clients} = St) -> case find_worker(FromRef, Clients) of - #job{worker = KeyRef, worker_pid = Pid} = Job -> - erlang:demonitor(KeyRef), - exit(Pid, kill), - remove_job(Job, St), - ok; - false -> - ok + #job{worker = KeyRef, worker_pid = Pid} = Job -> + erlang:demonitor(KeyRef), + exit(Pid, kill), + remove_job(Job, St), + ok; + false -> + ok end. diff --git a/src/rexi/src/rexi_server_mon.erl b/src/rexi/src/rexi_server_mon.erl index cfe1144ce..9057807e6 100644 --- a/src/rexi/src/rexi_server_mon.erl +++ b/src/rexi/src/rexi_server_mon.erl @@ -1,5 +1,5 @@ % Copyright 2010-2013 Cloudant -% +% % Licensed under the Apache License, Version 2.0 (the "License"); you may not % use this file except in compliance with the License. You may obtain a copy of % the License at @@ -17,13 +17,11 @@ -behaviour(mem3_cluster). -vsn(1). - -export([ start_link/1, status/0 ]). - -export([ init/1, terminate/2, @@ -34,23 +32,19 @@ ]). -export([ - cluster_stable/1, - cluster_unstable/1 + cluster_stable/1, + cluster_unstable/1 ]). - -define(CLUSTER_STABILITY_PERIOD_SEC, 15). - start_link(ChildMod) -> Name = list_to_atom(lists:concat([ChildMod, "_mon"])), gen_server:start_link({local, Name}, ?MODULE, ChildMod, []). - status() -> gen_server:call(?MODULE, status). - % Mem3 cluster callbacks cluster_unstable(Server) -> @@ -62,21 +56,22 @@ cluster_stable(Server) -> gen_server:cast(Server, cluster_stable), Server. - % gen_server callbacks init(ChildMod) -> - {ok, _Mem3Cluster} = mem3_cluster:start_link(?MODULE, self(), - ?CLUSTER_STABILITY_PERIOD_SEC, ?CLUSTER_STABILITY_PERIOD_SEC), + {ok, _Mem3Cluster} = mem3_cluster:start_link( + ?MODULE, + self(), + ?CLUSTER_STABILITY_PERIOD_SEC, + ?CLUSTER_STABILITY_PERIOD_SEC + ), start_servers(ChildMod), couch_log:notice("~s : started servers", [ChildMod]), {ok, ChildMod}. - terminate(_Reason, _St) -> ok. - handle_call(status, _From, ChildMod) -> case missing_servers(ChildMod) of [] -> @@ -84,7 +79,6 @@ handle_call(status, _From, ChildMod) -> Missing -> {reply, {waiting, length(Missing)}, ChildMod} end; - handle_call(Msg, _From, St) -> couch_log:notice("~s ignored_call ~w", [?MODULE, Msg]), {reply, ignored, St}. @@ -96,7 +90,6 @@ handle_cast(cluster_unstable, ChildMod) -> couch_log:notice("~s : cluster unstable", [ChildMod]), start_servers(ChildMod), {noreply, ChildMod}; - % When cluster is stable, start any servers for new nodes and stop servers for % the ones that disconnected. handle_cast(cluster_stable, ChildMod) -> @@ -104,51 +97,48 @@ handle_cast(cluster_stable, ChildMod) -> start_servers(ChildMod), stop_servers(ChildMod), {noreply, ChildMod}; - handle_cast(Msg, St) -> couch_log:notice("~s ignored_cast ~w", [?MODULE, Msg]), {noreply, St}. - handle_info(Msg, St) -> couch_log:notice("~s ignored_info ~w", [?MODULE, Msg]), {noreply, St}. - code_change(_OldVsn, nil, _Extra) -> {ok, rexi_server}; code_change(_OldVsn, St, _Extra) -> {ok, St}. - start_servers(ChildMod) -> - lists:foreach(fun(Id) -> - {ok, _} = start_server(ChildMod, Id) - end, missing_servers(ChildMod)). + lists:foreach( + fun(Id) -> + {ok, _} = start_server(ChildMod, Id) + end, + missing_servers(ChildMod) + ). stop_servers(ChildMod) -> - lists:foreach(fun(Id) -> - ok = stop_server(ChildMod, Id) - end, extra_servers(ChildMod)). - + lists:foreach( + fun(Id) -> + ok = stop_server(ChildMod, Id) + end, + extra_servers(ChildMod) + ). server_ids(ChildMod) -> Nodes = [node() | nodes()], [list_to_atom(lists:concat([ChildMod, "_", Node])) || Node <- Nodes]. - running_servers(ChildMod) -> [Id || {Id, _, _, _} <- supervisor:which_children(sup_module(ChildMod))]. - missing_servers(ChildMod) -> server_ids(ChildMod) -- running_servers(ChildMod). - extra_servers(ChildMod) -> running_servers(ChildMod) -- server_ids(ChildMod). - start_server(ChildMod, ChildId) -> ChildSpec = { ChildId, @@ -165,12 +155,10 @@ start_server(ChildMod, ChildId) -> erlang:error(Else) end. - stop_server(ChildMod, ChildId) -> SupMod = sup_module(ChildMod), ok = supervisor:terminate_child(SupMod, ChildId), ok = supervisor:delete_child(SupMod, ChildId). - sup_module(ChildMod) -> list_to_atom(lists:concat([ChildMod, "_sup"])). diff --git a/src/rexi/src/rexi_server_sup.erl b/src/rexi/src/rexi_server_sup.erl index 29c6ad60c..53497197f 100644 --- a/src/rexi/src/rexi_server_sup.erl +++ b/src/rexi/src/rexi_server_sup.erl @@ -15,15 +15,12 @@ -module(rexi_server_sup). -behaviour(supervisor). - -export([init/1]). -export([start_link/1]). - start_link(Name) -> supervisor:start_link({local, Name}, ?MODULE, []). - init([]) -> {ok, {{one_for_one, 1, 1}, []}}. diff --git a/src/rexi/src/rexi_sup.erl b/src/rexi/src/rexi_sup.erl index 3d9aa2a16..3bea0ed15 100644 --- a/src/rexi/src/rexi_sup.erl +++ b/src/rexi/src/rexi_sup.erl @@ -17,48 +17,49 @@ -export([init/1]). start_link(Args) -> - supervisor:start_link({local,?MODULE}, ?MODULE, Args). + supervisor:start_link({local, ?MODULE}, ?MODULE, Args). init([]) -> - {ok, {{rest_for_one, 3, 10}, [ - { - rexi_server, - {rexi_server, start_link, [rexi_server]}, - permanent, - 100, - worker, - [rexi_server] - }, - { - rexi_server_sup, - {rexi_server_sup, start_link, [rexi_server_sup]}, - permanent, - 100, - supervisor, - [rexi_server_sup] - }, - { - rexi_server_mon, - {rexi_server_mon, start_link, [rexi_server]}, - permanent, - 100, - worker, - [rexi_server_mon] - }, - { - rexi_buffer_sup, - {rexi_server_sup, start_link, [rexi_buffer_sup]}, - permanent, - 100, - supervisor, - [rexi_server_sup] - }, - { - rexi_buffer_mon, - {rexi_server_mon, start_link, [rexi_buffer]}, - permanent, - 100, - worker, - [rexi_server_mon] - } - ]}}. + {ok, + {{rest_for_one, 3, 10}, [ + { + rexi_server, + {rexi_server, start_link, [rexi_server]}, + permanent, + 100, + worker, + [rexi_server] + }, + { + rexi_server_sup, + {rexi_server_sup, start_link, [rexi_server_sup]}, + permanent, + 100, + supervisor, + [rexi_server_sup] + }, + { + rexi_server_mon, + {rexi_server_mon, start_link, [rexi_server]}, + permanent, + 100, + worker, + [rexi_server_mon] + }, + { + rexi_buffer_sup, + {rexi_server_sup, start_link, [rexi_buffer_sup]}, + permanent, + 100, + supervisor, + [rexi_server_sup] + }, + { + rexi_buffer_mon, + {rexi_server_mon, start_link, [rexi_buffer]}, + permanent, + 100, + worker, + [rexi_server_mon] + } + ]}}. diff --git a/src/rexi/src/rexi_utils.erl b/src/rexi/src/rexi_utils.erl index 960318418..d59c5ea0f 100644 --- a/src/rexi/src/rexi_utils.erl +++ b/src/rexi/src/rexi_utils.erl @@ -17,10 +17,10 @@ %% @doc Return a rexi_server id for the given node. server_id(Node) -> case config:get_boolean("rexi", "server_per_node", true) of - true -> - list_to_atom("rexi_server_" ++ atom_to_list(Node)); - _ -> - rexi_server + true -> + list_to_atom("rexi_server_" ++ atom_to_list(Node)); + _ -> + rexi_server end. %% @doc Return a {server_id(node()), Node} Pid name for the given Node. @@ -30,11 +30,11 @@ server_pid(Node) -> %% @doc send a message as quickly as possible send(Dest, Msg) -> case erlang:send(Dest, Msg, [noconnect, nosuspend]) of - ok -> - ok; - _ -> - % treat nosuspend and noconnect the same - rexi_buffer:send(Dest, Msg) + ok -> + ok; + _ -> + % treat nosuspend and noconnect the same + rexi_buffer:send(Dest, Msg) end. %% @doc set up the receive loop with an overall timeout @@ -53,53 +53,53 @@ recv(Refs, Keypos, Fun, Acc0, GlobalTimeout, PerMsgTO) -> process_mailbox(RefList, Keypos, Fun, Acc0, TimeoutRef, PerMsgTO) -> case process_message(RefList, Keypos, Fun, Acc0, TimeoutRef, PerMsgTO) of - {ok, Acc} -> - process_mailbox(RefList, Keypos, Fun, Acc, TimeoutRef, PerMsgTO); - {new_refs, NewRefList, Acc} -> - process_mailbox(NewRefList, Keypos, Fun, Acc, TimeoutRef, PerMsgTO); - {stop, Acc} -> - {ok, Acc}; - Error -> - Error + {ok, Acc} -> + process_mailbox(RefList, Keypos, Fun, Acc, TimeoutRef, PerMsgTO); + {new_refs, NewRefList, Acc} -> + process_mailbox(NewRefList, Keypos, Fun, Acc, TimeoutRef, PerMsgTO); + {stop, Acc} -> + {ok, Acc}; + Error -> + Error end. process_message(RefList, Keypos, Fun, Acc0, TimeoutRef, PerMsgTO) -> receive - {timeout, TimeoutRef} -> - {timeout, Acc0}; - {rexi, Ref, Msg} -> - case lists:keyfind(Ref, Keypos, RefList) of - false -> + {timeout, TimeoutRef} -> + {timeout, Acc0}; + {rexi, Ref, Msg} -> + case lists:keyfind(Ref, Keypos, RefList) of + false -> + {ok, Acc0}; + Worker -> + Fun(Msg, Worker, Acc0) + end; + {rexi, Ref, From, Msg} -> + case lists:keyfind(Ref, Keypos, RefList) of + false -> + {ok, Acc0}; + Worker -> + Fun(Msg, {Worker, From}, Acc0) + end; + {rexi, '$rexi_ping'} -> {ok, Acc0}; - Worker -> - Fun(Msg, Worker, Acc0) - end; - {rexi, Ref, From, Msg} -> - case lists:keyfind(Ref, Keypos, RefList) of - false -> - {ok, Acc0}; - Worker -> - Fun(Msg, {Worker, From}, Acc0) - end; - {rexi, '$rexi_ping'} -> - {ok, Acc0}; - {Ref, Msg} -> - case lists:keyfind(Ref, Keypos, RefList) of - false -> - % this was some non-matching message which we will ignore - {ok, Acc0}; - Worker -> - Fun(Msg, Worker, Acc0) - end; - {Ref, From, Msg} -> - case lists:keyfind(Ref, Keypos, RefList) of - false -> - {ok, Acc0}; - Worker -> - Fun(Msg, {Worker, From}, Acc0) - end; - {rexi_DOWN, _, _, _} = Msg -> - Fun(Msg, nil, Acc0) + {Ref, Msg} -> + case lists:keyfind(Ref, Keypos, RefList) of + false -> + % this was some non-matching message which we will ignore + {ok, Acc0}; + Worker -> + Fun(Msg, Worker, Acc0) + end; + {Ref, From, Msg} -> + case lists:keyfind(Ref, Keypos, RefList) of + false -> + {ok, Acc0}; + Worker -> + Fun(Msg, {Worker, From}, Acc0) + end; + {rexi_DOWN, _, _, _} = Msg -> + Fun(Msg, nil, Acc0) after PerMsgTO -> {timeout, Acc0} end. |