author     Nick Vatamaniuc <vatamane@gmail.com>                2022-11-22 11:31:54 -0500
committer  Nick Vatamaniuc <nickva@users.noreply.github.com>   2022-11-30 00:23:07 -0500
commit     f13ceb46ce3c120e1960fa47bfda0a606601900e (patch)
tree       abfe7681f984cf8d3071929420ef124790b7fa90
parent     fb5ade222d887bb92a564de0e1b8f818d3ffc915 (diff)
download   couchdb-f13ceb46ce3c120e1960fa47bfda0a606601900e.tar.gz
Improve validation of replicator job parameters
There are two main improvements:

* Replace the auto-inserted replicator VDU with a BDU. Replicator already had
  a BDU to update the `"owner"` field, so plug right into it and validate
  everything we need there. This way, the validation and parsing logic is all
  in one module. The previously inserted VDU design doc will be deleted.

* Allow constraining endpoint protocol types and socket options. Previously,
  users could create replications with any low-level socket options. Some of
  those are dangerous and are possible "foot-guns". Restrict those options to
  a more usable set.

In addition to those improvements, increase test coverage a bit by explicitly
checking a few more parsing corner cases.

Fixes #4273
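To make the socket-option restriction concrete, here is a minimal sketch of
the allow-list idea. The module and function names below are hypothetical;
the actual logic in this commit is parse_sock_opts/3 in
couch_replicator_parse.erl further down.

    % Hypothetical sketch, not the actual replicator code: keep only
    % {Key, Value} socket options whose key is on an allow-list; anything
    % else, including dangerous low-level options, is dropped.
    -module(sock_opts_sketch).
    -export([filter/2]).

    filter(Opts, Allowed) ->
        [{K, V} || {K, V} <- Opts, lists:member(K, Allowed)].

For example, filter([{keepalive, true}, {linger, {true, 0}}],
[keepalive, nodelay]) returns [{keepalive, true}], silently discarding the
disallowed option.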
-rw-r--r--  rel/overlay/etc/default.ini                                                   |  12
-rw-r--r--  src/chttpd/src/chttpd.erl                                                     |   4
-rw-r--r--  src/couch_replicator/src/couch_replicator.erl                                 |  10
-rw-r--r--  src/couch_replicator/src/couch_replicator_doc_processor.erl                  |   9
-rw-r--r--  src/couch_replicator/src/couch_replicator_doc_processor_worker.erl           |  14
-rw-r--r--  src/couch_replicator/src/couch_replicator_docs.erl                           | 820
-rw-r--r--  src/couch_replicator/src/couch_replicator_ids.erl                            |   8
-rw-r--r--  src/couch_replicator/src/couch_replicator_js_functions.hrl                   | 183
-rw-r--r--  src/couch_replicator/src/couch_replicator_parse.erl                          | 750
-rw-r--r--  src/couch_replicator/src/couch_replicator_scheduler_job.erl                  |  21
-rw-r--r--  src/couch_replicator/src/couch_replicator_utils.erl                          |  10
-rw-r--r--  src/couch_replicator/test/eunit/couch_replicator_compact_tests.erl           |   2
-rw-r--r--  src/couch_replicator/test/eunit/couch_replicator_error_reporting_tests.erl   |   2
-rw-r--r--  src/couch_replicator/test/eunit/couch_replicator_proxy_tests.erl             |  10
-rw-r--r--  src/couch_replicator/test/eunit/couch_replicator_retain_stats_between_job_runs.erl |   2
-rw-r--r--  src/couch_replicator/test/eunit/couch_replicator_test_helper.erl             |   2
-rw-r--r--  src/docs/src/config/replicator.rst                                            |  34
-rw-r--r--  src/fabric/src/fabric_doc_update.erl                                          |  14
18 files changed, 1046 insertions, 861 deletions
diff --git a/rel/overlay/etc/default.ini b/rel/overlay/etc/default.ini
index 04448aabd..0efc4cb23 100644
--- a/rel/overlay/etc/default.ini
+++ b/rel/overlay/etc/default.ini
@@ -555,6 +555,18 @@ partitioned||* = true
; See the `inet` Erlang module's man page for the full list of options.
;socket_options = [{keepalive, true}, {nodelay, false}]
+; Valid socket options. Options not in this list are ignored. The full list of
+; options may be found at https://www.erlang.org/doc/man/inet.html#setopts-2.
+;valid_socket_options = buffer,keepalive,nodelay,priority,recbuf,sndbuf
+
+; Valid replication endpoint protocols. Replication jobs with endpoint urls not
+; in this list will fail to run.
+;valid_endpoint_protocols = http,https
+
+; Valid replication proxy protocols. Replication jobs with proxy urls not in
+; this list will fail to run.
+;valid_proxy_protocols = http,https,socks5
+
; Path to a file containing the user's certificate.
;cert_file = /full/path/to/server_cert.pem
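The three new settings are comma-separated lists that the replicator parses
into Erlang atoms. A minimal sketch of that parsing (parse_csv_atoms/1 is a
hypothetical name; the patch's own version is cfg_atoms/2 in
couch_replicator_parse.erl below):

    % Split a comma-separated config string and strip whitespace, yielding
    % a list of atoms, e.g. "http, https" -> [http, https].
    parse_csv_atoms(Value) when is_list(Value) ->
        [list_to_atom(string:strip(S)) || S <- string:split(Value, ",", all)].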
diff --git a/src/chttpd/src/chttpd.erl b/src/chttpd/src/chttpd.erl
index 13e919cb5..c25c18838 100644
--- a/src/chttpd/src/chttpd.erl
+++ b/src/chttpd/src/chttpd.erl
@@ -1096,6 +1096,10 @@ error_info({error, {database_name_too_long, DbName}}) ->
<<"At least one path segment of `", DbName/binary, "` is too long.">>};
error_info({doc_validation, Reason}) ->
{400, <<"doc_validation">>, Reason};
+error_info({error, <<"endpoint has an invalid url">> = Reason}) ->
+ {400, <<"invalid_replication">>, Reason};
+error_info({error, <<"proxy has an invalid url">> = Reason}) ->
+ {400, <<"invalid_replication">>, Reason};
error_info({missing_stub, Reason}) ->
{412, <<"missing_stub">>, Reason};
error_info(request_entity_too_large) ->
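With these clauses, a replication whose endpoint or proxy URL fails the new
validation is rejected up front with a 400 response whose body carries the
"invalid_replication" error. A sketch of the mapping in eunit style
(illustrative only; error_info/1 is internal to chttpd, so this is not a
runnable external call):

    ?assertEqual(
        {400, <<"invalid_replication">>, <<"endpoint has an invalid url">>},
        error_info({error, <<"endpoint has an invalid url">>})
    ).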
diff --git a/src/couch_replicator/src/couch_replicator.erl b/src/couch_replicator/src/couch_replicator.erl
index 39b3903ea..935daaa80 100644
--- a/src/couch_replicator/src/couch_replicator.erl
+++ b/src/couch_replicator/src/couch_replicator.erl
@@ -25,10 +25,8 @@
-include_lib("couch/include/couch_db.hrl").
-include("couch_replicator.hrl").
-include_lib("couch_replicator/include/couch_replicator_api_wrap.hrl").
--include_lib("couch_mrview/include/couch_mrview.hrl").
-include_lib("mem3/include/mem3.hrl").
--define(DESIGN_DOC_CREATION_DELAY_MSEC, 1000).
-define(REPLICATION_STATES, [
% Just added to scheduler
initializing,
@@ -58,7 +56,7 @@
| {error, any()}
| no_return().
replicate(PostBody, Ctx) ->
- {ok, Rep0} = couch_replicator_utils:parse_rep_doc(PostBody, Ctx),
+ {ok, Rep0} = couch_replicator_parse:parse_rep_doc(PostBody, Ctx),
Rep = Rep0#rep{start_time = os:timestamp()},
#rep{id = RepId, options = Options, user_ctx = UserCtx} = Rep,
case get_value(cancel, Options, false) of
@@ -138,12 +136,16 @@ replication_states() ->
-spec strip_url_creds(binary() | {[_]}) -> binary().
strip_url_creds(Endpoint) ->
- try couch_replicator_docs:parse_rep_db(Endpoint, [], []) of
+ try couch_replicator_parse:parse_rep_db(Endpoint, [], []) of
#httpdb{url = Url} ->
iolist_to_binary(couch_util:url_strip_password(Url))
catch
throw:{error, local_endpoints_not_supported} ->
Endpoint;
+ throw:{error, _} ->
+ % Avoid exposing any part of the URL in case there is a password in
+ % the malformed endpoint URL
+ null;
error:_ ->
% Avoid exposing any part of the URL in case there is a password in
% the malformed endpoint URL
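The net effect: a well-formed endpoint URL still comes back with its password
blanked, while any endpoint that fails the new validation maps to null, so no
fragment of a possibly credential-bearing URL leaks. A sketch of the intended
behavior (illustrative; strip_url_creds/1 may not be exported for direct
calls like this):

    % An endpoint object with a disallowed protocol now yields null
    % instead of echoing back part of the URL.
    null = strip_url_creds({[{<<"url">>, <<"ftp://user:pass@host">>}]}).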
diff --git a/src/couch_replicator/src/couch_replicator_doc_processor.erl b/src/couch_replicator/src/couch_replicator_doc_processor.erl
index 436d7c44d..eb4c02b49 100644
--- a/src/couch_replicator/src/couch_replicator_doc_processor.erl
+++ b/src/couch_replicator/src/couch_replicator_doc_processor.erl
@@ -44,9 +44,7 @@
notify_cluster_event/2
]).
--include_lib("couch/include/couch_db.hrl").
-include("couch_replicator.hrl").
--include_lib("mem3/include/mem3.hrl").
-import(couch_replicator_utils, [
get_json_value/2,
@@ -77,9 +75,8 @@
% couch_multidb_changes API callbacks
-db_created(DbName, Server) ->
+db_created(_DbName, Server) ->
couch_stats:increment_counter([couch_replicator, docs, dbs_created]),
- couch_replicator_docs:ensure_rep_ddoc_exists(DbName),
Server.
db_deleted(DbName, Server) ->
@@ -89,7 +86,7 @@ db_deleted(DbName, Server) ->
db_found(DbName, Server) ->
couch_stats:increment_counter([couch_replicator, docs, dbs_found]),
- couch_replicator_docs:ensure_rep_ddoc_exists(DbName),
+ couch_replicator_docs:delete_old_rep_ddoc(DbName),
Server.
db_change(DbName, {ChangeProps} = Change, Server) ->
@@ -169,7 +166,7 @@ process_updated({DbName, _DocId} = Id, JsonRepDoc) ->
% should propagate to db_change function and will be recorded as permanent
% failure in the document. User will have to update the document to fix the
% problem.
- Rep0 = couch_replicator_docs:parse_rep_doc_without_id(JsonRepDoc),
+ Rep0 = couch_replicator_parse:parse_rep_doc_without_id(JsonRepDoc),
Rep = Rep0#rep{db_name = DbName, start_time = os:timestamp()},
Filter =
case couch_replicator_filters:parse(Rep#rep.options) of
diff --git a/src/couch_replicator/src/couch_replicator_doc_processor_worker.erl b/src/couch_replicator/src/couch_replicator_doc_processor_worker.erl
index 22c5f8584..b1014ffa5 100644
--- a/src/couch_replicator/src/couch_replicator_doc_processor_worker.erl
+++ b/src/couch_replicator/src/couch_replicator_doc_processor_worker.erl
@@ -162,7 +162,7 @@ doc_processor_worker_test_() ->
t_should_add_job() ->
?_test(begin
Id = {?DB, ?DOC1},
- Rep = couch_replicator_docs:parse_rep_doc_without_id(change()),
+ Rep = couch_replicator_parse:parse_rep_doc_without_id(change()),
?assertEqual({ok, ?R1}, maybe_start_replication(Id, Rep, nil)),
?assert(added_job())
end).
@@ -172,7 +172,7 @@ t_already_running_same_docid() ->
?_test(begin
Id = {?DB, ?DOC1},
mock_already_running(?DB, ?DOC1),
- Rep = couch_replicator_docs:parse_rep_doc_without_id(change()),
+ Rep = couch_replicator_parse:parse_rep_doc_without_id(change()),
?assertEqual({ok, ?R1}, maybe_start_replication(Id, Rep, nil)),
?assert(did_not_add_job())
end).
@@ -182,7 +182,7 @@ t_already_running_transient() ->
?_test(begin
Id = {?DB, ?DOC1},
mock_already_running(null, null),
- Rep = couch_replicator_docs:parse_rep_doc_without_id(change()),
+ Rep = couch_replicator_parse:parse_rep_doc_without_id(change()),
?assertMatch(
{temporary_error, _},
maybe_start_replication(
@@ -200,7 +200,7 @@ t_already_running_other_db_other_doc() ->
?_test(begin
Id = {?DB, ?DOC1},
mock_already_running(<<"otherdb">>, <<"otherdoc">>),
- Rep = couch_replicator_docs:parse_rep_doc_without_id(change()),
+ Rep = couch_replicator_parse:parse_rep_doc_without_id(change()),
?assertMatch(
{permanent_failure, _},
maybe_start_replication(
@@ -217,7 +217,7 @@ t_already_running_other_db_other_doc() ->
t_spawn_worker() ->
?_test(begin
Id = {?DB, ?DOC1},
- Rep = couch_replicator_docs:parse_rep_doc_without_id(change()),
+ Rep = couch_replicator_parse:parse_rep_doc_without_id(change()),
WRef = make_ref(),
meck:expect(couch_replicator_doc_processor, get_worker_ref, 1, WRef),
Pid = spawn_worker(Id, Rep, 0, WRef),
@@ -236,7 +236,7 @@ t_spawn_worker() ->
t_ignore_if_doc_deleted() ->
?_test(begin
Id = {?DB, ?DOC1},
- Rep = couch_replicator_docs:parse_rep_doc_without_id(change()),
+ Rep = couch_replicator_parse:parse_rep_doc_without_id(change()),
meck:expect(couch_replicator_doc_processor, get_worker_ref, 1, nil),
?assertEqual(ignore, maybe_start_replication(Id, Rep, make_ref())),
?assertNot(added_job())
@@ -247,7 +247,7 @@ t_ignore_if_doc_deleted() ->
t_ignore_if_worker_ref_does_not_match() ->
?_test(begin
Id = {?DB, ?DOC1},
- Rep = couch_replicator_docs:parse_rep_doc_without_id(change()),
+ Rep = couch_replicator_parse:parse_rep_doc_without_id(change()),
meck:expect(
couch_replicator_doc_processor,
get_worker_ref,
diff --git a/src/couch_replicator/src/couch_replicator_docs.erl b/src/couch_replicator/src/couch_replicator_docs.erl
index a60f1a1e1..5fb86c4f5 100644
--- a/src/couch_replicator/src/couch_replicator_docs.erl
+++ b/src/couch_replicator/src/couch_replicator_docs.erl
@@ -13,15 +13,9 @@
-module(couch_replicator_docs).
-export([
- parse_rep_doc/1,
- parse_rep_doc/2,
- parse_rep_db/3,
- parse_rep_doc_without_id/1,
- parse_rep_doc_without_id/2,
before_doc_update/3,
after_doc_read/2,
- ensure_rep_ddoc_exists/1,
- ensure_cluster_rep_ddoc_exists/1,
+ delete_old_rep_ddoc/1,
remove_state_fields/2,
update_doc_completed/3,
update_failed/3,
@@ -31,24 +25,10 @@
]).
-include_lib("couch/include/couch_db.hrl").
--include_lib("ibrowse/include/ibrowse.hrl").
--include_lib("mem3/include/mem3.hrl").
--include_lib("couch_replicator/include/couch_replicator_api_wrap.hrl").
-include("couch_replicator.hrl").
--include("couch_replicator_js_functions.hrl").
--import(couch_util, [
- get_value/2,
- get_value/3,
- to_binary/1
-]).
-
--import(couch_replicator_utils, [
- get_json_value/2,
- get_json_value/3
-]).
-
--define(REP_DB_NAME, <<"_replicator">>).
+% The ID of the now-deleted design doc. On every *_replicator db discovery we
+% try to delete it. At some point in the future, remove this logic altogether.
-define(REP_DESIGN_DOC, <<"_design/_replicator">>).
-define(OWNER, <<"owner">>).
-define(CTX, {user_ctx, #user_ctx{roles = [<<"_admin">>, <<"_replicator">>]}}).
@@ -126,176 +106,32 @@ update_error(#rep{db_name = DbName, doc_id = DocId, id = RepId}, Error) ->
]),
ok.
--spec ensure_rep_ddoc_exists(binary()) -> ok.
-ensure_rep_ddoc_exists(RepDb) ->
+-spec delete_old_rep_ddoc(binary()) -> ok.
+delete_old_rep_ddoc(RepDb) ->
case mem3:belongs(RepDb, ?REP_DESIGN_DOC) of
- true ->
- ensure_rep_ddoc_exists(RepDb, ?REP_DESIGN_DOC);
- false ->
- ok
+ true -> delete_old_rep_ddoc(RepDb, ?REP_DESIGN_DOC);
+ false -> ok
end.
--spec ensure_rep_ddoc_exists(binary(), binary()) -> ok.
-ensure_rep_ddoc_exists(RepDb, DDocId) ->
+-spec delete_old_rep_ddoc(binary(), binary()) -> ok.
+delete_old_rep_ddoc(RepDb, DDocId) ->
case open_rep_doc(RepDb, DDocId) of
{not_found, no_db_file} ->
- %% database was deleted.
ok;
{not_found, _Reason} ->
- DocProps = replication_design_doc_props(DDocId),
- DDoc = couch_doc:from_json_obj({DocProps}),
- couch_log:notice("creating replicator ddoc ~p", [RepDb]),
- {ok, _Rev} = save_rep_doc(RepDb, DDoc);
+ ok;
{ok, Doc} ->
- Latest = replication_design_doc_props(DDocId),
- {Props0} = couch_doc:to_json_obj(Doc, []),
- {value, {_, Rev}, Props} = lists:keytake(<<"_rev">>, 1, Props0),
- case compare_ejson({Props}, {Latest}) of
- true ->
- ok;
- false ->
- LatestWithRev = [{<<"_rev">>, Rev} | Latest],
- DDoc = couch_doc:from_json_obj({LatestWithRev}),
- couch_log:notice("updating replicator ddoc ~p", [RepDb]),
- try
- {ok, _} = save_rep_doc(RepDb, DDoc)
- catch
- throw:conflict ->
- %% ignore, we'll retry next time
- ok
- end
+ DeletedDoc = Doc#doc{deleted = true, body = {[]}},
+ try
+ save_rep_doc(RepDb, DeletedDoc)
+ catch
+ throw:conflict ->
+ % ignore, we'll retry next time
+ ok
end
end,
ok.
--spec ensure_cluster_rep_ddoc_exists(binary()) -> ok.
-ensure_cluster_rep_ddoc_exists(RepDb) ->
- DDocId = ?REP_DESIGN_DOC,
- [#shard{name = DbShard} | _] = mem3:shards(RepDb, DDocId),
- ensure_rep_ddoc_exists(DbShard, DDocId).
-
--spec compare_ejson({[_]}, {[_]}) -> boolean().
-compare_ejson(EJson1, EJson2) ->
- EjsonSorted1 = couch_replicator_filters:ejsort(EJson1),
- EjsonSorted2 = couch_replicator_filters:ejsort(EJson2),
- EjsonSorted1 == EjsonSorted2.
-
--spec replication_design_doc_props(binary()) -> [_].
-replication_design_doc_props(DDocId) ->
- [
- {<<"_id">>, DDocId},
- {<<"language">>, <<"javascript">>},
- {<<"validate_doc_update">>, ?REP_DB_DOC_VALIDATE_FUN}
- ].
-
-% Note: parse_rep_doc can handle filtered replications. During parsing of the
-% replication doc it will make possibly remote http requests to the source
-% database. If fetching or parsing of filter docs fails, parse_rep_doc throws a
-% {filter_fetch_error, Error} exception. This exception should be considered
-% transient with respect to the contents of the document itself, since it
-% depends on network availability of the source db and other factors.
--spec parse_rep_doc({[_]}) -> #rep{}.
-parse_rep_doc(RepDoc) ->
- {ok, Rep} =
- try
- parse_rep_doc(RepDoc, rep_user_ctx(RepDoc))
- catch
- throw:{error, Reason} ->
- throw({bad_rep_doc, Reason});
- throw:{filter_fetch_error, Reason} ->
- throw({filter_fetch_error, Reason});
- Tag:Err ->
- throw({bad_rep_doc, to_binary({Tag, Err})})
- end,
- Rep.
-
--spec parse_rep_doc_without_id({[_]}) -> #rep{}.
-parse_rep_doc_without_id(RepDoc) ->
- {ok, Rep} =
- try
- parse_rep_doc_without_id(RepDoc, rep_user_ctx(RepDoc))
- catch
- throw:{error, Reason} ->
- throw({bad_rep_doc, Reason});
- Tag:Err ->
- throw({bad_rep_doc, to_binary({Tag, Err})})
- end,
- Rep.
-
--spec parse_rep_doc({[_]}, #user_ctx{}) -> {ok, #rep{}}.
-parse_rep_doc(Doc, UserCtx) ->
- {ok, Rep} = parse_rep_doc_without_id(Doc, UserCtx),
- Cancel = get_value(cancel, Rep#rep.options, false),
- Id = get_value(id, Rep#rep.options, nil),
- case {Cancel, Id} of
- {true, nil} ->
- % Cancel request with no id, must parse id out of body contents
- {ok, update_rep_id(Rep)};
- {true, Id} ->
- % Cancel request with an id specified, so do not parse id from body
- {ok, Rep};
- {false, _Id} ->
- % Not a cancel request, regular replication doc
- {ok, update_rep_id(Rep)}
- end.
-
--spec parse_rep_doc_without_id({[_]}, #user_ctx{}) -> {ok, #rep{}}.
-parse_rep_doc_without_id({Props}, UserCtx) ->
- {SrcProxy, TgtProxy} = parse_proxy_settings(Props),
- Opts = make_options(Props),
- case
- get_value(cancel, Opts, false) andalso
- (get_value(id, Opts, nil) =/= nil)
- of
- true ->
- {ok, #rep{options = Opts, user_ctx = UserCtx}};
- false ->
- Source = parse_rep_db(get_value(<<"source">>, Props), SrcProxy, Opts),
- Target = parse_rep_db(get_value(<<"target">>, Props), TgtProxy, Opts),
- {Type, View} =
- case couch_replicator_filters:view_type(Props, Opts) of
- {error, Error} ->
- throw({bad_request, Error});
- Result ->
- Result
- end,
- Rep = #rep{
- source = Source,
- target = Target,
- options = Opts,
- user_ctx = UserCtx,
- type = Type,
- view = View,
- doc_id = get_value(<<"_id">>, Props, null)
- },
- % Check if can parse filter code, if not throw exception
- case couch_replicator_filters:parse(Opts) of
- {error, FilterError} ->
- throw({error, FilterError});
- {ok, _Filter} ->
- ok
- end,
- {ok, Rep}
- end.
-
-parse_proxy_settings(Props) when is_list(Props) ->
- Proxy = get_value(<<"proxy">>, Props, <<>>),
- SrcProxy = get_value(<<"source_proxy">>, Props, <<>>),
- TgtProxy = get_value(<<"target_proxy">>, Props, <<>>),
-
- case Proxy =/= <<>> of
- true when SrcProxy =/= <<>> ->
- Error = "`proxy` is mutually exclusive with `source_proxy`",
- throw({bad_request, Error});
- true when TgtProxy =/= <<>> ->
- Error = "`proxy` is mutually exclusive with `target_proxy`",
- throw({bad_request, Error});
- true ->
- {Proxy, Proxy};
- false ->
- {SrcProxy, TgtProxy}
- end.
-
% Update a #rep{} record with a replication_id. Calculating the id might involve
% fetching a filter from the source db, and so it could fail intermittently.
% In case of a failure to fetch the filter this function will throw a
@@ -386,316 +222,47 @@ save_rep_doc(DbName, Doc) ->
couch_db:close(Db)
end.
--spec rep_user_ctx({[_]}) -> #user_ctx{}.
-rep_user_ctx({RepDoc}) ->
- case get_json_value(<<"user_ctx">>, RepDoc) of
- undefined ->
- #user_ctx{};
- {UserCtx} ->
- #user_ctx{
- name = get_json_value(<<"name">>, UserCtx, null),
- roles = get_json_value(<<"roles">>, UserCtx, [])
- }
- end.
-
--spec parse_rep_db({[_]} | binary(), [_] | binary(), [_]) -> #httpdb{} | no_return().
-parse_rep_db({Props}, Proxy, Options) ->
- ProxyParams = parse_proxy_params(Proxy),
- ProxyURL =
- case ProxyParams of
- [] -> undefined;
- _ -> binary_to_list(Proxy)
- end,
- Url = maybe_add_trailing_slash(get_value(<<"url">>, Props)),
- {AuthProps} = get_value(<<"auth">>, Props, {[]}),
- {BinHeaders} = get_value(<<"headers">>, Props, {[]}),
- Headers = lists:ukeysort(1, [{?b2l(K), ?b2l(V)} || {K, V} <- BinHeaders]),
- DefaultHeaders = (#httpdb{})#httpdb.headers,
- HttpDb = #httpdb{
- url = Url,
- auth_props = AuthProps,
- headers = lists:ukeymerge(1, Headers, DefaultHeaders),
- ibrowse_options = lists:keysort(
- 1,
- [
- {socket_options, get_value(socket_options, Options)}
- | ProxyParams ++ ssl_params(Url)
- ]
- ),
- timeout = get_value(connection_timeout, Options),
- http_connections = get_value(http_connections, Options),
- retries = get_value(retries, Options),
- proxy_url = ProxyURL
- },
- couch_replicator_utils:normalize_basic_auth(HttpDb);
-parse_rep_db(<<"http://", _/binary>> = Url, Proxy, Options) ->
- parse_rep_db({[{<<"url">>, Url}]}, Proxy, Options);
-parse_rep_db(<<"https://", _/binary>> = Url, Proxy, Options) ->
- parse_rep_db({[{<<"url">>, Url}]}, Proxy, Options);
-parse_rep_db(<<_/binary>>, _Proxy, _Options) ->
- throw({error, local_endpoints_not_supported});
-parse_rep_db(undefined, _Proxy, _Options) ->
- throw({error, <<"Missing replicator database">>}).
-
--spec maybe_add_trailing_slash(binary() | list()) -> list().
-maybe_add_trailing_slash(Url) when is_binary(Url) ->
- maybe_add_trailing_slash(?b2l(Url));
-maybe_add_trailing_slash(Url) ->
- case lists:member($?, Url) of
- true ->
- % skip if there are query params
- Url;
- false ->
- case lists:last(Url) of
- $/ ->
- Url;
- _ ->
- Url ++ "/"
- end
- end.
-
--spec make_options([_]) -> [_].
-make_options(Props) ->
- Options0 = lists:ukeysort(1, convert_options(Props)),
- Options = check_options(Options0),
- DefWorkers = config:get_integer("replicator", "worker_processes", 4),
- DefBatchSize = config:get_integer("replicator", "worker_batch_size", 500),
- DefConns = config:get_integer("replicator", "http_connections", 20),
- DefTimeout = config:get_integer("replicator", "connection_timeout", 30000),
- DefRetries = config:get_integer("replicator", "retries_per_request", 5),
- UseCheckpoints = config:get_boolean("replicator", "use_checkpoints", true),
- UseBulkGet = config:get_boolean("replicator", "use_bulk_get", true),
- DefCheckpointInterval = config:get_integer(
- "replicator",
- "checkpoint_interval",
- 30000
- ),
- {ok, DefSocketOptions} = couch_util:parse_term(
- config:get(
- "replicator",
- "socket_options",
- "[{keepalive, true}, {nodelay, false}]"
- )
- ),
- lists:ukeymerge(
- 1,
- Options,
- lists:keysort(1, [
- {connection_timeout, DefTimeout},
- {retries, DefRetries},
- {http_connections, DefConns},
- {socket_options, DefSocketOptions},
- {worker_batch_size, DefBatchSize},
- {worker_processes, DefWorkers},
- {use_checkpoints, UseCheckpoints},
- {use_bulk_get, UseBulkGet},
- {checkpoint_interval, DefCheckpointInterval}
- ])
- ).
-
--spec convert_options([_]) -> [_].
-convert_options([]) ->
- [];
-convert_options([{<<"cancel">>, V} | _R]) when not is_boolean(V) ->
- throw({bad_request, <<"parameter `cancel` must be a boolean">>});
-convert_options([{<<"cancel">>, V} | R]) ->
- [{cancel, V} | convert_options(R)];
-convert_options([{IdOpt, V} | R]) when
- IdOpt =:= <<"_local_id">>;
- IdOpt =:= <<"replication_id">>;
- IdOpt =:= <<"id">>
-->
- [{id, couch_replicator_ids:convert(V)} | convert_options(R)];
-convert_options([{<<"create_target">>, V} | _R]) when not is_boolean(V) ->
- throw({bad_request, <<"parameter `create_target` must be a boolean">>});
-convert_options([{<<"create_target">>, V} | R]) ->
- [{create_target, V} | convert_options(R)];
-convert_options([{<<"create_target_params">>, V} | _R]) when not is_tuple(V) ->
- throw({bad_request, <<"parameter `create_target_params` must be a JSON object">>});
-convert_options([{<<"create_target_params">>, V} | R]) ->
- [{create_target_params, V} | convert_options(R)];
-convert_options([{<<"winning_revs_only">>, V} | _R]) when not is_boolean(V) ->
- throw({bad_request, <<"parameter `winning_revs_only` must be a boolean">>});
-convert_options([{<<"winning_revs_only">>, V} | R]) ->
- [{winning_revs_only, V} | convert_options(R)];
-convert_options([{<<"continuous">>, V} | _R]) when not is_boolean(V) ->
- throw({bad_request, <<"parameter `continuous` must be a boolean">>});
-convert_options([{<<"continuous">>, V} | R]) ->
- [{continuous, V} | convert_options(R)];
-convert_options([{<<"filter">>, V} | R]) ->
- [{filter, V} | convert_options(R)];
-convert_options([{<<"query_params">>, V} | R]) ->
- [{query_params, V} | convert_options(R)];
-convert_options([{<<"doc_ids">>, null} | R]) ->
- convert_options(R);
-convert_options([{<<"doc_ids">>, V} | _R]) when not is_list(V) ->
- throw({bad_request, <<"parameter `doc_ids` must be an array">>});
-convert_options([{<<"doc_ids">>, V} | R]) ->
- % Ensure same behaviour as old replicator: accept a list of percent
- % encoded doc IDs.
- DocIds = lists:usort([?l2b(couch_httpd:unquote(Id)) || Id <- V]),
- [{doc_ids, DocIds} | convert_options(R)];
-convert_options([{<<"selector">>, V} | _R]) when not is_tuple(V) ->
- throw({bad_request, <<"parameter `selector` must be a JSON object">>});
-convert_options([{<<"selector">>, V} | R]) ->
- [{selector, V} | convert_options(R)];
-convert_options([{<<"worker_processes">>, V} | R]) ->
- [{worker_processes, couch_util:to_integer(V)} | convert_options(R)];
-convert_options([{<<"worker_batch_size">>, V} | R]) ->
- [{worker_batch_size, couch_util:to_integer(V)} | convert_options(R)];
-convert_options([{<<"http_connections">>, V} | R]) ->
- [{http_connections, couch_util:to_integer(V)} | convert_options(R)];
-convert_options([{<<"connection_timeout">>, V} | R]) ->
- [{connection_timeout, couch_util:to_integer(V)} | convert_options(R)];
-convert_options([{<<"retries_per_request">>, V} | R]) ->
- [{retries, couch_util:to_integer(V)} | convert_options(R)];
-convert_options([{<<"socket_options">>, V} | R]) ->
- {ok, SocketOptions} = couch_util:parse_term(V),
- [{socket_options, SocketOptions} | convert_options(R)];
-convert_options([{<<"since_seq">>, V} | R]) ->
- [{since_seq, V} | convert_options(R)];
-convert_options([{<<"use_checkpoints">>, V} | R]) ->
- [{use_checkpoints, V} | convert_options(R)];
-convert_options([{<<"use_bulk_get">>, V} | _R]) when not is_boolean(V) ->
- throw({bad_request, <<"parameter `use_bulk_get` must be a boolean">>});
-convert_options([{<<"use_bulk_get">>, V} | R]) ->
- [{use_bulk_get, V} | convert_options(R)];
-convert_options([{<<"checkpoint_interval">>, V} | R]) ->
- [{checkpoint_interval, couch_util:to_integer(V)} | convert_options(R)];
-% skip unknown option
-convert_options([_ | R]) ->
- convert_options(R).
-
--spec check_options([_]) -> [_].
-check_options(Options) ->
- DocIds = lists:keyfind(doc_ids, 1, Options),
- Filter = lists:keyfind(filter, 1, Options),
- Selector = lists:keyfind(selector, 1, Options),
- case {DocIds, Filter, Selector} of
- {false, false, false} -> Options;
- {false, false, _} -> Options;
- {false, _, false} -> Options;
- {_, false, false} -> Options;
- _ -> throw({bad_request, "`doc_ids`,`filter`,`selector` are mutually exclusive"})
- end.
-
--spec parse_proxy_params(binary() | [_]) -> [_].
-parse_proxy_params(ProxyUrl) when is_binary(ProxyUrl) ->
- parse_proxy_params(?b2l(ProxyUrl));
-parse_proxy_params([]) ->
- [];
-parse_proxy_params(ProxyUrl) ->
- #url{
- host = Host,
- port = Port,
- username = User,
- password = Passwd,
- protocol = Protocol
- } = ibrowse_lib:parse_url(ProxyUrl),
- Params =
- [
- {proxy_host, Host},
- {proxy_port, Port}
- ] ++
- case is_list(User) andalso is_list(Passwd) of
- false ->
- [];
- true ->
- [{proxy_user, User}, {proxy_password, Passwd}]
- end,
- case Protocol of
- socks5 ->
- [proxy_to_socks5(Param) || Param <- Params];
- _ ->
- Params
- end.
-
--spec proxy_to_socks5({atom(), string()}) -> {atom(), string()}.
-proxy_to_socks5({proxy_host, Val}) ->
- {socks5_host, Val};
-proxy_to_socks5({proxy_port, Val}) ->
- {socks5_port, Val};
-proxy_to_socks5({proxy_user, Val}) ->
- {socks5_user, Val};
-proxy_to_socks5({proxy_password, Val}) ->
- {socks5_password, Val}.
-
--spec ssl_params([_]) -> [_].
-ssl_params(Url) ->
- case ibrowse_lib:parse_url(Url) of
- #url{protocol = https} ->
- Depth = config:get_integer(
- "replicator",
- "ssl_certificate_max_depth",
- 3
- ),
- VerifyCerts = config:get_boolean(
- "replicator",
- "verify_ssl_certificates",
- false
- ),
- CertFile = config:get("replicator", "cert_file", undefined),
- KeyFile = config:get("replicator", "key_file", undefined),
- Password = config:get("replicator", "password", undefined),
- SslOpts = [{depth, Depth} | ssl_verify_options(VerifyCerts)],
- SslOpts1 =
- case CertFile /= undefined andalso KeyFile /= undefined of
- true ->
- case Password of
- undefined ->
- [{certfile, CertFile}, {keyfile, KeyFile}] ++ SslOpts;
- _ ->
- [
- {certfile, CertFile},
- {keyfile, KeyFile},
- {password, Password}
- ] ++ SslOpts
- end;
- false ->
- SslOpts
- end,
- [{is_ssl, true}, {ssl_options, SslOpts1}];
- #url{protocol = http} ->
- []
- end.
-
--spec ssl_verify_options(true | false) -> [_].
-ssl_verify_options(true) ->
- CAFile = config:get("replicator", "ssl_trusted_certificates_file"),
- [{verify, verify_peer}, {cacertfile, CAFile}];
-ssl_verify_options(false) ->
- [{verify, verify_none}].
-
-spec before_doc_update(#doc{}, Db :: any(), couch_db:update_type()) -> #doc{}.
before_doc_update(#doc{id = <<?DESIGN_DOC_PREFIX, _/binary>>} = Doc, _Db, _UpdateType) ->
Doc;
+before_doc_update(#doc{} = Doc, _Db, ?REPLICATED_CHANGES) ->
+ % Skip internal replicator updates
+ Doc;
before_doc_update(#doc{body = {Body}} = Doc, Db, _UpdateType) ->
- #user_ctx{
- roles = Roles,
- name = Name
- } = couch_db:get_user_ctx(Db),
- case lists:member(<<"_replicator">>, Roles) of
+ #user_ctx{roles = Roles, name = Name} = couch_db:get_user_ctx(Db),
+ IsReplicator = lists:member(<<"_replicator">>, Roles),
+ Doc1 =
+ case IsReplicator of
+ true -> Doc;
+ false -> before_doc_update_owner(get_value(?OWNER, Body), Name, Db, Doc)
+ end,
+ IsFailed = get_value(<<"_replication_state">>, Body) =:= <<"failed">>,
+ case IsReplicator orelse Doc1#doc.deleted orelse IsFailed of
true ->
- Doc;
+ ok;
false ->
- case couch_util:get_value(?OWNER, Body) of
- undefined ->
- Doc#doc{body = {?replace(Body, ?OWNER, Name)}};
- Name ->
- Doc;
- Other ->
- case (catch couch_db:check_is_admin(Db)) of
- ok when Other =:= null ->
- Doc#doc{body = {?replace(Body, ?OWNER, Name)}};
- ok ->
- Doc;
- _ ->
- throw(
- {forbidden,
- <<"Can't update replication documents", " from other users.">>}
- )
- end
+ try
+ couch_replicator_parse:parse_rep_doc_without_id(Doc1#doc.body)
+ catch
+ throw:{bad_rep_doc, Error} ->
+ throw({forbidden, Error})
end
+ end,
+ Doc1.
+
+before_doc_update_owner(undefined, Name, _Db, #doc{body = {Body}} = Doc) ->
+ Doc#doc{body = {?replace(Body, ?OWNER, Name)}};
+before_doc_update_owner(Name, Name, _Db, #doc{} = Doc) ->
+ Doc;
+before_doc_update_owner(Other, Name, Db, #doc{body = {Body}} = Doc) ->
+ case (catch couch_db:check_is_admin(Db)) of
+ ok when Other =:= null ->
+ Doc#doc{body = {?replace(Body, ?OWNER, Name)}};
+ ok ->
+ Doc;
+ _ ->
+ Err = <<"Can't update replication documents from other users.">>,
+ throw({forbidden, Err})
end.
-spec after_doc_read(#doc{}, Db :: any()) -> #doc{}.
@@ -707,22 +274,12 @@ after_doc_read(#doc{body = {Body}} = Doc, Db) ->
ok ->
Doc;
_ ->
- case couch_util:get_value(?OWNER, Body) of
+ case get_value(?OWNER, Body) of
Name ->
Doc;
_Other ->
- Source = strip_credentials(
- couch_util:get_value(
- <<"source">>,
- Body
- )
- ),
- Target = strip_credentials(
- couch_util:get_value(
- <<"target">>,
- Body
- )
- ),
+ Source = strip_credentials(get_value(<<"source">>, Body)),
+ Target = strip_credentials(get_value(<<"target">>, Body)),
NewBody0 = ?replace(Body, <<"source">>, Source),
NewBody = ?replace(NewBody0, <<"target">>, Target),
#doc{revs = {Pos, [_ | Revs]}} = Doc,
@@ -765,89 +322,19 @@ error_reason({error, Reason}) ->
error_reason(Reason) ->
to_binary(Reason).
+to_binary(Val) ->
+ couch_util:to_binary(Val).
+
+get_value(Key, Props) ->
+ couch_util:get_value(Key, Props).
+
+get_json_value(Key, Obj) ->
+ couch_replicator_utils:get_json_value(Key, Obj).
+
-ifdef(TEST).
-include_lib("couch/include/couch_eunit.hrl").
-check_options_pass_values_test() ->
- ?assertEqual(check_options([]), []),
- ?assertEqual(check_options([baz, {other, fiz}]), [baz, {other, fiz}]),
- ?assertEqual(check_options([{doc_ids, x}]), [{doc_ids, x}]),
- ?assertEqual(check_options([{filter, x}]), [{filter, x}]),
- ?assertEqual(check_options([{selector, x}]), [{selector, x}]).
-
-check_options_fail_values_test() ->
- ?assertThrow(
- {bad_request, _},
- check_options([{doc_ids, x}, {filter, y}])
- ),
- ?assertThrow(
- {bad_request, _},
- check_options([{doc_ids, x}, {selector, y}])
- ),
- ?assertThrow(
- {bad_request, _},
- check_options([{filter, x}, {selector, y}])
- ),
- ?assertThrow(
- {bad_request, _},
- check_options([{doc_ids, x}, {selector, y}, {filter, z}])
- ).
-
-check_convert_options_pass_test() ->
- ?assertEqual([], convert_options([])),
- ?assertEqual([], convert_options([{<<"random">>, 42}])),
- ?assertEqual(
- [{cancel, true}],
- convert_options([{<<"cancel">>, true}])
- ),
- ?assertEqual(
- [{create_target, true}],
- convert_options([{<<"create_target">>, true}])
- ),
- ?assertEqual(
- [{winning_revs_only, true}],
- convert_options([{<<"winning_revs_only">>, true}])
- ),
- ?assertEqual(
- [{continuous, true}],
- convert_options([{<<"continuous">>, true}])
- ),
- ?assertEqual(
- [{doc_ids, [<<"id">>]}],
- convert_options([{<<"doc_ids">>, [<<"id">>]}])
- ),
- ?assertEqual(
- [{selector, {key, value}}],
- convert_options([{<<"selector">>, {key, value}}])
- ).
-
-check_convert_options_fail_test() ->
- ?assertThrow(
- {bad_request, _},
- convert_options([{<<"cancel">>, <<"true">>}])
- ),
- ?assertThrow(
- {bad_request, _},
- convert_options([{<<"create_target">>, <<"true">>}])
- ),
- ?assertThrow(
- {bad_request, _},
- convert_options([{<<"winning_revs_only">>, <<"foo">>}])
- ),
- ?assertThrow(
- {bad_request, _},
- convert_options([{<<"continuous">>, <<"true">>}])
- ),
- ?assertThrow(
- {bad_request, _},
- convert_options([{<<"doc_ids">>, not_a_list}])
- ),
- ?assertThrow(
- {bad_request, _},
- convert_options([{<<"selector">>, [{key, value}]}])
- ).
-
check_strip_credentials_test() ->
[
?assertEqual(Expected, strip_credentials(Body))
@@ -879,48 +366,25 @@ check_strip_credentials_test() ->
]
].
-parse_proxy_params_test() ->
- ?assertEqual(
- [
- {proxy_host, "foo.com"},
- {proxy_port, 443},
- {proxy_user, "u"},
- {proxy_password, "p"}
- ],
- parse_proxy_params("https://u:p@foo.com")
- ),
- ?assertEqual(
- [
- {socks5_host, "foo.com"},
- {socks5_port, 1080},
- {socks5_user, "u"},
- {socks5_password, "p"}
- ],
- parse_proxy_params("socks5://u:p@foo.com")
- ).
-
setup() ->
- DbName = ?tempdb(),
+ TmpDbName = ?tempdb(),
+ DbName = <<TmpDbName/binary, "/_replicator">>,
{ok, Db} = couch_db:create(DbName, [?ADMIN_CTX]),
ok = couch_db:close(Db),
- create_vdu(DbName),
DbName.
teardown(DbName) when is_binary(DbName) ->
couch_server:delete(DbName, [?ADMIN_CTX]),
ok.
-create_vdu(DbName) ->
+create_old_rep_ddoc(DbName) ->
couch_util:with_db(DbName, fun(Db) ->
- VduFun = <<"function(newdoc, olddoc, userctx) {throw({'forbidden':'fail'})}">>,
- Doc = #doc{
- id = <<"_design/vdu">>,
- body = {[{<<"validate_doc_update">>, VduFun}]}
- },
- {ok, _} = couch_db:update_docs(Db, [Doc])
+ Doc = #doc{id = ?REP_DESIGN_DOC, body = {[]}},
+ {ok, _} = couch_db:update_docs(Db, [Doc]),
+ ok
end).
-update_replicator_doc_with_bad_vdu_test_() ->
+clean_old_replicator_ddoc_test_() ->
{
setup,
fun test_util:start_couch/0,
@@ -930,43 +394,129 @@ update_replicator_doc_with_bad_vdu_test_() ->
fun setup/0,
fun teardown/1,
[
- fun t_vdu_does_not_crash_on_save/1
+ ?TDEF_FE(t_clean_old_ddoc),
+ ?TDEF_FE(t_old_ddoc_already_cleaned),
+ ?TDEF_FE(t_ddoc_delete_missing_db)
]
}
}.
-t_vdu_does_not_crash_on_save(DbName) ->
- ?_test(begin
- Doc = #doc{id = <<"some_id">>, body = {[{<<"foo">>, 42}]}},
- ?assertEqual({ok, forbidden}, save_rep_doc(DbName, Doc))
- end).
+t_clean_old_ddoc(DbName) ->
+ ok = create_old_rep_ddoc(DbName),
+ ?assertMatch({ok, #doc{}}, open_rep_doc(DbName, ?REP_DESIGN_DOC)),
+ delete_old_rep_ddoc(DbName),
+ ?assertEqual({not_found, deleted}, open_rep_doc(DbName, ?REP_DESIGN_DOC)).
+
+t_old_ddoc_already_cleaned(DbName) ->
+ ok = delete_old_rep_ddoc(DbName),
+ ?assertEqual({not_found, missing}, open_rep_doc(DbName, ?REP_DESIGN_DOC)).
-local_replication_endpoint_error_test_() ->
+t_ddoc_delete_missing_db(_DbName) ->
+ ok = delete_old_rep_ddoc(<<"someotherdb">>).
+
+replicator_can_update_docs_test_() ->
{
- foreach,
- fun() ->
- meck:expect(
- config,
- get,
- fun(_, _, Default) -> Default end
- )
- end,
- fun(_) -> meck:unload() end,
- [
- t_error_on_local_endpoint()
- ]
+ setup,
+ fun test_util:start_couch/0,
+ fun test_util:stop_couch/1,
+ {
+ foreach,
+ fun setup/0,
+ fun teardown/1,
+ [
+ ?TDEF_FE(t_remove_state_fields),
+ ?TDEF_FE(t_update_doc_completed),
+ ?TDEF_FE(t_update_failed),
+ ?TDEF_FE(t_update_triggered),
+ ?TDEF_FE(t_update_error)
+ ]
+ }
}.
-t_error_on_local_endpoint() ->
- ?_test(begin
- RepDoc =
+t_remove_state_fields(DbName) ->
+ DocId = <<"doc1">>,
+ Doc = #doc{
+ id = DocId,
+ body = {[{<<"_replication_state">>, <<"triggered">>}]}
+ },
+ save_rep_doc(DbName, Doc),
+ remove_state_fields(DbName, DocId),
+ {ok, Doc2} = open_rep_doc(DbName, DocId),
+ ?assertEqual({[]}, Doc2#doc.body).
+
+t_update_doc_completed(DbName) ->
+ DocId = <<"doc1">>,
+ Doc = #doc{
+ id = DocId,
+ body = {[{<<"_replication_state">>, <<"triggered">>}]}
+ },
+ save_rep_doc(DbName, Doc),
+ update_doc_completed(DbName, DocId, [{<<"foo">>, 1}]),
+ {ok, Doc2} = open_rep_doc(DbName, DocId),
+ {Props} = Doc2#doc.body,
+ State = get_value(<<"_replication_state">>, Props),
+ ?assertEqual(<<"completed">>, State),
+ Stats = get_value(<<"_replication_stats">>, Props),
+ ?assertEqual({[{<<"foo">>, 1}]}, Stats).
+
+t_update_failed(DbName) ->
+ DocId = <<"doc1">>,
+ Doc = #doc{
+ id = DocId,
+ body =
{[
- {<<"_id">>, <<"someid">>},
- {<<"source">>, <<"localdb">>},
- {<<"target">>, <<"http://somehost.local/tgt">>}
- ]},
- Expect = local_endpoints_not_supported,
- ?assertThrow({bad_rep_doc, Expect}, parse_rep_doc_without_id(RepDoc))
- end).
+ {<<"_replication_state">>, <<"triggered">>},
+ {<<"_replication_stats">>, {[{<<"foo">>, 1}]}}
+ ]}
+ },
+ save_rep_doc(DbName, Doc),
+ Error = {error, {foo, bar}},
+ update_failed(DbName, DocId, Error),
+ {ok, Doc2} = open_rep_doc(DbName, DocId),
+ {Props} = Doc2#doc.body,
+ State = get_value(<<"_replication_state">>, Props),
+ ?assertEqual(<<"failed">>, State),
+ Reason = get_value(<<"_replication_state_reason">>, Props),
+ ?assertEqual(<<"{foo,bar}">>, Reason),
+ % stats should have been cleared
+ Stats = get_value(<<"_replication_stats">>, Props),
+ ?assertEqual(undefined, Stats).
+
+t_update_triggered(DbName) ->
+ DocId = <<"doc1">>,
+ Doc = #doc{
+ id = DocId,
+ body = {[]}
+ },
+ save_rep_doc(DbName, Doc),
+ Rep = #rep{db_name = DbName, doc_id = DocId},
+ update_triggered(Rep, {"123", "+continuous"}),
+ {ok, Doc2} = open_rep_doc(DbName, DocId),
+ {Props} = Doc2#doc.body,
+ State = get_value(<<"_replication_state">>, Props),
+ ?assertEqual(<<"triggered">>, State),
+ Stats = get_value(<<"_replication_stats">>, Props),
+ ?assertEqual(undefined, Stats),
+ RepId = get_value(<<"_replication_id">>, Props),
+ ?assertEqual(<<"123+continuous">>, RepId).
+
+t_update_error(DbName) ->
+ DocId = <<"doc1">>,
+ Doc = #doc{
+ id = DocId,
+ body = {[]}
+ },
+ save_rep_doc(DbName, Doc),
+ Rep = #rep{db_name = DbName, doc_id = DocId, id = null},
+ Error = {error, foo},
+ update_error(Rep, Error),
+ {ok, Doc2} = open_rep_doc(DbName, DocId),
+ {Props} = Doc2#doc.body,
+ State = get_value(<<"_replication_state">>, Props),
+ ?assertEqual(<<"error">>, State),
+ Stats = get_value(<<"_replication_stats">>, Props),
+ ?assertEqual(undefined, Stats),
+ RepId = get_value(<<"_replication_id">>, Props),
+ ?assertEqual(null, RepId).
-endif.
diff --git a/src/couch_replicator/src/couch_replicator_ids.erl b/src/couch_replicator/src/couch_replicator_ids.erl
index 939070b95..86fe1f26e 100644
--- a/src/couch_replicator/src/couch_replicator_ids.erl
+++ b/src/couch_replicator/src/couch_replicator_ids.erl
@@ -196,10 +196,10 @@ winning_revs_generates_new_id(_) ->
{<<"source">>, <<"http://foo.example.bar">>},
{<<"target">>, <<"http://bar.example.foo">>}
],
- Rep1 = couch_replicator_docs:parse_rep_doc_without_id({RepDoc1}),
+ Rep1 = couch_replicator_parse:parse_rep_doc_without_id({RepDoc1}),
Id1 = replication_id(Rep1),
RepDoc2 = RepDoc1 ++ [{<<"winning_revs_only">>, true}],
- Rep2 = couch_replicator_docs:parse_rep_doc_without_id({RepDoc2}),
+ Rep2 = couch_replicator_parse:parse_rep_doc_without_id({RepDoc2}),
Id2 = replication_id(Rep2),
?assertNotEqual(Id1, Id2).
@@ -208,10 +208,10 @@ winning_revs_false_same_as_undefined(_) ->
{<<"source">>, <<"http://foo.example.bar">>},
{<<"target">>, <<"http://bar.example.foo">>}
],
- Rep1 = couch_replicator_docs:parse_rep_doc_without_id({RepDoc1}),
+ Rep1 = couch_replicator_parse:parse_rep_doc_without_id({RepDoc1}),
Id1 = replication_id(Rep1),
RepDoc2 = RepDoc1 ++ [{<<"winning_revs_only">>, false}],
- Rep2 = couch_replicator_docs:parse_rep_doc_without_id({RepDoc2}),
+ Rep2 = couch_replicator_parse:parse_rep_doc_without_id({RepDoc2}),
Id2 = replication_id(Rep2),
?assertEqual(Id1, Id2).
diff --git a/src/couch_replicator/src/couch_replicator_js_functions.hrl b/src/couch_replicator/src/couch_replicator_js_functions.hrl
deleted file mode 100644
index 4f4369075..000000000
--- a/src/couch_replicator/src/couch_replicator_js_functions.hrl
+++ /dev/null
@@ -1,183 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-% http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--define(REP_DB_DOC_VALIDATE_FUN, <<"
- function(newDoc, oldDoc, userCtx) {
- function reportError(error_msg) {
- log('Error writing document `' + newDoc._id +
- '\\' to the replicator database: ' + error_msg);
- throw({forbidden: error_msg});
- }
-
- function validateEndpoint(endpoint, fieldName) {
- if ((typeof endpoint !== 'string') &&
- ((typeof endpoint !== 'object') || (endpoint === null))) {
-
- reportError('The `' + fieldName + '\\' property must exist' +
- ' and be either a string or an object.');
- }
-
- if (typeof endpoint === 'object') {
- if ((typeof endpoint.url !== 'string') || !endpoint.url) {
- reportError('The url property must exist in the `' +
- fieldName + '\\' field and must be a non-empty string.');
- }
-
- if ((typeof endpoint.auth !== 'undefined') &&
- ((typeof endpoint.auth !== 'object') ||
- endpoint.auth === null)) {
-
- reportError('`' + fieldName +
- '.auth\\' must be a non-null object.');
- }
-
- if ((typeof endpoint.headers !== 'undefined') &&
- ((typeof endpoint.headers !== 'object') ||
- endpoint.headers === null)) {
-
- reportError('`' + fieldName +
- '.headers\\' must be a non-null object.');
- }
- }
- }
-
- var isReplicator = (userCtx.roles.indexOf('_replicator') >= 0);
- var isAdmin = (userCtx.roles.indexOf('_admin') >= 0);
-
- if (isReplicator) {
- // Always let replicator update the replication document
- return;
- }
-
- if (newDoc._replication_state === 'failed') {
- // Skip validation in case when we update the document with the
- // failed state. In this case it might be malformed. However,
- // replicator will not pay attention to failed documents so this
- // is safe.
- return;
- }
-
- if (!newDoc._deleted) {
- validateEndpoint(newDoc.source, 'source');
- validateEndpoint(newDoc.target, 'target');
-
- if ((typeof newDoc.create_target !== 'undefined') &&
- (typeof newDoc.create_target !== 'boolean')) {
-
- reportError('The `create_target\\' field must be a boolean.');
- }
-
- if ((typeof newDoc.winning_revs_only !== 'undefined') &&
- (typeof newDoc.winning_revs_only !== 'boolean')) {
-
- reportError('The `winning_revs_only\\' field must be a boolean.');
- }
-
- if ((typeof newDoc.continuous !== 'undefined') &&
- (typeof newDoc.continuous !== 'boolean')) {
-
- reportError('The `continuous\\' field must be a boolean.');
- }
-
- if ((typeof newDoc.doc_ids !== 'undefined') &&
- !isArray(newDoc.doc_ids)) {
-
- reportError('The `doc_ids\\' field must be an array of strings.');
- }
-
- if ((typeof newDoc.selector !== 'undefined') &&
- (typeof newDoc.selector !== 'object')) {
-
- reportError('The `selector\\' field must be an object.');
- }
-
- if ((typeof newDoc.filter !== 'undefined') &&
- ((typeof newDoc.filter !== 'string') || !newDoc.filter)) {
-
- reportError('The `filter\\' field must be a non-empty string.');
- }
-
- if ((typeof newDoc.doc_ids !== 'undefined') &&
- (typeof newDoc.selector !== 'undefined')) {
-
- reportError('`doc_ids\\' field is incompatible with `selector\\'.');
- }
-
- if ( ((typeof newDoc.doc_ids !== 'undefined') ||
- (typeof newDoc.selector !== 'undefined')) &&
- (typeof newDoc.filter !== 'undefined') ) {
-
- reportError('`filter\\' field is incompatible with `selector\\' and `doc_ids\\'.');
- }
-
- if ((typeof newDoc.query_params !== 'undefined') &&
- ((typeof newDoc.query_params !== 'object') ||
- newDoc.query_params === null)) {
-
- reportError('The `query_params\\' field must be an object.');
- }
-
- if (newDoc.user_ctx) {
- var user_ctx = newDoc.user_ctx;
-
- if ((typeof user_ctx !== 'object') || (user_ctx === null)) {
- reportError('The `user_ctx\\' property must be a ' +
- 'non-null object.');
- }
-
- if (!(user_ctx.name === null ||
- (typeof user_ctx.name === 'undefined') ||
- ((typeof user_ctx.name === 'string') &&
- user_ctx.name.length > 0))) {
-
- reportError('The `user_ctx.name\\' property must be a ' +
- 'non-empty string or null.');
- }
-
- if (!isAdmin && (user_ctx.name !== userCtx.name)) {
- reportError('The given `user_ctx.name\\' is not valid');
- }
-
- if (user_ctx.roles && !isArray(user_ctx.roles)) {
- reportError('The `user_ctx.roles\\' property must be ' +
- 'an array of strings.');
- }
-
- if (!isAdmin && user_ctx.roles) {
- for (var i = 0; i < user_ctx.roles.length; i++) {
- var role = user_ctx.roles[i];
-
- if (typeof role !== 'string' || role.length === 0) {
- reportError('Roles must be non-empty strings.');
- }
- if (userCtx.roles.indexOf(role) === -1) {
- reportError('Invalid role (`' + role +
- '\\') in the `user_ctx\\'');
- }
- }
- }
- } else {
- if (!isAdmin) {
- reportError('The `user_ctx\\' property is missing (it is ' +
- 'optional for admins only).');
- }
- }
- } else {
- if (!isAdmin) {
- if (!oldDoc.user_ctx || (oldDoc.user_ctx.name !== userCtx.name)) {
- reportError('Replication documents can only be deleted by ' +
- 'admins or by the users who created them.');
- }
- }
- }
- }
-">>).
diff --git a/src/couch_replicator/src/couch_replicator_parse.erl b/src/couch_replicator/src/couch_replicator_parse.erl
new file mode 100644
index 000000000..2359bbd2f
--- /dev/null
+++ b/src/couch_replicator/src/couch_replicator_parse.erl
@@ -0,0 +1,750 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_replicator_parse).
+
+-export([
+ parse_rep_doc/1,
+ parse_rep_doc/2,
+ parse_rep_db/3,
+ parse_rep_doc_without_id/1,
+ parse_rep_doc_without_id/2
+]).
+
+-include_lib("couch/include/couch_db.hrl").
+-include_lib("ibrowse/include/ibrowse.hrl").
+-include_lib("couch_replicator/include/couch_replicator_api_wrap.hrl").
+-include("couch_replicator.hrl").
+
+-define(DEFAULT_SOCK_OPTS, [{keepalive, true}, {nodelay, false}]).
+-define(VALID_SOCK_OPTS, [
+ buffer,
+ keepalive,
+ nodelay,
+ priority,
+ recbuf,
+ sndbuf
+]).
+-define(VALID_PROTOCOLS, #{
+ endpoint => [http, https],
+ proxy => [http, https, socks5]
+}).
+
+%% erlfmt-ignore
+default_options() ->
+ [
+ {connection_timeout, cfg_int("connection_timeout", 30000)},
+ {retries, cfg_int("retries_per_request", 5)},
+ {http_connections, cfg_int("http_connections", 20)},
+ {worker_batch_size, cfg_int("worker_batch_size", 500)},
+ {worker_processes, cfg_int("worker_processes", 4)},
+ {checkpoint_interval, cfg_int("checkpoint_interval", 30000)},
+ {use_checkpoints, cfg_boolean("use_checkpoints", true)},
+ {use_bulk_get, cfg_boolean("use_bulk_get", true)},
+ {socket_options, cfg_sock_opts()}
+ ].
+
+% Note: parse_rep_doc can handle filtered replications. During parsing of the
+% replication doc it will make possibly remote http requests to the source
+% database. If fetching or parsing of filter docs fails, parse_rep_doc throws a
+% {filter_fetch_error, Error} exception. This exception should be considered
+% transient with respect to the contents of the document itself, since it
+% depends on network availability of the source db and other factors.
+-spec parse_rep_doc({[_]}) -> #rep{}.
+parse_rep_doc(RepDoc) ->
+ {ok, Rep} =
+ try
+ parse_rep_doc(RepDoc, rep_user_ctx(RepDoc))
+ catch
+ throw:{error, Reason} ->
+ throw({bad_rep_doc, Reason});
+ throw:{filter_fetch_error, Reason} ->
+ throw({filter_fetch_error, Reason});
+ Tag:Err ->
+ throw({bad_rep_doc, to_binary({Tag, Err})})
+ end,
+ Rep.
+
+-spec parse_rep_doc_without_id({[_]}) -> #rep{}.
+parse_rep_doc_without_id(RepDoc) ->
+ {ok, Rep} =
+ try
+ parse_rep_doc_without_id(RepDoc, rep_user_ctx(RepDoc))
+ catch
+ throw:{error, Reason} ->
+ throw({bad_rep_doc, Reason});
+ Tag:Err ->
+ throw({bad_rep_doc, to_binary({Tag, Err})})
+ end,
+ Rep.
+
+-spec parse_rep_doc({[_]}, #user_ctx{}) -> {ok, #rep{}}.
+parse_rep_doc(Doc, UserCtx) ->
+ {ok, Rep} = parse_rep_doc_without_id(Doc, UserCtx),
+ Cancel = get_value(cancel, Rep#rep.options, false),
+ Id = get_value(id, Rep#rep.options, nil),
+ case {Cancel, Id} of
+ {true, nil} ->
+ % Cancel request with no id, must parse id out of body contents
+ {ok, update_rep_id(Rep)};
+ {true, Id} ->
+ % Cancel request with an id specified, so do not parse id from body
+ {ok, Rep};
+ {false, _Id} ->
+ % Not a cancel request, regular replication doc
+ {ok, update_rep_id(Rep)}
+ end.
+
+-spec parse_rep_doc_without_id({[_]}, #user_ctx{}) -> {ok, #rep{}}.
+parse_rep_doc_without_id({Props}, UserCtx) ->
+ {SrcProxy, TgtProxy} = parse_proxy_settings(Props),
+ Opts = make_options(Props),
+ case
+ get_value(cancel, Opts, false) andalso
+ (get_value(id, Opts, nil) =/= nil)
+ of
+ true ->
+ {ok, #rep{options = Opts, user_ctx = UserCtx}};
+ false ->
+ Source = parse_rep_db(get_value(<<"source">>, Props), SrcProxy, Opts),
+ Target = parse_rep_db(get_value(<<"target">>, Props), TgtProxy, Opts),
+ {Type, View} =
+ case couch_replicator_filters:view_type(Props, Opts) of
+ {error, Error} ->
+ throw({bad_request, Error});
+ Result ->
+ Result
+ end,
+ Rep = #rep{
+ source = Source,
+ target = Target,
+ options = Opts,
+ user_ctx = UserCtx,
+ type = Type,
+ view = View,
+ doc_id = get_value(<<"_id">>, Props, null)
+ },
+ % Check if can parse filter code, if not throw exception
+ case couch_replicator_filters:parse(Opts) of
+ {error, FilterError} ->
+ throw({error, FilterError});
+ {ok, _Filter} ->
+ ok
+ end,
+ {ok, Rep}
+ end.
+
+parse_proxy_settings(Props) when is_list(Props) ->
+ Proxy = get_value(<<"proxy">>, Props, <<>>),
+ SrcProxy = get_value(<<"source_proxy">>, Props, <<>>),
+ TgtProxy = get_value(<<"target_proxy">>, Props, <<>>),
+
+ case Proxy =/= <<>> of
+ true when SrcProxy =/= <<>> ->
+ Error = "`proxy` is mutually exclusive with `source_proxy`",
+ throw({bad_request, Error});
+ true when TgtProxy =/= <<>> ->
+ Error = "`proxy` is mutually exclusive with `target_proxy`",
+ throw({bad_request, Error});
+ true ->
+ {Proxy, Proxy};
+ false ->
+ {SrcProxy, TgtProxy}
+ end.
+
+% Update a #rep{} record with a replication_id. Calculating the id might involve
+% fetching a filter from the source db, and so it could fail intermittently.
+% In case of a failure to fetch the filter this function will throw a
+% `{filter_fetch_error, Reason}` exception.
+update_rep_id(Rep) ->
+ RepId = couch_replicator_ids:replication_id(Rep),
+ Rep#rep{id = RepId}.
+
+-spec rep_user_ctx({[_]}) -> #user_ctx{}.
+rep_user_ctx({RepDoc}) ->
+ case get_json_value(<<"user_ctx">>, RepDoc) of
+ undefined ->
+ #user_ctx{};
+ {UserCtx} ->
+ #user_ctx{
+ name = get_json_value(<<"name">>, UserCtx, null),
+ roles = get_json_value(<<"roles">>, UserCtx, [])
+ }
+ end.
+
+-spec parse_rep_db({[_]} | binary(), [_] | binary(), [_]) -> #httpdb{} | no_return().
+parse_rep_db({Props}, Proxy, Options) ->
+ Url0 = get_value(<<"url">>, Props),
+ ok = check_url(Url0, endpoint),
+ Url = maybe_add_trailing_slash(Url0),
+ ProxyParams = parse_proxy_params(Proxy),
+ ProxyURL =
+ case ProxyParams of
+ [] -> undefined;
+ _ -> binary_to_list(Proxy)
+ end,
+ {AuthProps} = get_value(<<"auth">>, Props, {[]}),
+ {BinHeaders} = get_value(<<"headers">>, Props, {[]}),
+ Headers = lists:ukeysort(1, [{?b2l(K), ?b2l(V)} || {K, V} <- BinHeaders]),
+ DefaultHeaders = (#httpdb{})#httpdb.headers,
+ HttpDb = #httpdb{
+ url = Url,
+ auth_props = AuthProps,
+ headers = lists:ukeymerge(1, Headers, DefaultHeaders),
+ ibrowse_options = lists:keysort(
+ 1,
+ [
+ {socket_options, get_value(socket_options, Options)}
+ | ProxyParams ++ ssl_params(Url)
+ ]
+ ),
+ timeout = get_value(connection_timeout, Options),
+ http_connections = get_value(http_connections, Options),
+ retries = get_value(retries, Options),
+ proxy_url = ProxyURL
+ },
+ couch_replicator_utils:normalize_basic_auth(HttpDb);
+parse_rep_db(<<"http://", _/binary>> = Url, Proxy, Options) ->
+ parse_rep_db({[{<<"url">>, Url}]}, Proxy, Options);
+parse_rep_db(<<"https://", _/binary>> = Url, Proxy, Options) ->
+ parse_rep_db({[{<<"url">>, Url}]}, Proxy, Options);
+parse_rep_db(<<_/binary>>, _Proxy, _Options) ->
+ throw({error, local_endpoints_not_supported});
+parse_rep_db(undefined, _Proxy, _Options) ->
+ throw({error, <<"Missing replicator database">>}).
+
+check_url(<<_/binary>> = Url, Type) when is_atom(Type) ->
+ case ibrowse_lib:parse_url(?b2l(Url)) of
+ #url{protocol = Protocol} ->
+ check_protocol(Protocol, Type);
+ {error, _} ->
+ BinType = atom_to_binary(Type),
+ throw({error, <<BinType/binary, " has an invalid url">>})
+ end;
+check_url(_, Type) when is_atom(Type) ->
+ BinType = atom_to_binary(Type),
+ throw({error, <<BinType/binary, " has an invalid url">>}).
+
+check_protocol(Protocol, Type) ->
+ CfgName = "valid_" ++ atom_to_list(Type) ++ "_protocols",
+ Allowed = cfg_atoms(CfgName, maps:get(Type, ?VALID_PROTOCOLS)),
+ check_protocol(Protocol, Type, Allowed).
+
+check_protocol(Protocol, Type, Allowed) ->
+ case lists:member(Protocol, Allowed) of
+ true ->
+ ok;
+ false ->
+ BinType = atom_to_binary(Type),
+ throw({error, <<BinType/binary, " has an invalid url">>})
+ end.
+
+-spec maybe_add_trailing_slash(binary() | list()) -> list().
+maybe_add_trailing_slash(Url) when is_binary(Url) ->
+ maybe_add_trailing_slash(?b2l(Url));
+maybe_add_trailing_slash(Url) ->
+ case lists:member($?, Url) of
+ true ->
+ % skip if there are query params
+ Url;
+ false ->
+ case lists:last(Url) of
+ $/ ->
+ Url;
+ _ ->
+ Url ++ "/"
+ end
+ end.
+
+-spec make_options([_]) -> [_].
+make_options(Props) ->
+ Options0 = lists:ukeysort(1, convert_options(Props)),
+ Options = check_options(Options0),
+ Defaults = lists:keysort(1, default_options()),
+ lists:ukeymerge(1, Options, Defaults).
+
+cfg_int(Var, Default) ->
+ config:get_integer("replicator", Var, Default).
+
+cfg_boolean(Var, Default) ->
+ config:get_boolean("replicator", Var, Default).
+
+cfg_atoms(Cfg, Default) ->
+ case cfg(Cfg) of
+ undefined ->
+ Default;
+ V when is_list(V) ->
+ [list_to_atom(string:strip(S)) || S <- string:split(V, ",", all)]
+ end.
+
+cfg_sock_opts() ->
+ CfgTerm = cfg("socket_options"),
+ parse_sock_opts(CfgTerm, ?DEFAULT_SOCK_OPTS, ?VALID_SOCK_OPTS).
+
+cfg(Var) ->
+ config:get("replicator", Var).
+
+parse_sock_opts(undefined, Defaults, _) ->
+ Defaults;
+parse_sock_opts(Term, _Defaults, ValidOpts) ->
+ SocketOptions =
+ case couch_util:parse_term(Term) of
+ {ok, Opts} -> Opts;
+ {error, _Error} -> []
+ end,
+ Fun = fun({K, _}) -> lists:member(K, ValidOpts) end,
+ lists:filtermap(Fun, SocketOptions).
+
+sock_opts(CfgTerm) ->
+ ValidOpts = cfg_atoms("valid_socket_options", ?VALID_SOCK_OPTS),
+ parse_sock_opts(CfgTerm, ?DEFAULT_SOCK_OPTS, ValidOpts).
+
+-spec convert_options([_]) -> [_].
+convert_options([]) ->
+ [];
+convert_options([{<<"cancel">>, V} | _R]) when not is_boolean(V) ->
+ throw({bad_request, <<"parameter `cancel` must be a boolean">>});
+convert_options([{<<"cancel">>, V} | R]) ->
+ [{cancel, V} | convert_options(R)];
+convert_options([{IdOpt, V} | R]) when
+ IdOpt =:= <<"_local_id">>;
+ IdOpt =:= <<"replication_id">>;
+ IdOpt =:= <<"id">>
+->
+ [{id, couch_replicator_ids:convert(V)} | convert_options(R)];
+convert_options([{<<"create_target">>, V} | _R]) when not is_boolean(V) ->
+ throw({bad_request, <<"parameter `create_target` must be a boolean">>});
+convert_options([{<<"create_target">>, V} | R]) ->
+ [{create_target, V} | convert_options(R)];
+convert_options([{<<"create_target_params">>, V} | _R]) when not is_tuple(V) ->
+ throw({bad_request, <<"parameter `create_target_params` must be a JSON object">>});
+convert_options([{<<"create_target_params">>, V} | R]) ->
+ [{create_target_params, V} | convert_options(R)];
+convert_options([{<<"winning_revs_only">>, V} | _R]) when not is_boolean(V) ->
+ throw({bad_request, <<"parameter `winning_revs_only` must be a boolean">>});
+convert_options([{<<"winning_revs_only">>, V} | R]) ->
+ [{winning_revs_only, V} | convert_options(R)];
+convert_options([{<<"continuous">>, V} | _R]) when not is_boolean(V) ->
+ throw({bad_request, <<"parameter `continuous` must be a boolean">>});
+convert_options([{<<"continuous">>, V} | R]) ->
+ [{continuous, V} | convert_options(R)];
+convert_options([{<<"filter">>, V} | R]) ->
+ [{filter, V} | convert_options(R)];
+convert_options([{<<"query_params">>, V} | _R]) when not is_tuple(V) ->
+ throw({bad_request, <<"parameter `query_params` must be an object">>});
+convert_options([{<<"query_params">>, V} | R]) ->
+ [{query_params, V} | convert_options(R)];
+convert_options([{<<"doc_ids">>, null} | R]) ->
+ convert_options(R);
+convert_options([{<<"doc_ids">>, V} | _R]) when not is_list(V) ->
+ throw({bad_request, <<"parameter `doc_ids` must be an array">>});
+convert_options([{<<"doc_ids">>, V} | R]) ->
+    % Ensure the same behaviour as the old replicator: accept a list of
+    % percent-encoded doc IDs.
+ DocIds = lists:usort([?l2b(couch_httpd:unquote(Id)) || Id <- V]),
+ [{doc_ids, DocIds} | convert_options(R)];
+convert_options([{<<"selector">>, V} | _R]) when not is_tuple(V) ->
+ throw({bad_request, <<"parameter `selector` must be a JSON object">>});
+convert_options([{<<"selector">>, V} | R]) ->
+ [{selector, V} | convert_options(R)];
+convert_options([{<<"worker_processes">>, V} | R]) ->
+ [{worker_processes, couch_util:to_integer(V)} | convert_options(R)];
+convert_options([{<<"worker_batch_size">>, V} | R]) ->
+ [{worker_batch_size, couch_util:to_integer(V)} | convert_options(R)];
+convert_options([{<<"http_connections">>, V} | R]) ->
+ [{http_connections, couch_util:to_integer(V)} | convert_options(R)];
+convert_options([{<<"connection_timeout">>, V} | R]) ->
+ [{connection_timeout, couch_util:to_integer(V)} | convert_options(R)];
+convert_options([{<<"retries_per_request">>, V} | R]) ->
+ [{retries, couch_util:to_integer(V)} | convert_options(R)];
+convert_options([{<<"socket_options">>, V} | R]) ->
+ [{socket_options, sock_opts(V)} | convert_options(R)];
+convert_options([{<<"since_seq">>, V} | R]) ->
+ [{since_seq, V} | convert_options(R)];
+convert_options([{<<"use_checkpoints">>, V} | R]) ->
+ [{use_checkpoints, V} | convert_options(R)];
+convert_options([{<<"use_bulk_get">>, V} | _R]) when not is_boolean(V) ->
+ throw({bad_request, <<"parameter `use_bulk_get` must be a boolean">>});
+convert_options([{<<"use_bulk_get">>, V} | R]) ->
+ [{use_bulk_get, V} | convert_options(R)];
+convert_options([{<<"checkpoint_interval">>, V} | R]) ->
+ [{checkpoint_interval, couch_util:to_integer(V)} | convert_options(R)];
+% skip unknown option
+convert_options([_ | R]) ->
+ convert_options(R).
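+% A sketch of the conversion as a whole: binary keys become atoms, unknown
+% options are dropped, and doc ids are percent-decoded, sorted and
+% de-duplicated:
+%   convert_options([{<<"cancel">>, true}, {<<"bogus">>, 1},
+%                    {<<"doc_ids">>, [<<"b">>, <<"a%2Fc">>, <<"b">>]}])
+%     -> [{cancel, true}, {doc_ids, [<<"a/c">>, <<"b">>]}]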
+
+-spec check_options([_]) -> [_].
+check_options(Options) ->
+ DocIds = lists:keyfind(doc_ids, 1, Options),
+ Filter = lists:keyfind(filter, 1, Options),
+ Selector = lists:keyfind(selector, 1, Options),
+ case {DocIds, Filter, Selector} of
+ {false, false, false} -> Options;
+ {false, false, _} -> Options;
+ {false, _, false} -> Options;
+ {_, false, false} -> Options;
+        _ -> throw({bad_request, "`doc_ids`, `filter`, `selector` are mutually exclusive"})
+ end.
+
+-spec parse_proxy_params(binary() | [_]) -> [_].
+parse_proxy_params(ProxyUrl) when is_binary(ProxyUrl) ->
+ parse_proxy_params(?b2l(ProxyUrl));
+parse_proxy_params([]) ->
+ [];
+parse_proxy_params(ProxyUrl) when is_list(ProxyUrl) ->
+ #url{
+ host = Host,
+ port = Port,
+ username = User,
+ password = Passwd,
+ protocol = Protocol
+ } = ibrowse_lib:parse_url(ProxyUrl),
+ Params =
+ [
+ {proxy_host, Host},
+ {proxy_port, Port}
+ ] ++
+ case is_list(User) andalso is_list(Passwd) of
+ false ->
+ [];
+ true ->
+ [{proxy_user, User}, {proxy_password, Passwd}]
+ end,
+ ok = check_protocol(Protocol, proxy),
+ case Protocol of
+ socks5 ->
+ [proxy_to_socks5(Param) || Param <- Params];
+ _ ->
+ Params
+ end;
+parse_proxy_params(_) ->
+ throw({error, <<"Invalid proxy url">>}).
+
+-spec proxy_to_socks5({atom(), string()}) -> {atom(), string()}.
+proxy_to_socks5({proxy_host, Val}) ->
+ {socks5_host, Val};
+proxy_to_socks5({proxy_port, Val}) ->
+ {socks5_port, Val};
+proxy_to_socks5({proxy_user, Val}) ->
+ {socks5_user, Val};
+proxy_to_socks5({proxy_password, Val}) ->
+ {socks5_password, Val}.
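+% For a socks5 proxy url the generic ibrowse proxy options are renamed, so
+% with default settings:
+%   parse_proxy_params("socks5://u:p@h") ->
+%     [{socks5_host, "h"}, {socks5_port, 1080},
+%      {socks5_user, "u"}, {socks5_password, "p"}]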
+
+-spec ssl_params([_]) -> [_].
+ssl_params(Url) ->
+ case ibrowse_lib:parse_url(Url) of
+ #url{protocol = https} ->
+ Depth = cfg_int("ssl_certificate_max_depth", 3),
+ VerifyCerts = cfg_boolean("verify_ssl_certificates", false),
+ CertFile = cfg("cert_file"),
+ KeyFile = cfg("key_file"),
+ Password = cfg("password"),
+ SslOpts = [{depth, Depth} | ssl_verify_options(VerifyCerts)],
+ SslOpts1 =
+ case CertFile /= undefined andalso KeyFile /= undefined of
+ true ->
+ case Password of
+ undefined ->
+ [{certfile, CertFile}, {keyfile, KeyFile}] ++ SslOpts;
+ _ ->
+ [
+ {certfile, CertFile},
+ {keyfile, KeyFile},
+ {password, Password}
+ ] ++ SslOpts
+ end;
+ false ->
+ SslOpts
+ end,
+ [{is_ssl, true}, {ssl_options, SslOpts1}];
+ #url{protocol = http} ->
+ []
+ end.
+
+-spec ssl_verify_options(true | false) -> [_].
+ssl_verify_options(true) ->
+ CAFile = cfg("ssl_trusted_certificates_file"),
+ [{verify, verify_peer}, {cacertfile, CAFile}];
+ssl_verify_options(false) ->
+ [{verify, verify_none}].
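+% With the default config an https endpoint therefore gets:
+%   [{is_ssl, true}, {ssl_options, [{depth, 3}, {verify, verify_none}]}]
+% Peer verification has to be opted into via verify_ssl_certificates = true
+% together with ssl_trusted_certificates_file.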
+
+get_value(Key, Props) ->
+ couch_util:get_value(Key, Props).
+
+get_value(Key, Props, Default) ->
+ couch_util:get_value(Key, Props, Default).
+
+to_binary(Val) ->
+ couch_util:to_binary(Val).
+
+get_json_value(Key, Obj) ->
+ couch_replicator_utils:get_json_value(Key, Obj).
+
+get_json_value(Key, Obj, Default) ->
+    couch_replicator_utils:get_json_value(Key, Obj, Default).
+
+-ifdef(TEST).
+
+-include_lib("couch/include/couch_eunit.hrl").
+
+check_options_pass_values_test() ->
+ ?assertEqual(check_options([]), []),
+ ?assertEqual(check_options([baz, {other, fiz}]), [baz, {other, fiz}]),
+ ?assertEqual(check_options([{doc_ids, x}]), [{doc_ids, x}]),
+ ?assertEqual(check_options([{filter, x}]), [{filter, x}]),
+ ?assertEqual(check_options([{selector, x}]), [{selector, x}]).
+
+check_options_fail_values_test() ->
+ ?assertThrow(
+ {bad_request, _},
+ check_options([{doc_ids, x}, {filter, y}])
+ ),
+ ?assertThrow(
+ {bad_request, _},
+ check_options([{doc_ids, x}, {selector, y}])
+ ),
+ ?assertThrow(
+ {bad_request, _},
+ check_options([{filter, x}, {selector, y}])
+ ),
+ ?assertThrow(
+ {bad_request, _},
+ check_options([{doc_ids, x}, {selector, y}, {filter, z}])
+ ).
+
+check_convert_options_pass_test() ->
+ ?assertEqual([], convert_options([])),
+ ?assertEqual([], convert_options([{<<"random">>, 42}])),
+ ?assertEqual(
+ [{cancel, true}],
+ convert_options([{<<"cancel">>, true}])
+ ),
+ ?assertEqual(
+ [{create_target, true}],
+ convert_options([{<<"create_target">>, true}])
+ ),
+ ?assertEqual(
+ [{winning_revs_only, true}],
+ convert_options([{<<"winning_revs_only">>, true}])
+ ),
+ ?assertEqual(
+ [{continuous, true}],
+ convert_options([{<<"continuous">>, true}])
+ ),
+ ?assertEqual(
+ [{doc_ids, [<<"id">>]}],
+ convert_options([{<<"doc_ids">>, [<<"id">>]}])
+ ),
+ ?assertEqual(
+ [{selector, {key, value}}],
+ convert_options([{<<"selector">>, {key, value}}])
+ ),
+ ?assertEqual(
+ [{query_params, {[{<<"x">>, 1}]}}],
+ convert_options([{<<"query_params">>, {[{<<"x">>, 1}]}}])
+ ).
+
+check_convert_options_fail_test() ->
+ ?assertThrow(
+ {bad_request, _},
+ convert_options([{<<"cancel">>, <<"true">>}])
+ ),
+ ?assertThrow(
+ {bad_request, _},
+ convert_options([{<<"create_target">>, <<"true">>}])
+ ),
+ ?assertThrow(
+ {bad_request, _},
+ convert_options([{<<"winning_revs_only">>, <<"foo">>}])
+ ),
+ ?assertThrow(
+ {bad_request, _},
+ convert_options([{<<"continuous">>, <<"true">>}])
+ ),
+ ?assertThrow(
+ {bad_request, _},
+ convert_options([{<<"doc_ids">>, not_a_list}])
+ ),
+ ?assertThrow(
+ {bad_request, _},
+ convert_options([{<<"selector">>, [{key, value}]}])
+ ),
+ ?assertThrow(
+ {bad_request, _},
+ convert_options([{<<"query_params">>, 42}])
+ ).
+
+local_replication_endpoint_error_test_() ->
+ {
+ foreach,
+ fun meck_config/0,
+ fun(_) -> meck:unload() end,
+ [
+ ?TDEF_FE(t_error_on_local_endpoint),
+ ?TDEF_FE(t_proxy_params_default),
+ ?TDEF_FE(t_parse_db_string),
+ ?TDEF_FE(t_parse_db_url),
+ ?TDEF_FE(t_parse_db_invalid_protocol),
+ ?TDEF_FE(t_parse_proxy_invalid_protocol),
+ ?TDEF_FE(t_parse_sock_opts),
+ ?TDEF_FE(t_parse_sock_opts_invalid)
+ ]
+ }.
+
+meck_config() ->
+ meck:expect(config, get, fun(_, _, Default) -> Default end).
+
+t_error_on_local_endpoint(_) ->
+ RepDoc =
+ {[
+ {<<"_id">>, <<"someid">>},
+ {<<"source">>, <<"localdb">>},
+ {<<"target">>, <<"http://somehost.local/tgt">>}
+ ]},
+ Expect = local_endpoints_not_supported,
+ ?assertThrow({bad_rep_doc, Expect}, parse_rep_doc_without_id(RepDoc)).
+
+t_proxy_params_default(_) ->
+ ?assertEqual(
+ [
+ {proxy_host, "foo.com"},
+ {proxy_port, 443},
+ {proxy_user, "u"},
+ {proxy_password, "p"}
+ ],
+ parse_proxy_params("https://u:p@foo.com")
+ ),
+ ?assertEqual(
+ [
+ {socks5_host, "foo.com"},
+ {socks5_port, 1080},
+ {socks5_user, "u"},
+ {socks5_password, "p"}
+ ],
+ parse_proxy_params("socks5://u:p@foo.com")
+ ).
+
+t_parse_db_string(_) ->
+ ?assertMatch(
+ #httpdb{
+ url = "http://a/",
+ proxy_url = "http://x"
+ },
+ parse_rep_db(<<"http://a">>, <<"http://x">>, [])
+ ),
+ ?assertMatch(
+ #httpdb{
+ url = "https://a/",
+ proxy_url = "https://x"
+ },
+ parse_rep_db(<<"https://a">>, <<"https://x">>, [])
+ ),
+ ?assertThrow({error, _}, parse_rep_db(<<"abc">>, <<"foo">>, [])),
+ ?assertThrow({error, _}, parse_rep_db(undefined, <<"foo">>, [])).
+
+t_parse_db_url(_) ->
+ ?assertMatch(
+ #httpdb{
+ url = "http://a?foo",
+ proxy_url = "http://x"
+ },
+ parse_rep_db({[{<<"url">>, <<"http://a?foo">>}]}, <<"http://x">>, [])
+ ),
+ ?assertMatch(
+ #httpdb{
+ url = "https://a/",
+ proxy_url = "https://x"
+ },
+ parse_rep_db({[{<<"url">>, <<"https://a">>}]}, <<"https://x">>, [])
+ ),
+ PUrl = <<"http://x">>,
+ ?assertThrow({error, _}, parse_rep_db({[{<<"url">>, <<"abc">>}]}, PUrl, [])),
+ ?assertThrow({error, _}, parse_rep_db({[{<<"url">>, <<"httpx://a">>}]}, PUrl, [])),
+ ?assertThrow({error, _}, parse_rep_db({[]}, PUrl, [])).
+
+t_parse_db_invalid_protocol(_) ->
+ MeckFun = fun
+ ("replicator", "valid_endpoint_protocols", _) -> "https";
+ (_, _, Default) -> Default
+ end,
+ meck:expect(config, get, MeckFun),
+ PUrl = <<"http://x">>,
+ ?assertMatch(
+ #httpdb{
+ url = "https://a/",
+ proxy_url = "http://x"
+ },
+ parse_rep_db({[{<<"url">>, <<"https://a">>}]}, PUrl, [])
+ ),
+ ?assertThrow({error, _}, parse_rep_db({[{<<"url">>, <<"http://a">>}]}, PUrl, [])).
+
+t_parse_proxy_invalid_protocol(_) ->
+ MeckFun = fun
+ ("replicator", "valid_proxy_protocols", _) -> "socks5";
+ (_, _, Default) -> Default
+ end,
+ meck:expect(config, get, MeckFun),
+ Url = <<"http://a">>,
+ ?assertMatch(
+ #httpdb{
+ url = "https://a/",
+ proxy_url = "socks5://x"
+ },
+ parse_rep_db({[{<<"url">>, <<"https://a">>}]}, <<"socks5://x">>, [])
+ ),
+ ?assertThrow({error, _}, parse_rep_db({[{<<"url">>, Url}]}, <<"http://x">>, [])).
+
+t_parse_sock_opts(_) ->
+ Allowed = "priority, sndbuf",
+ MeckFun = fun
+ ("replicator", "valid_socket_options", _) -> Allowed;
+ (_, _, Default) -> Default
+ end,
+ meck:expect(config, get, MeckFun),
+ RepDoc =
+ {[
+ {<<"source">>, <<"http://a">>},
+ {<<"target">>, <<"http://b/">>},
+ {<<"socket_options">>, <<"[{priority, 3}, {potato, true}, {sndbuf, 10000}]">>}
+ ]},
+ Rep = parse_rep_doc_without_id(RepDoc),
+ ?assertMatch(
+ #rep{
+ source = #httpdb{url = "http://a/"},
+ target = #httpdb{url = "http://b/"},
+ options = [{_, _} | _]
+ },
+ Rep
+ ),
+ Options = Rep#rep.options,
+ ?assertEqual(
+ [
+ {checkpoint_interval, 30000},
+ {connection_timeout, 30000},
+ {http_connections, 20},
+ {retries, 5},
+ {socket_options, [
+ {priority, 3},
+ {sndbuf, 10000}
+ ]},
+ {use_bulk_get, true},
+ {use_checkpoints, true},
+ {worker_batch_size, 500},
+ {worker_processes, 4}
+ ],
+ Options
+ ).
+
+t_parse_sock_opts_invalid(_) ->
+ ?assertEqual([], parse_sock_opts(<<"<}garbage]][">>, [], [])).
+
+-endif.
diff --git a/src/couch_replicator/src/couch_replicator_scheduler_job.erl b/src/couch_replicator/src/couch_replicator_scheduler_job.erl
index 1ba933a5e..1d3e70c5a 100644
--- a/src/couch_replicator/src/couch_replicator_scheduler_job.erl
+++ b/src/couch_replicator/src/couch_replicator_scheduler_job.erl
@@ -1118,7 +1118,7 @@ log_replication_start(#rep_state{rep_details = Rep} = RepState) ->
-ifdef(TEST).
--include_lib("eunit/include/eunit.hrl").
+-include_lib("couch/include/couch_eunit.hrl").
replication_start_error_test() ->
?assertEqual(
@@ -1144,13 +1144,26 @@ replication_start_error_test() ->
replication_start_error({http_request_failed, "GET", "http://x/y", {error, {code, 503}}})
).
-scheduler_job_format_status_test() ->
+format_status_test_() ->
+ {
+ foreach,
+ fun meck_config/0,
+ fun(_) -> meck:unload() end,
+ [
+ ?TDEF_FE(t_scheduler_job_format_status)
+ ]
+ }.
+
+meck_config() ->
+ meck:expect(config, get, fun(_, _, Default) -> Default end).
+
+t_scheduler_job_format_status(_) ->
Source = <<"http://u:p@h1/d1">>,
Target = <<"http://u:p@h2/d2">>,
Rep = #rep{
id = {"base", "+ext"},
- source = couch_replicator_docs:parse_rep_db(Source, [], []),
- target = couch_replicator_docs:parse_rep_db(Target, [], []),
+ source = couch_replicator_parse:parse_rep_db(Source, [], []),
+ target = couch_replicator_parse:parse_rep_db(Target, [], []),
options = [{create_target, true}],
doc_id = <<"mydoc">>,
db_name = <<"mydb">>
diff --git a/src/couch_replicator/src/couch_replicator_utils.erl b/src/couch_replicator/src/couch_replicator_utils.erl
index e4a2cd12f..40d188516 100644
--- a/src/couch_replicator/src/couch_replicator_utils.erl
+++ b/src/couch_replicator/src/couch_replicator_utils.erl
@@ -13,7 +13,6 @@
-module(couch_replicator_utils).
-export([
- parse_rep_doc/2,
replication_id/2,
sum_stats/2,
is_deleted/1,
@@ -95,9 +94,6 @@ replication_id(Rep, Version) ->
sum_stats(S1, S2) ->
couch_replicator_stats:sum_stats(S1, S2).
-parse_rep_doc(Props, UserCtx) ->
- couch_replicator_docs:parse_rep_doc(Props, UserCtx).
-
-spec iso8601(erlang:timestamp()) -> binary().
iso8601({_Mega, _Sec, _Micro} = Timestamp) ->
{{Y, Mon, D}, {H, Min, S}} = calendar:now_to_universal_time(Timestamp),
@@ -282,7 +278,7 @@ seq_encode(Seq) ->
-ifdef(TEST).
--include_lib("eunit/include/eunit.hrl").
+-include_lib("couch/include/couch_eunit.hrl").
remove_basic_auth_from_headers_test_() ->
[
@@ -351,7 +347,7 @@ normalize_rep_test_() ->
{<<"doc_ids">>, [<<"a">>, <<"c">>, <<"b">>]},
{<<"other_field">>, <<"some_value">>}
]},
- Rep1 = couch_replicator_docs:parse_rep_doc_without_id(EJson1),
+ Rep1 = couch_replicator_parse:parse_rep_doc_without_id(EJson1),
EJson2 =
{[
{<<"other_field">>, <<"unrelated">>},
@@ -360,7 +356,7 @@ normalize_rep_test_() ->
{<<"doc_ids">>, [<<"c">>, <<"a">>, <<"b">>]},
{<<"other_field2">>, <<"unrelated2">>}
]},
- Rep2 = couch_replicator_docs:parse_rep_doc_without_id(EJson2),
+ Rep2 = couch_replicator_parse:parse_rep_doc_without_id(EJson2),
?assertEqual(normalize_rep(Rep1), normalize_rep(Rep2))
end)
}.
diff --git a/src/couch_replicator/test/eunit/couch_replicator_compact_tests.erl b/src/couch_replicator/test/eunit/couch_replicator_compact_tests.erl
index 2d5ef96b1..df8074f1f 100644
--- a/src/couch_replicator/test/eunit/couch_replicator_compact_tests.erl
+++ b/src/couch_replicator/test/eunit/couch_replicator_compact_tests.erl
@@ -304,7 +304,7 @@ replicate(Source, Target) ->
{<<"target">>, db_url(Target)},
{<<"continuous">>, true}
]},
- {ok, Rep} = couch_replicator_utils:parse_rep_doc(RepObject, ?ADMIN_USER),
+ {ok, Rep} = couch_replicator_parse:parse_rep_doc(RepObject, ?ADMIN_USER),
ok = couch_replicator_scheduler:add_job(Rep),
couch_replicator_scheduler:reschedule(),
Pid = couch_replicator_test_helper:get_pid(Rep#rep.id),
diff --git a/src/couch_replicator/test/eunit/couch_replicator_error_reporting_tests.erl b/src/couch_replicator/test/eunit/couch_replicator_error_reporting_tests.erl
index 6bdb4ecb2..7ba6bc69d 100644
--- a/src/couch_replicator/test/eunit/couch_replicator_error_reporting_tests.erl
+++ b/src/couch_replicator/test/eunit/couch_replicator_error_reporting_tests.erl
@@ -226,7 +226,7 @@ replicate(Source, Target) ->
% Low connection timeout so _changes feed gets restarted quicker
{<<"connection_timeout">>, 3000}
]},
- {ok, Rep} = couch_replicator_utils:parse_rep_doc(RepObject, ?ADMIN_USER),
+ {ok, Rep} = couch_replicator_parse:parse_rep_doc(RepObject, ?ADMIN_USER),
ok = couch_replicator_scheduler:add_job(Rep),
couch_replicator_scheduler:reschedule(),
{ok, Rep#rep.id}.
diff --git a/src/couch_replicator/test/eunit/couch_replicator_proxy_tests.erl b/src/couch_replicator/test/eunit/couch_replicator_proxy_tests.erl
index 3468cda73..758c44f2b 100644
--- a/src/couch_replicator/test/eunit/couch_replicator_proxy_tests.erl
+++ b/src/couch_replicator/test/eunit/couch_replicator_proxy_tests.erl
@@ -50,7 +50,7 @@ parse_rep_doc_without_proxy(_) ->
{<<"source">>, <<"http://unproxied.com">>},
{<<"target">>, <<"http://otherunproxied.com">>}
]},
- Rep = couch_replicator_docs:parse_rep_doc(NoProxyDoc),
+ Rep = couch_replicator_parse:parse_rep_doc(NoProxyDoc),
?assertEqual((Rep#rep.source)#httpdb.proxy_url, undefined),
?assertEqual((Rep#rep.target)#httpdb.proxy_url, undefined).
@@ -62,7 +62,7 @@ parse_rep_doc_with_proxy(_) ->
{<<"target">>, <<"http://otherunproxied.com">>},
{<<"proxy">>, ProxyURL}
]},
- Rep = couch_replicator_docs:parse_rep_doc(ProxyDoc),
+ Rep = couch_replicator_parse:parse_rep_doc(ProxyDoc),
?assertEqual((Rep#rep.source)#httpdb.proxy_url, binary_to_list(ProxyURL)),
?assertEqual((Rep#rep.target)#httpdb.proxy_url, binary_to_list(ProxyURL)).
@@ -76,7 +76,7 @@ parse_rep_source_target_proxy(_) ->
{<<"source_proxy">>, SrcProxyURL},
{<<"target_proxy">>, TgtProxyURL}
]},
- Rep = couch_replicator_docs:parse_rep_doc(ProxyDoc),
+ Rep = couch_replicator_parse:parse_rep_doc(ProxyDoc),
?assertEqual(
(Rep#rep.source)#httpdb.proxy_url,
binary_to_list(SrcProxyURL)
@@ -96,7 +96,7 @@ mutually_exclusive_proxy_and_source_proxy(_) ->
]},
?assertThrow(
{bad_rep_doc, _},
- couch_replicator_docs:parse_rep_doc(ProxyDoc)
+ couch_replicator_parse:parse_rep_doc(ProxyDoc)
).
mutually_exclusive_proxy_and_target_proxy(_) ->
@@ -109,5 +109,5 @@ mutually_exclusive_proxy_and_target_proxy(_) ->
]},
?assertThrow(
{bad_rep_doc, _},
- couch_replicator_docs:parse_rep_doc(ProxyDoc)
+ couch_replicator_parse:parse_rep_doc(ProxyDoc)
).
diff --git a/src/couch_replicator/test/eunit/couch_replicator_retain_stats_between_job_runs.erl b/src/couch_replicator/test/eunit/couch_replicator_retain_stats_between_job_runs.erl
index 04c665af5..f413e5cf4 100644
--- a/src/couch_replicator/test/eunit/couch_replicator_retain_stats_between_job_runs.erl
+++ b/src/couch_replicator/test/eunit/couch_replicator_retain_stats_between_job_runs.erl
@@ -214,7 +214,7 @@ replicate(Source, Target) ->
{<<"target">>, db_url(Target)},
{<<"continuous">>, true}
]},
- {ok, Rep} = couch_replicator_utils:parse_rep_doc(RepObject, ?ADMIN_USER),
+ {ok, Rep} = couch_replicator_parse:parse_rep_doc(RepObject, ?ADMIN_USER),
ok = couch_replicator_scheduler:add_job(Rep),
couch_replicator_scheduler:reschedule(),
Pid = couch_replicator_test_helper:get_pid(Rep#rep.id),
diff --git a/src/couch_replicator/test/eunit/couch_replicator_test_helper.erl b/src/couch_replicator/test/eunit/couch_replicator_test_helper.erl
index f30bdb1cd..f862527f4 100644
--- a/src/couch_replicator/test/eunit/couch_replicator_test_helper.erl
+++ b/src/couch_replicator/test/eunit/couch_replicator_test_helper.erl
@@ -173,7 +173,7 @@ replicate(Source, Target) ->
).
replicate({[_ | _]} = RepObject) ->
- {ok, Rep} = couch_replicator_utils:parse_rep_doc(RepObject, ?ADMIN_USER),
+ {ok, Rep} = couch_replicator_parse:parse_rep_doc(RepObject, ?ADMIN_USER),
ok = couch_replicator_scheduler:add_job(Rep),
couch_replicator_scheduler:reschedule(),
Pid = get_pid(Rep#rep.id),
diff --git a/src/docs/src/config/replicator.rst b/src/docs/src/config/replicator.rst
index 092711450..63caca88e 100644
--- a/src/docs/src/config/replicator.rst
+++ b/src/docs/src/config/replicator.rst
@@ -152,6 +152,40 @@ Replicator Database Configuration
.. _inet: http://www.erlang.org/doc/man/inet.html#setopts-2
+ .. config:option:: valid_socket_options :: Erlang socket options
+
+ .. versionadded:: 3.3
+
+    Valid socket options. Options not in this list are ignored. Most of
+    these options are low level, and setting some of them may lead to
+    unintended or unpredictable behavior. See the `inet`_ Erlang docs for
+    the full list of options::
+
+ [replicator]
+ valid_socket_options = buffer,keepalive,nodelay,priority,recbuf,sndbuf
+
+ .. _inet: http://www.erlang.org/doc/man/inet.html#setopts-2
+
+ .. config:option:: valid_endpoint_protocols :: Replicator endpoint protocols
+
+ .. versionadded:: 3.3
+
+    Valid replication endpoint protocols. Replication jobs whose endpoint
+    URL protocol is not in this list will fail to run::
+
+ [replicator]
+ valid_endpoint_protocols = http,https
+
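+    For example, to accept only TLS-encrypted endpoints, set::
+
+        [replicator]
+        valid_endpoint_protocols = https
+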
+ .. config:option:: valid_proxy_protocols :: Replicator proxy protocols
+
+ .. versionadded:: 3.3
+
+    Valid replication proxy protocols. Replication jobs whose proxy
+    URL protocol is not in this list will fail to run::
+
+ [replicator]
+ valid_proxy_protocols = http,https,socks5
+
.. config:option:: checkpoint_interval :: Replication checkpoint interval
.. versionadded:: 1.6
diff --git a/src/fabric/src/fabric_doc_update.erl b/src/fabric/src/fabric_doc_update.erl
index 5a60dcb32..b7b9e5972 100644
--- a/src/fabric/src/fabric_doc_update.erl
+++ b/src/fabric/src/fabric_doc_update.erl
@@ -114,19 +114,29 @@ handle_message({request_entity_too_large, Entity}, _, _) ->
throw({request_entity_too_large, Entity}).
before_doc_update(DbName, Docs, Opts) ->
+    % Use the same pattern as in couch_db:validate_doc_update/3. If the document was
+    % already checked during the interactive edit, we don't want to spend time in the
+    % internal replicator revalidating everything.
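+    % (The io_priority value is read from the caller's process dictionary; internal
+    % replication requests are tagged {internal_repl, _}, matching the clause below.)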
+ UpdateType =
+ case get(io_priority) of
+ {internal_repl, _} ->
+ ?REPLICATED_CHANGES;
+ _ ->
+ ?INTERACTIVE_EDIT
+ end,
case {fabric_util:is_replicator_db(DbName), fabric_util:is_users_db(DbName)} of
{true, _} ->
%% cluster db is expensive to create so we only do it if we have to
Db = fabric_util:open_cluster_db(DbName, Opts),
[
- couch_replicator_docs:before_doc_update(Doc, Db, ?REPLICATED_CHANGES)
+ couch_replicator_docs:before_doc_update(Doc, Db, UpdateType)
|| Doc <- Docs
];
{_, true} ->
%% cluster db is expensive to create so we only do it if we have to
Db = fabric_util:open_cluster_db(DbName, Opts),
[
- couch_users_db:before_doc_update(Doc, Db, ?INTERACTIVE_EDIT)
+ couch_users_db:before_doc_update(Doc, Db, UpdateType)
|| Doc <- Docs
];
_ ->