diff options
Diffstat (limited to 'src/mango/src/mango_cursor_view.erl')
-rw-r--r-- | src/mango/src/mango_cursor_view.erl | 175 |
1 files changed, 156 insertions, 19 deletions
diff --git a/src/mango/src/mango_cursor_view.erl b/src/mango/src/mango_cursor_view.erl index d5cffbc5c..e044c56fc 100644 --- a/src/mango/src/mango_cursor_view.erl +++ b/src/mango/src/mango_cursor_view.erl @@ -79,6 +79,15 @@ viewcbargs_get(fields, Args) when is_map(Args) -> viewcbargs_get(covering_index, Args) when is_map(Args) -> maps:get(covering_index, Args, undefined). +-spec shard_stats_get(Key, Args) -> Stat when + Key :: docs_examined | keys_examined, + Args :: shard_stats_v2(), + Stat :: non_neg_integer(). +shard_stats_get(docs_examined, Args) when is_map(Args) -> + maps:get(docs_examined, Args, 0); +shard_stats_get(keys_examined, Args) when is_map(Args) -> + maps:get(keys_examined, Args, 0). + -spec create(Db, Indexes, Selector, Options) -> {ok, #cursor{}} when Db :: database(), Indexes :: [#idx{}], @@ -187,7 +196,12 @@ base_args(#cursor{index = Idx, selector = Selector, fields = Fields} = Cursor) - {selector, Selector}, {callback_args, viewcbargs_new(Selector, Fields, undefined)}, - {ignore_partition_query_limit, true} + {ignore_partition_query_limit, true}, + + % Request execution statistics in a map. The purpose of this option is + % to maintain interoperability on version upgrades. + % TODO remove this option in a later version. + {execution_stats_map, true} ] }. @@ -326,11 +340,13 @@ choose_best_index(IndexRanges) -> (ok, ddoc_updated) -> any(). view_cb({meta, Meta}, Acc) -> % Map function starting - put(mango_docs_examined, 0), + mango_execution_stats:shard_init(), set_mango_msg_timestamp(), ok = rexi:stream2({meta, Meta}), {ok, Acc}; view_cb({row, Row}, #mrargs{extra = Options} = Acc) -> + mango_execution_stats:shard_incr_keys_examined(), + couch_stats:increment_counter([mango, keys_examined]), ViewRow = #view_row{ id = couch_util:get_value(id, Row), key = couch_util:get_value(key, Row), @@ -379,14 +395,23 @@ view_cb({row, Row}, #mrargs{extra = Options} = Acc) -> ok = rexi:stream2(ViewRow), set_mango_msg_timestamp(); {Doc, _} -> - put(mango_docs_examined, get(mango_docs_examined) + 1), + mango_execution_stats:shard_incr_docs_examined(), couch_stats:increment_counter([mango, docs_examined]), Process(Doc) end, {ok, Acc}; -view_cb(complete, Acc) -> +view_cb(complete, #mrargs{extra = Options} = Acc) -> + ShardStats = mango_execution_stats:shard_get_stats(), + Stats = + case couch_util:get_value(execution_stats_map, Options, false) of + true -> + ShardStats; + false -> + DocsExamined = maps:get(docs_examined, ShardStats), + {docs_examined, DocsExamined} + end, % Send shard-level execution stats - ok = rexi:stream2({execution_stats, {docs_examined, get(mango_docs_examined)}}), + ok = rexi:stream2({execution_stats, Stats}), % Finish view output ok = rexi:stream_last(complete), {ok, Acc}; @@ -459,12 +484,20 @@ handle_message({row, Props}, Cursor) -> couch_log:error("~s :: Error loading doc: ~p", [?MODULE, Error]), {ok, Cursor} end; -handle_message({execution_stats, ShardStats}, #cursor{execution_stats = Stats} = Cursor) -> - {docs_examined, DocsExamined} = ShardStats, - Cursor1 = Cursor#cursor{ +handle_message({execution_stats, {docs_examined, DocsExamined}}, Cursor0) -> + #cursor{execution_stats = Stats} = Cursor0, + Cursor = Cursor0#cursor{ execution_stats = mango_execution_stats:incr_docs_examined(Stats, DocsExamined) }, - {ok, Cursor1}; + {ok, Cursor}; +handle_message({execution_stats, #{} = ShardStats}, Cursor0) -> + DocsExamined = shard_stats_get(docs_examined, ShardStats), + KeysExamined = shard_stats_get(keys_examined, ShardStats), + #cursor{execution_stats = Stats0} = Cursor0, + Stats1 = mango_execution_stats:incr_docs_examined(Stats0, DocsExamined), + Stats = mango_execution_stats:incr_keys_examined(Stats1, KeysExamined), + Cursor = Cursor0#cursor{execution_stats = Stats}, + {ok, Cursor}; handle_message(complete, Cursor) -> {ok, Cursor}; handle_message({error, Reason}, _Cursor) -> @@ -702,7 +735,8 @@ base_opts_test() -> fields => Fields, covering_index => undefined }}, - {ignore_partition_query_limit, true} + {ignore_partition_query_limit, true}, + {execution_stats_map, true} ], MRArgs = #mrargs{ @@ -945,6 +979,7 @@ execute_test_() -> [ ?TDEF_FE(t_execute_empty), ?TDEF_FE(t_execute_ok_all_docs), + ?TDEF_FE(t_execute_ok_all_docs_with_execution_stats), ?TDEF_FE(t_execute_ok_query_view), ?TDEF_FE(t_execute_error) ] @@ -997,7 +1032,8 @@ t_execute_ok_all_docs(_) -> fields => Fields, covering_index => undefined }}, - {ignore_partition_query_limit, true} + {ignore_partition_query_limit, true}, + {execution_stats_map, true} ], Args = #mrargs{ @@ -1060,7 +1096,8 @@ t_execute_ok_query_view(_) -> fields => Fields, covering_index => undefined }}, - {ignore_partition_query_limit, true} + {ignore_partition_query_limit, true}, + {execution_stats_map, true} ], Args = #mrargs{ @@ -1084,6 +1121,79 @@ t_execute_ok_query_view(_) -> ?assertEqual({ok, updated_accumulator}, execute(Cursor, fun foo:bar/2, accumulator)), ?assert(meck:called(fabric, query_view, '_')). +t_execute_ok_all_docs_with_execution_stats(_) -> + Bookmark = bookmark, + Stats = + {[ + {total_keys_examined, 0}, + {total_docs_examined, 0}, + {total_quorum_docs_examined, 0}, + {results_returned, 0}, + {execution_time_ms, '_'} + ]}, + UserFnDefinition = + [ + {[{add_key, bookmark, Bookmark}, accumulator], {undefined, updated_accumulator1}}, + { + [{add_key, execution_stats, Stats}, updated_accumulator1], + {undefined, updated_accumulator2} + } + ], + meck:expect(foo, bar, UserFnDefinition), + Index = #idx{type = <<"json">>, def = all_docs}, + Selector = {[]}, + Fields = all_fields, + Cursor = + #cursor{ + index = Index, + db = db, + selector = Selector, + fields = Fields, + ranges = [{'$gte', start_key, '$lte', end_key}], + opts = [{user_ctx, user_ctx}, {execution_stats, true}], + bookmark = nil + }, + Cursor1 = + Cursor#cursor{ + user_acc = accumulator, + user_fun = fun foo:bar/2, + execution_stats = '_' + }, + Cursor2 = + Cursor1#cursor{ + bookmark = Bookmark, + bookmark_docid = undefined, + bookmark_key = undefined, + execution_stats = #execution_stats{executionStartTime = {0, 0, 0}} + }, + Extra = + [ + {callback, {mango_cursor_view, view_cb}}, + {selector, Selector}, + {callback_args, #{ + selector => Selector, + fields => Fields, + covering_index => undefined + }}, + {ignore_partition_query_limit, true}, + {execution_stats_map, true} + ], + Args = + #mrargs{ + view_type = map, + reduce = false, + start_key = [start_key], + end_key = [end_key, ?MAX_JSON_OBJ], + include_docs = true, + extra = Extra + }, + Parameters = [ + db, [{user_ctx, user_ctx}], fun mango_cursor_view:handle_all_docs_message/2, Cursor1, Args + ], + meck:expect(fabric, all_docs, Parameters, meck:val({ok, Cursor2})), + ?assertEqual({ok, updated_accumulator2}, execute(Cursor, fun foo:bar/2, accumulator)), + ?assert(meck:called(fabric, all_docs, '_')). + t_execute_error(_) -> Cursor = #cursor{ @@ -1119,7 +1229,8 @@ view_cb_test_() -> ?TDEF_FE(t_view_cb_row_matching_covered_doc), ?TDEF_FE(t_view_cb_row_non_matching_covered_doc), ?TDEF_FE(t_view_cb_row_backwards_compatible), - ?TDEF_FE(t_view_cb_complete), + ?TDEF_FE(t_view_cb_complete_shard_stats_v1), + ?TDEF_FE(t_view_cb_complete_shard_stats_v2), ?TDEF_FE(t_view_cb_ok) ] }. @@ -1143,7 +1254,7 @@ t_view_cb_row_matching_regular_doc(_) -> }} ] }, - put(mango_docs_examined, 0), + mango_execution_stats:shard_init(), ?assertEqual({ok, Accumulator}, view_cb({row, Row}, Accumulator)), ?assert(meck:called(rexi, stream2, '_')). @@ -1161,7 +1272,7 @@ t_view_cb_row_non_matching_regular_doc(_) -> }} ] }, - put(mango_docs_examined, 0), + mango_execution_stats:shard_init(), put(mango_last_msg_timestamp, os:timestamp()), ?assertEqual({ok, Accumulator}, view_cb({row, Row}, Accumulator)), ?assertNot(meck:called(rexi, stream2, '_')). @@ -1179,6 +1290,7 @@ t_view_cb_row_null_doc(_) -> }} ] }, + mango_execution_stats:shard_init(), put(mango_last_msg_timestamp, os:timestamp()), ?assertEqual({ok, Accumulator}, view_cb({row, Row}, Accumulator)), ?assertNot(meck:called(rexi, stream2, '_')). @@ -1197,6 +1309,7 @@ t_view_cb_row_missing_doc_triggers_quorum_fetch(_) -> }} ] }, + mango_execution_stats:shard_init(), ?assertEqual({ok, Accumulator}, view_cb({row, Row}, Accumulator)), ?assert(meck:called(rexi, stream2, '_')). @@ -1222,6 +1335,7 @@ t_view_cb_row_matching_covered_doc(_) -> }} ] }, + mango_execution_stats:shard_init(), ?assertEqual({ok, Accumulator}, view_cb({row, Row}, Accumulator)), ?assert(meck:called(rexi, stream2, '_')). @@ -1244,6 +1358,7 @@ t_view_cb_row_non_matching_covered_doc(_) -> }} ] }, + mango_execution_stats:shard_init(), put(mango_last_msg_timestamp, os:timestamp()), ?assertEqual({ok, Accumulator}, view_cb({row, Row}, Accumulator)), ?assertNot(meck:called(rexi, stream2, '_')). @@ -1252,14 +1367,27 @@ t_view_cb_row_backwards_compatible(_) -> Row = [{id, id}, {key, key}, {doc, null}], meck:expect(rexi, stream2, ['_'], undefined), Accumulator = #mrargs{extra = [{selector, {[]}}]}, + mango_execution_stats:shard_init(), put(mango_last_msg_timestamp, os:timestamp()), ?assertEqual({ok, Accumulator}, view_cb({row, Row}, Accumulator)), ?assertNot(meck:called(rexi, stream2, '_')). -t_view_cb_complete(_) -> +t_view_cb_complete_shard_stats_v1(_) -> meck:expect(rexi, stream2, [{execution_stats, {docs_examined, '_'}}], meck:val(ok)), meck:expect(rexi, stream_last, [complete], meck:val(ok)), - ?assertEqual({ok, accumulator}, view_cb(complete, accumulator)), + Accumulator = #mrargs{}, + mango_execution_stats:shard_init(), + ?assertEqual({ok, Accumulator}, view_cb(complete, Accumulator)), + ?assert(meck:called(rexi, stream2, '_')), + ?assert(meck:called(rexi, stream_last, '_')). + +t_view_cb_complete_shard_stats_v2(_) -> + ShardStats = #{docs_examined => '_', keys_examined => '_'}, + meck:expect(rexi, stream2, [{execution_stats, ShardStats}], meck:val(ok)), + meck:expect(rexi, stream_last, [complete], meck:val(ok)), + Accumulator = #mrargs{extra = [{execution_stats_map, true}]}, + mango_execution_stats:shard_init(), + ?assertEqual({ok, Accumulator}, view_cb(complete, Accumulator)), ?assert(meck:called(rexi, stream2, '_')), ?assert(meck:called(rexi, stream_last, '_')). @@ -1323,7 +1451,8 @@ handle_message_test_() -> ?TDEF_FE(t_handle_message_row_ok_triggers_quorum_fetch_no_match), ?TDEF_FE(t_handle_message_row_no_match), ?TDEF_FE(t_handle_message_row_error), - ?TDEF_FE(t_handle_message_execution_stats), + ?TDEF_FE(t_handle_message_execution_stats_v1), + ?TDEF_FE(t_handle_message_execution_stats_v2), ?TDEF_FE(t_handle_message_complete), ?TDEF_FE(t_handle_message_error) ] @@ -1455,7 +1584,7 @@ t_handle_message_row_error(_) -> meck:delete(mango_util, defer, 3), meck:delete(couch_log, error, 2). -t_handle_message_execution_stats(_) -> +t_handle_message_execution_stats_v1(_) -> ShardStats = {docs_examined, 42}, ExecutionStats = #execution_stats{totalDocsExamined = 11}, ExecutionStats1 = #execution_stats{totalDocsExamined = 53}, @@ -1463,6 +1592,14 @@ t_handle_message_execution_stats(_) -> Cursor1 = #cursor{execution_stats = ExecutionStats1}, ?assertEqual({ok, Cursor1}, handle_message({execution_stats, ShardStats}, Cursor)). +t_handle_message_execution_stats_v2(_) -> + ShardStats = #{docs_examined => 42, keys_examined => 53}, + ExecutionStats = #execution_stats{totalDocsExamined = 11, totalKeysExamined = 22}, + ExecutionStats1 = #execution_stats{totalDocsExamined = 53, totalKeysExamined = 75}, + Cursor = #cursor{execution_stats = ExecutionStats}, + Cursor1 = #cursor{execution_stats = ExecutionStats1}, + ?assertEqual({ok, Cursor1}, handle_message({execution_stats, ShardStats}, Cursor)). + t_handle_message_complete(_) -> ?assertEqual({ok, cursor}, handle_message(complete, cursor)). |