summaryrefslogtreecommitdiff
path: root/src/mango/src/mango_cursor_view.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/mango/src/mango_cursor_view.erl')
-rw-r--r--src/mango/src/mango_cursor_view.erl175
1 files changed, 156 insertions, 19 deletions
diff --git a/src/mango/src/mango_cursor_view.erl b/src/mango/src/mango_cursor_view.erl
index d5cffbc5c..e044c56fc 100644
--- a/src/mango/src/mango_cursor_view.erl
+++ b/src/mango/src/mango_cursor_view.erl
@@ -79,6 +79,15 @@ viewcbargs_get(fields, Args) when is_map(Args) ->
viewcbargs_get(covering_index, Args) when is_map(Args) ->
maps:get(covering_index, Args, undefined).
+-spec shard_stats_get(Key, Args) -> Stat when
+ Key :: docs_examined | keys_examined,
+ Args :: shard_stats_v2(),
+ Stat :: non_neg_integer().
+shard_stats_get(docs_examined, Args) when is_map(Args) ->
+ maps:get(docs_examined, Args, 0);
+shard_stats_get(keys_examined, Args) when is_map(Args) ->
+ maps:get(keys_examined, Args, 0).
+
-spec create(Db, Indexes, Selector, Options) -> {ok, #cursor{}} when
Db :: database(),
Indexes :: [#idx{}],
@@ -187,7 +196,12 @@ base_args(#cursor{index = Idx, selector = Selector, fields = Fields} = Cursor) -
{selector, Selector},
{callback_args, viewcbargs_new(Selector, Fields, undefined)},
- {ignore_partition_query_limit, true}
+ {ignore_partition_query_limit, true},
+
+ % Request execution statistics in a map. The purpose of this option is
+ % to maintain interoperability on version upgrades.
+ % TODO remove this option in a later version.
+ {execution_stats_map, true}
]
}.
@@ -326,11 +340,13 @@ choose_best_index(IndexRanges) ->
(ok, ddoc_updated) -> any().
view_cb({meta, Meta}, Acc) ->
% Map function starting
- put(mango_docs_examined, 0),
+ mango_execution_stats:shard_init(),
set_mango_msg_timestamp(),
ok = rexi:stream2({meta, Meta}),
{ok, Acc};
view_cb({row, Row}, #mrargs{extra = Options} = Acc) ->
+ mango_execution_stats:shard_incr_keys_examined(),
+ couch_stats:increment_counter([mango, keys_examined]),
ViewRow = #view_row{
id = couch_util:get_value(id, Row),
key = couch_util:get_value(key, Row),
@@ -379,14 +395,23 @@ view_cb({row, Row}, #mrargs{extra = Options} = Acc) ->
ok = rexi:stream2(ViewRow),
set_mango_msg_timestamp();
{Doc, _} ->
- put(mango_docs_examined, get(mango_docs_examined) + 1),
+ mango_execution_stats:shard_incr_docs_examined(),
couch_stats:increment_counter([mango, docs_examined]),
Process(Doc)
end,
{ok, Acc};
-view_cb(complete, Acc) ->
+view_cb(complete, #mrargs{extra = Options} = Acc) ->
+ ShardStats = mango_execution_stats:shard_get_stats(),
+ Stats =
+ case couch_util:get_value(execution_stats_map, Options, false) of
+ true ->
+ ShardStats;
+ false ->
+ DocsExamined = maps:get(docs_examined, ShardStats),
+ {docs_examined, DocsExamined}
+ end,
% Send shard-level execution stats
- ok = rexi:stream2({execution_stats, {docs_examined, get(mango_docs_examined)}}),
+ ok = rexi:stream2({execution_stats, Stats}),
% Finish view output
ok = rexi:stream_last(complete),
{ok, Acc};
@@ -459,12 +484,20 @@ handle_message({row, Props}, Cursor) ->
couch_log:error("~s :: Error loading doc: ~p", [?MODULE, Error]),
{ok, Cursor}
end;
-handle_message({execution_stats, ShardStats}, #cursor{execution_stats = Stats} = Cursor) ->
- {docs_examined, DocsExamined} = ShardStats,
- Cursor1 = Cursor#cursor{
+handle_message({execution_stats, {docs_examined, DocsExamined}}, Cursor0) ->
+ #cursor{execution_stats = Stats} = Cursor0,
+ Cursor = Cursor0#cursor{
execution_stats = mango_execution_stats:incr_docs_examined(Stats, DocsExamined)
},
- {ok, Cursor1};
+ {ok, Cursor};
+handle_message({execution_stats, #{} = ShardStats}, Cursor0) ->
+ DocsExamined = shard_stats_get(docs_examined, ShardStats),
+ KeysExamined = shard_stats_get(keys_examined, ShardStats),
+ #cursor{execution_stats = Stats0} = Cursor0,
+ Stats1 = mango_execution_stats:incr_docs_examined(Stats0, DocsExamined),
+ Stats = mango_execution_stats:incr_keys_examined(Stats1, KeysExamined),
+ Cursor = Cursor0#cursor{execution_stats = Stats},
+ {ok, Cursor};
handle_message(complete, Cursor) ->
{ok, Cursor};
handle_message({error, Reason}, _Cursor) ->
@@ -702,7 +735,8 @@ base_opts_test() ->
fields => Fields,
covering_index => undefined
}},
- {ignore_partition_query_limit, true}
+ {ignore_partition_query_limit, true},
+ {execution_stats_map, true}
],
MRArgs =
#mrargs{
@@ -945,6 +979,7 @@ execute_test_() ->
[
?TDEF_FE(t_execute_empty),
?TDEF_FE(t_execute_ok_all_docs),
+ ?TDEF_FE(t_execute_ok_all_docs_with_execution_stats),
?TDEF_FE(t_execute_ok_query_view),
?TDEF_FE(t_execute_error)
]
@@ -997,7 +1032,8 @@ t_execute_ok_all_docs(_) ->
fields => Fields,
covering_index => undefined
}},
- {ignore_partition_query_limit, true}
+ {ignore_partition_query_limit, true},
+ {execution_stats_map, true}
],
Args =
#mrargs{
@@ -1060,7 +1096,8 @@ t_execute_ok_query_view(_) ->
fields => Fields,
covering_index => undefined
}},
- {ignore_partition_query_limit, true}
+ {ignore_partition_query_limit, true},
+ {execution_stats_map, true}
],
Args =
#mrargs{
@@ -1084,6 +1121,79 @@ t_execute_ok_query_view(_) ->
?assertEqual({ok, updated_accumulator}, execute(Cursor, fun foo:bar/2, accumulator)),
?assert(meck:called(fabric, query_view, '_')).
+t_execute_ok_all_docs_with_execution_stats(_) ->
+ Bookmark = bookmark,
+ Stats =
+ {[
+ {total_keys_examined, 0},
+ {total_docs_examined, 0},
+ {total_quorum_docs_examined, 0},
+ {results_returned, 0},
+ {execution_time_ms, '_'}
+ ]},
+ UserFnDefinition =
+ [
+ {[{add_key, bookmark, Bookmark}, accumulator], {undefined, updated_accumulator1}},
+ {
+ [{add_key, execution_stats, Stats}, updated_accumulator1],
+ {undefined, updated_accumulator2}
+ }
+ ],
+ meck:expect(foo, bar, UserFnDefinition),
+ Index = #idx{type = <<"json">>, def = all_docs},
+ Selector = {[]},
+ Fields = all_fields,
+ Cursor =
+ #cursor{
+ index = Index,
+ db = db,
+ selector = Selector,
+ fields = Fields,
+ ranges = [{'$gte', start_key, '$lte', end_key}],
+ opts = [{user_ctx, user_ctx}, {execution_stats, true}],
+ bookmark = nil
+ },
+ Cursor1 =
+ Cursor#cursor{
+ user_acc = accumulator,
+ user_fun = fun foo:bar/2,
+ execution_stats = '_'
+ },
+ Cursor2 =
+ Cursor1#cursor{
+ bookmark = Bookmark,
+ bookmark_docid = undefined,
+ bookmark_key = undefined,
+ execution_stats = #execution_stats{executionStartTime = {0, 0, 0}}
+ },
+ Extra =
+ [
+ {callback, {mango_cursor_view, view_cb}},
+ {selector, Selector},
+ {callback_args, #{
+ selector => Selector,
+ fields => Fields,
+ covering_index => undefined
+ }},
+ {ignore_partition_query_limit, true},
+ {execution_stats_map, true}
+ ],
+ Args =
+ #mrargs{
+ view_type = map,
+ reduce = false,
+ start_key = [start_key],
+ end_key = [end_key, ?MAX_JSON_OBJ],
+ include_docs = true,
+ extra = Extra
+ },
+ Parameters = [
+ db, [{user_ctx, user_ctx}], fun mango_cursor_view:handle_all_docs_message/2, Cursor1, Args
+ ],
+ meck:expect(fabric, all_docs, Parameters, meck:val({ok, Cursor2})),
+ ?assertEqual({ok, updated_accumulator2}, execute(Cursor, fun foo:bar/2, accumulator)),
+ ?assert(meck:called(fabric, all_docs, '_')).
+
t_execute_error(_) ->
Cursor =
#cursor{
@@ -1119,7 +1229,8 @@ view_cb_test_() ->
?TDEF_FE(t_view_cb_row_matching_covered_doc),
?TDEF_FE(t_view_cb_row_non_matching_covered_doc),
?TDEF_FE(t_view_cb_row_backwards_compatible),
- ?TDEF_FE(t_view_cb_complete),
+ ?TDEF_FE(t_view_cb_complete_shard_stats_v1),
+ ?TDEF_FE(t_view_cb_complete_shard_stats_v2),
?TDEF_FE(t_view_cb_ok)
]
}.
@@ -1143,7 +1254,7 @@ t_view_cb_row_matching_regular_doc(_) ->
}}
]
},
- put(mango_docs_examined, 0),
+ mango_execution_stats:shard_init(),
?assertEqual({ok, Accumulator}, view_cb({row, Row}, Accumulator)),
?assert(meck:called(rexi, stream2, '_')).
@@ -1161,7 +1272,7 @@ t_view_cb_row_non_matching_regular_doc(_) ->
}}
]
},
- put(mango_docs_examined, 0),
+ mango_execution_stats:shard_init(),
put(mango_last_msg_timestamp, os:timestamp()),
?assertEqual({ok, Accumulator}, view_cb({row, Row}, Accumulator)),
?assertNot(meck:called(rexi, stream2, '_')).
@@ -1179,6 +1290,7 @@ t_view_cb_row_null_doc(_) ->
}}
]
},
+ mango_execution_stats:shard_init(),
put(mango_last_msg_timestamp, os:timestamp()),
?assertEqual({ok, Accumulator}, view_cb({row, Row}, Accumulator)),
?assertNot(meck:called(rexi, stream2, '_')).
@@ -1197,6 +1309,7 @@ t_view_cb_row_missing_doc_triggers_quorum_fetch(_) ->
}}
]
},
+ mango_execution_stats:shard_init(),
?assertEqual({ok, Accumulator}, view_cb({row, Row}, Accumulator)),
?assert(meck:called(rexi, stream2, '_')).
@@ -1222,6 +1335,7 @@ t_view_cb_row_matching_covered_doc(_) ->
}}
]
},
+ mango_execution_stats:shard_init(),
?assertEqual({ok, Accumulator}, view_cb({row, Row}, Accumulator)),
?assert(meck:called(rexi, stream2, '_')).
@@ -1244,6 +1358,7 @@ t_view_cb_row_non_matching_covered_doc(_) ->
}}
]
},
+ mango_execution_stats:shard_init(),
put(mango_last_msg_timestamp, os:timestamp()),
?assertEqual({ok, Accumulator}, view_cb({row, Row}, Accumulator)),
?assertNot(meck:called(rexi, stream2, '_')).
@@ -1252,14 +1367,27 @@ t_view_cb_row_backwards_compatible(_) ->
Row = [{id, id}, {key, key}, {doc, null}],
meck:expect(rexi, stream2, ['_'], undefined),
Accumulator = #mrargs{extra = [{selector, {[]}}]},
+ mango_execution_stats:shard_init(),
put(mango_last_msg_timestamp, os:timestamp()),
?assertEqual({ok, Accumulator}, view_cb({row, Row}, Accumulator)),
?assertNot(meck:called(rexi, stream2, '_')).
-t_view_cb_complete(_) ->
+t_view_cb_complete_shard_stats_v1(_) ->
meck:expect(rexi, stream2, [{execution_stats, {docs_examined, '_'}}], meck:val(ok)),
meck:expect(rexi, stream_last, [complete], meck:val(ok)),
- ?assertEqual({ok, accumulator}, view_cb(complete, accumulator)),
+ Accumulator = #mrargs{},
+ mango_execution_stats:shard_init(),
+ ?assertEqual({ok, Accumulator}, view_cb(complete, Accumulator)),
+ ?assert(meck:called(rexi, stream2, '_')),
+ ?assert(meck:called(rexi, stream_last, '_')).
+
+t_view_cb_complete_shard_stats_v2(_) ->
+ ShardStats = #{docs_examined => '_', keys_examined => '_'},
+ meck:expect(rexi, stream2, [{execution_stats, ShardStats}], meck:val(ok)),
+ meck:expect(rexi, stream_last, [complete], meck:val(ok)),
+ Accumulator = #mrargs{extra = [{execution_stats_map, true}]},
+ mango_execution_stats:shard_init(),
+ ?assertEqual({ok, Accumulator}, view_cb(complete, Accumulator)),
?assert(meck:called(rexi, stream2, '_')),
?assert(meck:called(rexi, stream_last, '_')).
@@ -1323,7 +1451,8 @@ handle_message_test_() ->
?TDEF_FE(t_handle_message_row_ok_triggers_quorum_fetch_no_match),
?TDEF_FE(t_handle_message_row_no_match),
?TDEF_FE(t_handle_message_row_error),
- ?TDEF_FE(t_handle_message_execution_stats),
+ ?TDEF_FE(t_handle_message_execution_stats_v1),
+ ?TDEF_FE(t_handle_message_execution_stats_v2),
?TDEF_FE(t_handle_message_complete),
?TDEF_FE(t_handle_message_error)
]
@@ -1455,7 +1584,7 @@ t_handle_message_row_error(_) ->
meck:delete(mango_util, defer, 3),
meck:delete(couch_log, error, 2).
-t_handle_message_execution_stats(_) ->
+t_handle_message_execution_stats_v1(_) ->
ShardStats = {docs_examined, 42},
ExecutionStats = #execution_stats{totalDocsExamined = 11},
ExecutionStats1 = #execution_stats{totalDocsExamined = 53},
@@ -1463,6 +1592,14 @@ t_handle_message_execution_stats(_) ->
Cursor1 = #cursor{execution_stats = ExecutionStats1},
?assertEqual({ok, Cursor1}, handle_message({execution_stats, ShardStats}, Cursor)).
+t_handle_message_execution_stats_v2(_) ->
+ ShardStats = #{docs_examined => 42, keys_examined => 53},
+ ExecutionStats = #execution_stats{totalDocsExamined = 11, totalKeysExamined = 22},
+ ExecutionStats1 = #execution_stats{totalDocsExamined = 53, totalKeysExamined = 75},
+ Cursor = #cursor{execution_stats = ExecutionStats},
+ Cursor1 = #cursor{execution_stats = ExecutionStats1},
+ ?assertEqual({ok, Cursor1}, handle_message({execution_stats, ShardStats}, Cursor)).
+
t_handle_message_complete(_) ->
?assertEqual({ok, cursor}, handle_message(complete, cursor)).