-rw-r--r--  buildscripts/resmokeconfig/suites/replica_sets_pv0.yml            3
-rw-r--r--  jstests/replsets/awaitdata_getmore_new_last_committed_optime.js   109
-rw-r--r--  src/mongo/db/query/plan_executor.cpp                              29
-rw-r--r--  src/mongo/db/query/plan_executor.h                                 6
4 files changed, 139 insertions, 8 deletions
diff --git a/buildscripts/resmokeconfig/suites/replica_sets_pv0.yml b/buildscripts/resmokeconfig/suites/replica_sets_pv0.yml
index be2a0047e8f..11520daa0ca 100644
--- a/buildscripts/resmokeconfig/suites/replica_sets_pv0.yml
+++ b/buildscripts/resmokeconfig/suites/replica_sets_pv0.yml
@@ -38,7 +38,8 @@ selector:
  - jstests/replsets/noop_writes_wait_for_write_concern_fcv.js
  # Election handoff is not supported in PV0, since it requires replSetStepUp.
  - jstests/replsets/election_handoff*.js
-
+  # replSetGetStatus doesn't return read concern majority optime in PV0.
+  - jstests/replsets/awaitdata_getmore_new_last_committed_optime.js
executor:
  config:
diff --git a/jstests/replsets/awaitdata_getmore_new_last_committed_optime.js b/jstests/replsets/awaitdata_getmore_new_last_committed_optime.js
new file mode 100644
index 00000000000..c2ab32c68f2
--- /dev/null
+++ b/jstests/replsets/awaitdata_getmore_new_last_committed_optime.js
@@ -0,0 +1,109 @@
+// Regression test to ensure that we don't crash during a getMore if the client's
+// lastKnownCommittedOpTime switches from being ahead of the node's lastCommittedOpTime to
+// being behind it while an awaitData query is running. See SERVER-35239.
+
+(function() {
+    'use strict';
+    load('jstests/replsets/rslib.js');
+    load('jstests/libs/check_log.js');
+
+    const name = 'awaitdata_getmore_new_last_committed_optime';
+    const replSet = new ReplSetTest({name: name, nodes: 5});
+
+    replSet.startSet();
+    replSet.initiate();
+
+    const dbName = 'test';
+    const collName = 'coll';
+
+    const primary = replSet.getPrimary();
+    const secondaries = replSet.getSecondaries();
+    const secondary = secondaries[0];
+
+    const primaryDB = primary.getDB(dbName);
+
+    // Create capped collection on primary and allow it to be committed.
+    assert.commandWorked(primaryDB.createCollection(collName, {capped: true, size: 2048}));
+    replSet.awaitReplication();
+    replSet.awaitLastOpCommitted();
+
+    // Stop data replication on 3 secondaries to prevent writes from being committed.
+    jsTestLog('Stopping replication');
+    stopServerReplication(secondaries[1]);
+    stopServerReplication(secondaries[2]);
+    stopServerReplication(secondaries[3]);
+
+    // Write data to primary.
+    for (let i = 0; i < 2; i++) {
+        assert.writeOK(primaryDB[collName].insert({_id: i}, {writeConcern: {w: 2}}));
+    }
+
+    jsTestLog('Secondary has replicated data');
+
+    jsTestLog('Starting parallel shell');
+    // Start a parallel shell because we'll be enabling a failpoint that will make the thread hang.
+    let waitForGetMoreToFinish = startParallelShell(() => {
+        load('jstests/replsets/rslib.js');
+
+        const secondary = db.getMongo();
+        secondary.setSlaveOk();
+
+        const dbName = 'test';
+        const collName = 'coll';
+        const awaitDataDB = db.getSiblingDB('test');
+
+        // Create awaitData cursor and get all data written so that a following getMore will have to
+        // wait for more data.
+        let cmdRes =
+            awaitDataDB.runCommand({find: collName, batchSize: 2, awaitData: true, tailable: true});
+        assert.commandWorked(cmdRes);
+        assert.gt(cmdRes.cursor.id, NumberLong(0));
+        assert.eq(cmdRes.cursor.ns, dbName + "." + collName);
+        assert.eq(cmdRes.cursor.firstBatch.length, 2, tojson(cmdRes));
+
+        // Enable failpoint.
+        assert.commandWorked(db.adminCommand(
+            {configureFailPoint: 'planExecutorHangBeforeShouldWaitForInserts', mode: 'alwaysOn'}));
+
+        // Call getMore on the awaitData cursor with a lastKnownCommittedOpTime ahead of the
+        // node's. This will hang until we've disabled the failpoint. maxTimeMS must be set;
+        // otherwise the default timeout for waiting for inserts is 1 second.
+        const lastOpTime = getLastOpTime(secondary);
+        cmdRes = awaitDataDB.runCommand({
+            getMore: cmdRes.cursor.id,
+            collection: collName,
+            batchSize: NumberInt(2),
+            maxTimeMS: 10000,
+            lastKnownCommittedOpTime: lastOpTime
+        });
+
+        assert.commandWorked(cmdRes);
+        assert.gt(cmdRes.cursor.id, NumberLong(0));
+        assert.eq(cmdRes.cursor.ns, dbName + "." + collName);
+        assert.eq(cmdRes.cursor.nextBatch.length, 0, tojson(cmdRes));
+    }, secondary.port);
+
+    // Ensure that we've hit the failpoint before moving on.
+    checkLog.contains(
+        secondary, 'PlanExecutor - planExecutorHangBeforeShouldWaitForInserts fail point enabled');
+
+    // Restart replication on the other nodes.
+    jsTestLog('Restarting replication');
+    restartServerReplication(secondaries[1]);
+    restartServerReplication(secondaries[2]);
+    restartServerReplication(secondaries[3]);
+
+    // Wait until all nodes have committed the last op. At this point in executing the getMore,
+    // the node's lastCommittedOpTime should now be ahead of the client's lastKnownCommittedOpTime.
+    replSet.awaitLastOpCommitted();
+    jsTestLog('All nodes caught up');
+
+    // Disable failpoint.
+    assert.commandWorked(secondary.adminCommand(
+        {configureFailPoint: 'planExecutorHangBeforeShouldWaitForInserts', mode: 'off'}));
+
+    waitForGetMoreToFinish();
+    jsTestLog('Parallel shell successfully exited');
+
+    replSet.stopSet();
+})();
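
For context on the optimes the test relies on: while replication is stopped on three of the five
nodes, the secondary's applied optime runs ahead of its majority commit point, so getLastOpTime()
hands the parallel shell a lastKnownCommittedOpTime the node has not yet committed. Below is a
minimal, hedged shell sketch of inspecting those two optimes on a PV1 node; the 'secondary'
connection and the suite setup are assumed placeholders, not part of this commit:

    // Sketch: compare a node's applied optime with its majority commit point.
    // replSetGetStatus reports both in PV1; in PV0 the majority commit point is not
    // reported, which is why the test is excluded from the replica_sets_pv0 suite above.
    const status = assert.commandWorked(secondary.adminCommand({replSetGetStatus: 1}));
    const applied = status.optimes.appliedOpTime;          // last op applied on this node
    const committed = status.optimes.lastCommittedOpTime;  // majority commit point it knows of
    jsTestLog('applied: ' + tojson(applied) + ', committed: ' + tojson(committed));
    // With replication stopped on a majority of secondaries, 'applied' already contains the
    // w:2 inserts while 'committed' still points at the last majority-committed write.
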
diff --git a/src/mongo/db/query/plan_executor.cpp b/src/mongo/db/query/plan_executor.cpp
index 04a2052ba5a..b7d786f9207 100644
--- a/src/mongo/db/query/plan_executor.cpp
+++ b/src/mongo/db/query/plan_executor.cpp
@@ -26,6 +26,8 @@
* it in the license file.
*/
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kQuery
+
#include "mongo/platform/basic.h"
#include "mongo/db/query/plan_executor.h"
@@ -53,6 +55,7 @@
#include "mongo/db/storage/record_fetcher.h"
#include "mongo/stdx/memory.h"
#include "mongo/util/fail_point_service.h"
+#include "mongo/util/log.h"
#include "mongo/util/scopeguard.h"
#include "mongo/util/stacktrace.h"
@@ -74,6 +77,7 @@ struct CappedInsertNotifierData {
namespace {
MONGO_FP_DECLARE(planExecutorAlwaysFails);
+MONGO_FP_DECLARE(planExecutorHangBeforeShouldWaitForInserts);
/**
* Constructs a PlanYieldPolicy based on 'policy'.
@@ -418,22 +422,27 @@ PlanExecutor::ExecState PlanExecutor::getNextSnapshotted(Snapshotted<BSONObj>* o
    return getNextImpl(objOut, dlOut);
}
+bool PlanExecutor::shouldListenForInserts() {
+    return _cq && _cq->getQueryRequest().isTailableAndAwaitData() &&
+        awaitDataState(_opCtx).shouldWaitForInserts && _opCtx->checkForInterruptNoAssert().isOK() &&
+        awaitDataState(_opCtx).waitForInsertsDeadline >
+        _opCtx->getServiceContext()->getPreciseClockSource()->now();
+}
bool PlanExecutor::shouldWaitForInserts() {
    // If this is an awaitData-respecting operation and we have time left and we're not interrupted,
    // we should wait for inserts.
-    if (_cq && _cq->getQueryRequest().isTailableAndAwaitData() &&
-        awaitDataState(_opCtx).shouldWaitForInserts && _opCtx->checkForInterruptNoAssert().isOK() &&
-        awaitDataState(_opCtx).waitForInsertsDeadline >
-        _opCtx->getServiceContext()->getPreciseClockSource()->now()) {
+    if (shouldListenForInserts()) {
        // We expect awaitData cursors to be yielding.
        invariant(_yieldPolicy->canReleaseLocksDuringExecution());
        // For operations with a last committed opTime, we should not wait if the replication
-        // coordinator's lastCommittedOpTime has changed.
+        // coordinator's lastCommittedOpTime has progressed past the client's lastCommittedOpTime.
+        // In that case, we will return early so that we can inform the client of the new
+        // lastCommittedOpTime immediately.
        if (!clientsLastKnownCommittedOpTime(_opCtx).isNull()) {
            auto replCoord = repl::ReplicationCoordinator::get(_opCtx);
-            return clientsLastKnownCommittedOpTime(_opCtx) == replCoord->getLastCommittedOpTime();
+            return clientsLastKnownCommittedOpTime(_opCtx) >= replCoord->getLastCommittedOpTime();
        }
        return true;
    }
@@ -528,7 +537,8 @@ PlanExecutor::ExecState PlanExecutor::getNextImpl(Snapshotted<BSONObj>* objOut,
    // insert notifier the entire time we are in the loop. Holding a shared pointer to the capped
    // insert notifier is necessary for the notifierVersion to advance.
    CappedInsertNotifierData cappedInsertNotifierData;
-    if (shouldWaitForInserts()) {
+    if (shouldListenForInserts()) {
+        // We always construct the CappedInsertNotifier for awaitData cursors.
        cappedInsertNotifierData.notifier = getCappedInsertNotifier();
    }
    for (;;) {
@@ -617,6 +627,11 @@ PlanExecutor::ExecState PlanExecutor::getNextImpl(Snapshotted<BSONObj>* objOut,
        } else if (PlanStage::NEED_TIME == code) {
            // Fall through to yield check at end of large conditional.
        } else if (PlanStage::IS_EOF == code) {
+            if (MONGO_FAIL_POINT(planExecutorHangBeforeShouldWaitForInserts)) {
+                log() << "PlanExecutor - planExecutorHangBeforeShouldWaitForInserts fail point "
+                         "enabled. Blocking until fail point is disabled.";
+                MONGO_FAIL_POINT_PAUSE_WHILE_SET(planExecutorHangBeforeShouldWaitForInserts);
+            }
            if (!shouldWaitForInserts()) {
                return PlanExecutor::IS_EOF;
            }
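
To make the comparison change above concrete, here is a hedged, standalone sketch (plain
JavaScript, not server code) that models the old '==' and new '>=' decisions in
shouldWaitForInserts(); commit points are reduced to single increasing numbers purely for
illustration:

    // Old behavior: keep waiting only while the client's and node's commit points are equal.
    const shouldWaitOld = (client, node) => client === node;
    // New behavior: keep waiting while the client is at or ahead of the node's commit point, and
    // stop as soon as the node's commit point moves past the client's so it can be reported.
    const shouldWaitNew = (client, node) => client >= node;

    const clientOpTime = 12;  // client's lastKnownCommittedOpTime (illustrative value only)
    let nodeOpTime = 10;      // the node's commit point starts behind the client's
    print(shouldWaitOld(clientOpTime, nodeOpTime));  // false: old code stops waiting immediately
    print(shouldWaitNew(clientOpTime, nodeOpTime));  // true: new code waits while the client is ahead

    nodeOpTime = 15;          // the commit point later advances past the client's
    print(shouldWaitNew(clientOpTime, nodeOpTime));  // false: return so the client sees the new commit point
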
diff --git a/src/mongo/db/query/plan_executor.h b/src/mongo/db/query/plan_executor.h
index 3de96faf96b..f0d09024c7e 100644
--- a/src/mongo/db/query/plan_executor.h
+++ b/src/mongo/db/query/plan_executor.h
@@ -458,6 +458,12 @@ public:
private:
    /**
+     * Returns true if the PlanExecutor should listen for inserts, which is when a getMore is called
+     * on a tailable and awaitData cursor that still has time left and hasn't been interrupted.
+     */
+    bool shouldListenForInserts();
+
+    /**
     * Returns true if the PlanExecutor should wait for data to be inserted, which is when a getMore
     * is called on a tailable and awaitData cursor on a capped collection. Returns false if an EOF
     * should be returned immediately.