summaryrefslogtreecommitdiff
path: root/src/mongo/db/transaction
diff options
context:
space:
mode:
authorseanzimm <sean.zimmerman@mongodb.com>2023-04-26 17:07:23 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2023-04-26 19:18:17 +0000
commit1524074b3f8eca426596a07271d88b94cc0094e6 (patch)
tree794dcfad1b24d71aa515ab14364233a041aff452 /src/mongo/db/transaction
parent5ac6f19876a0adf6bf5c7ac1e5eb5b8b7fcc531d (diff)
downloadmongo-1524074b3f8eca426596a07271d88b94cc0094e6.tar.gz
SERVER-72991: Enhance Internal Transactions API for bulkWrite
Diffstat (limited to 'src/mongo/db/transaction')
-rw-r--r--src/mongo/db/transaction/transaction_api.cpp42
-rw-r--r--src/mongo/db/transaction/transaction_api.h14
-rw-r--r--src/mongo/db/transaction/transaction_api_test.cpp18
3 files changed, 73 insertions, 1 deletions
diff --git a/src/mongo/db/transaction/transaction_api.cpp b/src/mongo/db/transaction/transaction_api.cpp
index 6c3ed0f9c13..f772f437bcf 100644
--- a/src/mongo/db/transaction/transaction_api.cpp
+++ b/src/mongo/db/transaction/transaction_api.cpp
@@ -55,6 +55,7 @@
#include "mongo/db/write_concern_options.h"
#include "mongo/executor/inline_executor.h"
#include "mongo/executor/task_executor.h"
+#include "mongo/idl/idl_parser.h"
#include "mongo/logv2/log.h"
#include "mongo/rpc/factory.h"
#include "mongo/rpc/get_status_from_command_result.h"
@@ -521,6 +522,43 @@ BatchedCommandResponse SEPTransactionClient::runCRUDOpSync(const BatchedCommandR
return std::move(result).get();
}
+ExecutorFuture<BulkWriteCommandReply> SEPTransactionClient::_runCRUDOp(
+ const BulkWriteCommandRequest& cmd) const {
+ BSONObjBuilder cmdBob(cmd.toBSON(BSONObj()));
+ // BulkWrite can only execute on admin DB.
+ return runCommand(DatabaseName::kAdmin, cmdBob.obj())
+ .thenRunOn(_executor)
+ .then([](BSONObj reply) {
+ uassertStatusOK(getStatusFromWriteCommandReply(reply));
+
+ IDLParserContext ctx("BulkWriteCommandReplyParse");
+ auto response = BulkWriteCommandReply::parse(ctx, reply);
+ return response;
+ });
+}
+
+SemiFuture<BulkWriteCommandReply> SEPTransactionClient::runCRUDOp(
+ const BulkWriteCommandRequest& cmd) const {
+ return _runCRUDOp(cmd).semi();
+}
+
+BulkWriteCommandReply SEPTransactionClient::runCRUDOpSync(
+ const BulkWriteCommandRequest& cmd) const {
+
+ Notification<void> mayReturn;
+
+ auto result =
+ _runCRUDOp(cmd)
+ .unsafeToInlineFuture()
+ // Use tap and tapError instead of tapAll since tapAll is not move-only type friendly
+ .tap([&](auto&&) { mayReturn.set(); })
+ .tapError([&](auto&&) { mayReturn.set(); });
+
+ runFutureInline(_inlineExecutor.get(), mayReturn);
+
+ return std::move(result).get();
+}
+
ExecutorFuture<std::vector<BSONObj>> SEPTransactionClient::_exhaustiveFind(
const FindCommandRequest& cmd) const {
return runCommand(cmd.getDbName(), cmd.toBSON({}))
@@ -816,7 +854,9 @@ void Transaction::prepareRequest(BSONObjBuilder* cmdBuilder) {
!isRetryableWriteCommand(
cmdBuilder->asTempObj().firstElement().fieldNameStringData()) ||
(cmdBuilder->hasField(write_ops::WriteCommandRequestBase::kStmtIdsFieldName) ||
- cmdBuilder->hasField(write_ops::WriteCommandRequestBase::kStmtIdFieldName)),
+ cmdBuilder->hasField(write_ops::WriteCommandRequestBase::kStmtIdFieldName)) ||
+ (cmdBuilder->hasField(BulkWriteCommandRequest::kStmtIdFieldName) ||
+ cmdBuilder->hasField(BulkWriteCommandRequest::kStmtIdsFieldName)),
str::stream()
<< "In a retryable write transaction every retryable write command should have an "
"explicit statement id, command: "
diff --git a/src/mongo/db/transaction/transaction_api.h b/src/mongo/db/transaction/transaction_api.h
index ff5cb5aa335..d754ab7390f 100644
--- a/src/mongo/db/transaction/transaction_api.h
+++ b/src/mongo/db/transaction/transaction_api.h
@@ -30,6 +30,7 @@
#pragma once
#include "mongo/db/cancelable_operation_context.h"
+#include "mongo/db/commands/bulk_write_gen.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/query/find_command_gen.h"
#include "mongo/db/resource_yielder.h"
@@ -129,6 +130,13 @@ public:
std::vector<StmtId> stmtIds) const = 0;
/**
+ * Helper method to run BulkWriteCommandRequest in the transaction client's transaction.
+ */
+ virtual SemiFuture<BulkWriteCommandReply> runCRUDOp(
+ const BulkWriteCommandRequest& cmd) const = 0;
+ virtual BulkWriteCommandReply runCRUDOpSync(const BulkWriteCommandRequest& cmd) const = 0;
+
+ /**
* Helper method that runs the given find in the transaction client's transaction and will
* iterate and exhaust the find's cursor, returning a vector with all matching documents.
*/
@@ -291,6 +299,10 @@ public:
virtual BatchedCommandResponse runCRUDOpSync(const BatchedCommandRequest& cmd,
std::vector<StmtId> stmtIds) const override;
+ virtual SemiFuture<BulkWriteCommandReply> runCRUDOp(
+ const BulkWriteCommandRequest& cmd) const override;
+ virtual BulkWriteCommandReply runCRUDOpSync(const BulkWriteCommandRequest& cmd) const override;
+
virtual SemiFuture<std::vector<BSONObj>> exhaustiveFind(
const FindCommandRequest& cmd) const override;
virtual std::vector<BSONObj> exhaustiveFindSync(const FindCommandRequest& cmd) const override;
@@ -309,6 +321,8 @@ private:
ExecutorFuture<BatchedCommandResponse> _runCRUDOp(const BatchedCommandRequest& cmd,
std::vector<StmtId> stmtIds) const;
+ ExecutorFuture<BulkWriteCommandReply> _runCRUDOp(const BulkWriteCommandRequest& cmd) const;
+
ExecutorFuture<std::vector<BSONObj>> _exhaustiveFind(const FindCommandRequest& cmd) const;
private:
diff --git a/src/mongo/db/transaction/transaction_api_test.cpp b/src/mongo/db/transaction/transaction_api_test.cpp
index d1bd94b067d..b14b6918644 100644
--- a/src/mongo/db/transaction/transaction_api_test.cpp
+++ b/src/mongo/db/transaction/transaction_api_test.cpp
@@ -204,6 +204,15 @@ public:
MONGO_UNREACHABLE;
}
+ virtual SemiFuture<BulkWriteCommandReply> runCRUDOp(
+ const BulkWriteCommandRequest& cmd) const override {
+ MONGO_UNREACHABLE;
+ }
+
+ virtual BulkWriteCommandReply runCRUDOpSync(const BulkWriteCommandRequest& cmd) const override {
+ MONGO_UNREACHABLE;
+ }
+
virtual bool supportsClientTransactionContext() const override {
return true;
}
@@ -505,6 +514,15 @@ public:
MONGO_UNREACHABLE;
}
+ virtual SemiFuture<BulkWriteCommandReply> runCRUDOp(
+ const BulkWriteCommandRequest& cmd) const override {
+ MONGO_UNREACHABLE;
+ }
+
+ virtual BulkWriteCommandReply runCRUDOpSync(const BulkWriteCommandRequest& cmd) const override {
+ MONGO_UNREACHABLE;
+ }
+
virtual SemiFuture<std::vector<BSONObj>> exhaustiveFind(
const FindCommandRequest& cmd) const override {
MONGO_UNREACHABLE;