/**
* Copyright (C) 2014 MongoDB Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see .
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#pragma once
#include
#include
#include
#include
#include
#include "mongo/client/remote_command_runner_impl.h"
#include "mongo/executor/network_interface.h"
#include "mongo/stdx/list.h"
namespace mongo {
namespace executor {
/**
* Implementation of the network interface for use by classes implementing TaskExecutor
* inside mongod.
*
* This implementation manages a dynamically sized group of worker threads for performing
* network operations. The minimum and maximum number of threads is set at compile time, and
* the exact number of threads is adjusted dynamically, using the following two rules.
*
* 1.) If the number of worker threads is less than the maximum, there are no idle worker
* threads, and the client enqueues a new network operation via startCommand(), the network
* interface spins up a new worker thread. This decision is made on the assumption that
* spinning up a new thread is faster than the round-trip time for processing a remote command,
* and so this will minimize wait time.
*
* 2.) If the number of worker threads has exceeded the the peak number of scheduled outstanding
* network commands continuously for a period of time (kMaxIdleThreadAge), one thread is retired
* from the pool and the monitoring of idle threads is reset. This means that at most one
* thread retires every kMaxIdleThreadAge units of time. The value of kMaxIdleThreadAge is set
* to be much larger than the expected frequency of new requests, averaging out short-duration
* periods of idleness, as occur between heartbeats.
*
* The implementation also manages a pool of network connections to recently contacted remote
* nodes. The size of this pool is not bounded, but connections are retired unconditionally
* after they have been connected for a certain maximum period.
* TODO(spencer): Rename this to ThreadPoolNetworkInterface
*/
class NetworkInterfaceImpl : public NetworkInterface {
public:
NetworkInterfaceImpl();
virtual ~NetworkInterfaceImpl();
virtual std::string getDiagnosticString();
virtual void startup();
virtual void shutdown();
virtual void waitForWork();
virtual void waitForWorkUntil(Date_t when);
virtual void signalWorkAvailable();
virtual Date_t now();
virtual void startCommand(
const repl::ReplicationExecutor::CallbackHandle& cbHandle,
const RemoteCommandRequest& request,
const RemoteCommandCompletionFn& onFinish);
virtual void cancelCommand(const repl::ReplicationExecutor::CallbackHandle& cbHandle);
private:
/**
* Information describing an in-flight command.
*/
struct CommandData {
repl::ReplicationExecutor::CallbackHandle cbHandle;
RemoteCommandRequest request;
RemoteCommandCompletionFn onFinish;
};
typedef stdx::list CommandDataList;
typedef std::vector > ThreadList;
/**
* Thread body for threads that synchronously perform network requests from
* the _pending list.
*/
static void _requestProcessorThreadBody(NetworkInterfaceImpl* net,
const std::string& threadName);
/**
* Run loop that iteratively consumes network requests in a request processor thread.
*/
void _consumeNetworkRequests();
/**
* Notifies the network threads that there is work available.
*/
void _signalWorkAvailable_inlock();
/**
* Starts a new network thread.
*/
void _startNewNetworkThread_inlock();
// Mutex guarding the state of this network interface, except for the remote command
// executor, which has its own concurrency control.
boost::mutex _mutex;
// Condition signaled to indicate that there is work in the _pending queue.
boost::condition_variable _hasPending;
// Queue of yet-to-be-executed network operations.
CommandDataList _pending;
// List of threads serving as the worker pool.
ThreadList _threads;
// Count of idle threads.
size_t _numIdleThreads;
// Id counter for assigning thread names
size_t _nextThreadId;
// The last time that _pending.size() + _numActiveNetworkRequests grew to be at least
// _threads.size().
Date_t _lastFullUtilizationDate;
// Condition signaled to indicate that the executor, blocked in waitForWorkUntil or
// waitForWork, should wake up.
boost::condition_variable _isExecutorRunnableCondition;
// Flag indicating whether or not the executor associated with this interface is runnable.
bool _isExecutorRunnable;
// Flag indicating when this interface is being shut down (because shutdown() has executed).
bool _inShutdown;
// Interface for running remote commands
RemoteCommandRunnerImpl _commandRunner;
// Number of active network requests
size_t _numActiveNetworkRequests;
};
} // namespace executor
} // namespace mongo