diff options
author | Nick Vatamaniuc <vatamane@apache.org> | 2019-08-12 11:18:34 -0400 |
---|---|---|
committer | Nick Vatamaniuc <vatamane@apache.org> | 2019-08-12 11:18:34 -0400 |
commit | 159f680ba66e373cddaf45c1d64264183db8d38a (patch) | |
tree | 70059c22936719321dfa72f633684be41b52cc63 | |
parent | 453787f12a0ac1e6388dd91c6bff1e9af68729fb (diff) | |
download | couchdb-prototype/fdb-replicator-2.tar.gz |
* streamlined doc jobs worker (move halt handling to the top)
* remove invalid tests
* updated couch_replicator_docs to take explicit db and doc params
5 files changed, 281 insertions, 671 deletions
diff --git a/src/couch_replicator/src/couch_replicator.hrl b/src/couch_replicator/src/couch_replicator.hrl index 050fcb4a8..d94c3eb1f 100644 --- a/src/couch_replicator/src/couch_replicator.hrl +++ b/src/couch_replicator/src/couch_replicator.hrl @@ -41,6 +41,12 @@ -define(ST_CRASHING, <<"crashing">>). -define(ST_TRIGGERED, <<"triggered">>). +% Some fields from a rep object +-define(REP_ID, <<"id">>). +-define(DB_NAME, <<"db_name">>). +-define(DOC_ID, <<"doc_id">>). +-define(START_TIME, <<"start_time">>). + % Fields couch job data objects -define(REP, <<"rep">>). -define(REP_PARSE_ERROR, <<"rep_parse_error">>). @@ -51,6 +57,8 @@ -define(DOC_ID, <<"doc_id">>). -define(ERROR_COUNT, <<"error_count">>). -define(LAST_UPDATED, <<"last_updated">>). +-define(HISTORY, <<"history">>). +-define(VER, <<"ver">>). % Accepted job message tag -define(ACCEPTED_JOB, accepted_job). diff --git a/src/couch_replicator/src/couch_replicator_doc_processor.erl b/src/couch_replicator/src/couch_replicator_doc_processor.erl index 9899befb7..d89233906 100644 --- a/src/couch_replicator/src/couch_replicator_doc_processor.erl +++ b/src/couch_replicator/src/couch_replicator_doc_processor.erl @@ -81,116 +81,42 @@ process_change(#{name := DbName} = Db, #doc{deleted = true} = Doc) -> process_change(#{name := DbName} = Db, #doc{} = Doc) -> #doc{id = DocId, body = {Props} = Body} = Doc, - {Rep, RepError} = try + {Rep, Error} = try Rep0 = couch_replicator_docs:parse_rep_doc_without_id(Body), - Rep1 = Rep0#{ - <<"db_name">> => DbName, - <<"start_time">> => erlang:system_time() - }, + DocState = get_json_value(<<"_replication_state">>, Props, null), + Rep1 = Rep0#{?DB_NAME := DbName, ?DOC_STATE := DocState}, {Rep1, null} catch throw:{bad_rep_doc, Reason} -> {null, couch_replicator_utils:rep_error_to_binary(Reason)} end, - % We keep track of the doc's state in order to clear it if update_docs - % is toggled from true to false - DocState = get_json_value(<<"_replication_state">>, Props, null), case 
couch_jobs:get_job_data(Db, ?REP_DOCS, docs_job_id(DbName, DocId)) of {error, not_found} -> - add_rep_doc_job(Db, DbName, DocId, Rep, RepError, DocState); - {ok, #{?REP := null, ?REP_PARSE_ERROR := RepError}} + add_rep_doc_job(Db, DbName, DocId, Rep, Error); + {ok, #{?REP := null, ?REP_PARSE_ERROR := Error}} when Rep =:= null -> % Same error as before occurred, don't bother updating the job ok; {ok, #{?REP := null}} when Rep =:= null -> % Error occured but it's a different error. Update the job so user % sees the new error - add_rep_doc_job(Db, DbName, DocId, Rep, RepError, DocState); + add_rep_doc_job(Db, DbName, DocId, Rep, Error); {ok, #{?REP := OldRep, ?REP_PARSE_ERROR := OldError}} -> - NormOldRep = couch_replicator_util:normalize_rep(OldRep), - NormRep = couch_replicator_util:normalize_rep(Rep), - case NormOldRep == NormRep of + case compare_reps(OldRep, Rep) of true -> % Document was changed but none of the parameters relevent % for the replication job have changed, so make it a no-op ok; false -> - add_rep_doc_job(Db, DbName, DocId, Rep, RepError, - DocState) + add_rep_doc_job(Db, DbName, DocId, Rep, Error) end end. - - -rep_docs_job_execute(#{} = Job, #{<<"rep">> := null} = JobData) -> - #{ - <<"rep_parse_error">> := Error, - <<"db_name">> := DbName, - <<"doc_id">> := DocId, - } = JobData, - JobData1 = JobData#{ - <<"finished_state">> := <<"failed">>, - <<"finished_result">> := Error - } - case couch_jobs:finish(undefined, Job, JobData1) of - ok -> - couch_replicator_docs:update_failed(DbName, DocId, Error), - ok; - {error, JobError} -> - Msg = "Replication ~s job could not finish. 
JobError:~p", - couch_log:error(Msg, [RepId, JobError]), - {error, JobError} - end; - -rep_docs_job_execute(#{} = Job, #{} = JobData) -> - #{<<"rep">> := Rep, <<"doc_state">> := DocState} = JobData, - case lists:member(DocState, [<<"triggered">>, <<"error">>]) of - true -> maybe_remove_state_fields(DbName, DocId), - false -> ok - end, - % completed jobs should finish right away - - % otherwise start computing the replication id - - Rep1 = update_replication_id(Rep), - - % when done add or update the replicaton job - % if jobs has a filter keep checking if filter changes - - -maybe_remove_state_fields(DbName, DocId) -> - case update_docs() of - true -> - ok; - false -> - couch_replicator_docs:remove_state_fields(DbName, DocId) - end. - - -process_updated({DbName, _DocId} = Id, JsonRepDoc) -> - % Parsing replication doc (but not calculating the id) could throw an - % exception which would indicate this document is malformed. This exception - % should propagate to db_change function and will be recorded as permanent - % failure in the document. User will have to update the documet to fix the - % problem. - Rep0 = couch_replicator_docs:parse_rep_doc_without_id(JsonRepDoc), - Rep = Rep0#rep{db_name = DbName, start_time = os:timestamp()}, - Filter = case couch_replicator_filters:parse(Rep#rep.options) of - {ok, nil} -> - nil; - {ok, {user, _FName, _QP}} -> - user; - {ok, {view, _FName, _QP}} -> - view; - {ok, {docids, _DocIds}} -> - docids; - {ok, {mango, _Selector}} -> - mango; - {error, FilterError} -> - throw(FilterError) - end, - gen_server:call(?MODULE, {updated, Id, Rep, Filter}, infinity). +compare_reps(Rep1, Rep2) -> + NormRep1 = couch_replicator_util:normalize_rep(Rep1), + NormRep2 = couch_replicator_util:normalize_rep(Rep2), + NormRep1 =:= NormRep2. 
start_link() -> @@ -218,7 +144,7 @@ handle_call(Msg, _From, #{} = St) -> handle_cast({?ACCEPTED_JOB, Job, JobData}, #{} = St) -> - {noreply, execute_rep_doc_job(Job, JobData, St)}; + {noreply, spawn_worker(Job, JobData, St)}; handle_cast(Msg, #{} = St) -> {stop, {bad_cast, Msg}, St}. @@ -326,236 +252,239 @@ start_acceptors(N, #st{} = St) -> start_worker(Job, #{} = JobData, #{workers := Workers} = St) -> - Parent = self(), - Pid = spawn_link(fun() -> rep_doc_job_worker(Job, JobData, Parent) end), + Pid = spawn_link(fun() -> worker_fun(Job, JobData) end), St#{workers := Workers#{Pid => true}}. -rep_doc_job_worker(Job, #{?REP := null} = RepDocData, Parent) -> +worker_fun(Job, JobData) -> + try + worker_fun1(Job, JobData) + catch + throw:halt -> + Msg = "~p : replication doc job ~p lock conflict", + couch_log:error(Msg, [?MODULE, Job]); + throw:{rep_doc_not_current, DbName, DocId} -> + Msg = "~p : replication doc ~s:~s is not current", + couch_log:error(Msg, [?MODULE, DbName, DocID]), + end. + + +worker_fun1(Job, #{?REP := null} = RepDocData) -> #{ - ?REP_PARSE_ERROR := Error, + ?STATE_INFO := Error, ?DB_NAME := DbName, ?DOC_ID := DocId - ?ERROR_COUNT := ErrorCount, } = RepDocData, - RepDocData1 = RepDocData#{ - ?LAST_UPDATED := erlang:system_time(), - ?ERROR_COUNT := ErrorCount + 1, - ?FINISHED_STATE := ?FAILED, - ?FINISHED_RESULT := Error, - }, - case couch_jobs:finish(undefined, Job, RepDocData1) of - ok -> - couch_replicator_docs:update_failed(DbName, DocId, Error), - St; - {error, JobError} -> - Msg = "~p : replication ~s job could not finish. JobError:~p", - couch_log:error(Msg, [?MODULE, RepId, JobError]), - St - end; + finish_with_permanent_failure(undefined, Job, RepDocData, Error), + couch_replicator_docs:update_failed(DbName, DocId, Error); -rep_doc_job_worker(Job, #{?REP := #{}} = RepDocData, Parent) -> - #{ - ?REP := Rep1, - ?DOC_STATE := DocState, - } = RepDocData, - ok = remove_old_state_fields(Rep, DocState), - % Calculate replication ID. 
In most case this is fast and easy - % but for filtered replications with user JS filters this - % means making a remote connection to the source db. - Rep2 = #{<<"id">> := RepId} = couch_replicator_docs:update_rep_id(Rep1), + +worker_fun1(Job, #{?REP := #{}} = RepDocData) -> + #{?REP := Rep} = RepDocData, + #{?REP_ID := OldRepId, ?DB_NAME := DbName, ?DOC_ID := DocId} = Rep, + ok = remove_old_state_fields(RepDocData), try - couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(Tx), fun(JTx) -> - maybe_start_replication_job(JTx, Rep) - end) + RepWithId = couch_replicator_docs:update_rep_id(Rep), + worker_fun2(Job, OldRepId, RepWithId, RepDocData) catch - throw:{job_start_error, halt} -> - Msg = "~p : replication doc job ~s lock conflict", - couch_log:error(Msg, [?MODULE, RepId]), - exit(normal); - throw:{job_start_error, Error} -> - Msg = "~p : replication job start failed ~slock conflict", - couch_log:error(Msg, [?MODULE, RepId]) + throw:{filter_fetch_error, Error} -> + Error1 = io_lib:format("Filter fetch error ~p", [Error]), + Error2 = couch_util:to_binary(Error1), + finish_with_temporary_error(undefined, Job, RepDocData, Error2), + maybe_update_doc_error(OldRepId, DbName, DocId, Error2) end. 
-maybe_start_replication_job(JTx, Rep) -> - case couch_jobs:get_job_data(JTx, ?REP_JOBS, RepId) of - {error, not_found} -> - RepJobData = #{ - ?REP := Rep2, - ?STATE := ST_PENDING, - ?STATE_INFO := null, - }, - ok = couch_jobs:add(JTx, ?REP_JOBS, RepJobData), - RepDocData1 = RepDocData#{ - ?REP := Rep2, - ?STATE := ST_SCHEDULED, - ?STATE_INFO := null, - ?ERROR_COUNT := 0, - ?LAST_UPDATED => erlang:system_info() - }, - case couch_jobs:finish(JTx, Job, RepDocData1) of - ok -> ok; - Error -> throw({job_start_error, Error}) - -remove_old_state_fields(#{} = Rep, ?TRIGGERED) -> - case update_docs() of - true -> ok; - false -> couch_replicator_docs:remove_state_fields(DbName, DocId) - end; -remove_old_state_fields(#{} = Rep, ?ERROR) -> - case update_docs() of - true -> ok; - false -> couch_replicator_docs:remove_state_fields(DbName, DocId) - end; +worker_fun2(Job, OldRepId, #{} = Rep, #{} = RepDocData) -> + Result = couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(Tx), fun(JTx) -> + check_rep_doc_current(JTx, Rep), + remove_stale_replication_job(JTx, OldRepId, Rep), + maybe_start_replication_job(JTx, Job, Rep, RepDocData) + end), + case Result of + {ok, RepId} -> + maybe_update_doc_triggered(DbName, DocId, RepId); + ignore -> + ok; + {error, {permanent_failure, Error}} -> + couch_replicator_docs:update_failed(DbName, DocId, Error); + {error, {temporary_error, RepId, Error}} -> + maybe_update_doc_error(RepId, DbName, DocId, Error) + end. -maybe_remove_old_state_fields(#{} = _Rep, _) -> - ok. +check_rep_doc_current(JTx, #{} = Rep) -> + #{?DB_NAME := DbName, ?DOC_ID := DocId, ?VER := Ver} = Rep, + case couch_jobs:get_job_data(JTx, ?REP_DOCS, doc_job_id(DbName, DocId)) of + {ok, #{?REP := #{?VER: = Ver}}} -> + ok; + {ok, #{?REP := #{?VER := V}}} when Ver =/= V -> + throw({rep_doc_not_current, DbName, DocId}); + {error, not_found} -> + throw({rep_doc_not_current, DbName, DocId}); + end. 
-% Doc processor gen_server private helper functions -% Handle doc update -- add to ets, then start a worker to try to turn it into -% a replication job. In most cases it will succeed quickly but for filtered -% replications or if there are duplicates, it could take longer -% (theoretically indefinitely) until a replication could be started. Before -% adding replication job, make sure to delete all old jobs associated with -% same document. --spec updated_doc(db_doc_id(), #rep{}, filter_type()) -> ok. -updated_doc(Id, Rep, Filter) -> - NormCurRep = couch_replicator_utils:normalize_rep(current_rep(Id)), - NormNewRep = couch_replicator_utils:normalize_rep(Rep), - case NormCurRep == NormNewRep of - false -> - removed_doc(Id), - Row = #rdoc{ - id = Id, - state = initializing, - rep = Rep, - rid = nil, - filter = Filter, - info = nil, - errcnt = 0, - worker = nil, - last_updated = os:timestamp() - }, - true = ets:insert(?MODULE, Row), - ok = maybe_start_worker(Id); - true -> +% A stale replication job is one still running after the filter +% has been updated and a new replication id was generated. +% +remove_stale_replication_job(_, null, #{}) -> + ok; + +remove_stale_replication_job(JTx, OldRepId, #{} = Rep) -> + #{?REP_ID := RepId, ?VER := Ver} = Rep, + case couch_jobs:get_job_data(JTx, ?REP_JOBS, OldRepId) of + {error, not_found} -> + ok; + {ok, #{?REP := {?VER := Ver}} when OldRep =/= RepId -> + couch_jobs:remove(JTx, ?REP_JOBS, OldRepId) + {ok, #{}} -> ok end. -% Return current #rep{} record if any. If replication hasn't been submitted -% to the scheduler yet, #rep{} record will be in the document processor's -% ETS table, otherwise query scheduler for the #rep{} record. --spec current_rep({binary(), binary()}) -> #rep{} | nil. 
-current_rep({DbName, DocId}) when is_binary(DbName), is_binary(DocId) -> - case ets:lookup(?MODULE, {DbName, DocId}) of - [] -> - nil; - [#rdoc{state = scheduled, rep = nil, rid = JobId}] -> - % When replication is scheduled, #rep{} record which can be quite - % large compared to other bits in #rdoc is removed in order to avoid - % having to keep 2 copies of it. So have to fetch it from the - % scheduler. - couch_replicator_scheduler:rep_state(JobId); - [#rdoc{rep = Rep}] -> - Rep +maybe_start_replication_job(JTx, Job, #{} = Rep, #{} = RepDocData) -> + {#?REP_ID := RepId, ?DB_NAME := DbName, ?DOC_ID := DocId} = Rep, + case couch_jobs:get_job_data(JTx, ?REP_JOBS, RepId) of + {error, not_found} -> + start_replication_job(JTx, Job, Rep, RepDocData); + {ok, #{?REP := {?DB_NAME := DbName, ?DOC_ID := DocId}} = CurRep} -> + case compare_reps(Rep, CurRep) of + true -> + dont_start_replication_job(JTx, Job, Rep, RepDocData); + false -> + ok = couch_jobs:remove(JTx, ?REP_JOBS, RepId), + start_replication_job(JTx, Job, Rep, RepDocData) + end; + {ok, #{?REP := {?DB_NAME := null}}} -> + Err1 = io_lib:format("Replication `~s` specified by `~s:~s`" + " already running as a transient replication, started via" + " `_replicate` API endpoint", [RepId, DbName, DocId]), + Err2 = couch_util:to_binary(Err1), + ok = finish_with_temporary_error(JTx, Job, RepDocData, Err2), + {error, {temporary_error, RepId, Error2}}; + {ok, #{?REP := {?DB_NAME := OtherDb, ?DOC_ID := OtherDoc}}} -> + Err1 = io_lib:format("Replication `~s` specified by `~s:~s`" + " already started by document `~s:~s`", [RepId, DocId, + DbName, OtherDb, OtherDoc], + Error2 = couch_util:to_binary(Err1), + ok = finish_with_permanent_failure(JTx, Job, RepDocData, Error), + {error, {permanent_failure, Error2}} end. --spec worker_returned(reference(), db_doc_id(), rep_start_result()) -> ok. 
-worker_returned(Ref, Id, {ok, RepId}) -> - case ets:lookup(?MODULE, Id) of - [#rdoc{worker = Ref} = Row] -> - Row0 = Row#rdoc{ - state = scheduled, - errcnt = 0, - worker = nil, - last_updated = os:timestamp() - }, - NewRow = case Row0 of - #rdoc{rid = RepId, filter = user} -> - % Filtered replication id didn't change. - Row0; - #rdoc{rid = nil, filter = user} -> - % Calculated new replication id for a filtered replication. Make - % sure to schedule another check as filter code could change. - % Replication starts could have been failing, so also clear - % error count. - Row0#rdoc{rid = RepId}; - #rdoc{rid = OldRepId, filter = user} -> - % Replication id of existing replication job with filter has - % changed. Remove old replication job from scheduler and - % schedule check to check for future changes. - ok = couch_replicator_scheduler:remove_job(OldRepId), - Msg = io_lib:format("Replication id changed: ~p -> ~p", [ - OldRepId, RepId]), - Row0#rdoc{rid = RepId, info = couch_util:to_binary(Msg)}; - #rdoc{rid = nil} -> - % Calculated new replication id for non-filtered replication. - % Remove replication doc body, after this we won't need it - % anymore. - Row0#rdoc{rep=nil, rid=RepId, info=nil} - end, - true = ets:insert(?MODULE, NewRow), - ok = maybe_update_doc_triggered(Row#rdoc.rep, RepId), - ok = maybe_start_worker(Id); - _ -> - ok % doc could have been deleted, ignore - end, - ok; +finish_with_temporary_error(JTx, Job, RepDocData, Error) -> + #{?ERROR_COUNT := ErrorCount} = RepDocData, + ErrorCount1 = ErrorCount + 1, + RepDocData1 = RepDocData#{ + ?STATE := ?ST_ERROR, + ?STATE_INFO := Error, + ?ERROR_COUNT := ErrorCount1, + } = RepDocData, + schedule_error_backoff(JTx, Job, ErrorCount1), + case couch_jobs:finish(JTx, Job, RepDocData1) of + ok -> ok; + {error, halt} -> throw(halt) + end. 
-worker_returned(_Ref, _Id, ignore) -> - ok; -worker_returned(Ref, Id, {temporary_error, Reason}) -> - case ets:lookup(?MODULE, Id) of - [#rdoc{worker = Ref, errcnt = ErrCnt} = Row] -> - NewRow = Row#rdoc{ - rid = nil, - state = error, - info = Reason, - errcnt = ErrCnt + 1, - worker = nil, - last_updated = os:timestamp() - }, - true = ets:insert(?MODULE, NewRow), - ok = maybe_update_doc_error(NewRow#rdoc.rep, Reason), - ok = maybe_start_worker(Id); - _ -> - ok % doc could have been deleted, ignore - end, - ok; +finish_with_permanent_failure(JTx, Job, RepDocData, Error) -> + #{?ERROR_COUNT := ErrorCount} = RepDocData, + RepDocData1 = RepDocData#{ + ?STATE := ?ST_FAILED, + ?STATE_INFO := Error, + ?ERROR_COUNT := ErrorCount + 1, + } = RepDocData, + case couch_jobs:finish(JTx, Job, RepDocData1) of + ok -> ok; + {error, halt} -> throw(halt) + end. -worker_returned(Ref, Id, {permanent_failure, _Reason}) -> - case ets:lookup(?MODULE, Id) of - [#rdoc{worker = Ref}] -> - true = ets:delete(?MODULE, Id); - _ -> - ok % doc could have been deleted, ignore - end, + +dont_start_replication_job(JTx, Job, Rep, RepDocData) -> + RepDocData1 = RepDocData#{?LAST_UPDATED => erlang:system_time()}, + ok = schedule_filter_check(JTx, Job, Rep), + case couch_jobs:finish(JTx, Job, RepDocData1) of + ok -> ignore; + {error, halt} -> throw(halt) + end. + + +start_replication_job(JTx, Job, #{} = Rep, #{} = RepDocData) -> + #{?REP_ID := RepId} = Rep, + RepJobData = #{ + ?REP => Rep, + ?STATE => ?ST_PENDING, + ?STATE_INFO => null, + ?ERROR_COUNT => 0, + ?LAST_UPDATED => erlang:system_time(), + ?HISTORY => [] + }, + ok = couch_jobs:add(JTx, ?REP_JOBS, RepId, RepJobData), + RepDocData1 = RepDocData#{ + ?REP := Rep, + ?STATE := ?ST_SCHEDULED, + ?STATE_INFO := null, + ?ERROR_COUNT := 0, + ?LAST_UPDATED => erlang:system_time() + }, + ok = schedule_filter_check(JTx, Job, Rep), + case couch_jobs:finish(JTx, Job, RepDocData1) of + ok -> {ok, RepId}; + {error, halt} -> throw(halt) + end. 
+ + +schedule_error_backoff(JTx, Job, ErrorCount) -> + Exp = min(ErrCnt, ?ERROR_MAX_BACKOFF_EXPONENT), + % ErrCnt is the exponent here. The reason 64 is used is to start at + % 64 (about a minute) max range. Then first backoff would be 30 sec + % on average. Then 1 minute and so on. + NowSec = erlang:system_time(second), + When = NowSec + rand:uniform(?INITIAL_BACKOFF_EXPONENT bsl Exp). + couch_jobs:resubmit(JTx, Job, trunc(When)). + + +schedule_filter_check(JTx, Job, #{<<"filter_type">> := <<"user">>} = Rep) -> + IntervalSec = filter_check_interval_sec(), + NowSec = erlang:system_time(second), + When = NowSec + 0.5 * IntervalSec + rand:uniform(IntervalSec), + couch_jobs:resubmit(JTx, Job, trunc(When)). + +schedule_filter_check(_JTx, _Job, #{}) -> ok. --spec maybe_update_doc_error(#rep{}, any()) -> ok. -maybe_update_doc_error(Rep, Reason) -> +remove_old_state_fields(#{?DOC_STATE := DocState} = RepDocData) when + DocState =:= ?TRIGGERED orelse DocState =:= ?ERROR -> case update_docs() of true -> - couch_replicator_docs:update_error(Rep, Reason); + ok; + false -> + #{?DB_NAME := DbName, ?DOC_ID := DocId} = RepDocData, + couch_replicator_docs:remove_state_fields(DbName, DocId) + end; + +remove_old_state_fields(#{}) -> + ok. + + +-spec maybe_update_doc_error(binary(), binary(), binary(), any()) -> ok. +maybe_update_doc_error(RepId, DbName, DocId, Error) -> + case update_docs() of + true -> + couch_replicator_docs:update_error(RepId, DbName, DocId, Error); false -> ok end. --spec maybe_update_doc_triggered(#rep{}, rep_id()) -> ok. -maybe_update_doc_triggered(Rep, RepId) -> +-spec maybe_update_doc_triggered(#{}, rep_id()) -> ok. +maybe_update_doc_triggered(RepId, DbName, DocId) -> case update_docs() of true -> - couch_replicator_docs:update_triggered(Rep, RepId); + couch_replicator_docs:update_triggered(RepId, DbName, DocId); false -> ok end. @@ -570,75 +499,17 @@ error_backoff(ErrCnt) -> couch_rand:uniform(?INITIAL_BACKOFF_EXPONENT bsl Exp). 
--spec filter_backoff() -> seconds(). -filter_backoff() -> - Total = ets:info(?MODULE, size), - % This value scaled by the number of replications. If the are a lot of them - % wait is longer, but not more than a day (?TS_DAY_SEC). If there are just - % few, wait is shorter, starting at about 30 seconds. `2 *` is used since - % the expected wait would then be 0.5 * Range so it is easier to see the - % average wait. `1 +` is used because couch_rand:uniform only - % accepts >= 1 values and crashes otherwise. - Range = 1 + min(2 * (Total / 10), ?TS_DAY_SEC), - ?MIN_FILTER_DELAY_SEC + couch_rand:uniform(round(Range)). - - -% Document removed from db -- clear ets table and remove all scheduled jobs --spec removed_doc(db_doc_id()) -> ok. -removed_doc({DbName, DocId} = Id) -> - ets:delete(?MODULE, Id), - RepIds = couch_replicator_scheduler:find_jobs_by_doc(DbName, DocId), - lists:foreach(fun couch_replicator_scheduler:remove_job/1, RepIds). - - -% Whole db shard is gone -- remove all its ets rows and stop jobs --spec removed_db(binary()) -> ok. -removed_db(DbName) -> - EtsPat = #rdoc{id = {DbName, '_'}, _ = '_'}, - ets:match_delete(?MODULE, EtsPat), - RepIds = couch_replicator_scheduler:find_jobs_by_dbname(DbName), - lists:foreach(fun couch_replicator_scheduler:remove_job/1, RepIds). - - -% Spawn a worker process which will attempt to calculate a replication id, then -% start a replication. Returns a process monitor reference. The worker is -% guaranteed to exit with rep_start_result() type only. --spec maybe_start_worker(db_doc_id()) -> ok. -maybe_start_worker(Id) -> - case ets:lookup(?MODULE, Id) of - [] -> - ok; - [#rdoc{state = scheduled, filter = Filter}] when Filter =/= user -> - ok; - [#rdoc{rep = Rep} = Doc] -> - % For any replication with a user created filter function, periodically - % (every `filter_backoff/0` seconds) to try to see if the user filter - % has changed by using a worker to check for changes. 
When the worker - % returns check if replication ID has changed. If it hasn't keep - % checking (spawn another worker and so on). If it has stop the job - % with the old ID and continue checking. - Wait = get_worker_wait(Doc), - Ref = make_ref(), - true = ets:insert(?MODULE, Doc#rdoc{worker = Ref}), - couch_replicator_doc_processor_worker:spawn_worker(Id, Rep, Wait, Ref), - ok - end. - - --spec get_worker_wait(#rdoc{}) -> seconds(). -get_worker_wait(#rdoc{state = scheduled, filter = user}) -> - filter_backoff(); -get_worker_wait(#rdoc{state = error, errcnt = ErrCnt}) -> - error_backoff(ErrCnt); -get_worker_wait(#rdoc{state = initializing}) -> - 0. - - -spec update_docs() -> boolean(). update_docs() -> config:get_boolean("replicator", "update_docs", ?DEFAULT_UPDATE_DOCS). +-spec filter_check_interval_sec() -> integer(). +filter_check_interval_sec() -> + config:get_integer("replicator", "filter_check_interval_sec", + ?DEFAULT_FILTER_CHECK_INTERVAL_SEC). + + % _scheduler/docs HTTP endpoint helpers -spec docs([atom()]) -> [{[_]}] | []. @@ -753,23 +624,33 @@ ejson_doc_state_filter(State, States) when is_list(States), is_atom(State) -> -spec add_rep_doc_job(any(), binary(), binary(), #{} | null, - binary() | null, binary() | null) -> ok. -add_rep_doc_job(Tx, DbName, DocId, Rep, RepParseError, DocState) -> + binary() | null) -> ok. +add_rep_doc_job(Tx, DbName, DocId, Rep, RepParseError) -> JobId = docs_job_id(DbName, DocId), - ok = remove_replication_by_doc_job_id(Db, JobId), - RepDocData = #{ - ?REP => Rep, - ?REP_PARSE_ERROR => RepParseError, - ?DOC_STATE => DocState, - ?DB_NAME => DbName, - ?DOC_ID => DocId, - ?STATE => ST_INITIALIZING, - ?ERROR_COUNT => 0, - ?LAST_UPDATED => erlang:system_time(), - ?STATE => null, - ?STATE_INFO => null. - }, - ok = couch_jobs:add(Tx, ?REP_DOCS, RepDocData). 
+ RepDocData = case Rep of + null -> + #{ + ?REP => null, + ?DB_NAME => DbName, + ?DOC_ID => DocId, + ?STATE => ?ST_INITIALIZING, + ?STATE_INFO => RepParseError + ?ERROR_COUNT => 0, + ?LAST_UPDATED => erlang:system_time() + }; + #{} -> + #{ + ?REP => Rep, + ?STATE => ?ST_INITIALIZING, + ?ERROR_COUNT => 0, + ?LAST_UPDATED => erlang:system_time(), + ?STATE_INFO => null + } + end, + couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(Tx), fun(JTx) -> + ok = remove_replication_by_doc_job_id(JTx, JobId), + ok = couch_jobs:add(JTx, ?REP_DOCS, RepDocData) + end). docs_job_id(DbName, Id) when is_binary(DbName), is_binary(Id) -> @@ -781,12 +662,14 @@ remove_replication_by_doc_job_id(Tx, Id) -> case couch_jobs:get_job_data(Tx, ?REP_DOCS, Id) of {error, not_found} -> ok; - {ok, #{?REP := {<<"id">> := null}}} -> + {ok, #{?REP := {?REP_ID := null}}} -> couch_jobs:remove(Tx, ?REP_DOCS, Id), ok; - {ok, #{?REP := {<<"id">> := RepId}}} -> - couch_jobs:remove(Tx, ?REP_JOBS, RepId), - couch_jobs:remove(Tx, ?REP_DOCS, Id), + {ok, #{?REP := {?REP_ID := RepId}}} -> + couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(Tx), fun(JTx) -> + couch_jobs:remove(JTx, ?REP_JOBS, RepId), + couch_jobs:remove(JTx, ?REP_DOCS, Id) + end), ok end. 
@@ -1068,7 +951,7 @@ test_rep(Id) -> change() -> {[ - {<<"id">>, ?DOC1}, + {?REP_ID, ?DOC1}, {doc, {[ {<<"_id">>, ?DOC1}, {<<"source">>, <<"http://srchost.local/src">>}, @@ -1079,7 +962,7 @@ change() -> change(State) -> {[ - {<<"id">>, ?DOC1}, + {?REP_ID, ?DOC1}, {doc, {[ {<<"_id">>, ?DOC1}, {<<"source">>, <<"http://srchost.local/src">>}, @@ -1091,7 +974,7 @@ change(State) -> deleted_change() -> {[ - {<<"id">>, ?DOC1}, + {?REP_ID, ?DOC1}, {<<"deleted">>, true}, {doc, {[ {<<"_id">>, ?DOC1}, @@ -1103,7 +986,7 @@ deleted_change() -> bad_change() -> {[ - {<<"id">>, ?DOC2}, + {?REP_ID, ?DOC2}, {doc, {[ {<<"_id">>, ?DOC2}, {<<"source">>, <<"src">>} diff --git a/src/couch_replicator/src/couch_replicator_doc_processor_worker.erl b/src/couch_replicator/src/couch_replicator_doc_processor_worker.erl deleted file mode 100644 index a4c829323..000000000 --- a/src/couch_replicator/src/couch_replicator_doc_processor_worker.erl +++ /dev/null @@ -1,284 +0,0 @@ -% Licensed under the Apache License, Version 2.0 (the "License"); you may not -% use this file except in compliance with the License. You may obtain a copy of -% the License at -% -% http://www.apache.org/licenses/LICENSE-2.0 -% -% Unless required by applicable law or agreed to in writing, software -% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -% License for the specific language governing permissions and limitations under -% the License. - --module(couch_replicator_doc_processor_worker). - --export([ - spawn_worker/4 -]). - --include("couch_replicator.hrl"). - --import(couch_replicator_utils, [ - pp_rep_id/1 -]). - -% 61 seconds here because request usually have 10, 15, 30 second -% timeouts set. 
We'd want the worker to get a chance to make a few -% requests (maybe one failing one and a retry) and then fail with its -% own error (timeout, network error), which would be more specific and -% informative, before it simply gets killed because of the timeout -% here. That is, if all fails and the worker is actually blocked then -% 61 sec is a safety net to brutally kill the worker so doesn't end up -% hung forever. --define(WORKER_TIMEOUT_MSEC, 61000). - - -% Spawn a worker which attempts to calculate replication id then add a -% replication job to scheduler. This function create a monitor to the worker -% a worker will then exit with the #doc_worker_result{} record within -% ?WORKER_TIMEOUT_MSEC timeout period.A timeout is considered a -%`temporary_error`. Result will be sent as the `Reason` in the {'DOWN',...} -% message. --spec spawn_worker(db_doc_id(), #rep{}, seconds(), reference()) -> pid(). -spawn_worker(Id, Rep, WaitSec, WRef) -> - {Pid, _Ref} = spawn_monitor(fun() -> - worker_fun(Id, Rep, WaitSec, WRef) - end), - Pid. - - -% Private functions - --spec worker_fun(db_doc_id(), #rep{}, seconds(), reference()) -> no_return(). 
-worker_fun(Id, Rep, WaitSec, WRef) -> - timer:sleep(WaitSec * 1000), - Fun = fun() -> - try maybe_start_replication(Id, Rep, WRef) of - Res -> - exit(Res) - catch - throw:{filter_fetch_error, Reason} -> - exit({temporary_error, Reason}); - _Tag:Reason -> - exit({temporary_error, Reason}) - end - end, - {Pid, Ref} = spawn_monitor(Fun), - receive - {'DOWN', Ref, _, Pid, Result} -> - exit(#doc_worker_result{id = Id, wref = WRef, result = Result}) - after ?WORKER_TIMEOUT_MSEC -> - erlang:demonitor(Ref, [flush]), - exit(Pid, kill), - {DbName, DocId} = Id, - TimeoutSec = round(?WORKER_TIMEOUT_MSEC / 1000), - Msg = io_lib:format("Replication for db ~p doc ~p failed to start due " - "to timeout after ~B seconds", [DbName, DocId, TimeoutSec]), - Result = {temporary_error, couch_util:to_binary(Msg)}, - exit(#doc_worker_result{id = Id, wref = WRef, result = Result}) - end. - - -% Try to start a replication. Used by a worker. This function should return -% rep_start_result(), also throws {filter_fetch_error, Reason} if cannot fetch -% filter.It can also block for an indeterminate amount of time while fetching -% filter. -maybe_start_replication(Id, RepWithoutId, WRef) -> - Rep = couch_replicator_docs:update_rep_id(RepWithoutId), - case maybe_add_job_to_scheduler(Id, Rep, WRef) of - ignore -> - ignore; - {ok, RepId} -> - {ok, RepId}; - {temporary_error, Reason} -> - {temporary_error, Reason}; - {permanent_failure, Reason} -> - {DbName, DocId} = Id, - couch_replicator_docs:update_failed(DbName, DocId, Reason), - {permanent_failure, Reason} - end. - - --spec maybe_add_job_to_scheduler(db_doc_id(), #rep{}, reference()) -> - rep_start_result(). -maybe_add_job_to_scheduler({DbName, DocId}, Rep, WRef) -> - RepId = Rep#rep.id, - case couch_replicator_scheduler:rep_state(RepId) of - nil -> - % Before adding a job check that this worker is still the current - % worker. 
This is to handle a race condition where a worker which was - % sleeping and then checking a replication filter may inadvertently - % re-add a replication which was already deleted. - case couch_replicator_doc_processor:get_worker_ref({DbName, DocId}) of - WRef -> - ok = couch_replicator_scheduler:add_job(Rep), - {ok, RepId}; - _NilOrOtherWRef -> - ignore - end; - #rep{doc_id = DocId} -> - {ok, RepId}; - #rep{doc_id = null} -> - Msg = io_lib:format("Replication `~s` specified by document `~s`" - " already running as a transient replication, started via" - " `_replicate` API endpoint", [pp_rep_id(RepId), DocId]), - {temporary_error, couch_util:to_binary(Msg)}; - #rep{db_name = OtherDb, doc_id = OtherDocId} -> - Msg = io_lib:format("Replication `~s` specified by document `~s`" - " already started, triggered by document `~s` from db `~s`", - [pp_rep_id(RepId), DocId, OtherDocId, mem3:dbname(OtherDb)]), - {permanent_failure, couch_util:to_binary(Msg)} - end. - - --ifdef(TEST). - --include_lib("eunit/include/eunit.hrl"). - --define(DB, <<"db">>). --define(DOC1, <<"doc1">>). --define(R1, {"ad08e05057046eabe898a2572bbfb573", ""}). - - -doc_processor_worker_test_() -> - { - foreach, - fun setup/0, - fun teardown/1, - [ - t_should_add_job(), - t_already_running_same_docid(), - t_already_running_transient(), - t_already_running_other_db_other_doc(), - t_spawn_worker(), - t_ignore_if_doc_deleted(), - t_ignore_if_worker_ref_does_not_match() - ] - }. - - -% Replication is already running, with same doc id. Ignore change. -t_should_add_job() -> - ?_test(begin - Id = {?DB, ?DOC1}, - Rep = couch_replicator_docs:parse_rep_doc_without_id(change()), - ?assertEqual({ok, ?R1}, maybe_start_replication(Id, Rep, nil)), - ?assert(added_job()) - end). - - -% Replication is already running, with same doc id. Ignore change. 
-t_already_running_same_docid() -> - ?_test(begin - Id = {?DB, ?DOC1}, - mock_already_running(?DB, ?DOC1), - Rep = couch_replicator_docs:parse_rep_doc_without_id(change()), - ?assertEqual({ok, ?R1}, maybe_start_replication(Id, Rep, nil)), - ?assert(did_not_add_job()) - end). - - -% There is a transient replication with same replication id running. Ignore. -t_already_running_transient() -> - ?_test(begin - Id = {?DB, ?DOC1}, - mock_already_running(null, null), - Rep = couch_replicator_docs:parse_rep_doc_without_id(change()), - ?assertMatch({temporary_error, _}, maybe_start_replication(Id, Rep, - nil)), - ?assert(did_not_add_job()) - end). - - -% There is a duplicate replication potentially from a different db and doc. -% Write permanent failure to doc. -t_already_running_other_db_other_doc() -> - ?_test(begin - Id = {?DB, ?DOC1}, - mock_already_running(<<"otherdb">>, <<"otherdoc">>), - Rep = couch_replicator_docs:parse_rep_doc_without_id(change()), - ?assertMatch({permanent_failure, _}, maybe_start_replication(Id, Rep, - nil)), - ?assert(did_not_add_job()), - 1 == meck:num_calls(couch_replicator_docs, update_failed, '_') - end). - - -% Should spawn worker -t_spawn_worker() -> - ?_test(begin - Id = {?DB, ?DOC1}, - Rep = couch_replicator_docs:parse_rep_doc_without_id(change()), - WRef = make_ref(), - meck:expect(couch_replicator_doc_processor, get_worker_ref, 1, WRef), - Pid = spawn_worker(Id, Rep, 0, WRef), - Res = receive {'DOWN', _Ref, process, Pid, Reason} -> Reason - after 1000 -> timeout end, - Expect = #doc_worker_result{id = Id, wref = WRef, result = {ok, ?R1}}, - ?assertEqual(Expect, Res), - ?assert(added_job()) - end). 
- - -% Should not add job if by the time worker got to fetching the filter -% and getting a replication id, replication doc was deleted -t_ignore_if_doc_deleted() -> - ?_test(begin - Id = {?DB, ?DOC1}, - Rep = couch_replicator_docs:parse_rep_doc_without_id(change()), - meck:expect(couch_replicator_doc_processor, get_worker_ref, 1, nil), - ?assertEqual(ignore, maybe_start_replication(Id, Rep, make_ref())), - ?assertNot(added_job()) - end). - - -% Should not add job if by the time worker got to fetchign the filter -% and building a replication id, another worker was spawned. -t_ignore_if_worker_ref_does_not_match() -> - ?_test(begin - Id = {?DB, ?DOC1}, - Rep = couch_replicator_docs:parse_rep_doc_without_id(change()), - meck:expect(couch_replicator_doc_processor, get_worker_ref, 1, - make_ref()), - ?assertEqual(ignore, maybe_start_replication(Id, Rep, make_ref())), - ?assertNot(added_job()) - end). - - -% Test helper functions - -setup() -> - meck:expect(couch_replicator_scheduler, add_job, 1, ok), - meck:expect(config, get, fun(_, _, Default) -> Default end), - meck:expect(couch_server, get_uuid, 0, this_is_snek), - meck:expect(couch_replicator_docs, update_failed, 3, ok), - meck:expect(couch_replicator_scheduler, rep_state, 1, nil), - meck:expect(couch_replicator_doc_processor, get_worker_ref, 1, nil), - ok. - - -teardown(_) -> - meck:unload(). - - -mock_already_running(DbName, DocId) -> - meck:expect(couch_replicator_scheduler, rep_state, - fun(RepId) -> #rep{id = RepId, doc_id = DocId, db_name = DbName} end). - - -added_job() -> - 1 == meck:num_calls(couch_replicator_scheduler, add_job, '_'). - - -did_not_add_job() -> - 0 == meck:num_calls(couch_replicator_scheduler, add_job, '_'). - - -change() -> - {[ - {<<"_id">>, ?DOC1}, - {<<"source">>, <<"http://srchost.local/src">>}, - {<<"target">>, <<"http://tgthost.local/tgt">>} - ]}. - --endif. 
diff --git a/src/couch_replicator/src/couch_replicator_docs.erl b/src/couch_replicator/src/couch_replicator_docs.erl index eaf8161fe..d235bf666 100644 --- a/src/couch_replicator/src/couch_replicator_docs.erl +++ b/src/couch_replicator/src/couch_replicator_docs.erl @@ -27,7 +27,7 @@ update_failed/3, update_rep_id/1, update_triggered/3, - update_error/2 + update_error/4 ]). @@ -116,13 +116,8 @@ update_triggered(Id, DocId, DbName) -> ok. --spec update_error(#{}, any()) -> ok. -update_error(#rep{} = Rep, Error) -> - #{ - <<"id">> := RepId0, - <<"db_name">> := DbName, - <<"doc_id">> := DocId, - } = Rep, +-spec update_error(binary(), binary(), binary(), any()) -> ok. +update_error(RepId0, DbName, DocId, Error) -> Reason = error_reason(Error), RepId = case RepId0 of Id when is_binary(Id) -> Id; @@ -199,22 +194,30 @@ parse_rep_doc_without_id(#{} = Doc, UserName) -> {error, Error} -> throw({bad_request, Error}); Result -> Result end, + FilterType = couch_replicator_filters:parse(Options) of + {ok, nil} -> null; + {ok, {user, _FName, _QP}} -> <<"user">>; + {ok, {view, _FName, _QP}} -> <<"view">>; + {ok, {docids, _DocIds}} -> <<"doc_ids">>; + {ok, {mango, _Selector}} -> <<"mango">>; + {error, FilterError} -> throw({error, FilterError}) + end, Rep = #{ - <<"id">> => null, + ?REP_ID => null, <<"base_id">> => null, - <<"source">> => Source, - <<"target">> => Target, + ?SOURCE => Source, + ?TARGET => Target, <<"options">> => Opts, <<"user">> => UserName, + <<"filter_type">> => FilterType, <<"type">> => Type, <<"view">> => View, - <<"doc_id">> => maps:get(<<"_id">>, Doc, null) + ?DOC_ID => maps:get(<<"_id">>, Doc, null), + ?DB_NAME => null, + ?DOC_STATE => null, + ?START_TIME => erlang:system_time(), + ?VER => fabric2_util:uuid() }, - % Check if can parse filter code, if not throw exception - case couch_replicator_filters:parse(Opts) of - {error, FilterError} -> throw({error, FilterError}); - {ok, _Filter} -> ok - end, {ok, Rep} end. 
diff --git a/src/couch_replicator/src/couch_replicator_filters.erl b/src/couch_replicator/src/couch_replicator_filters.erl index b14ea3475..fe785a2bf 100644 --- a/src/couch_replicator/src/couch_replicator_filters.erl +++ b/src/couch_replicator/src/couch_replicator_filters.erl @@ -27,17 +27,17 @@ % For `user` filter, i.e. filters specified as user code % in source database, this code doesn't fetch the filter % code, but only returns the name of the filter. --spec parse([_]) -> +-spec parse(#{}) -> {ok, nil} | {ok, {view, binary(), {[_]}}} | {ok, {user, {binary(), binary()}, {[_]}}} | {ok, {docids, [_]}} | {ok, {mango, {[_]}}} | {error, binary()}. -parse(Options) -> - Filter = couch_util:get_value(filter, Options), - DocIds = couch_util:get_value(doc_ids, Options), - Selector = couch_util:get_value(selector, Options), +parse(#{} = Options) -> + Filter = maps:get(<<"filter">>, Options, undefined), + DocIds = maps:get(<<"doc_ids">>, Options, undefined), + Selector = maps:get(<<"selector">>, Options, undefined), case {Filter, DocIds, Selector} of {undefined, undefined, undefined} -> {ok, nil}; |