summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--rel/overlay/etc/default.ini7
-rw-r--r--src/chttpd/src/chttpd_misc.erl8
-rw-r--r--src/mem3/src/mem3_nodes.erl26
-rw-r--r--src/mem3/src/mem3_rep.erl9
-rw-r--r--src/mem3/src/mem3_rpc.erl13
-rw-r--r--src/mem3/src/mem3_seeds.erl162
-rw-r--r--src/mem3/src/mem3_sup.erl1
-rw-r--r--src/mem3/src/mem3_sync.erl3
-rw-r--r--src/mem3/src/mem3_util.erl9
-rw-r--r--src/mem3/test/mem3_seeds_test.erl64
10 files changed, 284 insertions, 18 deletions
diff --git a/rel/overlay/etc/default.ini b/rel/overlay/etc/default.ini
index f384de3aa..edaebf9e2 100644
--- a/rel/overlay/etc/default.ini
+++ b/rel/overlay/etc/default.ini
@@ -77,6 +77,13 @@ q=8
n=3
; placement = metro-dc-a:2,metro-dc-b:1
+; Supply a comma-delimited list of node names that this node should
+; contact in order to join a cluster. If a seedlist is configured the ``_up``
+; endpoint will return a 404 until the node has successfully contacted at
+; least one of the members of the seedlist and replicated an up-to-date copy
+; of the ``_nodes``, ``_dbs``, and ``_users`` system databases.
+; seedlist = couchdb@node1.example.com,couchdb@node2.example.com
+
[chttpd]
; These settings affect the main, clustered port (5984 by default).
port = {{cluster_port}}
diff --git a/src/chttpd/src/chttpd_misc.erl b/src/chttpd/src/chttpd_misc.erl
index 1a6b3cb50..fc03fb512 100644
--- a/src/chttpd/src/chttpd_misc.erl
+++ b/src/chttpd/src/chttpd_misc.erl
@@ -480,7 +480,13 @@ handle_up_req(#httpd{method='GET'} = Req) ->
"nolb" ->
send_json(Req, 404, {[{status, nolb}]});
_ ->
- send_json(Req, 200, {[{status, ok}]})
+ {ok, {Status}} = mem3_seeds:get_status(),
+ case couch_util:get_value(status, Status) of
+ ok ->
+ send_json(Req, 200, {Status});
+ seeding ->
+ send_json(Req, 404, {Status})
+ end
end;
handle_up_req(Req) ->
diff --git a/src/mem3/src/mem3_nodes.erl b/src/mem3/src/mem3_nodes.erl
index 019ceaf32..dd5be1a72 100644
--- a/src/mem3/src/mem3_nodes.erl
+++ b/src/mem3/src/mem3_nodes.erl
@@ -93,15 +93,7 @@ initialize_nodelist() ->
DbName = config:get("mem3", "nodes_db", "_nodes"),
{ok, Db} = mem3_util:ensure_exists(DbName),
{ok, _} = couch_db:fold_docs(Db, fun first_fold/2, Db, []),
- % add self if not already present
- case ets:lookup(?MODULE, node()) of
- [_] ->
- ok;
- [] ->
- ets:insert(?MODULE, {node(), []}),
- Doc = #doc{id = couch_util:to_binary(node())},
- {ok, _} = couch_db:update_doc(Db, Doc, [])
- end,
+ insert_if_missing(Db, [node() | mem3_seeds:get_seeds()]),
Seq = couch_db:get_update_seq(Db),
couch_db:close(Db),
Seq.
@@ -145,3 +137,19 @@ changes_callback({change, {Change}, _}, _) ->
{ok, couch_util:get_value(<<"seq">>, Change)};
changes_callback(timeout, _) ->
{ok, nil}.
+
+insert_if_missing(Db, Nodes) ->
+ Docs = lists:foldl(fun(Node, Acc) ->
+ case ets:lookup(?MODULE, Node) of
+ [_] ->
+ Acc;
+ [] ->
+ ets:insert(?MODULE, {Node, []}),
+ [#doc{id = couch_util:to_binary(Node)} | Acc]
+ end
+ end, [], Nodes),
+ if Docs =/= [] ->
+ {ok, _} = couch_db:update_docs(Db, Docs, []);
+ true ->
+ {ok, []}
+ end.
diff --git a/src/mem3/src/mem3_rep.erl b/src/mem3/src/mem3_rep.erl
index 5920f3bd4..b65fa7a86 100644
--- a/src/mem3/src/mem3_rep.erl
+++ b/src/mem3/src/mem3_rep.erl
@@ -495,7 +495,7 @@ update_locals(Acc) ->
{<<"source_node">>, atom_to_binary(node(), utf8)},
{<<"source_uuid">>, couch_db:get_uuid(Db)},
{<<"source_seq">>, Seq},
- {<<"timestamp">>, list_to_binary(iso8601_timestamp())}
+ {<<"timestamp">>, list_to_binary(mem3_util:iso8601_timestamp())}
],
NewBody = mem3_rpc:save_checkpoint(Node, Name, Id, Seq, NewEntry, History),
{ok, _} = couch_db:update_doc(Db, #doc{id = Id, body = NewBody}, []).
@@ -572,13 +572,6 @@ filter_doc(_, _) ->
keep.
-iso8601_timestamp() ->
- {_,_,Micro} = Now = os:timestamp(),
- {{Year,Month,Date},{Hour,Minute,Second}} = calendar:now_to_datetime(Now),
- Format = "~4.10.0B-~2.10.0B-~2.10.0BT~2.10.0B:~2.10.0B:~2.10.0B.~6.10.0BZ",
- io_lib:format(Format, [Year, Month, Date, Hour, Minute, Second, Micro]).
-
-
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
diff --git a/src/mem3/src/mem3_rpc.erl b/src/mem3/src/mem3_rpc.erl
index 35d1d0a49..61a29d9bf 100644
--- a/src/mem3/src/mem3_rpc.erl
+++ b/src/mem3/src/mem3_rpc.erl
@@ -19,6 +19,7 @@
find_common_seq/4,
get_missing_revs/4,
update_docs/4,
+ pull_replication/1,
load_checkpoint/4,
save_checkpoint/6,
@@ -31,6 +32,7 @@
-export([
find_common_seq_rpc/3,
load_checkpoint_rpc/3,
+ pull_replication_rpc/1,
save_checkpoint_rpc/5,
load_purge_infos_rpc/3,
@@ -41,6 +43,11 @@
-include("mem3.hrl").
-include_lib("couch/include/couch_db.hrl").
+% "Pull" is a bit of a misnomer here, as what we're actually doing is
+% issuing an RPC request and telling the remote node to push updates to
+% us. This lets us reuse all of the battle-tested machinery of mem3_rpc.
+pull_replication(Seed) ->
+ rexi_call(Seed, {mem3_rpc, pull_replication_rpc, [node()]}).
get_missing_revs(Node, DbName, IdsRevs, Options) ->
rexi_call(Node, {fabric_rpc, get_missing_revs, [DbName, IdsRevs, Options]}).
@@ -148,6 +155,12 @@ find_common_seq_rpc(DbName, SourceUUID, SourceEpochs) ->
rexi:reply(Error)
end.
+pull_replication_rpc(Target) ->
+ Dbs = mem3_sync:local_dbs(),
+ Opts = [{batch_size, 1000}, {batch_count, 50}],
+ Repl = fun(Db) -> {Db, mem3_rep:go(Db, Target, Opts)} end,
+ rexi:reply({ok, lists:map(Repl, Dbs)}).
+
load_purge_infos_rpc(DbName, SrcUUID, BatchSize) ->
erlang:put(io_priority, {internal_repl, DbName}),
diff --git a/src/mem3/src/mem3_seeds.erl b/src/mem3/src/mem3_seeds.erl
new file mode 100644
index 000000000..f1aceb996
--- /dev/null
+++ b/src/mem3/src/mem3_seeds.erl
@@ -0,0 +1,162 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(mem3_seeds).
+-behaviour(gen_server).
+
+-export([
+ init/1,
+ handle_call/3,
+ handle_cast/2,
+ handle_info/2,
+ code_change/3,
+ terminate/2
+]).
+
+-export([
+ start_link/0,
+ get_seeds/0,
+ get_status/0
+]).
+
+-record(st, {
+ ready = false,
+ seeds = [],
+ jobref = nil,
+ status = [] % nested proplist keyed on node name
+}).
+
+-define(REPLICATION_INTERVAL, 60000).
+
+start_link() ->
+ gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+get_seeds() ->
+ case config:get("cluster", "seedlist") of
+ undefined ->
+ [];
+ List ->
+ Nodes = string:tokens(List, ","),
+ Seeds = [list_to_atom(Node) || Node <- Nodes] -- [node()],
+ mem3_util:rotate_list(node(), Seeds)
+ end.
+
+get_status() ->
+ gen_server:call(?MODULE, get_status).
+
+init([]) ->
+ Seeds = get_seeds(),
+ InitStatus = [{Seed, {[]}} || Seed <- Seeds],
+ State = #st{
+ seeds = Seeds,
+ ready = case Seeds of [] -> true; _ -> false end,
+ jobref = start_replication(Seeds),
+ status = InitStatus
+ },
+ {ok, State}.
+
+handle_call(get_status, _From, St) ->
+ Status = {[
+ {status, case St#st.ready of true -> ok; false -> seeding end},
+ {seeds, {St#st.status}}
+ ]},
+ {reply, {ok, Status}, St}.
+
+handle_cast(_Msg, St) ->
+ {noreply, St}.
+
+handle_info(start_replication, #st{jobref=nil} = St) ->
+ JobRef = start_replication(St#st.seeds),
+ {noreply, St#st{jobref = JobRef}};
+
+handle_info({'DOWN', Ref, _, Pid, Output}, #st{jobref = {Pid, Ref}} = St) ->
+ {noreply, update_state(St, Output)};
+
+handle_info(_Msg, St) ->
+ {noreply, St}.
+
+terminate(_Reason, _St) ->
+ ok.
+
+code_change(_OldVsn, St, _Extra) ->
+ {ok, St}.
+
+% internal functions
+
+start_replication([]) ->
+ nil;
+start_replication([Seed | _]) ->
+ spawn_monitor(fun() ->
+ Reply = mem3_rpc:pull_replication(Seed),
+ exit({ok, Reply})
+ end).
+
+update_state(State, {ok, Data}) ->
+ #st{seeds = [Current | Tail], status = Status} = State,
+ Report = {[
+ {timestamp, list_to_binary(mem3_util:iso8601_timestamp())},
+ {last_replication_status, ok},
+ format_data(Data)
+ ]},
+ NewStatus = lists:ukeymerge(1, [{Current, Report}], Status),
+ Ready = is_ready(State#st.ready, Data),
+ case Ready of
+ true ->
+ Seeds = Tail ++ [Current],
+ Job = nil;
+ false ->
+ % Try to progress this same seed again
+ Seeds = [Current | Tail],
+ Job = start_replication([Current | Tail])
+ end,
+ State#st{
+ seeds = Seeds,
+ jobref = Job,
+ ready = Ready,
+ status = NewStatus
+ };
+update_state(State, {_Error, _Stack}) ->
+ #st{seeds = [Current | Tail], status = Status} = State,
+ Report = {[
+ {timestamp, list_to_binary(mem3_util:iso8601_timestamp())},
+ {last_replication_status, error}
+ ]},
+ NewStatus = lists:ukeymerge(1, [{Current, Report}], Status),
+ Seeds = Tail ++ [Current],
+ if not State#st.ready ->
+ erlang:send_after(1000, self(), start_replication);
+ true ->
+ ok
+ end,
+ State#st{
+ seeds = Seeds,
+ jobref = nil,
+ status = NewStatus
+ }.
+
+is_ready(true, _) ->
+ true;
+is_ready(false, Data) ->
+ lists:all(fun({_DbName, Pending}) -> Pending =:= {ok, 0} end, Data).
+
+format_data(Data) ->
+ Formatted = lists:map(fun({DbName, Status}) ->
+ case Status of
+ {ok, Pending} when is_number(Pending) ->
+ {DbName, Pending};
+ {error, Tag} ->
+ {DbName, list_to_binary(io_lib:format("~p", [Tag]))};
+ _Else ->
+ {DbName, unknown_error}
+ end
+ end, Data),
+ {pending_updates, {Formatted}}.
diff --git a/src/mem3/src/mem3_sup.erl b/src/mem3/src/mem3_sup.erl
index 80b8ca37f..0adaf51e0 100644
--- a/src/mem3/src/mem3_sup.erl
+++ b/src/mem3/src/mem3_sup.erl
@@ -21,6 +21,7 @@ init(_Args) ->
Children = [
child(mem3_events),
child(mem3_nodes),
+ child(mem3_seeds),
child(mem3_sync_nodes), % Order important?
child(mem3_sync),
child(mem3_shards),
diff --git a/src/mem3/src/mem3_sync.erl b/src/mem3/src/mem3_sync.erl
index 640181509..693fc4f31 100644
--- a/src/mem3/src/mem3_sync.erl
+++ b/src/mem3/src/mem3_sync.erl
@@ -19,6 +19,9 @@
-export([start_link/0, get_active/0, get_queue/0, push/1, push/2,
remove_node/1, remove_shard/1, initial_sync/1, get_backlog/0, nodes_db/0,
shards_db/0, users_db/0, find_next_node/0]).
+-export([
+ local_dbs/0
+]).
-import(queue, [in/2, out/1, to_list/1, join/2, from_list/1, is_empty/1]).
diff --git a/src/mem3/src/mem3_util.erl b/src/mem3/src/mem3_util.erl
index 254a6dfa6..927607aff 100644
--- a/src/mem3/src/mem3_util.erl
+++ b/src/mem3/src/mem3_util.erl
@@ -16,6 +16,9 @@
n_val/2, q_val/1, to_atom/1, to_integer/1, write_db_doc/1, delete_db_doc/1,
shard_info/1, ensure_exists/1, open_db_doc/1]).
-export([is_deleted/1, rotate_list/2]).
+-export([
+ iso8601_timestamp/0
+]).
%% do not use outside mem3.
-export([build_ordered_shards/2, downcast/1]).
@@ -270,3 +273,9 @@ downcast(#ordered_shard{}=S) ->
};
downcast(Shards) when is_list(Shards) ->
[downcast(Shard) || Shard <- Shards].
+
+iso8601_timestamp() ->
+ {_,_,Micro} = Now = os:timestamp(),
+ {{Year,Month,Date},{Hour,Minute,Second}} = calendar:now_to_datetime(Now),
+ Format = "~4.10.0B-~2.10.0B-~2.10.0BT~2.10.0B:~2.10.0B:~2.10.0B.~6.10.0BZ",
+ io_lib:format(Format, [Year, Month, Date, Hour, Minute, Second, Micro]).
diff --git a/src/mem3/test/mem3_seeds_test.erl b/src/mem3/test/mem3_seeds_test.erl
new file mode 100644
index 000000000..19e007950
--- /dev/null
+++ b/src/mem3/test/mem3_seeds_test.erl
@@ -0,0 +1,64 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(mem3_seeds_test).
+
+-include_lib("eunit/include/eunit.hrl").
+
+a_test_() ->
+ Tests = [
+ {"empty seedlist should set status ok", fun empty_seedlist_status_ok/0},
+ {"all seedlist nodes unreachable keeps status seeding", fun seedlist_misconfiguration/0},
+ {"seedlist entries should be present in _nodes", fun check_nodelist/0}
+ ],
+ {setup, fun setup/0, fun teardown/1, Tests}.
+
+empty_seedlist_status_ok() ->
+ ok = application:start(mem3),
+ try
+ {ok, {Result}} = mem3_seeds:get_status(),
+ ?assertEqual({[]}, couch_util:get_value(seeds, Result)),
+ ?assertEqual(ok, couch_util:get_value(status, Result))
+ after
+ application:stop(mem3)
+ end.
+
+seedlist_misconfiguration() ->
+ config:set("cluster", "seedlist", "couchdb@node1.example.com,couchdb@node2.example.com", false),
+ ok = application:start(mem3),
+ try
+ {ok, {Result}} = mem3_seeds:get_status(),
+ {Seeds} = couch_util:get_value(seeds, Result),
+ ?assertEqual(2, length(Seeds)),
+ ?assertMatch({_}, couch_util:get_value('couchdb@node1.example.com', Seeds)),
+ ?assertMatch({_}, couch_util:get_value('couchdb@node2.example.com', Seeds)),
+ ?assertEqual(seeding, couch_util:get_value(status, Result))
+ after
+ application:stop(mem3)
+ end.
+
+check_nodelist() ->
+ config:set("cluster", "seedlist", "couchdb@node1.example.com,couchdb@node2.example.com", false),
+ ok = application:start(mem3),
+ try
+ Nodes = mem3:nodes(),
+ ?assert(lists:member('couchdb@node1.example.com', Nodes)),
+ ?assert(lists:member('couchdb@node2.example.com', Nodes))
+ after
+ application:stop(mem3)
+ end.
+
+setup() ->
+ test_util:start_couch([rexi]).
+
+teardown(Ctx) ->
+ test_util:stop_couch(Ctx).