diff options
author | kauboy26 <vishnu.kaushik@mongodb.com> | 2023-04-19 16:59:11 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2023-04-19 18:20:22 +0000 |
commit | 7587251132c626f4949d61047c37f73d47f88dea (patch) | |
tree | 090c271822729a3e11df20d4de0542260f2f3660 /src/mongo/s | |
parent | adf6d5a0c5569c4f53867afeaac390a892fca60f (diff) | |
download | mongo-7587251132c626f4949d61047c37f73d47f88dea.tar.gz |
SERVER-72984 send bulk write commands from mongos to mongod
Diffstat (limited to 'src/mongo/s')
-rw-r--r-- | src/mongo/s/write_ops/bulk_write_exec.cpp | 66 | ||||
-rw-r--r-- | src/mongo/s/write_ops/bulk_write_exec.h | 3 |
2 files changed, 63 insertions, 6 deletions
diff --git a/src/mongo/s/write_ops/bulk_write_exec.cpp b/src/mongo/s/write_ops/bulk_write_exec.cpp index 89303c51a49..719cf6e8086 100644 --- a/src/mongo/s/write_ops/bulk_write_exec.cpp +++ b/src/mongo/s/write_ops/bulk_write_exec.cpp @@ -30,26 +30,75 @@ #include "mongo/s/write_ops/bulk_write_exec.h" #include "mongo/base/error_codes.h" +#include "mongo/client/read_preference.h" #include "mongo/client/remote_command_targeter.h" #include "mongo/db/commands/bulk_write_gen.h" +#include "mongo/db/database_name.h" +#include "mongo/db/write_concern_options.h" #include "mongo/executor/task_executor_pool.h" #include "mongo/logv2/log.h" +#include "mongo/s/async_requests_sender.h" #include "mongo/s/client/shard_registry.h" #include "mongo/s/grid.h" +#include "mongo/s/multi_statement_transaction_requests_sender.h" #include "mongo/s/transaction_router.h" +#include "mongo/s/write_ops/batch_write_op.h" +#include "mongo/s/write_ops/write_op.h" #include "mongo/s/write_ops/write_without_shard_key_util.h" #define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kSharding namespace mongo { +namespace bulk_write_exec { namespace { // The number of times we'll try to continue a batch op if no progress is being made. This only // applies when no writes are occurring and metadata is not changing on reload. const int kMaxRoundsWithoutProgress(5); +// Send and process the child batches. Each child batch is targeted at a unique shard: therefore +// one shard will have only one batch incoming. +void executeChildBatches(OperationContext* opCtx, + const TargetedBatchMap& childBatches, + const BulkWriteOp& bulkWriteOp) { + std::vector<AsyncRequestsSender::Request> requests; + for (auto& childBatch : childBatches) { + auto request = [&]() { + auto bulkReq = bulkWriteOp.buildBulkCommandRequest(*childBatch.second); + + // Transform the request into a sendable BSON. + BSONObjBuilder builder; + bulkReq.serialize(BSONObj(), &builder); + + logical_session_id_helpers::serializeLsidAndTxnNumber(opCtx, &builder); + builder.append(WriteConcernOptions::kWriteConcernField, + opCtx->getWriteConcern().toBSON()); + + return builder.obj(); + }(); + + requests.emplace_back(childBatch.first, request); + } + + bool isRetryableWrite = opCtx->getTxnNumber() && !TransactionRouter::get(opCtx); + + // Use MultiStatementTransactionRequestsSender to send any ready sub-batches to targeted + // shard endpoints. Requests are sent on construction. + MultiStatementTransactionRequestsSender ars( + opCtx, + Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(), + DatabaseName("admin"), + requests, + ReadPreferenceSetting(ReadPreference::PrimaryOnly), + isRetryableWrite ? Shard::RetryPolicy::kIdempotent : Shard::RetryPolicy::kNoRetry); + + while (!ars.done()) { + // Block until a response is available. + auto response = ars.next(); + } +} + } // namespace -namespace bulk_write_exec { std::vector<BulkWriteReplyItem> execute(OperationContext* opCtx, const std::vector<std::unique_ptr<NSTargeter>>& targeters, @@ -72,6 +121,10 @@ std::vector<BulkWriteReplyItem> execute(OperationContext* opCtx, // re-batch ops based on their targeted shard id. TargetedBatchMap childBatches; + // Divide and group ("target") the operations in the bulk write command. Some operations may + // be split up (such as an update that needs to go to more than one shard), while others may + // be grouped together if they need to go to the same shard. + // These operations are grouped by shardId in the TargetedBatchMap childBatches. bool recordTargetErrors = refreshedTargeter; auto targetStatus = bulkWriteOp.target(targeters, recordTargetErrors, childBatches); if (!targetStatus.isOK()) { @@ -86,12 +139,10 @@ std::vector<BulkWriteReplyItem> execute(OperationContext* opCtx, refreshedTargeter = true; } - // 2: Use MultiStatementTransactionRequestsSender to send any ready sub-batches to targeted - // shard endpoints. + // Send the child batches and wait for responses. + executeChildBatches(opCtx, childBatches, bulkWriteOp); - // 3: Wait for responses for all those sub-batches and keep track of the responses from - // sub-batches based on the op index in the original bulkWrite command. Abort the batch upon - // errors for ordered writes or transactions. + // 3: Abort the batch upon errors for ordered writes or transactions. // TODO(SERVER-72792): Remove the logic below that mimics ok responses and process real // batch responses. for (const auto& childBatch : childBatches) { @@ -157,6 +208,7 @@ BulkWriteOp::BulkWriteOp(OperationContext* opCtx, const BulkWriteCommandRequest& : _opCtx(opCtx), _clientRequest(clientRequest), _txnNum(_opCtx->getTxnNumber()), + _writeConcern(opCtx->getWriteConcern()), _inTransaction(static_cast<bool>(TransactionRouter::get(opCtx))), _isRetryableWrite(opCtx->isRetryableWrite()) { _writeOps.reserve(_clientRequest.getOps().size()); @@ -238,6 +290,8 @@ BulkWriteCommandRequest BulkWriteOp::buildBulkCommandRequest( // TODO (SERVER-72989): Attach stmtIds etc. when building support for retryable // writes on mongos + request.setDbName(DatabaseName("admin")); + return request; } diff --git a/src/mongo/s/write_ops/bulk_write_exec.h b/src/mongo/s/write_ops/bulk_write_exec.h index 41d6ebf9c4d..51d862ad3f9 100644 --- a/src/mongo/s/write_ops/bulk_write_exec.h +++ b/src/mongo/s/write_ops/bulk_write_exec.h @@ -149,6 +149,9 @@ private: // Cached transaction number (if one is present on the operation contex). boost::optional<TxnNumber> _txnNum; + // The write concern that the bulk write command was issued with. + WriteConcernOptions _writeConcern; + // Set to true if this write is part of a transaction. const bool _inTransaction{false}; const bool _isRetryableWrite{false}; |