summaryrefslogtreecommitdiff
path: root/src/mango
diff options
context:
space:
mode:
authorMike Rhodes <mike.rhodes@dx13.co.uk>2023-01-06 11:56:46 +0000
committerJan Lehnardt <jan@apache.org>2023-01-20 18:04:31 +0100
commit00e24b0dfb3435dfb531a755a9a46442936cbaea (patch)
tree30b8defb68d6897e3f465e333ce3358d5fbeea4e /src/mango
parent5f2d5b135b5ebf3a3e19e2200afdacf9fe0cb1fa (diff)
downloadcouchdb-00e24b0dfb3435dfb531a755a9a46442936cbaea.tar.gz
Push down field projection in mango to shard
This commit aims to improve Mango by reducing the data transferred to the coordinator during query execution. It may reduce memory or CPU use at the coordinator but that isn't the primary goal. Currently, when documents are read at the shard level, they are compared locally at the shard with the selector to ensure they match before they are sent to the coordinator. This ensures we're not sending documents across the network that the coordinator immediately discards, saving bandwidth and coordinator processing. This commit further executes field projection (`fields` in the query) at the shard level. This should further save bandwidth, particularly for queries that project few fields from large documents. One item of complexity is that a query may request a quorum read of documents, meaning that we need to do the document read at the coordinator and not the shard, then perform the `selector` and `fields` processing there rather than at the shard. To ensure that documents are processed consistently whether at the shard or coordinator, match_and_extract_doc/3 is added. There is still one orphan call outside match_and_extract_doc/3 to extract/2 which supports cluster upgrade and should later be removed. Shard level processing is already performed in a callback, view_cb/2, that's passed to fabric's view processing to run for each row in the view result set. It's used for the shard local selector and fields processing. To make it clear what arguments are destined for this callback, the commit encapsulates the arguments, using viewcbargs_new/2 and viewcbargs_get/2. As we push down more functionality to the shard, the context this function needs to carry with it will increase, so having a dedicated map for it will be valuable. Supporting cluster upgrades: The commit supports shard pushdown for Mango `fields` processing for situations during rolling cluster upgrades. 
In the state where a non-upgraded coordinator is speaking to an upgraded node, the view_cb/2 needs to support being passed just the `selector` outside of the new viewcbargs map. In this case, the shard will not process fields, but the coordinator will. In the situation where the coordinator is upgraded but the shard is not, we need to send the selector to the shard via `selector` and also execute the fields projection at the coordinator. Therefore we pass arguments to view_cb/2 via both `selector` and `callback_args` and have an apparently spurious field projection (mango_fields:extract/2) in the code that receives back values from the shard (factored out into doc_member_and_extract). Both of these affordances should only need to exist through one minor version change and be removed thereafter -- if people are jumping several minor versions of CouchDB in one go, hopefully they are prepared for a bit of trouble. Testing upgrade states: As view_cb is completely separate from the rest of the cursor code, we can first try out the branch's code using view_cb from `main`, and then the other way -- the branch's view_cb with the rest of the file from main. I did both of these tests successfully.
Diffstat (limited to 'src/mango')
-rw-r--r--src/mango/src/mango_cursor_view.erl169
1 files changed, 146 insertions, 23 deletions
diff --git a/src/mango/src/mango_cursor_view.erl b/src/mango/src/mango_cursor_view.erl
index 9202ce071..47195341c 100644
--- a/src/mango/src/mango_cursor_view.erl
+++ b/src/mango/src/mango_cursor_view.erl
@@ -35,6 +35,19 @@
-define(HEARTBEAT_INTERVAL_IN_USEC, 4000000).
+% viewcbargs bundles the arguments that view_cb uses into one entry
+% of the mrargs.extra list. It is a map so that keys can be added later
+% without old messages causing errors/crashes.
+viewcbargs_new(Selector, Fields) ->
+    maps:from_list([
+        {selector, Selector},
+        {fields, Fields}
+    ]).
+% Read `selector` or `fields` from a viewcbargs map; a missing key yields undefined.
+% Any other key, or an Args that is not a map, fails with function_clause.
+viewcbargs_get(Key, Args) when is_map(Args), (Key =:= selector orelse Key =:= fields) ->
+    maps:get(Key, Args, undefined).
+
create(Db, Indexes, Selector, Opts) ->
FieldRanges = mango_idx_view:field_ranges(Selector),
Composited = composite_indexes(Indexes, FieldRanges),
@@ -100,7 +113,7 @@ maybe_replace_max_json([H | T] = EndKey) when is_list(EndKey) ->
maybe_replace_max_json(EndKey) ->
EndKey.
-base_args(#cursor{index = Idx, selector = Selector} = Cursor) ->
+base_args(#cursor{index = Idx, selector = Selector, fields = Fields} = Cursor) ->
{StartKey, EndKey} =
case Cursor#cursor.ranges of
[empty] ->
@@ -118,8 +131,13 @@ base_args(#cursor{index = Idx, selector = Selector} = Cursor) ->
end_key = EndKey,
include_docs = true,
extra = [
+ % view_cb pushes down post hoc matching and field extraction to
+ % the shard.
{callback, {?MODULE, view_cb}},
+ % TODO remove selector. It supports older nodes during version upgrades.
{selector, Selector},
+ {callback_args, viewcbargs_new(Selector, Fields)},
+
{ignore_partition_query_limit, true}
]
}.
@@ -248,6 +266,19 @@ view_cb({row, Row}, #mrargs{extra = Options} = Acc) ->
key = couch_util:get_value(key, Row),
doc = couch_util:get_value(doc, Row)
},
+ % This supports receiving our "arguments" either as just the `selector`
+ % or in the new record in `callback_args`. This is to support mid-upgrade
+ % clusters where the non-upgraded coordinator nodes will send the older style.
+ % TODO remove this in a couple of couchdb versions.
+ {Selector, Fields} =
+ case couch_util:get_value(callback_args, Options) of
+ % old style
+ undefined ->
+ {couch_util:get_value(selector, Options), undefined};
+ % new style - assume a viewcbargs
+ Args = #{} ->
+ {viewcbargs_get(selector, Args), viewcbargs_get(fields, Args)}
+ end,
case ViewRow#view_row.doc of
null ->
maybe_send_mango_ping();
@@ -256,14 +287,18 @@ view_cb({row, Row}, #mrargs{extra = Options} = Acc) ->
ok = rexi:stream2(ViewRow),
set_mango_msg_timestamp();
Doc ->
+ % We slightly abuse the doc field in the view response here,
+ % because we may return something other than the full document:
+ % we may have projected the requested `fields` from the query.
+ % However, this oddness is confined to being visible in this module.
put(mango_docs_examined, get(mango_docs_examined) + 1),
- Selector = couch_util:get_value(selector, Options),
couch_stats:increment_counter([mango, docs_examined]),
- case mango_selector:match(Selector, Doc) of
- true ->
- ok = rexi:stream2(ViewRow),
+ case match_and_extract_doc(Doc, Selector, Fields) of
+ {match, FinalDoc} ->
+ FinalViewRow = ViewRow#view_row{doc = FinalDoc},
+ ok = rexi:stream2(FinalViewRow),
set_mango_msg_timestamp();
- false ->
+ {no_match, undefined} ->
maybe_send_mango_ping()
end
end,
@@ -277,6 +312,22 @@ view_cb(complete, Acc) ->
view_cb(ok, ddoc_updated) ->
rexi:reply({ok, ddoc_updated}).
+%% match_and_extract_doc checks whether Doc matches Selector. On a match it
+%% projects Fields out of Doc via mango_fields:extract/2 and returns
+%% {match, FinalDoc}; otherwise it returns {no_match, undefined}.
+%% It is called both from view_cb and from the coordinator's quorum-read path.
+%% Field names are binaries (eg <<"user_id">>), not Erlang strings.
+-spec match_and_extract_doc(
+    Doc :: term(),
+    Selector :: term(),
+    Fields :: [binary()] | undefined | all_fields
+) -> {match, term()} | {no_match, undefined}.
+match_and_extract_doc(Doc, Selector, Fields) ->
+    case mango_selector:match(Selector, Doc) of
+        true -> {match, mango_fields:extract(Doc, Fields)};
+        false -> {no_match, undefined}
+    end.
+
maybe_send_mango_ping() ->
Current = os:timestamp(),
LastPing = get(mango_last_msg_timestamp),
@@ -296,14 +347,13 @@ set_mango_msg_timestamp() ->
handle_message({meta, _}, Cursor) ->
{ok, Cursor};
handle_message({row, Props}, Cursor) ->
- case doc_member(Cursor, Props) of
+ case doc_member_and_extract(Cursor, Props) of
{ok, Doc, {execution_stats, Stats}} ->
Cursor1 = Cursor#cursor{
execution_stats = Stats
},
Cursor2 = update_bookmark_keys(Cursor1, Props),
- FinalDoc = mango_fields:extract(Doc, Cursor2#cursor.fields),
- handle_doc(Cursor2, FinalDoc);
+ handle_doc(Cursor2, Doc);
{no_match, _, {execution_stats, Stats}} ->
Cursor1 = Cursor#cursor{
execution_stats = Stats
@@ -422,16 +472,21 @@ apply_opts([{_, _} | Rest], Args) ->
% Ignore unknown options
apply_opts(Rest, Args).
-doc_member(Cursor, RowProps) ->
+doc_member_and_extract(Cursor, RowProps) ->
Db = Cursor#cursor.db,
Opts = Cursor#cursor.opts,
ExecutionStats = Cursor#cursor.execution_stats,
Selector = Cursor#cursor.selector,
case couch_util:get_value(doc, RowProps) of
{DocProps} ->
- % only matching documents are returned; the selector
- % is evaluated at the shard level in view_cb({row, Row},
- {ok, {DocProps}, {execution_stats, ExecutionStats}};
+ % If the query doesn't request quorum doc read via r>1,
+ % match_and_extract_doc/3 is executed in view_cb, ie, locally
+ % on the shard. We only receive back the final result for the query.
+ % TODO during upgrade, some nodes will not be processing `fields`
+ % on the shard because they're old, so re-execute here just in case.
+ % Remove this later, same time as the duplicate extract at the coordinator.
+ DocProps2 = mango_fields:extract({DocProps}, Cursor#cursor.fields),
+ {ok, DocProps2, {execution_stats, ExecutionStats}};
undefined ->
% an undefined doc was returned, indicating we should
% perform a quorum fetch
@@ -441,7 +496,12 @@ doc_member(Cursor, RowProps) ->
case mango_util:defer(fabric, open_doc, [Db, Id, Opts]) of
{ok, #doc{} = DocProps} ->
Doc = couch_doc:to_json_obj(DocProps, []),
- match_doc(Selector, Doc, ExecutionStats1);
+ case match_and_extract_doc(Doc, Selector, Cursor#cursor.fields) of
+ {match, FinalDoc} ->
+ {ok, FinalDoc, {execution_stats, ExecutionStats1}};
+ {no_match, undefined} ->
+ {no_match, Doc, {execution_stats, ExecutionStats1}}
+ end;
Else ->
Else
end;
@@ -450,14 +510,6 @@ doc_member(Cursor, RowProps) ->
{no_match, null, {execution_stats, ExecutionStats}}
end.
-match_doc(Selector, Doc, ExecutionStats) ->
- case mango_selector:match(Selector, Doc) of
- true ->
- {ok, Doc, {execution_stats, ExecutionStats}};
- false ->
- {no_match, Doc, {execution_stats, ExecutionStats}}
- end.
-
is_design_doc(RowProps) ->
case couch_util:get_value(id, RowProps) of
<<"_design/", _/binary>> -> true;
@@ -479,6 +531,8 @@ update_bookmark_keys(Cursor, _Props) ->
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
+%% Test that doc_member_and_extract bypasses the selector check if it receives
+%% a document in RowProps.doc.
does_not_refetch_doc_with_value_test() ->
Cursor = #cursor{
db = <<"db">>,
@@ -498,7 +552,76 @@ does_not_refetch_doc_with_value_test() ->
]
}}
],
- {Match, _, _} = doc_member(Cursor, RowProps),
+ {Match, _, _} = doc_member_and_extract(Cursor, RowProps),
?assertEqual(Match, ok).
+%% Field projection is repeated in doc_member_and_extract even when the
+%% document arrives via RowProps.doc (ie, should have been done on the shard).
+%% This is needed temporarily for mixed version upgrades, as some shards may
+%% not have performed the field extraction. This can be later removed.
+doc_member_and_extract_fields_test() ->
+    DocId = <<"b06aadcf-cd0f-4ca6-9f7e-2c993e48d4c4">>,
+    ShardDoc =
+        {[
+            {<<"_id">>, DocId},
+            {<<"_rev">>, <<"1-a954fe2308f14307756067b0e18c2968">>},
+            {<<"user_id">>, 11}
+        ]},
+    RowProps = [
+        {id, DocId},
+        {key, DocId},
+        {doc, ShardDoc}
+    ],
+    %% No selector is set on the cursor: a doc supplied in the row is
+    %% expected to bypass selector matching.
+    Cursor = #cursor{
+        db = <<"db">>,
+        opts = [],
+        execution_stats = #execution_stats{},
+        fields = [<<"user_id">>, <<"a_non_existent_field">>]
+    },
+    {Match, Doc, _} = doc_member_and_extract(Cursor, RowProps),
+    ?assertEqual(ok, Match),
+    ?assertEqual({[{<<"user_id">>, 11}]}, Doc).
+
+%% With Fields undefined, a Doc that matches Selector comes back whole,
+%% tagged as a match.
+match_and_extract_doc_match_test() ->
+    Doc = {[{<<"_id">>, <<"myid">>}, {<<"_rev">>, <<"myrev">>}, {<<"user_id">>, 11}]},
+    Selector = mango_selector:normalize({[{<<"user_id">>, 11}]}),
+    Result = match_and_extract_doc(Doc, Selector, undefined),
+    %% Compare the whole tagged tuple so that a wrong tag and a wrong
+    %% document body are both caught by the same assertion.
+    ?assertEqual({match, Doc}, Result).
+
+%% When Fields lists field names, a Doc that matches Selector comes back
+%% reduced to just those fields.
+match_and_extract_doc_matchextract_test() ->
+    Doc = {[{<<"_id">>, <<"myid">>}, {<<"_rev">>, <<"myrev">>}, {<<"user_id">>, 11}]},
+    Selector = mango_selector:normalize({[{<<"user_id">>, 11}]}),
+    Projected = {[{<<"_id">>, <<"myid">>}, {<<"user_id">>, 11}]},
+    Result = match_and_extract_doc(Doc, Selector, [<<"_id">>, <<"user_id">>]),
+    %% _rev was not requested, so it must be absent from the result.
+    ?assertEqual({match, Projected}, Result).
+
+%% A Doc that fails Selector yields no document at all, here with Fields
+%% left undefined.
+match_and_extract_doc_nomatch_test() ->
+    Doc = {[{<<"_id">>, <<"myid">>}, {<<"_rev">>, <<"myrev">>}, {<<"user_id">>, 11}]},
+    %% The doc's user_id is the integer 11; the selector asks for the
+    %% binary <<"1234">>, so the two differ.
+    Selector = mango_selector:normalize({[{<<"user_id">>, <<"1234">>}]}),
+    Result = match_and_extract_doc(Doc, Selector, undefined),
+    ?assertEqual({no_match, undefined}, Result).
+
+%% A Doc that fails Selector yields no document even when Fields names
+%% fields to project.
+match_and_extract_doc_nomatch_fields_test() ->
+    Doc = {[{<<"_id">>, <<"myid">>}, {<<"_rev">>, <<"myrev">>}, {<<"user_id">>, 11}]},
+    Selector = mango_selector:normalize({[{<<"user_id">>, 1234}]}),
+    Fields = [<<"_id">>, <<"user_id">>],
+    Result = match_and_extract_doc(Doc, Selector, Fields),
+    %% No partial document may be returned on a non-match.
+    ?assertEqual({no_match, undefined}, Result).
+
-endif.