diff options
Diffstat (limited to 'src/mango/src/mango_cursor_view.erl')
-rw-r--r-- | src/mango/src/mango_cursor_view.erl | 387 |
1 files changed, 82 insertions, 305 deletions
diff --git a/src/mango/src/mango_cursor_view.erl b/src/mango/src/mango_cursor_view.erl index 1c4b3423e..21c402bda 100644 --- a/src/mango/src/mango_cursor_view.erl +++ b/src/mango/src/mango_cursor_view.erl @@ -19,9 +19,7 @@ ]). -export([ - view_cb/2, handle_message/2, - handle_all_docs_message/2, composite_indexes/2, choose_best_index/2 ]). @@ -44,7 +42,7 @@ create(Db, Indexes, Selector, Opts) -> Limit = couch_util:get_value(limit, Opts, mango_opts:default_limit()), Skip = couch_util:get_value(skip, Opts, 0), Fields = couch_util:get_value(fields, Opts, all_fields), - Bookmark = couch_util:get_value(bookmark, Opts), + Bookmark = couch_util:get_value(bookmark, Opts), {ok, #cursor{ db = Db, @@ -60,24 +58,23 @@ create(Db, Indexes, Selector, Opts) -> explain(Cursor) -> - #cursor{ - opts = Opts - } = Cursor, - - BaseArgs = base_args(Cursor), - Args = apply_opts(Opts, BaseArgs), - - [{mrargs, {[ - {include_docs, Args#mrargs.include_docs}, - {view_type, Args#mrargs.view_type}, - {reduce, Args#mrargs.reduce}, - {partition, couch_mrview_util:get_extra(Args, partition, null)}, - {start_key, maybe_replace_max_json(Args#mrargs.start_key)}, - {end_key, maybe_replace_max_json(Args#mrargs.end_key)}, - {direction, Args#mrargs.direction}, - {stable, Args#mrargs.stable}, - {update, Args#mrargs.update}, - {conflicts, Args#mrargs.conflicts} + #{ + start_key := StartKey, + end_key := EndKey, + dir := Direction + } = index_args(Cursor), + + [{args, {[ + {include_docs, true}, + {view_type, <<"fdb">>}, + {reduce, false}, + {partition, false}, + {start_key, maybe_replace_max_json(StartKey)}, + {end_key, maybe_replace_max_json(EndKey)}, + {direction, Direction}, + {stable, false}, + {update, true}, + {conflicts, false} ]}}]. @@ -99,18 +96,47 @@ maybe_replace_max_json([H | T] = EndKey) when is_list(EndKey) -> maybe_replace_max_json(EndKey) -> EndKey. -base_args(#cursor{index = Idx, selector = Selector} = Cursor) -> - #mrargs{ - view_type = map, - reduce = false, - start_key = mango_idx:start_key(Idx, Cursor#cursor.ranges), - end_key = mango_idx:end_key(Idx, Cursor#cursor.ranges), - include_docs = true, - extra = [{callback, {?MODULE, view_cb}}, {selector, Selector}] - }. +%% TODO: When supported, handle: +%% partitions +%% conflicts +index_args(#cursor{} = Cursor) -> + #cursor{ + index = Idx, + opts = Opts, + bookmark = Bookmark + } = Cursor, + + Args0 = #{ + start_key => mango_idx:start_key(Idx, Cursor#cursor.ranges), + start_key_docid => <<>>, + end_key => mango_idx:end_key(Idx, Cursor#cursor.ranges), + end_key_docid => <<255>>, + skip => 0 + }, + + Sort = couch_util:get_value(sort, Opts, [<<"asc">>]), + Args1 = case mango_sort:directions(Sort) of + [<<"desc">> | _] -> + #{ + start_key := SK, + start_key_docid := SKDI, + end_key := EK, + end_key_docid := EKDI + } = Args0, + Args0#{ + dir => rev, + start_key => EK, + start_key_docid => EKDI, + end_key => SK, + end_key_docid => SKDI + }; + _ -> + Args0#{dir => fwd} + end, + mango_json_bookmark:update_args(Bookmark, Args1). -execute(#cursor{db = Db, index = Idx, execution_stats = Stats} = Cursor0, UserFun, UserAcc) -> +execute(#cursor{db = Db, execution_stats = Stats} = Cursor0, UserFun, UserAcc) -> Cursor = Cursor0#cursor{ user_fun = UserFun, user_acc = UserAcc, @@ -121,32 +147,22 @@ execute(#cursor{db = Db, index = Idx, execution_stats = Stats} = Cursor0, UserFu % empty indicates unsatisfiable ranges, so don't perform search {ok, UserAcc}; _ -> - BaseArgs = base_args(Cursor), - #cursor{opts = Opts, bookmark = Bookmark} = Cursor, - Args0 = apply_opts(Opts, BaseArgs), - Args = mango_json_bookmark:update_args(Bookmark, Args0), - UserCtx = couch_util:get_value(user_ctx, Opts, #user_ctx{}), - DbOpts = [{user_ctx, UserCtx}], - Result = case mango_idx:def(Idx) of - all_docs -> - CB = fun ?MODULE:handle_all_docs_message/2, - fabric:all_docs(Db, DbOpts, CB, Cursor, Args); - _ -> - CB = fun ?MODULE:handle_message/2, - % Normal view - DDoc = ddocid(Idx), - Name = mango_idx:name(Idx), - fabric:query_view(Db, DbOpts, DDoc, Name, CB, Cursor, Args) - end, + Args = index_args(Cursor), + CB = fun ?MODULE:handle_message/2, + Result = mango_fdb:query(Db, CB, Cursor, Args), case Result of {ok, LastCursor} -> NewBookmark = mango_json_bookmark:create(LastCursor), Arg = {add_key, bookmark, NewBookmark}, - {_Go, FinalUserAcc} = UserFun(Arg, LastCursor#cursor.user_acc), - Stats0 = LastCursor#cursor.execution_stats, - FinalUserAcc0 = mango_execution_stats:maybe_add_stats(Opts, UserFun, Stats0, FinalUserAcc), - FinalUserAcc1 = mango_cursor:maybe_add_warning(UserFun, Cursor, FinalUserAcc0), - {ok, FinalUserAcc1}; + #cursor{ + opts = Opts, + execution_stats = Stats0, + user_acc = FinalUserAcc0 + } = LastCursor, + {_Go, FinalUserAcc1} = UserFun(Arg, FinalUserAcc0), + FinalUserAcc2 = mango_execution_stats:maybe_add_stats(Opts, UserFun, Stats0, FinalUserAcc1), + FinalUserAcc3 = mango_cursor:maybe_add_warning(UserFun, Cursor, FinalUserAcc2), + {ok, FinalUserAcc3}; {error, Reason} -> {error, Reason} end @@ -217,80 +233,16 @@ choose_best_index(_DbName, IndexRanges) -> {SelectedIndex, SelectedIndexRanges}. -view_cb({meta, Meta}, Acc) -> - % Map function starting - put(mango_docs_examined, 0), - set_mango_msg_timestamp(), - ok = rexi:stream2({meta, Meta}), - {ok, Acc}; -view_cb({row, Row}, #mrargs{extra = Options} = Acc) -> - ViewRow = #view_row{ - id = couch_util:get_value(id, Row), - key = couch_util:get_value(key, Row), - doc = couch_util:get_value(doc, Row) - }, - case ViewRow#view_row.doc of - null -> - put(mango_docs_examined, get(mango_docs_examined) + 1), - maybe_send_mango_ping(); - undefined -> - ViewRow2 = ViewRow#view_row{ - value = couch_util:get_value(value, Row) - }, - ok = rexi:stream2(ViewRow2), - put(mango_docs_examined, 0), - set_mango_msg_timestamp(); - Doc -> - Selector = couch_util:get_value(selector, Options), - case mango_selector:match(Selector, Doc) of - true -> - ViewRow2 = ViewRow#view_row{ - value = get(mango_docs_examined) + 1 - }, - ok = rexi:stream2(ViewRow2), - put(mango_docs_examined, 0), - set_mango_msg_timestamp(); - false -> - put(mango_docs_examined, get(mango_docs_examined) + 1), - maybe_send_mango_ping() - end - end, - {ok, Acc}; -view_cb(complete, Acc) -> - % Finish view output - ok = rexi:stream_last(complete), - {ok, Acc}; -view_cb(ok, ddoc_updated) -> - rexi:reply({ok, ddoc_updated}). - - -maybe_send_mango_ping() -> - Current = os:timestamp(), - LastPing = get(mango_last_msg_timestamp), - % Fabric will timeout if it has not heard a response from a worker node - % after 5 seconds. Send a ping every 4 seconds so the timeout doesn't happen. - case timer:now_diff(Current, LastPing) > ?HEARTBEAT_INTERVAL_IN_USEC of - false -> - ok; - true -> - rexi:ping(), - set_mango_msg_timestamp() - end. - - -set_mango_msg_timestamp() -> - put(mango_last_msg_timestamp, os:timestamp()). - - handle_message({meta, _}, Cursor) -> {ok, Cursor}; -handle_message({row, Props}, Cursor) -> - case doc_member(Cursor, Props) of +handle_message({doc, Key, Doc}, Cursor) -> + case match_doc(Cursor, Doc) of {ok, Doc, {execution_stats, ExecutionStats1}} -> Cursor1 = Cursor#cursor { execution_stats = ExecutionStats1 }, - Cursor2 = update_bookmark_keys(Cursor1, Props), + {Props} = Doc, + Cursor2 = update_bookmark_keys(Cursor1, {Key, Props}), FinalDoc = mango_fields:extract(Doc, Cursor2#cursor.fields), handle_doc(Cursor2, FinalDoc); {no_match, _, {execution_stats, ExecutionStats1}} -> @@ -308,15 +260,6 @@ handle_message({error, Reason}, _Cursor) -> {error, Reason}. -handle_all_docs_message({row, Props}, Cursor) -> - case is_design_doc(Props) of - true -> {ok, Cursor}; - false -> handle_message({row, Props}, Cursor) - end; -handle_all_docs_message(Message, Cursor) -> - handle_message(Message, Cursor). - - handle_doc(#cursor{skip = S} = C, _) when S > 0 -> {ok, C#cursor{skip = S - 1}}; handle_doc(#cursor{limit = L, execution_stats = Stats} = C, Doc) when L > 0 -> @@ -332,191 +275,25 @@ handle_doc(C, _Doc) -> {stop, C}. -ddocid(Idx) -> - case mango_idx:ddoc(Idx) of - <<"_design/", Rest/binary>> -> - Rest; - Else -> - Else - end. - - -apply_opts([], Args) -> - Args; -apply_opts([{r, RStr} | Rest], Args) -> - IncludeDocs = case list_to_integer(RStr) of - 1 -> - true; - R when R > 1 -> - % We don't load the doc in the view query because - % we have to do a quorum read in the coordinator - % so there's no point. - false - end, - NewArgs = Args#mrargs{include_docs = IncludeDocs}, - apply_opts(Rest, NewArgs); -apply_opts([{conflicts, true} | Rest], Args) -> - NewArgs = Args#mrargs{conflicts = true}, - apply_opts(Rest, NewArgs); -apply_opts([{conflicts, false} | Rest], Args) -> - % Ignored cause default - apply_opts(Rest, Args); -apply_opts([{sort, Sort} | Rest], Args) -> - % We only support single direction sorts - % so nothing fancy here. - case mango_sort:directions(Sort) of - [] -> - apply_opts(Rest, Args); - [<<"asc">> | _] -> - apply_opts(Rest, Args); - [<<"desc">> | _] -> - SK = Args#mrargs.start_key, - SKDI = Args#mrargs.start_key_docid, - EK = Args#mrargs.end_key, - EKDI = Args#mrargs.end_key_docid, - NewArgs = Args#mrargs{ - direction = rev, - start_key = EK, - start_key_docid = EKDI, - end_key = SK, - end_key_docid = SKDI - }, - apply_opts(Rest, NewArgs) - end; -apply_opts([{stale, ok} | Rest], Args) -> - NewArgs = Args#mrargs{ - stable = true, - update = false - }, - apply_opts(Rest, NewArgs); -apply_opts([{stable, true} | Rest], Args) -> - NewArgs = Args#mrargs{ - stable = true - }, - apply_opts(Rest, NewArgs); -apply_opts([{update, false} | Rest], Args) -> - NewArgs = Args#mrargs{ - update = false - }, - apply_opts(Rest, NewArgs); -apply_opts([{partition, <<>>} | Rest], Args) -> - apply_opts(Rest, Args); -apply_opts([{partition, Partition} | Rest], Args) when is_binary(Partition) -> - NewArgs = couch_mrview_util:set_extra(Args, partition, Partition), - apply_opts(Rest, NewArgs); -apply_opts([{_, _} | Rest], Args) -> - % Ignore unknown options - apply_opts(Rest, Args). - - -doc_member(Cursor, RowProps) -> - Db = Cursor#cursor.db, - Opts = Cursor#cursor.opts, - ExecutionStats = Cursor#cursor.execution_stats, +match_doc(Cursor, Doc) -> + ExecStats = Cursor#cursor.execution_stats, Selector = Cursor#cursor.selector, - {Matched, Incr} = case couch_util:get_value(value, RowProps) of - N when is_integer(N) -> {true, N}; - _ -> {false, 1} - end, - case couch_util:get_value(doc, RowProps) of - {DocProps} -> - ExecutionStats1 = mango_execution_stats:incr_docs_examined(ExecutionStats, Incr), - case Matched of - true -> - {ok, {DocProps}, {execution_stats, ExecutionStats1}}; - false -> - match_doc(Selector, {DocProps}, ExecutionStats1) - end; - undefined -> - ExecutionStats1 = mango_execution_stats:incr_quorum_docs_examined(ExecutionStats), - Id = couch_util:get_value(id, RowProps), - case mango_util:defer(fabric, open_doc, [Db, Id, Opts]) of - {ok, #doc{}=DocProps} -> - Doc = couch_doc:to_json_obj(DocProps, []), - match_doc(Selector, Doc, ExecutionStats1); - Else -> - Else - end; - null -> - ExecutionStats1 = mango_execution_stats:incr_docs_examined(ExecutionStats), - {no_match, null, {execution_stats, ExecutionStats1}} - end. - + ExecStats1 = mango_execution_stats:incr_docs_examined(ExecStats, 1), -match_doc(Selector, Doc, ExecutionStats) -> case mango_selector:match(Selector, Doc) of true -> - {ok, Doc, {execution_stats, ExecutionStats}}; + {ok, Doc, {execution_stats, ExecStats1}}; false -> - {no_match, Doc, {execution_stats, ExecutionStats}} - end. - - -is_design_doc(RowProps) -> - case couch_util:get_value(id, RowProps) of - <<"_design/", _/binary>> -> true; - _ -> false + {no_match, Doc, {execution_stats, ExecStats1}} end. -update_bookmark_keys(#cursor{limit = Limit} = Cursor, Props) when Limit > 0 -> - Id = couch_util:get_value(id, Props), - Key = couch_util:get_value(key, Props), - Cursor#cursor { +update_bookmark_keys(#cursor{limit = Limit} = Cursor, {Key, Props}) + when Limit > 0 -> + Id = couch_util:get_value(<<"_id">>, Props), + Cursor#cursor { bookmark_docid = Id, bookmark_key = Key }; update_bookmark_keys(Cursor, _Props) -> Cursor. - - -%%%%%%%% module tests below %%%%%%%% - --ifdef(TEST). --include_lib("eunit/include/eunit.hrl"). - -runs_match_on_doc_with_no_value_test() -> - Cursor = #cursor { - db = <<"db">>, - opts = [], - execution_stats = #execution_stats{}, - selector = mango_selector:normalize({[{<<"user_id">>, <<"1234">>}]}) - }, - RowProps = [ - {id,<<"b06aadcf-cd0f-4ca6-9f7e-2c993e48d4c4">>}, - {key,<<"b06aadcf-cd0f-4ca6-9f7e-2c993e48d4c4">>}, - {doc,{ - [ - {<<"_id">>,<<"b06aadcf-cd0f-4ca6-9f7e-2c993e48d4c4">>}, - {<<"_rev">>,<<"1-a954fe2308f14307756067b0e18c2968">>}, - {<<"user_id">>,11} - ] - }} - ], - {Match, _, _} = doc_member(Cursor, RowProps), - ?assertEqual(Match, no_match). - -does_not_run_match_on_doc_with_value_test() -> - Cursor = #cursor { - db = <<"db">>, - opts = [], - execution_stats = #execution_stats{}, - selector = mango_selector:normalize({[{<<"user_id">>, <<"1234">>}]}) - }, - RowProps = [ - {id,<<"b06aadcf-cd0f-4ca6-9f7e-2c993e48d4c4">>}, - {key,<<"b06aadcf-cd0f-4ca6-9f7e-2c993e48d4c4">>}, - {value,1}, - {doc,{ - [ - {<<"_id">>,<<"b06aadcf-cd0f-4ca6-9f7e-2c993e48d4c4">>}, - {<<"_rev">>,<<"1-a954fe2308f14307756067b0e18c2968">>}, - {<<"user_id">>,11} - ] - }} - ], - {Match, _, _} = doc_member(Cursor, RowProps), - ?assertEqual(Match, ok). - - --endif. |