diff options
Diffstat (limited to 'src/couch_replicator/src/couch_replicator_doc_processor.erl')
-rw-r--r-- | src/couch_replicator/src/couch_replicator_doc_processor.erl | 613 |
1 files changed, 248 insertions, 365 deletions
diff --git a/src/couch_replicator/src/couch_replicator_doc_processor.erl b/src/couch_replicator/src/couch_replicator_doc_processor.erl index 9899befb7..d89233906 100644 --- a/src/couch_replicator/src/couch_replicator_doc_processor.erl +++ b/src/couch_replicator/src/couch_replicator_doc_processor.erl @@ -81,116 +81,42 @@ process_change(#{name := DbName} = Db, #doc{deleted = true} = Doc) -> process_change(#{name := DbName} = Db, #doc{} = Doc) -> #doc{id = DocId, body = {Props} = Body} = Doc, - {Rep, RepError} = try + {Rep, Error} = try Rep0 = couch_replicator_docs:parse_rep_doc_without_id(Body), - Rep1 = Rep0#{ - <<"db_name">> => DbName, - <<"start_time">> => erlang:system_time() - }, + DocState = get_json_value(<<"_replication_state">>, Props, null), + Rep1 = Rep0#{?DB_NAME := DbName, ?DOC_STATE := DocState}, {Rep1, null} catch throw:{bad_rep_doc, Reason} -> {null, couch_replicator_utils:rep_error_to_binary(Reason)} end, - % We keep track of the doc's state in order to clear it if update_docs - % is toggled from true to false - DocState = get_json_value(<<"_replication_state">>, Props, null), case couch_jobs:get_job_data(Db, ?REP_DOCS, docs_job_id(DbName, DocId)) of {error, not_found} -> - add_rep_doc_job(Db, DbName, DocId, Rep, RepError, DocState); - {ok, #{?REP := null, ?REP_PARSE_ERROR := RepError}} + add_rep_doc_job(Db, DbName, DocId, Rep, Error); + {ok, #{?REP := null, ?REP_PARSE_ERROR := Error}} when Rep =:= null -> % Same error as before occurred, don't bother updating the job ok; {ok, #{?REP := null}} when Rep =:= null -> % Error occured but it's a different error. 
Update the job so user % sees the new error - add_rep_doc_job(Db, DbName, DocId, Rep, RepError, DocState); + add_rep_doc_job(Db, DbName, DocId, Rep, Error); {ok, #{?REP := OldRep, ?REP_PARSE_ERROR := OldError}} -> - NormOldRep = couch_replicator_util:normalize_rep(OldRep), - NormRep = couch_replicator_util:normalize_rep(Rep), - case NormOldRep == NormRep of + case compare_reps(OldRep, Rep) of true -> % Document was changed but none of the parameters relevent % for the replication job have changed, so make it a no-op ok; false -> - add_rep_doc_job(Db, DbName, DocId, Rep, RepError, - DocState) + add_rep_doc_job(Db, DbName, DocId, Rep, Error) end end. - - -rep_docs_job_execute(#{} = Job, #{<<"rep">> := null} = JobData) -> - #{ - <<"rep_parse_error">> := Error, - <<"db_name">> := DbName, - <<"doc_id">> := DocId, - } = JobData, - JobData1 = JobData#{ - <<"finished_state">> := <<"failed">>, - <<"finished_result">> := Error - } - case couch_jobs:finish(undefined, Job, JobData1) of - ok -> - couch_replicator_docs:update_failed(DbName, DocId, Error), - ok; - {error, JobError} -> - Msg = "Replication ~s job could not finish. JobError:~p", - couch_log:error(Msg, [RepId, JobError]), - {error, JobError} - end; - -rep_docs_job_execute(#{} = Job, #{} = JobData) -> - #{<<"rep">> := Rep, <<"doc_state">> := DocState} = JobData, - case lists:member(DocState, [<<"triggered">>, <<"error">>]) of - true -> maybe_remove_state_fields(DbName, DocId), - false -> ok - end, - % completed jobs should finish right away - - % otherwise start computing the replication id - - Rep1 = update_replication_id(Rep), - - % when done add or update the replicaton job - % if jobs has a filter keep checking if filter changes - - -maybe_remove_state_fields(DbName, DocId) -> - case update_docs() of - true -> - ok; - false -> - couch_replicator_docs:remove_state_fields(DbName, DocId) - end. 
- - -process_updated({DbName, _DocId} = Id, JsonRepDoc) -> - % Parsing replication doc (but not calculating the id) could throw an - % exception which would indicate this document is malformed. This exception - % should propagate to db_change function and will be recorded as permanent - % failure in the document. User will have to update the documet to fix the - % problem. - Rep0 = couch_replicator_docs:parse_rep_doc_without_id(JsonRepDoc), - Rep = Rep0#rep{db_name = DbName, start_time = os:timestamp()}, - Filter = case couch_replicator_filters:parse(Rep#rep.options) of - {ok, nil} -> - nil; - {ok, {user, _FName, _QP}} -> - user; - {ok, {view, _FName, _QP}} -> - view; - {ok, {docids, _DocIds}} -> - docids; - {ok, {mango, _Selector}} -> - mango; - {error, FilterError} -> - throw(FilterError) - end, - gen_server:call(?MODULE, {updated, Id, Rep, Filter}, infinity). +compare_reps(Rep1, Rep2) -> + NormRep1 = couch_replicator_util:normalize_rep(Rep1), + NormRep2 = couch_replicator_util:normalize_rep(Rep2), + NormRep1 =:= NormRep2. start_link() -> @@ -218,7 +144,7 @@ handle_call(Msg, _From, #{} = St) -> handle_cast({?ACCEPTED_JOB, Job, JobData}, #{} = St) -> - {noreply, execute_rep_doc_job(Job, JobData, St)}; + {noreply, spawn_worker(Job, JobData, St)}; handle_cast(Msg, #{} = St) -> {stop, {bad_cast, Msg}, St}. @@ -326,236 +252,239 @@ start_acceptors(N, #st{} = St) -> start_worker(Job, #{} = JobData, #{workers := Workers} = St) -> - Parent = self(), - Pid = spawn_link(fun() -> rep_doc_job_worker(Job, JobData, Parent) end), + Pid = spawn_link(fun() -> worker_fun(Job, JobData) end), St#{workers := Workers#{Pid => true}}. 
-rep_doc_job_worker(Job, #{?REP := null} = RepDocData, Parent) -> +worker_fun(Job, JobData) -> + try + worker_fun1(Job, JobData) + catch + throw:halt -> + Msg = "~p : replication doc job ~p lock conflict", + couch_log:error(Msg, [?MODULE, Job]); + throw:{rep_doc_not_current, DbName, DocId} -> + Msg = "~p : replication doc ~s:~s is not current", + couch_log:error(Msg, [?MODULE, DbName, DocId]) + end. + + +worker_fun1(Job, #{?REP := null} = RepDocData) -> #{ - ?REP_PARSE_ERROR := Error, + ?STATE_INFO := Error, ?DB_NAME := DbName, ?DOC_ID := DocId - ?ERROR_COUNT := ErrorCount, } = RepDocData, - RepDocData1 = RepDocData#{ - ?LAST_UPDATED := erlang:system_time(), - ?ERROR_COUNT := ErrorCount + 1, - ?FINISHED_STATE := ?FAILED, - ?FINISHED_RESULT := Error, - }, - case couch_jobs:finish(undefined, Job, RepDocData1) of - ok -> - couch_replicator_docs:update_failed(DbName, DocId, Error), - St; - {error, JobError} -> - Msg = "~p : replication ~s job could not finish. JobError:~p", - couch_log:error(Msg, [?MODULE, RepId, JobError]), - St - end; + finish_with_permanent_failure(undefined, Job, RepDocData, Error), + couch_replicator_docs:update_failed(DbName, DocId, Error); -rep_doc_job_worker(Job, #{?REP := #{}} = RepDocData, Parent) -> - #{ - ?REP := Rep1, - ?DOC_STATE := DocState, - } = RepDocData, - ok = remove_old_state_fields(Rep, DocState), - % Calculate replication ID. In most case this is fast and easy - % but for filtered replications with user JS filters this - % means making a remote connection to the source db. 
- Rep2 = #{<<"id">> := RepId} = couch_replicator_docs:update_rep_id(Rep1), + +worker_fun1(Job, #{?REP := #{}} = RepDocData) -> + #{?REP := Rep} = RepDocData, + #{?REP_ID := OldRepId, ?DB_NAME := DbName, ?DOC_ID := DocId} = Rep, + ok = remove_old_state_fields(RepDocData), try - couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(Tx), fun(JTx) -> - maybe_start_replication_job(JTx, Rep) - end) + RepWithId = couch_replicator_docs:update_rep_id(Rep), + worker_fun2(Job, OldRepId, RepWithId, RepDocData) catch - throw:{job_start_error, halt} -> - Msg = "~p : replication doc job ~s lock conflict", - couch_log:error(Msg, [?MODULE, RepId]), - exit(normal); - throw:{job_start_error, Error} -> - Msg = "~p : replication job start failed ~slock conflict", - couch_log:error(Msg, [?MODULE, RepId]) + throw:{filter_fetch_error, Error} -> + Error1 = io_lib:format("Filter fetch error ~p", [Error]), + Error2 = couch_util:to_binary(Error1), + finish_with_temporary_error(undefined, Job, RepDocData, Error2), + maybe_update_doc_error(OldRepId, DbName, DocId, Error2) end. 
-maybe_start_replication_job(JTx, Rep) -> - case couch_jobs:get_job_data(JTx, ?REP_JOBS, RepId) of - {error, not_found} -> - RepJobData = #{ - ?REP := Rep2, - ?STATE := ST_PENDING, - ?STATE_INFO := null, - }, - ok = couch_jobs:add(JTx, ?REP_JOBS, RepJobData), - RepDocData1 = RepDocData#{ - ?REP := Rep2, - ?STATE := ST_SCHEDULED, - ?STATE_INFO := null, - ?ERROR_COUNT := 0, - ?LAST_UPDATED => erlang:system_info() - }, - case couch_jobs:finish(JTx, Job, RepDocData1) of - ok -> ok; - Error -> throw({job_start_error, Error}) - -remove_old_state_fields(#{} = Rep, ?TRIGGERED) -> - case update_docs() of - true -> ok; - false -> couch_replicator_docs:remove_state_fields(DbName, DocId) - end; -remove_old_state_fields(#{} = Rep, ?ERROR) -> - case update_docs() of - true -> ok; - false -> couch_replicator_docs:remove_state_fields(DbName, DocId) - end; +worker_fun2(Job, OldRepId, #{} = Rep, #{} = RepDocData) -> + Result = couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(Tx), fun(JTx) -> + check_rep_doc_current(JTx, Rep), + remove_stale_replication_job(JTx, OldRepId, Rep), + maybe_start_replication_job(JTx, Job, Rep, RepDocData) + end), + case Result of + {ok, RepId} -> + maybe_update_doc_triggered(DbName, DocId, RepId); + ignore -> + ok; + {error, {permanent_failure, Error}} -> + couch_replicator_docs:update_failed(DbName, DocId, Error); + {error, {temporary_error, RepId, Error}} -> + maybe_update_doc_error(RepId, DbName, DocId, Error) + end. -maybe_remove_old_state_fields(#{} = _Rep, _) -> - ok. +check_rep_doc_current(JTx, #{} = Rep) -> + #{?DB_NAME := DbName, ?DOC_ID := DocId, ?VER := Ver} = Rep, + case couch_jobs:get_job_data(JTx, ?REP_DOCS, doc_job_id(DbName, DocId)) of + {ok, #{?REP := #{?VER := Ver}}} -> + ok; + {ok, #{?REP := #{?VER := V}}} when Ver =/= V -> + throw({rep_doc_not_current, DbName, DocId}); + {error, not_found} -> + throw({rep_doc_not_current, DbName, DocId}) + end. 
-% Doc processor gen_server private helper functions -% Handle doc update -- add to ets, then start a worker to try to turn it into -% a replication job. In most cases it will succeed quickly but for filtered -% replications or if there are duplicates, it could take longer -% (theoretically indefinitely) until a replication could be started. Before -% adding replication job, make sure to delete all old jobs associated with -% same document. --spec updated_doc(db_doc_id(), #rep{}, filter_type()) -> ok. -updated_doc(Id, Rep, Filter) -> - NormCurRep = couch_replicator_utils:normalize_rep(current_rep(Id)), - NormNewRep = couch_replicator_utils:normalize_rep(Rep), - case NormCurRep == NormNewRep of - false -> - removed_doc(Id), - Row = #rdoc{ - id = Id, - state = initializing, - rep = Rep, - rid = nil, - filter = Filter, - info = nil, - errcnt = 0, - worker = nil, - last_updated = os:timestamp() - }, - true = ets:insert(?MODULE, Row), - ok = maybe_start_worker(Id); - true -> +% A stale replication job is one still running after the filter +% has been updated and a new replication id was generated. +% +remove_stale_replication_job(_, null, #{}) -> + ok; + +remove_stale_replication_job(JTx, OldRepId, #{} = Rep) -> + #{?REP_ID := RepId, ?VER := Ver} = Rep, + case couch_jobs:get_job_data(JTx, ?REP_JOBS, OldRepId) of + {error, not_found} -> + ok; + {ok, #{?REP := #{?VER := Ver}}} when OldRepId =/= RepId -> + couch_jobs:remove(JTx, ?REP_JOBS, OldRepId); + {ok, #{}} -> ok end. -% Return current #rep{} record if any. If replication hasn't been submitted -% to the scheduler yet, #rep{} record will be in the document processor's -% ETS table, otherwise query scheduler for the #rep{} record. --spec current_rep({binary(), binary()}) -> #rep{} | nil. 
-current_rep({DbName, DocId}) when is_binary(DbName), is_binary(DocId) -> - case ets:lookup(?MODULE, {DbName, DocId}) of - [] -> - nil; - [#rdoc{state = scheduled, rep = nil, rid = JobId}] -> - % When replication is scheduled, #rep{} record which can be quite - % large compared to other bits in #rdoc is removed in order to avoid - % having to keep 2 copies of it. So have to fetch it from the - % scheduler. - couch_replicator_scheduler:rep_state(JobId); - [#rdoc{rep = Rep}] -> - Rep +maybe_start_replication_job(JTx, Job, #{} = Rep, #{} = RepDocData) -> + #{?REP_ID := RepId, ?DB_NAME := DbName, ?DOC_ID := DocId} = Rep, + case couch_jobs:get_job_data(JTx, ?REP_JOBS, RepId) of + {error, not_found} -> + start_replication_job(JTx, Job, Rep, RepDocData); + {ok, #{?REP := #{?DB_NAME := DbName, ?DOC_ID := DocId} = CurRep}} -> + case compare_reps(Rep, CurRep) of + true -> + dont_start_replication_job(JTx, Job, Rep, RepDocData); + false -> + ok = couch_jobs:remove(JTx, ?REP_JOBS, RepId), + start_replication_job(JTx, Job, Rep, RepDocData) + end; + {ok, #{?REP := #{?DB_NAME := null}}} -> + Err1 = io_lib:format("Replication `~s` specified by `~s:~s`" + " already running as a transient replication, started via" + " `_replicate` API endpoint", [RepId, DbName, DocId]), + Err2 = couch_util:to_binary(Err1), + ok = finish_with_temporary_error(JTx, Job, RepDocData, Err2), + {error, {temporary_error, RepId, Err2}}; + {ok, #{?REP := #{?DB_NAME := OtherDb, ?DOC_ID := OtherDoc}}} -> + Err1 = io_lib:format("Replication `~s` specified by `~s:~s`" + " already started by document `~s:~s`", [RepId, DbName, + DocId, OtherDb, OtherDoc]), + Error2 = couch_util:to_binary(Err1), + ok = finish_with_permanent_failure(JTx, Job, RepDocData, Error2), + {error, {permanent_failure, Error2}} end. --spec worker_returned(reference(), db_doc_id(), rep_start_result()) -> ok. 
-worker_returned(Ref, Id, {ok, RepId}) -> - case ets:lookup(?MODULE, Id) of - [#rdoc{worker = Ref} = Row] -> - Row0 = Row#rdoc{ - state = scheduled, - errcnt = 0, - worker = nil, - last_updated = os:timestamp() - }, - NewRow = case Row0 of - #rdoc{rid = RepId, filter = user} -> - % Filtered replication id didn't change. - Row0; - #rdoc{rid = nil, filter = user} -> - % Calculated new replication id for a filtered replication. Make - % sure to schedule another check as filter code could change. - % Replication starts could have been failing, so also clear - % error count. - Row0#rdoc{rid = RepId}; - #rdoc{rid = OldRepId, filter = user} -> - % Replication id of existing replication job with filter has - % changed. Remove old replication job from scheduler and - % schedule check to check for future changes. - ok = couch_replicator_scheduler:remove_job(OldRepId), - Msg = io_lib:format("Replication id changed: ~p -> ~p", [ - OldRepId, RepId]), - Row0#rdoc{rid = RepId, info = couch_util:to_binary(Msg)}; - #rdoc{rid = nil} -> - % Calculated new replication id for non-filtered replication. - % Remove replication doc body, after this we won't need it - % anymore. - Row0#rdoc{rep=nil, rid=RepId, info=nil} - end, - true = ets:insert(?MODULE, NewRow), - ok = maybe_update_doc_triggered(Row#rdoc.rep, RepId), - ok = maybe_start_worker(Id); - _ -> - ok % doc could have been deleted, ignore - end, - ok; +finish_with_temporary_error(JTx, Job, RepDocData, Error) -> + #{?ERROR_COUNT := ErrorCount} = RepDocData, + ErrorCount1 = ErrorCount + 1, + RepDocData1 = RepDocData#{ + ?STATE := ?ST_ERROR, + ?STATE_INFO := Error, + ?ERROR_COUNT := ErrorCount1 + }, + schedule_error_backoff(JTx, Job, ErrorCount1), + case couch_jobs:finish(JTx, Job, RepDocData1) of + ok -> ok; + {error, halt} -> throw(halt) + end. 
-worker_returned(_Ref, _Id, ignore) -> - ok; -worker_returned(Ref, Id, {temporary_error, Reason}) -> - case ets:lookup(?MODULE, Id) of - [#rdoc{worker = Ref, errcnt = ErrCnt} = Row] -> - NewRow = Row#rdoc{ - rid = nil, - state = error, - info = Reason, - errcnt = ErrCnt + 1, - worker = nil, - last_updated = os:timestamp() - }, - true = ets:insert(?MODULE, NewRow), - ok = maybe_update_doc_error(NewRow#rdoc.rep, Reason), - ok = maybe_start_worker(Id); - _ -> - ok % doc could have been deleted, ignore - end, - ok; +finish_with_permanent_failure(JTx, Job, RepDocData, Error) -> + #{?ERROR_COUNT := ErrorCount} = RepDocData, + RepDocData1 = RepDocData#{ + ?STATE := ?ST_FAILED, + ?STATE_INFO := Error, + ?ERROR_COUNT := ErrorCount + 1 + }, + case couch_jobs:finish(JTx, Job, RepDocData1) of + ok -> ok; + {error, halt} -> throw(halt) + end. -worker_returned(Ref, Id, {permanent_failure, _Reason}) -> - case ets:lookup(?MODULE, Id) of - [#rdoc{worker = Ref}] -> - true = ets:delete(?MODULE, Id); - _ -> - ok % doc could have been deleted, ignore - end, + +dont_start_replication_job(JTx, Job, Rep, RepDocData) -> + RepDocData1 = RepDocData#{?LAST_UPDATED => erlang:system_time()}, + ok = schedule_filter_check(JTx, Job, Rep), + case couch_jobs:finish(JTx, Job, RepDocData1) of + ok -> ignore; + {error, halt} -> throw(halt) + end. + + +start_replication_job(JTx, Job, #{} = Rep, #{} = RepDocData) -> + #{?REP_ID := RepId} = Rep, + RepJobData = #{ + ?REP => Rep, + ?STATE => ?ST_PENDING, + ?STATE_INFO => null, + ?ERROR_COUNT => 0, + ?LAST_UPDATED => erlang:system_time(), + ?HISTORY => [] + }, + ok = couch_jobs:add(JTx, ?REP_JOBS, RepId, RepJobData), + RepDocData1 = RepDocData#{ + ?REP := Rep, + ?STATE := ?ST_SCHEDULED, + ?STATE_INFO := null, + ?ERROR_COUNT := 0, + ?LAST_UPDATED => erlang:system_time() + }, + ok = schedule_filter_check(JTx, Job, Rep), + case couch_jobs:finish(JTx, Job, RepDocData1) of + ok -> {ok, RepId}; + {error, halt} -> throw(halt) + end. 
+ + +schedule_error_backoff(JTx, Job, ErrorCount) -> + Exp = min(ErrorCount, ?ERROR_MAX_BACKOFF_EXPONENT), + % ErrorCount is the exponent here. The reason 64 is used is to start at + % 64 (about a minute) max range. Then first backoff would be 30 sec + % on average. Then 1 minute and so on. + NowSec = erlang:system_time(second), + When = NowSec + rand:uniform(?INITIAL_BACKOFF_EXPONENT bsl Exp), + couch_jobs:resubmit(JTx, Job, trunc(When)). + + +schedule_filter_check(JTx, Job, #{<<"filter_type">> := <<"user">>} = Rep) -> + IntervalSec = filter_check_interval_sec(), + NowSec = erlang:system_time(second), + When = NowSec + 0.5 * IntervalSec + rand:uniform(IntervalSec), + couch_jobs:resubmit(JTx, Job, trunc(When)); + +schedule_filter_check(_JTx, _Job, #{}) -> + ok. --spec maybe_update_doc_error(#rep{}, any()) -> ok. -maybe_update_doc_error(Rep, Reason) -> +remove_old_state_fields(#{?DOC_STATE := DocState} = RepDocData) when + DocState =:= ?TRIGGERED orelse DocState =:= ?ERROR -> case update_docs() of true -> - couch_replicator_docs:update_error(Rep, Reason); + ok; + false -> + #{?DB_NAME := DbName, ?DOC_ID := DocId} = RepDocData, + couch_replicator_docs:remove_state_fields(DbName, DocId) + end; + +remove_old_state_fields(#{}) -> + ok. + + +-spec maybe_update_doc_error(binary(), binary(), binary(), any()) -> ok. +maybe_update_doc_error(RepId, DbName, DocId, Error) -> + case update_docs() of + true -> + couch_replicator_docs:update_error(RepId, DbName, DocId, Error); false -> ok end. --spec maybe_update_doc_triggered(#rep{}, rep_id()) -> ok. -maybe_update_doc_triggered(Rep, RepId) -> +-spec maybe_update_doc_triggered(binary(), binary(), binary()) -> ok. +maybe_update_doc_triggered(RepId, DbName, DocId) -> case update_docs() of true -> - couch_replicator_docs:update_triggered(Rep, RepId); + couch_replicator_docs:update_triggered(RepId, DbName, DocId); false -> ok end. @@ -570,75 +499,17 @@ error_backoff(ErrCnt) -> couch_rand:uniform(?INITIAL_BACKOFF_EXPONENT bsl Exp). 
--spec filter_backoff() -> seconds(). -filter_backoff() -> - Total = ets:info(?MODULE, size), - % This value scaled by the number of replications. If the are a lot of them - % wait is longer, but not more than a day (?TS_DAY_SEC). If there are just - % few, wait is shorter, starting at about 30 seconds. `2 *` is used since - % the expected wait would then be 0.5 * Range so it is easier to see the - % average wait. `1 +` is used because couch_rand:uniform only - % accepts >= 1 values and crashes otherwise. - Range = 1 + min(2 * (Total / 10), ?TS_DAY_SEC), - ?MIN_FILTER_DELAY_SEC + couch_rand:uniform(round(Range)). - - -% Document removed from db -- clear ets table and remove all scheduled jobs --spec removed_doc(db_doc_id()) -> ok. -removed_doc({DbName, DocId} = Id) -> - ets:delete(?MODULE, Id), - RepIds = couch_replicator_scheduler:find_jobs_by_doc(DbName, DocId), - lists:foreach(fun couch_replicator_scheduler:remove_job/1, RepIds). - - -% Whole db shard is gone -- remove all its ets rows and stop jobs --spec removed_db(binary()) -> ok. -removed_db(DbName) -> - EtsPat = #rdoc{id = {DbName, '_'}, _ = '_'}, - ets:match_delete(?MODULE, EtsPat), - RepIds = couch_replicator_scheduler:find_jobs_by_dbname(DbName), - lists:foreach(fun couch_replicator_scheduler:remove_job/1, RepIds). - - -% Spawn a worker process which will attempt to calculate a replication id, then -% start a replication. Returns a process monitor reference. The worker is -% guaranteed to exit with rep_start_result() type only. --spec maybe_start_worker(db_doc_id()) -> ok. -maybe_start_worker(Id) -> - case ets:lookup(?MODULE, Id) of - [] -> - ok; - [#rdoc{state = scheduled, filter = Filter}] when Filter =/= user -> - ok; - [#rdoc{rep = Rep} = Doc] -> - % For any replication with a user created filter function, periodically - % (every `filter_backoff/0` seconds) to try to see if the user filter - % has changed by using a worker to check for changes. 
When the worker - % returns check if replication ID has changed. If it hasn't keep - % checking (spawn another worker and so on). If it has stop the job - % with the old ID and continue checking. - Wait = get_worker_wait(Doc), - Ref = make_ref(), - true = ets:insert(?MODULE, Doc#rdoc{worker = Ref}), - couch_replicator_doc_processor_worker:spawn_worker(Id, Rep, Wait, Ref), - ok - end. - - --spec get_worker_wait(#rdoc{}) -> seconds(). -get_worker_wait(#rdoc{state = scheduled, filter = user}) -> - filter_backoff(); -get_worker_wait(#rdoc{state = error, errcnt = ErrCnt}) -> - error_backoff(ErrCnt); -get_worker_wait(#rdoc{state = initializing}) -> - 0. - - -spec update_docs() -> boolean(). update_docs() -> config:get_boolean("replicator", "update_docs", ?DEFAULT_UPDATE_DOCS). +-spec filter_check_interval_sec() -> integer(). +filter_check_interval_sec() -> + config:get_integer("replicator", "filter_check_interval_sec", + ?DEFAULT_FILTER_CHECK_INTERVAL_SEC). + + % _scheduler/docs HTTP endpoint helpers -spec docs([atom()]) -> [{[_]}] | []. @@ -753,23 +624,33 @@ ejson_doc_state_filter(State, States) when is_list(States), is_atom(State) -> -spec add_rep_doc_job(any(), binary(), binary(), #{} | null, - binary() | null, binary() | null) -> ok. -add_rep_doc_job(Tx, DbName, DocId, Rep, RepParseError, DocState) -> + binary() | null) -> ok. +add_rep_doc_job(Tx, DbName, DocId, Rep, RepParseError) -> JobId = docs_job_id(DbName, DocId), - ok = remove_replication_by_doc_job_id(Db, JobId), - RepDocData = #{ - ?REP => Rep, - ?REP_PARSE_ERROR => RepParseError, - ?DOC_STATE => DocState, - ?DB_NAME => DbName, - ?DOC_ID => DocId, - ?STATE => ST_INITIALIZING, - ?ERROR_COUNT => 0, - ?LAST_UPDATED => erlang:system_time(), - ?STATE => null, - ?STATE_INFO => null. - }, - ok = couch_jobs:add(Tx, ?REP_DOCS, RepDocData). 
+ RepDocData = case Rep of + null -> + #{ + ?REP => null, + ?DB_NAME => DbName, + ?DOC_ID => DocId, + ?STATE => ?ST_INITIALIZING, + ?STATE_INFO => RepParseError, + ?ERROR_COUNT => 0, + ?LAST_UPDATED => erlang:system_time() + }; + #{} -> + #{ + ?REP => Rep, + ?STATE => ?ST_INITIALIZING, + ?ERROR_COUNT => 0, + ?LAST_UPDATED => erlang:system_time(), + ?STATE_INFO => null + } + end, + couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(Tx), fun(JTx) -> + ok = remove_replication_by_doc_job_id(JTx, JobId), + ok = couch_jobs:add(JTx, ?REP_DOCS, RepDocData) + end). docs_job_id(DbName, Id) when is_binary(DbName), is_binary(Id) -> @@ -781,12 +662,14 @@ remove_replication_by_doc_job_id(Tx, Id) -> case couch_jobs:get_job_data(Tx, ?REP_DOCS, Id) of {error, not_found} -> ok; - {ok, #{?REP := {<<"id">> := null}}} -> + {ok, #{?REP := #{?REP_ID := null}}} -> couch_jobs:remove(Tx, ?REP_DOCS, Id), ok; - {ok, #{?REP := {<<"id">> := RepId}}} -> - couch_jobs:remove(Tx, ?REP_JOBS, RepId), - couch_jobs:remove(Tx, ?REP_DOCS, Id), + {ok, #{?REP := #{?REP_ID := RepId}}} -> + couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(Tx), fun(JTx) -> + couch_jobs:remove(JTx, ?REP_JOBS, RepId), + couch_jobs:remove(JTx, ?REP_DOCS, Id) + end), ok end. @@ -1068,7 +951,7 @@ test_rep(Id) -> change() -> {[ - {<<"id">>, ?DOC1}, + {?REP_ID, ?DOC1}, {doc, {[ {<<"_id">>, ?DOC1}, {<<"source">>, <<"http://srchost.local/src">>}, @@ -1079,7 +962,7 @@ change() -> change(State) -> {[ - {<<"id">>, ?DOC1}, + {?REP_ID, ?DOC1}, {doc, {[ {<<"_id">>, ?DOC1}, {<<"source">>, <<"http://srchost.local/src">>}, @@ -1091,7 +974,7 @@ change(State) -> deleted_change() -> {[ - {<<"id">>, ?DOC1}, + {?REP_ID, ?DOC1}, {<<"deleted">>, true}, {doc, {[ {<<"_id">>, ?DOC1}, @@ -1103,7 +986,7 @@ deleted_change() -> bad_change() -> {[ - {<<"id">>, ?DOC2}, + {?REP_ID, ?DOC2}, {doc, {[ {<<"_id">>, ?DOC2}, {<<"source">>, <<"src">> |