/** * Copyright 2015 MongoDB Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License, version 3, * as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . * * As a special exception, the copyright holders give permission to link the * code of portions of this program with the OpenSSL library under certain * conditions as described in each individual source file and distribute * linked combinations including the program with the OpenSSL library. You * must comply with the GNU Affero General Public License in all respects for * all of the code used other than as permitted herein. If you modify file(s) * with this exception, you may extend this exception to your version of the * file(s), but you are not obligated to do so. If you do not wish to do so, * delete this exception statement from your version. If you delete this * exception statement from all source files in the program, then also delete * it in the license file. */ #include "mongo/platform/basic.h" #include "mongo/db/repl/base_cloner_test_fixture.h" #include #include "mongo/db/jsobj.h" #include "mongo/stdx/thread.h" namespace mongo { namespace repl { using executor::RemoteCommandRequest; using executor::RemoteCommandResponse; const HostAndPort BaseClonerTest::target("localhost", -1); const NamespaceString BaseClonerTest::nss("db.coll"); const BSONObj BaseClonerTest::idIndexSpec = BSON("v" << 1 << "key" << BSON("_id" << 1) << "name" << "_id_" << "ns" << nss.ns()); // static BSONObj BaseClonerTest::createCursorResponse(CursorId cursorId, const std::string& ns, const BSONArray& docs, const char* batchFieldName) { return BSON("cursor" << BSON("id" << cursorId << "ns" << ns << batchFieldName << docs) << "ok" << 1); } // static BSONObj BaseClonerTest::createCursorResponse(CursorId cursorId, const BSONArray& docs, const char* batchFieldName) { return createCursorResponse(cursorId, nss.toString(), docs, batchFieldName); } // static BSONObj BaseClonerTest::createCursorResponse(CursorId cursorId, const BSONArray& docs) { return createCursorResponse(cursorId, docs, "firstBatch"); } // static BSONObj BaseClonerTest::createListCollectionsResponse(CursorId cursorId, const BSONArray& colls, const char* fieldName) { return createCursorResponse(cursorId, "test.$cmd.listCollections.coll", colls, fieldName); } // static BSONObj BaseClonerTest::createListCollectionsResponse(CursorId cursorId, const BSONArray& colls) { return createListCollectionsResponse(cursorId, colls, "firstBatch"); } // static BSONObj BaseClonerTest::createListIndexesResponse(CursorId cursorId, const BSONArray& specs, const char* batchFieldName) { return createCursorResponse(cursorId, "test.$cmd.listIndexes.coll", specs, batchFieldName); } // static BSONObj BaseClonerTest::createListIndexesResponse(CursorId cursorId, const BSONArray& specs) { return createListIndexesResponse(cursorId, specs, "firstBatch"); } BaseClonerTest::BaseClonerTest() : _mutex(), _setStatusCondition(), _status(getDetectableErrorStatus()) {} void BaseClonerTest::setUp() { ReplicationExecutorTest::setUp(); clear(); launchExecutorThread(); storageInterface.reset(new ClonerStorageInterfaceMock()); } void BaseClonerTest::tearDown() { ReplicationExecutorTest::tearDown(); storageInterface.reset(); } void BaseClonerTest::clear() { _status = getDetectableErrorStatus(); } void BaseClonerTest::setStatus(const Status& status) { stdx::unique_lock lk(_mutex); _status = status; _setStatusCondition.notify_all(); } const Status& BaseClonerTest::getStatus() const { stdx::unique_lock lk(_mutex); return _status; } void BaseClonerTest::scheduleNetworkResponse(NetworkOperationIterator noi, const BSONObj& obj) { auto net = getNet(); Milliseconds millis(0); RemoteCommandResponse response(obj, BSONObj(), millis); ReplicationExecutor::ResponseStatus responseStatus(response); net->scheduleResponse(noi, net->now(), responseStatus); } void BaseClonerTest::scheduleNetworkResponse(NetworkOperationIterator noi, ErrorCodes::Error code, const std::string& reason) { auto net = getNet(); ReplicationExecutor::ResponseStatus responseStatus(code, reason); net->scheduleResponse(noi, net->now(), responseStatus); } void BaseClonerTest::scheduleNetworkResponse(const BSONObj& obj) { ASSERT_TRUE(getNet()->hasReadyRequests()); scheduleNetworkResponse(getNet()->getNextReadyRequest(), obj); } void BaseClonerTest::scheduleNetworkResponse(ErrorCodes::Error code, const std::string& reason) { ASSERT_TRUE(getNet()->hasReadyRequests()); scheduleNetworkResponse(getNet()->getNextReadyRequest(), code, reason); } void BaseClonerTest::processNetworkResponse(const BSONObj& obj) { scheduleNetworkResponse(obj); finishProcessingNetworkResponse(); } void BaseClonerTest::processNetworkResponse(ErrorCodes::Error code, const std::string& reason) { scheduleNetworkResponse(code, reason); finishProcessingNetworkResponse(); } void BaseClonerTest::finishProcessingNetworkResponse() { clear(); getNet()->runReadyNetworkOperations(); } void BaseClonerTest::testLifeCycle() { // GetDiagnosticString ASSERT_FALSE(getCloner()->getDiagnosticString().empty()); // IsActiveAfterStart ASSERT_FALSE(getCloner()->isActive()); ASSERT_OK(getCloner()->start()); ASSERT_TRUE(getCloner()->isActive()); tearDown(); // StartWhenActive setUp(); ASSERT_OK(getCloner()->start()); ASSERT_TRUE(getCloner()->isActive()); ASSERT_NOT_OK(getCloner()->start()); ASSERT_TRUE(getCloner()->isActive()); tearDown(); // CancelWithoutStart setUp(); ASSERT_FALSE(getCloner()->isActive()); getCloner()->cancel(); ASSERT_FALSE(getCloner()->isActive()); tearDown(); // WaitWithoutStart setUp(); ASSERT_FALSE(getCloner()->isActive()); getCloner()->wait(); ASSERT_FALSE(getCloner()->isActive()); tearDown(); // ShutdownBeforeStart setUp(); getExecutor().shutdown(); ASSERT_NOT_OK(getCloner()->start()); ASSERT_FALSE(getCloner()->isActive()); tearDown(); // StartAndCancel setUp(); ASSERT_OK(getCloner()->start()); scheduleNetworkResponse(BSON("ok" << 1)); getCloner()->cancel(); finishProcessingNetworkResponse(); ASSERT_EQUALS(ErrorCodes::CallbackCanceled, getStatus().code()); ASSERT_FALSE(getCloner()->isActive()); tearDown(); // StartButShutdown setUp(); ASSERT_OK(getCloner()->start()); scheduleNetworkResponse(BSON("ok" << 1)); getExecutor().shutdown(); // Network interface should not deliver mock response to callback. finishProcessingNetworkResponse(); ASSERT_EQUALS(ErrorCodes::CallbackCanceled, getStatus().code()); ASSERT_FALSE(getCloner()->isActive()); } Status ClonerStorageInterfaceMock::beginCollection(OperationContext* txn, const NamespaceString& nss, const CollectionOptions& options, const std::vector& specs) { return beginCollectionFn ? beginCollectionFn(txn, nss, options, specs) : Status::OK(); } Status ClonerStorageInterfaceMock::insertDocuments(OperationContext* txn, const NamespaceString& nss, const std::vector& docs) { return insertDocumentsFn ? insertDocumentsFn(txn, nss, docs) : Status::OK(); } Status ClonerStorageInterfaceMock::commitCollection(OperationContext* txn, const NamespaceString& nss) { return Status::OK(); } Status ClonerStorageInterfaceMock::insertMissingDoc(OperationContext* txn, const NamespaceString& nss, const BSONObj& doc) { return Status::OK(); } Status ClonerStorageInterfaceMock::dropUserDatabases(OperationContext* txn) { return dropUserDatabasesFn ? dropUserDatabasesFn(txn) : Status::OK(); } } // namespace repl } // namespace mongo