Diffstat (limited to 'src/mem3/src/mem3_rpc.erl')
-rw-r--r--  src/mem3/src/mem3_rpc.erl  688
1 file changed, 387 insertions(+), 301 deletions(-)
diff --git a/src/mem3/src/mem3_rpc.erl b/src/mem3/src/mem3_rpc.erl
index 9e0f42a8e..468bdee21 100644
--- a/src/mem3/src/mem3_rpc.erl
+++ b/src/mem3/src/mem3_rpc.erl
@@ -14,7 +14,6 @@
-module(mem3_rpc).
-
-export([
find_common_seq/4,
get_missing_revs/4,
@@ -43,18 +42,14 @@
save_purge_checkpoint_rpc/3,
replicate_rpc/2
-
]).
-
-include("mem3.hrl").
-include_lib("couch/include/couch_db.hrl").
-
-define(BATCH_SIZE, 1000).
-define(REXI_CALL_TIMEOUT_MSEC, 600000).
-
% "Pull" is a bit of a misnomer here, as what we're actually doing is
% issuing an RPC request and telling the remote node to push updates to
% us. This lets us reuse all of the battle-tested machinery of mem3_rpc.
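%
% A hedged usage sketch (the seed node name is made up, and the return
% shape is my reading of pull_replication_rpc/1 below rather than
% anything this diff asserts): from any cluster node,
%
%     > mem3_rpc:pull_replication('seed@db1.example.com').
%     [{<<"shards/00000000-7fffffff/mydb.1565265720">>, Result} | _]
%
% where each Result is whatever mem3_rep:go/3 returned for that shard.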
@@ -64,7 +59,6 @@ pull_replication(Seed) ->
get_missing_revs(Node, DbName, IdsRevs, Options) ->
rexi_call(Node, {fabric_rpc, get_missing_revs, [DbName, IdsRevs, Options]}).
-
update_docs(Node, DbName, Docs, Options) ->
rexi_call(Node, {fabric_rpc, update_docs, [DbName, Docs, Options]}).
@@ -77,95 +71,91 @@ load_checkpoint(Node, DbName, SourceNode, SourceUUID, FilterHash) ->
Args = [DbName, SourceNode, SourceUUID, FilterHash],
rexi_call(Node, {mem3_rpc, load_checkpoint_rpc, Args}).
-
load_checkpoint(Node, DbName, SourceNode, SourceUUID) ->
Args = [DbName, SourceNode, SourceUUID],
rexi_call(Node, {mem3_rpc, load_checkpoint_rpc, Args}).
-
save_checkpoint(Node, DbName, DocId, Seq, Entry, History) ->
Args = [DbName, DocId, Seq, Entry, History],
rexi_call(Node, {mem3_rpc, save_checkpoint_rpc, Args}).
-
find_common_seq(Node, DbName, SourceUUID, SourceEpochs) ->
Args = [DbName, SourceUUID, SourceEpochs],
rexi_call(Node, {mem3_rpc, find_common_seq_rpc, Args}).
-
load_purge_infos(Node, DbName, SourceUUID, Count) ->
Args = [DbName, SourceUUID, Count],
rexi_call(Node, {mem3_rpc, load_purge_infos_rpc, Args}).
-
save_purge_checkpoint(Node, DbName, PurgeDocId, Body) ->
Args = [DbName, PurgeDocId, Body],
rexi_call(Node, {mem3_rpc, save_purge_checkpoint_rpc, Args}).
-
purge_docs(Node, DbName, PurgeInfos, Options) ->
rexi_call(Node, {fabric_rpc, purge_docs, [DbName, PurgeInfos, Options]}).
-
-replicate(Source, Target, DbName, Timeout)
- when is_atom(Source), is_atom(Target), is_binary(DbName) ->
+replicate(Source, Target, DbName, Timeout) when
+ is_atom(Source), is_atom(Target), is_binary(DbName)
+->
Args = [DbName, Target],
rexi_call(Source, {mem3_rpc, replicate_rpc, Args}, Timeout).
-
load_checkpoint_rpc(DbName, SourceNode, SourceUUID) ->
load_checkpoint_rpc(DbName, SourceNode, SourceUUID, <<>>).
-
load_checkpoint_rpc(DbName, SourceNode, SourceUUID, FilterHash) ->
erlang:put(io_priority, {internal_repl, DbName}),
case get_or_create_db(DbName, [?ADMIN_CTX]) of
- {ok, Db} ->
- TargetUUID = couch_db:get_uuid(Db),
- NewId = mem3_rep:make_local_id(SourceUUID, TargetUUID, FilterHash),
- case couch_db:open_doc(Db, NewId, []) of
- {ok, Doc} ->
- rexi:reply({ok, {NewId, Doc}});
- {not_found, _} ->
- OldId = mem3_rep:make_local_id(SourceNode, node()),
- case couch_db:open_doc(Db, OldId, []) of
- {ok, Doc} ->
- rexi:reply({ok, {NewId, Doc}});
- {not_found, _} ->
- rexi:reply({ok, {NewId, #doc{id = NewId}}})
- end
- end;
- Error ->
- rexi:reply(Error)
+ {ok, Db} ->
+ TargetUUID = couch_db:get_uuid(Db),
+ NewId = mem3_rep:make_local_id(SourceUUID, TargetUUID, FilterHash),
+ case couch_db:open_doc(Db, NewId, []) of
+ {ok, Doc} ->
+ rexi:reply({ok, {NewId, Doc}});
+ {not_found, _} ->
+ OldId = mem3_rep:make_local_id(SourceNode, node()),
+ case couch_db:open_doc(Db, OldId, []) of
+ {ok, Doc} ->
+ rexi:reply({ok, {NewId, Doc}});
+ {not_found, _} ->
+ rexi:reply({ok, {NewId, #doc{id = NewId}}})
+ end
+ end;
+ Error ->
+ rexi:reply(Error)
end.
-
save_checkpoint_rpc(DbName, Id, SourceSeq, NewEntry0, History0) ->
erlang:put(io_priority, {internal_repl, DbName}),
case get_or_create_db(DbName, [?ADMIN_CTX]) of
{ok, Db} ->
- NewEntry = {[
- {<<"target_node">>, atom_to_binary(node(), utf8)},
- {<<"target_uuid">>, couch_db:get_uuid(Db)},
- {<<"target_seq">>, couch_db:get_update_seq(Db)}
- ] ++ NewEntry0},
- Body = {[
- {<<"seq">>, SourceSeq},
- {<<"target_uuid">>, couch_db:get_uuid(Db)},
- {<<"history">>, add_checkpoint(NewEntry, History0)}
- ]},
+ NewEntry = {
+ [
+ {<<"target_node">>, atom_to_binary(node(), utf8)},
+ {<<"target_uuid">>, couch_db:get_uuid(Db)},
+ {<<"target_seq">>, couch_db:get_update_seq(Db)}
+ ] ++ NewEntry0
+ },
+ Body =
+ {[
+ {<<"seq">>, SourceSeq},
+ {<<"target_uuid">>, couch_db:get_uuid(Db)},
+ {<<"history">>, add_checkpoint(NewEntry, History0)}
+ ]},
Doc = #doc{id = Id, body = Body},
- rexi:reply(try couch_db:update_doc(Db, Doc, []) of
- {ok, _} ->
- {ok, Body};
- Else ->
- {error, Else}
- catch
- Exception ->
- Exception;
- error:Reason ->
- {error, Reason}
- end);
+ rexi:reply(
+ try couch_db:update_doc(Db, Doc, []) of
+ {ok, _} ->
+ {ok, Body};
+ Else ->
+ {error, Else}
+ catch
+ Exception ->
+ Exception;
+ error:Reason ->
+ {error, Reason}
+ end
+ );
Error ->
rexi:reply(Error)
end.
@@ -173,17 +163,17 @@ save_checkpoint_rpc(DbName, Id, SourceSeq, NewEntry0, History0) ->
find_common_seq_rpc(DbName, SourceUUID, SourceEpochs) ->
erlang:put(io_priority, {internal_repl, DbName}),
case get_or_create_db(DbName, [?ADMIN_CTX]) of
- {ok, Db} ->
- case couch_db:get_uuid(Db) of
- SourceUUID ->
- TargetEpochs = couch_db:get_epochs(Db),
- Seq = compare_epochs(SourceEpochs, TargetEpochs),
- rexi:reply({ok, Seq});
- _Else ->
- rexi:reply({ok, 0})
- end;
- Error ->
- rexi:reply(Error)
+ {ok, Db} ->
+ case couch_db:get_uuid(Db) of
+ SourceUUID ->
+ TargetEpochs = couch_db:get_epochs(Db),
+ Seq = compare_epochs(SourceEpochs, TargetEpochs),
+ rexi:reply({ok, Seq});
+ _Else ->
+ rexi:reply({ok, 0})
+ end;
+ Error ->
+ rexi:reply(Error)
end.
pull_replication_rpc(Target) ->
@@ -192,29 +182,33 @@ pull_replication_rpc(Target) ->
Repl = fun(Db) -> {Db, mem3_rep:go(Db, Target, Opts)} end,
rexi:reply({ok, lists:map(Repl, Dbs)}).
-
load_purge_infos_rpc(DbName, SrcUUID, BatchSize) ->
erlang:put(io_priority, {internal_repl, DbName}),
case get_or_create_db(DbName, [?ADMIN_CTX]) of
{ok, Db} ->
TgtUUID = couch_db:get_uuid(Db),
PurgeDocId = mem3_rep:make_purge_id(SrcUUID, TgtUUID),
- StartSeq = case couch_db:open_doc(Db, PurgeDocId, []) of
- {ok, #doc{body = {Props}}} ->
- couch_util:get_value(<<"purge_seq">>, Props);
- {not_found, _} ->
- Oldest = couch_db:get_oldest_purge_seq(Db),
- erlang:max(0, Oldest - 1)
- end,
+ StartSeq =
+ case couch_db:open_doc(Db, PurgeDocId, []) of
+ {ok, #doc{body = {Props}}} ->
+ couch_util:get_value(<<"purge_seq">>, Props);
+ {not_found, _} ->
+ Oldest = couch_db:get_oldest_purge_seq(Db),
+ erlang:max(0, Oldest - 1)
+ end,
FoldFun = fun({PSeq, UUID, Id, Revs}, {Count, Infos, _}) ->
NewCount = Count + length(Revs),
NewInfos = [{UUID, Id, Revs} | Infos],
- Status = if NewCount < BatchSize -> ok; true -> stop end,
+ Status =
+ if
+ NewCount < BatchSize -> ok;
+ true -> stop
+ end,
{Status, {NewCount, NewInfos, PSeq}}
end,
InitAcc = {0, [], StartSeq},
{ok, {_, PurgeInfos, ThroughSeq}} =
- couch_db:fold_purge_infos(Db, StartSeq, FoldFun, InitAcc),
+ couch_db:fold_purge_infos(Db, StartSeq, FoldFun, InitAcc),
PurgeSeq = couch_db:get_purge_seq(Db),
Remaining = PurgeSeq - ThroughSeq,
rexi:reply({ok, {PurgeDocId, PurgeInfos, ThroughSeq, Remaining}});
@@ -222,32 +216,33 @@ load_purge_infos_rpc(DbName, SrcUUID, BatchSize) ->
rexi:reply(Else)
end.
-
save_purge_checkpoint_rpc(DbName, PurgeDocId, Body) ->
erlang:put(io_priority, {internal_repl, DbName}),
case get_or_create_db(DbName, [?ADMIN_CTX]) of
{ok, Db} ->
Doc = #doc{id = PurgeDocId, body = Body},
- Resp = try couch_db:update_doc(Db, Doc, []) of
- Resp0 -> Resp0
- catch T:R ->
- {T, R}
- end,
+ Resp =
+ try couch_db:update_doc(Db, Doc, []) of
+ Resp0 -> Resp0
+ catch
+ T:R ->
+ {T, R}
+ end,
rexi:reply(Resp);
Error ->
rexi:reply(Error)
end.
-
replicate_rpc(DbName, Target) ->
- rexi:reply(try
- Opts = [{batch_size, ?BATCH_SIZE}, {batch_count, all}],
- {ok, mem3_rep:go(DbName, Target, Opts)}
- catch
- Tag:Error ->
- {Tag, Error}
- end).
-
+ rexi:reply(
+ try
+ Opts = [{batch_size, ?BATCH_SIZE}, {batch_count, all}],
+ {ok, mem3_rep:go(DbName, Target, Opts)}
+ catch
+ Tag:Error ->
+ {Tag, Error}
+ end
+ ).
%% @doc Return the sequence where two files with the same UUID diverged.
compare_epochs(SourceEpochs, TargetEpochs) ->
@@ -256,7 +251,6 @@ compare_epochs(SourceEpochs, TargetEpochs) ->
lists:reverse(TargetEpochs)
).
-
compare_rev_epochs([{Node, Seq} | SourceRest], [{Node, Seq} | TargetRest]) ->
% Common history, fast-forward
compare_epochs(SourceRest, TargetRest);
@@ -270,7 +264,6 @@ compare_rev_epochs([{_, SourceSeq} | _], [{_, TargetSeq} | _]) ->
% The source was moved to a new location independently, take the minimum
erlang:min(SourceSeq, TargetSeq) - 1.
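% A worked example using the clause above (node names and sequences are
% made up): epochs are {Node, Seq} pairs. If, after the reversal in
% compare_epochs/2, the source history starts with {'n2', 10} and the
% target history with {'n3', 8}, the shards moved independently and the
% common sequence is erlang:min(10, 8) - 1 = 7:
%
%     compare_epochs([{'n2', 10}], [{'n3', 8}]) =:= 7.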
-
%% @doc This adds a new update sequence checkpoint to the replication
%% history. Checkpoints are keyed by the source node so that we
%% aren't mixing history between source shard moves.
@@ -298,7 +291,6 @@ add_checkpoint({Props}, {History}) ->
NodeRemoved = lists:keydelete(SourceNode, 1, History),
{[{SourceNode, NewSourceHistory} | NodeRemoved]}.
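% A minimal sketch of the keying behavior (shapes only; the eunit tests
% below exercise the real cases): given
%
%     History = {[{<<"src@db1">>, [OldEntry]}]}
%
% and a new Entry whose source_node is <<"src@db1">>, the result is
% {[{<<"src@db1">>, [Entry | FilteredOldEntries]}]}. Entries recorded
% under any other source node are carried over untouched.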
-
filter_history(SourceSeqThresh, TargetSeqThresh, History) ->
SourceFilter = fun({Entry}) ->
SourceSeq = couch_util:get_value(<<"source_seq">>, Entry),
@@ -311,7 +303,6 @@ filter_history(SourceSeqThresh, TargetSeqThresh, History) ->
SourceFiltered = lists:filter(SourceFilter, History),
lists:filter(TargetFilter, SourceFiltered).
-
%% @doc This function adjusts our history to maintain a
%% history of checkpoints that follow an exponentially
%% increasing age from the most recent checkpoint.
@@ -335,7 +326,7 @@ filter_history(SourceSeqThresh, TargetSeqThresh, History) ->
%% This function expects the provided history to be sorted
%% in descending order of source_seq values.
rebucket([], _NewSeq, Bucket) ->
- {Bucket+1, []};
+ {Bucket + 1, []};
rebucket([{Entry} | RestHistory], NewSeq, Bucket) ->
CurSeq = couch_util:get_value(<<"source_seq">>, Entry),
case find_bucket(NewSeq, CurSeq, Bucket) of
@@ -363,7 +354,6 @@ rebucket([{Entry} | RestHistory], NewSeq, Bucket) ->
{NextBucket, [{Entry} | NewHistory]}
end.
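% A rough intuition, mine rather than the diff's: because bucket widths
% double, a shard with N update sequences retains only O(log N)
% checkpoints, kept dense near the newest entry and sparse near the
% oldest. Concretely, each bucket ends up holding at most its newest
% and oldest entry, which is visible in
% add_checkpoint_drops_redundant_checkpoints_test/0 below.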
-
%% @doc Find the bucket id for the given sequence pair.
find_bucket(NewSeq, CurSeq, Bucket) ->
% The +1 constant in this comparison is a bit subtle. The
@@ -371,27 +361,27 @@ find_bucket(NewSeq, CurSeq, Bucket) ->
% the history is guaranteed to have a BucketId of 1. This
% also relies on never having a duplicated update
% sequence so adding 1 here guarantees a difference >= 2.
- if (NewSeq - CurSeq + 1) > (2 bsl Bucket) ->
- find_bucket(NewSeq, CurSeq, Bucket+1);
- true ->
- Bucket
+ if
+ (NewSeq - CurSeq + 1) > (2 bsl Bucket) ->
+ find_bucket(NewSeq, CurSeq, Bucket + 1);
+ true ->
+ Bucket
end.
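% A worked example (mine): with NewSeq = 16 and Bucket starting at 0,
% the thresholds 2 bsl Bucket are 2, 4, 8, ...:
%
%     CurSeq = 15: 16 - 15 + 1 = 2, not > 2      -> Bucket 0
%     CurSeq = 14: 16 - 14 + 1 = 3 > 2, not > 4  -> Bucket 1
%     CurSeq = 12: 16 - 12 + 1 = 5 > 4, not > 8  -> Bucket 2
%
% matching the bucket annotations in the tests below.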
-
rexi_call(Node, MFA) ->
rexi_call(Node, MFA, ?REXI_CALL_TIMEOUT_MSEC).
-
rexi_call(Node, MFA, Timeout) ->
Mon = rexi_monitor:start([rexi_utils:server_pid(Node)]),
Ref = rexi:cast(Node, self(), MFA, [sync]),
try
- receive {Ref, {ok, Reply}} ->
- Reply;
- {Ref, Error} ->
- erlang:error(Error);
- {rexi_DOWN, Mon, _, Reason} ->
- erlang:error({rexi_DOWN, {Node, Reason}})
+ receive
+ {Ref, {ok, Reply}} ->
+ Reply;
+ {Ref, Error} ->
+ erlang:error(Error);
+ {rexi_DOWN, Mon, _, Reason} ->
+ erlang:error({rexi_DOWN, {Node, Reason}})
after Timeout ->
erlang:error(timeout)
end
@@ -399,15 +389,12 @@ rexi_call(Node, MFA, Timeout) ->
rexi_monitor:stop(Mon)
end.
-
get_or_create_db(DbName, Options) ->
mem3_util:get_or_create_db_int(DbName, Options).
-
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
-
-define(SNODE, <<"src@localhost">>).
-define(TNODE, <<"tgt@localhost">>).
-define(SNODE_KV, {<<"source_node">>, ?SNODE}).
@@ -416,7 +403,6 @@ get_or_create_db(DbName, Options) ->
-define(TSEQ, <<"target_seq">>).
-define(ENTRY(S, T), {[?SNODE_KV, {?SSEQ, S}, ?TNODE_KV, {?TSEQ, T}]}).
-
filter_history_data() ->
[
?ENTRY(13, 15),
@@ -424,7 +410,6 @@ filter_history_data() ->
?ENTRY(2, 3)
].
-
filter_history_remove_none_test() ->
?assertEqual(filter_history(20, 20, filter_history_data()), [
?ENTRY(13, 15),
@@ -432,11 +417,9 @@ filter_history_remove_none_test() ->
?ENTRY(2, 3)
]).
-
filter_history_remove_all_test() ->
?assertEqual(filter_history(1, 1, filter_history_data()), []).
-
filter_history_remove_equal_test() ->
?assertEqual(filter_history(10, 10, filter_history_data()), [
?ENTRY(2, 3)
@@ -445,7 +428,6 @@ filter_history_remove_equal_test() ->
?ENTRY(2, 3)
]).
-
filter_history_remove_for_source_and_target_test() ->
?assertEqual(filter_history(11, 20, filter_history_data()), [
?ENTRY(10, 9),
@@ -456,129 +438,176 @@ filter_history_remove_for_source_and_target_test() ->
?ENTRY(2, 3)
]).
-
filter_history_remove_for_both_test() ->
?assertEqual(filter_history(11, 11, filter_history_data()), [
?ENTRY(10, 9),
?ENTRY(2, 3)
]).
-
filter_history_remove_for_both_again_test() ->
?assertEqual(filter_history(3, 4, filter_history_data()), [
?ENTRY(2, 3)
]).
-
add_first_checkpoint_test() ->
History = {[]},
- ?assertEqual(add_checkpoint(?ENTRY(2, 3), History), {[
- {?SNODE, [
- ?ENTRY(2, 3)
+ ?assertEqual(
+ add_checkpoint(?ENTRY(2, 3), History),
+ {[
+ {?SNODE, [
+ ?ENTRY(2, 3)
+ ]}
]}
- ]}).
-
+ ).
add_first_checkpoint_to_empty_test() ->
History = {[{?SNODE, []}]},
- ?assertEqual(add_checkpoint(?ENTRY(2, 3), History), {[
- {?SNODE, [
- ?ENTRY(2, 3)
+ ?assertEqual(
+ add_checkpoint(?ENTRY(2, 3), History),
+ {[
+ {?SNODE, [
+ ?ENTRY(2, 3)
+ ]}
]}
- ]}).
-
+ ).
add_second_checkpoint_test() ->
History = {[{?SNODE, [?ENTRY(2, 3)]}]},
- ?assertEqual(add_checkpoint(?ENTRY(10, 9), History), {[
- {?SNODE, [
- ?ENTRY(10, 9),
- ?ENTRY(2, 3)
+ ?assertEqual(
+ add_checkpoint(?ENTRY(10, 9), History),
+ {[
+ {?SNODE, [
+ ?ENTRY(10, 9),
+ ?ENTRY(2, 3)
+ ]}
]}
- ]}).
-
+ ).
add_third_checkpoint_test() ->
- History = {[{?SNODE, [
- ?ENTRY(10, 9),
- ?ENTRY(2, 3)
- ]}]},
- ?assertEqual(add_checkpoint(?ENTRY(11, 10), History), {[
- {?SNODE, [
- ?ENTRY(11, 10),
- ?ENTRY(10, 9),
- ?ENTRY(2, 3)
+ History =
+ {[
+ {?SNODE, [
+ ?ENTRY(10, 9),
+ ?ENTRY(2, 3)
+ ]}
+ ]},
+ ?assertEqual(
+ add_checkpoint(?ENTRY(11, 10), History),
+ {[
+ {?SNODE, [
+ ?ENTRY(11, 10),
+ ?ENTRY(10, 9),
+ ?ENTRY(2, 3)
+ ]}
]}
- ]}).
-
+ ).
add_fourth_checkpoint_test() ->
- History = {[{?SNODE, [
- ?ENTRY(11, 10),
- ?ENTRY(10, 9),
- ?ENTRY(2, 3)
- ]}]},
- ?assertEqual(add_checkpoint(?ENTRY(12, 13), History), {[
- {?SNODE, [
- ?ENTRY(12, 13),
- ?ENTRY(11, 10),
- ?ENTRY(10, 9),
- ?ENTRY(2, 3)
+ History =
+ {[
+ {?SNODE, [
+ ?ENTRY(11, 10),
+ ?ENTRY(10, 9),
+ ?ENTRY(2, 3)
+ ]}
+ ]},
+ ?assertEqual(
+ add_checkpoint(?ENTRY(12, 13), History),
+ {[
+ {?SNODE, [
+ ?ENTRY(12, 13),
+ ?ENTRY(11, 10),
+ ?ENTRY(10, 9),
+ ?ENTRY(2, 3)
+ ]}
]}
- ]}).
-
+ ).
add_checkpoint_with_replacement_test() ->
- History = {[{?SNODE, [
- ?ENTRY(12, 13),
- ?ENTRY(11, 10),
- ?ENTRY(10, 9),
- ?ENTRY(2, 3)
- ]}]},
+ History =
+ {[
+ {?SNODE, [
+ ?ENTRY(12, 13),
+ ?ENTRY(11, 10),
+ ?ENTRY(10, 9),
+ ?ENTRY(2, 3)
+ ]}
+ ]},
% Picking a source_seq of 16 to force 10, 11, and 12
% into the same bucket to show we drop the 11 entry.
- ?assertEqual(add_checkpoint(?ENTRY(16, 16), History), {[
- {?SNODE, [
- ?ENTRY(16, 16),
- ?ENTRY(12, 13),
- ?ENTRY(10, 9),
- ?ENTRY(2, 3)
+ ?assertEqual(
+ add_checkpoint(?ENTRY(16, 16), History),
+ {[
+ {?SNODE, [
+ ?ENTRY(16, 16),
+ ?ENTRY(12, 13),
+ ?ENTRY(10, 9),
+ ?ENTRY(2, 3)
+ ]}
]}
- ]}).
+ ).
add_checkpoint_drops_redundant_checkpoints_test() ->
% I've added comments showing the bucket ID based
% on the ?ENTRY passed to add_checkpoint
- History = {[{?SNODE, [
- ?ENTRY(15, 15), % Bucket 0
- ?ENTRY(14, 14), % Bucket 1
- ?ENTRY(13, 13), % Bucket 1
- ?ENTRY(12, 12), % Bucket 2
- ?ENTRY(11, 11), % Bucket 2
- ?ENTRY(10, 10), % Bucket 2
- ?ENTRY(9, 9), % Bucket 2
- ?ENTRY(8, 8), % Bucket 3
- ?ENTRY(7, 7), % Bucket 3
- ?ENTRY(6, 6), % Bucket 3
- ?ENTRY(5, 5), % Bucket 3
- ?ENTRY(4, 4), % Bucket 3
- ?ENTRY(3, 3), % Bucket 3
- ?ENTRY(2, 2), % Bucket 3
- ?ENTRY(1, 1) % Bucket 3
- ]}]},
- ?assertEqual(add_checkpoint(?ENTRY(16, 16), History), {[
- {?SNODE, [
- ?ENTRY(16, 16), % Bucket 0
- ?ENTRY(15, 15), % Bucket 0
- ?ENTRY(14, 14), % Bucket 1
- ?ENTRY(13, 13), % Bucket 1
- ?ENTRY(12, 12), % Bucket 2
- ?ENTRY(9, 9), % Bucket 2
- ?ENTRY(8, 8), % Bucket 3
- ?ENTRY(1, 1) % Bucket 3
+ History =
+ {[
+ {?SNODE, [
+ % Bucket 0
+ ?ENTRY(15, 15),
+ % Bucket 1
+ ?ENTRY(14, 14),
+ % Bucket 1
+ ?ENTRY(13, 13),
+ % Bucket 2
+ ?ENTRY(12, 12),
+ % Bucket 2
+ ?ENTRY(11, 11),
+ % Bucket 2
+ ?ENTRY(10, 10),
+ % Bucket 2
+ ?ENTRY(9, 9),
+ % Bucket 3
+ ?ENTRY(8, 8),
+ % Bucket 3
+ ?ENTRY(7, 7),
+ % Bucket 3
+ ?ENTRY(6, 6),
+ % Bucket 3
+ ?ENTRY(5, 5),
+ % Bucket 3
+ ?ENTRY(4, 4),
+ % Bucket 3
+ ?ENTRY(3, 3),
+ % Bucket 3
+ ?ENTRY(2, 2),
+ % Bucket 3
+ ?ENTRY(1, 1)
+ ]}
+ ]},
+ ?assertEqual(
+ add_checkpoint(?ENTRY(16, 16), History),
+ {[
+ {?SNODE, [
+ % Bucket 0
+ ?ENTRY(16, 16),
+ % Bucket 0
+ ?ENTRY(15, 15),
+ % Bucket 1
+ ?ENTRY(14, 14),
+ % Bucket 1
+ ?ENTRY(13, 13),
+ % Bucket 2
+ ?ENTRY(12, 12),
+ % Bucket 2
+ ?ENTRY(9, 9),
+ % Bucket 3
+ ?ENTRY(8, 8),
+ % Bucket 3
+ ?ENTRY(1, 1)
+ ]}
]}
- ]}).
-
+ ).
add_checkpoint_show_not_always_a_drop_test() ->
% Depending on the edge conditions of buckets we
@@ -588,124 +617,181 @@ add_checkpoint_show_not_always_a_drop_test() ->
%
% I've added comments showing the bucket ID based
% on the ?ENTRY passed to add_checkpoint
- History = {[{?SNODE, [
- ?ENTRY(16, 16), % Bucket 0
- ?ENTRY(15, 15), % Bucket 1
- ?ENTRY(14, 14), % Bucket 1
- ?ENTRY(13, 13), % Bucket 2
- ?ENTRY(12, 12), % Bucket 2
- ?ENTRY(9, 9), % Bucket 3
- ?ENTRY(8, 8), % Bucket 3
- ?ENTRY(1, 1) % Bucket 4
- ]}]},
- ?assertEqual(add_checkpoint(?ENTRY(17, 17), History), {[
- {?SNODE, [
- ?ENTRY(17, 17), % Bucket 0
- ?ENTRY(16, 16), % Bucket 0
- ?ENTRY(15, 15), % Bucket 1
- ?ENTRY(14, 14), % Bucket 1
- ?ENTRY(13, 13), % Bucket 2
- ?ENTRY(12, 12), % Bucket 2
- ?ENTRY(9, 9), % Bucket 3
- ?ENTRY(8, 8), % Bucket 3
- ?ENTRY(1, 1) % Bucket 4
+ History =
+ {[
+ {?SNODE, [
+ % Bucket 0
+ ?ENTRY(16, 16),
+ % Bucket 1
+ ?ENTRY(15, 15),
+ % Bucket 1
+ ?ENTRY(14, 14),
+ % Bucket 2
+ ?ENTRY(13, 13),
+ % Bucket 2
+ ?ENTRY(12, 12),
+ % Bucket 3
+ ?ENTRY(9, 9),
+ % Bucket 3
+ ?ENTRY(8, 8),
+ % Bucket 4
+ ?ENTRY(1, 1)
+ ]}
+ ]},
+ ?assertEqual(
+ add_checkpoint(?ENTRY(17, 17), History),
+ {[
+ {?SNODE, [
+ % Bucket 0
+ ?ENTRY(17, 17),
+ % Bucket 0
+ ?ENTRY(16, 16),
+ % Bucket 1
+ ?ENTRY(15, 15),
+ % Bucket 1
+ ?ENTRY(14, 14),
+ % Bucket 2
+ ?ENTRY(13, 13),
+ % Bucket 2
+ ?ENTRY(12, 12),
+ % Bucket 3
+ ?ENTRY(9, 9),
+ % Bucket 3
+ ?ENTRY(8, 8),
+ % Bucket 4
+ ?ENTRY(1, 1)
+ ]}
]}
- ]}).
-
+ ).
add_checkpoint_big_jump_show_lots_drop_test() ->
% I've added comments showing the bucket ID based
% on the ?ENTRY passed to add_checkpoint
- History = {[{?SNODE, [
- ?ENTRY(16, 16), % Bucket 4
- ?ENTRY(15, 15), % Bucket 4
- ?ENTRY(14, 14), % Bucket 4
- ?ENTRY(13, 13), % Bucket 4
- ?ENTRY(12, 12), % Bucket 4
- ?ENTRY(9, 9), % Bucket 4
- ?ENTRY(8, 8), % Bucket 4
- ?ENTRY(1, 1) % Bucket 4
- ]}]},
- ?assertEqual(add_checkpoint(?ENTRY(32, 32), History), {[
- {?SNODE, [
- ?ENTRY(32, 32), % Bucket 0
- ?ENTRY(16, 16), % Bucket 4
- ?ENTRY(1, 1) % Bucket 4
+ History =
+ {[
+ {?SNODE, [
+ % Bucket 4
+ ?ENTRY(16, 16),
+ % Bucket 4
+ ?ENTRY(15, 15),
+ % Bucket 4
+ ?ENTRY(14, 14),
+ % Bucket 4
+ ?ENTRY(13, 13),
+ % Bucket 4
+ ?ENTRY(12, 12),
+ % Bucket 4
+ ?ENTRY(9, 9),
+ % Bucket 4
+ ?ENTRY(8, 8),
+ % Bucket 4
+ ?ENTRY(1, 1)
+ ]}
+ ]},
+ ?assertEqual(
+ add_checkpoint(?ENTRY(32, 32), History),
+ {[
+ {?SNODE, [
+ % Bucket 0
+ ?ENTRY(32, 32),
+ % Bucket 4
+ ?ENTRY(16, 16),
+ % Bucket 4
+ ?ENTRY(1, 1)
+ ]}
]}
- ]}).
-
+ ).
add_checkpoint_show_filter_history_test() ->
- History = {[{?SNODE, [
- ?ENTRY(16, 16),
- ?ENTRY(15, 15),
- ?ENTRY(14, 14),
- ?ENTRY(13, 13),
- ?ENTRY(12, 12),
- ?ENTRY(9, 9),
- ?ENTRY(8, 8),
- ?ENTRY(1, 1)
- ]}]},
+ History =
+ {[
+ {?SNODE, [
+ ?ENTRY(16, 16),
+ ?ENTRY(15, 15),
+ ?ENTRY(14, 14),
+ ?ENTRY(13, 13),
+ ?ENTRY(12, 12),
+ ?ENTRY(9, 9),
+ ?ENTRY(8, 8),
+ ?ENTRY(1, 1)
+ ]}
+ ]},
% Drop for both
- ?assertEqual(add_checkpoint(?ENTRY(10, 10), History), {[
- {?SNODE, [
- ?ENTRY(10, 10),
- ?ENTRY(9, 9),
- ?ENTRY(8, 8),
- ?ENTRY(1, 1)
+ ?assertEqual(
+ add_checkpoint(?ENTRY(10, 10), History),
+ {[
+ {?SNODE, [
+ ?ENTRY(10, 10),
+ ?ENTRY(9, 9),
+ ?ENTRY(8, 8),
+ ?ENTRY(1, 1)
+ ]}
]}
- ]}),
+ ),
% Drop for source
- ?assertEqual(add_checkpoint(?ENTRY(10, 200), History), {[
- {?SNODE, [
- ?ENTRY(10, 200),
- ?ENTRY(9, 9),
- ?ENTRY(8, 8),
- ?ENTRY(1, 1)
+ ?assertEqual(
+ add_checkpoint(?ENTRY(10, 200), History),
+ {[
+ {?SNODE, [
+ ?ENTRY(10, 200),
+ ?ENTRY(9, 9),
+ ?ENTRY(8, 8),
+ ?ENTRY(1, 1)
+ ]}
]}
- ]}),
+ ),
% Drop for target. Obviously a source_seq of 200
% will end up dropping the 8 entry.
- ?assertEqual(add_checkpoint(?ENTRY(200, 10), History), {[
- {?SNODE, [
- ?ENTRY(200, 10),
- ?ENTRY(9, 9),
- ?ENTRY(1, 1)
+ ?assertEqual(
+ add_checkpoint(?ENTRY(200, 10), History),
+ {[
+ {?SNODE, [
+ ?ENTRY(200, 10),
+ ?ENTRY(9, 9),
+ ?ENTRY(1, 1)
+ ]}
]}
- ]}).
-
+ ).
add_checkpoint_from_other_node_test() ->
- History = {[{<<"not_the_source">>, [
- ?ENTRY(12, 13),
- ?ENTRY(11, 10),
- ?ENTRY(10, 9),
- ?ENTRY(2, 3)
- ]}]},
- % No filtering
- ?assertEqual(add_checkpoint(?ENTRY(1, 1), History), {[
- {?SNODE, [
- ?ENTRY(1, 1)
+ History =
+ {[
+ {<<"not_the_source">>, [
+ ?ENTRY(12, 13),
+ ?ENTRY(11, 10),
+ ?ENTRY(10, 9),
+ ?ENTRY(2, 3)
+ ]}
]},
- {<<"not_the_source">>, [
- ?ENTRY(12, 13),
- ?ENTRY(11, 10),
- ?ENTRY(10, 9),
- ?ENTRY(2, 3)
+ % No filtering
+ ?assertEqual(
+ add_checkpoint(?ENTRY(1, 1), History),
+ {[
+ {?SNODE, [
+ ?ENTRY(1, 1)
+ ]},
+ {<<"not_the_source">>, [
+ ?ENTRY(12, 13),
+ ?ENTRY(11, 10),
+ ?ENTRY(10, 9),
+ ?ENTRY(2, 3)
+ ]}
]}
- ]}),
+ ),
% No dropping
- ?assertEqual(add_checkpoint(?ENTRY(200, 200), History), {[
- {?SNODE, [
- ?ENTRY(200, 200)
- ]},
- {<<"not_the_source">>, [
- ?ENTRY(12, 13),
- ?ENTRY(11, 10),
- ?ENTRY(10, 9),
- ?ENTRY(2, 3)
+ ?assertEqual(
+ add_checkpoint(?ENTRY(200, 200), History),
+ {[
+ {?SNODE, [
+ ?ENTRY(200, 200)
+ ]},
+ {<<"not_the_source">>, [
+ ?ENTRY(12, 13),
+ ?ENTRY(11, 10),
+ ?ENTRY(10, 9),
+ ?ENTRY(2, 3)
+ ]}
]}
- ]}).
-
+ ).
-endif.