diff options
author | Judah Schvimer <judah@mongodb.com> | 2018-06-07 11:54:35 -0400 |
---|---|---|
committer | Judah Schvimer <judah@mongodb.com> | 2018-06-07 11:54:35 -0400 |
commit | da63637defad5975040f8eac0e98c86c8d8e2533 (patch) | |
tree | e4ebf901a2fef4b8367e1adec35cd9113d5d9640 /src/mongo/db | |
parent | 6a66e646c41071c5bf0e28d885a758e05f353536 (diff) | |
download | mongo-da63637defad5975040f8eac0e98c86c8d8e2533.tar.gz |
SERVER-34824 Make prepareTransaction command write a prepare oplog entry and use its optime as the prepare timestamp
Diffstat (limited to 'src/mongo/db')
-rw-r--r-- | src/mongo/db/commands/dbcheck.cpp | 3 | ||||
-rw-r--r-- | src/mongo/db/commands/txn_cmds.cpp | 3 | ||||
-rw-r--r-- | src/mongo/db/op_observer.h | 5 | ||||
-rw-r--r-- | src/mongo/db/op_observer_impl.cpp | 122 | ||||
-rw-r--r-- | src/mongo/db/op_observer_impl_test.cpp | 116 | ||||
-rw-r--r-- | src/mongo/db/ops/write_ops_exec.cpp | 20 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog.cpp | 19 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog.h | 4 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_entry.cpp | 5 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_entry.h | 5 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_entry.idl | 4 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_test.cpp | 17 | ||||
-rw-r--r-- | src/mongo/db/repl/sync_tail.cpp | 4 | ||||
-rw-r--r-- | src/mongo/db/s/session_catalog_migration_destination.cpp | 3 | ||||
-rw-r--r-- | src/mongo/db/session.cpp | 17 | ||||
-rw-r--r-- | src/mongo/db/session.h | 22 | ||||
-rw-r--r-- | src/mongo/db/session_test.cpp | 9 | ||||
-rw-r--r-- | src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_recovery_unit.h | 2 |
18 files changed, 311 insertions, 69 deletions
diff --git a/src/mongo/db/commands/dbcheck.cpp b/src/mongo/db/commands/dbcheck.cpp index 7ab68f7077f..3380a7c3198 100644 --- a/src/mongo/db/commands/dbcheck.cpp +++ b/src/mongo/db/commands/dbcheck.cpp @@ -477,7 +477,8 @@ private: wallClockTime, {}, kUninitializedStmtId, - {}); + {}, + false /* prepare */); uow.commit(); return result; }); diff --git a/src/mongo/db/commands/txn_cmds.cpp b/src/mongo/db/commands/txn_cmds.cpp index 650c1012c40..db96037f735 100644 --- a/src/mongo/db/commands/txn_cmds.cpp +++ b/src/mongo/db/commands/txn_cmds.cpp @@ -101,7 +101,6 @@ public: MONGO_FAIL_POINT_DEFINE(pauseAfterTransactionPrepare); -// TODO: This is a stub for testing storage prepare functionality. class CmdPrepareTxn : public BasicCommand { public: CmdPrepareTxn() : BasicCommand("prepareTransaction") {} @@ -119,7 +118,7 @@ public: } std::string help() const override { - return "Preprares a transaction. THIS IS A STUB FOR TESTING."; + return "Prepares a transaction. This is only expected to be called by mongos."; } Status checkAuthForOperation(OperationContext* opCtx, diff --git a/src/mongo/db/op_observer.h b/src/mongo/db/op_observer.h index 9a543aa0aa4..38027587930 100644 --- a/src/mongo/db/op_observer.h +++ b/src/mongo/db/op_observer.h @@ -264,10 +264,7 @@ public: /** * The onTransactionPrepare method is called when an atomic transaction is prepared. It must be - * called when a transaction is active. It generates an OpTime and sets the prepare timestamp on - * the recovery unit. - * TODO: This is an incomplete implementation and should only be used for testing. It does not - * write the prepare oplog entry, only generates an OpTime. + * called when a transaction is active. */ virtual void onTransactionPrepare(OperationContext* opCtx) = 0; diff --git a/src/mongo/db/op_observer_impl.cpp b/src/mongo/db/op_observer_impl.cpp index aa85d823222..90b4cfdadbc 100644 --- a/src/mongo/db/op_observer_impl.cpp +++ b/src/mongo/db/op_observer_impl.cpp @@ -45,6 +45,7 @@ #include "mongo/db/namespace_string.h" #include "mongo/db/operation_context.h" #include "mongo/db/repl/oplog.h" +#include "mongo/db/repl/oplog_entry_gen.h" #include "mongo/db/repl/replication_coordinator.h" #include "mongo/db/s/collection_sharding_state.h" #include "mongo/db/s/shard_server_op_observer.h" @@ -73,7 +74,8 @@ repl::OpTime logOperation(OperationContext* opCtx, Date_t wallClockTime, const OperationSessionInfo& sessionInfo, StmtId stmtId, - const repl::OplogLink& oplogLink) { + const repl::OplogLink& oplogLink, + bool prepare) { auto& times = OpObserver::Times::get(opCtx).reservedOpTimes; auto opTime = repl::logOp(opCtx, opstr, @@ -85,7 +87,9 @@ repl::OpTime logOperation(OperationContext* opCtx, wallClockTime, sessionInfo, stmtId, - oplogLink); + oplogLink, + prepare); + times.push_back(opTime); return opTime; } @@ -193,7 +197,8 @@ OpTimeBundle replLogUpdate(OperationContext* opCtx, opTimes.wallClockTime, sessionInfo, args.stmtId, - {}); + {}, + false /* prepare */); opTimes.prePostImageOpTime = noteUpdateOpTime; @@ -214,7 +219,8 @@ OpTimeBundle replLogUpdate(OperationContext* opCtx, opTimes.wallClockTime, sessionInfo, args.stmtId, - oplogLink); + oplogLink, + false /* prepare */); return opTimes; } @@ -252,7 +258,8 @@ OpTimeBundle replLogDelete(OperationContext* opCtx, opTimes.wallClockTime, sessionInfo, stmtId, - {}); + {}, + false /* prepare */); opTimes.prePostImageOpTime = noteOplog; oplogLink.preImageOpTime = noteOplog; } @@ -268,7 +275,8 @@ OpTimeBundle replLogDelete(OperationContext* opCtx, opTimes.wallClockTime, sessionInfo, stmtId, - oplogLink); + oplogLink, + false /* prepare */); return opTimes; } @@ -280,7 +288,8 @@ OpTimeBundle replLogApplyOps(OperationContext* opCtx, const BSONObj& applyOpCmd, const OperationSessionInfo& sessionInfo, StmtId stmtId, - const repl::OplogLink& oplogLink) { + const repl::OplogLink& oplogLink, + bool prepare) { OpTimeBundle times; times.wallClockTime = getWallClockTimeForOpLog(opCtx); times.writeOpTime = logOperation(opCtx, @@ -293,7 +302,8 @@ OpTimeBundle replLogApplyOps(OperationContext* opCtx, times.wallClockTime, sessionInfo, stmtId, - oplogLink); + oplogLink, + prepare); return times; } @@ -325,7 +335,8 @@ void OpObserverImpl::onCreateIndex(OperationContext* opCtx, getWallClockTimeForOpLog(opCtx), {}, kUninitializedStmtId, - {}); + {}, + false /* prepare */); } else { logOperation(opCtx, "i", @@ -337,7 +348,8 @@ void OpObserverImpl::onCreateIndex(OperationContext* opCtx, getWallClockTimeForOpLog(opCtx), {}, kUninitializedStmtId, - {}); + {}, + false /* prepare */); } AuthorizationManager::get(opCtx->getServiceContext()) @@ -561,7 +573,8 @@ void OpObserverImpl::onInternalOpMessage(OperationContext* opCtx, getWallClockTimeForOpLog(opCtx), {}, kUninitializedStmtId, - {}); + {}, + false /* prepare */); } void OpObserverImpl::onCreateCollection(OperationContext* opCtx, @@ -604,7 +617,8 @@ void OpObserverImpl::onCreateCollection(OperationContext* opCtx, getWallClockTimeForOpLog(opCtx), {}, kUninitializedStmtId, - {}); + {}, + false /* prepare */); } AuthorizationManager::get(opCtx->getServiceContext()) @@ -650,7 +664,8 @@ void OpObserverImpl::onCollMod(OperationContext* opCtx, getWallClockTimeForOpLog(opCtx), {}, kUninitializedStmtId, - {}); + {}, + false /* prepare */); } AuthorizationManager::get(opCtx->getServiceContext()) @@ -687,7 +702,8 @@ void OpObserverImpl::onDropDatabase(OperationContext* opCtx, const std::string& getWallClockTimeForOpLog(opCtx), {}, kUninitializedStmtId, - {}); + {}, + false /* prepare */); uassert( 50714, "dropping the admin database is not allowed.", dbName != NamespaceString::kAdminDb); @@ -720,7 +736,8 @@ repl::OpTime OpObserverImpl::onDropCollection(OperationContext* opCtx, getWallClockTimeForOpLog(opCtx), {}, kUninitializedStmtId, - {}); + {}, + false /* prepare */); } uassert(50715, @@ -760,7 +777,8 @@ void OpObserverImpl::onDropIndex(OperationContext* opCtx, getWallClockTimeForOpLog(opCtx), {}, kUninitializedStmtId, - {}); + {}, + false /* prepare */); AuthorizationManager::get(opCtx->getServiceContext()) ->logOp(opCtx, "c", cmdNss, cmdObj, &indexInfo); @@ -795,7 +813,8 @@ repl::OpTime OpObserverImpl::preRenameCollection(OperationContext* const opCtx, getWallClockTimeForOpLog(opCtx), {}, kUninitializedStmtId, - {}); + {}, + false /* prepare */); return {}; } @@ -848,7 +867,10 @@ void OpObserverImpl::onApplyOps(OperationContext* opCtx, const std::string& dbName, const BSONObj& applyOpCmd) { const NamespaceString cmdNss{dbName, "$cmd"}; - replLogApplyOps(opCtx, cmdNss, applyOpCmd, {}, kUninitializedStmtId, {}); + + // Only transactional 'applyOps' commands can be prepared. + constexpr bool prepare = false; + replLogApplyOps(opCtx, cmdNss, applyOpCmd, {}, kUninitializedStmtId, {}, prepare); AuthorizationManager::get(opCtx->getServiceContext()) ->logOp(opCtx, "c", cmdNss, applyOpCmd, nullptr); @@ -872,30 +894,27 @@ void OpObserverImpl::onEmptyCapped(OperationContext* opCtx, getWallClockTimeForOpLog(opCtx), {}, kUninitializedStmtId, - {}); + {}, + false /* prepare */); } AuthorizationManager::get(opCtx->getServiceContext()) ->logOp(opCtx, "c", cmdNss, cmdObj, nullptr); } -void OpObserverImpl::onTransactionCommit(OperationContext* opCtx) { - invariant(opCtx->getTxnNumber()); - Session* const session = OperationContextSession::get(opCtx); - invariant(session); - auto stmts = session->endTransactionAndRetrieveOperations(opCtx); - - // It is possible that the transaction resulted in no changes. In that case, we should - // not write an empty applyOps entry. - if (stmts.empty()) - return; +namespace { +OpTimeBundle logApplyOpsForTransaction(OperationContext* opCtx, + Session* const session, + std::vector<repl::ReplOperation> stmts, + bool prepare) { BSONObjBuilder applyOpsBuilder; BSONArrayBuilder opsArray(applyOpsBuilder.subarrayStart("applyOps"_sd)); for (auto& stmt : stmts) { opsArray.append(stmt.toBSON()); } opsArray.done(); + const NamespaceString cmdNss{"admin", "$cmd"}; OperationSessionInfo sessionInfo; @@ -909,10 +928,12 @@ void OpObserverImpl::onTransactionCommit(OperationContext* opCtx) { try { auto applyOpCmd = applyOpsBuilder.done(); - auto times = replLogApplyOps(opCtx, cmdNss, applyOpCmd, sessionInfo, stmtId, oplogLink); + auto times = + replLogApplyOps(opCtx, cmdNss, applyOpCmd, sessionInfo, stmtId, oplogLink, prepare); onWriteOpCompleted( opCtx, cmdNss, session, {stmtId}, times.writeOpTime, times.wallClockTime); + return times; } catch (const AssertionException& e) { // Change the error code to TransactionTooLarge if it is BSONObjectTooLarge. uassert(ErrorCodes::TransactionTooLarge, @@ -920,15 +941,52 @@ void OpObserverImpl::onTransactionCommit(OperationContext* opCtx) { e.code() != ErrorCodes::BSONObjectTooLarge); throw; } + MONGO_UNREACHABLE; +} + +} // namespace + +void OpObserverImpl::onTransactionCommit(OperationContext* opCtx) { + invariant(opCtx->getTxnNumber()); + Session* const session = OperationContextSession::get(opCtx); + invariant(session); + invariant(session->inMultiDocumentTransaction()); + auto stmts = session->endTransactionAndRetrieveOperations(opCtx); + + // It is possible that the transaction resulted in no changes. In that case, we should + // not write an empty applyOps entry. + if (stmts.empty()) + return; + + const auto commitOpTime = + logApplyOpsForTransaction(opCtx, session, stmts, false /* prepare */).writeOpTime; + invariant(!commitOpTime.isNull()); } void OpObserverImpl::onTransactionPrepare(OperationContext* opCtx) { invariant(opCtx->getTxnNumber()); Session* const session = OperationContextSession::get(opCtx); + invariant(session); invariant(session->inMultiDocumentTransaction()); + auto stmts = session->endTransactionAndRetrieveOperations(opCtx); + + // It is possible that the transaction resulted in no changes. In that case, we should + // not write an empty applyOps entry. + if (stmts.empty()) + return; - auto opTime = repl::getNextOpTimeNoPersistForTesting(opCtx).opTime; - opCtx->recoveryUnit()->setPrepareTimestamp(opTime.getTimestamp()); + // We write the oplog entry in a side transaction so that we do not commit the now-prepared + // transaction. We then return to the main transaction and set its 'prepareTimestamp'. + repl::OpTime prepareOpTime; + { + Session::SideTransactionBlock sideTxn(opCtx); + WriteUnitOfWork wuow(opCtx); + prepareOpTime = + logApplyOpsForTransaction(opCtx, session, stmts, true /* prepare */).writeOpTime; + wuow.commit(); + } + invariant(!prepareOpTime.isNull()); + opCtx->recoveryUnit()->setPrepareTimestamp(prepareOpTime.getTimestamp()); } void OpObserverImpl::onTransactionAbort(OperationContext* opCtx) { diff --git a/src/mongo/db/op_observer_impl_test.cpp b/src/mongo/db/op_observer_impl_test.cpp index f0a4c058ca7..4947927c380 100644 --- a/src/mongo/db/op_observer_impl_test.cpp +++ b/src/mongo/db/op_observer_impl_test.cpp @@ -541,6 +541,116 @@ private: boost::optional<ScopedSession> _session; }; +TEST_F(OpObserverTransactionTest, TransactionalPrepareTest) { + const NamespaceString nss1("testDB", "testColl"); + const NamespaceString nss2("testDB2", "testColl2"); + auto uuid1 = CollectionUUID::gen(); + auto uuid2 = CollectionUUID::gen(); + const TxnNumber txnNum = 2; + opCtx()->setTxnNumber(txnNum); + OperationContextSession opSession(opCtx(), + true /* checkOutSession */, + false /* autocommit */, + true /* startTransaction*/, + "testDB", + "insert"); + + session()->unstashTransactionResources(opCtx(), "insert"); + WriteUnitOfWork wuow(opCtx()); + AutoGetCollection autoColl1(opCtx(), nss1, MODE_IX); + AutoGetCollection autoColl2(opCtx(), nss2, MODE_IX); + + std::vector<InsertStatement> inserts1; + inserts1.emplace_back(0, + BSON("_id" << 0 << "data" + << "x")); + inserts1.emplace_back(1, + BSON("_id" << 1 << "data" + << "y")); + opObserver().onInserts(opCtx(), nss1, uuid1, inserts1.begin(), inserts1.end(), false); + + OplogUpdateEntryArgs update2; + update2.nss = nss2; + update2.uuid = uuid2; + update2.stmtId = 1; + update2.updatedDoc = BSON("_id" << 0 << "data" + << "y"); + update2.update = BSON("$set" << BSON("data" + << "y")); + update2.criteria = BSON("_id" << 0); + opObserver().onUpdate(opCtx(), update2); + + opObserver().aboutToDelete(opCtx(), + nss1, + BSON("_id" << 0 << "data" + << "x")); + opObserver().onDelete(opCtx(), nss1, uuid1, 0, false, boost::none); + + opObserver().onTransactionPrepare(opCtx()); + + auto oplogEntry = getSingleOplogEntry(opCtx()); + checkCommonFields(oplogEntry); + auto o = oplogEntry.getObjectField("o"); + auto oExpected = BSON("applyOps" << BSON_ARRAY(BSON("op" + << "i" + << "ns" + << nss1.toString() + << "ui" + << uuid1 + << "o" + << BSON("_id" << 0 << "data" + << "x")) + << BSON("op" + << "i" + << "ns" + << nss1.toString() + << "ui" + << uuid1 + << "o" + << BSON("_id" << 1 << "data" + << "y")) + << BSON("op" + << "u" + << "ns" + << nss2.toString() + << "ui" + << uuid2 + << "o" + << BSON("$set" << BSON("data" + << "y")) + << "o2" + << BSON("_id" << 0)) + << BSON("op" + << "d" + << "ns" + << nss1.toString() + << "ui" + << uuid1 + << "o" + << BSON("_id" << 0)))); + ASSERT_BSONOBJ_EQ(oExpected, o); + ASSERT(oplogEntry.getBoolField("prepare")); +} + +TEST_F(OpObserverTransactionTest, PreparingEmptyTransactionLogsNothing) { + const TxnNumber txnNum = 2; + opCtx()->setTxnNumber(txnNum); + OperationContextSession opSession(opCtx(), + true /* checkOutSession */, + false /* autocommit */, + true /* startTransaction*/, + "admin", + "prepareTransaction"); + + session()->unstashTransactionResources(opCtx(), "prepareTransaction"); + opObserver().onTransactionPrepare(opCtx()); + + AutoGetCollection autoColl1(opCtx(), NamespaceString::kRsOplogNamespace, MODE_IX); + repl::OplogInterfaceLocal oplogInterface(opCtx(), NamespaceString::kRsOplogNamespace.ns()); + auto oplogIter = oplogInterface.makeIterator(); + ASSERT_EQUALS(ErrorCodes::CollectionIsEmpty, oplogIter->next().getStatus()); +} + TEST_F(OpObserverTransactionTest, TransactionalInsertTest) { const NamespaceString nss1("testDB", "testColl"); const NamespaceString nss2("testDB2", "testColl2"); @@ -617,6 +727,8 @@ TEST_F(OpObserverTransactionTest, TransactionalInsertTest) { << BSON("_id" << 3 << "data" << "w")))); ASSERT_BSONOBJ_EQ(oExpected, o); + ASSERT_FALSE(oplogEntry.hasField("prepare")); + ASSERT_FALSE(oplogEntry.getBoolField("prepare")); } TEST_F(OpObserverTransactionTest, TransactionalUpdateTest) { @@ -687,6 +799,8 @@ TEST_F(OpObserverTransactionTest, TransactionalUpdateTest) { << "o2" << BSON("_id" << 1)))); ASSERT_BSONOBJ_EQ(oExpected, o); + ASSERT_FALSE(oplogEntry.hasField("prepare")); + ASSERT_FALSE(oplogEntry.getBoolField("prepare")); } TEST_F(OpObserverTransactionTest, TransactionalDeleteTest) { @@ -739,6 +853,8 @@ TEST_F(OpObserverTransactionTest, TransactionalDeleteTest) { << "o" << BSON("_id" << 1)))); ASSERT_BSONOBJ_EQ(oExpected, o); + ASSERT_FALSE(oplogEntry.hasField("prepare")); + ASSERT_FALSE(oplogEntry.getBoolField("prepare")); } DEATH_TEST_F(OpObserverTest, AboutToDeleteMustPreceedOnDelete, "invariant") { diff --git a/src/mongo/db/ops/write_ops_exec.cpp b/src/mongo/db/ops/write_ops_exec.cpp index 4450ba31870..8e9754a058a 100644 --- a/src/mongo/db/ops/write_ops_exec.cpp +++ b/src/mongo/db/ops/write_ops_exec.cpp @@ -120,24 +120,10 @@ void finishCurOp(OperationContext* opCtx, CurOp* curOp) { const bool shouldSample = curOp->completeAndLogOperation(opCtx, MONGO_LOG_DEFAULT_COMPONENT); - auto session = OperationContextSession::get(opCtx); if (curOp->shouldDBProfile(shouldSample)) { - boost::optional<Session::TxnResources> txnResources; - if (session && session->inSnapshotReadOrMultiDocumentTransaction()) { - // Stash the current transaction so that writes to the profile collection are not - // done as part of the transaction. This must be done under the client lock, since - // we are modifying 'opCtx'. - stdx::lock_guard<Client> clientLock(*opCtx->getClient()); - txnResources = Session::TxnResources(opCtx); - } - ON_BLOCK_EXIT([&] { - if (txnResources) { - // Restore the transaction state onto 'opCtx'. This must be done under the - // client lock, since we are modifying 'opCtx'. - stdx::lock_guard<Client> clientLock(*opCtx->getClient()); - txnResources->release(opCtx); - } - }); + // Stash the current transaction so that writes to the profile collection are not + // done as part of the transaction. + Session::SideTransactionBlock sideTxn(opCtx); profile(opCtx, CurOp::get(opCtx)->getNetworkOp()); } } catch (const DBException& ex) { diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp index ead136c18a7..a0d13182694 100644 --- a/src/mongo/db/repl/oplog.cpp +++ b/src/mongo/db/repl/oplog.cpp @@ -330,7 +330,8 @@ OplogDocWriter _logOpWriter(OperationContext* opCtx, Date_t wallTime, const OperationSessionInfo& sessionInfo, StmtId statementId, - const OplogLink& oplogLink) { + const OplogLink& oplogLink, + bool prepare) { BSONObjBuilder b(256); b.append("ts", optime.getTimestamp()); @@ -353,6 +354,11 @@ OplogDocWriter _logOpWriter(OperationContext* opCtx, b.appendDate("wall", wallTime); appendSessionInfo(opCtx, &b, statementId, sessionInfo, oplogLink); + + if (prepare) { + b.appendBool(OplogEntryBase::kPrepareFieldName, true); + } + return OplogDocWriter(OplogDocWriter(b.obj(), obj)); } } // end anon namespace @@ -427,7 +433,8 @@ OpTime logOp(OperationContext* opCtx, Date_t wallClockTime, const OperationSessionInfo& sessionInfo, StmtId statementId, - const OplogLink& oplogLink) { + const OplogLink& oplogLink, + bool prepare) { auto replCoord = ReplicationCoordinator::get(opCtx); // For commands, the test below is on the command ns and therefore does not check for // specific namespaces such as system.profile. This is the caller's responsibility. @@ -460,7 +467,8 @@ OpTime logOp(OperationContext* opCtx, wallClockTime, sessionInfo, statementId, - oplogLink); + oplogLink, + prepare); const DocWriter* basePtr = &writer; auto timestamp = slot.opTime.getTimestamp(); _logOpsInner(opCtx, nss, &basePtr, ×tamp, 1, oplog, slot.opTime); @@ -515,6 +523,8 @@ std::vector<OpTime> logInsertOps(OperationContext* opCtx, if (insertStatementOplogSlot.opTime.isNull()) { _getNextOpTimes(opCtx, oplog, 1, &insertStatementOplogSlot); } + // Only 'applyOps' oplog entries can be prepared. + constexpr bool prepare = false; writers.emplace_back(_logOpWriter(opCtx, "i", nss, @@ -527,7 +537,8 @@ std::vector<OpTime> logInsertOps(OperationContext* opCtx, wallClockTime, sessionInfo, begin[i].stmtId, - oplogLink)); + oplogLink, + prepare)); oplogLink.prevOpTime = insertStatementOplogSlot.opTime; timestamps[i] = oplogLink.prevOpTime.getTimestamp(); opTimes.push_back(insertStatementOplogSlot.opTime); diff --git a/src/mongo/db/repl/oplog.h b/src/mongo/db/repl/oplog.h index 7c569e7d3aa..b4d34ca0ef6 100644 --- a/src/mongo/db/repl/oplog.h +++ b/src/mongo/db/repl/oplog.h @@ -126,6 +126,7 @@ std::vector<OpTime> logInsertOps(OperationContext* opCtx, * oplogLink this contains the timestamp that points to the previous write that will be * linked via prevTs, and the timestamps of the oplog entry that contains the document * before/after update was applied. The timestamps are ignored if isNull() is true. + * prepare this specifies if the oplog entry should be put into a 'prepare' state. * * Returns the optime of the oplog entry written to the oplog. * Returns a null optime if oplog was not modified. @@ -140,7 +141,8 @@ OpTime logOp(OperationContext* opCtx, Date_t wallClockTime, const OperationSessionInfo& sessionInfo, StmtId stmtId, - const OplogLink& oplogLink); + const OplogLink& oplogLink, + bool prepare); // Flush out the cached pointer to the oplog. // Used by the closeDatabase command to ensure we don't cache closed things. diff --git a/src/mongo/db/repl/oplog_entry.cpp b/src/mongo/db/repl/oplog_entry.cpp index 7748a68f18e..731bde4e43b 100644 --- a/src/mongo/db/repl/oplog_entry.cpp +++ b/src/mongo/db/repl/oplog_entry.cpp @@ -257,6 +257,11 @@ bool OplogEntry::isCrudOpType() const { return isCrudOpType(getOpType()); } +bool OplogEntry::shouldPrepare() const { + invariant(getCommandType() == OplogEntry::CommandType::kApplyOps); + return getPrepare() && *getPrepare(); +} + BSONElement OplogEntry::getIdElement() const { invariant(isCrudOpType()); if (getOpType() == OpTypeEnum::kUpdate) { diff --git a/src/mongo/db/repl/oplog_entry.h b/src/mongo/db/repl/oplog_entry.h index 270ddd01ae3..0c632dc3500 100644 --- a/src/mongo/db/repl/oplog_entry.h +++ b/src/mongo/db/repl/oplog_entry.h @@ -115,6 +115,11 @@ public: bool isCrudOpType() const; /** + * Returns if the operation should be prepared. Must be called on an 'applyOps' entry. + */ + bool shouldPrepare() const; + + /** * Returns the _id of the document being modified. Must be called on CRUD ops. */ BSONElement getIdElement() const; diff --git a/src/mongo/db/repl/oplog_entry.idl b/src/mongo/db/repl/oplog_entry.idl index fa31e20d790..9d0af006a97 100644 --- a/src/mongo/db/repl/oplog_entry.idl +++ b/src/mongo/db/repl/oplog_entry.idl @@ -139,3 +139,7 @@ structs: optional: true description: "The optime of another oplog entry that contains the document after an update was applied." + prepare: + type: bool + optional: true + description: "Specifies that this operation should be put into a 'prepare' state" diff --git a/src/mongo/db/repl/oplog_test.cpp b/src/mongo/db/repl/oplog_test.cpp index 9a38b77d489..93738bb47ed 100644 --- a/src/mongo/db/repl/oplog_test.cpp +++ b/src/mongo/db/repl/oplog_test.cpp @@ -110,7 +110,8 @@ TEST_F(OplogTest, LogOpReturnsOpTimeOnSuccessfulInsertIntoOplogCollection) { Date_t::now(), {}, kUninitializedStmtId, - {}); + {}, + false /* prepare */); ASSERT_FALSE(opTime.isNull()); wunit.commit(); } @@ -222,8 +223,18 @@ OpTime _logOpNoopWithMsg(OperationContext* opCtx, // logOp() must be called while holding lock because ephemeralForTest storage engine does not // support concurrent updates to its internal state. const auto msgObj = BSON("msg" << nss.ns()); - auto opTime = logOp( - opCtx, "n", nss, {}, msgObj, nullptr, false, Date_t::now(), {}, kUninitializedStmtId, {}); + auto opTime = logOp(opCtx, + "n", + nss, + {}, + msgObj, + nullptr, + false, + Date_t::now(), + {}, + kUninitializedStmtId, + {}, + false /* prepare */); ASSERT_FALSE(opTime.isNull()); ASSERT(opTimeNssMap->find(opTime) == opTimeNssMap->end()) diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp index 11e277188b2..b7370e04b02 100644 --- a/src/mongo/db/repl/sync_tail.cpp +++ b/src/mongo/db/repl/sync_tail.cpp @@ -572,6 +572,10 @@ void fillWriterVectors(OperationContext* opCtx, // Extract applyOps operations and fill writers with extracted operations using this // function. if (op.isCommand() && op.getCommandType() == OplogEntry::CommandType::kApplyOps) { + if (op.shouldPrepare()) { + // TODO (SERVER-35307) mark operations as needing prepare. + continue; + } try { derivedOps->emplace_back(ApplyOps::extractOperations(op)); fillWriterVectors( diff --git a/src/mongo/db/s/session_catalog_migration_destination.cpp b/src/mongo/db/s/session_catalog_migration_destination.cpp index ac037085450..0c318ba7605 100644 --- a/src/mongo/db/s/session_catalog_migration_destination.cpp +++ b/src/mongo/db/s/session_catalog_migration_destination.cpp @@ -281,7 +281,8 @@ ProcessOplogResult processSessionOplog(OperationContext* opCtx, *oplogEntry.getWallClockTime(), sessionInfo, stmtId, - oplogLink); + oplogLink, + false /* prepare */); auto oplogOpTime = result.oplogTime; uassert(40633, diff --git a/src/mongo/db/session.cpp b/src/mongo/db/session.cpp index d0ab4bbcc8b..95457f11663 100644 --- a/src/mongo/db/session.cpp +++ b/src/mongo/db/session.cpp @@ -722,6 +722,23 @@ void Session::TxnResources::release(OperationContext* opCtx) { readConcernArgs = _readConcernArgs; } +Session::SideTransactionBlock::SideTransactionBlock(OperationContext* opCtx) : _opCtx(opCtx) { + if (_opCtx->getWriteUnitOfWork()) { + // This must be done under the client lock, since we are modifying '_opCtx'. + stdx::lock_guard<Client> clientLock(*_opCtx->getClient()); + _txnResources = Session::TxnResources(_opCtx); + } +} + +Session::SideTransactionBlock::~SideTransactionBlock() { + if (_txnResources) { + // Restore the transaction state onto '_opCtx'. This must be done under the + // client lock, since we are modifying '_opCtx'. + stdx::lock_guard<Client> clientLock(*_opCtx->getClient()); + _txnResources->release(_opCtx); + } +} + void Session::stashTransactionResources(OperationContext* opCtx) { if (opCtx->getClient()->isInDirectClient()) { return; diff --git a/src/mongo/db/session.h b/src/mongo/db/session.h index 793d5dec814..c2d29abd55e 100644 --- a/src/mongo/db/session.h +++ b/src/mongo/db/session.h @@ -107,6 +107,25 @@ public: WriteUnitOfWork::RecoveryUnitState _ruState; }; + /** + * An RAII object that stashes `TxnResouces` from the `opCtx` onto the stack. At destruction + * it unstashes the `TxnResources` back onto the `opCtx`. + */ + class SideTransactionBlock { + public: + SideTransactionBlock(OperationContext* opCtx); + ~SideTransactionBlock(); + + // Rule of 5: because we have a class-defined destructor, we need to explictly specify + // the move operator and move assignment operator. + SideTransactionBlock(SideTransactionBlock&&) = default; + SideTransactionBlock& operator=(SideTransactionBlock&&) = default; + + private: + boost::optional<Session::TxnResources> _txnResources; + OperationContext* _opCtx; + }; + using CommittedStatementTimestampMap = stdx::unordered_map<StmtId, repl::OpTime>; using CursorKillFunction = std::function<size_t(OperationContext*, LogicalSessionId, TxnNumber)>; @@ -305,7 +324,8 @@ public: /** * Returns whether we are in a multi-document transaction, which means we have an active - * transaction which has autoCommit:false and has not been committed or aborted. + * transaction which has autoCommit:false and has not been committed or aborted. It is possible + * that the current transaction is stashed onto the stack via a `SideTransactionBlock`. */ bool inMultiDocumentTransaction() const { stdx::lock_guard<stdx::mutex> lk(_mutex); diff --git a/src/mongo/db/session_test.cpp b/src/mongo/db/session_test.cpp index 38f4a9e2563..a85e37899ce 100644 --- a/src/mongo/db/session_test.cpp +++ b/src/mongo/db/session_test.cpp @@ -126,7 +126,8 @@ protected: Date_t::now(), osi, stmtId, - link); + link, + false /* prepare */); } void bumpTxnNumberFromDifferentOpCtx(Session* session, TxnNumber newTxnNum) { @@ -526,7 +527,8 @@ TEST_F(SessionTest, ErrorOnlyWhenStmtIdBeingCheckedIsNotInCache) { wallClockTime, osi, 1, - {}); + {}, + false /* prepare */); session.onWriteOpCompletedOnPrimary(opCtx(), txnNum, {1}, opTime, wallClockTime); wuow.commit(); @@ -552,7 +554,8 @@ TEST_F(SessionTest, ErrorOnlyWhenStmtIdBeingCheckedIsNotInCache) { wallClockTime, osi, kIncompleteHistoryStmtId, - link); + link, + false /* prepare */); session.onWriteOpCompletedOnPrimary( opCtx(), txnNum, {kIncompleteHistoryStmtId}, opTime, wallClockTime); diff --git a/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_recovery_unit.h b/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_recovery_unit.h index 87d46bef8a9..45d72ff7c44 100644 --- a/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_recovery_unit.h +++ b/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_recovery_unit.h @@ -76,6 +76,8 @@ public: virtual void setOrderedCommit(bool orderedCommit) {} + virtual void setPrepareTimestamp(Timestamp) {} + private: typedef std::shared_ptr<Change> ChangePtr; typedef std::vector<ChangePtr> Changes; |