author    Adam Kocoloski <kocolosk@apache.org>  2018-11-10 21:40:43 -0500
committer GitHub <noreply@github.com>           2018-11-10 21:40:43 -0500
commit    0302e9de73edc94d63c16ed016c6dd666d7ede80 (patch)
tree      997b528adaf3931024fbc1cd50ab764c20a2d347
parent    844d6fe021121b89d98bf89cf4e9cc1d7e68a270 (diff)
Enable cluster auto-assembly through a seedlist (#1658)
This introduces a new config setting which allows an administrator to
configure an initial list of nodes that should be contacted when a node
boots up:

    [cluster]
    seedlist = couchdb@node1.example.com,couchdb@node2.example.com,couchdb@node3.example.com

If configured, CouchDB will add every node in the seedlist to the _nodes
DB automatically, which will trigger a distributed Erlang connection and
a replication of the internal system databases to the local node. This
eliminates the need to explicitly add each node using the HTTP API.

We also modify the /_up endpoint to reflect the progress of the initial
seeding of the node. If a seedlist is configured the endpoint will return
404 until the local node has updated its local replica of each of the
system databases from one of the members of the seedlist. Once the status
flips to "ok" the endpoint will return 200 and it's safe to direct
requests to the new node.
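As a quick operational sketch (not part of this change), a newly booted node can
be polled on /_up until seeding completes. The module name, URL, and poll
interval below are illustrative assumptions; the sketch presumes the clustered
interface listens on 127.0.0.1:5984 and that OTP's inets application is
available:

    %% Minimal sketch: wait until /_up returns 200, i.e. seeding has finished.
    -module(wait_for_up).
    -export([wait/0]).

    wait() ->
        {ok, _} = application:ensure_all_started(inets),
        poll("http://127.0.0.1:5984/_up").

    poll(Url) ->
        case httpc:request(get, {Url, []}, [], []) of
            {ok, {{_, 200, _}, _Headers, _Body}} ->
                ok;                % status "ok": safe to route traffic here
            _Other ->
                timer:sleep(1000), % 404 while seeding, or node not reachable yet
                poll(Url)
        end.

A load balancer health check can apply the same rule, treating 404 as "not
ready" and 200 as "ready".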
-rw-r--r--  rel/overlay/etc/default.ini          7
-rw-r--r--  src/chttpd/src/chttpd_misc.erl       8
-rw-r--r--  src/mem3/src/mem3_nodes.erl         26
-rw-r--r--  src/mem3/src/mem3_rep.erl            9
-rw-r--r--  src/mem3/src/mem3_rpc.erl           13
-rw-r--r--  src/mem3/src/mem3_seeds.erl        162
-rw-r--r--  src/mem3/src/mem3_sup.erl            1
-rw-r--r--  src/mem3/src/mem3_sync.erl           3
-rw-r--r--  src/mem3/src/mem3_util.erl           9
-rw-r--r--  src/mem3/test/mem3_seeds_test.erl   64
10 files changed, 284 insertions, 18 deletions
diff --git a/rel/overlay/etc/default.ini b/rel/overlay/etc/default.ini
index f384de3aa..edaebf9e2 100644
--- a/rel/overlay/etc/default.ini
+++ b/rel/overlay/etc/default.ini
@@ -77,6 +77,13 @@ q=8
n=3
; placement = metro-dc-a:2,metro-dc-b:1
+; Supply a comma-delimited list of node names that this node should
+; contact in order to join a cluster. If a seedlist is configured the ``_up``
+; endpoint will return a 404 until the node has successfully contacted at
+; least one of the members of the seedlist and replicated an up-to-date copy
+; of the ``_nodes``, ``_dbs``, and ``_users`` system databases.
+; seedlist = couchdb@node1.example.com,couchdb@node2.example.com
+
[chttpd]
; These settings affect the main, clustered port (5984 by default).
port = {{cluster_port}}
diff --git a/src/chttpd/src/chttpd_misc.erl b/src/chttpd/src/chttpd_misc.erl
index 1a6b3cb50..fc03fb512 100644
--- a/src/chttpd/src/chttpd_misc.erl
+++ b/src/chttpd/src/chttpd_misc.erl
@@ -480,7 +480,13 @@ handle_up_req(#httpd{method='GET'} = Req) ->
"nolb" ->
send_json(Req, 404, {[{status, nolb}]});
_ ->
- send_json(Req, 200, {[{status, ok}]})
+ {ok, {Status}} = mem3_seeds:get_status(),
+ case couch_util:get_value(status, Status) of
+ ok ->
+ send_json(Req, 200, {Status});
+ seeding ->
+ send_json(Req, 404, {Status})
+ end
end;
handle_up_req(Req) ->
diff --git a/src/mem3/src/mem3_nodes.erl b/src/mem3/src/mem3_nodes.erl
index 019ceaf32..dd5be1a72 100644
--- a/src/mem3/src/mem3_nodes.erl
+++ b/src/mem3/src/mem3_nodes.erl
@@ -93,15 +93,7 @@ initialize_nodelist() ->
DbName = config:get("mem3", "nodes_db", "_nodes"),
{ok, Db} = mem3_util:ensure_exists(DbName),
{ok, _} = couch_db:fold_docs(Db, fun first_fold/2, Db, []),
- % add self if not already present
- case ets:lookup(?MODULE, node()) of
- [_] ->
- ok;
- [] ->
- ets:insert(?MODULE, {node(), []}),
- Doc = #doc{id = couch_util:to_binary(node())},
- {ok, _} = couch_db:update_doc(Db, Doc, [])
- end,
+ insert_if_missing(Db, [node() | mem3_seeds:get_seeds()]),
Seq = couch_db:get_update_seq(Db),
couch_db:close(Db),
Seq.
@@ -145,3 +137,19 @@ changes_callback({change, {Change}, _}, _) ->
{ok, couch_util:get_value(<<"seq">>, Change)};
changes_callback(timeout, _) ->
{ok, nil}.
+
+insert_if_missing(Db, Nodes) ->
+ Docs = lists:foldl(fun(Node, Acc) ->
+ case ets:lookup(?MODULE, Node) of
+ [_] ->
+ Acc;
+ [] ->
+ ets:insert(?MODULE, {Node, []}),
+ [#doc{id = couch_util:to_binary(Node)} | Acc]
+ end
+ end, [], Nodes),
+ if Docs =/= [] ->
+ {ok, _} = couch_db:update_docs(Db, Docs, []);
+ true ->
+ {ok, []}
+ end.
diff --git a/src/mem3/src/mem3_rep.erl b/src/mem3/src/mem3_rep.erl
index 5920f3bd4..b65fa7a86 100644
--- a/src/mem3/src/mem3_rep.erl
+++ b/src/mem3/src/mem3_rep.erl
@@ -495,7 +495,7 @@ update_locals(Acc) ->
{<<"source_node">>, atom_to_binary(node(), utf8)},
{<<"source_uuid">>, couch_db:get_uuid(Db)},
{<<"source_seq">>, Seq},
- {<<"timestamp">>, list_to_binary(iso8601_timestamp())}
+ {<<"timestamp">>, list_to_binary(mem3_util:iso8601_timestamp())}
],
NewBody = mem3_rpc:save_checkpoint(Node, Name, Id, Seq, NewEntry, History),
{ok, _} = couch_db:update_doc(Db, #doc{id = Id, body = NewBody}, []).
@@ -572,13 +572,6 @@ filter_doc(_, _) ->
keep.
-iso8601_timestamp() ->
- {_,_,Micro} = Now = os:timestamp(),
- {{Year,Month,Date},{Hour,Minute,Second}} = calendar:now_to_datetime(Now),
- Format = "~4.10.0B-~2.10.0B-~2.10.0BT~2.10.0B:~2.10.0B:~2.10.0B.~6.10.0BZ",
- io_lib:format(Format, [Year, Month, Date, Hour, Minute, Second, Micro]).
-
-
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
diff --git a/src/mem3/src/mem3_rpc.erl b/src/mem3/src/mem3_rpc.erl
index 35d1d0a49..61a29d9bf 100644
--- a/src/mem3/src/mem3_rpc.erl
+++ b/src/mem3/src/mem3_rpc.erl
@@ -19,6 +19,7 @@
find_common_seq/4,
get_missing_revs/4,
update_docs/4,
+ pull_replication/1,
load_checkpoint/4,
save_checkpoint/6,
@@ -31,6 +32,7 @@
-export([
find_common_seq_rpc/3,
load_checkpoint_rpc/3,
+ pull_replication_rpc/1,
save_checkpoint_rpc/5,
load_purge_infos_rpc/3,
@@ -41,6 +43,11 @@
-include("mem3.hrl").
-include_lib("couch/include/couch_db.hrl").
+% "Pull" is a bit of a misnomer here, as what we're actually doing is
+% issuing an RPC request and telling the remote node to push updates to
+% us. This lets us reuse all of the battle-tested machinery of mem3_rpc.
+pull_replication(Seed) ->
+ rexi_call(Seed, {mem3_rpc, pull_replication_rpc, [node()]}).
get_missing_revs(Node, DbName, IdsRevs, Options) ->
rexi_call(Node, {fabric_rpc, get_missing_revs, [DbName, IdsRevs, Options]}).
@@ -148,6 +155,12 @@ find_common_seq_rpc(DbName, SourceUUID, SourceEpochs) ->
rexi:reply(Error)
end.
+pull_replication_rpc(Target) ->
+ Dbs = mem3_sync:local_dbs(),
+ Opts = [{batch_size, 1000}, {batch_count, 50}],
+ Repl = fun(Db) -> {Db, mem3_rep:go(Db, Target, Opts)} end,
+ rexi:reply({ok, lists:map(Repl, Dbs)}).
+
load_purge_infos_rpc(DbName, SrcUUID, BatchSize) ->
erlang:put(io_priority, {internal_repl, DbName}),
diff --git a/src/mem3/src/mem3_seeds.erl b/src/mem3/src/mem3_seeds.erl
new file mode 100644
index 000000000..f1aceb996
--- /dev/null
+++ b/src/mem3/src/mem3_seeds.erl
@@ -0,0 +1,162 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(mem3_seeds).
+-behaviour(gen_server).
+
+-export([
+ init/1,
+ handle_call/3,
+ handle_cast/2,
+ handle_info/2,
+ code_change/3,
+ terminate/2
+]).
+
+-export([
+ start_link/0,
+ get_seeds/0,
+ get_status/0
+]).
+
+-record(st, {
+ ready = false,
+ seeds = [],
+ jobref = nil,
+ status = [] % nested proplist keyed on node name
+}).
+
+-define(REPLICATION_INTERVAL, 60000).
+
+start_link() ->
+ gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+get_seeds() ->
+ case config:get("cluster", "seedlist") of
+ undefined ->
+ [];
+ List ->
+ Nodes = string:tokens(List, ","),
+ Seeds = [list_to_atom(Node) || Node <- Nodes] -- [node()],
+ mem3_util:rotate_list(node(), Seeds)
+ end.
+
+get_status() ->
+ gen_server:call(?MODULE, get_status).
+
+init([]) ->
+ Seeds = get_seeds(),
+ InitStatus = [{Seed, {[]}} || Seed <- Seeds],
+ State = #st{
+ seeds = Seeds,
+ ready = case Seeds of [] -> true; _ -> false end,
+ jobref = start_replication(Seeds),
+ status = InitStatus
+ },
+ {ok, State}.
+
+handle_call(get_status, _From, St) ->
+ Status = {[
+ {status, case St#st.ready of true -> ok; false -> seeding end},
+ {seeds, {St#st.status}}
+ ]},
+ {reply, {ok, Status}, St}.
+
+handle_cast(_Msg, St) ->
+ {noreply, St}.
+
+handle_info(start_replication, #st{jobref=nil} = St) ->
+ JobRef = start_replication(St#st.seeds),
+ {noreply, St#st{jobref = JobRef}};
+
+handle_info({'DOWN', Ref, _, Pid, Output}, #st{jobref = {Pid, Ref}} = St) ->
+ {noreply, update_state(St, Output)};
+
+handle_info(_Msg, St) ->
+ {noreply, St}.
+
+terminate(_Reason, _St) ->
+ ok.
+
+code_change(_OldVsn, St, _Extra) ->
+ {ok, St}.
+
+% internal functions
+
+start_replication([]) ->
+ nil;
+start_replication([Seed | _]) ->
+ spawn_monitor(fun() ->
+ Reply = mem3_rpc:pull_replication(Seed),
+ exit({ok, Reply})
+ end).
+
+update_state(State, {ok, Data}) ->
+ #st{seeds = [Current | Tail], status = Status} = State,
+ Report = {[
+ {timestamp, list_to_binary(mem3_util:iso8601_timestamp())},
+ {last_replication_status, ok},
+ format_data(Data)
+ ]},
+ NewStatus = lists:ukeymerge(1, [{Current, Report}], Status),
+ Ready = is_ready(State#st.ready, Data),
+ case Ready of
+ true ->
+ Seeds = Tail ++ [Current],
+ Job = nil;
+ false ->
+ % Try to progress this same seed again
+ Seeds = [Current | Tail],
+ Job = start_replication([Current | Tail])
+ end,
+ State#st{
+ seeds = Seeds,
+ jobref = Job,
+ ready = Ready,
+ status = NewStatus
+ };
+update_state(State, {_Error, _Stack}) ->
+ #st{seeds = [Current | Tail], status = Status} = State,
+ Report = {[
+ {timestamp, list_to_binary(mem3_util:iso8601_timestamp())},
+ {last_replication_status, error}
+ ]},
+ NewStatus = lists:ukeymerge(1, [{Current, Report}], Status),
+ Seeds = Tail ++ [Current],
+ if not State#st.ready ->
+ erlang:send_after(1000, self(), start_replication);
+ true ->
+ ok
+ end,
+ State#st{
+ seeds = Seeds,
+ jobref = nil,
+ status = NewStatus
+ }.
+
+is_ready(true, _) ->
+ true;
+is_ready(false, Data) ->
+ lists:all(fun({_DbName, Pending}) -> Pending =:= {ok, 0} end, Data).
+
+format_data(Data) ->
+ Formatted = lists:map(fun({DbName, Status}) ->
+ case Status of
+ {ok, Pending} when is_number(Pending) ->
+ {DbName, Pending};
+ {error, Tag} ->
+ {DbName, list_to_binary(io_lib:format("~p", [Tag]))};
+ _Else ->
+ {DbName, unknown_error}
+ end
+ end, Data),
+ {pending_updates, {Formatted}}.
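For orientation, the status field of #st{} (the "nested proplist keyed on node
name" noted above) is what get_status/0 wraps and returns. A sketch of the term
shape while a node is still seeding follows; the node name, timestamp, and
pending counts are illustrative, not output from a real cluster:

    %% illustrative shape of mem3_seeds:get_status/0 during seeding
    {ok, {[
        {status, seeding},
        {seeds, {[
            {'couchdb@node1.example.com', {[
                {timestamp, <<"2018-11-10T21:40:43.000000Z">>},
                {last_replication_status, ok},
                {pending_updates, {[
                    {<<"_nodes">>, 0},
                    {<<"_dbs">>, 3},
                    {<<"_users">>, 0}
                ]}}
            ]}}
        ]}}
    ]}}

Once every database reports zero pending updates, is_ready/2 flips the flag and
the same call reports {status, ok}, which is the value handle_up_req checks
before answering 200.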
diff --git a/src/mem3/src/mem3_sup.erl b/src/mem3/src/mem3_sup.erl
index 80b8ca37f..0adaf51e0 100644
--- a/src/mem3/src/mem3_sup.erl
+++ b/src/mem3/src/mem3_sup.erl
@@ -21,6 +21,7 @@ init(_Args) ->
Children = [
child(mem3_events),
child(mem3_nodes),
+ child(mem3_seeds),
child(mem3_sync_nodes), % Order important?
child(mem3_sync),
child(mem3_shards),
diff --git a/src/mem3/src/mem3_sync.erl b/src/mem3/src/mem3_sync.erl
index 640181509..693fc4f31 100644
--- a/src/mem3/src/mem3_sync.erl
+++ b/src/mem3/src/mem3_sync.erl
@@ -19,6 +19,9 @@
-export([start_link/0, get_active/0, get_queue/0, push/1, push/2,
remove_node/1, remove_shard/1, initial_sync/1, get_backlog/0, nodes_db/0,
shards_db/0, users_db/0, find_next_node/0]).
+-export([
+ local_dbs/0
+]).
-import(queue, [in/2, out/1, to_list/1, join/2, from_list/1, is_empty/1]).
diff --git a/src/mem3/src/mem3_util.erl b/src/mem3/src/mem3_util.erl
index 254a6dfa6..927607aff 100644
--- a/src/mem3/src/mem3_util.erl
+++ b/src/mem3/src/mem3_util.erl
@@ -16,6 +16,9 @@
n_val/2, q_val/1, to_atom/1, to_integer/1, write_db_doc/1, delete_db_doc/1,
shard_info/1, ensure_exists/1, open_db_doc/1]).
-export([is_deleted/1, rotate_list/2]).
+-export([
+ iso8601_timestamp/0
+]).
%% do not use outside mem3.
-export([build_ordered_shards/2, downcast/1]).
@@ -270,3 +273,9 @@ downcast(#ordered_shard{}=S) ->
};
downcast(Shards) when is_list(Shards) ->
[downcast(Shard) || Shard <- Shards].
+
+iso8601_timestamp() ->
+ {_,_,Micro} = Now = os:timestamp(),
+ {{Year,Month,Date},{Hour,Minute,Second}} = calendar:now_to_datetime(Now),
+ Format = "~4.10.0B-~2.10.0B-~2.10.0BT~2.10.0B:~2.10.0B:~2.10.0B.~6.10.0BZ",
+ io_lib:format(Format, [Year, Month, Date, Hour, Minute, Second, Micro]).
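A small usage note, not part of the diff: io_lib:format/2 returns an iolist
rather than a flat string, which is why callers above wrap the result in
list_to_binary/1. Flattened, the output looks like the session below (the
timestamp value is made up):

    1> lists:flatten(mem3_util:iso8601_timestamp()).
    "2018-11-10T21:40:43.000000Z"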
diff --git a/src/mem3/test/mem3_seeds_test.erl b/src/mem3/test/mem3_seeds_test.erl
new file mode 100644
index 000000000..19e007950
--- /dev/null
+++ b/src/mem3/test/mem3_seeds_test.erl
@@ -0,0 +1,64 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(mem3_seeds_test).
+
+-include_lib("eunit/include/eunit.hrl").
+
+a_test_() ->
+ Tests = [
+ {"empty seedlist should set status ok", fun empty_seedlist_status_ok/0},
+ {"all seedlist nodes unreachable keeps status seeding", fun seedlist_misconfiguration/0},
+ {"seedlist entries should be present in _nodes", fun check_nodelist/0}
+ ],
+ {setup, fun setup/0, fun teardown/1, Tests}.
+
+empty_seedlist_status_ok() ->
+ ok = application:start(mem3),
+ try
+ {ok, {Result}} = mem3_seeds:get_status(),
+ ?assertEqual({[]}, couch_util:get_value(seeds, Result)),
+ ?assertEqual(ok, couch_util:get_value(status, Result))
+ after
+ application:stop(mem3)
+ end.
+
+seedlist_misconfiguration() ->
+ config:set("cluster", "seedlist", "couchdb@node1.example.com,couchdb@node2.example.com", false),
+ ok = application:start(mem3),
+ try
+ {ok, {Result}} = mem3_seeds:get_status(),
+ {Seeds} = couch_util:get_value(seeds, Result),
+ ?assertEqual(2, length(Seeds)),
+ ?assertMatch({_}, couch_util:get_value('couchdb@node1.example.com', Seeds)),
+ ?assertMatch({_}, couch_util:get_value('couchdb@node2.example.com', Seeds)),
+ ?assertEqual(seeding, couch_util:get_value(status, Result))
+ after
+ application:stop(mem3)
+ end.
+
+check_nodelist() ->
+ config:set("cluster", "seedlist", "couchdb@node1.example.com,couchdb@node2.example.com", false),
+ ok = application:start(mem3),
+ try
+ Nodes = mem3:nodes(),
+ ?assert(lists:member('couchdb@node1.example.com', Nodes)),
+ ?assert(lists:member('couchdb@node2.example.com', Nodes))
+ after
+ application:stop(mem3)
+ end.
+
+setup() ->
+ test_util:start_couch([rexi]).
+
+teardown(Ctx) ->
+ test_util:stop_couch(Ctx).