diff options
author | Jay Doane <jaydoane@apache.org> | 2020-12-14 12:19:47 -0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-12-14 12:19:47 -0800 |
commit | 15c8243f3ba21bac8c05374293b79813f1461f96 (patch) | |
tree | b9d0413e5d6f49546e48d75d9872db3c770e8067 | |
parent | 23b98345f0db38214eaad33b159cf10dd9deaf72 (diff) | |
parent | c16721a35accdb427a50826ae9ad1117baf1d87d (diff) | |
download | couchdb-15c8243f3ba21bac8c05374293b79813f1461f96.tar.gz |
Merge pull request #3296 from apache/custodian-merge
Merge custodian
-rw-r--r-- | rebar.config.script | 1 | ||||
-rw-r--r-- | rel/reltool.config | 2 | ||||
-rw-r--r-- | src/custodian/README | 8 | ||||
-rw-r--r-- | src/custodian/src/custodian.app.src | 29 | ||||
-rw-r--r-- | src/custodian/src/custodian.erl | 21 | ||||
-rw-r--r-- | src/custodian/src/custodian.hrl | 49 | ||||
-rw-r--r-- | src/custodian/src/custodian_app.erl | 28 | ||||
-rw-r--r-- | src/custodian/src/custodian_db_checker.erl | 165 | ||||
-rw-r--r-- | src/custodian/src/custodian_server.erl | 237 | ||||
-rw-r--r-- | src/custodian/src/custodian_sup.erl | 45 | ||||
-rw-r--r-- | src/custodian/src/custodian_util.erl | 273 |
11 files changed, 858 insertions, 0 deletions
diff --git a/rebar.config.script b/rebar.config.script index 1ffbf70b0..03e6fcaf5 100644 --- a/rebar.config.script +++ b/rebar.config.script @@ -127,6 +127,7 @@ SubDirs = [ "src/couch_stats", "src/couch_peruser", "src/couch_tests", + "src/custodian", "src/ddoc_cache", "src/dreyfus", "src/fabric", diff --git a/rel/reltool.config b/rel/reltool.config index b58e40071..70f7bbcb9 100644 --- a/rel/reltool.config +++ b/rel/reltool.config @@ -40,6 +40,7 @@ couch_stats, couch_event, couch_peruser, + custodian, ddoc_cache, dreyfus, ets_lru, @@ -99,6 +100,7 @@ {app, couch_stats, [{incl_cond, include}]}, {app, couch_event, [{incl_cond, include}]}, {app, couch_peruser, [{incl_cond, include}]}, + {app, custodian, [{incl_cond, include}]}, {app, ddoc_cache, [{incl_cond, include}]}, {app, dreyfus, [{incl_cond, include}]}, {app, ets_lru, [{incl_cond, include}]}, diff --git a/src/custodian/README b/src/custodian/README new file mode 100644 index 000000000..72681f447 --- /dev/null +++ b/src/custodian/README @@ -0,0 +1,8 @@ +Custodian is responsible for the data stored in CouchDB databases. + +Custodian scans the "dbs" database, which details the location of +every shard of every database and ensures that operators are aware of +any shard that is under-replicated (has less than N copies). + +Custodian accounts for data in transit (as indicated by the +mem3.redirects section) as well as nodes not recently known to be up. diff --git a/src/custodian/src/custodian.app.src b/src/custodian/src/custodian.app.src new file mode 100644 index 000000000..b93b21ebb --- /dev/null +++ b/src/custodian/src/custodian.app.src @@ -0,0 +1,29 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. 
You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +{application, custodian, + [ + {description, "in your cluster, looking after your stuff"}, + {vsn, git}, + {registered, []}, + {applications, [ + kernel, + stdlib, + couch_log, + config, + couch_event, + couch, + mem3 + ]}, + {mod, { custodian_app, []}}, + {env, []} + ]}. diff --git a/src/custodian/src/custodian.erl b/src/custodian/src/custodian.erl new file mode 100644 index 000000000..a16c925b5 --- /dev/null +++ b/src/custodian/src/custodian.erl @@ -0,0 +1,21 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(custodian). + +-export([report/0, summary/0]). + +report() -> + custodian_util:report(). + +summary() -> + custodian_util:summary(). diff --git a/src/custodian/src/custodian.hrl b/src/custodian/src/custodian.hrl new file mode 100644 index 000000000..bce22cf95 --- /dev/null +++ b/src/custodian/src/custodian.hrl @@ -0,0 +1,49 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. 
You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-define(CUSTODIAN_ID, <<"_design/custodian">>). + +-define(CUSTODIAN_VALIDATION, +<<"function(newDoc, oldDoc) { + var i, range, node; + if(newDoc['_id'].substring(0, 8) === \"_design/\") return; + if(newDoc['_deleted'] === true) return; + if (!newDoc.by_node) { + throw({forbidden: \"by_node is mandatory\"}); + } + if (!newDoc.by_range) { + throw({forbidden: \"by_range is mandatory\"}); + } + for (node in newDoc.by_node) { + for (i in newDoc.by_node[node]) { + range = newDoc.by_node[node][i]; + if(!newDoc.by_range[range]) { + throw({forbidden: \"by_range for \" + range + \" is missing\"}); + } + if(newDoc.by_range[range].indexOf(node) === -1) { + throw({forbidden : \"by_range for \" + range + \" is missing \" + node}); + } + } + } + for (range in newDoc.by_range) { + for (i in newDoc.by_range[range]) { + node = newDoc.by_range[range][i]; + if(!newDoc.by_node[node]) { + throw({forbidden: \"by_node for \" + node + \" is missing\"}); + } + if (newDoc.by_node[node].indexOf(range) === -1) { + throw({forbidden: \"by_node for \" + node + \" is missing \" + range}); + } + } + } +} +">>). diff --git a/src/custodian/src/custodian_app.erl b/src/custodian/src/custodian_app.erl new file mode 100644 index 000000000..91afe139f --- /dev/null +++ b/src/custodian/src/custodian_app.erl @@ -0,0 +1,28 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. 
You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(custodian_app). + +-behaviour(application). + +%% Application callbacks +-export([start/2, stop/1]). + +%% =================================================================== +%% Application callbacks +%% =================================================================== + +start(_StartType, _StartArgs) -> + custodian_sup:start_link(). + +stop(_State) -> + ok. diff --git a/src/custodian/src/custodian_db_checker.erl b/src/custodian/src/custodian_db_checker.erl new file mode 100644 index 000000000..8308c8ecb --- /dev/null +++ b/src/custodian/src/custodian_db_checker.erl @@ -0,0 +1,165 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(custodian_db_checker). +-behaviour(gen_server). +-vsn(1). + + +-export([start_link/0]). + + +-export([ + init/1, + terminate/2, + handle_call/3, + handle_cast/2, + handle_info/2, + code_change/3 +]). + +-export([ + check_dbs/0 +]). + + +-record(st, { + checker +}). + + +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). 
%% gen_server callbacks.

