diff options
Diffstat (limited to 'src/fabric/src/fabric_view_changes.erl')
-rw-r--r-- | src/fabric/src/fabric_view_changes.erl | 820 |
1 files changed, 0 insertions, 820 deletions
diff --git a/src/fabric/src/fabric_view_changes.erl b/src/fabric/src/fabric_view_changes.erl deleted file mode 100644 index febbd3169..000000000 --- a/src/fabric/src/fabric_view_changes.erl +++ /dev/null @@ -1,820 +0,0 @@ -% Licensed under the Apache License, Version 2.0 (the "License"); you may not -% use this file except in compliance with the License. You may obtain a copy of -% the License at -% -% http://www.apache.org/licenses/LICENSE-2.0 -% -% Unless required by applicable law or agreed to in writing, software -% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -% License for the specific language governing permissions and limitations under -% the License. - --module(fabric_view_changes). - --export([go/5, pack_seqs/1, unpack_seqs/2]). --export([increment_changes_epoch/0]). - -%% exported for upgrade purposes. --export([keep_sending_changes/8]). - --include_lib("fabric/include/fabric.hrl"). --include_lib("mem3/include/mem3.hrl"). --include_lib("couch/include/couch_db.hrl"). --include_lib("eunit/include/eunit.hrl"). - --import(fabric_db_update_listener, [wait_db_updated/1]). - -go(DbName, Feed, Options, Callback, Acc0) when Feed == "continuous" orelse - Feed == "longpoll" orelse Feed == "eventsource" -> - Args = make_changes_args(Options), - Since = get_start_seq(DbName, Args), - case validate_start_seq(DbName, Since) of - ok -> - {ok, Acc} = Callback(start, Acc0), - {Timeout, _} = couch_changes:get_changes_timeout(Args, Callback), - Ref = make_ref(), - Parent = self(), - UpdateListener = {spawn_link(fabric_db_update_listener, go, - [Parent, Ref, DbName, Timeout]), - Ref}, - put(changes_epoch, get_changes_epoch()), - try - keep_sending_changes( - DbName, - Args, - Callback, - Since, - Acc, - Timeout, - UpdateListener, - os:timestamp() - ) - after - fabric_db_update_listener:stop(UpdateListener) - end; - Error -> - Callback(Error, Acc0) - end; - -go(DbName, "normal", Options, Callback, Acc0) -> - Args = make_changes_args(Options), - Since = get_start_seq(DbName, Args), - case validate_start_seq(DbName, Since) of - ok -> - {ok, Acc} = Callback(start, Acc0), - {ok, Collector} = send_changes( - DbName, - Args, - Callback, - Since, - Acc, - 5000 - ), - #collector{counters=Seqs, user_acc=AccOut, offset=Offset} = Collector, - Callback({stop, pack_seqs(Seqs), pending_count(Offset)}, AccOut); - Error -> - Callback(Error, Acc0) - end. - -keep_sending_changes(DbName, Args, Callback, Seqs, AccIn, Timeout, UpListen, T0) -> - #changes_args{limit=Limit, feed=Feed, heartbeat=Heartbeat} = Args, - {ok, Collector} = send_changes(DbName, Args, Callback, Seqs, AccIn, Timeout), - #collector{ - limit = Limit2, - counters = NewSeqs, - offset = Offset, - user_acc = AccOut0 - } = Collector, - LastSeq = pack_seqs(NewSeqs), - MaintenanceMode = config:get("couchdb", "maintenance_mode"), - NewEpoch = get_changes_epoch() > erlang:get(changes_epoch), - if Limit > Limit2, Feed == "longpoll"; - MaintenanceMode == "true"; MaintenanceMode == "nolb"; NewEpoch -> - Callback({stop, LastSeq, pending_count(Offset)}, AccOut0); - true -> - {ok, AccOut} = Callback(waiting_for_updates, AccOut0), - WaitForUpdate = wait_db_updated(UpListen), - AccumulatedTime = timer:now_diff(os:timestamp(), T0) div 1000, - Max = case config:get("fabric", "changes_duration") of - undefined -> - infinity; - MaxStr -> - list_to_integer(MaxStr) - end, - case {Heartbeat, AccumulatedTime > Max, WaitForUpdate} of - {_, _, changes_feed_died} -> - Callback({stop, LastSeq, pending_count(Offset)}, AccOut); - {undefined, _, timeout} -> - Callback({stop, LastSeq, pending_count(Offset)}, AccOut); - {_, true, timeout} -> - Callback({stop, LastSeq, pending_count(Offset)}, AccOut); - _ -> - {ok, AccTimeout} = Callback(timeout, AccOut), - ?MODULE:keep_sending_changes( - DbName, - Args#changes_args{limit=Limit2}, - Callback, - LastSeq, - AccTimeout, - Timeout, - UpListen, - T0 - ) - end - end. - -send_changes(DbName, ChangesArgs, Callback, PackedSeqs, AccIn, Timeout) -> - LiveNodes = [node() | nodes()], - AllLiveShards = mem3:live_shards(DbName, LiveNodes), - Seqs0 = unpack_seqs(PackedSeqs, DbName), - {WSeqs0, Dead, Reps} = find_replacements(Seqs0, AllLiveShards), - % Start workers which didn't need replacements - WSeqs = lists:map(fun({#shard{name = Name, node = N} = S, Seq}) -> - Ref = rexi:cast(N, {fabric_rpc, changes, [Name, ChangesArgs, Seq]}), - {S#shard{ref = Ref}, Seq} - end, WSeqs0), - % For some dead workers see if they are a result of split shards. In that - % case make a replacement argument so that local rexi workers can calculate - % (hopefully) a > 0 update sequence. - {WSplitSeqs0, Reps1} = find_split_shard_replacements(Dead, Reps), - WSplitSeqs = lists:map(fun({#shard{name = Name, node = N} = S, Seq}) -> - Arg = make_replacement_arg(N, Seq), - Ref = rexi:cast(N, {fabric_rpc, changes, [Name, ChangesArgs, Arg]}), - {S#shard{ref = Ref}, Seq} - end, WSplitSeqs0), - % For ranges that were not split start sequences from 0 - WReps = lists:map(fun(#shard{name = Name, node = N} = S) -> - Ref = rexi:cast(N, {fabric_rpc, changes, [Name, ChangesArgs, 0]}), - {S#shard{ref = Ref}, 0} - end, Reps1), - Seqs = WSeqs ++ WSplitSeqs ++ WReps, - {Workers0, _} = lists:unzip(Seqs), - Repls = fabric_ring:get_shard_replacements(DbName, Workers0), - StartFun = fun(#shard{name=Name, node=N, range=R0}=Shard) -> - %% Find the original shard copy in the Seqs array - case lists:dropwhile(fun({S, _}) -> S#shard.range =/= R0 end, Seqs) of - [{#shard{}, {replace, _, _, _}} | _] -> - % Don't attempt to replace a replacement - SeqArg = 0; - [{#shard{node = OldNode}, OldSeq} | _] -> - SeqArg = make_replacement_arg(OldNode, OldSeq); - _ -> - % TODO this clause is probably unreachable in the N>2 - % case because we compute replacements only if a shard has one - % in the original set. - couch_log:error("Streaming ~s from zero while replacing ~p", - [Name, PackedSeqs]), - SeqArg = 0 - end, - Ref = rexi:cast(N, {fabric_rpc, changes, [Name, ChangesArgs, SeqArg]}), - Shard#shard{ref = Ref} - end, - RexiMon = fabric_util:create_monitors(Workers0), - try - case fabric_streams:start(Workers0, #shard.ref, StartFun, Repls) of - {ok, Workers} -> - try - LiveSeqs = lists:map(fun(W) -> - case lists:keyfind(W, 1, Seqs) of - {W, Seq} -> {W, Seq}; - _ -> {W, 0} - end - end, Workers), - send_changes(DbName, Workers, LiveSeqs, ChangesArgs, - Callback, AccIn, Timeout) - after - fabric_streams:cleanup(Workers) - end; - {timeout, NewState} -> - DefunctWorkers = fabric_util:remove_done_workers( - NewState#stream_acc.workers, - waiting - ), - fabric_util:log_timeout( - DefunctWorkers, - "changes" - ), - throw({error, timeout}); - {error, Reason} -> - throw({error, Reason}); - Else -> - throw({error, Else}) - end - after - rexi_monitor:stop(RexiMon) - end. - -send_changes(DbName, Workers, Seqs, ChangesArgs, Callback, AccIn, Timeout) -> - State = #collector{ - db_name = DbName, - query_args = ChangesArgs, - callback = Callback, - counters = orddict:from_list(Seqs), - user_acc = AccIn, - limit = ChangesArgs#changes_args.limit, - offset = fabric_dict:init(Workers, null), - rows = Seqs % store sequence positions instead - }, - %% TODO: errors need to be handled here - receive_results(Workers, State, Timeout, Callback). - -receive_results(Workers, State, Timeout, Callback) -> - case rexi_utils:recv(Workers, #shard.ref, fun handle_message/3, State, - Timeout, infinity) of - {timeout, NewState0} -> - {ok, AccOut} = Callback(timeout, NewState0#collector.user_acc), - NewState = NewState0#collector{user_acc = AccOut}, - receive_results(Workers, NewState, Timeout, Callback); - {_, NewState} -> - {ok, NewState} - end. - -handle_message({rexi_DOWN, _, {_, NodeRef}, _}, _, State) -> - fabric_view:check_down_shards(State, NodeRef); - -handle_message({rexi_EXIT, Reason}, Worker, State) -> - fabric_view:handle_worker_exit(State, Worker, Reason); - -% Temporary upgrade clause - Case 24236 -handle_message({complete, Key}, Worker, State) when is_tuple(Key) -> - handle_message({complete, [{seq, Key}, {pending, 0}]}, Worker, State); - -handle_message({change, Props}, {Worker, _}, #collector{limit=0} = State) -> - O0 = State#collector.offset, - O1 = case fabric_dict:lookup_element(Worker, O0) of - null -> - % Use Pending+1 because we're ignoring this row in the response - Pending = couch_util:get_value(pending, Props, 0), - fabric_dict:store(Worker, Pending+1, O0); - _ -> - O0 - end, - maybe_stop(State#collector{offset = O1}); - -handle_message({complete, Props}, Worker, #collector{limit=0} = State) -> - O0 = State#collector.offset, - O1 = case fabric_dict:lookup_element(Worker, O0) of - null -> - fabric_dict:store(Worker, couch_util:get_value(pending,Props), O0); - _ -> - O0 - end, - maybe_stop(State#collector{offset = O1}); - -handle_message({no_pass, Props}, {Worker, From}, #collector{limit=0} = State) - when is_list(Props) -> - #collector{counters = S0, offset = O0} = State, - O1 = case fabric_dict:lookup_element(Worker, O0) of - null -> - fabric_dict:store(Worker, couch_util:get_value(pending, Props), O0); - _ -> - O0 - end, - S1 = fabric_dict:store(Worker, couch_util:get_value(seq, Props), S0), - rexi:stream_ack(From), - maybe_stop(State#collector{counters = S1, offset = O1}); - -handle_message(#change{} = Row, {Worker, From}, St) -> - Change = {change, [ - {seq, Row#change.key}, - {id, Row#change.id}, - {changes, Row#change.value}, - {deleted, Row#change.deleted}, - {doc, Row#change.doc} - ]}, - handle_message(Change, {Worker, From}, St); - -handle_message({change, Props}, {Worker, From}, St) -> - #collector{ - callback = Callback, - counters = S0, - offset = O0, - limit = Limit, - user_acc = AccIn - } = St, - true = fabric_dict:is_key(Worker, S0), - S1 = fabric_dict:store(Worker, couch_util:get_value(seq, Props), S0), - O1 = fabric_dict:store(Worker, couch_util:get_value(pending, Props), O0), - % Temporary hack for FB 23637 - Interval = erlang:get(changes_seq_interval), - if (Interval == undefined) orelse (Limit rem Interval == 0) -> - Props2 = lists:keyreplace(seq, 1, Props, {seq, pack_seqs(S1)}); - true -> - Props2 = lists:keyreplace(seq, 1, Props, {seq, null}) - end, - {Go, Acc} = Callback(changes_row(Props2), AccIn), - rexi:stream_ack(From), - {Go, St#collector{counters=S1, offset=O1, limit=Limit-1, user_acc=Acc}}; - -%% upgrade clause -handle_message({no_pass, Seq}, From, St) when is_integer(Seq) -> - handle_message({no_pass, [{seq, Seq}]}, From, St); - -handle_message({no_pass, Props}, {Worker, From}, St) -> - Seq = couch_util:get_value(seq, Props), - #collector{counters = S0} = St, - true = fabric_dict:is_key(Worker, S0), - S1 = fabric_dict:store(Worker, Seq, S0), - rexi:stream_ack(From), - {ok, St#collector{counters=S1}}; - -handle_message({complete, Props}, Worker, State) -> - Key = couch_util:get_value(seq, Props), - #collector{ - counters = S0, - offset = O0, - total_rows = Completed % override - } = State, - true = fabric_dict:is_key(Worker, S0), - S1 = fabric_dict:store(Worker, Key, S0), - O1 = fabric_dict:store(Worker, couch_util:get_value(pending, Props), O0), - NewState = State#collector{counters=S1, offset=O1, total_rows=Completed+1}, - % We're relying on S1 having exactly the numnber of workers that - % are participtaing in this response. With the new stream_start - % that's a bit more obvious but historically it wasn't quite - % so clear. The Completed variable is just a hacky override - % of the total_rows field in the #collector{} record. - NumWorkers = fabric_dict:size(S1), - Go = case NumWorkers =:= (Completed+1) of - true -> stop; - false -> ok - end, - {Go, NewState}. - - -make_replacement_arg(Node, {Seq, Uuid}) -> - {replace, Node, Uuid, Seq}; -make_replacement_arg(_Node, {Seq, Uuid, EpochNode}) -> - % The replacement should properly be computed aginst the node that owned - % the sequence when it was written to disk (the EpochNode) rather than the - % node we're trying to replace. - {replace, EpochNode, Uuid, Seq}; -make_replacement_arg(_, _) -> - 0. - -maybe_stop(#collector{offset = Offset} = State) -> - case fabric_dict:any(null, Offset) of - false -> - {stop, State}; - true -> - % Wait till we've heard from everyone to compute pending count - {ok, State} - end. - -make_changes_args(#changes_args{style=Style, filter_fun=undefined}=Args) -> - Args#changes_args{filter_fun = {default, Style}}; -make_changes_args(Args) -> - Args. - -get_start_seq(DbName, #changes_args{dir=Dir, since=Since}) - when Dir == rev; Since == "now" -> - {ok, Info} = fabric:get_db_info(DbName), - couch_util:get_value(update_seq, Info); -get_start_seq(_DbName, #changes_args{dir=fwd, since=Since}) -> - Since. - -pending_count(Dict) -> - fabric_dict:fold(fun - (_Worker, Count, Acc) when is_integer(Count), is_integer(Acc) -> - Count + Acc; - (_Worker, _Count, _Acc) -> - null - end, 0, Dict). - -pack_seqs(Workers) -> - SeqList = [{N,R,S} || {#shard{node=N, range=R}, S} <- Workers], - SeqSum = lists:sum([seq(S) || {_,_,S} <- SeqList]), - Opaque = couch_util:encodeBase64Url(term_to_binary(SeqList, [compressed])), - ?l2b([integer_to_list(SeqSum), $-, Opaque]). - -seq({Seq, _Uuid, _Node}) -> Seq; -seq({Seq, _Uuid}) -> Seq; -seq(Seq) -> Seq. - - -unpack_seq_regex_match(Packed) -> - NewPattern = "^\\[[0-9]+\s*,\s*\"(?<opaque>.*)\"\\]$", - OldPattern = "^\"?([0-9]+-)?(?<opaque>.*?)\"?$", - Options = [{capture, [opaque], binary}], - case re:run(Packed, NewPattern, Options) of - {match, Match} -> - Match; - nomatch -> - {match, Match} = re:run(Packed, OldPattern, Options), - Match - end. - - -unpack_seq_decode_term(Opaque) -> - binary_to_term(couch_util:decodeBase64Url(Opaque)). - - -unpack_seqs(0, DbName) -> - fabric_dict:init(mem3:shards(DbName), 0); - -unpack_seqs("0", DbName) -> - fabric_dict:init(mem3:shards(DbName), 0); - -unpack_seqs([_SeqNum, Opaque], DbName) -> % deprecated - do_unpack_seqs(Opaque, DbName); - -unpack_seqs(Packed, DbName) -> - Opaque = unpack_seq_regex_match(Packed), - do_unpack_seqs(Opaque, DbName). - -do_unpack_seqs(Opaque, DbName) -> - % A preventative fix for FB 13533 to remove duplicate shards. - % This just picks each unique shard and keeps the largest seq - % value recorded. - Decoded = unpack_seq_decode_term(Opaque), - DedupDict = lists:foldl(fun({Node, [A, B], Seq}, Acc) -> - dict:append({Node, [A, B]}, Seq, Acc) - end, dict:new(), Decoded), - Deduped = lists:map(fun({{Node, [A, B]}, SeqList}) -> - {Node, [A, B], lists:max(SeqList)} - end, dict:to_list(DedupDict)), - - % Create a fabric_dict of {Shard, Seq} entries - % TODO relies on internal structure of fabric_dict as keylist - Unpacked = lists:flatmap(fun({Node, [A,B], Seq}) -> - case mem3:get_shard(DbName, Node, [A,B]) of - {ok, Shard} -> - [{Shard, Seq}]; - {error, not_found} -> - [] - end - end, Deduped), - - % This just handles the case if the ring in the unpacked sequence - % received is not complete and in that case tries to fill in the - % missing ranges with shards from the shard map - case fabric_ring:is_progress_possible(Unpacked) of - true -> - Unpacked; - false -> - PotentialWorkers = lists:map(fun({Node, [A, B], Seq}) -> - case mem3:get_shard(DbName, Node, [A, B]) of - {ok, Shard} -> - {Shard, Seq}; - {error, not_found} -> - {#shard{node = Node, range = [A, B]}, Seq} - end - end, Deduped), - Shards = mem3:shards(DbName), - {Unpacked1, Dead, Reps} = find_replacements(PotentialWorkers, Shards), - {Splits, Reps1} = find_split_shard_replacements(Dead, Reps), - RepSeqs = lists:map(fun(#shard{} = S) -> - {S, get_old_seq(S, Deduped)} - end, Reps1), - Unpacked1 ++ Splits ++ RepSeqs - end. - - -get_old_seq(#shard{range=R}=Shard, SinceSeqs) -> - case lists:keyfind(R, 2, SinceSeqs) of - {Node, R, Seq} when is_number(Seq) -> - % Unfortunately we don't have access to the db - % uuid so we can't set a replacememnt here. - couch_log:warning("~p get_old_seq missing uuid " - "node: ~p, range: ~p, seq: ~p", [?MODULE, Node, R, Seq]), - 0; - {Node, R, {Seq, Uuid}} -> - % This update seq is using the old format that - % didn't include the node. This information is - % important for replacement. - {Seq, Uuid, Node}; - {_Node, R, {Seq, Uuid, EpochNode}} -> - % This is the newest sequence format that we - % can use for replacement. - {Seq, Uuid, EpochNode}; - Error -> - couch_log:warning("~p get_old_seq error: ~p, shard: ~p, seqs: ~p", - [?MODULE, Error, Shard, SinceSeqs]), - 0 - end. - - -changes_row(Props0) -> - Props1 = case couch_util:get_value(deleted, Props0) of - true -> - Props0; - _ -> - lists:keydelete(deleted, 1, Props0) - end, - Allowed = [seq, id, changes, deleted, doc, error], - Props2 = lists:filter(fun({K,_V}) -> lists:member(K, Allowed) end, Props1), - {change, {Props2}}. - - -find_replacements(Workers, AllShards) -> - % Build map [B, E] => [Worker1, Worker2, ...] for all workers - WrkMap = lists:foldl(fun({#shard{range = [B, E]}, _} = W, Acc) -> - maps:update_with({B, E}, fun(Ws) -> [W | Ws] end, [W], Acc) - end, #{}, fabric_dict:to_list(Workers)), - - % Build map [B, E] => [Shard1, Shard2, ...] for all shards - AllMap = lists:foldl(fun(#shard{range = [B, E]} = S, Acc) -> - maps:update_with({B, E}, fun(Ss) -> [S | Ss] end, [S], Acc) - end, #{}, AllShards), - - % Custom sort function will prioritize workers over other shards. - % The idea is to not unnecessarily kill workers if we don't have to - SortFun = fun - (R1 = {B, E1}, R2 = {B, E2}) -> - case {maps:is_key(R1, WrkMap), maps:is_key(R2, WrkMap)} of - {true, true} -> - % Both are workers, larger interval wins - E1 >= E2; - {true, false} -> - % First element is a worker range, it wins - true; - {false, true} -> - % Second element is a worker range, it wins - false; - {false, false} -> - % Neither one is a worker interval, pick larger one - E1 >= E2 - end; - ({B1, _}, {B2, _}) -> - B1 =< B2 - end, - Ring = mem3_util:get_ring(maps:keys(AllMap), SortFun), - - % Keep only workers in the ring and from one of the available nodes - Keep = fun(#shard{range = [B, E], node = N}) -> - lists:member({B, E}, Ring) andalso lists:keyfind(N, #shard.node, - maps:get({B, E}, AllMap)) =/= false - end, - Workers1 = fabric_dict:filter(fun(S, _) -> Keep(S) end, Workers), - Removed = fabric_dict:filter(fun(S, _) -> not Keep(S) end, Workers), - - {Rep, _} = lists:foldl(fun(R, {RepAcc, AllMapAcc}) -> - case maps:is_key(R, WrkMap)of - true -> - % It's a worker and in the map of available shards. Make sure - % to keep it only if there is a range available on that node - % only (reuse Keep/1 predicate from above) - WorkersInRange = maps:get(R, WrkMap), - case lists:any(fun({S, _}) -> Keep(S) end, WorkersInRange) of - true -> - {RepAcc, AllMapAcc}; - false -> - [Shard | Rest] = maps:get(R, AllMapAcc), - {[Shard | RepAcc], AllMapAcc#{R := Rest}} - end; - false -> - % No worker for this range. Replace from available shards - [Shard | Rest] = maps:get(R, AllMapAcc), - {[Shard | RepAcc], AllMapAcc#{R := Rest}} - end - end, {[], AllMap}, Ring), - - % Return the list of workers that are part of ring, list of removed workers - % and a list of replacement shards that could be used to make sure the ring - % completes. - {Workers1, Removed, Rep}. - - -% From the list of dead workers determine if any are a result of a split shard. -% In that case perhaps there is a way to not rewind the changes feed back to 0. -% Returns {NewWorkers, Available} where NewWorkers is the list of -% viable workers Available is the list of still unused input Shards -find_split_shard_replacements(DeadWorkers, Shards) -> - Acc0 = {[], Shards}, - AccF = fabric_dict:fold(fun(#shard{node = WN, range = R}, Seq, Acc) -> - [B, E] = R, - {SplitWorkers, Available} = Acc, - ShardsOnSameNode = [S || #shard{node = N} = S <- Available, N =:= WN], - SplitShards = mem3_util:non_overlapping_shards(ShardsOnSameNode, B, E), - RepCount = length(SplitShards), - NewWorkers = [{S, make_split_seq(Seq, RepCount)} || S <- SplitShards], - NewAvailable = [S || S <- Available, not lists:member(S, SplitShards)], - {NewWorkers ++ SplitWorkers, NewAvailable} - end, Acc0, DeadWorkers), - {Workers, Available} = AccF, - {fabric_dict:from_list(Workers), Available}. - - -make_split_seq({Num, Uuid, Node}, RepCount) when RepCount > 1 -> - {Num, {split, Uuid}, Node}; -make_split_seq(Seq, _) -> - Seq. - - -validate_start_seq(_DbName, "now") -> - ok; -validate_start_seq(_DbName, 0) -> - ok; -validate_start_seq(_DbName, "0") -> - ok; -validate_start_seq(_DbName, Seq) -> - try - case Seq of - [_SeqNum, Opaque] -> - unpack_seq_decode_term(Opaque); - Seq -> - Opaque = unpack_seq_regex_match(Seq), - unpack_seq_decode_term(Opaque) - end, - ok - catch - _:_ -> - Reason = <<"Malformed sequence supplied in 'since' parameter.">>, - {error, {bad_request, Reason}} - end. - -get_changes_epoch() -> - case application:get_env(fabric, changes_epoch) of - undefined -> - increment_changes_epoch(), - get_changes_epoch(); - {ok, Epoch} -> - Epoch - end. - -increment_changes_epoch() -> - application:set_env(fabric, changes_epoch, os:timestamp()). - - -unpack_seq_setup() -> - meck:new(mem3), - meck:new(fabric_view), - meck:expect(mem3, get_shard, fun(_, _, _) -> {ok, #shard{}} end), - meck:expect(fabric_ring, is_progress_possible, fun(_) -> true end), - ok. - - -unpack_seqs_test_() -> - { - setup, - fun unpack_seq_setup/0, - fun (_) -> meck:unload() end, - [ - t_unpack_seqs() - ] - }. - - -t_unpack_seqs() -> - ?_test(begin - % BigCouch 0.3 style. - assert_shards("23423-g1AAAAE7eJzLYWBg4MhgTmHgS0ktM3QwND" - "LXMwBCwxygOFMiQ5L8____sxIZcKlIUgCSSfZgRUw4FTmAFMWDFTHiVJQAUlSPX1Ee" - "C5BkaABSQHXzsxKZ8StcAFG4H4_bIAoPQBTeJ2j1A4hCUJBkAQC7U1NA"), - - % BigCouch 0.4 style. - assert_shards([23423,<<"g1AAAAE7eJzLYWBg4MhgTmHgS0ktM3QwND" - "LXMwBCwxygOFMiQ5L8____sxIZcKlIUgCSSfZgRUw4FTmAFMWDFTHiVJQAUlSPX1Ee" - "C5BkaABSQHXzsxKZ8StcAFG4H4_bIAoPQBTeJ2j1A4hCUJBkAQC7U1NA">>]), - - % BigCouch 0.4 style (as string). - assert_shards("[23423,\"g1AAAAE7eJzLYWBg4MhgTmHgS0ktM3QwND" - "LXMwBCwxygOFMiQ5L8____sxIZcKlIUgCSSfZgRUw4FTmAFMWDFTHiVJQAUlSPX1Ee" - "C5BkaABSQHXzsxKZ8StcAFG4H4_bIAoPQBTeJ2j1A4hCUJBkAQC7U1NA\"]"), - assert_shards("[23423 ,\"g1AAAAE7eJzLYWBg4MhgTmHgS0ktM3QwND" - "LXMwBCwxygOFMiQ5L8____sxIZcKlIUgCSSfZgRUw4FTmAFMWDFTHiVJQAUlSPX1Ee" - "C5BkaABSQHXzsxKZ8StcAFG4H4_bIAoPQBTeJ2j1A4hCUJBkAQC7U1NA\"]"), - assert_shards("[23423, \"g1AAAAE7eJzLYWBg4MhgTmHgS0ktM3QwND" - "LXMwBCwxygOFMiQ5L8____sxIZcKlIUgCSSfZgRUw4FTmAFMWDFTHiVJQAUlSPX1Ee" - "C5BkaABSQHXzsxKZ8StcAFG4H4_bIAoPQBTeJ2j1A4hCUJBkAQC7U1NA\"]"), - assert_shards("[23423 , \"g1AAAAE7eJzLYWBg4MhgTmHgS0ktM3QwND" - "LXMwBCwxygOFMiQ5L8____sxIZcKlIUgCSSfZgRUw4FTmAFMWDFTHiVJQAUlSPX1Ee" - "C5BkaABSQHXzsxKZ8StcAFG4H4_bIAoPQBTeJ2j1A4hCUJBkAQC7U1NA\"]"), - - % with internal hypen - assert_shards("651-g1AAAAE7eJzLYWBg4MhgTmHgS0ktM3QwNDLXMwBCwxygOFMiQ" - "5L8____sxJTcalIUgCSSfZgReE4FTmAFMWDFYXgVJQAUlQPVuSKS1EeC5BkaABSQHXz8" - "VgJUbgAonB_VqIPfoUHIArvE7T6AUQh0I1-WQAzp1XB"), - assert_shards([651,"g1AAAAE7eJzLYWBg4MhgTmHgS0ktM3QwNDLXMwBCwxygOFMiQ" - "5L8____sxJTcalIUgCSSfZgReE4FTmAFMWDFYXgVJQAUlQPVuSKS1EeC5BkaABSQHXz8" - "VgJUbgAonB_VqIPfoUHIArvE7T6AUQh0I1-WQAzp1XB"]), - - % CouchDB 1.2 style - assert_shards("\"23423-g1AAAAE7eJzLYWBg4MhgTmHgS0ktM3QwND" - "LXMwBCwxygOFMiQ5L8____sxIZcKlIUgCSSfZgRUw4FTmAFMWDFTHiVJQAUlSPX1Ee" - "C5BkaABSQHXzsxKZ8StcAFG4H4_bIAoPQBTeJ2j1A4hCUJBkAQC7U1NA\"") - end). - - -assert_shards(Packed) -> - ?assertMatch([{#shard{},_}|_], unpack_seqs(Packed, <<"foo">>)). - - -find_replacements_test() -> - % None of the workers are in the live list of shard but there is a - % replacement on n3 for the full range. It should get picked instead of - % the two smaller one on n2. - Workers1 = mk_workers([{"n1", 0, 10}, {"n2", 11, ?RING_END}]), - AllShards1 = [ - mk_shard("n1", 11, ?RING_END), - mk_shard("n2", 0, 4), - mk_shard("n2", 5, 10), - mk_shard("n3", 0, ?RING_END) - ], - {WorkersRes1, Dead1, Reps1} = find_replacements(Workers1, AllShards1), - ?assertEqual([], WorkersRes1), - ?assertEqual(Workers1, Dead1), - ?assertEqual([mk_shard("n3", 0, ?RING_END)], Reps1), - - % None of the workers are in the live list of shards and there is a - % split replacement from n2 (range [0, 10] replaced with [0, 4], [5, 10]) - Workers2 = mk_workers([{"n1", 0, 10}, {"n2", 11, ?RING_END}]), - AllShards2 = [ - mk_shard("n1", 11, ?RING_END), - mk_shard("n2", 0, 4), - mk_shard("n2", 5, 10) - ], - {WorkersRes2, Dead2, Reps2} = find_replacements(Workers2, AllShards2), - ?assertEqual([], WorkersRes2), - ?assertEqual(Workers2, Dead2), - ?assertEqual([ - mk_shard("n1", 11, ?RING_END), - mk_shard("n2", 0, 4), - mk_shard("n2", 5, 10) - ], lists:sort(Reps2)), - - % One worker is available and one needs to be replaced. Replacement will be - % from two split shards - Workers3 = mk_workers([{"n1", 0, 10}, {"n2", 11, ?RING_END}]), - AllShards3 = [ - mk_shard("n1", 11, ?RING_END), - mk_shard("n2", 0, 4), - mk_shard("n2", 5, 10), - mk_shard("n2", 11, ?RING_END) - ], - {WorkersRes3, Dead3, Reps3} = find_replacements(Workers3, AllShards3), - ?assertEqual(mk_workers([{"n2", 11, ?RING_END}]), WorkersRes3), - ?assertEqual(mk_workers([{"n1", 0, 10}]), Dead3), - ?assertEqual([ - mk_shard("n2", 0, 4), - mk_shard("n2", 5, 10) - ], lists:sort(Reps3)), - - % All workers are available. Make sure they are not killed even if there is - % a longer (single) shard to replace them. - Workers4 = mk_workers([{"n1", 0, 10}, {"n1", 11, ?RING_END}]), - AllShards4 = [ - mk_shard("n1", 0, 10), - mk_shard("n1", 11, ?RING_END), - mk_shard("n2", 0, 4), - mk_shard("n2", 5, 10), - mk_shard("n3", 0, ?RING_END) - ], - {WorkersRes4, Dead4, Reps4} = find_replacements(Workers4, AllShards4), - ?assertEqual(Workers4, WorkersRes4), - ?assertEqual([], Dead4), - ?assertEqual([], Reps4). - - -mk_workers(NodesRanges) -> - mk_workers(NodesRanges, nil). - -mk_workers(NodesRanges, Val) -> - orddict:from_list([{mk_shard(N, B, E), Val} || {N, B, E} <- NodesRanges]). - - -mk_shard(Name, B, E) -> - Node = list_to_atom(Name), - BName = list_to_binary(Name), - #shard{name = BName, node = Node, range = [B, E]}. - - -find_split_shard_replacements_test() -> - % One worker is can be replaced and one can't - Dead1 = mk_workers([{"n1", 0, 10}, {"n2", 11, ?RING_END}], 42), - Shards1 = [ - mk_shard("n1", 0, 4), - mk_shard("n1", 5, 10), - mk_shard("n3", 11, ?RING_END) - ], - {Workers1, ShardsLeft1} = find_split_shard_replacements(Dead1, Shards1), - ?assertEqual(mk_workers([{"n1", 0, 4}, {"n1", 5, 10}], 42), Workers1), - ?assertEqual([mk_shard("n3", 11, ?RING_END)], ShardsLeft1), - - % All workers can be replaced - one by 1 shard, another by 3 smaller shards - Dead2 = mk_workers([{"n1", 0, 10}, {"n2", 11, ?RING_END}], 42), - Shards2 = [ - mk_shard("n1", 0, 10), - mk_shard("n2", 11, 12), - mk_shard("n2", 13, 14), - mk_shard("n2", 15, ?RING_END) - ], - {Workers2, ShardsLeft2} = find_split_shard_replacements(Dead2, Shards2), - ?assertEqual(mk_workers([ - {"n1", 0, 10}, - {"n2", 11, 12}, - {"n2", 13, 14}, - {"n2", 15, ?RING_END} - ], 42), Workers2), - ?assertEqual([], ShardsLeft2), - - % No workers can be replaced. Ranges match but they are on different nodes - Dead3 = mk_workers([{"n1", 0, 10}, {"n2", 11, ?RING_END}], 42), - Shards3 = [ - mk_shard("n2", 0, 10), - mk_shard("n3", 11, ?RING_END) - ], - {Workers3, ShardsLeft3} = find_split_shard_replacements(Dead3, Shards3), - ?assertEqual([], Workers3), - ?assertEqual(Shards3, ShardsLeft3). |