diff options
4 files changed, 190 insertions, 12 deletions
diff --git a/src/couch_replicator/src/couch_replicator.hrl b/src/couch_replicator/src/couch_replicator.hrl index d46c34720..2a5b7c8c8 100644 --- a/src/couch_replicator/src/couch_replicator.hrl +++ b/src/couch_replicator/src/couch_replicator.hrl @@ -22,7 +22,8 @@ view = nil :: any() | '_', doc_id :: any() | '_', db_name = null :: null | binary() | '_', - start_time = {0, 0, 0} :: erlang:timestamp() | '_' + start_time = {0, 0, 0} :: erlang:timestamp() | '_', + stats = couch_replicator_stats:new() :: orddict:orddict() | '_' }). -type rep_id() :: {string(), string()}. diff --git a/src/couch_replicator/src/couch_replicator_scheduler.erl b/src/couch_replicator/src/couch_replicator_scheduler.erl index 762ef18fe..e3dbede83 100644 --- a/src/couch_replicator/src/couch_replicator_scheduler.erl +++ b/src/couch_replicator/src/couch_replicator_scheduler.erl @@ -40,7 +40,8 @@ health_threshold/0, jobs/0, job/1, - restart_job/1 + restart_job/1, + update_job_stats/2 ]). %% config_listener callbacks @@ -215,6 +216,11 @@ restart_job(JobId) -> end. +-spec update_job_stats(job_id(), term()) -> ok. +update_job_stats(JobId, Stats) -> + gen_server:cast(?MODULE, {update_job_stats, JobId, Stats}). + + %% gen_server functions init(_) -> @@ -283,6 +289,16 @@ handle_cast({set_interval, Interval}, State) when is_integer(Interval), couch_log:notice("~p: interval set to ~B", [?MODULE, Interval]), {noreply, State#state{interval = Interval}}; +handle_cast({update_job_stats, JobId, Stats}, State) -> + case rep_state(JobId) of + nil -> + ok; + #rep{} = Rep -> + NewRep = Rep#rep{stats = Stats}, + true = ets:update_element(?MODULE, JobId, {#job.rep, NewRep}) + end, + {noreply, State}; + handle_cast(UnexpectedMsg, State) -> couch_log:error("~p: received un-expected cast ~p", [?MODULE, UnexpectedMsg]), {noreply, State}. diff --git a/src/couch_replicator/src/couch_replicator_scheduler_job.erl b/src/couch_replicator/src/couch_replicator_scheduler_job.erl index 1467d9f30..643b22ac8 100644 --- a/src/couch_replicator/src/couch_replicator_scheduler_job.erl +++ b/src/couch_replicator/src/couch_replicator_scheduler_job.erl @@ -159,16 +159,9 @@ do_init(#rep{options = Options, id = {BaseId, Ext}, user_ctx=UserCtx} = Rep) -> {source, ?l2b(SourceName)}, {target, ?l2b(TargetName)}, {continuous, get_value(continuous, Options, false)}, - {revisions_checked, 0}, - {missing_revisions_found, 0}, - {docs_read, 0}, - {docs_written, 0}, - {changes_pending, get_pending_count(State)}, - {doc_write_failures, 0}, {source_seq, HighestSeq}, - {checkpointed_source_seq, CommittedSeq}, {checkpoint_interval, CheckpointInterval} - ]), + ] ++ rep_stats(State)), couch_task_status:set_update_frequency(1000), % Until OTP R14B03: @@ -582,7 +575,8 @@ init_state(Rep) -> source = Src0, target = Tgt, options = Options, user_ctx = UserCtx, type = Type, view = View, - start_time = StartTime + start_time = StartTime, + stats = Stats } = Rep, % Adjust minimum number of http source connections to 2 to avoid deadlock Src = adjust_maxconn(Src0, BaseId), @@ -631,7 +625,8 @@ init_state(Rep) -> checkpoint_interval = get_value(checkpoint_interval, Options, ?DEFAULT_CHECKPOINT_INTERVAL), type = Type, - view = View + view = View, + stats = Stats }, State#rep_state{timer = start_timer(State)}. @@ -983,6 +978,7 @@ update_task(State) -> current_through_seq = {_, ThroughSeq}, highest_seq_done = {_, HighestSeq} } = State, + update_scheduler_job_stats(State), couch_task_status:update( rep_stats(State) ++ [ {source_seq, HighestSeq}, @@ -990,6 +986,11 @@ update_task(State) -> ]). +update_scheduler_job_stats(#rep_state{rep_details = Rep, stats = Stats}) -> + JobId = Rep#rep.id, + couch_replicator_scheduler:update_job_stats(JobId, Stats). + + rep_stats(State) -> #rep_state{ committed_seq = {_, CommittedSeq}, diff --git a/src/couch_replicator/test/couch_replicator_retain_stats_between_job_runs.erl b/src/couch_replicator/test/couch_replicator_retain_stats_between_job_runs.erl new file mode 100644 index 000000000..3b7377b78 --- /dev/null +++ b/src/couch_replicator/test/couch_replicator_retain_stats_between_job_runs.erl @@ -0,0 +1,160 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_replicator_retain_stats_between_job_runs). + +-include_lib("couch/include/couch_eunit.hrl"). +-include_lib("couch/include/couch_db.hrl"). +-include_lib("couch_replicator/src/couch_replicator.hrl"). + +-define(DELAY, 500). +-define(TIMEOUT, 60000). +-define(i2l(I), integer_to_list(I)). +-define(io2b(Io), iolist_to_binary(Io)). + + +setup() -> + Ctx = test_util:start_couch([couch_replicator]), + Source = setup_db(), + Target = setup_db(), + {Ctx, {Source, Target}}. + + +teardown({Ctx, {Source, Target}}) -> + teardown_db(Source), + teardown_db(Target), + ok = application:stop(couch_replicator), + ok = test_util:stop_couch(Ctx). + + +stats_retained_test_() -> + { + setup, + fun setup/0, + fun teardown/1, + fun t_stats_retained/1 + }. + + +t_stats_retained({_Ctx, {Source, Target}}) -> + ?_test(begin + populate_db(Source, 42), + {ok, RepPid, RepId} = replicate(Source, Target), + wait_target_in_sync(Source, Target), + check_active_tasks(42, 42), + reschedule_job(RepPid), + check_active_tasks(42, 42), + couch_replicator_scheduler:remove_job(RepId) + end). + + +setup_db() -> + DbName = ?tempdb(), + {ok, Db} = couch_db:create(DbName, [?ADMIN_CTX]), + ok = couch_db:close(Db), + DbName. + + +teardown_db(DbName) -> + ok = couch_server:delete(DbName, [?ADMIN_CTX]), + ok. + + +reschedule_job(RepPid) -> + Ref = erlang:monitor(process, RepPid), + gen_server:cast(couch_replicator_scheduler, {set_max_jobs, 0}), + couch_replicator_scheduler:reschedule(), + receive + {'DOWN', Ref, _, _, _} -> ok + after ?TIMEOUT -> + erlang:error(timeout) + end, + gen_server:cast(couch_replicator_scheduler, {set_max_jobs, 500}), + couch_replicator_scheduler:reschedule(). + + +check_active_tasks(DocsRead, DocsWritten) -> + RepTask = wait_for_task_status(), + ?assertNotEqual(timeout, RepTask), + ?assertEqual(DocsRead, couch_util:get_value(docs_read, RepTask)), + ?assertEqual(DocsWritten, couch_util:get_value(docs_written, RepTask)). + + +replication_tasks() -> + lists:filter(fun(P) -> + couch_util:get_value(type, P) =:= replication + end, couch_task_status:all()). + + +wait_for_task_status() -> + test_util:wait(fun() -> + case replication_tasks() of + [] -> wait; + [RepTask] -> RepTask + end + end). + + +populate_db(DbName, DocCount) -> + {ok, Db} = couch_db:open_int(DbName, []), + Docs = lists:foldl( + fun(DocIdCounter, Acc) -> + Id = ?io2b(["doc", ?i2l(DocIdCounter)]), + Doc = #doc{id = Id, body = {[]}}, + [Doc | Acc] + end, + [], lists:seq(1, DocCount)), + {ok, _} = couch_db:update_docs(Db, Docs, []), + ok = couch_db:close(Db). + + +wait_target_in_sync(Source, Target) -> + {ok, SourceDb} = couch_db:open_int(Source, []), + {ok, SourceInfo} = couch_db:get_db_info(SourceDb), + ok = couch_db:close(SourceDb), + SourceDocCount = couch_util:get_value(doc_count, SourceInfo), + wait_target_in_sync_loop(SourceDocCount, Target, 300). + + +wait_target_in_sync_loop(_DocCount, _TargetName, 0) -> + erlang:error({assertion_failed, [ + {module, ?MODULE}, {line, ?LINE}, + {reason, "Could not get source and target databases in sync"} + ]}); + +wait_target_in_sync_loop(DocCount, TargetName, RetriesLeft) -> + {ok, Target} = couch_db:open_int(TargetName, []), + {ok, TargetInfo} = couch_db:get_db_info(Target), + ok = couch_db:close(Target), + TargetDocCount = couch_util:get_value(doc_count, TargetInfo), + case TargetDocCount == DocCount of + true -> + true; + false -> + ok = timer:sleep(?DELAY), + wait_target_in_sync_loop(DocCount, TargetName, RetriesLeft - 1) + end. + + +replicate(Source, Target) -> + SrcUrl = couch_replicator_test_helper:db_url(Source), + TgtUrl = couch_replicator_test_helper:db_url(Target), + RepObject = {[ + {<<"source">>, SrcUrl}, + {<<"target">>, TgtUrl}, + {<<"continuous">>, true} + ]}, + {ok, Rep} = couch_replicator_utils:parse_rep_doc(RepObject, ?ADMIN_USER), + ok = couch_replicator_scheduler:add_job(Rep), + couch_replicator_scheduler:reschedule(), + Pid = couch_replicator_test_helper:get_pid(Rep#rep.id), + {ok, Pid, Rep#rep.id}. |