Diffstat (limited to 'src/mem3/src/mem3_rpc.erl')
-rw-r--r-- src/mem3/src/mem3_rpc.erl | 797
1 file changed, 0 insertions(+), 797 deletions(-)
diff --git a/src/mem3/src/mem3_rpc.erl b/src/mem3/src/mem3_rpc.erl
deleted file mode 100644
index 468bdee21..000000000
--- a/src/mem3/src/mem3_rpc.erl
+++ /dev/null
@@ -1,797 +0,0 @@
-% Copyright 2013 Cloudant
-%
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-% http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--module(mem3_rpc).
-
--export([
- find_common_seq/4,
- get_missing_revs/4,
- update_docs/4,
- pull_replication/1,
- load_checkpoint/4,
- load_checkpoint/5,
- save_checkpoint/6,
-
- load_purge_infos/4,
- save_purge_checkpoint/4,
- purge_docs/4,
-
- replicate/4
-]).
-
-% Private RPC callbacks
--export([
- find_common_seq_rpc/3,
- load_checkpoint_rpc/3,
- load_checkpoint_rpc/4,
- pull_replication_rpc/1,
- save_checkpoint_rpc/5,
-
- load_purge_infos_rpc/3,
- save_purge_checkpoint_rpc/3,
-
- replicate_rpc/2
-]).
-
--include("mem3.hrl").
--include_lib("couch/include/couch_db.hrl").
-
--define(BATCH_SIZE, 1000).
--define(REXI_CALL_TIMEOUT_MSEC, 600000).
-
-% "Pull" is a bit of a misnomer here, as what we're actually doing is
-% issuing an RPC request and telling the remote node to push updates to
-% us. This lets us reuse all of the battle-tested machinery of mem3_rpc.
-pull_replication(Seed) ->
- rexi_call(Seed, {mem3_rpc, pull_replication_rpc, [node()]}).
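-
-% Illustrative usage sketch (not part of the original module): assuming
-% 'node2@127.0.0.1' is a reachable cluster node, a caller could trigger
-% a pull with
-%
-%     Results = mem3_rpc:pull_replication('node2@127.0.0.1'),
-%
-% where Results pairs each of the seed node's local shards with the
-% result of mem3_rep:go/3 run against this node as the target.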
-
-get_missing_revs(Node, DbName, IdsRevs, Options) ->
- rexi_call(Node, {fabric_rpc, get_missing_revs, [DbName, IdsRevs, Options]}).
-
-update_docs(Node, DbName, Docs, Options) ->
- rexi_call(Node, {fabric_rpc, update_docs, [DbName, Docs, Options]}).
-
-load_checkpoint(Node, DbName, SourceNode, SourceUUID, <<>>) ->
- % Upgrade clause for mixed clusters with old nodes that don't have
- % load_checkpoint_rpc/4 yet. FilterHash is currently unused and so
- % defaults to <<>> everywhere.
- load_checkpoint(Node, DbName, SourceNode, SourceUUID);
-load_checkpoint(Node, DbName, SourceNode, SourceUUID, FilterHash) ->
- Args = [DbName, SourceNode, SourceUUID, FilterHash],
- rexi_call(Node, {mem3_rpc, load_checkpoint_rpc, Args}).
-
-load_checkpoint(Node, DbName, SourceNode, SourceUUID) ->
- Args = [DbName, SourceNode, SourceUUID],
- rexi_call(Node, {mem3_rpc, load_checkpoint_rpc, Args}).
-
-save_checkpoint(Node, DbName, DocId, Seq, Entry, History) ->
- Args = [DbName, DocId, Seq, Entry, History],
- rexi_call(Node, {mem3_rpc, save_checkpoint_rpc, Args}).
-
-find_common_seq(Node, DbName, SourceUUID, SourceEpochs) ->
- Args = [DbName, SourceUUID, SourceEpochs],
- rexi_call(Node, {mem3_rpc, find_common_seq_rpc, Args}).
-
-load_purge_infos(Node, DbName, SourceUUID, Count) ->
- Args = [DbName, SourceUUID, Count],
- rexi_call(Node, {mem3_rpc, load_purge_infos_rpc, Args}).
-
-save_purge_checkpoint(Node, DbName, PurgeDocId, Body) ->
- Args = [DbName, PurgeDocId, Body],
- rexi_call(Node, {mem3_rpc, save_purge_checkpoint_rpc, Args}).
-
-purge_docs(Node, DbName, PurgeInfos, Options) ->
- rexi_call(Node, {fabric_rpc, purge_docs, [DbName, PurgeInfos, Options]}).
-
-replicate(Source, Target, DbName, Timeout) when
- is_atom(Source), is_atom(Target), is_binary(DbName)
-->
- Args = [DbName, Target],
- rexi_call(Source, {mem3_rpc, replicate_rpc, Args}, Timeout).
-
-load_checkpoint_rpc(DbName, SourceNode, SourceUUID) ->
- load_checkpoint_rpc(DbName, SourceNode, SourceUUID, <<>>).
-
-load_checkpoint_rpc(DbName, SourceNode, SourceUUID, FilterHash) ->
- erlang:put(io_priority, {internal_repl, DbName}),
- case get_or_create_db(DbName, [?ADMIN_CTX]) of
- {ok, Db} ->
- TargetUUID = couch_db:get_uuid(Db),
- NewId = mem3_rep:make_local_id(SourceUUID, TargetUUID, FilterHash),
- case couch_db:open_doc(Db, NewId, []) of
- {ok, Doc} ->
- rexi:reply({ok, {NewId, Doc}});
- {not_found, _} ->
- OldId = mem3_rep:make_local_id(SourceNode, node()),
- case couch_db:open_doc(Db, OldId, []) of
- {ok, Doc} ->
- rexi:reply({ok, {NewId, Doc}});
- {not_found, _} ->
- rexi:reply({ok, {NewId, #doc{id = NewId}}})
- end
- end;
- Error ->
- rexi:reply(Error)
- end.
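-
-% Note: the lookup above is a three-step fallback: first the UUID-based
-% checkpoint id, then the legacy node-based id for documents written by
-% older releases, and finally a fresh #doc{} so that a replication with
-% no existing checkpoint starts from scratch.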
-
-save_checkpoint_rpc(DbName, Id, SourceSeq, NewEntry0, History0) ->
- erlang:put(io_priority, {internal_repl, DbName}),
- case get_or_create_db(DbName, [?ADMIN_CTX]) of
- {ok, Db} ->
- NewEntry = {
- [
- {<<"target_node">>, atom_to_binary(node(), utf8)},
- {<<"target_uuid">>, couch_db:get_uuid(Db)},
- {<<"target_seq">>, couch_db:get_update_seq(Db)}
- ] ++ NewEntry0
- },
- Body =
- {[
- {<<"seq">>, SourceSeq},
- {<<"target_uuid">>, couch_db:get_uuid(Db)},
- {<<"history">>, add_checkpoint(NewEntry, History0)}
- ]},
- Doc = #doc{id = Id, body = Body},
- rexi:reply(
- try couch_db:update_doc(Db, Doc, []) of
- {ok, _} ->
- {ok, Body};
- Else ->
- {error, Else}
- catch
- Exception ->
- Exception;
- error:Reason ->
- {error, Reason}
- end
- );
- Error ->
- rexi:reply(Error)
- end.
-
-find_common_seq_rpc(DbName, SourceUUID, SourceEpochs) ->
- erlang:put(io_priority, {internal_repl, DbName}),
- case get_or_create_db(DbName, [?ADMIN_CTX]) of
- {ok, Db} ->
- case couch_db:get_uuid(Db) of
- SourceUUID ->
- TargetEpochs = couch_db:get_epochs(Db),
- Seq = compare_epochs(SourceEpochs, TargetEpochs),
- rexi:reply({ok, Seq});
- _Else ->
- rexi:reply({ok, 0})
- end;
- Error ->
- rexi:reply(Error)
- end.
-
-pull_replication_rpc(Target) ->
- Dbs = mem3_sync:local_dbs(),
- Opts = [{batch_size, ?BATCH_SIZE}, {batch_count, 50}],
- Repl = fun(Db) -> {Db, mem3_rep:go(Db, Target, Opts)} end,
- rexi:reply({ok, lists:map(Repl, Dbs)}).
-
-load_purge_infos_rpc(DbName, SrcUUID, BatchSize) ->
- erlang:put(io_priority, {internal_repl, DbName}),
- case get_or_create_db(DbName, [?ADMIN_CTX]) of
- {ok, Db} ->
- TgtUUID = couch_db:get_uuid(Db),
- PurgeDocId = mem3_rep:make_purge_id(SrcUUID, TgtUUID),
- StartSeq =
- case couch_db:open_doc(Db, PurgeDocId, []) of
- {ok, #doc{body = {Props}}} ->
- couch_util:get_value(<<"purge_seq">>, Props);
- {not_found, _} ->
- Oldest = couch_db:get_oldest_purge_seq(Db),
- erlang:max(0, Oldest - 1)
- end,
- FoldFun = fun({PSeq, UUID, Id, Revs}, {Count, Infos, _}) ->
- NewCount = Count + length(Revs),
- NewInfos = [{UUID, Id, Revs} | Infos],
- Status =
- if
- NewCount < BatchSize -> ok;
- true -> stop
- end,
- {Status, {NewCount, NewInfos, PSeq}}
- end,
- InitAcc = {0, [], StartSeq},
- {ok, {_, PurgeInfos, ThroughSeq}} =
- couch_db:fold_purge_infos(Db, StartSeq, FoldFun, InitAcc),
- PurgeSeq = couch_db:get_purge_seq(Db),
- Remaining = PurgeSeq - ThroughSeq,
- rexi:reply({ok, {PurgeDocId, PurgeInfos, ThroughSeq, Remaining}});
- Else ->
- rexi:reply(Else)
- end.
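-
-% Remaining = PurgeSeq - ThroughSeq above reports how many purge
-% sequences are still unread past this batch, so the caller can tell
-% whether to request another batch before saving its purge checkpoint.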
-
-save_purge_checkpoint_rpc(DbName, PurgeDocId, Body) ->
- erlang:put(io_priority, {internal_repl, DbName}),
- case get_or_create_db(DbName, [?ADMIN_CTX]) of
- {ok, Db} ->
- Doc = #doc{id = PurgeDocId, body = Body},
- Resp =
- try couch_db:update_doc(Db, Doc, []) of
- Resp0 -> Resp0
- catch
- T:R ->
- {T, R}
- end,
- rexi:reply(Resp);
- Error ->
- rexi:reply(Error)
- end.
-
-replicate_rpc(DbName, Target) ->
- rexi:reply(
- try
- Opts = [{batch_size, ?BATCH_SIZE}, {batch_count, all}],
- {ok, mem3_rep:go(DbName, Target, Opts)}
- catch
- Tag:Error ->
- {Tag, Error}
- end
- ).
-
-%% @doc Return the sequence where two files with the same UUID diverged.
-compare_epochs(SourceEpochs, TargetEpochs) ->
- compare_rev_epochs(
- lists:reverse(SourceEpochs),
- lists:reverse(TargetEpochs)
- ).
-
-compare_rev_epochs([{Node, Seq} | SourceRest], [{Node, Seq} | TargetRest]) ->
- % Common history, fast-forward. Recurse with compare_rev_epochs/2 so
- % the remainders stay in oldest-first order; re-entering
- % compare_epochs/2 here would reverse them again.
- compare_rev_epochs(SourceRest, TargetRest);
-compare_rev_epochs([], [{_, TargetSeq} | _]) ->
- % Source has not moved, start from seq just before the target took over
- TargetSeq - 1;
-compare_rev_epochs([{_, SourceSeq} | _], []) ->
- % Target has not moved, start from seq where source diverged
- SourceSeq;
-compare_rev_epochs([{_, SourceSeq} | _], [{_, TargetSeq} | _]) ->
- % The source was moved to a new location independently, take the minimum
- erlang:min(SourceSeq, TargetSeq) - 1.
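-
-% Worked example (hypothetical node names): epochs are stored newest
-% first, so they are reversed to oldest first before walking. With
-%
-%     SourceEpochs = [{'c@db1', 20}, {'a@db1', 0}],
-%     TargetEpochs = [{'b@db2', 10}, {'a@db1', 0}],
-%
-% the shared {'a@db1', 0} entry fast-forwards, the remaining heads
-% differ, and the result is erlang:min(20, 10) - 1 = 9: the last
-% sequence both copies are known to agree on.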
-
-%% @doc This adds a new update sequence checkpoint to the replication
-%% history. Checkpoints are keyed by the source node so that we
-%% aren't mixing history between source shard moves.
-add_checkpoint({Props}, {History}) ->
- % Extract the source and target seqs for reference
- SourceSeq = couch_util:get_value(<<"source_seq">>, Props),
- TargetSeq = couch_util:get_value(<<"target_seq">>, Props),
-
- % Get the history relevant to the source node.
- SourceNode = couch_util:get_value(<<"source_node">>, Props),
- SourceHistory = couch_util:get_value(SourceNode, History, []),
-
- % If either the source or target shard has been truncated
- % we need to filter out any history that was stored for
- % any larger update seq than we're currently recording.
- FilteredHistory = filter_history(SourceSeq, TargetSeq, SourceHistory),
-
- % Re-bucket our history based on the most recent source
- % sequence. This is where we drop old checkpoints to
- % maintain the exponential distribution.
- {_, RebucketedHistory} = rebucket(FilteredHistory, SourceSeq, 0),
- NewSourceHistory = [{Props} | RebucketedHistory],
-
- % Finally update the source node history and we're done.
- NodeRemoved = lists:keydelete(SourceNode, 1, History),
- {[{SourceNode, NewSourceHistory} | NodeRemoved]}.
-
-filter_history(SourceSeqThresh, TargetSeqThresh, History) ->
- SourceFilter = fun({Entry}) ->
- SourceSeq = couch_util:get_value(<<"source_seq">>, Entry),
- SourceSeq < SourceSeqThresh
- end,
- TargetFilter = fun({Entry}) ->
- TargetSeq = couch_util:get_value(<<"target_seq">>, Entry),
- TargetSeq < TargetSeqThresh
- end,
- SourceFiltered = lists:filter(SourceFilter, History),
- lists:filter(TargetFilter, SourceFiltered).
-
-%% @doc This function adjusts our history to maintain a
-%% history of checkpoints that follow an exponentially
-%% increasing age from the most recent checkpoint.
-%%
-%% The terms newest and oldest used in these comments
-%% refer to the (NewSeq - CurSeq) difference, where smaller
-%% values are considered newer.
-%%
-%% It works by assigning each entry to a bucket and keeping
-%% the newest and oldest entry in each bucket. Keeping
-%% both the newest and oldest means that we won't end up
-%% with empty buckets as checkpoints are promoted to new
-%% buckets.
-%%
-%% The return value of this function is a two-tuple of the
-%% form `{BucketId, History}` where BucketId is the id of
-%% the bucket for the first entry in History. This is used
-%% when recursing to detect the oldest value in a given
-%% bucket.
-%%
-%% This function expects the provided history to be sorted
-%% in descending order of source_seq values.
-rebucket([], _NewSeq, Bucket) ->
- {Bucket + 1, []};
-rebucket([{Entry} | RestHistory], NewSeq, Bucket) ->
- CurSeq = couch_util:get_value(<<"source_seq">>, Entry),
- case find_bucket(NewSeq, CurSeq, Bucket) of
- Bucket ->
- % This entry is in an existing bucket which means
- % we will only keep it if it's the oldest value
- % in the bucket. To detect this we rebucket the
- % rest of the list and only include Entry if the
- % rest of the list is in a bigger bucket.
- case rebucket(RestHistory, NewSeq, Bucket) of
- {Bucket, NewHistory} ->
- % There's another entry in this bucket so we drop the
- % current entry.
- {Bucket, NewHistory};
- {NextBucket, NewHistory} when NextBucket > Bucket ->
- % The rest of the history was rebucketed into a larger
- % bucket so this is the oldest entry in the current
- % bucket.
- {Bucket, [{Entry} | NewHistory]}
- end;
- NextBucket when NextBucket > Bucket ->
- % This entry is the newest in NextBucket so we add it
- % to our history and continue rebucketing.
- {_, NewHistory} = rebucket(RestHistory, NewSeq, NextBucket),
- {NextBucket, [{Entry} | NewHistory]}
- end.
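-
-% Illustrative trace (hypothetical sequences): with NewSeq = 16, a
-% descending source_seq history [15, 14, 13, 12, 11, 10, 9] is assigned
-% buckets [0, 1, 1, 2, 2, 2, 2]. Each bucket keeps only its newest and
-% oldest member, so the interior bucket-2 entries 11 and 10 are dropped
-% and [15, 14, 13, 12, 9] survives.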
-
-%% @doc Find the bucket id for the given sequence pair.
-find_bucket(NewSeq, CurSeq, Bucket) ->
- % The +1 constant in this comparison is a bit subtle. It
- % makes the bucket boundaries fall just past the powers of
- % two, so the most recent entry (a difference of 1) always
- % lands in bucket 0. This relies on never having a
- % duplicated update sequence, so NewSeq - CurSeq + 1 is
- % always at least 2.
- if
- (NewSeq - CurSeq + 1) > (2 bsl Bucket) ->
- find_bucket(NewSeq, CurSeq, Bucket + 1);
- true ->
- Bucket
- end.
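-
-% For example, with NewSeq = 16 (matching the tests below): CurSeq = 15
-% gives 16 - 15 + 1 = 2, which is not > 2 bsl 0 = 2, so the newest
-% entry stays in bucket 0. CurSeq = 13 gives 4 > 2 but not > 2 bsl 1 = 4,
-% so it lands in bucket 1. CurSeq = 8 gives 9 > 2 bsl 2 = 8 and ends up
-% in bucket 3.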
-
-rexi_call(Node, MFA) ->
- rexi_call(Node, MFA, ?REXI_CALL_TIMEOUT_MSEC).
-
-rexi_call(Node, MFA, Timeout) ->
- Mon = rexi_monitor:start([rexi_utils:server_pid(Node)]),
- Ref = rexi:cast(Node, self(), MFA, [sync]),
- try
- receive
- {Ref, {ok, Reply}} ->
- Reply;
- {Ref, Error} ->
- erlang:error(Error);
- {rexi_DOWN, Mon, _, Reason} ->
- erlang:error({rexi_DOWN, {Node, Reason}})
- after Timeout ->
- erlang:error(timeout)
- end
- after
- rexi_monitor:stop(Mon)
- end.
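-
-% All exported calls above default to ?REXI_CALL_TIMEOUT_MSEC (ten
-% minutes); replicate/4 is the one entry point that threads a
-% caller-supplied timeout through. A hypothetical caller wanting a
-% one-minute budget could use:
-%
-%     mem3_rpc:replicate(Src, Tgt, <<"db">>, 60000)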
-
-get_or_create_db(DbName, Options) ->
- mem3_util:get_or_create_db_int(DbName, Options).
-
--ifdef(TEST).
--include_lib("eunit/include/eunit.hrl").
-
--define(SNODE, <<"src@localhost">>).
--define(TNODE, <<"tgt@localhost">>).
--define(SNODE_KV, {<<"source_node">>, ?SNODE}).
--define(TNODE_KV, {<<"target_node">>, ?TNODE}).
--define(SSEQ, <<"source_seq">>).
--define(TSEQ, <<"target_seq">>).
--define(ENTRY(S, T), {[?SNODE_KV, {?SSEQ, S}, ?TNODE_KV, {?TSEQ, T}]}).
-
-filter_history_data() ->
- [
- ?ENTRY(13, 15),
- ?ENTRY(10, 9),
- ?ENTRY(2, 3)
- ].
-
-filter_history_remove_none_test() ->
- ?assertEqual(filter_history(20, 20, filter_history_data()), [
- ?ENTRY(13, 15),
- ?ENTRY(10, 9),
- ?ENTRY(2, 3)
- ]).
-
-filter_history_remove_all_test() ->
- ?assertEqual(filter_history(1, 1, filter_history_data()), []).
-
-filter_history_remove_equal_test() ->
- ?assertEqual(filter_history(10, 10, filter_history_data()), [
- ?ENTRY(2, 3)
- ]),
- ?assertEqual(filter_history(11, 9, filter_history_data()), [
- ?ENTRY(2, 3)
- ]).
-
-filter_history_remove_for_source_and_target_test() ->
- ?assertEqual(filter_history(11, 20, filter_history_data()), [
- ?ENTRY(10, 9),
- ?ENTRY(2, 3)
- ]),
- ?assertEqual(filter_history(14, 14, filter_history_data()), [
- ?ENTRY(10, 9),
- ?ENTRY(2, 3)
- ]).
-
-filter_history_remove_for_both_test() ->
- ?assertEqual(filter_history(11, 11, filter_history_data()), [
- ?ENTRY(10, 9),
- ?ENTRY(2, 3)
- ]).
-
-filter_history_remove_for_both_again_test() ->
- ?assertEqual(filter_history(3, 4, filter_history_data()), [
- ?ENTRY(2, 3)
- ]).
-
-add_first_checkpoint_test() ->
- History = {[]},
- ?assertEqual(
- add_checkpoint(?ENTRY(2, 3), History),
- {[
- {?SNODE, [
- ?ENTRY(2, 3)
- ]}
- ]}
- ).
-
-add_first_checkpoint_to_empty_test() ->
- History = {[{?SNODE, []}]},
- ?assertEqual(
- add_checkpoint(?ENTRY(2, 3), History),
- {[
- {?SNODE, [
- ?ENTRY(2, 3)
- ]}
- ]}
- ).
-
-add_second_checkpoint_test() ->
- History = {[{?SNODE, [?ENTRY(2, 3)]}]},
- ?assertEqual(
- add_checkpoint(?ENTRY(10, 9), History),
- {[
- {?SNODE, [
- ?ENTRY(10, 9),
- ?ENTRY(2, 3)
- ]}
- ]}
- ).
-
-add_third_checkpoint_test() ->
- History =
- {[
- {?SNODE, [
- ?ENTRY(10, 9),
- ?ENTRY(2, 3)
- ]}
- ]},
- ?assertEqual(
- add_checkpoint(?ENTRY(11, 10), History),
- {[
- {?SNODE, [
- ?ENTRY(11, 10),
- ?ENTRY(10, 9),
- ?ENTRY(2, 3)
- ]}
- ]}
- ).
-
-add_fourth_checkpoint_test() ->
- History =
- {[
- {?SNODE, [
- ?ENTRY(11, 10),
- ?ENTRY(10, 9),
- ?ENTRY(2, 3)
- ]}
- ]},
- ?assertEqual(
- add_checkpoint(?ENTRY(12, 13), History),
- {[
- {?SNODE, [
- ?ENTRY(12, 13),
- ?ENTRY(11, 10),
- ?ENTRY(10, 9),
- ?ENTRY(2, 3)
- ]}
- ]}
- ).
-
-add_checkpoint_with_replacement_test() ->
- History =
- {[
- {?SNODE, [
- ?ENTRY(12, 13),
- ?ENTRY(11, 10),
- ?ENTRY(10, 9),
- ?ENTRY(2, 3)
- ]}
- ]},
- % Picking a source_seq of 16 to force 10, 11, and 12
- % into the same bucket to show we drop the 11 entry.
- ?assertEqual(
- add_checkpoint(?ENTRY(16, 16), History),
- {[
- {?SNODE, [
- ?ENTRY(16, 16),
- ?ENTRY(12, 13),
- ?ENTRY(10, 9),
- ?ENTRY(2, 3)
- ]}
- ]}
- ).
-
-add_checkpoint_drops_redundant_checkpoints_test() ->
- % I've added comments showing the bucket ID based
- % on the ?ENTRY passed to add_checkpoint
- History =
- {[
- {?SNODE, [
- % Bucket 0
- ?ENTRY(15, 15),
- % Bucket 1
- ?ENTRY(14, 14),
- % Bucket 1
- ?ENTRY(13, 13),
- % Bucket 2
- ?ENTRY(12, 12),
- % Bucket 2
- ?ENTRY(11, 11),
- % Bucket 2
- ?ENTRY(10, 10),
- % Bucket 2
- ?ENTRY(9, 9),
- % Bucket 3
- ?ENTRY(8, 8),
- % Bucket 3
- ?ENTRY(7, 7),
- % Bucket 3
- ?ENTRY(6, 6),
- % Bucket 3
- ?ENTRY(5, 5),
- % Bucket 3
- ?ENTRY(4, 4),
- % Bucket 3
- ?ENTRY(3, 3),
- % Bucket 3
- ?ENTRY(2, 2),
- % Bucket 3
- ?ENTRY(1, 1)
- ]}
- ]},
- ?assertEqual(
- add_checkpoint(?ENTRY(16, 16), History),
- {[
- {?SNODE, [
- % Bucket 0
- ?ENTRY(16, 16),
- % Bucket 0
- ?ENTRY(15, 15),
- % Bucket 1
- ?ENTRY(14, 14),
- % Bucket 1
- ?ENTRY(13, 13),
- % Bucket 2
- ?ENTRY(12, 12),
- % Bucket 2
- ?ENTRY(9, 9),
- % Bucket 3
- ?ENTRY(8, 8),
- % Bucket 3
- ?ENTRY(1, 1)
- ]}
- ]}
- ).
-
-add_checkpoint_show_not_always_a_drop_test() ->
- % Depending on the edge conditions of buckets we
- % may not always drop values when adding new
- % checkpoints. In this case 12 stays because there's
- % no longer a value for 10 or 11.
- %
- % I've added comments showing the bucket ID based
- % on the ?ENTRY passed to add_checkpoint
- History =
- {[
- {?SNODE, [
- % Bucket 0
- ?ENTRY(16, 16),
- % Bucket 1
- ?ENTRY(15, 15),
- % Bucket 1
- ?ENTRY(14, 14),
- % Bucket 2
- ?ENTRY(13, 13),
- % Bucket 2
- ?ENTRY(12, 12),
- % Bucket 3
- ?ENTRY(9, 9),
- % Bucket 3
- ?ENTRY(8, 8),
- % Bucket 4
- ?ENTRY(1, 1)
- ]}
- ]},
- ?assertEqual(
- add_checkpoint(?ENTRY(17, 17), History),
- {[
- {?SNODE, [
- % Bucket 0
- ?ENTRY(17, 17),
- % Bucket 0
- ?ENTRY(16, 16),
- % Bucket 1
- ?ENTRY(15, 15),
- % Bucket 1
- ?ENTRY(14, 14),
- % Bucket 2
- ?ENTRY(13, 13),
- % Bucket 2
- ?ENTRY(12, 12),
- % Bucket 3
- ?ENTRY(9, 9),
- % Bucket 3
- ?ENTRY(8, 8),
- % Bucket 4
- ?ENTRY(1, 1)
- ]}
- ]}
- ).
-
-add_checkpoint_big_jump_show_lots_drop_test() ->
- % I've added comments showing the bucket ID based
- % on the ?ENTRY passed to add_checkpoint
- History =
- {[
- {?SNODE, [
- % Bucket 4
- ?ENTRY(16, 16),
- % Bucket 4
- ?ENTRY(15, 15),
- % Bucket 4
- ?ENTRY(14, 14),
- % Bucket 4
- ?ENTRY(13, 13),
- % Bucket 4
- ?ENTRY(12, 12),
- % Bucket 4
- ?ENTRY(9, 9),
- % Bucket 4
- ?ENTRY(8, 8),
- % Bucket 4
- ?ENTRY(1, 1)
- ]}
- ]},
- ?assertEqual(
- add_checkpoint(?ENTRY(32, 32), History),
- {[
- {?SNODE, [
- % Bucket 0
- ?ENTRY(32, 32),
- % Bucket 4
- ?ENTRY(16, 16),
- % Bucket 4
- ?ENTRY(1, 1)
- ]}
- ]}
- ).
-
-add_checkpoint_show_filter_history_test() ->
- History =
- {[
- {?SNODE, [
- ?ENTRY(16, 16),
- ?ENTRY(15, 15),
- ?ENTRY(14, 14),
- ?ENTRY(13, 13),
- ?ENTRY(12, 12),
- ?ENTRY(9, 9),
- ?ENTRY(8, 8),
- ?ENTRY(1, 1)
- ]}
- ]},
- % Drop for both
- ?assertEqual(
- add_checkpoint(?ENTRY(10, 10), History),
- {[
- {?SNODE, [
- ?ENTRY(10, 10),
- ?ENTRY(9, 9),
- ?ENTRY(8, 8),
- ?ENTRY(1, 1)
- ]}
- ]}
- ),
- % Drop for source
- ?assertEqual(
- add_checkpoint(?ENTRY(10, 200), History),
- {[
- {?SNODE, [
- ?ENTRY(10, 200),
- ?ENTRY(9, 9),
- ?ENTRY(8, 8),
- ?ENTRY(1, 1)
- ]}
- ]}
- ),
- % Drop for target. Obviously a source_seq of 200
- % will end up dropping the 8 entry.
- ?assertEqual(
- add_checkpoint(?ENTRY(200, 10), History),
- {[
- {?SNODE, [
- ?ENTRY(200, 10),
- ?ENTRY(9, 9),
- ?ENTRY(1, 1)
- ]}
- ]}
- ).
-
-add_checkpoint_from_other_node_test() ->
- History =
- {[
- {<<"not_the_source">>, [
- ?ENTRY(12, 13),
- ?ENTRY(11, 10),
- ?ENTRY(10, 9),
- ?ENTRY(2, 3)
- ]}
- ]},
- % No filtering
- ?assertEqual(
- add_checkpoint(?ENTRY(1, 1), History),
- {[
- {?SNODE, [
- ?ENTRY(1, 1)
- ]},
- {<<"not_the_source">>, [
- ?ENTRY(12, 13),
- ?ENTRY(11, 10),
- ?ENTRY(10, 9),
- ?ENTRY(2, 3)
- ]}
- ]}
- ),
- % No dropping
- ?assertEqual(
- add_checkpoint(?ENTRY(200, 200), History),
- {[
- {?SNODE, [
- ?ENTRY(200, 200)
- ]},
- {<<"not_the_source">>, [
- ?ENTRY(12, 13),
- ?ENTRY(11, 10),
- ?ENTRY(10, 9),
- ?ENTRY(2, 3)
- ]}
- ]}
- ).
-
--endif.