diff options
Diffstat (limited to 'src/mem3/src/mem3_rpc.erl')
-rw-r--r-- | src/mem3/src/mem3_rpc.erl | 797 |
1 files changed, 0 insertions, 797 deletions
diff --git a/src/mem3/src/mem3_rpc.erl b/src/mem3/src/mem3_rpc.erl deleted file mode 100644 index 468bdee21..000000000 --- a/src/mem3/src/mem3_rpc.erl +++ /dev/null @@ -1,797 +0,0 @@ -% Copyright 2013 Cloudant -% -% Licensed under the Apache License, Version 2.0 (the "License"); you may not -% use this file except in compliance with the License. You may obtain a copy of -% the License at -% -% http://www.apache.org/licenses/LICENSE-2.0 -% -% Unless required by applicable law or agreed to in writing, software -% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -% License for the specific language governing permissions and limitations under -% the License. - --module(mem3_rpc). - --export([ - find_common_seq/4, - get_missing_revs/4, - update_docs/4, - pull_replication/1, - load_checkpoint/4, - load_checkpoint/5, - save_checkpoint/6, - - load_purge_infos/4, - save_purge_checkpoint/4, - purge_docs/4, - - replicate/4 -]). - -% Private RPC callbacks --export([ - find_common_seq_rpc/3, - load_checkpoint_rpc/3, - pull_replication_rpc/1, - load_checkpoint_rpc/4, - save_checkpoint_rpc/5, - - load_purge_infos_rpc/3, - save_purge_checkpoint_rpc/3, - - replicate_rpc/2 -]). - --include("mem3.hrl"). --include_lib("couch/include/couch_db.hrl"). - --define(BATCH_SIZE, 1000). --define(REXI_CALL_TIMEOUT_MSEC, 600000). - -% "Pull" is a bit of a misnomer here, as what we're actually doing is -% issuing an RPC request and telling the remote node to push updates to -% us. This lets us reuse all of the battle-tested machinery of mem3_rpc. -pull_replication(Seed) -> - rexi_call(Seed, {mem3_rpc, pull_replication_rpc, [node()]}). - -get_missing_revs(Node, DbName, IdsRevs, Options) -> - rexi_call(Node, {fabric_rpc, get_missing_revs, [DbName, IdsRevs, Options]}). - -update_docs(Node, DbName, Docs, Options) -> - rexi_call(Node, {fabric_rpc, update_docs, [DbName, Docs, Options]}). - -load_checkpoint(Node, DbName, SourceNode, SourceUUID, <<>>) -> - % Upgrade clause for a mixed cluster for old nodes that don't have - % load_checkpoint_rpc/4 yet. FilterHash is currently not - % used and so defaults to <<>> everywhere - load_checkpoint(Node, DbName, SourceNode, SourceUUID); -load_checkpoint(Node, DbName, SourceNode, SourceUUID, FilterHash) -> - Args = [DbName, SourceNode, SourceUUID, FilterHash], - rexi_call(Node, {mem3_rpc, load_checkpoint_rpc, Args}). - -load_checkpoint(Node, DbName, SourceNode, SourceUUID) -> - Args = [DbName, SourceNode, SourceUUID], - rexi_call(Node, {mem3_rpc, load_checkpoint_rpc, Args}). - -save_checkpoint(Node, DbName, DocId, Seq, Entry, History) -> - Args = [DbName, DocId, Seq, Entry, History], - rexi_call(Node, {mem3_rpc, save_checkpoint_rpc, Args}). - -find_common_seq(Node, DbName, SourceUUID, SourceEpochs) -> - Args = [DbName, SourceUUID, SourceEpochs], - rexi_call(Node, {mem3_rpc, find_common_seq_rpc, Args}). - -load_purge_infos(Node, DbName, SourceUUID, Count) -> - Args = [DbName, SourceUUID, Count], - rexi_call(Node, {mem3_rpc, load_purge_infos_rpc, Args}). - -save_purge_checkpoint(Node, DbName, PurgeDocId, Body) -> - Args = [DbName, PurgeDocId, Body], - rexi_call(Node, {mem3_rpc, save_purge_checkpoint_rpc, Args}). - -purge_docs(Node, DbName, PurgeInfos, Options) -> - rexi_call(Node, {fabric_rpc, purge_docs, [DbName, PurgeInfos, Options]}). - -replicate(Source, Target, DbName, Timeout) when - is_atom(Source), is_atom(Target), is_binary(DbName) --> - Args = [DbName, Target], - rexi_call(Source, {mem3_rpc, replicate_rpc, Args}, Timeout). - -load_checkpoint_rpc(DbName, SourceNode, SourceUUID) -> - load_checkpoint_rpc(DbName, SourceNode, SourceUUID, <<>>). - -load_checkpoint_rpc(DbName, SourceNode, SourceUUID, FilterHash) -> - erlang:put(io_priority, {internal_repl, DbName}), - case get_or_create_db(DbName, [?ADMIN_CTX]) of - {ok, Db} -> - TargetUUID = couch_db:get_uuid(Db), - NewId = mem3_rep:make_local_id(SourceUUID, TargetUUID, FilterHash), - case couch_db:open_doc(Db, NewId, []) of - {ok, Doc} -> - rexi:reply({ok, {NewId, Doc}}); - {not_found, _} -> - OldId = mem3_rep:make_local_id(SourceNode, node()), - case couch_db:open_doc(Db, OldId, []) of - {ok, Doc} -> - rexi:reply({ok, {NewId, Doc}}); - {not_found, _} -> - rexi:reply({ok, {NewId, #doc{id = NewId}}}) - end - end; - Error -> - rexi:reply(Error) - end. - -save_checkpoint_rpc(DbName, Id, SourceSeq, NewEntry0, History0) -> - erlang:put(io_priority, {internal_repl, DbName}), - case get_or_create_db(DbName, [?ADMIN_CTX]) of - {ok, Db} -> - NewEntry = { - [ - {<<"target_node">>, atom_to_binary(node(), utf8)}, - {<<"target_uuid">>, couch_db:get_uuid(Db)}, - {<<"target_seq">>, couch_db:get_update_seq(Db)} - ] ++ NewEntry0 - }, - Body = - {[ - {<<"seq">>, SourceSeq}, - {<<"target_uuid">>, couch_db:get_uuid(Db)}, - {<<"history">>, add_checkpoint(NewEntry, History0)} - ]}, - Doc = #doc{id = Id, body = Body}, - rexi:reply( - try couch_db:update_doc(Db, Doc, []) of - {ok, _} -> - {ok, Body}; - Else -> - {error, Else} - catch - Exception -> - Exception; - error:Reason -> - {error, Reason} - end - ); - Error -> - rexi:reply(Error) - end. - -find_common_seq_rpc(DbName, SourceUUID, SourceEpochs) -> - erlang:put(io_priority, {internal_repl, DbName}), - case get_or_create_db(DbName, [?ADMIN_CTX]) of - {ok, Db} -> - case couch_db:get_uuid(Db) of - SourceUUID -> - TargetEpochs = couch_db:get_epochs(Db), - Seq = compare_epochs(SourceEpochs, TargetEpochs), - rexi:reply({ok, Seq}); - _Else -> - rexi:reply({ok, 0}) - end; - Error -> - rexi:reply(Error) - end. - -pull_replication_rpc(Target) -> - Dbs = mem3_sync:local_dbs(), - Opts = [{batch_size, 1000}, {batch_count, 50}], - Repl = fun(Db) -> {Db, mem3_rep:go(Db, Target, Opts)} end, - rexi:reply({ok, lists:map(Repl, Dbs)}). - -load_purge_infos_rpc(DbName, SrcUUID, BatchSize) -> - erlang:put(io_priority, {internal_repl, DbName}), - case get_or_create_db(DbName, [?ADMIN_CTX]) of - {ok, Db} -> - TgtUUID = couch_db:get_uuid(Db), - PurgeDocId = mem3_rep:make_purge_id(SrcUUID, TgtUUID), - StartSeq = - case couch_db:open_doc(Db, PurgeDocId, []) of - {ok, #doc{body = {Props}}} -> - couch_util:get_value(<<"purge_seq">>, Props); - {not_found, _} -> - Oldest = couch_db:get_oldest_purge_seq(Db), - erlang:max(0, Oldest - 1) - end, - FoldFun = fun({PSeq, UUID, Id, Revs}, {Count, Infos, _}) -> - NewCount = Count + length(Revs), - NewInfos = [{UUID, Id, Revs} | Infos], - Status = - if - NewCount < BatchSize -> ok; - true -> stop - end, - {Status, {NewCount, NewInfos, PSeq}} - end, - InitAcc = {0, [], StartSeq}, - {ok, {_, PurgeInfos, ThroughSeq}} = - couch_db:fold_purge_infos(Db, StartSeq, FoldFun, InitAcc), - PurgeSeq = couch_db:get_purge_seq(Db), - Remaining = PurgeSeq - ThroughSeq, - rexi:reply({ok, {PurgeDocId, PurgeInfos, ThroughSeq, Remaining}}); - Else -> - rexi:reply(Else) - end. - -save_purge_checkpoint_rpc(DbName, PurgeDocId, Body) -> - erlang:put(io_priority, {internal_repl, DbName}), - case get_or_create_db(DbName, [?ADMIN_CTX]) of - {ok, Db} -> - Doc = #doc{id = PurgeDocId, body = Body}, - Resp = - try couch_db:update_doc(Db, Doc, []) of - Resp0 -> Resp0 - catch - T:R -> - {T, R} - end, - rexi:reply(Resp); - Error -> - rexi:reply(Error) - end. - -replicate_rpc(DbName, Target) -> - rexi:reply( - try - Opts = [{batch_size, ?BATCH_SIZE}, {batch_count, all}], - {ok, mem3_rep:go(DbName, Target, Opts)} - catch - Tag:Error -> - {Tag, Error} - end - ). - -%% @doc Return the sequence where two files with the same UUID diverged. -compare_epochs(SourceEpochs, TargetEpochs) -> - compare_rev_epochs( - lists:reverse(SourceEpochs), - lists:reverse(TargetEpochs) - ). - -compare_rev_epochs([{Node, Seq} | SourceRest], [{Node, Seq} | TargetRest]) -> - % Common history, fast-forward - compare_epochs(SourceRest, TargetRest); -compare_rev_epochs([], [{_, TargetSeq} | _]) -> - % Source has not moved, start from seq just before the target took over - TargetSeq - 1; -compare_rev_epochs([{_, SourceSeq} | _], []) -> - % Target has not moved, start from seq where source diverged - SourceSeq; -compare_rev_epochs([{_, SourceSeq} | _], [{_, TargetSeq} | _]) -> - % The source was moved to a new location independently, take the minimum - erlang:min(SourceSeq, TargetSeq) - 1. - -%% @doc This adds a new update sequence checkpoint to the replication -%% history. Checkpoints are keyed by the source node so that we -%% aren't mixing history between source shard moves. -add_checkpoint({Props}, {History}) -> - % Extract the source and target seqs for reference - SourceSeq = couch_util:get_value(<<"source_seq">>, Props), - TargetSeq = couch_util:get_value(<<"target_seq">>, Props), - - % Get the history relevant to the source node. - SourceNode = couch_util:get_value(<<"source_node">>, Props), - SourceHistory = couch_util:get_value(SourceNode, History, []), - - % If either the source or target shard has been truncated - % we need to filter out any history that was stored for - % any larger update seq than we're currently recording. - FilteredHistory = filter_history(SourceSeq, TargetSeq, SourceHistory), - - % Re-bucket our history based on the most recent source - % sequence. This is where we drop old checkpoints to - % maintain the exponential distribution. - {_, RebucketedHistory} = rebucket(FilteredHistory, SourceSeq, 0), - NewSourceHistory = [{Props} | RebucketedHistory], - - % Finally update the source node history and we're done. - NodeRemoved = lists:keydelete(SourceNode, 1, History), - {[{SourceNode, NewSourceHistory} | NodeRemoved]}. - -filter_history(SourceSeqThresh, TargetSeqThresh, History) -> - SourceFilter = fun({Entry}) -> - SourceSeq = couch_util:get_value(<<"source_seq">>, Entry), - SourceSeq < SourceSeqThresh - end, - TargetFilter = fun({Entry}) -> - TargetSeq = couch_util:get_value(<<"target_seq">>, Entry), - TargetSeq < TargetSeqThresh - end, - SourceFiltered = lists:filter(SourceFilter, History), - lists:filter(TargetFilter, SourceFiltered). - -%% @doc This function adjusts our history to maintain a -%% history of checkpoints that follow an exponentially -%% increasing age from the most recent checkpoint. -%% -%% The terms newest and oldest used in these comments -%% refers to the (NewSeq - CurSeq) difference where smaller -%% values are considered newer. -%% -%% It works by assigning each entry to a bucket and keeping -%% the newest and oldest entry in each bucket. Keeping -%% both the newest and oldest means that we won't end up -%% with empty buckets as checkpoints are promoted to new -%% buckets. -%% -%% The return value of this function is a two-tuple of the -%% form `{BucketId, History}` where BucketId is the id of -%% the bucket for the first entry in History. This is used -%% when recursing to detect the oldest value in a given -%% bucket. -%% -%% This function expects the provided history to be sorted -%% in descending order of source_seq values. -rebucket([], _NewSeq, Bucket) -> - {Bucket + 1, []}; -rebucket([{Entry} | RestHistory], NewSeq, Bucket) -> - CurSeq = couch_util:get_value(<<"source_seq">>, Entry), - case find_bucket(NewSeq, CurSeq, Bucket) of - Bucket -> - % This entry is in an existing bucket which means - % we will only keep it if its the oldest value - % in the bucket. To detect this we rebucket the - % rest of the list and only include Entry if the - % rest of the list is in a bigger bucket. - case rebucket(RestHistory, NewSeq, Bucket) of - {Bucket, NewHistory} -> - % There's another entry in this bucket so we drop the - % current entry. - {Bucket, NewHistory}; - {NextBucket, NewHistory} when NextBucket > Bucket -> - % The rest of the history was rebucketed into a larger - % bucket so this is the oldest entry in the current - % bucket. - {Bucket, [{Entry} | NewHistory]} - end; - NextBucket when NextBucket > Bucket -> - % This entry is the newest in NextBucket so we add it - % to our history and continue rebucketing. - {_, NewHistory} = rebucket(RestHistory, NewSeq, NextBucket), - {NextBucket, [{Entry} | NewHistory]} - end. - -%% @doc Find the bucket id for the given sequence pair. -find_bucket(NewSeq, CurSeq, Bucket) -> - % The +1 constant in this comparison is a bit subtle. The - % reason for it is to make sure that the first entry in - % the history is guaranteed to have a BucketId of 1. This - % also relies on never having a duplicated update - % sequence so adding 1 here guarantees a difference >= 2. - if - (NewSeq - CurSeq + 1) > (2 bsl Bucket) -> - find_bucket(NewSeq, CurSeq, Bucket + 1); - true -> - Bucket - end. - -rexi_call(Node, MFA) -> - rexi_call(Node, MFA, ?REXI_CALL_TIMEOUT_MSEC). - -rexi_call(Node, MFA, Timeout) -> - Mon = rexi_monitor:start([rexi_utils:server_pid(Node)]), - Ref = rexi:cast(Node, self(), MFA, [sync]), - try - receive - {Ref, {ok, Reply}} -> - Reply; - {Ref, Error} -> - erlang:error(Error); - {rexi_DOWN, Mon, _, Reason} -> - erlang:error({rexi_DOWN, {Node, Reason}}) - after Timeout -> - erlang:error(timeout) - end - after - rexi_monitor:stop(Mon) - end. - -get_or_create_db(DbName, Options) -> - mem3_util:get_or_create_db_int(DbName, Options). - --ifdef(TEST). --include_lib("eunit/include/eunit.hrl"). - --define(SNODE, <<"src@localhost">>). --define(TNODE, <<"tgt@localhost">>). --define(SNODE_KV, {<<"source_node">>, ?SNODE}). --define(TNODE_KV, {<<"target_node">>, ?TNODE}). --define(SSEQ, <<"source_seq">>). --define(TSEQ, <<"target_seq">>). --define(ENTRY(S, T), {[?SNODE_KV, {?SSEQ, S}, ?TNODE_KV, {?TSEQ, T}]}). - -filter_history_data() -> - [ - ?ENTRY(13, 15), - ?ENTRY(10, 9), - ?ENTRY(2, 3) - ]. - -filter_history_remove_none_test() -> - ?assertEqual(filter_history(20, 20, filter_history_data()), [ - ?ENTRY(13, 15), - ?ENTRY(10, 9), - ?ENTRY(2, 3) - ]). - -filter_history_remove_all_test() -> - ?assertEqual(filter_history(1, 1, filter_history_data()), []). - -filter_history_remove_equal_test() -> - ?assertEqual(filter_history(10, 10, filter_history_data()), [ - ?ENTRY(2, 3) - ]), - ?assertEqual(filter_history(11, 9, filter_history_data()), [ - ?ENTRY(2, 3) - ]). - -filter_history_remove_for_source_and_target_test() -> - ?assertEqual(filter_history(11, 20, filter_history_data()), [ - ?ENTRY(10, 9), - ?ENTRY(2, 3) - ]), - ?assertEqual(filter_history(14, 14, filter_history_data()), [ - ?ENTRY(10, 9), - ?ENTRY(2, 3) - ]). - -filter_history_remove_for_both_test() -> - ?assertEqual(filter_history(11, 11, filter_history_data()), [ - ?ENTRY(10, 9), - ?ENTRY(2, 3) - ]). - -filter_history_remove_for_both_again_test() -> - ?assertEqual(filter_history(3, 4, filter_history_data()), [ - ?ENTRY(2, 3) - ]). - -add_first_checkpoint_test() -> - History = {[]}, - ?assertEqual( - add_checkpoint(?ENTRY(2, 3), History), - {[ - {?SNODE, [ - ?ENTRY(2, 3) - ]} - ]} - ). - -add_first_checkpoint_to_empty_test() -> - History = {[{?SNODE, []}]}, - ?assertEqual( - add_checkpoint(?ENTRY(2, 3), History), - {[ - {?SNODE, [ - ?ENTRY(2, 3) - ]} - ]} - ). - -add_second_checkpoint_test() -> - History = {[{?SNODE, [?ENTRY(2, 3)]}]}, - ?assertEqual( - add_checkpoint(?ENTRY(10, 9), History), - {[ - {?SNODE, [ - ?ENTRY(10, 9), - ?ENTRY(2, 3) - ]} - ]} - ). - -add_third_checkpoint_test() -> - History = - {[ - {?SNODE, [ - ?ENTRY(10, 9), - ?ENTRY(2, 3) - ]} - ]}, - ?assertEqual( - add_checkpoint(?ENTRY(11, 10), History), - {[ - {?SNODE, [ - ?ENTRY(11, 10), - ?ENTRY(10, 9), - ?ENTRY(2, 3) - ]} - ]} - ). - -add_fourth_checkpoint_test() -> - History = - {[ - {?SNODE, [ - ?ENTRY(11, 10), - ?ENTRY(10, 9), - ?ENTRY(2, 3) - ]} - ]}, - ?assertEqual( - add_checkpoint(?ENTRY(12, 13), History), - {[ - {?SNODE, [ - ?ENTRY(12, 13), - ?ENTRY(11, 10), - ?ENTRY(10, 9), - ?ENTRY(2, 3) - ]} - ]} - ). - -add_checkpoint_with_replacement_test() -> - History = - {[ - {?SNODE, [ - ?ENTRY(12, 13), - ?ENTRY(11, 10), - ?ENTRY(10, 9), - ?ENTRY(2, 3) - ]} - ]}, - % Picking a source_seq of 16 to force 10, 11, and 12 - % into the same bucket to show we drop the 11 entry. - ?assertEqual( - add_checkpoint(?ENTRY(16, 16), History), - {[ - {?SNODE, [ - ?ENTRY(16, 16), - ?ENTRY(12, 13), - ?ENTRY(10, 9), - ?ENTRY(2, 3) - ]} - ]} - ). - -add_checkpoint_drops_redundant_checkpoints_test() -> - % I've added comments showing the bucket ID based - % on the ?ENTRY passed to add_checkpoint - History = - {[ - {?SNODE, [ - % Bucket 0 - ?ENTRY(15, 15), - % Bucket 1 - ?ENTRY(14, 14), - % Bucket 1 - ?ENTRY(13, 13), - % Bucket 2 - ?ENTRY(12, 12), - % Bucket 2 - ?ENTRY(11, 11), - % Bucket 2 - ?ENTRY(10, 10), - % Bucket 2 - ?ENTRY(9, 9), - % Bucket 3 - ?ENTRY(8, 8), - % Bucket 3 - ?ENTRY(7, 7), - % Bucket 3 - ?ENTRY(6, 6), - % Bucket 3 - ?ENTRY(5, 5), - % Bucket 3 - ?ENTRY(4, 4), - % Bucket 3 - ?ENTRY(3, 3), - % Bucket 3 - ?ENTRY(2, 2), - % Bucket 3 - ?ENTRY(1, 1) - ]} - ]}, - ?assertEqual( - add_checkpoint(?ENTRY(16, 16), History), - {[ - {?SNODE, [ - % Bucket 0 - ?ENTRY(16, 16), - % Bucket 0 - ?ENTRY(15, 15), - % Bucket 1 - ?ENTRY(14, 14), - % Bucket 1 - ?ENTRY(13, 13), - % Bucket 2 - ?ENTRY(12, 12), - % Bucket 2 - ?ENTRY(9, 9), - % Bucket 3 - ?ENTRY(8, 8), - % Bucket 3 - ?ENTRY(1, 1) - ]} - ]} - ). - -add_checkpoint_show_not_always_a_drop_test() -> - % Depending on the edge conditions of buckets we - % may not always drop values when adding new - % checkpoints. In this case 12 stays because there's - % no longer a value for 10 or 11. - % - % I've added comments showing the bucket ID based - % on the ?ENTRY passed to add_checkpoint - History = - {[ - {?SNODE, [ - % Bucket 0 - ?ENTRY(16, 16), - % Bucket 1 - ?ENTRY(15, 15), - % Bucket 1 - ?ENTRY(14, 14), - % Bucket 2 - ?ENTRY(13, 13), - % Bucket 2 - ?ENTRY(12, 12), - % Bucket 3 - ?ENTRY(9, 9), - % Bucket 3 - ?ENTRY(8, 8), - % Bucket 4 - ?ENTRY(1, 1) - ]} - ]}, - ?assertEqual( - add_checkpoint(?ENTRY(17, 17), History), - {[ - {?SNODE, [ - % Bucket 0 - ?ENTRY(17, 17), - % Bucket 0 - ?ENTRY(16, 16), - % Bucket 1 - ?ENTRY(15, 15), - % Bucket 1 - ?ENTRY(14, 14), - % Bucket 2 - ?ENTRY(13, 13), - % Bucket 2 - ?ENTRY(12, 12), - % Bucket 3 - ?ENTRY(9, 9), - % Bucket 3 - ?ENTRY(8, 8), - % Bucket 4 - ?ENTRY(1, 1) - ]} - ]} - ). - -add_checkpoint_big_jump_show_lots_drop_test() -> - % I've added comments showing the bucket ID based - % on the ?ENTRY passed to add_checkpoint - History = - {[ - {?SNODE, [ - % Bucket 4 - ?ENTRY(16, 16), - % Bucket 4 - ?ENTRY(15, 15), - % Bucket 4 - ?ENTRY(14, 14), - % Bucket 4 - ?ENTRY(13, 13), - % Bucket 4 - ?ENTRY(12, 12), - % Bucket 4 - ?ENTRY(9, 9), - % Bucket 4 - ?ENTRY(8, 8), - % Bucket 4 - ?ENTRY(1, 1) - ]} - ]}, - ?assertEqual( - add_checkpoint(?ENTRY(32, 32), History), - {[ - {?SNODE, [ - % Bucket 0 - ?ENTRY(32, 32), - % Bucket 4 - ?ENTRY(16, 16), - % Bucket 4 - ?ENTRY(1, 1) - ]} - ]} - ). - -add_checkpoint_show_filter_history_test() -> - History = - {[ - {?SNODE, [ - ?ENTRY(16, 16), - ?ENTRY(15, 15), - ?ENTRY(14, 14), - ?ENTRY(13, 13), - ?ENTRY(12, 12), - ?ENTRY(9, 9), - ?ENTRY(8, 8), - ?ENTRY(1, 1) - ]} - ]}, - % Drop for both - ?assertEqual( - add_checkpoint(?ENTRY(10, 10), History), - {[ - {?SNODE, [ - ?ENTRY(10, 10), - ?ENTRY(9, 9), - ?ENTRY(8, 8), - ?ENTRY(1, 1) - ]} - ]} - ), - % Drop four source - ?assertEqual( - add_checkpoint(?ENTRY(10, 200), History), - {[ - {?SNODE, [ - ?ENTRY(10, 200), - ?ENTRY(9, 9), - ?ENTRY(8, 8), - ?ENTRY(1, 1) - ]} - ]} - ), - % Drop for target. Obviously a source_seq of 200 - % will end up droping the 8 entry. - ?assertEqual( - add_checkpoint(?ENTRY(200, 10), History), - {[ - {?SNODE, [ - ?ENTRY(200, 10), - ?ENTRY(9, 9), - ?ENTRY(1, 1) - ]} - ]} - ). - -add_checkpoint_from_other_node_test() -> - History = - {[ - {<<"not_the_source">>, [ - ?ENTRY(12, 13), - ?ENTRY(11, 10), - ?ENTRY(10, 9), - ?ENTRY(2, 3) - ]} - ]}, - % No filtering - ?assertEqual( - add_checkpoint(?ENTRY(1, 1), History), - {[ - {?SNODE, [ - ?ENTRY(1, 1) - ]}, - {<<"not_the_source">>, [ - ?ENTRY(12, 13), - ?ENTRY(11, 10), - ?ENTRY(10, 9), - ?ENTRY(2, 3) - ]} - ]} - ), - % No dropping - ?assertEqual( - add_checkpoint(?ENTRY(200, 200), History), - {[ - {?SNODE, [ - ?ENTRY(200, 200) - ]}, - {<<"not_the_source">>, [ - ?ENTRY(12, 13), - ?ENTRY(11, 10), - ?ENTRY(10, 9), - ?ENTRY(2, 3) - ]} - ]} - ). - --endif. |