summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author Nick Vatamaniuc <vatamane@apache.org> 2020-01-13 18:21:58 -0500
committer Nick Vatamaniuc <vatamane@apache.org> 2020-01-14 00:59:00 -0500
commit 679a12aac6ef11b37cf829c7d522c99c346a9285 (patch)
tree 05d2a446989486c87df319539494e3bf04222e4c
parent 66afc45015827cd55d8a5663f19da09e18c2780e (diff)
download couchdb-persist-stats-across-job-removals.tar.gz
Preserve replication job stats when jobs are re-created (persist-stats-across-job-removals)
Previously we made sure replication job statistics were preserved when the jobs were started and stopped by the scheduler. However, if a VM node restarted or a user re-created the job, replication stats would be reset to 0. Some statistics like `docs_read` and `docs_written` were perhaps not as critical. However, `doc_write_failures` was. That is the only indicator that some replication docs have been skipped and not replicated to the target. Not preserving that statistic meant users could perceive a data loss. These statistics were already logged in the checkpoint history and we just had to initialize a stats object from them when a replication job starts. In that initialization code we pick the highest values from either the running scheduler or the checkpointed log. The reason is that the running stats could be higher if, say, a job was stopped suddenly and failed to checkpoint but the scheduler retained the data. Fixes: https://github.com/apache/couchdb/issues/2414
-rw-r--r--src/couch_replicator/src/couch_replicator_scheduler_job.erl12
-rw-r--r--src/couch_replicator/src/couch_replicator_stats.erl38
-rw-r--r--src/couch_replicator/test/eunit/couch_replicator_retain_stats_between_job_runs.erl145
-rw-r--r--test/elixir/test/replication_test.exs72
4 files changed, 185 insertions, 82 deletions
diff --git a/src/couch_replicator/src/couch_replicator_scheduler_job.erl b/src/couch_replicator/src/couch_replicator_scheduler_job.erl
index 12d3e5530..0b33419e1 100644
--- a/src/couch_replicator/src/couch_replicator_scheduler_job.erl
+++ b/src/couch_replicator/src/couch_replicator_scheduler_job.erl
@@ -565,7 +565,7 @@ init_state(Rep) ->
options = Options,
type = Type, view = View,
start_time = StartTime,
- stats = Stats
+ stats = ArgStats0
} = Rep,
% Adjust minimum number of http source connections to 2 to avoid deadlock
Src = adjust_maxconn(Src0, BaseId),
@@ -580,6 +580,14 @@ init_state(Rep) ->
[SourceLog, TargetLog] = find_and_migrate_logs([Source, Target], Rep),
{StartSeq0, History} = compare_replication_logs(SourceLog, TargetLog),
+
+ ArgStats1 = couch_replicator_stats:new(ArgStats0),
+ HistoryStats = case History of
+ [{[_ | _] = HProps} | _] -> couch_replicator_stats:new(HProps);
+ _ -> couch_replicator_stats:new()
+ end,
+ Stats = couch_replicator_stats:max_stats(ArgStats1, HistoryStats),
+
StartSeq1 = get_value(since_seq, Options, StartSeq0),
StartSeq = {0, StartSeq1},
@@ -609,7 +617,7 @@ init_state(Rep) ->
?DEFAULT_CHECKPOINT_INTERVAL),
type = Type,
view = View,
- stats = couch_replicator_stats:new(Stats)
+ stats = Stats
},
State#rep_state{timer = start_timer(State)}.
diff --git a/src/couch_replicator/src/couch_replicator_stats.erl b/src/couch_replicator/src/couch_replicator_stats.erl
index cd62949e9..37848b3ee 100644
--- a/src/couch_replicator/src/couch_replicator_stats.erl
+++ b/src/couch_replicator/src/couch_replicator_stats.erl
@@ -17,7 +17,8 @@
new/1,
get/2,
increment/2,
- sum_stats/2
+ sum_stats/2,
+ max_stats/2
]).
-export([
@@ -64,14 +65,29 @@ increment(Field, Stats) ->
sum_stats(S1, S2) ->
orddict:merge(fun(_, V1, V2) -> V1+V2 end, S1, S2).
+max_stats(S1, S2) ->
+ orddict:merge(fun(_, V1, V2) -> max(V1, V2) end, S1, S2).
-% Handle initializing from a status object which uses same values but different
-% field names.
-fmap({revisions_checked, V}) -> {true, {missing_checked, V}};
-fmap({missing_revisions_found, V}) -> {true, {missing_found, V}};
-fmap({missing_checked, _}) -> true;
-fmap({missing_found, _}) -> true;
-fmap({docs_read, _}) -> true;
-fmap({docs_written, _}) -> true;
-fmap({doc_write_failures, _}) -> true;
-fmap({_, _}) -> false.
+
+% Handle initializing from a status object, which uses same values but
+% different field names, as well as from ejson props from the checkpoint
+% history
+%
+fmap({missing_found, _}) -> true;
+fmap({missing_revisions_found, V}) -> {true, {missing_found, V}};
+fmap({<<"missing_found">>, V}) -> {true, {missing_found, V}};
+
+fmap({missing_checked, _}) -> true;
+fmap({revisions_checked, V}) -> {true, {missing_checked, V}};
+fmap({<<"missing_checked">>, V}) -> {true, {missing_checked, V}};
+
+fmap({docs_read, _}) -> true;
+fmap({<<"docs_read">>, V}) -> {true, {docs_read, V}};
+
+fmap({docs_written, _}) -> true;
+fmap({<<"docs_written">>, V}) -> {true, {docs_written, V}};
+
+fmap({doc_write_failures, _}) -> true;
+fmap({<<"doc_write_failures">>, V}) -> {true, {doc_write_failures, V}};
+
+fmap({_, _}) -> false.
diff --git a/src/couch_replicator/test/eunit/couch_replicator_retain_stats_between_job_runs.erl b/src/couch_replicator/test/eunit/couch_replicator_retain_stats_between_job_runs.erl
index 9dd86b3ef..037f37191 100644
--- a/src/couch_replicator/test/eunit/couch_replicator_retain_stats_between_job_runs.erl
+++ b/src/couch_replicator/test/eunit/couch_replicator_retain_stats_between_job_runs.erl
@@ -18,48 +18,93 @@
-define(DELAY, 500).
-define(TIMEOUT, 60000).
--define(i2l(I), integer_to_list(I)).
--define(io2b(Io), iolist_to_binary(Io)).
+
+
+setup_all() ->
+ test_util:start_couch([couch_replicator, chttpd, mem3, fabric]).
+
+
+teardown_all(Ctx) ->
+ ok = test_util:stop_couch(Ctx).
setup() ->
- Ctx = test_util:start_couch([couch_replicator, chttpd, mem3, fabric]),
Source = setup_db(),
Target = setup_db(),
- {Ctx, {Source, Target}}.
+ {Source, Target}.
-teardown({Ctx, {Source, Target}}) ->
+teardown({Source, Target}) ->
teardown_db(Source),
teardown_db(Target),
- ok = application:stop(couch_replicator),
- ok = test_util:stop_couch(Ctx).
+ ok.
stats_retained_test_() ->
{
setup,
- fun setup/0,
- fun teardown/1,
- fun t_stats_retained/1
+ fun setup_all/0,
+ fun teardown_all/1,
+ {
+ foreach,
+ fun setup/0,
+ fun teardown/1,
+ [
+ fun t_stats_retained_by_scheduler/1,
+ fun t_stats_retained_on_job_removal/1
+ ]
+ }
}.
-t_stats_retained({_Ctx, {Source, Target}}) ->
+t_stats_retained_by_scheduler({Source, Target}) ->
?_test(begin
- populate_db(Source, 42),
+ {ok, _} = add_vdu(Target),
+ populate_db_reject_even_docs(Source, 1, 10),
{ok, RepPid, RepId} = replicate(Source, Target),
+ wait_target_in_sync(6, Target),
- wait_target_in_sync(Source, Target),
- check_active_tasks(42, 42),
- check_scheduler_jobs(42, 42),
+ check_active_tasks(10, 5, 5),
+ check_scheduler_jobs(10, 5, 5),
stop_job(RepPid),
- check_scheduler_jobs(42, 42),
+ check_scheduler_jobs(10, 5, 5),
start_job(),
- check_active_tasks(42, 42),
- check_scheduler_jobs(42, 42),
+ check_active_tasks(10, 5, 5),
+ check_scheduler_jobs(10, 5, 5),
+ couch_replicator_scheduler:remove_job(RepId)
+ end).
+
+
+t_stats_retained_on_job_removal({Source, Target}) ->
+ ?_test(begin
+ {ok, _} = add_vdu(Target),
+ populate_db_reject_even_docs(Source, 1, 10),
+ {ok, _, RepId} = replicate(Source, Target),
+ wait_target_in_sync(6, Target), % 5 + 1 vdu
+
+ check_active_tasks(10, 5, 5),
+ check_scheduler_jobs(10, 5, 5),
+
+ couch_replicator_scheduler:remove_job(RepId),
+
+ populate_db_reject_even_docs(Source, 11, 20),
+ {ok, _, RepId} = replicate(Source, Target),
+ wait_target_in_sync(11, Target), % 6 + 5
+
+ check_scheduler_jobs(20, 10, 10),
+ check_active_tasks(20, 10, 10),
+
+ couch_replicator_scheduler:remove_job(RepId),
+
+ populate_db_reject_even_docs(Source, 21, 30),
+ {ok, _, RepId} = replicate(Source, Target),
+ wait_target_in_sync(16, Target), % 11 + 5
+
+ check_scheduler_jobs(30, 15, 15),
+ check_active_tasks(30, 15, 15),
+
couch_replicator_scheduler:remove_job(RepId)
end).
@@ -92,14 +137,16 @@ start_job() ->
couch_replicator_scheduler:reschedule().
-check_active_tasks(DocsRead, DocsWritten) ->
+check_active_tasks(DocsRead, DocsWritten, DocsFailed) ->
RepTask = wait_for_task_status(),
?assertNotEqual(timeout, RepTask),
?assertEqual(DocsRead, couch_util:get_value(docs_read, RepTask)),
- ?assertEqual(DocsWritten, couch_util:get_value(docs_written, RepTask)).
+ ?assertEqual(DocsWritten, couch_util:get_value(docs_written, RepTask)),
+ ?assertEqual(DocsFailed, couch_util:get_value(doc_write_failures,
+ RepTask)).
-check_scheduler_jobs(DocsRead, DocsWritten) ->
+check_scheduler_jobs(DocsRead, DocsWritten, DocFailed) ->
Info = wait_scheduler_info(),
?assert(maps:is_key(<<"changes_pending">>, Info)),
?assert(maps:is_key(<<"doc_write_failures">>, Info)),
@@ -110,7 +157,8 @@ check_scheduler_jobs(DocsRead, DocsWritten) ->
?assert(maps:is_key(<<"source_seq">>, Info)),
?assert(maps:is_key(<<"revisions_checked">>, Info)),
?assertMatch(#{<<"docs_read">> := DocsRead}, Info),
- ?assertMatch(#{<<"docs_written">> := DocsWritten}, Info).
+ ?assertMatch(#{<<"docs_written">> := DocsWritten}, Info),
+ ?assertMatch(#{<<"doc_write_failures">> := DocFailed}, Info).
replication_tasks() ->
@@ -138,25 +186,31 @@ wait_scheduler_info() ->
end).
-populate_db(DbName, DocCount) ->
+populate_db_reject_even_docs(DbName, Start, End) ->
+ BodyFun = fun(Id) ->
+ case Id rem 2 == 0 of
+ true -> {[{<<"nope">>, true}]};
+ false -> {[]}
+ end
+ end,
+ populate_db(DbName, Start, End, BodyFun).
+
+
+populate_db(DbName, Start, End, BodyFun) when is_function(BodyFun, 1) ->
{ok, Db} = couch_db:open_int(DbName, []),
Docs = lists:foldl(
fun(DocIdCounter, Acc) ->
- Id = ?io2b(["doc", ?i2l(DocIdCounter)]),
- Doc = #doc{id = Id, body = {[]}},
+ Id = integer_to_binary(DocIdCounter),
+ Doc = #doc{id = Id, body = BodyFun(DocIdCounter)},
[Doc | Acc]
end,
- [], lists:seq(1, DocCount)),
+ [], lists:seq(Start, End)),
{ok, _} = couch_db:update_docs(Db, Docs, []),
ok = couch_db:close(Db).
-wait_target_in_sync(Source, Target) ->
- {ok, SourceDb} = couch_db:open_int(Source, []),
- {ok, SourceInfo} = couch_db:get_db_info(SourceDb),
- ok = couch_db:close(SourceDb),
- SourceDocCount = couch_util:get_value(doc_count, SourceInfo),
- wait_target_in_sync_loop(SourceDocCount, Target, 300).
+wait_target_in_sync(DocCount, Target) when is_integer(DocCount) ->
+ wait_target_in_sync_loop(DocCount, Target, 300).
wait_target_in_sync_loop(_DocCount, _TargetName, 0) ->
@@ -170,7 +224,7 @@ wait_target_in_sync_loop(DocCount, TargetName, RetriesLeft) ->
{ok, TargetInfo} = couch_db:get_db_info(Target),
ok = couch_db:close(Target),
TargetDocCount = couch_util:get_value(doc_count, TargetInfo),
- case TargetDocCount == DocCount of
+ case TargetDocCount == DocCount of
true ->
true;
false ->
@@ -201,3 +255,28 @@ scheduler_jobs() ->
{ok, 200, _, Body} = test_request:get(Url, []),
Json = jiffy:decode(Body, [return_maps]),
maps:get(<<"jobs">>, Json).
+
+
+vdu() ->
+ <<"function(newDoc, oldDoc, userCtx) {
+ if(newDoc.nope === true) {
+ throw({forbidden: 'nope'});
+ } else {
+ return;
+ }
+ }">>.
+
+
+add_vdu(DbName) ->
+ DocProps = [
+ {<<"_id">>, <<"_design/vdu">>},
+ {<<"language">>, <<"javascript">>},
+ {<<"validate_doc_update">>, vdu()}
+ ],
+ Doc = couch_doc:from_json_obj({DocProps}, []),
+ {ok, Db} = couch_db:open_int(DbName, [?ADMIN_CTX]),
+ try
+ {ok, _Rev} = couch_db:update_doc(Db, Doc, [])
+ after
+ couch_db:close(Db)
+ end.
diff --git a/test/elixir/test/replication_test.exs b/test/elixir/test/replication_test.exs
index 73ceca6a4..bdd683e97 100644
--- a/test/elixir/test/replication_test.exs
+++ b/test/elixir/test/replication_test.exs
@@ -75,8 +75,8 @@ defmodule ReplicationTest do
assert is_list(result["history"])
assert length(result["history"]) == 2
history = Enum.at(result["history"], 0)
- assert history["docs_written"] == 1
- assert history["docs_read"] == 1
+ assert history["docs_written"] == 2
+ assert history["docs_read"] == 2
assert history["doc_write_failures"] == 0
query = %{
@@ -352,10 +352,10 @@ defmodule ReplicationTest do
assert history["session_id"] == result["session_id"]
assert is_binary(history["start_time"])
assert is_binary(history["end_time"])
- assert history["missing_checked"] == 6
- assert history["missing_found"] == 6
- assert history["docs_read"] == 6
- assert history["docs_written"] == 6
+ assert history["missing_checked"] == 27
+ assert history["missing_found"] == 27
+ assert history["docs_read"] == 27
+ assert history["docs_written"] == 27
assert history["doc_write_failures"] == 0
copy = Couch.get!("/#{tgt_db_name}/#{new_doc["_id"]}").body
@@ -414,10 +414,10 @@ defmodule ReplicationTest do
assert is_list(result["history"])
assert length(result["history"]) == 3
history = Enum.at(result["history"], 0)
- assert history["missing_checked"] == 1
- assert history["missing_found"] == 1
- assert history["docs_read"] == 1
- assert history["docs_written"] == 1
+ assert history["missing_checked"] == 28
+ assert history["missing_found"] == 28
+ assert history["docs_read"] == 28
+ assert history["docs_written"] == 28
assert history["doc_write_failures"] == 0
resp = Couch.get("/#{tgt_db_name}/#{del_doc["_id"]}")
@@ -446,10 +446,10 @@ defmodule ReplicationTest do
assert is_list(result["history"])
assert length(result["history"]) == 4
history = Enum.at(result["history"], 0)
- assert history["missing_checked"] == 1
- assert history["missing_found"] == 1
- assert history["docs_read"] == 1
- assert history["docs_written"] == 1
+ assert history["missing_checked"] == 29
+ assert history["missing_found"] == 29
+ assert history["docs_read"] == 29
+ assert history["docs_written"] == 29
assert history["doc_write_failures"] == 0
copy = Couch.get!("/#{tgt_db_name}/2", query: %{:conflicts => true}).body
@@ -473,10 +473,10 @@ defmodule ReplicationTest do
assert is_list(result["history"])
assert length(result["history"]) == 5
history = Enum.at(result["history"], 0)
- assert history["missing_checked"] == 1
- assert history["missing_found"] == 1
- assert history["docs_read"] == 1
- assert history["docs_written"] == 1
+ assert history["missing_checked"] == 30
+ assert history["missing_found"] == 30
+ assert history["docs_read"] == 30
+ assert history["docs_written"] == 30
assert history["doc_write_failures"] == 0
copy = Couch.get!("/#{tgt_db_name}/2", query: %{:conflicts => true}).body
@@ -502,10 +502,10 @@ defmodule ReplicationTest do
assert is_list(result["history"])
assert length(result["history"]) == 6
history = Enum.at(result["history"], 0)
- assert history["missing_checked"] == 1
- assert history["missing_found"] == 1
- assert history["docs_read"] == 1
- assert history["docs_written"] == 1
+ assert history["missing_checked"] == 31
+ assert history["missing_found"] == 31
+ assert history["docs_read"] == 31
+ assert history["docs_written"] == 31
assert history["doc_write_failures"] == 0
copy = Couch.get!("/#{tgt_db_name}/2", query: %{:conflicts => true}).body
@@ -534,10 +534,10 @@ defmodule ReplicationTest do
assert is_list(result["history"])
assert length(result["history"]) == 7
history = Enum.at(result["history"], 0)
- assert history["missing_checked"] == 3
- assert history["missing_found"] == 1
- assert history["docs_read"] == 1
- assert history["docs_written"] == 1
+ assert history["missing_checked"] == 34
+ assert history["missing_found"] == 32
+ assert history["docs_read"] == 32
+ assert history["docs_written"] == 32
assert history["doc_write_failures"] == 0
docs = [
@@ -559,10 +559,10 @@ defmodule ReplicationTest do
assert is_list(result["history"])
assert length(result["history"]) == 8
history = Enum.at(result["history"], 0)
- assert history["missing_checked"] == 2
- assert history["missing_found"] == 0
- assert history["docs_read"] == 0
- assert history["docs_written"] == 0
+ assert history["missing_checked"] == 36
+ assert history["missing_found"] == 32
+ assert history["docs_read"] == 32
+ assert history["docs_written"] == 32
assert history["doc_write_failures"] == 0
# Test nothing to replicate
@@ -822,10 +822,10 @@ defmodule ReplicationTest do
assert length(result["history"]) == 2
history = Enum.at(result["history"], 0)
- assert history["missing_checked"] == 3
- assert history["missing_found"] == 3
- assert history["docs_read"] == 3
- assert history["docs_written"] == 3
+ assert history["missing_checked"] == 19
+ assert history["missing_found"] == 19
+ assert history["docs_read"] == 19
+ assert history["docs_written"] == 19
assert history["doc_write_failures"] == 0
end
@@ -1185,8 +1185,8 @@ defmodule ReplicationTest do
result = replicate(repl_src, repl_tgt, body: repl_body)
assert result["ok"]
- assert result["docs_read"] == 1
- assert result["docs_written"] == 1
+ assert result["docs_read"] == 2
+ assert result["docs_written"] == 2
assert result["doc_write_failures"] == 0
retry_until(fn ->