summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorWilliam Schultz <william.schultz@mongodb.com>2020-01-06 17:48:35 +0000
committerevergreen <evergreen@mongodb.com>2020-01-06 17:48:35 +0000
commite62de66b40d490267506ef89b081dcca3c33b508 (patch)
tree58d32ed3a10c4a1bfdc2ba69d008394400424a7b
parent6474ab0ffa9bd0fcfe526bbb07a84100b79d34fb (diff)
downloadmongo-e62de66b40d490267506ef89b081dcca3c33b508.tar.gz
SERVER-43978 Allow stable timestamp to advance after a transaction that reserved an oplog timestamp is aborted
(cherry picked from commit 754c07c70cf5cd3c6760683bc29c927010a5718a)
-rw-r--r--jstests/replsets/stable_timestamp_can_advance_after_oplog_hole_abort.js195
-rw-r--r--src/mongo/db/catalog/database_impl.cpp7
-rw-r--r--src/mongo/db/op_observer_impl.cpp8
-rw-r--r--src/mongo/db/ops/write_ops_exec.cpp11
-rw-r--r--src/mongo/db/repl/local_oplog_info.cpp26
-rw-r--r--src/mongo/db/transaction_participant.cpp7
6 files changed, 238 insertions, 16 deletions
diff --git a/jstests/replsets/stable_timestamp_can_advance_after_oplog_hole_abort.js b/jstests/replsets/stable_timestamp_can_advance_after_oplog_hole_abort.js
new file mode 100644
index 00000000000..590e8e6d1e6
--- /dev/null
+++ b/jstests/replsets/stable_timestamp_can_advance_after_oplog_hole_abort.js
@@ -0,0 +1,195 @@
+/**
+ * Test that the stable timestamp can advance after an oplog hole is released via an abort.
+ *
+ * @tags: [uses_transactions]
+ */
+
+(function() {
+"use strict";
+load("jstests/libs/fail_point_util.js");
+load("jstests/libs/parallelTester.js");
+
+const replTest = new ReplSetTest({nodes: 1});
+replTest.startSet();
+replTest.initiate();
+
+const primary = replTest.getPrimary();
+
+const dbName = "test";
+const collName = "stable_timestamp_coll";
+const majorityWriteCollName = "stable_timestamp_coll_majority_writes";
+
+const testDB = primary.getDB(dbName);
+const testColl = testDB.getCollection(collName);
+const majorityWriteColl = testDB.getCollection(majorityWriteCollName);
+let id = 0;
+
+// Create the necessary collections.
+assert.commandWorked(testDB.createCollection(collName));
+assert.commandWorked(testDB.createCollection(majorityWriteCollName));
+
+/**
+ * The general structure of each test below is as follows:
+ *
+ * 1. Start a command/operation C1.
+ * 2. C1 reserves an oplog timestamp T1 and then pauses.
+ * 3. Start a w:majority write operation C2 at T2, where T2 > T1.
+ * 4. C2 completes its write and starts waiting for write concern.
+ * 5. Abort operation C1.
+ * 6. Ensure C2 write concern waiting completes successfully.
+ *
+ * The first operation can be any operation that reserves an oplog timestamp and then later aborts.
+ * This could be any number of operations that write to the oplog, including DDL and/or CRUD ops. We
+ * test a few different varieties below.
+ *
+ */
+
+// Run a write with {w: "majority"}.
+function majorityWriteFn(host, dbName, collName, doc) {
+ const testDB = new Mongo(host).getDB(dbName);
+ const testColl = testDB.getCollection(collName);
+
+ assert.commandWorked(
+ testColl.insert(doc, {writeConcern: {w: "majority", wtimeout: 10 * 1000}}));
+}
+
+//
+// Test createCollection abort.
+//
+
+// Create a new collection.
+function createCollFn(host, dbName, collName, expectedErrCode) {
+ const testDB = new Mongo(host).getDB(dbName);
+ jsTestLog("Creating a new collection.");
+ assert.commandFailedWithCode(testDB.createCollection(collName), expectedErrCode);
+}
+
+function testCreateCollection() {
+ jsTestLog("Running createCollection test.");
+
+ // Initialize the failpoint.
+ const hangCreatefailPoint =
+ configureFailPoint(primary, "hangAndFailAfterCreateCollectionReservesOpTime");
+
+ // Start operation T1.
+ jsTestLog("Starting the create collection operation.");
+ const createColl = new Thread(createCollFn, primary.host, dbName, "newColl", 51267);
+ createColl.start();
+ hangCreatefailPoint.wait();
+
+ // Start operation T2, the majority write.
+ jsTestLog("Starting the majority write operation.");
+ const doc = {_id: id++};
+ const majorityWrite =
+ new Thread(majorityWriteFn, primary.host, dbName, majorityWriteCollName, doc);
+ majorityWrite.start();
+
+ // Wait until the majority write operation has completed and is waiting for write concern.
+ assert.soon(() => majorityWriteColl.find(doc).itcount() === 1);
+
+ jsTestLog("Releasing the failpoint.");
+ hangCreatefailPoint.off();
+
+ jsTestLog("Waiting for the operations to complete.");
+ createColl.join();
+ majorityWrite.join();
+}
+
+//
+// Test insert abort.
+//
+
+// Insert a single document into a given collection.
+function insertFn(host, dbName, collName, doc, expectedErrCode) {
+ const testDB = new Mongo(host).getDB(dbName);
+ const testColl = testDB.getCollection(collName);
+
+ // Create the new collection.
+ jsTestLog("Inserting document: " + tojson(doc));
+ assert.commandFailedWithCode(testColl.insert(doc), expectedErrCode);
+}
+
+function testInsert() {
+ jsTestLog("Running insert test.");
+
+ const failPoint = configureFailPoint(primary,
+ "hangAndFailAfterDocumentInsertsReserveOpTimes",
+ {collectionNS: testColl.getFullName()});
+
+ // Start operation T1.
+ jsTestLog("Starting the insert operation.");
+ const insert = new Thread(insertFn, primary.host, dbName, collName, {insert: 1}, 51269);
+ insert.start();
+ failPoint.wait();
+
+ // Start operation T2, the majority write.
+ jsTestLog("Starting the majority write operation.");
+ const doc = {_id: id++};
+ const majorityWrite =
+ new Thread(majorityWriteFn, primary.host, dbName, majorityWriteCollName, doc);
+ majorityWrite.start();
+
+ // Wait until the majority write operation has completed and is waiting for write concern.
+ jsTestLog("Waiting until the majority write is visible.");
+ assert.soon(() => majorityWriteColl.find(doc).itcount() === 1);
+
+ jsTestLog("Releasing the failpoint.");
+ failPoint.off();
+
+ jsTestLog("Waiting for the operations to complete.");
+ insert.join();
+ majorityWrite.join();
+}
+
+//
+// Test unprepared transaction commit abort.
+//
+
+// Run and commit a transaction that inserts a document.
+function transactionFn(host, dbName, collName) {
+ const session = new Mongo(host).startSession();
+ const sessionDB = session.getDatabase(dbName);
+
+ session.startTransaction();
+ sessionDB[collName].insert({});
+ assert.commandFailedWithCode(session.commitTransaction_forTesting(), 51268);
+}
+
+function testUnpreparedTransactionCommit() {
+ jsTestLog("Running unprepared transaction commit test.");
+
+ const failPoint =
+ configureFailPoint(primary, "hangAndFailUnpreparedCommitAfterReservingOplogSlot");
+
+ // Start operation T1.
+ jsTestLog("Starting the transaction.");
+ const txn = new Thread(transactionFn, primary.host, dbName, collName);
+ txn.start();
+ failPoint.wait();
+
+ // Start operation T2, the majority write.
+ jsTestLog("Starting the majority write operation.");
+ const doc = {_id: id++};
+ const majorityWrite =
+ new Thread(majorityWriteFn, primary.host, dbName, majorityWriteCollName, doc);
+ majorityWrite.start();
+
+ // Wait until the majority write operation has completed and is waiting for write concern.
+ jsTestLog("Waiting until the majority write is visible.");
+ assert.soon(() => majorityWriteColl.find(doc).itcount() === 1);
+
+ jsTestLog("Releasing the failpoint.");
+ failPoint.off();
+
+ jsTestLog("Waiting for the operations to complete.");
+ txn.join();
+ majorityWrite.join();
+}
+
+// Execute all the tests.
+testCreateCollection();
+testInsert();
+testUnpreparedTransactionCommit();
+
+replTest.stopSet();
+}()); \ No newline at end of file
diff --git a/src/mongo/db/catalog/database_impl.cpp b/src/mongo/db/catalog/database_impl.cpp
index 4fb3849f254..8642fd8068b 100644
--- a/src/mongo/db/catalog/database_impl.cpp
+++ b/src/mongo/db/catalog/database_impl.cpp
@@ -82,6 +82,7 @@ namespace mongo {
namespace {
MONGO_FAIL_POINT_DEFINE(hangBeforeLoggingCreateCollection);
+MONGO_FAIL_POINT_DEFINE(hangAndFailAfterCreateCollectionReservesOpTime);
Status validateDBNameForWindows(StringData dbname) {
const std::vector<std::string> windowsReservedNames = {
@@ -642,6 +643,12 @@ Collection* DatabaseImpl::createCollection(OperationContext* opCtx,
createOplogSlot = repl::getNextOpTime(opCtx);
}
+ if (MONGO_unlikely(hangAndFailAfterCreateCollectionReservesOpTime.shouldFail())) {
+ MONGO_FAIL_POINT_PAUSE_WHILE_SET_OR_INTERRUPTED(
+ opCtx, hangAndFailAfterCreateCollectionReservesOpTime);
+ uasserted(51267, "hangAndFailAfterCreateCollectionReservesOpTime fail point enabled");
+ }
+
_checkCanCreateCollection(opCtx, nss, optionsWithUUID);
audit::logCreateCollection(&cc(), nss.ns());
diff --git a/src/mongo/db/op_observer_impl.cpp b/src/mongo/db/op_observer_impl.cpp
index 2fe31d10d98..73b60b7d904 100644
--- a/src/mongo/db/op_observer_impl.cpp
+++ b/src/mongo/db/op_observer_impl.cpp
@@ -69,6 +69,7 @@ using repl::OplogEntry;
namespace {
MONGO_FAIL_POINT_DEFINE(failCollectionUpdates);
+MONGO_FAIL_POINT_DEFINE(hangAndFailUnpreparedCommitAfterReservingOplogSlot);
const auto documentKeyDecoration = OperationContext::declareDecoration<BSONObj>();
@@ -1297,6 +1298,13 @@ void OpObserverImpl::onUnpreparedTransactionCommit(
auto oplogSlots = repl::getNextOpTimes(opCtx, statements.size());
invariant(oplogSlots.size() == statements.size());
+ if (MONGO_unlikely(hangAndFailUnpreparedCommitAfterReservingOplogSlot.shouldFail())) {
+ MONGO_FAIL_POINT_PAUSE_WHILE_SET_OR_INTERRUPTED(
+ opCtx, hangAndFailUnpreparedCommitAfterReservingOplogSlot);
+ uasserted(51268,
+ "hangAndFailUnpreparedCommitAfterReservingOplogSlot fail point enabled");
+ }
+
// Log in-progress entries for the transaction along with the implicit commit.
int numOplogEntries = logOplogEntriesForTransaction(opCtx, statements, oplogSlots, false);
commitOpTime = oplogSlots[numOplogEntries - 1];
diff --git a/src/mongo/db/ops/write_ops_exec.cpp b/src/mongo/db/ops/write_ops_exec.cpp
index 3ae46dc6146..324caf762ab 100644
--- a/src/mongo/db/ops/write_ops_exec.cpp
+++ b/src/mongo/db/ops/write_ops_exec.cpp
@@ -97,6 +97,7 @@ MONGO_FAIL_POINT_DEFINE(hangAfterAllChildRemoveOpsArePopped);
MONGO_FAIL_POINT_DEFINE(hangDuringBatchInsert);
MONGO_FAIL_POINT_DEFINE(hangDuringBatchUpdate);
MONGO_FAIL_POINT_DEFINE(hangDuringBatchRemove);
+MONGO_FAIL_POINT_DEFINE(hangAndFailAfterDocumentInsertsReserveOpTimes);
// The withLock fail points are for testing interruptability of these operations, so they will not
// themselves check for interrupt.
MONGO_FAIL_POINT_DEFINE(hangWithLockDuringBatchInsert);
@@ -316,6 +317,16 @@ void insertDocuments(OperationContext* opCtx,
}
}
+ MONGO_FAIL_POINT_BLOCK(hangAndFailAfterDocumentInsertsReserveOpTimes, nssData) {
+ const BSONObj& data = nssData.getData();
+ const auto collElem = data["collectionNS"];
+ if (!collElem || collection->ns().ns() == collElem.str()) {
+ MONGO_FAIL_POINT_PAUSE_WHILE_SET_OR_INTERRUPTED(
+ opCtx, hangAndFailAfterDocumentInsertsReserveOpTimes);
+ uasserted(51269, "hangAndFailAfterDocumentInsertsReserveOpTimes fail point enabled");
+ }
+ }
+
uassertStatusOK(
collection->insertDocuments(opCtx, begin, end, &CurOp::get(opCtx)->debug(), fromMigrate));
wuow.commit();
diff --git a/src/mongo/db/repl/local_oplog_info.cpp b/src/mongo/db/repl/local_oplog_info.cpp
index b17da6d88c5..7a2797bdb47 100644
--- a/src/mongo/db/repl/local_oplog_info.cpp
+++ b/src/mongo/db/repl/local_oplog_info.cpp
@@ -120,21 +120,29 @@ std::vector<OplogSlot> LocalOplogInfo::getNextOpTimes(OperationContext* opCtx, s
// Allow the storage engine to start the transaction outside the critical section.
opCtx->recoveryUnit()->preallocateSnapshot();
- stdx::lock_guard<Latch> lk(_newOpMutex);
-
- ts = LogicalClock::get(opCtx)->reserveTicks(count).asTimestamp();
- const bool orderedCommit = false;
+ {
+ stdx::lock_guard<Latch> lk(_newOpMutex);
- // The local oplog collection pointer must already be established by this point.
- // We can't establish it here because that would require locking the local database, which would
- // be a lock order violation.
- invariant(_oplog);
- fassert(28560, _oplog->getRecordStore()->oplogDiskLocRegister(opCtx, ts, orderedCommit));
+ ts = LogicalClock::get(opCtx)->reserveTicks(count).asTimestamp();
+ const bool orderedCommit = false;
+ // The local oplog collection pointer must already be established by this point.
+ // We can't establish it here because that would require locking the local database, which
+ // would be a lock order violation.
+ invariant(_oplog);
+ fassert(28560, _oplog->getRecordStore()->oplogDiskLocRegister(opCtx, ts, orderedCommit));
+ }
std::vector<OplogSlot> oplogSlots(count);
for (std::size_t i = 0; i < count; i++) {
oplogSlots[i] = {Timestamp(ts.asULL() + i), term};
}
+
+ // If we abort a transaction that has reserved an optime, we should make sure to update the
+ // stable timestamp if necessary, since this oplog hole may have been holding back the stable
+ // timestamp.
+ opCtx->recoveryUnit()->onRollback(
+ [replCoord]() { replCoord->attemptToAdvanceStableTimestamp(); });
+
return oplogSlots;
}
diff --git a/src/mongo/db/transaction_participant.cpp b/src/mongo/db/transaction_participant.cpp
index b627d2fd637..dde6317acb4 100644
--- a/src/mongo/db/transaction_participant.cpp
+++ b/src/mongo/db/transaction_participant.cpp
@@ -676,13 +676,6 @@ TransactionParticipant::OplogSlotReserver::~OplogSlotReserver() {
// side transaction.
_recoveryUnit->abortUnitOfWork();
}
-
- // After releasing the oplog hole, the all_durable timestamp can advance past this oplog hole,
- // if there are no other open holes. Check if we can advance the stable timestamp any further
- // since a majority write may be waiting on the stable timestamp to advance beyond this oplog
- // hole to acknowledge the write to the user.
- auto replCoord = repl::ReplicationCoordinator::get(_opCtx);
- replCoord->attemptToAdvanceStableTimestamp();
}
TransactionParticipant::TxnResources::TxnResources(WithLock wl,