diff options
-rw-r--r-- | rel/overlay/etc/default.ini | 7 | ||||
-rw-r--r-- | src/chttpd/src/chttpd_misc.erl | 8 | ||||
-rw-r--r-- | src/mem3/src/mem3_nodes.erl | 26 | ||||
-rw-r--r-- | src/mem3/src/mem3_rep.erl | 9 | ||||
-rw-r--r-- | src/mem3/src/mem3_rpc.erl | 13 | ||||
-rw-r--r-- | src/mem3/src/mem3_seeds.erl | 162 | ||||
-rw-r--r-- | src/mem3/src/mem3_sup.erl | 1 | ||||
-rw-r--r-- | src/mem3/src/mem3_sync.erl | 3 | ||||
-rw-r--r-- | src/mem3/src/mem3_util.erl | 9 | ||||
-rw-r--r-- | src/mem3/test/mem3_seeds_test.erl | 64 |
10 files changed, 284 insertions, 18 deletions
diff --git a/rel/overlay/etc/default.ini b/rel/overlay/etc/default.ini index f384de3aa..edaebf9e2 100644 --- a/rel/overlay/etc/default.ini +++ b/rel/overlay/etc/default.ini @@ -77,6 +77,13 @@ q=8 n=3 ; placement = metro-dc-a:2,metro-dc-b:1 +; Supply a comma-delimited list of node names that this node should +; contact in order to join a cluster. If a seedlist is configured the ``_up`` +; endpoint will return a 404 until the node has successfully contacted at +; least one of the members of the seedlist and replicated an up-to-date copy +; of the ``_nodes``, ``_dbs``, and ``_users`` system databases. +; seedlist = couchdb@node1.example.com,couchdb@node2.example.com + [chttpd] ; These settings affect the main, clustered port (5984 by default). port = {{cluster_port}} diff --git a/src/chttpd/src/chttpd_misc.erl b/src/chttpd/src/chttpd_misc.erl index 1a6b3cb50..fc03fb512 100644 --- a/src/chttpd/src/chttpd_misc.erl +++ b/src/chttpd/src/chttpd_misc.erl @@ -480,7 +480,13 @@ handle_up_req(#httpd{method='GET'} = Req) -> "nolb" -> send_json(Req, 404, {[{status, nolb}]}); _ -> - send_json(Req, 200, {[{status, ok}]}) + {ok, {Status}} = mem3_seeds:get_status(), + case couch_util:get_value(status, Status) of + ok -> + send_json(Req, 200, {Status}); + seeding -> + send_json(Req, 404, {Status}) + end end; handle_up_req(Req) -> diff --git a/src/mem3/src/mem3_nodes.erl b/src/mem3/src/mem3_nodes.erl index 019ceaf32..dd5be1a72 100644 --- a/src/mem3/src/mem3_nodes.erl +++ b/src/mem3/src/mem3_nodes.erl @@ -93,15 +93,7 @@ initialize_nodelist() -> DbName = config:get("mem3", "nodes_db", "_nodes"), {ok, Db} = mem3_util:ensure_exists(DbName), {ok, _} = couch_db:fold_docs(Db, fun first_fold/2, Db, []), - % add self if not already present - case ets:lookup(?MODULE, node()) of - [_] -> - ok; - [] -> - ets:insert(?MODULE, {node(), []}), - Doc = #doc{id = couch_util:to_binary(node())}, - {ok, _} = couch_db:update_doc(Db, Doc, []) - end, + insert_if_missing(Db, [node() | mem3_seeds:get_seeds()]), Seq = couch_db:get_update_seq(Db), couch_db:close(Db), Seq. @@ -145,3 +137,19 @@ changes_callback({change, {Change}, _}, _) -> {ok, couch_util:get_value(<<"seq">>, Change)}; changes_callback(timeout, _) -> {ok, nil}. + +insert_if_missing(Db, Nodes) -> + Docs = lists:foldl(fun(Node, Acc) -> + case ets:lookup(?MODULE, Node) of + [_] -> + Acc; + [] -> + ets:insert(?MODULE, {Node, []}), + [#doc{id = couch_util:to_binary(Node)} | Acc] + end + end, [], Nodes), + if Docs =/= [] -> + {ok, _} = couch_db:update_docs(Db, Docs, []); + true -> + {ok, []} + end. diff --git a/src/mem3/src/mem3_rep.erl b/src/mem3/src/mem3_rep.erl index 5920f3bd4..b65fa7a86 100644 --- a/src/mem3/src/mem3_rep.erl +++ b/src/mem3/src/mem3_rep.erl @@ -495,7 +495,7 @@ update_locals(Acc) -> {<<"source_node">>, atom_to_binary(node(), utf8)}, {<<"source_uuid">>, couch_db:get_uuid(Db)}, {<<"source_seq">>, Seq}, - {<<"timestamp">>, list_to_binary(iso8601_timestamp())} + {<<"timestamp">>, list_to_binary(mem3_util:iso8601_timestamp())} ], NewBody = mem3_rpc:save_checkpoint(Node, Name, Id, Seq, NewEntry, History), {ok, _} = couch_db:update_doc(Db, #doc{id = Id, body = NewBody}, []). @@ -572,13 +572,6 @@ filter_doc(_, _) -> keep. -iso8601_timestamp() -> - {_,_,Micro} = Now = os:timestamp(), - {{Year,Month,Date},{Hour,Minute,Second}} = calendar:now_to_datetime(Now), - Format = "~4.10.0B-~2.10.0B-~2.10.0BT~2.10.0B:~2.10.0B:~2.10.0B.~6.10.0BZ", - io_lib:format(Format, [Year, Month, Date, Hour, Minute, Second, Micro]). - - -ifdef(TEST). -include_lib("eunit/include/eunit.hrl"). diff --git a/src/mem3/src/mem3_rpc.erl b/src/mem3/src/mem3_rpc.erl index 35d1d0a49..61a29d9bf 100644 --- a/src/mem3/src/mem3_rpc.erl +++ b/src/mem3/src/mem3_rpc.erl @@ -19,6 +19,7 @@ find_common_seq/4, get_missing_revs/4, update_docs/4, + pull_replication/1, load_checkpoint/4, save_checkpoint/6, @@ -31,6 +32,7 @@ -export([ find_common_seq_rpc/3, load_checkpoint_rpc/3, + pull_replication_rpc/1, save_checkpoint_rpc/5, load_purge_infos_rpc/3, @@ -41,6 +43,11 @@ -include("mem3.hrl"). -include_lib("couch/include/couch_db.hrl"). +% "Pull" is a bit of a misnomer here, as what we're actually doing is +% issuing an RPC request and telling the remote node to push updates to +% us. This lets us reuse all of the battle-tested machinery of mem3_rpc. +pull_replication(Seed) -> + rexi_call(Seed, {mem3_rpc, pull_replication_rpc, [node()]}). get_missing_revs(Node, DbName, IdsRevs, Options) -> rexi_call(Node, {fabric_rpc, get_missing_revs, [DbName, IdsRevs, Options]}). @@ -148,6 +155,12 @@ find_common_seq_rpc(DbName, SourceUUID, SourceEpochs) -> rexi:reply(Error) end. +pull_replication_rpc(Target) -> + Dbs = mem3_sync:local_dbs(), + Opts = [{batch_size, 1000}, {batch_count, 50}], + Repl = fun(Db) -> {Db, mem3_rep:go(Db, Target, Opts)} end, + rexi:reply({ok, lists:map(Repl, Dbs)}). + load_purge_infos_rpc(DbName, SrcUUID, BatchSize) -> erlang:put(io_priority, {internal_repl, DbName}), diff --git a/src/mem3/src/mem3_seeds.erl b/src/mem3/src/mem3_seeds.erl new file mode 100644 index 000000000..f1aceb996 --- /dev/null +++ b/src/mem3/src/mem3_seeds.erl @@ -0,0 +1,162 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(mem3_seeds). +-behaviour(gen_server). + +-export([ + init/1, + handle_call/3, + handle_cast/2, + handle_info/2, + code_change/3, + terminate/2 +]). + +-export([ + start_link/0, + get_seeds/0, + get_status/0 +]). + +-record(st, { + ready = false, + seeds = [], + jobref = nil, + status = [] % nested proplist keyed on node name +}). + +-define(REPLICATION_INTERVAL, 60000). + +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + +get_seeds() -> + case config:get("cluster", "seedlist") of + undefined -> + []; + List -> + Nodes = string:tokens(List, ","), + Seeds = [list_to_atom(Node) || Node <- Nodes] -- [node()], + mem3_util:rotate_list(node(), Seeds) + end. + +get_status() -> + gen_server:call(?MODULE, get_status). + +init([]) -> + Seeds = get_seeds(), + InitStatus = [{Seed, {[]}} || Seed <- Seeds], + State = #st{ + seeds = Seeds, + ready = case Seeds of [] -> true; _ -> false end, + jobref = start_replication(Seeds), + status = InitStatus + }, + {ok, State}. + +handle_call(get_status, _From, St) -> + Status = {[ + {status, case St#st.ready of true -> ok; false -> seeding end}, + {seeds, {St#st.status}} + ]}, + {reply, {ok, Status}, St}. + +handle_cast(_Msg, St) -> + {noreply, St}. + +handle_info(start_replication, #st{jobref=nil} = St) -> + JobRef = start_replication(St#st.seeds), + {noreply, St#st{jobref = JobRef}}; + +handle_info({'DOWN', Ref, _, Pid, Output}, #st{jobref = {Pid, Ref}} = St) -> + {noreply, update_state(St, Output)}; + +handle_info(_Msg, St) -> + {noreply, St}. + +terminate(_Reason, _St) -> + ok. + +code_change(_OldVsn, St, _Extra) -> + {ok, St}. + +% internal functions + +start_replication([]) -> + nil; +start_replication([Seed | _]) -> + spawn_monitor(fun() -> + Reply = mem3_rpc:pull_replication(Seed), + exit({ok, Reply}) + end). + +update_state(State, {ok, Data}) -> + #st{seeds = [Current | Tail], status = Status} = State, + Report = {[ + {timestamp, list_to_binary(mem3_util:iso8601_timestamp())}, + {last_replication_status, ok}, + format_data(Data) + ]}, + NewStatus = lists:ukeymerge(1, [{Current, Report}], Status), + Ready = is_ready(State#st.ready, Data), + case Ready of + true -> + Seeds = Tail ++ [Current], + Job = nil; + false -> + % Try to progress this same seed again + Seeds = [Current | Tail], + Job = start_replication([Current | Tail]) + end, + State#st{ + seeds = Seeds, + jobref = Job, + ready = Ready, + status = NewStatus + }; +update_state(State, {_Error, _Stack}) -> + #st{seeds = [Current | Tail], status = Status} = State, + Report = {[ + {timestamp, list_to_binary(mem3_util:iso8601_timestamp())}, + {last_replication_status, error} + ]}, + NewStatus = lists:ukeymerge(1, [{Current, Report}], Status), + Seeds = Tail ++ [Current], + if not State#st.ready -> + erlang:send_after(1000, self(), start_replication); + true -> + ok + end, + State#st{ + seeds = Seeds, + jobref = nil, + status = NewStatus + }. + +is_ready(true, _) -> + true; +is_ready(false, Data) -> + lists:all(fun({_DbName, Pending}) -> Pending =:= {ok, 0} end, Data). + +format_data(Data) -> + Formatted = lists:map(fun({DbName, Status}) -> + case Status of + {ok, Pending} when is_number(Pending) -> + {DbName, Pending}; + {error, Tag} -> + {DbName, list_to_binary(io_lib:format("~p", [Tag]))}; + _Else -> + {DbName, unknown_error} + end + end, Data), + {pending_updates, {Formatted}}. diff --git a/src/mem3/src/mem3_sup.erl b/src/mem3/src/mem3_sup.erl index 80b8ca37f..0adaf51e0 100644 --- a/src/mem3/src/mem3_sup.erl +++ b/src/mem3/src/mem3_sup.erl @@ -21,6 +21,7 @@ init(_Args) -> Children = [ child(mem3_events), child(mem3_nodes), + child(mem3_seeds), child(mem3_sync_nodes), % Order important? child(mem3_sync), child(mem3_shards), diff --git a/src/mem3/src/mem3_sync.erl b/src/mem3/src/mem3_sync.erl index 640181509..693fc4f31 100644 --- a/src/mem3/src/mem3_sync.erl +++ b/src/mem3/src/mem3_sync.erl @@ -19,6 +19,9 @@ -export([start_link/0, get_active/0, get_queue/0, push/1, push/2, remove_node/1, remove_shard/1, initial_sync/1, get_backlog/0, nodes_db/0, shards_db/0, users_db/0, find_next_node/0]). +-export([ + local_dbs/0 +]). -import(queue, [in/2, out/1, to_list/1, join/2, from_list/1, is_empty/1]). diff --git a/src/mem3/src/mem3_util.erl b/src/mem3/src/mem3_util.erl index 254a6dfa6..927607aff 100644 --- a/src/mem3/src/mem3_util.erl +++ b/src/mem3/src/mem3_util.erl @@ -16,6 +16,9 @@ n_val/2, q_val/1, to_atom/1, to_integer/1, write_db_doc/1, delete_db_doc/1, shard_info/1, ensure_exists/1, open_db_doc/1]). -export([is_deleted/1, rotate_list/2]). +-export([ + iso8601_timestamp/0 +]). %% do not use outside mem3. -export([build_ordered_shards/2, downcast/1]). @@ -270,3 +273,9 @@ downcast(#ordered_shard{}=S) -> }; downcast(Shards) when is_list(Shards) -> [downcast(Shard) || Shard <- Shards]. + +iso8601_timestamp() -> + {_,_,Micro} = Now = os:timestamp(), + {{Year,Month,Date},{Hour,Minute,Second}} = calendar:now_to_datetime(Now), + Format = "~4.10.0B-~2.10.0B-~2.10.0BT~2.10.0B:~2.10.0B:~2.10.0B.~6.10.0BZ", + io_lib:format(Format, [Year, Month, Date, Hour, Minute, Second, Micro]). diff --git a/src/mem3/test/mem3_seeds_test.erl b/src/mem3/test/mem3_seeds_test.erl new file mode 100644 index 000000000..19e007950 --- /dev/null +++ b/src/mem3/test/mem3_seeds_test.erl @@ -0,0 +1,64 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(mem3_seeds_test). + +-include_lib("eunit/include/eunit.hrl"). + +a_test_() -> + Tests = [ + {"empty seedlist should set status ok", fun empty_seedlist_status_ok/0}, + {"all seedlist nodes unreachable keeps status seeding", fun seedlist_misconfiguration/0}, + {"seedlist entries should be present in _nodes", fun check_nodelist/0} + ], + {setup, fun setup/0, fun teardown/1, Tests}. + +empty_seedlist_status_ok() -> + ok = application:start(mem3), + try + {ok, {Result}} = mem3_seeds:get_status(), + ?assertEqual({[]}, couch_util:get_value(seeds, Result)), + ?assertEqual(ok, couch_util:get_value(status, Result)) + after + application:stop(mem3) + end. + +seedlist_misconfiguration() -> + config:set("cluster", "seedlist", "couchdb@node1.example.com,couchdb@node2.example.com", false), + ok = application:start(mem3), + try + {ok, {Result}} = mem3_seeds:get_status(), + {Seeds} = couch_util:get_value(seeds, Result), + ?assertEqual(2, length(Seeds)), + ?assertMatch({_}, couch_util:get_value('couchdb@node1.example.com', Seeds)), + ?assertMatch({_}, couch_util:get_value('couchdb@node2.example.com', Seeds)), + ?assertEqual(seeding, couch_util:get_value(status, Result)) + after + application:stop(mem3) + end. + +check_nodelist() -> + config:set("cluster", "seedlist", "couchdb@node1.example.com,couchdb@node2.example.com", false), + ok = application:start(mem3), + try + Nodes = mem3:nodes(), + ?assert(lists:member('couchdb@node1.example.com', Nodes)), + ?assert(lists:member('couchdb@node2.example.com', Nodes)) + after + application:stop(mem3) + end. + +setup() -> + test_util:start_couch([rexi]). + +teardown(Ctx) -> + test_util:stop_couch(Ctx). |