-rw-r--r--  buildscripts/resmokeconfig/suites/replica_sets_pv0.yml          |   3
-rw-r--r--  jstests/replsets/awaitdata_getmore_new_last_committed_optime.js | 109
-rw-r--r--  src/mongo/db/query/plan_executor.cpp                            |  29
-rw-r--r--  src/mongo/db/query/plan_executor.h                              |   6
4 files changed, 139 insertions, 8 deletions
diff --git a/buildscripts/resmokeconfig/suites/replica_sets_pv0.yml b/buildscripts/resmokeconfig/suites/replica_sets_pv0.yml
index be2a0047e8f..11520daa0ca 100644
--- a/buildscripts/resmokeconfig/suites/replica_sets_pv0.yml
+++ b/buildscripts/resmokeconfig/suites/replica_sets_pv0.yml
@@ -38,7 +38,8 @@ selector:
   - jstests/replsets/noop_writes_wait_for_write_concern_fcv.js
   # Election handoff is not supported in PV0, since it requires replSetStepUp.
   - jstests/replsets/election_handoff*.js
-
+  # replSetGetStatus doesn't return read concern majority optime in PV0.
+  - jstests/replsets/awaitdata_getmore_new_last_committed_optime.js
 
 executor:
   config:
diff --git a/jstests/replsets/awaitdata_getmore_new_last_committed_optime.js b/jstests/replsets/awaitdata_getmore_new_last_committed_optime.js
new file mode 100644
index 00000000000..c2ab32c68f2
--- /dev/null
+++ b/jstests/replsets/awaitdata_getmore_new_last_committed_optime.js
@@ -0,0 +1,109 @@
+// Regression test to ensure that we don't crash during a getMore if the client's
+// lastKnownCommittedOpTime switches from being ahead of the node's lastCommittedOpTime to behind
+// while an awaitData query is running. See SERVER-35239.
+
+(function() {
+    'use strict';
+    load('jstests/replsets/rslib.js');
+    load('jstests/libs/check_log.js');
+
+    const name = 'awaitdata_getmore_new_last_committed_optime';
+    const replSet = new ReplSetTest({name: name, nodes: 5});
+
+    replSet.startSet();
+    replSet.initiate();
+
+    const dbName = 'test';
+    const collName = 'coll';
+
+    const primary = replSet.getPrimary();
+    const secondaries = replSet.getSecondaries();
+    const secondary = secondaries[0];
+
+    const primaryDB = primary.getDB(dbName);
+
+    // Create capped collection on primary and allow it to be committed.
+    assert.commandWorked(primaryDB.createCollection(collName, {capped: true, size: 2048}));
+    replSet.awaitReplication();
+    replSet.awaitLastOpCommitted();
+
+    // Stop data replication on 3 secondaries to prevent writes being committed.
+    jsTestLog('Stopping replication');
+    stopServerReplication(secondaries[1]);
+    stopServerReplication(secondaries[2]);
+    stopServerReplication(secondaries[3]);
+
+    // Write data to primary.
+    for (let i = 0; i < 2; i++) {
+        assert.writeOK(primaryDB[collName].insert({_id: i}, {writeConcern: {w: 2}}));
+    }
+
+    jsTestLog('Secondary has replicated data');
+
+    jsTestLog('Starting parallel shell');
+    // Start a parallel shell because we'll be enabling a failpoint that will make the thread hang.
+    let waitForGetMoreToFinish = startParallelShell(() => {
+        load('jstests/replsets/rslib.js');
+
+        const secondary = db.getMongo();
+        secondary.setSlaveOk();
+
+        const dbName = 'test';
+        const collName = 'coll';
+        const awaitDataDB = db.getSiblingDB('test');
+
+        // Create awaitData cursor and get all data written so that a following getMore will have to
+        // wait for more data.
+        let cmdRes =
+            awaitDataDB.runCommand({find: collName, batchSize: 2, awaitData: true, tailable: true});
+        assert.commandWorked(cmdRes);
+        assert.gt(cmdRes.cursor.id, NumberLong(0));
+        assert.eq(cmdRes.cursor.ns, dbName + "." + collName);
+        assert.eq(cmdRes.cursor.firstBatch.length, 2, tojson(cmdRes));
+
+        // Enable failpoint.
+        assert.commandWorked(db.adminCommand(
+            {configureFailPoint: 'planExecutorHangBeforeShouldWaitForInserts', mode: 'alwaysOn'}));
+
+        // Call getMore on awaitData cursor with lastKnownCommittedOpTime ahead of node. This will
+        // hang until we've disabled the failpoint. maxTimeMS must be set otherwise the default
+        // timeout for waiting for inserts is 1 second.
+        const lastOpTime = getLastOpTime(secondary);
+        cmdRes = awaitDataDB.runCommand({
+            getMore: cmdRes.cursor.id,
+            collection: collName,
+            batchSize: NumberInt(2),
+            maxTimeMS: 10000,
+            lastKnownCommittedOpTime: lastOpTime
+        });
+
+        assert.commandWorked(cmdRes);
+        assert.gt(cmdRes.cursor.id, NumberLong(0));
+        assert.eq(cmdRes.cursor.ns, dbName + "." + collName);
+        assert.eq(cmdRes.cursor.nextBatch.length, 0, tojson(cmdRes));
+    }, secondary.port);
+
+    // Ensure that we've hit the failpoint before moving on.
+    checkLog.contains(
+        secondary, 'PlanExecutor - planExecutorHangBeforeShouldWaitForInserts fail point enabled');
+
+    // Restart replication on the other nodes.
+    jsTestLog('Restarting replication');
+    restartServerReplication(secondaries[1]);
+    restartServerReplication(secondaries[2]);
+    restartServerReplication(secondaries[3]);
+
+    // Wait until all nodes have committed the last op. At this point in executing the getMore,
+    // the node's lastCommittedOpTime should now be ahead of the client's lastKnownCommittedOpTime.
+    replSet.awaitLastOpCommitted();
+    jsTestLog('All nodes caught up');
+
+    // Disable failpoint.
+    assert.commandWorked(secondary.adminCommand(
+        {configureFailPoint: 'planExecutorHangBeforeShouldWaitForInserts', mode: 'off'}));
+
+    waitForGetMoreToFinish();
+    jsTestLog('Parallel shell successfully exited');
+
+    replSet.stopSet();
+})();
diff --git a/src/mongo/db/query/plan_executor.cpp b/src/mongo/db/query/plan_executor.cpp
index 04a2052ba5a..b7d786f9207 100644
--- a/src/mongo/db/query/plan_executor.cpp
+++ b/src/mongo/db/query/plan_executor.cpp
@@ -26,6 +26,8 @@
  * it in the license file.
  */
 
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kQuery
+
 #include "mongo/platform/basic.h"
 
 #include "mongo/db/query/plan_executor.h"
