diff options
author | Adam Kocoloski <kocolosk@apache.org> | 2018-11-10 21:40:43 -0500 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-11-10 21:40:43 -0500 |
commit | 0302e9de73edc94d63c16ed016c6dd666d7ede80 (patch) | |
tree | 997b528adaf3931024fbc1cd50ab764c20a2d347 | |
parent | 844d6fe021121b89d98bf89cf4e9cc1d7e68a270 (diff) | |
download | couchdb-0302e9de73edc94d63c16ed016c6dd666d7ede80.tar.gz |
Enable cluster auto-assembly through a seedlist (#1658)
Enable cluster auto-assembly through a seedlist
This introduces a new config setting which allows an administrator to
configure an initial list of nodes that should be contacted when a node
boots up:
[cluster]
seedlist = couchdb@node1.example.com,couchdb@node2.example.com,couchdb@node3.example.com
If configured, CouchDB will add every node in the seedlist to the _nodes
DB automatically, which will trigger a distributed Erlang connection and
a replication of the internal system databases to the local node. This
eliminates the need to explicitly add each node using the HTTP API.
We also modify the /_up endpoint to reflect the progress of the initial seeding
of the node. If a seedlist is configured, the endpoint will return 404 until the
local node has updated its local replica of each of the system databases from
one of the members of the seedlist. Once the status flips to "ok", the endpoint
will return 200 and it's safe to direct requests to the new node.
-rw-r--r-- | rel/overlay/etc/default.ini | 7 | ||||
-rw-r--r-- | src/chttpd/src/chttpd_misc.erl | 8 | ||||
-rw-r--r-- | src/mem3/src/mem3_nodes.erl | 26 | ||||
-rw-r--r-- | src/mem3/src/mem3_rep.erl | 9 | ||||
-rw-r--r-- | src/mem3/src/mem3_rpc.erl | 13 | ||||
-rw-r--r-- | src/mem3/src/mem3_seeds.erl | 162 | ||||
-rw-r--r-- | src/mem3/src/mem3_sup.erl | 1 | ||||
-rw-r--r-- | src/mem3/src/mem3_sync.erl | 3 | ||||
-rw-r--r-- | src/mem3/src/mem3_util.erl | 9 | ||||
-rw-r--r-- | src/mem3/test/mem3_seeds_test.erl | 64 |
10 files changed, 284 insertions, 18 deletions
diff --git a/rel/overlay/etc/default.ini b/rel/overlay/etc/default.ini index f384de3aa..edaebf9e2 100644 --- a/rel/overlay/etc/default.ini +++ b/rel/overlay/etc/default.ini @@ -77,6 +77,13 @@ q=8 n=3 ; placement = metro-dc-a:2,metro-dc-b:1 +; Supply a comma-delimited list of node names that this node should +; contact in order to join a cluster. If a seedlist is configured the ``_up`` +; endpoint will return a 404 until the node has successfully contacted at +; least one of the members of the seedlist and replicated an up-to-date copy +; of the ``_nodes``, ``_dbs``, and ``_users`` system databases. +; seedlist = couchdb@node1.example.com,couchdb@node2.example.com + [chttpd] ; These settings affect the main, clustered port (5984 by default). port = {{cluster_port}} diff --git a/src/chttpd/src/chttpd_misc.erl b/src/chttpd/src/chttpd_misc.erl index 1a6b3cb50..fc03fb512 100644 --- a/src/chttpd/src/chttpd_misc.erl +++ b/src/chttpd/src/chttpd_misc.erl @@ -480,7 +480,13 @@ handle_up_req(#httpd{method='GET'} = Req) -> "nolb" -> send_json(Req, 404, {[{status, nolb}]}); _ -> - send_json(Req, 200, {[{status, ok}]}) + {ok, {Status}} = mem3_seeds:get_status(), + case couch_util:get_value(status, Status) of + ok -> + send_json(Req, 200, {Status}); + seeding -> + send_json(Req, 404, {Status}) + end end; handle_up_req(Req) -> diff --git a/src/mem3/src/mem3_nodes.erl b/src/mem3/src/mem3_nodes.erl index 019ceaf32..dd5be1a72 100644 --- a/src/mem3/src/mem3_nodes.erl +++ b/src/mem3/src/mem3_nodes.erl @@ -93,15 +93,7 @@ initialize_nodelist() -> DbName = config:get("mem3", "nodes_db", "_nodes"), {ok, Db} = mem3_util:ensure_exists(DbName), {ok, _} = couch_db:fold_docs(Db, fun first_fold/2, Db, []), - % add self if not already present - case ets:lookup(?MODULE, node()) of - [_] -> - ok; - [] -> - ets:insert(?MODULE, {node(), []}), - Doc = #doc{id = couch_util:to_binary(node())}, - {ok, _} = couch_db:update_doc(Db, Doc, []) - end, + insert_if_missing(Db, [node() | mem3_seeds:get_seeds()]), 
Seq = couch_db:get_update_seq(Db), couch_db:close(Db), Seq. @@ -145,3 +137,19 @@ changes_callback({change, {Change}, _}, _) -> {ok, couch_util:get_value(<<"seq">>, Change)}; changes_callback(timeout, _) -> {ok, nil}. + +insert_if_missing(Db, Nodes) -> + Docs = lists:foldl(fun(Node, Acc) -> + case ets:lookup(?MODULE, Node) of + [_] -> + Acc; + [] -> + ets:insert(?MODULE, {Node, []}), + [#doc{id = couch_util:to_binary(Node)} | Acc] + end + end, [], Nodes), + if Docs =/= [] -> + {ok, _} = couch_db:update_docs(Db, Docs, []); + true -> + {ok, []} + end. diff --git a/src/mem3/src/mem3_rep.erl b/src/mem3/src/mem3_rep.erl index 5920f3bd4..b65fa7a86 100644 --- a/src/mem3/src/mem3_rep.erl +++ b/src/mem3/src/mem3_rep.erl @@ -495,7 +495,7 @@ update_locals(Acc) -> {<<"source_node">>, atom_to_binary(node(), utf8)}, {<<"source_uuid">>, couch_db:get_uuid(Db)}, {<<"source_seq">>, Seq}, - {<<"timestamp">>, list_to_binary(iso8601_timestamp())} + {<<"timestamp">>, list_to_binary(mem3_util:iso8601_timestamp())} ], NewBody = mem3_rpc:save_checkpoint(Node, Name, Id, Seq, NewEntry, History), {ok, _} = couch_db:update_doc(Db, #doc{id = Id, body = NewBody}, []). @@ -572,13 +572,6 @@ filter_doc(_, _) -> keep. -iso8601_timestamp() -> - {_,_,Micro} = Now = os:timestamp(), - {{Year,Month,Date},{Hour,Minute,Second}} = calendar:now_to_datetime(Now), - Format = "~4.10.0B-~2.10.0B-~2.10.0BT~2.10.0B:~2.10.0B:~2.10.0B.~6.10.0BZ", - io_lib:format(Format, [Year, Month, Date, Hour, Minute, Second, Micro]). - - -ifdef(TEST). -include_lib("eunit/include/eunit.hrl"). 
diff --git a/src/mem3/src/mem3_rpc.erl b/src/mem3/src/mem3_rpc.erl index 35d1d0a49..61a29d9bf 100644 --- a/src/mem3/src/mem3_rpc.erl +++ b/src/mem3/src/mem3_rpc.erl @@ -19,6 +19,7 @@ find_common_seq/4, get_missing_revs/4, update_docs/4, + pull_replication/1, load_checkpoint/4, save_checkpoint/6, @@ -31,6 +32,7 @@ -export([ find_common_seq_rpc/3, load_checkpoint_rpc/3, + pull_replication_rpc/1, save_checkpoint_rpc/5, load_purge_infos_rpc/3, @@ -41,6 +43,11 @@ -include("mem3.hrl"). -include_lib("couch/include/couch_db.hrl"). +% "Pull" is a bit of a misnomer here, as what we're actually doing is +% issuing an RPC request and telling the remote node to push updates to +% us. This lets us reuse all of the battle-tested machinery of mem3_rpc. +pull_replication(Seed) -> + rexi_call(Seed, {mem3_rpc, pull_replication_rpc, [node()]}). get_missing_revs(Node, DbName, IdsRevs, Options) -> rexi_call(Node, {fabric_rpc, get_missing_revs, [DbName, IdsRevs, Options]}). @@ -148,6 +155,12 @@ find_common_seq_rpc(DbName, SourceUUID, SourceEpochs) -> rexi:reply(Error) end. +pull_replication_rpc(Target) -> + Dbs = mem3_sync:local_dbs(), + Opts = [{batch_size, 1000}, {batch_count, 50}], + Repl = fun(Db) -> {Db, mem3_rep:go(Db, Target, Opts)} end, + rexi:reply({ok, lists:map(Repl, Dbs)}). + load_purge_infos_rpc(DbName, SrcUUID, BatchSize) -> erlang:put(io_priority, {internal_repl, DbName}), diff --git a/src/mem3/src/mem3_seeds.erl b/src/mem3/src/mem3_seeds.erl new file mode 100644 index 000000000..f1aceb996 --- /dev/null +++ b/src/mem3/src/mem3_seeds.erl @@ -0,0 +1,162 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. 
You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(mem3_seeds). +-behaviour(gen_server). + +-export([ + init/1, + handle_call/3, + handle_cast/2, + handle_info/2, + code_change/3, + terminate/2 +]). + +-export([ + start_link/0, + get_seeds/0, + get_status/0 +]). + +-record(st, { + ready = false, + seeds = [], + jobref = nil, + status = [] % nested proplist keyed on node name +}). + +-define(REPLICATION_INTERVAL, 60000). + +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + +get_seeds() -> + case config:get("cluster", "seedlist") of + undefined -> + []; + List -> + Nodes = string:tokens(List, ","), + Seeds = [list_to_atom(Node) || Node <- Nodes] -- [node()], + mem3_util:rotate_list(node(), Seeds) + end. + +get_status() -> + gen_server:call(?MODULE, get_status). + +init([]) -> + Seeds = get_seeds(), + InitStatus = [{Seed, {[]}} || Seed <- Seeds], + State = #st{ + seeds = Seeds, + ready = case Seeds of [] -> true; _ -> false end, + jobref = start_replication(Seeds), + status = InitStatus + }, + {ok, State}. + +handle_call(get_status, _From, St) -> + Status = {[ + {status, case St#st.ready of true -> ok; false -> seeding end}, + {seeds, {St#st.status}} + ]}, + {reply, {ok, Status}, St}. + +handle_cast(_Msg, St) -> + {noreply, St}. + +handle_info(start_replication, #st{jobref=nil} = St) -> + JobRef = start_replication(St#st.seeds), + {noreply, St#st{jobref = JobRef}}; + +handle_info({'DOWN', Ref, _, Pid, Output}, #st{jobref = {Pid, Ref}} = St) -> + {noreply, update_state(St, Output)}; + +handle_info(_Msg, St) -> + {noreply, St}. + +terminate(_Reason, _St) -> + ok. 
+ +code_change(_OldVsn, St, _Extra) -> + {ok, St}. + +% internal functions + +start_replication([]) -> + nil; +start_replication([Seed | _]) -> + spawn_monitor(fun() -> + Reply = mem3_rpc:pull_replication(Seed), + exit({ok, Reply}) + end). + +update_state(State, {ok, Data}) -> + #st{seeds = [Current | Tail], status = Status} = State, + Report = {[ + {timestamp, list_to_binary(mem3_util:iso8601_timestamp())}, + {last_replication_status, ok}, + format_data(Data) + ]}, + NewStatus = lists:ukeymerge(1, [{Current, Report}], Status), + Ready = is_ready(State#st.ready, Data), + case Ready of + true -> + Seeds = Tail ++ [Current], + Job = nil; + false -> + % Try to progress this same seed again + Seeds = [Current | Tail], + Job = start_replication([Current | Tail]) + end, + State#st{ + seeds = Seeds, + jobref = Job, + ready = Ready, + status = NewStatus + }; +update_state(State, {_Error, _Stack}) -> + #st{seeds = [Current | Tail], status = Status} = State, + Report = {[ + {timestamp, list_to_binary(mem3_util:iso8601_timestamp())}, + {last_replication_status, error} + ]}, + NewStatus = lists:ukeymerge(1, [{Current, Report}], Status), + Seeds = Tail ++ [Current], + if not State#st.ready -> + erlang:send_after(1000, self(), start_replication); + true -> + ok + end, + State#st{ + seeds = Seeds, + jobref = nil, + status = NewStatus + }. + +is_ready(true, _) -> + true; +is_ready(false, Data) -> + lists:all(fun({_DbName, Pending}) -> Pending =:= {ok, 0} end, Data). + +format_data(Data) -> + Formatted = lists:map(fun({DbName, Status}) -> + case Status of + {ok, Pending} when is_number(Pending) -> + {DbName, Pending}; + {error, Tag} -> + {DbName, list_to_binary(io_lib:format("~p", [Tag]))}; + _Else -> + {DbName, unknown_error} + end + end, Data), + {pending_updates, {Formatted}}. 
diff --git a/src/mem3/src/mem3_sup.erl b/src/mem3/src/mem3_sup.erl index 80b8ca37f..0adaf51e0 100644 --- a/src/mem3/src/mem3_sup.erl +++ b/src/mem3/src/mem3_sup.erl @@ -21,6 +21,7 @@ init(_Args) -> Children = [ child(mem3_events), child(mem3_nodes), + child(mem3_seeds), child(mem3_sync_nodes), % Order important? child(mem3_sync), child(mem3_shards), diff --git a/src/mem3/src/mem3_sync.erl b/src/mem3/src/mem3_sync.erl index 640181509..693fc4f31 100644 --- a/src/mem3/src/mem3_sync.erl +++ b/src/mem3/src/mem3_sync.erl @@ -19,6 +19,9 @@ -export([start_link/0, get_active/0, get_queue/0, push/1, push/2, remove_node/1, remove_shard/1, initial_sync/1, get_backlog/0, nodes_db/0, shards_db/0, users_db/0, find_next_node/0]). +-export([ + local_dbs/0 +]). -import(queue, [in/2, out/1, to_list/1, join/2, from_list/1, is_empty/1]). diff --git a/src/mem3/src/mem3_util.erl b/src/mem3/src/mem3_util.erl index 254a6dfa6..927607aff 100644 --- a/src/mem3/src/mem3_util.erl +++ b/src/mem3/src/mem3_util.erl @@ -16,6 +16,9 @@ n_val/2, q_val/1, to_atom/1, to_integer/1, write_db_doc/1, delete_db_doc/1, shard_info/1, ensure_exists/1, open_db_doc/1]). -export([is_deleted/1, rotate_list/2]). +-export([ + iso8601_timestamp/0 +]). %% do not use outside mem3. -export([build_ordered_shards/2, downcast/1]). @@ -270,3 +273,9 @@ downcast(#ordered_shard{}=S) -> }; downcast(Shards) when is_list(Shards) -> [downcast(Shard) || Shard <- Shards]. + +iso8601_timestamp() -> + {_,_,Micro} = Now = os:timestamp(), + {{Year,Month,Date},{Hour,Minute,Second}} = calendar:now_to_datetime(Now), + Format = "~4.10.0B-~2.10.0B-~2.10.0BT~2.10.0B:~2.10.0B:~2.10.0B.~6.10.0BZ", + io_lib:format(Format, [Year, Month, Date, Hour, Minute, Second, Micro]). 
diff --git a/src/mem3/test/mem3_seeds_test.erl b/src/mem3/test/mem3_seeds_test.erl new file mode 100644 index 000000000..19e007950 --- /dev/null +++ b/src/mem3/test/mem3_seeds_test.erl @@ -0,0 +1,64 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(mem3_seeds_test). + +-include_lib("eunit/include/eunit.hrl"). + +a_test_() -> + Tests = [ + {"empty seedlist should set status ok", fun empty_seedlist_status_ok/0}, + {"all seedlist nodes unreachable keeps status seeding", fun seedlist_misconfiguration/0}, + {"seedlist entries should be present in _nodes", fun check_nodelist/0} + ], + {setup, fun setup/0, fun teardown/1, Tests}. + +empty_seedlist_status_ok() -> + ok = application:start(mem3), + try + {ok, {Result}} = mem3_seeds:get_status(), + ?assertEqual({[]}, couch_util:get_value(seeds, Result)), + ?assertEqual(ok, couch_util:get_value(status, Result)) + after + application:stop(mem3) + end. + +seedlist_misconfiguration() -> + config:set("cluster", "seedlist", "couchdb@node1.example.com,couchdb@node2.example.com", false), + ok = application:start(mem3), + try + {ok, {Result}} = mem3_seeds:get_status(), + {Seeds} = couch_util:get_value(seeds, Result), + ?assertEqual(2, length(Seeds)), + ?assertMatch({_}, couch_util:get_value('couchdb@node1.example.com', Seeds)), + ?assertMatch({_}, couch_util:get_value('couchdb@node2.example.com', Seeds)), + ?assertEqual(seeding, couch_util:get_value(status, Result)) + after + application:stop(mem3) + end. 
+ +check_nodelist() -> + config:set("cluster", "seedlist", "couchdb@node1.example.com,couchdb@node2.example.com", false), + ok = application:start(mem3), + try + Nodes = mem3:nodes(), + ?assert(lists:member('couchdb@node1.example.com', Nodes)), + ?assert(lists:member('couchdb@node2.example.com', Nodes)) + after + application:stop(mem3) + end. + +setup() -> + test_util:start_couch([rexi]). + +teardown(Ctx) -> + test_util:stop_couch(Ctx). |