summary refs log tree commit diff
diff options
context:
space:
mode:
author Robert Newson <robert.newson@gmail.com> 2013-11-20 04:53:57 -0800
committer Robert Newson <robert.newson@gmail.com> 2013-11-20 04:53:57 -0800
commit 32d7cef7e7e0fb286c5f0f3f56a76ba211dfe92c (patch)
tree 69d68bc1ba1ee5cac72a516a63fadd59f1a4398a
parent 2bf1a8c5e16ca5ef7a72699a68f5a713d70db4f1 (diff)
parent 60ec1b4cdc91ab5c6a33545b525e399535fa61e5 (diff)
download couchdb-32d7cef7e7e0fb286c5f0f3f56a76ba211dfe92c.tar.gz
Merge pull request #14 from cloudant/25189-safety-and-liveness
Account for true maintenance mode
-rw-r--r-- src/custodian/src/custodian_db_checker.erl 22
-rw-r--r-- src/custodian/src/custodian_server.erl 88
-rw-r--r-- src/custodian/src/custodian_util.erl 89
3 files changed, 122 insertions, 77 deletions
diff --git a/src/custodian/src/custodian_db_checker.erl b/src/custodian/src/custodian_db_checker.erl
index 02bd0d486..2dd770674 100644
--- a/src/custodian/src/custodian_db_checker.erl
+++ b/src/custodian/src/custodian_db_checker.erl
@@ -151,20 +151,16 @@ get_bacon_db() ->
send_missing_db_alert(DbName) ->
twig:log(notice, "Missing system database ~s", [DbName]),
Command = [
- "send_snmptrap",
- "--trap",
- "CLOUDANT-DBCORE-MIB::cloudantDbcoreMissingDbEvent",
- "-o",
- "'cloudantDbcoreDbName:STRING:" ++ binary_to_list(DbName) ++ "'"
- ],
- os:cmd(string:join(Command, " ")).
-
+ "send-sensu-event --standalone --critical",
+ " --output=\"Missing system database ",
+ binary_to_list(DbName),
+ "\" --handler=default custodian-missing-db-check"],
+ os:cmd(lists:concat(Command)).
clear_missing_dbs_alert() ->
twig:log(notice, "All system databases exist.", []),
Command = [
- "send_snmptrap",
- "--trap",
- "CLOUDANT-DBCORE-MIB::cloudantDbcoreAllDbsAvailableEvent"
- ],
- os:cmd(string:join(Command, " ")).
+ "send-sensu-event --standalone --ok",
+ " --output=\"All system databases exist\"",
+ " --handler=default custodian-missing-db-check"],
+ os:cmd(lists:concat(Command)).
diff --git a/src/custodian/src/custodian_server.erl b/src/custodian/src/custodian_server.erl
index 55f7b46b1..39d93744e 100644
--- a/src/custodian/src/custodian_server.erl
+++ b/src/custodian/src/custodian_server.erl
@@ -2,6 +2,7 @@
-module(custodian_server).
-behaviour(gen_server).
+-behaviour(config_listener).
% public api.
-export([start_link/0]).
@@ -16,6 +17,9 @@
handle_db_event/3
]).
+% config_listener callback
+-export([handle_config_change/5]).
+
% private records.
-record(state, {
event_listener,
@@ -28,10 +32,17 @@
start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+handle_config_change("cloudant", "maintenance_mode", _, _, S) ->
+ ok = gen_server:cast(S, refresh),
+ {ok, S};
+handle_config_change(_, _, _, _, S) ->
+ {ok, S}.
+
% gen_server functions.
init(_) ->
process_flag(trap_exit, true),
net_kernel:monitor_nodes(true),
+ ok = config:listen_for_changes(?MODULE, self()),
{ok, LisPid} = start_event_listener(),
{ok, start_shard_checker(#state{
event_listener=LisPid
@@ -43,6 +54,14 @@ handle_call(_Msg, _From, State) ->
handle_cast(refresh, State) ->
{noreply, start_shard_checker(State)}.
+handle_info({gen_event_EXIT, {config_listener, ?MODULE}, _Reason}, State) ->
+ erlang:send_after(5000, self(), restart_config_listener),
+ {noreply, State};
+
+handle_info(restart_config_listener, State) ->
+ ok = config:listen_for_changes(?MODULE, self()),
+ {noreply, State};
+
handle_info({nodeup, _}, State) ->
{noreply, start_shard_checker(State)};
@@ -79,6 +98,9 @@ code_change(_OldVsn, {state, Pid}, _Extra) ->
shard_checker=undefined,
rescan=false
}};
+code_change("0.2.7", State, _Extra) ->
+ ok = config:listen_for_changes(?MODULE, self()),
+ {ok, State};
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
@@ -106,31 +128,43 @@ handle_db_event(_DbName, _Event, _St) ->
{ok, nil}.
check_shards() ->
- {Unavailable, OneCopy, Impaired, Conflicted} = custodian:summary(),
- send_conflicted_alert(Conflicted),
- send_unavailable_alert(Unavailable),
- send_one_copy_alert(OneCopy),
- send_impaired_alert(Impaired).
-
-%% specific alert functions
-send_conflicted_alert(Count) ->
- send_snmp_alert(Count, "partition tables conflicted", "NoPartitionTablesConflictedEvent", "PartitionTablesConflictedEvent").
-
-send_impaired_alert(Count) ->
- send_snmp_alert(Count, "shards impaired", "AllShardsUnimpairedEvent", "ShardsImpairedEvent").
-
-send_unavailable_alert(Count) ->
- send_snmp_alert(Count, "unavailable shards", "AllShardsAvailableEvent", "ShardsUnavailableEvent").
-
-send_one_copy_alert(Count) ->
- send_snmp_alert(Count, "shards with only one copy", "AllShardsMultipleCopiesEvent", "ShardsOneCopyEvent").
-
-%% generic SNMP alert functions
-send_snmp_alert(0, AlertType, ClearMib, _) ->
- twig:log(notice, "No ~s in this cluster", [AlertType]),
- Cmd = lists:concat(["send_snmptrap --trap CLOUDANT-DBCORE-MIB::cloudantDbcore", ClearMib]),
- os:cmd(Cmd);
-send_snmp_alert(Count, AlertType, _, AlertMib) when is_integer(Count) ->
- twig:log(crit, "~B ~s in this cluster", [Count, AlertType]),
- Cmd = lists:concat(["send_snmptrap --trap CLOUDANT-DBCORE-MIB::cloudantDbcore", AlertMib," -o cloudantDbcoreShardCount:INTEGER:", Count]),
+ [send_sensu_event(Item) || Item <- custodian:summary()].
+
+send_sensu_event({_, Count} = Item) ->
+ if Count > 0 -> twig:log(crit, "~s", [describe(Item)]); true -> ok end,
+ Cmd = lists:concat(["send-sensu-event --standalone ",
+ level(Item),
+ " --output=\"",
+ describe(Item),
+ "\" ",
+ check_name(Item)]),
os:cmd(Cmd).
+
+level({_, 0}) ->
+ "--ok";
+level(_) ->
+ "--critical".
+
+describe({{safe, N}, Count}) ->
+ lists:concat([Count, " ", shards(Count), " in cluster with only ", N,
+ " ", copies(N), " on nodes that are currently up"]);
+describe({{live, N}, Count}) ->
+ lists:concat([Count, " ", shards(Count), " in cluster with only ",
+ N, " ", copies(N), " on nodes not in maintenance mode"]);
+describe({conflicted, Count}) ->
+ lists:concat([Count, " conflicted ", shards(Count), " in cluster"]).
+
+check_name({{Type, N}, _}) ->
+ lists:concat(["custodian-", N, "-", Type, "-shards-check"]);
+check_name({Type, _}) ->
+ lists:concat(["custodian-", Type, "-shards-check"]).
+
+shards(1) ->
+ "shard";
+shards(_) ->
+ "shards".
+
+copies(1) ->
+ "copy";
+copies(_) ->
+ "copies".
diff --git a/src/custodian/src/custodian_util.erl b/src/custodian/src/custodian_util.erl
index 3a4ceb0f9..5de1753cf 100644
--- a/src/custodian/src/custodian_util.erl
+++ b/src/custodian/src/custodian_util.erl
@@ -9,27 +9,24 @@
-export([summary/0, report/0]).
-export([ensure_dbs_exists/0]).
+-record(state, {live, safe, n, callback, db, acc}).
+
%% public functions.
summary() ->
- Fun = fun(_Id, _Range, unavailable, {U, O, I, C}) ->
- {U + 1, O, I, C};
- (_Id, _Range, {impaired, 1}, {U, O, I, C}) ->
- {U, O + 1, I, C};
- (_Id, _Range, {impaired, _N}, {U, O, I, C}) ->
- {U, O, I + 1, C};
- (_Id, _Range, {conflicted, _N}, {U, O, I, C}) ->
- {U, O, I, C + 1}
- end,
- fold_dbs({0, 0, 0, 0}, Fun).
+ Dict0 = dict:from_list([{conflicted, 0}] ++
+ [{{live, N}, 0} || N <- lists:seq(0, cluster_n() - 1)] ++
+ [{{safe, N}, 0} || N <- lists:seq(0, cluster_n() - 1)]),
+ Fun = fun(_Id, _Range, Item, Dict) ->
+ dict:update_counter(Item, 1, Dict)
+ end,
+ dict:to_list(fold_dbs(Dict0, Fun)).
report() ->
- Fun = fun(Id, Range, unavailable, Acc) ->
- [{Id, Range, unavailable}|Acc];
- (Id, Range, {impaired, N}, Acc) ->
- [{Id, Range, {impaired, N}}|Acc];
- (Id, _Range, {conflicted, N}, Acc) ->
- [{Id, {conflicted, N}}|Acc]
+ Fun = fun(Id, _Range, {conflicted, N}, Acc) ->
+ [{Id, {conflicted, N}} | Acc];
+ (Id, Range, Item, Acc) ->
+ [{Id, Range, Item} | Acc]
end,
fold_dbs([], Fun).
@@ -41,13 +38,15 @@ ensure_dbs_exists() ->
%% private functions.
-fold_dbs(Acc0, Fun) ->
- Live = [node() | nodes()],
- N = list_to_integer(config:get("cluster", "n", "3")),
+fold_dbs(Acc, Fun) ->
+ Safe = maybe_redirect([node() | nodes()]),
+ Live = Safe -- maintenance_nodes(Safe),
+ N = cluster_n(),
{ok, Db} = ensure_dbs_exists(),
try
- {ok, _, {_, _, _, _, Acc1}} = couch_db:enum_docs(Db, fun fold_dbs/3, {Live, N, Fun, Db, Acc0}, []),
- Acc1
+ State0 = #state{live=Live, safe=Safe, n=N, callback=Fun, db=Db, acc=Acc},
+ {ok, _, State1} = couch_db:enum_docs(Db, fun fold_dbs/3, State0, []),
+ State1#state.acc
after
couch_db:close(Db)
end.
@@ -56,31 +55,47 @@ fold_dbs(#full_doc_info{id = <<"_design/", _/binary>>}, _, Acc) ->
{ok, Acc};
fold_dbs(#full_doc_info{deleted=true}, _, Acc) ->
{ok, Acc};
-fold_dbs(#full_doc_info{id = Id} = FDI, _, {_Live, _N, Fun, Db, Acc0} = Acc) ->
+fold_dbs(#full_doc_info{id = Id} = FDI, _, State) ->
InternalAcc = case count_conflicts(FDI) of
0 ->
- Acc0;
+ State#state.acc;
ConflictCount ->
- Fun(Id, null, {conflicted, ConflictCount}, Acc0)
+ (State#state.callback)(Id, null, {conflicted, ConflictCount}, State#state.acc)
end,
- Shards = load_shards(Db, FDI),
+ Shards = load_shards(State#state.db, FDI),
Rs = [R || #shard{range=R} <- lists:ukeysort(#shard.range, Shards)],
ActualN = [{R1, [N || #shard{node=N,range=R2} <- Shards, R1 == R2]} || R1 <- Rs],
- fold_dbs(Id, ActualN, setelement(5, Acc, InternalAcc));
+ fold_dbs(Id, ActualN, State#state{acc=InternalAcc});
fold_dbs(_Id, [], Acc) ->
{ok, Acc};
-fold_dbs(Id, [{Range, Nodes}|Rest], {Live, N, Fun, Db, Acc0}) ->
- Nodes1 = maybe_redirect(Nodes),
- Nodes2 = [Node || Node <- Nodes1, lists:member(Node, Live)],
- Acc1 = case length(Nodes2) of
- 0 ->
- Fun(Id, Range, unavailable, Acc0);
- N1 when N1 < N ->
- Fun(Id, Range, {impaired, N1}, Acc0);
- _ ->
- Acc0
+fold_dbs(Id, [{Range, Nodes}|Rest], State) ->
+ Live = [Node || Node <- Nodes, lists:member(Node, State#state.live)],
+ Safe = [Node || Node <- Nodes, lists:member(Node, State#state.safe)],
+ TargetN = State#state.n,
+ Acc0 = State#state.acc,
+
+ Acc1 = case length(Live) of
+ TargetN ->
+ Acc0;
+ LiveN ->
+ (State#state.callback)(Id, Range, {live, LiveN}, Acc0)
end,
- fold_dbs(Id, Rest, {Live, N, Fun, Db, Acc1}).
+
+ Acc2 = case length(Safe) of
+ TargetN ->
+ Acc1;
+ SafeN ->
+ (State#state.callback)(Id, Range, {safe, SafeN}, Acc1)
+ end,
+
+ fold_dbs(Id, Rest, State#state{acc=Acc2}).
+
+cluster_n() ->
+ list_to_integer(config:get("cluster", "n", "3")).
+
+maintenance_nodes(Nodes) ->
+ {Modes, _} = rpc:multicall(Nodes, config, get, ["cloudant", "maintenance_mode"]),
+ [N || {N, Mode} <- lists:zip(Nodes, Modes), Mode =:= "true"].
load_shards(Db, #full_doc_info{id = Id} = FDI) ->
case couch_db:open_doc(Db, FDI, []) of