summaryrefslogtreecommitdiff
path: root/src/rexi
diff options
context:
space:
mode:
authorNick Vatamaniuc <vatamane@gmail.com>2021-11-20 01:00:08 -0500
committerNick Vatamaniuc <nickva@users.noreply.github.com>2021-11-22 17:31:31 -0500
commitb78ccf18cb4ed6e183ed0bf4e5cbe40d7a7dc493 (patch)
tree82158f0b6c7e97e6955bf0c558aac6eb0329b410 /src/rexi
parent6e87e43fae23647b281ede250ad9f1a68a8f1cde (diff)
downloadcouchdb-b78ccf18cb4ed6e183ed0bf4e5cbe40d7a7dc493.tar.gz
Apply erlfmt formatting to source tree
These exceptions from main were ported over to 3.x ``` --- a/src/chttpd/src/chttpd.erl +++ b/src/chttpd/src/chttpd.erl @@ -491,6 +491,7 @@ extract_cookie(#httpd{mochi_req = MochiReq}) -> end. %%% end hack +%% erlfmt-ignore set_auth_handlers() -> AuthenticationDefault = "{chttpd_auth, cookie_authentication_handler}, ``` ``` --- a/src/couch/src/couch_debug.erl +++ b/src/couch/src/couch_debug.erl @@ -49,6 +49,7 @@ help() -> ]. -spec help(Function :: atom()) -> ok. +%% erlfmt-ignore help(opened_files) -> ```
Diffstat (limited to 'src/rexi')
-rw-r--r--src/rexi/src/rexi.erl79
-rw-r--r--src/rexi/src/rexi_app.erl1
-rw-r--r--src/rexi/src/rexi_buffer.erl29
-rw-r--r--src/rexi/src/rexi_monitor.erl24
-rw-r--r--src/rexi/src/rexi_server.erl132
-rw-r--r--src/rexi/src/rexi_server_mon.erl54
-rw-r--r--src/rexi/src/rexi_server_sup.erl3
-rw-r--r--src/rexi/src/rexi_sup.erl87
-rw-r--r--src/rexi/src/rexi_utils.erl102
9 files changed, 263 insertions, 248 deletions
diff --git a/src/rexi/src/rexi.erl b/src/rexi/src/rexi.erl
index 170503b7c..fb1763e51 100644
--- a/src/rexi/src/rexi.erl
+++ b/src/rexi/src/rexi.erl
@@ -30,8 +30,8 @@ stop() ->
application:stop(rexi).
restart() ->
- stop(), start().
-
+ stop(),
+ start().
%% @equiv cast(Node, self(), MFA)
-spec cast(node(), {atom(), atom(), list()}) -> reference().
@@ -86,22 +86,34 @@ kill_all(NodeRefs) when is_list(NodeRefs) ->
%% configure the cluster to send kill_all messages.
case config:get_boolean("rexi", "use_kill_all", false) of
true ->
- PerNodeMap = lists:foldl(fun({Node, Ref}, Acc) ->
- maps:update_with(Node, fun(Refs) ->
- [Ref | Refs]
- end, [Ref], Acc)
- end, #{}, NodeRefs),
- maps:map(fun(Node, Refs) ->
- ServerPid = rexi_utils:server_pid(Node),
- rexi_utils:send(ServerPid, cast_msg({kill_all, Refs}))
- end, PerNodeMap);
+ PerNodeMap = lists:foldl(
+ fun({Node, Ref}, Acc) ->
+ maps:update_with(
+ Node,
+ fun(Refs) ->
+ [Ref | Refs]
+ end,
+ [Ref],
+ Acc
+ )
+ end,
+ #{},
+ NodeRefs
+ ),
+ maps:map(
+ fun(Node, Refs) ->
+ ServerPid = rexi_utils:server_pid(Node),
+ rexi_utils:send(ServerPid, cast_msg({kill_all, Refs}))
+ end,
+ PerNodeMap
+ );
false ->
lists:foreach(fun({Node, Ref}) -> kill(Node, Ref) end, NodeRefs)
end,
ok.
%% @equiv async_server_call(Server, self(), Request)
--spec async_server_call(pid() | {atom(),node()}, any()) -> reference().
+-spec async_server_call(pid() | {atom(), node()}, any()) -> reference().
async_server_call(Server, Request) ->
async_server_call(Server, self(), Request).
@@ -110,17 +122,17 @@ async_server_call(Server, Request) ->
%% function acts more like cast() than call() in that the server process
%% is not monitored. Clients who want to know if the server is alive should
%% monitor it themselves before calling this function.
--spec async_server_call(pid() | {atom(),node()}, pid(), any()) -> reference().
+-spec async_server_call(pid() | {atom(), node()}, pid(), any()) -> reference().
async_server_call(Server, Caller, Request) ->
Ref = make_ref(),
- rexi_utils:send(Server, {'$gen_call', {Caller,Ref}, Request}),
+ rexi_utils:send(Server, {'$gen_call', {Caller, Ref}, Request}),
Ref.
%% @doc convenience function to reply to the original rexi Caller.
-spec reply(any()) -> any().
reply(Reply) ->
{Caller, Ref} = get(rexi_from),
- erlang:send(Caller, {Ref,Reply}).
+ erlang:send(Caller, {Ref, Reply}).
%% @equiv sync_reply(Reply, 300000)
sync_reply(Reply) ->
@@ -133,9 +145,10 @@ sync_reply(Reply) ->
sync_reply(Reply, Timeout) ->
{Caller, Ref} = get(rexi_from),
Tag = make_ref(),
- erlang:send(Caller, {Ref, {self(),Tag}, Reply}),
- receive {Tag, Response} ->
- Response
+ erlang:send(Caller, {Ref, {self(), Tag}, Reply}),
+ receive
+ {Tag, Response} ->
+ Response
after Timeout ->
timeout
end.
@@ -174,7 +187,7 @@ stream_init(Timeout) ->
%% sending messages. The `From` should be the value provided by
%% the worker in the rexi_STREAM_INIT message.
-spec stream_start({pid(), any()}) -> ok.
-stream_start({Pid, _Tag}=From) when is_pid(Pid) ->
+stream_start({Pid, _Tag} = From) when is_pid(Pid) ->
gen_server:reply(From, rexi_STREAM_START).
%% @doc Cancel a worker stream
@@ -184,7 +197,7 @@ stream_start({Pid, _Tag}=From) when is_pid(Pid) ->
%% The `From` should be the value provided by the worker in the
%% rexi_STREAM_INIT message.
-spec stream_cancel({pid(), any()}) -> ok.
-stream_cancel({Pid, _Tag}=From) when is_pid(Pid) ->
+stream_cancel({Pid, _Tag} = From) when is_pid(Pid) ->
gen_server:reply(From, rexi_STREAM_CANCEL).
%% @equiv stream(Msg, 100, 300000)
@@ -202,13 +215,14 @@ stream(Msg, Limit) ->
stream(Msg, Limit, Timeout) ->
try maybe_wait(Limit, Timeout) of
{ok, Count} ->
- put(rexi_unacked, Count+1),
+ put(rexi_unacked, Count + 1),
{Caller, Ref} = get(rexi_from),
erlang:send(Caller, {Ref, self(), Msg}),
ok
- catch throw:timeout ->
- couch_stats:increment_counter([rexi, streams, timeout, stream]),
- exit(timeout)
+ catch
+ throw:timeout ->
+ couch_stats:increment_counter([rexi, streams, timeout, stream]),
+ exit(timeout)
end.
%% @equiv stream2(Msg, 5, 300000)
@@ -230,13 +244,14 @@ stream2(Msg, Limit, Timeout) ->
maybe_init_stream(Timeout),
try maybe_wait(Limit, Timeout) of
{ok, Count} ->
- put(rexi_unacked, Count+1),
+ put(rexi_unacked, Count + 1),
{Caller, Ref} = get(rexi_from),
erlang:send(Caller, {Ref, self(), Msg}),
ok
- catch throw:timeout ->
- couch_stats:increment_counter([rexi, streams, timeout, stream]),
- exit(timeout)
+ catch
+ throw:timeout ->
+ couch_stats:increment_counter([rexi, streams, timeout, stream]),
+ exit(timeout)
end.
%% @equiv stream_last(Msg, 300000)
@@ -259,14 +274,12 @@ stream_ack(Client) ->
stream_ack(Client, N) ->
erlang:send(Client, {rexi_ack, N}).
-
%% Sends a ping message to the coordinator. This is for long running
%% operations on a node that could exceed the rexi timeout
-ping() ->
+ping() ->
{Caller, _} = get(rexi_from),
erlang:send(Caller, {rexi, '$rexi_ping'}).
-
%% internal functions %%
cast_msg(Msg) -> {'$gen_cast', Msg}.
@@ -304,7 +317,7 @@ maybe_wait(Limit, Timeout) ->
wait_for_ack(Count, Timeout) ->
receive
- {rexi_ack, N} -> drain_acks(Count-N)
+ {rexi_ack, N} -> drain_acks(Count - N)
after Timeout ->
couch_stats:increment_counter([rexi, streams, timeout, wait_for_ack]),
throw(timeout)
@@ -314,7 +327,7 @@ drain_acks(Count) when Count < 0 ->
erlang:error(mismatched_rexi_ack);
drain_acks(Count) ->
receive
- {rexi_ack, N} -> drain_acks(Count-N)
+ {rexi_ack, N} -> drain_acks(Count - N)
after 0 ->
{ok, Count}
end.
diff --git a/src/rexi/src/rexi_app.erl b/src/rexi/src/rexi_app.erl
index 0f1e892b5..61e7886e1 100644
--- a/src/rexi/src/rexi_app.erl
+++ b/src/rexi/src/rexi_app.erl
@@ -14,7 +14,6 @@
-behaviour(application).
-export([start/2, stop/1]).
-
start(_Type, StartArgs) ->
rexi_sup:start_link(StartArgs).
diff --git a/src/rexi/src/rexi_buffer.erl b/src/rexi/src/rexi_buffer.erl
index d16dc8ba3..7f0079f03 100644
--- a/src/rexi/src/rexi_buffer.erl
+++ b/src/rexi/src/rexi_buffer.erl
@@ -15,10 +15,16 @@
-vsn(1).
% gen_server callbacks
--export([init/1, handle_call/3, handle_cast/2, handle_info/2,
- terminate/2, code_change/3]).
+-export([
+ init/1,
+ handle_call/3,
+ handle_cast/2,
+ handle_info/2,
+ terminate/2,
+ code_change/3
+]).
--export ([
+-export([
send/2,
start_link/1
]).
@@ -37,7 +43,6 @@ send(Dest, Msg) ->
Server = list_to_atom(lists:concat([rexi_buffer, "_", get_node(Dest)])),
gen_server:cast(Server, {deliver, Dest, Msg}).
-
init(_) ->
%% TODO Leverage os_mon to discover available memory in the system
Max = list_to_integer(config:get("rexi", "buffer_count", "2000")),
@@ -45,7 +50,6 @@ init(_) ->
handle_call(erase_buffer, _From, State) ->
{reply, ok, State#state{buffer = queue:new(), count = 0}, 0};
-
handle_call(get_buffered_count, _From, State) ->
{reply, State#state.count, State, 0}.
@@ -53,19 +57,19 @@ handle_cast({deliver, Dest, Msg}, #state{buffer = Q, count = C} = State) ->
couch_stats:increment_counter([rexi, buffered]),
Q2 = queue:in({Dest, Msg}, Q),
case should_drop(State) of
- true ->
- couch_stats:increment_counter([rexi, dropped]),
+ true ->
+ couch_stats:increment_counter([rexi, dropped]),
{noreply, State#state{buffer = queue:drop(Q2)}, 0};
- false ->
- {noreply, State#state{buffer = Q2, count = C+1}, 0}
+ false ->
+ {noreply, State#state{buffer = Q2, count = C + 1}, 0}
end.
-handle_info(timeout, #state{sender = nil, buffer = {[],[]}, count = 0}=State) ->
+handle_info(timeout, #state{sender = nil, buffer = {[], []}, count = 0} = State) ->
{noreply, State};
handle_info(timeout, #state{sender = nil, count = C} = State) when C > 0 ->
#state{buffer = Q, count = C} = State,
{{value, {Dest, Msg}}, Q2} = queue:out_r(Q),
- NewState = State#state{buffer = Q2, count = C-1},
+ NewState = State#state{buffer = Q2, count = C - 1},
case erlang:send(Dest, Msg, [noconnect, nosuspend]) of
ok when C =:= 1 ->
% We just sent the last queued messsage, we'll use this opportunity
@@ -82,7 +86,6 @@ handle_info(timeout, #state{sender = nil, count = C} = State) when C > 0 ->
handle_info(timeout, State) ->
% Waiting on a sender to return
{noreply, State};
-
handle_info({'DOWN', Ref, _, Pid, _}, #state{sender = {Pid, Ref}} = State) ->
{noreply, State#state{sender = nil}, 0}.
@@ -91,7 +94,7 @@ terminate(_Reason, _State) ->
code_change(_OldVsn, {state, Buffer, Sender, Count}, _Extra) ->
Max = list_to_integer(config:get("rexi", "buffer_count", "2000")),
- {ok, #state{buffer=Buffer, sender=Sender, count=Count, max_count=Max}};
+ {ok, #state{buffer = Buffer, sender = Sender, count = Count, max_count = Max}};
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
diff --git a/src/rexi/src/rexi_monitor.erl b/src/rexi/src/rexi_monitor.erl
index f90ec5160..7fe66db71 100644
--- a/src/rexi/src/rexi_monitor.erl
+++ b/src/rexi/src/rexi_monitor.erl
@@ -14,16 +14,17 @@
-export([start/1, stop/1]).
-export([wait_monitors/1]).
-
%% @doc spawn_links a process which monitors the supplied list of items and
%% returns the process ID. If a monitored process exits, the caller will
%% receive a {rexi_DOWN, MonitoringPid, DeadPid, Reason} message.
--spec start([pid() | atom() | {atom(),node()}]) -> pid().
+-spec start([pid() | atom() | {atom(), node()}]) -> pid().
start(Procs) ->
Parent = self(),
Nodes = [node() | nodes()],
- {Mon, Skip} = lists:partition(fun(P) -> should_monitor(P, Nodes) end,
- Procs),
+ {Mon, Skip} = lists:partition(
+ fun(P) -> should_monitor(P, Nodes) end,
+ Procs
+ ),
spawn_link(fun() ->
[notify_parent(Parent, P, noconnect) || P <- Skip],
[erlang:monitor(process, P) || P <- Mon],
@@ -50,16 +51,17 @@ should_monitor({_, Node}, Nodes) ->
wait_monitors(Parent) ->
receive
- {'DOWN', _, process, Pid, Reason} ->
- notify_parent(Parent, Pid, Reason),
- ?MODULE:wait_monitors(Parent);
- {Parent, shutdown} ->
- ok
+ {'DOWN', _, process, Pid, Reason} ->
+ notify_parent(Parent, Pid, Reason),
+ ?MODULE:wait_monitors(Parent);
+ {Parent, shutdown} ->
+ ok
end.
flush_down_messages() ->
- receive {rexi_DOWN, _, _, _} ->
- flush_down_messages()
+ receive
+ {rexi_DOWN, _, _, _} ->
+ flush_down_messages()
after 0 ->
ok
end.
diff --git a/src/rexi/src/rexi_server.erl b/src/rexi/src/rexi_server.erl
index 4cd9ce66e..47c128d7b 100644
--- a/src/rexi/src/rexi_server.erl
+++ b/src/rexi/src/rexi_server.erl
@@ -13,8 +13,14 @@
-module(rexi_server).
-behaviour(gen_server).
-vsn(1).
--export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
- code_change/3]).
+-export([
+ init/1,
+ handle_call/3,
+ handle_cast/2,
+ handle_info/2,
+ terminate/2,
+ code_change/3
+]).
-export([start_link/1, init_p/2, init_p/3]).
@@ -24,10 +30,10 @@
-include_lib("rexi/include/rexi.hrl").
-record(job, {
- client::reference(),
- worker::reference(),
- client_pid::pid(),
- worker_pid::pid()
+ client :: reference(),
+ worker :: reference(),
+ client_pid :: pid(),
+ worker_pid :: pid()
}).
-record(st, {
@@ -47,30 +53,27 @@ init([]) ->
handle_call(get_errors, _From, #st{errors = Errors} = St) ->
{reply, {ok, lists:reverse(queue:to_list(Errors))}, St};
-
handle_call(get_last_error, _From, #st{errors = Errors} = St) ->
try
{reply, {ok, queue:get_r(Errors)}, St}
- catch error:empty ->
- {reply, {error, empty}, St}
+ catch
+ error:empty ->
+ {reply, {error, empty}, St}
end;
-
-handle_call({set_error_limit, N}, _From, #st{error_count=Len, errors=Q} = St) ->
- if N < Len ->
- {NewQ, _} = queue:split(N, Q);
- true ->
- NewQ = Q
+handle_call({set_error_limit, N}, _From, #st{error_count = Len, errors = Q} = St) ->
+ if
+ N < Len ->
+ {NewQ, _} = queue:split(N, Q);
+ true ->
+ NewQ = Q
end,
NewLen = queue:len(NewQ),
- {reply, ok, St#st{error_limit=N, error_count=NewLen, errors=NewQ}};
-
+ {reply, ok, St#st{error_limit = N, error_count = NewLen, errors = NewQ}};
handle_call(_Request, _From, St) ->
{reply, ignored, St}.
-
handle_cast({doit, From, MFA}, St) ->
handle_cast({doit, From, undefined, MFA}, St);
-
handle_cast({doit, {ClientPid, ClientRef} = From, Nonce, MFA}, State) ->
{LocalPid, Ref} = spawn_monitor(?MODULE, init_p, [From, MFA, Nonce]),
Job = #job{
@@ -80,60 +83,61 @@ handle_cast({doit, {ClientPid, ClientRef} = From, Nonce, MFA}, State) ->
worker_pid = LocalPid
},
{noreply, add_job(Job, State)};
-
-
handle_cast({kill, FromRef}, St) ->
kill_worker(FromRef, St),
{noreply, St};
-
handle_cast({kill_all, FromRefs}, St) ->
lists:foreach(fun(FromRef) -> kill_worker(FromRef, St) end, FromRefs),
{noreply, St};
-
handle_cast(_, St) ->
couch_log:notice("rexi_server ignored_cast", []),
{noreply, St}.
-handle_info({'DOWN', Ref, process, _, normal}, #st{workers=Workers} = St) ->
+handle_info({'DOWN', Ref, process, _, normal}, #st{workers = Workers} = St) ->
case find_worker(Ref, Workers) of
- #job{} = Job ->
- {noreply, remove_job(Job, St)};
- false ->
- {noreply, St}
+ #job{} = Job ->
+ {noreply, remove_job(Job, St)};
+ false ->
+ {noreply, St}
end;
-
-handle_info({'DOWN', Ref, process, Pid, Error}, #st{workers=Workers} = St) ->
+handle_info({'DOWN', Ref, process, Pid, Error}, #st{workers = Workers} = St) ->
case find_worker(Ref, Workers) of
- #job{worker_pid=Pid, worker=Ref, client_pid=CPid, client=CRef} =Job ->
- case Error of #error{reason = {_Class, Reason}, stack = Stack} ->
- notify_caller({CPid, CRef}, {Reason, Stack}),
- St1 = save_error(Error, St),
- {noreply, remove_job(Job, St1)};
- _ ->
- notify_caller({CPid, CRef}, Error),
- {noreply, remove_job(Job, St)}
- end;
- false ->
- {noreply, St}
+ #job{worker_pid = Pid, worker = Ref, client_pid = CPid, client = CRef} = Job ->
+ case Error of
+ #error{reason = {_Class, Reason}, stack = Stack} ->
+ notify_caller({CPid, CRef}, {Reason, Stack}),
+ St1 = save_error(Error, St),
+ {noreply, remove_job(Job, St1)};
+ _ ->
+ notify_caller({CPid, CRef}, Error),
+ {noreply, remove_job(Job, St)}
+ end;
+ false ->
+ {noreply, St}
end;
-
handle_info(_Info, St) ->
{noreply, St}.
terminate(_Reason, St) ->
- ets:foldl(fun(#job{worker_pid=Pid},_) -> exit(Pid,kill) end, nil,
- St#st.workers),
+ ets:foldl(
+ fun(#job{worker_pid = Pid}, _) -> exit(Pid, kill) end,
+ nil,
+ St#st.workers
+ ),
ok.
-code_change(_OldVsn, #st{}=State, _Extra) ->
+code_change(_OldVsn, #st{} = State, _Extra) ->
{ok, State}.
init_p(From, MFA) ->
init_p(From, MFA, undefined).
%% @doc initializes a process started by rexi_server.
--spec init_p({pid(), reference()}, {atom(), atom(), list()},
- string() | undefined) -> any().
+-spec init_p(
+ {pid(), reference()},
+ {atom(), atom(), list()},
+ string() | undefined
+) -> any().
init_p(From, {M,F,A}, Nonce) ->
put(rexi_from, From),
put('$initial_call', {M,F,length(A)}),
@@ -158,13 +162,19 @@ init_p(From, {M,F,A}, Nonce) ->
save_error(_E, #st{error_limit = 0} = St) ->
St;
-save_error(E, #st{errors=Q, error_limit=L, error_count=C} = St) when C >= L ->
+save_error(E, #st{errors = Q, error_limit = L, error_count = C} = St) when C >= L ->
St#st{errors = queue:in(E, queue:drop(Q))};
-save_error(E, #st{errors=Q, error_count=C} = St) ->
- St#st{errors = queue:in(E, Q), error_count = C+1}.
+save_error(E, #st{errors = Q, error_count = C} = St) ->
+ St#st{errors = queue:in(E, Q), error_count = C + 1}.
clean_stack(S) ->
- lists:map(fun({M,F,A}) when is_list(A) -> {M,F,length(A)}; (X) -> X end, S).
+ lists:map(
+ fun
+ ({M, F, A}) when is_list(A) -> {M, F, length(A)};
+ (X) -> X
+ end,
+ S
+ ).
add_job(Job, #st{workers = Workers, clients = Clients} = State) ->
ets:insert(Workers, Job),
@@ -177,19 +187,21 @@ remove_job(Job, #st{workers = Workers, clients = Clients} = State) ->
State.
find_worker(Ref, Tab) ->
- case ets:lookup(Tab, Ref) of [] -> false; [Worker] -> Worker end.
+ case ets:lookup(Tab, Ref) of
+ [] -> false;
+ [Worker] -> Worker
+ end.
notify_caller({Caller, Ref}, Reason) ->
rexi_utils:send(Caller, {Ref, {rexi_EXIT, Reason}}).
-
kill_worker(FromRef, #st{clients = Clients} = St) ->
case find_worker(FromRef, Clients) of
- #job{worker = KeyRef, worker_pid = Pid} = Job ->
- erlang:demonitor(KeyRef),
- exit(Pid, kill),
- remove_job(Job, St),
- ok;
- false ->
- ok
+ #job{worker = KeyRef, worker_pid = Pid} = Job ->
+ erlang:demonitor(KeyRef),
+ exit(Pid, kill),
+ remove_job(Job, St),
+ ok;
+ false ->
+ ok
end.
diff --git a/src/rexi/src/rexi_server_mon.erl b/src/rexi/src/rexi_server_mon.erl
index cfe1144ce..9057807e6 100644
--- a/src/rexi/src/rexi_server_mon.erl
+++ b/src/rexi/src/rexi_server_mon.erl
@@ -1,5 +1,5 @@
% Copyright 2010-2013 Cloudant
-%
+%
% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
@@ -17,13 +17,11 @@
-behaviour(mem3_cluster).
-vsn(1).
-
-export([
start_link/1,
status/0
]).
-
-export([
init/1,
terminate/2,
@@ -34,23 +32,19 @@
]).
-export([
- cluster_stable/1,
- cluster_unstable/1
+ cluster_stable/1,
+ cluster_unstable/1
]).
-
-define(CLUSTER_STABILITY_PERIOD_SEC, 15).
-
start_link(ChildMod) ->
Name = list_to_atom(lists:concat([ChildMod, "_mon"])),
gen_server:start_link({local, Name}, ?MODULE, ChildMod, []).
-
status() ->
gen_server:call(?MODULE, status).
-
% Mem3 cluster callbacks
cluster_unstable(Server) ->
@@ -62,21 +56,22 @@ cluster_stable(Server) ->
gen_server:cast(Server, cluster_stable),
Server.
-
% gen_server callbacks
init(ChildMod) ->
- {ok, _Mem3Cluster} = mem3_cluster:start_link(?MODULE, self(),
- ?CLUSTER_STABILITY_PERIOD_SEC, ?CLUSTER_STABILITY_PERIOD_SEC),
+ {ok, _Mem3Cluster} = mem3_cluster:start_link(
+ ?MODULE,
+ self(),
+ ?CLUSTER_STABILITY_PERIOD_SEC,
+ ?CLUSTER_STABILITY_PERIOD_SEC
+ ),
start_servers(ChildMod),
couch_log:notice("~s : started servers", [ChildMod]),
{ok, ChildMod}.
-
terminate(_Reason, _St) ->
ok.
-
handle_call(status, _From, ChildMod) ->
case missing_servers(ChildMod) of
[] ->
@@ -84,7 +79,6 @@ handle_call(status, _From, ChildMod) ->
Missing ->
{reply, {waiting, length(Missing)}, ChildMod}
end;
-
handle_call(Msg, _From, St) ->
couch_log:notice("~s ignored_call ~w", [?MODULE, Msg]),
{reply, ignored, St}.
@@ -96,7 +90,6 @@ handle_cast(cluster_unstable, ChildMod) ->
couch_log:notice("~s : cluster unstable", [ChildMod]),
start_servers(ChildMod),
{noreply, ChildMod};
-
% When cluster is stable, start any servers for new nodes and stop servers for
% the ones that disconnected.
handle_cast(cluster_stable, ChildMod) ->
@@ -104,51 +97,48 @@ handle_cast(cluster_stable, ChildMod) ->
start_servers(ChildMod),
stop_servers(ChildMod),
{noreply, ChildMod};
-
handle_cast(Msg, St) ->
couch_log:notice("~s ignored_cast ~w", [?MODULE, Msg]),
{noreply, St}.
-
handle_info(Msg, St) ->
couch_log:notice("~s ignored_info ~w", [?MODULE, Msg]),
{noreply, St}.
-
code_change(_OldVsn, nil, _Extra) ->
{ok, rexi_server};
code_change(_OldVsn, St, _Extra) ->
{ok, St}.
-
start_servers(ChildMod) ->
- lists:foreach(fun(Id) ->
- {ok, _} = start_server(ChildMod, Id)
- end, missing_servers(ChildMod)).
+ lists:foreach(
+ fun(Id) ->
+ {ok, _} = start_server(ChildMod, Id)
+ end,
+ missing_servers(ChildMod)
+ ).
stop_servers(ChildMod) ->
- lists:foreach(fun(Id) ->
- ok = stop_server(ChildMod, Id)
- end, extra_servers(ChildMod)).
-
+ lists:foreach(
+ fun(Id) ->
+ ok = stop_server(ChildMod, Id)
+ end,
+ extra_servers(ChildMod)
+ ).
server_ids(ChildMod) ->
Nodes = [node() | nodes()],
[list_to_atom(lists:concat([ChildMod, "_", Node])) || Node <- Nodes].
-
running_servers(ChildMod) ->
[Id || {Id, _, _, _} <- supervisor:which_children(sup_module(ChildMod))].
-
missing_servers(ChildMod) ->
server_ids(ChildMod) -- running_servers(ChildMod).
-
extra_servers(ChildMod) ->
running_servers(ChildMod) -- server_ids(ChildMod).
-
start_server(ChildMod, ChildId) ->
ChildSpec = {
ChildId,
@@ -165,12 +155,10 @@ start_server(ChildMod, ChildId) ->
erlang:error(Else)
end.
-
stop_server(ChildMod, ChildId) ->
SupMod = sup_module(ChildMod),
ok = supervisor:terminate_child(SupMod, ChildId),
ok = supervisor:delete_child(SupMod, ChildId).
-
sup_module(ChildMod) ->
list_to_atom(lists:concat([ChildMod, "_sup"])).
diff --git a/src/rexi/src/rexi_server_sup.erl b/src/rexi/src/rexi_server_sup.erl
index 29c6ad60c..53497197f 100644
--- a/src/rexi/src/rexi_server_sup.erl
+++ b/src/rexi/src/rexi_server_sup.erl
@@ -15,15 +15,12 @@
-module(rexi_server_sup).
-behaviour(supervisor).
-
-export([init/1]).
-export([start_link/1]).
-
start_link(Name) ->
supervisor:start_link({local, Name}, ?MODULE, []).
-
init([]) ->
{ok, {{one_for_one, 1, 1}, []}}.
diff --git a/src/rexi/src/rexi_sup.erl b/src/rexi/src/rexi_sup.erl
index 3d9aa2a16..3bea0ed15 100644
--- a/src/rexi/src/rexi_sup.erl
+++ b/src/rexi/src/rexi_sup.erl
@@ -17,48 +17,49 @@
-export([init/1]).
start_link(Args) ->
- supervisor:start_link({local,?MODULE}, ?MODULE, Args).
+ supervisor:start_link({local, ?MODULE}, ?MODULE, Args).
init([]) ->
- {ok, {{rest_for_one, 3, 10}, [
- {
- rexi_server,
- {rexi_server, start_link, [rexi_server]},
- permanent,
- 100,
- worker,
- [rexi_server]
- },
- {
- rexi_server_sup,
- {rexi_server_sup, start_link, [rexi_server_sup]},
- permanent,
- 100,
- supervisor,
- [rexi_server_sup]
- },
- {
- rexi_server_mon,
- {rexi_server_mon, start_link, [rexi_server]},
- permanent,
- 100,
- worker,
- [rexi_server_mon]
- },
- {
- rexi_buffer_sup,
- {rexi_server_sup, start_link, [rexi_buffer_sup]},
- permanent,
- 100,
- supervisor,
- [rexi_server_sup]
- },
- {
- rexi_buffer_mon,
- {rexi_server_mon, start_link, [rexi_buffer]},
- permanent,
- 100,
- worker,
- [rexi_server_mon]
- }
- ]}}.
+ {ok,
+ {{rest_for_one, 3, 10}, [
+ {
+ rexi_server,
+ {rexi_server, start_link, [rexi_server]},
+ permanent,
+ 100,
+ worker,
+ [rexi_server]
+ },
+ {
+ rexi_server_sup,
+ {rexi_server_sup, start_link, [rexi_server_sup]},
+ permanent,
+ 100,
+ supervisor,
+ [rexi_server_sup]
+ },
+ {
+ rexi_server_mon,
+ {rexi_server_mon, start_link, [rexi_server]},
+ permanent,
+ 100,
+ worker,
+ [rexi_server_mon]
+ },
+ {
+ rexi_buffer_sup,
+ {rexi_server_sup, start_link, [rexi_buffer_sup]},
+ permanent,
+ 100,
+ supervisor,
+ [rexi_server_sup]
+ },
+ {
+ rexi_buffer_mon,
+ {rexi_server_mon, start_link, [rexi_buffer]},
+ permanent,
+ 100,
+ worker,
+ [rexi_server_mon]
+ }
+ ]}}.
diff --git a/src/rexi/src/rexi_utils.erl b/src/rexi/src/rexi_utils.erl
index 960318418..d59c5ea0f 100644
--- a/src/rexi/src/rexi_utils.erl
+++ b/src/rexi/src/rexi_utils.erl
@@ -17,10 +17,10 @@
%% @doc Return a rexi_server id for the given node.
server_id(Node) ->
case config:get_boolean("rexi", "server_per_node", true) of
- true ->
- list_to_atom("rexi_server_" ++ atom_to_list(Node));
- _ ->
- rexi_server
+ true ->
+ list_to_atom("rexi_server_" ++ atom_to_list(Node));
+ _ ->
+ rexi_server
end.
%% @doc Return a {server_id(node()), Node} Pid name for the given Node.
@@ -30,11 +30,11 @@ server_pid(Node) ->
%% @doc send a message as quickly as possible
send(Dest, Msg) ->
case erlang:send(Dest, Msg, [noconnect, nosuspend]) of
- ok ->
- ok;
- _ ->
- % treat nosuspend and noconnect the same
- rexi_buffer:send(Dest, Msg)
+ ok ->
+ ok;
+ _ ->
+ % treat nosuspend and noconnect the same
+ rexi_buffer:send(Dest, Msg)
end.
%% @doc set up the receive loop with an overall timeout
@@ -53,53 +53,53 @@ recv(Refs, Keypos, Fun, Acc0, GlobalTimeout, PerMsgTO) ->
process_mailbox(RefList, Keypos, Fun, Acc0, TimeoutRef, PerMsgTO) ->
case process_message(RefList, Keypos, Fun, Acc0, TimeoutRef, PerMsgTO) of
- {ok, Acc} ->
- process_mailbox(RefList, Keypos, Fun, Acc, TimeoutRef, PerMsgTO);
- {new_refs, NewRefList, Acc} ->
- process_mailbox(NewRefList, Keypos, Fun, Acc, TimeoutRef, PerMsgTO);
- {stop, Acc} ->
- {ok, Acc};
- Error ->
- Error
+ {ok, Acc} ->
+ process_mailbox(RefList, Keypos, Fun, Acc, TimeoutRef, PerMsgTO);
+ {new_refs, NewRefList, Acc} ->
+ process_mailbox(NewRefList, Keypos, Fun, Acc, TimeoutRef, PerMsgTO);
+ {stop, Acc} ->
+ {ok, Acc};
+ Error ->
+ Error
end.
process_message(RefList, Keypos, Fun, Acc0, TimeoutRef, PerMsgTO) ->
receive
- {timeout, TimeoutRef} ->
- {timeout, Acc0};
- {rexi, Ref, Msg} ->
- case lists:keyfind(Ref, Keypos, RefList) of
- false ->
+ {timeout, TimeoutRef} ->
+ {timeout, Acc0};
+ {rexi, Ref, Msg} ->
+ case lists:keyfind(Ref, Keypos, RefList) of
+ false ->
+ {ok, Acc0};
+ Worker ->
+ Fun(Msg, Worker, Acc0)
+ end;
+ {rexi, Ref, From, Msg} ->
+ case lists:keyfind(Ref, Keypos, RefList) of
+ false ->
+ {ok, Acc0};
+ Worker ->
+ Fun(Msg, {Worker, From}, Acc0)
+ end;
+ {rexi, '$rexi_ping'} ->
{ok, Acc0};
- Worker ->
- Fun(Msg, Worker, Acc0)
- end;
- {rexi, Ref, From, Msg} ->
- case lists:keyfind(Ref, Keypos, RefList) of
- false ->
- {ok, Acc0};
- Worker ->
- Fun(Msg, {Worker, From}, Acc0)
- end;
- {rexi, '$rexi_ping'} ->
- {ok, Acc0};
- {Ref, Msg} ->
- case lists:keyfind(Ref, Keypos, RefList) of
- false ->
- % this was some non-matching message which we will ignore
- {ok, Acc0};
- Worker ->
- Fun(Msg, Worker, Acc0)
- end;
- {Ref, From, Msg} ->
- case lists:keyfind(Ref, Keypos, RefList) of
- false ->
- {ok, Acc0};
- Worker ->
- Fun(Msg, {Worker, From}, Acc0)
- end;
- {rexi_DOWN, _, _, _} = Msg ->
- Fun(Msg, nil, Acc0)
+ {Ref, Msg} ->
+ case lists:keyfind(Ref, Keypos, RefList) of
+ false ->
+ % this was some non-matching message which we will ignore
+ {ok, Acc0};
+ Worker ->
+ Fun(Msg, Worker, Acc0)
+ end;
+ {Ref, From, Msg} ->
+ case lists:keyfind(Ref, Keypos, RefList) of
+ false ->
+ {ok, Acc0};
+ Worker ->
+ Fun(Msg, {Worker, From}, Acc0)
+ end;
+ {rexi_DOWN, _, _, _} = Msg ->
+ Fun(Msg, nil, Acc0)
after PerMsgTO ->
{timeout, Acc0}
end.