summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJay Doane <jaydoane@apache.org>2020-12-14 12:19:47 -0800
committerGitHub <noreply@github.com>2020-12-14 12:19:47 -0800
commit15c8243f3ba21bac8c05374293b79813f1461f96 (patch)
treeb9d0413e5d6f49546e48d75d9872db3c770e8067
parent23b98345f0db38214eaad33b159cf10dd9deaf72 (diff)
parentc16721a35accdb427a50826ae9ad1117baf1d87d (diff)
downloadcouchdb-15c8243f3ba21bac8c05374293b79813f1461f96.tar.gz
Merge pull request #3296 from apache/custodian-merge
Merge custodian
-rw-r--r--rebar.config.script1
-rw-r--r--rel/reltool.config2
-rw-r--r--src/custodian/README8
-rw-r--r--src/custodian/src/custodian.app.src29
-rw-r--r--src/custodian/src/custodian.erl21
-rw-r--r--src/custodian/src/custodian.hrl49
-rw-r--r--src/custodian/src/custodian_app.erl28
-rw-r--r--src/custodian/src/custodian_db_checker.erl165
-rw-r--r--src/custodian/src/custodian_server.erl237
-rw-r--r--src/custodian/src/custodian_sup.erl45
-rw-r--r--src/custodian/src/custodian_util.erl273
11 files changed, 858 insertions, 0 deletions
diff --git a/rebar.config.script b/rebar.config.script
index 1ffbf70b0..03e6fcaf5 100644
--- a/rebar.config.script
+++ b/rebar.config.script
@@ -127,6 +127,7 @@ SubDirs = [
"src/couch_stats",
"src/couch_peruser",
"src/couch_tests",
+ "src/custodian",
"src/ddoc_cache",
"src/dreyfus",
"src/fabric",
diff --git a/rel/reltool.config b/rel/reltool.config
index b58e40071..70f7bbcb9 100644
--- a/rel/reltool.config
+++ b/rel/reltool.config
@@ -40,6 +40,7 @@
couch_stats,
couch_event,
couch_peruser,
+ custodian,
ddoc_cache,
dreyfus,
ets_lru,
@@ -99,6 +100,7 @@
{app, couch_stats, [{incl_cond, include}]},
{app, couch_event, [{incl_cond, include}]},
{app, couch_peruser, [{incl_cond, include}]},
+ {app, custodian, [{incl_cond, include}]},
{app, ddoc_cache, [{incl_cond, include}]},
{app, dreyfus, [{incl_cond, include}]},
{app, ets_lru, [{incl_cond, include}]},
diff --git a/src/custodian/README b/src/custodian/README
new file mode 100644
index 000000000..72681f447
--- /dev/null
+++ b/src/custodian/README
@@ -0,0 +1,8 @@
+Custodian is responsible for the data stored in CouchDB databases.
+
+Custodian scans the "dbs" database, which details the location of
+every shard of every database and ensures that operators are aware of
+any shard that is under-replicated (has less than N copies).
+
+Custodian accounts for data in transit (as indicated by the
+mem3.redirects section) as well as nodes not recently known to be up.
diff --git a/src/custodian/src/custodian.app.src b/src/custodian/src/custodian.app.src
new file mode 100644
index 000000000..b93b21ebb
--- /dev/null
+++ b/src/custodian/src/custodian.app.src
@@ -0,0 +1,29 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+{application, custodian,
+ [
+ {description, "in your cluster, looking after your stuff"},
+ {vsn, git},
+ {registered, []},
+ {applications, [
+ kernel,
+ stdlib,
+ couch_log,
+ config,
+ couch_event,
+ couch,
+ mem3
+ ]},
+ {mod, { custodian_app, []}},
+ {env, []}
+ ]}.
diff --git a/src/custodian/src/custodian.erl b/src/custodian/src/custodian.erl
new file mode 100644
index 000000000..a16c925b5
--- /dev/null
+++ b/src/custodian/src/custodian.erl
@@ -0,0 +1,21 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(custodian).
+
+-export([report/0, summary/0]).
+
+report() ->
+ custodian_util:report().
+
+summary() ->
+ custodian_util:summary().
diff --git a/src/custodian/src/custodian.hrl b/src/custodian/src/custodian.hrl
new file mode 100644
index 000000000..bce22cf95
--- /dev/null
+++ b/src/custodian/src/custodian.hrl
@@ -0,0 +1,49 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-define(CUSTODIAN_ID, <<"_design/custodian">>).
+
+-define(CUSTODIAN_VALIDATION,
+<<"function(newDoc, oldDoc) {
+ var i, range, node;
+ if(newDoc['_id'].substring(0, 8) === \"_design/\") return;
+ if(newDoc['_deleted'] === true) return;
+ if (!newDoc.by_node) {
+ throw({forbidden: \"by_node is mandatory\"});
+ }
+ if (!newDoc.by_range) {
+ throw({forbidden: \"by_range is mandatory\"});
+ }
+ for (node in newDoc.by_node) {
+ for (i in newDoc.by_node[node]) {
+ range = newDoc.by_node[node][i];
+ if(!newDoc.by_range[range]) {
+ throw({forbidden: \"by_range for \" + range + \" is missing\"});
+ }
+ if(newDoc.by_range[range].indexOf(node) === -1) {
+ throw({forbidden : \"by_range for \" + range + \" is missing \" + node});
+ }
+ }
+ }
+ for (range in newDoc.by_range) {
+ for (i in newDoc.by_range[range]) {
+ node = newDoc.by_range[range][i];
+ if(!newDoc.by_node[node]) {
+ throw({forbidden: \"by_node for \" + node + \" is missing\"});
+ }
+ if (newDoc.by_node[node].indexOf(range) === -1) {
+ throw({forbidden: \"by_node for \" + node + \" is missing \" + range});
+ }
+ }
+ }
+}
+">>).
diff --git a/src/custodian/src/custodian_app.erl b/src/custodian/src/custodian_app.erl
new file mode 100644
index 000000000..91afe139f
--- /dev/null
+++ b/src/custodian/src/custodian_app.erl
@@ -0,0 +1,28 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(custodian_app).
+
+-behaviour(application).
+
+%% Application callbacks
+-export([start/2, stop/1]).
+
+%% ===================================================================
+%% Application callbacks
+%% ===================================================================
+
+start(_StartType, _StartArgs) ->
+ custodian_sup:start_link().
+
+stop(_State) ->
+ ok.
diff --git a/src/custodian/src/custodian_db_checker.erl b/src/custodian/src/custodian_db_checker.erl
new file mode 100644
index 000000000..8308c8ecb
--- /dev/null
+++ b/src/custodian/src/custodian_db_checker.erl
@@ -0,0 +1,165 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(custodian_db_checker).
+-behaviour(gen_server).
+-vsn(1).
+
+
+-export([start_link/0]).
+
+
+-export([
+ init/1,
+ terminate/2,
+ handle_call/3,
+ handle_cast/2,
+ handle_info/2,
+ code_change/3
+]).
+
+-export([
+ check_dbs/0
+]).
+
+
+-record(st, {
+ checker
+}).
+
+
+start_link() ->
+ gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+
+init(_) ->
+ process_flag(trap_exit, true),
+ net_kernel:monitor_nodes(true),
+ {ok, restart_checker(#st{})}.
+
+
+terminate(_Reason, St) ->
+ couch_util:shutdown_sync(St#st.checker),
+ ok.
+
+
+handle_call(Msg, _From, St) ->
+ {stop, {invalid_call, Msg}, {invalid_call, Msg}, St}.
+
+
+handle_cast(refresh, St) ->
+ {noreply, restart_checker(St)};
+
+handle_cast(Msg, St) ->
+ {stop, {invalid_cast, Msg}, St}.
+
+
+handle_info({nodeup, _}, St) ->
+ {noreply, restart_checker(St)};
+
+handle_info({nodedown, _}, St) ->
+ {noreply, restart_checker(St)};
+
+handle_info({'EXIT', Pid, normal}, #st{checker=Pid}=St) ->
+ {noreply, St#st{checker=undefined}};
+
+handle_info({'EXIT', Pid, Reason}, #st{checker=Pid}=St) ->
+ couch_log:notice("custodian db checker died ~p", [Reason]),
+ {noreply, restart_checker(St#st{checker=undefined})};
+
+handle_info(Msg, St) ->
+ {stop, {invalid_info, Msg}, St}.
+
+
+code_change(_OldVsn, St, _Extra) ->
+ {ok, St}.
+
+
+restart_checker(#st{checker=undefined}=St) ->
+ Pid = spawn_link(fun ?MODULE:check_dbs/0),
+ St#st{checker=Pid};
+restart_checker(#st{checker=Pid}=St) when is_pid(Pid) ->
+ St.
+
+
+check_dbs() ->
+ {ok, DbsDb} = custodian_util:ensure_dbs_exists(),
+ try
+ Missing = lists:foldl(fun(DbName, Count) ->
+ case check_db(DbsDb, DbName) of
+ ok -> Count;
+ missing -> Count + 1
+ end
+ end, 0, get_dbs()),
+ case Missing == 0 of
+ true -> clear_missing_dbs_alert();
+ false -> ok
+ end
+ after
+ couch_db:close(DbsDb)
+ end.
+
+
+check_db(DbsDb, DbName) when is_binary(DbName) ->
+ try
+ case couch_db:open_doc(DbsDb, DbName, []) of
+ {ok, _} ->
+ ok;
+ _ ->
+ send_missing_db_alert(DbName),
+ missing
+ end
+ catch _:_ ->
+ send_missing_db_alert(DbName),
+ missing
+ end.
+
+
+get_dbs() ->
+ lists:flatten([
+ get_users_db(),
+ get_stats_db()
+ ]).
+
+
+get_users_db() ->
+ UsersDb = config:get("couch_httpd_auth", "authentication_db", "users"),
+ [list_to_binary(UsersDb)].
+
+
+get_stats_db() ->
+ case application:get_env(ioq, stats_db) of
+ {ok, DbName} when is_binary(DbName) ->
+ [DbName];
+ {ok, DbName} when is_list(DbName) ->
+ [iolist_to_binary(DbName)];
+ _ ->
+ []
+ end.
+
+
+send_missing_db_alert(DbName) ->
+ couch_log:notice("Missing system database ~s", [DbName]),
+ Command = [
+ "send-sensu-event --standalone --critical",
+ " --output=\"Missing system database ",
+ binary_to_list(DbName),
+ "\" --handler=default custodian-missing-db-check"],
+ os:cmd(lists:concat(Command)).
+
+clear_missing_dbs_alert() ->
+ couch_log:notice("All system databases exist.", []),
+ Command = [
+ "send-sensu-event --standalone --ok",
+ " --output=\"All system databases exist\"",
+ " --handler=default custodian-missing-db-check"],
+ os:cmd(lists:concat(Command)).
diff --git a/src/custodian/src/custodian_server.erl b/src/custodian/src/custodian_server.erl
new file mode 100644
index 000000000..322cc3264
--- /dev/null
+++ b/src/custodian/src/custodian_server.erl
@@ -0,0 +1,237 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(custodian_server).
+-behaviour(gen_server).
+-vsn(3).
+-behaviour(config_listener).
+
+% public api.
+-export([start_link/0]).
+
+% gen_server api.
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+ code_change/3, terminate/2]).
+
+% exported for callback.
+-export([
+ check_shards/0,
+ handle_db_event/3
+]).
+
+% config_listener callback
+-export([handle_config_change/5, handle_config_terminate/3]).
+
+% private records.
+-record(state, {
+ event_listener,
+ shard_checker,
+ rescan=false
+}).
+
+-define(VSN_0_2_7, 184129240591641721395874905059581858099).
+
+-ifdef(TEST).
+-define(RELISTEN_DELAY, 50).
+-else.
+-define(RELISTEN_DELAY, 5000).
+-endif.
+
+
+% public functions.
+
+start_link() ->
+ gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+handle_config_change("couchdb", "maintenance_mode", _, _, S) ->
+ ok = gen_server:cast(?MODULE, refresh),
+ {ok, S};
+handle_config_change(_, _, _, _, S) ->
+ {ok, S}.
+
+handle_config_terminate(_, stop, _) ->
+ ok;
+handle_config_terminate(_Server, _Reason, _State) ->
+ erlang:send_after(?RELISTEN_DELAY, whereis(?MODULE), restart_config_listener).
+
+% gen_server functions.
+init(_) ->
+ process_flag(trap_exit, true),
+ net_kernel:monitor_nodes(true),
+ ok = config:listen_for_changes(?MODULE, nil),
+ {ok, LisPid} = start_event_listener(),
+ {ok, start_shard_checker(#state{
+ event_listener=LisPid
+ })}.
+
+handle_call(_Msg, _From, State) ->
+ {noreply, State}.
+
+handle_cast(refresh, State) ->
+ {noreply, start_shard_checker(State)}.
+
+handle_info({nodeup, _}, State) ->
+ {noreply, start_shard_checker(State)};
+
+handle_info({nodedown, _}, State) ->
+ {noreply, start_shard_checker(State)};
+
+handle_info({'EXIT', Pid, normal}, #state{shard_checker=Pid}=State) ->
+ NewState = State#state{shard_checker=undefined},
+ case State#state.rescan of
+ true ->
+ {noreply, start_shard_checker(NewState)};
+ false ->
+ {noreply, NewState}
+ end;
+
+handle_info({'EXIT', Pid, Reason}, #state{shard_checker=Pid}=State) ->
+ couch_log:notice("custodian shard checker died ~p", [Reason]),
+ NewState = State#state{shard_checker=undefined},
+ {noreply, start_shard_checker(NewState)};
+
+handle_info({'EXIT', Pid, Reason}, #state{event_listener=Pid}=State) ->
+ couch_log:notice("custodian update notifier died ~p", [Reason]),
+ {ok, Pid1} = start_event_listener(),
+ {noreply, State#state{event_listener=Pid1}};
+
+handle_info(restart_config_listener, State) ->
+ ok = config:listen_for_changes(?MODULE, nil),
+ {noreply, State}.
+
+terminate(_Reason, State) ->
+ couch_event:stop_listener(State#state.event_listener),
+ couch_util:shutdown_sync(State#state.shard_checker),
+ ok.
+
+code_change(?VSN_0_2_7, State, _Extra) ->
+ ok = config:listen_for_changes(?MODULE, nil),
+ {ok, State};
+code_change(_OldVsn, #state{}=State, _Extra) ->
+ {ok, State}.
+
+% private functions
+
+
+start_shard_checker(#state{shard_checker=undefined}=State) ->
+ State#state{
+ shard_checker=spawn_link(fun ?MODULE:check_shards/0),
+ rescan=false
+ };
+start_shard_checker(#state{shard_checker=Pid}=State) when is_pid(Pid) ->
+ State#state{rescan=true}.
+
+
+start_event_listener() ->
+ couch_event:link_listener(
+ ?MODULE, handle_db_event, nil, [{dbname, <<"dbs">>}]
+ ).
+
+handle_db_event(_DbName, updated, _St) ->
+ gen_server:cast(?MODULE, refresh),
+ {ok, nil};
+handle_db_event(_DbName, _Event, _St) ->
+ {ok, nil}.
+
+check_shards() ->
+ [send_sensu_event(Item) || Item <- custodian:summary()].
+
+send_sensu_event({_, Count} = Item) ->
+ Level = case Count of
+ 0 ->
+ "--ok";
+ 1 ->
+ couch_log:critical("~s", [describe(Item)]),
+ "--critical";
+ _ ->
+ couch_log:warning("~s", [describe(Item)]),
+ "--warning"
+ end,
+ Cmd = lists:concat([
+ "send-sensu-event --standalone ",
+ Level,
+ " --output=\"",
+ describe(Item),
+ "\" ",
+ check_name(Item)
+ ]),
+ os:cmd(Cmd).
+
+describe({{safe, N}, Count}) ->
+ lists:concat([Count, " ", shards(Count), " in cluster with only ", N,
+ " ", copies(N), " on nodes that are currently up"]);
+describe({{live, N}, Count}) ->
+ lists:concat([Count, " ", shards(Count), " in cluster with only ",
+ N, " ", copies(N), " on nodes not in maintenance mode"]);
+describe({conflicted, Count}) ->
+ lists:concat([Count, " conflicted ", shards(Count), " in cluster"]).
+
+check_name({{Type, N}, _}) ->
+ lists:concat(["custodian-", N, "-", Type, "-shards-check"]);
+check_name({Type, _}) ->
+ lists:concat(["custodian-", Type, "-shards-check"]).
+
+shards(1) ->
+ "shard";
+shards(_) ->
+ "shards".
+
+copies(1) ->
+ "copy";
+copies(_) ->
+ "copies".
+
+
+-ifdef(TEST).
+-include_lib("eunit/include/eunit.hrl").
+
+config_update_test_() ->
+ {
+ "Test config updates",
+ {
+ foreach,
+ fun() -> test_util:start_couch([custodian]) end,
+ fun test_util:stop_couch/1,
+ [
+ fun t_restart_config_listener/1
+ ]
+ }
+}.
+
+t_restart_config_listener(_) ->
+ ?_test(begin
+ ConfigMonitor = config_listener_mon(),
+ ?assert(is_process_alive(ConfigMonitor)),
+ test_util:stop_sync(ConfigMonitor),
+ ?assertNot(is_process_alive(ConfigMonitor)),
+ NewConfigMonitor = test_util:wait(fun() ->
+ case config_listener_mon() of
+ undefined -> wait;
+ Pid -> Pid
+ end
+ end),
+ ?assertNotEqual(ConfigMonitor, NewConfigMonitor),
+ ?assert(is_process_alive(NewConfigMonitor))
+ end).
+
+config_listener_mon() ->
+ IsConfigMonitor = fun(P) ->
+ [M | _] = string:tokens(couch_debug:process_name(P), ":"),
+ M =:= "config_listener_mon"
+ end,
+ [{_, MonitoredBy}] = process_info(whereis(?MODULE), [monitored_by]),
+ case lists:filter(IsConfigMonitor, MonitoredBy) of
+ [Pid] -> Pid;
+ [] -> undefined
+ end.
+
+-endif.
diff --git a/src/custodian/src/custodian_sup.erl b/src/custodian/src/custodian_sup.erl
new file mode 100644
index 000000000..97dbd2321
--- /dev/null
+++ b/src/custodian/src/custodian_sup.erl
@@ -0,0 +1,45 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(custodian_sup).
+
+-behaviour(supervisor).
+
+%% API
+-export([start_link/0]).
+
+%% Supervisor callbacks
+-export([init/1]).
+
+%% Helper macro for declaring children of supervisor
+-define(CHILD(I, Type), {I, {I, start_link, []}, permanent, 5000, Type, [I]}).
+
+%% ===================================================================
+%% API functions
+%% ===================================================================
+
+start_link() ->
+ supervisor:start_link({local, ?MODULE}, ?MODULE, []).
+
+%% ===================================================================
+%% Supervisor callbacks
+%% ===================================================================
+
+init([]) ->
+ {ok, {
+ {one_for_one, 5, 10},
+ [
+ ?CHILD(custodian_server, worker),
+ ?CHILD(custodian_db_checker, worker)
+ ]
+ }}.
+
diff --git a/src/custodian/src/custodian_util.erl b/src/custodian/src/custodian_util.erl
new file mode 100644
index 000000000..785bbd3da
--- /dev/null
+++ b/src/custodian/src/custodian_util.erl
@@ -0,0 +1,273 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(custodian_util).
+-include("custodian.hrl").
+
+-include_lib("mem3/include/mem3.hrl").
+-include_lib("couch/include/couch_db.hrl").
+
+-export([summary/0, report/0]).
+-export([ensure_dbs_exists/0]).
+
+-record(state, {live, safe, n, callback, db, acc}).
+
+%% public functions.
+
+summary() ->
+ Dict0 = dict:from_list([{conflicted, 0}] ++
+ [{{live, N}, 0} || N <- lists:seq(0, cluster_n() - 1)] ++
+ [{{safe, N}, 0} || N <- lists:seq(0, cluster_n() - 1)]),
+ Fun = fun(_Id, _Range, {conflicted, _N}, Dict) ->
+ dict:update_counter(conflicted, 1, Dict);
+ (_Id, _Range, Item, Dict) ->
+ dict:update_counter(Item, 1, Dict)
+ end,
+ dict:to_list(fold_dbs(Dict0, Fun)).
+
+report() ->
+ Fun = fun(Id, _Range, {conflicted, N}, Acc) ->
+ [{Id, {conflicted, N}} | Acc];
+ (Id, Range, Item, Acc) ->
+ [{Id, Range, Item} | Acc]
+ end,
+ fold_dbs([], Fun).
+
+ensure_dbs_exists() ->
+ DbName = config:get("mem3", "shards_db", "dbs"),
+ {ok, Db} = mem3_util:ensure_exists(DbName),
+ ensure_custodian_ddoc_exists(Db),
+ {ok, Db}.
+
+%% private functions.
+
+fold_dbs(Acc, Fun) ->
+ Safe = maybe_redirect([node() | nodes()]),
+ Live = Safe -- maintenance_nodes(Safe),
+ N = cluster_n(),
+ {ok, Db} = ensure_dbs_exists(),
+ try
+ State0 = #state{live=Live, safe=Safe, n=N, callback=Fun, db=Db, acc=Acc},
+ {ok, State1} = couch_db:fold_docs(Db, fun fold_dbs1/2, State0, []),
+ State1#state.acc
+ after
+ couch_db:close(Db)
+ end.
+
+fold_dbs1(#full_doc_info{id = <<"_design/", _/binary>>}, Acc) ->
+ {ok, Acc};
+fold_dbs1(#full_doc_info{deleted=true}, Acc) ->
+ {ok, Acc};
+fold_dbs1(#full_doc_info{id = Id} = FDI, State) ->
+ InternalAcc = case count_conflicts(FDI) of
+ 0 ->
+ State#state.acc;
+ ConflictCount ->
+ (State#state.callback)(Id, null, {conflicted, ConflictCount}, State#state.acc)
+ end,
+ fold_dbs(Id, load_shards(State#state.db, FDI), State#state{acc=InternalAcc}).
+
+
+fold_dbs(Id, Shards, State) ->
+ IsSafe = fun(#shard{node = N}) -> lists:member(N, State#state.safe) end,
+ IsLive = fun(#shard{node = N}) -> lists:member(N, State#state.live) end,
+ TargetN = State#state.n,
+ LiveShards = lists:filter(IsLive, Shards),
+ SafeShards = lists:filter(IsSafe, Shards),
+ Acc0 = State#state.acc,
+ Acc1 = case mem3_util:calculate_max_n(LiveShards) of
+ LiveN when LiveN < TargetN ->
+ LiveRanges = get_range_counts(LiveN, LiveShards, Shards),
+ lists:foldl(fun({Range, N}, FAcc) ->
+ (State#state.callback)(Id, Range, {live, N}, FAcc)
+ end, Acc0, LiveRanges);
+ _ ->
+ Acc0
+ end,
+ Acc2 = case mem3_util:calculate_max_n(SafeShards) of
+ SafeN when SafeN < TargetN ->
+ SafeRanges = get_range_counts(SafeN, SafeShards, Shards),
+ lists:foldl(fun({Range, N}, FAcc) ->
+ (State#state.callback)(Id, Range, {safe, N}, FAcc)
+ end, Acc1, SafeRanges);
+ _ ->
+ Acc1
+ end,
+ {ok, State#state{acc = Acc2}}.
+
+
+get_range_counts(MaxN, Shards, AllShards) ->
+ Ranges = ranges(Shards),
+ AllRanges = ranges(AllShards),
+
+ % Get a list of ranges that were used to fill the MaxN rings. Also return
+ % whatever was left (not part of the rings).
+ {UnusedRanges, UsedRanges} = get_n_rings(MaxN, Ranges, []),
+
+ % All the ranges that participated in filling the N rings will get
+ % their number of copies set to MaxN.
+ UsedCounts = update_counts(UsedRanges, #{}, 1, fun(_) -> MaxN end),
+
+ % Add ranges that were present but didn't get picked in the rings
+ PresentCounts = update_counts(UnusedRanges, UsedCounts, 1, fun(N) ->
+ max(N + 1, MaxN)
+ end),
+
+ % Handle shards that are not present at all. Mark these ranges as missing.
+ Missing = [R || R <- AllRanges, not lists:member(R, Ranges)],
+ RangeCounts = update_counts(Missing, PresentCounts, 0, fun(_) -> 0 end),
+
+ % Report only shards with counts =< MaxN
+ RangeCounts1 = maps:filter(fun(_, N) -> N =< MaxN end, RangeCounts),
+ lists:sort(maps:to_list(RangeCounts1)).
+
+
+update_counts(Ranges, Acc0, Init, UpdateFun) ->
+ lists:foldl(fun({B, E}, Acc) ->
+ maps:update_with({B, E}, UpdateFun, Init, Acc)
+ end, Acc0, Ranges).
+
+
+ranges(Shards) ->
+ lists:map(fun(S) -> [B, E] = mem3:range(S), {B, E} end, Shards).
+
+
+get_n_rings(N, Ranges, Rings) when N =< 0 ->
+ {Ranges, Rings};
+get_n_rings(N, Ranges, Rings) ->
+ Ring = mem3_util:get_ring(Ranges),
+ get_n_rings(N - 1, Ranges -- Ring, Rings ++ Ring).
+
+
+cluster_n() ->
+ list_to_integer(config:get("cluster", "n", "3")).
+
+maintenance_nodes(Nodes) ->
+ {Modes, _} = rpc:multicall(Nodes, config, get, ["couchdb", "maintenance_mode"]),
+ [N || {N, Mode} <- lists:zip(Nodes, Modes), Mode =:= "true"].
+
+load_shards(Db, #full_doc_info{id = Id} = FDI) ->
+ case couch_db:open_doc(Db, FDI, [ejson_body]) of
+ {ok, #doc{body = {Props}}} ->
+ mem3_util:build_shards(Id, Props);
+ {not_found, _} ->
+ erlang:error(database_does_not_exist, ?b2l(Id))
+ end.
+
+maybe_redirect(Nodes) ->
+ maybe_redirect(Nodes, []).
+
+maybe_redirect([], Acc) ->
+ Acc;
+maybe_redirect([Node|Rest], Acc) ->
+ case config:get("mem3.redirects", atom_to_list(Node)) of
+ undefined ->
+ maybe_redirect(Rest, [Node|Acc]);
+ Redirect ->
+ maybe_redirect(Rest, [list_to_atom(Redirect)|Acc])
+ end.
+
+count_conflicts(#full_doc_info{rev_tree = T}) ->
+ Leafs = [1 || {#leaf{deleted=false}, _} <- couch_key_tree:get_all_leafs(T)],
+ length(Leafs) - 1.
+
+ensure_custodian_ddoc_exists(Db) ->
+ case couch_db:open_doc(Db, ?CUSTODIAN_ID, [ejson_body]) of
+ {not_found, _Reason} ->
+ try couch_db:update_doc(Db, custodian_ddoc(), []) of
+ {ok, _} ->
+ ok
+ catch conflict ->
+ {ok, NewDb} = couch_db:reopen(Db),
+ ensure_custodian_ddoc_exists(NewDb)
+ end;
+ {ok, Doc} ->
+ {Props} = couch_doc:to_json_obj(Doc, []),
+ Props1 = lists:keystore(<<"validate_doc_update">>, 1, Props, {<<"validate_doc_update">>, ?CUSTODIAN_VALIDATION}),
+ case Props =:= Props1 of
+ true ->
+ ok;
+ false ->
+ try couch_db:update_doc(Db, couch_doc:from_json_obj({Props1}), []) of
+ {ok, _} ->
+ ok
+ catch conflict ->
+ {ok, NewDb} = couch_db:reopen(Db),
+ ensure_custodian_ddoc_exists(NewDb)
+ end
+ end
+ end.
+
+custodian_ddoc() ->
+ Props = [
+ {<<"_id">>, ?CUSTODIAN_ID},
+ {<<"language">>, <<"javascript">>},
+ {<<"validate_doc_update">>, ?CUSTODIAN_VALIDATION}
+ ],
+ couch_doc:from_json_obj({Props}).
+
+
+-ifdef(TEST).
+-include_lib("eunit/include/eunit.hrl").
+
+
+get_range_counts_test_() ->
+ [?_assertEqual(Res, get_range_counts(N, Shards, AllShards)) || {N, Shards,
+ AllShards, Res} <- [
+ % No shards are present. There is a full range shard that would
+ % fit. Report that range as missing.
+ {0, [], [full()], [{{0, ?RING_END}, 0}]},
+
+ % Can't complete the ring. But would complete it if had the
+ % {2, ?RING_END} interval available.
+ {0, [sh(0, 1)], [sh(0, 1), sh(2, ?RING_END)], [{{2, ?RING_END}, 0}]},
+
+ % Can complete the ring only 1 time. Report that range as the
+ % one available with a count of 1
+ {1, [full()], [full(), full()], [{{0, ?RING_END}, 1}]},
+
+ % Can complete the ring only 1 time with a full range shard, but
+ % there is also {2, ?RING_END} that would complete another the
+ % the ring as well if {0, 1} was present.
+ {1, [sh(2, ?RING_END), full()], [sh(0, 1), sh(2, ?RING_END), full()],
+ [
+ {{0, 1}, 0},
+ {{0, ?RING_END}, 1},
+ {{2, ?RING_END}, 1}
+ ]
+ },
+
+ % Can complete the ring 2 times [{0, 2},{3, ?RING_END)] and full(),
+ % and there is remnant of a 5, 9 range that would comlete the ring
+ % as well if {0, 4} and {10, ?RING_END} were present. So report
+ {2, [sh(0, 2), sh(3, ?RING_END), sh(5, 9), full()], [sh(0, 2), sh(3,
+ ?RING_END), full(), sh(0, 4), sh(5, 9), sh(10, ?RING_END)],
+ [
+ {{0, 2}, 1},
+ {{0, 4}, 0},
+ {{0, ?RING_END}, 1},
+ {{3, ?RING_END}, 1},
+ {{5, 9}, 1},
+ {{10, ?RING_END}, 0}
+ ]
+ }
+ ]].
+
+
+full() ->
+ #shard{range = [0, ?RING_END]}.
+
+
+sh(B, E) ->
+ #shard{range = [B, E]}.
+
+-endif.