summaryrefslogtreecommitdiff
path: root/src/mango/src/mango_cursor_view.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/mango/src/mango_cursor_view.erl')
-rw-r--r--src/mango/src/mango_cursor_view.erl387
1 files changed, 82 insertions, 305 deletions
diff --git a/src/mango/src/mango_cursor_view.erl b/src/mango/src/mango_cursor_view.erl
index 1c4b3423e..21c402bda 100644
--- a/src/mango/src/mango_cursor_view.erl
+++ b/src/mango/src/mango_cursor_view.erl
@@ -19,9 +19,7 @@
]).
-export([
- view_cb/2,
handle_message/2,
- handle_all_docs_message/2,
composite_indexes/2,
choose_best_index/2
]).
@@ -44,7 +42,7 @@ create(Db, Indexes, Selector, Opts) ->
Limit = couch_util:get_value(limit, Opts, mango_opts:default_limit()),
Skip = couch_util:get_value(skip, Opts, 0),
Fields = couch_util:get_value(fields, Opts, all_fields),
- Bookmark = couch_util:get_value(bookmark, Opts),
+ Bookmark = couch_util:get_value(bookmark, Opts),
{ok, #cursor{
db = Db,
@@ -60,24 +58,23 @@ create(Db, Indexes, Selector, Opts) ->
explain(Cursor) ->
- #cursor{
- opts = Opts
- } = Cursor,
-
- BaseArgs = base_args(Cursor),
- Args = apply_opts(Opts, BaseArgs),
-
- [{mrargs, {[
- {include_docs, Args#mrargs.include_docs},
- {view_type, Args#mrargs.view_type},
- {reduce, Args#mrargs.reduce},
- {partition, couch_mrview_util:get_extra(Args, partition, null)},
- {start_key, maybe_replace_max_json(Args#mrargs.start_key)},
- {end_key, maybe_replace_max_json(Args#mrargs.end_key)},
- {direction, Args#mrargs.direction},
- {stable, Args#mrargs.stable},
- {update, Args#mrargs.update},
- {conflicts, Args#mrargs.conflicts}
+ #{
+ start_key := StartKey,
+ end_key := EndKey,
+ dir := Direction
+ } = index_args(Cursor),
+
+ [{args, {[
+ {include_docs, true},
+ {view_type, <<"fdb">>},
+ {reduce, false},
+ {partition, false},
+ {start_key, maybe_replace_max_json(StartKey)},
+ {end_key, maybe_replace_max_json(EndKey)},
+ {direction, Direction},
+ {stable, false},
+ {update, true},
+ {conflicts, false}
]}}].
@@ -99,18 +96,47 @@ maybe_replace_max_json([H | T] = EndKey) when is_list(EndKey) ->
maybe_replace_max_json(EndKey) ->
EndKey.
-base_args(#cursor{index = Idx, selector = Selector} = Cursor) ->
- #mrargs{
- view_type = map,
- reduce = false,
- start_key = mango_idx:start_key(Idx, Cursor#cursor.ranges),
- end_key = mango_idx:end_key(Idx, Cursor#cursor.ranges),
- include_docs = true,
- extra = [{callback, {?MODULE, view_cb}}, {selector, Selector}]
- }.
+%% TODO: When supported, handle:
+%% partitions
+%% conflicts
+index_args(#cursor{} = Cursor) ->
+ #cursor{
+ index = Idx,
+ opts = Opts,
+ bookmark = Bookmark
+ } = Cursor,
+
+ Args0 = #{
+ start_key => mango_idx:start_key(Idx, Cursor#cursor.ranges),
+ start_key_docid => <<>>,
+ end_key => mango_idx:end_key(Idx, Cursor#cursor.ranges),
+ end_key_docid => <<255>>,
+ skip => 0
+ },
+
+ Sort = couch_util:get_value(sort, Opts, [<<"asc">>]),
+ Args1 = case mango_sort:directions(Sort) of
+ [<<"desc">> | _] ->
+ #{
+ start_key := SK,
+ start_key_docid := SKDI,
+ end_key := EK,
+ end_key_docid := EKDI
+ } = Args0,
+ Args0#{
+ dir => rev,
+ start_key => EK,
+ start_key_docid => EKDI,
+ end_key => SK,
+ end_key_docid => SKDI
+ };
+ _ ->
+ Args0#{dir => fwd}
+ end,
+ mango_json_bookmark:update_args(Bookmark, Args1).
-execute(#cursor{db = Db, index = Idx, execution_stats = Stats} = Cursor0, UserFun, UserAcc) ->
+execute(#cursor{db = Db, execution_stats = Stats} = Cursor0, UserFun, UserAcc) ->
Cursor = Cursor0#cursor{
user_fun = UserFun,
user_acc = UserAcc,
@@ -121,32 +147,22 @@ execute(#cursor{db = Db, index = Idx, execution_stats = Stats} = Cursor0, UserFu
% empty indicates unsatisfiable ranges, so don't perform search
{ok, UserAcc};
_ ->
- BaseArgs = base_args(Cursor),
- #cursor{opts = Opts, bookmark = Bookmark} = Cursor,
- Args0 = apply_opts(Opts, BaseArgs),
- Args = mango_json_bookmark:update_args(Bookmark, Args0),
- UserCtx = couch_util:get_value(user_ctx, Opts, #user_ctx{}),
- DbOpts = [{user_ctx, UserCtx}],
- Result = case mango_idx:def(Idx) of
- all_docs ->
- CB = fun ?MODULE:handle_all_docs_message/2,
- fabric:all_docs(Db, DbOpts, CB, Cursor, Args);
- _ ->
- CB = fun ?MODULE:handle_message/2,
- % Normal view
- DDoc = ddocid(Idx),
- Name = mango_idx:name(Idx),
- fabric:query_view(Db, DbOpts, DDoc, Name, CB, Cursor, Args)
- end,
+ Args = index_args(Cursor),
+ CB = fun ?MODULE:handle_message/2,
+ Result = mango_fdb:query(Db, CB, Cursor, Args),
case Result of
{ok, LastCursor} ->
NewBookmark = mango_json_bookmark:create(LastCursor),
Arg = {add_key, bookmark, NewBookmark},
- {_Go, FinalUserAcc} = UserFun(Arg, LastCursor#cursor.user_acc),
- Stats0 = LastCursor#cursor.execution_stats,
- FinalUserAcc0 = mango_execution_stats:maybe_add_stats(Opts, UserFun, Stats0, FinalUserAcc),
- FinalUserAcc1 = mango_cursor:maybe_add_warning(UserFun, Cursor, FinalUserAcc0),
- {ok, FinalUserAcc1};
+ #cursor{
+ opts = Opts,
+ execution_stats = Stats0,
+ user_acc = FinalUserAcc0
+ } = LastCursor,
+ {_Go, FinalUserAcc1} = UserFun(Arg, FinalUserAcc0),
+ FinalUserAcc2 = mango_execution_stats:maybe_add_stats(Opts, UserFun, Stats0, FinalUserAcc1),
+ FinalUserAcc3 = mango_cursor:maybe_add_warning(UserFun, Cursor, FinalUserAcc2),
+ {ok, FinalUserAcc3};
{error, Reason} ->
{error, Reason}
end
@@ -217,80 +233,16 @@ choose_best_index(_DbName, IndexRanges) ->
{SelectedIndex, SelectedIndexRanges}.
-view_cb({meta, Meta}, Acc) ->
- % Map function starting
- put(mango_docs_examined, 0),
- set_mango_msg_timestamp(),
- ok = rexi:stream2({meta, Meta}),
- {ok, Acc};
-view_cb({row, Row}, #mrargs{extra = Options} = Acc) ->
- ViewRow = #view_row{
- id = couch_util:get_value(id, Row),
- key = couch_util:get_value(key, Row),
- doc = couch_util:get_value(doc, Row)
- },
- case ViewRow#view_row.doc of
- null ->
- put(mango_docs_examined, get(mango_docs_examined) + 1),
- maybe_send_mango_ping();
- undefined ->
- ViewRow2 = ViewRow#view_row{
- value = couch_util:get_value(value, Row)
- },
- ok = rexi:stream2(ViewRow2),
- put(mango_docs_examined, 0),
- set_mango_msg_timestamp();
- Doc ->
- Selector = couch_util:get_value(selector, Options),
- case mango_selector:match(Selector, Doc) of
- true ->
- ViewRow2 = ViewRow#view_row{
- value = get(mango_docs_examined) + 1
- },
- ok = rexi:stream2(ViewRow2),
- put(mango_docs_examined, 0),
- set_mango_msg_timestamp();
- false ->
- put(mango_docs_examined, get(mango_docs_examined) + 1),
- maybe_send_mango_ping()
- end
- end,
- {ok, Acc};
-view_cb(complete, Acc) ->
- % Finish view output
- ok = rexi:stream_last(complete),
- {ok, Acc};
-view_cb(ok, ddoc_updated) ->
- rexi:reply({ok, ddoc_updated}).
-
-
-maybe_send_mango_ping() ->
- Current = os:timestamp(),
- LastPing = get(mango_last_msg_timestamp),
- % Fabric will timeout if it has not heard a response from a worker node
- % after 5 seconds. Send a ping every 4 seconds so the timeout doesn't happen.
- case timer:now_diff(Current, LastPing) > ?HEARTBEAT_INTERVAL_IN_USEC of
- false ->
- ok;
- true ->
- rexi:ping(),
- set_mango_msg_timestamp()
- end.
-
-
-set_mango_msg_timestamp() ->
- put(mango_last_msg_timestamp, os:timestamp()).
-
-
handle_message({meta, _}, Cursor) ->
{ok, Cursor};
-handle_message({row, Props}, Cursor) ->
- case doc_member(Cursor, Props) of
+handle_message({doc, Key, Doc}, Cursor) ->
+ case match_doc(Cursor, Doc) of
{ok, Doc, {execution_stats, ExecutionStats1}} ->
Cursor1 = Cursor#cursor {
execution_stats = ExecutionStats1
},
- Cursor2 = update_bookmark_keys(Cursor1, Props),
+ {Props} = Doc,
+ Cursor2 = update_bookmark_keys(Cursor1, {Key, Props}),
FinalDoc = mango_fields:extract(Doc, Cursor2#cursor.fields),
handle_doc(Cursor2, FinalDoc);
{no_match, _, {execution_stats, ExecutionStats1}} ->
@@ -308,15 +260,6 @@ handle_message({error, Reason}, _Cursor) ->
{error, Reason}.
-handle_all_docs_message({row, Props}, Cursor) ->
- case is_design_doc(Props) of
- true -> {ok, Cursor};
- false -> handle_message({row, Props}, Cursor)
- end;
-handle_all_docs_message(Message, Cursor) ->
- handle_message(Message, Cursor).
-
-
handle_doc(#cursor{skip = S} = C, _) when S > 0 ->
{ok, C#cursor{skip = S - 1}};
handle_doc(#cursor{limit = L, execution_stats = Stats} = C, Doc) when L > 0 ->
@@ -332,191 +275,25 @@ handle_doc(C, _Doc) ->
{stop, C}.
-ddocid(Idx) ->
- case mango_idx:ddoc(Idx) of
- <<"_design/", Rest/binary>> ->
- Rest;
- Else ->
- Else
- end.
-
-
-apply_opts([], Args) ->
- Args;
-apply_opts([{r, RStr} | Rest], Args) ->
- IncludeDocs = case list_to_integer(RStr) of
- 1 ->
- true;
- R when R > 1 ->
- % We don't load the doc in the view query because
- % we have to do a quorum read in the coordinator
- % so there's no point.
- false
- end,
- NewArgs = Args#mrargs{include_docs = IncludeDocs},
- apply_opts(Rest, NewArgs);
-apply_opts([{conflicts, true} | Rest], Args) ->
- NewArgs = Args#mrargs{conflicts = true},
- apply_opts(Rest, NewArgs);
-apply_opts([{conflicts, false} | Rest], Args) ->
- % Ignored cause default
- apply_opts(Rest, Args);
-apply_opts([{sort, Sort} | Rest], Args) ->
- % We only support single direction sorts
- % so nothing fancy here.
- case mango_sort:directions(Sort) of
- [] ->
- apply_opts(Rest, Args);
- [<<"asc">> | _] ->
- apply_opts(Rest, Args);
- [<<"desc">> | _] ->
- SK = Args#mrargs.start_key,
- SKDI = Args#mrargs.start_key_docid,
- EK = Args#mrargs.end_key,
- EKDI = Args#mrargs.end_key_docid,
- NewArgs = Args#mrargs{
- direction = rev,
- start_key = EK,
- start_key_docid = EKDI,
- end_key = SK,
- end_key_docid = SKDI
- },
- apply_opts(Rest, NewArgs)
- end;
-apply_opts([{stale, ok} | Rest], Args) ->
- NewArgs = Args#mrargs{
- stable = true,
- update = false
- },
- apply_opts(Rest, NewArgs);
-apply_opts([{stable, true} | Rest], Args) ->
- NewArgs = Args#mrargs{
- stable = true
- },
- apply_opts(Rest, NewArgs);
-apply_opts([{update, false} | Rest], Args) ->
- NewArgs = Args#mrargs{
- update = false
- },
- apply_opts(Rest, NewArgs);
-apply_opts([{partition, <<>>} | Rest], Args) ->
- apply_opts(Rest, Args);
-apply_opts([{partition, Partition} | Rest], Args) when is_binary(Partition) ->
- NewArgs = couch_mrview_util:set_extra(Args, partition, Partition),
- apply_opts(Rest, NewArgs);
-apply_opts([{_, _} | Rest], Args) ->
- % Ignore unknown options
- apply_opts(Rest, Args).
-
-
-doc_member(Cursor, RowProps) ->
- Db = Cursor#cursor.db,
- Opts = Cursor#cursor.opts,
- ExecutionStats = Cursor#cursor.execution_stats,
+match_doc(Cursor, Doc) ->
+ ExecStats = Cursor#cursor.execution_stats,
Selector = Cursor#cursor.selector,
- {Matched, Incr} = case couch_util:get_value(value, RowProps) of
- N when is_integer(N) -> {true, N};
- _ -> {false, 1}
- end,
- case couch_util:get_value(doc, RowProps) of
- {DocProps} ->
- ExecutionStats1 = mango_execution_stats:incr_docs_examined(ExecutionStats, Incr),
- case Matched of
- true ->
- {ok, {DocProps}, {execution_stats, ExecutionStats1}};
- false ->
- match_doc(Selector, {DocProps}, ExecutionStats1)
- end;
- undefined ->
- ExecutionStats1 = mango_execution_stats:incr_quorum_docs_examined(ExecutionStats),
- Id = couch_util:get_value(id, RowProps),
- case mango_util:defer(fabric, open_doc, [Db, Id, Opts]) of
- {ok, #doc{}=DocProps} ->
- Doc = couch_doc:to_json_obj(DocProps, []),
- match_doc(Selector, Doc, ExecutionStats1);
- Else ->
- Else
- end;
- null ->
- ExecutionStats1 = mango_execution_stats:incr_docs_examined(ExecutionStats),
- {no_match, null, {execution_stats, ExecutionStats1}}
- end.
-
+ ExecStats1 = mango_execution_stats:incr_docs_examined(ExecStats, 1),
-match_doc(Selector, Doc, ExecutionStats) ->
case mango_selector:match(Selector, Doc) of
true ->
- {ok, Doc, {execution_stats, ExecutionStats}};
+ {ok, Doc, {execution_stats, ExecStats1}};
false ->
- {no_match, Doc, {execution_stats, ExecutionStats}}
- end.
-
-
-is_design_doc(RowProps) ->
- case couch_util:get_value(id, RowProps) of
- <<"_design/", _/binary>> -> true;
- _ -> false
+ {no_match, Doc, {execution_stats, ExecStats1}}
end.
-update_bookmark_keys(#cursor{limit = Limit} = Cursor, Props) when Limit > 0 ->
- Id = couch_util:get_value(id, Props),
- Key = couch_util:get_value(key, Props),
- Cursor#cursor {
+update_bookmark_keys(#cursor{limit = Limit} = Cursor, {Key, Props})
+ when Limit > 0 ->
+ Id = couch_util:get_value(<<"_id">>, Props),
+ Cursor#cursor {
bookmark_docid = Id,
bookmark_key = Key
};
update_bookmark_keys(Cursor, _Props) ->
Cursor.
-
-
-%%%%%%%% module tests below %%%%%%%%
-
--ifdef(TEST).
--include_lib("eunit/include/eunit.hrl").
-
-runs_match_on_doc_with_no_value_test() ->
- Cursor = #cursor {
- db = <<"db">>,
- opts = [],
- execution_stats = #execution_stats{},
- selector = mango_selector:normalize({[{<<"user_id">>, <<"1234">>}]})
- },
- RowProps = [
- {id,<<"b06aadcf-cd0f-4ca6-9f7e-2c993e48d4c4">>},
- {key,<<"b06aadcf-cd0f-4ca6-9f7e-2c993e48d4c4">>},
- {doc,{
- [
- {<<"_id">>,<<"b06aadcf-cd0f-4ca6-9f7e-2c993e48d4c4">>},
- {<<"_rev">>,<<"1-a954fe2308f14307756067b0e18c2968">>},
- {<<"user_id">>,11}
- ]
- }}
- ],
- {Match, _, _} = doc_member(Cursor, RowProps),
- ?assertEqual(Match, no_match).
-
-does_not_run_match_on_doc_with_value_test() ->
- Cursor = #cursor {
- db = <<"db">>,
- opts = [],
- execution_stats = #execution_stats{},
- selector = mango_selector:normalize({[{<<"user_id">>, <<"1234">>}]})
- },
- RowProps = [
- {id,<<"b06aadcf-cd0f-4ca6-9f7e-2c993e48d4c4">>},
- {key,<<"b06aadcf-cd0f-4ca6-9f7e-2c993e48d4c4">>},
- {value,1},
- {doc,{
- [
- {<<"_id">>,<<"b06aadcf-cd0f-4ca6-9f7e-2c993e48d4c4">>},
- {<<"_rev">>,<<"1-a954fe2308f14307756067b0e18c2968">>},
- {<<"user_id">>,11}
- ]
- }}
- ],
- {Match, _, _} = doc_member(Cursor, RowProps),
- ?assertEqual(Match, ok).
-
-
--endif.