summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPaul J. Davis <paul.joseph.davis@gmail.com>2013-03-18 22:51:29 -0500
committerRobert Newson <rnewson@apache.org>2014-07-23 17:56:17 +0100
commitf5907c4b89456d3120a4d1e1080d9203e6522816 (patch)
tree6dfc1110a7651ffc2926487d6e31b74fa45c9abb
parent6cd3b812c0bb5c956d8264aae0ffdd94bbd52314 (diff)
downloadcouchdb-f5907c4b89456d3120a4d1e1080d9203e6522816.tar.gz
[1/3] Start a rexi_server per remote node
This is part of a multi-release upgrade to switch to using a rexi_server instance per remote node. The first commit introduces the new rexi_server instances, the second switches to using the new instances and the third will remove the singleton rexi_server. Each of these commits should be in a separate release. The new pattern introduced by this commit will start a 'rexi_server_%b' process where the '%b' is replaced by the `erlang:phash2(Node)` for which it will service requests. After the second commit is release each rexi generated message will be sent to the rexi_server instance on the remote node handling requests for the local node. The `rexi_utils:server_pid/1` function is used to generate the id of the remote server.
-rw-r--r--src/rexi.erl12
-rw-r--r--src/rexi_server.erl6
-rw-r--r--src/rexi_server_mon.erl123
-rw-r--r--src/rexi_server_sup.erl29
-rw-r--r--src/rexi_sup.erl42
-rw-r--r--src/rexi_utils.erl10
6 files changed, 199 insertions, 23 deletions
diff --git a/src/rexi.erl b/src/rexi.erl
index 8e53dba0f..a4c786dc3 100644
--- a/src/rexi.erl
+++ b/src/rexi.erl
@@ -15,7 +15,6 @@
-export([cast/2, cast/3, cast/4, kill/2]).
-export([reply/1, sync_reply/1, sync_reply/2]).
-export([async_server_call/2, async_server_call/3]).
--export([get_errors/0, get_last_error/0, set_error_limit/1]).
-export([stream/1, stream/2, stream/3, stream_ack/1, stream_ack/2]).
-include_lib("rexi/include/rexi.hrl").
@@ -31,17 +30,6 @@ stop() ->
restart() ->
stop(), start().
--spec get_errors() -> {ok, [#error{}]}.
-get_errors() ->
- gen_server:call(?SERVER, get_errors).
-
--spec get_last_error() -> {ok, #error{}} | {error, empty}.
-get_last_error() ->
- gen_server:call(?SERVER, get_last_error).
-
--spec set_error_limit(pos_integer()) -> ok.
-set_error_limit(N) when is_integer(N), N > 0 ->
- gen_server:call(?SERVER, {set_error_limit, N}).
%% @equiv cast(Node, self(), MFA)
-spec cast(node(), {atom(), atom(), list()}) -> reference().
diff --git a/src/rexi_server.erl b/src/rexi_server.erl
index b2f6de021..49661cca4 100644
--- a/src/rexi_server.erl
+++ b/src/rexi_server.erl
@@ -15,7 +15,7 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3]).
--export([start_link/0, init_p/2, init_p/3]).
+-export([start_link/1, init_p/2, init_p/3]).
-include_lib("rexi/include/rexi.hrl").
-include_lib("eunit/include/eunit.hrl").
@@ -35,8 +35,8 @@
error_count = 0
}).
-start_link() ->
- gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+start_link(ServerId) ->
+ gen_server:start_link({local, ServerId}, ?MODULE, [], []).
init([]) ->
{ok, #st{}}.
diff --git a/src/rexi_server_mon.erl b/src/rexi_server_mon.erl
new file mode 100644
index 000000000..3545dba30
--- /dev/null
+++ b/src/rexi_server_mon.erl
@@ -0,0 +1,123 @@
+% Copyright 2010 Cloudant
+%
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(rexi_server_mon).
+-behaviour(gen_server).
+
+
+-export([
+ start_link/0,
+ status/0
+]).
+
+
+-export([
+ init/1,
+ terminate/2,
+ handle_call/3,
+ handle_cast/2,
+ handle_info/2,
+ code_change/3
+]).
+
+
+-define(SUP_MODULE, rexi_server_sup).
+-define(CHILD_MODULE, rexi_server).
+-define(INTERVAL, 60000).
+
+
+start_link() ->
+ gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+
+status() ->
+ gen_server:call(?MODULE, status).
+
+
+init([]) ->
+ net_kernel:monitor_nodes(true),
+ erlang:send_after(?INTERVAL, self(), check_nodes),
+ {ok, nil}.
+
+
+terminate(_Reason, _St) ->
+ ok.
+
+
+handle_call(status, _From, St) ->
+ case missing_servers() of
+ [] ->
+ {reply, ok, St};
+ Missing ->
+ {reply, {waiting, length(Missing)}, St}
+ end;
+
+handle_call(Msg, _From, St) ->
+ twig:log(notice, "~s ignored_call ~w", [?MODULE, Msg]),
+ {reply, ignored, St}.
+
+
+handle_cast(Msg, St) ->
+ twig:log(notice, "~s ignored_cast ~w", [?MODULE, Msg]),
+ {noreply, St}.
+
+
+handle_info({nodeup, _}, St) ->
+ start_rexi_servers(),
+ {noreply, St};
+
+handle_info({nodedown, _}, St) ->
+ {noreply, St};
+
+handle_info(check_nodes, St) ->
+ start_rexi_servers(),
+ erlang:send_after(?INTERVAL, self(), check_nodes),
+ {noreply, St};
+
+handle_info(Msg, St) ->
+ twig:log(notice, "~s ignored_info ~w", [?MODULE, Msg]),
+ {noreply, St}.
+
+
+code_change(_OldVsn, St, _Extra) ->
+ {ok, St}.
+
+
+start_rexi_servers() ->
+ lists:foreach(fun(Id) ->
+ {ok, _} = start_rexi_server(Id)
+ end, missing_servers()).
+
+
+missing_servers() ->
+ ServerIds = [rexi_utils:server_id(Node) || Node <- [node() | nodes()]],
+ ChildIds = [Id || {Id, _, _, _} <- supervisor:which_children(?SUP_MODULE)],
+ ServerIds -- ChildIds.
+
+
+start_rexi_server(ChildId) ->
+ ChildSpec = {
+ ChildId,
+ {rexi_server, start_link, [ChildId]},
+ permanent,
+ brutal_kill,
+ worker,
+ [?CHILD_MODULE]
+ },
+ case supervisor:start_child(?SUP_MODULE, ChildSpec) of
+ {ok, Pid} ->
+ {ok, Pid};
+ Else ->
+ erlang:error(Else)
+ end.
diff --git a/src/rexi_server_sup.erl b/src/rexi_server_sup.erl
new file mode 100644
index 000000000..97b5b9e26
--- /dev/null
+++ b/src/rexi_server_sup.erl
@@ -0,0 +1,29 @@
+% Copyright 2010 Cloudant
+%
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(rexi_server_sup).
+-behaviour(supervisor).
+
+
+-export([init/1]).
+
+-export([start_link/0]).
+
+
+start_link() ->
+ supervisor:start_link({local, ?MODULE}, ?MODULE, []).
+
+
+init([]) ->
+ {ok, {{one_for_one, 1, 1}, []}}.
diff --git a/src/rexi_sup.erl b/src/rexi_sup.erl
index a8aa80019..1c13eeaa2 100644
--- a/src/rexi_sup.erl
+++ b/src/rexi_sup.erl
@@ -12,17 +12,45 @@
-module(rexi_sup).
-behaviour(supervisor).
--export([init/1]).
-export([start_link/1]).
-
--include_lib("eunit/include/eunit.hrl").
-
-%% Helper macro for declaring children of supervisor
--define(CHILD(I, Type), {I, {I, start_link, []}, permanent, 100, Type, [I]}).
+-export([init/1]).
start_link(Args) ->
supervisor:start_link({local,?MODULE}, ?MODULE, Args).
init([]) ->
- {ok, {{one_for_one, 3, 10}, [?CHILD(rexi_gov_manager, worker), ?CHILD(rexi_server, worker)]}}.
+ {ok, {{one_for_one, 3, 10}, [
+ {
+ rexi_gov_manager,
+ {rexi_gov_manager, start_link, []},
+ permanent,
+ 100,
+ worker,
+ [rexi_gov_manager]
+ },
+ {
+ rexi_server,
+ {rexi_server, start_link, [rexi_server]},
+ permanent,
+ 100,
+ worker,
+ [rexi_server]
+ },
+ {
+ rexi_server_sup,
+ {rexi_server_sup, start_link, []},
+ permanent,
+ 100,
+ supervisor,
+ [rexi_server_sup]
+ },
+ {
+ rexi_server_mon,
+ {rexi_server_mon, start_link, []},
+ permanent,
+ 100,
+ worker,
+ [rexi_server_mon]
+ }
+ ]}}.
diff --git a/src/rexi_utils.erl b/src/rexi_utils.erl
index 1b11576a0..8bc119079 100644
--- a/src/rexi_utils.erl
+++ b/src/rexi_utils.erl
@@ -12,7 +12,15 @@
-module(rexi_utils).
--export([send/2, recv/6]).
+-export([server_id/1, server_pid/1, send/2, recv/6]).
+
+%% @doc Return a rexi_server id for the given node.
+server_id(Node) ->
+ list_to_atom("rexi_server_" ++ integer_to_list(erlang:phash2(Node))).
+
+%% @doc Return a {server_id(node()), Node} Pid name for the given Node.
+server_pid(Node) ->
+ {server_id(node()), Node}.
%% @doc send a message as quickly as possible
send(Dest, Msg) ->