author     Bernard Gorman <bernard.gorman@gmail.com>    2018-11-13 14:44:57 +0000
committer  Bernard Gorman <bernard.gorman@gmail.com>    2018-11-20 04:41:44 +0000
commit     bba874c3fd9f43e9f8a6820fc600f3d86e1d4099 (patch)
tree       0cad411b77605eace86b6696f642b3dca3ec2fe6
parent     ac99b600745b488968257d936d8fad0f755c4f1c (diff)
SERVER-37750 Optimized $sample stage does not yield
(cherry picked from commit 4177d6d22ab3329a8607bf80a62aa03d4fb2c528)
-rw-r--r--  jstests/aggregation/bugs/server37750.js | 80
-rw-r--r--  src/mongo/db/pipeline/pipeline_d.cpp    | 32
2 files changed, 96 insertions(+), 16 deletions(-)
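
For context before the diffs: the random-cursor path that this test exercises only applies when the requested sample is at most 5% of the collection (kMaxSampleRatioForRandCursor in the pipeline_d.cpp hunk below) and the collection holds more than 100 documents. A minimal mongo-shell sketch of an aggregation that would be eligible for this optimized path; the collection name and document count are illustrative, not part of the change:

    // Hypothetical collection with 300 documents: 3 <= 0.05 * 300 and 300 > 100,
    // so the $sample below is eligible for the random-cursor optimization.
    const demoColl = db.sample_demo;
    demoColl.drop();
    for (let i = 0; i < 300; ++i) {
        assert.commandWorked(demoColl.insert({_id: i}));
    }
    // A sample this small relative to the collection takes the optimized plan.
    printjson(demoColl.aggregate([{$sample: {size: 3}}]).toArray());
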
diff --git a/jstests/aggregation/bugs/server37750.js b/jstests/aggregation/bugs/server37750.js
new file mode 100644
index 00000000000..2ca9dad9ec4
--- /dev/null
+++ b/jstests/aggregation/bugs/server37750.js
@@ -0,0 +1,80 @@
+/**
+ * Confirms that a sharded $sample which employs the DSSampleFromRandomCursor optimization is
+ * capable of yielding.
+ *
+ * @tags: [assumes_read_concern_unchanged, do_not_wrap_aggregations_in_facets, requires_journaling,
+ * requires_sharding]
+ */
+(function() {
+ "use strict";
+
+ load("jstests/libs/fixture_helpers.js"); // For FixtureHelpers.
+
+ // Set up a 2-shard cluster. Configure 'internalQueryExecYieldIterations' on both shards such
+ // that operations will yield on each PlanExecutor iteration.
+ const st = new ShardingTest({
+ name: jsTestName(),
+ shards: 2,
+ rs: {nodes: 1, setParameter: {internalQueryExecYieldIterations: 1}}
+ });
+
+ const mongosDB = st.s.getDB(jsTestName());
+ const mongosColl = mongosDB.test;
+
+ // Shard the test collection, split it at {_id: 0}, and move the upper chunk to shard1.
+ st.shardColl(mongosColl, {_id: 1}, {_id: 0}, {_id: 0});
+
+ // Insert enough documents on each shard to induce the $sample random-cursor optimization.
+ for (let i = (-150); i < 150; ++i) {
+ assert.commandWorked(mongosColl.insert({_id: i}));
+ }
+
+ // Run the initial aggregate for the $sample stage.
+ const cmdRes = assert.commandWorked(mongosDB.runCommand({
+ aggregate: mongosColl.getName(),
+ pipeline: [{$sample: {size: 3}}],
+ comment: "$sample random",
+ cursor: {batchSize: 0}
+ }));
+ assert.eq(cmdRes.cursor.firstBatch.length, 0);
+
+ // Force each shard to hang on yield to allow for currentOp capture.
+ FixtureHelpers.runCommandOnEachPrimary({
+ db: mongosDB.getSiblingDB("admin"),
+ cmdObj: {
+ configureFailPoint: "setYieldAllLocksHang",
+ mode: "alwaysOn",
+ data: {namespace: mongosColl.getFullName()}
+ }
+ });
+
+ // Run $currentOp to confirm that the $sample getMore yields on both shards.
+ const awaitShell = startParallelShell(() => {
+ load("jstests/libs/fixture_helpers.js");
+ assert.soon(() => db.getSiblingDB("admin")
+ .aggregate([
+ {$currentOp: {}},
+ {
+ $match: {
+ "originatingCommand.comment": "$sample random",
+ planSummary: "MULTI_ITERATOR",
+ numYields: {$gt: 0}
+ }
+ }
+ ])
+ .itcount() === 2);
+ // Release the failpoint and allow the getMores to complete.
+ FixtureHelpers.runCommandOnEachPrimary({
+ db: db.getSiblingDB("admin"),
+ cmdObj: {configureFailPoint: "setYieldAllLocksHang", mode: "off"}
+ });
+ }, mongosDB.getMongo().port);
+
+ // Retrieve the results for the $sample aggregation.
+ const sampleCursor = new DBCommandCursor(mongosDB, cmdRes);
+ assert.eq(sampleCursor.toArray().length, 3);
+
+ // Confirm that the parallel shell completes successfully, and tear down the cluster.
+ awaitShell();
+ st.stop();
+})();
\ No newline at end of file
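
As an aside, the test's two-step pattern above (an aggregate that returns an empty first batch, then a getMore that performs the actual sampling, and so is the request that must yield) can also be driven by hand with runCommand. A rough sketch, assuming a database handle `db` and a collection named "test":

    // Open the $sample cursor without fetching any documents yet.
    const res = assert.commandWorked(db.runCommand({
        aggregate: "test",
        pipeline: [{$sample: {size: 3}}],
        comment: "$sample random",
        cursor: {batchSize: 0}
    }));
    // The getMore drives the random-cursor PlanExecutor, so this is the request
    // whose yields the $currentOp filter above (planSummary: "MULTI_ITERATOR",
    // numYields > 0) is looking for.
    const more = assert.commandWorked(
        db.runCommand({getMore: res.cursor.id, collection: "test", batchSize: 3}));
    printjson(more.cursor.nextBatch);
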
diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp
index 4d2f0be3f8f..c99b3acadd2 100644
--- a/src/mongo/db/pipeline/pipeline_d.cpp
+++ b/src/mongo/db/pipeline/pipeline_d.cpp
@@ -105,6 +105,10 @@ namespace {
*/
StatusWith<unique_ptr<PlanExecutor, PlanExecutor::Deleter>> createRandomCursorExecutor(
Collection* collection, OperationContext* opCtx, long long sampleSize, long long numRecords) {
+ // Verify that we are already under a collection lock. We avoid taking locks ourselves in this
+ // function because double-locking forces any PlanExecutor we create to adopt a NO_YIELD policy.
+ invariant(opCtx->lockState()->isCollectionLockedForMode(collection->ns().ns(), MODE_IS));
+
double kMaxSampleRatioForRandCursor = 0.05;
if (sampleSize > numRecords * kMaxSampleRatioForRandCursor || numRecords <= 100) {
return {nullptr};
@@ -149,22 +153,18 @@ StatusWith<unique_ptr<PlanExecutor, PlanExecutor::Deleter>> createRandomCursorEx
opCtx, ws.get(), idxIterator.release(), nullptr, collection);
}
- {
- AutoGetCollectionForRead autoColl(opCtx, collection->ns());
-
- // If we're in a sharded environment, we need to filter out documents we don't own.
- if (ShardingState::get(opCtx)->needCollectionMetadata(opCtx, collection->ns().ns())) {
- auto shardFilterStage = stdx::make_unique<ShardFilterStage>(
- opCtx,
- CollectionShardingState::get(opCtx, collection->ns())->getMetadata(opCtx),
- ws.get(),
- stage.release());
- return PlanExecutor::make(opCtx,
- std::move(ws),
- std::move(shardFilterStage),
- collection,
- PlanExecutor::YIELD_AUTO);
- }
+ // If we're in a sharded environment, we need to filter out documents we don't own.
+ if (ShardingState::get(opCtx)->needCollectionMetadata(opCtx, collection->ns().ns())) {
+ auto shardFilterStage = stdx::make_unique<ShardFilterStage>(
+ opCtx,
+ CollectionShardingState::get(opCtx, collection->ns())->getMetadata(opCtx),
+ ws.get(),
+ stage.release());
+ return PlanExecutor::make(opCtx,
+ std::move(ws),
+ std::move(shardFilterStage),
+ collection,
+ PlanExecutor::YIELD_AUTO);
}
return PlanExecutor::make(