diff options
author | Siyuan Zhou <siyuan.zhou@mongodb.com> | 2018-10-04 17:13:37 -0400 |
---|---|---|
committer | Siyuan Zhou <siyuan.zhou@mongodb.com> | 2018-10-15 22:31:10 -0400 |
commit | 596881a262f664f95f681cc07dd2098c8a223c39 (patch) | |
tree | d577697456803986936523bba507ecacd3421aab | |
parent | 44c2055cba5221732a319c27210025ce7ccab71e (diff) | |
download | mongo-596881a262f664f95f681cc07dd2098c8a223c39.tar.gz |
SERVER-35877 Push down session checkout into secondary application of transaction oplog entries.
-rw-r--r-- | src/mongo/db/commands/apply_ops_cmd.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/repl/apply_ops.cpp | 85 | ||||
-rw-r--r-- | src/mongo/db/repl/apply_ops.h | 4 | ||||
-rw-r--r-- | src/mongo/db/repl/apply_ops_test.cpp | 18 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog.cpp | 15 | ||||
-rw-r--r-- | src/mongo/db/repl/sync_tail.cpp | 17 | ||||
-rw-r--r-- | src/mongo/db/transaction_participant.cpp | 35 | ||||
-rw-r--r-- | src/mongo/db/transaction_participant.h | 7 | ||||
-rw-r--r-- | src/mongo/dbtests/storage_timestamp_tests.cpp | 3 |
9 files changed, 107 insertions, 79 deletions
diff --git a/src/mongo/db/commands/apply_ops_cmd.cpp b/src/mongo/db/commands/apply_ops_cmd.cpp index da7f16f886e..d9ecc23b99b 100644 --- a/src/mongo/db/commands/apply_ops_cmd.cpp +++ b/src/mongo/db/commands/apply_ops_cmd.cpp @@ -260,7 +260,7 @@ public: } auto applyOpsStatus = CommandHelpers::appendCommandStatusNoThrow( - result, repl::applyOps(opCtx, dbname, cmdObj, oplogApplicationMode, {}, &result)); + result, repl::applyOps(opCtx, dbname, cmdObj, oplogApplicationMode, &result)); return applyOpsStatus; } diff --git a/src/mongo/db/repl/apply_ops.cpp b/src/mongo/db/repl/apply_ops.cpp index 41e1dc4cdfa..8f638cd6fe7 100644 --- a/src/mongo/db/repl/apply_ops.cpp +++ b/src/mongo/db/repl/apply_ops.cpp @@ -47,6 +47,7 @@ #include "mongo/db/matcher/matcher.h" #include "mongo/db/op_observer.h" #include "mongo/db/operation_context.h" +#include "mongo/db/operation_context_session_mongod.h" #include "mongo/db/query/collation/collation_spec.h" #include "mongo/db/repl/replication_coordinator.h" #include "mongo/db/service_context.h" @@ -278,14 +279,8 @@ Status _applyOps(OperationContext* opCtx, } Status _applyPrepareTransaction(OperationContext* opCtx, - const std::string& dbName, - const BSONObj& applyOpCmd, - const ApplyOpsCommandInfo& info, - repl::OplogApplication::Mode oplogApplicationMode, - BSONObjBuilder* result, - int* numApplied, - BSONArrayBuilder* opsBuilder, - const OpTime& optime) { + const repl::OplogEntry& entry, + repl::OplogApplication::Mode oplogApplicationMode) { // Wait until the end of recovery to apply the operations from the prepared transaction. if (oplogApplicationMode == OplogApplication::Mode::kRecovering) { if (!serverGlobalParams.enableMajorityReadConcern) { @@ -304,22 +299,42 @@ Status _applyPrepareTransaction(OperationContext* opCtx, // TODO: SERVER-36492 Only run on secondary until we support initial sync. invariant(oplogApplicationMode == repl::OplogApplication::Mode::kSecondary); + const auto info = ApplyOpsCommandInfo::parse(entry.getObject()); + invariant(info.getPrepare() && *info.getPrepare()); uassert( 50946, "applyOps with prepared must only include CRUD operations and cannot have precondition.", !info.getPreCondition() && info.areOpsCrudOnly()); - // Session has been checked out by sync_tail. - auto transaction = TransactionParticipant::get(opCtx); - invariant(transaction); + // Transaction operations are in its own batch, so we can modify their opCtx. + invariant(entry.getSessionId()); + invariant(entry.getTxnNumber()); + opCtx->setLogicalSessionId(*entry.getSessionId()); + opCtx->setTxnNumber(*entry.getTxnNumber()); + // The write on transaction table may be applied concurrently, so refreshing state + // from disk may read that write, causing starting a new transaction on an existing + // txnNumber. Thus, we start a new transaction without refreshing state from disk. + OperationContextSessionMongodWithoutRefresh sessionCheckout(opCtx); + auto transaction = TransactionParticipant::get(opCtx); transaction->unstashTransactionResources(opCtx, "prepareTransaction"); - auto status = _applyOps( - opCtx, dbName, applyOpCmd, info, oplogApplicationMode, result, numApplied, opsBuilder); + + // Apply the operations via applysOps functionality. + int numApplied = 0; + BSONObjBuilder resultWeDontCareAbout; + auto status = _applyOps(opCtx, + entry.getNss().db().toString(), + entry.getObject(), + info, + oplogApplicationMode, + &resultWeDontCareAbout, + &numApplied, + nullptr); if (!status.isOK()) { return status; } - transaction->prepareTransaction(opCtx, optime); + invariant(!entry.getOpTime().isNull()); + transaction->prepareTransaction(opCtx, entry.getOpTime()); transaction->stashTransactionResources(opCtx); return Status::OK(); } @@ -401,39 +416,41 @@ ApplyOpsCommandInfo::ApplyOpsCommandInfo(const BSONObj& applyOpCmd) } } +Status applyApplyOpsOplogEntry(OperationContext* opCtx, + const OplogEntry& entry, + repl::OplogApplication::Mode oplogApplicationMode) { + // Apply prepare transaction operation if "prepare" is true. + // The lock requirement of transaction operations should be the same as that on the primary, + // so we don't acquire the locks conservatively for them. + if (entry.shouldPrepare()) { + return _applyPrepareTransaction(opCtx, entry, oplogApplicationMode); + } + BSONObjBuilder resultWeDontCareAbout; + return applyOps(opCtx, + entry.getNss().db().toString(), + entry.getObject(), + oplogApplicationMode, + &resultWeDontCareAbout); +} + Status applyOps(OperationContext* opCtx, const std::string& dbName, const BSONObj& applyOpCmd, repl::OplogApplication::Mode oplogApplicationMode, - boost::optional<OpTime> optime, BSONObjBuilder* result) { auto info = ApplyOpsCommandInfo::parse(applyOpCmd); int numApplied = 0; - // Apply prepare transaction operation if "prepare" is true. - // The lock requirement of transaction operations should be the same as that on the primary, - // so we don't acquire the locks conservatively for them. - if (info.getPrepare().get_value_or(false)) { - invariant(optime); - return _applyPrepareTransaction(opCtx, - dbName, - applyOpCmd, - info, - oplogApplicationMode, - result, - &numApplied, - nullptr, - *optime); - } - boost::optional<Lock::GlobalWrite> globalWriteLock; boost::optional<Lock::DBLock> dbWriteLock; + uassert( + ErrorCodes::BadValue, "applyOps command can't have 'prepare' field", !info.getPrepare()); + // There's only one case where we are allowed to take the database lock instead of the global - // lock - no preconditions; only CRUD ops; non-atomic mode; and not for transaction prepare. - if (!info.getPreCondition() && info.areOpsCrudOnly() && !info.getAllowAtomic() && - !info.getPrepare()) { + // lock - no preconditions; only CRUD ops; and non-atomic mode. + if (!info.getPreCondition() && info.areOpsCrudOnly() && !info.getAllowAtomic()) { dbWriteLock.emplace(opCtx, dbName, MODE_IX); } else { globalWriteLock.emplace(opCtx); diff --git a/src/mongo/db/repl/apply_ops.h b/src/mongo/db/repl/apply_ops.h index 6c53fccad49..a2af668d4fb 100644 --- a/src/mongo/db/repl/apply_ops.h +++ b/src/mongo/db/repl/apply_ops.h @@ -96,8 +96,10 @@ Status applyOps(OperationContext* opCtx, const std::string& dbName, const BSONObj& applyOpCmd, repl::OplogApplication::Mode oplogApplicationMode, - boost::optional<OpTime> optime, BSONObjBuilder* result); +Status applyApplyOpsOplogEntry(OperationContext* opCtx, + const OplogEntry& entry, + repl::OplogApplication::Mode oplogApplicationMode); } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/apply_ops_test.cpp b/src/mongo/db/repl/apply_ops_test.cpp index 84573838b5c..36d167c68ac 100644 --- a/src/mongo/db/repl/apply_ops_test.cpp +++ b/src/mongo/db/repl/apply_ops_test.cpp @@ -154,7 +154,7 @@ TEST_F(ApplyOpsTest, CommandInNestedApplyOpsReturnsSuccess) { << BSON("applyOps" << BSON_ARRAY(innerCmdObj))); auto cmdObj = BSON("applyOps" << BSON_ARRAY(innerApplyOpsObj)); - ASSERT_OK(applyOps(opCtx.get(), nss.db().toString(), cmdObj, mode, {}, &resultBuilder)); + ASSERT_OK(applyOps(opCtx.get(), nss.db().toString(), cmdObj, mode, &resultBuilder)); ASSERT_BSONOBJ_EQ({}, _opObserver->onApplyOpsCmdObj); } @@ -184,7 +184,7 @@ TEST_F(ApplyOpsTest, InsertInNestedApplyOpsReturnsSuccess) { auto cmdObj = BSON("applyOps" << BSON_ARRAY(innerApplyOpsObj)); ASSERT_OK(_storage->createCollection(opCtx.get(), nss, options)); - ASSERT_OK(applyOps(opCtx.get(), nss.db().toString(), cmdObj, mode, {}, &resultBuilder)); + ASSERT_OK(applyOps(opCtx.get(), nss.db().toString(), cmdObj, mode, &resultBuilder)); ASSERT_BSONOBJ_EQ(BSON("applyOps" << BSON_ARRAY(innerCmdObj)), _opObserver->onApplyOpsCmdObj); } @@ -193,7 +193,7 @@ TEST_F(ApplyOpsTest, AtomicApplyOpsWithNoOpsReturnsSuccess) { auto mode = OplogApplication::Mode::kApplyOpsCmd; BSONObjBuilder resultBuilder; auto cmdObj = BSON("applyOps" << BSONArray()); - ASSERT_OK(applyOps(opCtx.get(), "test", cmdObj, mode, {}, &resultBuilder)); + ASSERT_OK(applyOps(opCtx.get(), "test", cmdObj, mode, &resultBuilder)); ASSERT_BSONOBJ_EQ(cmdObj, _opObserver->onApplyOpsCmdObj); } @@ -229,7 +229,7 @@ TEST_F(ApplyOpsTest, auto cmdObj = makeApplyOpsWithInsertOperation(nss, boost::none, documentToInsert); BSONObjBuilder resultBuilder; ASSERT_EQUALS(ErrorCodes::NamespaceNotFound, - applyOps(opCtx.get(), "test", cmdObj, mode, {}, &resultBuilder)); + applyOps(opCtx.get(), "test", cmdObj, mode, &resultBuilder)); auto result = resultBuilder.obj(); auto status = getStatusFromApplyOpsResult(result); ASSERT_EQUALS(ErrorCodes::NamespaceNotFound, status); @@ -250,7 +250,7 @@ TEST_F(ApplyOpsTest, AtomicApplyOpsInsertWithUuidIntoCollectionWithUuid) { auto documentToInsert = BSON("_id" << 0); auto cmdObj = makeApplyOpsWithInsertOperation(nss, uuid, documentToInsert); BSONObjBuilder resultBuilder; - ASSERT_OK(applyOps(opCtx.get(), "test", cmdObj, mode, {}, &resultBuilder)); + ASSERT_OK(applyOps(opCtx.get(), "test", cmdObj, mode, &resultBuilder)); ASSERT_BSONOBJ_EQ(cmdObj, _opObserver->onApplyOpsCmdObj); } @@ -273,7 +273,7 @@ TEST_F(ApplyOpsTest, AtomicApplyOpsInsertWithUuidIntoCollectionWithOtherUuid) { auto cmdObj = makeApplyOpsWithInsertOperation(nss, applyOpsUuid, documentToInsert); BSONObjBuilder resultBuilder; ASSERT_EQUALS(ErrorCodes::NamespaceNotFound, - applyOps(opCtx.get(), "test", cmdObj, mode, {}, &resultBuilder)); + applyOps(opCtx.get(), "test", cmdObj, mode, &resultBuilder)); auto result = resultBuilder.obj(); auto status = getStatusFromApplyOpsResult(result); ASSERT_EQUALS(ErrorCodes::NamespaceNotFound, status); @@ -293,7 +293,7 @@ TEST_F(ApplyOpsTest, AtomicApplyOpsInsertWithoutUuidIntoCollectionWithUuid) { auto documentToInsert = BSON("_id" << 0); auto cmdObj = makeApplyOpsWithInsertOperation(nss, boost::none, documentToInsert); BSONObjBuilder resultBuilder; - ASSERT_OK(applyOps(opCtx.get(), "test", cmdObj, mode, {}, &resultBuilder)); + ASSERT_OK(applyOps(opCtx.get(), "test", cmdObj, mode, &resultBuilder)); // Insert operation provided by caller did not contain collection uuid but applyOps() should add // the uuid to the oplog entry. @@ -330,7 +330,6 @@ TEST_F(ApplyOpsTest, ApplyOpsPropagatesOplogApplicationMode) { nss.coll().toString(), cmdObj, OplogApplication::Mode::kInitialSync, - {}, &resultBuilder)); ASSERT_EQUALS(1, countLogLinesContaining("oplog application mode: InitialSync")); @@ -341,7 +340,6 @@ TEST_F(ApplyOpsTest, ApplyOpsPropagatesOplogApplicationMode) { nss.coll().toString(), cmdObj, OplogApplication::Mode::kSecondary, - {}, &resultBuilder)); ASSERT_EQUALS(1, countLogLinesContaining("oplog application mode: Secondary")); @@ -504,7 +502,7 @@ TEST_F(ApplyOpsTest, ApplyOpsFailsToDropAdmin) { auto dropDatabaseCmdObj = BSON("applyOps" << BSON_ARRAY(dropDatabaseOp)); BSONObjBuilder resultBuilder; auto status = - applyOps(opCtx.get(), nss.db().toString(), dropDatabaseCmdObj, mode, {}, &resultBuilder); + applyOps(opCtx.get(), nss.db().toString(), dropDatabaseCmdObj, mode, &resultBuilder); ASSERT_EQUALS(ErrorCodes::IllegalOperation, status); } diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp index 9dad9bcacc7..cb38db43b73 100644 --- a/src/mongo/db/repl/oplog.cpp +++ b/src/mongo/db/repl/oplog.cpp @@ -977,8 +977,7 @@ std::map<std::string, ApplyOpMetadata> opsMap = { const OpTime& opTime, const OplogEntry& entry, OplogApplication::Mode mode) -> Status { - BSONObjBuilder resultWeDontCareAbout; - return applyOps(opCtx, nsToDatabase(ns), cmd, mode, opTime, &resultWeDontCareAbout); + return applyApplyOpsOplogEntry(opCtx, entry, mode); }}}, {"convertToCapped", {[](OperationContext* opCtx, @@ -1018,17 +1017,7 @@ std::map<std::string, ApplyOpMetadata> opsMap = { const OpTime& opTime, const OplogEntry& entry, OplogApplication::Mode mode) -> Status { - // We don't put transactions into the prepare state until the end of recovery, so there is - // no transaction to abort. - if (mode == OplogApplication::Mode::kRecovering) { - return Status::OK(); - } - // Session has been checked out by sync_tail. - auto transaction = TransactionParticipant::get(opCtx); - invariant(transaction); - transaction->unstashTransactionResources(opCtx, "abortTransaction"); - transaction->abortActiveTransaction(opCtx); - return Status::OK(); + return TransactionParticipant::applyAbortTransaction(opCtx, entry, mode); }}}, }; diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp index 017d66adf92..f67c3e005ec 100644 --- a/src/mongo/db/repl/sync_tail.cpp +++ b/src/mongo/db/repl/sync_tail.cpp @@ -56,7 +56,6 @@ #include "mongo/db/logical_session_id.h" #include "mongo/db/multi_key_path_tracker.h" #include "mongo/db/namespace_string.h" -#include "mongo/db/operation_context_session_mongod.h" #include "mongo/db/query/query_knobs.h" #include "mongo/db/repl/applier_helpers.h" #include "mongo/db/repl/apply_ops.h" @@ -1174,22 +1173,6 @@ Status multiSyncApply(OperationContext* opCtx, // If we didn't create a group, try to apply the op individually. try { - // The write on transaction table may be applied concurrently, so refreshing state - // from disk may read that write, causing starting a new transaction on an existing - // txnNumber. Thus, we start a new transaction without refreshing state from disk. - boost::optional<OperationContextSessionMongodWithoutRefresh> sessionTxnState; - if (entry.shouldPrepare() || - entry.getCommandType() == OplogEntry::CommandType::kAbortTransaction) { - // The update on transaction table may be scheduled to the same writer. - invariant(ops->size() <= 2); - // Transaction operations are in its own batch, so we can modify their opCtx. - invariant(entry.getSessionId()); - invariant(entry.getTxnNumber()); - opCtx->setLogicalSessionId(*entry.getSessionId()); - opCtx->setTxnNumber(*entry.getTxnNumber()); - // Check out the session, with autoCommit = false and startMultiDocTxn = true. - sessionTxnState.emplace(opCtx); - } const Status status = SyncTail::syncApply(opCtx, entry.raw, oplogApplicationMode); if (!status.isOK()) { diff --git a/src/mongo/db/transaction_participant.cpp b/src/mongo/db/transaction_participant.cpp index 1a658477cb9..5f4d47fc0f9 100644 --- a/src/mongo/db/transaction_participant.cpp +++ b/src/mongo/db/transaction_participant.cpp @@ -46,6 +46,7 @@ #include "mongo/db/dbdirectclient.h" #include "mongo/db/index/index_access_method.h" #include "mongo/db/op_observer.h" +#include "mongo/db/operation_context_session_mongod.h" #include "mongo/db/ops/update.h" #include "mongo/db/query/get_executor.h" #include "mongo/db/repl/repl_client_info.h" @@ -294,6 +295,40 @@ const LogicalSessionId& TransactionParticipant::_sessionId() const { return owningSession->getSessionId(); } +Status TransactionParticipant::applyAbortTransaction(OperationContext* opCtx, + const repl::OplogEntry& entry, + repl::OplogApplication::Mode mode) { + // We don't put transactions into the prepare state until the end of recovery, so there is + // no transaction to abort. + if (mode == repl::OplogApplication::Mode::kRecovering) { + return Status::OK(); + } + + // Return error if run via applyOps command. + uassert(50972, + "abortTransaction is only used internally by secondaries.", + mode != repl::OplogApplication::Mode::kApplyOpsCmd); + + // TODO: SERVER-36492 Only run on secondary until we support initial sync. + invariant(mode == repl::OplogApplication::Mode::kSecondary); + + // Transaction operations are in its own batch, so we can modify their opCtx. + invariant(entry.getSessionId()); + invariant(entry.getTxnNumber()); + opCtx->setLogicalSessionId(*entry.getSessionId()); + opCtx->setTxnNumber(*entry.getTxnNumber()); + // The write on transaction table may be applied concurrently, so refreshing state + // from disk may read that write, causing starting a new transaction on an existing + // txnNumber. Thus, we start a new transaction without refreshing state from disk. + OperationContextSessionMongodWithoutRefresh sessionCheckout(opCtx); + + auto transaction = TransactionParticipant::get(opCtx); + transaction->unstashTransactionResources(opCtx, "abortTransaction"); + transaction->abortActiveTransaction(opCtx); + return Status::OK(); +} + + void TransactionParticipant::_beginOrContinueRetryableWrite(WithLock wl, TxnNumber txnNumber) { if (txnNumber > _activeTxnNumber) { // New retryable write. diff --git a/src/mongo/db/transaction_participant.h b/src/mongo/db/transaction_participant.h index 29e9b3e8bfe..18048474617 100644 --- a/src/mongo/db/transaction_participant.h +++ b/src/mongo/db/transaction_participant.h @@ -161,6 +161,13 @@ public: static TransactionParticipant* getFromNonCheckedOutSession(Session* session); /** + * Apply `abortTransaction` oplog entry. + */ + static Status applyAbortTransaction(OperationContext* opCtx, + const repl::OplogEntry& entry, + repl::OplogApplication::Mode mode); + + /** * Kills the transaction if it is running, ensuring that it releases all resources, even if the * transaction is in prepare(). Avoids writing any oplog entries or making any changes to the * transaction table. State for prepared transactions will be re-constituted at startup. diff --git a/src/mongo/dbtests/storage_timestamp_tests.cpp b/src/mongo/dbtests/storage_timestamp_tests.cpp index 03e9d193e22..018720d4938 100644 --- a/src/mongo/dbtests/storage_timestamp_tests.cpp +++ b/src/mongo/dbtests/storage_timestamp_tests.cpp @@ -289,7 +289,6 @@ public: dbName, BSON("applyOps" << applyOpsList), repl::OplogApplication::Mode::kApplyOpsCmd, - {}, &result); if (!status.isOK()) { return status; @@ -308,7 +307,6 @@ public: dbName, BSON("applyOps" << applyOpsList << "allowAtomic" << false), repl::OplogApplication::Mode::kApplyOpsCmd, - {}, &result); if (!status.isOK()) { return status; @@ -649,7 +647,6 @@ public: << "o" << BSON("applyOps" << BSONArrayBuilder().obj())))), repl::OplogApplication::Mode::kApplyOpsCmd, - {}, &result)); } |