diff options
Diffstat (limited to 'src/couch_replicator/src/couch_replicator.erl')
-rw-r--r-- | src/couch_replicator/src/couch_replicator.erl | 791 |
1 files changed, 504 insertions, 287 deletions
diff --git a/src/couch_replicator/src/couch_replicator.erl b/src/couch_replicator/src/couch_replicator.erl index b38f31b59..a690d37c3 100644 --- a/src/couch_replicator/src/couch_replicator.erl +++ b/src/couch_replicator/src/couch_replicator.erl @@ -14,285 +14,513 @@ -export([ replicate/2, - replication_states/0, + + jobs/0, job/1, - doc/3, - active_doc/2, - info_from_doc/2, - restart_job/1 + docs/2, + doc/2, + + after_db_create/2, + after_db_delete/2, + after_doc_write/6, + + ensure_rep_db_exists/0, + + rescan_jobs/0, + rescan_jobs/1, + reenqueue_jobs/0, + reenqueue_jobs/1, + remove_jobs/0, + get_job_ids/0 ]). + +-include_lib("ibrowse/include/ibrowse.hrl"). -include_lib("couch/include/couch_db.hrl"). -include("couch_replicator.hrl"). --include_lib("couch_replicator/include/couch_replicator_api_wrap.hrl"). --include_lib("couch_mrview/include/couch_mrview.hrl"). --include_lib("mem3/include/mem3.hrl"). - --define(DESIGN_DOC_CREATION_DELAY_MSEC, 1000). --define(REPLICATION_STATES, [ - initializing, % Just added to scheduler - error, % Could not be turned into a replication job - running, % Scheduled and running - pending, % Scheduled and waiting to run - crashing, % Scheduled but crashing, backed off by the scheduler - completed, % Non-continuous (normal) completed replication - failed % Terminal failure, will not be retried anymore -]). - --import(couch_util, [ - get_value/2, - get_value/3 -]). -spec replicate({[_]}, any()) -> {ok, {continuous, binary()}} | - {ok, {[_]}} | + {ok, #{}} | {ok, {cancelled, binary()}} | {error, any()} | no_return(). 
% Start or cancel a transient replication on behalf of the user in UserCtx.
%
% Body is parsed by couch_replicator_parse:parse_transient_rep/2. The id it
% yields is translated with couch_replicator_jobs:get_job_id/2; when no job id
% is registered for it, the parsed id itself is used as the job id.
%
% If the <<"cancel">> option is true the job is removed, provided the
% authorization check finds it. Otherwise the job is started and this call
% waits: for a <<"continuous">> replication until the job is reported as
% running, else until the job has produced a result.
replicate(Body, #user_ctx{name = User} = UserCtx) ->
    {ok, Id, Rep} = couch_replicator_parse:parse_transient_rep(Body, User),
    #{?OPTIONS := Options} = Rep,
    JobId = case couch_replicator_jobs:get_job_id(undefined, Id) of
        {error, not_found} -> Id;
        {ok, KnownJobId} -> KnownJobId
    end,
    case maps:get(<<"cancel">>, Options, false) of
        false ->
            % Called for its effect only (it may throw); the ok | not_found
            % result is intentionally not matched.
            check_authorization(JobId, UserCtx),
            ok = start_transient_job(JobId, Rep),
            case maps:get(<<"continuous">>, Options, false) of
                false -> await_completion(JobId);
                true -> await_running(JobId)
            end;
        true ->
            case check_authorization(JobId, UserCtx) of
                not_found -> {error, not_found};
                ok -> cancel_replication(JobId)
            end
    end.


% Wait via couch_replicator_jobs:wait_running/1 and translate the outcome into
% the return value of replicate/2 for continuous replications.
await_running(JobId) ->
    case couch_replicator_jobs:wait_running(JobId) of
        {error, Error} ->
            {error, Error};
        {ok, #{?STATE := ?ST_FAILED} = JobData} ->
            {error, maps:get(?STATE_INFO, JobData)};
        {ok, #{?STATE := ?ST_RUNNING} = JobData} ->
            {ok, {continuous, maps:get(?REP_ID, JobData)}}
    end.


% Wait via couch_replicator_jobs:wait_result/1 and translate the outcome into
% the return value of replicate/2 for non-continuous replications.
await_completion(JobId) ->
    case couch_replicator_jobs:wait_result(JobId) of
        {error, Error} ->
            {error, Error};
        {ok, #{?STATE := ?ST_FAILED} = JobData} ->
            {error, maps:get(?STATE_INFO, JobData)};
        {ok, #{?STATE := ?ST_COMPLETED} = JobData} ->
            {ok, maps:get(?CHECKPOINT_HISTORY, JobData)}
    end.
% Return the job_ejson/1 rendering of every replication job whose couch_jobs
% state is pending or running. Finished jobs are skipped.
jobs() ->
    Collect = fun(_JTx, _JobId, CouchJobsState, JobData, Acc) ->
        case CouchJobsState of
            Active when Active =:= pending; Active =:= running ->
                [job_ejson(JobData) | Acc];
            finished ->
                Acc
        end
    end,
    couch_replicator_jobs:fold_jobs(undefined, Collect, []).


% Look up a single job. Id0 is first normalised with
% couch_replicator_ids:convert/1 and then mapped to a job id when one is
% registered for it; otherwise the converted id is used directly.
% Returns {ok, EJson} or {error, not_found}.
job(Id0) when is_binary(Id0) ->
    Id = couch_replicator_ids:convert(Id0),
    JobId = case couch_replicator_jobs:get_job_id(undefined, Id) of
        {error, not_found} -> Id;
        {ok, KnownJobId} -> KnownJobId
    end,
    case couch_replicator_jobs:get_job_data(undefined, JobId) of
        {error, not_found} -> {error, not_found};
        {ok, #{} = JobData} -> {ok, job_ejson(JobData)}
    end.


% Return the doc_ejson/1 rendering of the jobs that belong to Db (matched by
% database name). An empty States list selects every state; a non-empty list
% selects only jobs whose state is a member of it.
docs(#{} = Db, States) when is_list(States) ->
    DbName = fabric2_db:name(Db),
    Collect = fun(_JTx, _JobId, _, JobData, Acc) ->
        case JobData of
            #{?DB_NAME := DbName, ?STATE := State} ->
                case States =:= [] orelse lists:member(State, States) of
                    true -> [doc_ejson(JobData) | Acc];
                    false -> Acc
                end;
            #{} ->
                % Job of another database (or a transient job)
                Acc
        end
    end,
    couch_replicator_jobs:fold_jobs(undefined, Collect, []).
% Look up the job that corresponds to replication document DocId in Db. The
% job id is derived from the database UUID and the document id.
% Returns {ok, EJson} or {error, not_found}.
doc(#{} = Db, DocId) when is_binary(DocId) ->
    JobId = couch_replicator_ids:job_id(fabric2_db:get_uuid(Db), DocId),
    case couch_replicator_jobs:get_job_data(undefined, JobId) of
        {error, not_found} -> {error, not_found};
        {ok, #{} = JobData} -> {ok, doc_ejson(JobData)}
    end.


% Callback for database creation. For replicator databases the counter is
% bumped and jobs are added from the database's documents inside a
% transaction. A database that no longer exists is ignored; databases that
% are not replicator databases are a no-op.
after_db_create(DbName, DbUUID) when ?IS_REP_DB(DbName) ->
    couch_stats:increment_counter([couch_replicator, docs, dbs_created]),
    try fabric2_db:open(DbName, [{uuid, DbUUID}, ?ADMIN_CTX]) of
        {ok, Db} ->
            AddJobs = fun(TxDb) -> ok = add_jobs_from_db(TxDb) end,
            fabric2_fdb:transactional(Db, AddJobs)
    catch
        error:database_does_not_exist ->
            ok
    end;

after_db_create(_DbName, _DbUUID) ->
    ok.


% Callback for database deletion. For replicator databases every job whose
% data carries this database's UUID is removed; other databases are a no-op.
after_db_delete(DbName, DbUUID) when ?IS_REP_DB(DbName) ->
    couch_stats:increment_counter([couch_replicator, docs, dbs_deleted]),
    RemoveOwned = fun(JTx, JobId, _, JobData, ok) ->
        case JobData of
            #{?DB_UUID := DbUUID} ->
                ok = couch_replicator_jobs:remove_job(JTx, JobId);
            #{} ->
                ok
        end
    end,
    couch_replicator_jobs:fold_jobs(undefined, RemoveOwned, ok);

after_db_delete(_DbName, _DbUUID) ->
    ok.
% Callback for document writes. Only documents in replicator databases are
% considered, and documents whose replication state field is already
% completed or failed are left alone; everything else is handed to
% process_change/2.
after_doc_write(#{name := DbName} = Db, #doc{} = Doc, _NewWinner, _OldWinner,
        _NewRevId, _Seq) when ?IS_REP_DB(DbName) ->
    couch_stats:increment_counter([couch_replicator, docs, db_changes]),
    {Props} = Doc#doc.body,
    DocState = couch_util:get_value(?REPLICATION_STATE, Props),
    IsTerminal = DocState =:= ?ST_COMPLETED orelse DocState =:= ?ST_FAILED,
    case IsTerminal of
        true -> ok;
        false -> process_change(Db, Doc)
    end;

after_doc_write(_Db, _Doc, _NewWinner, _OldWinner, _NewRevId, _Seq) ->
    ok.


% This is called from supervisor, must return ignore.
%
% Sets the replicator jobs timeout and, when the "replicator" /
% "create_replicator_db" setting is true, creates the replicator database
% (an already existing database is fine).
-spec ensure_rep_db_exists() -> ignore.
ensure_rep_db_exists() ->
    couch_replicator_jobs:set_timeout(),
    case config:get_boolean("replicator", "create_replicator_db", false) of
        false ->
            ok;
        true ->
            UserCtx = #user_ctx{roles=[<<"_admin">>, <<"_replicator">>]},
            Opts = [{user_ctx, UserCtx}, sys_db],
            case fabric2_db:create(?REP_DB_NAME, Opts) of
                {ok, _Db} -> ok;
                {error, file_exists} -> ok
            end
    end,
    ignore.


% Testing and debug functions

% Re-run the database creation callback for the default replicator database.
rescan_jobs() ->
    rescan_jobs(?REP_DB_NAME).


% Re-run the database creation callback for the replicator database DbName.
% Returns the atom database_does_not_exist when the database cannot be opened.
rescan_jobs(DbName) when is_binary(DbName), ?IS_REP_DB(DbName) ->
    try fabric2_db:open(DbName, [?ADMIN_CTX]) of
        {ok, Db} ->
            DbUUID = fabric2_db:get_uuid(Db),
            after_db_create(DbName, DbUUID)
    catch
        error:database_does_not_exist ->
            database_does_not_exist
    end.


% Remove and re-add the jobs of the default replicator database.
reenqueue_jobs() ->
    reenqueue_jobs(?REP_DB_NAME).
% Remove the jobs of replicator database DbName and add them again by running
% the database deletion callback followed by the creation callback.
% Returns the atom database_does_not_exist when the database cannot be opened.
reenqueue_jobs(DbName) when is_binary(DbName), ?IS_REP_DB(DbName) ->
    try fabric2_db:open(DbName, [?ADMIN_CTX]) of
        {ok, Db} ->
            DbUUID = fabric2_db:get_uuid(Db),
            ok = after_db_delete(DbName, DbUUID),
            ok = after_db_create(DbName, DbUUID)
    catch
        error:database_does_not_exist ->
            database_does_not_exist
    end.


% Remove all replication jobs.
%
% Job ids are removed in batches: removing each job in its own transaction
% would take too long when there are many jobs, while removing all of them in
% a single transaction could time out.
remove_jobs() ->
    % The accumulator carries the batch size next to the batch itself so the
    % list length does not have to be recomputed for every folded job.
    FoldFun = fun
        (_, JobId, _, _, {Count, Ids}) when Count > 250 ->
            [] = couch_replicator_jobs:remove_jobs(undefined, [JobId | Ids]),
            {0, []};
        (_, JobId, _, _, {Count, Ids}) ->
            {Count + 1, [JobId | Ids]}
    end,
    {_, Rest} = couch_replicator_jobs:fold_jobs(undefined, FoldFun, {0, []}),
    [] = couch_replicator_jobs:remove_jobs(undefined, Rest),
    ok.


% Return the ids of all replication jobs as reported by couch_replicator_jobs.
get_job_ids() ->
    couch_replicator_jobs:get_job_ids(undefined).


% Private functions

-spec start_transient_job(binary(), #{}) -> ok.
% Create (or replace) the transient job JobId for replication Rep inside a
% couch_jobs transaction. The new job data starts in the pending state and
% has no database or document attached.
start_transient_job(JobId, #{} = Rep) ->
    JobData = couch_replicator_jobs:new_job(Rep, null, null, null,
        ?ST_PENDING, null, null),
    AddUnlessActive = fun(JTx) ->
        case couch_replicator_jobs:get_job_data(JTx, JobId) of
            {error, not_found} ->
                ok = couch_replicator_jobs:add_job(JTx, JobId, JobData);
            {ok, #{?REP := OldRep, ?STATE := State}} ->
                SameRep = couch_replicator_utils:compare_reps(Rep, OldRep),
                Active = State =:= ?ST_PENDING orelse State =:= ?ST_RUNNING,
                case SameRep andalso Active of
                    false ->
                        couch_replicator_jobs:add_job(JTx, JobId, JobData);
                    true ->
                        % If a job with the same parameters is pending or
                        % running we don't stop it and just ignore the
                        % request. This is mainly for compatibility: users
                        % are able to idempotently POST the same job without
                        % it being stopped and restarted.
                        ok
                end
        end
    end,
    couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(), AddUnlessActive).


-spec cancel_replication(job_id()) ->
    {ok, {cancelled, binary()}} | {error, not_found}.
% Remove job JobId inside a couch_jobs transaction. The id reported back (and
% logged) is the job's replication id when the job data has a binary one,
% otherwise the job id itself.
cancel_replication(JobId) when is_binary(JobId) ->
    couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(), fun(JTx) ->
        Id = case couch_replicator_jobs:get_job_data(JTx, JobId) of
            {ok, #{?REP_ID := RepId}} when is_binary(RepId) ->
                RepId;
            _ ->
                JobId
        end,
        couch_log:notice("Canceling replication '~s'", [Id]),
        case couch_replicator_jobs:remove_job(JTx, JobId) of
            {error, not_found} ->
                {error, not_found};
            ok ->
                {ok, {cancelled, Id}}
        end
    end).


% Turn a replication document change into a job update.
%
%  - design documents are ignored
%  - a deleted document removes its job
%  - any other document is parsed; a document which fails to parse produces a
%    job in the failed state carrying the error, a valid one produces a
%    pending job. The job is only (re)added when that changes something.
process_change(_Db, #doc{id = <<?DESIGN_DOC_PREFIX, _/binary>>}) ->
    ok;

process_change(#{} = Db, #doc{deleted = true} = Doc) ->
    DbUUID = fabric2_db:get_uuid(Db),
    JobId = couch_replicator_ids:job_id(DbUUID, Doc#doc.id),
    couch_replicator_jobs:remove_job(undefined, JobId);

process_change(#{} = Db, #doc{deleted = false} = Doc) ->
    #doc{id = DocId, body = {Props} = Body} = Doc,
    DbName = fabric2_db:name(Db),
    DbUUID = fabric2_db:get_uuid(Db),
    {Rep, DocState, Error} = try
        Rep0 = couch_replicator_parse:parse_rep_doc(Body),
        DocState0 = couch_util:get_value(?REPLICATION_STATE, Props, null),
        {Rep0, DocState0, null}
    catch
        throw:{bad_rep_doc, Reason} ->
            {null, null, couch_replicator_utils:rep_error_to_binary(Reason)}
    end,
    JobId = couch_replicator_ids:job_id(DbUUID, DocId),
    JobData = case Rep of
        null ->
            % Unparsable document: record the failure as the job state
            couch_replicator_jobs:new_job(Rep, DbName, DbUUID, DocId,
                ?ST_FAILED, Error, null);
        #{} ->
            couch_replicator_jobs:new_job(Rep, DbName, DbUUID, DocId,
                ?ST_PENDING, null, DocState)
    end,
    LogMsg = "~p : replication doc update db:~s doc:~s job_id:~s doc_state:~s",
    couch_log:notice(LogMsg, [?MODULE, DbName, DocId, JobId, DocState]),

    couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(Db), fun(JTx) ->
        case couch_replicator_jobs:get_job_data(JTx, JobId) of
            {ok, #{?REP := null, ?STATE_INFO := Error}} when Rep =:= null ->
                % Same error as before occurred, don't bother updating the job
                ok;
            {ok, #{}} when Rep =:= null ->
                % Either a new error for an already failed document, or a
                % previously valid document which became invalid. In both
                % cases the job is replaced with the failed one.
                couch_replicator_jobs:add_job(JTx, JobId, JobData);
            {ok, #{?REP := OldRep, ?STATE := State}} when is_map(Rep) ->
                SameRep = couch_replicator_utils:compare_reps(Rep, OldRep),
                Active = State =:= ?ST_PENDING orelse State =:= ?ST_RUNNING,
                case SameRep andalso Active of
                    true ->
                        % Document was changed but none of the parameters
                        % relevant for the replication job have changed, so
                        % make it a no-op
                        ok;
                    false ->
                        couch_replicator_jobs:add_job(JTx, JobId, JobData)
                end;
            {error, not_found} ->
                couch_replicator_jobs:add_job(JTx, JobId, JobData)
        end
    end).


% Fold over all documents of the replicator database and feed each one to
% process_change/2. The database handle given to process_change/2 has its tx
% field reset to undefined; documents are read through TxDb.
-spec add_jobs_from_db(#{}) -> ok.
add_jobs_from_db(#{} = TxDb) ->
    FoldFun = fun
        ({meta, _Meta}, ok) ->
            {ok, ok};
        (complete, ok) ->
            {ok, ok};
        ({row, Row}, ok) ->
            Db = TxDb#{tx := undefined},
            ok = process_change(Db, get_doc(TxDb, Row)),
            {ok, ok}
    end,
    Opts = [{restart_tx, true}],
    {ok, ok} = fabric2_db:fold_docs(TxDb, FoldFun, ok, Opts),
    ok.


% Open the document referenced by a fold row (looked up by its id entry).
% The document is asserted to exist and not to be deleted.
-spec get_doc(#{}, list()) -> #doc{}.
get_doc(TxDb, Row) ->
    {_, DocId} = lists:keyfind(id, 1, Row),
    {ok, #doc{deleted = false} = Doc} = fabric2_db:open_doc(TxDb, DocId, []),
    Doc.
% Render the job data of a document based replication as an EJSON map.
%
% Endpoint and proxy URLs go through ejson_url/1. Timestamps are converted
% with couch_replicator_utils:iso8601/1. The <<"info">> field holds the
% replication stats while the job is pending or running and the state info
% otherwise.
doc_ejson(#{} = JobData) ->
    #{
        ?REP := Rep,
        ?REP_ID := RepId,
        ?DB_NAME := DbName,
        ?DOC_ID := DocId,
        ?STATE := State,
        ?STATE_INFO := StateInfo,
        ?ERROR_COUNT := ErrorCount,
        ?LAST_UPDATED := LastUpdatedSec,
        ?REP_NODE := Node,
        ?REP_PID := Pid,
        ?REP_STATS := Stats
    } = JobData,

    #{
        ?SOURCE := #{<<"url">> := Source, <<"proxy_url">> := SourceProxy},
        ?TARGET := #{<<"url">> := Target, <<"proxy_url">> := TargetProxy},
        ?START_TIME := StartSec
    } = Rep,

    LastUpdated = couch_replicator_utils:iso8601(LastUpdatedSec),
    Started = couch_replicator_utils:iso8601(StartSec),

    IsActive = State =:= ?ST_RUNNING orelse State =:= ?ST_PENDING,
    Info = case IsActive of
        true -> Stats;
        false -> StateInfo
    end,

    #{
        <<"id">> => RepId,
        <<"database">> => DbName,
        <<"doc_id">> => DocId,
        <<"source">> => ejson_url(Source),
        <<"target">> => ejson_url(Target),
        <<"source_proxy">> => ejson_url(SourceProxy),
        <<"target_proxy">> => ejson_url(TargetProxy),
        <<"state">> => State,
        <<"info">> => Info,
        <<"error_count">> => ErrorCount,
        <<"last_updated">> => LastUpdated,
        <<"start_time">> => Started,
        <<"node">> => Node,
        <<"pid">> => Pid
    }.
% Render replication job data as an EJSON map.
%
% Endpoint URLs go through ejson_url/1. The start time and every history
% event timestamp are converted with couch_replicator_utils:iso8601/1. The
% <<"info">> field holds the replication stats while the job is pending or
% running and the state info otherwise.
job_ejson(#{} = JobData) ->
    #{
        ?REP := Rep,
        ?REP_ID := RepId,
        ?DB_NAME := DbName,
        ?DOC_ID := DocId,
        ?STATE := State,
        ?STATE_INFO := StateInfo,
        ?JOB_HISTORY := History,
        ?REP_STATS := Stats,
        ?REP_NODE := Node,
        ?REP_PID := Pid
    } = JobData,

    #{
        ?SOURCE := #{<<"url">> := Source},
        ?TARGET := #{<<"url">> := Target},
        ?REP_USER := User,
        ?START_TIME := StartSec
    } = Rep,

    Started = couch_replicator_utils:iso8601(StartSec),

    ToIso8601 = fun(#{?HIST_TIMESTAMP := Ts} = Evt) ->
        Evt#{?HIST_TIMESTAMP := couch_replicator_utils:iso8601(Ts)}
    end,
    History1 = lists:map(ToIso8601, History),

    IsActive = State =:= ?ST_RUNNING orelse State =:= ?ST_PENDING,
    Info = case IsActive of
        true -> Stats;
        false -> StateInfo
    end,

    #{
        <<"id">> => RepId,
        <<"database">> => DbName,
        <<"doc_id">> => DocId,
        <<"source">> => ejson_url(Source),
        <<"target">> => ejson_url(Target),
        <<"state">> => State,
        <<"info">> => Info,
        <<"user">> => User,
        <<"history">> => History1,
        <<"start_time">> => Started,
        <<"node">> => Node,
        <<"pid">> => Pid
    }.


% URL as it may be shown to users: null stays null, a binary URL has its
% credentials stripped.
ejson_url(null) ->
    null;

ejson_url(Url) when is_binary(Url) ->
    strip_url_creds(Url).


% Return Url with the password masked, or null when the URL cannot be parsed
% or processed.
-spec strip_url_creds(binary()) -> binary() | null.
strip_url_creds(Url) ->
    try
        % Anything other than a parsed #url{} record fails the match and ends
        % up in the error handler below.
        #url{} = ibrowse_lib:parse_url(binary_to_list(Url)),
        iolist_to_binary(couch_util:url_strip_password(Url))
    catch
        error:_ ->
            % Avoid exposing any part of the URL in case there is a password in
            % the malformed endpoint URL
            null
    end.


-spec check_authorization(rep_id(), #user_ctx{}) -> ok | not_found.
-check_authorization(RepId, #user_ctx{name = Name} = Ctx) -> - case couch_replicator_scheduler:rep_state(RepId) of - #rep{user_ctx = #user_ctx{name = Name}} -> - ok; - #rep{} -> - couch_httpd:verify_is_server_admin(Ctx); - nil -> - not_found +check_authorization(JobId, #user_ctx{} = Ctx) when is_binary(JobId) -> + #user_ctx{name = Name} = Ctx, + case couch_replicator_jobs:get_job_data(undefined, JobId) of + {error, not_found} -> + not_found; + {ok, #{?DB_NAME := DbName}} when is_binary(DbName) -> + throw({unauthorized, <<"Persistent replication collision">>}); + {ok, #{?REP := #{?REP_USER := Name}}} -> + ok; + {ok, #{}} -> + couch_httpd:verify_is_server_admin(Ctx) end. -ifdef(TEST). -include_lib("eunit/include/eunit.hrl"). +-include_lib("fabric/test/fabric2_test.hrl"). + authorization_test_() -> { @@ -300,52 +528,42 @@ authorization_test_() -> fun () -> ok end, fun (_) -> meck:unload() end, [ - t_admin_is_always_authorized(), - t_username_must_match(), - t_replication_not_found() + ?TDEF_FE(t_admin_is_always_authorized), + ?TDEF_FE(t_username_must_match), + ?TDEF_FE(t_replication_not_found) ] }. -t_admin_is_always_authorized() -> - ?_test(begin - expect_rep_user_ctx(<<"someuser">>, <<"_admin">>), - UserCtx = #user_ctx{name = <<"adm">>, roles = [<<"_admin">>]}, - ?assertEqual(ok, check_authorization(<<"RepId">>, UserCtx)) - end). +t_admin_is_always_authorized(_) -> + expect_job_data({ok, #{?REP => #{?REP_USER => <<"someuser">>}}}), + UserCtx = #user_ctx{name = <<"adm">>, roles = [<<"_admin">>]}, + ?assertEqual(ok, check_authorization(<<"RepId">>, UserCtx)). -t_username_must_match() -> - ?_test(begin - expect_rep_user_ctx(<<"user">>, <<"somerole">>), - UserCtx1 = #user_ctx{name = <<"user">>, roles = [<<"somerole">>]}, - ?assertEqual(ok, check_authorization(<<"RepId">>, UserCtx1)), - UserCtx2 = #user_ctx{name = <<"other">>, roles = [<<"somerole">>]}, - ?assertThrow({unauthorized, _}, check_authorization(<<"RepId">>, - UserCtx2)) - end). 
+t_username_must_match(_) -> + expect_job_data({ok, #{?REP => #{?REP_USER => <<"user1">>}}}), + UserCtx1 = #user_ctx{name = <<"user1">>, roles = [<<"somerole">>]}, + ?assertEqual(ok, check_authorization(<<"RepId">>, UserCtx1)), + UserCtx2 = #user_ctx{name = <<"other">>, roles = [<<"somerole">>]}, + ?assertThrow({unauthorized, _}, check_authorization(<<"RepId">>, + UserCtx2)). -t_replication_not_found() -> - ?_test(begin - meck:expect(couch_replicator_scheduler, rep_state, 1, nil), - UserCtx1 = #user_ctx{name = <<"user">>, roles = [<<"somerole">>]}, - ?assertEqual(not_found, check_authorization(<<"RepId">>, UserCtx1)), - UserCtx2 = #user_ctx{name = <<"adm">>, roles = [<<"_admin">>]}, - ?assertEqual(not_found, check_authorization(<<"RepId">>, UserCtx2)) - end). +t_replication_not_found(_) -> + expect_job_data({error, not_found}), + UserCtx1 = #user_ctx{name = <<"user">>, roles = [<<"somerole">>]}, + ?assertEqual(not_found, check_authorization(<<"RepId">>, UserCtx1)), + UserCtx2 = #user_ctx{name = <<"adm">>, roles = [<<"_admin">>]}, + ?assertEqual(not_found, check_authorization(<<"RepId">>, UserCtx2)). -expect_rep_user_ctx(Name, Role) -> - meck:expect(couch_replicator_scheduler, rep_state, - fun(_Id) -> - UserCtx = #user_ctx{name = Name, roles = [Role]}, - #rep{user_ctx = UserCtx} - end). +expect_job_data(JobDataRes) -> + meck:expect(couch_replicator_jobs, get_job_data, 2, JobDataRes). strip_url_creds_test_() -> - { + { setup, fun() -> meck:expect(config, get, fun(_, _, Default) -> Default end) @@ -353,40 +571,39 @@ strip_url_creds_test_() -> fun(_) -> meck:unload() end, - [ - t_strip_http_basic_creds(), - t_strip_http_props_creds(), - t_strip_local_db_creds() - ] + with([ + ?TDEF(t_strip_http_basic_creds), + ?TDEF(t_strip_url_creds_errors) + ]) }. -t_strip_local_db_creds() -> - ?_test(?assertEqual(<<"localdb">>, strip_url_creds(<<"localdb">>))). 
+t_strip_http_basic_creds(_) -> + Url1 = <<"http://adm:pass@host/db/">>, + ?assertEqual(<<"http://adm:*****@host/db/">>, strip_url_creds(Url1)), + Url2 = <<"https://adm:pass@host/db/">>, + ?assertEqual(<<"https://adm:*****@host/db/">>, strip_url_creds(Url2)), + Url3 = <<"http://adm:pass@host:80/db/">>, + ?assertEqual(<<"http://adm:*****@host:80/db/">>, strip_url_creds(Url3)), + Url4 = <<"http://adm:pass@host/db?a=b&c=d">>, + ?assertEqual(<<"http://adm:*****@host/db?a=b&c=d">>, + strip_url_creds(Url4)). + + +t_strip_url_creds_errors(_) -> + Bad1 = <<"http://adm:pass/bad">>, + ?assertEqual(null, strip_url_creds(Bad1)), + Bad2 = <<"more garbage">>, + ?assertEqual(null, strip_url_creds(Bad2)), + Bad3 = <<"http://a:b:c">>, + ?assertEqual(null, strip_url_creds(Bad3)), + Bad4 = <<"http://adm:pass:pass/bad">>, + ?assertEqual(null, strip_url_creds(Bad4)), + ?assertEqual(null, strip_url_creds(null)), + ?assertEqual(null, strip_url_creds(42)), + ?assertEqual(null, strip_url_creds([<<"a">>, <<"b">>])), + Bad5 = <<"http://adm:pass/bad">>, + ?assertEqual(null, strip_url_creds(Bad5)). -t_strip_http_basic_creds() -> - ?_test(begin - Url1 = <<"http://adm:pass@host/db">>, - ?assertEqual(<<"http://adm:*****@host/db/">>, strip_url_creds(Url1)), - Url2 = <<"https://adm:pass@host/db">>, - ?assertEqual(<<"https://adm:*****@host/db/">>, strip_url_creds(Url2)), - Url3 = <<"http://adm:pass@host:80/db">>, - ?assertEqual(<<"http://adm:*****@host:80/db/">>, strip_url_creds(Url3)), - Url4 = <<"http://adm:pass@host/db?a=b&c=d">>, - ?assertEqual(<<"http://adm:*****@host/db?a=b&c=d">>, - strip_url_creds(Url4)) - end). - - -t_strip_http_props_creds() -> - ?_test(begin - Props1 = {[{<<"url">>, <<"http://adm:pass@host/db">>}]}, - ?assertEqual(<<"http://adm:*****@host/db/">>, strip_url_creds(Props1)), - Props2 = {[ {<<"url">>, <<"http://host/db">>}, - {<<"headers">>, {[{<<"Authorization">>, <<"Basic pa55">>}]}} - ]}, - ?assertEqual(<<"http://host/db/">>, strip_url_creds(Props2)) - end). - -endif. |