diff options
50 files changed, 723 insertions, 239 deletions
diff --git a/src/mongo/SConscript b/src/mongo/SConscript index 944853324f2..52735b5f1fc 100644 --- a/src/mongo/SConscript +++ b/src/mongo/SConscript @@ -30,6 +30,7 @@ env.SConscript( 'crypto', 'db', 'dbtests', + 'executor', 'installer', 'logger', 'platform', @@ -189,6 +190,7 @@ mongod = env.Program( "db/mongodandmongos", "db/mongodwebserver", "db/serveronly", + "db/repl/storage_interface_impl", "util/ntservice", ], ) diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript index 4b32c6c46fd..c46484860d7 100644 --- a/src/mongo/db/SConscript +++ b/src/mongo/db/SConscript @@ -625,6 +625,7 @@ serveronlyEnv = env.Clone() serveronlyEnv.InjectThirdPartyIncludePaths(libraries=['snappy']) serveronlyLibdeps = [ "$BUILD_DIR/mongo/client/parallel", + "$BUILD_DIR/mongo/executor/network_interface_impl", "$BUILD_DIR/mongo/s/batch_write_types", "$BUILD_DIR/mongo/s/catalog/legacy/catalog_manager_legacy", "$BUILD_DIR/mongo/s/client/sharding_connection_hook", @@ -650,7 +651,6 @@ serveronlyLibdeps = [ "ops/update_driver", "query/query", "range_deleter", - "repl/network_interface_impl", "repl/repl_coordinator_global", "repl/repl_coordinator_impl", "repl/repl_settings", diff --git a/src/mongo/db/db.cpp b/src/mongo/db/db.cpp index f430cceb675..e5555bdd33e 100644 --- a/src/mongo/db/db.cpp +++ b/src/mongo/db/db.cpp @@ -77,12 +77,12 @@ #include "mongo/db/query/internal_plans.h" #include "mongo/db/range_deleter_service.h" #include "mongo/db/repair_database.h" -#include "mongo/db/repl/network_interface_impl.h" #include "mongo/db/repl/oplog.h" #include "mongo/db/repl/repl_settings.h" #include "mongo/db/repl/replication_coordinator_external_state_impl.h" #include "mongo/db/repl/replication_coordinator_global.h" #include "mongo/db/repl/replication_coordinator_impl.h" +#include "mongo/db/repl/storage_interface_impl.h" #include "mongo/db/repl/topology_coordinator_impl.h" #include "mongo/db/restapi.h" #include "mongo/db/server_parameters.h" @@ -93,6 +93,7 @@ #include "mongo/db/storage/storage_engine.h" #include "mongo/db/storage_options.h" #include "mongo/db/ttl.h" +#include "mongo/executor/network_interface_impl.h" #include "mongo/platform/process_id.h" #include "mongo/scripting/engine.h" #include "mongo/stdx/memory.h" @@ -757,7 +758,8 @@ MONGO_INITIALIZER_WITH_PREREQUISITES(CreateReplicationManager, ("SetGlobalEnviro repl::ReplicationCoordinatorImpl* replCoord = new repl::ReplicationCoordinatorImpl( getGlobalReplSettings(), new repl::ReplicationCoordinatorExternalStateImpl, - new repl::NetworkInterfaceImpl, + new executor::NetworkInterfaceImpl{}, + new repl::StorageInterfaceImpl{}, new repl::TopologyCoordinatorImpl(Seconds(repl::maxSyncSourceLagSecs)), static_cast<int64_t>(curTimeMillis64())); repl::setGlobalReplicationCoordinator(replCoord); diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript index 146039a94cf..d041ef49b72 100644 --- a/src/mongo/db/repl/SConscript +++ b/src/mongo/db/repl/SConscript @@ -15,17 +15,20 @@ env.Library('rslog', '$BUILD_DIR/mongo/logger/logger', ]) +env.Library('storage_interface', + 'storage_interface.cpp', + LIBDEPS=[ + ]) + env.Library( - target='network_interface_impl', + target='storage_interface_impl', source=[ - 'network_interface_impl.cpp', + 'storage_interface_impl.cpp', ], LIBDEPS=[ - '$BUILD_DIR/mongo/base/base', - '$BUILD_DIR/mongo/bson/bson', - '$BUILD_DIR/mongo/client/remote_command_runner_impl', - '$BUILD_DIR/mongo/db/coredb', - '$BUILD_DIR/mongo/db/common', + '$BUILD_DIR/mongo/db/serveronly', # For OperationContextImpl + '$BUILD_DIR/mongo/db/service_context', + 'storage_interface', ]) env.Library( @@ -38,7 +41,9 @@ env.Library( LIBDEPS=[ 'database_task', 'task_runner', + 'storage_interface', '$BUILD_DIR/mongo/client/remote_command_runner', + '$BUILD_DIR/mongo/executor/network_interface', '$BUILD_DIR/mongo/util/foundation', '$BUILD_DIR/mongo/util/net/hostandport', ], @@ -343,16 +348,18 @@ env.Library('repl_coordinator_global', env.Library('replmocks', [ - 'network_interface_mock.cpp', 'operation_context_repl_mock.cpp', 'replication_coordinator_external_state_mock.cpp', 'replication_coordinator_mock.cpp', + 'storage_interface_mock.cpp', ], LIBDEPS=[ '$BUILD_DIR/mongo/db/concurrency/lock_manager', + '$BUILD_DIR/mongo/executor/network_interface_mock', 'repl_coordinator_interface', 'replica_set_messages', 'replication_executor', + 'storage_interface', ]) env.Library('read_after_optime_args', diff --git a/src/mongo/db/repl/base_cloner_test_fixture.cpp b/src/mongo/db/repl/base_cloner_test_fixture.cpp index 92518498300..33357e702e3 100644 --- a/src/mongo/db/repl/base_cloner_test_fixture.cpp +++ b/src/mongo/db/repl/base_cloner_test_fixture.cpp @@ -106,7 +106,7 @@ namespace repl { ReplicationExecutorTest::setUp(); clear(); launchExecutorThread(); - storageInterface.reset(new StorageInterfaceMock()); + storageInterface.reset(new ClonerStorageInterfaceMock()); } void BaseClonerTest::tearDown() { @@ -245,21 +245,21 @@ namespace repl { ASSERT_FALSE(getCloner()->isActive()); } - Status StorageInterfaceMock::beginCollection(OperationContext* txn, - const NamespaceString& nss, - const CollectionOptions& options, - const std::vector<BSONObj>& specs) { + Status ClonerStorageInterfaceMock::beginCollection(OperationContext* txn, + const NamespaceString& nss, + const CollectionOptions& options, + const std::vector<BSONObj>& specs) { return beginCollectionFn ? beginCollectionFn(txn, nss, options, specs) : Status::OK(); } - Status StorageInterfaceMock::insertDocuments(OperationContext* txn, - const NamespaceString& nss, - const std::vector<BSONObj>& docs) { + Status ClonerStorageInterfaceMock::insertDocuments(OperationContext* txn, + const NamespaceString& nss, + const std::vector<BSONObj>& docs) { return insertDocumentsFn ? insertDocumentsFn(txn, nss, docs) : Status::OK(); } - Status StorageInterfaceMock::commitCollection(OperationContext* txn, - const NamespaceString& nss) { + Status ClonerStorageInterfaceMock::commitCollection(OperationContext* txn, + const NamespaceString& nss) { return Status::OK(); } diff --git a/src/mongo/db/repl/base_cloner_test_fixture.h b/src/mongo/db/repl/base_cloner_test_fixture.h index 064003bbb6d..dca30fe8ae7 100644 --- a/src/mongo/db/repl/base_cloner_test_fixture.h +++ b/src/mongo/db/repl/base_cloner_test_fixture.h @@ -36,8 +36,8 @@ #include "mongo/db/clientcursor.h" #include "mongo/db/namespace_string.h" #include "mongo/db/repl/collection_cloner.h" -#include "mongo/db/repl/network_interface_mock.h" #include "mongo/db/repl/replication_executor_test_fixture.h" +#include "mongo/executor/network_interface_mock.h" #include "mongo/stdx/mutex.h" #include "mongo/stdx/condition_variable.h" #include "mongo/util/net/hostandport.h" @@ -50,11 +50,11 @@ namespace mongo { namespace repl { class BaseCloner; - class StorageInterfaceMock; + class ClonerStorageInterfaceMock; class BaseClonerTest : public ReplicationExecutorTest { public: - typedef NetworkInterfaceMock::NetworkOperationIterator NetworkOperationIterator; + typedef executor::NetworkInterfaceMock::NetworkOperationIterator NetworkOperationIterator; /** * Creates an initial error status suitable for checking if @@ -128,7 +128,7 @@ namespace repl { protected: - std::unique_ptr<StorageInterfaceMock> storageInterface; + std::unique_ptr<ClonerStorageInterfaceMock> storageInterface; private: @@ -141,7 +141,7 @@ namespace repl { }; - class StorageInterfaceMock : public CollectionCloner::StorageInterface { + class ClonerStorageInterfaceMock : public CollectionCloner::StorageInterface { public: Status beginCollection(OperationContext* txn, const NamespaceString& nss, diff --git a/src/mongo/db/repl/check_quorum_for_config_change_test.cpp b/src/mongo/db/repl/check_quorum_for_config_change_test.cpp index bc692b43288..63fe5684aa9 100644 --- a/src/mongo/db/repl/check_quorum_for_config_change_test.cpp +++ b/src/mongo/db/repl/check_quorum_for_config_change_test.cpp @@ -36,11 +36,12 @@ #include "mongo/base/status.h" #include "mongo/db/jsobj.h" #include "mongo/db/repl/check_quorum_for_config_change.h" -#include "mongo/db/repl/network_interface_mock.h" #include "mongo/db/repl/repl_set_heartbeat_args.h" #include "mongo/db/repl/repl_set_heartbeat_response.h" #include "mongo/db/repl/replica_set_config.h" #include "mongo/db/repl/replication_executor.h" +#include "mongo/db/repl/storage_interface_mock.h" +#include "mongo/executor/network_interface_mock.h" #include "mongo/platform/unordered_set.h" #include "mongo/stdx/functional.h" #include "mongo/unittest/unittest.h" @@ -62,6 +63,8 @@ namespace mongo { namespace repl { namespace { + using executor::NetworkInterfaceMock; + class CheckQuorumTest : public mongo::unittest::Test { protected: CheckQuorumTest(); @@ -71,6 +74,7 @@ namespace { bool isQuorumCheckDone(); NetworkInterfaceMock* _net; + StorageInterfaceMock* _storage; boost::scoped_ptr<ReplicationExecutor> _executor; private: @@ -93,7 +97,8 @@ namespace { void CheckQuorumTest::setUp() { _net = new NetworkInterfaceMock; - _executor.reset(new ReplicationExecutor(_net, 1 /* prng */ )); + _storage = new StorageInterfaceMock; + _executor.reset(new ReplicationExecutor(_net, _storage, 1 /* prng */ )); _executorThread.reset(new boost::thread(stdx::bind(&ReplicationExecutor::run, _executor.get()))); } diff --git a/src/mongo/db/repl/collection_cloner_test.cpp b/src/mongo/db/repl/collection_cloner_test.cpp index a6841ad2467..9b6927c4b43 100644 --- a/src/mongo/db/repl/collection_cloner_test.cpp +++ b/src/mongo/db/repl/collection_cloner_test.cpp @@ -35,7 +35,6 @@ #include "mongo/db/jsobj.h" #include "mongo/db/repl/base_cloner_test_fixture.h" #include "mongo/db/repl/collection_cloner.h" -#include "mongo/db/repl/network_interface_mock.h" #include "mongo/unittest/unittest.h" namespace { diff --git a/src/mongo/db/repl/data_replicator_test.cpp b/src/mongo/db/repl/data_replicator_test.cpp index 8fc727c7b38..6aef8263679 100644 --- a/src/mongo/db/repl/data_replicator_test.cpp +++ b/src/mongo/db/repl/data_replicator_test.cpp @@ -33,7 +33,6 @@ #include "mongo/db/jsobj.h" #include "mongo/db/repl/data_replicator.h" #include "mongo/db/repl/fetcher.h" -#include "mongo/db/repl/network_interface_mock.h" #include "mongo/db/repl/replication_coordinator.h" #include "mongo/db/repl/replication_coordinator_impl.h" #include "mongo/db/repl/replication_coordinator_external_state_mock.h" @@ -41,6 +40,7 @@ #include "mongo/db/repl/replication_executor_test_fixture.h" #include "mongo/db/repl/topology_coordinator.h" #include "mongo/db/repl/topology_coordinator_impl.h" +#include "mongo/executor/network_interface_mock.h" #include "mongo/util/fail_point_service.h" #include "mongo/unittest/unittest.h" @@ -48,6 +48,7 @@ namespace { using namespace mongo; using namespace mongo::repl; + using executor::NetworkInterfaceMock; const HostAndPort target("localhost", -1); diff --git a/src/mongo/db/repl/database_cloner_test.cpp b/src/mongo/db/repl/database_cloner_test.cpp index 22e0e452f3d..eb305fbd0fa 100644 --- a/src/mongo/db/repl/database_cloner_test.cpp +++ b/src/mongo/db/repl/database_cloner_test.cpp @@ -35,7 +35,6 @@ #include "mongo/db/jsobj.h" #include "mongo/db/repl/base_cloner_test_fixture.h" #include "mongo/db/repl/database_cloner.h" -#include "mongo/db/repl/network_interface_mock.h" #include "mongo/unittest/unittest.h" namespace { diff --git a/src/mongo/db/repl/elect_cmd_runner_test.cpp b/src/mongo/db/repl/elect_cmd_runner_test.cpp index f4d4c913bf1..2da5db3e98b 100644 --- a/src/mongo/db/repl/elect_cmd_runner_test.cpp +++ b/src/mongo/db/repl/elect_cmd_runner_test.cpp @@ -33,11 +33,12 @@ #include "mongo/base/status.h" #include "mongo/db/jsobj.h" +#include "mongo/db/repl/elect_cmd_runner.h" #include "mongo/db/repl/member_heartbeat_data.h" -#include "mongo/db/repl/network_interface_mock.h" #include "mongo/db/repl/replica_set_config.h" #include "mongo/db/repl/replication_executor.h" -#include "mongo/db/repl/elect_cmd_runner.h" +#include "mongo/db/repl/storage_interface_mock.h" +#include "mongo/executor/network_interface_mock.h" #include "mongo/stdx/functional.h" #include "mongo/unittest/unittest.h" @@ -48,6 +49,8 @@ namespace mongo { namespace repl { namespace { + using executor::NetworkInterfaceMock; + class ElectCmdRunnerTest : public mongo::unittest::Test { public: void startTest(ElectCmdRunner* electCmdRunner, @@ -65,6 +68,7 @@ namespace { const std::vector<HostAndPort>& hosts); NetworkInterfaceMock* _net; + StorageInterfaceMock* _storage; boost::scoped_ptr<ReplicationExecutor> _executor; boost::scoped_ptr<boost::thread> _executorThread; @@ -77,7 +81,8 @@ namespace { void ElectCmdRunnerTest::setUp() { _net = new NetworkInterfaceMock; - _executor.reset(new ReplicationExecutor(_net, 1 /* prng seed */)); + _storage = new StorageInterfaceMock; + _executor.reset(new ReplicationExecutor(_net, _storage, 1 /* prng seed */)); _executorThread.reset(new boost::thread(stdx::bind(&ReplicationExecutor::run, _executor.get()))); } diff --git a/src/mongo/db/repl/election_winner_declarer_test.cpp b/src/mongo/db/repl/election_winner_declarer_test.cpp index bf006dd4355..1b264e26b56 100644 --- a/src/mongo/db/repl/election_winner_declarer_test.cpp +++ b/src/mongo/db/repl/election_winner_declarer_test.cpp @@ -34,8 +34,8 @@ #include "mongo/base/status.h" #include "mongo/db/jsobj.h" #include "mongo/db/repl/election_winner_declarer.h" -#include "mongo/db/repl/network_interface_mock.h" #include "mongo/db/repl/replication_executor.h" +#include "mongo/executor/network_interface_mock.h" #include "mongo/stdx/functional.h" #include "mongo/unittest/unittest.h" #include "mongo/util/mongoutils/str.h" @@ -46,6 +46,7 @@ namespace mongo { namespace repl { namespace { + using executor::NetworkInterfaceMock; using unittest::assertGet; bool stringContains(const std::string &haystack, const std::string& needle) { diff --git a/src/mongo/db/repl/fetcher_test.cpp b/src/mongo/db/repl/fetcher_test.cpp index dde20c44fbf..85a693dd48f 100644 --- a/src/mongo/db/repl/fetcher_test.cpp +++ b/src/mongo/db/repl/fetcher_test.cpp @@ -32,9 +32,9 @@ #include "mongo/db/jsobj.h" #include "mongo/db/repl/fetcher.h" -#include "mongo/db/repl/network_interface_mock.h" #include "mongo/db/repl/replication_executor.h" #include "mongo/db/repl/replication_executor_test_fixture.h" +#include "mongo/executor/network_interface_mock.h" #include "mongo/unittest/unittest.h" @@ -42,6 +42,7 @@ namespace { using namespace mongo; using namespace mongo::repl; + using executor::NetworkInterfaceMock; const HostAndPort target("localhost", -1); const BSONObj findCmdObj = BSON("find" << "coll"); diff --git a/src/mongo/db/repl/freshness_checker_test.cpp b/src/mongo/db/repl/freshness_checker_test.cpp index 6f2e3ab9a85..c271ae0c371 100644 --- a/src/mongo/db/repl/freshness_checker_test.cpp +++ b/src/mongo/db/repl/freshness_checker_test.cpp @@ -33,11 +33,12 @@ #include "mongo/base/status.h" #include "mongo/db/jsobj.h" +#include "mongo/db/repl/freshness_checker.h" #include "mongo/db/repl/member_heartbeat_data.h" -#include "mongo/db/repl/network_interface_mock.h" #include "mongo/db/repl/replica_set_config.h" #include "mongo/db/repl/replication_executor.h" -#include "mongo/db/repl/freshness_checker.h" +#include "mongo/db/repl/storage_interface_mock.h" +#include "mongo/executor/network_interface_mock.h" #include "mongo/platform/unordered_set.h" #include "mongo/stdx/functional.h" #include "mongo/unittest/unittest.h" @@ -49,6 +50,7 @@ namespace mongo { namespace repl { namespace { + using executor::NetworkInterfaceMock; using unittest::assertGet; bool stringContains(const std::string &haystack, const std::string& needle) { @@ -73,6 +75,7 @@ namespace { } NetworkInterfaceMock* _net; + StorageInterfaceMock* _storage; boost::scoped_ptr<ReplicationExecutor> _executor; boost::scoped_ptr<boost::thread> _executorThread; @@ -91,7 +94,8 @@ namespace { void FreshnessCheckerTest::setUp() { _net = new NetworkInterfaceMock; - _executor.reset(new ReplicationExecutor(_net, 1 /* prng seed */)); + _storage = new StorageInterfaceMock; + _executor.reset(new ReplicationExecutor(_net, _storage, 1 /* prng seed */)); _executorThread.reset(new boost::thread(stdx::bind(&ReplicationExecutor::run, _executor.get()))); _checker.reset(new FreshnessChecker); diff --git a/src/mongo/db/repl/repl_set_heartbeat_response_test.cpp b/src/mongo/db/repl/repl_set_heartbeat_response_test.cpp index 29a7812d6f3..8d53dffc062 100644 --- a/src/mongo/db/repl/repl_set_heartbeat_response_test.cpp +++ b/src/mongo/db/repl/repl_set_heartbeat_response_test.cpp @@ -30,9 +30,9 @@ #include <boost/scoped_ptr.hpp> +#include "mongo/db/repl/repl_set_heartbeat_response.h" #include "mongo/unittest/unittest.h" #include "mongo/util/assert_util.h" -#include "mongo/db/repl/repl_set_heartbeat_response.h" namespace mongo { namespace repl { diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp index 0fbd1b9424d..942681cedf6 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp @@ -56,6 +56,7 @@ #include "mongo/db/repl/rs_sync.h" #include "mongo/db/repl/last_vote.h" #include "mongo/db/storage/storage_engine.h" +#include "mongo/executor/network_interface.h" #include "mongo/s/d_state.h" #include "mongo/stdx/functional.h" #include "mongo/util/assert_util.h" @@ -294,8 +295,7 @@ namespace { } void ReplicationCoordinatorExternalStateImpl::closeConnections() { - MessagingPort::closeAllSockets( - ReplicationExecutor::NetworkInterface::kMessagingPortKeepOpen); + MessagingPort::closeAllSockets(executor::NetworkInterface::kMessagingPortKeepOpen); } void ReplicationCoordinatorExternalStateImpl::killAllUserOperations(OperationContext* txn) { diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp index dc9a7d36ec6..e3ea081940a 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl.cpp @@ -78,6 +78,7 @@ namespace repl { namespace { typedef StatusWith<ReplicationExecutor::CallbackHandle> CBHStatus; + using executor::NetworkInterface; void lockAndCall(boost::unique_lock<boost::mutex>* lk, const stdx::function<void ()>& fn) { if (!lk->owns_lock()) { @@ -159,13 +160,16 @@ namespace { ReplicationCoordinatorExternalState* externalState, TopologyCoordinator* topCoord, int64_t prngSeed, - ReplicationExecutor::NetworkInterface* network, + NetworkInterface* network, + StorageInterface* storage, ReplicationExecutor* replExec) : _settings(settings), _replMode(getReplicationModeFromSettings(settings)), _topCoord(topCoord), _replExecutorIfOwned(replExec ? nullptr : - new ReplicationExecutor(network, prngSeed)), + new ReplicationExecutor(network, + storage, + prngSeed)), _replExecutor(replExec ? *replExec : *_replExecutorIfOwned), _externalState(externalState), _inShutdown(false), @@ -198,13 +202,15 @@ namespace { ReplicationCoordinatorImpl::ReplicationCoordinatorImpl( const ReplSettings& settings, ReplicationCoordinatorExternalState* externalState, - ReplicationExecutor::NetworkInterface* network, + NetworkInterface* network, + StorageInterface* storage, TopologyCoordinator* topCoord, int64_t prngSeed) : ReplicationCoordinatorImpl(settings, externalState, topCoord, prngSeed, network, + storage, nullptr) { } ReplicationCoordinatorImpl::ReplicationCoordinatorImpl( @@ -217,6 +223,7 @@ namespace { topCoord, prngSeed, nullptr, + nullptr, replExec) { } ReplicationCoordinatorImpl::~ReplicationCoordinatorImpl() {} diff --git a/src/mongo/db/repl/replication_coordinator_impl.h b/src/mongo/db/repl/replication_coordinator_impl.h index d8dabc082eb..625007cd09f 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.h +++ b/src/mongo/db/repl/replication_coordinator_impl.h @@ -38,13 +38,14 @@ #include "mongo/base/status.h" #include "mongo/bson/timestamp.h" #include "mongo/db/service_context.h" -#include "mongo/db/repl/member_state.h" #include "mongo/db/repl/data_replicator.h" +#include "mongo/db/repl/member_state.h" #include "mongo/db/repl/optime.h" #include "mongo/db/repl/replica_set_config.h" #include "mongo/db/repl/replication_coordinator.h" #include "mongo/db/repl/replication_coordinator_external_state.h" #include "mongo/db/repl/replication_executor.h" +#include "mongo/db/repl/storage_interface.h" #include "mongo/db/repl/update_position_args.h" #include "mongo/platform/atomic_word.h" #include "mongo/platform/unordered_map.h" @@ -81,7 +82,8 @@ namespace repl { // Takes ownership of the "externalState", "topCoord" and "network" objects. ReplicationCoordinatorImpl(const ReplSettings& settings, ReplicationCoordinatorExternalState* externalState, - ReplicationExecutor::NetworkInterface* network, + executor::NetworkInterface* network, + StorageInterface* storage, TopologyCoordinator* topoCoord, int64_t prngSeed); // Takes ownership of the "externalState" and "topCoord" objects. @@ -296,7 +298,8 @@ namespace repl { ReplicationCoordinatorExternalState* externalState, TopologyCoordinator* topCoord, int64_t prngSeed, - ReplicationExecutor::NetworkInterface* network, + executor::NetworkInterface* network, + StorageInterface* storage, ReplicationExecutor* replExec); /** * Configuration states for a replica set node. diff --git a/src/mongo/db/repl/replication_coordinator_impl_elect_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_elect_test.cpp index 5202e59c65c..a0e4149de24 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_elect_test.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_elect_test.cpp @@ -33,7 +33,6 @@ #include "mongo/db/jsobj.h" #include "mongo/db/operation_context_noop.h" #include "mongo/db/repl/is_master_response.h" -#include "mongo/db/repl/network_interface_mock.h" #include "mongo/db/repl/operation_context_repl_mock.h" #include "mongo/db/repl/repl_set_heartbeat_args.h" #include "mongo/db/repl/repl_set_heartbeat_response.h" @@ -41,6 +40,7 @@ #include "mongo/db/repl/replication_coordinator_external_state_mock.h" #include "mongo/db/repl/replication_coordinator_impl.h" #include "mongo/db/repl/replication_coordinator_test_fixture.h" +#include "mongo/executor/network_interface_mock.h" #include "mongo/unittest/unittest.h" #include "mongo/util/log.h" @@ -48,6 +48,8 @@ namespace mongo { namespace repl { namespace { + using executor::NetworkInterfaceMock; + class ReplCoordElectTest : public ReplCoordTest { protected: void simulateEnoughHeartbeatsForElectability(); diff --git a/src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp index 43d6d2f69b9..c7d5c9944da 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp @@ -33,7 +33,6 @@ #include "mongo/db/jsobj.h" #include "mongo/db/operation_context_noop.h" #include "mongo/db/repl/is_master_response.h" -#include "mongo/db/repl/network_interface_mock.h" #include "mongo/db/repl/operation_context_repl_mock.h" #include "mongo/db/repl/repl_set_heartbeat_args_v1.h" #include "mongo/db/repl/repl_set_heartbeat_response.h" @@ -41,6 +40,7 @@ #include "mongo/db/repl/replication_coordinator_external_state_mock.h" #include "mongo/db/repl/replication_coordinator_impl.h" #include "mongo/db/repl/replication_coordinator_test_fixture.h" +#include "mongo/executor/network_interface_mock.h" #include "mongo/unittest/unittest.h" #include "mongo/util/log.h" @@ -48,6 +48,8 @@ namespace mongo { namespace repl { namespace { + using executor::NetworkInterfaceMock; + class ReplCoordElectV1Test : public ReplCoordTest { protected: void simulateEnoughHeartbeatsForElectability(); diff --git a/src/mongo/db/repl/replication_coordinator_impl_heartbeat_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_heartbeat_test.cpp index dadc7b5a026..2afcad55859 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_heartbeat_test.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_heartbeat_test.cpp @@ -32,7 +32,6 @@ #include "mongo/db/jsobj.h" #include "mongo/db/operation_context_noop.h" -#include "mongo/db/repl/network_interface_mock.h" #include "mongo/db/repl/repl_set_heartbeat_args.h" #include "mongo/db/repl/repl_set_heartbeat_response.h" #include "mongo/db/repl/replica_set_config.h" @@ -40,6 +39,7 @@ #include "mongo/db/repl/replication_coordinator_impl.h" #include "mongo/db/repl/replication_coordinator_test_fixture.h" #include "mongo/db/repl/topology_coordinator_impl.h" +#include "mongo/executor/network_interface_mock.h" #include "mongo/unittest/unittest.h" #include "mongo/util/log.h" @@ -47,6 +47,8 @@ namespace mongo { namespace repl { namespace { + using executor::NetworkInterfaceMock; + class ReplCoordHBTest : public ReplCoordTest { protected: void assertMemberState(MemberState expected, std::string msg = ""); diff --git a/src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp index 5851b6e7e7b..4ae6a358e53 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp @@ -32,7 +32,6 @@ #include "mongo/db/jsobj.h" #include "mongo/db/operation_context_noop.h" -#include "mongo/db/repl/network_interface_mock.h" #include "mongo/db/repl/repl_set_heartbeat_args.h" #include "mongo/db/repl/repl_set_heartbeat_args_v1.h" #include "mongo/db/repl/replica_set_config.h" @@ -40,6 +39,7 @@ #include "mongo/db/repl/replication_coordinator_impl.h" #include "mongo/db/repl/replication_coordinator_test_fixture.h" #include "mongo/db/repl/topology_coordinator_impl.h" +#include "mongo/executor/network_interface_mock.h" #include "mongo/unittest/unittest.h" #include "mongo/util/log.h" @@ -47,6 +47,8 @@ namespace mongo { namespace repl { namespace { + using executor::NetworkInterfaceMock; + class ReplCoordHBV1Test : public ReplCoordTest { protected: void assertMemberState(MemberState expected, std::string msg = ""); diff --git a/src/mongo/db/repl/replication_coordinator_impl_reconfig_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_reconfig_test.cpp index e4a0310daf3..5bf1b40f5cf 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_reconfig_test.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_reconfig_test.cpp @@ -32,7 +32,6 @@ #include "mongo/db/jsobj.h" #include "mongo/db/operation_context_noop.h" -#include "mongo/db/repl/network_interface_mock.h" #include "mongo/db/repl/repl_set_heartbeat_args.h" #include "mongo/db/repl/repl_set_heartbeat_response.h" #include "mongo/db/repl/replica_set_config.h" @@ -40,6 +39,7 @@ #include "mongo/db/repl/replication_coordinator_impl.h" #include "mongo/db/repl/replication_coordinator_test_fixture.h" #include "mongo/db/repl/replication_coordinator.h" // ReplSetReconfigArgs +#include "mongo/executor/network_interface_mock.h" #include "mongo/unittest/unittest.h" #include "mongo/util/log.h" @@ -47,6 +47,7 @@ namespace mongo { namespace repl { namespace { + using executor::NetworkInterfaceMock; typedef ReplicationCoordinator::ReplSetReconfigArgs ReplSetReconfigArgs; TEST_F(ReplCoordTest, ReconfigBeforeInitialized) { diff --git a/src/mongo/db/repl/replication_coordinator_impl_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_test.cpp index 3f5a574d0e4..82373106580 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_test.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_test.cpp @@ -41,7 +41,6 @@ #include "mongo/db/operation_context_noop.h" #include "mongo/db/repl/handshake_args.h" #include "mongo/db/repl/is_master_response.h" -#include "mongo/db/repl/network_interface_mock.h" #include "mongo/db/repl/operation_context_repl_mock.h" #include "mongo/db/repl/optime.h" #include "mongo/db/repl/read_after_optime_args.h" @@ -57,6 +56,7 @@ #include "mongo/db/repl/update_position_args.h" #include "mongo/db/server_options.h" #include "mongo/db/write_concern_options.h" +#include "mongo/executor/network_interface_mock.h" #include "mongo/stdx/functional.h" #include "mongo/unittest/unittest.h" #include "mongo/util/assert_util.h" @@ -68,6 +68,7 @@ namespace mongo { namespace repl { namespace { + using executor::NetworkInterfaceMock; typedef ReplicationCoordinator::ReplSetReconfigArgs ReplSetReconfigArgs; Status kInterruptedStatus(ErrorCodes::Interrupted, "operation was interrupted"); diff --git a/src/mongo/db/repl/replication_coordinator_test_fixture.cpp b/src/mongo/db/repl/replication_coordinator_test_fixture.cpp index e7bd4d93703..2359de1f751 100644 --- a/src/mongo/db/repl/replication_coordinator_test_fixture.cpp +++ b/src/mongo/db/repl/replication_coordinator_test_fixture.cpp @@ -34,14 +34,15 @@ #include "mongo/db/operation_context_noop.h" #include "mongo/db/repl/is_master_response.h" -#include "mongo/db/repl/network_interface_mock.h" #include "mongo/db/repl/operation_context_repl_mock.h" #include "mongo/db/repl/repl_settings.h" #include "mongo/db/repl/repl_set_heartbeat_args.h" #include "mongo/db/repl/repl_set_heartbeat_args_v1.h" #include "mongo/db/repl/replication_coordinator_external_state_mock.h" #include "mongo/db/repl/replication_coordinator_impl.h" +#include "mongo/db/repl/storage_interface_mock.h" #include "mongo/db/repl/topology_coordinator_impl.h" +#include "mongo/executor/network_interface_mock.h" #include "mongo/stdx/functional.h" #include "mongo/unittest/unittest.h" #include "mongo/util/log.h" @@ -55,6 +56,8 @@ namespace { } } // namespace + using executor::NetworkInterfaceMock; + ReplicaSetConfig ReplCoordTest::assertMakeRSConfig(const BSONObj& configBson) { ReplicaSetConfig config; ASSERT_OK(config.initialize(configBson)); @@ -104,10 +107,12 @@ namespace { _topo = new TopologyCoordinatorImpl(Seconds(0)); _net = new NetworkInterfaceMock; + _storage = new StorageInterfaceMock; _externalState = new ReplicationCoordinatorExternalStateMock; _repl.reset(new ReplicationCoordinatorImpl(_settings, _externalState, _net, + _storage, _topo, seed)); } diff --git a/src/mongo/db/repl/replication_coordinator_test_fixture.h b/src/mongo/db/repl/replication_coordinator_test_fixture.h index 93d3c7c4bd7..d6a67cced2e 100644 --- a/src/mongo/db/repl/replication_coordinator_test_fixture.h +++ b/src/mongo/db/repl/replication_coordinator_test_fixture.h @@ -41,12 +41,16 @@ namespace mongo { class BSONObj; struct HostAndPort; +namespace executor { + class NetworkInterfaceMock; +} // namespace executor + namespace repl { - class NetworkInterfaceMock; class ReplicaSetConfig; class ReplicationCoordinatorExternalStateMock; class ReplicationCoordinatorImpl; + class StorageInterfaceMock; class TopologyCoordinatorImpl; /** @@ -58,7 +62,7 @@ namespace repl { * Makes a ResponseStatus with the given "doc" response and optional elapsed time "millis". */ static ResponseStatus makeResponseStatus(const BSONObj& doc, - Milliseconds millis = Milliseconds(0)); + Milliseconds millis = Milliseconds(0)); /** * Constructs a ReplicaSetConfig from the given BSON, or raises a test failure exception. @@ -75,7 +79,7 @@ namespace repl { /** * Gets the network mock. */ - NetworkInterfaceMock* getNet() { return _net; } + executor::NetworkInterfaceMock* getNet() { return _net; } /** * Gets the replication coordinator under test. @@ -182,7 +186,9 @@ namespace repl { // Owned by ReplicationCoordinatorImpl TopologyCoordinatorImpl* _topo; // Owned by ReplicationCoordinatorImpl - NetworkInterfaceMock* _net; + executor::NetworkInterfaceMock* _net; + // Owned by ReplicationCoordinatorImpl + StorageInterfaceMock* _storage; // Owned by ReplicationCoordinatorImpl ReplicationCoordinatorExternalStateMock* _externalState; ReplSettings _settings; diff --git a/src/mongo/db/repl/replication_executor.cpp b/src/mongo/db/repl/replication_executor.cpp index 118ad0d0ded..72c611b600a 100644 --- a/src/mongo/db/repl/replication_executor.cpp +++ b/src/mongo/db/repl/replication_executor.cpp @@ -33,6 +33,8 @@ #include <limits> #include "mongo/db/repl/database_task.h" +#include "mongo/db/repl/storage_interface.h" +#include "mongo/executor/network_interface.h" #include "mongo/util/assert_util.h" #include "mongo/util/mongoutils/str.h" @@ -43,9 +45,14 @@ namespace { stdx::function<void ()> makeNoExcept(const stdx::function<void ()> &fn); } // namespace - ReplicationExecutor::ReplicationExecutor(NetworkInterface* netInterface, int64_t prngSeed) : + using executor::NetworkInterface; + + ReplicationExecutor::ReplicationExecutor(NetworkInterface* netInterface, + StorageInterface* storageInterface, + int64_t prngSeed) : _random(prngSeed), _networkInterface(netInterface), + _storageInterface(storageInterface), _totalEventWaiters(0), _inShutdown(false), _dblockWorkers(threadpool::ThreadPool::DoNotStartThreadsTag(), @@ -53,10 +60,10 @@ namespace { "replCallbackWithGlobalLock-"), _dblockTaskRunner( &_dblockWorkers, - stdx::bind(&NetworkInterface::createOperationContext, netInterface)), + stdx::bind(&StorageInterface::createOperationContext, storageInterface)), _dblockExclusiveLockTaskRunner( &_dblockWorkers, - stdx::bind(&NetworkInterface::createOperationContext, netInterface)), + stdx::bind(&StorageInterface::createOperationContext, storageInterface)), _nextId(0) { } @@ -561,15 +568,6 @@ namespace { isSignaledCondition(new boost::condition_variable) { } - // This is a bitmask with the first bit set. It's used to mark connections that should be kept - // open during stepdowns. -#ifndef _MSC_EXTENSIONS - const unsigned int ReplicationExecutor::NetworkInterface::kMessagingPortKeepOpen; -#endif // _MSC_EXTENSIONS - - ReplicationExecutor::NetworkInterface::NetworkInterface() {} - ReplicationExecutor::NetworkInterface::~NetworkInterface() {} - namespace { void callNoExcept(const stdx::function<void ()>& fn) { diff --git a/src/mongo/db/repl/replication_executor.h b/src/mongo/db/repl/replication_executor.h index b44ee4a4815..b90d0e24133 100644 --- a/src/mongo/db/repl/replication_executor.h +++ b/src/mongo/db/repl/replication_executor.h @@ -53,8 +53,14 @@ namespace mongo { class NamespaceString; class OperationContext; +namespace executor{ + class NetworkInterface; +} // namespace executor + namespace repl { + class StorageInterface; + /** * Event loop for driving state machines in replication. * @@ -112,7 +118,6 @@ namespace repl { struct CallbackData; class CallbackHandle; class EventHandle; - class NetworkInterface; struct RemoteCommandCallbackData; typedef StatusWith<RemoteCommandResponse> ResponseStatus; @@ -141,7 +146,9 @@ namespace repl { * * Takes ownership of the passed NetworkInterface object. */ - explicit ReplicationExecutor(NetworkInterface* netInterface, int64_t pnrgSeed); + ReplicationExecutor(executor::NetworkInterface* netInterface, + StorageInterface* storageInterface, + int64_t pnrgSeed); /** * Destroys an executor. @@ -422,7 +429,8 @@ namespace repl { // PRNG; seeded at class construction time. PseudoRandom _random; - boost::scoped_ptr<NetworkInterface> _networkInterface; + boost::scoped_ptr<executor::NetworkInterface> _networkInterface; + boost::scoped_ptr<StorageInterface> _storageInterface; boost::mutex _mutex; boost::mutex _terribleExLockSyncMutex; boost::condition_variable _noMoreWaitingThreads; @@ -509,92 +517,8 @@ namespace repl { OperationContext* txn; }; - /** - * Interface to networking and lock manager. - */ - class ReplicationExecutor::NetworkInterface { - MONGO_DISALLOW_COPYING(NetworkInterface); - public: - - // A flag to keep replication MessagingPorts open when all other sockets are disconnected. - static const unsigned int kMessagingPortKeepOpen = 1; - - typedef RemoteCommandResponse Response; - typedef stdx::function<void (const ResponseStatus&)> RemoteCommandCompletionFn; - - virtual ~NetworkInterface(); - - /** - * Returns diagnostic info. - */ - virtual std::string getDiagnosticString() = 0; - - /** - * Starts up the network interface. - * - * It is valid to call all methods except shutdown() before this method completes. That is, - * implementations may not assume that startup() completes before startCommand() first - * executes. - * - * Called by the owning ReplicationExecutor inside its run() method. - */ - virtual void startup() = 0; - - /** - * Shuts down the network interface. Must be called before this instance gets deleted, - * if startup() is called. - * - * Called by the owning ReplicationExecutor inside its run() method. - */ - virtual void shutdown() = 0; - - /** - * Blocks the current thread (presumably the executor thread) until the network interface - * knows of work for the executor to perform. - */ - virtual void waitForWork() = 0; - - /** - * Similar to waitForWork, but only blocks until "when". - */ - virtual void waitForWorkUntil(Date_t when) = 0; - - /** - * Signals to the network interface that there is new work (such as a signaled event) for - * the executor to process. Wakes the executor from waitForWork() and friends. - */ - virtual void signalWorkAvailable() = 0; - - /** - * Returns the current time. - */ - virtual Date_t now() = 0; - - /** - * Starts asynchronous execution of the command described by "request". - */ - virtual void startCommand(const CallbackHandle& cbHandle, - const RemoteCommandRequest& request, - const RemoteCommandCompletionFn& onFinish) = 0; - - /** - * Requests cancelation of the network activity associated with "cbHandle" if it has not yet - * completed. - */ - virtual void cancelCommand(const CallbackHandle& cbHandle) = 0; - - /** - * Creates an operation context for running database operations. - */ - virtual OperationContext* createOperationContext() = 0; - - protected: - NetworkInterface(); - }; - typedef ReplicationExecutor::ResponseStatus ResponseStatus; - // Must be after NetworkInterface class struct ReplicationExecutor::RemoteCommandCallbackData { RemoteCommandCallbackData(ReplicationExecutor* theExecutor, const CallbackHandle& theHandle, diff --git a/src/mongo/db/repl/replication_executor_test.cpp b/src/mongo/db/repl/replication_executor_test.cpp index dbc4f00b6fa..d916f3e01d2 100644 --- a/src/mongo/db/repl/replication_executor_test.cpp +++ b/src/mongo/db/repl/replication_executor_test.cpp @@ -34,9 +34,10 @@ #include "mongo/db/namespace_string.h" #include "mongo/db/operation_context.h" -#include "mongo/db/repl/network_interface_mock.h" #include "mongo/db/repl/replication_executor.h" #include "mongo/db/repl/replication_executor_test_fixture.h" +#include "mongo/db/repl/storage_interface_mock.h" +#include "mongo/executor/network_interface_mock.h" #include "mongo/stdx/functional.h" #include "mongo/unittest/barrier.h" #include "mongo/unittest/unittest.h" @@ -48,6 +49,8 @@ namespace repl { namespace { + using executor::NetworkInterfaceMock; + bool operator==(const RemoteCommandRequest lhs, const RemoteCommandRequest rhs) { return lhs.target == rhs.target && @@ -155,6 +158,7 @@ namespace { void onGoAfterTriggered(const ReplicationExecutor::CallbackData& cbData); NetworkInterfaceMock* net; + StorageInterfaceMock* storage; ReplicationExecutor executor; boost::thread executorThread; const ReplicationExecutor::EventHandle goEvent; @@ -176,7 +180,8 @@ namespace { EventChainAndWaitingTest::EventChainAndWaitingTest() : net(new NetworkInterfaceMock), - executor(net, prngSeed), + storage(new StorageInterfaceMock), + executor(net, storage, prngSeed), executorThread(stdx::bind(&ReplicationExecutor::run, &executor)), goEvent(unittest::assertGet(executor.makeEvent())), event2(unittest::assertGet(executor.makeEvent())), diff --git a/src/mongo/db/repl/replication_executor_test_fixture.cpp b/src/mongo/db/repl/replication_executor_test_fixture.cpp index b33e1983e82..6efbe072d52 100644 --- a/src/mongo/db/repl/replication_executor_test_fixture.cpp +++ b/src/mongo/db/repl/replication_executor_test_fixture.cpp @@ -30,8 +30,9 @@ #include "mongo/db/repl/replication_executor_test_fixture.h" -#include "mongo/db/repl/network_interface_mock.h" #include "mongo/db/repl/replication_executor.h" +#include "mongo/db/repl/storage_interface_mock.h" +#include "mongo/executor/network_interface_mock.h" namespace mongo { namespace repl { @@ -46,19 +47,20 @@ namespace { ASSERT(!_executorThread); _executorThread.reset( new boost::thread(stdx::bind(&ReplicationExecutor::run, _executor.get()))); - _net->enterNetwork(); + getNet()->enterNetwork(); } void ReplicationExecutorTest::joinExecutorThread() { ASSERT(_executorThread); - _net->exitNetwork(); + getNet()->exitNetwork(); _executorThread->join(); _executorThread.reset(); } void ReplicationExecutorTest::setUp() { - _net = new NetworkInterfaceMock; - _executor.reset(new ReplicationExecutor(_net, prngSeed)); + _net = new executor::NetworkInterfaceMock; + _storage = new StorageInterfaceMock; + _executor.reset(new ReplicationExecutor(_net, _storage, prngSeed)); } void ReplicationExecutorTest::tearDown() { diff --git a/src/mongo/db/repl/replication_executor_test_fixture.h b/src/mongo/db/repl/replication_executor_test_fixture.h index ad9bc345ea0..479d799b990 100644 --- a/src/mongo/db/repl/replication_executor_test_fixture.h +++ b/src/mongo/db/repl/replication_executor_test_fixture.h @@ -34,12 +34,17 @@ #include "mongo/unittest/unittest.h" namespace mongo { + +namespace executor { + class NetworkInterfaceMock; +} // namespace executor + namespace repl { using std::unique_ptr; - class NetworkInterfaceMock; class ReplicationExecutor; + class StorageInterfaceMock; /** * Test fixture for tests that require a ReplicationExecutor backed by @@ -47,7 +52,7 @@ namespace repl { */ class ReplicationExecutorTest : public unittest::Test { protected: - NetworkInterfaceMock* getNet() { return _net; } + executor::NetworkInterfaceMock* getNet() { return _net; } ReplicationExecutor& getExecutor() { return *_executor; } /** * Runs ReplicationExecutor in background. @@ -80,7 +85,8 @@ namespace repl { private: - NetworkInterfaceMock* _net; + executor::NetworkInterfaceMock* _net; + StorageInterfaceMock* _storage; unique_ptr<ReplicationExecutor> _executor; unique_ptr<boost::thread> _executorThread; }; diff --git a/src/mongo/db/repl/replset_commands.cpp b/src/mongo/db/repl/replset_commands.cpp index 37e1dfe9deb..600fbfe52a8 100644 --- a/src/mongo/db/repl/replset_commands.cpp +++ b/src/mongo/db/repl/replset_commands.cpp @@ -54,6 +54,7 @@ #include "mongo/db/repl/replication_executor.h" #include "mongo/db/repl/update_position_args.h" #include "mongo/db/storage/storage_engine.h" +#include "mongo/executor/network_interface.h" #include "mongo/util/fail_point_service.h" #include "mongo/util/log.h" @@ -709,7 +710,7 @@ namespace { { AbstractMessagingPort *mp = txn->getClient()->port(); if( mp ) - mp->tag |= ReplicationExecutor::NetworkInterface::kMessagingPortKeepOpen; + mp->tag |= executor::NetworkInterface::kMessagingPortKeepOpen; } if (getGlobalReplicationCoordinator()->isV1ElectionProtocol()) { diff --git a/src/mongo/db/repl/reporter_test.cpp b/src/mongo/db/repl/reporter_test.cpp index fa680a44612..ed617fa0cdf 100644 --- a/src/mongo/db/repl/reporter_test.cpp +++ b/src/mongo/db/repl/reporter_test.cpp @@ -28,9 +28,9 @@ #include "mongo/platform/basic.h" -#include "mongo/db/repl/network_interface_mock.h" #include "mongo/db/repl/replication_executor_test_fixture.h" #include "mongo/db/repl/reporter.h" +#include "mongo/executor/network_interface_mock.h" #include "mongo/unittest/unittest.h" @@ -38,6 +38,7 @@ namespace { using namespace mongo; using namespace mongo::repl; + using executor::NetworkInterfaceMock; class MockProgressManager : public ReplicationProgressManager { public: diff --git a/src/mongo/db/repl/scatter_gather_test.cpp b/src/mongo/db/repl/scatter_gather_test.cpp index 1fc6765e58e..49209e9a31a 100644 --- a/src/mongo/db/repl/scatter_gather_test.cpp +++ b/src/mongo/db/repl/scatter_gather_test.cpp @@ -31,10 +31,11 @@ #include <boost/scoped_ptr.hpp> #include <boost/thread.hpp> -#include "mongo/db/repl/network_interface_mock.h" #include "mongo/db/repl/replication_executor.h" #include "mongo/db/repl/scatter_gather_algorithm.h" #include "mongo/db/repl/scatter_gather_runner.h" +#include "mongo/db/repl/storage_interface_mock.h" +#include "mongo/executor/network_interface_mock.h" #include "mongo/stdx/functional.h" #include "mongo/unittest/unittest.h" @@ -42,6 +43,8 @@ namespace mongo { namespace repl { namespace { + using executor::NetworkInterfaceMock; + /** * Algorithm for testing the ScatterGatherRunner, which will finish running when finish() is * called, or upon receiving responses from two nodes. Creates a three requests algorithm @@ -112,13 +115,15 @@ namespace { // owned by _executor NetworkInterfaceMock* _net; + StorageInterfaceMock* _storage; boost::scoped_ptr<ReplicationExecutor> _executor; boost::scoped_ptr<boost::thread> _executorThread; }; void ScatterGatherTest::setUp() { _net = new NetworkInterfaceMock; - _executor.reset(new ReplicationExecutor(_net, 1 /* prng seed */)); + _storage = new StorageInterfaceMock; + _executor.reset(new ReplicationExecutor(_net, _storage, 1 /* prng seed */)); _executorThread.reset(new boost::thread(stdx::bind(&ReplicationExecutor::run, _executor.get()))); } diff --git a/src/mongo/db/repl/storage_interface.cpp b/src/mongo/db/repl/storage_interface.cpp new file mode 100644 index 00000000000..c09d76ad9ff --- /dev/null +++ b/src/mongo/db/repl/storage_interface.cpp @@ -0,0 +1,42 @@ +/** + * Copyright (C) 2015 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kReplication + +#include "mongo/platform/basic.h" + +#include "mongo/db/repl/storage_interface.h" + +namespace mongo { +namespace repl { + + StorageInterface::StorageInterface() {} + StorageInterface::~StorageInterface() {} + +} // namespace repl +} // namespace mongo diff --git a/src/mongo/db/repl/storage_interface.h b/src/mongo/db/repl/storage_interface.h new file mode 100644 index 00000000000..df51692b2f1 --- /dev/null +++ b/src/mongo/db/repl/storage_interface.h @@ -0,0 +1,59 @@ +/** + * Copyright (C) 2015 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + + +#pragma once + + +namespace mongo { + + class OperationContext; + +namespace repl { + + /** + * Storage interface used by used by the ReplicationExecutor inside mongod for supporting + * ReplicationExectutor's ability to take database locks. + */ + class StorageInterface { + public: + virtual ~StorageInterface(); + + /** + * Creates an operation context for running database operations. + */ + virtual OperationContext* createOperationContext() = 0; + + protected: + + StorageInterface(); + + }; + +} // namespace repl +} // namespace mongo diff --git a/src/mongo/db/repl/storage_interface_impl.cpp b/src/mongo/db/repl/storage_interface_impl.cpp new file mode 100644 index 00000000000..73a14ce6330 --- /dev/null +++ b/src/mongo/db/repl/storage_interface_impl.cpp @@ -0,0 +1,54 @@ +/** + * Copyright (C) 2015 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kReplication + +#include "mongo/platform/basic.h" + +#include "mongo/db/repl/storage_interface_impl.h" + +#include "mongo/db/auth/authorization_session.h" +#include "mongo/db/client.h" +#include "mongo/db/operation_context_impl.h" + +namespace mongo { +namespace repl { + + StorageInterfaceImpl::StorageInterfaceImpl() : StorageInterface() {} + StorageInterfaceImpl::~StorageInterfaceImpl() { } + + OperationContext* StorageInterfaceImpl::createOperationContext() { + if (!ClientBasic::getCurrent()) { + Client::initThreadIfNotAlready(); + AuthorizationSession::get(*ClientBasic::getCurrent())->grantInternalAuthorization(); + } + return new OperationContextImpl(); + } + +} // namespace repl +} // namespace mongo diff --git a/src/mongo/db/repl/storage_interface_impl.h b/src/mongo/db/repl/storage_interface_impl.h new file mode 100644 index 00000000000..24cc8268f17 --- /dev/null +++ b/src/mongo/db/repl/storage_interface_impl.h @@ -0,0 +1,50 @@ +/** + * Copyright (C) 2015 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + + +#pragma once + +#include "mongo/db/repl/storage_interface.h" + +namespace mongo { + + class OperationContext; + +namespace repl { + + class StorageInterfaceImpl : public StorageInterface { + public: + explicit StorageInterfaceImpl(); + virtual ~StorageInterfaceImpl(); + + OperationContext* createOperationContext() override; + + }; + +} // namespace repl +} // namespace mongo diff --git a/src/mongo/db/repl/storage_interface_mock.cpp b/src/mongo/db/repl/storage_interface_mock.cpp new file mode 100644 index 00000000000..4a6f4a7a293 --- /dev/null +++ b/src/mongo/db/repl/storage_interface_mock.cpp @@ -0,0 +1,49 @@ +/** + * Copyright (C) 2015 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kReplication + +#include "mongo/platform/basic.h" + +#include "mongo/db/repl/storage_interface_mock.h" + +#include "mongo/db/repl/operation_context_repl_mock.h" + +namespace mongo { +namespace repl { + + StorageInterfaceMock::StorageInterfaceMock() {} + + StorageInterfaceMock::~StorageInterfaceMock() { } + + OperationContext* StorageInterfaceMock::createOperationContext() { + return new OperationContextReplMock(); + } + +} // namespace repl +} // namespace mongo diff --git a/src/mongo/db/repl/storage_interface_mock.h b/src/mongo/db/repl/storage_interface_mock.h new file mode 100644 index 00000000000..4bd3e63ec9d --- /dev/null +++ b/src/mongo/db/repl/storage_interface_mock.h @@ -0,0 +1,49 @@ +/** + * Copyright (C) 2015 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + + +#pragma once + +#include "mongo/db/repl/storage_interface.h" + +namespace mongo { + + class OperationContext; + +namespace repl { + + class StorageInterfaceMock : public StorageInterface { + public: + explicit StorageInterfaceMock(); + virtual ~StorageInterfaceMock(); + + OperationContext* createOperationContext() override; + }; + +} // namespace repl +} // namespace mongo diff --git a/src/mongo/db/repl/vote_requester_test.cpp b/src/mongo/db/repl/vote_requester_test.cpp index 09632675239..90d0843d843 100644 --- a/src/mongo/db/repl/vote_requester_test.cpp +++ b/src/mongo/db/repl/vote_requester_test.cpp @@ -33,9 +33,9 @@ #include "mongo/base/status.h" #include "mongo/db/jsobj.h" #include "mongo/db/repl/vote_requester.h" -#include "mongo/db/repl/network_interface_mock.h" #include "mongo/db/repl/repl_set_request_votes_args.h" #include "mongo/db/repl/replication_executor.h" +#include "mongo/executor/network_interface_mock.h" #include "mongo/stdx/functional.h" #include "mongo/unittest/unittest.h" #include "mongo/util/mongoutils/str.h" @@ -43,6 +43,8 @@ namespace mongo { namespace repl { namespace { + + using executor::NetworkInterfaceMock; using unittest::assertGet; using RemoteCommandRequest = RemoteCommandRequest; diff --git a/src/mongo/executor/SConscript b/src/mongo/executor/SConscript new file mode 100644 index 00000000000..448325b1172 --- /dev/null +++ b/src/mongo/executor/SConscript @@ -0,0 +1,22 @@ +# -*- mode: python -*- + +Import("env") + +env.Library(target='network_interface', + source=['network_interface.cpp',], + LIBDEPS=[ + ]) + +env.Library(target='network_interface_impl', # TODO: rename to thread_pool_network_interface + source=['network_interface_impl.cpp',], + LIBDEPS=[ + '$BUILD_DIR/mongo/client/remote_command_runner_impl', + 'network_interface', + # TODO: add dependency on the task executor *interface* once available. + ]) + +env.Library('network_interface_mock', + 'network_interface_mock.cpp', + LIBDEPS=[ + 'network_interface', + ])
\ No newline at end of file diff --git a/src/mongo/executor/network_interface.cpp b/src/mongo/executor/network_interface.cpp new file mode 100644 index 00000000000..f748efabfeb --- /dev/null +++ b/src/mongo/executor/network_interface.cpp @@ -0,0 +1,48 @@ +/** + * Copyright (C) 2014 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/executor/network_interface.h" + + +namespace mongo { +namespace executor { + + // This is a bitmask with the first bit set. It's used to mark connections that should be kept + // open during stepdowns. +#ifndef _MSC_EXTENSIONS + const unsigned int NetworkInterface::kMessagingPortKeepOpen; +#endif // _MSC_EXTENSIONS + + NetworkInterface::NetworkInterface() {} + NetworkInterface::~NetworkInterface() {} + + +} // namespace executor +} // namespace mongo diff --git a/src/mongo/executor/network_interface.h b/src/mongo/executor/network_interface.h new file mode 100644 index 00000000000..4390e9721e0 --- /dev/null +++ b/src/mongo/executor/network_interface.h @@ -0,0 +1,121 @@ +/** + * Copyright (C) 2014-2015 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include <string> + +#include "mongo/base/disallow_copying.h" +#include "mongo/db/repl/replication_executor.h" +#include "mongo/stdx/functional.h" + +namespace mongo { +namespace executor { + + /** + * Interface to networking used by ReplicationExecutor. + * + * TODO(spencer): Change to use a TaskExecutor once that interface is available. + */ + class NetworkInterface { + MONGO_DISALLOW_COPYING(NetworkInterface); + public: + + // A flag to keep replication MessagingPorts open when all other sockets are disconnected. + static const unsigned int kMessagingPortKeepOpen = 1; + + typedef RemoteCommandResponse Response; + typedef stdx::function<void (const repl::ResponseStatus&)> RemoteCommandCompletionFn; + + virtual ~NetworkInterface(); + + /** + * Returns diagnostic info. + */ + virtual std::string getDiagnosticString() = 0; + + /** + * Starts up the network interface. + * + * It is valid to call all methods except shutdown() before this method completes. That is, + * implementations may not assume that startup() completes before startCommand() first + * executes. + * + * Called by the owning TaskExecutor inside its run() method. + */ + virtual void startup() = 0; + + /** + * Shuts down the network interface. Must be called before this instance gets deleted, + * if startup() is called. + * + * Called by the owning TaskExecutor inside its run() method. + */ + virtual void shutdown() = 0; + + /** + * Blocks the current thread (presumably the executor thread) until the network interface + * knows of work for the executor to perform. + */ + virtual void waitForWork() = 0; + + /** + * Similar to waitForWork, but only blocks until "when". + */ + virtual void waitForWorkUntil(Date_t when) = 0; + + /** + * Signals to the network interface that there is new work (such as a signaled event) for + * the executor to process. Wakes the executor from waitForWork() and friends. + */ + virtual void signalWorkAvailable() = 0; + + /** + * Returns the current time. + */ + virtual Date_t now() = 0; + + /** + * Starts asynchronous execution of the command described by "request". + */ + virtual void startCommand(const repl::ReplicationExecutor::CallbackHandle& cbHandle, + const RemoteCommandRequest& request, + const RemoteCommandCompletionFn& onFinish) = 0; + + /** + * Requests cancelation of the network activity associated with "cbHandle" if it has not yet + * completed. + */ + virtual void cancelCommand(const repl::ReplicationExecutor::CallbackHandle& cbHandle) = 0; + + protected: + NetworkInterface(); + }; + +} // namespace executor +} // namespace mongo diff --git a/src/mongo/db/repl/network_interface_impl.cpp b/src/mongo/executor/network_interface_impl.cpp index 9e76f27a8be..129c082cd32 100644 --- a/src/mongo/db/repl/network_interface_impl.cpp +++ b/src/mongo/executor/network_interface_impl.cpp @@ -26,19 +26,16 @@ * it in the license file. */ -#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kReplication +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kExecutor #include "mongo/platform/basic.h" -#include "mongo/db/repl/network_interface_impl.h" +#include "mongo/executor/network_interface_impl.h" #include <boost/make_shared.hpp> #include <memory> -#include "mongo/db/auth/authorization_session.h" #include "mongo/client/connection_pool.h" -#include "mongo/db/client.h" -#include "mongo/db/operation_context_impl.h" #include "mongo/rpc/get_status_from_command_result.h" #include "mongo/stdx/thread.h" #include "mongo/util/assert_util.h" @@ -46,7 +43,7 @@ #include "mongo/util/time_support.h" namespace mongo { -namespace repl { +namespace executor { namespace { @@ -57,6 +54,7 @@ namespace { } // namespace NetworkInterfaceImpl::NetworkInterfaceImpl() : + NetworkInterface(), _numIdleThreads(0), _nextThreadId(0), _lastFullUtilizationDate(), @@ -200,7 +198,7 @@ namespace { ++_numActiveNetworkRequests; --_numIdleThreads; lk.unlock(); - ResponseStatus result = _commandRunner.runCommand(todo.request); + repl::ResponseStatus result = _commandRunner.runCommand(todo.request); LOG(2) << "Network status of sending " << todo.request.cmdObj.firstElementFieldName() << " to " << todo.request.target << " was " << result.getStatus(); todo.onFinish(result); @@ -226,11 +224,11 @@ namespace { } severe().stream() << "Could not find this thread, with id " << stdx::this_thread::get_id() << " in the replication networking thread pool"; - fassertFailedNoTrace(28581); + fassertFailedNoTrace(28676); } void NetworkInterfaceImpl::startCommand( - const ReplicationExecutor::CallbackHandle& cbHandle, + const repl::ReplicationExecutor::CallbackHandle& cbHandle, const RemoteCommandRequest& request, const RemoteCommandCompletionFn& onFinish) { LOG(2) << "Scheduling " << request.cmdObj.firstElementFieldName() << " to " << @@ -250,7 +248,8 @@ namespace { _hasPending.notify_one(); } - void NetworkInterfaceImpl::cancelCommand(const ReplicationExecutor::CallbackHandle& cbHandle) { + void NetworkInterfaceImpl::cancelCommand( + const repl::ReplicationExecutor::CallbackHandle& cbHandle) { boost::unique_lock<boost::mutex> lk(_mutex); CommandDataList::iterator iter; for (iter = _pending.begin(); iter != _pending.end(); ++iter) { @@ -266,7 +265,7 @@ namespace { iter->request.target; _pending.erase(iter); lk.unlock(); - onFinish(ResponseStatus(ErrorCodes::CallbackCanceled, "Callback canceled")); + onFinish(repl::ResponseStatus(ErrorCodes::CallbackCanceled, "Callback canceled")); lk.lock(); _signalWorkAvailable_inlock(); } @@ -275,13 +274,5 @@ namespace { return Date_t::now(); } - OperationContext* NetworkInterfaceImpl::createOperationContext() { - if (!ClientBasic::getCurrent()) { - Client::initThreadIfNotAlready(); - AuthorizationSession::get(*ClientBasic::getCurrent())->grantInternalAuthorization(); - } - return new OperationContextImpl(); - } - -} // namespace repl +} // namespace executor } // namespace mongo diff --git a/src/mongo/db/repl/network_interface_impl.h b/src/mongo/executor/network_interface_impl.h index 4350f7c000a..c53dd3647dd 100644 --- a/src/mongo/db/repl/network_interface_impl.h +++ b/src/mongo/executor/network_interface_impl.h @@ -36,14 +36,15 @@ #include <vector> #include "mongo/client/remote_command_runner_impl.h" -#include "mongo/db/repl/replication_executor.h" +#include "mongo/executor/network_interface.h" #include "mongo/stdx/list.h" namespace mongo { -namespace repl { +namespace executor { /** - * Implementation of the network interface used by the ReplicationExecutor inside mongod. + * Implementation of the network interface for use by classes implementing TaskExecutor + * inside mongod. * * This implementation manages a dynamically sized group of worker threads for performing * network operations. The minimum and maximum number of threads is set at compile time, and @@ -65,10 +66,11 @@ namespace repl { * The implementation also manages a pool of network connections to recently contacted remote * nodes. The size of this pool is not bounded, but connections are retired unconditionally * after they have been connected for a certain maximum period. + * TODO(spencer): Rename this to ThreadPoolNetworkInterface */ - class NetworkInterfaceImpl : public ReplicationExecutor::NetworkInterface { + class NetworkInterfaceImpl : public NetworkInterface { public: - explicit NetworkInterfaceImpl(); + NetworkInterfaceImpl(); virtual ~NetworkInterfaceImpl(); virtual std::string getDiagnosticString(); virtual void startup(); @@ -78,20 +80,18 @@ namespace repl { virtual void signalWorkAvailable(); virtual Date_t now(); virtual void startCommand( - const ReplicationExecutor::CallbackHandle& cbHandle, + const repl::ReplicationExecutor::CallbackHandle& cbHandle, const RemoteCommandRequest& request, const RemoteCommandCompletionFn& onFinish); - virtual void cancelCommand(const ReplicationExecutor::CallbackHandle& cbHandle); - OperationContext* createOperationContext() override; - - std::string getNextCallbackWithGlobalLockThreadName(); + virtual void cancelCommand(const repl::ReplicationExecutor::CallbackHandle& cbHandle); private: + /** * Information describing an in-flight command. */ struct CommandData { - ReplicationExecutor::CallbackHandle cbHandle; + repl::ReplicationExecutor::CallbackHandle cbHandle; RemoteCommandRequest request; RemoteCommandCompletionFn onFinish; }; @@ -160,5 +160,5 @@ namespace repl { size_t _numActiveNetworkRequests; }; -} // namespace repl +} // namespace executor } // namespace mongo diff --git a/src/mongo/db/repl/network_interface_mock.cpp b/src/mongo/executor/network_interface_mock.cpp index 80263365eba..8acb6d3f86c 100644 --- a/src/mongo/db/repl/network_interface_mock.cpp +++ b/src/mongo/executor/network_interface_mock.cpp @@ -28,7 +28,7 @@ #include "mongo/platform/basic.h" -#include "mongo/db/repl/network_interface_mock.h" +#include "mongo/executor/network_interface_mock.h" #include "mongo/db/repl/operation_context_repl_mock.h" #include "mongo/db/repl/replication_executor.h" @@ -36,7 +36,7 @@ #include "mongo/util/time_support.h" namespace mongo { -namespace repl { +namespace executor { NetworkInterfaceMock::NetworkInterfaceMock() : _waitingToRunMask(0), @@ -64,12 +64,8 @@ namespace repl { return _now_inlock(); } - OperationContext* NetworkInterfaceMock::createOperationContext() { - return new OperationContextReplMock(); - } - void NetworkInterfaceMock::startCommand( - const ReplicationExecutor::CallbackHandle& cbHandle, + const repl::ReplicationExecutor::CallbackHandle& cbHandle, const RemoteCommandRequest& request, const RemoteCommandCompletionFn& onFinish) { @@ -96,13 +92,13 @@ namespace repl { return false; } scheduled->splice(scheduled->begin(), *other, noi); - noi->setResponse(now, ResponseStatus(ErrorCodes::CallbackCanceled, - "Network operation canceled")); + noi->setResponse(now, repl::ResponseStatus(ErrorCodes::CallbackCanceled, + "Network operation canceled")); return true; } void NetworkInterfaceMock::cancelCommand( - const ReplicationExecutor::CallbackHandle& cbHandle) { + const repl::ReplicationExecutor::CallbackHandle& cbHandle) { boost::lock_guard<boost::mutex> lk(_mutex); invariant(!_inShutdown); stdx::function<bool (const NetworkOperation&)> matchesHandle = stdx::bind( @@ -146,8 +142,8 @@ namespace repl { _waitingToRunMask |= kExecutorThread; // Prevents network thread from scheduling. lk.unlock(); for (NetworkOperationIterator iter = todo.begin(); iter != todo.end(); ++iter) { - iter->setResponse(now, ResponseStatus(ErrorCodes::ShutdownInProgress, - "Shutting down mock network")); + iter->setResponse(now, repl::ResponseStatus(ErrorCodes::ShutdownInProgress, + "Shutting down mock network")); iter->finishResponse(); } lk.lock(); @@ -208,7 +204,7 @@ namespace repl { void NetworkInterfaceMock::scheduleResponse( NetworkOperationIterator noi, Date_t when, - const ResponseStatus& response) { + const repl::ResponseStatus& response) { boost::lock_guard<boost::mutex> lk(_mutex); invariant(_currentlyRunning == kNetworkThread); @@ -364,7 +360,7 @@ namespace repl { } NetworkInterfaceMock::NetworkOperation::NetworkOperation( - const ReplicationExecutor::CallbackHandle& cbHandle, + const repl::ReplicationExecutor::CallbackHandle& cbHandle, const RemoteCommandRequest& theRequest, Date_t theRequestDate, const RemoteCommandCompletionFn& onFinish) @@ -388,7 +384,7 @@ namespace repl { void NetworkInterfaceMock::NetworkOperation::setResponse( Date_t responseDate, - const ResponseStatus& response) { + const repl::ResponseStatus& response) { invariant(responseDate >= _requestDate); _responseDate = responseDate; @@ -401,5 +397,5 @@ namespace repl { _onFinish = RemoteCommandCompletionFn(); } -} // namespace repl +} // namespace executor } // namespace mongo diff --git a/src/mongo/db/repl/network_interface_mock.h b/src/mongo/executor/network_interface_mock.h index 255eb7efec7..2ec67edbc86 100644 --- a/src/mongo/db/repl/network_interface_mock.h +++ b/src/mongo/executor/network_interface_mock.h @@ -32,23 +32,23 @@ #include <boost/thread/condition_variable.hpp> #include <map> -#include "mongo/db/repl/replication_executor.h" +#include "mongo/executor/network_interface.h" #include "mongo/util/time_support.h" namespace mongo { -namespace repl { +namespace executor { /** * Mock network implementation for use in unit tests. * * To use, construct a new instance on the heap, and keep a pointer to it. Pass - * the pointer to the instance into the ReplicationExecutor constructor, transfering + * the pointer to the instance into the TaskExecutor constructor, transferring * ownership. Start the executor's run() method in a separate thread, schedule the * work you want to test into the executor, then while the test is still going, iterate * through the ready network requests, servicing them and advancing time as needed. * * The mock has a fully virtualized notion of time and the the network. When the - * replication executor under test schedules a network operation, the startCommand + * executor under test schedules a network operation, the startCommand * method of this class adds an entry to the _unscheduled queue for immediate consideration. * The test driver loop, when it examines the request, may schedule a response, ask the * interface to redeliver the request at a later virtual time, or to swallow the virtual @@ -59,7 +59,7 @@ namespace repl { * The thread acting as the "network" and the executor run thread are highly synchronized * by this code, allowing for deterministic control of operation interleaving. */ - class NetworkInterfaceMock : public ReplicationExecutor::NetworkInterface { + class NetworkInterfaceMock : public NetworkInterface { public: class NetworkOperation; typedef stdx::list<NetworkOperation> NetworkOperationList; @@ -71,7 +71,7 @@ namespace repl { //////////////////////////////////////////////////////////////////////////////// // - // ReplicationExecutor::NetworkInterface methods + // NetworkInterface methods // //////////////////////////////////////////////////////////////////////////////// @@ -81,11 +81,10 @@ namespace repl { virtual void waitForWorkUntil(Date_t when); virtual void signalWorkAvailable(); virtual Date_t now(); - virtual void startCommand(const ReplicationExecutor::CallbackHandle& cbHandle, + virtual void startCommand(const repl::ReplicationExecutor::CallbackHandle& cbHandle, const RemoteCommandRequest& request, const RemoteCommandCompletionFn& onFinish); - virtual void cancelCommand(const ReplicationExecutor::CallbackHandle& cbHandle); - OperationContext* createOperationContext() override; + virtual void cancelCommand(const repl::ReplicationExecutor::CallbackHandle& cbHandle); //////////////////////////////////////////////////////////////////////////////// @@ -108,7 +107,7 @@ namespace repl { /** * Causes the currently running thread to drop the mantle of "network simulation thread". * - * Call this before calling any methods that might block waiting for the replciation + * Call this before calling any methods that might block waiting for the * executor thread. */ void exitNetwork(); @@ -131,7 +130,7 @@ namespace repl { void scheduleResponse( NetworkOperationIterator noi, Date_t when, - const ResponseStatus& response); + const repl::ResponseStatus& response); /** * Swallows "noi", causing the network interface to not respond to it until @@ -253,7 +252,7 @@ namespace repl { // Pointer to the executor into which this mock is installed. Used to signal the executor // when the clock changes. - ReplicationExecutor* _executor; // (R) + repl::ReplicationExecutor* _executor; // (R) }; /** @@ -262,7 +261,7 @@ namespace repl { class NetworkInterfaceMock::NetworkOperation { public: NetworkOperation(); - NetworkOperation(const ReplicationExecutor::CallbackHandle& cbHandle, + NetworkOperation(const repl::ReplicationExecutor::CallbackHandle& cbHandle, const RemoteCommandRequest& theRequest, Date_t theRequestDate, const RemoteCommandCompletionFn& onFinish); @@ -277,13 +276,13 @@ namespace repl { /** * Sets the response and thet virtual time at which it will be delivered. */ - void setResponse(Date_t responseDate, const ResponseStatus& response); + void setResponse(Date_t responseDate, const repl::ResponseStatus& response); /** * Predicate that returns true if cbHandle equals the executor's handle for this network * operation. Used for searching lists of NetworkOperations. */ - bool isForCallback(const ReplicationExecutor::CallbackHandle& cbHandle) const { + bool isForCallback(const repl::ReplicationExecutor::CallbackHandle& cbHandle) const { return cbHandle == _cbHandle; } @@ -318,11 +317,11 @@ namespace repl { Date_t _requestDate; Date_t _nextConsiderationDate; Date_t _responseDate; - ReplicationExecutor::CallbackHandle _cbHandle; + repl::ReplicationExecutor::CallbackHandle _cbHandle; RemoteCommandRequest _request; - ResponseStatus _response; + repl::ResponseStatus _response; RemoteCommandCompletionFn _onFinish; }; -} // namespace repl +} // namespace executor } // namespace mongo diff --git a/src/mongo/logger/log_component.cpp b/src/mongo/logger/log_component.cpp index 63bf64be0cc..8508331c8b6 100644 --- a/src/mongo/logger/log_component.cpp +++ b/src/mongo/logger/log_component.cpp @@ -94,6 +94,7 @@ std::string _dottedNames[LogComponent::kNumLogComponents+1]; case kAccessControl: return createStringData("accessControl"); case kCommand: return createStringData("command"); case kControl: return createStringData("control"); + case kExecutor: return createStringData("executor"); case kGeo: return createStringData("geo"); case kIndex: return createStringData("index"); case kNetwork: return createStringData("network"); @@ -139,6 +140,7 @@ std::string _dottedNames[LogComponent::kNumLogComponents+1]; case kAccessControl: return createStringData("ACCESS "); case kCommand: return createStringData("COMMAND "); case kControl: return createStringData("CONTROL "); + case kExecutor: return createStringData("EXECUTOR"); case kGeo: return createStringData("GEO "); case kIndex: return createStringData("INDEX "); case kNetwork: return createStringData("NETWORK "); diff --git a/src/mongo/logger/log_component.h b/src/mongo/logger/log_component.h index 5ccd9ada003..06afb0012c5 100644 --- a/src/mongo/logger/log_component.h +++ b/src/mongo/logger/log_component.h @@ -47,6 +47,7 @@ namespace logger { kAccessControl, kCommand, kControl, + kExecutor, kGeo, kIndex, kNetwork, |