summaryrefslogtreecommitdiff
path: root/src/mongo/executor
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/executor')
-rw-r--r--src/mongo/executor/network_interface.cpp16
-rw-r--r--src/mongo/executor/network_interface.h157
-rw-r--r--src/mongo/executor/network_interface_impl.cpp376
-rw-r--r--src/mongo/executor/network_interface_impl.h230
-rw-r--r--src/mongo/executor/network_interface_mock.cpp632
-rw-r--r--src/mongo/executor/network_interface_mock.h549
-rw-r--r--src/mongo/executor/task_executor.cpp110
-rw-r--r--src/mongo/executor/task_executor.h542
8 files changed, 1284 insertions, 1328 deletions
diff --git a/src/mongo/executor/network_interface.cpp b/src/mongo/executor/network_interface.cpp
index f748efabfeb..f025b1b1de3 100644
--- a/src/mongo/executor/network_interface.cpp
+++ b/src/mongo/executor/network_interface.cpp
@@ -34,15 +34,15 @@
namespace mongo {
namespace executor {
- // This is a bitmask with the first bit set. It's used to mark connections that should be kept
- // open during stepdowns.
+// This is a bitmask with the first bit set. It's used to mark connections that should be kept
+// open during stepdowns.
#ifndef _MSC_EXTENSIONS
- const unsigned int NetworkInterface::kMessagingPortKeepOpen;
-#endif // _MSC_EXTENSIONS
+const unsigned int NetworkInterface::kMessagingPortKeepOpen;
+#endif // _MSC_EXTENSIONS
- NetworkInterface::NetworkInterface() {}
- NetworkInterface::~NetworkInterface() {}
+NetworkInterface::NetworkInterface() {}
+NetworkInterface::~NetworkInterface() {}
-} // namespace executor
-} // namespace mongo
+} // namespace executor
+} // namespace mongo
diff --git a/src/mongo/executor/network_interface.h b/src/mongo/executor/network_interface.h
index ef664b7d529..3db07ba382f 100644
--- a/src/mongo/executor/network_interface.h
+++ b/src/mongo/executor/network_interface.h
@@ -37,84 +37,83 @@
namespace mongo {
namespace executor {
+/**
+ * Interface to networking for use by TaskExecutor implementations.
+ */
+class NetworkInterface {
+ MONGO_DISALLOW_COPYING(NetworkInterface);
+
+public:
+ // A flag to keep replication MessagingPorts open when all other sockets are disconnected.
+ static const unsigned int kMessagingPortKeepOpen = 1;
+
+ using Response = RemoteCommandResponse;
+ using RemoteCommandCompletionFn = stdx::function<void(const TaskExecutor::ResponseStatus&)>;
+
+ virtual ~NetworkInterface();
+
+ /**
+ * Returns diagnostic info.
+ */
+ virtual std::string getDiagnosticString() = 0;
+
+ /**
+ * Starts up the network interface.
+ *
+ * It is valid to call all methods except shutdown() before this method completes. That is,
+ * implementations may not assume that startup() completes before startCommand() first
+ * executes.
+ *
+ * Called by the owning TaskExecutor inside its run() method.
+ */
+ virtual void startup() = 0;
+
+ /**
+ * Shuts down the network interface. Must be called before this instance gets deleted,
+ * if startup() is called.
+ *
+ * Called by the owning TaskExecutor inside its run() method.
+ */
+ virtual void shutdown() = 0;
+
+ /**
+ * Blocks the current thread (presumably the executor thread) until the network interface
+ * knows of work for the executor to perform.
+ */
+ virtual void waitForWork() = 0;
+
+ /**
+ * Similar to waitForWork, but only blocks until "when".
+ */
+ virtual void waitForWorkUntil(Date_t when) = 0;
+
+ /**
+ * Signals to the network interface that there is new work (such as a signaled event) for
+ * the executor to process. Wakes the executor from waitForWork() and friends.
+ */
+ virtual void signalWorkAvailable() = 0;
+
/**
- * Interface to networking for use by TaskExecutor implementations.
+ * Returns the current time.
*/
- class NetworkInterface {
- MONGO_DISALLOW_COPYING(NetworkInterface);
- public:
-
- // A flag to keep replication MessagingPorts open when all other sockets are disconnected.
- static const unsigned int kMessagingPortKeepOpen = 1;
-
- using Response = RemoteCommandResponse;
- using RemoteCommandCompletionFn =
- stdx::function<void (const TaskExecutor::ResponseStatus&)>;
-
- virtual ~NetworkInterface();
-
- /**
- * Returns diagnostic info.
- */
- virtual std::string getDiagnosticString() = 0;
-
- /**
- * Starts up the network interface.
- *
- * It is valid to call all methods except shutdown() before this method completes. That is,
- * implementations may not assume that startup() completes before startCommand() first
- * executes.
- *
- * Called by the owning TaskExecutor inside its run() method.
- */
- virtual void startup() = 0;
-
- /**
- * Shuts down the network interface. Must be called before this instance gets deleted,
- * if startup() is called.
- *
- * Called by the owning TaskExecutor inside its run() method.
- */
- virtual void shutdown() = 0;
-
- /**
- * Blocks the current thread (presumably the executor thread) until the network interface
- * knows of work for the executor to perform.
- */
- virtual void waitForWork() = 0;
-
- /**
- * Similar to waitForWork, but only blocks until "when".
- */
- virtual void waitForWorkUntil(Date_t when) = 0;
-
- /**
- * Signals to the network interface that there is new work (such as a signaled event) for
- * the executor to process. Wakes the executor from waitForWork() and friends.
- */
- virtual void signalWorkAvailable() = 0;
-
- /**
- * Returns the current time.
- */
- virtual Date_t now() = 0;
-
- /**
- * Starts asynchronous execution of the command described by "request".
- */
- virtual void startCommand(const TaskExecutor::CallbackHandle& cbHandle,
- const RemoteCommandRequest& request,
- const RemoteCommandCompletionFn& onFinish) = 0;
-
- /**
- * Requests cancelation of the network activity associated with "cbHandle" if it has not yet
- * completed.
- */
- virtual void cancelCommand(const TaskExecutor::CallbackHandle& cbHandle) = 0;
-
- protected:
- NetworkInterface();
- };
-
-} // namespace executor
-} // namespace mongo
+ virtual Date_t now() = 0;
+
+ /**
+ * Starts asynchronous execution of the command described by "request".
+ */
+ virtual void startCommand(const TaskExecutor::CallbackHandle& cbHandle,
+ const RemoteCommandRequest& request,
+ const RemoteCommandCompletionFn& onFinish) = 0;
+
+ /**
+ * Requests cancelation of the network activity associated with "cbHandle" if it has not yet
+ * completed.
+ */
+ virtual void cancelCommand(const TaskExecutor::CallbackHandle& cbHandle) = 0;
+
+protected:
+ NetworkInterface();
+};
+
+} // namespace executor
+} // namespace mongo
diff --git a/src/mongo/executor/network_interface_impl.cpp b/src/mongo/executor/network_interface_impl.cpp
index 4e443a45165..0b997f7f219 100644
--- a/src/mongo/executor/network_interface_impl.cpp
+++ b/src/mongo/executor/network_interface_impl.cpp
@@ -46,232 +46,220 @@ namespace executor {
namespace {
- const size_t kMinThreads = 1;
- const size_t kMaxThreads = 51; // Set to 1 + max repl set size, for heartbeat + wiggle room.
- const Seconds kMaxIdleThreadAge(30);
+const size_t kMinThreads = 1;
+const size_t kMaxThreads = 51; // Set to 1 + max repl set size, for heartbeat + wiggle room.
+const Seconds kMaxIdleThreadAge(30);
} // namespace
- NetworkInterfaceImpl::NetworkInterfaceImpl() :
- NetworkInterface(),
- _numIdleThreads(0),
- _nextThreadId(0),
- _lastFullUtilizationDate(),
- _isExecutorRunnable(false),
- _inShutdown(false),
- _commandRunner(kMessagingPortKeepOpen),
- _numActiveNetworkRequests(0) {
+NetworkInterfaceImpl::NetworkInterfaceImpl()
+ : NetworkInterface(),
+ _numIdleThreads(0),
+ _nextThreadId(0),
+ _lastFullUtilizationDate(),
+ _isExecutorRunnable(false),
+ _inShutdown(false),
+ _commandRunner(kMessagingPortKeepOpen),
+ _numActiveNetworkRequests(0) {}
- }
-
- NetworkInterfaceImpl::~NetworkInterfaceImpl() { }
+NetworkInterfaceImpl::~NetworkInterfaceImpl() {}
- std::string NetworkInterfaceImpl::getDiagnosticString() {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- str::stream output;
- output << "NetworkImpl";
- output << " threads:" << _threads.size();
- output << " inShutdown:" << _inShutdown;
- output << " active:" << _numActiveNetworkRequests;
- output << " pending:" << _pending.size();
- output << " execRunable:" << _isExecutorRunnable;
- return output;
+std::string NetworkInterfaceImpl::getDiagnosticString() {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ str::stream output;
+ output << "NetworkImpl";
+ output << " threads:" << _threads.size();
+ output << " inShutdown:" << _inShutdown;
+ output << " active:" << _numActiveNetworkRequests;
+ output << " pending:" << _pending.size();
+ output << " execRunable:" << _isExecutorRunnable;
+ return output;
+}
+void NetworkInterfaceImpl::_startNewNetworkThread_inlock() {
+ if (_inShutdown) {
+ LOG(1) << "Not starting new replication networking thread while shutting down replication.";
+ return;
}
-
- void NetworkInterfaceImpl::_startNewNetworkThread_inlock() {
- if (_inShutdown) {
- LOG(1) <<
- "Not starting new replication networking thread while shutting down replication.";
- return;
- }
- if (_threads.size() >= kMaxThreads) {
- LOG(1) << "Not starting new replication networking thread because " << kMaxThreads <<
- " are already running; " << _numIdleThreads << " threads are idle and " <<
- _pending.size() << " network requests are waiting for a thread to serve them.";
- return;
- }
- const std::string threadName(str::stream() << "ReplExecNetThread-" << _nextThreadId++);
- try {
- _threads.push_back(
- std::make_shared<stdx::thread>(
- stdx::bind(&NetworkInterfaceImpl::_requestProcessorThreadBody,
- this,
- threadName)));
- ++_numIdleThreads;
- }
- catch (const std::exception& ex) {
- error() << "Failed to start " << threadName << "; " << _threads.size() <<
- " other network threads still running; caught exception: " << ex.what();
- }
+ if (_threads.size() >= kMaxThreads) {
+ LOG(1) << "Not starting new replication networking thread because " << kMaxThreads
+ << " are already running; " << _numIdleThreads << " threads are idle and "
+ << _pending.size() << " network requests are waiting for a thread to serve them.";
+ return;
}
-
- void NetworkInterfaceImpl::startup() {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- invariant(!_inShutdown);
- if (!_threads.empty()) {
- return;
- }
- for (size_t i = 0; i < kMinThreads; ++i) {
- _startNewNetworkThread_inlock();
- }
+ const std::string threadName(str::stream() << "ReplExecNetThread-" << _nextThreadId++);
+ try {
+ _threads.push_back(std::make_shared<stdx::thread>(
+ stdx::bind(&NetworkInterfaceImpl::_requestProcessorThreadBody, this, threadName)));
+ ++_numIdleThreads;
+ } catch (const std::exception& ex) {
+ error() << "Failed to start " << threadName << "; " << _threads.size()
+ << " other network threads still running; caught exception: " << ex.what();
}
+}
- void NetworkInterfaceImpl::shutdown() {
- using std::swap;
- stdx::unique_lock<stdx::mutex> lk(_mutex);
- _inShutdown = true;
- _hasPending.notify_all();
- ThreadList threadsToJoin;
- swap(threadsToJoin, _threads);
- lk.unlock();
- _commandRunner.shutdown();
- std::for_each(threadsToJoin.begin(),
- threadsToJoin.end(),
- stdx::bind(&stdx::thread::join, stdx::placeholders::_1));
+void NetworkInterfaceImpl::startup() {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ invariant(!_inShutdown);
+ if (!_threads.empty()) {
+ return;
}
-
- void NetworkInterfaceImpl::signalWorkAvailable() {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- _signalWorkAvailable_inlock();
+ for (size_t i = 0; i < kMinThreads; ++i) {
+ _startNewNetworkThread_inlock();
}
+}
- void NetworkInterfaceImpl::_signalWorkAvailable_inlock() {
- if (!_isExecutorRunnable) {
- _isExecutorRunnable = true;
- _isExecutorRunnableCondition.notify_one();
- }
+void NetworkInterfaceImpl::shutdown() {
+ using std::swap;
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+ _inShutdown = true;
+ _hasPending.notify_all();
+ ThreadList threadsToJoin;
+ swap(threadsToJoin, _threads);
+ lk.unlock();
+ _commandRunner.shutdown();
+ std::for_each(threadsToJoin.begin(),
+ threadsToJoin.end(),
+ stdx::bind(&stdx::thread::join, stdx::placeholders::_1));
+}
+
+void NetworkInterfaceImpl::signalWorkAvailable() {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ _signalWorkAvailable_inlock();
+}
+
+void NetworkInterfaceImpl::_signalWorkAvailable_inlock() {
+ if (!_isExecutorRunnable) {
+ _isExecutorRunnable = true;
+ _isExecutorRunnableCondition.notify_one();
}
+}
- void NetworkInterfaceImpl::waitForWork() {
- stdx::unique_lock<stdx::mutex> lk(_mutex);
- while (!_isExecutorRunnable) {
- _isExecutorRunnableCondition.wait(lk);
- }
- _isExecutorRunnable = false;
+void NetworkInterfaceImpl::waitForWork() {
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+ while (!_isExecutorRunnable) {
+ _isExecutorRunnableCondition.wait(lk);
}
+ _isExecutorRunnable = false;
+}
- void NetworkInterfaceImpl::waitForWorkUntil(Date_t when) {
- stdx::unique_lock<stdx::mutex> lk(_mutex);
- while (!_isExecutorRunnable) {
- const Milliseconds waitTime(when - now());
- if (waitTime <= Milliseconds(0)) {
- break;
- }
- _isExecutorRunnableCondition.wait_for(lk, waitTime);
+void NetworkInterfaceImpl::waitForWorkUntil(Date_t when) {
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+ while (!_isExecutorRunnable) {
+ const Milliseconds waitTime(when - now());
+ if (waitTime <= Milliseconds(0)) {
+ break;
}
- _isExecutorRunnable = false;
+ _isExecutorRunnableCondition.wait_for(lk, waitTime);
}
+ _isExecutorRunnable = false;
+}
- void NetworkInterfaceImpl::_requestProcessorThreadBody(
- NetworkInterfaceImpl* net,
- const std::string& threadName) {
- setThreadName(threadName);
- LOG(1) << "thread starting";
- net->_consumeNetworkRequests();
+void NetworkInterfaceImpl::_requestProcessorThreadBody(NetworkInterfaceImpl* net,
+ const std::string& threadName) {
+ setThreadName(threadName);
+ LOG(1) << "thread starting";
+ net->_consumeNetworkRequests();
- // At this point, another thread may have destroyed "net", if this thread chose to detach
- // itself and remove itself from net->_threads before releasing net->_mutex. Do not access
- // member variables of "net" from here, on.
- LOG(1) << "thread shutting down";
- }
+ // At this point, another thread may have destroyed "net", if this thread chose to detach
+ // itself and remove itself from net->_threads before releasing net->_mutex. Do not access
+ // member variables of "net" from here, on.
+ LOG(1) << "thread shutting down";
+}
- void NetworkInterfaceImpl::_consumeNetworkRequests() {
- stdx::unique_lock<stdx::mutex> lk(_mutex);
- while (!_inShutdown) {
- if (_pending.empty()) {
- if (_threads.size() > kMinThreads) {
- const Date_t nowDate = now();
- const Date_t nextThreadRetirementDate =
- _lastFullUtilizationDate + kMaxIdleThreadAge;
- if (nowDate > nextThreadRetirementDate) {
- _lastFullUtilizationDate = nowDate;
- break;
- }
+void NetworkInterfaceImpl::_consumeNetworkRequests() {
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+ while (!_inShutdown) {
+ if (_pending.empty()) {
+ if (_threads.size() > kMinThreads) {
+ const Date_t nowDate = now();
+ const Date_t nextThreadRetirementDate =
+ _lastFullUtilizationDate + kMaxIdleThreadAge;
+ if (nowDate > nextThreadRetirementDate) {
+ _lastFullUtilizationDate = nowDate;
+ break;
}
- _hasPending.wait_for(lk, kMaxIdleThreadAge);
- continue;
}
- CommandData todo = _pending.front();
- _pending.pop_front();
- ++_numActiveNetworkRequests;
- --_numIdleThreads;
- lk.unlock();
- TaskExecutor::ResponseStatus result = _commandRunner.runCommand(todo.request);
- LOG(2) << "Network status of sending " << todo.request.cmdObj.firstElementFieldName() <<
- " to " << todo.request.target << " was " << result.getStatus();
- todo.onFinish(result);
- lk.lock();
- --_numActiveNetworkRequests;
- ++_numIdleThreads;
- _signalWorkAvailable_inlock();
+ _hasPending.wait_for(lk, kMaxIdleThreadAge);
+ continue;
}
+ CommandData todo = _pending.front();
+ _pending.pop_front();
+ ++_numActiveNetworkRequests;
--_numIdleThreads;
- if (_inShutdown) {
- return;
- }
- // This thread is ending because it was idle for too long.
- // Find self in _threads, remove self from _threads, detach self.
- for (size_t i = 0; i < _threads.size(); ++i) {
- if (_threads[i]->get_id() != stdx::this_thread::get_id()) {
- continue;
- }
- _threads[i]->detach();
- _threads[i].swap(_threads.back());
- _threads.pop_back();
- return;
+ lk.unlock();
+ TaskExecutor::ResponseStatus result = _commandRunner.runCommand(todo.request);
+ LOG(2) << "Network status of sending " << todo.request.cmdObj.firstElementFieldName()
+ << " to " << todo.request.target << " was " << result.getStatus();
+ todo.onFinish(result);
+ lk.lock();
+ --_numActiveNetworkRequests;
+ ++_numIdleThreads;
+ _signalWorkAvailable_inlock();
+ }
+ --_numIdleThreads;
+ if (_inShutdown) {
+ return;
+ }
+ // This thread is ending because it was idle for too long.
+ // Find self in _threads, remove self from _threads, detach self.
+ for (size_t i = 0; i < _threads.size(); ++i) {
+ if (_threads[i]->get_id() != stdx::this_thread::get_id()) {
+ continue;
}
- severe().stream() << "Could not find this thread, with id " <<
- stdx::this_thread::get_id() << " in the replication networking thread pool";
- fassertFailedNoTrace(28676);
+ _threads[i]->detach();
+ _threads[i].swap(_threads.back());
+ _threads.pop_back();
+ return;
}
+ severe().stream() << "Could not find this thread, with id " << stdx::this_thread::get_id()
+ << " in the replication networking thread pool";
+ fassertFailedNoTrace(28676);
+}
- void NetworkInterfaceImpl::startCommand(
- const TaskExecutor::CallbackHandle& cbHandle,
- const RemoteCommandRequest& request,
- const RemoteCommandCompletionFn& onFinish) {
- LOG(2) << "Scheduling " << request.cmdObj.firstElementFieldName() << " to " <<
- request.target;
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- _pending.push_back(CommandData());
- CommandData& cd = _pending.back();
- cd.cbHandle = cbHandle;
- cd.request = request;
- cd.onFinish = onFinish;
- if (_numIdleThreads < _pending.size()) {
- _startNewNetworkThread_inlock();
- }
- if (_numIdleThreads <= _pending.size()) {
- _lastFullUtilizationDate = Date_t::now();
- }
- _hasPending.notify_one();
+void NetworkInterfaceImpl::startCommand(const TaskExecutor::CallbackHandle& cbHandle,
+ const RemoteCommandRequest& request,
+ const RemoteCommandCompletionFn& onFinish) {
+ LOG(2) << "Scheduling " << request.cmdObj.firstElementFieldName() << " to " << request.target;
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ _pending.push_back(CommandData());
+ CommandData& cd = _pending.back();
+ cd.cbHandle = cbHandle;
+ cd.request = request;
+ cd.onFinish = onFinish;
+ if (_numIdleThreads < _pending.size()) {
+ _startNewNetworkThread_inlock();
+ }
+ if (_numIdleThreads <= _pending.size()) {
+ _lastFullUtilizationDate = Date_t::now();
}
+ _hasPending.notify_one();
+}
- void NetworkInterfaceImpl::cancelCommand(
- const TaskExecutor::CallbackHandle& cbHandle) {
- stdx::unique_lock<stdx::mutex> lk(_mutex);
- CommandDataList::iterator iter;
- for (iter = _pending.begin(); iter != _pending.end(); ++iter) {
- if (iter->cbHandle == cbHandle) {
- break;
- }
+void NetworkInterfaceImpl::cancelCommand(const TaskExecutor::CallbackHandle& cbHandle) {
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+ CommandDataList::iterator iter;
+ for (iter = _pending.begin(); iter != _pending.end(); ++iter) {
+ if (iter->cbHandle == cbHandle) {
+ break;
}
- if (iter == _pending.end()) {
- return;
- }
- const RemoteCommandCompletionFn onFinish = iter->onFinish;
- LOG(2) << "Canceled sending " << iter->request.cmdObj.firstElementFieldName() << " to " <<
- iter->request.target;
- _pending.erase(iter);
- lk.unlock();
- onFinish(TaskExecutor::ResponseStatus(ErrorCodes::CallbackCanceled, "Callback canceled"));
- lk.lock();
- _signalWorkAvailable_inlock();
}
-
- Date_t NetworkInterfaceImpl::now() {
- return Date_t::now();
+ if (iter == _pending.end()) {
+ return;
}
+ const RemoteCommandCompletionFn onFinish = iter->onFinish;
+ LOG(2) << "Canceled sending " << iter->request.cmdObj.firstElementFieldName() << " to "
+ << iter->request.target;
+ _pending.erase(iter);
+ lk.unlock();
+ onFinish(TaskExecutor::ResponseStatus(ErrorCodes::CallbackCanceled, "Callback canceled"));
+ lk.lock();
+ _signalWorkAvailable_inlock();
+}
+
+Date_t NetworkInterfaceImpl::now() {
+ return Date_t::now();
+}
-} // namespace executor
-} // namespace mongo
+} // namespace executor
+} // namespace mongo
diff --git a/src/mongo/executor/network_interface_impl.h b/src/mongo/executor/network_interface_impl.h
index 3744bf808b9..6e29d09263f 100644
--- a/src/mongo/executor/network_interface_impl.h
+++ b/src/mongo/executor/network_interface_impl.h
@@ -41,123 +41,121 @@
namespace mongo {
namespace executor {
+/**
+ * Implementation of the network interface for use by classes implementing TaskExecutor
+ * inside mongod.
+ *
+ * This implementation manages a dynamically sized group of worker threads for performing
+ * network operations. The minimum and maximum number of threads is set at compile time, and
+ * the exact number of threads is adjusted dynamically, using the following two rules.
+ *
+ * 1.) If the number of worker threads is less than the maximum, there are no idle worker
+ * threads, and the client enqueues a new network operation via startCommand(), the network
+ * interface spins up a new worker thread. This decision is made on the assumption that
+ * spinning up a new thread is faster than the round-trip time for processing a remote command,
+ * and so this will minimize wait time.
+ *
+ * 2.) If the number of worker threads has exceeded the the peak number of scheduled outstanding
+ * network commands continuously for a period of time (kMaxIdleThreadAge), one thread is retired
+ * from the pool and the monitoring of idle threads is reset. This means that at most one
+ * thread retires every kMaxIdleThreadAge units of time. The value of kMaxIdleThreadAge is set
+ * to be much larger than the expected frequency of new requests, averaging out short-duration
+ * periods of idleness, as occur between heartbeats.
+ *
+ * The implementation also manages a pool of network connections to recently contacted remote
+ * nodes. The size of this pool is not bounded, but connections are retired unconditionally
+ * after they have been connected for a certain maximum period.
+ * TODO(spencer): Rename this to ThreadPoolNetworkInterface
+ */
+class NetworkInterfaceImpl : public NetworkInterface {
+public:
+ NetworkInterfaceImpl();
+ virtual ~NetworkInterfaceImpl();
+ virtual std::string getDiagnosticString();
+ virtual void startup();
+ virtual void shutdown();
+ virtual void waitForWork();
+ virtual void waitForWorkUntil(Date_t when);
+ virtual void signalWorkAvailable();
+ virtual Date_t now();
+ virtual void startCommand(const TaskExecutor::CallbackHandle& cbHandle,
+ const RemoteCommandRequest& request,
+ const RemoteCommandCompletionFn& onFinish);
+ virtual void cancelCommand(const TaskExecutor::CallbackHandle& cbHandle);
+
+private:
/**
- * Implementation of the network interface for use by classes implementing TaskExecutor
- * inside mongod.
- *
- * This implementation manages a dynamically sized group of worker threads for performing
- * network operations. The minimum and maximum number of threads is set at compile time, and
- * the exact number of threads is adjusted dynamically, using the following two rules.
- *
- * 1.) If the number of worker threads is less than the maximum, there are no idle worker
- * threads, and the client enqueues a new network operation via startCommand(), the network
- * interface spins up a new worker thread. This decision is made on the assumption that
- * spinning up a new thread is faster than the round-trip time for processing a remote command,
- * and so this will minimize wait time.
- *
- * 2.) If the number of worker threads has exceeded the the peak number of scheduled outstanding
- * network commands continuously for a period of time (kMaxIdleThreadAge), one thread is retired
- * from the pool and the monitoring of idle threads is reset. This means that at most one
- * thread retires every kMaxIdleThreadAge units of time. The value of kMaxIdleThreadAge is set
- * to be much larger than the expected frequency of new requests, averaging out short-duration
- * periods of idleness, as occur between heartbeats.
- *
- * The implementation also manages a pool of network connections to recently contacted remote
- * nodes. The size of this pool is not bounded, but connections are retired unconditionally
- * after they have been connected for a certain maximum period.
- * TODO(spencer): Rename this to ThreadPoolNetworkInterface
+ * Information describing an in-flight command.
*/
- class NetworkInterfaceImpl : public NetworkInterface {
- public:
- NetworkInterfaceImpl();
- virtual ~NetworkInterfaceImpl();
- virtual std::string getDiagnosticString();
- virtual void startup();
- virtual void shutdown();
- virtual void waitForWork();
- virtual void waitForWorkUntil(Date_t when);
- virtual void signalWorkAvailable();
- virtual Date_t now();
- virtual void startCommand(
- const TaskExecutor::CallbackHandle& cbHandle,
- const RemoteCommandRequest& request,
- const RemoteCommandCompletionFn& onFinish);
- virtual void cancelCommand(const TaskExecutor::CallbackHandle& cbHandle);
-
- private:
-
- /**
- * Information describing an in-flight command.
- */
- struct CommandData {
- TaskExecutor::CallbackHandle cbHandle;
- RemoteCommandRequest request;
- RemoteCommandCompletionFn onFinish;
- };
- typedef stdx::list<CommandData> CommandDataList;
- typedef std::vector<std::shared_ptr<stdx::thread> > ThreadList;
-
- /**
- * Thread body for threads that synchronously perform network requests from
- * the _pending list.
- */
- static void _requestProcessorThreadBody(NetworkInterfaceImpl* net,
- const std::string& threadName);
-
- /**
- * Run loop that iteratively consumes network requests in a request processor thread.
- */
- void _consumeNetworkRequests();
-
- /**
- * Notifies the network threads that there is work available.
- */
- void _signalWorkAvailable_inlock();
-
- /**
- * Starts a new network thread.
- */
- void _startNewNetworkThread_inlock();
-
- // Mutex guarding the state of this network interface, except for the remote command
- // executor, which has its own concurrency control.
- stdx::mutex _mutex;
-
- // Condition signaled to indicate that there is work in the _pending queue.
- stdx::condition_variable _hasPending;
-
- // Queue of yet-to-be-executed network operations.
- CommandDataList _pending;
-
- // List of threads serving as the worker pool.
- ThreadList _threads;
-
- // Count of idle threads.
- size_t _numIdleThreads;
-
- // Id counter for assigning thread names
- size_t _nextThreadId;
-
- // The last time that _pending.size() + _numActiveNetworkRequests grew to be at least
- // _threads.size().
- Date_t _lastFullUtilizationDate;
-
- // Condition signaled to indicate that the executor, blocked in waitForWorkUntil or
- // waitForWork, should wake up.
- stdx::condition_variable _isExecutorRunnableCondition;
-
- // Flag indicating whether or not the executor associated with this interface is runnable.
- bool _isExecutorRunnable;
-
- // Flag indicating when this interface is being shut down (because shutdown() has executed).
- bool _inShutdown;
-
- // Interface for running remote commands
- RemoteCommandRunnerImpl _commandRunner;
-
- // Number of active network requests
- size_t _numActiveNetworkRequests;
+ struct CommandData {
+ TaskExecutor::CallbackHandle cbHandle;
+ RemoteCommandRequest request;
+ RemoteCommandCompletionFn onFinish;
};
+ typedef stdx::list<CommandData> CommandDataList;
+ typedef std::vector<std::shared_ptr<stdx::thread>> ThreadList;
+
+ /**
+ * Thread body for threads that synchronously perform network requests from
+ * the _pending list.
+ */
+ static void _requestProcessorThreadBody(NetworkInterfaceImpl* net,
+ const std::string& threadName);
+
+ /**
+ * Run loop that iteratively consumes network requests in a request processor thread.
+ */
+ void _consumeNetworkRequests();
+
+ /**
+ * Notifies the network threads that there is work available.
+ */
+ void _signalWorkAvailable_inlock();
+
+ /**
+ * Starts a new network thread.
+ */
+ void _startNewNetworkThread_inlock();
+
+ // Mutex guarding the state of this network interface, except for the remote command
+ // executor, which has its own concurrency control.
+ stdx::mutex _mutex;
+
+ // Condition signaled to indicate that there is work in the _pending queue.
+ stdx::condition_variable _hasPending;
+
+ // Queue of yet-to-be-executed network operations.
+ CommandDataList _pending;
+
+ // List of threads serving as the worker pool.
+ ThreadList _threads;
+
+ // Count of idle threads.
+ size_t _numIdleThreads;
+
+ // Id counter for assigning thread names
+ size_t _nextThreadId;
+
+ // The last time that _pending.size() + _numActiveNetworkRequests grew to be at least
+ // _threads.size().
+ Date_t _lastFullUtilizationDate;
+
+ // Condition signaled to indicate that the executor, blocked in waitForWorkUntil or
+ // waitForWork, should wake up.
+ stdx::condition_variable _isExecutorRunnableCondition;
+
+ // Flag indicating whether or not the executor associated with this interface is runnable.
+ bool _isExecutorRunnable;
+
+ // Flag indicating when this interface is being shut down (because shutdown() has executed).
+ bool _inShutdown;
+
+ // Interface for running remote commands
+ RemoteCommandRunnerImpl _commandRunner;
+
+ // Number of active network requests
+ size_t _numActiveNetworkRequests;
+};
-} // namespace executor
-} // namespace mongo
+} // namespace executor
+} // namespace mongo
diff --git a/src/mongo/executor/network_interface_mock.cpp b/src/mongo/executor/network_interface_mock.cpp
index 6f13f42afd5..cee26e4bdc0 100644
--- a/src/mongo/executor/network_interface_mock.cpp
+++ b/src/mongo/executor/network_interface_mock.cpp
@@ -36,364 +36,350 @@
namespace mongo {
namespace executor {
- NetworkInterfaceMock::NetworkInterfaceMock()
- : _waitingToRunMask(0),
- _currentlyRunning(kNoThread),
- _now(fassertStatusOK(18653, dateFromISOString("2014-08-01T00:00:00Z"))),
- _hasStarted(false),
- _inShutdown(false),
- _executorNextWakeupDate(Date_t::max()) {
+NetworkInterfaceMock::NetworkInterfaceMock()
+ : _waitingToRunMask(0),
+ _currentlyRunning(kNoThread),
+ _now(fassertStatusOK(18653, dateFromISOString("2014-08-01T00:00:00Z"))),
+ _hasStarted(false),
+ _inShutdown(false),
+ _executorNextWakeupDate(Date_t::max()) {}
+
+NetworkInterfaceMock::~NetworkInterfaceMock() {
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+ invariant(!_hasStarted || _inShutdown);
+ invariant(_scheduled.empty());
+ invariant(_blackHoled.empty());
+}
+
+std::string NetworkInterfaceMock::getDiagnosticString() {
+ // TODO something better.
+ return "NetworkInterfaceMock diagnostics here";
+}
+
+Date_t NetworkInterfaceMock::now() {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ return _now_inlock();
+}
+
+void NetworkInterfaceMock::startCommand(const TaskExecutor::CallbackHandle& cbHandle,
+ const RemoteCommandRequest& request,
+ const RemoteCommandCompletionFn& onFinish) {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ invariant(!_inShutdown);
+ const Date_t now = _now_inlock();
+ NetworkOperationIterator insertBefore = _unscheduled.begin();
+ while ((insertBefore != _unscheduled.end()) &&
+ (insertBefore->getNextConsiderationDate() <= now)) {
+ ++insertBefore;
}
-
- NetworkInterfaceMock::~NetworkInterfaceMock() {
- stdx::unique_lock<stdx::mutex> lk(_mutex);
- invariant(!_hasStarted || _inShutdown);
- invariant(_scheduled.empty());
- invariant(_blackHoled.empty());
+ _unscheduled.insert(insertBefore, NetworkOperation(cbHandle, request, now, onFinish));
+}
+
+static bool findAndCancelIf(
+ const stdx::function<bool(const NetworkInterfaceMock::NetworkOperation&)>& matchFn,
+ NetworkInterfaceMock::NetworkOperationList* other,
+ NetworkInterfaceMock::NetworkOperationList* scheduled,
+ const Date_t now) {
+ const NetworkInterfaceMock::NetworkOperationIterator noi =
+ std::find_if(other->begin(), other->end(), matchFn);
+ if (noi == other->end()) {
+ return false;
}
-
- std::string NetworkInterfaceMock::getDiagnosticString() {
- // TODO something better.
- return "NetworkInterfaceMock diagnostics here";
+ scheduled->splice(scheduled->begin(), *other, noi);
+ noi->setResponse(
+ now,
+ TaskExecutor::ResponseStatus(ErrorCodes::CallbackCanceled, "Network operation canceled"));
+ return true;
+}
+
+void NetworkInterfaceMock::cancelCommand(const TaskExecutor::CallbackHandle& cbHandle) {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ invariant(!_inShutdown);
+ stdx::function<bool(const NetworkOperation&)> matchesHandle =
+ stdx::bind(&NetworkOperation::isForCallback, stdx::placeholders::_1, cbHandle);
+ const Date_t now = _now_inlock();
+ if (findAndCancelIf(matchesHandle, &_unscheduled, &_scheduled, now)) {
+ return;
}
-
- Date_t NetworkInterfaceMock::now() {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- return _now_inlock();
+ if (findAndCancelIf(matchesHandle, &_blackHoled, &_scheduled, now)) {
+ return;
}
-
- void NetworkInterfaceMock::startCommand(
- const TaskExecutor::CallbackHandle& cbHandle,
- const RemoteCommandRequest& request,
- const RemoteCommandCompletionFn& onFinish) {
-
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- invariant(!_inShutdown);
- const Date_t now = _now_inlock();
- NetworkOperationIterator insertBefore = _unscheduled.begin();
- while ((insertBefore != _unscheduled.end()) &&
- (insertBefore->getNextConsiderationDate() <= now)) {
-
- ++insertBefore;
- }
- _unscheduled.insert(insertBefore, NetworkOperation(cbHandle, request, now, onFinish));
+ if (findAndCancelIf(matchesHandle, &_scheduled, &_scheduled, now)) {
+ return;
}
-
- static bool findAndCancelIf(
- const stdx::function<bool (const NetworkInterfaceMock::NetworkOperation&)>& matchFn,
- NetworkInterfaceMock::NetworkOperationList* other,
- NetworkInterfaceMock::NetworkOperationList* scheduled,
- const Date_t now) {
- const NetworkInterfaceMock::NetworkOperationIterator noi =
- std::find_if(other->begin(), other->end(), matchFn);
- if (noi == other->end()) {
- return false;
- }
- scheduled->splice(scheduled->begin(), *other, noi);
- noi->setResponse(now, TaskExecutor::ResponseStatus(ErrorCodes::CallbackCanceled,
- "Network operation canceled"));
- return true;
+ // No not-in-progress network command matched cbHandle. Oh, well.
+}
+
+void NetworkInterfaceMock::startup() {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ invariant(!_hasStarted);
+ _hasStarted = true;
+ _inShutdown = false;
+ invariant(_currentlyRunning == kNoThread);
+ _currentlyRunning = kExecutorThread;
+}
+
+void NetworkInterfaceMock::shutdown() {
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+ invariant(_hasStarted);
+ invariant(!_inShutdown);
+ _inShutdown = true;
+ NetworkOperationList todo;
+ todo.splice(todo.end(), _scheduled);
+ todo.splice(todo.end(), _unscheduled);
+ todo.splice(todo.end(), _processing);
+ todo.splice(todo.end(), _blackHoled);
+
+ const Date_t now = _now_inlock();
+ _waitingToRunMask |= kExecutorThread; // Prevents network thread from scheduling.
+ lk.unlock();
+ for (NetworkOperationIterator iter = todo.begin(); iter != todo.end(); ++iter) {
+ iter->setResponse(now,
+ TaskExecutor::ResponseStatus(ErrorCodes::ShutdownInProgress,
+ "Shutting down mock network"));
+ iter->finishResponse();
}
-
- void NetworkInterfaceMock::cancelCommand(
- const TaskExecutor::CallbackHandle& cbHandle) {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- invariant(!_inShutdown);
- stdx::function<bool (const NetworkOperation&)> matchesHandle = stdx::bind(
- &NetworkOperation::isForCallback,
- stdx::placeholders::_1,
- cbHandle);
- const Date_t now = _now_inlock();
- if (findAndCancelIf(matchesHandle, &_unscheduled, &_scheduled, now)) {
- return;
- }
- if (findAndCancelIf(matchesHandle, &_blackHoled, &_scheduled, now)) {
- return;
- }
- if (findAndCancelIf(matchesHandle, &_scheduled, &_scheduled, now)) {
- return;
- }
- // No not-in-progress network command matched cbHandle. Oh, well.
- }
-
- void NetworkInterfaceMock::startup() {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- invariant(!_hasStarted);
- _hasStarted = true;
- _inShutdown = false;
- invariant(_currentlyRunning == kNoThread);
- _currentlyRunning = kExecutorThread;
- }
-
- void NetworkInterfaceMock::shutdown() {
- stdx::unique_lock<stdx::mutex> lk(_mutex);
- invariant(_hasStarted);
- invariant(!_inShutdown);
- _inShutdown = true;
- NetworkOperationList todo;
- todo.splice(todo.end(), _scheduled);
- todo.splice(todo.end(), _unscheduled);
- todo.splice(todo.end(), _processing);
- todo.splice(todo.end(), _blackHoled);
-
- const Date_t now = _now_inlock();
- _waitingToRunMask |= kExecutorThread; // Prevents network thread from scheduling.
- lk.unlock();
- for (NetworkOperationIterator iter = todo.begin(); iter != todo.end(); ++iter) {
- iter->setResponse(now, TaskExecutor::ResponseStatus(ErrorCodes::ShutdownInProgress,
- "Shutting down mock network"));
- iter->finishResponse();
- }
- lk.lock();
- invariant(_currentlyRunning == kExecutorThread);
- _currentlyRunning = kNoThread;
- _waitingToRunMask = kNetworkThread;
- _shouldWakeNetworkCondition.notify_one();
+ lk.lock();
+ invariant(_currentlyRunning == kExecutorThread);
+ _currentlyRunning = kNoThread;
+ _waitingToRunMask = kNetworkThread;
+ _shouldWakeNetworkCondition.notify_one();
+}
+
+void NetworkInterfaceMock::enterNetwork() {
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+ while (!_isNetworkThreadRunnable_inlock()) {
+ _shouldWakeNetworkCondition.wait(lk);
}
-
- void NetworkInterfaceMock::enterNetwork() {
- stdx::unique_lock<stdx::mutex> lk(_mutex);
- while (!_isNetworkThreadRunnable_inlock()) {
- _shouldWakeNetworkCondition.wait(lk);
- }
- _currentlyRunning = kNetworkThread;
- _waitingToRunMask &= ~kNetworkThread;
+ _currentlyRunning = kNetworkThread;
+ _waitingToRunMask &= ~kNetworkThread;
+}
+
+void NetworkInterfaceMock::exitNetwork() {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ if (_currentlyRunning != kNetworkThread) {
+ return;
}
-
- void NetworkInterfaceMock::exitNetwork() {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- if (_currentlyRunning != kNetworkThread) {
- return;
- }
- _currentlyRunning = kNoThread;
- if (_isExecutorThreadRunnable_inlock()) {
- _shouldWakeExecutorCondition.notify_one();
- }
- _waitingToRunMask |= kNetworkThread;
+ _currentlyRunning = kNoThread;
+ if (_isExecutorThreadRunnable_inlock()) {
+ _shouldWakeExecutorCondition.notify_one();
}
-
- bool NetworkInterfaceMock::hasReadyRequests() {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- invariant(_currentlyRunning == kNetworkThread);
- return _hasReadyRequests_inlock();
+ _waitingToRunMask |= kNetworkThread;
+}
+
+bool NetworkInterfaceMock::hasReadyRequests() {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ invariant(_currentlyRunning == kNetworkThread);
+ return _hasReadyRequests_inlock();
+}
+
+bool NetworkInterfaceMock::_hasReadyRequests_inlock() {
+ if (_unscheduled.empty())
+ return false;
+ if (_unscheduled.front().getNextConsiderationDate() > _now_inlock()) {
+ return false;
}
+ return true;
+}
- bool NetworkInterfaceMock::_hasReadyRequests_inlock() {
- if (_unscheduled.empty())
- return false;
- if (_unscheduled.front().getNextConsiderationDate() > _now_inlock()) {
- return false;
- }
- return true;
+NetworkInterfaceMock::NetworkOperationIterator NetworkInterfaceMock::getNextReadyRequest() {
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+ invariant(_currentlyRunning == kNetworkThread);
+ while (!_hasReadyRequests_inlock()) {
+ _waitingToRunMask |= kExecutorThread;
+ _runReadyNetworkOperations_inlock(&lk);
}
-
- NetworkInterfaceMock::NetworkOperationIterator NetworkInterfaceMock::getNextReadyRequest() {
- stdx::unique_lock<stdx::mutex> lk(_mutex);
- invariant(_currentlyRunning == kNetworkThread);
- while (!_hasReadyRequests_inlock()) {
- _waitingToRunMask |= kExecutorThread;
- _runReadyNetworkOperations_inlock(&lk);
- }
- invariant(_hasReadyRequests_inlock());
- _processing.splice(_processing.begin(), _unscheduled, _unscheduled.begin());
- return _processing.begin();
+ invariant(_hasReadyRequests_inlock());
+ _processing.splice(_processing.begin(), _unscheduled, _unscheduled.begin());
+ return _processing.begin();
+}
+
+void NetworkInterfaceMock::scheduleResponse(NetworkOperationIterator noi,
+ Date_t when,
+ const TaskExecutor::ResponseStatus& response) {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ invariant(_currentlyRunning == kNetworkThread);
+ NetworkOperationIterator insertBefore = _scheduled.begin();
+ while ((insertBefore != _scheduled.end()) && (insertBefore->getResponseDate() <= when)) {
+ ++insertBefore;
}
-
- void NetworkInterfaceMock::scheduleResponse(
- NetworkOperationIterator noi,
- Date_t when,
- const TaskExecutor::ResponseStatus& response) {
-
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- invariant(_currentlyRunning == kNetworkThread);
- NetworkOperationIterator insertBefore = _scheduled.begin();
- while ((insertBefore != _scheduled.end()) && (insertBefore->getResponseDate() <= when)) {
- ++insertBefore;
+ noi->setResponse(when, response);
+ _scheduled.splice(insertBefore, _processing, noi);
+}
+
+void NetworkInterfaceMock::blackHole(NetworkOperationIterator noi) {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ invariant(_currentlyRunning == kNetworkThread);
+ _blackHoled.splice(_blackHoled.end(), _processing, noi);
+}
+
+void NetworkInterfaceMock::requeueAt(NetworkOperationIterator noi, Date_t dontAskUntil) {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ invariant(_currentlyRunning == kNetworkThread);
+ invariant(noi->getNextConsiderationDate() < dontAskUntil);
+ invariant(_now_inlock() < dontAskUntil);
+ NetworkOperationIterator insertBefore = _unscheduled.begin();
+ for (; insertBefore != _unscheduled.end(); ++insertBefore) {
+ if (insertBefore->getNextConsiderationDate() >= dontAskUntil) {
+ break;
}
- noi->setResponse(when, response);
- _scheduled.splice(insertBefore, _processing, noi);
}
-
- void NetworkInterfaceMock::blackHole(NetworkOperationIterator noi) {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- invariant(_currentlyRunning == kNetworkThread);
- _blackHoled.splice(_blackHoled.end(), _processing, noi);
- }
-
- void NetworkInterfaceMock::requeueAt(NetworkOperationIterator noi, Date_t dontAskUntil) {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- invariant(_currentlyRunning == kNetworkThread);
- invariant(noi->getNextConsiderationDate() < dontAskUntil);
- invariant(_now_inlock() < dontAskUntil);
- NetworkOperationIterator insertBefore = _unscheduled.begin();
- for (; insertBefore != _unscheduled.end(); ++insertBefore) {
- if (insertBefore->getNextConsiderationDate() >= dontAskUntil) {
- break;
- }
+ noi->setNextConsiderationDate(dontAskUntil);
+ _unscheduled.splice(insertBefore, _processing, noi);
+}
+
+void NetworkInterfaceMock::runUntil(Date_t until) {
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+ invariant(_currentlyRunning == kNetworkThread);
+ invariant(until > _now_inlock());
+ while (until > _now_inlock()) {
+ _runReadyNetworkOperations_inlock(&lk);
+ if (_hasReadyRequests_inlock()) {
+ break;
}
- noi->setNextConsiderationDate(dontAskUntil);
- _unscheduled.splice(insertBefore, _processing, noi);
- }
-
- void NetworkInterfaceMock::runUntil(Date_t until) {
- stdx::unique_lock<stdx::mutex> lk(_mutex);
- invariant(_currentlyRunning == kNetworkThread);
- invariant(until > _now_inlock());
- while (until > _now_inlock()) {
- _runReadyNetworkOperations_inlock(&lk);
- if (_hasReadyRequests_inlock()) {
- break;
- }
- Date_t newNow = _executorNextWakeupDate;
- if (!_scheduled.empty() && _scheduled.front().getResponseDate() < newNow) {
- newNow = _scheduled.front().getResponseDate();
- }
- if (until < newNow) {
- newNow = until;
- }
- invariant(_now_inlock() <= newNow);
- _now = newNow;
- _waitingToRunMask |= kExecutorThread;
+ Date_t newNow = _executorNextWakeupDate;
+ if (!_scheduled.empty() && _scheduled.front().getResponseDate() < newNow) {
+ newNow = _scheduled.front().getResponseDate();
}
- _runReadyNetworkOperations_inlock(&lk);
- }
-
- void NetworkInterfaceMock::runReadyNetworkOperations() {
- stdx::unique_lock<stdx::mutex> lk(_mutex);
- invariant(_currentlyRunning == kNetworkThread);
- _runReadyNetworkOperations_inlock(&lk);
- }
-
- void NetworkInterfaceMock::waitForWork() {
- stdx::unique_lock<stdx::mutex> lk(_mutex);
- invariant(_currentlyRunning == kExecutorThread);
- _waitForWork_inlock(&lk);
- }
-
- void NetworkInterfaceMock::waitForWorkUntil(Date_t when) {
- stdx::unique_lock<stdx::mutex> lk(_mutex);
- invariant(_currentlyRunning == kExecutorThread);
- _executorNextWakeupDate = when;
- if (_executorNextWakeupDate <= _now_inlock()) {
- return;
+ if (until < newNow) {
+ newNow = until;
}
- _waitForWork_inlock(&lk);
- }
-
- void NetworkInterfaceMock::signalWorkAvailable() {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
+ invariant(_now_inlock() <= newNow);
+ _now = newNow;
_waitingToRunMask |= kExecutorThread;
- if (_currentlyRunning == kNoThread) {
- _shouldWakeExecutorCondition.notify_one();
- }
}
+ _runReadyNetworkOperations_inlock(&lk);
+}
+
+void NetworkInterfaceMock::runReadyNetworkOperations() {
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+ invariant(_currentlyRunning == kNetworkThread);
+ _runReadyNetworkOperations_inlock(&lk);
+}
+
+void NetworkInterfaceMock::waitForWork() {
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+ invariant(_currentlyRunning == kExecutorThread);
+ _waitForWork_inlock(&lk);
+}
+
+void NetworkInterfaceMock::waitForWorkUntil(Date_t when) {
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+ invariant(_currentlyRunning == kExecutorThread);
+ _executorNextWakeupDate = when;
+ if (_executorNextWakeupDate <= _now_inlock()) {
+ return;
+ }
+ _waitForWork_inlock(&lk);
+}
- void NetworkInterfaceMock::_runReadyNetworkOperations_inlock(
- stdx::unique_lock<stdx::mutex>* lk) {
- while (!_scheduled.empty() && _scheduled.front().getResponseDate() <= _now_inlock()) {
- invariant(_currentlyRunning == kNetworkThread);
- NetworkOperation op = _scheduled.front();
- _scheduled.pop_front();
- _waitingToRunMask |= kExecutorThread;
- lk->unlock();
- op.finishResponse();
- lk->lock();
- }
- invariant(_currentlyRunning == kNetworkThread);
- if (!(_waitingToRunMask & kExecutorThread)) {
- return;
- }
+void NetworkInterfaceMock::signalWorkAvailable() {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ _waitingToRunMask |= kExecutorThread;
+ if (_currentlyRunning == kNoThread) {
_shouldWakeExecutorCondition.notify_one();
- _currentlyRunning = kNoThread;
- while (!_isNetworkThreadRunnable_inlock()) {
- _shouldWakeNetworkCondition.wait(*lk);
- }
- _currentlyRunning = kNetworkThread;
- _waitingToRunMask &= ~kNetworkThread;
}
+}
- void NetworkInterfaceMock::_waitForWork_inlock(stdx::unique_lock<stdx::mutex>* lk) {
- if (_waitingToRunMask & kExecutorThread) {
- _waitingToRunMask &= ~kExecutorThread;
- return;
- }
- _currentlyRunning = kNoThread;
- while (!_isExecutorThreadRunnable_inlock()) {
- _waitingToRunMask |= kNetworkThread;
- _shouldWakeNetworkCondition.notify_one();
- _shouldWakeExecutorCondition.wait(*lk);
- }
- _currentlyRunning = kExecutorThread;
- _waitingToRunMask &= ~kExecutorThread;
+void NetworkInterfaceMock::_runReadyNetworkOperations_inlock(stdx::unique_lock<stdx::mutex>* lk) {
+ while (!_scheduled.empty() && _scheduled.front().getResponseDate() <= _now_inlock()) {
+ invariant(_currentlyRunning == kNetworkThread);
+ NetworkOperation op = _scheduled.front();
+ _scheduled.pop_front();
+ _waitingToRunMask |= kExecutorThread;
+ lk->unlock();
+ op.finishResponse();
+ lk->lock();
}
-
- bool NetworkInterfaceMock::_isNetworkThreadRunnable_inlock() {
- if (_currentlyRunning != kNoThread) {
- return false;
- }
- if (_waitingToRunMask != kNetworkThread) {
- return false;
- }
- return true;
+ invariant(_currentlyRunning == kNetworkThread);
+ if (!(_waitingToRunMask & kExecutorThread)) {
+ return;
}
-
- bool NetworkInterfaceMock::_isExecutorThreadRunnable_inlock() {
- if (_currentlyRunning != kNoThread) {
- return false;
- }
- return _waitingToRunMask & kExecutorThread;
+ _shouldWakeExecutorCondition.notify_one();
+ _currentlyRunning = kNoThread;
+ while (!_isNetworkThreadRunnable_inlock()) {
+ _shouldWakeNetworkCondition.wait(*lk);
}
+ _currentlyRunning = kNetworkThread;
+ _waitingToRunMask &= ~kNetworkThread;
+}
- static const StatusWith<RemoteCommandResponse> kUnsetResponse(
- ErrorCodes::InternalError,
- "NetworkOperation::_response never set");
-
- NetworkInterfaceMock::NetworkOperation::NetworkOperation()
- : _requestDate(),
- _nextConsiderationDate(),
- _responseDate(),
- _request(),
- _response(kUnsetResponse),
- _onFinish() {
+void NetworkInterfaceMock::_waitForWork_inlock(stdx::unique_lock<stdx::mutex>* lk) {
+ if (_waitingToRunMask & kExecutorThread) {
+ _waitingToRunMask &= ~kExecutorThread;
+ return;
}
-
- NetworkInterfaceMock::NetworkOperation::NetworkOperation(
- const TaskExecutor::CallbackHandle& cbHandle,
- const RemoteCommandRequest& theRequest,
- Date_t theRequestDate,
- const RemoteCommandCompletionFn& onFinish)
- : _requestDate(theRequestDate),
- _nextConsiderationDate(theRequestDate),
- _responseDate(),
- _cbHandle(cbHandle),
- _request(theRequest),
- _response(kUnsetResponse),
- _onFinish(onFinish) {
+ _currentlyRunning = kNoThread;
+ while (!_isExecutorThreadRunnable_inlock()) {
+ _waitingToRunMask |= kNetworkThread;
+ _shouldWakeNetworkCondition.notify_one();
+ _shouldWakeExecutorCondition.wait(*lk);
}
+ _currentlyRunning = kExecutorThread;
+ _waitingToRunMask &= ~kExecutorThread;
+}
- NetworkInterfaceMock::NetworkOperation::~NetworkOperation() {}
-
- void NetworkInterfaceMock::NetworkOperation::setNextConsiderationDate(
- Date_t nextConsiderationDate) {
-
- invariant(nextConsiderationDate > _nextConsiderationDate);
- _nextConsiderationDate = nextConsiderationDate;
+bool NetworkInterfaceMock::_isNetworkThreadRunnable_inlock() {
+ if (_currentlyRunning != kNoThread) {
+ return false;
}
-
- void NetworkInterfaceMock::NetworkOperation::setResponse(
- Date_t responseDate,
- const TaskExecutor::ResponseStatus& response) {
-
- invariant(responseDate >= _requestDate);
- _responseDate = responseDate;
- _response = response;
+ if (_waitingToRunMask != kNetworkThread) {
+ return false;
}
+ return true;
+}
- void NetworkInterfaceMock::NetworkOperation::finishResponse() {
- invariant(_onFinish);
- _onFinish(_response);
- _onFinish = RemoteCommandCompletionFn();
+bool NetworkInterfaceMock::_isExecutorThreadRunnable_inlock() {
+ if (_currentlyRunning != kNoThread) {
+ return false;
}
+ return _waitingToRunMask & kExecutorThread;
+}
+
+static const StatusWith<RemoteCommandResponse> kUnsetResponse(
+ ErrorCodes::InternalError, "NetworkOperation::_response never set");
+
+NetworkInterfaceMock::NetworkOperation::NetworkOperation()
+ : _requestDate(),
+ _nextConsiderationDate(),
+ _responseDate(),
+ _request(),
+ _response(kUnsetResponse),
+ _onFinish() {}
+
+NetworkInterfaceMock::NetworkOperation::NetworkOperation(
+ const TaskExecutor::CallbackHandle& cbHandle,
+ const RemoteCommandRequest& theRequest,
+ Date_t theRequestDate,
+ const RemoteCommandCompletionFn& onFinish)
+ : _requestDate(theRequestDate),
+ _nextConsiderationDate(theRequestDate),
+ _responseDate(),
+ _cbHandle(cbHandle),
+ _request(theRequest),
+ _response(kUnsetResponse),
+ _onFinish(onFinish) {}
+
+NetworkInterfaceMock::NetworkOperation::~NetworkOperation() {}
+
+void NetworkInterfaceMock::NetworkOperation::setNextConsiderationDate(
+ Date_t nextConsiderationDate) {
+ invariant(nextConsiderationDate > _nextConsiderationDate);
+ _nextConsiderationDate = nextConsiderationDate;
+}
+
+void NetworkInterfaceMock::NetworkOperation::setResponse(
+ Date_t responseDate, const TaskExecutor::ResponseStatus& response) {
+ invariant(responseDate >= _requestDate);
+ _responseDate = responseDate;
+ _response = response;
+}
+
+void NetworkInterfaceMock::NetworkOperation::finishResponse() {
+ invariant(_onFinish);
+ _onFinish(_response);
+ _onFinish = RemoteCommandCompletionFn();
+}
} // namespace executor
} // namespace mongo
diff --git a/src/mongo/executor/network_interface_mock.h b/src/mongo/executor/network_interface_mock.h
index ff16f8eff6a..910566aa375 100644
--- a/src/mongo/executor/network_interface_mock.h
+++ b/src/mongo/executor/network_interface_mock.h
@@ -39,286 +39,291 @@
namespace mongo {
namespace executor {
+/**
+ * Mock network implementation for use in unit tests.
+ *
+ * To use, construct a new instance on the heap, and keep a pointer to it. Pass
+ * the pointer to the instance into the TaskExecutor constructor, transferring
+ * ownership. Start the executor's run() method in a separate thread, schedule the
+ * work you want to test into the executor, then while the test is still going, iterate
+ * through the ready network requests, servicing them and advancing time as needed.
+ *
+ * The mock has a fully virtualized notion of time and the the network. When the
+ * executor under test schedules a network operation, the startCommand
+ * method of this class adds an entry to the _unscheduled queue for immediate consideration.
+ * The test driver loop, when it examines the request, may schedule a response, ask the
+ * interface to redeliver the request at a later virtual time, or to swallow the virtual
+ * request until the end of the simulation. The test driver loop can also instruct the
+ * interface to run forward through virtual time until there are operations ready to
+ * consider, via runUntil.
+ *
+ * The thread acting as the "network" and the executor run thread are highly synchronized
+ * by this code, allowing for deterministic control of operation interleaving.
+ */
+class NetworkInterfaceMock : public NetworkInterface {
+public:
+ class NetworkOperation;
+ typedef stdx::list<NetworkOperation> NetworkOperationList;
+ typedef NetworkOperationList::iterator NetworkOperationIterator;
+
+ NetworkInterfaceMock();
+ virtual ~NetworkInterfaceMock();
+ virtual std::string getDiagnosticString();
+
+ ////////////////////////////////////////////////////////////////////////////////
+ //
+ // NetworkInterface methods
+ //
+ ////////////////////////////////////////////////////////////////////////////////
+
+ virtual void startup();
+ virtual void shutdown();
+ virtual void waitForWork();
+ virtual void waitForWorkUntil(Date_t when);
+ virtual void signalWorkAvailable();
+ virtual Date_t now();
+ virtual void startCommand(const TaskExecutor::CallbackHandle& cbHandle,
+ const RemoteCommandRequest& request,
+ const RemoteCommandCompletionFn& onFinish);
+ virtual void cancelCommand(const TaskExecutor::CallbackHandle& cbHandle);
+
+
+ ////////////////////////////////////////////////////////////////////////////////
+ //
+ // Methods for simulating network operations and the passage of time.
+ //
+ // Methods in this section are to be called by the thread currently simulating
+ // the network.
+ //
+ ////////////////////////////////////////////////////////////////////////////////
+
+ /**
+ * Causes the currently running (non-executor) thread to assume the mantle of the network
+ * simulation thread.
+ *
+ * Call this before calling any of the other methods in this section.
+ */
+ void enterNetwork();
+
+ /**
+ * Causes the currently running thread to drop the mantle of "network simulation thread".
+ *
+ * Call this before calling any methods that might block waiting for the
+ * executor thread.
+ */
+ void exitNetwork();
+
+ /**
+ * Returns true if there are unscheduled network requests to be processed.
+ */
+ bool hasReadyRequests();
+
+ /**
+ * Gets the next unscheduled request to process, blocking until one is available.
+ *
+ * Will not return until the executor thread is blocked in waitForWorkUntil or waitForWork.
+ */
+ NetworkOperationIterator getNextReadyRequest();
+
+ /**
+ * Schedules "response" in response to "noi" at virtual time "when".
+ */
+ void scheduleResponse(NetworkOperationIterator noi,
+ Date_t when,
+ const TaskExecutor::ResponseStatus& response);
+
+ /**
+ * Swallows "noi", causing the network interface to not respond to it until
+ * shutdown() is called.
+ */
+ void blackHole(NetworkOperationIterator noi);
+
+ /**
+ * Defers decision making on "noi" until virtual time "dontAskUntil". Use
+ * this when getNextReadyRequest() returns a request you want to deal with
+ * after looking at other requests.
+ */
+ void requeueAt(NetworkOperationIterator noi, Date_t dontAskUntil);
+
/**
- * Mock network implementation for use in unit tests.
+ * Runs the simulator forward until now() == until or hasReadyRequests() is true.
*
- * To use, construct a new instance on the heap, and keep a pointer to it. Pass
- * the pointer to the instance into the TaskExecutor constructor, transferring
- * ownership. Start the executor's run() method in a separate thread, schedule the
- * work you want to test into the executor, then while the test is still going, iterate
- * through the ready network requests, servicing them and advancing time as needed.
+ * Will not return until the executor thread is blocked in waitForWorkUntil or waitForWork.
+ */
+ void runUntil(Date_t until);
+
+ /**
+ * Processes all ready, scheduled network operations.
*
- * The mock has a fully virtualized notion of time and the the network. When the
- * executor under test schedules a network operation, the startCommand
- * method of this class adds an entry to the _unscheduled queue for immediate consideration.
- * The test driver loop, when it examines the request, may schedule a response, ask the
- * interface to redeliver the request at a later virtual time, or to swallow the virtual
- * request until the end of the simulation. The test driver loop can also instruct the
- * interface to run forward through virtual time until there are operations ready to
- * consider, via runUntil.
+ * Will not return until the executor thread is blocked in waitForWorkUntil or waitForWork.
+ */
+ void runReadyNetworkOperations();
+
+private:
+ /**
+ * Type used to identify which thread (network mock or executor) is currently executing.
*
- * The thread acting as the "network" and the executor run thread are highly synchronized
- * by this code, allowing for deterministic control of operation interleaving.
+ * Values are used in a bitmask, as well.
+ */
+ enum ThreadType { kNoThread = 0, kExecutorThread = 1, kNetworkThread = 2 };
+
+ /**
+ * Returns the current virtualized time.
+ */
+ Date_t _now_inlock() const {
+ return _now;
+ }
+
+ /**
+ * Implementation of waitForWork*.
+ */
+ void _waitForWork_inlock(stdx::unique_lock<stdx::mutex>* lk);
+
+ /**
+ * Returns true if there are ready requests for the network thread to service.
+ */
+ bool _hasReadyRequests_inlock();
+
+ /**
+ * Returns true if the network thread could run right now.
+ */
+ bool _isNetworkThreadRunnable_inlock();
+
+ /**
+ * Returns true if the executor thread could run right now.
+ */
+ bool _isExecutorThreadRunnable_inlock();
+
+ /**
+ * Runs all ready network operations, called while holding "lk". May drop and
+ * reaquire "lk" several times, but will not return until the executor has blocked
+ * in waitFor*.
+ */
+ void _runReadyNetworkOperations_inlock(stdx::unique_lock<stdx::mutex>* lk);
+
+ // Mutex that synchronizes access to mutable data in this class and its subclasses.
+ // Fields guarded by the mutex are labled (M), below, and those that are read-only
+ // in multi-threaded execution, and so unsynchronized, are labeled (R).
+ stdx::mutex _mutex;
+
+ // Condition signaled to indicate that the network processing thread should wake up.
+ stdx::condition_variable _shouldWakeNetworkCondition; // (M)
+
+ // Condition signaled to indicate that the executor run thread should wake up.
+ stdx::condition_variable _shouldWakeExecutorCondition; // (M)
+
+ // Bitmask indicating which threads are runnable.
+ int _waitingToRunMask; // (M)
+
+ // Indicator of which thread, if any, is currently running.
+ ThreadType _currentlyRunning; // (M)
+
+ // The current time reported by this instance of NetworkInterfaceMock.
+ Date_t _now; // (M)
+
+ // Set to true by "startUp()"
+ bool _hasStarted; // (M)
+
+ // Set to true by "shutDown()".
+ bool _inShutdown; // (M)
+
+ // Next date that the executor expects to wake up at (due to a scheduleWorkAt() call).
+ Date_t _executorNextWakeupDate; // (M)
+
+ // List of network operations whose responses haven't been scheduled or blackholed. This is
+ // where network requests are first queued. It is sorted by
+ // NetworkOperation::_nextConsiderationDate, which is set to now() when startCommand() is
+ // called, and adjusted by requeueAt().
+ NetworkOperationList _unscheduled; // (M)
+
+ // List of network operations that have been returned by getNextReadyRequest() but not
+ // yet scheudled, black-holed or requeued.
+ NetworkOperationList _processing; // (M)
+
+ // List of network operations whose responses have been scheduled but not delivered, sorted
+ // by NetworkOperation::_responseDate. These operations will have their responses delivered
+ // when now() == getResponseDate().
+ NetworkOperationList _scheduled; // (M)
+
+ // List of network operations that will not be responded to until shutdown() is called.
+ NetworkOperationList _blackHoled; // (M)
+};
+
+/**
+ * Representation of an in-progress network operation.
+ */
+class NetworkInterfaceMock::NetworkOperation {
+public:
+ NetworkOperation();
+ NetworkOperation(const TaskExecutor::CallbackHandle& cbHandle,
+ const RemoteCommandRequest& theRequest,
+ Date_t theRequestDate,
+ const RemoteCommandCompletionFn& onFinish);
+ ~NetworkOperation();
+
+ /**
+ * Adjusts the stored virtual time at which this entry will be subject to consideration
+ * by the test harness.
+ */
+ void setNextConsiderationDate(Date_t nextConsiderationDate);
+
+ /**
+ * Sets the response and thet virtual time at which it will be delivered.
+ */
+ void setResponse(Date_t responseDate, const TaskExecutor::ResponseStatus& response);
+
+ /**
+ * Predicate that returns true if cbHandle equals the executor's handle for this network
+ * operation. Used for searching lists of NetworkOperations.
+ */
+ bool isForCallback(const TaskExecutor::CallbackHandle& cbHandle) const {
+ return cbHandle == _cbHandle;
+ }
+
+ /**
+ * Gets the request that initiated this operation.
+ */
+ const RemoteCommandRequest& getRequest() const {
+ return _request;
+ }
+
+ /**
+ * Gets the virtual time at which the operation was started.
+ */
+ Date_t getRequestDate() const {
+ return _requestDate;
+ }
+
+ /**
+ * Gets the virtual time at which the test harness should next consider what to do
+ * with this request.
+ */
+ Date_t getNextConsiderationDate() const {
+ return _nextConsiderationDate;
+ }
+
+ /**
+ * After setResponse() has been called, returns the virtual time at which
+ * the response should be delivered.
*/
- class NetworkInterfaceMock : public NetworkInterface {
- public:
- class NetworkOperation;
- typedef stdx::list<NetworkOperation> NetworkOperationList;
- typedef NetworkOperationList::iterator NetworkOperationIterator;
-
- NetworkInterfaceMock();
- virtual ~NetworkInterfaceMock();
- virtual std::string getDiagnosticString();
-
- ////////////////////////////////////////////////////////////////////////////////
- //
- // NetworkInterface methods
- //
- ////////////////////////////////////////////////////////////////////////////////
-
- virtual void startup();
- virtual void shutdown();
- virtual void waitForWork();
- virtual void waitForWorkUntil(Date_t when);
- virtual void signalWorkAvailable();
- virtual Date_t now();
- virtual void startCommand(const TaskExecutor::CallbackHandle& cbHandle,
- const RemoteCommandRequest& request,
- const RemoteCommandCompletionFn& onFinish);
- virtual void cancelCommand(const TaskExecutor::CallbackHandle& cbHandle);
-
-
- ////////////////////////////////////////////////////////////////////////////////
- //
- // Methods for simulating network operations and the passage of time.
- //
- // Methods in this section are to be called by the thread currently simulating
- // the network.
- //
- ////////////////////////////////////////////////////////////////////////////////
-
- /**
- * Causes the currently running (non-executor) thread to assume the mantle of the network
- * simulation thread.
- *
- * Call this before calling any of the other methods in this section.
- */
- void enterNetwork();
-
- /**
- * Causes the currently running thread to drop the mantle of "network simulation thread".
- *
- * Call this before calling any methods that might block waiting for the
- * executor thread.
- */
- void exitNetwork();
-
- /**
- * Returns true if there are unscheduled network requests to be processed.
- */
- bool hasReadyRequests();
-
- /**
- * Gets the next unscheduled request to process, blocking until one is available.
- *
- * Will not return until the executor thread is blocked in waitForWorkUntil or waitForWork.
- */
- NetworkOperationIterator getNextReadyRequest();
-
- /**
- * Schedules "response" in response to "noi" at virtual time "when".
- */
- void scheduleResponse(
- NetworkOperationIterator noi,
- Date_t when,
- const TaskExecutor::ResponseStatus& response);
-
- /**
- * Swallows "noi", causing the network interface to not respond to it until
- * shutdown() is called.
- */
- void blackHole(NetworkOperationIterator noi);
-
- /**
- * Defers decision making on "noi" until virtual time "dontAskUntil". Use
- * this when getNextReadyRequest() returns a request you want to deal with
- * after looking at other requests.
- */
- void requeueAt(NetworkOperationIterator noi, Date_t dontAskUntil);
-
- /**
- * Runs the simulator forward until now() == until or hasReadyRequests() is true.
- *
- * Will not return until the executor thread is blocked in waitForWorkUntil or waitForWork.
- */
- void runUntil(Date_t until);
-
- /**
- * Processes all ready, scheduled network operations.
- *
- * Will not return until the executor thread is blocked in waitForWorkUntil or waitForWork.
- */
- void runReadyNetworkOperations();
-
- private:
- /**
- * Type used to identify which thread (network mock or executor) is currently executing.
- *
- * Values are used in a bitmask, as well.
- */
- enum ThreadType {
- kNoThread = 0,
- kExecutorThread = 1,
- kNetworkThread = 2
- };
-
- /**
- * Returns the current virtualized time.
- */
- Date_t _now_inlock() const { return _now; }
-
- /**
- * Implementation of waitForWork*.
- */
- void _waitForWork_inlock(stdx::unique_lock<stdx::mutex>* lk);
-
- /**
- * Returns true if there are ready requests for the network thread to service.
- */
- bool _hasReadyRequests_inlock();
-
- /**
- * Returns true if the network thread could run right now.
- */
- bool _isNetworkThreadRunnable_inlock();
-
- /**
- * Returns true if the executor thread could run right now.
- */
- bool _isExecutorThreadRunnable_inlock();
-
- /**
- * Runs all ready network operations, called while holding "lk". May drop and
- * reaquire "lk" several times, but will not return until the executor has blocked
- * in waitFor*.
- */
- void _runReadyNetworkOperations_inlock(stdx::unique_lock<stdx::mutex>* lk);
-
- // Mutex that synchronizes access to mutable data in this class and its subclasses.
- // Fields guarded by the mutex are labled (M), below, and those that are read-only
- // in multi-threaded execution, and so unsynchronized, are labeled (R).
- stdx::mutex _mutex;
-
- // Condition signaled to indicate that the network processing thread should wake up.
- stdx::condition_variable _shouldWakeNetworkCondition; // (M)
-
- // Condition signaled to indicate that the executor run thread should wake up.
- stdx::condition_variable _shouldWakeExecutorCondition; // (M)
-
- // Bitmask indicating which threads are runnable.
- int _waitingToRunMask; // (M)
-
- // Indicator of which thread, if any, is currently running.
- ThreadType _currentlyRunning; // (M)
-
- // The current time reported by this instance of NetworkInterfaceMock.
- Date_t _now; // (M)
-
- // Set to true by "startUp()"
- bool _hasStarted; // (M)
-
- // Set to true by "shutDown()".
- bool _inShutdown; // (M)
-
- // Next date that the executor expects to wake up at (due to a scheduleWorkAt() call).
- Date_t _executorNextWakeupDate; // (M)
-
- // List of network operations whose responses haven't been scheduled or blackholed. This is
- // where network requests are first queued. It is sorted by
- // NetworkOperation::_nextConsiderationDate, which is set to now() when startCommand() is
- // called, and adjusted by requeueAt().
- NetworkOperationList _unscheduled; // (M)
-
- // List of network operations that have been returned by getNextReadyRequest() but not
- // yet scheudled, black-holed or requeued.
- NetworkOperationList _processing; // (M)
-
- // List of network operations whose responses have been scheduled but not delivered, sorted
- // by NetworkOperation::_responseDate. These operations will have their responses delivered
- // when now() == getResponseDate().
- NetworkOperationList _scheduled; // (M)
-
- // List of network operations that will not be responded to until shutdown() is called.
- NetworkOperationList _blackHoled; // (M)
- };
+ Date_t getResponseDate() const {
+ return _responseDate;
+ }
/**
- * Representation of an in-progress network operation.
+ * Delivers the response, by invoking the onFinish callback passed into the constructor.
*/
- class NetworkInterfaceMock::NetworkOperation {
- public:
- NetworkOperation();
- NetworkOperation(const TaskExecutor::CallbackHandle& cbHandle,
- const RemoteCommandRequest& theRequest,
- Date_t theRequestDate,
- const RemoteCommandCompletionFn& onFinish);
- ~NetworkOperation();
-
- /**
- * Adjusts the stored virtual time at which this entry will be subject to consideration
- * by the test harness.
- */
- void setNextConsiderationDate(Date_t nextConsiderationDate);
-
- /**
- * Sets the response and thet virtual time at which it will be delivered.
- */
- void setResponse(Date_t responseDate, const TaskExecutor::ResponseStatus& response);
-
- /**
- * Predicate that returns true if cbHandle equals the executor's handle for this network
- * operation. Used for searching lists of NetworkOperations.
- */
- bool isForCallback(const TaskExecutor::CallbackHandle& cbHandle) const {
- return cbHandle == _cbHandle;
- }
-
- /**
- * Gets the request that initiated this operation.
- */
- const RemoteCommandRequest& getRequest() const { return _request; }
-
- /**
- * Gets the virtual time at which the operation was started.
- */
- Date_t getRequestDate() const { return _requestDate; }
-
- /**
- * Gets the virtual time at which the test harness should next consider what to do
- * with this request.
- */
- Date_t getNextConsiderationDate() const { return _nextConsiderationDate; }
-
- /**
- * After setResponse() has been called, returns the virtual time at which
- * the response should be delivered.
- */
- Date_t getResponseDate() const { return _responseDate; }
-
- /**
- * Delivers the response, by invoking the onFinish callback passed into the constructor.
- */
- void finishResponse();
-
- private:
- Date_t _requestDate;
- Date_t _nextConsiderationDate;
- Date_t _responseDate;
- TaskExecutor::CallbackHandle _cbHandle;
- RemoteCommandRequest _request;
- TaskExecutor::ResponseStatus _response;
- RemoteCommandCompletionFn _onFinish;
- };
+ void finishResponse();
+
+private:
+ Date_t _requestDate;
+ Date_t _nextConsiderationDate;
+ Date_t _responseDate;
+ TaskExecutor::CallbackHandle _cbHandle;
+ RemoteCommandRequest _request;
+ TaskExecutor::ResponseStatus _response;
+ RemoteCommandCompletionFn _onFinish;
+};
} // namespace executor
} // namespace mongo
diff --git a/src/mongo/executor/task_executor.cpp b/src/mongo/executor/task_executor.cpp
index b7f1e4f99bf..1ecbd0f1e62 100644
--- a/src/mongo/executor/task_executor.cpp
+++ b/src/mongo/executor/task_executor.cpp
@@ -33,63 +33,53 @@
namespace mongo {
namespace executor {
- TaskExecutor::TaskExecutor() = default;
- TaskExecutor::~TaskExecutor() = default;
-
- TaskExecutor::CallbackState::CallbackState() = default;
- TaskExecutor::CallbackState::~CallbackState() = default;
-
- TaskExecutor::CallbackHandle::CallbackHandle() = default;
- TaskExecutor::CallbackHandle::CallbackHandle(std::shared_ptr<CallbackState> callback) :
- _callback(std::move(callback)) {}
-
- TaskExecutor::EventState::EventState() = default;
- TaskExecutor::EventState::~EventState() = default;
-
- TaskExecutor::EventHandle::EventHandle() = default;
- TaskExecutor::EventHandle::EventHandle(std::shared_ptr<EventState> event) :
- _event(std::move(event)) {}
-
- TaskExecutor::CallbackArgs::CallbackArgs(TaskExecutor* theExecutor,
- const CallbackHandle& theHandle,
- const Status& theStatus,
- OperationContext* theTxn) :
- executor(theExecutor),
- myHandle(theHandle),
- status(theStatus),
- txn(theTxn) {
- }
-
-
- TaskExecutor::RemoteCommandCallbackArgs::RemoteCommandCallbackArgs(
- TaskExecutor* theExecutor,
- const CallbackHandle& theHandle,
- const RemoteCommandRequest& theRequest,
- const ResponseStatus& theResponse) :
- executor(theExecutor),
- myHandle(theHandle),
- request(theRequest),
- response(theResponse) {
- }
-
- TaskExecutor::CallbackState* TaskExecutor::getCallbackFromHandle(
- const CallbackHandle& cbHandle) {
- return cbHandle.getCallback();
- }
-
- TaskExecutor::EventState* TaskExecutor::getEventFromHandle(const EventHandle& eventHandle) {
- return eventHandle.getEvent();
- }
-
- void TaskExecutor::setEventForHandle(EventHandle* eventHandle,
- std::shared_ptr<EventState> event) {
- eventHandle->setEvent(std::move(event));
- }
-
- void TaskExecutor::setCallbackForHandle(CallbackHandle* cbHandle,
- std::shared_ptr<CallbackState> callback) {
- cbHandle->setCallback(std::move(callback));
- }
-
-} // namespace executor
-} // namespace mongo
+TaskExecutor::TaskExecutor() = default;
+TaskExecutor::~TaskExecutor() = default;
+
+TaskExecutor::CallbackState::CallbackState() = default;
+TaskExecutor::CallbackState::~CallbackState() = default;
+
+TaskExecutor::CallbackHandle::CallbackHandle() = default;
+TaskExecutor::CallbackHandle::CallbackHandle(std::shared_ptr<CallbackState> callback)
+ : _callback(std::move(callback)) {}
+
+TaskExecutor::EventState::EventState() = default;
+TaskExecutor::EventState::~EventState() = default;
+
+TaskExecutor::EventHandle::EventHandle() = default;
+TaskExecutor::EventHandle::EventHandle(std::shared_ptr<EventState> event)
+ : _event(std::move(event)) {}
+
+TaskExecutor::CallbackArgs::CallbackArgs(TaskExecutor* theExecutor,
+ const CallbackHandle& theHandle,
+ const Status& theStatus,
+ OperationContext* theTxn)
+ : executor(theExecutor), myHandle(theHandle), status(theStatus), txn(theTxn) {}
+
+
+TaskExecutor::RemoteCommandCallbackArgs::RemoteCommandCallbackArgs(
+ TaskExecutor* theExecutor,
+ const CallbackHandle& theHandle,
+ const RemoteCommandRequest& theRequest,
+ const ResponseStatus& theResponse)
+ : executor(theExecutor), myHandle(theHandle), request(theRequest), response(theResponse) {}
+
+TaskExecutor::CallbackState* TaskExecutor::getCallbackFromHandle(const CallbackHandle& cbHandle) {
+ return cbHandle.getCallback();
+}
+
+TaskExecutor::EventState* TaskExecutor::getEventFromHandle(const EventHandle& eventHandle) {
+ return eventHandle.getEvent();
+}
+
+void TaskExecutor::setEventForHandle(EventHandle* eventHandle, std::shared_ptr<EventState> event) {
+ eventHandle->setEvent(std::move(event));
+}
+
+void TaskExecutor::setCallbackForHandle(CallbackHandle* cbHandle,
+ std::shared_ptr<CallbackState> callback) {
+ cbHandle->setCallback(std::move(callback));
+}
+
+} // namespace executor
+} // namespace mongo
diff --git a/src/mongo/executor/task_executor.h b/src/mongo/executor/task_executor.h
index 5f42ebeee08..46e195bd7a9 100644
--- a/src/mongo/executor/task_executor.h
+++ b/src/mongo/executor/task_executor.h
@@ -41,328 +41,318 @@
namespace mongo {
- class OperationContext;
+class OperationContext;
namespace executor {
+/**
+ * Generic event loop with notions of events and callbacks.
+ *
+ * Callbacks represent work to be performed by the executor.
+ * They may be scheduled by client threads or by other callbacks. Methods that
+ * schedule callbacks return a CallbackHandle if they are able to enqueue the callback in the
+ * appropriate work queue. Every CallbackHandle represents an invocation of a function that
+ * will happen before the executor goes out of scope. Calling cancel(CallbackHandle) schedules
+ * the specified callback to run with a flag indicating that it is "canceled," but it will run.
+ * Client threads may block waiting for a callback to execute by calling wait(CallbackHandle).
+ *
+ * Events are level-triggered and may only be signaled one time. Client threads and callbacks
+ * may schedule callbacks to be run by the executor after the event is signaled, and client
+ * threads may ask the executor to block them until after the event is signaled.
+ *
+ * If an event is unsignaled when shutdown is called, the executor will ensure that any threads
+ * blocked in waitForEvent() eventually return.
+ *
+ * Logically, Callbacks and Events exist for the life of the executor. That means that while
+ * the executor is in scope, no CallbackHandle or EventHandle is stale.
+ */
+class TaskExecutor {
+ MONGO_DISALLOW_COPYING(TaskExecutor);
+
+public:
+ struct CallbackArgs;
+ struct RemoteCommandCallbackArgs;
+ class CallbackState;
+ class CallbackHandle;
+ class EventState;
+ class EventHandle;
+
+ using ResponseStatus = StatusWith<RemoteCommandResponse>;
+
/**
- * Generic event loop with notions of events and callbacks.
+ * Type of a regular callback function.
*
- * Callbacks represent work to be performed by the executor.
- * They may be scheduled by client threads or by other callbacks. Methods that
- * schedule callbacks return a CallbackHandle if they are able to enqueue the callback in the
- * appropriate work queue. Every CallbackHandle represents an invocation of a function that
- * will happen before the executor goes out of scope. Calling cancel(CallbackHandle) schedules
- * the specified callback to run with a flag indicating that it is "canceled," but it will run.
- * Client threads may block waiting for a callback to execute by calling wait(CallbackHandle).
+ * The status argument passed at invocation will have code ErrorCodes::CallbackCanceled if
+ * the callback was canceled for any reason (including shutdown). Otherwise, it should have
+ * Status::OK().
+ */
+ using CallbackFn = stdx::function<void(const CallbackArgs&)>;
+
+ /**
+ * Type of a callback from a request to run a command on a remote MongoDB node.
*
- * Events are level-triggered and may only be signaled one time. Client threads and callbacks
- * may schedule callbacks to be run by the executor after the event is signaled, and client
- * threads may ask the executor to block them until after the event is signaled.
+ * The StatusWith<const BSONObj> will have ErrorCodes::CallbackCanceled if the callback was
+ * canceled. Otherwise, its status will represent any failure to execute the command.
+ * If the command executed and a response came back, then the status object will contain
+ * the BSONObj returned by the command, with the "ok" field indicating the success of the
+ * command in the usual way.
+ */
+ using RemoteCommandCallbackFn = stdx::function<void(const RemoteCommandCallbackArgs&)>;
+
+ virtual ~TaskExecutor();
+
+ /**
+ * Signals to the executor that it should shut down.
+ */
+ virtual void shutdown() = 0;
+
+ /**
+ * Returns diagnostic information.
+ */
+ virtual std::string getDiagnosticString() = 0;
+
+ /**
+ * Gets the current time. Callbacks should use this method to read the system clock.
+ */
+ virtual Date_t now() = 0;
+
+ /**
+ * Creates a new event. Returns a handle to the event, or ErrorCodes::ShutdownInProgress if
+ * makeEvent() fails because the executor is shutting down.
+ *
+ * May be called by client threads or callbacks running in the executor.
+ */
+ virtual StatusWith<EventHandle> makeEvent() = 0;
+
+ /**
+ * Signals the event, making waiting client threads and callbacks runnable.
*
- * If an event is unsignaled when shutdown is called, the executor will ensure that any threads
- * blocked in waitForEvent() eventually return.
+ * May be called up to one time per event.
*
- * Logically, Callbacks and Events exist for the life of the executor. That means that while
- * the executor is in scope, no CallbackHandle or EventHandle is stale.
+ * May be called by client threads or callbacks running in the executor.
*/
- class TaskExecutor {
- MONGO_DISALLOW_COPYING(TaskExecutor);
- public:
- struct CallbackArgs;
- struct RemoteCommandCallbackArgs;
- class CallbackState;
- class CallbackHandle;
- class EventState;
- class EventHandle;
-
- using ResponseStatus = StatusWith<RemoteCommandResponse>;
-
- /**
- * Type of a regular callback function.
- *
- * The status argument passed at invocation will have code ErrorCodes::CallbackCanceled if
- * the callback was canceled for any reason (including shutdown). Otherwise, it should have
- * Status::OK().
- */
- using CallbackFn = stdx::function<void (const CallbackArgs&)>;
-
- /**
- * Type of a callback from a request to run a command on a remote MongoDB node.
- *
- * The StatusWith<const BSONObj> will have ErrorCodes::CallbackCanceled if the callback was
- * canceled. Otherwise, its status will represent any failure to execute the command.
- * If the command executed and a response came back, then the status object will contain
- * the BSONObj returned by the command, with the "ok" field indicating the success of the
- * command in the usual way.
- */
- using RemoteCommandCallbackFn = stdx::function<void (const RemoteCommandCallbackArgs&)>;
-
- virtual ~TaskExecutor();
-
- /**
- * Signals to the executor that it should shut down.
- */
- virtual void shutdown() = 0;
-
- /**
- * Returns diagnostic information.
- */
- virtual std::string getDiagnosticString() = 0;
-
- /**
- * Gets the current time. Callbacks should use this method to read the system clock.
- */
- virtual Date_t now() = 0;
-
- /**
- * Creates a new event. Returns a handle to the event, or ErrorCodes::ShutdownInProgress if
- * makeEvent() fails because the executor is shutting down.
- *
- * May be called by client threads or callbacks running in the executor.
- */
- virtual StatusWith<EventHandle> makeEvent() = 0;
-
- /**
- * Signals the event, making waiting client threads and callbacks runnable.
- *
- * May be called up to one time per event.
- *
- * May be called by client threads or callbacks running in the executor.
- */
- virtual void signalEvent(const EventHandle& event) = 0;
-
- /**
- * Schedules a callback, "work", to run after "event" is signaled. If "event"
- * has already been signaled, marks "work" as immediately runnable.
- *
- * If "event" has yet to be signaled when "shutdown()" is called, "work" will
- * be scheduled with a status of ErrorCodes::CallbackCanceled.
- *
- * May be called by client threads or callbacks running in the executor.
- */
- virtual StatusWith<CallbackHandle> onEvent(const EventHandle& event,
- const CallbackFn& work) = 0;
-
- /**
- * Blocks the calling thread until after "event" is signaled. Also returns
- * if the event is never signaled but shutdown() is called on the executor.
- *
- * NOTE: Do not call from a callback running in the executor.
- *
- * TODO(schwerin): Change return type so that the caller can know which of the two reasons
- * led to this method returning.
- */
- virtual void waitForEvent(const EventHandle& event) = 0;
-
- /**
- * Schedules "work" to be run by the executor ASAP.
- *
- * Returns a handle for waiting on or canceling the callback, or
- * ErrorCodes::ShutdownInProgress.
- *
- * May be called by client threads or callbacks running in the executor.
- */
- virtual StatusWith<CallbackHandle> scheduleWork(const CallbackFn& work) = 0;
-
- /**
- * Schedules "work" to be run by the executor no sooner than "when".
- *
- * Returns a handle for waiting on or canceling the callback, or
- * ErrorCodes::ShutdownInProgress.
- *
- * May be called by client threads or callbacks running in the executor.
- */
- virtual StatusWith<CallbackHandle> scheduleWorkAt(Date_t when, const CallbackFn& work) = 0;
-
- /**
- * Schedules "cb" to be run by the executor with the result of executing the remote command
- * described by "request".
- *
- * Returns a handle for waiting on or canceling the callback, or
- * ErrorCodes::ShutdownInProgress.
- *
- * May be called by client threads or callbacks running in the executor.
- */
- virtual StatusWith<CallbackHandle> scheduleRemoteCommand(
- const RemoteCommandRequest& request,
- const RemoteCommandCallbackFn& cb) = 0;
-
- /**
- * If the callback referenced by "cbHandle" hasn't already executed, marks it as
- * canceled and runnable.
- *
- * May be called by client threads or callbacks running in the executor.
- */
- virtual void cancel(const CallbackHandle& cbHandle) = 0;
-
- /**
- * Blocks until the executor finishes running the callback referenced by "cbHandle".
- *
- * Because callbacks all run during shutdown if they weren't run beforehand, there is no need
- * to indicate the reason for returning from wait(CallbackHandle). It is always that the
- * callback ran.
- *
- * NOTE: Do not call from a callback running in the executor.
- */
- virtual void wait(const CallbackHandle& cbHandle) = 0;
-
- protected:
-
- TaskExecutor();
-
- // Retrieves the Callback from a given CallbackHandle
- CallbackState* getCallbackFromHandle(const CallbackHandle& cbHandle);
-
- // Retrieves the Event from a given EventHandle
- EventState* getEventFromHandle(const EventHandle& eventHandle);
-
- // Sets the given CallbackHandle to point to the given callback.
- void setCallbackForHandle(CallbackHandle* cbHandle,
- std::shared_ptr<CallbackState> callback);
-
- // Sets the given EventHandle to point to the given event.
- void setEventForHandle(EventHandle* eventHandle, std::shared_ptr<EventState> event);
- };
+ virtual void signalEvent(const EventHandle& event) = 0;
/**
- * Class representing a scheduled callback and providing methods for interacting with it.
+ * Schedules a callback, "work", to run after "event" is signaled. If "event"
+ * has already been signaled, marks "work" as immediately runnable.
+ *
+ * If "event" has yet to be signaled when "shutdown()" is called, "work" will
+ * be scheduled with a status of ErrorCodes::CallbackCanceled.
+ *
+ * May be called by client threads or callbacks running in the executor.
*/
- class TaskExecutor::CallbackState {
- MONGO_DISALLOW_COPYING(CallbackState);
- public:
+ virtual StatusWith<CallbackHandle> onEvent(const EventHandle& event,
+ const CallbackFn& work) = 0;
- virtual ~CallbackState();
+ /**
+ * Blocks the calling thread until after "event" is signaled. Also returns
+ * if the event is never signaled but shutdown() is called on the executor.
+ *
+ * NOTE: Do not call from a callback running in the executor.
+ *
+ * TODO(schwerin): Change return type so that the caller can know which of the two reasons
+ * led to this method returning.
+ */
+ virtual void waitForEvent(const EventHandle& event) = 0;
- virtual void cancel() = 0;
- virtual void waitForCompletion() = 0;
+ /**
+ * Schedules "work" to be run by the executor ASAP.
+ *
+ * Returns a handle for waiting on or canceling the callback, or
+ * ErrorCodes::ShutdownInProgress.
+ *
+ * May be called by client threads or callbacks running in the executor.
+ */
+ virtual StatusWith<CallbackHandle> scheduleWork(const CallbackFn& work) = 0;
- protected:
+ /**
+ * Schedules "work" to be run by the executor no sooner than "when".
+ *
+ * Returns a handle for waiting on or canceling the callback, or
+ * ErrorCodes::ShutdownInProgress.
+ *
+ * May be called by client threads or callbacks running in the executor.
+ */
+ virtual StatusWith<CallbackHandle> scheduleWorkAt(Date_t when, const CallbackFn& work) = 0;
- CallbackState();
+ /**
+ * Schedules "cb" to be run by the executor with the result of executing the remote command
+ * described by "request".
+ *
+ * Returns a handle for waiting on or canceling the callback, or
+ * ErrorCodes::ShutdownInProgress.
+ *
+ * May be called by client threads or callbacks running in the executor.
+ */
+ virtual StatusWith<CallbackHandle> scheduleRemoteCommand(const RemoteCommandRequest& request,
+ const RemoteCommandCallbackFn& cb) = 0;
- };
+ /**
+ * If the callback referenced by "cbHandle" hasn't already executed, marks it as
+ * canceled and runnable.
+ *
+ * May be called by client threads or callbacks running in the executor.
+ */
+ virtual void cancel(const CallbackHandle& cbHandle) = 0;
/**
- * Handle to a CallbackState.
+ * Blocks until the executor finishes running the callback referenced by "cbHandle".
+ *
+ * Because callbacks all run during shutdown if they weren't run beforehand, there is no need
+ * to indicate the reason for returning from wait(CallbackHandle). It is always that the
+ * callback ran.
+ *
+ * NOTE: Do not call from a callback running in the executor.
*/
- class TaskExecutor::CallbackHandle {
- friend class TaskExecutor;
+ virtual void wait(const CallbackHandle& cbHandle) = 0;
- public:
+protected:
+ TaskExecutor();
- CallbackHandle();
- explicit CallbackHandle(std::shared_ptr<CallbackState> cbData);
+ // Retrieves the Callback from a given CallbackHandle
+ CallbackState* getCallbackFromHandle(const CallbackHandle& cbHandle);
- bool operator==(const CallbackHandle &other) const {
- return _callback == other._callback;
- }
+ // Retrieves the Event from a given EventHandle
+ EventState* getEventFromHandle(const EventHandle& eventHandle);
- bool operator!=(const CallbackHandle &other) const {
- return !(*this == other);
- }
+ // Sets the given CallbackHandle to point to the given callback.
+ void setCallbackForHandle(CallbackHandle* cbHandle, std::shared_ptr<CallbackState> callback);
- bool isValid() const {
- return _callback.get();
- }
+ // Sets the given EventHandle to point to the given event.
+ void setEventForHandle(EventHandle* eventHandle, std::shared_ptr<EventState> event);
+};
- private:
+/**
+ * Class representing a scheduled callback and providing methods for interacting with it.
+ */
+class TaskExecutor::CallbackState {
+ MONGO_DISALLOW_COPYING(CallbackState);
- void setCallback(std::shared_ptr<CallbackState> callback) {
- _callback = callback;
- }
+public:
+ virtual ~CallbackState();
- CallbackState* getCallback() const {
- return _callback.get();
- }
+ virtual void cancel() = 0;
+ virtual void waitForCompletion() = 0;
- std::shared_ptr<CallbackState> _callback;
- };
+protected:
+ CallbackState();
+};
- /**
- * Class representing a scheduled event and providing methods for interacting with it.
- */
- class TaskExecutor::EventState {
- MONGO_DISALLOW_COPYING(EventState);
- public:
+/**
+ * Handle to a CallbackState.
+ */
+class TaskExecutor::CallbackHandle {
+ friend class TaskExecutor;
- virtual ~EventState();
+public:
+ CallbackHandle();
+ explicit CallbackHandle(std::shared_ptr<CallbackState> cbData);
- virtual void signal() = 0;
- virtual void waitUntilSignaled() = 0;
- virtual bool isSignaled() = 0;
+ bool operator==(const CallbackHandle& other) const {
+ return _callback == other._callback;
+ }
- protected:
+ bool operator!=(const CallbackHandle& other) const {
+ return !(*this == other);
+ }
- EventState();
- };
+ bool isValid() const {
+ return _callback.get();
+ }
- /**
- * Handle to an EventState.
- */
- class TaskExecutor::EventHandle {
- friend class TaskExecutor;
+private:
+ void setCallback(std::shared_ptr<CallbackState> callback) {
+ _callback = callback;
+ }
- public:
+ CallbackState* getCallback() const {
+ return _callback.get();
+ }
- EventHandle();
- explicit EventHandle(std::shared_ptr<EventState> event);
+ std::shared_ptr<CallbackState> _callback;
+};
- bool operator==(const EventHandle &other) const {
- return _event == other._event;
- }
+/**
+ * Class representing a scheduled event and providing methods for interacting with it.
+ */
+class TaskExecutor::EventState {
+ MONGO_DISALLOW_COPYING(EventState);
- bool operator!=(const EventHandle &other) const {
- return !(*this == other);
- }
+public:
+ virtual ~EventState();
- bool isValid() const {
- return _event.get();
- }
+ virtual void signal() = 0;
+ virtual void waitUntilSignaled() = 0;
+ virtual bool isSignaled() = 0;
- private:
+protected:
+ EventState();
+};
- void setEvent(std::shared_ptr<EventState> event) {
- _event = event;
- }
+/**
+ * Handle to an EventState.
+ */
+class TaskExecutor::EventHandle {
+ friend class TaskExecutor;
- EventState* getEvent() const {
- return _event.get();
- }
+public:
+ EventHandle();
+ explicit EventHandle(std::shared_ptr<EventState> event);
- std::shared_ptr<EventState> _event;
- };
+ bool operator==(const EventHandle& other) const {
+ return _event == other._event;
+ }
- /**
- * Argument passed to all callbacks scheduled via a TaskExecutor.
- */
- struct TaskExecutor::CallbackArgs {
- CallbackArgs(TaskExecutor* theExecutor,
- const CallbackHandle& theHandle,
- const Status& theStatus,
- OperationContext* txn = NULL);
-
- TaskExecutor* executor;
- CallbackHandle myHandle;
- Status status;
- OperationContext* txn;
- };
+ bool operator!=(const EventHandle& other) const {
+ return !(*this == other);
+ }
- /**
- * Argument passed to all remote command callbacks scheduled via a TaskExecutor.
- */
- struct TaskExecutor::RemoteCommandCallbackArgs {
- RemoteCommandCallbackArgs(TaskExecutor* theExecutor,
- const CallbackHandle& theHandle,
- const RemoteCommandRequest& theRequest,
- const StatusWith<RemoteCommandResponse>& theResponse);
+ bool isValid() const {
+ return _event.get();
+ }
- TaskExecutor* executor;
- CallbackHandle myHandle;
- RemoteCommandRequest request;
- StatusWith<RemoteCommandResponse> response;
- };
+private:
+ void setEvent(std::shared_ptr<EventState> event) {
+ _event = event;
+ }
-} // namespace executor
-} // namespace mongo
+ EventState* getEvent() const {
+ return _event.get();
+ }
+ std::shared_ptr<EventState> _event;
+};
+
+/**
+ * Argument passed to all callbacks scheduled via a TaskExecutor.
+ */
+struct TaskExecutor::CallbackArgs {
+ CallbackArgs(TaskExecutor* theExecutor,
+ const CallbackHandle& theHandle,
+ const Status& theStatus,
+ OperationContext* txn = NULL);
+
+ TaskExecutor* executor;
+ CallbackHandle myHandle;
+ Status status;
+ OperationContext* txn;
+};
+
+/**
+ * Argument passed to all remote command callbacks scheduled via a TaskExecutor.
+ */
+struct TaskExecutor::RemoteCommandCallbackArgs {
+ RemoteCommandCallbackArgs(TaskExecutor* theExecutor,
+ const CallbackHandle& theHandle,
+ const RemoteCommandRequest& theRequest,
+ const StatusWith<RemoteCommandResponse>& theResponse);
+
+ TaskExecutor* executor;
+ CallbackHandle myHandle;
+ RemoteCommandRequest request;
+ StatusWith<RemoteCommandResponse> response;
+};
+
+} // namespace executor
+} // namespace mongo