path: root/src/mongo/db/s/migration_batch_inserter.cpp
diff options
authorAndrew Witten <>2022-12-22 23:28:14 +0000
committerEvergreen Agent <>2023-01-23 17:01:18 +0000
commitede7bdea766fff40678d03a7051b0d15de0fbba6 (patch)
tree2422031308d3b708c76d9bc58e62cf6bbfdc651f /src/mongo/db/s/migration_batch_inserter.cpp
parentc585f971a68e707da7420a6a5c19988bb65a6205 (diff)
SERVER-67183 Add parallel fetchers and inserters for chunk migration
(cherry picked from commit 1564f715d16870df7a524ad702aad6be0f2da1f5)
Diffstat (limited to 'src/mongo/db/s/migration_batch_inserter.cpp')
1 files changed, 200 insertions, 0 deletions
diff --git a/src/mongo/db/s/migration_batch_inserter.cpp b/src/mongo/db/s/migration_batch_inserter.cpp
new file mode 100644
index 00000000000..37f55947745
--- /dev/null
+++ b/src/mongo/db/s/migration_batch_inserter.cpp
@@ -0,0 +1,200 @@
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kShardingMigration
+#include "mongo/db/s/migration_batch_inserter.h"
+#include "mongo/db/s/migration_util.h"
+#include "mongo/db/transaction_participant.h"
+#include "mongo/logv2/log.h"
+namespace mongo {
+namespace {
+void checkOutSessionAndVerifyTxnState(OperationContext* opCtx) {
+ MongoDOperationContextSession::checkOut(opCtx);
+ TransactionParticipant::get(opCtx).beginOrContinue(opCtx,
+ {*opCtx->getTxnNumber()},
+ boost::none /* autocommit */,
+ boost::none /* startTransaction */);
+template <typename Callable>
+constexpr bool returnsVoid() {
+ return std::is_void_v<std::invoke_result_t<Callable>>;
+// Yields the checked out session before running the given function. If the function runs without
+// throwing, will reacquire the session and verify it is still valid to proceed with the migration.
+template <typename Callable, std::enable_if_t<!returnsVoid<Callable>(), int> = 0>
+auto runWithoutSession(OperationContext* opCtx, Callable callable) {
+ MongoDOperationContextSession::checkIn(opCtx, OperationContextSession::CheckInReason::kYield);
+ auto retVal = callable();
+ // The below code can throw, so it cannot run in a scope guard.
+ opCtx->checkForInterrupt();
+ checkOutSessionAndVerifyTxnState(opCtx);
+ return retVal;
+// Same as runWithoutSession above but takes a void function.
+template <typename Callable, std::enable_if_t<returnsVoid<Callable>(), int> = 0>
+void runWithoutSession(OperationContext* opCtx, Callable callable) {
+ MongoDOperationContextSession::checkIn(opCtx, OperationContextSession::CheckInReason::kYield);
+ callable();
+ // The below code can throw, so it cannot run in a scope guard.
+ opCtx->checkForInterrupt();
+ checkOutSessionAndVerifyTxnState(opCtx);
+} // namespace
+void MigrationBatchInserter::onCreateThread(const std::string& threadName) {
+ Client::initThread(threadName, getGlobalServiceContext(), nullptr);
+ {
+ stdx::lock_guard<Client> lk(cc());
+ cc().setSystemOperationKillableByStepdown(lk);
+ }
+void MigrationBatchInserter::run(Status status) const try {
+ // Run is passed in a non-ok status if this function runs inline.
+ // That happens if we schedule this task on a ThreadPool that is
+ // already shutdown. If we were to schedule a task on a shutdown ThreadPool,
+ // then there is a logic error in our code. Therefore, we assert that here.
+ invariant(status.isOK());
+ auto arr = _batch["objects"].Obj();
+ if (arr.isEmpty())
+ return;
+ auto executor =
+ Grid::get(_innerOpCtx->getServiceContext())->getExecutorPool()->getFixedExecutor();
+ auto applicationOpCtx = CancelableOperationContext(
+ cc().makeOperationContext(), _innerOpCtx->getCancellationToken(), executor);
+ auto opCtx = applicationOpCtx.get();
+ auto assertNotAborted = [&]() {
+ {
+ stdx::lock_guard<Client> lk(*_outerOpCtx->getClient());
+ _outerOpCtx->checkForInterrupt();
+ }
+ opCtx->checkForInterrupt();
+ };
+ auto it = arr.begin();
+ while (it != arr.end()) {
+ int batchNumCloned = 0;
+ int batchClonedBytes = 0;
+ const int batchMaxCloned = migrateCloneInsertionBatchSize.load();
+ assertNotAborted();
+ write_ops::InsertCommandRequest insertOp(_nss);
+ insertOp.getWriteCommandRequestBase().setOrdered(true);
+ insertOp.setDocuments([&] {
+ std::vector<BSONObj> toInsert;
+ while (it != arr.end() && (batchMaxCloned <= 0 || batchNumCloned < batchMaxCloned)) {
+ const auto& doc = *it;
+ BSONObj docToClone = doc.Obj();
+ toInsert.push_back(docToClone);
+ batchNumCloned++;
+ batchClonedBytes += docToClone.objsize();
+ ++it;
+ }
+ return toInsert;
+ }());
+ {
+ // Disable the schema validation (during document inserts and updates)
+ // and any internal validation for opCtx for performInserts()
+ DisableDocumentValidation documentValidationDisabler(
+ opCtx,
+ DocumentValidationSettings::kDisableSchemaValidation |
+ DocumentValidationSettings::kDisableInternalValidation);
+ const auto reply =
+ write_ops_exec::performInserts(opCtx, insertOp, OperationSource::kFromMigrate);
+ for (unsigned long i = 0; i < reply.results.size(); ++i) {
+ uassertStatusOKWithContext(
+ reply.results[i],
+ str::stream() << "Insert of " << insertOp.getDocuments()[i] << " failed.");
+ }
+ // Revert to the original DocumentValidationSettings for opCtx
+ }
+ migrationutil::persistUpdatedNumOrphans(
+ opCtx, _migrationId, _collectionUuid, batchNumCloned);
+ _migrationProgress->updateMaxOptime(
+ repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp());
+ ShardingStatistics::get(opCtx).countDocsClonedOnRecipient.addAndFetch(batchNumCloned);
+ LOGV2(6718408,
+ "Incrementing numCloned count by {batchNumCloned} and numClonedBytes by "
+ "{batchClonedBytes}",
+ "batchNumCloned"_attr = batchNumCloned,
+ "batchClonedBytes"_attr = batchClonedBytes);
+ _migrationProgress->incNumCloned(batchNumCloned);
+ _migrationProgress->incNumBytes(batchClonedBytes);
+ if (_writeConcern.needToWaitForOtherNodes() && _threadCount == 1) {
+ runWithoutSession(_outerOpCtx, [&] {
+ repl::ReplicationCoordinator::StatusAndDuration replStatus =
+ repl::ReplicationCoordinator::get(opCtx)->awaitReplication(
+ opCtx,
+ repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(),
+ _writeConcern);
+ if (replStatus.status.code() == ErrorCodes::WriteConcernFailed) {
+ LOGV2_WARNING(22011,
+ "secondaryThrottle on, but doc insert timed out; continuing",
+ "migrationId"_attr = _migrationId.toBSON());
+ } else {
+ uassertStatusOK(replStatus.status);
+ }
+ });
+ }
+ sleepmillis(migrateCloneInsertionBatchDelayMS.load());
+ }
+} catch (const DBException& e) {
+ stdx::lock_guard<Client> lk(*_innerOpCtx->getClient());
+ _innerOpCtx->getServiceContext()->killOperation(lk, _innerOpCtx, ErrorCodes::Error(6718402));
+ LOGV2(6718407,
+ "Batch application failed: {error}",
+ "Batch application failed",
+ "error"_attr = e.toStatus());
+} // namespace mongo