diff options
author | Jonathan Reams <jbreams@mongodb.com> | 2017-07-25 16:26:29 -0400 |
---|---|---|
committer | Jonathan Reams <jbreams@mongodb.com> | 2017-08-02 07:16:24 -0400 |
commit | 20c85d4848b4e4b3c88e1788eaff362143fffd20 (patch) | |
tree | 8133945dc37f97fc507b2fa183dbe3f3916e3151 /src/mongo/transport/service_executor_adaptive.h | |
parent | c99aaa0d4152783f533883ebf311a947b7854d3d (diff) | |
download | mongo-20c85d4848b4e4b3c88e1788eaff362143fffd20.tar.gz |
SERVER-29838 Implement basic adaptive ServiceExecutor
Diffstat (limited to 'src/mongo/transport/service_executor_adaptive.h')
-rw-r--r-- | src/mongo/transport/service_executor_adaptive.h | 172 |
1 files changed, 172 insertions, 0 deletions
diff --git a/src/mongo/transport/service_executor_adaptive.h b/src/mongo/transport/service_executor_adaptive.h new file mode 100644 index 00000000000..1ff32831d16 --- /dev/null +++ b/src/mongo/transport/service_executor_adaptive.h @@ -0,0 +1,172 @@ +/** + * Copyright (C) 2017 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include <vector> + +#include "mongo/db/service_context.h" +#include "mongo/platform/atomic_word.h" +#include "mongo/stdx/condition_variable.h" +#include "mongo/stdx/list.h" +#include "mongo/stdx/thread.h" +#include "mongo/transport/service_executor.h" +#include "mongo/util/tick_source.h" + +#include <asio.hpp> + +namespace mongo { +namespace transport { + +/** + * This is an ASIO-based adaptive ServiceExecutor. It guarantees that threads will not become stuck + * or deadlocked longer that its configured timeout and that idle threads will terminate themselves + * if they spend more than its configure idle threshold idle. + */ +class ServiceExecutorAdaptive : public ServiceExecutor { +public: + struct Options { + virtual ~Options() = default; + // The minimum number of threads the executor will keep running to service tasks. + virtual int reservedThreads() const = 0; + + // The amount of time each worker thread runs before considering exiting because of + // idleness. + virtual Milliseconds workerThreadRunTime() const = 0; + + // workerThreadRuntime() is offset by a random value between -jitter and +jitter to prevent + // thundering herds + virtual int runTimeJitter() const = 0; + + // The amount of time the controller thread will wait before checking for stuck threads + // to guarantee forward progress + virtual Milliseconds stuckThreadTimeout() const = 0; + + // The maximum allowed latency between when a task is scheduled and a thread is started to + // service it. + virtual Microseconds maxQueueLatency() const = 0; + + // Threads that spend less than this threshold doing work during their workerThreadRunTime + // period will exit + virtual int idlePctThreshold() const = 0; + }; + + explicit ServiceExecutorAdaptive(ServiceContext* ctx, std::shared_ptr<asio::io_context> ioCtx); + explicit ServiceExecutorAdaptive(ServiceContext* ctx, + std::shared_ptr<asio::io_context> ioCtx, + std::unique_ptr<Options> config); + + ServiceExecutorAdaptive(ServiceExecutorAdaptive&&) = default; + ServiceExecutorAdaptive& operator=(ServiceExecutorAdaptive&&) = default; + virtual ~ServiceExecutorAdaptive(); + + Status start() final; + Status shutdown() final; + Status schedule(Task task, ScheduleFlags flags) final; + + void appendStats(BSONObjBuilder* bob) const final; + + int threadsRunning() { + return _threadsRunning.load(); + } + +private: + using ThreadList = stdx::list<stdx::thread>; + class TickTimer { + public: + explicit TickTimer(TickSource* tickSource) + : _tickSource(tickSource), + _ticksPerMillisecond(_tickSource->getTicksPerSecond() / 1000), + _start(_tickSource->getTicks()) { + invariant(_ticksPerMillisecond > 0); + } + + TickSource::Tick sinceStartTicks() const { + return _tickSource->getTicks() - _start.load(); + } + + Milliseconds sinceStart() const { + return Milliseconds{sinceStartTicks() / _ticksPerMillisecond}; + } + + void reset() { + _start.store(_tickSource->getTicks()); + } + + private: + TickSource* const _tickSource; + const TickSource::Tick _ticksPerMillisecond; + AtomicWord<TickSource::Tick> _start; + }; + + void _startWorkerThread(); + void _workerThreadRoutine(int threadId, ThreadList::iterator it); + void _controllerThreadRoutine(); + bool _isStarved(int pending = -1) const; + Milliseconds _getThreadJitter() const; + TickSource::Tick _getCurrentThreadsRunningTime() const; + + std::shared_ptr<asio::io_context> _ioContext; + + std::unique_ptr<Options> _config; + + stdx::mutex _threadsMutex; + ThreadList _threads; + stdx::thread _controllerThread; + + TickSource* const _tickSource; + AtomicWord<bool> _isRunning{false}; + + // These counters are used to detect stuck threads and high task queuing. + AtomicWord<int> _threadsRunning{0}; + AtomicWord<int> _threadsPending{0}; + AtomicWord<int> _tasksExecuting{0}; + AtomicWord<int> _tasksPending{0}; + TickTimer _lastScheduleTimer; + AtomicWord<TickSource::Tick> _totalSpentExecuting{0}; + AtomicWord<TickSource::Tick> _totalSpentRunning{0}; + + mutable stdx::mutex _threadsRunningTimersMutex; + std::list<TickTimer> _threadsRunningTimers; + + // These counters are only used for reporting in serverStatus. + AtomicWord<int64_t> _totalScheduled{0}; + AtomicWord<int64_t> _totalExecuted{0}; + AtomicWord<TickSource::Tick> _totalSpentScheduled{0}; + + // Threads signal this condition variable when they exit so we can gracefully shutdown + // the executor. + stdx::condition_variable _deathCondition; + + // Tasks should signal this condition variable if they want the thread controller to + // track their progress and do fast stuck detection + stdx::condition_variable _scheduleCondition; +}; + +} // namespace transport +} // namespace mongo |