summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPaul J. Davis <paul.joseph.davis@gmail.com>2017-02-03 09:59:23 -0600
committerPaul J. Davis <paul.joseph.davis@gmail.com>2017-04-06 12:30:40 -0500
commit6a0b4b248bbdbf501399996469abc0f981a70467 (patch)
tree901f52e9b59a67c554b90762618c8b03dbcf2639
parent7349bd971b05ca750fb7aaa68e8c159aac5cc152 (diff)
downloadcouchdb-6a0b4b248bbdbf501399996469abc0f981a70467.tar.gz
Move calculate_start_seq and owner_of
These functions were originally implemented in fabric_rpc.erl where they really didn't belong. Moving them to couch_db.erl allows us to keep the unit tests intact rather than just removing them now that the #db record is being made private. COUCHDB-3288
-rw-r--r--src/couch/src/couch_db.erl103
1 files changed, 102 insertions, 1 deletions
diff --git a/src/couch/src/couch_db.erl b/src/couch/src/couch_db.erl
index de9dd9fa1..1f68200e4 100644
--- a/src/couch/src/couch_db.erl
+++ b/src/couch/src/couch_db.erl
@@ -83,6 +83,9 @@
changes_since/5,
count_changes_since/2,
+ calculate_start_seq/3,
+ owner_of/2,
+
start_compact/1,
cancel_compact/1,
wait_for_compaction/1,
@@ -386,7 +389,9 @@ get_uuid(#db{}=Db) ->
couch_db_header:uuid(Db#db.header).
get_epochs(#db{}=Db) ->
- couch_db_header:epochs(Db#db.header).
+ Epochs = couch_db_header:epochs(Db#db.header),
+ validate_epochs(Epochs),
+ Epochs.
get_compacted_seq(#db{}=Db) ->
couch_db_header:compacted_seq(Db#db.header).
@@ -1360,6 +1365,78 @@ enum_docs(Db, NS, InFun, InAcc, Options0) ->
Db#db.id_tree, FoldFun, InAcc, Options),
{ok, enum_docs_reduce_to_count(LastReduce), OutAcc}.
+
+calculate_start_seq(_Db, _Node, Seq) when is_integer(Seq) ->
+ Seq;
+calculate_start_seq(Db, Node, {Seq, Uuid}) ->
+ % Treat the current node as the epoch node
+ calculate_start_seq(Db, Node, {Seq, Uuid, Node});
+calculate_start_seq(Db, _Node, {Seq, Uuid, EpochNode}) ->
+ case is_prefix(Uuid, get_uuid(Db)) of
+ true ->
+ case is_owner(EpochNode, Seq, get_epochs(Db)) of
+ true -> Seq;
+ false -> 0
+ end;
+ false ->
+ %% The file was rebuilt, most likely in a different
+ %% order, so rewind.
+ 0
+ end;
+calculate_start_seq(Db, _Node, {replace, OriginalNode, Uuid, Seq}) ->
+ case is_prefix(Uuid, couch_db:get_uuid(Db)) of
+ true ->
+ start_seq(get_epochs(Db), OriginalNode, Seq);
+ false ->
+ {replace, OriginalNode, Uuid, Seq}
+ end.
+
+
+validate_epochs(Epochs) ->
+ %% Assert uniqueness.
+ case length(Epochs) == length(lists:ukeysort(2, Epochs)) of
+ true -> ok;
+ false -> erlang:error(duplicate_epoch)
+ end,
+ %% Assert order.
+ case Epochs == lists:sort(fun({_, A}, {_, B}) -> B =< A end, Epochs) of
+ true -> ok;
+ false -> erlang:error(epoch_order)
+ end.
+
+
+is_prefix(Pattern, Subject) ->
+ binary:longest_common_prefix([Pattern, Subject]) == size(Pattern).
+
+
+is_owner(Node, Seq, Epochs) ->
+ Node =:= owner_of(Epochs, Seq).
+
+
+owner_of(Db, Seq) when not is_list(Db) ->
+ owner_of(get_epochs(Db), Seq);
+owner_of([], _Seq) ->
+ undefined;
+owner_of([{EpochNode, EpochSeq} | _Rest], Seq) when Seq > EpochSeq ->
+ EpochNode;
+owner_of([_ | Rest], Seq) ->
+ owner_of(Rest, Seq).
+
+
+start_seq([{OrigNode, EpochSeq} | _], OrigNode, Seq) when Seq > EpochSeq ->
+ %% OrigNode is the owner of the Seq so we can safely stream from there
+ Seq;
+start_seq([{_, NewSeq}, {OrigNode, _} | _], OrigNode, Seq) when Seq > NewSeq ->
+ %% We transferred this file before Seq was written on OrigNode, so we need
+ %% to stream from the beginning of the next epoch. Note that it is _not_
+ %% necessary for the current node to own the epoch beginning at NewSeq
+ NewSeq;
+start_seq([_ | Rest], OrigNode, Seq) ->
+ start_seq(Rest, OrigNode, Seq);
+start_seq([], OrigNode, Seq) ->
+ erlang:error({epoch_mismatch, OrigNode, Seq}).
+
+
extract_namespace(Options0) ->
case proplists:split(Options0, [namespace]) of
{[[{namespace, NS}]], Options} ->
@@ -1698,6 +1775,30 @@ should_fail_validate_dbname(DbName) ->
ok
end)}.
+calculate_start_seq_test() ->
+ %% uuid mismatch is always a rewind.
+ Hdr1 = couch_db_header:new(),
+ Hdr2 = couch_db_header:set(Hdr1, [{epochs, [{node1, 1}]}, {uuid, <<"uuid1">>}]),
+ ?assertEqual(0, calculate_start_seq(#db{header=Hdr2}, node1, {1, <<"uuid2">>})),
+ %% uuid matches and seq is owned by node.
+ Hdr3 = couch_db_header:set(Hdr2, [{epochs, [{node1, 1}]}]),
+ ?assertEqual(2, calculate_start_seq(#db{header=Hdr3}, node1, {2, <<"uuid1">>})),
+ %% uuids match but seq is not owned by node.
+ Hdr4 = couch_db_header:set(Hdr2, [{epochs, [{node2, 2}, {node1, 1}]}]),
+ ?assertEqual(0, calculate_start_seq(#db{header=Hdr4}, node1, {3, <<"uuid1">>})),
+ %% return integer if we didn't get a vector.
+ ?assertEqual(4, calculate_start_seq(#db{}, foo, 4)).
+
+is_owner_test() ->
+ ?assertNot(is_owner(foo, 1, [])),
+ ?assertNot(is_owner(foo, 1, [{foo, 1}])),
+ ?assert(is_owner(foo, 2, [{foo, 1}])),
+ ?assert(is_owner(foo, 50, [{bar, 100}, {foo, 1}])),
+ ?assert(is_owner(foo, 50, [{baz, 200}, {bar, 100}, {foo, 1}])),
+ ?assert(is_owner(bar, 150, [{baz, 200}, {bar, 100}, {foo, 1}])),
+ ?assertError(duplicate_epoch, validate_epochs([{foo, 1}, {bar, 1}])),
+ ?assertError(epoch_order, validate_epochs([{foo, 100}, {bar, 200}])).
+
to_binary(DbName) when is_list(DbName) ->
?l2b(DbName);
to_binary(DbName) when is_binary(DbName) ->