diff options
author | Bernard Gorman <bernard.gorman@gmail.com> | 2018-11-13 14:44:57 +0000 |
---|---|---|
committer | Bernard Gorman <bernard.gorman@gmail.com> | 2018-11-20 04:41:44 +0000 |
commit | bba874c3fd9f43e9f8a6820fc600f3d86e1d4099 (patch) | |
tree | 0cad411b77605eace86b6696f642b3dca3ec2fe6 | |
parent | ac99b600745b488968257d936d8fad0f755c4f1c (diff) | |
download | mongo-bba874c3fd9f43e9f8a6820fc600f3d86e1d4099.tar.gz |
SERVER-37750 Optimized $sample stage does not yield
(cherry picked from commit 4177d6d22ab3329a8607bf80a62aa03d4fb2c528)
-rw-r--r-- | jstests/aggregation/bugs/server37750.js | 80 | ||||
-rw-r--r-- | src/mongo/db/pipeline/pipeline_d.cpp | 32 |
2 files changed, 96 insertions, 16 deletions
diff --git a/jstests/aggregation/bugs/server37750.js b/jstests/aggregation/bugs/server37750.js new file mode 100644 index 00000000000..2ca9dad9ec4 --- /dev/null +++ b/jstests/aggregation/bugs/server37750.js @@ -0,0 +1,80 @@ +/** + * Confirms that a sharded $sample which employs the DSSampleFromRandomCursor optimization is + * capable of yielding. + * + * @tags: [assumes_read_concern_unchanged, do_not_wrap_aggregations_in_facets, requires_journaling, + * requires_sharding] + */ +(function() { + "use strict"; + + load("jstests/libs/fixture_helpers.js"); // For FixtureHelpers. + + // Set up a 2-shard cluster. Configure 'internalQueryExecYieldIterations' on both shards such + // that operations will yield on each PlanExecutor iteration. + const st = new ShardingTest({ + name: jsTestName(), + shards: 2, + rs: {nodes: 1, setParameter: {internalQueryExecYieldIterations: 1}} + }); + + const mongosDB = st.s.getDB(jsTestName()); + const mongosColl = mongosDB.test; + + // Shard the test collection, split it at {_id: 0}, and move the upper chunk to shard1. + st.shardColl(mongosColl, {_id: 1}, {_id: 0}, {_id: 0}); + + // Insert enough documents on each shard to induce the $sample random-cursor optimization. + for (let i = (-150); i < 150; ++i) { + assert.commandWorked(mongosColl.insert({_id: i})); + } + + // Run the initial aggregate for the $sample stage. + const cmdRes = assert.commandWorked(mongosDB.runCommand({ + aggregate: mongosColl.getName(), + pipeline: [{$sample: {size: 3}}], + comment: "$sample random", + cursor: {batchSize: 0} + })); + assert.eq(cmdRes.cursor.firstBatch.length, 0); + + // Force each shard to hang on yield to allow for currentOp capture. + FixtureHelpers.runCommandOnEachPrimary({ + db: mongosDB.getSiblingDB("admin"), + cmdObj: { + configureFailPoint: "setYieldAllLocksHang", + mode: "alwaysOn", + data: {namespace: mongosColl.getFullName()} + } + }); + + // Run $currentOp to confirm that the $sample getMore yields on both shards. 
+ const awaitShell = startParallelShell(() => { + load("jstests/libs/fixture_helpers.js"); + assert.soon(() => db.getSiblingDB("admin") + .aggregate([ + {$currentOp: {}}, + { + $match: { + "originatingCommand.comment": "$sample random", + planSummary: "MULTI_ITERATOR", + numYields: {$gt: 0} + } + } + ]) + .itcount() === 2); + // Release the failpoint and allow the getMores to complete. + FixtureHelpers.runCommandOnEachPrimary({ + db: db.getSiblingDB("admin"), + cmdObj: {configureFailPoint: "setYieldAllLocksHang", mode: "off"} + }); + }, mongosDB.getMongo().port); + + // Retrieve the results for the $sample aggregation. + const sampleCursor = new DBCommandCursor(mongosDB, cmdRes); + assert.eq(sampleCursor.toArray().length, 3); + + // Confirm that the parallel shell completes successfully, and tear down the cluster. + awaitShell(); + st.stop(); +})();
\ No newline at end of file diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp index 4d2f0be3f8f..c99b3acadd2 100644 --- a/src/mongo/db/pipeline/pipeline_d.cpp +++ b/src/mongo/db/pipeline/pipeline_d.cpp @@ -105,6 +105,10 @@ namespace { */ StatusWith<unique_ptr<PlanExecutor, PlanExecutor::Deleter>> createRandomCursorExecutor( Collection* collection, OperationContext* opCtx, long long sampleSize, long long numRecords) { + // Verify that we are already under a collection lock. We avoid taking locks ourselves in this + // function because double-locking forces any PlanExecutor we create to adopt a NO_YIELD policy. + invariant(opCtx->lockState()->isCollectionLockedForMode(collection->ns().ns(), MODE_IS)); + double kMaxSampleRatioForRandCursor = 0.05; if (sampleSize > numRecords * kMaxSampleRatioForRandCursor || numRecords <= 100) { return {nullptr}; @@ -149,22 +153,18 @@ StatusWith<unique_ptr<PlanExecutor, PlanExecutor::Deleter>> createRandomCursorEx opCtx, ws.get(), idxIterator.release(), nullptr, collection); } - { - AutoGetCollectionForRead autoColl(opCtx, collection->ns()); - - // If we're in a sharded environment, we need to filter out documents we don't own. - if (ShardingState::get(opCtx)->needCollectionMetadata(opCtx, collection->ns().ns())) { - auto shardFilterStage = stdx::make_unique<ShardFilterStage>( - opCtx, - CollectionShardingState::get(opCtx, collection->ns())->getMetadata(opCtx), - ws.get(), - stage.release()); - return PlanExecutor::make(opCtx, - std::move(ws), - std::move(shardFilterStage), - collection, - PlanExecutor::YIELD_AUTO); - } + // If we're in a sharded environment, we need to filter out documents we don't own. 
+ if (ShardingState::get(opCtx)->needCollectionMetadata(opCtx, collection->ns().ns())) { + auto shardFilterStage = stdx::make_unique<ShardFilterStage>( + opCtx, + CollectionShardingState::get(opCtx, collection->ns())->getMetadata(opCtx), + ws.get(), + stage.release()); + return PlanExecutor::make(opCtx, + std::move(ws), + std::move(shardFilterStage), + collection, + PlanExecutor::YIELD_AUTO); } return PlanExecutor::make( |