path: root/src/mongo/executor/network_interface_impl.cpp
diff options
authorSpencer T Brody <>2015-06-03 16:31:14 -0400
committerSpencer T Brody <>2015-06-04 16:04:27 -0400
commit5f9ae8cbea0bcf2601ca7d9ec8cd4de5beb236eb (patch)
tree64e8ab53d71fe3eea3f2f9c9d01a9d899dccd235 /src/mongo/executor/network_interface_impl.cpp
parent0d3d62e2fb017512aee2ae2be6f128e573a0bf5a (diff)
SERVER-18623 Split NetworkInterface and StorageInterface out from ReplicationExecutor
Diffstat (limited to 'src/mongo/executor/network_interface_impl.cpp')
1 files changed, 278 insertions, 0 deletions
diff --git a/src/mongo/executor/network_interface_impl.cpp b/src/mongo/executor/network_interface_impl.cpp
new file mode 100644
index 00000000000..129c082cd32
--- /dev/null
+++ b/src/mongo/executor/network_interface_impl.cpp
@@ -0,0 +1,278 @@
+ * Copyright (C) 2014 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kExecutor
+#include "mongo/platform/basic.h"
+#include "mongo/executor/network_interface_impl.h"
+#include <boost/make_shared.hpp>
+#include <memory>
+#include "mongo/client/connection_pool.h"
+#include "mongo/rpc/get_status_from_command_result.h"
+#include "mongo/stdx/thread.h"
+#include "mongo/util/assert_util.h"
+#include "mongo/util/log.h"
+#include "mongo/util/time_support.h"
+namespace mongo {
+namespace executor {
+namespace {
+ const size_t kMinThreads = 1;
+ const size_t kMaxThreads = 51; // Set to 1 + max repl set size, for heartbeat + wiggle room.
+ const Seconds kMaxIdleThreadAge(30);
+} // namespace
+ NetworkInterfaceImpl::NetworkInterfaceImpl() :
+ NetworkInterface(),
+ _numIdleThreads(0),
+ _nextThreadId(0),
+ _lastFullUtilizationDate(),
+ _isExecutorRunnable(false),
+ _inShutdown(false),
+ _commandRunner(kMessagingPortKeepOpen),
+ _numActiveNetworkRequests(0) {
+ }
+ NetworkInterfaceImpl::~NetworkInterfaceImpl() { }
+ std::string NetworkInterfaceImpl::getDiagnosticString() {
+ boost::lock_guard<boost::mutex> lk(_mutex);
+ str::stream output;
+ output << "NetworkImpl";
+ output << " threads:" << _threads.size();
+ output << " inShutdown:" << _inShutdown;
+ output << " active:" << _numActiveNetworkRequests;
+ output << " pending:" << _pending.size();
+ output << " execRunable:" << _isExecutorRunnable;
+ return output;
+ }
+ void NetworkInterfaceImpl::_startNewNetworkThread_inlock() {
+ if (_inShutdown) {
+ LOG(1) <<
+ "Not starting new replication networking thread while shutting down replication.";
+ return;
+ }
+ if (_threads.size() >= kMaxThreads) {
+ LOG(1) << "Not starting new replication networking thread because " << kMaxThreads <<
+ " are already running; " << _numIdleThreads << " threads are idle and " <<
+ _pending.size() << " network requests are waiting for a thread to serve them.";
+ return;
+ }
+ const std::string threadName(str::stream() << "ReplExecNetThread-" << _nextThreadId++);
+ try {
+ _threads.push_back(
+ boost::make_shared<boost::thread>(
+ stdx::bind(&NetworkInterfaceImpl::_requestProcessorThreadBody,
+ this,
+ threadName)));
+ ++_numIdleThreads;
+ }
+ catch (const std::exception& ex) {
+ error() << "Failed to start " << threadName << "; " << _threads.size() <<
+ " other network threads still running; caught exception: " << ex.what();
+ }
+ }
+ void NetworkInterfaceImpl::startup() {
+ boost::lock_guard<boost::mutex> lk(_mutex);
+ invariant(!_inShutdown);
+ if (!_threads.empty()) {
+ return;
+ }
+ for (size_t i = 0; i < kMinThreads; ++i) {
+ _startNewNetworkThread_inlock();
+ }
+ }
+ void NetworkInterfaceImpl::shutdown() {
+ using std::swap;
+ boost::unique_lock<boost::mutex> lk(_mutex);
+ _inShutdown = true;
+ _hasPending.notify_all();
+ ThreadList threadsToJoin;
+ swap(threadsToJoin, _threads);
+ lk.unlock();
+ _commandRunner.shutdown();
+ std::for_each(threadsToJoin.begin(),
+ threadsToJoin.end(),
+ stdx::bind(&boost::thread::join, stdx::placeholders::_1));
+ }
+ void NetworkInterfaceImpl::signalWorkAvailable() {
+ boost::lock_guard<boost::mutex> lk(_mutex);
+ _signalWorkAvailable_inlock();
+ }
+ void NetworkInterfaceImpl::_signalWorkAvailable_inlock() {
+ if (!_isExecutorRunnable) {
+ _isExecutorRunnable = true;
+ _isExecutorRunnableCondition.notify_one();
+ }
+ }
+ void NetworkInterfaceImpl::waitForWork() {
+ boost::unique_lock<boost::mutex> lk(_mutex);
+ while (!_isExecutorRunnable) {
+ _isExecutorRunnableCondition.wait(lk);
+ }
+ _isExecutorRunnable = false;
+ }
+ void NetworkInterfaceImpl::waitForWorkUntil(Date_t when) {
+ boost::unique_lock<boost::mutex> lk(_mutex);
+ while (!_isExecutorRunnable) {
+ const Milliseconds waitTime(when - now());
+ if (waitTime <= Milliseconds(0)) {
+ break;
+ }
+ _isExecutorRunnableCondition.wait_for(lk, waitTime);
+ }
+ _isExecutorRunnable = false;
+ }
+ void NetworkInterfaceImpl::_requestProcessorThreadBody(
+ NetworkInterfaceImpl* net,
+ const std::string& threadName) {
+ setThreadName(threadName);
+ LOG(1) << "thread starting";
+ net->_consumeNetworkRequests();
+ // At this point, another thread may have destroyed "net", if this thread chose to detach
+ // itself and remove itself from net->_threads before releasing net->_mutex. Do not access
+ // member variables of "net" from here, on.
+ LOG(1) << "thread shutting down";
+ }
+ void NetworkInterfaceImpl::_consumeNetworkRequests() {
+ boost::unique_lock<boost::mutex> lk(_mutex);
+ while (!_inShutdown) {
+ if (_pending.empty()) {
+ if (_threads.size() > kMinThreads) {
+ const Date_t nowDate = now();
+ const Date_t nextThreadRetirementDate =
+ _lastFullUtilizationDate + kMaxIdleThreadAge;
+ if (nowDate > nextThreadRetirementDate) {
+ _lastFullUtilizationDate = nowDate;
+ break;
+ }
+ }
+ _hasPending.wait_for(lk, kMaxIdleThreadAge);
+ continue;
+ }
+ CommandData todo = _pending.front();
+ _pending.pop_front();
+ ++_numActiveNetworkRequests;
+ --_numIdleThreads;
+ lk.unlock();
+ repl::ResponseStatus result = _commandRunner.runCommand(todo.request);
+ LOG(2) << "Network status of sending " << todo.request.cmdObj.firstElementFieldName() <<
+ " to " << << " was " << result.getStatus();
+ todo.onFinish(result);
+ lk.lock();
+ --_numActiveNetworkRequests;
+ ++_numIdleThreads;
+ _signalWorkAvailable_inlock();
+ }
+ --_numIdleThreads;
+ if (_inShutdown) {
+ return;
+ }
+ // This thread is ending because it was idle for too long.
+ // Find self in _threads, remove self from _threads, detach self.
+ for (size_t i = 0; i < _threads.size(); ++i) {
+ if (_threads[i]->get_id() != stdx::this_thread::get_id()) {
+ continue;
+ }
+ _threads[i]->detach();
+ _threads[i].swap(_threads.back());
+ _threads.pop_back();
+ return;
+ }
+ severe().stream() << "Could not find this thread, with id " <<
+ stdx::this_thread::get_id() << " in the replication networking thread pool";
+ fassertFailedNoTrace(28676);
+ }
+ void NetworkInterfaceImpl::startCommand(
+ const repl::ReplicationExecutor::CallbackHandle& cbHandle,
+ const RemoteCommandRequest& request,
+ const RemoteCommandCompletionFn& onFinish) {
+ LOG(2) << "Scheduling " << request.cmdObj.firstElementFieldName() << " to " <<
+ boost::lock_guard<boost::mutex> lk(_mutex);
+ _pending.push_back(CommandData());
+ CommandData& cd = _pending.back();
+ cd.cbHandle = cbHandle;
+ cd.request = request;
+ cd.onFinish = onFinish;
+ if (_numIdleThreads < _pending.size()) {
+ _startNewNetworkThread_inlock();
+ }
+ if (_numIdleThreads <= _pending.size()) {
+ _lastFullUtilizationDate = Date_t::now();
+ }
+ _hasPending.notify_one();
+ }
+ void NetworkInterfaceImpl::cancelCommand(
+ const repl::ReplicationExecutor::CallbackHandle& cbHandle) {
+ boost::unique_lock<boost::mutex> lk(_mutex);
+ CommandDataList::iterator iter;
+ for (iter = _pending.begin(); iter != _pending.end(); ++iter) {
+ if (iter->cbHandle == cbHandle) {
+ break;
+ }
+ }
+ if (iter == _pending.end()) {
+ return;
+ }
+ const RemoteCommandCompletionFn onFinish = iter->onFinish;
+ LOG(2) << "Canceled sending " << iter->request.cmdObj.firstElementFieldName() << " to " <<
+ iter->;
+ _pending.erase(iter);
+ lk.unlock();
+ onFinish(repl::ResponseStatus(ErrorCodes::CallbackCanceled, "Callback canceled"));
+ lk.lock();
+ _signalWorkAvailable_inlock();
+ }
+ Date_t NetworkInterfaceImpl::now() {
+ return Date_t::now();
+ }
+} // namespace executor
+} // namespace mongo