summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r-- buildscripts/resmokeconfig/suites/replica_sets_initsync_jscore_passthrough.yml 1
-rw-r--r-- buildscripts/resmokeconfig/suites/replica_sets_initsync_static_jscore_passthrough.yml 1
-rw-r--r-- buildscripts/resmokeconfig/suites/replica_sets_jscore_passthrough.yml 1
-rw-r--r-- buildscripts/resmokeconfig/suites/replica_sets_kill_secondaries_jscore_passthrough.yml 1
-rw-r--r-- buildscripts/resmokeconfig/suites/sharded_core_txns.yml 1
-rw-r--r-- jstests/core/txns/prepare_conflict.js 33
-rw-r--r-- jstests/core/txns/timestamped_reads_wait_for_prepare_oplog_visibility.js 231
-rw-r--r-- src/mongo/db/SConscript 1
-rw-r--r-- src/mongo/db/catalog/uuid_catalog.h 2
-rw-r--r-- src/mongo/db/free_mon/free_mon_op_observer.h 2
-rw-r--r-- src/mongo/db/op_observer.h 4
-rw-r--r-- src/mongo/db/op_observer_impl.cpp 28
-rw-r--r-- src/mongo/db/op_observer_impl.h 2
-rw-r--r-- src/mongo/db/op_observer_impl_test.cpp 14
-rw-r--r-- src/mongo/db/op_observer_noop.h 2
-rw-r--r-- src/mongo/db/op_observer_registry.h 4
-rw-r--r-- src/mongo/db/repl/SConscript 10
-rw-r--r-- src/mongo/db/repl/oplog.cpp 2
-rw-r--r-- src/mongo/db/repl/oplog.h 20
-rw-r--r-- src/mongo/db/repl/oplog_shim.cpp 35
-rw-r--r-- src/mongo/db/s/config_server_op_observer.h 2
-rw-r--r-- src/mongo/db/s/shard_server_op_observer.h 2
-rw-r--r-- src/mongo/db/session.cpp 93
-rw-r--r-- src/mongo/db/session.h 31
-rw-r--r-- src/mongo/db/session_test.cpp 48
-rw-r--r-- src/mongo/db/storage/write_unit_of_work.cpp 12
-rw-r--r-- src/mongo/db/storage/write_unit_of_work.h 2
27 files changed, 519 insertions, 66 deletions
diff --git a/buildscripts/resmokeconfig/suites/replica_sets_initsync_jscore_passthrough.yml b/buildscripts/resmokeconfig/suites/replica_sets_initsync_jscore_passthrough.yml
index 5ebf921108d..dba7fd9b567 100644
--- a/buildscripts/resmokeconfig/suites/replica_sets_initsync_jscore_passthrough.yml
+++ b/buildscripts/resmokeconfig/suites/replica_sets_initsync_jscore_passthrough.yml
@@ -77,6 +77,7 @@ selector:
# TODO (SERVER-35865): Unblacklist when we correctly write and apply 'commitTransaction' oplog
# entries.
- jstests/core/txns/commit_prepared_transaction.js
+ - jstests/core/txns/timestamped_reads_wait_for_prepare_oplog_visibility.js
run_hook_interval: &run_hook_interval 20
executor:
diff --git a/buildscripts/resmokeconfig/suites/replica_sets_initsync_static_jscore_passthrough.yml b/buildscripts/resmokeconfig/suites/replica_sets_initsync_static_jscore_passthrough.yml
index 693bbc87c8f..4ececd471fd 100644
--- a/buildscripts/resmokeconfig/suites/replica_sets_initsync_static_jscore_passthrough.yml
+++ b/buildscripts/resmokeconfig/suites/replica_sets_initsync_static_jscore_passthrough.yml
@@ -14,6 +14,7 @@ selector:
# TODO (SERVER-35865): Unblacklist when we correctly write and apply 'commitTransaction' oplog
# entries.
- jstests/core/txns/commit_prepared_transaction.js
+ - jstests/core/txns/timestamped_reads_wait_for_prepare_oplog_visibility.js
run_hook_interval: &run_hook_interval 20
executor:
diff --git a/buildscripts/resmokeconfig/suites/replica_sets_jscore_passthrough.yml b/buildscripts/resmokeconfig/suites/replica_sets_jscore_passthrough.yml
index 47d09c7e859..40a40434b62 100644
--- a/buildscripts/resmokeconfig/suites/replica_sets_jscore_passthrough.yml
+++ b/buildscripts/resmokeconfig/suites/replica_sets_jscore_passthrough.yml
@@ -26,6 +26,7 @@ selector:
# TODO (SERVER-35865): Unblacklist when we correctly write and apply 'commitTransaction' oplog
# entries.
- jstests/core/txns/commit_prepared_transaction.js
+ - jstests/core/txns/timestamped_reads_wait_for_prepare_oplog_visibility.js
executor:
archive:
diff --git a/buildscripts/resmokeconfig/suites/replica_sets_kill_secondaries_jscore_passthrough.yml b/buildscripts/resmokeconfig/suites/replica_sets_kill_secondaries_jscore_passthrough.yml
index 90c3e9bc1ae..3e26a655d41 100644
--- a/buildscripts/resmokeconfig/suites/replica_sets_kill_secondaries_jscore_passthrough.yml
+++ b/buildscripts/resmokeconfig/suites/replica_sets_kill_secondaries_jscore_passthrough.yml
@@ -17,6 +17,7 @@ selector:
# TODO (SERVER-35865): Unblacklist when we correctly write and apply 'commitTransaction' oplog
# entries.
- jstests/core/txns/commit_prepared_transaction.js
+ - jstests/core/txns/timestamped_reads_wait_for_prepare_oplog_visibility.js
executor:
archive:
diff --git a/buildscripts/resmokeconfig/suites/sharded_core_txns.yml b/buildscripts/resmokeconfig/suites/sharded_core_txns.yml
index a9f77e85e4e..7f37c9c1c28 100644
--- a/buildscripts/resmokeconfig/suites/sharded_core_txns.yml
+++ b/buildscripts/resmokeconfig/suites/sharded_core_txns.yml
@@ -22,6 +22,7 @@ selector:
- jstests/core/txns/no_new_transactions_when_prepared_transaction_in_progress.js
- jstests/core/txns/prepare_committed_transaction.js
- jstests/core/txns/prepare_conflict.js
+ - jstests/core/txns/timestamped_reads_wait_for_prepare_oplog_visibility.js
# Mongos doesn't upconvert from local or majority level readConcern to snapshot.
- jstests/core/txns/upconvert_read_concern.js
diff --git a/jstests/core/txns/prepare_conflict.js b/jstests/core/txns/prepare_conflict.js
index 5fc81d0cdff..870af78909a 100644
--- a/jstests/core/txns/prepare_conflict.js
+++ b/jstests/core/txns/prepare_conflict.js
@@ -32,9 +32,16 @@
assert(prepareConflicted);
}
- // Insert the document.
- const doc1 = {_id: 1, x: 1};
- assert.commandWorked(testColl.insert(doc1));
+ // Insert a document modified by the transaction.
+ const txnDoc = {_id: 1, x: 1};
+ assert.commandWorked(testColl.insert(txnDoc));
+
+ // Insert a document unmodified by the transaction.
+ const otherDoc = {_id: 2, y: 2};
+ assert.commandWorked(testColl.insert(otherDoc));
+
+ // Create an index on 'y' to avoid conflicts on the field.
+ assert.commandWorked(testColl.createIndex({y: 1}));
// Enable the profiler to log slow queries. We expect a 'find' to hang until the prepare
// conflict is resolved.
@@ -45,19 +52,31 @@
session.startTransaction({readConcern: {level: "snapshot"}});
assert.commandWorked(sessionDB.runCommand({
update: collName,
- updates: [{q: doc1, u: {$inc: {x: 1}}}],
+ updates: [{q: txnDoc, u: {$inc: {x: 1}}}],
}));
assert.commandWorked(sessionDB.adminCommand({prepareTransaction: 1}));
- assertPrepareConflict({_id: 1});
+
+ // Conflict on _id of prepared document.
+ assertPrepareConflict({_id: txnDoc._id});
+
+ // Conflict on field that could be added to a prepared document.
+ assertPrepareConflict({randomField: "random"});
+
+ // No conflict on _id of a non-prepared document.
+ assert.commandWorked(testDB.runCommand({find: collName, filter: {_id: otherDoc._id}}));
+
+ // No conflict on indexed field of a non-prepared document.
+ assert.commandWorked(testDB.runCommand({find: collName, filter: {y: otherDoc.y}}));
// At this point, we can guarantee all subsequent reads will conflict. Do a read in a parallel
// shell, abort the transaction, then ensure the read succeeded with the old document.
TestData.collName = collName;
TestData.dbName = dbName;
+ TestData.txnDoc = txnDoc;
const findAwait = startParallelShell(function() {
const it = db.getSiblingDB(TestData.dbName)
- .runCommand({find: TestData.collName, filter: {_id: 1}});
+ .runCommand({find: TestData.collName, filter: {_id: TestData.txnDoc._id}});
}, db.getMongo().port);
session.abortTransaction();
@@ -66,5 +85,5 @@
findAwait({checkExitSuccess: true});
// The document should be unmodified, because we aborted.
- assert.eq(doc1, testColl.findOne(doc1));
+ assert.eq(txnDoc, testColl.findOne(txnDoc));
})();
diff --git a/jstests/core/txns/timestamped_reads_wait_for_prepare_oplog_visibility.js b/jstests/core/txns/timestamped_reads_wait_for_prepare_oplog_visibility.js
new file mode 100644
index 00000000000..74671ee8439
--- /dev/null
+++ b/jstests/core/txns/timestamped_reads_wait_for_prepare_oplog_visibility.js
@@ -0,0 +1,231 @@
+/**
+ * Tests that timestamped reads (snapshot reads and reads with afterClusterTime) wait for the
+ * prepare transaction oplog entry to be visible before choosing a read timestamp.
+ *
+ * @tags: [uses_transactions]
+ */
+(function() {
+ 'use strict';
+ load("jstests/libs/check_log.js");
+ load('jstests/core/txns/libs/prepare_helpers.js');
+ load('jstests/libs/parallel_shell_helpers.js');
+
+ TestData.dbName = 'test';
+ const baseCollName = 'timestamped_reads_wait_for_prepare_oplog_visibility';
+ const testDB = db.getSiblingDB(TestData.dbName);
+ TestData.failureTimeout = 1 * 1000; // 1 second.
+ TestData.successTimeout = 5 * 60 * 1000; // 5 minutes.
+ TestData.txnDoc = {_id: 1, x: 1};
+ TestData.otherDoc = {_id: 2, y: 7};
+ TestData.txnDocFilter = {_id: TestData.txnDoc._id};
+ TestData.otherDocFilter = {_id: TestData.otherDoc._id};
+
+ /**
+ * A function that accepts a 'readFunc' and a collection name. 'readFunc' accepts a collection
+ * name and returns an object with an 'oplogVisibility' test field and a 'prepareConflict' test
+ * field. This function is run in a separate thread and tests that oplog visibility blocks
+ * certain reads and that prepare conflicts block other types of reads.
+ */
+ const readThreadFunc = function(readFunc, _collName) {
+ load("jstests/libs/check_log.js");
+
+ // Do not start reads until we are blocked in 'prepareTransaction'.
+ checkLog.contains(db.getMongo(), "hangAfterReservingPrepareTimestamp fail point enabled");
+
+ // Create a 'readFuncObj' from the 'readFunc'.
+ const readFuncObj = readFunc(_collName);
+ readFuncObj.oplogVisibility();
+
+ // Let the transaction finish preparing and wait for 'prepareTransaction' to complete.
+ assert.commandWorked(db.adminCommand(
+ {configureFailPoint: 'hangAfterReservingPrepareTimestamp', mode: 'off'}));
+ checkLog.contains(db.getMongo(), "command: prepareTransaction");
+
+ readFuncObj.prepareConflict();
+ };
+
+ function runTest(prefix, readFunc) {
+ // Reset the log history between tests.
+ assert.commandWorked(db.adminCommand({clearLog: 'global'}));
+
+ jsTestLog('Testing oplog visibility for ' + prefix);
+ const collName = baseCollName + '_' + prefix;
+ const testColl = testDB.getCollection(collName);
+
+ testColl.drop({writeConcern: {w: "majority"}});
+ assert.commandWorked(testDB.runCommand({create: collName, writeConcern: {w: 'majority'}}));
+
+ assert.commandWorked(testDB.adminCommand(
+ {configureFailPoint: 'hangAfterReservingPrepareTimestamp', mode: 'alwaysOn'}));
+
+ // Insert a document for the transaction.
+ assert.commandWorked(testColl.insert(TestData.txnDoc));
+ // Insert a document untouched by the transaction.
+ assert.commandWorked(testColl.insert(TestData.otherDoc));
+
+ // Start a transaction with a single update on the 'txnDoc'.
+ const session = db.getMongo().startSession({causalConsistency: false});
+ const sessionDB = session.getDatabase(TestData.dbName);
+ session.startTransaction({readConcern: {level: 'snapshot'}});
+ assert.commandWorked(sessionDB[collName].update(TestData.txnDoc, {$inc: {x: 1}}));
+
+ // We set the log level up to know when 'prepareTransaction' completes.
+ db.setLogLevel(1);
+
+ // Clear the log history to ensure we only see the most recent 'prepareTransaction'
+ // failpoint log message.
+ assert.commandWorked(db.adminCommand({clearLog: 'global'}));
+ const joinReadThread = startParallelShell(funWithArgs(readThreadFunc, readFunc, collName));
+
+ jsTestLog("Preparing the transaction for " + prefix);
+ const prepareTimestamp = PrepareHelpers.prepareTransaction(session);
+
+ db.setLogLevel(0);
+ joinReadThread({checkExitSuccess: true});
+
+ PrepareHelpers.commitTransaction(session, prepareTimestamp);
+ }
+
+ const snapshotRead = function(_collName) {
+ const _db = db.getSiblingDB(TestData.dbName);
+
+ const session = db.getMongo().startSession({causalConsistency: false});
+ const sessionDB = session.getDatabase(TestData.dbName);
+ session.startTransaction({readConcern: {level: 'snapshot'}});
+
+ const oplogVisibility = function() {
+ jsTestLog("Snapshot reads should block on oplog visibility.");
+ assert.commandFailedWithCode(sessionDB.runCommand({
+ find: _collName,
+ filter: TestData.txnDocFilter,
+ maxTimeMS: TestData.failureTimeout
+ }),
+ ErrorCodes.MaxTimeMSExpired);
+ assert.commandFailedWithCode(sessionDB.runCommand({
+ find: _collName,
+ filter: TestData.otherDocFilter,
+ maxTimeMS: TestData.failureTimeout
+ }),
+ ErrorCodes.MaxTimeMSExpired);
+ };
+
+ const prepareConflict = function() {
+ jsTestLog("Snapshot reads should block on prepared transactions for " +
+ "conflicting documents.");
+ assert.commandFailedWithCode(sessionDB.runCommand({
+ find: _collName,
+ filter: TestData.txnDocFilter,
+ maxTimeMS: TestData.failureTimeout
+ }),
+ ErrorCodes.MaxTimeMSExpired);
+
+ jsTestLog("Snapshot reads should succeed on non-conflicting documents while a " +
+ "transaction is in prepare.");
+ let cursor = assert.commandWorked(sessionDB.runCommand({
+ find: _collName,
+ filter: TestData.otherDocFilter,
+ maxTimeMS: TestData.successTimeout
+ }));
+ assert.docEq(cursor.cursor.firstBatch, [TestData.otherDoc], tojson(cursor));
+ };
+
+ return {oplogVisibility: oplogVisibility, prepareConflict: prepareConflict};
+ };
+
+ const afterClusterTime = function(_collName) {
+ const _db = db.getSiblingDB(TestData.dbName);
+
+ // Advance the cluster time with an arbitrary other insert.
+ let res = assert.commandWorked(
+ _db.runCommand({insert: _collName, documents: [{advanceClusterTime: 1}]}));
+ assert(res.hasOwnProperty("$clusterTime"), tojson(res));
+ assert(res.$clusterTime.hasOwnProperty("clusterTime"), tojson(res));
+ const clusterTime = res.$clusterTime.clusterTime;
+ jsTestLog("Using afterClusterTime: " + clusterTime);
+
+ const oplogVisibility = function() {
+ jsTestLog("afterClusterTime reads should block on oplog visibility.");
+ assert.commandFailedWithCode(_db.runCommand({
+ find: _collName,
+ filter: TestData.txnDocFilter,
+ readConcern: {afterClusterTime: clusterTime},
+ maxTimeMS: TestData.failureTimeout
+ }),
+ ErrorCodes.MaxTimeMSExpired);
+ assert.commandFailedWithCode(_db.runCommand({
+ find: _collName,
+ filter: TestData.otherDocFilter,
+ readConcern: {afterClusterTime: clusterTime},
+ maxTimeMS: TestData.failureTimeout
+ }),
+ ErrorCodes.MaxTimeMSExpired);
+ };
+
+ const prepareConflict = function() {
+ jsTestLog("afterClusterTime reads should block on prepared transactions for " +
+ "conflicting documents.");
+ assert.commandFailedWithCode(_db.runCommand({
+ find: _collName,
+ filter: TestData.txnDocFilter,
+ readConcern: {afterClusterTime: clusterTime},
+ maxTimeMS: TestData.failureTimeout
+ }),
+ ErrorCodes.MaxTimeMSExpired);
+
+ jsTestLog("afterClusterTime reads should succeed on non-conflicting documents " +
+ "while transaction is in prepare.");
+ let cursor = assert.commandWorked(_db.runCommand({
+ find: _collName,
+ filter: TestData.otherDocFilter,
+ readConcern: {afterClusterTime: clusterTime},
+ maxTimeMS: TestData.successTimeout
+ }));
+ assert.docEq(cursor.cursor.firstBatch, [TestData.otherDoc], tojson(cursor));
+ };
+
+ return {oplogVisibility: oplogVisibility, prepareConflict: prepareConflict};
+ };
+
+ const normalRead = function(_collName) {
+ const _db = db.getSiblingDB(TestData.dbName);
+
+ const oplogVisibility = function() {
+ jsTestLog("Ordinary reads should not block on oplog visibility.");
+ let cursor = assert.commandWorked(_db.runCommand({
+ find: _collName,
+ filter: TestData.txnDocFilter,
+ maxTimeMS: TestData.successTimeout
+ }));
+ assert.docEq(cursor.cursor.firstBatch, [TestData.txnDoc], tojson(cursor));
+ cursor = assert.commandWorked(_db.runCommand({
+ find: _collName,
+ filter: TestData.otherDocFilter,
+ maxTimeMS: TestData.successTimeout
+ }));
+ assert.docEq(cursor.cursor.firstBatch, [TestData.otherDoc], tojson(cursor));
+ };
+
+ const prepareConflict = function() {
+ jsTestLog("Ordinary reads should not block on prepared transactions.");
+ // TODO (SERVER-36382): Uncomment this block when local reads don't cause prepare
+ // conflicts.
+ // cursor = assert.commandWorked(_db.runCommand(
+ // {find: _collName, filter: TestData.txnDocFilter, maxTimeMS:
+ // TestData.successTimeout}));
+ // assert.docEq(cursor.cursor.firstBatch, [TestData.txnDoc], tojson(cursor));
+ let cursor = assert.commandWorked(_db.runCommand({
+ find: _collName,
+ filter: TestData.otherDocFilter,
+ maxTimeMS: TestData.successTimeout
+ }));
+ assert.docEq(cursor.cursor.firstBatch, [TestData.otherDoc], tojson(cursor));
+ };
+
+ return {oplogVisibility: oplogVisibility, prepareConflict: prepareConflict};
+ };
+
+ runTest('normal_reads', normalRead);
+ // TODO (SERVER-35821): Unblacklist this snapshot reads test.
+ // runTest('snapshot_reads', snapshotRead);
+ runTest('afterClusterTime', afterClusterTime);
+})();
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript
index c242abd8d26..e7a9d32fba5 100644
--- a/src/mongo/db/SConscript
+++ b/src/mongo/db/SConscript
@@ -645,6 +645,7 @@ env.Library(
'logical_session_id',
'namespace_string',
'repl/oplog_entry',
+ 'repl/oplog_shim',
's/sharding_api_d',
'views/views',
'$BUILD_DIR/mongo/db/commands/test_commands_enabled',
diff --git a/src/mongo/db/catalog/uuid_catalog.h b/src/mongo/db/catalog/uuid_catalog.h
index 54e05d33177..b561e1d1a3d 100644
--- a/src/mongo/db/catalog/uuid_catalog.h
+++ b/src/mongo/db/catalog/uuid_catalog.h
@@ -115,7 +115,7 @@ public:
const NamespaceString& collectionName,
OptionalCollectionUUID uuid) override {}
void onTransactionCommit(OperationContext* opCtx, bool wasPrepared) override {}
- void onTransactionPrepare(OperationContext* opCtx) override {}
+ void onTransactionPrepare(OperationContext* opCtx, const OplogSlot& prepareOpTime) override {}
void onTransactionAbort(OperationContext* opCtx) override {}
void onReplicationRollback(OperationContext* opCtx,
const RollbackObserverInfo& rbInfo) override {}
diff --git a/src/mongo/db/free_mon/free_mon_op_observer.h b/src/mongo/db/free_mon/free_mon_op_observer.h
index e70f5d77932..7b4d5861a2d 100644
--- a/src/mongo/db/free_mon/free_mon_op_observer.h
+++ b/src/mongo/db/free_mon/free_mon_op_observer.h
@@ -133,7 +133,7 @@ public:
void onTransactionCommit(OperationContext* opCtx, bool wasPrepared) final {}
- void onTransactionPrepare(OperationContext* opCtx) final {}
+ void onTransactionPrepare(OperationContext* opCtx, const OplogSlot& prepareOpTime) final {}
void onTransactionAbort(OperationContext* opCtx) final {}
diff --git a/src/mongo/db/op_observer.h b/src/mongo/db/op_observer.h
index f2d3c766045..ebc0590bad0 100644
--- a/src/mongo/db/op_observer.h
+++ b/src/mongo/db/op_observer.h
@@ -269,8 +269,10 @@ public:
/**
* The onTransactionPrepare method is called when an atomic transaction is prepared. It must be
* called when a transaction is active.
+ *
+ * The 'prepareOpTime' is passed in to be used as the OpTime of the oplog entry.
*/
- virtual void onTransactionPrepare(OperationContext* opCtx) = 0;
+ virtual void onTransactionPrepare(OperationContext* opCtx, const OplogSlot& prepareOpTime) = 0;
/**
* The onTransactionAbort method is called when an atomic transaction aborts, before the
diff --git a/src/mongo/db/op_observer_impl.cpp b/src/mongo/db/op_observer_impl.cpp
index e75618ed99e..16b9701cd47 100644
--- a/src/mongo/db/op_observer_impl.cpp
+++ b/src/mongo/db/op_observer_impl.cpp
@@ -294,7 +294,8 @@ OpTimeBundle replLogApplyOps(OperationContext* opCtx,
const OperationSessionInfo& sessionInfo,
StmtId stmtId,
const repl::OplogLink& oplogLink,
- bool prepare) {
+ bool prepare,
+ const OplogSlot& oplogSlot) {
OpTimeBundle times;
times.wallClockTime = getWallClockTimeForOpLog(opCtx);
times.writeOpTime = logOperation(opCtx,
@@ -309,7 +310,7 @@ OpTimeBundle replLogApplyOps(OperationContext* opCtx,
stmtId,
oplogLink,
prepare,
- OplogSlot());
+ oplogSlot);
return times;
}
@@ -885,7 +886,7 @@ void OpObserverImpl::onApplyOps(OperationContext* opCtx,
// Only transactional 'applyOps' commands can be prepared.
constexpr bool prepare = false;
- replLogApplyOps(opCtx, cmdNss, applyOpCmd, {}, kUninitializedStmtId, {}, prepare);
+ replLogApplyOps(opCtx, cmdNss, applyOpCmd, {}, kUninitializedStmtId, {}, prepare, OplogSlot());
AuthorizationManager::get(opCtx->getServiceContext())
->logOp(opCtx, "c", cmdNss, applyOpCmd, nullptr);
@@ -923,7 +924,7 @@ namespace {
OpTimeBundle logApplyOpsForTransaction(OperationContext* opCtx,
Session* const session,
std::vector<repl::ReplOperation> stmts,
- bool prepare) {
+ const OplogSlot& prepareOplogSlot) {
BSONObjBuilder applyOpsBuilder;
BSONArrayBuilder opsArray(applyOpsBuilder.subarrayStart("applyOps"_sd));
for (auto& stmt : stmts) {
@@ -943,9 +944,11 @@ OpTimeBundle logApplyOpsForTransaction(OperationContext* opCtx,
invariant(oplogLink.prevOpTime.isNull());
try {
+ // We are only given an oplog slot for prepared transactions.
+ auto prepare = !prepareOplogSlot.opTime.isNull();
auto applyOpCmd = applyOpsBuilder.done();
- auto times =
- replLogApplyOps(opCtx, cmdNss, applyOpCmd, sessionInfo, stmtId, oplogLink, prepare);
+ auto times = replLogApplyOps(
+ opCtx, cmdNss, applyOpCmd, sessionInfo, stmtId, oplogLink, prepare, prepareOplogSlot);
onWriteOpCompleted(
opCtx, cmdNss, session, {stmtId}, times.writeOpTime, times.wallClockTime);
@@ -996,32 +999,29 @@ void OpObserverImpl::onTransactionCommit(OperationContext* opCtx, bool wasPrepar
return;
const auto commitOpTime =
- logApplyOpsForTransaction(opCtx, session, stmts, false /* prepare */).writeOpTime;
+ logApplyOpsForTransaction(opCtx, session, stmts, OplogSlot()).writeOpTime;
invariant(!commitOpTime.isNull());
}
}
-void OpObserverImpl::onTransactionPrepare(OperationContext* opCtx) {
+void OpObserverImpl::onTransactionPrepare(OperationContext* opCtx, const OplogSlot& prepareOpTime) {
invariant(opCtx->getTxnNumber());
Session* const session = OperationContextSession::get(opCtx);
invariant(session);
invariant(session->inMultiDocumentTransaction());
+ invariant(!prepareOpTime.opTime.isNull());
auto stmts = session->endTransactionAndRetrieveOperations(opCtx);
// We write the oplog entry in a side transaction so that we do not commit the now-prepared
- // transaction. We then return to the main transaction and set its 'prepareTimestamp'.
+ // transaction.
// We write an empty 'applyOps' entry if there were no writes to choose a prepare timestamp
// and allow this transaction to be continued on failover.
- repl::OpTime prepareOpTime;
{
Session::SideTransactionBlock sideTxn(opCtx);
WriteUnitOfWork wuow(opCtx);
- prepareOpTime =
- logApplyOpsForTransaction(opCtx, session, stmts, true /* prepare */).writeOpTime;
+ logApplyOpsForTransaction(opCtx, session, stmts, prepareOpTime);
wuow.commit();
}
- invariant(!prepareOpTime.isNull());
- opCtx->recoveryUnit()->setPrepareTimestamp(prepareOpTime.getTimestamp());
}
void OpObserverImpl::onTransactionAbort(OperationContext* opCtx) {
diff --git a/src/mongo/db/op_observer_impl.h b/src/mongo/db/op_observer_impl.h
index 4931629a64c..1fa7e076eb4 100644
--- a/src/mongo/db/op_observer_impl.h
+++ b/src/mongo/db/op_observer_impl.h
@@ -111,7 +111,7 @@ public:
const NamespaceString& collectionName,
OptionalCollectionUUID uuid);
void onTransactionCommit(OperationContext* opCtx, bool wasPrepared) override;
- void onTransactionPrepare(OperationContext* opCtx) override;
+ void onTransactionPrepare(OperationContext* opCtx, const OplogSlot& prepareOpTime) override;
void onTransactionAbort(OperationContext* opCtx) override;
void onReplicationRollback(OperationContext* opCtx,
const RollbackObserverInfo& rbInfo) override;
diff --git a/src/mongo/db/op_observer_impl_test.cpp b/src/mongo/db/op_observer_impl_test.cpp
index 0fd3455b930..048b4491dae 100644
--- a/src/mongo/db/op_observer_impl_test.cpp
+++ b/src/mongo/db/op_observer_impl_test.cpp
@@ -593,7 +593,12 @@ TEST_F(OpObserverTransactionTest, TransactionalPrepareTest) {
opObserver().onDelete(opCtx(), nss1, uuid1, 0, false, boost::none);
session()->transitionToPreparedforTest();
- opObserver().onTransactionPrepare(opCtx());
+ {
+ WriteUnitOfWork wuow(opCtx());
+ OplogSlot slot = repl::getNextOpTime(opCtx());
+ opObserver().onTransactionPrepare(opCtx(), slot);
+ opCtx()->recoveryUnit()->setPrepareTimestamp(slot.opTime.getTimestamp());
+ }
auto oplogEntryObj = getSingleOplogEntry(opCtx());
checkCommonFields(oplogEntryObj);
@@ -654,7 +659,12 @@ TEST_F(OpObserverTransactionTest, PreparingEmptyTransactionLogsEmptyApplyOps) {
session()->unstashTransactionResources(opCtx(), "prepareTransaction");
session()->transitionToPreparedforTest();
- opObserver().onTransactionPrepare(opCtx());
+ {
+ WriteUnitOfWork wuow(opCtx());
+ OplogSlot slot = repl::getNextOpTime(opCtx());
+ opObserver().onTransactionPrepare(opCtx(), slot);
+ opCtx()->recoveryUnit()->setPrepareTimestamp(slot.opTime.getTimestamp());
+ }
auto oplogEntryObj = getSingleOplogEntry(opCtx());
checkCommonFields(oplogEntryObj);
diff --git a/src/mongo/db/op_observer_noop.h b/src/mongo/db/op_observer_noop.h
index b04fce35c50..98be250f0cf 100644
--- a/src/mongo/db/op_observer_noop.h
+++ b/src/mongo/db/op_observer_noop.h
@@ -110,7 +110,7 @@ public:
const NamespaceString& collectionName,
OptionalCollectionUUID uuid) override {}
void onTransactionCommit(OperationContext* opCtx, bool wasPrepared) override{};
- void onTransactionPrepare(OperationContext* opCtx) override{};
+ void onTransactionPrepare(OperationContext* opCtx, const OplogSlot& prepareOpTime) override{};
void onTransactionAbort(OperationContext* opCtx) override{};
void onReplicationRollback(OperationContext* opCtx,
const RollbackObserverInfo& rbInfo) override {}
diff --git a/src/mongo/db/op_observer_registry.h b/src/mongo/db/op_observer_registry.h
index d1a34bbef6e..a89950d81cd 100644
--- a/src/mongo/db/op_observer_registry.h
+++ b/src/mongo/db/op_observer_registry.h
@@ -222,10 +222,10 @@ public:
o->onTransactionCommit(opCtx, wasPrepared);
}
- void onTransactionPrepare(OperationContext* opCtx) override {
+ void onTransactionPrepare(OperationContext* opCtx, const OplogSlot& prepareOpTime) override {
ReservedTimes times{opCtx};
for (auto& observer : _observers) {
- observer->onTransactionPrepare(opCtx);
+ observer->onTransactionPrepare(opCtx, prepareOpTime);
}
}
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript
index db50f0278c7..a5f66564955 100644
--- a/src/mongo/db/repl/SConscript
+++ b/src/mongo/db/repl/SConscript
@@ -31,6 +31,16 @@ env.Library(
)
env.Library(
+ target="oplog_shim",
+ source=[
+ "oplog_shim.cpp",
+ ],
+ LIBDEPS_PRIVATE=[
+ '$BUILD_DIR/mongo/base',
+ ],
+)
+
+env.Library(
target='oplogreader',
source=[
'oplogreader.cpp',
diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp
index db12da59795..93906fc21a5 100644
--- a/src/mongo/db/repl/oplog.cpp
+++ b/src/mongo/db/repl/oplog.cpp
@@ -678,7 +678,7 @@ void createOplog(OperationContext* opCtx) {
createOplog(opCtx, localOplogInfo(opCtx->getServiceContext()).oplogName, isReplSet);
}
-OplogSlot getNextOpTime(OperationContext* opCtx) {
+MONGO_REGISTER_SHIM(GetNextOpTimeClass::getNextOpTime)(OperationContext* opCtx)->OplogSlot {
// The local oplog collection pointer must already be established by this point.
// We can't establish it here because that would require locking the local database, which would
// be a lock order violation.
diff --git a/src/mongo/db/repl/oplog.h b/src/mongo/db/repl/oplog.h
index b209b695fa3..565ab7f31cb 100644
--- a/src/mongo/db/repl/oplog.h
+++ b/src/mongo/db/repl/oplog.h
@@ -31,6 +31,7 @@
#include <string>
#include <vector>
+#include "mongo/base/shim.h"
#include "mongo/base/status.h"
#include "mongo/bson/bsonobj.h"
#include "mongo/bson/timestamp.h"
@@ -264,11 +265,20 @@ void createIndexForApplyOps(OperationContext* opCtx,
IncrementOpsAppliedStatsFn incrementOpsAppliedStats,
OplogApplication::Mode mode);
-/**
- * Allocates optimes for new entries in the oplog. Returns an OplogSlot or a vector of OplogSlots,
- * which contain the new optimes along with their terms and newly calculated hash fields.
- */
-OplogSlot getNextOpTime(OperationContext* opCtx);
+// Shims currently do not support free functions so we wrap getNextOpTime in a class as a
+// workaround.
+struct GetNextOpTimeClass {
+ /**
+ * Allocates optimes for new entries in the oplog. Returns an OplogSlot or a vector of
+ * OplogSlots, which contain the new optimes along with their terms and newly calculated hash
+ * fields.
+ */
+ static MONGO_DECLARE_SHIM((OperationContext * opCtx)->OplogSlot) getNextOpTime;
+};
+
+inline OplogSlot getNextOpTime(OperationContext* opCtx) {
+ return GetNextOpTimeClass::getNextOpTime(opCtx);
+}
/**
* Allocates an OpTime, but does not update the storage engine with the timestamp. This is used to
diff --git a/src/mongo/db/repl/oplog_shim.cpp b/src/mongo/db/repl/oplog_shim.cpp
new file mode 100644
index 00000000000..4f6ad99932f
--- /dev/null
+++ b/src/mongo/db/repl/oplog_shim.cpp
@@ -0,0 +1,35 @@
+/**
+* Copyright (C) 2018 MongoDB Inc.
+*
+* This program is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License, version 3,
+* as published by the Free Software Foundation.
+*
+* This program is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see <http://www.gnu.org/licenses/>.
+*
+* As a special exception, the copyright holders give permission to link the
+* code of portions of this program with the OpenSSL library under certain
+* conditions as described in each individual source file and distribute
+* linked combinations including the program with the OpenSSL library. You
+* must comply with the GNU Affero General Public License in all respects for
+* all of the code used other than as permitted herein. If you modify file(s)
+* with this exception, you may extend this exception to your version of the
+* file(s), but you are not obligated to do so. If you do not wish to do so,
+* delete this exception statement from your version. If you delete this
+* exception statement from all source files in the program, then also delete
+* it in the license file.
+*/
+
+#include "mongo/db/repl/oplog.h"
+
+namespace mongo {
+namespace repl {
+MONGO_DEFINE_SHIM(GetNextOpTimeClass::getNextOpTime);
+} // namespace repl
+} // namespace mongo
diff --git a/src/mongo/db/s/config_server_op_observer.h b/src/mongo/db/s/config_server_op_observer.h
index 193cb29320c..ebed73cce59 100644
--- a/src/mongo/db/s/config_server_op_observer.h
+++ b/src/mongo/db/s/config_server_op_observer.h
@@ -133,7 +133,7 @@ public:
void onTransactionCommit(OperationContext* opCtx, bool wasPrepared) override {}
- void onTransactionPrepare(OperationContext* opCtx) override {}
+ void onTransactionPrepare(OperationContext* opCtx, const OplogSlot& prepareOpTime) override {}
void onTransactionAbort(OperationContext* opCtx) override {}
diff --git a/src/mongo/db/s/shard_server_op_observer.h b/src/mongo/db/s/shard_server_op_observer.h
index 21001318ac6..9edc8da25fa 100644
--- a/src/mongo/db/s/shard_server_op_observer.h
+++ b/src/mongo/db/s/shard_server_op_observer.h
@@ -134,7 +134,7 @@ public:
void onTransactionCommit(OperationContext* opCtx, bool wasPrepared) override {}
- void onTransactionPrepare(OperationContext* opCtx) override {}
+ void onTransactionPrepare(OperationContext* opCtx, const OplogSlot& prepareOpTime) override {}
void onTransactionAbort(OperationContext* opCtx) override {}
diff --git a/src/mongo/db/session.cpp b/src/mongo/db/session.cpp
index 501895072e3..bb892794afa 100644
--- a/src/mongo/db/session.cpp
+++ b/src/mongo/db/session.cpp
@@ -286,6 +286,8 @@ MONGO_FAIL_POINT_DEFINE(onPrimaryTransactionalWrite);
// Failpoint which will pause an operation just after allocating a point-in-time storage engine
// transaction.
MONGO_FAIL_POINT_DEFINE(hangAfterPreallocateSnapshot);
+
+MONGO_FAIL_POINT_DEFINE(hangAfterReservingPrepareTimestamp);
} // namespace
const BSONObj Session::kDeadEndSentinel(BSON("$incompleteOplogHistory" << 1));
@@ -636,6 +638,47 @@ void Session::_checkTxnValid(WithLock, TxnNumber txnNumber) const {
txnNumber >= _activeTxnNumber);
}
+Session::OplogSlotReserver::OplogSlotReserver(OperationContext* opCtx) {
+ // Stash the transaction on the OperationContext on the stack. At the end of this function it
+ // will be unstashed onto the OperationContext.
+ Session::SideTransactionBlock sideTxn(opCtx);
+
+ // Begin a new WUOW and reserve a slot in the oplog.
+ WriteUnitOfWork wuow(opCtx);
+ _oplogSlot = repl::getNextOpTime(opCtx);
+
+ // Release the WUOW state since this WUOW is no longer in use.
+ wuow.release();
+
+ // The new transaction should have an empty locker, and thus we do not need to save it.
+ invariant(opCtx->lockState()->getClientState() == Locker::ClientState::kInactive);
+ _locker = opCtx->swapLockState(stdx::make_unique<LockerImpl>());
+ _locker->unsetThreadId();
+
+ // This thread must still respect the transaction lock timeout, since it can prevent the
+ // transaction from making progress.
+ auto maxTransactionLockMillis = maxTransactionLockRequestTimeoutMillis.load();
+ if (maxTransactionLockMillis >= 0) {
+ opCtx->lockState()->setMaxLockTimeout(Milliseconds(maxTransactionLockMillis));
+ }
+
+ // Save the RecoveryUnit from the new transaction and replace it with an empty one.
+ _recoveryUnit = std::unique_ptr<RecoveryUnit>(opCtx->releaseRecoveryUnit());
+ opCtx->setRecoveryUnit(opCtx->getServiceContext()->getStorageEngine()->newRecoveryUnit(),
+ WriteUnitOfWork::RecoveryUnitState::kNotInUnitOfWork);
+}
+
+Session::OplogSlotReserver::~OplogSlotReserver() {
+ // If the constructor did not complete, we do not attempt to abort the units of work.
+ if (_recoveryUnit) {
+ // We should be at WUOW nesting level 1, only the top level WUOW for the oplog reservation
+ // side transaction.
+ _locker->endWriteUnitOfWork();
+ invariant(!_locker->inAWriteUnitOfWork());
+ _recoveryUnit->abortUnitOfWork();
+ }
+}
+
Session::TxnResources::TxnResources(OperationContext* opCtx) {
_ruState = opCtx->getWriteUnitOfWork()->release();
opCtx->setWriteUnitOfWork(nullptr);
@@ -683,8 +726,10 @@ void Session::TxnResources::release(OperationContext* opCtx) {
opCtx->swapLockState(std::move(_locker));
opCtx->lockState()->updateThreadIdToCurrentThread();
- opCtx->setRecoveryUnit(_recoveryUnit.release(),
- WriteUnitOfWork::RecoveryUnitState::kNotInUnitOfWork);
+ auto oldState = opCtx->setRecoveryUnit(_recoveryUnit.release(),
+ WriteUnitOfWork::RecoveryUnitState::kNotInUnitOfWork);
+ invariant(oldState == WriteUnitOfWork::RecoveryUnitState::kNotInUnitOfWork,
+ str::stream() << "RecoveryUnit state was " << oldState);
opCtx->setWriteUnitOfWork(WriteUnitOfWork::createForSnapshotResume(opCtx, _ruState));
@@ -870,24 +915,50 @@ Timestamp Session::prepareTransaction(OperationContext* opCtx) {
_txnState.transitionTo(lk, TransactionState::kPrepared);
+ // Reserve an optime for the 'prepareTimestamp'. This will create a hole in the oplog and cause
+ // 'snapshot' and 'afterClusterTime' readers to block until this transaction is done being
+ // prepared. When the OplogSlotReserver goes out of scope and is destroyed, the
+ // storage-transaction it uses to keep the hole open will abort and the slot (and corresponding
+ // oplog hole) will vanish.
+ OplogSlotReserver oplogSlotReserver(opCtx);
+ const auto prepareOplogSlot = oplogSlotReserver.getReservedOplogSlot();
+ const auto prepareTimestamp = prepareOplogSlot.opTime.getTimestamp();
+
+ if (MONGO_FAIL_POINT(hangAfterReservingPrepareTimestamp)) {
+ // This log output is used in js tests so please leave it.
+ log() << "transaction - hangAfterReservingPrepareTimestamp fail point "
+ "enabled. Blocking until fail point is disabled. Prepare OpTime: "
+ << prepareOplogSlot.opTime;
+ MONGO_FAIL_POINT_PAUSE_WHILE_SET(hangAfterReservingPrepareTimestamp);
+ }
+
+ opCtx->recoveryUnit()->setPrepareTimestamp(prepareTimestamp);
+ opCtx->getWriteUnitOfWork()->prepare();
+
// We need to unlock the session to run the opObserver onTransactionPrepare, which calls back
// into the session.
lk.unlock();
auto opObserver = opCtx->getServiceContext()->getOpObserver();
invariant(opObserver);
- opObserver->onTransactionPrepare(opCtx);
- lk.lock();
- _checkIsActiveTransaction(lk, *opCtx->getTxnNumber(), true);
+ opObserver->onTransactionPrepare(opCtx, prepareOplogSlot);
- // Ensure that the transaction is still prepared.
- invariant(_txnState.isPrepared(lk), str::stream() << "Current state: " << _txnState);
+ // After the oplog entry is written successfully, it is illegal to implicitly abort or fail.
+ try {
+ abortGuard.Dismiss();
- opCtx->getWriteUnitOfWork()->prepare();
+ lk.lock();
- abortGuard.Dismiss();
+ // Although we are not allowed to abort here, we check that we don't even try to. If we do
+ // try to, that is a bug and we will fassert below.
+ _checkIsActiveTransaction(lk, *opCtx->getTxnNumber(), true);
- // Return the prepareTimestamp from the recovery unit.
- return opCtx->recoveryUnit()->getPrepareTimestamp();
+ // Ensure that the transaction is still prepared.
+ invariant(_txnState.isPrepared(lk), str::stream() << "Current state: " << _txnState);
+ } catch (...) {
+ severe() << "Illegal exception after transaction was prepared.";
+ fassertFailedWithStatus(50906, exceptionToStatus());
+ }
+ return prepareTimestamp;
}
void Session::abortArbitraryTransaction() {
diff --git a/src/mongo/db/session.h b/src/mongo/db/session.h
index 6faa0f28b5f..21fa7b9deb5 100644
--- a/src/mongo/db/session.h
+++ b/src/mongo/db/session.h
@@ -36,6 +36,7 @@
#include "mongo/db/logical_session_id.h"
#include "mongo/db/multi_key_path_tracker.h"
#include "mongo/db/operation_context.h"
+#include "mongo/db/repl/oplog.h"
#include "mongo/db/repl/oplog_entry.h"
#include "mongo/db/repl/read_concern_args.h"
#include "mongo/db/session_txn_record_gen.h"
@@ -483,6 +484,36 @@ private:
const repl::OpTime& lastStmtIdWriteTs);
/**
+ * Reserves a slot in the oplog with an open storage-transaction while it is alive. Reserves the
+ * slot at construction. Aborts the storage-transaction and releases the oplog slot at
+ * destruction.
+ */
+ class OplogSlotReserver {
+ public:
+ OplogSlotReserver(OperationContext* opCtx);
+
+ ~OplogSlotReserver();
+
+ // Rule of 5: because we have a class-defined destructor, we need to explicitly specify
+ // the move constructor and move assignment operator.
+ OplogSlotReserver(OplogSlotReserver&&) = default;
+ OplogSlotReserver& operator=(OplogSlotReserver&&) = default;
+
+ /**
+ * Returns the oplog slot reserved at construction.
+ */
+ OplogSlot getReservedOplogSlot() const {
+ invariant(!_oplogSlot.opTime.isNull());
+ return _oplogSlot;
+ }
+
+ private:
+ std::unique_ptr<Locker> _locker;
+ std::unique_ptr<RecoveryUnit> _recoveryUnit;
+ OplogSlot _oplogSlot;
+ };
+
+ /**
* Indicates the state of the current multi-document transaction, if any. If the transaction is
* in any state but kInProgress, no more operations can be collected. Once the transaction is in
* kPrepared, the transaction is not allowed to abort outside of an 'abortTransaction' command.
diff --git a/src/mongo/db/session_test.cpp b/src/mongo/db/session_test.cpp
index d5618ac91af..588b10dbee6 100644
--- a/src/mongo/db/session_test.cpp
+++ b/src/mongo/db/session_test.cpp
@@ -54,7 +54,6 @@ namespace {
const NamespaceString kNss("TestDB", "TestColl");
const OptionalCollectionUUID kUUID;
-const Timestamp kPrepareTimestamp(Timestamp(1, 1));
/**
* Creates an OplogEntry with given parameters and preset defaults for this test suite.
@@ -87,7 +86,7 @@ repl::OplogEntry makeOplogEntry(repl::OpTime opTime,
class OpObserverMock : public OpObserverNoop {
public:
- void onTransactionPrepare(OperationContext* opCtx) override;
+ void onTransactionPrepare(OperationContext* opCtx, const OplogSlot& prepareOpTime) override;
bool onTransactionPrepareThrowsException = false;
bool transactionPrepared = false;
stdx::function<void()> onTransactionPrepareFn = [this]() { transactionPrepared = true; };
@@ -100,13 +99,9 @@ public:
};
};
-void OpObserverMock::onTransactionPrepare(OperationContext* opCtx) {
+void OpObserverMock::onTransactionPrepare(OperationContext* opCtx, const OplogSlot& prepareOpTime) {
ASSERT_TRUE(opCtx->lockState()->inAWriteUnitOfWork());
- OpObserverNoop::onTransactionPrepare(opCtx);
-
- // Get the recovery unit and set the prepareTimestamp.
- RecoveryUnit* recoveryUnit = opCtx->recoveryUnit();
- recoveryUnit->setPrepareTimestamp(kPrepareTimestamp);
+ OpObserverNoop::onTransactionPrepare(opCtx, prepareOpTime);
uassert(ErrorCodes::OperationFailed,
"onTransactionPrepare() failed",
@@ -1525,10 +1520,13 @@ TEST_F(SessionTest, KillSessionsDuringPrepareDoesNotAbortTransaction) {
session.unstashTransactionResources(opCtx(), "prepareTransaction");
+ auto ruPrepareTimestamp = Timestamp();
auto originalFn = _opObserver->onTransactionPrepareFn;
_opObserver->onTransactionPrepareFn = [&]() {
originalFn();
+ ruPrepareTimestamp = opCtx()->recoveryUnit()->getPrepareTimestamp();
+ ASSERT_FALSE(ruPrepareTimestamp.isNull());
// The transaction may be aborted without checking out the session.
session.abortArbitraryTransaction();
ASSERT_FALSE(session.transactionIsAborted());
@@ -1536,12 +1534,12 @@ TEST_F(SessionTest, KillSessionsDuringPrepareDoesNotAbortTransaction) {
// Check that prepareTimestamp gets set.
auto prepareTimestamp = session.prepareTransaction(opCtx());
- ASSERT_EQ(kPrepareTimestamp, prepareTimestamp);
+ ASSERT_EQ(ruPrepareTimestamp, prepareTimestamp);
ASSERT(_opObserver->transactionPrepared);
ASSERT_FALSE(session.transactionIsAborted());
}
-TEST_F(SessionTest, AbortDuringPrepareAbortsTransaction) {
+DEATH_TEST_F(SessionTest, AbortDuringPrepareIsFatal, "Fatal assertion 50906") {
const auto sessionId = makeLogicalSessionIdForTest();
Session session(sessionId);
session.refreshFromStorageIfNeeded(opCtx());
@@ -1562,10 +1560,7 @@ TEST_F(SessionTest, AbortDuringPrepareAbortsTransaction) {
ASSERT(session.transactionIsAborted());
};
- // A prepareTransaction() after an abort should uassert.
- ASSERT_THROWS_CODE(
- session.prepareTransaction(opCtx()), AssertionException, ErrorCodes::NoSuchTransaction);
- ASSERT(session.transactionIsAborted());
+ session.prepareTransaction(opCtx());
}
TEST_F(SessionTest, ThrowDuringOnTransactionPrepareAbortsTransaction) {
@@ -1834,9 +1829,18 @@ TEST_F(SessionTest, KillSessionsDoesNotAbortPreparedTransactions) {
session.unstashTransactionResources(opCtx(), "insert");
- // Check that prepareTimestamp is set.
+ auto ruPrepareTimestamp = Timestamp();
+ auto originalFn = _opObserver->onTransactionPrepareFn;
+ _opObserver->onTransactionPrepareFn = [&]() {
+ originalFn();
+
+ ruPrepareTimestamp = opCtx()->recoveryUnit()->getPrepareTimestamp();
+ ASSERT_FALSE(ruPrepareTimestamp.isNull());
+ };
+
+ // Check that prepareTimestamp gets set.
auto prepareTimestamp = session.prepareTransaction(opCtx());
- ASSERT_EQ(kPrepareTimestamp, prepareTimestamp);
+ ASSERT_EQ(ruPrepareTimestamp, prepareTimestamp);
session.stashTransactionResources(opCtx());
session.abortArbitraryTransaction();
@@ -1856,9 +1860,19 @@ TEST_F(SessionTest, CannotStartNewTransactionWhilePreparedTransactionInProgress)
session.unstashTransactionResources(opCtx(), "insert");
+ auto ruPrepareTimestamp = Timestamp();
+ auto originalFn = _opObserver->onTransactionPrepareFn;
+ _opObserver->onTransactionPrepareFn = [&]() {
+ originalFn();
+
+ ruPrepareTimestamp = opCtx()->recoveryUnit()->getPrepareTimestamp();
+ ASSERT_FALSE(ruPrepareTimestamp.isNull());
+ };
+
// Check that prepareTimestamp gets set.
auto prepareTimestamp = session.prepareTransaction(opCtx());
- ASSERT_EQ(kPrepareTimestamp, prepareTimestamp);
+ ASSERT_EQ(ruPrepareTimestamp, prepareTimestamp);
+
session.stashTransactionResources(opCtx());
// Try to start a new transaction while there is already a prepared transaction on the
diff --git a/src/mongo/db/storage/write_unit_of_work.cpp b/src/mongo/db/storage/write_unit_of_work.cpp
index 0a37830e0ad..e7ae368ada9 100644
--- a/src/mongo/db/storage/write_unit_of_work.cpp
+++ b/src/mongo/db/storage/write_unit_of_work.cpp
@@ -100,4 +100,16 @@ void WriteUnitOfWork::commit() {
_committed = true;
}
+std::ostream& operator<<(std::ostream& os, WriteUnitOfWork::RecoveryUnitState state) {
+ switch (state) {
+ case WriteUnitOfWork::kNotInUnitOfWork:
+ return os << "NotInUnitOfWork";
+ case WriteUnitOfWork::kActiveUnitOfWork:
+ return os << "ActiveUnitOfWork";
+ case WriteUnitOfWork::kFailedUnitOfWork:
+ return os << "FailedUnitOfWork";
+ }
+ MONGO_UNREACHABLE;
+}
+
} // namespace mongo
diff --git a/src/mongo/db/storage/write_unit_of_work.h b/src/mongo/db/storage/write_unit_of_work.h
index 410dda73bfd..d5d7f08ecf9 100644
--- a/src/mongo/db/storage/write_unit_of_work.h
+++ b/src/mongo/db/storage/write_unit_of_work.h
@@ -107,4 +107,6 @@ private:
bool _released = false;
};
+std::ostream& operator<<(std::ostream& os, WriteUnitOfWork::RecoveryUnitState state);
+
} // namespace mongo