/** * Copyright (C) 2022-present MongoDB, Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the Server Side Public License, version 1, * as published by MongoDB, Inc. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * Server Side Public License for more details. * * You should have received a copy of the Server Side Public License * along with this program. If not, see * . * * As a special exception, the copyright holders give permission to link the * code of portions of this program with the OpenSSL library under certain * conditions as described in each individual source file and distribute * linked combinations including the program with the OpenSSL library. You * must comply with the Server Side Public License in all respects for * all of the code used other than as permitted herein. If you modify file(s) * with this exception, you may extend this exception to your version of the * file(s), but you are not obligated to do so. If you do not wish to do so, * delete this exception statement from your version. If you delete this * exception statement from all source files in the program, then also delete * it in the license file. */ #pragma once #include "mongo/db/repl/replica_set_aware_service.h" #include "mongo/db/s/range_deletion_task_gen.h" #include "mongo/db/s/sharding_runtime_d_params_gen.h" #include "mongo/executor/network_interface_factory.h" #include "mongo/executor/network_interface_thread_pool.h" #include "mongo/executor/thread_pool_task_executor.h" #include "mongo/s/catalog/type_chunk.h" #include "mongo/util/future_util.h" namespace mongo { class RangeDeleterService : public ReplicaSetAwareServiceShardSvr { public: RangeDeleterService() = default; static RangeDeleterService* get(ServiceContext* serviceContext); static RangeDeleterService* get(OperationContext* opCtx); private: /* * In memory representation of registered range deletion tasks. To each non-pending range * deletion task corresponds a registered task on the service. */ class RangeDeletion : public ChunkRange { public: RangeDeletion(const RangeDeletionTask& task) : ChunkRange(task.getRange().getMin(), task.getRange().getMax()) {} ~RangeDeletion() { if (!_completionPromise.getFuture().isReady()) { _completionPromise.setError( Status{ErrorCodes::Interrupted, "Range deletion interrupted"}); } } SharedSemiFuture getPendingFuture() { return _pendingPromise.getFuture(); } void clearPending() { if (!_pendingPromise.getFuture().isReady()) { _pendingPromise.emplaceValue(); } } SharedSemiFuture getCompletionFuture() const { return _completionPromise.getFuture().semi().share(); } void makeReady() { _completionPromise.emplaceValue(); } private: // Marked ready once the range deletion has been fully processed SharedPromise _completionPromise; SharedPromise _pendingPromise; }; /* * Internal comparator to sort ranges in _rangeDeletionTasks's sets. * * NB: it ONLY makes sense to use this on ranges that are comparable, meaning * the ones based on the same key pattern (aka the ones belonging to the same * sharded collection). */ struct RANGES_COMPARATOR { bool operator()(const std::shared_ptr& a, const std::shared_ptr& b) const { return a->getMin().woCompare(b->getMin()) < 0; } }; /* * Class enclosing a thread continuously processing "ready" range deletions, meaning tasks * that are allowed to be processed (already drained ongoing queries and already waited for * `orphanCleanupDelaySecs`). */ class ReadyRangeDeletionsProcessor { public: ReadyRangeDeletionsProcessor(OperationContext* opCtx); ~ReadyRangeDeletionsProcessor(); /* * Interrupt ongoing range deletions */ void shutdown(); /* * Schedule a range deletion at the end of the queue */ void emplaceRangeDeletion(const RangeDeletionTask& rdt); private: /* * Return true if this processor have been shutted down */ bool _stopRequested() const; /* * Remove a range deletion from the head of the queue. Supposed to be called only once a * range deletion successfully finishes. */ void _completedRangeDeletion(); /* * Code executed by the internal thread */ void _runRangeDeletions(); mutable Mutex _mutex = MONGO_MAKE_LATCH("ReadyRangeDeletionsProcessor"); /* * Condition variable notified when: * - The component has been initialized (the operation context has been instantiated) * - The instance is shutting down (the operation context has been marked killed) * - A new range deletion is scheduled (the queue size has increased by one) */ stdx::condition_variable _condVar; /* Queue containing scheduled range deletions */ std::queue _queue; /* Pointer to the (one and only) operation context used by the thread */ ServiceContext::UniqueOperationContext _threadOpCtxHolder; /* Thread consuming the range deletions queue */ stdx::thread _thread; enum State { kRunning, kStopped }; State _state{kRunning}; }; // Keeping track of per-collection registered range deletion tasks stdx::unordered_map, RANGES_COMPARATOR>, UUID::Hash> _rangeDeletionTasks; // Mono-threaded executor processing range deletion tasks std::shared_ptr _executor; enum State { kInitializing, kUp, kDown }; State _state{kDown}; // Future markes as ready when the state changes to "up" SemiFuture _stepUpCompletedFuture; // Operation context used for initialization ServiceContext::UniqueOperationContext _initOpCtxHolder; /* Acquire mutex only if service is up (for "user" operation) */ [[nodiscard]] stdx::unique_lock _acquireMutexFailIfServiceNotUp() { stdx::unique_lock lg(_mutex_DO_NOT_USE_DIRECTLY); uassert(ErrorCodes::NotYetInitialized, "Range deleter service not up", _state == kUp); return lg; } /* Unconditionally acquire mutex (for internal operations) */ [[nodiscard]] stdx::unique_lock _acquireMutexUnconditionally() { stdx::unique_lock lg(_mutex_DO_NOT_USE_DIRECTLY); return lg; } // TODO SERVER-67642 implement fine-grained per-collection locking // Protecting the access to all class members (DO NOT USE DIRECTLY: rely on // `_acquireMutexUnconditionally` and `_acquireMutexFailIfServiceNotUp`) Mutex _mutex_DO_NOT_USE_DIRECTLY = MONGO_MAKE_LATCH("RangeDeleterService::_mutex"); public: /* * Register a task on the range deleter service. * Returns a future that will be marked ready once the range deletion will be completed. * * In case of trying to register an already existing task, the original future will be returned. * * A task can be registered only if the service is up (except for tasks resubmitted on step-up). * * When a task is registered as `pending`, it can be unblocked by calling again the same method * with `pending=false`. */ SharedSemiFuture registerTask( const RangeDeletionTask& rdt, SemiFuture&& waitForActiveQueriesToComplete = SemiFuture::makeReady(), bool fromResubmitOnStepUp = false, bool pending = false); /* * Deregister a task from the range deleter service. */ void deregisterTask(const UUID& collUUID, const ChunkRange& range); /* * Returns the number of registered range deletion tasks for a collection */ int getNumRangeDeletionTasksForCollection(const UUID& collectionUUID); /* * Returns a future marked as ready when all overlapping range deletion tasks complete. * * NB: in case an overlapping range deletion task is registered AFTER invoking this method, * it will not be taken into account. Handling this scenario is responsibility of the caller. * */ SharedSemiFuture getOverlappingRangeDeletionsFuture(const UUID& collectionUUID, const ChunkRange& range); /* ReplicaSetAwareServiceShardSvr implemented methods */ void onStartup(OperationContext* opCtx) override; void onStepUpComplete(OperationContext* opCtx, long long term) override; void onStepDown() override; void onShutdown() override; /* * Returns the RangeDeleterService state with the following schema: * {collectionUUIDA: [{min: x, max: y}, {min: w, max: z}....], collectionUUIDB: ......} */ BSONObj dumpState(); /* * Returns the total number of range deletion tasks registered on the service. */ long long totalNumOfRegisteredTasks(); /* ONLY FOR TESTING: wait for the state to become "up" */ void _waitForRangeDeleterServiceUp_FOR_TESTING() { _stepUpCompletedFuture.get(); } std::unique_ptr _readyRangeDeletionsProcessorPtr; private: /* Join all threads and executor and reset the in memory state of the service * Used for onStartUpBegin and on onShutdown */ void _joinAndResetState(); /* Asynchronously register range deletions on the service. To be called on on step-up */ void _recoverRangeDeletionsOnStepUp(OperationContext* opCtx); /* Called by shutdown/stepdown hooks to interrupt the service */ void _stopService(); /* ReplicaSetAwareServiceShardSvr "empty implemented" methods */ void onInitialDataAvailable(OperationContext* opCtx, bool isMajorityDataAvailable) override final {} void onStepUpBegin(OperationContext* opCtx, long long term) override final{}; void onBecomeArbiter() override final {} }; } // namespace mongo