diff options
author | Andrew Witten <andrew.witten@mongodb.com> | 2022-12-22 23:28:14 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2023-01-23 17:01:18 +0000 |
commit | ede7bdea766fff40678d03a7051b0d15de0fbba6 (patch) | |
tree | 2422031308d3b708c76d9bc58e62cf6bbfdc651f /src/mongo/db/s/migration_batch_inserter.cpp | |
parent | c585f971a68e707da7420a6a5c19988bb65a6205 (diff) | |
download | mongo-ede7bdea766fff40678d03a7051b0d15de0fbba6.tar.gz |
SERVER-67183 Add parallel fetchers and inserters for chunk migration
(cherry picked from commit 1564f715d16870df7a524ad702aad6be0f2da1f5)
Diffstat (limited to 'src/mongo/db/s/migration_batch_inserter.cpp')
-rw-r--r-- | src/mongo/db/s/migration_batch_inserter.cpp | 200 |
1 files changed, 200 insertions, 0 deletions
diff --git a/src/mongo/db/s/migration_batch_inserter.cpp b/src/mongo/db/s/migration_batch_inserter.cpp new file mode 100644 index 00000000000..37f55947745 --- /dev/null +++ b/src/mongo/db/s/migration_batch_inserter.cpp @@ -0,0 +1,200 @@ +/** + * Copyright (C) 2022-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kShardingMigration + +#include "mongo/db/s/migration_batch_inserter.h" + +#include "mongo/db/s/migration_util.h" +#include "mongo/db/transaction_participant.h" +#include "mongo/logv2/log.h" + +namespace mongo { + +namespace { + +void checkOutSessionAndVerifyTxnState(OperationContext* opCtx) { + MongoDOperationContextSession::checkOut(opCtx); + TransactionParticipant::get(opCtx).beginOrContinue(opCtx, + {*opCtx->getTxnNumber()}, + boost::none /* autocommit */, + boost::none /* startTransaction */); +} + +template <typename Callable> +constexpr bool returnsVoid() { + return std::is_void_v<std::invoke_result_t<Callable>>; +} + +// Yields the checked out session before running the given function. If the function runs without +// throwing, will reacquire the session and verify it is still valid to proceed with the migration. +template <typename Callable, std::enable_if_t<!returnsVoid<Callable>(), int> = 0> +auto runWithoutSession(OperationContext* opCtx, Callable callable) { + MongoDOperationContextSession::checkIn(opCtx, OperationContextSession::CheckInReason::kYield); + + auto retVal = callable(); + + // The below code can throw, so it cannot run in a scope guard. + opCtx->checkForInterrupt(); + checkOutSessionAndVerifyTxnState(opCtx); + + return retVal; +} + +// Same as runWithoutSession above but takes a void function. +template <typename Callable, std::enable_if_t<returnsVoid<Callable>(), int> = 0> +void runWithoutSession(OperationContext* opCtx, Callable callable) { + MongoDOperationContextSession::checkIn(opCtx, OperationContextSession::CheckInReason::kYield); + + callable(); + + // The below code can throw, so it cannot run in a scope guard. + opCtx->checkForInterrupt(); + checkOutSessionAndVerifyTxnState(opCtx); +} +} // namespace + + +void MigrationBatchInserter::onCreateThread(const std::string& threadName) { + Client::initThread(threadName, getGlobalServiceContext(), nullptr); + { + stdx::lock_guard<Client> lk(cc()); + cc().setSystemOperationKillableByStepdown(lk); + } +} + +void MigrationBatchInserter::run(Status status) const try { + // Run is passed in a non-ok status if this function runs inline. + // That happens if we schedule this task on a ThreadPool that is + // already shutdown. If we were to schedule a task on a shutdown ThreadPool, + // then there is a logic error in our code. Therefore, we assert that here. + + invariant(status.isOK()); + auto arr = _batch["objects"].Obj(); + if (arr.isEmpty()) + return; + + auto executor = + Grid::get(_innerOpCtx->getServiceContext())->getExecutorPool()->getFixedExecutor(); + + auto applicationOpCtx = CancelableOperationContext( + cc().makeOperationContext(), _innerOpCtx->getCancellationToken(), executor); + + auto opCtx = applicationOpCtx.get(); + + auto assertNotAborted = [&]() { + { + stdx::lock_guard<Client> lk(*_outerOpCtx->getClient()); + _outerOpCtx->checkForInterrupt(); + } + opCtx->checkForInterrupt(); + }; + + auto it = arr.begin(); + while (it != arr.end()) { + int batchNumCloned = 0; + int batchClonedBytes = 0; + const int batchMaxCloned = migrateCloneInsertionBatchSize.load(); + + assertNotAborted(); + + write_ops::InsertCommandRequest insertOp(_nss); + insertOp.getWriteCommandRequestBase().setOrdered(true); + insertOp.setDocuments([&] { + std::vector<BSONObj> toInsert; + while (it != arr.end() && (batchMaxCloned <= 0 || batchNumCloned < batchMaxCloned)) { + const auto& doc = *it; + BSONObj docToClone = doc.Obj(); + toInsert.push_back(docToClone); + batchNumCloned++; + batchClonedBytes += docToClone.objsize(); + ++it; + } + return toInsert; + }()); + + { + // Disable the schema validation (during document inserts and updates) + // and any internal validation for opCtx for performInserts() + DisableDocumentValidation documentValidationDisabler( + opCtx, + DocumentValidationSettings::kDisableSchemaValidation | + DocumentValidationSettings::kDisableInternalValidation); + const auto reply = + write_ops_exec::performInserts(opCtx, insertOp, OperationSource::kFromMigrate); + for (unsigned long i = 0; i < reply.results.size(); ++i) { + uassertStatusOKWithContext( + reply.results[i], + str::stream() << "Insert of " << insertOp.getDocuments()[i] << " failed."); + } + // Revert to the original DocumentValidationSettings for opCtx + } + + migrationutil::persistUpdatedNumOrphans( + opCtx, _migrationId, _collectionUuid, batchNumCloned); + _migrationProgress->updateMaxOptime( + repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp()); + + ShardingStatistics::get(opCtx).countDocsClonedOnRecipient.addAndFetch(batchNumCloned); + LOGV2(6718408, + "Incrementing numCloned count by {batchNumCloned} and numClonedBytes by " + "{batchClonedBytes}", + "batchNumCloned"_attr = batchNumCloned, + "batchClonedBytes"_attr = batchClonedBytes); + _migrationProgress->incNumCloned(batchNumCloned); + _migrationProgress->incNumBytes(batchClonedBytes); + + if (_writeConcern.needToWaitForOtherNodes() && _threadCount == 1) { + runWithoutSession(_outerOpCtx, [&] { + repl::ReplicationCoordinator::StatusAndDuration replStatus = + repl::ReplicationCoordinator::get(opCtx)->awaitReplication( + opCtx, + repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(), + _writeConcern); + if (replStatus.status.code() == ErrorCodes::WriteConcernFailed) { + LOGV2_WARNING(22011, + "secondaryThrottle on, but doc insert timed out; continuing", + "migrationId"_attr = _migrationId.toBSON()); + } else { + uassertStatusOK(replStatus.status); + } + }); + } + + sleepmillis(migrateCloneInsertionBatchDelayMS.load()); + } +} catch (const DBException& e) { + stdx::lock_guard<Client> lk(*_innerOpCtx->getClient()); + _innerOpCtx->getServiceContext()->killOperation(lk, _innerOpCtx, ErrorCodes::Error(6718402)); + LOGV2(6718407, + "Batch application failed: {error}", + "Batch application failed", + "error"_attr = e.toStatus()); +} +} // namespace mongo |