diff options
author | Siyuan Zhou <siyuan.zhou@mongodb.com> | 2018-08-14 21:47:00 -0400 |
---|---|---|
committer | Siyuan Zhou <siyuan.zhou@mongodb.com> | 2018-09-13 16:32:15 -0400 |
commit | 022c7d50ac5f7f62fd4e91df513baa56d011fc80 (patch) | |
tree | 091e7b4fe824bc4897f5fd9a29d29d14e277b3b3 | |
parent | db3d9907d3934b34395a5b6f2f358994b9da5d99 (diff) | |
download | mongo-022c7d50ac5f7f62fd4e91df513baa56d011fc80.tar.gz |
SERVER-35307 Support prepared transactions on secondaries and immediately abort them.
20 files changed, 316 insertions, 128 deletions
diff --git a/buildscripts/resmokeconfig/suites/replica_sets_initsync_jscore_passthrough.yml b/buildscripts/resmokeconfig/suites/replica_sets_initsync_jscore_passthrough.yml index 2b32e5a8584..59b198cf104 100644 --- a/buildscripts/resmokeconfig/suites/replica_sets_initsync_jscore_passthrough.yml +++ b/buildscripts/resmokeconfig/suites/replica_sets_initsync_jscore_passthrough.yml @@ -74,13 +74,23 @@ selector: - jstests/core/txns/transactions_profiling.js # The downstream syncing node affects the top output. - jstests/core/top.js - # TODO (SERVER-35865): Unblacklist when we correctly write and apply 'commitTransaction' oplog - # entries. + # TODO (SERVER-36492): Unblacklist when we correctly reconstruct prepared transactions after + # initial sync. + - jstests/core/txns/abort_prepared_transaction.js + - jstests/core/txns/commit_prepared_transaction_errors.js + - jstests/core/txns/empty_prepare.js + - jstests/core/txns/no_new_transactions_when_prepared_transaction_in_progress.js + - jstests/core/txns/prepare_conflict.js + - jstests/core/txns/prepare_conflict_read_concern_behavior.js + - jstests/core/txns/prepare_prepared_transaction.js + - jstests/core/txns/statement_ids_accepted.js + # TODO (SERVER-35865): Unblacklist when we also correctly write and apply 'commitTransaction' + # oplog entries, besides SERVER-36492. - jstests/core/txns/commit_prepared_transaction.js - jstests/core/txns/disallow_operations_on_prepared_transaction.js - jstests/core/txns/ensure_active_txn_for_prepare_transaction.js - - jstests/core/txns/timestamped_reads_wait_for_prepare_oplog_visibility.js - jstests/core/txns/prepare_requires_fcv42.js + - jstests/core/txns/timestamped_reads_wait_for_prepare_oplog_visibility.js run_hook_interval: &run_hook_interval 20 executor: diff --git a/buildscripts/resmokeconfig/suites/replica_sets_initsync_static_jscore_passthrough.yml b/buildscripts/resmokeconfig/suites/replica_sets_initsync_static_jscore_passthrough.yml index 61f2f2050fe..a656b0dbcf7 100644 --- a/buildscripts/resmokeconfig/suites/replica_sets_initsync_static_jscore_passthrough.yml +++ b/buildscripts/resmokeconfig/suites/replica_sets_initsync_static_jscore_passthrough.yml @@ -11,13 +11,25 @@ selector: - jstests/core/capped_update.js # Having duplicate namespaces is not supported and will cause initial sync to fail. - jstests/core/views/duplicate_ns.js - # TODO (SERVER-35865): Unblacklist when we correctly write and apply 'commitTransaction' oplog - # entries. + # TODO (SERVER-36492): Unblacklist when we correctly reconstruct prepared transactions after + # initial sync. + - jstests/core/txns/abort_prepared_transaction.js + - jstests/core/txns/commit_prepared_transaction_errors.js + - jstests/core/txns/empty_prepare.js + - jstests/core/txns/no_new_transactions_when_prepared_transaction_in_progress.js + - jstests/core/txns/prepare_conflict.js + - jstests/core/txns/prepare_conflict_read_concern_behavior.js + - jstests/core/txns/prepare_prepared_transaction.js + - jstests/core/txns/statement_ids_accepted.js + # TODO (SERVER-35865): Unblacklist when we also correctly write and apply 'commitTransaction' + # oplog entries, besides SERVER-36492. + - jstests/core/txns/commit_prepared_transaction.js + - jstests/core/txns/disallow_operations_on_prepared_transaction.js - jstests/core/txns/commit_prepared_transaction.js - jstests/core/txns/disallow_operations_on_prepared_transaction.js - jstests/core/txns/ensure_active_txn_for_prepare_transaction.js - - jstests/core/txns/timestamped_reads_wait_for_prepare_oplog_visibility.js - jstests/core/txns/prepare_requires_fcv42.js + - jstests/core/txns/timestamped_reads_wait_for_prepare_oplog_visibility.js run_hook_interval: &run_hook_interval 20 executor: diff --git a/buildscripts/resmokeconfig/suites/replica_sets_kill_secondaries_jscore_passthrough.yml b/buildscripts/resmokeconfig/suites/replica_sets_kill_secondaries_jscore_passthrough.yml index b20eb68ce54..77576a782d1 100644 --- a/buildscripts/resmokeconfig/suites/replica_sets_kill_secondaries_jscore_passthrough.yml +++ b/buildscripts/resmokeconfig/suites/replica_sets_kill_secondaries_jscore_passthrough.yml @@ -14,13 +14,23 @@ selector: # true. This causes the test to hang because the secondary is running with the "rsSyncStopApply" # failpoint enabled. - jstests/core/geo_update_btree.js - # TODO (SERVER-35865): Unblacklist when we correctly write and apply 'commitTransaction' oplog - # entries. + # TODO (SERVER-35879): Unblacklist when we correctly reconstruct prepared transactions on + # startup. + - jstests/core/txns/abort_prepared_transaction.js + - jstests/core/txns/commit_prepared_transaction_errors.js + - jstests/core/txns/empty_prepare.js + - jstests/core/txns/no_new_transactions_when_prepared_transaction_in_progress.js + - jstests/core/txns/prepare_conflict.js + - jstests/core/txns/prepare_conflict_read_concern_behavior.js + - jstests/core/txns/prepare_prepared_transaction.js + - jstests/core/txns/statement_ids_accepted.js + # TODO (SERVER-35865): Unblacklist when we also correctly write and apply 'commitTransaction' + # oplog entries, besides SERVER-35879. - jstests/core/txns/commit_prepared_transaction.js - jstests/core/txns/disallow_operations_on_prepared_transaction.js - jstests/core/txns/ensure_active_txn_for_prepare_transaction.js - - jstests/core/txns/timestamped_reads_wait_for_prepare_oplog_visibility.js - jstests/core/txns/prepare_requires_fcv42.js + - jstests/core/txns/timestamped_reads_wait_for_prepare_oplog_visibility.js executor: archive: diff --git a/src/mongo/db/commands/apply_ops_cmd.cpp b/src/mongo/db/commands/apply_ops_cmd.cpp index 599f3a52902..b6dabaabe58 100644 --- a/src/mongo/db/commands/apply_ops_cmd.cpp +++ b/src/mongo/db/commands/apply_ops_cmd.cpp @@ -262,7 +262,7 @@ public: } auto applyOpsStatus = CommandHelpers::appendCommandStatusNoThrow( - result, repl::applyOps(opCtx, dbname, cmdObj, oplogApplicationMode, &result)); + result, repl::applyOps(opCtx, dbname, cmdObj, oplogApplicationMode, {}, &result)); return applyOpsStatus; } diff --git a/src/mongo/db/op_observer_impl.cpp b/src/mongo/db/op_observer_impl.cpp index df2c4418f59..99b4034c52f 100644 --- a/src/mongo/db/op_observer_impl.cpp +++ b/src/mongo/db/op_observer_impl.cpp @@ -940,6 +940,10 @@ OpTimeBundle logApplyOpsForTransaction(OperationContext* opCtx, try { // We are only given an oplog slot for prepared transactions. auto prepare = !prepareOplogSlot.opTime.isNull(); + if (prepare) { + // TODO: SERVER-36814 Remove "prepare" field on applyOps. + applyOpsBuilder.append("prepare", true); + } auto applyOpCmd = applyOpsBuilder.done(); auto times = replLogApplyOps( opCtx, cmdNss, applyOpCmd, sessionInfo, stmtId, oplogLink, prepare, prepareOplogSlot); @@ -1014,6 +1018,11 @@ void OpObserverImpl::onTransactionPrepare(OperationContext* opCtx, const OplogSl invariant(!prepareOpTime.opTime.isNull()); auto stmts = txnParticipant->endTransactionAndRetrieveOperations(opCtx); + // Don't write oplog entry on secondaries. + if (!opCtx->writesAreReplicated()) { + return; + } + // We write the oplog entry in a side transaction so that we do not commit the now-prepared // transaction. // We write an empty 'applyOps' entry if there were no writes to choose a prepare timestamp @@ -1035,6 +1044,10 @@ void OpObserverImpl::onTransactionPrepare(OperationContext* opCtx, const OplogSl } void OpObserverImpl::onTransactionAbort(OperationContext* opCtx) { + if (!opCtx->writesAreReplicated()) { + return; + } + invariant(opCtx->getTxnNumber()); Session* const session = OperationContextSession::get(opCtx); invariant(session); diff --git a/src/mongo/db/op_observer_impl_test.cpp b/src/mongo/db/op_observer_impl_test.cpp index 0d2ca89c7b2..f337a92ae9e 100644 --- a/src/mongo/db/op_observer_impl_test.cpp +++ b/src/mongo/db/op_observer_impl_test.cpp @@ -638,7 +638,9 @@ TEST_F(OpObserverTransactionTest, TransactionalPrepareTest) { << "ui" << uuid1 << "o" - << BSON("_id" << 0)))); + << BSON("_id" << 0))) + << "prepare" + << true); ASSERT_BSONOBJ_EQ(oExpected, o); ASSERT(oplogEntry.getPrepare()); ASSERT(oplogEntry.getPrepare().get()); @@ -699,7 +701,9 @@ TEST_F(OpObserverTransactionTest, TransactionalPreparedAbortTest) { << "ui" << uuid << "o" - << doc))); + << doc)) + << "prepare" + << true); ASSERT_BSONOBJ_EQ(oExpected, o); ASSERT(oplogEntry.getPrepare()); } @@ -757,7 +761,7 @@ TEST_F(OpObserverTransactionTest, PreparingEmptyTransactionLogsEmptyApplyOps) { checkCommonFields(oplogEntryObj); OplogEntry oplogEntry = assertGet(OplogEntry::parse(oplogEntryObj)); auto o = oplogEntry.getObject(); - auto oExpected = BSON("applyOps" << BSONArray()); + auto oExpected = BSON("applyOps" << BSONArray() << "prepare" << true); ASSERT_BSONOBJ_EQ(oExpected, o); ASSERT(oplogEntry.getPrepare()); ASSERT(oplogEntry.getPrepare().get()); diff --git a/src/mongo/db/operation_context_session_mongod.cpp b/src/mongo/db/operation_context_session_mongod.cpp index eae70542f0b..c2163b89bff 100644 --- a/src/mongo/db/operation_context_session_mongod.cpp +++ b/src/mongo/db/operation_context_session_mongod.cpp @@ -67,4 +67,21 @@ OperationContextSessionMongod::OperationContextSessionMongod(OperationContext* o } } +OperationContextSessionMongodWithoutRefresh::OperationContextSessionMongodWithoutRefresh( + OperationContext* opCtx) + : _operationContextSession(opCtx, true /* checkout */) { + invariant(!opCtx->getClient()->isInDirectClient()); + auto session = OperationContextSession::get(opCtx); + invariant(session); + + auto clientTxnNumber = *opCtx->getTxnNumber(); + // Session is refreshed, but the transaction participant isn't. + session->refreshFromStorageIfNeeded(opCtx); + session->beginOrContinueTxn(opCtx, clientTxnNumber); + + auto txnParticipant = TransactionParticipant::get(opCtx); + invariant(txnParticipant); + txnParticipant->beginTransactionUnconditionally(clientTxnNumber); +} + } // namespace mongo diff --git a/src/mongo/db/operation_context_session_mongod.h b/src/mongo/db/operation_context_session_mongod.h index a2f7e0fa78a..9c0a13e4e0e 100644 --- a/src/mongo/db/operation_context_session_mongod.h +++ b/src/mongo/db/operation_context_session_mongod.h @@ -53,4 +53,19 @@ private: OperationContextSession _operationContextSession; }; +/** + * Similar to OperationContextSessionMongod, but this starts a new transaction unconditionally + * without refreshing the state from disk. The session reloads the state from disk but + * the transaction participant will not use the on-disk state to refresh its in-memory state. + * + * This is used for transaction secondary application and recovery. + */ +class OperationContextSessionMongodWithoutRefresh { +public: + OperationContextSessionMongodWithoutRefresh(OperationContext* opCtx); + +private: + OperationContextSession _operationContextSession; +}; + } // namespace mongo diff --git a/src/mongo/db/repl/apply_ops.cpp b/src/mongo/db/repl/apply_ops.cpp index 8f3f25b8514..ee458fa7cb3 100644 --- a/src/mongo/db/repl/apply_ops.cpp +++ b/src/mongo/db/repl/apply_ops.cpp @@ -50,6 +50,7 @@ #include "mongo/db/query/collation/collation_spec.h" #include "mongo/db/repl/replication_coordinator.h" #include "mongo/db/service_context.h" +#include "mongo/db/transaction_participant.h" #include "mongo/rpc/get_status_from_command_result.h" #include "mongo/util/fail_point_service.h" #include "mongo/util/log.h" @@ -259,6 +260,40 @@ Status _applyOps(OperationContext* opCtx, return Status::OK(); } +Status _applyPrepareTransaction(OperationContext* opCtx, + const std::string& dbName, + const BSONObj& applyOpCmd, + const ApplyOpsCommandInfo& info, + repl::OplogApplication::Mode oplogApplicationMode, + BSONObjBuilder* result, + int* numApplied, + BSONArrayBuilder* opsBuilder, + const OpTime& optime) { + // Only run on secondary. + uassert(50945, + "applyOps with prepared flag is only used internally by secondaries.", + oplogApplicationMode == repl::OplogApplication::Mode::kSecondary); + uassert( + 50946, + "applyOps with prepared must only include CRUD operations and cannot have precondition.", + !info.getPreCondition() && info.areOpsCrudOnly()); + + // Session has been checked out by sync_tail. + auto transaction = TransactionParticipant::get(opCtx); + invariant(transaction); + + transaction->unstashTransactionResources(opCtx, "prepareTransaction"); + + // Abort transaction unconditionally for now. + // TODO: SERVER-35875 / SERVER-35877 Abort or commit transactions on secondaries accordingly. + ScopeGuard abortGuard = MakeGuard([&] { transaction->abortActiveTransaction(opCtx); }); + + _applyOps( + opCtx, dbName, applyOpCmd, info, oplogApplicationMode, result, numApplied, opsBuilder); + transaction->prepareTransaction(opCtx, optime); + return Status::OK(); +} + Status _checkPrecondition(OperationContext* opCtx, const std::vector<BSONObj>& preConditions, BSONObjBuilder* result) { @@ -340,6 +375,7 @@ Status applyOps(OperationContext* opCtx, const std::string& dbName, const BSONObj& applyOpCmd, repl::OplogApplication::Mode oplogApplicationMode, + boost::optional<OpTime> optime, BSONObjBuilder* result) { auto info = ApplyOpsCommandInfo::parse(applyOpCmd); @@ -347,8 +383,9 @@ Status applyOps(OperationContext* opCtx, boost::optional<Lock::DBLock> dbWriteLock; // There's only one case where we are allowed to take the database lock instead of the global - // lock - no preconditions; only CRUD ops; and non-atomic mode. - if (!info.getPreCondition() && info.areOpsCrudOnly() && !info.getAllowAtomic()) { + // lock - no preconditions; only CRUD ops; non-atomic mode; and not for transaction prepare. + if (!info.getPreCondition() && info.areOpsCrudOnly() && !info.getAllowAtomic() && + !info.getPrepare()) { dbWriteLock.emplace(opCtx, dbName, MODE_IX); } else { globalWriteLock.emplace(opCtx); @@ -371,6 +408,21 @@ Status applyOps(OperationContext* opCtx, } int numApplied = 0; + + // Apply prepare transaction operation if "prepare" is true. + if (info.getPrepare().get_value_or(false)) { + invariant(optime); + return _applyPrepareTransaction(opCtx, + dbName, + applyOpCmd, + info, + oplogApplicationMode, + result, + &numApplied, + nullptr, + *optime); + } + if (!info.isAtomic()) { return _applyOps( opCtx, dbName, applyOpCmd, info, oplogApplicationMode, result, &numApplied, nullptr); diff --git a/src/mongo/db/repl/apply_ops.h b/src/mongo/db/repl/apply_ops.h index 45400998c9e..6c53fccad49 100644 --- a/src/mongo/db/repl/apply_ops.h +++ b/src/mongo/db/repl/apply_ops.h @@ -89,11 +89,14 @@ private: * the given command object. This function may be called as part of a direct user invocation of the * 'applyOps' command, or as part of the application of an 'applyOps' oplog operation. In either * case, the mode can be set to determine how the internal ops are executed. + * + * For oplog application, the optime of the oplog entry will be given as the "optime" argument. */ Status applyOps(OperationContext* opCtx, const std::string& dbName, const BSONObj& applyOpCmd, repl::OplogApplication::Mode oplogApplicationMode, + boost::optional<OpTime> optime, BSONObjBuilder* result); } // namespace repl diff --git a/src/mongo/db/repl/apply_ops.idl b/src/mongo/db/repl/apply_ops.idl index 947aa543e46..28e81a39d6f 100644 --- a/src/mongo/db/repl/apply_ops.idl +++ b/src/mongo/db/repl/apply_ops.idl @@ -66,3 +66,8 @@ structs: description: "applyOps supports checking the documents of existing collections before proceeding to execute the given operations. This flag is set to true if the 'preCondition' option is provided." + + prepare: + type: bool + optional: true + description: "Specifies that this operation should be put into a 'prepare' state" diff --git a/src/mongo/db/repl/apply_ops_test.cpp b/src/mongo/db/repl/apply_ops_test.cpp index ada452132ea..8b5179609bc 100644 --- a/src/mongo/db/repl/apply_ops_test.cpp +++ b/src/mongo/db/repl/apply_ops_test.cpp @@ -154,7 +154,7 @@ TEST_F(ApplyOpsTest, CommandInNestedApplyOpsReturnsSuccess) { << BSON("applyOps" << BSON_ARRAY(innerCmdObj))); auto cmdObj = BSON("applyOps" << BSON_ARRAY(innerApplyOpsObj)); - ASSERT_OK(applyOps(opCtx.get(), nss.db().toString(), cmdObj, mode, &resultBuilder)); + ASSERT_OK(applyOps(opCtx.get(), nss.db().toString(), cmdObj, mode, {}, &resultBuilder)); ASSERT_BSONOBJ_EQ({}, _opObserver->onApplyOpsCmdObj); } @@ -184,7 +184,7 @@ TEST_F(ApplyOpsTest, InsertInNestedApplyOpsReturnsSuccess) { auto cmdObj = BSON("applyOps" << BSON_ARRAY(innerApplyOpsObj)); ASSERT_OK(_storage->createCollection(opCtx.get(), nss, options)); - ASSERT_OK(applyOps(opCtx.get(), nss.db().toString(), cmdObj, mode, &resultBuilder)); + ASSERT_OK(applyOps(opCtx.get(), nss.db().toString(), cmdObj, mode, {}, &resultBuilder)); ASSERT_BSONOBJ_EQ(BSON("applyOps" << BSON_ARRAY(innerCmdObj)), _opObserver->onApplyOpsCmdObj); } @@ -193,7 +193,7 @@ TEST_F(ApplyOpsTest, AtomicApplyOpsWithNoOpsReturnsSuccess) { auto mode = OplogApplication::Mode::kApplyOpsCmd; BSONObjBuilder resultBuilder; auto cmdObj = BSON("applyOps" << BSONArray()); - ASSERT_OK(applyOps(opCtx.get(), "test", cmdObj, mode, &resultBuilder)); + ASSERT_OK(applyOps(opCtx.get(), "test", cmdObj, mode, {}, &resultBuilder)); ASSERT_BSONOBJ_EQ(cmdObj, _opObserver->onApplyOpsCmdObj); } @@ -229,7 +229,7 @@ TEST_F(ApplyOpsTest, auto cmdObj = makeApplyOpsWithInsertOperation(nss, boost::none, documentToInsert); BSONObjBuilder resultBuilder; ASSERT_EQUALS(ErrorCodes::NamespaceNotFound, - applyOps(opCtx.get(), "test", cmdObj, mode, &resultBuilder)); + applyOps(opCtx.get(), "test", cmdObj, mode, {}, &resultBuilder)); auto result = resultBuilder.obj(); auto status = getStatusFromApplyOpsResult(result); ASSERT_EQUALS(ErrorCodes::NamespaceNotFound, status); @@ -250,7 +250,7 @@ TEST_F(ApplyOpsTest, AtomicApplyOpsInsertWithUuidIntoCollectionWithUuid) { auto documentToInsert = BSON("_id" << 0); auto cmdObj = makeApplyOpsWithInsertOperation(nss, uuid, documentToInsert); BSONObjBuilder resultBuilder; - ASSERT_OK(applyOps(opCtx.get(), "test", cmdObj, mode, &resultBuilder)); + ASSERT_OK(applyOps(opCtx.get(), "test", cmdObj, mode, {}, &resultBuilder)); ASSERT_BSONOBJ_EQ(cmdObj, _opObserver->onApplyOpsCmdObj); } @@ -273,7 +273,7 @@ TEST_F(ApplyOpsTest, AtomicApplyOpsInsertWithUuidIntoCollectionWithOtherUuid) { auto cmdObj = makeApplyOpsWithInsertOperation(nss, applyOpsUuid, documentToInsert); BSONObjBuilder resultBuilder; ASSERT_EQUALS(ErrorCodes::NamespaceNotFound, - applyOps(opCtx.get(), "test", cmdObj, mode, &resultBuilder)); + applyOps(opCtx.get(), "test", cmdObj, mode, {}, &resultBuilder)); auto result = resultBuilder.obj(); auto status = getStatusFromApplyOpsResult(result); ASSERT_EQUALS(ErrorCodes::NamespaceNotFound, status); @@ -293,7 +293,7 @@ TEST_F(ApplyOpsTest, AtomicApplyOpsInsertWithoutUuidIntoCollectionWithUuid) { auto documentToInsert = BSON("_id" << 0); auto cmdObj = makeApplyOpsWithInsertOperation(nss, boost::none, documentToInsert); BSONObjBuilder resultBuilder; - ASSERT_OK(applyOps(opCtx.get(), "test", cmdObj, mode, &resultBuilder)); + ASSERT_OK(applyOps(opCtx.get(), "test", cmdObj, mode, {}, &resultBuilder)); // Insert operation provided by caller did not contain collection uuid but applyOps() should add // the uuid to the oplog entry. @@ -330,6 +330,7 @@ TEST_F(ApplyOpsTest, ApplyOpsPropagatesOplogApplicationMode) { nss.coll().toString(), cmdObj, OplogApplication::Mode::kInitialSync, + {}, &resultBuilder)); ASSERT_EQUALS(1, countLogLinesContaining("oplog application mode: InitialSync")); @@ -340,6 +341,7 @@ TEST_F(ApplyOpsTest, ApplyOpsPropagatesOplogApplicationMode) { nss.coll().toString(), cmdObj, OplogApplication::Mode::kSecondary, + {}, &resultBuilder)); ASSERT_EQUALS(1, countLogLinesContaining("oplog application mode: Secondary")); @@ -502,7 +504,7 @@ TEST_F(ApplyOpsTest, ApplyOpsFailsToDropAdmin) { auto dropDatabaseCmdObj = BSON("applyOps" << BSON_ARRAY(dropDatabaseOp)); BSONObjBuilder resultBuilder; auto status = - applyOps(opCtx.get(), nss.db().toString(), dropDatabaseCmdObj, mode, &resultBuilder); + applyOps(opCtx.get(), nss.db().toString(), dropDatabaseCmdObj, mode, {}, &resultBuilder); ASSERT_EQUALS(ErrorCodes::IllegalOperation, status); } diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp index 247608b2a6a..b49297eb3e4 100644 --- a/src/mongo/db/repl/oplog.cpp +++ b/src/mongo/db/repl/oplog.cpp @@ -963,7 +963,7 @@ std::map<std::string, ApplyOpMetadata> opsMap = { const OpTime& opTime, OplogApplication::Mode mode) -> Status { BSONObjBuilder resultWeDontCareAbout; - return applyOps(opCtx, nsToDatabase(ns), cmd, mode, &resultWeDontCareAbout); + return applyOps(opCtx, nsToDatabase(ns), cmd, mode, opTime, &resultWeDontCareAbout); }}}, {"convertToCapped", {[](OperationContext* opCtx, @@ -985,14 +985,20 @@ std::map<std::string, ApplyOpMetadata> opsMap = { return emptyCapped(opCtx, parseUUIDorNs(opCtx, ns, ui, cmd)); }, {ErrorCodes::NamespaceNotFound}}}, + {"commitTransaction", + {[](OperationContext* opCtx, + const char* ns, + const BSONElement& ui, + BSONObj& cmd, + const OpTime& opTime, + OplogApplication::Mode mode) -> Status { return Status::OK(); }}}, {"abortTransaction", {[](OperationContext* opCtx, const char* ns, const BSONElement& ui, BSONObj& cmd, const OpTime& opTime, - OplogApplication::Mode mode) -> Status { return Status::OK(); }, - {}}}, + OplogApplication::Mode mode) -> Status { return Status::OK(); }}}, }; } // namespace @@ -1561,30 +1567,32 @@ Status applyCommand_inlock(OperationContext* opCtx, } } - const bool assignCommandTimestamp = [opCtx, mode] { + const bool assignCommandTimestamp = [opCtx, mode, &op] { const auto replMode = ReplicationCoordinator::get(opCtx)->getReplicationMode(); if (opCtx->writesAreReplicated()) { // We do not assign timestamps on replicated writes since they will get their oplog // timestamp once they are logged. return false; - } else { - switch (replMode) { - case ReplicationCoordinator::modeReplSet: { - // The 'applyOps' command never logs 'applyOps' oplog entries with nested - // command operations, so this code will never be run from inside the 'applyOps' - // command on secondaries. Thus, the timestamps in the command oplog - // entries are always real timestamps from this oplog and we should - // timestamp our writes with them. - return true; - } - case ReplicationCoordinator::modeNone: { - // Only assign timestamps on standalones during replication recovery when - // started with 'recoverFromOplogAsStandalone'. - return mode == OplogApplication::Mode::kRecovering; - } + } + + switch (replMode) { + case ReplicationCoordinator::modeReplSet: { + // The 'applyOps' command never logs 'applyOps' oplog entries with nested + // command operations, so this code will never be run from inside the 'applyOps' + // command on secondaries. Thus, the timestamps in the command oplog + // entries are always real timestamps from this oplog and we should + // timestamp our writes with them. + // + // However, if "prepare" is specified, don't assign commit timestamp. + return !op.getBoolField("prepare"); + } + case ReplicationCoordinator::modeNone: { + // Only assign timestamps on standalones during replication recovery when + // started with 'recoverFromOplogAsStandalone'. + return mode == OplogApplication::Mode::kRecovering; } - MONGO_UNREACHABLE; } + MONGO_UNREACHABLE; }(); invariant(!assignCommandTimestamp || !opTime.isNull(), str::stream() << "Oplog entry did not have 'ts' field when expected: " << redact(op)); diff --git a/src/mongo/db/repl/oplog_entry.cpp b/src/mongo/db/repl/oplog_entry.cpp index ff2ffecb51d..cf7838a7f19 100644 --- a/src/mongo/db/repl/oplog_entry.cpp +++ b/src/mongo/db/repl/oplog_entry.cpp @@ -262,7 +262,6 @@ bool OplogEntry::isCrudOpType() const { } bool OplogEntry::shouldPrepare() const { - invariant(getCommandType() == OplogEntry::CommandType::kApplyOps); return getPrepare() && *getPrepare(); } diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp index ae3717efbca..5c57adde2c5 100644 --- a/src/mongo/db/repl/sync_tail.cpp +++ b/src/mongo/db/repl/sync_tail.cpp @@ -55,6 +55,7 @@ #include "mongo/db/logical_session_id.h" #include "mongo/db/multi_key_path_tracker.h" #include "mongo/db/namespace_string.h" +#include "mongo/db/operation_context_session_mongod.h" #include "mongo/db/query/query_knobs.h" #include "mongo/db/repl/applier_helpers.h" #include "mongo/db/repl/apply_ops.h" @@ -534,11 +535,8 @@ void fillWriterVectors(OperationContext* opCtx, // Extract applyOps operations and fill writers with extracted operations using this // function. - if (op.isCommand() && op.getCommandType() == OplogEntry::CommandType::kApplyOps) { - if (op.shouldPrepare()) { - // TODO (SERVER-35307) mark operations as needing prepare. - continue; - } + if (op.isCommand() && op.getCommandType() == OplogEntry::CommandType::kApplyOps && + !op.shouldPrepare()) { try { derivedOps->emplace_back(ApplyOps::extractOperations(op)); @@ -930,7 +928,8 @@ bool SyncTail::tryPopAndWaitForMore(OperationContext* opCtx, // Oplog entries on 'system.views' should also be processed one at a time. View catalog // immediately reflects changes for each oplog entry so we can see inconsistent view catalog if // multiple oplog entries on 'system.views' are being applied out of the original order. - if ((entry.isCommand() && entry.getCommandType() != OplogEntry::CommandType::kApplyOps) || + if ((entry.isCommand() && + (entry.getCommandType() != OplogEntry::CommandType::kApplyOps || entry.shouldPrepare())) || entry.getNamespace().isSystemDotViews()) { if (ops->getCount() == 1) { // apply commands one-at-a-time @@ -1165,6 +1164,22 @@ Status multiSyncApply(OperationContext* opCtx, // If we didn't create a group, try to apply the op individually. try { + // The write on transaction table may be applied concurrently, so refreshing state + // from disk may read that write, causing starting a new transaction on an existing + // txnNumber. Thus, we start a new transaction without refreshing state from disk. + boost::optional<OperationContextSessionMongodWithoutRefresh> sessionTxnState; + if (entry.shouldPrepare()) { + // Prepare transaction is in its own batch. We cannot modify the opCtx for other + // ops. + // The update on transaction table may be scheduled to the same writer. + invariant(ops->size() <= 2); + invariant(entry.getSessionId()); + invariant(entry.getTxnNumber()); + opCtx->setLogicalSessionId(*entry.getSessionId()); + opCtx->setTxnNumber(*entry.getTxnNumber()); + // Check out the session, with autoCommit = false and startMultiDocTxn = true. + sessionTxnState.emplace(opCtx); + } const Status status = SyncTail::syncApply(opCtx, entry.raw, oplogApplicationMode); if (!status.isOK()) { diff --git a/src/mongo/db/s/txn_two_phase_commit_cmds.cpp b/src/mongo/db/s/txn_two_phase_commit_cmds.cpp index 14d61155242..24e11e04137 100644 --- a/src/mongo/db/s/txn_two_phase_commit_cmds.cpp +++ b/src/mongo/db/s/txn_two_phase_commit_cmds.cpp @@ -106,7 +106,7 @@ public: } // Add prepareTimestamp to the command response. - auto timestamp = txnParticipant->prepareTransaction(opCtx); + auto timestamp = txnParticipant->prepareTransaction(opCtx, {}); result.append("prepareTimestamp", timestamp); return true; @@ -273,7 +273,7 @@ public: txnParticipant->abortActiveUnpreparedOrStashPreparedTransaction(opCtx); }); - txnParticipant->prepareTransaction(opCtx); + txnParticipant->prepareTransaction(opCtx, {}); txnParticipant->stashTransactionResources(opCtx); guard.Dismiss(); diff --git a/src/mongo/db/transaction_participant.cpp b/src/mongo/db/transaction_participant.cpp index 1d9fd5e4321..7755e0b1af0 100644 --- a/src/mongo/db/transaction_participant.cpp +++ b/src/mongo/db/transaction_participant.cpp @@ -178,31 +178,6 @@ void TransactionParticipant::_continueMultiDocumentTransaction(WithLock wl, TxnN } void TransactionParticipant::_beginMultiDocumentTransaction(WithLock wl, TxnNumber txnNumber) { - // Servers in a sharded cluster can start a new transaction at the active transaction number to - // allow internal retries by routers on re-targeting errors, like StaleShardVersion or - // SnapshotTooOld. - if (txnNumber == _activeTxnNumber) { - uassert(ErrorCodes::ConflictingOperationInProgress, - "Only servers in a sharded cluster can start a new transaction at the active " - "transaction number", - serverGlobalParams.clusterRole != ClusterRole::None); - - // The active transaction number can only be reused if the transaction is not in a state - // that indicates it has been involved in a two phase commit. In normal operation this check - // should never fail. - // - // TODO SERVER-36639: Ensure the active transaction number cannot be reused if the - // transaction is in the abort after prepare state (or any state indicating the participant - // has been involved in a two phase commit). - const auto restartableStates = TransactionState::kInProgress | TransactionState::kAborted; - uassert(50911, - str::stream() << "Cannot start a transaction at given transaction number " - << txnNumber - << " a transaction with the same number is in state " - << _txnState.toString(), - _txnState.isInSet(wl, restartableStates)); - } - // Aborts any in-progress txns. _setNewTxnNumber(wl, txnNumber); _autoCommit = false; @@ -257,6 +232,36 @@ void TransactionParticipant::beginOrContinue(TxnNumber txnNumber, // earlier when parsing the request. invariant(*startTransaction); + // Servers in a sharded cluster can start a new transaction at the active transaction number to + // allow internal retries by routers on re-targeting errors, like StaleShardVersion or + // SnapshotTooOld. + if (txnNumber == _activeTxnNumber) { + uassert(ErrorCodes::ConflictingOperationInProgress, + "Only servers in a sharded cluster can start a new transaction at the active " + "transaction number", + serverGlobalParams.clusterRole != ClusterRole::None); + + // The active transaction number can only be reused if the transaction is not in a state + // that indicates it has been involved in a two phase commit. In normal operation this check + // should never fail. + // + // TODO SERVER-36639: Ensure the active transaction number cannot be reused if the + // transaction is in the abort after prepare state (or any state indicating the participant + // has been involved in a two phase commit). + const auto restartableStates = TransactionState::kInProgress | TransactionState::kAborted; + uassert(50911, + str::stream() << "Cannot start a transaction at given transaction number " + << txnNumber + << " a transaction with the same number is in state " + << _txnState.toString(), + _txnState.isInSet(lg, restartableStates)); + } + + _beginMultiDocumentTransaction(lg, txnNumber); +} + +void TransactionParticipant::beginTransactionUnconditionally(TxnNumber txnNumber) { + stdx::lock_guard<stdx::mutex> lg(_mutex); _beginMultiDocumentTransaction(lg, txnNumber); } @@ -522,7 +527,8 @@ void TransactionParticipant::unstashTransactionResources(OperationContext* opCtx } } -Timestamp TransactionParticipant::prepareTransaction(OperationContext* opCtx) { +Timestamp TransactionParticipant::prepareTransaction(OperationContext* opCtx, + boost::optional<repl::OpTime> prepareOptime) { stdx::unique_lock<stdx::mutex> lk(_mutex); // Always check session's txnNumber and '_txnState', since they can be modified by // session kill and migration, which do not check out the session. @@ -534,6 +540,9 @@ Timestamp TransactionParticipant::prepareTransaction(OperationContext* opCtx) { "cannot change transaction number while the session has a prepared transaction"}); ScopeGuard abortGuard = MakeGuard([&] { + // Prepare transaction on secondaries should always succeed. + invariant(!prepareOptime); + if (lk.owns_lock()) { lk.unlock(); } @@ -542,36 +551,41 @@ Timestamp TransactionParticipant::prepareTransaction(OperationContext* opCtx) { _txnState.transitionTo(lk, TransactionState::kPrepared); - // Reserve an optime for the 'prepareTimestamp'. This will create a hole in the oplog and cause - // 'snapshot' and 'afterClusterTime' readers to block until this transaction is done being - // prepared. When the OplogSlotReserver goes out of scope and is destroyed, the - // storage-transaction it uses to keep the hole open will abort and the slot (and corresponding - // oplog hole) will vanish. - OplogSlotReserver oplogSlotReserver(opCtx); - const auto prepareOplogSlot = oplogSlotReserver.getReservedOplogSlot(); - const auto prepareTimestamp = prepareOplogSlot.opTime.getTimestamp(); - invariant(_prepareOpTime.isNull(), - str::stream() << "This transaction has already reserved a prepareOpTime at: " - << _prepareOpTime.toString()); - _prepareOpTime = prepareOplogSlot.opTime; - - if (MONGO_FAIL_POINT(hangAfterReservingPrepareTimestamp)) { - // This log output is used in js tests so please leave it. - log() << "transaction - hangAfterReservingPrepareTimestamp fail point " - "enabled. Blocking until fail point is disabled. Prepare OpTime: " - << prepareOplogSlot.opTime; - MONGO_FAIL_POINT_PAUSE_WHILE_SET(hangAfterReservingPrepareTimestamp); + boost::optional<OplogSlotReserver> oplogSlotReserver; + OplogSlot prepareOplogSlot; + if (prepareOptime) { + // On secondary, we just prepare the transaction and discard the buffered ops. + prepareOplogSlot = OplogSlot(*prepareOptime, 0); + } else { + // On primary, we reserve an optime, prepare the transaction and write the oplog entry. + // + // Reserve an optime for the 'prepareTimestamp'. This will create a hole in the oplog and + // cause 'snapshot' and 'afterClusterTime' readers to block until this transaction is done + // being prepared. When the OplogSlotReserver goes out of scope and is destroyed, the + // storage-transaction it uses to keep the hole open will abort and the slot (and + // corresponding oplog hole) will vanish. + oplogSlotReserver.emplace(opCtx); + prepareOplogSlot = oplogSlotReserver->getReservedOplogSlot(); + invariant(_prepareOpTime.isNull(), + str::stream() << "This transaction has already reserved a prepareOpTime at: " + << _prepareOpTime.toString()); + _prepareOpTime = prepareOplogSlot.opTime; + + if (MONGO_FAIL_POINT(hangAfterReservingPrepareTimestamp)) { + // This log output is used in js tests so please leave it. + log() << "transaction - hangAfterReservingPrepareTimestamp fail point " + "enabled. Blocking until fail point is disabled. Prepare OpTime: " + << prepareOplogSlot.opTime; + MONGO_FAIL_POINT_PAUSE_WHILE_SET(hangAfterReservingPrepareTimestamp); + } } - - opCtx->recoveryUnit()->setPrepareTimestamp(prepareTimestamp); + opCtx->recoveryUnit()->setPrepareTimestamp(prepareOplogSlot.opTime.getTimestamp()); opCtx->getWriteUnitOfWork()->prepare(); // We need to unlock the session to run the opObserver onTransactionPrepare, which calls back // into the session. lk.unlock(); - auto opObserver = opCtx->getServiceContext()->getOpObserver(); - invariant(opObserver); - opObserver->onTransactionPrepare(opCtx, prepareOplogSlot); + opCtx->getServiceContext()->getOpObserver()->onTransactionPrepare(opCtx, prepareOplogSlot); // After the oplog entry is written successfully, it is illegal to implicitly abort or fail. try { @@ -590,7 +604,7 @@ Timestamp TransactionParticipant::prepareTransaction(OperationContext* opCtx) { fassertFailedWithStatus(50906, exceptionToStatus()); } - return prepareTimestamp; + return prepareOplogSlot.opTime.getTimestamp(); } void TransactionParticipant::addTransactionOperation(OperationContext* opCtx, diff --git a/src/mongo/db/transaction_participant.h b/src/mongo/db/transaction_participant.h index a32b8d095d0..48264e4157e 100644 --- a/src/mongo/db/transaction_participant.h +++ b/src/mongo/db/transaction_participant.h @@ -184,8 +184,11 @@ public: /** * Puts a transaction into a prepared state and returns the prepareTimestamp. + * + * On secondary, the "prepareTimestamp" will be given in the oplog. */ - Timestamp prepareTransaction(OperationContext* opCtx); + Timestamp prepareTransaction(OperationContext* opCtx, + boost::optional<repl::OpTime> prepareOptime); /** * Returns whether we are in a multi-document transaction, which means we have an active @@ -343,6 +346,8 @@ public: boost::optional<bool> autocommit, boost::optional<bool> startTransaction); + void beginTransactionUnconditionally(TxnNumber txnNumber); + static Status isValid(StringData dbName, StringData cmdName); void transitionToPreparedforTest() { diff --git a/src/mongo/db/transaction_participant_test.cpp b/src/mongo/db/transaction_participant_test.cpp index 7680c056a47..e3a64a45cdc 100644 --- a/src/mongo/db/transaction_participant_test.cpp +++ b/src/mongo/db/transaction_participant_test.cpp @@ -644,7 +644,7 @@ TEST_F(TxnParticipantTest, CommitTransactionSetsCommitTimestampOnPreparedTransac // The transaction machinery cannot store an empty locker. Lock::GlobalLock lk(opCtx(), MODE_IX, Date_t::now(), Lock::InterruptBehavior::kThrow); - txnParticipant->prepareTransaction(opCtx()); + txnParticipant->prepareTransaction(opCtx(), {}); txnParticipant->commitPreparedTransaction(opCtx(), commitTimestamp); ASSERT_EQ(commitTimestamp, actualCommitTimestamp); @@ -704,7 +704,7 @@ TEST_F(TxnParticipantTest, CommitTransactionWithoutCommitTimestampFailsOnPrepare // The transaction machinery cannot store an empty locker. Lock::GlobalLock lk(opCtx(), MODE_IX, Date_t::now(), Lock::InterruptBehavior::kThrow); - txnParticipant->prepareTransaction(opCtx()); + txnParticipant->prepareTransaction(opCtx(), {}); ASSERT_THROWS_CODE(txnParticipant->commitUnpreparedTransaction(opCtx()), AssertionException, ErrorCodes::InvalidOptions); @@ -718,7 +718,7 @@ TEST_F(TxnParticipantTest, CommitTransactionWithNullCommitTimestampFailsOnPrepar // The transaction machinery cannot store an empty locker. Lock::GlobalLock lk(opCtx(), MODE_IX, Date_t::now(), Lock::InterruptBehavior::kThrow); - txnParticipant->prepareTransaction(opCtx()); + txnParticipant->prepareTransaction(opCtx(), {}); ASSERT_THROWS_CODE(txnParticipant->commitPreparedTransaction(opCtx(), Timestamp()), AssertionException, ErrorCodes::InvalidOptions); @@ -933,7 +933,7 @@ TEST_F(TxnParticipantTest, ConcurrencyOfActivePreparedAbortAndArbitraryAbort) { txnParticipant->unstashTransactionResources(opCtx(), "insert"); ASSERT(txnParticipant->inMultiDocumentTransaction()); - txnParticipant->prepareTransaction(opCtx()); + txnParticipant->prepareTransaction(opCtx(), {}); // The transaction may be aborted without checking out the txnParticipant. txnParticipant->abortArbitraryTransaction(); @@ -956,7 +956,7 @@ TEST_F(TxnParticipantTest, ConcurrencyOfPrepareTransactionAndAbort) { ASSERT(txnParticipant->transactionIsAborted()); // A prepareTransaction() after an abort should uassert. - ASSERT_THROWS_CODE(txnParticipant->prepareTransaction(opCtx()), + ASSERT_THROWS_CODE(txnParticipant->prepareTransaction(opCtx(), {}), AssertionException, ErrorCodes::NoSuchTransaction); ASSERT_FALSE(_opObserver->transactionPrepared); @@ -983,7 +983,7 @@ TEST_F(TxnParticipantTest, KillSessionsDuringPrepareDoesNotAbortTransaction) { }; // Check that prepareTimestamp gets set. - auto prepareTimestamp = txnParticipant->prepareTransaction(opCtx()); + auto prepareTimestamp = txnParticipant->prepareTransaction(opCtx(), {}); ASSERT_EQ(ruPrepareTimestamp, prepareTimestamp); ASSERT(_opObserver->transactionPrepared); ASSERT_FALSE(txnParticipant->transactionIsAborted()); @@ -1003,7 +1003,7 @@ DEATH_TEST_F(TxnParticipantTest, AbortDuringPrepareIsFatal, "Invariant") { ASSERT(txnParticipant->transactionIsAborted()); }; - txnParticipant->prepareTransaction(opCtx()); + txnParticipant->prepareTransaction(opCtx(), {}); } TEST_F(TxnParticipantTest, ThrowDuringOnTransactionPrepareAbortsTransaction) { @@ -1014,7 +1014,7 @@ TEST_F(TxnParticipantTest, ThrowDuringOnTransactionPrepareAbortsTransaction) { _opObserver->onTransactionPrepareThrowsException = true; - ASSERT_THROWS_CODE(txnParticipant->prepareTransaction(opCtx()), + ASSERT_THROWS_CODE(txnParticipant->prepareTransaction(opCtx(), {}), AssertionException, ErrorCodes::OperationFailed); ASSERT_FALSE(_opObserver->transactionPrepared); @@ -1037,7 +1037,7 @@ TEST_F(TxnParticipantTest, KillSessionsDuringPreparedCommitDoesNotAbortTransacti ASSERT_FALSE(txnParticipant->transactionIsAborted()); }; - txnParticipant->prepareTransaction(opCtx()); + txnParticipant->prepareTransaction(opCtx(), {}); txnParticipant->commitPreparedTransaction(opCtx(), commitTimestamp); ASSERT(_opObserver->transactionCommitted); @@ -1063,7 +1063,7 @@ TEST_F(TxnParticipantTest, AbortDuringPreparedCommitDoesNotAbortTransaction) { ASSERT_FALSE(txnParticipant->transactionIsAborted()); }; - txnParticipant->prepareTransaction(opCtx()); + txnParticipant->prepareTransaction(opCtx(), {}); txnParticipant->commitPreparedTransaction(opCtx(), commitTimestamp); ASSERT(_opObserver->transactionCommitted); @@ -1079,7 +1079,7 @@ TEST_F(TxnParticipantTest, ThrowDuringPreparedOnTransactionCommitDoesNothing) { const auto commitTimestamp = Timestamp(1, 1); _opObserver->onTransactionCommitThrowsException = true; - txnParticipant->prepareTransaction(opCtx()); + txnParticipant->prepareTransaction(opCtx(), {}); ASSERT_THROWS_CODE(txnParticipant->commitPreparedTransaction(opCtx(), commitTimestamp), AssertionException, @@ -1141,7 +1141,7 @@ TEST_F(TxnParticipantTest, ConcurrencyOfPrepareTransactionAndMigration) { // A prepareTransaction() after a migration that bumps the active transaction number should // uassert. - ASSERT_THROWS_CODE(txnParticipant->prepareTransaction(opCtx()), + ASSERT_THROWS_CODE(txnParticipant->prepareTransaction(opCtx(), {}), AssertionException, ErrorCodes::ConflictingOperationInProgress); ASSERT_FALSE(_opObserver->transactionPrepared); @@ -1169,7 +1169,7 @@ TEST_F(TxnParticipantTest, KillSessionsDoesNotAbortPreparedTransactions) { }; // Check that prepareTimestamp gets set. - auto prepareTimestamp = txnParticipant->prepareTransaction(opCtx()); + auto prepareTimestamp = txnParticipant->prepareTransaction(opCtx(), {}); ASSERT_EQ(ruPrepareTimestamp, prepareTimestamp); txnParticipant->stashTransactionResources(opCtx()); @@ -1193,7 +1193,7 @@ TEST_F(TxnParticipantTest, TransactionTimeoutDoesNotAbortPreparedTransactions) { }; // Check that prepareTimestamp gets set. - auto prepareTimestamp = txnParticipant->prepareTransaction(opCtx()); + auto prepareTimestamp = txnParticipant->prepareTransaction(opCtx(), {}); ASSERT_EQ(ruPrepareTimestamp, prepareTimestamp); txnParticipant->stashTransactionResources(opCtx()); @@ -1218,7 +1218,7 @@ TEST_F(TxnParticipantTest, CannotStartNewTransactionWhilePreparedTransactionInPr }; // Check that prepareTimestamp gets set. - auto prepareTimestamp = txnParticipant->prepareTransaction(opCtx()); + auto prepareTimestamp = txnParticipant->prepareTransaction(opCtx(), {}); ASSERT_EQ(ruPrepareTimestamp, prepareTimestamp); txnParticipant->stashTransactionResources(opCtx()); @@ -1251,7 +1251,7 @@ TEST_F(TxnParticipantTest, CannotInsertInPreparedTransaction) { auto operation = repl::OplogEntry::makeInsertOperation(kNss, kUUID, BSON("TestValue" << 0)); txnParticipant->addTransactionOperation(opCtx(), operation); - txnParticipant->prepareTransaction(opCtx()); + txnParticipant->prepareTransaction(opCtx(), {}); ASSERT_THROWS_CODE(txnParticipant->unstashTransactionResources(opCtx(), "insert"), AssertionException, @@ -1269,7 +1269,7 @@ TEST_F(TxnParticipantTest, MigrationThrowsOnPreparedTransaction) { auto operation = repl::OplogEntry::makeInsertOperation(kNss, kUUID, BSON("TestValue" << 0)); txnParticipant->addTransactionOperation(opCtx(), operation); - txnParticipant->prepareTransaction(opCtx()); + txnParticipant->prepareTransaction(opCtx(), {}); // A migration may bump the active transaction number without checking out the session. auto higherTxnNum = *opCtx()->getTxnNumber() + 1; @@ -1289,7 +1289,7 @@ TEST_F(TxnParticipantTest, ImplictAbortDoesNotAbortPreparedTransaction) { auto operation = repl::OplogEntry::makeInsertOperation(kNss, kUUID, BSON("TestValue" << 0)); txnParticipant->addTransactionOperation(opCtx(), operation); - txnParticipant->prepareTransaction(opCtx()); + txnParticipant->prepareTransaction(opCtx(), {}); // The next command throws an exception and wants to abort the transaction. // This is a no-op. @@ -1305,7 +1305,7 @@ DEATH_TEST_F(TxnParticipantTest, AbortIsIllegalDuringCommittingPreparedTransacti txnParticipant->unstashTransactionResources(opCtx(), "insert"); auto operation = repl::OplogEntry::makeInsertOperation(kNss, kUUID, BSON("TestValue" << 0)); txnParticipant->addTransactionOperation(opCtx(), operation); - auto prepareTimestamp = txnParticipant->prepareTransaction(opCtx()); + auto prepareTimestamp = txnParticipant->prepareTransaction(opCtx(), {}); auto sessionId = *opCtx()->getLogicalSessionId(); auto txnNum = *opCtx()->getTxnNumber(); @@ -1490,7 +1490,7 @@ protected: txnParticipant->unstashTransactionResources(opCtx(), "insert"); auto operation = repl::OplogEntry::makeInsertOperation(kNss, kUUID, BSON("TestValue" << 0)); txnParticipant->addTransactionOperation(opCtx(), operation); - txnParticipant->prepareTransaction(opCtx()); + txnParticipant->prepareTransaction(opCtx(), {}); ASSERT_THROWS_CODE( txnParticipant->beginOrContinue(*opCtx()->getTxnNumber(), autocommit, startTransaction), @@ -1656,7 +1656,7 @@ TEST_F(TxnParticipantTest, KillSessionsDuringPreparedAbortFails) { OperationContextSessionMongod opCtxSession(opCtx(), true, false, true); auto txnParticipant = TransactionParticipant::get(opCtx()); txnParticipant->unstashTransactionResources(opCtx(), "abortTransaction"); - txnParticipant->prepareTransaction(opCtx()); + txnParticipant->prepareTransaction(opCtx(), {}); auto originalFn = _opObserver->onTransactionAbortFn; _opObserver->onTransactionAbortFn = [&] { @@ -1678,7 +1678,7 @@ TEST_F(TxnParticipantTest, ActiveAbortSucceedsDuringPreparedAbort) { OperationContextSessionMongod opCtxSession(opCtx(), true, false, true); auto txnParticipant = TransactionParticipant::get(opCtx()); txnParticipant->unstashTransactionResources(opCtx(), "abortTransaction"); - txnParticipant->prepareTransaction(opCtx()); + txnParticipant->prepareTransaction(opCtx(), {}); auto sessionId = *opCtx()->getLogicalSessionId(); auto txnNumber = *opCtx()->getTxnNumber(); @@ -1707,7 +1707,7 @@ TEST_F(TxnParticipantTest, ThrowDuringPreparedOnTransactionAbortIsFatal) { OperationContextSessionMongod opCtxSession(opCtx(), true, false, true); auto txnParticipant = TransactionParticipant::get(opCtx()); txnParticipant->unstashTransactionResources(opCtx(), "abortTransaction"); - txnParticipant->prepareTransaction(opCtx()); + txnParticipant->prepareTransaction(opCtx(), {}); _opObserver->onTransactionAbortThrowsException = true; diff --git a/src/mongo/dbtests/storage_timestamp_tests.cpp b/src/mongo/dbtests/storage_timestamp_tests.cpp index 553f693f7c6..62bd216202f 100644 --- a/src/mongo/dbtests/storage_timestamp_tests.cpp +++ b/src/mongo/dbtests/storage_timestamp_tests.cpp @@ -287,6 +287,7 @@ public: dbName, BSON("applyOps" << applyOpsList), repl::OplogApplication::Mode::kApplyOpsCmd, + {}, &result); if (!status.isOK()) { return status; @@ -305,6 +306,7 @@ public: dbName, BSON("applyOps" << applyOpsList << "allowAtomic" << false), repl::OplogApplication::Mode::kApplyOpsCmd, + {}, &result); if (!status.isOK()) { return status; @@ -645,6 +647,7 @@ public: << "o" << BSON("applyOps" << BSONArrayBuilder().obj())))), repl::OplogApplication::Mode::kApplyOpsCmd, + {}, &result)); } @@ -721,6 +724,7 @@ public: nss.db().toString(), fullCommand.done(), repl::OplogApplication::Mode::kApplyOpsCmd, + {}, &result)); @@ -2624,7 +2628,7 @@ public: } txnParticipant->unstashTransactionResources(_opCtx, "insert"); - txnParticipant->prepareTransaction(_opCtx); + txnParticipant->prepareTransaction(_opCtx, {}); txnParticipant->stashTransactionResources(_opCtx); { |