summaryrefslogtreecommitdiff
path: root/src/couch_replicator/src/couch_replicator_doc_processor.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/couch_replicator/src/couch_replicator_doc_processor.erl')
-rw-r--r--src/couch_replicator/src/couch_replicator_doc_processor.erl613
1 file changed, 248 insertions, 365 deletions
diff --git a/src/couch_replicator/src/couch_replicator_doc_processor.erl b/src/couch_replicator/src/couch_replicator_doc_processor.erl
index 9899befb7..d89233906 100644
--- a/src/couch_replicator/src/couch_replicator_doc_processor.erl
+++ b/src/couch_replicator/src/couch_replicator_doc_processor.erl
@@ -81,116 +81,42 @@ process_change(#{name := DbName} = Db, #doc{deleted = true} = Doc) ->
process_change(#{name := DbName} = Db, #doc{} = Doc) ->
#doc{id = DocId, body = {Props} = Body} = Doc,
- {Rep, RepError} = try
+ {Rep, Error} = try
Rep0 = couch_replicator_docs:parse_rep_doc_without_id(Body),
- Rep1 = Rep0#{
- <<"db_name">> => DbName,
- <<"start_time">> => erlang:system_time()
- },
+ DocState = get_json_value(<<"_replication_state">>, Props, null),
+    Rep1 = Rep0#{?DB_NAME => DbName, ?DOC_STATE => DocState},
{Rep1, null}
catch
throw:{bad_rep_doc, Reason} ->
{null, couch_replicator_utils:rep_error_to_binary(Reason)}
end,
- % We keep track of the doc's state in order to clear it if update_docs
- % is toggled from true to false
- DocState = get_json_value(<<"_replication_state">>, Props, null),
case couch_jobs:get_job_data(Db, ?REP_DOCS, docs_job_id(DbName, DocId)) of
{error, not_found} ->
- add_rep_doc_job(Db, DbName, DocId, Rep, RepError, DocState);
- {ok, #{?REP := null, ?REP_PARSE_ERROR := RepError}}
+ add_rep_doc_job(Db, DbName, DocId, Rep, Error);
+ {ok, #{?REP := null, ?REP_PARSE_ERROR := Error}}
when Rep =:= null ->
% Same error as before occurred, don't bother updating the job
ok;
{ok, #{?REP := null}} when Rep =:= null ->
% Error occured but it's a different error. Update the job so user
% sees the new error
- add_rep_doc_job(Db, DbName, DocId, Rep, RepError, DocState);
+ add_rep_doc_job(Db, DbName, DocId, Rep, Error);
{ok, #{?REP := OldRep, ?REP_PARSE_ERROR := OldError}} ->
- NormOldRep = couch_replicator_util:normalize_rep(OldRep),
- NormRep = couch_replicator_util:normalize_rep(Rep),
- case NormOldRep == NormRep of
+ case compare_reps(OldRep, Rep) of
true ->
% Document was changed but none of the parameters relevent
% for the replication job have changed, so make it a no-op
ok;
false ->
- add_rep_doc_job(Db, DbName, DocId, Rep, RepError,
- DocState)
+ add_rep_doc_job(Db, DbName, DocId, Rep, Error)
end
end.
-
-
-rep_docs_job_execute(#{} = Job, #{<<"rep">> := null} = JobData) ->
- #{
- <<"rep_parse_error">> := Error,
- <<"db_name">> := DbName,
- <<"doc_id">> := DocId,
- } = JobData,
- JobData1 = JobData#{
- <<"finished_state">> := <<"failed">>,
- <<"finished_result">> := Error
- }
- case couch_jobs:finish(undefined, Job, JobData1) of
- ok ->
- couch_replicator_docs:update_failed(DbName, DocId, Error),
- ok;
- {error, JobError} ->
- Msg = "Replication ~s job could not finish. JobError:~p",
- couch_log:error(Msg, [RepId, JobError]),
- {error, JobError}
- end;
-
-rep_docs_job_execute(#{} = Job, #{} = JobData) ->
- #{<<"rep">> := Rep, <<"doc_state">> := DocState} = JobData,
- case lists:member(DocState, [<<"triggered">>, <<"error">>]) of
- true -> maybe_remove_state_fields(DbName, DocId),
- false -> ok
- end,
- % completed jobs should finish right away
-
- % otherwise start computing the replication id
-
- Rep1 = update_replication_id(Rep),
-
- % when done add or update the replicaton job
- % if jobs has a filter keep checking if filter changes
-
-
-maybe_remove_state_fields(DbName, DocId) ->
- case update_docs() of
- true ->
- ok;
- false ->
- couch_replicator_docs:remove_state_fields(DbName, DocId)
- end.
-
-
-process_updated({DbName, _DocId} = Id, JsonRepDoc) ->
- % Parsing replication doc (but not calculating the id) could throw an
- % exception which would indicate this document is malformed. This exception
- % should propagate to db_change function and will be recorded as permanent
- % failure in the document. User will have to update the documet to fix the
- % problem.
- Rep0 = couch_replicator_docs:parse_rep_doc_without_id(JsonRepDoc),
- Rep = Rep0#rep{db_name = DbName, start_time = os:timestamp()},
- Filter = case couch_replicator_filters:parse(Rep#rep.options) of
- {ok, nil} ->
- nil;
- {ok, {user, _FName, _QP}} ->
- user;
- {ok, {view, _FName, _QP}} ->
- view;
- {ok, {docids, _DocIds}} ->
- docids;
- {ok, {mango, _Selector}} ->
- mango;
- {error, FilterError} ->
- throw(FilterError)
- end,
- gen_server:call(?MODULE, {updated, Id, Rep, Filter}, infinity).
+compare_reps(Rep1, Rep2) ->
+ NormRep1 = couch_replicator_util:normalize_rep(Rep1),
+ NormRep2 = couch_replicator_util:normalize_rep(Rep2),
+ NormRep1 =:= NormRep2.
start_link() ->
@@ -218,7 +144,7 @@ handle_call(Msg, _From, #{} = St) ->
handle_cast({?ACCEPTED_JOB, Job, JobData}, #{} = St) ->
- {noreply, execute_rep_doc_job(Job, JobData, St)};
+ {noreply, spawn_worker(Job, JobData, St)};
handle_cast(Msg, #{} = St) ->
{stop, {bad_cast, Msg}, St}.
@@ -326,236 +252,239 @@ start_acceptors(N, #st{} = St) ->
start_worker(Job, #{} = JobData, #{workers := Workers} = St) ->
- Parent = self(),
- Pid = spawn_link(fun() -> rep_doc_job_worker(Job, JobData, Parent) end),
+ Pid = spawn_link(fun() -> worker_fun(Job, JobData) end),
St#{workers := Workers#{Pid => true}}.
-rep_doc_job_worker(Job, #{?REP := null} = RepDocData, Parent) ->
+worker_fun(Job, JobData) ->
+ try
+ worker_fun1(Job, JobData)
+ catch
+ throw:halt ->
+ Msg = "~p : replication doc job ~p lock conflict",
+ couch_log:error(Msg, [?MODULE, Job]);
+ throw:{rep_doc_not_current, DbName, DocId} ->
+ Msg = "~p : replication doc ~s:~s is not current",
+ couch_log:error(Msg, [?MODULE, DbName, DocID]),
+ end.
+
+
+worker_fun1(Job, #{?REP := null} = RepDocData) ->
#{
- ?REP_PARSE_ERROR := Error,
+ ?STATE_INFO := Error,
?DB_NAME := DbName,
?DOC_ID := DocId
- ?ERROR_COUNT := ErrorCount,
} = RepDocData,
- RepDocData1 = RepDocData#{
- ?LAST_UPDATED := erlang:system_time(),
- ?ERROR_COUNT := ErrorCount + 1,
- ?FINISHED_STATE := ?FAILED,
- ?FINISHED_RESULT := Error,
- },
- case couch_jobs:finish(undefined, Job, RepDocData1) of
- ok ->
- couch_replicator_docs:update_failed(DbName, DocId, Error),
- St;
- {error, JobError} ->
- Msg = "~p : replication ~s job could not finish. JobError:~p",
- couch_log:error(Msg, [?MODULE, RepId, JobError]),
- St
- end;
+ finish_with_permanent_failure(undefined, Job, RepDocData, Error),
+ couch_replicator_docs:update_failed(DbName, DocId, Error);
-rep_doc_job_worker(Job, #{?REP := #{}} = RepDocData, Parent) ->
- #{
- ?REP := Rep1,
- ?DOC_STATE := DocState,
- } = RepDocData,
- ok = remove_old_state_fields(Rep, DocState),
- % Calculate replication ID. In most case this is fast and easy
- % but for filtered replications with user JS filters this
- % means making a remote connection to the source db.
- Rep2 = #{<<"id">> := RepId} = couch_replicator_docs:update_rep_id(Rep1),
+
+worker_fun1(Job, #{?REP := #{}} = RepDocData) ->
+ #{?REP := Rep} = RepDocData,
+ #{?REP_ID := OldRepId, ?DB_NAME := DbName, ?DOC_ID := DocId} = Rep,
+ ok = remove_old_state_fields(RepDocData),
try
- couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(Tx), fun(JTx) ->
- maybe_start_replication_job(JTx, Rep)
- end)
+ RepWithId = couch_replicator_docs:update_rep_id(Rep),
+ worker_fun2(Job, OldRepId, RepWithId, RepDocData)
catch
- throw:{job_start_error, halt} ->
- Msg = "~p : replication doc job ~s lock conflict",
- couch_log:error(Msg, [?MODULE, RepId]),
- exit(normal);
- throw:{job_start_error, Error} ->
- Msg = "~p : replication job start failed ~slock conflict",
- couch_log:error(Msg, [?MODULE, RepId])
+ throw:{filter_fetch_error, Error} ->
+ Error1 = io_lib:format("Filter fetch error ~p", [Error]),
+ Error2 = couch_util:to_binary(Error1),
+ finish_with_temporary_error(undefined, Job, RepDocData, Error2),
+ maybe_update_doc_error(OldRepId, DbName, DocId, Error2)
end.
-maybe_start_replication_job(JTx, Rep) ->
- case couch_jobs:get_job_data(JTx, ?REP_JOBS, RepId) of
- {error, not_found} ->
- RepJobData = #{
- ?REP := Rep2,
- ?STATE := ST_PENDING,
- ?STATE_INFO := null,
- },
- ok = couch_jobs:add(JTx, ?REP_JOBS, RepJobData),
- RepDocData1 = RepDocData#{
- ?REP := Rep2,
- ?STATE := ST_SCHEDULED,
- ?STATE_INFO := null,
- ?ERROR_COUNT := 0,
- ?LAST_UPDATED => erlang:system_info()
- },
- case couch_jobs:finish(JTx, Job, RepDocData1) of
- ok -> ok;
- Error -> throw({job_start_error, Error})
-
-remove_old_state_fields(#{} = Rep, ?TRIGGERED) ->
- case update_docs() of
- true -> ok;
- false -> couch_replicator_docs:remove_state_fields(DbName, DocId)
- end;
-remove_old_state_fields(#{} = Rep, ?ERROR) ->
- case update_docs() of
- true -> ok;
- false -> couch_replicator_docs:remove_state_fields(DbName, DocId)
- end;
+worker_fun2(Job, OldRepId, #{} = Rep, #{} = RepDocData) ->
+    #{?DB_NAME := DbName, ?DOC_ID := DocId} = Rep,
+    Result = couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(), fun(JTx) ->
+ check_rep_doc_current(JTx, Rep),
+ remove_stale_replication_job(JTx, OldRepId, Rep),
+ maybe_start_replication_job(JTx, Job, Rep, RepDocData)
+ end),
+ case Result of
+ {ok, RepId} ->
+ maybe_update_doc_triggered(DbName, DocId, RepId);
+ ignore ->
+ ok;
+ {error, {permanent_failure, Error}} ->
+ couch_replicator_docs:update_failed(DbName, DocId, Error);
+ {error, {temporary_error, RepId, Error}} ->
+ maybe_update_doc_error(RepId, DbName, DocId, Error)
+ end.
-maybe_remove_old_state_fields(#{} = _Rep, _) ->
- ok.
+check_rep_doc_current(JTx, #{} = Rep) ->
+ #{?DB_NAME := DbName, ?DOC_ID := DocId, ?VER := Ver} = Rep,
+    case couch_jobs:get_job_data(JTx, ?REP_DOCS, docs_job_id(DbName, DocId)) of
+        {ok, #{?REP := #{?VER := Ver}}} ->
+ ok;
+ {ok, #{?REP := #{?VER := V}}} when Ver =/= V ->
+ throw({rep_doc_not_current, DbName, DocId});
+ {error, not_found} ->
+            throw({rep_doc_not_current, DbName, DocId})
+ end.
-% Doc processor gen_server private helper functions
-% Handle doc update -- add to ets, then start a worker to try to turn it into
-% a replication job. In most cases it will succeed quickly but for filtered
-% replications or if there are duplicates, it could take longer
-% (theoretically indefinitely) until a replication could be started. Before
-% adding replication job, make sure to delete all old jobs associated with
-% same document.
--spec updated_doc(db_doc_id(), #rep{}, filter_type()) -> ok.
-updated_doc(Id, Rep, Filter) ->
- NormCurRep = couch_replicator_utils:normalize_rep(current_rep(Id)),
- NormNewRep = couch_replicator_utils:normalize_rep(Rep),
- case NormCurRep == NormNewRep of
- false ->
- removed_doc(Id),
- Row = #rdoc{
- id = Id,
- state = initializing,
- rep = Rep,
- rid = nil,
- filter = Filter,
- info = nil,
- errcnt = 0,
- worker = nil,
- last_updated = os:timestamp()
- },
- true = ets:insert(?MODULE, Row),
- ok = maybe_start_worker(Id);
- true ->
+% A stale replication job is one still running after the filter
+% has been updated and a new replication id was generated.
+%
+remove_stale_replication_job(_, null, #{}) ->
+ ok;
+
+remove_stale_replication_job(JTx, OldRepId, #{} = Rep) ->
+ #{?REP_ID := RepId, ?VER := Ver} = Rep,
+ case couch_jobs:get_job_data(JTx, ?REP_JOBS, OldRepId) of
+ {error, not_found} ->
+ ok;
+        {ok, #{?REP := #{?VER := Ver}}} when OldRepId =/= RepId ->
+            couch_jobs:remove(JTx, ?REP_JOBS, OldRepId);
+ {ok, #{}} ->
ok
end.
-% Return current #rep{} record if any. If replication hasn't been submitted
-% to the scheduler yet, #rep{} record will be in the document processor's
-% ETS table, otherwise query scheduler for the #rep{} record.
--spec current_rep({binary(), binary()}) -> #rep{} | nil.
-current_rep({DbName, DocId}) when is_binary(DbName), is_binary(DocId) ->
- case ets:lookup(?MODULE, {DbName, DocId}) of
- [] ->
- nil;
- [#rdoc{state = scheduled, rep = nil, rid = JobId}] ->
- % When replication is scheduled, #rep{} record which can be quite
- % large compared to other bits in #rdoc is removed in order to avoid
- % having to keep 2 copies of it. So have to fetch it from the
- % scheduler.
- couch_replicator_scheduler:rep_state(JobId);
- [#rdoc{rep = Rep}] ->
- Rep
+maybe_start_replication_job(JTx, Job, #{} = Rep, #{} = RepDocData) ->
+    #{?REP_ID := RepId, ?DB_NAME := DbName, ?DOC_ID := DocId} = Rep,
+ case couch_jobs:get_job_data(JTx, ?REP_JOBS, RepId) of
+ {error, not_found} ->
+ start_replication_job(JTx, Job, Rep, RepDocData);
+        {ok, #{?REP := #{?DB_NAME := DbName, ?DOC_ID := DocId} = CurRep}} ->
+ case compare_reps(Rep, CurRep) of
+ true ->
+ dont_start_replication_job(JTx, Job, Rep, RepDocData);
+ false ->
+ ok = couch_jobs:remove(JTx, ?REP_JOBS, RepId),
+ start_replication_job(JTx, Job, Rep, RepDocData)
+ end;
+        {ok, #{?REP := #{?DB_NAME := null}}} ->
+ Err1 = io_lib:format("Replication `~s` specified by `~s:~s`"
+ " already running as a transient replication, started via"
+ " `_replicate` API endpoint", [RepId, DbName, DocId]),
+ Err2 = couch_util:to_binary(Err1),
+ ok = finish_with_temporary_error(JTx, Job, RepDocData, Err2),
+            {error, {temporary_error, RepId, Err2}};
+        {ok, #{?REP := #{?DB_NAME := OtherDb, ?DOC_ID := OtherDoc}}} ->
+            Err1 = io_lib:format("Replication `~s` specified by `~s:~s`"
+                " already started by document `~s:~s`", [RepId, DbName,
+                DocId, OtherDb, OtherDoc]),
+            Error2 = couch_util:to_binary(Err1),
+            ok = finish_with_permanent_failure(JTx, Job, RepDocData, Error2),
+ {error, {permanent_failure, Error2}}
end.
--spec worker_returned(reference(), db_doc_id(), rep_start_result()) -> ok.
-worker_returned(Ref, Id, {ok, RepId}) ->
- case ets:lookup(?MODULE, Id) of
- [#rdoc{worker = Ref} = Row] ->
- Row0 = Row#rdoc{
- state = scheduled,
- errcnt = 0,
- worker = nil,
- last_updated = os:timestamp()
- },
- NewRow = case Row0 of
- #rdoc{rid = RepId, filter = user} ->
- % Filtered replication id didn't change.
- Row0;
- #rdoc{rid = nil, filter = user} ->
- % Calculated new replication id for a filtered replication. Make
- % sure to schedule another check as filter code could change.
- % Replication starts could have been failing, so also clear
- % error count.
- Row0#rdoc{rid = RepId};
- #rdoc{rid = OldRepId, filter = user} ->
- % Replication id of existing replication job with filter has
- % changed. Remove old replication job from scheduler and
- % schedule check to check for future changes.
- ok = couch_replicator_scheduler:remove_job(OldRepId),
- Msg = io_lib:format("Replication id changed: ~p -> ~p", [
- OldRepId, RepId]),
- Row0#rdoc{rid = RepId, info = couch_util:to_binary(Msg)};
- #rdoc{rid = nil} ->
- % Calculated new replication id for non-filtered replication.
- % Remove replication doc body, after this we won't need it
- % anymore.
- Row0#rdoc{rep=nil, rid=RepId, info=nil}
- end,
- true = ets:insert(?MODULE, NewRow),
- ok = maybe_update_doc_triggered(Row#rdoc.rep, RepId),
- ok = maybe_start_worker(Id);
- _ ->
- ok % doc could have been deleted, ignore
- end,
- ok;
+finish_with_temporary_error(JTx, Job, RepDocData, Error) ->
+ #{?ERROR_COUNT := ErrorCount} = RepDocData,
+ ErrorCount1 = ErrorCount + 1,
+    RepDocData1 = RepDocData#{
+        ?STATE := ?ST_ERROR,
+        ?STATE_INFO := Error,
+        ?ERROR_COUNT := ErrorCount1
+    },
+ schedule_error_backoff(JTx, Job, ErrorCount1),
+ case couch_jobs:finish(JTx, Job, RepDocData1) of
+ ok -> ok;
+ {error, halt} -> throw(halt)
+ end.
-worker_returned(_Ref, _Id, ignore) ->
- ok;
-worker_returned(Ref, Id, {temporary_error, Reason}) ->
- case ets:lookup(?MODULE, Id) of
- [#rdoc{worker = Ref, errcnt = ErrCnt} = Row] ->
- NewRow = Row#rdoc{
- rid = nil,
- state = error,
- info = Reason,
- errcnt = ErrCnt + 1,
- worker = nil,
- last_updated = os:timestamp()
- },
- true = ets:insert(?MODULE, NewRow),
- ok = maybe_update_doc_error(NewRow#rdoc.rep, Reason),
- ok = maybe_start_worker(Id);
- _ ->
- ok % doc could have been deleted, ignore
- end,
- ok;
+finish_with_permanent_failure(JTx, Job, RepDocData, Error) ->
+ #{?ERROR_COUNT := ErrorCount} = RepDocData,
+    RepDocData1 = RepDocData#{
+        ?STATE := ?ST_FAILED,
+        ?STATE_INFO := Error,
+        ?ERROR_COUNT := ErrorCount + 1
+    },
+ case couch_jobs:finish(JTx, Job, RepDocData1) of
+ ok -> ok;
+ {error, halt} -> throw(halt)
+ end.
-worker_returned(Ref, Id, {permanent_failure, _Reason}) ->
- case ets:lookup(?MODULE, Id) of
- [#rdoc{worker = Ref}] ->
- true = ets:delete(?MODULE, Id);
- _ ->
- ok % doc could have been deleted, ignore
- end,
+
+dont_start_replication_job(JTx, Job, Rep, RepDocData) ->
+ RepDocData1 = RepDocData#{?LAST_UPDATED => erlang:system_time()},
+ ok = schedule_filter_check(JTx, Job, Rep),
+ case couch_jobs:finish(JTx, Job, RepDocData1) of
+ ok -> ignore;
+ {error, halt} -> throw(halt)
+ end.
+
+
+start_replication_job(JTx, Job, #{} = Rep, #{} = RepDocData) ->
+ #{?REP_ID := RepId} = Rep,
+ RepJobData = #{
+ ?REP => Rep,
+ ?STATE => ?ST_PENDING,
+ ?STATE_INFO => null,
+ ?ERROR_COUNT => 0,
+ ?LAST_UPDATED => erlang:system_time(),
+ ?HISTORY => []
+ },
+ ok = couch_jobs:add(JTx, ?REP_JOBS, RepId, RepJobData),
+ RepDocData1 = RepDocData#{
+ ?REP := Rep,
+ ?STATE := ?ST_SCHEDULED,
+ ?STATE_INFO := null,
+ ?ERROR_COUNT := 0,
+ ?LAST_UPDATED => erlang:system_time()
+ },
+ ok = schedule_filter_check(JTx, Job, Rep),
+ case couch_jobs:finish(JTx, Job, RepDocData1) of
+ ok -> {ok, RepId};
+ {error, halt} -> throw(halt)
+ end.
+
+
+schedule_error_backoff(JTx, Job, ErrorCount) ->
+    Exp = min(ErrorCount, ?ERROR_MAX_BACKOFF_EXPONENT),
+ % ErrCnt is the exponent here. The reason 64 is used is to start at
+ % 64 (about a minute) max range. Then first backoff would be 30 sec
+ % on average. Then 1 minute and so on.
+ NowSec = erlang:system_time(second),
+    When = NowSec + rand:uniform(?INITIAL_BACKOFF_EXPONENT bsl Exp),
+ couch_jobs:resubmit(JTx, Job, trunc(When)).
+
+
+schedule_filter_check(JTx, Job, #{<<"filter_type">> := <<"user">>} = Rep) ->
+ IntervalSec = filter_check_interval_sec(),
+ NowSec = erlang:system_time(second),
+ When = NowSec + 0.5 * IntervalSec + rand:uniform(IntervalSec),
+    couch_jobs:resubmit(JTx, Job, trunc(When));
+
+schedule_filter_check(_JTx, _Job, #{}) ->
ok.
--spec maybe_update_doc_error(#rep{}, any()) -> ok.
-maybe_update_doc_error(Rep, Reason) ->
+remove_old_state_fields(#{?DOC_STATE := DocState} = RepDocData) when
+ DocState =:= ?TRIGGERED orelse DocState =:= ?ERROR ->
case update_docs() of
true ->
- couch_replicator_docs:update_error(Rep, Reason);
+ ok;
+ false ->
+ #{?DB_NAME := DbName, ?DOC_ID := DocId} = RepDocData,
+ couch_replicator_docs:remove_state_fields(DbName, DocId)
+ end;
+
+remove_old_state_fields(#{}) ->
+ ok.
+
+
+-spec maybe_update_doc_error(binary(), binary(), binary(), any()) -> ok.
+maybe_update_doc_error(RepId, DbName, DocId, Error) ->
+ case update_docs() of
+ true ->
+ couch_replicator_docs:update_error(RepId, DbName, DocId, Error);
false ->
ok
end.
--spec maybe_update_doc_triggered(#rep{}, rep_id()) -> ok.
-maybe_update_doc_triggered(Rep, RepId) ->
+-spec maybe_update_doc_triggered(#{}, rep_id()) -> ok.
+maybe_update_doc_triggered(RepId, DbName, DocId) ->
case update_docs() of
true ->
- couch_replicator_docs:update_triggered(Rep, RepId);
+ couch_replicator_docs:update_triggered(RepId, DbName, DocId);
false ->
ok
end.
@@ -570,75 +499,17 @@ error_backoff(ErrCnt) ->
couch_rand:uniform(?INITIAL_BACKOFF_EXPONENT bsl Exp).
--spec filter_backoff() -> seconds().
-filter_backoff() ->
- Total = ets:info(?MODULE, size),
- % This value scaled by the number of replications. If the are a lot of them
- % wait is longer, but not more than a day (?TS_DAY_SEC). If there are just
- % few, wait is shorter, starting at about 30 seconds. `2 *` is used since
- % the expected wait would then be 0.5 * Range so it is easier to see the
- % average wait. `1 +` is used because couch_rand:uniform only
- % accepts >= 1 values and crashes otherwise.
- Range = 1 + min(2 * (Total / 10), ?TS_DAY_SEC),
- ?MIN_FILTER_DELAY_SEC + couch_rand:uniform(round(Range)).
-
-
-% Document removed from db -- clear ets table and remove all scheduled jobs
--spec removed_doc(db_doc_id()) -> ok.
-removed_doc({DbName, DocId} = Id) ->
- ets:delete(?MODULE, Id),
- RepIds = couch_replicator_scheduler:find_jobs_by_doc(DbName, DocId),
- lists:foreach(fun couch_replicator_scheduler:remove_job/1, RepIds).
-
-
-% Whole db shard is gone -- remove all its ets rows and stop jobs
--spec removed_db(binary()) -> ok.
-removed_db(DbName) ->
- EtsPat = #rdoc{id = {DbName, '_'}, _ = '_'},
- ets:match_delete(?MODULE, EtsPat),
- RepIds = couch_replicator_scheduler:find_jobs_by_dbname(DbName),
- lists:foreach(fun couch_replicator_scheduler:remove_job/1, RepIds).
-
-
-% Spawn a worker process which will attempt to calculate a replication id, then
-% start a replication. Returns a process monitor reference. The worker is
-% guaranteed to exit with rep_start_result() type only.
--spec maybe_start_worker(db_doc_id()) -> ok.
-maybe_start_worker(Id) ->
- case ets:lookup(?MODULE, Id) of
- [] ->
- ok;
- [#rdoc{state = scheduled, filter = Filter}] when Filter =/= user ->
- ok;
- [#rdoc{rep = Rep} = Doc] ->
- % For any replication with a user created filter function, periodically
- % (every `filter_backoff/0` seconds) to try to see if the user filter
- % has changed by using a worker to check for changes. When the worker
- % returns check if replication ID has changed. If it hasn't keep
- % checking (spawn another worker and so on). If it has stop the job
- % with the old ID and continue checking.
- Wait = get_worker_wait(Doc),
- Ref = make_ref(),
- true = ets:insert(?MODULE, Doc#rdoc{worker = Ref}),
- couch_replicator_doc_processor_worker:spawn_worker(Id, Rep, Wait, Ref),
- ok
- end.
-
-
--spec get_worker_wait(#rdoc{}) -> seconds().
-get_worker_wait(#rdoc{state = scheduled, filter = user}) ->
- filter_backoff();
-get_worker_wait(#rdoc{state = error, errcnt = ErrCnt}) ->
- error_backoff(ErrCnt);
-get_worker_wait(#rdoc{state = initializing}) ->
- 0.
-
-
-spec update_docs() -> boolean().
update_docs() ->
config:get_boolean("replicator", "update_docs", ?DEFAULT_UPDATE_DOCS).
+-spec filter_check_interval_sec() -> integer().
+filter_check_interval_sec() ->
+ config:get_integer("replicator", "filter_check_interval_sec",
+ ?DEFAULT_FILTER_CHECK_INTERVAL_SEC).
+
+
% _scheduler/docs HTTP endpoint helpers
-spec docs([atom()]) -> [{[_]}] | [].
@@ -753,23 +624,33 @@ ejson_doc_state_filter(State, States) when is_list(States), is_atom(State) ->
-spec add_rep_doc_job(any(), binary(), binary(), #{} | null,
- binary() | null, binary() | null) -> ok.
-add_rep_doc_job(Tx, DbName, DocId, Rep, RepParseError, DocState) ->
+ binary() | null) -> ok.
+add_rep_doc_job(Tx, DbName, DocId, Rep, RepParseError) ->
JobId = docs_job_id(DbName, DocId),
- ok = remove_replication_by_doc_job_id(Db, JobId),
- RepDocData = #{
- ?REP => Rep,
- ?REP_PARSE_ERROR => RepParseError,
- ?DOC_STATE => DocState,
- ?DB_NAME => DbName,
- ?DOC_ID => DocId,
- ?STATE => ST_INITIALIZING,
- ?ERROR_COUNT => 0,
- ?LAST_UPDATED => erlang:system_time(),
- ?STATE => null,
- ?STATE_INFO => null.
- },
- ok = couch_jobs:add(Tx, ?REP_DOCS, RepDocData).
+ RepDocData = case Rep of
+ null ->
+ #{
+ ?REP => null,
+ ?DB_NAME => DbName,
+ ?DOC_ID => DocId,
+ ?STATE => ?ST_INITIALIZING,
+                ?STATE_INFO => RepParseError,
+ ?ERROR_COUNT => 0,
+ ?LAST_UPDATED => erlang:system_time()
+ };
+ #{} ->
+ #{
+ ?REP => Rep,
+ ?STATE => ?ST_INITIALIZING,
+ ?ERROR_COUNT => 0,
+ ?LAST_UPDATED => erlang:system_time(),
+ ?STATE_INFO => null
+ }
+ end,
+ couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(Tx), fun(JTx) ->
+ ok = remove_replication_by_doc_job_id(JTx, JobId),
+ ok = couch_jobs:add(JTx, ?REP_DOCS, RepDocData)
+ end).
docs_job_id(DbName, Id) when is_binary(DbName), is_binary(Id) ->
@@ -781,12 +662,14 @@ remove_replication_by_doc_job_id(Tx, Id) ->
case couch_jobs:get_job_data(Tx, ?REP_DOCS, Id) of
{error, not_found} ->
ok;
- {ok, #{?REP := {<<"id">> := null}}} ->
+ {ok, #{?REP := {?REP_ID := null}}} ->
couch_jobs:remove(Tx, ?REP_DOCS, Id),
ok;
- {ok, #{?REP := {<<"id">> := RepId}}} ->
- couch_jobs:remove(Tx, ?REP_JOBS, RepId),
- couch_jobs:remove(Tx, ?REP_DOCS, Id),
+ {ok, #{?REP := {?REP_ID := RepId}}} ->
+ couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(Tx), fun(JTx) ->
+ couch_jobs:remove(JTx, ?REP_JOBS, RepId),
+ couch_jobs:remove(JTx, ?REP_DOCS, Id)
+ end),
ok
end.
@@ -1068,7 +951,7 @@ test_rep(Id) ->
change() ->
{[
- {<<"id">>, ?DOC1},
+ {?REP_ID, ?DOC1},
{doc, {[
{<<"_id">>, ?DOC1},
{<<"source">>, <<"http://srchost.local/src">>},
@@ -1079,7 +962,7 @@ change() ->
change(State) ->
{[
- {<<"id">>, ?DOC1},
+ {?REP_ID, ?DOC1},
{doc, {[
{<<"_id">>, ?DOC1},
{<<"source">>, <<"http://srchost.local/src">>},
@@ -1091,7 +974,7 @@ change(State) ->
deleted_change() ->
{[
- {<<"id">>, ?DOC1},
+ {?REP_ID, ?DOC1},
{<<"deleted">>, true},
{doc, {[
{<<"_id">>, ?DOC1},
@@ -1103,7 +986,7 @@ deleted_change() ->
bad_change() ->
{[
- {<<"id">>, ?DOC2},
+ {?REP_ID, ?DOC2},
{doc, {[
{<<"_id">>, ?DOC2},
{<<"source">>, <<"src">>}