/**
 *    Copyright (C) 2020-present MongoDB, Inc.
 *
 *    This program is free software: you can redistribute it and/or modify
 *    it under the terms of the Server Side Public License, version 1,
 *    as published by MongoDB, Inc.
 *
 *    This program is distributed in the hope that it will be useful,
 *    but WITHOUT ANY WARRANTY; without even the implied warranty of
 *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 *    Server Side Public License for more details.
 *
 *    You should have received a copy of the Server Side Public License
 *    along with this program. If not, see
 *    <http://www.mongodb.com/licensing/server-side-public-license>.
 *
 *    As a special exception, the copyright holders give permission to link the
 *    code of portions of this program with the OpenSSL library under certain
 *    conditions as described in each individual source file and distribute
 *    linked combinations including the program with the OpenSSL library. You
 *    must comply with the Server Side Public License in all respects for
 *    all of the code used other than as permitted herein. If you modify file(s)
 *    with this exception, you may extend this exception to your version of the
 *    file(s), but you are not obligated to do so. If you do not wish to do so,
 *    delete this exception statement from your version. If you delete this
 *    exception statement from all source files in the program, then also delete
 *    it in the license file.
 */

#pragma once

#include "mongo/db/namespace_string.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/persistent_task_store.h"
#include "mongo/db/repl/replication_coordinator.h"
#include "mongo/db/repl/wait_for_majority_service.h"
#include "mongo/db/s/ddl_lock_manager.h"
#include "mongo/db/s/forwardable_operation_metadata.h"
#include "mongo/db/s/sharding_ddl_coordinator_gen.h"
#include "mongo/db/s/sharding_ddl_coordinator_service.h"
#include "mongo/db/session/internal_session_pool.h"
#include "mongo/executor/task_executor.h"
#include "mongo/logv2/log.h"
#include "mongo/util/future.h"

#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kSharding

namespace mongo {

ShardingDDLCoordinatorMetadata extractShardingDDLCoordinatorMetadata(const BSONObj& coorDoc);

class ShardingDDLCoordinator
    : public repl::PrimaryOnlyService::TypedInstance<ShardingDDLCoordinator> {
public:
    explicit ShardingDDLCoordinator(ShardingDDLCoordinatorService* service,
                                    const BSONObj& coorDoc);

    ~ShardingDDLCoordinator();

    /**
     * Whether this coordinator is allowed to start when user write blocking is enabled, even if
     * the writeBlockingBypass flag is not set. Coordinators that don't affect user data shall
     * always be allowed to run even when user write blocking is enabled.
     */
    virtual bool canAlwaysStartWhenUserWritesAreDisabled() const {
        return false;
    }

    /*
     * Returns a future that will be completed when the construction of this coordinator instance
     * is completed.
     *
     * In particular, the returned future will be ready only after this coordinator successfully
     * acquires the required locks.
     */
    SharedSemiFuture<void> getConstructionCompletionFuture() {
        return _constructionCompletionPromise.getFuture();
    }

    /*
     * Returns a future that will be ready when all the work associated with this coordinator
     * instance is completed.
     *
     * In particular, the returned future will be ready only after this coordinator successfully
     * releases all the acquired locks.
     */
    SharedSemiFuture<void> getCompletionFuture() {
        return _completionPromise.getFuture();
    }
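
    // Illustrative only: a minimal sketch of how a caller might consume the two futures above.
    // The names `opCtx` and `coorDoc` are assumptions of the example; real call sites obtain the
    // instance through ShardingDDLCoordinatorService::getOrCreateInstance (usually followed by a
    // checked pointer cast to the concrete coordinator type).
    //
    //   auto service = ShardingDDLCoordinatorService::getService(opCtx);
    //   auto coordinator = checked_pointer_cast<ShardingDDLCoordinator>(
    //       service->getOrCreateInstance(opCtx, coorDoc));
    //   // Surface lock-acquisition failures to the caller as early as possible.
    //   coordinator->getConstructionCompletionFuture().get(opCtx);
    //   // Block until the DDL operation has fully completed and released its locks.
    //   coordinator->getCompletionFuture().get(opCtx);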
    DDLCoordinatorTypeEnum operationType() const {
        return _coordId.getOperationType();
    }

    const ForwardableOperationMetadata& getForwardableOpMetadata() const {
        invariant(_forwardableOpMetadata);
        return _forwardableOpMetadata.get();
    }

    const boost::optional<DatabaseVersion>& getDatabaseVersion() const& {
        return _databaseVersion;
    }

protected:
    const NamespaceString& originalNss() const {
        return _coordId.getNss();
    }

    virtual const NamespaceString& nss() const {
        if (const auto& bucketNss = metadata().getBucketNss()) {
            return bucketNss.get();
        }
        return originalNss();
    }

    virtual std::vector<StringData> _acquireAdditionalLocks(OperationContext* opCtx) {
        return {};
    };

    virtual ShardingDDLCoordinatorMetadata const& metadata() const = 0;

    virtual void setMetadata(ShardingDDLCoordinatorMetadata&& metadata) = 0;

    /*
     * Performs a noop write on all shards and the configsvr using the sessionId and txnNumber
     * specified in 'osi'.
     */
    void _performNoopRetryableWriteOnAllShardsAndConfigsvr(
        OperationContext* opCtx,
        const OperationSessionInfo& osi,
        const std::shared_ptr<executor::TaskExecutor>& executor);

    /*
     * Specifies whether the coordinator must be retried indefinitely in case of exceptions. A
     * coordinator is always expected to make progress after performing intermediate operations
     * that can't be rolled back.
     */
    virtual bool _mustAlwaysMakeProgress() {
        return false;
    };

    /*
     * Specifies whether the given error will be retried by the DDL coordinator infrastructure.
     */
    bool _isRetriableErrorForDDLCoordinator(const Status& status);

    ShardingDDLCoordinatorService* _service;
    const ShardingDDLCoordinatorId _coordId;

    const bool _recoveredFromDisk;
    const boost::optional<ForwardableOperationMetadata> _forwardableOpMetadata;
    const boost::optional<DatabaseVersion> _databaseVersion;

    // True only when executing the coordinator for the first time (meaning it's neither a retry
    // after a retryable error nor a recovered instance from a previous primary).
    bool _firstExecution{true};

    bool _completeOnError{false};

private:
    SemiFuture<void> run(std::shared_ptr<executor::ScopedTaskExecutor> executor,
                         const CancellationToken& token) noexcept override final;

    virtual ExecutorFuture<void> _runImpl(std::shared_ptr<executor::ScopedTaskExecutor> executor,
                                          const CancellationToken& token) noexcept = 0;

    virtual ExecutorFuture<void> _cleanupOnAbort(
        std::shared_ptr<executor::ScopedTaskExecutor> executor,
        const CancellationToken& token,
        const Status& status) noexcept;

    void interrupt(Status status) override final;

    bool _removeDocument(OperationContext* opCtx);

    ExecutorFuture<void> _removeDocumentUntillSuccessOrStepdown(
        std::shared_ptr<executor::TaskExecutor> executor);

    ExecutorFuture<void> _acquireLockAsync(std::shared_ptr<executor::ScopedTaskExecutor> executor,
                                           const CancellationToken& token,
                                           StringData resource);

    ExecutorFuture<void> _translateTimeseriesNss(
        std::shared_ptr<executor::ScopedTaskExecutor> executor, const CancellationToken& token);

    virtual boost::optional<Status> getAbortReason() const;

    Mutex _mutex = MONGO_MAKE_LATCH("ShardingDDLCoordinator::_mutex");

    SharedPromise<void> _constructionCompletionPromise;
    SharedPromise<void> _completionPromise;

    std::stack<DDLLockManager::ScopedLock> _scopedLocks;
};
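
// Illustrative only: coordinators that operate on more than one namespace can override
// `_acquireAdditionalLocks` so the lock-acquisition step also covers those resources. The
// `MyRenameCoordinator` name and its `_request` member are hypothetical, loosely modelled on a
// rename-style operation that must also lock its target namespace.
//
//   class MyRenameCoordinator : public ShardingDDLCoordinator {
//       std::vector<StringData> _acquireAdditionalLocks(OperationContext* opCtx) override {
//           // Besides the source namespace (locked by the infrastructure), also lock the
//           // rename target so no concurrent DDL can touch it.
//           return {_request.getTo().ns()};
//       }
//       // ...
//   };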
template <class StateDoc>
class ShardingDDLCoordinatorImpl : public ShardingDDLCoordinator {
public:
    boost::optional<BSONObj> reportForCurrentOp(
        MongoProcessInterface::CurrentOpConnectionsMode connMode,
        MongoProcessInterface::CurrentOpSessionsMode sessionMode) noexcept override {
        return basicReportBuilder().obj();
    }

protected:
    ShardingDDLCoordinatorImpl(ShardingDDLCoordinatorService* service,
                               const std::string& name,
                               const BSONObj& initialStateDoc)
        : ShardingDDLCoordinator(service, initialStateDoc),
          _coordinatorName(name),
          /*
           * Force a deserialisation + serialisation of the initialStateDoc to ensure that
           * _initialState is a full deep copy of the received parameter.
           */
          _initialState(
              StateDoc::parse(IDLParserContext("CoordinatorInitialState"), initialStateDoc)
                  .toBSON()),
          _doc(StateDoc::parse(IDLParserContext("CoordinatorDocument"), _initialState)) {}

    ShardingDDLCoordinatorMetadata const& metadata() const override {
        return _doc.getShardingDDLCoordinatorMetadata();
    }

    void setMetadata(ShardingDDLCoordinatorMetadata&& metadata) override {
        stdx::lock_guard lk{_docMutex};
        _doc.setShardingDDLCoordinatorMetadata(std::move(metadata));
    }

    virtual void appendCommandInfo(BSONObjBuilder* cmdInfoBuilder) const {};

    virtual BSONObjBuilder basicReportBuilder() const noexcept {
        BSONObjBuilder bob;

        // Append static info
        bob.append("type", "op");
        bob.append("ns", originalNss().toString());
        bob.append("desc", _coordinatorName);
        bob.append("op", "command");
        bob.append("active", true);

        // Append dynamic fields from the state doc
        {
            stdx::lock_guard lk{_docMutex};
            if (const auto& bucketNss = _doc.getBucketNss()) {
                // The bucket namespace is only present when the collection is a sharded
                // time-series collection.
                bob.append("bucketNamespace", bucketNss.get().toString());
            }
        }

        // Create command description
        BSONObjBuilder cmdInfoBuilder;
        {
            stdx::lock_guard lk{_docMutex};
            if (const auto& optComment = getForwardableOpMetadata().getComment()) {
                cmdInfoBuilder.append(optComment.get().firstElement());
            }
        }
        appendCommandInfo(&cmdInfoBuilder);
        bob.append("command", cmdInfoBuilder.obj());

        return bob;
    }

    const std::string _coordinatorName;
    const BSONObj _initialState;
    mutable Mutex _docMutex = MONGO_MAKE_LATCH("ShardingDDLCoordinator::_docMutex");
    StateDoc _doc;
};

template <class StateDoc, class Phase>
class RecoverableShardingDDLCoordinator : public ShardingDDLCoordinatorImpl<StateDoc> {
protected:
    using ShardingDDLCoordinatorImpl<StateDoc>::_doc;
    using ShardingDDLCoordinatorImpl<StateDoc>::_docMutex;

    RecoverableShardingDDLCoordinator(ShardingDDLCoordinatorService* service,
                                      const std::string& name,
                                      const BSONObj& initialStateDoc)
        : ShardingDDLCoordinatorImpl<StateDoc>(service, name, initialStateDoc) {}

    virtual StringData serializePhase(const Phase& phase) const = 0;

    template <typename Func>
    auto _buildPhaseHandler(const Phase& newPhase, Func&& handlerFn) {
        return [=, this] {
            const auto& currPhase = _doc.getPhase();

            if (currPhase > newPhase) {
                // Do not execute this phase if we already reached a subsequent one.
                return;
            }
            if (currPhase < newPhase) {
                // Persist the new phase if this is the first time we are executing it.
                _enterPhase(newPhase);
            }
            return handlerFn();
        };
    }
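
    // Illustrative only: a minimal sketch of how a concrete coordinator typically wires
    // `_buildPhaseHandler` into its run chain. `MyCoordinator`, its phase names, and the lambda
    // bodies are hypothetical, not part of this header.
    //
    //   ExecutorFuture<void> MyCoordinator::_runImpl(
    //       std::shared_ptr<executor::ScopedTaskExecutor> executor,
    //       const CancellationToken& token) noexcept {
    //       return ExecutorFuture<void>(**executor)
    //           .then(_buildPhaseHandler(Phase::kBlockWrites, [this] { /* ... */ }))
    //           .then(_buildPhaseHandler(Phase::kCommit, [this] { /* ... */ }));
    //   }
    //
    // On a retry, or on an instance recovered by a new primary, every phase lower than the
    // persisted one is skipped, which is what makes the chain resumable from the last
    // majority-committed phase.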
    virtual void _enterPhase(const Phase& newPhase) {
        auto newDoc = [&] {
            stdx::lock_guard lk{_docMutex};
            return _doc;
        }();

        newDoc.setPhase(newPhase);

        LOGV2_DEBUG(5390501,
                    2,
                    "DDL coordinator phase transition",
                    "coordinatorId"_attr = _doc.getId(),
                    "newPhase"_attr = serializePhase(newDoc.getPhase()),
                    "oldPhase"_attr = serializePhase(_doc.getPhase()));

        ServiceContext::UniqueOperationContext uniqueOpCtx;
        auto opCtx = cc().getOperationContext();
        if (!opCtx) {
            uniqueOpCtx = cc().makeOperationContext();
            opCtx = uniqueOpCtx.get();
        }

        if (_doc.getPhase() == Phase::kUnset) {
            _insertStateDocument(opCtx, std::move(newDoc));
        } else {
            _updateStateDocument(opCtx, std::move(newDoc));
        }
    }

    BSONObjBuilder basicReportBuilder() const noexcept override {
        auto baseReportBuilder = ShardingDDLCoordinatorImpl<StateDoc>::basicReportBuilder();

        const auto currPhase = [&]() {
            stdx::lock_guard l{_docMutex};
            return _doc.getPhase();
        }();

        baseReportBuilder.append("currentPhase", serializePhase(currPhase));
        return baseReportBuilder;
    }

    void _insertStateDocument(OperationContext* opCtx, StateDoc&& newDoc) {
        auto copyMetadata = newDoc.getShardingDDLCoordinatorMetadata();
        copyMetadata.setRecoveredFromDisk(true);
        newDoc.setShardingDDLCoordinatorMetadata(copyMetadata);

        PersistentTaskStore<StateDoc> store(NamespaceString::kShardingDDLCoordinatorsNamespace);
        try {
            store.add(opCtx, newDoc, WriteConcerns::kMajorityWriteConcernNoTimeout);
        } catch (const ExceptionFor<ErrorCodes::DuplicateKey>&) {
            // A series of step-up and step-down events can cause a node to try and insert the
            // document when it has already been persisted locally, but we must still wait for
            // majority commit.
            const auto replCoord = repl::ReplicationCoordinator::get(opCtx);
            const auto lastLocalOpTime = replCoord->getMyLastAppliedOpTime();
            WaitForMajorityService::get(opCtx->getServiceContext())
                .waitUntilMajority(lastLocalOpTime, opCtx->getCancellationToken())
                .get(opCtx);
        }

        {
            stdx::lock_guard lk{_docMutex};
            _doc = std::move(newDoc);
        }
    }

    void _updateStateDocument(OperationContext* opCtx, StateDoc&& newDoc) {
        PersistentTaskStore<StateDoc> store(NamespaceString::kShardingDDLCoordinatorsNamespace);
        invariant(newDoc.getShardingDDLCoordinatorMetadata().getRecoveredFromDisk());
        store.update(opCtx,
                     BSON(StateDoc::kIdFieldName << newDoc.getId().toBSON()),
                     newDoc.toBSON(),
                     WriteConcerns::kMajorityWriteConcernNoTimeout);

        {
            stdx::lock_guard lk{_docMutex};
            _doc = std::move(newDoc);
        }
    }

    // Lazily acquires a logical session ID and, on each subsequent call, advances the txnNumber.
    void _updateSession(OperationContext* opCtx) {
        auto newDoc = [&] {
            stdx::lock_guard lk{_docMutex};
            return _doc;
        }();
        auto newShardingDDLCoordinatorMetadata = newDoc.getShardingDDLCoordinatorMetadata();

        auto optSession = newShardingDDLCoordinatorMetadata.getSession();
        if (optSession) {
            auto txnNumber = optSession->getTxnNumber();
            optSession->setTxnNumber(++txnNumber);
            newShardingDDLCoordinatorMetadata.setSession(optSession);
        } else {
            auto session = InternalSessionPool::get(opCtx)->acquireSystemSession();
            newShardingDDLCoordinatorMetadata.setSession(
                ShardingDDLSession(session.getSessionId(), session.getTxnNumber()));
        }

        newDoc.setShardingDDLCoordinatorMetadata(std::move(newShardingDDLCoordinatorMetadata));
        _updateStateDocument(opCtx, std::move(newDoc));
    }

    OperationSessionInfo getCurrentSession() const {
        auto optSession = [&] {
            stdx::lock_guard lk{_docMutex};
            return _doc.getShardingDDLCoordinatorMetadata().getSession();
        }();

        invariant(optSession);

        OperationSessionInfo osi;
        osi.setSessionId(optSession->getLsid());
        osi.setTxnNumber(optSession->getTxnNumber());
        return osi;
    }

    OperationSessionInfo getNewSession(OperationContext* opCtx) {
        _updateSession(opCtx);
        return getCurrentSession();
    }
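
    // Illustrative only: a minimal sketch of how a coordinator phase usually consumes these
    // sessions. Advancing the txnNumber via getNewSession() fences off retryable writes that
    // were dispatched with the previous incarnation of the session, so a phase retried after a
    // failover gets a fresh one. The `executor` variable is an assumption of the example.
    //
    //   const auto osi = getNewSession(opCtx);
    //   _performNoopRetryableWriteOnAllShardsAndConfigsvr(opCtx, osi, **executor);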
    virtual boost::optional<Status> getAbortReason() const override {
        const auto& status = _doc.getAbortReason();
        invariant(!status || !status->isOK(), "when persisted, status must be an error");
        return status;
    }

    /**
     * Persists the abort reason and throws it as an exception. This causes the coordinator to
     * fail, and triggers the cleanup future chain, since there is a persisted abort reason.
     */
    void triggerCleanup(OperationContext* opCtx, const Status& status) {
        LOGV2_INFO(7418502,
                   "Coordinator failed, persisting abort reason",
                   "coordinatorId"_attr = _doc.getId(),
                   "phase"_attr = serializePhase(_doc.getPhase()),
                   "reason"_attr = redact(status));

        auto newDoc = [&] {
            stdx::lock_guard lk{_docMutex};
            return _doc;
        }();

        auto coordinatorMetadata = newDoc.getShardingDDLCoordinatorMetadata();
        coordinatorMetadata.setAbortReason(status);
        newDoc.setShardingDDLCoordinatorMetadata(std::move(coordinatorMetadata));

        _updateStateDocument(opCtx, std::move(newDoc));

        uassertStatusOK(status);
    }
};

#undef MONGO_LOGV2_DEFAULT_COMPONENT

}  // namespace mongo