diff options
author | Nick Vatamaniuc <vatamane@apache.org> | 2020-01-13 18:21:58 -0500 |
---|---|---|
committer | Nick Vatamaniuc <nickva@users.noreply.github.com> | 2020-01-14 11:00:49 -0500 |
commit | 3573dcc10d7408f819e87c4c6454519cf1fd0c84 (patch) | |
tree | 05d2a446989486c87df319539494e3bf04222e4c | |
parent | 0a20de6ffb95f00a73e9eea85004112e20ad52ff (diff) | |
download | couchdb-3573dcc10d7408f819e87c4c6454519cf1fd0c84.tar.gz |
Preserve replication job stats when jobs are re-created
Previously we made sure replication job statistics were preserved when
the jobs were started and stopped by the scheduler. However, if a db
node restarted or user re-created the job, replication stats would be
reset to 0.
Some statistics like `docs_read` and `docs_written` are perhaps not as
critical. However `doc_write_failures` is. That is the indicator that
some replication docs have not replicated to the target. Not
preserving that statistic meant users could perceive there was a data
loss during replication -- data was replicated successfully according
to the replication job with no write failures, user deletes source
database, then some times later noticed some of their data is missing.
These statistics were already logged in the checkpoint history and we
just had to initialize a stats object from them when a replication job
starts. In that initialization code we pick the highest values from
either the running scheduler or the checkpointed log. The reason is
that the running stats could be higher if say job was stopped suddenly
and failed to checkpoint but scheduler retained the data.
Fixes: #2414
4 files changed, 185 insertions, 82 deletions
diff --git a/src/couch_replicator/src/couch_replicator_scheduler_job.erl b/src/couch_replicator/src/couch_replicator_scheduler_job.erl index 12d3e5530..0b33419e1 100644 --- a/src/couch_replicator/src/couch_replicator_scheduler_job.erl +++ b/src/couch_replicator/src/couch_replicator_scheduler_job.erl @@ -565,7 +565,7 @@ init_state(Rep) -> options = Options, type = Type, view = View, start_time = StartTime, - stats = Stats + stats = ArgStats0 } = Rep, % Adjust minimum number of http source connections to 2 to avoid deadlock Src = adjust_maxconn(Src0, BaseId), @@ -580,6 +580,14 @@ init_state(Rep) -> [SourceLog, TargetLog] = find_and_migrate_logs([Source, Target], Rep), {StartSeq0, History} = compare_replication_logs(SourceLog, TargetLog), + + ArgStats1 = couch_replicator_stats:new(ArgStats0), + HistoryStats = case History of + [{[_ | _] = HProps} | _] -> couch_replicator_stats:new(HProps); + _ -> couch_replicator_stats:new() + end, + Stats = couch_replicator_stats:max_stats(ArgStats1, HistoryStats), + StartSeq1 = get_value(since_seq, Options, StartSeq0), StartSeq = {0, StartSeq1}, @@ -609,7 +617,7 @@ init_state(Rep) -> ?DEFAULT_CHECKPOINT_INTERVAL), type = Type, view = View, - stats = couch_replicator_stats:new(Stats) + stats = Stats }, State#rep_state{timer = start_timer(State)}. diff --git a/src/couch_replicator/src/couch_replicator_stats.erl b/src/couch_replicator/src/couch_replicator_stats.erl index cd62949e9..37848b3ee 100644 --- a/src/couch_replicator/src/couch_replicator_stats.erl +++ b/src/couch_replicator/src/couch_replicator_stats.erl @@ -17,7 +17,8 @@ new/1, get/2, increment/2, - sum_stats/2 + sum_stats/2, + max_stats/2 ]). -export([ @@ -64,14 +65,29 @@ increment(Field, Stats) -> sum_stats(S1, S2) -> orddict:merge(fun(_, V1, V2) -> V1+V2 end, S1, S2). +max_stats(S1, S2) -> + orddict:merge(fun(_, V1, V2) -> max(V1, V2) end, S1, S2). -% Handle initializing from a status object which uses same values but different -% field names. -fmap({revisions_checked, V}) -> {true, {missing_checked, V}}; -fmap({missing_revisions_found, V}) -> {true, {missing_found, V}}; -fmap({missing_checked, _}) -> true; -fmap({missing_found, _}) -> true; -fmap({docs_read, _}) -> true; -fmap({docs_written, _}) -> true; -fmap({doc_write_failures, _}) -> true; -fmap({_, _}) -> false. + +% Handle initializing from a status object, which uses same values but +% different field names, as well as from ejson props from the checkpoint +% history +% +fmap({missing_found, _}) -> true; +fmap({missing_revisions_found, V}) -> {true, {missing_found, V}}; +fmap({<<"missing_found">>, V}) -> {true, {missing_found, V}}; + +fmap({missing_checked, _}) -> true; +fmap({revisions_checked, V}) -> {true, {missing_checked, V}}; +fmap({<<"missing_checked">>, V}) -> {true, {missing_checked, V}}; + +fmap({docs_read, _}) -> true; +fmap({<<"docs_read">>, V}) -> {true, {docs_read, V}}; + +fmap({docs_written, _}) -> true; +fmap({<<"docs_written">>, V}) -> {true, {docs_written, V}}; + +fmap({doc_write_failures, _}) -> true; +fmap({<<"doc_write_failures">>, V}) -> {true, {doc_write_failures, V}}; + +fmap({_, _}) -> false. diff --git a/src/couch_replicator/test/eunit/couch_replicator_retain_stats_between_job_runs.erl b/src/couch_replicator/test/eunit/couch_replicator_retain_stats_between_job_runs.erl index 9dd86b3ef..037f37191 100644 --- a/src/couch_replicator/test/eunit/couch_replicator_retain_stats_between_job_runs.erl +++ b/src/couch_replicator/test/eunit/couch_replicator_retain_stats_between_job_runs.erl @@ -18,48 +18,93 @@ -define(DELAY, 500). -define(TIMEOUT, 60000). --define(i2l(I), integer_to_list(I)). --define(io2b(Io), iolist_to_binary(Io)). + + +setup_all() -> + test_util:start_couch([couch_replicator, chttpd, mem3, fabric]). + + +teardown_all(Ctx) -> + ok = test_util:stop_couch(Ctx). setup() -> - Ctx = test_util:start_couch([couch_replicator, chttpd, mem3, fabric]), Source = setup_db(), Target = setup_db(), - {Ctx, {Source, Target}}. + {Source, Target}. -teardown({Ctx, {Source, Target}}) -> +teardown({Source, Target}) -> teardown_db(Source), teardown_db(Target), - ok = application:stop(couch_replicator), - ok = test_util:stop_couch(Ctx). + ok. stats_retained_test_() -> { setup, - fun setup/0, - fun teardown/1, - fun t_stats_retained/1 + fun setup_all/0, + fun teardown_all/1, + { + foreach, + fun setup/0, + fun teardown/1, + [ + fun t_stats_retained_by_scheduler/1, + fun t_stats_retained_on_job_removal/1 + ] + } }. -t_stats_retained({_Ctx, {Source, Target}}) -> +t_stats_retained_by_scheduler({Source, Target}) -> ?_test(begin - populate_db(Source, 42), + {ok, _} = add_vdu(Target), + populate_db_reject_even_docs(Source, 1, 10), {ok, RepPid, RepId} = replicate(Source, Target), + wait_target_in_sync(6, Target), - wait_target_in_sync(Source, Target), - check_active_tasks(42, 42), - check_scheduler_jobs(42, 42), + check_active_tasks(10, 5, 5), + check_scheduler_jobs(10, 5, 5), stop_job(RepPid), - check_scheduler_jobs(42, 42), + check_scheduler_jobs(10, 5, 5), start_job(), - check_active_tasks(42, 42), - check_scheduler_jobs(42, 42), + check_active_tasks(10, 5, 5), + check_scheduler_jobs(10, 5, 5), + couch_replicator_scheduler:remove_job(RepId) + end). + + +t_stats_retained_on_job_removal({Source, Target}) -> + ?_test(begin + {ok, _} = add_vdu(Target), + populate_db_reject_even_docs(Source, 1, 10), + {ok, _, RepId} = replicate(Source, Target), + wait_target_in_sync(6, Target), % 5 + 1 vdu + + check_active_tasks(10, 5, 5), + check_scheduler_jobs(10, 5, 5), + + couch_replicator_scheduler:remove_job(RepId), + + populate_db_reject_even_docs(Source, 11, 20), + {ok, _, RepId} = replicate(Source, Target), + wait_target_in_sync(11, Target), % 6 + 5 + + check_scheduler_jobs(20, 10, 10), + check_active_tasks(20, 10, 10), + + couch_replicator_scheduler:remove_job(RepId), + + populate_db_reject_even_docs(Source, 21, 30), + {ok, _, RepId} = replicate(Source, Target), + wait_target_in_sync(16, Target), % 11 + 5 + + check_scheduler_jobs(30, 15, 15), + check_active_tasks(30, 15, 15), + couch_replicator_scheduler:remove_job(RepId) end). @@ -92,14 +137,16 @@ start_job() -> couch_replicator_scheduler:reschedule(). -check_active_tasks(DocsRead, DocsWritten) -> +check_active_tasks(DocsRead, DocsWritten, DocsFailed) -> RepTask = wait_for_task_status(), ?assertNotEqual(timeout, RepTask), ?assertEqual(DocsRead, couch_util:get_value(docs_read, RepTask)), - ?assertEqual(DocsWritten, couch_util:get_value(docs_written, RepTask)). + ?assertEqual(DocsWritten, couch_util:get_value(docs_written, RepTask)), + ?assertEqual(DocsFailed, couch_util:get_value(doc_write_failures, + RepTask)). -check_scheduler_jobs(DocsRead, DocsWritten) -> +check_scheduler_jobs(DocsRead, DocsWritten, DocFailed) -> Info = wait_scheduler_info(), ?assert(maps:is_key(<<"changes_pending">>, Info)), ?assert(maps:is_key(<<"doc_write_failures">>, Info)), @@ -110,7 +157,8 @@ check_scheduler_jobs(DocsRead, DocsWritten) -> ?assert(maps:is_key(<<"source_seq">>, Info)), ?assert(maps:is_key(<<"revisions_checked">>, Info)), ?assertMatch(#{<<"docs_read">> := DocsRead}, Info), - ?assertMatch(#{<<"docs_written">> := DocsWritten}, Info). + ?assertMatch(#{<<"docs_written">> := DocsWritten}, Info), + ?assertMatch(#{<<"doc_write_failures">> := DocFailed}, Info). replication_tasks() -> @@ -138,25 +186,31 @@ wait_scheduler_info() -> end). -populate_db(DbName, DocCount) -> +populate_db_reject_even_docs(DbName, Start, End) -> + BodyFun = fun(Id) -> + case Id rem 2 == 0 of + true -> {[{<<"nope">>, true}]}; + false -> {[]} + end + end, + populate_db(DbName, Start, End, BodyFun). + + +populate_db(DbName, Start, End, BodyFun) when is_function(BodyFun, 1) -> {ok, Db} = couch_db:open_int(DbName, []), Docs = lists:foldl( fun(DocIdCounter, Acc) -> - Id = ?io2b(["doc", ?i2l(DocIdCounter)]), - Doc = #doc{id = Id, body = {[]}}, + Id = integer_to_binary(DocIdCounter), + Doc = #doc{id = Id, body = BodyFun(DocIdCounter)}, [Doc | Acc] end, - [], lists:seq(1, DocCount)), + [], lists:seq(Start, End)), {ok, _} = couch_db:update_docs(Db, Docs, []), ok = couch_db:close(Db). -wait_target_in_sync(Source, Target) -> - {ok, SourceDb} = couch_db:open_int(Source, []), - {ok, SourceInfo} = couch_db:get_db_info(SourceDb), - ok = couch_db:close(SourceDb), - SourceDocCount = couch_util:get_value(doc_count, SourceInfo), - wait_target_in_sync_loop(SourceDocCount, Target, 300). +wait_target_in_sync(DocCount, Target) when is_integer(DocCount) -> + wait_target_in_sync_loop(DocCount, Target, 300). wait_target_in_sync_loop(_DocCount, _TargetName, 0) -> @@ -170,7 +224,7 @@ wait_target_in_sync_loop(DocCount, TargetName, RetriesLeft) -> {ok, TargetInfo} = couch_db:get_db_info(Target), ok = couch_db:close(Target), TargetDocCount = couch_util:get_value(doc_count, TargetInfo), - case TargetDocCount == DocCount of + case TargetDocCount == DocCount of true -> true; false -> @@ -201,3 +255,28 @@ scheduler_jobs() -> {ok, 200, _, Body} = test_request:get(Url, []), Json = jiffy:decode(Body, [return_maps]), maps:get(<<"jobs">>, Json). + + +vdu() -> + <<"function(newDoc, oldDoc, userCtx) { + if(newDoc.nope === true) { + throw({forbidden: 'nope'}); + } else { + return; + } + }">>. + + +add_vdu(DbName) -> + DocProps = [ + {<<"_id">>, <<"_design/vdu">>}, + {<<"language">>, <<"javascript">>}, + {<<"validate_doc_update">>, vdu()} + ], + Doc = couch_doc:from_json_obj({DocProps}, []), + {ok, Db} = couch_db:open_int(DbName, [?ADMIN_CTX]), + try + {ok, _Rev} = couch_db:update_doc(Db, Doc, []) + after + couch_db:close(Db) + end. diff --git a/test/elixir/test/replication_test.exs b/test/elixir/test/replication_test.exs index 73ceca6a4..bdd683e97 100644 --- a/test/elixir/test/replication_test.exs +++ b/test/elixir/test/replication_test.exs @@ -75,8 +75,8 @@ defmodule ReplicationTest do assert is_list(result["history"]) assert length(result["history"]) == 2 history = Enum.at(result["history"], 0) - assert history["docs_written"] == 1 - assert history["docs_read"] == 1 + assert history["docs_written"] == 2 + assert history["docs_read"] == 2 assert history["doc_write_failures"] == 0 query = %{ @@ -352,10 +352,10 @@ defmodule ReplicationTest do assert history["session_id"] == result["session_id"] assert is_binary(history["start_time"]) assert is_binary(history["end_time"]) - assert history["missing_checked"] == 6 - assert history["missing_found"] == 6 - assert history["docs_read"] == 6 - assert history["docs_written"] == 6 + assert history["missing_checked"] == 27 + assert history["missing_found"] == 27 + assert history["docs_read"] == 27 + assert history["docs_written"] == 27 assert history["doc_write_failures"] == 0 copy = Couch.get!("/#{tgt_db_name}/#{new_doc["_id"]}").body @@ -414,10 +414,10 @@ defmodule ReplicationTest do assert is_list(result["history"]) assert length(result["history"]) == 3 history = Enum.at(result["history"], 0) - assert history["missing_checked"] == 1 - assert history["missing_found"] == 1 - assert history["docs_read"] == 1 - assert history["docs_written"] == 1 + assert history["missing_checked"] == 28 + assert history["missing_found"] == 28 + assert history["docs_read"] == 28 + assert history["docs_written"] == 28 assert history["doc_write_failures"] == 0 resp = Couch.get("/#{tgt_db_name}/#{del_doc["_id"]}") @@ -446,10 +446,10 @@ defmodule ReplicationTest do assert is_list(result["history"]) assert length(result["history"]) == 4 history = Enum.at(result["history"], 0) - assert history["missing_checked"] == 1 - assert history["missing_found"] == 1 - assert history["docs_read"] == 1 - assert history["docs_written"] == 1 + assert history["missing_checked"] == 29 + assert history["missing_found"] == 29 + assert history["docs_read"] == 29 + assert history["docs_written"] == 29 assert history["doc_write_failures"] == 0 copy = Couch.get!("/#{tgt_db_name}/2", query: %{:conflicts => true}).body @@ -473,10 +473,10 @@ defmodule ReplicationTest do assert is_list(result["history"]) assert length(result["history"]) == 5 history = Enum.at(result["history"], 0) - assert history["missing_checked"] == 1 - assert history["missing_found"] == 1 - assert history["docs_read"] == 1 - assert history["docs_written"] == 1 + assert history["missing_checked"] == 30 + assert history["missing_found"] == 30 + assert history["docs_read"] == 30 + assert history["docs_written"] == 30 assert history["doc_write_failures"] == 0 copy = Couch.get!("/#{tgt_db_name}/2", query: %{:conflicts => true}).body @@ -502,10 +502,10 @@ defmodule ReplicationTest do assert is_list(result["history"]) assert length(result["history"]) == 6 history = Enum.at(result["history"], 0) - assert history["missing_checked"] == 1 - assert history["missing_found"] == 1 - assert history["docs_read"] == 1 - assert history["docs_written"] == 1 + assert history["missing_checked"] == 31 + assert history["missing_found"] == 31 + assert history["docs_read"] == 31 + assert history["docs_written"] == 31 assert history["doc_write_failures"] == 0 copy = Couch.get!("/#{tgt_db_name}/2", query: %{:conflicts => true}).body @@ -534,10 +534,10 @@ defmodule ReplicationTest do assert is_list(result["history"]) assert length(result["history"]) == 7 history = Enum.at(result["history"], 0) - assert history["missing_checked"] == 3 - assert history["missing_found"] == 1 - assert history["docs_read"] == 1 - assert history["docs_written"] == 1 + assert history["missing_checked"] == 34 + assert history["missing_found"] == 32 + assert history["docs_read"] == 32 + assert history["docs_written"] == 32 assert history["doc_write_failures"] == 0 docs = [ @@ -559,10 +559,10 @@ defmodule ReplicationTest do assert is_list(result["history"]) assert length(result["history"]) == 8 history = Enum.at(result["history"], 0) - assert history["missing_checked"] == 2 - assert history["missing_found"] == 0 - assert history["docs_read"] == 0 - assert history["docs_written"] == 0 + assert history["missing_checked"] == 36 + assert history["missing_found"] == 32 + assert history["docs_read"] == 32 + assert history["docs_written"] == 32 assert history["doc_write_failures"] == 0 # Test nothing to replicate @@ -822,10 +822,10 @@ defmodule ReplicationTest do assert length(result["history"]) == 2 history = Enum.at(result["history"], 0) - assert history["missing_checked"] == 3 - assert history["missing_found"] == 3 - assert history["docs_read"] == 3 - assert history["docs_written"] == 3 + assert history["missing_checked"] == 19 + assert history["missing_found"] == 19 + assert history["docs_read"] == 19 + assert history["docs_written"] == 19 assert history["doc_write_failures"] == 0 end @@ -1185,8 +1185,8 @@ defmodule ReplicationTest do result = replicate(repl_src, repl_tgt, body: repl_body) assert result["ok"] - assert result["docs_read"] == 1 - assert result["docs_written"] == 1 + assert result["docs_read"] == 2 + assert result["docs_written"] == 2 assert result["doc_write_failures"] == 0 retry_until(fn -> |