diff options
author | Nick Vatamaniuc <vatamane@apache.org> | 2019-03-18 13:32:14 -0400 |
---|---|---|
committer | Nick Vatamaniuc <nickva@users.noreply.github.com> | 2019-04-03 10:48:45 -0400 |
commit | d10b7955929c1581f192fc487840cbf5005f84c9 (patch) | |
tree | 7b66fab88eba4c39c1a95919792cb972be7157e7 | |
parent | 3227e61ebbe6ee15d122036ec85b57ebffeb062d (diff) | |
download | couchdb-d10b7955929c1581f192fc487840cbf5005f84c9.tar.gz |
Uneven shard copy handling in mem3 and fabric
The introduction of shard splitting will eliminate the constraint that all
document copies are located in shards with the same range boundaries. That
assumption was made by default in mem3 and fabric functions that do shard
replacement, worker spawning, unpacking `_changes` update sequences and some
others. This commit updates those places to handle the case where document
copies might be in different shard ranges.
A good place to start from is the `mem3_util:get_ring()` function. This
function returns a full non-overlapped ring from a set of possibly overlapping
shards.
This function is used by almost everything else in this commit:
1) It's used when only a single copy of the data is needed, for example in
cases such as _all_docs or _changes processing.
2) Used when checking if progress is possible after some nodes died.
`get_ring()` returning `[]` when it cannot find a full ring is used to indicate
that progress is not possible.
3) During shard replacement. This is perhaps the most complicated case. During
replacement besides just finding a possible covering of the ring from the set
of shards, it is also desirable to find one that minimizes the number of
workers that have to be replaced. A neat trick used here is to provide
`get_ring` with a custom sort function, which prioritizes certain shard copies
over others. In case of replacements it prioritizes shards for which workers
have already been spawned. In the default case `get_ring()` will prioritize longer
ranges over shorter ones, so for example, to cover the interval [00-ff] with
either [00-7f, 80-ff] or [00-ff] shards ranges, it will pick the single [00-ff]
range instead of [00-7f, 80-ff] pair.
Co-authored-by: Paul J. Davis <davisp@apache.org>
29 files changed, 1719 insertions, 422 deletions
diff --git a/src/couch/src/couch_db.erl b/src/couch/src/couch_db.erl index a40f1131e..ab38eb895 100644 --- a/src/couch/src/couch_db.erl +++ b/src/couch/src/couch_db.erl @@ -1526,6 +1526,17 @@ calculate_start_seq(_Db, _Node, Seq) when is_integer(Seq) -> calculate_start_seq(Db, Node, {Seq, Uuid}) -> % Treat the current node as the epoch node calculate_start_seq(Db, Node, {Seq, Uuid, Node}); +calculate_start_seq(Db, _Node, {Seq, {split, Uuid}, EpochNode}) -> + case is_owner(EpochNode, Seq, get_epochs(Db)) of + true -> + % Find last replicated sequence from split source to target + mem3_rep:find_split_target_seq(Db, EpochNode, Uuid, Seq); + false -> + couch_log:warning("~p calculate_start_seq not owner " + "db: ~p, seq: ~p, uuid: ~p, epoch_node: ~p, epochs: ~p", + [?MODULE, Db#db.name, Seq, Uuid, EpochNode, get_epochs(Db)]), + 0 + end; calculate_start_seq(Db, _Node, {Seq, Uuid, EpochNode}) -> case is_prefix(Uuid, get_uuid(Db)) of true -> diff --git a/src/couch/src/couch_multidb_changes.erl b/src/couch/src/couch_multidb_changes.erl index b6a7873fb..ec805632a 100644 --- a/src/couch/src/couch_multidb_changes.erl +++ b/src/couch/src/couch_multidb_changes.erl @@ -185,7 +185,8 @@ register_with_event_server(Server) -> -spec db_callback(created | deleted | updated, binary(), #state{}) -> #state{}. 
db_callback(created, DbName, #state{mod = Mod, ctx = Ctx} = State) -> - State#state{ctx = Mod:db_created(DbName, Ctx)}; + NewState = State#state{ctx = Mod:db_created(DbName, Ctx)}, + resume_scan(DbName, NewState); db_callback(deleted, DbName, #state{mod = Mod, ctx = Ctx} = State) -> State#state{ctx = Mod:db_deleted(DbName, Ctx)}; db_callback(updated, DbName, State) -> @@ -446,7 +447,8 @@ t_handle_call_checkpoint_existing() -> t_handle_info_created() -> ?_test(begin - State = mock_state(), + Tid = mock_ets(), + State = mock_state(Tid), handle_info_check({'$couch_event', ?DBNAME, created}, State), ?assert(meck:validate(?MOD)), ?assert(meck:called(?MOD, db_created, [?DBNAME, zig])) diff --git a/src/fabric/include/fabric.hrl b/src/fabric/include/fabric.hrl index be1d63926..2a4da8bcf 100644 --- a/src/fabric/include/fabric.hrl +++ b/src/fabric/include/fabric.hrl @@ -36,8 +36,10 @@ -record(stream_acc, { workers, + ready, start_fun, - replacements + replacements, + ring_opts }). -record(view_row, {key, id, value, doc, worker}). diff --git a/src/fabric/src/fabric_db_doc_count.erl b/src/fabric/src/fabric_db_doc_count.erl index a0fd3ecd1..a91014b7c 100644 --- a/src/fabric/src/fabric_db_doc_count.erl +++ b/src/fabric/src/fabric_db_doc_count.erl @@ -22,7 +22,7 @@ go(DbName) -> Shards = mem3:shards(DbName), Workers = fabric_util:submit_jobs(Shards, get_doc_count, []), RexiMon = fabric_util:create_monitors(Shards), - Acc0 = {fabric_dict:init(Workers, nil), 0}, + Acc0 = {fabric_dict:init(Workers, nil), []}, try fabric_util:recv(Workers, #shard.ref, fun handle_message/3, Acc0) of {timeout, {WorkersDict, _}} -> DefunctWorkers = fabric_util:remove_done_workers(WorkersDict, nil), @@ -34,38 +34,29 @@ go(DbName) -> rexi_monitor:stop(RexiMon) end. 
-handle_message({rexi_DOWN, _, {_,NodeRef},_}, _Shard, {Counters, Acc}) -> - case fabric_util:remove_down_workers(Counters, NodeRef) of - {ok, NewCounters} -> - {ok, {NewCounters, Acc}}; - error -> - {error, {nodedown, <<"progress not possible">>}} +handle_message({rexi_DOWN, _, {_,NodeRef},_}, _Shard, {Counters, Resps}) -> + case fabric_ring:node_down(NodeRef, Counters, Resps) of + {ok, Counters1} -> {ok, {Counters1, Resps}}; + error -> {error, {nodedown, <<"progress not possible">>}} end; -handle_message({rexi_EXIT, Reason}, Shard, {Counters, Acc}) -> - NewCounters = lists:keydelete(Shard, #shard.ref, Counters), - case fabric_view:is_progress_possible(NewCounters) of - true -> - {ok, {NewCounters, Acc}}; - false -> - {error, Reason} +handle_message({rexi_EXIT, Reason}, Shard, {Counters, Resps}) -> + case fabric_ring:handle_error(Shard, Counters, Resps) of + {ok, Counters1} -> {ok, {Counters1, Resps}}; + error -> {error, Reason} end; -handle_message({ok, Count}, Shard, {Counters, Acc}) -> - case fabric_dict:lookup_element(Shard, Counters) of - undefined -> - % already heard from someone else in this range - {ok, {Counters, Acc}}; - nil -> - C1 = fabric_dict:store(Shard, ok, Counters), - C2 = fabric_view:remove_overlapping_shards(Shard, C1), - case fabric_dict:any(nil, C2) of - true -> - {ok, {C2, Count+Acc}}; - false -> - {stop, Count+Acc} - end +handle_message({ok, Count}, Shard, {Counters, Resps}) -> + case fabric_ring:handle_response(Shard, Count, Counters, Resps) of + {ok, {Counters1, Resps1}} -> + {ok, {Counters1, Resps1}}; + {stop, Resps1} -> + Total = fabric_dict:fold(fun(_, C, A) -> A + C end, 0, Resps1), + {stop, Total} end; -handle_message(_, _, Acc) -> - {ok, Acc}. +handle_message(Reason, Shard, {Counters, Resps}) -> + case fabric_ring:handle_error(Shard, Counters, Resps) of + {ok, Counters1} -> {ok, {Counters1, Resps}}; + error -> {error, Reason} + end. 
diff --git a/src/fabric/src/fabric_db_info.erl b/src/fabric/src/fabric_db_info.erl index fe93878b5..bb7a3530e 100644 --- a/src/fabric/src/fabric_db_info.erl +++ b/src/fabric/src/fabric_db_info.erl @@ -23,7 +23,8 @@ go(DbName) -> RexiMon = fabric_util:create_monitors(Shards), Fun = fun handle_message/3, {ok, ClusterInfo} = get_cluster_info(Shards), - Acc0 = {fabric_dict:init(Workers, nil), [], [{cluster, ClusterInfo}]}, + CInfo = [{cluster, ClusterInfo}], + Acc0 = {fabric_dict:init(Workers, nil), [], CInfo}, try case fabric_util:recv(Workers, #shard.ref, Fun, Acc0) of @@ -46,49 +47,47 @@ go(DbName) -> rexi_monitor:stop(RexiMon) end. -handle_message({rexi_DOWN, - _, {_,NodeRef},_}, _Shard, {Counters, PseqAcc, Acc}) -> - case fabric_util:remove_down_workers(Counters, NodeRef) of - {ok, NewCounters} -> - {ok, {NewCounters, PseqAcc, Acc}}; - error -> - {error, {nodedown, <<"progress not possible">>}} + +handle_message({rexi_DOWN, _, {_,NodeRef},_}, _, {Counters, Resps, CInfo}) -> + case fabric_ring:node_down(NodeRef, Counters, Resps) of + {ok, Counters1} -> {ok, {Counters1, Resps, CInfo}}; + error -> {error, {nodedown, <<"progress not possible">>}} + end; + +handle_message({rexi_EXIT, Reason}, Shard, {Counters, Resps, CInfo}) -> + case fabric_ring:handle_error(Shard, Counters, Resps) of + {ok, Counters1} -> {ok, {Counters1, Resps, CInfo}}; + error -> {error, Reason} end; -handle_message({rexi_EXIT, Reason}, Shard, {Counters, PseqAcc, Acc}) -> - NewCounters = fabric_dict:erase(Shard, Counters), - case fabric_view:is_progress_possible(NewCounters) of - true -> - {ok, {NewCounters, PseqAcc, Acc}}; - false -> - {error, Reason} +handle_message({ok, Info}, Shard, {Counters, Resps, CInfo}) -> + case fabric_ring:handle_response(Shard, Info, Counters, Resps) of + {ok, {Counters1, Resps1}} -> + {ok, {Counters1, Resps1, CInfo}}; + {stop, Resps1} -> + {stop, build_final_response(CInfo, Shard#shard.dbname, Resps1)} end; -handle_message({ok, Info}, #shard{dbname=Name} = Shard, 
{Counters, PseqAcc, Acc}) -> - case fabric_dict:lookup_element(Shard, Counters) of - undefined -> - % already heard from someone else in this range - {ok, {Counters, PseqAcc, Acc}}; - nil -> +handle_message(Reason, Shard, {Counters, Resps, CInfo}) -> + case fabric_ring:handle_error(Shard, Counters, Resps) of + {ok, Counters1} -> {ok, {Counters1, Resps, CInfo}}; + error -> {error, Reason} + end. + + +build_final_response(CInfo, DbName, Responses) -> + AccF = fabric_dict:fold(fun(Shard, Info, {Seqs, PSeqs, Infos}) -> Seq = couch_util:get_value(update_seq, Info), - C1 = fabric_dict:store(Shard, Seq, Counters), - C2 = fabric_view:remove_overlapping_shards(Shard, C1), PSeq = couch_util:get_value(purge_seq, Info), - NewPseqAcc = [{Shard, PSeq}|PseqAcc], - case fabric_dict:any(nil, C2) of - true -> - {ok, {C2, NewPseqAcc, [Info|Acc]}}; - false -> - {stop, [ - {db_name,Name}, - {purge_seq, fabric_view_changes:pack_seqs(NewPseqAcc)}, - {update_seq, fabric_view_changes:pack_seqs(C2)} | - merge_results(lists:flatten([Info|Acc])) - ]} - end - end; -handle_message(_, _, Acc) -> - {ok, Acc}. + {[{Shard, Seq} | Seqs], [{Shard, PSeq} | PSeqs], [Info | Infos]} + end, {[], [], []}, Responses), + {Seqs, PSeqs, Infos} = AccF, + PackedSeq = fabric_view_changes:pack_seqs(Seqs), + PackedPSeq = fabric_view_changes:pack_seqs(PSeqs), + MergedInfos = merge_results(lists:flatten([CInfo | Infos])), + Sequences = [{purge_seq, PackedPSeq}, {update_seq, PackedSeq}], + [{db_name, DbName}] ++ Sequences ++ MergedInfos. 
+ merge_results(Info) -> Dict = lists:foldl(fun({K,V},D0) -> orddict:append(K,V,D0) end, diff --git a/src/fabric/src/fabric_db_meta.erl b/src/fabric/src/fabric_db_meta.erl index 26e1b3752..348b06d51 100644 --- a/src/fabric/src/fabric_db_meta.erl +++ b/src/fabric/src/fabric_db_meta.erl @@ -135,9 +135,9 @@ check_sec_set_int(NumWorkers, SetWorkers) -> true -> throw(no_majority); false -> ok end, - % Hack to reuse fabric_view:is_progress_possible/1 + % Hack to reuse fabric_ring:is_progress_possible/1 FakeCounters = [{S, 0} || S <- SetWorkers], - case fabric_view:is_progress_possible(FakeCounters) of + case fabric_ring:is_progress_possible(FakeCounters) of false -> throw(no_ring); true -> ok end, diff --git a/src/fabric/src/fabric_db_partition_info.erl b/src/fabric/src/fabric_db_partition_info.erl index 97e669a52..2978832f0 100644 --- a/src/fabric/src/fabric_db_partition_info.erl +++ b/src/fabric/src/fabric_db_partition_info.erl @@ -52,7 +52,7 @@ handle_message({rexi_DOWN, _, {_,NodeRef},_}, _Shard, {Counters, Acc}) -> handle_message({rexi_EXIT, Reason}, Shard, {Counters, Acc}) -> NewCounters = fabric_dict:erase(Shard, Counters), - case fabric_view:is_progress_possible(NewCounters) of + case fabric_ring:is_progress_possible(NewCounters) of true -> {ok, {NewCounters, Acc}}; false -> diff --git a/src/fabric/src/fabric_db_update_listener.erl b/src/fabric/src/fabric_db_update_listener.erl index 13d8b9e7b..fb2937be1 100644 --- a/src/fabric/src/fabric_db_update_listener.erl +++ b/src/fabric/src/fabric_db_update_listener.erl @@ -111,7 +111,7 @@ cleanup_monitor(Parent, Ref, Notifiers) -> end. stop_update_notifiers(Notifiers) -> - [rexi:kill(Node, Ref) || #worker{node=Node, ref=Ref} <- Notifiers]. + rexi:kill_all([{N, Ref} || #worker{node = N, ref = Ref} <- Notifiers]). stop({Pid, Ref}) -> erlang:send(Pid, {Ref, done}). 
@@ -169,7 +169,7 @@ handle_message(done, _, _) -> handle_error(Node, Reason, #acc{shards = Shards} = Acc) -> Rest = lists:filter(fun(#shard{node = N}) -> N /= Node end, Shards), - case fabric_view:is_progress_possible([{R, nil} || R <- Rest]) of + case fabric_ring:is_progress_possible([{R, nil} || R <- Rest]) of true -> {ok, Acc#acc{shards = Rest}}; false -> diff --git a/src/fabric/src/fabric_design_doc_count.erl b/src/fabric/src/fabric_design_doc_count.erl index 22d03c5d4..b0efc3007 100644 --- a/src/fabric/src/fabric_design_doc_count.erl +++ b/src/fabric/src/fabric_design_doc_count.erl @@ -22,7 +22,7 @@ go(DbName) -> Shards = mem3:shards(DbName), Workers = fabric_util:submit_jobs(Shards, get_design_doc_count, []), RexiMon = fabric_util:create_monitors(Shards), - Acc0 = {fabric_dict:init(Workers, nil), 0}, + Acc0 = {fabric_dict:init(Workers, nil), []}, try fabric_util:recv(Workers, #shard.ref, fun handle_message/3, Acc0) of {timeout, {WorkersDict, _}} -> DefunctWorkers = fabric_util:remove_done_workers(WorkersDict, nil), @@ -34,36 +34,29 @@ go(DbName) -> rexi_monitor:stop(RexiMon) end. 
-handle_message({rexi_DOWN, _, {_,NodeRef},_}, _Shard, {Counters, Acc}) -> - case fabric_util:remove_down_workers(Counters, NodeRef) of - {ok, NewCounters} -> - {ok, {NewCounters, Acc}}; - error -> - {error, {nodedown, <<"progress not possible">>}} +handle_message({rexi_DOWN, _, {_,NodeRef},_}, _Shard, {Counters, Resps}) -> + case fabric_ring:node_down(NodeRef, Counters, Resps) of + {ok, Counters1} -> {ok, {Counters1, Resps}}; + error -> {error, {nodedown, <<"progress not possible">>}} end; -handle_message({rexi_EXIT, Reason}, Shard, {Counters, Acc}) -> - NewCounters = lists:keydelete(Shard, #shard.ref, Counters), - case fabric_view:is_progress_possible(NewCounters) of - true -> - {ok, {NewCounters, Acc}}; - false -> - {error, Reason} +handle_message({rexi_EXIT, Reason}, Shard, {Counters, Resps}) -> + case fabric_ring:handle_error(Shard, Counters, Resps) of + {ok, Counters1} -> {ok, {Counters1, Resps}}; + error -> {error, Reason} end; -handle_message({ok, Count}, Shard, {Counters, Acc}) -> - case fabric_dict:lookup_element(Shard, Counters) of - undefined -> - {ok, {Counters, Acc}}; - nil -> - C1 = fabric_dict:store(Shard, ok, Counters), - C2 = fabric_view:remove_overlapping_shards(Shard, C1), - case fabric_dict:any(nil, C2) of - true -> - {ok, {C2, Count+Acc}}; - false -> - {stop, Count+Acc} - end +handle_message({ok, Count}, Shard, {Counters, Resps}) -> + case fabric_ring:handle_response(Shard, Count, Counters, Resps) of + {ok, {Counters1, Resps1}} -> + {ok, {Counters1, Resps1}}; + {stop, Resps1} -> + Total = fabric_dict:fold(fun(_, C, A) -> A + C end, 0, Resps1), + {stop, Total} end; -handle_message(_, _, Acc) -> - {ok, Acc}. + +handle_message(Reason, Shard, {Counters, Resps}) -> + case fabric_ring:handle_error(Shard, Counters, Resps) of + {ok, Counters1} -> {ok, {Counters1, Resps}}; + error -> {error, Reason} + end. 
diff --git a/src/fabric/src/fabric_dict.erl b/src/fabric/src/fabric_dict.erl index a336b47b0..b63ed2095 100644 --- a/src/fabric/src/fabric_dict.erl +++ b/src/fabric/src/fabric_dict.erl @@ -56,3 +56,6 @@ fold(Fun, Acc0, Dict) -> to_list(Dict) -> orddict:to_list(Dict). + +from_list(KVList) when is_list(KVList) -> + orddict:from_list(KVList). diff --git a/src/fabric/src/fabric_doc_open.erl b/src/fabric/src/fabric_doc_open.erl index aafdcfb79..743ad8c74 100644 --- a/src/fabric/src/fabric_doc_open.erl +++ b/src/fabric/src/fabric_doc_open.erl @@ -322,7 +322,7 @@ t_handle_message_reply() -> Acc0 = #acc{workers=Workers, r=2, replies=[]}, ?_test(begin - meck:expect(rexi, kill, fun(_, _) -> ok end), + meck:expect(rexi, kill_all, fun(_) -> ok end), % Test that we continue when we haven't met R yet ?assertMatch( @@ -416,7 +416,7 @@ t_store_node_revs() -> InitAcc = #acc{workers = [W1, W2, W3], replies = [], r = 2}, ?_test(begin - meck:expect(rexi, kill, fun(_, _) -> ok end), + meck:expect(rexi, kill_all, fun(_) -> ok end), % Simple case {ok, #acc{node_revs = NodeRevs1}} = handle_message(Foo1, W1, InitAcc), diff --git a/src/fabric/src/fabric_group_info.erl b/src/fabric/src/fabric_group_info.erl index 8383a7e28..be507420e 100644 --- a/src/fabric/src/fabric_group_info.erl +++ b/src/fabric/src/fabric_group_info.erl @@ -27,7 +27,8 @@ go(DbName, #doc{id=DDocId}) -> Ushards = mem3:ushards(DbName), Workers = fabric_util:submit_jobs(Shards, group_info, [DDocId]), RexiMon = fabric_util:create_monitors(Shards), - Acc = acc_init(Workers, Ushards), + USet = sets:from_list([{Id, N} || #shard{name = Id, node = N} <- Ushards]), + Acc = {fabric_dict:init(Workers, nil), [], USet}, try fabric_util:recv(Workers, #shard.ref, fun handle_message/3, Acc) of {timeout, {WorkersDict, _, _}} -> DefunctWorkers = fabric_util:remove_done_workers(WorkersDict, nil), @@ -39,56 +40,42 @@ go(DbName, #doc{id=DDocId}) -> rexi_monitor:stop(RexiMon) end. 
-handle_message({rexi_DOWN, _, {_,NodeRef},_}, _Shard, - {Counters, Acc, Ushards}) -> - case fabric_util:remove_down_workers(Counters, NodeRef) of - {ok, NewCounters} -> - {ok, {NewCounters, Acc, Ushards}}; - error -> - {error, {nodedown, <<"progress not possible">>}} +handle_message({rexi_DOWN, _, {_,NodeRef},_}, _, {Counters, Resps, USet}) -> + case fabric_ring:node_down(NodeRef, Counters, Resps) of + {ok, Counters1} -> {ok, {Counters1, Resps, USet}}; + error -> {error, {nodedown, <<"progress not possible">>}} end; -handle_message({rexi_EXIT, Reason}, Shard, {Counters, Acc, Ushards}) -> - NewCounters = lists:keydelete(Shard, #shard.ref, Counters), - case fabric_view:is_progress_possible(NewCounters) of - true -> - {ok, {NewCounters, Acc, Ushards}}; - false -> - {error, Reason} +handle_message({rexi_EXIT, Reason}, Shard, {Counters, Resps, USet}) -> + case fabric_ring:handle_error(Shard, Counters, Resps) of + {ok, Counters1} -> {ok, {Counters1, Resps, USet}}; + error -> {error, Reason} end; -handle_message({ok, Info}, Shard, {Counters0, Acc, Ushards}) -> - case fabric_dict:lookup_element(Shard, Counters0) of - undefined -> - % already heard from other node in this range - {ok, {Counters0, Acc, Ushards}}; - nil -> - NewAcc = append_result(Info, Shard, Acc, Ushards), - Counters1 = fabric_dict:store(Shard, ok, Counters0), - Counters = fabric_view:remove_overlapping_shards(Shard, Counters1), - case is_complete(Counters) of - false -> - {ok, {Counters, NewAcc, Ushards}}; - true -> - Pending = aggregate_pending(NewAcc), - Infos = get_infos(NewAcc), - Results = [{updates_pending, {Pending}} | merge_results(Infos)], - {stop, Results} - end +handle_message({ok, Info}, Shard, {Counters, Resps, USet}) -> + case fabric_ring:handle_response(Shard, Info, Counters, Resps) of + {ok, {Counters1, Resps1}} -> + {ok, {Counters1, Resps1, USet}}; + {stop, Resps1} -> + {stop, build_final_response(USet, Resps1)} end; -handle_message(_, _, Acc) -> - {ok, Acc}. 
-acc_init(Workers, Ushards) -> - Set = sets:from_list([{Id, N} || #shard{name = Id, node = N} <- Ushards]), - {fabric_dict:init(Workers, nil), dict:new(), Set}. +handle_message(Reason, Shard, {Counters, Resps, USet}) -> + case fabric_ring:handle_error(Shard, Counters, Resps) of + {ok, Counters1} -> {ok, {Counters1, Resps, USet}}; + error -> {error, Reason} + end. + -is_complete(Counters) -> - not fabric_dict:any(nil, Counters). +build_final_response(USet, Responses) -> + AccF = fabric_dict:fold(fun(#shard{name = Id, node = Node}, Info, Acc) -> + IsPreferred = sets:is_element({Id, Node}, USet), + dict:append(Id, {Node, IsPreferred, Info}, Acc) + end, dict:new(), Responses), + Pending = aggregate_pending(AccF), + Infos = get_infos(AccF), + [{updates_pending, {Pending}} | merge_results(Infos)]. -append_result(Info, #shard{name = Name, node = Node}, Acc, Ushards) -> - IsPreferred = sets:is_element({Name, Node}, Ushards), - dict:append(Name, {Node, IsPreferred, Info}, Acc). get_infos(Acc) -> Values = [V || {_, V} <- dict:to_list(Acc)], diff --git a/src/fabric/src/fabric_ring.erl b/src/fabric/src/fabric_ring.erl new file mode 100644 index 000000000..110edb9ab --- /dev/null +++ b/src/fabric/src/fabric_ring.erl @@ -0,0 +1,519 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(fabric_ring). 
+ + +-export([ + is_progress_possible/1, + is_progress_possible/2, + get_shard_replacements/2, + node_down/3, + node_down/4, + handle_error/3, + handle_error/4, + handle_response/4, + handle_response/5 +]). + + +-include_lib("fabric/include/fabric.hrl"). +-include_lib("mem3/include/mem3.hrl"). + + +-type fabric_dict() :: [{#shard{}, any()}]. +-type ring_opts() :: [atom() | tuple()]. + + +%% @doc looks for a fully covered keyrange in the list of counters +-spec is_progress_possible(fabric_dict()) -> boolean(). +is_progress_possible(Counters) -> + is_progress_possible(Counters, []). + + +%% @doc looks for a fully covered keyrange in the list of counters +%% This version take ring option to configure how progress will +%% be checked. By default, [], checks that the full ring is covered. +-spec is_progress_possible(fabric_dict(), ring_opts()) -> boolean(). +is_progress_possible(Counters, RingOpts) -> + is_progress_possible(Counters, [], 0, ?RING_END, RingOpts). + + +-spec get_shard_replacements(binary(), [#shard{}]) -> [#shard{}]. +get_shard_replacements(DbName, UsedShards0) -> + % We only want to generate a replacements list from shards + % that aren't already used. + AllLiveShards = mem3:live_shards(DbName, [node() | nodes()]), + UsedShards = [S#shard{ref=undefined} || S <- UsedShards0], + get_shard_replacements_int(AllLiveShards -- UsedShards, UsedShards). + + +-spec node_down(node(), fabric_dict(), fabric_dict()) -> + {ok, fabric_dict()} | error. +node_down(Node, Workers, Responses) -> + node_down(Node, Workers, Responses, []). + + +-spec node_down(node(), fabric_dict(), fabric_dict(), ring_opts()) -> + {ok, fabric_dict()} | error. +node_down(Node, Workers, Responses, RingOpts) -> + {B, E} = range_bounds(Workers, Responses), + Workers1 = fabric_dict:filter(fun(#shard{node = N}, _) -> + N =/= Node + end, Workers), + case is_progress_possible(Workers1, Responses, B, E, RingOpts) of + true -> {ok, Workers1}; + false -> error + end. 
+ + +-spec handle_error(#shard{}, fabric_dict(), fabric_dict()) -> + {ok, fabric_dict()} | error. +handle_error(Shard, Workers, Responses) -> + handle_error(Shard, Workers, Responses, []). + + +-spec handle_error(#shard{}, fabric_dict(), fabric_dict(), ring_opts()) -> + {ok, fabric_dict()} | error. +handle_error(Shard, Workers, Responses, RingOpts) -> + {B, E} = range_bounds(Workers, Responses), + Workers1 = fabric_dict:erase(Shard, Workers), + case is_progress_possible(Workers1, Responses, B, E, RingOpts) of + true -> {ok, Workers1}; + false -> error + end. + + +-spec handle_response(#shard{}, any(), fabric_dict(), fabric_dict()) -> + {ok, {fabric_dict(), fabric_dict()}} | {stop, fabric_dict()}. +handle_response(Shard, Response, Workers, Responses) -> + handle_response(Shard, Response, Workers, Responses, []). + + +-spec handle_response(#shard{}, any(), fabric_dict(), fabric_dict(), + ring_opts()) -> + {ok, {fabric_dict(), fabric_dict()}} | {stop, fabric_dict()}. +handle_response(Shard, Response, Workers, Responses, RingOpts) -> + handle_response(Shard, Response, Workers, Responses, RingOpts, + fun stop_workers/1). + + +% Worker response handler. Gets reponses from shard and puts them in the list +% until they complete a full ring. Then kill unused responses and remaining +% workers. +% +% How a ring "completes" is driven by RingOpts: +% +% * When RingOpts is [] (the default case) responses must form a "clean" +% ring, where all copies at the start of the range and end of the range must +% have the same boundary values. +% +% * When RingOpts is [{any, [#shard{}]}] responses are accepted from any of +% the provided list of shards. This type of ring might be used when querying +% a partitioned database. As soon as a result from any of the shards +% arrives, result collection stops. 
+% +handle_response(Shard, Response, Workers, Responses, RingOpts, CleanupCb) -> + Workers1 = fabric_dict:erase(Shard, Workers), + case RingOpts of + [] -> + #shard{range = [B, E]} = Shard, + Responses1 = [{{B, E}, Shard, Response} | Responses], + handle_response_ring(Workers1, Responses1, CleanupCb); + [{any, Any}] -> + handle_response_any(Shard, Response, Workers1, Any, CleanupCb) + end. + + +handle_response_ring(Workers, Responses, CleanupCb) -> + {MinB, MaxE} = range_bounds(Workers, Responses), + Ranges = lists:map(fun({R, _, _}) -> R end, Responses), + case mem3_util:get_ring(Ranges, MinB, MaxE) of + [] -> + {ok, {Workers, Responses}}; + Ring -> + % Return one response per range in the ring. The + % response list is reversed before sorting so that the + % first shard copy to reply is first. We use keysort + % because it is documented as being stable so that + % we keep the relative order of duplicate shards + SortedResponses = lists:keysort(1, lists:reverse(Responses)), + UsedResponses = get_responses(Ring, SortedResponses), + % Kill all the remaining workers as well as the redunant responses + stop_unused_workers(Workers, Responses, UsedResponses, CleanupCb), + {stop, fabric_dict:from_list(UsedResponses)} + end. + + +handle_response_any(Shard, Response, Workers, Any, CleanupCb) -> + case lists:member(Shard#shard{ref = undefined}, Any) of + true -> + stop_unused_workers(Workers, [], [], CleanupCb), + {stop, fabric_dict:from_list([{Shard, Response}])}; + false -> + {ok, {Workers, []}} + end. + + +% Check if workers still waiting and the already received responses could +% still form a continous range. The range won't always be the full ring, and +% the bounds are computed based on the minimum and maximum interval beginning +% and ends. +% +% There is also a special case where even if the ring cannot be formed, but +% there is an overlap between all the shards, then it's considered that +% progress can still be made. 
This is essentially to allow for split +% partitioned shards where one shard copy on a node was split the set of ranges +% might look like: 00-ff, 00-ff, 07-ff. Even if both 00-ff workers exit, +% progress can still be made with the remaining 07-ff copy. +% +-spec is_progress_possible(fabric_dict(), [{any(), #shard{}, any()}], + non_neg_integer(), non_neg_integer(), ring_opts()) -> boolean(). +is_progress_possible([], [], _, _, _) -> + false; + +is_progress_possible(Counters, Responses, MinB, MaxE, []) -> + ResponseRanges = lists:map(fun({{B, E}, _, _}) -> {B, E} end, Responses), + Ranges = fabric_util:worker_ranges(Counters) ++ ResponseRanges, + mem3_util:get_ring(Ranges, MinB, MaxE) =/= []; + +is_progress_possible(Counters, Responses, _, _, [{any, AnyShards}]) -> + InAny = fun(S) -> lists:member(S#shard{ref = undefined}, AnyShards) end, + case fabric_dict:filter(fun(S, _) -> InAny(S) end, Counters) of + [] -> + case lists:filter(fun({_, S, _}) -> InAny(S) end, Responses) of + [] -> false; + [_ | _] -> true + end; + [_ | _] -> + true + end. + + +get_shard_replacements_int(UnusedShards, UsedShards) -> + % If we have more than one copy of a range then we don't + % want to try and add a replacement to any copy. + RangeCounts = lists:foldl(fun(#shard{range=R}, Acc) -> + dict:update_counter(R, 1, Acc) + end, dict:new(), UsedShards), + + % For each seq shard range with a count of 1, find any + % possible replacements from the unused shards. The + % replacement list is keyed by range. + lists:foldl(fun(#shard{range = [B, E] = Range}, Acc) -> + case dict:find(Range, RangeCounts) of + {ok, 1} -> + Repls = mem3_util:non_overlapping_shards(UnusedShards, B, E), + % Only keep non-empty lists of replacements + if Repls == [] -> Acc; true -> + [{Range, Repls} | Acc] + end; + _ -> + Acc + end + end, [], UsedShards). 
+ + +range_bounds(Workers, Responses) -> + RespRanges = lists:map(fun({R, _, _}) -> R end, Responses), + Ranges = fabric_util:worker_ranges(Workers) ++ RespRanges, + {Bs, Es} = lists:unzip(Ranges), + {lists:min(Bs), lists:max(Es)}. + + +get_responses([], _) -> + []; + +get_responses([Range | Ranges], [{Range, Shard, Value} | Resps]) -> + [{Shard, Value} | get_responses(Ranges, Resps)]; + +get_responses(Ranges, [_DupeRangeResp | Resps]) -> + get_responses(Ranges, Resps). + + +stop_unused_workers(_, _, _, undefined) -> + ok; + +stop_unused_workers(Workers, AllResponses, UsedResponses, CleanupCb) -> + WorkerShards = [S || {S, _} <- Workers], + Used = [S || {S, _} <- UsedResponses], + Unused = [S || {_, S, _} <- AllResponses, not lists:member(S, Used)], + CleanupCb(WorkerShards ++ Unused). + + +stop_workers(Shards) when is_list(Shards) -> + rexi:kill_all([{Node, Ref} || #shard{node = Node, ref = Ref} <- Shards]). + + +% Unit tests + +is_progress_possible_full_range_test() -> + % a base case + ?assertEqual(false, is_progress_possible([], [], 0, 0, [])), + T1 = [[0, ?RING_END]], + ?assertEqual(true, is_progress_possible(mk_cnts(T1))), + T2 = [[0, 10], [11, 20], [21, ?RING_END]], + ?assertEqual(true, is_progress_possible(mk_cnts(T2))), + % gap + T3 = [[0, 10], [12, ?RING_END]], + ?assertEqual(false, is_progress_possible(mk_cnts(T3))), + % outside range + T4 = [[1, 10], [11, 20], [21, ?RING_END]], + ?assertEqual(false, is_progress_possible(mk_cnts(T4))), + % outside range + T5 = [[0, 10], [11, 20], [21, ?RING_END + 1]], + ?assertEqual(false, is_progress_possible(mk_cnts(T5))), + % possible progress but with backtracking + T6 = [[0, 10], [11, 20], [0, 5], [6, 21], [21, ?RING_END]], + ?assertEqual(true, is_progress_possible(mk_cnts(T6))), + % not possible, overlap is not exact + T7 = [[0, 10], [13, 20], [21, ?RING_END], [9, 12]], + ?assertEqual(false, is_progress_possible(mk_cnts(T7))). 
+ + +is_progress_possible_with_responses_test() -> + C1 = mk_cnts([[0, ?RING_END]]), + ?assertEqual(true, is_progress_possible(C1, [], 0, ?RING_END, [])), + % check for gaps + C2 = mk_cnts([[5, 6], [7, 8]]), + ?assertEqual(true, is_progress_possible(C2, [], 5, 8, [])), + ?assertEqual(false, is_progress_possible(C2, [], 4, 8, [])), + ?assertEqual(false, is_progress_possible(C2, [], 5, 7, [])), + ?assertEqual(false, is_progress_possible(C2, [], 4, 9, [])), + % check for uneven shard range copies + C3 = mk_cnts([[2, 5], [2, 10]]), + ?assertEqual(true, is_progress_possible(C3, [], 2, 10, [])), + ?assertEqual(false, is_progress_possible(C3, [], 2, 11, [])), + ?assertEqual(false, is_progress_possible(C3, [], 3, 10, [])), + % they overlap but still not a proper ring + C4 = mk_cnts([[2, 4], [3, 7], [6, 10]]), + ?assertEqual(false, is_progress_possible(C4, [], 2, 10, [])), + % some of the ranges are in responses + RS1 = mk_resps([{"n1", 7, 8, 42}]), + C5 = mk_cnts([[5, 6]]), + ?assertEqual(true, is_progress_possible(C5, RS1, 5, 8, [])), + ?assertEqual(false, is_progress_possible([], RS1, 5, 8, [])), + ?assertEqual(true, is_progress_possible([], RS1, 7, 8, [])). 
+ + +is_progress_possible_with_ring_opts_test() -> + Opts = [{any, [mk_shard("n1", [0, 5]), mk_shard("n2", [3, 10])]}], + C1 = [{mk_shard("n1", [0, ?RING_END]), nil}], + RS1 = mk_resps([{"n1", 3, 10, 42}]), + ?assertEqual(false, is_progress_possible(C1, [], 0, ?RING_END, Opts)), + ?assertEqual(false, is_progress_possible([], [], 0, ?RING_END, Opts)), + ?assertEqual(false, is_progress_possible([], RS1, 0, ?RING_END, Opts)), + % explicitly accept only the shard specified in the ring options + ?assertEqual(false, is_progress_possible([], RS1, 3, 10, [{any, []}])), + % need to match the node exactly + ?assertEqual(false, is_progress_possible([], RS1, 3, 10, Opts)), + RS2 = mk_resps([{"n2", 3, 10, 42}]), + ?assertEqual(true, is_progress_possible([], RS2, 3, 10, Opts)), + % assert that counters can fill the ring not just the response + C2 = [{mk_shard("n1", [0, 5]), nil}], + ?assertEqual(true, is_progress_possible(C2, [], 0, ?RING_END, Opts)). + + +get_shard_replacements_test() -> + Unused = [mk_shard(N, [B, E]) || {N, B, E} <- [ + {"n1", 11, 20}, {"n1", 21, ?RING_END}, + {"n2", 0, 4}, {"n2", 5, 10}, {"n2", 11, 20}, + {"n3", 0, 21, ?RING_END} + ]], + Used = [mk_shard(N, [B, E]) || {N, B, E} <- [ + {"n2", 21, ?RING_END}, + {"n3", 0, 10}, {"n3", 11, 20} + ]], + Res = lists:sort(get_shard_replacements_int(Unused, Used)), + % Notice that [0, 10] range can be replaces by spawning the + % [0, 4] and [5, 10] workers on n1 + Expect = [ + {[0, 10], [mk_shard("n2", [0, 4]), mk_shard("n2", [5, 10])]}, + {[11, 20], [mk_shard("n1", [11, 20]), mk_shard("n2", [11, 20])]}, + {[21, ?RING_END], [mk_shard("n1", [21, ?RING_END])]} + ], + ?assertEqual(Expect, Res). 
+ + +handle_response_basic_test() -> + Shard1 = mk_shard("n1", [0, 1]), + Shard2 = mk_shard("n1", [2, ?RING_END]), + + Workers1 = fabric_dict:init([Shard1, Shard2], nil), + + Result1 = handle_response(Shard1, 42, Workers1, [], [], undefined), + ?assertMatch({ok, {_, _}}, Result1), + {ok, {Workers2, Responses1}} = Result1, + ?assertEqual(fabric_dict:erase(Shard1, Workers1), Workers2), + ?assertEqual([{{0, 1}, Shard1, 42}], Responses1), + + Result2 = handle_response(Shard2, 43, Workers2, Responses1, [], undefined), + ?assertEqual({stop, [{Shard1, 42}, {Shard2, 43}]}, Result2). + + +handle_response_incomplete_ring_test() -> + Shard1 = mk_shard("n1", [0, 1]), + Shard2 = mk_shard("n1", [2, 10]), + + Workers1 = fabric_dict:init([Shard1, Shard2], nil), + + Result1 = handle_response(Shard1, 42, Workers1, [], [], undefined), + ?assertMatch({ok, {_, _}}, Result1), + {ok, {Workers2, Responses1}} = Result1, + ?assertEqual(fabric_dict:erase(Shard1, Workers1), Workers2), + ?assertEqual([{{0, 1}, Shard1, 42}], Responses1), + + Result2 = handle_response(Shard2, 43, Workers2, Responses1, [], undefined), + ?assertEqual({stop, [{Shard1, 42}, {Shard2, 43}]}, Result2). + + +handle_response_multiple_copies_test() -> + Shard1 = mk_shard("n1", [0, 1]), + Shard2 = mk_shard("n2", [0, 1]), + Shard3 = mk_shard("n1", [2, ?RING_END]), + + Workers1 = fabric_dict:init([Shard1, Shard2, Shard3], nil), + + Result1 = handle_response(Shard1, 42, Workers1, [], [], undefined), + ?assertMatch({ok, {_, _}}, Result1), + {ok, {Workers2, Responses1}} = Result1, + + Result2 = handle_response(Shard2, 43, Workers2, Responses1, [], undefined), + ?assertMatch({ok, {_, _}}, Result2), + {ok, {Workers3, Responses2}} = Result2, + + Result3 = handle_response(Shard3, 44, Workers3, Responses2, [], undefined), + % Use the value (42) to distinguish between [0, 1] copies. In reality + % they should have the same value but here we need to assert that copy + % that responded first is included in the ring. 
+ ?assertEqual({stop, [{Shard1, 42}, {Shard3, 44}]}, Result3). + + +handle_response_backtracking_test() -> + Shard1 = mk_shard("n1", [0, 5]), + Shard2 = mk_shard("n1", [10, ?RING_END]), + Shard3 = mk_shard("n2", [2, ?RING_END]), + Shard4 = mk_shard("n3", [0, 1]), + + Workers1 = fabric_dict:init([Shard1, Shard2, Shard3, Shard4], nil), + + Result1 = handle_response(Shard1, 42, Workers1, [], [], undefined), + ?assertMatch({ok, {_, _}}, Result1), + {ok, {Workers2, Responses1}} = Result1, + + Result2 = handle_response(Shard2, 43, Workers2, Responses1, [], undefined), + ?assertMatch({ok, {_, _}}, Result2), + {ok, {Workers3, Responses2}} = Result2, + + Result3 = handle_response(Shard3, 44, Workers3, Responses2, [], undefined), + ?assertMatch({ok, {_, _}}, Result3), + {ok, {Workers4, Responses3}} = Result3, + + Result4 = handle_response(Shard4, 45, Workers4, Responses3, [], undefined), + ?assertEqual({stop, [{Shard3, 44}, {Shard4, 45}]}, Result4). + + +handle_response_ring_opts_test() -> + Shard1 = mk_shard("n1", [0, 5]), + Shard2 = mk_shard("n2", [0, 1]), + Shard3 = mk_shard("n3", [0, 1]), + + Opts = [{any, [mk_shard("n3", [0, 1])]}], + + ShardList = [Shard1, Shard2, Shard3], + WithRefs = [S#shard{ref = make_ref()} || S <- ShardList], + Workers1 = fabric_dict:init(WithRefs, nil), + + Result1 = handle_response(Shard1, 42, Workers1, [], Opts, undefined), + ?assertMatch({ok, {_, _}}, Result1), + {ok, {Workers2, []}} = Result1, + + % Still waiting because the node doesn't match + Result2 = handle_response(Shard2, 43, Workers2, [], Opts, undefined), + ?assertMatch({ok, {_, _}}, Result2), + {ok, {Workers3, []}} = Result2, + + Result3 = handle_response(Shard3, 44, Workers3, [], Opts, undefined), + ?assertEqual({stop, [{Shard3, 44}]}, Result3). 
+ + +handle_error_test() -> + Shard1 = mk_shard("n1", [0, 5]), + Shard2 = mk_shard("n1", [10, ?RING_END]), + Shard3 = mk_shard("n2", [2, ?RING_END]), + Shard4 = mk_shard("n3", [0, 1]), + + Workers1 = fabric_dict:init([Shard1, Shard2, Shard3, Shard4], nil), + + Result1 = handle_response(Shard1, 42, Workers1, [], [], undefined), + ?assertMatch({ok, {_, _}}, Result1), + {ok, {Workers2, Responses1}} = Result1, + + Result2 = handle_error(Shard2, Workers2, Responses1), + ?assertMatch({ok, _}, Result2), + {ok, Workers3} = Result2, + ?assertEqual(fabric_dict:erase(Shard2, Workers2), Workers3), + + Result3 = handle_response(Shard3, 44, Workers3, Responses1, [], undefined), + ?assertMatch({ok, {_, _}}, Result3), + {ok, {Workers4, Responses3}} = Result3, + ?assertEqual(error, handle_error(Shard4, Workers4, Responses3)). + + +node_down_test() -> + Shard1 = mk_shard("n1", [0, 5]), + Shard2 = mk_shard("n1", [10, ?RING_END]), + Shard3 = mk_shard("n2", [2, ?RING_END]), + Shard4 = mk_shard("n3", [0, 1]), + + Workers1 = fabric_dict:init([Shard1, Shard2, Shard3, Shard4], nil), + + Result1 = handle_response(Shard1, 42, Workers1, [], [], undefined), + ?assertMatch({ok, {_, _}}, Result1), + {ok, {Workers2, Responses1}} = Result1, + + Result2 = handle_response(Shard2, 43, Workers2, Responses1, [], undefined), + ?assertMatch({ok, {_, _}}, Result2), + {ok, {Workers3, Responses2}} = Result2, + + Result3 = node_down(n1, Workers3, Responses2), + ?assertMatch({ok, _}, Result3), + {ok, Workers4} = Result3, + ?assertEqual([{Shard3, nil}, {Shard4, nil}], Workers4), + + Result4 = handle_response(Shard3, 44, Workers4, Responses2, [], undefined), + ?assertMatch({ok, {_, _}}, Result4), + {ok, {Workers5, Responses3}} = Result4, + + % Note: Shard3 was already processed, it's ok if n2 went down after + ?assertEqual({ok, [{Shard4, nil}]}, node_down(n2, Workers5, Responses3)), + + ?assertEqual(error, node_down(n3, Workers5, Responses3)). 
+ + +mk_cnts(Ranges) -> + Shards = lists:map(fun mk_shard/1, Ranges), + fabric_dict:init([S#shard{ref = make_ref()} || S <- Shards], nil). + + +mk_resps(RangeNameVals) -> + [{{B, E}, mk_shard(Name, [B, E]), V} || {Name, B, E, V} <- RangeNameVals]. + + +mk_shard([B, E]) when is_integer(B), is_integer(E) -> + #shard{range = [B, E]}. + + +mk_shard(Name, Range) -> + Node = list_to_atom(Name), + BName = list_to_binary(Name), + #shard{name = BName, node = Node, range = Range}. diff --git a/src/fabric/src/fabric_streams.erl b/src/fabric/src/fabric_streams.erl index 288c67cab..d0a549d6e 100644 --- a/src/fabric/src/fabric_streams.erl +++ b/src/fabric/src/fabric_streams.erl @@ -14,7 +14,9 @@ -export([ start/2, + start/3, start/4, + start/5, cleanup/1 ]). @@ -28,17 +30,28 @@ start(Workers, Keypos) -> start(Workers, Keypos, undefined, undefined). -start(Workers0, Keypos, StartFun, Replacements) -> + +start(Workers, Keypos, RingOpts) -> + start(Workers, Keypos, undefined, undefined, RingOpts). + + +start(Workers, Keypos, StartFun, Replacements) -> + start(Workers, Keypos, StartFun, Replacements, []). 
+ + +start(Workers0, Keypos, StartFun, Replacements, RingOpts) -> Fun = fun handle_stream_start/3, Acc = #stream_acc{ workers = fabric_dict:init(Workers0, waiting), + ready = [], start_fun = StartFun, - replacements = Replacements + replacements = Replacements, + ring_opts = RingOpts }, spawn_worker_cleaner(self(), Workers0), Timeout = fabric_util:request_timeout(), case rexi_utils:recv(Workers0, Keypos, Fun, Acc, Timeout, infinity) of - {ok, #stream_acc{workers=Workers}} -> + {ok, #stream_acc{ready = Workers}} -> AckedWorkers = fabric_dict:fold(fun(Worker, From, WorkerAcc) -> rexi:stream_start(From), [Worker | WorkerAcc] @@ -64,69 +77,74 @@ cleanup(Workers) -> handle_stream_start({rexi_DOWN, _, {_, NodeRef}, _}, _, St) -> - case fabric_util:remove_down_workers(St#stream_acc.workers, NodeRef) of - {ok, Workers} -> - {ok, St#stream_acc{workers=Workers}}; - error -> - Reason = {nodedown, <<"progress not possible">>}, - {error, Reason} + #stream_acc{workers = Workers, ready = Ready, ring_opts = RingOpts} = St, + case fabric_ring:node_down(NodeRef, Workers, Ready, RingOpts) of + {ok, Workers1} -> + {ok, St#stream_acc{workers = Workers1}}; + error -> + {error, {nodedown, <<"progress not possible">>}} end; handle_stream_start({rexi_EXIT, Reason}, Worker, St) -> - Workers = fabric_dict:erase(Worker, St#stream_acc.workers), - Replacements = St#stream_acc.replacements, - case {fabric_view:is_progress_possible(Workers), Reason} of - {true, _} -> - {ok, St#stream_acc{workers=Workers}}; - {false, {maintenance_mode, _Node}} when Replacements /= undefined -> - % Check if we have replacements for this range - % and start the new workers if so. 
- case lists:keytake(Worker#shard.range, 1, Replacements) of - {value, {_Range, WorkerReplacements}, NewReplacements} -> - FinalWorkers = lists:foldl(fun(Repl, NewWorkers) -> - NewWorker = (St#stream_acc.start_fun)(Repl), - add_worker_to_cleaner(self(), NewWorker), - fabric_dict:store(NewWorker, waiting, NewWorkers) - end, Workers, WorkerReplacements), - % Assert that our replaced worker provides us - % the oppurtunity to make progress. - true = fabric_view:is_progress_possible(FinalWorkers), - NewRefs = fabric_dict:fetch_keys(FinalWorkers), - {new_refs, NewRefs, St#stream_acc{ - workers=FinalWorkers, - replacements=NewReplacements - }}; - false -> - % If we progress isn't possible and we don't have any - % replacements then we're dead in the water. - Error = {nodedown, <<"progress not possible">>}, - {error, Error} - end; - {false, _} -> - {error, fabric_util:error_info(Reason)} + #stream_acc{ + workers = Workers, + ready = Ready, + replacements = Replacements, + ring_opts = RingOpts + } = St, + case {fabric_ring:handle_error(Worker, Workers, Ready, RingOpts), Reason} of + {{ok, Workers1}, _Reason} -> + {ok, St#stream_acc{workers = Workers1}}; + {error, {maintenance_mode, _Node}} when Replacements /= undefined -> + % Check if we have replacements for this range + % and start the new workers if so. + case lists:keytake(Worker#shard.range, 1, Replacements) of + {value, {_Range, WorkerReplacements}, NewReplacements} -> + FinalWorkers = lists:foldl(fun(Repl, NewWorkers) -> + NewWorker = (St#stream_acc.start_fun)(Repl), + add_worker_to_cleaner(self(), NewWorker), + fabric_dict:store(NewWorker, waiting, NewWorkers) + end, Workers, WorkerReplacements), + % Assert that our replaced worker provides us + % the oppurtunity to make progress. 
+ true = fabric_ring:is_progress_possible(FinalWorkers), + NewRefs = fabric_dict:fetch_keys(FinalWorkers), + {new_refs, NewRefs, St#stream_acc{ + workers=FinalWorkers, + replacements=NewReplacements + }}; + false -> + % If we progress isn't possible and we don't have any + % replacements then we're dead in the water. + {error, {nodedown, <<"progress not possible">>}} + end; + {error, _} -> + {error, fabric_util:error_info(Reason)} end; handle_stream_start(rexi_STREAM_INIT, {Worker, From}, St) -> - case fabric_dict:lookup_element(Worker, St#stream_acc.workers) of + #stream_acc{workers = Workers, ready = Ready, ring_opts = RingOpts} = St, + case fabric_dict:lookup_element(Worker, Workers) of undefined -> % This worker lost the race with other partition copies, terminate rexi:stream_cancel(From), {ok, St}; waiting -> - % Don't ack the worker yet so they don't start sending us - % rows until we're ready - Workers0 = fabric_dict:store(Worker, From, St#stream_acc.workers), - Workers1 = fabric_view:remove_overlapping_shards(Worker, Workers0), - case fabric_dict:any(waiting, Workers1) of - true -> - {ok, St#stream_acc{workers=Workers1}}; - false -> - {stop, St#stream_acc{workers=Workers1}} + case fabric_ring:handle_response(Worker, From, Workers, Ready, RingOpts) of + {ok, {Workers1, Ready1}} -> + % Don't have a full ring yet. Keep getting responses + {ok, St#stream_acc{workers = Workers1, ready = Ready1}}; + {stop, Ready1} -> + % Have a full ring of workers. 
But don't ack the worker + % yet so they don't start sending us rows until we're ready + {stop, St#stream_acc{workers = [], ready = Ready1}} end end; handle_stream_start({ok, ddoc_updated}, _, St) -> - cleanup(St#stream_acc.workers), + WaitingWorkers = [W || {W, _} <- St#stream_acc.workers], + ReadyWorkers = [W || {W, _} <- St#stream_acc.ready], + cleanup(WaitingWorkers ++ ReadyWorkers), {stop, ddoc_updated}; handle_stream_start(Else, _, _) -> @@ -199,7 +217,7 @@ should_clean_workers() -> Ref = erlang:monitor(process, Cleaner), Coord ! die, receive {'DOWN', Ref, _, Cleaner, _} -> ok end, - ?assertEqual(2, meck:num_calls(rexi, kill, 2)) + ?assertEqual(1, meck:num_calls(rexi, kill_all, 1)) end). @@ -219,7 +237,7 @@ does_not_fire_if_cleanup_called() -> receive {'DOWN', Ref, _, _, _} -> ok end, % 2 calls would be from cleanup/1 function. If cleanup process fired % too it would have been 4 calls total. - ?assertEqual(2, meck:num_calls(rexi, kill, 2)) + ?assertEqual(1, meck:num_calls(rexi, kill_all, 1)) end). @@ -236,12 +254,12 @@ should_clean_additional_worker_too() -> Ref = erlang:monitor(process, Cleaner), Coord ! die, receive {'DOWN', Ref, _, Cleaner, _} -> ok end, - ?assertEqual(2, meck:num_calls(rexi, kill, 2)) + ?assertEqual(1, meck:num_calls(rexi, kill_all, 1)) end). setup() -> - ok = meck:expect(rexi, kill, fun(_, _) -> ok end). + ok = meck:expect(rexi, kill_all, fun(_) -> ok end). teardown(_) -> diff --git a/src/fabric/src/fabric_util.erl b/src/fabric/src/fabric_util.erl index 1ba1d17ea..aaf0623f0 100644 --- a/src/fabric/src/fabric_util.erl +++ b/src/fabric/src/fabric_util.erl @@ -22,6 +22,7 @@ -export([is_partitioned/1]). -export([validate_all_docs_args/2, validate_args/3]). -export([upgrade_mrargs/1]). +-export([worker_ranges/1]). -compile({inline, [{doc_id_and_rev,1}]}). 
@@ -34,7 +35,7 @@ remove_down_workers(Workers, BadNode) -> Filter = fun(#shard{node = Node}, _) -> Node =/= BadNode end, NewWorkers = fabric_dict:filter(Filter, Workers), - case fabric_view:is_progress_possible(NewWorkers) of + case fabric_ring:is_progress_possible(NewWorkers) of true -> {ok, NewWorkers}; false -> @@ -51,7 +52,7 @@ submit_jobs(Shards, Module, EndPoint, ExtraArgs) -> end, Shards). cleanup(Workers) -> - [rexi:kill(Node, Ref) || #shard{node=Node, ref=Ref} <- Workers]. + rexi:kill_all([{Node, Ref} || #shard{node = Node, ref = Ref} <- Workers]). recv(Workers, Keypos, Fun, Acc0) -> rexi_utils:recv(Workers, Keypos, Fun, Acc0, request_timeout(), infinity). @@ -334,3 +335,10 @@ upgrade_mrargs({mrargs, sorted = Sorted, extra = Extra }. + + +worker_ranges(Workers) -> + Ranges = fabric_dict:fold(fun(#shard{range=[X, Y]}, _, Acc) -> + [{X, Y} | Acc] + end, [], Workers), + lists:usort(Ranges). diff --git a/src/fabric/src/fabric_view.erl b/src/fabric/src/fabric_view.erl index 27b0c275f..55b44e6f7 100644 --- a/src/fabric/src/fabric_view.erl +++ b/src/fabric/src/fabric_view.erl @@ -48,61 +48,83 @@ handle_worker_exit(Collector, _Worker, Reason) -> %% @doc looks for a fully covered keyrange in the list of counters -spec is_progress_possible([{#shard{}, term()}]) -> boolean(). 
-is_progress_possible([]) -> - false; is_progress_possible(Counters) -> - Ranges = fabric_dict:fold(fun(#shard{range=[X,Y]}, _, A) -> [{X,Y}|A] end, - [], Counters), - [{Start, Tail0} | Rest] = lists:ukeysort(1, Ranges), - Result = lists:foldl(fun - (_, fail) -> - % we've already declared failure - fail; - (_, complete) -> - % this is the success condition, we can fast-forward - complete; - ({X,_}, Tail) when X > (Tail+1) -> - % gap in the keyrange, we're dead - fail; - ({_,Y}, Tail) -> - case erlang:max(Tail, Y) of - End when (End+1) =:= (2 bsl 31) -> - complete; - Else -> - % the normal condition, adding to the tail - Else - end - end, if (Tail0+1) =:= (2 bsl 31) -> complete; true -> Tail0 end, Rest), - (Start =:= 0) andalso (Result =:= complete). + fabric_ring:is_progress_possible(Counters). -spec remove_overlapping_shards(#shard{}, [{#shard{}, any()}]) -> [{#shard{}, any()}]. -remove_overlapping_shards(#shard{range=[A,B]} = Shard0, Shards) -> - fabric_dict:filter(fun(#shard{range=[X,Y], node=Node, ref=Ref} = Shard, _) -> - if Shard =:= Shard0 -> - % we can't remove ourselves +remove_overlapping_shards(#shard{} = Shard, Counters) -> + remove_overlapping_shards(Shard, Counters, fun stop_worker/1). + + +-spec remove_overlapping_shards(#shard{}, [{#shard{}, any()}], fun()) -> + [{#shard{}, any()}]. +remove_overlapping_shards(#shard{} = Shard, Counters, RemoveCb) -> + Counters1 = filter_exact_copies(Shard, Counters, RemoveCb), + filter_possible_overlaps(Shard, Counters1, RemoveCb). + + +filter_possible_overlaps(Shard, Counters, RemoveCb) -> + Ranges0 = fabric_util:worker_ranges(Counters), + #shard{range = [BShard, EShard]} = Shard, + Ranges = Ranges0 ++ [{BShard, EShard}], + {Bs, Es} = lists:unzip(Ranges), + {MinB, MaxE} = {lists:min(Bs), lists:max(Es)}, + % Use a custom sort function which prioritizes the given shard + % range when the start endpoints match. 
+ SortFun = fun + ({B, E}, {B, _}) when {B, E} =:= {BShard, EShard} -> + % If start matches with the shard's start, shard always wins true; - A < B, X >= A, X < B -> - % lower bound is inside our range - rexi:kill(Node, Ref), - false; - A < B, Y > A, Y =< B -> - % upper bound is inside our range - rexi:kill(Node, Ref), - false; - B < A, X >= A orelse B < A, X < B -> - % target shard wraps the key range, lower bound is inside - rexi:kill(Node, Ref), + ({B, _}, {B, E}) when {B, E} =:= {BShard, EShard} -> + % If start matches with the shard's start, shard always wins false; - B < A, Y > A orelse B < A, Y =< B -> - % target shard wraps the key range, upper bound is inside - rexi:kill(Node, Ref), + ({B, E1}, {B, E2}) -> + % If start matches, pick the longest range first + E2 >= E1; + ({B1, _}, {B2, _}) -> + % Then, by default, sort by start point + B1 =< B2 + end, + Ring = mem3_util:get_ring(Ranges, SortFun, MinB, MaxE), + fabric_dict:filter(fun + (S, _) when S =:= Shard -> + % Keep the original shard + true; + (#shard{range = [B, E]} = S, _) -> + case lists:member({B, E}, Ring) of + true -> + true; % Keep it + false -> + % Duplicate range, delete after calling callback function + case is_function(RemoveCb) of + true -> RemoveCb(S); + false -> ok + end, + false + end + end, Counters). + + +filter_exact_copies(#shard{range = Range0} = Shard0, Shards, Cb) -> + fabric_dict:filter(fun + (Shard, _) when Shard =:= Shard0 -> + true; % Don't remove ourselves + (#shard{range = Range} = Shard, _) when Range =:= Range0 -> + case is_function(Cb) of + true -> Cb(Shard); + false -> ok + end, false; - true -> + (_, _) -> true - end end, Shards). + +stop_worker(#shard{ref = Ref, node = Node}) -> + rexi:kill(Node, Ref). + + maybe_send_row(#collector{limit=0} = State) -> #collector{counters=Counters, user_acc=AccIn, callback=Callback} = State, case fabric_dict:any(0, Counters) of @@ -329,13 +351,15 @@ get_shards(Db, #mrargs{} = Args) -> % request. 
case {Args#mrargs.stable, Partition} of {true, undefined} -> - mem3:ushards(DbName); + {mem3:ushards(DbName), []}; {true, Partition} -> - mem3:ushards(DbName, couch_partition:shard_key(Partition)); + Shards = mem3:ushards(DbName, couch_partition:shard_key(Partition)), + {Shards, [{any, Shards}]}; {false, undefined} -> - mem3:shards(DbName); + {mem3:shards(DbName), []}; {false, Partition} -> - mem3:shards(DbName, couch_partition:shard_key(Partition)) + Shards = mem3:shards(DbName, couch_partition:shard_key(Partition)), + {Shards, [{any, Shards}]} end. maybe_update_others(DbName, DDoc, ShardsInvolved, ViewName, @@ -352,8 +376,9 @@ get_shard_replacements(DbName, UsedShards0) -> % that aren't already used. AllLiveShards = mem3:live_shards(DbName, [node() | nodes()]), UsedShards = [S#shard{ref=undefined} || S <- UsedShards0], - UnusedShards = AllLiveShards -- UsedShards, + get_shard_replacements_int(AllLiveShards -- UsedShards, UsedShards). +get_shard_replacements_int(UnusedShards, UsedShards) -> % If we have more than one copy of a range then we don't % want to try and add a replacement to any copy. RangeCounts = lists:foldl(fun(#shard{range=R}, Acc) -> @@ -363,10 +388,10 @@ get_shard_replacements(DbName, UsedShards0) -> % For each seq shard range with a count of 1, find any % possible replacements from the unused shards. The % replacement list is keyed by range. - lists:foldl(fun(#shard{range=Range}, Acc) -> + lists:foldl(fun(#shard{range = [B, E] = Range}, Acc) -> case dict:find(Range, RangeCounts) of {ok, 1} -> - Repls = [S || S <- UnusedShards, S#shard.range =:= Range], + Repls = mem3_util:non_overlapping_shards(UnusedShards, B, E), % Only keep non-empty lists of replacements if Repls == [] -> Acc; true -> [{Range, Repls} | Acc] @@ -406,26 +431,59 @@ is_progress_possible_test() -> ?assertEqual(is_progress_possible(mk_cnts(T4)),false), % outside range T5 = [[0,10],[11,20],[21,EndPoint]], - ?assertEqual(is_progress_possible(mk_cnts(T5)),false). 
+ ?assertEqual(is_progress_possible(mk_cnts(T5)),false), + T6 = [[0, 10], [11, 20], [0, 5], [6, 21], [21, EndPoint - 1]], + ?assertEqual(is_progress_possible(mk_cnts(T6)), true), + % not possible, overlap is not exact + T7 = [[0, 10], [13, 20], [21, EndPoint - 1], [9, 12]], + ?assertEqual(is_progress_possible(mk_cnts(T7)), false). + remove_overlapping_shards_test() -> - meck:new(rexi), - meck:expect(rexi, kill, fun(_, _) -> ok end), - EndPoint = 2 bsl 31, - T1 = [[0,10],[11,20],[21,EndPoint-1]], - Shards = mk_cnts(T1,3), - ?assertEqual(orddict:size( - remove_overlapping_shards(#shard{name=list_to_atom("node-3"), - node=list_to_atom("node-3"), - range=[11,20]}, - Shards)),7), - meck:unload(rexi). + Cb = undefined, + + Shards = mk_cnts([[0, 10], [11, 20], [21, ?RING_END]], 3), + + % Simple (exact) overlap + Shard1 = mk_shard("node-3", [11, 20]), + Shards1 = fabric_dict:store(Shard1, nil, Shards), + R1 = remove_overlapping_shards(Shard1, Shards1, Cb), + ?assertEqual([{0, 10}, {11, 20}, {21, ?RING_END}], + fabric_util:worker_ranges(R1)), + ?assert(fabric_dict:is_key(Shard1, R1)), + + % Split overlap (shard overlap multiple workers) + Shard2 = mk_shard("node-3", [0, 20]), + Shards2 = fabric_dict:store(Shard2, nil, Shards), + R2 = remove_overlapping_shards(Shard2, Shards2, Cb), + ?assertEqual([{0, 20}, {21, ?RING_END}], + fabric_util:worker_ranges(R2)), + ?assert(fabric_dict:is_key(Shard2, R2)). 
+ + +get_shard_replacements_test() -> + Unused = [mk_shard(N, [B, E]) || {N, B, E} <- [ + {"n1", 11, 20}, {"n1", 21, ?RING_END}, + {"n2", 0, 4}, {"n2", 5, 10}, {"n2", 11, 20}, + {"n3", 0, 21, ?RING_END} + ]], + Used = [mk_shard(N, [B, E]) || {N, B, E} <- [ + {"n2", 21, ?RING_END}, + {"n3", 0, 10}, {"n3", 11, 20} + ]], + Res = lists:sort(get_shard_replacements_int(Unused, Used)), + % Notice that [0, 10] range can be replaced by spawning the [0, 4] and [5, + % 10] workers on n1 + Expect = [ + {[0, 10], [mk_shard("n2", [0, 4]), mk_shard("n2", [5, 10])]}, + {[11, 20], [mk_shard("n1", [11, 20]), mk_shard("n2", [11, 20])]}, + {[21, ?RING_END], [mk_shard("n1", [21, ?RING_END])]} + ], + ?assertEqual(Expect, Res). + mk_cnts(Ranges) -> - Shards = lists:map(fun(Range) -> - #shard{range=Range} - end, - Ranges), + Shards = lists:map(fun mk_shard/1, Ranges), orddict:from_list([{Shard,nil} || Shard <- Shards]). mk_cnts(Ranges, NoNodes) -> @@ -440,6 +498,15 @@ mk_cnts(Ranges, NoNodes) -> mk_shards(0,_Range,Shards) -> Shards; mk_shards(NoNodes,Range,Shards) -> - NodeName = list_to_atom("node-" ++ integer_to_list(NoNodes)), - mk_shards(NoNodes-1,Range, - [#shard{name=NodeName, node=NodeName, range=Range} | Shards]). + Name ="node-" ++ integer_to_list(NoNodes), + mk_shards(NoNodes-1,Range, [mk_shard(Name, Range) | Shards]). + + +mk_shard([B, E]) when is_integer(B), is_integer(E) -> + #shard{range = [B, E]}. + + +mk_shard(Name, Range) -> + Node = list_to_atom(Name), + BName = list_to_binary(Name), + #shard{name = BName, node = Node, range = Range}. 
diff --git a/src/fabric/src/fabric_view_all_docs.erl b/src/fabric/src/fabric_view_all_docs.erl index 9049eaa90..1d87e3ddd 100644 --- a/src/fabric/src/fabric_view_all_docs.erl +++ b/src/fabric/src/fabric_view_all_docs.erl @@ -23,12 +23,12 @@ go(Db, Options, #mrargs{keys=undefined} = QueryArgs, Callback, Acc) -> {CoordArgs, WorkerArgs} = fabric_view:fix_skip_and_limit(QueryArgs), DbName = fabric:dbname(Db), - Shards = shards(Db, QueryArgs), + {Shards, RingOpts} = shards(Db, QueryArgs), Workers0 = fabric_util:submit_jobs( Shards, fabric_rpc, all_docs, [Options, WorkerArgs]), RexiMon = fabric_util:create_monitors(Workers0), try - case fabric_streams:start(Workers0, #shard.ref) of + case fabric_streams:start(Workers0, #shard.ref, RingOpts) of {ok, Workers} -> try go(DbName, Options, Workers, CoordArgs, Callback, Acc) diff --git a/src/fabric/src/fabric_view_changes.erl b/src/fabric/src/fabric_view_changes.erl index f96bb058d..febbd3169 100644 --- a/src/fabric/src/fabric_view_changes.erl +++ b/src/fabric/src/fabric_view_changes.erl @@ -127,24 +127,30 @@ keep_sending_changes(DbName, Args, Callback, Seqs, AccIn, Timeout, UpListen, T0) send_changes(DbName, ChangesArgs, Callback, PackedSeqs, AccIn, Timeout) -> LiveNodes = [node() | nodes()], AllLiveShards = mem3:live_shards(DbName, LiveNodes), - Seqs = lists:flatmap(fun({#shard{name=Name, node=N} = Shard, Seq}) -> - case lists:member(Shard, AllLiveShards) of - true -> - Ref = rexi:cast(N, {fabric_rpc, changes, [Name,ChangesArgs,Seq]}), - [{Shard#shard{ref = Ref}, Seq}]; - false -> - % Find some replacement shards to cover the missing range - % TODO It's possible in rare cases of shard merging to end up - % with overlapping shard ranges from this technique - lists:map(fun(#shard{name=Name2, node=N2} = NewShard) -> - Ref = rexi:cast(N2, {fabric_rpc, changes, [Name2, ChangesArgs, - make_replacement_arg(N, Seq)]}), - {NewShard#shard{ref = Ref}, 0} - end, find_replacement_shards(Shard, AllLiveShards)) - end - end, 
unpack_seqs(PackedSeqs, DbName)), + Seqs0 = unpack_seqs(PackedSeqs, DbName), + {WSeqs0, Dead, Reps} = find_replacements(Seqs0, AllLiveShards), + % Start workers which didn't need replacements + WSeqs = lists:map(fun({#shard{name = Name, node = N} = S, Seq}) -> + Ref = rexi:cast(N, {fabric_rpc, changes, [Name, ChangesArgs, Seq]}), + {S#shard{ref = Ref}, Seq} + end, WSeqs0), + % For some dead workers see if they are a result of split shards. In that + % case make a replacement argument so that local rexi workers can calculate + % (hopefully) a > 0 update sequence. + {WSplitSeqs0, Reps1} = find_split_shard_replacements(Dead, Reps), + WSplitSeqs = lists:map(fun({#shard{name = Name, node = N} = S, Seq}) -> + Arg = make_replacement_arg(N, Seq), + Ref = rexi:cast(N, {fabric_rpc, changes, [Name, ChangesArgs, Arg]}), + {S#shard{ref = Ref}, Seq} + end, WSplitSeqs0), + % For ranges that were not split start sequences from 0 + WReps = lists:map(fun(#shard{name = Name, node = N} = S) -> + Ref = rexi:cast(N, {fabric_rpc, changes, [Name, ChangesArgs, 0]}), + {S#shard{ref = Ref}, 0} + end, Reps1), + Seqs = WSeqs ++ WSplitSeqs ++ WReps, {Workers0, _} = lists:unzip(Seqs), - Repls = fabric_view:get_shard_replacements(DbName, Workers0), + Repls = fabric_ring:get_shard_replacements(DbName, Workers0), StartFun = fun(#shard{name=Name, node=N, range=R0}=Shard) -> %% Find the original shard copy in the Seqs array case lists:dropwhile(fun({S, _}) -> S#shard.range =/= R0 end, Seqs) of @@ -386,6 +392,24 @@ seq({Seq, _Uuid, _Node}) -> Seq; seq({Seq, _Uuid}) -> Seq; seq(Seq) -> Seq. + +unpack_seq_regex_match(Packed) -> + NewPattern = "^\\[[0-9]+\s*,\s*\"(?<opaque>.*)\"\\]$", + OldPattern = "^\"?([0-9]+-)?(?<opaque>.*?)\"?$", + Options = [{capture, [opaque], binary}], + case re:run(Packed, NewPattern, Options) of + {match, Match} -> + Match; + nomatch -> + {match, Match} = re:run(Packed, OldPattern, Options), + Match + end. 
+ + +unpack_seq_decode_term(Opaque) -> + binary_to_term(couch_util:decodeBase64Url(Opaque)). + + unpack_seqs(0, DbName) -> fabric_dict:init(mem3:shards(DbName), 0); @@ -396,23 +420,14 @@ unpack_seqs([_SeqNum, Opaque], DbName) -> % deprecated do_unpack_seqs(Opaque, DbName); unpack_seqs(Packed, DbName) -> - NewPattern = "^\\[[0-9]+\s*,\s*\"(?<opaque>.*)\"\\]$", - OldPattern = "^\"?([0-9]+-)?(?<opaque>.*?)\"?$", - Options = [{capture, [opaque], binary}], - Opaque = case re:run(Packed, NewPattern, Options) of - {match, Match} -> - Match; - nomatch -> - {match, Match} = re:run(Packed, OldPattern, Options), - Match - end, + Opaque = unpack_seq_regex_match(Packed), do_unpack_seqs(Opaque, DbName). do_unpack_seqs(Opaque, DbName) -> % A preventative fix for FB 13533 to remove duplicate shards. % This just picks each unique shard and keeps the largest seq % value recorded. - Decoded = binary_to_term(couch_util:decodeBase64Url(Opaque)), + Decoded = unpack_seq_decode_term(Opaque), DedupDict = lists:foldl(fun({Node, [A, B], Seq}, Acc) -> dict:append({Node, [A, B]}, Seq, Acc) end, dict:new(), Decoded), @@ -431,28 +446,28 @@ do_unpack_seqs(Opaque, DbName) -> end end, Deduped), - % Fill holes in the since sequence. If/when we ever start - % using overlapping shard ranges this will need to be updated - % to not include shard ranges that overlap entries in Upacked. - % A quick and dirty approach would be like such: - % - % lists:foldl(fun(S, Acc) -> - % fabric_view:remove_overlapping_shards(S, Acc) - % end, mem3:shards(DbName), Unpacked) - % - % Unfortunately remove_overlapping_shards isn't reusable because - % of its calls to rexi:kill/2. When we get to overlapping - % shard ranges and have to rewrite shard range management - % we can revisit this simpler algorithm. 
- case fabric_view:is_progress_possible(Unpacked) of + % This just handles the case if the ring in the unpacked sequence + % received is not complete and in that case tries to fill in the + % missing ranges with shards from the shard map + case fabric_ring:is_progress_possible(Unpacked) of true -> Unpacked; false -> - Extract = fun({Shard, _Seq}) -> Shard#shard.range end, - Ranges = lists:usort(lists:map(Extract, Unpacked)), - Filter = fun(S) -> not lists:member(S#shard.range, Ranges) end, - Replacements = lists:filter(Filter, mem3:shards(DbName)), - Unpacked ++ [{R, get_old_seq(R, Deduped)} || R <- Replacements] + PotentialWorkers = lists:map(fun({Node, [A, B], Seq}) -> + case mem3:get_shard(DbName, Node, [A, B]) of + {ok, Shard} -> + {Shard, Seq}; + {error, not_found} -> + {#shard{node = Node, range = [A, B]}, Seq} + end + end, Deduped), + Shards = mem3:shards(DbName), + {Unpacked1, Dead, Reps} = find_replacements(PotentialWorkers, Shards), + {Splits, Reps1} = find_split_shard_replacements(Dead, Reps), + RepSeqs = lists:map(fun(#shard{} = S) -> + {S, get_old_seq(S, Deduped)} + end, Reps1), + Unpacked1 ++ Splits ++ RepSeqs end. @@ -491,18 +506,119 @@ changes_row(Props0) -> Props2 = lists:filter(fun({K,_V}) -> lists:member(K, Allowed) end, Props1), {change, {Props2}}. -find_replacement_shards(#shard{range=Range}, AllShards) -> - % TODO make this moar betta -- we might have split or merged the partition - [Shard || Shard <- AllShards, Shard#shard.range =:= Range]. + +find_replacements(Workers, AllShards) -> + % Build map [B, E] => [Worker1, Worker2, ...] for all workers + WrkMap = lists:foldl(fun({#shard{range = [B, E]}, _} = W, Acc) -> + maps:update_with({B, E}, fun(Ws) -> [W | Ws] end, [W], Acc) + end, #{}, fabric_dict:to_list(Workers)), + + % Build map [B, E] => [Shard1, Shard2, ...] 
for all shards + AllMap = lists:foldl(fun(#shard{range = [B, E]} = S, Acc) -> + maps:update_with({B, E}, fun(Ss) -> [S | Ss] end, [S], Acc) + end, #{}, AllShards), + + % Custom sort function will prioritize workers over other shards. + % The idea is to not unnecessarily kill workers if we don't have to + SortFun = fun + (R1 = {B, E1}, R2 = {B, E2}) -> + case {maps:is_key(R1, WrkMap), maps:is_key(R2, WrkMap)} of + {true, true} -> + % Both are workers, larger interval wins + E1 >= E2; + {true, false} -> + % First element is a worker range, it wins + true; + {false, true} -> + % Second element is a worker range, it wins + false; + {false, false} -> + % Neither one is a worker interval, pick larger one + E1 >= E2 + end; + ({B1, _}, {B2, _}) -> + B1 =< B2 + end, + Ring = mem3_util:get_ring(maps:keys(AllMap), SortFun), + + % Keep only workers in the ring and from one of the available nodes + Keep = fun(#shard{range = [B, E], node = N}) -> + lists:member({B, E}, Ring) andalso lists:keyfind(N, #shard.node, + maps:get({B, E}, AllMap)) =/= false + end, + Workers1 = fabric_dict:filter(fun(S, _) -> Keep(S) end, Workers), + Removed = fabric_dict:filter(fun(S, _) -> not Keep(S) end, Workers), + + {Rep, _} = lists:foldl(fun(R, {RepAcc, AllMapAcc}) -> + case maps:is_key(R, WrkMap)of + true -> + % It's a worker and in the map of available shards. Make sure + % to keep it only if there is a range available on that node + % only (reuse Keep/1 predicate from above) + WorkersInRange = maps:get(R, WrkMap), + case lists:any(fun({S, _}) -> Keep(S) end, WorkersInRange) of + true -> + {RepAcc, AllMapAcc}; + false -> + [Shard | Rest] = maps:get(R, AllMapAcc), + {[Shard | RepAcc], AllMapAcc#{R := Rest}} + end; + false -> + % No worker for this range. 
Replace from available shards + [Shard | Rest] = maps:get(R, AllMapAcc), + {[Shard | RepAcc], AllMapAcc#{R := Rest}} + end + end, {[], AllMap}, Ring), + + % Return the list of workers that are part of ring, list of removed workers + % and a list of replacement shards that could be used to make sure the ring + % completes. + {Workers1, Removed, Rep}. + + +% From the list of dead workers determine if any are a result of a split shard. +% In that case perhaps there is a way to not rewind the changes feed back to 0. +% Returns {NewWorkers, Available} where NewWorkers is the list of +% viable workers Available is the list of still unused input Shards +find_split_shard_replacements(DeadWorkers, Shards) -> + Acc0 = {[], Shards}, + AccF = fabric_dict:fold(fun(#shard{node = WN, range = R}, Seq, Acc) -> + [B, E] = R, + {SplitWorkers, Available} = Acc, + ShardsOnSameNode = [S || #shard{node = N} = S <- Available, N =:= WN], + SplitShards = mem3_util:non_overlapping_shards(ShardsOnSameNode, B, E), + RepCount = length(SplitShards), + NewWorkers = [{S, make_split_seq(Seq, RepCount)} || S <- SplitShards], + NewAvailable = [S || S <- Available, not lists:member(S, SplitShards)], + {NewWorkers ++ SplitWorkers, NewAvailable} + end, Acc0, DeadWorkers), + {Workers, Available} = AccF, + {fabric_dict:from_list(Workers), Available}. + + +make_split_seq({Num, Uuid, Node}, RepCount) when RepCount > 1 -> + {Num, {split, Uuid}, Node}; +make_split_seq(Seq, _) -> + Seq. 
+ validate_start_seq(_DbName, "now") -> ok; -validate_start_seq(DbName, Seq) -> - try unpack_seqs(Seq, DbName) of _Any -> +validate_start_seq(_DbName, 0) -> + ok; +validate_start_seq(_DbName, "0") -> + ok; +validate_start_seq(_DbName, Seq) -> + try + case Seq of + [_SeqNum, Opaque] -> + unpack_seq_decode_term(Opaque); + Seq -> + Opaque = unpack_seq_regex_match(Seq), + unpack_seq_decode_term(Opaque) + end, ok catch - error:database_does_not_exist -> - {error, database_does_not_exist}; _:_ -> Reason = <<"Malformed sequence supplied in 'since' parameter.">>, {error, {bad_request, Reason}} @@ -520,51 +636,185 @@ get_changes_epoch() -> increment_changes_epoch() -> application:set_env(fabric, changes_epoch, os:timestamp()). -unpack_seqs_test() -> + +unpack_seq_setup() -> meck:new(mem3), meck:new(fabric_view), meck:expect(mem3, get_shard, fun(_, _, _) -> {ok, #shard{}} end), - meck:expect(fabric_view, is_progress_possible, fun(_) -> true end), - - % BigCouch 0.3 style. - assert_shards("23423-g1AAAAE7eJzLYWBg4MhgTmHgS0ktM3QwND" - "LXMwBCwxygOFMiQ5L8____sxIZcKlIUgCSSfZgRUw4FTmAFMWDFTHiVJQAUlSPX1Ee" - "C5BkaABSQHXzsxKZ8StcAFG4H4_bIAoPQBTeJ2j1A4hCUJBkAQC7U1NA"), - - % BigCouch 0.4 style. - assert_shards([23423,<<"g1AAAAE7eJzLYWBg4MhgTmHgS0ktM3QwND" - "LXMwBCwxygOFMiQ5L8____sxIZcKlIUgCSSfZgRUw4FTmAFMWDFTHiVJQAUlSPX1Ee" - "C5BkaABSQHXzsxKZ8StcAFG4H4_bIAoPQBTeJ2j1A4hCUJBkAQC7U1NA">>]), - - % BigCouch 0.4 style (as string). 
- assert_shards("[23423,\"g1AAAAE7eJzLYWBg4MhgTmHgS0ktM3QwND" - "LXMwBCwxygOFMiQ5L8____sxIZcKlIUgCSSfZgRUw4FTmAFMWDFTHiVJQAUlSPX1Ee" - "C5BkaABSQHXzsxKZ8StcAFG4H4_bIAoPQBTeJ2j1A4hCUJBkAQC7U1NA\"]"), - assert_shards("[23423 ,\"g1AAAAE7eJzLYWBg4MhgTmHgS0ktM3QwND" - "LXMwBCwxygOFMiQ5L8____sxIZcKlIUgCSSfZgRUw4FTmAFMWDFTHiVJQAUlSPX1Ee" - "C5BkaABSQHXzsxKZ8StcAFG4H4_bIAoPQBTeJ2j1A4hCUJBkAQC7U1NA\"]"), - assert_shards("[23423, \"g1AAAAE7eJzLYWBg4MhgTmHgS0ktM3QwND" - "LXMwBCwxygOFMiQ5L8____sxIZcKlIUgCSSfZgRUw4FTmAFMWDFTHiVJQAUlSPX1Ee" - "C5BkaABSQHXzsxKZ8StcAFG4H4_bIAoPQBTeJ2j1A4hCUJBkAQC7U1NA\"]"), - assert_shards("[23423 , \"g1AAAAE7eJzLYWBg4MhgTmHgS0ktM3QwND" - "LXMwBCwxygOFMiQ5L8____sxIZcKlIUgCSSfZgRUw4FTmAFMWDFTHiVJQAUlSPX1Ee" - "C5BkaABSQHXzsxKZ8StcAFG4H4_bIAoPQBTeJ2j1A4hCUJBkAQC7U1NA\"]"), - - % with internal hypen - assert_shards("651-g1AAAAE7eJzLYWBg4MhgTmHgS0ktM3QwNDLXMwBCwxygOFMiQ" - "5L8____sxJTcalIUgCSSfZgReE4FTmAFMWDFYXgVJQAUlQPVuSKS1EeC5BkaABSQHXz8" - "VgJUbgAonB_VqIPfoUHIArvE7T6AUQh0I1-WQAzp1XB"), - assert_shards([651,"g1AAAAE7eJzLYWBg4MhgTmHgS0ktM3QwNDLXMwBCwxygOFMiQ" - "5L8____sxJTcalIUgCSSfZgReE4FTmAFMWDFYXgVJQAUlQPVuSKS1EeC5BkaABSQHXz8" - "VgJUbgAonB_VqIPfoUHIArvE7T6AUQh0I1-WQAzp1XB"]), - - % CouchDB 1.2 style - assert_shards("\"23423-g1AAAAE7eJzLYWBg4MhgTmHgS0ktM3QwND" - "LXMwBCwxygOFMiQ5L8____sxIZcKlIUgCSSfZgRUw4FTmAFMWDFTHiVJQAUlSPX1Ee" - "C5BkaABSQHXzsxKZ8StcAFG4H4_bIAoPQBTeJ2j1A4hCUJBkAQC7U1NA\""), - - meck:unload(fabric_view), - meck:unload(mem3). + meck:expect(fabric_ring, is_progress_possible, fun(_) -> true end), + ok. + + +unpack_seqs_test_() -> + { + setup, + fun unpack_seq_setup/0, + fun (_) -> meck:unload() end, + [ + t_unpack_seqs() + ] + }. + + +t_unpack_seqs() -> + ?_test(begin + % BigCouch 0.3 style. + assert_shards("23423-g1AAAAE7eJzLYWBg4MhgTmHgS0ktM3QwND" + "LXMwBCwxygOFMiQ5L8____sxIZcKlIUgCSSfZgRUw4FTmAFMWDFTHiVJQAUlSPX1Ee" + "C5BkaABSQHXzsxKZ8StcAFG4H4_bIAoPQBTeJ2j1A4hCUJBkAQC7U1NA"), + + % BigCouch 0.4 style. 
+ assert_shards([23423,<<"g1AAAAE7eJzLYWBg4MhgTmHgS0ktM3QwND" + "LXMwBCwxygOFMiQ5L8____sxIZcKlIUgCSSfZgRUw4FTmAFMWDFTHiVJQAUlSPX1Ee" + "C5BkaABSQHXzsxKZ8StcAFG4H4_bIAoPQBTeJ2j1A4hCUJBkAQC7U1NA">>]), + + % BigCouch 0.4 style (as string). + assert_shards("[23423,\"g1AAAAE7eJzLYWBg4MhgTmHgS0ktM3QwND" + "LXMwBCwxygOFMiQ5L8____sxIZcKlIUgCSSfZgRUw4FTmAFMWDFTHiVJQAUlSPX1Ee" + "C5BkaABSQHXzsxKZ8StcAFG4H4_bIAoPQBTeJ2j1A4hCUJBkAQC7U1NA\"]"), + assert_shards("[23423 ,\"g1AAAAE7eJzLYWBg4MhgTmHgS0ktM3QwND" + "LXMwBCwxygOFMiQ5L8____sxIZcKlIUgCSSfZgRUw4FTmAFMWDFTHiVJQAUlSPX1Ee" + "C5BkaABSQHXzsxKZ8StcAFG4H4_bIAoPQBTeJ2j1A4hCUJBkAQC7U1NA\"]"), + assert_shards("[23423, \"g1AAAAE7eJzLYWBg4MhgTmHgS0ktM3QwND" + "LXMwBCwxygOFMiQ5L8____sxIZcKlIUgCSSfZgRUw4FTmAFMWDFTHiVJQAUlSPX1Ee" + "C5BkaABSQHXzsxKZ8StcAFG4H4_bIAoPQBTeJ2j1A4hCUJBkAQC7U1NA\"]"), + assert_shards("[23423 , \"g1AAAAE7eJzLYWBg4MhgTmHgS0ktM3QwND" + "LXMwBCwxygOFMiQ5L8____sxIZcKlIUgCSSfZgRUw4FTmAFMWDFTHiVJQAUlSPX1Ee" + "C5BkaABSQHXzsxKZ8StcAFG4H4_bIAoPQBTeJ2j1A4hCUJBkAQC7U1NA\"]"), + + % with internal hypen + assert_shards("651-g1AAAAE7eJzLYWBg4MhgTmHgS0ktM3QwNDLXMwBCwxygOFMiQ" + "5L8____sxJTcalIUgCSSfZgReE4FTmAFMWDFYXgVJQAUlQPVuSKS1EeC5BkaABSQHXz8" + "VgJUbgAonB_VqIPfoUHIArvE7T6AUQh0I1-WQAzp1XB"), + assert_shards([651,"g1AAAAE7eJzLYWBg4MhgTmHgS0ktM3QwNDLXMwBCwxygOFMiQ" + "5L8____sxJTcalIUgCSSfZgReE4FTmAFMWDFYXgVJQAUlQPVuSKS1EeC5BkaABSQHXz8" + "VgJUbgAonB_VqIPfoUHIArvE7T6AUQh0I1-WQAzp1XB"]), + + % CouchDB 1.2 style + assert_shards("\"23423-g1AAAAE7eJzLYWBg4MhgTmHgS0ktM3QwND" + "LXMwBCwxygOFMiQ5L8____sxIZcKlIUgCSSfZgRUw4FTmAFMWDFTHiVJQAUlSPX1Ee" + "C5BkaABSQHXzsxKZ8StcAFG4H4_bIAoPQBTeJ2j1A4hCUJBkAQC7U1NA\"") + end). + assert_shards(Packed) -> ?assertMatch([{#shard{},_}|_], unpack_seqs(Packed, <<"foo">>)). + + +find_replacements_test() -> + % None of the workers are in the live list of shard but there is a + % replacement on n3 for the full range. It should get picked instead of + % the two smaller one on n2. 
+ Workers1 = mk_workers([{"n1", 0, 10}, {"n2", 11, ?RING_END}]), + AllShards1 = [ + mk_shard("n1", 11, ?RING_END), + mk_shard("n2", 0, 4), + mk_shard("n2", 5, 10), + mk_shard("n3", 0, ?RING_END) + ], + {WorkersRes1, Dead1, Reps1} = find_replacements(Workers1, AllShards1), + ?assertEqual([], WorkersRes1), + ?assertEqual(Workers1, Dead1), + ?assertEqual([mk_shard("n3", 0, ?RING_END)], Reps1), + + % None of the workers are in the live list of shards and there is a + % split replacement from n2 (range [0, 10] replaced with [0, 4], [5, 10]) + Workers2 = mk_workers([{"n1", 0, 10}, {"n2", 11, ?RING_END}]), + AllShards2 = [ + mk_shard("n1", 11, ?RING_END), + mk_shard("n2", 0, 4), + mk_shard("n2", 5, 10) + ], + {WorkersRes2, Dead2, Reps2} = find_replacements(Workers2, AllShards2), + ?assertEqual([], WorkersRes2), + ?assertEqual(Workers2, Dead2), + ?assertEqual([ + mk_shard("n1", 11, ?RING_END), + mk_shard("n2", 0, 4), + mk_shard("n2", 5, 10) + ], lists:sort(Reps2)), + + % One worker is available and one needs to be replaced. Replacement will be + % from two split shards + Workers3 = mk_workers([{"n1", 0, 10}, {"n2", 11, ?RING_END}]), + AllShards3 = [ + mk_shard("n1", 11, ?RING_END), + mk_shard("n2", 0, 4), + mk_shard("n2", 5, 10), + mk_shard("n2", 11, ?RING_END) + ], + {WorkersRes3, Dead3, Reps3} = find_replacements(Workers3, AllShards3), + ?assertEqual(mk_workers([{"n2", 11, ?RING_END}]), WorkersRes3), + ?assertEqual(mk_workers([{"n1", 0, 10}]), Dead3), + ?assertEqual([ + mk_shard("n2", 0, 4), + mk_shard("n2", 5, 10) + ], lists:sort(Reps3)), + + % All workers are available. Make sure they are not killed even if there is + % a longer (single) shard to replace them. 
+ Workers4 = mk_workers([{"n1", 0, 10}, {"n1", 11, ?RING_END}]), + AllShards4 = [ + mk_shard("n1", 0, 10), + mk_shard("n1", 11, ?RING_END), + mk_shard("n2", 0, 4), + mk_shard("n2", 5, 10), + mk_shard("n3", 0, ?RING_END) + ], + {WorkersRes4, Dead4, Reps4} = find_replacements(Workers4, AllShards4), + ?assertEqual(Workers4, WorkersRes4), + ?assertEqual([], Dead4), + ?assertEqual([], Reps4). + + +mk_workers(NodesRanges) -> + mk_workers(NodesRanges, nil). + +mk_workers(NodesRanges, Val) -> + orddict:from_list([{mk_shard(N, B, E), Val} || {N, B, E} <- NodesRanges]). + + +mk_shard(Name, B, E) -> + Node = list_to_atom(Name), + BName = list_to_binary(Name), + #shard{name = BName, node = Node, range = [B, E]}. + + +find_split_shard_replacements_test() -> + % One worker is can be replaced and one can't + Dead1 = mk_workers([{"n1", 0, 10}, {"n2", 11, ?RING_END}], 42), + Shards1 = [ + mk_shard("n1", 0, 4), + mk_shard("n1", 5, 10), + mk_shard("n3", 11, ?RING_END) + ], + {Workers1, ShardsLeft1} = find_split_shard_replacements(Dead1, Shards1), + ?assertEqual(mk_workers([{"n1", 0, 4}, {"n1", 5, 10}], 42), Workers1), + ?assertEqual([mk_shard("n3", 11, ?RING_END)], ShardsLeft1), + + % All workers can be replaced - one by 1 shard, another by 3 smaller shards + Dead2 = mk_workers([{"n1", 0, 10}, {"n2", 11, ?RING_END}], 42), + Shards2 = [ + mk_shard("n1", 0, 10), + mk_shard("n2", 11, 12), + mk_shard("n2", 13, 14), + mk_shard("n2", 15, ?RING_END) + ], + {Workers2, ShardsLeft2} = find_split_shard_replacements(Dead2, Shards2), + ?assertEqual(mk_workers([ + {"n1", 0, 10}, + {"n2", 11, 12}, + {"n2", 13, 14}, + {"n2", 15, ?RING_END} + ], 42), Workers2), + ?assertEqual([], ShardsLeft2), + + % No workers can be replaced. 
Ranges match but they are on different nodes + Dead3 = mk_workers([{"n1", 0, 10}, {"n2", 11, ?RING_END}], 42), + Shards3 = [ + mk_shard("n2", 0, 10), + mk_shard("n3", 11, ?RING_END) + ], + {Workers3, ShardsLeft3} = find_split_shard_replacements(Dead3, Shards3), + ?assertEqual([], Workers3), + ?assertEqual(Shards3, ShardsLeft3). diff --git a/src/fabric/src/fabric_view_map.erl b/src/fabric/src/fabric_view_map.erl index 1bbc6d201..5a5cc138b 100644 --- a/src/fabric/src/fabric_view_map.erl +++ b/src/fabric/src/fabric_view_map.erl @@ -26,11 +26,11 @@ go(DbName, Options, GroupId, View, Args, Callback, Acc, VInfo) go(Db, Options, DDoc, View, Args, Callback, Acc, VInfo) -> DbName = fabric:dbname(Db), - Shards = fabric_view:get_shards(Db, Args), + {Shards, RingOpts} = fabric_view:get_shards(Db, Args), {CoordArgs, WorkerArgs} = fabric_view:fix_skip_and_limit(Args), DocIdAndRev = fabric_util:doc_id_and_rev(DDoc), fabric_view:maybe_update_others(DbName, DocIdAndRev, Shards, View, Args), - Repls = fabric_view:get_shard_replacements(DbName, Shards), + Repls = fabric_ring:get_shard_replacements(DbName, Shards), RPCArgs = [DocIdAndRev, View, WorkerArgs, Options], StartFun = fun(Shard) -> hd(fabric_util:submit_jobs([Shard], fabric_rpc, map_view, RPCArgs)) @@ -38,7 +38,8 @@ go(Db, Options, DDoc, View, Args, Callback, Acc, VInfo) -> Workers0 = fabric_util:submit_jobs(Shards, fabric_rpc, map_view, RPCArgs), RexiMon = fabric_util:create_monitors(Workers0), try - case fabric_streams:start(Workers0, #shard.ref, StartFun, Repls) of + case fabric_streams:start(Workers0, #shard.ref, StartFun, Repls, + RingOpts) of {ok, ddoc_updated} -> Callback({error, ddoc_updated}, Acc); {ok, Workers} -> diff --git a/src/fabric/src/fabric_view_reduce.erl b/src/fabric/src/fabric_view_reduce.erl index b157c37e5..a432b2cd5 100644 --- a/src/fabric/src/fabric_view_reduce.erl +++ b/src/fabric/src/fabric_view_reduce.erl @@ -25,19 +25,20 @@ go(DbName, GroupId, View, Args, Callback, Acc0, VInfo) when 
is_binary(GroupId) - go(Db, DDoc, VName, Args, Callback, Acc, VInfo) -> DbName = fabric:dbname(Db), - Shards = fabric_view:get_shards(Db, Args), + {Shards, RingOpts} = fabric_view:get_shards(Db, Args), {CoordArgs, WorkerArgs} = fabric_view:fix_skip_and_limit(Args), DocIdAndRev = fabric_util:doc_id_and_rev(DDoc), RPCArgs = [DocIdAndRev, VName, WorkerArgs], fabric_view:maybe_update_others(DbName, DocIdAndRev, Shards, VName, Args), - Repls = fabric_view:get_shard_replacements(DbName, Shards), + Repls = fabric_ring:get_shard_replacements(DbName, Shards), StartFun = fun(Shard) -> hd(fabric_util:submit_jobs([Shard], fabric_rpc, reduce_view, RPCArgs)) end, Workers0 = fabric_util:submit_jobs(Shards,fabric_rpc,reduce_view,RPCArgs), RexiMon = fabric_util:create_monitors(Workers0), try - case fabric_streams:start(Workers0, #shard.ref, StartFun, Repls) of + case fabric_streams:start(Workers0, #shard.ref, StartFun, Repls, + RingOpts) of {ok, ddoc_updated} -> Callback({error, ddoc_updated}, Acc); {ok, Workers} -> diff --git a/src/mem3/include/mem3.hrl b/src/mem3/include/mem3.hrl index 9be8a7f99..d97b25469 100644 --- a/src/mem3/include/mem3.hrl +++ b/src/mem3/include/mem3.hrl @@ -10,6 +10,11 @@ % License for the specific language governing permissions and limitations under % the License. + +% The last element in the ring +-define(RING_END, 2 bsl 31 - 1). + + % type specification hacked to suppress dialyzer warning re: match spec -record(shard, { name :: binary() | '_' | 'undefined', diff --git a/src/mem3/src/mem3.erl b/src/mem3/src/mem3.erl index dea0c7a5b..dc666fdae 100644 --- a/src/mem3/src/mem3.erl +++ b/src/mem3/src/mem3.erl @@ -150,7 +150,8 @@ ushards(DbName, Shards0, ZoneMap) -> % but sort each zone separately to ensure a consistent choice between % nodes in the same zone. Shards = choose_ushards(DbName, L ++ S) ++ choose_ushards(DbName, D), - lists:ukeysort(#shard.range, Shards). 
+ OverlappedShards = lists:ukeysort(#shard.range, Shards), + mem3_util:non_overlapping_shards(OverlappedShards). get_shard(DbName, Node, Range) -> mem3_shards:get(DbName, Node, Range). diff --git a/src/mem3/src/mem3_httpd.erl b/src/mem3/src/mem3_httpd.erl index c922141b1..3df7e1876 100644 --- a/src/mem3/src/mem3_httpd.erl +++ b/src/mem3/src/mem3_httpd.erl @@ -77,7 +77,7 @@ json_shards([#shard{node=Node, range=[B,E]} | Rest], AccIn) -> json_shards(Rest, dict:append(Range, Node, AccIn)). sync_shard(ShardName) -> - Shards = mem3_shards:for_shard_name(ShardName), + Shards = mem3_shards:for_shard_range(ShardName), [rpc:call(S1#shard.node, mem3_sync, push, [S1, S2#shard.node]) || S1 <- Shards, S2 <- Shards, S1 =/= S2], ok. diff --git a/src/mem3/src/mem3_shards.erl b/src/mem3/src/mem3_shards.erl index 6afc22f57..dfa40c338 100644 --- a/src/mem3/src/mem3_shards.erl +++ b/src/mem3/src/mem3_shards.erl @@ -21,7 +21,7 @@ -export([start_link/0]). -export([for_db/1, for_db/2, for_docid/2, for_docid/3, get/3, local/1, fold/2]). --export([for_shard_name/1]). +-export([for_shard_range/1]). -export([set_max_size/1]). -export([get_changes_pid/0]). @@ -95,36 +95,34 @@ for_docid(DbName, DocId, Options) -> false -> mem3_util:downcast(Shards) end. -for_shard_name(ShardName) -> - for_shard_name(ShardName, []). 
- -for_shard_name(ShardName, Options) -> +for_shard_range(ShardName) -> DbName = mem3:dbname(ShardName), + [B, E] = mem3:range(ShardName), ShardHead = #shard{ - name = ShardName, dbname = DbName, + range = ['$1', '$2'], _ = '_' }, OrderedShardHead = #ordered_shard{ - name = ShardName, dbname = DbName, + range = ['$1', '$2'], _ = '_' }, - ShardSpec = {ShardHead, [], ['$_']}, - OrderedShardSpec = {OrderedShardHead, [], ['$_']}, + % see mem3_util:range_overlap/2 for an explanation how it works + Conditions = [{'=<', '$1', E}, {'=<', B, '$2'}], + ShardSpec = {ShardHead, Conditions, ['$_']}, + OrderedShardSpec = {OrderedShardHead, Conditions, ['$_']}, Shards = try ets:select(?SHARDS, [ShardSpec, OrderedShardSpec]) of [] -> - filter_shards_by_name(ShardName, load_shards_from_disk(DbName)); + filter_shards_by_range([B, E], load_shards_from_disk(DbName)); Else -> gen_server:cast(?MODULE, {cache_hit, DbName}), Else catch error:badarg -> - filter_shards_by_name(ShardName, load_shards_from_disk(DbName)) + filter_shards_by_range([B, E], load_shards_from_disk(DbName)) end, - case lists:member(ordered, Options) of - true -> Shards; - false -> mem3_util:downcast(Shards) - end. + mem3_util:downcast(Shards). + get(DbName, Node, Range) -> Res = lists:foldl(fun(#shard{node=N, range=R}=S, Acc) -> @@ -509,17 +507,12 @@ flush_write(DbName, Writer, WriteTimeout) -> erlang:exit({mem3_shards_write_timeout, DbName}) end. -filter_shards_by_name(Name, Shards) -> - filter_shards_by_name(Name, [], Shards). - -filter_shards_by_name(_, Matches, []) -> - Matches; -filter_shards_by_name(Name, Matches, [#ordered_shard{name=Name}=S|Ss]) -> - filter_shards_by_name(Name, [S|Matches], Ss); -filter_shards_by_name(Name, Matches, [#shard{name=Name}=S|Ss]) -> - filter_shards_by_name(Name, [S|Matches], Ss); -filter_shards_by_name(Name, Matches, [_|Ss]) -> - filter_shards_by_name(Name, Matches, Ss). 
+ +filter_shards_by_range(Range, Shards)-> + lists:filter(fun + (#ordered_shard{range = R}) -> mem3_util:range_overlap(Range, R); + (#shard{range = R}) -> mem3_util:range_overlap(Range, R) + end, Shards). -ifdef(TEST). diff --git a/src/mem3/src/mem3_sync_event_listener.erl b/src/mem3/src/mem3_sync_event_listener.erl index 56ffe3d07..e3368e23f 100644 --- a/src/mem3/src/mem3_sync_event_listener.erl +++ b/src/mem3/src/mem3_sync_event_listener.erl @@ -192,7 +192,7 @@ maybe_push_shards(St) -> end. push_shard(ShardName) -> - try mem3_shards:for_shard_name(ShardName) of + try mem3_shards:for_shard_range(ShardName) of Shards -> Live = nodes(), lists:foreach( diff --git a/src/mem3/src/mem3_util.erl b/src/mem3/src/mem3_util.erl index b44ca2332..e8cba5d7b 100644 --- a/src/mem3/src/mem3_util.erl +++ b/src/mem3/src/mem3_util.erl @@ -17,7 +17,18 @@ shard_info/1, ensure_exists/1, open_db_doc/1]). -export([is_deleted/1, rotate_list/2]). -export([ - iso8601_timestamp/0 + iso8601_timestamp/0, + live_nodes/0, + replicate_dbs_to_all_nodes/1, + replicate_dbs_from_all_nodes/1, + range_overlap/2, + get_ring/1, + get_ring/2, + get_ring/3, + get_ring/4, + non_overlapping_shards/1, + non_overlapping_shards/3, + calculate_max_n/1 ]). %% do not use outside mem3. @@ -286,3 +297,272 @@ iso8601_timestamp() -> {{Year,Month,Date},{Hour,Minute,Second}} = calendar:now_to_datetime(Now), Format = "~4.10.0B-~2.10.0B-~2.10.0BT~2.10.0B:~2.10.0B:~2.10.0B.~6.10.0BZ", io_lib:format(Format, [Year, Month, Date, Hour, Minute, Second, Micro]). + + +live_nodes() -> + LiveNodes = [node() | nodes()], + Mem3Nodes = lists:sort(mem3:nodes()), + [N || N <- Mem3Nodes, lists:member(N, LiveNodes)]. + + +% Replicate "dbs" db to all nodes. Basically push the changes to all the live +% mem3:nodes(). Returns only after all current changes have been replicated, +% which could be a while. 
+% +replicate_dbs_to_all_nodes(Timeout) -> + DbName = ?l2b(config:get("mem3", "shards_db", "_dbs")), + Targets= mem3_util:live_nodes() -- [node()], + Res = [start_replication(node(), T, DbName, Timeout) || T <- Targets], + collect_replication_results(Res, Timeout). + + +% Replicate "dbs" db from all nodes to this node. Basically make an rpc call +% to all the nodes an have them push their changes to this node. Then monitor +% them until they are all done. +% +replicate_dbs_from_all_nodes(Timeout) -> + DbName = ?l2b(config:get("mem3", "shards_db", "_dbs")), + Sources = mem3_util:live_nodes() -- [node()], + Res = [start_replication(S, node(), DbName, Timeout) || S <- Sources], + collect_replication_results(Res, Timeout). + + +% Spawn and monitor a single replication of a database to a target node. +% Returns {ok, PidRef}. This function could be called locally or remotely from +% mem3_rpc, for instance when replicating other nodes' data to this node. +% +start_replication(Source, Target, DbName, Timeout) -> + spawn_monitor(fun() -> + case mem3_rpc:replicate(Source, Target, DbName, Timeout) of + {ok, 0} -> + exit(ok); + Other -> + exit(Other) + end + end). + + +collect_replication_results(Replications, Timeout) -> + Res = [collect_replication_result(R, Timeout) || R <- Replications], + case [R || R <- Res, R =/= ok] of + [] -> + ok; + Errors -> + {error, Errors} + end. + + +collect_replication_result({Pid, Ref}, Timeout) when is_pid(Pid) -> + receive + {'DOWN', Ref, _, _, Res} -> + Res + after Timeout -> + demonitor(Pid, [flush]), + exit(Pid, kill), + {error, {timeout, Timeout, node(Pid)}} + end; + +collect_replication_result(Error, _) -> + {error, Error}. 
+ + +% Consider these cases: +% +% A-------B +% +% overlap: +% X--------Y +% X-Y +% X-------Y +% X-------------------Y +% +% no overlap: +% X-Y because A !=< Y +% X-Y because X !=< B +% +range_overlap([A, B], [X, Y]) when + is_integer(A), is_integer(B), + is_integer(X), is_integer(Y), + A =< B, X =< Y -> + A =< Y andalso X =< B. + + +non_overlapping_shards(Shards) -> + {Start, End} = lists:foldl(fun(Shard, {Min, Max}) -> + [B, E] = mem3:range(Shard), + {min(B, Min), max(E, Max)} + end, {0, ?RING_END}, Shards), + non_overlapping_shards(Shards, Start, End). + + +non_overlapping_shards([], _, _) -> + []; + +non_overlapping_shards(Shards, Start, End) -> + Ranges = lists:map(fun(Shard) -> + [B, E] = mem3:range(Shard), + {B, E} + end, Shards), + Ring = get_ring(Ranges, fun sort_ranges_fun/2, Start, End), + lists:filter(fun(Shard) -> + [B, E] = mem3:range(Shard), + lists:member({B, E}, Ring) + end, Shards). + + +% Given a list of shards, return the maximum number of copies +% across all the ranges. If the ring is incomplete it will return 0. +% If there it is an n = 1 database, it should return 1, etc. +calculate_max_n(Shards) -> + Ranges = lists:map(fun(Shard) -> + [B, E] = mem3:range(Shard), + {B, E} + end, Shards), + calculate_max_n(Ranges, get_ring(Ranges), 0). + + +calculate_max_n(_Ranges, [], N) -> + N; + +calculate_max_n(Ranges, Ring, N) -> + NewRanges = Ranges -- Ring, + calculate_max_n(NewRanges, get_ring(NewRanges), N + 1). + + +get_ring(Ranges) -> + get_ring(Ranges, fun sort_ranges_fun/2, 0, ?RING_END). + + +get_ring(Ranges, SortFun) when is_function(SortFun, 2) -> + get_ring(Ranges, SortFun, 0, ?RING_END). + + +get_ring(Ranges, Start, End) when is_integer(Start), is_integer(End), + Start >= 0, End >= 0, Start =< End -> + get_ring(Ranges, fun sort_ranges_fun/2, Start, End). + +% Build a ring out of a list of possibly overlapping ranges. If a ring cannot +% be built then [] is returned. 
Start and End supply a custom range such that +% only intervals in that range will be considered. SortFun is a custom sorting +% function to sort intervals before the ring is built. The custom sort function +% can be used to prioritize how the ring is built, for example, whether to use +% shortest ranges first (and thus have more total shards) or longer or any +% other scheme. +% +get_ring([], _SortFun, _Start, _End) -> + []; +get_ring(Ranges, SortFun, Start, End) when is_function(SortFun, 2), + is_integer(Start), is_integer(End), + Start >= 0, End >= 0, Start =< End -> + Sorted = lists:usort(SortFun, Ranges), + case get_subring_int(Start, End, Sorted) of + fail -> []; + Ring -> Ring + end. + + +get_subring_int(_, _, []) -> + fail; + +get_subring_int(Start, EndMax, [{Start, End} = Range | Tail]) -> + case End =:= EndMax of + true -> + [Range]; + false -> + case get_subring_int(End + 1, EndMax, Tail) of + fail -> + get_subring_int(Start, EndMax, Tail); + Acc -> + [Range | Acc] + end + end; + +get_subring_int(Start1, _, [{Start2, _} | _]) when Start2 > Start1 -> + % Found a gap, this attempt is done + fail; + +get_subring_int(Start1, EndMax, [{Start2, _} | Rest]) when Start2 < Start1 -> + % We've overlapped the head, skip the shard + get_subring_int(Start1, EndMax, Rest). + + +% Sort ranges by starting point, then sort so that +% the longest range comes first +sort_ranges_fun({B, E1}, {B, E2}) -> + E2 =< E1; + +sort_ranges_fun({B1, _}, {B2, _}) -> + B1 =< B2. + + +-ifdef(TEST). + +-include_lib("eunit/include/eunit.hrl"). + +range_overlap_test_() -> + [?_assertEqual(Res, range_overlap(R1, R2)) || {R1, R2, Res} <- [ + {[2, 6], [1, 3], true}, + {[2, 6], [3, 4], true}, + {[2, 6], [4, 8], true}, + {[2, 6], [1, 9], true}, + {[2, 6], [1, 2], true}, + {[2, 6], [6, 7], true}, + {[2, 6], [0, 1], false}, + {[2, 6], [7, 9], false} + ]]. 
+ + +non_overlapping_shards_test() -> + [?_assertEqual(Res, non_overlapping_shards(Shards)) || {Shards, Res} <- [ + { + [shard(0, ?RING_END)], + [shard(0, ?RING_END)] + }, + { + [shard(0, 1)], + [shard(0, 1)] + }, + { + [shard(0, 1), shard(0, 1)], + [shard(0, 1)] + }, + { + [shard(0, 1), shard(3, 4)], + [] + }, + { + [shard(0, 1), shard(1, 2), shard(2, 3)], + [shard(0, 1), shard(2, 3)] + }, + { + [shard(1, 2), shard(0, 1)], + [shard(0, 1), shard(1, 2)] + }, + { + [shard(0, 1), shard(0, 2), shard(2, 5), shard(3, 5)], + [shard(0, 2), shard(2, 5)] + }, + { + [shard(0, 2), shard(4, 5), shard(1, 3)], + [] + } + + ]]. + + +calculate_max_n_test_() -> + [?_assertEqual(Res, calculate_max_n(Shards)) || {Res, Shards} <- [ + {0, []}, + {0, [shard(1, ?RING_END)]}, + {1, [shard(0, ?RING_END)]}, + {1, [shard(0, ?RING_END), shard(1, ?RING_END)]}, + {2, [shard(0, ?RING_END), shard(0, ?RING_END)]}, + {2, [shard(0, 1), shard(2, ?RING_END), shard(0, ?RING_END)]}, + {0, [shard(0, 3), shard(5, ?RING_END), shard(1, ?RING_END)]} + ]]. + + +shard(Begin, End) -> + #shard{range = [Begin, End]}. + +-endif. diff --git a/src/mem3/test/mem3_ring_prop_tests.erl b/src/mem3/test/mem3_ring_prop_tests.erl new file mode 100644 index 000000000..9f4f86f5f --- /dev/null +++ b/src/mem3/test/mem3_ring_prop_tests.erl @@ -0,0 +1,144 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(mem3_ring_prop_tests). + + +-include_lib("triq/include/triq.hrl"). +-triq(eunit). 
+ + +% Properties + +prop_get_ring_with_connected_intervals() -> + ?FORALL({Start, End}, oneof(ranges()), + ?FORALL(Intervals, g_connected_intervals(Start, End), + mem3_util:get_ring(Intervals, Start, End) =:= lists:sort(Intervals) + ) + ). + + +prop_get_ring_connected_plus_random_intervals() -> + ?FORALL({Intervals, Extra}, {g_connected_intervals(1, 100), + g_random_intervals(1, 100)}, + ?IMPLIES(sets:is_disjoint(endpoints(Intervals), endpoints(Extra)), + begin + AllInts = Intervals ++ Extra, + Ring = mem3_util:get_ring(AllInts, 1, 100), + Ring =:= lists:sort(Intervals) + end + ) + ). + + +prop_get_ring_connected_with_sub_intervals() -> + ?FORALL(Intervals, g_connected_intervals(1, 100), + ?FORALL(SubIntervals, g_subintervals(Intervals), + begin + AllInts = Intervals ++ SubIntervals, + Ring = mem3_util:get_ring(AllInts, 1, 100), + Ring =:= lists:sort(Intervals) + end + ) + ). + + +prop_get_ring_with_disconnected_intervals() -> + ?FORALL({Start, End}, oneof(ranges()), + ?FORALL(Intervals, g_disconnected_intervals(Start, End), + mem3_util:get_ring(Intervals, Start, End) =:= [] + ) + ). + + +% Generators + +ranges() -> + [{1, 10}, {0, 2 bsl 31 - 1}, {2 bsl 31 - 10, 2 bsl 31 - 1}]. + + +g_connected_intervals(Begin, End) -> + ?SIZED(Size, g_connected_intervals(Begin, End, 5 * Size)). + + +g_connected_intervals(Begin, End, Split) when Begin =< End -> + ?LET(N, choose(0, Split), + begin + if + N == 0 -> + [{Begin, End}]; + N > 0 -> + Ns = lists:seq(1, N - 1), + Bs = lists:usort([rand_range(Begin, End) || _ <- Ns]), + Es = [B - 1 || B <- Bs], + shuffle(lists:zip([Begin] ++ Bs, Es ++ [End])) + end + end). + + +g_non_trivial_connected_intervals(Begin, End, Split) -> + ?SUCHTHAT(Connected, g_connected_intervals(Begin, End, Split), + length(Connected) > 1). + + +g_disconnected_intervals(Begin, End) -> + ?SIZED(Size, g_disconnected_intervals(Begin, End, Size)). 
+ + +g_disconnected_intervals(Begin, End, Split) when Begin =< End -> + ?LET(Connected, g_non_trivial_connected_intervals(Begin, End, Split), + begin + I = triq_rnd:uniform(length(Connected)) - 1, + {Before, [_ | After]} = lists:split(I, Connected), + Before ++ After + end). + + +g_subintervals(Intervals) -> + lists:foldl(fun(R, Acc) -> split_interval(R) ++ Acc end, [], Intervals). + + +split_interval({B, E}) when E - B >= 2 -> + E1 = rand_range(B, E) - 1, + B1 = E1 + 1, + [{B, E1}, {B1, E}]; + +split_interval(_Range) -> + []. + + +g_random_intervals(Start, End) -> + ?LET(N, choose(1, 10), + begin + [begin + B = rand_range(Start, End), + E = rand_range(B, End), + {B, E} + end || _ <- lists:seq(1, N)] + end). + + +rand_range(B, B) -> + B; + +rand_range(B, E) -> + B + triq_rnd:uniform(E - B). + + +shuffle(L) -> + Tagged = [{triq_rnd:uniform(), X} || X <- L], + [X || {_, X} <- lists:sort(Tagged)]. + + +endpoints(Ranges) -> + {Begins, Ends} = lists:unzip(Ranges), + sets:from_list(Begins ++ Ends). diff --git a/src/rexi/src/rexi.erl b/src/rexi/src/rexi.erl index f774dc9d4..21e2b5388 100644 --- a/src/rexi/src/rexi.erl +++ b/src/rexi/src/rexi.erl @@ -12,7 +12,7 @@ -module(rexi). -export([start/0, stop/0, restart/0]). --export([cast/2, cast/3, cast/4, kill/2]). +-export([cast/2, cast/3, cast/4, kill/2, kill_all/1]). -export([reply/1, sync_reply/1, sync_reply/2]). -export([async_server_call/2, async_server_call/3]). -export([stream_init/0, stream_init/1]). @@ -76,6 +76,18 @@ kill(Node, Ref) -> rexi_utils:send(rexi_utils:server_pid(Node), cast_msg({kill, Ref})), ok. +%% @doc Sends an async kill signal to the remote processes associated with Refs. +%% No rexi_EXIT message will be sent. +-spec kill_all([{node(), reference()}]) -> ok. 
+kill_all(NodeRefs) when is_list(NodeRefs) -> + PerNodeMap = lists:foldl(fun({Node, Ref}, Acc) -> + maps:update_with(Node, fun(Refs) -> [Ref | Refs] end, [Ref], Acc) + end, #{}, NodeRefs), + maps:map(fun(Node, Refs) -> + rexi_utils:send(rexi_utils:server_pid(Node), cast_msg({kill_all, Refs})) + end, PerNodeMap), + ok. + %% @equiv async_server_call(Server, self(), Request) -spec async_server_call(pid() | {atom(),node()}, any()) -> reference(). async_server_call(Server, Request) -> diff --git a/src/rexi/src/rexi_server.erl b/src/rexi/src/rexi_server.erl index 58a510b68..fedff69c3 100644 --- a/src/rexi/src/rexi_server.erl +++ b/src/rexi/src/rexi_server.erl @@ -79,15 +79,13 @@ handle_cast({doit, {ClientPid, ClientRef} = From, Nonce, MFA}, State) -> {noreply, add_job(Job, State)}; -handle_cast({kill, FromRef}, #st{clients = Clients} = St) -> - case find_worker(FromRef, Clients) of - #job{worker = KeyRef, worker_pid = Pid} = Job -> - erlang:demonitor(KeyRef), - exit(Pid, kill), - {noreply, remove_job(Job, St)}; - false -> - {noreply, St} - end; +handle_cast({kill, FromRef}, St) -> + kill_worker(FromRef, St), + {noreply, St}; + +handle_cast({kill_all, FromRefs}, St) -> + lists:foreach(fun(FromRef) -> kill_worker(FromRef, St) end, FromRefs), + {noreply, St}; handle_cast(_, St) -> couch_log:notice("rexi_server ignored_cast", []), @@ -181,3 +179,15 @@ find_worker(Ref, Tab) -> notify_caller({Caller, Ref}, Reason) -> rexi_utils:send(Caller, {Ref, {rexi_EXIT, Reason}}). + + +kill_worker(FromRef, #st{clients = Clients} = St) -> + case find_worker(FromRef, Clients) of + #job{worker = KeyRef, worker_pid = Pid} = Job -> + erlang:demonitor(KeyRef), + exit(Pid, kill), + remove_job(Job, St), + ok; + false -> + ok + end. |