summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/mongo/db/s/global_index/global_index_inserter.cpp76
-rw-r--r--src/mongo/db/transaction/transaction_api.cpp27
-rw-r--r--src/mongo/db/transaction/transaction_api.h15
-rw-r--r--src/mongo/db/transaction/transaction_api_test.cpp165
4 files changed, 244 insertions, 39 deletions
diff --git a/src/mongo/db/s/global_index/global_index_inserter.cpp b/src/mongo/db/s/global_index/global_index_inserter.cpp
index 673810360cd..93cbbb20049 100644
--- a/src/mongo/db/s/global_index/global_index_inserter.cpp
+++ b/src/mongo/db/s/global_index/global_index_inserter.cpp
@@ -64,43 +64,45 @@ NamespaceString GlobalIndexInserter::_skipIdNss() {
void GlobalIndexInserter::processDoc(OperationContext* opCtx,
const BSONObj& indexKeyValues,
const BSONObj& documentKey) {
- auto insertToGlobalIndexFn =
- [this, service = opCtx->getServiceContext(), indexKeyValues, documentKey](
- const txn_api::TransactionClient& txnClient, ExecutorPtr txnExec) {
- FindCommandRequest skipIdQuery(_skipIdNss());
- skipIdQuery.setFilter(BSON("_id" << documentKey));
- skipIdQuery.setLimit(1);
-
- return txnClient.exhaustiveFind(skipIdQuery)
- .thenRunOn(txnExec)
- .then([this, service, indexKeyValues, documentKey, &txnClient, txnExec](
- const auto& skipIdDocResults) {
- auto client = service->makeClient("globalIndexInserter");
- auto opCtx = service->makeOperationContext(client.get());
- globalIndexInserterPauseAfterReadingSkipCollection.pauseWhileSet(opCtx.get());
-
- if (!skipIdDocResults.empty()) {
- return SemiFuture<void>::makeReady();
- }
-
- InsertGlobalIndexKey globalIndexEntryInsert(_indexUUID);
- // Note: dbName is unused by command but required by idl.
- globalIndexEntryInsert.setDbName(DatabaseName::kAdmin);
- globalIndexEntryInsert.setGlobalIndexKeyEntry(
- GlobalIndexKeyEntry(indexKeyValues, documentKey));
-
- return txnClient.runCommand(_nss.dbName(), globalIndexEntryInsert.toBSON({}))
- .thenRunOn(txnExec)
- .then([this, documentKey, &txnClient](const auto& commandResponse) {
- write_ops::InsertCommandRequest skipIdInsert(_skipIdNss());
-
- skipIdInsert.setDocuments({BSON("_id" << documentKey)});
- return txnClient.runCRUDOp({skipIdInsert}, {}).ignoreValue();
- })
- .semi();
- })
- .semi();
- };
+ auto insertToGlobalIndexFn = [this,
+ service = opCtx->getServiceContext(),
+ indexKeyValues,
+ documentKey](const txn_api::TransactionClient& txnClient,
+ ExecutorPtr txnExec) {
+ FindCommandRequest skipIdQuery(_skipIdNss());
+ skipIdQuery.setFilter(BSON("_id" << documentKey));
+ skipIdQuery.setLimit(1);
+
+ return txnClient.exhaustiveFind(skipIdQuery)
+ .thenRunOn(txnExec)
+ .then([this, service, indexKeyValues, documentKey, &txnClient, txnExec](
+ const auto& skipIdDocResults) {
+ auto client = service->makeClient("globalIndexInserter");
+ auto opCtx = service->makeOperationContext(client.get());
+ globalIndexInserterPauseAfterReadingSkipCollection.pauseWhileSet(opCtx.get());
+
+ if (!skipIdDocResults.empty()) {
+ return SemiFuture<void>::makeReady();
+ }
+
+ InsertGlobalIndexKey globalIndexEntryInsert(_indexUUID);
+ // Note: dbName is unused by command but required by idl.
+ globalIndexEntryInsert.setDbName(DatabaseName::kAdmin);
+ globalIndexEntryInsert.setGlobalIndexKeyEntry(
+ GlobalIndexKeyEntry(indexKeyValues, documentKey));
+
+ return txnClient.runCommandChecked(_nss.dbName(), globalIndexEntryInsert.toBSON({}))
+ .thenRunOn(txnExec)
+ .then([this, documentKey, &txnClient](const auto& commandResponse) {
+ write_ops::InsertCommandRequest skipIdInsert(_skipIdNss());
+
+ skipIdInsert.setDocuments({BSON("_id" << documentKey)});
+ return txnClient.runCRUDOp({skipIdInsert}, {}).ignoreValue();
+ })
+ .semi();
+ })
+ .semi();
+ };
auto inlineExecutor = std::make_shared<executor::InlineExecutor>();
txn_api::SyncTransactionWithRetries txn(opCtx, _executor, nullptr, inlineExecutor);
diff --git a/src/mongo/db/transaction/transaction_api.cpp b/src/mongo/db/transaction/transaction_api.cpp
index 17e50980c99..a2854b6dbf7 100644
--- a/src/mongo/db/transaction/transaction_api.cpp
+++ b/src/mongo/db/transaction/transaction_api.cpp
@@ -452,9 +452,10 @@ ExecutorFuture<BSONObj> SEPTransactionClient::_runCommand(const DatabaseName& db
return _behaviors->handleRequest(cancellableOpCtx.get(), requestMessage)
.thenRunOn(_executor)
.then([this](DbResponse dbResponse) {
+ // NOTE: The API uses this method to run commit and abort, so be careful about adding
+ // new logic here to ensure it cannot interfere with error handling for either command.
auto reply = rpc::makeReply(&dbResponse.response)->getCommandReply().getOwned();
_hooks->runReplyHook(reply);
- uassertStatusOK(getStatusFromCommandResult(reply));
return reply;
});
}
@@ -475,6 +476,30 @@ SemiFuture<BSONObj> SEPTransactionClient::runCommand(const DatabaseName& dbName,
return _runCommand(dbName, cmdObj).semi();
}
+ExecutorFuture<BSONObj> SEPTransactionClient::_runCommandChecked(const DatabaseName& dbName,
+ BSONObj cmdObj) const {
+ return _runCommand(dbName, cmdObj).then([](BSONObj reply) {
+ uassertStatusOK(getStatusFromCommandResult(reply));
+ return reply;
+ });
+}
+
+SemiFuture<BSONObj> SEPTransactionClient::runCommandChecked(const DatabaseName& dbName,
+ BSONObj cmdObj) const {
+ return _runCommandChecked(dbName, cmdObj).semi();
+}
+
+BSONObj SEPTransactionClient::runCommandCheckedSync(const DatabaseName& dbName,
+ BSONObj cmdObj) const {
+ Notification<void> mayReturn;
+ auto result = _runCommandChecked(dbName, cmdObj).unsafeToInlineFuture().tapAll([&](auto&&) {
+ mayReturn.set();
+ });
+ runFutureInline(_inlineExecutor.get(), mayReturn);
+
+ return std::move(result).get();
+}
+
ExecutorFuture<BatchedCommandResponse> SEPTransactionClient::_runCRUDOp(
const BatchedCommandRequest& cmd, std::vector<StmtId> stmtIds) const {
invariant(!stmtIds.size() || (cmd.sizeWriteOps() == stmtIds.size()),
diff --git a/src/mongo/db/transaction/transaction_api.h b/src/mongo/db/transaction/transaction_api.h
index 323a4af98f9..28367668175 100644
--- a/src/mongo/db/transaction/transaction_api.h
+++ b/src/mongo/db/transaction/transaction_api.h
@@ -100,8 +100,15 @@ public:
* Runs the given command as part of the transaction that owns this transaction client.
*/
virtual SemiFuture<BSONObj> runCommand(const DatabaseName& dbName, BSONObj cmd) const = 0;
-
virtual BSONObj runCommandSync(const DatabaseName& dbName, BSONObj cmd) const = 0;
+
+ /**
+ * Same as runCommand but will assert the command status is ok.
+ */
+ virtual SemiFuture<BSONObj> runCommandChecked(const DatabaseName& dbName,
+ BSONObj cmd) const = 0;
+ virtual BSONObj runCommandCheckedSync(const DatabaseName& dbName, BSONObj cmd) const = 0;
+
/**
* Helper method to run commands representable as a BatchedCommandRequest in the transaction
* client's transaction.
@@ -298,6 +305,10 @@ public:
virtual SemiFuture<BSONObj> runCommand(const DatabaseName& dbName, BSONObj cmd) const override;
virtual BSONObj runCommandSync(const DatabaseName& dbName, BSONObj cmd) const override;
+ virtual SemiFuture<BSONObj> runCommandChecked(const DatabaseName& dbName,
+ BSONObj cmd) const override;
+ virtual BSONObj runCommandCheckedSync(const DatabaseName& dbName, BSONObj cmd) const override;
+
virtual SemiFuture<BatchedCommandResponse> runCRUDOp(
const BatchedCommandRequest& cmd, std::vector<StmtId> stmtIds) const override;
virtual BatchedCommandResponse runCRUDOpSync(const BatchedCommandRequest& cmd,
@@ -322,6 +333,8 @@ public:
private:
ExecutorFuture<BSONObj> _runCommand(const DatabaseName& dbName, BSONObj cmd) const;
+ ExecutorFuture<BSONObj> _runCommandChecked(const DatabaseName& dbName, BSONObj cmd) const;
+
ExecutorFuture<BatchedCommandResponse> _runCRUDOp(const BatchedCommandRequest& cmd,
std::vector<StmtId> stmtIds) const;
diff --git a/src/mongo/db/transaction/transaction_api_test.cpp b/src/mongo/db/transaction/transaction_api_test.cpp
index b9290d0cd86..4bdbb16a141 100644
--- a/src/mongo/db/transaction/transaction_api_test.cpp
+++ b/src/mongo/db/transaction/transaction_api_test.cpp
@@ -80,6 +80,19 @@ const BSONObj kRetryableWriteConcernError =
const BSONObj kResWithRetryableWriteConcernError =
BSON("ok" << 1 << "writeConcernError" << kRetryableWriteConcernError);
+const BSONObj kResWithTransientCommitErrorAndRetryableWriteConcernError =
+ BSON("ok" << 0 << "code" << ErrorCodes::LockTimeout << kErrorLabelsFieldName
+ << BSON_ARRAY(ErrorLabel::kTransientTransaction) << "writeConcernError"
+ << kRetryableWriteConcernError);
+
+const BSONObj kResWithNonTransientCommitErrorAndRetryableWriteConcernError =
+ BSON("ok" << 0 << "code" << ErrorCodes::NoSuchTransaction << "writeConcernError"
+ << kRetryableWriteConcernError);
+
+const BSONObj kResWithNonTransientCommitErrorAndNonRetryableWriteConcernError =
+ BSON("ok" << 0 << "code" << ErrorCodes::NoSuchTransaction << "writeConcernError"
+ << kWriteConcernError);
+
class MockResourceYielder : public ResourceYielder {
public:
void yield(OperationContext*) {
@@ -194,6 +207,15 @@ public:
MONGO_UNREACHABLE;
}
+ virtual BSONObj runCommandCheckedSync(const DatabaseName& dbName, BSONObj cmd) const override {
+ MONGO_UNREACHABLE;
+ }
+
+ virtual SemiFuture<BSONObj> runCommandChecked(const DatabaseName& dbName,
+ BSONObj cmd) const override {
+ MONGO_UNREACHABLE;
+ }
+
virtual SemiFuture<BatchedCommandResponse> runCRUDOp(
const BatchedCommandRequest& cmd, std::vector<StmtId> stmtIds) const override {
MONGO_UNREACHABLE;
@@ -502,6 +524,15 @@ public:
MONGO_UNREACHABLE;
}
+ virtual BSONObj runCommandCheckedSync(const DatabaseName& dbName, BSONObj cmd) const override {
+ MONGO_UNREACHABLE;
+ }
+
+ virtual SemiFuture<BSONObj> runCommandChecked(const DatabaseName& dbName,
+ BSONObj cmd) const override {
+ MONGO_UNREACHABLE;
+ }
+
virtual SemiFuture<BatchedCommandResponse> runCRUDOp(
const BatchedCommandRequest& cmd, std::vector<StmtId> stmtIds) const override {
MONGO_UNREACHABLE;
@@ -1070,6 +1101,49 @@ TEST_F(TxnAPITest, OwnSession_CommitError) {
ASSERT_EQ(lastRequest.firstElementFieldNameStringData(), "commitTransaction"_sd);
}
+TEST_F(TxnAPITest, DoesNotRetryOnNonTransientCommitErrorWithNonRetryableCommitWCError) {
+ auto swResult = txnWithRetries().runNoThrow(
+ opCtx(), [&](const txn_api::TransactionClient& txnClient, ExecutorPtr txnExec) {
+ mockClient()->setNextCommandResponse(kOKInsertResponse);
+ auto insertRes =
+ txnClient
+ .runCommand(DatabaseName::createDatabaseName_forTest(boost::none, "user"_sd),
+ BSON("insert"
+ << "foo"
+ << "documents" << BSON_ARRAY(BSON("x" << 1))))
+ .get();
+ ASSERT_OK(getStatusFromWriteCommandReply(insertRes));
+
+ ASSERT_EQ(insertRes["n"].Int(), 1); // Verify the mocked response was returned.
+ assertTxnMetadata(
+ mockClient()->getLastSentRequest(), 0 /* txnNumber */, true /* startTransaction */);
+ assertSessionIdMetadata(mockClient()->getLastSentRequest(), LsidAssertion::kStandalone);
+
+ // The commit response.
+ mockClient()->setNextCommandResponse(
+ kResWithNonTransientCommitErrorAndNonRetryableWriteConcernError);
+
+ return SemiFuture<void>::makeReady();
+ });
+ ASSERT(swResult.getStatus().isOK());
+ ASSERT_EQ(swResult.getValue().cmdStatus, ErrorCodes::NoSuchTransaction);
+ ASSERT_EQ(swResult.getValue().wcError.toStatus(), ErrorCodes::WriteConcernFailed);
+ ASSERT_EQ(swResult.getValue().getEffectiveStatus(), ErrorCodes::NoSuchTransaction);
+
+ ASSERT_EQ(1, InternalTransactionMetrics::get(opCtx())->getStarted());
+ ASSERT_EQ(0, InternalTransactionMetrics::get(opCtx())->getRetriedTransactions());
+ ASSERT_EQ(0, InternalTransactionMetrics::get(opCtx())->getRetriedCommits());
+ ASSERT_EQ(0, InternalTransactionMetrics::get(opCtx())->getSucceeded());
+
+ auto lastRequest = mockClient()->getLastSentRequest();
+ assertTxnMetadata(lastRequest,
+ 0 /* txnNumber */,
+ boost::none /* startTransaction */,
+ boost::none /* readConcern */,
+ WriteConcernOptions().toBSON() /* writeConcern */);
+ ASSERT_EQ(lastRequest.firstElementFieldNameStringData(), "commitTransaction"_sd);
+}
+
TEST_F(TxnAPITest, OwnSession_TransientCommitError) {
int attempt = -1;
auto swResult = txnWithRetries().runNoThrow(
@@ -1243,6 +1317,97 @@ TEST_F(TxnAPITest, OwnSession_RetryableCommitWCError) {
ASSERT_EQ(lastRequest.firstElementFieldNameStringData(), "commitTransaction"_sd);
}
+TEST_F(TxnAPITest, RetriesOnNonTransientCommitWithErrorRetryableCommitWCError) {
+ auto swResult = txnWithRetries().runNoThrow(
+ opCtx(), [&](const txn_api::TransactionClient& txnClient, ExecutorPtr txnExec) {
+ mockClient()->setNextCommandResponse(kOKInsertResponse);
+ auto insertRes =
+ txnClient
+ .runCommand(DatabaseName::createDatabaseName_forTest(boost::none, "user"_sd),
+ BSON("insert"
+ << "foo"
+ << "documents" << BSON_ARRAY(BSON("x" << 1))))
+ .get();
+ ASSERT_OK(getStatusFromWriteCommandReply(insertRes));
+
+ ASSERT_EQ(insertRes["n"].Int(), 1); // Verify the mocked response was returned.
+ assertTxnMetadata(
+ mockClient()->getLastSentRequest(), 0 /* txnNumber */, true /* startTransaction */);
+ assertSessionIdMetadata(mockClient()->getLastSentRequest(), LsidAssertion::kStandalone);
+
+ // The commit responses.
+ mockClient()->setNextCommandResponse(
+ kResWithNonTransientCommitErrorAndRetryableWriteConcernError);
+ mockClient()->setNextCommandResponse(kOKCommandResponse);
+ return SemiFuture<void>::makeReady();
+ });
+ ASSERT(swResult.getStatus().isOK());
+ ASSERT(swResult.getValue().getEffectiveStatus().isOK());
+
+ ASSERT_EQ(1, InternalTransactionMetrics::get(opCtx())->getStarted());
+ ASSERT_EQ(0, InternalTransactionMetrics::get(opCtx())->getRetriedTransactions());
+ ASSERT_EQ(1, InternalTransactionMetrics::get(opCtx())->getRetriedCommits());
+ ASSERT_EQ(1, InternalTransactionMetrics::get(opCtx())->getSucceeded());
+
+ auto lastRequest = mockClient()->getLastSentRequest();
+ assertTxnMetadata(lastRequest,
+ 0 /* txnNumber */,
+ boost::none /* startTransaction */,
+ boost::none /* readConcern */,
+ CommandHelpers::kMajorityWriteConcern.toBSON());
+ assertSessionIdMetadata(mockClient()->getLastSentRequest(), LsidAssertion::kStandalone);
+ ASSERT_EQ(lastRequest.firstElementFieldNameStringData(), "commitTransaction"_sd);
+}
+
+TEST_F(TxnAPITest, RetriesOnTransientCommitErrorWithRetryableWCError) {
+ int attempt = -1;
+ auto swResult = txnWithRetries().runNoThrow(
+ opCtx(), [&](const txn_api::TransactionClient& txnClient, ExecutorPtr txnExec) {
+ attempt += 1;
+ mockClient()->setNextCommandResponse(kOKInsertResponse);
+ auto insertRes =
+ txnClient
+ .runCommand(DatabaseName::createDatabaseName_forTest(boost::none, "user"_sd),
+ BSON("insert"
+ << "foo"
+ << "documents" << BSON_ARRAY(BSON("x" << 1))))
+ .get();
+ ASSERT_OK(getStatusFromWriteCommandReply(insertRes));
+
+ ASSERT_EQ(insertRes["n"].Int(), 1); // Verify the mocked response was returned.
+ assertTxnMetadata(mockClient()->getLastSentRequest(),
+ attempt /* txnNumber */,
+ true /* startTransaction */);
+ assertSessionIdMetadata(mockClient()->getLastSentRequest(), LsidAssertion::kStandalone);
+
+ // Set commit response. Initial commit response is a transient txn error so the
+ // transaction retries.
+ if (attempt == 0) {
+ mockClient()->setNextCommandResponse(
+ kResWithTransientCommitErrorAndRetryableWriteConcernError);
+ } else {
+ mockClient()->setNextCommandResponse(kOKCommandResponse);
+ }
+ return SemiFuture<void>::makeReady();
+ });
+ ASSERT(swResult.getStatus().isOK());
+ ASSERT(swResult.getValue().getEffectiveStatus().isOK());
+
+ ASSERT_EQ(1, InternalTransactionMetrics::get(opCtx())->getStarted());
+ ASSERT_EQ(1, InternalTransactionMetrics::get(opCtx())->getRetriedTransactions());
+ ASSERT_EQ(0, InternalTransactionMetrics::get(opCtx())->getRetriedCommits());
+ ASSERT_EQ(1, InternalTransactionMetrics::get(opCtx())->getSucceeded());
+
+ auto lastRequest = mockClient()->getLastSentRequest();
+ assertTxnMetadata(lastRequest,
+ attempt /* txnNumber */,
+ boost::none /* startTransaction */,
+ boost::none /* readConcern */,
+ WriteConcernOptions().toBSON() /* writeConcern */);
+ assertSessionIdMetadata(mockClient()->getLastSentRequest(), LsidAssertion::kStandalone);
+ ASSERT_EQ(lastRequest.firstElementFieldNameStringData(), "commitTransaction"_sd);
+}
+
TEST_F(TxnAPITest, RunNoErrors) {
txnWithRetries().run(opCtx(),
[&](const txn_api::TransactionClient& txnClient, ExecutorPtr txnExec) {