diff options
author | Spencer T Brody <spencer@mongodb.com> | 2015-06-03 16:31:14 -0400 |
---|---|---|
committer | Spencer T Brody <spencer@mongodb.com> | 2015-06-04 16:04:27 -0400 |
commit | 5f9ae8cbea0bcf2601ca7d9ec8cd4de5beb236eb (patch) | |
tree | 64e8ab53d71fe3eea3f2f9c9d01a9d899dccd235 /src/mongo/db | |
parent | 0d3d62e2fb017512aee2ae2be6f128e573a0bf5a (diff) | |
download | mongo-5f9ae8cbea0bcf2601ca7d9ec8cd4de5beb236eb.tar.gz |
SERVER-18623 Split NetworkInterface and StorageInterface out from ReplicationExecutor
Diffstat (limited to 'src/mongo/db')
44 files changed, 475 insertions, 1357 deletions
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript index 4b32c6c46fd..c46484860d7 100644 --- a/src/mongo/db/SConscript +++ b/src/mongo/db/SConscript @@ -625,6 +625,7 @@ serveronlyEnv = env.Clone() serveronlyEnv.InjectThirdPartyIncludePaths(libraries=['snappy']) serveronlyLibdeps = [ "$BUILD_DIR/mongo/client/parallel", + "$BUILD_DIR/mongo/executor/network_interface_impl", "$BUILD_DIR/mongo/s/batch_write_types", "$BUILD_DIR/mongo/s/catalog/legacy/catalog_manager_legacy", "$BUILD_DIR/mongo/s/client/sharding_connection_hook", @@ -650,7 +651,6 @@ serveronlyLibdeps = [ "ops/update_driver", "query/query", "range_deleter", - "repl/network_interface_impl", "repl/repl_coordinator_global", "repl/repl_coordinator_impl", "repl/repl_settings", diff --git a/src/mongo/db/db.cpp b/src/mongo/db/db.cpp index f430cceb675..e5555bdd33e 100644 --- a/src/mongo/db/db.cpp +++ b/src/mongo/db/db.cpp @@ -77,12 +77,12 @@ #include "mongo/db/query/internal_plans.h" #include "mongo/db/range_deleter_service.h" #include "mongo/db/repair_database.h" -#include "mongo/db/repl/network_interface_impl.h" #include "mongo/db/repl/oplog.h" #include "mongo/db/repl/repl_settings.h" #include "mongo/db/repl/replication_coordinator_external_state_impl.h" #include "mongo/db/repl/replication_coordinator_global.h" #include "mongo/db/repl/replication_coordinator_impl.h" +#include "mongo/db/repl/storage_interface_impl.h" #include "mongo/db/repl/topology_coordinator_impl.h" #include "mongo/db/restapi.h" #include "mongo/db/server_parameters.h" @@ -93,6 +93,7 @@ #include "mongo/db/storage/storage_engine.h" #include "mongo/db/storage_options.h" #include "mongo/db/ttl.h" +#include "mongo/executor/network_interface_impl.h" #include "mongo/platform/process_id.h" #include "mongo/scripting/engine.h" #include "mongo/stdx/memory.h" @@ -757,7 +758,8 @@ MONGO_INITIALIZER_WITH_PREREQUISITES(CreateReplicationManager, ("SetGlobalEnviro repl::ReplicationCoordinatorImpl* replCoord = new repl::ReplicationCoordinatorImpl( getGlobalReplSettings(), new repl::ReplicationCoordinatorExternalStateImpl, - new repl::NetworkInterfaceImpl, + new executor::NetworkInterfaceImpl{}, + new repl::StorageInterfaceImpl{}, new repl::TopologyCoordinatorImpl(Seconds(repl::maxSyncSourceLagSecs)), static_cast<int64_t>(curTimeMillis64())); repl::setGlobalReplicationCoordinator(replCoord); diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript index 146039a94cf..d041ef49b72 100644 --- a/src/mongo/db/repl/SConscript +++ b/src/mongo/db/repl/SConscript @@ -15,17 +15,20 @@ env.Library('rslog', '$BUILD_DIR/mongo/logger/logger', ]) +env.Library('storage_interface', + 'storage_interface.cpp', + LIBDEPS=[ + ]) + env.Library( - target='network_interface_impl', + target='storage_interface_impl', source=[ - 'network_interface_impl.cpp', + 'storage_interface_impl.cpp', ], LIBDEPS=[ - '$BUILD_DIR/mongo/base/base', - '$BUILD_DIR/mongo/bson/bson', - '$BUILD_DIR/mongo/client/remote_command_runner_impl', - '$BUILD_DIR/mongo/db/coredb', - '$BUILD_DIR/mongo/db/common', + '$BUILD_DIR/mongo/db/serveronly', # For OperationContextImpl + '$BUILD_DIR/mongo/db/service_context', + 'storage_interface', ]) env.Library( @@ -38,7 +41,9 @@ env.Library( LIBDEPS=[ 'database_task', 'task_runner', + 'storage_interface', '$BUILD_DIR/mongo/client/remote_command_runner', + '$BUILD_DIR/mongo/executor/network_interface', '$BUILD_DIR/mongo/util/foundation', '$BUILD_DIR/mongo/util/net/hostandport', ], @@ -343,16 +348,18 @@ env.Library('repl_coordinator_global', env.Library('replmocks', [ - 'network_interface_mock.cpp', 'operation_context_repl_mock.cpp', 'replication_coordinator_external_state_mock.cpp', 'replication_coordinator_mock.cpp', + 'storage_interface_mock.cpp', ], LIBDEPS=[ '$BUILD_DIR/mongo/db/concurrency/lock_manager', + '$BUILD_DIR/mongo/executor/network_interface_mock', 'repl_coordinator_interface', 'replica_set_messages', 'replication_executor', + 'storage_interface', ]) env.Library('read_after_optime_args', diff --git a/src/mongo/db/repl/base_cloner_test_fixture.cpp b/src/mongo/db/repl/base_cloner_test_fixture.cpp index 92518498300..33357e702e3 100644 --- a/src/mongo/db/repl/base_cloner_test_fixture.cpp +++ b/src/mongo/db/repl/base_cloner_test_fixture.cpp @@ -106,7 +106,7 @@ namespace repl { ReplicationExecutorTest::setUp(); clear(); launchExecutorThread(); - storageInterface.reset(new StorageInterfaceMock()); + storageInterface.reset(new ClonerStorageInterfaceMock()); } void BaseClonerTest::tearDown() { @@ -245,21 +245,21 @@ namespace repl { ASSERT_FALSE(getCloner()->isActive()); } - Status StorageInterfaceMock::beginCollection(OperationContext* txn, - const NamespaceString& nss, - const CollectionOptions& options, - const std::vector<BSONObj>& specs) { + Status ClonerStorageInterfaceMock::beginCollection(OperationContext* txn, + const NamespaceString& nss, + const CollectionOptions& options, + const std::vector<BSONObj>& specs) { return beginCollectionFn ? beginCollectionFn(txn, nss, options, specs) : Status::OK(); } - Status StorageInterfaceMock::insertDocuments(OperationContext* txn, - const NamespaceString& nss, - const std::vector<BSONObj>& docs) { + Status ClonerStorageInterfaceMock::insertDocuments(OperationContext* txn, + const NamespaceString& nss, + const std::vector<BSONObj>& docs) { return insertDocumentsFn ? insertDocumentsFn(txn, nss, docs) : Status::OK(); } - Status StorageInterfaceMock::commitCollection(OperationContext* txn, - const NamespaceString& nss) { + Status ClonerStorageInterfaceMock::commitCollection(OperationContext* txn, + const NamespaceString& nss) { return Status::OK(); } diff --git a/src/mongo/db/repl/base_cloner_test_fixture.h b/src/mongo/db/repl/base_cloner_test_fixture.h index 064003bbb6d..dca30fe8ae7 100644 --- a/src/mongo/db/repl/base_cloner_test_fixture.h +++ b/src/mongo/db/repl/base_cloner_test_fixture.h @@ -36,8 +36,8 @@ #include "mongo/db/clientcursor.h" #include "mongo/db/namespace_string.h" #include "mongo/db/repl/collection_cloner.h" -#include "mongo/db/repl/network_interface_mock.h" #include "mongo/db/repl/replication_executor_test_fixture.h" +#include "mongo/executor/network_interface_mock.h" #include "mongo/stdx/mutex.h" #include "mongo/stdx/condition_variable.h" #include "mongo/util/net/hostandport.h" @@ -50,11 +50,11 @@ namespace mongo { namespace repl { class BaseCloner; - class StorageInterfaceMock; + class ClonerStorageInterfaceMock; class BaseClonerTest : public ReplicationExecutorTest { public: - typedef NetworkInterfaceMock::NetworkOperationIterator NetworkOperationIterator; + typedef executor::NetworkInterfaceMock::NetworkOperationIterator NetworkOperationIterator; /** * Creates an initial error status suitable for checking if @@ -128,7 +128,7 @@ namespace repl { protected: - std::unique_ptr<StorageInterfaceMock> storageInterface; + std::unique_ptr<ClonerStorageInterfaceMock> storageInterface; private: @@ -141,7 +141,7 @@ namespace repl { }; - class StorageInterfaceMock : public CollectionCloner::StorageInterface { + class ClonerStorageInterfaceMock : public CollectionCloner::StorageInterface { public: Status beginCollection(OperationContext* txn, const NamespaceString& nss, diff --git a/src/mongo/db/repl/check_quorum_for_config_change_test.cpp b/src/mongo/db/repl/check_quorum_for_config_change_test.cpp index bc692b43288..63fe5684aa9 100644 --- a/src/mongo/db/repl/check_quorum_for_config_change_test.cpp +++ b/src/mongo/db/repl/check_quorum_for_config_change_test.cpp @@ -36,11 +36,12 @@ #include "mongo/base/status.h" #include "mongo/db/jsobj.h" #include "mongo/db/repl/check_quorum_for_config_change.h" -#include "mongo/db/repl/network_interface_mock.h" #include "mongo/db/repl/repl_set_heartbeat_args.h" #include "mongo/db/repl/repl_set_heartbeat_response.h" #include "mongo/db/repl/replica_set_config.h" #include "mongo/db/repl/replication_executor.h" +#include "mongo/db/repl/storage_interface_mock.h" +#include "mongo/executor/network_interface_mock.h" #include "mongo/platform/unordered_set.h" #include "mongo/stdx/functional.h" #include "mongo/unittest/unittest.h" @@ -62,6 +63,8 @@ namespace mongo { namespace repl { namespace { + using executor::NetworkInterfaceMock; + class CheckQuorumTest : public mongo::unittest::Test { protected: CheckQuorumTest(); @@ -71,6 +74,7 @@ namespace { bool isQuorumCheckDone(); NetworkInterfaceMock* _net; + StorageInterfaceMock* _storage; boost::scoped_ptr<ReplicationExecutor> _executor; private: @@ -93,7 +97,8 @@ namespace { void CheckQuorumTest::setUp() { _net = new NetworkInterfaceMock; - _executor.reset(new ReplicationExecutor(_net, 1 /* prng */ )); + _storage = new StorageInterfaceMock; + _executor.reset(new ReplicationExecutor(_net, _storage, 1 /* prng */ )); _executorThread.reset(new boost::thread(stdx::bind(&ReplicationExecutor::run, _executor.get()))); } diff --git a/src/mongo/db/repl/collection_cloner_test.cpp b/src/mongo/db/repl/collection_cloner_test.cpp index a6841ad2467..9b6927c4b43 100644 --- a/src/mongo/db/repl/collection_cloner_test.cpp +++ b/src/mongo/db/repl/collection_cloner_test.cpp @@ -35,7 +35,6 @@ #include "mongo/db/jsobj.h" #include "mongo/db/repl/base_cloner_test_fixture.h" #include "mongo/db/repl/collection_cloner.h" -#include "mongo/db/repl/network_interface_mock.h" #include "mongo/unittest/unittest.h" namespace { diff --git a/src/mongo/db/repl/data_replicator_test.cpp b/src/mongo/db/repl/data_replicator_test.cpp index 8fc727c7b38..6aef8263679 100644 --- a/src/mongo/db/repl/data_replicator_test.cpp +++ b/src/mongo/db/repl/data_replicator_test.cpp @@ -33,7 +33,6 @@ #include "mongo/db/jsobj.h" #include "mongo/db/repl/data_replicator.h" #include "mongo/db/repl/fetcher.h" -#include "mongo/db/repl/network_interface_mock.h" #include "mongo/db/repl/replication_coordinator.h" #include "mongo/db/repl/replication_coordinator_impl.h" #include "mongo/db/repl/replication_coordinator_external_state_mock.h" @@ -41,6 +40,7 @@ #include "mongo/db/repl/replication_executor_test_fixture.h" #include "mongo/db/repl/topology_coordinator.h" #include "mongo/db/repl/topology_coordinator_impl.h" +#include "mongo/executor/network_interface_mock.h" #include "mongo/util/fail_point_service.h" #include "mongo/unittest/unittest.h" @@ -48,6 +48,7 @@ namespace { using namespace mongo; using namespace mongo::repl; + using executor::NetworkInterfaceMock; const HostAndPort target("localhost", -1); diff --git a/src/mongo/db/repl/database_cloner_test.cpp b/src/mongo/db/repl/database_cloner_test.cpp index 22e0e452f3d..eb305fbd0fa 100644 --- a/src/mongo/db/repl/database_cloner_test.cpp +++ b/src/mongo/db/repl/database_cloner_test.cpp @@ -35,7 +35,6 @@ #include "mongo/db/jsobj.h" #include "mongo/db/repl/base_cloner_test_fixture.h" #include "mongo/db/repl/database_cloner.h" -#include "mongo/db/repl/network_interface_mock.h" #include "mongo/unittest/unittest.h" namespace { diff --git a/src/mongo/db/repl/elect_cmd_runner_test.cpp b/src/mongo/db/repl/elect_cmd_runner_test.cpp index f4d4c913bf1..2da5db3e98b 100644 --- a/src/mongo/db/repl/elect_cmd_runner_test.cpp +++ b/src/mongo/db/repl/elect_cmd_runner_test.cpp @@ -33,11 +33,12 @@ #include "mongo/base/status.h" #include "mongo/db/jsobj.h" +#include "mongo/db/repl/elect_cmd_runner.h" #include "mongo/db/repl/member_heartbeat_data.h" -#include "mongo/db/repl/network_interface_mock.h" #include "mongo/db/repl/replica_set_config.h" #include "mongo/db/repl/replication_executor.h" -#include "mongo/db/repl/elect_cmd_runner.h" +#include "mongo/db/repl/storage_interface_mock.h" +#include "mongo/executor/network_interface_mock.h" #include "mongo/stdx/functional.h" #include "mongo/unittest/unittest.h" @@ -48,6 +49,8 @@ namespace mongo { namespace repl { namespace { + using executor::NetworkInterfaceMock; + class ElectCmdRunnerTest : public mongo::unittest::Test { public: void startTest(ElectCmdRunner* electCmdRunner, @@ -65,6 +68,7 @@ namespace { const std::vector<HostAndPort>& hosts); NetworkInterfaceMock* _net; + StorageInterfaceMock* _storage; boost::scoped_ptr<ReplicationExecutor> _executor; boost::scoped_ptr<boost::thread> _executorThread; @@ -77,7 +81,8 @@ namespace { void ElectCmdRunnerTest::setUp() { _net = new NetworkInterfaceMock; - _executor.reset(new ReplicationExecutor(_net, 1 /* prng seed */)); + _storage = new StorageInterfaceMock; + _executor.reset(new ReplicationExecutor(_net, _storage, 1 /* prng seed */)); _executorThread.reset(new boost::thread(stdx::bind(&ReplicationExecutor::run, _executor.get()))); } diff --git a/src/mongo/db/repl/election_winner_declarer_test.cpp b/src/mongo/db/repl/election_winner_declarer_test.cpp index bf006dd4355..1b264e26b56 100644 --- a/src/mongo/db/repl/election_winner_declarer_test.cpp +++ b/src/mongo/db/repl/election_winner_declarer_test.cpp @@ -34,8 +34,8 @@ #include "mongo/base/status.h" #include "mongo/db/jsobj.h" #include "mongo/db/repl/election_winner_declarer.h" -#include "mongo/db/repl/network_interface_mock.h" #include "mongo/db/repl/replication_executor.h" +#include "mongo/executor/network_interface_mock.h" #include "mongo/stdx/functional.h" #include "mongo/unittest/unittest.h" #include "mongo/util/mongoutils/str.h" @@ -46,6 +46,7 @@ namespace mongo { namespace repl { namespace { + using executor::NetworkInterfaceMock; using unittest::assertGet; bool stringContains(const std::string &haystack, const std::string& needle) { diff --git a/src/mongo/db/repl/fetcher_test.cpp b/src/mongo/db/repl/fetcher_test.cpp index dde20c44fbf..85a693dd48f 100644 --- a/src/mongo/db/repl/fetcher_test.cpp +++ b/src/mongo/db/repl/fetcher_test.cpp @@ -32,9 +32,9 @@ #include "mongo/db/jsobj.h" #include "mongo/db/repl/fetcher.h" -#include "mongo/db/repl/network_interface_mock.h" #include "mongo/db/repl/replication_executor.h" #include "mongo/db/repl/replication_executor_test_fixture.h" +#include "mongo/executor/network_interface_mock.h" #include "mongo/unittest/unittest.h" @@ -42,6 +42,7 @@ namespace { using namespace mongo; using namespace mongo::repl; + using executor::NetworkInterfaceMock; const HostAndPort target("localhost", -1); const BSONObj findCmdObj = BSON("find" << "coll"); diff --git a/src/mongo/db/repl/freshness_checker_test.cpp b/src/mongo/db/repl/freshness_checker_test.cpp index 6f2e3ab9a85..c271ae0c371 100644 --- a/src/mongo/db/repl/freshness_checker_test.cpp +++ b/src/mongo/db/repl/freshness_checker_test.cpp @@ -33,11 +33,12 @@ #include "mongo/base/status.h" #include "mongo/db/jsobj.h" +#include "mongo/db/repl/freshness_checker.h" #include "mongo/db/repl/member_heartbeat_data.h" -#include "mongo/db/repl/network_interface_mock.h" #include "mongo/db/repl/replica_set_config.h" #include "mongo/db/repl/replication_executor.h" -#include "mongo/db/repl/freshness_checker.h" +#include "mongo/db/repl/storage_interface_mock.h" +#include "mongo/executor/network_interface_mock.h" #include "mongo/platform/unordered_set.h" #include "mongo/stdx/functional.h" #include "mongo/unittest/unittest.h" @@ -49,6 +50,7 @@ namespace mongo { namespace repl { namespace { + using executor::NetworkInterfaceMock; using unittest::assertGet; bool stringContains(const std::string &haystack, const std::string& needle) { @@ -73,6 +75,7 @@ namespace { } NetworkInterfaceMock* _net; + StorageInterfaceMock* _storage; boost::scoped_ptr<ReplicationExecutor> _executor; boost::scoped_ptr<boost::thread> _executorThread; @@ -91,7 +94,8 @@ namespace { void FreshnessCheckerTest::setUp() { _net = new NetworkInterfaceMock; - _executor.reset(new ReplicationExecutor(_net, 1 /* prng seed */)); + _storage = new StorageInterfaceMock; + _executor.reset(new ReplicationExecutor(_net, _storage, 1 /* prng seed */)); _executorThread.reset(new boost::thread(stdx::bind(&ReplicationExecutor::run, _executor.get()))); _checker.reset(new FreshnessChecker); diff --git a/src/mongo/db/repl/network_interface_impl.cpp b/src/mongo/db/repl/network_interface_impl.cpp deleted file mode 100644 index 9e76f27a8be..00000000000 --- a/src/mongo/db/repl/network_interface_impl.cpp +++ /dev/null @@ -1,287 +0,0 @@ -/** - * Copyright (C) 2014 MongoDB Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kReplication - -#include "mongo/platform/basic.h" - -#include "mongo/db/repl/network_interface_impl.h" - -#include <boost/make_shared.hpp> -#include <memory> - -#include "mongo/db/auth/authorization_session.h" -#include "mongo/client/connection_pool.h" -#include "mongo/db/client.h" -#include "mongo/db/operation_context_impl.h" -#include "mongo/rpc/get_status_from_command_result.h" -#include "mongo/stdx/thread.h" -#include "mongo/util/assert_util.h" -#include "mongo/util/log.h" -#include "mongo/util/time_support.h" - -namespace mongo { -namespace repl { - -namespace { - - const size_t kMinThreads = 1; - const size_t kMaxThreads = 51; // Set to 1 + max repl set size, for heartbeat + wiggle room. - const Seconds kMaxIdleThreadAge(30); - -} // namespace - - NetworkInterfaceImpl::NetworkInterfaceImpl() : - _numIdleThreads(0), - _nextThreadId(0), - _lastFullUtilizationDate(), - _isExecutorRunnable(false), - _inShutdown(false), - _commandRunner(kMessagingPortKeepOpen), - _numActiveNetworkRequests(0) { - - } - - NetworkInterfaceImpl::~NetworkInterfaceImpl() { } - - std::string NetworkInterfaceImpl::getDiagnosticString() { - boost::lock_guard<boost::mutex> lk(_mutex); - str::stream output; - output << "NetworkImpl"; - output << " threads:" << _threads.size(); - output << " inShutdown:" << _inShutdown; - output << " active:" << _numActiveNetworkRequests; - output << " pending:" << _pending.size(); - output << " execRunable:" << _isExecutorRunnable; - return output; - - } - - void NetworkInterfaceImpl::_startNewNetworkThread_inlock() { - if (_inShutdown) { - LOG(1) << - "Not starting new replication networking thread while shutting down replication."; - return; - } - if (_threads.size() >= kMaxThreads) { - LOG(1) << "Not starting new replication networking thread because " << kMaxThreads << - " are already running; " << _numIdleThreads << " threads are idle and " << - _pending.size() << " network requests are waiting for a thread to serve them."; - return; - } - const std::string threadName(str::stream() << "ReplExecNetThread-" << _nextThreadId++); - try { - _threads.push_back( - boost::make_shared<boost::thread>( - stdx::bind(&NetworkInterfaceImpl::_requestProcessorThreadBody, - this, - threadName))); - ++_numIdleThreads; - } - catch (const std::exception& ex) { - error() << "Failed to start " << threadName << "; " << _threads.size() << - " other network threads still running; caught exception: " << ex.what(); - } - } - - void NetworkInterfaceImpl::startup() { - boost::lock_guard<boost::mutex> lk(_mutex); - invariant(!_inShutdown); - if (!_threads.empty()) { - return; - } - for (size_t i = 0; i < kMinThreads; ++i) { - _startNewNetworkThread_inlock(); - } - } - - void NetworkInterfaceImpl::shutdown() { - using std::swap; - boost::unique_lock<boost::mutex> lk(_mutex); - _inShutdown = true; - _hasPending.notify_all(); - ThreadList threadsToJoin; - swap(threadsToJoin, _threads); - lk.unlock(); - _commandRunner.shutdown(); - std::for_each(threadsToJoin.begin(), - threadsToJoin.end(), - stdx::bind(&boost::thread::join, stdx::placeholders::_1)); - } - - void NetworkInterfaceImpl::signalWorkAvailable() { - boost::lock_guard<boost::mutex> lk(_mutex); - _signalWorkAvailable_inlock(); - } - - void NetworkInterfaceImpl::_signalWorkAvailable_inlock() { - if (!_isExecutorRunnable) { - _isExecutorRunnable = true; - _isExecutorRunnableCondition.notify_one(); - } - } - - void NetworkInterfaceImpl::waitForWork() { - boost::unique_lock<boost::mutex> lk(_mutex); - while (!_isExecutorRunnable) { - _isExecutorRunnableCondition.wait(lk); - } - _isExecutorRunnable = false; - } - - void NetworkInterfaceImpl::waitForWorkUntil(Date_t when) { - boost::unique_lock<boost::mutex> lk(_mutex); - while (!_isExecutorRunnable) { - const Milliseconds waitTime(when - now()); - if (waitTime <= Milliseconds(0)) { - break; - } - _isExecutorRunnableCondition.wait_for(lk, waitTime); - } - _isExecutorRunnable = false; - } - - void NetworkInterfaceImpl::_requestProcessorThreadBody( - NetworkInterfaceImpl* net, - const std::string& threadName) { - setThreadName(threadName); - LOG(1) << "thread starting"; - net->_consumeNetworkRequests(); - - // At this point, another thread may have destroyed "net", if this thread chose to detach - // itself and remove itself from net->_threads before releasing net->_mutex. Do not access - // member variables of "net" from here, on. - LOG(1) << "thread shutting down"; - } - - void NetworkInterfaceImpl::_consumeNetworkRequests() { - boost::unique_lock<boost::mutex> lk(_mutex); - while (!_inShutdown) { - if (_pending.empty()) { - if (_threads.size() > kMinThreads) { - const Date_t nowDate = now(); - const Date_t nextThreadRetirementDate = - _lastFullUtilizationDate + kMaxIdleThreadAge; - if (nowDate > nextThreadRetirementDate) { - _lastFullUtilizationDate = nowDate; - break; - } - } - _hasPending.wait_for(lk, kMaxIdleThreadAge); - continue; - } - CommandData todo = _pending.front(); - _pending.pop_front(); - ++_numActiveNetworkRequests; - --_numIdleThreads; - lk.unlock(); - ResponseStatus result = _commandRunner.runCommand(todo.request); - LOG(2) << "Network status of sending " << todo.request.cmdObj.firstElementFieldName() << - " to " << todo.request.target << " was " << result.getStatus(); - todo.onFinish(result); - lk.lock(); - --_numActiveNetworkRequests; - ++_numIdleThreads; - _signalWorkAvailable_inlock(); - } - --_numIdleThreads; - if (_inShutdown) { - return; - } - // This thread is ending because it was idle for too long. - // Find self in _threads, remove self from _threads, detach self. - for (size_t i = 0; i < _threads.size(); ++i) { - if (_threads[i]->get_id() != stdx::this_thread::get_id()) { - continue; - } - _threads[i]->detach(); - _threads[i].swap(_threads.back()); - _threads.pop_back(); - return; - } - severe().stream() << "Could not find this thread, with id " << - stdx::this_thread::get_id() << " in the replication networking thread pool"; - fassertFailedNoTrace(28581); - } - - void NetworkInterfaceImpl::startCommand( - const ReplicationExecutor::CallbackHandle& cbHandle, - const RemoteCommandRequest& request, - const RemoteCommandCompletionFn& onFinish) { - LOG(2) << "Scheduling " << request.cmdObj.firstElementFieldName() << " to " << - request.target; - boost::lock_guard<boost::mutex> lk(_mutex); - _pending.push_back(CommandData()); - CommandData& cd = _pending.back(); - cd.cbHandle = cbHandle; - cd.request = request; - cd.onFinish = onFinish; - if (_numIdleThreads < _pending.size()) { - _startNewNetworkThread_inlock(); - } - if (_numIdleThreads <= _pending.size()) { - _lastFullUtilizationDate = Date_t::now(); - } - _hasPending.notify_one(); - } - - void NetworkInterfaceImpl::cancelCommand(const ReplicationExecutor::CallbackHandle& cbHandle) { - boost::unique_lock<boost::mutex> lk(_mutex); - CommandDataList::iterator iter; - for (iter = _pending.begin(); iter != _pending.end(); ++iter) { - if (iter->cbHandle == cbHandle) { - break; - } - } - if (iter == _pending.end()) { - return; - } - const RemoteCommandCompletionFn onFinish = iter->onFinish; - LOG(2) << "Canceled sending " << iter->request.cmdObj.firstElementFieldName() << " to " << - iter->request.target; - _pending.erase(iter); - lk.unlock(); - onFinish(ResponseStatus(ErrorCodes::CallbackCanceled, "Callback canceled")); - lk.lock(); - _signalWorkAvailable_inlock(); - } - - Date_t NetworkInterfaceImpl::now() { - return Date_t::now(); - } - - OperationContext* NetworkInterfaceImpl::createOperationContext() { - if (!ClientBasic::getCurrent()) { - Client::initThreadIfNotAlready(); - AuthorizationSession::get(*ClientBasic::getCurrent())->grantInternalAuthorization(); - } - return new OperationContextImpl(); - } - -} // namespace repl -} // namespace mongo diff --git a/src/mongo/db/repl/network_interface_impl.h b/src/mongo/db/repl/network_interface_impl.h deleted file mode 100644 index 4350f7c000a..00000000000 --- a/src/mongo/db/repl/network_interface_impl.h +++ /dev/null @@ -1,164 +0,0 @@ -/** - * Copyright (C) 2014 MongoDB Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - - -#pragma once - -#include <boost/shared_ptr.hpp> -#include <boost/thread.hpp> -#include <boost/thread/condition_variable.hpp> -#include <boost/thread/mutex.hpp> -#include <vector> - -#include "mongo/client/remote_command_runner_impl.h" -#include "mongo/db/repl/replication_executor.h" -#include "mongo/stdx/list.h" - -namespace mongo { -namespace repl { - - /** - * Implementation of the network interface used by the ReplicationExecutor inside mongod. - * - * This implementation manages a dynamically sized group of worker threads for performing - * network operations. The minimum and maximum number of threads is set at compile time, and - * the exact number of threads is adjusted dynamically, using the following two rules. - * - * 1.) If the number of worker threads is less than the maximum, there are no idle worker - * threads, and the client enqueues a new network operation via startCommand(), the network - * interface spins up a new worker thread. This decision is made on the assumption that - * spinning up a new thread is faster than the round-trip time for processing a remote command, - * and so this will minimize wait time. - * - * 2.) If the number of worker threads has exceeded the the peak number of scheduled outstanding - * network commands continuously for a period of time (kMaxIdleThreadAge), one thread is retired - * from the pool and the monitoring of idle threads is reset. This means that at most one - * thread retires every kMaxIdleThreadAge units of time. The value of kMaxIdleThreadAge is set - * to be much larger than the expected frequency of new requests, averaging out short-duration - * periods of idleness, as occur between heartbeats. - * - * The implementation also manages a pool of network connections to recently contacted remote - * nodes. The size of this pool is not bounded, but connections are retired unconditionally - * after they have been connected for a certain maximum period. - */ - class NetworkInterfaceImpl : public ReplicationExecutor::NetworkInterface { - public: - explicit NetworkInterfaceImpl(); - virtual ~NetworkInterfaceImpl(); - virtual std::string getDiagnosticString(); - virtual void startup(); - virtual void shutdown(); - virtual void waitForWork(); - virtual void waitForWorkUntil(Date_t when); - virtual void signalWorkAvailable(); - virtual Date_t now(); - virtual void startCommand( - const ReplicationExecutor::CallbackHandle& cbHandle, - const RemoteCommandRequest& request, - const RemoteCommandCompletionFn& onFinish); - virtual void cancelCommand(const ReplicationExecutor::CallbackHandle& cbHandle); - OperationContext* createOperationContext() override; - - std::string getNextCallbackWithGlobalLockThreadName(); - - private: - /** - * Information describing an in-flight command. - */ - struct CommandData { - ReplicationExecutor::CallbackHandle cbHandle; - RemoteCommandRequest request; - RemoteCommandCompletionFn onFinish; - }; - typedef stdx::list<CommandData> CommandDataList; - typedef std::vector<boost::shared_ptr<boost::thread> > ThreadList; - - /** - * Thread body for threads that synchronously perform network requests from - * the _pending list. - */ - static void _requestProcessorThreadBody(NetworkInterfaceImpl* net, - const std::string& threadName); - - /** - * Run loop that iteratively consumes network requests in a request processor thread. - */ - void _consumeNetworkRequests(); - - /** - * Notifies the network threads that there is work available. - */ - void _signalWorkAvailable_inlock(); - - /** - * Starts a new network thread. - */ - void _startNewNetworkThread_inlock(); - - // Mutex guarding the state of this network interface, except for the remote command - // executor, which has its own concurrency control. - boost::mutex _mutex; - - // Condition signaled to indicate that there is work in the _pending queue. - boost::condition_variable _hasPending; - - // Queue of yet-to-be-executed network operations. - CommandDataList _pending; - - // List of threads serving as the worker pool. - ThreadList _threads; - - // Count of idle threads. - size_t _numIdleThreads; - - // Id counter for assigning thread names - size_t _nextThreadId; - - // The last time that _pending.size() + _numActiveNetworkRequests grew to be at least - // _threads.size(). - Date_t _lastFullUtilizationDate; - - // Condition signaled to indicate that the executor, blocked in waitForWorkUntil or - // waitForWork, should wake up. - boost::condition_variable _isExecutorRunnableCondition; - - // Flag indicating whether or not the executor associated with this interface is runnable. - bool _isExecutorRunnable; - - // Flag indicating when this interface is being shut down (because shutdown() has executed). - bool _inShutdown; - - // Interface for running remote commands - RemoteCommandRunnerImpl _commandRunner; - - // Number of active network requests - size_t _numActiveNetworkRequests; - }; - -} // namespace repl -} // namespace mongo diff --git a/src/mongo/db/repl/network_interface_mock.cpp b/src/mongo/db/repl/network_interface_mock.cpp deleted file mode 100644 index 80263365eba..00000000000 --- a/src/mongo/db/repl/network_interface_mock.cpp +++ /dev/null @@ -1,405 +0,0 @@ -/** - * Copyright (C) 2014 MongoDB Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#include "mongo/platform/basic.h" - -#include "mongo/db/repl/network_interface_mock.h" - -#include "mongo/db/repl/operation_context_repl_mock.h" -#include "mongo/db/repl/replication_executor.h" -#include "mongo/stdx/functional.h" -#include "mongo/util/time_support.h" - -namespace mongo { -namespace repl { - - NetworkInterfaceMock::NetworkInterfaceMock() - : _waitingToRunMask(0), - _currentlyRunning(kNoThread), - _now(fassertStatusOK(18653, dateFromISOString("2014-08-01T00:00:00Z"))), - _hasStarted(false), - _inShutdown(false), - _executorNextWakeupDate(Date_t::max()) { - } - - NetworkInterfaceMock::~NetworkInterfaceMock() { - boost::unique_lock<boost::mutex> lk(_mutex); - invariant(!_hasStarted || _inShutdown); - invariant(_scheduled.empty()); - invariant(_blackHoled.empty()); - } - - std::string NetworkInterfaceMock::getDiagnosticString() { - // TODO something better. - return "NetworkInterfaceMock diagnostics here"; - } - - Date_t NetworkInterfaceMock::now() { - boost::lock_guard<boost::mutex> lk(_mutex); - return _now_inlock(); - } - - OperationContext* NetworkInterfaceMock::createOperationContext() { - return new OperationContextReplMock(); - } - - void NetworkInterfaceMock::startCommand( - const ReplicationExecutor::CallbackHandle& cbHandle, - const RemoteCommandRequest& request, - const RemoteCommandCompletionFn& onFinish) { - - boost::lock_guard<boost::mutex> lk(_mutex); - invariant(!_inShutdown); - const Date_t now = _now_inlock(); - NetworkOperationIterator insertBefore = _unscheduled.begin(); - while ((insertBefore != _unscheduled.end()) && - (insertBefore->getNextConsiderationDate() <= now)) { - - ++insertBefore; - } - _unscheduled.insert(insertBefore, NetworkOperation(cbHandle, request, now, onFinish)); - } - - static bool findAndCancelIf( - const stdx::function<bool (const NetworkInterfaceMock::NetworkOperation&)>& matchFn, - NetworkInterfaceMock::NetworkOperationList* other, - NetworkInterfaceMock::NetworkOperationList* scheduled, - const Date_t now) { - const NetworkInterfaceMock::NetworkOperationIterator noi = - std::find_if(other->begin(), other->end(), matchFn); - if (noi == other->end()) { - return false; - } - scheduled->splice(scheduled->begin(), *other, noi); - noi->setResponse(now, ResponseStatus(ErrorCodes::CallbackCanceled, - "Network operation canceled")); - return true; - } - - void NetworkInterfaceMock::cancelCommand( - const ReplicationExecutor::CallbackHandle& cbHandle) { - boost::lock_guard<boost::mutex> lk(_mutex); - invariant(!_inShutdown); - stdx::function<bool (const NetworkOperation&)> matchesHandle = stdx::bind( - &NetworkOperation::isForCallback, - stdx::placeholders::_1, - cbHandle); - const Date_t now = _now_inlock(); - if (findAndCancelIf(matchesHandle, &_unscheduled, &_scheduled, now)) { - return; - } - if (findAndCancelIf(matchesHandle, &_blackHoled, &_scheduled, now)) { - return; - } - if (findAndCancelIf(matchesHandle, &_scheduled, &_scheduled, now)) { - return; - } - // No not-in-progress network command matched cbHandle. Oh, well. - } - - void NetworkInterfaceMock::startup() { - boost::lock_guard<boost::mutex> lk(_mutex); - invariant(!_hasStarted); - _hasStarted = true; - _inShutdown = false; - invariant(_currentlyRunning == kNoThread); - _currentlyRunning = kExecutorThread; - } - - void NetworkInterfaceMock::shutdown() { - boost::unique_lock<boost::mutex> lk(_mutex); - invariant(_hasStarted); - invariant(!_inShutdown); - _inShutdown = true; - NetworkOperationList todo; - todo.splice(todo.end(), _scheduled); - todo.splice(todo.end(), _unscheduled); - todo.splice(todo.end(), _processing); - todo.splice(todo.end(), _blackHoled); - - const Date_t now = _now_inlock(); - _waitingToRunMask |= kExecutorThread; // Prevents network thread from scheduling. - lk.unlock(); - for (NetworkOperationIterator iter = todo.begin(); iter != todo.end(); ++iter) { - iter->setResponse(now, ResponseStatus(ErrorCodes::ShutdownInProgress, - "Shutting down mock network")); - iter->finishResponse(); - } - lk.lock(); - invariant(_currentlyRunning == kExecutorThread); - _currentlyRunning = kNoThread; - _waitingToRunMask = kNetworkThread; - _shouldWakeNetworkCondition.notify_one(); - } - - void NetworkInterfaceMock::enterNetwork() { - boost::unique_lock<boost::mutex> lk(_mutex); - while (!_isNetworkThreadRunnable_inlock()) { - _shouldWakeNetworkCondition.wait(lk); - } - _currentlyRunning = kNetworkThread; - _waitingToRunMask &= ~kNetworkThread; - } - - void NetworkInterfaceMock::exitNetwork() { - boost::lock_guard<boost::mutex> lk(_mutex); - if (_currentlyRunning != kNetworkThread) { - return; - } - _currentlyRunning = kNoThread; - if (_isExecutorThreadRunnable_inlock()) { - _shouldWakeExecutorCondition.notify_one(); - } - _waitingToRunMask |= kNetworkThread; - } - - bool NetworkInterfaceMock::hasReadyRequests() { - boost::lock_guard<boost::mutex> lk(_mutex); - invariant(_currentlyRunning == kNetworkThread); - return _hasReadyRequests_inlock(); - } - - bool NetworkInterfaceMock::_hasReadyRequests_inlock() { - if (_unscheduled.empty()) - return false; - if (_unscheduled.front().getNextConsiderationDate() > _now_inlock()) { - return false; - } - return true; - } - - NetworkInterfaceMock::NetworkOperationIterator NetworkInterfaceMock::getNextReadyRequest() { - boost::unique_lock<boost::mutex> lk(_mutex); - invariant(_currentlyRunning == kNetworkThread); - while (!_hasReadyRequests_inlock()) { - _waitingToRunMask |= kExecutorThread; - _runReadyNetworkOperations_inlock(&lk); - } - invariant(_hasReadyRequests_inlock()); - _processing.splice(_processing.begin(), _unscheduled, _unscheduled.begin()); - return _processing.begin(); - } - - void NetworkInterfaceMock::scheduleResponse( - NetworkOperationIterator noi, - Date_t when, - const ResponseStatus& response) { - - boost::lock_guard<boost::mutex> lk(_mutex); - invariant(_currentlyRunning == kNetworkThread); - NetworkOperationIterator insertBefore = _scheduled.begin(); - while ((insertBefore != _scheduled.end()) && (insertBefore->getResponseDate() <= when)) { - ++insertBefore; - } - noi->setResponse(when, response); - _scheduled.splice(insertBefore, _processing, noi); - } - - void NetworkInterfaceMock::blackHole(NetworkOperationIterator noi) { - boost::lock_guard<boost::mutex> lk(_mutex); - invariant(_currentlyRunning == kNetworkThread); - _blackHoled.splice(_blackHoled.end(), _processing, noi); - } - - void NetworkInterfaceMock::requeueAt(NetworkOperationIterator noi, Date_t dontAskUntil) { - boost::lock_guard<boost::mutex> lk(_mutex); - invariant(_currentlyRunning == kNetworkThread); - invariant(noi->getNextConsiderationDate() < dontAskUntil); - invariant(_now_inlock() < dontAskUntil); - NetworkOperationIterator insertBefore = _unscheduled.begin(); - for (; insertBefore != _unscheduled.end(); ++insertBefore) { - if (insertBefore->getNextConsiderationDate() >= dontAskUntil) { - break; - } - } - noi->setNextConsiderationDate(dontAskUntil); - _unscheduled.splice(insertBefore, _processing, noi); - } - - void NetworkInterfaceMock::runUntil(Date_t until) { - boost::unique_lock<boost::mutex> lk(_mutex); - invariant(_currentlyRunning == kNetworkThread); - invariant(until > _now_inlock()); - while (until > _now_inlock()) { - _runReadyNetworkOperations_inlock(&lk); - if (_hasReadyRequests_inlock()) { - break; - } - Date_t newNow = _executorNextWakeupDate; - if (!_scheduled.empty() && _scheduled.front().getResponseDate() < newNow) { - newNow = _scheduled.front().getResponseDate(); - } - if (until < newNow) { - newNow = until; - } - invariant(_now_inlock() <= newNow); - _now = newNow; - _waitingToRunMask |= kExecutorThread; - } - _runReadyNetworkOperations_inlock(&lk); - } - - void NetworkInterfaceMock::runReadyNetworkOperations() { - boost::unique_lock<boost::mutex> lk(_mutex); - invariant(_currentlyRunning == kNetworkThread); - _runReadyNetworkOperations_inlock(&lk); - } - - void NetworkInterfaceMock::waitForWork() { - boost::unique_lock<boost::mutex> lk(_mutex); - invariant(_currentlyRunning == kExecutorThread); - _waitForWork_inlock(&lk); - } - - void NetworkInterfaceMock::waitForWorkUntil(Date_t when) { - boost::unique_lock<boost::mutex> lk(_mutex); - invariant(_currentlyRunning == kExecutorThread); - _executorNextWakeupDate = when; - if (_executorNextWakeupDate <= _now_inlock()) { - return; - } - _waitForWork_inlock(&lk); - } - - void NetworkInterfaceMock::signalWorkAvailable() { - boost::lock_guard<boost::mutex> lk(_mutex); - _waitingToRunMask |= kExecutorThread; - if (_currentlyRunning == kNoThread) { - _shouldWakeExecutorCondition.notify_one(); - } - } - - void NetworkInterfaceMock::_runReadyNetworkOperations_inlock( - boost::unique_lock<boost::mutex>* lk) { - while (!_scheduled.empty() && _scheduled.front().getResponseDate() <= _now_inlock()) { - invariant(_currentlyRunning == kNetworkThread); - NetworkOperation op = _scheduled.front(); - _scheduled.pop_front(); - _waitingToRunMask |= kExecutorThread; - lk->unlock(); - op.finishResponse(); - lk->lock(); - } - invariant(_currentlyRunning == kNetworkThread); - if (!(_waitingToRunMask & kExecutorThread)) { - return; - } - _shouldWakeExecutorCondition.notify_one(); - _currentlyRunning = kNoThread; - while (!_isNetworkThreadRunnable_inlock()) { - _shouldWakeNetworkCondition.wait(*lk); - } - _currentlyRunning = kNetworkThread; - _waitingToRunMask &= ~kNetworkThread; - } - - void NetworkInterfaceMock::_waitForWork_inlock(boost::unique_lock<boost::mutex>* lk) { - if (_waitingToRunMask & kExecutorThread) { - _waitingToRunMask &= ~kExecutorThread; - return; - } - _currentlyRunning = kNoThread; - while (!_isExecutorThreadRunnable_inlock()) { - _waitingToRunMask |= kNetworkThread; - _shouldWakeNetworkCondition.notify_one(); - _shouldWakeExecutorCondition.wait(*lk); - } - _currentlyRunning = kExecutorThread; - _waitingToRunMask &= ~kExecutorThread; - } - - bool NetworkInterfaceMock::_isNetworkThreadRunnable_inlock() { - if (_currentlyRunning != kNoThread) { - return false; - } - if (_waitingToRunMask != kNetworkThread) { - return false; - } - return true; - } - - bool NetworkInterfaceMock::_isExecutorThreadRunnable_inlock() { - if (_currentlyRunning != kNoThread) { - return false; - } - return _waitingToRunMask & kExecutorThread; - } - - static const StatusWith<RemoteCommandResponse> kUnsetResponse( - ErrorCodes::InternalError, - "NetworkOperation::_response never set"); - - NetworkInterfaceMock::NetworkOperation::NetworkOperation() - : _requestDate(), - _nextConsiderationDate(), - _responseDate(), - _request(), - _response(kUnsetResponse), - _onFinish() { - } - - NetworkInterfaceMock::NetworkOperation::NetworkOperation( - const ReplicationExecutor::CallbackHandle& cbHandle, - const RemoteCommandRequest& theRequest, - Date_t theRequestDate, - const RemoteCommandCompletionFn& onFinish) - : _requestDate(theRequestDate), - _nextConsiderationDate(theRequestDate), - _responseDate(), - _cbHandle(cbHandle), - _request(theRequest), - _response(kUnsetResponse), - _onFinish(onFinish) { - } - - NetworkInterfaceMock::NetworkOperation::~NetworkOperation() {} - - void NetworkInterfaceMock::NetworkOperation::setNextConsiderationDate( - Date_t nextConsiderationDate) { - - invariant(nextConsiderationDate > _nextConsiderationDate); - _nextConsiderationDate = nextConsiderationDate; - } - - void NetworkInterfaceMock::NetworkOperation::setResponse( - Date_t responseDate, - const ResponseStatus& response) { - - invariant(responseDate >= _requestDate); - _responseDate = responseDate; - _response = response; - } - - void NetworkInterfaceMock::NetworkOperation::finishResponse() { - invariant(_onFinish); - _onFinish(_response); - _onFinish = RemoteCommandCompletionFn(); - } - -} // namespace repl -} // namespace mongo diff --git a/src/mongo/db/repl/network_interface_mock.h b/src/mongo/db/repl/network_interface_mock.h deleted file mode 100644 index 255eb7efec7..00000000000 --- a/src/mongo/db/repl/network_interface_mock.h +++ /dev/null @@ -1,328 +0,0 @@ -/** - * Copyright (C) 2014 MongoDB Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#pragma once - -#include <boost/thread/mutex.hpp> -#include <boost/thread/condition_variable.hpp> -#include <map> - -#include "mongo/db/repl/replication_executor.h" -#include "mongo/util/time_support.h" - -namespace mongo { -namespace repl { - - /** - * Mock network implementation for use in unit tests. - * - * To use, construct a new instance on the heap, and keep a pointer to it. Pass - * the pointer to the instance into the ReplicationExecutor constructor, transfering - * ownership. Start the executor's run() method in a separate thread, schedule the - * work you want to test into the executor, then while the test is still going, iterate - * through the ready network requests, servicing them and advancing time as needed. - * - * The mock has a fully virtualized notion of time and the the network. When the - * replication executor under test schedules a network operation, the startCommand - * method of this class adds an entry to the _unscheduled queue for immediate consideration. - * The test driver loop, when it examines the request, may schedule a response, ask the - * interface to redeliver the request at a later virtual time, or to swallow the virtual - * request until the end of the simulation. The test driver loop can also instruct the - * interface to run forward through virtual time until there are operations ready to - * consider, via runUntil. - * - * The thread acting as the "network" and the executor run thread are highly synchronized - * by this code, allowing for deterministic control of operation interleaving. - */ - class NetworkInterfaceMock : public ReplicationExecutor::NetworkInterface { - public: - class NetworkOperation; - typedef stdx::list<NetworkOperation> NetworkOperationList; - typedef NetworkOperationList::iterator NetworkOperationIterator; - - NetworkInterfaceMock(); - virtual ~NetworkInterfaceMock(); - virtual std::string getDiagnosticString(); - - //////////////////////////////////////////////////////////////////////////////// - // - // ReplicationExecutor::NetworkInterface methods - // - //////////////////////////////////////////////////////////////////////////////// - - virtual void startup(); - virtual void shutdown(); - virtual void waitForWork(); - virtual void waitForWorkUntil(Date_t when); - virtual void signalWorkAvailable(); - virtual Date_t now(); - virtual void startCommand(const ReplicationExecutor::CallbackHandle& cbHandle, - const RemoteCommandRequest& request, - const RemoteCommandCompletionFn& onFinish); - virtual void cancelCommand(const ReplicationExecutor::CallbackHandle& cbHandle); - OperationContext* createOperationContext() override; - - - //////////////////////////////////////////////////////////////////////////////// - // - // Methods for simulating network operations and the passage of time. - // - // Methods in this section are to be called by the thread currently simulating - // the network. - // - //////////////////////////////////////////////////////////////////////////////// - - /** - * Causes the currently running (non-executor) thread to assume the mantle of the network - * simulation thread. - * - * Call this before calling any of the other methods in this section. - */ - void enterNetwork(); - - /** - * Causes the currently running thread to drop the mantle of "network simulation thread". - * - * Call this before calling any methods that might block waiting for the replciation - * executor thread. - */ - void exitNetwork(); - - /** - * Returns true if there are unscheduled network requests to be processed. - */ - bool hasReadyRequests(); - - /** - * Gets the next unscheduled request to process, blocking until one is available. - * - * Will not return until the executor thread is blocked in waitForWorkUntil or waitForWork. - */ - NetworkOperationIterator getNextReadyRequest(); - - /** - * Schedules "response" in response to "noi" at virtual time "when". - */ - void scheduleResponse( - NetworkOperationIterator noi, - Date_t when, - const ResponseStatus& response); - - /** - * Swallows "noi", causing the network interface to not respond to it until - * shutdown() is called. - */ - void blackHole(NetworkOperationIterator noi); - - /** - * Defers decision making on "noi" until virtual time "dontAskUntil". Use - * this when getNextReadyRequest() returns a request you want to deal with - * after looking at other requests. - */ - void requeueAt(NetworkOperationIterator noi, Date_t dontAskUntil); - - /** - * Runs the simulator forward until now() == until or hasReadyRequests() is true. - * - * Will not return until the executor thread is blocked in waitForWorkUntil or waitForWork. - */ - void runUntil(Date_t until); - - /** - * Processes all ready, scheduled network operations. - * - * Will not return until the executor thread is blocked in waitForWorkUntil or waitForWork. - */ - void runReadyNetworkOperations(); - - private: - /** - * Type used to identify which thread (network mock or executor) is currently executing. - * - * Values are used in a bitmask, as well. - */ - enum ThreadType { - kNoThread = 0, - kExecutorThread = 1, - kNetworkThread = 2 - }; - - /** - * Returns the current virtualized time. - */ - Date_t _now_inlock() const { return _now; } - - /** - * Implementation of waitForWork*. - */ - void _waitForWork_inlock(boost::unique_lock<boost::mutex>* lk); - - /** - * Returns true if there are ready requests for the network thread to service. - */ - bool _hasReadyRequests_inlock(); - - /** - * Returns true if the network thread could run right now. - */ - bool _isNetworkThreadRunnable_inlock(); - - /** - * Returns true if the executor thread could run right now. - */ - bool _isExecutorThreadRunnable_inlock(); - - /** - * Runs all ready network operations, called while holding "lk". May drop and - * reaquire "lk" several times, but will not return until the executor has blocked - * in waitFor*. - */ - void _runReadyNetworkOperations_inlock(boost::unique_lock<boost::mutex>* lk); - - // Mutex that synchronizes access to mutable data in this class and its subclasses. - // Fields guarded by the mutex are labled (M), below, and those that are read-only - // in multi-threaded execution, and so unsynchronized, are labeled (R). - boost::mutex _mutex; - - // Condition signaled to indicate that the network processing thread should wake up. - boost::condition_variable _shouldWakeNetworkCondition; // (M) - - // Condition signaled to indicate that the executor run thread should wake up. - boost::condition_variable _shouldWakeExecutorCondition; // (M) - - // Bitmask indicating which threads are runnable. - int _waitingToRunMask; // (M) - - // Indicator of which thread, if any, is currently running. - ThreadType _currentlyRunning; // (M) - - // The current time reported by this instance of NetworkInterfaceMock. - Date_t _now; // (M) - - // Set to true by "startUp()" - bool _hasStarted; // (M) - - // Set to true by "shutDown()". - bool _inShutdown; // (M) - - // Next date that the executor expects to wake up at (due to a scheduleWorkAt() call). - Date_t _executorNextWakeupDate; // (M) - - // List of network operations whose responses haven't been scheduled or blackholed. This is - // where network requests are first queued. It is sorted by - // NetworkOperation::_nextConsiderationDate, which is set to now() when startCommand() is - // called, and adjusted by requeueAt(). - NetworkOperationList _unscheduled; // (M) - - // List of network operations that have been returned by getNextReadyRequest() but not - // yet scheudled, black-holed or requeued. - NetworkOperationList _processing; // (M) - - // List of network operations whose responses have been scheduled but not delivered, sorted - // by NetworkOperation::_responseDate. These operations will have their responses delivered - // when now() == getResponseDate(). - NetworkOperationList _scheduled; // (M) - - // List of network operations that will not be responded to until shutdown() is called. - NetworkOperationList _blackHoled; // (M) - - // Pointer to the executor into which this mock is installed. Used to signal the executor - // when the clock changes. - ReplicationExecutor* _executor; // (R) - }; - - /** - * Representation of an in-progress network operation. - */ - class NetworkInterfaceMock::NetworkOperation { - public: - NetworkOperation(); - NetworkOperation(const ReplicationExecutor::CallbackHandle& cbHandle, - const RemoteCommandRequest& theRequest, - Date_t theRequestDate, - const RemoteCommandCompletionFn& onFinish); - ~NetworkOperation(); - - /** - * Adjusts the stored virtual time at which this entry will be subject to consideration - * by the test harness. - */ - void setNextConsiderationDate(Date_t nextConsiderationDate); - - /** - * Sets the response and thet virtual time at which it will be delivered. - */ - void setResponse(Date_t responseDate, const ResponseStatus& response); - - /** - * Predicate that returns true if cbHandle equals the executor's handle for this network - * operation. Used for searching lists of NetworkOperations. - */ - bool isForCallback(const ReplicationExecutor::CallbackHandle& cbHandle) const { - return cbHandle == _cbHandle; - } - - /** - * Gets the request that initiated this operation. - */ - const RemoteCommandRequest& getRequest() const { return _request; } - - /** - * Gets the virtual time at which the operation was started. - */ - Date_t getRequestDate() const { return _requestDate; } - - /** - * Gets the virtual time at which the test harness should next consider what to do - * with this request. - */ - Date_t getNextConsiderationDate() const { return _nextConsiderationDate; } - - /** - * After setResponse() has been called, returns the virtual time at which - * the response should be delivered. - */ - Date_t getResponseDate() const { return _responseDate; } - - /** - * Delivers the response, by invoking the onFinish callback passed into the constructor. - */ - void finishResponse(); - - private: - Date_t _requestDate; - Date_t _nextConsiderationDate; - Date_t _responseDate; - ReplicationExecutor::CallbackHandle _cbHandle; - RemoteCommandRequest _request; - ResponseStatus _response; - RemoteCommandCompletionFn _onFinish; - }; - -} // namespace repl -} // namespace mongo diff --git a/src/mongo/db/repl/repl_set_heartbeat_response_test.cpp b/src/mongo/db/repl/repl_set_heartbeat_response_test.cpp index 29a7812d6f3..8d53dffc062 100644 --- a/src/mongo/db/repl/repl_set_heartbeat_response_test.cpp +++ b/src/mongo/db/repl/repl_set_heartbeat_response_test.cpp @@ -30,9 +30,9 @@ #include <boost/scoped_ptr.hpp> +#include "mongo/db/repl/repl_set_heartbeat_response.h" #include "mongo/unittest/unittest.h" #include "mongo/util/assert_util.h" -#include "mongo/db/repl/repl_set_heartbeat_response.h" namespace mongo { namespace repl { diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp index 0fbd1b9424d..942681cedf6 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp @@ -56,6 +56,7 @@ #include "mongo/db/repl/rs_sync.h" #include "mongo/db/repl/last_vote.h" #include "mongo/db/storage/storage_engine.h" +#include "mongo/executor/network_interface.h" #include "mongo/s/d_state.h" #include "mongo/stdx/functional.h" #include "mongo/util/assert_util.h" @@ -294,8 +295,7 @@ namespace { } void ReplicationCoordinatorExternalStateImpl::closeConnections() { - MessagingPort::closeAllSockets( - ReplicationExecutor::NetworkInterface::kMessagingPortKeepOpen); + MessagingPort::closeAllSockets(executor::NetworkInterface::kMessagingPortKeepOpen); } void ReplicationCoordinatorExternalStateImpl::killAllUserOperations(OperationContext* txn) { diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp index dc9a7d36ec6..e3ea081940a 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl.cpp @@ -78,6 +78,7 @@ namespace repl { namespace { typedef StatusWith<ReplicationExecutor::CallbackHandle> CBHStatus; + using executor::NetworkInterface; void lockAndCall(boost::unique_lock<boost::mutex>* lk, const stdx::function<void ()>& fn) { if (!lk->owns_lock()) { @@ -159,13 +160,16 @@ namespace { ReplicationCoordinatorExternalState* externalState, TopologyCoordinator* topCoord, int64_t prngSeed, - ReplicationExecutor::NetworkInterface* network, + NetworkInterface* network, + StorageInterface* storage, ReplicationExecutor* replExec) : _settings(settings), _replMode(getReplicationModeFromSettings(settings)), _topCoord(topCoord), _replExecutorIfOwned(replExec ? nullptr : - new ReplicationExecutor(network, prngSeed)), + new ReplicationExecutor(network, + storage, + prngSeed)), _replExecutor(replExec ? *replExec : *_replExecutorIfOwned), _externalState(externalState), _inShutdown(false), @@ -198,13 +202,15 @@ namespace { ReplicationCoordinatorImpl::ReplicationCoordinatorImpl( const ReplSettings& settings, ReplicationCoordinatorExternalState* externalState, - ReplicationExecutor::NetworkInterface* network, + NetworkInterface* network, + StorageInterface* storage, TopologyCoordinator* topCoord, int64_t prngSeed) : ReplicationCoordinatorImpl(settings, externalState, topCoord, prngSeed, network, + storage, nullptr) { } ReplicationCoordinatorImpl::ReplicationCoordinatorImpl( @@ -217,6 +223,7 @@ namespace { topCoord, prngSeed, nullptr, + nullptr, replExec) { } ReplicationCoordinatorImpl::~ReplicationCoordinatorImpl() {} diff --git a/src/mongo/db/repl/replication_coordinator_impl.h b/src/mongo/db/repl/replication_coordinator_impl.h index d8dabc082eb..625007cd09f 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.h +++ b/src/mongo/db/repl/replication_coordinator_impl.h @@ -38,13 +38,14 @@ #include "mongo/base/status.h" #include "mongo/bson/timestamp.h" #include "mongo/db/service_context.h" -#include "mongo/db/repl/member_state.h" #include "mongo/db/repl/data_replicator.h" +#include "mongo/db/repl/member_state.h" #include "mongo/db/repl/optime.h" #include "mongo/db/repl/replica_set_config.h" #include "mongo/db/repl/replication_coordinator.h" #include "mongo/db/repl/replication_coordinator_external_state.h" #include "mongo/db/repl/replication_executor.h" +#include "mongo/db/repl/storage_interface.h" #include "mongo/db/repl/update_position_args.h" #include "mongo/platform/atomic_word.h" #include "mongo/platform/unordered_map.h" @@ -81,7 +82,8 @@ namespace repl { // Takes ownership of the "externalState", "topCoord" and "network" objects. ReplicationCoordinatorImpl(const ReplSettings& settings, ReplicationCoordinatorExternalState* externalState, - ReplicationExecutor::NetworkInterface* network, + executor::NetworkInterface* network, + StorageInterface* storage, TopologyCoordinator* topoCoord, int64_t prngSeed); // Takes ownership of the "externalState" and "topCoord" objects. @@ -296,7 +298,8 @@ namespace repl { ReplicationCoordinatorExternalState* externalState, TopologyCoordinator* topCoord, int64_t prngSeed, - ReplicationExecutor::NetworkInterface* network, + executor::NetworkInterface* network, + StorageInterface* storage, ReplicationExecutor* replExec); /** * Configuration states for a replica set node. diff --git a/src/mongo/db/repl/replication_coordinator_impl_elect_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_elect_test.cpp index 5202e59c65c..a0e4149de24 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_elect_test.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_elect_test.cpp @@ -33,7 +33,6 @@ #include "mongo/db/jsobj.h" #include "mongo/db/operation_context_noop.h" #include "mongo/db/repl/is_master_response.h" -#include "mongo/db/repl/network_interface_mock.h" #include "mongo/db/repl/operation_context_repl_mock.h" #include "mongo/db/repl/repl_set_heartbeat_args.h" #include "mongo/db/repl/repl_set_heartbeat_response.h" @@ -41,6 +40,7 @@ #include "mongo/db/repl/replication_coordinator_external_state_mock.h" #include "mongo/db/repl/replication_coordinator_impl.h" #include "mongo/db/repl/replication_coordinator_test_fixture.h" +#include "mongo/executor/network_interface_mock.h" #include "mongo/unittest/unittest.h" #include "mongo/util/log.h" @@ -48,6 +48,8 @@ namespace mongo { namespace repl { namespace { + using executor::NetworkInterfaceMock; + class ReplCoordElectTest : public ReplCoordTest { protected: void simulateEnoughHeartbeatsForElectability(); diff --git a/src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp index 43d6d2f69b9..c7d5c9944da 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp @@ -33,7 +33,6 @@ #include "mongo/db/jsobj.h" #include "mongo/db/operation_context_noop.h" #include "mongo/db/repl/is_master_response.h" -#include "mongo/db/repl/network_interface_mock.h" #include "mongo/db/repl/operation_context_repl_mock.h" #include "mongo/db/repl/repl_set_heartbeat_args_v1.h" #include "mongo/db/repl/repl_set_heartbeat_response.h" @@ -41,6 +40,7 @@ #include "mongo/db/repl/replication_coordinator_external_state_mock.h" #include "mongo/db/repl/replication_coordinator_impl.h" #include "mongo/db/repl/replication_coordinator_test_fixture.h" +#include "mongo/executor/network_interface_mock.h" #include "mongo/unittest/unittest.h" #include "mongo/util/log.h" @@ -48,6 +48,8 @@ namespace mongo { namespace repl { namespace { + using executor::NetworkInterfaceMock; + class ReplCoordElectV1Test : public ReplCoordTest { protected: void simulateEnoughHeartbeatsForElectability(); diff --git a/src/mongo/db/repl/replication_coordinator_impl_heartbeat_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_heartbeat_test.cpp index dadc7b5a026..2afcad55859 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_heartbeat_test.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_heartbeat_test.cpp @@ -32,7 +32,6 @@ #include "mongo/db/jsobj.h" #include "mongo/db/operation_context_noop.h" -#include "mongo/db/repl/network_interface_mock.h" #include "mongo/db/repl/repl_set_heartbeat_args.h" #include "mongo/db/repl/repl_set_heartbeat_response.h" #include "mongo/db/repl/replica_set_config.h" @@ -40,6 +39,7 @@ #include "mongo/db/repl/replication_coordinator_impl.h" #include "mongo/db/repl/replication_coordinator_test_fixture.h" #include "mongo/db/repl/topology_coordinator_impl.h" +#include "mongo/executor/network_interface_mock.h" #include "mongo/unittest/unittest.h" #include "mongo/util/log.h" @@ -47,6 +47,8 @@ namespace mongo { namespace repl { namespace { + using executor::NetworkInterfaceMock; + class ReplCoordHBTest : public ReplCoordTest { protected: void assertMemberState(MemberState expected, std::string msg = ""); diff --git a/src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp index 5851b6e7e7b..4ae6a358e53 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp @@ -32,7 +32,6 @@ #include "mongo/db/jsobj.h" #include "mongo/db/operation_context_noop.h" -#include "mongo/db/repl/network_interface_mock.h" #include "mongo/db/repl/repl_set_heartbeat_args.h" #include "mongo/db/repl/repl_set_heartbeat_args_v1.h" #include "mongo/db/repl/replica_set_config.h" @@ -40,6 +39,7 @@ #include "mongo/db/repl/replication_coordinator_impl.h" #include "mongo/db/repl/replication_coordinator_test_fixture.h" #include "mongo/db/repl/topology_coordinator_impl.h" +#include "mongo/executor/network_interface_mock.h" #include "mongo/unittest/unittest.h" #include "mongo/util/log.h" @@ -47,6 +47,8 @@ namespace mongo { namespace repl { namespace { + using executor::NetworkInterfaceMock; + class ReplCoordHBV1Test : public ReplCoordTest { protected: void assertMemberState(MemberState expected, std::string msg = ""); diff --git a/src/mongo/db/repl/replication_coordinator_impl_reconfig_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_reconfig_test.cpp index e4a0310daf3..5bf1b40f5cf 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_reconfig_test.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_reconfig_test.cpp @@ -32,7 +32,6 @@ #include "mongo/db/jsobj.h" #include "mongo/db/operation_context_noop.h" -#include "mongo/db/repl/network_interface_mock.h" #include "mongo/db/repl/repl_set_heartbeat_args.h" #include "mongo/db/repl/repl_set_heartbeat_response.h" #include "mongo/db/repl/replica_set_config.h" @@ -40,6 +39,7 @@ #include "mongo/db/repl/replication_coordinator_impl.h" #include "mongo/db/repl/replication_coordinator_test_fixture.h" #include "mongo/db/repl/replication_coordinator.h" // ReplSetReconfigArgs +#include "mongo/executor/network_interface_mock.h" #include "mongo/unittest/unittest.h" #include "mongo/util/log.h" @@ -47,6 +47,7 @@ namespace mongo { namespace repl { namespace { + using executor::NetworkInterfaceMock; typedef ReplicationCoordinator::ReplSetReconfigArgs ReplSetReconfigArgs; TEST_F(ReplCoordTest, ReconfigBeforeInitialized) { diff --git a/src/mongo/db/repl/replication_coordinator_impl_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_test.cpp index 3f5a574d0e4..82373106580 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_test.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_test.cpp @@ -41,7 +41,6 @@ #include "mongo/db/operation_context_noop.h" #include "mongo/db/repl/handshake_args.h" #include "mongo/db/repl/is_master_response.h" -#include "mongo/db/repl/network_interface_mock.h" #include "mongo/db/repl/operation_context_repl_mock.h" #include "mongo/db/repl/optime.h" #include "mongo/db/repl/read_after_optime_args.h" @@ -57,6 +56,7 @@ #include "mongo/db/repl/update_position_args.h" #include "mongo/db/server_options.h" #include "mongo/db/write_concern_options.h" +#include "mongo/executor/network_interface_mock.h" #include "mongo/stdx/functional.h" #include "mongo/unittest/unittest.h" #include "mongo/util/assert_util.h" @@ -68,6 +68,7 @@ namespace mongo { namespace repl { namespace { + using executor::NetworkInterfaceMock; typedef ReplicationCoordinator::ReplSetReconfigArgs ReplSetReconfigArgs; Status kInterruptedStatus(ErrorCodes::Interrupted, "operation was interrupted"); diff --git a/src/mongo/db/repl/replication_coordinator_test_fixture.cpp b/src/mongo/db/repl/replication_coordinator_test_fixture.cpp index e7bd4d93703..2359de1f751 100644 --- a/src/mongo/db/repl/replication_coordinator_test_fixture.cpp +++ b/src/mongo/db/repl/replication_coordinator_test_fixture.cpp @@ -34,14 +34,15 @@ #include "mongo/db/operation_context_noop.h" #include "mongo/db/repl/is_master_response.h" -#include "mongo/db/repl/network_interface_mock.h" #include "mongo/db/repl/operation_context_repl_mock.h" #include "mongo/db/repl/repl_settings.h" #include "mongo/db/repl/repl_set_heartbeat_args.h" #include "mongo/db/repl/repl_set_heartbeat_args_v1.h" #include "mongo/db/repl/replication_coordinator_external_state_mock.h" #include "mongo/db/repl/replication_coordinator_impl.h" +#include "mongo/db/repl/storage_interface_mock.h" #include "mongo/db/repl/topology_coordinator_impl.h" +#include "mongo/executor/network_interface_mock.h" #include "mongo/stdx/functional.h" #include "mongo/unittest/unittest.h" #include "mongo/util/log.h" @@ -55,6 +56,8 @@ namespace { } } // namespace + using executor::NetworkInterfaceMock; + ReplicaSetConfig ReplCoordTest::assertMakeRSConfig(const BSONObj& configBson) { ReplicaSetConfig config; ASSERT_OK(config.initialize(configBson)); @@ -104,10 +107,12 @@ namespace { _topo = new TopologyCoordinatorImpl(Seconds(0)); _net = new NetworkInterfaceMock; + _storage = new StorageInterfaceMock; _externalState = new ReplicationCoordinatorExternalStateMock; _repl.reset(new ReplicationCoordinatorImpl(_settings, _externalState, _net, + _storage, _topo, seed)); } diff --git a/src/mongo/db/repl/replication_coordinator_test_fixture.h b/src/mongo/db/repl/replication_coordinator_test_fixture.h index 93d3c7c4bd7..d6a67cced2e 100644 --- a/src/mongo/db/repl/replication_coordinator_test_fixture.h +++ b/src/mongo/db/repl/replication_coordinator_test_fixture.h @@ -41,12 +41,16 @@ namespace mongo { class BSONObj; struct HostAndPort; +namespace executor { + class NetworkInterfaceMock; +} // namespace executor + namespace repl { - class NetworkInterfaceMock; class ReplicaSetConfig; class ReplicationCoordinatorExternalStateMock; class ReplicationCoordinatorImpl; + class StorageInterfaceMock; class TopologyCoordinatorImpl; /** @@ -58,7 +62,7 @@ namespace repl { * Makes a ResponseStatus with the given "doc" response and optional elapsed time "millis". */ static ResponseStatus makeResponseStatus(const BSONObj& doc, - Milliseconds millis = Milliseconds(0)); + Milliseconds millis = Milliseconds(0)); /** * Constructs a ReplicaSetConfig from the given BSON, or raises a test failure exception. @@ -75,7 +79,7 @@ namespace repl { /** * Gets the network mock. */ - NetworkInterfaceMock* getNet() { return _net; } + executor::NetworkInterfaceMock* getNet() { return _net; } /** * Gets the replication coordinator under test. @@ -182,7 +186,9 @@ namespace repl { // Owned by ReplicationCoordinatorImpl TopologyCoordinatorImpl* _topo; // Owned by ReplicationCoordinatorImpl - NetworkInterfaceMock* _net; + executor::NetworkInterfaceMock* _net; + // Owned by ReplicationCoordinatorImpl + StorageInterfaceMock* _storage; // Owned by ReplicationCoordinatorImpl ReplicationCoordinatorExternalStateMock* _externalState; ReplSettings _settings; diff --git a/src/mongo/db/repl/replication_executor.cpp b/src/mongo/db/repl/replication_executor.cpp index 118ad0d0ded..72c611b600a 100644 --- a/src/mongo/db/repl/replication_executor.cpp +++ b/src/mongo/db/repl/replication_executor.cpp @@ -33,6 +33,8 @@ #include <limits> #include "mongo/db/repl/database_task.h" +#include "mongo/db/repl/storage_interface.h" +#include "mongo/executor/network_interface.h" #include "mongo/util/assert_util.h" #include "mongo/util/mongoutils/str.h" @@ -43,9 +45,14 @@ namespace { stdx::function<void ()> makeNoExcept(const stdx::function<void ()> &fn); } // namespace - ReplicationExecutor::ReplicationExecutor(NetworkInterface* netInterface, int64_t prngSeed) : + using executor::NetworkInterface; + + ReplicationExecutor::ReplicationExecutor(NetworkInterface* netInterface, + StorageInterface* storageInterface, + int64_t prngSeed) : _random(prngSeed), _networkInterface(netInterface), + _storageInterface(storageInterface), _totalEventWaiters(0), _inShutdown(false), _dblockWorkers(threadpool::ThreadPool::DoNotStartThreadsTag(), @@ -53,10 +60,10 @@ namespace { "replCallbackWithGlobalLock-"), _dblockTaskRunner( &_dblockWorkers, - stdx::bind(&NetworkInterface::createOperationContext, netInterface)), + stdx::bind(&StorageInterface::createOperationContext, storageInterface)), _dblockExclusiveLockTaskRunner( &_dblockWorkers, - stdx::bind(&NetworkInterface::createOperationContext, netInterface)), + stdx::bind(&StorageInterface::createOperationContext, storageInterface)), _nextId(0) { } @@ -561,15 +568,6 @@ namespace { isSignaledCondition(new boost::condition_variable) { } - // This is a bitmask with the first bit set. It's used to mark connections that should be kept - // open during stepdowns. -#ifndef _MSC_EXTENSIONS - const unsigned int ReplicationExecutor::NetworkInterface::kMessagingPortKeepOpen; -#endif // _MSC_EXTENSIONS - - ReplicationExecutor::NetworkInterface::NetworkInterface() {} - ReplicationExecutor::NetworkInterface::~NetworkInterface() {} - namespace { void callNoExcept(const stdx::function<void ()>& fn) { diff --git a/src/mongo/db/repl/replication_executor.h b/src/mongo/db/repl/replication_executor.h index b44ee4a4815..b90d0e24133 100644 --- a/src/mongo/db/repl/replication_executor.h +++ b/src/mongo/db/repl/replication_executor.h @@ -53,8 +53,14 @@ namespace mongo { class NamespaceString; class OperationContext; +namespace executor{ + class NetworkInterface; +} // namespace executor + namespace repl { + class StorageInterface; + /** * Event loop for driving state machines in replication. * @@ -112,7 +118,6 @@ namespace repl { struct CallbackData; class CallbackHandle; class EventHandle; - class NetworkInterface; struct RemoteCommandCallbackData; typedef StatusWith<RemoteCommandResponse> ResponseStatus; @@ -141,7 +146,9 @@ namespace repl { * * Takes ownership of the passed NetworkInterface object. */ - explicit ReplicationExecutor(NetworkInterface* netInterface, int64_t pnrgSeed); + ReplicationExecutor(executor::NetworkInterface* netInterface, + StorageInterface* storageInterface, + int64_t pnrgSeed); /** * Destroys an executor. @@ -422,7 +429,8 @@ namespace repl { // PRNG; seeded at class construction time. PseudoRandom _random; - boost::scoped_ptr<NetworkInterface> _networkInterface; + boost::scoped_ptr<executor::NetworkInterface> _networkInterface; + boost::scoped_ptr<StorageInterface> _storageInterface; boost::mutex _mutex; boost::mutex _terribleExLockSyncMutex; boost::condition_variable _noMoreWaitingThreads; @@ -509,92 +517,8 @@ namespace repl { OperationContext* txn; }; - /** - * Interface to networking and lock manager. - */ - class ReplicationExecutor::NetworkInterface { - MONGO_DISALLOW_COPYING(NetworkInterface); - public: - - // A flag to keep replication MessagingPorts open when all other sockets are disconnected. - static const unsigned int kMessagingPortKeepOpen = 1; - - typedef RemoteCommandResponse Response; - typedef stdx::function<void (const ResponseStatus&)> RemoteCommandCompletionFn; - - virtual ~NetworkInterface(); - - /** - * Returns diagnostic info. - */ - virtual std::string getDiagnosticString() = 0; - - /** - * Starts up the network interface. - * - * It is valid to call all methods except shutdown() before this method completes. That is, - * implementations may not assume that startup() completes before startCommand() first - * executes. - * - * Called by the owning ReplicationExecutor inside its run() method. - */ - virtual void startup() = 0; - - /** - * Shuts down the network interface. Must be called before this instance gets deleted, - * if startup() is called. - * - * Called by the owning ReplicationExecutor inside its run() method. - */ - virtual void shutdown() = 0; - - /** - * Blocks the current thread (presumably the executor thread) until the network interface - * knows of work for the executor to perform. - */ - virtual void waitForWork() = 0; - - /** - * Similar to waitForWork, but only blocks until "when". - */ - virtual void waitForWorkUntil(Date_t when) = 0; - - /** - * Signals to the network interface that there is new work (such as a signaled event) for - * the executor to process. Wakes the executor from waitForWork() and friends. - */ - virtual void signalWorkAvailable() = 0; - - /** - * Returns the current time. - */ - virtual Date_t now() = 0; - - /** - * Starts asynchronous execution of the command described by "request". - */ - virtual void startCommand(const CallbackHandle& cbHandle, - const RemoteCommandRequest& request, - const RemoteCommandCompletionFn& onFinish) = 0; - - /** - * Requests cancelation of the network activity associated with "cbHandle" if it has not yet - * completed. - */ - virtual void cancelCommand(const CallbackHandle& cbHandle) = 0; - - /** - * Creates an operation context for running database operations. - */ - virtual OperationContext* createOperationContext() = 0; - - protected: - NetworkInterface(); - }; - typedef ReplicationExecutor::ResponseStatus ResponseStatus; - // Must be after NetworkInterface class struct ReplicationExecutor::RemoteCommandCallbackData { RemoteCommandCallbackData(ReplicationExecutor* theExecutor, const CallbackHandle& theHandle, diff --git a/src/mongo/db/repl/replication_executor_test.cpp b/src/mongo/db/repl/replication_executor_test.cpp index dbc4f00b6fa..d916f3e01d2 100644 --- a/src/mongo/db/repl/replication_executor_test.cpp +++ b/src/mongo/db/repl/replication_executor_test.cpp @@ -34,9 +34,10 @@ #include "mongo/db/namespace_string.h" #include "mongo/db/operation_context.h" -#include "mongo/db/repl/network_interface_mock.h" #include "mongo/db/repl/replication_executor.h" #include "mongo/db/repl/replication_executor_test_fixture.h" +#include "mongo/db/repl/storage_interface_mock.h" +#include "mongo/executor/network_interface_mock.h" #include "mongo/stdx/functional.h" #include "mongo/unittest/barrier.h" #include "mongo/unittest/unittest.h" @@ -48,6 +49,8 @@ namespace repl { namespace { + using executor::NetworkInterfaceMock; + bool operator==(const RemoteCommandRequest lhs, const RemoteCommandRequest rhs) { return lhs.target == rhs.target && @@ -155,6 +158,7 @@ namespace { void onGoAfterTriggered(const ReplicationExecutor::CallbackData& cbData); NetworkInterfaceMock* net; + StorageInterfaceMock* storage; ReplicationExecutor executor; boost::thread executorThread; const ReplicationExecutor::EventHandle goEvent; @@ -176,7 +180,8 @@ namespace { EventChainAndWaitingTest::EventChainAndWaitingTest() : net(new NetworkInterfaceMock), - executor(net, prngSeed), + storage(new StorageInterfaceMock), + executor(net, storage, prngSeed), executorThread(stdx::bind(&ReplicationExecutor::run, &executor)), goEvent(unittest::assertGet(executor.makeEvent())), event2(unittest::assertGet(executor.makeEvent())), diff --git a/src/mongo/db/repl/replication_executor_test_fixture.cpp b/src/mongo/db/repl/replication_executor_test_fixture.cpp index b33e1983e82..6efbe072d52 100644 --- a/src/mongo/db/repl/replication_executor_test_fixture.cpp +++ b/src/mongo/db/repl/replication_executor_test_fixture.cpp @@ -30,8 +30,9 @@ #include "mongo/db/repl/replication_executor_test_fixture.h" -#include "mongo/db/repl/network_interface_mock.h" #include "mongo/db/repl/replication_executor.h" +#include "mongo/db/repl/storage_interface_mock.h" +#include "mongo/executor/network_interface_mock.h" namespace mongo { namespace repl { @@ -46,19 +47,20 @@ namespace { ASSERT(!_executorThread); _executorThread.reset( new boost::thread(stdx::bind(&ReplicationExecutor::run, _executor.get()))); - _net->enterNetwork(); + getNet()->enterNetwork(); } void ReplicationExecutorTest::joinExecutorThread() { ASSERT(_executorThread); - _net->exitNetwork(); + getNet()->exitNetwork(); _executorThread->join(); _executorThread.reset(); } void ReplicationExecutorTest::setUp() { - _net = new NetworkInterfaceMock; - _executor.reset(new ReplicationExecutor(_net, prngSeed)); + _net = new executor::NetworkInterfaceMock; + _storage = new StorageInterfaceMock; + _executor.reset(new ReplicationExecutor(_net, _storage, prngSeed)); } void ReplicationExecutorTest::tearDown() { diff --git a/src/mongo/db/repl/replication_executor_test_fixture.h b/src/mongo/db/repl/replication_executor_test_fixture.h index ad9bc345ea0..479d799b990 100644 --- a/src/mongo/db/repl/replication_executor_test_fixture.h +++ b/src/mongo/db/repl/replication_executor_test_fixture.h @@ -34,12 +34,17 @@ #include "mongo/unittest/unittest.h" namespace mongo { + +namespace executor { + class NetworkInterfaceMock; +} // namespace executor + namespace repl { using std::unique_ptr; - class NetworkInterfaceMock; class ReplicationExecutor; + class StorageInterfaceMock; /** * Test fixture for tests that require a ReplicationExecutor backed by @@ -47,7 +52,7 @@ namespace repl { */ class ReplicationExecutorTest : public unittest::Test { protected: - NetworkInterfaceMock* getNet() { return _net; } + executor::NetworkInterfaceMock* getNet() { return _net; } ReplicationExecutor& getExecutor() { return *_executor; } /** * Runs ReplicationExecutor in background. @@ -80,7 +85,8 @@ namespace repl { private: - NetworkInterfaceMock* _net; + executor::NetworkInterfaceMock* _net; + StorageInterfaceMock* _storage; unique_ptr<ReplicationExecutor> _executor; unique_ptr<boost::thread> _executorThread; }; diff --git a/src/mongo/db/repl/replset_commands.cpp b/src/mongo/db/repl/replset_commands.cpp index 37e1dfe9deb..600fbfe52a8 100644 --- a/src/mongo/db/repl/replset_commands.cpp +++ b/src/mongo/db/repl/replset_commands.cpp @@ -54,6 +54,7 @@ #include "mongo/db/repl/replication_executor.h" #include "mongo/db/repl/update_position_args.h" #include "mongo/db/storage/storage_engine.h" +#include "mongo/executor/network_interface.h" #include "mongo/util/fail_point_service.h" #include "mongo/util/log.h" @@ -709,7 +710,7 @@ namespace { { AbstractMessagingPort *mp = txn->getClient()->port(); if( mp ) - mp->tag |= ReplicationExecutor::NetworkInterface::kMessagingPortKeepOpen; + mp->tag |= executor::NetworkInterface::kMessagingPortKeepOpen; } if (getGlobalReplicationCoordinator()->isV1ElectionProtocol()) { diff --git a/src/mongo/db/repl/reporter_test.cpp b/src/mongo/db/repl/reporter_test.cpp index fa680a44612..ed617fa0cdf 100644 --- a/src/mongo/db/repl/reporter_test.cpp +++ b/src/mongo/db/repl/reporter_test.cpp @@ -28,9 +28,9 @@ #include "mongo/platform/basic.h" -#include "mongo/db/repl/network_interface_mock.h" #include "mongo/db/repl/replication_executor_test_fixture.h" #include "mongo/db/repl/reporter.h" +#include "mongo/executor/network_interface_mock.h" #include "mongo/unittest/unittest.h" @@ -38,6 +38,7 @@ namespace { using namespace mongo; using namespace mongo::repl; + using executor::NetworkInterfaceMock; class MockProgressManager : public ReplicationProgressManager { public: diff --git a/src/mongo/db/repl/scatter_gather_test.cpp b/src/mongo/db/repl/scatter_gather_test.cpp index 1fc6765e58e..49209e9a31a 100644 --- a/src/mongo/db/repl/scatter_gather_test.cpp +++ b/src/mongo/db/repl/scatter_gather_test.cpp @@ -31,10 +31,11 @@ #include <boost/scoped_ptr.hpp> #include <boost/thread.hpp> -#include "mongo/db/repl/network_interface_mock.h" #include "mongo/db/repl/replication_executor.h" #include "mongo/db/repl/scatter_gather_algorithm.h" #include "mongo/db/repl/scatter_gather_runner.h" +#include "mongo/db/repl/storage_interface_mock.h" +#include "mongo/executor/network_interface_mock.h" #include "mongo/stdx/functional.h" #include "mongo/unittest/unittest.h" @@ -42,6 +43,8 @@ namespace mongo { namespace repl { namespace { + using executor::NetworkInterfaceMock; + /** * Algorithm for testing the ScatterGatherRunner, which will finish running when finish() is * called, or upon receiving responses from two nodes. Creates a three requests algorithm @@ -112,13 +115,15 @@ namespace { // owned by _executor NetworkInterfaceMock* _net; + StorageInterfaceMock* _storage; boost::scoped_ptr<ReplicationExecutor> _executor; boost::scoped_ptr<boost::thread> _executorThread; }; void ScatterGatherTest::setUp() { _net = new NetworkInterfaceMock; - _executor.reset(new ReplicationExecutor(_net, 1 /* prng seed */)); + _storage = new StorageInterfaceMock; + _executor.reset(new ReplicationExecutor(_net, _storage, 1 /* prng seed */)); _executorThread.reset(new boost::thread(stdx::bind(&ReplicationExecutor::run, _executor.get()))); } diff --git a/src/mongo/db/repl/storage_interface.cpp b/src/mongo/db/repl/storage_interface.cpp new file mode 100644 index 00000000000..c09d76ad9ff --- /dev/null +++ b/src/mongo/db/repl/storage_interface.cpp @@ -0,0 +1,42 @@ +/** + * Copyright (C) 2015 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kReplication + +#include "mongo/platform/basic.h" + +#include "mongo/db/repl/storage_interface.h" + +namespace mongo { +namespace repl { + + StorageInterface::StorageInterface() {} + StorageInterface::~StorageInterface() {} + +} // namespace repl +} // namespace mongo diff --git a/src/mongo/db/repl/storage_interface.h b/src/mongo/db/repl/storage_interface.h new file mode 100644 index 00000000000..df51692b2f1 --- /dev/null +++ b/src/mongo/db/repl/storage_interface.h @@ -0,0 +1,59 @@ +/** + * Copyright (C) 2015 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + + +#pragma once + + +namespace mongo { + + class OperationContext; + +namespace repl { + + /** + * Storage interface used by used by the ReplicationExecutor inside mongod for supporting + * ReplicationExectutor's ability to take database locks. + */ + class StorageInterface { + public: + virtual ~StorageInterface(); + + /** + * Creates an operation context for running database operations. + */ + virtual OperationContext* createOperationContext() = 0; + + protected: + + StorageInterface(); + + }; + +} // namespace repl +} // namespace mongo diff --git a/src/mongo/db/repl/storage_interface_impl.cpp b/src/mongo/db/repl/storage_interface_impl.cpp new file mode 100644 index 00000000000..73a14ce6330 --- /dev/null +++ b/src/mongo/db/repl/storage_interface_impl.cpp @@ -0,0 +1,54 @@ +/** + * Copyright (C) 2015 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kReplication + +#include "mongo/platform/basic.h" + +#include "mongo/db/repl/storage_interface_impl.h" + +#include "mongo/db/auth/authorization_session.h" +#include "mongo/db/client.h" +#include "mongo/db/operation_context_impl.h" + +namespace mongo { +namespace repl { + + StorageInterfaceImpl::StorageInterfaceImpl() : StorageInterface() {} + StorageInterfaceImpl::~StorageInterfaceImpl() { } + + OperationContext* StorageInterfaceImpl::createOperationContext() { + if (!ClientBasic::getCurrent()) { + Client::initThreadIfNotAlready(); + AuthorizationSession::get(*ClientBasic::getCurrent())->grantInternalAuthorization(); + } + return new OperationContextImpl(); + } + +} // namespace repl +} // namespace mongo diff --git a/src/mongo/db/repl/storage_interface_impl.h b/src/mongo/db/repl/storage_interface_impl.h new file mode 100644 index 00000000000..24cc8268f17 --- /dev/null +++ b/src/mongo/db/repl/storage_interface_impl.h @@ -0,0 +1,50 @@ +/** + * Copyright (C) 2015 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + + +#pragma once + +#include "mongo/db/repl/storage_interface.h" + +namespace mongo { + + class OperationContext; + +namespace repl { + + class StorageInterfaceImpl : public StorageInterface { + public: + explicit StorageInterfaceImpl(); + virtual ~StorageInterfaceImpl(); + + OperationContext* createOperationContext() override; + + }; + +} // namespace repl +} // namespace mongo diff --git a/src/mongo/db/repl/storage_interface_mock.cpp b/src/mongo/db/repl/storage_interface_mock.cpp new file mode 100644 index 00000000000..4a6f4a7a293 --- /dev/null +++ b/src/mongo/db/repl/storage_interface_mock.cpp @@ -0,0 +1,49 @@ +/** + * Copyright (C) 2015 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kReplication + +#include "mongo/platform/basic.h" + +#include "mongo/db/repl/storage_interface_mock.h" + +#include "mongo/db/repl/operation_context_repl_mock.h" + +namespace mongo { +namespace repl { + + StorageInterfaceMock::StorageInterfaceMock() {} + + StorageInterfaceMock::~StorageInterfaceMock() { } + + OperationContext* StorageInterfaceMock::createOperationContext() { + return new OperationContextReplMock(); + } + +} // namespace repl +} // namespace mongo diff --git a/src/mongo/db/repl/storage_interface_mock.h b/src/mongo/db/repl/storage_interface_mock.h new file mode 100644 index 00000000000..4bd3e63ec9d --- /dev/null +++ b/src/mongo/db/repl/storage_interface_mock.h @@ -0,0 +1,49 @@ +/** + * Copyright (C) 2015 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + + +#pragma once + +#include "mongo/db/repl/storage_interface.h" + +namespace mongo { + + class OperationContext; + +namespace repl { + + class StorageInterfaceMock : public StorageInterface { + public: + explicit StorageInterfaceMock(); + virtual ~StorageInterfaceMock(); + + OperationContext* createOperationContext() override; + }; + +} // namespace repl +} // namespace mongo diff --git a/src/mongo/db/repl/vote_requester_test.cpp b/src/mongo/db/repl/vote_requester_test.cpp index 09632675239..90d0843d843 100644 --- a/src/mongo/db/repl/vote_requester_test.cpp +++ b/src/mongo/db/repl/vote_requester_test.cpp @@ -33,9 +33,9 @@ #include "mongo/base/status.h" #include "mongo/db/jsobj.h" #include "mongo/db/repl/vote_requester.h" -#include "mongo/db/repl/network_interface_mock.h" #include "mongo/db/repl/repl_set_request_votes_args.h" #include "mongo/db/repl/replication_executor.h" +#include "mongo/executor/network_interface_mock.h" #include "mongo/stdx/functional.h" #include "mongo/unittest/unittest.h" #include "mongo/util/mongoutils/str.h" @@ -43,6 +43,8 @@ namespace mongo { namespace repl { namespace { + + using executor::NetworkInterfaceMock; using unittest::assertGet; using RemoteCommandRequest = RemoteCommandRequest; |