summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBenety Goh <benety@mongodb.com>2015-04-13 16:57:54 -0400
committerBenety Goh <benety@mongodb.com>2015-04-21 11:14:01 -0400
commit61dd7ef93ab33b2a379be1caaf07b244d67343fb (patch)
treee63d5c2dc6a712bc7be7acfdf8175b634dd2c858
parent6b4074051eeafc3ebeb9d85828eb24d098e3192d (diff)
downloadmongo-61dd7ef93ab33b2a379be1caaf07b244d67343fb.tar.gz
SERVER-18015 SERVER-17894 data replication collection cloner cleanup
-rw-r--r--src/mongo/db/repl/SConscript2
-rw-r--r--src/mongo/db/repl/collection_cloner.cpp11
-rw-r--r--src/mongo/db/repl/collection_cloner.h57
-rw-r--r--src/mongo/db/repl/collection_cloner_test.cpp252
4 files changed, 57 insertions, 265 deletions
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript
index 280b31a2a4e..af38224af98 100644
--- a/src/mongo/db/repl/SConscript
+++ b/src/mongo/db/repl/SConscript
@@ -326,6 +326,6 @@ env.CppUnitTest(
source='collection_cloner_test.cpp',
LIBDEPS=[
'collection_cloner',
- 'replication_executor_test_fixture',
+ 'base_cloner_test_fixture',
],
)
diff --git a/src/mongo/db/repl/collection_cloner.cpp b/src/mongo/db/repl/collection_cloner.cpp
index 56066be614b..3a89ece4d4b 100644
--- a/src/mongo/db/repl/collection_cloner.cpp
+++ b/src/mongo/db/repl/collection_cloner.cpp
@@ -83,11 +83,14 @@ namespace repl {
uassert(ErrorCodes::BadValue, "null replication executor", executor);
uassert(ErrorCodes::BadValue, "invalid collection namespace: " + sourceNss.ns(),
sourceNss.isValid());
- uassert(ErrorCodes::BadValue, "null storage interface", storageInterface);
uassertStatusOK(options.validate());
+ uassert(ErrorCodes::BadValue, "callback function cannot be null", work);
+ uassert(ErrorCodes::BadValue, "null storage interface", storageInterface);
}
- CollectionCloner::~CollectionCloner() { }
+ const NamespaceString& CollectionCloner::getSourceNamespace() const {
+ return _sourceNss;
+ }
std::string CollectionCloner::getDiagnosticString() const {
boost::lock_guard<boost::mutex> lk(_mutex);
@@ -172,7 +175,7 @@ namespace repl {
}
}
- void CollectionCloner::setScheduleDbWorkFn(ScheduleDbWorkFn scheduleDbWorkFn) {
+ void CollectionCloner::setScheduleDbWorkFn(const ScheduleDbWorkFn& scheduleDbWorkFn) {
boost::lock_guard<boost::mutex> lk(_mutex);
_scheduleDbWorkFn = scheduleDbWorkFn;
@@ -293,7 +296,5 @@ namespace repl {
_work(Status::OK());
}
- CollectionCloner::StorageInterface::~StorageInterface() { }
-
} // namespace repl
} // namespace mongo
diff --git a/src/mongo/db/repl/collection_cloner.h b/src/mongo/db/repl/collection_cloner.h
index 5823e0bf96d..98d62ff527f 100644
--- a/src/mongo/db/repl/collection_cloner.h
+++ b/src/mongo/db/repl/collection_cloner.h
@@ -38,6 +38,7 @@
#include "mongo/bson/bsonobj.h"
#include "mongo/db/catalog/collection_options.h"
#include "mongo/db/namespace_string.h"
+#include "mongo/db/repl/base_cloner.h"
#include "mongo/db/repl/fetcher.h"
#include "mongo/db/repl/replication_executor.h"
#include "mongo/stdx/functional.h"
@@ -46,7 +47,7 @@
namespace mongo {
namespace repl {
- class CollectionCloner {
+ class CollectionCloner : public BaseCloner {
MONGO_DISALLOW_COPYING(CollectionCloner);
public:
@@ -58,20 +59,14 @@ namespace repl {
class StorageInterface;
/**
- * Fetcher callback function to report final status of collection cloning.
- */
- typedef stdx::function<void (const Status&)> CallbackFn;
-
- /**
* Type of function to schedule database work with the executor.
*
* Must be consistent with ReplicationExecutor::scheduleWorkWithGlobalExclusiveLock().
*
* Used for testing only.
*/
- typedef stdx::function<
- StatusWith<ReplicationExecutor::CallbackHandle> (
- const ReplicationExecutor::CallbackFn&)> ScheduleDbWorkFn;
+ using ScheduleDbWorkFn = stdx::function<StatusWith<ReplicationExecutor::CallbackHandle> (
+ const ReplicationExecutor::CallbackFn&)>;
/**
* Creates CollectionCloner task in inactive state. Use start() to activate cloner.
@@ -89,45 +84,23 @@ namespace repl {
const CallbackFn& work,
StorageInterface* storageInterface);
- virtual ~CollectionCloner();
+ virtual ~CollectionCloner() = default;
- /**
- * Returns diagnostic information.
- */
- std::string getDiagnosticString() const;
+ const NamespaceString& getSourceNamespace() const;
- /**
- * Returns true if the cloner has been started (but has not completed).
- */
- bool isActive() const;
+ std::string getDiagnosticString() const override;
- /**
- * Starts collection cloning by scheduling initial command to be run by the executor.
- */
- Status start();
+ bool isActive() const override;
- /**
- * Cancels current remote command request.
- * Returns immediately if collection cloner is not active.
- *
- * If the cloner is canceled after start() has been called, '_work' will be invoked
- * with a ErrorCodes::CallbackCanceled status.
- */
- void cancel();
+ Status start() override;
+
+ void cancel() override;
//
// Testing only functions below.
//
- /**
- * Waits for active remote commands and database worker to complete.
- * Returns immediately if collection cloner is not active.
- *
- * TODO: Internal state not sufficiently protected for production use.
- *
- * For testing only.
- */
- void wait();
+ void wait() override;
/**
* Waits for database worker to complete.
@@ -142,7 +115,7 @@ namespace repl {
*
* For testing only.
*/
- void setScheduleDbWorkFn(ScheduleDbWorkFn scheduleDbWorkFn);
+ void setScheduleDbWorkFn(const ScheduleDbWorkFn& scheduleDbWorkFn);
private:
@@ -193,7 +166,7 @@ namespace repl {
// Owned by us.
std::unique_ptr<StorageInterface> _storageInterface;
- // Protects member data of this Fetcher.
+ // Protects member data of this collection cloner.
mutable boost::mutex _mutex;
// _active is true when Collection Cloner is started.
@@ -232,7 +205,7 @@ namespace repl {
/**
* When the storage interface is destroyed, it will commit the index builder.
*/
- virtual ~StorageInterface();
+ virtual ~StorageInterface() = default;
/**
* Creates a collection with the provided indexes.
diff --git a/src/mongo/db/repl/collection_cloner_test.cpp b/src/mongo/db/repl/collection_cloner_test.cpp
index b00aeaa2b6d..8beac905dc5 100644
--- a/src/mongo/db/repl/collection_cloner_test.cpp
+++ b/src/mongo/db/repl/collection_cloner_test.cpp
@@ -33,9 +33,9 @@
#include "mongo/db/commands.h"
#include "mongo/db/jsobj.h"
+#include "mongo/db/repl/base_cloner_test_fixture.h"
#include "mongo/db/repl/collection_cloner.h"
#include "mongo/db/repl/network_interface_mock.h"
-#include "mongo/db/repl/replication_executor_test_fixture.h"
#include "mongo/unittest/unittest.h"
namespace {
@@ -43,187 +43,46 @@ namespace {
using namespace mongo;
using namespace mongo::repl;
- typedef NetworkInterfaceMock::NetworkOperationIterator NetworkOperationIterator;
-
- const HostAndPort target("localhost", -1);
- const NamespaceString nss("db.coll");
- const BSONObj idIndexSpec = BSON("v" << 1 <<
- "key" << BSON("_id" << 1) <<
- "name" << "_id_" <<
- "ns" << nss.ns());
-
- /**
- * Creates a cursor response with given array of documents.
- */
- BSONObj createCursorResponse(CursorId cursorId,
- const std::string& ns,
- const BSONArray& docs,
- const char* batchFieldName) {
- return BSON("cursor" << BSON("id" << cursorId <<
- "ns" << ns <<
- batchFieldName << docs) <<
- "ok" << 1);
- }
-
- BSONObj createCursorResponse(CursorId cursorId,
- const BSONArray& docs,
- const char* batchFieldName) {
- return createCursorResponse(cursorId, nss.toString(), docs, batchFieldName);
- }
-
- BSONObj createCursorResponse(CursorId cursorId,
- const BSONArray& docs) {
- return createCursorResponse(cursorId, docs, "firstBatch");
- }
-
- /**
- * Creates a listIndexes response with given array of index specs.
- */
- BSONObj createListIndexesResponse(CursorId cursorId,
- const BSONArray& specs,
- const char* batchFieldName) {
- return createCursorResponse(cursorId, "test.$cmd.listIndexes.coll", specs, batchFieldName);
- }
-
- BSONObj createListIndexesResponse(CursorId cursorId, const BSONArray& specs) {
- return createListIndexesResponse(cursorId, specs, "firstBatch");
- }
-
- class StorageInterfaceMock : public CollectionCloner::StorageInterface {
+ class CollectionClonerTest : public BaseClonerTest {
public:
- Status beginCollection(OperationContext* txn,
- const NamespaceString& nss,
- const CollectionOptions& options,
- const std::vector<BSONObj>& specs) override {
- return beginCollectionFn ? beginCollectionFn(txn, nss, options, specs) : Status::OK();
- }
-
- Status insertDocuments(OperationContext* txn,
- const NamespaceString& ns,
- const std::vector<BSONObj>& docs) override {
- return insertDocumentsFn ? insertDocumentsFn(txn, ns, docs) : Status::OK();
- }
-
- stdx::function<Status (OperationContext*,
- const NamespaceString&,
- const CollectionOptions&,
- const std::vector<BSONObj>&)> beginCollectionFn;
-
- stdx::function<Status (OperationContext*,
- const NamespaceString&,
- const std::vector<BSONObj>&)> insertDocumentsFn;
- };
-
- class CollectionClonerTest : public ReplicationExecutorTest {
- public:
- static Status getDefaultStatus();
CollectionClonerTest();
+
void setUp() override;
void tearDown() override;
- void clear();
- void scheduleNetworkResponse(NetworkOperationIterator noi,
- const BSONObj& obj);
- void scheduleNetworkResponse(NetworkOperationIterator noi,
- ErrorCodes::Error code, const std::string& reason);
- void scheduleNetworkResponse(const BSONObj& obj);
- void scheduleNetworkResponse(ErrorCodes::Error code, const std::string& reason);
- void processNetworkResponse(const BSONObj& obj);
- void processNetworkResponse(ErrorCodes::Error code, const std::string& reason);
- void finishProcessingNetworkResponse();
+ BaseCloner* getCloner() const override;
protected:
CollectionOptions options;
- Status status;
StorageInterfaceMock* storageInterface;
std::unique_ptr<CollectionCloner> collectionCloner;
-
- private:
- void _callback(const Status& status);
};
- Status CollectionClonerTest::getDefaultStatus() {
- return Status(ErrorCodes::InternalError, "Not mutated");
- }
-
CollectionClonerTest::CollectionClonerTest()
: options(),
- status(getDefaultStatus()),
+ storageInterface(nullptr),
collectionCloner() { }
void CollectionClonerTest::setUp() {
- ReplicationExecutorTest::setUp();
+ BaseClonerTest::setUp();
options.reset();
options.storageEngine = BSON("storageEngine1" << BSONObj());
- clear();
storageInterface = new StorageInterfaceMock();
collectionCloner.reset(new CollectionCloner(&getExecutor(), target, nss, options,
- stdx::bind(&CollectionClonerTest::_callback,
+ stdx::bind(&CollectionClonerTest::setStatus,
this,
stdx::placeholders::_1),
storageInterface));
- launchExecutorThread();
}
void CollectionClonerTest::tearDown() {
- ReplicationExecutorTest::tearDown();
+ BaseClonerTest::tearDown();
// Executor may still invoke collection cloner's callback before shutting down.
collectionCloner.reset();
options.reset();
}
- void CollectionClonerTest::clear() {
- status = getDefaultStatus();
- }
-
- void CollectionClonerTest::scheduleNetworkResponse(NetworkOperationIterator noi,
- const BSONObj& obj) {
-
- auto net = getNet();
- ReplicationExecutor::Milliseconds millis(0);
- ReplicationExecutor::RemoteCommandResponse response(obj, millis);
- ReplicationExecutor::ResponseStatus responseStatus(response);
- net->scheduleResponse(noi, net->now(), responseStatus);
- }
-
- void CollectionClonerTest::scheduleNetworkResponse(NetworkOperationIterator noi,
- ErrorCodes::Error code,
- const std::string& reason) {
-
- auto net = getNet();
- ReplicationExecutor::ResponseStatus responseStatus(code, reason);
- net->scheduleResponse(noi, net->now(), responseStatus);
- }
-
- void CollectionClonerTest::scheduleNetworkResponse(const BSONObj& obj) {
- ASSERT_TRUE(getNet()->hasReadyRequests());
- scheduleNetworkResponse(getNet()->getNextReadyRequest(), obj);
- }
-
- void CollectionClonerTest::scheduleNetworkResponse(ErrorCodes::Error code,
- const std::string& reason) {
- ASSERT_TRUE(getNet()->hasReadyRequests());
- scheduleNetworkResponse(getNet()->getNextReadyRequest(), code, reason);
- }
-
- void CollectionClonerTest::processNetworkResponse(const BSONObj& obj) {
- scheduleNetworkResponse(obj);
- finishProcessingNetworkResponse();
- }
-
- void CollectionClonerTest::processNetworkResponse(ErrorCodes::Error code,
- const std::string& reason) {
- scheduleNetworkResponse(code, reason);
- finishProcessingNetworkResponse();
- }
-
- void CollectionClonerTest::finishProcessingNetworkResponse() {
- clear();
- ASSERT_TRUE(collectionCloner->isActive());
- getNet()->runReadyNetworkOperations();
- }
-
- void CollectionClonerTest::_callback(const Status& theStatus) {
- status = theStatus;
+ BaseCloner* CollectionClonerTest::getCloner() const {
+ return collectionCloner.get();
}
TEST_F(CollectionClonerTest, InvalidConstruction) {
@@ -257,59 +116,18 @@ namespace {
ASSERT_THROWS(CollectionCloner(&executor, target, nss, invalidOptions, cb, si),
UserException);
}
- }
-
- TEST_F(CollectionClonerTest, GetDiagnosticString) {
- ASSERT_FALSE(collectionCloner->getDiagnosticString().empty());
- }
-
- TEST_F(CollectionClonerTest, IsActiveAfterStart) {
- ASSERT_FALSE(collectionCloner->isActive());
- ASSERT_OK(collectionCloner->start());
- ASSERT_TRUE(collectionCloner->isActive());
- }
-
- TEST_F(CollectionClonerTest, StartWhenActive) {
- ASSERT_OK(collectionCloner->start());
- ASSERT_TRUE(collectionCloner->isActive());
- ASSERT_NOT_OK(collectionCloner->start());
- }
-
- TEST_F(CollectionClonerTest, CancelWithoutStart) {
- ASSERT_FALSE(collectionCloner->isActive());
- collectionCloner->cancel();
- }
- TEST_F(CollectionClonerTest, WaitWithoutStart) {
- ASSERT_FALSE(collectionCloner->isActive());
- collectionCloner->wait();
- }
-
- TEST_F(CollectionClonerTest, ShutdownBeforeStart) {
- getExecutor().shutdown();
- ASSERT_NOT_OK(collectionCloner->start());
- ASSERT_FALSE(collectionCloner->isActive());
- }
-
- TEST_F(CollectionClonerTest, StartAndCancel) {
- ASSERT_OK(collectionCloner->start());
- scheduleNetworkResponse(BSON("ok" << 1));
-
- collectionCloner->cancel();
- finishProcessingNetworkResponse();
-
- ASSERT_EQUALS(ErrorCodes::CallbackCanceled, status.code());
+ // Callback function cannot be null.
+ {
+ CollectionCloner::CallbackFn nullCb;
+ CollectionCloner::StorageInterface* si = new StorageInterfaceMock();
+ ASSERT_THROWS(CollectionCloner(&executor, target, nss, options, nullCb, si),
+ UserException);
+ }
}
- TEST_F(CollectionClonerTest, StartButShutdown) {
- ASSERT_OK(collectionCloner->start());
- scheduleNetworkResponse(BSON("ok" << 1));
-
- getExecutor().shutdown();
- // Network interface should not deliver mock response to callback.
- finishProcessingNetworkResponse();
-
- ASSERT_EQUALS(ErrorCodes::CallbackCanceled, status.code());
+ TEST_F(CollectionClonerTest, ClonerLifeCycle) {
+ testLifeCycle();
}
TEST_F(CollectionClonerTest, FirstRemoteCommand) {
@@ -332,7 +150,7 @@ namespace {
processNetworkResponse(
BSON("ok" << 0 << "errmsg" << "" << "code" << ErrorCodes::NamespaceNotFound));
- ASSERT_EQUALS(ErrorCodes::NamespaceNotFound, status.code());
+ ASSERT_EQUALS(ErrorCodes::NamespaceNotFound, getStatus().code());
ASSERT_FALSE(collectionCloner->isActive());
}
@@ -345,7 +163,7 @@ namespace {
// the cloner stops the fetcher from retrieving more results.
processNetworkResponse(createListIndexesResponse(1, BSONArray()));
- ASSERT_EQUALS(getDefaultStatus(), status);
+ ASSERT_EQUALS(getDefaultStatus(), getStatus());
ASSERT_TRUE(collectionCloner->isActive());
ASSERT_TRUE(getNet()->hasReadyRequests());
@@ -362,7 +180,7 @@ namespace {
processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec)));
- ASSERT_EQUALS(ErrorCodes::UnknownError, status.code());
+ ASSERT_EQUALS(ErrorCodes::UnknownError, getStatus().code());
ASSERT_FALSE(collectionCloner->isActive());
}
@@ -384,7 +202,7 @@ namespace {
processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec)));
collectionCloner->waitForDbWorker();
- ASSERT_EQUALS(ErrorCodes::CallbackCanceled, status.code());
+ ASSERT_EQUALS(ErrorCodes::CallbackCanceled, getStatus().code());
ASSERT_FALSE(collectionCloner->isActive());
}
@@ -402,7 +220,7 @@ namespace {
collectionCloner->waitForDbWorker();
- ASSERT_EQUALS(ErrorCodes::OperationFailed, status.code());
+ ASSERT_EQUALS(ErrorCodes::OperationFailed, getStatus().code());
ASSERT_FALSE(collectionCloner->isActive());
}
@@ -433,7 +251,7 @@ namespace {
processNetworkResponse(createListIndexesResponse(1, BSON_ARRAY(specs[0] << specs[1])));
// 'status' should not be modified because cloning is not finished.
- ASSERT_EQUALS(getDefaultStatus(), status);
+ ASSERT_EQUALS(getDefaultStatus(), getStatus());
ASSERT_TRUE(collectionCloner->isActive());
processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(specs[2]), "nextBatch"));
@@ -441,7 +259,7 @@ namespace {
collectionCloner->waitForDbWorker();
// 'status' will be set if listIndexes fails.
- ASSERT_EQUALS(getDefaultStatus(), status);
+ ASSERT_EQUALS(getDefaultStatus(), getStatus());
ASSERT_EQUALS(nss.ns(), collNss.ns());
ASSERT_EQUALS(options.toBSON(), collOptions.toBSON());
@@ -474,7 +292,7 @@ namespace {
collectionCloner->waitForDbWorker();
ASSERT_TRUE(collectionCreated);
- ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, status.code());
+ ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, getStatus().code());
ASSERT_FALSE(collectionCloner->isActive());
}
@@ -517,7 +335,7 @@ namespace {
processNetworkResponse(
BSON("ok" << 0 << "errmsg" << "" << "code" << ErrorCodes::CursorNotFound));
- ASSERT_EQUALS(ErrorCodes::CursorNotFound, status.code());
+ ASSERT_EQUALS(ErrorCodes::CursorNotFound, getStatus().code());
ASSERT_FALSE(collectionCloner->isActive());
}
@@ -537,7 +355,7 @@ namespace {
net->runReadyNetworkOperations();
- ASSERT_EQUALS(ErrorCodes::CallbackCanceled, status.code());
+ ASSERT_EQUALS(ErrorCodes::CallbackCanceled, getStatus().code());
ASSERT_FALSE(collectionCloner->isActive());
}
@@ -557,7 +375,7 @@ namespace {
const BSONObj doc = BSON("_id" << 1);
processNetworkResponse(createCursorResponse(0, BSON_ARRAY(doc)));
- ASSERT_EQUALS(ErrorCodes::UnknownError, status.code());
+ ASSERT_EQUALS(ErrorCodes::UnknownError, getStatus().code());
ASSERT_FALSE(collectionCloner->isActive());
}
@@ -584,7 +402,7 @@ namespace {
processNetworkResponse(createCursorResponse(0, BSON_ARRAY(doc)));
collectionCloner->waitForDbWorker();
- ASSERT_EQUALS(ErrorCodes::CallbackCanceled, status.code());
+ ASSERT_EQUALS(ErrorCodes::CallbackCanceled, getStatus().code());
ASSERT_FALSE(collectionCloner->isActive());
}
@@ -607,7 +425,7 @@ namespace {
collectionCloner->wait();
- ASSERT_EQUALS(ErrorCodes::OperationFailed, status.code());
+ ASSERT_EQUALS(ErrorCodes::OperationFailed, getStatus().code());
ASSERT_FALSE(collectionCloner->isActive());
}
@@ -634,7 +452,7 @@ namespace {
ASSERT_EQUALS(1U, collDocuments.size());
ASSERT_EQUALS(doc, collDocuments[0]);
- ASSERT_OK(status);
+ ASSERT_OK(getStatus());
ASSERT_FALSE(collectionCloner->isActive());
}
@@ -661,7 +479,7 @@ namespace {
ASSERT_EQUALS(1U, collDocuments.size());
ASSERT_EQUALS(doc, collDocuments[0]);
- ASSERT_EQUALS(getDefaultStatus(), status);
+ ASSERT_EQUALS(getDefaultStatus(), getStatus());
ASSERT_TRUE(collectionCloner->isActive());
const BSONObj doc2 = BSON("_id" << 1);
@@ -671,7 +489,7 @@ namespace {
ASSERT_EQUALS(1U, collDocuments.size());
ASSERT_EQUALS(doc2, collDocuments[0]);
- ASSERT_OK(status);
+ ASSERT_OK(getStatus());
ASSERT_FALSE(collectionCloner->isActive());
}