summaryrefslogtreecommitdiff
path: root/src/mongo
diff options
context:
space:
mode:
authorBenety Goh <benety@mongodb.com>2016-07-14 15:43:40 -0400
committerBenety Goh <benety@mongodb.com>2016-07-15 16:42:38 -0400
commit292d84272ba6a548e9c81064bcdab00dd1acf2a3 (patch)
tree8642d044f9492800d0a41f398dfd6a024ebe0193 /src/mongo
parente567346a1cc02c9edb8f33c8102a4cc35d1d70f2 (diff)
downloadmongo-292d84272ba6a548e9c81064bcdab00dd1acf2a3.tar.gz
SERVER-25069 DatabasesCloner accepts db worker thread pool at construction
Diffstat (limited to 'src/mongo')
-rw-r--r--src/mongo/db/repl/data_replicator.cpp1
-rw-r--r--src/mongo/db/repl/data_replicator_test.cpp11
-rw-r--r--src/mongo/db/repl/databases_cloner.cpp4
-rw-r--r--src/mongo/db/repl/databases_cloner.h9
-rw-r--r--src/mongo/db/repl/databases_cloner_test.cpp78
5 files changed, 97 insertions, 6 deletions
diff --git a/src/mongo/db/repl/data_replicator.cpp b/src/mongo/db/repl/data_replicator.cpp
index 9fb1ed97f08..731d5114220 100644
--- a/src/mongo/db/repl/data_replicator.cpp
+++ b/src/mongo/db/repl/data_replicator.cpp
@@ -482,6 +482,7 @@ Status DataReplicator::_runInitialSyncAttempt_inlock(OperationContext* txn,
stdx::make_unique<DatabasesCloner>(
_storage,
_exec,
+ _dataReplicatorExternalState->getDbWorkThreadPool(),
syncSource,
[](BSONObj dbInfo) {
const std::string name = dbInfo["name"].str();
diff --git a/src/mongo/db/repl/data_replicator_test.cpp b/src/mongo/db/repl/data_replicator_test.cpp
index df60cef0885..b9cf05f948d 100644
--- a/src/mongo/db/repl/data_replicator_test.cpp
+++ b/src/mongo/db/repl/data_replicator_test.cpp
@@ -48,6 +48,7 @@
#include "mongo/executor/network_interface_mock.h"
#include "mongo/executor/thread_pool_task_executor_test_fixture.h"
#include "mongo/stdx/mutex.h"
+#include "mongo/util/concurrency/old_thread_pool.h"
#include "mongo/util/concurrency/thread_name.h"
#include "mongo/util/fail_point_service.h"
#include "mongo/util/mongoutils/str.h"
@@ -204,6 +205,10 @@ public:
return *_storageInterface;
}
+ OldThreadPool& getDbWorkThreadPool() {
+ return *_dbWorkThreadPool;
+ }
+
protected:
struct StorageInterfaceResults {
bool createOplogCalled = false;
@@ -260,6 +265,8 @@ protected:
std::unique_ptr<CollectionBulkLoader>(collInfo->loader));
};
+ _dbWorkThreadPool = stdx::make_unique<OldThreadPool>(1);
+
Client::initThreadIfNotAlready();
reset();
@@ -297,6 +304,7 @@ protected:
auto dataReplicatorExternalState = stdx::make_unique<DataReplicatorExternalStateMock>();
dataReplicatorExternalState->taskExecutor = &getExecutor();
+ dataReplicatorExternalState->dbWorkThreadPool = &getDbWorkThreadPool();
dataReplicatorExternalState->currentTerm = 1LL;
dataReplicatorExternalState->lastCommittedOpTime = _myLastOpTime;
{
@@ -330,6 +338,8 @@ protected:
executor::ThreadPoolExecutorTest::joinExecutorThread();
_dr.reset();
+ _dbWorkThreadPool->join();
+ _dbWorkThreadPool.reset();
_storageInterface.reset();
// tearDown() destroys the task executor which was referenced by the data replicator.
@@ -360,6 +370,7 @@ protected:
MemberState _memberState;
std::unique_ptr<SyncSourceSelector> _syncSourceSelector;
std::unique_ptr<StorageInterfaceMock> _storageInterface;
+ std::unique_ptr<OldThreadPool> _dbWorkThreadPool;
std::map<NamespaceString, CollectionMockStats> _collectionStats;
std::map<NamespaceString, CollectionCloneInfo> _collections;
diff --git a/src/mongo/db/repl/databases_cloner.cpp b/src/mongo/db/repl/databases_cloner.cpp
index 8bb003ad843..86ef8e3fb64 100644
--- a/src/mongo/db/repl/databases_cloner.cpp
+++ b/src/mongo/db/repl/databases_cloner.cpp
@@ -64,17 +64,21 @@ const size_t numListDatabasesRetries = 1;
DatabasesCloner::DatabasesCloner(StorageInterface* si,
executor::TaskExecutor* exec,
+ OldThreadPool* dbWorkThreadPool,
HostAndPort source,
IncludeDbFilterFn includeDbPred,
OnFinishFn finishFn)
: _status(ErrorCodes::NotYetInitialized, ""),
_exec(exec),
+ _dbWorkThreadPool(dbWorkThreadPool),
_source(source),
_includeDbFn(includeDbPred),
_finishFn(finishFn),
_storage(si) {
uassert(ErrorCodes::InvalidOptions, "storage interface must be provided.", si);
uassert(ErrorCodes::InvalidOptions, "executor must be provided.", exec);
+ uassert(
+ ErrorCodes::InvalidOptions, "db worker thread pool must be provided.", dbWorkThreadPool);
uassert(ErrorCodes::InvalidOptions, "source must be provided.", !source.empty());
uassert(ErrorCodes::InvalidOptions, "finishFn must be provided.", finishFn);
uassert(ErrorCodes::InvalidOptions, "includeDbPred must be provided.", includeDbPred);
diff --git a/src/mongo/db/repl/databases_cloner.h b/src/mongo/db/repl/databases_cloner.h
index b9189052446..27b9bddc5ce 100644
--- a/src/mongo/db/repl/databases_cloner.h
+++ b/src/mongo/db/repl/databases_cloner.h
@@ -46,6 +46,9 @@
#include "mongo/util/net/hostandport.h"
namespace mongo {
+
+class OldThreadPool;
+
namespace repl {
namespace {
@@ -64,6 +67,7 @@ public:
using OnFinishFn = stdx::function<void(const Status&)>;
DatabasesCloner(StorageInterface* si,
executor::TaskExecutor* exec,
+ OldThreadPool* dbWorkThreadPool,
HostAndPort source,
IncludeDbFilterFn includeDbPred,
OnFinishFn finishFn);
@@ -112,8 +116,9 @@ private:
mutable stdx::mutex _mutex; // (S)
Status _status{ErrorCodes::NotYetInitialized, ""}; // (M) If it is not OK, we stop everything.
executor::TaskExecutor* _exec; // (R) executor to schedule things with
- HostAndPort _source; // (R) The source to use, until we get an error
- bool _active = false; // (M) false until we start
+ OldThreadPool* _dbWorkThreadPool; // (R) db worker thread pool for collection cloning.
+ HostAndPort _source; // (R) The source to use, until we get an error
+ bool _active = false; // (M) false until we start
std::vector<std::shared_ptr<DatabaseCloner>> _databaseCloners; // (M) database cloners by name
int _clonersActive = 0; // (M) Number of active cloners left.
std::unique_ptr<RemoteCommandRetryScheduler> _listDBsScheduler; // (M) scheduler for listDBs.
diff --git a/src/mongo/db/repl/databases_cloner_test.cpp b/src/mongo/db/repl/databases_cloner_test.cpp
index 23e24ad28c6..391baf59dbf 100644
--- a/src/mongo/db/repl/databases_cloner_test.cpp
+++ b/src/mongo/db/repl/databases_cloner_test.cpp
@@ -48,13 +48,13 @@
#include "mongo/executor/network_interface_mock.h"
#include "mongo/executor/thread_pool_task_executor_test_fixture.h"
#include "mongo/stdx/mutex.h"
+#include "mongo/unittest/barrier.h"
+#include "mongo/unittest/unittest.h"
+#include "mongo/util/concurrency/old_thread_pool.h"
#include "mongo/util/concurrency/thread_name.h"
#include "mongo/util/fail_point_service.h"
#include "mongo/util/mongoutils/str.h"
-#include "mongo/unittest/barrier.h"
-#include "mongo/unittest/unittest.h"
-
namespace {
using namespace mongo;
using namespace mongo::repl;
@@ -86,12 +86,17 @@ struct StorageInterfaceResults {
class DBsClonerTest : public executor::ThreadPoolExecutorTest {
public:
- DBsClonerTest() : _storageInterface{} {}
+ DBsClonerTest()
+ : _storageInterface{}, _dbWorkThreadPool{OldThreadPool::DoNotStartThreadsTag(), 1} {}
StorageInterface& getStorage() {
return _storageInterface;
}
+ OldThreadPool& getDbWorkThreadPool() {
+ return _dbWorkThreadPool;
+ }
+
void scheduleNetworkResponse(std::string cmdName, const BSONObj& obj) {
NetworkInterfaceMock* net = getNet();
if (!net->hasReadyRequests()) {
@@ -184,9 +189,16 @@ protected:
return StatusWith<std::unique_ptr<CollectionBulkLoader>>(
std::unique_ptr<CollectionBulkLoader>(collInfo->loader));
};
+
+ _dbWorkThreadPool.startThreads();
}
void tearDown() override {
+ executor::ThreadPoolExecutorTest::shutdownExecutorThread();
+ executor::ThreadPoolExecutorTest::joinExecutorThread();
+
+ _dbWorkThreadPool.join();
+
executor::ThreadPoolExecutorTest::tearDown();
}
@@ -282,6 +294,7 @@ protected:
stdx::condition_variable cvDone;
DatabasesCloner cloner{&getStorage(),
&getExecutor(),
+ &getDbWorkThreadPool(),
HostAndPort{"local:1234"},
[](const BSONObj&) { return true; },
[&](const Status& status) {
@@ -307,6 +320,7 @@ protected:
private:
StorageInterfaceMock _storageInterface;
+ OldThreadPool _dbWorkThreadPool;
std::map<NamespaceString, CollectionMockStats> _collectionStats;
std::map<NamespaceString, CollectionCloneInfo> _collections;
StorageInterfaceResults _storageInterfaceWorkDone;
@@ -315,11 +329,65 @@ private:
// TODO: Move tests here from data_replicator_test here and figure out
// how to script common data (dbs, collections, indexes) scenarios w/failures.
+TEST_F(DBsClonerTest, InvalidConstruction) {
+ HostAndPort source{"local:1234"};
+ auto includeDbPred = [](const BSONObj&) { return true; };
+ auto finishFn = [](const Status&) {};
+
+ // Null storage interface.
+ ASSERT_THROWS_CODE_AND_WHAT(
+ DatabasesCloner(
+ nullptr, &getExecutor(), &getDbWorkThreadPool(), source, includeDbPred, finishFn),
+ UserException,
+ ErrorCodes::InvalidOptions,
+ "storage interface must be provided.");
+
+ // Null task executor.
+ ASSERT_THROWS_CODE_AND_WHAT(
+ DatabasesCloner(
+ &getStorage(), nullptr, &getDbWorkThreadPool(), source, includeDbPred, finishFn),
+ UserException,
+ ErrorCodes::InvalidOptions,
+ "executor must be provided.");
+
+ // Null db worker thread pool.
+ ASSERT_THROWS_CODE_AND_WHAT(
+ DatabasesCloner(&getStorage(), &getExecutor(), nullptr, source, includeDbPred, finishFn),
+ UserException,
+ ErrorCodes::InvalidOptions,
+ "db worker thread pool must be provided.");
+
+ // Empty source.
+ ASSERT_THROWS_CODE_AND_WHAT(
+ DatabasesCloner(
+ &getStorage(), &getExecutor(), &getDbWorkThreadPool(), {}, includeDbPred, finishFn),
+ UserException,
+ ErrorCodes::InvalidOptions,
+ "source must be provided.");
+
+ // Null include database predicate.
+ ASSERT_THROWS_CODE_AND_WHAT(
+ DatabasesCloner(
+ &getStorage(), &getExecutor(), &getDbWorkThreadPool(), source, {}, finishFn),
+ UserException,
+ ErrorCodes::InvalidOptions,
+ "includeDbPred must be provided.");
+
+ // Null finish callback.
+ ASSERT_THROWS_CODE_AND_WHAT(
+ DatabasesCloner(
+ &getStorage(), &getExecutor(), &getDbWorkThreadPool(), source, includeDbPred, {}),
+ UserException,
+ ErrorCodes::InvalidOptions,
+ "finishFn must be provided.");
+}
+
TEST_F(DBsClonerTest, FailsOnListDatabases) {
Status result{Status::OK()};
Status expectedResult{ErrorCodes::BadValue, "foo"};
DatabasesCloner cloner{&getStorage(),
&getExecutor(),
+ &getDbWorkThreadPool(),
HostAndPort{"local:1234"},
[](const BSONObj&) { return true; },
[&result](const Status& status) {
@@ -340,6 +408,7 @@ TEST_F(DBsClonerTest, FailsOnListCollectionsOnOnlyDatabase) {
Status result{Status::OK()};
DatabasesCloner cloner{&getStorage(),
&getExecutor(),
+ &getDbWorkThreadPool(),
HostAndPort{"local:1234"},
[](const BSONObj&) { return true; },
[&result](const Status& status) {
@@ -368,6 +437,7 @@ TEST_F(DBsClonerTest, FailsOnListCollectionsOnFirstOfTwoDatabases) {
Status expectedStatus{ErrorCodes::NoSuchKey, "fake"};
DatabasesCloner cloner{&getStorage(),
&getExecutor(),
+ &getDbWorkThreadPool(),
HostAndPort{"local:1234"},
[](const BSONObj&) { return true; },
[&result](const Status& status) {