/**
* Copyright (C) 2015 MongoDB Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see .
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#include "mongo/platform/basic.h"

#include <iosfwd>
#include <memory>
#include <ostream>

#include "mongo/client/fetcher.h"
#include "mongo/db/client.h"
#include "mongo/db/commands/feature_compatibility_version_parser.h"
#include "mongo/db/json.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/query/getmore_request.h"
#include "mongo/db/repl/base_cloner_test_fixture.h"
#include "mongo/db/repl/data_replicator_external_state_mock.h"
#include "mongo/db/repl/initial_syncer.h"
#include "mongo/db/repl/member_state.h"
#include "mongo/db/repl/oplog_entry.h"
#include "mongo/db/repl/oplog_fetcher.h"
#include "mongo/db/repl/optime.h"
#include "mongo/db/repl/replication_consistency_markers_mock.h"
#include "mongo/db/repl/replication_process.h"
#include "mongo/db/repl/replication_recovery_mock.h"
#include "mongo/db/repl/reporter.h"
#include "mongo/db/repl/storage_interface.h"
#include "mongo/db/repl/storage_interface_mock.h"
#include "mongo/db/repl/sync_source_resolver.h"
#include "mongo/db/repl/sync_source_selector.h"
#include "mongo/db/repl/sync_source_selector_mock.h"
#include "mongo/db/repl/task_executor_mock.h"
#include "mongo/db/repl/update_position_args.h"
#include "mongo/executor/network_interface_mock.h"
#include "mongo/executor/thread_pool_task_executor_test_fixture.h"
#include "mongo/stdx/mutex.h"
#include "mongo/unittest/barrier.h"
#include "mongo/unittest/unittest.h"
#include "mongo/util/concurrency/thread_name.h"
#include "mongo/util/concurrency/thread_pool.h"
#include "mongo/util/fail_point_service.h"
#include "mongo/util/mongoutils/str.h"
#include "mongo/util/scopeguard.h"
namespace mongo {
namespace repl {
/**
 * Insertion operator for InitialSyncer::State. Formats initial syncer state for output stream.
 */
std::ostream& operator<<(std::ostream& os, const InitialSyncer::State& state) {
    // Resolve the enumerator to its display name, then stream it. Any value outside the
    // enumeration is a programming error.
    const char* name = nullptr;
    switch (state) {
        case InitialSyncer::State::kPreStart:
            name = "PreStart";
            break;
        case InitialSyncer::State::kRunning:
            name = "Running";
            break;
        case InitialSyncer::State::kShuttingDown:
            name = "ShuttingDown";
            break;
        case InitialSyncer::State::kComplete:
            name = "Complete";
            break;
    }
    if (name) {
        return os << name;
    }
    MONGO_UNREACHABLE;
}
} // namespace repl
} // namespace mongo
namespace {
using namespace mongo;
using namespace mongo::repl;
using executor::NetworkInterfaceMock;
using executor::RemoteCommandRequest;
using executor::RemoteCommandResponse;
using unittest::log;
// Lock aliases over the stdx mutex used to guard _storageInterfaceWorkDone.
// (The stripped template arguments are restored here: both aliases are over stdx::mutex.)
using LockGuard = stdx::lock_guard<stdx::mutex>;
using NetworkGuard = executor::NetworkInterfaceMock::InNetworkGuard;
using UniqueLock = stdx::unique_lock<stdx::mutex>;
// Per-collection bookkeeping for collections created through the mock storage interface's
// bulk-loader factory during a simulated initial sync.
struct CollectionCloneInfo {
    CollectionMockStats stats;
    // Raw pointer to the mock loader handed back from createCollectionForBulkFn;
    // NOTE(review): ownership appears to transfer to the caller's unique_ptr - confirm.
    CollectionBulkLoaderMock* loader = nullptr;
    Status status{ErrorCodes::NotYetInitialized, ""};
};
/**
 * Test fixture for InitialSyncer. Wires the syncer to a mock task executor/network, a mock
 * storage interface that records every storage action into _storageInterfaceWorkDone, and a
 * mock sync source selector. The fixture itself implements SyncSourceSelector by delegating
 * to the mock so tests can script sync source selection results.
 *
 * All stripped template-argument lists from the garbled extraction are restored below
 * (stdx::make_unique<T>, std::vector<T>, StatusWith<OpTime>, std::unique_ptr<T> members, ...).
 */
class InitialSyncerTest : public executor::ThreadPoolExecutorTest, public SyncSourceSelector {
public:
    InitialSyncerTest() {}

    executor::ThreadPoolMock::Options makeThreadPoolMockOptions() const override;

    /**
     * clear/reset state
     */
    void reset() {
        _setMyLastOptime = [this](const OpTime& opTime,
                                  ReplicationCoordinator::DataConsistency consistency) {
            _myLastOpTime = opTime;
        };
        _myLastOpTime = OpTime();
        _syncSourceSelector = stdx::make_unique<SyncSourceSelectorMock>();
    }

    // SyncSourceSelector - all overrides delegate to the mock selector.
    void clearSyncSourceBlacklist() override {
        _syncSourceSelector->clearSyncSourceBlacklist();
    }
    HostAndPort chooseNewSyncSource(const OpTime& ot) override {
        return _syncSourceSelector->chooseNewSyncSource(ot);
    }
    void blacklistSyncSource(const HostAndPort& host, Date_t until) override {
        _syncSourceSelector->blacklistSyncSource(host, until);
    }
    bool shouldChangeSyncSource(const HostAndPort& currentSource,
                                const rpc::ReplSetMetadata& replMetadata,
                                boost::optional<rpc::OplogQueryMetadata> oqMetadata) override {
        return _syncSourceSelector->shouldChangeSyncSource(currentSource, replMetadata, oqMetadata);
    }

    /**
     * Verifies the next ready network request carries command 'cmdName', then schedules the
     * successful response 'obj' for it.
     */
    void scheduleNetworkResponse(std::string cmdName, const BSONObj& obj) {
        NetworkInterfaceMock* net = getNet();
        if (!net->hasReadyRequests()) {
            log() << "The network doesn't have a request to process for this response: " << obj;
        }
        verifyNextRequestCommandName(cmdName);
        scheduleNetworkResponse(net->getNextReadyRequest(), obj);
    }

    /**
     * Schedules 'obj' as the successful response for the given network operation.
     */
    void scheduleNetworkResponse(NetworkInterfaceMock::NetworkOperationIterator noi,
                                 const BSONObj& obj) {
        NetworkInterfaceMock* net = getNet();
        Milliseconds millis(0);
        RemoteCommandResponse response(obj, BSONObj(), millis);
        log() << "Sending response for network request:";
        log() << " req: " << noi->getRequest().dbname << "." << noi->getRequest().cmdObj;
        log() << " resp:" << response;
        net->scheduleResponse(noi, net->now(), response);
    }

    /**
     * Verifies the next ready network request carries command 'cmdName', then schedules
     * 'errorStatus' as its (failed) response.
     */
    void scheduleNetworkResponse(std::string cmdName, Status errorStatus) {
        NetworkInterfaceMock* net = getNet();
        if (!getNet()->hasReadyRequests()) {
            log() << "The network doesn't have a request to process for the error: " << errorStatus;
        }
        verifyNextRequestCommandName(cmdName);
        net->scheduleResponse(net->getNextReadyRequest(), net->now(), errorStatus);
    }

    void processNetworkResponse(std::string cmdName, const BSONObj& obj) {
        scheduleNetworkResponse(cmdName, obj);
        finishProcessingNetworkResponse();
    }

    void processNetworkResponse(std::string cmdName, Status errorStatus) {
        scheduleNetworkResponse(cmdName, errorStatus);
        finishProcessingNetworkResponse();
    }

    /**
     * Schedules and processes a successful response to the network request sent by InitialSyncer's
     * last oplog entry fetcher. Also validates the find command arguments in the request.
     */
    void processSuccessfulLastOplogEntryFetcherResponse(std::vector<BSONObj> docs);

    /**
     * Schedules and processes a successful response to the network request sent by InitialSyncer's
     * feature compatibility version fetcher. Includes the 'docs' provided in the response.
     */
    void processSuccessfulFCVFetcherResponse(std::vector<BSONObj> docs);

    /**
     * Schedules and processes a successful response to the network request sent by InitialSyncer's
     * feature compatibility version fetcher. Always includes a valid fCV=3.6 document in the
     * response.
     */
    void processSuccessfulFCVFetcherResponse36();

    /**
     * Runs ready network operations and asserts that no further requests remain queued.
     */
    void finishProcessingNetworkResponse() {
        getNet()->runReadyNetworkOperations();
        if (getNet()->hasReadyRequests()) {
            log() << "The network has unexpected requests to process, next req:";
            NetworkInterfaceMock::NetworkOperation req = *getNet()->getNextReadyRequest();
            log() << req.getDiagnosticString();
        }
        ASSERT_FALSE(getNet()->hasReadyRequests());
    }

    InitialSyncer& getInitialSyncer() {
        return *_initialSyncer;
    }

    DataReplicatorExternalStateMock* getExternalState() {
        return _externalState;
    }

    StorageInterface& getStorage() {
        return *_storageInterface;
    }

protected:
    // Flags/counters recorded by the mock storage interface callbacks so tests can verify
    // which storage actions the initial syncer performed.
    struct StorageInterfaceResults {
        bool createOplogCalled = false;
        bool truncateCalled = false;
        bool insertedOplogEntries = false;
        int oplogEntriesInserted = 0;
        bool droppedUserDBs = false;
        std::vector<std::string> droppedCollections;
        int documentsInsertedCount = 0;
        bool uniqueIndexUpdated = false;
        bool upgradeNonReplicatedUniqueIndexesShouldFail = false;
    };

    stdx::mutex _storageInterfaceWorkDoneMutex;  // protects _storageInterfaceWorkDone.
    StorageInterfaceResults _storageInterfaceWorkDone;

    void setUp() override {
        executor::ThreadPoolExecutorTest::setUp();
        _storageInterface = stdx::make_unique<StorageInterfaceMock>();
        _storageInterface->createOplogFn = [this](OperationContext* opCtx,
                                                  const NamespaceString& nss) {
            LockGuard lock(_storageInterfaceWorkDoneMutex);
            _storageInterfaceWorkDone.createOplogCalled = true;
            _storageInterfaceWorkDone.uniqueIndexUpdated = false;
            _storageInterfaceWorkDone.upgradeNonReplicatedUniqueIndexesShouldFail = false;
            return Status::OK();
        };
        _storageInterface->truncateCollFn = [this](OperationContext* opCtx,
                                                   const NamespaceString& nss) {
            LockGuard lock(_storageInterfaceWorkDoneMutex);
            _storageInterfaceWorkDone.truncateCalled = true;
            return Status::OK();
        };
        _storageInterface->insertDocumentFn = [this](OperationContext* opCtx,
                                                     const NamespaceStringOrUUID& nsOrUUID,
                                                     const TimestampedBSONObj& doc,
                                                     long long term) {
            LockGuard lock(_storageInterfaceWorkDoneMutex);
            ++_storageInterfaceWorkDone.documentsInsertedCount;
            return Status::OK();
        };
        _storageInterface->insertDocumentsFn = [this](OperationContext* opCtx,
                                                      const NamespaceStringOrUUID& nsOrUUID,
                                                      const std::vector<InsertStatement>& ops) {
            LockGuard lock(_storageInterfaceWorkDoneMutex);
            _storageInterfaceWorkDone.insertedOplogEntries = true;
            ++_storageInterfaceWorkDone.oplogEntriesInserted;
            return Status::OK();
        };
        _storageInterface->dropCollFn = [this](OperationContext* opCtx,
                                               const NamespaceString& nss) {
            LockGuard lock(_storageInterfaceWorkDoneMutex);
            _storageInterfaceWorkDone.droppedCollections.push_back(nss.ns());
            return Status::OK();
        };
        _storageInterface->dropUserDBsFn = [this](OperationContext* opCtx) {
            LockGuard lock(_storageInterfaceWorkDoneMutex);
            _storageInterfaceWorkDone.droppedUserDBs = true;
            return Status::OK();
        };
        _storageInterface->createCollectionForBulkFn =
            [this](const NamespaceString& nss,
                   const CollectionOptions& options,
                   const BSONObj idIndexSpec,
                   const std::vector<BSONObj>& secondaryIndexSpecs) {
                // Get collection info from map.
                const auto collInfo = &_collections[nss];
                if (collInfo->stats.initCalled) {
                    log() << "reusing collection during test which may cause problems, ns:" << nss;
                }
                (collInfo->loader = new CollectionBulkLoaderMock(&collInfo->stats))
                    ->init(secondaryIndexSpecs)
                    .transitional_ignore();

                return StatusWith<std::unique_ptr<CollectionBulkLoader>>(
                    std::unique_ptr<CollectionBulkLoader>(collInfo->loader));
            };
        _storageInterface->upgradeNonReplicatedUniqueIndexesFn = [this](OperationContext* opCtx) {
            LockGuard lock(_storageInterfaceWorkDoneMutex);
            if (_storageInterfaceWorkDone.upgradeNonReplicatedUniqueIndexesShouldFail) {
                // One of the status codes a failed upgradeNonReplicatedUniqueIndexes call
                // can return is NamespaceNotFound.
                return Status(ErrorCodes::NamespaceNotFound,
                              "upgradeNonReplicatedUniqueIndexes failed because the desired "
                              "ns was not found.");
            } else {
                _storageInterfaceWorkDone.uniqueIndexUpdated = true;
                return Status::OK();
            }
        };

        _dbWorkThreadPool = stdx::make_unique<ThreadPool>(ThreadPool::Options());
        _dbWorkThreadPool->startup();

        Client::initThreadIfNotAlready();
        reset();

        launchExecutorThread();

        _replicationProcess = stdx::make_unique<ReplicationProcess>(
            _storageInterface.get(),
            stdx::make_unique<ReplicationConsistencyMarkersMock>(),
            stdx::make_unique<ReplicationRecoveryMock>());

        _executorProxy = stdx::make_unique<TaskExecutorMock>(&getExecutor());

        _myLastOpTime = OpTime({3, 0}, 1);

        InitialSyncerOptions options;
        options.initialSyncRetryWait = Milliseconds(1);
        options.getMyLastOptime = [this]() { return _myLastOpTime; };
        options.setMyLastOptime = [this](const OpTime& opTime,
                                         ReplicationCoordinator::DataConsistency consistency) {
            _setMyLastOptime(opTime, consistency);
        };
        options.resetOptimes = [this]() { _myLastOpTime = OpTime(); };
        options.syncSourceSelector = this;

        _options = options;

        ThreadPool::Options threadPoolOptions;
        threadPoolOptions.poolName = "replication";
        threadPoolOptions.minThreads = 1U;
        threadPoolOptions.maxThreads = 1U;
        threadPoolOptions.onCreateThread = [](const std::string& threadName) {
            Client::initThread(threadName.c_str());
        };

        auto dataReplicatorExternalState = stdx::make_unique<DataReplicatorExternalStateMock>();
        dataReplicatorExternalState->taskExecutor = _executorProxy.get();
        dataReplicatorExternalState->currentTerm = 1LL;
        dataReplicatorExternalState->lastCommittedOpTime = _myLastOpTime;
        {
            ReplSetConfig config;
            ASSERT_OK(config.initialize(BSON("_id"
                                             << "myset"
                                             << "version"
                                             << 1
                                             << "protocolVersion"
                                             << 1
                                             << "members"
                                             << BSON_ARRAY(BSON("_id" << 0 << "host"
                                                                      << "localhost:12345"))
                                             << "settings"
                                             << BSON("electionTimeoutMillis" << 10000))));
            dataReplicatorExternalState->replSetConfigResult = config;
        }
        _externalState = dataReplicatorExternalState.get();

        _lastApplied = getDetectableErrorStatus();
        _onCompletion = [this](const StatusWith<OpTime>& lastApplied) {
            _lastApplied = lastApplied;
        };

        try {
            // When creating InitialSyncer, we wrap _onCompletion so that we can override the
            // InitialSyncer's callback behavior post-construction.
            // See InitialSyncerTransitionsToCompleteWhenFinishCallbackThrowsException.
            _initialSyncer = stdx::make_unique<InitialSyncer>(
                options,
                std::move(dataReplicatorExternalState),
                _dbWorkThreadPool.get(),
                _storageInterface.get(),
                _replicationProcess.get(),
                [this](const StatusWith<OpTime>& lastApplied) { _onCompletion(lastApplied); });
            _initialSyncer->setScheduleDbWorkFn_forTest(
                [this](const executor::TaskExecutor::CallbackFn& work) {
                    return getExecutor().scheduleWork(work);
                });

        } catch (...) {
            ASSERT_OK(exceptionToStatus());
        }
    }

    // Idempotent: shuts down and joins the executor thread exactly once.
    void tearDownExecutorThread() {
        if (_executorThreadShutdownComplete) {
            return;
        }
        getExecutor().shutdown();
        getExecutor().join();
        _executorThreadShutdownComplete = true;
    }

    void tearDown() override {
        tearDownExecutorThread();
        _initialSyncer.reset();
        _dbWorkThreadPool.reset();
        _replicationProcess.reset();
        _storageInterface.reset();
    }

    /**
     * Note: An empty cmdName will skip validation.
     */
    void verifyNextRequestCommandName(std::string cmdName) {
        const auto net = getNet();
        ASSERT_TRUE(net->hasReadyRequests());

        if (cmdName != "") {
            const NetworkInterfaceMock::NetworkOperationIterator req =
                net->getFrontOfUnscheduledQueue();
            const BSONObj reqBSON = req->getRequest().cmdObj;
            const BSONElement cmdElem = reqBSON.firstElement();
            auto reqCmdName = cmdElem.fieldNameStringData();
            ASSERT_EQ(cmdName, reqCmdName);
        }
    }

    void runInitialSyncWithBadFCVResponse(std::vector<BSONObj> docs,
                                          ErrorCodes::Error expectedError);
    void doSuccessfulInitialSyncWithOneBatch(bool shouldSetFCV);
    OplogEntry doInitialSyncWithOneBatch(bool shouldSetFCV);

    std::unique_ptr<TaskExecutorMock> _executorProxy;

    InitialSyncerOptions _options;
    InitialSyncerOptions::SetMyLastOptimeFn _setMyLastOptime;
    OpTime _myLastOpTime;
    std::unique_ptr<SyncSourceSelectorMock> _syncSourceSelector;
    std::unique_ptr<StorageInterfaceMock> _storageInterface;
    std::unique_ptr<ReplicationProcess> _replicationProcess;
    std::unique_ptr<ThreadPool> _dbWorkThreadPool;
    std::map<NamespaceString, CollectionMockStats> _collectionStats;
    std::map<NamespaceString, CollectionCloneInfo> _collections;

