summaryrefslogtreecommitdiff
path: root/src/mongo/db
diff options
context:
space:
mode:
authorSiyuan Zhou <siyuan.zhou@mongodb.com>2018-10-02 20:33:35 -0400
committerSiyuan Zhou <siyuan.zhou@mongodb.com>2018-11-06 18:17:56 -0500
commitb4c190b4c2ede6a493cb012aed2107480cc03812 (patch)
treeed4148ae8f4f69f9d4aa154f119733bd13736a5b /src/mongo/db
parent77823d2a5267b1b7917190e095f2a7243ad32a76 (diff)
downloadmongo-b4c190b4c2ede6a493cb012aed2107480cc03812.tar.gz
SERVER-35877 Secondaries commit transactions when applying commitTransaction oplog entries in their own batch
Also move transaction oplog application into its own file.
Diffstat (limited to 'src/mongo/db')
-rw-r--r--src/mongo/db/op_observer_impl.cpp4
-rw-r--r--src/mongo/db/repl/SConscript1
-rw-r--r--src/mongo/db/repl/apply_ops.cpp3
-rw-r--r--src/mongo/db/repl/oplog.cpp34
-rw-r--r--src/mongo/db/repl/sync_tail.cpp2
-rw-r--r--src/mongo/db/repl/transaction_oplog_application.cpp134
-rw-r--r--src/mongo/db/repl/transaction_oplog_application.h53
-rw-r--r--src/mongo/db/transaction_participant.cpp63
-rw-r--r--src/mongo/db/transaction_participant.h7
9 files changed, 215 insertions, 86 deletions
diff --git a/src/mongo/db/op_observer_impl.cpp b/src/mongo/db/op_observer_impl.cpp
index 1ccf72f16da..995d79bfb5a 100644
--- a/src/mongo/db/op_observer_impl.cpp
+++ b/src/mongo/db/op_observer_impl.cpp
@@ -1040,6 +1040,10 @@ void logCommitOrAbortForPreparedTransaction(OperationContext* opCtx,
void OpObserverImpl::onTransactionCommit(OperationContext* opCtx,
boost::optional<OplogSlot> commitOplogEntryOpTime,
boost::optional<Timestamp> commitTimestamp) {
+ if (!opCtx->writesAreReplicated()) {
+ return;
+ }
+
invariant(opCtx->getTxnNumber());
Session* const session = OperationContextSession::get(opCtx);
invariant(session);
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript
index f922555381c..910d84bebfd 100644
--- a/src/mongo/db/repl/SConscript
+++ b/src/mongo/db/repl/SConscript
@@ -10,6 +10,7 @@ env.Library(
'apply_ops.cpp',
'do_txn.cpp',
'oplog.cpp',
+ 'transaction_oplog_application.cpp',
env.Idlc('apply_ops.idl')[0],
],
LIBDEPS_PRIVATE=[
diff --git a/src/mongo/db/repl/apply_ops.cpp b/src/mongo/db/repl/apply_ops.cpp
index 5096cf82c35..ea7e62763a0 100644
--- a/src/mongo/db/repl/apply_ops.cpp
+++ b/src/mongo/db/repl/apply_ops.cpp
@@ -130,9 +130,6 @@ Status _applyOps(OperationContext* opCtx,
Status status(ErrorCodes::InternalError, "");
if (haveWrappingWUOW) {
- // Atomic applyOps command already acquired the global write lock.
- invariant(opCtx->lockState()->isW() ||
- oplogApplicationMode != repl::OplogApplication::Mode::kApplyOpsCmd);
// Only CRUD operations are allowed in atomic mode.
invariant(*opType != 'c');
diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp
index 47fd238871b..9cb6367b75c 100644
--- a/src/mongo/db/repl/oplog.cpp
+++ b/src/mongo/db/repl/oplog.cpp
@@ -80,13 +80,13 @@
#include "mongo/db/repl/repl_client_info.h"
#include "mongo/db/repl/replication_coordinator.h"
#include "mongo/db/repl/timestamp_block.h"
+#include "mongo/db/repl/transaction_oplog_application.h"
#include "mongo/db/server_parameters.h"
#include "mongo/db/service_context.h"
#include "mongo/db/session_catalog.h"
#include "mongo/db/stats/counters.h"
#include "mongo/db/storage/storage_engine.h"
#include "mongo/db/storage/storage_options.h"
-#include "mongo/db/transaction_history_iterator.h"
#include "mongo/db/transaction_participant.h"
#include "mongo/platform/random.h"
#include "mongo/scripting/engine.h"
@@ -1018,32 +1018,7 @@ std::map<std::string, ApplyOpMetadata> opsMap = {
const OpTime& opTime,
const OplogEntry& entry,
OplogApplication::Mode mode) -> Status {
- if (mode == OplogApplication::Mode::kRecovering) {
- const auto replCoord = ReplicationCoordinator::get(opCtx);
- const auto recoveryTimestamp = replCoord->getRecoveryTimestamp();
- invariant(recoveryTimestamp);
-
- // If the commitTimestamp is before the recoveryTimestamp, then the data already
- // reflects the operations from the transaction.
- const auto commitTimestamp = cmd["commitTimestamp"].timestamp();
- if (recoveryTimestamp.get() > commitTimestamp) {
- return Status::OK();
- }
-
- // Get the corresponding prepareTransaction oplog entry.
- TransactionHistoryIterator iter(opTime);
- invariant(iter.hasNext());
- const auto commitOplogEntry = iter.next(opCtx);
- invariant(iter.hasNext());
- const auto prepareOplogEntry = iter.next(opCtx);
-
- // Transform prepare command into a normal applyOps command.
- const auto prepareCmd = prepareOplogEntry.getOperationToApply().removeField("prepare");
-
- BSONObjBuilder resultWeDontCareAbout;
- return applyOps(opCtx, nsToDatabase(ns), prepareCmd, mode, &resultWeDontCareAbout);
- }
- return Status::OK();
+ return applyCommitTransaction(opCtx, entry, mode);
}}},
{"abortTransaction",
{[](OperationContext* opCtx,
@@ -1053,7 +1028,7 @@ std::map<std::string, ApplyOpMetadata> opsMap = {
const OpTime& opTime,
const OplogEntry& entry,
OplogApplication::Mode mode) -> Status {
- return TransactionParticipant::applyAbortTransaction(opCtx, entry, mode);
+ return applyAbortTransaction(opCtx, entry, mode);
}}},
};
@@ -1632,7 +1607,8 @@ Status applyCommand_inlock(OperationContext* opCtx,
// Don't assign commit timestamp for transaction commands.
const StringData commandName(o.firstElementFieldName());
- if (op.getBoolField("prepare") || commandName == "abortTransaction")
+ if (op.getBoolField("prepare") || commandName == "abortTransaction" ||
+ commandName == "commitTransaction")
return false;
switch (replMode) {
diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp
index e848471f83c..140f76c7a7d 100644
--- a/src/mongo/db/repl/sync_tail.cpp
+++ b/src/mongo/db/repl/sync_tail.cpp
@@ -363,7 +363,7 @@ Status SyncTail::syncApply(OperationContext* opCtx,
const StringData commandName(op["o"].embeddedObject().firstElementFieldName());
// SERVER-37313: createIndex does not need to take the Global X lock.
if (!op.getBoolField("prepare") && commandName != "abortTransaction" &&
- commandName != "createIndexes") {
+ commandName != "createIndexes" && commandName != "commitTransaction") {
globalWriteLock.emplace(opCtx);
}
diff --git a/src/mongo/db/repl/transaction_oplog_application.cpp b/src/mongo/db/repl/transaction_oplog_application.cpp
new file mode 100644
index 00000000000..00460ed68f2
--- /dev/null
+++ b/src/mongo/db/repl/transaction_oplog_application.cpp
@@ -0,0 +1,134 @@
+
+/**
+ * Copyright (C) 2018-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/db/repl/transaction_oplog_application.h"
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/commands/txn_cmds_gen.h"
+#include "mongo/db/operation_context_session_mongod.h"
+#include "mongo/db/repl/apply_ops.h"
+#include "mongo/db/transaction_history_iterator.h"
+#include "mongo/db/transaction_participant.h"
+
+namespace mongo {
+
+Status applyCommitTransaction(OperationContext* opCtx,
+ const repl::OplogEntry& entry,
+ repl::OplogApplication::Mode mode) {
+ // Reject the entry (uassert 50987) when it arrives via the applyOps command.
+ uassert(50987,
+ "commitTransaction is only used internally by secondaries.",
+ mode != repl::OplogApplication::Mode::kApplyOpsCmd);
+
+ IDLParserErrorContext ctx("commitTransaction");
+ auto commitCommand = CommitTransactionOplogObject::parse(ctx, entry.getObject());
+
+ if (mode == repl::OplogApplication::Mode::kRecovering) {
+ const auto replCoord = repl::ReplicationCoordinator::get(opCtx);
+ const auto recoveryTimestamp = replCoord->getRecoveryTimestamp();
+ invariant(recoveryTimestamp);
+
+ // If the commitTimestamp is before the recoveryTimestamp, then the data already
+ // reflects the operations from the transaction, so there is nothing to apply.
+ const auto& commitTimestamp = commitCommand.getCommitTimestamp();
+ if (recoveryTimestamp.get() > commitTimestamp) {
+ return Status::OK();
+ }
+
+ // Step past the commit entry itself to reach the corresponding prepare oplog entry.
+ TransactionHistoryIterator iter(entry.getOpTime());
+ invariant(iter.hasNext());
+ const auto commitOplogEntry = iter.next(opCtx);
+ invariant(iter.hasNext());
+ const auto prepareOplogEntry = iter.next(opCtx);
+
+ // Strip the "prepare" field so the prepare command becomes a normal applyOps command.
+ const auto prepareCmd = prepareOplogEntry.getOperationToApply().removeField("prepare");
+
+ BSONObjBuilder resultWeDontCareAbout;
+ return applyOps(
+ opCtx, entry.getNss().db().toString(), prepareCmd, mode, &resultWeDontCareAbout);
+ }
+
+ // TODO: SERVER-36492 Only run on secondary until we support initial sync.
+ invariant(mode == repl::OplogApplication::Mode::kSecondary);
+
+ // Transaction operations are in their own batch, so we can modify their opCtx.
+ invariant(entry.getSessionId());
+ invariant(entry.getTxnNumber());
+ opCtx->setLogicalSessionId(*entry.getSessionId());
+ opCtx->setTxnNumber(*entry.getTxnNumber());
+ // The write to the transaction table may be applied concurrently, so refreshing state
+ // from disk may read that write and cause a new transaction to be started on an existing
+ // txnNumber. Thus, we start a new transaction without refreshing state from disk.
+ OperationContextSessionMongodWithoutRefresh sessionCheckout(opCtx);
+
+ auto transaction = TransactionParticipant::get(opCtx);
+ invariant(transaction);
+ transaction->unstashTransactionResources(opCtx, "commitTransaction");
+ transaction->commitPreparedTransaction(opCtx, commitCommand.getCommitTimestamp());
+ return Status::OK();
+}
+
+Status applyAbortTransaction(OperationContext* opCtx,
+ const repl::OplogEntry& entry,
+ repl::OplogApplication::Mode mode) {
+ // Reject the entry (uassert 50972) when it arrives via the applyOps command.
+ uassert(50972,
+ "abortTransaction is only used internally by secondaries.",
+ mode != repl::OplogApplication::Mode::kApplyOpsCmd);
+
+ // We don't put transactions into the prepare state until the end of recovery, so there is
+ // no transaction to abort while recovering.
+ if (mode == repl::OplogApplication::Mode::kRecovering) {
+ return Status::OK();
+ }
+
+ // TODO: SERVER-36492 Only run on secondary until we support initial sync.
+ invariant(mode == repl::OplogApplication::Mode::kSecondary);
+
+ // Transaction operations are in their own batch, so we can modify their opCtx.
+ invariant(entry.getSessionId());
+ invariant(entry.getTxnNumber());
+ opCtx->setLogicalSessionId(*entry.getSessionId());
+ opCtx->setTxnNumber(*entry.getTxnNumber());
+ // The write to the transaction table may be applied concurrently, so refreshing state
+ // from disk may read that write and cause a new transaction to be started on an existing
+ // txnNumber. Thus, we start a new transaction without refreshing state from disk.
+ OperationContextSessionMongodWithoutRefresh sessionCheckout(opCtx);
+
+ auto transaction = TransactionParticipant::get(opCtx);
+ transaction->unstashTransactionResources(opCtx, "abortTransaction");
+ transaction->abortActiveTransaction(opCtx);
+ return Status::OK();
+}
+
+} // namespace mongo
diff --git a/src/mongo/db/repl/transaction_oplog_application.h b/src/mongo/db/repl/transaction_oplog_application.h
new file mode 100644
index 00000000000..5880701c34f
--- /dev/null
+++ b/src/mongo/db/repl/transaction_oplog_application.h
@@ -0,0 +1,53 @@
+
+/**
+ * Copyright (C) 2018-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/db/operation_context.h"
+#include "mongo/db/repl/oplog.h"
+#include "mongo/db/repl/oplog_entry.h"
+
+namespace mongo {
+
+/**
+ * Applies a `commitTransaction` oplog entry; throws (code 50987) if `mode` is kApplyOpsCmd.
+ */
+Status applyCommitTransaction(OperationContext* opCtx,
+ const repl::OplogEntry& entry,
+ repl::OplogApplication::Mode mode);
+
+/**
+ * Applies an `abortTransaction` oplog entry; throws (code 50972) if `mode` is kApplyOpsCmd.
+ */
+Status applyAbortTransaction(OperationContext* opCtx,
+ const repl::OplogEntry& entry,
+ repl::OplogApplication::Mode mode);
+
+} // namespace mongo
diff --git a/src/mongo/db/transaction_participant.cpp b/src/mongo/db/transaction_participant.cpp
index b06cb225bda..a6b1c14b532 100644
--- a/src/mongo/db/transaction_participant.cpp
+++ b/src/mongo/db/transaction_participant.cpp
@@ -48,7 +48,6 @@
#include "mongo/db/dbdirectclient.h"
#include "mongo/db/index/index_access_method.h"
#include "mongo/db/op_observer.h"
-#include "mongo/db/operation_context_session_mongod.h"
#include "mongo/db/ops/update.h"
#include "mongo/db/query/get_executor.h"
#include "mongo/db/repl/repl_client_info.h"
@@ -297,40 +296,6 @@ const LogicalSessionId& TransactionParticipant::_sessionId() const {
return owningSession->getSessionId();
}
-Status TransactionParticipant::applyAbortTransaction(OperationContext* opCtx,
- const repl::OplogEntry& entry,
- repl::OplogApplication::Mode mode) {
- // We don't put transactions into the prepare state until the end of recovery, so there is
- // no transaction to abort.
- if (mode == repl::OplogApplication::Mode::kRecovering) {
- return Status::OK();
- }
-
- // Return error if run via applyOps command.
- uassert(50972,
- "abortTransaction is only used internally by secondaries.",
- mode != repl::OplogApplication::Mode::kApplyOpsCmd);
-
- // TODO: SERVER-36492 Only run on secondary until we support initial sync.
- invariant(mode == repl::OplogApplication::Mode::kSecondary);
-
- // Transaction operations are in its own batch, so we can modify their opCtx.
- invariant(entry.getSessionId());
- invariant(entry.getTxnNumber());
- opCtx->setLogicalSessionId(*entry.getSessionId());
- opCtx->setTxnNumber(*entry.getTxnNumber());
- // The write on transaction table may be applied concurrently, so refreshing state
- // from disk may read that write, causing starting a new transaction on an existing
- // txnNumber. Thus, we start a new transaction without refreshing state from disk.
- OperationContextSessionMongodWithoutRefresh sessionCheckout(opCtx);
-
- auto transaction = TransactionParticipant::get(opCtx);
- transaction->unstashTransactionResources(opCtx, "abortTransaction");
- transaction->abortActiveTransaction(opCtx);
- return Status::OK();
-}
-
-
void TransactionParticipant::_beginOrContinueRetryableWrite(WithLock wl, TxnNumber txnNumber) {
if (txnNumber > _activeTxnNumber) {
// New retryable write.
@@ -945,17 +910,23 @@ void TransactionParticipant::commitPreparedTransaction(OperationContext* opCtx,
opCtx->recoveryUnit()->setCommitTimestamp(commitTimestamp);
try {
- // We reserve an oplog slot before committing the transaction so that no writes that are
- // causally related to the transaction commit enter the oplog at a timestamp earlier than
- // the commit oplog entry.
- OplogSlotReserver oplogSlotReserver(opCtx);
- const auto commitOplogSlot = oplogSlotReserver.getReservedOplogSlot();
- invariant(commitOplogSlot.opTime.getTimestamp() >= commitTimestamp,
- str::stream() << "Commit oplog entry must be greater than or equal to commit "
- "timestamp due to causal consistency. commit timestamp: "
- << commitTimestamp.toBSON()
- << ", commit oplog entry optime: "
- << commitOplogSlot.opTime.toBSON());
+ // On secondary, we generate a fake empty oplog slot, since it's not used by opObserver.
+ OplogSlot commitOplogSlot;
+ boost::optional<OplogSlotReserver> oplogSlotReserver;
+
+ // On primary, we reserve an oplog slot before committing the transaction so that no
+ // writes that are causally related to the transaction commit enter the oplog at a
+ // timestamp earlier than the commit oplog entry.
+ if (opCtx->writesAreReplicated()) {
+ oplogSlotReserver.emplace(opCtx);
+ commitOplogSlot = oplogSlotReserver->getReservedOplogSlot();
+ invariant(commitOplogSlot.opTime.getTimestamp() >= commitTimestamp,
+ str::stream() << "Commit oplog entry must be greater than or equal to commit "
+ "timestamp due to causal consistency. commit timestamp: "
+ << commitTimestamp.toBSON()
+ << ", commit oplog entry optime: "
+ << commitOplogSlot.opTime.toBSON());
+ }
// We need to unlock the session to run the opObserver onTransactionCommit, which calls back
// into the session. We also do not want to write to storage with the mutex locked.
diff --git a/src/mongo/db/transaction_participant.h b/src/mongo/db/transaction_participant.h
index d21a5c1ff87..e543b7213a5 100644
--- a/src/mongo/db/transaction_participant.h
+++ b/src/mongo/db/transaction_participant.h
@@ -156,13 +156,6 @@ public:
static TransactionParticipant* getFromNonCheckedOutSession(Session* session);
/**
- * Apply `abortTransaction` oplog entry.
- */
- static Status applyAbortTransaction(OperationContext* opCtx,
- const repl::OplogEntry& entry,
- repl::OplogApplication::Mode mode);
-
- /**
* Kills the transaction if it is running, ensuring that it releases all resources, even if the
* transaction is in prepare(). Avoids writing any oplog entries or making any changes to the
* transaction table. State for prepared transactions will be re-constituted at startup.