summaryrefslogtreecommitdiff
path: root/src/fabric/src/fabric_view.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/fabric/src/fabric_view.erl')
-rw-r--r--src/fabric/src/fabric_view.erl521
1 files changed, 295 insertions, 226 deletions
diff --git a/src/fabric/src/fabric_view.erl b/src/fabric/src/fabric_view.erl
index 3048e8987..c2ef13392 100644
--- a/src/fabric/src/fabric_view.erl
+++ b/src/fabric/src/fabric_view.erl
@@ -12,10 +12,18 @@
-module(fabric_view).
--export([remove_overlapping_shards/2, maybe_send_row/1,
- transform_row/1, keydict/1, extract_view/4, get_shards/2,
- check_down_shards/2, handle_worker_exit/3,
- get_shard_replacements/2, maybe_update_others/5]).
+-export([
+ remove_overlapping_shards/2,
+ maybe_send_row/1,
+ transform_row/1,
+ keydict/1,
+ extract_view/4,
+ get_shards/2,
+ check_down_shards/2,
+ handle_worker_exit/3,
+ get_shard_replacements/2,
+ maybe_update_others/5
+]).
-export([fix_skip_and_limit/1]).
-include_lib("fabric/include/fabric.hrl").
@@ -27,7 +35,7 @@
-spec check_down_shards(#collector{}, node()) ->
{ok, #collector{}} | {error, any()}.
check_down_shards(Collector, BadNode) ->
- #collector{callback=Callback, counters=Counters, user_acc=Acc} = Collector,
+ #collector{callback = Callback, counters = Counters, user_acc = Acc} = Collector,
Filter = fun(#shard{node = Node}, _) -> Node == BadNode end,
BadCounters = fabric_dict:filter(Filter, Counters),
case fabric_dict:size(BadCounters) > 0 of
@@ -42,24 +50,21 @@ check_down_shards(Collector, BadNode) ->
%% @doc Handle a worker that dies during a stream
-spec handle_worker_exit(#collector{}, #shard{}, any()) -> {error, any()}.
handle_worker_exit(Collector, _Worker, Reason) ->
- #collector{callback=Callback, user_acc=Acc} = Collector,
+ #collector{callback = Callback, user_acc = Acc} = Collector,
{ok, Resp} = Callback({error, fabric_util:error_info(Reason)}, Acc),
{error, Resp}.
-
-spec remove_overlapping_shards(#shard{}, [{#shard{}, any()}]) ->
[{#shard{}, any()}].
remove_overlapping_shards(#shard{} = Shard, Counters) ->
remove_overlapping_shards(Shard, Counters, fun stop_worker/1).
-
-spec remove_overlapping_shards(#shard{}, [{#shard{}, any()}], fun()) ->
[{#shard{}, any()}].
remove_overlapping_shards(#shard{} = Shard, Counters, RemoveCb) ->
Counters1 = filter_exact_copies(Shard, Counters, RemoveCb),
filter_possible_overlaps(Shard, Counters1, RemoveCb).
-
filter_possible_overlaps(Shard, Counters, RemoveCb) ->
Ranges0 = fabric_util:worker_ranges(Counters),
#shard{range = [BShard, EShard]} = Shard,
@@ -83,54 +88,59 @@ filter_possible_overlaps(Shard, Counters, RemoveCb) ->
B1 =< B2
end,
Ring = mem3_util:get_ring(Ranges, SortFun, MinB, MaxE),
- fabric_dict:filter(fun
- (S, _) when S =:= Shard ->
- % Keep the original shard
- true;
- (#shard{range = [B, E]} = S, _) ->
- case lists:member({B, E}, Ring) of
- true ->
- true; % Keep it
- false ->
- % Duplicate range, delete after calling callback function
- case is_function(RemoveCb) of
- true -> RemoveCb(S);
- false -> ok
- end,
- false
- end
- end, Counters).
-
+ fabric_dict:filter(
+ fun
+ (S, _) when S =:= Shard ->
+ % Keep the original shard
+ true;
+ (#shard{range = [B, E]} = S, _) ->
+ case lists:member({B, E}, Ring) of
+ true ->
+ % Keep it
+ true;
+ false ->
+ % Duplicate range, delete after calling callback function
+ case is_function(RemoveCb) of
+ true -> RemoveCb(S);
+ false -> ok
+ end,
+ false
+ end
+ end,
+ Counters
+ ).
filter_exact_copies(#shard{range = Range0} = Shard0, Shards, Cb) ->
- fabric_dict:filter(fun
- (Shard, _) when Shard =:= Shard0 ->
- true; % Don't remove ourselves
- (#shard{range = Range} = Shard, _) when Range =:= Range0 ->
- case is_function(Cb) of
- true -> Cb(Shard);
- false -> ok
- end,
- false;
- (_, _) ->
- true
- end, Shards).
-
+ fabric_dict:filter(
+ fun
+ (Shard, _) when Shard =:= Shard0 ->
+ % Don't remove ourselves
+ true;
+ (#shard{range = Range} = Shard, _) when Range =:= Range0 ->
+ case is_function(Cb) of
+ true -> Cb(Shard);
+ false -> ok
+ end,
+ false;
+ (_, _) ->
+ true
+ end,
+ Shards
+ ).
stop_worker(#shard{ref = Ref, node = Node}) ->
rexi:kill(Node, Ref).
-
-maybe_send_row(#collector{limit=0} = State) ->
- #collector{counters=Counters, user_acc=AccIn, callback=Callback} = State,
+maybe_send_row(#collector{limit = 0} = State) ->
+ #collector{counters = Counters, user_acc = AccIn, callback = Callback} = State,
case fabric_dict:any(0, Counters) of
- true ->
- % we still need to send the total/offset header
- {ok, State};
- false ->
- erase(meta_sent),
- {_, Acc} = Callback(complete, AccIn),
- {stop, State#collector{user_acc=Acc}}
+ true ->
+ % we still need to send the total/offset header
+ {ok, State};
+ false ->
+ erase(meta_sent),
+ {_, Acc} = Callback(complete, AccIn),
+ {stop, State#collector{user_acc = Acc}}
end;
maybe_send_row(State) ->
#collector{
@@ -141,82 +151,95 @@ maybe_send_row(State) ->
user_acc = AccIn
} = State,
case fabric_dict:any(0, Counters) of
- true ->
- {ok, State};
- false ->
- try get_next_row(State) of
- {_, NewState} when Skip > 0 ->
- maybe_send_row(NewState#collector{skip=Skip-1});
- {Row0, NewState} ->
- Row1 = possibly_embed_doc(NewState, Row0),
- Row2 = detach_partition(Row1),
- Row3 = transform_row(Row2),
- case Callback(Row3, AccIn) of
- {stop, Acc} ->
- {stop, NewState#collector{user_acc=Acc, limit=Limit-1}};
- {ok, Acc} ->
- maybe_send_row(NewState#collector{user_acc=Acc, limit=Limit-1})
+ true ->
+ {ok, State};
+ false ->
+ try get_next_row(State) of
+ {_, NewState} when Skip > 0 ->
+ maybe_send_row(NewState#collector{skip = Skip - 1});
+ {Row0, NewState} ->
+ Row1 = possibly_embed_doc(NewState, Row0),
+ Row2 = detach_partition(Row1),
+ Row3 = transform_row(Row2),
+ case Callback(Row3, AccIn) of
+ {stop, Acc} ->
+ {stop, NewState#collector{user_acc = Acc, limit = Limit - 1}};
+ {ok, Acc} ->
+ maybe_send_row(NewState#collector{user_acc = Acc, limit = Limit - 1})
+ end
+ catch
+ complete ->
+ erase(meta_sent),
+ {_, Acc} = Callback(complete, AccIn),
+ {stop, State#collector{user_acc = Acc}}
end
- catch complete ->
- erase(meta_sent),
- {_, Acc} = Callback(complete, AccIn),
- {stop, State#collector{user_acc=Acc}}
- end
end.
%% if include_docs=true is used when keys and
%% the values contain "_id" then use the "_id"s
%% to retrieve documents and embed in result
-possibly_embed_doc(_State,
- #view_row{id=reduced}=Row) ->
+possibly_embed_doc(
+ _State,
+ #view_row{id = reduced} = Row
+) ->
Row;
-possibly_embed_doc(_State,
- #view_row{value=undefined}=Row) ->
+possibly_embed_doc(
+ _State,
+ #view_row{value = undefined} = Row
+) ->
Row;
-possibly_embed_doc(#collector{db_name=DbName, query_args=Args},
- #view_row{key=_Key, id=_Id, value=Value, doc=_Doc}=Row) ->
- #mrargs{include_docs=IncludeDocs} = Args,
+possibly_embed_doc(
+ #collector{db_name = DbName, query_args = Args},
+ #view_row{key = _Key, id = _Id, value = Value, doc = _Doc} = Row
+) ->
+ #mrargs{include_docs = IncludeDocs} = Args,
case IncludeDocs andalso is_tuple(Value) of
- true ->
- {Props} = Value,
- Rev0 = couch_util:get_value(<<"_rev">>, Props),
- case couch_util:get_value(<<"_id">>,Props) of
- null -> Row#view_row{doc=null};
- undefined -> Row;
- IncId ->
- % use separate process to call fabric:open_doc
- % to not interfere with current call
- {Pid, Ref} = spawn_monitor(fun() ->
- exit(
- case Rev0 of
+ true ->
+ {Props} = Value,
+ Rev0 = couch_util:get_value(<<"_rev">>, Props),
+ case couch_util:get_value(<<"_id">>, Props) of
+ null ->
+ Row#view_row{doc = null};
undefined ->
- case fabric:open_doc(DbName, IncId, []) of
- {ok, NewDoc} ->
- Row#view_row{doc=couch_doc:to_json_obj(NewDoc,[])};
- {not_found, _} ->
- Row#view_row{doc=null};
- Else ->
- Row#view_row{doc={error, Else}}
- end;
- Rev0 ->
- Rev = couch_doc:parse_rev(Rev0),
- case fabric:open_revs(DbName, IncId, [Rev], []) of
- {ok, [{ok, NewDoc}]} ->
- Row#view_row{doc=couch_doc:to_json_obj(NewDoc,[])};
- {ok, [{{not_found, _}, Rev}]} ->
- Row#view_row{doc=null};
- Else ->
- Row#view_row{doc={error, Else}}
+ Row;
+ IncId ->
+ % use separate process to call fabric:open_doc
+ % to not interfere with current call
+ {Pid, Ref} = spawn_monitor(fun() ->
+ exit(
+ case Rev0 of
+ undefined ->
+ case fabric:open_doc(DbName, IncId, []) of
+ {ok, NewDoc} ->
+ Row#view_row{doc = couch_doc:to_json_obj(NewDoc, [])};
+ {not_found, _} ->
+ Row#view_row{doc = null};
+ Else ->
+ Row#view_row{doc = {error, Else}}
+ end;
+ Rev0 ->
+ Rev = couch_doc:parse_rev(Rev0),
+ case fabric:open_revs(DbName, IncId, [Rev], []) of
+ {ok, [{ok, NewDoc}]} ->
+ Row#view_row{doc = couch_doc:to_json_obj(NewDoc, [])};
+ {ok, [{{not_found, _}, Rev}]} ->
+ Row#view_row{doc = null};
+ Else ->
+ Row#view_row{doc = {error, Else}}
+ end
+ end
+ )
+ end),
+ receive
+ {'DOWN', Ref, process, Pid, Resp} ->
+ Resp
end
- end) end),
- receive {'DOWN',Ref,process,Pid, Resp} ->
- Resp
- end
- end;
- _ -> Row
+ end;
+ _ ->
+ Row
end.
-detach_partition(#view_row{key={p, _Partition, Key}} = Row) ->
+detach_partition(#view_row{key = {p, _Partition, Key}} = Row) ->
Row#view_row{key = Key};
detach_partition(#view_row{} = Row) ->
Row.
@@ -224,8 +247,11 @@ detach_partition(#view_row{} = Row) ->
keydict(undefined) ->
undefined;
keydict(Keys) ->
- {Dict,_} = lists:foldl(fun(K, {D,I}) -> {dict:store(K,I,D), I+1} end,
- {dict:new(),0}, Keys),
+ {Dict, _} = lists:foldl(
+ fun(K, {D, I}) -> {dict:store(K, I, D), I + 1} end,
+ {dict:new(), 0},
+ Keys
+ ),
Dict.
%% internal %%
@@ -243,30 +269,34 @@ get_next_row(#collector{reducer = RedSrc} = St) when RedSrc =/= undefined ->
} = St,
{Key, RestKeys} = find_next_key(Keys, Dir, Collation, RowDict),
case reduce_row_dict_take(Key, RowDict, Collation) of
- {Records, NewRowDict} ->
- Counters = lists:foldl(fun(#view_row{worker={Worker,From}}, CntrsAcc) ->
- case From of
- {Pid, _} when is_pid(Pid) ->
- gen_server:reply(From, ok);
- Pid when is_pid(Pid) ->
- rexi:stream_ack(From)
- end,
- fabric_dict:update_counter(Worker, -1, CntrsAcc)
- end, Counters0, Records),
- Wrapped = [[V] || #view_row{value=V} <- Records],
- {ok, [Reduced]} = couch_query_servers:rereduce(Lang, [RedSrc], Wrapped),
- {ok, Finalized} = couch_query_servers:finalize(RedSrc, Reduced),
- NewSt = St#collector{keys=RestKeys, rows=NewRowDict, counters=Counters},
- {#view_row{key=Key, id=reduced, value=Finalized}, NewSt};
- error ->
- get_next_row(St#collector{keys=RestKeys})
+ {Records, NewRowDict} ->
+ Counters = lists:foldl(
+ fun(#view_row{worker = {Worker, From}}, CntrsAcc) ->
+ case From of
+ {Pid, _} when is_pid(Pid) ->
+ gen_server:reply(From, ok);
+ Pid when is_pid(Pid) ->
+ rexi:stream_ack(From)
+ end,
+ fabric_dict:update_counter(Worker, -1, CntrsAcc)
+ end,
+ Counters0,
+ Records
+ ),
+ Wrapped = [[V] || #view_row{value = V} <- Records],
+ {ok, [Reduced]} = couch_query_servers:rereduce(Lang, [RedSrc], Wrapped),
+ {ok, Finalized} = couch_query_servers:finalize(RedSrc, Reduced),
+ NewSt = St#collector{keys = RestKeys, rows = NewRowDict, counters = Counters},
+ {#view_row{key = Key, id = reduced, value = Finalized}, NewSt};
+ error ->
+ get_next_row(St#collector{keys = RestKeys})
end;
get_next_row(State) ->
- #collector{rows = [Row|Rest], counters = Counters0} = State,
+ #collector{rows = [Row | Rest], counters = Counters0} = State,
{Worker, From} = Row#view_row.worker,
rexi:stream_ack(From),
Counters1 = fabric_dict:update_counter(Worker, -1, Counters0),
- {Row, State#collector{rows = Rest, counters=Counters1}}.
+ {Row, State#collector{rows = Rest, counters = Counters1}}.
reduce_row_dict_take(Key, Dict, <<"raw">>) ->
dict:take(Key, Dict);
@@ -278,9 +308,13 @@ reduce_row_dict_take(Key, Dict, _Collation) ->
error;
[_ | _] ->
{Keys, Vals} = lists:unzip(KVs),
- NewDict = lists:foldl(fun(K, Acc) ->
- dict:erase(K, Acc)
- end, Dict, Keys),
+ NewDict = lists:foldl(
+ fun(K, Acc) ->
+ dict:erase(K, Acc)
+ end,
+ Dict,
+ Keys
+ ),
{lists:flatten(Vals), NewDict}
end.
@@ -290,28 +324,28 @@ find_next_key(nil, Dir, Collation, RowDict) ->
find_next_key(undefined, Dir, Collation, RowDict) ->
CmpFun = fun(A, B) -> compare(Dir, Collation, A, B) end,
case lists:sort(CmpFun, dict:fetch_keys(RowDict)) of
- [] ->
- throw(complete);
- [Key|_] ->
- {Key, nil}
+ [] ->
+ throw(complete);
+ [Key | _] ->
+ {Key, nil}
end;
find_next_key([], _, _, _) ->
throw(complete);
-find_next_key([Key|Rest], _, _, _) ->
+find_next_key([Key | Rest], _, _, _) ->
{Key, Rest}.
-transform_row(#view_row{value={[{reduce_overflow_error, Msg}]}}) ->
- {row, [{key,null}, {id,error}, {value,reduce_overflow_error}, {reason,Msg}]};
-transform_row(#view_row{key=Key, id=reduced, value=Value}) ->
- {row, [{key,Key}, {value,Value}]};
-transform_row(#view_row{key=Key, id=undefined}) ->
- {row, [{key,Key}, {id,error}, {value,not_found}]};
-transform_row(#view_row{key=Key, id=Id, value=Value, doc=undefined}) ->
- {row, [{id,Id}, {key,Key}, {value,Value}]};
-transform_row(#view_row{key=Key, id=_Id, value=_Value, doc={error,Reason}}) ->
- {row, [{id,error}, {key,Key}, {value,Reason}]};
-transform_row(#view_row{key=Key, id=Id, value=Value, doc=Doc}) ->
- {row, [{id,Id}, {key,Key}, {value,Value}, {doc,Doc}]}.
+transform_row(#view_row{value = {[{reduce_overflow_error, Msg}]}}) ->
+ {row, [{key, null}, {id, error}, {value, reduce_overflow_error}, {reason, Msg}]};
+transform_row(#view_row{key = Key, id = reduced, value = Value}) ->
+ {row, [{key, Key}, {value, Value}]};
+transform_row(#view_row{key = Key, id = undefined}) ->
+ {row, [{key, Key}, {id, error}, {value, not_found}]};
+transform_row(#view_row{key = Key, id = Id, value = Value, doc = undefined}) ->
+ {row, [{id, Id}, {key, Key}, {value, Value}]};
+transform_row(#view_row{key = Key, id = _Id, value = _Value, doc = {error, Reason}}) ->
+ {row, [{id, error}, {key, Key}, {value, Reason}]};
+transform_row(#view_row{key = Key, id = Id, value = Value, doc = Doc}) ->
+ {row, [{id, Id}, {key, Key}, {value, Value}, {doc, Doc}]}.
compare(fwd, <<"raw">>, A, B) -> A < B;
compare(rev, <<"raw">>, A, B) -> B < A;
@@ -322,16 +356,17 @@ extract_view(Pid, ViewName, [], _ViewType) ->
couch_log:error("missing_named_view ~p", [ViewName]),
exit(Pid, kill),
exit(missing_named_view);
-extract_view(Pid, ViewName, [View|Rest], ViewType) ->
+extract_view(Pid, ViewName, [View | Rest], ViewType) ->
case lists:member(ViewName, view_names(View, ViewType)) of
- true ->
- if ViewType == reduce ->
- {index_of(ViewName, view_names(View, reduce)), View};
true ->
- View
- end;
- false ->
- extract_view(Pid, ViewName, Rest, ViewType)
+ if
+ ViewType == reduce ->
+ {index_of(ViewName, view_names(View, reduce)), View};
+ true ->
+ View
+ end;
+ false ->
+ extract_view(Pid, ViewName, Rest, ViewType)
end.
view_names(View, Type) when Type == red_map; Type == reduce ->
@@ -344,16 +379,17 @@ index_of(X, List) ->
index_of(_X, [], _I) ->
not_found;
-index_of(X, [X|_Rest], I) ->
+index_of(X, [X | _Rest], I) ->
I;
-index_of(X, [_|Rest], I) ->
- index_of(X, Rest, I+1).
+index_of(X, [_ | Rest], I) ->
+ index_of(X, Rest, I + 1).
get_shards(Db, #mrargs{} = Args) ->
DbPartitioned = fabric_util:is_partitioned(Db),
Partition = couch_mrview_util:get_extra(Args, partition),
- if DbPartitioned orelse Partition == undefined -> ok; true ->
- throw({bad_request, <<"partition specified on non-partitioned db">>})
+ if
+ DbPartitioned orelse Partition == undefined -> ok;
+ true -> throw({bad_request, <<"partition specified on non-partitioned db">>})
end,
DbName = fabric:dbname(Db),
% Decide which version of mem3:shards/1,2 or
@@ -372,12 +408,20 @@ get_shards(Db, #mrargs{} = Args) ->
{Shards, [{any, Shards}]}
end.
-maybe_update_others(DbName, DDoc, ShardsInvolved, ViewName,
- #mrargs{update=lazy} = Args) ->
+maybe_update_others(
+ DbName,
+ DDoc,
+ ShardsInvolved,
+ ViewName,
+ #mrargs{update = lazy} = Args
+) ->
ShardsNeedUpdated = mem3:shards(DbName) -- ShardsInvolved,
- lists:foreach(fun(#shard{node=Node, name=ShardName}) ->
- rpc:cast(Node, fabric_rpc, update_mrview, [ShardName, DDoc, ViewName, Args])
- end, ShardsNeedUpdated);
+ lists:foreach(
+ fun(#shard{node = Node, name = ShardName}) ->
+ rpc:cast(Node, fabric_rpc, update_mrview, [ShardName, DDoc, ViewName, Args])
+ end,
+ ShardsNeedUpdated
+ );
maybe_update_others(_DbName, _DDoc, _ShardsInvolved, _ViewName, _Args) ->
ok.
@@ -385,48 +429,57 @@ get_shard_replacements(DbName, UsedShards0) ->
% We only want to generate a replacements list from shards
% that aren't already used.
AllLiveShards = mem3:live_shards(DbName, [node() | nodes()]),
- UsedShards = [S#shard{ref=undefined} || S <- UsedShards0],
+ UsedShards = [S#shard{ref = undefined} || S <- UsedShards0],
get_shard_replacements_int(AllLiveShards -- UsedShards, UsedShards).
get_shard_replacements_int(UnusedShards, UsedShards) ->
% If we have more than one copy of a range then we don't
% want to try and add a replacement to any copy.
- RangeCounts = lists:foldl(fun(#shard{range=R}, Acc) ->
- dict:update_counter(R, 1, Acc)
- end, dict:new(), UsedShards),
+ RangeCounts = lists:foldl(
+ fun(#shard{range = R}, Acc) ->
+ dict:update_counter(R, 1, Acc)
+ end,
+ dict:new(),
+ UsedShards
+ ),
% For each seq shard range with a count of 1, find any
% possible replacements from the unused shards. The
% replacement list is keyed by range.
- lists:foldl(fun(#shard{range = [B, E] = Range}, Acc) ->
- case dict:find(Range, RangeCounts) of
- {ok, 1} ->
- Repls = mem3_util:non_overlapping_shards(UnusedShards, B, E),
- % Only keep non-empty lists of replacements
- if Repls == [] -> Acc; true ->
- [{Range, Repls} | Acc]
- end;
- _ ->
- Acc
- end
- end, [], UsedShards).
-
--spec fix_skip_and_limit(#mrargs{}) -> {CoordArgs::#mrargs{}, WorkerArgs::#mrargs{}}.
+ lists:foldl(
+ fun(#shard{range = [B, E] = Range}, Acc) ->
+ case dict:find(Range, RangeCounts) of
+ {ok, 1} ->
+ Repls = mem3_util:non_overlapping_shards(UnusedShards, B, E),
+ % Only keep non-empty lists of replacements
+ if
+ Repls == [] -> Acc;
+ true -> [{Range, Repls} | Acc]
+ end;
+ _ ->
+ Acc
+ end
+ end,
+ [],
+ UsedShards
+ ).
+
+-spec fix_skip_and_limit(#mrargs{}) -> {CoordArgs :: #mrargs{}, WorkerArgs :: #mrargs{}}.
fix_skip_and_limit(#mrargs{} = Args) ->
- {CoordArgs, WorkerArgs} = case couch_mrview_util:get_extra(Args, partition) of
- undefined ->
- #mrargs{skip=Skip, limit=Limit}=Args,
- {Args, Args#mrargs{skip=0, limit=Skip+Limit}};
- _Partition ->
- {Args#mrargs{skip=0}, Args}
- end,
+ {CoordArgs, WorkerArgs} =
+ case couch_mrview_util:get_extra(Args, partition) of
+ undefined ->
+ #mrargs{skip = Skip, limit = Limit} = Args,
+ {Args, Args#mrargs{skip = 0, limit = Skip + Limit}};
+ _Partition ->
+ {Args#mrargs{skip = 0}, Args}
+ end,
%% the coordinator needs to finalize each row, so make sure the shards don't
{CoordArgs, remove_finalizer(WorkerArgs)}.
remove_finalizer(Args) ->
couch_mrview_util:set_extra(Args, finalizer, null).
-
remove_overlapping_shards_test() ->
Cb = undefined,
@@ -436,29 +489,42 @@ remove_overlapping_shards_test() ->
Shard1 = mk_shard("node-3", [11, 20]),
Shards1 = fabric_dict:store(Shard1, nil, Shards),
R1 = remove_overlapping_shards(Shard1, Shards1, Cb),
- ?assertEqual([{0, 10}, {11, 20}, {21, ?RING_END}],
- fabric_util:worker_ranges(R1)),
+ ?assertEqual(
+ [{0, 10}, {11, 20}, {21, ?RING_END}],
+ fabric_util:worker_ranges(R1)
+ ),
?assert(fabric_dict:is_key(Shard1, R1)),
% Split overlap (shard overlap multiple workers)
Shard2 = mk_shard("node-3", [0, 20]),
Shards2 = fabric_dict:store(Shard2, nil, Shards),
R2 = remove_overlapping_shards(Shard2, Shards2, Cb),
- ?assertEqual([{0, 20}, {21, ?RING_END}],
- fabric_util:worker_ranges(R2)),
+ ?assertEqual(
+ [{0, 20}, {21, ?RING_END}],
+ fabric_util:worker_ranges(R2)
+ ),
?assert(fabric_dict:is_key(Shard2, R2)).
-
get_shard_replacements_test() ->
- Unused = [mk_shard(N, [B, E]) || {N, B, E} <- [
- {"n1", 11, 20}, {"n1", 21, ?RING_END},
- {"n2", 0, 4}, {"n2", 5, 10}, {"n2", 11, 20},
- {"n3", 0, 21, ?RING_END}
- ]],
- Used = [mk_shard(N, [B, E]) || {N, B, E} <- [
- {"n2", 21, ?RING_END},
- {"n3", 0, 10}, {"n3", 11, 20}
- ]],
+ Unused = [
+ mk_shard(N, [B, E])
+ || {N, B, E} <- [
+ {"n1", 11, 20},
+ {"n1", 21, ?RING_END},
+ {"n2", 0, 4},
+ {"n2", 5, 10},
+ {"n2", 11, 20},
+ {"n3", 0, 21, ?RING_END}
+ ]
+ ],
+ Used = [
+ mk_shard(N, [B, E])
+ || {N, B, E} <- [
+ {"n2", 21, ?RING_END},
+ {"n3", 0, 10},
+ {"n3", 11, 20}
+ ]
+ ],
Res = lists:sort(get_shard_replacements_int(Unused, Used)),
% Notice that [0, 10] range can be replaced by spawning the [0, 4] and [5,
% 10] workers on n1
@@ -469,22 +535,25 @@ get_shard_replacements_test() ->
],
?assertEqual(Expect, Res).
-
mk_cnts(Ranges, NoNodes) ->
- orddict:from_list([{Shard,nil}
- || Shard <-
- lists:flatten(lists:map(
- fun(Range) ->
- mk_shards(NoNodes,Range,[])
- end, Ranges))]
- ).
-
-mk_shards(0,_Range,Shards) ->
- Shards;
-mk_shards(NoNodes,Range,Shards) ->
- Name ="node-" ++ integer_to_list(NoNodes),
- mk_shards(NoNodes-1,Range, [mk_shard(Name, Range) | Shards]).
+ orddict:from_list([
+ {Shard, nil}
+ || Shard <-
+ lists:flatten(
+ lists:map(
+ fun(Range) ->
+ mk_shards(NoNodes, Range, [])
+ end,
+ Ranges
+ )
+ )
+ ]).
+mk_shards(0, _Range, Shards) ->
+ Shards;
+mk_shards(NoNodes, Range, Shards) ->
+ Name = "node-" ++ integer_to_list(NoNodes),
+ mk_shards(NoNodes - 1, Range, [mk_shard(Name, Range) | Shards]).
mk_shard(Name, Range) ->
Node = list_to_atom(Name),