diff options
author | seanzimm <sean.zimmerman@mongodb.com> | 2023-04-26 17:07:23 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2023-04-26 19:18:17 +0000 |
commit | 1524074b3f8eca426596a07271d88b94cc0094e6 (patch) | |
tree | 794dcfad1b24d71aa515ab14364233a041aff452 /src/mongo/db/transaction | |
parent | 5ac6f19876a0adf6bf5c7ac1e5eb5b8b7fcc531d (diff) | |
download | mongo-1524074b3f8eca426596a07271d88b94cc0094e6.tar.gz |
SERVER-72991: Enhance Internal Transactions API for bulkWrite
Diffstat (limited to 'src/mongo/db/transaction')
-rw-r--r-- | src/mongo/db/transaction/transaction_api.cpp | 42 | ||||
-rw-r--r-- | src/mongo/db/transaction/transaction_api.h | 14 | ||||
-rw-r--r-- | src/mongo/db/transaction/transaction_api_test.cpp | 18 |
3 files changed, 73 insertions, 1 deletions
diff --git a/src/mongo/db/transaction/transaction_api.cpp b/src/mongo/db/transaction/transaction_api.cpp index 6c3ed0f9c13..f772f437bcf 100644 --- a/src/mongo/db/transaction/transaction_api.cpp +++ b/src/mongo/db/transaction/transaction_api.cpp @@ -55,6 +55,7 @@ #include "mongo/db/write_concern_options.h" #include "mongo/executor/inline_executor.h" #include "mongo/executor/task_executor.h" +#include "mongo/idl/idl_parser.h" #include "mongo/logv2/log.h" #include "mongo/rpc/factory.h" #include "mongo/rpc/get_status_from_command_result.h" @@ -521,6 +522,43 @@ BatchedCommandResponse SEPTransactionClient::runCRUDOpSync(const BatchedCommandR return std::move(result).get(); } +ExecutorFuture<BulkWriteCommandReply> SEPTransactionClient::_runCRUDOp( + const BulkWriteCommandRequest& cmd) const { + BSONObjBuilder cmdBob(cmd.toBSON(BSONObj())); + // BulkWrite can only execute on admin DB. + return runCommand(DatabaseName::kAdmin, cmdBob.obj()) + .thenRunOn(_executor) + .then([](BSONObj reply) { + uassertStatusOK(getStatusFromWriteCommandReply(reply)); + + IDLParserContext ctx("BulkWriteCommandReplyParse"); + auto response = BulkWriteCommandReply::parse(ctx, reply); + return response; + }); +} + +SemiFuture<BulkWriteCommandReply> SEPTransactionClient::runCRUDOp( + const BulkWriteCommandRequest& cmd) const { + return _runCRUDOp(cmd).semi(); +} + +BulkWriteCommandReply SEPTransactionClient::runCRUDOpSync( + const BulkWriteCommandRequest& cmd) const { + + Notification<void> mayReturn; + + auto result = + _runCRUDOp(cmd) + .unsafeToInlineFuture() + // Use tap and tapError instead of tapAll since tapAll is not move-only type friendly + .tap([&](auto&&) { mayReturn.set(); }) + .tapError([&](auto&&) { mayReturn.set(); }); + + runFutureInline(_inlineExecutor.get(), mayReturn); + + return std::move(result).get(); +} + ExecutorFuture<std::vector<BSONObj>> SEPTransactionClient::_exhaustiveFind( const FindCommandRequest& cmd) const { return runCommand(cmd.getDbName(), cmd.toBSON({})) @@ -816,7 +854,9 @@ void Transaction::prepareRequest(BSONObjBuilder* cmdBuilder) { !isRetryableWriteCommand( cmdBuilder->asTempObj().firstElement().fieldNameStringData()) || (cmdBuilder->hasField(write_ops::WriteCommandRequestBase::kStmtIdsFieldName) || - cmdBuilder->hasField(write_ops::WriteCommandRequestBase::kStmtIdFieldName)), + cmdBuilder->hasField(write_ops::WriteCommandRequestBase::kStmtIdFieldName)) || + (cmdBuilder->hasField(BulkWriteCommandRequest::kStmtIdFieldName) || + cmdBuilder->hasField(BulkWriteCommandRequest::kStmtIdsFieldName)), str::stream() << "In a retryable write transaction every retryable write command should have an " "explicit statement id, command: " diff --git a/src/mongo/db/transaction/transaction_api.h b/src/mongo/db/transaction/transaction_api.h index ff5cb5aa335..d754ab7390f 100644 --- a/src/mongo/db/transaction/transaction_api.h +++ b/src/mongo/db/transaction/transaction_api.h @@ -30,6 +30,7 @@ #pragma once #include "mongo/db/cancelable_operation_context.h" +#include "mongo/db/commands/bulk_write_gen.h" #include "mongo/db/operation_context.h" #include "mongo/db/query/find_command_gen.h" #include "mongo/db/resource_yielder.h" @@ -129,6 +130,13 @@ public: std::vector<StmtId> stmtIds) const = 0; /** + * Helper method to run BulkWriteCommandRequest in the transaction client's transaction. + */ + virtual SemiFuture<BulkWriteCommandReply> runCRUDOp( + const BulkWriteCommandRequest& cmd) const = 0; + virtual BulkWriteCommandReply runCRUDOpSync(const BulkWriteCommandRequest& cmd) const = 0; + + /** * Helper method that runs the given find in the transaction client's transaction and will * iterate and exhaust the find's cursor, returning a vector with all matching documents. */ @@ -291,6 +299,10 @@ public: virtual BatchedCommandResponse runCRUDOpSync(const BatchedCommandRequest& cmd, std::vector<StmtId> stmtIds) const override; + virtual SemiFuture<BulkWriteCommandReply> runCRUDOp( + const BulkWriteCommandRequest& cmd) const override; + virtual BulkWriteCommandReply runCRUDOpSync(const BulkWriteCommandRequest& cmd) const override; + virtual SemiFuture<std::vector<BSONObj>> exhaustiveFind( const FindCommandRequest& cmd) const override; virtual std::vector<BSONObj> exhaustiveFindSync(const FindCommandRequest& cmd) const override; @@ -309,6 +321,8 @@ private: ExecutorFuture<BatchedCommandResponse> _runCRUDOp(const BatchedCommandRequest& cmd, std::vector<StmtId> stmtIds) const; + ExecutorFuture<BulkWriteCommandReply> _runCRUDOp(const BulkWriteCommandRequest& cmd) const; + ExecutorFuture<std::vector<BSONObj>> _exhaustiveFind(const FindCommandRequest& cmd) const; private: diff --git a/src/mongo/db/transaction/transaction_api_test.cpp b/src/mongo/db/transaction/transaction_api_test.cpp index d1bd94b067d..b14b6918644 100644 --- a/src/mongo/db/transaction/transaction_api_test.cpp +++ b/src/mongo/db/transaction/transaction_api_test.cpp @@ -204,6 +204,15 @@ public: MONGO_UNREACHABLE; } + virtual SemiFuture<BulkWriteCommandReply> runCRUDOp( + const BulkWriteCommandRequest& cmd) const override { + MONGO_UNREACHABLE; + } + + virtual BulkWriteCommandReply runCRUDOpSync(const BulkWriteCommandRequest& cmd) const override { + MONGO_UNREACHABLE; + } + virtual bool supportsClientTransactionContext() const override { return true; } @@ -505,6 +514,15 @@ public: MONGO_UNREACHABLE; } + virtual SemiFuture<BulkWriteCommandReply> runCRUDOp( + const BulkWriteCommandRequest& cmd) const override { + MONGO_UNREACHABLE; + } + + virtual BulkWriteCommandReply runCRUDOpSync(const BulkWriteCommandRequest& cmd) const override { + MONGO_UNREACHABLE; + } + virtual SemiFuture<std::vector<BSONObj>> exhaustiveFind( const FindCommandRequest& cmd) const override { MONGO_UNREACHABLE; |