author     Nick Vatamaniuc <vatamane@apache.org>               2020-08-28 04:34:21 -0400
committer  Nick Vatamaniuc <nickva@users.noreply.github.com>   2020-09-15 16:13:46 -0400
commit     5b98e8a6c169449d1a3e362e52e86822ef350ed5 (patch)
tree       b891a032138589b5c01b14d3e9eba39c3fd68c62
parent     3c9b7540cbb41225b35c89b741e0c5b83cdbf4e1 (diff)
download   couchdb-5b98e8a6c169449d1a3e362e52e86822ef350ed5.tar.gz
Update frontend replicator modules
The frontend is the part responsible for parsing replication parameters and creating replication jobs. Most of that happens in the `couch_replicator` module.

 - `couch_replicator:replicate/2` is the main API for creating transient replication jobs.

 - Replication jobs from `_replicator` documents are updated from the `couch_replicator:after_*` callbacks. Besides being called on db creation, `after_db_create/2` also gets called when a database is undeleted, and the `add_jobs_from_db/1` function will attempt to parse all of its docs and add their jobs.

`couch_replicator` exports the monitoring functions `docs/2,3` and `jobs/0,1`. Those get called from the HTTP handlers for `_scheduler/docs` and `_scheduler/jobs` respectively.

For hands-on remsh access there are some debugging functions, such as:

 - rescan_jobs/0,1 : Simulates a db being re-created so all the jobs are added
 - reenqueue_jobs/0,1 : Deletes all the jobs from a db then re-adds them
 - remove_jobs/0 : Removes all the replication jobs
 - get_job_ids/0 : Reads the RepId -> JobId mapping area
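As a rough, illustrative sketch (not part of the patch itself), these monitoring and debugging helpers might be exercised from a remsh session as shown below; the db name <<"_replicator">> is assumed here to be the default replicator db, and return values depend on couch_replicator_jobs internals:

    %% List active and pending jobs (this backs the _scheduler/jobs handler).
    couch_replicator:jobs().

    %% Pretend the default _replicator db was just (re)created, so its docs
    %% are parsed again and their jobs re-added.
    couch_replicator:rescan_jobs().
    couch_replicator:rescan_jobs(<<"_replicator">>).

    %% Delete all the jobs backed by a db, then re-add them.
    couch_replicator:reenqueue_jobs(<<"_replicator">>).

    %% Remove all replication jobs, transient and doc-backed.
    ok = couch_replicator:remove_jobs().

    %% Read the RepId -> JobId mapping area.
    couch_replicator:get_job_ids().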
-rw-r--r--  src/couch_replicator/src/couch_replicator.erl                 716
-rw-r--r--  src/couch_replicator/src/couch_replicator_epi.erl              58
-rw-r--r--  src/couch_replicator/src/couch_replicator_fabric2_plugin.erl   36
3 files changed, 530 insertions, 280 deletions
diff --git a/src/couch_replicator/src/couch_replicator.erl b/src/couch_replicator/src/couch_replicator.erl
index b38f31b59..f34ac7d7f 100644
--- a/src/couch_replicator/src/couch_replicator.erl
+++ b/src/couch_replicator/src/couch_replicator.erl
@@ -14,279 +14,484 @@
-export([
replicate/2,
- replication_states/0,
+
+ jobs/0,
job/1,
- doc/3,
- active_doc/2,
- info_from_doc/2,
- restart_job/1
+ docs/2,
+ doc/2,
+
+ after_db_create/2,
+ after_db_delete/2,
+ after_doc_write/6,
+
+ ensure_rep_db_exists/0,
+
+ rescan_jobs/0,
+ rescan_jobs/1,
+ reenqueue_jobs/0,
+ reenqueue_jobs/1,
+ remove_jobs/0,
+ get_job_ids/0
]).
+
-include_lib("couch/include/couch_db.hrl").
-include("couch_replicator.hrl").
--include_lib("couch_replicator/include/couch_replicator_api_wrap.hrl").
--include_lib("couch_mrview/include/couch_mrview.hrl").
--include_lib("mem3/include/mem3.hrl").
-
--define(DESIGN_DOC_CREATION_DELAY_MSEC, 1000).
--define(REPLICATION_STATES, [
- initializing, % Just added to scheduler
- error, % Could not be turned into a replication job
- running, % Scheduled and running
- pending, % Scheduled and waiting to run
- crashing, % Scheduled but crashing, backed off by the scheduler
- completed, % Non-continuous (normal) completed replication
- failed % Terminal failure, will not be retried anymore
-]).
-
--import(couch_util, [
- get_value/2,
- get_value/3
-]).
-spec replicate({[_]}, any()) ->
{ok, {continuous, binary()}} |
- {ok, {[_]}} |
+ {ok, #{}} |
{ok, {cancelled, binary()}} |
{error, any()} |
no_return().
-replicate(PostBody, Ctx) ->
- {ok, Rep0} = couch_replicator_utils:parse_rep_doc(PostBody, Ctx),
- Rep = Rep0#rep{start_time = os:timestamp()},
- #rep{id = RepId, options = Options, user_ctx = UserCtx} = Rep,
- case get_value(cancel, Options, false) of
- true ->
- CancelRepId = case get_value(id, Options, nil) of
- nil ->
- RepId;
- RepId2 ->
- RepId2
- end,
- case check_authorization(CancelRepId, UserCtx) of
- ok ->
- cancel_replication(CancelRepId);
- not_found ->
- {error, not_found}
- end;
- false ->
- check_authorization(RepId, UserCtx),
- {ok, Listener} = rep_result_listener(RepId),
- Result = do_replication_loop(Rep),
- couch_replicator_notifier:stop(Listener),
- Result
+replicate(Body, #user_ctx{name = User} = UserCtx) ->
+ {ok, Id, Rep} = couch_replicator_parse:parse_transient_rep(Body, User),
+ #{?OPTIONS := Options} = Rep,
+ JobId = case couch_replicator_jobs:get_job_id(undefined, Id) of
+ {ok, JobId0} -> JobId0;
+ {error, not_found} -> Id
+ end,
+ case maps:get(<<"cancel">>, Options, false) of
+ true ->
+ case check_authorization(JobId, UserCtx) of
+ ok -> cancel_replication(JobId);
+ not_found -> {error, not_found}
+ end;
+ false ->
+ check_authorization(JobId, UserCtx),
+ ok = start_transient_job(JobId, Rep),
+ case maps:get(<<"continuous">>, Options, false) of
+ true ->
+ case couch_replicator_jobs:wait_running(JobId) of
+ {ok, #{?STATE := ?ST_RUNNING} = JobData} ->
+ {ok, {continuous, maps:get(?REP_ID, JobData)}};
+ {ok, #{?STATE := ?ST_FAILED} = JobData} ->
+ {error, maps:get(?STATE_INFO, JobData)};
+ {error, Error} ->
+ {error, Error}
+ end;
+ false ->
+ case couch_replicator_jobs:wait_result(JobId) of
+ {ok, #{?STATE := ?ST_COMPLETED} = JobData} ->
+ {ok, maps:get(?CHECKPOINT_HISTORY, JobData)};
+ {ok, #{?STATE := ?ST_FAILED} = JobData} ->
+ {error, maps:get(?STATE_INFO, JobData)};
+ {error, Error} ->
+ {error, Error}
+ end
+ end
end.
--spec do_replication_loop(#rep{}) ->
- {ok, {continuous, binary()}} | {ok, tuple()} | {error, any()}.
-do_replication_loop(#rep{id = {BaseId, Ext} = Id, options = Options} = Rep) ->
- ok = couch_replicator_scheduler:add_job(Rep),
- case get_value(continuous, Options, false) of
- true ->
- {ok, {continuous, ?l2b(BaseId ++ Ext)}};
- false ->
- wait_for_result(Id)
+jobs() ->
+ FoldFun = fun(_JTx, _JobId, CouchJobsState, JobData, Acc) ->
+ case CouchJobsState of
+ pending -> [job_ejson(JobData) | Acc];
+ running -> [job_ejson(JobData) | Acc];
+ finished -> Acc
+ end
+ end,
+ couch_replicator_jobs:fold_jobs(undefined, FoldFun, []).
+
+
+job(Id0) when is_binary(Id0) ->
+ Id1 = couch_replicator_ids:convert(Id0),
+ JobId = case couch_replicator_jobs:get_job_id(undefined, Id1) of
+ {ok, JobId0} -> JobId0;
+ {error, not_found} -> Id1
+ end,
+ case couch_replicator_jobs:get_job_data(undefined, JobId) of
+ {ok, #{} = JobData} -> {ok, job_ejson(JobData)};
+ {error, not_found} -> {error, not_found}
end.
--spec rep_result_listener(rep_id()) -> {ok, pid()}.
-rep_result_listener(RepId) ->
- ReplyTo = self(),
- {ok, _Listener} = couch_replicator_notifier:start_link(
- fun({_, RepId2, _} = Ev) when RepId2 =:= RepId ->
- ReplyTo ! Ev;
- (_) ->
- ok
- end).
+docs(#{} = Db, States) when is_list(States) ->
+ DbName = fabric2_db:name(Db),
+ FoldFun = fun(_JTx, _JobId, _, JobData, Acc) ->
+ case JobData of
+ #{?DB_NAME := DbName, ?STATE := State} ->
+ case {States, lists:member(State, States)} of
+ {[], _} -> [doc_ejson(JobData) | Acc];
+ {[_ | _], true} -> [doc_ejson(JobData) | Acc];
+ {[_ | _], false} -> Acc
+ end;
+ #{} ->
+ Acc
+ end
+ end,
+ couch_replicator_jobs:fold_jobs(undefined, FoldFun, []).
--spec wait_for_result(rep_id()) ->
- {ok, {[_]}} | {error, any()}.
-wait_for_result(RepId) ->
- receive
- {finished, RepId, RepResult} ->
- {ok, RepResult};
- {error, RepId, Reason} ->
- {error, Reason}
+doc(#{} = Db, DocId) when is_binary(DocId) ->
+ DbUUID = fabric2_db:get_uuid(Db),
+ JobId = couch_replicator_ids:job_id(DbUUID, DocId),
+ case couch_replicator_jobs:get_job_data(undefined, JobId) of
+ {ok, #{} = JobData} -> {ok, doc_ejson(JobData)};
+ {error, not_found} -> {error, not_found}
end.
--spec cancel_replication(rep_id()) ->
- {ok, {cancelled, binary()}} | {error, not_found}.
-cancel_replication({BasedId, Extension} = RepId) ->
- FullRepId = BasedId ++ Extension,
- couch_log:notice("Canceling replication '~s' ...", [FullRepId]),
- case couch_replicator_scheduler:rep_state(RepId) of
- #rep{} ->
- ok = couch_replicator_scheduler:remove_job(RepId),
- couch_log:notice("Replication '~s' cancelled", [FullRepId]),
- {ok, {cancelled, ?l2b(FullRepId)}};
- nil ->
- couch_log:notice("Replication '~s' not found", [FullRepId]),
- {error, not_found}
- end.
+after_db_create(DbName, DbUUID) when ?IS_REP_DB(DbName)->
+ couch_stats:increment_counter([couch_replicator, docs, dbs_created]),
+ try fabric2_db:open(DbName, [{uuid, DbUUID}, ?ADMIN_CTX]) of
+ {ok, Db} ->
+ fabric2_fdb:transactional(Db, fun(TxDb) ->
+ ok = add_jobs_from_db(TxDb)
+ end)
+ catch
+ error:database_does_not_exist ->
+ ok
+ end;
+
+after_db_create(_DbName, _DbUUID) ->
+ ok.
+
+
+after_db_delete(DbName, DbUUID) when ?IS_REP_DB(DbName) ->
+ couch_stats:increment_counter([couch_replicator, docs, dbs_deleted]),
+ FoldFun = fun(JTx, JobId, _, JobData, ok) ->
+ case JobData of
+ #{?DB_UUID := DbUUID} ->
+ ok = couch_replicator_jobs:remove_job(JTx, JobId);
+ #{} ->
+ ok
+ end
+ end,
+ couch_replicator_jobs:fold_jobs(undefined, FoldFun, ok);
+
+after_db_delete(_DbName, _DbUUID) ->
+ ok.
+
+
+after_doc_write(#{name := DbName} = Db, #doc{} = Doc, _NewWinner, _OldWinner,
+ _NewRevId, _Seq) when ?IS_REP_DB(DbName) ->
+ couch_stats:increment_counter([couch_replicator, docs, db_changes]),
+ {Props} = Doc#doc.body,
+ case couch_util:get_value(?REPLICATION_STATE, Props) of
+ ?ST_COMPLETED -> ok;
+ ?ST_FAILED -> ok;
+ _ -> process_change(Db, Doc)
+ end;
+
+after_doc_write(_Db, _Doc, _NewWinner, _OldWinner, _NewRevId, _Seq) ->
+ ok.
+
+
+% This is called from supervisor, must return ignore.
+-spec ensure_rep_db_exists() -> ignore.
+ensure_rep_db_exists() ->
+ couch_replicator_jobs:set_timeout(),
+ case config:get_boolean("replicator", "create_replicator_db", false) of
+ true ->
+ UserCtx = #user_ctx{roles=[<<"_admin">>, <<"_replicator">>]},
+ Opts = [{user_ctx, UserCtx}, sys_db],
+ case fabric2_db:create(?REP_DB_NAME, Opts) of
+ {error, file_exists} -> ok;
+ {ok, _Db} -> ok
+ end;
+ false ->
+ ok
+ end,
+ ignore.
--spec replication_states() -> [atom()].
-replication_states() ->
- ?REPLICATION_STATES.
+% Testing and debug functions
+rescan_jobs() ->
+ rescan_jobs(?REP_DB_NAME).
--spec strip_url_creds(binary() | {[_]}) -> binary().
-strip_url_creds(Endpoint) ->
- try
- couch_replicator_docs:parse_rep_db(Endpoint, [], []) of
- #httpdb{url = Url} ->
- iolist_to_binary(couch_util:url_strip_password(Url))
+
+rescan_jobs(DbName) when is_binary(DbName), ?IS_REP_DB(DbName) ->
+ try fabric2_db:open(DbName, [?ADMIN_CTX]) of
+ {ok, Db} ->
+ after_db_create(DbName, fabric2_db:get_uuid(Db))
catch
- throw:{error, local_endpoints_not_supported} ->
- Endpoint
+ error:database_does_not_exist ->
+ database_does_not_exist
end.
--spec job(binary()) -> {ok, {[_]}} | {error, not_found}.
-job(JobId0) when is_binary(JobId0) ->
- JobId = couch_replicator_ids:convert(JobId0),
- {Res, _Bad} = rpc:multicall(couch_replicator_scheduler, job, [JobId]),
- case [JobInfo || {ok, JobInfo} <- Res] of
- [JobInfo| _] ->
- {ok, JobInfo};
- [] ->
- {error, not_found}
- end.
+reenqueue_jobs() ->
+ reenqueue_jobs(?REP_DB_NAME).
--spec restart_job(binary() | list() | rep_id()) ->
- {ok, {[_]}} | {error, not_found}.
-restart_job(JobId0) ->
- JobId = couch_replicator_ids:convert(JobId0),
- {Res, _} = rpc:multicall(couch_replicator_scheduler, restart_job, [JobId]),
- case [JobInfo || {ok, JobInfo} <- Res] of
- [JobInfo| _] ->
- {ok, JobInfo};
- [] ->
- {error, not_found}
+reenqueue_jobs(DbName) when is_binary(DbName), ?IS_REP_DB(DbName) ->
+ try fabric2_db:open(DbName, [?ADMIN_CTX]) of
+ {ok, Db} ->
+ DbUUID = fabric2_db:get_uuid(Db),
+ ok = after_db_delete(DbName, DbUUID),
+ ok = after_db_create(DbName, DbUUID)
+ catch
+ error:database_does_not_exist ->
+ database_does_not_exist
end.
--spec active_doc(binary(), binary()) -> {ok, {[_]}} | {error, not_found}.
-active_doc(DbName, DocId) ->
- try
- Shards = mem3:shards(DbName),
- Live = [node() | nodes()],
- Nodes = lists:usort([N || #shard{node=N} <- Shards,
- lists:member(N, Live)]),
- Owner = mem3:owner(DbName, DocId, Nodes),
- case active_doc_rpc(DbName, DocId, [Owner]) of
- {ok, DocInfo} ->
- {ok, DocInfo};
+remove_jobs() ->
+ % If we are clearing a large number of jobs, use batching so that we don't
+ % take too long, as we would with individual transactions, and don't time
+ % out, as we could with a single large transaction
+ FoldFun = fun
+ (_, JobId, _, _, Acc) when length(Acc) > 250 ->
+ couch_replicator_jobs:remove_jobs(undefined, [JobId | Acc]);
+ (_, JobId, _, _, Acc) ->
+ [JobId | Acc]
+ end,
+ Acc = couch_replicator_jobs:fold_jobs(undefined, FoldFun, []),
+ [] = couch_replicator_jobs:remove_jobs(undefined, Acc),
+ ok.
+
+
+get_job_ids() ->
+ couch_replicator_jobs:get_job_ids(undefined).
+
+
+% Private functions
+
+-spec start_transient_job(binary(), #{}) -> ok.
+start_transient_job(JobId, #{} = Rep) ->
+ JobData = couch_replicator_jobs:new_job(Rep, null, null, null,
+ ?ST_PENDING, null, null),
+ couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(), fun(JTx) ->
+ case couch_replicator_jobs:get_job_data(JTx, JobId) of
+ {ok, #{?REP := OldRep, ?STATE := State}} ->
+ SameRep = couch_replicator_utils:compare_reps(Rep, OldRep),
+ Active = State =:= ?ST_PENDING orelse State =:= ?ST_RUNNING,
+ case SameRep andalso Active of
+ true ->
+ % If a job with the same parameters is already running we don't
+ % stop it and just ignore the request. This is mainly for
+ % compatibility, where users are able to idempotently
+ % POST the same job without it being stopped and
+ % restarted.
+ ok;
+ false ->
+ couch_replicator_jobs:add_job(JTx, JobId, JobData)
+ end;
{error, not_found} ->
- active_doc_rpc(DbName, DocId, Nodes -- [Owner])
+ ok = couch_replicator_jobs:add_job(JTx, JobId, JobData)
end
- catch
- % Might be a local database
- error:database_does_not_exist ->
- active_doc_rpc(DbName, DocId, [node()])
- end.
+ end).
--spec active_doc_rpc(binary(), binary(), [node()]) ->
- {ok, {[_]}} | {error, not_found}.
-active_doc_rpc(_DbName, _DocId, []) ->
- {error, not_found};
-active_doc_rpc(DbName, DocId, [Node]) when Node =:= node() ->
- couch_replicator_doc_processor:doc(DbName, DocId);
-active_doc_rpc(DbName, DocId, Nodes) ->
- {Res, _Bad} = rpc:multicall(Nodes, couch_replicator_doc_processor, doc,
- [DbName, DocId]),
- case [DocInfo || {ok, DocInfo} <- Res] of
- [DocInfo | _] ->
- {ok, DocInfo};
- [] ->
- {error, not_found}
- end.
+-spec cancel_replication(job_id()) ->
+ {ok, {cancelled, binary()}} | {error, not_found}.
+cancel_replication(JobId) when is_binary(JobId) ->
+ couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(), fun(JTx) ->
+ Id = case couch_replicator_jobs:get_job_data(JTx, JobId) of
+ {ok, #{?REP_ID := RepId}} when is_binary(RepId) ->
+ RepId;
+ _ ->
+ JobId
+ end,
+ couch_log:notice("Canceling replication '~s'", [Id]),
+ case couch_replicator_jobs:remove_job(JTx, JobId) of
+ {error, not_found} ->
+ {error, not_found};
+ ok ->
+ {ok, {cancelled, Id}}
+ end
+ end).
--spec doc(binary(), binary(), any()) -> {ok, {[_]}} | {error, not_found}.
-doc(RepDb, DocId, UserCtx) ->
- case active_doc(RepDb, DocId) of
- {ok, DocInfo} ->
- {ok, DocInfo};
- {error, not_found} ->
- doc_from_db(RepDb, DocId, UserCtx)
- end.
+process_change(_Db, #doc{id = <<?DESIGN_DOC_PREFIX, _/binary>>}) ->
+ ok;
+process_change(#{} = Db, #doc{deleted = true} = Doc) ->
+ DbUUID = fabric2_db:get_uuid(Db),
+ JobId = couch_replicator_ids:job_id(DbUUID, Doc#doc.id),
+ couch_replicator_jobs:remove_job(undefined, JobId);
--spec doc_from_db(binary(), binary(), any()) -> {ok, {[_]}} | {error, not_found}.
-doc_from_db(RepDb, DocId, UserCtx) ->
- case fabric:open_doc(RepDb, DocId, [UserCtx, ejson_body]) of
- {ok, Doc} ->
- {ok, info_from_doc(RepDb, couch_doc:to_json_obj(Doc, []))};
- {not_found, _Reason} ->
- {error, not_found}
- end.
+process_change(#{} = Db, #doc{deleted = false} = Doc) ->
+ #doc{id = DocId, body = {Props} = Body} = Doc,
+ DbName = fabric2_db:name(Db),
+ DbUUID = fabric2_db:get_uuid(Db),
+ {Rep, DocState, Error} = try
+ Rep0 = couch_replicator_parse:parse_rep_doc(Body),
+ DocState0 = couch_util:get_value(?REPLICATION_STATE, Props, null),
+ {Rep0, DocState0, null}
+ catch
+ throw:{bad_rep_doc, Reason} ->
+ {null, null, couch_replicator_utils:rep_error_to_binary(Reason)}
+ end,
+ JobId = couch_replicator_ids:job_id(DbUUID, DocId),
+ JobData = case Rep of
+ null ->
+ couch_replicator_jobs:new_job(Rep, DbName, DbUUID, DocId,
+ ?ST_FAILED, Error, null);
+ #{} ->
+ couch_replicator_jobs:new_job(Rep, DbName, DbUUID, DocId,
+ ?ST_PENDING, null, DocState)
+ end,
+ LogMsg = "~p : replication doc update db:~s doc:~s job_id:~s doc_state:~s",
+ couch_log:notice(LogMsg, [?MODULE, DbName, DocId, JobId, DocState]),
+
+ couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(Db), fun(JTx) ->
+ case couch_replicator_jobs:get_job_data(JTx, JobId) of
+ {ok, #{?REP := null, ?STATE_INFO := Error}} when Rep =:= null ->
+ % Same error as before occurred, don't bother updating the job
+ ok;
+ {ok, #{?REP := null}} when Rep =:= null ->
+ % New error so the job is updated
+ couch_replicator_jobs:add_job(JTx, JobId, JobData);
+ {ok, #{?REP := OldRep, ?STATE := State}} when is_map(Rep) ->
+ SameRep = couch_replicator_utils:compare_reps(Rep, OldRep),
+ Active = State =:= ?ST_PENDING orelse State =:= ?ST_RUNNING,
+ case SameRep andalso Active of
+ true ->
+ % Document was changed but none of the parameters
+ % relevant for the replication job have changed, so
+ % make it a no-op
+ ok;
+ false ->
+ couch_replicator_jobs:add_job(JTx, JobId, JobData)
+ end;
+ {error, not_found} ->
+ couch_replicator_jobs:add_job(JTx, JobId, JobData)
+ end
--spec info_from_doc(binary(), {[_]}) -> {[_]}.
-info_from_doc(RepDb, {Props}) ->
- DocId = get_value(<<"_id">>, Props),
- Source = get_value(<<"source">>, Props),
- Target = get_value(<<"target">>, Props),
- State0 = state_atom(get_value(<<"_replication_state">>, Props, null)),
- StateTime = get_value(<<"_replication_state_time">>, Props, null),
- {State1, StateInfo, ErrorCount, StartTime} = case State0 of
- completed ->
- {InfoP} = get_value(<<"_replication_stats">>, Props, {[]}),
- case lists:keytake(<<"start_time">>, 1, InfoP) of
- {value, {_, Time}, InfoP1} ->
- {State0, {InfoP1}, 0, Time};
- false ->
- case lists:keytake(start_time, 1, InfoP) of
- {value, {_, Time}, InfoP1} ->
- {State0, {InfoP1}, 0, Time};
- false ->
- {State0, {InfoP}, 0, null}
- end
- end;
- failed ->
- Info = get_value(<<"_replication_state_reason">>, Props, nil),
- EJsonInfo = couch_replicator_utils:ejson_state_info(Info),
- {State0, EJsonInfo, 1, StateTime};
- _OtherState ->
- {null, null, 0, null}
+ end).
+
+
+-spec add_jobs_from_db(#{}) -> ok.
+add_jobs_from_db(#{} = TxDb) ->
+ FoldFun = fun
+ ({meta, _Meta}, ok) ->
+ {ok, ok};
+ (complete, ok) ->
+ {ok, ok};
+ ({row, Row}, ok) ->
+ Db = TxDb#{tx := undefined},
+ ok = process_change(Db, get_doc(TxDb, Row)),
+ {ok, ok}
end,
- {[
- {doc_id, DocId},
- {database, RepDb},
- {id, null},
- {source, strip_url_creds(Source)},
- {target, strip_url_creds(Target)},
- {state, State1},
- {error_count, ErrorCount},
- {info, StateInfo},
- {start_time, StartTime},
- {last_updated, StateTime}
- ]}.
-
-
-state_atom(<<"triggered">>) ->
- triggered; % This handles a legacy case were document wasn't converted yet
-state_atom(State) when is_binary(State) ->
- erlang:binary_to_existing_atom(State, utf8);
-state_atom(State) when is_atom(State) ->
- State.
+ Opts = [{restart_tx, true}],
+ {ok, ok} = fabric2_db:fold_docs(TxDb, FoldFun, ok, Opts),
+ ok.
+
+
+-spec get_doc(#{}, list()) -> #doc{}.
+get_doc(TxDb, Row) ->
+ {_, DocId} = lists:keyfind(id, 1, Row),
+ {ok, #doc{deleted = false} = Doc} = fabric2_db:open_doc(TxDb, DocId, []),
+ Doc.
+
+
+doc_ejson(#{} = JobData) ->
+ #{
+ ?REP := Rep,
+ ?REP_ID := RepId,
+ ?DB_NAME := DbName,
+ ?DOC_ID := DocId,
+ ?STATE := State,
+ ?STATE_INFO := Info0,
+ ?ERROR_COUNT := ErrorCount,
+ ?LAST_UPDATED := LastUpdatedSec,
+ ?REP_NODE := Node,
+ ?REP_PID := Pid,
+ ?REP_STATS := Stats
+ } = JobData,
+
+ #{
+ ?SOURCE := #{<<"url">> := Source, <<"proxy_url">> := SourceProxy},
+ ?TARGET := #{<<"url">> := Target, <<"proxy_url">> := TargetProxy},
+ ?START_TIME := StartSec
+ } = Rep,
+
+ LastUpdatedISO8601 = couch_replicator_utils:iso8601(LastUpdatedSec),
+ StartISO8601 = couch_replicator_utils:iso8601(StartSec),
+
+ Info = case State of
+ ?ST_RUNNING -> Stats;
+ ?ST_PENDING -> Stats;
+ _Other -> Info0
+ end,
+
+ #{
+ <<"id">> => RepId,
+ <<"database">> => DbName,
+ <<"doc_id">> => DocId,
+ <<"source">> => ejson_url(Source),
+ <<"target">> => ejson_url(Target),
+ <<"source_proxy">> => ejson_url(SourceProxy),
+ <<"target_proxy">> => ejson_url(TargetProxy),
+ <<"state">> => State,
+ <<"info">> => Info,
+ <<"error_count">> => ErrorCount,
+ <<"last_updated">> => LastUpdatedISO8601,
+ <<"start_time">> => StartISO8601,
+ <<"node">> => Node,
+ <<"pid">> => Pid
+ }.
+
+
+job_ejson(#{} = JobData) ->
+ #{
+ ?REP := Rep,
+ ?REP_ID := RepId,
+ ?DB_NAME := DbName,
+ ?DOC_ID := DocId,
+ ?STATE := State,
+ ?STATE_INFO := Info0,
+ ?JOB_HISTORY := History,
+ ?REP_STATS := Stats
+ } = JobData,
+
+ #{
+ ?SOURCE := #{<<"url">> := Source},
+ ?TARGET := #{<<"url">> := Target},
+ ?REP_USER := User,
+ ?START_TIME := StartSec
+ } = Rep,
+
+ StartISO8601 = couch_replicator_utils:iso8601(StartSec),
+
+ History1 = lists:map(fun(#{?HIST_TIMESTAMP := Ts} = Evt) ->
+ Evt#{?HIST_TIMESTAMP := couch_replicator_utils:iso8601(Ts)}
+ end, History),
+
+ Info = case State of
+ ?ST_RUNNING -> Stats;
+ ?ST_PENDING -> Stats;
+ _Other -> Info0
+ end,
+
+ #{
+ <<"id">> => RepId,
+ <<"database">> => DbName,
+ <<"doc_id">> => DocId,
+ <<"source">> => ejson_url(Source),
+ <<"target">> => ejson_url(Target),
+ <<"state">> => State,
+ <<"info">> => Info,
+ <<"user">> => User,
+ <<"history">> => History1,
+ <<"start_time">> => StartISO8601
+ }.
+
+
+ejson_url(Url) when is_binary(Url) ->
+ list_to_binary(couch_util:url_strip_password(Url));
+
+ejson_url(null) ->
+ null.
-spec check_authorization(rep_id(), #user_ctx{}) -> ok | not_found.
-check_authorization(RepId, #user_ctx{name = Name} = Ctx) ->
- case couch_replicator_scheduler:rep_state(RepId) of
- #rep{user_ctx = #user_ctx{name = Name}} ->
- ok;
- #rep{} ->
- couch_httpd:verify_is_server_admin(Ctx);
- nil ->
- not_found
+check_authorization(JobId, #user_ctx{} = Ctx) when is_binary(JobId) ->
+ #user_ctx{name = Name} = Ctx,
+ case couch_replicator_jobs:get_job_data(undefined, JobId) of
+ {error, not_found} ->
+ not_found;
+ {ok, #{?DB_NAME := DbName}} when is_binary(DbName) ->
+ throw({unauthorized, <<"Persistent replication collision">>});
+ {ok, #{?REP := #{?REP_USER := Name}}} ->
+ ok;
+ {ok, #{}} ->
+ couch_httpd:verify_is_server_admin(Ctx)
end.
@@ -309,16 +514,16 @@ authorization_test_() ->
t_admin_is_always_authorized() ->
?_test(begin
- expect_rep_user_ctx(<<"someuser">>, <<"_admin">>),
+ expect_job_data({ok, #{?REP => #{?REP_USER => <<"someuser">>}}}),
UserCtx = #user_ctx{name = <<"adm">>, roles = [<<"_admin">>]},
?assertEqual(ok, check_authorization(<<"RepId">>, UserCtx))
end).
t_username_must_match() ->
- ?_test(begin
- expect_rep_user_ctx(<<"user">>, <<"somerole">>),
- UserCtx1 = #user_ctx{name = <<"user">>, roles = [<<"somerole">>]},
+ ?_test(begin
+ expect_job_data({ok, #{?REP => #{?REP_USER => <<"user1">>}}}),
+ UserCtx1 = #user_ctx{name = <<"user1">>, roles = [<<"somerole">>]},
?assertEqual(ok, check_authorization(<<"RepId">>, UserCtx1)),
UserCtx2 = #user_ctx{name = <<"other">>, roles = [<<"somerole">>]},
?assertThrow({unauthorized, _}, check_authorization(<<"RepId">>,
@@ -327,8 +532,8 @@ t_username_must_match() ->
t_replication_not_found() ->
- ?_test(begin
- meck:expect(couch_replicator_scheduler, rep_state, 1, nil),
+ ?_test(begin
+ expect_job_data({error, not_found}),
UserCtx1 = #user_ctx{name = <<"user">>, roles = [<<"somerole">>]},
?assertEqual(not_found, check_authorization(<<"RepId">>, UserCtx1)),
UserCtx2 = #user_ctx{name = <<"adm">>, roles = [<<"_admin">>]},
@@ -336,57 +541,8 @@ t_replication_not_found() ->
end).
-expect_rep_user_ctx(Name, Role) ->
- meck:expect(couch_replicator_scheduler, rep_state,
- fun(_Id) ->
- UserCtx = #user_ctx{name = Name, roles = [Role]},
- #rep{user_ctx = UserCtx}
- end).
-
+expect_job_data(JobDataRes) ->
+ meck:expect(couch_replicator_jobs, get_job_data, 2, JobDataRes).
-strip_url_creds_test_() ->
- {
- setup,
- fun() ->
- meck:expect(config, get, fun(_, _, Default) -> Default end)
- end,
- fun(_) ->
- meck:unload()
- end,
- [
- t_strip_http_basic_creds(),
- t_strip_http_props_creds(),
- t_strip_local_db_creds()
- ]
- }.
-
-
-t_strip_local_db_creds() ->
- ?_test(?assertEqual(<<"localdb">>, strip_url_creds(<<"localdb">>))).
-
-
-t_strip_http_basic_creds() ->
- ?_test(begin
- Url1 = <<"http://adm:pass@host/db">>,
- ?assertEqual(<<"http://adm:*****@host/db/">>, strip_url_creds(Url1)),
- Url2 = <<"https://adm:pass@host/db">>,
- ?assertEqual(<<"https://adm:*****@host/db/">>, strip_url_creds(Url2)),
- Url3 = <<"http://adm:pass@host:80/db">>,
- ?assertEqual(<<"http://adm:*****@host:80/db/">>, strip_url_creds(Url3)),
- Url4 = <<"http://adm:pass@host/db?a=b&c=d">>,
- ?assertEqual(<<"http://adm:*****@host/db?a=b&c=d">>,
- strip_url_creds(Url4))
- end).
-
-
-t_strip_http_props_creds() ->
- ?_test(begin
- Props1 = {[{<<"url">>, <<"http://adm:pass@host/db">>}]},
- ?assertEqual(<<"http://adm:*****@host/db/">>, strip_url_creds(Props1)),
- Props2 = {[ {<<"url">>, <<"http://host/db">>},
- {<<"headers">>, {[{<<"Authorization">>, <<"Basic pa55">>}]}}
- ]},
- ?assertEqual(<<"http://host/db/">>, strip_url_creds(Props2))
- end).
-endif.
diff --git a/src/couch_replicator/src/couch_replicator_epi.erl b/src/couch_replicator/src/couch_replicator_epi.erl
new file mode 100644
index 000000000..9fb1790b5
--- /dev/null
+++ b/src/couch_replicator/src/couch_replicator_epi.erl
@@ -0,0 +1,58 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+
+-module(couch_replicator_epi).
+
+
+-behaviour(couch_epi_plugin).
+
+
+-export([
+ app/0,
+ providers/0,
+ services/0,
+ data_subscriptions/0,
+ data_providers/0,
+ processes/0,
+ notify/3
+]).
+
+
+app() ->
+ couch_replicator.
+
+
+providers() ->
+ [
+ {fabric2_db, couch_replicator_fabric2_plugin}
+ ].
+
+
+services() ->
+ [].
+
+
+data_subscriptions() ->
+ [].
+
+
+data_providers() ->
+ [].
+
+
+processes() ->
+ [].
+
+
+notify(_Key, _Old, _New) ->
+ ok.
diff --git a/src/couch_replicator/src/couch_replicator_fabric2_plugin.erl b/src/couch_replicator/src/couch_replicator_fabric2_plugin.erl
new file mode 100644
index 000000000..7bf614512
--- /dev/null
+++ b/src/couch_replicator/src/couch_replicator_fabric2_plugin.erl
@@ -0,0 +1,36 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+
+-module(couch_replicator_fabric2_plugin).
+
+
+-export([
+ after_db_create/2,
+ after_db_delete/2,
+ after_doc_write/6
+]).
+
+
+after_db_create(DbName, DbUUID) ->
+ couch_replicator:after_db_create(DbName, DbUUID),
+ [DbName, DbUUID].
+
+
+after_db_delete(DbName, DbUUID) ->
+ couch_replicator:after_db_delete(DbName, DbUUID),
+ [DbName, DbUUID].
+
+
+after_doc_write(Db, Doc, Winner, OldWinner, RevId, Seq)->
+ couch_replicator:after_doc_write(Db, Doc, Winner, OldWinner, RevId, Seq),
+ [Db, Doc, Winner, OldWinner, RevId, Seq].