summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorNick Vatamaniuc <vatamane@apache.org>2019-10-31 11:58:39 -0400
committerNick Vatamaniuc <vatamane@apache.org>2019-11-06 13:29:34 -0500
commitc47c509d559fb929e0f00a8812db2da35299df65 (patch)
treea92fc4fd642f2623812469f9f6faea8f2984a001
parentd60551d89100dccb5c649dd6cad3b7c7ec1371e0 (diff)
downloadcouchdb-active-task-stats-in-scheduler.tar.gz
Return detailed replication stats for running and pending jobsactive-task-stats-in-scheduler
Previously `_scheduled/docs` returned detailed replication statistics for completed jobs only. To get the same level of details from a running or pending jobs users had to use `_active_tasks`, which is not optimal and required jumping between monitoring endpoints. `info` field was originally meant to hold these statistics but they were not implemented and it just returned `null` as a placeholder. With work for 3.0 finalizing, this might be a good time to add this improvement to avoid disturbing the API afterwards. Just updating the `_scheduler/docs` was not quite enough since, replications started from the `_replicate` endpoint would not be visible there and users would still have to access `_active_tasks` to get inspect them, so let's add the `info` field to the `_scheduler/jobs` as well. After this update, all states and status details from `_active_tasks` and `_replicator` docs should be available under `_scheduler/jobs` and `_scheduler/docs` endpoints.
-rw-r--r--src/couch_replicator/src/couch_replicator_doc_processor.erl11
-rw-r--r--src/couch_replicator/src/couch_replicator_scheduler.erl21
-rw-r--r--src/couch_replicator/src/couch_replicator_scheduler_job.erl16
-rw-r--r--src/couch_replicator/src/couch_replicator_stats.erl50
-rw-r--r--src/couch_replicator/src/couch_replicator_utils.erl16
-rw-r--r--src/couch_replicator/test/eunit/couch_replicator_retain_stats_between_job_runs.erl51
6 files changed, 107 insertions, 58 deletions
diff --git a/src/couch_replicator/src/couch_replicator_doc_processor.erl b/src/couch_replicator/src/couch_replicator_doc_processor.erl
index 23cdeeaac..29170ed53 100644
--- a/src/couch_replicator/src/couch_replicator_doc_processor.erl
+++ b/src/couch_replicator/src/couch_replicator_doc_processor.erl
@@ -533,15 +533,6 @@ doc_lookup(Db, DocId, HealthThreshold) ->
end.
--spec ejson_state_info(binary() | nil) -> binary() | null.
-ejson_state_info(nil) ->
- null;
-ejson_state_info(Info) when is_binary(Info) ->
- Info;
-ejson_state_info(Info) ->
- couch_replicator_utils:rep_error_to_binary(Info).
-
-
-spec ejson_rep_id(rep_id() | nil) -> binary() | null.
ejson_rep_id(nil) ->
null;
@@ -579,7 +570,7 @@ ejson_doc(#rdoc{state = RepState} = RDoc, _HealthThreshold) ->
{database, DbName},
{id, ejson_rep_id(RepId)},
{state, RepState},
- {info, ejson_state_info(StateInfo)},
+ {info, couch_replicator_utils:ejson_state_info(StateInfo)},
{error_count, ErrorCount},
{node, node()},
{last_updated, couch_replicator_utils:iso8601(StateTime)},
diff --git a/src/couch_replicator/src/couch_replicator_scheduler.erl b/src/couch_replicator/src/couch_replicator_scheduler.erl
index c9da377c6..d534973be 100644
--- a/src/couch_replicator/src/couch_replicator_scheduler.erl
+++ b/src/couch_replicator/src/couch_replicator_scheduler.erl
@@ -148,19 +148,19 @@ job_summary(JobId, HealthThreshold) ->
[{{crashed, Error}, _When} | _] ->
{crashing, crash_reason_json(Error)};
[_ | _] ->
- {pending, null}
+ {pending, Rep#rep.stats}
end;
{undefined, ErrorCount} when ErrorCount > 0 ->
[{{crashed, Error}, _When} | _] = History,
{crashing, crash_reason_json(Error)};
{Pid, ErrorCount} when is_pid(Pid) ->
- {running, null}
+ {running, Rep#rep.stats}
end,
[
{source, iolist_to_binary(ejson_url(Rep#rep.source))},
{target, iolist_to_binary(ejson_url(Rep#rep.target))},
{state, State},
- {info, Info},
+ {info, couch_replicator_utils:ejson_state_info(Info)},
{error_count, ErrorCount},
{last_updated, last_updated(History)},
{start_time,
@@ -829,6 +829,7 @@ job_ejson(Job) ->
{database, Rep#rep.db_name},
{user, (Rep#rep.user_ctx)#user_ctx.name},
{doc_id, Rep#rep.doc_id},
+ {info, couch_replicator_utils:ejson_state_info(Rep#rep.stats)},
{history, History},
{node, node()},
{start_time, couch_replicator_utils:iso8601(Rep#rep.start_time)}
@@ -1431,7 +1432,12 @@ t_job_summary_running() ->
Summary = job_summary(job1, ?DEFAULT_HEALTH_THRESHOLD_SEC),
?assertEqual(running, proplists:get_value(state, Summary)),
?assertEqual(null, proplists:get_value(info, Summary)),
- ?assertEqual(0, proplists:get_value(error_count, Summary))
+ ?assertEqual(0, proplists:get_value(error_count, Summary)),
+
+ Stats = [{source_seq, <<"1-abc">>}],
+ handle_cast({update_job_stats, job1, Stats}, mock_state(1)),
+ Summary1 = job_summary(job1, ?DEFAULT_HEALTH_THRESHOLD_SEC),
+ ?assertEqual({Stats}, proplists:get_value(info, Summary1))
end).
@@ -1447,7 +1453,12 @@ t_job_summary_pending() ->
Summary = job_summary(job1, ?DEFAULT_HEALTH_THRESHOLD_SEC),
?assertEqual(pending, proplists:get_value(state, Summary)),
?assertEqual(null, proplists:get_value(info, Summary)),
- ?assertEqual(0, proplists:get_value(error_count, Summary))
+ ?assertEqual(0, proplists:get_value(error_count, Summary)),
+
+ Stats = [{doc_write_failures, 1}],
+ handle_cast({update_job_stats, job1, Stats}, mock_state(1)),
+ Summary1 = job_summary(job1, ?DEFAULT_HEALTH_THRESHOLD_SEC),
+ ?assertEqual({Stats}, proplists:get_value(info, Summary1))
end).
diff --git a/src/couch_replicator/src/couch_replicator_scheduler_job.erl b/src/couch_replicator/src/couch_replicator_scheduler_job.erl
index 565a2bd97..d69febb81 100644
--- a/src/couch_replicator/src/couch_replicator_scheduler_job.erl
+++ b/src/couch_replicator/src/couch_replicator_scheduler_job.erl
@@ -600,7 +600,7 @@ init_state(Rep) ->
?DEFAULT_CHECKPOINT_INTERVAL),
type = Type,
view = View,
- stats = Stats
+ stats = couch_replicator_stats:new(Stats)
},
State#rep_state{timer = start_timer(State)}.
@@ -949,20 +949,16 @@ get_pending_count_int(#rep_state{source = Db}=St) ->
update_task(State) ->
#rep_state{
+ rep_details = #rep{id = JobId},
current_through_seq = {_, ThroughSeq},
highest_seq_done = {_, HighestSeq}
} = State,
- update_scheduler_job_stats(State),
- couch_task_status:update(
- rep_stats(State) ++ [
+ Status = rep_stats(State) ++ [
{source_seq, HighestSeq},
{through_seq, ThroughSeq}
- ]).
-
-
-update_scheduler_job_stats(#rep_state{rep_details = Rep, stats = Stats}) ->
- JobId = Rep#rep.id,
- couch_replicator_scheduler:update_job_stats(JobId, Stats).
+ ],
+ couch_replicator_scheduler:update_job_stats(JobId, Status),
+ couch_task_status:update(Status).
rep_stats(State) ->
diff --git a/src/couch_replicator/src/couch_replicator_stats.erl b/src/couch_replicator/src/couch_replicator_stats.erl
index af8ba4e4f..cd62949e9 100644
--- a/src/couch_replicator/src/couch_replicator_stats.erl
+++ b/src/couch_replicator/src/couch_replicator_stats.erl
@@ -12,14 +12,6 @@
-module(couch_replicator_stats).
--record(rep_stats, {
- missing_checked = 0,
- missing_found = 0,
- docs_read = 0,
- docs_written = 0,
- doc_write_failures = 0
-}).
-
-export([
new/0,
new/1,
@@ -39,26 +31,27 @@
new() ->
orddict:new().
-new(Initializers) when is_list(Initializers) ->
- orddict:from_list(Initializers).
+new(Initializers0) when is_list(Initializers0) ->
+ Initializers1 = lists:filtermap(fun fmap/1, Initializers0),
+ orddict:from_list(Initializers1).
missing_checked(Stats) ->
- get(missing_checked, upgrade(Stats)).
+ get(missing_checked, Stats).
missing_found(Stats) ->
- get(missing_found, upgrade(Stats)).
+ get(missing_found, Stats).
docs_read(Stats) ->
- get(docs_read, upgrade(Stats)).
+ get(docs_read, Stats).
docs_written(Stats) ->
- get(docs_written, upgrade(Stats)).
+ get(docs_written, Stats).
doc_write_failures(Stats) ->
- get(doc_write_failures, upgrade(Stats)).
+ get(doc_write_failures, Stats).
get(Field, Stats) ->
- case orddict:find(Field, upgrade(Stats)) of
+ case orddict:find(Field, Stats) of
{ok, Value} ->
Value;
error ->
@@ -66,18 +59,19 @@ get(Field, Stats) ->
end.
increment(Field, Stats) ->
- orddict:update_counter(Field, 1, upgrade(Stats)).
+ orddict:update_counter(Field, 1, Stats).
sum_stats(S1, S2) ->
- orddict:merge(fun(_, V1, V2) -> V1+V2 end, upgrade(S1), upgrade(S2)).
+ orddict:merge(fun(_, V1, V2) -> V1+V2 end, S1, S2).
+
-upgrade(#rep_stats{} = Stats) ->
- orddict:from_list([
- {missing_checked, Stats#rep_stats.missing_checked},
- {missing_found, Stats#rep_stats.missing_found},
- {docs_read, Stats#rep_stats.docs_read},
- {docs_written, Stats#rep_stats.docs_written},
- {doc_write_failures, Stats#rep_stats.doc_write_failures}
- ]);
-upgrade(Stats) ->
- Stats.
+% Handle initializing from a status object which uses same values but different
+% field names.
+fmap({revisions_checked, V}) -> {true, {missing_checked, V}};
+fmap({missing_revisions_found, V}) -> {true, {missing_found, V}};
+fmap({missing_checked, _}) -> true;
+fmap({missing_found, _}) -> true;
+fmap({docs_read, _}) -> true;
+fmap({docs_written, _}) -> true;
+fmap({doc_write_failures, _}) -> true;
+fmap({_, _}) -> false.
diff --git a/src/couch_replicator/src/couch_replicator_utils.erl b/src/couch_replicator/src/couch_replicator_utils.erl
index ccf241324..856c1b581 100644
--- a/src/couch_replicator/src/couch_replicator_utils.erl
+++ b/src/couch_replicator/src/couch_replicator_utils.erl
@@ -24,7 +24,8 @@
iso8601/1,
filter_state/3,
remove_basic_auth_from_headers/1,
- normalize_rep/1
+ normalize_rep/1,
+ ejson_state_info/1
]).
@@ -176,6 +177,19 @@ normalize_rep(#rep{} = Rep)->
}.
+-spec ejson_state_info(binary() | nil) -> binary() | null.
+ejson_state_info(nil) ->
+ null;
+ejson_state_info(Info) when is_binary(Info) ->
+ Info;
+ejson_state_info([]) ->
+ null; % Status not set yet => null for compatibility reasons
+ejson_state_info([{_, _} | _] = Info) ->
+ {Info};
+ejson_state_info(Info) ->
+ couch_replicator_utils:rep_error_to_binary(Info).
+
+
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
diff --git a/src/couch_replicator/test/eunit/couch_replicator_retain_stats_between_job_runs.erl b/src/couch_replicator/test/eunit/couch_replicator_retain_stats_between_job_runs.erl
index 3b7377b78..9dd86b3ef 100644
--- a/src/couch_replicator/test/eunit/couch_replicator_retain_stats_between_job_runs.erl
+++ b/src/couch_replicator/test/eunit/couch_replicator_retain_stats_between_job_runs.erl
@@ -23,7 +23,7 @@
setup() ->
- Ctx = test_util:start_couch([couch_replicator]),
+ Ctx = test_util:start_couch([couch_replicator, chttpd, mem3, fabric]),
Source = setup_db(),
Target = setup_db(),
{Ctx, {Source, Target}}.
@@ -49,10 +49,17 @@ t_stats_retained({_Ctx, {Source, Target}}) ->
?_test(begin
populate_db(Source, 42),
{ok, RepPid, RepId} = replicate(Source, Target),
+
wait_target_in_sync(Source, Target),
check_active_tasks(42, 42),
- reschedule_job(RepPid),
+ check_scheduler_jobs(42, 42),
+
+ stop_job(RepPid),
+ check_scheduler_jobs(42, 42),
+
+ start_job(),
check_active_tasks(42, 42),
+ check_scheduler_jobs(42, 42),
couch_replicator_scheduler:remove_job(RepId)
end).
@@ -69,7 +76,7 @@ teardown_db(DbName) ->
ok.
-reschedule_job(RepPid) ->
+stop_job(RepPid) ->
Ref = erlang:monitor(process, RepPid),
gen_server:cast(couch_replicator_scheduler, {set_max_jobs, 0}),
couch_replicator_scheduler:reschedule(),
@@ -77,7 +84,10 @@ reschedule_job(RepPid) ->
{'DOWN', Ref, _, _, _} -> ok
after ?TIMEOUT ->
erlang:error(timeout)
- end,
+ end.
+
+
+start_job() ->
gen_server:cast(couch_replicator_scheduler, {set_max_jobs, 500}),
couch_replicator_scheduler:reschedule().
@@ -89,6 +99,20 @@ check_active_tasks(DocsRead, DocsWritten) ->
?assertEqual(DocsWritten, couch_util:get_value(docs_written, RepTask)).
+check_scheduler_jobs(DocsRead, DocsWritten) ->
+ Info = wait_scheduler_info(),
+ ?assert(maps:is_key(<<"changes_pending">>, Info)),
+ ?assert(maps:is_key(<<"doc_write_failures">>, Info)),
+ ?assert(maps:is_key(<<"docs_read">>, Info)),
+ ?assert(maps:is_key(<<"docs_written">>, Info)),
+ ?assert(maps:is_key(<<"missing_revisions_found">>, Info)),
+ ?assert(maps:is_key(<<"checkpointed_source_seq">>, Info)),
+ ?assert(maps:is_key(<<"source_seq">>, Info)),
+ ?assert(maps:is_key(<<"revisions_checked">>, Info)),
+ ?assertMatch(#{<<"docs_read">> := DocsRead}, Info),
+ ?assertMatch(#{<<"docs_written">> := DocsWritten}, Info).
+
+
replication_tasks() ->
lists:filter(fun(P) ->
couch_util:get_value(type, P) =:= replication
@@ -104,6 +128,16 @@ wait_for_task_status() ->
end).
+wait_scheduler_info() ->
+ test_util:wait(fun() ->
+ case scheduler_jobs() of
+ [] -> wait;
+ [#{<<"info">> := null}] -> wait;
+ [#{<<"info">> := Info}] -> Info
+ end
+ end).
+
+
populate_db(DbName, DocCount) ->
{ok, Db} = couch_db:open_int(DbName, []),
Docs = lists:foldl(
@@ -158,3 +192,12 @@ replicate(Source, Target) ->
couch_replicator_scheduler:reschedule(),
Pid = couch_replicator_test_helper:get_pid(Rep#rep.id),
{ok, Pid, Rep#rep.id}.
+
+
+scheduler_jobs() ->
+ Addr = config:get("chttpd", "bind_address", "127.0.0.1"),
+ Port = mochiweb_socket_server:get(chttpd, port),
+ Url = lists:flatten(io_lib:format("http://~s:~b/_scheduler/jobs", [Addr, Port])),
+ {ok, 200, _, Body} = test_request:get(Url, []),
+ Json = jiffy:decode(Body, [return_maps]),
+ maps:get(<<"jobs">>, Json).