summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/mongo/db/commands/apply_ops_cmd.cpp2
-rw-r--r--src/mongo/db/repl/apply_ops.cpp85
-rw-r--r--src/mongo/db/repl/apply_ops.h4
-rw-r--r--src/mongo/db/repl/apply_ops_test.cpp18
-rw-r--r--src/mongo/db/repl/oplog.cpp15
-rw-r--r--src/mongo/db/repl/sync_tail.cpp17
-rw-r--r--src/mongo/db/transaction_participant.cpp35
-rw-r--r--src/mongo/db/transaction_participant.h7
-rw-r--r--src/mongo/dbtests/storage_timestamp_tests.cpp3
9 files changed, 107 insertions, 79 deletions
diff --git a/src/mongo/db/commands/apply_ops_cmd.cpp b/src/mongo/db/commands/apply_ops_cmd.cpp
index da7f16f886e..d9ecc23b99b 100644
--- a/src/mongo/db/commands/apply_ops_cmd.cpp
+++ b/src/mongo/db/commands/apply_ops_cmd.cpp
@@ -260,7 +260,7 @@ public:
}
auto applyOpsStatus = CommandHelpers::appendCommandStatusNoThrow(
- result, repl::applyOps(opCtx, dbname, cmdObj, oplogApplicationMode, {}, &result));
+ result, repl::applyOps(opCtx, dbname, cmdObj, oplogApplicationMode, &result));
return applyOpsStatus;
}
diff --git a/src/mongo/db/repl/apply_ops.cpp b/src/mongo/db/repl/apply_ops.cpp
index 41e1dc4cdfa..8f638cd6fe7 100644
--- a/src/mongo/db/repl/apply_ops.cpp
+++ b/src/mongo/db/repl/apply_ops.cpp
@@ -47,6 +47,7 @@
#include "mongo/db/matcher/matcher.h"
#include "mongo/db/op_observer.h"
#include "mongo/db/operation_context.h"
+#include "mongo/db/operation_context_session_mongod.h"
#include "mongo/db/query/collation/collation_spec.h"
#include "mongo/db/repl/replication_coordinator.h"
#include "mongo/db/service_context.h"
@@ -278,14 +279,8 @@ Status _applyOps(OperationContext* opCtx,
}
Status _applyPrepareTransaction(OperationContext* opCtx,
- const std::string& dbName,
- const BSONObj& applyOpCmd,
- const ApplyOpsCommandInfo& info,
- repl::OplogApplication::Mode oplogApplicationMode,
- BSONObjBuilder* result,
- int* numApplied,
- BSONArrayBuilder* opsBuilder,
- const OpTime& optime) {
+ const repl::OplogEntry& entry,
+ repl::OplogApplication::Mode oplogApplicationMode) {
// Wait until the end of recovery to apply the operations from the prepared transaction.
if (oplogApplicationMode == OplogApplication::Mode::kRecovering) {
if (!serverGlobalParams.enableMajorityReadConcern) {
@@ -304,22 +299,42 @@ Status _applyPrepareTransaction(OperationContext* opCtx,
// TODO: SERVER-36492 Only run on secondary until we support initial sync.
invariant(oplogApplicationMode == repl::OplogApplication::Mode::kSecondary);
+ const auto info = ApplyOpsCommandInfo::parse(entry.getObject());
+ invariant(info.getPrepare() && *info.getPrepare());
uassert(
50946,
"applyOps with prepared must only include CRUD operations and cannot have precondition.",
!info.getPreCondition() && info.areOpsCrudOnly());
- // Session has been checked out by sync_tail.
- auto transaction = TransactionParticipant::get(opCtx);
- invariant(transaction);
+ // Transaction operations are in its own batch, so we can modify their opCtx.
+ invariant(entry.getSessionId());
+ invariant(entry.getTxnNumber());
+ opCtx->setLogicalSessionId(*entry.getSessionId());
+ opCtx->setTxnNumber(*entry.getTxnNumber());
+ // The write on transaction table may be applied concurrently, so refreshing state
+ // from disk may read that write, causing starting a new transaction on an existing
+ // txnNumber. Thus, we start a new transaction without refreshing state from disk.
+ OperationContextSessionMongodWithoutRefresh sessionCheckout(opCtx);
+ auto transaction = TransactionParticipant::get(opCtx);
transaction->unstashTransactionResources(opCtx, "prepareTransaction");
- auto status = _applyOps(
- opCtx, dbName, applyOpCmd, info, oplogApplicationMode, result, numApplied, opsBuilder);
+
+ // Apply the operations via applysOps functionality.
+ int numApplied = 0;
+ BSONObjBuilder resultWeDontCareAbout;
+ auto status = _applyOps(opCtx,
+ entry.getNss().db().toString(),
+ entry.getObject(),
+ info,
+ oplogApplicationMode,
+ &resultWeDontCareAbout,
+ &numApplied,
+ nullptr);
if (!status.isOK()) {
return status;
}
- transaction->prepareTransaction(opCtx, optime);
+ invariant(!entry.getOpTime().isNull());
+ transaction->prepareTransaction(opCtx, entry.getOpTime());
transaction->stashTransactionResources(opCtx);
return Status::OK();
}
@@ -401,39 +416,41 @@ ApplyOpsCommandInfo::ApplyOpsCommandInfo(const BSONObj& applyOpCmd)
}
}
+Status applyApplyOpsOplogEntry(OperationContext* opCtx,
+ const OplogEntry& entry,
+ repl::OplogApplication::Mode oplogApplicationMode) {
+ // Apply prepare transaction operation if "prepare" is true.
+ // The lock requirement of transaction operations should be the same as that on the primary,
+ // so we don't acquire the locks conservatively for them.
+ if (entry.shouldPrepare()) {
+ return _applyPrepareTransaction(opCtx, entry, oplogApplicationMode);
+ }
+ BSONObjBuilder resultWeDontCareAbout;
+ return applyOps(opCtx,
+ entry.getNss().db().toString(),
+ entry.getObject(),
+ oplogApplicationMode,
+ &resultWeDontCareAbout);
+}
+
Status applyOps(OperationContext* opCtx,
const std::string& dbName,
const BSONObj& applyOpCmd,
repl::OplogApplication::Mode oplogApplicationMode,
- boost::optional<OpTime> optime,
BSONObjBuilder* result) {
auto info = ApplyOpsCommandInfo::parse(applyOpCmd);
int numApplied = 0;
- // Apply prepare transaction operation if "prepare" is true.
- // The lock requirement of transaction operations should be the same as that on the primary,
- // so we don't acquire the locks conservatively for them.
- if (info.getPrepare().get_value_or(false)) {
- invariant(optime);
- return _applyPrepareTransaction(opCtx,
- dbName,
- applyOpCmd,
- info,
- oplogApplicationMode,
- result,
- &numApplied,
- nullptr,
- *optime);
- }
-
boost::optional<Lock::GlobalWrite> globalWriteLock;
boost::optional<Lock::DBLock> dbWriteLock;
+ uassert(
+ ErrorCodes::BadValue, "applyOps command can't have 'prepare' field", !info.getPrepare());
+
// There's only one case where we are allowed to take the database lock instead of the global
- // lock - no preconditions; only CRUD ops; non-atomic mode; and not for transaction prepare.
- if (!info.getPreCondition() && info.areOpsCrudOnly() && !info.getAllowAtomic() &&
- !info.getPrepare()) {
+ // lock - no preconditions; only CRUD ops; and non-atomic mode.
+ if (!info.getPreCondition() && info.areOpsCrudOnly() && !info.getAllowAtomic()) {
dbWriteLock.emplace(opCtx, dbName, MODE_IX);
} else {
globalWriteLock.emplace(opCtx);
diff --git a/src/mongo/db/repl/apply_ops.h b/src/mongo/db/repl/apply_ops.h
index 6c53fccad49..a2af668d4fb 100644
--- a/src/mongo/db/repl/apply_ops.h
+++ b/src/mongo/db/repl/apply_ops.h
@@ -96,8 +96,10 @@ Status applyOps(OperationContext* opCtx,
const std::string& dbName,
const BSONObj& applyOpCmd,
repl::OplogApplication::Mode oplogApplicationMode,
- boost::optional<OpTime> optime,
BSONObjBuilder* result);
+Status applyApplyOpsOplogEntry(OperationContext* opCtx,
+ const OplogEntry& entry,
+ repl::OplogApplication::Mode oplogApplicationMode);
} // namespace repl
} // namespace mongo
diff --git a/src/mongo/db/repl/apply_ops_test.cpp b/src/mongo/db/repl/apply_ops_test.cpp
index 84573838b5c..36d167c68ac 100644
--- a/src/mongo/db/repl/apply_ops_test.cpp
+++ b/src/mongo/db/repl/apply_ops_test.cpp
@@ -154,7 +154,7 @@ TEST_F(ApplyOpsTest, CommandInNestedApplyOpsReturnsSuccess) {
<< BSON("applyOps" << BSON_ARRAY(innerCmdObj)));
auto cmdObj = BSON("applyOps" << BSON_ARRAY(innerApplyOpsObj));
- ASSERT_OK(applyOps(opCtx.get(), nss.db().toString(), cmdObj, mode, {}, &resultBuilder));
+ ASSERT_OK(applyOps(opCtx.get(), nss.db().toString(), cmdObj, mode, &resultBuilder));
ASSERT_BSONOBJ_EQ({}, _opObserver->onApplyOpsCmdObj);
}
@@ -184,7 +184,7 @@ TEST_F(ApplyOpsTest, InsertInNestedApplyOpsReturnsSuccess) {
auto cmdObj = BSON("applyOps" << BSON_ARRAY(innerApplyOpsObj));
ASSERT_OK(_storage->createCollection(opCtx.get(), nss, options));
- ASSERT_OK(applyOps(opCtx.get(), nss.db().toString(), cmdObj, mode, {}, &resultBuilder));
+ ASSERT_OK(applyOps(opCtx.get(), nss.db().toString(), cmdObj, mode, &resultBuilder));
ASSERT_BSONOBJ_EQ(BSON("applyOps" << BSON_ARRAY(innerCmdObj)), _opObserver->onApplyOpsCmdObj);
}
@@ -193,7 +193,7 @@ TEST_F(ApplyOpsTest, AtomicApplyOpsWithNoOpsReturnsSuccess) {
auto mode = OplogApplication::Mode::kApplyOpsCmd;
BSONObjBuilder resultBuilder;
auto cmdObj = BSON("applyOps" << BSONArray());
- ASSERT_OK(applyOps(opCtx.get(), "test", cmdObj, mode, {}, &resultBuilder));
+ ASSERT_OK(applyOps(opCtx.get(), "test", cmdObj, mode, &resultBuilder));
ASSERT_BSONOBJ_EQ(cmdObj, _opObserver->onApplyOpsCmdObj);
}
@@ -229,7 +229,7 @@ TEST_F(ApplyOpsTest,
auto cmdObj = makeApplyOpsWithInsertOperation(nss, boost::none, documentToInsert);
BSONObjBuilder resultBuilder;
ASSERT_EQUALS(ErrorCodes::NamespaceNotFound,
- applyOps(opCtx.get(), "test", cmdObj, mode, {}, &resultBuilder));
+ applyOps(opCtx.get(), "test", cmdObj, mode, &resultBuilder));
auto result = resultBuilder.obj();
auto status = getStatusFromApplyOpsResult(result);
ASSERT_EQUALS(ErrorCodes::NamespaceNotFound, status);
@@ -250,7 +250,7 @@ TEST_F(ApplyOpsTest, AtomicApplyOpsInsertWithUuidIntoCollectionWithUuid) {
auto documentToInsert = BSON("_id" << 0);
auto cmdObj = makeApplyOpsWithInsertOperation(nss, uuid, documentToInsert);
BSONObjBuilder resultBuilder;
- ASSERT_OK(applyOps(opCtx.get(), "test", cmdObj, mode, {}, &resultBuilder));
+ ASSERT_OK(applyOps(opCtx.get(), "test", cmdObj, mode, &resultBuilder));
ASSERT_BSONOBJ_EQ(cmdObj, _opObserver->onApplyOpsCmdObj);
}
@@ -273,7 +273,7 @@ TEST_F(ApplyOpsTest, AtomicApplyOpsInsertWithUuidIntoCollectionWithOtherUuid) {
auto cmdObj = makeApplyOpsWithInsertOperation(nss, applyOpsUuid, documentToInsert);
BSONObjBuilder resultBuilder;
ASSERT_EQUALS(ErrorCodes::NamespaceNotFound,
- applyOps(opCtx.get(), "test", cmdObj, mode, {}, &resultBuilder));
+ applyOps(opCtx.get(), "test", cmdObj, mode, &resultBuilder));
auto result = resultBuilder.obj();
auto status = getStatusFromApplyOpsResult(result);
ASSERT_EQUALS(ErrorCodes::NamespaceNotFound, status);
@@ -293,7 +293,7 @@ TEST_F(ApplyOpsTest, AtomicApplyOpsInsertWithoutUuidIntoCollectionWithUuid) {
auto documentToInsert = BSON("_id" << 0);
auto cmdObj = makeApplyOpsWithInsertOperation(nss, boost::none, documentToInsert);
BSONObjBuilder resultBuilder;
- ASSERT_OK(applyOps(opCtx.get(), "test", cmdObj, mode, {}, &resultBuilder));
+ ASSERT_OK(applyOps(opCtx.get(), "test", cmdObj, mode, &resultBuilder));
// Insert operation provided by caller did not contain collection uuid but applyOps() should add
// the uuid to the oplog entry.
@@ -330,7 +330,6 @@ TEST_F(ApplyOpsTest, ApplyOpsPropagatesOplogApplicationMode) {
nss.coll().toString(),
cmdObj,
OplogApplication::Mode::kInitialSync,
- {},
&resultBuilder));
ASSERT_EQUALS(1, countLogLinesContaining("oplog application mode: InitialSync"));
@@ -341,7 +340,6 @@ TEST_F(ApplyOpsTest, ApplyOpsPropagatesOplogApplicationMode) {
nss.coll().toString(),
cmdObj,
OplogApplication::Mode::kSecondary,
- {},
&resultBuilder));
ASSERT_EQUALS(1, countLogLinesContaining("oplog application mode: Secondary"));
@@ -504,7 +502,7 @@ TEST_F(ApplyOpsTest, ApplyOpsFailsToDropAdmin) {
auto dropDatabaseCmdObj = BSON("applyOps" << BSON_ARRAY(dropDatabaseOp));
BSONObjBuilder resultBuilder;
auto status =
- applyOps(opCtx.get(), nss.db().toString(), dropDatabaseCmdObj, mode, {}, &resultBuilder);
+ applyOps(opCtx.get(), nss.db().toString(), dropDatabaseCmdObj, mode, &resultBuilder);
ASSERT_EQUALS(ErrorCodes::IllegalOperation, status);
}
diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp
index 9dad9bcacc7..cb38db43b73 100644
--- a/src/mongo/db/repl/oplog.cpp
+++ b/src/mongo/db/repl/oplog.cpp
@@ -977,8 +977,7 @@ std::map<std::string, ApplyOpMetadata> opsMap = {
const OpTime& opTime,
const OplogEntry& entry,
OplogApplication::Mode mode) -> Status {
- BSONObjBuilder resultWeDontCareAbout;
- return applyOps(opCtx, nsToDatabase(ns), cmd, mode, opTime, &resultWeDontCareAbout);
+ return applyApplyOpsOplogEntry(opCtx, entry, mode);
}}},
{"convertToCapped",
{[](OperationContext* opCtx,
@@ -1018,17 +1017,7 @@ std::map<std::string, ApplyOpMetadata> opsMap = {
const OpTime& opTime,
const OplogEntry& entry,
OplogApplication::Mode mode) -> Status {
- // We don't put transactions into the prepare state until the end of recovery, so there is
- // no transaction to abort.
- if (mode == OplogApplication::Mode::kRecovering) {
- return Status::OK();
- }
- // Session has been checked out by sync_tail.
- auto transaction = TransactionParticipant::get(opCtx);
- invariant(transaction);
- transaction->unstashTransactionResources(opCtx, "abortTransaction");
- transaction->abortActiveTransaction(opCtx);
- return Status::OK();
+ return TransactionParticipant::applyAbortTransaction(opCtx, entry, mode);
}}},
};
diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp
index 017d66adf92..f67c3e005ec 100644
--- a/src/mongo/db/repl/sync_tail.cpp
+++ b/src/mongo/db/repl/sync_tail.cpp
@@ -56,7 +56,6 @@
#include "mongo/db/logical_session_id.h"
#include "mongo/db/multi_key_path_tracker.h"
#include "mongo/db/namespace_string.h"
-#include "mongo/db/operation_context_session_mongod.h"
#include "mongo/db/query/query_knobs.h"
#include "mongo/db/repl/applier_helpers.h"
#include "mongo/db/repl/apply_ops.h"
@@ -1174,22 +1173,6 @@ Status multiSyncApply(OperationContext* opCtx,
// If we didn't create a group, try to apply the op individually.
try {
- // The write on transaction table may be applied concurrently, so refreshing state
- // from disk may read that write, causing starting a new transaction on an existing
- // txnNumber. Thus, we start a new transaction without refreshing state from disk.
- boost::optional<OperationContextSessionMongodWithoutRefresh> sessionTxnState;
- if (entry.shouldPrepare() ||
- entry.getCommandType() == OplogEntry::CommandType::kAbortTransaction) {
- // The update on transaction table may be scheduled to the same writer.
- invariant(ops->size() <= 2);
- // Transaction operations are in its own batch, so we can modify their opCtx.
- invariant(entry.getSessionId());
- invariant(entry.getTxnNumber());
- opCtx->setLogicalSessionId(*entry.getSessionId());
- opCtx->setTxnNumber(*entry.getTxnNumber());
- // Check out the session, with autoCommit = false and startMultiDocTxn = true.
- sessionTxnState.emplace(opCtx);
- }
const Status status = SyncTail::syncApply(opCtx, entry.raw, oplogApplicationMode);
if (!status.isOK()) {
diff --git a/src/mongo/db/transaction_participant.cpp b/src/mongo/db/transaction_participant.cpp
index 1a658477cb9..5f4d47fc0f9 100644
--- a/src/mongo/db/transaction_participant.cpp
+++ b/src/mongo/db/transaction_participant.cpp
@@ -46,6 +46,7 @@
#include "mongo/db/dbdirectclient.h"
#include "mongo/db/index/index_access_method.h"
#include "mongo/db/op_observer.h"
+#include "mongo/db/operation_context_session_mongod.h"
#include "mongo/db/ops/update.h"
#include "mongo/db/query/get_executor.h"
#include "mongo/db/repl/repl_client_info.h"
@@ -294,6 +295,40 @@ const LogicalSessionId& TransactionParticipant::_sessionId() const {
return owningSession->getSessionId();
}
+Status TransactionParticipant::applyAbortTransaction(OperationContext* opCtx,
+ const repl::OplogEntry& entry,
+ repl::OplogApplication::Mode mode) {
+ // We don't put transactions into the prepare state until the end of recovery, so there is
+ // no transaction to abort.
+ if (mode == repl::OplogApplication::Mode::kRecovering) {
+ return Status::OK();
+ }
+
+ // Return error if run via applyOps command.
+ uassert(50972,
+ "abortTransaction is only used internally by secondaries.",
+ mode != repl::OplogApplication::Mode::kApplyOpsCmd);
+
+ // TODO: SERVER-36492 Only run on secondary until we support initial sync.
+ invariant(mode == repl::OplogApplication::Mode::kSecondary);
+
+ // Transaction operations are in its own batch, so we can modify their opCtx.
+ invariant(entry.getSessionId());
+ invariant(entry.getTxnNumber());
+ opCtx->setLogicalSessionId(*entry.getSessionId());
+ opCtx->setTxnNumber(*entry.getTxnNumber());
+ // The write on transaction table may be applied concurrently, so refreshing state
+ // from disk may read that write, causing starting a new transaction on an existing
+ // txnNumber. Thus, we start a new transaction without refreshing state from disk.
+ OperationContextSessionMongodWithoutRefresh sessionCheckout(opCtx);
+
+ auto transaction = TransactionParticipant::get(opCtx);
+ transaction->unstashTransactionResources(opCtx, "abortTransaction");
+ transaction->abortActiveTransaction(opCtx);
+ return Status::OK();
+}
+
+
void TransactionParticipant::_beginOrContinueRetryableWrite(WithLock wl, TxnNumber txnNumber) {
if (txnNumber > _activeTxnNumber) {
// New retryable write.
diff --git a/src/mongo/db/transaction_participant.h b/src/mongo/db/transaction_participant.h
index 29e9b3e8bfe..18048474617 100644
--- a/src/mongo/db/transaction_participant.h
+++ b/src/mongo/db/transaction_participant.h
@@ -161,6 +161,13 @@ public:
static TransactionParticipant* getFromNonCheckedOutSession(Session* session);
/**
+ * Apply `abortTransaction` oplog entry.
+ */
+ static Status applyAbortTransaction(OperationContext* opCtx,
+ const repl::OplogEntry& entry,
+ repl::OplogApplication::Mode mode);
+
+ /**
* Kills the transaction if it is running, ensuring that it releases all resources, even if the
* transaction is in prepare(). Avoids writing any oplog entries or making any changes to the
* transaction table. State for prepared transactions will be re-constituted at startup.
diff --git a/src/mongo/dbtests/storage_timestamp_tests.cpp b/src/mongo/dbtests/storage_timestamp_tests.cpp
index 03e9d193e22..018720d4938 100644
--- a/src/mongo/dbtests/storage_timestamp_tests.cpp
+++ b/src/mongo/dbtests/storage_timestamp_tests.cpp
@@ -289,7 +289,6 @@ public:
dbName,
BSON("applyOps" << applyOpsList),
repl::OplogApplication::Mode::kApplyOpsCmd,
- {},
&result);
if (!status.isOK()) {
return status;
@@ -308,7 +307,6 @@ public:
dbName,
BSON("applyOps" << applyOpsList << "allowAtomic" << false),
repl::OplogApplication::Mode::kApplyOpsCmd,
- {},
&result);
if (!status.isOK()) {
return status;
@@ -649,7 +647,6 @@ public:
<< "o"
<< BSON("applyOps" << BSONArrayBuilder().obj())))),
repl::OplogApplication::Mode::kApplyOpsCmd,
- {},
&result));
}