/** * Copyright (C) 2020-present MongoDB, Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the Server Side Public License, version 1, * as published by MongoDB, Inc. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * Server Side Public License for more details. * * You should have received a copy of the Server Side Public License * along with this program. If not, see * <http://www.mongodb.com/licensing/server-side-public-license>. * * As a special exception, the copyright holders give permission to link the * code of portions of this program with the OpenSSL library under certain * conditions as described in each individual source file and distribute * linked combinations including the program with the OpenSSL library. You * must comply with the Server Side Public License in all respects for * all of the code used other than as permitted herein. If you modify file(s) * with this exception, you may extend this exception to your version of the * file(s), but you are not obligated to do so. If you do not wish to do so, * delete this exception statement from your version. If you delete this * exception statement from all source files in the program, then also delete * it in the license file. 
*/ #pragma once #include #include "mongo/bson/bsonobj.h" #include "mongo/bson/timestamp.h" #include "mongo/db/catalog/database.h" #include "mongo/db/catalog_raii.h" #include "mongo/db/keypattern.h" #include "mongo/db/operation_context.h" #include "mongo/db/pipeline/pipeline.h" #include "mongo/db/s/collection_sharding_runtime.h" #include "mongo/db/s/resharding/coordinator_document_gen.h" #include "mongo/db/s/resharding/donor_oplog_id_gen.h" #include "mongo/db/s/sharding_state_lock.h" #include "mongo/executor/task_executor.h" #include "mongo/s/catalog/type_tags.h" #include "mongo/s/chunk_manager.h" #include "mongo/s/resharding/common_types_gen.h" #include "mongo/s/shard_id.h" #include "mongo/s/write_ops/batched_command_request.h" #include "mongo/util/str.h" namespace mongo { constexpr auto kReshardFinalOpLogType = "reshardFinalOp"_sd; static const auto kReshardErrorMaxBytes = 2000; /** * Emplaces the 'fetchTimestamp' onto the ClassWithFetchTimestamp if the timestamp has been * emplaced inside the boost::optional. 
*/ template void emplaceCloneTimestampIfExists(ClassWithCloneTimestamp& c, boost::optional cloneTimestamp) { if (!cloneTimestamp) { return; } invariant(!cloneTimestamp->isNull()); if (auto alreadyExistingCloneTimestamp = c.getCloneTimestamp()) { invariant(cloneTimestamp == alreadyExistingCloneTimestamp); } c.setCloneTimestamp(*cloneTimestamp); } template void emplaceApproxBytesToCopyIfExists(ReshardingDocumentWithApproxCopySize& document, boost::optional approxCopySize) { if (!approxCopySize) { return; } invariant(bool(document.getApproxBytesToCopy()) == bool(document.getApproxDocumentsToCopy()), "Expected approxBytesToCopy and approxDocumentsToCopy to either both be set or to" " both be unset"); if (auto alreadyExistingApproxBytesToCopy = document.getApproxBytesToCopy()) { invariant(approxCopySize->getApproxBytesToCopy() == *alreadyExistingApproxBytesToCopy, "Expected the existing and the new values for approxBytesToCopy to be equal"); } if (auto alreadyExistingApproxDocumentsToCopy = document.getApproxDocumentsToCopy()) { invariant(approxCopySize->getApproxDocumentsToCopy() == *alreadyExistingApproxDocumentsToCopy, "Expected the existing and the new values for approxDocumentsToCopy to be equal"); } document.setReshardingApproxCopySizeStruct(std::move(*approxCopySize)); } /** * Emplaces the 'minFetchTimestamp' onto the ClassWithFetchTimestamp if the timestamp has been * emplaced inside the boost::optional. */ template void emplaceMinFetchTimestampIfExists(ClassWithMinFetchTimestamp& c, boost::optional minFetchTimestamp) { if (!minFetchTimestamp) { return; } invariant(!minFetchTimestamp->isNull()); if (auto alreadyExistingMinFetchTimestamp = c.getMinFetchTimestamp()) { invariant(minFetchTimestamp == alreadyExistingMinFetchTimestamp); } c.setMinFetchTimestamp(std::move(minFetchTimestamp)); } /** * Returns a serialized version of the originalError status. 
If the originalError status exceeds * maxErrorBytes, truncates the status and returns it in the errmsg field of a new status with code * ErrorCodes::ReshardingCollectionTruncatedError. */ BSONObj serializeAndTruncateReshardingErrorIfNeeded(Status originalError); /** * Emplaces the 'abortReason' onto the ClassWithAbortReason if the reason has been emplaced inside * the boost::optional. If the 'abortReason' is too large, emplaces a status with * ErrorCodes::ReshardCollectionTruncatedError and a truncated version of the 'abortReason' for the * errmsg. */ template void emplaceTruncatedAbortReasonIfExists(ClassWithAbortReason& c, boost::optional abortReason) { if (!abortReason) { return; } invariant(!abortReason->isOK()); if (auto alreadyExistingAbortReason = c.getAbortReason()) { // If there already is an abortReason, don't overwrite it. return; } auto truncatedAbortReasonObj = serializeAndTruncateReshardingErrorIfNeeded(abortReason.get()); AbortReason abortReasonStruct; abortReasonStruct.setAbortReason(truncatedAbortReasonObj); c.setAbortReasonStruct(std::move(abortReasonStruct)); } /** * Extract the abortReason BSONObj into a status. */ template Status getStatusFromAbortReason(ClassWithAbortReason& c) { invariant(c.getAbortReason()); auto abortReasonObj = c.getAbortReason().get(); BSONElement codeElement = abortReasonObj["code"]; BSONElement errmsgElement = abortReasonObj["errmsg"]; int code = codeElement.numberInt(); std::string errmsg; if (errmsgElement.type() == String) { errmsg = errmsgElement.String(); } else if (!errmsgElement.eoo()) { errmsg = errmsgElement.toString(); } return Status(ErrorCodes::Error(code), errmsg, abortReasonObj); } /** * Extracts the ShardId from each Donor/RecipientShardEntry in participantShardEntries. 
*/ template std::vector extractShardIdsFromParticipantEntries( const std::vector& participantShardEntries) { std::vector shardIds(participantShardEntries.size()); std::transform(participantShardEntries.begin(), participantShardEntries.end(), shardIds.begin(), [](const auto& shardEntry) { return shardEntry.getId(); }); return shardIds; } /** * Extracts the ShardId from each Donor/RecipientShardEntry in participantShardEntries as a set. */ template std::set extractShardIdsFromParticipantEntriesAsSet( const std::vector& participantShardEntries) { std::set shardIds; std::transform(participantShardEntries.begin(), participantShardEntries.end(), std::inserter(shardIds, shardIds.end()), [](const auto& shardEntry) { return shardEntry.getId(); }); return shardIds; } /** * Helper method to construct a DonorShardEntry with the fields specified. */ DonorShardEntry makeDonorShard(ShardId shardId, DonorStateEnum donorState, boost::optional minFetchTimestamp = boost::none, boost::optional abortReason = boost::none); /** * Helper method to construct a RecipientShardEntry with the fields specified. */ RecipientShardEntry makeRecipientShard(ShardId shardId, RecipientStateEnum recipientState, boost::optional abortReason = boost::none); /** * Gets the UUID for 'nss' from the 'cm' * * Note: throws if the collection does not have a UUID. */ UUID getCollectionUUIDFromChunkManger(const NamespaceString& nss, const ChunkManager& cm); /** * Assembles the namespace string for the temporary resharding collection based on the source * namespace components. * * .system.resharding. */ NamespaceString constructTemporaryReshardingNss(StringData db, const UUID& sourceUuid); /** * Gets the recipient shards for a resharding operation. */ std::set getRecipientShards(OperationContext* opCtx, const NamespaceString& reshardNss, const UUID& reshardingUUID); /** * Asserts that there is not a hole or overlap in the chunks. 
*/
void checkForHolesAndOverlapsInChunks(std::vector& chunks, const KeyPattern& keyPattern);
// NOTE(review): the template arguments of the std::vector / std::unique_ptr /
// boost::intrusive_ptr / boost::optional types in this and the following declarations appear to
// have been stripped (e.g. the element types of 'chunks', 'donorShards' and 'zones'); restore
// them from the implementation — TODO confirm.

/**
 * Validates resharded chunks provided with a reshardCollection cmd. Parses each BSONObj to a valid
 * ReshardedChunk and asserts that each chunk's shardId is associated with an existing entry in
 * the shardRegistry. Then, asserts that there is not a hole or overlap in the chunks.
 */
void validateReshardedChunks(const std::vector& chunks,
                             OperationContext* opCtx,
                             const KeyPattern& keyPattern);

/**
 * Selects the highest minFetchTimestamp from the list of donors.
 *
 * Throws if not every donor has a minFetchTimestamp.
 */
Timestamp getHighestMinFetchTimestamp(const std::vector& donorShards);

/**
 * Asserts that there is not an overlap in the zone ranges. Takes 'zones' by non-const reference,
 * so the implementation may reorder them (e.g. sort) — TODO confirm.
 */
void checkForOverlappingZones(std::vector& zones);

/**
 * Builds documents to insert into config.tags from zones provided to reshardCollection cmd.
 * 'tempNss' is presumably the temporary resharding collection the tags are built for — confirm.
 */
std::vector buildTagsDocsFromZones(const NamespaceString& tempNss,
                                   const std::vector& zones);

/**
 * Creates a pipeline that can be serialized into a query for fetching oplog entries. `startAfter`
 * may carry null timestamps (`Timestamp::isNull()`) to fetch from the beginning of the oplog.
 */
std::unique_ptr createOplogFetchingPipelineForResharding(
    const boost::intrusive_ptr& expCtx,
    const ReshardingDonorOplogId& startAfter,
    UUID collUUID,
    const ShardId& recipientShard);

/**
 * Returns the shard Id of the recipient shard that would own the document under the new shard
 * key pattern. The result is optional; presumably it is empty when 'sourceNss' is not being
 * resharded — TODO confirm against the implementation.
 */
boost::optional getDestinedRecipient(OperationContext* opCtx,
                                     const NamespaceString& sourceNss,
                                     const BSONObj& fullDocument,
                                     CollectionShardingState* css,
                                     const ScopedCollectionDescription& collDesc);

/**
 * Sentinel oplog format:
 * {
 *   op: "n",
 *   ns: "<database>.<collection>",
 *   ui: <existingUUID>,
 *   destinedRecipient: <recipientShardId>,
 *   o: {msg: "Writes to <database>.<collection>
is temporarily blocked for resharding"}, * o2: {type: "reshardFinalOp", reshardingUUID: }, * fromMigrate: true, * } */ bool isFinalOplog(const repl::OplogEntry& oplog); bool isFinalOplog(const repl::OplogEntry& oplog, UUID reshardingUUID); NamespaceString getLocalOplogBufferNamespace(UUID existingUUID, ShardId donorShardId); NamespaceString getLocalConflictStashNamespace(UUID existingUUID, ShardId donorShardId); } // namespace mongo