diff options
author | Paul J. Davis <paul.joseph.davis@gmail.com> | 2017-02-03 09:59:23 -0600 |
---|---|---|
committer | Paul J. Davis <paul.joseph.davis@gmail.com> | 2017-09-27 15:35:59 -0500 |
commit | 20a1021307030f199b1a8a8430aa201ea0a042cf (patch) | |
tree | fb44c8c4438f5da9f16740f4906af1d759e7d0ea | |
parent | 7c3cf50d0f7a13fd5d758d9af44c2c6eb518e1da (diff) | |
download | couchdb-20a1021307030f199b1a8a8430aa201ea0a042cf.tar.gz |
Move calculate_start_seq and owner_of
These functions were originally implemented in fabric_rpc.erl where they
really didn't belong. Moving them to couch_db.erl allows us to keep the
unit tests intact rather than just removing them now that the #db record
is being made private.
COUCHDB-3288
-rw-r--r-- | src/couch/src/couch_db.erl | 103 | ||||
-rw-r--r-- | src/fabric/src/fabric_rpc.erl | 103 |
2 files changed, 112 insertions, 94 deletions
diff --git a/src/couch/src/couch_db.erl b/src/couch/src/couch_db.erl index 1813ae8a4..09d60eec7 100644 --- a/src/couch/src/couch_db.erl +++ b/src/couch/src/couch_db.erl @@ -83,6 +83,9 @@ changes_since/5, count_changes_since/2, + calculate_start_seq/3, + owner_of/2, + start_compact/1, cancel_compact/1, wait_for_compaction/1, @@ -386,7 +389,9 @@ get_uuid(#db{}=Db) -> couch_db_header:uuid(Db#db.header). get_epochs(#db{}=Db) -> - couch_db_header:epochs(Db#db.header). + Epochs = couch_db_header:epochs(Db#db.header), + validate_epochs(Epochs), + Epochs. get_compacted_seq(#db{}=Db) -> couch_db_header:compacted_seq(Db#db.header). @@ -1364,6 +1369,78 @@ enum_docs(Db, NS, InFun, InAcc, Options0) -> Db#db.id_tree, FoldFun, InAcc, Options), {ok, enum_docs_reduce_to_count(LastReduce), OutAcc}. + +calculate_start_seq(_Db, _Node, Seq) when is_integer(Seq) -> + Seq; +calculate_start_seq(Db, Node, {Seq, Uuid}) -> + % Treat the current node as the epoch node + calculate_start_seq(Db, Node, {Seq, Uuid, Node}); +calculate_start_seq(Db, _Node, {Seq, Uuid, EpochNode}) -> + case is_prefix(Uuid, get_uuid(Db)) of + true -> + case is_owner(EpochNode, Seq, get_epochs(Db)) of + true -> Seq; + false -> 0 + end; + false -> + %% The file was rebuilt, most likely in a different + %% order, so rewind. + 0 + end; +calculate_start_seq(Db, _Node, {replace, OriginalNode, Uuid, Seq}) -> + case is_prefix(Uuid, couch_db:get_uuid(Db)) of + true -> + start_seq(get_epochs(Db), OriginalNode, Seq); + false -> + {replace, OriginalNode, Uuid, Seq} + end. + + +validate_epochs(Epochs) -> + %% Assert uniqueness. + case length(Epochs) == length(lists:ukeysort(2, Epochs)) of + true -> ok; + false -> erlang:error(duplicate_epoch) + end, + %% Assert order. + case Epochs == lists:sort(fun({_, A}, {_, B}) -> B =< A end, Epochs) of + true -> ok; + false -> erlang:error(epoch_order) + end. + + +is_prefix(Pattern, Subject) -> + binary:longest_common_prefix([Pattern, Subject]) == size(Pattern). + + +is_owner(Node, Seq, Epochs) -> + Node =:= owner_of(Epochs, Seq). + + +owner_of(Db, Seq) when not is_list(Db) -> + owner_of(get_epochs(Db), Seq); +owner_of([], _Seq) -> + undefined; +owner_of([{EpochNode, EpochSeq} | _Rest], Seq) when Seq > EpochSeq -> + EpochNode; +owner_of([_ | Rest], Seq) -> + owner_of(Rest, Seq). + + +start_seq([{OrigNode, EpochSeq} | _], OrigNode, Seq) when Seq > EpochSeq -> + %% OrigNode is the owner of the Seq so we can safely stream from there + Seq; +start_seq([{_, NewSeq}, {OrigNode, _} | _], OrigNode, Seq) when Seq > NewSeq -> + %% We transferred this file before Seq was written on OrigNode, so we need + %% to stream from the beginning of the next epoch. Note that it is _not_ + %% necessary for the current node to own the epoch beginning at NewSeq + NewSeq; +start_seq([_ | Rest], OrigNode, Seq) -> + start_seq(Rest, OrigNode, Seq); +start_seq([], OrigNode, Seq) -> + erlang:error({epoch_mismatch, OrigNode, Seq}). + + extract_namespace(Options0) -> case proplists:split(Options0, [namespace]) of {[[{namespace, NS}]], Options} -> @@ -1702,6 +1779,30 @@ should_fail_validate_dbname(DbName) -> ok end)}. +calculate_start_seq_test() -> + %% uuid mismatch is always a rewind. + Hdr1 = couch_db_header:new(), + Hdr2 = couch_db_header:set(Hdr1, [{epochs, [{node1, 1}]}, {uuid, <<"uuid1">>}]), + ?assertEqual(0, calculate_start_seq(#db{header=Hdr2}, node1, {1, <<"uuid2">>})), + %% uuid matches and seq is owned by node. + Hdr3 = couch_db_header:set(Hdr2, [{epochs, [{node1, 1}]}]), + ?assertEqual(2, calculate_start_seq(#db{header=Hdr3}, node1, {2, <<"uuid1">>})), + %% uuids match but seq is not owned by node. + Hdr4 = couch_db_header:set(Hdr2, [{epochs, [{node2, 2}, {node1, 1}]}]), + ?assertEqual(0, calculate_start_seq(#db{header=Hdr4}, node1, {3, <<"uuid1">>})), + %% return integer if we didn't get a vector. + ?assertEqual(4, calculate_start_seq(#db{}, foo, 4)). + +is_owner_test() -> + ?assertNot(is_owner(foo, 1, [])), + ?assertNot(is_owner(foo, 1, [{foo, 1}])), + ?assert(is_owner(foo, 2, [{foo, 1}])), + ?assert(is_owner(foo, 50, [{bar, 100}, {foo, 1}])), + ?assert(is_owner(foo, 50, [{baz, 200}, {bar, 100}, {foo, 1}])), + ?assert(is_owner(bar, 150, [{baz, 200}, {bar, 100}, {foo, 1}])), + ?assertError(duplicate_epoch, validate_epochs([{foo, 1}, {bar, 1}])), + ?assertError(epoch_order, validate_epochs([{foo, 100}, {bar, 200}])). + to_binary(DbName) when is_list(DbName) -> ?l2b(DbName); to_binary(DbName) when is_binary(DbName) -> diff --git a/src/fabric/src/fabric_rpc.erl b/src/fabric/src/fabric_rpc.erl index 93d7d1536..2201e359a 100644 --- a/src/fabric/src/fabric_rpc.erl +++ b/src/fabric/src/fabric_rpc.erl @@ -76,13 +76,13 @@ changes(DbName, Options, StartVector, DbOptions) -> args = Args, options = Options, pending = couch_db:count_changes_since(Db, StartSeq), - epochs = get_epochs(Db) + epochs = couch_db:get_epochs(Db) }, try {ok, #cacc{seq=LastSeq, pending=Pending, epochs=Epochs}} = couch_db:changes_since(Db, StartSeq, Enum, Opts, Acc0), rexi:stream_last({complete, [ - {seq, {LastSeq, uuid(Db), owner_of(LastSeq, Epochs)}}, + {seq, {LastSeq, uuid(Db), couch_db:owner_of(Epochs, LastSeq)}}, {pending, Pending} ]}) after @@ -362,7 +362,7 @@ changes_enumerator(DocInfo, Acc) -> Opts = if Conflicts -> [conflicts | DocOptions]; true -> DocOptions end, ChangesRow = {change, [ {pending, Pending-1}, - {seq, {Seq, uuid(Db), owner_of(Seq, Epochs)}}, + {seq, {Seq, uuid(Db), couch_db:owner_of(Epochs, Seq)}}, {id, Id}, {changes, Results}, {deleted, Del} | @@ -460,79 +460,20 @@ set_io_priority(DbName, Options) -> ok end. -calculate_start_seq(_Db, _Node, Seq) when is_integer(Seq) -> - Seq; -calculate_start_seq(Db, Node, {Seq, Uuid}) -> - % Treat the current node as the epoch node - calculate_start_seq(Db, Node, {Seq, Uuid, Node}); -calculate_start_seq(Db, _Node, {Seq, Uuid, EpochNode}) -> - case is_prefix(Uuid, couch_db:get_uuid(Db)) of - true -> - case is_owner(EpochNode, Seq, couch_db:get_epochs(Db)) of - true -> Seq; - false -> 0 - end; - false -> - %% The file was rebuilt, most likely in a different - %% order, so rewind. - 0 - end; -calculate_start_seq(Db, _Node, {replace, OriginalNode, Uuid, Seq}) -> - case is_prefix(Uuid, couch_db:get_uuid(Db)) of - true -> - start_seq(get_epochs(Db), OriginalNode, Seq); - false -> + +calculate_start_seq(Db, Node, Seq) -> + case couch_db:calculate_start_seq(Db, Node, Seq) of + N when is_integer(N) -> + N; + {replace, OriginalNode, Uuid, OriginalSeq} -> %% Scan history looking for an entry with %% * target_node == TargetNode %% * target_uuid == TargetUUID %% * target_seq =< TargetSeq %% If such an entry is found, stream from associated source_seq - mem3_rep:find_source_seq(Db, OriginalNode, Uuid, Seq) + mem3_rep:find_source_seq(Db, OriginalNode, Uuid, OriginalSeq) end. -is_prefix(Pattern, Subject) -> - binary:longest_common_prefix([Pattern, Subject]) == size(Pattern). - -is_owner(Node, Seq, Epochs) -> - validate_epochs(Epochs), - Node =:= owner_of(Seq, Epochs). - -owner_of(_Seq, []) -> - undefined; -owner_of(Seq, [{EpochNode, EpochSeq} | _Rest]) when Seq > EpochSeq -> - EpochNode; -owner_of(Seq, [_ | Rest]) -> - owner_of(Seq, Rest). - -get_epochs(Db) -> - Epochs = couch_db:get_epochs(Db), - validate_epochs(Epochs), - Epochs. - -start_seq([{OrigNode, EpochSeq} | _], OrigNode, Seq) when Seq > EpochSeq -> - %% OrigNode is the owner of the Seq so we can safely stream from there - Seq; -start_seq([{_, NewSeq}, {OrigNode, _} | _], OrigNode, Seq) when Seq > NewSeq -> - %% We transferred this file before Seq was written on OrigNode, so we need - %% to stream from the beginning of the next epoch. Note that it is _not_ - %% necessary for the current node to own the epoch beginning at NewSeq - NewSeq; -start_seq([_ | Rest], OrigNode, Seq) -> - start_seq(Rest, OrigNode, Seq); -start_seq([], OrigNode, Seq) -> - erlang:error({epoch_mismatch, OrigNode, Seq}). - -validate_epochs(Epochs) -> - %% Assert uniqueness. - case length(Epochs) == length(lists:ukeysort(2, Epochs)) of - true -> ok; - false -> erlang:error(duplicate_epoch) - end, - %% Assert order. - case Epochs == lists:sort(fun({_, A}, {_, B}) -> B =< A end, Epochs) of - true -> ok; - false -> erlang:error(epoch_order) - end. uuid(Db) -> Uuid = couch_db:get_uuid(Db), @@ -544,30 +485,6 @@ uuid_prefix_len() -> -ifdef(TEST). -include_lib("eunit/include/eunit.hrl"). -calculate_start_seq_test() -> - %% uuid mismatch is always a rewind. - Hdr1 = couch_db_header:new(), - Hdr2 = couch_db_header:set(Hdr1, [{epochs, [{node1, 1}]}, {uuid, <<"uuid1">>}]), - ?assertEqual(0, calculate_start_seq(#db{header=Hdr2}, node1, {1, <<"uuid2">>})), - %% uuid matches and seq is owned by node. - Hdr3 = couch_db_header:set(Hdr2, [{epochs, [{node1, 1}]}]), - ?assertEqual(2, calculate_start_seq(#db{header=Hdr3}, node1, {2, <<"uuid1">>})), - %% uuids match but seq is not owned by node. - Hdr4 = couch_db_header:set(Hdr2, [{epochs, [{node2, 2}, {node1, 1}]}]), - ?assertEqual(0, calculate_start_seq(#db{header=Hdr4}, node1, {3, <<"uuid1">>})), - %% return integer if we didn't get a vector. - ?assertEqual(4, calculate_start_seq(#db{}, foo, 4)). - -is_owner_test() -> - ?assertNot(is_owner(foo, 1, [])), - ?assertNot(is_owner(foo, 1, [{foo, 1}])), - ?assert(is_owner(foo, 2, [{foo, 1}])), - ?assert(is_owner(foo, 50, [{bar, 100}, {foo, 1}])), - ?assert(is_owner(foo, 50, [{baz, 200}, {bar, 100}, {foo, 1}])), - ?assert(is_owner(bar, 150, [{baz, 200}, {bar, 100}, {foo, 1}])), - ?assertError(duplicate_epoch, is_owner(foo, 1, [{foo, 1}, {bar, 1}])), - ?assertError(epoch_order, is_owner(foo, 1, [{foo, 100}, {bar, 200}])). - maybe_filtered_json_doc_no_filter_test() -> Body = {[{<<"a">>, 1}]}, Doc = #doc{id = <<"1">>, revs = {1, [<<"r1">>]}, body = Body}, |