From 20c85d4848b4e4b3c88e1788eaff362143fffd20 Mon Sep 17 00:00:00 2001
From: Jonathan Reams
Date: Tue, 25 Jul 2017 16:26:29 -0400
Subject: SERVER-29838 Implement basic adaptive ServiceExecutor

---
 src/mongo/transport/service_executor_adaptive.h | 172 ++++++++++++++++++++++++
 1 file changed, 172 insertions(+)
 create mode 100644 src/mongo/transport/service_executor_adaptive.h

(limited to 'src/mongo/transport/service_executor_adaptive.h')

Reviewer notes (added while cleaning up this copy; not part of the original commit):

 * This copy of the patch had all of its line breaks and indentation collapsed.
   They are restored below; the hunk again holds 172 added lines, which matches
   the hunk header. Indentation is reconstructed and may not match upstream
   byte-for-byte.
 * NOTE(review): text inside angle brackets appears to have been stripped when
   this copy was extracted. Both bare "#include" lines and every use of
   std::shared_ptr, std::unique_ptr, stdx::list, std::list and AtomicWord are
   missing their "<...>" part and are left exactly as found. TODO: restore them
   from the upstream repository before applying this patch.
 * NOTE(review): the class has a "TickSource* const" data member, so the move
   assignment operator declared "= default" is defined as deleted. The move
   constructor is presumably deleted as well if stdx::mutex and
   stdx::condition_variable are not movable -- confirm.
 * NOTE(review): threadsRunning() only loads an atomic counter but is not
   declared const; it looks like it could be -- confirm against callers.
 * TickTimer derives ticks-per-millisecond as getTicksPerSecond() / 1000 and
   asserts (invariant) that the result is greater than zero.

diff --git a/src/mongo/transport/service_executor_adaptive.h b/src/mongo/transport/service_executor_adaptive.h
new file mode 100644
index 00000000000..1ff32831d16
--- /dev/null
+++ b/src/mongo/transport/service_executor_adaptive.h
@@ -0,0 +1,172 @@
+/**
+ * Copyright (C) 2017 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include
+
+#include "mongo/db/service_context.h"
+#include "mongo/platform/atomic_word.h"
+#include "mongo/stdx/condition_variable.h"
+#include "mongo/stdx/list.h"
+#include "mongo/stdx/thread.h"
+#include "mongo/transport/service_executor.h"
+#include "mongo/util/tick_source.h"
+
+#include
+
+namespace mongo {
+namespace transport {
+
+/**
+ * This is an ASIO-based adaptive ServiceExecutor. It guarantees that threads will not become stuck
+ * or deadlocked longer than its configured timeout and that idle threads will terminate themselves
+ * if they spend more than its configured idle threshold idle.
+ */
+class ServiceExecutorAdaptive : public ServiceExecutor {
+public:
+    struct Options {
+        virtual ~Options() = default;
+        // The minimum number of threads the executor will keep running to service tasks.
+        virtual int reservedThreads() const = 0;
+
+        // The amount of time each worker thread runs before considering exiting because of
+        // idleness.
+        virtual Milliseconds workerThreadRunTime() const = 0;
+
+        // workerThreadRunTime() is offset by a random value between -jitter and +jitter to prevent
+        // thundering herds
+        virtual int runTimeJitter() const = 0;
+
+        // The amount of time the controller thread will wait before checking for stuck threads
+        // to guarantee forward progress
+        virtual Milliseconds stuckThreadTimeout() const = 0;
+
+        // The maximum allowed latency between when a task is scheduled and a thread is started to
+        // service it.
+        virtual Microseconds maxQueueLatency() const = 0;
+
+        // Threads that spend less than this threshold doing work during their workerThreadRunTime
+        // period will exit
+        virtual int idlePctThreshold() const = 0;
+    };
+
+    explicit ServiceExecutorAdaptive(ServiceContext* ctx, std::shared_ptr ioCtx);
+    explicit ServiceExecutorAdaptive(ServiceContext* ctx,
+                                     std::shared_ptr ioCtx,
+                                     std::unique_ptr config);
+
+    ServiceExecutorAdaptive(ServiceExecutorAdaptive&&) = default;
+    ServiceExecutorAdaptive& operator=(ServiceExecutorAdaptive&&) = default;
+    virtual ~ServiceExecutorAdaptive();
+
+    Status start() final;
+    Status shutdown() final;
+    Status schedule(Task task, ScheduleFlags flags) final;
+
+    void appendStats(BSONObjBuilder* bob) const final;
+
+    int threadsRunning() {
+        return _threadsRunning.load();
+    }
+
+private:
+    using ThreadList = stdx::list;
+    class TickTimer {
+    public:
+        explicit TickTimer(TickSource* tickSource)
+            : _tickSource(tickSource),
+              _ticksPerMillisecond(_tickSource->getTicksPerSecond() / 1000),
+              _start(_tickSource->getTicks()) {
+            invariant(_ticksPerMillisecond > 0);
+        }
+
+        TickSource::Tick sinceStartTicks() const {
+            return _tickSource->getTicks() - _start.load();
+        }
+
+        Milliseconds sinceStart() const {
+            return Milliseconds{sinceStartTicks() / _ticksPerMillisecond};
+        }
+
+        void reset() {
+            _start.store(_tickSource->getTicks());
+        }
+
+    private:
+        TickSource* const _tickSource;
+        const TickSource::Tick _ticksPerMillisecond;
+        AtomicWord _start;
+    };
+
+    void _startWorkerThread();
+    void _workerThreadRoutine(int threadId, ThreadList::iterator it);
+    void _controllerThreadRoutine();
+    bool _isStarved(int pending = -1) const;
+    Milliseconds _getThreadJitter() const;
+    TickSource::Tick _getCurrentThreadsRunningTime() const;
+
+    std::shared_ptr _ioContext;
+
+    std::unique_ptr _config;
+
+    stdx::mutex _threadsMutex;
+    ThreadList _threads;
+    stdx::thread _controllerThread;
+
+    TickSource* const _tickSource;
+    AtomicWord _isRunning{false};
+
+    // These counters are used to detect stuck threads and high task queuing.
+    AtomicWord _threadsRunning{0};
+    AtomicWord _threadsPending{0};
+    AtomicWord _tasksExecuting{0};
+    AtomicWord _tasksPending{0};
+    TickTimer _lastScheduleTimer;
+    AtomicWord _totalSpentExecuting{0};
+    AtomicWord _totalSpentRunning{0};
+
+    mutable stdx::mutex _threadsRunningTimersMutex;
+    std::list _threadsRunningTimers;
+
+    // These counters are only used for reporting in serverStatus.
+    AtomicWord _totalScheduled{0};
+    AtomicWord _totalExecuted{0};
+    AtomicWord _totalSpentScheduled{0};
+
+    // Threads signal this condition variable when they exit so we can gracefully shutdown
+    // the executor.
+    stdx::condition_variable _deathCondition;
+
+    // Tasks should signal this condition variable if they want the thread controller to
+    // track their progress and do fast stuck detection
+    stdx::condition_variable _scheduleCondition;
+};
+
+} // namespace transport
+} // namespace mongo
-- 
cgit v1.2.1