summaryrefslogtreecommitdiff
path: root/src/mongo/db
diff options
context:
space:
mode:
authorSpencer T Brody <spencer@mongodb.com>2015-06-03 16:31:14 -0400
committerSpencer T Brody <spencer@mongodb.com>2015-06-04 16:04:27 -0400
commit5f9ae8cbea0bcf2601ca7d9ec8cd4de5beb236eb (patch)
tree64e8ab53d71fe3eea3f2f9c9d01a9d899dccd235 /src/mongo/db
parent0d3d62e2fb017512aee2ae2be6f128e573a0bf5a (diff)
downloadmongo-5f9ae8cbea0bcf2601ca7d9ec8cd4de5beb236eb.tar.gz
SERVER-18623 Split NetworkInterface and StorageInterface out from ReplicationExecutor
Diffstat (limited to 'src/mongo/db')
-rw-r--r--src/mongo/db/SConscript2
-rw-r--r--src/mongo/db/db.cpp6
-rw-r--r--src/mongo/db/repl/SConscript23
-rw-r--r--src/mongo/db/repl/base_cloner_test_fixture.cpp20
-rw-r--r--src/mongo/db/repl/base_cloner_test_fixture.h10
-rw-r--r--src/mongo/db/repl/check_quorum_for_config_change_test.cpp9
-rw-r--r--src/mongo/db/repl/collection_cloner_test.cpp1
-rw-r--r--src/mongo/db/repl/data_replicator_test.cpp3
-rw-r--r--src/mongo/db/repl/database_cloner_test.cpp1
-rw-r--r--src/mongo/db/repl/elect_cmd_runner_test.cpp11
-rw-r--r--src/mongo/db/repl/election_winner_declarer_test.cpp3
-rw-r--r--src/mongo/db/repl/fetcher_test.cpp3
-rw-r--r--src/mongo/db/repl/freshness_checker_test.cpp10
-rw-r--r--src/mongo/db/repl/network_interface_impl.cpp287
-rw-r--r--src/mongo/db/repl/network_interface_impl.h164
-rw-r--r--src/mongo/db/repl/network_interface_mock.cpp405
-rw-r--r--src/mongo/db/repl/network_interface_mock.h328
-rw-r--r--src/mongo/db/repl/repl_set_heartbeat_response_test.cpp2
-rw-r--r--src/mongo/db/repl/replication_coordinator_external_state_impl.cpp4
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl.cpp13
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl.h9
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl_elect_test.cpp4
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp4
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl_heartbeat_test.cpp4
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp4
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl_reconfig_test.cpp3
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl_test.cpp3
-rw-r--r--src/mongo/db/repl/replication_coordinator_test_fixture.cpp7
-rw-r--r--src/mongo/db/repl/replication_coordinator_test_fixture.h14
-rw-r--r--src/mongo/db/repl/replication_executor.cpp22
-rw-r--r--src/mongo/db/repl/replication_executor.h98
-rw-r--r--src/mongo/db/repl/replication_executor_test.cpp9
-rw-r--r--src/mongo/db/repl/replication_executor_test_fixture.cpp12
-rw-r--r--src/mongo/db/repl/replication_executor_test_fixture.h12
-rw-r--r--src/mongo/db/repl/replset_commands.cpp3
-rw-r--r--src/mongo/db/repl/reporter_test.cpp3
-rw-r--r--src/mongo/db/repl/scatter_gather_test.cpp9
-rw-r--r--src/mongo/db/repl/storage_interface.cpp42
-rw-r--r--src/mongo/db/repl/storage_interface.h59
-rw-r--r--src/mongo/db/repl/storage_interface_impl.cpp54
-rw-r--r--src/mongo/db/repl/storage_interface_impl.h50
-rw-r--r--src/mongo/db/repl/storage_interface_mock.cpp49
-rw-r--r--src/mongo/db/repl/storage_interface_mock.h49
-rw-r--r--src/mongo/db/repl/vote_requester_test.cpp4
44 files changed, 475 insertions, 1357 deletions
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript
index 4b32c6c46fd..c46484860d7 100644
--- a/src/mongo/db/SConscript
+++ b/src/mongo/db/SConscript
@@ -625,6 +625,7 @@ serveronlyEnv = env.Clone()
serveronlyEnv.InjectThirdPartyIncludePaths(libraries=['snappy'])
serveronlyLibdeps = [
"$BUILD_DIR/mongo/client/parallel",
+ "$BUILD_DIR/mongo/executor/network_interface_impl",
"$BUILD_DIR/mongo/s/batch_write_types",
"$BUILD_DIR/mongo/s/catalog/legacy/catalog_manager_legacy",
"$BUILD_DIR/mongo/s/client/sharding_connection_hook",
@@ -650,7 +651,6 @@ serveronlyLibdeps = [
"ops/update_driver",
"query/query",
"range_deleter",
- "repl/network_interface_impl",
"repl/repl_coordinator_global",
"repl/repl_coordinator_impl",
"repl/repl_settings",
diff --git a/src/mongo/db/db.cpp b/src/mongo/db/db.cpp
index f430cceb675..e5555bdd33e 100644
--- a/src/mongo/db/db.cpp
+++ b/src/mongo/db/db.cpp
@@ -77,12 +77,12 @@
#include "mongo/db/query/internal_plans.h"
#include "mongo/db/range_deleter_service.h"
#include "mongo/db/repair_database.h"
-#include "mongo/db/repl/network_interface_impl.h"
#include "mongo/db/repl/oplog.h"
#include "mongo/db/repl/repl_settings.h"
#include "mongo/db/repl/replication_coordinator_external_state_impl.h"
#include "mongo/db/repl/replication_coordinator_global.h"
#include "mongo/db/repl/replication_coordinator_impl.h"
+#include "mongo/db/repl/storage_interface_impl.h"
#include "mongo/db/repl/topology_coordinator_impl.h"
#include "mongo/db/restapi.h"
#include "mongo/db/server_parameters.h"
@@ -93,6 +93,7 @@
#include "mongo/db/storage/storage_engine.h"
#include "mongo/db/storage_options.h"
#include "mongo/db/ttl.h"
+#include "mongo/executor/network_interface_impl.h"
#include "mongo/platform/process_id.h"
#include "mongo/scripting/engine.h"
#include "mongo/stdx/memory.h"
@@ -757,7 +758,8 @@ MONGO_INITIALIZER_WITH_PREREQUISITES(CreateReplicationManager, ("SetGlobalEnviro
repl::ReplicationCoordinatorImpl* replCoord = new repl::ReplicationCoordinatorImpl(
getGlobalReplSettings(),
new repl::ReplicationCoordinatorExternalStateImpl,
- new repl::NetworkInterfaceImpl,
+ new executor::NetworkInterfaceImpl{},
+ new repl::StorageInterfaceImpl{},
new repl::TopologyCoordinatorImpl(Seconds(repl::maxSyncSourceLagSecs)),
static_cast<int64_t>(curTimeMillis64()));
repl::setGlobalReplicationCoordinator(replCoord);
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript
index 146039a94cf..d041ef49b72 100644
--- a/src/mongo/db/repl/SConscript
+++ b/src/mongo/db/repl/SConscript
@@ -15,17 +15,20 @@ env.Library('rslog',
'$BUILD_DIR/mongo/logger/logger',
])
+env.Library('storage_interface',
+ 'storage_interface.cpp',
+ LIBDEPS=[
+ ])
+
env.Library(
- target='network_interface_impl',
+ target='storage_interface_impl',
source=[
- 'network_interface_impl.cpp',
+ 'storage_interface_impl.cpp',
],
LIBDEPS=[
- '$BUILD_DIR/mongo/base/base',
- '$BUILD_DIR/mongo/bson/bson',
- '$BUILD_DIR/mongo/client/remote_command_runner_impl',
- '$BUILD_DIR/mongo/db/coredb',
- '$BUILD_DIR/mongo/db/common',
+ '$BUILD_DIR/mongo/db/serveronly', # For OperationContextImpl
+ '$BUILD_DIR/mongo/db/service_context',
+ 'storage_interface',
])
env.Library(
@@ -38,7 +41,9 @@ env.Library(
LIBDEPS=[
'database_task',
'task_runner',
+ 'storage_interface',
'$BUILD_DIR/mongo/client/remote_command_runner',
+ '$BUILD_DIR/mongo/executor/network_interface',
'$BUILD_DIR/mongo/util/foundation',
'$BUILD_DIR/mongo/util/net/hostandport',
],
@@ -343,16 +348,18 @@ env.Library('repl_coordinator_global',
env.Library('replmocks',
[
- 'network_interface_mock.cpp',
'operation_context_repl_mock.cpp',
'replication_coordinator_external_state_mock.cpp',
'replication_coordinator_mock.cpp',
+ 'storage_interface_mock.cpp',
],
LIBDEPS=[
'$BUILD_DIR/mongo/db/concurrency/lock_manager',
+ '$BUILD_DIR/mongo/executor/network_interface_mock',
'repl_coordinator_interface',
'replica_set_messages',
'replication_executor',
+ 'storage_interface',
])
env.Library('read_after_optime_args',
diff --git a/src/mongo/db/repl/base_cloner_test_fixture.cpp b/src/mongo/db/repl/base_cloner_test_fixture.cpp
index 92518498300..33357e702e3 100644
--- a/src/mongo/db/repl/base_cloner_test_fixture.cpp
+++ b/src/mongo/db/repl/base_cloner_test_fixture.cpp
@@ -106,7 +106,7 @@ namespace repl {
ReplicationExecutorTest::setUp();
clear();
launchExecutorThread();
- storageInterface.reset(new StorageInterfaceMock());
+ storageInterface.reset(new ClonerStorageInterfaceMock());
}
void BaseClonerTest::tearDown() {
@@ -245,21 +245,21 @@ namespace repl {
ASSERT_FALSE(getCloner()->isActive());
}
- Status StorageInterfaceMock::beginCollection(OperationContext* txn,
- const NamespaceString& nss,
- const CollectionOptions& options,
- const std::vector<BSONObj>& specs) {
+ Status ClonerStorageInterfaceMock::beginCollection(OperationContext* txn,
+ const NamespaceString& nss,
+ const CollectionOptions& options,
+ const std::vector<BSONObj>& specs) {
return beginCollectionFn ? beginCollectionFn(txn, nss, options, specs) : Status::OK();
}
- Status StorageInterfaceMock::insertDocuments(OperationContext* txn,
- const NamespaceString& nss,
- const std::vector<BSONObj>& docs) {
+ Status ClonerStorageInterfaceMock::insertDocuments(OperationContext* txn,
+ const NamespaceString& nss,
+ const std::vector<BSONObj>& docs) {
return insertDocumentsFn ? insertDocumentsFn(txn, nss, docs) : Status::OK();
}
- Status StorageInterfaceMock::commitCollection(OperationContext* txn,
- const NamespaceString& nss) {
+ Status ClonerStorageInterfaceMock::commitCollection(OperationContext* txn,
+ const NamespaceString& nss) {
return Status::OK();
}
diff --git a/src/mongo/db/repl/base_cloner_test_fixture.h b/src/mongo/db/repl/base_cloner_test_fixture.h
index 064003bbb6d..dca30fe8ae7 100644
--- a/src/mongo/db/repl/base_cloner_test_fixture.h
+++ b/src/mongo/db/repl/base_cloner_test_fixture.h
@@ -36,8 +36,8 @@
#include "mongo/db/clientcursor.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/repl/collection_cloner.h"
-#include "mongo/db/repl/network_interface_mock.h"
#include "mongo/db/repl/replication_executor_test_fixture.h"
+#include "mongo/executor/network_interface_mock.h"
#include "mongo/stdx/mutex.h"
#include "mongo/stdx/condition_variable.h"
#include "mongo/util/net/hostandport.h"
@@ -50,11 +50,11 @@ namespace mongo {
namespace repl {
class BaseCloner;
- class StorageInterfaceMock;
+ class ClonerStorageInterfaceMock;
class BaseClonerTest : public ReplicationExecutorTest {
public:
- typedef NetworkInterfaceMock::NetworkOperationIterator NetworkOperationIterator;
+ typedef executor::NetworkInterfaceMock::NetworkOperationIterator NetworkOperationIterator;
/**
* Creates an initial error status suitable for checking if
@@ -128,7 +128,7 @@ namespace repl {
protected:
- std::unique_ptr<StorageInterfaceMock> storageInterface;
+ std::unique_ptr<ClonerStorageInterfaceMock> storageInterface;
private:
@@ -141,7 +141,7 @@ namespace repl {
};
- class StorageInterfaceMock : public CollectionCloner::StorageInterface {
+ class ClonerStorageInterfaceMock : public CollectionCloner::StorageInterface {
public:
Status beginCollection(OperationContext* txn,
const NamespaceString& nss,
diff --git a/src/mongo/db/repl/check_quorum_for_config_change_test.cpp b/src/mongo/db/repl/check_quorum_for_config_change_test.cpp
index bc692b43288..63fe5684aa9 100644
--- a/src/mongo/db/repl/check_quorum_for_config_change_test.cpp
+++ b/src/mongo/db/repl/check_quorum_for_config_change_test.cpp
@@ -36,11 +36,12 @@
#include "mongo/base/status.h"
#include "mongo/db/jsobj.h"
#include "mongo/db/repl/check_quorum_for_config_change.h"
-#include "mongo/db/repl/network_interface_mock.h"
#include "mongo/db/repl/repl_set_heartbeat_args.h"
#include "mongo/db/repl/repl_set_heartbeat_response.h"
#include "mongo/db/repl/replica_set_config.h"
#include "mongo/db/repl/replication_executor.h"
+#include "mongo/db/repl/storage_interface_mock.h"
+#include "mongo/executor/network_interface_mock.h"
#include "mongo/platform/unordered_set.h"
#include "mongo/stdx/functional.h"
#include "mongo/unittest/unittest.h"
@@ -62,6 +63,8 @@ namespace mongo {
namespace repl {
namespace {
+ using executor::NetworkInterfaceMock;
+
class CheckQuorumTest : public mongo::unittest::Test {
protected:
CheckQuorumTest();
@@ -71,6 +74,7 @@ namespace {
bool isQuorumCheckDone();
NetworkInterfaceMock* _net;
+ StorageInterfaceMock* _storage;
boost::scoped_ptr<ReplicationExecutor> _executor;
private:
@@ -93,7 +97,8 @@ namespace {
void CheckQuorumTest::setUp() {
_net = new NetworkInterfaceMock;
- _executor.reset(new ReplicationExecutor(_net, 1 /* prng */ ));
+ _storage = new StorageInterfaceMock;
+ _executor.reset(new ReplicationExecutor(_net, _storage, 1 /* prng */ ));
_executorThread.reset(new boost::thread(stdx::bind(&ReplicationExecutor::run,
_executor.get())));
}
diff --git a/src/mongo/db/repl/collection_cloner_test.cpp b/src/mongo/db/repl/collection_cloner_test.cpp
index a6841ad2467..9b6927c4b43 100644
--- a/src/mongo/db/repl/collection_cloner_test.cpp
+++ b/src/mongo/db/repl/collection_cloner_test.cpp
@@ -35,7 +35,6 @@
#include "mongo/db/jsobj.h"
#include "mongo/db/repl/base_cloner_test_fixture.h"
#include "mongo/db/repl/collection_cloner.h"
-#include "mongo/db/repl/network_interface_mock.h"
#include "mongo/unittest/unittest.h"
namespace {
diff --git a/src/mongo/db/repl/data_replicator_test.cpp b/src/mongo/db/repl/data_replicator_test.cpp
index 8fc727c7b38..6aef8263679 100644
--- a/src/mongo/db/repl/data_replicator_test.cpp
+++ b/src/mongo/db/repl/data_replicator_test.cpp
@@ -33,7 +33,6 @@
#include "mongo/db/jsobj.h"
#include "mongo/db/repl/data_replicator.h"
#include "mongo/db/repl/fetcher.h"
-#include "mongo/db/repl/network_interface_mock.h"
#include "mongo/db/repl/replication_coordinator.h"
#include "mongo/db/repl/replication_coordinator_impl.h"
#include "mongo/db/repl/replication_coordinator_external_state_mock.h"
@@ -41,6 +40,7 @@
#include "mongo/db/repl/replication_executor_test_fixture.h"
#include "mongo/db/repl/topology_coordinator.h"
#include "mongo/db/repl/topology_coordinator_impl.h"
+#include "mongo/executor/network_interface_mock.h"
#include "mongo/util/fail_point_service.h"
#include "mongo/unittest/unittest.h"
@@ -48,6 +48,7 @@
namespace {
using namespace mongo;
using namespace mongo::repl;
+ using executor::NetworkInterfaceMock;
const HostAndPort target("localhost", -1);
diff --git a/src/mongo/db/repl/database_cloner_test.cpp b/src/mongo/db/repl/database_cloner_test.cpp
index 22e0e452f3d..eb305fbd0fa 100644
--- a/src/mongo/db/repl/database_cloner_test.cpp
+++ b/src/mongo/db/repl/database_cloner_test.cpp
@@ -35,7 +35,6 @@
#include "mongo/db/jsobj.h"
#include "mongo/db/repl/base_cloner_test_fixture.h"
#include "mongo/db/repl/database_cloner.h"
-#include "mongo/db/repl/network_interface_mock.h"
#include "mongo/unittest/unittest.h"
namespace {
diff --git a/src/mongo/db/repl/elect_cmd_runner_test.cpp b/src/mongo/db/repl/elect_cmd_runner_test.cpp
index f4d4c913bf1..2da5db3e98b 100644
--- a/src/mongo/db/repl/elect_cmd_runner_test.cpp
+++ b/src/mongo/db/repl/elect_cmd_runner_test.cpp
@@ -33,11 +33,12 @@
#include "mongo/base/status.h"
#include "mongo/db/jsobj.h"
+#include "mongo/db/repl/elect_cmd_runner.h"
#include "mongo/db/repl/member_heartbeat_data.h"
-#include "mongo/db/repl/network_interface_mock.h"
#include "mongo/db/repl/replica_set_config.h"
#include "mongo/db/repl/replication_executor.h"
-#include "mongo/db/repl/elect_cmd_runner.h"
+#include "mongo/db/repl/storage_interface_mock.h"
+#include "mongo/executor/network_interface_mock.h"
#include "mongo/stdx/functional.h"
#include "mongo/unittest/unittest.h"
@@ -48,6 +49,8 @@ namespace mongo {
namespace repl {
namespace {
+ using executor::NetworkInterfaceMock;
+
class ElectCmdRunnerTest : public mongo::unittest::Test {
public:
void startTest(ElectCmdRunner* electCmdRunner,
@@ -65,6 +68,7 @@ namespace {
const std::vector<HostAndPort>& hosts);
NetworkInterfaceMock* _net;
+ StorageInterfaceMock* _storage;
boost::scoped_ptr<ReplicationExecutor> _executor;
boost::scoped_ptr<boost::thread> _executorThread;
@@ -77,7 +81,8 @@ namespace {
void ElectCmdRunnerTest::setUp() {
_net = new NetworkInterfaceMock;
- _executor.reset(new ReplicationExecutor(_net, 1 /* prng seed */));
+ _storage = new StorageInterfaceMock;
+ _executor.reset(new ReplicationExecutor(_net, _storage, 1 /* prng seed */));
_executorThread.reset(new boost::thread(stdx::bind(&ReplicationExecutor::run,
_executor.get())));
}
diff --git a/src/mongo/db/repl/election_winner_declarer_test.cpp b/src/mongo/db/repl/election_winner_declarer_test.cpp
index bf006dd4355..1b264e26b56 100644
--- a/src/mongo/db/repl/election_winner_declarer_test.cpp
+++ b/src/mongo/db/repl/election_winner_declarer_test.cpp
@@ -34,8 +34,8 @@
#include "mongo/base/status.h"
#include "mongo/db/jsobj.h"
#include "mongo/db/repl/election_winner_declarer.h"
-#include "mongo/db/repl/network_interface_mock.h"
#include "mongo/db/repl/replication_executor.h"
+#include "mongo/executor/network_interface_mock.h"
#include "mongo/stdx/functional.h"
#include "mongo/unittest/unittest.h"
#include "mongo/util/mongoutils/str.h"
@@ -46,6 +46,7 @@ namespace mongo {
namespace repl {
namespace {
+ using executor::NetworkInterfaceMock;
using unittest::assertGet;
bool stringContains(const std::string &haystack, const std::string& needle) {
diff --git a/src/mongo/db/repl/fetcher_test.cpp b/src/mongo/db/repl/fetcher_test.cpp
index dde20c44fbf..85a693dd48f 100644
--- a/src/mongo/db/repl/fetcher_test.cpp
+++ b/src/mongo/db/repl/fetcher_test.cpp
@@ -32,9 +32,9 @@
#include "mongo/db/jsobj.h"
#include "mongo/db/repl/fetcher.h"
-#include "mongo/db/repl/network_interface_mock.h"
#include "mongo/db/repl/replication_executor.h"
#include "mongo/db/repl/replication_executor_test_fixture.h"
+#include "mongo/executor/network_interface_mock.h"
#include "mongo/unittest/unittest.h"
@@ -42,6 +42,7 @@ namespace {
using namespace mongo;
using namespace mongo::repl;
+ using executor::NetworkInterfaceMock;
const HostAndPort target("localhost", -1);
const BSONObj findCmdObj = BSON("find" << "coll");
diff --git a/src/mongo/db/repl/freshness_checker_test.cpp b/src/mongo/db/repl/freshness_checker_test.cpp
index 6f2e3ab9a85..c271ae0c371 100644
--- a/src/mongo/db/repl/freshness_checker_test.cpp
+++ b/src/mongo/db/repl/freshness_checker_test.cpp
@@ -33,11 +33,12 @@
#include "mongo/base/status.h"
#include "mongo/db/jsobj.h"
+#include "mongo/db/repl/freshness_checker.h"
#include "mongo/db/repl/member_heartbeat_data.h"
-#include "mongo/db/repl/network_interface_mock.h"
#include "mongo/db/repl/replica_set_config.h"
#include "mongo/db/repl/replication_executor.h"
-#include "mongo/db/repl/freshness_checker.h"
+#include "mongo/db/repl/storage_interface_mock.h"
+#include "mongo/executor/network_interface_mock.h"
#include "mongo/platform/unordered_set.h"
#include "mongo/stdx/functional.h"
#include "mongo/unittest/unittest.h"
@@ -49,6 +50,7 @@ namespace mongo {
namespace repl {
namespace {
+ using executor::NetworkInterfaceMock;
using unittest::assertGet;
bool stringContains(const std::string &haystack, const std::string& needle) {
@@ -73,6 +75,7 @@ namespace {
}
NetworkInterfaceMock* _net;
+ StorageInterfaceMock* _storage;
boost::scoped_ptr<ReplicationExecutor> _executor;
boost::scoped_ptr<boost::thread> _executorThread;
@@ -91,7 +94,8 @@ namespace {
void FreshnessCheckerTest::setUp() {
_net = new NetworkInterfaceMock;
- _executor.reset(new ReplicationExecutor(_net, 1 /* prng seed */));
+ _storage = new StorageInterfaceMock;
+ _executor.reset(new ReplicationExecutor(_net, _storage, 1 /* prng seed */));
_executorThread.reset(new boost::thread(stdx::bind(&ReplicationExecutor::run,
_executor.get())));
_checker.reset(new FreshnessChecker);
diff --git a/src/mongo/db/repl/network_interface_impl.cpp b/src/mongo/db/repl/network_interface_impl.cpp
deleted file mode 100644
index 9e76f27a8be..00000000000
--- a/src/mongo/db/repl/network_interface_impl.cpp
+++ /dev/null
@@ -1,287 +0,0 @@
-/**
- * Copyright (C) 2014 MongoDB Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License, version 3,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the GNU Affero General Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kReplication
-
-#include "mongo/platform/basic.h"
-
-#include "mongo/db/repl/network_interface_impl.h"
-
-#include <boost/make_shared.hpp>
-#include <memory>
-
-#include "mongo/db/auth/authorization_session.h"
-#include "mongo/client/connection_pool.h"
-#include "mongo/db/client.h"
-#include "mongo/db/operation_context_impl.h"
-#include "mongo/rpc/get_status_from_command_result.h"
-#include "mongo/stdx/thread.h"
-#include "mongo/util/assert_util.h"
-#include "mongo/util/log.h"
-#include "mongo/util/time_support.h"
-
-namespace mongo {
-namespace repl {
-
-namespace {
-
- const size_t kMinThreads = 1;
- const size_t kMaxThreads = 51; // Set to 1 + max repl set size, for heartbeat + wiggle room.
- const Seconds kMaxIdleThreadAge(30);
-
-} // namespace
-
- NetworkInterfaceImpl::NetworkInterfaceImpl() :
- _numIdleThreads(0),
- _nextThreadId(0),
- _lastFullUtilizationDate(),
- _isExecutorRunnable(false),
- _inShutdown(false),
- _commandRunner(kMessagingPortKeepOpen),
- _numActiveNetworkRequests(0) {
-
- }
-
- NetworkInterfaceImpl::~NetworkInterfaceImpl() { }
-
- std::string NetworkInterfaceImpl::getDiagnosticString() {
- boost::lock_guard<boost::mutex> lk(_mutex);
- str::stream output;
- output << "NetworkImpl";
- output << " threads:" << _threads.size();
- output << " inShutdown:" << _inShutdown;
- output << " active:" << _numActiveNetworkRequests;
- output << " pending:" << _pending.size();
- output << " execRunable:" << _isExecutorRunnable;
- return output;
-
- }
-
- void NetworkInterfaceImpl::_startNewNetworkThread_inlock() {
- if (_inShutdown) {
- LOG(1) <<
- "Not starting new replication networking thread while shutting down replication.";
- return;
- }
- if (_threads.size() >= kMaxThreads) {
- LOG(1) << "Not starting new replication networking thread because " << kMaxThreads <<
- " are already running; " << _numIdleThreads << " threads are idle and " <<
- _pending.size() << " network requests are waiting for a thread to serve them.";
- return;
- }
- const std::string threadName(str::stream() << "ReplExecNetThread-" << _nextThreadId++);
- try {
- _threads.push_back(
- boost::make_shared<boost::thread>(
- stdx::bind(&NetworkInterfaceImpl::_requestProcessorThreadBody,
- this,
- threadName)));
- ++_numIdleThreads;
- }
- catch (const std::exception& ex) {
- error() << "Failed to start " << threadName << "; " << _threads.size() <<
- " other network threads still running; caught exception: " << ex.what();
- }
- }
-
- void NetworkInterfaceImpl::startup() {
- boost::lock_guard<boost::mutex> lk(_mutex);
- invariant(!_inShutdown);
- if (!_threads.empty()) {
- return;
- }
- for (size_t i = 0; i < kMinThreads; ++i) {
- _startNewNetworkThread_inlock();
- }
- }
-
- void NetworkInterfaceImpl::shutdown() {
- using std::swap;
- boost::unique_lock<boost::mutex> lk(_mutex);
- _inShutdown = true;
- _hasPending.notify_all();
- ThreadList threadsToJoin;
- swap(threadsToJoin, _threads);
- lk.unlock();
- _commandRunner.shutdown();
- std::for_each(threadsToJoin.begin(),
- threadsToJoin.end(),
- stdx::bind(&boost::thread::join, stdx::placeholders::_1));
- }
-
- void NetworkInterfaceImpl::signalWorkAvailable() {
- boost::lock_guard<boost::mutex> lk(_mutex);
- _signalWorkAvailable_inlock();
- }
-
- void NetworkInterfaceImpl::_signalWorkAvailable_inlock() {
- if (!_isExecutorRunnable) {
- _isExecutorRunnable = true;
- _isExecutorRunnableCondition.notify_one();
- }
- }
-
- void NetworkInterfaceImpl::waitForWork() {
- boost::unique_lock<boost::mutex> lk(_mutex);
- while (!_isExecutorRunnable) {
- _isExecutorRunnableCondition.wait(lk);
- }
- _isExecutorRunnable = false;
- }
-
- void NetworkInterfaceImpl::waitForWorkUntil(Date_t when) {
- boost::unique_lock<boost::mutex> lk(_mutex);
- while (!_isExecutorRunnable) {
- const Milliseconds waitTime(when - now());
- if (waitTime <= Milliseconds(0)) {
- break;
- }
- _isExecutorRunnableCondition.wait_for(lk, waitTime);
- }
- _isExecutorRunnable = false;
- }
-
- void NetworkInterfaceImpl::_requestProcessorThreadBody(
- NetworkInterfaceImpl* net,
- const std::string& threadName) {
- setThreadName(threadName);
- LOG(1) << "thread starting";
- net->_consumeNetworkRequests();
-
- // At this point, another thread may have destroyed "net", if this thread chose to detach
- // itself and remove itself from net->_threads before releasing net->_mutex. Do not access
- // member variables of "net" from here, on.
- LOG(1) << "thread shutting down";
- }
-
- void NetworkInterfaceImpl::_consumeNetworkRequests() {
- boost::unique_lock<boost::mutex> lk(_mutex);
- while (!_inShutdown) {
- if (_pending.empty()) {
- if (_threads.size() > kMinThreads) {
- const Date_t nowDate = now();
- const Date_t nextThreadRetirementDate =
- _lastFullUtilizationDate + kMaxIdleThreadAge;
- if (nowDate > nextThreadRetirementDate) {
- _lastFullUtilizationDate = nowDate;
- break;
- }
- }
- _hasPending.wait_for(lk, kMaxIdleThreadAge);
- continue;
- }
- CommandData todo = _pending.front();
- _pending.pop_front();
- ++_numActiveNetworkRequests;
- --_numIdleThreads;
- lk.unlock();
- ResponseStatus result = _commandRunner.runCommand(todo.request);
- LOG(2) << "Network status of sending " << todo.request.cmdObj.firstElementFieldName() <<
- " to " << todo.request.target << " was " << result.getStatus();
- todo.onFinish(result);
- lk.lock();
- --_numActiveNetworkRequests;
- ++_numIdleThreads;
- _signalWorkAvailable_inlock();
- }
- --_numIdleThreads;
- if (_inShutdown) {
- return;
- }
- // This thread is ending because it was idle for too long.
- // Find self in _threads, remove self from _threads, detach self.
- for (size_t i = 0; i < _threads.size(); ++i) {
- if (_threads[i]->get_id() != stdx::this_thread::get_id()) {
- continue;
- }
- _threads[i]->detach();
- _threads[i].swap(_threads.back());
- _threads.pop_back();
- return;
- }
- severe().stream() << "Could not find this thread, with id " <<
- stdx::this_thread::get_id() << " in the replication networking thread pool";
- fassertFailedNoTrace(28581);
- }
-
- void NetworkInterfaceImpl::startCommand(
- const ReplicationExecutor::CallbackHandle& cbHandle,
- const RemoteCommandRequest& request,
- const RemoteCommandCompletionFn& onFinish) {
- LOG(2) << "Scheduling " << request.cmdObj.firstElementFieldName() << " to " <<
- request.target;
- boost::lock_guard<boost::mutex> lk(_mutex);
- _pending.push_back(CommandData());
- CommandData& cd = _pending.back();
- cd.cbHandle = cbHandle;
- cd.request = request;
- cd.onFinish = onFinish;
- if (_numIdleThreads < _pending.size()) {
- _startNewNetworkThread_inlock();
- }
- if (_numIdleThreads <= _pending.size()) {
- _lastFullUtilizationDate = Date_t::now();
- }
- _hasPending.notify_one();
- }
-
- void NetworkInterfaceImpl::cancelCommand(const ReplicationExecutor::CallbackHandle& cbHandle) {
- boost::unique_lock<boost::mutex> lk(_mutex);
- CommandDataList::iterator iter;
- for (iter = _pending.begin(); iter != _pending.end(); ++iter) {
- if (iter->cbHandle == cbHandle) {
- break;
- }
- }
- if (iter == _pending.end()) {
- return;
- }
- const RemoteCommandCompletionFn onFinish = iter->onFinish;
- LOG(2) << "Canceled sending " << iter->request.cmdObj.firstElementFieldName() << " to " <<
- iter->request.target;
- _pending.erase(iter);
- lk.unlock();
- onFinish(ResponseStatus(ErrorCodes::CallbackCanceled, "Callback canceled"));
- lk.lock();
- _signalWorkAvailable_inlock();
- }
-
- Date_t NetworkInterfaceImpl::now() {
- return Date_t::now();
- }
-
- OperationContext* NetworkInterfaceImpl::createOperationContext() {
- if (!ClientBasic::getCurrent()) {
- Client::initThreadIfNotAlready();
- AuthorizationSession::get(*ClientBasic::getCurrent())->grantInternalAuthorization();
- }
- return new OperationContextImpl();
- }
-
-} // namespace repl
-} // namespace mongo
diff --git a/src/mongo/db/repl/network_interface_impl.h b/src/mongo/db/repl/network_interface_impl.h
deleted file mode 100644
index 4350f7c000a..00000000000
--- a/src/mongo/db/repl/network_interface_impl.h
+++ /dev/null
@@ -1,164 +0,0 @@
-/**
- * Copyright (C) 2014 MongoDB Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License, version 3,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the GNU Affero General Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-
-#pragma once
-
-#include <boost/shared_ptr.hpp>
-#include <boost/thread.hpp>
-#include <boost/thread/condition_variable.hpp>
-#include <boost/thread/mutex.hpp>
-#include <vector>
-
-#include "mongo/client/remote_command_runner_impl.h"
-#include "mongo/db/repl/replication_executor.h"
-#include "mongo/stdx/list.h"
-
-namespace mongo {
-namespace repl {
-
- /**
- * Implementation of the network interface used by the ReplicationExecutor inside mongod.
- *
- * This implementation manages a dynamically sized group of worker threads for performing
- * network operations. The minimum and maximum number of threads is set at compile time, and
- * the exact number of threads is adjusted dynamically, using the following two rules.
- *
- * 1.) If the number of worker threads is less than the maximum, there are no idle worker
- * threads, and the client enqueues a new network operation via startCommand(), the network
- * interface spins up a new worker thread. This decision is made on the assumption that
- * spinning up a new thread is faster than the round-trip time for processing a remote command,
- * and so this will minimize wait time.
- *
- * 2.) If the number of worker threads has exceeded the the peak number of scheduled outstanding
- * network commands continuously for a period of time (kMaxIdleThreadAge), one thread is retired
- * from the pool and the monitoring of idle threads is reset. This means that at most one
- * thread retires every kMaxIdleThreadAge units of time. The value of kMaxIdleThreadAge is set
- * to be much larger than the expected frequency of new requests, averaging out short-duration
- * periods of idleness, as occur between heartbeats.
- *
- * The implementation also manages a pool of network connections to recently contacted remote
- * nodes. The size of this pool is not bounded, but connections are retired unconditionally
- * after they have been connected for a certain maximum period.
- */
- class NetworkInterfaceImpl : public ReplicationExecutor::NetworkInterface {
- public:
- explicit NetworkInterfaceImpl();
- virtual ~NetworkInterfaceImpl();
- virtual std::string getDiagnosticString();
- virtual void startup();
- virtual void shutdown();
- virtual void waitForWork();
- virtual void waitForWorkUntil(Date_t when);
- virtual void signalWorkAvailable();
- virtual Date_t now();
- virtual void startCommand(
- const ReplicationExecutor::CallbackHandle& cbHandle,
- const RemoteCommandRequest& request,
- const RemoteCommandCompletionFn& onFinish);
- virtual void cancelCommand(const ReplicationExecutor::CallbackHandle& cbHandle);
- OperationContext* createOperationContext() override;
-
- std::string getNextCallbackWithGlobalLockThreadName();
-
- private:
- /**
- * Information describing an in-flight command.
- */
- struct CommandData {
- ReplicationExecutor::CallbackHandle cbHandle;
- RemoteCommandRequest request;
- RemoteCommandCompletionFn onFinish;
- };
- typedef stdx::list<CommandData> CommandDataList;
- typedef std::vector<boost::shared_ptr<boost::thread> > ThreadList;
-
- /**
- * Thread body for threads that synchronously perform network requests from
- * the _pending list.
- */
- static void _requestProcessorThreadBody(NetworkInterfaceImpl* net,
- const std::string& threadName);
-
- /**
- * Run loop that iteratively consumes network requests in a request processor thread.
- */
- void _consumeNetworkRequests();
-
- /**
- * Notifies the network threads that there is work available.
- */
- void _signalWorkAvailable_inlock();
-
- /**
- * Starts a new network thread.
- */
- void _startNewNetworkThread_inlock();
-
- // Mutex guarding the state of this network interface, except for the remote command
- // executor, which has its own concurrency control.
- boost::mutex _mutex;
-
- // Condition signaled to indicate that there is work in the _pending queue.
- boost::condition_variable _hasPending;
-
- // Queue of yet-to-be-executed network operations.
- CommandDataList _pending;
-
- // List of threads serving as the worker pool.
- ThreadList _threads;
-
- // Count of idle threads.
- size_t _numIdleThreads;
-
- // Id counter for assigning thread names
- size_t _nextThreadId;
-
- // The last time that _pending.size() + _numActiveNetworkRequests grew to be at least
- // _threads.size().
- Date_t _lastFullUtilizationDate;
-
- // Condition signaled to indicate that the executor, blocked in waitForWorkUntil or
- // waitForWork, should wake up.
- boost::condition_variable _isExecutorRunnableCondition;
-
- // Flag indicating whether or not the executor associated with this interface is runnable.
- bool _isExecutorRunnable;
-
- // Flag indicating when this interface is being shut down (because shutdown() has executed).
- bool _inShutdown;
-
- // Interface for running remote commands
- RemoteCommandRunnerImpl _commandRunner;
-
- // Number of active network requests
- size_t _numActiveNetworkRequests;
- };
-
-} // namespace repl
-} // namespace mongo
diff --git a/src/mongo/db/repl/network_interface_mock.cpp b/src/mongo/db/repl/network_interface_mock.cpp
deleted file mode 100644
index 80263365eba..00000000000
--- a/src/mongo/db/repl/network_interface_mock.cpp
+++ /dev/null
@@ -1,405 +0,0 @@
-/**
- * Copyright (C) 2014 MongoDB Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License, version 3,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the GNU Affero General Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#include "mongo/platform/basic.h"
-
-#include "mongo/db/repl/network_interface_mock.h"
-
-#include "mongo/db/repl/operation_context_repl_mock.h"
-#include "mongo/db/repl/replication_executor.h"
-#include "mongo/stdx/functional.h"
-#include "mongo/util/time_support.h"
-
-namespace mongo {
-namespace repl {
-
- NetworkInterfaceMock::NetworkInterfaceMock()
- : _waitingToRunMask(0),
- _currentlyRunning(kNoThread),
- _now(fassertStatusOK(18653, dateFromISOString("2014-08-01T00:00:00Z"))),
- _hasStarted(false),
- _inShutdown(false),
- _executorNextWakeupDate(Date_t::max()) {
- }
-
- NetworkInterfaceMock::~NetworkInterfaceMock() {
- boost::unique_lock<boost::mutex> lk(_mutex);
- invariant(!_hasStarted || _inShutdown);
- invariant(_scheduled.empty());
- invariant(_blackHoled.empty());
- }
-
- std::string NetworkInterfaceMock::getDiagnosticString() {
- // TODO something better.
- return "NetworkInterfaceMock diagnostics here";
- }
-
- Date_t NetworkInterfaceMock::now() {
- boost::lock_guard<boost::mutex> lk(_mutex);
- return _now_inlock();
- }
-
- OperationContext* NetworkInterfaceMock::createOperationContext() {
- return new OperationContextReplMock();
- }
-
- void NetworkInterfaceMock::startCommand(
- const ReplicationExecutor::CallbackHandle& cbHandle,
- const RemoteCommandRequest& request,
- const RemoteCommandCompletionFn& onFinish) {
-
- boost::lock_guard<boost::mutex> lk(_mutex);
- invariant(!_inShutdown);
- const Date_t now = _now_inlock();
- NetworkOperationIterator insertBefore = _unscheduled.begin();
- while ((insertBefore != _unscheduled.end()) &&
- (insertBefore->getNextConsiderationDate() <= now)) {
-
- ++insertBefore;
- }
- _unscheduled.insert(insertBefore, NetworkOperation(cbHandle, request, now, onFinish));
- }
-
- static bool findAndCancelIf(
- const stdx::function<bool (const NetworkInterfaceMock::NetworkOperation&)>& matchFn,
- NetworkInterfaceMock::NetworkOperationList* other,
- NetworkInterfaceMock::NetworkOperationList* scheduled,
- const Date_t now) {
- const NetworkInterfaceMock::NetworkOperationIterator noi =
- std::find_if(other->begin(), other->end(), matchFn);
- if (noi == other->end()) {
- return false;
- }
- scheduled->splice(scheduled->begin(), *other, noi);
- noi->setResponse(now, ResponseStatus(ErrorCodes::CallbackCanceled,
- "Network operation canceled"));
- return true;
- }
-
- void NetworkInterfaceMock::cancelCommand(
- const ReplicationExecutor::CallbackHandle& cbHandle) {
- boost::lock_guard<boost::mutex> lk(_mutex);
- invariant(!_inShutdown);
- stdx::function<bool (const NetworkOperation&)> matchesHandle = stdx::bind(
- &NetworkOperation::isForCallback,
- stdx::placeholders::_1,
- cbHandle);
- const Date_t now = _now_inlock();
- if (findAndCancelIf(matchesHandle, &_unscheduled, &_scheduled, now)) {
- return;
- }
- if (findAndCancelIf(matchesHandle, &_blackHoled, &_scheduled, now)) {
- return;
- }
- if (findAndCancelIf(matchesHandle, &_scheduled, &_scheduled, now)) {
- return;
- }
- // No not-in-progress network command matched cbHandle. Oh, well.
- }
-
- void NetworkInterfaceMock::startup() {
- boost::lock_guard<boost::mutex> lk(_mutex);
- invariant(!_hasStarted);
- _hasStarted = true;
- _inShutdown = false;
- invariant(_currentlyRunning == kNoThread);
- _currentlyRunning = kExecutorThread;
- }
-
- void NetworkInterfaceMock::shutdown() {
- boost::unique_lock<boost::mutex> lk(_mutex);
- invariant(_hasStarted);
- invariant(!_inShutdown);
- _inShutdown = true;
- NetworkOperationList todo;
- todo.splice(todo.end(), _scheduled);
- todo.splice(todo.end(), _unscheduled);
- todo.splice(todo.end(), _processing);
- todo.splice(todo.end(), _blackHoled);
-
- const Date_t now = _now_inlock();
- _waitingToRunMask |= kExecutorThread; // Prevents network thread from scheduling.
- lk.unlock();
- for (NetworkOperationIterator iter = todo.begin(); iter != todo.end(); ++iter) {
- iter->setResponse(now, ResponseStatus(ErrorCodes::ShutdownInProgress,
- "Shutting down mock network"));
- iter->finishResponse();
- }
- lk.lock();
- invariant(_currentlyRunning == kExecutorThread);
- _currentlyRunning = kNoThread;
- _waitingToRunMask = kNetworkThread;
- _shouldWakeNetworkCondition.notify_one();
- }
-
- void NetworkInterfaceMock::enterNetwork() {
- boost::unique_lock<boost::mutex> lk(_mutex);
- while (!_isNetworkThreadRunnable_inlock()) {
- _shouldWakeNetworkCondition.wait(lk);
- }
- _currentlyRunning = kNetworkThread;
- _waitingToRunMask &= ~kNetworkThread;
- }
-
- void NetworkInterfaceMock::exitNetwork() {
- boost::lock_guard<boost::mutex> lk(_mutex);
- if (_currentlyRunning != kNetworkThread) {
- return;
- }
- _currentlyRunning = kNoThread;
- if (_isExecutorThreadRunnable_inlock()) {
- _shouldWakeExecutorCondition.notify_one();
- }
- _waitingToRunMask |= kNetworkThread;
- }
-
- bool NetworkInterfaceMock::hasReadyRequests() {
- boost::lock_guard<boost::mutex> lk(_mutex);
- invariant(_currentlyRunning == kNetworkThread);
- return _hasReadyRequests_inlock();
- }
-
- bool NetworkInterfaceMock::_hasReadyRequests_inlock() {
- if (_unscheduled.empty())
- return false;
- if (_unscheduled.front().getNextConsiderationDate() > _now_inlock()) {
- return false;
- }
- return true;
- }
-
- NetworkInterfaceMock::NetworkOperationIterator NetworkInterfaceMock::getNextReadyRequest() {
- boost::unique_lock<boost::mutex> lk(_mutex);
- invariant(_currentlyRunning == kNetworkThread);
- while (!_hasReadyRequests_inlock()) {
- _waitingToRunMask |= kExecutorThread;
- _runReadyNetworkOperations_inlock(&lk);
- }
- invariant(_hasReadyRequests_inlock());
- _processing.splice(_processing.begin(), _unscheduled, _unscheduled.begin());
- return _processing.begin();
- }
-
- void NetworkInterfaceMock::scheduleResponse(
- NetworkOperationIterator noi,
- Date_t when,
- const ResponseStatus& response) {
-
- boost::lock_guard<boost::mutex> lk(_mutex);
- invariant(_currentlyRunning == kNetworkThread);
- NetworkOperationIterator insertBefore = _scheduled.begin();
- while ((insertBefore != _scheduled.end()) && (insertBefore->getResponseDate() <= when)) {
- ++insertBefore;
- }
- noi->setResponse(when, response);
- _scheduled.splice(insertBefore, _processing, noi);
- }
-
- void NetworkInterfaceMock::blackHole(NetworkOperationIterator noi) {
- boost::lock_guard<boost::mutex> lk(_mutex);
- invariant(_currentlyRunning == kNetworkThread);
- _blackHoled.splice(_blackHoled.end(), _processing, noi);
- }
-
- void NetworkInterfaceMock::requeueAt(NetworkOperationIterator noi, Date_t dontAskUntil) {
- boost::lock_guard<boost::mutex> lk(_mutex);
- invariant(_currentlyRunning == kNetworkThread);
- invariant(noi->getNextConsiderationDate() < dontAskUntil);
- invariant(_now_inlock() < dontAskUntil);
- NetworkOperationIterator insertBefore = _unscheduled.begin();
- for (; insertBefore != _unscheduled.end(); ++insertBefore) {
- if (insertBefore->getNextConsiderationDate() >= dontAskUntil) {
- break;
- }
- }
- noi->setNextConsiderationDate(dontAskUntil);
- _unscheduled.splice(insertBefore, _processing, noi);
- }
-
- void NetworkInterfaceMock::runUntil(Date_t until) {
- boost::unique_lock<boost::mutex> lk(_mutex);
- invariant(_currentlyRunning == kNetworkThread);
- invariant(until > _now_inlock());
- while (until > _now_inlock()) {
- _runReadyNetworkOperations_inlock(&lk);
- if (_hasReadyRequests_inlock()) {
- break;
- }
- Date_t newNow = _executorNextWakeupDate;
- if (!_scheduled.empty() && _scheduled.front().getResponseDate() < newNow) {
- newNow = _scheduled.front().getResponseDate();
- }
- if (until < newNow) {
- newNow = until;
- }
- invariant(_now_inlock() <= newNow);
- _now = newNow;
- _waitingToRunMask |= kExecutorThread;
- }
- _runReadyNetworkOperations_inlock(&lk);
- }
-
- void NetworkInterfaceMock::runReadyNetworkOperations() {
- boost::unique_lock<boost::mutex> lk(_mutex);
- invariant(_currentlyRunning == kNetworkThread);
- _runReadyNetworkOperations_inlock(&lk);
- }
-
- void NetworkInterfaceMock::waitForWork() {
- boost::unique_lock<boost::mutex> lk(_mutex);
- invariant(_currentlyRunning == kExecutorThread);
- _waitForWork_inlock(&lk);
- }
-
- void NetworkInterfaceMock::waitForWorkUntil(Date_t when) {
- boost::unique_lock<boost::mutex> lk(_mutex);
- invariant(_currentlyRunning == kExecutorThread);
- _executorNextWakeupDate = when;
- if (_executorNextWakeupDate <= _now_inlock()) {
- return;
- }
- _waitForWork_inlock(&lk);
- }
-
- void NetworkInterfaceMock::signalWorkAvailable() {
- boost::lock_guard<boost::mutex> lk(_mutex);
- _waitingToRunMask |= kExecutorThread;
- if (_currentlyRunning == kNoThread) {
- _shouldWakeExecutorCondition.notify_one();
- }
- }
-
- void NetworkInterfaceMock::_runReadyNetworkOperations_inlock(
- boost::unique_lock<boost::mutex>* lk) {
- while (!_scheduled.empty() && _scheduled.front().getResponseDate() <= _now_inlock()) {
- invariant(_currentlyRunning == kNetworkThread);
- NetworkOperation op = _scheduled.front();
- _scheduled.pop_front();
- _waitingToRunMask |= kExecutorThread;
- lk->unlock();
- op.finishResponse();
- lk->lock();
- }
- invariant(_currentlyRunning == kNetworkThread);
- if (!(_waitingToRunMask & kExecutorThread)) {
- return;
- }
- _shouldWakeExecutorCondition.notify_one();
- _currentlyRunning = kNoThread;
- while (!_isNetworkThreadRunnable_inlock()) {
- _shouldWakeNetworkCondition.wait(*lk);
- }
- _currentlyRunning = kNetworkThread;
- _waitingToRunMask &= ~kNetworkThread;
- }
-
- void NetworkInterfaceMock::_waitForWork_inlock(boost::unique_lock<boost::mutex>* lk) {
- if (_waitingToRunMask & kExecutorThread) {
- _waitingToRunMask &= ~kExecutorThread;
- return;
- }
- _currentlyRunning = kNoThread;
- while (!_isExecutorThreadRunnable_inlock()) {
- _waitingToRunMask |= kNetworkThread;
- _shouldWakeNetworkCondition.notify_one();
- _shouldWakeExecutorCondition.wait(*lk);
- }
- _currentlyRunning = kExecutorThread;
- _waitingToRunMask &= ~kExecutorThread;
- }
-
- bool NetworkInterfaceMock::_isNetworkThreadRunnable_inlock() {
- if (_currentlyRunning != kNoThread) {
- return false;
- }
- if (_waitingToRunMask != kNetworkThread) {
- return false;
- }
- return true;
- }
-
- bool NetworkInterfaceMock::_isExecutorThreadRunnable_inlock() {
- if (_currentlyRunning != kNoThread) {
- return false;
- }
- return _waitingToRunMask & kExecutorThread;
- }
-
- static const StatusWith<RemoteCommandResponse> kUnsetResponse(
- ErrorCodes::InternalError,
- "NetworkOperation::_response never set");
-
- NetworkInterfaceMock::NetworkOperation::NetworkOperation()
- : _requestDate(),
- _nextConsiderationDate(),
- _responseDate(),
- _request(),
- _response(kUnsetResponse),
- _onFinish() {
- }
-
- NetworkInterfaceMock::NetworkOperation::NetworkOperation(
- const ReplicationExecutor::CallbackHandle& cbHandle,
- const RemoteCommandRequest& theRequest,
- Date_t theRequestDate,
- const RemoteCommandCompletionFn& onFinish)
- : _requestDate(theRequestDate),
- _nextConsiderationDate(theRequestDate),
- _responseDate(),
- _cbHandle(cbHandle),
- _request(theRequest),
- _response(kUnsetResponse),
- _onFinish(onFinish) {
- }
-
- NetworkInterfaceMock::NetworkOperation::~NetworkOperation() {}
-
- void NetworkInterfaceMock::NetworkOperation::setNextConsiderationDate(
- Date_t nextConsiderationDate) {
-
- invariant(nextConsiderationDate > _nextConsiderationDate);
- _nextConsiderationDate = nextConsiderationDate;
- }
-
- void NetworkInterfaceMock::NetworkOperation::setResponse(
- Date_t responseDate,
- const ResponseStatus& response) {
-
- invariant(responseDate >= _requestDate);
- _responseDate = responseDate;
- _response = response;
- }
-
- void NetworkInterfaceMock::NetworkOperation::finishResponse() {
- invariant(_onFinish);
- _onFinish(_response);
- _onFinish = RemoteCommandCompletionFn();
- }
-
-} // namespace repl
-} // namespace mongo
diff --git a/src/mongo/db/repl/network_interface_mock.h b/src/mongo/db/repl/network_interface_mock.h
deleted file mode 100644
index 255eb7efec7..00000000000
--- a/src/mongo/db/repl/network_interface_mock.h
+++ /dev/null
@@ -1,328 +0,0 @@
-/**
- * Copyright (C) 2014 MongoDB Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License, version 3,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the GNU Affero General Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#pragma once
-
-#include <boost/thread/mutex.hpp>
-#include <boost/thread/condition_variable.hpp>
-#include <map>
-
-#include "mongo/db/repl/replication_executor.h"
-#include "mongo/util/time_support.h"
-
-namespace mongo {
-namespace repl {
-
- /**
- * Mock network implementation for use in unit tests.
- *
- * To use, construct a new instance on the heap, and keep a pointer to it. Pass
- * the pointer to the instance into the ReplicationExecutor constructor, transfering
- * ownership. Start the executor's run() method in a separate thread, schedule the
- * work you want to test into the executor, then while the test is still going, iterate
- * through the ready network requests, servicing them and advancing time as needed.
- *
- * The mock has a fully virtualized notion of time and the the network. When the
- * replication executor under test schedules a network operation, the startCommand
- * method of this class adds an entry to the _unscheduled queue for immediate consideration.
- * The test driver loop, when it examines the request, may schedule a response, ask the
- * interface to redeliver the request at a later virtual time, or to swallow the virtual
- * request until the end of the simulation. The test driver loop can also instruct the
- * interface to run forward through virtual time until there are operations ready to
- * consider, via runUntil.
- *
- * The thread acting as the "network" and the executor run thread are highly synchronized
- * by this code, allowing for deterministic control of operation interleaving.
- */
- class NetworkInterfaceMock : public ReplicationExecutor::NetworkInterface {
- public:
- class NetworkOperation;
- typedef stdx::list<NetworkOperation> NetworkOperationList;
- typedef NetworkOperationList::iterator NetworkOperationIterator;
-
- NetworkInterfaceMock();
- virtual ~NetworkInterfaceMock();
- virtual std::string getDiagnosticString();
-
- ////////////////////////////////////////////////////////////////////////////////
- //
- // ReplicationExecutor::NetworkInterface methods
- //
- ////////////////////////////////////////////////////////////////////////////////
-
- virtual void startup();
- virtual void shutdown();
- virtual void waitForWork();
- virtual void waitForWorkUntil(Date_t when);
- virtual void signalWorkAvailable();
- virtual Date_t now();
- virtual void startCommand(const ReplicationExecutor::CallbackHandle& cbHandle,
- const RemoteCommandRequest& request,
- const RemoteCommandCompletionFn& onFinish);
- virtual void cancelCommand(const ReplicationExecutor::CallbackHandle& cbHandle);
- OperationContext* createOperationContext() override;
-
-
- ////////////////////////////////////////////////////////////////////////////////
- //
- // Methods for simulating network operations and the passage of time.
- //
- // Methods in this section are to be called by the thread currently simulating
- // the network.
- //
- ////////////////////////////////////////////////////////////////////////////////
-
- /**
- * Causes the currently running (non-executor) thread to assume the mantle of the network
- * simulation thread.
- *
- * Call this before calling any of the other methods in this section.
- */
- void enterNetwork();
-
- /**
- * Causes the currently running thread to drop the mantle of "network simulation thread".
- *
- * Call this before calling any methods that might block waiting for the replciation
- * executor thread.
- */
- void exitNetwork();
-
- /**
- * Returns true if there are unscheduled network requests to be processed.
- */
- bool hasReadyRequests();
-
- /**
- * Gets the next unscheduled request to process, blocking until one is available.
- *
- * Will not return until the executor thread is blocked in waitForWorkUntil or waitForWork.
- */
- NetworkOperationIterator getNextReadyRequest();
-
- /**
- * Schedules "response" in response to "noi" at virtual time "when".
- */
- void scheduleResponse(
- NetworkOperationIterator noi,
- Date_t when,
- const ResponseStatus& response);
-
- /**
- * Swallows "noi", causing the network interface to not respond to it until
- * shutdown() is called.
- */
- void blackHole(NetworkOperationIterator noi);
-
- /**
- * Defers decision making on "noi" until virtual time "dontAskUntil". Use
- * this when getNextReadyRequest() returns a request you want to deal with
- * after looking at other requests.
- */
- void requeueAt(NetworkOperationIterator noi, Date_t dontAskUntil);
-
- /**
- * Runs the simulator forward until now() == until or hasReadyRequests() is true.
- *
- * Will not return until the executor thread is blocked in waitForWorkUntil or waitForWork.
- */
- void runUntil(Date_t until);
-
- /**
- * Processes all ready, scheduled network operations.
- *
- * Will not return until the executor thread is blocked in waitForWorkUntil or waitForWork.
- */
- void runReadyNetworkOperations();
-
- private:
- /**
- * Type used to identify which thread (network mock or executor) is currently executing.
- *
- * Values are used in a bitmask, as well.
- */
- enum ThreadType {
- kNoThread = 0,
- kExecutorThread = 1,
- kNetworkThread = 2
- };
-
- /**
- * Returns the current virtualized time.
- */
- Date_t _now_inlock() const { return _now; }
-
- /**
- * Implementation of waitForWork*.
- */
- void _waitForWork_inlock(boost::unique_lock<boost::mutex>* lk);
-
- /**
- * Returns true if there are ready requests for the network thread to service.
- */
- bool _hasReadyRequests_inlock();
-
- /**
- * Returns true if the network thread could run right now.
- */
- bool _isNetworkThreadRunnable_inlock();
-
- /**
- * Returns true if the executor thread could run right now.
- */
- bool _isExecutorThreadRunnable_inlock();
-
- /**
- * Runs all ready network operations, called while holding "lk". May drop and
- * reaquire "lk" several times, but will not return until the executor has blocked
- * in waitFor*.
- */
- void _runReadyNetworkOperations_inlock(boost::unique_lock<boost::mutex>* lk);
-
- // Mutex that synchronizes access to mutable data in this class and its subclasses.
- // Fields guarded by the mutex are labled (M), below, and those that are read-only
- // in multi-threaded execution, and so unsynchronized, are labeled (R).
- boost::mutex _mutex;
-
- // Condition signaled to indicate that the network processing thread should wake up.
- boost::condition_variable _shouldWakeNetworkCondition; // (M)
-
- // Condition signaled to indicate that the executor run thread should wake up.
- boost::condition_variable _shouldWakeExecutorCondition; // (M)
-
- // Bitmask indicating which threads are runnable.
- int _waitingToRunMask; // (M)
-
- // Indicator of which thread, if any, is currently running.
- ThreadType _currentlyRunning; // (M)
-
- // The current time reported by this instance of NetworkInterfaceMock.
- Date_t _now; // (M)
-
- // Set to true by "startUp()"
- bool _hasStarted; // (M)
-
- // Set to true by "shutDown()".
- bool _inShutdown; // (M)
-
- // Next date that the executor expects to wake up at (due to a scheduleWorkAt() call).
- Date_t _executorNextWakeupDate; // (M)
-
- // List of network operations whose responses haven't been scheduled or blackholed. This is
- // where network requests are first queued. It is sorted by
- // NetworkOperation::_nextConsiderationDate, which is set to now() when startCommand() is
- // called, and adjusted by requeueAt().
- NetworkOperationList _unscheduled; // (M)
-
- // List of network operations that have been returned by getNextReadyRequest() but not
- // yet scheudled, black-holed or requeued.
- NetworkOperationList _processing; // (M)
-
- // List of network operations whose responses have been scheduled but not delivered, sorted
- // by NetworkOperation::_responseDate. These operations will have their responses delivered
- // when now() == getResponseDate().
- NetworkOperationList _scheduled; // (M)
-
- // List of network operations that will not be responded to until shutdown() is called.
- NetworkOperationList _blackHoled; // (M)
-
- // Pointer to the executor into which this mock is installed. Used to signal the executor
- // when the clock changes.
- ReplicationExecutor* _executor; // (R)
- };
-
- /**
- * Representation of an in-progress network operation.
- */
- class NetworkInterfaceMock::NetworkOperation {
- public:
- NetworkOperation();
- NetworkOperation(const ReplicationExecutor::CallbackHandle& cbHandle,
- const RemoteCommandRequest& theRequest,
- Date_t theRequestDate,
- const RemoteCommandCompletionFn& onFinish);
- ~NetworkOperation();
-
- /**
- * Adjusts the stored virtual time at which this entry will be subject to consideration
- * by the test harness.
- */
- void setNextConsiderationDate(Date_t nextConsiderationDate);
-
- /**
- * Sets the response and thet virtual time at which it will be delivered.
- */
- void setResponse(Date_t responseDate, const ResponseStatus& response);
-
- /**
- * Predicate that returns true if cbHandle equals the executor's handle for this network
- * operation. Used for searching lists of NetworkOperations.
- */
- bool isForCallback(const ReplicationExecutor::CallbackHandle& cbHandle) const {
- return cbHandle == _cbHandle;
- }
-
- /**
- * Gets the request that initiated this operation.
- */
- const RemoteCommandRequest& getRequest() const { return _request; }
-
- /**
- * Gets the virtual time at which the operation was started.
- */
- Date_t getRequestDate() const { return _requestDate; }
-
- /**
- * Gets the virtual time at which the test harness should next consider what to do
- * with this request.
- */
- Date_t getNextConsiderationDate() const { return _nextConsiderationDate; }
-
- /**
- * After setResponse() has been called, returns the virtual time at which
- * the response should be delivered.
- */
- Date_t getResponseDate() const { return _responseDate; }
-
- /**
- * Delivers the response, by invoking the onFinish callback passed into the constructor.
- */
- void finishResponse();
-
- private:
- Date_t _requestDate;
- Date_t _nextConsiderationDate;
- Date_t _responseDate;
- ReplicationExecutor::CallbackHandle _cbHandle;
- RemoteCommandRequest _request;
- ResponseStatus _response;
- RemoteCommandCompletionFn _onFinish;
- };
-
-} // namespace repl
-} // namespace mongo
diff --git a/src/mongo/db/repl/repl_set_heartbeat_response_test.cpp b/src/mongo/db/repl/repl_set_heartbeat_response_test.cpp
index 29a7812d6f3..8d53dffc062 100644
--- a/src/mongo/db/repl/repl_set_heartbeat_response_test.cpp
+++ b/src/mongo/db/repl/repl_set_heartbeat_response_test.cpp
@@ -30,9 +30,9 @@
#include <boost/scoped_ptr.hpp>
+#include "mongo/db/repl/repl_set_heartbeat_response.h"
#include "mongo/unittest/unittest.h"
#include "mongo/util/assert_util.h"
-#include "mongo/db/repl/repl_set_heartbeat_response.h"
namespace mongo {
namespace repl {
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
index 0fbd1b9424d..942681cedf6 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
@@ -56,6 +56,7 @@
#include "mongo/db/repl/rs_sync.h"
#include "mongo/db/repl/last_vote.h"
#include "mongo/db/storage/storage_engine.h"
+#include "mongo/executor/network_interface.h"
#include "mongo/s/d_state.h"
#include "mongo/stdx/functional.h"
#include "mongo/util/assert_util.h"
@@ -294,8 +295,7 @@ namespace {
}
void ReplicationCoordinatorExternalStateImpl::closeConnections() {
- MessagingPort::closeAllSockets(
- ReplicationExecutor::NetworkInterface::kMessagingPortKeepOpen);
+ MessagingPort::closeAllSockets(executor::NetworkInterface::kMessagingPortKeepOpen);
}
void ReplicationCoordinatorExternalStateImpl::killAllUserOperations(OperationContext* txn) {
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp
index dc9a7d36ec6..e3ea081940a 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl.cpp
@@ -78,6 +78,7 @@ namespace repl {
namespace {
typedef StatusWith<ReplicationExecutor::CallbackHandle> CBHStatus;
+ using executor::NetworkInterface;
void lockAndCall(boost::unique_lock<boost::mutex>* lk, const stdx::function<void ()>& fn) {
if (!lk->owns_lock()) {
@@ -159,13 +160,16 @@ namespace {
ReplicationCoordinatorExternalState* externalState,
TopologyCoordinator* topCoord,
int64_t prngSeed,
- ReplicationExecutor::NetworkInterface* network,
+ NetworkInterface* network,
+ StorageInterface* storage,
ReplicationExecutor* replExec) :
_settings(settings),
_replMode(getReplicationModeFromSettings(settings)),
_topCoord(topCoord),
_replExecutorIfOwned(replExec ? nullptr :
- new ReplicationExecutor(network, prngSeed)),
+ new ReplicationExecutor(network,
+ storage,
+ prngSeed)),
_replExecutor(replExec ? *replExec : *_replExecutorIfOwned),
_externalState(externalState),
_inShutdown(false),
@@ -198,13 +202,15 @@ namespace {
ReplicationCoordinatorImpl::ReplicationCoordinatorImpl(
const ReplSettings& settings,
ReplicationCoordinatorExternalState* externalState,
- ReplicationExecutor::NetworkInterface* network,
+ NetworkInterface* network,
+ StorageInterface* storage,
TopologyCoordinator* topCoord,
int64_t prngSeed) : ReplicationCoordinatorImpl(settings,
externalState,
topCoord,
prngSeed,
network,
+ storage,
nullptr) { }
ReplicationCoordinatorImpl::ReplicationCoordinatorImpl(
@@ -217,6 +223,7 @@ namespace {
topCoord,
prngSeed,
nullptr,
+ nullptr,
replExec) { }
ReplicationCoordinatorImpl::~ReplicationCoordinatorImpl() {}
diff --git a/src/mongo/db/repl/replication_coordinator_impl.h b/src/mongo/db/repl/replication_coordinator_impl.h
index d8dabc082eb..625007cd09f 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.h
+++ b/src/mongo/db/repl/replication_coordinator_impl.h
@@ -38,13 +38,14 @@
#include "mongo/base/status.h"
#include "mongo/bson/timestamp.h"
#include "mongo/db/service_context.h"
-#include "mongo/db/repl/member_state.h"
#include "mongo/db/repl/data_replicator.h"
+#include "mongo/db/repl/member_state.h"
#include "mongo/db/repl/optime.h"
#include "mongo/db/repl/replica_set_config.h"
#include "mongo/db/repl/replication_coordinator.h"
#include "mongo/db/repl/replication_coordinator_external_state.h"
#include "mongo/db/repl/replication_executor.h"
+#include "mongo/db/repl/storage_interface.h"
#include "mongo/db/repl/update_position_args.h"
#include "mongo/platform/atomic_word.h"
#include "mongo/platform/unordered_map.h"
@@ -81,7 +82,8 @@ namespace repl {
// Takes ownership of the "externalState", "topCoord" and "network" objects.
ReplicationCoordinatorImpl(const ReplSettings& settings,
ReplicationCoordinatorExternalState* externalState,
- ReplicationExecutor::NetworkInterface* network,
+ executor::NetworkInterface* network,
+ StorageInterface* storage,
TopologyCoordinator* topoCoord,
int64_t prngSeed);
// Takes ownership of the "externalState" and "topCoord" objects.
@@ -296,7 +298,8 @@ namespace repl {
ReplicationCoordinatorExternalState* externalState,
TopologyCoordinator* topCoord,
int64_t prngSeed,
- ReplicationExecutor::NetworkInterface* network,
+ executor::NetworkInterface* network,
+ StorageInterface* storage,
ReplicationExecutor* replExec);
/**
* Configuration states for a replica set node.
diff --git a/src/mongo/db/repl/replication_coordinator_impl_elect_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_elect_test.cpp
index 5202e59c65c..a0e4149de24 100644
--- a/src/mongo/db/repl/replication_coordinator_impl_elect_test.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl_elect_test.cpp
@@ -33,7 +33,6 @@
#include "mongo/db/jsobj.h"
#include "mongo/db/operation_context_noop.h"
#include "mongo/db/repl/is_master_response.h"
-#include "mongo/db/repl/network_interface_mock.h"
#include "mongo/db/repl/operation_context_repl_mock.h"
#include "mongo/db/repl/repl_set_heartbeat_args.h"
#include "mongo/db/repl/repl_set_heartbeat_response.h"
@@ -41,6 +40,7 @@
#include "mongo/db/repl/replication_coordinator_external_state_mock.h"
#include "mongo/db/repl/replication_coordinator_impl.h"
#include "mongo/db/repl/replication_coordinator_test_fixture.h"
+#include "mongo/executor/network_interface_mock.h"
#include "mongo/unittest/unittest.h"
#include "mongo/util/log.h"
@@ -48,6 +48,8 @@ namespace mongo {
namespace repl {
namespace {
+ using executor::NetworkInterfaceMock;
+
class ReplCoordElectTest : public ReplCoordTest {
protected:
void simulateEnoughHeartbeatsForElectability();
diff --git a/src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp
index 43d6d2f69b9..c7d5c9944da 100644
--- a/src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp
@@ -33,7 +33,6 @@
#include "mongo/db/jsobj.h"
#include "mongo/db/operation_context_noop.h"
#include "mongo/db/repl/is_master_response.h"
-#include "mongo/db/repl/network_interface_mock.h"
#include "mongo/db/repl/operation_context_repl_mock.h"
#include "mongo/db/repl/repl_set_heartbeat_args_v1.h"
#include "mongo/db/repl/repl_set_heartbeat_response.h"
@@ -41,6 +40,7 @@
#include "mongo/db/repl/replication_coordinator_external_state_mock.h"
#include "mongo/db/repl/replication_coordinator_impl.h"
#include "mongo/db/repl/replication_coordinator_test_fixture.h"
+#include "mongo/executor/network_interface_mock.h"
#include "mongo/unittest/unittest.h"
#include "mongo/util/log.h"
@@ -48,6 +48,8 @@ namespace mongo {
namespace repl {
namespace {
+ using executor::NetworkInterfaceMock;
+
class ReplCoordElectV1Test : public ReplCoordTest {
protected:
void simulateEnoughHeartbeatsForElectability();
diff --git a/src/mongo/db/repl/replication_coordinator_impl_heartbeat_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_heartbeat_test.cpp
index dadc7b5a026..2afcad55859 100644
--- a/src/mongo/db/repl/replication_coordinator_impl_heartbeat_test.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl_heartbeat_test.cpp
@@ -32,7 +32,6 @@
#include "mongo/db/jsobj.h"
#include "mongo/db/operation_context_noop.h"
-#include "mongo/db/repl/network_interface_mock.h"
#include "mongo/db/repl/repl_set_heartbeat_args.h"
#include "mongo/db/repl/repl_set_heartbeat_response.h"
#include "mongo/db/repl/replica_set_config.h"
@@ -40,6 +39,7 @@
#include "mongo/db/repl/replication_coordinator_impl.h"
#include "mongo/db/repl/replication_coordinator_test_fixture.h"
#include "mongo/db/repl/topology_coordinator_impl.h"
+#include "mongo/executor/network_interface_mock.h"
#include "mongo/unittest/unittest.h"
#include "mongo/util/log.h"
@@ -47,6 +47,8 @@ namespace mongo {
namespace repl {
namespace {
+ using executor::NetworkInterfaceMock;
+
class ReplCoordHBTest : public ReplCoordTest {
protected:
void assertMemberState(MemberState expected, std::string msg = "");
diff --git a/src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp
index 5851b6e7e7b..4ae6a358e53 100644
--- a/src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp
@@ -32,7 +32,6 @@
#include "mongo/db/jsobj.h"
#include "mongo/db/operation_context_noop.h"
-#include "mongo/db/repl/network_interface_mock.h"
#include "mongo/db/repl/repl_set_heartbeat_args.h"
#include "mongo/db/repl/repl_set_heartbeat_args_v1.h"
#include "mongo/db/repl/replica_set_config.h"
@@ -40,6 +39,7 @@
#include "mongo/db/repl/replication_coordinator_impl.h"
#include "mongo/db/repl/replication_coordinator_test_fixture.h"
#include "mongo/db/repl/topology_coordinator_impl.h"
+#include "mongo/executor/network_interface_mock.h"
#include "mongo/unittest/unittest.h"
#include "mongo/util/log.h"
@@ -47,6 +47,8 @@ namespace mongo {
namespace repl {
namespace {
+ using executor::NetworkInterfaceMock;
+
class ReplCoordHBV1Test : public ReplCoordTest {
protected:
void assertMemberState(MemberState expected, std::string msg = "");
diff --git a/src/mongo/db/repl/replication_coordinator_impl_reconfig_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_reconfig_test.cpp
index e4a0310daf3..5bf1b40f5cf 100644
--- a/src/mongo/db/repl/replication_coordinator_impl_reconfig_test.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl_reconfig_test.cpp
@@ -32,7 +32,6 @@
#include "mongo/db/jsobj.h"
#include "mongo/db/operation_context_noop.h"
-#include "mongo/db/repl/network_interface_mock.h"
#include "mongo/db/repl/repl_set_heartbeat_args.h"
#include "mongo/db/repl/repl_set_heartbeat_response.h"
#include "mongo/db/repl/replica_set_config.h"
@@ -40,6 +39,7 @@
#include "mongo/db/repl/replication_coordinator_impl.h"
#include "mongo/db/repl/replication_coordinator_test_fixture.h"
#include "mongo/db/repl/replication_coordinator.h" // ReplSetReconfigArgs
+#include "mongo/executor/network_interface_mock.h"
#include "mongo/unittest/unittest.h"
#include "mongo/util/log.h"
@@ -47,6 +47,7 @@ namespace mongo {
namespace repl {
namespace {
+ using executor::NetworkInterfaceMock;
typedef ReplicationCoordinator::ReplSetReconfigArgs ReplSetReconfigArgs;
TEST_F(ReplCoordTest, ReconfigBeforeInitialized) {
diff --git a/src/mongo/db/repl/replication_coordinator_impl_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_test.cpp
index 3f5a574d0e4..82373106580 100644
--- a/src/mongo/db/repl/replication_coordinator_impl_test.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl_test.cpp
@@ -41,7 +41,6 @@
#include "mongo/db/operation_context_noop.h"
#include "mongo/db/repl/handshake_args.h"
#include "mongo/db/repl/is_master_response.h"
-#include "mongo/db/repl/network_interface_mock.h"
#include "mongo/db/repl/operation_context_repl_mock.h"
#include "mongo/db/repl/optime.h"
#include "mongo/db/repl/read_after_optime_args.h"
@@ -57,6 +56,7 @@
#include "mongo/db/repl/update_position_args.h"
#include "mongo/db/server_options.h"
#include "mongo/db/write_concern_options.h"
+#include "mongo/executor/network_interface_mock.h"
#include "mongo/stdx/functional.h"
#include "mongo/unittest/unittest.h"
#include "mongo/util/assert_util.h"
@@ -68,6 +68,7 @@ namespace mongo {
namespace repl {
namespace {
+ using executor::NetworkInterfaceMock;
typedef ReplicationCoordinator::ReplSetReconfigArgs ReplSetReconfigArgs;
Status kInterruptedStatus(ErrorCodes::Interrupted, "operation was interrupted");
diff --git a/src/mongo/db/repl/replication_coordinator_test_fixture.cpp b/src/mongo/db/repl/replication_coordinator_test_fixture.cpp
index e7bd4d93703..2359de1f751 100644
--- a/src/mongo/db/repl/replication_coordinator_test_fixture.cpp
+++ b/src/mongo/db/repl/replication_coordinator_test_fixture.cpp
@@ -34,14 +34,15 @@
#include "mongo/db/operation_context_noop.h"
#include "mongo/db/repl/is_master_response.h"
-#include "mongo/db/repl/network_interface_mock.h"
#include "mongo/db/repl/operation_context_repl_mock.h"
#include "mongo/db/repl/repl_settings.h"
#include "mongo/db/repl/repl_set_heartbeat_args.h"
#include "mongo/db/repl/repl_set_heartbeat_args_v1.h"
#include "mongo/db/repl/replication_coordinator_external_state_mock.h"
#include "mongo/db/repl/replication_coordinator_impl.h"
+#include "mongo/db/repl/storage_interface_mock.h"
#include "mongo/db/repl/topology_coordinator_impl.h"
+#include "mongo/executor/network_interface_mock.h"
#include "mongo/stdx/functional.h"
#include "mongo/unittest/unittest.h"
#include "mongo/util/log.h"
@@ -55,6 +56,8 @@ namespace {
}
} // namespace
+ using executor::NetworkInterfaceMock;
+
ReplicaSetConfig ReplCoordTest::assertMakeRSConfig(const BSONObj& configBson) {
ReplicaSetConfig config;
ASSERT_OK(config.initialize(configBson));
@@ -104,10 +107,12 @@ namespace {
_topo = new TopologyCoordinatorImpl(Seconds(0));
_net = new NetworkInterfaceMock;
+ _storage = new StorageInterfaceMock;
_externalState = new ReplicationCoordinatorExternalStateMock;
_repl.reset(new ReplicationCoordinatorImpl(_settings,
_externalState,
_net,
+ _storage,
_topo,
seed));
}
diff --git a/src/mongo/db/repl/replication_coordinator_test_fixture.h b/src/mongo/db/repl/replication_coordinator_test_fixture.h
index 93d3c7c4bd7..d6a67cced2e 100644
--- a/src/mongo/db/repl/replication_coordinator_test_fixture.h
+++ b/src/mongo/db/repl/replication_coordinator_test_fixture.h
@@ -41,12 +41,16 @@ namespace mongo {
class BSONObj;
struct HostAndPort;
+namespace executor {
+ class NetworkInterfaceMock;
+} // namespace executor
+
namespace repl {
- class NetworkInterfaceMock;
class ReplicaSetConfig;
class ReplicationCoordinatorExternalStateMock;
class ReplicationCoordinatorImpl;
+ class StorageInterfaceMock;
class TopologyCoordinatorImpl;
/**
@@ -58,7 +62,7 @@ namespace repl {
* Makes a ResponseStatus with the given "doc" response and optional elapsed time "millis".
*/
static ResponseStatus makeResponseStatus(const BSONObj& doc,
- Milliseconds millis = Milliseconds(0));
+ Milliseconds millis = Milliseconds(0));
/**
* Constructs a ReplicaSetConfig from the given BSON, or raises a test failure exception.
@@ -75,7 +79,7 @@ namespace repl {
/**
* Gets the network mock.
*/
- NetworkInterfaceMock* getNet() { return _net; }
+ executor::NetworkInterfaceMock* getNet() { return _net; }
/**
* Gets the replication coordinator under test.
@@ -182,7 +186,9 @@ namespace repl {
// Owned by ReplicationCoordinatorImpl
TopologyCoordinatorImpl* _topo;
// Owned by ReplicationCoordinatorImpl
- NetworkInterfaceMock* _net;
+ executor::NetworkInterfaceMock* _net;
+ // Owned by ReplicationCoordinatorImpl
+ StorageInterfaceMock* _storage;
// Owned by ReplicationCoordinatorImpl
ReplicationCoordinatorExternalStateMock* _externalState;
ReplSettings _settings;
diff --git a/src/mongo/db/repl/replication_executor.cpp b/src/mongo/db/repl/replication_executor.cpp
index 118ad0d0ded..72c611b600a 100644
--- a/src/mongo/db/repl/replication_executor.cpp
+++ b/src/mongo/db/repl/replication_executor.cpp
@@ -33,6 +33,8 @@
#include <limits>
#include "mongo/db/repl/database_task.h"
+#include "mongo/db/repl/storage_interface.h"
+#include "mongo/executor/network_interface.h"
#include "mongo/util/assert_util.h"
#include "mongo/util/mongoutils/str.h"
@@ -43,9 +45,14 @@ namespace {
stdx::function<void ()> makeNoExcept(const stdx::function<void ()> &fn);
} // namespace
- ReplicationExecutor::ReplicationExecutor(NetworkInterface* netInterface, int64_t prngSeed) :
+ using executor::NetworkInterface;
+
+ ReplicationExecutor::ReplicationExecutor(NetworkInterface* netInterface,
+ StorageInterface* storageInterface,
+ int64_t prngSeed) :
_random(prngSeed),
_networkInterface(netInterface),
+ _storageInterface(storageInterface),
_totalEventWaiters(0),
_inShutdown(false),
_dblockWorkers(threadpool::ThreadPool::DoNotStartThreadsTag(),
@@ -53,10 +60,10 @@ namespace {
"replCallbackWithGlobalLock-"),
_dblockTaskRunner(
&_dblockWorkers,
- stdx::bind(&NetworkInterface::createOperationContext, netInterface)),
+ stdx::bind(&StorageInterface::createOperationContext, storageInterface)),
_dblockExclusiveLockTaskRunner(
&_dblockWorkers,
- stdx::bind(&NetworkInterface::createOperationContext, netInterface)),
+ stdx::bind(&StorageInterface::createOperationContext, storageInterface)),
_nextId(0) {
}
@@ -561,15 +568,6 @@ namespace {
isSignaledCondition(new boost::condition_variable) {
}
- // This is a bitmask with the first bit set. It's used to mark connections that should be kept
- // open during stepdowns.
-#ifndef _MSC_EXTENSIONS
- const unsigned int ReplicationExecutor::NetworkInterface::kMessagingPortKeepOpen;
-#endif // _MSC_EXTENSIONS
-
- ReplicationExecutor::NetworkInterface::NetworkInterface() {}
- ReplicationExecutor::NetworkInterface::~NetworkInterface() {}
-
namespace {
void callNoExcept(const stdx::function<void ()>& fn) {
diff --git a/src/mongo/db/repl/replication_executor.h b/src/mongo/db/repl/replication_executor.h
index b44ee4a4815..b90d0e24133 100644
--- a/src/mongo/db/repl/replication_executor.h
+++ b/src/mongo/db/repl/replication_executor.h
@@ -53,8 +53,14 @@ namespace mongo {
class NamespaceString;
class OperationContext;
+namespace executor{
+ class NetworkInterface;
+} // namespace executor
+
namespace repl {
+ class StorageInterface;
+
/**
* Event loop for driving state machines in replication.
*
@@ -112,7 +118,6 @@ namespace repl {
struct CallbackData;
class CallbackHandle;
class EventHandle;
- class NetworkInterface;
struct RemoteCommandCallbackData;
typedef StatusWith<RemoteCommandResponse> ResponseStatus;
@@ -141,7 +146,9 @@ namespace repl {
*
* Takes ownership of the passed NetworkInterface object.
*/
- explicit ReplicationExecutor(NetworkInterface* netInterface, int64_t pnrgSeed);
+ ReplicationExecutor(executor::NetworkInterface* netInterface,
+ StorageInterface* storageInterface,
+ int64_t pnrgSeed);
/**
* Destroys an executor.
@@ -422,7 +429,8 @@ namespace repl {
// PRNG; seeded at class construction time.
PseudoRandom _random;
- boost::scoped_ptr<NetworkInterface> _networkInterface;
+ boost::scoped_ptr<executor::NetworkInterface> _networkInterface;
+ boost::scoped_ptr<StorageInterface> _storageInterface;
boost::mutex _mutex;
boost::mutex _terribleExLockSyncMutex;
boost::condition_variable _noMoreWaitingThreads;
@@ -509,92 +517,8 @@ namespace repl {
OperationContext* txn;
};
- /**
- * Interface to networking and lock manager.
- */
- class ReplicationExecutor::NetworkInterface {
- MONGO_DISALLOW_COPYING(NetworkInterface);
- public:
-
- // A flag to keep replication MessagingPorts open when all other sockets are disconnected.
- static const unsigned int kMessagingPortKeepOpen = 1;
-
- typedef RemoteCommandResponse Response;
- typedef stdx::function<void (const ResponseStatus&)> RemoteCommandCompletionFn;
-
- virtual ~NetworkInterface();
-
- /**
- * Returns diagnostic info.
- */
- virtual std::string getDiagnosticString() = 0;
-
- /**
- * Starts up the network interface.
- *
- * It is valid to call all methods except shutdown() before this method completes. That is,
- * implementations may not assume that startup() completes before startCommand() first
- * executes.
- *
- * Called by the owning ReplicationExecutor inside its run() method.
- */
- virtual void startup() = 0;
-
- /**
- * Shuts down the network interface. Must be called before this instance gets deleted,
- * if startup() is called.
- *
- * Called by the owning ReplicationExecutor inside its run() method.
- */
- virtual void shutdown() = 0;
-
- /**
- * Blocks the current thread (presumably the executor thread) until the network interface
- * knows of work for the executor to perform.
- */
- virtual void waitForWork() = 0;
-
- /**
- * Similar to waitForWork, but only blocks until "when".
- */
- virtual void waitForWorkUntil(Date_t when) = 0;
-
- /**
- * Signals to the network interface that there is new work (such as a signaled event) for
- * the executor to process. Wakes the executor from waitForWork() and friends.
- */
- virtual void signalWorkAvailable() = 0;
-
- /**
- * Returns the current time.
- */
- virtual Date_t now() = 0;
-
- /**
- * Starts asynchronous execution of the command described by "request".
- */
- virtual void startCommand(const CallbackHandle& cbHandle,
- const RemoteCommandRequest& request,
- const RemoteCommandCompletionFn& onFinish) = 0;
-
- /**
- * Requests cancelation of the network activity associated with "cbHandle" if it has not yet
- * completed.
- */
- virtual void cancelCommand(const CallbackHandle& cbHandle) = 0;
-
- /**
- * Creates an operation context for running database operations.
- */
- virtual OperationContext* createOperationContext() = 0;
-
- protected:
- NetworkInterface();
- };
-
typedef ReplicationExecutor::ResponseStatus ResponseStatus;
- // Must be after NetworkInterface class
struct ReplicationExecutor::RemoteCommandCallbackData {
RemoteCommandCallbackData(ReplicationExecutor* theExecutor,
const CallbackHandle& theHandle,
diff --git a/src/mongo/db/repl/replication_executor_test.cpp b/src/mongo/db/repl/replication_executor_test.cpp
index dbc4f00b6fa..d916f3e01d2 100644
--- a/src/mongo/db/repl/replication_executor_test.cpp
+++ b/src/mongo/db/repl/replication_executor_test.cpp
@@ -34,9 +34,10 @@
#include "mongo/db/namespace_string.h"
#include "mongo/db/operation_context.h"
-#include "mongo/db/repl/network_interface_mock.h"
#include "mongo/db/repl/replication_executor.h"
#include "mongo/db/repl/replication_executor_test_fixture.h"
+#include "mongo/db/repl/storage_interface_mock.h"
+#include "mongo/executor/network_interface_mock.h"
#include "mongo/stdx/functional.h"
#include "mongo/unittest/barrier.h"
#include "mongo/unittest/unittest.h"
@@ -48,6 +49,8 @@ namespace repl {
namespace {
+ using executor::NetworkInterfaceMock;
+
bool operator==(const RemoteCommandRequest lhs,
const RemoteCommandRequest rhs) {
return lhs.target == rhs.target &&
@@ -155,6 +158,7 @@ namespace {
void onGoAfterTriggered(const ReplicationExecutor::CallbackData& cbData);
NetworkInterfaceMock* net;
+ StorageInterfaceMock* storage;
ReplicationExecutor executor;
boost::thread executorThread;
const ReplicationExecutor::EventHandle goEvent;
@@ -176,7 +180,8 @@ namespace {
EventChainAndWaitingTest::EventChainAndWaitingTest() :
net(new NetworkInterfaceMock),
- executor(net, prngSeed),
+ storage(new StorageInterfaceMock),
+ executor(net, storage, prngSeed),
executorThread(stdx::bind(&ReplicationExecutor::run, &executor)),
goEvent(unittest::assertGet(executor.makeEvent())),
event2(unittest::assertGet(executor.makeEvent())),
diff --git a/src/mongo/db/repl/replication_executor_test_fixture.cpp b/src/mongo/db/repl/replication_executor_test_fixture.cpp
index b33e1983e82..6efbe072d52 100644
--- a/src/mongo/db/repl/replication_executor_test_fixture.cpp
+++ b/src/mongo/db/repl/replication_executor_test_fixture.cpp
@@ -30,8 +30,9 @@
#include "mongo/db/repl/replication_executor_test_fixture.h"
-#include "mongo/db/repl/network_interface_mock.h"
#include "mongo/db/repl/replication_executor.h"
+#include "mongo/db/repl/storage_interface_mock.h"
+#include "mongo/executor/network_interface_mock.h"
namespace mongo {
namespace repl {
@@ -46,19 +47,20 @@ namespace {
ASSERT(!_executorThread);
_executorThread.reset(
new boost::thread(stdx::bind(&ReplicationExecutor::run, _executor.get())));
- _net->enterNetwork();
+ getNet()->enterNetwork();
}
void ReplicationExecutorTest::joinExecutorThread() {
ASSERT(_executorThread);
- _net->exitNetwork();
+ getNet()->exitNetwork();
_executorThread->join();
_executorThread.reset();
}
void ReplicationExecutorTest::setUp() {
- _net = new NetworkInterfaceMock;
- _executor.reset(new ReplicationExecutor(_net, prngSeed));
+ _net = new executor::NetworkInterfaceMock;
+ _storage = new StorageInterfaceMock;
+ _executor.reset(new ReplicationExecutor(_net, _storage, prngSeed));
}
void ReplicationExecutorTest::tearDown() {
diff --git a/src/mongo/db/repl/replication_executor_test_fixture.h b/src/mongo/db/repl/replication_executor_test_fixture.h
index ad9bc345ea0..479d799b990 100644
--- a/src/mongo/db/repl/replication_executor_test_fixture.h
+++ b/src/mongo/db/repl/replication_executor_test_fixture.h
@@ -34,12 +34,17 @@
#include "mongo/unittest/unittest.h"
namespace mongo {
+
+namespace executor {
+ class NetworkInterfaceMock;
+} // namespace executor
+
namespace repl {
using std::unique_ptr;
- class NetworkInterfaceMock;
class ReplicationExecutor;
+ class StorageInterfaceMock;
/**
* Test fixture for tests that require a ReplicationExecutor backed by
@@ -47,7 +52,7 @@ namespace repl {
*/
class ReplicationExecutorTest : public unittest::Test {
protected:
- NetworkInterfaceMock* getNet() { return _net; }
+ executor::NetworkInterfaceMock* getNet() { return _net; }
ReplicationExecutor& getExecutor() { return *_executor; }
/**
* Runs ReplicationExecutor in background.
@@ -80,7 +85,8 @@ namespace repl {
private:
- NetworkInterfaceMock* _net;
+ executor::NetworkInterfaceMock* _net;
+ StorageInterfaceMock* _storage;
unique_ptr<ReplicationExecutor> _executor;
unique_ptr<boost::thread> _executorThread;
};
diff --git a/src/mongo/db/repl/replset_commands.cpp b/src/mongo/db/repl/replset_commands.cpp
index 37e1dfe9deb..600fbfe52a8 100644
--- a/src/mongo/db/repl/replset_commands.cpp
+++ b/src/mongo/db/repl/replset_commands.cpp
@@ -54,6 +54,7 @@
#include "mongo/db/repl/replication_executor.h"
#include "mongo/db/repl/update_position_args.h"
#include "mongo/db/storage/storage_engine.h"
+#include "mongo/executor/network_interface.h"
#include "mongo/util/fail_point_service.h"
#include "mongo/util/log.h"
@@ -709,7 +710,7 @@ namespace {
{
AbstractMessagingPort *mp = txn->getClient()->port();
if( mp )
- mp->tag |= ReplicationExecutor::NetworkInterface::kMessagingPortKeepOpen;
+ mp->tag |= executor::NetworkInterface::kMessagingPortKeepOpen;
}
if (getGlobalReplicationCoordinator()->isV1ElectionProtocol()) {
diff --git a/src/mongo/db/repl/reporter_test.cpp b/src/mongo/db/repl/reporter_test.cpp
index fa680a44612..ed617fa0cdf 100644
--- a/src/mongo/db/repl/reporter_test.cpp
+++ b/src/mongo/db/repl/reporter_test.cpp
@@ -28,9 +28,9 @@
#include "mongo/platform/basic.h"
-#include "mongo/db/repl/network_interface_mock.h"
#include "mongo/db/repl/replication_executor_test_fixture.h"
#include "mongo/db/repl/reporter.h"
+#include "mongo/executor/network_interface_mock.h"
#include "mongo/unittest/unittest.h"
@@ -38,6 +38,7 @@ namespace {
using namespace mongo;
using namespace mongo::repl;
+ using executor::NetworkInterfaceMock;
class MockProgressManager : public ReplicationProgressManager {
public:
diff --git a/src/mongo/db/repl/scatter_gather_test.cpp b/src/mongo/db/repl/scatter_gather_test.cpp
index 1fc6765e58e..49209e9a31a 100644
--- a/src/mongo/db/repl/scatter_gather_test.cpp
+++ b/src/mongo/db/repl/scatter_gather_test.cpp
@@ -31,10 +31,11 @@
#include <boost/scoped_ptr.hpp>
#include <boost/thread.hpp>
-#include "mongo/db/repl/network_interface_mock.h"
#include "mongo/db/repl/replication_executor.h"
#include "mongo/db/repl/scatter_gather_algorithm.h"
#include "mongo/db/repl/scatter_gather_runner.h"
+#include "mongo/db/repl/storage_interface_mock.h"
+#include "mongo/executor/network_interface_mock.h"
#include "mongo/stdx/functional.h"
#include "mongo/unittest/unittest.h"
@@ -42,6 +43,8 @@ namespace mongo {
namespace repl {
namespace {
+ using executor::NetworkInterfaceMock;
+
/**
* Algorithm for testing the ScatterGatherRunner, which will finish running when finish() is
* called, or upon receiving responses from two nodes. Creates a three requests algorithm
@@ -112,13 +115,15 @@ namespace {
// owned by _executor
NetworkInterfaceMock* _net;
+ StorageInterfaceMock* _storage;
boost::scoped_ptr<ReplicationExecutor> _executor;
boost::scoped_ptr<boost::thread> _executorThread;
};
void ScatterGatherTest::setUp() {
_net = new NetworkInterfaceMock;
- _executor.reset(new ReplicationExecutor(_net, 1 /* prng seed */));
+ _storage = new StorageInterfaceMock;
+ _executor.reset(new ReplicationExecutor(_net, _storage, 1 /* prng seed */));
_executorThread.reset(new boost::thread(stdx::bind(&ReplicationExecutor::run,
_executor.get())));
}
diff --git a/src/mongo/db/repl/storage_interface.cpp b/src/mongo/db/repl/storage_interface.cpp
new file mode 100644
index 00000000000..c09d76ad9ff
--- /dev/null
+++ b/src/mongo/db/repl/storage_interface.cpp
@@ -0,0 +1,42 @@
+/**
+ * Copyright (C) 2015 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kReplication
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/repl/storage_interface.h"
+
+namespace mongo {
+namespace repl {
+
+ StorageInterface::StorageInterface() {}
+ StorageInterface::~StorageInterface() {}
+
+} // namespace repl
+} // namespace mongo
diff --git a/src/mongo/db/repl/storage_interface.h b/src/mongo/db/repl/storage_interface.h
new file mode 100644
index 00000000000..df51692b2f1
--- /dev/null
+++ b/src/mongo/db/repl/storage_interface.h
@@ -0,0 +1,59 @@
+/**
+ * Copyright (C) 2015 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+
+#pragma once
+
+
+namespace mongo {
+
+ class OperationContext;
+
+namespace repl {
+
+ /**
+ * Storage interface used by used by the ReplicationExecutor inside mongod for supporting
+ * ReplicationExectutor's ability to take database locks.
+ */
+ class StorageInterface {
+ public:
+ virtual ~StorageInterface();
+
+ /**
+ * Creates an operation context for running database operations.
+ */
+ virtual OperationContext* createOperationContext() = 0;
+
+ protected:
+
+ StorageInterface();
+
+ };
+
+} // namespace repl
+} // namespace mongo
diff --git a/src/mongo/db/repl/storage_interface_impl.cpp b/src/mongo/db/repl/storage_interface_impl.cpp
new file mode 100644
index 00000000000..73a14ce6330
--- /dev/null
+++ b/src/mongo/db/repl/storage_interface_impl.cpp
@@ -0,0 +1,54 @@
+/**
+ * Copyright (C) 2015 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kReplication
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/repl/storage_interface_impl.h"
+
+#include "mongo/db/auth/authorization_session.h"
+#include "mongo/db/client.h"
+#include "mongo/db/operation_context_impl.h"
+
+namespace mongo {
+namespace repl {
+
+ StorageInterfaceImpl::StorageInterfaceImpl() : StorageInterface() {}
+ StorageInterfaceImpl::~StorageInterfaceImpl() { }
+
+ OperationContext* StorageInterfaceImpl::createOperationContext() {
+ if (!ClientBasic::getCurrent()) {
+ Client::initThreadIfNotAlready();
+ AuthorizationSession::get(*ClientBasic::getCurrent())->grantInternalAuthorization();
+ }
+ return new OperationContextImpl();
+ }
+
+} // namespace repl
+} // namespace mongo
diff --git a/src/mongo/db/repl/storage_interface_impl.h b/src/mongo/db/repl/storage_interface_impl.h
new file mode 100644
index 00000000000..24cc8268f17
--- /dev/null
+++ b/src/mongo/db/repl/storage_interface_impl.h
@@ -0,0 +1,50 @@
+/**
+ * Copyright (C) 2015 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+
+#pragma once
+
+#include "mongo/db/repl/storage_interface.h"
+
+namespace mongo {
+
+ class OperationContext;
+
+namespace repl {
+
+ class StorageInterfaceImpl : public StorageInterface {
+ public:
+ explicit StorageInterfaceImpl();
+ virtual ~StorageInterfaceImpl();
+
+ OperationContext* createOperationContext() override;
+
+ };
+
+} // namespace repl
+} // namespace mongo
diff --git a/src/mongo/db/repl/storage_interface_mock.cpp b/src/mongo/db/repl/storage_interface_mock.cpp
new file mode 100644
index 00000000000..4a6f4a7a293
--- /dev/null
+++ b/src/mongo/db/repl/storage_interface_mock.cpp
@@ -0,0 +1,49 @@
+/**
+ * Copyright (C) 2015 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kReplication
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/repl/storage_interface_mock.h"
+
+#include "mongo/db/repl/operation_context_repl_mock.h"
+
+namespace mongo {
+namespace repl {
+
+ StorageInterfaceMock::StorageInterfaceMock() {}
+
+ StorageInterfaceMock::~StorageInterfaceMock() { }
+
+ OperationContext* StorageInterfaceMock::createOperationContext() {
+ return new OperationContextReplMock();
+ }
+
+} // namespace repl
+} // namespace mongo
diff --git a/src/mongo/db/repl/storage_interface_mock.h b/src/mongo/db/repl/storage_interface_mock.h
new file mode 100644
index 00000000000..4bd3e63ec9d
--- /dev/null
+++ b/src/mongo/db/repl/storage_interface_mock.h
@@ -0,0 +1,49 @@
+/**
+ * Copyright (C) 2015 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+
+#pragma once
+
+#include "mongo/db/repl/storage_interface.h"
+
+namespace mongo {
+
+ class OperationContext;
+
+namespace repl {
+
+ class StorageInterfaceMock : public StorageInterface {
+ public:
+ explicit StorageInterfaceMock();
+ virtual ~StorageInterfaceMock();
+
+ OperationContext* createOperationContext() override;
+ };
+
+} // namespace repl
+} // namespace mongo
diff --git a/src/mongo/db/repl/vote_requester_test.cpp b/src/mongo/db/repl/vote_requester_test.cpp
index 09632675239..90d0843d843 100644
--- a/src/mongo/db/repl/vote_requester_test.cpp
+++ b/src/mongo/db/repl/vote_requester_test.cpp
@@ -33,9 +33,9 @@
#include "mongo/base/status.h"
#include "mongo/db/jsobj.h"
#include "mongo/db/repl/vote_requester.h"
-#include "mongo/db/repl/network_interface_mock.h"
#include "mongo/db/repl/repl_set_request_votes_args.h"
#include "mongo/db/repl/replication_executor.h"
+#include "mongo/executor/network_interface_mock.h"
#include "mongo/stdx/functional.h"
#include "mongo/unittest/unittest.h"
#include "mongo/util/mongoutils/str.h"
@@ -43,6 +43,8 @@
namespace mongo {
namespace repl {
namespace {
+
+ using executor::NetworkInterfaceMock;
using unittest::assertGet;
using RemoteCommandRequest = RemoteCommandRequest;