/**
* Copyright (C) 2017 MongoDB Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#pragma once
#include
#include
#include "mongo/db/service_context.h"
#include "mongo/platform/atomic_word.h"
#include "mongo/stdx/condition_variable.h"
#include "mongo/stdx/list.h"
#include "mongo/stdx/thread.h"
#include "mongo/transport/service_executor.h"
#include "mongo/transport/service_executor_task_names.h"
#include "mongo/util/tick_source.h"
#include
namespace mongo {
namespace transport {
/**
 * This is an ASIO-based adaptive ServiceExecutor. It guarantees that threads will not become stuck
 * or deadlocked longer than its configured timeout and that idle threads will terminate themselves
 * if they spend more than its configured idle threshold idle.
 */
class ServiceExecutorAdaptive : public ServiceExecutor {
public:
    /**
     * Tuning parameters for the executor. Every accessor is pure virtual, so a concrete subclass
     * decides where each value comes from.
     */
    struct Options {
        virtual ~Options() = default;
        // The minimum number of threads the executor will keep running to service tasks.
        virtual int reservedThreads() const = 0;
        // The amount of time each worker thread runs before considering exiting because of
        // idleness.
        virtual Milliseconds workerThreadRunTime() const = 0;
        // workerThreadRunTime() is offset by a random value between -jitter and +jitter to prevent
        // thundering herds
        virtual int runTimeJitter() const = 0;
        // The amount of time the controller thread will wait before checking for stuck threads
        // to guarantee forward progress
        virtual Milliseconds stuckThreadTimeout() const = 0;
        // The maximum allowed latency between when a task is scheduled and a thread is started to
        // service it.
        virtual Microseconds maxQueueLatency() const = 0;
        // Threads that spend less than this threshold doing work during their workerThreadRunTime
        // period will exit
        virtual int idlePctThreshold() const = 0;
        // The maximum allowable depth of recursion for tasks scheduled with the MayRecurse flag
        // before stack unwinding is forced.
        virtual int recursionLimit() const = 0;
    };

    // Builds an executor bound to the given ServiceContext and io context. The three-argument
    // overload additionally takes ownership of a caller-supplied configuration object.
    explicit ServiceExecutorAdaptive(ServiceContext* ctx, std::shared_ptr ioCtx);
    explicit ServiceExecutorAdaptive(ServiceContext* ctx,
                                     std::shared_ptr ioCtx,
                                     std::unique_ptr config);

    // NOTE(review): because _tickSource is a const pointer member, the defaulted move assignment
    // below is defined as deleted; the mutex/condition-variable members likely do the same to the
    // move constructor. Confirm whether these declarations are intentional.
    ServiceExecutorAdaptive(ServiceExecutorAdaptive&&) = default;
    ServiceExecutorAdaptive& operator=(ServiceExecutorAdaptive&&) = default;

    virtual ~ServiceExecutorAdaptive();

    // Final overrides of the ServiceExecutor interface.
    Status start() final;
    Status shutdown(Milliseconds timeout) final;
    Status schedule(Task task, ScheduleFlags flags, ServiceExecutorTaskName taskName) final;

    // This executor always reports the asynchronous transport mode.
    Mode transportMode() const final {
        return Mode::kAsynchronous;
    }

    void appendStats(BSONObjBuilder* bob) const final;

    // Returns the current value of the _threadsRunning counter. Declared const because it only
    // performs an atomic load, so it is usable through const references as well.
    int threadsRunning() const {
        return _threadsRunning.load();
    }

private:
    /**
     * Measures the ticks elapsed since construction or since the most recent reset(), reading
     * time from the supplied TickSource. The start tick is held in an AtomicWord.
     */
    class TickTimer {
    public:
        // Captures the current tick as the start point. The invariant requires the tick source to
        // yield a positive ticks-per-millisecond value after dividing ticks-per-second by 1000.
        explicit TickTimer(TickSource* tickSource)
            : _tickSource(tickSource),
              _ticksPerMillisecond(_tickSource->getTicksPerSecond() / 1000),
              _start(_tickSource->getTicks()) {
            invariant(_ticksPerMillisecond > 0);
        }

        // Ticks elapsed since the stored start tick.
        TickSource::Tick sinceStartTicks() const {
            return _tickSource->getTicks() - _start.load();
        }

        // Elapsed ticks converted to milliseconds using the precomputed ticks-per-millisecond.
        Milliseconds sinceStart() const {
            return Milliseconds{sinceStartTicks() / _ticksPerMillisecond};
        }

        // Moves the start point to the current tick.
        void reset() {
            _start.store(_tickSource->getTicks());
        }

    private:
        TickSource* const _tickSource;
        const TickSource::Tick _ticksPerMillisecond;
        AtomicWord _start;
    };

