author     Bernard Gorman <bernard.gorman@gmail.com>    2018-11-13 14:44:57 +0000
committer  Bernard Gorman <bernard.gorman@gmail.com>    2018-11-20 22:41:44 +0000
commit     072f6a59fe9ae19b77507a550c50b973ef3124d2 (patch)
tree       f8bb1eb2a2731ecbc02adadbe1ed209aa2c33bc9
parent     ae4011cf301665046c435c18fd6ef088f5881e04 (diff)
download   mongo-072f6a59fe9ae19b77507a550c50b973ef3124d2.tar.gz
SERVER-37750 Optimized $sample stage does not yield
(cherry picked from commit 4177d6d22ab3329a8607bf80a62aa03d4fb2c528)
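For context: the random-cursor ("optimized") $sample path builds its PlanExecutor inside createRandomCursorExecutor, which previously re-acquired the collection lock via AutoGetCollectionForRead; as the new comment in pipeline_d.cpp notes, that double-locking forces the executor onto a NO_YIELD policy. This patch drops the extra lock acquisition so the executor is created with YIELD_AUTO, and adds a test that observes the yields. A minimal shell sketch of the behaviour being fixed (collection name and parameter value are illustrative, not part of the patch):

    // Yield after every PlanExecutor iteration so yields are easy to observe.
    db.adminCommand({setParameter: 1, internalQueryExecYieldIterations: 1});
    // More than 100 documents and a sample size within 5% of the collection,
    // so the random-cursor optimization applies.
    for (let i = 0; i < 1000; ++i) {
        assert.writeOK(db.sampleDemo.insert({_id: i}));
    }
    db.sampleDemo.aggregate([{$sample: {size: 10}}]);
    // Before this fix, such an operation reported numYields: 0 in $currentOp
    // output; with the fix it yields like any other collection scan.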
-rw-r--r--  buildscripts/resmokeconfig/suites/aggregation_sharded_collections_passthrough.yml |  1
-rw-r--r--  jstests/aggregation/bugs/server37750.js                                            | 80
-rw-r--r--  jstests/change_streams/invalid_namespaces.js                                       |  2
-rw-r--r--  jstests/change_streams/lookup_post_image.js                                        |  2
-rw-r--r--  jstests/change_streams/lookup_post_image_resume_after.js                           |  2
-rw-r--r--  jstests/libs/fixture_helpers.js                                                    | 27
-rw-r--r--  src/mongo/db/pipeline/pipeline_d.cpp                                               | 32
-rw-r--r--  src/mongo/shell/shell_options.cpp                                                  |  2
8 files changed, 114 insertions(+), 34 deletions(-)
diff --git a/buildscripts/resmokeconfig/suites/aggregation_sharded_collections_passthrough.yml b/buildscripts/resmokeconfig/suites/aggregation_sharded_collections_passthrough.yml
index a9f6b7dcf31..ec75451ddd7 100644
--- a/buildscripts/resmokeconfig/suites/aggregation_sharded_collections_passthrough.yml
+++ b/buildscripts/resmokeconfig/suites/aggregation_sharded_collections_passthrough.yml
@@ -10,6 +10,7 @@ selector:
# The following tests start their own ShardingTest.
- jstests/aggregation/bugs/server6179.js
- jstests/aggregation/bugs/server7781.js
+ - jstests/aggregation/bugs/server37750.js
- jstests/aggregation/mongos_merge.js
- jstests/aggregation/mongos_slaveok.js
- jstests/aggregation/shard_targeting.js
diff --git a/jstests/aggregation/bugs/server37750.js b/jstests/aggregation/bugs/server37750.js
new file mode 100644
index 00000000000..56b11511acc
--- /dev/null
+++ b/jstests/aggregation/bugs/server37750.js
@@ -0,0 +1,80 @@
+/**
+ * Confirms that a sharded $sample which employs the DSSampleFromRandomCursor optimization is
+ * capable of yielding.
+ *
+ * @tags: [assumes_read_concern_unchanged, do_not_wrap_aggregations_in_facets, requires_journaling,
+ * requires_sharding]
+ */
+(function() {
+ "use strict";
+
+ load("jstests/libs/fixture_helpers.js"); // For FixtureHelpers.
+
+ // Set up a 2-shard cluster. Configure 'internalQueryExecYieldIterations' on both shards such
+ // that operations will yield on each PlanExecutor iteration.
+ const st = new ShardingTest({
+ name: jsTestName(),
+ shards: 2,
+ rs: {nodes: 1, setParameter: {internalQueryExecYieldIterations: 1}}
+ });
+
+ const mongosDB = st.s.getDB(jsTestName());
+ const mongosColl = mongosDB.test;
+
+ // Shard the test collection, split it at {_id: 0}, and move the upper chunk to shard1.
+ st.shardColl(mongosColl, {_id: 1}, {_id: 0}, {_id: 0});
+
+ // Insert enough documents on each shard to induce the $sample random-cursor optimization.
+ for (let i = (-150); i < 150; ++i) {
+ assert.writeOK(mongosColl.insert({_id: i}));
+ }
+
+ // Run the initial aggregate for the $sample stage.
+ const cmdRes = assert.commandWorked(mongosDB.runCommand({
+ aggregate: mongosColl.getName(),
+ pipeline: [{$sample: {size: 3}}],
+ comment: "$sample random",
+ cursor: {batchSize: 0}
+ }));
+ assert.eq(cmdRes.cursor.firstBatch.length, 0);
+
+ // Force each shard to hang on yield to allow for currentOp capture.
+ FixtureHelpers.runCommandOnEachPrimary({
+ db: mongosDB.getSiblingDB("admin"),
+ cmdObj: {
+ configureFailPoint: "setYieldAllLocksHang",
+ mode: "alwaysOn",
+ data: {namespace: mongosColl.getFullName()}
+ }
+ });
+
+ // Run $currentOp to confirm that the $sample getMore yields on both shards.
+ const awaitShell = startParallelShell(() => {
+ load("jstests/libs/fixture_helpers.js");
+ assert.soon(() => db.getSiblingDB("admin")
+ .aggregate([
+ {$currentOp: {}},
+ {
+ $match: {
+ "originatingCommand.comment": "$sample random",
+ planSummary: "MULTI_ITERATOR",
+ numYields: {$gt: 0}
+ }
+ }
+ ])
+ .itcount() === 2);
+ // Release the failpoint and allow the getMores to complete.
+ FixtureHelpers.runCommandOnEachPrimary({
+ db: db.getSiblingDB("admin"),
+ cmdObj: {configureFailPoint: "setYieldAllLocksHang", mode: "off"}
+ });
+ }, mongosDB.getMongo().port);
+
+ // Retrieve the results for the $sample aggregation.
+ const sampleCursor = new DBCommandCursor(mongosDB, cmdRes);
+ assert.eq(sampleCursor.toArray().length, 3);
+
+ // Confirm that the parallel shell completes successfully, and tear down the cluster.
+ awaitShell();
+ st.stop();
+})();
\ No newline at end of file
diff --git a/jstests/change_streams/invalid_namespaces.js b/jstests/change_streams/invalid_namespaces.js
index ed1c98af9f7..13b012808c5 100644
--- a/jstests/change_streams/invalid_namespaces.js
+++ b/jstests/change_streams/invalid_namespaces.js
@@ -12,7 +12,7 @@
assertInvalidChangeStreamNss("admin");
assertInvalidChangeStreamNss("config");
// Not allowed to access 'local' database through mongos.
- if (!FixtureHelpers.isMongos()) {
+ if (!FixtureHelpers.isMongos(db)) {
assertInvalidChangeStreamNss("local");
}
diff --git a/jstests/change_streams/lookup_post_image.js b/jstests/change_streams/lookup_post_image.js
index ce22137f09c..31a70801a8b 100644
--- a/jstests/change_streams/lookup_post_image.js
+++ b/jstests/change_streams/lookup_post_image.js
@@ -110,7 +110,7 @@
// If this test is running with secondary read preference, it's necessary for the remove
// to propagate to all secondary nodes and be available for majority reads before we can
// assume looking up the document will fail.
- FixtureHelpers.awaitLastOpCommitted();
+ FixtureHelpers.awaitLastOpCommitted(db);
latestChange = cst.getOneChange(cursor);
assert.eq(latestChange.operationType, "update");
diff --git a/jstests/change_streams/lookup_post_image_resume_after.js b/jstests/change_streams/lookup_post_image_resume_after.js
index b2d8e345679..1484955aaea 100644
--- a/jstests/change_streams/lookup_post_image_resume_after.js
+++ b/jstests/change_streams/lookup_post_image_resume_after.js
@@ -50,7 +50,7 @@
// If this test is running with secondary read preference, it's necessary for the drop to
// propagate to all secondary nodes and be available for majority reads before we can assume
// looking up the document will fail.
- FixtureHelpers.awaitLastOpCommitted();
+ FixtureHelpers.awaitLastOpCommitted(db);
// Check the next $changeStream entry; this is the test document inserted above.
let latestChange = cst.getOneChange(firstResumeCursor);
diff --git a/jstests/libs/fixture_helpers.js b/jstests/libs/fixture_helpers.js
index 812daf14667..90d39767440 100644
--- a/jstests/libs/fixture_helpers.js
+++ b/jstests/libs/fixture_helpers.js
@@ -5,6 +5,8 @@
* replica set, a sharded cluster, etc.
*/
var FixtureHelpers = (function() {
+ load("jstests/concurrency/fsm_workload_helpers/server_types.js"); // For isMongos.
+
function _getHostStringForReplSet(connectionToNodeInSet) {
const isMaster = assert.commandWorked(connectionToNodeInSet.getDB("test").isMaster());
assert(
@@ -13,17 +15,13 @@ var FixtureHelpers = (function() {
return isMaster.setName + "/" + isMaster.hosts.join(",");
}
- function isMongos() {
- return db.runCommand({isdbgrid: 1}).isdbgrid;
- }
-
/**
* Returns an array of connections to each data-bearing replica set in the fixture (not
* including the config servers).
*/
- function _getAllReplicas() {
+ function _getAllReplicas(db) {
let replicas = [];
- if (isMongos()) {
+ if (isMongos(db)) {
const shardObjs = db.getSiblingDB("config").shards.find().sort({_id: 1});
replicas = shardObjs.map((shardObj) => new ReplSetTest(shardObj.host));
} else {
@@ -37,8 +35,8 @@ var FixtureHelpers = (function() {
* in each replica set in the fixture (besides the config servers) to reach the same op time.
* Asserts if the fixture is a standalone or if the shards are standalones.
*/
- function awaitReplication() {
- _getAllReplicas().forEach((replSet) => replSet.awaitReplication());
+ function awaitReplication(db) {
+ _getAllReplicas(db).forEach((replSet) => replSet.awaitReplication());
}
/**
@@ -48,8 +46,8 @@ var FixtureHelpers = (function() {
*
* Asserts if the fixture is a standalone or if the shards are standalones.
*/
- function awaitLastOpCommitted() {
- _getAllReplicas().forEach((replSet) => replSet.awaitLastOpCommitted());
+ function awaitLastOpCommitted(db) {
+ _getAllReplicas(db).forEach((replSet) => replSet.awaitLastOpCommitted());
}
/**
@@ -58,9 +56,10 @@ var FixtureHelpers = (function() {
* array with the responses from each shard, or with a single element if the fixture was a
* replica set. Asserts if the fixture is a standalone or if the shards are standalones.
*/
- function runCommandOnEachPrimary({dbName, cmdObj}) {
- return _getAllReplicas().map((replSet) => assert.commandWorked(
- replSet.getPrimary().getDB(dbName).runCommand(cmdObj)));
+ function runCommandOnEachPrimary({db, cmdObj}) {
+ return _getAllReplicas(db).map(
+ (replSet) =>
+ assert.commandWorked(replSet.getPrimary().getDB(db.getName()).runCommand(cmdObj)));
}
/**
@@ -68,7 +67,7 @@ var FixtureHelpers = (function() {
* Returns the same connection that 'db' is using if the fixture is not a sharded cluster.
*/
function getPrimaryForNodeHostingDatabase(db) {
- if (!isMongos()) {
+ if (!isMongos(db)) {
return db.getMongo();
}
const configDB = db.getSiblingDB("config");
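The FixtureHelpers changes above thread the caller's 'db' handle through explicitly instead of reading the shell's implicit global. A sketch of the updated call pattern from a test (the ping command is just an illustrative payload):

    load("jstests/libs/fixture_helpers.js");
    // All helpers now take the caller's db handle explicitly.
    if (FixtureHelpers.isMongos(db)) {
        FixtureHelpers.runCommandOnEachPrimary(
            {db: db.getSiblingDB("admin"), cmdObj: {ping: 1}});
    }
    FixtureHelpers.awaitLastOpCommitted(db);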
diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp
index 9d49b4ef6cd..9be12b669e6 100644
--- a/src/mongo/db/pipeline/pipeline_d.cpp
+++ b/src/mongo/db/pipeline/pipeline_d.cpp
@@ -455,6 +455,10 @@ private:
*/
StatusWith<unique_ptr<PlanExecutor, PlanExecutor::Deleter>> createRandomCursorExecutor(
Collection* collection, OperationContext* opCtx, long long sampleSize, long long numRecords) {
+ // Verify that we are already under a collection lock. We avoid taking locks ourselves in this
+ // function because double-locking forces any PlanExecutor we create to adopt a NO_YIELD policy.
+ invariant(opCtx->lockState()->isCollectionLockedForMode(collection->ns().ns(), MODE_IS));
+
double kMaxSampleRatioForRandCursor = 0.05;
if (sampleSize > numRecords * kMaxSampleRatioForRandCursor || numRecords <= 100) {
return {nullptr};
@@ -499,22 +503,18 @@ StatusWith<unique_ptr<PlanExecutor, PlanExecutor::Deleter>> createRandomCursorEx
opCtx, ws.get(), idxIterator.release(), nullptr, collection);
}
- {
- AutoGetCollectionForRead autoColl(opCtx, collection->ns());
-
- // If we're in a sharded environment, we need to filter out documents we don't own.
- if (ShardingState::get(opCtx)->needCollectionMetadata(opCtx, collection->ns().ns())) {
- auto shardFilterStage = stdx::make_unique<ShardFilterStage>(
- opCtx,
- CollectionShardingState::get(opCtx, collection->ns())->getMetadata(),
- ws.get(),
- stage.release());
- return PlanExecutor::make(opCtx,
- std::move(ws),
- std::move(shardFilterStage),
- collection,
- PlanExecutor::YIELD_AUTO);
- }
+ // If we're in a sharded environment, we need to filter out documents we don't own.
+ if (ShardingState::get(opCtx)->needCollectionMetadata(opCtx, collection->ns().ns())) {
+ auto shardFilterStage = stdx::make_unique<ShardFilterStage>(
+ opCtx,
+ CollectionShardingState::get(opCtx, collection->ns())->getMetadata(),
+ ws.get(),
+ stage.release());
+ return PlanExecutor::make(opCtx,
+ std::move(ws),
+ std::move(shardFilterStage),
+ collection,
+ PlanExecutor::YIELD_AUTO);
}
return PlanExecutor::make(
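The new invariant at the top of createRandomCursorExecutor documents the locking contract, and removing the nested AutoGetCollectionForRead is what lets the shard-filtered executor keep YIELD_AUTO. The gate on the optimization itself is unchanged; in shell terms it amounts to the following check (hypothetical helper mirroring kMaxSampleRatioForRandCursor = 0.05 and the 100-document floor):

    // Returns true when $sample would use the random-cursor executor.
    function usesRandomCursorPath(sampleSize, numRecords) {
        return !(sampleSize > numRecords * 0.05 || numRecords <= 100);
    }
    usesRandomCursorPath(3, 150);   // true  -- the per-shard regime in server37750.js
    usesRandomCursorPath(10, 100);  // false -- collection too small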
diff --git a/src/mongo/shell/shell_options.cpp b/src/mongo/shell/shell_options.cpp
index c85527b6364..ea9987ab6e1 100644
--- a/src/mongo/shell/shell_options.cpp
+++ b/src/mongo/shell/shell_options.cpp
@@ -491,7 +491,7 @@ Status storeMongoShellOptions(const moe::Environment& params,
if (uriOptions.count("compressors"))
shellGlobalParams.networkMessageCompressors = uriOptions["compressors"];
}
-
+
if (!shellGlobalParams.networkMessageCompressors.empty()) {
const auto ret =
storeMessageCompressionOptions(shellGlobalParams.networkMessageCompressors);