From c47c509d559fb929e0f00a8812db2da35299df65 Mon Sep 17 00:00:00 2001 From: Nick Vatamaniuc Date: Thu, 31 Oct 2019 11:58:39 -0400 Subject: Return detailed replication stats for running and pending jobs Previously `_scheduler/docs` returned detailed replication statistics for completed jobs only. To get the same level of detail from running or pending jobs, users had to use `_active_tasks`, which was not optimal and required jumping between monitoring endpoints. The `info` field was originally meant to hold these statistics but they were not implemented and it just returned `null` as a placeholder. With work for 3.0 finalizing, this might be a good time to add this improvement to avoid disturbing the API afterwards. Just updating `_scheduler/docs` was not quite enough, since replications started from the `_replicate` endpoint would not be visible there and users would still have to access `_active_tasks` to inspect them, so let's add the `info` field to `_scheduler/jobs` as well. After this update, all states and status details from `_active_tasks` and `_replicator` docs should be available under `_scheduler/jobs` and `_scheduler/docs` endpoints. --- .../src/couch_replicator_doc_processor.erl | 11 +---- .../src/couch_replicator_scheduler.erl | 21 ++++++--- .../src/couch_replicator_scheduler_job.erl | 16 +++---- .../src/couch_replicator_stats.erl | 50 ++++++++++----------- .../src/couch_replicator_utils.erl | 16 ++++++- ...ch_replicator_retain_stats_between_job_runs.erl | 51 ++++++++++++++++++++-- 6 files changed, 107 insertions(+), 58 deletions(-) diff --git a/src/couch_replicator/src/couch_replicator_doc_processor.erl b/src/couch_replicator/src/couch_replicator_doc_processor.erl index 23cdeeaac..29170ed53 100644 --- a/src/couch_replicator/src/couch_replicator_doc_processor.erl +++ b/src/couch_replicator/src/couch_replicator_doc_processor.erl @@ -533,15 +533,6 @@ doc_lookup(Db, DocId, HealthThreshold) -> end. 
--spec ejson_state_info(binary() | nil) -> binary() | null. -ejson_state_info(nil) -> - null; -ejson_state_info(Info) when is_binary(Info) -> - Info; -ejson_state_info(Info) -> - couch_replicator_utils:rep_error_to_binary(Info). - - -spec ejson_rep_id(rep_id() | nil) -> binary() | null. ejson_rep_id(nil) -> null; @@ -579,7 +570,7 @@ ejson_doc(#rdoc{state = RepState} = RDoc, _HealthThreshold) -> {database, DbName}, {id, ejson_rep_id(RepId)}, {state, RepState}, - {info, ejson_state_info(StateInfo)}, + {info, couch_replicator_utils:ejson_state_info(StateInfo)}, {error_count, ErrorCount}, {node, node()}, {last_updated, couch_replicator_utils:iso8601(StateTime)}, diff --git a/src/couch_replicator/src/couch_replicator_scheduler.erl b/src/couch_replicator/src/couch_replicator_scheduler.erl index c9da377c6..d534973be 100644 --- a/src/couch_replicator/src/couch_replicator_scheduler.erl +++ b/src/couch_replicator/src/couch_replicator_scheduler.erl @@ -148,19 +148,19 @@ job_summary(JobId, HealthThreshold) -> [{{crashed, Error}, _When} | _] -> {crashing, crash_reason_json(Error)}; [_ | _] -> - {pending, null} + {pending, Rep#rep.stats} end; {undefined, ErrorCount} when ErrorCount > 0 -> [{{crashed, Error}, _When} | _] = History, {crashing, crash_reason_json(Error)}; {Pid, ErrorCount} when is_pid(Pid) -> - {running, null} + {running, Rep#rep.stats} end, [ {source, iolist_to_binary(ejson_url(Rep#rep.source))}, {target, iolist_to_binary(ejson_url(Rep#rep.target))}, {state, State}, - {info, Info}, + {info, couch_replicator_utils:ejson_state_info(Info)}, {error_count, ErrorCount}, {last_updated, last_updated(History)}, {start_time, @@ -829,6 +829,7 @@ job_ejson(Job) -> {database, Rep#rep.db_name}, {user, (Rep#rep.user_ctx)#user_ctx.name}, {doc_id, Rep#rep.doc_id}, + {info, couch_replicator_utils:ejson_state_info(Rep#rep.stats)}, {history, History}, {node, node()}, {start_time, couch_replicator_utils:iso8601(Rep#rep.start_time)} @@ -1431,7 +1432,12 @@ t_job_summary_running() -> 
Summary = job_summary(job1, ?DEFAULT_HEALTH_THRESHOLD_SEC), ?assertEqual(running, proplists:get_value(state, Summary)), ?assertEqual(null, proplists:get_value(info, Summary)), - ?assertEqual(0, proplists:get_value(error_count, Summary)) + ?assertEqual(0, proplists:get_value(error_count, Summary)), + + Stats = [{source_seq, <<"1-abc">>}], + handle_cast({update_job_stats, job1, Stats}, mock_state(1)), + Summary1 = job_summary(job1, ?DEFAULT_HEALTH_THRESHOLD_SEC), + ?assertEqual({Stats}, proplists:get_value(info, Summary1)) end). @@ -1447,7 +1453,12 @@ t_job_summary_pending() -> Summary = job_summary(job1, ?DEFAULT_HEALTH_THRESHOLD_SEC), ?assertEqual(pending, proplists:get_value(state, Summary)), ?assertEqual(null, proplists:get_value(info, Summary)), - ?assertEqual(0, proplists:get_value(error_count, Summary)) + ?assertEqual(0, proplists:get_value(error_count, Summary)), + + Stats = [{doc_write_failures, 1}], + handle_cast({update_job_stats, job1, Stats}, mock_state(1)), + Summary1 = job_summary(job1, ?DEFAULT_HEALTH_THRESHOLD_SEC), + ?assertEqual({Stats}, proplists:get_value(info, Summary1)) end). diff --git a/src/couch_replicator/src/couch_replicator_scheduler_job.erl b/src/couch_replicator/src/couch_replicator_scheduler_job.erl index 565a2bd97..d69febb81 100644 --- a/src/couch_replicator/src/couch_replicator_scheduler_job.erl +++ b/src/couch_replicator/src/couch_replicator_scheduler_job.erl @@ -600,7 +600,7 @@ init_state(Rep) -> ?DEFAULT_CHECKPOINT_INTERVAL), type = Type, view = View, - stats = Stats + stats = couch_replicator_stats:new(Stats) }, State#rep_state{timer = start_timer(State)}. 
@@ -949,20 +949,16 @@ get_pending_count_int(#rep_state{source = Db}=St) -> update_task(State) -> #rep_state{ + rep_details = #rep{id = JobId}, current_through_seq = {_, ThroughSeq}, highest_seq_done = {_, HighestSeq} } = State, - update_scheduler_job_stats(State), - couch_task_status:update( - rep_stats(State) ++ [ + Status = rep_stats(State) ++ [ {source_seq, HighestSeq}, {through_seq, ThroughSeq} - ]). - - -update_scheduler_job_stats(#rep_state{rep_details = Rep, stats = Stats}) -> - JobId = Rep#rep.id, - couch_replicator_scheduler:update_job_stats(JobId, Stats). + ], + couch_replicator_scheduler:update_job_stats(JobId, Status), + couch_task_status:update(Status). rep_stats(State) -> diff --git a/src/couch_replicator/src/couch_replicator_stats.erl b/src/couch_replicator/src/couch_replicator_stats.erl index af8ba4e4f..cd62949e9 100644 --- a/src/couch_replicator/src/couch_replicator_stats.erl +++ b/src/couch_replicator/src/couch_replicator_stats.erl @@ -12,14 +12,6 @@ -module(couch_replicator_stats). --record(rep_stats, { - missing_checked = 0, - missing_found = 0, - docs_read = 0, - docs_written = 0, - doc_write_failures = 0 -}). - -export([ new/0, new/1, @@ -39,26 +31,27 @@ new() -> orddict:new(). -new(Initializers) when is_list(Initializers) -> - orddict:from_list(Initializers). +new(Initializers0) when is_list(Initializers0) -> + Initializers1 = lists:filtermap(fun fmap/1, Initializers0), + orddict:from_list(Initializers1). missing_checked(Stats) -> - get(missing_checked, upgrade(Stats)). + get(missing_checked, Stats). missing_found(Stats) -> - get(missing_found, upgrade(Stats)). + get(missing_found, Stats). docs_read(Stats) -> - get(docs_read, upgrade(Stats)). + get(docs_read, Stats). docs_written(Stats) -> - get(docs_written, upgrade(Stats)). + get(docs_written, Stats). doc_write_failures(Stats) -> - get(doc_write_failures, upgrade(Stats)). + get(doc_write_failures, Stats). 
get(Field, Stats) -> - case orddict:find(Field, upgrade(Stats)) of + case orddict:find(Field, Stats) of {ok, Value} -> Value; error -> @@ -66,18 +59,19 @@ get(Field, Stats) -> end. increment(Field, Stats) -> - orddict:update_counter(Field, 1, upgrade(Stats)). + orddict:update_counter(Field, 1, Stats). sum_stats(S1, S2) -> - orddict:merge(fun(_, V1, V2) -> V1+V2 end, upgrade(S1), upgrade(S2)). + orddict:merge(fun(_, V1, V2) -> V1+V2 end, S1, S2). + -upgrade(#rep_stats{} = Stats) -> - orddict:from_list([ - {missing_checked, Stats#rep_stats.missing_checked}, - {missing_found, Stats#rep_stats.missing_found}, - {docs_read, Stats#rep_stats.docs_read}, - {docs_written, Stats#rep_stats.docs_written}, - {doc_write_failures, Stats#rep_stats.doc_write_failures} - ]); -upgrade(Stats) -> - Stats. +% Handle initializing from a status object which uses same values but different +% field names. +fmap({revisions_checked, V}) -> {true, {missing_checked, V}}; +fmap({missing_revisions_found, V}) -> {true, {missing_found, V}}; +fmap({missing_checked, _}) -> true; +fmap({missing_found, _}) -> true; +fmap({docs_read, _}) -> true; +fmap({docs_written, _}) -> true; +fmap({doc_write_failures, _}) -> true; +fmap({_, _}) -> false. diff --git a/src/couch_replicator/src/couch_replicator_utils.erl b/src/couch_replicator/src/couch_replicator_utils.erl index ccf241324..856c1b581 100644 --- a/src/couch_replicator/src/couch_replicator_utils.erl +++ b/src/couch_replicator/src/couch_replicator_utils.erl @@ -24,7 +24,8 @@ iso8601/1, filter_state/3, remove_basic_auth_from_headers/1, - normalize_rep/1 + normalize_rep/1, + ejson_state_info/1 ]). @@ -176,6 +177,19 @@ normalize_rep(#rep{} = Rep)-> }. +-spec ejson_state_info(binary() | nil) -> binary() | null. 
+ejson_state_info(nil) -> + null; +ejson_state_info(Info) when is_binary(Info) -> + Info; +ejson_state_info([]) -> + null; % Status not set yet => null for compatibility reasons +ejson_state_info([{_, _} | _] = Info) -> + {Info}; +ejson_state_info(Info) -> + couch_replicator_utils:rep_error_to_binary(Info). + + -ifdef(TEST). -include_lib("eunit/include/eunit.hrl"). diff --git a/src/couch_replicator/test/eunit/couch_replicator_retain_stats_between_job_runs.erl b/src/couch_replicator/test/eunit/couch_replicator_retain_stats_between_job_runs.erl index 3b7377b78..9dd86b3ef 100644 --- a/src/couch_replicator/test/eunit/couch_replicator_retain_stats_between_job_runs.erl +++ b/src/couch_replicator/test/eunit/couch_replicator_retain_stats_between_job_runs.erl @@ -23,7 +23,7 @@ setup() -> - Ctx = test_util:start_couch([couch_replicator]), + Ctx = test_util:start_couch([couch_replicator, chttpd, mem3, fabric]), Source = setup_db(), Target = setup_db(), {Ctx, {Source, Target}}. @@ -49,10 +49,17 @@ t_stats_retained({_Ctx, {Source, Target}}) -> ?_test(begin populate_db(Source, 42), {ok, RepPid, RepId} = replicate(Source, Target), + wait_target_in_sync(Source, Target), check_active_tasks(42, 42), - reschedule_job(RepPid), + check_scheduler_jobs(42, 42), + + stop_job(RepPid), + check_scheduler_jobs(42, 42), + + start_job(), check_active_tasks(42, 42), + check_scheduler_jobs(42, 42), couch_replicator_scheduler:remove_job(RepId) end). @@ -69,7 +76,7 @@ teardown_db(DbName) -> ok. -reschedule_job(RepPid) -> +stop_job(RepPid) -> Ref = erlang:monitor(process, RepPid), gen_server:cast(couch_replicator_scheduler, {set_max_jobs, 0}), couch_replicator_scheduler:reschedule(), @@ -77,7 +84,10 @@ reschedule_job(RepPid) -> {'DOWN', Ref, _, _, _} -> ok after ?TIMEOUT -> erlang:error(timeout) - end, + end. + + +start_job() -> gen_server:cast(couch_replicator_scheduler, {set_max_jobs, 500}), couch_replicator_scheduler:reschedule(). 
@@ -89,6 +99,20 @@ check_active_tasks(DocsRead, DocsWritten) -> ?assertEqual(DocsWritten, couch_util:get_value(docs_written, RepTask)). +check_scheduler_jobs(DocsRead, DocsWritten) -> + Info = wait_scheduler_info(), + ?assert(maps:is_key(<<"changes_pending">>, Info)), + ?assert(maps:is_key(<<"doc_write_failures">>, Info)), + ?assert(maps:is_key(<<"docs_read">>, Info)), + ?assert(maps:is_key(<<"docs_written">>, Info)), + ?assert(maps:is_key(<<"missing_revisions_found">>, Info)), + ?assert(maps:is_key(<<"checkpointed_source_seq">>, Info)), + ?assert(maps:is_key(<<"source_seq">>, Info)), + ?assert(maps:is_key(<<"revisions_checked">>, Info)), + ?assertMatch(#{<<"docs_read">> := DocsRead}, Info), + ?assertMatch(#{<<"docs_written">> := DocsWritten}, Info). + + replication_tasks() -> lists:filter(fun(P) -> couch_util:get_value(type, P) =:= replication @@ -104,6 +128,16 @@ wait_for_task_status() -> end). +wait_scheduler_info() -> + test_util:wait(fun() -> + case scheduler_jobs() of + [] -> wait; + [#{<<"info">> := null}] -> wait; + [#{<<"info">> := Info}] -> Info + end + end). + + populate_db(DbName, DocCount) -> {ok, Db} = couch_db:open_int(DbName, []), Docs = lists:foldl( @@ -158,3 +192,12 @@ replicate(Source, Target) -> couch_replicator_scheduler:reschedule(), Pid = couch_replicator_test_helper:get_pid(Rep#rep.id), {ok, Pid, Rep#rep.id}. + + +scheduler_jobs() -> + Addr = config:get("chttpd", "bind_address", "127.0.0.1"), + Port = mochiweb_socket_server:get(chttpd, port), + Url = lists:flatten(io_lib:format("http://~s:~b/_scheduler/jobs", [Addr, Port])), + {ok, 200, _, Body} = test_request:get(Url, []), + Json = jiffy:decode(Body, [return_maps]), + maps:get(<<"jobs">>, Json). -- cgit v1.2.1