%% Trap exits so the linked checker's termination is delivered as an 'EXIT'
%% message, subscribe to node up/down notifications and kick off a first scan.
init(_) ->
    process_flag(trap_exit, true),
    net_kernel:monitor_nodes(true),
    {ok, restart_checker(#st{})}.


%% Synchronously shut down the checker process held in the state.
terminate(_Reason, #st{checker = Checker}) ->
    couch_util:shutdown_sync(Checker),
    ok.


%% No synchronous API is offered: any call stops the server and the caller
%% receives {invalid_call, Msg} as the reply.
handle_call(Msg, _From, St) ->
    Invalid = {invalid_call, Msg},
    {stop, Invalid, Invalid, St}.


%% `refresh` requests a scan; any other cast stops the server.
handle_cast(refresh, St) ->
    {noreply, restart_checker(St)};

handle_cast(Msg, St) ->
    {stop, {invalid_cast, Msg}, St}.


%% Cluster membership changes trigger a scan. A checker that exits normally
%% is simply forgotten; one that dies abnormally is logged and restarted.
%% Anything else stops the server.
handle_info({Event, _Node}, St) when Event =:= nodeup; Event =:= nodedown ->
    {noreply, restart_checker(St)};

handle_info({'EXIT', Pid, normal}, #st{checker = Pid} = St) ->
    {noreply, St#st{checker = undefined}};

handle_info({'EXIT', Pid, Reason}, #st{checker = Pid} = St) ->
    couch_log:notice("custodian db checker died ~p", [Reason]),
    Cleared = St#st{checker = undefined},
    {noreply, restart_checker(Cleared)};

handle_info(Msg, St) ->
    {stop, {invalid_info, Msg}, St}.


code_change(_OldVsn, St, _Extra) ->
    {ok, St}.


%% Spawn a linked checker unless one is already running; a running checker
%% is left alone and the state is returned unchanged.
restart_checker(#st{checker = undefined} = St) ->
    St#st{checker = spawn_link(fun ?MODULE:check_dbs/0)};
restart_checker(#st{checker = Pid} = St) when is_pid(Pid) ->
    St.


%% Body of the checker process: look up every expected system database in the
%% shards db and clear the alert when none of them is missing. The shards db
%% handle is always closed, even if the scan crashes.
check_dbs() ->
    {ok, DbsDb} = custodian_util:ensure_dbs_exists(),
    try
        MissingDbs = [
            Name
         || Name <- get_dbs(), check_db(DbsDb, Name) =:= missing
        ],
        case MissingDbs of
            [] -> clear_missing_dbs_alert();
            [_ | _] -> ok
        end
    after
        couch_db:close(DbsDb)
    end.


%% Returns `ok` when DbName has a document in the shards db, otherwise raises
%% the missing-database alert and returns `missing`. Any exception raised
%% while looking up (or reporting) is treated as "missing" as well.
check_db(DbsDb, DbName) when is_binary(DbName) ->
    try
        classify_lookup(couch_db:open_doc(DbsDb, DbName, []), DbName)
    catch
        _:_ ->
            classify_lookup(lookup_failed, DbName)
    end.


%% Map the result of the document lookup to ok | missing, alerting on missing.
classify_lookup({ok, _Doc}, _DbName) ->
    ok;
classify_lookup(_NotFound, DbName) ->
    send_missing_db_alert(DbName),
    missing.


%% Names (binaries) of the system databases that are expected to exist.
get_dbs() ->
    get_users_db() ++ get_stats_db().


%% The configured authentication database, as a one-element list.
get_users_db() ->
    [list_to_binary(config:get("couch_httpd_auth", "authentication_db", "users"))].
+ + +get_stats_db() -> + case application:get_env(ioq, stats_db) of + {ok, DbName} when is_binary(DbName) -> + [DbName]; + {ok, DbName} when is_list(DbName) -> + [iolist_to_binary(DbName)]; + _ -> + [] + end. + + +send_missing_db_alert(DbName) -> + couch_log:notice("Missing system database ~s", [DbName]), + Command = [ + "send-sensu-event --standalone --critical", + " --output=\"Missing system database ", + binary_to_list(DbName), + "\" --handler=default custodian-missing-db-check"], + os:cmd(lists:concat(Command)). + +clear_missing_dbs_alert() -> + couch_log:notice("All system databases exist.", []), + Command = [ + "send-sensu-event --standalone --ok", + " --output=\"All system databases exist\"", + " --handler=default custodian-missing-db-check"], + os:cmd(lists:concat(Command)). diff --git a/src/custodian/src/custodian_server.erl b/src/custodian/src/custodian_server.erl new file mode 100644 index 000000000..322cc3264 --- /dev/null +++ b/src/custodian/src/custodian_server.erl @@ -0,0 +1,237 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(custodian_server). +-behaviour(gen_server). +-vsn(3). +-behaviour(config_listener). + +% public api. +-export([start_link/0]). + +% gen_server api. +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + code_change/3, terminate/2]). + +% exported for callback. +-export([ + check_shards/0, + handle_db_event/3 +]). + +% config_listener callback +-export([handle_config_change/5, handle_config_terminate/3]). 
% private records.
%
% event_listener - pid returned by start_event_listener/0
% shard_checker  - pid of the running check_shards/0 process, or undefined
% rescan         - true when a refresh arrived while a check was in progress
-record(state, {
    event_listener,
    shard_checker,
    rescan=false
}).

% Matched against OldVsn in code_change/3.
% NOTE(review): presumably the module version of the 0.2.7 release - confirm.
-define(VSN_0_2_7, 184129240591641721395874905059581858099).

% Delay, in milliseconds, before re-subscribing to config changes after the
% config listener terminated. Shortened under TEST.
-ifdef(TEST).
-define(RELISTEN_DELAY, 50).
-else.
-define(RELISTEN_DELAY, 5000).
-endif.


% public functions.

start_link() ->
    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).

% config_listener callback: a change of couchdb/maintenance_mode triggers a
% new shard check; every other config change is ignored.
handle_config_change("couchdb", "maintenance_mode", _, _, S) ->
    ok = gen_server:cast(?MODULE, refresh),
    {ok, S};
handle_config_change(_, _, _, _, S) ->
    {ok, S}.

% config_listener callback: unless the listener was stopped on purpose, ask
% the server to re-subscribe after ?RELISTEN_DELAY milliseconds.
handle_config_terminate(_, stop, _) ->
    ok;
handle_config_terminate(_Server, _Reason, _State) ->
    erlang:send_after(?RELISTEN_DELAY, whereis(?MODULE), restart_config_listener).

% gen_server functions.

% Trap exits (the event listener and the shard checker are linked), subscribe
% to node up/down and config notifications, then start the first shard check.
init(_) ->
    process_flag(trap_exit, true),
    net_kernel:monitor_nodes(true),
    ok = config:listen_for_changes(?MODULE, nil),
    {ok, LisPid} = start_event_listener(),
    {ok, start_shard_checker(#state{
        event_listener=LisPid
    })}.

% The server has no synchronous API. Reply with an error right away instead
% of returning {noreply, State} without ever replying, which left the caller
% blocked until its gen_server:call timeout expired.
handle_call(Msg, _From, State) ->
    {reply, {error, {invalid_call, Msg}}, State}.

% `refresh` requests a new shard check.
handle_cast(refresh, State) ->
    {noreply, start_shard_checker(State)}.
% Cluster membership changes trigger a new shard check.
handle_info({nodeup, _}, State) ->
    {noreply, start_shard_checker(State)};

handle_info({nodedown, _}, State) ->
    {noreply, start_shard_checker(State)};

% The shard checker finished. If a refresh was requested while it was
% running (rescan=true), start another check immediately.
handle_info({'EXIT', Pid, normal}, #state{shard_checker=Pid}=State) ->
    NewState = State#state{shard_checker=undefined},
    case State#state.rescan of
        true ->
            {noreply, start_shard_checker(NewState)};
        false ->
            {noreply, NewState}
    end;

% The shard checker crashed: log it and start a new one.
handle_info({'EXIT', Pid, Reason}, #state{shard_checker=Pid}=State) ->
    couch_log:notice("custodian shard checker died ~p", [Reason]),
    NewState = State#state{shard_checker=undefined},
    {noreply, start_shard_checker(NewState)};

% The db event listener died: log it and start a replacement.
handle_info({'EXIT', Pid, Reason}, #state{event_listener=Pid}=State) ->
    couch_log:notice("custodian update notifier died ~p", [Reason]),
    {ok, Pid1} = start_event_listener(),
    {noreply, State#state{event_listener=Pid1}};

% Sent by handle_config_terminate/3 after ?RELISTEN_DELAY.
handle_info(restart_config_listener, State) ->
    ok = config:listen_for_changes(?MODULE, nil),
    {noreply, State}.

% Stop the event listener and synchronously shut down the shard checker.
terminate(_Reason, State) ->
    couch_event:stop_listener(State#state.event_listener),
    couch_util:shutdown_sync(State#state.shard_checker),
    ok.

% Upgrading from ?VSN_0_2_7 additionally subscribes to config changes.
code_change(?VSN_0_2_7, State, _Extra) ->
    ok = config:listen_for_changes(?MODULE, nil),
    {ok, State};
code_change(_OldVsn, #state{}=State, _Extra) ->
    {ok, State}.

% private functions


% Start a linked check_shards/0 process when none is running. When one is
% already running, only remember that another pass is needed once it exits.
start_shard_checker(#state{shard_checker=undefined}=State) ->
    State#state{
        shard_checker=spawn_link(fun ?MODULE:check_shards/0),
        rescan=false
    };
start_shard_checker(#state{shard_checker=Pid}=State) when is_pid(Pid) ->
    State#state{rescan=true}.


% Listen for events on the shards database. The name is read from the same
% config key (mem3/shards_db, default "dbs") that
% custodian_util:ensure_dbs_exists/0 uses, so a non-default shards db is
% watched as well instead of a hard-coded <<"dbs">>.
start_event_listener() ->
    DbName = list_to_binary(config:get("mem3", "shards_db", "dbs")),
    couch_event:link_listener(
        ?MODULE, handle_db_event, nil, [{dbname, DbName}]
    ).

% couch_event callback: an update of the watched database requests a refresh.
handle_db_event(_DbName, updated, _St) ->
    gen_server:cast(?MODULE, refresh),
    {ok, nil};
handle_db_event(_DbName, _Event, _St) ->
    {ok, nil}.

% Body of the shard checker process: emit one event per summary item.
check_shards() ->
    [send_sensu_event(Item) || Item <- custodian:summary()].
+ +send_sensu_event({_, Count} = Item) -> + Level = case Count of + 0 -> + "--ok"; + 1 -> + couch_log:critical("~s", [describe(Item)]), + "--critical"; + _ -> + couch_log:warning("~s", [describe(Item)]), + "--warning" + end, + Cmd = lists:concat([ + "send-sensu-event --standalone ", + Level, + " --output=\"", + describe(Item), + "\" ", + check_name(Item) + ]), + os:cmd(Cmd). + +describe({{safe, N}, Count}) -> + lists:concat([Count, " ", shards(Count), " in cluster with only ", N, + " ", copies(N), " on nodes that are currently up"]); +describe({{live, N}, Count}) -> + lists:concat([Count, " ", shards(Count), " in cluster with only ", + N, " ", copies(N), " on nodes not in maintenance mode"]); +describe({conflicted, Count}) -> + lists:concat([Count, " conflicted ", shards(Count), " in cluster"]). + +check_name({{Type, N}, _}) -> + lists:concat(["custodian-", N, "-", Type, "-shards-check"]); +check_name({Type, _}) -> + lists:concat(["custodian-", Type, "-shards-check"]). + +shards(1) -> + "shard"; +shards(_) -> + "shards". + +copies(1) -> + "copy"; +copies(_) -> + "copies". + + +-ifdef(TEST). +-include_lib("eunit/include/eunit.hrl"). + +config_update_test_() -> + { + "Test config updates", + { + foreach, + fun() -> test_util:start_couch([custodian]) end, + fun test_util:stop_couch/1, + [ + fun t_restart_config_listener/1 + ] + } +}. + +t_restart_config_listener(_) -> + ?_test(begin + ConfigMonitor = config_listener_mon(), + ?assert(is_process_alive(ConfigMonitor)), + test_util:stop_sync(ConfigMonitor), + ?assertNot(is_process_alive(ConfigMonitor)), + NewConfigMonitor = test_util:wait(fun() -> + case config_listener_mon() of + undefined -> wait; + Pid -> Pid + end + end), + ?assertNotEqual(ConfigMonitor, NewConfigMonitor), + ?assert(is_process_alive(NewConfigMonitor)) + end). 
+ +config_listener_mon() -> + IsConfigMonitor = fun(P) -> + [M | _] = string:tokens(couch_debug:process_name(P), ":"), + M =:= "config_listener_mon" + end, + [{_, MonitoredBy}] = process_info(whereis(?MODULE), [monitored_by]), + case lists:filter(IsConfigMonitor, MonitoredBy) of + [Pid] -> Pid; + [] -> undefined + end. + +-endif. diff --git a/src/custodian/src/custodian_sup.erl b/src/custodian/src/custodian_sup.erl new file mode 100644 index 000000000..97dbd2321 --- /dev/null +++ b/src/custodian/src/custodian_sup.erl @@ -0,0 +1,45 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(custodian_sup). + +-behaviour(supervisor). + +%% API +-export([start_link/0]). + +%% Supervisor callbacks +-export([init/1]). + +%% Helper macro for declaring children of supervisor +-define(CHILD(I, Type), {I, {I, start_link, []}, permanent, 5000, Type, [I]}). + +%% =================================================================== +%% API functions +%% =================================================================== + +start_link() -> + supervisor:start_link({local, ?MODULE}, ?MODULE, []). + +%% =================================================================== +%% Supervisor callbacks +%% =================================================================== + +init([]) -> + {ok, { + {one_for_one, 5, 10}, + [ + ?CHILD(custodian_server, worker), + ?CHILD(custodian_db_checker, worker) + ] + }}. 
+ diff --git a/src/custodian/src/custodian_util.erl b/src/custodian/src/custodian_util.erl new file mode 100644 index 000000000..785bbd3da --- /dev/null +++ b/src/custodian/src/custodian_util.erl @@ -0,0 +1,273 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(custodian_util). +-include("custodian.hrl"). + +-include_lib("mem3/include/mem3.hrl"). +-include_lib("couch/include/couch_db.hrl"). + +-export([summary/0, report/0]). +-export([ensure_dbs_exists/0]). + +-record(state, {live, safe, n, callback, db, acc}). + +%% public functions. + +summary() -> + Dict0 = dict:from_list([{conflicted, 0}] ++ + [{{live, N}, 0} || N <- lists:seq(0, cluster_n() - 1)] ++ + [{{safe, N}, 0} || N <- lists:seq(0, cluster_n() - 1)]), + Fun = fun(_Id, _Range, {conflicted, _N}, Dict) -> + dict:update_counter(conflicted, 1, Dict); + (_Id, _Range, Item, Dict) -> + dict:update_counter(Item, 1, Dict) + end, + dict:to_list(fold_dbs(Dict0, Fun)). + +report() -> + Fun = fun(Id, _Range, {conflicted, N}, Acc) -> + [{Id, {conflicted, N}} | Acc]; + (Id, Range, Item, Acc) -> + [{Id, Range, Item} | Acc] + end, + fold_dbs([], Fun). + +ensure_dbs_exists() -> + DbName = config:get("mem3", "shards_db", "dbs"), + {ok, Db} = mem3_util:ensure_exists(DbName), + ensure_custodian_ddoc_exists(Db), + {ok, Db}. + +%% private functions. 
%% Fold Fun over every problem found in the shards database.
%%
%% Fun is called as Fun(Id, Range, Item, Acc) where Item is
%% {conflicted, Count}, {live, N} or {safe, N}. "Safe" nodes are the local
%% node plus all connected nodes after applying mem3.redirects; "live" nodes
%% are the safe nodes that are not in maintenance mode. The database handle
%% is always closed, even when the fold crashes.
fold_dbs(Acc, Fun) ->
    Safe = maybe_redirect([node() | nodes()]),
    Live = Safe -- maintenance_nodes(Safe),
    N = cluster_n(),
    {ok, Db} = ensure_dbs_exists(),
    try
        State0 = #state{live=Live, safe=Safe, n=N, callback=Fun, db=Db, acc=Acc},
        {ok, State1} = couch_db:fold_docs(Db, fun fold_dbs1/2, State0, []),
        State1#state.acc
    after
        couch_db:close(Db)
    end.

%% couch_db:fold_docs/4 callback. Design documents and deleted documents are
%% skipped. For every other document conflicts are reported first (with a
%% `null` range), then its shard map is checked by fold_dbs/3.
fold_dbs1(#full_doc_info{id = <<"_design/", _/binary>>}, Acc) ->
    {ok, Acc};
fold_dbs1(#full_doc_info{deleted=true}, Acc) ->
    {ok, Acc};
fold_dbs1(#full_doc_info{id = Id} = FDI, State) ->
    InternalAcc = case count_conflicts(FDI) of
        0 ->
            State#state.acc;
        ConflictCount ->
            (State#state.callback)(Id, null, {conflicted, ConflictCount}, State#state.acc)
    end,
    fold_dbs(Id, load_shards(State#state.db, FDI), State#state{acc=InternalAcc}).


%% Check one database's shards against the target N, once counting only
%% shards on live nodes and once counting only shards on safe nodes. Every
%% range reported by get_range_counts/3 is passed to the callback as
%% {live, Copies} or {safe, Copies}. Returns {ok, NewState}.
fold_dbs(Id, Shards, State) ->
    IsSafe = fun(#shard{node = N}) -> lists:member(N, State#state.safe) end,
    IsLive = fun(#shard{node = N}) -> lists:member(N, State#state.live) end,
    TargetN = State#state.n,
    LiveShards = lists:filter(IsLive, Shards),
    SafeShards = lists:filter(IsSafe, Shards),
    Acc0 = State#state.acc,
    Acc1 = case mem3_util:calculate_max_n(LiveShards) of
        LiveN when LiveN < TargetN ->
            LiveRanges = get_range_counts(LiveN, LiveShards, Shards),
            lists:foldl(fun({Range, N}, FAcc) ->
                (State#state.callback)(Id, Range, {live, N}, FAcc)
            end, Acc0, LiveRanges);
        _ ->
            Acc0
    end,
    Acc2 = case mem3_util:calculate_max_n(SafeShards) of
        SafeN when SafeN < TargetN ->
            SafeRanges = get_range_counts(SafeN, SafeShards, Shards),
            lists:foldl(fun({Range, N}, FAcc) ->
                (State#state.callback)(Id, Range, {safe, N}, FAcc)
            end, Acc1, SafeRanges);
        _ ->
            Acc1
    end,
    {ok, State#state{acc = Acc2}}.


%% Returns a sorted list of {{Begin, End}, Count} for the ranges whose count
%% is =< MaxN. Shards are the shards considered available, AllShards every
%% shard in the shard map.
get_range_counts(MaxN, Shards, AllShards) ->
    Ranges = ranges(Shards),
    AllRanges = ranges(AllShards),

    % Get a list of ranges that were used to fill the MaxN rings. Also return
    % whatever was left (not part of the rings).
    {UnusedRanges, UsedRanges} = get_n_rings(MaxN, Ranges, []),

    % Count the ranges that participated in filling the rings: the first
    % occurrence of a range starts at 1, a repeated one is set to MaxN.
    UsedCounts = update_counts(UsedRanges, #{}, 1, fun(_) -> MaxN end),

    % Add ranges that were present but didn't get picked in the rings
    PresentCounts = update_counts(UnusedRanges, UsedCounts, 1, fun(N) ->
        max(N + 1, MaxN)
    end),

    % Handle shards that are not present at all. Mark these ranges as missing.
    Missing = [R || R <- AllRanges, not lists:member(R, Ranges)],
    RangeCounts = update_counts(Missing, PresentCounts, 0, fun(_) -> 0 end),

    % Report only shards with counts =< MaxN
    RangeCounts1 = maps:filter(fun(_, N) -> N =< MaxN end, RangeCounts),
    lists:sort(maps:to_list(RangeCounts1)).


%% For every range in Ranges: insert Init when the range is not yet in the
%% map, otherwise replace its count with UpdateFun(Count).
update_counts(Ranges, Acc0, Init, UpdateFun) ->
    lists:foldl(fun({B, E}, Acc) ->
        maps:update_with({B, E}, UpdateFun, Init, Acc)
    end, Acc0, Ranges).


%% Convert shards to {Begin, End} range tuples.
ranges(Shards) ->
    lists:map(fun(S) -> [B, E] = mem3:range(S), {B, E} end, Shards).


%% Take up to N rings (as returned by mem3_util:get_ring/1) out of Ranges.
%% Returns {LeftoverRanges, RangesUsedInRings}.
get_n_rings(N, Ranges, Rings) when N =< 0 ->
    {Ranges, Rings};
get_n_rings(N, Ranges, Rings) ->
    Ring = mem3_util:get_ring(Ranges),
    get_n_rings(N - 1, Ranges -- Ring, Rings ++ Ring).


%% Configured number of copies per shard (cluster/n, default 3).
cluster_n() ->
    list_to_integer(config:get("cluster", "n", "3")).

%% Returns the members of Nodes whose couchdb/maintenance_mode is "true".
%%
%% rpc:multicall/4 returns results only for the nodes that answered; nodes it
%% could not reach are listed separately. Zipping the full node list with the
%% results crashed in lists:zip/2 (unequal lengths) as soon as one node was
%% unreachable, so the unreachable nodes are dropped before pairing. A node
%% whose mode cannot be determined is not reported as being in maintenance.
maintenance_nodes(Nodes) ->
    {Modes, BadNodes} = rpc:multicall(Nodes, config, get, ["couchdb", "maintenance_mode"]),
    Responded = [N || N <- Nodes, not lists:member(N, BadNodes)],
    [N || {N, Mode} <- lists:zip(Responded, Modes), Mode =:= "true"].

%% Open the shard map document and build its shard records; raises
%% database_does_not_exist when the document cannot be opened.
load_shards(Db, #full_doc_info{id = Id} = FDI) ->
    case couch_db:open_doc(Db, FDI, [ejson_body]) of
        {ok, #doc{body = {Props}}} ->
            mem3_util:build_shards(Id, Props);
        {not_found, _} ->
            erlang:error(database_does_not_exist, ?b2l(Id))
    end.

%% Replace every node that has an entry in the mem3.redirects config section
%% by its redirect target.
maybe_redirect(Nodes) ->
    maybe_redirect(Nodes, []).
%% Prepend the (possibly redirected) name of every node to Acc. Nodes are
%% pushed one by one, so they end up in reverse order in front of Acc.
maybe_redirect(Nodes, Acc0) ->
    lists:foldl(fun(Node, Acc) -> [redirect_target(Node) | Acc] end, Acc0, Nodes).

%% The node named in the mem3.redirects section for Node, or Node itself when
%% there is no such entry.
redirect_target(Node) ->
    case config:get("mem3.redirects", atom_to_list(Node)) of
        undefined -> Node;
        Redirect -> list_to_atom(Redirect)
    end.

%% Number of non-deleted leaf revisions beyond the first one.
count_conflicts(#full_doc_info{rev_tree = Tree}) ->
    AllLeafs = couch_key_tree:get_all_leafs(Tree),
    length([live || {#leaf{deleted=false}, _} <- AllLeafs]) - 1.

%% Make sure the custodian design document exists in Db and carries the
%% current validate_doc_update function. Returns ok.
ensure_custodian_ddoc_exists(Db) ->
    case couch_db:open_doc(Db, ?CUSTODIAN_ID, [ejson_body]) of
        {not_found, _Reason} ->
            store_ddoc(Db, custodian_ddoc());
        {ok, Doc} ->
            {Props} = couch_doc:to_json_obj(Doc, []),
            Validation = {<<"validate_doc_update">>, ?CUSTODIAN_VALIDATION},
            case lists:keystore(<<"validate_doc_update">>, 1, Props, Validation) of
                Props ->
                    % Already up to date, nothing to write.
                    ok;
                Updated ->
                    store_ddoc(Db, couch_doc:from_json_obj({Updated}))
            end
    end.

%% Write Doc to Db. On a `conflict` throw the database is reopened and the
%% whole ensure step is retried against the fresh handle.
store_ddoc(Db, Doc) ->
    try couch_db:update_doc(Db, Doc, []) of
        {ok, _} ->
            ok
    catch
        conflict ->
            {ok, NewDb} = couch_db:reopen(Db),
            ensure_custodian_ddoc_exists(NewDb)
    end.

%% A fresh custodian design document.
custodian_ddoc() ->
    couch_doc:from_json_obj({[
        {<<"_id">>, ?CUSTODIAN_ID},
        {<<"language">>, <<"javascript">>},
        {<<"validate_doc_update">>, ?CUSTODIAN_VALIDATION}
    ]}).


-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").


get_range_counts_test_() ->
    [
        ?_assertEqual(Expected, get_range_counts(N, Shards, AllShards))
     || {N, Shards, AllShards, Expected} <- range_count_cases()
    ].


%% {MaxN, AvailableShards, AllShards, ExpectedRangeCounts}
range_count_cases() ->
    [
        % Nothing is available, but a full range shard exists in the shard
        % map: that range is reported as missing.
        {0, [], [full()], [{{0, ?RING_END}, 0}]},

        % The ring cannot be completed; it could be if the {2, ?RING_END}
        % interval were available.
        {0, [sh(0, 1)], [sh(0, 1), sh(2, ?RING_END)], [{{2, ?RING_END}, 0}]},

        % The ring can be completed exactly once: the range is reported as
        % available with a count of 1.
        {1, [full()], [full(), full()], [{{0, ?RING_END}, 1}]},

        % The ring can be completed once with the full range shard. There is
        % also {2, ?RING_END}, which would complete another ring if {0, 1}
        % were present.
        {
            1,
            [sh(2, ?RING_END), full()],
            [sh(0, 1), sh(2, ?RING_END), full()],
            [
                {{0, 1}, 0},
                {{0, ?RING_END}, 1},
                {{2, ?RING_END}, 1}
            ]
        },

        % The ring can be completed twice: [{0, 2}, {3, ?RING_END}] and
        % full(). The leftover {5, 9} range would complete a third ring if
        % {0, 4} and {10, ?RING_END} were present.
        {
            2,
            [sh(0, 2), sh(3, ?RING_END), sh(5, 9), full()],
            [sh(0, 2), sh(3, ?RING_END), full(), sh(0, 4), sh(5, 9), sh(10, ?RING_END)],
            [
                {{0, 2}, 1},
                {{0, 4}, 0},
                {{0, ?RING_END}, 1},
                {{3, ?RING_END}, 1},
                {{5, 9}, 1},
                {{10, ?RING_END}, 0}
            ]
        }
    ].


full() ->
    #shard{range = [0, ?RING_END]}.


sh(B, E) ->
    #shard{range = [B, E]}.

-endif.