summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorNick Vatamaniuc <vatamane@apache.org>2017-09-25 13:00:13 -0400
committerNick Vatamaniuc <nickva@users.noreply.github.com>2017-09-25 15:34:16 -0400
commit1eaf17890ab8d6bf7504355e4e32aaf2357b1398 (patch)
tree5aa3a0bc0ffa9b01027922331bf43ddb4468c584
parent8d1c7043731fbaa5f4f93243df5144416c946604 (diff)
downloadcouchdb-1eaf17890ab8d6bf7504355e4e32aaf2357b1398.tar.gz
Do not buffer rexi messages to disconnected nodes
Instead wait 15 seconds after last cluster configuration change, if there were no more changes to the cluster, stop rexi buffers and servers for nodes which are no longer connected. Extract and reuse cluster stability check from `couch_replicator_clustering` and move it to `mem3_cluster` module, so both replicator and rexi can use it. Users of `mem3_cluster` would implement a behavior callback API then spawn_link the cluster monitor with their specific period values. This also simplifies the logic in rexi_server_mon as it no longer needs to handle `{nodeup, _}` and `{nodedown, _}` messages. On any cluster membership change it will get a `cluster_unstable` message. It then immediately spawns new servers and buffers if needed. Only when cluster has stabilized it will stop servers and buffers for disconnected nodes. The idea is to allow for short periods of disconnects between nodes before throwing away all the buffered messages.
-rw-r--r--src/couch_replicator/src/couch_replicator_clustering.erl116
-rw-r--r--src/mem3/src/mem3_cluster.erl161
-rw-r--r--src/mem3/test/mem3_cluster_test.erl133
-rw-r--r--src/rexi/src/rexi_server_mon.erl84
4 files changed, 396 insertions, 98 deletions
diff --git a/src/couch_replicator/src/couch_replicator_clustering.erl b/src/couch_replicator/src/couch_replicator_clustering.erl
index 7618f24d6..ed01465d5 100644
--- a/src/couch_replicator/src/couch_replicator_clustering.erl
+++ b/src/couch_replicator/src/couch_replicator_clustering.erl
@@ -28,6 +28,7 @@
-behaviour(gen_server).
-behaviour(config_listener).
+-behaviour(mem3_cluster).
-export([
start_link/0
@@ -55,6 +56,12 @@
handle_config_terminate/3
]).
+% mem3_cluster callbacks
+-export([
+ cluster_stable/1,
+ cluster_unstable/1
+]).
+
-include_lib("couch/include/couch_db.hrl").
-include_lib("mem3/include/mem3.hrl").
@@ -63,11 +70,8 @@
-define(RELISTEN_DELAY, 5000).
-record(state, {
- start_time :: erlang:timestamp(),
- last_change :: erlang:timestamp(),
- period = ?DEFAULT_QUIET_PERIOD :: non_neg_integer(),
- start_period = ?DEFAULT_START_PERIOD :: non_neg_integer(),
- timer :: reference()
+ mem3_cluster_pid :: pid(),
+ cluster_stable :: boolean()
}).
@@ -115,64 +119,55 @@ link_cluster_event_listener(Mod, Fun, Args)
Pid.
+% Mem3 cluster callbacks
+
+cluster_unstable(Server) ->
+ couch_replicator_notifier:notify({cluster, unstable}),
+ couch_stats:update_gauge([couch_replicator, cluster_is_stable], 0),
+ couch_log:notice("~s : cluster unstable", [?MODULE]),
+ gen_server:cast(Server, cluster_unstable),
+ Server.
+
+cluster_stable(Server) ->
+ couch_replicator_notifier:notify({cluster, stable}),
+ couch_stats:update_gauge([couch_replicator, cluster_is_stable], 1),
+ couch_log:notice("~s : cluster stable", [?MODULE]),
+ gen_server:cast(Server, cluster_stable),
+ Server.
+
+
% gen_server callbacks
init([]) ->
- net_kernel:monitor_nodes(true),
ok = config:listen_for_changes(?MODULE, nil),
Period = abs(config:get_integer("replicator", "cluster_quiet_period",
?DEFAULT_QUIET_PERIOD)),
StartPeriod = abs(config:get_integer("replicator", "cluster_start_period",
?DEFAULT_START_PERIOD)),
- couch_log:debug("Initialized clustering gen_server ~w", [self()]),
couch_stats:update_gauge([couch_replicator, cluster_is_stable], 0),
- {ok, #state{
- start_time = os:timestamp(),
- last_change = os:timestamp(),
- period = Period,
- start_period = StartPeriod,
- timer = new_timer(StartPeriod)
- }}.
+ {ok, Mem3Cluster} = mem3_cluster:start_link(?MODULE, self(), StartPeriod,
+ Period),
+ {ok, #state{mem3_cluster_pid = Mem3Cluster, cluster_stable = false}}.
terminate(_Reason, _State) ->
ok.
-handle_call(is_stable, _From, State) ->
- {reply, is_stable(State), State}.
+handle_call(is_stable, _From, #state{cluster_stable = IsStable} = State) ->
+ {reply, IsStable, State}.
-handle_cast({set_period, QuietPeriod}, State) ->
- {noreply, State#state{period = QuietPeriod}}.
+handle_cast({set_period, Period}, #state{mem3_cluster_pid = Pid} = State) ->
+ ok = mem3_cluster:set_period(Pid, Period),
+ {noreply, State};
+handle_cast(cluster_stable, State) ->
+ {noreply, State#state{cluster_stable = true}};
-handle_info({nodeup, Node}, State) ->
- Timer = new_timer(interval(State)),
- couch_replicator_notifier:notify({cluster, unstable}),
- couch_stats:update_gauge([couch_replicator, cluster_is_stable], 0),
- couch_log:notice("~s : nodeup ~s, cluster unstable", [?MODULE, Node]),
- {noreply, State#state{last_change = os:timestamp(), timer = Timer}};
+handle_cast(cluster_unstable, State) ->
+ {noreply, State#state{cluster_stable = false}}.
-handle_info({nodedown, Node}, State) ->
- Timer = new_timer(interval(State)),
- couch_replicator_notifier:notify({cluster, unstable}),
- couch_stats:update_gauge([couch_replicator, cluster_is_stable], 0),
- couch_log:notice("~s : nodedown ~s, cluster unstable", [?MODULE, Node]),
- {noreply, State#state{last_change = os:timestamp(), timer = Timer}};
-
-handle_info(stability_check, State) ->
- erlang:cancel_timer(State#state.timer),
- case is_stable(State) of
- true ->
- couch_replicator_notifier:notify({cluster, stable}),
- couch_stats:update_gauge([couch_replicator, cluster_is_stable], 1),
- couch_log:notice("~s : publish cluster `stable` event", [?MODULE]),
- {noreply, State};
- false ->
- Timer = new_timer(interval(State)),
- {noreply, State#state{timer = Timer}}
- end;
handle_info(restart_config_listener, State) ->
ok = config:listen_for_changes(?MODULE, nil),
@@ -185,41 +180,6 @@ code_change(_OldVsn, State, _Extra) ->
%% Internal functions
--spec new_timer(non_neg_integer()) -> reference().
-new_timer(IntervalSec) ->
- erlang:send_after(IntervalSec * 1000, self(), stability_check).
-
-
-% For the first Period seconds after node boot we check cluster stability every
-% StartPeriod seconds. Once the initial Period seconds have passed we continue
-% to monitor once every Period seconds
--spec interval(#state{}) -> non_neg_integer().
-interval(#state{period = Period, start_period = StartPeriod,
- start_time = T0}) ->
- case now_diff_sec(T0) > Period of
- true ->
- % Normal operation
- Period;
- false ->
- % During startup
- StartPeriod
- end.
-
-
--spec is_stable(#state{}) -> boolean().
-is_stable(#state{last_change = TS} = State) ->
- now_diff_sec(TS) > interval(State).
-
-
--spec now_diff_sec(erlang:timestamp()) -> non_neg_integer().
-now_diff_sec(Time) ->
- case timer:now_diff(os:timestamp(), Time) of
- USec when USec < 0 ->
- 0;
- USec when USec >= 0 ->
- USec / 1000000
- end.
-
handle_config_change("replicator", "cluster_quiet_period", V, _, S) ->
ok = gen_server:cast(?MODULE, {set_period, list_to_integer(V)}),
diff --git a/src/mem3/src/mem3_cluster.erl b/src/mem3/src/mem3_cluster.erl
new file mode 100644
index 000000000..7e3d477cb
--- /dev/null
+++ b/src/mem3/src/mem3_cluster.erl
@@ -0,0 +1,161 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+% Maintain cluster stability information. A cluster is considered stable if there
+% were no changes to during a given period of time.
+%
+% To be notified of cluster stability / instability the owner module must
+% implement the mem3_cluster behavior. When cluster membership changes,
+% cluster_unstable behavior callback will be called. After that is are no more
+% changes to the cluster, then cluster_stable callback will be called.
+%
+% The period is passed in as start argument but it can also be set dynamically
+% via the set_period/2 API call.
+%
+% In some cases it might be useful to have a shorter pariod during startup.
+% That can be configured via the StartPeriod argument. If the time since start
+% is less than a full period, then the StartPeriod is used as the period.
+
+
+-module(mem3_cluster).
+
+-behaviour(gen_server).
+
+-export([
+ start_link/4,
+ set_period/2
+]).
+
+-export([
+ init/1,
+ terminate/2,
+ handle_call/3,
+ handle_cast/2,
+ handle_info/2,
+ code_change/3
+]).
+
+
+-callback cluster_stable(Context :: term()) -> NewContext :: term().
+-callback cluster_unstable(Context :: term()) -> NewContext :: term().
+
+
+-record(state, {
+ mod :: atom(),
+ ctx :: term(),
+ start_time :: erlang:timestamp(),
+ last_change :: erlang:timestamp(),
+ period :: integer(),
+ start_period :: integer(),
+ timer :: reference()
+}).
+
+
+-spec start_link(module(), term(), integer(), integer()) ->
+ {ok, pid()} | ignore | {error, term()}.
+start_link(Module, Context, StartPeriod, Period)
+ when is_atom(Module), is_integer(StartPeriod), is_integer(Period) ->
+ gen_server:start_link(?MODULE, [Module, Context, StartPeriod, Period], []).
+
+
+-spec set_period(pid(), integer()) -> ok.
+set_period(Server, Period) when is_pid(Server), is_integer(Period) ->
+ gen_server:cast(Server, {set_period, Period}).
+
+
+% gen_server callbacks
+
+init([Module, Context, StartPeriod, Period]) ->
+ net_kernel:monitor_nodes(true),
+ {ok, #state{
+ mod = Module,
+ ctx = Context,
+ start_time = os:timestamp(),
+ last_change = os:timestamp(),
+ period = Period,
+ start_period = StartPeriod,
+ timer = new_timer(StartPeriod)
+ }}.
+
+
+terminate(_Reason, _State) ->
+ ok.
+
+handle_call(_Msg, _From, State) ->
+ {reply, ignored, State}.
+
+
+handle_cast({set_period, Period}, State) ->
+ {noreply, State#state{period = Period}}.
+
+
+handle_info({nodeup, _Node}, State) ->
+ {noreply, cluster_changed(State)};
+
+handle_info({nodedown, _Node}, State) ->
+ {noreply, cluster_changed(State)};
+
+handle_info(stability_check, #state{mod = Mod, ctx = Ctx} = State) ->
+ erlang:cancel_timer(State#state.timer),
+ case now_diff_sec(State#state.last_change) > interval(State) of
+ true ->
+ {noreply, State#state{ctx = Mod:cluster_stable(Ctx)}};
+ false ->
+ Timer = new_timer(interval(State)),
+ {noreply, State#state{timer = Timer}}
+ end.
+
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+
+%% Internal functions
+
+-spec cluster_changed(#state{}) -> #state{}.
+cluster_changed(#state{mod = Mod, ctx = Ctx} = State) ->
+ State#state{
+ last_change = os:timestamp(),
+ timer = new_timer(interval(State)),
+ ctx = Mod:cluster_unstable(Ctx)
+ }.
+
+
+-spec new_timer(non_neg_integer()) -> reference().
+new_timer(IntervalSec) ->
+ erlang:send_after(IntervalSec * 1000, self(), stability_check).
+
+
+% For the first Period seconds after node boot we check cluster stability every
+% StartPeriod seconds. Once the initial Period seconds have passed we continue
+% to monitor once every Period seconds
+-spec interval(#state{}) -> non_neg_integer().
+interval(#state{period = Period, start_period = StartPeriod,
+ start_time = T0}) ->
+ case now_diff_sec(T0) > Period of
+ true ->
+ % Normal operation
+ Period;
+ false ->
+ % During startup
+ StartPeriod
+ end.
+
+
+-spec now_diff_sec(erlang:timestamp()) -> non_neg_integer().
+now_diff_sec(Time) ->
+ case timer:now_diff(os:timestamp(), Time) of
+ USec when USec < 0 ->
+ 0;
+ USec when USec >= 0 ->
+ USec / 1000000
+ end.
diff --git a/src/mem3/test/mem3_cluster_test.erl b/src/mem3/test/mem3_cluster_test.erl
new file mode 100644
index 000000000..4610d64bd
--- /dev/null
+++ b/src/mem3/test/mem3_cluster_test.erl
@@ -0,0 +1,133 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(mem3_cluster_test).
+
+-behavior(mem3_cluster).
+
+-include_lib("eunit/include/eunit.hrl").
+
+-export([
+ cluster_unstable/1,
+ cluster_stable/1
+]).
+
+
+% Mem3 cluster callbacks
+
+cluster_unstable(Server) ->
+ Server ! cluster_unstable,
+ Server.
+
+cluster_stable(Server) ->
+ Server ! cluster_stable,
+ Server.
+
+
+mem3_cluster_test_test_() ->
+ {
+ foreach,
+ fun setup/0,
+ fun teardown/1,
+ [
+ t_cluster_stable_during_startup_period(),
+ t_cluster_unstable_delivered_on_nodeup(),
+ t_cluster_unstable_delivered_on_nodedown(),
+ t_wait_period_is_reset_after_last_change()
+ ]
+ }.
+
+
+t_cluster_stable_during_startup_period() ->
+ ?_test(begin
+ {ok, Pid} = mem3_cluster:start_link(?MODULE, self(), 1, 2),
+ register(?MODULE, Pid),
+ receive
+ cluster_stable ->
+ ?assert(true)
+ after 1500 ->
+ throw(timeout)
+ end,
+ unlink(Pid),
+ exit(Pid, kill)
+ end).
+
+
+t_cluster_unstable_delivered_on_nodeup() ->
+ ?_test(begin
+ {ok, Pid} = mem3_cluster:start_link(?MODULE, self(), 1, 2),
+ register(?MODULE, Pid),
+ Pid ! {nodeup, node()},
+ receive
+ cluster_unstable ->
+ ?assert(true)
+ after 1000 ->
+ throw(timeout)
+ end,
+ unlink(Pid),
+ exit(Pid, kill)
+ end).
+
+
+t_cluster_unstable_delivered_on_nodedown() ->
+ ?_test(begin
+ {ok, Pid} = mem3_cluster:start_link(?MODULE, self(), 1, 2),
+ register(?MODULE, Pid),
+ Pid ! {nodedown, node()},
+ receive
+ cluster_unstable ->
+ ?assert(true)
+ after 1000 ->
+ throw(timeout)
+ end,
+ unlink(Pid),
+ exit(Pid, kill)
+ end).
+
+
+t_wait_period_is_reset_after_last_change() ->
+ ?_test(begin
+ {ok, Pid} = mem3_cluster:start_link(?MODULE, self(), 1, 1),
+ register(?MODULE, Pid),
+ timer:sleep(800),
+ Pid ! {nodeup, node()}, % after 800 sec send a nodeup
+ receive
+ cluster_stable ->
+ ?assert(false)
+ after 400 ->
+ ?assert(true) % stability check should have been reset
+ end,
+ timer:sleep(1000),
+ receive
+ cluster_stable ->
+ ?assert(true)
+ after 0 ->
+ ?assert(false) % cluster_stable arrives after enough quiet time
+ end,
+ unlink(Pid),
+ exit(Pid, kill)
+ end).
+
+
+% Test helper functions
+
+setup() ->
+ ok.
+
+teardown(_) ->
+ case whereis(?MODULE) of
+ undefined ->
+ ok;
+ Pid when is_pid(Pid) ->
+ unlink(Pid),
+ exit(Pid, kill)
+ end.
diff --git a/src/rexi/src/rexi_server_mon.erl b/src/rexi/src/rexi_server_mon.erl
index e6b5eb98e..86fecaff6 100644
--- a/src/rexi/src/rexi_server_mon.erl
+++ b/src/rexi/src/rexi_server_mon.erl
@@ -14,6 +14,7 @@
-module(rexi_server_mon).
-behaviour(gen_server).
+-behaviour(mem3_cluster).
-vsn(1).
@@ -32,8 +33,13 @@
code_change/3
]).
+-export([
+ cluster_stable/1,
+ cluster_unstable/1
+]).
--define(INTERVAL, 60000).
+
+-define(CLUSTER_STABILITY_PERIOD_SEC, 15).
start_link(ChildMod) ->
@@ -45,9 +51,23 @@ status() ->
gen_server:call(?MODULE, status).
+% Mem3 cluster callbacks
+
+cluster_unstable(Server) ->
+ couch_log:notice("~s : cluster unstable", [?MODULE]),
+ gen_server:cast(Server, cluster_unstable),
+ Server.
+
+cluster_stable(Server) ->
+ gen_server:cast(Server, cluster_stable),
+ Server.
+
+
+% gen_server callbacks
+
init(ChildMod) ->
- net_kernel:monitor_nodes(true),
- erlang:send(self(), check_nodes),
+ {ok, _Mem3Cluster} = mem3_cluster:start_link(?MODULE, self(),
+ ?CLUSTER_STABILITY_PERIOD_SEC, ?CLUSTER_STABILITY_PERIOD_SEC),
{ok, ChildMod}.
@@ -67,24 +87,27 @@ handle_call(Msg, _From, St) ->
couch_log:notice("~s ignored_call ~w", [?MODULE, Msg]),
{reply, ignored, St}.
-
-handle_cast(Msg, St) ->
- couch_log:notice("~s ignored_cast ~w", [?MODULE, Msg]),
- {noreply, St}.
-
-
-handle_info({nodeup, _}, ChildMod) ->
+% If cluster is unstable a node was added or just removed. Check if any nodes
+% can be started, but do not immediately stop nodes, defer that till cluster
+% stabilized.
+handle_cast(cluster_unstable, ChildMod) ->
+ couch_log:notice("~s : cluster unstable", [ChildMod]),
start_servers(ChildMod),
{noreply, ChildMod};
-handle_info({nodedown, _}, St) ->
- {noreply, St};
-
-handle_info(check_nodes, ChildMod) ->
+% When cluster is stable, start any servers for new nodes and stop servers for
+% the ones that disconnected.
+handle_cast(cluster_stable, ChildMod) ->
+ couch_log:notice("~s : cluster stable", [ChildMod]),
start_servers(ChildMod),
- erlang:send_after(?INTERVAL, self(), check_nodes),
+ stop_servers(ChildMod),
{noreply, ChildMod};
+handle_cast(Msg, St) ->
+ couch_log:notice("~s ignored_cast ~w", [?MODULE, Msg]),
+ {noreply, St}.
+
+
handle_info(Msg, St) ->
couch_log:notice("~s ignored_info ~w", [?MODULE, Msg]),
{noreply, St}.
@@ -101,13 +124,27 @@ start_servers(ChildMod) ->
{ok, _} = start_server(ChildMod, Id)
end, missing_servers(ChildMod)).
+stop_servers(ChildMod) ->
+ lists:foreach(fun(Id) ->
+ ok = stop_server(ChildMod, Id)
+ end, extra_servers(ChildMod)).
+
+
+server_ids(ChildMod) ->
+ Nodes = [node() | nodes()],
+ [list_to_atom(lists:concat([ChildMod, "_", Node])) || Node <- Nodes].
+
+
+running_servers(ChildMod) ->
+ [Id || {Id, _, _, _} <- supervisor:which_children(sup_module(ChildMod))].
+
missing_servers(ChildMod) ->
- ServerIds = [list_to_atom(lists:concat([ChildMod, "_", Node]))
- || Node <- [node() | nodes()]],
- SupModule = sup_module(ChildMod),
- ChildIds = [Id || {Id, _, _, _} <- supervisor:which_children(SupModule)],
- ServerIds -- ChildIds.
+ server_ids(ChildMod) -- running_servers(ChildMod).
+
+
+extra_servers(ChildMod) ->
+ running_servers(ChildMod) -- server_ids(ChildMod).
start_server(ChildMod, ChildId) ->
@@ -126,5 +163,12 @@ start_server(ChildMod, ChildId) ->
erlang:error(Else)
end.
+
+stop_server(ChildMod, ChildId) ->
+ SupMod = sup_module(ChildMod),
+ ok = supervisor:terminate_child(SupMod, ChildId),
+ ok = supervisor:delete_child(SupMod, ChildId).
+
+
sup_module(ChildMod) ->
list_to_atom(lists:concat([ChildMod, "_sup"])).