summaryrefslogtreecommitdiff
path: root/src/mongo/db
diff options
context:
space:
mode:
authorJudah Schvimer <judah@mongodb.com>2018-06-07 11:54:35 -0400
committerJudah Schvimer <judah@mongodb.com>2018-06-07 11:54:35 -0400
commitda63637defad5975040f8eac0e98c86c8d8e2533 (patch)
treee4ebf901a2fef4b8367e1adec35cd9113d5d9640 /src/mongo/db
parent6a66e646c41071c5bf0e28d885a758e05f353536 (diff)
downloadmongo-da63637defad5975040f8eac0e98c86c8d8e2533.tar.gz
SERVER-34824 Make prepareTransaction command write a prepare oplog entry and use its optime as the prepare timestamp
Diffstat (limited to 'src/mongo/db')
-rw-r--r--src/mongo/db/commands/dbcheck.cpp3
-rw-r--r--src/mongo/db/commands/txn_cmds.cpp3
-rw-r--r--src/mongo/db/op_observer.h5
-rw-r--r--src/mongo/db/op_observer_impl.cpp122
-rw-r--r--src/mongo/db/op_observer_impl_test.cpp116
-rw-r--r--src/mongo/db/ops/write_ops_exec.cpp20
-rw-r--r--src/mongo/db/repl/oplog.cpp19
-rw-r--r--src/mongo/db/repl/oplog.h4
-rw-r--r--src/mongo/db/repl/oplog_entry.cpp5
-rw-r--r--src/mongo/db/repl/oplog_entry.h5
-rw-r--r--src/mongo/db/repl/oplog_entry.idl4
-rw-r--r--src/mongo/db/repl/oplog_test.cpp17
-rw-r--r--src/mongo/db/repl/sync_tail.cpp4
-rw-r--r--src/mongo/db/s/session_catalog_migration_destination.cpp3
-rw-r--r--src/mongo/db/session.cpp17
-rw-r--r--src/mongo/db/session.h22
-rw-r--r--src/mongo/db/session_test.cpp9
-rw-r--r--src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_recovery_unit.h2
18 files changed, 311 insertions, 69 deletions
diff --git a/src/mongo/db/commands/dbcheck.cpp b/src/mongo/db/commands/dbcheck.cpp
index 7ab68f7077f..3380a7c3198 100644
--- a/src/mongo/db/commands/dbcheck.cpp
+++ b/src/mongo/db/commands/dbcheck.cpp
@@ -477,7 +477,8 @@ private:
wallClockTime,
{},
kUninitializedStmtId,
- {});
+ {},
+ false /* prepare */);
uow.commit();
return result;
});
diff --git a/src/mongo/db/commands/txn_cmds.cpp b/src/mongo/db/commands/txn_cmds.cpp
index 650c1012c40..db96037f735 100644
--- a/src/mongo/db/commands/txn_cmds.cpp
+++ b/src/mongo/db/commands/txn_cmds.cpp
@@ -101,7 +101,6 @@ public:
MONGO_FAIL_POINT_DEFINE(pauseAfterTransactionPrepare);
-// TODO: This is a stub for testing storage prepare functionality.
class CmdPrepareTxn : public BasicCommand {
public:
CmdPrepareTxn() : BasicCommand("prepareTransaction") {}
@@ -119,7 +118,7 @@ public:
}
std::string help() const override {
- return "Preprares a transaction. THIS IS A STUB FOR TESTING.";
+ return "Prepares a transaction. This is only expected to be called by mongos.";
}
Status checkAuthForOperation(OperationContext* opCtx,
diff --git a/src/mongo/db/op_observer.h b/src/mongo/db/op_observer.h
index 9a543aa0aa4..38027587930 100644
--- a/src/mongo/db/op_observer.h
+++ b/src/mongo/db/op_observer.h
@@ -264,10 +264,7 @@ public:
/**
* The onTransactionPrepare method is called when an atomic transaction is prepared. It must be
- * called when a transaction is active. It generates an OpTime and sets the prepare timestamp on
- * the recovery unit.
- * TODO: This is an incomplete implementation and should only be used for testing. It does not
- * write the prepare oplog entry, only generates an OpTime.
+ * called when a transaction is active.
*/
virtual void onTransactionPrepare(OperationContext* opCtx) = 0;
diff --git a/src/mongo/db/op_observer_impl.cpp b/src/mongo/db/op_observer_impl.cpp
index aa85d823222..90b4cfdadbc 100644
--- a/src/mongo/db/op_observer_impl.cpp
+++ b/src/mongo/db/op_observer_impl.cpp
@@ -45,6 +45,7 @@
#include "mongo/db/namespace_string.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/repl/oplog.h"
+#include "mongo/db/repl/oplog_entry_gen.h"
#include "mongo/db/repl/replication_coordinator.h"
#include "mongo/db/s/collection_sharding_state.h"
#include "mongo/db/s/shard_server_op_observer.h"
@@ -73,7 +74,8 @@ repl::OpTime logOperation(OperationContext* opCtx,
Date_t wallClockTime,
const OperationSessionInfo& sessionInfo,
StmtId stmtId,
- const repl::OplogLink& oplogLink) {
+ const repl::OplogLink& oplogLink,
+ bool prepare) {
auto& times = OpObserver::Times::get(opCtx).reservedOpTimes;
auto opTime = repl::logOp(opCtx,
opstr,
@@ -85,7 +87,9 @@ repl::OpTime logOperation(OperationContext* opCtx,
wallClockTime,
sessionInfo,
stmtId,
- oplogLink);
+ oplogLink,
+ prepare);
+
times.push_back(opTime);
return opTime;
}
@@ -193,7 +197,8 @@ OpTimeBundle replLogUpdate(OperationContext* opCtx,
opTimes.wallClockTime,
sessionInfo,
args.stmtId,
- {});
+ {},
+ false /* prepare */);
opTimes.prePostImageOpTime = noteUpdateOpTime;
@@ -214,7 +219,8 @@ OpTimeBundle replLogUpdate(OperationContext* opCtx,
opTimes.wallClockTime,
sessionInfo,
args.stmtId,
- oplogLink);
+ oplogLink,
+ false /* prepare */);
return opTimes;
}
@@ -252,7 +258,8 @@ OpTimeBundle replLogDelete(OperationContext* opCtx,
opTimes.wallClockTime,
sessionInfo,
stmtId,
- {});
+ {},
+ false /* prepare */);
opTimes.prePostImageOpTime = noteOplog;
oplogLink.preImageOpTime = noteOplog;
}
@@ -268,7 +275,8 @@ OpTimeBundle replLogDelete(OperationContext* opCtx,
opTimes.wallClockTime,
sessionInfo,
stmtId,
- oplogLink);
+ oplogLink,
+ false /* prepare */);
return opTimes;
}
@@ -280,7 +288,8 @@ OpTimeBundle replLogApplyOps(OperationContext* opCtx,
const BSONObj& applyOpCmd,
const OperationSessionInfo& sessionInfo,
StmtId stmtId,
- const repl::OplogLink& oplogLink) {
+ const repl::OplogLink& oplogLink,
+ bool prepare) {
OpTimeBundle times;
times.wallClockTime = getWallClockTimeForOpLog(opCtx);
times.writeOpTime = logOperation(opCtx,
@@ -293,7 +302,8 @@ OpTimeBundle replLogApplyOps(OperationContext* opCtx,
times.wallClockTime,
sessionInfo,
stmtId,
- oplogLink);
+ oplogLink,
+ prepare);
return times;
}
@@ -325,7 +335,8 @@ void OpObserverImpl::onCreateIndex(OperationContext* opCtx,
getWallClockTimeForOpLog(opCtx),
{},
kUninitializedStmtId,
- {});
+ {},
+ false /* prepare */);
} else {
logOperation(opCtx,
"i",
@@ -337,7 +348,8 @@ void OpObserverImpl::onCreateIndex(OperationContext* opCtx,
getWallClockTimeForOpLog(opCtx),
{},
kUninitializedStmtId,
- {});
+ {},
+ false /* prepare */);
}
AuthorizationManager::get(opCtx->getServiceContext())
@@ -561,7 +573,8 @@ void OpObserverImpl::onInternalOpMessage(OperationContext* opCtx,
getWallClockTimeForOpLog(opCtx),
{},
kUninitializedStmtId,
- {});
+ {},
+ false /* prepare */);
}
void OpObserverImpl::onCreateCollection(OperationContext* opCtx,
@@ -604,7 +617,8 @@ void OpObserverImpl::onCreateCollection(OperationContext* opCtx,
getWallClockTimeForOpLog(opCtx),
{},
kUninitializedStmtId,
- {});
+ {},
+ false /* prepare */);
}
AuthorizationManager::get(opCtx->getServiceContext())
@@ -650,7 +664,8 @@ void OpObserverImpl::onCollMod(OperationContext* opCtx,
getWallClockTimeForOpLog(opCtx),
{},
kUninitializedStmtId,
- {});
+ {},
+ false /* prepare */);
}
AuthorizationManager::get(opCtx->getServiceContext())
@@ -687,7 +702,8 @@ void OpObserverImpl::onDropDatabase(OperationContext* opCtx, const std::string&
getWallClockTimeForOpLog(opCtx),
{},
kUninitializedStmtId,
- {});
+ {},
+ false /* prepare */);
uassert(
50714, "dropping the admin database is not allowed.", dbName != NamespaceString::kAdminDb);
@@ -720,7 +736,8 @@ repl::OpTime OpObserverImpl::onDropCollection(OperationContext* opCtx,
getWallClockTimeForOpLog(opCtx),
{},
kUninitializedStmtId,
- {});
+ {},
+ false /* prepare */);
}
uassert(50715,
@@ -760,7 +777,8 @@ void OpObserverImpl::onDropIndex(OperationContext* opCtx,
getWallClockTimeForOpLog(opCtx),
{},
kUninitializedStmtId,
- {});
+ {},
+ false /* prepare */);
AuthorizationManager::get(opCtx->getServiceContext())
->logOp(opCtx, "c", cmdNss, cmdObj, &indexInfo);
@@ -795,7 +813,8 @@ repl::OpTime OpObserverImpl::preRenameCollection(OperationContext* const opCtx,
getWallClockTimeForOpLog(opCtx),
{},
kUninitializedStmtId,
- {});
+ {},
+ false /* prepare */);
return {};
}
@@ -848,7 +867,10 @@ void OpObserverImpl::onApplyOps(OperationContext* opCtx,
const std::string& dbName,
const BSONObj& applyOpCmd) {
const NamespaceString cmdNss{dbName, "$cmd"};
- replLogApplyOps(opCtx, cmdNss, applyOpCmd, {}, kUninitializedStmtId, {});
+
+ // Only transactional 'applyOps' commands can be prepared.
+ constexpr bool prepare = false;
+ replLogApplyOps(opCtx, cmdNss, applyOpCmd, {}, kUninitializedStmtId, {}, prepare);
AuthorizationManager::get(opCtx->getServiceContext())
->logOp(opCtx, "c", cmdNss, applyOpCmd, nullptr);
@@ -872,30 +894,27 @@ void OpObserverImpl::onEmptyCapped(OperationContext* opCtx,
getWallClockTimeForOpLog(opCtx),
{},
kUninitializedStmtId,
- {});
+ {},
+ false /* prepare */);
}
AuthorizationManager::get(opCtx->getServiceContext())
->logOp(opCtx, "c", cmdNss, cmdObj, nullptr);
}
-void OpObserverImpl::onTransactionCommit(OperationContext* opCtx) {
- invariant(opCtx->getTxnNumber());
- Session* const session = OperationContextSession::get(opCtx);
- invariant(session);
- auto stmts = session->endTransactionAndRetrieveOperations(opCtx);
-
- // It is possible that the transaction resulted in no changes. In that case, we should
- // not write an empty applyOps entry.
- if (stmts.empty())
- return;
+namespace {
+OpTimeBundle logApplyOpsForTransaction(OperationContext* opCtx,
+ Session* const session,
+ std::vector<repl::ReplOperation> stmts,
+ bool prepare) {
BSONObjBuilder applyOpsBuilder;
BSONArrayBuilder opsArray(applyOpsBuilder.subarrayStart("applyOps"_sd));
for (auto& stmt : stmts) {
opsArray.append(stmt.toBSON());
}
opsArray.done();
+
const NamespaceString cmdNss{"admin", "$cmd"};
OperationSessionInfo sessionInfo;
@@ -909,10 +928,12 @@ void OpObserverImpl::onTransactionCommit(OperationContext* opCtx) {
try {
auto applyOpCmd = applyOpsBuilder.done();
- auto times = replLogApplyOps(opCtx, cmdNss, applyOpCmd, sessionInfo, stmtId, oplogLink);
+ auto times =
+ replLogApplyOps(opCtx, cmdNss, applyOpCmd, sessionInfo, stmtId, oplogLink, prepare);
onWriteOpCompleted(
opCtx, cmdNss, session, {stmtId}, times.writeOpTime, times.wallClockTime);
+ return times;
} catch (const AssertionException& e) {
// Change the error code to TransactionTooLarge if it is BSONObjectTooLarge.
uassert(ErrorCodes::TransactionTooLarge,
@@ -920,15 +941,52 @@ void OpObserverImpl::onTransactionCommit(OperationContext* opCtx) {
e.code() != ErrorCodes::BSONObjectTooLarge);
throw;
}
+ MONGO_UNREACHABLE;
+}
+
+} // namespace
+
+void OpObserverImpl::onTransactionCommit(OperationContext* opCtx) {
+ invariant(opCtx->getTxnNumber());
+ Session* const session = OperationContextSession::get(opCtx);
+ invariant(session);
+ invariant(session->inMultiDocumentTransaction());
+ auto stmts = session->endTransactionAndRetrieveOperations(opCtx);
+
+ // It is possible that the transaction resulted in no changes. In that case, we should
+ // not write an empty applyOps entry.
+ if (stmts.empty())
+ return;
+
+ const auto commitOpTime =
+ logApplyOpsForTransaction(opCtx, session, stmts, false /* prepare */).writeOpTime;
+ invariant(!commitOpTime.isNull());
}
void OpObserverImpl::onTransactionPrepare(OperationContext* opCtx) {
invariant(opCtx->getTxnNumber());
Session* const session = OperationContextSession::get(opCtx);
+ invariant(session);
invariant(session->inMultiDocumentTransaction());
+ auto stmts = session->endTransactionAndRetrieveOperations(opCtx);
+
+ // It is possible that the transaction resulted in no changes. In that case, we should
+ // not write an empty applyOps entry.
+ if (stmts.empty())
+ return;
- auto opTime = repl::getNextOpTimeNoPersistForTesting(opCtx).opTime;
- opCtx->recoveryUnit()->setPrepareTimestamp(opTime.getTimestamp());
+ // We write the oplog entry in a side transaction so that we do not commit the now-prepared
+ // transaction. We then return to the main transaction and set its 'prepareTimestamp'.
+ repl::OpTime prepareOpTime;
+ {
+ Session::SideTransactionBlock sideTxn(opCtx);
+ WriteUnitOfWork wuow(opCtx);
+ prepareOpTime =
+ logApplyOpsForTransaction(opCtx, session, stmts, true /* prepare */).writeOpTime;
+ wuow.commit();
+ }
+ invariant(!prepareOpTime.isNull());
+ opCtx->recoveryUnit()->setPrepareTimestamp(prepareOpTime.getTimestamp());
}
void OpObserverImpl::onTransactionAbort(OperationContext* opCtx) {
diff --git a/src/mongo/db/op_observer_impl_test.cpp b/src/mongo/db/op_observer_impl_test.cpp
index f0a4c058ca7..4947927c380 100644
--- a/src/mongo/db/op_observer_impl_test.cpp
+++ b/src/mongo/db/op_observer_impl_test.cpp
@@ -541,6 +541,116 @@ private:
boost::optional<ScopedSession> _session;
};
+TEST_F(OpObserverTransactionTest, TransactionalPrepareTest) {
+ const NamespaceString nss1("testDB", "testColl");
+ const NamespaceString nss2("testDB2", "testColl2");
+ auto uuid1 = CollectionUUID::gen();
+ auto uuid2 = CollectionUUID::gen();
+ const TxnNumber txnNum = 2;
+ opCtx()->setTxnNumber(txnNum);
+ OperationContextSession opSession(opCtx(),
+ true /* checkOutSession */,
+ false /* autocommit */,
+ true /* startTransaction*/,
+ "testDB",
+ "insert");
+
+ session()->unstashTransactionResources(opCtx(), "insert");
+ WriteUnitOfWork wuow(opCtx());
+ AutoGetCollection autoColl1(opCtx(), nss1, MODE_IX);
+ AutoGetCollection autoColl2(opCtx(), nss2, MODE_IX);
+
+ std::vector<InsertStatement> inserts1;
+ inserts1.emplace_back(0,
+ BSON("_id" << 0 << "data"
+ << "x"));
+ inserts1.emplace_back(1,
+ BSON("_id" << 1 << "data"
+ << "y"));
+ opObserver().onInserts(opCtx(), nss1, uuid1, inserts1.begin(), inserts1.end(), false);
+
+ OplogUpdateEntryArgs update2;
+ update2.nss = nss2;
+ update2.uuid = uuid2;
+ update2.stmtId = 1;
+ update2.updatedDoc = BSON("_id" << 0 << "data"
+ << "y");
+ update2.update = BSON("$set" << BSON("data"
+ << "y"));
+ update2.criteria = BSON("_id" << 0);
+ opObserver().onUpdate(opCtx(), update2);
+
+ opObserver().aboutToDelete(opCtx(),
+ nss1,
+ BSON("_id" << 0 << "data"
+ << "x"));
+ opObserver().onDelete(opCtx(), nss1, uuid1, 0, false, boost::none);
+
+ opObserver().onTransactionPrepare(opCtx());
+
+ auto oplogEntry = getSingleOplogEntry(opCtx());
+ checkCommonFields(oplogEntry);
+ auto o = oplogEntry.getObjectField("o");
+ auto oExpected = BSON("applyOps" << BSON_ARRAY(BSON("op"
+ << "i"
+ << "ns"
+ << nss1.toString()
+ << "ui"
+ << uuid1
+ << "o"
+ << BSON("_id" << 0 << "data"
+ << "x"))
+ << BSON("op"
+ << "i"
+ << "ns"
+ << nss1.toString()
+ << "ui"
+ << uuid1
+ << "o"
+ << BSON("_id" << 1 << "data"
+ << "y"))
+ << BSON("op"
+ << "u"
+ << "ns"
+ << nss2.toString()
+ << "ui"
+ << uuid2
+ << "o"
+ << BSON("$set" << BSON("data"
+ << "y"))
+ << "o2"
+ << BSON("_id" << 0))
+ << BSON("op"
+ << "d"
+ << "ns"
+ << nss1.toString()
+ << "ui"
+ << uuid1
+ << "o"
+ << BSON("_id" << 0))));
+ ASSERT_BSONOBJ_EQ(oExpected, o);
+ ASSERT(oplogEntry.getBoolField("prepare"));
+}
+
+TEST_F(OpObserverTransactionTest, PreparingEmptyTransactionLogsNothing) {
+ const TxnNumber txnNum = 2;
+ opCtx()->setTxnNumber(txnNum);
+ OperationContextSession opSession(opCtx(),
+ true /* checkOutSession */,
+ false /* autocommit */,
+ true /* startTransaction*/,
+ "admin",
+ "prepareTransaction");
+
+ session()->unstashTransactionResources(opCtx(), "prepareTransaction");
+ opObserver().onTransactionPrepare(opCtx());
+
+ AutoGetCollection autoColl1(opCtx(), NamespaceString::kRsOplogNamespace, MODE_IX);
+ repl::OplogInterfaceLocal oplogInterface(opCtx(), NamespaceString::kRsOplogNamespace.ns());
+ auto oplogIter = oplogInterface.makeIterator();
+ ASSERT_EQUALS(ErrorCodes::CollectionIsEmpty, oplogIter->next().getStatus());
+}
+
TEST_F(OpObserverTransactionTest, TransactionalInsertTest) {
const NamespaceString nss1("testDB", "testColl");
const NamespaceString nss2("testDB2", "testColl2");
@@ -617,6 +727,8 @@ TEST_F(OpObserverTransactionTest, TransactionalInsertTest) {
<< BSON("_id" << 3 << "data"
<< "w"))));
ASSERT_BSONOBJ_EQ(oExpected, o);
+ ASSERT_FALSE(oplogEntry.hasField("prepare"));
+ ASSERT_FALSE(oplogEntry.getBoolField("prepare"));
}
TEST_F(OpObserverTransactionTest, TransactionalUpdateTest) {
@@ -687,6 +799,8 @@ TEST_F(OpObserverTransactionTest, TransactionalUpdateTest) {
<< "o2"
<< BSON("_id" << 1))));
ASSERT_BSONOBJ_EQ(oExpected, o);
+ ASSERT_FALSE(oplogEntry.hasField("prepare"));
+ ASSERT_FALSE(oplogEntry.getBoolField("prepare"));
}
TEST_F(OpObserverTransactionTest, TransactionalDeleteTest) {
@@ -739,6 +853,8 @@ TEST_F(OpObserverTransactionTest, TransactionalDeleteTest) {
<< "o"
<< BSON("_id" << 1))));
ASSERT_BSONOBJ_EQ(oExpected, o);
+ ASSERT_FALSE(oplogEntry.hasField("prepare"));
+ ASSERT_FALSE(oplogEntry.getBoolField("prepare"));
}
DEATH_TEST_F(OpObserverTest, AboutToDeleteMustPreceedOnDelete, "invariant") {
diff --git a/src/mongo/db/ops/write_ops_exec.cpp b/src/mongo/db/ops/write_ops_exec.cpp
index 4450ba31870..8e9754a058a 100644
--- a/src/mongo/db/ops/write_ops_exec.cpp
+++ b/src/mongo/db/ops/write_ops_exec.cpp
@@ -120,24 +120,10 @@ void finishCurOp(OperationContext* opCtx, CurOp* curOp) {
const bool shouldSample =
curOp->completeAndLogOperation(opCtx, MONGO_LOG_DEFAULT_COMPONENT);
- auto session = OperationContextSession::get(opCtx);
if (curOp->shouldDBProfile(shouldSample)) {
- boost::optional<Session::TxnResources> txnResources;
- if (session && session->inSnapshotReadOrMultiDocumentTransaction()) {
- // Stash the current transaction so that writes to the profile collection are not
- // done as part of the transaction. This must be done under the client lock, since
- // we are modifying 'opCtx'.
- stdx::lock_guard<Client> clientLock(*opCtx->getClient());
- txnResources = Session::TxnResources(opCtx);
- }
- ON_BLOCK_EXIT([&] {
- if (txnResources) {
- // Restore the transaction state onto 'opCtx'. This must be done under the
- // client lock, since we are modifying 'opCtx'.
- stdx::lock_guard<Client> clientLock(*opCtx->getClient());
- txnResources->release(opCtx);
- }
- });
+ // Stash the current transaction so that writes to the profile collection are not
+ // done as part of the transaction.
+ Session::SideTransactionBlock sideTxn(opCtx);
profile(opCtx, CurOp::get(opCtx)->getNetworkOp());
}
} catch (const DBException& ex) {
diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp
index ead136c18a7..a0d13182694 100644
--- a/src/mongo/db/repl/oplog.cpp
+++ b/src/mongo/db/repl/oplog.cpp
@@ -330,7 +330,8 @@ OplogDocWriter _logOpWriter(OperationContext* opCtx,
Date_t wallTime,
const OperationSessionInfo& sessionInfo,
StmtId statementId,
- const OplogLink& oplogLink) {
+ const OplogLink& oplogLink,
+ bool prepare) {
BSONObjBuilder b(256);
b.append("ts", optime.getTimestamp());
@@ -353,6 +354,11 @@ OplogDocWriter _logOpWriter(OperationContext* opCtx,
b.appendDate("wall", wallTime);
appendSessionInfo(opCtx, &b, statementId, sessionInfo, oplogLink);
+
+ if (prepare) {
+ b.appendBool(OplogEntryBase::kPrepareFieldName, true);
+ }
+
return OplogDocWriter(OplogDocWriter(b.obj(), obj));
}
} // end anon namespace
@@ -427,7 +433,8 @@ OpTime logOp(OperationContext* opCtx,
Date_t wallClockTime,
const OperationSessionInfo& sessionInfo,
StmtId statementId,
- const OplogLink& oplogLink) {
+ const OplogLink& oplogLink,
+ bool prepare) {
auto replCoord = ReplicationCoordinator::get(opCtx);
// For commands, the test below is on the command ns and therefore does not check for
// specific namespaces such as system.profile. This is the caller's responsibility.
@@ -460,7 +467,8 @@ OpTime logOp(OperationContext* opCtx,
wallClockTime,
sessionInfo,
statementId,
- oplogLink);
+ oplogLink,
+ prepare);
const DocWriter* basePtr = &writer;
auto timestamp = slot.opTime.getTimestamp();
_logOpsInner(opCtx, nss, &basePtr, &timestamp, 1, oplog, slot.opTime);
@@ -515,6 +523,8 @@ std::vector<OpTime> logInsertOps(OperationContext* opCtx,
if (insertStatementOplogSlot.opTime.isNull()) {
_getNextOpTimes(opCtx, oplog, 1, &insertStatementOplogSlot);
}
+ // Only 'applyOps' oplog entries can be prepared.
+ constexpr bool prepare = false;
writers.emplace_back(_logOpWriter(opCtx,
"i",
nss,
@@ -527,7 +537,8 @@ std::vector<OpTime> logInsertOps(OperationContext* opCtx,
wallClockTime,
sessionInfo,
begin[i].stmtId,
- oplogLink));
+ oplogLink,
+ prepare));
oplogLink.prevOpTime = insertStatementOplogSlot.opTime;
timestamps[i] = oplogLink.prevOpTime.getTimestamp();
opTimes.push_back(insertStatementOplogSlot.opTime);
diff --git a/src/mongo/db/repl/oplog.h b/src/mongo/db/repl/oplog.h
index 7c569e7d3aa..b4d34ca0ef6 100644
--- a/src/mongo/db/repl/oplog.h
+++ b/src/mongo/db/repl/oplog.h
@@ -126,6 +126,7 @@ std::vector<OpTime> logInsertOps(OperationContext* opCtx,
* oplogLink this contains the timestamp that points to the previous write that will be
* linked via prevTs, and the timestamps of the oplog entry that contains the document
* before/after update was applied. The timestamps are ignored if isNull() is true.
+ * prepare this specifies if the oplog entry should be put into a 'prepare' state.
*
* Returns the optime of the oplog entry written to the oplog.
* Returns a null optime if oplog was not modified.
@@ -140,7 +141,8 @@ OpTime logOp(OperationContext* opCtx,
Date_t wallClockTime,
const OperationSessionInfo& sessionInfo,
StmtId stmtId,
- const OplogLink& oplogLink);
+ const OplogLink& oplogLink,
+ bool prepare);
// Flush out the cached pointer to the oplog.
// Used by the closeDatabase command to ensure we don't cache closed things.
diff --git a/src/mongo/db/repl/oplog_entry.cpp b/src/mongo/db/repl/oplog_entry.cpp
index 7748a68f18e..731bde4e43b 100644
--- a/src/mongo/db/repl/oplog_entry.cpp
+++ b/src/mongo/db/repl/oplog_entry.cpp
@@ -257,6 +257,11 @@ bool OplogEntry::isCrudOpType() const {
return isCrudOpType(getOpType());
}
+bool OplogEntry::shouldPrepare() const {
+ invariant(getCommandType() == OplogEntry::CommandType::kApplyOps);
+ return getPrepare() && *getPrepare();
+}
+
BSONElement OplogEntry::getIdElement() const {
invariant(isCrudOpType());
if (getOpType() == OpTypeEnum::kUpdate) {
diff --git a/src/mongo/db/repl/oplog_entry.h b/src/mongo/db/repl/oplog_entry.h
index 270ddd01ae3..0c632dc3500 100644
--- a/src/mongo/db/repl/oplog_entry.h
+++ b/src/mongo/db/repl/oplog_entry.h
@@ -115,6 +115,11 @@ public:
bool isCrudOpType() const;
/**
+ * Returns if the operation should be prepared. Must be called on an 'applyOps' entry.
+ */
+ bool shouldPrepare() const;
+
+ /**
* Returns the _id of the document being modified. Must be called on CRUD ops.
*/
BSONElement getIdElement() const;
diff --git a/src/mongo/db/repl/oplog_entry.idl b/src/mongo/db/repl/oplog_entry.idl
index fa31e20d790..9d0af006a97 100644
--- a/src/mongo/db/repl/oplog_entry.idl
+++ b/src/mongo/db/repl/oplog_entry.idl
@@ -139,3 +139,7 @@ structs:
optional: true
description: "The optime of another oplog entry that contains the document
after an update was applied."
+ prepare:
+ type: bool
+ optional: true
+ description: "Specifies that this operation should be put into a 'prepare' state"
diff --git a/src/mongo/db/repl/oplog_test.cpp b/src/mongo/db/repl/oplog_test.cpp
index 9a38b77d489..93738bb47ed 100644
--- a/src/mongo/db/repl/oplog_test.cpp
+++ b/src/mongo/db/repl/oplog_test.cpp
@@ -110,7 +110,8 @@ TEST_F(OplogTest, LogOpReturnsOpTimeOnSuccessfulInsertIntoOplogCollection) {
Date_t::now(),
{},
kUninitializedStmtId,
- {});
+ {},
+ false /* prepare */);
ASSERT_FALSE(opTime.isNull());
wunit.commit();
}
@@ -222,8 +223,18 @@ OpTime _logOpNoopWithMsg(OperationContext* opCtx,
// logOp() must be called while holding lock because ephemeralForTest storage engine does not
// support concurrent updates to its internal state.
const auto msgObj = BSON("msg" << nss.ns());
- auto opTime = logOp(
- opCtx, "n", nss, {}, msgObj, nullptr, false, Date_t::now(), {}, kUninitializedStmtId, {});
+ auto opTime = logOp(opCtx,
+ "n",
+ nss,
+ {},
+ msgObj,
+ nullptr,
+ false,
+ Date_t::now(),
+ {},
+ kUninitializedStmtId,
+ {},
+ false /* prepare */);
ASSERT_FALSE(opTime.isNull());
ASSERT(opTimeNssMap->find(opTime) == opTimeNssMap->end())
diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp
index 11e277188b2..b7370e04b02 100644
--- a/src/mongo/db/repl/sync_tail.cpp
+++ b/src/mongo/db/repl/sync_tail.cpp
@@ -572,6 +572,10 @@ void fillWriterVectors(OperationContext* opCtx,
// Extract applyOps operations and fill writers with extracted operations using this
// function.
if (op.isCommand() && op.getCommandType() == OplogEntry::CommandType::kApplyOps) {
+ if (op.shouldPrepare()) {
+ // TODO (SERVER-35307) mark operations as needing prepare.
+ continue;
+ }
try {
derivedOps->emplace_back(ApplyOps::extractOperations(op));
fillWriterVectors(
diff --git a/src/mongo/db/s/session_catalog_migration_destination.cpp b/src/mongo/db/s/session_catalog_migration_destination.cpp
index ac037085450..0c318ba7605 100644
--- a/src/mongo/db/s/session_catalog_migration_destination.cpp
+++ b/src/mongo/db/s/session_catalog_migration_destination.cpp
@@ -281,7 +281,8 @@ ProcessOplogResult processSessionOplog(OperationContext* opCtx,
*oplogEntry.getWallClockTime(),
sessionInfo,
stmtId,
- oplogLink);
+ oplogLink,
+ false /* prepare */);
auto oplogOpTime = result.oplogTime;
uassert(40633,
diff --git a/src/mongo/db/session.cpp b/src/mongo/db/session.cpp
index d0ab4bbcc8b..95457f11663 100644
--- a/src/mongo/db/session.cpp
+++ b/src/mongo/db/session.cpp
@@ -722,6 +722,23 @@ void Session::TxnResources::release(OperationContext* opCtx) {
readConcernArgs = _readConcernArgs;
}
+Session::SideTransactionBlock::SideTransactionBlock(OperationContext* opCtx) : _opCtx(opCtx) {
+ if (_opCtx->getWriteUnitOfWork()) {
+ // This must be done under the client lock, since we are modifying '_opCtx'.
+ stdx::lock_guard<Client> clientLock(*_opCtx->getClient());
+ _txnResources = Session::TxnResources(_opCtx);
+ }
+}
+
+Session::SideTransactionBlock::~SideTransactionBlock() {
+ if (_txnResources) {
+ // Restore the transaction state onto '_opCtx'. This must be done under the
+ // client lock, since we are modifying '_opCtx'.
+ stdx::lock_guard<Client> clientLock(*_opCtx->getClient());
+ _txnResources->release(_opCtx);
+ }
+}
+
void Session::stashTransactionResources(OperationContext* opCtx) {
if (opCtx->getClient()->isInDirectClient()) {
return;
diff --git a/src/mongo/db/session.h b/src/mongo/db/session.h
index 793d5dec814..c2d29abd55e 100644
--- a/src/mongo/db/session.h
+++ b/src/mongo/db/session.h
@@ -107,6 +107,25 @@ public:
WriteUnitOfWork::RecoveryUnitState _ruState;
};
+ /**
+ * An RAII object that stashes `TxnResouces` from the `opCtx` onto the stack. At destruction
+ * it unstashes the `TxnResources` back onto the `opCtx`.
+ */
+ class SideTransactionBlock {
+ public:
+ SideTransactionBlock(OperationContext* opCtx);
+ ~SideTransactionBlock();
+
+ // Rule of 5: because we have a class-defined destructor, we need to explictly specify
+ // the move operator and move assignment operator.
+ SideTransactionBlock(SideTransactionBlock&&) = default;
+ SideTransactionBlock& operator=(SideTransactionBlock&&) = default;
+
+ private:
+ boost::optional<Session::TxnResources> _txnResources;
+ OperationContext* _opCtx;
+ };
+
using CommittedStatementTimestampMap = stdx::unordered_map<StmtId, repl::OpTime>;
using CursorKillFunction =
std::function<size_t(OperationContext*, LogicalSessionId, TxnNumber)>;
@@ -305,7 +324,8 @@ public:
/**
* Returns whether we are in a multi-document transaction, which means we have an active
- * transaction which has autoCommit:false and has not been committed or aborted.
+ * transaction which has autoCommit:false and has not been committed or aborted. It is possible
+ * that the current transaction is stashed onto the stack via a `SideTransactionBlock`.
*/
bool inMultiDocumentTransaction() const {
stdx::lock_guard<stdx::mutex> lk(_mutex);
diff --git a/src/mongo/db/session_test.cpp b/src/mongo/db/session_test.cpp
index 38f4a9e2563..a85e37899ce 100644
--- a/src/mongo/db/session_test.cpp
+++ b/src/mongo/db/session_test.cpp
@@ -126,7 +126,8 @@ protected:
Date_t::now(),
osi,
stmtId,
- link);
+ link,
+ false /* prepare */);
}
void bumpTxnNumberFromDifferentOpCtx(Session* session, TxnNumber newTxnNum) {
@@ -526,7 +527,8 @@ TEST_F(SessionTest, ErrorOnlyWhenStmtIdBeingCheckedIsNotInCache) {
wallClockTime,
osi,
1,
- {});
+ {},
+ false /* prepare */);
session.onWriteOpCompletedOnPrimary(opCtx(), txnNum, {1}, opTime, wallClockTime);
wuow.commit();
@@ -552,7 +554,8 @@ TEST_F(SessionTest, ErrorOnlyWhenStmtIdBeingCheckedIsNotInCache) {
wallClockTime,
osi,
kIncompleteHistoryStmtId,
- link);
+ link,
+ false /* prepare */);
session.onWriteOpCompletedOnPrimary(
opCtx(), txnNum, {kIncompleteHistoryStmtId}, opTime, wallClockTime);
diff --git a/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_recovery_unit.h b/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_recovery_unit.h
index 87d46bef8a9..45d72ff7c44 100644
--- a/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_recovery_unit.h
+++ b/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_recovery_unit.h
@@ -76,6 +76,8 @@ public:
virtual void setOrderedCommit(bool orderedCommit) {}
+ virtual void setPrepareTimestamp(Timestamp) {}
+
private:
typedef std::shared_ptr<Change> ChangePtr;
typedef std::vector<ChangePtr> Changes;