    /**
     * Accumulates ticks over repeated running/stopped intervals. All state transitions and reads
     * take _mutex.
     */
    class CumulativeTickTimer {
    public:
        // Starts in the stopped state with nothing accumulated. Explicit, matching TickTimer, so a
        // TickSource* never converts silently into a timer.
        explicit CumulativeTickTimer(TickSource* ts) : _timer(ts) {}

        // Ends the current interval: adds its length to the accumulator and returns that length.
        // The timer must currently be running.
        TickSource::Tick markStopped() {
            stdx::lock_guard lk(_mutex);
            invariant(_running);
            _running = false;
            auto curTime = _timer.sinceStartTicks();
            _accumulator += curTime;
            return curTime;
        }

        // Begins a new interval from the current tick. The timer must currently be stopped.
        void markRunning() {
            stdx::lock_guard lk(_mutex);
            invariant(!_running);
            _timer.reset();
            _running = true;
        }

        // Total accumulated ticks, including the in-progress interval when the timer is running.
        TickSource::Tick totalTime() const {
            stdx::lock_guard lk(_mutex);
            if (!_running)
                return _accumulator;
            return _timer.sinceStartTicks() + _accumulator;
        }

    private:
        TickTimer _timer;
        mutable stdx::mutex _mutex;
        TickSource::Tick _accumulator = 0;
        bool _running = false;
    };

    // A group of four atomic counters, all starting at zero. MetricsArray below holds a
    // fixed-size array of these.
    struct Metrics {
        AtomicWord _totalQueued{0};
        AtomicWord _totalExecuted{0};
        AtomicWord _totalSpentQueued{0};
        AtomicWord _totalSpentExecuting{0};
    };

    using MetricsArray =
    std::array(ServiceExecutorTaskName::kMaxTaskName)>;

    // Why a worker thread was started. kMax is a count sentinel used to size
    // _threadStartCounters, not a real reason.
    enum class ThreadCreationReason { kStuckDetection, kStarvation, kReserveMinimum, kError, kMax };

    // Selects which of a thread's cumulative timers _getThreadTimerTotal() should sum.
    enum class ThreadTimer { kRunning, kExecuting };

    // Per-thread bookkeeping; entries live in the _threads list.
    struct ThreadState {
        // Explicit so that a TickSource* never converts silently into a ThreadState.
        explicit ThreadState(TickSource* ts) : running(ts), executing(ts) {}

        CumulativeTickTimer running;
        // Zero-initialized so the value is never indeterminate before it is first assigned.
        TickSource::Tick executingCurRun = 0;
        CumulativeTickTimer executing;
        MetricsArray threadMetrics;
        std::int64_t markIdleCounter = 0;
        int recursionDepth = 0;
    };

    using ThreadList = stdx::list;

    void _startWorkerThread(ThreadCreationReason reason);
    static StringData _threadStartedByToString(ThreadCreationReason reason);
    void _workerThreadRoutine(int threadId, ThreadList::iterator it);
    void _controllerThreadRoutine();
    bool _isStarved() const;
    Milliseconds _getThreadJitter() const;

    void _accumulateTaskMetrics(MetricsArray* outArray, const MetricsArray& inputArray) const;
    // NOTE(review): the lock parameter on the next two functions is presumably proof that the
    // caller already holds _threadsMutex - confirm against the implementation.
    void _accumulateAllTaskMetrics(MetricsArray* outputMetricsArray,
                                   const stdx::unique_lock& lk) const;
    TickSource::Tick _getThreadTimerTotal(ThreadTimer which,
                                          const stdx::unique_lock& lk) const;

    std::shared_ptr _ioContext;
    std::unique_ptr _config;

    mutable stdx::mutex _threadsMutex;
    ThreadList _threads;
    std::array(ThreadCreationReason::kMax)> _threadStartCounters;

    stdx::thread _controllerThread;

    TickSource* const _tickSource;
    AtomicWord _isRunning{false};

    // These counters are used to detect stuck threads and high task queuing.
    AtomicWord _threadsRunning{0};
    AtomicWord _threadsPending{0};
    AtomicWord _threadsInUse{0};
    AtomicWord _tasksQueued{0};
    AtomicWord _deferredTasksQueued{0};
    TickTimer _lastScheduleTimer;
    AtomicWord _pastThreadsSpentExecuting{0};
    AtomicWord _pastThreadsSpentRunning{0};
    static thread_local ThreadState* _localThreadState;

    // These counters are only used for reporting in serverStatus.
    AtomicWord _totalQueued{0};
    AtomicWord _totalExecuted{0};
    AtomicWord _totalSpentQueued{0};

    // Threads signal this condition variable when they exit so we can gracefully shutdown
    // the executor.
    stdx::condition_variable _deathCondition;

    // Tasks should signal this condition variable if they want the thread controller to
    // track their progress and do fast stuck detection
    AtomicWord _starvationCheckRequests{0};
    stdx::condition_variable _scheduleCondition;

    MetricsArray _accumulatedMetrics;
};
} // namespace transport
} // namespace mongo