-rw-r--r--  jstests/replsets/storage_commit_out_of_order.js      74
-rw-r--r--  src/mongo/db/repl/oplog.cpp                          11
-rw-r--r--  src/mongo/db/repl/replication_coordinator_impl.cpp    5
3 files changed, 90 insertions, 0 deletions
diff --git a/jstests/replsets/storage_commit_out_of_order.js b/jstests/replsets/storage_commit_out_of_order.js
new file mode 100644
index 00000000000..d856d47cf7d
--- /dev/null
+++ b/jstests/replsets/storage_commit_out_of_order.js
@@ -0,0 +1,74 @@
+/**
+ * Tests that single voting primaries can commit majority writes when they storage-commit out of
+ * order. This test first inserts a document to set the last applied optime, all committed
+ * timestamp, and stable timestamp. It then spawns 'n' threads and holds them behind a barrier. Once
+ * the threads are all waiting at the barrier, the threads all do a w:majority insert. We turn on a
+ * fail point that will block the first thread to receive an optime from the optime generator for a
+ * few seconds while the other threads get later optimes and commit their inserts. The hung thread
+ * is released after a few seconds and asserts that its write concern can be satisfied.
+ */
+(function() {
+ 'use strict';
+
+ load('jstests/libs/parallelTester.js');
+
+ const rst = new ReplSetTest({nodes: 1});
+ rst.startSet();
+ rst.initiate();
+ const dbName = 'storage_commit_out_of_order';
+ const collName = 'foo';
+ const numThreads = 2;
+ const primary = rst.getPrimary();
+ const coll = primary.getDB(dbName).getCollection(collName);
+
+ /**
+ * Waits for the provided latch to reach 0 and then does a single w:majority insert.
+ */
+ const majorityInsert = function(num, host, dbName, collName, latch) {
+ const m = new Mongo(host);
+ latch.countDown();
+ while (latch.getCount() > 0) {
+ // Busy-wait until every thread has reached the barrier.
+ }
+ return m.getDB(dbName).runCommand({
+ insert: collName,
+ documents: [{b: num}],
+ writeConcern: {w: 'majority', wtimeout: ReplSetTest.kDefaultTimeoutMS}
+ });
+ };
+
+ assert.commandWorked(primary.setLogLevel(2, 'replication'));
+ assert.commandWorked(coll.insert(
+ {a: 1}, {writeConcern: {w: 'majority', wtimeout: ReplSetTest.kDefaultTimeoutMS}}));
+
+ // Turn on a fail point that forces the first thread that receives an optime from the optime
+ // generator to wait a few seconds before storage-committing its insert.
+ assert.commandWorked(primary.adminCommand({
+ configureFailPoint: 'sleepBetweenInsertOpTimeGenerationAndLogOp',
+ mode: {times: 1},
+ data: {waitForMillis: 3000}
+ }));
+
+ // Start a bunch of threads. They will block waiting on the latch to hit 0.
+ const t = [];
+ const counter = new CountDownLatch(numThreads + 1);
+ for (let i = 0; i < numThreads; ++i) {
+ t[i] = new ScopedThread(majorityInsert, i, coll.getMongo().host, dbName, collName, counter);
+ t[i].start();
+ }
+
+ // Release the threads with the latch once they are all blocked on it.
+ jsTestLog('All threads started.');
+ assert.soon(() => counter.getCount() === 1);
+ jsTestLog('All threads at barrier.');
+ counter.countDown();
+ jsTestLog('All threads finishing.');
+
+ // Wait for all threads to complete and ensure they succeeded.
+ for (let i = 0; i < numThreads; ++i) {
+ t[i].join();
+ assert.commandWorked(t[i].returnData());
+ }
+
+ rst.stopSet();
+}()); \ No newline at end of file
diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp
index bbeebdc85f9..d50da9506d1 100644
--- a/src/mongo/db/repl/oplog.cpp
+++ b/src/mongo/db/repl/oplog.cpp
@@ -111,6 +111,9 @@ using IndexVersion = IndexDescriptor::IndexVersion;
namespace repl {
namespace {
+
+MONGO_FP_DECLARE(sleepBetweenInsertOpTimeGenerationAndLogOp);
+
/**
* The `_localOplogCollection` pointer is always valid (or null) because an
* operation must take the global exclusive lock to set the pointer to null when
@@ -526,6 +529,14 @@ std::vector<OpTime> logInsertOps(OperationContext* opCtx,
opTimes.push_back(insertStatementOplogSlot.opTime);
}
+ MONGO_FAIL_POINT_BLOCK(sleepBetweenInsertOpTimeGenerationAndLogOp, customWait) {
+ const BSONObj& data = customWait.getData();
+ auto numMillis = data["waitForMillis"].numberInt();
+ log() << "Sleeping for " << numMillis << "ms after receiving " << count << " optimes from "
+ << opTimes.front() << " to " << opTimes.back();
+ sleepmillis(numMillis);
+ }
+
std::unique_ptr<DocWriter const* []> basePtrs(new DocWriter const*[count]);
for (size_t i = 0; i < count; i++) {
basePtrs[i] = &writers[i];
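The fail point added above follows the usual server pattern: MONGO_FP_DECLARE registers it, and the MONGO_FAIL_POINT_BLOCK body runs only while the point is enabled, with the BSON payload available as 'data'. Tests drive it through the configureFailPoint admin command; a hedged shell sketch (the 3000ms value is illustrative):

// Arm the fail point so the next batch insert sleeps between reserving
// its optimes and writing the oplog entries.
const admin = db.getSiblingDB('admin');
assert.commandWorked(admin.runCommand({
    configureFailPoint: 'sleepBetweenInsertOpTimeGenerationAndLogOp',
    mode: {times: 1},            // fire exactly once, then disarm itself
    data: {waitForMillis: 3000}  // read as data["waitForMillis"] in oplog.cpp
}));

// ... issue concurrent w:majority inserts here ...

// Disable the fail point explicitly (needed if it was armed 'alwaysOn').
assert.commandWorked(admin.runCommand({
    configureFailPoint: 'sleepBetweenInsertOpTimeGenerationAndLogOp',
    mode: 'off'
}));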
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp
index 88d3e717419..d16dfcaa613 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl.cpp
@@ -1046,6 +1046,11 @@ void ReplicationCoordinatorImpl::setMyLastAppliedOpTimeForward(const OpTime& opT
if (opTime > _getMyLastAppliedOpTime_inlock()) {
_setMyLastAppliedOpTime_inlock(opTime, false, consistency);
_reportUpstream_inlock(std::move(lock));
+ } else if (consistency == DataConsistency::Consistent && _canAcceptNonLocalWrites &&
+ _rsConfig.getWriteMajority() == 1) {
+ // Single-vote primaries may have a lagged stable timestamp because the stable
+ // timestamp is pared back to the all committed timestamp.
+ _setStableTimestampForStorage_inlock();
}
}
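To see why the extra branch is needed, walk through two writers on a single-voter primary (the optime values are illustrative):

t1: thread A reserves optime 5; thread B reserves optime 6.
t2: B storage-commits first and calls setMyLastAppliedOpTimeForward(6).
    lastApplied advances to 6, but A's commit at optime 5 is still
    outstanding, so the all committed timestamp (and with it the stable
    timestamp) is pared back below 5, and B's w:majority wait stalls.
t3: A storage-commits and calls setMyLastAppliedOpTimeForward(5).
    Since 5 < lastApplied (6), the first branch is skipped; before this
    change nothing would recompute the stable timestamp. The new
    else-branch calls _setStableTimestampForStorage_inlock(), the stable
    timestamp catches up to 6, and both majority writes complete.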