summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorNick Vatamaniuc <vatamane@apache.org>2019-02-25 11:59:57 -0500
committerNick Vatamaniuc <vatamane@apache.org>2019-02-28 11:49:01 -0500
commit6a4f97241ebbe6e113518d4a47102d619358ed9d (patch)
tree04f487ec744cc54a5b116c4922c559602119482a
parentc39852e18606a2234b4c6a57ce94bc87810c4870 (diff)
downloadcouchdb-6a4f97241ebbe6e113518d4a47102d619358ed9d.tar.gz
Uneven shard copy handling in mem3 and fabric
The introduction of shard splitting will eliminate the contraint that all document copies are located in shards with same range boundaries. That assumption was made by default in mem3 and fabric functions that do shard replacement, worker spawning, unpacking `_changes` update sequences and some others. This commit updates those places to handle the case where document copies might be in different shard ranges. A good place to start from is the `mem3_util:get_ring()` function. This function returns a full non-overlapped ring from a set of possibly overlapping shards. This function is used by almost everything else in this commit: 1) It's used when only a single copy of the data is needed, for example in cases where _all_docs or _changes procesessig. 2) Used when checking if progress is possible after some nodes died. `get_ring()` returns `[]` when it cannot find a full ring is used to indicate that progress is not possible. 3) During shard replacement. This is pershaps the most complicated case. During replacement besides just finding a possible covering of the ring from the set of shards, it is also desirable to find one that minimizes the number of workers that have to be replaced. A neat trick used here is to provide `get_ring` with a custom sort function, which prioritizes certain shard copies over others. In case of replacements it prioritiezes shards for which workers have already spawned. In the default cause `get_ring()` will prioritize longer ranges over shorter ones, so for example, to cover the interval [00-ff] with either [00-7f, 80-ff] or [00-ff] shards ranges, it will pick the single [00-ff] range instead of [00-7f, 80-ff] pair. Issue #1920 Co-authored-by: Paul J. Davis <davisp@apache.org>
-rw-r--r--src/fabric/src/fabric_dict.erl3
-rw-r--r--src/fabric/src/fabric_view.erl239
-rw-r--r--src/fabric/src/fabric_view_changes.erl339
-rw-r--r--src/mem3/include/mem3.hrl5
-rw-r--r--src/mem3/src/mem3.erl3
-rw-r--r--src/mem3/src/mem3_shards.erl41
-rw-r--r--src/mem3/src/mem3_util.erl282
-rw-r--r--src/mem3/test/mem3_ring_prop_tests.erl84
8 files changed, 838 insertions, 158 deletions
diff --git a/src/fabric/src/fabric_dict.erl b/src/fabric/src/fabric_dict.erl
index a336b47b0..b63ed2095 100644
--- a/src/fabric/src/fabric_dict.erl
+++ b/src/fabric/src/fabric_dict.erl
@@ -56,3 +56,6 @@ fold(Fun, Acc0, Dict) ->
to_list(Dict) ->
orddict:to_list(Dict).
+
+from_list(KVList) when is_list(KVList) ->
+ orddict:from_list(KVList).
diff --git a/src/fabric/src/fabric_view.erl b/src/fabric/src/fabric_view.erl
index 27b0c275f..4f6e34c50 100644
--- a/src/fabric/src/fabric_view.erl
+++ b/src/fabric/src/fabric_view.erl
@@ -12,10 +12,13 @@
-module(fabric_view).
--export([is_progress_possible/1, remove_overlapping_shards/2, maybe_send_row/1,
+-export([is_progress_possible/1, remove_overlapping_shards/2,
+ remove_overlapping_shards/3, maybe_send_row/1,
transform_row/1, keydict/1, extract_view/4, get_shards/2,
check_down_shards/2, handle_worker_exit/3,
- get_shard_replacements/2, maybe_update_others/5]).
+ get_shard_replacements/2, maybe_update_others/5
+]).
+
-export([fix_skip_and_limit/1]).
-include_lib("fabric/include/fabric.hrl").
@@ -46,63 +49,95 @@ handle_worker_exit(Collector, _Worker, Reason) ->
{ok, Resp} = Callback({error, fabric_util:error_info(Reason)}, Acc),
{error, Resp}.
+
+-spec get_worker_ranges([{#shard{}, any()}]) -> [{integer(), integer()}].
+get_worker_ranges(Counters) ->
+ Ranges = fabric_dict:fold(fun(#shard{range=[X, Y]}, _, Acc) ->
+ [{X, Y} | Acc]
+ end, [], Counters),
+ lists:usort(Ranges).
+
+
%% @doc looks for a fully covered keyrange in the list of counters
-spec is_progress_possible([{#shard{}, term()}]) -> boolean().
-is_progress_possible([]) ->
- false;
is_progress_possible(Counters) ->
- Ranges = fabric_dict:fold(fun(#shard{range=[X,Y]}, _, A) -> [{X,Y}|A] end,
- [], Counters),
- [{Start, Tail0} | Rest] = lists:ukeysort(1, Ranges),
- Result = lists:foldl(fun
- (_, fail) ->
- % we've already declared failure
- fail;
- (_, complete) ->
- % this is the success condition, we can fast-forward
- complete;
- ({X,_}, Tail) when X > (Tail+1) ->
- % gap in the keyrange, we're dead
- fail;
- ({_,Y}, Tail) ->
- case erlang:max(Tail, Y) of
- End when (End+1) =:= (2 bsl 31) ->
- complete;
- Else ->
- % the normal condition, adding to the tail
- Else
- end
- end, if (Tail0+1) =:= (2 bsl 31) -> complete; true -> Tail0 end, Rest),
- (Start =:= 0) andalso (Result =:= complete).
+ mem3_util:get_ring(get_worker_ranges(Counters)) =/= [].
+
-spec remove_overlapping_shards(#shard{}, [{#shard{}, any()}]) ->
[{#shard{}, any()}].
-remove_overlapping_shards(#shard{range=[A,B]} = Shard0, Shards) ->
- fabric_dict:filter(fun(#shard{range=[X,Y], node=Node, ref=Ref} = Shard, _) ->
- if Shard =:= Shard0 ->
- % we can't remove ourselves
+remove_overlapping_shards(#shard{} = Shard, Counters) ->
+ remove_overlapping_shards(Shard, Counters, fun stop_worker/1).
+
+
+-spec remove_overlapping_shards(#shard{}, [{#shard{}, any()}], fun()) ->
+ [{#shard{}, any()}].
+remove_overlapping_shards(#shard{} = Shard, Counters, RemoveCb) ->
+ Counters1 = filter_exact_copies(Shard, Counters, RemoveCb),
+ filter_possible_overlaps(Shard, Counters1, RemoveCb).
+
+
+filter_possible_overlaps(Shard, Counters, RemoveCb) ->
+ Ranges0 = get_worker_ranges(Counters),
+ #shard{range = [BShard, EShard]} = Shard,
+ Ranges = Ranges0 ++ [{BShard, EShard}],
+ {Bs, Es} = lists:unzip(Ranges),
+ {MinB, MaxE} = {lists:min(Bs), lists:max(Es)},
+ % Use a custom sort function which prioritizes the given shard
+ % range when the start endpoints match.
+ SortFun = fun
+ ({B, E}, {B, _}) when {B, E} =:= {BShard, EShard} ->
+ % If start matches with the shard's start, shard always wins
true;
- A < B, X >= A, X < B ->
- % lower bound is inside our range
- rexi:kill(Node, Ref),
- false;
- A < B, Y > A, Y =< B ->
- % upper bound is inside our range
- rexi:kill(Node, Ref),
+ ({B, _}, {B, E}) when {B, E} =:= {BShard, EShard} ->
+ % If start matches with te shard's start, shard always wins
false;
- B < A, X >= A orelse B < A, X < B ->
- % target shard wraps the key range, lower bound is inside
- rexi:kill(Node, Ref),
- false;
- B < A, Y > A orelse B < A, Y =< B ->
- % target shard wraps the key range, upper bound is inside
- rexi:kill(Node, Ref),
+ ({B, E1}, {B, E2}) ->
+ % If start matches, pick the longest range first
+ E2 >= E1;
+ ({B1, _}, {B2, _}) ->
+ % Then, by default, sort by start point
+ B1 =< B2
+ end,
+ Ring = mem3_util:get_ring(Ranges, SortFun, MinB, MaxE),
+ fabric_dict:filter(fun
+ (S, _) when S =:= Shard ->
+ % Keep the original shard
+ true;
+ (#shard{range = [B, E]} = S, _) ->
+ case lists:member({B, E}, Ring) of
+ true ->
+ true; % Keep it
+ false ->
+ % Duplicate range, delete after calling callback function
+ case is_function(RemoveCb) of
+ true -> RemoveCb(S);
+ false -> ok
+ end,
+ false
+ end
+ end, Counters).
+
+
+filter_exact_copies(#shard{range = Range0} = Shard0, Shards, Cb) ->
+ fabric_dict:filter(fun
+ (Shard, _) when Shard =:= Shard0 ->
+ true; % Don't remove ourselves
+ (#shard{range = Range} = Shard, _) when Range =:= Range0 ->
+ case is_function(Cb) of
+ true -> Cb(Shard);
+ false -> ok
+ end,
false;
- true ->
+ (_, _) ->
true
- end
end, Shards).
+
+stop_worker(#shard{ref = Ref, node = Node}) ->
+ rexi:kill(Node, Ref).
+
+
maybe_send_row(#collector{limit=0} = State) ->
#collector{counters=Counters, user_acc=AccIn, callback=Callback} = State,
case fabric_dict:any(0, Counters) of
@@ -352,8 +387,9 @@ get_shard_replacements(DbName, UsedShards0) ->
% that aren't already used.
AllLiveShards = mem3:live_shards(DbName, [node() | nodes()]),
UsedShards = [S#shard{ref=undefined} || S <- UsedShards0],
- UnusedShards = AllLiveShards -- UsedShards,
+ get_shard_replacements_int(AllLiveShards -- UsedShards, UsedShards).
+get_shard_replacements_int(UnusedShards, UsedShards) ->
% If we have more than one copy of a range then we don't
% want to try and add a replacement to any copy.
RangeCounts = lists:foldl(fun(#shard{range=R}, Acc) ->
@@ -363,10 +399,10 @@ get_shard_replacements(DbName, UsedShards0) ->
% For each seq shard range with a count of 1, find any
% possible replacements from the unused shards. The
% replacement list is keyed by range.
- lists:foldl(fun(#shard{range=Range}, Acc) ->
+ lists:foldl(fun(#shard{range = [B, E] = Range}, Acc) ->
case dict:find(Range, RangeCounts) of
{ok, 1} ->
- Repls = [S || S <- UnusedShards, S#shard.range =:= Range],
+ Repls = mem3_util:non_overlapping_shards(UnusedShards, B, E),
% Only keep non-empty lists of replacements
if Repls == [] -> Acc; true ->
[{Range, Repls} | Acc]
@@ -388,44 +424,78 @@ fix_skip_and_limit(#mrargs{} = Args) ->
%% the coordinator needs to finalize each row, so make sure the shards don't
{CoordArgs, remove_finalizer(WorkerArgs)}.
+
remove_finalizer(Args) ->
couch_mrview_util:set_extra(Args, finalizer, null).
-% unit test
+
+% Unit tests
+
is_progress_possible_test() ->
- EndPoint = 2 bsl 31,
- T1 = [[0, EndPoint-1]],
- ?assertEqual(is_progress_possible(mk_cnts(T1)),true),
- T2 = [[0,10],[11,20],[21,EndPoint-1]],
- ?assertEqual(is_progress_possible(mk_cnts(T2)),true),
+ T1 = [[0, ?RING_END]],
+ ?assertEqual(is_progress_possible(mk_cnts(T1)), true),
+ T2 = [[0, 10], [11, 20], [21, ?RING_END]],
+ ?assertEqual(is_progress_possible(mk_cnts(T2)), true),
% gap
- T3 = [[0,10],[12,EndPoint-1]],
- ?assertEqual(is_progress_possible(mk_cnts(T3)),false),
+ T3 = [[0, 10], [12, ?RING_END]],
+ ?assertEqual(is_progress_possible(mk_cnts(T3)), false),
% outside range
- T4 = [[1,10],[11,20],[21,EndPoint-1]],
- ?assertEqual(is_progress_possible(mk_cnts(T4)),false),
+ T4 = [[1, 10], [11, 20], [21, ?RING_END]],
+ ?assertEqual(is_progress_possible(mk_cnts(T4)), false),
% outside range
- T5 = [[0,10],[11,20],[21,EndPoint]],
- ?assertEqual(is_progress_possible(mk_cnts(T5)),false).
+ T5 = [[0, 10], [11, 20], [21, ?RING_END + 1]],
+ ?assertEqual(is_progress_possible(mk_cnts(T5)), false),
+ % possible progress but with backtracking
+ T6 = [[0, 10], [11, 20], [0, 5], [6, 21], [21, ?RING_END]],
+ ?assertEqual(is_progress_possible(mk_cnts(T6)), true),
+ % not possible, overlap is not exact
+ T7 = [[0, 10], [13, 20], [21, ?RING_END], [9, 12]],
+ ?assertEqual(is_progress_possible(mk_cnts(T7)), false).
+
remove_overlapping_shards_test() ->
- meck:new(rexi),
- meck:expect(rexi, kill, fun(_, _) -> ok end),
- EndPoint = 2 bsl 31,
- T1 = [[0,10],[11,20],[21,EndPoint-1]],
- Shards = mk_cnts(T1,3),
- ?assertEqual(orddict:size(
- remove_overlapping_shards(#shard{name=list_to_atom("node-3"),
- node=list_to_atom("node-3"),
- range=[11,20]},
- Shards)),7),
- meck:unload(rexi).
+ Cb = undefined,
+
+ Shards = mk_cnts([[0, 10], [11, 20], [21, ?RING_END]], 3),
+
+ % Simple (exact) overlap
+ Shard1 = mk_shard("node-3", [11, 20]),
+ Shards1 = fabric_dict:store(Shard1, nil, Shards),
+ R1 = remove_overlapping_shards(Shard1, Shards1, Cb),
+ ?assertEqual([{0, 10}, {11, 20}, {21, ?RING_END}], get_worker_ranges(R1)),
+ ?assert(fabric_dict:is_key(Shard1, R1)),
+
+ % Split overlap (shard overlap multiple workers)
+ Shard2 = mk_shard("node-3", [0, 20]),
+ Shards2 = fabric_dict:store(Shard2, nil, Shards),
+ R2 = remove_overlapping_shards(Shard2, Shards2, Cb),
+ ?assertEqual([{0, 20}, {21, ?RING_END}], get_worker_ranges(R2)),
+ ?assert(fabric_dict:is_key(Shard2, R2)).
+
+
+get_shard_replacements_test() ->
+ Unused = [mk_shard(N, [B, E]) || {N, B, E} <- [
+ {"n1", 11, 20}, {"n1", 21, ?RING_END},
+ {"n2", 0, 4}, {"n2", 5, 10}, {"n2", 11, 20},
+ {"n3", 0, 21, ?RING_END}
+ ]],
+ Used = [mk_shard(N, [B, E]) || {N, B, E} <- [
+ {"n2", 21, ?RING_END},
+ {"n3", 0, 10}, {"n3", 11, 20}
+ ]],
+ Res = lists:sort(get_shard_replacements_int(Unused, Used)),
+ % Notice that [0, 10] range can be replaces by spawning the
+ % [0, 4] and [5, 10] workers on n1
+ Expect = [
+ {[0, 10], [mk_shard("n2", [0, 4]), mk_shard("n2", [5, 10])]},
+ {[11, 20], [mk_shard("n1", [11, 20]), mk_shard("n2", [11, 20])]},
+ {[21, ?RING_END], [mk_shard("n1", [21, ?RING_END])]}
+ ],
+ ?assertEqual(Expect, Res).
+
mk_cnts(Ranges) ->
- Shards = lists:map(fun(Range) ->
- #shard{range=Range}
- end,
- Ranges),
+ Shards = lists:map(fun mk_shard/1, Ranges),
orddict:from_list([{Shard,nil} || Shard <- Shards]).
mk_cnts(Ranges, NoNodes) ->
@@ -440,6 +510,15 @@ mk_cnts(Ranges, NoNodes) ->
mk_shards(0,_Range,Shards) ->
Shards;
mk_shards(NoNodes,Range,Shards) ->
- NodeName = list_to_atom("node-" ++ integer_to_list(NoNodes)),
- mk_shards(NoNodes-1,Range,
- [#shard{name=NodeName, node=NodeName, range=Range} | Shards]).
+ Name ="node-" ++ integer_to_list(NoNodes),
+ mk_shards(NoNodes-1,Range, [mk_shard(Name, Range) | Shards]).
+
+
+mk_shard([B, E]) when is_integer(B), is_integer(E) ->
+ #shard{range = [B, E]}.
+
+
+mk_shard(Name, Range) ->
+ Node = list_to_atom(Name),
+ BName = list_to_binary(Name),
+ #shard{name=BName, node=Node, range=Range}.
diff --git a/src/fabric/src/fabric_view_changes.erl b/src/fabric/src/fabric_view_changes.erl
index f96bb058d..b28f84daf 100644
--- a/src/fabric/src/fabric_view_changes.erl
+++ b/src/fabric/src/fabric_view_changes.erl
@@ -127,22 +127,28 @@ keep_sending_changes(DbName, Args, Callback, Seqs, AccIn, Timeout, UpListen, T0)
send_changes(DbName, ChangesArgs, Callback, PackedSeqs, AccIn, Timeout) ->
LiveNodes = [node() | nodes()],
AllLiveShards = mem3:live_shards(DbName, LiveNodes),
- Seqs = lists:flatmap(fun({#shard{name=Name, node=N} = Shard, Seq}) ->
- case lists:member(Shard, AllLiveShards) of
- true ->
- Ref = rexi:cast(N, {fabric_rpc, changes, [Name,ChangesArgs,Seq]}),
- [{Shard#shard{ref = Ref}, Seq}];
- false ->
- % Find some replacement shards to cover the missing range
- % TODO It's possible in rare cases of shard merging to end up
- % with overlapping shard ranges from this technique
- lists:map(fun(#shard{name=Name2, node=N2} = NewShard) ->
- Ref = rexi:cast(N2, {fabric_rpc, changes, [Name2, ChangesArgs,
- make_replacement_arg(N, Seq)]}),
- {NewShard#shard{ref = Ref}, 0}
- end, find_replacement_shards(Shard, AllLiveShards))
- end
- end, unpack_seqs(PackedSeqs, DbName)),
+ Seqs0 = unpack_seqs(PackedSeqs, DbName),
+ {WSeqs0, Dead, Reps} = find_replacements(Seqs0, AllLiveShards),
+ % Start workers which didn't need replacements
+ WSeqs = lists:map(fun({#shard{name = Name, node = N} = S, Seq}) ->
+ Ref = rexi:cast(N, {fabric_rpc, changes, [Name, ChangesArgs, Seq]}),
+ {S#shard{ref = Ref}, Seq}
+ end, WSeqs0),
+ % For some dead workers see if they are a result of split shards. In that
+ % case make a replacement argument so that local rexi workers can calculate
+ % (hopefully) a > 0 update sequence.
+ {WSplitSeqs0, Reps1} = find_split_shard_replacements(Dead, Reps),
+ WSplitSeqs = lists:map(fun({#shard{name = Name, node = N} = S, Seq}) ->
+ Arg = make_replacement_arg(N, Seq),
+ Ref = rexi:cast(N, {fabric_rpc, changes, [Name, ChangesArgs, Arg]}),
+ {S#shard{ref = Ref}, Seq}
+ end, WSplitSeqs0),
+ % For ranges that were not split start sequences from 0
+ WReps = lists:map(fun(#shard{name = Name, node = N} = S) ->
+ Ref = rexi:cast(N, {fabric_rpc, changes, [Name, ChangesArgs, 0]}),
+ {S#shard{ref = Ref}, 0}
+ end, Reps1),
+ Seqs = WSeqs ++ WSplitSeqs ++ WReps,
{Workers0, _} = lists:unzip(Seqs),
Repls = fabric_view:get_shard_replacements(DbName, Workers0),
StartFun = fun(#shard{name=Name, node=N, range=R0}=Shard) ->
@@ -386,6 +392,24 @@ seq({Seq, _Uuid, _Node}) -> Seq;
seq({Seq, _Uuid}) -> Seq;
seq(Seq) -> Seq.
+
+unpack_seq_regex_match(Packed) ->
+ NewPattern = "^\\[[0-9]+\s*,\s*\"(?<opaque>.*)\"\\]$",
+ OldPattern = "^\"?([0-9]+-)?(?<opaque>.*?)\"?$",
+ Options = [{capture, [opaque], binary}],
+ case re:run(Packed, NewPattern, Options) of
+ {match, Match} ->
+ Match;
+ nomatch ->
+ {match, Match} = re:run(Packed, OldPattern, Options),
+ Match
+ end.
+
+
+unpack_seq_decode_term(Opaque) ->
+ binary_to_term(couch_util:decodeBase64Url(Opaque)).
+
+
unpack_seqs(0, DbName) ->
fabric_dict:init(mem3:shards(DbName), 0);
@@ -396,23 +420,14 @@ unpack_seqs([_SeqNum, Opaque], DbName) -> % deprecated
do_unpack_seqs(Opaque, DbName);
unpack_seqs(Packed, DbName) ->
- NewPattern = "^\\[[0-9]+\s*,\s*\"(?<opaque>.*)\"\\]$",
- OldPattern = "^\"?([0-9]+-)?(?<opaque>.*?)\"?$",
- Options = [{capture, [opaque], binary}],
- Opaque = case re:run(Packed, NewPattern, Options) of
- {match, Match} ->
- Match;
- nomatch ->
- {match, Match} = re:run(Packed, OldPattern, Options),
- Match
- end,
+ Opaque = unpack_seq_regex_match(Packed),
do_unpack_seqs(Opaque, DbName).
do_unpack_seqs(Opaque, DbName) ->
% A preventative fix for FB 13533 to remove duplicate shards.
% This just picks each unique shard and keeps the largest seq
% value recorded.
- Decoded = binary_to_term(couch_util:decodeBase64Url(Opaque)),
+ Decoded = unpack_seq_decode_term(Opaque),
DedupDict = lists:foldl(fun({Node, [A, B], Seq}, Acc) ->
dict:append({Node, [A, B]}, Seq, Acc)
end, dict:new(), Decoded),
@@ -431,28 +446,28 @@ do_unpack_seqs(Opaque, DbName) ->
end
end, Deduped),
- % Fill holes in the since sequence. If/when we ever start
- % using overlapping shard ranges this will need to be updated
- % to not include shard ranges that overlap entries in Upacked.
- % A quick and dirty approach would be like such:
- %
- % lists:foldl(fun(S, Acc) ->
- % fabric_view:remove_overlapping_shards(S, Acc)
- % end, mem3:shards(DbName), Unpacked)
- %
- % Unfortunately remove_overlapping_shards isn't reusable because
- % of its calls to rexi:kill/2. When we get to overlapping
- % shard ranges and have to rewrite shard range management
- % we can revisit this simpler algorithm.
+ % This just handles the case if the ring in the unpacked sequence
+ % received is not complete and in that case tries to fill in the
+ % missing ranges with shards from the shard map
case fabric_view:is_progress_possible(Unpacked) of
true ->
Unpacked;
false ->
- Extract = fun({Shard, _Seq}) -> Shard#shard.range end,
- Ranges = lists:usort(lists:map(Extract, Unpacked)),
- Filter = fun(S) -> not lists:member(S#shard.range, Ranges) end,
- Replacements = lists:filter(Filter, mem3:shards(DbName)),
- Unpacked ++ [{R, get_old_seq(R, Deduped)} || R <- Replacements]
+ PotentialWorkers = lists:map(fun({Node, [A, B], Seq}) ->
+ case mem3:get_shard(DbName, Node, [A, B]) of
+ {ok, Shard} ->
+ {Shard, Seq};
+ {error, not_found} ->
+ {#shard{node = Node, range = [A, B]}, Seq}
+ end
+ end, Deduped),
+ Shards = mem3:shards(DbName),
+ {Unpacked1, Dead, Reps} = find_replacements(PotentialWorkers, Shards),
+ {Splits, Reps1} = find_split_shard_replacements(Dead, Reps),
+ RepSeqs = lists:map(fun(#shard{} = S) ->
+ {S, get_old_seq(S, Deduped)}
+ end, Reps1),
+ Unpacked1 ++ Splits ++ RepSeqs
end.
@@ -491,18 +506,119 @@ changes_row(Props0) ->
Props2 = lists:filter(fun({K,_V}) -> lists:member(K, Allowed) end, Props1),
{change, {Props2}}.
-find_replacement_shards(#shard{range=Range}, AllShards) ->
- % TODO make this moar betta -- we might have split or merged the partition
- [Shard || Shard <- AllShards, Shard#shard.range =:= Range].
+
+find_replacements(Workers, AllShards) ->
+ % Build map [B, E] => [Worker1, Worker2, ...] for all workers
+ WrkMap = lists:foldl(fun({#shard{range = [B, E]}, _} = W, Acc) ->
+ maps:update_with({B, E}, fun(Ws) -> [W | Ws] end, [W], Acc)
+ end, #{}, fabric_dict:to_list(Workers)),
+
+ % Build map [B, E] => [Shard1, Shard2, ...] for all shards
+ AllMap = lists:foldl(fun(#shard{range = [B, E]} = S, Acc) ->
+ maps:update_with({B, E}, fun(Ss) -> [S | Ss] end, [S], Acc)
+ end, #{}, AllShards),
+
+ % Custom sort function will prioritize workers over other shards.
+ % The idea is to not unnecessarily kill workers if we don't have to
+ SortFun = fun
+ (R1 = {B, E1}, R2 = {B, E2}) ->
+ case {maps:is_key(R1, WrkMap), maps:is_key(R2, WrkMap)} of
+ {true, true} ->
+ % Both are workers, larger interval wins
+ E1 >= E2;
+ {true, false} ->
+ % First element is a worker range, it wins
+ true;
+ {false, true} ->
+ % Second element is a worker range, it wins
+ false;
+ {false, false} ->
+ % Neither one is a worker interval, pick larger one
+ E1 >= E2
+ end;
+ ({B1, _}, {B2, _}) ->
+ B1 =< B2
+ end,
+ Ring = mem3_util:get_ring(maps:keys(AllMap), SortFun),
+
+ % Keep only workers in the ring and from one of the available nodes
+ Keep = fun(#shard{range = [B, E], node = N}) ->
+ lists:member({B, E}, Ring) andalso lists:keyfind(N, #shard.node,
+ maps:get({B, E}, AllMap)) =/= false
+ end,
+ Workers1 = fabric_dict:filter(fun(S, _) -> Keep(S) end, Workers),
+ Removed = fabric_dict:filter(fun(S, _) -> not Keep(S) end, Workers),
+
+ {Rep, _} = lists:foldl(fun(R, {RepAcc, AllMapAcc}) ->
+ case maps:is_key(R, WrkMap)of
+ true ->
+ % It's a worker and in the map of available shards. Make sure
+ % to keep it only if there is a range available on that node
+ % only (reuse Keep/1 predicate from above)
+ WorkersInRange = maps:get(R, WrkMap),
+ case lists:any(fun({S, _}) -> Keep(S) end, WorkersInRange) of
+ true ->
+ {RepAcc, AllMapAcc};
+ false ->
+ [Shard | Rest] = maps:get(R, AllMapAcc),
+ {[Shard | RepAcc], AllMapAcc#{R := Rest}}
+ end;
+ false ->
+ % No worker for this range. Replace from available shards
+ [Shard | Rest] = maps:get(R, AllMapAcc),
+ {[Shard | RepAcc], AllMapAcc#{R := Rest}}
+ end
+ end, {[], AllMap}, Ring),
+
+ % Return the list of workers that are part of ring, list of removed workers
+ % and a list of replacement shards that could be used to make sure the ring
+ % completes.
+ {Workers1, Removed, Rep}.
+
+
+% From the list of dead workers determine if any are a result of a split shard.
+% In that case perhaps there is a way to not rewind the changes feed back to 0.
+% Returns {NewWorkers, Available} where NewWorkers is the list of
+% viable workers Available is the list of still unused input Shards
+find_split_shard_replacements(DeadWorkers, Shards) ->
+ Acc0 = {[], Shards},
+ AccF = fabric_dict:fold(fun(#shard{node = WN, range = R}, Seq, Acc) ->
+ [B, E] = R,
+ {SplitWorkers, Available} = Acc,
+ ShardsOnSameNode = [S || #shard{node = N} = S <- Available, N =:= WN],
+ SplitShards = mem3_util:non_overlapping_shards(ShardsOnSameNode, B, E),
+ RepCount = length(SplitShards),
+ NewWorkers = [{S, make_split_seq(Seq, RepCount)} || S <- SplitShards],
+ NewAvailable = [S || S <- Available, not lists:member(S, SplitShards)],
+ {NewWorkers ++ SplitWorkers, NewAvailable}
+ end, Acc0, DeadWorkers),
+ {Workers, Available} = AccF,
+ {fabric_dict:from_list(Workers), Available}.
+
+
+make_split_seq({Num, _UuidPrefix, Node}, RepCount) when RepCount > 1 ->
+ {Num, split, Node};
+make_split_seq(Seq, _) ->
+ Seq.
+
validate_start_seq(_DbName, "now") ->
ok;
-validate_start_seq(DbName, Seq) ->
- try unpack_seqs(Seq, DbName) of _Any ->
+validate_start_seq(_DbName, 0) ->
+ ok;
+validate_start_seq(_DbName, "0") ->
+ ok;
+validate_start_seq(_DbName, Seq) ->
+ try
+ case Seq of
+ [_SeqNum, Opaque] ->
+ unpack_seq_decode_term(Opaque);
+ Seq ->
+ Opaque = unpack_seq_regex_match(Seq),
+ unpack_seq_decode_term(Opaque)
+ end,
ok
catch
- error:database_does_not_exist ->
- {error, database_does_not_exist};
_:_ ->
Reason = <<"Malformed sequence supplied in 'since' parameter.">>,
{error, {bad_request, Reason}}
@@ -568,3 +684,122 @@ unpack_seqs_test() ->
assert_shards(Packed) ->
?assertMatch([{#shard{},_}|_], unpack_seqs(Packed, <<"foo">>)).
+
+
+find_replacements_test() ->
+ % None of the workers are in the live list of shard but there is a
+ % replacement on n3 for the full range. It should get picked instead of
+ % the two smaller one on n2.
+ Workers1 = mk_workers([{"n1", 0, 10}, {"n2", 11, ?RING_END}]),
+ AllShards1 = [
+ mk_shard("n1", 11, ?RING_END),
+ mk_shard("n2", 0, 4),
+ mk_shard("n2", 5, 10),
+ mk_shard("n3", 0, ?RING_END)
+ ],
+ {WorkersRes1, Dead1, Reps1} = find_replacements(Workers1, AllShards1),
+ ?assertEqual([], WorkersRes1),
+ ?assertEqual(Workers1, Dead1),
+ ?assertEqual([mk_shard("n3", 0, ?RING_END)], Reps1),
+
+ % None of the workers are in the live list of shards and there is a
+ % split replacement from n2 (range [0, 10] replaced with [0, 4], [5, 10])
+ Workers2 = mk_workers([{"n1", 0, 10}, {"n2", 11, ?RING_END}]),
+ AllShards2 = [
+ mk_shard("n1", 11, ?RING_END),
+ mk_shard("n2", 0, 4),
+ mk_shard("n2", 5, 10)
+ ],
+ {WorkersRes2, Dead2, Reps2} = find_replacements(Workers2, AllShards2),
+ ?assertEqual([], WorkersRes2),
+ ?assertEqual(Workers2, Dead2),
+ ?assertEqual([
+ mk_shard("n1", 11, ?RING_END),
+ mk_shard("n2", 0, 4),
+ mk_shard("n2", 5, 10)
+ ], lists:sort(Reps2)),
+
+ % One worker is available and one needs to be replaced. Replacement will be
+ % from two split shards
+ Workers3 = mk_workers([{"n1", 0, 10}, {"n2", 11, ?RING_END}]),
+ AllShards3 = [
+ mk_shard("n1", 11, ?RING_END),
+ mk_shard("n2", 0, 4),
+ mk_shard("n2", 5, 10),
+ mk_shard("n2", 11, ?RING_END)
+ ],
+ {WorkersRes3, Dead3, Reps3} = find_replacements(Workers3, AllShards3),
+ ?assertEqual(mk_workers([{"n2", 11, ?RING_END}]), WorkersRes3),
+ ?assertEqual(mk_workers([{"n1", 0, 10}]), Dead3),
+ ?assertEqual([
+ mk_shard("n2", 0, 4),
+ mk_shard("n2", 5, 10)
+ ], lists:sort(Reps3)),
+
+ % All workers are available. Make sure they are not killed even if there is
+ % a longer (single) shard to replace them.
+ Workers4 = mk_workers([{"n1", 0, 10}, {"n1", 11, ?RING_END}]),
+ AllShards4 = [
+ mk_shard("n1", 0, 10),
+ mk_shard("n1", 11, ?RING_END),
+ mk_shard("n2", 0, 4),
+ mk_shard("n2", 5, 10),
+ mk_shard("n3", 0, ?RING_END)
+ ],
+ {WorkersRes4, Dead4, Reps4} = find_replacements(Workers4, AllShards4),
+ ?assertEqual(Workers4, WorkersRes4),
+ ?assertEqual([], Dead4),
+ ?assertEqual([], Reps4).
+
+
+mk_workers(NodesRanges) ->
+ mk_workers(NodesRanges, nil).
+
+mk_workers(NodesRanges, Val) ->
+ orddict:from_list([{mk_shard(N, B, E), Val} || {N, B, E} <- NodesRanges]).
+
+
+mk_shard(Name, B, E) ->
+ Node = list_to_atom(Name),
+ BName = list_to_binary(Name),
+ #shard{name=BName, node=Node, range=[B, E]}.
+
+
+find_split_shard_replacements_test() ->
+ % One worker is can be replaced and one can't
+ Dead1 = mk_workers([{"n1", 0, 10}, {"n2", 11, ?RING_END}], 42),
+ Shards1 = [
+ mk_shard("n1", 0, 4),
+ mk_shard("n1", 5, 10),
+ mk_shard("n3", 11, ?RING_END)
+ ],
+ {Workers1, ShardsLeft1} = find_split_shard_replacements(Dead1, Shards1),
+ ?assertEqual(mk_workers([{"n1", 0, 4}, {"n1", 5, 10}], 42), Workers1),
+ ?assertEqual([mk_shard("n3", 11, ?RING_END)], ShardsLeft1),
+
+ % All workers can be replaced - one by 1 shard, another by 3 smaller shards
+ Dead2 = mk_workers([{"n1", 0, 10}, {"n2", 11, ?RING_END}], 42),
+ Shards2 = [
+ mk_shard("n1", 0, 10),
+ mk_shard("n2", 11, 12),
+ mk_shard("n2", 13, 14),
+ mk_shard("n2", 15, ?RING_END)
+ ],
+ {Workers2, ShardsLeft2} = find_split_shard_replacements(Dead2, Shards2),
+ ?assertEqual(mk_workers([
+ {"n1", 0, 10},
+ {"n2", 11, 12},
+ {"n2", 13, 14},
+ {"n2", 15, ?RING_END}
+ ], 42), Workers2),
+ ?assertEqual([], ShardsLeft2),
+
+ % No workers can be replaced. Ranges match but they are on different nodes
+ Dead3 = mk_workers([{"n1", 0, 10}, {"n2", 11, ?RING_END}], 42),
+ Shards3 = [
+ mk_shard("n2", 0, 10),
+ mk_shard("n3", 11, ?RING_END)
+ ],
+ {Workers3, ShardsLeft3} = find_split_shard_replacements(Dead3, Shards3),
+ ?assertEqual([], Workers3),
+ ?assertEqual(Shards3, ShardsLeft3).
diff --git a/src/mem3/include/mem3.hrl b/src/mem3/include/mem3.hrl
index 9be8a7f99..d97b25469 100644
--- a/src/mem3/include/mem3.hrl
+++ b/src/mem3/include/mem3.hrl
@@ -10,6 +10,11 @@
% License for the specific language governing permissions and limitations under
% the License.
+
+% The last element in the ring
+-define(RING_END, 2 bsl 31 - 1).
+
+
% type specification hacked to suppress dialyzer warning re: match spec
-record(shard, {
name :: binary() | '_' | 'undefined',
diff --git a/src/mem3/src/mem3.erl b/src/mem3/src/mem3.erl
index dea0c7a5b..dc666fdae 100644
--- a/src/mem3/src/mem3.erl
+++ b/src/mem3/src/mem3.erl
@@ -150,7 +150,8 @@ ushards(DbName, Shards0, ZoneMap) ->
% but sort each zone separately to ensure a consistent choice between
% nodes in the same zone.
Shards = choose_ushards(DbName, L ++ S) ++ choose_ushards(DbName, D),
- lists:ukeysort(#shard.range, Shards).
+ OverlappedShards = lists:ukeysort(#shard.range, Shards),
+ mem3_util:non_overlapping_shards(OverlappedShards).
get_shard(DbName, Node, Range) ->
mem3_shards:get(DbName, Node, Range).
diff --git a/src/mem3/src/mem3_shards.erl b/src/mem3/src/mem3_shards.erl
index 6afc22f57..e4b65633e 100644
--- a/src/mem3/src/mem3_shards.erl
+++ b/src/mem3/src/mem3_shards.erl
@@ -96,35 +96,33 @@ for_docid(DbName, DocId, Options) ->
end.
for_shard_name(ShardName) ->
- for_shard_name(ShardName, []).
-
-for_shard_name(ShardName, Options) ->
DbName = mem3:dbname(ShardName),
+ [B, E] = mem3:range(ShardName),
ShardHead = #shard{
- name = ShardName,
dbname = DbName,
+ range = ['$1', '$2'],
_ = '_'
},
OrderedShardHead = #ordered_shard{
- name = ShardName,
dbname = DbName,
+ range = ['$1', '$2'],
_ = '_'
},
- ShardSpec = {ShardHead, [], ['$_']},
- OrderedShardSpec = {OrderedShardHead, [], ['$_']},
+ % see mem3_util:range_overlap/2 for an explanation how it works
+ Conditions = [{'=<', '$1', E}, {'=<', B, '$2'}],
+ ShardSpec = {ShardHead, Conditions, ['$_']},
+ OrderedShardSpec = {OrderedShardHead, Conditions, ['$_']},
Shards = try ets:select(?SHARDS, [ShardSpec, OrderedShardSpec]) of
[] ->
- filter_shards_by_name(ShardName, load_shards_from_disk(DbName));
+ filter_shards_by_range([B, E], load_shards_from_disk(DbName));
Else ->
gen_server:cast(?MODULE, {cache_hit, DbName}),
Else
catch error:badarg ->
- filter_shards_by_name(ShardName, load_shards_from_disk(DbName))
+ filter_shards_by_range([B, E], load_shards_from_disk(DbName))
end,
- case lists:member(ordered, Options) of
- true -> Shards;
- false -> mem3_util:downcast(Shards)
- end.
+ mem3_util:downcast(Shards).
+
get(DbName, Node, Range) ->
Res = lists:foldl(fun(#shard{node=N, range=R}=S, Acc) ->
@@ -509,17 +507,12 @@ flush_write(DbName, Writer, WriteTimeout) ->
erlang:exit({mem3_shards_write_timeout, DbName})
end.
-filter_shards_by_name(Name, Shards) ->
- filter_shards_by_name(Name, [], Shards).
-
-filter_shards_by_name(_, Matches, []) ->
- Matches;
-filter_shards_by_name(Name, Matches, [#ordered_shard{name=Name}=S|Ss]) ->
- filter_shards_by_name(Name, [S|Matches], Ss);
-filter_shards_by_name(Name, Matches, [#shard{name=Name}=S|Ss]) ->
- filter_shards_by_name(Name, [S|Matches], Ss);
-filter_shards_by_name(Name, Matches, [_|Ss]) ->
- filter_shards_by_name(Name, Matches, Ss).
+
+filter_shards_by_range(Range, Shards)->
+ lists:filter(fun
+ (#ordered_shard{range = R}) -> mem3_util:range_overlap(Range, R);
+ (#shard{range = R}) -> mem3_util:range_overlap(Range, R)
+ end, Shards).
-ifdef(TEST).
diff --git a/src/mem3/src/mem3_util.erl b/src/mem3/src/mem3_util.erl
index b44ca2332..e8cba5d7b 100644
--- a/src/mem3/src/mem3_util.erl
+++ b/src/mem3/src/mem3_util.erl
@@ -17,7 +17,18 @@
shard_info/1, ensure_exists/1, open_db_doc/1]).
-export([is_deleted/1, rotate_list/2]).
-export([
- iso8601_timestamp/0
+ iso8601_timestamp/0,
+ live_nodes/0,
+ replicate_dbs_to_all_nodes/1,
+ replicate_dbs_from_all_nodes/1,
+ range_overlap/2,
+ get_ring/1,
+ get_ring/2,
+ get_ring/3,
+ get_ring/4,
+ non_overlapping_shards/1,
+ non_overlapping_shards/3,
+ calculate_max_n/1
]).
%% do not use outside mem3.
@@ -286,3 +297,272 @@ iso8601_timestamp() ->
{{Year,Month,Date},{Hour,Minute,Second}} = calendar:now_to_datetime(Now),
Format = "~4.10.0B-~2.10.0B-~2.10.0BT~2.10.0B:~2.10.0B:~2.10.0B.~6.10.0BZ",
io_lib:format(Format, [Year, Month, Date, Hour, Minute, Second, Micro]).
+
+
+live_nodes() ->
+ LiveNodes = [node() | nodes()],
+ Mem3Nodes = lists:sort(mem3:nodes()),
+ [N || N <- Mem3Nodes, lists:member(N, LiveNodes)].
+
+
+% Replicate "dbs" db to all nodes. Basically push the changes to all the live
+% mem3:nodes(). Returns only after all current changes have been replicated,
+% which could be a while.
+%
+replicate_dbs_to_all_nodes(Timeout) ->
+ DbName = ?l2b(config:get("mem3", "shards_db", "_dbs")),
+ Targets= mem3_util:live_nodes() -- [node()],
+ Res = [start_replication(node(), T, DbName, Timeout) || T <- Targets],
+ collect_replication_results(Res, Timeout).
+
+
+% Replicate "dbs" db from all nodes to this node. Basically make an rpc call
+% to all the nodes an have them push their changes to this node. Then monitor
+% them until they are all done.
+%
+replicate_dbs_from_all_nodes(Timeout) ->
+ DbName = ?l2b(config:get("mem3", "shards_db", "_dbs")),
+ Sources = mem3_util:live_nodes() -- [node()],
+ Res = [start_replication(S, node(), DbName, Timeout) || S <- Sources],
+ collect_replication_results(Res, Timeout).
+
+
+% Spawn and monitor a single replication of a database to a target node.
+% Returns {ok, PidRef}. This function could be called locally or remotely from
+% mem3_rpc, for instance when replicating other nodes' data to this node.
+%
+start_replication(Source, Target, DbName, Timeout) ->
+ spawn_monitor(fun() ->
+ case mem3_rpc:replicate(Source, Target, DbName, Timeout) of
+ {ok, 0} ->
+ exit(ok);
+ Other ->
+ exit(Other)
+ end
+ end).
+
+
+collect_replication_results(Replications, Timeout) ->
+ Res = [collect_replication_result(R, Timeout) || R <- Replications],
+ case [R || R <- Res, R =/= ok] of
+ [] ->
+ ok;
+ Errors ->
+ {error, Errors}
+ end.
+
+
+collect_replication_result({Pid, Ref}, Timeout) when is_pid(Pid) ->
+ receive
+ {'DOWN', Ref, _, _, Res} ->
+ Res
+ after Timeout ->
+ demonitor(Pid, [flush]),
+ exit(Pid, kill),
+ {error, {timeout, Timeout, node(Pid)}}
+ end;
+
+collect_replication_result(Error, _) ->
+ {error, Error}.
+
+
+% Consider these cases:
+%
+% A-------B
+%
+% overlap:
+% X--------Y
+% X-Y
+% X-------Y
+% X-------------------Y
+%
+% no overlap:
+% X-Y because A !=< Y
+% X-Y because X !=< B
+%
+range_overlap([A, B], [X, Y]) when
+ is_integer(A), is_integer(B),
+ is_integer(X), is_integer(Y),
+ A =< B, X =< Y ->
+ A =< Y andalso X =< B.
+
+
+non_overlapping_shards(Shards) ->
+ {Start, End} = lists:foldl(fun(Shard, {Min, Max}) ->
+ [B, E] = mem3:range(Shard),
+ {min(B, Min), max(E, Max)}
+ end, {0, ?RING_END}, Shards),
+ non_overlapping_shards(Shards, Start, End).
+
+
+non_overlapping_shards([], _, _) ->
+ [];
+
+non_overlapping_shards(Shards, Start, End) ->
+ Ranges = lists:map(fun(Shard) ->
+ [B, E] = mem3:range(Shard),
+ {B, E}
+ end, Shards),
+ Ring = get_ring(Ranges, fun sort_ranges_fun/2, Start, End),
+ lists:filter(fun(Shard) ->
+ [B, E] = mem3:range(Shard),
+ lists:member({B, E}, Ring)
+ end, Shards).
+
+
+% Given a list of shards, return the maximum number of copies
+% across all the ranges. If the ring is incomplete it will return 0.
+% If there it is an n = 1 database, it should return 1, etc.
+calculate_max_n(Shards) ->
+ Ranges = lists:map(fun(Shard) ->
+ [B, E] = mem3:range(Shard),
+ {B, E}
+ end, Shards),
+ calculate_max_n(Ranges, get_ring(Ranges), 0).
+
+
+calculate_max_n(_Ranges, [], N) ->
+ N;
+
+calculate_max_n(Ranges, Ring, N) ->
+ NewRanges = Ranges -- Ring,
+ calculate_max_n(NewRanges, get_ring(NewRanges), N + 1).
+
+
+get_ring(Ranges) ->
+ get_ring(Ranges, fun sort_ranges_fun/2, 0, ?RING_END).
+
+
+get_ring(Ranges, SortFun) when is_function(SortFun, 2) ->
+ get_ring(Ranges, SortFun, 0, ?RING_END).
+
+
+get_ring(Ranges, Start, End) when is_integer(Start), is_integer(End),
+ Start >= 0, End >= 0, Start =< End ->
+ get_ring(Ranges, fun sort_ranges_fun/2, Start, End).
+
+% Build a ring out of a list of possibly overlapping ranges. If a ring cannot
+% be built then [] is returned. Start and End supply a custom range such that
+% only intervals in that range will be considered. SortFun is a custom sorting
+% function to sort intervals before the ring is built. The custom sort function
+% can be used to prioritize how the ring is built, for example, whether to use
+% shortest ranges first (and thus have more total shards) or longer or any
+% other scheme.
+%
+get_ring([], _SortFun, _Start, _End) ->
+ [];
+get_ring(Ranges, SortFun, Start, End) when is_function(SortFun, 2),
+ is_integer(Start), is_integer(End),
+ Start >= 0, End >= 0, Start =< End ->
+ Sorted = lists:usort(SortFun, Ranges),
+ case get_subring_int(Start, End, Sorted) of
+ fail -> [];
+ Ring -> Ring
+ end.
+
+
+get_subring_int(_, _, []) ->
+ fail;
+
+get_subring_int(Start, EndMax, [{Start, End} = Range | Tail]) ->
+ case End =:= EndMax of
+ true ->
+ [Range];
+ false ->
+ case get_subring_int(End + 1, EndMax, Tail) of
+ fail ->
+ get_subring_int(Start, EndMax, Tail);
+ Acc ->
+ [Range | Acc]
+ end
+ end;
+
+get_subring_int(Start1, _, [{Start2, _} | _]) when Start2 > Start1 ->
+ % Found a gap, this attempt is done
+ fail;
+
+get_subring_int(Start1, EndMax, [{Start2, _} | Rest]) when Start2 < Start1 ->
+ % We've overlapped the head, skip the shard
+ get_subring_int(Start1, EndMax, Rest).
+
+
+% Sort ranges by starting point, then sort so that
+% the longest range comes first
+sort_ranges_fun({B, E1}, {B, E2}) ->
+ E2 =< E1;
+
+sort_ranges_fun({B1, _}, {B2, _}) ->
+ B1 =< B2.
+
+
+-ifdef(TEST).
+
+-include_lib("eunit/include/eunit.hrl").
+
+range_overlap_test_() ->
+ [?_assertEqual(Res, range_overlap(R1, R2)) || {R1, R2, Res} <- [
+ {[2, 6], [1, 3], true},
+ {[2, 6], [3, 4], true},
+ {[2, 6], [4, 8], true},
+ {[2, 6], [1, 9], true},
+ {[2, 6], [1, 2], true},
+ {[2, 6], [6, 7], true},
+ {[2, 6], [0, 1], false},
+ {[2, 6], [7, 9], false}
+ ]].
+
+
+non_overlapping_shards_test() ->
+ [?_assertEqual(Res, non_overlapping_shards(Shards)) || {Shards, Res} <- [
+ {
+ [shard(0, ?RING_END)],
+ [shard(0, ?RING_END)]
+ },
+ {
+ [shard(0, 1)],
+ [shard(0, 1)]
+ },
+ {
+ [shard(0, 1), shard(0, 1)],
+ [shard(0, 1)]
+ },
+ {
+ [shard(0, 1), shard(3, 4)],
+ []
+ },
+ {
+ [shard(0, 1), shard(1, 2), shard(2, 3)],
+ [shard(0, 1), shard(2, 3)]
+ },
+ {
+ [shard(1, 2), shard(0, 1)],
+ [shard(0, 1), shard(1, 2)]
+ },
+ {
+ [shard(0, 1), shard(0, 2), shard(2, 5), shard(3, 5)],
+ [shard(0, 2), shard(2, 5)]
+ },
+ {
+ [shard(0, 2), shard(4, 5), shard(1, 3)],
+ []
+ }
+
+ ]].
+
+
+calculate_max_n_test_() ->
+ [?_assertEqual(Res, calculate_max_n(Shards)) || {Res, Shards} <- [
+ {0, []},
+ {0, [shard(1, ?RING_END)]},
+ {1, [shard(0, ?RING_END)]},
+ {1, [shard(0, ?RING_END), shard(1, ?RING_END)]},
+ {2, [shard(0, ?RING_END), shard(0, ?RING_END)]},
+ {2, [shard(0, 1), shard(2, ?RING_END), shard(0, ?RING_END)]},
+ {0, [shard(0, 3), shard(5, ?RING_END), shard(1, ?RING_END)]}
+ ]].
+
+
+shard(Begin, End) ->
+ #shard{range = [Begin, End]}.
+
+-endif.
diff --git a/src/mem3/test/mem3_ring_prop_tests.erl b/src/mem3/test/mem3_ring_prop_tests.erl
new file mode 100644
index 000000000..2eb78c32f
--- /dev/null
+++ b/src/mem3/test/mem3_ring_prop_tests.erl
@@ -0,0 +1,84 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(mem3_ring_prop_tests).
+
+
+-include_lib("triq/include/triq.hrl").
+-triq(eunit).
+
+-compile(export_all).
+
+
+% Properties
+
+prop_get_ring_with_connected_intervals() ->
+ ?FORALL({Start, End}, oneof(ranges()),
+ ?FORALL(Intervals, g_connected_intervals(Start, End),
+ mem3_util:get_ring(Intervals, Start, End) =:= Intervals
+ )
+ ).
+
+
+prop_get_ring_with_disconnected_intervals() ->
+ ?FORALL({Start, End}, oneof(ranges()),
+ ?FORALL(Intervals, g_disconnected_intervals(Start, End),
+ mem3_util:get_ring(Intervals, Start, End) =:= []
+ )
+ ).
+
+
+% Generators
+
+ranges() ->
+ [{0,1}, {1, 10}, {0, 2 bsl 31 - 1}, {2 bsl 31 - 10, 2 bsl 31 - 1}].
+
+
+g_connected_intervals(Begin, End) ->
+ ?SIZED(Size, g_connected_intervals(Begin, End, 10 * Size)).
+
+
+g_connected_intervals(Begin, End, Split) when Begin =< End ->
+ ?LET(N, choose(0, Split),
+ begin
+ if
+ N == 0 ->
+ [{Begin, End}];
+ N > 0 ->
+ Ns = lists:seq(1, N - 1),
+ Bs = lists:usort([rand_range(Begin, End) || _ <- Ns]),
+ Es = [B - 1 || B <- Bs],
+ lists:zip([Begin] ++ Bs, Es ++ [End])
+ end
+ end).
+
+
+g_non_trivial_connected_intervals(Begin, End, Split) ->
+ ?SUCHTHAT(Connected, g_connected_intervals(Begin, End, Split),
+ length(Connected) > 1).
+
+
+g_disconnected_intervals(Begin, End) ->
+ ?SIZED(Size, g_disconnected_intervals(Begin, End, Size)).
+
+
+g_disconnected_intervals(Begin, End, Split) when Begin =< End ->
+ ?LET(Connected, g_non_trivial_connected_intervals(Begin, End, Split),
+ begin
+ I = triq_rnd:uniform(length(Connected)) - 1,
+ {Before, [_ | After]} = lists:split(I, Connected),
+ Before ++ After
+ end).
+
+
+rand_range(B, E) ->
+ B + triq_rnd:uniform(E - B).