summaryrefslogtreecommitdiff
path: root/src/mongo/db/s/migration_batch_inserter.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/s/migration_batch_inserter.cpp')
-rw-r--r--src/mongo/db/s/migration_batch_inserter.cpp200
1 files changed, 0 insertions, 200 deletions
diff --git a/src/mongo/db/s/migration_batch_inserter.cpp b/src/mongo/db/s/migration_batch_inserter.cpp
deleted file mode 100644
index 37f55947745..00000000000
--- a/src/mongo/db/s/migration_batch_inserter.cpp
+++ /dev/null
@@ -1,200 +0,0 @@
-/**
- * Copyright (C) 2022-present MongoDB, Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the Server Side Public License, version 1,
- * as published by MongoDB, Inc.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * Server Side Public License for more details.
- *
- * You should have received a copy of the Server Side Public License
- * along with this program. If not, see
- * <http://www.mongodb.com/licensing/server-side-public-license>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the Server Side Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kShardingMigration
-
-#include "mongo/db/s/migration_batch_inserter.h"
-
-#include "mongo/db/s/migration_util.h"
-#include "mongo/db/transaction_participant.h"
-#include "mongo/logv2/log.h"
-
-namespace mongo {
-
-namespace {
-
-void checkOutSessionAndVerifyTxnState(OperationContext* opCtx) {
- MongoDOperationContextSession::checkOut(opCtx);
- TransactionParticipant::get(opCtx).beginOrContinue(opCtx,
- {*opCtx->getTxnNumber()},
- boost::none /* autocommit */,
- boost::none /* startTransaction */);
-}
-
-template <typename Callable>
-constexpr bool returnsVoid() {
- return std::is_void_v<std::invoke_result_t<Callable>>;
-}
-
-// Yields the checked out session before running the given function. If the function runs without
-// throwing, will reacquire the session and verify it is still valid to proceed with the migration.
-template <typename Callable, std::enable_if_t<!returnsVoid<Callable>(), int> = 0>
-auto runWithoutSession(OperationContext* opCtx, Callable callable) {
- MongoDOperationContextSession::checkIn(opCtx, OperationContextSession::CheckInReason::kYield);
-
- auto retVal = callable();
-
- // The below code can throw, so it cannot run in a scope guard.
- opCtx->checkForInterrupt();
- checkOutSessionAndVerifyTxnState(opCtx);
-
- return retVal;
-}
-
-// Same as runWithoutSession above but takes a void function.
-template <typename Callable, std::enable_if_t<returnsVoid<Callable>(), int> = 0>
-void runWithoutSession(OperationContext* opCtx, Callable callable) {
- MongoDOperationContextSession::checkIn(opCtx, OperationContextSession::CheckInReason::kYield);
-
- callable();
-
- // The below code can throw, so it cannot run in a scope guard.
- opCtx->checkForInterrupt();
- checkOutSessionAndVerifyTxnState(opCtx);
-}
-} // namespace
-
-
-void MigrationBatchInserter::onCreateThread(const std::string& threadName) {
- Client::initThread(threadName, getGlobalServiceContext(), nullptr);
- {
- stdx::lock_guard<Client> lk(cc());
- cc().setSystemOperationKillableByStepdown(lk);
- }
-}
-
-void MigrationBatchInserter::run(Status status) const try {
- // Run is passed in a non-ok status if this function runs inline.
- // That happens if we schedule this task on a ThreadPool that is
- // already shutdown. If we were to schedule a task on a shutdown ThreadPool,
- // then there is a logic error in our code. Therefore, we assert that here.
-
- invariant(status.isOK());
- auto arr = _batch["objects"].Obj();
- if (arr.isEmpty())
- return;
-
- auto executor =
- Grid::get(_innerOpCtx->getServiceContext())->getExecutorPool()->getFixedExecutor();
-
- auto applicationOpCtx = CancelableOperationContext(
- cc().makeOperationContext(), _innerOpCtx->getCancellationToken(), executor);
-
- auto opCtx = applicationOpCtx.get();
-
- auto assertNotAborted = [&]() {
- {
- stdx::lock_guard<Client> lk(*_outerOpCtx->getClient());
- _outerOpCtx->checkForInterrupt();
- }
- opCtx->checkForInterrupt();
- };
-
- auto it = arr.begin();
- while (it != arr.end()) {
- int batchNumCloned = 0;
- int batchClonedBytes = 0;
- const int batchMaxCloned = migrateCloneInsertionBatchSize.load();
-
- assertNotAborted();
-
- write_ops::InsertCommandRequest insertOp(_nss);
- insertOp.getWriteCommandRequestBase().setOrdered(true);
- insertOp.setDocuments([&] {
- std::vector<BSONObj> toInsert;
- while (it != arr.end() && (batchMaxCloned <= 0 || batchNumCloned < batchMaxCloned)) {
- const auto& doc = *it;
- BSONObj docToClone = doc.Obj();
- toInsert.push_back(docToClone);
- batchNumCloned++;
- batchClonedBytes += docToClone.objsize();
- ++it;
- }
- return toInsert;
- }());
-
- {
- // Disable the schema validation (during document inserts and updates)
- // and any internal validation for opCtx for performInserts()
- DisableDocumentValidation documentValidationDisabler(
- opCtx,
- DocumentValidationSettings::kDisableSchemaValidation |
- DocumentValidationSettings::kDisableInternalValidation);
- const auto reply =
- write_ops_exec::performInserts(opCtx, insertOp, OperationSource::kFromMigrate);
- for (unsigned long i = 0; i < reply.results.size(); ++i) {
- uassertStatusOKWithContext(
- reply.results[i],
- str::stream() << "Insert of " << insertOp.getDocuments()[i] << " failed.");
- }
- // Revert to the original DocumentValidationSettings for opCtx
- }
-
- migrationutil::persistUpdatedNumOrphans(
- opCtx, _migrationId, _collectionUuid, batchNumCloned);
- _migrationProgress->updateMaxOptime(
- repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp());
-
- ShardingStatistics::get(opCtx).countDocsClonedOnRecipient.addAndFetch(batchNumCloned);
- LOGV2(6718408,
- "Incrementing numCloned count by {batchNumCloned} and numClonedBytes by "
- "{batchClonedBytes}",
- "batchNumCloned"_attr = batchNumCloned,
- "batchClonedBytes"_attr = batchClonedBytes);
- _migrationProgress->incNumCloned(batchNumCloned);
- _migrationProgress->incNumBytes(batchClonedBytes);
-
- if (_writeConcern.needToWaitForOtherNodes() && _threadCount == 1) {
- runWithoutSession(_outerOpCtx, [&] {
- repl::ReplicationCoordinator::StatusAndDuration replStatus =
- repl::ReplicationCoordinator::get(opCtx)->awaitReplication(
- opCtx,
- repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(),
- _writeConcern);
- if (replStatus.status.code() == ErrorCodes::WriteConcernFailed) {
- LOGV2_WARNING(22011,
- "secondaryThrottle on, but doc insert timed out; continuing",
- "migrationId"_attr = _migrationId.toBSON());
- } else {
- uassertStatusOK(replStatus.status);
- }
- });
- }
-
- sleepmillis(migrateCloneInsertionBatchDelayMS.load());
- }
-} catch (const DBException& e) {
- stdx::lock_guard<Client> lk(*_innerOpCtx->getClient());
- _innerOpCtx->getServiceContext()->killOperation(lk, _innerOpCtx, ErrorCodes::Error(6718402));
- LOGV2(6718407,
- "Batch application failed: {error}",
- "Batch application failed",
- "error"_attr = e.toStatus());
-}
-} // namespace mongo