summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGarren Smith <garren.smith@gmail.com>2018-08-08 09:56:10 +0200
committergarren smith <garren.smith@gmail.com>2018-08-08 19:24:38 +0200
commita6bc72e76c56a1befa8675b06016ecda46ef3a2d (patch)
treef8227e28677a5f9044bed4209f91728955a0174d
parenta7f2aa5175c8fad8f946c3d2ff79558b74b8ee18 (diff)
downloadcouchdb-a6bc72e76c56a1befa8675b06016ecda46ef3a2d.tar.gz
Move mango selector matching to the shard level
This moves the Mango selector matching down to the shard level. This means that the document is retrieved from the index and matched against the selector on the shard before being sent to the coordinator node, which reduces the network traffic for a Mango query. Co-authored-by: Paul J. Davis <paul.joseph.davis@gmail.com> Co-authored-by: Garren Smith <garren.smith@gmail.com>
-rw-r--r--src/fabric/src/fabric_rpc.erl19
-rw-r--r--src/mango/src/mango_cursor_view.erl112
-rw-r--r--src/mango/src/mango_execution_stats.erl7
-rw-r--r--src/mango/test/20-no-timeout-test.py38
4 files changed, 156 insertions, 20 deletions
diff --git a/src/fabric/src/fabric_rpc.erl b/src/fabric/src/fabric_rpc.erl
index 60526f495..ef4092d56 100644
--- a/src/fabric/src/fabric_rpc.erl
+++ b/src/fabric/src/fabric_rpc.erl
@@ -100,8 +100,8 @@ all_docs(DbName, Options, Args0) ->
set_io_priority(DbName, Options),
Args = fix_skip_and_limit(Args1),
{ok, Db} = get_or_create_db(DbName, Options),
- VAcc0 = #vacc{db=Db},
- couch_mrview:query_all_docs(Db, Args, fun view_cb/2, VAcc0)
+ CB = get_view_cb(Args),
+ couch_mrview:query_all_docs(Db, Args, CB, Args)
end.
update_mrview(DbName, {DDocId, Rev}, ViewName, Args0) ->
@@ -124,8 +124,8 @@ map_view(DbName, DDoc, ViewName, Args0, DbOptions) ->
set_io_priority(DbName, DbOptions),
Args = fix_skip_and_limit(fabric_util:upgrade_mrargs(Args0)),
{ok, Db} = get_or_create_db(DbName, DbOptions),
- VAcc0 = #vacc{db=Db},
- couch_mrview:query_view(Db, DDoc, ViewName, Args, fun view_cb/2, VAcc0).
+ CB = get_view_cb(Args),
+ couch_mrview:query_view(Db, DDoc, ViewName, Args, CB, Args).
%% @equiv reduce_view(DbName, DDoc, ViewName, Args0)
reduce_view(DbName, DDocInfo, ViewName, Args0) ->
@@ -303,6 +303,17 @@ get_or_create_db(DbName, Options) ->
couch_db:open_int(DbName, [{create_if_missing, true} | Options]).
+%% Choose the row callback for a view query. When the query arguments are an
+%% #mrargs{} whose `extra` options contain a `callback` entry of the form
+%% {Module, Function} (both atoms), that function (arity 2) is used; in every
+%% other case the local view_cb/2 is used.
+get_view_cb(#mrargs{extra = Extra}) ->
+    Pick = fun
+        ({Mod, Fun}) when is_atom(Mod), is_atom(Fun) -> fun Mod:Fun/2;
+        (_) -> fun view_cb/2
+    end,
+    Pick(couch_util:get_value(callback, Extra));
+get_view_cb(_) ->
+    fun view_cb/2.
+
+
view_cb({meta, Meta}, Acc) ->
% Map function starting
ok = rexi:stream2({meta, Meta}),
diff --git a/src/mango/src/mango_cursor_view.erl b/src/mango/src/mango_cursor_view.erl
index dbea36e77..51ec68c45 100644
--- a/src/mango/src/mango_cursor_view.erl
+++ b/src/mango/src/mango_cursor_view.erl
@@ -19,6 +19,7 @@
]).
-export([
+ view_cb/2,
handle_message/2,
handle_all_docs_message/2,
composite_indexes/2,
@@ -28,9 +29,13 @@
-include_lib("couch/include/couch_db.hrl").
-include_lib("couch_mrview/include/couch_mrview.hrl").
+-include_lib("fabric/include/fabric.hrl").
+
-include("mango_cursor.hrl").
-include("mango_idx_view.hrl").
+-define(HEARTBEAT_INTERVAL_IN_USEC, 4000000).
+
create(Db, Indexes, Selector, Opts) ->
FieldRanges = mango_idx_view:field_ranges(Selector),
Composited = composite_indexes(Indexes, FieldRanges),
@@ -93,13 +98,14 @@ maybe_replace_max_json([H | T] = EndKey) when is_list(EndKey) ->
maybe_replace_max_json(EndKey) ->
EndKey.
-base_args(#cursor{index = Idx} = Cursor) ->
+%% Build the base #mrargs{} for querying the cursor's index: a non-reduced map view bounded by the index start/end keys for the cursor's ranges, with documents included. The `extra` options carry a `callback` entry naming this module's view_cb and the cursor's selector under `selector`.
+base_args(#cursor{index = Idx, selector = Selector} = Cursor) ->
#mrargs{
view_type = map,
reduce = false,
start_key = mango_idx:start_key(Idx, Cursor#cursor.ranges),
end_key = mango_idx:end_key(Idx, Cursor#cursor.ranges),
- include_docs = true
+ include_docs = true,
+ extra = [{callback, {?MODULE, view_cb}}, {selector, Selector}]
}.
@@ -210,22 +216,84 @@ choose_best_index(_DbName, IndexRanges) ->
{SelectedIndex, SelectedIndexRanges}.
+%% View callback that matches each row's document against the Mango selector
+%% before streaming it through rexi. The process dictionary key
+%% `mango_docs_examined` counts the documents examined since the last row was
+%% streamed; a matching row is sent with that count (plus itself) as its value.
+view_cb({meta, Meta}, Acc) ->
+    % Map function starting: zero the examined counter and record the time of
+    % this message before streaming the meta information.
+    put(mango_docs_examined, 0),
+    set_mango_msg_timestamp(),
+    ok = rexi:stream2({meta, Meta}),
+    {ok, Acc};
+view_cb({row, Row}, #mrargs{extra = Options} = Acc) ->
+    % Stream one row, then reset the examined counter and the timestamp of
+    % the last message sent.
+    Emit = fun(OutRow) ->
+        ok = rexi:stream2(OutRow),
+        put(mango_docs_examined, 0),
+        set_mango_msg_timestamp()
+    end,
+    BaseRow = #view_row{
+        id = couch_util:get_value(id, Row),
+        key = couch_util:get_value(key, Row),
+        doc = couch_util:get_value(doc, Row)
+    },
+    case BaseRow#view_row.doc of
+        undefined ->
+            % The row carries no document: stream it with the row's own
+            % value, without consulting the selector.
+            Emit(BaseRow#view_row{value = couch_util:get_value(value, Row)});
+        Doc ->
+            Selector = couch_util:get_value(selector, Options),
+            case mango_selector:match(Selector, Doc) of
+                true ->
+                    Emit(BaseRow#view_row{
+                        value = get(mango_docs_examined) + 1
+                    });
+                false ->
+                    % Nothing is streamed for a non-matching document; count
+                    % it and send a ping if one is due.
+                    put(mango_docs_examined, get(mango_docs_examined) + 1),
+                    maybe_send_mango_ping()
+            end
+    end,
+    {ok, Acc};
+view_cb(complete, Acc) ->
+    % Finish view output
+    ok = rexi:stream_last(complete),
+    {ok, Acc};
+view_cb(ok, ddoc_updated) ->
+    rexi:reply({ok, ddoc_updated}).
+
+
+%% Send a rexi ping if more than ?HEARTBEAT_INTERVAL_IN_USEC microseconds have
+%% passed since the timestamp stored under `mango_last_msg_timestamp`.
+maybe_send_mango_ping() ->
+    % Fabric will time out if it has not heard a response from a worker node
+    % after 5 seconds. Send a ping every 4 seconds so the timeout doesn't
+    % happen.
+    Elapsed = timer:now_diff(os:timestamp(), get(mango_last_msg_timestamp)),
+    if
+        Elapsed > ?HEARTBEAT_INTERVAL_IN_USEC ->
+            rexi:ping(),
+            set_mango_msg_timestamp();
+        true ->
+            ok
+    end.
+
+
+%% Store the current os:timestamp() in the process dictionary under
+%% `mango_last_msg_timestamp`.
+set_mango_msg_timestamp() ->
+    Now = os:timestamp(),
+    put(mango_last_msg_timestamp, Now).
+
+
handle_message({meta, _}, Cursor) ->
{ok, Cursor};
handle_message({row, Props}, Cursor) ->
- case doc_member(Cursor#cursor.db, Props, Cursor#cursor.opts, Cursor#cursor.execution_stats) of
+ case doc_member(Cursor, Props) of
{ok, Doc, {execution_stats, ExecutionStats1}} ->
Cursor1 = Cursor#cursor {
execution_stats = ExecutionStats1
},
- case mango_selector:match(Cursor1#cursor.selector, Doc) of
- true ->
- Cursor2 = update_bookmark_keys(Cursor1, Props),
- FinalDoc = mango_fields:extract(Doc, Cursor2#cursor.fields),
- handle_doc(Cursor2, FinalDoc);
- false ->
- {ok, Cursor1}
- end;
+ Cursor2 = update_bookmark_keys(Cursor1, Props),
+ FinalDoc = mango_fields:extract(Doc, Cursor2#cursor.fields),
+ handle_doc(Cursor2, FinalDoc);
+ {no_match, _, {execution_stats, ExecutionStats1}} ->
+ Cursor1 = Cursor#cursor {
+ execution_stats = ExecutionStats1
+ },
+ {ok, Cursor1};
Error ->
couch_log:error("~s :: Error loading doc: ~p", [?MODULE, Error]),
{ok, Cursor}
@@ -332,17 +400,31 @@ apply_opts([{_, _} | Rest], Args) ->
apply_opts(Rest, Args).
-doc_member(Db, RowProps, Opts, ExecutionStats) ->
+doc_member(Cursor, RowProps) ->
+ Db = Cursor#cursor.db,
+ Opts = Cursor#cursor.opts,
+ ExecutionStats = Cursor#cursor.execution_stats,
+ Selector = Cursor#cursor.selector,
+ Incr = case couch_util:get_value(value, RowProps) of
+ N when is_integer(N) -> N;
+ _ -> 1
+ end,
case couch_util:get_value(doc, RowProps) of
{DocProps} ->
- ExecutionStats1 = mango_execution_stats:incr_docs_examined(ExecutionStats),
+ ExecutionStats1 = mango_execution_stats:incr_docs_examined(ExecutionStats, Incr),
{ok, {DocProps}, {execution_stats, ExecutionStats1}};
undefined ->
ExecutionStats1 = mango_execution_stats:incr_quorum_docs_examined(ExecutionStats),
Id = couch_util:get_value(id, RowProps),
case mango_util:defer(fabric, open_doc, [Db, Id, Opts]) of
- {ok, #doc{}=Doc} ->
- {ok, couch_doc:to_json_obj(Doc, []), {execution_stats, ExecutionStats1}};
+ {ok, #doc{}=DocProps} ->
+ Doc = couch_doc:to_json_obj(DocProps, []),
+ case mango_selector:match(Selector, Doc) of
+ true ->
+ {ok, Doc, {execution_stats, ExecutionStats1}};
+ false ->
+ {no_match, Doc, {execution_stats, ExecutionStats1}}
+ end;
Else ->
Else
end
diff --git a/src/mango/src/mango_execution_stats.erl b/src/mango/src/mango_execution_stats.erl
index afdb417b7..7e8afd782 100644
--- a/src/mango/src/mango_execution_stats.erl
+++ b/src/mango/src/mango_execution_stats.erl
@@ -17,6 +17,7 @@
to_json/1,
incr_keys_examined/1,
incr_docs_examined/1,
+ incr_docs_examined/2,
incr_quorum_docs_examined/1,
incr_results_returned/1,
log_start/1,
@@ -45,8 +46,12 @@ incr_keys_examined(Stats) ->
incr_docs_examined(Stats) ->
+ incr_docs_examined(Stats, 1).
+
+
+%% Return Stats with totalDocsExamined increased by N; incr_docs_examined/1 is the N = 1 case.
+incr_docs_examined(Stats, N) ->
Stats#execution_stats {
- totalDocsExamined = Stats#execution_stats.totalDocsExamined + 1
+ totalDocsExamined = Stats#execution_stats.totalDocsExamined + N
}.
diff --git a/src/mango/test/20-no-timeout-test.py b/src/mango/test/20-no-timeout-test.py
new file mode 100644
index 000000000..93dc146a3
--- /dev/null
+++ b/src/mango/test/20-no-timeout-test.py
@@ -0,0 +1,38 @@
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not
+# use this file except in compliance with the License. You may obtain a copy of
+# the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations under
+# the License.
+
+import mango
+import copy
+import unittest
+
+class LongRunningMangoTest(mango.DbPerClass):
+
+ def setUp(self):
+ self.db.recreate()
+ docs = []
+ for i in range(100000):
+ docs.append({
+ "_id": str(i),
+ "another": "field"
+ })
+ if i % 20000 == 0:
+ self.db.save_docs(docs)
+ docs = []
+
+ # This test should run to completion and not timeout
+ def test_query_does_not_time_out(self):
+ selector = {
+ "_id": {"$gt": 0},
+ "another": "wrong"
+ }
+ docs = self.db.find(selector)
+ self.assertEqual(len(docs), 0)