summaryrefslogtreecommitdiff
path: root/src/rexi
diff options
context:
space:
mode:
authorNick Vatamaniuc <vatamane@apache.org>2017-09-25 13:00:13 -0400
committerNick Vatamaniuc <nickva@users.noreply.github.com>2017-09-25 15:34:16 -0400
commit1eaf17890ab8d6bf7504355e4e32aaf2357b1398 (patch)
tree5aa3a0bc0ffa9b01027922331bf43ddb4468c584 /src/rexi
parent8d1c7043731fbaa5f4f93243df5144416c946604 (diff)
downloadcouchdb-1eaf17890ab8d6bf7504355e4e32aaf2357b1398.tar.gz
Do not buffer rexi messages to disconnected nodes
Instead wait 15 seconds after last cluster configuration change, if there were no more changes to the cluster, stop rexi buffers and servers for nodes which are no longer connected. Extract and reuse cluster stability check from `couch_replicator_clustering` and move it to `mem3_cluster` module, so both replicator and rexi can use it. Users of `mem3_cluster` would implement a behavior callback API then spawn_link the cluster monitor with their specific period values. This also simplifies the logic in rexi_server_mon as it no longer needs to handle `{nodeup, _}` and `{nodedown, _}` messages. On any cluster membership change it will get a `cluster_unstable` message. It then immediately spawns new servers and buffers if needed. Only when cluster has stabilized it will stop servers and buffers for disconnected nodes. The idea is to allow for short periods of disconnects between nodes before throwing away all the buffered messages.
Diffstat (limited to 'src/rexi')
-rw-r--r--src/rexi/src/rexi_server_mon.erl84
1 files changed, 64 insertions, 20 deletions
diff --git a/src/rexi/src/rexi_server_mon.erl b/src/rexi/src/rexi_server_mon.erl
index e6b5eb98e..86fecaff6 100644
--- a/src/rexi/src/rexi_server_mon.erl
+++ b/src/rexi/src/rexi_server_mon.erl
@@ -14,6 +14,7 @@
-module(rexi_server_mon).
-behaviour(gen_server).
+-behaviour(mem3_cluster).
-vsn(1).
@@ -32,8 +33,13 @@
code_change/3
]).
+-export([
+ cluster_stable/1,
+ cluster_unstable/1
+]).
--define(INTERVAL, 60000).
+
+-define(CLUSTER_STABILITY_PERIOD_SEC, 15).
start_link(ChildMod) ->
@@ -45,9 +51,23 @@ status() ->
gen_server:call(?MODULE, status).
+% Mem3 cluster callbacks
+
+cluster_unstable(Server) ->
+ couch_log:notice("~s : cluster unstable", [?MODULE]),
+ gen_server:cast(Server, cluster_unstable),
+ Server.
+
+cluster_stable(Server) ->
+ gen_server:cast(Server, cluster_stable),
+ Server.
+
+
+% gen_server callbacks
+
init(ChildMod) ->
- net_kernel:monitor_nodes(true),
- erlang:send(self(), check_nodes),
+ {ok, _Mem3Cluster} = mem3_cluster:start_link(?MODULE, self(),
+ ?CLUSTER_STABILITY_PERIOD_SEC, ?CLUSTER_STABILITY_PERIOD_SEC),
{ok, ChildMod}.
@@ -67,24 +87,27 @@ handle_call(Msg, _From, St) ->
couch_log:notice("~s ignored_call ~w", [?MODULE, Msg]),
{reply, ignored, St}.
-
-handle_cast(Msg, St) ->
- couch_log:notice("~s ignored_cast ~w", [?MODULE, Msg]),
- {noreply, St}.
-
-
-handle_info({nodeup, _}, ChildMod) ->
+% If cluster is unstable a node was added or just removed. Check if any nodes
+% can be started, but do not immediately stop nodes, defer that till cluster
+% stabilized.
+handle_cast(cluster_unstable, ChildMod) ->
+ couch_log:notice("~s : cluster unstable", [ChildMod]),
start_servers(ChildMod),
{noreply, ChildMod};
-handle_info({nodedown, _}, St) ->
- {noreply, St};
-
-handle_info(check_nodes, ChildMod) ->
+% When cluster is stable, start any servers for new nodes and stop servers for
+% the ones that disconnected.
+handle_cast(cluster_stable, ChildMod) ->
+ couch_log:notice("~s : cluster stable", [ChildMod]),
start_servers(ChildMod),
- erlang:send_after(?INTERVAL, self(), check_nodes),
+ stop_servers(ChildMod),
{noreply, ChildMod};
+handle_cast(Msg, St) ->
+ couch_log:notice("~s ignored_cast ~w", [?MODULE, Msg]),
+ {noreply, St}.
+
+
handle_info(Msg, St) ->
couch_log:notice("~s ignored_info ~w", [?MODULE, Msg]),
{noreply, St}.
@@ -101,13 +124,27 @@ start_servers(ChildMod) ->
{ok, _} = start_server(ChildMod, Id)
end, missing_servers(ChildMod)).
+stop_servers(ChildMod) ->
+ lists:foreach(fun(Id) ->
+ ok = stop_server(ChildMod, Id)
+ end, extra_servers(ChildMod)).
+
+
+server_ids(ChildMod) ->
+ Nodes = [node() | nodes()],
+ [list_to_atom(lists:concat([ChildMod, "_", Node])) || Node <- Nodes].
+
+
+running_servers(ChildMod) ->
+ [Id || {Id, _, _, _} <- supervisor:which_children(sup_module(ChildMod))].
+
missing_servers(ChildMod) ->
- ServerIds = [list_to_atom(lists:concat([ChildMod, "_", Node]))
- || Node <- [node() | nodes()]],
- SupModule = sup_module(ChildMod),
- ChildIds = [Id || {Id, _, _, _} <- supervisor:which_children(SupModule)],
- ServerIds -- ChildIds.
+ server_ids(ChildMod) -- running_servers(ChildMod).
+
+
+extra_servers(ChildMod) ->
+ running_servers(ChildMod) -- server_ids(ChildMod).
start_server(ChildMod, ChildId) ->
@@ -126,5 +163,12 @@ start_server(ChildMod, ChildId) ->
erlang:error(Else)
end.
+
+stop_server(ChildMod, ChildId) ->
+ SupMod = sup_module(ChildMod),
+ ok = supervisor:terminate_child(SupMod, ChildId),
+ ok = supervisor:delete_child(SupMod, ChildId).
+
+
sup_module(ChildMod) ->
list_to_atom(lists:concat([ChildMod, "_sup"])).