summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorNick Vatamaniuc <vatamane@apache.org>2020-01-13 18:21:58 -0500
committerNick Vatamaniuc <nickva@users.noreply.github.com>2020-01-14 11:00:49 -0500
commit3573dcc10d7408f819e87c4c6454519cf1fd0c84 (patch)
tree05d2a446989486c87df319539494e3bf04222e4c
parent0a20de6ffb95f00a73e9eea85004112e20ad52ff (diff)
downloadcouchdb-3573dcc10d7408f819e87c4c6454519cf1fd0c84.tar.gz
Preserve replication job stats when jobs are re-created
Previously we made sure replication job statistics were preserved when the jobs were started and stopped by the scheduler. However, if a db node restarted or user re-created the job, replication stats would be reset to 0. Some statistics like `docs_read` and `docs_written` are perhaps not as critical. However `doc_write_failures` is. That is the indicator that some replication docs have not replicated to the target. Not preserving that statistic meant users could perceive there was a data loss during replication -- data was replicated successfully according to the replication job with no write failures, user deletes source database, then some times later noticed some of their data is missing. These statistics were already logged in the checkpoint history and we just had to initialize a stats object from them when a replication job starts. In that initialization code we pick the highest values from either the running scheduler or the checkpointed log. The reason is that the running stats could be higher if say job was stopped suddenly and failed to checkpoint but scheduler retained the data. Fixes: #2414
-rw-r--r--src/couch_replicator/src/couch_replicator_scheduler_job.erl12
-rw-r--r--src/couch_replicator/src/couch_replicator_stats.erl38
-rw-r--r--src/couch_replicator/test/eunit/couch_replicator_retain_stats_between_job_runs.erl145
-rw-r--r--test/elixir/test/replication_test.exs72
4 files changed, 185 insertions, 82 deletions
diff --git a/src/couch_replicator/src/couch_replicator_scheduler_job.erl b/src/couch_replicator/src/couch_replicator_scheduler_job.erl
index 12d3e5530..0b33419e1 100644
--- a/src/couch_replicator/src/couch_replicator_scheduler_job.erl
+++ b/src/couch_replicator/src/couch_replicator_scheduler_job.erl
@@ -565,7 +565,7 @@ init_state(Rep) ->
options = Options,
type = Type, view = View,
start_time = StartTime,
- stats = Stats
+ stats = ArgStats0
} = Rep,
% Adjust minimum number of http source connections to 2 to avoid deadlock
Src = adjust_maxconn(Src0, BaseId),
@@ -580,6 +580,14 @@ init_state(Rep) ->
[SourceLog, TargetLog] = find_and_migrate_logs([Source, Target], Rep),
{StartSeq0, History} = compare_replication_logs(SourceLog, TargetLog),
+
+ ArgStats1 = couch_replicator_stats:new(ArgStats0),
+ HistoryStats = case History of
+ [{[_ | _] = HProps} | _] -> couch_replicator_stats:new(HProps);
+ _ -> couch_replicator_stats:new()
+ end,
+ Stats = couch_replicator_stats:max_stats(ArgStats1, HistoryStats),
+
StartSeq1 = get_value(since_seq, Options, StartSeq0),
StartSeq = {0, StartSeq1},
@@ -609,7 +617,7 @@ init_state(Rep) ->
?DEFAULT_CHECKPOINT_INTERVAL),
type = Type,
view = View,
- stats = couch_replicator_stats:new(Stats)
+ stats = Stats
},
State#rep_state{timer = start_timer(State)}.
diff --git a/src/couch_replicator/src/couch_replicator_stats.erl b/src/couch_replicator/src/couch_replicator_stats.erl
index cd62949e9..37848b3ee 100644
--- a/src/couch_replicator/src/couch_replicator_stats.erl
+++ b/src/couch_replicator/src/couch_replicator_stats.erl
@@ -17,7 +17,8 @@
new/1,
get/2,
increment/2,
- sum_stats/2
+ sum_stats/2,
+ max_stats/2
]).
-export([
@@ -64,14 +65,29 @@ increment(Field, Stats) ->
sum_stats(S1, S2) ->
orddict:merge(fun(_, V1, V2) -> V1+V2 end, S1, S2).
+max_stats(S1, S2) ->
+ orddict:merge(fun(_, V1, V2) -> max(V1, V2) end, S1, S2).
-% Handle initializing from a status object which uses same values but different
-% field names.
-fmap({revisions_checked, V}) -> {true, {missing_checked, V}};
-fmap({missing_revisions_found, V}) -> {true, {missing_found, V}};
-fmap({missing_checked, _}) -> true;
-fmap({missing_found, _}) -> true;
-fmap({docs_read, _}) -> true;
-fmap({docs_written, _}) -> true;
-fmap({doc_write_failures, _}) -> true;
-fmap({_, _}) -> false.
+
+% Handle initializing from a status object, which uses same values but
+% different field names, as well as from ejson props from the checkpoint
+% history
+%
+fmap({missing_found, _}) -> true;
+fmap({missing_revisions_found, V}) -> {true, {missing_found, V}};
+fmap({<<"missing_found">>, V}) -> {true, {missing_found, V}};
+
+fmap({missing_checked, _}) -> true;
+fmap({revisions_checked, V}) -> {true, {missing_checked, V}};
+fmap({<<"missing_checked">>, V}) -> {true, {missing_checked, V}};
+
+fmap({docs_read, _}) -> true;
+fmap({<<"docs_read">>, V}) -> {true, {docs_read, V}};
+
+fmap({docs_written, _}) -> true;
+fmap({<<"docs_written">>, V}) -> {true, {docs_written, V}};
+
+fmap({doc_write_failures, _}) -> true;
+fmap({<<"doc_write_failures">>, V}) -> {true, {doc_write_failures, V}};
+
+fmap({_, _}) -> false.
diff --git a/src/couch_replicator/test/eunit/couch_replicator_retain_stats_between_job_runs.erl b/src/couch_replicator/test/eunit/couch_replicator_retain_stats_between_job_runs.erl
index 9dd86b3ef..037f37191 100644
--- a/src/couch_replicator/test/eunit/couch_replicator_retain_stats_between_job_runs.erl
+++ b/src/couch_replicator/test/eunit/couch_replicator_retain_stats_between_job_runs.erl
@@ -18,48 +18,93 @@
-define(DELAY, 500).
-define(TIMEOUT, 60000).
--define(i2l(I), integer_to_list(I)).
--define(io2b(Io), iolist_to_binary(Io)).
+
+
+setup_all() ->
+ test_util:start_couch([couch_replicator, chttpd, mem3, fabric]).
+
+
+teardown_all(Ctx) ->
+ ok = test_util:stop_couch(Ctx).
setup() ->
- Ctx = test_util:start_couch([couch_replicator, chttpd, mem3, fabric]),
Source = setup_db(),
Target = setup_db(),
- {Ctx, {Source, Target}}.
+ {Source, Target}.
-teardown({Ctx, {Source, Target}}) ->
+teardown({Source, Target}) ->
teardown_db(Source),
teardown_db(Target),
- ok = application:stop(couch_replicator),
- ok = test_util:stop_couch(Ctx).
+ ok.
stats_retained_test_() ->
{
setup,
- fun setup/0,
- fun teardown/1,
- fun t_stats_retained/1
+ fun setup_all/0,
+ fun teardown_all/1,
+ {
+ foreach,
+ fun setup/0,
+ fun teardown/1,
+ [
+ fun t_stats_retained_by_scheduler/1,
+ fun t_stats_retained_on_job_removal/1
+ ]
+ }
}.
-t_stats_retained({_Ctx, {Source, Target}}) ->
+t_stats_retained_by_scheduler({Source, Target}) ->
?_test(begin
- populate_db(Source, 42),
+ {ok, _} = add_vdu(Target),
+ populate_db_reject_even_docs(Source, 1, 10),
{ok, RepPid, RepId} = replicate(Source, Target),
+ wait_target_in_sync(6, Target),
- wait_target_in_sync(Source, Target),
- check_active_tasks(42, 42),
- check_scheduler_jobs(42, 42),
+ check_active_tasks(10, 5, 5),
+ check_scheduler_jobs(10, 5, 5),
stop_job(RepPid),
- check_scheduler_jobs(42, 42),
+ check_scheduler_jobs(10, 5, 5),
start_job(),
- check_active_tasks(42, 42),
- check_scheduler_jobs(42, 42),
+ check_active_tasks(10, 5, 5),
+ check_scheduler_jobs(10, 5, 5),
+ couch_replicator_scheduler:remove_job(RepId)
+ end).
+
+
+t_stats_retained_on_job_removal({Source, Target}) ->
+ ?_test(begin
+ {ok, _} = add_vdu(Target),
+ populate_db_reject_even_docs(Source, 1, 10),
+ {ok, _, RepId} = replicate(Source, Target),
+ wait_target_in_sync(6, Target), % 5 + 1 vdu
+
+ check_active_tasks(10, 5, 5),
+ check_scheduler_jobs(10, 5, 5),
+
+ couch_replicator_scheduler:remove_job(RepId),
+
+ populate_db_reject_even_docs(Source, 11, 20),
+ {ok, _, RepId} = replicate(Source, Target),
+ wait_target_in_sync(11, Target), % 6 + 5
+
+ check_scheduler_jobs(20, 10, 10),
+ check_active_tasks(20, 10, 10),
+
+ couch_replicator_scheduler:remove_job(RepId),
+
+ populate_db_reject_even_docs(Source, 21, 30),
+ {ok, _, RepId} = replicate(Source, Target),
+ wait_target_in_sync(16, Target), % 11 + 5
+
+ check_scheduler_jobs(30, 15, 15),
+ check_active_tasks(30, 15, 15),
+
couch_replicator_scheduler:remove_job(RepId)
end).
@@ -92,14 +137,16 @@ start_job() ->
couch_replicator_scheduler:reschedule().
-check_active_tasks(DocsRead, DocsWritten) ->
+check_active_tasks(DocsRead, DocsWritten, DocsFailed) ->
RepTask = wait_for_task_status(),
?assertNotEqual(timeout, RepTask),
?assertEqual(DocsRead, couch_util:get_value(docs_read, RepTask)),
- ?assertEqual(DocsWritten, couch_util:get_value(docs_written, RepTask)).
+ ?assertEqual(DocsWritten, couch_util:get_value(docs_written, RepTask)),
+ ?assertEqual(DocsFailed, couch_util:get_value(doc_write_failures,
+ RepTask)).
-check_scheduler_jobs(DocsRead, DocsWritten) ->
+check_scheduler_jobs(DocsRead, DocsWritten, DocFailed) ->
Info = wait_scheduler_info(),
?assert(maps:is_key(<<"changes_pending">>, Info)),
?assert(maps:is_key(<<"doc_write_failures">>, Info)),
@@ -110,7 +157,8 @@ check_scheduler_jobs(DocsRead, DocsWritten) ->
?assert(maps:is_key(<<"source_seq">>, Info)),
?assert(maps:is_key(<<"revisions_checked">>, Info)),
?assertMatch(#{<<"docs_read">> := DocsRead}, Info),
- ?assertMatch(#{<<"docs_written">> := DocsWritten}, Info).
+ ?assertMatch(#{<<"docs_written">> := DocsWritten}, Info),
+ ?assertMatch(#{<<"doc_write_failures">> := DocFailed}, Info).
replication_tasks() ->
@@ -138,25 +186,31 @@ wait_scheduler_info() ->
end).
-populate_db(DbName, DocCount) ->
+populate_db_reject_even_docs(DbName, Start, End) ->
+ BodyFun = fun(Id) ->
+ case Id rem 2 == 0 of
+ true -> {[{<<"nope">>, true}]};
+ false -> {[]}
+ end
+ end,
+ populate_db(DbName, Start, End, BodyFun).
+
+
+populate_db(DbName, Start, End, BodyFun) when is_function(BodyFun, 1) ->
{ok, Db} = couch_db:open_int(DbName, []),
Docs = lists:foldl(
fun(DocIdCounter, Acc) ->
- Id = ?io2b(["doc", ?i2l(DocIdCounter)]),
- Doc = #doc{id = Id, body = {[]}},
+ Id = integer_to_binary(DocIdCounter),
+ Doc = #doc{id = Id, body = BodyFun(DocIdCounter)},
[Doc | Acc]
end,
- [], lists:seq(1, DocCount)),
+ [], lists:seq(Start, End)),
{ok, _} = couch_db:update_docs(Db, Docs, []),
ok = couch_db:close(Db).
-wait_target_in_sync(Source, Target) ->
- {ok, SourceDb} = couch_db:open_int(Source, []),
- {ok, SourceInfo} = couch_db:get_db_info(SourceDb),
- ok = couch_db:close(SourceDb),
- SourceDocCount = couch_util:get_value(doc_count, SourceInfo),
- wait_target_in_sync_loop(SourceDocCount, Target, 300).
+wait_target_in_sync(DocCount, Target) when is_integer(DocCount) ->
+ wait_target_in_sync_loop(DocCount, Target, 300).
wait_target_in_sync_loop(_DocCount, _TargetName, 0) ->
@@ -170,7 +224,7 @@ wait_target_in_sync_loop(DocCount, TargetName, RetriesLeft) ->
{ok, TargetInfo} = couch_db:get_db_info(Target),
ok = couch_db:close(Target),
TargetDocCount = couch_util:get_value(doc_count, TargetInfo),
- case TargetDocCount == DocCount of
+ case TargetDocCount == DocCount of
true ->
true;
false ->
@@ -201,3 +255,28 @@ scheduler_jobs() ->
{ok, 200, _, Body} = test_request:get(Url, []),
Json = jiffy:decode(Body, [return_maps]),
maps:get(<<"jobs">>, Json).
+
+
+vdu() ->
+ <<"function(newDoc, oldDoc, userCtx) {
+ if(newDoc.nope === true) {
+ throw({forbidden: 'nope'});
+ } else {
+ return;
+ }
+ }">>.
+
+
+add_vdu(DbName) ->
+ DocProps = [
+ {<<"_id">>, <<"_design/vdu">>},
+ {<<"language">>, <<"javascript">>},
+ {<<"validate_doc_update">>, vdu()}
+ ],
+ Doc = couch_doc:from_json_obj({DocProps}, []),
+ {ok, Db} = couch_db:open_int(DbName, [?ADMIN_CTX]),
+ try
+ {ok, _Rev} = couch_db:update_doc(Db, Doc, [])
+ after
+ couch_db:close(Db)
+ end.
diff --git a/test/elixir/test/replication_test.exs b/test/elixir/test/replication_test.exs
index 73ceca6a4..bdd683e97 100644
--- a/test/elixir/test/replication_test.exs
+++ b/test/elixir/test/replication_test.exs
@@ -75,8 +75,8 @@ defmodule ReplicationTest do
assert is_list(result["history"])
assert length(result["history"]) == 2
history = Enum.at(result["history"], 0)
- assert history["docs_written"] == 1
- assert history["docs_read"] == 1
+ assert history["docs_written"] == 2
+ assert history["docs_read"] == 2
assert history["doc_write_failures"] == 0
query = %{
@@ -352,10 +352,10 @@ defmodule ReplicationTest do
assert history["session_id"] == result["session_id"]
assert is_binary(history["start_time"])
assert is_binary(history["end_time"])
- assert history["missing_checked"] == 6
- assert history["missing_found"] == 6
- assert history["docs_read"] == 6
- assert history["docs_written"] == 6
+ assert history["missing_checked"] == 27
+ assert history["missing_found"] == 27
+ assert history["docs_read"] == 27
+ assert history["docs_written"] == 27
assert history["doc_write_failures"] == 0
copy = Couch.get!("/#{tgt_db_name}/#{new_doc["_id"]}").body
@@ -414,10 +414,10 @@ defmodule ReplicationTest do
assert is_list(result["history"])
assert length(result["history"]) == 3
history = Enum.at(result["history"], 0)
- assert history["missing_checked"] == 1
- assert history["missing_found"] == 1
- assert history["docs_read"] == 1
- assert history["docs_written"] == 1
+ assert history["missing_checked"] == 28
+ assert history["missing_found"] == 28
+ assert history["docs_read"] == 28
+ assert history["docs_written"] == 28
assert history["doc_write_failures"] == 0
resp = Couch.get("/#{tgt_db_name}/#{del_doc["_id"]}")
@@ -446,10 +446,10 @@ defmodule ReplicationTest do
assert is_list(result["history"])
assert length(result["history"]) == 4
history = Enum.at(result["history"], 0)
- assert history["missing_checked"] == 1
- assert history["missing_found"] == 1
- assert history["docs_read"] == 1
- assert history["docs_written"] == 1
+ assert history["missing_checked"] == 29
+ assert history["missing_found"] == 29
+ assert history["docs_read"] == 29
+ assert history["docs_written"] == 29
assert history["doc_write_failures"] == 0
copy = Couch.get!("/#{tgt_db_name}/2", query: %{:conflicts => true}).body
@@ -473,10 +473,10 @@ defmodule ReplicationTest do
assert is_list(result["history"])
assert length(result["history"]) == 5
history = Enum.at(result["history"], 0)
- assert history["missing_checked"] == 1
- assert history["missing_found"] == 1
- assert history["docs_read"] == 1
- assert history["docs_written"] == 1
+ assert history["missing_checked"] == 30
+ assert history["missing_found"] == 30
+ assert history["docs_read"] == 30
+ assert history["docs_written"] == 30
assert history["doc_write_failures"] == 0
copy = Couch.get!("/#{tgt_db_name}/2", query: %{:conflicts => true}).body
@@ -502,10 +502,10 @@ defmodule ReplicationTest do
assert is_list(result["history"])
assert length(result["history"]) == 6
history = Enum.at(result["history"], 0)
- assert history["missing_checked"] == 1
- assert history["missing_found"] == 1
- assert history["docs_read"] == 1
- assert history["docs_written"] == 1
+ assert history["missing_checked"] == 31
+ assert history["missing_found"] == 31
+ assert history["docs_read"] == 31
+ assert history["docs_written"] == 31
assert history["doc_write_failures"] == 0
copy = Couch.get!("/#{tgt_db_name}/2", query: %{:conflicts => true}).body
@@ -534,10 +534,10 @@ defmodule ReplicationTest do
assert is_list(result["history"])
assert length(result["history"]) == 7
history = Enum.at(result["history"], 0)
- assert history["missing_checked"] == 3
- assert history["missing_found"] == 1
- assert history["docs_read"] == 1
- assert history["docs_written"] == 1
+ assert history["missing_checked"] == 34
+ assert history["missing_found"] == 32
+ assert history["docs_read"] == 32
+ assert history["docs_written"] == 32
assert history["doc_write_failures"] == 0
docs = [
@@ -559,10 +559,10 @@ defmodule ReplicationTest do
assert is_list(result["history"])
assert length(result["history"]) == 8
history = Enum.at(result["history"], 0)
- assert history["missing_checked"] == 2
- assert history["missing_found"] == 0
- assert history["docs_read"] == 0
- assert history["docs_written"] == 0
+ assert history["missing_checked"] == 36
+ assert history["missing_found"] == 32
+ assert history["docs_read"] == 32
+ assert history["docs_written"] == 32
assert history["doc_write_failures"] == 0
# Test nothing to replicate
@@ -822,10 +822,10 @@ defmodule ReplicationTest do
assert length(result["history"]) == 2
history = Enum.at(result["history"], 0)
- assert history["missing_checked"] == 3
- assert history["missing_found"] == 3
- assert history["docs_read"] == 3
- assert history["docs_written"] == 3
+ assert history["missing_checked"] == 19
+ assert history["missing_found"] == 19
+ assert history["docs_read"] == 19
+ assert history["docs_written"] == 19
assert history["doc_write_failures"] == 0
end
@@ -1185,8 +1185,8 @@ defmodule ReplicationTest do
result = replicate(repl_src, repl_tgt, body: repl_body)
assert result["ok"]
- assert result["docs_read"] == 1
- assert result["docs_written"] == 1
+ assert result["docs_read"] == 2
+ assert result["docs_written"] == 2
assert result["doc_write_failures"] == 0
retry_until(fn ->