path: root/src/mongo/s/write_ops/bulk_write_exec.cpp
author     kauboy26 <vishnu.kaushik@mongodb.com>  2023-04-19 16:59:11 +0000
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>  2023-04-19 18:20:22 +0000
commit     7587251132c626f4949d61047c37f73d47f88dea (patch)
tree       090c271822729a3e11df20d4de0542260f2f3660 /src/mongo/s/write_ops/bulk_write_exec.cpp
parent     adf6d5a0c5569c4f53867afeaac390a892fca60f (diff)
download   mongo-7587251132c626f4949d61047c37f73d47f88dea.tar.gz
SERVER-72984 send bulk write commands from mongos to mongod
Diffstat (limited to 'src/mongo/s/write_ops/bulk_write_exec.cpp')
-rw-r--r--  src/mongo/s/write_ops/bulk_write_exec.cpp  66
1 file changed, 60 insertions, 6 deletions
diff --git a/src/mongo/s/write_ops/bulk_write_exec.cpp b/src/mongo/s/write_ops/bulk_write_exec.cpp
index 89303c51a49..719cf6e8086 100644
--- a/src/mongo/s/write_ops/bulk_write_exec.cpp
+++ b/src/mongo/s/write_ops/bulk_write_exec.cpp
@@ -30,26 +30,75 @@
#include "mongo/s/write_ops/bulk_write_exec.h"
#include "mongo/base/error_codes.h"
+#include "mongo/client/read_preference.h"
#include "mongo/client/remote_command_targeter.h"
#include "mongo/db/commands/bulk_write_gen.h"
+#include "mongo/db/database_name.h"
+#include "mongo/db/write_concern_options.h"
#include "mongo/executor/task_executor_pool.h"
#include "mongo/logv2/log.h"
+#include "mongo/s/async_requests_sender.h"
#include "mongo/s/client/shard_registry.h"
#include "mongo/s/grid.h"
+#include "mongo/s/multi_statement_transaction_requests_sender.h"
#include "mongo/s/transaction_router.h"
+#include "mongo/s/write_ops/batch_write_op.h"
+#include "mongo/s/write_ops/write_op.h"
#include "mongo/s/write_ops/write_without_shard_key_util.h"
#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kSharding
namespace mongo {
+namespace bulk_write_exec {
namespace {
// The number of times we'll try to continue a batch op if no progress is being made. This only
// applies when no writes are occurring and metadata is not changing on reload.
const int kMaxRoundsWithoutProgress(5);
+// Send and process the child batches. Each child batch is targeted at a unique shard, so each
+// targeted shard receives exactly one batch.
+void executeChildBatches(OperationContext* opCtx,
+ const TargetedBatchMap& childBatches,
+ const BulkWriteOp& bulkWriteOp) {
+ std::vector<AsyncRequestsSender::Request> requests;
+ for (auto& childBatch : childBatches) {
+ auto request = [&]() {
+ auto bulkReq = bulkWriteOp.buildBulkCommandRequest(*childBatch.second);
+
+ // Transform the request into a sendable BSON.
+ BSONObjBuilder builder;
+ bulkReq.serialize(BSONObj(), &builder);
+
+ logical_session_id_helpers::serializeLsidAndTxnNumber(opCtx, &builder);
+ builder.append(WriteConcernOptions::kWriteConcernField,
+ opCtx->getWriteConcern().toBSON());
+
+ return builder.obj();
+ }();
+
+ requests.emplace_back(childBatch.first, request);
+ }
+
+ bool isRetryableWrite = opCtx->getTxnNumber() && !TransactionRouter::get(opCtx);
+
+ // Use MultiStatementTransactionRequestsSender to send any ready sub-batches to targeted
+ // shard endpoints. Requests are sent on construction.
+ MultiStatementTransactionRequestsSender ars(
+ opCtx,
+ Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(),
+ DatabaseName("admin"),
+ requests,
+ ReadPreferenceSetting(ReadPreference::PrimaryOnly),
+ isRetryableWrite ? Shard::RetryPolicy::kIdempotent : Shard::RetryPolicy::kNoRetry);
+
+ while (!ars.done()) {
+ // Block until a response is available.
+ auto response = ars.next();
+ }
+}
+
} // namespace
-namespace bulk_write_exec {
std::vector<BulkWriteReplyItem> execute(OperationContext* opCtx,
const std::vector<std::unique_ptr<NSTargeter>>& targeters,
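The fan-out performed by executeChildBatches() above can be pictured with the minimal standalone sketch below. This is not MongoDB code: ShardId, SerializedCommand, and ChildBatch are hypothetical stand-ins for the real mongo:: types (TargetedBatchMap, AsyncRequestsSender::Request, serialized BSON), and building one command per shard before draining responses mirrors the ars.done()/ars.next() loop.

    #include <iostream>
    #include <map>
    #include <string>
    #include <vector>

    // Hypothetical stand-ins for the real mongo:: types.
    using ShardId = std::string;
    using SerializedCommand = std::string;
    struct ChildBatch {
        std::vector<int> opIndexes;  // indexes into the original bulkWrite ops
    };

    // Mirrors the shape of executeChildBatches(): one request is built per shard
    // and all requests are handed to the sender up front.
    std::map<ShardId, SerializedCommand> buildRequests(
        const std::map<ShardId, ChildBatch>& childBatches) {
        std::map<ShardId, SerializedCommand> requests;
        for (const auto& [shardId, batch] : childBatches) {
            // In the real code this step is buildBulkCommandRequest() plus BSON
            // serialization and lsid/txnNumber/writeConcern attachment.
            requests.emplace(shardId,
                             "bulkWrite{ops:" + std::to_string(batch.opIndexes.size()) + "}");
        }
        return requests;
    }

    int main() {
        std::map<ShardId, ChildBatch> childBatches{{"shardA", {{0, 2}}}, {"shardB", {{1}}}};
        for (const auto& [shardId, cmd] : buildRequests(childBatches)) {
            // Stand-in for the ars.next() loop: consume one response per shard.
            std::cout << shardId << " <- " << cmd << "\n";
        }
        return 0;
    }
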
@@ -72,6 +121,10 @@ std::vector<BulkWriteReplyItem> execute(OperationContext* opCtx,
// re-batch ops based on their targeted shard id.
TargetedBatchMap childBatches;
+ // Divide and group ("target") the operations in the bulk write command. Some operations may
+ // be split up (such as an update that needs to go to more than one shard), while others may
+ // be grouped together if they need to go to the same shard.
+ // These operations are grouped by shardId in the TargetedBatchMap childBatches.
bool recordTargetErrors = refreshedTargeter;
auto targetStatus = bulkWriteOp.target(targeters, recordTargetErrors, childBatches);
if (!targetStatus.isOK()) {
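A rough standalone sketch of the targeting step described in the comment above, assuming simplified stand-ins (ShardId as a std::string, TargetedWrite holding only an op index) rather than the real NSTargeter/TargetedBatchMap machinery: ops that resolve to the same shard land in the same child batch, and an op spanning several shards contributes an entry to each of them.

    #include <iostream>
    #include <map>
    #include <string>
    #include <vector>

    // Hypothetical stand-ins: each write op resolves to one or more owning shards.
    using ShardId = std::string;
    struct TargetedWrite {
        int opIndex;  // index of the op in the original bulkWrite request
    };

    // Groups ops by shard, the way BulkWriteOp::target() groups them into
    // childBatches; a multi-shard op yields one TargetedWrite per shard.
    std::map<ShardId, std::vector<TargetedWrite>> targetOps(
        const std::vector<std::vector<ShardId>>& shardsPerOp) {
        std::map<ShardId, std::vector<TargetedWrite>> childBatches;
        for (int opIndex = 0; opIndex < static_cast<int>(shardsPerOp.size()); ++opIndex) {
            for (const auto& shardId : shardsPerOp[opIndex]) {
                childBatches[shardId].push_back({opIndex});
            }
        }
        return childBatches;
    }

    int main() {
        // Op 0 is single-shard, op 1 spans two shards, op 2 lands on shardB.
        auto childBatches = targetOps({{"shardA"}, {"shardA", "shardB"}, {"shardB"}});
        for (const auto& [shardId, writes] : childBatches) {
            std::cout << shardId << ": " << writes.size() << " write(s)\n";
        }
        return 0;
    }
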
@@ -86,12 +139,10 @@ std::vector<BulkWriteReplyItem> execute(OperationContext* opCtx,
refreshedTargeter = true;
}
- // 2: Use MultiStatementTransactionRequestsSender to send any ready sub-batches to targeted
- // shard endpoints.
+ // Send the child batches and wait for responses.
+ executeChildBatches(opCtx, childBatches, bulkWriteOp);
- // 3: Wait for responses for all those sub-batches and keep track of the responses from
- // sub-batches based on the op index in the original bulkWrite command. Abort the batch upon
- // errors for ordered writes or transactions.
+ // 3: Abort the batch upon errors for ordered writes or transactions.
// TODO(SERVER-72792): Remove the logic below that mimics ok responses and process real
// batch responses.
for (const auto& childBatch : childBatches) {
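Once real responses are processed (the TODO above, SERVER-72792), replies are expected to be tracked against the op index in the original bulkWrite command, with an ordered batch stopping at the first error. The standalone sketch below illustrates that bookkeeping with a hypothetical ReplyItem stand-in; it is an assumption about the intended shape, not the actual implementation.

    #include <iostream>
    #include <vector>

    // Hypothetical stand-in for a per-op reply carrying an op index and status.
    struct ReplyItem {
        int opIndex;
        bool ok;
    };

    // Records replies in op-index order; for an ordered batch, stops recording
    // further ops after the first error.
    std::vector<ReplyItem> collectReplies(const std::vector<ReplyItem>& shardReplies,
                                          bool ordered) {
        std::vector<ReplyItem> replies;
        for (const auto& reply : shardReplies) {
            replies.push_back(reply);
            if (ordered && !reply.ok) {
                break;  // abort the remainder of an ordered batch on error
            }
        }
        return replies;
    }

    int main() {
        auto replies = collectReplies({{0, true}, {1, false}, {2, true}}, /*ordered=*/true);
        std::cout << "recorded " << replies.size() << " replies\n";  // prints 2
        return 0;
    }
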
@@ -157,6 +208,7 @@ BulkWriteOp::BulkWriteOp(OperationContext* opCtx, const BulkWriteCommandRequest&
: _opCtx(opCtx),
_clientRequest(clientRequest),
_txnNum(_opCtx->getTxnNumber()),
+ _writeConcern(opCtx->getWriteConcern()),
_inTransaction(static_cast<bool>(TransactionRouter::get(opCtx))),
_isRetryableWrite(opCtx->isRetryableWrite()) {
_writeOps.reserve(_clientRequest.getOps().size());
@@ -238,6 +290,8 @@ BulkWriteCommandRequest BulkWriteOp::buildBulkCommandRequest(
// TODO (SERVER-72989): Attach stmtIds etc. when building support for retryable
// writes on mongos
+ request.setDbName(DatabaseName("admin"));
+
return request;
}