diff options
author | Bernard Gorman <bernard.gorman@gmail.com> | 2018-11-13 14:44:57 +0000 |
---|---|---|
committer | Bernard Gorman <bernard.gorman@gmail.com> | 2018-11-20 22:41:44 +0000 |
commit | 072f6a59fe9ae19b77507a550c50b973ef3124d2 (patch) | |
tree | f8bb1eb2a2731ecbc02adadbe1ed209aa2c33bc9 | |
parent | ae4011cf301665046c435c18fd6ef088f5881e04 (diff) | |
download | mongo-072f6a59fe9ae19b77507a550c50b973ef3124d2.tar.gz |
SERVER-37750 Optimized $sample stage does not yield
(cherry picked from commit 4177d6d22ab3329a8607bf80a62aa03d4fb2c528)
-rw-r--r-- | buildscripts/resmokeconfig/suites/aggregation_sharded_collections_passthrough.yml | 1 | ||||
-rw-r--r-- | jstests/aggregation/bugs/server37750.js | 80 | ||||
-rw-r--r-- | jstests/change_streams/invalid_namespaces.js | 2 | ||||
-rw-r--r-- | jstests/change_streams/lookup_post_image.js | 2 | ||||
-rw-r--r-- | jstests/change_streams/lookup_post_image_resume_after.js | 2 | ||||
-rw-r--r-- | jstests/libs/fixture_helpers.js | 27 | ||||
-rw-r--r-- | src/mongo/db/pipeline/pipeline_d.cpp | 32 | ||||
-rw-r--r-- | src/mongo/shell/shell_options.cpp | 2 |
8 files changed, 114 insertions, 34 deletions
diff --git a/buildscripts/resmokeconfig/suites/aggregation_sharded_collections_passthrough.yml b/buildscripts/resmokeconfig/suites/aggregation_sharded_collections_passthrough.yml index a9f6b7dcf31..ec75451ddd7 100644 --- a/buildscripts/resmokeconfig/suites/aggregation_sharded_collections_passthrough.yml +++ b/buildscripts/resmokeconfig/suites/aggregation_sharded_collections_passthrough.yml @@ -10,6 +10,7 @@ selector: # The following tests start their own ShardingTest. - jstests/aggregation/bugs/server6179.js - jstests/aggregation/bugs/server7781.js + - jstests/aggregation/bugs/server37750.js - jstests/aggregation/mongos_merge.js - jstests/aggregation/mongos_slaveok.js - jstests/aggregation/shard_targeting.js diff --git a/jstests/aggregation/bugs/server37750.js b/jstests/aggregation/bugs/server37750.js new file mode 100644 index 00000000000..56b11511acc --- /dev/null +++ b/jstests/aggregation/bugs/server37750.js @@ -0,0 +1,80 @@ +/** + * Confirms that a sharded $sample which employs the DSSampleFromRandomCursor optimization is + * capable of yielding. + * + * @tags: [assumes_read_concern_unchanged, do_not_wrap_aggregations_in_facets, requires_journaling, + * requires_sharding] + */ +(function() { + "use strict"; + + load("jstests/libs/fixture_helpers.js"); // For FixtureHelpers. + + // Set up a 2-shard cluster. Configure 'internalQueryExecYieldIterations' on both shards such + // that operations will yield on each PlanExecuter iteration. + const st = new ShardingTest({ + name: jsTestName(), + shards: 2, + rs: {nodes: 1, setParameter: {internalQueryExecYieldIterations: 1}} + }); + + const mongosDB = st.s.getDB(jsTestName()); + const mongosColl = mongosDB.test; + + // Shard the test collection, split it at {_id: 0}, and move the upper chunk to shard1. + st.shardColl(mongosColl, {_id: 1}, {_id: 0}, {_id: 0}); + + // Insert enough documents on each shard to induce the $sample random-cursor optimization. + for (let i = (-150); i < 150; ++i) { + assert.writeOK(mongosColl.insert({_id: i})); + } + + // Run the initial aggregate for the $sample stage. + const cmdRes = assert.commandWorked(mongosDB.runCommand({ + aggregate: mongosColl.getName(), + pipeline: [{$sample: {size: 3}}], + comment: "$sample random", + cursor: {batchSize: 0} + })); + assert.eq(cmdRes.cursor.firstBatch.length, 0); + + // Force each shard to hang on yield to allow for currentOp capture. + FixtureHelpers.runCommandOnEachPrimary({ + db: mongosDB.getSiblingDB("admin"), + cmdObj: { + configureFailPoint: "setYieldAllLocksHang", + mode: "alwaysOn", + data: {namespace: mongosColl.getFullName()} + } + }); + + // Run $currentOp to confirm that the $sample getMore yields on both shards. + const awaitShell = startParallelShell(() => { + load("jstests/libs/fixture_helpers.js"); + assert.soon(() => db.getSiblingDB("admin") + .aggregate([ + {$currentOp: {}}, + { + $match: { + "originatingCommand.comment": "$sample random", + planSummary: "MULTI_ITERATOR", + numYields: {$gt: 0} + } + } + ]) + .itcount() === 2); + // Release the failpoint and allow the getMores to complete. + FixtureHelpers.runCommandOnEachPrimary({ + db: db.getSiblingDB("admin"), + cmdObj: {configureFailPoint: "setYieldAllLocksHang", mode: "off"} + }); + }, mongosDB.getMongo().port); + + // Retrieve the results for the $sample aggregation. + const sampleCursor = new DBCommandCursor(mongosDB, cmdRes); + assert.eq(sampleCursor.toArray().length, 3); + + // Confirm that the parallel shell completes successfully, and tear down the cluster. + awaitShell(); + st.stop(); +})();
\ No newline at end of file diff --git a/jstests/change_streams/invalid_namespaces.js b/jstests/change_streams/invalid_namespaces.js index ed1c98af9f7..13b012808c5 100644 --- a/jstests/change_streams/invalid_namespaces.js +++ b/jstests/change_streams/invalid_namespaces.js @@ -12,7 +12,7 @@ assertInvalidChangeStreamNss("admin"); assertInvalidChangeStreamNss("config"); // Not allowed to access 'local' database through mongos. - if (!FixtureHelpers.isMongos()) { + if (!FixtureHelpers.isMongos(db)) { assertInvalidChangeStreamNss("local"); } diff --git a/jstests/change_streams/lookup_post_image.js b/jstests/change_streams/lookup_post_image.js index ce22137f09c..31a70801a8b 100644 --- a/jstests/change_streams/lookup_post_image.js +++ b/jstests/change_streams/lookup_post_image.js @@ -110,7 +110,7 @@ // If this test is running with secondary read preference, it's necessary for the remove // to propagate to all secondary nodes and be available for majority reads before we can // assume looking up the document will fail. - FixtureHelpers.awaitLastOpCommitted(); + FixtureHelpers.awaitLastOpCommitted(db); latestChange = cst.getOneChange(cursor); assert.eq(latestChange.operationType, "update"); diff --git a/jstests/change_streams/lookup_post_image_resume_after.js b/jstests/change_streams/lookup_post_image_resume_after.js index b2d8e345679..1484955aaea 100644 --- a/jstests/change_streams/lookup_post_image_resume_after.js +++ b/jstests/change_streams/lookup_post_image_resume_after.js @@ -50,7 +50,7 @@ // If this test is running with secondary read preference, it's necessary for the drop to // propagate to all secondary nodes and be available for majority reads before we can assume // looking up the document will fail. - FixtureHelpers.awaitLastOpCommitted(); + FixtureHelpers.awaitLastOpCommitted(db); // Check the next $changeStream entry; this is the test document inserted above. let latestChange = cst.getOneChange(firstResumeCursor); diff --git a/jstests/libs/fixture_helpers.js b/jstests/libs/fixture_helpers.js index 812daf14667..90d39767440 100644 --- a/jstests/libs/fixture_helpers.js +++ b/jstests/libs/fixture_helpers.js @@ -5,6 +5,8 @@ * replica set, a sharded cluster, etc. */ var FixtureHelpers = (function() { + load("jstests/concurrency/fsm_workload_helpers/server_types.js"); // For isMongos. + function _getHostStringForReplSet(connectionToNodeInSet) { const isMaster = assert.commandWorked(connectionToNodeInSet.getDB("test").isMaster()); assert( @@ -13,17 +15,13 @@ var FixtureHelpers = (function() { return isMaster.setName + "/" + isMaster.hosts.join(","); } - function isMongos() { - return db.runCommand({isdbgrid: 1}).isdbgrid; - } - /** * Returns an array of connections to each data-bearing replica set in the fixture (not * including the config servers). */ - function _getAllReplicas() { + function _getAllReplicas(db) { let replicas = []; - if (isMongos()) { + if (isMongos(db)) { const shardObjs = db.getSiblingDB("config").shards.find().sort({_id: 1}); replicas = shardObjs.map((shardObj) => new ReplSetTest(shardObj.host)); } else { @@ -37,8 +35,8 @@ var FixtureHelpers = (function() { * in each replica set in the fixture (besides the config servers) to reach the same op time. * Asserts if the fixture is a standalone or if the shards are standalones. */ - function awaitReplication() { - _getAllReplicas().forEach((replSet) => replSet.awaitReplication()); + function awaitReplication(db) { + _getAllReplicas(db).forEach((replSet) => replSet.awaitReplication()); } /** @@ -48,8 +46,8 @@ var FixtureHelpers = (function() { * * Asserts if the fixture is a standalone or if the shards are standalones. */ - function awaitLastOpCommitted() { - _getAllReplicas().forEach((replSet) => replSet.awaitLastOpCommitted()); + function awaitLastOpCommitted(db) { + _getAllReplicas(db).forEach((replSet) => replSet.awaitLastOpCommitted()); } /** @@ -58,9 +56,10 @@ var FixtureHelpers = (function() { * array with the responses from each shard, or with a single element if the fixture was a * replica set. Asserts if the fixture is a standalone or if the shards are standalones. */ - function runCommandOnEachPrimary({dbName, cmdObj}) { - return _getAllReplicas().map((replSet) => assert.commandWorked( - replSet.getPrimary().getDB(dbName).runCommand(cmdObj))); + function runCommandOnEachPrimary({db, cmdObj}) { + return _getAllReplicas(db).map( + (replSet) => + assert.commandWorked(replSet.getPrimary().getDB(db.getName()).runCommand(cmdObj))); } /** @@ -68,7 +67,7 @@ var FixtureHelpers = (function() { * Returns the same connection that 'db' is using if the fixture is not a sharded cluster. */ function getPrimaryForNodeHostingDatabase(db) { - if (!isMongos()) { + if (!isMongos(db)) { return db.getMongo(); } const configDB = db.getSiblingDB("config"); diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp index 9d49b4ef6cd..9be12b669e6 100644 --- a/src/mongo/db/pipeline/pipeline_d.cpp +++ b/src/mongo/db/pipeline/pipeline_d.cpp @@ -455,6 +455,10 @@ private: */ StatusWith<unique_ptr<PlanExecutor, PlanExecutor::Deleter>> createRandomCursorExecutor( Collection* collection, OperationContext* opCtx, long long sampleSize, long long numRecords) { + // Verify that we are already under a collection lock. We avoid taking locks ourselves in this + // function because double-locking forces any PlanExecutor we create to adopt a NO_YIELD policy. + invariant(opCtx->lockState()->isCollectionLockedForMode(collection->ns().ns(), MODE_IS)); + double kMaxSampleRatioForRandCursor = 0.05; if (sampleSize > numRecords * kMaxSampleRatioForRandCursor || numRecords <= 100) { return {nullptr}; @@ -499,22 +503,18 @@ StatusWith<unique_ptr<PlanExecutor, PlanExecutor::Deleter>> createRandomCursorEx opCtx, ws.get(), idxIterator.release(), nullptr, collection); } - { - AutoGetCollectionForRead autoColl(opCtx, collection->ns()); - - // If we're in a sharded environment, we need to filter out documents we don't own. - if (ShardingState::get(opCtx)->needCollectionMetadata(opCtx, collection->ns().ns())) { - auto shardFilterStage = stdx::make_unique<ShardFilterStage>( - opCtx, - CollectionShardingState::get(opCtx, collection->ns())->getMetadata(), - ws.get(), - stage.release()); - return PlanExecutor::make(opCtx, - std::move(ws), - std::move(shardFilterStage), - collection, - PlanExecutor::YIELD_AUTO); - } + // If we're in a sharded environment, we need to filter out documents we don't own. + if (ShardingState::get(opCtx)->needCollectionMetadata(opCtx, collection->ns().ns())) { + auto shardFilterStage = stdx::make_unique<ShardFilterStage>( + opCtx, + CollectionShardingState::get(opCtx, collection->ns())->getMetadata(), + ws.get(), + stage.release()); + return PlanExecutor::make(opCtx, + std::move(ws), + std::move(shardFilterStage), + collection, + PlanExecutor::YIELD_AUTO); } return PlanExecutor::make( diff --git a/src/mongo/shell/shell_options.cpp b/src/mongo/shell/shell_options.cpp index c85527b6364..ea9987ab6e1 100644 --- a/src/mongo/shell/shell_options.cpp +++ b/src/mongo/shell/shell_options.cpp @@ -491,7 +491,7 @@ Status storeMongoShellOptions(const moe::Environment& params, if (uriOptions.count("compressors")) shellGlobalParams.networkMessageCompressors = uriOptions["compressors"]; } - + if (!shellGlobalParams.networkMessageCompressors.empty()) { const auto ret = storeMessageCompressionOptions(shellGlobalParams.networkMessageCompressors); |