Diffstat (limited to 'src/fabric/src/fabric_view.erl'):
 src/fabric/src/fabric_view.erl | 478 ----------------------------------------
 1 file changed, 0 insertions(+), 478 deletions(-)
diff --git a/src/fabric/src/fabric_view.erl b/src/fabric/src/fabric_view.erl
deleted file mode 100644
index 425f864c4..000000000
--- a/src/fabric/src/fabric_view.erl
+++ /dev/null
@@ -1,478 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-% http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--module(fabric_view).
-
--export([remove_overlapping_shards/2, maybe_send_row/1,
- transform_row/1, keydict/1, extract_view/4, get_shards/2,
- check_down_shards/2, handle_worker_exit/3,
- get_shard_replacements/2, maybe_update_others/5]).
--export([fix_skip_and_limit/1]).
-
--include_lib("fabric/include/fabric.hrl").
--include_lib("mem3/include/mem3.hrl").
--include_lib("couch/include/couch_db.hrl").
--include_lib("couch_mrview/include/couch_mrview.hrl").
-
-%% @doc Check if a downed node affects any of our workers
--spec check_down_shards(#collector{}, node()) ->
- {ok, #collector{}} | {error, any()}.
-check_down_shards(Collector, BadNode) ->
- #collector{callback=Callback, counters=Counters, user_acc=Acc} = Collector,
- Filter = fun(#shard{node = Node}, _) -> Node == BadNode end,
- BadCounters = fabric_dict:filter(Filter, Counters),
- case fabric_dict:size(BadCounters) > 0 of
- true ->
- Reason = {nodedown, <<"progress not possible">>},
- Callback({error, Reason}, Acc),
- {error, Reason};
- false ->
- {ok, Collector}
- end.
-
-%% @doc Handle a worker that dies during a stream
--spec handle_worker_exit(#collector{}, #shard{}, any()) -> {error, any()}.
-handle_worker_exit(Collector, _Worker, Reason) ->
- #collector{callback=Callback, user_acc=Acc} = Collector,
- {ok, Resp} = Callback({error, fabric_util:error_info(Reason)}, Acc),
- {error, Resp}.
-
-
--spec remove_overlapping_shards(#shard{}, [{#shard{}, any()}]) ->
- [{#shard{}, any()}].
-remove_overlapping_shards(#shard{} = Shard, Counters) ->
- remove_overlapping_shards(Shard, Counters, fun stop_worker/1).
-
-
--spec remove_overlapping_shards(#shard{}, [{#shard{}, any()}], fun()) ->
- [{#shard{}, any()}].
-remove_overlapping_shards(#shard{} = Shard, Counters, RemoveCb) ->
- Counters1 = filter_exact_copies(Shard, Counters, RemoveCb),
- filter_possible_overlaps(Shard, Counters1, RemoveCb).
-
-
-filter_possible_overlaps(Shard, Counters, RemoveCb) ->
- Ranges0 = fabric_util:worker_ranges(Counters),
- #shard{range = [BShard, EShard]} = Shard,
- Ranges = Ranges0 ++ [{BShard, EShard}],
- {Bs, Es} = lists:unzip(Ranges),
- {MinB, MaxE} = {lists:min(Bs), lists:max(Es)},
- % Use a custom sort function which prioritizes the given shard
- % range when the start endpoints match.
- SortFun = fun
- ({B, E}, {B, _}) when {B, E} =:= {BShard, EShard} ->
- % If start matches with the shard's start, shard always wins
- true;
- ({B, _}, {B, E}) when {B, E} =:= {BShard, EShard} ->
- % If start matches with the shard's start, shard always wins
- false;
- ({B, E1}, {B, E2}) ->
- % If start matches, pick the longest range first
-            E1 >= E2;
- ({B1, _}, {B2, _}) ->
- % Then, by default, sort by start point
- B1 =< B2
- end,
- Ring = mem3_util:get_ring(Ranges, SortFun, MinB, MaxE),
- fabric_dict:filter(fun
- (S, _) when S =:= Shard ->
- % Keep the original shard
- true;
- (#shard{range = [B, E]} = S, _) ->
- case lists:member({B, E}, Ring) of
- true ->
- true; % Keep it
- false ->
- % Duplicate range, delete after calling callback function
- case is_function(RemoveCb) of
- true -> RemoveCb(S);
- false -> ok
- end,
- false
- end
- end, Counters).
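-
-% Illustrative sketch, not part of the original module: given workers on the
-% ranges [0, 10], [11, 20] and [21, End] plus an incoming shard on [0, 20],
-% the sort above puts {0, 20} ahead of {0, 10}, mem3_util:get_ring/4 selects
-% the ring covering {0, 20} and {21, End}, and the filter then drops the now
-% redundant [0, 10] and [11, 20] workers after invoking RemoveCb on each:
-%
-%     Ring = mem3_util:get_ring(Ranges, SortFun, 0, End),
-%     %% => [{0, 20}, {21, End}]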
-
-
-filter_exact_copies(#shard{range = Range0} = Shard0, Shards, Cb) ->
- fabric_dict:filter(fun
- (Shard, _) when Shard =:= Shard0 ->
- true; % Don't remove ourselves
- (#shard{range = Range} = Shard, _) when Range =:= Range0 ->
- case is_function(Cb) of
- true -> Cb(Shard);
- false -> ok
- end,
- false;
- (_, _) ->
- true
- end, Shards).
-
-
-stop_worker(#shard{ref = Ref, node = Node}) ->
- rexi:kill(Node, Ref).
-
-
-maybe_send_row(#collector{limit=0} = State) ->
- #collector{counters=Counters, user_acc=AccIn, callback=Callback} = State,
- case fabric_dict:any(0, Counters) of
- true ->
- % we still need to send the total/offset header
- {ok, State};
- false ->
- erase(meta_sent),
- {_, Acc} = Callback(complete, AccIn),
- {stop, State#collector{user_acc=Acc}}
- end;
-maybe_send_row(State) ->
- #collector{
- callback = Callback,
- counters = Counters,
- skip = Skip,
- limit = Limit,
- user_acc = AccIn
- } = State,
- case fabric_dict:any(0, Counters) of
- true ->
- {ok, State};
- false ->
- try get_next_row(State) of
- {_, NewState} when Skip > 0 ->
- maybe_send_row(NewState#collector{skip=Skip-1});
- {Row0, NewState} ->
- Row1 = possibly_embed_doc(NewState, Row0),
- Row2 = detach_partition(Row1),
- Row3 = transform_row(Row2),
- case Callback(Row3, AccIn) of
- {stop, Acc} ->
- {stop, NewState#collector{user_acc=Acc, limit=Limit-1}};
- {ok, Acc} ->
- maybe_send_row(NewState#collector{user_acc=Acc, limit=Limit-1})
- end
- catch complete ->
- erase(meta_sent),
- {_, Acc} = Callback(complete, AccIn),
- {stop, State#collector{user_acc=Acc}}
- end
- end.
-
-%% If include_docs=true was requested and a row's emitted value contains an
-%% "_id" field, use that "_id" to fetch the referenced document and embed it
-%% in the result row in place of the emitting document.
-possibly_embed_doc(_State,
- #view_row{id=reduced}=Row) ->
- Row;
-possibly_embed_doc(_State,
- #view_row{value=undefined}=Row) ->
- Row;
-possibly_embed_doc(#collector{db_name=DbName, query_args=Args},
- #view_row{key=_Key, id=_Id, value=Value, doc=_Doc}=Row) ->
- #mrargs{include_docs=IncludeDocs} = Args,
- case IncludeDocs andalso is_tuple(Value) of
- true ->
- {Props} = Value,
- Rev0 = couch_util:get_value(<<"_rev">>, Props),
- case couch_util:get_value(<<"_id">>,Props) of
- null -> Row#view_row{doc=null};
- undefined -> Row;
- IncId ->
- % use separate process to call fabric:open_doc
- % to not interfere with current call
- {Pid, Ref} = spawn_monitor(fun() ->
- exit(
- case Rev0 of
- undefined ->
- case fabric:open_doc(DbName, IncId, []) of
- {ok, NewDoc} ->
- Row#view_row{doc=couch_doc:to_json_obj(NewDoc,[])};
- {not_found, _} ->
- Row#view_row{doc=null};
- Else ->
- Row#view_row{doc={error, Else}}
- end;
- Rev0 ->
- Rev = couch_doc:parse_rev(Rev0),
- case fabric:open_revs(DbName, IncId, [Rev], []) of
- {ok, [{ok, NewDoc}]} ->
- Row#view_row{doc=couch_doc:to_json_obj(NewDoc,[])};
- {ok, [{{not_found, _}, Rev}]} ->
- Row#view_row{doc=null};
- Else ->
- Row#view_row{doc={error, Else}}
- end
- end) end),
- receive {'DOWN',Ref,process,Pid, Resp} ->
- Resp
- end
- end;
- _ -> Row
- end.
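-
-% Illustrative example, not in the original source: with include_docs=true, a
-% row whose emitted value is {[{<<"_id">>, <<"linked_doc">>}]} causes the
-% clause above to fetch "linked_doc" (at the emitted "_rev", when present)
-% and embed it as the row's doc, which is CouchDB's linked-documents
-% behavior for views.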
-
-detach_partition(#view_row{key={p, _Partition, Key}} = Row) ->
- Row#view_row{key = Key};
-detach_partition(#view_row{} = Row) ->
- Row.
-
-keydict(undefined) ->
- undefined;
-keydict(Keys) ->
- {Dict,_} = lists:foldl(fun(K, {D,I}) -> {dict:store(K,I,D), I+1} end,
- {dict:new(),0}, Keys),
- Dict.
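-
-% Illustrative example, not in the original source:
-%
-%     keydict([<<"b">>, <<"a">>])
-%     %% => dict mapping <<"b">> => 0 and <<"a">> => 1
-%
-% so merged rows can be ordered to match the order in which the keys were
-% requested rather than their collation order.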
-
-%% internal %%
-
-get_next_row(#collector{rows = []}) ->
- throw(complete);
-get_next_row(#collector{reducer = RedSrc} = St) when RedSrc =/= undefined ->
- #collector{
- query_args = #mrargs{direction = Dir},
- keys = Keys,
- rows = RowDict,
- lang = Lang,
- counters = Counters0,
- collation = Collation
- } = St,
- {Key, RestKeys} = find_next_key(Keys, Dir, Collation, RowDict),
- case dict:find(Key, RowDict) of
- {ok, Records} ->
- NewRowDict = dict:erase(Key, RowDict),
- Counters = lists:foldl(fun(#view_row{worker={Worker,From}}, CntrsAcc) ->
- case From of
- {Pid, _} when is_pid(Pid) ->
- gen_server:reply(From, ok);
- Pid when is_pid(Pid) ->
- rexi:stream_ack(From)
- end,
- fabric_dict:update_counter(Worker, -1, CntrsAcc)
- end, Counters0, Records),
- Wrapped = [[V] || #view_row{value=V} <- Records],
- {ok, [Reduced]} = couch_query_servers:rereduce(Lang, [RedSrc], Wrapped),
- {ok, Finalized} = couch_query_servers:finalize(RedSrc, Reduced),
- NewSt = St#collector{keys=RestKeys, rows=NewRowDict, counters=Counters},
- {#view_row{key=Key, id=reduced, value=Finalized}, NewSt};
- error ->
- get_next_row(St#collector{keys=RestKeys})
- end;
-get_next_row(State) ->
- #collector{rows = [Row|Rest], counters = Counters0} = State,
- {Worker, From} = Row#view_row.worker,
- rexi:stream_ack(From),
- Counters1 = fabric_dict:update_counter(Worker, -1, Counters0),
- {Row, State#collector{rows = Rest, counters=Counters1}}.
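-
-% Illustrative note, not in the original source: every consumed row is acked
-% back to its worker (rexi:stream_ack/1, or a gen_server reply for the older
-% protocol), allowing that worker to stream one more row, while the counter
-% tracks how many of its rows remain buffered at the coordinator.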
-
-%% TODO: rectify nil <-> undefined discrepancies
-find_next_key(nil, Dir, Collation, RowDict) ->
- find_next_key(undefined, Dir, Collation, RowDict);
-find_next_key(undefined, Dir, Collation, RowDict) ->
- CmpFun = fun(A, B) -> compare(Dir, Collation, A, B) end,
- case lists:sort(CmpFun, dict:fetch_keys(RowDict)) of
- [] ->
- throw(complete);
- [Key|_] ->
- {Key, nil}
- end;
-find_next_key([], _, _, _) ->
- throw(complete);
-find_next_key([Key|Rest], _, _, _) ->
- {Key, Rest}.
-
-transform_row(#view_row{value={[{reduce_overflow_error, Msg}]}}) ->
- {row, [{key,null}, {id,error}, {value,reduce_overflow_error}, {reason,Msg}]};
-transform_row(#view_row{key=Key, id=reduced, value=Value}) ->
- {row, [{key,Key}, {value,Value}]};
-transform_row(#view_row{key=Key, id=undefined}) ->
- {row, [{key,Key}, {id,error}, {value,not_found}]};
-transform_row(#view_row{key=Key, id=Id, value=Value, doc=undefined}) ->
- {row, [{id,Id}, {key,Key}, {value,Value}]};
-transform_row(#view_row{key=Key, id=_Id, value=_Value, doc={error,Reason}}) ->
- {row, [{id,error}, {key,Key}, {value,Reason}]};
-transform_row(#view_row{key=Key, id=Id, value=Value, doc=Doc}) ->
- {row, [{id,Id}, {key,Key}, {value,Value}, {doc,Doc}]}.
-
-compare(_, _, A, A) -> true;
-compare(fwd, <<"raw">>, A, B) -> A < B;
-compare(rev, <<"raw">>, A, B) -> B < A;
-compare(fwd, _, A, B) -> couch_ejson_compare:less_json(A, B);
-compare(rev, _, A, B) -> couch_ejson_compare:less_json(B, A).
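-
-% Illustrative example, not in the original source: under the default ICU
-% collation, compare(fwd, undefined, <<"a">>, <<"Z">>) is true because
-% letters compare case-insensitively first, while under <<"raw">> collation
-% plain Erlang term order applies, so <<"Z">> (16#5A) sorts before <<"a">>
-% (16#61).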
-
-extract_view(Pid, ViewName, [], _ViewType) ->
- couch_log:error("missing_named_view ~p", [ViewName]),
- exit(Pid, kill),
- exit(missing_named_view);
-extract_view(Pid, ViewName, [View|Rest], ViewType) ->
- case lists:member(ViewName, view_names(View, ViewType)) of
- true ->
- if ViewType == reduce ->
- {index_of(ViewName, view_names(View, reduce)), View};
- true ->
- View
- end;
- false ->
- extract_view(Pid, ViewName, Rest, ViewType)
- end.
-
-view_names(View, Type) when Type == red_map; Type == reduce ->
- [Name || {Name, _} <- View#mrview.reduce_funs];
-view_names(View, map) ->
- View#mrview.map_names.
-
-index_of(X, List) ->
- index_of(X, List, 1).
-
-index_of(_X, [], _I) ->
- not_found;
-index_of(X, [X|_Rest], I) ->
- I;
-index_of(X, [_|Rest], I) ->
- index_of(X, Rest, I+1).
-
-get_shards(Db, #mrargs{} = Args) ->
- DbPartitioned = fabric_util:is_partitioned(Db),
- Partition = couch_mrview_util:get_extra(Args, partition),
- if DbPartitioned orelse Partition == undefined -> ok; true ->
- throw({bad_request, <<"partition specified on non-partitioned db">>})
- end,
- DbName = fabric:dbname(Db),
- % Decide which version of mem3:shards/1,2 or
- % mem3:ushards/1,2 to use for the current
- % request.
- case {Args#mrargs.stable, Partition} of
- {true, undefined} ->
- {mem3:ushards(DbName), []};
- {true, Partition} ->
- Shards = mem3:ushards(DbName, couch_partition:shard_key(Partition)),
- {Shards, [{any, Shards}]};
- {false, undefined} ->
- {mem3:shards(DbName), []};
- {false, Partition} ->
- Shards = mem3:shards(DbName, couch_partition:shard_key(Partition)),
- {Shards, [{any, Shards}]}
- end.
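-
-% Illustrative note, not in the original source: stable=true routes to
-% mem3:ushards/1,2, which favors a consistent preferred copy of each range,
-% so repeated requests tend to hit the same replicas and see fewer rewinds;
-% stable=false uses mem3:shards/1,2 and lets the fastest copy of each range
-% win the race.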
-
-maybe_update_others(DbName, DDoc, ShardsInvolved, ViewName,
- #mrargs{update=lazy} = Args) ->
- ShardsNeedUpdated = mem3:shards(DbName) -- ShardsInvolved,
- lists:foreach(fun(#shard{node=Node, name=ShardName}) ->
- rpc:cast(Node, fabric_rpc, update_mrview, [ShardName, DDoc, ViewName, Args])
- end, ShardsNeedUpdated);
-maybe_update_others(_DbName, _DDoc, _ShardsInvolved, _ViewName, _Args) ->
- ok.
-
-get_shard_replacements(DbName, UsedShards0) ->
- % We only want to generate a replacements list from shards
- % that aren't already used.
- AllLiveShards = mem3:live_shards(DbName, [node() | nodes()]),
- UsedShards = [S#shard{ref=undefined} || S <- UsedShards0],
- get_shard_replacements_int(AllLiveShards -- UsedShards, UsedShards).
-
-get_shard_replacements_int(UnusedShards, UsedShards) ->
- % If we have more than one copy of a range then we don't
- % want to try and add a replacement to any copy.
- RangeCounts = lists:foldl(fun(#shard{range=R}, Acc) ->
- dict:update_counter(R, 1, Acc)
- end, dict:new(), UsedShards),
-
-    % For each used shard range with a count of 1, find any
-    % possible replacements from the unused shards. The
-    % replacement list is keyed by range.
- lists:foldl(fun(#shard{range = [B, E] = Range}, Acc) ->
- case dict:find(Range, RangeCounts) of
- {ok, 1} ->
- Repls = mem3_util:non_overlapping_shards(UnusedShards, B, E),
- % Only keep non-empty lists of replacements
- if Repls == [] -> Acc; true ->
- [{Range, Repls} | Acc]
- end;
- _ ->
- Acc
- end
- end, [], UsedShards).
-
--spec fix_skip_and_limit(#mrargs{}) -> {CoordArgs::#mrargs{}, WorkerArgs::#mrargs{}}.
-fix_skip_and_limit(#mrargs{} = Args) ->
- {CoordArgs, WorkerArgs} = case couch_mrview_util:get_extra(Args, partition) of
- undefined ->
- #mrargs{skip=Skip, limit=Limit}=Args,
- {Args, Args#mrargs{skip=0, limit=Skip+Limit}};
- _Partition ->
- {Args#mrargs{skip=0}, Args}
- end,
-    %% The coordinator needs to finalize each row itself, so strip the
-    %% finalizer from the worker args to make sure the shards don't.
- {CoordArgs, remove_finalizer(WorkerArgs)}.
-
-remove_finalizer(Args) ->
- couch_mrview_util:set_extra(Args, finalizer, null).
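-
-% Illustrative example, not in the original source: for a global query with
-% skip=5 and limit=10, fix_skip_and_limit/1 yields coordinator args keeping
-% {skip=5, limit=10} and worker args with {skip=0, limit=15}, since any of
-% the first 15 rows of one shard may still be among the 10 rows emitted once
-% the coordinator has done the skipping itself.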
-
-
-remove_overlapping_shards_test() ->
- Cb = undefined,
-
- Shards = mk_cnts([[0, 10], [11, 20], [21, ?RING_END]], 3),
-
- % Simple (exact) overlap
- Shard1 = mk_shard("node-3", [11, 20]),
- Shards1 = fabric_dict:store(Shard1, nil, Shards),
- R1 = remove_overlapping_shards(Shard1, Shards1, Cb),
- ?assertEqual([{0, 10}, {11, 20}, {21, ?RING_END}],
- fabric_util:worker_ranges(R1)),
- ?assert(fabric_dict:is_key(Shard1, R1)),
-
- % Split overlap (shard overlap multiple workers)
- Shard2 = mk_shard("node-3", [0, 20]),
- Shards2 = fabric_dict:store(Shard2, nil, Shards),
- R2 = remove_overlapping_shards(Shard2, Shards2, Cb),
- ?assertEqual([{0, 20}, {21, ?RING_END}],
- fabric_util:worker_ranges(R2)),
- ?assert(fabric_dict:is_key(Shard2, R2)).
-
-
-get_shard_replacements_test() ->
- Unused = [mk_shard(N, [B, E]) || {N, B, E} <- [
- {"n1", 11, 20}, {"n1", 21, ?RING_END},
- {"n2", 0, 4}, {"n2", 5, 10}, {"n2", 11, 20},
-        {"n3", 0, ?RING_END}
- ]],
- Used = [mk_shard(N, [B, E]) || {N, B, E} <- [
- {"n2", 21, ?RING_END},
- {"n3", 0, 10}, {"n3", 11, 20}
- ]],
- Res = lists:sort(get_shard_replacements_int(Unused, Used)),
-    % Notice that the [0, 10] range can be replaced by spawning the
-    % [0, 4] and [5, 10] workers on n2
- Expect = [
- {[0, 10], [mk_shard("n2", [0, 4]), mk_shard("n2", [5, 10])]},
- {[11, 20], [mk_shard("n1", [11, 20]), mk_shard("n2", [11, 20])]},
- {[21, ?RING_END], [mk_shard("n1", [21, ?RING_END])]}
- ],
- ?assertEqual(Expect, Res).
-
-
-mk_cnts(Ranges, NoNodes) ->
- orddict:from_list([{Shard,nil}
- || Shard <-
- lists:flatten(lists:map(
- fun(Range) ->
- mk_shards(NoNodes,Range,[])
- end, Ranges))]
- ).
-
-mk_shards(0,_Range,Shards) ->
- Shards;
-mk_shards(NoNodes,Range,Shards) ->
- Name ="node-" ++ integer_to_list(NoNodes),
- mk_shards(NoNodes-1,Range, [mk_shard(Name, Range) | Shards]).
-
-
-mk_shard(Name, Range) ->
- Node = list_to_atom(Name),
- BName = list_to_binary(Name),
- #shard{name = BName, node = Node, range = Range}.