diff options
Diffstat (limited to 'src/mem3/src/mem3_rpc.erl')
-rw-r--r-- | src/mem3/src/mem3_rpc.erl | 688 |
1 files changed, 387 insertions, 301 deletions
diff --git a/src/mem3/src/mem3_rpc.erl b/src/mem3/src/mem3_rpc.erl index 9e0f42a8e..468bdee21 100644 --- a/src/mem3/src/mem3_rpc.erl +++ b/src/mem3/src/mem3_rpc.erl @@ -14,7 +14,6 @@ -module(mem3_rpc). - -export([ find_common_seq/4, get_missing_revs/4, @@ -43,18 +42,14 @@ save_purge_checkpoint_rpc/3, replicate_rpc/2 - ]). - -include("mem3.hrl"). -include_lib("couch/include/couch_db.hrl"). - -define(BATCH_SIZE, 1000). -define(REXI_CALL_TIMEOUT_MSEC, 600000). - % "Pull" is a bit of a misnomer here, as what we're actually doing is % issuing an RPC request and telling the remote node to push updates to % us. This lets us reuse all of the battle-tested machinery of mem3_rpc. @@ -64,7 +59,6 @@ pull_replication(Seed) -> get_missing_revs(Node, DbName, IdsRevs, Options) -> rexi_call(Node, {fabric_rpc, get_missing_revs, [DbName, IdsRevs, Options]}). - update_docs(Node, DbName, Docs, Options) -> rexi_call(Node, {fabric_rpc, update_docs, [DbName, Docs, Options]}). @@ -77,95 +71,91 @@ load_checkpoint(Node, DbName, SourceNode, SourceUUID, FilterHash) -> Args = [DbName, SourceNode, SourceUUID, FilterHash], rexi_call(Node, {mem3_rpc, load_checkpoint_rpc, Args}). - load_checkpoint(Node, DbName, SourceNode, SourceUUID) -> Args = [DbName, SourceNode, SourceUUID], rexi_call(Node, {mem3_rpc, load_checkpoint_rpc, Args}). - save_checkpoint(Node, DbName, DocId, Seq, Entry, History) -> Args = [DbName, DocId, Seq, Entry, History], rexi_call(Node, {mem3_rpc, save_checkpoint_rpc, Args}). - find_common_seq(Node, DbName, SourceUUID, SourceEpochs) -> Args = [DbName, SourceUUID, SourceEpochs], rexi_call(Node, {mem3_rpc, find_common_seq_rpc, Args}). - load_purge_infos(Node, DbName, SourceUUID, Count) -> Args = [DbName, SourceUUID, Count], rexi_call(Node, {mem3_rpc, load_purge_infos_rpc, Args}). - save_purge_checkpoint(Node, DbName, PurgeDocId, Body) -> Args = [DbName, PurgeDocId, Body], rexi_call(Node, {mem3_rpc, save_purge_checkpoint_rpc, Args}). - purge_docs(Node, DbName, PurgeInfos, Options) -> rexi_call(Node, {fabric_rpc, purge_docs, [DbName, PurgeInfos, Options]}). - -replicate(Source, Target, DbName, Timeout) - when is_atom(Source), is_atom(Target), is_binary(DbName) -> +replicate(Source, Target, DbName, Timeout) when + is_atom(Source), is_atom(Target), is_binary(DbName) +-> Args = [DbName, Target], rexi_call(Source, {mem3_rpc, replicate_rpc, Args}, Timeout). - load_checkpoint_rpc(DbName, SourceNode, SourceUUID) -> load_checkpoint_rpc(DbName, SourceNode, SourceUUID, <<>>). - load_checkpoint_rpc(DbName, SourceNode, SourceUUID, FilterHash) -> erlang:put(io_priority, {internal_repl, DbName}), case get_or_create_db(DbName, [?ADMIN_CTX]) of - {ok, Db} -> - TargetUUID = couch_db:get_uuid(Db), - NewId = mem3_rep:make_local_id(SourceUUID, TargetUUID, FilterHash), - case couch_db:open_doc(Db, NewId, []) of - {ok, Doc} -> - rexi:reply({ok, {NewId, Doc}}); - {not_found, _} -> - OldId = mem3_rep:make_local_id(SourceNode, node()), - case couch_db:open_doc(Db, OldId, []) of - {ok, Doc} -> - rexi:reply({ok, {NewId, Doc}}); - {not_found, _} -> - rexi:reply({ok, {NewId, #doc{id = NewId}}}) - end - end; - Error -> - rexi:reply(Error) + {ok, Db} -> + TargetUUID = couch_db:get_uuid(Db), + NewId = mem3_rep:make_local_id(SourceUUID, TargetUUID, FilterHash), + case couch_db:open_doc(Db, NewId, []) of + {ok, Doc} -> + rexi:reply({ok, {NewId, Doc}}); + {not_found, _} -> + OldId = mem3_rep:make_local_id(SourceNode, node()), + case couch_db:open_doc(Db, OldId, []) of + {ok, Doc} -> + rexi:reply({ok, {NewId, Doc}}); + {not_found, _} -> + rexi:reply({ok, {NewId, #doc{id = NewId}}}) + end + end; + Error -> + rexi:reply(Error) end. - save_checkpoint_rpc(DbName, Id, SourceSeq, NewEntry0, History0) -> erlang:put(io_priority, {internal_repl, DbName}), case get_or_create_db(DbName, [?ADMIN_CTX]) of {ok, Db} -> - NewEntry = {[ - {<<"target_node">>, atom_to_binary(node(), utf8)}, - {<<"target_uuid">>, couch_db:get_uuid(Db)}, - {<<"target_seq">>, couch_db:get_update_seq(Db)} - ] ++ NewEntry0}, - Body = {[ - {<<"seq">>, SourceSeq}, - {<<"target_uuid">>, couch_db:get_uuid(Db)}, - {<<"history">>, add_checkpoint(NewEntry, History0)} - ]}, + NewEntry = { + [ + {<<"target_node">>, atom_to_binary(node(), utf8)}, + {<<"target_uuid">>, couch_db:get_uuid(Db)}, + {<<"target_seq">>, couch_db:get_update_seq(Db)} + ] ++ NewEntry0 + }, + Body = + {[ + {<<"seq">>, SourceSeq}, + {<<"target_uuid">>, couch_db:get_uuid(Db)}, + {<<"history">>, add_checkpoint(NewEntry, History0)} + ]}, Doc = #doc{id = Id, body = Body}, - rexi:reply(try couch_db:update_doc(Db, Doc, []) of - {ok, _} -> - {ok, Body}; - Else -> - {error, Else} - catch - Exception -> - Exception; - error:Reason -> - {error, Reason} - end); + rexi:reply( + try couch_db:update_doc(Db, Doc, []) of + {ok, _} -> + {ok, Body}; + Else -> + {error, Else} + catch + Exception -> + Exception; + error:Reason -> + {error, Reason} + end + ); Error -> rexi:reply(Error) end. @@ -173,17 +163,17 @@ save_checkpoint_rpc(DbName, Id, SourceSeq, NewEntry0, History0) -> find_common_seq_rpc(DbName, SourceUUID, SourceEpochs) -> erlang:put(io_priority, {internal_repl, DbName}), case get_or_create_db(DbName, [?ADMIN_CTX]) of - {ok, Db} -> - case couch_db:get_uuid(Db) of - SourceUUID -> - TargetEpochs = couch_db:get_epochs(Db), - Seq = compare_epochs(SourceEpochs, TargetEpochs), - rexi:reply({ok, Seq}); - _Else -> - rexi:reply({ok, 0}) - end; - Error -> - rexi:reply(Error) + {ok, Db} -> + case couch_db:get_uuid(Db) of + SourceUUID -> + TargetEpochs = couch_db:get_epochs(Db), + Seq = compare_epochs(SourceEpochs, TargetEpochs), + rexi:reply({ok, Seq}); + _Else -> + rexi:reply({ok, 0}) + end; + Error -> + rexi:reply(Error) end. pull_replication_rpc(Target) -> @@ -192,29 +182,33 @@ pull_replication_rpc(Target) -> Repl = fun(Db) -> {Db, mem3_rep:go(Db, Target, Opts)} end, rexi:reply({ok, lists:map(Repl, Dbs)}). - load_purge_infos_rpc(DbName, SrcUUID, BatchSize) -> erlang:put(io_priority, {internal_repl, DbName}), case get_or_create_db(DbName, [?ADMIN_CTX]) of {ok, Db} -> TgtUUID = couch_db:get_uuid(Db), PurgeDocId = mem3_rep:make_purge_id(SrcUUID, TgtUUID), - StartSeq = case couch_db:open_doc(Db, PurgeDocId, []) of - {ok, #doc{body = {Props}}} -> - couch_util:get_value(<<"purge_seq">>, Props); - {not_found, _} -> - Oldest = couch_db:get_oldest_purge_seq(Db), - erlang:max(0, Oldest - 1) - end, + StartSeq = + case couch_db:open_doc(Db, PurgeDocId, []) of + {ok, #doc{body = {Props}}} -> + couch_util:get_value(<<"purge_seq">>, Props); + {not_found, _} -> + Oldest = couch_db:get_oldest_purge_seq(Db), + erlang:max(0, Oldest - 1) + end, FoldFun = fun({PSeq, UUID, Id, Revs}, {Count, Infos, _}) -> NewCount = Count + length(Revs), NewInfos = [{UUID, Id, Revs} | Infos], - Status = if NewCount < BatchSize -> ok; true -> stop end, + Status = + if + NewCount < BatchSize -> ok; + true -> stop + end, {Status, {NewCount, NewInfos, PSeq}} end, InitAcc = {0, [], StartSeq}, {ok, {_, PurgeInfos, ThroughSeq}} = - couch_db:fold_purge_infos(Db, StartSeq, FoldFun, InitAcc), + couch_db:fold_purge_infos(Db, StartSeq, FoldFun, InitAcc), PurgeSeq = couch_db:get_purge_seq(Db), Remaining = PurgeSeq - ThroughSeq, rexi:reply({ok, {PurgeDocId, PurgeInfos, ThroughSeq, Remaining}}); @@ -222,32 +216,33 @@ load_purge_infos_rpc(DbName, SrcUUID, BatchSize) -> rexi:reply(Else) end. - save_purge_checkpoint_rpc(DbName, PurgeDocId, Body) -> erlang:put(io_priority, {internal_repl, DbName}), case get_or_create_db(DbName, [?ADMIN_CTX]) of {ok, Db} -> Doc = #doc{id = PurgeDocId, body = Body}, - Resp = try couch_db:update_doc(Db, Doc, []) of - Resp0 -> Resp0 - catch T:R -> - {T, R} - end, + Resp = + try couch_db:update_doc(Db, Doc, []) of + Resp0 -> Resp0 + catch + T:R -> + {T, R} + end, rexi:reply(Resp); Error -> rexi:reply(Error) end. - replicate_rpc(DbName, Target) -> - rexi:reply(try - Opts = [{batch_size, ?BATCH_SIZE}, {batch_count, all}], - {ok, mem3_rep:go(DbName, Target, Opts)} - catch - Tag:Error -> - {Tag, Error} - end). - + rexi:reply( + try + Opts = [{batch_size, ?BATCH_SIZE}, {batch_count, all}], + {ok, mem3_rep:go(DbName, Target, Opts)} + catch + Tag:Error -> + {Tag, Error} + end + ). %% @doc Return the sequence where two files with the same UUID diverged. compare_epochs(SourceEpochs, TargetEpochs) -> @@ -256,7 +251,6 @@ compare_epochs(SourceEpochs, TargetEpochs) -> lists:reverse(TargetEpochs) ). - compare_rev_epochs([{Node, Seq} | SourceRest], [{Node, Seq} | TargetRest]) -> % Common history, fast-forward compare_epochs(SourceRest, TargetRest); @@ -270,7 +264,6 @@ compare_rev_epochs([{_, SourceSeq} | _], [{_, TargetSeq} | _]) -> % The source was moved to a new location independently, take the minimum erlang:min(SourceSeq, TargetSeq) - 1. - %% @doc This adds a new update sequence checkpoint to the replication %% history. Checkpoints are keyed by the source node so that we %% aren't mixing history between source shard moves. @@ -298,7 +291,6 @@ add_checkpoint({Props}, {History}) -> NodeRemoved = lists:keydelete(SourceNode, 1, History), {[{SourceNode, NewSourceHistory} | NodeRemoved]}. - filter_history(SourceSeqThresh, TargetSeqThresh, History) -> SourceFilter = fun({Entry}) -> SourceSeq = couch_util:get_value(<<"source_seq">>, Entry), @@ -311,7 +303,6 @@ filter_history(SourceSeqThresh, TargetSeqThresh, History) -> SourceFiltered = lists:filter(SourceFilter, History), lists:filter(TargetFilter, SourceFiltered). - %% @doc This function adjusts our history to maintain a %% history of checkpoints that follow an exponentially %% increasing age from the most recent checkpoint. @@ -335,7 +326,7 @@ filter_history(SourceSeqThresh, TargetSeqThresh, History) -> %% This function expects the provided history to be sorted %% in descending order of source_seq values. rebucket([], _NewSeq, Bucket) -> - {Bucket+1, []}; + {Bucket + 1, []}; rebucket([{Entry} | RestHistory], NewSeq, Bucket) -> CurSeq = couch_util:get_value(<<"source_seq">>, Entry), case find_bucket(NewSeq, CurSeq, Bucket) of @@ -363,7 +354,6 @@ rebucket([{Entry} | RestHistory], NewSeq, Bucket) -> {NextBucket, [{Entry} | NewHistory]} end. - %% @doc Find the bucket id for the given sequence pair. find_bucket(NewSeq, CurSeq, Bucket) -> % The +1 constant in this comparison is a bit subtle. The @@ -371,27 +361,27 @@ find_bucket(NewSeq, CurSeq, Bucket) -> % the history is guaranteed to have a BucketId of 1. This % also relies on never having a duplicated update % sequence so adding 1 here guarantees a difference >= 2. - if (NewSeq - CurSeq + 1) > (2 bsl Bucket) -> - find_bucket(NewSeq, CurSeq, Bucket+1); - true -> - Bucket + if + (NewSeq - CurSeq + 1) > (2 bsl Bucket) -> + find_bucket(NewSeq, CurSeq, Bucket + 1); + true -> + Bucket end. - rexi_call(Node, MFA) -> rexi_call(Node, MFA, ?REXI_CALL_TIMEOUT_MSEC). - rexi_call(Node, MFA, Timeout) -> Mon = rexi_monitor:start([rexi_utils:server_pid(Node)]), Ref = rexi:cast(Node, self(), MFA, [sync]), try - receive {Ref, {ok, Reply}} -> - Reply; - {Ref, Error} -> - erlang:error(Error); - {rexi_DOWN, Mon, _, Reason} -> - erlang:error({rexi_DOWN, {Node, Reason}}) + receive + {Ref, {ok, Reply}} -> + Reply; + {Ref, Error} -> + erlang:error(Error); + {rexi_DOWN, Mon, _, Reason} -> + erlang:error({rexi_DOWN, {Node, Reason}}) after Timeout -> erlang:error(timeout) end @@ -399,15 +389,12 @@ rexi_call(Node, MFA, Timeout) -> rexi_monitor:stop(Mon) end. - get_or_create_db(DbName, Options) -> mem3_util:get_or_create_db_int(DbName, Options). - -ifdef(TEST). -include_lib("eunit/include/eunit.hrl"). - -define(SNODE, <<"src@localhost">>). -define(TNODE, <<"tgt@localhost">>). -define(SNODE_KV, {<<"source_node">>, ?SNODE}). @@ -416,7 +403,6 @@ get_or_create_db(DbName, Options) -> -define(TSEQ, <<"target_seq">>). -define(ENTRY(S, T), {[?SNODE_KV, {?SSEQ, S}, ?TNODE_KV, {?TSEQ, T}]}). - filter_history_data() -> [ ?ENTRY(13, 15), @@ -424,7 +410,6 @@ filter_history_data() -> ?ENTRY(2, 3) ]. - filter_history_remove_none_test() -> ?assertEqual(filter_history(20, 20, filter_history_data()), [ ?ENTRY(13, 15), @@ -432,11 +417,9 @@ filter_history_remove_none_test() -> ?ENTRY(2, 3) ]). - filter_history_remove_all_test() -> ?assertEqual(filter_history(1, 1, filter_history_data()), []). - filter_history_remove_equal_test() -> ?assertEqual(filter_history(10, 10, filter_history_data()), [ ?ENTRY(2, 3) @@ -445,7 +428,6 @@ filter_history_remove_equal_test() -> ?ENTRY(2, 3) ]). - filter_history_remove_for_source_and_target_test() -> ?assertEqual(filter_history(11, 20, filter_history_data()), [ ?ENTRY(10, 9), @@ -456,129 +438,176 @@ filter_history_remove_for_source_and_target_test() -> ?ENTRY(2, 3) ]). - filter_history_remove_for_both_test() -> ?assertEqual(filter_history(11, 11, filter_history_data()), [ ?ENTRY(10, 9), ?ENTRY(2, 3) ]). - filter_history_remove_for_both_again_test() -> ?assertEqual(filter_history(3, 4, filter_history_data()), [ ?ENTRY(2, 3) ]). - add_first_checkpoint_test() -> History = {[]}, - ?assertEqual(add_checkpoint(?ENTRY(2, 3), History), {[ - {?SNODE, [ - ?ENTRY(2, 3) + ?assertEqual( + add_checkpoint(?ENTRY(2, 3), History), + {[ + {?SNODE, [ + ?ENTRY(2, 3) + ]} ]} - ]}). - + ). add_first_checkpoint_to_empty_test() -> History = {[{?SNODE, []}]}, - ?assertEqual(add_checkpoint(?ENTRY(2, 3), History), {[ - {?SNODE, [ - ?ENTRY(2, 3) + ?assertEqual( + add_checkpoint(?ENTRY(2, 3), History), + {[ + {?SNODE, [ + ?ENTRY(2, 3) + ]} ]} - ]}). - + ). add_second_checkpoint_test() -> History = {[{?SNODE, [?ENTRY(2, 3)]}]}, - ?assertEqual(add_checkpoint(?ENTRY(10, 9), History), {[ - {?SNODE, [ - ?ENTRY(10, 9), - ?ENTRY(2, 3) + ?assertEqual( + add_checkpoint(?ENTRY(10, 9), History), + {[ + {?SNODE, [ + ?ENTRY(10, 9), + ?ENTRY(2, 3) + ]} ]} - ]}). - + ). add_third_checkpoint_test() -> - History = {[{?SNODE, [ - ?ENTRY(10, 9), - ?ENTRY(2, 3) - ]}]}, - ?assertEqual(add_checkpoint(?ENTRY(11, 10), History), {[ - {?SNODE, [ - ?ENTRY(11, 10), - ?ENTRY(10, 9), - ?ENTRY(2, 3) + History = + {[ + {?SNODE, [ + ?ENTRY(10, 9), + ?ENTRY(2, 3) + ]} + ]}, + ?assertEqual( + add_checkpoint(?ENTRY(11, 10), History), + {[ + {?SNODE, [ + ?ENTRY(11, 10), + ?ENTRY(10, 9), + ?ENTRY(2, 3) + ]} ]} - ]}). - + ). add_fourth_checkpoint_test() -> - History = {[{?SNODE, [ - ?ENTRY(11, 10), - ?ENTRY(10, 9), - ?ENTRY(2, 3) - ]}]}, - ?assertEqual(add_checkpoint(?ENTRY(12, 13), History), {[ - {?SNODE, [ - ?ENTRY(12, 13), - ?ENTRY(11, 10), - ?ENTRY(10, 9), - ?ENTRY(2, 3) + History = + {[ + {?SNODE, [ + ?ENTRY(11, 10), + ?ENTRY(10, 9), + ?ENTRY(2, 3) + ]} + ]}, + ?assertEqual( + add_checkpoint(?ENTRY(12, 13), History), + {[ + {?SNODE, [ + ?ENTRY(12, 13), + ?ENTRY(11, 10), + ?ENTRY(10, 9), + ?ENTRY(2, 3) + ]} ]} - ]}). - + ). add_checkpoint_with_replacement_test() -> - History = {[{?SNODE, [ - ?ENTRY(12, 13), - ?ENTRY(11, 10), - ?ENTRY(10, 9), - ?ENTRY(2, 3) - ]}]}, + History = + {[ + {?SNODE, [ + ?ENTRY(12, 13), + ?ENTRY(11, 10), + ?ENTRY(10, 9), + ?ENTRY(2, 3) + ]} + ]}, % Picking a source_seq of 16 to force 10, 11, and 12 % into the same bucket to show we drop the 11 entry. - ?assertEqual(add_checkpoint(?ENTRY(16, 16), History), {[ - {?SNODE, [ - ?ENTRY(16, 16), - ?ENTRY(12, 13), - ?ENTRY(10, 9), - ?ENTRY(2, 3) + ?assertEqual( + add_checkpoint(?ENTRY(16, 16), History), + {[ + {?SNODE, [ + ?ENTRY(16, 16), + ?ENTRY(12, 13), + ?ENTRY(10, 9), + ?ENTRY(2, 3) + ]} ]} - ]}). + ). add_checkpoint_drops_redundant_checkpoints_test() -> % I've added comments showing the bucket ID based % on the ?ENTRY passed to add_checkpoint - History = {[{?SNODE, [ - ?ENTRY(15, 15), % Bucket 0 - ?ENTRY(14, 14), % Bucket 1 - ?ENTRY(13, 13), % Bucket 1 - ?ENTRY(12, 12), % Bucket 2 - ?ENTRY(11, 11), % Bucket 2 - ?ENTRY(10, 10), % Bucket 2 - ?ENTRY(9, 9), % Bucket 2 - ?ENTRY(8, 8), % Bucket 3 - ?ENTRY(7, 7), % Bucket 3 - ?ENTRY(6, 6), % Bucket 3 - ?ENTRY(5, 5), % Bucket 3 - ?ENTRY(4, 4), % Bucket 3 - ?ENTRY(3, 3), % Bucket 3 - ?ENTRY(2, 2), % Bucket 3 - ?ENTRY(1, 1) % Bucket 3 - ]}]}, - ?assertEqual(add_checkpoint(?ENTRY(16, 16), History), {[ - {?SNODE, [ - ?ENTRY(16, 16), % Bucket 0 - ?ENTRY(15, 15), % Bucket 0 - ?ENTRY(14, 14), % Bucket 1 - ?ENTRY(13, 13), % Bucket 1 - ?ENTRY(12, 12), % Bucket 2 - ?ENTRY(9, 9), % Bucket 2 - ?ENTRY(8, 8), % Bucket 3 - ?ENTRY(1, 1) % Bucket 3 + History = + {[ + {?SNODE, [ + % Bucket 0 + ?ENTRY(15, 15), + % Bucket 1 + ?ENTRY(14, 14), + % Bucket 1 + ?ENTRY(13, 13), + % Bucket 2 + ?ENTRY(12, 12), + % Bucket 2 + ?ENTRY(11, 11), + % Bucket 2 + ?ENTRY(10, 10), + % Bucket 2 + ?ENTRY(9, 9), + % Bucket 3 + ?ENTRY(8, 8), + % Bucket 3 + ?ENTRY(7, 7), + % Bucket 3 + ?ENTRY(6, 6), + % Bucket 3 + ?ENTRY(5, 5), + % Bucket 3 + ?ENTRY(4, 4), + % Bucket 3 + ?ENTRY(3, 3), + % Bucket 3 + ?ENTRY(2, 2), + % Bucket 3 + ?ENTRY(1, 1) + ]} + ]}, + ?assertEqual( + add_checkpoint(?ENTRY(16, 16), History), + {[ + {?SNODE, [ + % Bucket 0 + ?ENTRY(16, 16), + % Bucket 0 + ?ENTRY(15, 15), + % Bucket 1 + ?ENTRY(14, 14), + % Bucket 1 + ?ENTRY(13, 13), + % Bucket 2 + ?ENTRY(12, 12), + % Bucket 2 + ?ENTRY(9, 9), + % Bucket 3 + ?ENTRY(8, 8), + % Bucket 3 + ?ENTRY(1, 1) + ]} ]} - ]}). - + ). add_checkpoint_show_not_always_a_drop_test() -> % Depending on the edge conditions of buckets we @@ -588,124 +617,181 @@ add_checkpoint_show_not_always_a_drop_test() -> % % I've added comments showing the bucket ID based % on the ?ENTRY passed to add_checkpoint - History = {[{?SNODE, [ - ?ENTRY(16, 16), % Bucket 0 - ?ENTRY(15, 15), % Bucket 1 - ?ENTRY(14, 14), % Bucket 1 - ?ENTRY(13, 13), % Bucket 2 - ?ENTRY(12, 12), % Bucket 2 - ?ENTRY(9, 9), % Bucket 3 - ?ENTRY(8, 8), % Bucket 3 - ?ENTRY(1, 1) % Bucket 4 - ]}]}, - ?assertEqual(add_checkpoint(?ENTRY(17, 17), History), {[ - {?SNODE, [ - ?ENTRY(17, 17), % Bucket 0 - ?ENTRY(16, 16), % Bucket 0 - ?ENTRY(15, 15), % Bucket 1 - ?ENTRY(14, 14), % Bucket 1 - ?ENTRY(13, 13), % Bucket 2 - ?ENTRY(12, 12), % Bucket 2 - ?ENTRY(9, 9), % Bucket 3 - ?ENTRY(8, 8), % Bucket 3 - ?ENTRY(1, 1) % Bucket 4 + History = + {[ + {?SNODE, [ + % Bucket 0 + ?ENTRY(16, 16), + % Bucket 1 + ?ENTRY(15, 15), + % Bucket 1 + ?ENTRY(14, 14), + % Bucket 2 + ?ENTRY(13, 13), + % Bucket 2 + ?ENTRY(12, 12), + % Bucket 3 + ?ENTRY(9, 9), + % Bucket 3 + ?ENTRY(8, 8), + % Bucket 4 + ?ENTRY(1, 1) + ]} + ]}, + ?assertEqual( + add_checkpoint(?ENTRY(17, 17), History), + {[ + {?SNODE, [ + % Bucket 0 + ?ENTRY(17, 17), + % Bucket 0 + ?ENTRY(16, 16), + % Bucket 1 + ?ENTRY(15, 15), + % Bucket 1 + ?ENTRY(14, 14), + % Bucket 2 + ?ENTRY(13, 13), + % Bucket 2 + ?ENTRY(12, 12), + % Bucket 3 + ?ENTRY(9, 9), + % Bucket 3 + ?ENTRY(8, 8), + % Bucket 4 + ?ENTRY(1, 1) + ]} ]} - ]}). - + ). add_checkpoint_big_jump_show_lots_drop_test() -> % I've added comments showing the bucket ID based % on the ?ENTRY passed to add_checkpoint - History = {[{?SNODE, [ - ?ENTRY(16, 16), % Bucket 4 - ?ENTRY(15, 15), % Bucket 4 - ?ENTRY(14, 14), % Bucket 4 - ?ENTRY(13, 13), % Bucket 4 - ?ENTRY(12, 12), % Bucket 4 - ?ENTRY(9, 9), % Bucket 4 - ?ENTRY(8, 8), % Bucket 4 - ?ENTRY(1, 1) % Bucket 4 - ]}]}, - ?assertEqual(add_checkpoint(?ENTRY(32, 32), History), {[ - {?SNODE, [ - ?ENTRY(32, 32), % Bucket 0 - ?ENTRY(16, 16), % Bucket 4 - ?ENTRY(1, 1) % Bucket 4 + History = + {[ + {?SNODE, [ + % Bucket 4 + ?ENTRY(16, 16), + % Bucket 4 + ?ENTRY(15, 15), + % Bucket 4 + ?ENTRY(14, 14), + % Bucket 4 + ?ENTRY(13, 13), + % Bucket 4 + ?ENTRY(12, 12), + % Bucket 4 + ?ENTRY(9, 9), + % Bucket 4 + ?ENTRY(8, 8), + % Bucket 4 + ?ENTRY(1, 1) + ]} + ]}, + ?assertEqual( + add_checkpoint(?ENTRY(32, 32), History), + {[ + {?SNODE, [ + % Bucket 0 + ?ENTRY(32, 32), + % Bucket 4 + ?ENTRY(16, 16), + % Bucket 4 + ?ENTRY(1, 1) + ]} ]} - ]}). - + ). add_checkpoint_show_filter_history_test() -> - History = {[{?SNODE, [ - ?ENTRY(16, 16), - ?ENTRY(15, 15), - ?ENTRY(14, 14), - ?ENTRY(13, 13), - ?ENTRY(12, 12), - ?ENTRY(9, 9), - ?ENTRY(8, 8), - ?ENTRY(1, 1) - ]}]}, + History = + {[ + {?SNODE, [ + ?ENTRY(16, 16), + ?ENTRY(15, 15), + ?ENTRY(14, 14), + ?ENTRY(13, 13), + ?ENTRY(12, 12), + ?ENTRY(9, 9), + ?ENTRY(8, 8), + ?ENTRY(1, 1) + ]} + ]}, % Drop for both - ?assertEqual(add_checkpoint(?ENTRY(10, 10), History), {[ - {?SNODE, [ - ?ENTRY(10, 10), - ?ENTRY(9, 9), - ?ENTRY(8, 8), - ?ENTRY(1, 1) + ?assertEqual( + add_checkpoint(?ENTRY(10, 10), History), + {[ + {?SNODE, [ + ?ENTRY(10, 10), + ?ENTRY(9, 9), + ?ENTRY(8, 8), + ?ENTRY(1, 1) + ]} ]} - ]}), + ), % Drop four source - ?assertEqual(add_checkpoint(?ENTRY(10, 200), History), {[ - {?SNODE, [ - ?ENTRY(10, 200), - ?ENTRY(9, 9), - ?ENTRY(8, 8), - ?ENTRY(1, 1) + ?assertEqual( + add_checkpoint(?ENTRY(10, 200), History), + {[ + {?SNODE, [ + ?ENTRY(10, 200), + ?ENTRY(9, 9), + ?ENTRY(8, 8), + ?ENTRY(1, 1) + ]} ]} - ]}), + ), % Drop for target. Obviously a source_seq of 200 % will end up droping the 8 entry. - ?assertEqual(add_checkpoint(?ENTRY(200, 10), History), {[ - {?SNODE, [ - ?ENTRY(200, 10), - ?ENTRY(9, 9), - ?ENTRY(1, 1) + ?assertEqual( + add_checkpoint(?ENTRY(200, 10), History), + {[ + {?SNODE, [ + ?ENTRY(200, 10), + ?ENTRY(9, 9), + ?ENTRY(1, 1) + ]} ]} - ]}). - + ). add_checkpoint_from_other_node_test() -> - History = {[{<<"not_the_source">>, [ - ?ENTRY(12, 13), - ?ENTRY(11, 10), - ?ENTRY(10, 9), - ?ENTRY(2, 3) - ]}]}, - % No filtering - ?assertEqual(add_checkpoint(?ENTRY(1, 1), History), {[ - {?SNODE, [ - ?ENTRY(1, 1) + History = + {[ + {<<"not_the_source">>, [ + ?ENTRY(12, 13), + ?ENTRY(11, 10), + ?ENTRY(10, 9), + ?ENTRY(2, 3) + ]} ]}, - {<<"not_the_source">>, [ - ?ENTRY(12, 13), - ?ENTRY(11, 10), - ?ENTRY(10, 9), - ?ENTRY(2, 3) + % No filtering + ?assertEqual( + add_checkpoint(?ENTRY(1, 1), History), + {[ + {?SNODE, [ + ?ENTRY(1, 1) + ]}, + {<<"not_the_source">>, [ + ?ENTRY(12, 13), + ?ENTRY(11, 10), + ?ENTRY(10, 9), + ?ENTRY(2, 3) + ]} ]} - ]}), + ), % No dropping - ?assertEqual(add_checkpoint(?ENTRY(200, 200), History), {[ - {?SNODE, [ - ?ENTRY(200, 200) - ]}, - {<<"not_the_source">>, [ - ?ENTRY(12, 13), - ?ENTRY(11, 10), - ?ENTRY(10, 9), - ?ENTRY(2, 3) + ?assertEqual( + add_checkpoint(?ENTRY(200, 200), History), + {[ + {?SNODE, [ + ?ENTRY(200, 200) + ]}, + {<<"not_the_source">>, [ + ?ENTRY(12, 13), + ?ENTRY(11, 10), + ?ENTRY(10, 9), + ?ENTRY(2, 3) + ]} ]} - ]}). - + ). -endif. |