diff options
author | Siyuan Zhou <siyuan.zhou@mongodb.com> | 2018-10-02 20:33:35 -0400 |
---|---|---|
committer | Siyuan Zhou <siyuan.zhou@mongodb.com> | 2018-11-06 18:17:56 -0500 |
commit | b4c190b4c2ede6a493cb012aed2107480cc03812 (patch) | |
tree | ed4148ae8f4f69f9d4aa154f119733bd13736a5b /src/mongo/db | |
parent | 77823d2a5267b1b7917190e095f2a7243ad32a76 (diff) | |
download | mongo-b4c190b4c2ede6a493cb012aed2107480cc03812.tar.gz |
SERVER-35877 Secondaries commit transactions when applying commitTransaction oplog entries in their own batch
Also move transaction oplog application into its own file.
Diffstat (limited to 'src/mongo/db')
-rw-r--r-- | src/mongo/db/op_observer_impl.cpp | 4 | ||||
-rw-r--r-- | src/mongo/db/repl/SConscript | 1 | ||||
-rw-r--r-- | src/mongo/db/repl/apply_ops.cpp | 3 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog.cpp | 34 | ||||
-rw-r--r-- | src/mongo/db/repl/sync_tail.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/repl/transaction_oplog_application.cpp | 134 | ||||
-rw-r--r-- | src/mongo/db/repl/transaction_oplog_application.h | 53 | ||||
-rw-r--r-- | src/mongo/db/transaction_participant.cpp | 63 | ||||
-rw-r--r-- | src/mongo/db/transaction_participant.h | 7 |
9 files changed, 215 insertions, 86 deletions
diff --git a/src/mongo/db/op_observer_impl.cpp b/src/mongo/db/op_observer_impl.cpp index 1ccf72f16da..995d79bfb5a 100644 --- a/src/mongo/db/op_observer_impl.cpp +++ b/src/mongo/db/op_observer_impl.cpp @@ -1040,6 +1040,10 @@ void logCommitOrAbortForPreparedTransaction(OperationContext* opCtx, void OpObserverImpl::onTransactionCommit(OperationContext* opCtx, boost::optional<OplogSlot> commitOplogEntryOpTime, boost::optional<Timestamp> commitTimestamp) { + if (!opCtx->writesAreReplicated()) { + return; + } + invariant(opCtx->getTxnNumber()); Session* const session = OperationContextSession::get(opCtx); invariant(session); diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript index f922555381c..910d84bebfd 100644 --- a/src/mongo/db/repl/SConscript +++ b/src/mongo/db/repl/SConscript @@ -10,6 +10,7 @@ env.Library( 'apply_ops.cpp', 'do_txn.cpp', 'oplog.cpp', + 'transaction_oplog_application.cpp', env.Idlc('apply_ops.idl')[0], ], LIBDEPS_PRIVATE=[ diff --git a/src/mongo/db/repl/apply_ops.cpp b/src/mongo/db/repl/apply_ops.cpp index 5096cf82c35..ea7e62763a0 100644 --- a/src/mongo/db/repl/apply_ops.cpp +++ b/src/mongo/db/repl/apply_ops.cpp @@ -130,9 +130,6 @@ Status _applyOps(OperationContext* opCtx, Status status(ErrorCodes::InternalError, ""); if (haveWrappingWUOW) { - // Atomic applyOps command already acquired the global write lock. - invariant(opCtx->lockState()->isW() || - oplogApplicationMode != repl::OplogApplication::Mode::kApplyOpsCmd); // Only CRUD operations are allowed in atomic mode. invariant(*opType != 'c'); diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp index 47fd238871b..9cb6367b75c 100644 --- a/src/mongo/db/repl/oplog.cpp +++ b/src/mongo/db/repl/oplog.cpp @@ -80,13 +80,13 @@ #include "mongo/db/repl/repl_client_info.h" #include "mongo/db/repl/replication_coordinator.h" #include "mongo/db/repl/timestamp_block.h" +#include "mongo/db/repl/transaction_oplog_application.h" #include "mongo/db/server_parameters.h" #include "mongo/db/service_context.h" #include "mongo/db/session_catalog.h" #include "mongo/db/stats/counters.h" #include "mongo/db/storage/storage_engine.h" #include "mongo/db/storage/storage_options.h" -#include "mongo/db/transaction_history_iterator.h" #include "mongo/db/transaction_participant.h" #include "mongo/platform/random.h" #include "mongo/scripting/engine.h" @@ -1018,32 +1018,7 @@ std::map<std::string, ApplyOpMetadata> opsMap = { const OpTime& opTime, const OplogEntry& entry, OplogApplication::Mode mode) -> Status { - if (mode == OplogApplication::Mode::kRecovering) { - const auto replCoord = ReplicationCoordinator::get(opCtx); - const auto recoveryTimestamp = replCoord->getRecoveryTimestamp(); - invariant(recoveryTimestamp); - - // If the commitTimestamp is before the recoveryTimestamp, then the data already - // reflects the operations from the transaction. - const auto commitTimestamp = cmd["commitTimestamp"].timestamp(); - if (recoveryTimestamp.get() > commitTimestamp) { - return Status::OK(); - } - - // Get the corresponding prepareTransaction oplog entry. - TransactionHistoryIterator iter(opTime); - invariant(iter.hasNext()); - const auto commitOplogEntry = iter.next(opCtx); - invariant(iter.hasNext()); - const auto prepareOplogEntry = iter.next(opCtx); - - // Transform prepare command into a normal applyOps command. - const auto prepareCmd = prepareOplogEntry.getOperationToApply().removeField("prepare"); - - BSONObjBuilder resultWeDontCareAbout; - return applyOps(opCtx, nsToDatabase(ns), prepareCmd, mode, &resultWeDontCareAbout); - } - return Status::OK(); + return applyCommitTransaction(opCtx, entry, mode); }}}, {"abortTransaction", {[](OperationContext* opCtx, @@ -1053,7 +1028,7 @@ std::map<std::string, ApplyOpMetadata> opsMap = { const OpTime& opTime, const OplogEntry& entry, OplogApplication::Mode mode) -> Status { - return TransactionParticipant::applyAbortTransaction(opCtx, entry, mode); + return applyAbortTransaction(opCtx, entry, mode); }}}, }; @@ -1632,7 +1607,8 @@ Status applyCommand_inlock(OperationContext* opCtx, // Don't assign commit timestamp for transaction commands. const StringData commandName(o.firstElementFieldName()); - if (op.getBoolField("prepare") || commandName == "abortTransaction") + if (op.getBoolField("prepare") || commandName == "abortTransaction" || + commandName == "commitTransaction") return false; switch (replMode) { diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp index e848471f83c..140f76c7a7d 100644 --- a/src/mongo/db/repl/sync_tail.cpp +++ b/src/mongo/db/repl/sync_tail.cpp @@ -363,7 +363,7 @@ Status SyncTail::syncApply(OperationContext* opCtx, const StringData commandName(op["o"].embeddedObject().firstElementFieldName()); // SERVER-37313: createIndex does not need to take the Global X lock. if (!op.getBoolField("prepare") && commandName != "abortTransaction" && - commandName != "createIndexes") { + commandName != "createIndexes" && commandName != "commitTransaction") { globalWriteLock.emplace(opCtx); } diff --git a/src/mongo/db/repl/transaction_oplog_application.cpp b/src/mongo/db/repl/transaction_oplog_application.cpp new file mode 100644 index 00000000000..00460ed68f2 --- /dev/null +++ b/src/mongo/db/repl/transaction_oplog_application.cpp @@ -0,0 +1,134 @@ + +/** + * Copyright (C) 2018-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/db/repl/transaction_oplog_application.h" + +#include "mongo/platform/basic.h" + +#include "mongo/db/commands/txn_cmds_gen.h" +#include "mongo/db/operation_context_session_mongod.h" +#include "mongo/db/repl/apply_ops.h" +#include "mongo/db/transaction_history_iterator.h" +#include "mongo/db/transaction_participant.h" + +namespace mongo { + +Status applyCommitTransaction(OperationContext* opCtx, + const repl::OplogEntry& entry, + repl::OplogApplication::Mode mode) { + // Return error if run via applyOps command. + uassert(50987, + "commitTransaction is only used internally by secondaries.", + mode != repl::OplogApplication::Mode::kApplyOpsCmd); + + IDLParserErrorContext ctx("commitTransaction"); + auto commitCommand = CommitTransactionOplogObject::parse(ctx, entry.getObject()); + + if (mode == repl::OplogApplication::Mode::kRecovering) { + const auto replCoord = repl::ReplicationCoordinator::get(opCtx); + const auto recoveryTimestamp = replCoord->getRecoveryTimestamp(); + invariant(recoveryTimestamp); + + // If the commitTimestamp is before the recoveryTimestamp, then the data already + // reflects the operations from the transaction. + const auto& commitTimestamp = commitCommand.getCommitTimestamp(); + if (recoveryTimestamp.get() > commitTimestamp) { + return Status::OK(); + } + + // Get the corresponding prepareTransaction oplog entry. + TransactionHistoryIterator iter(entry.getOpTime()); + invariant(iter.hasNext()); + const auto commitOplogEntry = iter.next(opCtx); + invariant(iter.hasNext()); + const auto prepareOplogEntry = iter.next(opCtx); + + // Transform prepare command into a normal applyOps command. + const auto prepareCmd = prepareOplogEntry.getOperationToApply().removeField("prepare"); + + BSONObjBuilder resultWeDontCareAbout; + return applyOps( + opCtx, entry.getNss().db().toString(), prepareCmd, mode, &resultWeDontCareAbout); + } + + // TODO: SERVER-36492 Only run on secondary until we support initial sync. + invariant(mode == repl::OplogApplication::Mode::kSecondary); + + // Transaction operations are in its own batch, so we can modify their opCtx. + invariant(entry.getSessionId()); + invariant(entry.getTxnNumber()); + opCtx->setLogicalSessionId(*entry.getSessionId()); + opCtx->setTxnNumber(*entry.getTxnNumber()); + // The write on transaction table may be applied concurrently, so refreshing state + // from disk may read that write, causing starting a new transaction on an existing + // txnNumber. Thus, we start a new transaction without refreshing state from disk. + OperationContextSessionMongodWithoutRefresh sessionCheckout(opCtx); + + auto transaction = TransactionParticipant::get(opCtx); + invariant(transaction); + transaction->unstashTransactionResources(opCtx, "commitTransaction"); + transaction->commitPreparedTransaction(opCtx, commitCommand.getCommitTimestamp()); + return Status::OK(); +} + +Status applyAbortTransaction(OperationContext* opCtx, + const repl::OplogEntry& entry, + repl::OplogApplication::Mode mode) { + // Return error if run via applyOps command. + uassert(50972, + "abortTransaction is only used internally by secondaries.", + mode != repl::OplogApplication::Mode::kApplyOpsCmd); + + // We don't put transactions into the prepare state until the end of recovery, so there is + // no transaction to abort. + if (mode == repl::OplogApplication::Mode::kRecovering) { + return Status::OK(); + } + + // TODO: SERVER-36492 Only run on secondary until we support initial sync. + invariant(mode == repl::OplogApplication::Mode::kSecondary); + + // Transaction operations are in its own batch, so we can modify their opCtx. + invariant(entry.getSessionId()); + invariant(entry.getTxnNumber()); + opCtx->setLogicalSessionId(*entry.getSessionId()); + opCtx->setTxnNumber(*entry.getTxnNumber()); + // The write on transaction table may be applied concurrently, so refreshing state + // from disk may read that write, causing starting a new transaction on an existing + // txnNumber. Thus, we start a new transaction without refreshing state from disk. + OperationContextSessionMongodWithoutRefresh sessionCheckout(opCtx); + + auto transaction = TransactionParticipant::get(opCtx); + transaction->unstashTransactionResources(opCtx, "abortTransaction"); + transaction->abortActiveTransaction(opCtx); + return Status::OK(); +} + +} // namespace mongo diff --git a/src/mongo/db/repl/transaction_oplog_application.h b/src/mongo/db/repl/transaction_oplog_application.h new file mode 100644 index 00000000000..5880701c34f --- /dev/null +++ b/src/mongo/db/repl/transaction_oplog_application.h @@ -0,0 +1,53 @@ + +/** + * Copyright (C) 2018-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include "mongo/db/operation_context.h" +#include "mongo/db/repl/oplog.h" +#include "mongo/db/repl/oplog_entry.h" + +namespace mongo { + +/** + * Apply `commitTransaction` oplog entry. + */ +Status applyCommitTransaction(OperationContext* opCtx, + const repl::OplogEntry& entry, + repl::OplogApplication::Mode mode); + +/** + * Apply `abortTransaction` oplog entry. + */ +Status applyAbortTransaction(OperationContext* opCtx, + const repl::OplogEntry& entry, + repl::OplogApplication::Mode mode); + +} // namespace mongo diff --git a/src/mongo/db/transaction_participant.cpp b/src/mongo/db/transaction_participant.cpp index b06cb225bda..a6b1c14b532 100644 --- a/src/mongo/db/transaction_participant.cpp +++ b/src/mongo/db/transaction_participant.cpp @@ -48,7 +48,6 @@ #include "mongo/db/dbdirectclient.h" #include "mongo/db/index/index_access_method.h" #include "mongo/db/op_observer.h" -#include "mongo/db/operation_context_session_mongod.h" #include "mongo/db/ops/update.h" #include "mongo/db/query/get_executor.h" #include "mongo/db/repl/repl_client_info.h" @@ -297,40 +296,6 @@ const LogicalSessionId& TransactionParticipant::_sessionId() const { return owningSession->getSessionId(); } -Status TransactionParticipant::applyAbortTransaction(OperationContext* opCtx, - const repl::OplogEntry& entry, - repl::OplogApplication::Mode mode) { - // We don't put transactions into the prepare state until the end of recovery, so there is - // no transaction to abort. - if (mode == repl::OplogApplication::Mode::kRecovering) { - return Status::OK(); - } - - // Return error if run via applyOps command. - uassert(50972, - "abortTransaction is only used internally by secondaries.", - mode != repl::OplogApplication::Mode::kApplyOpsCmd); - - // TODO: SERVER-36492 Only run on secondary until we support initial sync. - invariant(mode == repl::OplogApplication::Mode::kSecondary); - - // Transaction operations are in its own batch, so we can modify their opCtx. - invariant(entry.getSessionId()); - invariant(entry.getTxnNumber()); - opCtx->setLogicalSessionId(*entry.getSessionId()); - opCtx->setTxnNumber(*entry.getTxnNumber()); - // The write on transaction table may be applied concurrently, so refreshing state - // from disk may read that write, causing starting a new transaction on an existing - // txnNumber. Thus, we start a new transaction without refreshing state from disk. - OperationContextSessionMongodWithoutRefresh sessionCheckout(opCtx); - - auto transaction = TransactionParticipant::get(opCtx); - transaction->unstashTransactionResources(opCtx, "abortTransaction"); - transaction->abortActiveTransaction(opCtx); - return Status::OK(); -} - - void TransactionParticipant::_beginOrContinueRetryableWrite(WithLock wl, TxnNumber txnNumber) { if (txnNumber > _activeTxnNumber) { // New retryable write. @@ -945,17 +910,23 @@ void TransactionParticipant::commitPreparedTransaction(OperationContext* opCtx, opCtx->recoveryUnit()->setCommitTimestamp(commitTimestamp); try { - // We reserve an oplog slot before committing the transaction so that no writes that are - // causally related to the transaction commit enter the oplog at a timestamp earlier than - // the commit oplog entry. - OplogSlotReserver oplogSlotReserver(opCtx); - const auto commitOplogSlot = oplogSlotReserver.getReservedOplogSlot(); - invariant(commitOplogSlot.opTime.getTimestamp() >= commitTimestamp, - str::stream() << "Commit oplog entry must be greater than or equal to commit " - "timestamp due to causal consistency. commit timestamp: " - << commitTimestamp.toBSON() - << ", commit oplog entry optime: " - << commitOplogSlot.opTime.toBSON()); + // On secondary, we generate a fake empty oplog slot, since it's not used by opObserver. + OplogSlot commitOplogSlot; + boost::optional<OplogSlotReserver> oplogSlotReserver; + + // On primary, we reserve an oplog slot before committing the transaction so that no + // writes that are causally related to the transaction commit enter the oplog at a + // timestamp earlier than the commit oplog entry. + if (opCtx->writesAreReplicated()) { + oplogSlotReserver.emplace(opCtx); + commitOplogSlot = oplogSlotReserver->getReservedOplogSlot(); + invariant(commitOplogSlot.opTime.getTimestamp() >= commitTimestamp, + str::stream() << "Commit oplog entry must be greater than or equal to commit " + "timestamp due to causal consistency. commit timestamp: " + << commitTimestamp.toBSON() + << ", commit oplog entry optime: " + << commitOplogSlot.opTime.toBSON()); + } // We need to unlock the session to run the opObserver onTransactionCommit, which calls back // into the session. We also do not want to write to storage with the mutex locked. diff --git a/src/mongo/db/transaction_participant.h b/src/mongo/db/transaction_participant.h index d21a5c1ff87..e543b7213a5 100644 --- a/src/mongo/db/transaction_participant.h +++ b/src/mongo/db/transaction_participant.h @@ -156,13 +156,6 @@ public: static TransactionParticipant* getFromNonCheckedOutSession(Session* session); /** - * Apply `abortTransaction` oplog entry. - */ - static Status applyAbortTransaction(OperationContext* opCtx, - const repl::OplogEntry& entry, - repl::OplogApplication::Mode mode); - - /** * Kills the transaction if it is running, ensuring that it releases all resources, even if the * transaction is in prepare(). Avoids writing any oplog entries or making any changes to the * transaction table. State for prepared transactions will be re-constituted at startup. |