diff options
author | Nick Vatamaniuc <vatamane@apache.org> | 2020-08-28 04:32:32 -0400 |
---|---|---|
committer | Nick Vatamaniuc <nickva@users.noreply.github.com> | 2020-09-15 16:13:46 -0400 |
commit | b38d77fbada7cce7de288d2cdcca8839b09888f4 (patch) | |
tree | a146a339cda2721ffe59ba50a2f18d394ab83ea1 | |
parent | b6e87f8a43eebb4d02dfa52227ba5b77cd4ebc68 (diff) | |
download | couchdb-b38d77fbada7cce7de288d2cdcca8839b09888f4.tar.gz |
Move parsing and validation to couch_replicator_parse module
This module is responsible for parsing either an HTTP `_replicate` request
body, or a _replicator doc into an internal `Rep` object (an Erlang map).
`parse_transient_rep/2` parses _replicate requests. It also handles
cancellations, where request bodies look like ```{"id": ..., "cancel": true}```
instead of having all the expected parameters.
`parse_rep_doc/1` parses _replicator docs.
Parsing consists of 3 main parts:
- Parsing the endpoint definitions: source and target url, headers, TLS bits
and proxies
- Parsing options into an options map, possibly using defaults from config
parameters
- Parsing socket parameters. These now have a hard-coded allow-list as opposed
to accepting all possible Erlang socket options.
The parsing functions also double as validation functions which get called from
the _replicator's before_doc_update callback when users update replication
documents. They would get immediate feedback if their replication document
is malformed.
Everything is turned into a map object. This object should be able to be
serialized and de-serialized to (from) JSON.
Since maps are used, add the definitions of some common fields to
couch_replicator.hrl. Mistyping them should raise a compiler error.
couch_replicator_docs lost all of its parsing functions and also functions which
update intermediate replication doc states (triggered and error). It still
handles functions which relate to interacting with _replicator docs.
-rw-r--r-- | src/couch_replicator/src/couch_replicator.hrl | 102 | ||||
-rw-r--r-- | src/couch_replicator/src/couch_replicator_docs.erl | 870 | ||||
-rw-r--r-- | src/couch_replicator/src/couch_replicator_parse.erl | 545 |
3 files changed, 751 insertions, 766 deletions
diff --git a/src/couch_replicator/src/couch_replicator.hrl b/src/couch_replicator/src/couch_replicator.hrl index 2a5b7c8c8..28a86d91b 100644 --- a/src/couch_replicator/src/couch_replicator.hrl +++ b/src/couch_replicator/src/couch_replicator.hrl @@ -12,32 +12,80 @@ -define(REP_ID_VERSION, 4). --record(rep, { - id :: rep_id() | '_' | 'undefined', - source :: any() | '_', - target :: any() | '_', - options :: [_] | '_', - user_ctx :: any() | '_', - type = db :: atom() | '_', - view = nil :: any() | '_', - doc_id :: any() | '_', - db_name = null :: null | binary() | '_', - start_time = {0, 0, 0} :: erlang:timestamp() | '_', - stats = couch_replicator_stats:new() :: orddict:orddict() | '_' -}). - --type rep_id() :: {string(), string()}. +% Some fields from the replication doc +-define(SOURCE, <<"source">>). +-define(TARGET, <<"target">>). +-define(CREATE_TARGET, <<"create_target">>). +-define(DOC_IDS, <<"doc_ids">>). +-define(SELECTOR, <<"selector">>). +-define(FILTER, <<"filter">>). +-define(QUERY_PARAMS, <<"query_params">>). +-define(URL, <<"url">>). +-define(AUTH, <<"auth">>). +-define(HEADERS, <<"headers">>). +-define(PROXY, <<"proxy">>). +-define(SOURCE_PROXY, <<"source_proxy">>). +-define(TARGET_PROXY, <<"target_proxy">>). + +-define(REPLICATION_STATE, <<"_replication_state">>). +-define(REPLICATION_STATS, <<"_replication_stats">>). +-define(REPLICATION_ID, <<"_replication_id">>). +-define(REPLICATION_STATE_TIME, <<"_replication_state_time">>). +-define(REPLICATION_STATE_REASON, <<"_replication_state_reason">>). + +% Replication states +-define(ST_ERROR, <<"error">>). +-define(ST_COMPLETED, <<"completed">>). +-define(ST_RUNNING, <<"running">>). +-define(ST_FAILED, <<"failed">>). +-define(ST_PENDING, <<"pending">>). +-define(ST_CRASHING, <<"crashing">>). + +% Some fields from a rep object +-define(REP_ID, <<"rep_id">>). +-define(BASE_ID, <<"base_id">>). +-define(DB_NAME, <<"db_name">>). +-define(DB_UUID, <<"db_uuid">>). +-define(DOC_ID, <<"doc_id">>). 
+-define(REP_USER, <<"rep_user">>). +-define(START_TIME, <<"start_time">>). +-define(OPTIONS, <<"options">>). + +% Fields for couch job data objects +-define(REP, <<"rep">>). +-define(REP_PARSE_ERROR, <<"rep_parse_error">>). +-define(REP_STATS, <<"rep_stats">>). +-define(STATE, <<"state">>). +-define(STATE_INFO, <<"state_info">>). +-define(DOC_STATE, <<"doc_state">>). +-define(ERROR_COUNT, <<"error_count">>). +-define(LAST_UPDATED, <<"last_updated">>). +-define(LAST_START, <<"last_start">>). +-define(LAST_ERROR, <<"last_error">>). +-define(JOB_HISTORY, <<"job_history">>). +-define(CHECKPOINT_HISTORY, <<"checkpoint_history">>). +-define(REP_NODE, <<"node">>). +-define(REP_PID, <<"pid">>). + +% Job history tags +-define(HIST_TYPE, <<"type">>). +-define(HIST_TIMESTAMP, <<"timestamp">>). +-define(HIST_REASON, <<"reason">>). +-define(HIST_ADDED, <<"added">>). +-define(HIST_STARTED, <<"started">>). +-define(HIST_STOPPED, <<"stopped">>). +-define(HIST_PENDING, <<"pending">>). +-define(HIST_CRASHED, <<"crashed">>). + +-define(REP_DB_NAME, <<"_replicator">>). + +% Can be used as a guard +-define(IS_REP_DB(X), (X =:= ?REP_DB_NAME orelse + binary_part(X, {byte_size(X), -12}) =:= <<"/_replicator">>)). + + +-type rep_id() :: binary(). +-type job_id() :: binary(). +-type user_name() :: binary() | null. -type db_doc_id() :: {binary(), binary() | '_'}. -type seconds() :: non_neg_integer(). --type rep_start_result() :: - {ok, rep_id()} | - ignore | - {temporary_error, binary()} | - {permanent_failure, binary()}. - - --record(doc_worker_result, { - id :: db_doc_id(), - wref :: reference(), - result :: rep_start_result() -}). diff --git a/src/couch_replicator/src/couch_replicator_docs.erl b/src/couch_replicator/src/couch_replicator_docs.erl index 619063222..f84d1299a 100644 --- a/src/couch_replicator/src/couch_replicator_docs.erl +++ b/src/couch_replicator/src/couch_replicator_docs.erl @@ -13,306 +13,142 @@ -module(couch_replicator_docs). 
-export([ - parse_rep_doc/1, - parse_rep_doc/2, - parse_rep_db/3, - parse_rep_doc_without_id/1, - parse_rep_doc_without_id/2, + remove_state_fields/3, + update_completed/4, + update_failed/4, before_doc_update/3, - after_doc_read/2, - ensure_rep_ddoc_exists/1, - ensure_cluster_rep_ddoc_exists/1, - remove_state_fields/2, - update_doc_completed/3, - update_failed/3, - update_rep_id/1, - update_triggered/2, - update_error/2 + after_doc_read/2 ]). -include_lib("couch/include/couch_db.hrl"). --include_lib("ibrowse/include/ibrowse.hrl"). --include_lib("mem3/include/mem3.hrl"). -include_lib("couch_replicator/include/couch_replicator_api_wrap.hrl"). -include("couch_replicator.hrl"). --include("couch_replicator_js_functions.hrl"). - --import(couch_util, [ - get_value/2, - get_value/3, - to_binary/1 -]). - --import(couch_replicator_utils, [ - get_json_value/2, - get_json_value/3 -]). --define(REP_DB_NAME, <<"_replicator">>). --define(REP_DESIGN_DOC, <<"_design/_replicator">>). -define(OWNER, <<"owner">>). -define(CTX, {user_ctx, #user_ctx{roles=[<<"_admin">>, <<"_replicator">>]}}). -define(replace(L, K, V), lists:keystore(K, 1, L, {K, V})). -remove_state_fields(DbName, DocId) -> - update_rep_doc(DbName, DocId, [ - {<<"_replication_state">>, undefined}, - {<<"_replication_state_time">>, undefined}, - {<<"_replication_state_reason">>, undefined}, - {<<"_replication_id">>, undefined}, - {<<"_replication_stats">>, undefined}]). +remove_state_fields(null, null, null) -> + ok; +remove_state_fields(DbName, DbUUID, DocId) -> + update_rep_doc(DbName, DbUUID, DocId, [ + {?REPLICATION_STATE, undefined}, + {?REPLICATION_STATE_TIME, undefined}, + {?REPLICATION_STATE_REASON, undefined}, + {?REPLICATION_ID, undefined}, + {?REPLICATION_STATS, undefined} + ]), + ok. --spec update_doc_completed(binary(), binary(), [_]) -> any(). 
-update_doc_completed(DbName, DocId, Stats) -> - update_rep_doc(DbName, DocId, [ - {<<"_replication_state">>, <<"completed">>}, - {<<"_replication_state_reason">>, undefined}, - {<<"_replication_stats">>, {Stats}}]), - couch_stats:increment_counter([couch_replicator, docs, - completed_state_updates]). +-spec update_completed(binary(), binary(), binary(), [_]) -> ok. +update_completed(null, null, _, _) -> + ok; --spec update_failed(binary(), binary(), any()) -> any(). -update_failed(DbName, DocId, Error) -> - Reason = error_reason(Error), - couch_log:error("Error processing replication doc `~s` from `~s`: ~s", - [DocId, DbName, Reason]), - update_rep_doc(DbName, DocId, [ - {<<"_replication_state">>, <<"failed">>}, - {<<"_replication_stats">>, undefined}, - {<<"_replication_state_reason">>, Reason}]), +update_completed(DbName, DbUUID, DocId, #{} = Stats0) -> + Stats = {maps:to_list(Stats0)}, + update_rep_doc(DbName, DbUUID, DocId, [ + {?REPLICATION_STATE, ?ST_COMPLETED}, + {?REPLICATION_STATE_REASON, undefined}, + {?REPLICATION_STATS, Stats}]), couch_stats:increment_counter([couch_replicator, docs, - failed_state_updates]). - - --spec update_triggered(#rep{}, rep_id()) -> ok. -update_triggered(Rep, {Base, Ext}) -> - #rep{ - db_name = DbName, - doc_id = DocId - } = Rep, - update_rep_doc(DbName, DocId, [ - {<<"_replication_state">>, <<"triggered">>}, - {<<"_replication_state_reason">>, undefined}, - {<<"_replication_id">>, iolist_to_binary([Base, Ext])}, - {<<"_replication_stats">>, undefined}]), + completed_state_updates + ]), ok. --spec update_error(#rep{}, any()) -> ok. 
-update_error(#rep{db_name = DbName, doc_id = DocId, id = RepId}, Error) -> - Reason = error_reason(Error), - BinRepId = case RepId of - {Base, Ext} -> - iolist_to_binary([Base, Ext]); - _Other -> - null - end, - update_rep_doc(DbName, DocId, [ - {<<"_replication_state">>, <<"error">>}, - {<<"_replication_state_reason">>, Reason}, - {<<"_replication_stats">>, undefined}, - {<<"_replication_id">>, BinRepId}]), - ok. - +-spec update_failed(binary(), binary(), binary(), any()) -> ok. +update_failed(null, null, null, _) -> + ok; --spec ensure_rep_ddoc_exists(binary()) -> ok. -ensure_rep_ddoc_exists(RepDb) -> - case mem3:belongs(RepDb, ?REP_DESIGN_DOC) of - true -> - ensure_rep_ddoc_exists(RepDb, ?REP_DESIGN_DOC); - false -> - ok - end. - - --spec ensure_rep_ddoc_exists(binary(), binary()) -> ok. -ensure_rep_ddoc_exists(RepDb, DDocId) -> - case open_rep_doc(RepDb, DDocId) of - {not_found, no_db_file} -> - %% database was deleted. - ok; - {not_found, _Reason} -> - DocProps = replication_design_doc_props(DDocId), - DDoc = couch_doc:from_json_obj({DocProps}), - couch_log:notice("creating replicator ddoc ~p", [RepDb]), - {ok, _Rev} = save_rep_doc(RepDb, DDoc); - {ok, Doc} -> - Latest = replication_design_doc_props(DDocId), - {Props0} = couch_doc:to_json_obj(Doc, []), - {value, {_, Rev}, Props} = lists:keytake(<<"_rev">>, 1, Props0), - case compare_ejson({Props}, {Latest}) of - true -> - ok; - false -> - LatestWithRev = [{<<"_rev">>, Rev} | Latest], - DDoc = couch_doc:from_json_obj({LatestWithRev}), - couch_log:notice("updating replicator ddoc ~p", [RepDb]), - try - {ok, _} = save_rep_doc(RepDb, DDoc) - catch - throw:conflict -> - %% ignore, we'll retry next time - ok - end - end - end, +update_failed(DbName, DbUUID, DocId, Error) -> + Reason = error_reason(Error), + couch_log:error("Error processing replication doc `~s` from `~s`: ~s", + [DocId, DbName, Reason]), + update_rep_doc(DbName, DbUUID, DocId, [ + {?REPLICATION_STATE, ?ST_FAILED}, + {?REPLICATION_STATS, undefined}, 
+ {?REPLICATION_STATE_REASON, Reason} + ]), + couch_stats:increment_counter([couch_replicator, docs, + failed_state_updates]), ok. --spec ensure_cluster_rep_ddoc_exists(binary()) -> ok. -ensure_cluster_rep_ddoc_exists(RepDb) -> - DDocId = ?REP_DESIGN_DOC, - [#shard{name = DbShard} | _] = mem3:shards(RepDb, DDocId), - ensure_rep_ddoc_exists(DbShard, DDocId). - - --spec compare_ejson({[_]}, {[_]}) -> boolean(). -compare_ejson(EJson1, EJson2) -> - EjsonSorted1 = couch_replicator_filters:ejsort(EJson1), - EjsonSorted2 = couch_replicator_filters:ejsort(EJson2), - EjsonSorted1 == EjsonSorted2. - - --spec replication_design_doc_props(binary()) -> [_]. -replication_design_doc_props(DDocId) -> - [ - {<<"_id">>, DDocId}, - {<<"language">>, <<"javascript">>}, - {<<"validate_doc_update">>, ?REP_DB_DOC_VALIDATE_FUN} - ]. - +-spec before_doc_update(#doc{}, Db::any(), couch_db:update_type()) -> #doc{}. +before_doc_update(#doc{id = <<?DESIGN_DOC_PREFIX, _/binary>>} = Doc, _, _) -> + Doc; +before_doc_update(#doc{body = {Body}} = Doc, Db, _UpdateType) -> + #user_ctx{roles = Roles, name = Name} = fabric2_db:get_user_ctx(Db), + IsReplicator = lists:member(<<"_replicator">>, Roles), -% Note: parse_rep_doc can handle filtered replications. During parsing of the -% replication doc it will make possibly remote http requests to the source -% database. If failure or parsing of filter docs fails, parse_doc throws a -% {filter_fetch_error, Error} excation. This exception should be considered -% transient in respect to the contents of the document itself, since it depends -% on netowrk availability of the source db and other factors. --spec parse_rep_doc({[_]}) -> #rep{}. 
-parse_rep_doc(RepDoc) -> - {ok, Rep} = try - parse_rep_doc(RepDoc, rep_user_ctx(RepDoc)) - catch - throw:{error, Reason} -> - throw({bad_rep_doc, Reason}); - throw:{filter_fetch_error, Reason} -> - throw({filter_fetch_error, Reason}); - Tag:Err -> - throw({bad_rep_doc, to_binary({Tag, Err})}) + Doc1 = case IsReplicator of true -> Doc; false -> + case couch_util:get_value(?OWNER, Body) of + undefined -> + Doc#doc{body = {?replace(Body, ?OWNER, Name)}}; + Name -> + Doc; + Other -> + case (catch fabric2_db:check_is_admin(Db)) of + ok when Other =:= null -> + Doc#doc{body = {?replace(Body, ?OWNER, Name)}}; + ok -> + Doc; + _ -> + throw({forbidden, <<"Can't update replication", + "documents from other users.">>}) + end + end end, - Rep. - --spec parse_rep_doc_without_id({[_]}) -> #rep{}. -parse_rep_doc_without_id(RepDoc) -> - {ok, Rep} = try - parse_rep_doc_without_id(RepDoc, rep_user_ctx(RepDoc)) - catch - throw:{error, Reason} -> - throw({bad_rep_doc, Reason}); - Tag:Err -> - throw({bad_rep_doc, to_binary({Tag, Err})}) + Deleted = Doc1#doc.deleted, + IsFailed = couch_util:get_value(?REPLICATION_STATE, Body) == ?ST_FAILED, + case IsReplicator orelse Deleted orelse IsFailed of true -> ok; false -> + try + couch_replicator_parse:parse_rep_doc(Doc1#doc.body) + catch + throw:{bad_rep_doc, Error} -> + throw({forbidden, Error}) + end end, - Rep. - - --spec parse_rep_doc({[_]}, #user_ctx{}) -> {ok, #rep{}}. -parse_rep_doc(Doc, UserCtx) -> - {ok, Rep} = parse_rep_doc_without_id(Doc, UserCtx), - Cancel = get_value(cancel, Rep#rep.options, false), - Id = get_value(id, Rep#rep.options, nil), - case {Cancel, Id} of - {true, nil} -> - % Cancel request with no id, must parse id out of body contents - {ok, update_rep_id(Rep)}; - {true, Id} -> - % Cancel request with an id specified, so do not parse id from body - {ok, Rep}; - {false, _Id} -> - % Not a cancel request, regular replication doc - {ok, update_rep_id(Rep)} - end. 
- - --spec parse_rep_doc_without_id({[_]}, #user_ctx{}) -> {ok, #rep{}}. -parse_rep_doc_without_id({Props}, UserCtx) -> - {SrcProxy, TgtProxy} = parse_proxy_settings(Props), - Opts = make_options(Props), - case get_value(cancel, Opts, false) andalso - (get_value(id, Opts, nil) =/= nil) of - true -> - {ok, #rep{options = Opts, user_ctx = UserCtx}}; - false -> - Source = parse_rep_db(get_value(<<"source">>, Props), SrcProxy, Opts), - Target = parse_rep_db(get_value(<<"target">>, Props), TgtProxy, Opts), - {Type, View} = case couch_replicator_filters:view_type(Props, Opts) of - {error, Error} -> - throw({bad_request, Error}); - Result -> - Result - end, - Rep = #rep{ - source = Source, - target = Target, - options = Opts, - user_ctx = UserCtx, - type = Type, - view = View, - doc_id = get_value(<<"_id">>, Props, null) - }, - % Check if can parse filter code, if not throw exception - case couch_replicator_filters:parse(Opts) of - {error, FilterError} -> - throw({error, FilterError}); - {ok, _Filter} -> - ok - end, - {ok, Rep} - end. + Doc1. -parse_proxy_settings(Props) when is_list(Props) -> - Proxy = get_value(<<"proxy">>, Props, <<>>), - SrcProxy = get_value(<<"source_proxy">>, Props, <<>>), - TgtProxy = get_value(<<"target_proxy">>, Props, <<>>), - - case Proxy =/= <<>> of - true when SrcProxy =/= <<>> -> - Error = "`proxy` is mutually exclusive with `source_proxy`", - throw({bad_request, Error}); - true when TgtProxy =/= <<>> -> - Error = "`proxy` is mutually exclusive with `target_proxy`", - throw({bad_request, Error}); - true -> - {Proxy, Proxy}; - false -> - {SrcProxy, TgtProxy} +-spec after_doc_read(#doc{}, Db::any()) -> #doc{}. 
+after_doc_read(#doc{id = <<?DESIGN_DOC_PREFIX, _/binary>>} = Doc, _Db) -> + Doc; +after_doc_read(#doc{body = {Body}} = Doc, Db) -> + #user_ctx{name = Name} = fabric2_db:get_user_ctx(Db), + case (catch fabric2_db:check_is_admin(Db)) of ok -> Doc; _ -> + case couch_util:get_value(?OWNER, Body) of Name -> Doc; _ -> + Source0 = couch_util:get_value(<<"source">>, Body), + Target0 = couch_util:get_value(<<"target">>, Body), + Source = strip_credentials(Source0), + Target = strip_credentials(Target0), + NewBody0 = ?replace(Body, <<"source">>, Source), + NewBody = ?replace(NewBody0, <<"target">>, Target), + #doc{revs = {Pos, [_ | Revs]}} = Doc, + NewDoc = Doc#doc{body = {NewBody}, revs = {Pos - 1, Revs}}, + fabric2_db:new_revid(Db, NewDoc) + end end. -% Update a #rep{} record with a replication_id. Calculating the id might involve -% fetching a filter from the source db, and so it could fail intermetently. -% In case of a failure to fetch the filter this function will throw a -% `{filter_fetch_error, Reason} exception. -update_rep_id(Rep) -> - RepId = couch_replicator_ids:replication_id(Rep), - Rep#rep{id = RepId}. +update_rep_doc(RepDbName, RepDbUUID, RepDocId, KVs) -> + update_rep_doc(RepDbName, RepDbUUID, RepDocId, KVs, 1). -update_rep_doc(RepDbName, RepDocId, KVs) -> - update_rep_doc(RepDbName, RepDocId, KVs, 1). - - -update_rep_doc(RepDbName, RepDocId, KVs, Wait) when is_binary(RepDocId) -> +update_rep_doc(RepDbName, RepDbUUID, RepDocId, KVs, Wait) + when is_binary(RepDocId) -> try - case open_rep_doc(RepDbName, RepDocId) of + case open_rep_doc(RepDbName, RepDbUUID, RepDocId) of {ok, LastRepDoc} -> - update_rep_doc(RepDbName, LastRepDoc, KVs, Wait * 2); + update_rep_doc(RepDbName, RepDbUUID, LastRepDoc, KVs, + Wait * 2); _ -> ok end @@ -321,25 +157,25 @@ update_rep_doc(RepDbName, RepDocId, KVs, Wait) when is_binary(RepDocId) -> Msg = "Conflict when updating replication doc `~s`. 
Retrying.", couch_log:error(Msg, [RepDocId]), ok = timer:sleep(couch_rand:uniform(erlang:min(128, Wait)) * 100), - update_rep_doc(RepDbName, RepDocId, KVs, Wait * 2) + update_rep_doc(RepDbName, RepDbUUID, RepDocId, KVs, Wait * 2) end; -update_rep_doc(RepDbName, #doc{body = {RepDocBody}} = RepDoc, KVs, _Try) -> +update_rep_doc(RepDbName, RepDbUUID, #doc{body = {RepDocBody}} = RepDoc, KVs, _Try) -> NewRepDocBody = lists:foldl( - fun({K, undefined}, Body) -> + fun({K, undefined}, Body) when is_binary(K) -> lists:keydelete(K, 1, Body); - ({<<"_replication_state">> = K, State} = KV, Body) -> - case get_json_value(K, Body) of + ({?REPLICATION_STATE = K, State} = KV, Body) when is_binary(K) -> + case couch_util:get_value(K, Body) of State -> Body; _ -> Body1 = lists:keystore(K, 1, Body, KV), - Timestamp = couch_replicator_utils:iso8601(os:timestamp()), + Timestamp = couch_replicator_utils:iso8601(), lists:keystore( - <<"_replication_state_time">>, 1, Body1, - {<<"_replication_state_time">>, Timestamp}) + ?REPLICATION_STATE_TIME, 1, Body1, + {?REPLICATION_STATE_TIME, Timestamp}) end; - ({K, _V} = KV, Body) -> + ({K, _V} = KV, Body) when is_binary(K) -> lists:keystore(K, 1, Body, KV) end, RepDocBody, KVs), @@ -349,331 +185,37 @@ update_rep_doc(RepDbName, #doc{body = {RepDocBody}} = RepDoc, KVs, _Try) -> _ -> % Might not succeed - when the replication doc is deleted right % before this update (not an error, ignore). - save_rep_doc(RepDbName, RepDoc#doc{body = {NewRepDocBody}}) + save_rep_doc(RepDbName, RepDbUUID, RepDoc#doc{body = {NewRepDocBody}}) end. 
-open_rep_doc(DbName, DocId) -> - case couch_db:open_int(DbName, [?CTX, sys_db]) of - {ok, Db} -> - try - couch_db:open_doc(Db, DocId, [ejson_body]) - after - couch_db:close(Db) - end; - Else -> - Else +open_rep_doc(DbName, DbUUID, DocId) when is_binary(DbName), is_binary(DbUUID), + is_binary(DocId) -> + try + case fabric2_db:open(DbName, [?CTX, sys_db, {uuid, DbUUID}]) of + {ok, Db} -> fabric2_db:open_doc(Db, DocId, [ejson_body]); + Else -> Else + end + catch + error:database_does_not_exist -> + {not_found, database_does_not_exist} end. -save_rep_doc(DbName, Doc) -> - {ok, Db} = couch_db:open_int(DbName, [?CTX, sys_db]), +save_rep_doc(DbName, DbUUID, Doc) when is_binary(DbName), is_binary(DbUUID) -> try - couch_db:update_doc(Db, Doc, []) + {ok, Db} = fabric2_db:open(DbName, [?CTX, sys_db, {uuid, DbUUID}]), + fabric2_db:update_doc(Db, Doc, []) catch + error:database_does_not_exist -> + {not_found, database_does_not_exist}; % User can accidently write a VDU which prevents _replicator from % updating replication documents. Avoid crashing replicator and thus % preventing all other replication jobs on the node from running. throw:{forbidden, Reason} -> - Msg = "~p VDU function preventing doc update to ~s ~s ~p", + Msg = "~p VDU or BDU function preventing doc update to ~s ~s ~p", couch_log:error(Msg, [?MODULE, DbName, Doc#doc.id, Reason]), {ok, forbidden} - after - couch_db:close(Db) - end. - - --spec rep_user_ctx({[_]}) -> #user_ctx{}. -rep_user_ctx({RepDoc}) -> - case get_json_value(<<"user_ctx">>, RepDoc) of - undefined -> - #user_ctx{}; - {UserCtx} -> - #user_ctx{ - name = get_json_value(<<"name">>, UserCtx, null), - roles = get_json_value(<<"roles">>, UserCtx, []) - } - end. - - --spec parse_rep_db({[_]} | binary(), binary(), [_]) -> #httpd{} | binary(). 
-parse_rep_db({Props}, Proxy, Options) -> - ProxyParams = parse_proxy_params(Proxy), - ProxyURL = case ProxyParams of - [] -> undefined; - _ -> binary_to_list(Proxy) - end, - Url = maybe_add_trailing_slash(get_value(<<"url">>, Props)), - {AuthProps} = get_value(<<"auth">>, Props, {[]}), - {BinHeaders} = get_value(<<"headers">>, Props, {[]}), - Headers = lists:ukeysort(1, [{?b2l(K), ?b2l(V)} || {K, V} <- BinHeaders]), - DefaultHeaders = (#httpdb{})#httpdb.headers, - #httpdb{ - url = Url, - auth_props = AuthProps, - headers = lists:ukeymerge(1, Headers, DefaultHeaders), - ibrowse_options = lists:keysort(1, - [{socket_options, get_value(socket_options, Options)} | - ProxyParams ++ ssl_params(Url)]), - timeout = get_value(connection_timeout, Options), - http_connections = get_value(http_connections, Options), - retries = get_value(retries, Options), - proxy_url = ProxyURL - }; - -parse_rep_db(<<"http://", _/binary>> = Url, Proxy, Options) -> - parse_rep_db({[{<<"url">>, Url}]}, Proxy, Options); - -parse_rep_db(<<"https://", _/binary>> = Url, Proxy, Options) -> - parse_rep_db({[{<<"url">>, Url}]}, Proxy, Options); - -parse_rep_db(<<_/binary>>, _Proxy, _Options) -> - throw({error, local_endpoints_not_supported}); - -parse_rep_db(undefined, _Proxy, _Options) -> - throw({error, <<"Missing replicator database">>}). - - --spec maybe_add_trailing_slash(binary() | list()) -> list(). -maybe_add_trailing_slash(Url) when is_binary(Url) -> - maybe_add_trailing_slash(?b2l(Url)); -maybe_add_trailing_slash(Url) -> - case lists:member($?, Url) of - true -> - Url; % skip if there are query params - false -> - case lists:last(Url) of - $/ -> - Url; - _ -> - Url ++ "/" - end - end. - - --spec make_options([_]) -> [_]. 
-make_options(Props) -> - Options0 = lists:ukeysort(1, convert_options(Props)), - Options = check_options(Options0), - DefWorkers = config:get("replicator", "worker_processes", "4"), - DefBatchSize = config:get("replicator", "worker_batch_size", "500"), - DefConns = config:get("replicator", "http_connections", "20"), - DefTimeout = config:get("replicator", "connection_timeout", "30000"), - DefRetries = config:get("replicator", "retries_per_request", "5"), - UseCheckpoints = config:get("replicator", "use_checkpoints", "true"), - DefCheckpointInterval = config:get("replicator", "checkpoint_interval", - "30000"), - {ok, DefSocketOptions} = couch_util:parse_term( - config:get("replicator", "socket_options", - "[{keepalive, true}, {nodelay, false}]")), - lists:ukeymerge(1, Options, lists:keysort(1, [ - {connection_timeout, list_to_integer(DefTimeout)}, - {retries, list_to_integer(DefRetries)}, - {http_connections, list_to_integer(DefConns)}, - {socket_options, DefSocketOptions}, - {worker_batch_size, list_to_integer(DefBatchSize)}, - {worker_processes, list_to_integer(DefWorkers)}, - {use_checkpoints, list_to_existing_atom(UseCheckpoints)}, - {checkpoint_interval, list_to_integer(DefCheckpointInterval)} - ])). - - --spec convert_options([_]) -> [_]. 
-convert_options([])-> - []; -convert_options([{<<"cancel">>, V} | _R]) when not is_boolean(V)-> - throw({bad_request, <<"parameter `cancel` must be a boolean">>}); -convert_options([{<<"cancel">>, V} | R]) -> - [{cancel, V} | convert_options(R)]; -convert_options([{IdOpt, V} | R]) when IdOpt =:= <<"_local_id">>; - IdOpt =:= <<"replication_id">>; IdOpt =:= <<"id">> -> - [{id, couch_replicator_ids:convert(V)} | convert_options(R)]; -convert_options([{<<"create_target">>, V} | _R]) when not is_boolean(V)-> - throw({bad_request, <<"parameter `create_target` must be a boolean">>}); -convert_options([{<<"create_target">>, V} | R]) -> - [{create_target, V} | convert_options(R)]; -convert_options([{<<"create_target_params">>, V} | _R]) when not is_tuple(V) -> - throw({bad_request, - <<"parameter `create_target_params` must be a JSON object">>}); -convert_options([{<<"create_target_params">>, V} | R]) -> - [{create_target_params, V} | convert_options(R)]; -convert_options([{<<"continuous">>, V} | _R]) when not is_boolean(V)-> - throw({bad_request, <<"parameter `continuous` must be a boolean">>}); -convert_options([{<<"continuous">>, V} | R]) -> - [{continuous, V} | convert_options(R)]; -convert_options([{<<"filter">>, V} | R]) -> - [{filter, V} | convert_options(R)]; -convert_options([{<<"query_params">>, V} | R]) -> - [{query_params, V} | convert_options(R)]; -convert_options([{<<"doc_ids">>, null} | R]) -> - convert_options(R); -convert_options([{<<"doc_ids">>, V} | _R]) when not is_list(V) -> - throw({bad_request, <<"parameter `doc_ids` must be an array">>}); -convert_options([{<<"doc_ids">>, V} | R]) -> - % Ensure same behaviour as old replicator: accept a list of percent - % encoded doc IDs. 
- DocIds = lists:usort([?l2b(couch_httpd:unquote(Id)) || Id <- V]), - [{doc_ids, DocIds} | convert_options(R)]; -convert_options([{<<"selector">>, V} | _R]) when not is_tuple(V) -> - throw({bad_request, <<"parameter `selector` must be a JSON object">>}); -convert_options([{<<"selector">>, V} | R]) -> - [{selector, V} | convert_options(R)]; -convert_options([{<<"worker_processes">>, V} | R]) -> - [{worker_processes, couch_util:to_integer(V)} | convert_options(R)]; -convert_options([{<<"worker_batch_size">>, V} | R]) -> - [{worker_batch_size, couch_util:to_integer(V)} | convert_options(R)]; -convert_options([{<<"http_connections">>, V} | R]) -> - [{http_connections, couch_util:to_integer(V)} | convert_options(R)]; -convert_options([{<<"connection_timeout">>, V} | R]) -> - [{connection_timeout, couch_util:to_integer(V)} | convert_options(R)]; -convert_options([{<<"retries_per_request">>, V} | R]) -> - [{retries, couch_util:to_integer(V)} | convert_options(R)]; -convert_options([{<<"socket_options">>, V} | R]) -> - {ok, SocketOptions} = couch_util:parse_term(V), - [{socket_options, SocketOptions} | convert_options(R)]; -convert_options([{<<"since_seq">>, V} | R]) -> - [{since_seq, V} | convert_options(R)]; -convert_options([{<<"use_checkpoints">>, V} | R]) -> - [{use_checkpoints, V} | convert_options(R)]; -convert_options([{<<"checkpoint_interval">>, V} | R]) -> - [{checkpoint_interval, couch_util:to_integer(V)} | convert_options(R)]; -convert_options([_ | R]) -> % skip unknown option - convert_options(R). - - --spec check_options([_]) -> [_]. -check_options(Options) -> - DocIds = lists:keyfind(doc_ids, 1, Options), - Filter = lists:keyfind(filter, 1, Options), - Selector = lists:keyfind(selector, 1, Options), - case {DocIds, Filter, Selector} of - {false, false, false} -> Options; - {false, false, _} -> Options; - {false, _, false} -> Options; - {_, false, false} -> Options; - _ -> - throw({bad_request, - "`doc_ids`,`filter`,`selector` are mutually exclusive"}) - end. 
- - --spec parse_proxy_params(binary() | [_]) -> [_]. -parse_proxy_params(ProxyUrl) when is_binary(ProxyUrl) -> - parse_proxy_params(?b2l(ProxyUrl)); -parse_proxy_params([]) -> - []; -parse_proxy_params(ProxyUrl) -> - #url{ - host = Host, - port = Port, - username = User, - password = Passwd, - protocol = Protocol - } = ibrowse_lib:parse_url(ProxyUrl), - [ - {proxy_protocol, Protocol}, - {proxy_host, Host}, - {proxy_port, Port} - ] ++ case is_list(User) andalso is_list(Passwd) of - false -> - []; - true -> - [{proxy_user, User}, {proxy_password, Passwd}] - end. - - --spec ssl_params([_]) -> [_]. -ssl_params(Url) -> - case ibrowse_lib:parse_url(Url) of - #url{protocol = https} -> - Depth = list_to_integer( - config:get("replicator", "ssl_certificate_max_depth", "3") - ), - VerifyCerts = config:get("replicator", "verify_ssl_certificates"), - CertFile = config:get("replicator", "cert_file", undefined), - KeyFile = config:get("replicator", "key_file", undefined), - Password = config:get("replicator", "password", undefined), - SslOpts = [{depth, Depth} | ssl_verify_options(VerifyCerts =:= "true")], - SslOpts1 = case CertFile /= undefined andalso KeyFile /= undefined of - true -> - case Password of - undefined -> - [{certfile, CertFile}, {keyfile, KeyFile}] ++ SslOpts; - _ -> - [{certfile, CertFile}, {keyfile, KeyFile}, - {password, Password}] ++ SslOpts - end; - false -> SslOpts - end, - [{is_ssl, true}, {ssl_options, SslOpts1}]; - #url{protocol = http} -> - [] - end. - - --spec ssl_verify_options(true | false) -> [_]. -ssl_verify_options(true) -> - CAFile = config:get("replicator", "ssl_trusted_certificates_file"), - [{verify, verify_peer}, {cacertfile, CAFile}]; -ssl_verify_options(false) -> - [{verify, verify_none}]. - - --spec before_doc_update(#doc{}, Db::any(), couch_db:update_type()) -> #doc{}. 
-before_doc_update(#doc{id = <<?DESIGN_DOC_PREFIX, _/binary>>} = Doc, _Db, _UpdateType) -> - Doc; -before_doc_update(#doc{body = {Body}} = Doc, Db, _UpdateType) -> - #user_ctx{ - roles = Roles, - name = Name - } = couch_db:get_user_ctx(Db), - case lists:member(<<"_replicator">>, Roles) of - true -> - Doc; - false -> - case couch_util:get_value(?OWNER, Body) of - undefined -> - Doc#doc{body = {?replace(Body, ?OWNER, Name)}}; - Name -> - Doc; - Other -> - case (catch couch_db:check_is_admin(Db)) of - ok when Other =:= null -> - Doc#doc{body = {?replace(Body, ?OWNER, Name)}}; - ok -> - Doc; - _ -> - throw({forbidden, <<"Can't update replication documents", - " from other users.">>}) - end - end - end. - - --spec after_doc_read(#doc{}, Db::any()) -> #doc{}. -after_doc_read(#doc{id = <<?DESIGN_DOC_PREFIX, _/binary>>} = Doc, _Db) -> - Doc; -after_doc_read(#doc{body = {Body}} = Doc, Db) -> - #user_ctx{name = Name} = couch_db:get_user_ctx(Db), - case (catch couch_db:check_is_admin(Db)) of - ok -> - Doc; - _ -> - case couch_util:get_value(?OWNER, Body) of - Name -> - Doc; - _Other -> - Source = strip_credentials(couch_util:get_value(<<"source">>, -Body)), - Target = strip_credentials(couch_util:get_value(<<"target">>, -Body)), - NewBody0 = ?replace(Body, <<"source">>, Source), - NewBody = ?replace(NewBody0, <<"target">>, Target), - #doc{revs = {Pos, [_ | Revs]}} = Doc, - NewDoc = Doc#doc{body = {NewBody}, revs = {Pos - 1, Revs}}, - NewRevId = couch_db:new_revid(NewDoc), - NewDoc#doc{revs = {Pos, [NewRevId | Revs]}} - end end. 
@@ -698,164 +240,14 @@ strip_credentials({Props0}) ->
 error_reason({shutdown, Error}) ->
     error_reason(Error);
 error_reason({bad_rep_doc, Reason}) ->
-    to_binary(Reason);
+    couch_util:to_binary(Reason);
+error_reason(#{<<"error">> := Error, <<"reason">> := Reason})
+        when is_binary(Error), is_binary(Reason) ->
+    couch_util:to_binary(io_lib:format("~s: ~s", [Error, Reason]));
 error_reason({error, {Error, Reason}})
-        when is_atom(Error), is_binary(Reason) ->
-    to_binary(io_lib:format("~s: ~s", [Error, Reason]));
+        when is_atom(Error), is_binary(Reason) ->
+    couch_util:to_binary(io_lib:format("~s: ~s", [Error, Reason]));
 error_reason({error, Reason}) ->
-    to_binary(Reason);
+    couch_util:to_binary(Reason);
 error_reason(Reason) ->
-    to_binary(Reason).
-
-
--ifdef(TEST).
-
-
--include_lib("couch/include/couch_eunit.hrl").
-
-
-check_options_pass_values_test() ->
-    ?assertEqual(check_options([]), []),
-    ?assertEqual(check_options([baz, {other, fiz}]), [baz, {other, fiz}]),
-    ?assertEqual(check_options([{doc_ids, x}]), [{doc_ids, x}]),
-    ?assertEqual(check_options([{filter, x}]), [{filter, x}]),
-    ?assertEqual(check_options([{selector, x}]), [{selector, x}]).
-
-
-check_options_fail_values_test() ->
-    ?assertThrow({bad_request, _},
-        check_options([{doc_ids, x}, {filter, y}])),
-    ?assertThrow({bad_request, _},
-        check_options([{doc_ids, x}, {selector, y}])),
-    ?assertThrow({bad_request, _},
-        check_options([{filter, x}, {selector, y}])),
-    ?assertThrow({bad_request, _},
-        check_options([{doc_ids, x}, {selector, y}, {filter, z}])).
- - -check_convert_options_pass_test() -> - ?assertEqual([], convert_options([])), - ?assertEqual([], convert_options([{<<"random">>, 42}])), - ?assertEqual([{cancel, true}], - convert_options([{<<"cancel">>, true}])), - ?assertEqual([{create_target, true}], - convert_options([{<<"create_target">>, true}])), - ?assertEqual([{continuous, true}], - convert_options([{<<"continuous">>, true}])), - ?assertEqual([{doc_ids, [<<"id">>]}], - convert_options([{<<"doc_ids">>, [<<"id">>]}])), - ?assertEqual([{selector, {key, value}}], - convert_options([{<<"selector">>, {key, value}}])). - - -check_convert_options_fail_test() -> - ?assertThrow({bad_request, _}, - convert_options([{<<"cancel">>, <<"true">>}])), - ?assertThrow({bad_request, _}, - convert_options([{<<"create_target">>, <<"true">>}])), - ?assertThrow({bad_request, _}, - convert_options([{<<"continuous">>, <<"true">>}])), - ?assertThrow({bad_request, _}, - convert_options([{<<"doc_ids">>, not_a_list}])), - ?assertThrow({bad_request, _}, - convert_options([{<<"selector">>, [{key, value}]}])). - -check_strip_credentials_test() -> - [?assertEqual(Expected, strip_credentials(Body)) || {Expected, Body} <- [ - { - undefined, - undefined - }, - { - <<"https://remote_server/database">>, - <<"https://foo:bar@remote_server/database">> - }, - { - {[{<<"_id">>, <<"foo">>}]}, - {[{<<"_id">>, <<"foo">>}, {<<"headers">>, <<"bar">>}]} - }, - { - {[{<<"_id">>, <<"foo">>}, {<<"other">>, <<"bar">>}]}, - {[{<<"_id">>, <<"foo">>}, {<<"other">>, <<"bar">>}]} - }, - { - {[{<<"_id">>, <<"foo">>}]}, - {[{<<"_id">>, <<"foo">>}, {<<"headers">>, <<"baz">>}]} - }, - { - {[{<<"_id">>, <<"foo">>}]}, - {[{<<"_id">>, <<"foo">>}, {<<"auth">>, <<"pluginsecret">>}]} - } - ]]. - - -setup() -> - DbName = ?tempdb(), - {ok, Db} = couch_db:create(DbName, [?ADMIN_CTX]), - ok = couch_db:close(Db), - create_vdu(DbName), - DbName. - - -teardown(DbName) when is_binary(DbName) -> - couch_server:delete(DbName, [?ADMIN_CTX]), - ok. 
- - -create_vdu(DbName) -> - couch_util:with_db(DbName, fun(Db) -> - VduFun = <<"function(newdoc, olddoc, userctx) {throw({'forbidden':'fail'})}">>, - Doc = #doc{ - id = <<"_design/vdu">>, - body = {[{<<"validate_doc_update">>, VduFun}]} - }, - {ok, _} = couch_db:update_docs(Db, [Doc]) - end). - - -update_replicator_doc_with_bad_vdu_test_() -> - { - setup, - fun test_util:start_couch/0, - fun test_util:stop_couch/1, - { - foreach, fun setup/0, fun teardown/1, - [ - fun t_vdu_does_not_crash_on_save/1 - ] - } - }. - - -t_vdu_does_not_crash_on_save(DbName) -> - ?_test(begin - Doc = #doc{id = <<"some_id">>, body = {[{<<"foo">>, 42}]}}, - ?assertEqual({ok, forbidden}, save_rep_doc(DbName, Doc)) - end). - - -local_replication_endpoint_error_test_() -> - { - foreach, - fun () -> meck:expect(config, get, - fun(_, _, Default) -> Default end) - end, - fun (_) -> meck:unload() end, - [ - t_error_on_local_endpoint() - ] - }. - - -t_error_on_local_endpoint() -> - ?_test(begin - RepDoc = {[ - {<<"_id">>, <<"someid">>}, - {<<"source">>, <<"localdb">>}, - {<<"target">>, <<"http://somehost.local/tgt">>} - ]}, - Expect = local_endpoints_not_supported, - ?assertThrow({bad_rep_doc, Expect}, parse_rep_doc_without_id(RepDoc)) - end). - --endif. + couch_util:to_binary(Reason). diff --git a/src/couch_replicator/src/couch_replicator_parse.erl b/src/couch_replicator/src/couch_replicator_parse.erl new file mode 100644 index 000000000..5996ec507 --- /dev/null +++ b/src/couch_replicator/src/couch_replicator_parse.erl @@ -0,0 +1,545 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_replicator_parse).
+
+
+-export([
+    parse_rep_doc/1,
+    parse_transient_rep/2,
+    parse_rep/2,
+    parse_rep_db/3
+]).
+
+
+-include_lib("ibrowse/include/ibrowse.hrl").
+-include("couch_replicator.hrl").
+
+
+-define(DEFAULT_SOCK_OPTS, "[{keepalive, true}, {nodelay, false}]").
+-define(VALID_SOCK_OPTS, [
+    buffer,
+    delay_send,
+    exit_on_close,
+    ipv6_v6only,
+    keepalive,
+    nodelay,
+    recbuf,
+    send_timeout,
+    send_timeout_close,
+    sndbuf,
+    priority,
+    tos,
+    tclass
+]).
+-define(VALID_PROXY_PROTOCOLS, [http, https, socks5]).
+-define(CONFIG_DEFAULTS, [
+    {"worker_processes", "4", fun list_to_integer/1},
+    {"worker_batch_size", "500", fun list_to_integer/1},
+    {"http_connections", "20", fun list_to_integer/1},
+    {"connection_timeout", "30000", fun list_to_integer/1},
+    {"retries_per_request", "5", fun list_to_integer/1},
+    {"use_checkpoints", "true", fun list_to_existing_atom/1},
+    {"checkpoint_interval", "30000", fun list_to_integer/1},
+    {"socket_options", ?DEFAULT_SOCK_OPTS, fun parse_sock_opts/1}
+]).
+
+
+-spec parse_rep_doc({[_]}) -> #{}.
+parse_rep_doc(RepDoc) ->
+    {ok, Rep} = try
+        parse_rep(RepDoc, null)
+    catch
+        throw:{error, Reason} ->
+            Stack = erlang:get_stacktrace(),
+            LogErr1 = "~p parse_rep_doc fail ~p ~p",
+            couch_log:error(LogErr1, [?MODULE, Reason, Stack]),
+            throw({bad_rep_doc, Reason});
+        Tag:Err ->
+            Stack = erlang:get_stacktrace(),
+            LogErr2 = "~p parse_rep_doc fail ~p:~p ~p",
+            couch_log:error(LogErr2, [?MODULE, Tag, Err, Stack]),
+            throw({bad_rep_doc, couch_util:to_binary({Tag, Err})})
+    end,
+    Rep.
+
+
+-spec parse_transient_rep({[_]} | #{}, user_name()) -> {ok, binary(), #{}}.
+parse_transient_rep({Props} = EJson, UserName) when is_list(Props) ->
+    Str = couch_util:json_encode(EJson),
+    Map = couch_util:json_decode(Str, [return_maps]),
+    parse_transient_rep(Map, UserName);
+
+parse_transient_rep(#{} = Body, UserName) ->
+    {ok, Rep} = try
+        parse_rep(Body, UserName)
+    catch
+        throw:{error, Reason} ->
+            Stack = erlang:get_stacktrace(),
+            LogErr1 = "~p parse_transient_rep fail ~p ~p",
+            couch_log:error(LogErr1, [?MODULE, Reason, Stack]),
+            throw({bad_request, Reason});
+        Tag:Err ->
+            Stack = erlang:get_stacktrace(),
+            LogErr2 = "~p parse_transient_rep fail ~p:~p ~p",
+            couch_log:error(LogErr2, [?MODULE, Tag, Err, Stack]),
+            throw({bad_request, couch_util:to_binary({Tag, Err})})
+    end,
+    #{?OPTIONS := Options} = Rep,
+    Cancel = maps:get(<<"cancel">>, Options, false),
+    Id = maps:get(<<"id">>, Options, nil),
+    case {Cancel, Id} of
+        {true, nil} ->
+            % Cancel request with no id, must parse id out of body contents
+            JobId = couch_replicator_ids:job_id(Rep, null, null),
+            {ok, JobId, Rep};
+        {true, Id} ->
+            % Cancel request with an id specified, so do not parse id from body
+            {ok, Id, Rep};
+        {false, _Id} ->
+            JobId = couch_replicator_ids:job_id(Rep, null, null),
+            % Not a cancel request, regular replication doc
+            {ok, JobId, Rep}
+    end.
+
+
+-spec parse_rep({[_]} | #{}, user_name()) -> {ok, #{}}.
+parse_rep({Props} = EJson, UserName) when is_list(Props) -> + Str = couch_util:json_encode(EJson), + Map = couch_util:json_decode(Str, [return_maps]), + parse_rep(Map, UserName); + +parse_rep(#{} = Doc, UserName) -> + {SrcProxy, TgtProxy} = parse_proxy_settings(Doc), + Opts = make_options(Doc), + Cancel = maps:get(<<"cancel">>, Opts, false), + Id = maps:get(<<"id">>, Opts, nil), + case Cancel andalso Id =/= nil of + true -> + {ok, #{?OPTIONS => Opts, ?REP_USER => UserName}}; + false -> + case {maps:is_key(?SOURCE, Doc), maps:is_key(?TARGET, Doc)} of + {false, _} -> throw({error, <<"Missing `source` field">>}); + {_, false} -> throw({error, <<"Missing `target` field">>}); + {true, true} -> ok + end, + #{?SOURCE := Source0, ?TARGET := Target0} = Doc, + Source = parse_rep_db(Source0, SrcProxy, Opts), + Target = parse_rep_db(Target0, TgtProxy, Opts), + case couch_replicator_filters:view_type(Doc, Opts) of + {error, Error} -> throw({error, Error}); + _ -> ok + end, + case couch_replicator_filters:parse(Opts) of + {ok, _} -> ok; + {error, FilterError} -> throw({error, FilterError}) + end, + Rep = #{ + ?SOURCE => Source, + ?TARGET => Target, + ?OPTIONS => Opts, + ?REP_USER => UserName, + ?START_TIME => erlang:system_time(second) + }, + {ok, Rep} + end. + + +-spec parse_rep_db(#{}, #{}, #{}) -> #{}. 
+parse_rep_db(#{} = Endpoint, #{} = ProxyParams, #{} = Options) -> + ProxyUrl = case ProxyParams of + #{<<"proxy_url">> := PUrl} -> PUrl; + _ -> null + end, + + Url0 = maps:get(<<"url">>, Endpoint), + Url = maybe_add_trailing_slash(Url0), + + AuthProps = maps:get(<<"auth">>, Endpoint, #{}), + if is_map(AuthProps) -> ok; true -> + throw({error, "if defined, `auth` must be an object"}) + end, + + Headers0 = maps:get(<<"headers">>, Endpoint, #{}), + if is_map(Headers0) -> ok; true -> + throw({error, "if defined `headers` must be an object"}) + end, + DefaultHeaders = couch_replicator_utils:default_headers_map(), + Headers = maps:merge(DefaultHeaders, Headers0), + + SockOpts = maps:get(<<"socket_options">>, Options, #{}), + SockAndProxy = maps:merge(#{ + <<"socket_options">> => SockOpts + }, ProxyParams), + SslParams = ssl_params(Url), + + #{ + <<"url">> => Url, + <<"auth_props">> => AuthProps, + <<"headers">> => Headers, + <<"ibrowse_options">> => maps:merge(SslParams, SockAndProxy), + <<"timeout">> => maps:get(<<"connection_timeout">>, Options), + <<"http_connections">> => maps:get(<<"http_connections">>, Options), + <<"retries">> => maps:get(<<"retries_per_request">>, Options), + <<"proxy_url">> => ProxyUrl + }; + +parse_rep_db(<<"http://", _/binary>> = Url, Proxy, Options) -> + parse_rep_db(#{<<"url">> => Url}, Proxy, Options); + +parse_rep_db(<<"https://", _/binary>> = Url, Proxy, Options) -> + parse_rep_db(#{<<"url">> => Url}, Proxy, Options); + +parse_rep_db(<<_/binary>>, _Proxy, _Options) -> + throw({error, local_endpoints_not_supported}); + +parse_rep_db(undefined, _Proxy, _Options) -> + throw({error, <<"Missing replication endpoint">>}). 
+ + +parse_proxy_settings(#{} = Doc) -> + Proxy = maps:get(?PROXY, Doc, <<>>), + SrcProxy = maps:get(?SOURCE_PROXY, Doc, <<>>), + TgtProxy = maps:get(?TARGET_PROXY, Doc, <<>>), + + case Proxy =/= <<>> of + true when SrcProxy =/= <<>> -> + Error = "`proxy` is mutually exclusive with `source_proxy`", + throw({error, Error}); + true when TgtProxy =/= <<>> -> + Error = "`proxy` is mutually exclusive with `target_proxy`", + throw({error, Error}); + true -> + {parse_proxy_params(Proxy), parse_proxy_params(Proxy)}; + false -> + {parse_proxy_params(SrcProxy), parse_proxy_params(TgtProxy)} + end. + + +-spec maybe_add_trailing_slash(binary()) -> binary(). +maybe_add_trailing_slash(<<>>) -> + <<>>; + +maybe_add_trailing_slash(Url) when is_binary(Url) -> + case binary:match(Url, <<"?">>) of + nomatch -> + case binary:last(Url) of + $/ -> Url; + _ -> <<Url/binary, "/">> + end; + _ -> + Url % skip if there are query params + end. + + +-spec make_options(#{}) -> #{}. +make_options(#{} = RepDoc) -> + Options0 = convert_options(RepDoc), + Options = check_options(Options0), + ConfigOptions = lists:foldl(fun({K, Default, ConversionFun}, Acc) -> + V = ConversionFun(config:get("replicator", K, Default)), + Acc#{list_to_binary(K) => V} + end, #{}, ?CONFIG_DEFAULTS), + maps:merge(ConfigOptions, Options). + + +-spec convert_options(#{}) -> #{} | no_return(). +convert_options(#{} = Doc) -> + maps:fold(fun convert_fold/3, #{}, Doc). + + +-spec convert_fold(binary(), any(), #{}) -> #{}. 
+convert_fold(<<"cancel">>, V, Acc) when is_boolean(V) -> + Acc#{<<"cancel">> => V}; +convert_fold(<<"cancel">>, _, _) -> + throw({error, <<"`cancel` must be a boolean">>}); +convert_fold(IdOpt, V, Acc) when IdOpt =:= <<"_local_id">>; + IdOpt =:= <<"replication_id">>; IdOpt =:= <<"id">> -> + Acc#{<<"id">> => couch_replicator_ids:convert(V)}; +convert_fold(<<"create_target">>, V, Acc) when is_boolean(V) -> + Acc#{<<"create_target">> => V}; +convert_fold(<<"create_target">>, _, _) -> + throw({error, <<"`create_target` must be a boolean">>}); +convert_fold(<<"create_target_params">>, #{} = V, Acc) -> + Acc#{<<"create_target_params">> => V}; +convert_fold(<<"create_target_params">>, _, _) -> + throw({error, <<"`create_target_params` must be an object">>}); +convert_fold(<<"continuous">>, V, Acc) when is_boolean(V) -> + Acc#{<<"continuous">> => V}; +convert_fold(<<"continuous">>, _, _) -> + throw({error, <<"`continuous` must be a boolean">>}); +convert_fold(<<"filter">>, V, Acc) when is_binary(V), byte_size(V) > 1 -> + Acc#{<<"filter">> => V}; +convert_fold(<<"filter">>, _, _) -> + throw({error, <<"`filter` must be a string">>}); +convert_fold(<<"query_params">>, V, Acc) when is_map(V) orelse V =:= null -> + Acc#{<<"query_params">> => V}; +convert_fold(<<"query_params">>, _, _Acc) -> + throw({error, <<"`query_params` is not `null` or object">>}); +convert_fold(<<"doc_ids">>, null, Acc) -> + Acc; +convert_fold(<<"doc_ids">>, V, Acc) when is_list(V) -> + % Compatibility behaviour as: accept a list of percent encoded doc IDs + Ids = lists:map(fun(Id) -> + case is_binary(Id) andalso byte_size(Id) > 0 of + true -> list_to_binary(couch_httpd:unquote(Id)); + false -> throw({error, <<"`doc_ids` array must contain strings">>}) + end + end, V), + Acc#{<<"doc_ids">> => lists:usort(Ids)}; +convert_fold(<<"doc_ids">>, _, _) -> + throw({error, <<"`doc_ids` must be an array">>}); +convert_fold(<<"selector">>, #{} = V, Acc) -> + Acc#{<<"selector">> => V}; +convert_fold(<<"selector">>, 
_, _Acc) -> + throw({error, <<"`selector` must be a JSON object">>}); +convert_fold(<<"worker_processes">>, V, Acc) -> + Acc#{<<"worker_processes">> => bin2int(V, <<"worker_processes">>)}; +convert_fold(<<"worker_batch_size">>, V, Acc) -> + Acc#{<<"worker_batch_size">> => bin2int(V, <<"worker_batch_size">>)}; +convert_fold(<<"http_connections">>, V, Acc) -> + Acc#{<<"http_connections">> => bin2int(V, <<"http_connections">>)}; +convert_fold(<<"connection_timeout">>, V, Acc) -> + Acc#{<<"connection_timeout">> => bin2int(V, <<"connection_timeout">>)}; +convert_fold(<<"retries_per_request">>, V, Acc) -> + Acc#{<<"retries_per_request">> => bin2int(V, <<"retries_per_request">>)}; +convert_fold(<<"socket_options">>, V, Acc) -> + Acc#{<<"socket_options">> => parse_sock_opts(V)}; +convert_fold(<<"since_seq">>, V, Acc) -> + Acc#{<<"since_seq">> => V}; +convert_fold(<<"use_checkpoints">>, V, Acc) when is_boolean(V) -> + Acc#{<<"use_checkpoints">> => V}; +convert_fold(<<"use_checkpoints">>, _, _) -> + throw({error, <<"`use_checkpoints` must be a boolean">>}); +convert_fold(<<"checkpoint_interval">>, V, Acc) -> + Acc#{<<"checkpoint_interval">> => bin2int(V, <<"checkpoint_interval">>)}; +convert_fold(_K, _V, Acc) -> % skip unknown option + Acc. + + +bin2int(V, _Field) when is_integer(V) -> + V; + +bin2int(V, Field) when is_binary(V) -> + try + erlang:binary_to_integer(V) + catch + error:badarg -> + throw({error, <<"`", Field/binary, "` must be an integer">>}) + end; + +bin2int(_V, Field) -> + throw({error, <<"`", Field/binary, "` must be an integer">>}). + + +-spec check_options(#{}) -> #{}. 
+check_options(Options) -> + DocIds = maps:is_key(<<"doc_ids">>, Options), + Filter = maps:is_key(<<"filter">>, Options), + Selector = maps:is_key(<<"selector">>, Options), + case {DocIds, Filter, Selector} of + {false, false, false} -> Options; + {false, false, _} -> Options; + {false, _, false} -> Options; + {_, false, false} -> Options; + _ -> throw({error, <<"`doc_ids`,`filter`,`selector` are mutually " + " exclusive">>}) + end. + + +parse_sock_opts(Term) -> + {ok, SocketOptions} = couch_util:parse_term(Term), + lists:foldl(fun + ({K, V}, Acc) when is_atom(K) -> + case lists:member(K, ?VALID_SOCK_OPTS) of + true -> Acc#{atom_to_binary(K, utf8) => V}; + false -> Acc + end; + (_, Acc) -> + Acc + end, #{}, SocketOptions). + + +-spec parse_proxy_params(binary() | #{}) -> #{}. +parse_proxy_params(<<>>) -> + #{}; +parse_proxy_params(ProxyUrl) when is_binary(ProxyUrl)-> + #url{ + host = Host, + port = Port, + username = User, + password = Passwd, + protocol = Prot0 + } = ibrowse_lib:parse_url(binary_to_list(ProxyUrl)), + Prot = case lists:member(Prot0, ?VALID_PROXY_PROTOCOLS) of + true -> atom_to_binary(Prot0, utf8); + false -> throw({error, <<"Unsupported proxy protocol">>}) + end, + ProxyParams = #{ + <<"proxy_url">> => ProxyUrl, + <<"proxy_protocol">> => Prot, + <<"proxy_host">> => list_to_binary(Host), + <<"proxy_port">> => Port + }, + case is_list(User) andalso is_list(Passwd) of + true -> + ProxyParams#{ + <<"proxy_user">> => list_to_binary(User), + <<"proxy_password">> => list_to_binary(Passwd) + }; + false -> + ProxyParams + end. + + +-spec ssl_params(binary()) -> #{}. 
+ssl_params(Url) ->
+    case ibrowse_lib:parse_url(binary_to_list(Url)) of
+        #url{protocol = https} ->
+            Depth = list_to_integer(
+                config:get("replicator", "ssl_certificate_max_depth", "3")
+            ),
+            VerifyCerts = config:get("replicator", "verify_ssl_certificates"),
+            CertFile = config:get("replicator", "cert_file", null),
+            KeyFile = config:get("replicator", "key_file", null),
+            Password = config:get("replicator", "password", null),
+            VerifySslOptions = ssl_verify_options(VerifyCerts =:= "true"),
+            SslOpts = maps:merge(VerifySslOptions, #{<<"depth">> => Depth}),
+            HaveCertAndKey = CertFile /= null andalso KeyFile /= null,
+            SslOpts1 = case HaveCertAndKey of false -> SslOpts; true ->
+                CertOpts0 = #{
+                    <<"certfile">> => list_to_binary(CertFile),
+                    <<"keyfile">> => list_to_binary(KeyFile)
+                },
+                CertOpts = case Password of null -> CertOpts0; _ ->
+                    CertOpts0#{<<"password">> => list_to_binary(Password)}
+                end,
+                maps:merge(SslOpts, CertOpts)
+            end,
+            #{<<"is_ssl">> => true, <<"ssl_options">> => SslOpts1};
+        #url{protocol = http} ->
+            #{}
+    end.
+
+
+-spec ssl_verify_options(true | false) -> #{}.
+ssl_verify_options(true) ->
+    case config:get("replicator", "ssl_trusted_certificates_file") of
+        undefined ->
+            #{
+                <<"verify">> => <<"verify_peer">>,
+                <<"cacertfile">> => null
+            };
+        CAFile when is_list(CAFile) ->
+            #{
+                <<"verify">> => <<"verify_peer">>,
+                <<"cacertfile">> => list_to_binary(CAFile)
+            }
+    end;
+
+ssl_verify_options(false) ->
+    #{
+        <<"verify">> => <<"verify_none">>
+    }.
+
+
+-ifdef(TEST).
+
+-include_lib("couch/include/couch_eunit.hrl").
+-include_lib("fabric/test/fabric2_test.hrl").
+ + +check_options_pass_values_test() -> + ?assertEqual(check_options(#{}), #{}), + ?assertEqual(check_options(#{<<"baz">> => <<"foo">>}), + #{<<"baz">> => <<"foo">>}), + ?assertEqual(check_options(#{<<"doc_ids">> => [<<"x">>]}), + #{<<"doc_ids">> => [<<"x">>]}), + ?assertEqual(check_options(#{<<"filter">> => <<"f">>}), + #{<<"filter">> => <<"f">>}), + ?assertEqual(check_options(#{<<"selector">> => <<"s">>}), + #{<<"selector">> => <<"s">>}). + + +check_options_fail_values_test() -> + ?assertThrow({error, _}, + check_options(#{<<"doc_ids">> => [], <<"filter">> => <<"f">>})), + ?assertThrow({error, _}, + check_options(#{<<"doc_ids">> => [], <<"selector">> => <<"s">>})), + ?assertThrow({error, _}, + check_options(#{<<"filter">> => <<"f">>, <<"selector">> => <<"s">>})), + ?assertThrow({error, _}, + check_options(#{ + <<"doc_ids">> => [], + <<"filter">> => <<"f">>, + <<"selector">> => <<"s">>} + )). + + +check_convert_options_pass_test() -> + ?assertEqual(#{}, convert_options(#{})), + ?assertEqual(#{}, convert_options(#{<<"random">> => 42})), + ?assertEqual(#{<<"cancel">> => true}, + convert_options(#{<<"cancel">> => true})), + ?assertEqual(#{<<"create_target">> => true}, + convert_options(#{<<"create_target">> => true})), + ?assertEqual(#{<<"continuous">> => true}, + convert_options(#{<<"continuous">> => true})), + ?assertEqual(#{<<"doc_ids">> => [<<"id">>]}, + convert_options(#{<<"doc_ids">> => [<<"id">>]})), + ?assertEqual(#{<<"selector">> => #{<<"key">> => <<"value">>}}, + convert_options(#{<<"selector">> => #{<<"key">> => <<"value">>}})). 
+ + +check_convert_options_fail_test() -> + ?assertThrow({error, _}, + convert_options(#{<<"cancel">> => <<"true">>})), + ?assertThrow({error, _}, + convert_options(#{<<"create_target">> => <<"true">>})), + ?assertThrow({error, _}, + convert_options(#{<<"continuous">> => <<"true">>})), + ?assertThrow({error, _}, + convert_options(#{<<"doc_ids">> => <<"not_a_list">>})), + ?assertThrow({error, _}, + convert_options(#{<<"selector">> => <<"bad">>})). + + +local_replication_endpoint_error_test_() -> + { + foreach, + fun setup/0, + fun teardown/1, + [ + ?TDEF_FE(t_error_on_local_endpoint) + ] + }. + + +setup() -> + meck:expect(config, get, fun(_, _, Default) -> Default end). + + +teardown(_) -> + meck:unload(). + + +t_error_on_local_endpoint(_) -> + RepDoc = {[ + {<<"_id">>, <<"someid">>}, + {<<"source">>, <<"localdb">>}, + {<<"target">>, <<"http://somehost.local/tgt">>} + ]}, + Expect = local_endpoints_not_supported, + ?assertThrow({bad_rep_doc, Expect}, parse_rep_doc(RepDoc)). + + +-endif. |