diff options
Diffstat (limited to 'src/mongo/executor/network_interface_impl.cpp')
-rw-r--r-- | src/mongo/executor/network_interface_impl.cpp | 376 |
1 files changed, 182 insertions, 194 deletions
diff --git a/src/mongo/executor/network_interface_impl.cpp b/src/mongo/executor/network_interface_impl.cpp index 4e443a45165..0b997f7f219 100644 --- a/src/mongo/executor/network_interface_impl.cpp +++ b/src/mongo/executor/network_interface_impl.cpp @@ -46,232 +46,220 @@ namespace executor { namespace { - const size_t kMinThreads = 1; - const size_t kMaxThreads = 51; // Set to 1 + max repl set size, for heartbeat + wiggle room. - const Seconds kMaxIdleThreadAge(30); +const size_t kMinThreads = 1; +const size_t kMaxThreads = 51; // Set to 1 + max repl set size, for heartbeat + wiggle room. +const Seconds kMaxIdleThreadAge(30); } // namespace - NetworkInterfaceImpl::NetworkInterfaceImpl() : - NetworkInterface(), - _numIdleThreads(0), - _nextThreadId(0), - _lastFullUtilizationDate(), - _isExecutorRunnable(false), - _inShutdown(false), - _commandRunner(kMessagingPortKeepOpen), - _numActiveNetworkRequests(0) { +NetworkInterfaceImpl::NetworkInterfaceImpl() + : NetworkInterface(), + _numIdleThreads(0), + _nextThreadId(0), + _lastFullUtilizationDate(), + _isExecutorRunnable(false), + _inShutdown(false), + _commandRunner(kMessagingPortKeepOpen), + _numActiveNetworkRequests(0) {} - } - - NetworkInterfaceImpl::~NetworkInterfaceImpl() { } +NetworkInterfaceImpl::~NetworkInterfaceImpl() {} - std::string NetworkInterfaceImpl::getDiagnosticString() { - stdx::lock_guard<stdx::mutex> lk(_mutex); - str::stream output; - output << "NetworkImpl"; - output << " threads:" << _threads.size(); - output << " inShutdown:" << _inShutdown; - output << " active:" << _numActiveNetworkRequests; - output << " pending:" << _pending.size(); - output << " execRunable:" << _isExecutorRunnable; - return output; +std::string NetworkInterfaceImpl::getDiagnosticString() { + stdx::lock_guard<stdx::mutex> lk(_mutex); + str::stream output; + output << "NetworkImpl"; + output << " threads:" << _threads.size(); + output << " inShutdown:" << _inShutdown; + output << " active:" << _numActiveNetworkRequests; + output << " pending:" << _pending.size(); + output << " execRunable:" << _isExecutorRunnable; + return output; +} +void NetworkInterfaceImpl::_startNewNetworkThread_inlock() { + if (_inShutdown) { + LOG(1) << "Not starting new replication networking thread while shutting down replication."; + return; } - - void NetworkInterfaceImpl::_startNewNetworkThread_inlock() { - if (_inShutdown) { - LOG(1) << - "Not starting new replication networking thread while shutting down replication."; - return; - } - if (_threads.size() >= kMaxThreads) { - LOG(1) << "Not starting new replication networking thread because " << kMaxThreads << - " are already running; " << _numIdleThreads << " threads are idle and " << - _pending.size() << " network requests are waiting for a thread to serve them."; - return; - } - const std::string threadName(str::stream() << "ReplExecNetThread-" << _nextThreadId++); - try { - _threads.push_back( - std::make_shared<stdx::thread>( - stdx::bind(&NetworkInterfaceImpl::_requestProcessorThreadBody, - this, - threadName))); - ++_numIdleThreads; - } - catch (const std::exception& ex) { - error() << "Failed to start " << threadName << "; " << _threads.size() << - " other network threads still running; caught exception: " << ex.what(); - } + if (_threads.size() >= kMaxThreads) { + LOG(1) << "Not starting new replication networking thread because " << kMaxThreads + << " are already running; " << _numIdleThreads << " threads are idle and " + << _pending.size() << " network requests are waiting for a thread to serve them."; + return; } - - void NetworkInterfaceImpl::startup() { - stdx::lock_guard<stdx::mutex> lk(_mutex); - invariant(!_inShutdown); - if (!_threads.empty()) { - return; - } - for (size_t i = 0; i < kMinThreads; ++i) { - _startNewNetworkThread_inlock(); - } + const std::string threadName(str::stream() << "ReplExecNetThread-" << _nextThreadId++); + try { + _threads.push_back(std::make_shared<stdx::thread>( + stdx::bind(&NetworkInterfaceImpl::_requestProcessorThreadBody, this, threadName))); + ++_numIdleThreads; + } catch (const std::exception& ex) { + error() << "Failed to start " << threadName << "; " << _threads.size() + << " other network threads still running; caught exception: " << ex.what(); } +} - void NetworkInterfaceImpl::shutdown() { - using std::swap; - stdx::unique_lock<stdx::mutex> lk(_mutex); - _inShutdown = true; - _hasPending.notify_all(); - ThreadList threadsToJoin; - swap(threadsToJoin, _threads); - lk.unlock(); - _commandRunner.shutdown(); - std::for_each(threadsToJoin.begin(), - threadsToJoin.end(), - stdx::bind(&stdx::thread::join, stdx::placeholders::_1)); +void NetworkInterfaceImpl::startup() { + stdx::lock_guard<stdx::mutex> lk(_mutex); + invariant(!_inShutdown); + if (!_threads.empty()) { + return; } - - void NetworkInterfaceImpl::signalWorkAvailable() { - stdx::lock_guard<stdx::mutex> lk(_mutex); - _signalWorkAvailable_inlock(); + for (size_t i = 0; i < kMinThreads; ++i) { + _startNewNetworkThread_inlock(); } +} - void NetworkInterfaceImpl::_signalWorkAvailable_inlock() { - if (!_isExecutorRunnable) { - _isExecutorRunnable = true; - _isExecutorRunnableCondition.notify_one(); - } +void NetworkInterfaceImpl::shutdown() { + using std::swap; + stdx::unique_lock<stdx::mutex> lk(_mutex); + _inShutdown = true; + _hasPending.notify_all(); + ThreadList threadsToJoin; + swap(threadsToJoin, _threads); + lk.unlock(); + _commandRunner.shutdown(); + std::for_each(threadsToJoin.begin(), + threadsToJoin.end(), + stdx::bind(&stdx::thread::join, stdx::placeholders::_1)); +} + +void NetworkInterfaceImpl::signalWorkAvailable() { + stdx::lock_guard<stdx::mutex> lk(_mutex); + _signalWorkAvailable_inlock(); +} + +void NetworkInterfaceImpl::_signalWorkAvailable_inlock() { + if (!_isExecutorRunnable) { + _isExecutorRunnable = true; + _isExecutorRunnableCondition.notify_one(); } +} - void NetworkInterfaceImpl::waitForWork() { - stdx::unique_lock<stdx::mutex> lk(_mutex); - while (!_isExecutorRunnable) { - _isExecutorRunnableCondition.wait(lk); - } - _isExecutorRunnable = false; +void NetworkInterfaceImpl::waitForWork() { + stdx::unique_lock<stdx::mutex> lk(_mutex); + while (!_isExecutorRunnable) { + _isExecutorRunnableCondition.wait(lk); } + _isExecutorRunnable = false; +} - void NetworkInterfaceImpl::waitForWorkUntil(Date_t when) { - stdx::unique_lock<stdx::mutex> lk(_mutex); - while (!_isExecutorRunnable) { - const Milliseconds waitTime(when - now()); - if (waitTime <= Milliseconds(0)) { - break; - } - _isExecutorRunnableCondition.wait_for(lk, waitTime); +void NetworkInterfaceImpl::waitForWorkUntil(Date_t when) { + stdx::unique_lock<stdx::mutex> lk(_mutex); + while (!_isExecutorRunnable) { + const Milliseconds waitTime(when - now()); + if (waitTime <= Milliseconds(0)) { + break; } - _isExecutorRunnable = false; + _isExecutorRunnableCondition.wait_for(lk, waitTime); } + _isExecutorRunnable = false; +} - void NetworkInterfaceImpl::_requestProcessorThreadBody( - NetworkInterfaceImpl* net, - const std::string& threadName) { - setThreadName(threadName); - LOG(1) << "thread starting"; - net->_consumeNetworkRequests(); +void NetworkInterfaceImpl::_requestProcessorThreadBody(NetworkInterfaceImpl* net, + const std::string& threadName) { + setThreadName(threadName); + LOG(1) << "thread starting"; + net->_consumeNetworkRequests(); - // At this point, another thread may have destroyed "net", if this thread chose to detach - // itself and remove itself from net->_threads before releasing net->_mutex. Do not access - // member variables of "net" from here, on. - LOG(1) << "thread shutting down"; - } + // At this point, another thread may have destroyed "net", if this thread chose to detach + // itself and remove itself from net->_threads before releasing net->_mutex. Do not access + // member variables of "net" from here, on. + LOG(1) << "thread shutting down"; +} - void NetworkInterfaceImpl::_consumeNetworkRequests() { - stdx::unique_lock<stdx::mutex> lk(_mutex); - while (!_inShutdown) { - if (_pending.empty()) { - if (_threads.size() > kMinThreads) { - const Date_t nowDate = now(); - const Date_t nextThreadRetirementDate = - _lastFullUtilizationDate + kMaxIdleThreadAge; - if (nowDate > nextThreadRetirementDate) { - _lastFullUtilizationDate = nowDate; - break; - } +void NetworkInterfaceImpl::_consumeNetworkRequests() { + stdx::unique_lock<stdx::mutex> lk(_mutex); + while (!_inShutdown) { + if (_pending.empty()) { + if (_threads.size() > kMinThreads) { + const Date_t nowDate = now(); + const Date_t nextThreadRetirementDate = + _lastFullUtilizationDate + kMaxIdleThreadAge; + if (nowDate > nextThreadRetirementDate) { + _lastFullUtilizationDate = nowDate; + break; } - _hasPending.wait_for(lk, kMaxIdleThreadAge); - continue; } - CommandData todo = _pending.front(); - _pending.pop_front(); - ++_numActiveNetworkRequests; - --_numIdleThreads; - lk.unlock(); - TaskExecutor::ResponseStatus result = _commandRunner.runCommand(todo.request); - LOG(2) << "Network status of sending " << todo.request.cmdObj.firstElementFieldName() << - " to " << todo.request.target << " was " << result.getStatus(); - todo.onFinish(result); - lk.lock(); - --_numActiveNetworkRequests; - ++_numIdleThreads; - _signalWorkAvailable_inlock(); + _hasPending.wait_for(lk, kMaxIdleThreadAge); + continue; } + CommandData todo = _pending.front(); + _pending.pop_front(); + ++_numActiveNetworkRequests; --_numIdleThreads; - if (_inShutdown) { - return; - } - // This thread is ending because it was idle for too long. - // Find self in _threads, remove self from _threads, detach self. - for (size_t i = 0; i < _threads.size(); ++i) { - if (_threads[i]->get_id() != stdx::this_thread::get_id()) { - continue; - } - _threads[i]->detach(); - _threads[i].swap(_threads.back()); - _threads.pop_back(); - return; + lk.unlock(); + TaskExecutor::ResponseStatus result = _commandRunner.runCommand(todo.request); + LOG(2) << "Network status of sending " << todo.request.cmdObj.firstElementFieldName() + << " to " << todo.request.target << " was " << result.getStatus(); + todo.onFinish(result); + lk.lock(); + --_numActiveNetworkRequests; + ++_numIdleThreads; + _signalWorkAvailable_inlock(); + } + --_numIdleThreads; + if (_inShutdown) { + return; + } + // This thread is ending because it was idle for too long. + // Find self in _threads, remove self from _threads, detach self. + for (size_t i = 0; i < _threads.size(); ++i) { + if (_threads[i]->get_id() != stdx::this_thread::get_id()) { + continue; } - severe().stream() << "Could not find this thread, with id " << - stdx::this_thread::get_id() << " in the replication networking thread pool"; - fassertFailedNoTrace(28676); + _threads[i]->detach(); + _threads[i].swap(_threads.back()); + _threads.pop_back(); + return; } + severe().stream() << "Could not find this thread, with id " << stdx::this_thread::get_id() + << " in the replication networking thread pool"; + fassertFailedNoTrace(28676); +} - void NetworkInterfaceImpl::startCommand( - const TaskExecutor::CallbackHandle& cbHandle, - const RemoteCommandRequest& request, - const RemoteCommandCompletionFn& onFinish) { - LOG(2) << "Scheduling " << request.cmdObj.firstElementFieldName() << " to " << - request.target; - stdx::lock_guard<stdx::mutex> lk(_mutex); - _pending.push_back(CommandData()); - CommandData& cd = _pending.back(); - cd.cbHandle = cbHandle; - cd.request = request; - cd.onFinish = onFinish; - if (_numIdleThreads < _pending.size()) { - _startNewNetworkThread_inlock(); - } - if (_numIdleThreads <= _pending.size()) { - _lastFullUtilizationDate = Date_t::now(); - } - _hasPending.notify_one(); +void NetworkInterfaceImpl::startCommand(const TaskExecutor::CallbackHandle& cbHandle, + const RemoteCommandRequest& request, + const RemoteCommandCompletionFn& onFinish) { + LOG(2) << "Scheduling " << request.cmdObj.firstElementFieldName() << " to " << request.target; + stdx::lock_guard<stdx::mutex> lk(_mutex); + _pending.push_back(CommandData()); + CommandData& cd = _pending.back(); + cd.cbHandle = cbHandle; + cd.request = request; + cd.onFinish = onFinish; + if (_numIdleThreads < _pending.size()) { + _startNewNetworkThread_inlock(); + } + if (_numIdleThreads <= _pending.size()) { + _lastFullUtilizationDate = Date_t::now(); } + _hasPending.notify_one(); +} - void NetworkInterfaceImpl::cancelCommand( - const TaskExecutor::CallbackHandle& cbHandle) { - stdx::unique_lock<stdx::mutex> lk(_mutex); - CommandDataList::iterator iter; - for (iter = _pending.begin(); iter != _pending.end(); ++iter) { - if (iter->cbHandle == cbHandle) { - break; - } +void NetworkInterfaceImpl::cancelCommand(const TaskExecutor::CallbackHandle& cbHandle) { + stdx::unique_lock<stdx::mutex> lk(_mutex); + CommandDataList::iterator iter; + for (iter = _pending.begin(); iter != _pending.end(); ++iter) { + if (iter->cbHandle == cbHandle) { + break; } - if (iter == _pending.end()) { - return; - } - const RemoteCommandCompletionFn onFinish = iter->onFinish; - LOG(2) << "Canceled sending " << iter->request.cmdObj.firstElementFieldName() << " to " << - iter->request.target; - _pending.erase(iter); - lk.unlock(); - onFinish(TaskExecutor::ResponseStatus(ErrorCodes::CallbackCanceled, "Callback canceled")); - lk.lock(); - _signalWorkAvailable_inlock(); } - - Date_t NetworkInterfaceImpl::now() { - return Date_t::now(); + if (iter == _pending.end()) { + return; } + const RemoteCommandCompletionFn onFinish = iter->onFinish; + LOG(2) << "Canceled sending " << iter->request.cmdObj.firstElementFieldName() << " to " + << iter->request.target; + _pending.erase(iter); + lk.unlock(); + onFinish(TaskExecutor::ResponseStatus(ErrorCodes::CallbackCanceled, "Callback canceled")); + lk.lock(); + _signalWorkAvailable_inlock(); +} + +Date_t NetworkInterfaceImpl::now() { + return Date_t::now(); +} -} // namespace executor -} // namespace mongo +} // namespace executor +} // namespace mongo |