diff options
author | Nick Vatamaniuc <vatamane@apache.org> | 2020-01-13 18:21:58 -0500 |
---|---|---|
committer | Nick Vatamaniuc <vatamane@apache.org> | 2020-01-14 00:59:00 -0500 |
commit | 679a12aac6ef11b37cf829c7d522c99c346a9285 (patch) | |
tree | 05d2a446989486c87df319539494e3bf04222e4c | |
parent | 66afc45015827cd55d8a5663f19da09e18c2780e (diff) | |
download | couchdb-persist-stats-across-job-removals.tar.gz |
Preserve replication job stats when jobs are re-created [persist-stats-across-job-removals]
Previously we made sure replication job statistics were preserved when the jobs
were started and stopped by the scheduler. However, if a VM node restarted or
a user re-created the job, replication stats would be reset to 0.
Some statistics like `docs_read` and `docs_written` were perhaps not as
critical. However `doc_write_failures` was. That is the only indicator that
some replication docs have been skipped and not replicated to the target. Not
preserving that statistic meant users could perceive a data loss.
These statistics were already logged in the checkpoint history and we just had to
initialize a stats object from them when a replication job starts. In that
initialization code we pick the highest values from either the running
scheduler or the checkpointed log. The reason is that the running stats could
be higher if, say, a job was stopped suddenly and failed to checkpoint but the
scheduler retained the data.
Fixes: https://github.com/apache/couchdb/issues/2414
4 files changed, 185 insertions, 82 deletions
diff --git a/src/couch_replicator/src/couch_replicator_scheduler_job.erl b/src/couch_replicator/src/couch_replicator_scheduler_job.erl index 12d3e5530..0b33419e1 100644 --- a/src/couch_replicator/src/couch_replicator_scheduler_job.erl +++ b/src/couch_replicator/src/couch_replicator_scheduler_job.erl @@ -565,7 +565,7 @@ init_state(Rep) -> options = Options, type = Type, view = View, start_time = StartTime, - stats = Stats + stats = ArgStats0 } = Rep, % Adjust minimum number of http source connections to 2 to avoid deadlock Src = adjust_maxconn(Src0, BaseId), @@ -580,6 +580,14 @@ init_state(Rep) -> [SourceLog, TargetLog] = find_and_migrate_logs([Source, Target], Rep), {StartSeq0, History} = compare_replication_logs(SourceLog, TargetLog), + + ArgStats1 = couch_replicator_stats:new(ArgStats0), + HistoryStats = case History of + [{[_ | _] = HProps} | _] -> couch_replicator_stats:new(HProps); + _ -> couch_replicator_stats:new() + end, + Stats = couch_replicator_stats:max_stats(ArgStats1, HistoryStats), + StartSeq1 = get_value(since_seq, Options, StartSeq0), StartSeq = {0, StartSeq1}, @@ -609,7 +617,7 @@ init_state(Rep) -> ?DEFAULT_CHECKPOINT_INTERVAL), type = Type, view = View, - stats = couch_replicator_stats:new(Stats) + stats = Stats }, State#rep_state{timer = start_timer(State)}. diff --git a/src/couch_replicator/src/couch_replicator_stats.erl b/src/couch_replicator/src/couch_replicator_stats.erl index cd62949e9..37848b3ee 100644 --- a/src/couch_replicator/src/couch_replicator_stats.erl +++ b/src/couch_replicator/src/couch_replicator_stats.erl @@ -17,7 +17,8 @@ new/1, get/2, increment/2, - sum_stats/2 + sum_stats/2, + max_stats/2 ]). -export([ @@ -64,14 +65,29 @@ increment(Field, Stats) -> sum_stats(S1, S2) -> orddict:merge(fun(_, V1, V2) -> V1+V2 end, S1, S2). +max_stats(S1, S2) -> + orddict:merge(fun(_, V1, V2) -> max(V1, V2) end, S1, S2). -% Handle initializing from a status object which uses same values but different -% field names. 
-fmap({revisions_checked, V}) -> {true, {missing_checked, V}}; -fmap({missing_revisions_found, V}) -> {true, {missing_found, V}}; -fmap({missing_checked, _}) -> true; -fmap({missing_found, _}) -> true; -fmap({docs_read, _}) -> true; -fmap({docs_written, _}) -> true; -fmap({doc_write_failures, _}) -> true; -fmap({_, _}) -> false. + +% Handle initializing from a status object, which uses same values but +% different field names, as well as from ejson props from the checkpoint +% history +% +fmap({missing_found, _}) -> true; +fmap({missing_revisions_found, V}) -> {true, {missing_found, V}}; +fmap({<<"missing_found">>, V}) -> {true, {missing_found, V}}; + +fmap({missing_checked, _}) -> true; +fmap({revisions_checked, V}) -> {true, {missing_checked, V}}; +fmap({<<"missing_checked">>, V}) -> {true, {missing_checked, V}}; + +fmap({docs_read, _}) -> true; +fmap({<<"docs_read">>, V}) -> {true, {docs_read, V}}; + +fmap({docs_written, _}) -> true; +fmap({<<"docs_written">>, V}) -> {true, {docs_written, V}}; + +fmap({doc_write_failures, _}) -> true; +fmap({<<"doc_write_failures">>, V}) -> {true, {doc_write_failures, V}}; + +fmap({_, _}) -> false. diff --git a/src/couch_replicator/test/eunit/couch_replicator_retain_stats_between_job_runs.erl b/src/couch_replicator/test/eunit/couch_replicator_retain_stats_between_job_runs.erl index 9dd86b3ef..037f37191 100644 --- a/src/couch_replicator/test/eunit/couch_replicator_retain_stats_between_job_runs.erl +++ b/src/couch_replicator/test/eunit/couch_replicator_retain_stats_between_job_runs.erl @@ -18,48 +18,93 @@ -define(DELAY, 500). -define(TIMEOUT, 60000). --define(i2l(I), integer_to_list(I)). --define(io2b(Io), iolist_to_binary(Io)). + + +setup_all() -> + test_util:start_couch([couch_replicator, chttpd, mem3, fabric]). + + +teardown_all(Ctx) -> + ok = test_util:stop_couch(Ctx). setup() -> - Ctx = test_util:start_couch([couch_replicator, chttpd, mem3, fabric]), Source = setup_db(), Target = setup_db(), - {Ctx, {Source, Target}}. 
+ {Source, Target}. -teardown({Ctx, {Source, Target}}) -> +teardown({Source, Target}) -> teardown_db(Source), teardown_db(Target), - ok = application:stop(couch_replicator), - ok = test_util:stop_couch(Ctx). + ok. stats_retained_test_() -> { setup, - fun setup/0, - fun teardown/1, - fun t_stats_retained/1 + fun setup_all/0, + fun teardown_all/1, + { + foreach, + fun setup/0, + fun teardown/1, + [ + fun t_stats_retained_by_scheduler/1, + fun t_stats_retained_on_job_removal/1 + ] + } }. -t_stats_retained({_Ctx, {Source, Target}}) -> +t_stats_retained_by_scheduler({Source, Target}) -> ?_test(begin - populate_db(Source, 42), + {ok, _} = add_vdu(Target), + populate_db_reject_even_docs(Source, 1, 10), {ok, RepPid, RepId} = replicate(Source, Target), + wait_target_in_sync(6, Target), - wait_target_in_sync(Source, Target), - check_active_tasks(42, 42), - check_scheduler_jobs(42, 42), + check_active_tasks(10, 5, 5), + check_scheduler_jobs(10, 5, 5), stop_job(RepPid), - check_scheduler_jobs(42, 42), + check_scheduler_jobs(10, 5, 5), start_job(), - check_active_tasks(42, 42), - check_scheduler_jobs(42, 42), + check_active_tasks(10, 5, 5), + check_scheduler_jobs(10, 5, 5), + couch_replicator_scheduler:remove_job(RepId) + end). 
+ + +t_stats_retained_on_job_removal({Source, Target}) -> + ?_test(begin + {ok, _} = add_vdu(Target), + populate_db_reject_even_docs(Source, 1, 10), + {ok, _, RepId} = replicate(Source, Target), + wait_target_in_sync(6, Target), % 5 + 1 vdu + + check_active_tasks(10, 5, 5), + check_scheduler_jobs(10, 5, 5), + + couch_replicator_scheduler:remove_job(RepId), + + populate_db_reject_even_docs(Source, 11, 20), + {ok, _, RepId} = replicate(Source, Target), + wait_target_in_sync(11, Target), % 6 + 5 + + check_scheduler_jobs(20, 10, 10), + check_active_tasks(20, 10, 10), + + couch_replicator_scheduler:remove_job(RepId), + + populate_db_reject_even_docs(Source, 21, 30), + {ok, _, RepId} = replicate(Source, Target), + wait_target_in_sync(16, Target), % 11 + 5 + + check_scheduler_jobs(30, 15, 15), + check_active_tasks(30, 15, 15), + couch_replicator_scheduler:remove_job(RepId) end). @@ -92,14 +137,16 @@ start_job() -> couch_replicator_scheduler:reschedule(). -check_active_tasks(DocsRead, DocsWritten) -> +check_active_tasks(DocsRead, DocsWritten, DocsFailed) -> RepTask = wait_for_task_status(), ?assertNotEqual(timeout, RepTask), ?assertEqual(DocsRead, couch_util:get_value(docs_read, RepTask)), - ?assertEqual(DocsWritten, couch_util:get_value(docs_written, RepTask)). + ?assertEqual(DocsWritten, couch_util:get_value(docs_written, RepTask)), + ?assertEqual(DocsFailed, couch_util:get_value(doc_write_failures, + RepTask)). -check_scheduler_jobs(DocsRead, DocsWritten) -> +check_scheduler_jobs(DocsRead, DocsWritten, DocFailed) -> Info = wait_scheduler_info(), ?assert(maps:is_key(<<"changes_pending">>, Info)), ?assert(maps:is_key(<<"doc_write_failures">>, Info)), @@ -110,7 +157,8 @@ check_scheduler_jobs(DocsRead, DocsWritten) -> ?assert(maps:is_key(<<"source_seq">>, Info)), ?assert(maps:is_key(<<"revisions_checked">>, Info)), ?assertMatch(#{<<"docs_read">> := DocsRead}, Info), - ?assertMatch(#{<<"docs_written">> := DocsWritten}, Info). 
+ ?assertMatch(#{<<"docs_written">> := DocsWritten}, Info), + ?assertMatch(#{<<"doc_write_failures">> := DocFailed}, Info). replication_tasks() -> @@ -138,25 +186,31 @@ wait_scheduler_info() -> end). -populate_db(DbName, DocCount) -> +populate_db_reject_even_docs(DbName, Start, End) -> + BodyFun = fun(Id) -> + case Id rem 2 == 0 of + true -> {[{<<"nope">>, true}]}; + false -> {[]} + end + end, + populate_db(DbName, Start, End, BodyFun). + + +populate_db(DbName, Start, End, BodyFun) when is_function(BodyFun, 1) -> {ok, Db} = couch_db:open_int(DbName, []), Docs = lists:foldl( fun(DocIdCounter, Acc) -> - Id = ?io2b(["doc", ?i2l(DocIdCounter)]), - Doc = #doc{id = Id, body = {[]}}, + Id = integer_to_binary(DocIdCounter), + Doc = #doc{id = Id, body = BodyFun(DocIdCounter)}, [Doc | Acc] end, - [], lists:seq(1, DocCount)), + [], lists:seq(Start, End)), {ok, _} = couch_db:update_docs(Db, Docs, []), ok = couch_db:close(Db). -wait_target_in_sync(Source, Target) -> - {ok, SourceDb} = couch_db:open_int(Source, []), - {ok, SourceInfo} = couch_db:get_db_info(SourceDb), - ok = couch_db:close(SourceDb), - SourceDocCount = couch_util:get_value(doc_count, SourceInfo), - wait_target_in_sync_loop(SourceDocCount, Target, 300). +wait_target_in_sync(DocCount, Target) when is_integer(DocCount) -> + wait_target_in_sync_loop(DocCount, Target, 300). wait_target_in_sync_loop(_DocCount, _TargetName, 0) -> @@ -170,7 +224,7 @@ wait_target_in_sync_loop(DocCount, TargetName, RetriesLeft) -> {ok, TargetInfo} = couch_db:get_db_info(Target), ok = couch_db:close(Target), TargetDocCount = couch_util:get_value(doc_count, TargetInfo), - case TargetDocCount == DocCount of + case TargetDocCount == DocCount of true -> true; false -> @@ -201,3 +255,28 @@ scheduler_jobs() -> {ok, 200, _, Body} = test_request:get(Url, []), Json = jiffy:decode(Body, [return_maps]), maps:get(<<"jobs">>, Json). 
+ + +vdu() -> + <<"function(newDoc, oldDoc, userCtx) { + if(newDoc.nope === true) { + throw({forbidden: 'nope'}); + } else { + return; + } + }">>. + + +add_vdu(DbName) -> + DocProps = [ + {<<"_id">>, <<"_design/vdu">>}, + {<<"language">>, <<"javascript">>}, + {<<"validate_doc_update">>, vdu()} + ], + Doc = couch_doc:from_json_obj({DocProps}, []), + {ok, Db} = couch_db:open_int(DbName, [?ADMIN_CTX]), + try + {ok, _Rev} = couch_db:update_doc(Db, Doc, []) + after + couch_db:close(Db) + end. diff --git a/test/elixir/test/replication_test.exs b/test/elixir/test/replication_test.exs index 73ceca6a4..bdd683e97 100644 --- a/test/elixir/test/replication_test.exs +++ b/test/elixir/test/replication_test.exs @@ -75,8 +75,8 @@ defmodule ReplicationTest do assert is_list(result["history"]) assert length(result["history"]) == 2 history = Enum.at(result["history"], 0) - assert history["docs_written"] == 1 - assert history["docs_read"] == 1 + assert history["docs_written"] == 2 + assert history["docs_read"] == 2 assert history["doc_write_failures"] == 0 query = %{ @@ -352,10 +352,10 @@ defmodule ReplicationTest do assert history["session_id"] == result["session_id"] assert is_binary(history["start_time"]) assert is_binary(history["end_time"]) - assert history["missing_checked"] == 6 - assert history["missing_found"] == 6 - assert history["docs_read"] == 6 - assert history["docs_written"] == 6 + assert history["missing_checked"] == 27 + assert history["missing_found"] == 27 + assert history["docs_read"] == 27 + assert history["docs_written"] == 27 assert history["doc_write_failures"] == 0 copy = Couch.get!("/#{tgt_db_name}/#{new_doc["_id"]}").body @@ -414,10 +414,10 @@ defmodule ReplicationTest do assert is_list(result["history"]) assert length(result["history"]) == 3 history = Enum.at(result["history"], 0) - assert history["missing_checked"] == 1 - assert history["missing_found"] == 1 - assert history["docs_read"] == 1 - assert history["docs_written"] == 1 + assert 
history["missing_checked"] == 28 + assert history["missing_found"] == 28 + assert history["docs_read"] == 28 + assert history["docs_written"] == 28 assert history["doc_write_failures"] == 0 resp = Couch.get("/#{tgt_db_name}/#{del_doc["_id"]}") @@ -446,10 +446,10 @@ defmodule ReplicationTest do assert is_list(result["history"]) assert length(result["history"]) == 4 history = Enum.at(result["history"], 0) - assert history["missing_checked"] == 1 - assert history["missing_found"] == 1 - assert history["docs_read"] == 1 - assert history["docs_written"] == 1 + assert history["missing_checked"] == 29 + assert history["missing_found"] == 29 + assert history["docs_read"] == 29 + assert history["docs_written"] == 29 assert history["doc_write_failures"] == 0 copy = Couch.get!("/#{tgt_db_name}/2", query: %{:conflicts => true}).body @@ -473,10 +473,10 @@ defmodule ReplicationTest do assert is_list(result["history"]) assert length(result["history"]) == 5 history = Enum.at(result["history"], 0) - assert history["missing_checked"] == 1 - assert history["missing_found"] == 1 - assert history["docs_read"] == 1 - assert history["docs_written"] == 1 + assert history["missing_checked"] == 30 + assert history["missing_found"] == 30 + assert history["docs_read"] == 30 + assert history["docs_written"] == 30 assert history["doc_write_failures"] == 0 copy = Couch.get!("/#{tgt_db_name}/2", query: %{:conflicts => true}).body @@ -502,10 +502,10 @@ defmodule ReplicationTest do assert is_list(result["history"]) assert length(result["history"]) == 6 history = Enum.at(result["history"], 0) - assert history["missing_checked"] == 1 - assert history["missing_found"] == 1 - assert history["docs_read"] == 1 - assert history["docs_written"] == 1 + assert history["missing_checked"] == 31 + assert history["missing_found"] == 31 + assert history["docs_read"] == 31 + assert history["docs_written"] == 31 assert history["doc_write_failures"] == 0 copy = Couch.get!("/#{tgt_db_name}/2", query: %{:conflicts => 
true}).body @@ -534,10 +534,10 @@ defmodule ReplicationTest do assert is_list(result["history"]) assert length(result["history"]) == 7 history = Enum.at(result["history"], 0) - assert history["missing_checked"] == 3 - assert history["missing_found"] == 1 - assert history["docs_read"] == 1 - assert history["docs_written"] == 1 + assert history["missing_checked"] == 34 + assert history["missing_found"] == 32 + assert history["docs_read"] == 32 + assert history["docs_written"] == 32 assert history["doc_write_failures"] == 0 docs = [ @@ -559,10 +559,10 @@ defmodule ReplicationTest do assert is_list(result["history"]) assert length(result["history"]) == 8 history = Enum.at(result["history"], 0) - assert history["missing_checked"] == 2 - assert history["missing_found"] == 0 - assert history["docs_read"] == 0 - assert history["docs_written"] == 0 + assert history["missing_checked"] == 36 + assert history["missing_found"] == 32 + assert history["docs_read"] == 32 + assert history["docs_written"] == 32 assert history["doc_write_failures"] == 0 # Test nothing to replicate @@ -822,10 +822,10 @@ defmodule ReplicationTest do assert length(result["history"]) == 2 history = Enum.at(result["history"], 0) - assert history["missing_checked"] == 3 - assert history["missing_found"] == 3 - assert history["docs_read"] == 3 - assert history["docs_written"] == 3 + assert history["missing_checked"] == 19 + assert history["missing_found"] == 19 + assert history["docs_read"] == 19 + assert history["docs_written"] == 19 assert history["doc_write_failures"] == 0 end @@ -1185,8 +1185,8 @@ defmodule ReplicationTest do result = replicate(repl_src, repl_tgt, body: repl_body) assert result["ok"] - assert result["docs_read"] == 1 - assert result["docs_written"] == 1 + assert result["docs_read"] == 2 + assert result["docs_written"] == 2 assert result["doc_write_failures"] == 0 retry_until(fn -> |