summaryrefslogtreecommitdiff
path: root/src/couch_views/src/couch_views_reader.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/couch_views/src/couch_views_reader.erl')
-rw-r--r--src/couch_views/src/couch_views_reader.erl346
1 files changed, 346 insertions, 0 deletions
diff --git a/src/couch_views/src/couch_views_reader.erl b/src/couch_views/src/couch_views_reader.erl
new file mode 100644
index 000000000..3c5862749
--- /dev/null
+++ b/src/couch_views/src/couch_views_reader.erl
@@ -0,0 +1,346 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_views_reader).
+
+-export([
+ read/6
+]).
+
+
+-include("couch_views.hrl").
+-include_lib("couch/include/couch_db.hrl").
+-include_lib("couch_mrview/include/couch_mrview.hrl").
+-include_lib("fabric/include/fabric2.hrl").
+
+
+read(Db, Mrst, ViewName, UserCallback, UserAcc, Args) ->
+ ReadFun = case Args of
+ #mrargs{view_type = map} -> fun read_map_view/6;
+ #mrargs{view_type = red} -> fun read_red_view/6
+ end,
+ ReadFun(Db, Mrst, ViewName, UserCallback, UserAcc, Args).
+
+
+read_map_view(Db, Mrst0, ViewName, UserCallback, UserAcc0, Args) ->
+ try
+ fabric2_fdb:transactional(Db, fun(TxDb) ->
+ #mrst{
+ language = Lang,
+ views = Views
+ } = Mrst = couch_views_trees:open(TxDb, Mrst0),
+
+ View = get_map_view(Lang, Args, ViewName, Views),
+ Fun = fun handle_map_row/4,
+
+ Meta = get_map_meta(TxDb, Mrst, View, Args),
+ UserAcc1 = maybe_stop(UserCallback(Meta, UserAcc0)),
+
+ Acc0 = #{
+ db => TxDb,
+ skip => Args#mrargs.skip,
+ limit => Args#mrargs.limit,
+ mrargs => undefined,
+ callback => UserCallback,
+ acc => UserAcc1
+ },
+
+ Acc1 = lists:foldl(fun(KeyArgs, KeyAcc0) ->
+ Opts = mrargs_to_fdb_options(KeyArgs),
+ KeyAcc1 = KeyAcc0#{
+ mrargs := KeyArgs
+ },
+ couch_views_trees:fold_map_idx(TxDb, View, Opts, Fun, KeyAcc1)
+ end, Acc0, expand_keys_args(Args)),
+
+ #{
+ acc := UserAcc2
+ } = Acc1,
+ {ok, maybe_stop(UserCallback(complete, UserAcc2))}
+ end)
+ catch
+ throw:{complete, Out} ->
+ {_, Final} = UserCallback(complete, Out),
+ {ok, Final};
+ throw:{done, Out} ->
+ {ok, Out}
+ end.
+
+
+read_red_view(Db, Mrst0, ViewName, UserCallback, UserAcc0, Args) ->
+ #mrst{
+ language = Lang,
+ views = Views
+ } = Mrst0,
+ {Idx, Lang, View0} = get_red_view(Lang, Args, ViewName, Views),
+ Mrst1 = Mrst0#mrst{views = [View0]},
+ ReadOpts = [{read_only, Idx}],
+ try
+ fabric2_fdb:transactional(Db, fun(TxDb) ->
+ #mrst{
+ language = Lang,
+ views = [View1]
+ } = Mrst = couch_views_trees:open(TxDb, Mrst1, ReadOpts),
+
+ #mrargs{
+ extra = Extra
+ } = Args,
+
+ Fun = fun handle_red_row/3,
+
+ Meta = get_red_meta(TxDb, Mrst, View1, Args),
+ UserAcc1 = maybe_stop(UserCallback(Meta, UserAcc0)),
+
+ Finalizer = case couch_util:get_value(finalizer, Extra) of
+ undefined ->
+ {_, FunSrc} = lists:nth(Idx, View1#mrview.reduce_funs),
+ FunSrc;
+ CustomFun->
+ CustomFun
+ end,
+
+ Acc0 = #{
+ db => TxDb,
+ skip => Args#mrargs.skip,
+ limit => Args#mrargs.limit,
+ mrargs => undefined,
+ finalizer => Finalizer,
+ red_idx => Idx,
+ language => Lang,
+ callback => UserCallback,
+ acc => UserAcc1
+ },
+
+ Acc1 = lists:foldl(fun(KeyArgs, KeyAcc0) ->
+ Opts = mrargs_to_fdb_options(KeyArgs),
+ KeyAcc1 = KeyAcc0#{
+ mrargs := KeyArgs
+ },
+ couch_views_trees:fold_red_idx(
+ TxDb,
+ View1,
+ Idx,
+ Opts,
+ Fun,
+ KeyAcc1
+ )
+ end, Acc0, expand_keys_args(Args)),
+
+ #{
+ acc := UserAcc2
+ } = Acc1,
+ {ok, maybe_stop(UserCallback(complete, UserAcc2))}
+ end)
+ catch
+ throw:{complete, Out} ->
+ {_, Final} = UserCallback(complete, Out),
+ {ok, Final};
+ throw:{done, Out} ->
+ {ok, Out}
+ end.
+
+
+get_map_meta(TxDb, Mrst, View, #mrargs{update_seq = true}) ->
+ TotalRows = couch_views_trees:get_row_count(TxDb, View),
+ ViewSeq = couch_views_fdb:get_update_seq(TxDb, Mrst),
+ {meta, [{update_seq, ViewSeq}, {total, TotalRows}, {offset, null}]};
+
+get_map_meta(TxDb, _Mrst, View, #mrargs{}) ->
+ TotalRows = couch_views_trees:get_row_count(TxDb, View),
+ {meta, [{total, TotalRows}, {offset, null}]}.
+
+
+get_red_meta(TxDb, Mrst, _View, #mrargs{update_seq = true}) ->
+ ViewSeq = couch_views_fdb:get_update_seq(TxDb, Mrst),
+ {meta, [{update_seq, ViewSeq}]};
+
+get_red_meta(_TxDb, _Mrst, _View, #mrargs{}) ->
+ {meta, []}.
+
+
+handle_map_row(_DocId, _Key, _Value, #{skip := Skip} = Acc) when Skip > 0 ->
+ Acc#{skip := Skip - 1};
+
+handle_map_row(_DocID, _Key, _Value, #{limit := 0, acc := UserAcc}) ->
+ throw({complete, UserAcc});
+
+handle_map_row(DocId, Key, Value, Acc) ->
+ #{
+ db := TxDb,
+ limit := Limit,
+ mrargs := Args,
+ callback := UserCallback,
+ acc := UserAcc0
+ } = Acc,
+
+ BaseRow = [
+ {id, DocId},
+ {key, Key},
+ {value, Value}
+ ],
+
+ Row = BaseRow ++ if not Args#mrargs.include_docs -> []; true ->
+ DocOpts0 = Args#mrargs.doc_options,
+ DocOpts1 = DocOpts0 ++ case Args#mrargs.conflicts of
+ true -> [conflicts];
+ _ -> []
+ end,
+
+ {TargetDocId, Rev} = get_doc_id(DocId, Value),
+ DocObj = load_doc(TxDb, TargetDocId, Rev, DocOpts1),
+ [{doc, DocObj}]
+ end,
+
+ UserAcc1 = maybe_stop(UserCallback({row, Row}, UserAcc0)),
+ Acc#{limit := Limit - 1, acc := UserAcc1}.
+
+
+handle_red_row(_Key, _Red, #{skip := Skip} = Acc) when Skip > 0 ->
+ Acc#{skip := Skip - 1};
+
+handle_red_row(_Key, _Value, #{limit := 0, acc := UserAcc}) ->
+ throw({complete, UserAcc});
+
+handle_red_row(Key0, Value0, Acc) ->
+ #{
+ limit := Limit,
+ finalizer := Finalizer,
+ callback := UserCallback,
+ acc := UserAcc0
+ } = Acc,
+
+ Key1 = case Key0 of
+ undefined -> null;
+ _ -> Key0
+ end,
+ Value1 = maybe_finalize(Finalizer, Value0),
+ Row = [{key, Key1}, {value, Value1}],
+
+ UserAcc1 = maybe_stop(UserCallback({row, Row}, UserAcc0)),
+ Acc#{limit := Limit - 1, acc := UserAcc1}.
+
+
+maybe_finalize(null, Red) ->
+ Red;
+maybe_finalize(Finalizer, Red) ->
+ {ok, Finalized} = couch_query_servers:finalize(Finalizer, Red),
+ Finalized.
+
+
+get_map_view(Lang, Args, ViewName, Views) ->
+ case couch_mrview_util:extract_view(Lang, Args, ViewName, Views) of
+ {map, View, _Args} -> View;
+ {red, {_Idx, _Lang, View}, _} -> View
+ end.
+
+
+get_red_view(Lang, Args, ViewName, Views) ->
+ case couch_mrview_util:extract_view(Lang, Args, ViewName, Views) of
+ {red, {Idx, Lang, View}, _} -> {Idx, Lang, View};
+ _ -> throw({not_found, missing_named_view})
+ end.
+
+
+expand_keys_args(#mrargs{keys = undefined} = Args) ->
+ [Args];
+
+expand_keys_args(#mrargs{keys = Keys} = Args) ->
+ lists:map(fun(Key) ->
+ Args#mrargs{
+ start_key = Key,
+ end_key = Key
+ }
+ end, Keys).
+
+
+mrargs_to_fdb_options(Args) ->
+ #mrargs{
+ view_type = ViewType,
+ start_key = StartKey,
+ start_key_docid = StartKeyDocId,
+ end_key = EndKey,
+ end_key_docid = EndKeyDocId0,
+ direction = Direction,
+ inclusive_end = InclusiveEnd,
+ group_level = GroupLevel
+ } = Args,
+
+ StartKeyOpts = if StartKey == undefined -> []; true ->
+ [{start_key, {StartKey, StartKeyDocId}}]
+ end,
+
+ EndKeyDocId = case {Direction, EndKeyDocId0} of
+ {fwd, <<255>>} when InclusiveEnd -> <<255>>;
+ {fwd, <<255>>} when not InclusiveEnd -> <<>>;
+ {rev, <<>>} when InclusiveEnd -> <<>>;
+ {rev, <<>>} when not InclusiveEnd -> <<255>>;
+ _ -> EndKeyDocId0
+ end,
+
+ EndKeyOpts = if EndKey == undefined -> []; true ->
+ [{end_key, {EndKey, EndKeyDocId}}]
+ end,
+
+ GroupFunOpt = make_group_key_fun(ViewType, GroupLevel),
+
+ [
+ {dir, Direction},
+ {inclusive_end, InclusiveEnd}
+ ] ++ StartKeyOpts ++ EndKeyOpts ++ GroupFunOpt.
+
+
+make_group_key_fun(map, _) ->
+ [];
+
+make_group_key_fun(red, exact) ->
+ [
+ {group_key_fun, fun({Key, _DocId}) -> Key end}
+ ];
+
+make_group_key_fun(red, 0) ->
+ [
+ {group_key_fun, group_all}
+ ];
+
+make_group_key_fun(red, N) when is_integer(N), N > 0 ->
+ GKFun = fun
+ ({Key, _DocId}) when is_list(Key) -> lists:sublist(Key, N);
+ ({Key, _DocId}) -> Key
+ end,
+ [{group_key_fun, GKFun}].
+
+
+maybe_stop({ok, Acc}) -> Acc;
+maybe_stop({stop, Acc}) -> throw({done, Acc}).
+
+
+get_doc_id(Id, {Props}) ->
+ DocId = couch_util:get_value(<<"_id">>, Props, Id),
+ Rev = couch_util:get_value(<<"_rev">>, Props, null),
+ {DocId, Rev};
+
+get_doc_id(Id, _Value) ->
+ {Id, null}.
+
+
+load_doc(TxDb, Id, null, DocOpts) ->
+ case fabric2_db:open_doc(TxDb, Id, DocOpts) of
+ {ok, Doc} -> couch_doc:to_json_obj(Doc, DocOpts);
+ {not_found, _} -> null
+ end;
+
+load_doc(TxDb, Id, Rev, DocOpts) ->
+ Rev1 = couch_doc:parse_rev(Rev),
+ case fabric2_db:open_doc_revs(TxDb, Id, [Rev1], DocOpts) of
+ {ok, [{ok, Doc}]} -> couch_doc:to_json_obj(Doc, DocOpts);
+ {ok, [_Else]} -> null
+ end.