author     Nick Vatamaniuc <vatamane@apache.org>  2019-03-18 13:32:14 -0400
committer  Nick Vatamaniuc <nickva@users.noreply.github.com>  2019-04-03 10:48:45 -0400
commit     d10b7955929c1581f192fc487840cbf5005f84c9 (patch)
tree       7b66fab88eba4c39c1a95919792cb972be7157e7
parent     3227e61ebbe6ee15d122036ec85b57ebffeb062d (diff)
download   couchdb-d10b7955929c1581f192fc487840cbf5005f84c9.tar.gz
Uneven shard copy handling in mem3 and fabric
The introduction of shard splitting will eliminate the constraint that all document copies are located in shards with the same range boundaries. That assumption was made by default in the mem3 and fabric functions that do shard replacement, worker spawning, unpacking of `_changes` update sequences, and a few others. This commit updates those places to handle the case where document copies might be in different shard ranges.

A good place to start is the `mem3_util:get_ring()` function. It returns a full, non-overlapping ring from a set of possibly overlapping shards, and it is used by almost everything else in this commit:

1) When only a single copy of the data is needed, for example when processing `_all_docs` or `_changes`.

2) When checking whether progress is possible after some nodes died. `get_ring()` returning `[]` when it cannot find a full ring is used to indicate that progress is not possible.

3) During shard replacement. This is perhaps the most complicated case. During replacement, besides just finding a possible covering of the ring from the set of shards, it is also desirable to find one that minimizes the number of workers that have to be replaced. A neat trick used here is to provide `get_ring()` with a custom sort function which prioritizes certain shard copies over others. In the case of replacements it prioritizes shards for which workers have already been spawned. In the default case `get_ring()` prioritizes longer ranges over shorter ones, so, for example, to cover the interval [00-ff] with either the [00-7f, 80-ff] pair or the single [00-ff] range, it will pick the single [00-ff] range instead of the [00-7f, 80-ff] pair.

Co-authored-by: Paul J. Davis <davisp@apache.org>
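For orientation, a minimal sketch of the `get_ring()` behavior described above follows. It is not part of this commit; the module name `get_ring_sketch` and the exact return values are assumptions based on the description, with ranges written as {Begin, End} integer tuples, the same shape fabric_ring uses in the diff below.

%% Minimal sketch, assuming mem3_util is on the code path.
-module(get_ring_sketch).
-export([demo/0]).

demo() ->
    %% Three copies over [00-ff]: the [00-7f, 80-ff] pair plus a single
    %% [00-ff] copy. The default sort prefers longer ranges, so the single
    %% [00-ff] copy is expected to cover the ring by itself.
    Ranges = [{16#00, 16#7f}, {16#80, 16#ff}, {16#00, 16#ff}],
    [{16#00, 16#ff}] = mem3_util:get_ring(Ranges, 16#00, 16#ff),
    %% With only [00-7f] available there is no full covering of [00-ff], so
    %% get_ring() returns [] and callers treat that as "progress not possible".
    [] = mem3_util:get_ring([{16#00, 16#7f}], 16#00, 16#ff),
    ok.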
-rw-r--r--  src/couch/src/couch_db.erl | 11
-rw-r--r--  src/couch/src/couch_multidb_changes.erl | 6
-rw-r--r--  src/fabric/include/fabric.hrl | 4
-rw-r--r--  src/fabric/src/fabric_db_doc_count.erl | 51
-rw-r--r--  src/fabric/src/fabric_db_info.erl | 75
-rw-r--r--  src/fabric/src/fabric_db_meta.erl | 4
-rw-r--r--  src/fabric/src/fabric_db_partition_info.erl | 2
-rw-r--r--  src/fabric/src/fabric_db_update_listener.erl | 4
-rw-r--r--  src/fabric/src/fabric_design_doc_count.erl | 51
-rw-r--r--  src/fabric/src/fabric_dict.erl | 3
-rw-r--r--  src/fabric/src/fabric_doc_open.erl | 4
-rw-r--r--  src/fabric/src/fabric_group_info.erl | 73
-rw-r--r--  src/fabric/src/fabric_ring.erl | 519
-rw-r--r--  src/fabric/src/fabric_streams.erl | 128
-rw-r--r--  src/fabric/src/fabric_util.erl | 12
-rw-r--r--  src/fabric/src/fabric_view.erl | 209
-rw-r--r--  src/fabric/src/fabric_view_all_docs.erl | 4
-rw-r--r--  src/fabric/src/fabric_view_changes.erl | 442
-rw-r--r--  src/fabric/src/fabric_view_map.erl | 7
-rw-r--r--  src/fabric/src/fabric_view_reduce.erl | 7
-rw-r--r--  src/mem3/include/mem3.hrl | 5
-rw-r--r--  src/mem3/src/mem3.erl | 3
-rw-r--r--  src/mem3/src/mem3_httpd.erl | 2
-rw-r--r--  src/mem3/src/mem3_shards.erl | 45
-rw-r--r--  src/mem3/src/mem3_sync_event_listener.erl | 2
-rw-r--r--  src/mem3/src/mem3_util.erl | 282
-rw-r--r--  src/mem3/test/mem3_ring_prop_tests.erl | 144
-rw-r--r--  src/rexi/src/rexi.erl | 14
-rw-r--r--  src/rexi/src/rexi_server.erl | 28
29 files changed, 1719 insertions(+), 422 deletions(-)
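The per-module changes below repeat one coordinator pattern. As a condensed sketch (distilled from the fabric_db_doc_count changes in this diff, not an additional module), the rexi message handler keeps workers and accumulated responses in two fabric_dict()s and delegates all ring bookkeeping to the new fabric_ring module:

handle_message({rexi_DOWN, _, {_, NodeRef}, _}, _Shard, {Counters, Resps}) ->
    % A node went down: drop its workers, then check the ring can still close
    case fabric_ring:node_down(NodeRef, Counters, Resps) of
        {ok, Counters1} -> {ok, {Counters1, Resps}};
        error -> {error, {nodedown, <<"progress not possible">>}}
    end;
handle_message({rexi_EXIT, Reason}, Shard, {Counters, Resps}) ->
    % A single worker failed: drop it and check progress is still possible
    case fabric_ring:handle_error(Shard, Counters, Resps) of
        {ok, Counters1} -> {ok, {Counters1, Resps}};
        error -> {error, Reason}
    end;
handle_message({ok, Result}, Shard, {Counters, Resps}) ->
    % A worker replied: record the response; stop once a full ring is covered
    case fabric_ring:handle_response(Shard, Result, Counters, Resps) of
        {ok, {Counters1, Resps1}} -> {ok, {Counters1, Resps1}};
        {stop, Resps1} -> {stop, Resps1}
    end.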
diff --git a/src/couch/src/couch_db.erl b/src/couch/src/couch_db.erl
index a40f1131e..ab38eb895 100644
--- a/src/couch/src/couch_db.erl
+++ b/src/couch/src/couch_db.erl
@@ -1526,6 +1526,17 @@ calculate_start_seq(_Db, _Node, Seq) when is_integer(Seq) ->
calculate_start_seq(Db, Node, {Seq, Uuid}) ->
% Treat the current node as the epoch node
calculate_start_seq(Db, Node, {Seq, Uuid, Node});
+calculate_start_seq(Db, _Node, {Seq, {split, Uuid}, EpochNode}) ->
+ case is_owner(EpochNode, Seq, get_epochs(Db)) of
+ true ->
+ % Find last replicated sequence from split source to target
+ mem3_rep:find_split_target_seq(Db, EpochNode, Uuid, Seq);
+ false ->
+ couch_log:warning("~p calculate_start_seq not owner "
+ "db: ~p, seq: ~p, uuid: ~p, epoch_node: ~p, epochs: ~p",
+ [?MODULE, Db#db.name, Seq, Uuid, EpochNode, get_epochs(Db)]),
+ 0
+ end;
calculate_start_seq(Db, _Node, {Seq, Uuid, EpochNode}) ->
case is_prefix(Uuid, get_uuid(Db)) of
true ->
diff --git a/src/couch/src/couch_multidb_changes.erl b/src/couch/src/couch_multidb_changes.erl
index b6a7873fb..ec805632a 100644
--- a/src/couch/src/couch_multidb_changes.erl
+++ b/src/couch/src/couch_multidb_changes.erl
@@ -185,7 +185,8 @@ register_with_event_server(Server) ->
-spec db_callback(created | deleted | updated, binary(), #state{}) -> #state{}.
db_callback(created, DbName, #state{mod = Mod, ctx = Ctx} = State) ->
- State#state{ctx = Mod:db_created(DbName, Ctx)};
+ NewState = State#state{ctx = Mod:db_created(DbName, Ctx)},
+ resume_scan(DbName, NewState);
db_callback(deleted, DbName, #state{mod = Mod, ctx = Ctx} = State) ->
State#state{ctx = Mod:db_deleted(DbName, Ctx)};
db_callback(updated, DbName, State) ->
@@ -446,7 +447,8 @@ t_handle_call_checkpoint_existing() ->
t_handle_info_created() ->
?_test(begin
- State = mock_state(),
+ Tid = mock_ets(),
+ State = mock_state(Tid),
handle_info_check({'$couch_event', ?DBNAME, created}, State),
?assert(meck:validate(?MOD)),
?assert(meck:called(?MOD, db_created, [?DBNAME, zig]))
diff --git a/src/fabric/include/fabric.hrl b/src/fabric/include/fabric.hrl
index be1d63926..2a4da8bcf 100644
--- a/src/fabric/include/fabric.hrl
+++ b/src/fabric/include/fabric.hrl
@@ -36,8 +36,10 @@
-record(stream_acc, {
workers,
+ ready,
start_fun,
- replacements
+ replacements,
+ ring_opts
}).
-record(view_row, {key, id, value, doc, worker}).
diff --git a/src/fabric/src/fabric_db_doc_count.erl b/src/fabric/src/fabric_db_doc_count.erl
index a0fd3ecd1..a91014b7c 100644
--- a/src/fabric/src/fabric_db_doc_count.erl
+++ b/src/fabric/src/fabric_db_doc_count.erl
@@ -22,7 +22,7 @@ go(DbName) ->
Shards = mem3:shards(DbName),
Workers = fabric_util:submit_jobs(Shards, get_doc_count, []),
RexiMon = fabric_util:create_monitors(Shards),
- Acc0 = {fabric_dict:init(Workers, nil), 0},
+ Acc0 = {fabric_dict:init(Workers, nil), []},
try fabric_util:recv(Workers, #shard.ref, fun handle_message/3, Acc0) of
{timeout, {WorkersDict, _}} ->
DefunctWorkers = fabric_util:remove_done_workers(WorkersDict, nil),
@@ -34,38 +34,29 @@ go(DbName) ->
rexi_monitor:stop(RexiMon)
end.
-handle_message({rexi_DOWN, _, {_,NodeRef},_}, _Shard, {Counters, Acc}) ->
- case fabric_util:remove_down_workers(Counters, NodeRef) of
- {ok, NewCounters} ->
- {ok, {NewCounters, Acc}};
- error ->
- {error, {nodedown, <<"progress not possible">>}}
+handle_message({rexi_DOWN, _, {_,NodeRef},_}, _Shard, {Counters, Resps}) ->
+ case fabric_ring:node_down(NodeRef, Counters, Resps) of
+ {ok, Counters1} -> {ok, {Counters1, Resps}};
+ error -> {error, {nodedown, <<"progress not possible">>}}
end;
-handle_message({rexi_EXIT, Reason}, Shard, {Counters, Acc}) ->
- NewCounters = lists:keydelete(Shard, #shard.ref, Counters),
- case fabric_view:is_progress_possible(NewCounters) of
- true ->
- {ok, {NewCounters, Acc}};
- false ->
- {error, Reason}
+handle_message({rexi_EXIT, Reason}, Shard, {Counters, Resps}) ->
+ case fabric_ring:handle_error(Shard, Counters, Resps) of
+ {ok, Counters1} -> {ok, {Counters1, Resps}};
+ error -> {error, Reason}
end;
-handle_message({ok, Count}, Shard, {Counters, Acc}) ->
- case fabric_dict:lookup_element(Shard, Counters) of
- undefined ->
- % already heard from someone else in this range
- {ok, {Counters, Acc}};
- nil ->
- C1 = fabric_dict:store(Shard, ok, Counters),
- C2 = fabric_view:remove_overlapping_shards(Shard, C1),
- case fabric_dict:any(nil, C2) of
- true ->
- {ok, {C2, Count+Acc}};
- false ->
- {stop, Count+Acc}
- end
+handle_message({ok, Count}, Shard, {Counters, Resps}) ->
+ case fabric_ring:handle_response(Shard, Count, Counters, Resps) of
+ {ok, {Counters1, Resps1}} ->
+ {ok, {Counters1, Resps1}};
+ {stop, Resps1} ->
+ Total = fabric_dict:fold(fun(_, C, A) -> A + C end, 0, Resps1),
+ {stop, Total}
end;
-handle_message(_, _, Acc) ->
- {ok, Acc}.
+handle_message(Reason, Shard, {Counters, Resps}) ->
+ case fabric_ring:handle_error(Shard, Counters, Resps) of
+ {ok, Counters1} -> {ok, {Counters1, Resps}};
+ error -> {error, Reason}
+ end.
diff --git a/src/fabric/src/fabric_db_info.erl b/src/fabric/src/fabric_db_info.erl
index fe93878b5..bb7a3530e 100644
--- a/src/fabric/src/fabric_db_info.erl
+++ b/src/fabric/src/fabric_db_info.erl
@@ -23,7 +23,8 @@ go(DbName) ->
RexiMon = fabric_util:create_monitors(Shards),
Fun = fun handle_message/3,
{ok, ClusterInfo} = get_cluster_info(Shards),
- Acc0 = {fabric_dict:init(Workers, nil), [], [{cluster, ClusterInfo}]},
+ CInfo = [{cluster, ClusterInfo}],
+ Acc0 = {fabric_dict:init(Workers, nil), [], CInfo},
try
case fabric_util:recv(Workers, #shard.ref, Fun, Acc0) of
@@ -46,49 +47,47 @@ go(DbName) ->
rexi_monitor:stop(RexiMon)
end.
-handle_message({rexi_DOWN,
- _, {_,NodeRef},_}, _Shard, {Counters, PseqAcc, Acc}) ->
- case fabric_util:remove_down_workers(Counters, NodeRef) of
- {ok, NewCounters} ->
- {ok, {NewCounters, PseqAcc, Acc}};
- error ->
- {error, {nodedown, <<"progress not possible">>}}
+
+handle_message({rexi_DOWN, _, {_,NodeRef},_}, _, {Counters, Resps, CInfo}) ->
+ case fabric_ring:node_down(NodeRef, Counters, Resps) of
+ {ok, Counters1} -> {ok, {Counters1, Resps, CInfo}};
+ error -> {error, {nodedown, <<"progress not possible">>}}
+ end;
+
+handle_message({rexi_EXIT, Reason}, Shard, {Counters, Resps, CInfo}) ->
+ case fabric_ring:handle_error(Shard, Counters, Resps) of
+ {ok, Counters1} -> {ok, {Counters1, Resps, CInfo}};
+ error -> {error, Reason}
end;
-handle_message({rexi_EXIT, Reason}, Shard, {Counters, PseqAcc, Acc}) ->
- NewCounters = fabric_dict:erase(Shard, Counters),
- case fabric_view:is_progress_possible(NewCounters) of
- true ->
- {ok, {NewCounters, PseqAcc, Acc}};
- false ->
- {error, Reason}
+handle_message({ok, Info}, Shard, {Counters, Resps, CInfo}) ->
+ case fabric_ring:handle_response(Shard, Info, Counters, Resps) of
+ {ok, {Counters1, Resps1}} ->
+ {ok, {Counters1, Resps1, CInfo}};
+ {stop, Resps1} ->
+ {stop, build_final_response(CInfo, Shard#shard.dbname, Resps1)}
end;
-handle_message({ok, Info}, #shard{dbname=Name} = Shard, {Counters, PseqAcc, Acc}) ->
- case fabric_dict:lookup_element(Shard, Counters) of
- undefined ->
- % already heard from someone else in this range
- {ok, {Counters, PseqAcc, Acc}};
- nil ->
+handle_message(Reason, Shard, {Counters, Resps, CInfo}) ->
+ case fabric_ring:handle_error(Shard, Counters, Resps) of
+ {ok, Counters1} -> {ok, {Counters1, Resps, CInfo}};
+ error -> {error, Reason}
+ end.
+
+
+build_final_response(CInfo, DbName, Responses) ->
+ AccF = fabric_dict:fold(fun(Shard, Info, {Seqs, PSeqs, Infos}) ->
Seq = couch_util:get_value(update_seq, Info),
- C1 = fabric_dict:store(Shard, Seq, Counters),
- C2 = fabric_view:remove_overlapping_shards(Shard, C1),
PSeq = couch_util:get_value(purge_seq, Info),
- NewPseqAcc = [{Shard, PSeq}|PseqAcc],
- case fabric_dict:any(nil, C2) of
- true ->
- {ok, {C2, NewPseqAcc, [Info|Acc]}};
- false ->
- {stop, [
- {db_name,Name},
- {purge_seq, fabric_view_changes:pack_seqs(NewPseqAcc)},
- {update_seq, fabric_view_changes:pack_seqs(C2)} |
- merge_results(lists:flatten([Info|Acc]))
- ]}
- end
- end;
-handle_message(_, _, Acc) ->
- {ok, Acc}.
+ {[{Shard, Seq} | Seqs], [{Shard, PSeq} | PSeqs], [Info | Infos]}
+ end, {[], [], []}, Responses),
+ {Seqs, PSeqs, Infos} = AccF,
+ PackedSeq = fabric_view_changes:pack_seqs(Seqs),
+ PackedPSeq = fabric_view_changes:pack_seqs(PSeqs),
+ MergedInfos = merge_results(lists:flatten([CInfo | Infos])),
+ Sequences = [{purge_seq, PackedPSeq}, {update_seq, PackedSeq}],
+ [{db_name, DbName}] ++ Sequences ++ MergedInfos.
+
merge_results(Info) ->
Dict = lists:foldl(fun({K,V},D0) -> orddict:append(K,V,D0) end,
diff --git a/src/fabric/src/fabric_db_meta.erl b/src/fabric/src/fabric_db_meta.erl
index 26e1b3752..348b06d51 100644
--- a/src/fabric/src/fabric_db_meta.erl
+++ b/src/fabric/src/fabric_db_meta.erl
@@ -135,9 +135,9 @@ check_sec_set_int(NumWorkers, SetWorkers) ->
true -> throw(no_majority);
false -> ok
end,
- % Hack to reuse fabric_view:is_progress_possible/1
+ % Hack to reuse fabric_ring:is_progress_possible/1
FakeCounters = [{S, 0} || S <- SetWorkers],
- case fabric_view:is_progress_possible(FakeCounters) of
+ case fabric_ring:is_progress_possible(FakeCounters) of
false -> throw(no_ring);
true -> ok
end,
diff --git a/src/fabric/src/fabric_db_partition_info.erl b/src/fabric/src/fabric_db_partition_info.erl
index 97e669a52..2978832f0 100644
--- a/src/fabric/src/fabric_db_partition_info.erl
+++ b/src/fabric/src/fabric_db_partition_info.erl
@@ -52,7 +52,7 @@ handle_message({rexi_DOWN, _, {_,NodeRef},_}, _Shard, {Counters, Acc}) ->
handle_message({rexi_EXIT, Reason}, Shard, {Counters, Acc}) ->
NewCounters = fabric_dict:erase(Shard, Counters),
- case fabric_view:is_progress_possible(NewCounters) of
+ case fabric_ring:is_progress_possible(NewCounters) of
true ->
{ok, {NewCounters, Acc}};
false ->
diff --git a/src/fabric/src/fabric_db_update_listener.erl b/src/fabric/src/fabric_db_update_listener.erl
index 13d8b9e7b..fb2937be1 100644
--- a/src/fabric/src/fabric_db_update_listener.erl
+++ b/src/fabric/src/fabric_db_update_listener.erl
@@ -111,7 +111,7 @@ cleanup_monitor(Parent, Ref, Notifiers) ->
end.
stop_update_notifiers(Notifiers) ->
- [rexi:kill(Node, Ref) || #worker{node=Node, ref=Ref} <- Notifiers].
+ rexi:kill_all([{N, Ref} || #worker{node = N, ref = Ref} <- Notifiers]).
stop({Pid, Ref}) ->
erlang:send(Pid, {Ref, done}).
@@ -169,7 +169,7 @@ handle_message(done, _, _) ->
handle_error(Node, Reason, #acc{shards = Shards} = Acc) ->
Rest = lists:filter(fun(#shard{node = N}) -> N /= Node end, Shards),
- case fabric_view:is_progress_possible([{R, nil} || R <- Rest]) of
+ case fabric_ring:is_progress_possible([{R, nil} || R <- Rest]) of
true ->
{ok, Acc#acc{shards = Rest}};
false ->
diff --git a/src/fabric/src/fabric_design_doc_count.erl b/src/fabric/src/fabric_design_doc_count.erl
index 22d03c5d4..b0efc3007 100644
--- a/src/fabric/src/fabric_design_doc_count.erl
+++ b/src/fabric/src/fabric_design_doc_count.erl
@@ -22,7 +22,7 @@ go(DbName) ->
Shards = mem3:shards(DbName),
Workers = fabric_util:submit_jobs(Shards, get_design_doc_count, []),
RexiMon = fabric_util:create_monitors(Shards),
- Acc0 = {fabric_dict:init(Workers, nil), 0},
+ Acc0 = {fabric_dict:init(Workers, nil), []},
try fabric_util:recv(Workers, #shard.ref, fun handle_message/3, Acc0) of
{timeout, {WorkersDict, _}} ->
DefunctWorkers = fabric_util:remove_done_workers(WorkersDict, nil),
@@ -34,36 +34,29 @@ go(DbName) ->
rexi_monitor:stop(RexiMon)
end.
-handle_message({rexi_DOWN, _, {_,NodeRef},_}, _Shard, {Counters, Acc}) ->
- case fabric_util:remove_down_workers(Counters, NodeRef) of
- {ok, NewCounters} ->
- {ok, {NewCounters, Acc}};
- error ->
- {error, {nodedown, <<"progress not possible">>}}
+handle_message({rexi_DOWN, _, {_,NodeRef},_}, _Shard, {Counters, Resps}) ->
+ case fabric_ring:node_down(NodeRef, Counters, Resps) of
+ {ok, Counters1} -> {ok, {Counters1, Resps}};
+ error -> {error, {nodedown, <<"progress not possible">>}}
end;
-handle_message({rexi_EXIT, Reason}, Shard, {Counters, Acc}) ->
- NewCounters = lists:keydelete(Shard, #shard.ref, Counters),
- case fabric_view:is_progress_possible(NewCounters) of
- true ->
- {ok, {NewCounters, Acc}};
- false ->
- {error, Reason}
+handle_message({rexi_EXIT, Reason}, Shard, {Counters, Resps}) ->
+ case fabric_ring:handle_error(Shard, Counters, Resps) of
+ {ok, Counters1} -> {ok, {Counters1, Resps}};
+ error -> {error, Reason}
end;
-handle_message({ok, Count}, Shard, {Counters, Acc}) ->
- case fabric_dict:lookup_element(Shard, Counters) of
- undefined ->
- {ok, {Counters, Acc}};
- nil ->
- C1 = fabric_dict:store(Shard, ok, Counters),
- C2 = fabric_view:remove_overlapping_shards(Shard, C1),
- case fabric_dict:any(nil, C2) of
- true ->
- {ok, {C2, Count+Acc}};
- false ->
- {stop, Count+Acc}
- end
+handle_message({ok, Count}, Shard, {Counters, Resps}) ->
+ case fabric_ring:handle_response(Shard, Count, Counters, Resps) of
+ {ok, {Counters1, Resps1}} ->
+ {ok, {Counters1, Resps1}};
+ {stop, Resps1} ->
+ Total = fabric_dict:fold(fun(_, C, A) -> A + C end, 0, Resps1),
+ {stop, Total}
end;
-handle_message(_, _, Acc) ->
- {ok, Acc}.
+
+handle_message(Reason, Shard, {Counters, Resps}) ->
+ case fabric_ring:handle_error(Shard, Counters, Resps) of
+ {ok, Counters1} -> {ok, {Counters1, Resps}};
+ error -> {error, Reason}
+ end.
diff --git a/src/fabric/src/fabric_dict.erl b/src/fabric/src/fabric_dict.erl
index a336b47b0..b63ed2095 100644
--- a/src/fabric/src/fabric_dict.erl
+++ b/src/fabric/src/fabric_dict.erl
@@ -56,3 +56,6 @@ fold(Fun, Acc0, Dict) ->
to_list(Dict) ->
orddict:to_list(Dict).
+
+from_list(KVList) when is_list(KVList) ->
+ orddict:from_list(KVList).
diff --git a/src/fabric/src/fabric_doc_open.erl b/src/fabric/src/fabric_doc_open.erl
index aafdcfb79..743ad8c74 100644
--- a/src/fabric/src/fabric_doc_open.erl
+++ b/src/fabric/src/fabric_doc_open.erl
@@ -322,7 +322,7 @@ t_handle_message_reply() ->
Acc0 = #acc{workers=Workers, r=2, replies=[]},
?_test(begin
- meck:expect(rexi, kill, fun(_, _) -> ok end),
+ meck:expect(rexi, kill_all, fun(_) -> ok end),
% Test that we continue when we haven't met R yet
?assertMatch(
@@ -416,7 +416,7 @@ t_store_node_revs() ->
InitAcc = #acc{workers = [W1, W2, W3], replies = [], r = 2},
?_test(begin
- meck:expect(rexi, kill, fun(_, _) -> ok end),
+ meck:expect(rexi, kill_all, fun(_) -> ok end),
% Simple case
{ok, #acc{node_revs = NodeRevs1}} = handle_message(Foo1, W1, InitAcc),
diff --git a/src/fabric/src/fabric_group_info.erl b/src/fabric/src/fabric_group_info.erl
index 8383a7e28..be507420e 100644
--- a/src/fabric/src/fabric_group_info.erl
+++ b/src/fabric/src/fabric_group_info.erl
@@ -27,7 +27,8 @@ go(DbName, #doc{id=DDocId}) ->
Ushards = mem3:ushards(DbName),
Workers = fabric_util:submit_jobs(Shards, group_info, [DDocId]),
RexiMon = fabric_util:create_monitors(Shards),
- Acc = acc_init(Workers, Ushards),
+ USet = sets:from_list([{Id, N} || #shard{name = Id, node = N} <- Ushards]),
+ Acc = {fabric_dict:init(Workers, nil), [], USet},
try fabric_util:recv(Workers, #shard.ref, fun handle_message/3, Acc) of
{timeout, {WorkersDict, _, _}} ->
DefunctWorkers = fabric_util:remove_done_workers(WorkersDict, nil),
@@ -39,56 +40,42 @@ go(DbName, #doc{id=DDocId}) ->
rexi_monitor:stop(RexiMon)
end.
-handle_message({rexi_DOWN, _, {_,NodeRef},_}, _Shard,
- {Counters, Acc, Ushards}) ->
- case fabric_util:remove_down_workers(Counters, NodeRef) of
- {ok, NewCounters} ->
- {ok, {NewCounters, Acc, Ushards}};
- error ->
- {error, {nodedown, <<"progress not possible">>}}
+handle_message({rexi_DOWN, _, {_,NodeRef},_}, _, {Counters, Resps, USet}) ->
+ case fabric_ring:node_down(NodeRef, Counters, Resps) of
+ {ok, Counters1} -> {ok, {Counters1, Resps, USet}};
+ error -> {error, {nodedown, <<"progress not possible">>}}
end;
-handle_message({rexi_EXIT, Reason}, Shard, {Counters, Acc, Ushards}) ->
- NewCounters = lists:keydelete(Shard, #shard.ref, Counters),
- case fabric_view:is_progress_possible(NewCounters) of
- true ->
- {ok, {NewCounters, Acc, Ushards}};
- false ->
- {error, Reason}
+handle_message({rexi_EXIT, Reason}, Shard, {Counters, Resps, USet}) ->
+ case fabric_ring:handle_error(Shard, Counters, Resps) of
+ {ok, Counters1} -> {ok, {Counters1, Resps, USet}};
+ error -> {error, Reason}
end;
-handle_message({ok, Info}, Shard, {Counters0, Acc, Ushards}) ->
- case fabric_dict:lookup_element(Shard, Counters0) of
- undefined ->
- % already heard from other node in this range
- {ok, {Counters0, Acc, Ushards}};
- nil ->
- NewAcc = append_result(Info, Shard, Acc, Ushards),
- Counters1 = fabric_dict:store(Shard, ok, Counters0),
- Counters = fabric_view:remove_overlapping_shards(Shard, Counters1),
- case is_complete(Counters) of
- false ->
- {ok, {Counters, NewAcc, Ushards}};
- true ->
- Pending = aggregate_pending(NewAcc),
- Infos = get_infos(NewAcc),
- Results = [{updates_pending, {Pending}} | merge_results(Infos)],
- {stop, Results}
- end
+handle_message({ok, Info}, Shard, {Counters, Resps, USet}) ->
+ case fabric_ring:handle_response(Shard, Info, Counters, Resps) of
+ {ok, {Counters1, Resps1}} ->
+ {ok, {Counters1, Resps1, USet}};
+ {stop, Resps1} ->
+ {stop, build_final_response(USet, Resps1)}
end;
-handle_message(_, _, Acc) ->
- {ok, Acc}.
-acc_init(Workers, Ushards) ->
- Set = sets:from_list([{Id, N} || #shard{name = Id, node = N} <- Ushards]),
- {fabric_dict:init(Workers, nil), dict:new(), Set}.
+handle_message(Reason, Shard, {Counters, Resps, USet}) ->
+ case fabric_ring:handle_error(Shard, Counters, Resps) of
+ {ok, Counters1} -> {ok, {Counters1, Resps, USet}};
+ error -> {error, Reason}
+ end.
+
-is_complete(Counters) ->
- not fabric_dict:any(nil, Counters).
+build_final_response(USet, Responses) ->
+ AccF = fabric_dict:fold(fun(#shard{name = Id, node = Node}, Info, Acc) ->
+ IsPreferred = sets:is_element({Id, Node}, USet),
+ dict:append(Id, {Node, IsPreferred, Info}, Acc)
+ end, dict:new(), Responses),
+ Pending = aggregate_pending(AccF),
+ Infos = get_infos(AccF),
+ [{updates_pending, {Pending}} | merge_results(Infos)].
-append_result(Info, #shard{name = Name, node = Node}, Acc, Ushards) ->
- IsPreferred = sets:is_element({Name, Node}, Ushards),
- dict:append(Name, {Node, IsPreferred, Info}, Acc).
get_infos(Acc) ->
Values = [V || {_, V} <- dict:to_list(Acc)],
diff --git a/src/fabric/src/fabric_ring.erl b/src/fabric/src/fabric_ring.erl
new file mode 100644
index 000000000..110edb9ab
--- /dev/null
+++ b/src/fabric/src/fabric_ring.erl
@@ -0,0 +1,519 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(fabric_ring).
+
+
+-export([
+ is_progress_possible/1,
+ is_progress_possible/2,
+ get_shard_replacements/2,
+ node_down/3,
+ node_down/4,
+ handle_error/3,
+ handle_error/4,
+ handle_response/4,
+ handle_response/5
+]).
+
+
+-include_lib("fabric/include/fabric.hrl").
+-include_lib("mem3/include/mem3.hrl").
+
+
+-type fabric_dict() :: [{#shard{}, any()}].
+-type ring_opts() :: [atom() | tuple()].
+
+
+%% @doc looks for a fully covered keyrange in the list of counters
+-spec is_progress_possible(fabric_dict()) -> boolean().
+is_progress_possible(Counters) ->
+ is_progress_possible(Counters, []).
+
+
+%% @doc looks for a fully covered keyrange in the list of counters
+%% This version takes a ring option to configure how progress will
+%% be checked. The default, [], checks that the full ring is covered.
+-spec is_progress_possible(fabric_dict(), ring_opts()) -> boolean().
+is_progress_possible(Counters, RingOpts) ->
+ is_progress_possible(Counters, [], 0, ?RING_END, RingOpts).
+
+
+-spec get_shard_replacements(binary(), [#shard{}]) -> [#shard{}].
+get_shard_replacements(DbName, UsedShards0) ->
+ % We only want to generate a replacements list from shards
+ % that aren't already used.
+ AllLiveShards = mem3:live_shards(DbName, [node() | nodes()]),
+ UsedShards = [S#shard{ref=undefined} || S <- UsedShards0],
+ get_shard_replacements_int(AllLiveShards -- UsedShards, UsedShards).
+
+
+-spec node_down(node(), fabric_dict(), fabric_dict()) ->
+ {ok, fabric_dict()} | error.
+node_down(Node, Workers, Responses) ->
+ node_down(Node, Workers, Responses, []).
+
+
+-spec node_down(node(), fabric_dict(), fabric_dict(), ring_opts()) ->
+ {ok, fabric_dict()} | error.
+node_down(Node, Workers, Responses, RingOpts) ->
+ {B, E} = range_bounds(Workers, Responses),
+ Workers1 = fabric_dict:filter(fun(#shard{node = N}, _) ->
+ N =/= Node
+ end, Workers),
+ case is_progress_possible(Workers1, Responses, B, E, RingOpts) of
+ true -> {ok, Workers1};
+ false -> error
+ end.
+
+
+-spec handle_error(#shard{}, fabric_dict(), fabric_dict()) ->
+ {ok, fabric_dict()} | error.
+handle_error(Shard, Workers, Responses) ->
+ handle_error(Shard, Workers, Responses, []).
+
+
+-spec handle_error(#shard{}, fabric_dict(), fabric_dict(), ring_opts()) ->
+ {ok, fabric_dict()} | error.
+handle_error(Shard, Workers, Responses, RingOpts) ->
+ {B, E} = range_bounds(Workers, Responses),
+ Workers1 = fabric_dict:erase(Shard, Workers),
+ case is_progress_possible(Workers1, Responses, B, E, RingOpts) of
+ true -> {ok, Workers1};
+ false -> error
+ end.
+
+
+-spec handle_response(#shard{}, any(), fabric_dict(), fabric_dict()) ->
+ {ok, {fabric_dict(), fabric_dict()}} | {stop, fabric_dict()}.
+handle_response(Shard, Response, Workers, Responses) ->
+ handle_response(Shard, Response, Workers, Responses, []).
+
+
+-spec handle_response(#shard{}, any(), fabric_dict(), fabric_dict(),
+ ring_opts()) ->
+ {ok, {fabric_dict(), fabric_dict()}} | {stop, fabric_dict()}.
+handle_response(Shard, Response, Workers, Responses, RingOpts) ->
+ handle_response(Shard, Response, Workers, Responses, RingOpts,
+ fun stop_workers/1).
+
+
+% Worker response handler. Gets responses from shards and puts them in a list
+% until they complete a full ring. Then kill unused responses and remaining
+% workers.
+%
+% How a ring "completes" is driven by RingOpts:
+%
+% * When RingOpts is [] (the default case) responses must form a "clean"
+% ring, where all copies at the start of the range and end of the range must
+% have the same boundary values.
+%
+% * When RingOpts is [{any, [#shard{}]}] responses are accepted from any of
+% the provided list of shards. This type of ring might be used when querying
+% a partitioned database. As soon as a result from any of the shards
+% arrives, result collection stops.
+%
+handle_response(Shard, Response, Workers, Responses, RingOpts, CleanupCb) ->
+ Workers1 = fabric_dict:erase(Shard, Workers),
+ case RingOpts of
+ [] ->
+ #shard{range = [B, E]} = Shard,
+ Responses1 = [{{B, E}, Shard, Response} | Responses],
+ handle_response_ring(Workers1, Responses1, CleanupCb);
+ [{any, Any}] ->
+ handle_response_any(Shard, Response, Workers1, Any, CleanupCb)
+ end.
+
+
+handle_response_ring(Workers, Responses, CleanupCb) ->
+ {MinB, MaxE} = range_bounds(Workers, Responses),
+ Ranges = lists:map(fun({R, _, _}) -> R end, Responses),
+ case mem3_util:get_ring(Ranges, MinB, MaxE) of
+ [] ->
+ {ok, {Workers, Responses}};
+ Ring ->
+ % Return one response per range in the ring. The
+ % response list is reversed before sorting so that the
+ % first shard copy to reply is first. We use keysort
+ % because it is documented as being stable so that
+ % we keep the relative order of duplicate shards
+ SortedResponses = lists:keysort(1, lists:reverse(Responses)),
+ UsedResponses = get_responses(Ring, SortedResponses),
+ % Kill all the remaining workers as well as the redundant responses
+ stop_unused_workers(Workers, Responses, UsedResponses, CleanupCb),
+ {stop, fabric_dict:from_list(UsedResponses)}
+ end.
+
+
+handle_response_any(Shard, Response, Workers, Any, CleanupCb) ->
+ case lists:member(Shard#shard{ref = undefined}, Any) of
+ true ->
+ stop_unused_workers(Workers, [], [], CleanupCb),
+ {stop, fabric_dict:from_list([{Shard, Response}])};
+ false ->
+ {ok, {Workers, []}}
+ end.
+
+
+% Check if the workers still waiting and the already received responses could
+% still form a continuous range. The range won't always be the full ring; the
+% bounds are computed from the minimum and maximum interval beginnings and
+% ends.
+%
+% There is also a special case where, even if the ring cannot be formed but
+% there is an overlap between all the shards, it's considered that progress
+% can still be made. This is essentially to allow for split partitioned
+% shards: when one shard copy on a node was split, the set of ranges might
+% look like 00-ff, 00-ff, 07-ff. Even if both 00-ff workers exit, progress
+% can still be made with the remaining 07-ff copy.
+%
+-spec is_progress_possible(fabric_dict(), [{any(), #shard{}, any()}],
+ non_neg_integer(), non_neg_integer(), ring_opts()) -> boolean().
+is_progress_possible([], [], _, _, _) ->
+ false;
+
+is_progress_possible(Counters, Responses, MinB, MaxE, []) ->
+ ResponseRanges = lists:map(fun({{B, E}, _, _}) -> {B, E} end, Responses),
+ Ranges = fabric_util:worker_ranges(Counters) ++ ResponseRanges,
+ mem3_util:get_ring(Ranges, MinB, MaxE) =/= [];
+
+is_progress_possible(Counters, Responses, _, _, [{any, AnyShards}]) ->
+ InAny = fun(S) -> lists:member(S#shard{ref = undefined}, AnyShards) end,
+ case fabric_dict:filter(fun(S, _) -> InAny(S) end, Counters) of
+ [] ->
+ case lists:filter(fun({_, S, _}) -> InAny(S) end, Responses) of
+ [] -> false;
+ [_ | _] -> true
+ end;
+ [_ | _] ->
+ true
+ end.
+
+
+get_shard_replacements_int(UnusedShards, UsedShards) ->
+ % If we have more than one copy of a range then we don't
+ % want to try and add a replacement to any copy.
+ RangeCounts = lists:foldl(fun(#shard{range=R}, Acc) ->
+ dict:update_counter(R, 1, Acc)
+ end, dict:new(), UsedShards),
+
+ % For each seq shard range with a count of 1, find any
+ % possible replacements from the unused shards. The
+ % replacement list is keyed by range.
+ lists:foldl(fun(#shard{range = [B, E] = Range}, Acc) ->
+ case dict:find(Range, RangeCounts) of
+ {ok, 1} ->
+ Repls = mem3_util:non_overlapping_shards(UnusedShards, B, E),
+ % Only keep non-empty lists of replacements
+ if Repls == [] -> Acc; true ->
+ [{Range, Repls} | Acc]
+ end;
+ _ ->
+ Acc
+ end
+ end, [], UsedShards).
+
+
+range_bounds(Workers, Responses) ->
+ RespRanges = lists:map(fun({R, _, _}) -> R end, Responses),
+ Ranges = fabric_util:worker_ranges(Workers) ++ RespRanges,
+ {Bs, Es} = lists:unzip(Ranges),
+ {lists:min(Bs), lists:max(Es)}.
+
+
+get_responses([], _) ->
+ [];
+
+get_responses([Range | Ranges], [{Range, Shard, Value} | Resps]) ->
+ [{Shard, Value} | get_responses(Ranges, Resps)];
+
+get_responses(Ranges, [_DupeRangeResp | Resps]) ->
+ get_responses(Ranges, Resps).
+
+
+stop_unused_workers(_, _, _, undefined) ->
+ ok;
+
+stop_unused_workers(Workers, AllResponses, UsedResponses, CleanupCb) ->
+ WorkerShards = [S || {S, _} <- Workers],
+ Used = [S || {S, _} <- UsedResponses],
+ Unused = [S || {_, S, _} <- AllResponses, not lists:member(S, Used)],
+ CleanupCb(WorkerShards ++ Unused).
+
+
+stop_workers(Shards) when is_list(Shards) ->
+ rexi:kill_all([{Node, Ref} || #shard{node = Node, ref = Ref} <- Shards]).
+
+
+% Unit tests
+
+is_progress_possible_full_range_test() ->
+ % a base case
+ ?assertEqual(false, is_progress_possible([], [], 0, 0, [])),
+ T1 = [[0, ?RING_END]],
+ ?assertEqual(true, is_progress_possible(mk_cnts(T1))),
+ T2 = [[0, 10], [11, 20], [21, ?RING_END]],
+ ?assertEqual(true, is_progress_possible(mk_cnts(T2))),
+ % gap
+ T3 = [[0, 10], [12, ?RING_END]],
+ ?assertEqual(false, is_progress_possible(mk_cnts(T3))),
+ % outside range
+ T4 = [[1, 10], [11, 20], [21, ?RING_END]],
+ ?assertEqual(false, is_progress_possible(mk_cnts(T4))),
+ % outside range
+ T5 = [[0, 10], [11, 20], [21, ?RING_END + 1]],
+ ?assertEqual(false, is_progress_possible(mk_cnts(T5))),
+ % possible progress but with backtracking
+ T6 = [[0, 10], [11, 20], [0, 5], [6, 21], [21, ?RING_END]],
+ ?assertEqual(true, is_progress_possible(mk_cnts(T6))),
+ % not possible, overlap is not exact
+ T7 = [[0, 10], [13, 20], [21, ?RING_END], [9, 12]],
+ ?assertEqual(false, is_progress_possible(mk_cnts(T7))).
+
+
+is_progress_possible_with_responses_test() ->
+ C1 = mk_cnts([[0, ?RING_END]]),
+ ?assertEqual(true, is_progress_possible(C1, [], 0, ?RING_END, [])),
+ % check for gaps
+ C2 = mk_cnts([[5, 6], [7, 8]]),
+ ?assertEqual(true, is_progress_possible(C2, [], 5, 8, [])),
+ ?assertEqual(false, is_progress_possible(C2, [], 4, 8, [])),
+ ?assertEqual(false, is_progress_possible(C2, [], 5, 7, [])),
+ ?assertEqual(false, is_progress_possible(C2, [], 4, 9, [])),
+ % check for uneven shard range copies
+ C3 = mk_cnts([[2, 5], [2, 10]]),
+ ?assertEqual(true, is_progress_possible(C3, [], 2, 10, [])),
+ ?assertEqual(false, is_progress_possible(C3, [], 2, 11, [])),
+ ?assertEqual(false, is_progress_possible(C3, [], 3, 10, [])),
+ % they overlap but still not a proper ring
+ C4 = mk_cnts([[2, 4], [3, 7], [6, 10]]),
+ ?assertEqual(false, is_progress_possible(C4, [], 2, 10, [])),
+ % some of the ranges are in responses
+ RS1 = mk_resps([{"n1", 7, 8, 42}]),
+ C5 = mk_cnts([[5, 6]]),
+ ?assertEqual(true, is_progress_possible(C5, RS1, 5, 8, [])),
+ ?assertEqual(false, is_progress_possible([], RS1, 5, 8, [])),
+ ?assertEqual(true, is_progress_possible([], RS1, 7, 8, [])).
+
+
+is_progress_possible_with_ring_opts_test() ->
+ Opts = [{any, [mk_shard("n1", [0, 5]), mk_shard("n2", [3, 10])]}],
+ C1 = [{mk_shard("n1", [0, ?RING_END]), nil}],
+ RS1 = mk_resps([{"n1", 3, 10, 42}]),
+ ?assertEqual(false, is_progress_possible(C1, [], 0, ?RING_END, Opts)),
+ ?assertEqual(false, is_progress_possible([], [], 0, ?RING_END, Opts)),
+ ?assertEqual(false, is_progress_possible([], RS1, 0, ?RING_END, Opts)),
+ % explicitly accept only the shard specified in the ring options
+ ?assertEqual(false, is_progress_possible([], RS1, 3, 10, [{any, []}])),
+ % need to match the node exactly
+ ?assertEqual(false, is_progress_possible([], RS1, 3, 10, Opts)),
+ RS2 = mk_resps([{"n2", 3, 10, 42}]),
+ ?assertEqual(true, is_progress_possible([], RS2, 3, 10, Opts)),
+ % assert that counters can fill the ring not just the response
+ C2 = [{mk_shard("n1", [0, 5]), nil}],
+ ?assertEqual(true, is_progress_possible(C2, [], 0, ?RING_END, Opts)).
+
+
+get_shard_replacements_test() ->
+ Unused = [mk_shard(N, [B, E]) || {N, B, E} <- [
+ {"n1", 11, 20}, {"n1", 21, ?RING_END},
+ {"n2", 0, 4}, {"n2", 5, 10}, {"n2", 11, 20},
+ {"n3", 0, 21, ?RING_END}
+ ]],
+ Used = [mk_shard(N, [B, E]) || {N, B, E} <- [
+ {"n2", 21, ?RING_END},
+ {"n3", 0, 10}, {"n3", 11, 20}
+ ]],
+ Res = lists:sort(get_shard_replacements_int(Unused, Used)),
+ % Notice that the [0, 10] range can be replaced by spawning the
+ % [0, 4] and [5, 10] workers on n1
+ Expect = [
+ {[0, 10], [mk_shard("n2", [0, 4]), mk_shard("n2", [5, 10])]},
+ {[11, 20], [mk_shard("n1", [11, 20]), mk_shard("n2", [11, 20])]},
+ {[21, ?RING_END], [mk_shard("n1", [21, ?RING_END])]}
+ ],
+ ?assertEqual(Expect, Res).
+
+
+handle_response_basic_test() ->
+ Shard1 = mk_shard("n1", [0, 1]),
+ Shard2 = mk_shard("n1", [2, ?RING_END]),
+
+ Workers1 = fabric_dict:init([Shard1, Shard2], nil),
+
+ Result1 = handle_response(Shard1, 42, Workers1, [], [], undefined),
+ ?assertMatch({ok, {_, _}}, Result1),
+ {ok, {Workers2, Responses1}} = Result1,
+ ?assertEqual(fabric_dict:erase(Shard1, Workers1), Workers2),
+ ?assertEqual([{{0, 1}, Shard1, 42}], Responses1),
+
+ Result2 = handle_response(Shard2, 43, Workers2, Responses1, [], undefined),
+ ?assertEqual({stop, [{Shard1, 42}, {Shard2, 43}]}, Result2).
+
+
+handle_response_incomplete_ring_test() ->
+ Shard1 = mk_shard("n1", [0, 1]),
+ Shard2 = mk_shard("n1", [2, 10]),
+
+ Workers1 = fabric_dict:init([Shard1, Shard2], nil),
+
+ Result1 = handle_response(Shard1, 42, Workers1, [], [], undefined),
+ ?assertMatch({ok, {_, _}}, Result1),
+ {ok, {Workers2, Responses1}} = Result1,
+ ?assertEqual(fabric_dict:erase(Shard1, Workers1), Workers2),
+ ?assertEqual([{{0, 1}, Shard1, 42}], Responses1),
+
+ Result2 = handle_response(Shard2, 43, Workers2, Responses1, [], undefined),
+ ?assertEqual({stop, [{Shard1, 42}, {Shard2, 43}]}, Result2).
+
+
+handle_response_multiple_copies_test() ->
+ Shard1 = mk_shard("n1", [0, 1]),
+ Shard2 = mk_shard("n2", [0, 1]),
+ Shard3 = mk_shard("n1", [2, ?RING_END]),
+
+ Workers1 = fabric_dict:init([Shard1, Shard2, Shard3], nil),
+
+ Result1 = handle_response(Shard1, 42, Workers1, [], [], undefined),
+ ?assertMatch({ok, {_, _}}, Result1),
+ {ok, {Workers2, Responses1}} = Result1,
+
+ Result2 = handle_response(Shard2, 43, Workers2, Responses1, [], undefined),
+ ?assertMatch({ok, {_, _}}, Result2),
+ {ok, {Workers3, Responses2}} = Result2,
+
+ Result3 = handle_response(Shard3, 44, Workers3, Responses2, [], undefined),
+ % Use the value (42) to distinguish between [0, 1] copies. In reality
+ % they should have the same value but here we need to assert that the
+ % copy that responded first is included in the ring.
+ ?assertEqual({stop, [{Shard1, 42}, {Shard3, 44}]}, Result3).
+
+
+handle_response_backtracking_test() ->
+ Shard1 = mk_shard("n1", [0, 5]),
+ Shard2 = mk_shard("n1", [10, ?RING_END]),
+ Shard3 = mk_shard("n2", [2, ?RING_END]),
+ Shard4 = mk_shard("n3", [0, 1]),
+
+ Workers1 = fabric_dict:init([Shard1, Shard2, Shard3, Shard4], nil),
+
+ Result1 = handle_response(Shard1, 42, Workers1, [], [], undefined),
+ ?assertMatch({ok, {_, _}}, Result1),
+ {ok, {Workers2, Responses1}} = Result1,
+
+ Result2 = handle_response(Shard2, 43, Workers2, Responses1, [], undefined),
+ ?assertMatch({ok, {_, _}}, Result2),
+ {ok, {Workers3, Responses2}} = Result2,
+
+ Result3 = handle_response(Shard3, 44, Workers3, Responses2, [], undefined),
+ ?assertMatch({ok, {_, _}}, Result3),
+ {ok, {Workers4, Responses3}} = Result3,
+
+ Result4 = handle_response(Shard4, 45, Workers4, Responses3, [], undefined),
+ ?assertEqual({stop, [{Shard3, 44}, {Shard4, 45}]}, Result4).
+
+
+handle_response_ring_opts_test() ->
+ Shard1 = mk_shard("n1", [0, 5]),
+ Shard2 = mk_shard("n2", [0, 1]),
+ Shard3 = mk_shard("n3", [0, 1]),
+
+ Opts = [{any, [mk_shard("n3", [0, 1])]}],
+
+ ShardList = [Shard1, Shard2, Shard3],
+ WithRefs = [S#shard{ref = make_ref()} || S <- ShardList],
+ Workers1 = fabric_dict:init(WithRefs, nil),
+
+ Result1 = handle_response(Shard1, 42, Workers1, [], Opts, undefined),
+ ?assertMatch({ok, {_, _}}, Result1),
+ {ok, {Workers2, []}} = Result1,
+
+ % Still waiting because the node doesn't match
+ Result2 = handle_response(Shard2, 43, Workers2, [], Opts, undefined),
+ ?assertMatch({ok, {_, _}}, Result2),
+ {ok, {Workers3, []}} = Result2,
+
+ Result3 = handle_response(Shard3, 44, Workers3, [], Opts, undefined),
+ ?assertEqual({stop, [{Shard3, 44}]}, Result3).
+
+
+handle_error_test() ->
+ Shard1 = mk_shard("n1", [0, 5]),
+ Shard2 = mk_shard("n1", [10, ?RING_END]),
+ Shard3 = mk_shard("n2", [2, ?RING_END]),
+ Shard4 = mk_shard("n3", [0, 1]),
+
+ Workers1 = fabric_dict:init([Shard1, Shard2, Shard3, Shard4], nil),
+
+ Result1 = handle_response(Shard1, 42, Workers1, [], [], undefined),
+ ?assertMatch({ok, {_, _}}, Result1),
+ {ok, {Workers2, Responses1}} = Result1,
+
+ Result2 = handle_error(Shard2, Workers2, Responses1),
+ ?assertMatch({ok, _}, Result2),
+ {ok, Workers3} = Result2,
+ ?assertEqual(fabric_dict:erase(Shard2, Workers2), Workers3),
+
+ Result3 = handle_response(Shard3, 44, Workers3, Responses1, [], undefined),
+ ?assertMatch({ok, {_, _}}, Result3),
+ {ok, {Workers4, Responses3}} = Result3,
+ ?assertEqual(error, handle_error(Shard4, Workers4, Responses3)).
+
+
+node_down_test() ->
+ Shard1 = mk_shard("n1", [0, 5]),
+ Shard2 = mk_shard("n1", [10, ?RING_END]),
+ Shard3 = mk_shard("n2", [2, ?RING_END]),
+ Shard4 = mk_shard("n3", [0, 1]),
+
+ Workers1 = fabric_dict:init([Shard1, Shard2, Shard3, Shard4], nil),
+
+ Result1 = handle_response(Shard1, 42, Workers1, [], [], undefined),
+ ?assertMatch({ok, {_, _}}, Result1),
+ {ok, {Workers2, Responses1}} = Result1,
+
+ Result2 = handle_response(Shard2, 43, Workers2, Responses1, [], undefined),
+ ?assertMatch({ok, {_, _}}, Result2),
+ {ok, {Workers3, Responses2}} = Result2,
+
+ Result3 = node_down(n1, Workers3, Responses2),
+ ?assertMatch({ok, _}, Result3),
+ {ok, Workers4} = Result3,
+ ?assertEqual([{Shard3, nil}, {Shard4, nil}], Workers4),
+
+ Result4 = handle_response(Shard3, 44, Workers4, Responses2, [], undefined),
+ ?assertMatch({ok, {_, _}}, Result4),
+ {ok, {Workers5, Responses3}} = Result4,
+
+ % Note: Shard3 was already processed, it's ok if n2 went down after
+ ?assertEqual({ok, [{Shard4, nil}]}, node_down(n2, Workers5, Responses3)),
+
+ ?assertEqual(error, node_down(n3, Workers5, Responses3)).
+
+
+mk_cnts(Ranges) ->
+ Shards = lists:map(fun mk_shard/1, Ranges),
+ fabric_dict:init([S#shard{ref = make_ref()} || S <- Shards], nil).
+
+
+mk_resps(RangeNameVals) ->
+ [{{B, E}, mk_shard(Name, [B, E]), V} || {Name, B, E, V} <- RangeNameVals].
+
+
+mk_shard([B, E]) when is_integer(B), is_integer(E) ->
+ #shard{range = [B, E]}.
+
+
+mk_shard(Name, Range) ->
+ Node = list_to_atom(Name),
+ BName = list_to_binary(Name),
+ #shard{name = BName, node = Node, range = Range}.
diff --git a/src/fabric/src/fabric_streams.erl b/src/fabric/src/fabric_streams.erl
index 288c67cab..d0a549d6e 100644
--- a/src/fabric/src/fabric_streams.erl
+++ b/src/fabric/src/fabric_streams.erl
@@ -14,7 +14,9 @@
-export([
start/2,
+ start/3,
start/4,
+ start/5,
cleanup/1
]).
@@ -28,17 +30,28 @@
start(Workers, Keypos) ->
start(Workers, Keypos, undefined, undefined).
-start(Workers0, Keypos, StartFun, Replacements) ->
+
+start(Workers, Keypos, RingOpts) ->
+ start(Workers, Keypos, undefined, undefined, RingOpts).
+
+
+start(Workers, Keypos, StartFun, Replacements) ->
+ start(Workers, Keypos, StartFun, Replacements, []).
+
+
+start(Workers0, Keypos, StartFun, Replacements, RingOpts) ->
Fun = fun handle_stream_start/3,
Acc = #stream_acc{
workers = fabric_dict:init(Workers0, waiting),
+ ready = [],
start_fun = StartFun,
- replacements = Replacements
+ replacements = Replacements,
+ ring_opts = RingOpts
},
spawn_worker_cleaner(self(), Workers0),
Timeout = fabric_util:request_timeout(),
case rexi_utils:recv(Workers0, Keypos, Fun, Acc, Timeout, infinity) of
- {ok, #stream_acc{workers=Workers}} ->
+ {ok, #stream_acc{ready = Workers}} ->
AckedWorkers = fabric_dict:fold(fun(Worker, From, WorkerAcc) ->
rexi:stream_start(From),
[Worker | WorkerAcc]
@@ -64,69 +77,74 @@ cleanup(Workers) ->
handle_stream_start({rexi_DOWN, _, {_, NodeRef}, _}, _, St) ->
- case fabric_util:remove_down_workers(St#stream_acc.workers, NodeRef) of
- {ok, Workers} ->
- {ok, St#stream_acc{workers=Workers}};
- error ->
- Reason = {nodedown, <<"progress not possible">>},
- {error, Reason}
+ #stream_acc{workers = Workers, ready = Ready, ring_opts = RingOpts} = St,
+ case fabric_ring:node_down(NodeRef, Workers, Ready, RingOpts) of
+ {ok, Workers1} ->
+ {ok, St#stream_acc{workers = Workers1}};
+ error ->
+ {error, {nodedown, <<"progress not possible">>}}
end;
handle_stream_start({rexi_EXIT, Reason}, Worker, St) ->
- Workers = fabric_dict:erase(Worker, St#stream_acc.workers),
- Replacements = St#stream_acc.replacements,
- case {fabric_view:is_progress_possible(Workers), Reason} of
- {true, _} ->
- {ok, St#stream_acc{workers=Workers}};
- {false, {maintenance_mode, _Node}} when Replacements /= undefined ->
- % Check if we have replacements for this range
- % and start the new workers if so.
- case lists:keytake(Worker#shard.range, 1, Replacements) of
- {value, {_Range, WorkerReplacements}, NewReplacements} ->
- FinalWorkers = lists:foldl(fun(Repl, NewWorkers) ->
- NewWorker = (St#stream_acc.start_fun)(Repl),
- add_worker_to_cleaner(self(), NewWorker),
- fabric_dict:store(NewWorker, waiting, NewWorkers)
- end, Workers, WorkerReplacements),
- % Assert that our replaced worker provides us
- % the oppurtunity to make progress.
- true = fabric_view:is_progress_possible(FinalWorkers),
- NewRefs = fabric_dict:fetch_keys(FinalWorkers),
- {new_refs, NewRefs, St#stream_acc{
- workers=FinalWorkers,
- replacements=NewReplacements
- }};
- false ->
- % If we progress isn't possible and we don't have any
- % replacements then we're dead in the water.
- Error = {nodedown, <<"progress not possible">>},
- {error, Error}
- end;
- {false, _} ->
- {error, fabric_util:error_info(Reason)}
+ #stream_acc{
+ workers = Workers,
+ ready = Ready,
+ replacements = Replacements,
+ ring_opts = RingOpts
+ } = St,
+ case {fabric_ring:handle_error(Worker, Workers, Ready, RingOpts), Reason} of
+ {{ok, Workers1}, _Reason} ->
+ {ok, St#stream_acc{workers = Workers1}};
+ {error, {maintenance_mode, _Node}} when Replacements /= undefined ->
+ % Check if we have replacements for this range
+ % and start the new workers if so.
+ case lists:keytake(Worker#shard.range, 1, Replacements) of
+ {value, {_Range, WorkerReplacements}, NewReplacements} ->
+ FinalWorkers = lists:foldl(fun(Repl, NewWorkers) ->
+ NewWorker = (St#stream_acc.start_fun)(Repl),
+ add_worker_to_cleaner(self(), NewWorker),
+ fabric_dict:store(NewWorker, waiting, NewWorkers)
+ end, Workers, WorkerReplacements),
+ % Assert that our replaced worker provides us
+ % the opportunity to make progress.
+ true = fabric_ring:is_progress_possible(FinalWorkers),
+ NewRefs = fabric_dict:fetch_keys(FinalWorkers),
+ {new_refs, NewRefs, St#stream_acc{
+ workers=FinalWorkers,
+ replacements=NewReplacements
+ }};
+ false ->
+ % If progress isn't possible and we don't have any
+ % replacements then we're dead in the water.
+ {error, {nodedown, <<"progress not possible">>}}
+ end;
+ {error, _} ->
+ {error, fabric_util:error_info(Reason)}
end;
handle_stream_start(rexi_STREAM_INIT, {Worker, From}, St) ->
- case fabric_dict:lookup_element(Worker, St#stream_acc.workers) of
+ #stream_acc{workers = Workers, ready = Ready, ring_opts = RingOpts} = St,
+ case fabric_dict:lookup_element(Worker, Workers) of
undefined ->
% This worker lost the race with other partition copies, terminate
rexi:stream_cancel(From),
{ok, St};
waiting ->
- % Don't ack the worker yet so they don't start sending us
- % rows until we're ready
- Workers0 = fabric_dict:store(Worker, From, St#stream_acc.workers),
- Workers1 = fabric_view:remove_overlapping_shards(Worker, Workers0),
- case fabric_dict:any(waiting, Workers1) of
- true ->
- {ok, St#stream_acc{workers=Workers1}};
- false ->
- {stop, St#stream_acc{workers=Workers1}}
+ case fabric_ring:handle_response(Worker, From, Workers, Ready, RingOpts) of
+ {ok, {Workers1, Ready1}} ->
+ % Don't have a full ring yet. Keep getting responses
+ {ok, St#stream_acc{workers = Workers1, ready = Ready1}};
+ {stop, Ready1} ->
+ % Have a full ring of workers. But don't ack the worker
+ % yet so they don't start sending us rows until we're ready
+ {stop, St#stream_acc{workers = [], ready = Ready1}}
end
end;
handle_stream_start({ok, ddoc_updated}, _, St) ->
- cleanup(St#stream_acc.workers),
+ WaitingWorkers = [W || {W, _} <- St#stream_acc.workers],
+ ReadyWorkers = [W || {W, _} <- St#stream_acc.ready],
+ cleanup(WaitingWorkers ++ ReadyWorkers),
{stop, ddoc_updated};
handle_stream_start(Else, _, _) ->
@@ -199,7 +217,7 @@ should_clean_workers() ->
Ref = erlang:monitor(process, Cleaner),
Coord ! die,
receive {'DOWN', Ref, _, Cleaner, _} -> ok end,
- ?assertEqual(2, meck:num_calls(rexi, kill, 2))
+ ?assertEqual(1, meck:num_calls(rexi, kill_all, 1))
end).
@@ -219,7 +237,7 @@ does_not_fire_if_cleanup_called() ->
receive {'DOWN', Ref, _, _, _} -> ok end,
% 2 calls would be from cleanup/1 function. If cleanup process fired
% too it would have been 4 calls total.
- ?assertEqual(2, meck:num_calls(rexi, kill, 2))
+ ?assertEqual(1, meck:num_calls(rexi, kill_all, 1))
end).
@@ -236,12 +254,12 @@ should_clean_additional_worker_too() ->
Ref = erlang:monitor(process, Cleaner),
Coord ! die,
receive {'DOWN', Ref, _, Cleaner, _} -> ok end,
- ?assertEqual(2, meck:num_calls(rexi, kill, 2))
+ ?assertEqual(1, meck:num_calls(rexi, kill_all, 1))
end).
setup() ->
- ok = meck:expect(rexi, kill, fun(_, _) -> ok end).
+ ok = meck:expect(rexi, kill_all, fun(_) -> ok end).
teardown(_) ->
diff --git a/src/fabric/src/fabric_util.erl b/src/fabric/src/fabric_util.erl
index 1ba1d17ea..aaf0623f0 100644
--- a/src/fabric/src/fabric_util.erl
+++ b/src/fabric/src/fabric_util.erl
@@ -22,6 +22,7 @@
-export([is_partitioned/1]).
-export([validate_all_docs_args/2, validate_args/3]).
-export([upgrade_mrargs/1]).
+-export([worker_ranges/1]).
-compile({inline, [{doc_id_and_rev,1}]}).
@@ -34,7 +35,7 @@
remove_down_workers(Workers, BadNode) ->
Filter = fun(#shard{node = Node}, _) -> Node =/= BadNode end,
NewWorkers = fabric_dict:filter(Filter, Workers),
- case fabric_view:is_progress_possible(NewWorkers) of
+ case fabric_ring:is_progress_possible(NewWorkers) of
true ->
{ok, NewWorkers};
false ->
@@ -51,7 +52,7 @@ submit_jobs(Shards, Module, EndPoint, ExtraArgs) ->
end, Shards).
cleanup(Workers) ->
- [rexi:kill(Node, Ref) || #shard{node=Node, ref=Ref} <- Workers].
+ rexi:kill_all([{Node, Ref} || #shard{node = Node, ref = Ref} <- Workers]).
recv(Workers, Keypos, Fun, Acc0) ->
rexi_utils:recv(Workers, Keypos, Fun, Acc0, request_timeout(), infinity).
@@ -334,3 +335,10 @@ upgrade_mrargs({mrargs,
sorted = Sorted,
extra = Extra
}.
+
+
+worker_ranges(Workers) ->
+ Ranges = fabric_dict:fold(fun(#shard{range=[X, Y]}, _, Acc) ->
+ [{X, Y} | Acc]
+ end, [], Workers),
+ lists:usort(Ranges).
diff --git a/src/fabric/src/fabric_view.erl b/src/fabric/src/fabric_view.erl
index 27b0c275f..55b44e6f7 100644
--- a/src/fabric/src/fabric_view.erl
+++ b/src/fabric/src/fabric_view.erl
@@ -48,61 +48,83 @@ handle_worker_exit(Collector, _Worker, Reason) ->
%% @doc looks for a fully covered keyrange in the list of counters
-spec is_progress_possible([{#shard{}, term()}]) -> boolean().
-is_progress_possible([]) ->
- false;
is_progress_possible(Counters) ->
- Ranges = fabric_dict:fold(fun(#shard{range=[X,Y]}, _, A) -> [{X,Y}|A] end,
- [], Counters),
- [{Start, Tail0} | Rest] = lists:ukeysort(1, Ranges),
- Result = lists:foldl(fun
- (_, fail) ->
- % we've already declared failure
- fail;
- (_, complete) ->
- % this is the success condition, we can fast-forward
- complete;
- ({X,_}, Tail) when X > (Tail+1) ->
- % gap in the keyrange, we're dead
- fail;
- ({_,Y}, Tail) ->
- case erlang:max(Tail, Y) of
- End when (End+1) =:= (2 bsl 31) ->
- complete;
- Else ->
- % the normal condition, adding to the tail
- Else
- end
- end, if (Tail0+1) =:= (2 bsl 31) -> complete; true -> Tail0 end, Rest),
- (Start =:= 0) andalso (Result =:= complete).
+ fabric_ring:is_progress_possible(Counters).
-spec remove_overlapping_shards(#shard{}, [{#shard{}, any()}]) ->
[{#shard{}, any()}].
-remove_overlapping_shards(#shard{range=[A,B]} = Shard0, Shards) ->
- fabric_dict:filter(fun(#shard{range=[X,Y], node=Node, ref=Ref} = Shard, _) ->
- if Shard =:= Shard0 ->
- % we can't remove ourselves
+remove_overlapping_shards(#shard{} = Shard, Counters) ->
+ remove_overlapping_shards(Shard, Counters, fun stop_worker/1).
+
+
+-spec remove_overlapping_shards(#shard{}, [{#shard{}, any()}], fun()) ->
+ [{#shard{}, any()}].
+remove_overlapping_shards(#shard{} = Shard, Counters, RemoveCb) ->
+ Counters1 = filter_exact_copies(Shard, Counters, RemoveCb),
+ filter_possible_overlaps(Shard, Counters1, RemoveCb).
+
+
+filter_possible_overlaps(Shard, Counters, RemoveCb) ->
+ Ranges0 = fabric_util:worker_ranges(Counters),
+ #shard{range = [BShard, EShard]} = Shard,
+ Ranges = Ranges0 ++ [{BShard, EShard}],
+ {Bs, Es} = lists:unzip(Ranges),
+ {MinB, MaxE} = {lists:min(Bs), lists:max(Es)},
+ % Use a custom sort function which prioritizes the given shard
+ % range when the start endpoints match.
+ SortFun = fun
+ ({B, E}, {B, _}) when {B, E} =:= {BShard, EShard} ->
+ % If start matches with the shard's start, shard always wins
true;
- A < B, X >= A, X < B ->
- % lower bound is inside our range
- rexi:kill(Node, Ref),
- false;
- A < B, Y > A, Y =< B ->
- % upper bound is inside our range
- rexi:kill(Node, Ref),
- false;
- B < A, X >= A orelse B < A, X < B ->
- % target shard wraps the key range, lower bound is inside
- rexi:kill(Node, Ref),
+ ({B, _}, {B, E}) when {B, E} =:= {BShard, EShard} ->
+ % If start matches with the shard's start, shard always wins
false;
- B < A, Y > A orelse B < A, Y =< B ->
- % target shard wraps the key range, upper bound is inside
- rexi:kill(Node, Ref),
+ ({B, E1}, {B, E2}) ->
+ % If start matches, pick the longest range first
+ E2 >= E1;
+ ({B1, _}, {B2, _}) ->
+ % Then, by default, sort by start point
+ B1 =< B2
+ end,
+ Ring = mem3_util:get_ring(Ranges, SortFun, MinB, MaxE),
+ fabric_dict:filter(fun
+ (S, _) when S =:= Shard ->
+ % Keep the original shard
+ true;
+ (#shard{range = [B, E]} = S, _) ->
+ case lists:member({B, E}, Ring) of
+ true ->
+ true; % Keep it
+ false ->
+ % Duplicate range, delete after calling callback function
+ case is_function(RemoveCb) of
+ true -> RemoveCb(S);
+ false -> ok
+ end,
+ false
+ end
+ end, Counters).
+
+
+filter_exact_copies(#shard{range = Range0} = Shard0, Shards, Cb) ->
+ fabric_dict:filter(fun
+ (Shard, _) when Shard =:= Shard0 ->
+ true; % Don't remove ourselves
+ (#shard{range = Range} = Shard, _) when Range =:= Range0 ->
+ case is_function(Cb) of
+ true -> Cb(Shard);
+ false -> ok
+ end,
false;
- true ->
+ (_, _) ->
true
- end
end, Shards).
+
+stop_worker(#shard{ref = Ref, node = Node}) ->
+ rexi:kill(Node, Ref).
+
+
maybe_send_row(#collector{limit=0} = State) ->
#collector{counters=Counters, user_acc=AccIn, callback=Callback} = State,
case fabric_dict:any(0, Counters) of
@@ -329,13 +351,15 @@ get_shards(Db, #mrargs{} = Args) ->
% request.
case {Args#mrargs.stable, Partition} of
{true, undefined} ->
- mem3:ushards(DbName);
+ {mem3:ushards(DbName), []};
{true, Partition} ->
- mem3:ushards(DbName, couch_partition:shard_key(Partition));
+ Shards = mem3:ushards(DbName, couch_partition:shard_key(Partition)),
+ {Shards, [{any, Shards}]};
{false, undefined} ->
- mem3:shards(DbName);
+ {mem3:shards(DbName), []};
{false, Partition} ->
- mem3:shards(DbName, couch_partition:shard_key(Partition))
+ Shards = mem3:shards(DbName, couch_partition:shard_key(Partition)),
+ {Shards, [{any, Shards}]}
end.
maybe_update_others(DbName, DDoc, ShardsInvolved, ViewName,
@@ -352,8 +376,9 @@ get_shard_replacements(DbName, UsedShards0) ->
% that aren't already used.
AllLiveShards = mem3:live_shards(DbName, [node() | nodes()]),
UsedShards = [S#shard{ref=undefined} || S <- UsedShards0],
- UnusedShards = AllLiveShards -- UsedShards,
+ get_shard_replacements_int(AllLiveShards -- UsedShards, UsedShards).
+get_shard_replacements_int(UnusedShards, UsedShards) ->
% If we have more than one copy of a range then we don't
% want to try and add a replacement to any copy.
RangeCounts = lists:foldl(fun(#shard{range=R}, Acc) ->
@@ -363,10 +388,10 @@ get_shard_replacements(DbName, UsedShards0) ->
% For each seq shard range with a count of 1, find any
% possible replacements from the unused shards. The
% replacement list is keyed by range.
- lists:foldl(fun(#shard{range=Range}, Acc) ->
+ lists:foldl(fun(#shard{range = [B, E] = Range}, Acc) ->
case dict:find(Range, RangeCounts) of
{ok, 1} ->
- Repls = [S || S <- UnusedShards, S#shard.range =:= Range],
+ Repls = mem3_util:non_overlapping_shards(UnusedShards, B, E),
% Only keep non-empty lists of replacements
if Repls == [] -> Acc; true ->
[{Range, Repls} | Acc]
@@ -406,26 +431,59 @@ is_progress_possible_test() ->
?assertEqual(is_progress_possible(mk_cnts(T4)),false),
% outside range
T5 = [[0,10],[11,20],[21,EndPoint]],
- ?assertEqual(is_progress_possible(mk_cnts(T5)),false).
+ ?assertEqual(is_progress_possible(mk_cnts(T5)),false),
+ T6 = [[0, 10], [11, 20], [0, 5], [6, 21], [21, EndPoint - 1]],
+ ?assertEqual(is_progress_possible(mk_cnts(T6)), true),
+ % not possible, overlap is not exact
+ T7 = [[0, 10], [13, 20], [21, EndPoint - 1], [9, 12]],
+ ?assertEqual(is_progress_possible(mk_cnts(T7)), false).
+
remove_overlapping_shards_test() ->
- meck:new(rexi),
- meck:expect(rexi, kill, fun(_, _) -> ok end),
- EndPoint = 2 bsl 31,
- T1 = [[0,10],[11,20],[21,EndPoint-1]],
- Shards = mk_cnts(T1,3),
- ?assertEqual(orddict:size(
- remove_overlapping_shards(#shard{name=list_to_atom("node-3"),
- node=list_to_atom("node-3"),
- range=[11,20]},
- Shards)),7),
- meck:unload(rexi).
+ Cb = undefined,
+
+ Shards = mk_cnts([[0, 10], [11, 20], [21, ?RING_END]], 3),
+
+ % Simple (exact) overlap
+ Shard1 = mk_shard("node-3", [11, 20]),
+ Shards1 = fabric_dict:store(Shard1, nil, Shards),
+ R1 = remove_overlapping_shards(Shard1, Shards1, Cb),
+ ?assertEqual([{0, 10}, {11, 20}, {21, ?RING_END}],
+ fabric_util:worker_ranges(R1)),
+ ?assert(fabric_dict:is_key(Shard1, R1)),
+
+ % Split overlap (shard overlap multiple workers)
+ Shard2 = mk_shard("node-3", [0, 20]),
+ Shards2 = fabric_dict:store(Shard2, nil, Shards),
+ R2 = remove_overlapping_shards(Shard2, Shards2, Cb),
+ ?assertEqual([{0, 20}, {21, ?RING_END}],
+ fabric_util:worker_ranges(R2)),
+ ?assert(fabric_dict:is_key(Shard2, R2)).
+
+
+get_shard_replacements_test() ->
+ Unused = [mk_shard(N, [B, E]) || {N, B, E} <- [
+ {"n1", 11, 20}, {"n1", 21, ?RING_END},
+ {"n2", 0, 4}, {"n2", 5, 10}, {"n2", 11, 20},
+ {"n3", 0, 21, ?RING_END}
+ ]],
+ Used = [mk_shard(N, [B, E]) || {N, B, E} <- [
+ {"n2", 21, ?RING_END},
+ {"n3", 0, 10}, {"n3", 11, 20}
+ ]],
+ Res = lists:sort(get_shard_replacements_int(Unused, Used)),
+ % Notice that the [0, 10] range can be replaced by spawning the [0, 4] and
+ % [5, 10] workers on n2
+ Expect = [
+ {[0, 10], [mk_shard("n2", [0, 4]), mk_shard("n2", [5, 10])]},
+ {[11, 20], [mk_shard("n1", [11, 20]), mk_shard("n2", [11, 20])]},
+ {[21, ?RING_END], [mk_shard("n1", [21, ?RING_END])]}
+ ],
+ ?assertEqual(Expect, Res).
+
mk_cnts(Ranges) ->
- Shards = lists:map(fun(Range) ->
- #shard{range=Range}
- end,
- Ranges),
+ Shards = lists:map(fun mk_shard/1, Ranges),
orddict:from_list([{Shard,nil} || Shard <- Shards]).
mk_cnts(Ranges, NoNodes) ->
@@ -440,6 +498,15 @@ mk_cnts(Ranges, NoNodes) ->
mk_shards(0,_Range,Shards) ->
Shards;
mk_shards(NoNodes,Range,Shards) ->
- NodeName = list_to_atom("node-" ++ integer_to_list(NoNodes)),
- mk_shards(NoNodes-1,Range,
- [#shard{name=NodeName, node=NodeName, range=Range} | Shards]).
+ Name ="node-" ++ integer_to_list(NoNodes),
+ mk_shards(NoNodes-1,Range, [mk_shard(Name, Range) | Shards]).
+
+
+mk_shard([B, E]) when is_integer(B), is_integer(E) ->
+ #shard{range = [B, E]}.
+
+
+mk_shard(Name, Range) ->
+ Node = list_to_atom(Name),
+ BName = list_to_binary(Name),
+ #shard{name = BName, node = Node, range = Range}.
diff --git a/src/fabric/src/fabric_view_all_docs.erl b/src/fabric/src/fabric_view_all_docs.erl
index 9049eaa90..1d87e3ddd 100644
--- a/src/fabric/src/fabric_view_all_docs.erl
+++ b/src/fabric/src/fabric_view_all_docs.erl
@@ -23,12 +23,12 @@
go(Db, Options, #mrargs{keys=undefined} = QueryArgs, Callback, Acc) ->
{CoordArgs, WorkerArgs} = fabric_view:fix_skip_and_limit(QueryArgs),
DbName = fabric:dbname(Db),
- Shards = shards(Db, QueryArgs),
+ {Shards, RingOpts} = shards(Db, QueryArgs),
Workers0 = fabric_util:submit_jobs(
Shards, fabric_rpc, all_docs, [Options, WorkerArgs]),
RexiMon = fabric_util:create_monitors(Workers0),
try
- case fabric_streams:start(Workers0, #shard.ref) of
+ case fabric_streams:start(Workers0, #shard.ref, RingOpts) of
{ok, Workers} ->
try
go(DbName, Options, Workers, CoordArgs, Callback, Acc)
diff --git a/src/fabric/src/fabric_view_changes.erl b/src/fabric/src/fabric_view_changes.erl
index f96bb058d..febbd3169 100644
--- a/src/fabric/src/fabric_view_changes.erl
+++ b/src/fabric/src/fabric_view_changes.erl
@@ -127,24 +127,30 @@ keep_sending_changes(DbName, Args, Callback, Seqs, AccIn, Timeout, UpListen, T0)
send_changes(DbName, ChangesArgs, Callback, PackedSeqs, AccIn, Timeout) ->
LiveNodes = [node() | nodes()],
AllLiveShards = mem3:live_shards(DbName, LiveNodes),
- Seqs = lists:flatmap(fun({#shard{name=Name, node=N} = Shard, Seq}) ->
- case lists:member(Shard, AllLiveShards) of
- true ->
- Ref = rexi:cast(N, {fabric_rpc, changes, [Name,ChangesArgs,Seq]}),
- [{Shard#shard{ref = Ref}, Seq}];
- false ->
- % Find some replacement shards to cover the missing range
- % TODO It's possible in rare cases of shard merging to end up
- % with overlapping shard ranges from this technique
- lists:map(fun(#shard{name=Name2, node=N2} = NewShard) ->
- Ref = rexi:cast(N2, {fabric_rpc, changes, [Name2, ChangesArgs,
- make_replacement_arg(N, Seq)]}),
- {NewShard#shard{ref = Ref}, 0}
- end, find_replacement_shards(Shard, AllLiveShards))
- end
- end, unpack_seqs(PackedSeqs, DbName)),
+ Seqs0 = unpack_seqs(PackedSeqs, DbName),
+ {WSeqs0, Dead, Reps} = find_replacements(Seqs0, AllLiveShards),
+ % Start workers which didn't need replacements
+ WSeqs = lists:map(fun({#shard{name = Name, node = N} = S, Seq}) ->
+ Ref = rexi:cast(N, {fabric_rpc, changes, [Name, ChangesArgs, Seq]}),
+ {S#shard{ref = Ref}, Seq}
+ end, WSeqs0),
+ % For the dead workers, check if they are the result of split shards. In that
+ % case make a replacement argument so that local rexi workers can calculate
+ % (hopefully) a > 0 update sequence.
+ {WSplitSeqs0, Reps1} = find_split_shard_replacements(Dead, Reps),
+ WSplitSeqs = lists:map(fun({#shard{name = Name, node = N} = S, Seq}) ->
+ Arg = make_replacement_arg(N, Seq),
+ Ref = rexi:cast(N, {fabric_rpc, changes, [Name, ChangesArgs, Arg]}),
+ {S#shard{ref = Ref}, Seq}
+ end, WSplitSeqs0),
+ % For ranges that were not split, start sequences from 0
+ WReps = lists:map(fun(#shard{name = Name, node = N} = S) ->
+ Ref = rexi:cast(N, {fabric_rpc, changes, [Name, ChangesArgs, 0]}),
+ {S#shard{ref = Ref}, 0}
+ end, Reps1),
+ Seqs = WSeqs ++ WSplitSeqs ++ WReps,
{Workers0, _} = lists:unzip(Seqs),
- Repls = fabric_view:get_shard_replacements(DbName, Workers0),
+ Repls = fabric_ring:get_shard_replacements(DbName, Workers0),
StartFun = fun(#shard{name=Name, node=N, range=R0}=Shard) ->
%% Find the original shard copy in the Seqs array
case lists:dropwhile(fun({S, _}) -> S#shard.range =/= R0 end, Seqs) of
@@ -386,6 +392,24 @@ seq({Seq, _Uuid, _Node}) -> Seq;
seq({Seq, _Uuid}) -> Seq;
seq(Seq) -> Seq.
+
+unpack_seq_regex_match(Packed) ->
+ NewPattern = "^\\[[0-9]+\s*,\s*\"(?<opaque>.*)\"\\]$",
+ OldPattern = "^\"?([0-9]+-)?(?<opaque>.*?)\"?$",
+ Options = [{capture, [opaque], binary}],
+ case re:run(Packed, NewPattern, Options) of
+ {match, Match} ->
+ Match;
+ nomatch ->
+ {match, Match} = re:run(Packed, OldPattern, Options),
+ Match
+ end.
+
+
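%% For orientation (opaque payloads abbreviated, not real sequences), the two
%% patterns above correspond to the packed formats exercised in the tests
%% further down in this file:
%%   NewPattern: "[23423, \"g1AAAA...\"]"                  -> captures <<"g1AAAA...">>
%%   OldPattern: "23423-g1AAAA..." (optionally quoted,
%%               CouchDB 1.2 style)                        -> captures <<"g1AAAA...">>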
+unpack_seq_decode_term(Opaque) ->
+ binary_to_term(couch_util:decodeBase64Url(Opaque)).
+
+
unpack_seqs(0, DbName) ->
fabric_dict:init(mem3:shards(DbName), 0);
@@ -396,23 +420,14 @@ unpack_seqs([_SeqNum, Opaque], DbName) -> % deprecated
do_unpack_seqs(Opaque, DbName);
unpack_seqs(Packed, DbName) ->
- NewPattern = "^\\[[0-9]+\s*,\s*\"(?<opaque>.*)\"\\]$",
- OldPattern = "^\"?([0-9]+-)?(?<opaque>.*?)\"?$",
- Options = [{capture, [opaque], binary}],
- Opaque = case re:run(Packed, NewPattern, Options) of
- {match, Match} ->
- Match;
- nomatch ->
- {match, Match} = re:run(Packed, OldPattern, Options),
- Match
- end,
+ Opaque = unpack_seq_regex_match(Packed),
do_unpack_seqs(Opaque, DbName).
do_unpack_seqs(Opaque, DbName) ->
% A preventative fix for FB 13533 to remove duplicate shards.
% This just picks each unique shard and keeps the largest seq
% value recorded.
- Decoded = binary_to_term(couch_util:decodeBase64Url(Opaque)),
+ Decoded = unpack_seq_decode_term(Opaque),
DedupDict = lists:foldl(fun({Node, [A, B], Seq}, Acc) ->
dict:append({Node, [A, B]}, Seq, Acc)
end, dict:new(), Decoded),
@@ -431,28 +446,28 @@ do_unpack_seqs(Opaque, DbName) ->
end
end, Deduped),
- % Fill holes in the since sequence. If/when we ever start
- % using overlapping shard ranges this will need to be updated
- % to not include shard ranges that overlap entries in Upacked.
- % A quick and dirty approach would be like such:
- %
- % lists:foldl(fun(S, Acc) ->
- % fabric_view:remove_overlapping_shards(S, Acc)
- % end, mem3:shards(DbName), Unpacked)
- %
- % Unfortunately remove_overlapping_shards isn't reusable because
- % of its calls to rexi:kill/2. When we get to overlapping
- % shard ranges and have to rewrite shard range management
- % we can revisit this simpler algorithm.
- case fabric_view:is_progress_possible(Unpacked) of
+ % If the ring in the unpacked sequence received is not complete, try to
+ % fill in the missing ranges with shards from the shard map
+ case fabric_ring:is_progress_possible(Unpacked) of
true ->
Unpacked;
false ->
- Extract = fun({Shard, _Seq}) -> Shard#shard.range end,
- Ranges = lists:usort(lists:map(Extract, Unpacked)),
- Filter = fun(S) -> not lists:member(S#shard.range, Ranges) end,
- Replacements = lists:filter(Filter, mem3:shards(DbName)),
- Unpacked ++ [{R, get_old_seq(R, Deduped)} || R <- Replacements]
+ PotentialWorkers = lists:map(fun({Node, [A, B], Seq}) ->
+ case mem3:get_shard(DbName, Node, [A, B]) of
+ {ok, Shard} ->
+ {Shard, Seq};
+ {error, not_found} ->
+ {#shard{node = Node, range = [A, B]}, Seq}
+ end
+ end, Deduped),
+ Shards = mem3:shards(DbName),
+ {Unpacked1, Dead, Reps} = find_replacements(PotentialWorkers, Shards),
+ {Splits, Reps1} = find_split_shard_replacements(Dead, Reps),
+ RepSeqs = lists:map(fun(#shard{} = S) ->
+ {S, get_old_seq(S, Deduped)}
+ end, Reps1),
+ Unpacked1 ++ Splits ++ RepSeqs
end.
@@ -491,18 +506,119 @@ changes_row(Props0) ->
Props2 = lists:filter(fun({K,_V}) -> lists:member(K, Allowed) end, Props1),
{change, {Props2}}.
-find_replacement_shards(#shard{range=Range}, AllShards) ->
- % TODO make this moar betta -- we might have split or merged the partition
- [Shard || Shard <- AllShards, Shard#shard.range =:= Range].
+
+find_replacements(Workers, AllShards) ->
+ % Build map [B, E] => [Worker1, Worker2, ...] for all workers
+ WrkMap = lists:foldl(fun({#shard{range = [B, E]}, _} = W, Acc) ->
+ maps:update_with({B, E}, fun(Ws) -> [W | Ws] end, [W], Acc)
+ end, #{}, fabric_dict:to_list(Workers)),
+
+ % Build map [B, E] => [Shard1, Shard2, ...] for all shards
+ AllMap = lists:foldl(fun(#shard{range = [B, E]} = S, Acc) ->
+ maps:update_with({B, E}, fun(Ss) -> [S | Ss] end, [S], Acc)
+ end, #{}, AllShards),
+
+ % The custom sort function prioritizes worker ranges over other shards.
+ % The idea is to avoid killing workers unnecessarily
+ SortFun = fun
+ (R1 = {B, E1}, R2 = {B, E2}) ->
+ case {maps:is_key(R1, WrkMap), maps:is_key(R2, WrkMap)} of
+ {true, true} ->
+ % Both are workers, larger interval wins
+ E1 >= E2;
+ {true, false} ->
+ % First element is a worker range, it wins
+ true;
+ {false, true} ->
+ % Second element is a worker range, it wins
+ false;
+ {false, false} ->
+ % Neither one is a worker interval, pick larger one
+ E1 >= E2
+ end;
+ ({B1, _}, {B2, _}) ->
+ B1 =< B2
+ end,
+ Ring = mem3_util:get_ring(maps:keys(AllMap), SortFun),
+
+ % Keep only workers in the ring and from one of the available nodes
+ Keep = fun(#shard{range = [B, E], node = N}) ->
+ lists:member({B, E}, Ring) andalso lists:keyfind(N, #shard.node,
+ maps:get({B, E}, AllMap)) =/= false
+ end,
+ Workers1 = fabric_dict:filter(fun(S, _) -> Keep(S) end, Workers),
+ Removed = fabric_dict:filter(fun(S, _) -> not Keep(S) end, Workers),
+
+ {Rep, _} = lists:foldl(fun(R, {RepAcc, AllMapAcc}) ->
+ case maps:is_key(R, WrkMap) of
+ true ->
+ % It's a worker and it's in the map of available shards. Keep it
+ % only if its range is still available on that node (reuse the
+ % Keep/1 predicate from above)
+ WorkersInRange = maps:get(R, WrkMap),
+ case lists:any(fun({S, _}) -> Keep(S) end, WorkersInRange) of
+ true ->
+ {RepAcc, AllMapAcc};
+ false ->
+ [Shard | Rest] = maps:get(R, AllMapAcc),
+ {[Shard | RepAcc], AllMapAcc#{R := Rest}}
+ end;
+ false ->
+ % No worker for this range. Replace from available shards
+ [Shard | Rest] = maps:get(R, AllMapAcc),
+ {[Shard | RepAcc], AllMapAcc#{R := Rest}}
+ end
+ end, {[], AllMap}, Ring),
+
+ % Return the list of workers that are part of the ring, the list of removed
+ % workers, and a list of replacement shards that could be used to make sure
+ % the ring completes.
+ {Workers1, Removed, Rep}.
+
+
+% From the list of dead workers determine if any are the result of a split
+% shard. In that case it may be possible to avoid rewinding the changes feed
+% back to 0. Returns {NewWorkers, Available} where NewWorkers is the list of
+% viable workers and Available is the list of still unused input Shards
+find_split_shard_replacements(DeadWorkers, Shards) ->
+ Acc0 = {[], Shards},
+ AccF = fabric_dict:fold(fun(#shard{node = WN, range = R}, Seq, Acc) ->
+ [B, E] = R,
+ {SplitWorkers, Available} = Acc,
+ ShardsOnSameNode = [S || #shard{node = N} = S <- Available, N =:= WN],
+ SplitShards = mem3_util:non_overlapping_shards(ShardsOnSameNode, B, E),
+ RepCount = length(SplitShards),
+ NewWorkers = [{S, make_split_seq(Seq, RepCount)} || S <- SplitShards],
+ NewAvailable = [S || S <- Available, not lists:member(S, SplitShards)],
+ {NewWorkers ++ SplitWorkers, NewAvailable}
+ end, Acc0, DeadWorkers),
+ {Workers, Available} = AccF,
+ {fabric_dict:from_list(Workers), Available}.
+
+
+make_split_seq({Num, Uuid, Node}, RepCount) when RepCount > 1 ->
+ {Num, {split, Uuid}, Node};
+make_split_seq(Seq, _) ->
+ Seq.
+
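%% A hedged reading of the {split, Uuid} marker above: when RepCount > 1 the
%% original range was covered by several smaller shards, so the uuid in the
%% sequence can no longer name the exact shard the worker runs on; tagging it
%% as {split, Uuid} presumably lets the receiving side treat Uuid as the
%% pre-split source when computing a starting sequence, while a single
%% replacement passes the sequence through unchanged. (Inferred from
%% make_split_seq/2 and the find_split_shard_replacements/2 comment; the
%% fabric_rpc side is outside this hunk.)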
validate_start_seq(_DbName, "now") ->
ok;
-validate_start_seq(DbName, Seq) ->
- try unpack_seqs(Seq, DbName) of _Any ->
+validate_start_seq(_DbName, 0) ->
+ ok;
+validate_start_seq(_DbName, "0") ->
+ ok;
+validate_start_seq(_DbName, Seq) ->
+ try
+ case Seq of
+ [_SeqNum, Opaque] ->
+ unpack_seq_decode_term(Opaque);
+ Seq ->
+ Opaque = unpack_seq_regex_match(Seq),
+ unpack_seq_decode_term(Opaque)
+ end,
ok
catch
- error:database_does_not_exist ->
- {error, database_does_not_exist};
_:_ ->
Reason = <<"Malformed sequence supplied in 'since' parameter.">>,
{error, {bad_request, Reason}}
@@ -520,51 +636,185 @@ get_changes_epoch() ->
increment_changes_epoch() ->
application:set_env(fabric, changes_epoch, os:timestamp()).
-unpack_seqs_test() ->
+
+unpack_seq_setup() ->
meck:new(mem3),
meck:new(fabric_view),
meck:expect(mem3, get_shard, fun(_, _, _) -> {ok, #shard{}} end),
- meck:expect(fabric_view, is_progress_possible, fun(_) -> true end),
-
- % BigCouch 0.3 style.
- assert_shards("23423-g1AAAAE7eJzLYWBg4MhgTmHgS0ktM3QwND"
- "LXMwBCwxygOFMiQ5L8____sxIZcKlIUgCSSfZgRUw4FTmAFMWDFTHiVJQAUlSPX1Ee"
- "C5BkaABSQHXzsxKZ8StcAFG4H4_bIAoPQBTeJ2j1A4hCUJBkAQC7U1NA"),
-
- % BigCouch 0.4 style.
- assert_shards([23423,<<"g1AAAAE7eJzLYWBg4MhgTmHgS0ktM3QwND"
- "LXMwBCwxygOFMiQ5L8____sxIZcKlIUgCSSfZgRUw4FTmAFMWDFTHiVJQAUlSPX1Ee"
- "C5BkaABSQHXzsxKZ8StcAFG4H4_bIAoPQBTeJ2j1A4hCUJBkAQC7U1NA">>]),
-
- % BigCouch 0.4 style (as string).
- assert_shards("[23423,\"g1AAAAE7eJzLYWBg4MhgTmHgS0ktM3QwND"
- "LXMwBCwxygOFMiQ5L8____sxIZcKlIUgCSSfZgRUw4FTmAFMWDFTHiVJQAUlSPX1Ee"
- "C5BkaABSQHXzsxKZ8StcAFG4H4_bIAoPQBTeJ2j1A4hCUJBkAQC7U1NA\"]"),
- assert_shards("[23423 ,\"g1AAAAE7eJzLYWBg4MhgTmHgS0ktM3QwND"
- "LXMwBCwxygOFMiQ5L8____sxIZcKlIUgCSSfZgRUw4FTmAFMWDFTHiVJQAUlSPX1Ee"
- "C5BkaABSQHXzsxKZ8StcAFG4H4_bIAoPQBTeJ2j1A4hCUJBkAQC7U1NA\"]"),
- assert_shards("[23423, \"g1AAAAE7eJzLYWBg4MhgTmHgS0ktM3QwND"
- "LXMwBCwxygOFMiQ5L8____sxIZcKlIUgCSSfZgRUw4FTmAFMWDFTHiVJQAUlSPX1Ee"
- "C5BkaABSQHXzsxKZ8StcAFG4H4_bIAoPQBTeJ2j1A4hCUJBkAQC7U1NA\"]"),
- assert_shards("[23423 , \"g1AAAAE7eJzLYWBg4MhgTmHgS0ktM3QwND"
- "LXMwBCwxygOFMiQ5L8____sxIZcKlIUgCSSfZgRUw4FTmAFMWDFTHiVJQAUlSPX1Ee"
- "C5BkaABSQHXzsxKZ8StcAFG4H4_bIAoPQBTeJ2j1A4hCUJBkAQC7U1NA\"]"),
-
- % with internal hypen
- assert_shards("651-g1AAAAE7eJzLYWBg4MhgTmHgS0ktM3QwNDLXMwBCwxygOFMiQ"
- "5L8____sxJTcalIUgCSSfZgReE4FTmAFMWDFYXgVJQAUlQPVuSKS1EeC5BkaABSQHXz8"
- "VgJUbgAonB_VqIPfoUHIArvE7T6AUQh0I1-WQAzp1XB"),
- assert_shards([651,"g1AAAAE7eJzLYWBg4MhgTmHgS0ktM3QwNDLXMwBCwxygOFMiQ"
- "5L8____sxJTcalIUgCSSfZgReE4FTmAFMWDFYXgVJQAUlQPVuSKS1EeC5BkaABSQHXz8"
- "VgJUbgAonB_VqIPfoUHIArvE7T6AUQh0I1-WQAzp1XB"]),
-
- % CouchDB 1.2 style
- assert_shards("\"23423-g1AAAAE7eJzLYWBg4MhgTmHgS0ktM3QwND"
- "LXMwBCwxygOFMiQ5L8____sxIZcKlIUgCSSfZgRUw4FTmAFMWDFTHiVJQAUlSPX1Ee"
- "C5BkaABSQHXzsxKZ8StcAFG4H4_bIAoPQBTeJ2j1A4hCUJBkAQC7U1NA\""),
-
- meck:unload(fabric_view),
- meck:unload(mem3).
+ meck:expect(fabric_ring, is_progress_possible, fun(_) -> true end),
+ ok.
+
+
+unpack_seqs_test_() ->
+ {
+ setup,
+ fun unpack_seq_setup/0,
+ fun (_) -> meck:unload() end,
+ [
+ t_unpack_seqs()
+ ]
+ }.
+
+
+t_unpack_seqs() ->
+ ?_test(begin
+ % BigCouch 0.3 style.
+ assert_shards("23423-g1AAAAE7eJzLYWBg4MhgTmHgS0ktM3QwND"
+ "LXMwBCwxygOFMiQ5L8____sxIZcKlIUgCSSfZgRUw4FTmAFMWDFTHiVJQAUlSPX1Ee"
+ "C5BkaABSQHXzsxKZ8StcAFG4H4_bIAoPQBTeJ2j1A4hCUJBkAQC7U1NA"),
+
+ % BigCouch 0.4 style.
+ assert_shards([23423,<<"g1AAAAE7eJzLYWBg4MhgTmHgS0ktM3QwND"
+ "LXMwBCwxygOFMiQ5L8____sxIZcKlIUgCSSfZgRUw4FTmAFMWDFTHiVJQAUlSPX1Ee"
+ "C5BkaABSQHXzsxKZ8StcAFG4H4_bIAoPQBTeJ2j1A4hCUJBkAQC7U1NA">>]),
+
+ % BigCouch 0.4 style (as string).
+ assert_shards("[23423,\"g1AAAAE7eJzLYWBg4MhgTmHgS0ktM3QwND"
+ "LXMwBCwxygOFMiQ5L8____sxIZcKlIUgCSSfZgRUw4FTmAFMWDFTHiVJQAUlSPX1Ee"
+ "C5BkaABSQHXzsxKZ8StcAFG4H4_bIAoPQBTeJ2j1A4hCUJBkAQC7U1NA\"]"),
+ assert_shards("[23423 ,\"g1AAAAE7eJzLYWBg4MhgTmHgS0ktM3QwND"
+ "LXMwBCwxygOFMiQ5L8____sxIZcKlIUgCSSfZgRUw4FTmAFMWDFTHiVJQAUlSPX1Ee"
+ "C5BkaABSQHXzsxKZ8StcAFG4H4_bIAoPQBTeJ2j1A4hCUJBkAQC7U1NA\"]"),
+ assert_shards("[23423, \"g1AAAAE7eJzLYWBg4MhgTmHgS0ktM3QwND"
+ "LXMwBCwxygOFMiQ5L8____sxIZcKlIUgCSSfZgRUw4FTmAFMWDFTHiVJQAUlSPX1Ee"
+ "C5BkaABSQHXzsxKZ8StcAFG4H4_bIAoPQBTeJ2j1A4hCUJBkAQC7U1NA\"]"),
+ assert_shards("[23423 , \"g1AAAAE7eJzLYWBg4MhgTmHgS0ktM3QwND"
+ "LXMwBCwxygOFMiQ5L8____sxIZcKlIUgCSSfZgRUw4FTmAFMWDFTHiVJQAUlSPX1Ee"
+ "C5BkaABSQHXzsxKZ8StcAFG4H4_bIAoPQBTeJ2j1A4hCUJBkAQC7U1NA\"]"),
+
+ % with internal hyphen
+ assert_shards("651-g1AAAAE7eJzLYWBg4MhgTmHgS0ktM3QwNDLXMwBCwxygOFMiQ"
+ "5L8____sxJTcalIUgCSSfZgReE4FTmAFMWDFYXgVJQAUlQPVuSKS1EeC5BkaABSQHXz8"
+ "VgJUbgAonB_VqIPfoUHIArvE7T6AUQh0I1-WQAzp1XB"),
+ assert_shards([651,"g1AAAAE7eJzLYWBg4MhgTmHgS0ktM3QwNDLXMwBCwxygOFMiQ"
+ "5L8____sxJTcalIUgCSSfZgReE4FTmAFMWDFYXgVJQAUlQPVuSKS1EeC5BkaABSQHXz8"
+ "VgJUbgAonB_VqIPfoUHIArvE7T6AUQh0I1-WQAzp1XB"]),
+
+ % CouchDB 1.2 style
+ assert_shards("\"23423-g1AAAAE7eJzLYWBg4MhgTmHgS0ktM3QwND"
+ "LXMwBCwxygOFMiQ5L8____sxIZcKlIUgCSSfZgRUw4FTmAFMWDFTHiVJQAUlSPX1Ee"
+ "C5BkaABSQHXzsxKZ8StcAFG4H4_bIAoPQBTeJ2j1A4hCUJBkAQC7U1NA\"")
+ end).
+
assert_shards(Packed) ->
?assertMatch([{#shard{},_}|_], unpack_seqs(Packed, <<"foo">>)).
+
+
+find_replacements_test() ->
+ % None of the workers are in the live list of shards but there is a
+ % replacement on n3 for the full range. It should get picked instead of
+ % the two smaller ones on n2.
+ Workers1 = mk_workers([{"n1", 0, 10}, {"n2", 11, ?RING_END}]),
+ AllShards1 = [
+ mk_shard("n1", 11, ?RING_END),
+ mk_shard("n2", 0, 4),
+ mk_shard("n2", 5, 10),
+ mk_shard("n3", 0, ?RING_END)
+ ],
+ {WorkersRes1, Dead1, Reps1} = find_replacements(Workers1, AllShards1),
+ ?assertEqual([], WorkersRes1),
+ ?assertEqual(Workers1, Dead1),
+ ?assertEqual([mk_shard("n3", 0, ?RING_END)], Reps1),
+
+ % None of the workers are in the live list of shards and there is a
+ % split replacement from n2 (range [0, 10] replaced with [0, 4], [5, 10])
+ Workers2 = mk_workers([{"n1", 0, 10}, {"n2", 11, ?RING_END}]),
+ AllShards2 = [
+ mk_shard("n1", 11, ?RING_END),
+ mk_shard("n2", 0, 4),
+ mk_shard("n2", 5, 10)
+ ],
+ {WorkersRes2, Dead2, Reps2} = find_replacements(Workers2, AllShards2),
+ ?assertEqual([], WorkersRes2),
+ ?assertEqual(Workers2, Dead2),
+ ?assertEqual([
+ mk_shard("n1", 11, ?RING_END),
+ mk_shard("n2", 0, 4),
+ mk_shard("n2", 5, 10)
+ ], lists:sort(Reps2)),
+
+ % One worker is available and one needs to be replaced. Replacement will be
+ % from two split shards
+ Workers3 = mk_workers([{"n1", 0, 10}, {"n2", 11, ?RING_END}]),
+ AllShards3 = [
+ mk_shard("n1", 11, ?RING_END),
+ mk_shard("n2", 0, 4),
+ mk_shard("n2", 5, 10),
+ mk_shard("n2", 11, ?RING_END)
+ ],
+ {WorkersRes3, Dead3, Reps3} = find_replacements(Workers3, AllShards3),
+ ?assertEqual(mk_workers([{"n2", 11, ?RING_END}]), WorkersRes3),
+ ?assertEqual(mk_workers([{"n1", 0, 10}]), Dead3),
+ ?assertEqual([
+ mk_shard("n2", 0, 4),
+ mk_shard("n2", 5, 10)
+ ], lists:sort(Reps3)),
+
+ % All workers are available. Make sure they are not killed even if there is
+ % a longer (single) shard to replace them.
+ Workers4 = mk_workers([{"n1", 0, 10}, {"n1", 11, ?RING_END}]),
+ AllShards4 = [
+ mk_shard("n1", 0, 10),
+ mk_shard("n1", 11, ?RING_END),
+ mk_shard("n2", 0, 4),
+ mk_shard("n2", 5, 10),
+ mk_shard("n3", 0, ?RING_END)
+ ],
+ {WorkersRes4, Dead4, Reps4} = find_replacements(Workers4, AllShards4),
+ ?assertEqual(Workers4, WorkersRes4),
+ ?assertEqual([], Dead4),
+ ?assertEqual([], Reps4).
+
+
+mk_workers(NodesRanges) ->
+ mk_workers(NodesRanges, nil).
+
+mk_workers(NodesRanges, Val) ->
+ orddict:from_list([{mk_shard(N, B, E), Val} || {N, B, E} <- NodesRanges]).
+
+
+mk_shard(Name, B, E) ->
+ Node = list_to_atom(Name),
+ BName = list_to_binary(Name),
+ #shard{name = BName, node = Node, range = [B, E]}.
+
+
+find_split_shard_replacements_test() ->
+ % One worker can be replaced and one can't
+ Dead1 = mk_workers([{"n1", 0, 10}, {"n2", 11, ?RING_END}], 42),
+ Shards1 = [
+ mk_shard("n1", 0, 4),
+ mk_shard("n1", 5, 10),
+ mk_shard("n3", 11, ?RING_END)
+ ],
+ {Workers1, ShardsLeft1} = find_split_shard_replacements(Dead1, Shards1),
+ ?assertEqual(mk_workers([{"n1", 0, 4}, {"n1", 5, 10}], 42), Workers1),
+ ?assertEqual([mk_shard("n3", 11, ?RING_END)], ShardsLeft1),
+
+ % All workers can be replaced - one by 1 shard, another by 3 smaller shards
+ Dead2 = mk_workers([{"n1", 0, 10}, {"n2", 11, ?RING_END}], 42),
+ Shards2 = [
+ mk_shard("n1", 0, 10),
+ mk_shard("n2", 11, 12),
+ mk_shard("n2", 13, 14),
+ mk_shard("n2", 15, ?RING_END)
+ ],
+ {Workers2, ShardsLeft2} = find_split_shard_replacements(Dead2, Shards2),
+ ?assertEqual(mk_workers([
+ {"n1", 0, 10},
+ {"n2", 11, 12},
+ {"n2", 13, 14},
+ {"n2", 15, ?RING_END}
+ ], 42), Workers2),
+ ?assertEqual([], ShardsLeft2),
+
+ % No workers can be replaced. Ranges match but they are on different nodes
+ Dead3 = mk_workers([{"n1", 0, 10}, {"n2", 11, ?RING_END}], 42),
+ Shards3 = [
+ mk_shard("n2", 0, 10),
+ mk_shard("n3", 11, ?RING_END)
+ ],
+ {Workers3, ShardsLeft3} = find_split_shard_replacements(Dead3, Shards3),
+ ?assertEqual([], Workers3),
+ ?assertEqual(Shards3, ShardsLeft3).
diff --git a/src/fabric/src/fabric_view_map.erl b/src/fabric/src/fabric_view_map.erl
index 1bbc6d201..5a5cc138b 100644
--- a/src/fabric/src/fabric_view_map.erl
+++ b/src/fabric/src/fabric_view_map.erl
@@ -26,11 +26,11 @@ go(DbName, Options, GroupId, View, Args, Callback, Acc, VInfo)
go(Db, Options, DDoc, View, Args, Callback, Acc, VInfo) ->
DbName = fabric:dbname(Db),
- Shards = fabric_view:get_shards(Db, Args),
+ {Shards, RingOpts} = fabric_view:get_shards(Db, Args),
{CoordArgs, WorkerArgs} = fabric_view:fix_skip_and_limit(Args),
DocIdAndRev = fabric_util:doc_id_and_rev(DDoc),
fabric_view:maybe_update_others(DbName, DocIdAndRev, Shards, View, Args),
- Repls = fabric_view:get_shard_replacements(DbName, Shards),
+ Repls = fabric_ring:get_shard_replacements(DbName, Shards),
RPCArgs = [DocIdAndRev, View, WorkerArgs, Options],
StartFun = fun(Shard) ->
hd(fabric_util:submit_jobs([Shard], fabric_rpc, map_view, RPCArgs))
@@ -38,7 +38,8 @@ go(Db, Options, DDoc, View, Args, Callback, Acc, VInfo) ->
Workers0 = fabric_util:submit_jobs(Shards, fabric_rpc, map_view, RPCArgs),
RexiMon = fabric_util:create_monitors(Workers0),
try
- case fabric_streams:start(Workers0, #shard.ref, StartFun, Repls) of
+ case fabric_streams:start(Workers0, #shard.ref, StartFun, Repls,
+ RingOpts) of
{ok, ddoc_updated} ->
Callback({error, ddoc_updated}, Acc);
{ok, Workers} ->
diff --git a/src/fabric/src/fabric_view_reduce.erl b/src/fabric/src/fabric_view_reduce.erl
index b157c37e5..a432b2cd5 100644
--- a/src/fabric/src/fabric_view_reduce.erl
+++ b/src/fabric/src/fabric_view_reduce.erl
@@ -25,19 +25,20 @@ go(DbName, GroupId, View, Args, Callback, Acc0, VInfo) when is_binary(GroupId) -
go(Db, DDoc, VName, Args, Callback, Acc, VInfo) ->
DbName = fabric:dbname(Db),
- Shards = fabric_view:get_shards(Db, Args),
+ {Shards, RingOpts} = fabric_view:get_shards(Db, Args),
{CoordArgs, WorkerArgs} = fabric_view:fix_skip_and_limit(Args),
DocIdAndRev = fabric_util:doc_id_and_rev(DDoc),
RPCArgs = [DocIdAndRev, VName, WorkerArgs],
fabric_view:maybe_update_others(DbName, DocIdAndRev, Shards, VName, Args),
- Repls = fabric_view:get_shard_replacements(DbName, Shards),
+ Repls = fabric_ring:get_shard_replacements(DbName, Shards),
StartFun = fun(Shard) ->
hd(fabric_util:submit_jobs([Shard], fabric_rpc, reduce_view, RPCArgs))
end,
Workers0 = fabric_util:submit_jobs(Shards,fabric_rpc,reduce_view,RPCArgs),
RexiMon = fabric_util:create_monitors(Workers0),
try
- case fabric_streams:start(Workers0, #shard.ref, StartFun, Repls) of
+ case fabric_streams:start(Workers0, #shard.ref, StartFun, Repls,
+ RingOpts) of
{ok, ddoc_updated} ->
Callback({error, ddoc_updated}, Acc);
{ok, Workers} ->
diff --git a/src/mem3/include/mem3.hrl b/src/mem3/include/mem3.hrl
index 9be8a7f99..d97b25469 100644
--- a/src/mem3/include/mem3.hrl
+++ b/src/mem3/include/mem3.hrl
@@ -10,6 +10,11 @@
% License for the specific language governing permissions and limitations under
% the License.
+
+% The last element in the ring
+-define(RING_END, 2 bsl 31 - 1).
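%% For reference, the arithmetic behind the define:
%%   2 bsl 31     =:= 1 bsl 32 =:= 4294967296
%%   2 bsl 31 - 1 =:= 4294967295 =:= 16#ffffffff
%% i.e. ?RING_END is the familiar "ffffffff" upper bound of a full shard range.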
+
+
% type specification hacked to suppress dialyzer warning re: match spec
-record(shard, {
name :: binary() | '_' | 'undefined',
diff --git a/src/mem3/src/mem3.erl b/src/mem3/src/mem3.erl
index dea0c7a5b..dc666fdae 100644
--- a/src/mem3/src/mem3.erl
+++ b/src/mem3/src/mem3.erl
@@ -150,7 +150,8 @@ ushards(DbName, Shards0, ZoneMap) ->
% but sort each zone separately to ensure a consistent choice between
% nodes in the same zone.
Shards = choose_ushards(DbName, L ++ S) ++ choose_ushards(DbName, D),
- lists:ukeysort(#shard.range, Shards).
+ OverlappedShards = lists:ukeysort(#shard.range, Shards),
+ mem3_util:non_overlapping_shards(OverlappedShards).
get_shard(DbName, Node, Range) ->
mem3_shards:get(DbName, Node, Range).
diff --git a/src/mem3/src/mem3_httpd.erl b/src/mem3/src/mem3_httpd.erl
index c922141b1..3df7e1876 100644
--- a/src/mem3/src/mem3_httpd.erl
+++ b/src/mem3/src/mem3_httpd.erl
@@ -77,7 +77,7 @@ json_shards([#shard{node=Node, range=[B,E]} | Rest], AccIn) ->
json_shards(Rest, dict:append(Range, Node, AccIn)).
sync_shard(ShardName) ->
- Shards = mem3_shards:for_shard_name(ShardName),
+ Shards = mem3_shards:for_shard_range(ShardName),
[rpc:call(S1#shard.node, mem3_sync, push, [S1, S2#shard.node]) ||
S1 <- Shards, S2 <- Shards, S1 =/= S2],
ok.
diff --git a/src/mem3/src/mem3_shards.erl b/src/mem3/src/mem3_shards.erl
index 6afc22f57..dfa40c338 100644
--- a/src/mem3/src/mem3_shards.erl
+++ b/src/mem3/src/mem3_shards.erl
@@ -21,7 +21,7 @@
-export([start_link/0]).
-export([for_db/1, for_db/2, for_docid/2, for_docid/3, get/3, local/1, fold/2]).
--export([for_shard_name/1]).
+-export([for_shard_range/1]).
-export([set_max_size/1]).
-export([get_changes_pid/0]).
@@ -95,36 +95,34 @@ for_docid(DbName, DocId, Options) ->
false -> mem3_util:downcast(Shards)
end.
-for_shard_name(ShardName) ->
- for_shard_name(ShardName, []).
-
-for_shard_name(ShardName, Options) ->
+for_shard_range(ShardName) ->
DbName = mem3:dbname(ShardName),
+ [B, E] = mem3:range(ShardName),
ShardHead = #shard{
- name = ShardName,
dbname = DbName,
+ range = ['$1', '$2'],
_ = '_'
},
OrderedShardHead = #ordered_shard{
- name = ShardName,
dbname = DbName,
+ range = ['$1', '$2'],
_ = '_'
},
- ShardSpec = {ShardHead, [], ['$_']},
- OrderedShardSpec = {OrderedShardHead, [], ['$_']},
+ % see mem3_util:range_overlap/2 for an explanation of how it works
+ Conditions = [{'=<', '$1', E}, {'=<', B, '$2'}],
+ ShardSpec = {ShardHead, Conditions, ['$_']},
+ OrderedShardSpec = {OrderedShardHead, Conditions, ['$_']},
Shards = try ets:select(?SHARDS, [ShardSpec, OrderedShardSpec]) of
[] ->
- filter_shards_by_name(ShardName, load_shards_from_disk(DbName));
+ filter_shards_by_range([B, E], load_shards_from_disk(DbName));
Else ->
gen_server:cast(?MODULE, {cache_hit, DbName}),
Else
catch error:badarg ->
- filter_shards_by_name(ShardName, load_shards_from_disk(DbName))
+ filter_shards_by_range([B, E], load_shards_from_disk(DbName))
end,
- case lists:member(ordered, Options) of
- true -> Shards;
- false -> mem3_util:downcast(Shards)
- end.
+ mem3_util:downcast(Shards).
+
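%% Note on the select conditions above: with range = ['$1', '$2'] the guards
%% [{'=<', '$1', E}, {'=<', B, '$2'}] keep exactly the rows where
%% '$1' =< E andalso B =< '$2', which is mem3_util:range_overlap([B, E],
%% ['$1', '$2']) written as an ets match-spec guard. The
%% filter_shards_by_range/2 fallback further down applies the same predicate
%% in plain Erlang when the cache can't be used.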
get(DbName, Node, Range) ->
Res = lists:foldl(fun(#shard{node=N, range=R}=S, Acc) ->
@@ -509,17 +507,12 @@ flush_write(DbName, Writer, WriteTimeout) ->
erlang:exit({mem3_shards_write_timeout, DbName})
end.
-filter_shards_by_name(Name, Shards) ->
- filter_shards_by_name(Name, [], Shards).
-
-filter_shards_by_name(_, Matches, []) ->
- Matches;
-filter_shards_by_name(Name, Matches, [#ordered_shard{name=Name}=S|Ss]) ->
- filter_shards_by_name(Name, [S|Matches], Ss);
-filter_shards_by_name(Name, Matches, [#shard{name=Name}=S|Ss]) ->
- filter_shards_by_name(Name, [S|Matches], Ss);
-filter_shards_by_name(Name, Matches, [_|Ss]) ->
- filter_shards_by_name(Name, Matches, Ss).
+
+filter_shards_by_range(Range, Shards)->
+ lists:filter(fun
+ (#ordered_shard{range = R}) -> mem3_util:range_overlap(Range, R);
+ (#shard{range = R}) -> mem3_util:range_overlap(Range, R)
+ end, Shards).
-ifdef(TEST).
diff --git a/src/mem3/src/mem3_sync_event_listener.erl b/src/mem3/src/mem3_sync_event_listener.erl
index 56ffe3d07..e3368e23f 100644
--- a/src/mem3/src/mem3_sync_event_listener.erl
+++ b/src/mem3/src/mem3_sync_event_listener.erl
@@ -192,7 +192,7 @@ maybe_push_shards(St) ->
end.
push_shard(ShardName) ->
- try mem3_shards:for_shard_name(ShardName) of
+ try mem3_shards:for_shard_range(ShardName) of
Shards ->
Live = nodes(),
lists:foreach(
diff --git a/src/mem3/src/mem3_util.erl b/src/mem3/src/mem3_util.erl
index b44ca2332..e8cba5d7b 100644
--- a/src/mem3/src/mem3_util.erl
+++ b/src/mem3/src/mem3_util.erl
@@ -17,7 +17,18 @@
shard_info/1, ensure_exists/1, open_db_doc/1]).
-export([is_deleted/1, rotate_list/2]).
-export([
- iso8601_timestamp/0
+ iso8601_timestamp/0,
+ live_nodes/0,
+ replicate_dbs_to_all_nodes/1,
+ replicate_dbs_from_all_nodes/1,
+ range_overlap/2,
+ get_ring/1,
+ get_ring/2,
+ get_ring/3,
+ get_ring/4,
+ non_overlapping_shards/1,
+ non_overlapping_shards/3,
+ calculate_max_n/1
]).
%% do not use outside mem3.
@@ -286,3 +297,272 @@ iso8601_timestamp() ->
{{Year,Month,Date},{Hour,Minute,Second}} = calendar:now_to_datetime(Now),
Format = "~4.10.0B-~2.10.0B-~2.10.0BT~2.10.0B:~2.10.0B:~2.10.0B.~6.10.0BZ",
io_lib:format(Format, [Year, Month, Date, Hour, Minute, Second, Micro]).
+
+
+live_nodes() ->
+ LiveNodes = [node() | nodes()],
+ Mem3Nodes = lists:sort(mem3:nodes()),
+ [N || N <- Mem3Nodes, lists:member(N, LiveNodes)].
+
+
+% Replicate "dbs" db to all nodes. Basically push the changes to all the live
+% mem3:nodes(). Returns only after all current changes have been replicated,
+% which could be a while.
+%
+replicate_dbs_to_all_nodes(Timeout) ->
+ DbName = ?l2b(config:get("mem3", "shards_db", "_dbs")),
+ Targets = mem3_util:live_nodes() -- [node()],
+ Res = [start_replication(node(), T, DbName, Timeout) || T <- Targets],
+ collect_replication_results(Res, Timeout).
+
+
+% Replicate "dbs" db from all nodes to this node. Basically make an rpc call
+% to all the nodes and have them push their changes to this node. Then monitor
+% them until they are all done.
+%
+replicate_dbs_from_all_nodes(Timeout) ->
+ DbName = ?l2b(config:get("mem3", "shards_db", "_dbs")),
+ Sources = mem3_util:live_nodes() -- [node()],
+ Res = [start_replication(S, node(), DbName, Timeout) || S <- Sources],
+ collect_replication_results(Res, Timeout).
+
+
+% Spawn and monitor a single replication of a database to a target node.
+% Returns a {Pid, Ref} pair from spawn_monitor/1. It could be called locally or remotely from
+% mem3_rpc, for instance when replicating other nodes' data to this node.
+%
+start_replication(Source, Target, DbName, Timeout) ->
+ spawn_monitor(fun() ->
+ case mem3_rpc:replicate(Source, Target, DbName, Timeout) of
+ {ok, 0} ->
+ exit(ok);
+ Other ->
+ exit(Other)
+ end
+ end).
+
+
+collect_replication_results(Replications, Timeout) ->
+ Res = [collect_replication_result(R, Timeout) || R <- Replications],
+ case [R || R <- Res, R =/= ok] of
+ [] ->
+ ok;
+ Errors ->
+ {error, Errors}
+ end.
+
+
+collect_replication_result({Pid, Ref}, Timeout) when is_pid(Pid) ->
+ receive
+ {'DOWN', Ref, _, _, Res} ->
+ Res
+ after Timeout ->
+ demonitor(Pid, [flush]),
+ exit(Pid, kill),
+ {error, {timeout, Timeout, node(Pid)}}
+ end;
+
+collect_replication_result(Error, _) ->
+ {error, Error}.
+
+
+% Consider these cases:
+%
+%           A-------B
+%
+% overlap:
+%               X--------Y
+%             X-Y
+%       X-------Y
+%       X-------------------Y
+%
+% no overlap:
+%    X-Y                        because A !=< Y
+%                         X-Y   because X !=< B
+%
+range_overlap([A, B], [X, Y]) when
+ is_integer(A), is_integer(B),
+ is_integer(X), is_integer(Y),
+ A =< B, X =< Y ->
+ A =< Y andalso X =< B.
+
+
+non_overlapping_shards(Shards) ->
+ {Start, End} = lists:foldl(fun(Shard, {Min, Max}) ->
+ [B, E] = mem3:range(Shard),
+ {min(B, Min), max(E, Max)}
+ end, {0, ?RING_END}, Shards),
+ non_overlapping_shards(Shards, Start, End).
+
+
+non_overlapping_shards([], _, _) ->
+ [];
+
+non_overlapping_shards(Shards, Start, End) ->
+ Ranges = lists:map(fun(Shard) ->
+ [B, E] = mem3:range(Shard),
+ {B, E}
+ end, Shards),
+ Ring = get_ring(Ranges, fun sort_ranges_fun/2, Start, End),
+ lists:filter(fun(Shard) ->
+ [B, E] = mem3:range(Shard),
+ lists:member({B, E}, Ring)
+ end, Shards).
+
+
+% Given a list of shards, return the maximum number of copies
+% across all the ranges. If the ring is incomplete it will return 0.
+% If it is an n = 1 database, it should return 1, etc.
+calculate_max_n(Shards) ->
+ Ranges = lists:map(fun(Shard) ->
+ [B, E] = mem3:range(Shard),
+ {B, E}
+ end, Shards),
+ calculate_max_n(Ranges, get_ring(Ranges), 0).
+
+
+calculate_max_n(_Ranges, [], N) ->
+ N;
+
+calculate_max_n(Ranges, Ring, N) ->
+ NewRanges = Ranges -- Ring,
+ calculate_max_n(NewRanges, get_ring(NewRanges), N + 1).
+
+
+get_ring(Ranges) ->
+ get_ring(Ranges, fun sort_ranges_fun/2, 0, ?RING_END).
+
+
+get_ring(Ranges, SortFun) when is_function(SortFun, 2) ->
+ get_ring(Ranges, SortFun, 0, ?RING_END).
+
+
+get_ring(Ranges, Start, End) when is_integer(Start), is_integer(End),
+ Start >= 0, End >= 0, Start =< End ->
+ get_ring(Ranges, fun sort_ranges_fun/2, Start, End).
+
+% Build a ring out of a list of possibly overlapping ranges. If a ring cannot
+% be built then [] is returned. Start and End supply a custom range such that
+% only intervals in that range will be considered. SortFun is a custom sorting
+% function to sort intervals before the ring is built. The custom sort function
+% can be used to prioritize how the ring is built, for example, whether to use
+% shortest ranges first (and thus have more total shards), longer ranges
+% first, or any other scheme.
+%
+get_ring([], _SortFun, _Start, _End) ->
+ [];
+get_ring(Ranges, SortFun, Start, End) when is_function(SortFun, 2),
+ is_integer(Start), is_integer(End),
+ Start >= 0, End >= 0, Start =< End ->
+ Sorted = lists:usort(SortFun, Ranges),
+ case get_subring_int(Start, End, Sorted) of
+ fail -> [];
+ Ring -> Ring
+ end.
+
+
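%% A rough sketch (made-up ranges, throwaway test name) of how the default
%% sort prefers longer ranges when building the ring; assumes the eunit
%% include and ?RING_END defined elsewhere in this file:
get_ring_preference_sketch_test() ->
    %% The single full-range copy wins over the two halves
    ?assertEqual([{0, ?RING_END}],
        mem3_util:get_ring([{0, 15}, {16, ?RING_END}, {0, ?RING_END}])),
    %% Without it, the two halves tile the ring instead
    ?assertEqual([{0, 15}, {16, ?RING_END}],
        mem3_util:get_ring([{16, ?RING_END}, {0, 15}])),
    %% A gap ([11] is uncovered) means no ring can be built
    ?assertEqual([], mem3_util:get_ring([{0, 10}, {12, ?RING_END}])).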
+get_subring_int(_, _, []) ->
+ fail;
+
+get_subring_int(Start, EndMax, [{Start, End} = Range | Tail]) ->
+ case End =:= EndMax of
+ true ->
+ [Range];
+ false ->
+ case get_subring_int(End + 1, EndMax, Tail) of
+ fail ->
+ get_subring_int(Start, EndMax, Tail);
+ Acc ->
+ [Range | Acc]
+ end
+ end;
+
+get_subring_int(Start1, _, [{Start2, _} | _]) when Start2 > Start1 ->
+ % Found a gap, this attempt is done
+ fail;
+
+get_subring_int(Start1, EndMax, [{Start2, _} | Rest]) when Start2 < Start1 ->
+ % We've overlapped the head, skip the shard
+ get_subring_int(Start1, EndMax, Rest).
+
+
+% Sort ranges by starting point; among ranges with the same starting point,
+% the longest range comes first
+sort_ranges_fun({B, E1}, {B, E2}) ->
+ E2 =< E1;
+
+sort_ranges_fun({B1, _}, {B2, _}) ->
+ B1 =< B2.
+
+
+-ifdef(TEST).
+
+-include_lib("eunit/include/eunit.hrl").
+
+range_overlap_test_() ->
+ [?_assertEqual(Res, range_overlap(R1, R2)) || {R1, R2, Res} <- [
+ {[2, 6], [1, 3], true},
+ {[2, 6], [3, 4], true},
+ {[2, 6], [4, 8], true},
+ {[2, 6], [1, 9], true},
+ {[2, 6], [1, 2], true},
+ {[2, 6], [6, 7], true},
+ {[2, 6], [0, 1], false},
+ {[2, 6], [7, 9], false}
+ ]].
+
+
+non_overlapping_shards_test_() ->
+ [?_assertEqual(Res, non_overlapping_shards(Shards)) || {Shards, Res} <- [
+ {
+ [shard(0, ?RING_END)],
+ [shard(0, ?RING_END)]
+ },
+ {
+ [shard(0, 1)],
+ [shard(0, 1)]
+ },
+ {
+ [shard(0, 1), shard(0, 1)],
+ [shard(0, 1)]
+ },
+ {
+ [shard(0, 1), shard(3, 4)],
+ []
+ },
+ {
+ [shard(0, 1), shard(1, 2), shard(2, 3)],
+ [shard(0, 1), shard(2, 3)]
+ },
+ {
+ [shard(1, 2), shard(0, 1)],
+ [shard(0, 1), shard(1, 2)]
+ },
+ {
+ [shard(0, 1), shard(0, 2), shard(2, 5), shard(3, 5)],
+ [shard(0, 2), shard(2, 5)]
+ },
+ {
+ [shard(0, 2), shard(4, 5), shard(1, 3)],
+ []
+ }
+
+ ]].
+
+
+calculate_max_n_test_() ->
+ [?_assertEqual(Res, calculate_max_n(Shards)) || {Res, Shards} <- [
+ {0, []},
+ {0, [shard(1, ?RING_END)]},
+ {1, [shard(0, ?RING_END)]},
+ {1, [shard(0, ?RING_END), shard(1, ?RING_END)]},
+ {2, [shard(0, ?RING_END), shard(0, ?RING_END)]},
+ {2, [shard(0, 1), shard(2, ?RING_END), shard(0, ?RING_END)]},
+ {0, [shard(0, 3), shard(5, ?RING_END), shard(1, ?RING_END)]}
+ ]].
+
+
+shard(Begin, End) ->
+ #shard{range = [Begin, End]}.
+
+-endif.
diff --git a/src/mem3/test/mem3_ring_prop_tests.erl b/src/mem3/test/mem3_ring_prop_tests.erl
new file mode 100644
index 000000000..9f4f86f5f
--- /dev/null
+++ b/src/mem3/test/mem3_ring_prop_tests.erl
@@ -0,0 +1,144 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(mem3_ring_prop_tests).
+
+
+-include_lib("triq/include/triq.hrl").
+-triq(eunit).
+
+
+% Properties
+
+prop_get_ring_with_connected_intervals() ->
+ ?FORALL({Start, End}, oneof(ranges()),
+ ?FORALL(Intervals, g_connected_intervals(Start, End),
+ mem3_util:get_ring(Intervals, Start, End) =:= lists:sort(Intervals)
+ )
+ ).
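%% A concrete instance of the property above (made-up intervals): connected
%% intervals tile [1, 10] exactly, so the ring is just the sorted input:
%%   mem3_util:get_ring([{4, 7}, {1, 3}, {8, 10}], 1, 10)
%%       =:= [{1, 3}, {4, 7}, {8, 10}]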
+
+
+prop_get_ring_connected_plus_random_intervals() ->
+ ?FORALL({Intervals, Extra}, {g_connected_intervals(1, 100),
+ g_random_intervals(1, 100)},
+ ?IMPLIES(sets:is_disjoint(endpoints(Intervals), endpoints(Extra)),
+ begin
+ AllInts = Intervals ++ Extra,
+ Ring = mem3_util:get_ring(AllInts, 1, 100),
+ Ring =:= lists:sort(Intervals)
+ end
+ )
+ ).
+
+
+prop_get_ring_connected_with_sub_intervals() ->
+ ?FORALL(Intervals, g_connected_intervals(1, 100),
+ ?FORALL(SubIntervals, g_subintervals(Intervals),
+ begin
+ AllInts = Intervals ++ SubIntervals,
+ Ring = mem3_util:get_ring(AllInts, 1, 100),
+ Ring =:= lists:sort(Intervals)
+ end
+ )
+ ).
+
+
+prop_get_ring_with_disconnected_intervals() ->
+ ?FORALL({Start, End}, oneof(ranges()),
+ ?FORALL(Intervals, g_disconnected_intervals(Start, End),
+ mem3_util:get_ring(Intervals, Start, End) =:= []
+ )
+ ).
+
+
+% Generators
+
+ranges() ->
+ [{1, 10}, {0, 2 bsl 31 - 1}, {2 bsl 31 - 10, 2 bsl 31 - 1}].
+
+
+g_connected_intervals(Begin, End) ->
+ ?SIZED(Size, g_connected_intervals(Begin, End, 5 * Size)).
+
+
+g_connected_intervals(Begin, End, Split) when Begin =< End ->
+ ?LET(N, choose(0, Split),
+ begin
+ if
+ N == 0 ->
+ [{Begin, End}];
+ N > 0 ->
+ Ns = lists:seq(1, N - 1),
+ Bs = lists:usort([rand_range(Begin, End) || _ <- Ns]),
+ Es = [B - 1 || B <- Bs],
+ shuffle(lists:zip([Begin] ++ Bs, Es ++ [End]))
+ end
+ end).
+
+
+g_non_trivial_connected_intervals(Begin, End, Split) ->
+ ?SUCHTHAT(Connected, g_connected_intervals(Begin, End, Split),
+ length(Connected) > 1).
+
+
+g_disconnected_intervals(Begin, End) ->
+ ?SIZED(Size, g_disconnected_intervals(Begin, End, Size)).
+
+
+g_disconnected_intervals(Begin, End, Split) when Begin =< End ->
+ ?LET(Connected, g_non_trivial_connected_intervals(Begin, End, Split),
+ begin
+ I = triq_rnd:uniform(length(Connected)) - 1,
+ {Before, [_ | After]} = lists:split(I, Connected),
+ Before ++ After
+ end).
+
+
+g_subintervals(Intervals) ->
+ lists:foldl(fun(R, Acc) -> split_interval(R) ++ Acc end, [], Intervals).
+
+
+split_interval({B, E}) when E - B >= 2 ->
+ E1 = rand_range(B, E) - 1,
+ B1 = E1 + 1,
+ [{B, E1}, {B1, E}];
+
+split_interval(_Range) ->
+ [].
+
+
+g_random_intervals(Start, End) ->
+ ?LET(N, choose(1, 10),
+ begin
+ [begin
+ B = rand_range(Start, End),
+ E = rand_range(B, End),
+ {B, E}
+ end || _ <- lists:seq(1, N)]
+ end).
+
+
+rand_range(B, B) ->
+ B;
+
+rand_range(B, E) ->
+ B + triq_rnd:uniform(E - B).
+
+
+shuffle(L) ->
+ Tagged = [{triq_rnd:uniform(), X} || X <- L],
+ [X || {_, X} <- lists:sort(Tagged)].
+
+
+endpoints(Ranges) ->
+ {Begins, Ends} = lists:unzip(Ranges),
+ sets:from_list(Begins ++ Ends).
diff --git a/src/rexi/src/rexi.erl b/src/rexi/src/rexi.erl
index f774dc9d4..21e2b5388 100644
--- a/src/rexi/src/rexi.erl
+++ b/src/rexi/src/rexi.erl
@@ -12,7 +12,7 @@
-module(rexi).
-export([start/0, stop/0, restart/0]).
--export([cast/2, cast/3, cast/4, kill/2]).
+-export([cast/2, cast/3, cast/4, kill/2, kill_all/1]).
-export([reply/1, sync_reply/1, sync_reply/2]).
-export([async_server_call/2, async_server_call/3]).
-export([stream_init/0, stream_init/1]).
@@ -76,6 +76,18 @@ kill(Node, Ref) ->
rexi_utils:send(rexi_utils:server_pid(Node), cast_msg({kill, Ref})),
ok.
+%% @doc Sends an async kill signal to the remote processes associated with Refs.
+%% No rexi_EXIT message will be sent.
+-spec kill_all([{node(), reference()}]) -> ok.
+kill_all(NodeRefs) when is_list(NodeRefs) ->
+ PerNodeMap = lists:foldl(fun({Node, Ref}, Acc) ->
+ maps:update_with(Node, fun(Refs) -> [Ref | Refs] end, [Ref], Acc)
+ end, #{}, NodeRefs),
+ maps:map(fun(Node, Refs) ->
+ rexi_utils:send(rexi_utils:server_pid(Node), cast_msg({kill_all, Refs}))
+ end, PerNodeMap),
+ ok.
+
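%% A minimal usage sketch (Workers is assumed to be a list of #shard{} records
%% with node and ref fields set, and cleanup/1 is made up for illustration).
%% The per-node grouping above means each node receives one {kill_all, Refs}
%% message instead of a {kill, Ref} message per worker:
cleanup(Workers) ->
    rexi:kill_all([{N, Ref} || #shard{node = N, ref = Ref} <- Workers]).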
%% @equiv async_server_call(Server, self(), Request)
-spec async_server_call(pid() | {atom(),node()}, any()) -> reference().
async_server_call(Server, Request) ->
diff --git a/src/rexi/src/rexi_server.erl b/src/rexi/src/rexi_server.erl
index 58a510b68..fedff69c3 100644
--- a/src/rexi/src/rexi_server.erl
+++ b/src/rexi/src/rexi_server.erl
@@ -79,15 +79,13 @@ handle_cast({doit, {ClientPid, ClientRef} = From, Nonce, MFA}, State) ->
{noreply, add_job(Job, State)};
-handle_cast({kill, FromRef}, #st{clients = Clients} = St) ->
- case find_worker(FromRef, Clients) of
- #job{worker = KeyRef, worker_pid = Pid} = Job ->
- erlang:demonitor(KeyRef),
- exit(Pid, kill),
- {noreply, remove_job(Job, St)};
- false ->
- {noreply, St}
- end;
+handle_cast({kill, FromRef}, St) ->
+ kill_worker(FromRef, St),
+ {noreply, St};
+
+handle_cast({kill_all, FromRefs}, St) ->
+ lists:foreach(fun(FromRef) -> kill_worker(FromRef, St) end, FromRefs),
+ {noreply, St};
handle_cast(_, St) ->
couch_log:notice("rexi_server ignored_cast", []),
@@ -181,3 +179,15 @@ find_worker(Ref, Tab) ->
notify_caller({Caller, Ref}, Reason) ->
rexi_utils:send(Caller, {Ref, {rexi_EXIT, Reason}}).
+
+
+kill_worker(FromRef, #st{clients = Clients} = St) ->
+ case find_worker(FromRef, Clients) of
+ #job{worker = KeyRef, worker_pid = Pid} = Job ->
+ erlang:demonitor(KeyRef),
+ exit(Pid, kill),
+ remove_job(Job, St),
+ ok;
+ false ->
+ ok
+ end.