diff options
Diffstat (limited to 'src/mongo/executor')
-rw-r--r-- | src/mongo/executor/network_interface.cpp | 16 | ||||
-rw-r--r-- | src/mongo/executor/network_interface.h | 157 | ||||
-rw-r--r-- | src/mongo/executor/network_interface_impl.cpp | 376 | ||||
-rw-r--r-- | src/mongo/executor/network_interface_impl.h | 230 | ||||
-rw-r--r-- | src/mongo/executor/network_interface_mock.cpp | 632 | ||||
-rw-r--r-- | src/mongo/executor/network_interface_mock.h | 549 | ||||
-rw-r--r-- | src/mongo/executor/task_executor.cpp | 110 | ||||
-rw-r--r-- | src/mongo/executor/task_executor.h | 542 |
8 files changed, 1284 insertions, 1328 deletions
diff --git a/src/mongo/executor/network_interface.cpp b/src/mongo/executor/network_interface.cpp index f748efabfeb..f025b1b1de3 100644 --- a/src/mongo/executor/network_interface.cpp +++ b/src/mongo/executor/network_interface.cpp @@ -34,15 +34,15 @@ namespace mongo { namespace executor { - // This is a bitmask with the first bit set. It's used to mark connections that should be kept - // open during stepdowns. +// This is a bitmask with the first bit set. It's used to mark connections that should be kept +// open during stepdowns. #ifndef _MSC_EXTENSIONS - const unsigned int NetworkInterface::kMessagingPortKeepOpen; -#endif // _MSC_EXTENSIONS +const unsigned int NetworkInterface::kMessagingPortKeepOpen; +#endif // _MSC_EXTENSIONS - NetworkInterface::NetworkInterface() {} - NetworkInterface::~NetworkInterface() {} +NetworkInterface::NetworkInterface() {} +NetworkInterface::~NetworkInterface() {} -} // namespace executor -} // namespace mongo +} // namespace executor +} // namespace mongo diff --git a/src/mongo/executor/network_interface.h b/src/mongo/executor/network_interface.h index ef664b7d529..3db07ba382f 100644 --- a/src/mongo/executor/network_interface.h +++ b/src/mongo/executor/network_interface.h @@ -37,84 +37,83 @@ namespace mongo { namespace executor { +/** + * Interface to networking for use by TaskExecutor implementations. + */ +class NetworkInterface { + MONGO_DISALLOW_COPYING(NetworkInterface); + +public: + // A flag to keep replication MessagingPorts open when all other sockets are disconnected. + static const unsigned int kMessagingPortKeepOpen = 1; + + using Response = RemoteCommandResponse; + using RemoteCommandCompletionFn = stdx::function<void(const TaskExecutor::ResponseStatus&)>; + + virtual ~NetworkInterface(); + + /** + * Returns diagnostic info. + */ + virtual std::string getDiagnosticString() = 0; + + /** + * Starts up the network interface. + * + * It is valid to call all methods except shutdown() before this method completes. That is, + * implementations may not assume that startup() completes before startCommand() first + * executes. + * + * Called by the owning TaskExecutor inside its run() method. + */ + virtual void startup() = 0; + + /** + * Shuts down the network interface. Must be called before this instance gets deleted, + * if startup() is called. + * + * Called by the owning TaskExecutor inside its run() method. + */ + virtual void shutdown() = 0; + + /** + * Blocks the current thread (presumably the executor thread) until the network interface + * knows of work for the executor to perform. + */ + virtual void waitForWork() = 0; + + /** + * Similar to waitForWork, but only blocks until "when". + */ + virtual void waitForWorkUntil(Date_t when) = 0; + + /** + * Signals to the network interface that there is new work (such as a signaled event) for + * the executor to process. Wakes the executor from waitForWork() and friends. + */ + virtual void signalWorkAvailable() = 0; + /** - * Interface to networking for use by TaskExecutor implementations. + * Returns the current time. */ - class NetworkInterface { - MONGO_DISALLOW_COPYING(NetworkInterface); - public: - - // A flag to keep replication MessagingPorts open when all other sockets are disconnected. - static const unsigned int kMessagingPortKeepOpen = 1; - - using Response = RemoteCommandResponse; - using RemoteCommandCompletionFn = - stdx::function<void (const TaskExecutor::ResponseStatus&)>; - - virtual ~NetworkInterface(); - - /** - * Returns diagnostic info. - */ - virtual std::string getDiagnosticString() = 0; - - /** - * Starts up the network interface. - * - * It is valid to call all methods except shutdown() before this method completes. That is, - * implementations may not assume that startup() completes before startCommand() first - * executes. - * - * Called by the owning TaskExecutor inside its run() method. - */ - virtual void startup() = 0; - - /** - * Shuts down the network interface. Must be called before this instance gets deleted, - * if startup() is called. - * - * Called by the owning TaskExecutor inside its run() method. - */ - virtual void shutdown() = 0; - - /** - * Blocks the current thread (presumably the executor thread) until the network interface - * knows of work for the executor to perform. - */ - virtual void waitForWork() = 0; - - /** - * Similar to waitForWork, but only blocks until "when". - */ - virtual void waitForWorkUntil(Date_t when) = 0; - - /** - * Signals to the network interface that there is new work (such as a signaled event) for - * the executor to process. Wakes the executor from waitForWork() and friends. - */ - virtual void signalWorkAvailable() = 0; - - /** - * Returns the current time. - */ - virtual Date_t now() = 0; - - /** - * Starts asynchronous execution of the command described by "request". - */ - virtual void startCommand(const TaskExecutor::CallbackHandle& cbHandle, - const RemoteCommandRequest& request, - const RemoteCommandCompletionFn& onFinish) = 0; - - /** - * Requests cancelation of the network activity associated with "cbHandle" if it has not yet - * completed. - */ - virtual void cancelCommand(const TaskExecutor::CallbackHandle& cbHandle) = 0; - - protected: - NetworkInterface(); - }; - -} // namespace executor -} // namespace mongo + virtual Date_t now() = 0; + + /** + * Starts asynchronous execution of the command described by "request". + */ + virtual void startCommand(const TaskExecutor::CallbackHandle& cbHandle, + const RemoteCommandRequest& request, + const RemoteCommandCompletionFn& onFinish) = 0; + + /** + * Requests cancelation of the network activity associated with "cbHandle" if it has not yet + * completed. + */ + virtual void cancelCommand(const TaskExecutor::CallbackHandle& cbHandle) = 0; + +protected: + NetworkInterface(); +}; + +} // namespace executor +} // namespace mongo diff --git a/src/mongo/executor/network_interface_impl.cpp b/src/mongo/executor/network_interface_impl.cpp index 4e443a45165..0b997f7f219 100644 --- a/src/mongo/executor/network_interface_impl.cpp +++ b/src/mongo/executor/network_interface_impl.cpp @@ -46,232 +46,220 @@ namespace executor { namespace { - const size_t kMinThreads = 1; - const size_t kMaxThreads = 51; // Set to 1 + max repl set size, for heartbeat + wiggle room. - const Seconds kMaxIdleThreadAge(30); +const size_t kMinThreads = 1; +const size_t kMaxThreads = 51; // Set to 1 + max repl set size, for heartbeat + wiggle room. +const Seconds kMaxIdleThreadAge(30); } // namespace - NetworkInterfaceImpl::NetworkInterfaceImpl() : - NetworkInterface(), - _numIdleThreads(0), - _nextThreadId(0), - _lastFullUtilizationDate(), - _isExecutorRunnable(false), - _inShutdown(false), - _commandRunner(kMessagingPortKeepOpen), - _numActiveNetworkRequests(0) { +NetworkInterfaceImpl::NetworkInterfaceImpl() + : NetworkInterface(), + _numIdleThreads(0), + _nextThreadId(0), + _lastFullUtilizationDate(), + _isExecutorRunnable(false), + _inShutdown(false), + _commandRunner(kMessagingPortKeepOpen), + _numActiveNetworkRequests(0) {} - } - - NetworkInterfaceImpl::~NetworkInterfaceImpl() { } +NetworkInterfaceImpl::~NetworkInterfaceImpl() {} - std::string NetworkInterfaceImpl::getDiagnosticString() { - stdx::lock_guard<stdx::mutex> lk(_mutex); - str::stream output; - output << "NetworkImpl"; - output << " threads:" << _threads.size(); - output << " inShutdown:" << _inShutdown; - output << " active:" << _numActiveNetworkRequests; - output << " pending:" << _pending.size(); - output << " execRunable:" << _isExecutorRunnable; - return output; +std::string NetworkInterfaceImpl::getDiagnosticString() { + stdx::lock_guard<stdx::mutex> lk(_mutex); + str::stream output; + output << "NetworkImpl"; + output << " threads:" << _threads.size(); + output << " inShutdown:" << _inShutdown; + output << " active:" << _numActiveNetworkRequests; + output << " pending:" << _pending.size(); + output << " execRunable:" << _isExecutorRunnable; + return output; +} +void NetworkInterfaceImpl::_startNewNetworkThread_inlock() { + if (_inShutdown) { + LOG(1) << "Not starting new replication networking thread while shutting down replication."; + return; } - - void NetworkInterfaceImpl::_startNewNetworkThread_inlock() { - if (_inShutdown) { - LOG(1) << - "Not starting new replication networking thread while shutting down replication."; - return; - } - if (_threads.size() >= kMaxThreads) { - LOG(1) << "Not starting new replication networking thread because " << kMaxThreads << - " are already running; " << _numIdleThreads << " threads are idle and " << - _pending.size() << " network requests are waiting for a thread to serve them."; - return; - } - const std::string threadName(str::stream() << "ReplExecNetThread-" << _nextThreadId++); - try { - _threads.push_back( - std::make_shared<stdx::thread>( - stdx::bind(&NetworkInterfaceImpl::_requestProcessorThreadBody, - this, - threadName))); - ++_numIdleThreads; - } - catch (const std::exception& ex) { - error() << "Failed to start " << threadName << "; " << _threads.size() << - " other network threads still running; caught exception: " << ex.what(); - } + if (_threads.size() >= kMaxThreads) { + LOG(1) << "Not starting new replication networking thread because " << kMaxThreads + << " are already running; " << _numIdleThreads << " threads are idle and " + << _pending.size() << " network requests are waiting for a thread to serve them."; + return; } - - void NetworkInterfaceImpl::startup() { - stdx::lock_guard<stdx::mutex> lk(_mutex); - invariant(!_inShutdown); - if (!_threads.empty()) { - return; - } - for (size_t i = 0; i < kMinThreads; ++i) { - _startNewNetworkThread_inlock(); - } + const std::string threadName(str::stream() << "ReplExecNetThread-" << _nextThreadId++); + try { + _threads.push_back(std::make_shared<stdx::thread>( + stdx::bind(&NetworkInterfaceImpl::_requestProcessorThreadBody, this, threadName))); + ++_numIdleThreads; + } catch (const std::exception& ex) { + error() << "Failed to start " << threadName << "; " << _threads.size() + << " other network threads still running; caught exception: " << ex.what(); } +} - void NetworkInterfaceImpl::shutdown() { - using std::swap; - stdx::unique_lock<stdx::mutex> lk(_mutex); - _inShutdown = true; - _hasPending.notify_all(); - ThreadList threadsToJoin; - swap(threadsToJoin, _threads); - lk.unlock(); - _commandRunner.shutdown(); - std::for_each(threadsToJoin.begin(), - threadsToJoin.end(), - stdx::bind(&stdx::thread::join, stdx::placeholders::_1)); +void NetworkInterfaceImpl::startup() { + stdx::lock_guard<stdx::mutex> lk(_mutex); + invariant(!_inShutdown); + if (!_threads.empty()) { + return; } - - void NetworkInterfaceImpl::signalWorkAvailable() { - stdx::lock_guard<stdx::mutex> lk(_mutex); - _signalWorkAvailable_inlock(); + for (size_t i = 0; i < kMinThreads; ++i) { + _startNewNetworkThread_inlock(); } +} - void NetworkInterfaceImpl::_signalWorkAvailable_inlock() { - if (!_isExecutorRunnable) { - _isExecutorRunnable = true; - _isExecutorRunnableCondition.notify_one(); - } +void NetworkInterfaceImpl::shutdown() { + using std::swap; + stdx::unique_lock<stdx::mutex> lk(_mutex); + _inShutdown = true; + _hasPending.notify_all(); + ThreadList threadsToJoin; + swap(threadsToJoin, _threads); + lk.unlock(); + _commandRunner.shutdown(); + std::for_each(threadsToJoin.begin(), + threadsToJoin.end(), + stdx::bind(&stdx::thread::join, stdx::placeholders::_1)); +} + +void NetworkInterfaceImpl::signalWorkAvailable() { + stdx::lock_guard<stdx::mutex> lk(_mutex); + _signalWorkAvailable_inlock(); +} + +void NetworkInterfaceImpl::_signalWorkAvailable_inlock() { + if (!_isExecutorRunnable) { + _isExecutorRunnable = true; + _isExecutorRunnableCondition.notify_one(); } +} - void NetworkInterfaceImpl::waitForWork() { - stdx::unique_lock<stdx::mutex> lk(_mutex); - while (!_isExecutorRunnable) { - _isExecutorRunnableCondition.wait(lk); - } - _isExecutorRunnable = false; +void NetworkInterfaceImpl::waitForWork() { + stdx::unique_lock<stdx::mutex> lk(_mutex); + while (!_isExecutorRunnable) { + _isExecutorRunnableCondition.wait(lk); } + _isExecutorRunnable = false; +} - void NetworkInterfaceImpl::waitForWorkUntil(Date_t when) { - stdx::unique_lock<stdx::mutex> lk(_mutex); - while (!_isExecutorRunnable) { - const Milliseconds waitTime(when - now()); - if (waitTime <= Milliseconds(0)) { - break; - } - _isExecutorRunnableCondition.wait_for(lk, waitTime); +void NetworkInterfaceImpl::waitForWorkUntil(Date_t when) { + stdx::unique_lock<stdx::mutex> lk(_mutex); + while (!_isExecutorRunnable) { + const Milliseconds waitTime(when - now()); + if (waitTime <= Milliseconds(0)) { + break; } - _isExecutorRunnable = false; + _isExecutorRunnableCondition.wait_for(lk, waitTime); } + _isExecutorRunnable = false; +} - void NetworkInterfaceImpl::_requestProcessorThreadBody( - NetworkInterfaceImpl* net, - const std::string& threadName) { - setThreadName(threadName); - LOG(1) << "thread starting"; - net->_consumeNetworkRequests(); +void NetworkInterfaceImpl::_requestProcessorThreadBody(NetworkInterfaceImpl* net, + const std::string& threadName) { + setThreadName(threadName); + LOG(1) << "thread starting"; + net->_consumeNetworkRequests(); - // At this point, another thread may have destroyed "net", if this thread chose to detach - // itself and remove itself from net->_threads before releasing net->_mutex. Do not access - // member variables of "net" from here, on. - LOG(1) << "thread shutting down"; - } + // At this point, another thread may have destroyed "net", if this thread chose to detach + // itself and remove itself from net->_threads before releasing net->_mutex. Do not access + // member variables of "net" from here, on. + LOG(1) << "thread shutting down"; +} - void NetworkInterfaceImpl::_consumeNetworkRequests() { - stdx::unique_lock<stdx::mutex> lk(_mutex); - while (!_inShutdown) { - if (_pending.empty()) { - if (_threads.size() > kMinThreads) { - const Date_t nowDate = now(); - const Date_t nextThreadRetirementDate = - _lastFullUtilizationDate + kMaxIdleThreadAge; - if (nowDate > nextThreadRetirementDate) { - _lastFullUtilizationDate = nowDate; - break; - } +void NetworkInterfaceImpl::_consumeNetworkRequests() { + stdx::unique_lock<stdx::mutex> lk(_mutex); + while (!_inShutdown) { + if (_pending.empty()) { + if (_threads.size() > kMinThreads) { + const Date_t nowDate = now(); + const Date_t nextThreadRetirementDate = + _lastFullUtilizationDate + kMaxIdleThreadAge; + if (nowDate > nextThreadRetirementDate) { + _lastFullUtilizationDate = nowDate; + break; } - _hasPending.wait_for(lk, kMaxIdleThreadAge); - continue; } - CommandData todo = _pending.front(); - _pending.pop_front(); - ++_numActiveNetworkRequests; - --_numIdleThreads; - lk.unlock(); - TaskExecutor::ResponseStatus result = _commandRunner.runCommand(todo.request); - LOG(2) << "Network status of sending " << todo.request.cmdObj.firstElementFieldName() << - " to " << todo.request.target << " was " << result.getStatus(); - todo.onFinish(result); - lk.lock(); - --_numActiveNetworkRequests; - ++_numIdleThreads; - _signalWorkAvailable_inlock(); + _hasPending.wait_for(lk, kMaxIdleThreadAge); + continue; } + CommandData todo = _pending.front(); + _pending.pop_front(); + ++_numActiveNetworkRequests; --_numIdleThreads; - if (_inShutdown) { - return; - } - // This thread is ending because it was idle for too long. - // Find self in _threads, remove self from _threads, detach self. - for (size_t i = 0; i < _threads.size(); ++i) { - if (_threads[i]->get_id() != stdx::this_thread::get_id()) { - continue; - } - _threads[i]->detach(); - _threads[i].swap(_threads.back()); - _threads.pop_back(); - return; + lk.unlock(); + TaskExecutor::ResponseStatus result = _commandRunner.runCommand(todo.request); + LOG(2) << "Network status of sending " << todo.request.cmdObj.firstElementFieldName() + << " to " << todo.request.target << " was " << result.getStatus(); + todo.onFinish(result); + lk.lock(); + --_numActiveNetworkRequests; + ++_numIdleThreads; + _signalWorkAvailable_inlock(); + } + --_numIdleThreads; + if (_inShutdown) { + return; + } + // This thread is ending because it was idle for too long. + // Find self in _threads, remove self from _threads, detach self. + for (size_t i = 0; i < _threads.size(); ++i) { + if (_threads[i]->get_id() != stdx::this_thread::get_id()) { + continue; } - severe().stream() << "Could not find this thread, with id " << - stdx::this_thread::get_id() << " in the replication networking thread pool"; - fassertFailedNoTrace(28676); + _threads[i]->detach(); + _threads[i].swap(_threads.back()); + _threads.pop_back(); + return; } + severe().stream() << "Could not find this thread, with id " << stdx::this_thread::get_id() + << " in the replication networking thread pool"; + fassertFailedNoTrace(28676); +} - void NetworkInterfaceImpl::startCommand( - const TaskExecutor::CallbackHandle& cbHandle, - const RemoteCommandRequest& request, - const RemoteCommandCompletionFn& onFinish) { - LOG(2) << "Scheduling " << request.cmdObj.firstElementFieldName() << " to " << - request.target; - stdx::lock_guard<stdx::mutex> lk(_mutex); - _pending.push_back(CommandData()); - CommandData& cd = _pending.back(); - cd.cbHandle = cbHandle; - cd.request = request; - cd.onFinish = onFinish; - if (_numIdleThreads < _pending.size()) { - _startNewNetworkThread_inlock(); - } - if (_numIdleThreads <= _pending.size()) { - _lastFullUtilizationDate = Date_t::now(); - } - _hasPending.notify_one(); +void NetworkInterfaceImpl::startCommand(const TaskExecutor::CallbackHandle& cbHandle, + const RemoteCommandRequest& request, + const RemoteCommandCompletionFn& onFinish) { + LOG(2) << "Scheduling " << request.cmdObj.firstElementFieldName() << " to " << request.target; + stdx::lock_guard<stdx::mutex> lk(_mutex); + _pending.push_back(CommandData()); + CommandData& cd = _pending.back(); + cd.cbHandle = cbHandle; + cd.request = request; + cd.onFinish = onFinish; + if (_numIdleThreads < _pending.size()) { + _startNewNetworkThread_inlock(); + } + if (_numIdleThreads <= _pending.size()) { + _lastFullUtilizationDate = Date_t::now(); } + _hasPending.notify_one(); +} - void NetworkInterfaceImpl::cancelCommand( - const TaskExecutor::CallbackHandle& cbHandle) { - stdx::unique_lock<stdx::mutex> lk(_mutex); - CommandDataList::iterator iter; - for (iter = _pending.begin(); iter != _pending.end(); ++iter) { - if (iter->cbHandle == cbHandle) { - break; - } +void NetworkInterfaceImpl::cancelCommand(const TaskExecutor::CallbackHandle& cbHandle) { + stdx::unique_lock<stdx::mutex> lk(_mutex); + CommandDataList::iterator iter; + for (iter = _pending.begin(); iter != _pending.end(); ++iter) { + if (iter->cbHandle == cbHandle) { + break; } - if (iter == _pending.end()) { - return; - } - const RemoteCommandCompletionFn onFinish = iter->onFinish; - LOG(2) << "Canceled sending " << iter->request.cmdObj.firstElementFieldName() << " to " << - iter->request.target; - _pending.erase(iter); - lk.unlock(); - onFinish(TaskExecutor::ResponseStatus(ErrorCodes::CallbackCanceled, "Callback canceled")); - lk.lock(); - _signalWorkAvailable_inlock(); } - - Date_t NetworkInterfaceImpl::now() { - return Date_t::now(); + if (iter == _pending.end()) { + return; } + const RemoteCommandCompletionFn onFinish = iter->onFinish; + LOG(2) << "Canceled sending " << iter->request.cmdObj.firstElementFieldName() << " to " + << iter->request.target; + _pending.erase(iter); + lk.unlock(); + onFinish(TaskExecutor::ResponseStatus(ErrorCodes::CallbackCanceled, "Callback canceled")); + lk.lock(); + _signalWorkAvailable_inlock(); +} + +Date_t NetworkInterfaceImpl::now() { + return Date_t::now(); +} -} // namespace executor -} // namespace mongo +} // namespace executor +} // namespace mongo diff --git a/src/mongo/executor/network_interface_impl.h b/src/mongo/executor/network_interface_impl.h index 3744bf808b9..6e29d09263f 100644 --- a/src/mongo/executor/network_interface_impl.h +++ b/src/mongo/executor/network_interface_impl.h @@ -41,123 +41,121 @@ namespace mongo { namespace executor { +/** + * Implementation of the network interface for use by classes implementing TaskExecutor + * inside mongod. + * + * This implementation manages a dynamically sized group of worker threads for performing + * network operations. The minimum and maximum number of threads is set at compile time, and + * the exact number of threads is adjusted dynamically, using the following two rules. + * + * 1.) If the number of worker threads is less than the maximum, there are no idle worker + * threads, and the client enqueues a new network operation via startCommand(), the network + * interface spins up a new worker thread. This decision is made on the assumption that + * spinning up a new thread is faster than the round-trip time for processing a remote command, + * and so this will minimize wait time. + * + * 2.) If the number of worker threads has exceeded the the peak number of scheduled outstanding + * network commands continuously for a period of time (kMaxIdleThreadAge), one thread is retired + * from the pool and the monitoring of idle threads is reset. This means that at most one + * thread retires every kMaxIdleThreadAge units of time. The value of kMaxIdleThreadAge is set + * to be much larger than the expected frequency of new requests, averaging out short-duration + * periods of idleness, as occur between heartbeats. + * + * The implementation also manages a pool of network connections to recently contacted remote + * nodes. The size of this pool is not bounded, but connections are retired unconditionally + * after they have been connected for a certain maximum period. + * TODO(spencer): Rename this to ThreadPoolNetworkInterface + */ +class NetworkInterfaceImpl : public NetworkInterface { +public: + NetworkInterfaceImpl(); + virtual ~NetworkInterfaceImpl(); + virtual std::string getDiagnosticString(); + virtual void startup(); + virtual void shutdown(); + virtual void waitForWork(); + virtual void waitForWorkUntil(Date_t when); + virtual void signalWorkAvailable(); + virtual Date_t now(); + virtual void startCommand(const TaskExecutor::CallbackHandle& cbHandle, + const RemoteCommandRequest& request, + const RemoteCommandCompletionFn& onFinish); + virtual void cancelCommand(const TaskExecutor::CallbackHandle& cbHandle); + +private: /** - * Implementation of the network interface for use by classes implementing TaskExecutor - * inside mongod. - * - * This implementation manages a dynamically sized group of worker threads for performing - * network operations. The minimum and maximum number of threads is set at compile time, and - * the exact number of threads is adjusted dynamically, using the following two rules. - * - * 1.) If the number of worker threads is less than the maximum, there are no idle worker - * threads, and the client enqueues a new network operation via startCommand(), the network - * interface spins up a new worker thread. This decision is made on the assumption that - * spinning up a new thread is faster than the round-trip time for processing a remote command, - * and so this will minimize wait time. - * - * 2.) If the number of worker threads has exceeded the the peak number of scheduled outstanding - * network commands continuously for a period of time (kMaxIdleThreadAge), one thread is retired - * from the pool and the monitoring of idle threads is reset. This means that at most one - * thread retires every kMaxIdleThreadAge units of time. The value of kMaxIdleThreadAge is set - * to be much larger than the expected frequency of new requests, averaging out short-duration - * periods of idleness, as occur between heartbeats. - * - * The implementation also manages a pool of network connections to recently contacted remote - * nodes. The size of this pool is not bounded, but connections are retired unconditionally - * after they have been connected for a certain maximum period. - * TODO(spencer): Rename this to ThreadPoolNetworkInterface + * Information describing an in-flight command. */ - class NetworkInterfaceImpl : public NetworkInterface { - public: - NetworkInterfaceImpl(); - virtual ~NetworkInterfaceImpl(); - virtual std::string getDiagnosticString(); - virtual void startup(); - virtual void shutdown(); - virtual void waitForWork(); - virtual void waitForWorkUntil(Date_t when); - virtual void signalWorkAvailable(); - virtual Date_t now(); - virtual void startCommand( - const TaskExecutor::CallbackHandle& cbHandle, - const RemoteCommandRequest& request, - const RemoteCommandCompletionFn& onFinish); - virtual void cancelCommand(const TaskExecutor::CallbackHandle& cbHandle); - - private: - - /** - * Information describing an in-flight command. - */ - struct CommandData { - TaskExecutor::CallbackHandle cbHandle; - RemoteCommandRequest request; - RemoteCommandCompletionFn onFinish; - }; - typedef stdx::list<CommandData> CommandDataList; - typedef std::vector<std::shared_ptr<stdx::thread> > ThreadList; - - /** - * Thread body for threads that synchronously perform network requests from - * the _pending list. - */ - static void _requestProcessorThreadBody(NetworkInterfaceImpl* net, - const std::string& threadName); - - /** - * Run loop that iteratively consumes network requests in a request processor thread. - */ - void _consumeNetworkRequests(); - - /** - * Notifies the network threads that there is work available. - */ - void _signalWorkAvailable_inlock(); - - /** - * Starts a new network thread. - */ - void _startNewNetworkThread_inlock(); - - // Mutex guarding the state of this network interface, except for the remote command - // executor, which has its own concurrency control. - stdx::mutex _mutex; - - // Condition signaled to indicate that there is work in the _pending queue. - stdx::condition_variable _hasPending; - - // Queue of yet-to-be-executed network operations. - CommandDataList _pending; - - // List of threads serving as the worker pool. - ThreadList _threads; - - // Count of idle threads. - size_t _numIdleThreads; - - // Id counter for assigning thread names - size_t _nextThreadId; - - // The last time that _pending.size() + _numActiveNetworkRequests grew to be at least - // _threads.size(). - Date_t _lastFullUtilizationDate; - - // Condition signaled to indicate that the executor, blocked in waitForWorkUntil or - // waitForWork, should wake up. - stdx::condition_variable _isExecutorRunnableCondition; - - // Flag indicating whether or not the executor associated with this interface is runnable. - bool _isExecutorRunnable; - - // Flag indicating when this interface is being shut down (because shutdown() has executed). - bool _inShutdown; - - // Interface for running remote commands - RemoteCommandRunnerImpl _commandRunner; - - // Number of active network requests - size_t _numActiveNetworkRequests; + struct CommandData { + TaskExecutor::CallbackHandle cbHandle; + RemoteCommandRequest request; + RemoteCommandCompletionFn onFinish; }; + typedef stdx::list<CommandData> CommandDataList; + typedef std::vector<std::shared_ptr<stdx::thread>> ThreadList; + + /** + * Thread body for threads that synchronously perform network requests from + * the _pending list. + */ + static void _requestProcessorThreadBody(NetworkInterfaceImpl* net, + const std::string& threadName); + + /** + * Run loop that iteratively consumes network requests in a request processor thread. + */ + void _consumeNetworkRequests(); + + /** + * Notifies the network threads that there is work available. + */ + void _signalWorkAvailable_inlock(); + + /** + * Starts a new network thread. + */ + void _startNewNetworkThread_inlock(); + + // Mutex guarding the state of this network interface, except for the remote command + // executor, which has its own concurrency control. + stdx::mutex _mutex; + + // Condition signaled to indicate that there is work in the _pending queue. + stdx::condition_variable _hasPending; + + // Queue of yet-to-be-executed network operations. + CommandDataList _pending; + + // List of threads serving as the worker pool. + ThreadList _threads; + + // Count of idle threads. + size_t _numIdleThreads; + + // Id counter for assigning thread names + size_t _nextThreadId; + + // The last time that _pending.size() + _numActiveNetworkRequests grew to be at least + // _threads.size(). + Date_t _lastFullUtilizationDate; + + // Condition signaled to indicate that the executor, blocked in waitForWorkUntil or + // waitForWork, should wake up. + stdx::condition_variable _isExecutorRunnableCondition; + + // Flag indicating whether or not the executor associated with this interface is runnable. + bool _isExecutorRunnable; + + // Flag indicating when this interface is being shut down (because shutdown() has executed). + bool _inShutdown; + + // Interface for running remote commands + RemoteCommandRunnerImpl _commandRunner; + + // Number of active network requests + size_t _numActiveNetworkRequests; +}; -} // namespace executor -} // namespace mongo +} // namespace executor +} // namespace mongo diff --git a/src/mongo/executor/network_interface_mock.cpp b/src/mongo/executor/network_interface_mock.cpp index 6f13f42afd5..cee26e4bdc0 100644 --- a/src/mongo/executor/network_interface_mock.cpp +++ b/src/mongo/executor/network_interface_mock.cpp @@ -36,364 +36,350 @@ namespace mongo { namespace executor { - NetworkInterfaceMock::NetworkInterfaceMock() - : _waitingToRunMask(0), - _currentlyRunning(kNoThread), - _now(fassertStatusOK(18653, dateFromISOString("2014-08-01T00:00:00Z"))), - _hasStarted(false), - _inShutdown(false), - _executorNextWakeupDate(Date_t::max()) { +NetworkInterfaceMock::NetworkInterfaceMock() + : _waitingToRunMask(0), + _currentlyRunning(kNoThread), + _now(fassertStatusOK(18653, dateFromISOString("2014-08-01T00:00:00Z"))), + _hasStarted(false), + _inShutdown(false), + _executorNextWakeupDate(Date_t::max()) {} + +NetworkInterfaceMock::~NetworkInterfaceMock() { + stdx::unique_lock<stdx::mutex> lk(_mutex); + invariant(!_hasStarted || _inShutdown); + invariant(_scheduled.empty()); + invariant(_blackHoled.empty()); +} + +std::string NetworkInterfaceMock::getDiagnosticString() { + // TODO something better. + return "NetworkInterfaceMock diagnostics here"; +} + +Date_t NetworkInterfaceMock::now() { + stdx::lock_guard<stdx::mutex> lk(_mutex); + return _now_inlock(); +} + +void NetworkInterfaceMock::startCommand(const TaskExecutor::CallbackHandle& cbHandle, + const RemoteCommandRequest& request, + const RemoteCommandCompletionFn& onFinish) { + stdx::lock_guard<stdx::mutex> lk(_mutex); + invariant(!_inShutdown); + const Date_t now = _now_inlock(); + NetworkOperationIterator insertBefore = _unscheduled.begin(); + while ((insertBefore != _unscheduled.end()) && + (insertBefore->getNextConsiderationDate() <= now)) { + ++insertBefore; } - - NetworkInterfaceMock::~NetworkInterfaceMock() { - stdx::unique_lock<stdx::mutex> lk(_mutex); - invariant(!_hasStarted || _inShutdown); - invariant(_scheduled.empty()); - invariant(_blackHoled.empty()); + _unscheduled.insert(insertBefore, NetworkOperation(cbHandle, request, now, onFinish)); +} + +static bool findAndCancelIf( + const stdx::function<bool(const NetworkInterfaceMock::NetworkOperation&)>& matchFn, + NetworkInterfaceMock::NetworkOperationList* other, + NetworkInterfaceMock::NetworkOperationList* scheduled, + const Date_t now) { + const NetworkInterfaceMock::NetworkOperationIterator noi = + std::find_if(other->begin(), other->end(), matchFn); + if (noi == other->end()) { + return false; } - - std::string NetworkInterfaceMock::getDiagnosticString() { - // TODO something better. - return "NetworkInterfaceMock diagnostics here"; + scheduled->splice(scheduled->begin(), *other, noi); + noi->setResponse( + now, + TaskExecutor::ResponseStatus(ErrorCodes::CallbackCanceled, "Network operation canceled")); + return true; +} + +void NetworkInterfaceMock::cancelCommand(const TaskExecutor::CallbackHandle& cbHandle) { + stdx::lock_guard<stdx::mutex> lk(_mutex); + invariant(!_inShutdown); + stdx::function<bool(const NetworkOperation&)> matchesHandle = + stdx::bind(&NetworkOperation::isForCallback, stdx::placeholders::_1, cbHandle); + const Date_t now = _now_inlock(); + if (findAndCancelIf(matchesHandle, &_unscheduled, &_scheduled, now)) { + return; } - - Date_t NetworkInterfaceMock::now() { - stdx::lock_guard<stdx::mutex> lk(_mutex); - return _now_inlock(); + if (findAndCancelIf(matchesHandle, &_blackHoled, &_scheduled, now)) { + return; } - - void NetworkInterfaceMock::startCommand( - const TaskExecutor::CallbackHandle& cbHandle, - const RemoteCommandRequest& request, - const RemoteCommandCompletionFn& onFinish) { - - stdx::lock_guard<stdx::mutex> lk(_mutex); - invariant(!_inShutdown); - const Date_t now = _now_inlock(); - NetworkOperationIterator insertBefore = _unscheduled.begin(); - while ((insertBefore != _unscheduled.end()) && - (insertBefore->getNextConsiderationDate() <= now)) { - - ++insertBefore; - } - _unscheduled.insert(insertBefore, NetworkOperation(cbHandle, request, now, onFinish)); + if (findAndCancelIf(matchesHandle, &_scheduled, &_scheduled, now)) { + return; } - - static bool findAndCancelIf( - const stdx::function<bool (const NetworkInterfaceMock::NetworkOperation&)>& matchFn, - NetworkInterfaceMock::NetworkOperationList* other, - NetworkInterfaceMock::NetworkOperationList* scheduled, - const Date_t now) { - const NetworkInterfaceMock::NetworkOperationIterator noi = - std::find_if(other->begin(), other->end(), matchFn); - if (noi == other->end()) { - return false; - } - scheduled->splice(scheduled->begin(), *other, noi); - noi->setResponse(now, TaskExecutor::ResponseStatus(ErrorCodes::CallbackCanceled, - "Network operation canceled")); - return true; + // No not-in-progress network command matched cbHandle. Oh, well. +} + +void NetworkInterfaceMock::startup() { + stdx::lock_guard<stdx::mutex> lk(_mutex); + invariant(!_hasStarted); + _hasStarted = true; + _inShutdown = false; + invariant(_currentlyRunning == kNoThread); + _currentlyRunning = kExecutorThread; +} + +void NetworkInterfaceMock::shutdown() { + stdx::unique_lock<stdx::mutex> lk(_mutex); + invariant(_hasStarted); + invariant(!_inShutdown); + _inShutdown = true; + NetworkOperationList todo; + todo.splice(todo.end(), _scheduled); + todo.splice(todo.end(), _unscheduled); + todo.splice(todo.end(), _processing); + todo.splice(todo.end(), _blackHoled); + + const Date_t now = _now_inlock(); + _waitingToRunMask |= kExecutorThread; // Prevents network thread from scheduling. + lk.unlock(); + for (NetworkOperationIterator iter = todo.begin(); iter != todo.end(); ++iter) { + iter->setResponse(now, + TaskExecutor::ResponseStatus(ErrorCodes::ShutdownInProgress, + "Shutting down mock network")); + iter->finishResponse(); } - - void NetworkInterfaceMock::cancelCommand( - const TaskExecutor::CallbackHandle& cbHandle) { - stdx::lock_guard<stdx::mutex> lk(_mutex); - invariant(!_inShutdown); - stdx::function<bool (const NetworkOperation&)> matchesHandle = stdx::bind( - &NetworkOperation::isForCallback, - stdx::placeholders::_1, - cbHandle); - const Date_t now = _now_inlock(); - if (findAndCancelIf(matchesHandle, &_unscheduled, &_scheduled, now)) { - return; - } - if (findAndCancelIf(matchesHandle, &_blackHoled, &_scheduled, now)) { - return; - } - if (findAndCancelIf(matchesHandle, &_scheduled, &_scheduled, now)) { - return; - } - // No not-in-progress network command matched cbHandle. Oh, well. - } - - void NetworkInterfaceMock::startup() { - stdx::lock_guard<stdx::mutex> lk(_mutex); - invariant(!_hasStarted); - _hasStarted = true; - _inShutdown = false; - invariant(_currentlyRunning == kNoThread); - _currentlyRunning = kExecutorThread; - } - - void NetworkInterfaceMock::shutdown() { - stdx::unique_lock<stdx::mutex> lk(_mutex); - invariant(_hasStarted); - invariant(!_inShutdown); - _inShutdown = true; - NetworkOperationList todo; - todo.splice(todo.end(), _scheduled); - todo.splice(todo.end(), _unscheduled); - todo.splice(todo.end(), _processing); - todo.splice(todo.end(), _blackHoled); - - const Date_t now = _now_inlock(); - _waitingToRunMask |= kExecutorThread; // Prevents network thread from scheduling. - lk.unlock(); - for (NetworkOperationIterator iter = todo.begin(); iter != todo.end(); ++iter) { - iter->setResponse(now, TaskExecutor::ResponseStatus(ErrorCodes::ShutdownInProgress, - "Shutting down mock network")); - iter->finishResponse(); - } - lk.lock(); - invariant(_currentlyRunning == kExecutorThread); - _currentlyRunning = kNoThread; - _waitingToRunMask = kNetworkThread; - _shouldWakeNetworkCondition.notify_one(); + lk.lock(); + invariant(_currentlyRunning == kExecutorThread); + _currentlyRunning = kNoThread; + _waitingToRunMask = kNetworkThread; + _shouldWakeNetworkCondition.notify_one(); +} + +void NetworkInterfaceMock::enterNetwork() { + stdx::unique_lock<stdx::mutex> lk(_mutex); + while (!_isNetworkThreadRunnable_inlock()) { + _shouldWakeNetworkCondition.wait(lk); } - - void NetworkInterfaceMock::enterNetwork() { - stdx::unique_lock<stdx::mutex> lk(_mutex); - while (!_isNetworkThreadRunnable_inlock()) { - _shouldWakeNetworkCondition.wait(lk); - } - _currentlyRunning = kNetworkThread; - _waitingToRunMask &= ~kNetworkThread; + _currentlyRunning = kNetworkThread; + _waitingToRunMask &= ~kNetworkThread; +} + +void NetworkInterfaceMock::exitNetwork() { + stdx::lock_guard<stdx::mutex> lk(_mutex); + if (_currentlyRunning != kNetworkThread) { + return; } - - void NetworkInterfaceMock::exitNetwork() { - stdx::lock_guard<stdx::mutex> lk(_mutex); - if (_currentlyRunning != kNetworkThread) { - return; - } - _currentlyRunning = kNoThread; - if (_isExecutorThreadRunnable_inlock()) { - _shouldWakeExecutorCondition.notify_one(); - } - _waitingToRunMask |= kNetworkThread; + _currentlyRunning = kNoThread; + if (_isExecutorThreadRunnable_inlock()) { + _shouldWakeExecutorCondition.notify_one(); } - - bool NetworkInterfaceMock::hasReadyRequests() { - stdx::lock_guard<stdx::mutex> lk(_mutex); - invariant(_currentlyRunning == kNetworkThread); - return _hasReadyRequests_inlock(); + _waitingToRunMask |= kNetworkThread; +} + +bool NetworkInterfaceMock::hasReadyRequests() { + stdx::lock_guard<stdx::mutex> lk(_mutex); + invariant(_currentlyRunning == kNetworkThread); + return _hasReadyRequests_inlock(); +} + +bool NetworkInterfaceMock::_hasReadyRequests_inlock() { + if (_unscheduled.empty()) + return false; + if (_unscheduled.front().getNextConsiderationDate() > _now_inlock()) { + return false; } + return true; +} - bool NetworkInterfaceMock::_hasReadyRequests_inlock() { - if (_unscheduled.empty()) - return false; - if (_unscheduled.front().getNextConsiderationDate() > _now_inlock()) { - return false; - } - return true; +NetworkInterfaceMock::NetworkOperationIterator NetworkInterfaceMock::getNextReadyRequest() { + stdx::unique_lock<stdx::mutex> lk(_mutex); + invariant(_currentlyRunning == kNetworkThread); + while (!_hasReadyRequests_inlock()) { + _waitingToRunMask |= kExecutorThread; + _runReadyNetworkOperations_inlock(&lk); } - - NetworkInterfaceMock::NetworkOperationIterator NetworkInterfaceMock::getNextReadyRequest() { - stdx::unique_lock<stdx::mutex> lk(_mutex); - invariant(_currentlyRunning == kNetworkThread); - while (!_hasReadyRequests_inlock()) { - _waitingToRunMask |= kExecutorThread; - _runReadyNetworkOperations_inlock(&lk); - } - invariant(_hasReadyRequests_inlock()); - _processing.splice(_processing.begin(), _unscheduled, _unscheduled.begin()); - return _processing.begin(); + invariant(_hasReadyRequests_inlock()); + _processing.splice(_processing.begin(), _unscheduled, _unscheduled.begin()); + return _processing.begin(); +} + +void NetworkInterfaceMock::scheduleResponse(NetworkOperationIterator noi, + Date_t when, + const TaskExecutor::ResponseStatus& response) { + stdx::lock_guard<stdx::mutex> lk(_mutex); + invariant(_currentlyRunning == kNetworkThread); + NetworkOperationIterator insertBefore = _scheduled.begin(); + while ((insertBefore != _scheduled.end()) && (insertBefore->getResponseDate() <= when)) { + ++insertBefore; } - - void NetworkInterfaceMock::scheduleResponse( - NetworkOperationIterator noi, - Date_t when, - const TaskExecutor::ResponseStatus& response) { - - stdx::lock_guard<stdx::mutex> lk(_mutex); - invariant(_currentlyRunning == kNetworkThread); - NetworkOperationIterator insertBefore = _scheduled.begin(); - while ((insertBefore != _scheduled.end()) && (insertBefore->getResponseDate() <= when)) { - ++insertBefore; + noi->setResponse(when, response); + _scheduled.splice(insertBefore, _processing, noi); +} + +void NetworkInterfaceMock::blackHole(NetworkOperationIterator noi) { + stdx::lock_guard<stdx::mutex> lk(_mutex); + invariant(_currentlyRunning == kNetworkThread); + _blackHoled.splice(_blackHoled.end(), _processing, noi); +} + +void NetworkInterfaceMock::requeueAt(NetworkOperationIterator noi, Date_t dontAskUntil) { + stdx::lock_guard<stdx::mutex> lk(_mutex); + invariant(_currentlyRunning == kNetworkThread); + invariant(noi->getNextConsiderationDate() < dontAskUntil); + invariant(_now_inlock() < dontAskUntil); + NetworkOperationIterator insertBefore = _unscheduled.begin(); + for (; insertBefore != _unscheduled.end(); ++insertBefore) { + if (insertBefore->getNextConsiderationDate() >= dontAskUntil) { + break; } - noi->setResponse(when, response); - _scheduled.splice(insertBefore, _processing, noi); } - - void NetworkInterfaceMock::blackHole(NetworkOperationIterator noi) { - stdx::lock_guard<stdx::mutex> lk(_mutex); - invariant(_currentlyRunning == kNetworkThread); - _blackHoled.splice(_blackHoled.end(), _processing, noi); - } - - void NetworkInterfaceMock::requeueAt(NetworkOperationIterator noi, Date_t dontAskUntil) { - stdx::lock_guard<stdx::mutex> lk(_mutex); - invariant(_currentlyRunning == kNetworkThread); - invariant(noi->getNextConsiderationDate() < dontAskUntil); - invariant(_now_inlock() < dontAskUntil); - NetworkOperationIterator insertBefore = _unscheduled.begin(); - for (; insertBefore != _unscheduled.end(); ++insertBefore) { - if (insertBefore->getNextConsiderationDate() >= dontAskUntil) { - break; - } + noi->setNextConsiderationDate(dontAskUntil); + _unscheduled.splice(insertBefore, _processing, noi); +} + +void NetworkInterfaceMock::runUntil(Date_t until) { + stdx::unique_lock<stdx::mutex> lk(_mutex); + invariant(_currentlyRunning == kNetworkThread); + invariant(until > _now_inlock()); + while (until > _now_inlock()) { + _runReadyNetworkOperations_inlock(&lk); + if (_hasReadyRequests_inlock()) { + break; } - noi->setNextConsiderationDate(dontAskUntil); - _unscheduled.splice(insertBefore, _processing, noi); - } - - void NetworkInterfaceMock::runUntil(Date_t until) { - stdx::unique_lock<stdx::mutex> lk(_mutex); - invariant(_currentlyRunning == kNetworkThread); - invariant(until > _now_inlock()); - while (until > _now_inlock()) { - _runReadyNetworkOperations_inlock(&lk); - if (_hasReadyRequests_inlock()) { - break; - } - Date_t newNow = _executorNextWakeupDate; - if (!_scheduled.empty() && _scheduled.front().getResponseDate() < newNow) { - newNow = _scheduled.front().getResponseDate(); - } - if (until < newNow) { - newNow = until; - } - invariant(_now_inlock() <= newNow); - _now = newNow; - _waitingToRunMask |= kExecutorThread; + Date_t newNow = _executorNextWakeupDate; + if (!_scheduled.empty() && _scheduled.front().getResponseDate() < newNow) { + newNow = _scheduled.front().getResponseDate(); } - _runReadyNetworkOperations_inlock(&lk); - } - - void NetworkInterfaceMock::runReadyNetworkOperations() { - stdx::unique_lock<stdx::mutex> lk(_mutex); - invariant(_currentlyRunning == kNetworkThread); - _runReadyNetworkOperations_inlock(&lk); - } - - void NetworkInterfaceMock::waitForWork() { - stdx::unique_lock<stdx::mutex> lk(_mutex); - invariant(_currentlyRunning == kExecutorThread); - _waitForWork_inlock(&lk); - } - - void NetworkInterfaceMock::waitForWorkUntil(Date_t when) { - stdx::unique_lock<stdx::mutex> lk(_mutex); - invariant(_currentlyRunning == kExecutorThread); - _executorNextWakeupDate = when; - if (_executorNextWakeupDate <= _now_inlock()) { - return; + if (until < newNow) { + newNow = until; } - _waitForWork_inlock(&lk); - } - - void NetworkInterfaceMock::signalWorkAvailable() { - stdx::lock_guard<stdx::mutex> lk(_mutex); + invariant(_now_inlock() <= newNow); + _now = newNow; _waitingToRunMask |= kExecutorThread; - if (_currentlyRunning == kNoThread) { - _shouldWakeExecutorCondition.notify_one(); - } } + _runReadyNetworkOperations_inlock(&lk); +} + +void NetworkInterfaceMock::runReadyNetworkOperations() { + stdx::unique_lock<stdx::mutex> lk(_mutex); + invariant(_currentlyRunning == kNetworkThread); + _runReadyNetworkOperations_inlock(&lk); +} + +void NetworkInterfaceMock::waitForWork() { + stdx::unique_lock<stdx::mutex> lk(_mutex); + invariant(_currentlyRunning == kExecutorThread); + _waitForWork_inlock(&lk); +} + +void NetworkInterfaceMock::waitForWorkUntil(Date_t when) { + stdx::unique_lock<stdx::mutex> lk(_mutex); + invariant(_currentlyRunning == kExecutorThread); + _executorNextWakeupDate = when; + if (_executorNextWakeupDate <= _now_inlock()) { + return; + } + _waitForWork_inlock(&lk); +} - void NetworkInterfaceMock::_runReadyNetworkOperations_inlock( - stdx::unique_lock<stdx::mutex>* lk) { - while (!_scheduled.empty() && _scheduled.front().getResponseDate() <= _now_inlock()) { - invariant(_currentlyRunning == kNetworkThread); - NetworkOperation op = _scheduled.front(); - _scheduled.pop_front(); - _waitingToRunMask |= kExecutorThread; - lk->unlock(); - op.finishResponse(); - lk->lock(); - } - invariant(_currentlyRunning == kNetworkThread); - if (!(_waitingToRunMask & kExecutorThread)) { - return; - } +void NetworkInterfaceMock::signalWorkAvailable() { + stdx::lock_guard<stdx::mutex> lk(_mutex); + _waitingToRunMask |= kExecutorThread; + if (_currentlyRunning == kNoThread) { _shouldWakeExecutorCondition.notify_one(); - _currentlyRunning = kNoThread; - while (!_isNetworkThreadRunnable_inlock()) { - _shouldWakeNetworkCondition.wait(*lk); - } - _currentlyRunning = kNetworkThread; - _waitingToRunMask &= ~kNetworkThread; } +} - void NetworkInterfaceMock::_waitForWork_inlock(stdx::unique_lock<stdx::mutex>* lk) { - if (_waitingToRunMask & kExecutorThread) { - _waitingToRunMask &= ~kExecutorThread; - return; - } - _currentlyRunning = kNoThread; - while (!_isExecutorThreadRunnable_inlock()) { - _waitingToRunMask |= kNetworkThread; - _shouldWakeNetworkCondition.notify_one(); - _shouldWakeExecutorCondition.wait(*lk); - } - _currentlyRunning = kExecutorThread; - _waitingToRunMask &= ~kExecutorThread; +void NetworkInterfaceMock::_runReadyNetworkOperations_inlock(stdx::unique_lock<stdx::mutex>* lk) { + while (!_scheduled.empty() && _scheduled.front().getResponseDate() <= _now_inlock()) { + invariant(_currentlyRunning == kNetworkThread); + NetworkOperation op = _scheduled.front(); + _scheduled.pop_front(); + _waitingToRunMask |= kExecutorThread; + lk->unlock(); + op.finishResponse(); + lk->lock(); } - - bool NetworkInterfaceMock::_isNetworkThreadRunnable_inlock() { - if (_currentlyRunning != kNoThread) { - return false; - } - if (_waitingToRunMask != kNetworkThread) { - return false; - } - return true; + invariant(_currentlyRunning == kNetworkThread); + if (!(_waitingToRunMask & kExecutorThread)) { + return; } - - bool NetworkInterfaceMock::_isExecutorThreadRunnable_inlock() { - if (_currentlyRunning != kNoThread) { - return false; - } - return _waitingToRunMask & kExecutorThread; + _shouldWakeExecutorCondition.notify_one(); + _currentlyRunning = kNoThread; + while (!_isNetworkThreadRunnable_inlock()) { + _shouldWakeNetworkCondition.wait(*lk); } + _currentlyRunning = kNetworkThread; + _waitingToRunMask &= ~kNetworkThread; +} - static const StatusWith<RemoteCommandResponse> kUnsetResponse( - ErrorCodes::InternalError, - "NetworkOperation::_response never set"); - - NetworkInterfaceMock::NetworkOperation::NetworkOperation() - : _requestDate(), - _nextConsiderationDate(), - _responseDate(), - _request(), - _response(kUnsetResponse), - _onFinish() { +void NetworkInterfaceMock::_waitForWork_inlock(stdx::unique_lock<stdx::mutex>* lk) { + if (_waitingToRunMask & kExecutorThread) { + _waitingToRunMask &= ~kExecutorThread; + return; } - - NetworkInterfaceMock::NetworkOperation::NetworkOperation( - const TaskExecutor::CallbackHandle& cbHandle, - const RemoteCommandRequest& theRequest, - Date_t theRequestDate, - const RemoteCommandCompletionFn& onFinish) - : _requestDate(theRequestDate), - _nextConsiderationDate(theRequestDate), - _responseDate(), - _cbHandle(cbHandle), - _request(theRequest), - _response(kUnsetResponse), - _onFinish(onFinish) { + _currentlyRunning = kNoThread; + while (!_isExecutorThreadRunnable_inlock()) { + _waitingToRunMask |= kNetworkThread; + _shouldWakeNetworkCondition.notify_one(); + _shouldWakeExecutorCondition.wait(*lk); } + _currentlyRunning = kExecutorThread; + _waitingToRunMask &= ~kExecutorThread; +} - NetworkInterfaceMock::NetworkOperation::~NetworkOperation() {} - - void NetworkInterfaceMock::NetworkOperation::setNextConsiderationDate( - Date_t nextConsiderationDate) { - - invariant(nextConsiderationDate > _nextConsiderationDate); - _nextConsiderationDate = nextConsiderationDate; +bool NetworkInterfaceMock::_isNetworkThreadRunnable_inlock() { + if (_currentlyRunning != kNoThread) { + return false; } - - void NetworkInterfaceMock::NetworkOperation::setResponse( - Date_t responseDate, - const TaskExecutor::ResponseStatus& response) { - - invariant(responseDate >= _requestDate); - _responseDate = responseDate; - _response = response; + if (_waitingToRunMask != kNetworkThread) { + return false; } + return true; +} - void NetworkInterfaceMock::NetworkOperation::finishResponse() { - invariant(_onFinish); - _onFinish(_response); - _onFinish = RemoteCommandCompletionFn(); +bool NetworkInterfaceMock::_isExecutorThreadRunnable_inlock() { + if (_currentlyRunning != kNoThread) { + return false; } + return _waitingToRunMask & kExecutorThread; +} + +static const StatusWith<RemoteCommandResponse> kUnsetResponse( + ErrorCodes::InternalError, "NetworkOperation::_response never set"); + +NetworkInterfaceMock::NetworkOperation::NetworkOperation() + : _requestDate(), + _nextConsiderationDate(), + _responseDate(), + _request(), + _response(kUnsetResponse), + _onFinish() {} + +NetworkInterfaceMock::NetworkOperation::NetworkOperation( + const TaskExecutor::CallbackHandle& cbHandle, + const RemoteCommandRequest& theRequest, + Date_t theRequestDate, + const RemoteCommandCompletionFn& onFinish) + : _requestDate(theRequestDate), + _nextConsiderationDate(theRequestDate), + _responseDate(), + _cbHandle(cbHandle), + _request(theRequest), + _response(kUnsetResponse), + _onFinish(onFinish) {} + +NetworkInterfaceMock::NetworkOperation::~NetworkOperation() {} + +void NetworkInterfaceMock::NetworkOperation::setNextConsiderationDate( + Date_t nextConsiderationDate) { + invariant(nextConsiderationDate > _nextConsiderationDate); + _nextConsiderationDate = nextConsiderationDate; +} + +void NetworkInterfaceMock::NetworkOperation::setResponse( + Date_t responseDate, const TaskExecutor::ResponseStatus& response) { + invariant(responseDate >= _requestDate); + _responseDate = responseDate; + _response = response; +} + +void NetworkInterfaceMock::NetworkOperation::finishResponse() { + invariant(_onFinish); + _onFinish(_response); + _onFinish = RemoteCommandCompletionFn(); +} } // namespace executor } // namespace mongo diff --git a/src/mongo/executor/network_interface_mock.h b/src/mongo/executor/network_interface_mock.h index ff16f8eff6a..910566aa375 100644 --- a/src/mongo/executor/network_interface_mock.h +++ b/src/mongo/executor/network_interface_mock.h @@ -39,286 +39,291 @@ namespace mongo { namespace executor { +/** + * Mock network implementation for use in unit tests. + * + * To use, construct a new instance on the heap, and keep a pointer to it. Pass + * the pointer to the instance into the TaskExecutor constructor, transferring + * ownership. Start the executor's run() method in a separate thread, schedule the + * work you want to test into the executor, then while the test is still going, iterate + * through the ready network requests, servicing them and advancing time as needed. + * + * The mock has a fully virtualized notion of time and the the network. When the + * executor under test schedules a network operation, the startCommand + * method of this class adds an entry to the _unscheduled queue for immediate consideration. + * The test driver loop, when it examines the request, may schedule a response, ask the + * interface to redeliver the request at a later virtual time, or to swallow the virtual + * request until the end of the simulation. The test driver loop can also instruct the + * interface to run forward through virtual time until there are operations ready to + * consider, via runUntil. + * + * The thread acting as the "network" and the executor run thread are highly synchronized + * by this code, allowing for deterministic control of operation interleaving. + */ +class NetworkInterfaceMock : public NetworkInterface { +public: + class NetworkOperation; + typedef stdx::list<NetworkOperation> NetworkOperationList; + typedef NetworkOperationList::iterator NetworkOperationIterator; + + NetworkInterfaceMock(); + virtual ~NetworkInterfaceMock(); + virtual std::string getDiagnosticString(); + + //////////////////////////////////////////////////////////////////////////////// + // + // NetworkInterface methods + // + //////////////////////////////////////////////////////////////////////////////// + + virtual void startup(); + virtual void shutdown(); + virtual void waitForWork(); + virtual void waitForWorkUntil(Date_t when); + virtual void signalWorkAvailable(); + virtual Date_t now(); + virtual void startCommand(const TaskExecutor::CallbackHandle& cbHandle, + const RemoteCommandRequest& request, + const RemoteCommandCompletionFn& onFinish); + virtual void cancelCommand(const TaskExecutor::CallbackHandle& cbHandle); + + + //////////////////////////////////////////////////////////////////////////////// + // + // Methods for simulating network operations and the passage of time. + // + // Methods in this section are to be called by the thread currently simulating + // the network. + // + //////////////////////////////////////////////////////////////////////////////// + + /** + * Causes the currently running (non-executor) thread to assume the mantle of the network + * simulation thread. + * + * Call this before calling any of the other methods in this section. + */ + void enterNetwork(); + + /** + * Causes the currently running thread to drop the mantle of "network simulation thread". + * + * Call this before calling any methods that might block waiting for the + * executor thread. + */ + void exitNetwork(); + + /** + * Returns true if there are unscheduled network requests to be processed. + */ + bool hasReadyRequests(); + + /** + * Gets the next unscheduled request to process, blocking until one is available. + * + * Will not return until the executor thread is blocked in waitForWorkUntil or waitForWork. + */ + NetworkOperationIterator getNextReadyRequest(); + + /** + * Schedules "response" in response to "noi" at virtual time "when". + */ + void scheduleResponse(NetworkOperationIterator noi, + Date_t when, + const TaskExecutor::ResponseStatus& response); + + /** + * Swallows "noi", causing the network interface to not respond to it until + * shutdown() is called. + */ + void blackHole(NetworkOperationIterator noi); + + /** + * Defers decision making on "noi" until virtual time "dontAskUntil". Use + * this when getNextReadyRequest() returns a request you want to deal with + * after looking at other requests. + */ + void requeueAt(NetworkOperationIterator noi, Date_t dontAskUntil); + /** - * Mock network implementation for use in unit tests. + * Runs the simulator forward until now() == until or hasReadyRequests() is true. * - * To use, construct a new instance on the heap, and keep a pointer to it. Pass - * the pointer to the instance into the TaskExecutor constructor, transferring - * ownership. Start the executor's run() method in a separate thread, schedule the - * work you want to test into the executor, then while the test is still going, iterate - * through the ready network requests, servicing them and advancing time as needed. + * Will not return until the executor thread is blocked in waitForWorkUntil or waitForWork. + */ + void runUntil(Date_t until); + + /** + * Processes all ready, scheduled network operations. * - * The mock has a fully virtualized notion of time and the the network. When the - * executor under test schedules a network operation, the startCommand - * method of this class adds an entry to the _unscheduled queue for immediate consideration. - * The test driver loop, when it examines the request, may schedule a response, ask the - * interface to redeliver the request at a later virtual time, or to swallow the virtual - * request until the end of the simulation. The test driver loop can also instruct the - * interface to run forward through virtual time until there are operations ready to - * consider, via runUntil. + * Will not return until the executor thread is blocked in waitForWorkUntil or waitForWork. + */ + void runReadyNetworkOperations(); + +private: + /** + * Type used to identify which thread (network mock or executor) is currently executing. * - * The thread acting as the "network" and the executor run thread are highly synchronized - * by this code, allowing for deterministic control of operation interleaving. + * Values are used in a bitmask, as well. + */ + enum ThreadType { kNoThread = 0, kExecutorThread = 1, kNetworkThread = 2 }; + + /** + * Returns the current virtualized time. + */ + Date_t _now_inlock() const { + return _now; + } + + /** + * Implementation of waitForWork*. + */ + void _waitForWork_inlock(stdx::unique_lock<stdx::mutex>* lk); + + /** + * Returns true if there are ready requests for the network thread to service. + */ + bool _hasReadyRequests_inlock(); + + /** + * Returns true if the network thread could run right now. + */ + bool _isNetworkThreadRunnable_inlock(); + + /** + * Returns true if the executor thread could run right now. + */ + bool _isExecutorThreadRunnable_inlock(); + + /** + * Runs all ready network operations, called while holding "lk". May drop and + * reaquire "lk" several times, but will not return until the executor has blocked + * in waitFor*. + */ + void _runReadyNetworkOperations_inlock(stdx::unique_lock<stdx::mutex>* lk); + + // Mutex that synchronizes access to mutable data in this class and its subclasses. + // Fields guarded by the mutex are labled (M), below, and those that are read-only + // in multi-threaded execution, and so unsynchronized, are labeled (R). + stdx::mutex _mutex; + + // Condition signaled to indicate that the network processing thread should wake up. + stdx::condition_variable _shouldWakeNetworkCondition; // (M) + + // Condition signaled to indicate that the executor run thread should wake up. + stdx::condition_variable _shouldWakeExecutorCondition; // (M) + + // Bitmask indicating which threads are runnable. + int _waitingToRunMask; // (M) + + // Indicator of which thread, if any, is currently running. + ThreadType _currentlyRunning; // (M) + + // The current time reported by this instance of NetworkInterfaceMock. + Date_t _now; // (M) + + // Set to true by "startUp()" + bool _hasStarted; // (M) + + // Set to true by "shutDown()". + bool _inShutdown; // (M) + + // Next date that the executor expects to wake up at (due to a scheduleWorkAt() call). + Date_t _executorNextWakeupDate; // (M) + + // List of network operations whose responses haven't been scheduled or blackholed. This is + // where network requests are first queued. It is sorted by + // NetworkOperation::_nextConsiderationDate, which is set to now() when startCommand() is + // called, and adjusted by requeueAt(). + NetworkOperationList _unscheduled; // (M) + + // List of network operations that have been returned by getNextReadyRequest() but not + // yet scheudled, black-holed or requeued. + NetworkOperationList _processing; // (M) + + // List of network operations whose responses have been scheduled but not delivered, sorted + // by NetworkOperation::_responseDate. These operations will have their responses delivered + // when now() == getResponseDate(). + NetworkOperationList _scheduled; // (M) + + // List of network operations that will not be responded to until shutdown() is called. + NetworkOperationList _blackHoled; // (M) +}; + +/** + * Representation of an in-progress network operation. + */ +class NetworkInterfaceMock::NetworkOperation { +public: + NetworkOperation(); + NetworkOperation(const TaskExecutor::CallbackHandle& cbHandle, + const RemoteCommandRequest& theRequest, + Date_t theRequestDate, + const RemoteCommandCompletionFn& onFinish); + ~NetworkOperation(); + + /** + * Adjusts the stored virtual time at which this entry will be subject to consideration + * by the test harness. + */ + void setNextConsiderationDate(Date_t nextConsiderationDate); + + /** + * Sets the response and thet virtual time at which it will be delivered. + */ + void setResponse(Date_t responseDate, const TaskExecutor::ResponseStatus& response); + + /** + * Predicate that returns true if cbHandle equals the executor's handle for this network + * operation. Used for searching lists of NetworkOperations. + */ + bool isForCallback(const TaskExecutor::CallbackHandle& cbHandle) const { + return cbHandle == _cbHandle; + } + + /** + * Gets the request that initiated this operation. + */ + const RemoteCommandRequest& getRequest() const { + return _request; + } + + /** + * Gets the virtual time at which the operation was started. + */ + Date_t getRequestDate() const { + return _requestDate; + } + + /** + * Gets the virtual time at which the test harness should next consider what to do + * with this request. + */ + Date_t getNextConsiderationDate() const { + return _nextConsiderationDate; + } + + /** + * After setResponse() has been called, returns the virtual time at which + * the response should be delivered. */ - class NetworkInterfaceMock : public NetworkInterface { - public: - class NetworkOperation; - typedef stdx::list<NetworkOperation> NetworkOperationList; - typedef NetworkOperationList::iterator NetworkOperationIterator; - - NetworkInterfaceMock(); - virtual ~NetworkInterfaceMock(); - virtual std::string getDiagnosticString(); - - //////////////////////////////////////////////////////////////////////////////// - // - // NetworkInterface methods - // - //////////////////////////////////////////////////////////////////////////////// - - virtual void startup(); - virtual void shutdown(); - virtual void waitForWork(); - virtual void waitForWorkUntil(Date_t when); - virtual void signalWorkAvailable(); - virtual Date_t now(); - virtual void startCommand(const TaskExecutor::CallbackHandle& cbHandle, - const RemoteCommandRequest& request, - const RemoteCommandCompletionFn& onFinish); - virtual void cancelCommand(const TaskExecutor::CallbackHandle& cbHandle); - - - //////////////////////////////////////////////////////////////////////////////// - // - // Methods for simulating network operations and the passage of time. - // - // Methods in this section are to be called by the thread currently simulating - // the network. - // - //////////////////////////////////////////////////////////////////////////////// - - /** - * Causes the currently running (non-executor) thread to assume the mantle of the network - * simulation thread. - * - * Call this before calling any of the other methods in this section. - */ - void enterNetwork(); - - /** - * Causes the currently running thread to drop the mantle of "network simulation thread". - * - * Call this before calling any methods that might block waiting for the - * executor thread. - */ - void exitNetwork(); - - /** - * Returns true if there are unscheduled network requests to be processed. - */ - bool hasReadyRequests(); - - /** - * Gets the next unscheduled request to process, blocking until one is available. - * - * Will not return until the executor thread is blocked in waitForWorkUntil or waitForWork. - */ - NetworkOperationIterator getNextReadyRequest(); - - /** - * Schedules "response" in response to "noi" at virtual time "when". - */ - void scheduleResponse( - NetworkOperationIterator noi, - Date_t when, - const TaskExecutor::ResponseStatus& response); - - /** - * Swallows "noi", causing the network interface to not respond to it until - * shutdown() is called. - */ - void blackHole(NetworkOperationIterator noi); - - /** - * Defers decision making on "noi" until virtual time "dontAskUntil". Use - * this when getNextReadyRequest() returns a request you want to deal with - * after looking at other requests. - */ - void requeueAt(NetworkOperationIterator noi, Date_t dontAskUntil); - - /** - * Runs the simulator forward until now() == until or hasReadyRequests() is true. - * - * Will not return until the executor thread is blocked in waitForWorkUntil or waitForWork. - */ - void runUntil(Date_t until); - - /** - * Processes all ready, scheduled network operations. - * - * Will not return until the executor thread is blocked in waitForWorkUntil or waitForWork. - */ - void runReadyNetworkOperations(); - - private: - /** - * Type used to identify which thread (network mock or executor) is currently executing. - * - * Values are used in a bitmask, as well. - */ - enum ThreadType { - kNoThread = 0, - kExecutorThread = 1, - kNetworkThread = 2 - }; - - /** - * Returns the current virtualized time. - */ - Date_t _now_inlock() const { return _now; } - - /** - * Implementation of waitForWork*. - */ - void _waitForWork_inlock(stdx::unique_lock<stdx::mutex>* lk); - - /** - * Returns true if there are ready requests for the network thread to service. - */ - bool _hasReadyRequests_inlock(); - - /** - * Returns true if the network thread could run right now. - */ - bool _isNetworkThreadRunnable_inlock(); - - /** - * Returns true if the executor thread could run right now. - */ - bool _isExecutorThreadRunnable_inlock(); - - /** - * Runs all ready network operations, called while holding "lk". May drop and - * reaquire "lk" several times, but will not return until the executor has blocked - * in waitFor*. - */ - void _runReadyNetworkOperations_inlock(stdx::unique_lock<stdx::mutex>* lk); - - // Mutex that synchronizes access to mutable data in this class and its subclasses. - // Fields guarded by the mutex are labled (M), below, and those that are read-only - // in multi-threaded execution, and so unsynchronized, are labeled (R). - stdx::mutex _mutex; - - // Condition signaled to indicate that the network processing thread should wake up. - stdx::condition_variable _shouldWakeNetworkCondition; // (M) - - // Condition signaled to indicate that the executor run thread should wake up. - stdx::condition_variable _shouldWakeExecutorCondition; // (M) - - // Bitmask indicating which threads are runnable. - int _waitingToRunMask; // (M) - - // Indicator of which thread, if any, is currently running. - ThreadType _currentlyRunning; // (M) - - // The current time reported by this instance of NetworkInterfaceMock. - Date_t _now; // (M) - - // Set to true by "startUp()" - bool _hasStarted; // (M) - - // Set to true by "shutDown()". - bool _inShutdown; // (M) - - // Next date that the executor expects to wake up at (due to a scheduleWorkAt() call). - Date_t _executorNextWakeupDate; // (M) - - // List of network operations whose responses haven't been scheduled or blackholed. This is - // where network requests are first queued. It is sorted by - // NetworkOperation::_nextConsiderationDate, which is set to now() when startCommand() is - // called, and adjusted by requeueAt(). - NetworkOperationList _unscheduled; // (M) - - // List of network operations that have been returned by getNextReadyRequest() but not - // yet scheudled, black-holed or requeued. - NetworkOperationList _processing; // (M) - - // List of network operations whose responses have been scheduled but not delivered, sorted - // by NetworkOperation::_responseDate. These operations will have their responses delivered - // when now() == getResponseDate(). - NetworkOperationList _scheduled; // (M) - - // List of network operations that will not be responded to until shutdown() is called. - NetworkOperationList _blackHoled; // (M) - }; + Date_t getResponseDate() const { + return _responseDate; + } /** - * Representation of an in-progress network operation. + * Delivers the response, by invoking the onFinish callback passed into the constructor. */ - class NetworkInterfaceMock::NetworkOperation { - public: - NetworkOperation(); - NetworkOperation(const TaskExecutor::CallbackHandle& cbHandle, - const RemoteCommandRequest& theRequest, - Date_t theRequestDate, - const RemoteCommandCompletionFn& onFinish); - ~NetworkOperation(); - - /** - * Adjusts the stored virtual time at which this entry will be subject to consideration - * by the test harness. - */ - void setNextConsiderationDate(Date_t nextConsiderationDate); - - /** - * Sets the response and thet virtual time at which it will be delivered. - */ - void setResponse(Date_t responseDate, const TaskExecutor::ResponseStatus& response); - - /** - * Predicate that returns true if cbHandle equals the executor's handle for this network - * operation. Used for searching lists of NetworkOperations. - */ - bool isForCallback(const TaskExecutor::CallbackHandle& cbHandle) const { - return cbHandle == _cbHandle; - } - - /** - * Gets the request that initiated this operation. - */ - const RemoteCommandRequest& getRequest() const { return _request; } - - /** - * Gets the virtual time at which the operation was started. - */ - Date_t getRequestDate() const { return _requestDate; } - - /** - * Gets the virtual time at which the test harness should next consider what to do - * with this request. - */ - Date_t getNextConsiderationDate() const { return _nextConsiderationDate; } - - /** - * After setResponse() has been called, returns the virtual time at which - * the response should be delivered. - */ - Date_t getResponseDate() const { return _responseDate; } - - /** - * Delivers the response, by invoking the onFinish callback passed into the constructor. - */ - void finishResponse(); - - private: - Date_t _requestDate; - Date_t _nextConsiderationDate; - Date_t _responseDate; - TaskExecutor::CallbackHandle _cbHandle; - RemoteCommandRequest _request; - TaskExecutor::ResponseStatus _response; - RemoteCommandCompletionFn _onFinish; - }; + void finishResponse(); + +private: + Date_t _requestDate; + Date_t _nextConsiderationDate; + Date_t _responseDate; + TaskExecutor::CallbackHandle _cbHandle; + RemoteCommandRequest _request; + TaskExecutor::ResponseStatus _response; + RemoteCommandCompletionFn _onFinish; +}; } // namespace executor } // namespace mongo diff --git a/src/mongo/executor/task_executor.cpp b/src/mongo/executor/task_executor.cpp index b7f1e4f99bf..1ecbd0f1e62 100644 --- a/src/mongo/executor/task_executor.cpp +++ b/src/mongo/executor/task_executor.cpp @@ -33,63 +33,53 @@ namespace mongo { namespace executor { - TaskExecutor::TaskExecutor() = default; - TaskExecutor::~TaskExecutor() = default; - - TaskExecutor::CallbackState::CallbackState() = default; - TaskExecutor::CallbackState::~CallbackState() = default; - - TaskExecutor::CallbackHandle::CallbackHandle() = default; - TaskExecutor::CallbackHandle::CallbackHandle(std::shared_ptr<CallbackState> callback) : - _callback(std::move(callback)) {} - - TaskExecutor::EventState::EventState() = default; - TaskExecutor::EventState::~EventState() = default; - - TaskExecutor::EventHandle::EventHandle() = default; - TaskExecutor::EventHandle::EventHandle(std::shared_ptr<EventState> event) : - _event(std::move(event)) {} - - TaskExecutor::CallbackArgs::CallbackArgs(TaskExecutor* theExecutor, - const CallbackHandle& theHandle, - const Status& theStatus, - OperationContext* theTxn) : - executor(theExecutor), - myHandle(theHandle), - status(theStatus), - txn(theTxn) { - } - - - TaskExecutor::RemoteCommandCallbackArgs::RemoteCommandCallbackArgs( - TaskExecutor* theExecutor, - const CallbackHandle& theHandle, - const RemoteCommandRequest& theRequest, - const ResponseStatus& theResponse) : - executor(theExecutor), - myHandle(theHandle), - request(theRequest), - response(theResponse) { - } - - TaskExecutor::CallbackState* TaskExecutor::getCallbackFromHandle( - const CallbackHandle& cbHandle) { - return cbHandle.getCallback(); - } - - TaskExecutor::EventState* TaskExecutor::getEventFromHandle(const EventHandle& eventHandle) { - return eventHandle.getEvent(); - } - - void TaskExecutor::setEventForHandle(EventHandle* eventHandle, - std::shared_ptr<EventState> event) { - eventHandle->setEvent(std::move(event)); - } - - void TaskExecutor::setCallbackForHandle(CallbackHandle* cbHandle, - std::shared_ptr<CallbackState> callback) { - cbHandle->setCallback(std::move(callback)); - } - -} // namespace executor -} // namespace mongo +TaskExecutor::TaskExecutor() = default; +TaskExecutor::~TaskExecutor() = default; + +TaskExecutor::CallbackState::CallbackState() = default; +TaskExecutor::CallbackState::~CallbackState() = default; + +TaskExecutor::CallbackHandle::CallbackHandle() = default; +TaskExecutor::CallbackHandle::CallbackHandle(std::shared_ptr<CallbackState> callback) + : _callback(std::move(callback)) {} + +TaskExecutor::EventState::EventState() = default; +TaskExecutor::EventState::~EventState() = default; + +TaskExecutor::EventHandle::EventHandle() = default; +TaskExecutor::EventHandle::EventHandle(std::shared_ptr<EventState> event) + : _event(std::move(event)) {} + +TaskExecutor::CallbackArgs::CallbackArgs(TaskExecutor* theExecutor, + const CallbackHandle& theHandle, + const Status& theStatus, + OperationContext* theTxn) + : executor(theExecutor), myHandle(theHandle), status(theStatus), txn(theTxn) {} + + +TaskExecutor::RemoteCommandCallbackArgs::RemoteCommandCallbackArgs( + TaskExecutor* theExecutor, + const CallbackHandle& theHandle, + const RemoteCommandRequest& theRequest, + const ResponseStatus& theResponse) + : executor(theExecutor), myHandle(theHandle), request(theRequest), response(theResponse) {} + +TaskExecutor::CallbackState* TaskExecutor::getCallbackFromHandle(const CallbackHandle& cbHandle) { + return cbHandle.getCallback(); +} + +TaskExecutor::EventState* TaskExecutor::getEventFromHandle(const EventHandle& eventHandle) { + return eventHandle.getEvent(); +} + +void TaskExecutor::setEventForHandle(EventHandle* eventHandle, std::shared_ptr<EventState> event) { + eventHandle->setEvent(std::move(event)); +} + +void TaskExecutor::setCallbackForHandle(CallbackHandle* cbHandle, + std::shared_ptr<CallbackState> callback) { + cbHandle->setCallback(std::move(callback)); +} + +} // namespace executor +} // namespace mongo diff --git a/src/mongo/executor/task_executor.h b/src/mongo/executor/task_executor.h index 5f42ebeee08..46e195bd7a9 100644 --- a/src/mongo/executor/task_executor.h +++ b/src/mongo/executor/task_executor.h @@ -41,328 +41,318 @@ namespace mongo { - class OperationContext; +class OperationContext; namespace executor { +/** + * Generic event loop with notions of events and callbacks. + * + * Callbacks represent work to be performed by the executor. + * They may be scheduled by client threads or by other callbacks. Methods that + * schedule callbacks return a CallbackHandle if they are able to enqueue the callback in the + * appropriate work queue. Every CallbackHandle represents an invocation of a function that + * will happen before the executor goes out of scope. Calling cancel(CallbackHandle) schedules + * the specified callback to run with a flag indicating that it is "canceled," but it will run. + * Client threads may block waiting for a callback to execute by calling wait(CallbackHandle). + * + * Events are level-triggered and may only be signaled one time. Client threads and callbacks + * may schedule callbacks to be run by the executor after the event is signaled, and client + * threads may ask the executor to block them until after the event is signaled. + * + * If an event is unsignaled when shutdown is called, the executor will ensure that any threads + * blocked in waitForEvent() eventually return. + * + * Logically, Callbacks and Events exist for the life of the executor. That means that while + * the executor is in scope, no CallbackHandle or EventHandle is stale. + */ +class TaskExecutor { + MONGO_DISALLOW_COPYING(TaskExecutor); + +public: + struct CallbackArgs; + struct RemoteCommandCallbackArgs; + class CallbackState; + class CallbackHandle; + class EventState; + class EventHandle; + + using ResponseStatus = StatusWith<RemoteCommandResponse>; + /** - * Generic event loop with notions of events and callbacks. + * Type of a regular callback function. * - * Callbacks represent work to be performed by the executor. - * They may be scheduled by client threads or by other callbacks. Methods that - * schedule callbacks return a CallbackHandle if they are able to enqueue the callback in the - * appropriate work queue. Every CallbackHandle represents an invocation of a function that - * will happen before the executor goes out of scope. Calling cancel(CallbackHandle) schedules - * the specified callback to run with a flag indicating that it is "canceled," but it will run. - * Client threads may block waiting for a callback to execute by calling wait(CallbackHandle). + * The status argument passed at invocation will have code ErrorCodes::CallbackCanceled if + * the callback was canceled for any reason (including shutdown). Otherwise, it should have + * Status::OK(). + */ + using CallbackFn = stdx::function<void(const CallbackArgs&)>; + + /** + * Type of a callback from a request to run a command on a remote MongoDB node. * - * Events are level-triggered and may only be signaled one time. Client threads and callbacks - * may schedule callbacks to be run by the executor after the event is signaled, and client - * threads may ask the executor to block them until after the event is signaled. + * The StatusWith<const BSONObj> will have ErrorCodes::CallbackCanceled if the callback was + * canceled. Otherwise, its status will represent any failure to execute the command. + * If the command executed and a response came back, then the status object will contain + * the BSONObj returned by the command, with the "ok" field indicating the success of the + * command in the usual way. + */ + using RemoteCommandCallbackFn = stdx::function<void(const RemoteCommandCallbackArgs&)>; + + virtual ~TaskExecutor(); + + /** + * Signals to the executor that it should shut down. + */ + virtual void shutdown() = 0; + + /** + * Returns diagnostic information. + */ + virtual std::string getDiagnosticString() = 0; + + /** + * Gets the current time. Callbacks should use this method to read the system clock. + */ + virtual Date_t now() = 0; + + /** + * Creates a new event. Returns a handle to the event, or ErrorCodes::ShutdownInProgress if + * makeEvent() fails because the executor is shutting down. + * + * May be called by client threads or callbacks running in the executor. + */ + virtual StatusWith<EventHandle> makeEvent() = 0; + + /** + * Signals the event, making waiting client threads and callbacks runnable. * - * If an event is unsignaled when shutdown is called, the executor will ensure that any threads - * blocked in waitForEvent() eventually return. + * May be called up to one time per event. * - * Logically, Callbacks and Events exist for the life of the executor. That means that while - * the executor is in scope, no CallbackHandle or EventHandle is stale. + * May be called by client threads or callbacks running in the executor. */ - class TaskExecutor { - MONGO_DISALLOW_COPYING(TaskExecutor); - public: - struct CallbackArgs; - struct RemoteCommandCallbackArgs; - class CallbackState; - class CallbackHandle; - class EventState; - class EventHandle; - - using ResponseStatus = StatusWith<RemoteCommandResponse>; - - /** - * Type of a regular callback function. - * - * The status argument passed at invocation will have code ErrorCodes::CallbackCanceled if - * the callback was canceled for any reason (including shutdown). Otherwise, it should have - * Status::OK(). - */ - using CallbackFn = stdx::function<void (const CallbackArgs&)>; - - /** - * Type of a callback from a request to run a command on a remote MongoDB node. - * - * The StatusWith<const BSONObj> will have ErrorCodes::CallbackCanceled if the callback was - * canceled. Otherwise, its status will represent any failure to execute the command. - * If the command executed and a response came back, then the status object will contain - * the BSONObj returned by the command, with the "ok" field indicating the success of the - * command in the usual way. - */ - using RemoteCommandCallbackFn = stdx::function<void (const RemoteCommandCallbackArgs&)>; - - virtual ~TaskExecutor(); - - /** - * Signals to the executor that it should shut down. - */ - virtual void shutdown() = 0; - - /** - * Returns diagnostic information. - */ - virtual std::string getDiagnosticString() = 0; - - /** - * Gets the current time. Callbacks should use this method to read the system clock. - */ - virtual Date_t now() = 0; - - /** - * Creates a new event. Returns a handle to the event, or ErrorCodes::ShutdownInProgress if - * makeEvent() fails because the executor is shutting down. - * - * May be called by client threads or callbacks running in the executor. - */ - virtual StatusWith<EventHandle> makeEvent() = 0; - - /** - * Signals the event, making waiting client threads and callbacks runnable. - * - * May be called up to one time per event. - * - * May be called by client threads or callbacks running in the executor. - */ - virtual void signalEvent(const EventHandle& event) = 0; - - /** - * Schedules a callback, "work", to run after "event" is signaled. If "event" - * has already been signaled, marks "work" as immediately runnable. - * - * If "event" has yet to be signaled when "shutdown()" is called, "work" will - * be scheduled with a status of ErrorCodes::CallbackCanceled. - * - * May be called by client threads or callbacks running in the executor. - */ - virtual StatusWith<CallbackHandle> onEvent(const EventHandle& event, - const CallbackFn& work) = 0; - - /** - * Blocks the calling thread until after "event" is signaled. Also returns - * if the event is never signaled but shutdown() is called on the executor. - * - * NOTE: Do not call from a callback running in the executor. - * - * TODO(schwerin): Change return type so that the caller can know which of the two reasons - * led to this method returning. - */ - virtual void waitForEvent(const EventHandle& event) = 0; - - /** - * Schedules "work" to be run by the executor ASAP. - * - * Returns a handle for waiting on or canceling the callback, or - * ErrorCodes::ShutdownInProgress. - * - * May be called by client threads or callbacks running in the executor. - */ - virtual StatusWith<CallbackHandle> scheduleWork(const CallbackFn& work) = 0; - - /** - * Schedules "work" to be run by the executor no sooner than "when". - * - * Returns a handle for waiting on or canceling the callback, or - * ErrorCodes::ShutdownInProgress. - * - * May be called by client threads or callbacks running in the executor. - */ - virtual StatusWith<CallbackHandle> scheduleWorkAt(Date_t when, const CallbackFn& work) = 0; - - /** - * Schedules "cb" to be run by the executor with the result of executing the remote command - * described by "request". - * - * Returns a handle for waiting on or canceling the callback, or - * ErrorCodes::ShutdownInProgress. - * - * May be called by client threads or callbacks running in the executor. - */ - virtual StatusWith<CallbackHandle> scheduleRemoteCommand( - const RemoteCommandRequest& request, - const RemoteCommandCallbackFn& cb) = 0; - - /** - * If the callback referenced by "cbHandle" hasn't already executed, marks it as - * canceled and runnable. - * - * May be called by client threads or callbacks running in the executor. - */ - virtual void cancel(const CallbackHandle& cbHandle) = 0; - - /** - * Blocks until the executor finishes running the callback referenced by "cbHandle". - * - * Because callbacks all run during shutdown if they weren't run beforehand, there is no need - * to indicate the reason for returning from wait(CallbackHandle). It is always that the - * callback ran. - * - * NOTE: Do not call from a callback running in the executor. - */ - virtual void wait(const CallbackHandle& cbHandle) = 0; - - protected: - - TaskExecutor(); - - // Retrieves the Callback from a given CallbackHandle - CallbackState* getCallbackFromHandle(const CallbackHandle& cbHandle); - - // Retrieves the Event from a given EventHandle - EventState* getEventFromHandle(const EventHandle& eventHandle); - - // Sets the given CallbackHandle to point to the given callback. - void setCallbackForHandle(CallbackHandle* cbHandle, - std::shared_ptr<CallbackState> callback); - - // Sets the given EventHandle to point to the given event. - void setEventForHandle(EventHandle* eventHandle, std::shared_ptr<EventState> event); - }; + virtual void signalEvent(const EventHandle& event) = 0; /** - * Class representing a scheduled callback and providing methods for interacting with it. + * Schedules a callback, "work", to run after "event" is signaled. If "event" + * has already been signaled, marks "work" as immediately runnable. + * + * If "event" has yet to be signaled when "shutdown()" is called, "work" will + * be scheduled with a status of ErrorCodes::CallbackCanceled. + * + * May be called by client threads or callbacks running in the executor. */ - class TaskExecutor::CallbackState { - MONGO_DISALLOW_COPYING(CallbackState); - public: + virtual StatusWith<CallbackHandle> onEvent(const EventHandle& event, + const CallbackFn& work) = 0; - virtual ~CallbackState(); + /** + * Blocks the calling thread until after "event" is signaled. Also returns + * if the event is never signaled but shutdown() is called on the executor. + * + * NOTE: Do not call from a callback running in the executor. + * + * TODO(schwerin): Change return type so that the caller can know which of the two reasons + * led to this method returning. + */ + virtual void waitForEvent(const EventHandle& event) = 0; - virtual void cancel() = 0; - virtual void waitForCompletion() = 0; + /** + * Schedules "work" to be run by the executor ASAP. + * + * Returns a handle for waiting on or canceling the callback, or + * ErrorCodes::ShutdownInProgress. + * + * May be called by client threads or callbacks running in the executor. + */ + virtual StatusWith<CallbackHandle> scheduleWork(const CallbackFn& work) = 0; - protected: + /** + * Schedules "work" to be run by the executor no sooner than "when". + * + * Returns a handle for waiting on or canceling the callback, or + * ErrorCodes::ShutdownInProgress. + * + * May be called by client threads or callbacks running in the executor. + */ + virtual StatusWith<CallbackHandle> scheduleWorkAt(Date_t when, const CallbackFn& work) = 0; - CallbackState(); + /** + * Schedules "cb" to be run by the executor with the result of executing the remote command + * described by "request". + * + * Returns a handle for waiting on or canceling the callback, or + * ErrorCodes::ShutdownInProgress. + * + * May be called by client threads or callbacks running in the executor. + */ + virtual StatusWith<CallbackHandle> scheduleRemoteCommand(const RemoteCommandRequest& request, + const RemoteCommandCallbackFn& cb) = 0; - }; + /** + * If the callback referenced by "cbHandle" hasn't already executed, marks it as + * canceled and runnable. + * + * May be called by client threads or callbacks running in the executor. + */ + virtual void cancel(const CallbackHandle& cbHandle) = 0; /** - * Handle to a CallbackState. + * Blocks until the executor finishes running the callback referenced by "cbHandle". + * + * Because callbacks all run during shutdown if they weren't run beforehand, there is no need + * to indicate the reason for returning from wait(CallbackHandle). It is always that the + * callback ran. + * + * NOTE: Do not call from a callback running in the executor. */ - class TaskExecutor::CallbackHandle { - friend class TaskExecutor; + virtual void wait(const CallbackHandle& cbHandle) = 0; - public: +protected: + TaskExecutor(); - CallbackHandle(); - explicit CallbackHandle(std::shared_ptr<CallbackState> cbData); + // Retrieves the Callback from a given CallbackHandle + CallbackState* getCallbackFromHandle(const CallbackHandle& cbHandle); - bool operator==(const CallbackHandle &other) const { - return _callback == other._callback; - } + // Retrieves the Event from a given EventHandle + EventState* getEventFromHandle(const EventHandle& eventHandle); - bool operator!=(const CallbackHandle &other) const { - return !(*this == other); - } + // Sets the given CallbackHandle to point to the given callback. + void setCallbackForHandle(CallbackHandle* cbHandle, std::shared_ptr<CallbackState> callback); - bool isValid() const { - return _callback.get(); - } + // Sets the given EventHandle to point to the given event. + void setEventForHandle(EventHandle* eventHandle, std::shared_ptr<EventState> event); +}; - private: +/** + * Class representing a scheduled callback and providing methods for interacting with it. + */ +class TaskExecutor::CallbackState { + MONGO_DISALLOW_COPYING(CallbackState); - void setCallback(std::shared_ptr<CallbackState> callback) { - _callback = callback; - } +public: + virtual ~CallbackState(); - CallbackState* getCallback() const { - return _callback.get(); - } + virtual void cancel() = 0; + virtual void waitForCompletion() = 0; - std::shared_ptr<CallbackState> _callback; - }; +protected: + CallbackState(); +}; - /** - * Class representing a scheduled event and providing methods for interacting with it. - */ - class TaskExecutor::EventState { - MONGO_DISALLOW_COPYING(EventState); - public: +/** + * Handle to a CallbackState. + */ +class TaskExecutor::CallbackHandle { + friend class TaskExecutor; - virtual ~EventState(); +public: + CallbackHandle(); + explicit CallbackHandle(std::shared_ptr<CallbackState> cbData); - virtual void signal() = 0; - virtual void waitUntilSignaled() = 0; - virtual bool isSignaled() = 0; + bool operator==(const CallbackHandle& other) const { + return _callback == other._callback; + } - protected: + bool operator!=(const CallbackHandle& other) const { + return !(*this == other); + } - EventState(); - }; + bool isValid() const { + return _callback.get(); + } - /** - * Handle to an EventState. - */ - class TaskExecutor::EventHandle { - friend class TaskExecutor; +private: + void setCallback(std::shared_ptr<CallbackState> callback) { + _callback = callback; + } - public: + CallbackState* getCallback() const { + return _callback.get(); + } - EventHandle(); - explicit EventHandle(std::shared_ptr<EventState> event); + std::shared_ptr<CallbackState> _callback; +}; - bool operator==(const EventHandle &other) const { - return _event == other._event; - } +/** + * Class representing a scheduled event and providing methods for interacting with it. + */ +class TaskExecutor::EventState { + MONGO_DISALLOW_COPYING(EventState); - bool operator!=(const EventHandle &other) const { - return !(*this == other); - } +public: + virtual ~EventState(); - bool isValid() const { - return _event.get(); - } + virtual void signal() = 0; + virtual void waitUntilSignaled() = 0; + virtual bool isSignaled() = 0; - private: +protected: + EventState(); +}; - void setEvent(std::shared_ptr<EventState> event) { - _event = event; - } +/** + * Handle to an EventState. + */ +class TaskExecutor::EventHandle { + friend class TaskExecutor; - EventState* getEvent() const { - return _event.get(); - } +public: + EventHandle(); + explicit EventHandle(std::shared_ptr<EventState> event); - std::shared_ptr<EventState> _event; - }; + bool operator==(const EventHandle& other) const { + return _event == other._event; + } - /** - * Argument passed to all callbacks scheduled via a TaskExecutor. - */ - struct TaskExecutor::CallbackArgs { - CallbackArgs(TaskExecutor* theExecutor, - const CallbackHandle& theHandle, - const Status& theStatus, - OperationContext* txn = NULL); - - TaskExecutor* executor; - CallbackHandle myHandle; - Status status; - OperationContext* txn; - }; + bool operator!=(const EventHandle& other) const { + return !(*this == other); + } - /** - * Argument passed to all remote command callbacks scheduled via a TaskExecutor. - */ - struct TaskExecutor::RemoteCommandCallbackArgs { - RemoteCommandCallbackArgs(TaskExecutor* theExecutor, - const CallbackHandle& theHandle, - const RemoteCommandRequest& theRequest, - const StatusWith<RemoteCommandResponse>& theResponse); + bool isValid() const { + return _event.get(); + } - TaskExecutor* executor; - CallbackHandle myHandle; - RemoteCommandRequest request; - StatusWith<RemoteCommandResponse> response; - }; +private: + void setEvent(std::shared_ptr<EventState> event) { + _event = event; + } -} // namespace executor -} // namespace mongo + EventState* getEvent() const { + return _event.get(); + } + std::shared_ptr<EventState> _event; +}; + +/** + * Argument passed to all callbacks scheduled via a TaskExecutor. + */ +struct TaskExecutor::CallbackArgs { + CallbackArgs(TaskExecutor* theExecutor, + const CallbackHandle& theHandle, + const Status& theStatus, + OperationContext* txn = NULL); + + TaskExecutor* executor; + CallbackHandle myHandle; + Status status; + OperationContext* txn; +}; + +/** + * Argument passed to all remote command callbacks scheduled via a TaskExecutor. + */ +struct TaskExecutor::RemoteCommandCallbackArgs { + RemoteCommandCallbackArgs(TaskExecutor* theExecutor, + const CallbackHandle& theHandle, + const RemoteCommandRequest& theRequest, + const StatusWith<RemoteCommandResponse>& theResponse); + + TaskExecutor* executor; + CallbackHandle myHandle; + RemoteCommandRequest request; + StatusWith<RemoteCommandResponse> response; +}; + +} // namespace executor +} // namespace mongo |