From bc3e230523e4677e2f3fed64ea89c369182a9272 Mon Sep 17 00:00:00 2001 From: Charlie Swanson Date: Mon, 28 Aug 2017 15:10:42 -0400 Subject: SERVER-30704 Use ARM to merge agg cursors on mongos. --- ...aggregation_sharded_collections_passthrough.yml | 1 - ...harding_last_stable_mongos_and_mixed_shards.yml | 2 + jstests/aggregation/bugs/server6118.js | 39 ---- jstests/aggregation/bugs/server6125.js | 4 +- jstests/aggregation/bugs/sort_arrays.js | 16 ++ jstests/aggregation/mongos_merge.js | 19 +- jstests/aggregation/use_query_project_and_sort.js | 72 +++++++ jstests/aggregation/use_query_projection.js | 80 ++++++++ jstests/aggregation/use_query_sort.js | 86 ++++++++ jstests/libs/analyze_plan.js | 42 ++++ jstests/sharding/agg_sort.js | 225 +++++++++++++++++++++ src/mongo/db/pipeline/dependencies.cpp | 50 +++-- src/mongo/db/pipeline/dependencies.h | 28 ++- src/mongo/db/pipeline/dependencies_test.cpp | 5 +- src/mongo/db/pipeline/document.cpp | 21 +- src/mongo/db/pipeline/document.h | 25 ++- src/mongo/db/pipeline/document_internal.h | 16 ++ src/mongo/db/pipeline/document_source.h | 2 +- src/mongo/db/pipeline/document_source_limit.cpp | 4 +- src/mongo/db/pipeline/document_source_limit.h | 43 ++-- src/mongo/db/pipeline/document_source_match.cpp | 13 +- src/mongo/db/pipeline/document_source_match.h | 6 +- .../db/pipeline/document_source_match_test.cpp | 6 +- .../db/pipeline/document_source_merge_cursors.cpp | 8 +- .../db/pipeline/document_source_merge_cursors.h | 11 +- src/mongo/db/pipeline/document_source_sample.cpp | 8 +- src/mongo/db/pipeline/document_source_sample.h | 6 +- .../document_source_sample_from_random_cursor.cpp | 5 + src/mongo/db/pipeline/document_source_skip.cpp | 4 +- src/mongo/db/pipeline/document_source_skip.h | 36 ++-- src/mongo/db/pipeline/document_source_sort.cpp | 177 ++++++++++++---- src/mongo/db/pipeline/document_source_sort.h | 39 +++- src/mongo/db/pipeline/pipeline.cpp | 22 +- src/mongo/db/pipeline/pipeline.h | 6 + src/mongo/db/pipeline/pipeline_d.cpp | 65 +++--- src/mongo/db/query/query_planner.cpp | 20 +- src/mongo/s/commands/cluster_aggregate.cpp | 13 +- src/mongo/s/query/SConscript | 6 +- src/mongo/s/query/async_results_merger.cpp | 11 +- src/mongo/s/query/async_results_merger.h | 8 +- src/mongo/s/query/cluster_client_cursor_impl.cpp | 99 +++++++-- src/mongo/s/query/router_exec_stage.h | 27 ++- .../s/query/router_stage_aggregation_merge.cpp | 79 -------- src/mongo/s/query/router_stage_aggregation_merge.h | 62 ------ src/mongo/s/query/router_stage_limit.cpp | 12 -- src/mongo/s/query/router_stage_limit.h | 6 - src/mongo/s/query/router_stage_merge.cpp | 2 +- src/mongo/s/query/router_stage_merge.h | 3 +- src/mongo/s/query/router_stage_mock.cpp | 2 +- src/mongo/s/query/router_stage_mock.h | 5 +- src/mongo/s/query/router_stage_pipeline.cpp | 135 +++++++++++++ src/mongo/s/query/router_stage_pipeline.h | 63 ++++++ .../query/router_stage_remove_metadata_fields.cpp | 89 ++++++++ .../s/query/router_stage_remove_metadata_fields.h | 54 +++++ .../router_stage_remove_metadata_fields_test.cpp | 196 ++++++++++++++++++ src/mongo/s/query/router_stage_remove_sortkey.cpp | 75 ------- src/mongo/s/query/router_stage_remove_sortkey.h | 53 ----- .../s/query/router_stage_remove_sortkey_test.cpp | 171 ---------------- src/mongo/s/query/router_stage_skip.cpp | 12 -- src/mongo/s/query/router_stage_skip.h | 6 - src/mongo/s/query/store_possible_cursor.cpp | 9 +- 61 files changed, 1668 insertions(+), 742 deletions(-) delete mode 100644 jstests/aggregation/bugs/server6118.js create mode 100644 
jstests/aggregation/bugs/sort_arrays.js create mode 100644 jstests/aggregation/use_query_project_and_sort.js create mode 100644 jstests/aggregation/use_query_projection.js create mode 100644 jstests/aggregation/use_query_sort.js create mode 100644 jstests/sharding/agg_sort.js delete mode 100644 src/mongo/s/query/router_stage_aggregation_merge.cpp delete mode 100644 src/mongo/s/query/router_stage_aggregation_merge.h create mode 100644 src/mongo/s/query/router_stage_pipeline.cpp create mode 100644 src/mongo/s/query/router_stage_pipeline.h create mode 100644 src/mongo/s/query/router_stage_remove_metadata_fields.cpp create mode 100644 src/mongo/s/query/router_stage_remove_metadata_fields.h create mode 100644 src/mongo/s/query/router_stage_remove_metadata_fields_test.cpp delete mode 100644 src/mongo/s/query/router_stage_remove_sortkey.cpp delete mode 100644 src/mongo/s/query/router_stage_remove_sortkey.h delete mode 100644 src/mongo/s/query/router_stage_remove_sortkey_test.cpp diff --git a/buildscripts/resmokeconfig/suites/aggregation_sharded_collections_passthrough.yml b/buildscripts/resmokeconfig/suites/aggregation_sharded_collections_passthrough.yml index 16fcbfe29ad..c326b8e4875 100644 --- a/buildscripts/resmokeconfig/suites/aggregation_sharded_collections_passthrough.yml +++ b/buildscripts/resmokeconfig/suites/aggregation_sharded_collections_passthrough.yml @@ -8,7 +8,6 @@ selector: - jstests/aggregation/sources/*/*.js exclude_files: # The following tests start their own ShardingTest. - - jstests/aggregation/bugs/server6118.js - jstests/aggregation/bugs/server6179.js - jstests/aggregation/bugs/server7781.js - jstests/aggregation/mongos_merge.js diff --git a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml index 8bcc55f14f2..52937a2116a 100644 --- a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml +++ b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml @@ -4,6 +4,8 @@ selector: roots: - jstests/sharding/*.js exclude_files: + # SERVER-30899 We changed the meaning of 'fromRouter' - impacting sorts on metadata. 
+ - jstests/sharding/agg_sort.js # Doesn't use ShardingTest so won't actually be run in a mixed version configuration - jstests/sharding/config_version_rollback.js # Behavior change to addShard diff --git a/jstests/aggregation/bugs/server6118.js b/jstests/aggregation/bugs/server6118.js deleted file mode 100644 index af287661e42..00000000000 --- a/jstests/aggregation/bugs/server6118.js +++ /dev/null @@ -1,39 +0,0 @@ -// SERVER-6118: support for sharded sorts -(function() { - 'use strict'; - - var s = new ShardingTest({shards: 2}); - - assert.commandWorked(s.s0.adminCommand({enablesharding: "test"})); - s.ensurePrimaryShard('test', 'shard0001'); - assert.commandWorked(s.s0.adminCommand({shardcollection: "test.data", key: {_id: 1}})); - - var d = s.getDB("test"); - - // Insert _id values 0 - 99 - var N = 100; - - var bulkOp = d.data.initializeOrderedBulkOp(); - for (var i = 0; i < N; ++i) { - bulkOp.insert({_id: i}); - } - bulkOp.execute(); - - // Split the data into 3 chunks - assert.commandWorked(s.s0.adminCommand({split: "test.data", middle: {_id: 33}})); - assert.commandWorked(s.s0.adminCommand({split: "test.data", middle: {_id: 66}})); - - // Migrate the middle chunk to another shard - assert.commandWorked(s.s0.adminCommand( - {movechunk: "test.data", find: {_id: 50}, to: s.getOther(s.getPrimaryShard("test")).name})); - - // Check that the results are in order. - var result = d.data.aggregate({$sort: {_id: 1}}).toArray(); - printjson(result); - - for (var i = 0; i < N; ++i) { - assert.eq(i, result[i]._id); - } - - s.stop(); -})(); diff --git a/jstests/aggregation/bugs/server6125.js b/jstests/aggregation/bugs/server6125.js index bfc4f471318..592a560312a 100644 --- a/jstests/aggregation/bugs/server6125.js +++ b/jstests/aggregation/bugs/server6125.js @@ -56,8 +56,8 @@ function setupArray() { // Symbol not implemented in JS {_id: 5, a: {}, ty: "Object"}, {_id: 6, a: new DBRef("test.s6125", ObjectId("0102030405060708090A0B0C")), ty: "DBRef"}, - {_id: 7, a: [], ty: "Empty Array"}, - {_id: 8, a: [1, 2, "a", "B"], ty: "Array"}, + {_id: 7, a: [[]], ty: "Empty Array"}, + {_id: 8, a: [[1, 2, "a", "B"]], ty: "Array"}, {_id: 9, a: BinData(0, "77+9"), ty: "BinData"}, {_id: 10, a: new ObjectId("0102030405060708090A0B0C"), ty: "ObjectId"}, {_id: 11, a: true, ty: "Boolean"}, diff --git a/jstests/aggregation/bugs/sort_arrays.js b/jstests/aggregation/bugs/sort_arrays.js new file mode 100644 index 00000000000..47c27736b76 --- /dev/null +++ b/jstests/aggregation/bugs/sort_arrays.js @@ -0,0 +1,16 @@ +// Tests that sorting by a field that contains an array will sort by the minimum element in that +// array. 
+(function() { + "use strict"; + const coll = db.foo; + coll.drop(); + assert.writeOK(coll.insert([{_id: 2, a: [2, 3]}, {_id: 3, a: [2, 4]}, {_id: 4, a: [2, 1]}])); + const expectedOrder = [{_id: 4, a: [2, 1]}, {_id: 2, a: [2, 3]}, {_id: 3, a: [2, 4]}]; + + assert.eq(coll.aggregate([{$sort: {a: 1}}]).toArray(), expectedOrder); + assert.eq(coll.find().sort({a: 1}).toArray(), expectedOrder); + + assert.commandWorked(coll.ensureIndex({a: 1})); + assert.eq(coll.aggregate([{$sort: {a: 1}}]).toArray(), expectedOrder); + assert.eq(coll.find().sort({a: 1}).toArray(), expectedOrder); +}()); diff --git a/jstests/aggregation/mongos_merge.js b/jstests/aggregation/mongos_merge.js index fe13b337930..fd49ef4c7e5 100644 --- a/jstests/aggregation/mongos_merge.js +++ b/jstests/aggregation/mongos_merge.js @@ -229,5 +229,22 @@ expectedCount: 100 }); + // Test that a pipeline whose merging half can be run on mongos using only the mongos execution + // machinery returns the correct results. + assertMergeOnMongoS({ + testName: "agg_mongos_merge_all_mongos_runnable_skip_and_limit_stages", + pipeline: [ + {$match: {_id: {$gte: -5, $lte: 100}}}, + {$sort: {_id: -1}}, + {$skip: 5}, + {$limit: 10}, + {$skip: 5}, + {$limit: 1}, + ], + batchSize: 10, + expectedCount: 1 + }); + // TODO SERVER-30882 Find a way to assert that all stages get absorbed by mongos. + st.stop(); -})(); \ No newline at end of file +})(); diff --git a/jstests/aggregation/use_query_project_and_sort.js b/jstests/aggregation/use_query_project_and_sort.js new file mode 100644 index 00000000000..b5afc09a7cc --- /dev/null +++ b/jstests/aggregation/use_query_project_and_sort.js @@ -0,0 +1,72 @@ +// Tests that an aggregation that only needs a finite set of fields to do its computations, and has +// a $sort stage near the front of the pipeline can use the query system to provide a covered plan +// which only returns those fields, in the desired order, without fetching the full document. +// +// Relies on the ability to push leading $sorts down to the query system, so cannot wrap pipelines +// in $facet stages: +// @tags: [do_not_wrap_aggregations_in_facets] +(function() { + "use strict"; + + load("jstests/libs/analyze_plan.js"); // For 'aggPlanHasStage' and other explain helpers. 
+ const coll = db.use_query_project_and_sort; + coll.drop(); + + const bulk = coll.initializeUnorderedBulkOp(); + for (let i = 0; i < 100; ++i) { + bulk.insert({_id: i, x: "string", a: -i, y: i % 2}); + } + assert.writeOK(bulk.execute()); + + function assertQueryCoversProjectionAndSort(pipeline) { + const explainOutput = coll.explain().aggregate(pipeline); + assert(!aggPlanHasStage(explainOutput, "FETCH"), + "Expected pipeline " + tojsononeline(pipeline) + + " *not* to include a FETCH stage in the explain output: " + + tojson(explainOutput)); + assert(!aggPlanHasStage(explainOutput, "SORT"), + "Expected pipeline " + tojsononeline(pipeline) + + " *not* to include a SORT stage in the explain output: " + + tojson(explainOutput)); + assert(!aggPlanHasStage(explainOutput, "$sort"), + "Expected pipeline " + tojsononeline(pipeline) + + " *not* to include a $sort stage in the explain output: " + + tojson(explainOutput)); + assert(aggPlanHasStage(explainOutput, "IXSCAN"), + "Expected pipeline " + tojsononeline(pipeline) + + " to include an index scan in the explain output: " + tojson(explainOutput)); + assert(!hasRejectedPlans(explainOutput), + "Expected pipeline " + tojsononeline(pipeline) + + " not to have any rejected plans in the explain output: " + + tojson(explainOutput)); + return explainOutput; + } + + assert.commandWorked(coll.createIndex({x: 1, a: -1, _id: 1})); + + // Test that a pipeline requiring a subset of the fields in a compound index can use that index + // to cover the query. + assertQueryCoversProjectionAndSort( + [{$match: {x: "string"}}, {$sort: {x: 1}}, {$project: {_id: 0, x: 1}}]); + assertQueryCoversProjectionAndSort( + [{$match: {x: "string"}}, {$sort: {x: 1}}, {$project: {_id: 1, x: 1}}]); + assertQueryCoversProjectionAndSort( + [{$match: {x: "string"}}, {$sort: {x: -1, a: 1}}, {$project: {_id: 1, x: 1}}]); + assertQueryCoversProjectionAndSort( + [{$match: {x: "string"}}, {$sort: {x: 1, a: -1, _id: 1}}, {$project: {_id: 1}}]); + assertQueryCoversProjectionAndSort( + [{$match: {x: "string"}}, {$sort: {x: 1, a: -1, _id: 1}}, {$project: {_id: 1, x: 1}}]); + assertQueryCoversProjectionAndSort( + [{$match: {x: "string"}}, {$sort: {x: 1, a: -1, _id: 1}}, {$project: {_id: 1, a: 1}}]); + assertQueryCoversProjectionAndSort([ + {$match: {x: "string"}}, + {$sort: {x: 1, a: -1, _id: 1}}, + {$project: {_id: 0, a: 1, x: 1}} + ]); + assertQueryCoversProjectionAndSort([ + {$match: {x: "string"}}, + {$sort: {x: 1, a: -1, _id: 1}}, + {$project: {_id: 1, x: 1, a: 1}} + ]); +}()); diff --git a/jstests/aggregation/use_query_projection.js b/jstests/aggregation/use_query_projection.js new file mode 100644 index 00000000000..cf239a7e5d7 --- /dev/null +++ b/jstests/aggregation/use_query_projection.js @@ -0,0 +1,80 @@ +// Tests that an aggregation that only needs a finite set of fields to do its computations can +// sometimes use the query system to provide a covered plan which only returns those fields without +// fetching the full document. +// +// Relies on the initial $match being pushed into the query system in order for the planner to +// consider an index scan, so the pipelines cannot be wrapped in facet stages. +// @tags: [do_not_wrap_aggregations_in_facets] +(function() { + "use strict"; + + load("jstests/libs/analyze_plan.js"); // For 'aggPlanHasStage' and other explain helpers.
+ const coll = db.use_query_projection; + coll.drop(); + + const bulk = coll.initializeUnorderedBulkOp(); + for (let i = 0; i < 100; ++i) { + bulk.insert({_id: i, x: "string", a: -i, y: i % 2}); + } + assert.writeOK(bulk.execute()); + + function assertQueryCoversProjection(pipeline) { + const explainOutput = coll.explain().aggregate(pipeline); + assert(!aggPlanHasStage(explainOutput, "FETCH"), + "Expected pipeline " + tojsononeline(pipeline) + + " *not* to include a FETCH stage in the explain output: " + + tojson(explainOutput)); + assert(aggPlanHasStage(explainOutput, "IXSCAN"), + "Expected pipeline " + tojsononeline(pipeline) + + " to include an index scan in the explain output: " + tojson(explainOutput)); + assert(!hasRejectedPlans(explainOutput), + "Expected pipeline " + tojsononeline(pipeline) + + " not to have any rejected plans in the explain output: " + + tojson(explainOutput)); + return explainOutput; + } + + function assertQueryDoesNotCoverProjection(pipeline) { + const explainOutput = coll.explain().aggregate(pipeline); + assert(aggPlanHasStage(explainOutput, "FETCH") || aggPlanHasStage(explainOutput, "COLLSCAN"), + "Expected pipeline " + tojsononeline(pipeline) + + " to include a FETCH or COLLSCAN stage in the explain output: " + + tojson(explainOutput)); + assert(!hasRejectedPlans(explainOutput), + "Expected pipeline " + tojsononeline(pipeline) + + " not to have any rejected plans in the explain output: " + + tojson(explainOutput)); + return explainOutput; + } + + assert.commandWorked(coll.createIndex({x: 1, a: -1, _id: 1})); + + // Test that a pipeline requiring a subset of the fields in a compound index can use that index + // to cover the query. + assertQueryCoversProjection([{$match: {x: "string"}}, {$project: {_id: 1, x: 1}}]); + assertQueryCoversProjection([{$match: {x: "string"}}, {$project: {_id: 0, x: 1}}]); + assertQueryCoversProjection([{$match: {x: "string"}}, {$project: {_id: 0, x: 1, a: 1}}]); + assertQueryCoversProjection([{$match: {x: "string"}}, {$project: {_id: 1, x: 1, a: 1}}]); + assertQueryCoversProjection( + [{$match: {_id: 0, x: "string"}}, {$project: {_id: 1, x: 1, a: 1}}]); + + // Test that a pipeline requiring a field that is not in the index cannot use a covered plan. + assertQueryDoesNotCoverProjection([{$match: {x: "string"}}, {$project: {notThere: 1}}]); + + // Test that a covered plan is the only plan considered, even if another plan would be equally + // selective. Add an equally selective index, then rely on assertQueryCoversProjection() to + // assert that there is only one considered plan, and it is a covered plan. + assert.commandWorked(coll.createIndex({x: 1})); + assertQueryCoversProjection([ + {$match: {_id: 0, x: "string"}}, + {$sort: {x: 1, a: 1}}, // Note: not indexable, but doesn't add any additional dependencies. + {$project: {_id: 1, x: 1, a: 1}}, + ]); + + // Test that a multikey index will prevent a covered plan. + assert.commandWorked(coll.dropIndex({x: 1})); // Make sure there is only one plan considered.
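+ // Inserting a document where 'x' is an array marks the index {x: 1, a: -1, _id: 1} as + // multikey, so it can no longer cover a projection that includes 'x'.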
+ assert.writeOK(coll.insert({x: ["an", "array!"]})); + assertQueryDoesNotCoverProjection([{$match: {x: "string"}}, {$project: {_id: 1, x: 1}}]); + assertQueryDoesNotCoverProjection([{$match: {x: "string"}}, {$project: {_id: 1, x: 1, a: 1}}]); +}()); diff --git a/jstests/aggregation/use_query_sort.js b/jstests/aggregation/use_query_sort.js new file mode 100644 index 00000000000..7d0e1b74f36 --- /dev/null +++ b/jstests/aggregation/use_query_sort.js @@ -0,0 +1,86 @@ +// Tests that an aggregation with a $sort near the front of the pipeline can sometimes use the query +// system to provide the sort. +// +// Relies on the ability to push leading $sorts down to the query system, so cannot wrap pipelines +// in $facet stages: +// @tags: [do_not_wrap_aggregations_in_facets] +(function() { + "use strict"; + + load("jstests/libs/analyze_plan.js"); // For 'aggPlanHasStage' and other explain helpers. + + const coll = db.use_query_sort; + coll.drop(); + + const bulk = coll.initializeUnorderedBulkOp(); + for (let i = 0; i < 100; ++i) { + bulk.insert({_id: i, x: "string", a: -i, y: i % 2}); + } + assert.writeOK(bulk.execute()); + + function assertHasNonBlockingQuerySort(pipeline) { + const explainOutput = coll.explain().aggregate(pipeline); + assert(!aggPlanHasStage(explainOutput, "$sort"), + "Expected pipeline " + tojsononeline(pipeline) + + " *not* to include a $sort stage in the explain output: " + + tojson(explainOutput)); + assert(!aggPlanHasStage(explainOutput, "SORT"), + "Expected pipeline " + tojsononeline(pipeline) + + " *not* to include a SORT stage in the explain output: " + + tojson(explainOutput)); + assert(aggPlanHasStage(explainOutput, "IXSCAN"), + "Expected pipeline " + tojsononeline(pipeline) + + " to include an index scan in the explain output: " + tojson(explainOutput)); + assert(!hasRejectedPlans(explainOutput), + "Expected pipeline " + tojsononeline(pipeline) + + " not to have any rejected plans in the explain output: " + + tojson(explainOutput)); + return explainOutput; + } + + function assertDoesNotHaveQuerySort(pipeline) { + const explainOutput = coll.explain().aggregate(pipeline); + assert(aggPlanHasStage(explainOutput, "$sort"), + "Expected pipeline " + tojsononeline(pipeline) + + " to include a $sort stage in the explain output: " + tojson(explainOutput)); + assert(!aggPlanHasStage(explainOutput, "SORT"), + "Expected pipeline " + tojsononeline(pipeline) + + " *not* to include a SORT stage in the explain output: " + + tojson(explainOutput)); + assert(!hasRejectedPlans(explainOutput), + "Expected pipeline " + tojsononeline(pipeline) + + " not to have any rejected plans in the explain output: " + + tojson(explainOutput)); + return explainOutput; + } + + // Test that a sort on the _id can use the query system to provide the sort. + assertHasNonBlockingQuerySort([{$sort: {_id: -1}}]); + assertHasNonBlockingQuerySort([{$sort: {_id: 1}}]); + assertHasNonBlockingQuerySort([{$match: {_id: {$gte: 50}}}, {$sort: {_id: 1}}]); + assertHasNonBlockingQuerySort([{$match: {_id: {$gte: 50}}}, {$sort: {_id: -1}}]); + + // Test that a sort on a field not in any index cannot use a query system sort, and thus still + // has a $sort stage. 
+ assertDoesNotHaveQuerySort([{$sort: {x: -1}}]); + assertDoesNotHaveQuerySort([{$sort: {x: 1}}]); + assertDoesNotHaveQuerySort([{$match: {_id: {$gte: 50}}}, {$sort: {x: 1}}]); + + assert.commandWorked(coll.createIndex({x: 1, y: -1})); + + assertHasNonBlockingQuerySort([{$sort: {x: 1, y: -1}}]); + assertHasNonBlockingQuerySort([{$sort: {x: 1}}]); + assertDoesNotHaveQuerySort([{$sort: {y: 1}}]); + assertDoesNotHaveQuerySort([{$sort: {x: 1, y: 1}}]); + + // Test that a $match on a field not present in the same index eligible to provide a sort can + // still result in an index scan on the sort field (SERVER-7568). + assertHasNonBlockingQuerySort([{$match: {_id: {$gte: 50}}}, {$sort: {x: 1}}]); + + // Test that a sort on the text score does not use the query system to provide the sort, since + // it would need to be a blocking sort, and we prefer the $sort stage to the query system's sort + // implementation. + assert.commandWorked(coll.createIndex({x: "text"})); + assertDoesNotHaveQuerySort( + [{$match: {$text: {$search: "test"}}}, {$sort: {key: {$meta: "textScore"}}}]); +}()); diff --git a/jstests/libs/analyze_plan.js b/jstests/libs/analyze_plan.js index f08ccc330d7..1c8ca223274 100644 --- a/jstests/libs/analyze_plan.js +++ b/jstests/libs/analyze_plan.js @@ -55,6 +55,48 @@ function getPlanStage(root, stage) { } } +/** + * Given the root stage of explain's JSON representation of a query plan ('root'), returns true if + * the query planner reports at least one rejected alternative plan, and false otherwise. + */ +function hasRejectedPlans(root) { + function sectionHasRejectedPlans(explainSection) { + assert(explainSection.hasOwnProperty("rejectedPlans"), tojson(explainSection)); + return explainSection.rejectedPlans.length !== 0; + } + + function cursorStageHasRejectedPlans(cursorStage) { + assert(cursorStage.hasOwnProperty("$cursor"), tojson(cursorStage)); + assert(cursorStage.$cursor.hasOwnProperty("queryPlanner"), tojson(cursorStage)); + return sectionHasRejectedPlans(cursorStage.$cursor.queryPlanner); + } + + if (root.hasOwnProperty("shards")) { + // This is a sharded agg explain. + const cursorStages = getAggPlanStages(root, "$cursor"); + assert(cursorStages.length !== 0, "Did not find any $cursor stages in sharded agg explain"); + return cursorStages.find((cursorStage) => cursorStageHasRejectedPlans(cursorStage)) !== + undefined; + } else if (root.hasOwnProperty("stages")) { + // This is an agg explain. + const cursorStages = getAggPlanStages(root, "$cursor"); + return cursorStages.find((cursorStage) => cursorStageHasRejectedPlans(cursorStage)) !== + undefined; + } else { + // This is some sort of query explain. + assert(root.hasOwnProperty("queryPlanner"), tojson(root)); + assert(root.queryPlanner.hasOwnProperty("winningPlan"), tojson(root)); + if (!root.queryPlanner.winningPlan.hasOwnProperty("shards")) { + // This is an unsharded explain. + return sectionHasRejectedPlans(root.queryPlanner); + } + // This is a sharded explain. Each entry in the shards array contains a 'winningPlan' and + // 'rejectedPlans'. + return root.queryPlanner.shards.find((shard) => sectionHasRejectedPlans(shard)) !== + undefined; + } +} + /** * Given the root stage of agg explain's JSON representation of a query plan ('root'), returns all * subdocuments whose stage is 'stage'.
This can either be an agg stage name like "$cursor" or diff --git a/jstests/sharding/agg_sort.js b/jstests/sharding/agg_sort.js new file mode 100644 index 00000000000..bd12565dd20 --- /dev/null +++ b/jstests/sharding/agg_sort.js @@ -0,0 +1,225 @@ +// Tests that the sort order is obeyed when an aggregation requests sorted results that are +// scattered across multiple shards. +(function() { + 'use strict'; + + const shardingTest = new ShardingTest({shards: 2}); + + const db = shardingTest.getDB("test"); + const coll = db.sharded_agg_sort; + coll.drop(); + + assert.commandWorked(shardingTest.s0.adminCommand({enableSharding: db.getName()})); + shardingTest.ensurePrimaryShard(db.getName(), 'shard0001'); + assert.commandWorked( + shardingTest.s0.adminCommand({shardCollection: coll.getFullName(), key: {_id: 1}})); + + const nDocs = 10; + const yValues = [ + "abc", + "ABC", + null, + 1, + NumberLong(2), + NumberDecimal(-20), + MinKey, + MaxKey, + BinData(0, ""), + [3, 4], + ]; + const bulkOp = coll.initializeOrderedBulkOp(); + for (var i = 0; i < nDocs; ++i) { + bulkOp.insert({_id: i, x: Math.floor(i / 2), y: yValues[i]}); + } + assert.writeOK(bulkOp.execute()); + + // Split the data into 3 chunks + assert.commandWorked( + shardingTest.s0.adminCommand({split: coll.getFullName(), middle: {_id: 3}})); + assert.commandWorked( + shardingTest.s0.adminCommand({split: coll.getFullName(), middle: {_id: 6}})); + + // Migrate the middle chunk to another shard + assert.commandWorked(shardingTest.s0.adminCommand({ + movechunk: coll.getFullName(), + find: {_id: 5}, + to: shardingTest.getOther(shardingTest.getPrimaryShard(db.getName())).name + })); + + function assertResultsEqual({actual, expected}) { + const resultsAsString = " actual: " + tojson(actual) + "\n expected: " + tojson(expected); + assert.eq( + actual.length, expected.length, `different number of results:\n${resultsAsString}`); + for (let i = 0; i < actual.length; i++) { + assert.eq( + actual[i], expected[i], `different results at index ${i}:\n${resultsAsString}`); + } + } + + function testSorts() { + // Test a basic sort by _id. + assertResultsEqual({ + actual: coll.aggregate([{$sort: {_id: 1}}]).toArray(), + expected: [ + {_id: 0, x: 0, y: "abc"}, + {_id: 1, x: 0, y: "ABC"}, + {_id: 2, x: 1, y: null}, + {_id: 3, x: 1, y: 1}, + {_id: 4, x: 2, y: NumberLong(2)}, + {_id: 5, x: 2, y: NumberDecimal(-20)}, + {_id: 6, x: 3, y: MinKey}, + {_id: 7, x: 3, y: MaxKey}, + {_id: 8, x: 4, y: BinData(0, "")}, + {_id: 9, x: 4, y: [3, 4]}, + ], + }); + assertResultsEqual({ + actual: coll.aggregate([{$sort: {_id: 1}}, {$project: {_id: 1}}]).toArray(), + expected: new Array(nDocs).fill().map(function(_, index) { + return {_id: index}; + }), + }); + + // Test a compound sort.
+ assertResultsEqual({ + actual: coll.aggregate([{$sort: {x: 1, y: 1}}]).toArray(), + expected: [ + {_id: 1, x: 0, y: "ABC"}, + {_id: 0, x: 0, y: "abc"}, + {_id: 2, x: 1, y: null}, + {_id: 3, x: 1, y: 1}, + {_id: 5, x: 2, y: NumberDecimal(-20)}, + {_id: 4, x: 2, y: NumberLong(2)}, + {_id: 6, x: 3, y: MinKey}, + {_id: 7, x: 3, y: MaxKey}, + {_id: 9, x: 4, y: [3, 4]}, + {_id: 8, x: 4, y: BinData(0, "")}, + ], + }); + assertResultsEqual({ + actual: + coll.aggregate([{$sort: {x: 1, y: 1}}, {$project: {_id: 0, x: 1, y: 1}}]).toArray(), + expected: [ + {x: 0, y: "ABC"}, + {x: 0, y: "abc"}, + {x: 1, y: null}, + {x: 1, y: 1}, + {x: 2, y: NumberDecimal(-20)}, + {x: 2, y: NumberLong(2)}, + {x: 3, y: MinKey}, + {x: 3, y: MaxKey}, + {x: 4, y: [3, 4]}, + {x: 4, y: BinData(0, "")}, + ], + }); + + // Test a compound sort with a missing field. + assertResultsEqual({ + actual: coll.aggregate({$sort: {missing: -1, x: 1, _id: -1}}).toArray(), + expected: [ + {_id: 1, x: 0, y: "ABC"}, + {_id: 0, x: 0, y: "abc"}, + {_id: 3, x: 1, y: 1}, + {_id: 2, x: 1, y: null}, + {_id: 5, x: 2, y: NumberDecimal(-20)}, + {_id: 4, x: 2, y: NumberLong(2)}, + {_id: 7, x: 3, y: MaxKey}, + {_id: 6, x: 3, y: MinKey}, + {_id: 9, x: 4, y: [3, 4]}, + {_id: 8, x: 4, y: BinData(0, "")}, + ] + }); + } + testSorts(); + assert.commandWorked(coll.createIndex({x: 1})); + testSorts(); + assert.commandWorked(coll.createIndex({x: 1, y: 1})); + testSorts(); + assert.commandWorked(coll.createIndex({missing: 1, x: -1})); + testSorts(); + assert.commandWorked(coll.createIndex({missing: -1, x: 1, _id: -1})); + testSorts(); + + // Test that a sort including the text score is merged properly in a sharded cluster. + const textColl = db.sharded_agg_sort_text; + + assert.commandWorked( + shardingTest.s0.adminCommand({shardCollection: textColl.getFullName(), key: {_id: 1}})); + + assert.writeOK(textColl.insert([ + {_id: 0, text: "apple"}, + {_id: 1, text: "apple orange banana apple"}, + {_id: 2, text: "apple orange"}, + {_id: 3, text: "apple orange banana apple apple banana"}, + {_id: 4, text: "apple orange banana"}, + {_id: 5, text: "apple orange banana apple apple"}, + ])); + + // Split the data into 3 chunks + assert.commandWorked( + shardingTest.s0.adminCommand({split: textColl.getFullName(), middle: {_id: 2}})); + assert.commandWorked( + shardingTest.s0.adminCommand({split: textColl.getFullName(), middle: {_id: 4}})); + + // Migrate the middle chunk to another shard + assert.commandWorked(shardingTest.s0.adminCommand({ + movechunk: textColl.getFullName(), + find: {_id: 3}, + to: shardingTest.getOther(shardingTest.getPrimaryShard(db.getName())).name + })); + + assert.commandWorked(textColl.createIndex({text: "text"})); + assertResultsEqual({ + actual: textColl + .aggregate([ + {$match: {$text: {$search: "apple banana orange"}}}, + {$sort: {x: {$meta: "textScore"}}} + ]) + .toArray(), + expected: [ + {_id: 3, text: "apple orange banana apple apple banana"}, + {_id: 5, text: "apple orange banana apple apple"}, + {_id: 1, text: "apple orange banana apple"}, + {_id: 4, text: "apple orange banana"}, + {_id: 2, text: "apple orange"}, + {_id: 0, text: "apple"}, + ], + }); + + function assertSortedByMetaField(results) { + for (let i = 0; i < results.length - 1; ++i) { + assert(results[i].hasOwnProperty("meta"), + `Expected all results to have "meta" field, found one without it at index ${i}`); + assert.gte( + results[i].meta, + results[i + 1].meta, + `Expected results to be sorted by "meta" field, descending. 
Detected unsorted` + + ` results at index ${i}, entire result set: ${tojson(results)}`); + } + } + + assertSortedByMetaField(textColl + .aggregate([ + {$match: {$text: {$search: "apple banana orange"}}}, + {$sort: {x: {$meta: "textScore"}}}, + {$project: {_id: 0, meta: {$meta: "textScore"}}}, + ]) + .toArray()); + + assertSortedByMetaField(textColl + .aggregate([ + {$match: {$text: {$search: "apple banana orange"}}}, + {$project: {_id: 0, meta: {$meta: "textScore"}}}, + {$sort: {meta: -1}}, + ]) + .toArray()); + + assertSortedByMetaField(textColl + .aggregate([ + {$sample: {size: 10}}, + {$project: {_id: 0, meta: {$meta: "randVal"}}}, + ]) + .toArray()); + + shardingTest.stop(); +})(); diff --git a/src/mongo/db/pipeline/dependencies.cpp b/src/mongo/db/pipeline/dependencies.cpp index bb6b7175ae5..a1fc3b88a19 100644 --- a/src/mongo/db/pipeline/dependencies.cpp +++ b/src/mongo/db/pipeline/dependencies.cpp @@ -41,34 +41,40 @@ using std::vector; namespace str = mongoutils::str; -BSONObj DepsTracker::toProjection() const { - if (fields.empty() && !needWholeDocument) { - if (_needTextScore) { - // We only need the text score, but there is no easy way to express this in the query - // projection language. We use $noFieldsNeeded with a textScore meta-projection since - // this is an inclusion projection which will exclude all existing fields but add the - // textScore metadata. - return BSON("_id" << 0 << "$noFieldsNeeded" << 1 << Document::metaFieldTextScore - << BSON("$meta" - << "textScore")); - } else { - // We truly need no information (we are doing a count or something similar). In this - // case, the DocumentSourceCursor will know there aren't any dependencies, and we can - // ignore the documents returned from the query system. We pass an empty object as the - // projection so that we have a chance of using the COUNT_SCAN optimization. - return BSONObj(); - } +bool DepsTracker::_appendMetaProjections(BSONObjBuilder* projectionBuilder) const { + if (_needTextScore) { + projectionBuilder->append(Document::metaFieldTextScore, + BSON("$meta" + << "textScore")); } + if (_needSortKey) { + projectionBuilder->append(Document::metaFieldSortKey, + BSON("$meta" + << "sortKey")); + } + return (_needTextScore || _needSortKey); +} +BSONObj DepsTracker::toProjection() const { BSONObjBuilder bb; - if (_needTextScore) - bb.append(Document::metaFieldTextScore, - BSON("$meta" - << "textScore")); + const bool needsMetadata = _appendMetaProjections(&bb); - if (needWholeDocument) + if (needWholeDocument) { return bb.obj(); + } + + if (fields.empty()) { + if (needsMetadata) { + // We only need metadata, but there is no easy way to express this in the query + // projection language. We use $noFieldsNeeded with a meta-projection since this is an + // inclusion projection which will exclude all existing fields but add the metadata. + bb.append("_id", 0); + bb.append("$noFieldsNeeded", 1); + } + // We either need nothing (as we would if this was logically a count), or only the metadata. 
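+ // For example, if only the text score is needed, the resulting projection is + // {$textScore: {$meta: "textScore"}, _id: 0, $noFieldsNeeded: 1}.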
+ return bb.obj(); + } bool needId = false; string last; diff --git a/src/mongo/db/pipeline/dependencies.h b/src/mongo/db/pipeline/dependencies.h index 9afd9adbaec..19cf3ee33f8 100644 --- a/src/mongo/db/pipeline/dependencies.h +++ b/src/mongo/db/pipeline/dependencies.h @@ -47,7 +47,7 @@ struct DepsTracker { enum MetadataAvailable { kNoMetadata = 0, kTextScore = 1 }; DepsTracker(MetadataAvailable metadataAvailable = kNoMetadata) - : needWholeDocument(false), _metadataAvailable(metadataAvailable), _needTextScore(false) {} + : _metadataAvailable(metadataAvailable) {} /** * Returns a projection object covering the dependencies tracked by this class. @@ -56,10 +56,6 @@ struct DepsTracker { boost::optional<ParsedDeps> toParsedDeps() const; - std::set<std::string> fields; // names of needed fields in dotted notation - bool needWholeDocument; // if true, ignore fields and assume the whole document is needed - - bool hasNoRequirements() const { return fields.empty() && !needWholeDocument && !_needTextScore; } @@ -85,9 +81,29 @@ struct DepsTracker { _needTextScore = needTextScore; } + bool getNeedSortKey() const { + return _needSortKey; + } + + void setNeedSortKey(bool needSortKey) { + // We don't expect to ever unset '_needSortKey'. + invariant(!_needSortKey || needSortKey); + _needSortKey = needSortKey; + } + + std::set<std::string> fields; // The names of needed fields in dotted notation. + bool needWholeDocument = false; // If true, ignore 'fields' and assume the whole document is + // needed. private: + /** + * Appends the meta projections for the sort key and/or text score to 'bb' if necessary. Returns + * true if either type of metadata was needed, and false otherwise. + */ + bool _appendMetaProjections(BSONObjBuilder* bb) const; + MetadataAvailable _metadataAvailable; - bool _needTextScore; + bool _needTextScore = false; // if true, add a {$meta: "textScore"} to the projection. + bool _needSortKey = false; // if true, add a {$meta: "sortKey"} to the projection.
}; /** diff --git a/src/mongo/db/pipeline/dependencies_test.cpp b/src/mongo/db/pipeline/dependencies_test.cpp index 5bfd008bbfe..cb33e1fcc9f 100644 --- a/src/mongo/db/pipeline/dependencies_test.cpp +++ b/src/mongo/db/pipeline/dependencies_test.cpp @@ -145,8 +145,9 @@ TEST(DependenciesToProjectionTest, ShouldAttemptToExcludeOtherFieldsIfOnlyTextSc deps.needWholeDocument = false; deps.setNeedTextScore(true); ASSERT_BSONOBJ_EQ(deps.toProjection(), - BSON("_id" << 0 << "$noFieldsNeeded" << 1 << Document::metaFieldTextScore - << metaTextScore)); + BSON(Document::metaFieldTextScore << metaTextScore << "_id" << 0 + << "$noFieldsNeeded" + << 1)); } TEST(DependenciesToProjectionTest, diff --git a/src/mongo/db/pipeline/document.cpp b/src/mongo/db/pipeline/document.cpp index 835ae1bc716..4f531fe72f3 100644 --- a/src/mongo/db/pipeline/document.cpp +++ b/src/mongo/db/pipeline/document.cpp @@ -45,6 +45,9 @@ using std::vector; const DocumentStorage DocumentStorage::kEmptyDoc; +const std::vector<StringData> Document::allMetadataFieldNames = { + Document::metaFieldTextScore, Document::metaFieldRandVal, Document::metaFieldSortKey}; + Position DocumentStorage::findField(StringData requested) const { int reqSize = requested.size(); // get size calculation out of the way if needed @@ -201,6 +204,7 @@ intrusive_ptr<DocumentStorage> DocumentStorage::clone() const { out->_metaFields = _metaFields; out->_textScore = _textScore; out->_randVal = _randVal; + out->_sortKey = _sortKey.getOwned(); // Tell values that they have been memcpyed (updates ref counts) for (DocumentStorageIterator it = out->iteratorAll(); !it.atEnd(); it.advance()) { @@ -265,8 +269,9 @@ BSONObj Document::toBson() const { return bb.obj(); } -const StringData Document::metaFieldTextScore("$textScore"_sd); -const StringData Document::metaFieldRandVal("$randVal"_sd); +constexpr StringData Document::metaFieldTextScore; +constexpr StringData Document::metaFieldRandVal; +constexpr StringData Document::metaFieldSortKey; BSONObj Document::toBsonWithMetaData() const { BSONObjBuilder bb; @@ -275,6 +280,8 @@ BSONObj Document::toBsonWithMetaData() const { bb.append(metaFieldTextScore, getTextScore()); if (hasRandMetaField()) bb.append(metaFieldRandVal, getRandMetaField()); + if (hasSortKeyMetaField()) + bb.append(metaFieldSortKey, getSortKeyMetaField()); return bb.obj(); } @@ -292,6 +299,9 @@ Document Document::fromBsonWithMetaData(const BSONObj& bson) { } else if (fieldName == metaFieldRandVal) { md.setRandMetaField(elem.Double()); continue; + } else if (fieldName == metaFieldSortKey) { + md.setSortKeyMetaField(elem.Obj()); + continue; + } } } @@ -465,6 +475,10 @@ void Document::serializeForSorter(BufBuilder& buf) const { buf.appendNum(char(DocumentStorage::MetaType::RAND_VAL + 1)); buf.appendNum(getRandMetaField()); } + if (hasSortKeyMetaField()) { + buf.appendNum(char(DocumentStorage::MetaType::SORT_KEY + 1)); + getSortKeyMetaField().appendSelfToBufBuilder(buf); + } buf.appendNum(char(0)); } @@ -481,6 +495,9 @@ Document Document::deserializeForSorter(BufReader& buf, const SorterDeserializeS doc.setTextScore(buf.read<LittleEndian<double>>()); } else if (marker == char(DocumentStorage::MetaType::RAND_VAL) + 1) { doc.setRandMetaField(buf.read<LittleEndian<double>>()); + } else if (marker == char(DocumentStorage::MetaType::SORT_KEY) + 1) { + doc.setSortKeyMetaField( + BSONObj::deserializeForSorter(buf, BSONObj::SorterDeserializeSettings())); } else { uasserted(28744, "Unrecognized marker, unable to deserialize buffer"); } diff --git a/src/mongo/db/pipeline/document.h b/src/mongo/db/pipeline/document.h index 5e5980f5a51..a8ebfd1c4d2
100644 --- a/src/mongo/db/pipeline/document.h +++ b/src/mongo/db/pipeline/document.h @@ -90,6 +90,12 @@ public: const Document& rhs; }; + static constexpr StringData metaFieldTextScore = "$textScore"_sd; + static constexpr StringData metaFieldRandVal = "$randVal"_sd; + static constexpr StringData metaFieldSortKey = "$sortKey"_sd; + + static const std::vector<StringData> allMetadataFieldNames; + /// Empty Document (does no allocation) Document() {} @@ -211,6 +217,12 @@ public: */ static Document fromBsonWithMetaData(const BSONObj& bson); + /** + * Given a BSON object that may have metadata fields added as part of toBsonWithMetadata(), + * returns the same object without any of the metadata fields. + */ + static BSONObj stripMetadataFields(const BSONObj& bsonWithMetadata); + // Support BSONObjBuilder and BSONArrayBuilder "stream" API friend BSONObjBuilder& operator<<(BSONObjBuilderValueStream& builder, const Document& d); @@ -233,7 +245,6 @@ public: return Document(storage().clone().get()); } - static const StringData metaFieldTextScore; // "$textScore" bool hasTextScore() const { return storage().hasTextScore(); } @@ -241,7 +252,6 @@ public: return storage().getTextScore(); } - static const StringData metaFieldRandVal; // "$randVal" bool hasRandMetaField() const { return storage().hasRandMetaField(); } @@ -249,6 +259,13 @@ public: return storage().getRandMetaField(); } + bool hasSortKeyMetaField() const { + return storage().hasSortKeyMetaField(); + } + BSONObj getSortKeyMetaField() const { + return storage().getSortKeyMetaField(); + } + /// members for Sorter struct SorterDeserializeSettings {}; // unused void serializeForSorter(BufBuilder& buf) const; @@ -493,6 +510,10 @@ public: storage().setRandMetaField(val); } + void setSortKeyMetaField(BSONObj sortKey) { + storage().setSortKeyMetaField(sortKey); + } + /** Convert to a read-only document and release reference. * * Call this to indicate that you are done with this Document and will diff --git a/src/mongo/db/pipeline/document_internal.h b/src/mongo/db/pipeline/document_internal.h index 2fd3c78cf34..baa68e60658 100644 --- a/src/mongo/db/pipeline/document_internal.h +++ b/src/mongo/db/pipeline/document_internal.h @@ -196,6 +196,7 @@ public: enum MetaType : char { TEXT_SCORE, RAND_VAL, + SORT_KEY, NUM_FIELDS }; @@ -280,6 +281,9 @@ public: if (source.hasRandMetaField()) { setRandMetaField(source.getRandMetaField()); } + if (source.hasSortKeyMetaField()) { + setSortKeyMetaField(source.getSortKeyMetaField()); + } } bool hasTextScore() const { @@ -304,6 +308,17 @@ public: _randVal = val; } + bool hasSortKeyMetaField() const { + return _metaFields.test(MetaType::SORT_KEY); + } + BSONObj getSortKeyMetaField() const { + return _sortKey; + } + void setSortKeyMetaField(BSONObj sortKey) { + _metaFields.set(MetaType::SORT_KEY); + _sortKey = sortKey.getOwned(); + } + private: /// Same as lastElement->next() or firstElement() if empty. const ValueElement* end() const { @@ -385,6 +400,7 @@ private: std::bitset<MetaType::NUM_FIELDS> _metaFields; double _textScore; double _randVal; + BSONObj _sortKey; // When adding a field, make sure to update clone() method // Defined in document.cpp diff --git a/src/mongo/db/pipeline/document_source.h b/src/mongo/db/pipeline/document_source.h index 134a8a86e3d..b0793113863 100644 --- a/src/mongo/db/pipeline/document_source.h +++ b/src/mongo/db/pipeline/document_source.h @@ -449,7 +449,7 @@ public: EXHAUSTIVE_FIELDS = 0x2, // Later stages won't need more metadata from input.
For example, a $group stage will group - // documents together, discarding their text score. + // documents together, discarding their text score and sort keys. EXHAUSTIVE_META = 0x4, // Later stages won't need either fields or metadata. diff --git a/src/mongo/db/pipeline/document_source_limit.cpp b/src/mongo/db/pipeline/document_source_limit.cpp index 704ba907165..8256d055805 100644 --- a/src/mongo/db/pipeline/document_source_limit.cpp +++ b/src/mongo/db/pipeline/document_source_limit.cpp @@ -49,9 +49,7 @@ REGISTER_DOCUMENT_SOURCE(limit, LiteParsedDocumentSourceDefault::parse, DocumentSourceLimit::createFromBson); -const char* DocumentSourceLimit::getSourceName() const { - return "$limit"; -} +constexpr StringData DocumentSourceLimit::kStageName; Pipeline::SourceContainer::iterator DocumentSourceLimit::doOptimizeAt( Pipeline::SourceContainer::iterator itr, Pipeline::SourceContainer* container) { diff --git a/src/mongo/db/pipeline/document_source_limit.h b/src/mongo/db/pipeline/document_source_limit.h index f5f93fff1a4..7bf47638b21 100644 --- a/src/mongo/db/pipeline/document_source_limit.h +++ b/src/mongo/db/pipeline/document_source_limit.h @@ -34,9 +34,25 @@ namespace mongo { class DocumentSourceLimit final : public DocumentSource, public SplittableDocumentSource { public: - // virtuals from DocumentSource + static constexpr StringData kStageName = "$limit"_sd; + + /** + * Create a new $limit stage. + */ + static boost::intrusive_ptr<DocumentSourceLimit> create( + const boost::intrusive_ptr<ExpressionContext>& pExpCtx, long long limit); + + /** + * Parse a $limit stage from a BSON stage specification. 'elem's field name must be "$limit". + */ + static boost::intrusive_ptr<DocumentSource> createFromBson( + BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx); + GetNextResult getNext() final; - const char* getSourceName() const final; + const char* getSourceName() const final { + return kStageName.rawData(); + } + BSONObjSet getOutputSorts() final { return pSource ? pSource->getOutputSorts() : SimpleBSONObjComparator::kInstance.makeBSONObjSet(); @@ -59,15 +75,6 @@ public: return SEE_NEXT; // This doesn't affect needed fields } - /** - Create a new limiting DocumentSource. - - @param pExpCtx the expression context for the pipeline - @returns the DocumentSource - */ - static boost::intrusive_ptr<DocumentSourceLimit> create( - const boost::intrusive_ptr<ExpressionContext>& pExpCtx, long long limit); - /** * Returns the current DocumentSourceLimit for use in the shards pipeline. Running this stage on * the shards is an optimization, but is not strictly necessary in order to produce correct * results. */ - /** - Create a limiting DocumentSource from BSON. - - This is a convenience method that uses the above, and operates on - a BSONElement that has been deteremined to be an Object with an - element named $limit.
- - @param pBsonElement the BSONELement that defines the limit - - @param pExpCtx the expression context - @returns the grouping DocumentSource - */ - static boost::intrusive_ptr<DocumentSource> createFromBson( - BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx); - private: DocumentSourceLimit(const boost::intrusive_ptr<ExpressionContext>& pExpCtx, long long limit); diff --git a/src/mongo/db/pipeline/document_source_match.cpp b/src/mongo/db/pipeline/document_source_match.cpp index 5493d21ecd1..17f8cc34e83 100644 --- a/src/mongo/db/pipeline/document_source_match.cpp +++ b/src/mongo/db/pipeline/document_source_match.cpp @@ -471,10 +471,11 @@ BSONObj DocumentSourceMatch::getQuery() const { DocumentSource::GetDepsReturn DocumentSourceMatch::getDependencies(DepsTracker* deps) const { if (isTextQuery()) { - // A $text aggregation field should return EXHAUSTIVE_ALL, since we don't necessarily know - // what field it will be searching without examining indices. + // A $text aggregation field should return EXHAUSTIVE_FIELDS, since we don't necessarily + // know what field it will be searching without examining indices. deps->needWholeDocument = true; - return EXHAUSTIVE_ALL; + deps->setNeedTextScore(true); + return EXHAUSTIVE_FIELDS; } addDependencies(deps); @@ -493,7 +494,11 @@ void DocumentSourceMatch::addDependencies(DepsTracker* deps) const { DocumentSourceMatch::DocumentSourceMatch(const BSONObj& query, const intrusive_ptr<ExpressionContext>& pExpCtx) - : DocumentSource(pExpCtx), _predicate(query.getOwned()), _isTextQuery(isTextQuery(query)) { + : DocumentSource(pExpCtx), + _predicate(query.getOwned()), + _isTextQuery(isTextQuery(query)), + _dependencies(_isTextQuery ? DepsTracker::MetadataAvailable::kTextScore + : DepsTracker::MetadataAvailable::kNoMetadata) { StatusWithMatchExpression status = uassertStatusOK( MatchExpressionParser::parse(_predicate, pExpCtx->getCollator(), diff --git a/src/mongo/db/pipeline/document_source_match.h b/src/mongo/db/pipeline/document_source_match.h index 135dbc39e17..06219b22596 100644 --- a/src/mongo/db/pipeline/document_source_match.h +++ b/src/mongo/db/pipeline/document_source_match.h @@ -160,11 +160,11 @@ private: std::unique_ptr<MatchExpression> _expression; - // Cache the dependencies so that we know what fields we need to serialize to BSON for matching. - DepsTracker _dependencies; - BSONObj _predicate; const bool _isTextQuery; + + // Cache the dependencies so that we know what fields we need to serialize to BSON for matching.
+ DepsTracker _dependencies; }; } // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_match_test.cpp b/src/mongo/db/pipeline/document_source_match_test.cpp index c01ae31db9a..cbe9c522aeb 100644 --- a/src/mongo/db/pipeline/document_source_match_test.cpp +++ b/src/mongo/db/pipeline/document_source_match_test.cpp @@ -218,10 +218,10 @@ TEST_F(DocumentSourceMatchTest, ShouldAddDependenciesOfAllBranchesOfOrClause) { TEST_F(DocumentSourceMatchTest, TextSearchShouldRequireWholeDocumentAndTextScore) { auto match = DocumentSourceMatch::create(fromjson("{$text: {$search: 'hello'} }"), getExpCtx()); - DepsTracker dependencies; - ASSERT_EQUALS(DocumentSource::EXHAUSTIVE_ALL, match->getDependencies(&dependencies)); + DepsTracker dependencies(DepsTracker::MetadataAvailable::kTextScore); + ASSERT_EQUALS(DocumentSource::EXHAUSTIVE_FIELDS, match->getDependencies(&dependencies)); ASSERT_EQUALS(true, dependencies.needWholeDocument); - ASSERT_EQUALS(false, dependencies.getNeedTextScore()); + ASSERT_EQUALS(true, dependencies.getNeedTextScore()); } TEST_F(DocumentSourceMatchTest, ShouldOnlyAddOuterFieldAsDependencyOfImplicitEqualityPredicate) { diff --git a/src/mongo/db/pipeline/document_source_merge_cursors.cpp b/src/mongo/db/pipeline/document_source_merge_cursors.cpp index d53c2183443..23d413d6b60 100644 --- a/src/mongo/db/pipeline/document_source_merge_cursors.cpp +++ b/src/mongo/db/pipeline/document_source_merge_cursors.cpp @@ -39,6 +39,8 @@ using std::make_pair; using std::string; using std::vector; +constexpr StringData DocumentSourceMergeCursors::kStageName; + DocumentSourceMergeCursors::DocumentSourceMergeCursors( std::vector<CursorDescriptor> cursorDescriptors, const intrusive_ptr<ExpressionContext>& pExpCtx) @@ -48,10 +50,6 @@ REGISTER_DOCUMENT_SOURCE(mergeCursors, LiteParsedDocumentSourceDefault::parse, DocumentSourceMergeCursors::createFromBson); -const char* DocumentSourceMergeCursors::getSourceName() const { - return "$mergeCursors"; -} - intrusive_ptr<DocumentSource> DocumentSourceMergeCursors::create( std::vector<CursorDescriptor> cursorDescriptors, const intrusive_ptr<ExpressionContext>& pExpCtx) { @@ -96,7 +94,7 @@ Value DocumentSourceMergeCursors::serialize( << "id" << _cursorDescriptors[i].cursorId))); } - return Value(DOC(getSourceName() << Value(cursors))); + return Value(DOC(kStageName << Value(cursors))); } DocumentSourceMergeCursors::CursorAndConnection::CursorAndConnection( diff --git a/src/mongo/db/pipeline/document_source_merge_cursors.h b/src/mongo/db/pipeline/document_source_merge_cursors.h index e83316fa6ea..63521e2c742 100644 --- a/src/mongo/db/pipeline/document_source_merge_cursors.h +++ b/src/mongo/db/pipeline/document_source_merge_cursors.h @@ -37,6 +37,8 @@ namespace mongo { class DocumentSourceMergeCursors : public DocumentSource { public: + static constexpr StringData kStageName = "$mergeCursors"_sd; + struct CursorDescriptor { CursorDescriptor(ConnectionString connectionString, std::string ns, CursorId cursorId) : connectionString(std::move(connectionString)), @@ -48,14 +50,17 @@ public: CursorId cursorId; }; - // virtuals from DocumentSource GetNextResult getNext() final; - const char* getSourceName() const final; + + const char* getSourceName() const final { + return kStageName.rawData(); + } + Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final; StageConstraints constraints() const final { StageConstraints constraints; - constraints.hostRequirement = HostTypeRequirement::kAnyShardOrMongoS; + constraints.hostRequirement = HostTypeRequirement::kAnyShard; constraints.requiredPosition = PositionRequirement::kFirst;
constraints.requiresInputDocSource = false; constraints.isAllowedInsideFacetStage = false; diff --git a/src/mongo/db/pipeline/document_source_sample.cpp b/src/mongo/db/pipeline/document_source_sample.cpp index c174d95d935..09e3d431493 100644 --- a/src/mongo/db/pipeline/document_source_sample.cpp +++ b/src/mongo/db/pipeline/document_source_sample.cpp @@ -40,6 +40,8 @@ namespace mongo { using boost::intrusive_ptr; +constexpr StringData DocumentSourceSample::kStageName; + DocumentSourceSample::DocumentSourceSample(const intrusive_ptr<ExpressionContext>& pExpCtx) : DocumentSource(pExpCtx), _size(0) {} @@ -47,10 +49,6 @@ REGISTER_DOCUMENT_SOURCE(sample, LiteParsedDocumentSourceDefault::parse, DocumentSourceSample::createFromBson); -const char* DocumentSourceSample::getSourceName() const { - return "$sample"; -} - DocumentSource::GetNextResult DocumentSourceSample::getNext() { if (_size == 0) return GetNextResult::makeEOF(); @@ -84,7 +82,7 @@ DocumentSource::GetNextResult DocumentSourceSample::getNext() { } Value DocumentSourceSample::serialize(boost::optional<ExplainOptions::Verbosity> explain) const { - return Value(DOC(getSourceName() << DOC("size" << _size))); + return Value(DOC(kStageName << DOC("size" << _size))); } namespace { diff --git a/src/mongo/db/pipeline/document_source_sample.h b/src/mongo/db/pipeline/document_source_sample.h index 662c6a9a49d..07a85c77e33 100644 --- a/src/mongo/db/pipeline/document_source_sample.h +++ b/src/mongo/db/pipeline/document_source_sample.h @@ -35,8 +35,12 @@ namespace mongo { class DocumentSourceSample final : public DocumentSource, public SplittableDocumentSource { public: + static constexpr StringData kStageName = "$sample"_sd; + GetNextResult getNext() final; - const char* getSourceName() const final; + const char* getSourceName() const final { + return kStageName.rawData(); + } Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final; StageConstraints constraints() const final { diff --git a/src/mongo/db/pipeline/document_source_sample_from_random_cursor.cpp b/src/mongo/db/pipeline/document_source_sample_from_random_cursor.cpp index ff6c4e6ec16..b429dbba6bf 100644 --- a/src/mongo/db/pipeline/document_source_sample_from_random_cursor.cpp +++ b/src/mongo/db/pipeline/document_source_sample_from_random_cursor.cpp @@ -92,6 +92,11 @@ DocumentSource::GetNextResult DocumentSourceSampleFromRandomCursor::getNext() { MutableDocument md(nextResult.releaseDocument()); md.setRandMetaField(_randMetaFieldVal); + if (pExpCtx->needsMerge) { + // This stage will be merged by sorting results according to this random metadata field, but + // the merging logic expects to sort by the sort key metadata.
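+ // The serialized sort key format is a BSONObj with empty field names; for a + // single-field sort that is {'': <value>}, which is what we build from the random value + // here.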
+ md.setSortKeyMetaField(BSON("" << _randMetaFieldVal)); + } return md.freeze(); } diff --git a/src/mongo/db/pipeline/document_source_skip.cpp b/src/mongo/db/pipeline/document_source_skip.cpp index 61125a879f4..cf1b99e6e93 100644 --- a/src/mongo/db/pipeline/document_source_skip.cpp +++ b/src/mongo/db/pipeline/document_source_skip.cpp @@ -50,9 +50,7 @@ REGISTER_DOCUMENT_SOURCE(skip, LiteParsedDocumentSourceDefault::parse, DocumentSourceSkip::createFromBson); -const char* DocumentSourceSkip::getSourceName() const { - return "$skip"; -} +constexpr StringData DocumentSourceSkip::kStageName; DocumentSource::GetNextResult DocumentSourceSkip::getNext() { pExpCtx->checkForInterrupt(); diff --git a/src/mongo/db/pipeline/document_source_skip.h b/src/mongo/db/pipeline/document_source_skip.h index 8fc27a59f87..fc87d7e1eaa 100644 --- a/src/mongo/db/pipeline/document_source_skip.h +++ b/src/mongo/db/pipeline/document_source_skip.h @@ -34,9 +34,27 @@ namespace mongo { class DocumentSourceSkip final : public DocumentSource, public SplittableDocumentSource { public: - // virtuals from DocumentSource + static constexpr StringData kStageName = "$skip"_sd; + + /** + * Convenience method for creating a $skip stage. + */ + static boost::intrusive_ptr<DocumentSourceSkip> create( + const boost::intrusive_ptr<ExpressionContext>& pExpCtx, long long nToSkip); + + /** + * Parses the user-supplied BSON into a $skip stage. + * + * Throws an AssertionException if 'elem' is an invalid $skip specification. + */ + static boost::intrusive_ptr<DocumentSource> createFromBson( + BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx); + GetNextResult getNext() final; - const char* getSourceName() const final; + + const char* getSourceName() const final { + return kStageName.rawData(); + } StageConstraints constraints() const final { StageConstraints constraints; @@ -77,20 +95,6 @@ public: _nToSkip = newSkip; } - /** - * Convenience method for creating a $skip stage. - */ - static boost::intrusive_ptr<DocumentSourceSkip> create( - const boost::intrusive_ptr<ExpressionContext>& pExpCtx, long long nToSkip); - - /** - * Parses the user-supplied BSON into a $skip stage. - * - * Throws a AssertionException if 'elem' is an invalid $skip specification. - */ - static boost::intrusive_ptr<DocumentSource> createFromBson( - BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx); - private: explicit DocumentSourceSkip(const boost::intrusive_ptr<ExpressionContext>& pExpCtx, long long nToSkip); diff --git a/src/mongo/db/pipeline/document_source_sort.cpp b/src/mongo/db/pipeline/document_source_sort.cpp index 4d78d08ccaa..fd113a9c386 100644 --- a/src/mongo/db/pipeline/document_source_sort.cpp +++ b/src/mongo/db/pipeline/document_source_sort.cpp @@ -47,6 +47,54 @@ using std::make_pair; using std::string; using std::vector; +namespace { +Value missingToNull(Value maybeMissing) { + return maybeMissing.missing() ? Value(BSONNULL) : maybeMissing; +} + +/** + * Converts a Value representing an in-memory sort key to a BSONObj representing a serialized sort + * key. If 'sortPatternSize' is 1, returns a BSON object with 'value' as its only value, with an + * empty field name. Otherwise asserts that 'value' is an array of length 'sortPatternSize', and + * returns a BSONObj with one field for each value in the array, each field using the empty field + * name. + */ +BSONObj serializeSortKey(size_t sortPatternSize, Value value) { + // Missing values don't serialize correctly in this format, so use nulls instead, since they are + // considered equivalent with woCompare().
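+ // For example, with a two-element sort pattern, the in-memory key Value [1, [2, 3]] + // serializes to the BSONObj {'': 1, '': [2, 3]}.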
+ if (sortPatternSize == 1) { + return BSON("" << missingToNull(value)); + } + invariant(value.isArray()); + invariant(value.getArrayLength() == sortPatternSize); + BSONObjBuilder bb; + for (auto&& val : value.getArray()) { + bb << "" << missingToNull(val); + } + return bb.obj(); +} + +/** + * Converts a BSONObj representing a serialized sort key into a Value, which we use for in-memory + * comparisons. BSONObj {'': 1, '': [2, 3]} becomes Value [1, [2, 3]]. + */ +Value deserializeSortKey(size_t sortPatternSize, BSONObj bsonSortKey) { + vector<Value> keys; + keys.reserve(sortPatternSize); + for (auto&& elt : bsonSortKey) { + keys.push_back(Value{elt}); + } + invariant(keys.size() == sortPatternSize); + if (sortPatternSize == 1) { + // As a special case for a sort on a single field, we do not put the keys into an array. + return keys[0]; + } + return Value{std::move(keys)}; +} + +} // namespace +constexpr StringData DocumentSourceSort::kStageName; + DocumentSourceSort::DocumentSourceSort(const intrusive_ptr<ExpressionContext>& pExpCtx) : DocumentSource(pExpCtx), _mergingPresorted(false) {} @@ -54,10 +102,6 @@ REGISTER_DOCUMENT_SOURCE(sort, LiteParsedDocumentSourceDefault::parse, DocumentSourceSort::createFromBson); -const char* DocumentSourceSort::getSourceName() const { - return "$sort"; -} - DocumentSource::GetNextResult DocumentSourceSort::getNext() { pExpCtx->checkForInterrupt(); @@ -83,17 +127,18 @@ DocumentSource::GetNextResult DocumentSourceSort::getNext() { void DocumentSourceSort::serializeToArray( std::vector<Value>& array, boost::optional<ExplainOptions::Verbosity> explain) const { if (explain) { // always one Value for combined $sort + $limit - array.push_back(Value( - DOC(getSourceName() << DOC( - "sortKey" << serializeSortKey(static_cast<bool>(explain)) << "mergePresorted" - << (_mergingPresorted ? Value(true) : Value()) - << "limit" - << (limitSrc ? Value(limitSrc->getLimit()) : Value()))))); + array.push_back(Value(DOC( + kStageName << DOC("sortKey" << sortKeyPattern(SortKeySerialization::kForExplain) + << "mergePresorted" + << (_mergingPresorted ? Value(true) : Value()) + << "limit" + << (limitSrc ? Value(limitSrc->getLimit()) : Value()))))); } else { // one Value for $sort and maybe a Value for $limit - MutableDocument inner(serializeSortKey(static_cast<bool>(explain))); - if (_mergingPresorted) + MutableDocument inner(sortKeyPattern(SortKeySerialization::kForPipelineSerialization)); + if (_mergingPresorted) { inner["$mergePresorted"] = Value(true); - array.push_back(Value(DOC(getSourceName() << inner.freeze()))); + } + array.push_back(Value(DOC(kStageName << inner.freeze()))); if (limitSrc) { limitSrc->serializeToArray(array); @@ -109,7 +154,7 @@ long long DocumentSourceSort::getLimit() const { return limitSrc ? limitSrc->getLimit() : -1; } -Document DocumentSourceSort::serializeSortKey(bool explain) const { +Document DocumentSourceSort::sortKeyPattern(SortKeySerialization serializationMode) const { MutableDocument keyObj; const size_t n = _sortPattern.size(); for (size_t i = 0; i < n; ++i) { @@ -118,9 +163,22 @@ Document DocumentSourceSort::serializeSortKey(bool explain) const { keyObj.setField(_sortPattern[i].fieldPath->fullPath(), Value(_sortPattern[i].isAscending ? 1 : -1)); } else { - // For sorts that are not simply on a field path, use a made-up field name. - keyObj[string(str::stream() << "$computed" << i)] = - _sortPattern[i].expression->serialize(explain); + // When sorting by an expression, use a made-up field name.
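+ // For example, the first expression in the sort pattern serializes under the field + // name "$computed0".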
+ auto computedFieldName = string(str::stream() << "$computed" << i); + switch (serializationMode) { + case SortKeySerialization::kForExplain: + case SortKeySerialization::kForPipelineSerialization: { + const bool isExplain = (serializationMode == SortKeySerialization::kForExplain); + keyObj[computedFieldName] = _sortPattern[i].expression->serialize(isExplain); + break; + } + case SortKeySerialization::kForSortKeyMerging: { + // We need to be able to tell which direction the sort is. Expression sorts are + // always descending. + keyObj[computedFieldName] = Value(-1); + break; + } + } } } return keyObj.freeze(); @@ -149,6 +207,10 @@ DocumentSource::GetDepsReturn DocumentSourceSort::getDependencies(DepsTracker* d deps->fields.insert(keyPart.fieldPath->fullPath()); } } + if (pExpCtx->needsMerge) { + // Include the sort key if we will merge several sorted streams later. + deps->setNeedSortKey(true); + } return SEE_NEXT; } @@ -220,9 +282,11 @@ intrusive_ptr<DocumentSourceSort> DocumentSourceSort::create( uassert(15976, "$sort stage must have at least one sort key", !pSort->_sortPattern.empty()); - const bool isExplain = false; - pSort->_sortKeyGen = - SortKeyGenerator{pSort->serializeSortKey(isExplain).toBson(), pExpCtx->getCollator()}; + pSort->_sortKeyGen = SortKeyGenerator{ + // The SortKeyGenerator expects the expressions to be serialized in order to detect a sort + // by a metadata field. + pSort->sortKeyPattern(SortKeySerialization::kForPipelineSerialization).toBson(), + pExpCtx->getCollator()}; if (limit > 0) { pSort->setLimitSrc(DocumentSourceLimit::create(pExpCtx, limit)); @@ -269,12 +333,19 @@ DocumentSource::GetNextResult DocumentSourceSort::populate() { } } -void DocumentSourceSort::loadDocument(const Document& doc) { +void DocumentSourceSort::loadDocument(Document&& doc) { invariant(!_populated); if (!_sorter) { _sorter.reset(MySorter::make(makeSortOptions(), Comparator(*this))); } - _sorter->add(extractKey(doc), doc); + + Value sortKey; + Document docForSorter; + // We always need to extract the sort key if we've reached this point. If the query system had + // already computed the sort key we'd have split the pipeline there, would be merging presorted + // documents, and wouldn't use this method. + std::tie(sortKey, docForSorter) = extractSortKey(std::move(doc)); + _sorter->add(sortKey, docForSorter); } void DocumentSourceSort::loadingDone() { @@ -295,8 +366,18 @@ public: return _cursor->more(); } Data next() { - const Document doc = DocumentSourceMergeCursors::nextSafeFrom(_cursor); - return make_pair(_sorter->extractKey(doc), doc); + auto doc = DocumentSourceMergeCursors::nextSafeFrom(_cursor); + if (doc.hasSortKeyMetaField()) { + // We set the sort key metadata field during the first half of the sort, so just use + // that as the sort key here. + return make_pair( + deserializeSortKey(_sorter->_sortPattern.size(), doc.getSortKeyMetaField()), doc); + } else { + // It's possible this result is coming from a shard that is still on an old version. If + // that's the case, it won't tell us its sort key - we'll have to re-compute it + // ourselves.
+ return _sorter->extractSortKey(std::move(doc)); + } } private: @@ -344,7 +425,7 @@ StatusWith<Value> DocumentSourceSort::extractKeyFast(const Document& doc) const return Value{std::move(keys)}; } -Value DocumentSourceSort::extractKeyWithArray(const Document& doc) const { +BSONObj DocumentSourceSort::extractKeyWithArray(const Document& doc) const { SortKeyGenerator::Metadata metadata; if (doc.hasTextScore()) { metadata.textScore = doc.getTextScore(); @@ -356,26 +437,40 @@ // Convert the Document to a BSONObj, but only do the conversion for the paths we actually need. // Then run the result through the SortKeyGenerator to obtain the final sort key. auto bsonDoc = document_path_support::documentToBsonWithPaths(doc, _paths); - auto bsonKey = uassertStatusOK(_sortKeyGen->getSortKey(std::move(bsonDoc), &metadata)); - - // Convert BSON sort key, which is an object with empty field names, into the corresponding - // array of keys representation as a Value. BSONObj {'': 1, '': [2, 3]} becomes Value [1, [2, - // 3]]. - vector<Value> keys; - keys.reserve(_sortPattern.size()); - for (auto&& elt : bsonKey) { - keys.push_back(Value{elt}); - } - return Value{std::move(keys)}; + return uassertStatusOK(_sortKeyGen->getSortKey(std::move(bsonDoc), &metadata)); } -Value DocumentSourceSort::extractKey(const Document& doc) const { - auto key = extractKeyFast(doc); - if (key.isOK()) { - return key.getValue(); +std::pair<Value, Document> DocumentSourceSort::extractSortKey(Document&& doc) const { + boost::optional<BSONObj> serializedSortKey; // Only populated if we need to merge with other + // sorted results later. Serialized in the standard + // BSON sort key format with empty field names, + // e.g. {'': 1, '': [2, 3]}. + + Value inMemorySortKey; // The Value we will use for comparisons within the sorter. + + auto fastKey = extractKeyFast(doc); + if (fastKey.isOK()) { + inMemorySortKey = std::move(fastKey.getValue()); + if (pExpCtx->needsMerge) { + serializedSortKey = serializeSortKey(_sortPattern.size(), inMemorySortKey); + } + } else { + // We have to do it the slow way - through the sort key generator. This will generate a BSON + // sort key, which is an object with empty field names. We then need to convert this BSON + // representation into the corresponding array of keys as a Value. BSONObj {'': 1, '': [2, + // 3]} becomes Value [1, [2, 3]]. + serializedSortKey = extractKeyWithArray(doc); + inMemorySortKey = deserializeSortKey(_sortPattern.size(), *serializedSortKey); } - return extractKeyWithArray(doc); + MutableDocument toBeSorted(std::move(doc)); + if (pExpCtx->needsMerge) { + // These results will need to be merged (and hence serialized) later, so save the sort key + // here to avoid re-computing it during the merge.
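+ // For example, with the pattern {a: 1, b: 1} and the document {a: 2, b: 3}, the in-memory + // sort key is the Value [2, 3], and the document enters the sorter carrying {'': 2, '': 3} + // in its sort key metadata field.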
+ invariant(serializedSortKey); + toBeSorted.setSortKeyMetaField(*serializedSortKey); + } + return {inMemorySortKey, toBeSorted.freeze()}; } int DocumentSourceSort::compare(const Value& lhs, const Value& rhs) const { diff --git a/src/mongo/db/pipeline/document_source_sort.h b/src/mongo/db/pipeline/document_source_sort.h index 5731051cbd3..2494400eaae 100644 --- a/src/mongo/db/pipeline/document_source_sort.h +++ b/src/mongo/db/pipeline/document_source_sort.h @@ -39,10 +39,20 @@ namespace mongo { class DocumentSourceSort final : public DocumentSource, public SplittableDocumentSource { public: static const uint64_t kMaxMemoryUsageBytes = 100 * 1024 * 1024; + static constexpr StringData kStageName = "$sort"_sd; + + enum class SortKeySerialization { + kForExplain, + kForPipelineSerialization, + kForSortKeyMerging, + }; - // virtuals from DocumentSource GetNextResult getNext() final; - const char* getSourceName() const final; + + const char* getSourceName() const final { + return kStageName.rawData(); + } + void serializeToArray( std::vector& array, boost::optional explain = boost::none) const final; @@ -73,8 +83,10 @@ public: boost::intrusive_ptr getShardSource() final; boost::intrusive_ptr getMergeSource() final; - /// Write out a Document whose contents are the sort key. - Document serializeSortKey(bool explain) const; + /** + * Write out a Document whose contents are the sort key pattern. + */ + Document sortKeyPattern(SortKeySerialization) const; /** * Parses a $sort stage from the user-supplied BSON. @@ -101,7 +113,7 @@ public: * coming from another DocumentSource. Once all documents have been added, the caller must call * loadingDone() before using getNext() to receive the documents in sorted order. */ - void loadDocument(const Document& doc); + void loadDocument(Document&& doc); /** * Signals to the sort stage that there will be no more input documents. It is an error to call @@ -179,11 +191,15 @@ private: SortOptions makeSortOptions() const; /** - * Returns the sort key for 'doc' based on the SortPattern. Attempts to generate the key using a - * fast path that does not handle arrays. If an array is encountered, falls back on - * extractKeyWithArray(). + * Returns the sort key for 'doc', as well as the document that should be entered into the + * sorter to eventually be returned. If we will need to later merge the sorted results with + * other results, this method adds the sort key as metadata onto 'doc' to speed up the merge + * later. + * + * Attempts to generate the key using a fast path that does not handle arrays. If an array is + * encountered, falls back on extractKeyWithArray(). */ - Value extractKey(const Document& doc) const; + std::pair extractSortKey(Document&& doc) const; /** * Returns the sort key for 'doc' based on the SortPattern, or ErrorCodes::InternalError if an @@ -198,9 +214,10 @@ private: StatusWith extractKeyPart(const Document& doc, const SortPatternPart& keyPart) const; /** - * Returns the sort key for 'doc' based on the SortPattern. + * Returns the sort key for 'doc' based on the SortPattern. Note this is in the BSONObj format - + * with empty field names. 
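+ * For example, for the pattern {a: 1} the document {a: [1, 2]} yields the key {'': 1}; the + * sort key generator picks the representative array element (the minimum for an ascending + * sort).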
*/ - Value extractKeyWithArray(const Document& doc) const; + BSONObj extractKeyWithArray(const Document& doc) const; int compare(const Value& lhs, const Value& rhs) const; diff --git a/src/mongo/db/pipeline/pipeline.cpp b/src/mongo/db/pipeline/pipeline.cpp index 89975fc4cf2..c5edf02f50d 100644 --- a/src/mongo/db/pipeline/pipeline.cpp +++ b/src/mongo/db/pipeline/pipeline.cpp @@ -374,10 +374,10 @@ void Pipeline::Optimizations::Sharded::moveFinalUnwindFromShardsToMerger(Pipelin void Pipeline::Optimizations::Sharded::limitFieldsSentFromShardsToMerger(Pipeline* shardPipe, Pipeline* mergePipe) { - DepsTracker mergeDeps( - mergePipe->getDependencies(DocumentSourceMatch::isTextQuery(shardPipe->getInitialQuery()) - ? DepsTracker::MetadataAvailable::kTextScore - : DepsTracker::MetadataAvailable::kNoMetadata)); + auto depsMetadata = DocumentSourceMatch::isTextQuery(shardPipe->getInitialQuery()) + ? DepsTracker::MetadataAvailable::kTextScore + : DepsTracker::MetadataAvailable::kNoMetadata; + DepsTracker mergeDeps(mergePipe->getDependencies(depsMetadata)); if (mergeDeps.needWholeDocument) return; // the merge needs all fields, so nothing we can do. @@ -399,7 +399,7 @@ void Pipeline::Optimizations::Sharded::limitFieldsSentFromShardsToMerger(Pipelin // 2) Optimization IS NOT applied immediately following a $project or $group since it would // add an unnecessary project (and therefore a deep-copy). for (auto&& source : shardPipe->_sources) { - DepsTracker dt; + DepsTracker dt(depsMetadata); if (source->getDependencies(&dt) & DocumentSource::EXHAUSTIVE_FIELDS) return; } @@ -528,6 +528,9 @@ DepsTracker Pipeline::getDependencies(DepsTracker::MetadataAvailable metadataAva if (localDeps.getNeedTextScore()) deps.setNeedTextScore(true); + if (localDeps.getNeedSortKey()) + deps.setNeedSortKey(true); + knowAllMeta = status & DocumentSource::EXHAUSTIVE_META; } @@ -552,4 +555,13 @@ DepsTracker Pipeline::getDependencies(DepsTracker::MetadataAvailable metadataAva return deps; } +boost::intrusive_ptr Pipeline::popFrontStageWithName(StringData targetStageName) { + if (_sources.empty() || _sources.front()->getSourceName() != targetStageName) { + return nullptr; + } + auto targetStage = _sources.front(); + _sources.pop_front(); + stitch(); + return targetStage; +} } // namespace mongo diff --git a/src/mongo/db/pipeline/pipeline.h b/src/mongo/db/pipeline/pipeline.h index 35b18954fdd..7300b8e542e 100644 --- a/src/mongo/db/pipeline/pipeline.h +++ b/src/mongo/db/pipeline/pipeline.h @@ -263,6 +263,12 @@ public: return _sources; } + /** + * Removes and returns the first stage of the pipeline if its name is 'targetStageName'. Returns + * nullptr if there is no first stage, or if the stage's name is not 'targetStageName'. + */ + boost::intrusive_ptr popFrontStageWithName(StringData targetStageName); + /** * PipelineD is a "sister" class that has additional functionality for the Pipeline. It exists * because of linkage requirements. Pipeline needs to function in mongod and mongos. 
PipelineD diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp index 42f9ea6615c..7f6a7c5fc49 100644 --- a/src/mongo/db/pipeline/pipeline_d.cpp +++ b/src/mongo/db/pipeline/pipeline_d.cpp @@ -448,6 +448,13 @@ StatusWith> attemptToGetExe return getExecutorFind( opCtx, collection, nss, std::move(cq.getValue()), PlanExecutor::YIELD_AUTO, plannerOpts); } + +BSONObj removeSortKeyMetaProjection(BSONObj projectionObj) { + if (!projectionObj[Document::metaFieldSortKey]) { + return projectionObj; + } + return projectionObj.removeField(Document::metaFieldSortKey); +} } // namespace void PipelineD::prepareCursorSource(Collection* collection, @@ -526,20 +533,18 @@ void PipelineD::prepareCursorSource(Collection* collection, BSONObj projForQuery = deps.toProjection(); - /* - Look for an initial sort; we'll try to add this to the - Cursor we create. If we're successful in doing that (further down), - we'll remove the $sort from the pipeline, because the documents - will already come sorted in the specified order as a result of the - index scan. - */ + // Look for an initial sort; we'll try to add this to the Cursor we create. If we're successful + // in doing that, we'll remove the $sort from the pipeline, because the documents will already + // come sorted in the specified order as a result of the index scan. intrusive_ptr sortStage; BSONObj sortObj; if (!sources.empty()) { sortStage = dynamic_cast(sources.front().get()); if (sortStage) { - // build the sort key - sortObj = sortStage->serializeSortKey(/*explain*/ false).toBson(); + sortObj = sortStage + ->sortKeyPattern( + DocumentSourceSort::SortKeySerialization::kForPipelineSerialization) + .toBson(); } } @@ -611,26 +616,30 @@ StatusWith> PipelineD::prep plannerOpts |= QueryPlannerParams::IS_COUNT; } - // The only way to get a text score is to let the query system handle the projection. In all - // other cases, unless the query system can do an index-covered projection and avoid going to - // the raw record at all, it is faster to have ParsedDeps filter the fields we need. - if (!deps.getNeedTextScore()) { + // The only way to get a text score or the sort key is to let the query system handle the + // projection. In all other cases, unless the query system can do an index-covered projection + // and avoid going to the raw record at all, it is faster to have ParsedDeps filter the fields + // we need. + if (!deps.getNeedTextScore() && !deps.getNeedSortKey()) { plannerOpts |= QueryPlannerParams::NO_UNCOVERED_PROJECTIONS; } - BSONObj emptyProjection; + const BSONObj emptyProjection; + const BSONObj metaSortProjection = BSON("$meta" + << "sortKey"); if (sortStage) { // See if the query system can provide a non-blocking sort. - auto swExecutorSort = attemptToGetExecutor(opCtx, - collection, - nss, - expCtx, - oplogReplay, - queryObj, - emptyProjection, - *sortObj, - aggRequest, - plannerOpts); + auto swExecutorSort = + attemptToGetExecutor(opCtx, + collection, + nss, + expCtx, + oplogReplay, + queryObj, + expCtx->needsMerge ? metaSortProjection : emptyProjection, + *sortObj, + aggRequest, + plannerOpts); if (swExecutorSort.isOK()) { // Success! Now see if the query system can also cover the projection. @@ -682,6 +691,14 @@ StatusWith> PipelineD::prep // Either there was no $sort stage, or the query system could not provide a non-blocking // sort. 
dassert(sortObj->isEmpty()); + *projectionObj = removeSortKeyMetaProjection(*projectionObj); + if (deps.getNeedSortKey() && !deps.getNeedTextScore()) { + // A sort key requirement would have prevented us from being able to add this parameter + // before, but now we know the query system won't cover the sort, so we will be able to + // compute the sort key ourselves during the $sort stage, and thus don't need a query + // projection to do so. + plannerOpts |= QueryPlannerParams::NO_UNCOVERED_PROJECTIONS; + } // See if the query system can cover the projection. auto swExecutorProj = attemptToGetExecutor(opCtx, diff --git a/src/mongo/db/query/query_planner.cpp b/src/mongo/db/query/query_planner.cpp index 5d5909d4aa5..56e9041dd23 100644 --- a/src/mongo/db/query/query_planner.cpp +++ b/src/mongo/db/query/query_planner.cpp @@ -107,7 +107,25 @@ string optionString(size_t options) { ss << "INDEX_INTERSECTION "; } if (options & QueryPlannerParams::KEEP_MUTATIONS) { - ss << "KEEP_MUTATIONS"; + ss << "KEEP_MUTATIONS "; + } + if (options & QueryPlannerParams::IS_COUNT) { + ss << "IS_COUNT "; + } + if (options & QueryPlannerParams::SPLIT_LIMITED_SORT) { + ss << "SPLIT_LIMITED_SORT "; + } + if (options & QueryPlannerParams::CANNOT_TRIM_IXISECT) { + ss << "CANNOT_TRIM_IXISECT "; + } + if (options & QueryPlannerParams::SNAPSHOT_USE_ID) { + ss << "SNAPSHOT_USE_ID "; + } + if (options & QueryPlannerParams::NO_UNCOVERED_PROJECTIONS) { + ss << "NO_UNCOVERED_PROJECTIONS "; + } + if (options & QueryPlannerParams::GENERATE_COVERED_IXSCANS) { + ss << "GENERATE_COVERED_IXSCANS "; } return ss; diff --git a/src/mongo/s/commands/cluster_aggregate.cpp b/src/mongo/s/commands/cluster_aggregate.cpp index f2ba6b2b839..1bdbd85e850 100644 --- a/src/mongo/s/commands/cluster_aggregate.cpp +++ b/src/mongo/s/commands/cluster_aggregate.cpp @@ -332,6 +332,13 @@ BSONObj establishMergingMongosCursor( params.mergePipeline = std::move(pipelineForMerging); params.remotes = std::move(cursors); + // A batch size of 0 is legal for the initial aggregate but not for getMores. Since the batch + // size we pass here is used for getMores, do not specify a batch size if the initial request + // had a batch size of 0. + params.batchSize = request.getBatchSize() == 0 + ? boost::none + : boost::optional<long long>(request.getBatchSize()); + auto ccc = ClusterClientCursorImpl::make( opCtx, Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(), std::move(params)); @@ -582,10 +589,6 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx, // If we reach here, we have a merge pipeline to dispatch. invariant(pipelineForMerging); - // We need a DocumentSourceMergeCursors regardless of whether we merge on mongoS or on a shard. - pipelineForMerging->addInitialSource( - DocumentSourceMergeCursors::create(parseCursors(cursors), mergeCtx)); - // First, check whether we can merge on the mongoS. if (pipelineForMerging->canRunOnMongos() && !internalQueryProhibitMergingOnMongoS.load()) { // Register the new mongoS cursor, and retrieve the initial batch of results. @@ -602,6 +605,8 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx, } // If we cannot merge on mongoS, establish the merge cursor on a shard.
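+ // (When the merge runs on mongoS above, the remote cursors are instead handed to the + // ClusterClientCursor through 'params.remotes' in establishMergingMongosCursor(), so the + // $mergeCursors stage is only needed on this shard-merge path.)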
+ pipelineForMerging->addInitialSource( + DocumentSourceMergeCursors::create(parseCursors(cursors), mergeCtx)); auto mergeCmdObj = createCommandForMergingShard(request, mergeCtx, cmdObj, pipelineForMerging); auto mergeResponse = uassertStatusOK(establishMergingShardCursor( diff --git a/src/mongo/s/query/SConscript b/src/mongo/s/query/SConscript index 460dcf7912e..2ae87217bc8 100644 --- a/src/mongo/s/query/SConscript +++ b/src/mongo/s/query/SConscript @@ -32,11 +32,11 @@ env.Library( env.Library( target="router_exec_stage", source=[ - "router_stage_aggregation_merge.cpp", "router_stage_limit.cpp", "router_stage_merge.cpp", "router_stage_mock.cpp", - "router_stage_remove_sortkey.cpp", + "router_stage_pipeline.cpp", + "router_stage_remove_metadata_fields.cpp", "router_stage_skip.cpp", ], LIBDEPS=[ @@ -48,7 +48,7 @@ env.CppUnitTest( target="router_exec_stage_test", source=[ "router_stage_limit_test.cpp", - "router_stage_remove_sortkey_test.cpp", + "router_stage_remove_metadata_fields_test.cpp", "router_stage_skip_test.cpp", ], LIBDEPS=[ diff --git a/src/mongo/s/query/async_results_merger.cpp b/src/mongo/s/query/async_results_merger.cpp index 19281785be0..50886944bb2 100644 --- a/src/mongo/s/query/async_results_merger.cpp +++ b/src/mongo/s/query/async_results_merger.cpp @@ -61,7 +61,9 @@ AsyncResultsMerger::AsyncResultsMerger(OperationContext* opCtx, _mergeQueue(MergingComparator(_remotes, _params->sort)) { size_t remoteIndex = 0; for (const auto& remote : _params->remotes) { - _remotes.emplace_back(remote.hostAndPort, remote.cursorResponse.getCursorId()); + _remotes.emplace_back(remote.hostAndPort, + remote.cursorResponse.getNSS(), + remote.cursorResponse.getCursorId()); // We don't check the return value of addBatchToBuffer here; if there was an error, // it will be stored in the remote and the first call to ready() will return true. @@ -269,7 +271,7 @@ Status AsyncResultsMerger::askForNextBatch_inlock(size_t remoteIndex) { adjustedBatchSize = *_params->batchSize - remote.fetchedCount; } - BSONObj cmdObj = GetMoreRequest(_params->nsString, + BSONObj cmdObj = GetMoreRequest(remote.cursorNss, remote.cursorId, adjustedBatchSize, _awaitDataTimeout, @@ -582,8 +584,11 @@ executor::TaskExecutor::EventHandle AsyncResultsMerger::kill(OperationContext* o // AsyncResultsMerger::RemoteCursorData::RemoteCursorData(HostAndPort hostAndPort, + NamespaceString cursorNss, CursorId establishedCursorId) - : cursorId(establishedCursorId), shardHostAndPort(std::move(hostAndPort)) {} + : cursorId(establishedCursorId), + cursorNss(std::move(cursorNss)), + shardHostAndPort(std::move(hostAndPort)) {} const HostAndPort& AsyncResultsMerger::RemoteCursorData::getTargetHost() const { return shardHostAndPort; diff --git a/src/mongo/s/query/async_results_merger.h b/src/mongo/s/query/async_results_merger.h index 04262309a99..c6ec2a26052 100644 --- a/src/mongo/s/query/async_results_merger.h +++ b/src/mongo/s/query/async_results_merger.h @@ -202,7 +202,9 @@ private: * reported from the remote. */ struct RemoteCursorData { - RemoteCursorData(HostAndPort hostAndPort, CursorId establishedCursorId); + RemoteCursorData(HostAndPort hostAndPort, + NamespaceString cursorNss, + CursorId establishedCursorId); /** * Returns the resolved host and port on which the remote cursor resides. @@ -230,6 +232,10 @@ private: // member will be set to zero. CursorId cursorId; + // The namespace this cursor belongs to - note this may be different than the namespace of + // the operation if there is a view. 
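+ // For example, an aggregate issued against a view may end up with shard cursors opened on + // the view's underlying collection; getMores must then target that resolved namespace.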
+ NamespaceString cursorNss; + // The exact host in the shard on which the cursor resides. HostAndPort shardHostAndPort; diff --git a/src/mongo/s/query/cluster_client_cursor_impl.cpp b/src/mongo/s/query/cluster_client_cursor_impl.cpp index e7716355c0f..f286cee408e 100644 --- a/src/mongo/s/query/cluster_client_cursor_impl.cpp +++ b/src/mongo/s/query/cluster_client_cursor_impl.cpp @@ -26,17 +26,18 @@ * it in the license file. */ -#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kQuery - #include "mongo/platform/basic.h" #include "mongo/s/query/cluster_client_cursor_impl.h" -#include "mongo/s/query/router_stage_aggregation_merge.h" +#include "mongo/db/pipeline/document_source_limit.h" +#include "mongo/db/pipeline/document_source_skip.h" +#include "mongo/db/pipeline/document_source_sort.h" #include "mongo/s/query/router_stage_limit.h" #include "mongo/s/query/router_stage_merge.h" #include "mongo/s/query/router_stage_mock.h" -#include "mongo/s/query/router_stage_remove_sortkey.h" +#include "mongo/s/query/router_stage_pipeline.h" +#include "mongo/s/query/router_stage_remove_metadata_fields.h" #include "mongo/s/query/router_stage_skip.h" #include "mongo/stdx/memory.h" @@ -140,18 +141,87 @@ boost::optional ClusterClientCursorImpl::getLsid() const { return _lsid; } +namespace { + +/** + * Rips off an initial $sort stage that will be handled by mongos execution machinery. Returns the + * sort key pattern of such a $sort stage if there was one, and boost::none otherwise. + */ +boost::optional extractLeadingSort(Pipeline* mergePipeline) { + if (auto frontSort = mergePipeline->popFrontStageWithName(DocumentSourceSort::kStageName)) { + auto sortStage = static_cast(frontSort.get()); + if (auto sortLimit = sortStage->getLimitSrc()) { + // There was a limit stage absorbed into the sort stage, so we need to preserve that. + mergePipeline->addInitialSource(sortLimit); + } + return sortStage + ->sortKeyPattern(DocumentSourceSort::SortKeySerialization::kForSortKeyMerging) + .toBson(); + } + return boost::none; +} + +bool isSkipOrLimit(const boost::intrusive_ptr& stage) { + return (dynamic_cast(stage.get()) || + dynamic_cast(stage.get())); +} + +bool isAllLimitsAndSkips(Pipeline* pipeline) { + const auto stages = pipeline->getSources(); + return std::all_of( + stages.begin(), stages.end(), [&](const auto& stage) { return isSkipOrLimit(stage); }); +} + +std::unique_ptr buildPipelinePlan(executor::TaskExecutor* executor, + ClusterClientCursorParams* params) { + invariant(params->mergePipeline); + invariant(!params->skip); + invariant(!params->limit); + auto* pipeline = params->mergePipeline.get(); + auto* opCtx = pipeline->getContext()->opCtx; + + std::unique_ptr root = + stdx::make_unique(opCtx, executor, params); + if (!isAllLimitsAndSkips(pipeline)) { + return stdx::make_unique(std::move(root), + std::move(params->mergePipeline)); + } + + // After extracting an optional leading $sort, the pipeline consisted entirely of $skip and + // $limit stages. Avoid creating a RouterStagePipeline (which will go through an expensive + // conversion from BSONObj -> Document for each result), and create a RouterExecStage tree + // instead. 
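+ // For example, a leftover merge pipeline of [{$skip: 5}, {$limit: 10}] unwinds into + // RouterStageLimit(RouterStageSkip(RouterStageMerge(...))), with no per-document BSONObj to + // Document conversion.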
+ while (!pipeline->getSources().empty()) { + invariant(isSkipOrLimit(pipeline->getSources().front())); + if (auto skip = pipeline->popFrontStageWithName(DocumentSourceSkip::kStageName)) { + root = stdx::make_unique( + opCtx, std::move(root), static_cast(skip.get())->getSkip()); + } else if (auto limit = pipeline->popFrontStageWithName(DocumentSourceLimit::kStageName)) { + root = stdx::make_unique( + opCtx, std::move(root), static_cast(limit.get())->getLimit()); + } + } + if (!params->sort.isEmpty()) { + // We are executing the pipeline without using a Pipeline, so we need to strip out any + // Document metadata ourselves. Note we only need this stage if there was a sort, since + // otherwise there would be no way for this half of the pipeline to require any metadata + // fields. + root = stdx::make_unique( + opCtx, std::move(root), Document::allMetadataFieldNames); + } + return root; +} +} // namespace + std::unique_ptr ClusterClientCursorImpl::buildMergerPlan( OperationContext* opCtx, executor::TaskExecutor* executor, ClusterClientCursorParams* params) { const auto skip = params->skip; const auto limit = params->limit; - const bool hasSort = !params->sort.isEmpty(); - - // The first stage always merges from the remotes. If 'mergePipeline' has been specified in - // ClusterClientCursorParams, then RouterStageAggregationMerge should be the root and only node. - // Otherwise, construct a RouterStage pipeline from the remotes, skip, limit, and sort fields in - // 'params'. if (params->mergePipeline) { - return stdx::make_unique(std::move(params->mergePipeline)); + if (auto sort = extractLeadingSort(params->mergePipeline.get())) { + params->sort = *sort; + } + return buildPipelinePlan(executor, params); } std::unique_ptr root = @@ -165,8 +235,13 @@ std::unique_ptr ClusterClientCursorImpl::buildMergerPlan( root = stdx::make_unique(opCtx, std::move(root), *limit); } + const bool hasSort = !params->sort.isEmpty(); if (hasSort) { - root = stdx::make_unique(opCtx, std::move(root)); + // Strip out the sort key after sorting. + root = stdx::make_unique( + opCtx, + std::move(root), + std::vector{ClusterClientCursorParams::kSortKeyField}); } return root; diff --git a/src/mongo/s/query/router_exec_stage.h b/src/mongo/s/query/router_exec_stage.h index ac074d92b62..e7d8ebb65b9 100644 --- a/src/mongo/s/query/router_exec_stage.h +++ b/src/mongo/s/query/router_exec_stage.h @@ -77,12 +77,18 @@ public: * currently attached. This is so that a killing thread may call this method with its own * operation context. */ - virtual void kill(OperationContext* opCtx) = 0; + virtual void kill(OperationContext* opCtx) { + invariant(_child); // The default implementation forwards to the child stage. + _child->kill(opCtx); + } /** * Returns whether or not all the remote cursors are exhausted. */ - virtual bool remotesExhausted() = 0; + virtual bool remotesExhausted() { + invariant(_child); // The default implementation forwards to the child stage. + return _child->remotesExhausted(); + } /** * Sets the maxTimeMS value that the cursor should forward with any internally issued getMore @@ -91,7 +97,15 @@ public: * Returns a non-OK status if this cursor type does not support maxTimeMS on getMore (i.e. if * the cursor is not tailable + awaitData). 
*/ - virtual Status setAwaitDataTimeout(Milliseconds awaitDataTimeout) = 0; + Status setAwaitDataTimeout(Milliseconds awaitDataTimeout) { + if (_child) { + auto childStatus = _child->setAwaitDataTimeout(awaitDataTimeout); + if (!childStatus.isOK()) { + return childStatus; + } + } + return doSetAwaitDataTimeout(awaitDataTimeout); + } /** * Sets the current operation context to be used by the router stage. @@ -134,6 +148,13 @@ protected: */ virtual void doDetachFromOperationContext() {} + /** + * Performs any stage-specific await data timeout actions. + */ + virtual Status doSetAwaitDataTimeout(Milliseconds awaitDataTimeout) { + return Status::OK(); + } + /** * Returns an unowned pointer to the child stage, or nullptr if there is no child. */ diff --git a/src/mongo/s/query/router_stage_aggregation_merge.cpp b/src/mongo/s/query/router_stage_aggregation_merge.cpp deleted file mode 100644 index a4273dbd4a7..00000000000 --- a/src/mongo/s/query/router_stage_aggregation_merge.cpp +++ /dev/null @@ -1,79 +0,0 @@ -/** - * Copyright (C) 2017 MongoDB Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects - * for all of the code used other than as permitted herein. If you modify - * file(s) with this exception, you may extend this exception to your - * version of the file(s), but you are not obligated to do so. If you do not - * wish to do so, delete this exception statement from your version. If you - * delete this exception statement from all source files in the program, - * then also delete it in the license file. - */ - -#include "mongo/platform/basic.h" - -#include "mongo/s/query/router_stage_aggregation_merge.h" - -#include "mongo/db/pipeline/document_source_merge_cursors.h" -#include "mongo/db/pipeline/expression_context.h" - -namespace mongo { - -RouterStageAggregationMerge::RouterStageAggregationMerge( - std::unique_ptr mergePipeline) - : RouterExecStage(mergePipeline->getContext()->opCtx), - _mergePipeline(std::move(mergePipeline)) {} - -StatusWith RouterStageAggregationMerge::next() { - // Pipeline::getNext will return a boost::optional or boost::none if EOF. - if (auto result = _mergePipeline->getNext()) { - return {result->toBson()}; - } - - // If we reach this point, we have hit EOF. 
- _mergePipeline.get_deleter().dismissDisposal(); - _mergePipeline->dispose(getOpCtx()); - - return {ClusterQueryResult()}; -} - -void RouterStageAggregationMerge::doReattachToOperationContext() { - _mergePipeline->reattachToOperationContext(getOpCtx()); -} - -void RouterStageAggregationMerge::doDetachFromOperationContext() { - _mergePipeline->detachFromOperationContext(); -} - -void RouterStageAggregationMerge::kill(OperationContext* opCtx) { - _mergePipeline.get_deleter().dismissDisposal(); - _mergePipeline->dispose(opCtx); -} - -bool RouterStageAggregationMerge::remotesExhausted() { - const auto mergeSource = - static_cast(_mergePipeline->getSources().front().get()); - return mergeSource->remotesExhausted(); -} - -Status RouterStageAggregationMerge::setAwaitDataTimeout(Milliseconds awaitDataTimeout) { - return {ErrorCodes::InvalidOptions, "maxTimeMS is not valid for aggregation getMore"}; -} - -} // namespace mongo diff --git a/src/mongo/s/query/router_stage_aggregation_merge.h b/src/mongo/s/query/router_stage_aggregation_merge.h deleted file mode 100644 index 363b46e73d9..00000000000 --- a/src/mongo/s/query/router_stage_aggregation_merge.h +++ /dev/null @@ -1,62 +0,0 @@ -/** - * Copyright (C) 2017 MongoDB Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects - * for all of the code used other than as permitted herein. If you modify - * file(s) with this exception, you may extend this exception to your - * version of the file(s), but you are not obligated to do so. If you do not - * wish to do so, delete this exception statement from your version. If you - * delete this exception statement from all source files in the program, - * then also delete it in the license file. - */ - -#pragma once - -#include "mongo/s/query/router_exec_stage.h" - -#include "mongo/db/pipeline/pipeline.h" - -namespace mongo { - -/** - * Draws results from a Pipeline with a DocumentSourceMergeCursors at its head, which is the - * underlying source of the stream of merged documents manipulated by the RouterStage pipeline. 
- */ -class RouterStageAggregationMerge final : public RouterExecStage { -public: - RouterStageAggregationMerge(std::unique_ptr mergePipeline); - - StatusWith next() final; - - void kill(OperationContext* opCtx) final; - - bool remotesExhausted() final; - - Status setAwaitDataTimeout(Milliseconds awaitDataTimeout) final; - -protected: - void doReattachToOperationContext() final; - - void doDetachFromOperationContext() final; - -private: - std::unique_ptr _mergePipeline; -}; - -} // namespace mongo diff --git a/src/mongo/s/query/router_stage_limit.cpp b/src/mongo/s/query/router_stage_limit.cpp index feb8f11626f..b3fd2b14651 100644 --- a/src/mongo/s/query/router_stage_limit.cpp +++ b/src/mongo/s/query/router_stage_limit.cpp @@ -57,16 +57,4 @@ StatusWith RouterStageLimit::next() { return childResult; } -void RouterStageLimit::kill(OperationContext* opCtx) { - getChildStage()->kill(opCtx); -} - -bool RouterStageLimit::remotesExhausted() { - return getChildStage()->remotesExhausted(); -} - -Status RouterStageLimit::setAwaitDataTimeout(Milliseconds awaitDataTimeout) { - return getChildStage()->setAwaitDataTimeout(awaitDataTimeout); -} - } // namespace mongo diff --git a/src/mongo/s/query/router_stage_limit.h b/src/mongo/s/query/router_stage_limit.h index 42ef46c21ab..1a158e2c3a7 100644 --- a/src/mongo/s/query/router_stage_limit.h +++ b/src/mongo/s/query/router_stage_limit.h @@ -43,12 +43,6 @@ public: StatusWith next() final; - void kill(OperationContext* opCtx) final; - - bool remotesExhausted() final; - - Status setAwaitDataTimeout(Milliseconds awaitDataTimeout) final; - private: long long _limit; diff --git a/src/mongo/s/query/router_stage_merge.cpp b/src/mongo/s/query/router_stage_merge.cpp index 78ee1a3475a..f2a159003c2 100644 --- a/src/mongo/s/query/router_stage_merge.cpp +++ b/src/mongo/s/query/router_stage_merge.cpp @@ -69,7 +69,7 @@ bool RouterStageMerge::remotesExhausted() { return _arm.remotesExhausted(); } -Status RouterStageMerge::setAwaitDataTimeout(Milliseconds awaitDataTimeout) { +Status RouterStageMerge::doSetAwaitDataTimeout(Milliseconds awaitDataTimeout) { return _arm.setAwaitDataTimeout(awaitDataTimeout); } diff --git a/src/mongo/s/query/router_stage_merge.h b/src/mongo/s/query/router_stage_merge.h index 23503c664f6..78c5383e0ee 100644 --- a/src/mongo/s/query/router_stage_merge.h +++ b/src/mongo/s/query/router_stage_merge.h @@ -53,7 +53,8 @@ public: bool remotesExhausted() final; - Status setAwaitDataTimeout(Milliseconds awaitDataTimeout) final; +protected: + Status doSetAwaitDataTimeout(Milliseconds awaitDataTimeout) final; protected: void doReattachToOperationContext() override { diff --git a/src/mongo/s/query/router_stage_mock.cpp b/src/mongo/s/query/router_stage_mock.cpp index edeb1f9945c..7ebc3a6a554 100644 --- a/src/mongo/s/query/router_stage_mock.cpp +++ b/src/mongo/s/query/router_stage_mock.cpp @@ -68,7 +68,7 @@ bool RouterStageMock::remotesExhausted() { return _remotesExhausted; } -Status RouterStageMock::setAwaitDataTimeout(Milliseconds awaitDataTimeout) { +Status RouterStageMock::doSetAwaitDataTimeout(Milliseconds awaitDataTimeout) { _awaitDataTimeout = awaitDataTimeout; return Status::OK(); } diff --git a/src/mongo/s/query/router_stage_mock.h b/src/mongo/s/query/router_stage_mock.h index e2f8e7adab5..8e3075103d5 100644 --- a/src/mongo/s/query/router_stage_mock.h +++ b/src/mongo/s/query/router_stage_mock.h @@ -51,8 +51,6 @@ public: bool remotesExhausted() final; - Status setAwaitDataTimeout(Milliseconds awaitDataTimeout) final; - /** * Queues a BSONObj to be 
returned. */ @@ -79,6 +77,9 @@ public: */ StatusWith getAwaitDataTimeout(); +protected: + Status doSetAwaitDataTimeout(Milliseconds awaitDataTimeout) final; + private: std::queue> _resultsQueue; bool _remotesExhausted = false; diff --git a/src/mongo/s/query/router_stage_pipeline.cpp b/src/mongo/s/query/router_stage_pipeline.cpp new file mode 100644 index 00000000000..d9cf02f85c3 --- /dev/null +++ b/src/mongo/s/query/router_stage_pipeline.cpp @@ -0,0 +1,135 @@ +/** + * Copyright (C) 2017 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/s/query/router_stage_pipeline.h" + +#include "mongo/db/pipeline/document_source.h" +#include "mongo/db/pipeline/document_source_merge_cursors.h" +#include "mongo/db/pipeline/expression_context.h" + +namespace mongo { + +namespace { + +/** + * A class that acts as an adapter between the RouterExecStage and DocumentSource interfaces, + * translating results from an input RouterExecStage into DocumentSource::GetNextResults. + */ +class DocumentSourceRouterAdapter : public DocumentSource { +public: + static boost::intrusive_ptr create( + const boost::intrusive_ptr& expCtx, + std::unique_ptr childStage) { + return new DocumentSourceRouterAdapter(expCtx, std::move(childStage)); + } + + GetNextResult getNext() final { + auto next = uassertStatusOK(_child->next()); + if (auto nextObj = next.getResult()) { + return Document::fromBsonWithMetaData(*nextObj); + } + return GetNextResult::makeEOF(); + } + + void doDispose() final { + _child->kill(pExpCtx->opCtx); + } + + void reattachToOperationContext(OperationContext* opCtx) final { + _child->reattachToOperationContext(opCtx); + } + + void detachFromOperationContext() final { + _child->detachFromOperationContext(); + } + + Value serialize(boost::optional explain) const final { + invariant(explain); // We shouldn't need to serialize this stage to send it anywhere. + return Value(); // Return the empty value to hide this stage from explain output. 
+ } + + bool remotesExhausted() { + return _child->remotesExhausted(); + } + +private: + DocumentSourceRouterAdapter(const boost::intrusive_ptr& expCtx, + std::unique_ptr childStage) + : DocumentSource(expCtx), _child(std::move(childStage)) {} + + std::unique_ptr _child; +}; +} // namespace + +RouterStagePipeline::RouterStagePipeline(std::unique_ptr child, + std::unique_ptr mergePipeline) + : RouterExecStage(mergePipeline->getContext()->opCtx), + _mergePipeline(std::move(mergePipeline)) { + // Add an adapter to the front of the pipeline to draw results from 'child'. + _mergePipeline->addInitialSource( + DocumentSourceRouterAdapter::create(_mergePipeline->getContext(), std::move(child))); +} + +StatusWith RouterStagePipeline::next() { + // Pipeline::getNext will return a boost::optional or boost::none if EOF. + if (auto result = _mergePipeline->getNext()) { + return {result->toBson()}; + } + + // If we reach this point, we have hit EOF. + _mergePipeline.get_deleter().dismissDisposal(); + _mergePipeline->dispose(getOpCtx()); + + return {ClusterQueryResult()}; +} + +void RouterStagePipeline::doReattachToOperationContext() { + _mergePipeline->reattachToOperationContext(getOpCtx()); +} + +void RouterStagePipeline::doDetachFromOperationContext() { + _mergePipeline->detachFromOperationContext(); +} + +void RouterStagePipeline::kill(OperationContext* opCtx) { + _mergePipeline.get_deleter().dismissDisposal(); + _mergePipeline->dispose(opCtx); +} + +bool RouterStagePipeline::remotesExhausted() { + return static_cast(_mergePipeline->getSources().front().get()) + ->remotesExhausted(); +} + +Status RouterStagePipeline::doSetAwaitDataTimeout(Milliseconds awaitDataTimeout) { + return {ErrorCodes::InvalidOptions, "maxTimeMS is not valid for aggregation getMore"}; +} + +} // namespace mongo diff --git a/src/mongo/s/query/router_stage_pipeline.h b/src/mongo/s/query/router_stage_pipeline.h new file mode 100644 index 00000000000..780f1fe0e47 --- /dev/null +++ b/src/mongo/s/query/router_stage_pipeline.h @@ -0,0 +1,63 @@ +/** + * Copyright (C) 2017 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. 
+ */ + +#pragma once + +#include "mongo/s/query/router_exec_stage.h" + +#include "mongo/db/pipeline/pipeline.h" + +namespace mongo { + +/** + * Inserts a pipeline into the router execution tree, drawing results from the input stage, feeding + * them through the pipeline, and outputting the results of the pipeline. + */ +class RouterStagePipeline final : public RouterExecStage { +public: + RouterStagePipeline(std::unique_ptr child, + std::unique_ptr mergePipeline); + + StatusWith next() final; + + void kill(OperationContext* opCtx) final; + + bool remotesExhausted() final; + +protected: + Status doSetAwaitDataTimeout(Milliseconds awaitDataTimeout) final; + + void doReattachToOperationContext() final; + + void doDetachFromOperationContext() final; + +private: + std::unique_ptr _mergePipeline; +}; + +} // namespace mongo diff --git a/src/mongo/s/query/router_stage_remove_metadata_fields.cpp b/src/mongo/s/query/router_stage_remove_metadata_fields.cpp new file mode 100644 index 00000000000..3be98380e4e --- /dev/null +++ b/src/mongo/s/query/router_stage_remove_metadata_fields.cpp @@ -0,0 +1,89 @@ +/** + * Copyright (C) 2017 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include + +#include "mongo/s/query/router_stage_remove_metadata_fields.h" + +#include "mongo/bson/bsonobjbuilder.h" +#include "mongo/db/pipeline/document.h" + +namespace mongo { + +RouterStageRemoveMetadataFields::RouterStageRemoveMetadataFields( + OperationContext* opCtx, + std::unique_ptr child, + std::vector metadataFields) + : RouterExecStage(opCtx, std::move(child)), _metaFields(std::move(metadataFields)) { + for (auto&& fieldName : _metaFields) { + invariant(fieldName[0] == '$'); // We use this information to optimize next(). + } +} + +StatusWith RouterStageRemoveMetadataFields::next() { + auto childResult = getChildStage()->next(); + if (!childResult.isOK() || !childResult.getValue().getResult()) { + return childResult; + } + + BSONObjIterator iterator(*childResult.getValue().getResult()); + // Find the first field that we need to remove. 
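+ // For example, with _metaFields = {"$sortKey"}, the object {a: 1, $sortKey: {'': 1}, b: 2} + // stops the scan at "$sortKey", while {a: 1, b: 2} scans to the end and is returned whole + // below.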
+ while (iterator.more() && ((*iterator).fieldName()[0] != '$' || + std::find(_metaFields.begin(), _metaFields.end(), (*iterator).fieldNameStringData()) == + _metaFields.end())) { + ++iterator; + } + + if (!iterator.more()) { + // We got all the way to the end without finding any fields to remove, so just return the + // whole document. + return childResult; + } + + // Copy everything up to the first metadata field. + const auto firstElementBufferStart = + childResult.getValue().getResult()->firstElement().rawdata(); + auto endOfNonMetaFieldBuffer = (*iterator).rawdata(); + BSONObjBuilder builder; + builder.bb().appendBuf(firstElementBufferStart, + endOfNonMetaFieldBuffer - firstElementBufferStart); + + // Copy any remaining fields that are not metadata. We expect metadata fields to be at the + // end of the document, so there is likely nothing else to copy. + while ((++iterator).more()) { + if (std::find(_metaFields.begin(), _metaFields.end(), (*iterator).fieldNameStringData()) == + _metaFields.end()) { + builder.append(*iterator); + } + } + return {builder.obj()}; +} + +} // namespace mongo diff --git a/src/mongo/s/query/router_stage_remove_metadata_fields.h b/src/mongo/s/query/router_stage_remove_metadata_fields.h new file mode 100644 index 00000000000..07c9c7c36bb --- /dev/null +++ b/src/mongo/s/query/router_stage_remove_metadata_fields.h @@ -0,0 +1,54 @@ +/** + * Copyright (C) 2017 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#pragma once + +#include <vector> + +#include "mongo/s/query/router_exec_stage.h" + +namespace mongo { + +/** + * Removes metadata fields from a BSON object. + */ +class RouterStageRemoveMetadataFields final : public RouterExecStage { +public: + RouterStageRemoveMetadataFields(OperationContext* opCtx, + std::unique_ptr<RouterExecStage> child, + std::vector<StringData> fieldsToRemove); + + StatusWith<ClusterQueryResult> next() final; + +private: + // The metadata field names to remove - stored as StringData so we can compare against field + // names without allocating a string for each field in each object.
+ std::vector _metaFields; +}; + +} // namespace mongo diff --git a/src/mongo/s/query/router_stage_remove_metadata_fields_test.cpp b/src/mongo/s/query/router_stage_remove_metadata_fields_test.cpp new file mode 100644 index 00000000000..bb8ea4613b8 --- /dev/null +++ b/src/mongo/s/query/router_stage_remove_metadata_fields_test.cpp @@ -0,0 +1,196 @@ +/** + * Copyright (C) 2017 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/s/query/router_stage_remove_metadata_fields.h" + +#include "mongo/bson/bsonobj.h" +#include "mongo/bson/bsonobjbuilder.h" +#include "mongo/db/pipeline/document.h" +#include "mongo/s/query/router_stage_mock.h" +#include "mongo/stdx/memory.h" +#include "mongo/unittest/unittest.h" + +namespace mongo { + +namespace { + +// These tests use router stages, which do not actually use their OperationContext, so rather than +// going through the trouble of making one, we'll just use nullptr throughout. 
+OperationContext* opCtx = nullptr; + +TEST(RouterStageRemoveMetadataFieldsTest, RemovesMetaDataFields) { + auto mockStage = stdx::make_unique(opCtx); + mockStage->queueResult(BSON("a" << 4 << "$sortKey" << 1 << "b" << 3)); + mockStage->queueResult(BSON("$sortKey" << BSON("" << 3) << "c" << BSON("d" + << "foo"))); + mockStage->queueResult(BSON("a" << 3)); + mockStage->queueResult(BSON("a" << 3 << "$randVal" << 4 << "$sortKey" << 2)); + mockStage->queueResult( + BSON("$textScore" << 2 << "a" << 3 << "$randVal" << 4 << "$sortKey" << 2)); + mockStage->queueResult(BSON("$textScore" << 2)); + mockStage->queueResult(BSONObj()); + + auto sortKeyStage = stdx::make_unique( + opCtx, std::move(mockStage), Document::allMetadataFieldNames); + + auto firstResult = sortKeyStage->next(); + ASSERT_OK(firstResult.getStatus()); + ASSERT(firstResult.getValue().getResult()); + ASSERT_BSONOBJ_EQ(*firstResult.getValue().getResult(), BSON("a" << 4 << "b" << 3)); + + auto secondResult = sortKeyStage->next(); + ASSERT_OK(secondResult.getStatus()); + ASSERT(secondResult.getValue().getResult()); + ASSERT_BSONOBJ_EQ(*secondResult.getValue().getResult(), + BSON("c" << BSON("d" + << "foo"))); + + auto thirdResult = sortKeyStage->next(); + ASSERT_OK(thirdResult.getStatus()); + ASSERT(thirdResult.getValue().getResult()); + ASSERT_BSONOBJ_EQ(*thirdResult.getValue().getResult(), BSON("a" << 3)); + + auto fourthResult = sortKeyStage->next(); + ASSERT_OK(fourthResult.getStatus()); + ASSERT(fourthResult.getValue().getResult()); + ASSERT_BSONOBJ_EQ(*fourthResult.getValue().getResult(), BSON("a" << 3)); + + auto fifthResult = sortKeyStage->next(); + ASSERT_OK(fifthResult.getStatus()); + ASSERT(fifthResult.getValue().getResult()); + ASSERT_BSONOBJ_EQ(*fifthResult.getValue().getResult(), BSON("a" << 3)); + + auto sixthResult = sortKeyStage->next(); + ASSERT_OK(sixthResult.getStatus()); + ASSERT(sixthResult.getValue().getResult()); + ASSERT_BSONOBJ_EQ(*sixthResult.getValue().getResult(), BSONObj()); + + auto seventhResult = sortKeyStage->next(); + ASSERT_OK(seventhResult.getStatus()); + ASSERT(seventhResult.getValue().getResult()); + ASSERT_BSONOBJ_EQ(*seventhResult.getValue().getResult(), BSONObj()); + + auto eighthResult = sortKeyStage->next(); + ASSERT_OK(eighthResult.getStatus()); + ASSERT(eighthResult.getValue().isEOF()); +} + +TEST(RouterStageRemoveMetadataFieldsTest, PropagatesError) { + auto mockStage = stdx::make_unique(opCtx); + mockStage->queueResult(BSON("$sortKey" << 1)); + mockStage->queueError(Status(ErrorCodes::BadValue, "bad thing happened")); + + auto sortKeyStage = stdx::make_unique( + opCtx, std::move(mockStage), std::vector{"$sortKey"_sd}); + + auto firstResult = sortKeyStage->next(); + ASSERT_OK(firstResult.getStatus()); + ASSERT(firstResult.getValue().getResult()); + ASSERT_BSONOBJ_EQ(*firstResult.getValue().getResult(), BSONObj()); + + auto secondResult = sortKeyStage->next(); + ASSERT_NOT_OK(secondResult.getStatus()); + ASSERT_EQ(secondResult.getStatus(), ErrorCodes::BadValue); + ASSERT_EQ(secondResult.getStatus().reason(), "bad thing happened"); +} + +TEST(RouterStageRemoveMetadataFieldsTest, ToleratesMidStreamEOF) { + auto mockStage = stdx::make_unique(opCtx); + mockStage->queueResult(BSON("a" << 1 << "$sortKey" << 1 << "b" << 1)); + mockStage->queueEOF(); + mockStage->queueResult(BSON("a" << 2 << "$sortKey" << 1 << "b" << 2)); + + auto sortKeyStage = stdx::make_unique( + opCtx, std::move(mockStage), std::vector{"$sortKey"_sd}); + + auto firstResult = sortKeyStage->next(); + 
+    ASSERT_OK(firstResult.getStatus());
+    ASSERT(firstResult.getValue().getResult());
+    ASSERT_BSONOBJ_EQ(*firstResult.getValue().getResult(), BSON("a" << 1 << "b" << 1));
+
+    auto secondResult = sortKeyStage->next();
+    ASSERT_OK(secondResult.getStatus());
+    ASSERT(secondResult.getValue().isEOF());
+
+    auto thirdResult = sortKeyStage->next();
+    ASSERT_OK(thirdResult.getStatus());
+    ASSERT(thirdResult.getValue().getResult());
+    ASSERT_BSONOBJ_EQ(*thirdResult.getValue().getResult(), BSON("a" << 2 << "b" << 2));
+
+    auto fourthResult = sortKeyStage->next();
+    ASSERT_OK(fourthResult.getStatus());
+    ASSERT(fourthResult.getValue().isEOF());
+}
+
+TEST(RouterStageRemoveMetadataFieldsTest, RemotesExhausted) {
+    auto mockStage = stdx::make_unique<RouterStageMock>(opCtx);
+    mockStage->queueResult(BSON("a" << 1 << "$sortKey" << 1 << "b" << 1));
+    mockStage->queueResult(BSON("a" << 2 << "$sortKey" << 1 << "b" << 2));
+    mockStage->markRemotesExhausted();
+
+    auto sortKeyStage = stdx::make_unique<RouterStageRemoveMetadataFields>(
+        opCtx, std::move(mockStage), std::vector<StringData>{"$sortKey"_sd});
+    ASSERT_TRUE(sortKeyStage->remotesExhausted());
+
+    auto firstResult = sortKeyStage->next();
+    ASSERT_OK(firstResult.getStatus());
+    ASSERT(firstResult.getValue().getResult());
+    ASSERT_BSONOBJ_EQ(*firstResult.getValue().getResult(), BSON("a" << 1 << "b" << 1));
+    ASSERT_TRUE(sortKeyStage->remotesExhausted());
+
+    auto secondResult = sortKeyStage->next();
+    ASSERT_OK(secondResult.getStatus());
+    ASSERT(secondResult.getValue().getResult());
+    ASSERT_BSONOBJ_EQ(*secondResult.getValue().getResult(), BSON("a" << 2 << "b" << 2));
+    ASSERT_TRUE(sortKeyStage->remotesExhausted());
+
+    auto thirdResult = sortKeyStage->next();
+    ASSERT_OK(thirdResult.getStatus());
+    ASSERT(thirdResult.getValue().isEOF());
+    ASSERT_TRUE(sortKeyStage->remotesExhausted());
+}
+
+TEST(RouterStageRemoveMetadataFieldsTest, ForwardsAwaitDataTimeout) {
+    auto mockStage = stdx::make_unique<RouterStageMock>(opCtx);
+    auto mockStagePtr = mockStage.get();
+    ASSERT_NOT_OK(mockStage->getAwaitDataTimeout().getStatus());
+
+    auto sortKeyStage = stdx::make_unique<RouterStageRemoveMetadataFields>(
+        opCtx, std::move(mockStage), std::vector<StringData>{"$sortKey"_sd});
+    ASSERT_OK(sortKeyStage->setAwaitDataTimeout(Milliseconds(789)));
+
+    auto awaitDataTimeout = mockStagePtr->getAwaitDataTimeout();
+    ASSERT_OK(awaitDataTimeout.getStatus());
+    ASSERT_EQ(789, durationCount<Milliseconds>(awaitDataTimeout.getValue()));
+}
+
+}  // namespace
+
+}  // namespace mongo
diff --git a/src/mongo/s/query/router_stage_remove_sortkey.cpp b/src/mongo/s/query/router_stage_remove_sortkey.cpp
deleted file mode 100644
index fe7a8cf0f7d..00000000000
--- a/src/mongo/s/query/router_stage_remove_sortkey.cpp
+++ /dev/null
@@ -1,75 +0,0 @@
-/**
- * Copyright (C) 2015 MongoDB Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License, version 3,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the GNU Affero General Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kQuery
-
-#include "mongo/platform/basic.h"
-
-#include "mongo/s/query/router_stage_remove_sortkey.h"
-
-#include "mongo/bson/bsonobjbuilder.h"
-#include "mongo/s/query/cluster_client_cursor_params.h"
-#include "mongo/util/mongoutils/str.h"
-
-namespace mongo {
-
-RouterStageRemoveSortKey::RouterStageRemoveSortKey(OperationContext* opCtx,
-                                                   std::unique_ptr<RouterExecStage> child)
-    : RouterExecStage(opCtx, std::move(child)) {}
-
-StatusWith<ClusterQueryResult> RouterStageRemoveSortKey::next() {
-    auto childResult = getChildStage()->next();
-    if (!childResult.isOK() || !childResult.getValue().getResult()) {
-        return childResult;
-    }
-
-    const auto& childObj = childResult.getValue().getResult();
-
-    BSONObjBuilder builder;
-    for (BSONElement elt : *childObj) {
-        if (!str::equals(elt.fieldName(), ClusterClientCursorParams::kSortKeyField)) {
-            builder.append(elt);
-        }
-    }
-
-    return {builder.obj()};
-}
-
-void RouterStageRemoveSortKey::kill(OperationContext* opCtx) {
-    getChildStage()->kill(opCtx);
-}
-
-bool RouterStageRemoveSortKey::remotesExhausted() {
-    return getChildStage()->remotesExhausted();
-}
-
-Status RouterStageRemoveSortKey::setAwaitDataTimeout(Milliseconds awaitDataTimeout) {
-    return getChildStage()->setAwaitDataTimeout(awaitDataTimeout);
-}
-
-}  // namespace mongo
diff --git a/src/mongo/s/query/router_stage_remove_sortkey.h b/src/mongo/s/query/router_stage_remove_sortkey.h
deleted file mode 100644
index ba71364dfa9..00000000000
--- a/src/mongo/s/query/router_stage_remove_sortkey.h
+++ /dev/null
@@ -1,53 +0,0 @@
-/**
- * Copyright (C) 2015 MongoDB Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License, version 3,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the GNU Affero General Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#pragma once
-
-#include "mongo/s/query/router_exec_stage.h"
-
-namespace mongo {
-
-/**
- * Removes the sort key added to each document by mongod's sortKey meta-projection.
- *
- * Only needed if the query specifies a sort.
- */
-class RouterStageRemoveSortKey final : public RouterExecStage {
-public:
-    RouterStageRemoveSortKey(OperationContext* opCtx, std::unique_ptr<RouterExecStage> child);
-
-    StatusWith<ClusterQueryResult> next() final;
-
-    void kill(OperationContext* opCtx) final;
-
-    bool remotesExhausted() final;
-
-    Status setAwaitDataTimeout(Milliseconds awaitDataTimeout) final;
-};
-
-}  // namespace mongo
diff --git a/src/mongo/s/query/router_stage_remove_sortkey_test.cpp b/src/mongo/s/query/router_stage_remove_sortkey_test.cpp
deleted file mode 100644
index 5767549ad64..00000000000
--- a/src/mongo/s/query/router_stage_remove_sortkey_test.cpp
+++ /dev/null
@@ -1,171 +0,0 @@
-/**
- * Copyright 2015 MongoDB Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License, version 3,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the GNU Affero General Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#include "mongo/platform/basic.h"
-
-#include "mongo/s/query/router_stage_remove_sortkey.h"
-
-#include "mongo/bson/bsonobj.h"
-#include "mongo/bson/bsonobjbuilder.h"
-#include "mongo/s/query/router_stage_mock.h"
-#include "mongo/stdx/memory.h"
-#include "mongo/unittest/unittest.h"
-
-namespace mongo {
-
-namespace {
-
-// These tests use router stages, which do not actually use their OperationContext, so rather than
-// going through the trouble of making one, we'll just use nullptr throughout.
-OperationContext* opCtx = nullptr;
-
-TEST(RouterStageRemoveSortKeyTest, RemovesSortKey) {
-    auto mockStage = stdx::make_unique<RouterStageMock>(opCtx);
-    mockStage->queueResult(BSON("a" << 4 << "$sortKey" << 1 << "b" << 3));
-    mockStage->queueResult(BSON("$sortKey" << BSON("" << 3) << "c" << BSON("d"
-                                                                           << "foo")));
-    mockStage->queueResult(BSON("a" << 3));
-    mockStage->queueResult(BSONObj());
-
-    auto sortKeyStage = stdx::make_unique<RouterStageRemoveSortKey>(opCtx, std::move(mockStage));
-
-    auto firstResult = sortKeyStage->next();
-    ASSERT_OK(firstResult.getStatus());
-    ASSERT(firstResult.getValue().getResult());
-    ASSERT_BSONOBJ_EQ(*firstResult.getValue().getResult(), BSON("a" << 4 << "b" << 3));
-
-    auto secondResult = sortKeyStage->next();
-    ASSERT_OK(secondResult.getStatus());
-    ASSERT(secondResult.getValue().getResult());
-    ASSERT_BSONOBJ_EQ(*secondResult.getValue().getResult(),
-                      BSON("c" << BSON("d"
-                                       << "foo")));
-
-    auto thirdResult = sortKeyStage->next();
-    ASSERT_OK(thirdResult.getStatus());
-    ASSERT(thirdResult.getValue().getResult());
-    ASSERT_BSONOBJ_EQ(*thirdResult.getValue().getResult(), BSON("a" << 3));
-
-    auto fourthResult = sortKeyStage->next();
-    ASSERT_OK(fourthResult.getStatus());
-    ASSERT(fourthResult.getValue().getResult());
-    ASSERT_BSONOBJ_EQ(*fourthResult.getValue().getResult(), BSONObj());
-
-    auto fifthResult = sortKeyStage->next();
-    ASSERT_OK(fifthResult.getStatus());
-    ASSERT(fifthResult.getValue().isEOF());
-}
-
-TEST(RouterStageRemoveSortKeyTest, PropagatesError) {
-    auto mockStage = stdx::make_unique<RouterStageMock>(opCtx);
-    mockStage->queueResult(BSON("$sortKey" << 1));
-    mockStage->queueError(Status(ErrorCodes::BadValue, "bad thing happened"));
-
-    auto sortKeyStage = stdx::make_unique<RouterStageRemoveSortKey>(opCtx, std::move(mockStage));
-
-    auto firstResult = sortKeyStage->next();
-    ASSERT_OK(firstResult.getStatus());
-    ASSERT(firstResult.getValue().getResult());
-    ASSERT_BSONOBJ_EQ(*firstResult.getValue().getResult(), BSONObj());
-
-    auto secondResult = sortKeyStage->next();
-    ASSERT_NOT_OK(secondResult.getStatus());
-    ASSERT_EQ(secondResult.getStatus(), ErrorCodes::BadValue);
-    ASSERT_EQ(secondResult.getStatus().reason(), "bad thing happened");
-}
-
-TEST(RouterStageRemoveSortKeyTest, ToleratesMidStreamEOF) {
-    auto mockStage = stdx::make_unique<RouterStageMock>(opCtx);
-    mockStage->queueResult(BSON("a" << 1 << "$sortKey" << 1 << "b" << 1));
-    mockStage->queueEOF();
-    mockStage->queueResult(BSON("a" << 2 << "$sortKey" << 1 << "b" << 2));
-
-    auto sortKeyStage = stdx::make_unique<RouterStageRemoveSortKey>(opCtx, std::move(mockStage));
-
-    auto firstResult = sortKeyStage->next();
-    ASSERT_OK(firstResult.getStatus());
-    ASSERT(firstResult.getValue().getResult());
-    ASSERT_BSONOBJ_EQ(*firstResult.getValue().getResult(), BSON("a" << 1 << "b" << 1));
-
-    auto secondResult = sortKeyStage->next();
-    ASSERT_OK(secondResult.getStatus());
-    ASSERT(secondResult.getValue().isEOF());
-
-    auto thirdResult = sortKeyStage->next();
-    ASSERT_OK(thirdResult.getStatus());
-    ASSERT(thirdResult.getValue().getResult());
-    ASSERT_BSONOBJ_EQ(*thirdResult.getValue().getResult(), BSON("a" << 2 << "b" << 2));
-
-    auto fourthResult = sortKeyStage->next();
-    ASSERT_OK(fourthResult.getStatus());
-    ASSERT(fourthResult.getValue().isEOF());
-}
-
-TEST(RouterStageRemoveSortKeyTest, RemotesExhausted) {
-    auto mockStage = stdx::make_unique<RouterStageMock>(opCtx);
-    mockStage->queueResult(BSON("a" << 1 << "$sortKey" << 1 << "b" << 1));
-    mockStage->queueResult(BSON("a" << 2 << "$sortKey" << 1 << "b" << 2));
-    mockStage->markRemotesExhausted();
-
-    auto sortKeyStage = stdx::make_unique<RouterStageRemoveSortKey>(opCtx, std::move(mockStage));
-    ASSERT_TRUE(sortKeyStage->remotesExhausted());
-
-    auto firstResult = sortKeyStage->next();
-    ASSERT_OK(firstResult.getStatus());
-    ASSERT(firstResult.getValue().getResult());
-    ASSERT_BSONOBJ_EQ(*firstResult.getValue().getResult(), BSON("a" << 1 << "b" << 1));
-    ASSERT_TRUE(sortKeyStage->remotesExhausted());
-
-    auto secondResult = sortKeyStage->next();
-    ASSERT_OK(secondResult.getStatus());
-    ASSERT(secondResult.getValue().getResult());
-    ASSERT_BSONOBJ_EQ(*secondResult.getValue().getResult(), BSON("a" << 2 << "b" << 2));
-    ASSERT_TRUE(sortKeyStage->remotesExhausted());
-
-    auto thirdResult = sortKeyStage->next();
-    ASSERT_OK(thirdResult.getStatus());
-    ASSERT(thirdResult.getValue().isEOF());
-    ASSERT_TRUE(sortKeyStage->remotesExhausted());
-}
-
-TEST(RouterStageRemoveSortKeyTest, ForwardsAwaitDataTimeout) {
-    auto mockStage = stdx::make_unique<RouterStageMock>(opCtx);
-    auto mockStagePtr = mockStage.get();
-    ASSERT_NOT_OK(mockStage->getAwaitDataTimeout().getStatus());
-
-    auto sortKeyStage = stdx::make_unique<RouterStageRemoveSortKey>(opCtx, std::move(mockStage));
-    ASSERT_OK(sortKeyStage->setAwaitDataTimeout(Milliseconds(789)));
-
-    auto awaitDataTimeout = mockStagePtr->getAwaitDataTimeout();
-    ASSERT_OK(awaitDataTimeout.getStatus());
-    ASSERT_EQ(789, durationCount<Milliseconds>(awaitDataTimeout.getValue()));
-}
-
-}  // namespace
-
-}  // namespace mongo
diff --git a/src/mongo/s/query/router_stage_skip.cpp b/src/mongo/s/query/router_stage_skip.cpp
index 50d2107b14c..b514731c9cd 100644
--- a/src/mongo/s/query/router_stage_skip.cpp
+++ b/src/mongo/s/query/router_stage_skip.cpp
@@ -58,16 +58,4 @@ StatusWith<ClusterQueryResult> RouterStageSkip::next() {
     return getChildStage()->next();
 }
 
-void RouterStageSkip::kill(OperationContext* opCtx) {
-    getChildStage()->kill(opCtx);
-}
-
-bool RouterStageSkip::remotesExhausted() {
-    return getChildStage()->remotesExhausted();
-}
-
-Status RouterStageSkip::setAwaitDataTimeout(Milliseconds awaitDataTimeout) {
-    return getChildStage()->setAwaitDataTimeout(awaitDataTimeout);
-}
-
 }  // namespace mongo
diff --git a/src/mongo/s/query/router_stage_skip.h b/src/mongo/s/query/router_stage_skip.h
index 49051128577..9e67d25b74d 100644
--- a/src/mongo/s/query/router_stage_skip.h
+++ b/src/mongo/s/query/router_stage_skip.h
@@ -43,12 +43,6 @@ public:
 
     StatusWith<ClusterQueryResult> next() final;
 
-    void kill(OperationContext* opCtx) final;
-
-    bool remotesExhausted() final;
-
-    Status setAwaitDataTimeout(Milliseconds awaitDataTimeout) final;
-
 private:
     long long _skip;
diff --git a/src/mongo/s/query/store_possible_cursor.cpp b/src/mongo/s/query/store_possible_cursor.cpp
index dce282c5892..506ac226636 100644
--- a/src/mongo/s/query/store_possible_cursor.cpp
+++ b/src/mongo/s/query/store_possible_cursor.cpp
@@ -63,10 +63,11 @@ StatusWith<BSONObj> storePossibleCursor(OperationContext* opCtx,
     ClusterClientCursorParams params(
         incomingCursorResponse.getValue().getNSS(),
         AuthorizationSession::get(opCtx->getClient())->getAuthenticatedUserNames());
-    params.remotes.emplace_back(
-        shardId,
-        server,
-        CursorResponse(requestedNss, incomingCursorResponse.getValue().getCursorId(), {}));
+    params.remotes.emplace_back(shardId,
+                                server,
+                                CursorResponse(incomingCursorResponse.getValue().getNSS(),
+                                               incomingCursorResponse.getValue().getCursorId(),
+                                               {}));
 
     auto ccc = ClusterClientCursorImpl::make(opCtx, executor, std::move(params));
--
cgit v1.2.1
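
Note for readers: the hunks above exercise RouterStageRemoveMetadataFields without showing its next() implementation. The tests pin down the contract, though: pull a result from the child stage, drop every top-level field whose name appears in the configured metadata list (e.g. Document::allMetadataFieldNames, or just "$sortKey"), and pass errors and EOF through untouched. Below is a minimal sketch of that contract, modeled on the deleted RouterStageRemoveSortKey::next() shown in this patch; it is an illustration, not the committed implementation, which may differ in detail.

    // Sketch only -- generalizes the deleted RouterStageRemoveSortKey::next() to a
    // list of metadata fields. Assumes <algorithm> is included and that
    // '_metaFields' is the std::vector<StringData> passed to the constructor.
    StatusWith<ClusterQueryResult> RouterStageRemoveMetadataFields::next() {
        auto childResult = getChildStage()->next();
        if (!childResult.isOK() || !childResult.getValue().getResult()) {
            return childResult;  // Errors and EOF propagate unchanged.
        }

        BSONObjBuilder builder;
        for (BSONElement elt : *childResult.getValue().getResult()) {
            // Keep only fields whose names are not in the metadata list.
            if (std::find(_metaFields.begin(), _metaFields.end(), elt.fieldNameStringData()) ==
                _metaFields.end()) {
                builder.append(elt);
            }
        }
        return {builder.obj()};
    }

Similarly, the router_stage_skip.{cpp,h} hunks delete kill(), remotesExhausted(), and setAwaitDataTimeout() without replacement. That is only safe because RouterExecStage itself (modified earlier in this patch; the router_exec_stage.h hunk is not reproduced in this excerpt) now supplies child-forwarding defaults. A plausible shape for those defaults, inferred from the removed pure-forwarding overrides and again only a sketch:

    // Hypothetical base-class defaults in RouterExecStage that make the removed
    // RouterStageSkip overrides redundant: each simply delegates to the child
    // stage when one exists.
    virtual void kill(OperationContext* opCtx) {
        if (_child) {
            _child->kill(opCtx);
        }
    }
    virtual bool remotesExhausted() {
        return _child ? _child->remotesExhausted() : true;
    }
    virtual Status setAwaitDataTimeout(Milliseconds awaitDataTimeout) {
        return _child ? _child->setAwaitDataTimeout(awaitDataTimeout) : Status::OK();
    }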