summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorNick Vatamaniuc <vatamane@gmail.com>2022-09-19 17:39:19 -0400
committerNick Vatamaniuc <nickva@users.noreply.github.com>2022-09-21 17:30:12 -0400
commitc4e8f59569d35bb02538a446d6ab8bcc4bdddf10 (patch)
tree447220417baae9deab8793446ba0a19d4bac8313
parent21eebad0fb6ea62786d915b29797983be537908a (diff)
downloadcouchdb-c4e8f59569d35bb02538a446d6ab8bcc4bdddf10.tar.gz
Explicitly maintain a fully connected cluster
Previously, it was possible for the nodes to disconnect, and for that state to persist util the nodes restarted. Some fabric requests could reconnect the nodes as a side-effect of sending remote messages, but most of the fabric requests currently start a rexi monitor, which immediately delivers a `rexi_DOWN` message to the coordinator for worker nodes not in the `[node() | nodes()]` list. That happens before `erlang:send/2,3` gets called, so there is nothing there to eventually reconnect the nodes. To avoid relying on the random request reconnecting the cluster, use an explicit monitor process. It does the initial connections, as well as periodically maintains them. [1] https://github.com/apache/couchdb/blob/main/src/rexi/src/rexi_monitor.erl#L45
-rw-r--r--rel/overlay/etc/default.ini5
-rw-r--r--src/docs/src/config/cluster.rst8
-rw-r--r--src/mem3/src/mem3_distribution.erl93
-rw-r--r--src/mem3/src/mem3_sup.erl1
-rw-r--r--src/mem3/src/mem3_sync.erl1
-rw-r--r--src/mem3/src/mem3_sync_event.erl2
-rw-r--r--src/mem3/test/eunit/mem3_distribution_test.erl74
7 files changed, 182 insertions, 2 deletions
diff --git a/rel/overlay/etc/default.ini b/rel/overlay/etc/default.ini
index b989ba3fa..1b1f6111d 100644
--- a/rel/overlay/etc/default.ini
+++ b/rel/overlay/etc/default.ini
@@ -124,6 +124,11 @@ couch = couch_bt_engine
; of the ``_nodes``, ``_dbs``, and ``_users`` system databases.
; seedlist = couchdb@node1.example.com,couchdb@node2.example.com
+; Period in seconds specifying how often to attempt reconnecting to
+; disconnected nodes. There is a 25% random jitter applied to this
+; value.
+;reconnect_interval_sec = 37
+
[chttpd]
; These settings affect the main, clustered port (5984 by default).
port = {{cluster_port}}
diff --git a/src/docs/src/config/cluster.rst b/src/docs/src/config/cluster.rst
index 8801b9cde..72959829c 100644
--- a/src/docs/src/config/cluster.rst
+++ b/src/docs/src/config/cluster.rst
@@ -87,6 +87,14 @@ Cluster Options
[cluster]
seedlist = couchdb@node1.example.com,couchdb@node2.example.com
+ .. config:option:: reconnect_interval_sec:: Cluster connectivity check period.
+
+ .. versionadded:: 3.3
+
+ Period in seconds specifying how often to attempt reconnecting to
+ disconnected nodes. There is a 25% random jitter applied to this
+ value.
+
RPC Performance Tuning
======================
diff --git a/src/mem3/src/mem3_distribution.erl b/src/mem3/src/mem3_distribution.erl
new file mode 100644
index 000000000..a2b77de8a
--- /dev/null
+++ b/src/mem3/src/mem3_distribution.erl
@@ -0,0 +1,93 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+% This module is in charge of keeping the cluster connected. If nodes
+% disconnect they are reconnected with net_kernel:connect_node/1.
+
+-module(mem3_distribution).
+
+-behaviour(gen_server).
+
+-export([
+ start_link/0,
+ connect_node/1
+]).
+
+-export([
+ init/1,
+ handle_call/3,
+ handle_cast/2,
+ handle_info/2,
+ code_change/3
+]).
+
+-define(JITTER_PERCENT, 0.25).
+
+-record(st, {
+ tref
+}).
+
+start_link() ->
+ gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+connect_node(Node) when is_atom(Node) ->
+ net_kernel:connect_node(Node).
+
+init(_) ->
+ connect(false),
+ {ok, #st{tref = erlang:send_after(wait_msec(), self(), connect)}}.
+
+handle_call(Msg, _From, #st{} = St) ->
+ {stop, {bad_call, Msg}, {bad_call, Msg}, St}.
+
+handle_cast(Msg, #st{} = St) ->
+ {stop, {bad_cast, Msg}, St}.
+
+handle_info(connect, #st{} = St) ->
+ erlang:cancel_timer(St#st.tref),
+ ok = connect(true),
+ {noreply, St#st{tref = erlang:send_after(wait_msec(), self(), connect)}};
+handle_info(Msg, St) ->
+ {stop, {bad_info, Msg}, St}.
+
+code_change(_OldVsn, #st{} = St, _Extra) ->
+ {ok, St}.
+
+connect(Log) ->
+ Expected = ordsets:from_list([N || N <- mem3:nodes(), N =/= node()]),
+ Connected = ordsets:from_list(nodes()),
+ NotConnected = ordsets:subtract(Expected, Connected),
+ connect(ordsets:to_list(NotConnected), Log).
+
+connect([], _Log) ->
+ ok;
+connect([N | Rest], Log) ->
+ ConnectRes = ?MODULE:connect_node(N),
+ log(Log, ConnectRes, N),
+ connect(Rest, Log).
+
+log(true, true, Node) ->
+ couch_log:warning("~s : reconnected to ~s", [?MODULE, Node]),
+ ok;
+log(_, _, _) ->
+ % Failed to connect or we don't want to log it
+ ok.
+
+wait_msec() ->
+ IntervalSec = config:get_integer("cluster", "reconnect_interval_sec", 37),
+ IntervalMSec = IntervalSec * 1000,
+ IntervalMSec + jitter(IntervalMSec).
+
+jitter(Interval) ->
+ Jitter = round(Interval * ?JITTER_PERCENT),
+ % rand:uniform(0) crashes!
+ rand:uniform(max(1, Jitter)).
diff --git a/src/mem3/src/mem3_sup.erl b/src/mem3/src/mem3_sup.erl
index a2dc5ba8d..862ef6b50 100644
--- a/src/mem3/src/mem3_sup.erl
+++ b/src/mem3/src/mem3_sup.erl
@@ -21,6 +21,7 @@ init(_Args) ->
Children = [
child(mem3_events),
child(mem3_nodes),
+ child(mem3_distribution),
child(mem3_seeds),
% Order important?
child(mem3_sync_nodes),
diff --git a/src/mem3/src/mem3_sync.erl b/src/mem3/src/mem3_sync.erl
index 3d1c18420..179435965 100644
--- a/src/mem3/src/mem3_sync.erl
+++ b/src/mem3/src/mem3_sync.erl
@@ -266,7 +266,6 @@ sync_nodes_and_dbs() ->
[push(Db, Node) || Db <- local_dbs()].
initial_sync() ->
- [net_kernel:connect_node(Node) || Node <- mem3:nodes()],
mem3_sync_nodes:add(nodes()).
initial_sync(Live) ->
diff --git a/src/mem3/src/mem3_sync_event.erl b/src/mem3/src/mem3_sync_event.erl
index ec6debb45..de9d3e74e 100644
--- a/src/mem3/src/mem3_sync_event.erl
+++ b/src/mem3/src/mem3_sync_event.erl
@@ -28,7 +28,7 @@ init(_) ->
{ok, nil}.
handle_event({add_node, Node}, State) when Node =/= node() ->
- net_kernel:connect_node(Node),
+ mem3_distribution:connect_node(Node),
mem3_sync_nodes:add([Node]),
{ok, State};
handle_event({remove_node, Node}, State) ->
diff --git a/src/mem3/test/eunit/mem3_distribution_test.erl b/src/mem3/test/eunit/mem3_distribution_test.erl
new file mode 100644
index 000000000..d442a47b3
--- /dev/null
+++ b/src/mem3/test/eunit/mem3_distribution_test.erl
@@ -0,0 +1,74 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(mem3_distribution_test).
+
+-include_lib("couch/include/couch_eunit.hrl").
+
+-define(TDEF_FE(Name), fun(Arg) -> {atom_to_list(Name), ?_test(Name(Arg))} end).
+-define(MOD, mem3_distribution).
+
+setup() ->
+ Ctx = test_util:start_couch([mem3]),
+ meck:new(mem3, [passthrough]),
+ meck:new(mem3_distribution, [passthrough]),
+ meck:new(couch_log, [passthrough]),
+ Ctx.
+
+teardown(Ctx) ->
+ meck:unload(),
+ test_util:stop_couch(Ctx).
+
+mem3_distribution_test_() ->
+ {
+ foreach,
+ fun setup/0,
+ fun teardown/1,
+ [
+ ?TDEF_FE(periodic_scheduler_works),
+ ?TDEF_FE(connect_to_unconnected_nodes)
+ ]
+ }.
+
+periodic_scheduler_works(_) ->
+ St = sys:get_state(?MOD),
+ {st, TRef} = St,
+ TVal = erlang:read_timer(TRef),
+ ?assert(is_integer(TVal)),
+ ?assert(TVal > 0),
+ ?assert(TVal =< 70000),
+ {noreply, St1} = ?MOD:handle_info(connect, St),
+ {st, TRef1} = St1,
+ ?assertNotEqual(TRef, TRef1),
+ TVal1 = erlang:read_timer(TRef1),
+ ?assert(is_integer(TVal1)).
+
+connect_to_unconnected_nodes(_) ->
+ Nodes = ['foo', 'bar'],
+ meck:expect(mem3, nodes, 0, Nodes),
+ meck:reset(?MOD),
+ % Simulate connect timer expiry
+ ?MOD ! connect,
+ meck:wait(?MOD, connect_node, [foo], 5000),
+ meck:wait(?MOD, connect_node, [bar], 5000),
+ % connect_node returns false => no reconnection log
+ timer:sleep(100),
+ ?assertEqual(0, meck:num_calls(couch_log, warning, 2)),
+ % Make connect return true
+ meck:reset(?MOD),
+ meck:expect(?MOD, connect_node, 1, true),
+ % Simulate connect timer expiry
+ ?MOD ! connect,
+ meck:wait(?MOD, connect_node, [foo], 5000),
+ meck:wait(?MOD, connect_node, [bar], 5000),
+ % connect_node returns true => emit reconnection log
+ meck:wait(2, couch_log, warning, 2, 5000).