author     Mihai Andrei <mihai.andrei@10gen.com>             2021-07-21 13:57:54 +0000
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>  2021-07-27 13:47:45 +0000
commit     68ffa209cea68d9dc22f3fd1681c690a6ee0e18c (patch)
tree       99c3ba0f2271b3492976f99bf8880c1ffec41fa4
parent     550ed16f978dd43e81577c7b0e9ede3de7f68f6a (diff)
download   mongo-68ffa209cea68d9dc22f3fd1681c690a6ee0e18c.tar.gz
SERVER-57642 Configure $sample pushdown PlanExecutor to use 'INTERRUPT_ONLY' yield policy when running in a transaction
(cherry picked from commit d2448e3da8a121955d5cb2bdbb50c8f2c1e9f6ca)
(cherry picked from commit 7984847de09877d062c66373ebf108f1b24de077)
(cherry picked from commit aed22553124e994a54385f36c5d5ce1467b3cd92)
(cherry picked from commit b20d2e60b357fa928727a3bbd37afce95dd6e49f)
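In short: a pushed-down $sample runs on a PlanExecutor backed by a storage engine random cursor, and that executor was previously always built with the YIELD_AUTO yield policy. Auto-yielding is not legal inside a multi-document transaction, which holds its locks and storage snapshot for its lifetime, so the fix selects INTERRUPT_ONLY whenever the operation has an open WriteUnitOfWork. A minimal mongo-shell sketch of the affected scenario, mirroring the regression test added below (database and collection names are illustrative):

    // Illustrative repro: a $sample aggregation issued inside a multi-document
    // transaction; the pushed-down plan must not auto-yield here.
    const session = db.getMongo().startSession({causalConsistency: false});
    const sessionColl = session.getDatabase('test').getCollection('sample_pushdown');
    session.startTransaction();
    const docs = sessionColl.aggregate([{$sample: {size: 30}}]).toArray();
    session.abortTransaction();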
-rw-r--r--  jstests/noPassthrough/sample_pushdown_transaction.js  52
-rw-r--r--  src/mongo/db/pipeline/pipeline_d.cpp                  12
2 files changed, 57 insertions(+), 7 deletions(-)
diff --git a/jstests/noPassthrough/sample_pushdown_transaction.js b/jstests/noPassthrough/sample_pushdown_transaction.js
new file mode 100644
index 00000000000..d9e677dfa33
--- /dev/null
+++ b/jstests/noPassthrough/sample_pushdown_transaction.js
@@ -0,0 +1,52 @@
+/**
+ * Verify that $sample push down works properly in a transaction. This test was designed to
+ * reproduce SERVER-57642.
+ *
+ * Requires WiredTiger for random cursor support.
+ * @tags: [requires_wiredtiger, requires_replication]
+ */
+(function() {
+ 'use strict';
+
+ load('jstests/libs/analyze_plan.js');  // For aggPlanHasStage.
+
+ // Set up.
+ const rst = new ReplSetTest({nodes: 1});
+ rst.startSet();
+ rst.initiate();
+ const collName = 'sample_pushdown';
+ const dbName = 'test';
+ const testDB = rst.getPrimary().getDB(dbName);
+ const coll = testDB[collName];
+
+ // In order to construct a plan that uses a storage engine random cursor, we need more than
+ // 100 records in our collection, and the sample size must be less than 5% of the number of
+ // documents in our collection.
+ const numDocs = 1000;
+ const sampleSize = numDocs * .03;
+ let docs = [];
+ for (let i = 0; i < numDocs; ++i) {
+ docs.push({a: i});
+ }
+ assert.commandWorked(coll.insert(docs));
+ const pipeline = [{$sample: {size: sampleSize}}, {$match: {a: {$gte: 0}}}];
+
+ // Verify that our pipeline uses $sample push down.
+ const explain = coll.explain().aggregate(pipeline);
+ assert(aggPlanHasStage(explain, "$sampleFromRandomCursor"), tojson(explain));
+
+ // Start the transaction.
+ const session = testDB.getMongo().startSession({causalConsistency: false});
+ const sessionDB = session.getDatabase(dbName);
+ session.startTransaction();
+
+ // Run the pipeline.
+ const randDocs = sessionDB[collName].aggregate(pipeline).toArray();
+
+ // Verify that we have at least one result.
+ assert.gt(randDocs.length, 0, tojson(randDocs));
+
+ // Clean up.
+ assert.commandWorked(session.abortTransaction_forTesting());
+ rst.stopSet();
+})();
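A note on the constants chosen in the test above: as its comment explains, the random-cursor $sample plan is only eligible when the collection holds more than 100 documents and the sample is smaller than 5% of them. The chosen values satisfy both bounds:

    // Eligibility arithmetic for the random-cursor $sample plan, per the
    // comment in the test above:
    const numDocs = 1000;               // 1000 > 100 documents         -> eligible
    const sampleSize = numDocs * 0.03;  // 30 < 0.05 * 1000 = 50 docs   -> eligible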
diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp
index c4e19f548d5..472965c6b4a 100644
--- a/src/mongo/db/pipeline/pipeline_d.cpp
+++ b/src/mongo/db/pipeline/pipeline_d.cpp
@@ -154,6 +154,8 @@ StatusWith<unique_ptr<PlanExecutor, PlanExecutor::Deleter>> createRandomCursorEx
opCtx, ws.get(), idxIterator.release(), nullptr, collection);
}
+ auto yieldPolicy =
+ opCtx->getWriteUnitOfWork() ? PlanExecutor::INTERRUPT_ONLY : PlanExecutor::YIELD_AUTO;
// If we're in a sharded environment, we need to filter out documents we don't own.
if (OperationShardingState::isOperationVersioned(opCtx)) {
auto shardFilterStage = stdx::make_unique<ShardFilterStage>(
@@ -161,15 +163,11 @@ StatusWith<unique_ptr<PlanExecutor, PlanExecutor::Deleter>> createRandomCursorEx
CollectionShardingState::get(opCtx, collection->ns())->getOrphansFilter(opCtx),
ws.get(),
stage.release());
- return PlanExecutor::make(opCtx,
- std::move(ws),
- std::move(shardFilterStage),
- collection,
- PlanExecutor::YIELD_AUTO);
+ return PlanExecutor::make(
+ opCtx, std::move(ws), std::move(shardFilterStage), collection, yieldPolicy);
}
- return PlanExecutor::make(
- opCtx, std::move(ws), std::move(stage), collection, PlanExecutor::YIELD_AUTO);
+ return PlanExecutor::make(opCtx, std::move(ws), std::move(stage), collection, yieldPolicy);
}
StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> attemptToGetExecutor(
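Why INTERRUPT_ONLY is the right substitute: that policy still checks for interruption (e.g. killOp or the transaction being aborted) at would-be yield points, but it never releases locks or abandons the storage snapshot, both of which a multi-document transaction must retain until it commits or aborts. Plans running outside a transaction keep YIELD_AUTO and continue to yield periodically. To confirm the optimized plan is actually in play, the same explain check the test uses works from the shell (assuming it is run from a MongoDB source checkout so load() can find the helper):

    // Sanity check: the optimized plan shows up as a $sampleFromRandomCursor stage.
    load('jstests/libs/analyze_plan.js');  // For aggPlanHasStage.
    const explain = db.sample_pushdown.explain().aggregate([{$sample: {size: 30}}]);
    assert(aggPlanHasStage(explain, '$sampleFromRandomCursor'), tojson(explain));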