diff options
author | Paul J. Davis <paul.joseph.davis@gmail.com> | 2013-03-18 22:51:29 -0500 |
---|---|---|
committer | Robert Newson <rnewson@apache.org> | 2014-07-23 17:56:17 +0100 |
commit | f5907c4b89456d3120a4d1e1080d9203e6522816 (patch) | |
tree | 6dfc1110a7651ffc2926487d6e31b74fa45c9abb | |
parent | 6cd3b812c0bb5c956d8264aae0ffdd94bbd52314 (diff) | |
download | couchdb-f5907c4b89456d3120a4d1e1080d9203e6522816.tar.gz |
[1/3] Start a rexi_server per remote node
This is part of a multi-release upgrade to switch to using a rexi_server
instance per remote node. The first commit introduces the new
rexi_server instances, the second switches to using the new instances
and the third will remove the singleton rexi_server. Each of these
commits should be in a separate release.
The new pattern introduced by this commit will start a 'rexi_server_%b'
process where the '%b' is replaced by the `erlang:phash2(Node)` for
which it will service requests. After the second commit is release each
rexi generated message will be sent to the rexi_server instance on the
remote node handling requests for the local node. The
`rexi_utils:server_pid/1` function is used to generate the id of the
remote server.
-rw-r--r-- | src/rexi.erl | 12 | ||||
-rw-r--r-- | src/rexi_server.erl | 6 | ||||
-rw-r--r-- | src/rexi_server_mon.erl | 123 | ||||
-rw-r--r-- | src/rexi_server_sup.erl | 29 | ||||
-rw-r--r-- | src/rexi_sup.erl | 42 | ||||
-rw-r--r-- | src/rexi_utils.erl | 10 |
6 files changed, 199 insertions, 23 deletions
diff --git a/src/rexi.erl b/src/rexi.erl index 8e53dba0f..a4c786dc3 100644 --- a/src/rexi.erl +++ b/src/rexi.erl @@ -15,7 +15,6 @@ -export([cast/2, cast/3, cast/4, kill/2]). -export([reply/1, sync_reply/1, sync_reply/2]). -export([async_server_call/2, async_server_call/3]). --export([get_errors/0, get_last_error/0, set_error_limit/1]). -export([stream/1, stream/2, stream/3, stream_ack/1, stream_ack/2]). -include_lib("rexi/include/rexi.hrl"). @@ -31,17 +30,6 @@ stop() -> restart() -> stop(), start(). --spec get_errors() -> {ok, [#error{}]}. -get_errors() -> - gen_server:call(?SERVER, get_errors). - --spec get_last_error() -> {ok, #error{}} | {error, empty}. -get_last_error() -> - gen_server:call(?SERVER, get_last_error). - --spec set_error_limit(pos_integer()) -> ok. -set_error_limit(N) when is_integer(N), N > 0 -> - gen_server:call(?SERVER, {set_error_limit, N}). %% @equiv cast(Node, self(), MFA) -spec cast(node(), {atom(), atom(), list()}) -> reference(). diff --git a/src/rexi_server.erl b/src/rexi_server.erl index b2f6de021..49661cca4 100644 --- a/src/rexi_server.erl +++ b/src/rexi_server.erl @@ -15,7 +15,7 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --export([start_link/0, init_p/2, init_p/3]). +-export([start_link/1, init_p/2, init_p/3]). -include_lib("rexi/include/rexi.hrl"). -include_lib("eunit/include/eunit.hrl"). @@ -35,8 +35,8 @@ error_count = 0 }). -start_link() -> - gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). +start_link(ServerId) -> + gen_server:start_link({local, ServerId}, ?MODULE, [], []). init([]) -> {ok, #st{}}. diff --git a/src/rexi_server_mon.erl b/src/rexi_server_mon.erl new file mode 100644 index 000000000..3545dba30 --- /dev/null +++ b/src/rexi_server_mon.erl @@ -0,0 +1,123 @@ +% Copyright 2010 Cloudant +% +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(rexi_server_mon). +-behaviour(gen_server). + + +-export([ + start_link/0, + status/0 +]). + + +-export([ + init/1, + terminate/2, + handle_call/3, + handle_cast/2, + handle_info/2, + code_change/3 +]). + + +-define(SUP_MODULE, rexi_server_sup). +-define(CHILD_MODULE, rexi_server). +-define(INTERVAL, 60000). + + +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + + +status() -> + gen_server:call(?MODULE, status). + + +init([]) -> + net_kernel:monitor_nodes(true), + erlang:send_after(?INTERVAL, self(), check_nodes), + {ok, nil}. + + +terminate(_Reason, _St) -> + ok. + + +handle_call(status, _From, St) -> + case missing_servers() of + [] -> + {reply, ok, St}; + Missing -> + {reply, {waiting, length(Missing)}, St} + end; + +handle_call(Msg, _From, St) -> + twig:log(notice, "~s ignored_call ~w", [?MODULE, Msg]), + {reply, ignored, St}. + + +handle_cast(Msg, St) -> + twig:log(notice, "~s ignored_cast ~w", [?MODULE, Msg]), + {noreply, St}. + + +handle_info({nodeup, _}, St) -> + start_rexi_servers(), + {noreply, St}; + +handle_info({nodedown, _}, St) -> + {noreply, St}; + +handle_info(check_nodes, St) -> + start_rexi_servers(), + erlang:send_after(?INTERVAL, self(), check_nodes), + {noreply, St}; + +handle_info(Msg, St) -> + twig:log(notice, "~s ignored_info ~w", [?MODULE, Msg]), + {noreply, St}. + + +code_change(_OldVsn, St, _Extra) -> + {ok, St}. + + +start_rexi_servers() -> + lists:foreach(fun(Id) -> + {ok, _} = start_rexi_server(Id) + end, missing_servers()). + + +missing_servers() -> + ServerIds = [rexi_utils:server_id(Node) || Node <- [node() | nodes()]], + ChildIds = [Id || {Id, _, _, _} <- supervisor:which_children(?SUP_MODULE)], + ServerIds -- ChildIds. + + +start_rexi_server(ChildId) -> + ChildSpec = { + ChildId, + {rexi_server, start_link, [ChildId]}, + permanent, + brutal_kill, + worker, + [?CHILD_MODULE] + }, + case supervisor:start_child(?SUP_MODULE, ChildSpec) of + {ok, Pid} -> + {ok, Pid}; + Else -> + erlang:error(Else) + end. diff --git a/src/rexi_server_sup.erl b/src/rexi_server_sup.erl new file mode 100644 index 000000000..97b5b9e26 --- /dev/null +++ b/src/rexi_server_sup.erl @@ -0,0 +1,29 @@ +% Copyright 2010 Cloudant +% +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(rexi_server_sup). +-behaviour(supervisor). + + +-export([init/1]). + +-export([start_link/0]). + + +start_link() -> + supervisor:start_link({local, ?MODULE}, ?MODULE, []). + + +init([]) -> + {ok, {{one_for_one, 1, 1}, []}}. diff --git a/src/rexi_sup.erl b/src/rexi_sup.erl index a8aa80019..1c13eeaa2 100644 --- a/src/rexi_sup.erl +++ b/src/rexi_sup.erl @@ -12,17 +12,45 @@ -module(rexi_sup). -behaviour(supervisor). --export([init/1]). -export([start_link/1]). - --include_lib("eunit/include/eunit.hrl"). - -%% Helper macro for declaring children of supervisor --define(CHILD(I, Type), {I, {I, start_link, []}, permanent, 100, Type, [I]}). +-export([init/1]). start_link(Args) -> supervisor:start_link({local,?MODULE}, ?MODULE, Args). init([]) -> - {ok, {{one_for_one, 3, 10}, [?CHILD(rexi_gov_manager, worker), ?CHILD(rexi_server, worker)]}}. + {ok, {{one_for_one, 3, 10}, [ + { + rexi_gov_manager, + {rexi_gov_manager, start_link, []}, + permanent, + 100, + worker, + [rexi_gov_manager] + }, + { + rexi_server, + {rexi_server, start_link, [rexi_server]}, + permanent, + 100, + worker, + [rexi_server] + }, + { + rexi_server_sup, + {rexi_server_sup, start_link, []}, + permanent, + 100, + supervisor, + [rexi_server_sup] + }, + { + rexi_server_mon, + {rexi_server_mon, start_link, []}, + permanent, + 100, + worker, + [rexi_server_mon] + } + ]}}. diff --git a/src/rexi_utils.erl b/src/rexi_utils.erl index 1b11576a0..8bc119079 100644 --- a/src/rexi_utils.erl +++ b/src/rexi_utils.erl @@ -12,7 +12,15 @@ -module(rexi_utils). --export([send/2, recv/6]). +-export([server_id/1, server_pid/1, send/2, recv/6]). + +%% @doc Return a rexi_server id for the given node. +server_id(Node) -> + list_to_atom("rexi_server_" ++ integer_to_list(erlang:phash2(Node))). + +%% @doc Return a {server_id(node()), Node} Pid name for the given Node. +server_pid(Node) -> + {server_id(node()), Node}. %% @doc send a message as quickly as possible send(Dest, Msg) -> |