    StatusWith<OpTime> _lastApplied = Status(ErrorCodes::NotYetInitialized, "");
    InitialSyncer::OnCompletionFn _onCompletion;

private:
    DataReplicatorExternalStateMock* _externalState;
    std::unique_ptr<InitialSyncer> _initialSyncer;
    bool _executorThreadShutdownComplete = false;
};
// Configures the mock thread pool so its worker registers a Client named "InitialSyncerTest"
// when the thread is created.
executor::ThreadPoolMock::Options InitialSyncerTest::makeThreadPoolMockOptions() const {
    executor::ThreadPoolMock::Options options;
    options.onCreateThread = []() { Client::initThread("InitialSyncerTest"); };
    return options;
}
// Advances the mock network's virtual clock by 'duration' and asserts that it stopped
// exactly at the requested deadline.
void advanceClock(NetworkInterfaceMock* net, Milliseconds duration) {
    executor::NetworkInterfaceMock::InNetworkGuard guard(net);
    const auto deadline = net->now() + duration;
    ASSERT_EQUALS(deadline, net->runUntil(deadline));
}
// Creates an OperationContext on the current thread's Client for use within a single test.
ServiceContext::UniqueOperationContext makeOpCtx() {
    return cc().makeOperationContext();
}
/**
 * Generates a successful replSetGetRBID command response carrying the given rollback id.
 */
BSONObj makeRollbackCheckerResponse(int rollbackId) {
    return BSON("ok" << 1 << "rbid" << rollbackId);
}
/**
 * Generates a cursor response for a Fetcher to consume.
 *
 * 'docs' are returned in the "firstBatch" array when 'isFirstBatch' is true, otherwise in
 * "nextBatch". The response also carries OplogQueryMetadata built from a far-future optime
 * and the given 'rbid'. (Restores the stripped std::vector<BSONObj> element type.)
 */
RemoteCommandResponse makeCursorResponse(CursorId cursorId,
                                         const NamespaceString& nss,
                                         std::vector<BSONObj> docs,
                                         bool isFirstBatch = true,
                                         int rbid = 1) {
    OpTime futureOpTime(Timestamp(1000, 1000), 1000);
    rpc::OplogQueryMetadata oqMetadata(futureOpTime, futureOpTime, rbid, 0, 0);
    BSONObjBuilder metadataBob;
    ASSERT_OK(oqMetadata.writeToMetadata(&metadataBob));
    auto metadataObj = metadataBob.obj();

    BSONObjBuilder bob;
    {
        BSONObjBuilder cursorBob(bob.subobjStart("cursor"));
        cursorBob.append("id", cursorId);
        cursorBob.append("ns", nss.toString());
        {
            BSONArrayBuilder batchBob(
                cursorBob.subarrayStart(isFirstBatch ? "firstBatch" : "nextBatch"));
            for (const auto& doc : docs) {
                batchBob.append(doc);
            }
        }
    }
    bob.append("ok", 1);
    return {bob.obj(), metadataObj, Milliseconds(0)};
}
/**
* Generates a listDatabases response for a DatabasesCloner to consume.
*/
// Builds {databases: [{name: <dbName>}, ...], ok: 1} for a DatabasesCloner to consume.
// (Restores the stripped std::vector<std::string> element type.)
BSONObj makeListDatabasesResponse(std::vector<std::string> databaseNames) {
    BSONObjBuilder bob;
    {
        BSONArrayBuilder databasesBob(bob.subarrayStart("databases"));
        for (const auto& name : databaseNames) {
            BSONObjBuilder nameBob(databasesBob.subobjStart());
            nameBob.append("name", name);
        }
    }
    bob.append("ok", 1);
    return bob.obj();
}
/**
* Generates oplog entries with the given number used for the timestamp.
*/
// Builds an oplog entry on namespace "a.a" whose optime, hash and document fields are all
// derived from 't'. Command-type entries get an arbitrary valid command object instead.
// (Restores the stripped static_cast<long long> on the hash argument.)
OplogEntry makeOplogEntry(int t,
                          OpTypeEnum opType = OpTypeEnum::kInsert,
                          int version = OplogEntry::kOplogVersion) {
    BSONObj oField = BSON("_id" << t << "a" << t);
    if (opType == OpTypeEnum::kCommand) {
        // Insert an arbitrary command name so that the oplog entry is valid.
        oField = BSON("dropIndexes"
                      << "a_1");
    }
    return OplogEntry(OpTime(Timestamp(t, 1), 1),  // optime
                      static_cast<long long>(t),   // hash
                      opType,                      // op type
                      NamespaceString("a.a"),      // namespace
                      boost::none,                 // uuid
                      boost::none,                 // fromMigrate
                      version,                     // version
                      oField,                      // o
                      boost::none,                 // o2
                      {},                          // sessionInfo
                      boost::none,                 // upsert
                      boost::none,                 // wall clock time
                      boost::none,                 // statement id
                      boost::none,  // optime of previous write within same transaction
                      boost::none,  // pre-image optime
                      boost::none); // post-image optime
}
// Convenience wrapper: returns makeOplogEntry()'s result serialized to BSON.
BSONObj makeOplogEntryObj(int t,
                          OpTypeEnum opType = OpTypeEnum::kInsert,
                          int version = OplogEntry::kOplogVersion) {
    return makeOplogEntry(t, opType, version).toBSON();
}
void InitialSyncerTest::processSuccessfulLastOplogEntryFetcherResponse(std::vector docs) {
auto net = getNet();
auto request = assertRemoteCommandNameEquals(
"find",
net->scheduleSuccessfulResponse(makeCursorResponse(0LL, _options.localOplogNS, docs)));
ASSERT_EQUALS(1, request.cmdObj.getIntField("limit"));
ASSERT_TRUE(request.cmdObj.hasField("sort"));
ASSERT_EQUALS(mongo::BSONType::Object, request.cmdObj["sort"].type());
ASSERT_BSONOBJ_EQ(BSON("$natural" << -1), request.cmdObj.getObjectField("sort"));
net->runReadyNetworkOperations();
}
// Validates that 'request' is a find on the server configuration namespace filtered to the
// featureCompatibilityVersion parameter document.
void assertFCVRequest(RemoteCommandRequest request) {
    ASSERT_EQUALS(NamespaceString::kServerConfigurationNamespace.db(), request.dbname)
        << request.toString();
    ASSERT_EQUALS(NamespaceString::kServerConfigurationNamespace.coll(),
                  request.cmdObj.getStringField("find"));
    ASSERT_BSONOBJ_EQ(BSON("_id" << FeatureCompatibilityVersionParser::kParameterName),
                      request.cmdObj.getObjectField("filter"));
}
void InitialSyncerTest::processSuccessfulFCVFetcherResponse36() {
    // A single, well-formed fCV document pinned to version 3.6.
    auto docs = {BSON("_id" << FeatureCompatibilityVersionParser::kParameterName << "version"
                            << FeatureCompatibilityVersionParser::kVersion36)};
    processSuccessfulFCVFetcherResponse(docs);
}
void InitialSyncerTest::processSuccessfulFCVFetcherResponse(std::vector docs) {
auto net = getNet();
auto request = assertRemoteCommandNameEquals(
"find",
net->scheduleSuccessfulResponse(
makeCursorResponse(0LL, NamespaceString::kServerConfigurationNamespace, docs)));
assertFCVRequest(request);
net->runReadyNetworkOperations();
}
// Constructing an InitialSyncer with a null task executor or a null completion callback must
// throw BadValue. (Restores stripped StatusWith<OpTime> and
// stdx::make_unique<DataReplicatorExternalStateMock> template arguments.)
TEST_F(InitialSyncerTest, InvalidConstruction) {
    InitialSyncerOptions options;
    options.getMyLastOptime = []() { return OpTime(); };
    options.setMyLastOptime = [](const OpTime&,
                                 ReplicationCoordinator::DataConsistency consistency) {};
    options.resetOptimes = []() {};
    options.syncSourceSelector = this;
    auto callback = [](const StatusWith<OpTime>&) {};

    // Null task executor in external state.
    {
        auto dataReplicatorExternalState = stdx::make_unique<DataReplicatorExternalStateMock>();
        ASSERT_THROWS_CODE_AND_WHAT(InitialSyncer(options,
                                                  std::move(dataReplicatorExternalState),
                                                  _dbWorkThreadPool.get(),
                                                  _storageInterface.get(),
                                                  _replicationProcess.get(),
                                                  callback),
                                    AssertionException,
                                    ErrorCodes::BadValue,
                                    "task executor cannot be null");
    }

    // Null callback function.
    {
        auto dataReplicatorExternalState = stdx::make_unique<DataReplicatorExternalStateMock>();
        dataReplicatorExternalState->taskExecutor = &getExecutor();
        ASSERT_THROWS_CODE_AND_WHAT(InitialSyncer(options,
                                                  std::move(dataReplicatorExternalState),
                                                  _dbWorkThreadPool.get(),
                                                  _storageInterface.get(),
                                                  _replicationProcess.get(),
                                                  InitialSyncer::OnCompletionFn()),
                                    AssertionException,
                                    ErrorCodes::BadValue,
                                    "callback function cannot be null");
    }
}
// Fixture setUp()/tearDown() alone must construct and destroy an InitialSyncer cleanly.
TEST_F(InitialSyncerTest, CreateDestroy) {}
// Default number of initial sync attempts passed to startup() by most tests below.
const std::uint32_t maxAttempts = 1U;
TEST_F(InitialSyncerTest, StartupReturnsIllegalOperationIfAlreadyActive) {
    auto initialSyncer = &getInitialSyncer();
    auto opCtx = makeOpCtx();
    ASSERT_FALSE(initialSyncer->isActive());
    ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));
    ASSERT_TRUE(initialSyncer->isActive());
    // A second startup() while already active is rejected and leaves the syncer running.
    ASSERT_EQUALS(ErrorCodes::IllegalOperation, initialSyncer->startup(opCtx.get(), maxAttempts));
    ASSERT_TRUE(initialSyncer->isActive());
}
TEST_F(InitialSyncerTest, StartupReturnsShutdownInProgressIfInitialSyncerIsShuttingDown) {
    auto initialSyncer = &getInitialSyncer();
    auto opCtx = makeOpCtx();
    ASSERT_FALSE(initialSyncer->isActive());
    ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));
    ASSERT_TRUE(initialSyncer->isActive());
    // SyncSourceSelector returns an invalid sync source so InitialSyncer is stuck waiting for
    // another sync source in 'Options::syncSourceRetryWait' ms.
    ASSERT_OK(initialSyncer->shutdown());
    // startup() after shutdown() has been requested must be refused.
    ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, initialSyncer->startup(opCtx.get(), maxAttempts));
}
TEST_F(InitialSyncerTest, StartupReturnsShutdownInProgressIfExecutorIsShutdown) {
    auto initialSyncer = &getInitialSyncer();
    auto opCtx = makeOpCtx();
    // Shut down the task executor before the syncer ever starts.
    getExecutor().shutdown();
    ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, initialSyncer->startup(opCtx.get(), maxAttempts));
    ASSERT_FALSE(initialSyncer->isActive());

    // Cannot startup initial syncer again since it's in the Complete state.
    ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, initialSyncer->startup(opCtx.get(), maxAttempts));
}
TEST_F(InitialSyncerTest, ShutdownTransitionsStateToCompleteIfCalledBeforeStartup) {
    auto initialSyncer = &getInitialSyncer();
    auto opCtx = makeOpCtx();
    // shutdown() before startup() is legal and moves the syncer directly to Complete.
    ASSERT_OK(initialSyncer->shutdown());
    ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, initialSyncer->startup(opCtx.get(), maxAttempts));
    // Initial syncer is inactive when it's in the Complete state.
    ASSERT_FALSE(initialSyncer->isActive());
}
TEST_F(InitialSyncerTest, StartupSetsInitialSyncFlagOnSuccess) {
    auto initialSyncer = &getInitialSyncer();
    auto opCtx = makeOpCtx();

    // Initial sync flag should not be set before starting.
    ASSERT_FALSE(_replicationProcess->getConsistencyMarkers()->getInitialSyncFlag(opCtx.get()));

    ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));
    ASSERT_TRUE(initialSyncer->isActive());

    // Initial sync flag should be set.
    ASSERT_TRUE(_replicationProcess->getConsistencyMarkers()->getInitialSyncFlag(opCtx.get()));
}
TEST_F(InitialSyncerTest, StartupSetsInitialDataTimestampAndStableTimestampOnSuccess) {
    auto initialSyncer = &getInitialSyncer();
    auto opCtx = makeOpCtx();

    // Set initial data timestamp forward first so the test can observe startup() resetting it.
    auto serviceCtx = opCtx.get()->getServiceContext();
    _storageInterface->setInitialDataTimestamp(serviceCtx, Timestamp(5, 5));
    _storageInterface->setStableTimestamp(serviceCtx, Timestamp(6, 6));

    ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));
    ASSERT_TRUE(initialSyncer->isActive());

    // startup() rewinds both timestamps: initial data to the unstable-checkpoint sentinel and
    // stable to the minimum.
    ASSERT_EQUALS(Timestamp::kAllowUnstableCheckpointsSentinel,
                  _storageInterface->getInitialDataTimestamp());
    ASSERT_EQUALS(Timestamp::min(), _storageInterface->getStableTimestamp());
}
TEST_F(InitialSyncerTest, InitialSyncerReturnsCallbackCanceledIfShutdownImmediatelyAfterStartup) {
    auto initialSyncer = &getInitialSyncer();
    auto opCtx = makeOpCtx();

    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
    ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));

    // This will cancel the _startInitialSyncAttemptCallback() task scheduled by startup().
    ASSERT_OK(initialSyncer->shutdown());

    // Depending on which InitialSyncer stage (_chooseSyncSource or _rollbackCheckerResetCallback)
    // was interrupted by shutdown(), we may have to request the network interface to deliver
    // cancellation signals to the InitialSyncer callbacks in for InitialSyncer to run to
    // completion.
    executor::NetworkInterfaceMock::InNetworkGuard(getNet())->runReadyNetworkOperations();

    initialSyncer->join();

    ASSERT_EQUALS(ErrorCodes::CallbackCanceled, _lastApplied);
}
TEST_F(InitialSyncerTest,
       InitialSyncerRetriesSyncSourceSelectionIfChooseNewSyncSourceReturnsInvalidSyncSource) {
    auto initialSyncer = &getInitialSyncer();
    auto opCtx = makeOpCtx();

    // Override chooseNewSyncSource() result in SyncSourceSelectorMock before calling startup()
    // because InitialSyncer will look for a valid sync source immediately after startup.
    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort());

    ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));

    // Run first sync source selection attempt.
    executor::NetworkInterfaceMock::InNetworkGuard(getNet())->runReadyNetworkOperations();

    // InitialSyncer will not drop user databases while looking for a valid sync source.
    ASSERT_FALSE(_storageInterfaceWorkDone.droppedUserDBs);

    // First sync source selection attempt failed. Update SyncSourceSelectorMock to return valid
    // sync source next time chooseNewSyncSource() is called.
    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));

    // Advance clock until the next sync source selection attempt.
    advanceClock(getNet(), _options.syncSourceRetryWait);

    // DataReplicator drops user databases after obtaining a valid sync source.
    ASSERT_TRUE(_storageInterfaceWorkDone.droppedUserDBs);
}
// Mirrors the server parameter numInitialSyncConnectAttempts' default used by these tests.
const std::uint32_t chooseSyncSourceMaxAttempts = 10U;
/**
* Advances executor clock so that InitialSyncer exhausts all 'chooseSyncSourceMaxAttempts' (server
* parameter numInitialSyncConnectAttempts) sync source selection attempts.
* If SyncSourceSelectorMock keeps returning an invalid sync source, InitialSyncer will retry every
* '_options.syncSourceRetryWait' ms up to a maximum of 'chooseSyncSourceMaxAttempts' attempts.
*/
void _simulateChooseSyncSourceFailure(executor::NetworkInterfaceMock* net,
                                      Milliseconds syncSourceRetryWait) {
    // The first attempt runs immediately; only (max - 1) retries are clock-driven.
    advanceClock(net, int(chooseSyncSourceMaxAttempts - 1) * syncSourceRetryWait);
}
TEST_F(
    InitialSyncerTest,
    InitialSyncerReturnsInitialSyncOplogSourceMissingIfNoValidSyncSourceCanBeFoundAfterTenFailedChooseSyncSourceAttempts) {
    auto initialSyncer = &getInitialSyncer();
    auto opCtx = makeOpCtx();

    // Override chooseNewSyncSource() result in SyncSourceSelectorMock before calling startup()
    // because InitialSyncer will look for a valid sync source immediately after startup.
    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort());

    ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));

    // Exhaust all sync source selection attempts, then the attempt itself fails.
    _simulateChooseSyncSourceFailure(getNet(), _options.syncSourceRetryWait);

    initialSyncer->join();

    ASSERT_EQUALS(ErrorCodes::InitialSyncOplogSourceMissing, _lastApplied);
}
// Confirms that InitialSyncer keeps retrying initial sync.
// Make every initial sync attempt fail early by having the sync source selector always return an
// invalid sync source.
TEST_F(InitialSyncerTest,
       InitialSyncerRetriesInitialSyncUpToMaxAttemptsAndReturnsLastAttemptError) {
    auto initialSyncer = &getInitialSyncer();
    auto opCtx = makeOpCtx();

    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort());

    const std::uint32_t initialSyncMaxAttempts = 3U;
    ASSERT_OK(initialSyncer->startup(opCtx.get(), initialSyncMaxAttempts));

    auto net = getNet();

    // Fail each attempt's sync source selection, then advance to the next attempt.
    for (std::uint32_t i = 0; i < initialSyncMaxAttempts; ++i) {
        _simulateChooseSyncSourceFailure(net, _options.syncSourceRetryWait);
        advanceClock(net, _options.initialSyncRetryWait);
    }

    initialSyncer->join();

    ASSERT_EQUALS(ErrorCodes::InitialSyncOplogSourceMissing, _lastApplied);

    // Check number of failed attempts in stats.
    auto progress = initialSyncer->getInitialSyncProgress();
    unittest::log() << "Progress after " << initialSyncMaxAttempts
                    << " failed attempts: " << progress;
    ASSERT_EQUALS(progress.getIntField("failedInitialSyncAttempts"), int(initialSyncMaxAttempts))
        << progress;
    ASSERT_EQUALS(progress.getIntField("maxFailedInitialSyncAttempts"), int(initialSyncMaxAttempts))
        << progress;
}
TEST_F(InitialSyncerTest, InitialSyncerResetsOptimesOnNewAttempt) {
    auto initialSyncer = &getInitialSyncer();
    auto opCtx = makeOpCtx();

    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort());

    // Set the last optime to an arbitrary nonzero value. The value of the 'consistency' argument
    // doesn't matter.
    auto origOptime = OpTime(Timestamp(1000, 1), 1);
    _setMyLastOptime(origOptime, ReplicationCoordinator::DataConsistency::Inconsistent);

    // Start initial sync.
    const std::uint32_t initialSyncMaxAttempts = 1U;
    ASSERT_OK(initialSyncer->startup(opCtx.get(), initialSyncMaxAttempts));

    auto net = getNet();

    // Simulate a failed initial sync attempt
    _simulateChooseSyncSourceFailure(net, _options.syncSourceRetryWait);
    advanceClock(net, _options.initialSyncRetryWait);

    initialSyncer->join();

    // Make sure the initial sync attempt reset optimes.
    ASSERT_EQUALS(OpTime(), _options.getMyLastOptime());
}
TEST_F(InitialSyncerTest,
       InitialSyncerReturnsCallbackCanceledIfShutdownWhileRetryingSyncSourceSelection) {
    auto initialSyncer = &getInitialSyncer();
    auto opCtx = makeOpCtx();

    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort());
    ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));

    auto net = getNet();
    {
        executor::NetworkInterfaceMock::InNetworkGuard guard(net);
        // Stop the clock halfway through the retry wait so the retry callback is still pending.
        auto when = net->now() + _options.syncSourceRetryWait / 2;
        ASSERT_GREATER_THAN(when, net->now());
        ASSERT_EQUALS(when, net->runUntil(when));
    }

    // This will cancel the _chooseSyncSourceCallback() task scheduled at getNet()->now() +
    // '_options.syncSourceRetryWait'.
    ASSERT_OK(initialSyncer->shutdown());

    initialSyncer->join();

    ASSERT_EQUALS(ErrorCodes::CallbackCanceled, _lastApplied);
}
TEST_F(InitialSyncerTest,
       InitialSyncerReturnsScheduleErrorIfTaskExecutorFailsToScheduleNextChooseSyncSourceCallback) {
    auto initialSyncer = &getInitialSyncer();
    auto opCtx = makeOpCtx();

    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort());
    // Make the proxy executor reject scheduleWorkAt() so the retry cannot be scheduled.
    _executorProxy->shouldFailScheduleWorkAtRequest = []() { return true; };
    ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));

    initialSyncer->join();

    ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied);
}
TEST_F(InitialSyncerTest,
       InitialSyncerReturnsScheduleErrorIfTaskExecutorFailsToScheduleNextInitialSyncAttempt) {
    auto initialSyncer = &getInitialSyncer();
    auto opCtx = makeOpCtx();

    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort());

    ASSERT_EQUALS(InitialSyncer::State::kPreStart, initialSyncer->getState_forTest());

    ASSERT_OK(initialSyncer->startup(opCtx.get(), 2U));
    ASSERT_EQUALS(InitialSyncer::State::kRunning, initialSyncer->getState_forTest());

    // Advance clock so that we run all but the last sync source callback.
    auto net = getNet();
    advanceClock(net, int(chooseSyncSourceMaxAttempts - 2) * _options.syncSourceRetryWait);

    // Last choose sync source attempt should now be scheduled. Advance clock so we fail last
    // choose sync source attempt which cause the next initial sync attempt to be scheduled.
    _executorProxy->shouldFailScheduleWorkAtRequest = []() { return true; };
    advanceClock(net, _options.syncSourceRetryWait);

    initialSyncer->join();

    ASSERT_EQUALS(InitialSyncer::State::kComplete, initialSyncer->getState_forTest());
    ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied);
}
// This test verifies that the initial syncer will still transition to a complete state even if
// the completion callback function throws an exception.
TEST_F(InitialSyncerTest, InitialSyncerTransitionsToCompleteWhenFinishCallbackThrowsException) {
    auto initialSyncer = &getInitialSyncer();
    auto opCtx = makeOpCtx();

    // Replace the completion callback with one that records the result and then throws.
    // A generic lambda is used so this compiles regardless of the exact StatusWith<T>
    // instantiation '_onCompletion' expects (the original explicit type was lost).
    _onCompletion = [this](const auto& lastApplied) {
        _lastApplied = lastApplied;
        uassert(ErrorCodes::InternalError, "", false);
    };

    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort());
    ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));
    ASSERT_OK(initialSyncer->shutdown());

    // join() returning (rather than hanging) shows the syncer reached its terminal state
    // despite the exception thrown from the completion callback.
    initialSyncer->join();

    ASSERT_EQUALS(ErrorCodes::CallbackCanceled, _lastApplied);
}
// Instrumented state object intended to be captured by a completion callback: it sets a
// caller-owned flag from its destructor so tests can detect exactly when the last
// reference to it is dropped.
class SharedCallbackState {
    MONGO_DISALLOW_COPYING(SharedCallbackState);

public:
    // 'sharedCallbackStateDestroyed' must outlive this object; it is written on destruction.
    explicit SharedCallbackState(bool* sharedCallbackStateDestroyed)
        : _destroyedFlag(sharedCallbackStateDestroyed) {}

