From 4cee07d8a97bb0663e7bfbc3f2e1fbf539140adf Mon Sep 17 00:00:00 2001
From: Judah Schvimer
Date: Mon, 21 May 2018 18:52:55 -0400
Subject: SERVER-35113 Allow single voting primaries to advance stable timestamp even when last applied does not advance

---
 jstests/replsets/storage_commit_out_of_order.js    | 74 ++++++++++++++++++++++
 src/mongo/db/repl/oplog.cpp                        | 11 ++++
 src/mongo/db/repl/replication_coordinator_impl.cpp |  5 ++
 3 files changed, 90 insertions(+)
 create mode 100644 jstests/replsets/storage_commit_out_of_order.js

diff --git a/jstests/replsets/storage_commit_out_of_order.js b/jstests/replsets/storage_commit_out_of_order.js
new file mode 100644
index 00000000000..d856d47cf7d
--- /dev/null
+++ b/jstests/replsets/storage_commit_out_of_order.js
@@ -0,0 +1,74 @@
+/**
+ * Tests that single voting primaries can commit majority writes when they storage-commit out of
+ * order. This test first inserts a document to set the last applied optime, all committed
+ * timestamp, and stable timestamp. It then spawns 'n' threads and holds them behind a barrier. Once
+ * the threads are all waiting at the barrier, the threads all do a w:majority insert. We turn on a
+ * fail point that will block the first thread to receive an optime from the optime generator for a
+ * few seconds while the other threads get later optimes and commit their inserts. The hung thread
+ * is released after a few seconds and asserts that its write concern can be satisfied.
+ */
+(function() {
+    'use strict';
+
+    load('jstests/libs/parallelTester.js');
+
+    const rst = new ReplSetTest({nodes: 1});
+    rst.startSet();
+    rst.initiate();
+    const dbName = 'storage_commit_out_of_order';
+    const collName = 'foo';
+    const numThreads = 2;
+    const primary = rst.getPrimary();
+    const coll = primary.getDB(dbName).getCollection(collName);
+
+    /**
+     * Waits for the provided latch to reach 0 and then does a single w:majority insert.
+     */
+    const majorityInsert = function(num, host, dbName, collName, latch) {
+        const m = new Mongo(host);
+        latch.countDown();
+        while (latch.getCount() > 0) {
+            // do nothing
+        }
+        return m.getDB(dbName).runCommand({
+            insert: collName,
+            documents: [{b: num}],
+            writeConcern: {w: 'majority', wtimeout: ReplSetTest.kDefaultTimeoutMS}
+        });
+    };
+
+    assert.commandWorked(primary.setLogLevel(2, 'replication'));
+    assert.commandWorked(coll.insert(
+        {a: 1}, {writeConcern: {w: 'majority', wtimeout: ReplSetTest.kDefaultTimeoutMS}}));
+
+    // Turn on a fail point to force the first thread to receive an optime from the optime
+    // generator to wait a few seconds before storage-committing the insert.
+    assert.commandWorked(primary.adminCommand({
+        configureFailPoint: 'sleepBetweenInsertOpTimeGenerationAndLogOp',
+        mode: {times: 1},
+        data: {waitForMillis: 3000}
+    }));
+
+    // Start a bunch of threads. They will block waiting on the latch to hit 0.
+    const t = [];
+    const counter = new CountDownLatch(numThreads + 1);
+    for (let i = 0; i < numThreads; ++i) {
+        t[i] = new ScopedThread(majorityInsert, i, coll.getMongo().host, dbName, collName, counter);
+        t[i].start();
+    }
+
+    // Release the threads with the latch once they are all blocked on it.
+    jsTestLog('All threads started.');
+    assert.soon(() => counter.getCount() === 1);
+    jsTestLog('All threads at barrier.');
+    counter.countDown();
+    jsTestLog('All threads finishing.');
+
+    // Wait for all threads to complete and ensure they succeeded.
+    for (let i = 0; i < numThreads; ++i) {
+        t[i].join();
+        assert.commandWorked(t[i].returnData());
+    }
+
+    rst.stopSet();
+}());
\ No newline at end of file
diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp
index bbeebdc85f9..d50da9506d1 100644
--- a/src/mongo/db/repl/oplog.cpp
+++ b/src/mongo/db/repl/oplog.cpp
@@ -111,6 +111,9 @@ using IndexVersion = IndexDescriptor::IndexVersion;
 
 namespace repl {
 namespace {
+
+MONGO_FP_DECLARE(sleepBetweenInsertOpTimeGenerationAndLogOp);
+
 /**
  * The `_localOplogCollection` pointer is always valid (or null) because an
  * operation must take the global exclusive lock to set the pointer to null when
@@ -526,6 +529,14 @@ std::vector<OpTime> logInsertOps(OperationContext* opCtx,
         opTimes.push_back(insertStatementOplogSlot.opTime);
     }
 
+    MONGO_FAIL_POINT_BLOCK(sleepBetweenInsertOpTimeGenerationAndLogOp, customWait) {
+        const BSONObj& data = customWait.getData();
+        auto numMillis = data["waitForMillis"].numberInt();
+        log() << "Sleeping for " << numMillis << "ms after receiving " << count << " optimes from "
+              << opTimes.front() << " to " << opTimes.back();
+        sleepmillis(numMillis);
+    }
+
     std::unique_ptr<DocWriter const* []> basePtrs(new DocWriter const*[count]);
     for (size_t i = 0; i < count; i++) {
         basePtrs[i] = &writers[i];
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp
index 88d3e717419..d16dfcaa613 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl.cpp
@@ -1046,6 +1046,11 @@ void ReplicationCoordinatorImpl::setMyLastAppliedOpTimeForward(const OpTime& opT
     if (opTime > _getMyLastAppliedOpTime_inlock()) {
         _setMyLastAppliedOpTime_inlock(opTime, false, consistency);
         _reportUpstream_inlock(std::move(lock));
+    } else if (consistency == DataConsistency::Consistent && _canAcceptNonLocalWrites &&
+               _rsConfig.getWriteMajority() == 1) {
+        // Single vote primaries may have a lagged stable timestamp due to paring back the stable
+        // timestamp to the all committed timestamp.
+        _setStableTimestampForStorage_inlock();
     }
 }
 
-- 
cgit v1.2.1