summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorNick Vatamaniuc <vatamane@apache.org>2020-08-28 04:33:11 -0400
committerNick Vatamaniuc <nickva@users.noreply.github.com>2020-09-15 16:13:46 -0400
commit276d19731bc5df73838f40efc126f1f709e04fbe (patch)
tree2315401f4f799f72711bc45a0f97a61ab7fd416d
parent941cfc3d7b33cbfbf7e95eb7db388515d0595399 (diff)
downloadcouchdb-276d19731bc5df73838f40efc126f1f709e04fbe.tar.gz
Update couch_replicator_ids
This module is responsible for calculating replication IDs. It inspects all the replication options which may affect the replication results and hashes them into a single ID. CouchDB replicator tries to maintain compatibility with older versions of itself, so it keeps track of how to calculate replication IDs used by previous versions of CouchDB. Replication ID calculation algorithms have their own versions; the latest one is version 4. One of the goals of this update is to not alter the replication ID algorithm and keep it at version 4, such that for all the same parameters the replication IDs should stay the same as they would be on CouchDB <= 3.x. That is why in some internal functions, options maps and binaries are turned back into proplists and tuples before hashing is performed. There is a unit test which asserts that the replication ID calculated with this update matches what was calculated in CouchDB 3.x. Internal representation of the replication ID has changed slightly. Previously it was represented by a tuple of `{BaseId, ExtId}`, where `BaseId` was the ID without any options such as `continuous` or `create_target`, and `ExtId` was the concatenated list of those options. In most cases it was useful to operate on the full ID and in only a few places the `BaseId` was needed. So the calculation function was updated to return `{RepId, BaseId}` instead. `RepId` is a binary that is the full replication ID (base + extensions) and `BaseId` is just the base. The function which calculated the base ID was updated to actually be called `base_id/2` as opposed to `replication_id/2`. Another update to the module is a function which calculates replication job IDs. A `JobId` is used to identify replication jobs in the `couch_jobs` API. A `JobId`, unlike a `RepId`, never requires making a network round-trip to calculate. For replications created from `_replicator` docs, `JobId` is defined as the concatenation of the database instance UUID and document ID.
For transient jobs it is calculated by hashing the source and target endpoint parameters and the replication options. In fact, it is almost the same as a replication ID, with one important difference: the filter design doc name and function name are used instead of the contents of the filter from the source, so no network round-trip is necessary to calculate it.
-rw-r--r--src/couch_replicator/src/couch_replicator_ids.erl202
1 files changed, 141 insertions, 61 deletions
diff --git a/src/couch_replicator/src/couch_replicator_ids.erl b/src/couch_replicator/src/couch_replicator_ids.erl
index 04e71c3ef..d1cbe571c 100644
--- a/src/couch_replicator/src/couch_replicator_ids.erl
+++ b/src/couch_replicator/src/couch_replicator_ids.erl
@@ -14,7 +14,9 @@
-export([
replication_id/1,
- replication_id/2,
+ base_id/2,
+ job_id/3,
+ job_id/2,
convert/1
]).
@@ -30,28 +32,31 @@
% {filter_fetch_error, Error} exception.
%
-replication_id(#rep{options = Options} = Rep) ->
- BaseId = replication_id(Rep, ?REP_ID_VERSION),
- {BaseId, maybe_append_options([continuous, create_target], Options)}.
+replication_id(#{?OPTIONS := Options} = Rep) ->
+ BaseId = base_id(Rep, ?REP_ID_VERSION),
+ UseOpts = [<<"continuous">>, <<"create_target">>],
+ ExtId = maybe_append_options(UseOpts, Options),
+ RepId = iolist_to_binary([BaseId, ExtId]),
+ {RepId, BaseId}.
% Versioned clauses for generating replication IDs.
% If a change is made to how replications are identified,
% please add a new clause and increase ?REP_ID_VERSION.
-replication_id(#rep{} = Rep, 4) ->
+base_id(#{?SOURCE := Src, ?TARGET := Tgt} = Rep, 4) ->
UUID = couch_server:get_uuid(),
- SrcInfo = get_v4_endpoint(Rep#rep.source),
- TgtInfo = get_v4_endpoint(Rep#rep.target),
+ SrcInfo = get_v4_endpoint(Src),
+ TgtInfo = get_v4_endpoint(Tgt),
maybe_append_filters([UUID, SrcInfo, TgtInfo], Rep);
-replication_id(#rep{} = Rep, 3) ->
+base_id(#{?SOURCE := Src0, ?TARGET := Tgt0} = Rep, 3) ->
UUID = couch_server:get_uuid(),
- Src = get_rep_endpoint(Rep#rep.source),
- Tgt = get_rep_endpoint(Rep#rep.target),
+ Src = get_rep_endpoint(Src0),
+ Tgt = get_rep_endpoint(Tgt0),
maybe_append_filters([UUID, Src, Tgt], Rep);
-replication_id(#rep{} = Rep, 2) ->
+base_id(#{?SOURCE := Src0, ?TARGET := Tgt0} = Rep, 2) ->
{ok, HostName} = inet:gethostname(),
Port = case (catch mochiweb_socket_server:get(couch_httpd, port)) of
P when is_number(P) ->
@@ -64,47 +69,76 @@ replication_id(#rep{} = Rep, 2) ->
% ... mochiweb_socket_server:get(https, port)
list_to_integer(config:get("httpd", "port", "5984"))
end,
- Src = get_rep_endpoint(Rep#rep.source),
- Tgt = get_rep_endpoint(Rep#rep.target),
+ Src = get_rep_endpoint(Src0),
+ Tgt = get_rep_endpoint(Tgt0),
maybe_append_filters([HostName, Port, Src, Tgt], Rep);
-replication_id(#rep{} = Rep, 1) ->
+base_id(#{?SOURCE := Src0, ?TARGET := Tgt0} = Rep, 1) ->
{ok, HostName} = inet:gethostname(),
- Src = get_rep_endpoint(Rep#rep.source),
- Tgt = get_rep_endpoint(Rep#rep.target),
+ Src = get_rep_endpoint(Src0),
+ Tgt = get_rep_endpoint(Tgt0),
maybe_append_filters([HostName, Src, Tgt], Rep).
--spec convert([_] | binary() | {string(), string()}) -> {string(), string()}.
-convert(Id) when is_list(Id) ->
- convert(?l2b(Id));
+-spec job_id(#{}, binary() | null, binary() | null) -> binary().
+job_id(#{} = Rep, null = _DbUUID, null = _DocId) ->
+ #{
+ ?SOURCE := Src,
+ ?TARGET := Tgt,
+ ?REP_USER := UserName,
+ ?OPTIONS := Options
+ } = Rep,
+ UUID = couch_server:get_uuid(),
+ SrcInfo = get_v4_endpoint(Src),
+ TgtInfo = get_v4_endpoint(Tgt),
+ UseOpts = [<<"continuous">>, <<"create_target">>],
+ Opts = maybe_append_options(UseOpts, Options),
+ IdParts = [UUID, SrcInfo, TgtInfo, UserName, Opts],
+ maybe_append_filters(IdParts, Rep, false);
+
+job_id(#{} = _Rep, DbUUID, DocId) when is_binary(DbUUID), is_binary(DocId) ->
+ job_id(DbUUID, DocId).
+
+
+-spec job_id(binary(), binary()) -> binary().
+job_id(DbUUID, DocId) when is_binary(DbUUID), is_binary(DocId) ->
+ <<DbUUID/binary, "|", DocId/binary>>.
+
+
+-spec convert(binary()) -> binary().
convert(Id0) when is_binary(Id0) ->
% Spaces can result from mochiweb incorrectly unquoting + characters from
% the URL path. So undo the incorrect parsing here to avoid forcing
% users to url encode + characters.
- Id = binary:replace(Id0, <<" ">>, <<"+">>, [global]),
- lists:splitwith(fun(Char) -> Char =/= $+ end, ?b2l(Id));
-convert({BaseId, Ext} = Id) when is_list(BaseId), is_list(Ext) ->
- Id.
+ binary:replace(Id0, <<" ">>, <<"+">>, [global]).
% Private functions
-maybe_append_filters(Base,
- #rep{source = Source, options = Options}) ->
+maybe_append_filters(Base, #{} = Rep) ->
+ maybe_append_filters(Base, Rep, true).
+
+
+maybe_append_filters(Base, #{} = Rep, FetchFilter) ->
+ #{
+ ?SOURCE := Source,
+ ?OPTIONS := Options
+ } = Rep,
Base2 = Base ++
case couch_replicator_filters:parse(Options) of
{ok, nil} ->
[];
{ok, {view, Filter, QueryParams}} ->
[Filter, QueryParams];
- {ok, {user, {Doc, Filter}, QueryParams}} ->
+ {ok, {user, {Doc, Filter}, QueryParams}} when FetchFilter =:= true ->
case couch_replicator_filters:fetch(Doc, Filter, Source) of
{ok, Code} ->
[Code, QueryParams];
{error, Error} ->
throw({filter_fetch_error, Error})
end;
+ {ok, {user, {Doc, Filter}, QueryParams}} when FetchFilter =:= false ->
+ [Doc, Filter, QueryParams];
{ok, {docids, DocIds}} ->
[DocIds];
{ok, {mango, Selector}} ->
@@ -112,27 +146,33 @@ maybe_append_filters(Base,
{error, FilterParseError} ->
throw({error, FilterParseError})
end,
- couch_util:to_hex(couch_hash:md5_hash(term_to_binary(Base2))).
+ Res = couch_util:to_hex(couch_hash:md5_hash(term_to_binary(Base2))),
+ list_to_binary(Res).
-maybe_append_options(Options, RepOptions) ->
+maybe_append_options(Options, #{} = RepOptions) ->
lists:foldl(fun(Option, Acc) ->
Acc ++
- case couch_util:get_value(Option, RepOptions, false) of
- true ->
- "+" ++ atom_to_list(Option);
- false ->
- ""
+ case maps:get(Option, RepOptions, false) of
+ true -> "+" ++ binary_to_list(Option);
+ false -> ""
end
end, [], Options).
-get_rep_endpoint(#httpdb{url=Url, headers=Headers}) ->
+get_rep_endpoint(#{<<"url">> := Url0, <<"headers">> := Headers0}) ->
+ % We turn everything to lists and proplists to calculate the same
+ % replication ID as CouchDB <= 3.x
+ Url = binary_to_list(Url0),
+ Headers1 = maps:fold(fun(K, V, Acc) ->
+ [{binary_to_list(K), binary_to_list(V)} | Acc]
+ end, [], Headers0),
+ Headers2 = lists:keysort(1, Headers1),
DefaultHeaders = (#httpdb{})#httpdb.headers,
- {remote, Url, Headers -- DefaultHeaders}.
+ {remote, Url, Headers2 -- DefaultHeaders}.
-get_v4_endpoint(#httpdb{} = HttpDb) ->
+get_v4_endpoint(#{} = HttpDb) ->
{remote, Url, Headers} = get_rep_endpoint(HttpDb),
{{UserFromHeaders, _}, HeadersWithoutBasicAuth} =
couch_replicator_utils:remove_basic_auth_from_headers(Headers),
@@ -184,92 +224,132 @@ get_non_default_port(_Schema, Port) ->
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
+-include_lib("fabric/test/fabric2_test.hrl").
replication_id_convert_test_() ->
[?_assertEqual(Expected, convert(Id)) || {Expected, Id} <- [
- {{"abc", ""}, "abc"},
- {{"abc", ""}, <<"abc">>},
- {{"abc", "+x+y"}, <<"abc+x+y">>},
- {{"abc", "+x+y"}, {"abc", "+x+y"}},
- {{"abc", "+x+y"}, <<"abc x y">>}
+ {<<"abc">>, <<"abc">>},
+ {<<"abc+x">>, <<"abc+x">>},
+ {<<"abc+x">>, <<"abc x">>},
+ {<<"abc+x+y">>, <<"abc+x+y">>},
+ {<<"abc+x+y">>, <<"abc x y">>}
]].
+
http_v4_endpoint_test_() ->
[?_assertMatch({remote, User, Host, Port, Path, HeadersNoAuth, undefined},
- get_v4_endpoint(#httpdb{url = Url, headers = Headers})) ||
+ get_v4_endpoint(#{<<"url">> => Url, <<"headers">> => Headers})) ||
{{User, Host, Port, Path, HeadersNoAuth}, {Url, Headers}} <- [
{
{undefined, "host", default, "/", []},
- {"http://host", []}
+ {<<"http://host">>, #{}}
},
{
{undefined, "host", default, "/", []},
- {"https://host", []}
+ {<<"https://host">>, #{}}
},
{
{undefined, "host", default, "/", []},
- {"http://host:5984", []}
+ {<<"http://host:5984">>, #{}}
},
{
{undefined, "host", 1, "/", []},
- {"http://host:1", []}
+ {<<"http://host:1">>, #{}}
},
{
{undefined, "host", 2, "/", []},
- {"https://host:2", []}
+ {<<"https://host:2">>, #{}}
},
{
- {undefined, "host", default, "/", [{"h","v"}]},
- {"http://host", [{"h","v"}]}
+ {undefined, "host", default, "/", [{"h", "v"}]},
+ {<<"http://host">>, #{<<"h">> => <<"v">>}}
},
{
{undefined, "host", default, "/a/b", []},
- {"http://host/a/b", []}
+ {<<"http://host/a/b">>, #{}}
},
{
{"user", "host", default, "/", []},
- {"http://user:pass@host", []}
+ {<<"http://user:pass@host">>, #{}}
},
{
{"user", "host", 3, "/", []},
- {"http://user:pass@host:3", []}
+ {<<"http://user:pass@host:3">>, #{}}
},
{
{"user", "host", default, "/", []},
- {"http://user:newpass@host", []}
+ {<<"http://user:newpass@host">>, #{}}
},
{
{"user", "host", default, "/", []},
- {"http://host", [basic_auth("user","pass")]}
+ {<<"http://host">>, basic_auth(<<"user">>, <<"pass">>)}
},
{
{"user", "host", default, "/", []},
- {"http://host", [basic_auth("user","newpass")]}
+ {<<"http://host">>, basic_auth(<<"user">>, <<"newpass">>)}
},
{
{"user1", "host", default, "/", []},
- {"http://user1:pass1@host", [basic_auth("user2","pass2")]}
+ {<<"http://user1:pass1@host">>, basic_auth(<<"user2">>,
+ <<"pass2">>)}
},
{
{"user", "host", default, "/", [{"h", "v"}]},
- {"http://host", [{"h", "v"}, basic_auth("user","pass")]}
+ {<<"http://host">>, maps:merge(#{<<"h">> => <<"v">>},
+ basic_auth(<<"user">>, <<"pass">>))}
},
{
{undefined, "random_junk", undefined, undefined},
- {"random_junk", []}
+ {<<"random_junk">>, #{}}
},
{
{undefined, "host", default, "/", []},
- {"http://host", [{"Authorization", "Basic bad"}]}
+ {<<"http://host">>, #{<<"Authorization">> =>
+ <<"Basic bad">>}}
}
]
].
basic_auth(User, Pass) ->
- B64Auth = base64:encode_to_string(User ++ ":" ++ Pass),
- {"Authorization", "Basic " ++ B64Auth}.
+ B64Auth = base64:encode(<<User/binary, ":", Pass/binary>>),
+ #{<<"Authorization">> => <<"Basic ", B64Auth/binary>>}.
+
+
+version4_matches_couchdb3_test_() ->
+ {
+ foreach,
+ fun setup/0,
+ fun teardown/1,
+ [
+ ?TDEF_FE(id_matches_couchdb3)
+ ]
+ }.
+
+
+setup() ->
+ meck:expect(config, get, fun(_, _, Default) -> Default end).
+
+
+teardown(_) ->
+ meck:unload().
+
+
+id_matches_couchdb3(_) ->
+ {ok, Rep} = couch_replicator_parse:parse_rep(#{
+ <<"source">> => <<"http://adm:pass@127.0.0.1/abc">>,
+ <<"target">> => <<"http://adm:pass@127.0.0.1/xyz">>,
+ <<"create_target">> => true,
+ <<"continuous">> => true
+ }, null),
+ meck:expect(couch_server, get_uuid, 0, "somefixedid"),
+ {RepId, BaseId} = replication_id(Rep),
+ % Calculated on CouchDB 3.x
+ RepId3x = <<"ff71e1208f93ba054eb60e7ca8683fe4+continuous+create_target">>,
+ BaseId3x = <<"ff71e1208f93ba054eb60e7ca8683fe4">>,
+ ?assertEqual(RepId3x, RepId),
+ ?assertEqual(BaseId3x, BaseId).
-endif.