/**
* Copyright (C) 2015 MongoDB Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kExecutor
#include "mongo/platform/basic.h"
#include "mongo/executor/thread_pool_task_executor.h"
#include <boost/optional.hpp>
#include <iterator>
#include <utility>
#include "mongo/base/checked_cast.h"
#include "mongo/base/disallow_copying.h"
#include "mongo/base/status_with.h"
#include "mongo/db/operation_context.h"
#include "mongo/executor/connection_pool_stats.h"
#include "mongo/executor/network_interface.h"
#include "mongo/platform/atomic_word.h"
#include "mongo/transport/baton.h"
#include "mongo/util/concurrency/thread_pool_interface.h"
#include "mongo/util/fail_point_service.h"
#include "mongo/util/log.h"
#include "mongo/util/time_support.h"
namespace mongo {
namespace executor {
namespace {
MONGO_FAIL_POINT_DEFINE(scheduleIntoPoolSpinsUntilThreadPoolShutsDown);
}
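// Executor-internal implementation of TaskExecutor::CallbackState. A CallbackState lives in
// exactly one of the executor's work queues at a time (network-in-progress, sleepers, an
// event's waiters, or pool-in-progress); "iter" records its current position so it can be
// spliced between queues in O(1).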
class ThreadPoolTaskExecutor::CallbackState : public TaskExecutor::CallbackState {
MONGO_DISALLOW_COPYING(CallbackState);
public:
static std::shared_ptr<CallbackState> make(CallbackFn&& cb,
Date_t readyDate,
const transport::BatonHandle& baton) {
return std::make_shared<CallbackState>(std::move(cb), readyDate, baton);
}
/**
* Do not call directly. Use make.
*/
CallbackState(CallbackFn&& cb, Date_t theReadyDate, const transport::BatonHandle& baton)
: callback(std::move(cb)), readyDate(theReadyDate), baton(baton) {}
virtual ~CallbackState() = default;
bool isCanceled() const override {
return canceled.load() > 0;
}
void cancel() override {
MONGO_UNREACHABLE;
}
void waitForCompletion() override {
MONGO_UNREACHABLE;
}
// All fields except for "canceled" are guarded by the owning task executor's _mutex. The
// "canceled" field may be observed without holding _mutex, but may only be set while holding
// _mutex.
CallbackFn callback;
AtomicUInt32 canceled{0U};
WorkQueue::iterator iter;
Date_t readyDate;
bool isNetworkOperation = false;
AtomicWord<bool> isFinished{false};
boost::optional<stdx::condition_variable> finishedCondition;
transport::BatonHandle baton;
};
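// Executor-internal implementation of TaskExecutor::EventState. Callbacks registered via
// onEvent() accumulate in "waiters" until the event is signaled, at which point they are
// spliced into the thread pool's queue.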
class ThreadPoolTaskExecutor::EventState : public TaskExecutor::EventState {
MONGO_DISALLOW_COPYING(EventState);
public:
static std::shared_ptr<EventState> make() {
return std::make_shared<EventState>();
}
EventState() = default;
void signal() override {
MONGO_UNREACHABLE;
}
void waitUntilSignaled() override {
MONGO_UNREACHABLE;
}
bool isSignaled() override {
MONGO_UNREACHABLE;
}
// All fields guarded by the owning task executor's _mutex.
bool isSignaledFlag = false;
stdx::condition_variable isSignaledCondition;
EventList::iterator iter;
WorkQueue waiters;
};
ThreadPoolTaskExecutor::ThreadPoolTaskExecutor(std::unique_ptr<ThreadPoolInterface> pool,
std::shared_ptr<NetworkInterface> net)
: _net(std::move(net)), _pool(std::move(pool)) {}
ThreadPoolTaskExecutor::~ThreadPoolTaskExecutor() {
shutdown();
auto lk = _join(stdx::unique_lock<stdx::mutex>(_mutex));
invariant(_state == shutdownComplete);
}
void ThreadPoolTaskExecutor::startup() {
_net->startup();
stdx::lock_guard<stdx::mutex> lk(_mutex);
if (_inShutdown_inlock()) {
return;
}
invariant(_state == preStart);
_setState_inlock(running);
_pool->startup();
}
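// Transitions to the joinRequired state, cancels every outstanding callback, and schedules the
// canceled work into the thread pool so each callback still runs (observing its canceled flag)
// before the pool is shut down.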
void ThreadPoolTaskExecutor::shutdown() {
stdx::unique_lock<stdx::mutex> lk(_mutex);
if (_inShutdown_inlock()) {
invariant(_networkInProgressQueue.empty());
invariant(_sleepersQueue.empty());
return;
}
_setState_inlock(joinRequired);
WorkQueue pending;
pending.splice(pending.end(), _networkInProgressQueue);
pending.splice(pending.end(), _sleepersQueue);
for (auto&& eventState : _unsignaledEvents) {
pending.splice(pending.end(), eventState->waiters);
}
for (auto&& cbState : pending) {
cbState->canceled.store(1);
}
for (auto&& cbState : _poolInProgressQueue) {
cbState->canceled.store(1);
}
scheduleIntoPool_inlock(&pending, std::move(lk));
_pool->shutdown();
}
void ThreadPoolTaskExecutor::join() {
_join(stdx::unique_lock<stdx::mutex>(_mutex));
}
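// Waits for shutdown() to be called and for all outstanding work to drain. Also signals any
// still-unsignaled events and, after the pool has been joined, runs on this thread any
// callbacks stranded in _poolInProgressQueue, since no pool thread will ever pick them up.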
stdx::unique_lock<stdx::mutex> ThreadPoolTaskExecutor::_join(stdx::unique_lock<stdx::mutex> lk) {
_stateChange.wait(lk, [this] {
switch (_state) {
case preStart:
return false;
case running:
return false;
case joinRequired:
return true;
case joining:
return false;
case shutdownComplete:
return true;
}
MONGO_UNREACHABLE;
});
if (_state == shutdownComplete) {
return lk;
}
invariant(_state == joinRequired);
_setState_inlock(joining);
lk.unlock();
_pool->join();
lk.lock();
while (!_unsignaledEvents.empty()) {
auto eventState = _unsignaledEvents.front();
invariant(eventState->waiters.empty());
EventHandle event;
setEventForHandle(&event, std::move(eventState));
signalEvent_inlock(event, std::move(lk));
lk = stdx::unique_lock<stdx::mutex>(_mutex);
}
lk.unlock();
_net->shutdown();
lk.lock();
// The _poolInProgressQueue may not be empty if the network interface attempted to schedule work
// into _pool after _pool->shutdown(). Because _pool->join() has returned, we know that any
// items left in _poolInProgressQueue will never be processed by another thread, so we process
// them now.
while (!_poolInProgressQueue.empty()) {
auto cbState = _poolInProgressQueue.front();
lk.unlock();
runCallback(std::move(cbState));
lk.lock();
}
invariant(_networkInProgressQueue.empty());
invariant(_sleepersQueue.empty());
invariant(_unsignaledEvents.empty());
_setState_inlock(shutdownComplete);
return lk;
}
void ThreadPoolTaskExecutor::appendDiagnosticBSON(BSONObjBuilder* b) const {
stdx::lock_guard<stdx::mutex> lk(_mutex);
// ThreadPool details
// TODO: fill in
BSONObjBuilder poolCounters(b->subobjStart("pool"));
poolCounters.appendIntOrLL("inProgressCount", _poolInProgressQueue.size());
poolCounters.done();
// Queues
BSONObjBuilder queues(b->subobjStart("queues"));
queues.appendIntOrLL("networkInProgress", _networkInProgressQueue.size());
queues.appendIntOrLL("sleepers", _sleepersQueue.size());
queues.done();
b->appendIntOrLL("unsignaledEvents", _unsignaledEvents.size());
b->append("shuttingDown", _inShutdown_inlock());
b->append("networkInterface", _net->getDiagnosticString());
}
Date_t ThreadPoolTaskExecutor::now() {
return _net->now();
}
StatusWith<TaskExecutor::EventHandle> ThreadPoolTaskExecutor::makeEvent() {
auto el = makeSingletonEventList();
EventHandle event;
setEventForHandle(&event, el.front());
stdx::lock_guard<stdx::mutex> lk(_mutex);
if (_inShutdown_inlock()) {
return {ErrorCodes::ShutdownInProgress, "Shutdown in progress"};
}
_unsignaledEvents.splice(_unsignaledEvents.end(), el);
return event;
}
void ThreadPoolTaskExecutor::signalEvent(const EventHandle& event) {
stdx::unique_lock<stdx::mutex> lk(_mutex);
signalEvent_inlock(event, std::move(lk));
}
StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::onEvent(const EventHandle& event,
const CallbackFn& work) {
if (!event.isValid()) {
return {ErrorCodes::BadValue, "Passed invalid event handle to onEvent"};
}
auto wq = makeSingletonWorkQueue(work, nullptr);
stdx::unique_lock<stdx::mutex> lk(_mutex);
auto eventState = checked_cast<EventState*>(getEventFromHandle(event));
auto cbHandle = enqueueCallbackState_inlock(&eventState->waiters, &wq);
if (!cbHandle.isOK()) {
return cbHandle;
}
if (eventState->isSignaledFlag) {
scheduleIntoPool_inlock(&eventState->waiters, std::move(lk));
}
return cbHandle;
}
StatusWith<stdx::cv_status> ThreadPoolTaskExecutor::waitForEvent(OperationContext* opCtx,
const EventHandle& event,
Date_t deadline) {
invariant(opCtx);
invariant(event.isValid());
auto eventState = checked_cast<EventState*>(getEventFromHandle(event));
stdx::unique_lock<stdx::mutex> lk(_mutex);
// std::condition_variable::wait() can wake up spuriously, so we have to loop until the event
// is signaled or we time out.
while (!eventState->isSignaledFlag) {
auto status = opCtx->waitForConditionOrInterruptNoAssertUntil(
eventState->isSignaledCondition, lk, deadline);
if (!status.isOK() || stdx::cv_status::timeout == status) {
return status;
}
}
return stdx::cv_status::no_timeout;
}
void ThreadPoolTaskExecutor::waitForEvent(const EventHandle& event) {
invariant(event.isValid());
auto eventState = checked_cast<EventState*>(getEventFromHandle(event));
stdx::unique_lock<stdx::mutex> lk(_mutex);
while (!eventState->isSignaledFlag) {
eventState->isSignaledCondition.wait(lk);
}
}
StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::scheduleWork(
const CallbackFn& work) {
auto wq = makeSingletonWorkQueue(work, nullptr);
WorkQueue temp;
stdx::unique_lock<stdx::mutex> lk(_mutex);
auto cbHandle = enqueueCallbackState_inlock(&temp, &wq);
if (!cbHandle.isOK()) {
return cbHandle;
}
scheduleIntoPool_inlock(&temp, std::move(lk));
return cbHandle;
}
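// Deferred execution: the callback parks in _sleepersQueue and a network-interface alarm moves
// it into the thread pool once "when" arrives. Deadlines at or before now() degrade to plain
// scheduleWork().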
StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::scheduleWorkAt(
Date_t when, const CallbackFn& work) {
if (when <= now()) {
return scheduleWork(work);
}
auto wq = makeSingletonWorkQueue(work, nullptr, when);
stdx::unique_lock<stdx::mutex> lk(_mutex);
auto cbHandle = enqueueCallbackState_inlock(&_sleepersQueue, &wq);
if (!cbHandle.isOK()) {
return cbHandle;
}
lk.unlock();
_net->setAlarm(when,
[this, cbHandle] {
auto cbState =
checked_cast<CallbackState*>(getCallbackFromHandle(cbHandle.getValue()));
if (cbState->canceled.load()) {
return;
}
stdx::unique_lock<stdx::mutex> lk(_mutex);
if (cbState->canceled.load()) {
return;
}
scheduleIntoPool_inlock(&_sleepersQueue, cbState->iter, std::move(lk));
},
nullptr)
.transitional_ignore();
return cbHandle;
}
namespace {
using ResponseStatus = TaskExecutor::ResponseStatus;
// If the request received a connection from the pool but failed in its execution,
// convert the raw Status in cbData to a RemoteCommandResponse so that the callback,
// which expects a RemoteCommandResponse as part of RemoteCommandCallbackArgs,
// can be run despite a RemoteCommandResponse never having been created.
void remoteCommandFinished(const TaskExecutor::CallbackArgs& cbData,
const TaskExecutor::RemoteCommandCallbackFn& cb,
const RemoteCommandRequest& request,
const ResponseStatus& rs) {
cb(TaskExecutor::RemoteCommandCallbackArgs(cbData.executor, cbData.myHandle, request, rs));
}
// If the request failed to receive a connection from the pool,
// convert the raw Status in cbData to a RemoteCommandResponse so that the callback,
// which expects a RemoteCommandResponse as part of RemoteCommandCallbackArgs,
// can be run despite a RemoteCommandResponse never having been created.
void remoteCommandFailedEarly(const TaskExecutor::CallbackArgs& cbData,
const TaskExecutor::RemoteCommandCallbackFn& cb,
const RemoteCommandRequest& request) {
invariant(!cbData.status.isOK());
cb(TaskExecutor::RemoteCommandCallbackArgs(
cbData.executor, cbData.myHandle, request, {cbData.status}));
}
} // namespace
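// Issues "request" via the network interface. The callback state waits in
// _networkInProgressQueue while the command is in flight; when the response arrives, the stored
// callback is swapped for one carrying that response and the state is moved into the thread
// pool to run.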
StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::scheduleRemoteCommand(
const RemoteCommandRequest& request,
const RemoteCommandCallbackFn& cb,
const transport::BatonHandle& baton) {
RemoteCommandRequest scheduledRequest = request;
if (request.timeout == RemoteCommandRequest::kNoTimeout) {
scheduledRequest.expirationDate = RemoteCommandRequest::kNoExpirationDate;
} else {
scheduledRequest.expirationDate = _net->now() + scheduledRequest.timeout;
}
// In case the request fails to even get a connection from the pool,
// we wrap the callback in a method that prepares its input parameters.
auto wq = makeSingletonWorkQueue(
[scheduledRequest, cb](const CallbackArgs& cbData) {
remoteCommandFailedEarly(cbData, cb, scheduledRequest);
},
baton);
wq.front()->isNetworkOperation = true;
stdx::unique_lock<stdx::mutex> lk(_mutex);
auto swCbHandle = enqueueCallbackState_inlock(&_networkInProgressQueue, &wq);
if (!swCbHandle.isOK())
return swCbHandle;
const auto cbState = _networkInProgressQueue.back();
LOG(3) << "Scheduling remote command request: " << redact(scheduledRequest.toString());
lk.unlock();
auto commandStatus = _net->startCommand(
swCbHandle.getValue(),
scheduledRequest,
[this, scheduledRequest, cbState, cb](const ResponseStatus& response) {
using std::swap;
CallbackFn newCb = [cb, scheduledRequest, response](const CallbackArgs& cbData) {
remoteCommandFinished(cbData, cb, scheduledRequest, response);
};
stdx::unique_lock<stdx::mutex> lk(_mutex);
if (_inShutdown_inlock()) {
return;
}
LOG(3) << "Received remote response: "
<< redact(response.isOK() ? response.toString() : response.status.toString());
swap(cbState->callback, newCb);
scheduleIntoPool_inlock(&_networkInProgressQueue, cbState->iter, std::move(lk));
},
baton);
if (!commandStatus.isOK())
return commandStatus;
return swCbHandle;
}
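// Marks the callback canceled. Network operations are additionally canceled on the network
// interface; sleepers are promoted out of _sleepersQueue immediately rather than waiting for
// their alarm to fire.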
void ThreadPoolTaskExecutor::cancel(const CallbackHandle& cbHandle) {
invariant(cbHandle.isValid());
auto cbState = checked_cast<CallbackState*>(getCallbackFromHandle(cbHandle));
stdx::unique_lock<stdx::mutex> lk(_mutex);
if (_inShutdown_inlock()) {
return;
}
cbState->canceled.store(1);
if (cbState->isNetworkOperation) {
lk.unlock();
_net->cancelCommand(cbHandle, cbState->baton);
return;
}
if (cbState->readyDate != Date_t{}) {
// This callback might still be in the sleeper queue; if it is, schedule it now
// rather than when the alarm fires.
auto iter = std::find_if(_sleepersQueue.begin(),
_sleepersQueue.end(),
[cbState](const std::shared_ptr<CallbackState>& other) {
return cbState == other.get();
});
if (iter != _sleepersQueue.end()) {
invariant(iter == cbState->iter);
scheduleIntoPool_inlock(&_sleepersQueue, cbState->iter, std::move(lk));
}
}
}
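// Blocks until the callback has finished running. The per-callback condition variable is
// created lazily, under _mutex, the first time anyone waits.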
void ThreadPoolTaskExecutor::wait(const CallbackHandle& cbHandle) {
invariant(cbHandle.isValid());
auto cbState = checked_cast<CallbackState*>(getCallbackFromHandle(cbHandle));
if (cbState->isFinished.load()) {
return;
}
stdx::unique_lock<stdx::mutex> lk(_mutex);
if (!cbState->finishedCondition) {
cbState->finishedCondition.emplace();
}
while (!cbState->isFinished.load()) {
cbState->finishedCondition->wait(lk);
}
}
void ThreadPoolTaskExecutor::appendConnectionStats(ConnectionPoolStats* stats) const {
_net->appendConnectionStats(stats);
}
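// Moves the single callback in "wq" to the back of "queue" and returns a handle to it, failing
// if shutdown has already begun.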
StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::enqueueCallbackState_inlock(
WorkQueue* queue, WorkQueue* wq) {
if (_inShutdown_inlock()) {
return {ErrorCodes::ShutdownInProgress, "Shutdown in progress"};
}
invariant(!wq->empty());
queue->splice(queue->end(), *wq, wq->begin());
invariant(wq->empty());
CallbackHandle cbHandle;
setCallbackForHandle(&cbHandle, queue->back());
return cbHandle;
}
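// Builds a one-element work queue whose element records its own position, allowing it to be
// spliced into any of the executor's queues later in O(1).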
ThreadPoolTaskExecutor::WorkQueue ThreadPoolTaskExecutor::makeSingletonWorkQueue(
CallbackFn work, const transport::BatonHandle& baton, Date_t when) {
WorkQueue result;
result.emplace_front(CallbackState::make(std::move(work), when, baton));
result.front()->iter = result.begin();
return result;
}
ThreadPoolTaskExecutor::EventList ThreadPoolTaskExecutor::makeSingletonEventList() {
EventList result;
result.emplace_front(EventState::make());
result.front()->iter = result.begin();
return result;
}
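// Marks the event signaled, wakes synchronous waiters, and schedules every callback registered
// on the event via onEvent(). Consumes "lk".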
void ThreadPoolTaskExecutor::signalEvent_inlock(const EventHandle& event,
stdx::unique_lock<stdx::mutex> lk) {
invariant(event.isValid());
auto eventState = checked_cast<EventState*>(getEventFromHandle(event));
invariant(!eventState->isSignaledFlag);
eventState->isSignaledFlag = true;
eventState->isSignaledCondition.notify_all();
_unsignaledEvents.erase(eventState->iter);
scheduleIntoPool_inlock(&eventState->waiters, std::move(lk));
}
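// The three overloads below splice work from "fromQueue" (all of it, a single element, or a
// subrange) into _poolInProgressQueue, then hand each item to its baton or to the thread pool
// with _mutex released. Each overload consumes "lk".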
void ThreadPoolTaskExecutor::scheduleIntoPool_inlock(WorkQueue* fromQueue,
stdx::unique_lock<stdx::mutex> lk) {
scheduleIntoPool_inlock(fromQueue, fromQueue->begin(), fromQueue->end(), std::move(lk));
}
void ThreadPoolTaskExecutor::scheduleIntoPool_inlock(WorkQueue* fromQueue,
const WorkQueue::iterator& iter,
stdx::unique_lock<stdx::mutex> lk) {
scheduleIntoPool_inlock(fromQueue, iter, std::next(iter), std::move(lk));
}
void ThreadPoolTaskExecutor::scheduleIntoPool_inlock(WorkQueue* fromQueue,
const WorkQueue::iterator& begin,
const WorkQueue::iterator& end,
stdx::unique_lock<stdx::mutex> lk) {
dassert(fromQueue != &_poolInProgressQueue);
std::vector<std::shared_ptr<CallbackState>> todo(begin, end);
_poolInProgressQueue.splice(_poolInProgressQueue.end(), *fromQueue, begin, end);
lk.unlock();
if (MONGO_FAIL_POINT(scheduleIntoPoolSpinsUntilThreadPoolShutsDown)) {
scheduleIntoPoolSpinsUntilThreadPoolShutsDown.setMode(FailPoint::off);
while (_pool->schedule([] {}) != ErrorCodes::ShutdownInProgress) {
sleepmillis(100);
}
}
for (const auto& cbState : todo) {
if (cbState->baton) {
cbState->baton->schedule([this, cbState] { runCallback(std::move(cbState)); });
} else {
const auto status =
_pool->schedule([this, cbState] { runCallback(std::move(cbState)); });
if (status == ErrorCodes::ShutdownInProgress)
break;
fassert(28735, status);
}
}
_net->signalWorkAvailable();
}
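// Runs one callback on the current thread, passing a CallbackCanceled status if it was
// canceled, then erases it from _poolInProgressQueue and wakes any wait() callers.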
void ThreadPoolTaskExecutor::runCallback(std::shared_ptr<CallbackState> cbStateArg) {
CallbackHandle cbHandle;
setCallbackForHandle(&cbHandle, cbStateArg);
CallbackArgs args(this,
std::move(cbHandle),
cbStateArg->canceled.load()
? Status({ErrorCodes::CallbackCanceled, "Callback canceled"})
: Status::OK());
invariant(!cbStateArg->isFinished.load());
{
// After running callback function, clear 'cbStateArg->callback' to release any resources
// that might be held by this function object.
// Swap 'cbStateArg->callback' with temporary copy before running callback for exception
// safety.
TaskExecutor::CallbackFn callback;
std::swap(cbStateArg->callback, callback);
callback(std::move(args));
}
cbStateArg->isFinished.store(true);
stdx::lock_guard<stdx::mutex> lk(_mutex);
_poolInProgressQueue.erase(cbStateArg->iter);
if (cbStateArg->finishedCondition) {
cbStateArg->finishedCondition->notify_all();
}
}
bool ThreadPoolTaskExecutor::_inShutdown_inlock() const {
return _state >= joinRequired;
}
void ThreadPoolTaskExecutor::_setState_inlock(State newState) {
if (newState == _state) {
return;
}
_state = newState;
_stateChange.notify_all();
}
void ThreadPoolTaskExecutor::dropConnections(const HostAndPort& hostAndPort) {
_net->dropConnections(hostAndPort);
}
} // namespace executor
} // namespace mongo