summaryrefslogtreecommitdiff
path: root/src/mongo/transport/service_executor_adaptive.h
diff options
context:
space:
mode:
authorJonathan Reams <jbreams@mongodb.com>2017-07-25 16:26:29 -0400
committerJonathan Reams <jbreams@mongodb.com>2017-08-02 07:16:24 -0400
commit20c85d4848b4e4b3c88e1788eaff362143fffd20 (patch)
tree8133945dc37f97fc507b2fa183dbe3f3916e3151 /src/mongo/transport/service_executor_adaptive.h
parentc99aaa0d4152783f533883ebf311a947b7854d3d (diff)
downloadmongo-20c85d4848b4e4b3c88e1788eaff362143fffd20.tar.gz
SERVER-29838 Implement basic adaptive ServiceExecutor
Diffstat (limited to 'src/mongo/transport/service_executor_adaptive.h')
-rw-r--r--src/mongo/transport/service_executor_adaptive.h172
1 files changed, 172 insertions, 0 deletions
diff --git a/src/mongo/transport/service_executor_adaptive.h b/src/mongo/transport/service_executor_adaptive.h
new file mode 100644
index 00000000000..1ff32831d16
--- /dev/null
+++ b/src/mongo/transport/service_executor_adaptive.h
@@ -0,0 +1,172 @@
+/**
+ * Copyright (C) 2017 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include <vector>
+
+#include "mongo/db/service_context.h"
+#include "mongo/platform/atomic_word.h"
+#include "mongo/stdx/condition_variable.h"
+#include "mongo/stdx/list.h"
+#include "mongo/stdx/thread.h"
+#include "mongo/transport/service_executor.h"
+#include "mongo/util/tick_source.h"
+
+#include <asio.hpp>
+
+namespace mongo {
+namespace transport {
+
+/**
+ * This is an ASIO-based adaptive ServiceExecutor. It guarantees that threads will not become stuck
+ * or deadlocked longer than its configured timeout and that idle threads will terminate themselves
+ * if they spend more than its configured idle threshold idle.
+ */
+class ServiceExecutorAdaptive : public ServiceExecutor {
+public:
+    /**
+     * Tuning parameters for the executor. This is a pure-virtual interface so that callers can
+     * supply their own source of configuration values.
+     */
+    struct Options {
+        virtual ~Options() = default;
+
+        // The minimum number of threads the executor will keep running to service tasks.
+        virtual int reservedThreads() const = 0;
+
+        // The amount of time each worker thread runs before considering exiting because of
+        // idleness.
+        virtual Milliseconds workerThreadRunTime() const = 0;
+
+        // workerThreadRunTime() is offset by a random value between -jitter and +jitter to
+        // prevent thundering herds.
+        // NOTE(review): the unit of this int is not stated here; presumably milliseconds, since
+        // _getThreadJitter() returns Milliseconds -- confirm against the implementation.
+        virtual int runTimeJitter() const = 0;
+
+        // The amount of time the controller thread will wait before checking for stuck threads
+        // to guarantee forward progress.
+        virtual Milliseconds stuckThreadTimeout() const = 0;
+
+        // The maximum allowed latency between when a task is scheduled and a thread is started
+        // to service it.
+        virtual Microseconds maxQueueLatency() const = 0;
+
+        // Threads that spend less than this threshold doing work during their
+        // workerThreadRunTime period will exit.
+        // NOTE(review): presumably expressed as a percentage ("Pct") -- confirm the range.
+        virtual int idlePctThreshold() const = 0;
+    };
+
+    /**
+     * Constructs an executor on top of the given io_context. This overload takes no Options;
+     * which configuration it uses is decided by the implementation (not visible in this header).
+     */
+    explicit ServiceExecutorAdaptive(ServiceContext* ctx, std::shared_ptr<asio::io_context> ioCtx);
+
+    /**
+     * Constructs an executor on top of the given io_context, taking ownership of 'config'.
+     */
+    explicit ServiceExecutorAdaptive(ServiceContext* ctx,
+                                     std::shared_ptr<asio::io_context> ioCtx,
+                                     std::unique_ptr<Options> config);
+
+    // This class is not movable: it owns mutexes, condition variables and atomic counters, and
+    // has a const pointer member, none of which can be moved or move-assigned. Declaring the
+    // move operations as defaulted would silently define them as deleted, so the deletion is
+    // spelled out explicitly instead. (Declaring them also keeps the class non-copyable.)
+    ServiceExecutorAdaptive(ServiceExecutorAdaptive&&) = delete;
+    ServiceExecutorAdaptive& operator=(ServiceExecutorAdaptive&&) = delete;
+    virtual ~ServiceExecutorAdaptive();
+
+    // ServiceExecutor interface.
+    Status start() final;
+    Status shutdown() final;
+    Status schedule(Task task, ScheduleFlags flags) final;
+
+    // Appends this executor's statistics to 'bob'.
+    void appendStats(BSONObjBuilder* bob) const final;
+
+    /**
+     * Returns the current value of the running-thread counter.
+     */
+    int threadsRunning() const {
+        return _threadsRunning.load();
+    }
+
+private:
+    using ThreadList = stdx::list<stdx::thread>;
+
+    /**
+     * A small stopwatch built on a TickSource. The start point is held in an atomic word, so it
+     * can be read and reset without additional locking.
+     */
+    class TickTimer {
+    public:
+        // Starts timing immediately. The initializers below depend on the member declaration
+        // order: _tickSource must be declared (and therefore initialized) first.
+        explicit TickTimer(TickSource* tickSource)
+            : _tickSource(tickSource),
+              _ticksPerMillisecond(_tickSource->getTicksPerSecond() / 1000),
+              _start(_tickSource->getTicks()) {
+            // A tick source reporting fewer than 1000 ticks per second would make the divisor
+            // used by sinceStart() zero.
+            invariant(_ticksPerMillisecond > 0);
+        }
+
+        // Ticks elapsed since construction or the last reset().
+        TickSource::Tick sinceStartTicks() const {
+            return _tickSource->getTicks() - _start.load();
+        }
+
+        // Elapsed time in whole milliseconds (integer division, so fractions are truncated).
+        Milliseconds sinceStart() const {
+            return Milliseconds{sinceStartTicks() / _ticksPerMillisecond};
+        }
+
+        // Restarts the timer from the tick source's current reading.
+        void reset() {
+            _start.store(_tickSource->getTicks());
+        }
+
+    private:
+        TickSource* const _tickSource;
+        const TickSource::Tick _ticksPerMillisecond;
+        AtomicWord<TickSource::Tick> _start;
+    };
+
+    void _startWorkerThread();
+    void _workerThreadRoutine(int threadId, ThreadList::iterator it);
+    void _controllerThreadRoutine();
+    // NOTE(review): the meaning of the default pending == -1 is defined in the implementation;
+    // presumably "use the current pending count" -- confirm.
+    bool _isStarved(int pending = -1) const;
+    Milliseconds _getThreadJitter() const;
+    TickSource::Tick _getCurrentThreadsRunningTime() const;
+
+    std::shared_ptr<asio::io_context> _ioContext;
+
+    std::unique_ptr<Options> _config;
+
+    stdx::mutex _threadsMutex;
+    ThreadList _threads;
+    stdx::thread _controllerThread;
+
+    TickSource* const _tickSource;
+    AtomicWord<bool> _isRunning{false};
+
+    // These counters are used to detect stuck threads and high task queuing.
+    AtomicWord<int> _threadsRunning{0};
+    AtomicWord<int> _threadsPending{0};
+    AtomicWord<int> _tasksExecuting{0};
+    AtomicWord<int> _tasksPending{0};
+    TickTimer _lastScheduleTimer;
+    AtomicWord<TickSource::Tick> _totalSpentExecuting{0};
+    AtomicWord<TickSource::Tick> _totalSpentRunning{0};
+
+    // NOTE(review): presumably guards _threadsRunningTimers; it is mutable so that const
+    // members can lock it -- confirm against the implementation.
+    mutable stdx::mutex _threadsRunningTimersMutex;
+    std::list<TickTimer> _threadsRunningTimers;
+
+    // These counters are only used for reporting in serverStatus.
+    AtomicWord<int64_t> _totalScheduled{0};
+    AtomicWord<int64_t> _totalExecuted{0};
+    AtomicWord<TickSource::Tick> _totalSpentScheduled{0};
+
+    // Threads signal this condition variable when they exit so we can gracefully shutdown
+    // the executor.
+    stdx::condition_variable _deathCondition;
+
+    // Tasks should signal this condition variable if they want the thread controller to
+    // track their progress and do fast stuck detection.
+    stdx::condition_variable _scheduleCondition;
+};
+
+} // namespace transport
+} // namespace mongo