If you delete this * exception statement from all source files in the program, then also delete * it in the license file. */ #define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kSharding #include "mongo/platform/basic.h" #include "mongo/db/s/resharding/resharding_op_observer.h" #include "mongo/db/catalog/collection.h" #include "mongo/db/db_raii.h" #include "mongo/db/s/collection_sharding_runtime.h" #include "mongo/db/s/resharding/donor_document_gen.h" #include "mongo/db/s/resharding/resharding_coordinator_service.h" #include "mongo/logv2/log.h" namespace mongo { namespace { std::shared_ptr getReshardingCoordinatorObserver( OperationContext* opCtx, const BSONObj& reshardingId) { auto registry = repl::PrimaryOnlyServiceRegistry::get(opCtx->getServiceContext()); auto service = registry->lookupServiceByName(kReshardingCoordinatorServiceName); auto instance = ReshardingCoordinatorService::ReshardingCoordinator::lookup(opCtx, service, reshardingId); iassert( 5400001, "ReshardingCoordinatorService instance does not exist", instance.is_initialized()); return (*instance)->getObserver(); } boost::optional parseNewMinFetchTimestampValue(const BSONObj& obj) { auto doc = ReshardingDonorDocument::parse(IDLParserErrorContext("Resharding"), obj); if (doc.getMutableState().getState() == DonorStateEnum::kDonatingInitialData) { return doc.getMutableState().getMinFetchTimestamp().get(); } else { return boost::none; } } void assertCanExtractShardKeyFromDocs(OperationContext* opCtx, const NamespaceString& nss, std::vector::const_iterator begin, std::vector::const_iterator end) { const auto metadata = CollectionShardingRuntime::get(opCtx, nss)->getCurrentMetadataIfKnown(); // A user can manually create a 'db.system.resharding.' collection that isn't guaranteed to be // sharded outside of running reshardCollection. uassert(ErrorCodes::NamespaceNotSharded, str::stream() << "Temporary resharding collection " << nss.toString() << " is not sharded", metadata && metadata->isSharded()); auto chunkManager = *metadata->getChunkManager(); const auto& shardKeyPattern = chunkManager.getShardKeyPattern(); for (auto it = begin; it != end; ++it) { shardKeyPattern.extractShardKeyFromDocThrows(it->doc); } } boost::optional _calculatePin(OperationContext* opCtx) { // We recalculate the pin by looking at all documents inside the resharding donor // collection. The caller may or may not be in a transaction. If the caller is in a transaction, // we intentionally read any uncommitted writes it has made. // // If there are concurrent transactions updating different keys in the donor collection, there // can be write skew resulting in the wrong pin, including leaking a resource. We enforce the // collection is held in exclusive mode to prevent this. // TODO: Uncomment // invariant(opCtx->lockState()->isCollectionLockedForMode( // NamespaceString::kDonorReshardingOperationsNamespace, LockMode::MODE_X)); // If the RecoveryUnit already had an open snapshot, keep the snapshot open. Otherwise abandon // the snapshot when exitting the function. auto scopeGuard = makeGuard([&] { opCtx->recoveryUnit()->abandonSnapshot(); }); if (opCtx->recoveryUnit()->isActive()) { scopeGuard.dismiss(); } AutoGetCollectionForRead autoColl(opCtx, NamespaceString::kDonorReshardingOperationsNamespace); if (!autoColl) { return boost::none; } Timestamp ret = Timestamp::max(); auto cursor = autoColl->getCursor(opCtx); for (auto doc = cursor->next(); doc; doc = cursor->next()) { if (auto fetchTs = parseNewMinFetchTimestampValue(doc.get().data.toBson()); fetchTs) { ret = std::min(ret, fetchTs.get()); } } if (ret == Timestamp::max()) { return boost::none; } return ret; } void _doPin(OperationContext* opCtx) { auto storageEngine = opCtx->getServiceContext()->getStorageEngine(); boost::optional pin = _calculatePin(opCtx); auto replCoord = repl::ReplicationCoordinator::get(opCtx); if (!pin) { storageEngine->unpinOldestTimestamp(ReshardingHistoryHook::kName.toString()); return; } StatusWith res = storageEngine->pinOldestTimestamp( opCtx, ReshardingHistoryHook::kName.toString(), pin.get(), false); if (!res.isOK()) { if (replCoord->getReplicationMode() != repl::ReplicationCoordinator::Mode::modeReplSet) { // The pin has failed, but we're in standalone mode. Ignore the error. return; } const auto state = replCoord->getMemberState(); if (state.primary()) { // If we're a primary, the pin can fail and the error should bubble up and fail // resharding. uassertStatusOK(res); } else if (state.secondary()) { // The pin timestamp can be earlier than the oplog entry being processed. Thus // the oldest timestamp can race ahead of the pin request. It's not ideal this // node cannot participate in donating documents for the cloning phase, but this // is the most robust path forward. Ignore this case. LOGV2_WARNING(5384104, "This node is unable to pin history for resharding", "requestedTs"_attr = pin.get()); } else { // For recovery cases we also ignore the error. The expected scenario is the pin // request is no longer needed, but the write to delete the pin was rolled // back. The write to delete the pin won't be issued until the collection // cloning phase of resharding is majority committed. Thus there should be no // consequence to observing this error. Ignore this case. LOGV2(5384103, "The requested pin was unavailable, but should also be unnecessary", "requestedTs"_attr = pin.get()); } } } } // namespace boost::optional ReshardingHistoryHook::calculatePin(OperationContext* opCtx) { return _calculatePin(opCtx); } ReshardingOpObserver::ReshardingOpObserver() = default; ReshardingOpObserver::~ReshardingOpObserver() = default; void ReshardingOpObserver::onInserts(OperationContext* opCtx, const NamespaceString& nss, OptionalCollectionUUID uuid, std::vector::const_iterator begin, std::vector::const_iterator end, bool fromMigrate) { if (nss == NamespaceString::kDonorReshardingOperationsNamespace) { // If a document is inserted into the resharding donor collection with a // `minFetchTimestamp`, we assume the document was inserted as part of initial sync and do // nothing to pin history. return; } // This is a no-op if either replication is not enabled or this node is a secondary if (!repl::ReplicationCoordinator::get(opCtx)->isReplEnabled() || !opCtx->writesAreReplicated()) { return; } if (nss.isTemporaryReshardingCollection()) { assertCanExtractShardKeyFromDocs(opCtx, nss, begin, end); } } void ReshardingOpObserver::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArgs& args) { if (args.nss == NamespaceString::kDonorReshardingOperationsNamespace) { // Primaries and secondaries should execute pinning logic when observing changes to the // donor resharding document. _doPin(opCtx); } // This is a no-op if either replication is not enabled or this node is a secondary if (!repl::ReplicationCoordinator::get(opCtx)->isReplEnabled() || !opCtx->writesAreReplicated()) { return; } if (args.nss == NamespaceString::kConfigReshardingOperationsNamespace) { auto newCoordinatorDoc = ReshardingCoordinatorDocument::parse( IDLParserErrorContext("reshardingCoordinatorDoc"), args.updateArgs.updatedDoc); auto reshardingId = BSON(ReshardingCoordinatorDocument::kReshardingUUIDFieldName << newCoordinatorDoc.getReshardingUUID()); auto observer = getReshardingCoordinatorObserver(opCtx, reshardingId); opCtx->recoveryUnit()->onCommit( [observer = std::move(observer), newCoordinatorDoc = std::move(newCoordinatorDoc)]( boost::optional unusedCommitTime) mutable { observer->onReshardingParticipantTransition(newCoordinatorDoc); }); } else if (args.nss.isTemporaryReshardingCollection()) { const std::vector updateDoc{InsertStatement{args.updateArgs.updatedDoc}}; assertCanExtractShardKeyFromDocs(opCtx, args.nss, updateDoc.begin(), updateDoc.end()); } } void ReshardingOpObserver::onDelete(OperationContext* opCtx, const NamespaceString& nss, OptionalCollectionUUID uuid, StmtId stmtId, bool fromMigrate, const boost::optional& deletedDoc) { if (nss == NamespaceString::kDonorReshardingOperationsNamespace) { _doPin(opCtx); } } } // namespace mongo