author     Nick Vatamaniuc <vatamane@apache.org>  2016-05-13 18:12:52 -0400
committer  Nick Vatamaniuc <vatamane@apache.org>  2017-04-28 17:35:50 -0400
commit     d89f21bff34a21d7ba296d43b3b0c12021416424 (patch)
tree       16be92a662667416bb6884030c6e83449ea7d8fa
parent     d3d90976c77998b46622e403e0c05d82990cd59f (diff)
download   couchdb-d89f21bff34a21d7ba296d43b3b0c12021416424.tar.gz
Refactor utils into 3 modules
Over the years utils accumulated a lot of functionality. Clean up a bit by
separating it into specific modules according to semantics:

 - couch_replicator_docs: Handles reading and writing to replicator dbs.
   This includes updating state fields, parsing options from documents,
   and making sure the replication VDU design document is in sync.

 - couch_replicator_filters: Fetches and manipulates replication filters.

 - couch_replicator_ids: Calculates replication IDs. Handles versioning
   and pretty-formatting of IDs. Filtered replications which use user
   filter functions incorporate a hash of the filter code into the
   calculation; in that case the couch_replicator_filters module is called
   to fetch the filter from the source.

Jira: COUCHDB-3324
-rw-r--r--  src/couch_replicator/src/couch_replicator_docs.erl     756
-rw-r--r--  src/couch_replicator/src/couch_replicator_filters.erl  214
-rw-r--r--  src/couch_replicator/src/couch_replicator_ids.erl      127
-rw-r--r--  src/couch_replicator/src/couch_replicator_utils.erl    583
4 files changed, 1198 insertions(+), 482 deletions(-)
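
A minimal sketch (not part of this commit) of how the three new modules
compose, assuming a decoded `_replicator` document RepDocJson and the #rep
record from couch_replicator.hrl; error handling is elided:

    % Hypothetical demo function; RepDocJson is a decoded EJSON doc.
    describe(RepDocJson) ->
        % parse_rep_doc/1 parses the options and, for non-cancel
        % requests, calls couch_replicator_ids to compute the id.
        Rep = couch_replicator_docs:parse_rep_doc(RepDocJson),
        % The filter type implied by the options can be inspected
        % directly via couch_replicator_filters.
        {ok, FilterSpec} = couch_replicator_filters:parse(Rep#rep.options),
        {BaseId, Ext} = Rep#rep.id,
        {BaseId ++ Ext, FilterSpec}.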
diff --git a/src/couch_replicator/src/couch_replicator_docs.erl b/src/couch_replicator/src/couch_replicator_docs.erl
new file mode 100644
index 000000000..cce4ce23c
--- /dev/null
+++ b/src/couch_replicator/src/couch_replicator_docs.erl
@@ -0,0 +1,756 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_replicator_docs).
+
+-export([
+ parse_rep_doc/1,
+ parse_rep_doc/2,
+ parse_rep_db/3,
+ parse_rep_doc_without_id/1,
+ parse_rep_doc_without_id/2,
+ before_doc_update/2,
+ after_doc_read/2,
+ ensure_rep_db_exists/0,
+ ensure_rep_ddoc_exists/1,
+ ensure_cluster_rep_ddoc_exists/1,
+ remove_state_fields/2,
+ update_doc_completed/3,
+ update_failed/3,
+ update_rep_id/1,
+ update_triggered/2,
+ update_error/2
+]).
+
+
+-include_lib("couch/include/couch_db.hrl").
+-include_lib("ibrowse/include/ibrowse.hrl").
+-include_lib("mem3/include/mem3.hrl").
+-include("couch_replicator_api_wrap.hrl").
+-include("couch_replicator.hrl").
+-include("couch_replicator_js_functions.hrl").
+
+-import(couch_util, [
+ get_value/2,
+ get_value/3,
+ to_binary/1
+]).
+
+-import(couch_replicator_utils, [
+ get_json_value/2,
+ get_json_value/3
+]).
+
+
+-define(REP_DB_NAME, <<"_replicator">>).
+-define(REP_DESIGN_DOC, <<"_design/_replicator">>).
+-define(OWNER, <<"owner">>).
+-define(CTX, {user_ctx, #user_ctx{roles=[<<"_admin">>, <<"_replicator">>]}}).
+-define(replace(L, K, V), lists:keystore(K, 1, L, {K, V})).
+
+
+remove_state_fields(DbName, DocId) ->
+ update_rep_doc(DbName, DocId, [
+ {<<"_replication_state">>, undefined},
+ {<<"_replication_state_time">>, undefined},
+ {<<"_replication_state_reason">>, undefined},
+ {<<"_replication_id">>, undefined},
+ {<<"_replication_stats">>, undefined}]).
+
+
+-spec update_doc_completed(binary(), binary(), [_]) -> any().
+update_doc_completed(DbName, DocId, Stats) ->
+ update_rep_doc(DbName, DocId, [
+ {<<"_replication_state">>, <<"completed">>},
+ {<<"_replication_state_reason">>, undefined},
+ {<<"_replication_stats">>, {Stats}}]),
+ couch_stats:increment_counter([couch_replicator, docs,
+ completed_state_updates]).
+
+
+-spec update_failed(binary(), binary(), any()) -> any().
+update_failed(DbName, DocId, Error) ->
+ Reason = error_reason(Error),
+ couch_log:error("Error processing replication doc `~s` from `~s`: ~s",
+ [DocId, DbName, Reason]),
+ update_rep_doc(DbName, DocId, [
+ {<<"_replication_state">>, <<"failed">>},
+ {<<"_replication_stats">>, undefined},
+ {<<"_replication_state_reason">>, Reason}]),
+ couch_stats:increment_counter([couch_replicator, docs,
+ failed_state_updates]).
+
+
+-spec update_triggered(#rep{}, rep_id()) -> ok.
+update_triggered(Rep, {Base, Ext}) ->
+ #rep{
+ db_name = DbName,
+ doc_id = DocId
+ } = Rep,
+ update_rep_doc(DbName, DocId, [
+ {<<"_replication_state">>, <<"triggered">>},
+ {<<"_replication_state_reason">>, undefined},
+ {<<"_replication_id">>, iolist_to_binary([Base, Ext])},
+ {<<"_replication_stats">>, undefined}]),
+ ok.
+
+
+-spec update_error(#rep{}, any()) -> ok.
+update_error(#rep{db_name = DbName, doc_id = DocId, id = RepId}, Error) ->
+ Reason = error_reason(Error),
+ BinRepId = case RepId of
+ {Base, Ext} ->
+ iolist_to_binary([Base, Ext]);
+ _Other ->
+ null
+ end,
+ update_rep_doc(DbName, DocId, [
+ {<<"_replication_state">>, <<"error">>},
+ {<<"_replication_state_reason">>, Reason},
+ {<<"_replication_stats">>, undefined},
+ {<<"_replication_id">>, BinRepId}]),
+ ok.
+
+
+-spec ensure_rep_db_exists() -> {ok, #db{}}.
+ensure_rep_db_exists() ->
+ Db = case couch_db:open_int(?REP_DB_NAME, [?CTX, sys_db,
+ nologifmissing]) of
+ {ok, Db0} ->
+ Db0;
+ _Error ->
+ {ok, Db0} = couch_db:create(?REP_DB_NAME, [?CTX, sys_db]),
+ Db0
+ end,
+ ok = ensure_rep_ddoc_exists(?REP_DB_NAME),
+ {ok, Db}.
+
+
+-spec ensure_rep_ddoc_exists(binary()) -> ok.
+ensure_rep_ddoc_exists(RepDb) ->
+ case mem3:belongs(RepDb, ?REP_DESIGN_DOC) of
+ true ->
+ ensure_rep_ddoc_exists(RepDb, ?REP_DESIGN_DOC);
+ false ->
+ ok
+ end.
+
+
+-spec ensure_rep_ddoc_exists(binary(), binary()) -> ok.
+ensure_rep_ddoc_exists(RepDb, DDocId) ->
+ case open_rep_doc(RepDb, DDocId) of
+ {not_found, no_db_file} ->
+ %% database was deleted.
+ ok;
+ {not_found, _Reason} ->
+ DocProps = replication_design_doc_props(DDocId),
+ DDoc = couch_doc:from_json_obj({DocProps}),
+ couch_log:notice("creating replicator ddoc", []),
+ {ok, _Rev} = save_rep_doc(RepDb, DDoc);
+ {ok, Doc} ->
+ Latest = replication_design_doc_props(DDocId),
+ {Props0} = couch_doc:to_json_obj(Doc, []),
+ {value, {_, Rev}, Props} = lists:keytake(<<"_rev">>, 1, Props0),
+ case compare_ejson({Props}, {Latest}) of
+ true ->
+ ok;
+ false ->
+ LatestWithRev = [{<<"_rev">>, Rev} | Latest],
+ DDoc = couch_doc:from_json_obj({LatestWithRev}),
+ couch_log:notice("updating replicator ddoc", []),
+ try
+ {ok, _} = save_rep_doc(RepDb, DDoc)
+ catch
+ throw:conflict ->
+ %% ignore, we'll retry next time
+ ok
+ end
+ end
+ end,
+ ok.
+
+
+-spec ensure_cluster_rep_ddoc_exists(binary()) -> ok.
+ensure_cluster_rep_ddoc_exists(RepDb) ->
+ DDocId = ?REP_DESIGN_DOC,
+ [#shard{name = DbShard} | _] = mem3:shards(RepDb, DDocId),
+ ensure_rep_ddoc_exists(DbShard, DDocId).
+
+
+-spec compare_ejson({[_]}, {[_]}) -> boolean().
+compare_ejson(EJson1, EJson2) ->
+ EjsonSorted1 = couch_replicator_filters:ejsort(EJson1),
+ EjsonSorted2 = couch_replicator_filters:ejsort(EJson2),
+ EjsonSorted1 == EjsonSorted2.
+
+
+-spec replication_design_doc_props(binary()) -> [_].
+replication_design_doc_props(DDocId) ->
+ [
+ {<<"_id">>, DDocId},
+ {<<"language">>, <<"javascript">>},
+ {<<"validate_doc_update">>, ?REP_DB_DOC_VALIDATE_FUN}
+ ].
+
+
+% Note: parse_rep_doc can handle filtered replications. During parsing of
+% the replication doc it may make (possibly remote) HTTP requests to the
+% source database. If fetching or parsing of the filter docs fails,
+% parse_rep_doc throws a {filter_fetch_error, Error} exception. This
+% exception should be considered transient with respect to the contents of
+% the document itself, since it depends on network availability of the
+% source db and other factors.
+-spec parse_rep_doc({[_]}) -> #rep{}.
+parse_rep_doc(RepDoc) ->
+ {ok, Rep} = try
+ parse_rep_doc(RepDoc, rep_user_ctx(RepDoc))
+ catch
+ throw:{error, Reason} ->
+ throw({bad_rep_doc, Reason});
+ throw:{filter_fetch_error, Reason} ->
+ throw({filter_fetch_error, Reason});
+ Tag:Err ->
+ throw({bad_rep_doc, to_binary({Tag, Err})})
+ end,
+ Rep.
+
+
+-spec parse_rep_doc_without_id({[_]}) -> #rep{}.
+parse_rep_doc_without_id(RepDoc) ->
+ {ok, Rep} = try
+ parse_rep_doc_without_id(RepDoc, rep_user_ctx(RepDoc))
+ catch
+ throw:{error, Reason} ->
+ throw({bad_rep_doc, Reason});
+ Tag:Err ->
+ throw({bad_rep_doc, to_binary({Tag, Err})})
+ end,
+ Rep.
+
+
+-spec parse_rep_doc({[_]}, #user_ctx{}) -> {ok, #rep{}}.
+parse_rep_doc(Doc, UserCtx) ->
+ {ok, Rep} = parse_rep_doc_without_id(Doc, UserCtx),
+ Cancel = get_value(cancel, Rep#rep.options, false),
+ Id = get_value(id, Rep#rep.options, nil),
+ case {Cancel, Id} of
+ {true, nil} ->
+ % Cancel request with no id, must parse id out of body contents
+ {ok, update_rep_id(Rep)};
+ {true, Id} ->
+ % Cancel request with an id specified, so do not parse id from body
+ {ok, Rep};
+ {false, _Id} ->
+ % Not a cancel request, regular replication doc
+ {ok, update_rep_id(Rep)}
+ end.
+
+
+-spec parse_rep_doc_without_id({[_]}, #user_ctx{}) -> {ok, #rep{}}.
+parse_rep_doc_without_id({Props}, UserCtx) ->
+ Proxy = get_value(<<"proxy">>, Props, <<>>),
+ Opts = make_options(Props),
+ case get_value(cancel, Opts, false) andalso
+ (get_value(id, Opts, nil) =/= nil) of
+ true ->
+ {ok, #rep{options = Opts, user_ctx = UserCtx}};
+ false ->
+ Source = parse_rep_db(get_value(<<"source">>, Props), Proxy, Opts),
+ Target = parse_rep_db(get_value(<<"target">>, Props), Proxy, Opts),
+ {Type, View} = case couch_replicator_filters:view_type(Props, Opts) of
+ {error, Error} ->
+ throw({bad_request, Error});
+ Result ->
+ Result
+ end,
+ Rep = #rep{
+ source = Source,
+ target = Target,
+ options = Opts,
+ user_ctx = UserCtx,
+ type = Type,
+ view = View,
+ doc_id = get_value(<<"_id">>, Props, null)
+ },
+        % Check that the filter code can be parsed; if not, throw an exception
+ case couch_replicator_filters:parse(Opts) of
+ {error, FilterError} ->
+ throw({error, FilterError});
+ {ok, _Filter} ->
+ ok
+ end,
+ {ok, Rep}
+ end.
+
+
+% Update a #rep{} record with a replication_id. Calculating the id might
+% involve fetching a filter from the source db, and so it could fail
+% intermittently. In case of a failure to fetch the filter this function
+% will throw a `{filter_fetch_error, Reason}` exception.
+update_rep_id(Rep) ->
+ RepId = couch_replicator_ids:replication_id(Rep),
+ Rep#rep{id = RepId}.
+
+
+update_rep_doc(RepDbName, RepDocId, KVs) ->
+ update_rep_doc(RepDbName, RepDocId, KVs, 1).
+
+
+update_rep_doc(RepDbName, RepDocId, KVs, Wait) when is_binary(RepDocId) ->
+ try
+ case open_rep_doc(RepDbName, RepDocId) of
+ {ok, LastRepDoc} ->
+ update_rep_doc(RepDbName, LastRepDoc, KVs, Wait * 2);
+ _ ->
+ ok
+ end
+ catch
+ throw:conflict ->
+ Msg = "Conflict when updating replication doc `~s`. Retrying.",
+ couch_log:error(Msg, [RepDocId]),
+ ok = timer:sleep(random:uniform(erlang:min(128, Wait)) * 100),
+ update_rep_doc(RepDbName, RepDocId, KVs, Wait * 2)
+ end;
+
+update_rep_doc(RepDbName, #doc{body = {RepDocBody}} = RepDoc, KVs, _Try) ->
+ NewRepDocBody = lists:foldl(
+ fun({K, undefined}, Body) ->
+ lists:keydelete(K, 1, Body);
+ ({<<"_replication_state">> = K, State} = KV, Body) ->
+ case get_json_value(K, Body) of
+ State ->
+ Body;
+ _ ->
+ Body1 = lists:keystore(K, 1, Body, KV),
+ Timestamp = couch_replicator_utils:iso8601(os:timestamp()),
+ lists:keystore(
+ <<"_replication_state_time">>, 1, Body1,
+ {<<"_replication_state_time">>, Timestamp})
+ end;
+ ({K, _V} = KV, Body) ->
+ lists:keystore(K, 1, Body, KV)
+ end,
+ RepDocBody, KVs),
+ case NewRepDocBody of
+ RepDocBody ->
+ ok;
+ _ ->
+            % Might not succeed when the replication doc is deleted right
+            % before this update (not an error; ignore).
+ save_rep_doc(RepDbName, RepDoc#doc{body = {NewRepDocBody}})
+ end.
+
+
+open_rep_doc(DbName, DocId) ->
+ case couch_db:open_int(DbName, [?CTX, sys_db]) of
+ {ok, Db} ->
+ try
+ couch_db:open_doc(Db, DocId, [ejson_body])
+ after
+ couch_db:close(Db)
+ end;
+ Else ->
+ Else
+ end.
+
+
+save_rep_doc(DbName, Doc) ->
+ {ok, Db} = couch_db:open_int(DbName, [?CTX, sys_db]),
+ try
+ couch_db:update_doc(Db, Doc, [])
+ after
+ couch_db:close(Db)
+ end.
+
+
+-spec rep_user_ctx({[_]}) -> #user_ctx{}.
+rep_user_ctx({RepDoc}) ->
+ case get_json_value(<<"user_ctx">>, RepDoc) of
+ undefined ->
+ #user_ctx{};
+ {UserCtx} ->
+ #user_ctx{
+ name = get_json_value(<<"name">>, UserCtx, null),
+ roles = get_json_value(<<"roles">>, UserCtx, [])
+ }
+ end.
+
+
+-spec parse_rep_db({[_]} | binary(), binary(), [_]) -> #httpd{} | binary().
+parse_rep_db({Props}, Proxy, Options) ->
+ ProxyParams = parse_proxy_params(Proxy),
+ ProxyURL = case ProxyParams of
+ [] -> undefined;
+ _ -> binary_to_list(Proxy)
+ end,
+ Url = maybe_add_trailing_slash(get_value(<<"url">>, Props)),
+ {AuthProps} = get_value(<<"auth">>, Props, {[]}),
+ {BinHeaders} = get_value(<<"headers">>, Props, {[]}),
+ Headers = lists:ukeysort(1, [{?b2l(K), ?b2l(V)} || {K, V} <- BinHeaders]),
+ DefaultHeaders = (#httpdb{})#httpdb.headers,
+ OAuth = case get_value(<<"oauth">>, AuthProps) of
+ undefined ->
+ nil;
+ {OauthProps} ->
+ #oauth{
+ consumer_key = ?b2l(get_value(<<"consumer_key">>, OauthProps)),
+ token = ?b2l(get_value(<<"token">>, OauthProps)),
+ token_secret = ?b2l(get_value(<<"token_secret">>, OauthProps)),
+ consumer_secret = ?b2l(get_value(<<"consumer_secret">>,
+ OauthProps)),
+ signature_method =
+ case get_value(<<"signature_method">>, OauthProps) of
+ undefined -> hmac_sha1;
+ <<"PLAINTEXT">> -> plaintext;
+ <<"HMAC-SHA1">> -> hmac_sha1;
+ <<"RSA-SHA1">> -> rsa_sha1
+ end
+ }
+ end,
+ #httpdb{
+ url = Url,
+ oauth = OAuth,
+ headers = lists:ukeymerge(1, Headers, DefaultHeaders),
+ ibrowse_options = lists:keysort(1,
+ [{socket_options, get_value(socket_options, Options)} |
+ ProxyParams ++ ssl_params(Url)]),
+ timeout = get_value(connection_timeout, Options),
+ http_connections = get_value(http_connections, Options),
+ retries = get_value(retries, Options),
+ proxy_url = ProxyURL
+ };
+
+parse_rep_db(<<"http://", _/binary>> = Url, Proxy, Options) ->
+ parse_rep_db({[{<<"url">>, Url}]}, Proxy, Options);
+
+parse_rep_db(<<"https://", _/binary>> = Url, Proxy, Options) ->
+ parse_rep_db({[{<<"url">>, Url}]}, Proxy, Options);
+
+parse_rep_db(<<DbName/binary>>, _Proxy, _Options) ->
+ DbName;
+
+parse_rep_db(undefined, _Proxy, _Options) ->
+ throw({error, <<"Missing replicator database">>}).
+
+
+-spec maybe_add_trailing_slash(binary() | list()) -> list().
+maybe_add_trailing_slash(Url) when is_binary(Url) ->
+ maybe_add_trailing_slash(?b2l(Url));
+maybe_add_trailing_slash(Url) ->
+ case lists:member($?, Url) of
+ true ->
+ Url; % skip if there are query params
+ false ->
+ case lists:last(Url) of
+ $/ ->
+ Url;
+ _ ->
+ Url ++ "/"
+ end
+ end.
+
+
+-spec make_options([_]) -> [_].
+make_options(Props) ->
+ Options0 = lists:ukeysort(1, convert_options(Props)),
+ Options = check_options(Options0),
+ DefWorkers = config:get("replicator", "worker_processes", "4"),
+ DefBatchSize = config:get("replicator", "worker_batch_size", "500"),
+ DefConns = config:get("replicator", "http_connections", "20"),
+ DefTimeout = config:get("replicator", "connection_timeout", "30000"),
+ DefRetries = config:get("replicator", "retries_per_request", "10"),
+ UseCheckpoints = config:get("replicator", "use_checkpoints", "true"),
+ DefCheckpointInterval = config:get("replicator", "checkpoint_interval",
+ "30000"),
+ {ok, DefSocketOptions} = couch_util:parse_term(
+ config:get("replicator", "socket_options",
+ "[{keepalive, true}, {nodelay, false}]")),
+ lists:ukeymerge(1, Options, lists:keysort(1, [
+ {connection_timeout, list_to_integer(DefTimeout)},
+ {retries, list_to_integer(DefRetries)},
+ {http_connections, list_to_integer(DefConns)},
+ {socket_options, DefSocketOptions},
+ {worker_batch_size, list_to_integer(DefBatchSize)},
+ {worker_processes, list_to_integer(DefWorkers)},
+ {use_checkpoints, list_to_existing_atom(UseCheckpoints)},
+ {checkpoint_interval, list_to_integer(DefCheckpointInterval)}
+ ])).
+
+
+-spec convert_options([_]) -> [_].
+convert_options([])->
+ [];
+convert_options([{<<"cancel">>, V} | _R]) when not is_boolean(V)->
+ throw({bad_request, <<"parameter `cancel` must be a boolean">>});
+convert_options([{<<"cancel">>, V} | R]) ->
+ [{cancel, V} | convert_options(R)];
+convert_options([{IdOpt, V} | R]) when IdOpt =:= <<"_local_id">>;
+ IdOpt =:= <<"replication_id">>; IdOpt =:= <<"id">> ->
+ [{id, couch_replicator_ids:convert(V)} | convert_options(R)];
+convert_options([{<<"create_target">>, V} | _R]) when not is_boolean(V)->
+ throw({bad_request, <<"parameter `create_target` must be a boolean">>});
+convert_options([{<<"create_target">>, V} | R]) ->
+ [{create_target, V} | convert_options(R)];
+convert_options([{<<"continuous">>, V} | _R]) when not is_boolean(V)->
+ throw({bad_request, <<"parameter `continuous` must be a boolean">>});
+convert_options([{<<"continuous">>, V} | R]) ->
+ [{continuous, V} | convert_options(R)];
+convert_options([{<<"filter">>, V} | R]) ->
+ [{filter, V} | convert_options(R)];
+convert_options([{<<"query_params">>, V} | R]) ->
+ [{query_params, V} | convert_options(R)];
+convert_options([{<<"doc_ids">>, null} | R]) ->
+ convert_options(R);
+convert_options([{<<"doc_ids">>, V} | _R]) when not is_list(V) ->
+ throw({bad_request, <<"parameter `doc_ids` must be an array">>});
+convert_options([{<<"doc_ids">>, V} | R]) ->
+ % Ensure same behaviour as old replicator: accept a list of percent
+ % encoded doc IDs.
+ DocIds = lists:usort([?l2b(couch_httpd:unquote(Id)) || Id <- V]),
+ [{doc_ids, DocIds} | convert_options(R)];
+convert_options([{<<"selector">>, V} | _R]) when not is_tuple(V) ->
+ throw({bad_request, <<"parameter `selector` must be a JSON object">>});
+convert_options([{<<"selector">>, V} | R]) ->
+ [{selector, V} | convert_options(R)];
+convert_options([{<<"worker_processes">>, V} | R]) ->
+ [{worker_processes, couch_util:to_integer(V)} | convert_options(R)];
+convert_options([{<<"worker_batch_size">>, V} | R]) ->
+ [{worker_batch_size, couch_util:to_integer(V)} | convert_options(R)];
+convert_options([{<<"http_connections">>, V} | R]) ->
+ [{http_connections, couch_util:to_integer(V)} | convert_options(R)];
+convert_options([{<<"connection_timeout">>, V} | R]) ->
+ [{connection_timeout, couch_util:to_integer(V)} | convert_options(R)];
+convert_options([{<<"retries_per_request">>, V} | R]) ->
+ [{retries, couch_util:to_integer(V)} | convert_options(R)];
+convert_options([{<<"socket_options">>, V} | R]) ->
+ {ok, SocketOptions} = couch_util:parse_term(V),
+ [{socket_options, SocketOptions} | convert_options(R)];
+convert_options([{<<"since_seq">>, V} | R]) ->
+ [{since_seq, V} | convert_options(R)];
+convert_options([{<<"use_checkpoints">>, V} | R]) ->
+ [{use_checkpoints, V} | convert_options(R)];
+convert_options([{<<"checkpoint_interval">>, V} | R]) ->
+ [{checkpoint_interval, couch_util:to_integer(V)} | convert_options(R)];
+convert_options([_ | R]) -> % skip unknown option
+ convert_options(R).
+
+
+-spec check_options([_]) -> [_].
+check_options(Options) ->
+ DocIds = lists:keyfind(doc_ids, 1, Options),
+ Filter = lists:keyfind(filter, 1, Options),
+ Selector = lists:keyfind(selector, 1, Options),
+ case {DocIds, Filter, Selector} of
+ {false, false, false} -> Options;
+ {false, false, _} -> Options;
+ {false, _, false} -> Options;
+ {_, false, false} -> Options;
+ _ ->
+ throw({bad_request,
+ "`doc_ids`,`filter`,`selector` are mutually exclusive"})
+ end.
+
+
+-spec parse_proxy_params(binary() | [_]) -> [_].
+parse_proxy_params(ProxyUrl) when is_binary(ProxyUrl) ->
+ parse_proxy_params(?b2l(ProxyUrl));
+parse_proxy_params([]) ->
+ [];
+parse_proxy_params(ProxyUrl) ->
+ #url{
+ host = Host,
+ port = Port,
+ username = User,
+ password = Passwd,
+ protocol = Protocol
+ } = ibrowse_lib:parse_url(ProxyUrl),
+ [
+ {proxy_protocol, Protocol},
+ {proxy_host, Host},
+ {proxy_port, Port}
+ ] ++ case is_list(User) andalso is_list(Passwd) of
+ false ->
+ [];
+ true ->
+ [{proxy_user, User}, {proxy_password, Passwd}]
+ end.
+
+
+-spec ssl_params([_]) -> [_].
+ssl_params(Url) ->
+ case ibrowse_lib:parse_url(Url) of
+ #url{protocol = https} ->
+ Depth = list_to_integer(
+ config:get("replicator", "ssl_certificate_max_depth", "3")
+ ),
+ VerifyCerts = config:get("replicator", "verify_ssl_certificates"),
+ CertFile = config:get("replicator", "cert_file", undefined),
+ KeyFile = config:get("replicator", "key_file", undefined),
+ Password = config:get("replicator", "password", undefined),
+ SslOpts = [{depth, Depth} | ssl_verify_options(VerifyCerts =:= "true")],
+ SslOpts1 = case CertFile /= undefined andalso KeyFile /= undefined of
+ true ->
+ case Password of
+ undefined ->
+ [{certfile, CertFile}, {keyfile, KeyFile}] ++ SslOpts;
+ _ ->
+ [{certfile, CertFile}, {keyfile, KeyFile},
+ {password, Password}] ++ SslOpts
+ end;
+ false -> SslOpts
+ end,
+ [{is_ssl, true}, {ssl_options, SslOpts1}];
+ #url{protocol = http} ->
+ []
+ end.
+
+
+-spec ssl_verify_options(true | false) -> [_].
+ssl_verify_options(true) ->
+ CAFile = config:get("replicator", "ssl_trusted_certificates_file"),
+ [{verify, verify_peer}, {cacertfile, CAFile}];
+ssl_verify_options(false) ->
+ [{verify, verify_none}].
+
+
+-spec before_doc_update(#doc{}, #db{}) -> #doc{}.
+before_doc_update(#doc{id = <<?DESIGN_DOC_PREFIX, _/binary>>} = Doc, _Db) ->
+ Doc;
+before_doc_update(#doc{body = {Body}} = Doc, #db{user_ctx=UserCtx} = Db) ->
+ #user_ctx{roles = Roles, name = Name} = UserCtx,
+ case lists:member(<<"_replicator">>, Roles) of
+ true ->
+ Doc;
+ false ->
+ case couch_util:get_value(?OWNER, Body) of
+ undefined ->
+ Doc#doc{body = {?replace(Body, ?OWNER, Name)}};
+ Name ->
+ Doc;
+ Other ->
+ case (catch couch_db:check_is_admin(Db)) of
+ ok when Other =:= null ->
+ Doc#doc{body = {?replace(Body, ?OWNER, Name)}};
+ ok ->
+ Doc;
+ _ ->
+ throw({forbidden, <<"Can't update replication documents",
+ " from other users.">>})
+ end
+ end
+ end.
+
+
+-spec after_doc_read(#doc{}, #db{}) -> #doc{}.
+after_doc_read(#doc{id = <<?DESIGN_DOC_PREFIX, _/binary>>} = Doc, _Db) ->
+ Doc;
+after_doc_read(#doc{body = {Body}} = Doc, #db{user_ctx=UserCtx} = Db) ->
+ #user_ctx{name = Name} = UserCtx,
+ case (catch couch_db:check_is_admin(Db)) of
+ ok ->
+ Doc;
+ _ ->
+ case couch_util:get_value(?OWNER, Body) of
+ Name ->
+ Doc;
+ _Other ->
+ Source = strip_credentials(couch_util:get_value(<<"source">>,
+Body)),
+ Target = strip_credentials(couch_util:get_value(<<"target">>,
+Body)),
+ NewBody0 = ?replace(Body, <<"source">>, Source),
+ NewBody = ?replace(NewBody0, <<"target">>, Target),
+ #doc{revs = {Pos, [_ | Revs]}} = Doc,
+ NewDoc = Doc#doc{body = {NewBody}, revs = {Pos - 1, Revs}},
+ NewRevId = couch_db:new_revid(NewDoc),
+ NewDoc#doc{revs = {Pos, [NewRevId | Revs]}}
+ end
+ end.
+
+
+-spec strip_credentials(undefined) -> undefined;
+ (binary()) -> binary();
+ ({[_]}) -> {[_]}.
+strip_credentials(undefined) ->
+ undefined;
+strip_credentials(Url) when is_binary(Url) ->
+ re:replace(Url,
+ "http(s)?://(?:[^:]+):[^@]+@(.*)$",
+ "http\\1://\\2",
+ [{return, binary}]);
+strip_credentials({Props}) ->
+ {lists:keydelete(<<"oauth">>, 1, Props)}.
+
+
+error_reason({shutdown, Error}) ->
+ error_reason(Error);
+error_reason({bad_rep_doc, Reason}) ->
+ to_binary(Reason);
+error_reason({error, {Error, Reason}})
+ when is_atom(Error), is_binary(Reason) ->
+ to_binary(io_lib:format("~s: ~s", [Error, Reason]));
+error_reason({error, Reason}) ->
+ to_binary(Reason);
+error_reason(Reason) ->
+ to_binary(Reason).
+
+
+-ifdef(TEST).
+
+-include_lib("eunit/include/eunit.hrl").
+
+check_options_pass_values_test() ->
+ ?assertEqual(check_options([]), []),
+ ?assertEqual(check_options([baz, {other, fiz}]), [baz, {other, fiz}]),
+ ?assertEqual(check_options([{doc_ids, x}]), [{doc_ids, x}]),
+ ?assertEqual(check_options([{filter, x}]), [{filter, x}]),
+ ?assertEqual(check_options([{selector, x}]), [{selector, x}]).
+
+
+check_options_fail_values_test() ->
+ ?assertThrow({bad_request, _},
+ check_options([{doc_ids, x}, {filter, y}])),
+ ?assertThrow({bad_request, _},
+ check_options([{doc_ids, x}, {selector, y}])),
+ ?assertThrow({bad_request, _},
+ check_options([{filter, x}, {selector, y}])),
+ ?assertThrow({bad_request, _},
+ check_options([{doc_ids, x}, {selector, y}, {filter, z}])).
+
+
+check_convert_options_pass_test() ->
+ ?assertEqual([], convert_options([])),
+ ?assertEqual([], convert_options([{<<"random">>, 42}])),
+ ?assertEqual([{cancel, true}],
+ convert_options([{<<"cancel">>, true}])),
+ ?assertEqual([{create_target, true}],
+ convert_options([{<<"create_target">>, true}])),
+ ?assertEqual([{continuous, true}],
+ convert_options([{<<"continuous">>, true}])),
+ ?assertEqual([{doc_ids, [<<"id">>]}],
+ convert_options([{<<"doc_ids">>, [<<"id">>]}])),
+ ?assertEqual([{selector, {key, value}}],
+ convert_options([{<<"selector">>, {key, value}}])).
+
+
+check_convert_options_fail_test() ->
+ ?assertThrow({bad_request, _},
+ convert_options([{<<"cancel">>, <<"true">>}])),
+ ?assertThrow({bad_request, _},
+ convert_options([{<<"create_target">>, <<"true">>}])),
+ ?assertThrow({bad_request, _},
+ convert_options([{<<"continuous">>, <<"true">>}])),
+ ?assertThrow({bad_request, _},
+ convert_options([{<<"doc_ids">>, not_a_list}])),
+ ?assertThrow({bad_request, _},
+ convert_options([{<<"selector">>, [{key, value}]}])).
+
+-endif.
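
As the note above parse_rep_doc/1 explains, {filter_fetch_error, _} is
transient (it depends on the source database being reachable) while
{bad_rep_doc, _} reflects the document itself. A hedged sketch of a caller
telling the two apart; the retry policy shown is hypothetical:

    classify_parse(RepDocJson) ->
        try couch_replicator_docs:parse_rep_doc(RepDocJson) of
            #rep{} = Rep ->
                {ok, Rep}
        catch
            throw:{filter_fetch_error, Reason} ->
                {retry, Reason};  % transient: source db unreachable
            throw:{bad_rep_doc, Reason} ->
                {fail, Reason}    % permanent: malformed document
        end.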
diff --git a/src/couch_replicator/src/couch_replicator_filters.erl b/src/couch_replicator/src/couch_replicator_filters.erl
new file mode 100644
index 000000000..5668820d1
--- /dev/null
+++ b/src/couch_replicator/src/couch_replicator_filters.erl
@@ -0,0 +1,214 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_replicator_filters).
+
+-export([
+ parse/1,
+ fetch/4,
+ view_type/2,
+ ejsort/1
+]).
+
+-include_lib("couch/include/couch_db.hrl").
+
+
+% Parse the filter from the replication options proplist.
+% Returns {ok, {FilterType, ...}} | {error, ParseError}.
+% For a `user` filter, i.e. a filter specified as user code in the source
+% database, this function doesn't fetch the filter code but only returns
+% the name of the filter.
+-spec parse([_]) ->
+ {ok, nil} |
+ {ok, {view, binary(), {[_]}}} |
+ {ok, {user, {binary(), binary()}, {[_]}}} |
+ {ok, {docids, [_]}} |
+ {ok, {mango, {[_]}}} |
+ {error, binary()}.
+parse(Options) ->
+ Filter = couch_util:get_value(filter, Options),
+ DocIds = couch_util:get_value(doc_ids, Options),
+ Selector = couch_util:get_value(selector, Options),
+ case {Filter, DocIds, Selector} of
+ {undefined, undefined, undefined} ->
+ {ok, nil};
+ {<<"_", _/binary>>, undefined, undefined} ->
+ {ok, {view, Filter, query_params(Options)}};
+ {_, undefined, undefined} ->
+ case parse_user_filter(Filter) of
+ {ok, {Doc, FilterName}} ->
+ {ok, {user, {Doc, FilterName}, query_params(Options)}};
+ {error, Error} ->
+ {error, Error}
+ end;
+ {undefined, _, undefined} ->
+ {ok, {docids, DocIds}};
+ {undefined, undefined, _} ->
+ {ok, {mango, ejsort(mango_selector:normalize(Selector))}};
+ _ ->
+ Err = "`selector`, `filter` and `doc_ids` are mutually exclusive",
+ {error, list_to_binary(Err)}
+ end.
+
+
+% Fetches the body of a filter function from the source database. Guaranteed
+% to either return {ok, Body} or {error, Reason}. Also assume this function
+% might block due to network / socket issues for an undetermined amount of
+% time.
+-spec fetch(binary(), binary(), binary(), #user_ctx{}) ->
+ {ok, {[_]}} | {error, binary()}.
+fetch(DDocName, FilterName, Source, UserCtx) ->
+ {Pid, Ref} = spawn_monitor(fun() ->
+ try fetch_internal(DDocName, FilterName, Source, UserCtx) of
+ Resp ->
+ exit({exit_ok, Resp})
+ catch
+ throw:{fetch_error, Reason} ->
+ exit({exit_fetch_error, Reason});
+ _OtherTag:Reason ->
+ exit({exit_other_error, Reason})
+ end
+ end),
+ receive
+ {'DOWN', Ref, process, Pid, {exit_ok, Resp}} ->
+ {ok, Resp};
+ {'DOWN', Ref, process, Pid, {exit_fetch_error, Reason}} ->
+ {error, Reason};
+ {'DOWN', Ref, process, Pid, {exit_other_error, Reason}} ->
+ {error, couch_util:to_binary(Reason)}
+ end.
+
+
+% Get replication type and view (if any) from replication document props
+-spec view_type([_], [_]) ->
+ {view, {binary(), binary()}} | {db, nil} | {error, binary()}.
+view_type(Props, Options) ->
+ case couch_util:get_value(<<"filter">>, Props) of
+ <<"_view">> ->
+ {QP} = couch_util:get_value(query_params, Options, {[]}),
+ ViewParam = couch_util:get_value(<<"view">>, QP),
+ case re:split(ViewParam, <<"/">>) of
+ [DName, ViewName] ->
+ {view, {<< "_design/", DName/binary >>, ViewName}};
+ _ ->
+ {error, <<"Invalid `view` parameter.">>}
+ end;
+ _ ->
+ {db, nil}
+ end.
+
+
+% Private functions
+
+fetch_internal(DDocName, FilterName, Source, UserCtx) ->
+ Db = case (catch couch_replicator_api_wrap:db_open(Source,
+ [{user_ctx, UserCtx}])) of
+ {ok, Db0} ->
+ Db0;
+ DbError ->
+ DbErrorMsg = io_lib:format("Could not open source database `~s`: ~s",
+ [couch_replicator_api_wrap:db_uri(Source),
+ couch_util:to_binary(DbError)]),
+ throw({fetch_error, iolist_to_binary(DbErrorMsg)})
+ end,
+ try
+ Body = case (catch couch_replicator_api_wrap:open_doc(
+ Db, <<"_design/", DDocName/binary>>, [ejson_body])) of
+ {ok, #doc{body = Body0}} ->
+ Body0;
+ DocError ->
+ DocErrorMsg = io_lib:format(
+ "Couldn't open document `_design/~s` from source "
+ "database `~s`: ~s", [DDocName,
+ couch_replicator_api_wrap:db_uri(Source),
+ couch_util:to_binary(DocError)]
+ ),
+ throw({fetch_error, iolist_to_binary(DocErrorMsg)})
+ end,
+ try
+ Code = couch_util:get_nested_json_value(
+ Body, [<<"filters">>, FilterName]),
+ re:replace(Code, [$^, "\s*(.*?)\s*", $$], "\\1", [{return, binary}])
+ catch
+ _Tag:CodeError ->
+ CodeErrorMsg = io_lib:format(
+ "Couldn't parse filter code from document ~s on `~s` "
+ " Error: ~s", [DDocName,
+ couch_replicator_api_wrap:db_uri(Source),
+ couch_util:to_binary(CodeError)]
+ ),
+ throw({fetch_error, CodeErrorMsg})
+ end
+ after
+ couch_replicator_api_wrap:db_close(Db)
+ end.
+
+
+-spec query_params([_]) -> {[_]}.
+query_params(Options)->
+ couch_util:get_value(query_params, Options, {[]}).
+
+
+parse_user_filter(Filter) ->
+ case re:run(Filter, "(.*?)/(.*)", [{capture, [1, 2], binary}]) of
+ {match, [DDocName0, FilterName0]} ->
+ {ok, {DDocName0, FilterName0}};
+ _ ->
+ {error, <<"Invalid filter. Must match `ddocname/filtername`.">>}
+ end.
+
+
+% Sort an EJSON object's properties to attempt
+% to generate a unique representation. This is used
+% to reduce the chance of getting different
+% replication checkpoints for the same Mango selector
+ejsort({V})->
+ ejsort_props(V, []);
+ejsort(V) when is_list(V) ->
+ ejsort_array(V, []);
+ejsort(V) ->
+ V.
+
+
+ejsort_props([], Acc)->
+ {lists:keysort(1, Acc)};
+ejsort_props([{K, V}| R], Acc) ->
+ ejsort_props(R, [{K, ejsort(V)} | Acc]).
+
+
+ejsort_array([], Acc)->
+ lists:reverse(Acc);
+ejsort_array([V | R], Acc) ->
+ ejsort_array(R, [ejsort(V) | Acc]).
+
+
+-ifdef(TEST).
+
+-include_lib("eunit/include/eunit.hrl").
+
+ejsort_basic_values_test() ->
+ ?assertEqual(ejsort(0), 0),
+ ?assertEqual(ejsort(<<"a">>), <<"a">>),
+ ?assertEqual(ejsort(true), true),
+ ?assertEqual(ejsort([]), []),
+ ?assertEqual(ejsort({[]}), {[]}).
+
+
+ejsort_compound_values_test() ->
+ ?assertEqual(ejsort([2, 1, 3, <<"a">>]), [2, 1, 3, <<"a">>]),
+ Ej1 = {[{<<"a">>, 0}, {<<"c">>, 0}, {<<"b">>, 0}]},
+ Ej1s = {[{<<"a">>, 0}, {<<"b">>, 0}, {<<"c">>, 0}]},
+ ?assertEqual(ejsort(Ej1), Ej1s),
+ Ej2 = {[{<<"x">>, Ej1}, {<<"z">>, Ej1}, {<<"y">>, [Ej1, Ej1]}]},
+ ?assertEqual(ejsort(Ej2),
+ {[{<<"x">>, Ej1s}, {<<"y">>, [Ej1s, Ej1s]}, {<<"z">>, Ej1s}]}).
+
+-endif.
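
For illustration, ejsort/1 normalizes object key order only (arrays keep
their order), so two Mango selectors that differ just in property order
produce the same EJSON term and, downstream, the same replication id. A
small sketch with made-up selectors:

    A = {[{<<"type">>, <<"user">>}, {<<"age">>, {[{<<"$gt">>, 21}]}}]},
    B = {[{<<"age">>, {[{<<"$gt">>, 21}]}}, {<<"type">>, <<"user">>}]},
    % Key order differs, but the sorted forms are identical.
    true = couch_replicator_filters:ejsort(A) =:= couch_replicator_filters:ejsort(B).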
diff --git a/src/couch_replicator/src/couch_replicator_ids.erl b/src/couch_replicator/src/couch_replicator_ids.erl
new file mode 100644
index 000000000..7f26db757
--- /dev/null
+++ b/src/couch_replicator/src/couch_replicator_ids.erl
@@ -0,0 +1,127 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_replicator_ids).
+
+-export([
+ replication_id/1,
+ replication_id/2,
+ convert/1
+]).
+
+-include_lib("couch/include/couch_db.hrl").
+-include("couch_replicator_api_wrap.hrl").
+-include("couch_replicator.hrl").
+
+% replication_id/1 and replication_id/2 will attempt to fetch
+% filter code for filtered replications. If fetching or parsing
+% of the remotely fetched filter code fails, they throw a
+% {filter_fetch_error, Error} exception.
+
+replication_id(#rep{options = Options} = Rep) ->
+ BaseId = replication_id(Rep, ?REP_ID_VERSION),
+ {BaseId, maybe_append_options([continuous, create_target], Options)}.
+
+
+% Versioned clauses for generating replication IDs.
+% If a change is made to how replications are identified,
+% please add a new clause and increase ?REP_ID_VERSION.
+
+replication_id(#rep{user_ctx = UserCtx} = Rep, 3) ->
+ UUID = couch_server:get_uuid(),
+ Src = get_rep_endpoint(UserCtx, Rep#rep.source),
+ Tgt = get_rep_endpoint(UserCtx, Rep#rep.target),
+ maybe_append_filters([UUID, Src, Tgt], Rep);
+
+replication_id(#rep{user_ctx = UserCtx} = Rep, 2) ->
+ {ok, HostName} = inet:gethostname(),
+ Port = case (catch mochiweb_socket_server:get(couch_httpd, port)) of
+ P when is_number(P) ->
+ P;
+ _ ->
+ % On restart we might be called before the couch_httpd process is
+ % started.
+ % TODO: we might be under an SSL socket server only, or both under
+ % SSL and a non-SSL socket.
+ % ... mochiweb_socket_server:get(https, port)
+ list_to_integer(config:get("httpd", "port", "5984"))
+ end,
+ Src = get_rep_endpoint(UserCtx, Rep#rep.source),
+ Tgt = get_rep_endpoint(UserCtx, Rep#rep.target),
+ maybe_append_filters([HostName, Port, Src, Tgt], Rep);
+
+replication_id(#rep{user_ctx = UserCtx} = Rep, 1) ->
+ {ok, HostName} = inet:gethostname(),
+ Src = get_rep_endpoint(UserCtx, Rep#rep.source),
+ Tgt = get_rep_endpoint(UserCtx, Rep#rep.target),
+ maybe_append_filters([HostName, Src, Tgt], Rep).
+
+
+-spec convert([_] | binary() | {string(), string()}) -> {string(), string()}.
+convert(Id) when is_list(Id) ->
+ convert(?l2b(Id));
+convert(Id) when is_binary(Id) ->
+ lists:splitwith(fun(Char) -> Char =/= $+ end, ?b2l(Id));
+convert({BaseId, Ext} = Id) when is_list(BaseId), is_list(Ext) ->
+ Id.
+
+
+% Private functions
+
+maybe_append_filters(Base,
+ #rep{source = Source, user_ctx = UserCtx, options = Options}) ->
+ Base2 = Base ++
+ case couch_replicator_filters:parse(Options) of
+ {ok, nil} ->
+ [];
+ {ok, {view, Filter, QueryParams}} ->
+ [Filter, QueryParams];
+ {ok, {user, {Doc, Filter}, QueryParams}} ->
+ case couch_replicator_filters:fetch(Doc, Filter, Source, UserCtx) of
+ {ok, Code} ->
+ [Code, QueryParams];
+ {error, Error} ->
+ throw({filter_fetch_error, Error})
+ end;
+ {ok, {docids, DocIds}} ->
+ [DocIds];
+ {ok, {mango, Selector}} ->
+ [Selector];
+ {error, FilterParseError} ->
+ throw({error, FilterParseError})
+ end,
+ couch_util:to_hex(couch_crypto:hash(md5, term_to_binary(Base2))).
+
+
+maybe_append_options(Options, RepOptions) ->
+ lists:foldl(fun(Option, Acc) ->
+ Acc ++
+ case couch_util:get_value(Option, RepOptions, false) of
+ true ->
+ "+" ++ atom_to_list(Option);
+ false ->
+ ""
+ end
+ end, [], Options).
+
+
+get_rep_endpoint(_UserCtx, #httpdb{url=Url, headers=Headers, oauth=OAuth}) ->
+ DefaultHeaders = (#httpdb{})#httpdb.headers,
+ case OAuth of
+ nil ->
+ {remote, Url, Headers -- DefaultHeaders};
+ #oauth{} ->
+ {remote, Url, Headers -- DefaultHeaders, OAuth}
+ end;
+get_rep_endpoint(UserCtx, <<DbName/binary>>) ->
+ {local, DbName, UserCtx}.
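
A hedged sketch of convert/1 above: a full replication id splits at the
first `+` into {BaseId, Extension}, and an already-split pair passes
through unchanged (the id value here is made up):

    Id = <<"44d7e7a0387a79b0dee9dc71a379fd93+continuous+create_target">>,
    {Base, Ext} = couch_replicator_ids:convert(Id),
    % Base = "44d7e7a0387a79b0dee9dc71a379fd93"
    % Ext  = "+continuous+create_target"
    {Base, Ext} = couch_replicator_ids:convert({Base, Ext}).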
diff --git a/src/couch_replicator/src/couch_replicator_utils.erl b/src/couch_replicator/src/couch_replicator_utils.erl
index e96d52a41..05836d483 100644
--- a/src/couch_replicator/src/couch_replicator_utils.erl
+++ b/src/couch_replicator/src/couch_replicator_utils.erl
@@ -12,17 +12,28 @@
-module(couch_replicator_utils).
--export([parse_rep_doc/2]).
--export([open_db/1, close_db/1]).
--export([start_db_compaction_notifier/2, stop_db_compaction_notifier/1]).
--export([replication_id/2]).
--export([sum_stats/2, is_deleted/1]).
+-export([
+ parse_rep_doc/2,
+ open_db/1,
+ close_db/1,
+ start_db_compaction_notifier/2,
+ stop_db_compaction_notifier/1,
+ replication_id/2,
+ sum_stats/2,
+ is_deleted/1,
+ rep_error_to_binary/1,
+ get_json_value/2,
+ get_json_value/3,
+ pp_rep_id/1,
+ iso8601/1,
+ filter_state/3
+]).
--export([handle_db_event/3]).
+-export([
+ handle_db_event/3
+]).
-include_lib("couch/include/couch_db.hrl").
--include_lib("ibrowse/include/ibrowse.hrl").
--include("couch_replicator_api_wrap.hrl").
-include("couch_replicator.hrl").
-import(couch_util, [
@@ -31,385 +42,6 @@
]).
-parse_rep_doc({Props}, UserCtx) ->
- ProxyParams = parse_proxy_params(get_value(<<"proxy">>, Props, <<>>)),
- Options = make_options(Props),
- case get_value(cancel, Options, false) andalso
- (get_value(id, Options, nil) =/= nil) of
- true ->
- {ok, #rep{options = Options, user_ctx = UserCtx}};
- false ->
- Source = parse_rep_db(get_value(<<"source">>, Props),
- ProxyParams, Options),
- Target = parse_rep_db(get_value(<<"target">>, Props),
- ProxyParams, Options),
-
-
- {RepType, View} = case get_value(<<"filter">>, Props) of
- <<"_view">> ->
- {QP} = get_value(query_params, Options, {[]}),
- ViewParam = get_value(<<"view">>, QP),
- View1 = case re:split(ViewParam, <<"/">>) of
- [DName, ViewName] ->
- {<< "_design/", DName/binary >>, ViewName};
- _ ->
- throw({bad_request, "Invalid `view` parameter."})
- end,
- {view, View1};
- _ ->
- {db, nil}
- end,
-
- Rep = #rep{
- source = Source,
- target = Target,
- options = Options,
- user_ctx = UserCtx,
- type = RepType,
- view = View,
- doc_id = get_value(<<"_id">>, Props, null)
- },
- {ok, Rep#rep{id = replication_id(Rep)}}
- end.
-
-
-replication_id(#rep{options = Options} = Rep) ->
- BaseId = replication_id(Rep, ?REP_ID_VERSION),
- {BaseId, maybe_append_options([continuous, create_target], Options)}.
-
-
-% Versioned clauses for generating replication IDs.
-% If a change is made to how replications are identified,
-% please add a new clause and increase ?REP_ID_VERSION.
-
-replication_id(#rep{user_ctx = UserCtx} = Rep, 3) ->
- UUID = couch_server:get_uuid(),
- Src = get_rep_endpoint(UserCtx, Rep#rep.source),
- Tgt = get_rep_endpoint(UserCtx, Rep#rep.target),
- maybe_append_filters([UUID, Src, Tgt], Rep);
-
-replication_id(#rep{user_ctx = UserCtx} = Rep, 2) ->
- {ok, HostName} = inet:gethostname(),
- Port = case (catch mochiweb_socket_server:get(couch_httpd, port)) of
- P when is_number(P) ->
- P;
- _ ->
- % On restart we might be called before the couch_httpd process is
- % started.
- % TODO: we might be under an SSL socket server only, or both under
- % SSL and a non-SSL socket.
- % ... mochiweb_socket_server:get(https, port)
- list_to_integer(config:get("httpd", "port", "5984"))
- end,
- Src = get_rep_endpoint(UserCtx, Rep#rep.source),
- Tgt = get_rep_endpoint(UserCtx, Rep#rep.target),
- maybe_append_filters([HostName, Port, Src, Tgt], Rep);
-
-replication_id(#rep{user_ctx = UserCtx} = Rep, 1) ->
- {ok, HostName} = inet:gethostname(),
- Src = get_rep_endpoint(UserCtx, Rep#rep.source),
- Tgt = get_rep_endpoint(UserCtx, Rep#rep.target),
- maybe_append_filters([HostName, Src, Tgt], Rep).
-
-
-maybe_append_filters(Base,
- #rep{source = Source, user_ctx = UserCtx, options = Options}) ->
- Filter = get_value(filter, Options),
- DocIds = get_value(doc_ids, Options),
- Selector = get_value(selector, Options),
- Base2 = Base ++
- case {Filter, DocIds, Selector} of
- {undefined, undefined, undefined} ->
- [];
- {<<"_", _/binary>>, undefined, undefined} ->
- [Filter, get_value(query_params, Options, {[]})];
- {_, undefined, undefined} ->
- [filter_code(Filter, Source, UserCtx),
- get_value(query_params, Options, {[]})];
- {undefined, _, undefined} ->
- [DocIds];
- {undefined, undefined, _} ->
- [ejsort(mango_selector:normalize(Selector))];
- _ ->
- throw({error, <<"`selector`, `filter` and `doc_ids` fields are mutually exclusive">>})
- end,
- couch_util:to_hex(couch_crypto:hash(md5, term_to_binary(Base2))).
-
-
-filter_code(Filter, Source, UserCtx) ->
- {DDocName, FilterName} =
- case re:run(Filter, "(.*?)/(.*)", [{capture, [1, 2], binary}]) of
- {match, [DDocName0, FilterName0]} ->
- {DDocName0, FilterName0};
- _ ->
- throw({error, <<"Invalid filter. Must match `ddocname/filtername`.">>})
- end,
- Db = case (catch couch_replicator_api_wrap:db_open(Source, [{user_ctx, UserCtx}])) of
- {ok, Db0} ->
- Db0;
- DbError ->
- DbErrorMsg = io_lib:format("Could not open source database `~s`: ~s",
- [couch_replicator_api_wrap:db_uri(Source), couch_util:to_binary(DbError)]),
- throw({error, iolist_to_binary(DbErrorMsg)})
- end,
- try
- Body = case (catch couch_replicator_api_wrap:open_doc(
- Db, <<"_design/", DDocName/binary>>, [ejson_body])) of
- {ok, #doc{body = Body0}} ->
- Body0;
- DocError ->
- DocErrorMsg = io_lib:format(
- "Couldn't open document `_design/~s` from source "
- "database `~s`: ~s", [DDocName, couch_replicator_api_wrap:db_uri(Source),
- couch_util:to_binary(DocError)]),
- throw({error, iolist_to_binary(DocErrorMsg)})
- end,
- Code = couch_util:get_nested_json_value(
- Body, [<<"filters">>, FilterName]),
- re:replace(Code, [$^, "\s*(.*?)\s*", $$], "\\1", [{return, binary}])
- after
- couch_replicator_api_wrap:db_close(Db)
- end.
-
-
-maybe_append_options(Options, RepOptions) ->
- lists:foldl(fun(Option, Acc) ->
- Acc ++
- case get_value(Option, RepOptions, false) of
- true ->
- "+" ++ atom_to_list(Option);
- false ->
- ""
- end
- end, [], Options).
-
-
-get_rep_endpoint(_UserCtx, #httpdb{url=Url, headers=Headers, oauth=OAuth}) ->
- DefaultHeaders = (#httpdb{})#httpdb.headers,
- case OAuth of
- nil ->
- {remote, Url, Headers -- DefaultHeaders};
- #oauth{} ->
- {remote, Url, Headers -- DefaultHeaders, OAuth}
- end;
-get_rep_endpoint(UserCtx, <<DbName/binary>>) ->
- {local, DbName, UserCtx}.
-
-
-parse_rep_db({Props}, ProxyParams, Options) ->
- Url = maybe_add_trailing_slash(get_value(<<"url">>, Props)),
- {AuthProps} = get_value(<<"auth">>, Props, {[]}),
- {BinHeaders} = get_value(<<"headers">>, Props, {[]}),
- Headers = lists:ukeysort(1, [{?b2l(K), ?b2l(V)} || {K, V} <- BinHeaders]),
- DefaultHeaders = (#httpdb{})#httpdb.headers,
- OAuth = case get_value(<<"oauth">>, AuthProps) of
- undefined ->
- nil;
- {OauthProps} ->
- #oauth{
- consumer_key = ?b2l(get_value(<<"consumer_key">>, OauthProps)),
- token = ?b2l(get_value(<<"token">>, OauthProps)),
- token_secret = ?b2l(get_value(<<"token_secret">>, OauthProps)),
- consumer_secret = ?b2l(get_value(<<"consumer_secret">>, OauthProps)),
- signature_method =
- case get_value(<<"signature_method">>, OauthProps) of
- undefined -> hmac_sha1;
- <<"PLAINTEXT">> -> plaintext;
- <<"HMAC-SHA1">> -> hmac_sha1;
- <<"RSA-SHA1">> -> rsa_sha1
- end
- }
- end,
- #httpdb{
- url = Url,
- oauth = OAuth,
- headers = lists:ukeymerge(1, Headers, DefaultHeaders),
- ibrowse_options = lists:keysort(1,
- [{socket_options, get_value(socket_options, Options)} |
- ProxyParams ++ ssl_params(Url)]),
- timeout = get_value(connection_timeout, Options),
- http_connections = get_value(http_connections, Options),
- retries = get_value(retries, Options)
- };
-parse_rep_db(<<"http://", _/binary>> = Url, ProxyParams, Options) ->
- parse_rep_db({[{<<"url">>, Url}]}, ProxyParams, Options);
-parse_rep_db(<<"https://", _/binary>> = Url, ProxyParams, Options) ->
- parse_rep_db({[{<<"url">>, Url}]}, ProxyParams, Options);
-parse_rep_db(<<DbName/binary>>, _ProxyParams, _Options) ->
- DbName.
-
-
-maybe_add_trailing_slash(Url) when is_binary(Url) ->
- maybe_add_trailing_slash(?b2l(Url));
-maybe_add_trailing_slash(Url) ->
- case lists:last(Url) of
- $/ ->
- Url;
- _ ->
- Url ++ "/"
- end.
-
-
-make_options(Props) ->
- Options0 = lists:ukeysort(1, convert_options(Props)),
- Options = check_options(Options0),
- DefWorkers = config:get("replicator", "worker_processes", "4"),
- DefBatchSize = config:get("replicator", "worker_batch_size", "500"),
- DefConns = config:get("replicator", "http_connections", "20"),
- DefTimeout = config:get("replicator", "connection_timeout", "30000"),
- DefRetries = config:get("replicator", "retries_per_request", "10"),
- UseCheckpoints = config:get("replicator", "use_checkpoints", "true"),
- DefCheckpointInterval = config:get("replicator", "checkpoint_interval", "30000"),
- {ok, DefSocketOptions} = couch_util:parse_term(
- config:get("replicator", "socket_options",
- "[{keepalive, true}, {nodelay, false}]")),
- lists:ukeymerge(1, Options, lists:keysort(1, [
- {connection_timeout, list_to_integer(DefTimeout)},
- {retries, list_to_integer(DefRetries)},
- {http_connections, list_to_integer(DefConns)},
- {socket_options, DefSocketOptions},
- {worker_batch_size, list_to_integer(DefBatchSize)},
- {worker_processes, list_to_integer(DefWorkers)},
- {use_checkpoints, list_to_existing_atom(UseCheckpoints)},
- {checkpoint_interval, list_to_integer(DefCheckpointInterval)}
- ])).
-
-
-convert_options([])->
- [];
-convert_options([{<<"cancel">>, V} | _R]) when not is_boolean(V)->
- throw({bad_request, <<"parameter `cancel` must be a boolean">>});
-convert_options([{<<"cancel">>, V} | R]) ->
- [{cancel, V} | convert_options(R)];
-convert_options([{IdOpt, V} | R]) when IdOpt =:= <<"_local_id">>;
- IdOpt =:= <<"replication_id">>; IdOpt =:= <<"id">> ->
- Id = lists:splitwith(fun(X) -> X =/= $+ end, ?b2l(V)),
- [{id, Id} | convert_options(R)];
-convert_options([{<<"create_target">>, V} | _R]) when not is_boolean(V)->
- throw({bad_request, <<"parameter `create_target` must be a boolean">>});
-convert_options([{<<"create_target">>, V} | R]) ->
- [{create_target, V} | convert_options(R)];
-convert_options([{<<"continuous">>, V} | _R]) when not is_boolean(V)->
- throw({bad_request, <<"parameter `continuous` must be a boolean">>});
-convert_options([{<<"continuous">>, V} | R]) ->
- [{continuous, V} | convert_options(R)];
-convert_options([{<<"filter">>, V} | R]) ->
- [{filter, V} | convert_options(R)];
-convert_options([{<<"query_params">>, V} | R]) ->
- [{query_params, V} | convert_options(R)];
-convert_options([{<<"doc_ids">>, null} | R]) ->
- convert_options(R);
-convert_options([{<<"doc_ids">>, V} | _R]) when not is_list(V) ->
- throw({bad_request, <<"parameter `doc_ids` must be an array">>});
-convert_options([{<<"doc_ids">>, V} | R]) ->
- % Ensure same behaviour as old replicator: accept a list of percent
- % encoded doc IDs.
- DocIds = [?l2b(couch_httpd:unquote(Id)) || Id <- V],
- [{doc_ids, DocIds} | convert_options(R)];
-convert_options([{<<"selector">>, V} | _R]) when not is_tuple(V) ->
- throw({bad_request, <<"parameter `selector` must be a JSON object">>});
-convert_options([{<<"selector">>, V} | R]) ->
- [{selector, V} | convert_options(R)];
-convert_options([{<<"worker_processes">>, V} | R]) ->
- [{worker_processes, couch_util:to_integer(V)} | convert_options(R)];
-convert_options([{<<"worker_batch_size">>, V} | R]) ->
- [{worker_batch_size, couch_util:to_integer(V)} | convert_options(R)];
-convert_options([{<<"http_connections">>, V} | R]) ->
- [{http_connections, couch_util:to_integer(V)} | convert_options(R)];
-convert_options([{<<"connection_timeout">>, V} | R]) ->
- [{connection_timeout, couch_util:to_integer(V)} | convert_options(R)];
-convert_options([{<<"retries_per_request">>, V} | R]) ->
- [{retries, couch_util:to_integer(V)} | convert_options(R)];
-convert_options([{<<"socket_options">>, V} | R]) ->
- {ok, SocketOptions} = couch_util:parse_term(V),
- [{socket_options, SocketOptions} | convert_options(R)];
-convert_options([{<<"since_seq">>, V} | R]) ->
- [{since_seq, V} | convert_options(R)];
-convert_options([{<<"use_checkpoints">>, V} | R]) ->
- [{use_checkpoints, V} | convert_options(R)];
-convert_options([{<<"checkpoint_interval">>, V} | R]) ->
- [{checkpoint_interval, couch_util:to_integer(V)} | convert_options(R)];
-convert_options([_ | R]) -> % skip unknown option
- convert_options(R).
-
-check_options(Options) ->
- DocIds = lists:keyfind(doc_ids, 1, Options),
- Filter = lists:keyfind(filter, 1, Options),
- Selector = lists:keyfind(selector, 1, Options),
- case {DocIds, Filter, Selector} of
- {false, false, false} -> Options;
- {false, false, _} -> Options;
- {false, _, false} -> Options;
- {_, false, false} -> Options;
- _ ->
- throw({bad_request, "`doc_ids`, `filter`, `selector` are mutually exclusive options"})
- end.
-
-
-parse_proxy_params(ProxyUrl) when is_binary(ProxyUrl) ->
- parse_proxy_params(?b2l(ProxyUrl));
-parse_proxy_params([]) ->
- [];
-parse_proxy_params(ProxyUrl) ->
- #url{
- host = Host,
- port = Port,
- username = User,
- password = Passwd,
- protocol = Protocol
- } = ibrowse_lib:parse_url(ProxyUrl),
- [{proxy_protocol, Protocol}, {proxy_host, Host}, {proxy_port, Port}] ++
- case is_list(User) andalso is_list(Passwd) of
- false ->
- [];
- true ->
- [{proxy_user, User}, {proxy_password, Passwd}]
- end.
-
-
-ssl_params(Url) ->
- case ibrowse_lib:parse_url(Url) of
- #url{protocol = https} ->
- Depth = list_to_integer(
- config:get("replicator", "ssl_certificate_max_depth", "3")
- ),
- VerifyCerts = config:get("replicator", "verify_ssl_certificates"),
- CertFile = config:get("replicator", "cert_file", undefined),
- KeyFile = config:get("replicator", "key_file", undefined),
- Password = config:get("replicator", "password", undefined),
- SslOpts = [{depth, Depth} | ssl_verify_options(VerifyCerts =:= "true")],
- SslOpts1 = case CertFile /= undefined andalso KeyFile /= undefined of
- true ->
- case Password of
- undefined ->
- [{certfile, CertFile}, {keyfile, KeyFile}] ++ SslOpts;
- _ ->
- [{certfile, CertFile}, {keyfile, KeyFile},
- {password, Password}] ++ SslOpts
- end;
- false -> SslOpts
- end,
- [{is_ssl, true}, {ssl_options, SslOpts1}];
- #url{protocol = http} ->
- []
- end.
-
-ssl_verify_options(Value) ->
- ssl_verify_options(Value, erlang:system_info(otp_release)).
-
-ssl_verify_options(true, OTPVersion) when OTPVersion >= "R14" ->
- CAFile = config:get("replicator", "ssl_trusted_certificates_file"),
- [{verify, verify_peer}, {cacertfile, CAFile}];
-ssl_verify_options(false, OTPVersion) when OTPVersion >= "R14" ->
- [{verify, verify_none}];
-ssl_verify_options(true, _OTPVersion) ->
- CAFile = config:get("replicator", "ssl_trusted_certificates_file"),
- [{verify, 2}, {cacertfile, CAFile}];
-ssl_verify_options(false, _OTPVersion) ->
- [{verify, 0}].
-
-
-%% New db record has Options field removed here to enable smoother dbcore migration
open_db(#db{name = Name, user_ctx = UserCtx}) ->
{ok, Db} = couch_db:open(Name, [{user_ctx, UserCtx} | []]),
Db;
@@ -444,103 +76,90 @@ handle_db_event(DbName, compacted, Server) ->
handle_db_event(_DbName, _Event, Server) ->
{ok, Server}.
-% Obsolete - remove in next release
+
+rep_error_to_binary(Error) ->
+ couch_util:to_binary(error_reason(Error)).
+
+
+error_reason({shutdown, Error}) ->
+ error_reason(Error);
+error_reason({error, {Error, Reason}})
+ when is_atom(Error), is_binary(Reason) ->
+ io_lib:format("~s: ~s", [Error, Reason]);
+error_reason({error, Reason}) ->
+ Reason;
+error_reason(Reason) ->
+ Reason.
+
+
+get_json_value(Key, Props) ->
+ get_json_value(Key, Props, undefined).
+
+get_json_value(Key, Props, Default) when is_atom(Key) ->
+ Ref = make_ref(),
+ case get_value(Key, Props, Ref) of
+ Ref ->
+ get_value(?l2b(atom_to_list(Key)), Props, Default);
+ Else ->
+ Else
+ end;
+get_json_value(Key, Props, Default) when is_binary(Key) ->
+ Ref = make_ref(),
+ case get_value(Key, Props, Ref) of
+ Ref ->
+ get_value(list_to_atom(?b2l(Key)), Props, Default);
+ Else ->
+ Else
+ end.
+
+
+% pretty-print replication id
+-spec pp_rep_id(#rep{} | rep_id()) -> string().
+pp_rep_id(#rep{id = RepId}) ->
+ pp_rep_id(RepId);
+pp_rep_id({Base, Extension}) ->
+ Base ++ Extension.
+
+
+% NV: TODO: this function is not used outside api wrap module
+% consider moving it there during final cleanup
+is_deleted(Change) ->
+ get_json_value(<<"deleted">>, Change, false).
+
+
+% NV: TODO: proxy some functions which used to be here, later remove
+% these and replace calls to their respective modules
+replication_id(Rep, Version) ->
+ couch_replicator_ids:replication_id(Rep, Version).
+
+
sum_stats(S1, S2) ->
couch_replicator_stats:sum_stats(S1, S2).
-is_deleted(Change) ->
- case couch_util:get_value(<<"deleted">>, Change) of
- undefined ->
- % keep backwards compatibility for a while
- couch_util:get_value(deleted, Change, false);
- Else ->
- Else
- end.
+parse_rep_doc(Props, UserCtx) ->
+ couch_replicator_docs:parse_rep_doc(Props, UserCtx).
-% Sort an EJSON object's properties to attempt
-% to generate a unique representation. This is used
-% to reduce the chance of getting different
-% replication checkpoints for the same Mango selector
-ejsort({V})->
- ejsort_props(V, []);
-ejsort(V) when is_list(V) ->
- ejsort_array(V, []);
-ejsort(V) ->
- V.
-
-ejsort_props([], Acc)->
- {lists:keysort(1, Acc)};
-ejsort_props([{K, V}| R], Acc) ->
- ejsort_props(R, [{K, ejsort(V)} | Acc]).
-
-ejsort_array([], Acc)->
- lists:reverse(Acc);
-ejsort_array([V | R], Acc) ->
- ejsort_array(R, [ejsort(V) | Acc]).
-
-
--ifdef(TEST).
-
--include_lib("eunit/include/eunit.hrl").
-
-ejsort_basic_values_test() ->
- ?assertEqual(ejsort(0), 0),
- ?assertEqual(ejsort(<<"a">>), <<"a">>),
- ?assertEqual(ejsort(true), true),
- ?assertEqual(ejsort([]), []),
- ?assertEqual(ejsort({[]}), {[]}).
-
-ejsort_compound_values_test() ->
- ?assertEqual(ejsort([2, 1, 3 ,<<"a">>]), [2, 1, 3, <<"a">>]),
- Ej1 = {[{<<"a">>, 0}, {<<"c">>, 0}, {<<"b">>, 0}]},
- Ej1s = {[{<<"a">>, 0}, {<<"b">>, 0}, {<<"c">>, 0}]},
- ?assertEqual(ejsort(Ej1), Ej1s),
- Ej2 = {[{<<"x">>, Ej1}, {<<"z">>, Ej1}, {<<"y">>, [Ej1, Ej1]}]},
- ?assertEqual(ejsort(Ej2),
- {[{<<"x">>, Ej1s}, {<<"y">>, [Ej1s, Ej1s]}, {<<"z">>, Ej1s}]}).
-
-check_options_pass_values_test() ->
- ?assertEqual(check_options([]), []),
- ?assertEqual(check_options([baz, {other,fiz}]), [baz, {other, fiz}]),
- ?assertEqual(check_options([{doc_ids, x}]), [{doc_ids, x}]),
- ?assertEqual(check_options([{filter, x}]), [{filter, x}]),
- ?assertEqual(check_options([{selector, x}]), [{selector, x}]).
-
-check_options_fail_values_test() ->
- ?assertThrow({bad_request, _},
- check_options([{doc_ids, x}, {filter, y}])),
- ?assertThrow({bad_request, _},
- check_options([{doc_ids, x}, {selector, y}])),
- ?assertThrow({bad_request, _},
- check_options([{filter, x}, {selector, y}])),
- ?assertThrow({bad_request, _},
- check_options([{doc_ids, x}, {selector, y}, {filter, z}])).
-
-check_convert_options_pass_test() ->
- ?assertEqual([], convert_options([])),
- ?assertEqual([], convert_options([{<<"random">>, 42}])),
- ?assertEqual([{cancel, true}],
- convert_options([{<<"cancel">>, true}])),
- ?assertEqual([{create_target, true}],
- convert_options([{<<"create_target">>, true}])),
- ?assertEqual([{continuous, true}],
- convert_options([{<<"continuous">>, true}])),
- ?assertEqual([{doc_ids, [<<"id">>]}],
- convert_options([{<<"doc_ids">>, [<<"id">>]}])),
- ?assertEqual([{selector, {key, value}}],
- convert_options([{<<"selector">>, {key, value}}])).
-
-check_convert_options_fail_test() ->
- ?assertThrow({bad_request, _},
- convert_options([{<<"cancel">>, <<"true">>}])),
- ?assertThrow({bad_request, _},
- convert_options([{<<"create_target">>, <<"true">>}])),
- ?assertThrow({bad_request, _},
- convert_options([{<<"continuous">>, <<"true">>}])),
- ?assertThrow({bad_request, _},
- convert_options([{<<"doc_ids">>, not_a_list}])),
- ?assertThrow({bad_request, _},
- convert_options([{<<"selector">>, [{key, value}]}])).
-
--endif.
+
+-spec iso8601(erlang:timestamp()) -> binary().
+iso8601({_Mega, _Sec, _Micro} = Timestamp) ->
+ {{Y, Mon, D}, {H, Min, S}} = calendar:now_to_universal_time(Timestamp),
+ Format = "~B-~2..0B-~2..0BT~2..0B:~2..0B:~2..0BZ",
+ iolist_to_binary(io_lib:format(Format, [Y, Mon, D, H, Min, S])).
+
+
+%% Filter replication info ejson by the state provided. If it matches,
+%% return the input value; if it doesn't, return 'skip'. This is used from
+%% the replicator fabric coordinator and worker.
+-spec filter_state(atom(), [atom()], {[_ | _]}) -> {[_ | _]} | skip.
+filter_state(null = _State, _States, _Info) ->
+ skip;
+filter_state(_ = _State, [] = _States, Info) ->
+ Info;
+filter_state(State, States, Info) ->
+ case lists:member(State, States) of
+ true ->
+ Info;
+ false ->
+ skip
+ end.
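
The filter_state/3 clauses above reduce to: a null state always skips, an
empty state list matches everything, and otherwise list membership decides.
A hedged sketch with a made-up info term:

    Info = {[{<<"state">>, <<"running">>}]},
    Info = couch_replicator_utils:filter_state(running, [], Info),
    Info = couch_replicator_utils:filter_state(running, [running, failed], Info),
    skip = couch_replicator_utils:filter_state(completed, [running], Info),
    skip = couch_replicator_utils:filter_state(null, [running], Info).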