diff options
author | Nick Vatamaniuc <vatamane@gmail.com> | 2022-09-19 17:39:19 -0400 |
---|---|---|
committer | Nick Vatamaniuc <nickva@users.noreply.github.com> | 2022-09-21 17:30:12 -0400 |
commit | c4e8f59569d35bb02538a446d6ab8bcc4bdddf10 (patch) | |
tree | 447220417baae9deab8793446ba0a19d4bac8313 | |
parent | 21eebad0fb6ea62786d915b29797983be537908a (diff) | |
download | couchdb-c4e8f59569d35bb02538a446d6ab8bcc4bdddf10.tar.gz |
Explicitly maintain a fully connected cluster
Previously, it was possible for the nodes to disconnect, and for that state to
persist util the nodes restarted. Some fabric requests could reconnect the
nodes as a side-effect of sending remote messages, but most of the fabric
requests currently start a rexi monitor, which immediately delivers a
`rexi_DOWN` message to the coordinator for worker nodes not in the `[node() |
nodes()]` list. That happens before `erlang:send/2,3` gets called, so there is
nothing there to eventually reconnect the nodes.
To avoid relying on the random request reconnecting the cluster, use an
explicit monitor process. It does the initial connections, as well as
periodically maintains them.
[1] https://github.com/apache/couchdb/blob/main/src/rexi/src/rexi_monitor.erl#L45
-rw-r--r-- | rel/overlay/etc/default.ini | 5 | ||||
-rw-r--r-- | src/docs/src/config/cluster.rst | 8 | ||||
-rw-r--r-- | src/mem3/src/mem3_distribution.erl | 93 | ||||
-rw-r--r-- | src/mem3/src/mem3_sup.erl | 1 | ||||
-rw-r--r-- | src/mem3/src/mem3_sync.erl | 1 | ||||
-rw-r--r-- | src/mem3/src/mem3_sync_event.erl | 2 | ||||
-rw-r--r-- | src/mem3/test/eunit/mem3_distribution_test.erl | 74 |
7 files changed, 182 insertions, 2 deletions
diff --git a/rel/overlay/etc/default.ini b/rel/overlay/etc/default.ini index b989ba3fa..1b1f6111d 100644 --- a/rel/overlay/etc/default.ini +++ b/rel/overlay/etc/default.ini @@ -124,6 +124,11 @@ couch = couch_bt_engine ; of the ``_nodes``, ``_dbs``, and ``_users`` system databases. ; seedlist = couchdb@node1.example.com,couchdb@node2.example.com +; Period in seconds specifying how often to attempt reconnecting to +; disconnected nodes. There is a 25% random jitter applied to this +; value. +;reconnect_interval_sec = 37 + [chttpd] ; These settings affect the main, clustered port (5984 by default). port = {{cluster_port}} diff --git a/src/docs/src/config/cluster.rst b/src/docs/src/config/cluster.rst index 8801b9cde..72959829c 100644 --- a/src/docs/src/config/cluster.rst +++ b/src/docs/src/config/cluster.rst @@ -87,6 +87,14 @@ Cluster Options [cluster] seedlist = couchdb@node1.example.com,couchdb@node2.example.com + .. config:option:: reconnect_interval_sec:: Cluster connectivity check period. + + .. versionadded:: 3.3 + + Period in seconds specifying how often to attempt reconnecting to + disconnected nodes. There is a 25% random jitter applied to this + value. + RPC Performance Tuning ====================== diff --git a/src/mem3/src/mem3_distribution.erl b/src/mem3/src/mem3_distribution.erl new file mode 100644 index 000000000..a2b77de8a --- /dev/null +++ b/src/mem3/src/mem3_distribution.erl @@ -0,0 +1,93 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +% This module is in charge of keeping the cluster connected. If nodes +% disconnect they are reconnected with net_kernel:connect_node/1. + +-module(mem3_distribution). + +-behaviour(gen_server). + +-export([ + start_link/0, + connect_node/1 +]). + +-export([ + init/1, + handle_call/3, + handle_cast/2, + handle_info/2, + code_change/3 +]). + +-define(JITTER_PERCENT, 0.25). + +-record(st, { + tref +}). + +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + +connect_node(Node) when is_atom(Node) -> + net_kernel:connect_node(Node). + +init(_) -> + connect(false), + {ok, #st{tref = erlang:send_after(wait_msec(), self(), connect)}}. + +handle_call(Msg, _From, #st{} = St) -> + {stop, {bad_call, Msg}, {bad_call, Msg}, St}. + +handle_cast(Msg, #st{} = St) -> + {stop, {bad_cast, Msg}, St}. + +handle_info(connect, #st{} = St) -> + erlang:cancel_timer(St#st.tref), + ok = connect(true), + {noreply, St#st{tref = erlang:send_after(wait_msec(), self(), connect)}}; +handle_info(Msg, St) -> + {stop, {bad_info, Msg}, St}. + +code_change(_OldVsn, #st{} = St, _Extra) -> + {ok, St}. + +connect(Log) -> + Expected = ordsets:from_list([N || N <- mem3:nodes(), N =/= node()]), + Connected = ordsets:from_list(nodes()), + NotConnected = ordsets:subtract(Expected, Connected), + connect(ordsets:to_list(NotConnected), Log). + +connect([], _Log) -> + ok; +connect([N | Rest], Log) -> + ConnectRes = ?MODULE:connect_node(N), + log(Log, ConnectRes, N), + connect(Rest, Log). + +log(true, true, Node) -> + couch_log:warning("~s : reconnected to ~s", [?MODULE, Node]), + ok; +log(_, _, _) -> + % Failed to connect or we don't want to log it + ok. + +wait_msec() -> + IntervalSec = config:get_integer("cluster", "reconnect_interval_sec", 37), + IntervalMSec = IntervalSec * 1000, + IntervalMSec + jitter(IntervalMSec). + +jitter(Interval) -> + Jitter = round(Interval * ?JITTER_PERCENT), + % rand:uniform(0) crashes! + rand:uniform(max(1, Jitter)). diff --git a/src/mem3/src/mem3_sup.erl b/src/mem3/src/mem3_sup.erl index a2dc5ba8d..862ef6b50 100644 --- a/src/mem3/src/mem3_sup.erl +++ b/src/mem3/src/mem3_sup.erl @@ -21,6 +21,7 @@ init(_Args) -> Children = [ child(mem3_events), child(mem3_nodes), + child(mem3_distribution), child(mem3_seeds), % Order important? child(mem3_sync_nodes), diff --git a/src/mem3/src/mem3_sync.erl b/src/mem3/src/mem3_sync.erl index 3d1c18420..179435965 100644 --- a/src/mem3/src/mem3_sync.erl +++ b/src/mem3/src/mem3_sync.erl @@ -266,7 +266,6 @@ sync_nodes_and_dbs() -> [push(Db, Node) || Db <- local_dbs()]. initial_sync() -> - [net_kernel:connect_node(Node) || Node <- mem3:nodes()], mem3_sync_nodes:add(nodes()). initial_sync(Live) -> diff --git a/src/mem3/src/mem3_sync_event.erl b/src/mem3/src/mem3_sync_event.erl index ec6debb45..de9d3e74e 100644 --- a/src/mem3/src/mem3_sync_event.erl +++ b/src/mem3/src/mem3_sync_event.erl @@ -28,7 +28,7 @@ init(_) -> {ok, nil}. handle_event({add_node, Node}, State) when Node =/= node() -> - net_kernel:connect_node(Node), + mem3_distribution:connect_node(Node), mem3_sync_nodes:add([Node]), {ok, State}; handle_event({remove_node, Node}, State) -> diff --git a/src/mem3/test/eunit/mem3_distribution_test.erl b/src/mem3/test/eunit/mem3_distribution_test.erl new file mode 100644 index 000000000..d442a47b3 --- /dev/null +++ b/src/mem3/test/eunit/mem3_distribution_test.erl @@ -0,0 +1,74 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(mem3_distribution_test). + +-include_lib("couch/include/couch_eunit.hrl"). + +-define(TDEF_FE(Name), fun(Arg) -> {atom_to_list(Name), ?_test(Name(Arg))} end). +-define(MOD, mem3_distribution). + +setup() -> + Ctx = test_util:start_couch([mem3]), + meck:new(mem3, [passthrough]), + meck:new(mem3_distribution, [passthrough]), + meck:new(couch_log, [passthrough]), + Ctx. + +teardown(Ctx) -> + meck:unload(), + test_util:stop_couch(Ctx). + +mem3_distribution_test_() -> + { + foreach, + fun setup/0, + fun teardown/1, + [ + ?TDEF_FE(periodic_scheduler_works), + ?TDEF_FE(connect_to_unconnected_nodes) + ] + }. + +periodic_scheduler_works(_) -> + St = sys:get_state(?MOD), + {st, TRef} = St, + TVal = erlang:read_timer(TRef), + ?assert(is_integer(TVal)), + ?assert(TVal > 0), + ?assert(TVal =< 70000), + {noreply, St1} = ?MOD:handle_info(connect, St), + {st, TRef1} = St1, + ?assertNotEqual(TRef, TRef1), + TVal1 = erlang:read_timer(TRef1), + ?assert(is_integer(TVal1)). + +connect_to_unconnected_nodes(_) -> + Nodes = ['foo', 'bar'], + meck:expect(mem3, nodes, 0, Nodes), + meck:reset(?MOD), + % Simulate connect timer expiry + ?MOD ! connect, + meck:wait(?MOD, connect_node, [foo], 5000), + meck:wait(?MOD, connect_node, [bar], 5000), + % connect_node returns false => no reconnection log + timer:sleep(100), + ?assertEqual(0, meck:num_calls(couch_log, warning, 2)), + % Make connect return true + meck:reset(?MOD), + meck:expect(?MOD, connect_node, 1, true), + % Simulate connect timer expiry + ?MOD ! connect, + meck:wait(?MOD, connect_node, [foo], 5000), + meck:wait(?MOD, connect_node, [bar], 5000), + % connect_node returns true => emit reconnection log + meck:wait(2, couch_log, warning, 2, 5000). |