summaryrefslogtreecommitdiff
path: root/src/mongo/executor/network_interface_impl.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/executor/network_interface_impl.cpp')
-rw-r--r--src/mongo/executor/network_interface_impl.cpp376
1 files changed, 182 insertions, 194 deletions
diff --git a/src/mongo/executor/network_interface_impl.cpp b/src/mongo/executor/network_interface_impl.cpp
index 4e443a45165..0b997f7f219 100644
--- a/src/mongo/executor/network_interface_impl.cpp
+++ b/src/mongo/executor/network_interface_impl.cpp
@@ -46,232 +46,220 @@ namespace executor {
namespace {
- const size_t kMinThreads = 1;
- const size_t kMaxThreads = 51; // Set to 1 + max repl set size, for heartbeat + wiggle room.
- const Seconds kMaxIdleThreadAge(30);
+const size_t kMinThreads = 1;
+const size_t kMaxThreads = 51; // Set to 1 + max repl set size, for heartbeat + wiggle room.
+const Seconds kMaxIdleThreadAge(30);
} // namespace
- NetworkInterfaceImpl::NetworkInterfaceImpl() :
- NetworkInterface(),
- _numIdleThreads(0),
- _nextThreadId(0),
- _lastFullUtilizationDate(),
- _isExecutorRunnable(false),
- _inShutdown(false),
- _commandRunner(kMessagingPortKeepOpen),
- _numActiveNetworkRequests(0) {
+NetworkInterfaceImpl::NetworkInterfaceImpl()
+ : NetworkInterface(),
+ _numIdleThreads(0),
+ _nextThreadId(0),
+ _lastFullUtilizationDate(),
+ _isExecutorRunnable(false),
+ _inShutdown(false),
+ _commandRunner(kMessagingPortKeepOpen),
+ _numActiveNetworkRequests(0) {}
- }
-
- NetworkInterfaceImpl::~NetworkInterfaceImpl() { }
+NetworkInterfaceImpl::~NetworkInterfaceImpl() {}
- std::string NetworkInterfaceImpl::getDiagnosticString() {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- str::stream output;
- output << "NetworkImpl";
- output << " threads:" << _threads.size();
- output << " inShutdown:" << _inShutdown;
- output << " active:" << _numActiveNetworkRequests;
- output << " pending:" << _pending.size();
- output << " execRunable:" << _isExecutorRunnable;
- return output;
+std::string NetworkInterfaceImpl::getDiagnosticString() {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ str::stream output;
+ output << "NetworkImpl";
+ output << " threads:" << _threads.size();
+ output << " inShutdown:" << _inShutdown;
+ output << " active:" << _numActiveNetworkRequests;
+ output << " pending:" << _pending.size();
+ output << " execRunable:" << _isExecutorRunnable;
+ return output;
+}
+void NetworkInterfaceImpl::_startNewNetworkThread_inlock() {
+ if (_inShutdown) {
+ LOG(1) << "Not starting new replication networking thread while shutting down replication.";
+ return;
}
-
- void NetworkInterfaceImpl::_startNewNetworkThread_inlock() {
- if (_inShutdown) {
- LOG(1) <<
- "Not starting new replication networking thread while shutting down replication.";
- return;
- }
- if (_threads.size() >= kMaxThreads) {
- LOG(1) << "Not starting new replication networking thread because " << kMaxThreads <<
- " are already running; " << _numIdleThreads << " threads are idle and " <<
- _pending.size() << " network requests are waiting for a thread to serve them.";
- return;
- }
- const std::string threadName(str::stream() << "ReplExecNetThread-" << _nextThreadId++);
- try {
- _threads.push_back(
- std::make_shared<stdx::thread>(
- stdx::bind(&NetworkInterfaceImpl::_requestProcessorThreadBody,
- this,
- threadName)));
- ++_numIdleThreads;
- }
- catch (const std::exception& ex) {
- error() << "Failed to start " << threadName << "; " << _threads.size() <<
- " other network threads still running; caught exception: " << ex.what();
- }
+ if (_threads.size() >= kMaxThreads) {
+ LOG(1) << "Not starting new replication networking thread because " << kMaxThreads
+ << " are already running; " << _numIdleThreads << " threads are idle and "
+ << _pending.size() << " network requests are waiting for a thread to serve them.";
+ return;
}
-
- void NetworkInterfaceImpl::startup() {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- invariant(!_inShutdown);
- if (!_threads.empty()) {
- return;
- }
- for (size_t i = 0; i < kMinThreads; ++i) {
- _startNewNetworkThread_inlock();
- }
+ const std::string threadName(str::stream() << "ReplExecNetThread-" << _nextThreadId++);
+ try {
+ _threads.push_back(std::make_shared<stdx::thread>(
+ stdx::bind(&NetworkInterfaceImpl::_requestProcessorThreadBody, this, threadName)));
+ ++_numIdleThreads;
+ } catch (const std::exception& ex) {
+ error() << "Failed to start " << threadName << "; " << _threads.size()
+ << " other network threads still running; caught exception: " << ex.what();
}
+}
- void NetworkInterfaceImpl::shutdown() {
- using std::swap;
- stdx::unique_lock<stdx::mutex> lk(_mutex);
- _inShutdown = true;
- _hasPending.notify_all();
- ThreadList threadsToJoin;
- swap(threadsToJoin, _threads);
- lk.unlock();
- _commandRunner.shutdown();
- std::for_each(threadsToJoin.begin(),
- threadsToJoin.end(),
- stdx::bind(&stdx::thread::join, stdx::placeholders::_1));
+void NetworkInterfaceImpl::startup() {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ invariant(!_inShutdown);
+ if (!_threads.empty()) {
+ return;
}
-
- void NetworkInterfaceImpl::signalWorkAvailable() {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- _signalWorkAvailable_inlock();
+ for (size_t i = 0; i < kMinThreads; ++i) {
+ _startNewNetworkThread_inlock();
}
+}
- void NetworkInterfaceImpl::_signalWorkAvailable_inlock() {
- if (!_isExecutorRunnable) {
- _isExecutorRunnable = true;
- _isExecutorRunnableCondition.notify_one();
- }
+void NetworkInterfaceImpl::shutdown() {
+ using std::swap;
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+ _inShutdown = true;
+ _hasPending.notify_all();
+ ThreadList threadsToJoin;
+ swap(threadsToJoin, _threads);
+ lk.unlock();
+ _commandRunner.shutdown();
+ std::for_each(threadsToJoin.begin(),
+ threadsToJoin.end(),
+ stdx::bind(&stdx::thread::join, stdx::placeholders::_1));
+}
+
+void NetworkInterfaceImpl::signalWorkAvailable() {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ _signalWorkAvailable_inlock();
+}
+
+void NetworkInterfaceImpl::_signalWorkAvailable_inlock() {
+ if (!_isExecutorRunnable) {
+ _isExecutorRunnable = true;
+ _isExecutorRunnableCondition.notify_one();
}
+}
- void NetworkInterfaceImpl::waitForWork() {
- stdx::unique_lock<stdx::mutex> lk(_mutex);
- while (!_isExecutorRunnable) {
- _isExecutorRunnableCondition.wait(lk);
- }
- _isExecutorRunnable = false;
+void NetworkInterfaceImpl::waitForWork() {
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+ while (!_isExecutorRunnable) {
+ _isExecutorRunnableCondition.wait(lk);
}
+ _isExecutorRunnable = false;
+}
- void NetworkInterfaceImpl::waitForWorkUntil(Date_t when) {
- stdx::unique_lock<stdx::mutex> lk(_mutex);
- while (!_isExecutorRunnable) {
- const Milliseconds waitTime(when - now());
- if (waitTime <= Milliseconds(0)) {
- break;
- }
- _isExecutorRunnableCondition.wait_for(lk, waitTime);
+void NetworkInterfaceImpl::waitForWorkUntil(Date_t when) {
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+ while (!_isExecutorRunnable) {
+ const Milliseconds waitTime(when - now());
+ if (waitTime <= Milliseconds(0)) {
+ break;
}
- _isExecutorRunnable = false;
+ _isExecutorRunnableCondition.wait_for(lk, waitTime);
}
+ _isExecutorRunnable = false;
+}
- void NetworkInterfaceImpl::_requestProcessorThreadBody(
- NetworkInterfaceImpl* net,
- const std::string& threadName) {
- setThreadName(threadName);
- LOG(1) << "thread starting";
- net->_consumeNetworkRequests();
+void NetworkInterfaceImpl::_requestProcessorThreadBody(NetworkInterfaceImpl* net,
+ const std::string& threadName) {
+ setThreadName(threadName);
+ LOG(1) << "thread starting";
+ net->_consumeNetworkRequests();
- // At this point, another thread may have destroyed "net", if this thread chose to detach
- // itself and remove itself from net->_threads before releasing net->_mutex. Do not access
- // member variables of "net" from here, on.
- LOG(1) << "thread shutting down";
- }
+ // At this point, another thread may have destroyed "net", if this thread chose to detach
+ // itself and remove itself from net->_threads before releasing net->_mutex. Do not access
+ // member variables of "net" from here, on.
+ LOG(1) << "thread shutting down";
+}
- void NetworkInterfaceImpl::_consumeNetworkRequests() {
- stdx::unique_lock<stdx::mutex> lk(_mutex);
- while (!_inShutdown) {
- if (_pending.empty()) {
- if (_threads.size() > kMinThreads) {
- const Date_t nowDate = now();
- const Date_t nextThreadRetirementDate =
- _lastFullUtilizationDate + kMaxIdleThreadAge;
- if (nowDate > nextThreadRetirementDate) {
- _lastFullUtilizationDate = nowDate;
- break;
- }
+void NetworkInterfaceImpl::_consumeNetworkRequests() {
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+ while (!_inShutdown) {
+ if (_pending.empty()) {
+ if (_threads.size() > kMinThreads) {
+ const Date_t nowDate = now();
+ const Date_t nextThreadRetirementDate =
+ _lastFullUtilizationDate + kMaxIdleThreadAge;
+ if (nowDate > nextThreadRetirementDate) {
+ _lastFullUtilizationDate = nowDate;
+ break;
}
- _hasPending.wait_for(lk, kMaxIdleThreadAge);
- continue;
}
- CommandData todo = _pending.front();
- _pending.pop_front();
- ++_numActiveNetworkRequests;
- --_numIdleThreads;
- lk.unlock();
- TaskExecutor::ResponseStatus result = _commandRunner.runCommand(todo.request);
- LOG(2) << "Network status of sending " << todo.request.cmdObj.firstElementFieldName() <<
- " to " << todo.request.target << " was " << result.getStatus();
- todo.onFinish(result);
- lk.lock();
- --_numActiveNetworkRequests;
- ++_numIdleThreads;
- _signalWorkAvailable_inlock();
+ _hasPending.wait_for(lk, kMaxIdleThreadAge);
+ continue;
}
+ CommandData todo = _pending.front();
+ _pending.pop_front();
+ ++_numActiveNetworkRequests;
--_numIdleThreads;
- if (_inShutdown) {
- return;
- }
- // This thread is ending because it was idle for too long.
- // Find self in _threads, remove self from _threads, detach self.
- for (size_t i = 0; i < _threads.size(); ++i) {
- if (_threads[i]->get_id() != stdx::this_thread::get_id()) {
- continue;
- }
- _threads[i]->detach();
- _threads[i].swap(_threads.back());
- _threads.pop_back();
- return;
+ lk.unlock();
+ TaskExecutor::ResponseStatus result = _commandRunner.runCommand(todo.request);
+ LOG(2) << "Network status of sending " << todo.request.cmdObj.firstElementFieldName()
+ << " to " << todo.request.target << " was " << result.getStatus();
+ todo.onFinish(result);
+ lk.lock();
+ --_numActiveNetworkRequests;
+ ++_numIdleThreads;
+ _signalWorkAvailable_inlock();
+ }
+ --_numIdleThreads;
+ if (_inShutdown) {
+ return;
+ }
+ // This thread is ending because it was idle for too long.
+ // Find self in _threads, remove self from _threads, detach self.
+ for (size_t i = 0; i < _threads.size(); ++i) {
+ if (_threads[i]->get_id() != stdx::this_thread::get_id()) {
+ continue;
}
- severe().stream() << "Could not find this thread, with id " <<
- stdx::this_thread::get_id() << " in the replication networking thread pool";
- fassertFailedNoTrace(28676);
+ _threads[i]->detach();
+ _threads[i].swap(_threads.back());
+ _threads.pop_back();
+ return;
}
+ severe().stream() << "Could not find this thread, with id " << stdx::this_thread::get_id()
+ << " in the replication networking thread pool";
+ fassertFailedNoTrace(28676);
+}
- void NetworkInterfaceImpl::startCommand(
- const TaskExecutor::CallbackHandle& cbHandle,
- const RemoteCommandRequest& request,
- const RemoteCommandCompletionFn& onFinish) {
- LOG(2) << "Scheduling " << request.cmdObj.firstElementFieldName() << " to " <<
- request.target;
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- _pending.push_back(CommandData());
- CommandData& cd = _pending.back();
- cd.cbHandle = cbHandle;
- cd.request = request;
- cd.onFinish = onFinish;
- if (_numIdleThreads < _pending.size()) {
- _startNewNetworkThread_inlock();
- }
- if (_numIdleThreads <= _pending.size()) {
- _lastFullUtilizationDate = Date_t::now();
- }
- _hasPending.notify_one();
+void NetworkInterfaceImpl::startCommand(const TaskExecutor::CallbackHandle& cbHandle,
+ const RemoteCommandRequest& request,
+ const RemoteCommandCompletionFn& onFinish) {
+ LOG(2) << "Scheduling " << request.cmdObj.firstElementFieldName() << " to " << request.target;
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ _pending.push_back(CommandData());
+ CommandData& cd = _pending.back();
+ cd.cbHandle = cbHandle;
+ cd.request = request;
+ cd.onFinish = onFinish;
+ if (_numIdleThreads < _pending.size()) {
+ _startNewNetworkThread_inlock();
+ }
+ if (_numIdleThreads <= _pending.size()) {
+ _lastFullUtilizationDate = Date_t::now();
}
+ _hasPending.notify_one();
+}
- void NetworkInterfaceImpl::cancelCommand(
- const TaskExecutor::CallbackHandle& cbHandle) {
- stdx::unique_lock<stdx::mutex> lk(_mutex);
- CommandDataList::iterator iter;
- for (iter = _pending.begin(); iter != _pending.end(); ++iter) {
- if (iter->cbHandle == cbHandle) {
- break;
- }
+void NetworkInterfaceImpl::cancelCommand(const TaskExecutor::CallbackHandle& cbHandle) {
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+ CommandDataList::iterator iter;
+ for (iter = _pending.begin(); iter != _pending.end(); ++iter) {
+ if (iter->cbHandle == cbHandle) {
+ break;
}
- if (iter == _pending.end()) {
- return;
- }
- const RemoteCommandCompletionFn onFinish = iter->onFinish;
- LOG(2) << "Canceled sending " << iter->request.cmdObj.firstElementFieldName() << " to " <<
- iter->request.target;
- _pending.erase(iter);
- lk.unlock();
- onFinish(TaskExecutor::ResponseStatus(ErrorCodes::CallbackCanceled, "Callback canceled"));
- lk.lock();
- _signalWorkAvailable_inlock();
}
-
- Date_t NetworkInterfaceImpl::now() {
- return Date_t::now();
+ if (iter == _pending.end()) {
+ return;
}
+ const RemoteCommandCompletionFn onFinish = iter->onFinish;
+ LOG(2) << "Canceled sending " << iter->request.cmdObj.firstElementFieldName() << " to "
+ << iter->request.target;
+ _pending.erase(iter);
+ lk.unlock();
+ onFinish(TaskExecutor::ResponseStatus(ErrorCodes::CallbackCanceled, "Callback canceled"));
+ lk.lock();
+ _signalWorkAvailable_inlock();
+}
+
+Date_t NetworkInterfaceImpl::now() {
+ return Date_t::now();
+}
-} // namespace executor
-} // namespace mongo
+} // namespace executor
+} // namespace mongo