    ~SharedCallbackState() {
        *_destroyedFlag = true;
    }

private:
    bool* _destroyedFlag;
};
// Verifies that the initial syncer releases its completion callback (and anything the callback
// captures, here a shared_ptr<SharedCallbackState>) after invoking it for the last time, so
// resources held by the callback do not outlive the syncer's active period.
TEST_F(InitialSyncerTest, InitialSyncerResetsOnCompletionCallbackFunctionPointerUponCompletion) {
    bool sharedCallbackStateDestroyed = false;
    auto sharedCallbackData = std::make_shared<SharedCallbackState>(&sharedCallbackStateDestroyed);

    decltype(_lastApplied) lastApplied = getDetectableErrorStatus();

    auto dataReplicatorExternalState = stdx::make_unique<DataReplicatorExternalStateMock>();
    dataReplicatorExternalState->taskExecutor = &getExecutor();
    auto initialSyncer = stdx::make_unique<InitialSyncer>(
        _options,
        std::move(dataReplicatorExternalState),
        _dbWorkThreadPool.get(),
        _storageInterface.get(),
        _replicationProcess.get(),
        [&lastApplied, sharedCallbackData](const auto& result) { lastApplied = result; });
    ON_BLOCK_EXIT([this]() { getExecutor().shutdown(); });

    auto opCtx = makeOpCtx();

    ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));

    // The callback (and its copy of 'sharedCallbackData') is still held by the syncer, so
    // dropping our reference must not destroy the shared state yet.
    sharedCallbackData.reset();
    ASSERT_FALSE(sharedCallbackStateDestroyed);

    ASSERT_OK(initialSyncer->shutdown());

    // Depending on which InitialSyncer stage (_chooseSyncSource or _rollbackCheckerResetCallback)
    // was interrupted by shutdown(), we may have to request the network interface to deliver
    // cancellation signals to the InitialSyncer callbacks in order for the InitialSyncer to run
    // to completion.
    executor::NetworkInterfaceMock::InNetworkGuard(getNet())->runReadyNetworkOperations();

    initialSyncer->join();

    ASSERT_EQUALS(ErrorCodes::CallbackCanceled, lastApplied);

    // InitialSyncer should reset 'InitialSyncer::_onCompletion' after running callback function
    // for the last time before becoming inactive.
    // This ensures that we release resources associated with 'InitialSyncer::_onCompletion'.
    ASSERT_TRUE(sharedCallbackStateDestroyed);
}
// The first storage actions of an attempt are truncating the local oplog and dropping user
// databases; inject a failure right after the drop so the attempt stops there, then verify
// both actions were recorded.
TEST_F(InitialSyncerTest, InitialSyncerTruncatesOplogAndDropsReplicatedDatabases) {
    // We are not interested in proceeding beyond the dropUserDB stage so we inject a failure
    // after setting '_storageInterfaceWorkDone.droppedUserDBs' to true.
    auto previousDropUserDBsFn = _storageInterface->dropUserDBsFn;
    _storageInterface->dropUserDBsFn = [previousDropUserDBsFn](OperationContext* opCtx) {
        ASSERT_OK(previousDropUserDBsFn(opCtx));
        return Status(ErrorCodes::OperationFailed, "drop userdbs failed");
    };

    auto opCtx = makeOpCtx();
    auto syncer = &getInitialSyncer();

    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
    ASSERT_OK(syncer->startup(opCtx.get(), maxAttempts));
    syncer->join();
    ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied);

    LockGuard lock(_storageInterfaceWorkDoneMutex);
    ASSERT_TRUE(_storageInterfaceWorkDone.truncateCalled);
    ASSERT_TRUE(_storageInterfaceWorkDone.droppedUserDBs);
}
// A scheduling failure for the rollback-id command must be passed through as the completion
// status; the rejected request is captured and inspected afterwards.
TEST_F(InitialSyncerTest, InitialSyncerPassesThroughGetRollbackIdScheduleError) {
    auto opCtx = makeOpCtx();
    auto syncer = &getInitialSyncer();

    // replSetGetRBID is the first remote command to be scheduled by the initial syncer after
    // creating the oplog collection. Reject every remote command and remember what was sent.
    executor::RemoteCommandRequest rejectedRequest;
    _executorProxy->shouldFailScheduleRemoteCommandRequest =
        [&rejectedRequest](const executor::RemoteCommandRequest& requestToSend) {
            rejectedRequest = requestToSend;
            return true;
        };

    HostAndPort syncSource("localhost", 12345);
    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(syncSource);
    ASSERT_OK(syncer->startup(opCtx.get(), maxAttempts));
    syncer->join();
    ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied);

    // The rejected command must have been replSetGetRBID against the admin database on the
    // chosen sync source.
    assertRemoteCommandNameEquals("replSetGetRBID", rejectedRequest);
    ASSERT_EQUALS("admin", rejectedRequest.dbname);
    ASSERT_EQUALS(syncSource, rejectedRequest.target);
}
// Verifies that shutting down the task executor while an attempt is between oplog truncation
// and the rollback-id request surfaces as a ShutdownInProgress completion status.
TEST_F(
    InitialSyncerTest,
    InitialSyncerReturnsShutdownInProgressIfSchedulingRollbackCheckerFailedDueToExecutorShutdown) {
    // The rollback id request is sent immediately after oplog truncation. We shut the task executor
    // down before returning from truncate() to make the scheduleRemoteCommand() call for
    // replSetGetRBID fail.
    auto oldTruncateCollFn = _storageInterface->truncateCollFn;
    _storageInterface->truncateCollFn = [oldTruncateCollFn, this](OperationContext* opCtx,
                                                                  const NamespaceString& nss) {
        auto status = oldTruncateCollFn(opCtx, nss);
        getExecutor().shutdown();
        return status;
    };

    auto initialSyncer = &getInitialSyncer();
    auto opCtx = makeOpCtx();

    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
    ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));
    initialSyncer->join();

    ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, _lastApplied);

    // Truncation must have happened before the injected executor shutdown took effect.
    LockGuard lock(_storageInterfaceWorkDoneMutex);
    ASSERT_TRUE(_storageInterfaceWorkDone.truncateCalled);
}
// Verifies that shutdown() while the rollback checker's replSetGetRBID request is outstanding
// cancels the request and drives the syncer through kShuttingDown to kComplete with
// CallbackCanceled.
TEST_F(InitialSyncerTest, InitialSyncerCancelsRollbackCheckerOnShutdown) {
    auto initialSyncer = &getInitialSyncer();
    auto opCtx = makeOpCtx();
    HostAndPort syncSource("localhost", 12345);
    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(syncSource);

    ASSERT_EQUALS(InitialSyncer::State::kPreStart, initialSyncer->getState_forTest());

    ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));
    ASSERT_EQUALS(InitialSyncer::State::kRunning, initialSyncer->getState_forTest());

    auto net = getNet();
    {
        executor::NetworkInterfaceMock::InNetworkGuard guard(net);
        ASSERT_TRUE(net->hasReadyRequests());
        auto noi = net->getNextReadyRequest();
        const auto& request = assertRemoteCommandNameEquals("replSetGetRBID", noi->getRequest());
        ASSERT_EQUALS("admin", request.dbname);
        ASSERT_EQUALS(syncSource, request.target);
        // Black-hole the request so it remains outstanding when we call shutdown() below.
        net->blackHole(noi);
    }

    ASSERT_OK(initialSyncer->shutdown());

    // Since we need to request the NetworkInterfaceMock to deliver the cancellation event,
    // the InitialSyncer has to be in a pre-completion state (ie. ShuttingDown).
    ASSERT_EQUALS(InitialSyncer::State::kShuttingDown, initialSyncer->getState_forTest());

    executor::NetworkInterfaceMock::InNetworkGuard(net)->runReadyNetworkOperations();

    initialSyncer->join();
    ASSERT_EQUALS(InitialSyncer::State::kComplete, initialSyncer->getState_forTest());
    ASSERT_EQUALS(ErrorCodes::CallbackCanceled, _lastApplied);
}
// An error response to the rollback checker's replSetGetRBID command must be passed through
// as the completion status.
TEST_F(InitialSyncerTest, InitialSyncerPassesThroughRollbackCheckerCallbackError) {
    auto opCtx = makeOpCtx();
    auto syncer = &getInitialSyncer();

    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
    ASSERT_OK(syncer->startup(opCtx.get(), maxAttempts));

    auto net = getNet();
    {
        executor::NetworkInterfaceMock::InNetworkGuard guard(net);

        // Fail the base rollback ID request.
        auto errorStatus =
            Status(ErrorCodes::OperationFailed, "replSetGetRBID failed at sync source");
        assertRemoteCommandNameEquals("replSetGetRBID", net->scheduleErrorResponse(errorStatus));
        net->runReadyNetworkOperations();
    }

    syncer->join();
    ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied);
}
// Verifies that an executor-level failure to schedule the last-oplog-entry "find" is passed
// through, and inspects the rejected request's shape ($natural descending sort, limit 1).
TEST_F(InitialSyncerTest, InitialSyncerPassesThroughLastOplogEntryFetcherScheduleError) {
    auto initialSyncer = &getInitialSyncer();
    auto opCtx = makeOpCtx();

    // The last oplog entry fetcher is the first component that sends a find command so we reject
    // any find commands and save the request for inspection at the end of this test case.
    executor::RemoteCommandRequest request;
    _executorProxy->shouldFailScheduleRemoteCommandRequest =
        [&request](const executor::RemoteCommandRequest& requestToSend) {
            request = requestToSend;
            return "find" == requestToSend.cmdObj.firstElement().fieldNameStringData();
        };

    HostAndPort syncSource("localhost", 12345);
    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(syncSource);
    ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));

    auto net = getNet();
    {
        executor::NetworkInterfaceMock::InNetworkGuard guard(net);

        // Base rollback ID.
        net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
        net->runReadyNetworkOperations();
    }

    initialSyncer->join();
    ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied);

    // The rejected "find" targets the oplog on the sync source and asks for the newest entry.
    ASSERT_EQUALS(syncSource, request.target);
    ASSERT_EQUALS(_options.localOplogNS.db(), request.dbname);
    assertRemoteCommandNameEquals("find", request);
    ASSERT_BSONOBJ_EQ(BSON("$natural" << -1), request.cmdObj.getObjectField("sort"));
    ASSERT_EQUALS(1, request.cmdObj.getIntField("limit"));
}
// An error response to the last-oplog-entry "find" must be passed through as the
// completion status.
TEST_F(InitialSyncerTest, InitialSyncerPassesThroughLastOplogEntryFetcherCallbackError) {
    auto opCtx = makeOpCtx();
    auto syncer = &getInitialSyncer();

    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
    ASSERT_OK(syncer->startup(opCtx.get(), maxAttempts));

    auto net = getNet();
    {
        executor::NetworkInterfaceMock::InNetworkGuard guard(net);

        // Answer the base rollback ID request successfully.
        net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
        net->runReadyNetworkOperations();

        // Fail the last-oplog-entry "find".
        auto findStatus = Status(ErrorCodes::OperationFailed, "find command failed at sync source");
        assertRemoteCommandNameEquals("find", net->scheduleErrorResponse(findStatus));
        net->runReadyNetworkOperations();
    }

    syncer->join();
    ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied);
}
// Verifies that shutdown() while the last-oplog-entry "find" is outstanding cancels the
// fetcher and completes with CallbackCanceled.
TEST_F(InitialSyncerTest, InitialSyncerCancelsLastOplogEntryFetcherOnShutdown) {
    auto initialSyncer = &getInitialSyncer();
    auto opCtx = makeOpCtx();
    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
    ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));

    auto net = getNet();
    {
        executor::NetworkInterfaceMock::InNetworkGuard guard(net);

        // Base rollback ID.
        net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
        net->runReadyNetworkOperations();

        // The last-oplog-entry fetcher's request is now pending; leave it unanswered.
        ASSERT_TRUE(net->hasReadyRequests());
    }

    ASSERT_OK(initialSyncer->shutdown());
    // Deliver the cancellation to the pending request so the syncer can run to completion.
    executor::NetworkInterfaceMock::InNetworkGuard(net)->runReadyNetworkOperations();

    initialSyncer->join();
    ASSERT_EQUALS(ErrorCodes::CallbackCanceled, _lastApplied);
}
// An empty batch from the last-oplog-entry fetcher is reported as NoMatchingDocument.
TEST_F(InitialSyncerTest,
       InitialSyncerReturnsNoMatchingDocumentIfLastOplogEntryFetcherReturnsEmptyBatchOfDocuments) {
    auto opCtx = makeOpCtx();
    auto syncer = &getInitialSyncer();

    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
    ASSERT_OK(syncer->startup(opCtx.get(), maxAttempts));

    auto net = getNet();
    {
        executor::NetworkInterfaceMock::InNetworkGuard guard(net);

        // Base rollback ID.
        net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
        net->runReadyNetworkOperations();

        // Respond to the last-oplog-entry fetcher with no documents at all.
        processSuccessfulLastOplogEntryFetcherResponse({});
    }

    syncer->join();
    ASSERT_EQUALS(ErrorCodes::NoMatchingDocument, _lastApplied);
}
// Verifies that a retriable error (HostNotFound) on the last-oplog-entry "find" causes the
// fetcher to resend the command rather than fail the attempt.
TEST_F(InitialSyncerTest,
       InitialSyncerResendsFindCommandIfLastOplogEntryFetcherReturnsRetriableError) {
    auto initialSyncer = &getInitialSyncer();
    auto opCtx = makeOpCtx();
    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
    ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));

    auto net = getNet();
    // NOTE: the guard deliberately spans the remainder of the test (no inner scope); all
    // remaining work happens on the mock network.
    executor::NetworkInterfaceMock::InNetworkGuard guard(net);

    // Base rollback ID.
    net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
    net->runReadyNetworkOperations();

    // Last oplog entry first attempt - retriable error.
    assertRemoteCommandNameEquals("find",
                                  net->scheduleErrorResponse(Status(ErrorCodes::HostNotFound, "")));
    net->runReadyNetworkOperations();

    // InitialSyncer stays active because it resends the find request for the last oplog entry.
    ASSERT_TRUE(initialSyncer->isActive());

    // Last oplog entry second attempt.
    processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(1)});
}
// A last-oplog-entry document with no fields at all is rejected with NoSuchKey.
TEST_F(InitialSyncerTest,
       InitialSyncerReturnsNoSuchKeyIfLastOplogEntryFetcherReturnsEntryWithMissingHash) {
    auto opCtx = makeOpCtx();
    auto syncer = &getInitialSyncer();

    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
    ASSERT_OK(syncer->startup(opCtx.get(), maxAttempts));

    auto net = getNet();
    {
        executor::NetworkInterfaceMock::InNetworkGuard guard(net);

        // Base rollback ID.
        net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
        net->runReadyNetworkOperations();

        // Respond with an empty document, which is missing the hash field.
        processSuccessfulLastOplogEntryFetcherResponse({BSONObj()});
    }

    syncer->join();
    ASSERT_EQUALS(ErrorCodes::NoSuchKey, _lastApplied);
}
// A last-oplog-entry document containing only the hash ("h") field — and therefore no
// timestamp — is rejected with NoSuchKey.
TEST_F(InitialSyncerTest,
       InitialSyncerReturnsNoSuchKeyIfLastOplogEntryFetcherReturnsEntryWithMissingTimestamp) {
    auto opCtx = makeOpCtx();
    auto syncer = &getInitialSyncer();

    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
    ASSERT_OK(syncer->startup(opCtx.get(), maxAttempts));

    auto net = getNet();
    {
        executor::NetworkInterfaceMock::InNetworkGuard guard(net);

        // Base rollback ID.
        net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
        net->runReadyNetworkOperations();

        // Respond with a document that has a hash but no timestamp.
        processSuccessfulLastOplogEntryFetcherResponse({BSON("h" << 1LL)});
    }

    syncer->join();
    ASSERT_EQUALS(ErrorCodes::NoSuchKey, _lastApplied);
}
// Verifies that an error from the external state's replica set config accessor is passed
// through once the rollback ID, last oplog entry, and fCV checks have all succeeded.
TEST_F(InitialSyncerTest,
       InitialSyncerPassesThroughErrorFromDataReplicatorExternalStateGetCurrentConfig) {
    auto initialSyncer = &getInitialSyncer();
    auto opCtx = makeOpCtx();

    // Inject the config error that should become the completion status.
    getExternalState()->replSetConfigResult = Status(ErrorCodes::OperationFailed, "");

    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
    ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));

    auto net = getNet();
    {
        executor::NetworkInterfaceMock::InNetworkGuard guard(net);

        // Base rollback ID.
        net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
        net->runReadyNetworkOperations();

        // Last oplog entry.
        processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(1)});

        // Feature Compatibility Version.
        processSuccessfulFCVFetcherResponse36();
    }

    initialSyncer->join();
    ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied);
}
// Verifies that an executor-level failure to schedule the fCV "find" (the one targeting the
// server configuration collection) is passed through, and inspects the rejected request.
TEST_F(InitialSyncerTest, InitialSyncerPassesThroughFCVFetcherScheduleError) {
    auto initialSyncer = &getInitialSyncer();
    auto opCtx = makeOpCtx();

    // We reject the first find command that is on the fCV collection.
    executor::RemoteCommandRequest request;
    _executorProxy->shouldFailScheduleRemoteCommandRequest =
        [&request](const executor::RemoteCommandRequest& requestToSend) {
            request = requestToSend;
            // A find command's first element value is the target collection name.
            return "find" == requestToSend.cmdObj.firstElement().fieldNameStringData() &&
                NamespaceString::kServerConfigurationNamespace.coll() ==
                requestToSend.cmdObj.firstElement().str();
        };

    HostAndPort syncSource("localhost", 12345);
    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(syncSource);
    ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));

    auto net = getNet();
    {
        executor::NetworkInterfaceMock::InNetworkGuard guard(net);

        // Base rollback ID.
        net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
        net->runReadyNetworkOperations();

        // Last oplog entry.
        processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(1)});
    }

    initialSyncer->join();
    ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied);
    ASSERT_EQUALS(syncSource, request.target);
    assertFCVRequest(request);
}
// Verifies that an error response to the fCV "find" is passed through as the completion status.
TEST_F(InitialSyncerTest, InitialSyncerPassesThroughFCVFetcherCallbackError) {
    auto initialSyncer = &getInitialSyncer();
    auto opCtx = makeOpCtx();
    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
    ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));

    auto net = getNet();
    {
        executor::NetworkInterfaceMock::InNetworkGuard guard(net);

        // Base rollback ID.
        net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
        net->runReadyNetworkOperations();

        // Last oplog entry.
        processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(1)});

        // Fail the fCV "find" and confirm the failed request really was the fCV fetch.
        auto request = assertRemoteCommandNameEquals(
            "find",
            net->scheduleErrorResponse(
                Status(ErrorCodes::OperationFailed, "find command failed at sync source")));
        assertFCVRequest(request);
        net->runReadyNetworkOperations();
    }

    initialSyncer->join();
    ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied);
}
// shutdown() while the fCV "find" is outstanding must cancel the fetcher and complete with
// CallbackCanceled.
TEST_F(InitialSyncerTest, InitialSyncerCancelsFCVFetcherOnShutdown) {
    auto opCtx = makeOpCtx();
    auto syncer = &getInitialSyncer();

    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
    ASSERT_OK(syncer->startup(opCtx.get(), maxAttempts));

    auto net = getNet();
    {
        executor::NetworkInterfaceMock::InNetworkGuard guard(net);

        // Base rollback ID.
        net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
        net->runReadyNetworkOperations();

        // Last oplog entry.
        processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(1)});

        // The fCV fetcher's request should now be pending; leave it unanswered.
        ASSERT_TRUE(net->hasReadyRequests());
    }

    ASSERT_OK(syncer->shutdown());
    // Deliver the cancellation to the pending request so the syncer can finish.
    executor::NetworkInterfaceMock::InNetworkGuard(net)->runReadyNetworkOperations();

    syncer->join();
    ASSERT_EQUALS(ErrorCodes::CallbackCanceled, _lastApplied);
}
// Verifies that a retriable error (HostNotFound) on the fCV "find" causes the fetcher to
// resend the command rather than fail the attempt.
TEST_F(InitialSyncerTest, InitialSyncerResendsFindCommandIfFCVFetcherReturnsRetriableError) {
    auto initialSyncer = &getInitialSyncer();
    auto opCtx = makeOpCtx();
    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
    ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));

    auto net = getNet();
    // NOTE: the guard deliberately spans the remainder of the test (no inner scope).
    executor::NetworkInterfaceMock::InNetworkGuard guard(net);

    // Base rollback ID.
    net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
    net->runReadyNetworkOperations();

    // Last oplog entry.
    processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(1)});

    // FCV first attempt - retriable error.
    assertRemoteCommandNameEquals("find",
                                  net->scheduleErrorResponse(Status(ErrorCodes::HostNotFound, "")));
    net->runReadyNetworkOperations();

    // InitialSyncer stays active because it resends the find request for the fCV.
    ASSERT_TRUE(initialSyncer->isActive());

    // FCV second attempt.
    processSuccessfulFCVFetcherResponse36();
}
void InitialSyncerTest::runInitialSyncWithBadFCVResponse(std::vector docs,
ErrorCodes::Error expectedError) {
auto initialSyncer = &getInitialSyncer();
auto opCtx = makeOpCtx();
_syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));
auto net = getNet();
{
executor::NetworkInterfaceMock::InNetworkGuard guard(net);
// Base rollback ID.
net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
net->runReadyNetworkOperations();
// Last oplog entry.
processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(1)});
processSuccessfulFCVFetcherResponse(docs);
}
initialSyncer->join();
ASSERT_EQUALS(expectedError, _lastApplied);
}
// An empty fCV fetcher batch (no fCV document on the sync source) must fail initial sync
// with IncompatibleServerVersion.
TEST_F(InitialSyncerTest,
       InitialSyncerReturnsIncompatibleServerVersionWhenFCVFetcherReturnsEmptyBatchOfDocuments) {
    runInitialSyncWithBadFCVResponse(std::vector<BSONObj>(), ErrorCodes::IncompatibleServerVersion);
}
// More than one document in the fCV fetcher response is ambiguous and must be rejected with
// TooManyMatchingDocuments.
TEST_F(InitialSyncerTest,
       InitialSyncerReturnsTooManyMatchingDocumentsWhenFCVFetcherReturnsMultipleDocuments) {
    auto docs = {BSON("_id" << FeatureCompatibilityVersionParser::kParameterName << "version"
                            << FeatureCompatibilityVersionParser::kVersion36),
                 BSON("_id"
                      << "other")};
    runInitialSyncWithBadFCVResponse(docs, ErrorCodes::TooManyMatchingDocuments);
}
// A 'targetVersion' field newer than 'version' indicates an in-progress FCV upgrade on the
// sync source; initial sync must not proceed in that state.
TEST_F(InitialSyncerTest,
       InitialSyncerReturnsIncompatibleServerVersionWhenFCVFetcherReturnsUpgradeTargetVersion) {
    auto docs = {BSON("_id" << FeatureCompatibilityVersionParser::kParameterName << "version"
                            << FeatureCompatibilityVersionParser::kVersion36
                            << "targetVersion"
                            << FeatureCompatibilityVersionParser::kVersion40)};
    runInitialSyncWithBadFCVResponse(docs, ErrorCodes::IncompatibleServerVersion);
}
// A 'targetVersion' equal to 'version' indicates an in-progress FCV downgrade on the sync
// source; initial sync must not proceed in that state either.
TEST_F(InitialSyncerTest,
       InitialSyncerReturnsIncompatibleServerVersionWhenFCVFetcherReturnsDowngradeTargetVersion) {
    auto docs = {BSON("_id" << FeatureCompatibilityVersionParser::kParameterName << "version"
                            << FeatureCompatibilityVersionParser::kVersion36
                            << "targetVersion"
                            << FeatureCompatibilityVersionParser::kVersion36)};
    runInitialSyncWithBadFCVResponse(docs, ErrorCodes::IncompatibleServerVersion);
}
// A fCV document without a 'version' field is malformed and must be rejected with BadValue.
TEST_F(InitialSyncerTest, InitialSyncerReturnsBadValueWhenFCVFetcherReturnsNoVersion) {
    auto docs = {BSON("_id" << FeatureCompatibilityVersionParser::kParameterName << "targetVersion"
                            << FeatureCompatibilityVersionParser::kVersion36)};
    runInitialSyncWithBadFCVResponse(docs, ErrorCodes::BadValue);
}
// Verifies that a fCV document at the old (3.6) version with no 'targetVersion' is accepted
// and initial sync proceeds past the fCV check.
TEST_F(InitialSyncerTest, InitialSyncerSucceedsWhenFCVFetcherReturnsOldVersion) {
    auto initialSyncer = &getInitialSyncer();
    auto opCtx = makeOpCtx();
    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
    ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));

    auto net = getNet();
    {
        executor::NetworkInterfaceMock::InNetworkGuard guard(net);

        // Base rollback ID.
        net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
        net->runReadyNetworkOperations();

        // Last oplog entry.
        processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(1)});

        auto docs = {BSON("_id" << FeatureCompatibilityVersionParser::kParameterName << "version"
                                << FeatureCompatibilityVersionParser::kVersion36)};
        processSuccessfulFCVFetcherResponse(docs);

        // The fCV check passed: the next stage's request is already pending.
        ASSERT_TRUE(net->hasReadyRequests());
    }

    // We shut it down so we do not have to finish initial sync. If the fCV fetcher got an error,
    // we would return that.
    ASSERT_OK(initialSyncer->shutdown());
    executor::NetworkInterfaceMock::InNetworkGuard(net)->runReadyNetworkOperations();
    initialSyncer->join();
    ASSERT_EQUALS(ErrorCodes::CallbackCanceled, _lastApplied);
}
// Verifies that an executor-level failure to schedule the OplogFetcher's tailable oplog query
// is passed through as the completion status, and inspects the rejected request.
TEST_F(InitialSyncerTest, InitialSyncerPassesThroughOplogFetcherScheduleError) {
    auto initialSyncer = &getInitialSyncer();
    auto opCtx = makeOpCtx();

    // Make the tailable oplog query fail. Allow all other requests to be scheduled.
    executor::RemoteCommandRequest request;
    _executorProxy->shouldFailScheduleRemoteCommandRequest =
        [&request](const executor::RemoteCommandRequest& requestToSend) {
            if ("find" == requestToSend.cmdObj.firstElement().fieldNameStringData() &&
                requestToSend.cmdObj.getBoolField("tailable")) {
                request = requestToSend;
                return true;
            }
            return false;
        };

    HostAndPort syncSource("localhost", 12345);
    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(syncSource);
    ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));

    auto net = getNet();
    {
        executor::NetworkInterfaceMock::InNetworkGuard guard(net);

        // Base rollback ID.
        net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
        net->runReadyNetworkOperations();

        // Last oplog entry.
        processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(1)});
        net->runReadyNetworkOperations();

        // Feature Compatibility Version.
        processSuccessfulFCVFetcherResponse36();

        // OplogFetcher will shut down DatabasesCloner on error after setting the completion status.
        // We call runReadyNetworkOperations() again to deliver the cancellation status to
        // _databasesClonerCallback().
        net->runReadyNetworkOperations();
    }

    initialSyncer->join();
    ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied);

    // The rejected request must be the tailable, oplog-replay "find" on the local oplog
    // namespace, aimed at the chosen sync source.
    ASSERT_EQUALS(syncSource, request.target);
    ASSERT_EQUALS(_options.localOplogNS.db(), request.dbname);
    assertRemoteCommandNameEquals("find", request);
    ASSERT_TRUE(request.cmdObj.getBoolField("tailable"));
    ASSERT_TRUE(request.cmdObj.getBoolField("oplogReplay"));
}
// Verifies that an error response to the OplogFetcher's tailable query is passed through as
// the completion status.
TEST_F(InitialSyncerTest, InitialSyncerPassesThroughOplogFetcherCallbackError) {
    auto initialSyncer = &getInitialSyncer();
    auto opCtx = makeOpCtx();
    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
    ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));

    auto net = getNet();
    {
        executor::NetworkInterfaceMock::InNetworkGuard guard(net);

        // Base rollback ID.
        net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
        net->runReadyNetworkOperations();

        // Last oplog entry.
        net->scheduleSuccessfulResponse(
            makeCursorResponse(0LL, _options.localOplogNS, {makeOplogEntryObj(1)}));
        net->runReadyNetworkOperations();

        // Feature Compatibility Version.
        processSuccessfulFCVFetcherResponse36();

        // DatabasesCloner's listDatabases request; respond with an empty database list.
        assertRemoteCommandNameEquals(
            "listDatabases", net->scheduleSuccessfulResponse(makeListDatabasesResponse({})));
        net->runReadyNetworkOperations();

        // Oplog tailing query - fail it with a non-retriable error.
        auto request = assertRemoteCommandNameEquals(
            "find", net->scheduleErrorResponse(Status(ErrorCodes::OperationFailed, "dead cursor")));
        ASSERT_TRUE(request.cmdObj.getBoolField("tailable"));
        net->runReadyNetworkOperations();

        // OplogFetcher will shut down DatabasesCloner on error after setting the completion status.
        // We call runReadyNetworkOperations() again to deliver the cancellation status to
        // _databasesClonerCallback().
        net->runReadyNetworkOperations();
    }

    initialSyncer->join();
    ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied);
}
// Verifies that initial sync succeeds when the OplogFetcher's cursor closes early (cursor id 0)
// and the sync source's last oplog entry has not advanced, leaving no operations to apply.
TEST_F(InitialSyncerTest,
       InitialSyncerSucceedsOnEarlyOplogFetcherCompletionIfThereAreNoOperationsToApply) {
    auto initialSyncer = &getInitialSyncer();
    auto opCtx = makeOpCtx();
    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
    ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));

    auto net = getNet();
    {
        executor::NetworkInterfaceMock::InNetworkGuard guard(net);

        // Base rollback ID.
        net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
        net->runReadyNetworkOperations();

        // Last oplog entry.
        auto request =
            assertRemoteCommandNameEquals("find",
                                          net->scheduleSuccessfulResponse(makeCursorResponse(
                                              0LL, _options.localOplogNS, {makeOplogEntryObj(1)})));
        ASSERT_EQUALS(1, request.cmdObj.getIntField("limit"));
        net->runReadyNetworkOperations();

        // Feature Compatibility Version.
        processSuccessfulFCVFetcherResponse36();

        // DatabasesCloner's listDatabases request; respond with an empty database list.
        assertRemoteCommandNameEquals(
            "listDatabases", net->scheduleSuccessfulResponse(makeListDatabasesResponse({})));
        net->runReadyNetworkOperations();

        // Oplog tailing query.
        // Simulate cursor closing on sync source.
        request =
            assertRemoteCommandNameEquals("find",
                                          net->scheduleSuccessfulResponse(makeCursorResponse(
                                              0LL, _options.localOplogNS, {makeOplogEntryObj(1)})));
        ASSERT_TRUE(request.cmdObj.getBoolField("tailable"));
        net->runReadyNetworkOperations();

        // Second last oplog entry fetcher.
        processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(1)});

        // Last rollback checker replSetGetRBID command.
        assertRemoteCommandNameEquals(
            "replSetGetRBID", net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1)));
        net->runReadyNetworkOperations();
    }

    initialSyncer->join();
    ASSERT_EQUALS(makeOplogEntry(1).getOpTime(), unittest::assertGet(_lastApplied).opTime);
}
// Verifies that initial sync succeeds on early OplogFetcher completion when the operations
// already fetched (entries 1-3) are sufficient to reach the end timestamp (entry 3).
TEST_F(
    InitialSyncerTest,
    InitialSyncerSucceedsOnEarlyOplogFetcherCompletionIfThereAreEnoughOperationsInTheOplogBufferToReachEndTimestamp) {
    auto initialSyncer = &getInitialSyncer();
    auto opCtx = makeOpCtx();
    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
    ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));

    auto net = getNet();
    {
        executor::NetworkInterfaceMock::InNetworkGuard guard(net);

        // Base rollback ID.
        net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
        net->runReadyNetworkOperations();

        // Last oplog entry.
        processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(1)});

        // Feature Compatibility Version.
        processSuccessfulFCVFetcherResponse36();

        // DatabasesCloner's listDatabases request; respond with an empty database list.
        assertRemoteCommandNameEquals(
            "listDatabases", net->scheduleSuccessfulResponse(makeListDatabasesResponse({})));
        net->runReadyNetworkOperations();

        // Oplog tailing query.
        // Simulate cursor closing on sync source.
        auto request =
            assertRemoteCommandNameEquals("find",
                                          net->scheduleSuccessfulResponse(makeCursorResponse(
                                              0LL,
                                              _options.localOplogNS,
                                              {makeOplogEntryObj(1),
                                               makeOplogEntryObj(2, OpTypeEnum::kCommand),
                                               makeOplogEntryObj(3, OpTypeEnum::kCommand)})));
        ASSERT_TRUE(request.cmdObj.getBoolField("tailable"));
        net->runReadyNetworkOperations();

        // Second last oplog entry fetcher.
        processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(3)});

        // Last rollback checker replSetGetRBID command.
        assertRemoteCommandNameEquals(
            "replSetGetRBID", net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1)));
        net->runReadyNetworkOperations();
    }

    initialSyncer->join();
    ASSERT_EQUALS(makeOplogEntry(3).getOpTime(), unittest::assertGet(_lastApplied).opTime);
}
// Verifies that early OplogFetcher completion fails with RemoteResultsUnavailable when the
// buffered operations (entries 1-3) do not reach the newer end timestamp (entry 4) reported
// by the sync source.
TEST_F(
    InitialSyncerTest,
    InitialSyncerReturnsRemoteResultsUnavailableOnEarlyOplogFetcherCompletionIfThereAreNotEnoughOperationsInTheOplogBufferToReachEndTimestamp) {
    auto initialSyncer = &getInitialSyncer();
    auto opCtx = makeOpCtx();
    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
    ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));

    auto net = getNet();
    {
        executor::NetworkInterfaceMock::InNetworkGuard guard(net);

        // Base rollback ID.
        net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
        net->runReadyNetworkOperations();

        // Last oplog entry.
        processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(1)});

        // Feature Compatibility Version.
        processSuccessfulFCVFetcherResponse36();

        // DatabasesCloner's listDatabases request; respond with an empty database list.
        assertRemoteCommandNameEquals(
            "listDatabases", net->scheduleSuccessfulResponse(makeListDatabasesResponse({})));
        net->runReadyNetworkOperations();

        // Oplog tailing query.
        // Simulate cursor closing on sync source.
        auto request =
            assertRemoteCommandNameEquals("find",
                                          net->scheduleSuccessfulResponse(makeCursorResponse(
                                              0LL,
                                              _options.localOplogNS,
                                              {makeOplogEntryObj(1),
                                               makeOplogEntryObj(2, OpTypeEnum::kCommand),
                                               makeOplogEntryObj(3, OpTypeEnum::kCommand)})));
        ASSERT_TRUE(request.cmdObj.getBoolField("tailable"));
        net->runReadyNetworkOperations();

        // Second last oplog entry fetcher.
        // Return an oplog entry with an optime that is more recent than what the completed
        // OplogFetcher has read from the sync source.
        processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(4)});
    }

    initialSyncer->join();
    ASSERT_EQUALS(ErrorCodes::RemoteResultsUnavailable, _lastApplied);
}
// Verifies that an executor-level failure to schedule listDatabases (DatabasesCloner's first
// command) is passed through, and that the OplogFetcher is cancelled as a result.
TEST_F(InitialSyncerTest,
       InitialSyncerPassesThroughDatabasesClonerScheduleErrorAndCancelsOplogFetcher) {
    auto initialSyncer = &getInitialSyncer();
    auto opCtx = makeOpCtx();

    // Make the listDatabases command fail. Allow all other requests to be scheduled.
    executor::RemoteCommandRequest request;
    _executorProxy->shouldFailScheduleRemoteCommandRequest =
        [&request](const executor::RemoteCommandRequest& requestToSend) {
            if ("listDatabases" == requestToSend.cmdObj.firstElement().fieldNameStringData()) {
                request = requestToSend;
                return true;
            }
            return false;
        };

    HostAndPort syncSource("localhost", 12345);
    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(syncSource);
    ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));

    auto net = getNet();
    {
        executor::NetworkInterfaceMock::InNetworkGuard guard(net);

        // Base rollback ID.
        net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
        net->runReadyNetworkOperations();

        // Last oplog entry.
        processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(1)});

        // Feature Compatibility Version.
        processSuccessfulFCVFetcherResponse36();

        // InitialSyncer shuts down OplogFetcher when it fails to schedule DatabasesCloner
        // so we should not expect any network requests in the queue.
        ASSERT_FALSE(net->hasReadyRequests());

        // OplogFetcher is shutting down but we still need to call runReadyNetworkOperations()
        // to deliver the cancellation status to the 'InitialSyncer::_oplogFetcherCallback'
        // callback.
        net->runReadyNetworkOperations();
    }

    initialSyncer->join();
    ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied);

    // The rejected request must be listDatabases against the admin database on the sync source.
    ASSERT_EQUALS(syncSource, request.target);
    ASSERT_EQUALS("admin", request.dbname);
    assertRemoteCommandNameEquals("listDatabases", request);
}
// Verifies that an error response to listDatabases is passed through as the completion status
// and that the OplogFetcher is cancelled as a result.
TEST_F(InitialSyncerTest,
       InitialSyncerPassesThroughDatabasesClonerCallbackErrorAndCancelsOplogFetcher) {
    auto initialSyncer = &getInitialSyncer();
    auto opCtx = makeOpCtx();
    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
    ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));

    auto net = getNet();
    {
        executor::NetworkInterfaceMock::InNetworkGuard guard(net);

        // Base rollback ID.
        net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
        net->runReadyNetworkOperations();

        // Last oplog entry.
        processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(1)});

        // Feature Compatibility Version.
        processSuccessfulFCVFetcherResponse36();

        // DatabasesCloner's first remote command - listDatabases
        assertRemoteCommandNameEquals(
            "listDatabases",
            net->scheduleErrorResponse(Status(ErrorCodes::FailedToParse, "listDatabases failed")));
        net->runReadyNetworkOperations();

        // DatabasesCloner will shut down OplogFetcher on error after setting the completion status.
        // We call runReadyNetworkOperations() again to deliver the cancellation status to
        // _oplogFetcherCallback().
        net->runReadyNetworkOperations();
    }

    initialSyncer->join();
    ASSERT_EQUALS(ErrorCodes::FailedToParse, _lastApplied);
}
// Verifies that the DatabasesCloner skips the 'local' database when cloning: of the three
// databases returned by listDatabases, only 'a' and 'b' receive listCollections requests.
TEST_F(InitialSyncerTest, InitialSyncerIgnoresLocalDatabasesWhenCloningDatabases) {
    auto initialSyncer = &getInitialSyncer();
    auto opCtx = makeOpCtx();

    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
    ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));

    auto net = getNet();
    {
        executor::NetworkInterfaceMock::InNetworkGuard guard(net);

        // Base rollback ID.
        net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
        net->runReadyNetworkOperations();

        // Last oplog entry.
        processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(1)});

        // Feature Compatibility Version.
        processSuccessfulFCVFetcherResponse36();

        // DatabasesCloner's first remote command - listDatabases.
        // 'local' is deliberately sandwiched between 'a' and 'b'.
        assertRemoteCommandNameEquals(
            "listDatabases",
            net->scheduleSuccessfulResponse(makeListDatabasesResponse({"a", "local", "b"})));
        net->runReadyNetworkOperations();

        // Oplog tailing query.
        auto noi = net->getNextReadyRequest();
        auto request = assertRemoteCommandNameEquals("find", noi->getRequest());
        ASSERT_TRUE(request.cmdObj.getBoolField("tailable"));
        net->blackHole(noi);

        // DatabasesCloner should only send listCollections requests for databases 'a' and 'b'.
        request = assertRemoteCommandNameEquals(
            "listCollections",
            net->scheduleSuccessfulResponse(
                makeCursorResponse(0LL, NamespaceString::makeListCollectionsNSS("a"), {})));
        ASSERT_EQUALS("a", request.dbname);

        request = assertRemoteCommandNameEquals(
            "listCollections",
            net->scheduleSuccessfulResponse(
                makeCursorResponse(0LL, NamespaceString::makeListCollectionsNSS("b"), {})));
        ASSERT_EQUALS("b", request.dbname);

        // After processing all the database names and returning empty lists of collections for each
        // database, data cloning should run to completion and we should expect to see a last oplog
        // entry fetcher request.
        // NOTE(review): the cursor namespace below looks copy-pasted from the listCollections
        // response above; it appears harmless because the executor is shut down before the
        // response would matter, but confirm whether the oplog namespace was intended.
        request = assertRemoteCommandNameEquals(
            "find",
            net->scheduleSuccessfulResponse(
                makeCursorResponse(0LL, NamespaceString::makeListCollectionsNSS("b"), {})));
        ASSERT_EQUALS(1, request.cmdObj.getIntField("limit"));
    }

    // Shut down the executor rather than completing the sync; the syncer reports cancellation.
    getExecutor().shutdown();

    initialSyncer->join();
    ASSERT_EQUALS(ErrorCodes::CallbackCanceled, _lastApplied);
}
// Verifies that a database-info document lacking a 'name' field in the listDatabases response
// is skipped rather than treated as an error: only 'a' and 'b' are cloned.
TEST_F(InitialSyncerTest,
       InitialSyncerIgnoresDatabaseInfoDocumentWithoutNameFieldWhenCloningDatabases) {
    auto initialSyncer = &getInitialSyncer();
    auto opCtx = makeOpCtx();

    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
    ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));

    auto net = getNet();
    {
        executor::NetworkInterfaceMock::InNetworkGuard guard(net);

        // Base rollback ID.
        net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
        net->runReadyNetworkOperations();

        // Last oplog entry.
        processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(1)});

        // Feature Compatibility Version.
        processSuccessfulFCVFetcherResponse36();

        // DatabasesCloner's first remote command - listDatabases.
        // The middle entry {bad: "dbinfo"} has no 'name' field and should be ignored.
        assertRemoteCommandNameEquals(
            "listDatabases",
            net->scheduleSuccessfulResponse(BSON("databases" << BSON_ARRAY(BSON("name"
                                                                                << "a")
                                                                           << BSON("bad"
                                                                                   << "dbinfo")
                                                                           << BSON("name"
                                                                                   << "b"))
                                                             << "ok"
                                                             << 1)));
        net->runReadyNetworkOperations();

        // Oplog tailing query.
        auto noi = net->getNextReadyRequest();
        auto request = assertRemoteCommandNameEquals("find", noi->getRequest());
        ASSERT_TRUE(request.cmdObj.getBoolField("tailable"));
        net->blackHole(noi);

        // DatabasesCloner should only send listCollections requests for databases 'a' and 'b'.
        request = assertRemoteCommandNameEquals(
            "listCollections",
            net->scheduleSuccessfulResponse(
                makeCursorResponse(0LL, NamespaceString::makeListCollectionsNSS("a"), {})));
        ASSERT_EQUALS("a", request.dbname);

        request = assertRemoteCommandNameEquals(
            "listCollections",
            net->scheduleSuccessfulResponse(
                makeCursorResponse(0LL, NamespaceString::makeListCollectionsNSS("b"), {})));
        ASSERT_EQUALS("b", request.dbname);

        // After processing all the database names and returning empty lists of collections for each
        // database, data cloning should run to completion and we should expect to see a last oplog
        // entry fetcher request.
        // NOTE(review): cursor namespace appears copy-pasted from the listCollections response
        // above; harmless here since the executor is shut down next — confirm intent.
        request = assertRemoteCommandNameEquals(
            "find",
            net->scheduleSuccessfulResponse(
                makeCursorResponse(0LL, NamespaceString::makeListCollectionsNSS("b"), {})));
        ASSERT_EQUALS(1, request.cmdObj.getIntField("limit"));
    }

    getExecutor().shutdown();

    initialSyncer->join();
    ASSERT_EQUALS(ErrorCodes::CallbackCanceled, _lastApplied);
}
// Shutting down the initial syncer while both the OplogFetcher and the DatabasesCloner are
// still outstanding must cancel both and surface CallbackCanceled as the final status.
TEST_F(InitialSyncerTest, InitialSyncerCancelsBothOplogFetcherAndDatabasesClonerOnShutdown) {
    auto initialSyncer = &getInitialSyncer();
    auto opCtx = makeOpCtx();

    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
    ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));

    auto network = getNet();
    {
        executor::NetworkInterfaceMock::InNetworkGuard networkGuard(network);

        // Answer the base rollback ID request.
        network->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
        network->runReadyNetworkOperations();

        // Answer the last oplog entry fetcher.
        processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(1)});

        // Answer the feature compatibility version request.
        processSuccessfulFCVFetcherResponse36();
    }

    // Shut down while the OplogFetcher and DatabasesCloner are active, then pump the network
    // one more time so the cancellation statuses are delivered to their callbacks.
    ASSERT_OK(initialSyncer->shutdown());
    executor::NetworkInterfaceMock::InNetworkGuard(network)->runReadyNetworkOperations();

    initialSyncer->join();
    ASSERT_EQUALS(ErrorCodes::CallbackCanceled, _lastApplied);
}
// Verifies that a failure to *schedule* the second last-oplog-entry fetcher (the one issued
// after data cloning completes) fails the initial sync attempt and cancels the OplogFetcher.
TEST_F(InitialSyncerTest,
       InitialSyncerPassesThroughSecondLastOplogEntryFetcherScheduleErrorAndCancelsOplogFetcher) {
    auto initialSyncer = &getInitialSyncer();
    auto opCtx = makeOpCtx();

    // Make the second last oplog entry fetcher command fail. Allow all other requests to be
    // scheduled. The fetcher is identified by: a 'find' with a 'sort' field and limit 1;
    // 'first' lets the initial (pre-cloning) last-oplog-entry fetcher through.
    executor::RemoteCommandRequest request;
    bool first = true;
    _executorProxy->shouldFailScheduleRemoteCommandRequest =
        [&first, &request](const executor::RemoteCommandRequest& requestToSend) {
            if ("find" == requestToSend.cmdObj.firstElement().fieldNameStringData() &&
                requestToSend.cmdObj.hasField("sort") &&
                1 == requestToSend.cmdObj.getIntField("limit")) {
                if (first) {
                    first = false;
                    return false;
                }
                // NOTE(review): 'request' is captured here but never asserted after join(),
                // unlike similar tests in this file — confirm whether assertions were intended.
                request = requestToSend;
                return true;
            }
            return false;
        };

    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
    ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));

    auto net = getNet();
    {
        executor::NetworkInterfaceMock::InNetworkGuard guard(net);

        // Base rollback ID.
        net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
        net->runReadyNetworkOperations();

        // Last oplog entry.
        processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(1)});

        // Feature Compatibility Version.
        processSuccessfulFCVFetcherResponse36();

        // Quickest path to a successful DatabasesCloner completion is to respond to the
        // listDatabases with an empty list of database names.
        assertRemoteCommandNameEquals(
            "listDatabases", net->scheduleSuccessfulResponse(makeListDatabasesResponse({})));
        net->runReadyNetworkOperations();

        // DatabasesCloner will shut down the OplogFetcher on failing to schedule the last entry
        // oplog fetcher after setting the completion status.
        // We call runReadyNetworkOperations() again to deliver the cancellation status to
        // _oplogFetcherCallback().
        net->runReadyNetworkOperations();
    }

    initialSyncer->join();
    ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied);
}
// Verifies that an error *response* to the second last-oplog-entry fetcher (issued after data
// cloning completes) is passed through as the completion status and cancels the OplogFetcher.
TEST_F(InitialSyncerTest,
       InitialSyncerPassesThroughSecondLastOplogEntryFetcherCallbackErrorAndCancelsOplogFetcher) {
    auto initialSyncer = &getInitialSyncer();
    auto opCtx = makeOpCtx();

    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
    ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));

    auto net = getNet();
    {
        executor::NetworkInterfaceMock::InNetworkGuard guard(net);

        // Base rollback ID.
        net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
        net->runReadyNetworkOperations();

        // Last oplog entry.
        processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(1)});

        // Feature Compatibility Version.
        processSuccessfulFCVFetcherResponse36();

        // Quickest path to a successful DatabasesCloner completion is to respond to the
        // listDatabases with an empty list of database names.
        assertRemoteCommandNameEquals(
            "listDatabases", net->scheduleSuccessfulResponse(makeListDatabasesResponse({})));
        net->runReadyNetworkOperations();

        // We do not have to respond to the OplogFetcher's oplog tailing query. Blackhole and move
        // on to the DatabasesCloner's request.
        auto noi = net->getNextReadyRequest();
        auto request = assertRemoteCommandNameEquals("find", noi->getRequest());
        ASSERT_TRUE(request.cmdObj.getBoolField("tailable"));
        net->blackHole(noi);

        // Second last oplog entry fetcher (identified by 'sort' and limit 1) — fail it.
        request = assertRemoteCommandNameEquals(
            "find",
            net->scheduleErrorResponse(
                Status(ErrorCodes::OperationFailed, "second last oplog entry fetcher failed")));
        ASSERT_TRUE(request.cmdObj.hasField("sort"));
        ASSERT_EQUALS(1, request.cmdObj.getIntField("limit"));
        net->runReadyNetworkOperations();

        // _lastOplogEntryFetcherCallbackAfterCloningData() will shut down the OplogFetcher after
        // setting the completion status.
        // We call runReadyNetworkOperations() again to deliver the cancellation status to
        // _oplogFetcherCallback().
        net->runReadyNetworkOperations();
    }

    initialSyncer->join();
    ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied);
}
// Verifies that shutting down the initial syncer while the OplogFetcher and the second
// last-oplog-entry fetcher are both outstanding cancels both.
TEST_F(InitialSyncerTest,
       InitialSyncerCancelsBothSecondLastOplogEntryFetcherAndOplogFetcherOnShutdown) {
    auto initialSyncer = &getInitialSyncer();
    auto opCtx = makeOpCtx();

    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
    ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));

    auto net = getNet();
    {
        executor::NetworkInterfaceMock::InNetworkGuard guard(net);

        // Base rollback ID.
        net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
        net->runReadyNetworkOperations();

        // Last oplog entry.
        processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(1)});

        // Feature Compatibility Version.
        processSuccessfulFCVFetcherResponse36();

        // Quickest path to a successful DatabasesCloner completion is to respond to the
        // listDatabases with an empty list of database names.
        auto request = assertRemoteCommandNameEquals(
            "listDatabases", net->scheduleSuccessfulResponse(makeListDatabasesResponse({})));
        net->runReadyNetworkOperations();

        // We do not have to respond to the OplogFetcher's oplog tailing query. Blackhole and move
        // on to the DatabasesCloner's request.
        auto noi = net->getNextReadyRequest();
        request = assertRemoteCommandNameEquals("find", noi->getRequest());
        ASSERT_TRUE(request.cmdObj.getBoolField("tailable"));
        net->blackHole(noi);

        // Second last oplog entry fetcher — blackholed too, so both requests stay outstanding.
        noi = net->getNextReadyRequest();
        request = assertRemoteCommandNameEquals("find", noi->getRequest());
        ASSERT_TRUE(request.cmdObj.hasField("sort"));
        ASSERT_EQUALS(1, request.cmdObj.getIntField("limit"));
        net->blackHole(noi);
    }

    // Shut down with both requests in flight; pumping the network delivers the cancellations.
    initialSyncer->shutdown().transitional_ignore();
    executor::NetworkInterfaceMock::InNetworkGuard(net)->runReadyNetworkOperations();

    initialSyncer->join();
    ASSERT_EQUALS(ErrorCodes::CallbackCanceled, _lastApplied);
}
// Verifies that an OplogFetcher failure cancels the in-flight second last-oplog-entry fetcher
// and the OplogFetcher's error becomes the initial sync completion status.
TEST_F(InitialSyncerTest,
       InitialSyncerCancelsSecondLastOplogEntryFetcherOnOplogFetcherCallbackError) {
    auto initialSyncer = &getInitialSyncer();
    auto opCtx = makeOpCtx();

    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
    ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));

    auto net = getNet();
    {
        executor::NetworkInterfaceMock::InNetworkGuard guard(net);

        // Base rollback ID.
        net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
        net->runReadyNetworkOperations();

        // Last oplog entry.
        processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(1)});

        // Feature Compatibility Version.
        processSuccessfulFCVFetcherResponse36();

        // Quickest path to a successful DatabasesCloner completion is to respond to the
        // listDatabases with an empty list of database names.
        assertRemoteCommandNameEquals(
            "listDatabases", net->scheduleSuccessfulResponse(makeListDatabasesResponse({})));
        net->runReadyNetworkOperations();

        // Save request for OplogFetcher's oplog tailing query. This request will be canceled.
        auto noi = net->getNextReadyRequest();
        auto request = assertRemoteCommandNameEquals("find", noi->getRequest());
        ASSERT_TRUE(request.cmdObj.getBoolField("oplogReplay"));
        ASSERT_TRUE(request.cmdObj.getBoolField("tailable"));
        auto oplogFetcherNetworkOperationIterator = noi;

        // Second last oplog entry fetcher.
        // Blackhole this request which will be canceled when oplog fetcher fails.
        noi = net->getNextReadyRequest();
        request = assertRemoteCommandNameEquals("find", noi->getRequest());
        ASSERT_TRUE(request.cmdObj.hasField("sort"));
        ASSERT_EQUALS(1, request.cmdObj.getIntField("limit"));
        net->blackHole(noi);

        // Make oplog fetcher fail.
        net->scheduleErrorResponse(oplogFetcherNetworkOperationIterator,
                                   Status(ErrorCodes::OperationFailed, "oplog fetcher failed"));
        net->runReadyNetworkOperations();

        // _oplogFetcherCallback() will shut down the '_lastOplogEntryFetcher' after setting the
        // completion status.
        // We call runReadyNetworkOperations() again to deliver the cancellation status to
        // _lastOplogEntryFetcherCallbackAfterCloningData().
        net->runReadyNetworkOperations();
    }

    initialSyncer->join();
    ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied);
}
// Verifies that a malformed oplog entry ('h' field is a string instead of a numeric hash) in
// the second last-oplog-entry fetcher response produces a TypeMismatch completion status.
TEST_F(
    InitialSyncerTest,
    InitialSyncerReturnsTypeMismatchErrorWhenSecondLastOplogEntryFetcherReturnsMalformedDocument) {
    auto initialSyncer = &getInitialSyncer();
    auto opCtx = makeOpCtx();

    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
    ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));

    auto oplogEntry = makeOplogEntryObj(1);
    auto net = getNet();
    {
        executor::NetworkInterfaceMock::InNetworkGuard guard(net);

        // Base rollback ID.
        net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
        net->runReadyNetworkOperations();

        // Last oplog entry.
        processSuccessfulLastOplogEntryFetcherResponse({oplogEntry});

        // Feature Compatibility Version.
        processSuccessfulFCVFetcherResponse36();

        // Quickest path to a successful DatabasesCloner completion is to respond to the
        // listDatabases with an empty list of database names.
        assertRemoteCommandNameEquals(
            "listDatabases", net->scheduleSuccessfulResponse(makeListDatabasesResponse({})));
        net->runReadyNetworkOperations();

        // We do not have to respond to the OplogFetcher's oplog tailing query. Blackhole and move
        // on to the DatabasesCloner's request.
        auto noi = net->getNextReadyRequest();
        auto request = noi->getRequest();
        assertRemoteCommandNameEquals("find", request);
        ASSERT_TRUE(request.cmdObj.getBoolField("tailable"));
        net->blackHole(noi);

        // Second last oplog entry fetcher.
        // The 'h' (hash) field is a string, which should fail oplog entry parsing.
        processSuccessfulLastOplogEntryFetcherResponse({BSON("ts" << Timestamp(1) << "t" << 1 << "h"
                                                                  << "not a hash")});

        // _lastOplogEntryFetcherCallbackAfterCloningData() will shut down the OplogFetcher after
        // setting the completion status.
        // We call runReadyNetworkOperations() again to deliver the cancellation status to
        // _oplogFetcherCallback().
        net->runReadyNetworkOperations();
    }

    initialSyncer->join();
    ASSERT_EQUALS(ErrorCodes::TypeMismatch, _lastApplied);
}
// Verifies that initial sync fails with OplogOutOfOrder when the sync source's last oplog
// entry after cloning (ts 1) precedes the entry observed before cloning began (ts 2).
TEST_F(InitialSyncerTest,
       InitialSyncerReturnsOplogOutOfOrderIfStopTimestampPrecedesBeginTimestamp) {
    auto initialSyncer = &getInitialSyncer();
    auto opCtx = makeOpCtx();

    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
    ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));

    auto net = getNet();
    {
        executor::NetworkInterfaceMock::InNetworkGuard guard(net);

        // Base rollback ID.
        net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
        net->runReadyNetworkOperations();

        // Last oplog entry (begin timestamp = 2).
        processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(2)});

        // Feature Compatibility Version.
        processSuccessfulFCVFetcherResponse36();

        // Quickest path to a successful DatabasesCloner completion is to respond to the
        // listDatabases with an empty list of database names.
        assertRemoteCommandNameEquals(
            "listDatabases", net->scheduleSuccessfulResponse(makeListDatabasesResponse({})));
        net->runReadyNetworkOperations();

        // We do not have to respond to the OplogFetcher's oplog tailing query. Blackhole and move
        // on to the DatabasesCloner's request.
        auto noi = net->getNextReadyRequest();
        auto request = assertRemoteCommandNameEquals("find", noi->getRequest());
        ASSERT_TRUE(request.cmdObj.getBoolField("tailable"));
        net->blackHole(noi);

        // Second last oplog entry fetcher (stop timestamp = 1, before the begin timestamp).
        processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(1)});

        // _lastOplogEntryFetcherCallbackAfterCloningData() will shut down the OplogFetcher after
        // setting the completion status.
        // We call runReadyNetworkOperations() again to deliver the cancellation status to
        // _oplogFetcherCallback().
        net->runReadyNetworkOperations();
    }

    initialSyncer->join();
    ASSERT_EQUALS(ErrorCodes::OplogOutOfOrder, _lastApplied);
}
// Verifies that a storage error while inserting the oplog seed document (written when cloning
// completes with no operations to apply) is passed through as the completion status, and that
// the insert targeted the local oplog with the expected document.
TEST_F(
    InitialSyncerTest,
    InitialSyncerPassesThroughInsertOplogSeedDocumentErrorAfterDataCloningFinishesWithNoOperationsToApply) {
    auto initialSyncer = &getInitialSyncer();
    auto opCtx = makeOpCtx();

    // Capture the arguments of the seed-document insert and force it to fail.
    NamespaceString insertDocumentNss;
    TimestampedBSONObj insertDocumentDoc;
    // NOTE(review): 'insertDocumentTerm' is captured but never asserted (and is uninitialized
    // until the insert runs) — confirm whether a term assertion was intended.
    long long insertDocumentTerm;
    _storageInterface->insertDocumentFn =
        [&insertDocumentDoc, &insertDocumentNss, &insertDocumentTerm](
            OperationContext*,
            const NamespaceStringOrUUID& nsOrUUID,
            const TimestampedBSONObj& doc,
            long long term) {
            insertDocumentNss = *nsOrUUID.nss();
            insertDocumentDoc = doc;
            insertDocumentTerm = term;
            return Status(ErrorCodes::OperationFailed, "failed to insert oplog entry");
        };

    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
    ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));

    auto oplogEntry = makeOplogEntryObj(1);
    auto net = getNet();
    {
        executor::NetworkInterfaceMock::InNetworkGuard guard(net);

        // Base rollback ID.
        net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
        net->runReadyNetworkOperations();

        // Last oplog entry.
        processSuccessfulLastOplogEntryFetcherResponse({oplogEntry});

        // Feature Compatibility Version.
        processSuccessfulFCVFetcherResponse36();

        // Quickest path to a successful DatabasesCloner completion is to respond to the
        // listDatabases with an empty list of database names.
        assertRemoteCommandNameEquals(
            "listDatabases", net->scheduleSuccessfulResponse(makeListDatabasesResponse({})));
        net->runReadyNetworkOperations();

        // We do not have to respond to the OplogFetcher's oplog tailing query. Blackhole and move
        // on to the DatabasesCloner's request.
        auto noi = net->getNextReadyRequest();
        auto request = assertRemoteCommandNameEquals("find", noi->getRequest());
        ASSERT_TRUE(request.cmdObj.getBoolField("tailable"));
        net->blackHole(noi);

        // Second last oplog entry fetcher. Same entry as before => no operations to apply.
        processSuccessfulLastOplogEntryFetcherResponse({oplogEntry});

        // _lastOplogEntryFetcherCallbackAfterCloningData() will shut down the OplogFetcher after
        // setting the completion status.
        // We call runReadyNetworkOperations() again to deliver the cancellation status to
        // _oplogFetcherCallback().
        net->runReadyNetworkOperations();
    }

    initialSyncer->join();
    ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied);

    // The failed insert targeted the local oplog with the fetched last oplog entry as seed.
    ASSERT_EQUALS(_options.localOplogNS, insertDocumentNss);
    ASSERT_BSONOBJ_EQ(oplogEntry, insertDocumentDoc.obj);
}
// Verifies that if the syncer is shut down from within the oplog seed document insert (i.e.
// between the insert and scheduling the final rollback checker), initial sync finishes with
// CallbackCanceled and the insert itself still targeted the local oplog correctly.
TEST_F(
    InitialSyncerTest,
    InitialSyncerReturnsCallbackCanceledAndDoesNotScheduleRollbackCheckerIfShutdownAfterInsertingInsertOplogSeedDocument) {
    auto initialSyncer = &getInitialSyncer();
    auto opCtx = makeOpCtx();

    // Capture the insert arguments and trigger shutdown from inside the insert hook so the
    // cancellation races are exercised deterministically.
    NamespaceString insertDocumentNss;
    TimestampedBSONObj insertDocumentDoc;
    // NOTE(review): 'insertDocumentTerm' is captured but never asserted — confirm intent.
    long long insertDocumentTerm;
    _storageInterface->insertDocumentFn =
        [initialSyncer, &insertDocumentDoc, &insertDocumentNss, &insertDocumentTerm](
            OperationContext*,
            const NamespaceStringOrUUID& nsOrUUID,
            const TimestampedBSONObj& doc,
            long long term) {
            insertDocumentNss = *nsOrUUID.nss();
            insertDocumentDoc = doc;
            insertDocumentTerm = term;
            initialSyncer->shutdown().transitional_ignore();
            return Status::OK();
        };

    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
    ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));

    auto oplogEntry = makeOplogEntryObj(1);
    auto net = getNet();
    {
        executor::NetworkInterfaceMock::InNetworkGuard guard(net);

        // Base rollback ID.
        net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
        net->runReadyNetworkOperations();

        // Last oplog entry.
        processSuccessfulLastOplogEntryFetcherResponse({oplogEntry});

        // Feature Compatibility Version.
        processSuccessfulFCVFetcherResponse36();

        // Quickest path to a successful DatabasesCloner completion is to respond to the
        // listDatabases with an empty list of database names.
        assertRemoteCommandNameEquals(
            "listDatabases", net->scheduleSuccessfulResponse(makeListDatabasesResponse({})));
        net->runReadyNetworkOperations();

        // We do not have to respond to the OplogFetcher's oplog tailing query. Blackhole and move
        // on to the DatabasesCloner's request.
        auto noi = net->getNextReadyRequest();
        auto request = assertRemoteCommandNameEquals("find", noi->getRequest());
        ASSERT_TRUE(request.cmdObj.getBoolField("tailable"));
        net->blackHole(noi);

        // Second last oplog entry fetcher. Same entry as before => seed insert (and shutdown).
        processSuccessfulLastOplogEntryFetcherResponse({oplogEntry});

        // _lastOplogEntryFetcherCallbackAfterCloningData() will shut down the OplogFetcher after
        // setting the completion status.
        // We call runReadyNetworkOperations() again to deliver the cancellation status to
        // _oplogFetcherCallback().
        net->runReadyNetworkOperations();
    }

    initialSyncer->join();
    ASSERT_EQUALS(ErrorCodes::CallbackCanceled, _lastApplied);

    // The insert still completed against the local oplog before shutdown took effect.
    ASSERT_EQUALS(_options.localOplogNS, insertDocumentNss);
    ASSERT_BSONOBJ_EQ(oplogEntry, insertDocumentDoc.obj);
}
// Verifies that a failure to *schedule* the final rollback checker (the second replSetGetRBID)
// after cloning completes fails the initial sync attempt with OperationFailed.
TEST_F(
    InitialSyncerTest,
    InitialSyncerPassesThroughRollbackCheckerScheduleErrorAfterCloningFinishesWithNoOperationsToApply) {
    auto initialSyncer = &getInitialSyncer();
    auto opCtx = makeOpCtx();

    // Make the second replSetGetRBID command fail. Allow all other requests to be scheduled.
    // 'first' lets the base rollback ID request through.
    executor::RemoteCommandRequest request;
    bool first = true;
    _executorProxy->shouldFailScheduleRemoteCommandRequest =
        [&first, &request](const executor::RemoteCommandRequest& requestToSend) {
            if ("replSetGetRBID" == requestToSend.cmdObj.firstElement().fieldNameStringData()) {
                if (first) {
                    first = false;
                    return false;
                }
                request = requestToSend;
                return true;
            }
            return false;
        };

    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
    ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));

    auto oplogEntry = makeOplogEntryObj(1);
    auto net = getNet();
    {
        executor::NetworkInterfaceMock::InNetworkGuard guard(net);

        // Base rollback ID.
        net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
        net->runReadyNetworkOperations();

        // Last oplog entry.
        processSuccessfulLastOplogEntryFetcherResponse({oplogEntry});

        // Feature Compatibility Version.
        processSuccessfulFCVFetcherResponse36();

        // Quickest path to a successful DatabasesCloner completion is to respond to the
        // listDatabases with an empty list of database names.
        assertRemoteCommandNameEquals(
            "listDatabases", net->scheduleSuccessfulResponse(makeListDatabasesResponse({})));
        net->runReadyNetworkOperations();

        // We do not have to respond to the OplogFetcher's oplog tailing query. Blackhole and move
        // on to the DatabasesCloner's request.
        auto noi = net->getNextReadyRequest();
        auto request = assertRemoteCommandNameEquals("find", noi->getRequest());
        ASSERT_TRUE(request.cmdObj.getBoolField("tailable"));
        net->blackHole(noi);

        // Second last oplog entry fetcher. Same entry => no operations to apply; the syncer
        // then attempts to schedule the final rollback checker, which the proxy fails.
        processSuccessfulLastOplogEntryFetcherResponse({oplogEntry});

        // _lastOplogEntryFetcherCallbackAfterCloningData() will shut down the OplogFetcher after
        // setting the completion status.
        // We call runReadyNetworkOperations() again to deliver the cancellation status to
        // _oplogFetcherCallback().
        net->runReadyNetworkOperations();
    }

    initialSyncer->join();
    ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied);
}
// Verifies that an error *response* to the final rollback checker's replSetGetRBID command is
// passed through as the initial sync completion status.
TEST_F(
    InitialSyncerTest,
    InitialSyncerPassesThroughRollbackCheckerCallbackErrorAfterCloningFinishesWithNoOperationsToApply) {
    auto initialSyncer = &getInitialSyncer();
    auto opCtx = makeOpCtx();

    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
    ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));

    auto oplogEntry = makeOplogEntryObj(1);
    auto net = getNet();
    {
        executor::NetworkInterfaceMock::InNetworkGuard guard(net);

        // Base rollback ID.
        net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
        net->runReadyNetworkOperations();

        // Last oplog entry.
        processSuccessfulLastOplogEntryFetcherResponse({oplogEntry});

        // Feature Compatibility Version.
        processSuccessfulFCVFetcherResponse36();

        // Quickest path to a successful DatabasesCloner completion is to respond to the
        // listDatabases with an empty list of database names.
        assertRemoteCommandNameEquals(
            "listDatabases", net->scheduleSuccessfulResponse(makeListDatabasesResponse({})));
        net->runReadyNetworkOperations();

        // We do not have to respond to the OplogFetcher's oplog tailing query. Blackhole and move
        // on to the DatabasesCloner's request.
        auto noi = net->getNextReadyRequest();
        auto request = assertRemoteCommandNameEquals("find", noi->getRequest());
        ASSERT_TRUE(request.cmdObj.getBoolField("tailable"));
        net->blackHole(noi);

        // Second last oplog entry fetcher.
        processSuccessfulLastOplogEntryFetcherResponse({oplogEntry});

        // Last rollback checker replSetGetRBID command — fail it.
        assertRemoteCommandNameEquals(
            "replSetGetRBID",
            net->scheduleErrorResponse(
                Status(ErrorCodes::OperationFailed, "replSetGetRBID command failed")));
        net->runReadyNetworkOperations();

        // _rollbackCheckerCheckForRollbackCallback() will shut down the OplogFetcher after setting
        // the completion status.
        // We call runReadyNetworkOperations() again to deliver the cancellation status to
        // _oplogFetcherCallback().
        net->runReadyNetworkOperations();
    }

    initialSyncer->join();
    ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied);
}
// Verifies that shutting down the initial syncer while the final rollback checker's
// replSetGetRBID request is outstanding cancels it and yields CallbackCanceled.
TEST_F(InitialSyncerTest, InitialSyncerCancelsLastRollbackCheckerOnShutdown) {
    auto initialSyncer = &getInitialSyncer();
    auto opCtx = makeOpCtx();

    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
    ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));

    auto oplogEntry = makeOplogEntryObj(1);
    auto net = getNet();
    {
        executor::NetworkInterfaceMock::InNetworkGuard guard(net);

        // Base rollback ID.
        net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
        net->runReadyNetworkOperations();

        // Last oplog entry.
        processSuccessfulLastOplogEntryFetcherResponse({oplogEntry});

        // Feature Compatibility Version.
        processSuccessfulFCVFetcherResponse36();

        // Quickest path to a successful DatabasesCloner completion is to respond to the
        // listDatabases with an empty list of database names.
        assertRemoteCommandNameEquals(
            "listDatabases", net->scheduleSuccessfulResponse(makeListDatabasesResponse({})));
        net->runReadyNetworkOperations();

        // We do not have to respond to the OplogFetcher's oplog tailing query. Blackhole and move
        // on to the DatabasesCloner's request.
        auto noi = net->getNextReadyRequest();
        auto request = assertRemoteCommandNameEquals("find", noi->getRequest());
        ASSERT_TRUE(request.cmdObj.getBoolField("tailable"));
        net->blackHole(noi);

        // Second last oplog entry fetcher.
        processSuccessfulLastOplogEntryFetcherResponse({oplogEntry});

        // Last rollback checker replSetGetRBID command — blackholed so it stays outstanding.
        noi = net->getNextReadyRequest();
        assertRemoteCommandNameEquals("replSetGetRBID", noi->getRequest());
        net->blackHole(noi);

        // Pump the network so all scheduled work up to this point has been delivered before
        // shutdown is requested below.
        net->runReadyNetworkOperations();
    }

    // Shut down with the replSetGetRBID request in flight; pumping the network delivers the
    // cancellation to the rollback checker callback.
    ASSERT_OK(initialSyncer->shutdown());
    executor::NetworkInterfaceMock::InNetworkGuard(net)->runReadyNetworkOperations();

    initialSyncer->join();
    ASSERT_EQUALS(ErrorCodes::CallbackCanceled, _lastApplied);
}
// Verifies that an OplogFetcher failure cancels the in-flight final rollback checker and the
// OplogFetcher's error becomes the initial sync completion status.
TEST_F(InitialSyncerTest, InitialSyncerCancelsLastRollbackCheckerOnOplogFetcherCallbackError) {
    auto initialSyncer = &getInitialSyncer();
    auto opCtx = makeOpCtx();

    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
    ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));

    auto oplogEntry = makeOplogEntryObj(1);
    auto net = getNet();
    {
        executor::NetworkInterfaceMock::InNetworkGuard guard(net);

        // Base rollback ID.
        net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
        net->runReadyNetworkOperations();

        // Last oplog entry.
        processSuccessfulLastOplogEntryFetcherResponse({oplogEntry});

        // Feature Compatibility Version.
        processSuccessfulFCVFetcherResponse36();

        // Quickest path to a successful DatabasesCloner completion is to respond to the
        // listDatabases with an empty list of database names.
        assertRemoteCommandNameEquals(
            "listDatabases", net->scheduleSuccessfulResponse(makeListDatabasesResponse({})));
        net->runReadyNetworkOperations();

        // Save request for OplogFetcher's oplog tailing query. This request will be canceled.
        auto noi = net->getNextReadyRequest();
        auto request = assertRemoteCommandNameEquals("find", noi->getRequest());
        ASSERT_TRUE(request.cmdObj.getBoolField("oplogReplay"));
        ASSERT_TRUE(request.cmdObj.getBoolField("tailable"));
        auto oplogFetcherNetworkOperationIterator = noi;

        // Second last oplog entry fetcher.
        processSuccessfulLastOplogEntryFetcherResponse({oplogEntry});

        // Last rollback checker replSetGetRBID command — blackholed; it will be canceled when
        // the oplog fetcher fails.
        noi = net->getNextReadyRequest();
        request = noi->getRequest();
        assertRemoteCommandNameEquals("replSetGetRBID", request);
        net->blackHole(noi);

        // Make oplog fetcher fail.
        net->scheduleErrorResponse(oplogFetcherNetworkOperationIterator,
                                   Status(ErrorCodes::OperationFailed, "oplog fetcher failed"));
        net->runReadyNetworkOperations();

        // _oplogFetcherCallback() will shut down the last rollback checker after setting the
        // completion status.
        // We call runReadyNetworkOperations() again to deliver the cancellation status to
        // _rollbackCheckerCheckForRollbackCallback().
        net->runReadyNetworkOperations();
    }

    initialSyncer->join();
    ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied);
}
// Verifies that initial sync fails with UnrecoverableRollbackError when the sync source's
// rollback ID changes (baseRollbackId -> baseRollbackId + 1) between the start and the end of
// data cloning, indicating the source rolled back while we were cloning.
TEST_F(InitialSyncerTest,
       InitialSyncerReturnsUnrecoverableRollbackErrorIfSyncSourceRolledBackAfterCloningData) {
    auto initialSyncer = &getInitialSyncer();
    auto opCtx = makeOpCtx();

    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
    ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));

    auto oplogEntry = makeOplogEntryObj(1);
    auto net = getNet();
    int baseRollbackId = 1;
    {
        executor::NetworkInterfaceMock::InNetworkGuard guard(net);

        // Base rollback ID.
        net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(baseRollbackId));
        net->runReadyNetworkOperations();

        // Last oplog entry.
        processSuccessfulLastOplogEntryFetcherResponse({oplogEntry});

        // Feature Compatibility Version.
        processSuccessfulFCVFetcherResponse36();

        // Quickest path to a successful DatabasesCloner completion is to respond to the
        // listDatabases with an empty list of database names.
        assertRemoteCommandNameEquals(
            "listDatabases", net->scheduleSuccessfulResponse(makeListDatabasesResponse({})));
        net->runReadyNetworkOperations();

        // We do not have to respond to the OplogFetcher's oplog tailing query. Blackhole and move
        // on to the DatabasesCloner's request.
        auto noi = net->getNextReadyRequest();
        auto request = noi->getRequest();
        assertRemoteCommandNameEquals("find", request);
        ASSERT_TRUE(request.cmdObj.getBoolField("tailable"));
        net->blackHole(noi);

        // Second last oplog entry fetcher.
        processSuccessfulLastOplogEntryFetcherResponse({oplogEntry});

        // Last rollback checker replSetGetRBID command — respond with a DIFFERENT rollback ID
        // to simulate a rollback on the sync source during cloning.
        request = net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(baseRollbackId + 1));
        net->runReadyNetworkOperations();
        assertRemoteCommandNameEquals("replSetGetRBID", request);
        net->runReadyNetworkOperations();
    }

    initialSyncer->join();
    ASSERT_EQUALS(ErrorCodes::UnrecoverableRollbackError, _lastApplied);
}
// Verifies that _lastApplied is populated with the sync source's last oplog entry (optime and
// hash) even though no operations are applied after cloning, and that the initial sync flag in
// the consistency markers is cleared once the attempt completes.
TEST_F(InitialSyncerTest, LastOpTimeShouldBeSetEvenIfNoOperationsAreAppliedAfterCloning) {
    auto initialSyncer = &getInitialSyncer();
    auto opCtx = makeOpCtx();
    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
    ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));
    ASSERT_TRUE(_replicationProcess->getConsistencyMarkers()->getInitialSyncFlag(opCtx.get()));

    auto oplogEntry = makeOplogEntry(1);
    auto net = getNet();
    int baseRollbackId = 1;
    {
        executor::NetworkInterfaceMock::InNetworkGuard guard(net);

        // Base rollback ID.
        net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(baseRollbackId));
        net->runReadyNetworkOperations();

        // Last oplog entry.
        processSuccessfulLastOplogEntryFetcherResponse({oplogEntry.toBSON()});

        // Feature Compatibility Version.
        processSuccessfulFCVFetcherResponse36();

        // Instead of fast forwarding to DatabasesCloner completion by returning an empty list of
        // database names, we'll simulate copying a single database with a single collection on the
        // sync source.
        NamespaceString nss("a.a");
        auto request =
            net->scheduleSuccessfulResponse(makeListDatabasesResponse({nss.db().toString()}));
        assertRemoteCommandNameEquals("listDatabases", request);
        net->runReadyNetworkOperations();

        // We do not have to respond to the OplogFetcher's oplog tailing query. Blackhole and move
        // on to the DatabasesCloner's request.
        auto noi = net->getNextReadyRequest();
        request = noi->getRequest();
        assertRemoteCommandNameEquals("find", request);
        ASSERT_TRUE(request.cmdObj.getBoolField("tailable"));
        net->blackHole(noi);

        // listCollections for "a"
        request = net->scheduleSuccessfulResponse(
            makeCursorResponse(0LL, nss, {BSON("name" << nss.coll() << "options" << BSONObj())}));
        assertRemoteCommandNameEquals("listCollections", request);

        // count:a
        request = assertRemoteCommandNameEquals(
            "count", net->scheduleSuccessfulResponse(BSON("n" << 1 << "ok" << 1)));
        ASSERT_EQUALS(nss.coll(), request.cmdObj.firstElement().String());
        ASSERT_EQUALS(nss.db(), request.dbname);

        // listIndexes:a
        request = assertRemoteCommandNameEquals(
            "listIndexes",
            net->scheduleSuccessfulResponse(makeCursorResponse(
                0LL,
                NamespaceString(nss.getCommandNS()),
                {BSON("v" << OplogEntry::kOplogVersion << "key" << BSON("_id" << 1) << "name"
                          << "_id_"
                          << "ns"
                          << nss.ns())})));
        ASSERT_EQUALS(nss.coll(), request.cmdObj.firstElement().String());
        ASSERT_EQUALS(nss.db(), request.dbname);

        // find:a
        request = assertRemoteCommandNameEquals("find",
                                                net->scheduleSuccessfulResponse(makeCursorResponse(
                                                    0LL, nss, {BSON("_id" << 1 << "a" << 1)})));
        ASSERT_EQUALS(nss.coll(), request.cmdObj.firstElement().String());
        ASSERT_EQUALS(nss.db(), request.dbname);

        // Second last oplog entry fetcher.
        processSuccessfulLastOplogEntryFetcherResponse({oplogEntry.toBSON()});

        // Last rollback checker replSetGetRBID command. Same rollback ID as the base check, so
        // no rollback is detected.
        request = assertRemoteCommandNameEquals(
            "replSetGetRBID",
            net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(baseRollbackId)));
        net->runReadyNetworkOperations();

        // Deliver cancellation to OplogFetcher.
        net->runReadyNetworkOperations();
    }

    initialSyncer->join();
    ASSERT_EQUALS(oplogEntry.getOpTime(), unittest::assertGet(_lastApplied).opTime);
    ASSERT_EQUALS(oplogEntry.getHash(), unittest::assertGet(_lastApplied).value);
    ASSERT_FALSE(_replicationProcess->getConsistencyMarkers()->getInitialSyncFlag(opCtx.get()));
}
// Verifies that when the mock executor refuses to schedule the _getNextApplierBatchCallback()
// work item, the resulting error becomes the initial sync outcome (OperationFailed).
TEST_F(InitialSyncerTest, InitialSyncerPassesThroughGetNextApplierBatchScheduleError) {
    auto initialSyncer = &getInitialSyncer();
    auto opCtx = makeOpCtx();

    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
    ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));
    ASSERT_TRUE(_replicationProcess->getConsistencyMarkers()->getInitialSyncFlag(opCtx.get()));

    auto net = getNet();
    int baseRollbackId = 1;
    {
        executor::NetworkInterfaceMock::InNetworkGuard guard(net);

        // Base rollback ID.
        net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(baseRollbackId));
        net->runReadyNetworkOperations();

        // Last oplog entry.
        processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(1)});

        // Feature Compatibility Version.
        processSuccessfulFCVFetcherResponse36();

        // An empty listDatabases reply is the fastest route to DatabasesCloner completion.
        assertRemoteCommandNameEquals(
            "listDatabases", net->scheduleSuccessfulResponse(makeListDatabasesResponse({})));
        net->runReadyNetworkOperations();

        // The OplogFetcher's tailing query needs no reply; black hole it so we can move on to
        // the next scheduled request.
        auto oplogFetcherNoi = net->getNextReadyRequest();
        auto findRequest = assertRemoteCommandNameEquals("find", oplogFetcherNoi->getRequest());
        ASSERT_TRUE(findRequest.cmdObj.getBoolField("tailable"));
        net->blackHole(oplogFetcherNoi);

        // Arrange for the executor to reject the next scheduleWork request before we deliver the
        // second last-oplog-entry response, so _getNextApplierBatchCallback() cannot be scheduled.
        _executorProxy->shouldFailScheduleWorkRequest = []() { return true; };

        // Second last oplog entry fetcher.
        processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(2)});

        // _lastOplogEntryFetcherCallbackAfterCloningData() records the failure and shuts down the
        // OplogFetcher; run the network once more so the cancellation status reaches
        // _oplogFetcherCallback().
        net->runReadyNetworkOperations();
    }

    initialSyncer->join();
    ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied);
}
// Verifies that when the mock executor refuses the scheduleWorkAt request used to reschedule
// _getNextApplierBatchCallback() (at now + getApplierBatchCallbackRetryWait), the resulting
// error becomes the initial sync outcome (OperationFailed).
TEST_F(InitialSyncerTest, InitialSyncerPassesThroughSecondGetNextApplierBatchScheduleError) {
    auto initialSyncer = &getInitialSyncer();
    auto opCtx = makeOpCtx();
    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
    ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));
    ASSERT_TRUE(_replicationProcess->getConsistencyMarkers()->getInitialSyncFlag(opCtx.get()));

    auto net = getNet();
    int baseRollbackId = 1;
    {
        executor::NetworkInterfaceMock::InNetworkGuard guard(net);

        // Base rollback ID.
        net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(baseRollbackId));
        net->runReadyNetworkOperations();

        // Last oplog entry.
        processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(1)});

        // Feature Compatibility Version.
        processSuccessfulFCVFetcherResponse36();

        // Quickest path to a successful DatabasesCloner completion is to respond to the
        // listDatabases with an empty list of database names.
        assertRemoteCommandNameEquals(
            "listDatabases", net->scheduleSuccessfulResponse(makeListDatabasesResponse({})));
        net->runReadyNetworkOperations();

        // We do not have to respond to the OplogFetcher's oplog tailing query. Blackhole and move
        // on to the DatabasesCloner's request.
        auto noi = net->getNextReadyRequest();
        auto request = noi->getRequest();
        assertRemoteCommandNameEquals("find", request);
        ASSERT_TRUE(request.cmdObj.getBoolField("tailable"));
        net->blackHole(noi);

        // Before processing scheduled last oplog entry fetcher response, set flag in
        // TaskExecutorMock so that InitialSyncer will fail to schedule second
        // _getNextApplierBatchCallback() at (now + options.getApplierBatchCallbackRetryWait).
        _executorProxy->shouldFailScheduleWorkAtRequest = []() { return true; };

        // Second last oplog entry fetcher.
        processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(2)});

        // _lastOplogEntryFetcherCallbackAfterCloningData() will shut down the OplogFetcher after
        // setting the completion status.
        // We call runReadyNetworkOperations() again to deliver the cancellation status to
        // _oplogFetcherCallback().
        net->runReadyNetworkOperations();
    }

    initialSyncer->join();
    ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied);
}
// Verifies that shutting down the initial syncer while _getNextApplierBatchCallback() keeps
// rescheduling itself (the blackholed OplogFetcher never supplies operations to apply) ends
// the attempt with CallbackCanceled.
TEST_F(InitialSyncerTest, InitialSyncerCancelsGetNextApplierBatchOnShutdown) {
    auto initialSyncer = &getInitialSyncer();
    auto opCtx = makeOpCtx();

    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
    ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));
    ASSERT_TRUE(_replicationProcess->getConsistencyMarkers()->getInitialSyncFlag(opCtx.get()));

    auto net = getNet();
    int baseRollbackId = 1;
    {
        executor::NetworkInterfaceMock::InNetworkGuard guard(net);

        // Base rollback ID.
        net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(baseRollbackId));
        net->runReadyNetworkOperations();

        // Last oplog entry.
        processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(1)});

        // Feature Compatibility Version.
        processSuccessfulFCVFetcherResponse36();

        // An empty listDatabases reply is the fastest route to DatabasesCloner completion.
        assertRemoteCommandNameEquals(
            "listDatabases", net->scheduleSuccessfulResponse(makeListDatabasesResponse({})));
        net->runReadyNetworkOperations();

        // The OplogFetcher's tailing query needs no reply; black hole it and move on.
        auto oplogFetcherNoi = net->getNextReadyRequest();
        auto findRequest = assertRemoteCommandNameEquals("find", oplogFetcherNoi->getRequest());
        ASSERT_TRUE(findRequest.cmdObj.getBoolField("tailable"));
        net->blackHole(oplogFetcherNoi);

        // Second last oplog entry fetcher.
        processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(2)});

        // With the OplogFetcher's find request blackholed, _getNextApplierBatch_inlock() yields
        // no operations, so _getNextApplierBatchCallback() keeps rescheduling itself at
        // now() + _options.getApplierBatchCallbackRetryWait.
    }

    ASSERT_OK(initialSyncer->shutdown());
    executor::NetworkInterfaceMock::InNetworkGuard(net)->runReadyNetworkOperations();

    initialSyncer->join();
    ASSERT_EQUALS(ErrorCodes::CallbackCanceled, _lastApplied);
}
// Verifies that the BadValue error raised by _getNextApplierBatch_inlock() — triggered here by
// feeding the oplog buffer an entry whose version is not OplogEntry::kOplogVersion — is passed
// through as the initial sync outcome.
TEST_F(InitialSyncerTest, InitialSyncerPassesThroughGetNextApplierBatchInLockError) {
    auto initialSyncer = &getInitialSyncer();
    auto opCtx = makeOpCtx();
    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
    ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));
    ASSERT_TRUE(_replicationProcess->getConsistencyMarkers()->getInitialSyncFlag(opCtx.get()));

    // _getNextApplierBatch_inlock() returns BadValue when it gets an oplog entry with an unexpected
    // version (not OplogEntry::kOplogVersion).
    auto oplogEntry = makeOplogEntryObj(1);
    auto oplogEntryWithInconsistentVersion =
        makeOplogEntryObj(2, OpTypeEnum::kInsert, OplogEntry::kOplogVersion + 100);

    auto net = getNet();
    int baseRollbackId = 1;
    {
        executor::NetworkInterfaceMock::InNetworkGuard guard(net);

        // Base rollback ID.
        net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(baseRollbackId));
        net->runReadyNetworkOperations();

        // Last oplog entry.
        processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(1)});

        // Feature Compatibility Version.
        processSuccessfulFCVFetcherResponse36();

        // Quickest path to a successful DatabasesCloner completion is to respond to the
        // listDatabases with an empty list of database names.
        assertRemoteCommandNameEquals(
            "listDatabases", net->scheduleSuccessfulResponse(makeListDatabasesResponse({})));
        net->runReadyNetworkOperations();

        // OplogFetcher's oplog tailing query. Return bad oplog entry that will be added to the
        // oplog buffer and processed by _getNextApplierBatch_inlock().
        auto request = assertRemoteCommandNameEquals(
            "find",
            net->scheduleSuccessfulResponse(makeCursorResponse(
                1LL, _options.localOplogNS, {oplogEntry, oplogEntryWithInconsistentVersion})));
        ASSERT_TRUE(request.cmdObj.getBoolField("oplogReplay"));
        net->runReadyNetworkOperations();

        // Second last oplog entry fetcher.
        processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(2)});

        // _getNextApplierBatchCallback() will shut down the OplogFetcher after setting the
        // completion status.
        // We call runReadyNetworkOperations() again to deliver the cancellation status to
        // _oplogFetcherCallback().
        net->runReadyNetworkOperations();
    }

    initialSyncer->join();
    ASSERT_EQUALS(ErrorCodes::BadValue, _lastApplied);
}
// Verifies that with the 'rsSyncApplyStop' fail point enabled, _getNextApplierBatch_inlock()
// hands out empty batches, so the inconsistent-version entry sitting in the oplog buffer is
// never applied (no BadValue) and shutdown() can end the attempt with CallbackCanceled.
TEST_F(
    InitialSyncerTest,
    InitialSyncerReturnsEmptyBatchFromGetNextApplierBatchInLockIfRsSyncApplyStopFailPointIsEnabled) {
    auto initialSyncer = &getInitialSyncer();
    auto opCtx = makeOpCtx();
    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
    ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));
    ASSERT_TRUE(_replicationProcess->getConsistencyMarkers()->getInitialSyncFlag(opCtx.get()));

    // _getNextApplierBatch_inlock() returns BadValue when it gets an oplog entry with an unexpected
    // version (not OplogEntry::kOplogVersion).
    auto oplogEntry = makeOplogEntryObj(1);
    auto oplogEntryWithInconsistentVersion =
        makeOplogEntryObj(2, OpTypeEnum::kInsert, OplogEntry::kOplogVersion + 100);

    // Enable 'rsSyncApplyStop' so that _getNextApplierBatch_inlock() returns an empty batch of
    // operations instead of a batch containing an oplog entry with a bad version.
    auto failPoint = getGlobalFailPointRegistry()->getFailPoint("rsSyncApplyStop");
    failPoint->setMode(FailPoint::alwaysOn);
    ON_BLOCK_EXIT([failPoint]() { failPoint->setMode(FailPoint::off); });

    auto net = getNet();
    int baseRollbackId = 1;
    {
        executor::NetworkInterfaceMock::InNetworkGuard guard(net);

        // Base rollback ID.
        net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(baseRollbackId));
        net->runReadyNetworkOperations();

        // Last oplog entry.
        processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(1)});

        // Feature Compatibility Version.
        processSuccessfulFCVFetcherResponse36();

        // Quickest path to a successful DatabasesCloner completion is to respond to the
        // listDatabases with an empty list of database names.
        assertRemoteCommandNameEquals(
            "listDatabases", net->scheduleSuccessfulResponse(makeListDatabasesResponse({})));
        net->runReadyNetworkOperations();

        // OplogFetcher's oplog tailing query. Return bad oplog entry that will be added to the
        // oplog buffer and processed by _getNextApplierBatch_inlock().
        auto request = net->scheduleSuccessfulResponse(makeCursorResponse(
            1LL, _options.localOplogNS, {oplogEntry, oplogEntryWithInconsistentVersion}));
        assertRemoteCommandNameEquals("find", request);
        ASSERT_TRUE(request.cmdObj.getBoolField("oplogReplay"));
        net->runReadyNetworkOperations();

        // Second last oplog entry fetcher.
        processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(2)});

        // Since the 'rsSyncApplyStop' fail point is enabled, InitialSyncer will get an empty
        // batch of operations from _getNextApplierBatch_inlock() even though the oplog buffer
        // is not empty.
    }

    // If the fail point is not working, the initial sync status will be set to BadValue (due to the
    // bad oplog entry in the oplog buffer) and shutdown() will not be able to overwrite this status
    // with CallbackCanceled.
    // Otherwise, shutdown() will cancel both the OplogFetcher and the scheduled
    // _getNextApplierBatchCallback() task. The final initial sync status will be CallbackCanceled.
    ASSERT_OK(initialSyncer->shutdown());
    executor::NetworkInterfaceMock::InNetworkGuard(net)->runReadyNetworkOperations();

    initialSyncer->join();
    ASSERT_EQUALS(ErrorCodes::CallbackCanceled, _lastApplied);
}
// Verifies that a failure to schedule MultiApplier::startup() work (via the mock executor's
// shouldFailScheduleWorkRequest flag) is passed through as the initial sync outcome
// (OperationFailed). The clock is advanced so the rescheduled _getNextApplierBatchCallback()
// runs with a non-empty oplog buffer, which is what triggers the MultiApplier startup.
TEST_F(InitialSyncerTest, InitialSyncerPassesThroughMultiApplierScheduleError) {
    auto initialSyncer = &getInitialSyncer();
    auto opCtx = makeOpCtx();
    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
    ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));
    ASSERT_TRUE(_replicationProcess->getConsistencyMarkers()->getInitialSyncFlag(opCtx.get()));

    auto net = getNet();
    int baseRollbackId = 1;
    {
        executor::NetworkInterfaceMock::InNetworkGuard guard(net);

        // Base rollback ID.
        net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(baseRollbackId));
        net->runReadyNetworkOperations();

        // Last oplog entry.
        processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(1)});

        // Feature Compatibility Version.
        processSuccessfulFCVFetcherResponse36();

        // Quickest path to a successful DatabasesCloner completion is to respond to the
        // listDatabases with an empty list of database names.
        assertRemoteCommandNameEquals(
            "listDatabases", net->scheduleSuccessfulResponse(makeListDatabasesResponse({})));
        net->runReadyNetworkOperations();

        // OplogFetcher's oplog tailing query. Save for later.
        auto noi = net->getNextReadyRequest();
        auto request = noi->getRequest();
        assertRemoteCommandNameEquals("find", request);
        ASSERT_TRUE(request.cmdObj.getBoolField("oplogReplay"));
        auto oplogFetcherNoi = noi;

        // Second last oplog entry fetcher.
        processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(2)});

        // _getNextApplierBatchCallback() should have rescheduled itself.
        // We'll insert some operations in the oplog buffer so that we'll attempt to schedule
        // MultiApplier next time _getNextApplierBatchCallback() runs.
        net->scheduleSuccessfulResponse(
            oplogFetcherNoi,
            makeCursorResponse(
                1LL, _options.localOplogNS, {makeOplogEntryObj(1), makeOplogEntryObj(2)}));
        net->runReadyNetworkOperations();

        // Ignore OplogFetcher's getMore request.
        noi = net->getNextReadyRequest();
        request = noi->getRequest();
        assertRemoteCommandNameEquals("getMore", request);

        // Make MultiApplier::startup() fail.
        _executorProxy->shouldFailScheduleWorkRequest = []() { return true; };

        // Advance clock until _getNextApplierBatchCallback() runs.
        auto when = net->now() + _options.getApplierBatchCallbackRetryWait;
        ASSERT_EQUALS(when, net->runUntil(when));

        // _getNextApplierBatchCallback() will shut down the OplogFetcher after setting the
        // completion status.
        // We call runReadyNetworkOperations() again to deliver the cancellation status to
        // _oplogFetcherCallback().
        net->runReadyNetworkOperations();
    }

    initialSyncer->join();
    ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied);
}
// Verifies that an error returned by the external state's multiApply function is passed through
// to _multiApplierCallback() and becomes the initial sync outcome (OperationFailed).
TEST_F(InitialSyncerTest, InitialSyncerPassesThroughMultiApplierCallbackError) {
    auto initialSyncer = &getInitialSyncer();
    auto opCtx = makeOpCtx();

    // Make every multiApply invocation fail.
    getExternalState()->multiApplyFn =
        [](OperationContext*, const MultiApplier::Operations&, OplogApplier::Observer*) {
            return Status(ErrorCodes::OperationFailed, "multiApply failed");
        };
    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
    ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));

    auto net = getNet();
    int baseRollbackId = 1;
    {
        executor::NetworkInterfaceMock::InNetworkGuard guard(net);

        // Base rollback ID.
        net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(baseRollbackId));
        net->runReadyNetworkOperations();

        // Last oplog entry.
        processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(1)});

        // Feature Compatibility Version.
        processSuccessfulFCVFetcherResponse36();

        // Quickest path to a successful DatabasesCloner completion is to respond to the
        // listDatabases with an empty list of database names.
        assertRemoteCommandNameEquals(
            "listDatabases", net->scheduleSuccessfulResponse(makeListDatabasesResponse({})));
        net->runReadyNetworkOperations();

        // OplogFetcher's oplog tailing query. Provide enough operations to trigger MultiApplier.
        auto request = net->scheduleSuccessfulResponse(makeCursorResponse(
            1LL, _options.localOplogNS, {makeOplogEntryObj(1), makeOplogEntryObj(2)}));
        assertRemoteCommandNameEquals("find", request);
        ASSERT_TRUE(request.cmdObj.getBoolField("oplogReplay"));
        net->runReadyNetworkOperations();

        // Second last oplog entry fetcher.
        processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(2)});

        // _multiApplierCallback() will shut down the OplogFetcher after setting the completion
        // status.
        // We call runReadyNetworkOperations() again to deliver the cancellation status to
        // _oplogFetcherCallback().
        net->runReadyNetworkOperations();
    }

    initialSyncer->join();
    ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied);
}
// Verifies that an error delivered to the OplogFetcher cancels the outstanding
// _getNextApplierBatchCallback() task and that the fetcher's error becomes the initial sync
// outcome (OperationFailed).
TEST_F(InitialSyncerTest, InitialSyncerCancelsGetNextApplierBatchCallbackOnOplogFetcherError) {
    auto initialSyncer = &getInitialSyncer();
    auto opCtx = makeOpCtx();
    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
    ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));

    auto net = getNet();
    int baseRollbackId = 1;
    {
        executor::NetworkInterfaceMock::InNetworkGuard guard(net);

        // Base rollback ID.
        net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(baseRollbackId));
        net->runReadyNetworkOperations();

        // Last oplog entry.
        processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(1)});

        // Feature Compatibility Version.
        processSuccessfulFCVFetcherResponse36();

        // Quickest path to a successful DatabasesCloner completion is to respond to the
        // listDatabases with an empty list of database names.
        assertRemoteCommandNameEquals(
            "listDatabases", net->scheduleSuccessfulResponse(makeListDatabasesResponse({})));
        net->runReadyNetworkOperations();

        // OplogFetcher's oplog tailing query. Save for later.
        auto noi = net->getNextReadyRequest();
        auto request = noi->getRequest();
        assertRemoteCommandNameEquals("find", request);
        ASSERT_TRUE(request.cmdObj.getBoolField("oplogReplay"));
        auto oplogFetcherNoi = noi;

        // Second last oplog entry fetcher.
        processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(2)});

        // Send error to _oplogFetcherCallback().
        net->scheduleErrorResponse(oplogFetcherNoi,
                                   Status(ErrorCodes::OperationFailed, "oplog fetcher failed"));

        // _oplogFetcherCallback() will cancel the _getNextApplierBatchCallback() task after setting
        // the completion status.
        // We call runReadyNetworkOperations() again to deliver the cancellation status to
        // _oplogFetcherCallback().
        net->runReadyNetworkOperations();
    }

    initialSyncer->join();
    ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied);
}
// Fixture helper: drives a complete initial sync attempt through the mock network using a
// single applier batch, then returns the last oplog entry applied so callers can assert
// against _lastApplied. When 'shouldSetFCV' is true, 'createTimestampSafeUniqueIndex' is set
// just before the final rollback ID check (standing in for an fCV=4.2 switch — see TODO below).
OplogEntry InitialSyncerTest::doInitialSyncWithOneBatch(bool shouldSetFCV) {
    auto initialSyncer = &getInitialSyncer();
    auto opCtx = makeOpCtx();
    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
    ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));

    auto lastOp = makeOplogEntry(2);

    auto net = getNet();
    int baseRollbackId = 1;
    {
        executor::NetworkInterfaceMock::InNetworkGuard guard(net);

        // Base rollback ID.
        net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(baseRollbackId));
        net->runReadyNetworkOperations();

        // Last oplog entry.
        processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(1)});

        // Feature Compatibility Version.
        processSuccessfulFCVFetcherResponse36();

        // Quickest path to a successful DatabasesCloner completion is to respond to the
        // listDatabases with an empty list of database names.
        assertRemoteCommandNameEquals(
            "listDatabases", net->scheduleSuccessfulResponse(makeListDatabasesResponse({})));
        net->runReadyNetworkOperations();

        // OplogFetcher's oplog tailing query. Response has enough operations to reach
        // end timestamp.
        auto request = net->scheduleSuccessfulResponse(makeCursorResponse(
            1LL, _options.localOplogNS, {makeOplogEntryObj(1), lastOp.toBSON()}));
        assertRemoteCommandNameEquals("find", request);
        ASSERT_TRUE(request.cmdObj.getBoolField("oplogReplay"));
        net->runReadyNetworkOperations();

        // Second last oplog entry fetcher.
        processSuccessfulLastOplogEntryFetcherResponse({lastOp.toBSON()});

        // Black hole OplogFetcher's getMore request.
        auto noi = net->getNextReadyRequest();
        request = noi->getRequest();
        assertRemoteCommandNameEquals("getMore", request);
        net->blackHole(noi);

        // Last rollback ID check. Before this check, set fCV to 4.2 if required by the test.
        // TODO(SERVER-34489) Update below statement to setFCV=4.2 when upgrade/downgrade is ready.
        if (shouldSetFCV) {
            createTimestampSafeUniqueIndex = true;
        }
        request = net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(baseRollbackId));
        assertRemoteCommandNameEquals("replSetGetRBID", request);
        net->runReadyNetworkOperations();

        // _multiApplierCallback() will cancel the _getNextApplierBatchCallback() task after setting
        // the completion status.
        // We call runReadyNetworkOperations() again to deliver the cancellation status to
        // _oplogFetcherCallback().
        net->runReadyNetworkOperations();
    }

    initialSyncer->join();
    return lastOp;
}
// Fixture helper: runs a single-batch initial sync via doInitialSyncWithOneBatch() and asserts
// that the attempt succeeded — _lastApplied matches the last fetched oplog entry and the
// initial data timestamp was set to that entry's timestamp.
void InitialSyncerTest::doSuccessfulInitialSyncWithOneBatch(bool shouldSetFCV) {
    const auto lastEntry = doInitialSyncWithOneBatch(shouldSetFCV);

    // TODO(SERVER-34489): replace this with an fCV reset once upgrade/downgrade is ready.
    createTimestampSafeUniqueIndex = false;

    ASSERT_EQUALS(lastEntry.getOpTime(), unittest::assertGet(_lastApplied).opTime);
    ASSERT_EQUALS(lastEntry.getHash(), unittest::assertGet(_lastApplied).value);

    ASSERT_EQUALS(lastEntry.getOpTime().getTimestamp(),
                  _storageInterface->getInitialDataTimestamp());
}
// Verifies a successful single-batch initial sync with the fixture configured to set fCV=4.2
// before the final rollback ID check, and that upgradeNonReplicatedUniqueIndexes ran as a
// result (recorded by the storage interface mock as 'uniqueIndexUpdated').
TEST_F(InitialSyncerTest,
       InitialSyncerReturnsLastAppliedOnReachingStopTimestampAfterApplyingOneBatch) {
    // Passing true tells the fixture to setFCV=4.2 before the last rollback ID check;
    // _rollbackCheckerCheckForRollbackCallback() calls upgradeNonReplicatedUniqueIndexes only
    // when fCV is 4.2.
    doSuccessfulInitialSyncWithOneBatch(true);

    // Confirm upgradeNonReplicatedUniqueIndexes was invoked.
    LockGuard lk(_storageInterfaceWorkDoneMutex);
    ASSERT_TRUE(_storageInterfaceWorkDone.uniqueIndexUpdated);
}
// Verifies that _lastApplied reflects the final oplog entry after InitialSyncer applies
// multiple batches. The last fetched operation is a command so the batching logic places it in
// its own batch, forcing at least two applier batches.
TEST_F(InitialSyncerTest,
       InitialSyncerReturnsLastAppliedOnReachingStopTimestampAfterApplyingMultipleBatches) {
    auto initialSyncer = &getInitialSyncer();
    auto opCtx = makeOpCtx();
    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
    ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));

    // To make InitialSyncer apply multiple batches, we make the third and last operation a command
    // so that it will go into a separate batch from the second operation. First operation is the
    // last fetched entry before data cloning and is not applied.
    auto lastOp = makeOplogEntry(3, OpTypeEnum::kCommand);

    auto net = getNet();
    int baseRollbackId = 1;
    {
        executor::NetworkInterfaceMock::InNetworkGuard guard(net);

        // Base rollback ID.
        net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(baseRollbackId));
        net->runReadyNetworkOperations();

        // Last oplog entry.
        processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(1)});

        // Feature Compatibility Version.
        processSuccessfulFCVFetcherResponse36();

        // Instead of fast forwarding to DatabasesCloner completion by returning an empty list of
        // database names, we'll simulate copying a single database with a single collection on the
        // sync source.
        NamespaceString nss("a.a");
        auto request =
            net->scheduleSuccessfulResponse(makeListDatabasesResponse({nss.db().toString()}));
        assertRemoteCommandNameEquals("listDatabases", request);
        net->runReadyNetworkOperations();

        // OplogFetcher's oplog tailing query. Response has enough operations to reach
        // end timestamp.
        request = net->scheduleSuccessfulResponse(
            makeCursorResponse(1LL,
                               _options.localOplogNS,
                               {makeOplogEntryObj(1), makeOplogEntryObj(2), lastOp.toBSON()}));
        assertRemoteCommandNameEquals("find", request);
        ASSERT_TRUE(request.cmdObj.getBoolField("oplogReplay"));
        net->runReadyNetworkOperations();

        // listCollections for "a"
        request = net->scheduleSuccessfulResponse(
            makeCursorResponse(0LL, nss, {BSON("name" << nss.coll() << "options" << BSONObj())}));
        assertRemoteCommandNameEquals("listCollections", request);

        // Black hole OplogFetcher's getMore request.
        auto noi = net->getNextReadyRequest();
        request = noi->getRequest();
        assertRemoteCommandNameEquals("getMore", request);
        net->blackHole(noi);

        // count:a
        request = net->scheduleSuccessfulResponse(BSON("n" << 1 << "ok" << 1));
        assertRemoteCommandNameEquals("count", request);
        ASSERT_EQUALS(nss.coll(), request.cmdObj.firstElement().String());
        ASSERT_EQUALS(nss.db(), request.dbname);

        // listIndexes:a
        request = net->scheduleSuccessfulResponse(makeCursorResponse(
            0LL,
            NamespaceString(nss.getCommandNS()),
            {BSON("v" << OplogEntry::kOplogVersion << "key" << BSON("_id" << 1) << "name"
                      << "_id_"
                      << "ns"
                      << nss.ns())}));
        assertRemoteCommandNameEquals("listIndexes", request);
        ASSERT_EQUALS(nss.coll(), request.cmdObj.firstElement().String());
        ASSERT_EQUALS(nss.db(), request.dbname);

        // find:a
        request = net->scheduleSuccessfulResponse(
            makeCursorResponse(0LL, nss, {BSON("_id" << 1 << "a" << 1)}));
        assertRemoteCommandNameEquals("find", request);
        ASSERT_EQUALS(nss.coll(), request.cmdObj.firstElement().String());
        ASSERT_EQUALS(nss.db(), request.dbname);

        // Second last oplog entry fetcher.
        processSuccessfulLastOplogEntryFetcherResponse({lastOp.toBSON()});

        // Last rollback ID.
        request = net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(baseRollbackId));
        assertRemoteCommandNameEquals("replSetGetRBID", request);
        net->runReadyNetworkOperations();

        // _multiApplierCallback() will cancel the _getNextApplierBatchCallback() task after setting
        // the completion status.
        // We call runReadyNetworkOperations() again to deliver the cancellation status to
        // _oplogFetcherCallback().
        net->runReadyNetworkOperations();
    }

    initialSyncer->join();
    ASSERT_EQUALS(lastOp.getOpTime(), unittest::assertGet(_lastApplied).opTime);
    ASSERT_EQUALS(lastOp.getHash(), unittest::assertGet(_lastApplied).value);
}
// Verifies that when the applier reports missing documents fetched and inserted during a batch,
// InitialSyncer schedules another last-oplog-entry fetcher to obtain a new stop timestamp, and
// that the fetched-missing-docs count shows up in getInitialSyncProgress().
TEST_F(
    InitialSyncerTest,
    InitialSyncerSchedulesLastOplogEntryFetcherToGetNewStopTimestampIfMissingDocumentsHaveBeenFetchedDuringMultiInitialSyncApply) {
    auto initialSyncer = &getInitialSyncer();
    auto opCtx = makeOpCtx();

    // Override DataReplicatorExternalState::_multiInitialSyncApply() so that it will also fetch a
    // missing document.
    // This forces InitialSyncer to evaluate its end timestamp for applying operations after each
    // batch.
    bool fetchCountIncremented = false;
    getExternalState()->multiApplyFn = [&fetchCountIncremented](OperationContext* opCtx,
                                                                const MultiApplier::Operations& ops,
                                                                OplogApplier::Observer* observer) {
        if (!fetchCountIncremented) {
            auto entry = makeOplogEntry(1);
            observer->onMissingDocumentsFetchedAndInserted({std::make_pair(entry, BSONObj())});
            fetchCountIncremented = true;
        }
        return ops.back().getOpTime();
    };

    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
    ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));

    // Use command for third and last operation to ensure we have two batches to apply.
    auto lastOp = makeOplogEntry(3, OpTypeEnum::kCommand);

    auto net = getNet();
    int baseRollbackId = 1;
    {
        executor::NetworkInterfaceMock::InNetworkGuard guard(net);

        // Base rollback ID.
        net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(baseRollbackId));
        net->runReadyNetworkOperations();

        // Last oplog entry.
        processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(1)});

        // Feature Compatibility Version.
        processSuccessfulFCVFetcherResponse36();

        // Quickest path to a successful DatabasesCloner completion is to respond to the
        // listDatabases with an empty list of database names.
        assertRemoteCommandNameEquals(
            "listDatabases", net->scheduleSuccessfulResponse(makeListDatabasesResponse({})));
        net->runReadyNetworkOperations();

        // OplogFetcher's oplog tailing query. Response has enough operations to reach
        // end timestamp.
        auto request = net->scheduleSuccessfulResponse(
            makeCursorResponse(1LL,
                               _options.localOplogNS,
                               {makeOplogEntryObj(1), makeOplogEntryObj(2), lastOp.toBSON()}));
        assertRemoteCommandNameEquals("find", request);
        ASSERT_TRUE(request.cmdObj.getBoolField("oplogReplay"));
        net->runReadyNetworkOperations();

        // Second last oplog entry fetcher.
        // Send oplog entry with timestamp 2. InitialSyncer will update this end timestamp after
        // applying the first batch.
        processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(2)});

        // Black hole OplogFetcher's getMore request.
        auto noi = net->getNextReadyRequest();
        request = noi->getRequest();
        assertRemoteCommandNameEquals("getMore", request);
        net->blackHole(noi);

        // Third last oplog entry fetcher.
        processSuccessfulLastOplogEntryFetcherResponse({lastOp.toBSON()});

        // Last rollback ID.
        request = net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(baseRollbackId));
        assertRemoteCommandNameEquals("replSetGetRBID", request);
        net->runReadyNetworkOperations();

        // _multiApplierCallback() will cancel the _getNextApplierBatchCallback() task after setting
        // the completion status.
        // We call runReadyNetworkOperations() again to deliver the cancellation status to
        // _oplogFetcherCallback().
        net->runReadyNetworkOperations();
    }

    initialSyncer->join();
    ASSERT_EQUALS(lastOp.getOpTime(), unittest::assertGet(_lastApplied).opTime);
    ASSERT_EQUALS(lastOp.getHash(), unittest::assertGet(_lastApplied).value);

    ASSERT_TRUE(fetchCountIncremented);

    auto progress = initialSyncer->getInitialSyncProgress();
    log() << "Progress after failed initial sync attempt: " << progress;
    ASSERT_EQUALS(1, progress.getIntField("fetchedMissingDocs")) << progress;
}
TEST_F(InitialSyncerTest,
       InitialSyncerReturnsInvalidSyncSourceWhenFailInitialSyncWithBadHostFailpointIsEnabled) {
    auto opCtx = makeOpCtx();
    auto initialSyncer = &getInitialSyncer();

    // Enable the fail point that makes chooseSyncSourceCallback fail with an InvalidSyncSource
    // error, and guarantee it is switched off again when this test exits.
    auto badHostFailPoint = getGlobalFailPointRegistry()->getFailPoint("failInitialSyncWithBadHost");
    badHostFailPoint->setMode(FailPoint::alwaysOn);
    ON_BLOCK_EXIT([badHostFailPoint]() { badHostFailPoint->setMode(FailPoint::off); });

    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
    ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));
    initialSyncer->join();

    // The error injected by the fail point should surface as the final initial sync result.
    ASSERT_EQUALS(ErrorCodes::InvalidSyncSource, _lastApplied);
}
// Verifies that initial sync fails with OplogOutOfOrder when the sync source returns oplog
// entries whose timestamps go backwards within a getMore batch.
TEST_F(InitialSyncerTest, OplogOutOfOrderOnOplogFetchFinish) {
    auto initialSyncer = &getInitialSyncer();
    auto opCtx = makeOpCtx();

    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
    ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));

    auto net = getNet();
    int baseRollbackId = 1;
    {
        executor::NetworkInterfaceMock::InNetworkGuard guard(net);

        // Base rollback ID.
        net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(baseRollbackId));
        net->runReadyNetworkOperations();

        // Last oplog entry.
        processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(1)});

        // Feature Compatibility Version.
        processSuccessfulFCVFetcherResponse36();

        // Ignore listDatabases request: the cloner is not under test here, so black-hole its
        // request rather than responding.
        auto noi = net->getNextReadyRequest();
        auto request = noi->getRequest();
        assertRemoteCommandNameEquals("listDatabases", request);
        net->blackHole(noi);

        // OplogFetcher's oplog tailing query.
        request = net->scheduleSuccessfulResponse(
            makeCursorResponse(1LL, _options.localOplogNS, {makeOplogEntryObj(1)}));
        assertRemoteCommandNameEquals("find", request);
        ASSERT_TRUE(request.cmdObj.getBoolField("oplogReplay"));
        net->runReadyNetworkOperations();

        // Ensure that OplogFetcher fails with an OplogOutOfOrder error by responding to the getMore
        // request with oplog entries containing the following timestamps (most recently processed
        // oplog entry has a timestamp of 1):
        // (last=1), 5, 4
        request = net->scheduleSuccessfulResponse(makeCursorResponse(
            1LL, _options.localOplogNS, {makeOplogEntryObj(5), makeOplogEntryObj(4)}, false));
        assertRemoteCommandNameEquals("getMore", request);
        net->runReadyNetworkOperations();

        // Deliver cancellation signal to DatabasesCloner.
        net->runReadyNetworkOperations();
    }

    initialSyncer->join();

    // The OplogFetcher's error becomes the reported initial sync outcome.
    ASSERT_EQUALS(ErrorCodes::OplogOutOfOrder, _lastApplied);
}
// Walks an initial sync through a failed first attempt (injected listDatabases error) followed
// by a successful second attempt, asserting the exact contents of getInitialSyncProgress() at
// each stage.
TEST_F(InitialSyncerTest, GetInitialSyncProgressReturnsCorrectProgress) {
    auto initialSyncer = &getInitialSyncer();
    auto opCtx = makeOpCtx();
    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 27017));
    // Allow 2 attempts so the first failure is retried and recorded in "initialSyncAttempts".
    ASSERT_OK(initialSyncer->startup(opCtx.get(), 2U));

    auto net = getNet();
    int baseRollbackId = 1;

    // Play first 2 responses to ensure initial syncer has started the oplog fetcher.
    {
        executor::NetworkInterfaceMock::InNetworkGuard guard(net);

        // Base rollback ID.
        net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(baseRollbackId));
        net->runReadyNetworkOperations();

        // Last oplog entry.
        processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(1)});

        // Feature Compatibility Version.
        processSuccessfulFCVFetcherResponse36();
    }
    log() << "Done playing first failed response";

    // Progress in the middle of the first attempt: no failures recorded yet, nothing cloned or
    // applied.
    auto progress = initialSyncer->getInitialSyncProgress();
    log() << "Progress after first failed response: " << progress;
    ASSERT_EQUALS(progress.nFields(), 8) << progress;
    ASSERT_EQUALS(progress.getIntField("failedInitialSyncAttempts"), 0) << progress;
    ASSERT_EQUALS(progress.getIntField("maxFailedInitialSyncAttempts"), 2) << progress;
    ASSERT_EQUALS(progress["initialSyncStart"].type(), Date) << progress;
    ASSERT_EQUALS(progress["initialSyncOplogStart"].timestamp(), Timestamp(1, 1)) << progress;
    ASSERT_BSONOBJ_EQ(progress.getObjectField("initialSyncAttempts"), BSONObj());
    ASSERT_EQUALS(progress.getIntField("fetchedMissingDocs"), 0) << progress;
    ASSERT_EQUALS(progress.getIntField("appliedOps"), 0) << progress;
    ASSERT_BSONOBJ_EQ(progress.getObjectField("databases"), BSON("databasesCloned" << 0));

    // Play rest of the failed round of responses.
    {
        executor::NetworkInterfaceMock::InNetworkGuard guard(net);

        // Fail the first attempt by responding to listDatabases with an error.
        auto request = net->scheduleErrorResponse(
            Status(ErrorCodes::FailedToParse, "fail on clone -- listDBs injected failure"));
        assertRemoteCommandNameEquals("listDatabases", request);
        net->runReadyNetworkOperations();

        // Deliver cancellation to OplogFetcher
        net->runReadyNetworkOperations();
    }
    log() << "Done playing failed responses";

    // Play the first 2 responses of the successful round of responses to ensure that the
    // initial syncer starts the oplog fetcher.
    {
        executor::NetworkInterfaceMock::InNetworkGuard guard(net);

        // Advance the mock clock past the retry delay so the second attempt begins.
        auto when = net->now() + _options.initialSyncRetryWait;
        ASSERT_EQUALS(when, net->runUntil(when));

        // Base rollback ID.
        auto request = net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(baseRollbackId));
        assertRemoteCommandNameEquals("replSetGetRBID", request);
        net->runReadyNetworkOperations();

        // Last oplog entry.
        processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(1)});

        // Feature Compatibility Version.
        processSuccessfulFCVFetcherResponse36();
    }
    log() << "Done playing first successful response";

    // Progress after one failed attempt: the failure now appears in "initialSyncAttempts".
    progress = initialSyncer->getInitialSyncProgress();
    log() << "Progress after failure: " << progress;
    ASSERT_EQUALS(progress.nFields(), 8) << progress;
    ASSERT_EQUALS(progress.getIntField("failedInitialSyncAttempts"), 1) << progress;
    ASSERT_EQUALS(progress.getIntField("maxFailedInitialSyncAttempts"), 2) << progress;
    ASSERT_EQUALS(progress["initialSyncStart"].type(), Date) << progress;
    ASSERT_EQUALS(progress["initialSyncOplogStart"].timestamp(), Timestamp(1, 1)) << progress;
    ASSERT_EQUALS(progress.getIntField("fetchedMissingDocs"), 0) << progress;
    ASSERT_EQUALS(progress.getIntField("appliedOps"), 0) << progress;
    ASSERT_BSONOBJ_EQ(progress.getObjectField("databases"), BSON("databasesCloned" << 0));

    BSONObj attempts = progress["initialSyncAttempts"].Obj();
    ASSERT_EQUALS(attempts.nFields(), 1) << attempts;
    BSONObj attempt0 = attempts["0"].Obj();
    ASSERT_EQUALS(attempt0.nFields(), 3) << attempt0;
    ASSERT_EQUALS(attempt0.getStringField("status"),
                  std::string("FailedToParse: error cloning databases :: caused by :: fail on "
                              "clone -- listDBs injected failure"))
        << attempt0;
    ASSERT_EQUALS(attempt0["durationMillis"].type(), NumberInt) << attempt0;
    ASSERT_EQUALS(attempt0.getStringField("syncSource"), std::string("localhost:27017"))
        << attempt0;

    // Play all but last of the successful round of responses.
    {
        executor::NetworkInterfaceMock::InNetworkGuard guard(net);

        // listDatabases
        NamespaceString nss("a.a");
        auto request =
            net->scheduleSuccessfulResponse(makeListDatabasesResponse({nss.db().toString()}));
        assertRemoteCommandNameEquals("listDatabases", request);
        net->runReadyNetworkOperations();

        // Respond to the OplogFetcher's tailing query with entries at timestamps 1 through 7.
        request = net->scheduleSuccessfulResponse(makeCursorResponse(1LL,
                                                                     _options.localOplogNS,
                                                                     {makeOplogEntryObj(1),
                                                                      makeOplogEntryObj(2),
                                                                      makeOplogEntryObj(3),
                                                                      makeOplogEntryObj(4),
                                                                      makeOplogEntryObj(5),
                                                                      makeOplogEntryObj(6),
                                                                      makeOplogEntryObj(7)}));
        assertRemoteCommandNameEquals("find", request);
        ASSERT_TRUE(request.cmdObj.getBoolField("oplogReplay"));
        net->runReadyNetworkOperations();

        // listCollections for "a"
        request = net->scheduleSuccessfulResponse(
            makeCursorResponse(0LL, nss, {BSON("name" << nss.coll() << "options" << BSONObj())}));
        assertRemoteCommandNameEquals("listCollections", request);

        // Black hole the OplogFetcher's getMore request; no more oplog entries are needed.
        auto noi = net->getNextReadyRequest();
        request = noi->getRequest();
        assertRemoteCommandNameEquals("getMore", request);
        net->blackHole(noi);

        // count:a
        request = net->scheduleSuccessfulResponse(BSON("n" << 5 << "ok" << 1));
        assertRemoteCommandNameEquals("count", request);
        ASSERT_EQUALS(nss.coll(), request.cmdObj.firstElement().String());
        ASSERT_EQUALS(nss.db(), request.dbname);

        // listIndexes:a
        request = net->scheduleSuccessfulResponse(makeCursorResponse(
            0LL,
            NamespaceString(nss.getCommandNS()),
            {BSON("v" << OplogEntry::kOplogVersion << "key" << BSON("_id" << 1) << "name"
                      << "_id_"
                      << "ns"
                      << nss.ns())}));
        assertRemoteCommandNameEquals("listIndexes", request);
        ASSERT_EQUALS(nss.coll(), request.cmdObj.firstElement().String());
        ASSERT_EQUALS(nss.db(), request.dbname);

        // find:a - 5 batches
        for (int i = 1; i <= 5; ++i) {
            request = net->scheduleSuccessfulResponse(
                makeCursorResponse(i < 5 ? 2LL : 0LL, nss, {BSON("_id" << i << "a" << i)}, i == 1));
            ASSERT_EQUALS(i == 1 ? "find" : "getMore",
                          request.cmdObj.firstElement().fieldNameStringData());
            net->runReadyNetworkOperations();
        }

        // Last oplog entry fetcher.
        // Send oplog entry with timestamp 7 (the newest entry fed to the OplogFetcher above);
        // this becomes the stop timestamp ("initialSyncOplogEnd") the applier must reach.
        processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(7)});
    }
    log() << "Done playing all but last successful response";

    // Progress during the second attempt's apply phase: cloner stats and oplog end populated.
    progress = initialSyncer->getInitialSyncProgress();
    log() << "Progress after all but last successful response: " << progress;
    ASSERT_EQUALS(progress.nFields(), 9) << progress;
    ASSERT_EQUALS(progress.getIntField("failedInitialSyncAttempts"), 1) << progress;
    ASSERT_EQUALS(progress.getIntField("maxFailedInitialSyncAttempts"), 2) << progress;
    ASSERT_EQUALS(progress["initialSyncOplogStart"].timestamp(), Timestamp(1, 1)) << progress;
    ASSERT_EQUALS(progress["initialSyncOplogEnd"].timestamp(), Timestamp(7, 1)) << progress;
    ASSERT_EQUALS(progress["initialSyncStart"].type(), Date) << progress;
    ASSERT_EQUALS(progress.getIntField("fetchedMissingDocs"), 0) << progress;
    // Expected applied ops to be a superset of this range: Timestamp(2,1) ... Timestamp(7,1).
    ASSERT_GREATER_THAN_OR_EQUALS(progress.getIntField("appliedOps"), 6) << progress;

    // Cloner stats for database "a" / collection "a.a": 5 documents in 5 batches, 1 index.
    auto databasesProgress = progress.getObjectField("databases");
    ASSERT_EQUALS(1, databasesProgress.getIntField("databasesCloned")) << databasesProgress;
    auto dbProgress = databasesProgress.getObjectField("a");
    ASSERT_EQUALS(1, dbProgress.getIntField("collections")) << dbProgress;
    ASSERT_EQUALS(1, dbProgress.getIntField("clonedCollections")) << dbProgress;
    auto collectionProgress = dbProgress.getObjectField("a.a");
    ASSERT_EQUALS(
        5, collectionProgress.getIntField(CollectionCloner::Stats::kDocumentsToCopyFieldName))
        << collectionProgress;
    ASSERT_EQUALS(
        5, collectionProgress.getIntField(CollectionCloner::Stats::kDocumentsCopiedFieldName))
        << collectionProgress;
    ASSERT_EQUALS(1, collectionProgress.getIntField("indexes")) << collectionProgress;
    ASSERT_EQUALS(5, collectionProgress.getIntField("fetchedBatches")) << collectionProgress;

    attempts = progress["initialSyncAttempts"].Obj();
    ASSERT_EQUALS(attempts.nFields(), 1) << progress;
    attempt0 = attempts["0"].Obj();
    ASSERT_EQUALS(attempt0.nFields(), 3) << attempt0;
    ASSERT_EQUALS(attempt0.getStringField("status"),
                  std::string("FailedToParse: error cloning databases :: caused by :: fail on "
                              "clone -- listDBs injected failure"))
        << attempt0;
    ASSERT_EQUALS(attempt0["durationMillis"].type(), NumberInt) << attempt0;
    ASSERT_EQUALS(attempt0.getStringField("syncSource"), std::string("localhost:27017"))
        << attempt0;

    // Play last successful response.
    {
        executor::NetworkInterfaceMock::InNetworkGuard guard(net);

        // Last rollback ID.
        assertRemoteCommandNameEquals(
            "replSetGetRBID",
            net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(baseRollbackId)));
        net->runReadyNetworkOperations();

        // _multiApplierCallback() will cancel the _getNextApplierBatchCallback() task after setting
        // the completion status.
        // We call runReadyNetworkOperations() again to deliver the cancellation status to
        // _oplogFetcherCallback().
        net->runReadyNetworkOperations();
    }

    log() << "waiting for initial sync to verify it completed OK";
    initialSyncer->join();
    ASSERT_EQUALS(makeOplogEntry(7).getOpTime(), unittest::assertGet(_lastApplied).opTime);

    // Final progress: both attempts recorded (failure then success), end markers present.
    progress = initialSyncer->getInitialSyncProgress();
    log() << "Progress at end: " << progress;
    ASSERT_EQUALS(progress.nFields(), 11) << progress;
    ASSERT_EQUALS(progress.getIntField("failedInitialSyncAttempts"), 1) << progress;
    ASSERT_EQUALS(progress.getIntField("maxFailedInitialSyncAttempts"), 2) << progress;
    ASSERT_EQUALS(progress["initialSyncStart"].type(), Date) << progress;
    ASSERT_EQUALS(progress["initialSyncEnd"].type(), Date) << progress;
    ASSERT_EQUALS(progress["initialSyncOplogStart"].timestamp(), Timestamp(1, 1)) << progress;
    ASSERT_EQUALS(progress["initialSyncOplogEnd"].timestamp(), Timestamp(7, 1)) << progress;
    ASSERT_EQUALS(progress["initialSyncElapsedMillis"].type(), NumberInt) << progress;
    ASSERT_EQUALS(progress.getIntField("fetchedMissingDocs"), 0) << progress;
    // Expected applied ops to be a superset of this range: Timestamp(2,1) ... Timestamp(7,1).
    ASSERT_GREATER_THAN_OR_EQUALS(progress.getIntField("appliedOps"), 6) << progress;

    attempts = progress["initialSyncAttempts"].Obj();
    ASSERT_EQUALS(attempts.nFields(), 2) << attempts;

    attempt0 = attempts["0"].Obj();
    ASSERT_EQUALS(attempt0.nFields(), 3) << attempt0;
    ASSERT_EQUALS(attempt0.getStringField("status"),
                  std::string("FailedToParse: error cloning databases :: caused by :: fail on "
                              "clone -- listDBs injected failure"))
        << attempt0;
    ASSERT_EQUALS(attempt0["durationMillis"].type(), NumberInt) << attempt0;
    ASSERT_EQUALS(attempt0.getStringField("syncSource"), std::string("localhost:27017"))
        << attempt0;

    BSONObj attempt1 = attempts["1"].Obj();
    ASSERT_EQUALS(attempt1.nFields(), 3) << attempt1;
    ASSERT_EQUALS(attempt1.getStringField("status"), std::string("OK")) << attempt1;
    ASSERT_EQUALS(attempt1["durationMillis"].type(), NumberInt) << attempt1;
    ASSERT_EQUALS(attempt1.getStringField("syncSource"), std::string("localhost:27017"))
        << attempt1;
}
// Verifies that getInitialSyncProgress() omits the per-collection cloner stats (the "databases"
// field) when serializing them would exceed the single-document BSON size limit, instead of
// failing or aborting when stats are logged at shutdown.
TEST_F(InitialSyncerTest, GetInitialSyncProgressOmitsClonerStatsIfClonerStatsExceedBsonLimit) {
    auto initialSyncer = &getInitialSyncer();
    auto opCtx = makeOpCtx();

    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 27017));
    ASSERT_OK(initialSyncer->startup(opCtx.get(), 2U));

    // Enough collections that the aggregate cloner stats cannot fit in one BSON document.
    const std::size_t numCollections = 200000U;

    auto net = getNet();
    int baseRollbackId = 1;
    {
        executor::NetworkInterfaceMock::InNetworkGuard guard(net);

        // Base rollback ID.
        net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(baseRollbackId));
        net->runReadyNetworkOperations();

        // Last oplog entry.
        processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(1)});

        // Feature Compatibility Version.
        processSuccessfulFCVFetcherResponse36();

        // listDatabases
        NamespaceString nss("a.a");
        auto request =
            net->scheduleSuccessfulResponse(makeListDatabasesResponse({nss.db().toString()}));
        assertRemoteCommandNameEquals("listDatabases", request);
        net->runReadyNetworkOperations();

        // Ignore oplog tailing query.
        request = net->scheduleSuccessfulResponse(
            makeCursorResponse(1LL, _options.localOplogNS, {makeOplogEntryObj(1)}));
        assertRemoteCommandNameEquals("find", request);
        ASSERT_TRUE(request.cmdObj.getBoolField("oplogReplay"));
        net->runReadyNetworkOperations();

        // listCollections for "a": one collection info document per collection, so the database
        // cloner creates numCollections collection cloners (and as many stats entries).
        std::vector<BSONObj> collectionInfos;
        collectionInfos.reserve(numCollections);
        for (std::size_t i = 0; i < numCollections; ++i) {
            const std::string collName = str::stream() << "coll-" << i;
            collectionInfos.push_back(BSON("name" << collName << "options" << BSONObj()));
        }
        request = net->scheduleSuccessfulResponse(
            makeCursorResponse(0LL, nss.getCommandNS(), collectionInfos));
        assertRemoteCommandNameEquals("listCollections", request);
        net->runReadyNetworkOperations();
    }

    // This returns a valid document because we omit the cloner stats when they do not fit in a
    // BSON document.
    auto progress = initialSyncer->getInitialSyncProgress();
    ASSERT_EQUALS(progress["initialSyncStart"].type(), Date) << progress;
    ASSERT_FALSE(progress.hasField("databases")) << progress;

    // Initial sync will attempt to log stats again at shutdown in a callback, where it should not
    // terminate because we now return a valid stats document.
    ASSERT_OK(initialSyncer->shutdown());

    // Deliver cancellation signal to callbacks.
    executor::NetworkInterfaceMock::InNetworkGuard(net)->runReadyNetworkOperations();

    initialSyncer->join();
}
TEST_F(InitialSyncerTest, InitialSyncerDoesNotCallUpgradeNonReplicatedUniqueIndexesOnFCV40) {
    // In MongoDB 4.2, upgradeNonReplicatedUniqueIndexes will only be called if fCV is 4.2.
    // Run a complete, successful initial sync with the 4.2 behavior disabled.
    doSuccessfulInitialSyncWithOneBatch(false);

    // TODO(SERVER-34489) Ensure that upgradeNonReplicatedUniqueIndexes is not called if fCV
    // is not 4.2.
    {
        // The storage interface mock records whether the unique-index upgrade ran; read the
        // flag under its mutex.
        LockGuard workDoneGuard(_storageInterfaceWorkDoneMutex);
        ASSERT_FALSE(_storageInterfaceWorkDone.uniqueIndexUpdated);
    }
}
TEST_F(InitialSyncerTest, InitialSyncerUpgradeNonReplicatedUniqueIndexesError) {
    // Configure the storage interface mock so that upgradeNonReplicatedUniqueIndexes returns a
    // bad status; that status should be passed through to the initial syncer's result.
    {
        LockGuard workDoneGuard(_storageInterfaceWorkDoneMutex);
        _storageInterfaceWorkDone.upgradeNonReplicatedUniqueIndexesShouldFail = true;
    }

    doInitialSyncWithOneBatch(true);

    // Ensure the upgradeNonReplicatedUniqueIndexes status was captured as the final outcome.
    ASSERT_EQUALS(ErrorCodes::NamespaceNotFound, _lastApplied);
}
} // namespace