author    Nick Vatamaniuc <vatamane@apache.org>              2020-08-28 04:32:32 -0400
committer Nick Vatamaniuc <nickva@users.noreply.github.com>  2020-09-15 16:13:46 -0400
commit    b38d77fbada7cce7de288d2cdcca8839b09888f4 (patch)
tree      a146a339cda2721ffe59ba50a2f18d394ab83ea1
parent    b6e87f8a43eebb4d02dfa52227ba5b77cd4ebc68 (diff)
download  couchdb-b38d77fbada7cce7de288d2cdcca8839b09888f4.tar.gz
Move parsing and validation to couch_replicator_parse module
This module is responsible for parsing either an HTTP `_replicate` request body or a _replicator doc into an internal `Rep` object (an Erlang map). `parse_transient_rep/2` parses `_replicate` requests. It also handles cancellations, where request bodies look like ```{"id": ..., "cancel": true}``` instead of having all the expected parameters. `parse_rep_doc/1` parses _replicator docs.

Parsing consists of 3 main parts:

 - Parsing the endpoint definitions: source and target URLs, headers, TLS bits and proxies

 - Parsing options into an options map, possibly using defaults from config parameters

 - Parsing socket parameters. These now have a hard-coded allow-list as opposed to accepting all possible Erlang socket options.

The parsing functions also double as validation functions, and get called from the _replicator db's before_doc_update callback when users update replication documents. Users thus get immediate feedback if their replication document is malformed.

Everything is turned into a map object. This object can be serialized to and de-serialized from JSON. Since maps are used, add the definitions of some common fields to couch_replicator.hrl. Mistyping them should raise a compiler error.

couch_replicator_docs lost all of its parsing functions, as well as the functions which update intermediate replication doc states (triggered and error). It still contains the functions which interact with _replicator docs.
-rw-r--r--  src/couch_replicator/src/couch_replicator.hrl        | 102
-rw-r--r--  src/couch_replicator/src/couch_replicator_docs.erl   | 870
-rw-r--r--  src/couch_replicator/src/couch_replicator_parse.erl  | 545
3 files changed, 751 insertions, 766 deletions
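
As an illustration of the new API, a minimal sketch of parsing a transient replication (the endpoint URLs, the bound JobId, and the exact contents of `Rep` are illustrative; options also pick up `[replicator]` config defaults):

```
%% Full transient request body, already decoded into a map:
Body = #{
    <<"source">> => <<"http://localhost:5984/src/">>,
    <<"target">> => <<"http://localhost:5984/tgt/">>,
    <<"continuous">> => true
},
{ok, JobId, Rep} = couch_replicator_parse:parse_transient_rep(Body, null),
%% Rep is a plain map keyed by the binaries from couch_replicator.hrl:
#{<<"source">> := _, <<"target">> := _, <<"options">> := Opts} = Rep,
true = maps:get(<<"continuous">>, Opts),

%% A cancelation body carries no endpoints; endpoint parsing and
%% validation are skipped and only the id is extracted:
CancelBody = #{<<"cancel">> => true, <<"id">> => JobId},
{ok, _CancelId, _CancelRep} =
    couch_replicator_parse:parse_transient_rep(CancelBody, null).
```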
diff --git a/src/couch_replicator/src/couch_replicator.hrl b/src/couch_replicator/src/couch_replicator.hrl
index 2a5b7c8c8..28a86d91b 100644
--- a/src/couch_replicator/src/couch_replicator.hrl
+++ b/src/couch_replicator/src/couch_replicator.hrl
@@ -12,32 +12,80 @@
-define(REP_ID_VERSION, 4).
--record(rep, {
- id :: rep_id() | '_' | 'undefined',
- source :: any() | '_',
- target :: any() | '_',
- options :: [_] | '_',
- user_ctx :: any() | '_',
- type = db :: atom() | '_',
- view = nil :: any() | '_',
- doc_id :: any() | '_',
- db_name = null :: null | binary() | '_',
- start_time = {0, 0, 0} :: erlang:timestamp() | '_',
- stats = couch_replicator_stats:new() :: orddict:orddict() | '_'
-}).
-
--type rep_id() :: {string(), string()}.
+% Some fields from the replication doc
+-define(SOURCE, <<"source">>).
+-define(TARGET, <<"target">>).
+-define(CREATE_TARGET, <<"create_target">>).
+-define(DOC_IDS, <<"doc_ids">>).
+-define(SELECTOR, <<"selector">>).
+-define(FILTER, <<"filter">>).
+-define(QUERY_PARAMS, <<"query_params">>).
+-define(URL, <<"url">>).
+-define(AUTH, <<"auth">>).
+-define(HEADERS, <<"headers">>).
+-define(PROXY, <<"proxy">>).
+-define(SOURCE_PROXY, <<"source_proxy">>).
+-define(TARGET_PROXY, <<"target_proxy">>).
+
+-define(REPLICATION_STATE, <<"_replication_state">>).
+-define(REPLICATION_STATS, <<"_replication_stats">>).
+-define(REPLICATION_ID, <<"_replication_id">>).
+-define(REPLICATION_STATE_TIME, <<"_replication_state_time">>).
+-define(REPLICATION_STATE_REASON, <<"_replication_state_reason">>).
+
+% Replication states
+-define(ST_ERROR, <<"error">>).
+-define(ST_COMPLETED, <<"completed">>).
+-define(ST_RUNNING, <<"running">>).
+-define(ST_FAILED, <<"failed">>).
+-define(ST_PENDING, <<"pending">>).
+-define(ST_CRASHING, <<"crashing">>).
+
+% Some fields from a rep object
+-define(REP_ID, <<"rep_id">>).
+-define(BASE_ID, <<"base_id">>).
+-define(DB_NAME, <<"db_name">>).
+-define(DB_UUID, <<"db_uuid">>).
+-define(DOC_ID, <<"doc_id">>).
+-define(REP_USER, <<"rep_user">>).
+-define(START_TIME, <<"start_time">>).
+-define(OPTIONS, <<"options">>).
+
+% Fields for couch job data objects
+-define(REP, <<"rep">>).
+-define(REP_PARSE_ERROR, <<"rep_parse_error">>).
+-define(REP_STATS, <<"rep_stats">>).
+-define(STATE, <<"state">>).
+-define(STATE_INFO, <<"state_info">>).
+-define(DOC_STATE, <<"doc_state">>).
+-define(ERROR_COUNT, <<"error_count">>).
+-define(LAST_UPDATED, <<"last_updated">>).
+-define(LAST_START, <<"last_start">>).
+-define(LAST_ERROR, <<"last_error">>).
+-define(JOB_HISTORY, <<"job_history">>).
+-define(CHECKPOINT_HISTORY, <<"checkpoint_history">>).
+-define(REP_NODE, <<"node">>).
+-define(REP_PID, <<"pid">>).
+
+% Job history tags
+-define(HIST_TYPE, <<"type">>).
+-define(HIST_TIMESTAMP, <<"timestamp">>).
+-define(HIST_REASON, <<"reason">>).
+-define(HIST_ADDED, <<"added">>).
+-define(HIST_STARTED, <<"started">>).
+-define(HIST_STOPPED, <<"stopped">>).
+-define(HIST_PENDING, <<"pending">>).
+-define(HIST_CRASHED, <<"crashed">>).
+
+-define(REP_DB_NAME, <<"_replicator">>).
+
+% Can be used as a guard
+-define(IS_REP_DB(X), (X =:= ?REP_DB_NAME orelse
+ binary_part(X, {byte_size(X), -12}) =:= <<"/_replicator">>)).
+
+
+-type rep_id() :: binary().
+-type job_id() :: binary().
+-type user_name() :: binary() | null.
-type db_doc_id() :: {binary(), binary() | '_'}.
-type seconds() :: non_neg_integer().
--type rep_start_result() ::
- {ok, rep_id()} |
- ignore |
- {temporary_error, binary()} |
- {permanent_failure, binary()}.
-
-
--record(doc_worker_result, {
- id :: db_doc_id(),
- wref :: reference(),
- result :: rep_start_result()
-}).
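
A short usage sketch for the new defines and the `?IS_REP_DB/1` guard (the `is_replicator_db/1` wrapper below is hypothetical, not part of this patch):

```
-include("couch_replicator.hrl").

%% ?IS_REP_DB/1 works in guard position; binary_part/2 raising on a
%% too-short name just makes the guard fail rather than crash.
is_replicator_db(DbName) when ?IS_REP_DB(DbName) -> true;
is_replicator_db(_) -> false.

%% is_replicator_db(<<"_replicator">>)      -> true
%% is_replicator_db(<<"mydb/_replicator">>) -> true
%% is_replicator_db(<<"mydb">>)             -> false

%% Field access goes through the macros, e.g. maps:get(?REP_ID, Rep),
%% so a mistyped name such as ?REP_IDD is an undefined-macro compile
%% error instead of a silent map miss.
```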
diff --git a/src/couch_replicator/src/couch_replicator_docs.erl b/src/couch_replicator/src/couch_replicator_docs.erl
index 619063222..f84d1299a 100644
--- a/src/couch_replicator/src/couch_replicator_docs.erl
+++ b/src/couch_replicator/src/couch_replicator_docs.erl
@@ -13,306 +13,142 @@
-module(couch_replicator_docs).
-export([
- parse_rep_doc/1,
- parse_rep_doc/2,
- parse_rep_db/3,
- parse_rep_doc_without_id/1,
- parse_rep_doc_without_id/2,
+ remove_state_fields/3,
+ update_completed/4,
+ update_failed/4,
before_doc_update/3,
- after_doc_read/2,
- ensure_rep_ddoc_exists/1,
- ensure_cluster_rep_ddoc_exists/1,
- remove_state_fields/2,
- update_doc_completed/3,
- update_failed/3,
- update_rep_id/1,
- update_triggered/2,
- update_error/2
+ after_doc_read/2
]).
-include_lib("couch/include/couch_db.hrl").
--include_lib("ibrowse/include/ibrowse.hrl").
--include_lib("mem3/include/mem3.hrl").
-include_lib("couch_replicator/include/couch_replicator_api_wrap.hrl").
-include("couch_replicator.hrl").
--include("couch_replicator_js_functions.hrl").
-
--import(couch_util, [
- get_value/2,
- get_value/3,
- to_binary/1
-]).
-
--import(couch_replicator_utils, [
- get_json_value/2,
- get_json_value/3
-]).
--define(REP_DB_NAME, <<"_replicator">>).
--define(REP_DESIGN_DOC, <<"_design/_replicator">>).
-define(OWNER, <<"owner">>).
-define(CTX, {user_ctx, #user_ctx{roles=[<<"_admin">>, <<"_replicator">>]}}).
-define(replace(L, K, V), lists:keystore(K, 1, L, {K, V})).
-remove_state_fields(DbName, DocId) ->
- update_rep_doc(DbName, DocId, [
- {<<"_replication_state">>, undefined},
- {<<"_replication_state_time">>, undefined},
- {<<"_replication_state_reason">>, undefined},
- {<<"_replication_id">>, undefined},
- {<<"_replication_stats">>, undefined}]).
+remove_state_fields(null, null, null) ->
+ ok;
+remove_state_fields(DbName, DbUUID, DocId) ->
+ update_rep_doc(DbName, DbUUID, DocId, [
+ {?REPLICATION_STATE, undefined},
+ {?REPLICATION_STATE_TIME, undefined},
+ {?REPLICATION_STATE_REASON, undefined},
+ {?REPLICATION_ID, undefined},
+ {?REPLICATION_STATS, undefined}
+ ]),
+ ok.
--spec update_doc_completed(binary(), binary(), [_]) -> any().
-update_doc_completed(DbName, DocId, Stats) ->
- update_rep_doc(DbName, DocId, [
- {<<"_replication_state">>, <<"completed">>},
- {<<"_replication_state_reason">>, undefined},
- {<<"_replication_stats">>, {Stats}}]),
- couch_stats:increment_counter([couch_replicator, docs,
- completed_state_updates]).
+-spec update_completed(binary() | null, binary() | null, binary(), #{}) -> ok.
+update_completed(null, null, _, _) ->
+ ok;
--spec update_failed(binary(), binary(), any()) -> any().
-update_failed(DbName, DocId, Error) ->
- Reason = error_reason(Error),
- couch_log:error("Error processing replication doc `~s` from `~s`: ~s",
- [DocId, DbName, Reason]),
- update_rep_doc(DbName, DocId, [
- {<<"_replication_state">>, <<"failed">>},
- {<<"_replication_stats">>, undefined},
- {<<"_replication_state_reason">>, Reason}]),
+update_completed(DbName, DbUUID, DocId, #{} = Stats0) ->
+ Stats = {maps:to_list(Stats0)},
+ update_rep_doc(DbName, DbUUID, DocId, [
+ {?REPLICATION_STATE, ?ST_COMPLETED},
+ {?REPLICATION_STATE_REASON, undefined},
+ {?REPLICATION_STATS, Stats}]),
couch_stats:increment_counter([couch_replicator, docs,
- failed_state_updates]).
-
-
--spec update_triggered(#rep{}, rep_id()) -> ok.
-update_triggered(Rep, {Base, Ext}) ->
- #rep{
- db_name = DbName,
- doc_id = DocId
- } = Rep,
- update_rep_doc(DbName, DocId, [
- {<<"_replication_state">>, <<"triggered">>},
- {<<"_replication_state_reason">>, undefined},
- {<<"_replication_id">>, iolist_to_binary([Base, Ext])},
- {<<"_replication_stats">>, undefined}]),
+ completed_state_updates
+ ]),
ok.
--spec update_error(#rep{}, any()) -> ok.
-update_error(#rep{db_name = DbName, doc_id = DocId, id = RepId}, Error) ->
- Reason = error_reason(Error),
- BinRepId = case RepId of
- {Base, Ext} ->
- iolist_to_binary([Base, Ext]);
- _Other ->
- null
- end,
- update_rep_doc(DbName, DocId, [
- {<<"_replication_state">>, <<"error">>},
- {<<"_replication_state_reason">>, Reason},
- {<<"_replication_stats">>, undefined},
- {<<"_replication_id">>, BinRepId}]),
- ok.
-
+-spec update_failed(binary() | null, binary() | null, binary() | null, any()) -> ok.
+update_failed(null, null, null, _) ->
+ ok;
--spec ensure_rep_ddoc_exists(binary()) -> ok.
-ensure_rep_ddoc_exists(RepDb) ->
- case mem3:belongs(RepDb, ?REP_DESIGN_DOC) of
- true ->
- ensure_rep_ddoc_exists(RepDb, ?REP_DESIGN_DOC);
- false ->
- ok
- end.
-
-
--spec ensure_rep_ddoc_exists(binary(), binary()) -> ok.
-ensure_rep_ddoc_exists(RepDb, DDocId) ->
- case open_rep_doc(RepDb, DDocId) of
- {not_found, no_db_file} ->
- %% database was deleted.
- ok;
- {not_found, _Reason} ->
- DocProps = replication_design_doc_props(DDocId),
- DDoc = couch_doc:from_json_obj({DocProps}),
- couch_log:notice("creating replicator ddoc ~p", [RepDb]),
- {ok, _Rev} = save_rep_doc(RepDb, DDoc);
- {ok, Doc} ->
- Latest = replication_design_doc_props(DDocId),
- {Props0} = couch_doc:to_json_obj(Doc, []),
- {value, {_, Rev}, Props} = lists:keytake(<<"_rev">>, 1, Props0),
- case compare_ejson({Props}, {Latest}) of
- true ->
- ok;
- false ->
- LatestWithRev = [{<<"_rev">>, Rev} | Latest],
- DDoc = couch_doc:from_json_obj({LatestWithRev}),
- couch_log:notice("updating replicator ddoc ~p", [RepDb]),
- try
- {ok, _} = save_rep_doc(RepDb, DDoc)
- catch
- throw:conflict ->
- %% ignore, we'll retry next time
- ok
- end
- end
- end,
+update_failed(DbName, DbUUID, DocId, Error) ->
+ Reason = error_reason(Error),
+ couch_log:error("Error processing replication doc `~s` from `~s`: ~s",
+ [DocId, DbName, Reason]),
+ update_rep_doc(DbName, DbUUID, DocId, [
+ {?REPLICATION_STATE, ?ST_FAILED},
+ {?REPLICATION_STATS, undefined},
+ {?REPLICATION_STATE_REASON, Reason}
+ ]),
+ couch_stats:increment_counter([couch_replicator, docs,
+ failed_state_updates]),
ok.
--spec ensure_cluster_rep_ddoc_exists(binary()) -> ok.
-ensure_cluster_rep_ddoc_exists(RepDb) ->
- DDocId = ?REP_DESIGN_DOC,
- [#shard{name = DbShard} | _] = mem3:shards(RepDb, DDocId),
- ensure_rep_ddoc_exists(DbShard, DDocId).
-
-
--spec compare_ejson({[_]}, {[_]}) -> boolean().
-compare_ejson(EJson1, EJson2) ->
- EjsonSorted1 = couch_replicator_filters:ejsort(EJson1),
- EjsonSorted2 = couch_replicator_filters:ejsort(EJson2),
- EjsonSorted1 == EjsonSorted2.
-
-
--spec replication_design_doc_props(binary()) -> [_].
-replication_design_doc_props(DDocId) ->
- [
- {<<"_id">>, DDocId},
- {<<"language">>, <<"javascript">>},
- {<<"validate_doc_update">>, ?REP_DB_DOC_VALIDATE_FUN}
- ].
-
+-spec before_doc_update(#doc{}, Db::any(), couch_db:update_type()) -> #doc{}.
+before_doc_update(#doc{id = <<?DESIGN_DOC_PREFIX, _/binary>>} = Doc, _, _) ->
+ Doc;
+before_doc_update(#doc{body = {Body}} = Doc, Db, _UpdateType) ->
+ #user_ctx{roles = Roles, name = Name} = fabric2_db:get_user_ctx(Db),
+ IsReplicator = lists:member(<<"_replicator">>, Roles),
-% Note: parse_rep_doc can handle filtered replications. During parsing of the
-% replication doc it will make possibly remote http requests to the source
-% database. If failure or parsing of filter docs fails, parse_doc throws a
-% {filter_fetch_error, Error} excation. This exception should be considered
-% transient in respect to the contents of the document itself, since it depends
-% on netowrk availability of the source db and other factors.
--spec parse_rep_doc({[_]}) -> #rep{}.
-parse_rep_doc(RepDoc) ->
- {ok, Rep} = try
- parse_rep_doc(RepDoc, rep_user_ctx(RepDoc))
- catch
- throw:{error, Reason} ->
- throw({bad_rep_doc, Reason});
- throw:{filter_fetch_error, Reason} ->
- throw({filter_fetch_error, Reason});
- Tag:Err ->
- throw({bad_rep_doc, to_binary({Tag, Err})})
+ Doc1 = case IsReplicator of true -> Doc; false ->
+ case couch_util:get_value(?OWNER, Body) of
+ undefined ->
+ Doc#doc{body = {?replace(Body, ?OWNER, Name)}};
+ Name ->
+ Doc;
+ Other ->
+ case (catch fabric2_db:check_is_admin(Db)) of
+ ok when Other =:= null ->
+ Doc#doc{body = {?replace(Body, ?OWNER, Name)}};
+ ok ->
+ Doc;
+ _ ->
+ throw({forbidden, <<"Can't update replication",
+ "documents from other users.">>})
+ end
+ end
end,
- Rep.
-
--spec parse_rep_doc_without_id({[_]}) -> #rep{}.
-parse_rep_doc_without_id(RepDoc) ->
- {ok, Rep} = try
- parse_rep_doc_without_id(RepDoc, rep_user_ctx(RepDoc))
- catch
- throw:{error, Reason} ->
- throw({bad_rep_doc, Reason});
- Tag:Err ->
- throw({bad_rep_doc, to_binary({Tag, Err})})
+ Deleted = Doc1#doc.deleted,
+ IsFailed = couch_util:get_value(?REPLICATION_STATE, Body) == ?ST_FAILED,
+ case IsReplicator orelse Deleted orelse IsFailed of true -> ok; false ->
+ try
+ couch_replicator_parse:parse_rep_doc(Doc1#doc.body)
+ catch
+ throw:{bad_rep_doc, Error} ->
+ throw({forbidden, Error})
+ end
end,
- Rep.
-
-
--spec parse_rep_doc({[_]}, #user_ctx{}) -> {ok, #rep{}}.
-parse_rep_doc(Doc, UserCtx) ->
- {ok, Rep} = parse_rep_doc_without_id(Doc, UserCtx),
- Cancel = get_value(cancel, Rep#rep.options, false),
- Id = get_value(id, Rep#rep.options, nil),
- case {Cancel, Id} of
- {true, nil} ->
- % Cancel request with no id, must parse id out of body contents
- {ok, update_rep_id(Rep)};
- {true, Id} ->
- % Cancel request with an id specified, so do not parse id from body
- {ok, Rep};
- {false, _Id} ->
- % Not a cancel request, regular replication doc
- {ok, update_rep_id(Rep)}
- end.
-
-
--spec parse_rep_doc_without_id({[_]}, #user_ctx{}) -> {ok, #rep{}}.
-parse_rep_doc_without_id({Props}, UserCtx) ->
- {SrcProxy, TgtProxy} = parse_proxy_settings(Props),
- Opts = make_options(Props),
- case get_value(cancel, Opts, false) andalso
- (get_value(id, Opts, nil) =/= nil) of
- true ->
- {ok, #rep{options = Opts, user_ctx = UserCtx}};
- false ->
- Source = parse_rep_db(get_value(<<"source">>, Props), SrcProxy, Opts),
- Target = parse_rep_db(get_value(<<"target">>, Props), TgtProxy, Opts),
- {Type, View} = case couch_replicator_filters:view_type(Props, Opts) of
- {error, Error} ->
- throw({bad_request, Error});
- Result ->
- Result
- end,
- Rep = #rep{
- source = Source,
- target = Target,
- options = Opts,
- user_ctx = UserCtx,
- type = Type,
- view = View,
- doc_id = get_value(<<"_id">>, Props, null)
- },
- % Check if can parse filter code, if not throw exception
- case couch_replicator_filters:parse(Opts) of
- {error, FilterError} ->
- throw({error, FilterError});
- {ok, _Filter} ->
- ok
- end,
- {ok, Rep}
- end.
+ Doc1.
-parse_proxy_settings(Props) when is_list(Props) ->
- Proxy = get_value(<<"proxy">>, Props, <<>>),
- SrcProxy = get_value(<<"source_proxy">>, Props, <<>>),
- TgtProxy = get_value(<<"target_proxy">>, Props, <<>>),
-
- case Proxy =/= <<>> of
- true when SrcProxy =/= <<>> ->
- Error = "`proxy` is mutually exclusive with `source_proxy`",
- throw({bad_request, Error});
- true when TgtProxy =/= <<>> ->
- Error = "`proxy` is mutually exclusive with `target_proxy`",
- throw({bad_request, Error});
- true ->
- {Proxy, Proxy};
- false ->
- {SrcProxy, TgtProxy}
+-spec after_doc_read(#doc{}, Db::any()) -> #doc{}.
+after_doc_read(#doc{id = <<?DESIGN_DOC_PREFIX, _/binary>>} = Doc, _Db) ->
+ Doc;
+after_doc_read(#doc{body = {Body}} = Doc, Db) ->
+ #user_ctx{name = Name} = fabric2_db:get_user_ctx(Db),
+ case (catch fabric2_db:check_is_admin(Db)) of ok -> Doc; _ ->
+ case couch_util:get_value(?OWNER, Body) of Name -> Doc; _ ->
+ Source0 = couch_util:get_value(<<"source">>, Body),
+ Target0 = couch_util:get_value(<<"target">>, Body),
+ Source = strip_credentials(Source0),
+ Target = strip_credentials(Target0),
+ NewBody0 = ?replace(Body, <<"source">>, Source),
+ NewBody = ?replace(NewBody0, <<"target">>, Target),
+ #doc{revs = {Pos, [_ | Revs]}} = Doc,
+ NewDoc = Doc#doc{body = {NewBody}, revs = {Pos - 1, Revs}},
+ fabric2_db:new_revid(Db, NewDoc)
+ end
end.
-% Update a #rep{} record with a replication_id. Calculating the id might involve
-% fetching a filter from the source db, and so it could fail intermetently.
-% In case of a failure to fetch the filter this function will throw a
-% `{filter_fetch_error, Reason} exception.
-update_rep_id(Rep) ->
- RepId = couch_replicator_ids:replication_id(Rep),
- Rep#rep{id = RepId}.
+update_rep_doc(RepDbName, RepDbUUID, RepDocId, KVs) ->
+ update_rep_doc(RepDbName, RepDbUUID, RepDocId, KVs, 1).
-update_rep_doc(RepDbName, RepDocId, KVs) ->
- update_rep_doc(RepDbName, RepDocId, KVs, 1).
-
-
-update_rep_doc(RepDbName, RepDocId, KVs, Wait) when is_binary(RepDocId) ->
+update_rep_doc(RepDbName, RepDbUUID, RepDocId, KVs, Wait)
+ when is_binary(RepDocId) ->
try
- case open_rep_doc(RepDbName, RepDocId) of
+ case open_rep_doc(RepDbName, RepDbUUID, RepDocId) of
{ok, LastRepDoc} ->
- update_rep_doc(RepDbName, LastRepDoc, KVs, Wait * 2);
+ update_rep_doc(RepDbName, RepDbUUID, LastRepDoc, KVs,
+ Wait * 2);
_ ->
ok
end
@@ -321,25 +157,25 @@ update_rep_doc(RepDbName, RepDocId, KVs, Wait) when is_binary(RepDocId) ->
Msg = "Conflict when updating replication doc `~s`. Retrying.",
couch_log:error(Msg, [RepDocId]),
ok = timer:sleep(couch_rand:uniform(erlang:min(128, Wait)) * 100),
- update_rep_doc(RepDbName, RepDocId, KVs, Wait * 2)
+ update_rep_doc(RepDbName, RepDbUUID, RepDocId, KVs, Wait * 2)
end;
-update_rep_doc(RepDbName, #doc{body = {RepDocBody}} = RepDoc, KVs, _Try) ->
+update_rep_doc(RepDbName, RepDbUUID, #doc{body = {RepDocBody}} = RepDoc, KVs, _Try) ->
NewRepDocBody = lists:foldl(
- fun({K, undefined}, Body) ->
+ fun({K, undefined}, Body) when is_binary(K) ->
lists:keydelete(K, 1, Body);
- ({<<"_replication_state">> = K, State} = KV, Body) ->
- case get_json_value(K, Body) of
+ ({?REPLICATION_STATE = K, State} = KV, Body) when is_binary(K) ->
+ case couch_util:get_value(K, Body) of
State ->
Body;
_ ->
Body1 = lists:keystore(K, 1, Body, KV),
- Timestamp = couch_replicator_utils:iso8601(os:timestamp()),
+ Timestamp = couch_replicator_utils:iso8601(),
lists:keystore(
- <<"_replication_state_time">>, 1, Body1,
- {<<"_replication_state_time">>, Timestamp})
+ ?REPLICATION_STATE_TIME, 1, Body1,
+ {?REPLICATION_STATE_TIME, Timestamp})
end;
- ({K, _V} = KV, Body) ->
+ ({K, _V} = KV, Body) when is_binary(K) ->
lists:keystore(K, 1, Body, KV)
end,
RepDocBody, KVs),
@@ -349,331 +185,37 @@ update_rep_doc(RepDbName, #doc{body = {RepDocBody}} = RepDoc, KVs, _Try) ->
_ ->
% Might not succeed - when the replication doc is deleted right
% before this update (not an error, ignore).
- save_rep_doc(RepDbName, RepDoc#doc{body = {NewRepDocBody}})
+ save_rep_doc(RepDbName, RepDbUUID, RepDoc#doc{body = {NewRepDocBody}})
end.
-open_rep_doc(DbName, DocId) ->
- case couch_db:open_int(DbName, [?CTX, sys_db]) of
- {ok, Db} ->
- try
- couch_db:open_doc(Db, DocId, [ejson_body])
- after
- couch_db:close(Db)
- end;
- Else ->
- Else
+open_rep_doc(DbName, DbUUID, DocId) when is_binary(DbName), is_binary(DbUUID),
+ is_binary(DocId) ->
+ try
+ case fabric2_db:open(DbName, [?CTX, sys_db, {uuid, DbUUID}]) of
+ {ok, Db} -> fabric2_db:open_doc(Db, DocId, [ejson_body]);
+ Else -> Else
+ end
+ catch
+ error:database_does_not_exist ->
+ {not_found, database_does_not_exist}
end.
-save_rep_doc(DbName, Doc) ->
- {ok, Db} = couch_db:open_int(DbName, [?CTX, sys_db]),
+save_rep_doc(DbName, DbUUID, Doc) when is_binary(DbName), is_binary(DbUUID) ->
try
- couch_db:update_doc(Db, Doc, [])
+ {ok, Db} = fabric2_db:open(DbName, [?CTX, sys_db, {uuid, DbUUID}]),
+ fabric2_db:update_doc(Db, Doc, [])
catch
+ error:database_does_not_exist ->
+ {not_found, database_does_not_exist};
% User can accidently write a VDU which prevents _replicator from
% updating replication documents. Avoid crashing replicator and thus
% preventing all other replication jobs on the node from running.
throw:{forbidden, Reason} ->
- Msg = "~p VDU function preventing doc update to ~s ~s ~p",
+ Msg = "~p VDU or BDU function preventing doc update to ~s ~s ~p",
couch_log:error(Msg, [?MODULE, DbName, Doc#doc.id, Reason]),
{ok, forbidden}
- after
- couch_db:close(Db)
- end.
-
-
--spec rep_user_ctx({[_]}) -> #user_ctx{}.
-rep_user_ctx({RepDoc}) ->
- case get_json_value(<<"user_ctx">>, RepDoc) of
- undefined ->
- #user_ctx{};
- {UserCtx} ->
- #user_ctx{
- name = get_json_value(<<"name">>, UserCtx, null),
- roles = get_json_value(<<"roles">>, UserCtx, [])
- }
- end.
-
-
--spec parse_rep_db({[_]} | binary(), binary(), [_]) -> #httpd{} | binary().
-parse_rep_db({Props}, Proxy, Options) ->
- ProxyParams = parse_proxy_params(Proxy),
- ProxyURL = case ProxyParams of
- [] -> undefined;
- _ -> binary_to_list(Proxy)
- end,
- Url = maybe_add_trailing_slash(get_value(<<"url">>, Props)),
- {AuthProps} = get_value(<<"auth">>, Props, {[]}),
- {BinHeaders} = get_value(<<"headers">>, Props, {[]}),
- Headers = lists:ukeysort(1, [{?b2l(K), ?b2l(V)} || {K, V} <- BinHeaders]),
- DefaultHeaders = (#httpdb{})#httpdb.headers,
- #httpdb{
- url = Url,
- auth_props = AuthProps,
- headers = lists:ukeymerge(1, Headers, DefaultHeaders),
- ibrowse_options = lists:keysort(1,
- [{socket_options, get_value(socket_options, Options)} |
- ProxyParams ++ ssl_params(Url)]),
- timeout = get_value(connection_timeout, Options),
- http_connections = get_value(http_connections, Options),
- retries = get_value(retries, Options),
- proxy_url = ProxyURL
- };
-
-parse_rep_db(<<"http://", _/binary>> = Url, Proxy, Options) ->
- parse_rep_db({[{<<"url">>, Url}]}, Proxy, Options);
-
-parse_rep_db(<<"https://", _/binary>> = Url, Proxy, Options) ->
- parse_rep_db({[{<<"url">>, Url}]}, Proxy, Options);
-
-parse_rep_db(<<_/binary>>, _Proxy, _Options) ->
- throw({error, local_endpoints_not_supported});
-
-parse_rep_db(undefined, _Proxy, _Options) ->
- throw({error, <<"Missing replicator database">>}).
-
-
--spec maybe_add_trailing_slash(binary() | list()) -> list().
-maybe_add_trailing_slash(Url) when is_binary(Url) ->
- maybe_add_trailing_slash(?b2l(Url));
-maybe_add_trailing_slash(Url) ->
- case lists:member($?, Url) of
- true ->
- Url; % skip if there are query params
- false ->
- case lists:last(Url) of
- $/ ->
- Url;
- _ ->
- Url ++ "/"
- end
- end.
-
-
--spec make_options([_]) -> [_].
-make_options(Props) ->
- Options0 = lists:ukeysort(1, convert_options(Props)),
- Options = check_options(Options0),
- DefWorkers = config:get("replicator", "worker_processes", "4"),
- DefBatchSize = config:get("replicator", "worker_batch_size", "500"),
- DefConns = config:get("replicator", "http_connections", "20"),
- DefTimeout = config:get("replicator", "connection_timeout", "30000"),
- DefRetries = config:get("replicator", "retries_per_request", "5"),
- UseCheckpoints = config:get("replicator", "use_checkpoints", "true"),
- DefCheckpointInterval = config:get("replicator", "checkpoint_interval",
- "30000"),
- {ok, DefSocketOptions} = couch_util:parse_term(
- config:get("replicator", "socket_options",
- "[{keepalive, true}, {nodelay, false}]")),
- lists:ukeymerge(1, Options, lists:keysort(1, [
- {connection_timeout, list_to_integer(DefTimeout)},
- {retries, list_to_integer(DefRetries)},
- {http_connections, list_to_integer(DefConns)},
- {socket_options, DefSocketOptions},
- {worker_batch_size, list_to_integer(DefBatchSize)},
- {worker_processes, list_to_integer(DefWorkers)},
- {use_checkpoints, list_to_existing_atom(UseCheckpoints)},
- {checkpoint_interval, list_to_integer(DefCheckpointInterval)}
- ])).
-
-
--spec convert_options([_]) -> [_].
-convert_options([])->
- [];
-convert_options([{<<"cancel">>, V} | _R]) when not is_boolean(V)->
- throw({bad_request, <<"parameter `cancel` must be a boolean">>});
-convert_options([{<<"cancel">>, V} | R]) ->
- [{cancel, V} | convert_options(R)];
-convert_options([{IdOpt, V} | R]) when IdOpt =:= <<"_local_id">>;
- IdOpt =:= <<"replication_id">>; IdOpt =:= <<"id">> ->
- [{id, couch_replicator_ids:convert(V)} | convert_options(R)];
-convert_options([{<<"create_target">>, V} | _R]) when not is_boolean(V)->
- throw({bad_request, <<"parameter `create_target` must be a boolean">>});
-convert_options([{<<"create_target">>, V} | R]) ->
- [{create_target, V} | convert_options(R)];
-convert_options([{<<"create_target_params">>, V} | _R]) when not is_tuple(V) ->
- throw({bad_request,
- <<"parameter `create_target_params` must be a JSON object">>});
-convert_options([{<<"create_target_params">>, V} | R]) ->
- [{create_target_params, V} | convert_options(R)];
-convert_options([{<<"continuous">>, V} | _R]) when not is_boolean(V)->
- throw({bad_request, <<"parameter `continuous` must be a boolean">>});
-convert_options([{<<"continuous">>, V} | R]) ->
- [{continuous, V} | convert_options(R)];
-convert_options([{<<"filter">>, V} | R]) ->
- [{filter, V} | convert_options(R)];
-convert_options([{<<"query_params">>, V} | R]) ->
- [{query_params, V} | convert_options(R)];
-convert_options([{<<"doc_ids">>, null} | R]) ->
- convert_options(R);
-convert_options([{<<"doc_ids">>, V} | _R]) when not is_list(V) ->
- throw({bad_request, <<"parameter `doc_ids` must be an array">>});
-convert_options([{<<"doc_ids">>, V} | R]) ->
- % Ensure same behaviour as old replicator: accept a list of percent
- % encoded doc IDs.
- DocIds = lists:usort([?l2b(couch_httpd:unquote(Id)) || Id <- V]),
- [{doc_ids, DocIds} | convert_options(R)];
-convert_options([{<<"selector">>, V} | _R]) when not is_tuple(V) ->
- throw({bad_request, <<"parameter `selector` must be a JSON object">>});
-convert_options([{<<"selector">>, V} | R]) ->
- [{selector, V} | convert_options(R)];
-convert_options([{<<"worker_processes">>, V} | R]) ->
- [{worker_processes, couch_util:to_integer(V)} | convert_options(R)];
-convert_options([{<<"worker_batch_size">>, V} | R]) ->
- [{worker_batch_size, couch_util:to_integer(V)} | convert_options(R)];
-convert_options([{<<"http_connections">>, V} | R]) ->
- [{http_connections, couch_util:to_integer(V)} | convert_options(R)];
-convert_options([{<<"connection_timeout">>, V} | R]) ->
- [{connection_timeout, couch_util:to_integer(V)} | convert_options(R)];
-convert_options([{<<"retries_per_request">>, V} | R]) ->
- [{retries, couch_util:to_integer(V)} | convert_options(R)];
-convert_options([{<<"socket_options">>, V} | R]) ->
- {ok, SocketOptions} = couch_util:parse_term(V),
- [{socket_options, SocketOptions} | convert_options(R)];
-convert_options([{<<"since_seq">>, V} | R]) ->
- [{since_seq, V} | convert_options(R)];
-convert_options([{<<"use_checkpoints">>, V} | R]) ->
- [{use_checkpoints, V} | convert_options(R)];
-convert_options([{<<"checkpoint_interval">>, V} | R]) ->
- [{checkpoint_interval, couch_util:to_integer(V)} | convert_options(R)];
-convert_options([_ | R]) -> % skip unknown option
- convert_options(R).
-
-
--spec check_options([_]) -> [_].
-check_options(Options) ->
- DocIds = lists:keyfind(doc_ids, 1, Options),
- Filter = lists:keyfind(filter, 1, Options),
- Selector = lists:keyfind(selector, 1, Options),
- case {DocIds, Filter, Selector} of
- {false, false, false} -> Options;
- {false, false, _} -> Options;
- {false, _, false} -> Options;
- {_, false, false} -> Options;
- _ ->
- throw({bad_request,
- "`doc_ids`,`filter`,`selector` are mutually exclusive"})
- end.
-
-
--spec parse_proxy_params(binary() | [_]) -> [_].
-parse_proxy_params(ProxyUrl) when is_binary(ProxyUrl) ->
- parse_proxy_params(?b2l(ProxyUrl));
-parse_proxy_params([]) ->
- [];
-parse_proxy_params(ProxyUrl) ->
- #url{
- host = Host,
- port = Port,
- username = User,
- password = Passwd,
- protocol = Protocol
- } = ibrowse_lib:parse_url(ProxyUrl),
- [
- {proxy_protocol, Protocol},
- {proxy_host, Host},
- {proxy_port, Port}
- ] ++ case is_list(User) andalso is_list(Passwd) of
- false ->
- [];
- true ->
- [{proxy_user, User}, {proxy_password, Passwd}]
- end.
-
-
--spec ssl_params([_]) -> [_].
-ssl_params(Url) ->
- case ibrowse_lib:parse_url(Url) of
- #url{protocol = https} ->
- Depth = list_to_integer(
- config:get("replicator", "ssl_certificate_max_depth", "3")
- ),
- VerifyCerts = config:get("replicator", "verify_ssl_certificates"),
- CertFile = config:get("replicator", "cert_file", undefined),
- KeyFile = config:get("replicator", "key_file", undefined),
- Password = config:get("replicator", "password", undefined),
- SslOpts = [{depth, Depth} | ssl_verify_options(VerifyCerts =:= "true")],
- SslOpts1 = case CertFile /= undefined andalso KeyFile /= undefined of
- true ->
- case Password of
- undefined ->
- [{certfile, CertFile}, {keyfile, KeyFile}] ++ SslOpts;
- _ ->
- [{certfile, CertFile}, {keyfile, KeyFile},
- {password, Password}] ++ SslOpts
- end;
- false -> SslOpts
- end,
- [{is_ssl, true}, {ssl_options, SslOpts1}];
- #url{protocol = http} ->
- []
- end.
-
-
--spec ssl_verify_options(true | false) -> [_].
-ssl_verify_options(true) ->
- CAFile = config:get("replicator", "ssl_trusted_certificates_file"),
- [{verify, verify_peer}, {cacertfile, CAFile}];
-ssl_verify_options(false) ->
- [{verify, verify_none}].
-
-
--spec before_doc_update(#doc{}, Db::any(), couch_db:update_type()) -> #doc{}.
-before_doc_update(#doc{id = <<?DESIGN_DOC_PREFIX, _/binary>>} = Doc, _Db, _UpdateType) ->
- Doc;
-before_doc_update(#doc{body = {Body}} = Doc, Db, _UpdateType) ->
- #user_ctx{
- roles = Roles,
- name = Name
- } = couch_db:get_user_ctx(Db),
- case lists:member(<<"_replicator">>, Roles) of
- true ->
- Doc;
- false ->
- case couch_util:get_value(?OWNER, Body) of
- undefined ->
- Doc#doc{body = {?replace(Body, ?OWNER, Name)}};
- Name ->
- Doc;
- Other ->
- case (catch couch_db:check_is_admin(Db)) of
- ok when Other =:= null ->
- Doc#doc{body = {?replace(Body, ?OWNER, Name)}};
- ok ->
- Doc;
- _ ->
- throw({forbidden, <<"Can't update replication documents",
- " from other users.">>})
- end
- end
- end.
-
-
--spec after_doc_read(#doc{}, Db::any()) -> #doc{}.
-after_doc_read(#doc{id = <<?DESIGN_DOC_PREFIX, _/binary>>} = Doc, _Db) ->
- Doc;
-after_doc_read(#doc{body = {Body}} = Doc, Db) ->
- #user_ctx{name = Name} = couch_db:get_user_ctx(Db),
- case (catch couch_db:check_is_admin(Db)) of
- ok ->
- Doc;
- _ ->
- case couch_util:get_value(?OWNER, Body) of
- Name ->
- Doc;
- _Other ->
- Source = strip_credentials(couch_util:get_value(<<"source">>,
-Body)),
- Target = strip_credentials(couch_util:get_value(<<"target">>,
-Body)),
- NewBody0 = ?replace(Body, <<"source">>, Source),
- NewBody = ?replace(NewBody0, <<"target">>, Target),
- #doc{revs = {Pos, [_ | Revs]}} = Doc,
- NewDoc = Doc#doc{body = {NewBody}, revs = {Pos - 1, Revs}},
- NewRevId = couch_db:new_revid(NewDoc),
- NewDoc#doc{revs = {Pos, [NewRevId | Revs]}}
- end
end.
@@ -698,164 +240,14 @@ strip_credentials({Props0}) ->
error_reason({shutdown, Error}) ->
error_reason(Error);
error_reason({bad_rep_doc, Reason}) ->
- to_binary(Reason);
+ couch_util:to_binary(Reason);
+error_reason(#{<<"error">> := Error, <<"reason">> := Reason})
+ when is_binary(Error), is_binary(Reason) ->
+ couch_util:to_binary(io_lib:format("~s: ~s", [Error, Reason]));
error_reason({error, {Error, Reason}})
- when is_atom(Error), is_binary(Reason) ->
- to_binary(io_lib:format("~s: ~s", [Error, Reason]));
+ when is_atom(Error), is_binary(Reason) ->
+ couch_util:to_binary(io_lib:format("~s: ~s", [Error, Reason]));
error_reason({error, Reason}) ->
- to_binary(Reason);
+ couch_util:to_binary(Reason);
error_reason(Reason) ->
- to_binary(Reason).
-
-
--ifdef(TEST).
-
-
--include_lib("couch/include/couch_eunit.hrl").
-
-
-check_options_pass_values_test() ->
- ?assertEqual(check_options([]), []),
- ?assertEqual(check_options([baz, {other, fiz}]), [baz, {other, fiz}]),
- ?assertEqual(check_options([{doc_ids, x}]), [{doc_ids, x}]),
- ?assertEqual(check_options([{filter, x}]), [{filter, x}]),
- ?assertEqual(check_options([{selector, x}]), [{selector, x}]).
-
-
-check_options_fail_values_test() ->
- ?assertThrow({bad_request, _},
- check_options([{doc_ids, x}, {filter, y}])),
- ?assertThrow({bad_request, _},
- check_options([{doc_ids, x}, {selector, y}])),
- ?assertThrow({bad_request, _},
- check_options([{filter, x}, {selector, y}])),
- ?assertThrow({bad_request, _},
- check_options([{doc_ids, x}, {selector, y}, {filter, z}])).
-
-
-check_convert_options_pass_test() ->
- ?assertEqual([], convert_options([])),
- ?assertEqual([], convert_options([{<<"random">>, 42}])),
- ?assertEqual([{cancel, true}],
- convert_options([{<<"cancel">>, true}])),
- ?assertEqual([{create_target, true}],
- convert_options([{<<"create_target">>, true}])),
- ?assertEqual([{continuous, true}],
- convert_options([{<<"continuous">>, true}])),
- ?assertEqual([{doc_ids, [<<"id">>]}],
- convert_options([{<<"doc_ids">>, [<<"id">>]}])),
- ?assertEqual([{selector, {key, value}}],
- convert_options([{<<"selector">>, {key, value}}])).
-
-
-check_convert_options_fail_test() ->
- ?assertThrow({bad_request, _},
- convert_options([{<<"cancel">>, <<"true">>}])),
- ?assertThrow({bad_request, _},
- convert_options([{<<"create_target">>, <<"true">>}])),
- ?assertThrow({bad_request, _},
- convert_options([{<<"continuous">>, <<"true">>}])),
- ?assertThrow({bad_request, _},
- convert_options([{<<"doc_ids">>, not_a_list}])),
- ?assertThrow({bad_request, _},
- convert_options([{<<"selector">>, [{key, value}]}])).
-
-check_strip_credentials_test() ->
- [?assertEqual(Expected, strip_credentials(Body)) || {Expected, Body} <- [
- {
- undefined,
- undefined
- },
- {
- <<"https://remote_server/database">>,
- <<"https://foo:bar@remote_server/database">>
- },
- {
- {[{<<"_id">>, <<"foo">>}]},
- {[{<<"_id">>, <<"foo">>}, {<<"headers">>, <<"bar">>}]}
- },
- {
- {[{<<"_id">>, <<"foo">>}, {<<"other">>, <<"bar">>}]},
- {[{<<"_id">>, <<"foo">>}, {<<"other">>, <<"bar">>}]}
- },
- {
- {[{<<"_id">>, <<"foo">>}]},
- {[{<<"_id">>, <<"foo">>}, {<<"headers">>, <<"baz">>}]}
- },
- {
- {[{<<"_id">>, <<"foo">>}]},
- {[{<<"_id">>, <<"foo">>}, {<<"auth">>, <<"pluginsecret">>}]}
- }
- ]].
-
-
-setup() ->
- DbName = ?tempdb(),
- {ok, Db} = couch_db:create(DbName, [?ADMIN_CTX]),
- ok = couch_db:close(Db),
- create_vdu(DbName),
- DbName.
-
-
-teardown(DbName) when is_binary(DbName) ->
- couch_server:delete(DbName, [?ADMIN_CTX]),
- ok.
-
-
-create_vdu(DbName) ->
- couch_util:with_db(DbName, fun(Db) ->
- VduFun = <<"function(newdoc, olddoc, userctx) {throw({'forbidden':'fail'})}">>,
- Doc = #doc{
- id = <<"_design/vdu">>,
- body = {[{<<"validate_doc_update">>, VduFun}]}
- },
- {ok, _} = couch_db:update_docs(Db, [Doc])
- end).
-
-
-update_replicator_doc_with_bad_vdu_test_() ->
- {
- setup,
- fun test_util:start_couch/0,
- fun test_util:stop_couch/1,
- {
- foreach, fun setup/0, fun teardown/1,
- [
- fun t_vdu_does_not_crash_on_save/1
- ]
- }
- }.
-
-
-t_vdu_does_not_crash_on_save(DbName) ->
- ?_test(begin
- Doc = #doc{id = <<"some_id">>, body = {[{<<"foo">>, 42}]}},
- ?assertEqual({ok, forbidden}, save_rep_doc(DbName, Doc))
- end).
-
-
-local_replication_endpoint_error_test_() ->
- {
- foreach,
- fun () -> meck:expect(config, get,
- fun(_, _, Default) -> Default end)
- end,
- fun (_) -> meck:unload() end,
- [
- t_error_on_local_endpoint()
- ]
- }.
-
-
-t_error_on_local_endpoint() ->
- ?_test(begin
- RepDoc = {[
- {<<"_id">>, <<"someid">>},
- {<<"source">>, <<"localdb">>},
- {<<"target">>, <<"http://somehost.local/tgt">>}
- ]},
- Expect = local_endpoints_not_supported,
- ?assertThrow({bad_rep_doc, Expect}, parse_rep_doc_without_id(RepDoc))
- end).
-
--endif.
+ couch_util:to_binary(Reason).
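
For reference, a sketch of the EJson body `update_failed/4` leaves behind on a _replicator doc (reason and timestamp are illustrative). `update_rep_doc/5` stamps `_replication_state_time` whenever `_replication_state` changes, and the `{?REPLICATION_STATS, undefined}` entry deletes any stale stats field:

```
{[
    {<<"_id">>, <<"my_rep">>},
    {<<"source">>, <<"http://localhost:5984/src/">>},
    {<<"target">>, <<"http://localhost:5984/tgt/">>},
    {<<"_replication_state">>, <<"failed">>},
    {<<"_replication_state_reason">>, <<"db_not_found: could not open src">>},
    {<<"_replication_state_time">>, <<"2020-09-15T16:13:46Z">>}
]}
```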
diff --git a/src/couch_replicator/src/couch_replicator_parse.erl b/src/couch_replicator/src/couch_replicator_parse.erl
new file mode 100644
index 000000000..5996ec507
--- /dev/null
+++ b/src/couch_replicator/src/couch_replicator_parse.erl
@@ -0,0 +1,545 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_replicator_parse).
+
+
+-export([
+ parse_rep_doc/1,
+ parse_transient_rep/2,
+ parse_rep/2,
+ parse_rep_db/3
+]).
+
+
+-include_lib("ibrowse/include/ibrowse.hrl").
+-include("couch_replicator.hrl").
+
+
+-define(DEFAULT_SOCK_OPTS, "[{keepalive, true}, {nodelay, false}]").
+-define(VALID_SOCK_OPTS, [
+ buffer,
+ delay_send,
+ exit_on_close,
+ ipv6_v6only,
+ keepalive,
+ nodelay,
+ recbuf,
+ send_timeout,
+ send_timeout_close,
+ sndbuf,
+ priority,
+ tos,
+ tclass
+]).
+-define(VALID_PROXY_PROTOCOLS, [http, https, socks5]).
+-define(CONFIG_DEFAULTS, [
+ {"worker_processes", "4", fun list_to_integer/1},
+ {"worker_batch_size", "500", fun list_to_integer/1},
+ {"http_connections", "20", fun list_to_integer/1},
+ {"connection_timeout", "30000", fun list_to_integer/1},
+ {"retries_per_request", "5", fun list_to_integer/1},
+ {"use_checkpoints", "true", fun list_to_existing_atom/1},
+ {"checkpoint_interval", "30000", fun list_to_integer/1},
+ {"socket_options", ?DEFAULT_SOCK_OPTS, fun parse_sock_opts/1}
+]).
+
+
+-spec parse_rep_doc({[_]}) -> #{}.
+parse_rep_doc(RepDoc) ->
+ {ok, Rep} = try
+ parse_rep(RepDoc, null)
+ catch
+ throw:{error, Reason} ->
+ Stack = erlang:get_stacktrace(),
+ LogErr1 = "~p parse_rep_doc fail ~p ~p",
+ couch_log:error(LogErr1, [?MODULE, Reason, Stack]),
+ throw({bad_rep_doc, Reason});
+ Tag:Err ->
+ Stack = erlang:get_stacktrace(),
+ LogErr2 = "~p parse_rep_doc fail ~p:~p ~p",
+ couch_log:error(LogErr2, [?MODULE, Tag, Err, Stack]),
+ throw({bad_rep_doc, couch_util:to_binary({Tag, Err})})
+ end,
+ Rep.
+
+
+-spec parse_transient_rep({[_]} | #{}, user_name()) -> {ok, job_id(), #{}}.
+parse_transient_rep({Props} = EJson, UserName) when is_list(Props) ->
+ Str = couch_util:json_encode(EJson),
+ Map = couch_util:json_decode(Str, [return_maps]),
+ parse_transient_rep(Map, UserName);
+
+parse_transient_rep(#{} = Body, UserName) ->
+ {ok, Rep} = try
+ parse_rep(Body, UserName)
+ catch
+ throw:{error, Reason} ->
+ Stack = erlang:get_stacktrace(),
+ LogErr1 = "~p parse_transient_rep fail ~p ~p",
+ couch_log:error(LogErr1, [?MODULE, Reason, Stack]),
+ throw({bad_request, Reason});
+ Tag:Err ->
+ Stack = erlang:get_stacktrace(),
+ LogErr2 = "~p parse_transient_rep fail ~p ~p",
+ couch_log:error(LogErr2, [?MODULE, Tag, Err, Stack]),
+ throw({bad_request, couch_util:to_binary({Tag, Err})})
+ end,
+ #{?OPTIONS := Options} = Rep,
+ Cancel = maps:get(<<"cancel">>, Options, false),
+ Id = maps:get(<<"id">>, Options, nil),
+ case {Cancel, Id} of
+ {true, nil} ->
+ % Cancel request with no id, must parse id out of body contents
+ JobId = couch_replicator_ids:job_id(Rep, null, null),
+ {ok, JobId, Rep};
+ {true, Id} ->
+ % Cancel request with an id specified, so do not parse id from body
+ {ok, Id, Rep};
+ {false, _Id} ->
+ JobId = couch_replicator_ids:job_id(Rep, null, null),
+ % Not a cancel request, regular replication doc
+ {ok, JobId, Rep}
+ end.
+
+
+-spec parse_rep({[_]} | #{}, user_name()) -> {ok, #{}}.
+parse_rep({Props} = EJson, UserName) when is_list(Props) ->
+ Str = couch_util:json_encode(EJson),
+ Map = couch_util:json_decode(Str, [return_maps]),
+ parse_rep(Map, UserName);
+
+parse_rep(#{} = Doc, UserName) ->
+ {SrcProxy, TgtProxy} = parse_proxy_settings(Doc),
+ Opts = make_options(Doc),
+ Cancel = maps:get(<<"cancel">>, Opts, false),
+ Id = maps:get(<<"id">>, Opts, nil),
+ case Cancel andalso Id =/= nil of
+ true ->
+ {ok, #{?OPTIONS => Opts, ?REP_USER => UserName}};
+ false ->
+ case {maps:is_key(?SOURCE, Doc), maps:is_key(?TARGET, Doc)} of
+ {false, _} -> throw({error, <<"Missing `source` field">>});
+ {_, false} -> throw({error, <<"Missing `target` field">>});
+ {true, true} -> ok
+ end,
+ #{?SOURCE := Source0, ?TARGET := Target0} = Doc,
+ Source = parse_rep_db(Source0, SrcProxy, Opts),
+ Target = parse_rep_db(Target0, TgtProxy, Opts),
+ case couch_replicator_filters:view_type(Doc, Opts) of
+ {error, Error} -> throw({error, Error});
+ _ -> ok
+ end,
+ case couch_replicator_filters:parse(Opts) of
+ {ok, _} -> ok;
+ {error, FilterError} -> throw({error, FilterError})
+ end,
+ Rep = #{
+ ?SOURCE => Source,
+ ?TARGET => Target,
+ ?OPTIONS => Opts,
+ ?REP_USER => UserName,
+ ?START_TIME => erlang:system_time(second)
+ },
+ {ok, Rep}
+ end.
+
+
+-spec parse_rep_db(#{}, #{}, #{}) -> #{}.
+parse_rep_db(#{} = Endpoint, #{} = ProxyParams, #{} = Options) ->
+ ProxyUrl = case ProxyParams of
+ #{<<"proxy_url">> := PUrl} -> PUrl;
+ _ -> null
+ end,
+
+ Url0 = maps:get(<<"url">>, Endpoint),
+ Url = maybe_add_trailing_slash(Url0),
+
+ AuthProps = maps:get(<<"auth">>, Endpoint, #{}),
+ if is_map(AuthProps) -> ok; true ->
+ throw({error, "if defined, `auth` must be an object"})
+ end,
+
+ Headers0 = maps:get(<<"headers">>, Endpoint, #{}),
+ if is_map(Headers0) -> ok; true ->
+ throw({error, "if defined `headers` must be an object"})
+ end,
+ DefaultHeaders = couch_replicator_utils:default_headers_map(),
+ Headers = maps:merge(DefaultHeaders, Headers0),
+
+ SockOpts = maps:get(<<"socket_options">>, Options, #{}),
+ SockAndProxy = maps:merge(#{
+ <<"socket_options">> => SockOpts
+ }, ProxyParams),
+ SslParams = ssl_params(Url),
+
+ #{
+ <<"url">> => Url,
+ <<"auth_props">> => AuthProps,
+ <<"headers">> => Headers,
+ <<"ibrowse_options">> => maps:merge(SslParams, SockAndProxy),
+ <<"timeout">> => maps:get(<<"connection_timeout">>, Options),
+ <<"http_connections">> => maps:get(<<"http_connections">>, Options),
+ <<"retries">> => maps:get(<<"retries_per_request">>, Options),
+ <<"proxy_url">> => ProxyUrl
+ };
+
+parse_rep_db(<<"http://", _/binary>> = Url, Proxy, Options) ->
+ parse_rep_db(#{<<"url">> => Url}, Proxy, Options);
+
+parse_rep_db(<<"https://", _/binary>> = Url, Proxy, Options) ->
+ parse_rep_db(#{<<"url">> => Url}, Proxy, Options);
+
+parse_rep_db(<<_/binary>>, _Proxy, _Options) ->
+ throw({error, local_endpoints_not_supported});
+
+parse_rep_db(undefined, _Proxy, _Options) ->
+ throw({error, <<"Missing replication endpoint">>}).
+
+
+parse_proxy_settings(#{} = Doc) ->
+ Proxy = maps:get(?PROXY, Doc, <<>>),
+ SrcProxy = maps:get(?SOURCE_PROXY, Doc, <<>>),
+ TgtProxy = maps:get(?TARGET_PROXY, Doc, <<>>),
+
+ case Proxy =/= <<>> of
+ true when SrcProxy =/= <<>> ->
+ Error = "`proxy` is mutually exclusive with `source_proxy`",
+ throw({error, Error});
+ true when TgtProxy =/= <<>> ->
+ Error = "`proxy` is mutually exclusive with `target_proxy`",
+ throw({error, Error});
+ true ->
+ {parse_proxy_params(Proxy), parse_proxy_params(Proxy)};
+ false ->
+ {parse_proxy_params(SrcProxy), parse_proxy_params(TgtProxy)}
+ end.
+
+
+-spec maybe_add_trailing_slash(binary()) -> binary().
+maybe_add_trailing_slash(<<>>) ->
+ <<>>;
+
+maybe_add_trailing_slash(Url) when is_binary(Url) ->
+ case binary:match(Url, <<"?">>) of
+ nomatch ->
+ case binary:last(Url) of
+ $/ -> Url;
+ _ -> <<Url/binary, "/">>
+ end;
+ _ ->
+ Url % skip if there are query params
+ end.
+
+
+-spec make_options(#{}) -> #{}.
+make_options(#{} = RepDoc) ->
+ Options0 = convert_options(RepDoc),
+ Options = check_options(Options0),
+ ConfigOptions = lists:foldl(fun({K, Default, ConversionFun}, Acc) ->
+ V = ConversionFun(config:get("replicator", K, Default)),
+ Acc#{list_to_binary(K) => V}
+ end, #{}, ?CONFIG_DEFAULTS),
+ maps:merge(ConfigOptions, Options).
+
+
+-spec convert_options(#{}) -> #{} | no_return().
+convert_options(#{} = Doc) ->
+ maps:fold(fun convert_fold/3, #{}, Doc).
+
+
+-spec convert_fold(binary(), any(), #{}) -> #{}.
+convert_fold(<<"cancel">>, V, Acc) when is_boolean(V) ->
+ Acc#{<<"cancel">> => V};
+convert_fold(<<"cancel">>, _, _) ->
+ throw({error, <<"`cancel` must be a boolean">>});
+convert_fold(IdOpt, V, Acc) when IdOpt =:= <<"_local_id">>;
+ IdOpt =:= <<"replication_id">>; IdOpt =:= <<"id">> ->
+ Acc#{<<"id">> => couch_replicator_ids:convert(V)};
+convert_fold(<<"create_target">>, V, Acc) when is_boolean(V) ->
+ Acc#{<<"create_target">> => V};
+convert_fold(<<"create_target">>, _, _) ->
+ throw({error, <<"`create_target` must be a boolean">>});
+convert_fold(<<"create_target_params">>, #{} = V, Acc) ->
+ Acc#{<<"create_target_params">> => V};
+convert_fold(<<"create_target_params">>, _, _) ->
+ throw({error, <<"`create_target_params` must be an object">>});
+convert_fold(<<"continuous">>, V, Acc) when is_boolean(V) ->
+ Acc#{<<"continuous">> => V};
+convert_fold(<<"continuous">>, _, _) ->
+ throw({error, <<"`continuous` must be a boolean">>});
+convert_fold(<<"filter">>, V, Acc) when is_binary(V), byte_size(V) > 1 ->
+ Acc#{<<"filter">> => V};
+convert_fold(<<"filter">>, _, _) ->
+ throw({error, <<"`filter` must be a string">>});
+convert_fold(<<"query_params">>, V, Acc) when is_map(V) orelse V =:= null ->
+ Acc#{<<"query_params">> => V};
+convert_fold(<<"query_params">>, _, _Acc) ->
+ throw({error, <<"`query_params` is not `null` or object">>});
+convert_fold(<<"doc_ids">>, null, Acc) ->
+ Acc;
+convert_fold(<<"doc_ids">>, V, Acc) when is_list(V) ->
+ % Compatibility with the old replicator: accept a list of percent
+ % encoded doc IDs
+ Ids = lists:map(fun(Id) ->
+ case is_binary(Id) andalso byte_size(Id) > 0 of
+ true -> list_to_binary(couch_httpd:unquote(Id));
+ false -> throw({error, <<"`doc_ids` array must contain strings">>})
+ end
+ end, V),
+ Acc#{<<"doc_ids">> => lists:usort(Ids)};
+convert_fold(<<"doc_ids">>, _, _) ->
+ throw({error, <<"`doc_ids` must be an array">>});
+convert_fold(<<"selector">>, #{} = V, Acc) ->
+ Acc#{<<"selector">> => V};
+convert_fold(<<"selector">>, _, _Acc) ->
+ throw({error, <<"`selector` must be a JSON object">>});
+convert_fold(<<"worker_processes">>, V, Acc) ->
+ Acc#{<<"worker_processes">> => bin2int(V, <<"worker_processes">>)};
+convert_fold(<<"worker_batch_size">>, V, Acc) ->
+ Acc#{<<"worker_batch_size">> => bin2int(V, <<"worker_batch_size">>)};
+convert_fold(<<"http_connections">>, V, Acc) ->
+ Acc#{<<"http_connections">> => bin2int(V, <<"http_connections">>)};
+convert_fold(<<"connection_timeout">>, V, Acc) ->
+ Acc#{<<"connection_timeout">> => bin2int(V, <<"connection_timeout">>)};
+convert_fold(<<"retries_per_request">>, V, Acc) ->
+ Acc#{<<"retries_per_request">> => bin2int(V, <<"retries_per_request">>)};
+convert_fold(<<"socket_options">>, V, Acc) ->
+ Acc#{<<"socket_options">> => parse_sock_opts(V)};
+convert_fold(<<"since_seq">>, V, Acc) ->
+ Acc#{<<"since_seq">> => V};
+convert_fold(<<"use_checkpoints">>, V, Acc) when is_boolean(V) ->
+ Acc#{<<"use_checkpoints">> => V};
+convert_fold(<<"use_checkpoints">>, _, _) ->
+ throw({error, <<"`use_checkpoints` must be a boolean">>});
+convert_fold(<<"checkpoint_interval">>, V, Acc) ->
+ Acc#{<<"checkpoint_interval">> => bin2int(V, <<"checkpoint_interval">>)};
+convert_fold(_K, _V, Acc) -> % skip unknown option
+ Acc.
+
+
+bin2int(V, _Field) when is_integer(V) ->
+ V;
+
+bin2int(V, Field) when is_binary(V) ->
+ try
+ erlang:binary_to_integer(V)
+ catch
+ error:badarg ->
+ throw({error, <<"`", Field/binary, "` must be an integer">>})
+ end;
+
+bin2int(_V, Field) ->
+ throw({error, <<"`", Field/binary, "` must be an integer">>}).
+
+
+-spec check_options(#{}) -> #{}.
+check_options(Options) ->
+ DocIds = maps:is_key(<<"doc_ids">>, Options),
+ Filter = maps:is_key(<<"filter">>, Options),
+ Selector = maps:is_key(<<"selector">>, Options),
+ case {DocIds, Filter, Selector} of
+ {false, false, false} -> Options;
+ {false, false, _} -> Options;
+ {false, _, false} -> Options;
+ {_, false, false} -> Options;
+ _ -> throw({error, <<"`doc_ids`,`filter`,`selector` are mutually "
+ " exclusive">>})
+ end.
+
+
+parse_sock_opts(Term) ->
+ {ok, SocketOptions} = couch_util:parse_term(Term),
+ lists:foldl(fun
+ ({K, V}, Acc) when is_atom(K) ->
+ case lists:member(K, ?VALID_SOCK_OPTS) of
+ true -> Acc#{atom_to_binary(K, utf8) => V};
+ false -> Acc
+ end;
+ (_, Acc) ->
+ Acc
+ end, #{}, SocketOptions).
+
+
+-spec parse_proxy_params(binary() | #{}) -> #{}.
+parse_proxy_params(<<>>) ->
+ #{};
+parse_proxy_params(ProxyUrl) when is_binary(ProxyUrl)->
+ #url{
+ host = Host,
+ port = Port,
+ username = User,
+ password = Passwd,
+ protocol = Prot0
+ } = ibrowse_lib:parse_url(binary_to_list(ProxyUrl)),
+ Prot = case lists:member(Prot0, ?VALID_PROXY_PROTOCOLS) of
+ true -> atom_to_binary(Prot0, utf8);
+ false -> throw({error, <<"Unsupported proxy protocol">>})
+ end,
+ ProxyParams = #{
+ <<"proxy_url">> => ProxyUrl,
+ <<"proxy_protocol">> => Prot,
+ <<"proxy_host">> => list_to_binary(Host),
+ <<"proxy_port">> => Port
+ },
+ case is_list(User) andalso is_list(Passwd) of
+ true ->
+ ProxyParams#{
+ <<"proxy_user">> => list_to_binary(User),
+ <<"proxy_password">> => list_to_binary(Passwd)
+ };
+ false ->
+ ProxyParams
+ end.
+
+
+-spec ssl_params(binary()) -> #{}.
+ssl_params(Url) ->
+ case ibrowse_lib:parse_url(binary_to_list(Url)) of
+ #url{protocol = https} ->
+ Depth = list_to_integer(
+ config:get("replicator", "ssl_certificate_max_depth", "3")
+ ),
+ VerifyCerts = config:get("replicator", "verify_ssl_certificates"),
+ CertFile = config:get("replicator", "cert_file", null),
+ KeyFile = config:get("replicator", "key_file", null),
+ Password = config:get("replicator", "password", null),
+ VerifySslOptions = ssl_verify_options(VerifyCerts =:= "true"),
+ SslOpts = maps:merge(VerifySslOptions, #{<<"depth">> => Depth}),
+ HaveCertAndKey = CertFile /= null andalso KeyFile /= null,
+ SslOpts1 = case HaveCertAndKey of false -> SslOpts; true ->
+ CertOpts0 = #{
+ <<"certfile">> => list_to_binary(CertFile),
+ <<"keyfile">> => list_to_binary(KeyFile)
+ },
+ CertOpts = case Password of null -> CertOpts0; _ ->
+ CertOpts0#{<<"password">> => list_to_binary(Password)}
+ end,
+ maps:merge(SslOpts, CertOpts)
+ end,
+ #{<<"is_ssl">> => true, <<"ssl_options">> => SslOpts1};
+ #url{protocol = http} ->
+ #{}
+ end.
+
+
+-spec ssl_verify_options(true | false) -> #{}.
+ssl_verify_options(true) ->
+ case config:get("replicator", "ssl_trusted_certificates_file") of
+ undefined ->
+ #{
+ <<"verify">> => <<"verify_peer">>,
+ <<"cacertfile">> => null
+ };
+ CAFile when is_list(CAFile) ->
+ #{
+ <<"verify">> => <<"verify_peer">>,
+ <<"cacertfile">> => list_to_binary(CAFile)
+ }
+ end;
+
+ssl_verify_options(false) ->
+ #{
+ <<"verify">> => <<"verify_none">>
+ }.
+
+
+-ifdef(TEST).
+
+-include_lib("couch/include/couch_eunit.hrl").
+-include_lib("fabric/test/fabric2_test.hrl").
+
+
+check_options_pass_values_test() ->
+ ?assertEqual(check_options(#{}), #{}),
+ ?assertEqual(check_options(#{<<"baz">> => <<"foo">>}),
+ #{<<"baz">> => <<"foo">>}),
+ ?assertEqual(check_options(#{<<"doc_ids">> => [<<"x">>]}),
+ #{<<"doc_ids">> => [<<"x">>]}),
+ ?assertEqual(check_options(#{<<"filter">> => <<"f">>}),
+ #{<<"filter">> => <<"f">>}),
+ ?assertEqual(check_options(#{<<"selector">> => <<"s">>}),
+ #{<<"selector">> => <<"s">>}).
+
+
+check_options_fail_values_test() ->
+ ?assertThrow({error, _},
+ check_options(#{<<"doc_ids">> => [], <<"filter">> => <<"f">>})),
+ ?assertThrow({error, _},
+ check_options(#{<<"doc_ids">> => [], <<"selector">> => <<"s">>})),
+ ?assertThrow({error, _},
+ check_options(#{<<"filter">> => <<"f">>, <<"selector">> => <<"s">>})),
+ ?assertThrow({error, _},
+ check_options(#{
+ <<"doc_ids">> => [],
+ <<"filter">> => <<"f">>,
+ <<"selector">> => <<"s">>}
+ )).
+
+
+check_convert_options_pass_test() ->
+ ?assertEqual(#{}, convert_options(#{})),
+ ?assertEqual(#{}, convert_options(#{<<"random">> => 42})),
+ ?assertEqual(#{<<"cancel">> => true},
+ convert_options(#{<<"cancel">> => true})),
+ ?assertEqual(#{<<"create_target">> => true},
+ convert_options(#{<<"create_target">> => true})),
+ ?assertEqual(#{<<"continuous">> => true},
+ convert_options(#{<<"continuous">> => true})),
+ ?assertEqual(#{<<"doc_ids">> => [<<"id">>]},
+ convert_options(#{<<"doc_ids">> => [<<"id">>]})),
+ ?assertEqual(#{<<"selector">> => #{<<"key">> => <<"value">>}},
+ convert_options(#{<<"selector">> => #{<<"key">> => <<"value">>}})).
+
+
+check_convert_options_fail_test() ->
+ ?assertThrow({error, _},
+ convert_options(#{<<"cancel">> => <<"true">>})),
+ ?assertThrow({error, _},
+ convert_options(#{<<"create_target">> => <<"true">>})),
+ ?assertThrow({error, _},
+ convert_options(#{<<"continuous">> => <<"true">>})),
+ ?assertThrow({error, _},
+ convert_options(#{<<"doc_ids">> => <<"not_a_list">>})),
+ ?assertThrow({error, _},
+ convert_options(#{<<"selector">> => <<"bad">>})).
+
+
+local_replication_endpoint_error_test_() ->
+ {
+ foreach,
+ fun setup/0,
+ fun teardown/1,
+ [
+ ?TDEF_FE(t_error_on_local_endpoint)
+ ]
+ }.
+
+
+setup() ->
+ meck:expect(config, get, fun(_, _, Default) -> Default end).
+
+
+teardown(_) ->
+ meck:unload().
+
+
+t_error_on_local_endpoint(_) ->
+ RepDoc = {[
+ {<<"_id">>, <<"someid">>},
+ {<<"source">>, <<"localdb">>},
+ {<<"target">>, <<"http://somehost.local/tgt">>}
+ ]},
+ Expect = local_endpoints_not_supported,
+ ?assertThrow({bad_rep_doc, Expect}, parse_rep_doc(RepDoc)).
+
+
+-endif.
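
Tying the new module together, a sketch of `parse_rep/2` on a complete doc, assuming stock `[replicator]` config values (the merged numbers come from `?CONFIG_DEFAULTS`):

```
Doc = #{
    <<"source">> => <<"http://localhost:5984/src/">>,
    <<"target">> => <<"http://localhost:5984/tgt/">>,
    <<"socket_options">> => <<"[{keepalive, true}, {reuseaddr, true}]">>
},
{ok, Rep} = couch_replicator_parse:parse_rep(Doc, null),
#{<<"options">> := Opts} = Rep,

%% Defaults merged in by make_options/1 from the [replicator] section:
4 = maps:get(<<"worker_processes">>, Opts),
30000 = maps:get(<<"checkpoint_interval">>, Opts),

%% parse_sock_opts/1 kept the allow-listed keepalive option and
%% silently dropped reuseaddr, which is not in ?VALID_SOCK_OPTS:
SockOpts = maps:get(<<"socket_options">>, Opts),
true = maps:get(<<"keepalive">>, SockOpts),
false = maps:is_key(<<"reuseaddr">>, SockOpts).
```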