@@ -53,6 +55,7 @@
 #include "mongo/db/storage/record_fetcher.h"
 #include "mongo/stdx/memory.h"
 #include "mongo/util/fail_point_service.h"
+#include "mongo/util/log.h"
 #include "mongo/util/scopeguard.h"
 #include "mongo/util/stacktrace.h"
 
@@ -74,6 +77,7 @@ struct CappedInsertNotifierData {
 namespace {
 
 MONGO_FP_DECLARE(planExecutorAlwaysFails);
+MONGO_FP_DECLARE(planExecutorHangBeforeShouldWaitForInserts);
 
 /**
  * Constructs a PlanYieldPolicy based on 'policy'.
@@ -418,22 +422,27 @@ PlanExecutor::ExecState PlanExecutor::getNextSnapshotted(Snapshotted<BSONObj>* o
     return getNextImpl(objOut, dlOut);
 }
 
+bool PlanExecutor::shouldListenForInserts() {
+    return _cq && _cq->getQueryRequest().isTailableAndAwaitData() &&
+        awaitDataState(_opCtx).shouldWaitForInserts && _opCtx->checkForInterruptNoAssert().isOK() &&
+        awaitDataState(_opCtx).waitForInsertsDeadline >
+        _opCtx->getServiceContext()->getPreciseClockSource()->now();
+}
 bool PlanExecutor::shouldWaitForInserts() {
     // If this is an awaitData-respecting operation and we have time left and we're not interrupted,
     // we should wait for inserts.
-    if (_cq && _cq->getQueryRequest().isTailableAndAwaitData() &&
-        awaitDataState(_opCtx).shouldWaitForInserts && _opCtx->checkForInterruptNoAssert().isOK() &&
-        awaitDataState(_opCtx).waitForInsertsDeadline >
-            _opCtx->getServiceContext()->getPreciseClockSource()->now()) {
+    if (shouldListenForInserts()) {
         // We expect awaitData cursors to be yielding.
        invariant(_yieldPolicy->canReleaseLocksDuringExecution());
 
         // For operations with a last committed opTime, we should not wait if the replication
-        // coordinator's lastCommittedOpTime has changed.
+        // coordinator's lastCommittedOpTime has progressed past the client's lastCommittedOpTime.
+        // In that case, we will return early so that we can inform the client of the new
+        // lastCommittedOpTime immediately.
         if (!clientsLastKnownCommittedOpTime(_opCtx).isNull()) {
             auto replCoord = repl::ReplicationCoordinator::get(_opCtx);
-            return clientsLastKnownCommittedOpTime(_opCtx) == replCoord->getLastCommittedOpTime();
+            return clientsLastKnownCommittedOpTime(_opCtx) >= replCoord->getLastCommittedOpTime();
         }
         return true;
     }
@@ -528,7 +537,8 @@ PlanExecutor::ExecState PlanExecutor::getNextImpl(Snapshotted<BSONObj>* objOut,
     // insert notifier the entire time we are in the loop. Holding a shared pointer to the capped
     // insert notifier is necessary for the notifierVersion to advance.
     CappedInsertNotifierData cappedInsertNotifierData;
-    if (shouldWaitForInserts()) {
+    if (shouldListenForInserts()) {
+        // We always construct the CappedInsertNotifier for awaitData cursors.
         cappedInsertNotifierData.notifier = getCappedInsertNotifier();
     }
     for (;;) {
@@ -617,6 +627,11 @@ PlanExecutor::ExecState PlanExecutor::getNextImpl(Snapshotted<BSONObj>* objOut,
         } else if (PlanStage::NEED_TIME == code) {
             // Fall through to yield check at end of large conditional.
         } else if (PlanStage::IS_EOF == code) {
+            if (MONGO_FAIL_POINT(planExecutorHangBeforeShouldWaitForInserts)) {
+                log() << "PlanExecutor - planExecutorHangBeforeShouldWaitForInserts fail point "
+                         "enabled. Blocking until fail point is disabled.";
+                MONGO_FAIL_POINT_PAUSE_WHILE_SET(planExecutorHangBeforeShouldWaitForInserts);
+            }
             if (!shouldWaitForInserts()) {
                 return PlanExecutor::IS_EOF;
             }
diff --git a/src/mongo/db/query/plan_executor.h b/src/mongo/db/query/plan_executor.h
index 3de96faf96b..f0d09024c7e 100644
--- a/src/mongo/db/query/plan_executor.h
+++ b/src/mongo/db/query/plan_executor.h
@@ -458,6 +458,12 @@ public:
 private:
     /**
+     * Returns true if the PlanExecutor should listen for inserts, which is when a getMore is called
+     * on a tailable and awaitData cursor that still has time left and hasn't been interrupted.
+     */
+    bool shouldListenForInserts();
+
+    /**
      * Returns true if the PlanExecutor should wait for data to be inserted, which is when a getMore
     * is called on a tailable and awaitData cursor on a capped collection. Returns false if an EOF
     * should be returned immediately.