diff options
5 files changed, 281 insertions, 671 deletions
diff --git a/src/couch_replicator/src/couch_replicator.hrl b/src/couch_replicator/src/couch_replicator.hrl
index 050fcb4a8..d94c3eb1f 100644
--- a/src/couch_replicator/src/couch_replicator.hrl
+++ b/src/couch_replicator/src/couch_replicator.hrl
@@ -41,6 +41,12 @@
-define(ST_CRASHING, <<"crashing">>).
-define(ST_TRIGGERED, <<"triggered">>).
+% Some fields from a rep object. ?DOC_ID is deliberately not defined here:
+% it is already defined with the couch job data fields further down in this
+% header, and defining the same macro twice is a preprocessor error.
+% NOTE(review): ?DB_NAME is used by pre-existing code, confirm it is not
+% already defined in a part of this header outside the hunk as well.
+-define(REP_ID, <<"id">>).
+-define(DB_NAME, <<"db_name">>).
+-define(START_TIME, <<"start_time">>).
% Fields couch job data objects
-define(REP, <<"rep">>).
-define(REP_PARSE_ERROR, <<"rep_parse_error">>).
@@ -51,6 +57,8 @@
-define(DOC_ID, <<"doc_id">>).
-define(ERROR_COUNT, <<"error_count">>).
-define(LAST_UPDATED, <<"last_updated">>).
+-define(HISTORY, <<"history">>).
+-define(VER, <<"ver">>).
% Accepted job message tag
-define(ACCEPTED_JOB, accepted_job).
diff --git a/src/couch_replicator/src/couch_replicator_doc_processor.erl b/src/couch_replicator/src/couch_replicator_doc_processor.erl
index 9899befb7..d89233906 100644
--- a/src/couch_replicator/src/couch_replicator_doc_processor.erl
+++ b/src/couch_replicator/src/couch_replicator_doc_processor.erl
@@ -81,116 +81,42 @@ process_change(#{name := DbName} = Db, #doc{deleted = true} = Doc) ->
process_change(#{name := DbName} = Db, #doc{} = Doc) ->
#doc{id = DocId, body = {Props} = Body} = Doc,
- {Rep, RepError} = try
+ {Rep, Error} = try
Rep0 = couch_replicator_docs:parse_rep_doc_without_id(Body),
- Rep1 = Rep0#{
- <<"db_name">> => DbName,
- <<"start_time">> => erlang:system_time()
- },
+ DocState = get_json_value(<<"_replication_state">>, Props, null),
+ Rep1 = Rep0#{?DB_NAME := DbName, ?DOC_STATE := DocState},
{Rep1, null}
throw:{bad_rep_doc, Reason} ->
{null, couch_replicator_utils:rep_error_to_binary(Reason)}
- % We keep track of the doc's state in order to clear it if update_docs
- % is toggled from true to false
- DocState = get_json_value(<<"_replication_state">>, Props, null),
case couch_jobs:get_job_data(Db, ?REP_DOCS, docs_job_id(DbName, DocId)) of
{error, not_found} ->
- add_rep_doc_job(Db, DbName, DocId, Rep, RepError, DocState);
- {ok, #{?REP := null, ?REP_PARSE_ERROR := RepError}}
+ add_rep_doc_job(Db, DbName, DocId, Rep, Error);
+ {ok, #{?REP := null, ?REP_PARSE_ERROR := Error}}
when Rep =:= null ->
% Same error as before occurred, don't bother updating the job
{ok, #{?REP := null}} when Rep =:= null ->
% Error occured but it's a different error. Update the job so user
% sees the new error
- add_rep_doc_job(Db, DbName, DocId, Rep, RepError, DocState);
+ add_rep_doc_job(Db, DbName, DocId, Rep, Error);
{ok, #{?REP := OldRep, ?REP_PARSE_ERROR := OldError}} ->
- NormOldRep = couch_replicator_util:normalize_rep(OldRep),
- NormRep = couch_replicator_util:normalize_rep(Rep),
- case NormOldRep == NormRep of
+ case compare_reps(OldRep, Rep) of
true ->
% Document was changed but none of the parameters relevent
% for the replication job have changed, so make it a no-op
false ->
- add_rep_doc_job(Db, DbName, DocId, Rep, RepError,
- DocState)
+ add_rep_doc_job(Db, DbName, DocId, Rep, Error)
-rep_docs_job_execute(#{} = Job, #{<<"rep">> := null} = JobData) ->
- #{
- <<"rep_parse_error">> := Error,
- <<"db_name">> := DbName,
- <<"doc_id">> := DocId,
- } = JobData,
- JobData1 = JobData#{
- <<"finished_state">> := <<"failed">>,
- <<"finished_result">> := Error
- }
- case couch_jobs:finish(undefined, Job, JobData1) of
- ok ->
- couch_replicator_docs:update_failed(DbName, DocId, Error),
- ok;
- {error, JobError} ->
- Msg = "Replication ~s job could not finish. JobError:~p",
- couch_log:error(Msg, [RepId, JobError]),
- {error, JobError}
- end;
-rep_docs_job_execute(#{} = Job, #{} = JobData) ->
- #{<<"rep">> := Rep, <<"doc_state">> := DocState} = JobData,
- case lists:member(DocState, [<<"triggered">>, <<"error">>]) of
- true -> maybe_remove_state_fields(DbName, DocId),
- false -> ok
- end,
- % completed jobs should finish right away
- % otherwise start computing the replication id
- Rep1 = update_replication_id(Rep),
- % when done add or update the replicaton job
- % if jobs has a filter keep checking if filter changes
-maybe_remove_state_fields(DbName, DocId) ->
- case update_docs() of
- true ->
- ok;
- false ->
- couch_replicator_docs:remove_state_fields(DbName, DocId)
- end.
-process_updated({DbName, _DocId} = Id, JsonRepDoc) ->
- % Parsing replication doc (but not calculating the id) could throw an
- % exception which would indicate this document is malformed. This exception
- % should propagate to db_change function and will be recorded as permanent
- % failure in the document. User will have to update the documet to fix the
- % problem.
- Rep0 = couch_replicator_docs:parse_rep_doc_without_id(JsonRepDoc),
- Rep = Rep0#rep{db_name = DbName, start_time = os:timestamp()},
- Filter = case couch_replicator_filters:parse(Rep#rep.options) of
- {ok, nil} ->
- nil;
- {ok, {user, _FName, _QP}} ->
- user;
- {ok, {view, _FName, _QP}} ->
- view;
- {ok, {docids, _DocIds}} ->
- docids;
- {ok, {mango, _Selector}} ->
- mango;
- {error, FilterError} ->
- throw(FilterError)
- end,
- gen_server:call(?MODULE, {updated, Id, Rep, Filter}, infinity).
+% Return true when both replication objects are exactly equal after being
+% passed through couch_replicator_utils:normalize_rep/1, false otherwise.
+compare_reps(Rep1, Rep2) ->
+    NormRep1 = couch_replicator_utils:normalize_rep(Rep1),
+    NormRep2 = couch_replicator_utils:normalize_rep(Rep2),
+    NormRep1 =:= NormRep2.
start_link() ->
@@ -218,7 +144,7 @@ handle_call(Msg, _From, #{} = St) ->
handle_cast({?ACCEPTED_JOB, Job, JobData}, #{} = St) ->
- {noreply, execute_rep_doc_job(Job, JobData, St)};
+ {noreply, spawn_worker(Job, JobData, St)};
handle_cast(Msg, #{} = St) ->
{stop, {bad_cast, Msg}, St}.
@@ -326,236 +252,239 @@ start_acceptors(N, #st{} = St) ->
start_worker(Job, #{} = JobData, #{workers := Workers} = St) ->
- Parent = self(),
- Pid = spawn_link(fun() -> rep_doc_job_worker(Job, JobData, Parent) end),
+ Pid = spawn_link(fun() -> worker_fun(Job, JobData) end),
St#{workers := Workers#{Pid => true}}.
-rep_doc_job_worker(Job, #{?REP := null} = RepDocData, Parent) ->
+% Body of a spawned replication doc job worker. Runs worker_fun1/2 and logs
+% the two throws it is expected to produce instead of crashing the worker:
+%   halt                                - thrown when couch_jobs:finish/3
+%                                         returns {error, halt}
+%   {rep_doc_not_current, DbName, DocId} - thrown by check_rep_doc_current/2
+% Any other exception is left to propagate.
+worker_fun(Job, JobData) ->
+    try
+        worker_fun1(Job, JobData)
+    catch
+        throw:halt ->
+            Msg = "~p : replication doc job ~p lock conflict",
+            couch_log:error(Msg, [?MODULE, Job]);
+        throw:{rep_doc_not_current, DbName, DocId} ->
+            Msg = "~p : replication doc ~s:~s is not current",
+            couch_log:error(Msg, [?MODULE, DbName, DocId])
+    end.
+worker_fun1(Job, #{?REP := null} = RepDocData) ->
- ?REP_PARSE_ERROR := Error,
+ ?STATE_INFO := Error,
?DB_NAME := DbName,
?DOC_ID := DocId
- ?ERROR_COUNT := ErrorCount,
} = RepDocData,
- RepDocData1 = RepDocData#{
- ?LAST_UPDATED := erlang:system_time(),
- ?ERROR_COUNT := ErrorCount + 1,
- },
- case couch_jobs:finish(undefined, Job, RepDocData1) of
- ok ->
- couch_replicator_docs:update_failed(DbName, DocId, Error),
- St;
- {error, JobError} ->
- Msg = "~p : replication ~s job could not finish. JobError:~p",
- couch_log:error(Msg, [?MODULE, RepId, JobError]),
- St
- end;
+ finish_with_permanent_failure(undefined, Job, RepDocData, Error),
+ couch_replicator_docs:update_failed(DbName, DocId, Error);
-rep_doc_job_worker(Job, #{?REP := #{}} = RepDocData, Parent) ->
- #{
- ?REP := Rep1,
- ?DOC_STATE := DocState,
- } = RepDocData,
- ok = remove_old_state_fields(Rep, DocState),
- % Calculate replication ID. In most case this is fast and easy
- % but for filtered replications with user JS filters this
- % means making a remote connection to the source db.
- Rep2 = #{<<"id">> := RepId} = couch_replicator_docs:update_rep_id(Rep1),
+worker_fun1(Job, #{?REP := #{}} = RepDocData) ->
+ #{?REP := Rep} = RepDocData,
+ #{?REP_ID := OldRepId, ?DB_NAME := DbName, ?DOC_ID := DocId} = Rep,
+ ok = remove_old_state_fields(RepDocData),
- couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(Tx), fun(JTx) ->
- maybe_start_replication_job(JTx, Rep)
- end)
+ RepWithId = couch_replicator_docs:update_rep_id(Rep),
+ worker_fun2(Job, OldRepId, RepWithId, RepDocData)
- throw:{job_start_error, halt} ->
- Msg = "~p : replication doc job ~s lock conflict",
- couch_log:error(Msg, [?MODULE, RepId]),
- exit(normal);
- throw:{job_start_error, Error} ->
- Msg = "~p : replication job start failed ~slock conflict",
- couch_log:error(Msg, [?MODULE, RepId])
+ throw:{filter_fetch_error, Error} ->
+ Error1 = io_lib:format("Filter fetch error ~p", [Error]),
+ Error2 = couch_util:to_binary(Error1),
+ finish_with_temporary_error(undefined, Job, RepDocData, Error2),
+ maybe_update_doc_error(OldRepId, DbName, DocId, Error2)
-maybe_start_replication_job(JTx, Rep) ->
- case couch_jobs:get_job_data(JTx, ?REP_JOBS, RepId) of
- {error, not_found} ->
- RepJobData = #{
- ?REP := Rep2,
- ?STATE_INFO := null,
- },
- ok = couch_jobs:add(JTx, ?REP_JOBS, RepJobData),
- RepDocData1 = RepDocData#{
- ?REP := Rep2,
- ?STATE_INFO := null,
- ?ERROR_COUNT := 0,
- ?LAST_UPDATED => erlang:system_info()
- },
- case couch_jobs:finish(JTx, Job, RepDocData1) of
- ok -> ok;
- Error -> throw({job_start_error, Error})
-remove_old_state_fields(#{} = Rep, ?TRIGGERED) ->
- case update_docs() of
- true -> ok;
- false -> couch_replicator_docs:remove_state_fields(DbName, DocId)
- end;
-remove_old_state_fields(#{} = Rep, ?ERROR) ->
- case update_docs() of
- true -> ok;
- false -> couch_replicator_docs:remove_state_fields(DbName, DocId)
- end;
+% Second stage of the worker, run once the replication id is known. Inside a
+% single couch_jobs transaction it checks that the rep doc job is still
+% current, removes a stale replication job and then starts (or declines to
+% start) the replication job. The transaction result decides which update,
+% if any, is written back to the replication document.
+worker_fun2(Job, OldRepId, #{} = Rep, #{} = RepDocData) ->
+    #{?DB_NAME := DbName, ?DOC_ID := DocId} = Rep,
+    % The worker has no enclosing transaction, so start from `undefined`,
+    % the same value the worker passes as the transaction elsewhere.
+    % NOTE(review): confirm couch_jobs_fdb:get_jtx/1 accepts `undefined`.
+    Result = couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(undefined), fun(JTx) ->
+        check_rep_doc_current(JTx, Rep),
+        remove_stale_replication_job(JTx, OldRepId, Rep),
+        maybe_start_replication_job(JTx, Job, Rep, RepDocData)
+    end),
+    case Result of
+        {ok, RepId} ->
+            maybe_update_doc_triggered(RepId, DbName, DocId);
+        ignore ->
+            ok;
+        {error, {permanent_failure, Error}} ->
+            couch_replicator_docs:update_failed(DbName, DocId, Error);
+        {error, {temporary_error, RepId, Error}} ->
+            maybe_update_doc_error(RepId, DbName, DocId, Error)
+    end.
-maybe_remove_old_state_fields(#{} = _Rep, _) ->
- ok.
+% Verify that the rep doc job stored for DbName/DocId still holds the same
+% version (?VER) of the replication object as Rep. Returns ok when it does.
+% Throws {rep_doc_not_current, DbName, DocId} when the job is gone or holds
+% anything else (a different version, or a `null` rep).
+check_rep_doc_current(JTx, #{} = Rep) ->
+    #{?DB_NAME := DbName, ?DOC_ID := DocId, ?VER := Ver} = Rep,
+    case couch_jobs:get_job_data(JTx, ?REP_DOCS, docs_job_id(DbName, DocId)) of
+        {ok, #{?REP := #{?VER := Ver}}} ->
+            ok;
+        {ok, #{}} ->
+            throw({rep_doc_not_current, DbName, DocId});
+        {error, not_found} ->
+            throw({rep_doc_not_current, DbName, DocId})
+    end.
-% Doc processor gen_server private helper functions
-% Handle doc update -- add to ets, then start a worker to try to turn it into
-% a replication job. In most cases it will succeed quickly but for filtered
-% replications or if there are duplicates, it could take longer
-% (theoretically indefinitely) until a replication could be started. Before
-% adding replication job, make sure to delete all old jobs associated with
-% same document.
--spec updated_doc(db_doc_id(), #rep{}, filter_type()) -> ok.
-updated_doc(Id, Rep, Filter) ->
- NormCurRep = couch_replicator_utils:normalize_rep(current_rep(Id)),
- NormNewRep = couch_replicator_utils:normalize_rep(Rep),
- case NormCurRep == NormNewRep of
- false ->
- removed_doc(Id),
- Row = #rdoc{
- id = Id,
- state = initializing,
- rep = Rep,
- rid = nil,
- filter = Filter,
- info = nil,
- errcnt = 0,
- worker = nil,
- last_updated = os:timestamp()
- },
- true = ets:insert(?MODULE, Row),
- ok = maybe_start_worker(Id);
- true ->
+% A stale replication job is one still running after the filter
+% has been updated and a new replication id was generated.
+% Nothing is done when there was no previous replication id (null). The old
+% job is removed only when it carries the same ?VER as Rep and its id
+% differs from the newly computed one.
+remove_stale_replication_job(_, null, #{}) ->
+    ok;
+remove_stale_replication_job(JTx, OldRepId, #{} = Rep) ->
+    #{?REP_ID := RepId, ?VER := Ver} = Rep,
+    case couch_jobs:get_job_data(JTx, ?REP_JOBS, OldRepId) of
+        {error, not_found} ->
+            ok;
+        {ok, #{?REP := #{?VER := Ver}}} when OldRepId =/= RepId ->
+            couch_jobs:remove(JTx, ?REP_JOBS, OldRepId);
+        {ok, #{}} ->
-% Return current #rep{} record if any. If replication hasn't been submitted
-% to the scheduler yet, #rep{} record will be in the document processor's
-% ETS table, otherwise query scheduler for the #rep{} record.
--spec current_rep({binary(), binary()}) -> #rep{} | nil.
-current_rep({DbName, DocId}) when is_binary(DbName), is_binary(DocId) ->
- case ets:lookup(?MODULE, {DbName, DocId}) of
- [] ->
- nil;
- [#rdoc{state = scheduled, rep = nil, rid = JobId}] ->
- % When replication is scheduled, #rep{} record which can be quite
- % large compared to other bits in #rdoc is removed in order to avoid
- % having to keep 2 copies of it. So have to fetch it from the
- % scheduler.
- couch_replicator_scheduler:rep_state(JobId);
- [#rdoc{rep = Rep}] ->
- Rep
+% Decide what to do with the replication job for Rep's replication id:
+%   - no such job yet: start it
+%   - job owned by this same document: keep it when the replication is
+%     unchanged (see compare_reps/2), otherwise replace it
+%   - job has a null db name (transient, from `_replicate`): finish the
+%     rep doc job with a temporary error
+%   - job owned by another document: finish with a permanent failure
+% Returns whatever start_replication_job/4 or dont_start_replication_job/4
+% return, or {error, {temporary_error, RepId, Msg}} /
+% {error, {permanent_failure, Msg}}.
+maybe_start_replication_job(JTx, Job, #{} = Rep, #{} = RepDocData) ->
+    #{?REP_ID := RepId, ?DB_NAME := DbName, ?DOC_ID := DocId} = Rep,
+    case couch_jobs:get_job_data(JTx, ?REP_JOBS, RepId) of
+        {error, not_found} ->
+            start_replication_job(JTx, Job, Rep, RepDocData);
+        {ok, #{?REP := #{?DB_NAME := DbName, ?DOC_ID := DocId} = CurRep}} ->
+            % CurRep is the rep object stored in the job, not the job data
+            case compare_reps(Rep, CurRep) of
+                true ->
+                    dont_start_replication_job(JTx, Job, Rep, RepDocData);
+                false ->
+                    ok = couch_jobs:remove(JTx, ?REP_JOBS, RepId),
+                    start_replication_job(JTx, Job, Rep, RepDocData)
+            end;
+        {ok, #{?REP := #{?DB_NAME := null}}} ->
+            Err1 = io_lib:format("Replication `~s` specified by `~s:~s`"
+                " already running as a transient replication, started via"
+                " `_replicate` API endpoint", [RepId, DbName, DocId]),
+            Err2 = couch_util:to_binary(Err1),
+            ok = finish_with_temporary_error(JTx, Job, RepDocData, Err2),
+            {error, {temporary_error, RepId, Err2}};
+        {ok, #{?REP := #{?DB_NAME := OtherDb, ?DOC_ID := OtherDoc}}} ->
+            Err1 = io_lib:format("Replication `~s` specified by `~s:~s`"
+                " already started by document `~s:~s`", [RepId, DbName,
+                DocId, OtherDb, OtherDoc]),
+            Err2 = couch_util:to_binary(Err1),
+            ok = finish_with_permanent_failure(JTx, Job, RepDocData, Err2),
+            {error, {permanent_failure, Err2}}
--spec worker_returned(reference(), db_doc_id(), rep_start_result()) -> ok.
-worker_returned(Ref, Id, {ok, RepId}) ->
- case ets:lookup(?MODULE, Id) of
- [#rdoc{worker = Ref} = Row] ->
- Row0 = Row#rdoc{
- state = scheduled,
- errcnt = 0,
- worker = nil,
- last_updated = os:timestamp()
- },
- NewRow = case Row0 of
- #rdoc{rid = RepId, filter = user} ->
- % Filtered replication id didn't change.
- Row0;
- #rdoc{rid = nil, filter = user} ->
- % Calculated new replication id for a filtered replication. Make
- % sure to schedule another check as filter code could change.
- % Replication starts could have been failing, so also clear
- % error count.
- Row0#rdoc{rid = RepId};
- #rdoc{rid = OldRepId, filter = user} ->
- % Replication id of existing replication job with filter has
- % changed. Remove old replication job from scheduler and
- % schedule check to check for future changes.
- ok = couch_replicator_scheduler:remove_job(OldRepId),
- Msg = io_lib:format("Replication id changed: ~p -> ~p", [
- OldRepId, RepId]),
- Row0#rdoc{rid = RepId, info = couch_util:to_binary(Msg)};
- #rdoc{rid = nil} ->
- % Calculated new replication id for non-filtered replication.
- % Remove replication doc body, after this we won't need it
- % anymore.
- Row0#rdoc{rep=nil, rid=RepId, info=nil}
- end,
- true = ets:insert(?MODULE, NewRow),
- ok = maybe_update_doc_triggered(Row#rdoc.rep, RepId),
- ok = maybe_start_worker(Id);
- _ ->
- ok % doc could have been deleted, ignore
- end,
- ok;
+% Record Error in the job data, increment its error count, schedule the job
+% to be retried via schedule_error_backoff/3 and finish the current run.
+% Returns ok, or throws `halt` when couch_jobs:finish/3 reports
+% {error, halt}.
+finish_with_temporary_error(JTx, Job, RepDocData, Error) ->
+    #{?ERROR_COUNT := ErrorCount} = RepDocData,
+    ErrorCount1 = ErrorCount + 1,
+    RepDocData1 = RepDocData#{
+        ?STATE_INFO := Error,
+        ?ERROR_COUNT := ErrorCount1
+    },
+    schedule_error_backoff(JTx, Job, ErrorCount1),
+    case couch_jobs:finish(JTx, Job, RepDocData1) of
+        ok -> ok;
+        {error, halt} -> throw(halt)
+    end.
-worker_returned(_Ref, _Id, ignore) ->
- ok;
-worker_returned(Ref, Id, {temporary_error, Reason}) ->
- case ets:lookup(?MODULE, Id) of
- [#rdoc{worker = Ref, errcnt = ErrCnt} = Row] ->
- NewRow = Row#rdoc{
- rid = nil,
- state = error,
- info = Reason,
- errcnt = ErrCnt + 1,
- worker = nil,
- last_updated = os:timestamp()
- },
- true = ets:insert(?MODULE, NewRow),
- ok = maybe_update_doc_error(NewRow#rdoc.rep, Reason),
- ok = maybe_start_worker(Id);
- _ ->
- ok % doc could have been deleted, ignore
- end,
- ok;
+% Record Error in the job data, increment its error count and finish the
+% job. Unlike finish_with_temporary_error/4 no retry is scheduled.
+% Returns ok, or throws `halt` when couch_jobs:finish/3 reports
+% {error, halt}.
+finish_with_permanent_failure(JTx, Job, RepDocData, Error) ->
+    #{?ERROR_COUNT := ErrorCount} = RepDocData,
+    RepDocData1 = RepDocData#{
+        ?STATE_INFO := Error,
+        ?ERROR_COUNT := ErrorCount + 1
+    },
+    case couch_jobs:finish(JTx, Job, RepDocData1) of
+        ok -> ok;
+        {error, halt} -> throw(halt)
+    end.
-worker_returned(Ref, Id, {permanent_failure, _Reason}) ->
- case ets:lookup(?MODULE, Id) of
- [#rdoc{worker = Ref}] ->
- true = ets:delete(?MODULE, Id);
- _ ->
- ok % doc could have been deleted, ignore
- end,
+% Finish the rep doc job without touching the replication job: stamp the job
+% data with the current time, schedule the next filter check and finish.
+% Returns `ignore`; throws `halt` when couch_jobs:finish/3 reports
+% {error, halt}.
+dont_start_replication_job(JTx, Job, Rep, RepDocData) ->
+    Now = erlang:system_time(),
+    Refreshed = maps:put(?LAST_UPDATED, Now, RepDocData),
+    ok = schedule_filter_check(JTx, Job, Rep),
+    case couch_jobs:finish(JTx, Job, Refreshed) of
+        {error, halt} -> throw(halt);
+        ok -> ignore
+    end.
+% Add a new replication job keyed by the replication id, then reset the
+% error state of the rep doc job, schedule its filter check and finish it.
+% Returns {ok, RepId}; throws `halt` when couch_jobs:finish/3 reports
+% {error, halt}.
+start_replication_job(JTx, Job, #{} = Rep, #{} = RepDocData) ->
+    #{?REP_ID := RepId} = Rep,
+    % Initial data of the replication job itself
+    NewJobData = #{
+        ?REP => Rep,
+        ?STATE_INFO => null,
+        ?ERROR_COUNT => 0,
+        ?LAST_UPDATED => erlang:system_time(),
+        ?HISTORY => []
+    },
+    ok = couch_jobs:add(JTx, ?REP_JOBS, RepId, NewJobData),
+    % Rep doc job data: the first three keys must already exist (`:=`)
+    DocJobData = RepDocData#{
+        ?REP := Rep,
+        ?STATE_INFO := null,
+        ?ERROR_COUNT := 0,
+        ?LAST_UPDATED => erlang:system_time()
+    },
+    ok = schedule_filter_check(JTx, Job, Rep),
+    case couch_jobs:finish(JTx, Job, DocJobData) of
+        {error, halt} -> throw(halt);
+        ok -> {ok, RepId}
+    end.
+% Resubmit Job so it runs again after a randomized delay that depends on
+% ErrorCount. The delay itself is computed by error_backoff/1, which is
+% already defined in this module, rather than being duplicated here.
+% NOTE(review): assumes error_backoff/1 returns a number of seconds.
+schedule_error_backoff(JTx, Job, ErrorCount) ->
+    NowSec = erlang:system_time(second),
+    When = NowSec + error_backoff(ErrorCount),
+    couch_jobs:resubmit(JTx, Job, trunc(When)).
+% For replications whose <<"filter_type">> is <<"user">>, resubmit the rep
+% doc job to run again between 0.5 and 1.5 filter check intervals from now
+% (the random part spreads the checks out). Other replications are not
+% resubmitted.
+schedule_filter_check(JTx, Job, #{<<"filter_type">> := <<"user">>}) ->
+    IntervalSec = filter_check_interval_sec(),
+    NowSec = erlang:system_time(second),
+    When = NowSec + 0.5 * IntervalSec + rand:uniform(IntervalSec),
+    couch_jobs:resubmit(JTx, Job, trunc(When));
+schedule_filter_check(_JTx, _Job, #{}) ->
--spec maybe_update_doc_error(#rep{}, any()) -> ok.
-maybe_update_doc_error(Rep, Reason) ->
+remove_old_state_fields(#{?DOC_STATE := DocState} = RepDocData) when
+ DocState =:= ?TRIGGERED orelse DocState =:= ?ERROR ->
case update_docs() of
true ->
- couch_replicator_docs:update_error(Rep, Reason);
+ ok;
+ false ->
+ #{?DB_NAME := DbName, ?DOC_ID := DocId} = RepDocData,
+ couch_replicator_docs:remove_state_fields(DbName, DocId)
+ end;
+remove_old_state_fields(#{}) ->
+ ok.
+-spec maybe_update_doc_error(binary(), binary(), binary(), any()) -> ok.
+maybe_update_doc_error(RepId, DbName, DocId, Error) ->
+ case update_docs() of
+ true ->
+ couch_replicator_docs:update_error(RepId, DbName, DocId, Error);
false ->
--spec maybe_update_doc_triggered(#rep{}, rep_id()) -> ok.
-maybe_update_doc_triggered(Rep, RepId) ->
+% When the "update_docs" setting is enabled, mark the replication document
+% as triggered. couch_replicator_docs:update_triggered/3 takes its arguments
+% in (Id, DocId, DbName) order.
+-spec maybe_update_doc_triggered(rep_id(), binary(), binary()) -> ok.
+maybe_update_doc_triggered(RepId, DbName, DocId) ->
case update_docs() of
true ->
- couch_replicator_docs:update_triggered(Rep, RepId);
+            couch_replicator_docs:update_triggered(RepId, DocId, DbName);
false ->
@@ -570,75 +499,17 @@ error_backoff(ErrCnt) ->
couch_rand:uniform(?INITIAL_BACKOFF_EXPONENT bsl Exp).
--spec filter_backoff() -> seconds().
-filter_backoff() ->
- Total = ets:info(?MODULE, size),
- % This value scaled by the number of replications. If the are a lot of them
- % wait is longer, but not more than a day (?TS_DAY_SEC). If there are just
- % few, wait is shorter, starting at about 30 seconds. `2 *` is used since
- % the expected wait would then be 0.5 * Range so it is easier to see the
- % average wait. `1 +` is used because couch_rand:uniform only
- % accepts >= 1 values and crashes otherwise.
- Range = 1 + min(2 * (Total / 10), ?TS_DAY_SEC),
- ?MIN_FILTER_DELAY_SEC + couch_rand:uniform(round(Range)).
-% Document removed from db -- clear ets table and remove all scheduled jobs
--spec removed_doc(db_doc_id()) -> ok.
-removed_doc({DbName, DocId} = Id) ->
- ets:delete(?MODULE, Id),
- RepIds = couch_replicator_scheduler:find_jobs_by_doc(DbName, DocId),
- lists:foreach(fun couch_replicator_scheduler:remove_job/1, RepIds).
-% Whole db shard is gone -- remove all its ets rows and stop jobs
--spec removed_db(binary()) -> ok.
-removed_db(DbName) ->
- EtsPat = #rdoc{id = {DbName, '_'}, _ = '_'},
- ets:match_delete(?MODULE, EtsPat),
- RepIds = couch_replicator_scheduler:find_jobs_by_dbname(DbName),
- lists:foreach(fun couch_replicator_scheduler:remove_job/1, RepIds).
-% Spawn a worker process which will attempt to calculate a replication id, then
-% start a replication. Returns a process monitor reference. The worker is
-% guaranteed to exit with rep_start_result() type only.
--spec maybe_start_worker(db_doc_id()) -> ok.
-maybe_start_worker(Id) ->
- case ets:lookup(?MODULE, Id) of
- [] ->
- ok;
- [#rdoc{state = scheduled, filter = Filter}] when Filter =/= user ->
- ok;
- [#rdoc{rep = Rep} = Doc] ->
- % For any replication with a user created filter function, periodically
- % (every `filter_backoff/0` seconds) to try to see if the user filter
- % has changed by using a worker to check for changes. When the worker
- % returns check if replication ID has changed. If it hasn't keep
- % checking (spawn another worker and so on). If it has stop the job
- % with the old ID and continue checking.
- Wait = get_worker_wait(Doc),
- Ref = make_ref(),
- true = ets:insert(?MODULE, Doc#rdoc{worker = Ref}),
- couch_replicator_doc_processor_worker:spawn_worker(Id, Rep, Wait, Ref),
- ok
- end.
--spec get_worker_wait(#rdoc{}) -> seconds().
-get_worker_wait(#rdoc{state = scheduled, filter = user}) ->
- filter_backoff();
-get_worker_wait(#rdoc{state = error, errcnt = ErrCnt}) ->
- error_backoff(ErrCnt);
-get_worker_wait(#rdoc{state = initializing}) ->
- 0.
-spec update_docs() -> boolean().
update_docs() ->
config:get_boolean("replicator", "update_docs", ?DEFAULT_UPDATE_DOCS).
+-spec filter_check_interval_sec() -> integer().
+filter_check_interval_sec() ->
+ config:get_integer("replicator", "filter_check_interval_sec",
% _scheduler/docs HTTP endpoint helpers
-spec docs([atom()]) -> [{[_]}] | [].
@@ -753,23 +624,33 @@ ejson_doc_state_filter(State, States) when is_list(States), is_atom(State) ->
+% Create (or replace) the rep doc job for DbName/DocId. Rep is either a
+% parsed replication object or null; when it is null RepParseError is stored
+% as the job's state info. Inside a single couch_jobs transaction any job
+% previously registered for this document is removed before the new one is
+% added under the same job id.
-spec add_rep_doc_job(any(), binary(), binary(), #{} | null,
- binary() | null, binary() | null) -> ok.
-add_rep_doc_job(Tx, DbName, DocId, Rep, RepParseError, DocState) ->
+ binary() | null) -> ok.
+add_rep_doc_job(Tx, DbName, DocId, Rep, RepParseError) ->
JobId = docs_job_id(DbName, DocId),
- ok = remove_replication_by_doc_job_id(Db, JobId),
- RepDocData = #{
- ?REP => Rep,
- ?REP_PARSE_ERROR => RepParseError,
- ?DOC_STATE => DocState,
- ?DB_NAME => DbName,
- ?DOC_ID => DocId,
- ?ERROR_COUNT => 0,
- ?LAST_UPDATED => erlang:system_time(),
- ?STATE => null,
- ?STATE_INFO => null.
- },
- ok = couch_jobs:add(Tx, ?REP_DOCS, RepDocData).
+    RepDocData = case Rep of
+        null ->
+            #{
+                ?REP => null,
+                ?DB_NAME => DbName,
+                ?DOC_ID => DocId,
+                ?STATE_INFO => RepParseError,
+                ?ERROR_COUNT => 0,
+                ?LAST_UPDATED => erlang:system_time()
+            };
+        #{} ->
+            #{
+                ?REP => Rep,
+                ?ERROR_COUNT => 0,
+                ?LAST_UPDATED => erlang:system_time(),
+                ?STATE_INFO => null
+            }
+    end,
+    couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(Tx), fun(JTx) ->
+        ok = remove_replication_by_doc_job_id(JTx, JobId),
+        % The job id must be passed so the job can later be looked up with
+        % couch_jobs:get_job_data(_, ?REP_DOCS, JobId)
+        ok = couch_jobs:add(JTx, ?REP_DOCS, JobId, RepDocData)
+    end).
docs_job_id(DbName, Id) when is_binary(DbName), is_binary(Id) ->
@@ -781,12 +662,14 @@ remove_replication_by_doc_job_id(Tx, Id) ->
case couch_jobs:get_job_data(Tx, ?REP_DOCS, Id) of
{error, not_found} ->
- {ok, #{?REP := {<<"id">> := null}}} ->
+ {ok, #{?REP := {?REP_ID := null}}} ->
couch_jobs:remove(Tx, ?REP_DOCS, Id),
- {ok, #{?REP := {<<"id">> := RepId}}} ->
- couch_jobs:remove(Tx, ?REP_JOBS, RepId),
- couch_jobs:remove(Tx, ?REP_DOCS, Id),
+ {ok, #{?REP := {?REP_ID := RepId}}} ->
+ couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(Tx), fun(JTx) ->
+ couch_jobs:remove(JTx, ?REP_JOBS, RepId),
+ couch_jobs:remove(JTx, ?REP_DOCS, Id)
+ end),
@@ -1068,7 +951,7 @@ test_rep(Id) ->
change() ->
- {<<"id">>, ?DOC1},
+ {?REP_ID, ?DOC1},
{doc, {[
{<<"_id">>, ?DOC1},
{<<"source">>, <<"http://srchost.local/src">>},
@@ -1079,7 +962,7 @@ change() ->
change(State) ->
- {<<"id">>, ?DOC1},
+ {?REP_ID, ?DOC1},
{doc, {[
{<<"_id">>, ?DOC1},
{<<"source">>, <<"http://srchost.local/src">>},
@@ -1091,7 +974,7 @@ change(State) ->
deleted_change() ->
- {<<"id">>, ?DOC1},
+ {?REP_ID, ?DOC1},
{<<"deleted">>, true},
{doc, {[
{<<"_id">>, ?DOC1},
@@ -1103,7 +986,7 @@ deleted_change() ->
bad_change() ->
- {<<"id">>, ?DOC2},
+ {?REP_ID, ?DOC2},
{doc, {[
{<<"_id">>, ?DOC2},
{<<"source">>, <<"src">>}
diff --git a/src/couch_replicator/src/couch_replicator_doc_processor_worker.erl b/src/couch_replicator/src/couch_replicator_doc_processor_worker.erl
deleted file mode 100644
index a4c829323..000000000
--- a/src/couch_replicator/src/couch_replicator_doc_processor_worker.erl
+++ /dev/null
@@ -1,284 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
- spawn_worker/4
--import(couch_replicator_utils, [
- pp_rep_id/1
-% 61 seconds here because request usually have 10, 15, 30 second
-% timeouts set. We'd want the worker to get a chance to make a few
-% requests (maybe one failing one and a retry) and then fail with its
-% own error (timeout, network error), which would be more specific and
-% informative, before it simply gets killed because of the timeout
-% here. That is, if all fails and the worker is actually blocked then
-% 61 sec is a safety net to brutally kill the worker so doesn't end up
-% hung forever.
--define(WORKER_TIMEOUT_MSEC, 61000).
-% Spawn a worker which attempts to calculate replication id then add a
-% replication job to scheduler. This function create a monitor to the worker
-% a worker will then exit with the #doc_worker_result{} record within
-% ?WORKER_TIMEOUT_MSEC timeout period.A timeout is considered a
-%`temporary_error`. Result will be sent as the `Reason` in the {'DOWN',...}
-% message.
--spec spawn_worker(db_doc_id(), #rep{}, seconds(), reference()) -> pid().
-spawn_worker(Id, Rep, WaitSec, WRef) ->
- {Pid, _Ref} = spawn_monitor(fun() ->
- worker_fun(Id, Rep, WaitSec, WRef)
- end),
- Pid.
-% Private functions
--spec worker_fun(db_doc_id(), #rep{}, seconds(), reference()) -> no_return().
-worker_fun(Id, Rep, WaitSec, WRef) ->
- timer:sleep(WaitSec * 1000),
- Fun = fun() ->
- try maybe_start_replication(Id, Rep, WRef) of
- Res ->
- exit(Res)
- catch
- throw:{filter_fetch_error, Reason} ->
- exit({temporary_error, Reason});
- _Tag:Reason ->
- exit({temporary_error, Reason})
- end
- end,
- {Pid, Ref} = spawn_monitor(Fun),
- receive
- {'DOWN', Ref, _, Pid, Result} ->
- exit(#doc_worker_result{id = Id, wref = WRef, result = Result})
- erlang:demonitor(Ref, [flush]),
- exit(Pid, kill),
- {DbName, DocId} = Id,
- TimeoutSec = round(?WORKER_TIMEOUT_MSEC / 1000),
- Msg = io_lib:format("Replication for db ~p doc ~p failed to start due "
- "to timeout after ~B seconds", [DbName, DocId, TimeoutSec]),
- Result = {temporary_error, couch_util:to_binary(Msg)},
- exit(#doc_worker_result{id = Id, wref = WRef, result = Result})
- end.
-% Try to start a replication. Used by a worker. This function should return
-% rep_start_result(), also throws {filter_fetch_error, Reason} if cannot fetch
-% filter.It can also block for an indeterminate amount of time while fetching
-% filter.
-maybe_start_replication(Id, RepWithoutId, WRef) ->
- Rep = couch_replicator_docs:update_rep_id(RepWithoutId),
- case maybe_add_job_to_scheduler(Id, Rep, WRef) of
- ignore ->
- ignore;
- {ok, RepId} ->
- {ok, RepId};
- {temporary_error, Reason} ->
- {temporary_error, Reason};
- {permanent_failure, Reason} ->
- {DbName, DocId} = Id,
- couch_replicator_docs:update_failed(DbName, DocId, Reason),
- {permanent_failure, Reason}
- end.
--spec maybe_add_job_to_scheduler(db_doc_id(), #rep{}, reference()) ->
- rep_start_result().
-maybe_add_job_to_scheduler({DbName, DocId}, Rep, WRef) ->
- RepId =,
- case couch_replicator_scheduler:rep_state(RepId) of
- nil ->
- % Before adding a job check that this worker is still the current
- % worker. This is to handle a race condition where a worker which was
- % sleeping and then checking a replication filter may inadvertently
- % re-add a replication which was already deleted.
- case couch_replicator_doc_processor:get_worker_ref({DbName, DocId}) of
- WRef ->
- ok = couch_replicator_scheduler:add_job(Rep),
- {ok, RepId};
- _NilOrOtherWRef ->
- ignore
- end;
- #rep{doc_id = DocId} ->
- {ok, RepId};
- #rep{doc_id = null} ->
- Msg = io_lib:format("Replication `~s` specified by document `~s`"
- " already running as a transient replication, started via"
- " `_replicate` API endpoint", [pp_rep_id(RepId), DocId]),
- {temporary_error, couch_util:to_binary(Msg)};
- #rep{db_name = OtherDb, doc_id = OtherDocId} ->
- Msg = io_lib:format("Replication `~s` specified by document `~s`"
- " already started, triggered by document `~s` from db `~s`",
- [pp_rep_id(RepId), DocId, OtherDocId, mem3:dbname(OtherDb)]),
- {permanent_failure, couch_util:to_binary(Msg)}
- end.
--define(DB, <<"db">>).
--define(DOC1, <<"doc1">>).
--define(R1, {"ad08e05057046eabe898a2572bbfb573", ""}).
-doc_processor_worker_test_() ->
- {
- foreach,
- fun setup/0,
- fun teardown/1,
- [
- t_should_add_job(),
- t_already_running_same_docid(),
- t_already_running_transient(),
- t_already_running_other_db_other_doc(),
- t_spawn_worker(),
- t_ignore_if_doc_deleted(),
- t_ignore_if_worker_ref_does_not_match()
- ]
- }.
-% Replication is already running, with same doc id. Ignore change.
-t_should_add_job() ->
- ?_test(begin
- Id = {?DB, ?DOC1},
- Rep = couch_replicator_docs:parse_rep_doc_without_id(change()),
- ?assertEqual({ok, ?R1}, maybe_start_replication(Id, Rep, nil)),
- ?assert(added_job())
- end).
-% Replication is already running, with same doc id. Ignore change.
-t_already_running_same_docid() ->
- ?_test(begin
- Id = {?DB, ?DOC1},
- mock_already_running(?DB, ?DOC1),
- Rep = couch_replicator_docs:parse_rep_doc_without_id(change()),
- ?assertEqual({ok, ?R1}, maybe_start_replication(Id, Rep, nil)),
- ?assert(did_not_add_job())
- end).
-% There is a transient replication with same replication id running. Ignore.
-t_already_running_transient() ->
- ?_test(begin
- Id = {?DB, ?DOC1},
- mock_already_running(null, null),
- Rep = couch_replicator_docs:parse_rep_doc_without_id(change()),
- ?assertMatch({temporary_error, _}, maybe_start_replication(Id, Rep,
- nil)),
- ?assert(did_not_add_job())
- end).
-% There is a duplicate replication potentially from a different db and doc.
-% Write permanent failure to doc.
-t_already_running_other_db_other_doc() ->
- ?_test(begin
- Id = {?DB, ?DOC1},
- mock_already_running(<<"otherdb">>, <<"otherdoc">>),
- Rep = couch_replicator_docs:parse_rep_doc_without_id(change()),
- ?assertMatch({permanent_failure, _}, maybe_start_replication(Id, Rep,
- nil)),
- ?assert(did_not_add_job()),
- 1 == meck:num_calls(couch_replicator_docs, update_failed, '_')
- end).
-% Should spawn worker
-t_spawn_worker() ->
- ?_test(begin
- Id = {?DB, ?DOC1},
- Rep = couch_replicator_docs:parse_rep_doc_without_id(change()),
- WRef = make_ref(),
- meck:expect(couch_replicator_doc_processor, get_worker_ref, 1, WRef),
- Pid = spawn_worker(Id, Rep, 0, WRef),
- Res = receive {'DOWN', _Ref, process, Pid, Reason} -> Reason
- after 1000 -> timeout end,
- Expect = #doc_worker_result{id = Id, wref = WRef, result = {ok, ?R1}},
- ?assertEqual(Expect, Res),
- ?assert(added_job())
- end).
-% Should not add job if by the time worker got to fetching the filter
-% and getting a replication id, replication doc was deleted
-t_ignore_if_doc_deleted() ->
- ?_test(begin
- Id = {?DB, ?DOC1},
- Rep = couch_replicator_docs:parse_rep_doc_without_id(change()),
- meck:expect(couch_replicator_doc_processor, get_worker_ref, 1, nil),
- ?assertEqual(ignore, maybe_start_replication(Id, Rep, make_ref())),
- ?assertNot(added_job())
- end).
-% Should not add job if by the time worker got to fetchign the filter
-% and building a replication id, another worker was spawned.
-t_ignore_if_worker_ref_does_not_match() ->
- ?_test(begin
- Id = {?DB, ?DOC1},
- Rep = couch_replicator_docs:parse_rep_doc_without_id(change()),
- meck:expect(couch_replicator_doc_processor, get_worker_ref, 1,
- make_ref()),
- ?assertEqual(ignore, maybe_start_replication(Id, Rep, make_ref())),
- ?assertNot(added_job())
- end).
-% Test helper functions
-setup() ->
- meck:expect(couch_replicator_scheduler, add_job, 1, ok),
- meck:expect(config, get, fun(_, _, Default) -> Default end),
- meck:expect(couch_server, get_uuid, 0, this_is_snek),
- meck:expect(couch_replicator_docs, update_failed, 3, ok),
- meck:expect(couch_replicator_scheduler, rep_state, 1, nil),
- meck:expect(couch_replicator_doc_processor, get_worker_ref, 1, nil),
- ok.
-teardown(_) ->
- meck:unload().
-mock_already_running(DbName, DocId) ->
- meck:expect(couch_replicator_scheduler, rep_state,
- fun(RepId) -> #rep{id = RepId, doc_id = DocId, db_name = DbName} end).
-added_job() ->
- 1 == meck:num_calls(couch_replicator_scheduler, add_job, '_').
-did_not_add_job() ->
- 0 == meck:num_calls(couch_replicator_scheduler, add_job, '_').
-change() ->
- {[
- {<<"_id">>, ?DOC1},
- {<<"source">>, <<"http://srchost.local/src">>},
- {<<"target">>, <<"http://tgthost.local/tgt">>}
- ]}.
diff --git a/src/couch_replicator/src/couch_replicator_docs.erl b/src/couch_replicator/src/couch_replicator_docs.erl
index eaf8161fe..d235bf666 100644
--- a/src/couch_replicator/src/couch_replicator_docs.erl
+++ b/src/couch_replicator/src/couch_replicator_docs.erl
@@ -27,7 +27,7 @@
- update_error/2
+ update_error/4
@@ -116,13 +116,8 @@ update_triggered(Id, DocId, DbName) ->
--spec update_error(#{}, any()) -> ok.
-update_error(#rep{} = Rep, Error) ->
- #{
- <<"id">> := RepId0,
- <<"db_name">> := DbName,
- <<"doc_id">> := DocId,
- } = Rep,
+-spec update_error(binary(), binary(), binary(), any()) -> ok.
+update_error(RepId0, DbName, DocId, Error) ->
Reason = error_reason(Error),
RepId = case RepId0 of
Id when is_binary(Id) -> Id;
@@ -199,22 +194,30 @@ parse_rep_doc_without_id(#{} = Doc, UserName) ->
{error, Error} -> throw({bad_request, Error});
Result -> Result
+ FilterType = case couch_replicator_filters:parse(Opts) of
+ {ok, nil} -> null;
+ {ok, {user, _FName, _QP}} -> <<"user">>;
+ {ok, {view, _FName, _QP}} -> <<"view">>;
+ {ok, {docids, _DocIds}} -> <<"doc_ids">>;
+ {ok, {mango, _Selector}} -> <<"mango">>;
+ {error, FilterError} -> throw({error, FilterError}) % unparsable filter: abort
+ end,
Rep = #{
- <<"id">> => null,
+ ?REP_ID => null,
<<"base_id">> => null,
- <<"source">> => Source,
- <<"target">> => Target,
+ ?SOURCE => Source,
+ ?TARGET => Target,
<<"options">> => Opts,
<<"user">> => UserName,
+ <<"filter_type">> => FilterType,
<<"type">> => Type,
<<"view">> => View,
- <<"doc_id">> => maps:get(<<"_id">>, Doc, null)
+ ?DOC_ID => maps:get(<<"_id">>, Doc, null),
+ ?DB_NAME => null,
+ ?DOC_STATE => null,
+ ?START_TIME => erlang:system_time(),
+ ?VER => fabric2_util:uuid()
- % Check if can parse filter code, if not throw exception
- case couch_replicator_filters:parse(Opts) of
- {error, FilterError} -> throw({error, FilterError});
- {ok, _Filter} -> ok
- end,
{ok, Rep}
diff --git a/src/couch_replicator/src/couch_replicator_filters.erl b/src/couch_replicator/src/couch_replicator_filters.erl
index b14ea3475..fe785a2bf 100644
--- a/src/couch_replicator/src/couch_replicator_filters.erl
+++ b/src/couch_replicator/src/couch_replicator_filters.erl
@@ -27,17 +27,17 @@
% For `user` filter, i.e. filters specified as user code
% in source database, this code doesn't fetch the filter
% code, but only returns the name of the filter.
--spec parse([_]) ->
+-spec parse(#{}) ->
{ok, nil} |
{ok, {view, binary(), {[_]}}} |
{ok, {user, {binary(), binary()}, {[_]}}} |
{ok, {docids, [_]}} |
{ok, {mango, {[_]}}} |
{error, binary()}.
-parse(Options) ->
- Filter = couch_util:get_value(filter, Options),
- DocIds = couch_util:get_value(doc_ids, Options),
- Selector = couch_util:get_value(selector, Options),
+parse(#{} = Options) ->
+ Filter = maps:get(<<"filter">>, Options, undefined),
+ DocIds = maps:get(<<"doc_ids">>, Options, undefined),
+ Selector = maps:get(<<"selector">>, Options, undefined),
case {Filter, DocIds, Selector} of
{undefined, undefined, undefined} ->
{ok, nil};