authorNick Vatamaniuc <>2019-07-16 12:57:13 -0400
committerNick Vatamaniuc <>2019-07-31 18:59:02 -0400
commitbab8763c57ff6ca962bbb5f06e1dba39f452dcb5 (patch)
parent9ff8fa1fad512b3ecfa867db6a9c82e16446f8b0 (diff)
FDB Replicator WIP
21 files changed, 990 insertions, 1326 deletions
diff --git a/src/chttpd/src/chttpd.erl b/src/chttpd/src/chttpd.erl
index 4d32c03c5..7326bca63 100644
--- a/src/chttpd/src/chttpd.erl
+++ b/src/chttpd/src/chttpd.erl
@@ -403,20 +403,6 @@ maybe_log(_HttpReq, #httpd_resp{should_log = false}) ->
-%% HACK: replication currently handles two forms of input, #db{} style
-%% and #http_db style. We need a third that makes use of fabric. #db{}
-%% works fine for replicating the dbs and nodes database because they
-%% aren't sharded. So for now when a local db is specified as the source or
-%% the target, it's hacked to make it a full url and treated as a remote.
-possibly_hack(#httpd{path_parts=[<<"_replicate">>]}=Req) ->
- {Props0} = chttpd:json_body_obj(Req),
- Props1 = fix_uri(Req, Props0, <<"source">>),
- Props2 = fix_uri(Req, Props1, <<"target">>),
- put(post_body, {Props2}),
- Req;
-possibly_hack(Req) ->
- Req.
check_request_uri_length(Uri) ->
check_request_uri_length(Uri, config:get("httpd", "max_uri_length")).
@@ -439,53 +425,6 @@ check_url_encoding([$% | _]) ->
check_url_encoding([_ | Rest]) ->
-fix_uri(Req, Props, Type) ->
- case replication_uri(Type, Props) of
- undefined ->
- Props;
- Uri0 ->
- case is_http(Uri0) of
- true ->
- Props;
- false ->
- Uri = make_uri(Req, quote(Uri0)),
- [{Type,Uri}|proplists:delete(Type,Props)]
- end
- end.
-replication_uri(Type, PostProps) ->
- case couch_util:get_value(Type, PostProps) of
- {Props} ->
- couch_util:get_value(<<"url">>, Props);
- Else ->
- Else
- end.
-is_http(<<"http://", _/binary>>) ->
- true;
-is_http(<<"https://", _/binary>>) ->
- true;
-is_http(_) ->
- false.
-make_uri(Req, Raw) ->
- Port = integer_to_list(mochiweb_socket_server:get(chttpd, port)),
- Url = list_to_binary(["http://", config:get("httpd", "bind_address"),
- ":", Port, "/", Raw]),
- Headers = [
- {<<"authorization">>, ?l2b(header_value(Req,"authorization",""))},
- {<<"cookie">>, ?l2b(extract_cookie(Req))}
- ],
- {[{<<"url">>,Url}, {<<"headers">>,{Headers}}]}.
-extract_cookie(#httpd{mochi_req = MochiReq}) ->
- case MochiReq:get_cookie_value("AuthSession") of
- undefined ->
- "";
- AuthSession ->
- "AuthSession=" ++ AuthSession
- end.
-%%% end hack
set_auth_handlers() ->
AuthenticationDefault = "{chttpd_auth, cookie_authentication_handler},
diff --git a/src/chttpd/src/chttpd_misc.erl b/src/chttpd/src/chttpd_misc.erl
index 11d2c5b72..d832ade8c 100644
--- a/src/chttpd/src/chttpd_misc.erl
+++ b/src/chttpd/src/chttpd_misc.erl
@@ -203,8 +203,8 @@ handle_task_status_req(Req) ->
handle_replicate_req(#httpd{method='POST', user_ctx=Ctx} = Req) ->
chttpd:validate_ctype(Req, "application/json"),
%% see HACK in chttpd.erl about replication
- PostBody = get(post_body),
- case replicate(PostBody, Ctx) of
+ PostBody = chttpd:json_body_obj(Req),
+ case couch_replicator:replicate(PostBody, Ctx) of
{ok, {continuous, RepId}} ->
send_json(Req, 202, {[{ok, true}, {<<"_local_id">>, RepId}]});
{ok, {cancelled, RepId}} ->
@@ -223,50 +223,6 @@ handle_replicate_req(#httpd{method='POST', user_ctx=Ctx} = Req) ->
handle_replicate_req(Req) ->
send_method_not_allowed(Req, "POST").
-replicate({Props} = PostBody, Ctx) ->
- case couch_util:get_value(<<"cancel">>, Props) of
- true ->
- cancel_replication(PostBody, Ctx);
- _ ->
- Node = choose_node([
- couch_util:get_value(<<"source">>, Props),
- couch_util:get_value(<<"target">>, Props)
- ]),
- case rpc:call(Node, couch_replicator, replicate, [PostBody, Ctx]) of
- {badrpc, Reason} ->
- erlang:error(Reason);
- Res ->
- Res
- end
- end.
-cancel_replication(PostBody, Ctx) ->
- {Res, _Bad} = rpc:multicall(couch_replicator, replicate, [PostBody, Ctx]),
- case [X || {ok, {cancelled, _}} = X <- Res] of
- [Success|_] ->
- % Report success if at least one node canceled the replication
- Success;
- [] ->
- case lists:usort(Res) of
- [UniqueReply] ->
- % Report a universally agreed-upon reply
- UniqueReply;
- [] ->
- {error, badrpc};
- Else ->
- % Unclear what to do here -- pick the first error?
- % Except try ignoring any {error, not_found} responses
- % because we'll always get two of those
- hd(Else -- [{error, not_found}])
- end
- end.
-choose_node(Key) when is_binary(Key) ->
- Checksum = erlang:crc32(Key),
- Nodes = lists:sort([node()|erlang:nodes()]),
- lists:nth(1 + Checksum rem length(Nodes), Nodes);
-choose_node(Key) ->
- choose_node(term_to_binary(Key)).
handle_reload_query_servers_req(#httpd{method='POST'}=Req) ->
chttpd:validate_ctype(Req, "application/json"),
diff --git a/src/chttpd/test/eunit/chttpd_handlers_tests.erl b/src/chttpd/test/eunit/chttpd_handlers_tests.erl
index f3e8f5dcd..5ae80d02b 100644
--- a/src/chttpd/test/eunit/chttpd_handlers_tests.erl
+++ b/src/chttpd/test/eunit/chttpd_handlers_tests.erl
@@ -70,7 +70,8 @@ request_replicate(Url, Body) ->
Headers = [{"Content-Type", "application/json"}],
Handler = {chttpd_misc, handle_replicate_req},
request(post, Url, Headers, Body, Handler, fun(Req) ->
- chttpd:send_json(Req, 200, get(post_body))
+ PostBody = chttpd:json_body_obj(Req),
+ chttpd:send_json(Req, 200, PostBody)
request(Method, Url, Headers, Body, {M, F}, MockFun) ->
diff --git a/src/couch_jobs/src/couch_jobs.erl b/src/couch_jobs/src/couch_jobs.erl
index d469ed41a..393f2563f 100644
--- a/src/couch_jobs/src/couch_jobs.erl
+++ b/src/couch_jobs/src/couch_jobs.erl
@@ -19,6 +19,8 @@
+ get_jobs/2,
+ get_jobs/3,
% Job processing
@@ -103,6 +105,21 @@ get_job_state(Tx, Type, JobId) when is_binary(JobId) ->
+-spec get_jobs(jtx(), job_type()) -> #{}.
+get_jobs(Tx, Type) when is_binary(JobId) ->
+ couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(Tx), fun(JTx) ->
+ couch_job_fdb:get_jobs(JTx, Type)
+ end).
+-spec get_jobs(jtx(), job_type()) -> #{}.
+get_jobs(Tx, Type, Filter) when is_binary(JobId), is_function(Filter, 1) ->
+ couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(Tx), fun(JTx) ->
+ couch_job_fdb:get_jobs(JTx, Type, Filter)
+ end).
%% Job processor API
-spec accept(job_type()) -> {ok, job(), job_data()} | {error, any()}.
diff --git a/src/couch_replicator/src/couch_replicator.erl b/src/couch_replicator/src/couch_replicator.erl
index e4fa31cee..225a258b0 100644
--- a/src/couch_replicator/src/couch_replicator.erl
+++ b/src/couch_replicator/src/couch_replicator.erl
@@ -52,30 +52,27 @@
{ok, {cancelled, binary()}} |
{error, any()} |
-replicate(PostBody, Ctx) ->
- {ok, Rep0} = couch_replicator_utils:parse_rep_doc(PostBody, Ctx),
- Rep = Rep0#rep{start_time = os:timestamp()},
- #rep{id = RepId, options = Options, user_ctx = UserCtx} = Rep,
- case get_value(cancel, Options, false) of
- true ->
- CancelRepId = case get_value(id, Options, nil) of
- nil ->
- RepId;
- RepId2 ->
- RepId2
- end,
- case check_authorization(CancelRepId, UserCtx) of
- ok ->
- cancel_replication(CancelRepId);
- not_found ->
- {error, not_found}
- end;
- false ->
- check_authorization(RepId, UserCtx),
- {ok, Listener} = rep_result_listener(RepId),
- Result = do_replication_loop(Rep),
- couch_replicator_notifier:stop(Listener),
- Result
+replicate(PostBody, UserCtx) ->
+ {ok, Rep0} = couch_replicator_utils:parse_rep_doc(PostBody, UserCtx),
+ Rep = Rep0#{<<"start_time">> => erlang:system_time()},
+ #{<<"id">> := RepId, <<"options">> := Options} = Rep,
+ case maps:get(<<"cancel">>, Options, false) of
+ true ->
+ CancelRepId = case maps:get(<<"id">>, Options, nil) of
+ nil -> RepId;
+ RepId2 -> RepId2
+ end,
+ case check_authorization(CancelRepId, UserCtx) of
+ ok -> cancel_replication(CancelRepId);
+ not_found -> {error, not_found}
+ end;
+ false ->
+ check_authorization(RepId, UserCtx),
+ ok = couch_replicator_scheduler:add_job(Rep),
+ case maps:get(<<"continuous">>, Options, false) of
+ true -> {ok, {continuous, Id}};
+ false -> wait_for_result(Id)
+ end
@@ -83,57 +80,50 @@ replicate(PostBody, Ctx) ->
% it returns `ignore`.
-spec ensure_rep_db_exists() -> ignore.
ensure_rep_db_exists() ->
- {ok, _Db} = couch_replicator_docs:ensure_rep_db_exists(),
+ ok = couch_replicator_docs:ensure_rep_db_exists(),
+ couch_jobs:set_type_timeout(?REP_DOCS, ?REP_DOCS_TIMEOUT_MSEC),
+ couch_jobs:set_type_timeout(?REP_JOBS, ?REP_JOBS_TIMEOUT_MSEC),
--spec do_replication_loop(#rep{}) ->
- {ok, {continuous, binary()}} | {ok, tuple()} | {error, any()}.
-do_replication_loop(#rep{id = {BaseId, Ext} = Id, options = Options} = Rep) ->
- ok = couch_replicator_scheduler:add_job(Rep),
- case get_value(continuous, Options, false) of
- true ->
- {ok, {continuous, ?l2b(BaseId ++ Ext)}};
- false ->
- wait_for_result(Id)
- end.
--spec rep_result_listener(rep_id()) -> {ok, pid()}.
-rep_result_listener(RepId) ->
- ReplyTo = self(),
- {ok, _Listener} = couch_replicator_notifier:start_link(
- fun({_, RepId2, _} = Ev) when RepId2 =:= RepId ->
- ReplyTo ! Ev;
- (_) ->
- ok
- end).
-spec wait_for_result(rep_id()) ->
{ok, {[_]}} | {error, any()}.
wait_for_result(RepId) ->
- receive
- {finished, RepId, RepResult} ->
- {ok, RepResult};
- {error, RepId, Reason} ->
- {error, Reason}
+ FinishRes = case couch_jobs:subscribe(?REP_JOBS, RepId) of
+ {ok, finished, JobData} ->
+ {ok, JobData};
+ {ok, SubId, _, _} ->
+ case couch_jobs:wait(SubId, finished, infinity) of
+ {?REP_JOBS, RepId, finished, JobData} -> {ok, JobData};
+ timeout -> timeout
+ end;
+ {error, Error} ->
+ {error, Error}
+ end,
+ case FinishRes of
+ {ok, #{<<"finished_result">> := CheckpointHistory}} ->
+ {ok, CheckpointHistory};
+ timeout ->
+ {error, timeout};
+ {error, Error} ->
+ {error, Error}
-spec cancel_replication(rep_id()) ->
{ok, {cancelled, binary()}} | {error, not_found}.
-cancel_replication({BasedId, Extension} = RepId) ->
- FullRepId = BasedId ++ Extension,
- couch_log:notice("Canceling replication '~s' ...", [FullRepId]),
- case couch_replicator_scheduler:rep_state(RepId) of
- #rep{} ->
- ok = couch_replicator_scheduler:remove_job(RepId),
- couch_log:notice("Replication '~s' cancelled", [FullRepId]),
- {ok, {cancelled, ?l2b(FullRepId)}};
- nil ->
- couch_log:notice("Replication '~s' not found", [FullRepId]),
- {error, not_found}
+cancel_replication(RepId) when is_binary(RepId) ->
+ couch_log:notice("Canceling replication '~s' ...", [RepId]),
+ case couch_jobs:get_job_data(undefined, ?REP_JOBS, RepId) of
+ {error_not, found} ->
+ {error, not_found};
+ #{<<"rep">> := #{<<"db_name">> := null}} ->
+ couch_jobs:remove(undefined, ?REP_JOBS, RepId)
+ {ok, {cancelled, ?l2b(FullRepId)}};
+ #{<<"rep">> := #{}} ->
+ % Job was started from a replicator doc canceling via _replicate
+ % doesn't quite make sense, instead replicator should be deleted.
+ {error, not_found}
@@ -142,10 +132,10 @@ replication_states() ->
--spec strip_url_creds(binary() | {[_]}) -> binary().
+-spec strip_url_creds(binary() | #{}) -> binary().
strip_url_creds(Endpoint) ->
- case couch_replicator_docs:parse_rep_db(Endpoint, [], []) of
- #httpdb{url=Url} ->
+ case couch_replicator_docs:parse_rep_db(Endpoint, #{}, #{}) of
+ #{<<"url">> := Url} ->
LocalDb when is_binary(LocalDb) ->
@@ -286,13 +276,13 @@ state_atom(State) when is_atom(State) ->
-spec check_authorization(rep_id(), #user_ctx{}) -> ok | not_found.
check_authorization(RepId, #user_ctx{name = Name} = Ctx) ->
- case couch_replicator_scheduler:rep_state(RepId) of
- #rep{user_ctx = #user_ctx{name = Name}} ->
- ok;
- #rep{} ->
- couch_httpd:verify_is_server_admin(Ctx);
- nil ->
- not_found
+ case couch_jobs:get_job_data(undefined, ?REP_JOBS, RePid) of
+ {error_not, found} ->
+ not_found;
+ #{<<"rep">> := {<<"user">> := Name}} ->
+ ok;
+ #{} ->
+ couch_httpd:verify_is_server_admin(Ctx)
@@ -342,13 +332,6 @@ t_replication_not_found() ->
-expect_rep_user_ctx(Name, Role) ->
- meck:expect(couch_replicator_scheduler, rep_state,
- fun(_Id) ->
- UserCtx = #user_ctx{name = Name, roles = [Role]},
- #rep{user_ctx = UserCtx}
- end).
strip_url_creds_test_() ->
diff --git a/src/couch_replicator/src/couch_replicator.hrl b/src/couch_replicator/src/couch_replicator.hrl
index 2a5b7c8c8..6584078fc 100644
--- a/src/couch_replicator/src/couch_replicator.hrl
+++ b/src/couch_replicator/src/couch_replicator.hrl
@@ -11,6 +11,10 @@
% the License.
-define(REP_ID_VERSION, 4).
+-define(REP_DOCS, <<"repdocs">>).
+-define(REP_JOBS, <<"repjobs">>).
+-define(REP_DOCS_TIMEOUT_MSEC, 17000).
+-define(REP_JOBS_TIMEOUT_MSEC, 33000).
-record(rep, {
id :: rep_id() | '_' | 'undefined',
@@ -22,11 +26,12 @@
view = nil :: any() | '_',
doc_id :: any() | '_',
db_name = null :: null | binary() | '_',
- start_time = {0, 0, 0} :: erlang:timestamp() | '_',
+ start_time = 0 :: integer() | '_',
stats = couch_replicator_stats:new() :: orddict:orddict() | '_'
--type rep_id() :: {string(), string()}.
+-type rep_id() :: binary().
+-type user_name() :: binary() | null.
-type db_doc_id() :: {binary(), binary() | '_'}.
-type seconds() :: non_neg_integer().
-type rep_start_result() ::
diff --git a/src/couch_replicator/src/couch_replicator_api_wrap.erl b/src/couch_replicator/src/couch_replicator_api_wrap.erl
index ab1de7df9..5c03632f0 100644
--- a/src/couch_replicator/src/couch_replicator_api_wrap.erl
+++ b/src/couch_replicator/src/couch_replicator_api_wrap.erl
@@ -38,8 +38,8 @@
- db_uri/1,
- normalize_db/1
+ db_uri/1
+ db_from_map/1,
-import(couch_replicator_httpc, [
@@ -57,21 +57,19 @@
-define(MAX_URL_LEN, 7000).
-define(MIN_URL_LEN, 200).
-db_uri(#httpdb{url = Url}) ->
+db_uri(#{<<"url">> := Url}) ->
-db_uri(DbName) when is_binary(DbName) ->
- ?b2l(DbName);
+db_uri(#httpdb{url = Url}) ->
+ couch_util:url_strip_password(Url).
-db_uri(Db) ->
- db_uri(couch_db:name(Db)).
+db_open(#{} = Db) ->
+ db_open(Db, false, []);
-db_open(Db) ->
- db_open(Db, false, []).
-db_open(#httpdb{} = Db1, Create, CreateParams) ->
- {ok, Db} = couch_replicator_httpc:setup(Db1),
+db_open(#{} = Db0, Create, CreateParams) ->
+ {ok, Db} = couch_replicator_httpc:setup(db_from_json(Db0)),
case Create of
false ->
@@ -149,6 +147,7 @@ get_pending_count(#httpdb{} = Db, Seq) ->
{ok, couch_util:get_value(<<"pending">>, Props, null)}
get_view_info(#httpdb{} = Db, DDocId, ViewName) ->
Path = io_lib:format("~s/_view/~s/_info", [DDocId, ViewName]),
send_req(Db, [{path, Path}],
@@ -285,7 +284,6 @@ open_doc_revs(#httpdb{} = HttpDb, Id, Revs, Options, Fun, Acc) ->
open_doc_revs(RetryDb, Id, Revs, Options, Fun, Acc)
error_reason({http_request_failed, "GET", _Url, {error, timeout}}) ->
error_reason({http_request_failed, "GET", _Url, {error, {_, req_timedout}}}) ->
@@ -356,7 +354,6 @@ update_doc(#httpdb{} = HttpDb, #doc{id = DocId} = Doc, Options, Type) ->
update_docs(Db, DocList, Options) ->
update_docs(Db, DocList, Options, interactive_edit).
@@ -889,23 +886,6 @@ header_value(Key, Headers, Default) ->
-% Normalize an #httpdb{} or #db{} record such that it can be used for
-% comparisons. This means remove things like pids and also sort options / props.
-normalize_db(#httpdb{} = HttpDb) ->
- #httpdb{
- url = HttpDb#httpdb.url,
- auth_props = lists:sort(HttpDb#httpdb.auth_props),
- headers = lists:keysort(1, HttpDb#httpdb.headers),
- timeout = HttpDb#httpdb.timeout,
- ibrowse_options = lists:keysort(1, HttpDb#httpdb.ibrowse_options),
- retries = HttpDb#httpdb.retries,
- http_connections = HttpDb#httpdb.http_connections
- };
-normalize_db(<<DbName/binary>>) ->
- DbName.
maybe_append_create_query_params(Db, []) ->
@@ -914,27 +894,72 @@ maybe_append_create_query_params(Db, CreateParams) ->
Db#httpdb{url = NewUrl}.
+db_from_json(#{} = DbMap) ->
+ #{
+ <<"url">> := Url,
+ <<"auth">> := Auth,
+ <<"headers">> := Headers0,
+ <<"ibrowse_options">> := IBrowseOptions0,
+ <<"timeout">> := Timeout,
+ <<"http_connections">> := HttpConnections,
+ <<"retries">> := Retries,
+ <<"proxy_url">> := ProxyURL0
+ } = DbMap,
+ Headers = maps:fold(fun(K, V, Acc) ->
+ [{binary_to_list(K), binary_to_list(V)} | Acc]
+ end, [], Headers0),
+ IBrowseOptions0 = maps:fold(fun
+ (<<"proxy_protocol">>, V, Acc) ->
+ [{binary_to_atom(K), binary_to_existing_atom(V)} | Acc];
+ (<<"socket_options">>, #{} = SockOpts, Acc) ->
+ SockOptsKVs = maps:fold(fun sock_opts_fold/3, [], SockOpts),
+ [{socket_options, SockOptsKVs} | Acc];
+ (<<"ssl_options">>, #{} = SslOpts, Acc) ->
+ SslOptsKVs = maps:fold(fun ssl_opts_fold/3, [], SslOpts),
+ [{ssl_options, SslOptsKVs} | Acc];
+ (K, V, Acc) when is_binary(V) ->
+ [{binary_to_atom(K), binary_to_list(V)} | Acc];
+ (K, V, Acc) ->
+ [{binary_to_atom(K), V} | Acc]
+ end, [], IBrowseOptions0),
+ ProxyUrl = case ProxyUrl0 of
+ null -> undefined,
+ V when is_binary(V) -> binary_to_list(V)
+ end,
+ #httpdb{
+ url = binary_to_list(Url),
+ auth_props = maps:to_list(Auth),
+ headers = Headers,
+ ibrowse_options = IBrowseOptions,
+ timeout = Timeout,
+ http_connections = HttpConnections,
+ retries = Retries,
+ proxy_url = ProxyURL
+ }.
+% See couch_replicator_docs:ssl_params/1 for ssl parsed options
+% and
+% all latest SSL server options
+ssl_opts_fold(K, V, Acc) when is_boolean(V); is_integer(V) ->
+ [{binary_to_atom(K), V} | Acc];
+ssl_opts_fold(K, null, Acc) ->
+ [{binary_to_atom(K), undefined} | Acc];
+ssl_opts_fold(<<"verify">>, V, Acc) ->
+ [{binary_to_atom(K), binary_to_atom(V)};
-normalize_http_db_test() ->
- HttpDb = #httpdb{
- url = "http://host/db",
- auth_props = [{"key", "val"}],
- headers = [{"k2","v2"}, {"k1","v1"}],
- timeout = 30000,
- ibrowse_options = [{k2, v2}, {k1, v1}],
- retries = 10,
- http_connections = 20
- },
- Expected = HttpDb#httpdb{
- headers = [{"k1","v1"}, {"k2","v2"}],
- ibrowse_options = [{k1, v1}, {k2, v2}]
- },
- ?assertEqual(Expected, normalize_db(HttpDb)),
- ?assertEqual(<<"local">>, normalize_db(<<"local">>)).
+ssl_opts_fold(K, V, Acc) when is_list(V) ->
+ [{binary_to_atom(K), binary_to_list(V)} | Acc].
+% See ?VALID_SOCK_OPTS in couch_replicator_docs for accepted socket options
+sock_opts_fold(K, V, Acc) when is_list(V) ->
+ [{binary_to_atom(K), binary_to_atom(V)} | Acc];
+sock_opts_fold(K, V, Acc) when is_boolean(V); is_integer(V) ->
+ [{binary_to_atom(K), V} | Acc].
diff --git a/src/couch_replicator/src/couch_replicator_clustering.erl b/src/couch_replicator/src/couch_replicator_clustering.erl
deleted file mode 100644
index a7f7573b6..000000000
--- a/src/couch_replicator/src/couch_replicator_clustering.erl
+++ /dev/null
@@ -1,248 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-% Maintain cluster membership and stability notifications for replications.
-% On changes to cluster membership, broadcast events to `replication` gen_event.
-% Listeners will get `{cluster, stable}` or `{cluster, unstable}` events.
-% Cluster stability is defined as "there have been no nodes added or removed in
-% last `QuietPeriod` seconds". QuietPeriod value is configurable. To ensure a
-% speedier startup, during initialization there is a shorter StartupPeriod
-% in effect (also configurable).
-% This module is also in charge of calculating ownership of replications based
-% on where their _replicator db documents shards live.
- start_link/0
- init/1,
- terminate/2,
- handle_call/3,
- handle_info/2,
- handle_cast/2,
- code_change/3
- owner/2,
- is_stable/0,
- link_cluster_event_listener/3
-% config_listener callbacks
- handle_config_change/5,
- handle_config_terminate/3
-% mem3_cluster callbacks
- cluster_stable/1,
- cluster_unstable/1
--define(DEFAULT_QUIET_PERIOD, 60). % seconds
--define(DEFAULT_START_PERIOD, 5). % seconds
--define(RELISTEN_DELAY, 5000).
--record(state, {
- mem3_cluster_pid :: pid(),
- cluster_stable :: boolean()
--spec start_link() -> {ok, pid()} | ignore | {error, term()}.
-start_link() ->
- gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
-% owner/2 function computes ownership for a {DbName, DocId} tuple
-% `unstable` if cluster is considered to be unstable i.e. it has changed
-% recently, or returns node() which of the owner.
--spec owner(Dbname :: binary(), DocId :: binary()) -> node() | unstable.
-owner(<<"shards/", _/binary>> = DbName, DocId) ->
- case is_stable() of
- false ->
- unstable;
- true ->
- owner_int(DbName, DocId)
- end;
-owner(_DbName, _DocId) ->
- node().
--spec is_stable() -> true | false.
-is_stable() ->
- gen_server:call(?MODULE, is_stable).
--spec link_cluster_event_listener(atom(), atom(), list()) -> pid().
-link_cluster_event_listener(Mod, Fun, Args)
- when is_atom(Mod), is_atom(Fun), is_list(Args) ->
- CallbackFun =
- fun(Event = {cluster, _}) -> erlang:apply(Mod, Fun, Args ++ [Event]);
- (_) -> ok
- end,
- {ok, Pid} = couch_replicator_notifier:start_link(CallbackFun),
- Pid.
-% Mem3 cluster callbacks
-cluster_unstable(Server) ->
- ok = gen_server:call(Server, set_unstable),
- couch_replicator_notifier:notify({cluster, unstable}),
- couch_stats:update_gauge([couch_replicator, cluster_is_stable], 0),
- couch_log:notice("~s : cluster unstable", [?MODULE]),
- Server.
-cluster_stable(Server) ->
- ok = gen_server:call(Server, set_stable),
- couch_replicator_notifier:notify({cluster, stable}),
- couch_stats:update_gauge([couch_replicator, cluster_is_stable], 1),
- couch_log:notice("~s : cluster stable", [?MODULE]),
- Server.
-% gen_server callbacks
-init([]) ->
- ok = config:listen_for_changes(?MODULE, nil),
- Period = abs(config:get_integer("replicator", "cluster_quiet_period",
- StartPeriod = abs(config:get_integer("replicator", "cluster_start_period",
- couch_stats:update_gauge([couch_replicator, cluster_is_stable], 0),
- {ok, Mem3Cluster} = mem3_cluster:start_link(?MODULE, self(), StartPeriod,
- Period),
- {ok, #state{mem3_cluster_pid = Mem3Cluster, cluster_stable = false}}.
-terminate(_Reason, _State) ->
- ok.
-handle_call(is_stable, _From, #state{cluster_stable = IsStable} = State) ->
- {reply, IsStable, State};
-handle_call(set_stable, _From, State) ->
- {reply, ok, State#state{cluster_stable = true}};
-handle_call(set_unstable, _From, State) ->
- {reply, ok, State#state{cluster_stable = false}}.
-handle_cast({set_period, Period}, #state{mem3_cluster_pid = Pid} = State) ->
- ok = mem3_cluster:set_period(Pid, Period),
- {noreply, State}.
-handle_info(restart_config_listener, State) ->
- ok = config:listen_for_changes(?MODULE, nil),
- {noreply, State}.
-code_change(_OldVsn, State, _Extra) ->
- {ok, State}.
-%% Internal functions
-handle_config_change("replicator", "cluster_quiet_period", V, _, S) ->
- ok = gen_server:cast(?MODULE, {set_period, list_to_integer(V)}),
- {ok, S};
-handle_config_change(_, _, _, _, S) ->
- {ok, S}.
-handle_config_terminate(_, stop, _) -> ok;
-handle_config_terminate(_S, _R, _St) ->
- Pid = whereis(?MODULE),
- erlang:send_after(?RELISTEN_DELAY, Pid, restart_config_listener).
--spec owner_int(binary(), binary()) -> node().
-owner_int(ShardName, DocId) ->
- DbName = mem3:dbname(ShardName),
- Live = [node() | nodes()],
- Shards = mem3:shards(DbName, DocId),
- Nodes = [N || #shard{node=N} <- Shards, lists:member(N, Live)],
- mem3:owner(DbName, DocId, Nodes).
-replicator_clustering_test_() ->
- {
- foreach,
- fun setup/0,
- fun teardown/1,
- [
- t_stable_callback(),
- t_unstable_callback()
- ]
- }.
-t_stable_callback() ->
- ?_test(begin
- ?assertEqual(false, is_stable()),
- cluster_stable(whereis(?MODULE)),
- ?assertEqual(true, is_stable())
- end).
-t_unstable_callback() ->
- ?_test(begin
- cluster_stable(whereis(?MODULE)),
- ?assertEqual(true, is_stable()),
- cluster_unstable(whereis(?MODULE)),
- ?assertEqual(false, is_stable())
- end).
-setup() ->
- meck:expect(couch_log, notice, 2, ok),
- meck:expect(config, get, fun(_, _, Default) -> Default end),
- meck:expect(config, listen_for_changes, 2, ok),
- meck:expect(couch_stats, update_gauge, 2, ok),
- meck:expect(couch_replicator_notifier, notify, 1, ok),
- {ok, Pid} = start_link(),
- Pid.
-teardown(Pid) ->
- unlink(Pid),
- exit(Pid, kill),
- meck:unload().
diff --git a/src/couch_replicator/src/couch_replicator_db_changes.erl b/src/couch_replicator/src/couch_replicator_db_changes.erl
deleted file mode 100644
index 92b0222c4..000000000
--- a/src/couch_replicator/src/couch_replicator_db_changes.erl
+++ /dev/null
@@ -1,108 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
- start_link/0
- init/1,
- terminate/2,
- handle_call/3,
- handle_info/2,
- handle_cast/2,
- code_change/3
- notify_cluster_event/2
--record(state, {
- event_listener :: pid(),
- mdb_changes :: pid() | nil
--spec notify_cluster_event(pid(), {cluster, any()}) -> ok.
-notify_cluster_event(Server, {cluster, _} = Event) ->
- gen_server:cast(Server, Event).
--spec start_link() ->
- {ok, pid()} | ignore | {error, any()}.
-start_link() ->
- gen_server:start_link(?MODULE, [], []).
-init([]) ->
- EvtPid = couch_replicator_clustering:link_cluster_event_listener(?MODULE,
- notify_cluster_event, [self()]),
- State = #state{event_listener = EvtPid, mdb_changes = nil},
- case couch_replicator_clustering:is_stable() of
- true ->
- {ok, restart_mdb_changes(State)};
- false ->
- {ok, State}
- end.
-terminate(_Reason, _State) ->
- ok.
-handle_call(_Msg, _From, State) ->
- {reply, {error, invalid_call}, State}.
-handle_cast({cluster, unstable}, State) ->
- {noreply, stop_mdb_changes(State)};
-handle_cast({cluster, stable}, State) ->
- {noreply, restart_mdb_changes(State)}.
-handle_info(_Msg, State) ->
- {noreply, State}.
-code_change(_OldVsn, State, _Extra) ->
- {ok, State}.
--spec restart_mdb_changes(#state{}) -> #state{}.
-restart_mdb_changes(#state{mdb_changes = nil} = State) ->
- Suffix = <<"_replicator">>,
- CallbackMod = couch_replicator_doc_processor,
- Options = [skip_ddocs],
- {ok, Pid} = couch_multidb_changes:start_link(Suffix, CallbackMod, nil,
- Options),
- couch_stats:increment_counter([couch_replicator, db_scans]),
- couch_log:notice("Started replicator db changes listener ~p", [Pid]),
- State#state{mdb_changes = Pid};
-restart_mdb_changes(#state{mdb_changes = _Pid} = State) ->
- restart_mdb_changes(stop_mdb_changes(State)).
--spec stop_mdb_changes(#state{}) -> #state{}.
-stop_mdb_changes(#state{mdb_changes = nil} = State) ->
- State;
-stop_mdb_changes(#state{mdb_changes = Pid} = State) ->
- couch_log:notice("Stopping replicator db changes listener ~p", [Pid]),
- unlink(Pid),
- exit(Pid, kill),
- State#state{mdb_changes = nil}.
diff --git a/src/couch_replicator/src/couch_replicator_doc_processor.erl b/src/couch_replicator/src/couch_replicator_doc_processor.erl
index 772037d8d..6f4481491 100644
--- a/src/couch_replicator/src/couch_replicator_doc_processor.erl
+++ b/src/couch_replicator/src/couch_replicator_doc_processor.erl
@@ -13,7 +13,6 @@
@@ -29,10 +28,9 @@
- db_created/2,
- db_deleted/2,
- db_found/2,
- db_change/3
+ during_doc_update/3,
+ after_db_create/1,
+ after_db_delete/1
@@ -40,8 +38,7 @@
- get_worker_ref/1,
- notify_cluster_event/2
+ get_worker_ref/1
@@ -76,88 +73,103 @@
-% couch_multidb_changes API callbacks
+during_doc_update(#doc{} = Doc, Db, _UpdateType) ->
+ couch_stats:increment_counter([couch_replicator, docs, db_changes]),
+ ok = process_change(Db, Doc).
-db_created(DbName, Server) ->
+after_db_create(#{name := DbName}) ->
couch_stats:increment_counter([couch_replicator, docs, dbs_created]),
- couch_replicator_docs:ensure_rep_ddoc_exists(DbName),
- Server.
+ couch_replicator_docs:ensure_rep_ddoc_exists(DbName).
-db_deleted(DbName, Server) ->
+after_db_delete(#{name := DbName}) ->
couch_stats:increment_counter([couch_replicator, docs, dbs_deleted]),
- ok = gen_server:call(?MODULE, {clean_up_replications, DbName}, infinity),
- Server.
+ remove_replications_by_dbname(DbName).
-db_found(DbName, Server) ->
- couch_stats:increment_counter([couch_replicator, docs, dbs_found]),
- couch_replicator_docs:ensure_rep_ddoc_exists(DbName),
- Server.
+process_change(_Db, #doc{id = <<?DESIGN_DOC_PREFIX, _/binary>>}) ->
+ ok;
-db_change(DbName, {ChangeProps} = Change, Server) ->
- couch_stats:increment_counter([couch_replicator, docs, db_changes]),
- try
- ok = process_change(DbName, Change)
+process_change(#{name := DbName} = Db, #doc{deleted = true} = Doc) ->
+ Id = docs_job_id(DbName,,
+ ok = remove_replication_by_doc_job_id(Db, Id);
+process_change(#{name := DbName} = Db, #doc{} = Doc) ->
+ #doc{id = DocId, body = {Props} = Body} = Doc,
+ {Rep, RepError} = try
+ Rep0 = couch_replicator_docs:parse_rep_doc_without_id(Body),
+ Rep1 = Rep0#{
+ <<"db_name">> => DbName,
+ <<"start_time">> => erlang:system_time()
+ },
+ {Rep1, null}
- _Tag:Error ->
- {RepProps} = get_json_value(doc, ChangeProps),
- DocId = get_json_value(<<"_id">>, RepProps),
- couch_replicator_docs:update_failed(DbName, DocId, Error)
+ throw:{bad_rep_doc, Reason} ->
+ {null, couch_replicator_utils:rep_error_to_binary(Reason)}
- Server.
--spec get_worker_ref(db_doc_id()) -> reference() | nil.
-get_worker_ref({DbName, DocId}) when is_binary(DbName), is_binary(DocId) ->
- case ets:lookup(?MODULE, {DbName, DocId}) of
- [#rdoc{worker = WRef}] when is_reference(WRef) ->
- WRef;
- [#rdoc{worker = nil}] ->
- nil;
- [] ->
- nil
+ % We keep track of the doc's state in order to clear it if update_docs
+ % is toggled from true to false
+ DocState = get_json_value(<<"_replication_state">>, Props, null),
+ case couch_jobs:get_job_data(Db, ?REP_DOCS, docs_job_id(DbName, DocId)) of
+ {error, not_found} ->
+ update_replication_job(Db, DbName, DocId, Rep, RepError, DocState);
+ {ok, #{<<"rep">> := null, <<"rep_parse_error">> := RepError}}
+ when Rep =:= null ->
+ % Same error as before occurred, don't bother updating the job
+ ok;
+ {ok, #{<<"rep">> := null}} when Rep =:= null ->
+ % Error occured but it's a different error. Update the job so user
+ % sees the new error
+ update_replication_job(Db, DbName, DocId, Rep, RepError, DocState);
+ {ok, #{<<"rep">> := OldRep, <<"rep_parse_error">> := OldError}} ->
+ NormOldRep = couch_replicator_util:normalize_rep(OldRep),
+ NormRep = couch_replicator_util:normalize_rep(Rep),
+ case NormOldRep == NormRep of
+ true ->
+ % Document was changed but none of the parameters relevent
+ % for the replication job have changed, so make it a no-op
+ ok;
+ false ->
+ update_replication_job(Db, DbName, DocId, Rep, RepError,
+ DocState)
+ end
-% Cluster membership change notification callback
--spec notify_cluster_event(pid(), {cluster, any()}) -> ok.
-notify_cluster_event(Server, {cluster, _} = Event) ->
- gen_server:cast(Server, Event).
-process_change(DbName, {Change}) ->
- {RepProps} = JsonRepDoc = get_json_value(doc, Change),
- DocId = get_json_value(<<"_id">>, RepProps),
- Owner = couch_replicator_clustering:owner(DbName, DocId),
- Id = {DbName, DocId},
- case {Owner, get_json_value(deleted, Change, false)} of
- {_, true} ->
- ok = gen_server:call(?MODULE, {removed, Id}, infinity);
- {unstable, false} ->
- couch_log:notice("Not starting '~s' as cluster is unstable", [DocId]);
- {ThisNode, false} when ThisNode =:= node() ->
- case get_json_value(<<"_replication_state">>, RepProps) of
- undefined ->
- ok = process_updated(Id, JsonRepDoc);
- <<"triggered">> ->
- maybe_remove_state_fields(DbName, DocId),
- ok = process_updated(Id, JsonRepDoc);
- <<"completed">> ->
- ok = gen_server:call(?MODULE, {completed, Id}, infinity);
- <<"error">> ->
- % Handle replications started from older versions of replicator
- % which wrote transient errors to replication docs
- maybe_remove_state_fields(DbName, DocId),
- ok = process_updated(Id, JsonRepDoc);
- <<"failed">> ->
- ok
- end;
- {Owner, false} ->
- ok
+rep_docs_job_execute(#{} = Job, #{<<"rep">> := null} = JobData) ->
+ #{
+ <<"rep_parse_error">> := Error,
+ <<"db_name">> := DbName,
+ <<"doc_id">> := DocId,
+ } = JobData,
+ JobData1 = JobData#{
+ <<"finished_state">> := <<"failed">>,
+ <<"finished_result">> := Error
+ }
+ case couch_jobs:finish(undefined, Job, JobData1) of
+ ok ->
+ couch_replicator_docs:update_failed(DbName, DocId, Error),
+ ok;
+ {error, JobError} ->
+ Msg = "Replication ~s job could not finish. JobError:~p",
+ couch_log:error(Msg, [RepId, JobError]),
+ {error, JobError}
+ end;
+rep_docs_job_execute(#{} = Job, #{} = JobData) ->
+ #{<<"rep">> := Rep, <<"doc_state">> := DocState} = JobData,
+ case lists:member(DocState, [<<"triggered">>, <<"error">>]) of
+ true -> maybe_remove_state_fields(DbName, DocId),
+ false -> ok
- ok.
+ % completed jobs should finish right away
+ % otherwise start computing the replication id
+ Rep1 = update_replication_id(Rep),
+ % when done add or update the replicaton job
+ % if jobs has a filter keep checking if filter changes
maybe_remove_state_fields(DbName, DocId) ->
@@ -203,8 +215,6 @@ start_link() ->
init([]) ->
?MODULE = ets:new(?MODULE, [named_table, {keypos,},
{read_concurrency, true}, {write_concurrency, true}]),
- couch_replicator_clustering:link_cluster_event_listener(?MODULE,
- notify_cluster_event, [self()]),
{ok, nil}.
@@ -228,15 +238,6 @@ handle_call({clean_up_replications, DbName}, _From, State) ->
ok = removed_db(DbName),
{reply, ok, State}.
-handle_cast({cluster, unstable}, State) ->
- % Ignoring unstable state transition
- {noreply, State};
-handle_cast({cluster, stable}, State) ->
- % Membership changed recheck all the replication document ownership
- nil = ets:foldl(fun cluster_membership_foldl/2, nil, ?MODULE),
- {noreply, State};
handle_cast(Msg, State) ->
{stop, {error, unexpected_message, Msg}, State}.
@@ -591,21 +592,57 @@ ejson_doc_state_filter(State, States) when is_list(States), is_atom(State) ->
lists:member(State, States).
--spec cluster_membership_foldl(#rdoc{}, nil) -> nil.
-cluster_membership_foldl(#rdoc{id = {DbName, DocId} = Id, rid = RepId}, nil) ->
- case couch_replicator_clustering:owner(DbName, DocId) of
- unstable ->
- nil;
- ThisNode when ThisNode =:= node() ->
- nil;
- OtherNode ->
- Msg = "Replication doc ~p:~p with id ~p usurped by node ~p",
- couch_log:notice(Msg, [DbName, DocId, RepId, OtherNode]),
- removed_doc(Id),
- nil
+-spec update_replication(any(), binary(), binary(), #{} | null,
+ binary() | null, binary() | null) -> ok.
+update_replication_job(Tx, DbName, DocId, Rep, RepParseError, DocState) ->
+ JobId = docs_job_id(DbName, DocId),
+ ok = remove_replication_by_doc_job_id(Db, JobId),
+ RepDocsJob = #{
+ <<"rep_id">> := null,
+ <<"db_name">> := DbName,
+ <<"doc_id">> := DocId,
+ <<"rep">> := Rep,
+ <<"rep_parse_error">> := RepParseError,
+ <<"doc_state">> := DocState
+ },
+ ok = couch_jobs:add(Tx, ?REP_DOCS, RepDocsJob).
+docs_job_id(DbName, Id) when is_binary(DbName), is_binary(Id) ->
+ <<DbName/binary, "|", Id/binary>>.
+-spec remove_replication_by_doc_job_id(Tx, Id) -> ok.
+remove_replication_by_doc_job_id(Tx, Id) ->
+ case couch_jobs:get_job_data(Tx, ?REP_DOCS, Id) of
+ {error, not_found} ->
+ ok;
+ {ok, #{<<"rep_id">> := null}} ->
+ couch_jobs:remove(Tx, ?REP_DOCS, Id),
+ ok;
+ {ok, #{<<"rep_id">> := RepId}} ->
+ couch_jobs:remove(Tx, ?REP_JOBS, RepId),
+ couch_jobs:remove(Tx, ?REP_DOCS, Id),
+ ok
+-spec remove_replications_by_dbname(DbName) -> ok.
+remove_replications_by_dbname(DbName) ->
+ DbNameSize = byte_size(DbName),
+ Filter = fun
+ (<<DbName:DbNameSize/binary, "|", _, _/binary>>) -> true;
+ (_) -> false
+ end,
+ JobsMap = couch_job:get_jobs(undefined, ?REP_DOCS, Filter),
+ % Batch these into smaller transactions eventually...
+ couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(Tx), fun(JTx) ->
+ maps:map(fun(Id, _) ->
+ remove_replication_by_doc_job_id(JTx, Id)
+ end, JobsMap)
+ end).
@@ -634,8 +671,7 @@ doc_processor_test_() ->
- t_ejson_docs(),
- t_cluster_membership_foldl()
+ t_ejson_docs()
@@ -787,21 +823,6 @@ t_ejson_docs() ->
-% Check that when cluster membership changes records from doc processor and job
-% scheduler get removed
-t_cluster_membership_foldl() ->
- ?_test(begin
- mock_existing_jobs_lookup([test_rep(?R1)]),
- ?assertEqual(ok, process_change(?DB, change())),
- meck:expect(couch_replicator_clustering, owner, 2, different_node),
- ?assert(ets:member(?MODULE, {?DB, ?DOC1})),
- gen_server:cast(?MODULE, {cluster, stable}),
- meck:wait(2, couch_replicator_scheduler, find_jobs_by_doc, 2, 5000),
- ?assertNot(ets:member(?MODULE, {?DB, ?DOC1})),
- ?assert(removed_job(?R1))
- end).
get_worker_ref_test_() ->
diff --git a/src/couch_replicator/src/couch_replicator_docs.erl b/src/couch_replicator/src/couch_replicator_docs.erl
index c07caa1aa..cdc6a106b 100644
--- a/src/couch_replicator/src/couch_replicator_docs.erl
+++ b/src/couch_replicator/src/couch_replicator_docs.erl
@@ -22,12 +22,11 @@
- ensure_cluster_rep_ddoc_exists/1,
- update_triggered/2,
+ update_triggered/3,
@@ -57,6 +56,23 @@
-define(CTX, {user_ctx, #user_ctx{roles=[<<"_admin">>, <<"_replicator">>]}}).
-define(replace(L, K, V), lists:keystore(K, 1, L, {K, V})).
+-define(DEFAULT_SOCK_OPTS, "[{keepalive, true}, {nodelay, false}]").
+-define(VALID_SOCK_OPTS, [buffer, delay_send, exit_on_close, ipv6_v6only,
+ keepalive, nodelay, recbuf, send_timeout, send_timout_close, sndbuf,
+ priority, tos, tclass
+ {"worker_processes", "4", fun list_to_integer/1},
+ {"worker_batch_size", "500", fun list_to_integer/1},
+ {"http_connections", "20", fun list_to_integer/1},
+ {"connection_timeout", "30000", fun list_to_integer/1},
+ {"retries_per_request", "5", fun list_to_integer/1},
+ {"use_checkpoints", "true", fun list_to_existing_atom/1},
+ {"checkpoint_interval", "30000", fun list_to_integer/1},
+ {"socket_options", ?DEFAULT_SOCK_OPTS, fun parse_sock_opts/1}
remove_state_fields(DbName, DocId) ->
update_rep_doc(DbName, DocId, [
@@ -90,28 +106,27 @@ update_failed(DbName, DocId, Error) ->
--spec update_triggered(#rep{}, rep_id()) -> ok.
-update_triggered(Rep, {Base, Ext}) ->
- #rep{
- db_name = DbName,
- doc_id = DocId
- } = Rep,
+-spec update_triggered(binary(), binary(), binary()) -> ok.
+update_triggered(Id, DocId, DbName) ->
update_rep_doc(DbName, DocId, [
{<<"_replication_state">>, <<"triggered">>},
{<<"_replication_state_reason">>, undefined},
- {<<"_replication_id">>, iolist_to_binary([Base, Ext])},
+ {<<"_replication_id">>, Id},
{<<"_replication_stats">>, undefined}]),
--spec update_error(#rep{}, any()) -> ok.
-update_error(#rep{db_name = DbName, doc_id = DocId, id = RepId}, Error) ->
+-spec update_error(#{}, any()) -> ok.
+update_error(#rep{} = Rep, Error) ->
+ #{
+ <<"id">> := RepId0,
+ <<"db_name">> := DbName,
+ <<"doc_id">> := DocId,
+ } = Rep,
Reason = error_reason(Error),
- BinRepId = case RepId of
- {Base, Ext} ->
- iolist_to_binary([Base, Ext]);
- _Other ->
- null
+ RepId = case RepId0 of
+ Id when is_binary(Id) -> Id;
+ _Other -> null
update_rep_doc(DbName, DocId, [
{<<"_replication_state">>, <<"error">>},
@@ -121,34 +136,22 @@ update_error(#rep{db_name = DbName, doc_id = DocId, id = RepId}, Error) ->
--spec ensure_rep_db_exists() -> {ok, Db::any()}.
+-spec ensure_rep_db_exists() -> ok.
ensure_rep_db_exists() ->
- Db = case couch_db:open_int(?REP_DB_NAME, [?CTX, sys_db,
- nologifmissing]) of
- {ok, Db0} ->
- Db0;
- _Error ->
- {ok, Db0} = couch_db:create(?REP_DB_NAME, [?CTX, sys_db]),
- Db0
- end,
- ok = ensure_rep_ddoc_exists(?REP_DB_NAME),
- {ok, Db}.
--spec ensure_rep_ddoc_exists(binary()) -> ok.
-ensure_rep_ddoc_exists(RepDb) ->
- case mem3:belongs(RepDb, ?REP_DESIGN_DOC) of
- true ->
- ensure_rep_ddoc_exists(RepDb, ?REP_DESIGN_DOC);
- false ->
+ Opts = [?CTX, sys_db, nologifmissing],
+ case fabric2_db:create(?REP_DB_NAME, Opts) of
+ {error, file_exists} ->
+ ok;
+ {ok, _Db} ->
--spec ensure_rep_ddoc_exists(binary(), binary()) -> ok.
-ensure_rep_ddoc_exists(RepDb, DDocId) ->
+-spec ensure_rep_ddoc_exists(binary()) -> ok.
+ensure_rep_ddoc_exists(RepDb) ->
case open_rep_doc(RepDb, DDocId) of
- {not_found, no_db_file} ->
+ {not_found, database_does_not_exist} ->
%% database was deleted.
{not_found, _Reason} ->
@@ -179,13 +182,6 @@ ensure_rep_ddoc_exists(RepDb, DDocId) ->
--spec ensure_cluster_rep_ddoc_exists(binary()) -> ok.
-ensure_cluster_rep_ddoc_exists(RepDb) ->
- [#shard{name = DbShard} | _] = mem3:shards(RepDb, DDocId),
- ensure_rep_ddoc_exists(DbShard, DDocId).
-spec compare_ejson({[_]}, {[_]}) -> boolean().
compare_ejson(EJson1, EJson2) ->
EjsonSorted1 = couch_replicator_filters:ejsort(EJson1),
@@ -202,31 +198,10 @@ replication_design_doc_props(DDocId) ->
-% Note: parse_rep_doc can handle filtered replications. During parsing of the
-% replication doc it will make possibly remote http requests to the source
-% database. If failure or parsing of filter docs fails, parse_doc throws a
-% {filter_fetch_error, Error} excation. This exception should be considered
-% transient in respect to the contents of the document itself, since it depends
-% on netowrk availability of the source db and other factors.
--spec parse_rep_doc({[_]}) -> #rep{}.
-parse_rep_doc(RepDoc) ->
- {ok, Rep} = try
- parse_rep_doc(RepDoc, rep_user_ctx(RepDoc))
- catch
- throw:{error, Reason} ->
- throw({bad_rep_doc, Reason});
- throw:{filter_fetch_error, Reason} ->
- throw({filter_fetch_error, Reason});
- Tag:Err ->
- throw({bad_rep_doc, to_binary({Tag, Err})})
- end,
- Rep.
--spec parse_rep_doc_without_id({[_]}) -> #rep{}.
+-spec parse_rep_doc_without_id({[_]}) -> #{}.
parse_rep_doc_without_id(RepDoc) ->
{ok, Rep} = try
- parse_rep_doc_without_id(RepDoc, rep_user_ctx(RepDoc))
+ parse_rep_doc_without_id(RepDoc, rep_user_name(RepDoc))
throw:{error, Reason} ->
throw({bad_rep_doc, Reason});
@@ -236,11 +211,12 @@ parse_rep_doc_without_id(RepDoc) ->
--spec parse_rep_doc({[_]}, #user_ctx{}) -> {ok, #rep{}}.
-parse_rep_doc(Doc, UserCtx) ->
- {ok, Rep} = parse_rep_doc_without_id(Doc, UserCtx),
- Cancel = get_value(cancel, Rep#rep.options, false),
- Id = get_value(id, Rep#rep.options, nil),
+-spec parse_rep_doc({[_]}, user_name()) -> {ok, #{}}.
+parse_rep_doc({[_]} = Doc, UserName) ->
+ {ok, Rep} = parse_rep_doc_without_id(Doc, UserName),
+ #{<<"options">> := Options} = Rep,
+ Cancel = maps:get(<<"cancel">>, Options, false),
+ Id = maps:get(<<"id">>, Options, nil),
case {Cancel, Id} of
{true, nil} ->
% Cancel request with no id, must parse id out of body contents
@@ -254,38 +230,43 @@ parse_rep_doc(Doc, UserCtx) ->
--spec parse_rep_doc_without_id({[_]}, #user_ctx{}) -> {ok, #rep{}}.
-parse_rep_doc_without_id({Props}, UserCtx) ->
- Proxy = get_value(<<"proxy">>, Props, <<>>),
- Opts = make_options(Props),
- case get_value(cancel, Opts, false) andalso
- (get_value(id, Opts, nil) =/= nil) of
+-spec parse_rep_doc_without_id({[_]} | #{}, user_name()) -> {ok, #{}}.
+parse_rep_doc_without_id({[_]} = EJson, UserName) ->
+ % Normalize all field names to be binaries and turn into a map
+ parse_rep_doc_without_id(Map, UserName);
+parse_rep_doc_without_id(#{} = Doc, UserName) ->
+ Proxy = parse_proxy_params(maps:get(<<"proxy">>, Doc, <<>>)),
+ Opts = make_options(Doc),
+ Cancel = maps:get(<<"cancel">>, Opts, false),
+ Id = maps:get(<<"id">>, Opts, nil),
+ case Cancel andalso Id =/= nil of
true ->
- {ok, #rep{options = Opts, user_ctx = UserCtx}};
+ {ok, #{<<"options">> => Opts, <<"user">> => UserName}};
false ->
- Source = parse_rep_db(get_value(<<"source">>, Props), Proxy, Opts),
- Target = parse_rep_db(get_value(<<"target">>, Props), Proxy, Opts),
+ #{<<"source">> := Source0, <<"target">> := Target0} = Doc,
+ Source = parse_rep_db(Source0, Proxy, Opts),
+ Target = parse_rep_db(Target0, Proxy, Opts),
{Type, View} = case couch_replicator_filters:view_type(Props, Opts) of
- {error, Error} ->
- throw({bad_request, Error});
- Result ->
- Result
+ {error, Error} -> throw({bad_request, Error});
+ Result -> Result
- Rep = #rep{
- source = Source,
- target = Target,
- options = Opts,
- user_ctx = UserCtx,
- type = Type,
- view = View,
- doc_id = get_value(<<"_id">>, Props, null)
+ Rep = #{
+ <<"id">> => null,
+ <<"base_id">> => null,
+ <<"source">> => Source,
+ <<"target">> => Target,
+ <<"options">> => Opts,
+ <<"user">> => UserName,
+ <<"type">> => Type,
+ <<"view">> => View,
+ <<"doc_id">> => maps:get(<<"_id">>, Doc, null)
% Check if can parse filter code, if not throw exception
case couch_replicator_filters:parse(Opts) of
- {error, FilterError} ->
- throw({error, FilterError});
- {ok, _Filter} ->
- ok
+ {error, FilterError} -> throw({error, FilterError});
+ {ok, _Filter} -> ok
{ok, Rep}
@@ -295,9 +276,10 @@ parse_rep_doc_without_id({Props}, UserCtx) ->
% fetching a filter from the source db, and so it could fail intermetently.
% In case of a failure to fetch the filter this function will throw a
% `{filter_fetch_error, Reason} exception.
-update_rep_id(Rep) ->
- RepId = couch_replicator_ids:replication_id(Rep),
- Rep#rep{id = RepId}.
+update_rep_id(#{} = Rep) ->
+ {BaseId, ExtId} = couch_replicator_ids:replication_id(Rep),
+ RepId = erlang:iolist_to_binary([BaseId, ExtId]),
+ Rep#{<<"id">> => RepId, <<"base_id">> = BaseId}.
update_rep_doc(RepDbName, RepDocId, KVs) ->
@@ -350,22 +332,21 @@ update_rep_doc(RepDbName, #doc{body = {RepDocBody}} = RepDoc, KVs, _Try) ->
open_rep_doc(DbName, DocId) ->
- case couch_db:open_int(DbName, [?CTX, sys_db]) of
- {ok, Db} ->
- try
- couch_db:open_doc(Db, DocId, [ejson_body])
- after
- couch_db:close(Db)
- end;
- Else ->
- Else
+ try
+ case fabric2_db:open(DbName, [?CTX, sys_db]) of
+ {ok, Db} -> fabric2_db:open_doc(Db, DocId, [ejson_body]);
+ Else -> Else
+ end
+ catch
+ error:database_does_not_exist ->
+ {not_found, database_does_not_exist}
save_rep_doc(DbName, Doc) ->
- {ok, Db} = couch_db:open_int(DbName, [?CTX, sys_db]),
+ {ok, Db} = fabric2_db:open(DbName, [?CTX, sys_db]),
- couch_db:update_doc(Db, Doc, [])
+ fabric2_db:update_doc(Db, Doc, [])
% User can accidently write a VDU which prevents _replicator from
% updating replication documents. Avoid crashing replicator and thus
@@ -374,54 +355,56 @@ save_rep_doc(DbName, Doc) ->
Msg = "~p VDU function preventing doc update to ~s ~s ~p",
couch_log:error(Msg, [?MODULE, DbName,, Reason]),
{ok, forbidden}
- after
- couch_db:close(Db)
--spec rep_user_ctx({[_]}) -> #user_ctx{}.
-rep_user_ctx({RepDoc}) ->
+-spec rep_user_name({[_]}) -> binary() | null.
+rep_user_name({RepDoc}) ->
case get_json_value(<<"user_ctx">>, RepDoc) of
- undefined ->
- #user_ctx{};
- {UserCtx} ->
- #user_ctx{
- name = get_json_value(<<"name">>, UserCtx, null),
- roles = get_json_value(<<"roles">>, UserCtx, [])
- }
+ undefined -> null;
+ {UserCtx} -> get_json_value(<<"name">>, UserCtx, null)
--spec parse_rep_db({[_]} | binary(), binary(), [_]) -> #httpd{} | binary().
-parse_rep_db({Props}, Proxy, Options) ->
- ProxyParams = parse_proxy_params(Proxy),
+-spec parse_rep_db(#{}, #{}, #{}) -> #{}.
+parse_rep_db(#{} = Endpoint, #{} = ProxyParams, #{} = Options) ->
ProxyURL = case ProxyParams of
- [] -> undefined;
- _ -> binary_to_list(Proxy)
+ #{<<"proxy_url">> := PUrl} -> PUrl;
+ _ -> null
- Url = maybe_add_trailing_slash(get_value(<<"url">>, Props)),
- {AuthProps} = get_value(<<"auth">>, Props, {[]}),
- {BinHeaders} = get_value(<<"headers">>, Props, {[]}),
- Headers = lists:ukeysort(1, [{?b2l(K), ?b2l(V)} || {K, V} <- BinHeaders]),
- DefaultHeaders = (#httpdb{})#httpdb.headers,
- #httpdb{
- url = Url,
- auth_props = AuthProps,
- headers = lists:ukeymerge(1, Headers, DefaultHeaders),
- ibrowse_options = lists:keysort(1,
- [{socket_options, get_value(socket_options, Options)} |
- ProxyParams ++ ssl_params(Url)]),
- timeout = get_value(connection_timeout, Options),
- http_connections = get_value(http_connections, Options),
- retries = get_value(retries, Options),
- proxy_url = ProxyURL
- };
+ Url0 = maps:get(<<"url">>, Endpoint),
+ Url = maybe_add_trailing_slash(Url0),
+ AuthProps = maps:get(<<"auth">>, Endpoint, #{}),
+ Headers0 = maps:get(<<"headers">>, Endpoint, #{}),
+ DefaultHeaders = couch_replicator_utils:get_default_headers(),
+ % For same keys values in second map override those in the first
+ Headers = maps:merge(DefaultHeaders, Headers0),
+ SockOpts = maps:get(<<"socket_options">>, Options, #{}),
+ SockAndProxy = maps:merge(SockOpts, ProxyParams),
+ SslParams = ssl_params(Url),
+ #{
+ <<"url">> => Url,
+ <<"auth_props">> => AuthProps,
+ <<"headers">> => Headers,
+ <<"ibrowse_options">> => maps:merge(SslParams, SockAndProxy),
+ <<"timeout">> => maps:get(<<"timeout">>, Options),
+ <<"http_connections">> => maps:get(<<"http_connections">>, Options),
+ <<"retries">> => maps:get(<<"retries">>, Options)
+ <<"proxy_url">> => ProxyUrl
+ }.
parse_rep_db(<<"http://", _/binary>> = Url, Proxy, Options) ->
- parse_rep_db({[{<<"url">>, Url}]}, Proxy, Options);
+ parse_rep_db(#{<<"url">> => Url}, Proxy, Options);
parse_rep_db(<<"https://", _/binary>> = Url, Proxy, Options) ->
- parse_rep_db({[{<<"url">>, Url}]}, Proxy, Options);
+ parse_rep_db(#{<<"url">> => Url}, Proxy, Options);
parse_rep_db(<<_/binary>>, _Proxy, _Options) ->
throw({error, <<"Local endpoints not supported since CouchDB 3.x">>});
@@ -430,118 +413,99 @@ parse_rep_db(undefined, _Proxy, _Options) ->
throw({error, <<"Missing replicator database">>}).
--spec maybe_add_trailing_slash(binary() | list()) -> list().
+-spec maybe_add_trailing_slash(binary()) -> binary().
+maybe_add_trailing_slash(<<>>) ->
+ <<>>;
maybe_add_trailing_slash(Url) when is_binary(Url) ->
- maybe_add_trailing_slash(?b2l(Url));
-maybe_add_trailing_slash(Url) ->
- case lists:member($?, Url) of
- true ->
- Url; % skip if there are query params
- false ->
- case lists:last(Url) of
- $/ ->
- Url;
- _ ->
- Url ++ "/"
- end
+ case binary:match(Url, <<"?">>) of
+ nomatch ->
+ case binary:last(Url) of
+ $/ -> Url;
+ _ -> <<Url/binary, "/">>;
+ _ ->
+ Url % skip if there are query params
--spec make_options([_]) -> [_].
-make_options(Props) ->
- Options0 = lists:ukeysort(1, convert_options(Props)),
+-spec make_options(#{}) -> #{}.
+make_options(#{} = RepDoc) ->
+ Options0 = maps:fold(fun convert_options/3, #{}, RepDoc)
Options = check_options(Options0),
- DefWorkers = config:get("replicator", "worker_processes", "4"),
- DefBatchSize = config:get("replicator", "worker_batch_size", "500"),
- DefConns = config:get("replicator", "http_connections", "20"),
- DefTimeout = config:get("replicator", "connection_timeout", "30000"),
- DefRetries = config:get("replicator", "retries_per_request", "5"),
- UseCheckpoints = config:get("replicator", "use_checkpoints", "true"),
- DefCheckpointInterval = config:get("replicator", "checkpoint_interval",
- "30000"),
- {ok, DefSocketOptions} = couch_util:parse_term(
- config:get("replicator", "socket_options",
- "[{keepalive, true}, {nodelay, false}]")),
- lists:ukeymerge(1, Options, lists:keysort(1, [
- {connection_timeout, list_to_integer(DefTimeout)},
- {retries, list_to_integer(DefRetries)},
- {http_connections, list_to_integer(DefConns)},
- {socket_options, DefSocketOptions},
- {worker_batch_size, list_to_integer(DefBatchSize)},
- {worker_processes, list_to_integer(DefWorkers)},
- {use_checkpoints, list_to_existing_atom(UseCheckpoints)},
- {checkpoint_interval, list_to_integer(DefCheckpointInterval)}
- ])).
--spec convert_options([_]) -> [_].
- [];
-convert_options([{<<"cancel">>, V} | _R]) when not is_boolean(V)->
+ ConfigOptions = lists:foldl(fun({K, Default, ConversionFun}, Acc) ->
+ V = ConversionFun(config:get("replicator", K, Default)),
+ Acc#{list_to_binary(K) => V}
+ end, #{}, ?CONFIG_DEFAULTS),
+ maps:merge(ConfigOptions, Options).
+-spec convert_options(binary(), any(), #{}) -> #{}.
+convert_options(<<"cancel">>, V, _Acc) when not is_boolean(V)->
throw({bad_request, <<"parameter `cancel` must be a boolean">>});
-convert_options([{<<"cancel">>, V} | R]) ->
- [{cancel, V} | convert_options(R)];
-convert_options([{IdOpt, V} | R]) when IdOpt =:= <<"_local_id">>;
+convert_options(<<"cancel">>, V, Acc) ->
+ Acc#{<<"cancel">> => V};
+convert_options(IdOpt, V, Acc) when IdOpt =:= <<"_local_id">>;
IdOpt =:= <<"replication_id">>; IdOpt =:= <<"id">> ->
- [{id, couch_replicator_ids:convert(V)} | convert_options(R)];
-convert_options([{<<"create_target">>, V} | _R]) when not is_boolean(V)->
+ Acc#{<<"id">> => couch_replicator_ids:convert(V)};
+convert_options(<<"create_target">>, V, _Acc) when not is_boolean(V)->
throw({bad_request, <<"parameter `create_target` must be a boolean">>});
-convert_options([{<<"create_target">>, V} | R]) ->
- [{create_target, V} | convert_options(R)];
-convert_options([{<<"create_target_params">>, V} | _R]) when not is_tuple(V) ->
+convert_options(<<"create_target">>, V, Acc) ->
+ Acc#{<<"create_target">> => V};
+convert_options(<<"create_target_params">>, V, _Acc) when not is_tuple(V) ->
<<"parameter `create_target_params` must be a JSON object">>});
-convert_options([{<<"create_target_params">>, V} | R]) ->
- [{create_target_params, V} | convert_options(R)];
-convert_options([{<<"continuous">>, V} | _R]) when not is_boolean(V)->
+convert_options(<<"create_target_params">>, V, Acc) ->
+ Acc#{<<"create_target_params">> => V};
+convert_options(<<"continuous">>, V, Acc) when not is_boolean(V)->
throw({bad_request, <<"parameter `continuous` must be a boolean">>});
-convert_options([{<<"continuous">>, V} | R]) ->
- [{continuous, V} | convert_options(R)];
-convert_options([{<<"filter">>, V} | R]) ->
- [{filter, V} | convert_options(R)];
-convert_options([{<<"query_params">>, V} | R]) ->
- [{query_params, V} | convert_options(R)];
-convert_options([{<<"doc_ids">>, null} | R]) ->
- convert_options(R);
-convert_options([{<<"doc_ids">>, V} | _R]) when not is_list(V) ->
+convert_options(<<"continuous">>, V, Acc) ->
+ Acc#{<<"continuous">> => V};
+convert_options(<<"filter">>, V, Acc) ->
+ Acc#{<<"filter">> => V};
+convert_options(<<"query_params">>, V, Acc) ->
+ Acc#{<<"query_params">> => V};
+convert_options(<<"doc_ids">>, null, Acc) ->
+ Acc;
+convert_options(<<"doc_ids">>, V, _Acc) when not is_list(V) ->
throw({bad_request, <<"parameter `doc_ids` must be an array">>});
-convert_options([{<<"doc_ids">>, V} | R]) ->
+convert_options(<<"doc_ids">>, V, Acc) ->
% Ensure same behaviour as old replicator: accept a list of percent
% encoded doc IDs.
DocIds = lists:usort([?l2b(couch_httpd:unquote(Id)) || Id <- V]),
- [{doc_ids, DocIds} | convert_options(R)];
-convert_options([{<<"selector">>, V} | _R]) when not is_tuple(V) ->
+ Acc#{<<"doc_ids">> => DocIds};
+convert_options(<<"selector">>, V, _Acc) when not is_tuple(V) ->
throw({bad_request, <<"parameter `selector` must be a JSON object">>});
-convert_options([{<<"selector">>, V} | R]) ->
- [{selector, V} | convert_options(R)];
-convert_options([{<<"worker_processes">>, V} | R]) ->
- [{worker_processes, couch_util:to_integer(V)} | convert_options(R)];
-convert_options([{<<"worker_batch_size">>, V} | R]) ->
- [{worker_batch_size, couch_util:to_integer(V)} | convert_options(R)];
-convert_options([{<<"http_connections">>, V} | R]) ->
- [{http_connections, couch_util:to_integer(V)} | convert_options(R)];
-convert_options([{<<"connection_timeout">>, V} | R]) ->
- [{connection_timeout, couch_util:to_integer(V)} | convert_options(R)];
-convert_options([{<<"retries_per_request">>, V} | R]) ->
- [{retries, couch_util:to_integer(V)} | convert_options(R)];
-convert_options([{<<"socket_options">>, V} | R]) ->
- {ok, SocketOptions} = couch_util:parse_term(V),
- [{socket_options, SocketOptions} | convert_options(R)];
-convert_options([{<<"since_seq">>, V} | R]) ->
- [{since_seq, V} | convert_options(R)];
-convert_options([{<<"use_checkpoints">>, V} | R]) ->
- [{use_checkpoints, V} | convert_options(R)];
-convert_options([{<<"checkpoint_interval">>, V} | R]) ->
- [{checkpoint_interval, couch_util:to_integer(V)} | convert_options(R)];
-convert_options([_ | R]) -> % skip unknown option
- convert_options(R).
--spec check_options([_]) -> [_].
+convert_options(<<"selector">>, V, Acc) ->
+ Acc#{<<"selector">> => V};
+convert_options(<<"worker_processes">>, V, Acc) ->
+ Acc#{<<"worker_processes">> => couch_util:to_integer(V)};
+convert_options(<<"worker_batch_size">>, V, Acc) ->
+ Acc#{<<"worker_batch_size">> => couch_util:to_integer(V)};
+convert_options(<<"http_connections">>, V, Acc) ->
+ Acc#{<<"http_connections">> => couch_util:to_integer(V)};
+convert_options(<<"connection_timeout">>, V, Acc) ->
+ Acc#{<<"connection_timeout">> => couch_util:to_integer(V)};
+convert_options(<<"retries_per_request">>, V, Acc) ->
+ Acc#{<<"retries">> => couch_util:to_integer(V)};
+convert_options(<<"socket_options">>, V, Acc) ->
+ Acc#{<<"socket_options">> => parse_sock_opts(V)};
+convert_options(<<"since_seq">>, V, Acc) ->
+ Acc#{<<"since_seq">> => V};
+convert_options(<<"use_checkpoints">>, V, Acc) when not is_boolean(V)->
+ throw({bad_request, <<"parameter `use_checkpoints` must be a boolean">>});
+convert_options(<<"use_checkpoints">>, V, Acc) ->
+ Acc#{<<"use_checkpoints">> => V};
+convert_options(<<"checkpoint_interval">>, V, Acc) ->
+ Acc#{<<"checkpoint_interval">>, couch_util:to_integer(V)};
+convert_options(_K, _V, Acc) -> % skip unknown option
+ Acc.
+-spec check_options(#{}) -> #{}.
check_options(Options) ->
- DocIds = lists:keyfind(doc_ids, 1, Options),
- Filter = lists:keyfind(filter, 1, Options),
- Selector = lists:keyfind(selector, 1, Options),
+ DocIds = maps:is_key(<<"doc_ids">>, Options),
+ Filter = maps:is_key(<<"filter">>, Options),
+ Selector = maps:is_key(<<"selector">>, Options),
case {DocIds, Filter, Selector} of
{false, false, false} -> Options;
{false, false, _} -> Options;
@@ -553,66 +517,113 @@ check_options(Options) ->
--spec parse_proxy_params(binary() | [_]) -> [_].
-parse_proxy_params(ProxyUrl) when is_binary(ProxyUrl) ->
- parse_proxy_params(?b2l(ProxyUrl));
-parse_proxy_params([]) ->
- [];
-parse_proxy_params(ProxyUrl) ->
+parse_sock_opts(V) ->
+ {ok, SocketOptions} = couch_util:parse_term(V),
+ lists:foldl(fun
+ ({K, V}, Acc) when is_atom(K) ->
+ case lists:member(K, ?VALID_SOCKET_OPTIONS) of
+ true -> Acc#{atom_to_binary(K) => V};
+ false -> Acc
+ end;
+ (_, Acc) ->
+ Acc
+ end, #{}, SocketOptions).
+-spec parse_proxy_params(binary() | #{}) -> #{}.
+parse_proxy_params(<<>>) ->
+ #{};
+parse_proxy_params(ProxyUrl0) when is_binary(ProxyUrl0)->
+ ProxyUrl = binary_to_list(ProxyUrl0),
host = Host,
port = Port,
username = User,
password = Passwd,
- protocol = Protocol
+ protocol = Protocol0
} = ibrowse_lib:parse_url(ProxyUrl),
- [
- {proxy_protocol, Protocol},
- {proxy_host, Host},
- {proxy_port, Port}
- ] ++ case is_list(User) andalso is_list(Passwd) of
+ Protocol = atom_to_binary(Protocol, utf8),
+ case lists:member(Protocol, [<<"http">>, <<"https">>, <<"socks5">>]) of
+ true ->
+ atom_to_binary(Protocol, utf8);
false ->
- [];
+ Error = <<"Unsupported proxy protocol", Protocol/binary>>,
+ throw({bad_request, Error})
+ end,
+ ProxyParams = #{
+ <<"proxy_url">> => ProxyUrl,
+ <<"proxy_protocol">> => Protocol,
+ <<"proxy_host">> => list_to_binary(Host),
+ <<"proxy_port">> => Port
+ #},
+ case is_list(User) andalso is_list(Passwd) of
true ->
- [{proxy_user, User}, {proxy_password, Passwd}]
- end.
+ ProxyParams#{
+ <<"proxy_user">> => list_to_binary(User),
+ <<"proxy_password">> => list_to_binary(Passwd)
+ };
+ false ->
+ ProxyParams
+ end.
--spec ssl_params([_]) -> [_].
+-spec ssl_params(binary()) -> #{}.
ssl_params(Url) ->
- case ibrowse_lib:parse_url(Url) of
+ case ibrowse_lib:parse_url(binary_to_list(Url)) of
#url{protocol = https} ->
Depth = list_to_integer(
config:get("replicator", "ssl_certificate_max_depth", "3")
VerifyCerts = config:get("replicator", "verify_ssl_certificates"),
- CertFile = config:get("replicator", "cert_file", undefined),
- KeyFile = config:get("replicator", "key_file", undefined),
- Password = config:get("replicator", "password", undefined),
- SslOpts = [{depth, Depth} | ssl_verify_options(VerifyCerts =:= "true")],
- SslOpts1 = case CertFile /= undefined andalso KeyFile /= undefined of
+ CertFile = config:get("replicator", "cert_file", null),
+ KeyFile = config:get("replicator", "key_file", null),
+ Password = config:get("replicator", "password", null),
+ VerifySslOptions = ssl_verify_options(VerifyCerts =:= "true"),
+ SslOpts = maps:merge(VerifySslOptions, #{<<"depth">> => Depth}),
+ SslOpts1 = case CertFile /= null andalso KeyFile /= null of
true ->
- case Password of
- undefined ->
- [{certfile, CertFile}, {keyfile, KeyFile}] ++ SslOpts;
+ CertFileOpts = case Password of
+ null ->
+ #{
+ <<"certfile">> => list_to_binary(CertFile),
+ <<"keyfile">> => list_to_binary(KeyFile)
+ };
_ ->
- [{certfile, CertFile}, {keyfile, KeyFile},
- {password, Password}] ++ SslOpts
- end;
- false -> SslOpts
+ #{
+ <<"certfile">> => list_to_binary(CertFile),
+ <<"keyfile">> => list_to_binary(KeyFile),
+ <<"password">> => list_to_binary(Password)
+ }
+ end,
+ maps:merge(SslOpts, CertFileOpts)
+ false ->
+ SslOpts
- [{is_ssl, true}, {ssl_options, SslOpts1}];
+ #{<<"is_ssl">> => true, <<"ssl_options">> => SslOpts1};
#url{protocol = http} ->
- []
+ #{}
-spec ssl_verify_options(true | false) -> [_].
ssl_verify_options(true) ->
- CAFile = config:get("replicator", "ssl_trusted_certificates_file"),
- [{verify, verify_peer}, {cacertfile, CAFile}];
+ case config:get("replicator", "ssl_trusted_certificates_file", undefined) of
+ undefined ->
+ #{
+ <<"verify">> => <<"verify_peer">>,
+ <<"cacertfile">> => null
+ };
+ CAFile when is_list(CAFile) ->
+ #{
+ <<"verify">> => <<"verify_peer">>,
+ <<"cacertfile">> => list_to_binary(CAFile)
+ }
+ end;
ssl_verify_options(false) ->
- [{verify, verify_none}].
+ #{
+ <<"verify">> => <<"verify_none">>
+ }.
-spec before_doc_update(#doc{}, Db::any(), couch_db:update_type()) -> #doc{}.
@@ -622,7 +633,7 @@ before_doc_update(#doc{body = {Body}} = Doc, Db, _UpdateType) ->
roles = Roles,
name = Name
- } = couch_db:get_user_ctx(Db),
+ } = fabric2_db:get_user_ctx(Db),
case lists:member(<<"_replicator">>, Roles) of
true ->
@@ -633,7 +644,7 @@ before_doc_update(#doc{body = {Body}} = Doc, Db, _UpdateType) ->
Name ->
Other ->
- case (catch couch_db:check_is_admin(Db)) of
+ case (catch fabric2_db:check_is_admin(Db)) of
ok when Other =:= null ->
Doc#doc{body = {?replace(Body, ?OWNER, Name)}};
ok ->
@@ -650,8 +661,8 @@ before_doc_update(#doc{body = {Body}} = Doc, Db, _UpdateType) ->
after_doc_read(#doc{id = <<?DESIGN_DOC_PREFIX, _/binary>>} = Doc, _Db) ->
after_doc_read(#doc{body = {Body}} = Doc, Db) ->
- #user_ctx{name = Name} = couch_db:get_user_ctx(Db),
- case (catch couch_db:check_is_admin(Db)) of
+ #user_ctx{name = Name} = fabric2_db:get_user_ctx(Db),
+ case (catch fabric2_db:check_is_admin(Db)) of
ok ->
_ ->
@@ -659,16 +670,15 @@ after_doc_read(#doc{body = {Body}} = Doc, Db) ->
Name ->
_Other ->
- Source = strip_credentials(couch_util:get_value(<<"source">>,
- Target = strip_credentials(couch_util:get_value(<<"target">>,
+ Source0 = couch_util:get_value(<<"source">>, Body),
+ Target0 = couch_util:get_value(<<"target">>, Body),
+ Source = strip_credentials(Source0),
+ Target = strip_credentials(Target0),
NewBody0 = ?replace(Body, <<"source">>, Source),
NewBody = ?replace(NewBody0, <<"target">>, Target),
#doc{revs = {Pos, [_ | Revs]}} = Doc,
NewDoc = Doc#doc{body = {NewBody}, revs = {Pos - 1, Revs}},
- NewRevId = couch_db:new_revid(NewDoc),
- NewDoc#doc{revs = {Pos, [NewRevId | Revs]}}
+ fabric2_db:new_revid(NewDoc)
@@ -779,27 +789,24 @@ check_strip_credentials_test() ->
setup() ->
DbName = ?tempdb(),
- {ok, Db} = couch_db:create(DbName, [?ADMIN_CTX]),
- ok = couch_db:close(Db),
- create_vdu(DbName),
+ {ok, Db} = fabric2_db:create(DbName, [?ADMIN_CTX]),
+ create_vdu(Db),
teardown(DbName) when is_binary(DbName) ->
- couch_server:delete(DbName, [?ADMIN_CTX]),
+ fabric2_db:delete(DbName, [?ADMIN_CTX]),
-create_vdu(DbName) ->
- couch_util:with_db(DbName, fun(Db) ->
- VduFun = <<"function(newdoc, olddoc, userctx) {throw({'forbidden':'fail'})}">>,
- Doc = #doc{
- id = <<"_design/vdu">>,
- body = {[{<<"validate_doc_update">>, VduFun}]}
- },
- {ok, _} = couch_db:update_docs(Db, [Doc]),
- couch_db:ensure_full_commit(Db)
- end).
+create_vdu(Db) ->
+ VduFun = <<"function(newdoc, olddoc, userctx) {throw({'forbidden':'fail'})}">>,
+ Doc = #doc{
+ id = <<"_design/vdu">>,
+ body = {[{<<"validate_doc_update">>, VduFun}]}
+ },
+ {ok, _} = fabric2_db:update_doc(Db, [Doc]),
+ ok.
update_replicator_doc_with_bad_vdu_test_() ->
diff --git a/src/couch_replicator/src/couch_replicator_filters.erl b/src/couch_replicator/src/couch_replicator_filters.erl
index c8980001a..b14ea3475 100644
--- a/src/couch_replicator/src/couch_replicator_filters.erl
+++ b/src/couch_replicator/src/couch_replicator_filters.erl
@@ -88,22 +88,24 @@ fetch(DDocName, FilterName, Source) ->
% Get replication type and view (if any) from replication document props
--spec view_type([_], [_]) ->
- {view, {binary(), binary()}} | {db, nil} | {error, binary()}.
-view_type(Props, Options) ->
- case couch_util:get_value(<<"filter">>, Props) of
- <<"_view">> ->
- {QP} = couch_util:get_value(query_params, Options, {[]}),
- ViewParam = couch_util:get_value(<<"view">>, QP),
- case re:split(ViewParam, <<"/">>) of
- [DName, ViewName] ->
- {view, {<< "_design/", DName/binary >>, ViewName}};
- _ ->
- {error, <<"Invalid `view` parameter.">>}
- end;
+-spec view_type(#{}, [_]) ->
+ {binary(), #{}} | {error, binary()}.
+view_type(#{<<"filter">> := <<"_view">>}, Options) ->
+ {QP} = couch_util:get_value(query_params, Options, {[]}),
+ ViewParam = couch_util:get_value(<<"view">>, QP),
+ case re:split(ViewParam, <<"/">>) of
+ [DName, ViewName] ->
+ DDocMap = #{
+ <<"ddoc">> => <<"_design/",DName/binary>>,
+ <<"view">> => ViewName
+ },
+ {<<"view">>, DDocMap};
_ ->
- {db, nil}
- end.
+ {error, <<"Invalid `view` parameter.">>}
+ end;
+view_type(#{}, [_] = Options) ->
+ {<<"db">>, #{}}.
% Private functions
diff --git a/src/couch_replicator/src/couch_replicator_ids.erl b/src/couch_replicator/src/couch_replicator_ids.erl
index 04e71c3ef..a3f622046 100644
--- a/src/couch_replicator/src/couch_replicator_ids.erl
+++ b/src/couch_replicator/src/couch_replicator_ids.erl
@@ -30,28 +30,29 @@
% {filter_fetch_error, Error} exception.
-replication_id(#rep{options = Options} = Rep) ->
+replication_id(#{<<"options">> := Options} = Rep) ->
BaseId = replication_id(Rep, ?REP_ID_VERSION),
- {BaseId, maybe_append_options([continuous, create_target], Options)}.
+ UseOpts = [<<"continuous">>, <<"create_target">>]
+ {BaseId, maybe_append_options(UseOpts, Options)}.
% Versioned clauses for generating replication IDs.
% If a change is made to how replications are identified,
% please add a new clause and increase ?REP_ID_VERSION.
-replication_id(#rep{} = Rep, 4) ->
+replication_id(#{<<"source">> := Src, <<"target">> := Tgt} = Rep, 4) ->
UUID = couch_server:get_uuid(),
- SrcInfo = get_v4_endpoint(Rep#rep.source),
- TgtInfo = get_v4_endpoint(,
+ SrcInfo = get_v4_endpoint(Src),
+ TgtInfo = get_v4_endpoint(Tgt),
maybe_append_filters([UUID, SrcInfo, TgtInfo], Rep);
-replication_id(#rep{} = Rep, 3) ->
+replication_id(#{<<"source">> := Src0, <<"target">> := Tgt0} = Rep, 3) ->
UUID = couch_server:get_uuid(),
- Src = get_rep_endpoint(Rep#rep.source),
- Tgt = get_rep_endpoint(,
+ Src = get_rep_endpoint(Src0),
+ Tgt = get_rep_endpoint(Tgt0),
maybe_append_filters([UUID, Src, Tgt], Rep);
-replication_id(#rep{} = Rep, 2) ->
+replication_id(#{<<"source">> := Src0, <<"target">> := Tgt0} = Rep, 2) ->
{ok, HostName} = inet:gethostname(),
Port = case (catch mochiweb_socket_server:get(couch_httpd, port)) of
P when is_number(P) ->
@@ -64,14 +65,14 @@ replication_id(#rep{} = Rep, 2) ->
% ... mochiweb_socket_server:get(https, port)
list_to_integer(config:get("httpd", "port", "5984"))
- Src = get_rep_endpoint(Rep#rep.source),
- Tgt = get_rep_endpoint(,
+ Src = get_rep_endpoint(Src0),
+ Tgt = get_rep_endpoint(Tgt0),
maybe_append_filters([HostName, Port, Src, Tgt], Rep);
-replication_id(#rep{} = Rep, 1) ->
+replication_id(#{<<"source">> := Src0, <<"target">> := Tgt0} = Rep, 1) ->
{ok, HostName} = inet:gethostname(),
- Src = get_rep_endpoint(Rep#rep.source),
- Tgt = get_rep_endpoint(,
+ Src = get_rep_endpoint(Src0),
+ Tgt = get_rep_endpoint(Tgt0),
maybe_append_filters([HostName, Src, Tgt], Rep).
@@ -83,15 +84,23 @@ convert(Id0) when is_binary(Id0) ->
% the URL path. So undo the incorrect parsing here to avoid forcing
% users to url encode + characters.
Id = binary:replace(Id0, <<" ">>, <<"+">>, [global]),
- lists:splitwith(fun(Char) -> Char =/= $+ end, ?b2l(Id));
-convert({BaseId, Ext} = Id) when is_list(BaseId), is_list(Ext) ->
+ case binary:split(Id, <<"+">>) of
+ [BaseId, Ext] -> {BaseId, Ext};
+ [BaseId] -> {BaseId, <<>>}
+ end
+convert({BaseId, Ext}) when is_list(BaseId), is_list(Ext) ->
+ {list_to_binary(BaseId), list_to_binary(Ext)};
+convert({BaseId, Ext} = Id) when is_binary(BaseId), is_binary(Ext) ->
% Private functions
- #rep{source = Source, options = Options}) ->
+maybe_append_filters(Base, #{} = Rep) ->
+ #{
+ <<"source">> := Source,
+ <<"options">> := Options
+ } = Rep,
Base2 = Base ++
case couch_replicator_filters:parse(Options) of
{ok, nil} ->
@@ -112,7 +121,8 @@ maybe_append_filters(Base,
{error, FilterParseError} ->
throw({error, FilterParseError})
- couch_util:to_hex(couch_hash:md5_hash(term_to_binary(Base2))).
+ Res = couch_util:to_hex(couch_hash:md5_hash(term_to_binary(Base2))),
+ list_to_binary(Res).
maybe_append_options(Options, RepOptions) ->
@@ -127,12 +137,19 @@ maybe_append_options(Options, RepOptions) ->
end, [], Options).
-get_rep_endpoint(#httpdb{url=Url, headers=Headers}) ->
+get_rep_endpoint(#{<<"url">> := Url0, <<"headers">> := Headers0}) ->
+ Url = binary_to_list(Url0),
+ % We turn headers into a proplist of string() KVs to calculate
+ % the same replication ID as CouchDB 2.x
+ Headers1 = maps:fold(fun(K, V, Acc) ->
+ [{binary_to_list(K), binary_to_list(V)} | Acc]
+ end, [], Header0),
+ Headers2 = lists:keysort(1, Headers1),
DefaultHeaders = (#httpdb{})#httpdb.headers,
- {remote, Url, Headers -- DefaultHeaders}.
+ {remote, Url, Headers2 -- DefaultHeaders}.
-get_v4_endpoint(#httpdb{} = HttpDb) ->
+get_v4_endpoint(#{} = HttpDb) ->
{remote, Url, Headers} = get_rep_endpoint(HttpDb),
{{UserFromHeaders, _}, HeadersWithoutBasicAuth} =
@@ -141,7 +158,6 @@ get_v4_endpoint(#httpdb{} = HttpDb) ->
OAuth = undefined, % Keep this to ensure checkpoints don't change
{remote, User, Host, NonDefaultPort, Path, HeadersWithoutBasicAuth, OAuth}.
pick_defined_value(Values) ->
case [V || V <- Values, V /= undefined] of
[] ->
diff --git a/src/couch_replicator/src/couch_replicator_notifier.erl b/src/couch_replicator/src/couch_replicator_notifier.erl
deleted file mode 100644
index f7640a349..000000000
--- a/src/couch_replicator/src/couch_replicator_notifier.erl
+++ /dev/null
@@ -1,58 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-% public API
--export([start_link/1, stop/1, notify/1]).
-% gen_event callbacks
--export([init/1, terminate/2, code_change/3]).
--export([handle_event/2, handle_call/2, handle_info/2]).
-start_link(FunAcc) ->
- couch_event_sup:start_link(couch_replication,
- {couch_replicator_notifier, make_ref()}, FunAcc).
-notify(Event) ->
- gen_event:notify(couch_replication, Event).
-stop(Pid) ->
- couch_event_sup:stop(Pid).
-init(FunAcc) ->
- {ok, FunAcc}.
-terminate(_Reason, _State) ->
- ok.
-handle_event(Event, Fun) when is_function(Fun, 1) ->
- Fun(Event),
- {ok, Fun};
-handle_event(Event, {Fun, Acc}) when is_function(Fun, 2) ->
- Acc2 = Fun(Event, Acc),
- {ok, {Fun, Acc2}}.
-handle_call(_Msg, State) ->
- {ok, ok, State}.
-handle_info(_Msg, State) ->
- {ok, State}.
-code_change(_OldVsn, State, _Extra) ->
- {ok, State}.
diff --git a/src/couch_replicator/src/couch_replicator_scheduler_job.erl b/src/couch_replicator/src/couch_replicator_scheduler_job.erl
index 565a2bd97..e8ddc8443 100644
--- a/src/couch_replicator/src/couch_replicator_scheduler_job.erl
+++ b/src/couch_replicator/src/couch_replicator_scheduler_job.erl
@@ -15,7 +15,7 @@
- start_link/1
+ start_link/3
@@ -39,17 +39,16 @@
--import(couch_replicator_utils, [
- pp_rep_id/1
-define(LOWEST_SEQ, 0).
-record(rep_state, {
- rep_details,
+ job,
+ job_data,
+ id,
+ base_id,
@@ -73,37 +72,36 @@
stats = couch_replicator_stats:new(),
- source_monitor = nil,
- target_monitor = nil,
source_seq = nil,
use_checkpoints = true,
checkpoint_interval = ?DEFAULT_CHECKPOINT_INTERVAL,
type = db,
- view = nil
+ view = nil,
+ user = null,
+ options = #{}
-start_link(#rep{id = {BaseId, Ext}, source = Src, target = Tgt} = Rep) ->
- RepChildId = BaseId ++ Ext,
- Source = couch_replicator_api_wrap:db_uri(Src),
- Target = couch_replicator_api_wrap:db_uri(Tgt),
- ServerName = {global, {?MODULE,}},
- case gen_server:start_link(ServerName, ?MODULE, Rep, []) of
+start_link(#{] = Job, #{} = JobData) ->
+ case gen_server:start_link(?MODULE, {Job, JobData}, []) of
{ok, Pid} ->
{ok, Pid};
{error, Reason} ->
- couch_log:warning("failed to start replication `~s` (`~s` -> `~s`)",
- [RepChildId, Source, Target]),
+ #{<<"rep">> := Rep} = JobData,
+ {<<"id">> := Id, <<"source">> := Src, <<"target">> := Ttg} = Rep,
+ Source = couch_replicator_api_wrap:db_uri(Src),
+ Target = couch_replicator_api_wrap:db_uri(Tgt),
+ ErrMsg = "failed to start replication `~s` (`~s` -> `~s`)",
+ couch_log:warning(ErrMsg, [RepId, Source, Target]),
{error, Reason}
-init(InitArgs) ->
- {ok, InitArgs, 0}.
+init({#{} = Job, #{} = JobData}) ->
+ {ok, {Job, JobData}, 0}.
-do_init(#rep{options = Options, id = {BaseId, Ext}, user_ctx=UserCtx} = Rep) ->
+do_init(#{} = Job, #{} = JobData) ->
process_flag(trap_exit, true),
@@ -115,8 +113,12 @@ do_init(#rep{options = Options, id = {BaseId, Ext}, user_ctx=UserCtx} = Rep) ->
target_name = TargetName,
start_seq = {_Ts, StartSeq},
highest_seq_done = {_, HighestSeq},
- checkpoint_interval = CheckpointInterval
- } = State = init_state(Rep),
+ checkpoint_interval = CheckpointInterval,
+ user = User,
+ options = Options,
+ doc_id = DocId,
+ db_name = DbName
+ } = State = init_state(Job, JobData),
NumWorkers = get_value(worker_processes, Options),
BatchSize = get_value(worker_batch_size, Options),
@@ -147,10 +149,10 @@ do_init(#rep{options = Options, id = {BaseId, Ext}, user_ctx=UserCtx} = Rep) ->
{type, replication},
- {user,},
- {replication_id, ?l2b(BaseId ++ Ext)},
- {database, Rep#rep.db_name},
- {doc_id, Rep#rep.doc_id},
+ {user, User},
+ {replication_id,},
+ {database, DbName},
+ {doc_id, DocId},
{source, ?l2b(SourceName)},
{target, ?l2b(TargetName)},
{continuous, get_value(continuous, Options, false)},
@@ -159,16 +161,6 @@ do_init(#rep{options = Options, id = {BaseId, Ext}, user_ctx=UserCtx} = Rep) ->
] ++ rep_stats(State)),
- % Until OTP R14B03:
- %
- % Restarting a temporary supervised child implies that the original arguments
- % (#rep{} record) specified in the MFA component of the supervisor
- % child spec will always be used whenever the child is restarted.
- % This implies the same replication performance tunning parameters will
- % always be used. The solution is to delete the child spec (see
- % cancel_replication/1) and then start the replication again, but this is
- % unfortunately not immune to race conditions.
couch_log:debug("Worker pids are: ~p", [Workers]),
@@ -222,7 +214,6 @@ handle_call({report_seq_done, Seq, StatsInc}, From,
{noreply, NewState}.
handle_cast(checkpoint, State) ->
case do_checkpoint(State) of
{ok, NewState} ->
@@ -242,14 +233,6 @@ handle_cast({report_seq, Seq},
handle_info(shutdown, St) ->
{stop, shutdown, St};
-handle_info({'DOWN', Ref, _, _, Why}, #rep_state{source_monitor = Ref} = St) ->
- couch_log:error("Source database is down. Reason: ~p", [Why]),
- {stop, source_db_down, St};
-handle_info({'DOWN', Ref, _, _, Why}, #rep_state{target_monitor = Ref} = St) ->
- couch_log:error("Target database is down. Reason: ~p", [Why]),
- {stop, target_db_down, St};
handle_info({'EXIT', Pid, max_backoff}, State) ->
couch_log:error("Max backoff reached child process ~p", [Pid]),
{stop, {shutdown, max_backoff}, State};
@@ -308,9 +291,10 @@ handle_info({'EXIT', Pid, Reason}, #rep_state{workers = Workers} = State) ->
{stop, {worker_died, Pid, Reason}, State2}
-handle_info(timeout, InitArgs) ->
- try do_init(InitArgs) of {ok, State} ->
- {noreply, State}
+handle_info(timeout, {#{} = Job, #{} = JobData} = InitArgs) ->
+ try do_init(Job, JobData) of
+ {ok, State} ->
+ {noreply, State}
exit:{http_request_failed, _, _, max_backoff} ->
{stop, {shutdown, max_backoff}, {error, InitArgs}};
@@ -325,13 +309,12 @@ handle_info(timeout, InitArgs) ->
-terminate(normal, #rep_state{rep_details = #rep{id = RepId} = Rep,
- checkpoint_history = CheckpointHistory} = State) ->
- terminate_cleanup(State),
- couch_replicator_notifier:notify({finished, RepId, CheckpointHistory}),
- doc_update_completed(Rep, rep_stats(State));
+terminate(normal, #rep_state{} = State) ->
+ % Note: when terminating `normal`, the job was already marked as finished.
+ % if that fails then we'd end up in the error terminate clause
+ terminate_cleanup(State).
-terminate(shutdown, #rep_state{rep_details = #rep{id = RepId}} = State) ->
+terminate(shutdown, #rep_state{id = RepId} = State) ->
% Replication stopped via _scheduler_sup:terminate_child/1, which can be
% occur during regular scheduler operation or when job is removed from
% the scheduler.
@@ -343,53 +326,57 @@ terminate(shutdown, #rep_state{rep_details = #rep{id = RepId}} = State) ->
couch_log:error(LogMsg, [?MODULE, RepId, Error]),
- couch_replicator_notifier:notify({stopped, RepId, <<"stopped">>}),
+ finish_couch_job(State1, <<"stopped">>, null),
-terminate({shutdown, max_backoff}, {error, InitArgs}) ->
- #rep{id = {BaseId, Ext} = RepId} = InitArgs,
+terminate({shutdown, max_backoff}, {error, {#{} = Job, #{} = JobData}}) ->
+ % Here we handle the case when replication fails during initialization.
+ % That is before the #rep_state{} is even built.
+ #{<<"rep">> := #{<<"id">> := RepId}} = JobData,
couch_stats:increment_counter([couch_replicator, failed_starts]),
- couch_log:warning("Replication `~s` reached max backoff ", [BaseId ++ Ext]),
- couch_replicator_notifier:notify({error, RepId, max_backoff});
-terminate({shutdown, {error, Error}}, {error, Class, Stack, InitArgs}) ->
- #rep{
- id = {BaseId, Ext} = RepId,
- source = Source0,
- target = Target0,
- doc_id = DocId,
- db_name = DbName
- } = InitArgs,
+ couch_log:warning("Replication `~s` reached max backoff ", [RepId]),
+ finish_couch_job(Job, JobData, <<"error">>, max_backoff);
+terminate({shutdown, {error, Error}}, {error, Class, Stack, {Job, JobData}}) ->
+ % Here we handle the case when replication fails during initialization.
+ #{<<"rep">> := Rep} = JobData,
+ #{
+ <<"id">> := Id,
+ <<"source">> := Source0,
+ <<"target">> := Target0,
+ <<"doc_id">> := DocId,
+ <<"db_name">> := DbName
+ } = Rep,
Source = couch_replicator_api_wrap:db_uri(Source0),
Target = couch_replicator_api_wrap:db_uri(Target0),
- RepIdStr = BaseId ++ Ext,
Msg = "~p:~p: Replication ~s failed to start ~p -> ~p doc ~p:~p stack:~p",
- couch_log:error(Msg, [Class, Error, RepIdStr, Source, Target, DbName,
+ couch_log:error(Msg, [Class, Error, RepId, Source, Target, DbName,
DocId, Stack]),
couch_stats:increment_counter([couch_replicator, failed_starts]),
- couch_replicator_notifier:notify({error, RepId, Error});
+ finish_couch_job(Job, JobData, <<"error">>, Error);
-terminate({shutdown, max_backoff}, State) ->
+terminate({shutdown, max_backoff}, #rep_state{} = State) ->
+ id = RepId,
source_name = Source,
target_name = Target,
- rep_details = #rep{id = {BaseId, Ext} = RepId}
} = State,
couch_log:error("Replication `~s` (`~s` -> `~s`) reached max backoff",
- [BaseId ++ Ext, Source, Target]),
+ [RepId, Source, Target]),
- couch_replicator_notifier:notify({error, RepId, max_backoff});
+ finish_couch_job(State, <<"error">>, max_backoff);
terminate(Reason, State) ->
+ #rep_state{
+ id = RepId,
source_name = Source,
target_name = Target,
- rep_details = #rep{id = {BaseId, Ext} = RepId}
} = State,
couch_log:error("Replication `~s` (`~s` -> `~s`) failed: ~s",
- [BaseId ++ Ext, Source, Target, to_binary(Reason)]),
+ [RepId, Source, Target, to_binary(Reason)]),
- couch_replicator_notifier:notify({error, RepId, Reason}).
+ finish_couch_job(State, <<"error">>, Reason).
terminate_cleanup(State) ->
@@ -403,22 +390,19 @@ code_change(_OldVsn, #rep_state{}=State, _Extra) ->
format_status(_Opt, [_PDict, State]) ->
+ id = Id,
source = Source,
target = Target,
- rep_details = RepDetails,
start_seq = StartSeq,
source_seq = SourceSeq,
committed_seq = CommitedSeq,
current_through_seq = ThroughSeq,
highest_seq_done = HighestSeqDone,
- session_id = SessionId
- } = state_strip_creds(State),
- #rep{
- id = RepId,
- options = Options,
+ session_id = SessionId,
doc_id = DocId,
- db_name = DbName
- } = RepDetails,
+ db_name = DbName,
+ options = Options
+ } = state_strip_creds(State),
{rep_id, RepId},
{source, couch_replicator_api_wrap:db_uri(Source)},
@@ -462,73 +446,108 @@ httpdb_strip_creds(LocalDb) ->
-rep_strip_creds(#rep{source = Source, target = Target} = Rep) ->
- Rep#rep{
- source = httpdb_strip_creds(Source),
- target = httpdb_strip_creds(Target)
- }.
-state_strip_creds(#rep_state{rep_details = Rep, source = Source, target = Target} = State) ->
- % #rep_state contains the source and target at the top level and also
- % in the nested #rep_details record
+state_strip_creds(#rep_state{source = Source, target = Target} = State) ->
- rep_details = rep_strip_creds(Rep),
source = httpdb_strip_creds(Source),
target = httpdb_strip_creds(Target)
-adjust_maxconn(Src = #httpdb{http_connections = 1}, RepId) ->
+adjust_maxconn(Src = #{<<"http_connections">> : = 1}, RepId) ->
Msg = "Adjusting minimum number of HTTP source connections to 2 for ~p",
couch_log:notice(Msg, [RepId]),
- Src#httpdb{http_connections = 2};
+ Src#{<<"http_connections">> := 2};
adjust_maxconn(Src, _RepId) ->
--spec doc_update_triggered(#rep{}) -> ok.
-doc_update_triggered(#rep{db_name = null}) ->
+-spec doc_update_triggered(#rep_state{}) -> ok.
+doc_update_triggered(#rep_state{db_name = null}) ->
-doc_update_triggered(#rep{id = RepId, doc_id = DocId} = Rep) ->
+doc_update_triggered(#rep_state{} = State) ->
+ #rep_state{id = Id, doc_id = DocId, db_name = DbName} = State,
case couch_replicator_doc_processor:update_docs() of
true ->
- couch_replicator_docs:update_triggered(Rep, RepId);
+ couch_replicator_docs:update_triggered(Id, DocId, DbName);
false ->
- couch_log:notice("Document `~s` triggered replication `~s`",
- [DocId, pp_rep_id(RepId)]),
+ couch_log:notice("Document `~s` triggered replication `~s`", [DocId, Id]),
--spec doc_update_completed(#rep{}, list()) -> ok.
-doc_update_completed(#rep{db_name = null}, _Stats) ->
+-spec doc_update_completed(#rep_state{}) -> ok.
+doc_update_completed(#rep_state{db_name = null}) ->
-doc_update_completed(#rep{id = RepId, doc_id = DocId, db_name = DbName,
- start_time = StartTime}, Stats0) ->
- Stats = Stats0 ++ [{start_time, couch_replicator_utils:iso8601(StartTime)}],
+doc_update_completed(#rep_state{} = State) ->
+ #rep_state{
+ id = Id,
+ doc_id = DocId,
+ db_name = DbName,
+ start_time = Start,
+ stats = Stats0
+ } = State,
+ Stats = Stats0 ++ [{start_time, couch_replicator_utils:iso8601(Start)}],
couch_replicator_docs:update_doc_completed(DbName, DocId, Stats),
- couch_log:notice("Replication `~s` completed (triggered by `~s`)",
- [pp_rep_id(RepId), DocId]),
+ couch_log:notice("Replication `~s` completed (triggered by `~s:~s`)",
+ [Id, DbName, DocId]),
do_last_checkpoint(#rep_state{seqs_in_progress = [],
highest_seq_done = {_Ts, ?LOWEST_SEQ}} = State) ->
- {stop, normal, cancel_timer(State)};
+ History = State#rep_state.checkoint_history,
+ Result = case finish_couch_job(State, <<"completed">>, History) of
+ ok -> normal;
+ {error, _} = Error -> Error
+ end,
+ {stop, Result, cancel_timer(State)};
do_last_checkpoint(#rep_state{seqs_in_progress = [],
highest_seq_done = Seq} = State) ->
case do_checkpoint(State#rep_state{current_through_seq = Seq}) of
{ok, NewState} ->
couch_stats:increment_counter([couch_replicator, checkpoints, success]),
- {stop, normal, cancel_timer(NewState)};
+ History = NewState#rep_state.checkpoint_history,
+ Result = case finish_couch_job(NewState, <<"completed">>, History) of
+ ok -> normal;
+ {error, _} = Error -> Error
+ end,
+ {stop, Result, cancel_timer(NewState)};
Error ->
couch_stats:increment_counter([couch_replicator, checkpoints, failure]),
{stop, Error, State}
+finish_couch_job(#rep_state{} = State, FinishedState, Result) ->
+ #rep_state{job = Job, job_data = Jobdata} = State,
+ finish_couch_job(Job, JobData, FinishedState, Result).
+finish_couch_job(#{} = Job, #{} = JobData, FinishState, Result0) ->
+ #{<<"rep">> := #{<<"id">> := RepId}} = JobData,
+ case Result of
+ null -> null;
+ #{} -> Result0;
+ <<_/binary>> -> Result0;
+ Atom when is_atom(Atom) -> atom_to_binary(Atom, utf8)
+ Other -> couch_replicator_utils:rep_error_to_binary(Result0)
+ end,
+ JobData= JobData0#{
+ <<"finished_state">> => FinishState,
+ <<"finished_result">> => Result
+ },
+ case couch_jobs:finish(undefined, Job, JobData) of
+ ok ->
+ doc_update_completed(State),
+ ok;
+ {error, Error} ->
+ Msg = "Replication ~s job could not finish. Error:~p",
+ couch_log:error(Msg, [RepId, Error]),
+ {error, Error}
+ end.
start_timer(State) ->
After = State#rep_state.checkpoint_interval,
case timer:apply_after(After, gen_server, cast, [self(), checkpoint]) of
@@ -547,21 +566,36 @@ cancel_timer(#rep_state{timer = Timer} = State) ->
State#rep_state{timer = nil}.
-init_state(Rep) ->
- #rep{
- id = {BaseId, _Ext},
- source = Src0, target = Tgt,
- options = Options,
- type = Type, view = View,
- start_time = StartTime,
- stats = Stats
+init_state(#{} = Job, #{<<"rep">> =: Rep}} = JobData) ->
+ #{
+ <<"id">> := Id,
+ <<"base_id">> := BaseId,
+ <<"source">> := Src0,
+ <<"target">> := Tgt,
+ <<"type">> := Type,
+ <<"view">> := View,
+ <<"start_time">> := StartTime,
+ <<"stats">> := Stats,
+ <<"options">> := OptionsMap,
+ <<"user_ctx">> := UserCtx,
+ <<"db_name">> := DbName,
+ <<"doc_id">> := DocId,
} = Rep,
+ Options = maps:fold(fun(K, V, Acc) ->
+ [{binary_to_atom(K, utf8), V} | Acc]
+ end, [], OptionsMap),
% Adjust minimum number of http source connections to 2 to avoid deadlock
Src = adjust_maxconn(Src0, BaseId),
{ok, Source} = couch_replicator_api_wrap:db_open(Src),
{CreateTargetParams} = get_value(create_target_params, Options, {[]}),
{ok, Target} = couch_replicator_api_wrap:db_open(Tgt,
- get_value(create_target, Options, false), CreateTargetParams),
+ CreateTgt = get_value(create_target, Options, false),
+ CreateParams = maps:to_list(get_value(create_target_params, Options, #{}),
+ {ok, Target} = couch_replicator_api_wrap:db_open(Tgt, UserCtx, CreateTgt,
+ CreateParams),
{ok, SourceInfo} = couch_replicator_api_wrap:get_db_info(Source),
{ok, TargetInfo} = couch_replicator_api_wrap:get_db_info(Target),
@@ -576,7 +610,10 @@ init_state(Rep) ->
#doc{body={CheckpointHistory}} = SourceLog,
State = #rep_state{
- rep_details = Rep,
+ job = Job,
+ job_data = JobData,
+ id = Id,
+ base_id = BaseId,
source_name = couch_replicator_api_wrap:db_uri(Source),
target_name = couch_replicator_api_wrap:db_uri(Target),
source = Source,
@@ -592,28 +629,27 @@ init_state(Rep) ->
src_starttime = get_value(<<"instance_start_time">>, SourceInfo),
tgt_starttime = get_value(<<"instance_start_time">>, TargetInfo),
session_id = couch_uuids:random(),
- source_monitor = db_monitor(Source),
- target_monitor = db_monitor(Target),
source_seq = SourceSeq,
- use_checkpoints = get_value(use_checkpoints, Options, true),
- checkpoint_interval = get_value(checkpoint_interval, Options,
+ use_checkpoints = get_value(use_checkpoints, Options),
+ checkpoint_interval = get_value(checkpoint_interval, Options),
type = Type,
view = View,
stats = Stats
+ doc_id = DocId,
+ db_name = DbName
State#rep_state{timer = start_timer(State)}.
-find_and_migrate_logs(DbList, #rep{id = {BaseId, _}} = Rep) ->
+find_and_migrate_logs(DbList, #{<<"base_id">> := BaseId} = Rep) ->
LogId = ?l2b(?LOCAL_DOC_PREFIX ++ BaseId),
- fold_replication_logs(DbList, ?REP_ID_VERSION, LogId, LogId, Rep, []).
+ fold_replication_logs(DbList, ?REP_ID_VERSION, LogId, LogId, State, []).
fold_replication_logs([], _Vsn, _LogId, _NewId, _Rep, Acc) ->
-fold_replication_logs([Db | Rest] = Dbs, Vsn, LogId, NewId, Rep, Acc) ->
+fold_replication_logs([Db | Rest] = Dbs, Vsn, LogId, NewId, #{} = Rep, Acc) ->
case couch_replicator_api_wrap:open_doc(Db, LogId, [ejson_body]) of
{error, <<"not_found">>} when Vsn > 1 ->
OldRepId = couch_replicator_utils:replication_id(Rep, Vsn - 1),
@@ -633,8 +669,8 @@ fold_replication_logs([Db | Rest] = Dbs, Vsn, LogId, NewId, Rep, Acc) ->
-maybe_save_migrated_log(Rep, Db, #doc{} = Doc, OldId) ->
- case get_value(use_checkpoints, Rep#rep.options, true) of
+maybe_save_migrated_log(#{<<"options">> = Options}, Db, #doc{} = Doc, OldId) ->
+ case maps:get(<<"use_checkpoints">>, Options) of
true ->
update_checkpoint(Db, Doc),
Msg = "Migrated replication checkpoint. Db:~p ~p -> ~p",
@@ -697,7 +733,7 @@ do_checkpoint(State) ->
src_starttime = SrcInstanceStartTime,
tgt_starttime = TgtInstanceStartTime,
stats = Stats,
- rep_details = #rep{options = Options},
+ options = Options,
session_id = SessionId
} = State,
case commit_to_both(Source, Target) of
@@ -906,14 +942,13 @@ has_session_id(SessionId, [{Props} | Rest]) ->
db_monitor(#httpdb{}) ->
- nil;
+ nil;
db_monitor(Db) ->
- couch_db:monitor(Db).
+ couch_db:monitor(Db).
-get_pending_count(St) ->
- Rep = St#rep_state.rep_details,
- Timeout = get_value(connection_timeout, Rep#rep.options),
+get_pending_count(#rep_state{options = Options} = St) ->
+ Timeout = get_value(connection_timeout, Options),
TimeoutMicro = Timeout * 1000,
case get(pending_count_state) of
{LastUpdate, PendingCount} ->
@@ -960,8 +995,7 @@ update_task(State) ->
-update_scheduler_job_stats(#rep_state{rep_details = Rep, stats = Stats}) ->
- JobId =,
+update_scheduler_job_stats(#rep_state{id = JobId, stats = Stats}) ->
couch_replicator_scheduler:update_job_stats(JobId, Stats).
@@ -998,24 +1032,21 @@ replication_start_error(Error) ->
-log_replication_start(#rep_state{rep_details = Rep} = RepState) ->
- #rep{
- id = {BaseId, Ext},
- doc_id = DocId,
- db_name = DbName,
- options = Options
- } = Rep,
- Id = BaseId ++ Ext,
- Workers = get_value(worker_processes, Options),
- BatchSize = get_value(worker_batch_size, Options),
+log_replication_start(#rep_state{} = RepState) ->
- source_name = Source, % credentials already stripped
- target_name = Target, % credentials already stripped
- session_id = Sid
+ id = Id,
+ doc_id = DocId,
+ db_name = DbName,
+ options = Options,
+ source_name = Source,
+ target_name = Target,
+ session_id = Sid,
} = RepState,
+ Workers = get_value(worker_processes, Options),
+ BatchSize = get_value(worker_batch_size, Options),
From = case DbName of
- ShardName when is_binary(ShardName) ->
- io_lib:format("from doc ~s:~s", [mem3:dbname(ShardName), DocId]);
+ Name when is_binary(Name) ->
+ io_lib:format("from doc ~s:~s", [Name, DocId]);
_ ->
"from _replicate endpoint"
@@ -1048,14 +1079,13 @@ scheduler_job_format_status_test() ->
Target = <<"http://u:p@h2/d2">>,
Rep = #rep{
id = {"base", "+ext"},
- source = couch_replicator_docs:parse_rep_db(Source, [], []),
- target = couch_replicator_docs:parse_rep_db(Target, [], []),
+ source = couch_replicator_docs:parse_rep_db(Source, #{}, #{}),
+ target = couch_replicator_docs:parse_rep_db(Target, #{}, #{}),
options = [{create_target, true}],
doc_id = <<"mydoc">>,
db_name = <<"mydb">>
State = #rep_state{
- rep_details = Rep,
source = Rep#rep.source,
target =,
session_id = <<"a">>,
diff --git a/src/couch_replicator/src/couch_replicator_scheduler_sup.erl b/src/couch_replicator/src/couch_replicator_scheduler_sup.erl
index 8ab55f838..3ea9dff4e 100644
--- a/src/couch_replicator/src/couch_replicator_scheduler_sup.erl
+++ b/src/couch_replicator/src/couch_replicator_scheduler_sup.erl
@@ -17,7 +17,7 @@
%% public api
- start_child/1,
+ start_child/2,
@@ -37,8 +37,8 @@ start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
-start_child(#rep{} = Rep) ->
- supervisor:start_child(?MODULE, [Rep]).
+start_child(#{} = Job, #{} = Rep) ->
+ supervisor:start_child(?MODULE, [Job, Rep]).
terminate_child(Pid) ->
diff --git a/src/couch_replicator/src/couch_replicator_sup.erl b/src/couch_replicator/src/couch_replicator_sup.erl
index 5475e8f37..b86529f26 100644
--- a/src/couch_replicator/src/couch_replicator_sup.erl
+++ b/src/couch_replicator/src/couch_replicator_sup.erl
@@ -20,18 +20,6 @@ start_link() ->
init(_Args) ->
Children = [
- {couch_replication_event,
- {gen_event, start_link, [{local, couch_replication}]},
- permanent,
- brutal_kill,
- worker,
- dynamic},
- {couch_replicator_clustering,
- {couch_replicator_clustering, start_link, []},
- permanent,
- brutal_kill,
- worker,
- [couch_replicator_clustering]},
{couch_replicator_connection, start_link, []},
@@ -70,12 +58,6 @@ init(_Args) ->
- [couch_replicator]},
- {couch_replicator_db_changes,
- {couch_replicator_db_changes, start_link, []},
- permanent,
- brutal_kill,
- worker,
- [couch_multidb_changes]}
+ [couch_replicator]}
{ok, {{rest_for_one,10,1}, Children}}.
diff --git a/src/couch_replicator/src/couch_replicator_utils.erl b/src/couch_replicator/src/couch_replicator_utils.erl
index ccf241324..b71ffeb46 100644
--- a/src/couch_replicator/src/couch_replicator_utils.erl
+++ b/src/couch_replicator/src/couch_replicator_utils.erl
@@ -20,11 +20,11 @@
- pp_rep_id/1,
- normalize_rep/1
+ normalize_rep/1,
+ default_headers_map/0
@@ -74,14 +74,6 @@ get_json_value(Key, Props, Default) when is_binary(Key) ->
-% pretty-print replication id
--spec pp_rep_id(#rep{} | rep_id()) -> string().
-pp_rep_id(#rep{id = RepId}) ->
- pp_rep_id(RepId);
-pp_rep_id({Base, Extension}) ->
- Base ++ Extension.
% NV: TODO: this function is not used outside api wrap module
% consider moving it there during final cleanup
is_deleted(Change) ->
@@ -102,8 +94,13 @@ parse_rep_doc(Props, UserCtx) ->
couch_replicator_docs:parse_rep_doc(Props, UserCtx).
--spec iso8601(erlang:timestamp()) -> binary().
-iso8601({_Mega, _Sec, _Micro} = Timestamp) ->
+-spec iso8601(integer()) -> binary().
+iso8601(Native) when is_integer(Native) ->
+ ErlangSystemTime = erlang:convert_time_unit(Native, native, microsecond),
+ MegaSecs = ErlangSystemTime div 1000000000000,
+ Secs = ErlangSystemTime div 1000000 - MegaSecs * 1000000,
+ MicroSecs = ErlangSystemTime rem 1000000,
+ {MegaSecs, Secs, MicroSecs}.
{{Y, Mon, D}, {H, Min, S}} = calendar:now_to_universal_time(Timestamp),
Format = "~B-~2..0B-~2..0BT~2..0B:~2..0B:~2..0BZ",
iolist_to_binary(io_lib:format(Format, [Y, Mon, D, H, Min, S])).
@@ -157,25 +154,39 @@ decode_basic_creds(Base64) ->
-% Normalize a #rep{} record such that it doesn't contain time dependent fields
+% Normalize a rep map such that it doesn't contain time dependent fields
% pids (like httpc pools), and options / props are sorted. This function would
% used during comparisons.
--spec normalize_rep(#rep{} | nil) -> #rep{} | nil.
-normalize_rep(nil) ->
- nil;
-normalize_rep(#rep{} = Rep)->
- #rep{
- source = couch_replicator_api_wrap:normalize_db(Rep#rep.source),
- target = couch_replicator_api_wrap:normalize_db(,
- options = Rep#rep.options, % already sorted in make_options/1
- type = Rep#rep.type,
- view = Rep#rep.view,
- doc_id = Rep#rep.doc_id,
- db_name = Rep#rep.db_name
+-spec normalize_rep(#{} | null) -> #{} | null.
+normalize_rep(null) ->
+ null;
+normalize_rep(#{} = Rep)->
+ Ks = [<<"options">>, <<"type">>, <<"view">>, <<"doc_id">>, <<"db_name">>],
+ Rep1 = maps:with(Ks, Rep),
+ #{<<"source">> := Source, <<"target">> := Target} = Rep,
+ Rep1#{
+ <<"source">> => normalize_endpoint(Source),
+ <<"target">> => normalize_endpoint(Target)
+normalize_endpoint(<<DbName/binary>>) ->
+ DbName;
+normalize_endpoint(#{} = Endpoint) ->
+ Ks = [<<"url">>, <<"auth_props">>, <<"headers">>, <<"timeout">>,
+ <<"ibrowse_options">>, <<"retries">>, <<"http_connections">>
+ ],
+ maps:with(Ks, Endpoint).
+get_default_headers() ->
+ lists:foldl(fun({K, V}, Acc) ->
+ Acc#{list_to_binary(K) => list_to_binary(V)}
+ end, #{}, (#httpdb{})#httpdb.headers).
@@ -254,4 +265,23 @@ normalize_rep_test_() ->
+normalize_endpoint() ->
+ HttpDb = #httpdb{
+ url = "http://host/db",
+ auth_props = [{"key", "val"}],
+ headers = [{"k2","v2"}, {"k1","v1"}],
+ timeout = 30000,
+ ibrowse_options = [{k2, v2}, {k1, v1}],
+ retries = 10,
+ http_connections = 20
+ },
+ Expected = HttpDb#httpdb{
+ headers = [{"k1","v1"}, {"k2","v2"}],
+ ibrowse_options = [{k1, v1}, {k2, v2}]
+ },
+ ?assertEqual(Expected, normalize_db(HttpDb)),
+ ?assertEqual(<<"local">>, normalize_db(<<"local">>)).
diff --git a/src/couch_replicator/test/eunit/couch_replicator_proxy_tests.erl b/src/couch_replicator/test/eunit/couch_replicator_proxy_tests.erl
index 4f545bcb5..5fb922a3e 100644
--- a/src/couch_replicator/test/eunit/couch_replicator_proxy_tests.erl
+++ b/src/couch_replicator/test/eunit/couch_replicator_proxy_tests.erl
@@ -49,7 +49,7 @@ parse_rep_doc_without_proxy(_) ->
{<<"source">>, <<"">>},
{<<"target">>, <<"">>}
- Rep = couch_replicator_docs:parse_rep_doc(NoProxyDoc),
+ Rep = couch_replicator_docs:parse_rep_doc_without_id(NoProxyDoc),
?assertEqual((Rep#rep.source)#httpdb.proxy_url, undefined),
?assertEqual((, undefined)
@@ -63,7 +63,7 @@ parse_rep_doc_with_proxy(_) ->
{<<"target">>, <<"">>},
{<<"proxy">>, ProxyURL}
- Rep = couch_replicator_docs:parse_rep_doc(ProxyDoc),
+ Rep = couch_replicator_docs:parse_rep_doc_without_id(ProxyDoc),
?assertEqual((Rep#rep.source)#httpdb.proxy_url, binary_to_list(ProxyURL)),
?assertEqual((, binary_to_list(ProxyURL))
diff --git a/src/fabric/src/fabric2_db.erl b/src/fabric/src/fabric2_db.erl
index c926da9e0..9ec9f2bcf 100644
--- a/src/fabric/src/fabric2_db.erl
+++ b/src/fabric/src/fabric2_db.erl
@@ -29,6 +29,9 @@
+ get_during_doc_update_fun/1,
+ get_after_db_create_fun/1,
+ get_after_db_delete_fun/1,
@@ -155,7 +158,9 @@ create(DbName, Options) ->
#{} = Db0 ->
Db1 = maybe_add_sys_db_callbacks(Db0),
ok = fabric2_server:store(Db1),
- {ok, Db1#{tx := undefined}};
+ Db2 = Db1#{tx := undefined},
+ ok = apply_after_db_create(Db2),
+ {ok, Db2};
Error ->
@@ -188,6 +193,7 @@ delete(DbName, Options) ->
if Resp /= ok -> Resp; true ->
+ ok = apply_after_db_delete(Db#{tx := undefined}),
@@ -264,6 +270,19 @@ get_after_doc_read_fun(#{after_doc_read := AfterDocRead}) ->
get_before_doc_update_fun(#{before_doc_update := BeforeDocUpdate}) ->
+get_during_doc_update_fun(#{during_doc_update := DuringDocUpdate}) ->
+ DuringDocUpdate.
+get_after_db_create_fun(#{after_db_create := AfterDbCreate}) ->
+ AfterDbCreate.
+get_after_db_delete_fun(#{after_db_delete := AfterDbDelete}) ->
+ AfterDbDelete.
get_committed_update_seq(#{} = Db) ->
@@ -762,24 +781,33 @@ maybe_add_sys_db_callbacks(Db) ->
IsReplicatorDb = is_replicator_db(Db),
IsUsersDb = is_users_db(Db),
- {BDU, ADR} = if
+ {BDU, DDU, ADR, ADC, ADD} = if
IsReplicatorDb ->
fun couch_replicator_docs:before_doc_update/3,
- fun couch_replicator_docs:after_doc_read/2
+ fun couch_replicator_doc_processor:during_doc_update/3,
+ fun couch_replicator_docs:after_doc_read/2,
+ fun couch_replicator_doc_processor:after_db_create/1,
+ fun couch_replicator_doc_processor:after_db_delete/1
IsUsersDb ->
fun fabric2_users_db:before_doc_update/3,
- fun fabric2_users_db:after_doc_read/2
+ undefined,
+ fun fabric2_users_db:after_doc_read/2,
+ undefined,
+ undefined
true ->
- {undefined, undefined}
+ {undefined, undefined, undefined, undefined, undefined}
before_doc_update := BDU,
- after_doc_read := ADR
+ during_doc_update := DDU,
+ after_doc_read := ADR,
+ after_db_create := ADC,
+ after_db_delete := ADD
@@ -1042,6 +1070,33 @@ apply_before_doc_update(Db, Docs, Options) ->
+apply_during_doc_update(#{during_doc_update := DDU} = Db, Doc, UpdateType)
+ when is_function(DDU, 3) ->
+ DDU(Doc, Db, UpdateType),
+ ok;
+apply_during_doc_update(#{during_doc_update := undefined}, _, _) ->
+ ok.
+apply_after_db_create(#{after_db_create := ADC} = Db)
+ when is_function(ADC, 1) ->
+ ADC(Db),
+ ok;
+apply_after_db_create(#{after_db_create := undefined}) ->
+ ok.
+apply_after_db_delete(#{after_db_delete := ADD} = Db)
+ when is_function(ADD, 1) ->
+ ADD(Db),
+ ok;
+apply_after_db_delete(#{after_db_delete := undefined}) ->
+ ok.
update_doc_int(#{} = Db, #doc{} = Doc, Options) ->
IsLocal = case of
<<?LOCAL_DOC_PREFIX, _/binary>> -> true;
@@ -1218,6 +1273,8 @@ update_doc_interactive(Db, Doc0, Future, _Options) ->
+ ok = apply_during_doc_update(Db, Doc3, interactive_edit),
{ok, {NewRevPos, NewRev}}.
@@ -1301,6 +1358,8 @@ update_doc_replicated(Db, Doc0, _Options) ->
+ ok = apply_during_doc_update(Db, Doc3, replicated_changes),
{ok, []}.
diff --git a/src/fabric/src/fabric2_fdb.erl b/src/fabric/src/fabric2_fdb.erl
index 71cb68f21..6387afd1e 100644
--- a/src/fabric/src/fabric2_fdb.erl
+++ b/src/fabric/src/fabric2_fdb.erl
@@ -162,7 +162,10 @@ create(#{} = Db0, Options) ->
validate_doc_update_funs => [],
before_doc_update => undefined,
+ during_doc_update => undefined,
after_doc_read => undefined,
+ after_db_create => undefined,
+ after_db_delete => undefined,
% All other db things as we add features,
db_options => Options
@@ -199,8 +202,10 @@ open(#{} = Db0, Options) ->
% bits.
validate_doc_update_funs => [],
before_doc_update => undefined,
+ during_doc_update => undefined,
after_doc_read => undefined,
+ after_db_create => undefined,
+ after_db_delete => undefined,
db_options => Options