/**
* Copyright 2016 MongoDB Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see .
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#include "mongo/platform/basic.h"
#include
#include "mongo/base/disallow_copying.h"
#include "mongo/db/repl/data_replicator_external_state_mock.h"
#include "mongo/db/repl/oplog_fetcher.h"
#include "mongo/executor/thread_pool_task_executor_test_fixture.h"
#include "mongo/rpc/metadata.h"
#include "mongo/rpc/metadata/repl_set_metadata.h"
#include "mongo/stdx/memory.h"
#include "mongo/unittest/unittest.h"
namespace {
using namespace mongo;
using namespace mongo::repl;
using executor::RemoteCommandRequest;
using executor::RemoteCommandResponse;
class ShutdownState {
MONGO_DISALLOW_COPYING(ShutdownState);
public:
ShutdownState();
Status getStatus() const;
OpTimeWithHash getLastFetched() const;
/**
* Use this for oplog fetcher shutdown callback.
*/
void operator()(const Status& status, const OpTimeWithHash& lastFetched);
private:
Status _status = executor::TaskExecutorTest::getDetectableErrorStatus();
OpTimeWithHash _lastFetched = {0, OpTime()};
};
class OplogFetcherTest : public executor::ThreadPoolExecutorTest {
protected:
void setUp() override;
void tearDown() override;
/**
* Schedules response to the current network request.
* Returns remote command request in network request.
*/
RemoteCommandRequest scheduleNetworkResponse(RemoteCommandResponse response);
/**
* Schedules network response and instructs network interface to process response.
* Returns remote command request in network request.
*/
RemoteCommandRequest processNetworkResponse(RemoteCommandResponse response,
bool expectReadyRequestsAfterProcessing = false);
RemoteCommandRequest processNetworkResponse(BSONObj obj,
bool expectReadyRequestsAfterProcessing = false);
/**
* Starts an oplog fetcher. Processes a single batch of results from
* the oplog query and shuts down.
* Returns shutdown state.
*/
std::unique_ptr processSingleBatch(RemoteCommandResponse response);
std::unique_ptr processSingleBatch(BSONObj obj);
/**
* Tests checkSyncSource result handling.
*/
void testSyncSourceChecking(rpc::ReplSetMetadata* metadata);
/**
* Tests handling of two batches of operations returned from query.
* Returns getMore request.
*/
RemoteCommandRequest testTwoBatchHandling(bool isV1ElectionProtocol);
OpTimeWithHash lastFetched;
std::unique_ptr dataReplicatorExternalState;
Fetcher::Documents lastEnqueuedDocuments;
OplogFetcher::DocumentsInfo lastEnqueuedDocumentsInfo;
Milliseconds lastEnqueuedElapsed;
OplogFetcher::EnqueueDocumentsFn enqueueDocumentsFn;
};
ShutdownState::ShutdownState() = default;
Status ShutdownState::getStatus() const {
return _status;
}
OpTimeWithHash ShutdownState::getLastFetched() const {
return _lastFetched;
}
void ShutdownState::operator()(const Status& status, const OpTimeWithHash& lastFetched) {
_status = status;
_lastFetched = lastFetched;
}
void OplogFetcherTest::setUp() {
executor::ThreadPoolExecutorTest::setUp();
launchExecutorThread();
lastFetched = {456LL, {{123, 0}, 1}};
dataReplicatorExternalState = stdx::make_unique();
dataReplicatorExternalState->currentTerm = lastFetched.opTime.getTerm();
dataReplicatorExternalState->lastCommittedOpTime = {{9999, 0}, lastFetched.opTime.getTerm()};
enqueueDocumentsFn = [this](Fetcher::Documents::const_iterator begin,
Fetcher::Documents::const_iterator end,
const OplogFetcher::DocumentsInfo& info,
Milliseconds elapsed) {
lastEnqueuedDocuments = {begin, end};
lastEnqueuedDocumentsInfo = info;
lastEnqueuedElapsed = elapsed;
};
}
void OplogFetcherTest::tearDown() {
executor::ThreadPoolExecutorTest::tearDown();
}
RemoteCommandRequest OplogFetcherTest::scheduleNetworkResponse(RemoteCommandResponse response) {
auto net = getNet();
ASSERT_TRUE(net->hasReadyRequests());
Milliseconds millis(0);
executor::TaskExecutor::ResponseStatus responseStatus(response);
auto noi = net->getNextReadyRequest();
net->scheduleResponse(noi, net->now(), responseStatus);
return noi->getRequest();
}
RemoteCommandRequest OplogFetcherTest::processNetworkResponse(
RemoteCommandResponse response, bool expectReadyRequestsAfterProcessing) {
auto net = getNet();
net->enterNetwork();
auto request = scheduleNetworkResponse(response);
net->runReadyNetworkOperations();
ASSERT_EQUALS(expectReadyRequestsAfterProcessing, net->hasReadyRequests());
net->exitNetwork();
return request;
}
RemoteCommandRequest OplogFetcherTest::processNetworkResponse(
BSONObj obj, bool expectReadyRequestsAfterProcessing) {
auto net = getNet();
net->enterNetwork();
auto request = scheduleNetworkResponse({obj, rpc::makeEmptyMetadata(), Milliseconds(0)});
net->runReadyNetworkOperations();
ASSERT_EQUALS(expectReadyRequestsAfterProcessing, net->hasReadyRequests());
net->exitNetwork();
return request;
}
HostAndPort source("localhost:12345");
NamespaceString nss("local.oplog.rs");
ReplicaSetConfig _createConfig(bool isV1ElectionProtocol) {
BSONObjBuilder bob;
bob.append("_id", "myset");
bob.append("version", 1);
if (isV1ElectionProtocol) {
bob.append("protocolVersion", 1);
}
{
BSONArrayBuilder membersBob(bob.subarrayStart("members"));
BSONObjBuilder(membersBob.subobjStart())
.appendElements(BSON("_id" << 0 << "host" << source.toString()));
}
{
BSONObjBuilder settingsBob(bob.subobjStart("settings"));
settingsBob.append("electionTimeoutMillis", 10000);
}
auto configObj = bob.obj();
ReplicaSetConfig config;
ASSERT_OK(config.initialize(configObj));
return config;
}
std::unique_ptr OplogFetcherTest::processSingleBatch(
RemoteCommandResponse response) {
auto shutdownState = stdx::make_unique();
OplogFetcher oplogFetcher(&getExecutor(),
lastFetched,
source,
nss,
_createConfig(true),
dataReplicatorExternalState.get(),
enqueueDocumentsFn,
stdx::ref(*shutdownState));
ASSERT_FALSE(oplogFetcher.isActive());
ASSERT_OK(oplogFetcher.startup());
ASSERT_TRUE(oplogFetcher.isActive());
auto request = processNetworkResponse(response);
ASSERT_EQUALS(oplogFetcher.getCommandObject_forTest(), request.cmdObj);
ASSERT_EQUALS(oplogFetcher.getMetadataObject_forTest(), request.metadata);
oplogFetcher.shutdown();
oplogFetcher.join();
return shutdownState;
}
std::unique_ptr OplogFetcherTest::processSingleBatch(BSONObj obj) {
return processSingleBatch({obj, rpc::makeEmptyMetadata(), Milliseconds(0)});
}
TEST_F(OplogFetcherTest, InvalidConstruction) {
// Null start timestamp.
ASSERT_THROWS_CODE_AND_WHAT(OplogFetcher(&getExecutor(),
OpTimeWithHash(),
source,
nss,
_createConfig(true),
dataReplicatorExternalState.get(),
enqueueDocumentsFn,
[](Status, OpTimeWithHash) {}),
UserException,
ErrorCodes::BadValue,
"null last optime fetched");
// Null EnqueueDocumentsFn.
ASSERT_THROWS_CODE_AND_WHAT(OplogFetcher(&getExecutor(),
lastFetched,
source,
nss,
_createConfig(true),
dataReplicatorExternalState.get(),
OplogFetcher::EnqueueDocumentsFn(),
[](Status, OpTimeWithHash) {}),
UserException,
ErrorCodes::BadValue,
"null enqueueDocuments function");
// Uninitialized replica set configuration.
ASSERT_THROWS_CODE_AND_WHAT(OplogFetcher(&getExecutor(),
lastFetched,
source,
nss,
ReplicaSetConfig(),
dataReplicatorExternalState.get(),
enqueueDocumentsFn,
[](Status, OpTimeWithHash) {}),
UserException,
ErrorCodes::InvalidReplicaSetConfig,
"uninitialized replica set configuration");
// Null OnShutdownCallbackFn.
ASSERT_THROWS_CODE_AND_WHAT(OplogFetcher(&getExecutor(),
lastFetched,
source,
nss,
_createConfig(true),
dataReplicatorExternalState.get(),
enqueueDocumentsFn,
OplogFetcher::OnShutdownCallbackFn()),
UserException,
ErrorCodes::BadValue,
"null onShutdownCallback function");
}
void _checkDefaultCommandObjectFields(BSONObj cmdObj) {
ASSERT_EQUALS(std::string("find"), cmdObj.firstElementFieldName());
ASSERT_TRUE(cmdObj.getBoolField("tailable"));
ASSERT_TRUE(cmdObj.getBoolField("oplogReplay"));
ASSERT_TRUE(cmdObj.getBoolField("awaitData"));
ASSERT_EQUALS(60000, cmdObj.getIntField("maxTimeMS"));
}
TEST_F(
OplogFetcherTest,
CommandObjectContainsTermAndStartTimestampIfGetCurrentTermAndLastCommittedOpTimeReturnsValidTerm) {
auto cmdObj = OplogFetcher(&getExecutor(),
lastFetched,
source,
nss,
_createConfig(true),
dataReplicatorExternalState.get(),
enqueueDocumentsFn,
[](Status, OpTimeWithHash) {})
.getCommandObject_forTest();
ASSERT_EQUALS(mongo::BSONType::Object, cmdObj["filter"].type());
ASSERT_EQUALS(BSON("ts" << BSON("$gte" << lastFetched.opTime.getTimestamp())),
cmdObj["filter"].Obj());
ASSERT_EQUALS(dataReplicatorExternalState->currentTerm, cmdObj["term"].numberLong());
_checkDefaultCommandObjectFields(cmdObj);
}
TEST_F(
OplogFetcherTest,
CommandObjectContainsDoesNotContainTermIfGetCurrentTermAndLastCommittedOpTimeReturnsUninitializedTerm) {
dataReplicatorExternalState->currentTerm = OpTime::kUninitializedTerm;
auto cmdObj = OplogFetcher(&getExecutor(),
lastFetched,
source,
nss,
_createConfig(true),
dataReplicatorExternalState.get(),
enqueueDocumentsFn,
[](Status, OpTimeWithHash) {})
.getCommandObject_forTest();
ASSERT_EQUALS(mongo::BSONType::Object, cmdObj["filter"].type());
ASSERT_EQUALS(BSON("ts" << BSON("$gte" << lastFetched.opTime.getTimestamp())),
cmdObj["filter"].Obj());
ASSERT_FALSE(cmdObj.hasField("term"));
_checkDefaultCommandObjectFields(cmdObj);
}
TEST_F(OplogFetcherTest, MetadataObjectContainsReplSetMetadataFieldUnderProtocolVersion1) {
auto metadataObj = OplogFetcher(&getExecutor(),
lastFetched,
source,
nss,
_createConfig(true),
dataReplicatorExternalState.get(),
enqueueDocumentsFn,
[](Status, OpTimeWithHash) {})
.getMetadataObject_forTest();
ASSERT_EQUALS(1, metadataObj.nFields());
ASSERT_EQUALS(1, metadataObj[rpc::kReplSetMetadataFieldName].numberInt());
}
TEST_F(OplogFetcherTest, MetadataObjectIsEmptyUnderProtocolVersion0) {
auto metadataObj = OplogFetcher(&getExecutor(),
lastFetched,
source,
nss,
_createConfig(false),
dataReplicatorExternalState.get(),
enqueueDocumentsFn,
[](Status, OpTimeWithHash) {})
.getMetadataObject_forTest();
ASSERT_EQUALS(BSONObj(), metadataObj);
}
TEST_F(OplogFetcherTest, RemoteCommandTimeoutShouldEqualElectionTimeout) {
auto config = _createConfig(true);
auto timeout = OplogFetcher(&getExecutor(),
lastFetched,
source,
nss,
config,
dataReplicatorExternalState.get(),
enqueueDocumentsFn,
[](Status, OpTimeWithHash) {})
.getRemoteCommandTimeout_forTest();
ASSERT_EQUALS(config.getElectionTimeoutPeriod(), timeout);
}
TEST_F(OplogFetcherTest, AwaitDataTimeoutShouldEqualHalfElectionTimeoutUnderProtocolVersion1) {
auto config = _createConfig(true);
auto timeout = OplogFetcher(&getExecutor(),
lastFetched,
source,
nss,
config,
dataReplicatorExternalState.get(),
enqueueDocumentsFn,
[](Status, OpTimeWithHash) {})
.getAwaitDataTimeout_forTest();
ASSERT_EQUALS(config.getElectionTimeoutPeriod() / 2, timeout);
}
TEST_F(OplogFetcherTest, AwaitDataTimeoutShouldBeAConstantUnderProtocolVersion0) {
auto timeout = OplogFetcher(&getExecutor(),
lastFetched,
source,
nss,
_createConfig(false),
dataReplicatorExternalState.get(),
enqueueDocumentsFn,
[](Status, OpTimeWithHash) {})
.getAwaitDataTimeout_forTest();
ASSERT_EQUALS(OplogFetcher::kDefaultProtocolZeroAwaitDataTimeout, timeout);
}
TEST_F(OplogFetcherTest, ShuttingExecutorDownShouldPreventOplogFetcherFromStarting) {
getExecutor().shutdown();
OplogFetcher oplogFetcher(&getExecutor(),
lastFetched,
source,
nss,
_createConfig(true),
dataReplicatorExternalState.get(),
enqueueDocumentsFn,
[](Status, OpTimeWithHash) {});
// Last optime and hash fetched should match values passed to constructor.
ASSERT_EQUALS(lastFetched, oplogFetcher.getLastOpTimeWithHashFetched());
ASSERT_FALSE(oplogFetcher.isActive());
ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, oplogFetcher.startup());
ASSERT_FALSE(oplogFetcher.isActive());
// Last optime and hash fetched should not change.
ASSERT_EQUALS(lastFetched, oplogFetcher.getLastOpTimeWithHashFetched());
}
TEST_F(OplogFetcherTest, ShuttingExecutorDownAfterStartupStopsTheOplogFetcher) {
ShutdownState shutdownState;
OplogFetcher oplogFetcher(&getExecutor(),
lastFetched,
source,
nss,
_createConfig(true),
dataReplicatorExternalState.get(),
enqueueDocumentsFn,
stdx::ref(shutdownState));
ASSERT_FALSE(oplogFetcher.isActive());
ASSERT_OK(oplogFetcher.startup());
ASSERT_TRUE(oplogFetcher.isActive());
getExecutor().shutdown();
oplogFetcher.join();
ASSERT_EQUALS(ErrorCodes::CallbackCanceled, shutdownState.getStatus());
ASSERT_EQUALS(lastFetched, shutdownState.getLastFetched());
}
BSONObj makeNoopOplogEntry(OpTimeWithHash opTimeWithHash) {
BSONObjBuilder bob;
bob.appendElements(opTimeWithHash.opTime.toBSON());
bob.append("h", opTimeWithHash.value);
bob.append("op", "c");
bob.append("ns", "test.t");
return bob.obj();
}
BSONObj makeNoopOplogEntry(OpTime opTime, long long hash) {
return makeNoopOplogEntry({hash, opTime});
}
BSONObj makeNoopOplogEntry(Seconds seconds, long long hash) {
return makeNoopOplogEntry({{seconds, 0}, 1LL}, hash);
}
BSONObj makeCursorResponse(CursorId cursorId,
Fetcher::Documents oplogEntries,
bool isFirstBatch = true) {
BSONObjBuilder bob;
{
BSONObjBuilder cursorBob(bob.subobjStart("cursor"));
cursorBob.append("id", cursorId);
cursorBob.append("ns", nss.toString());
{
BSONArrayBuilder batchBob(
cursorBob.subarrayStart(isFirstBatch ? "firstBatch" : "nextBatch"));
for (auto oplogEntry : oplogEntries) {
batchBob.append(oplogEntry);
}
}
}
bob.append("ok", 1);
return bob.obj();
}
TEST_F(OplogFetcherTest, InvalidMetadataInResponseStopsTheOplogFetcher) {
auto shutdownState = processSingleBatch(
{makeCursorResponse(0, {}),
BSON(rpc::kReplSetMetadataFieldName << BSON("invalid_repl_metadata_field" << 1)),
Milliseconds(0)});
ASSERT_EQUALS(ErrorCodes::NoSuchKey, shutdownState->getStatus());
}
TEST_F(OplogFetcherTest, VaidMetadataInResponseShouldBeForwardedToProcessMetadataFn) {
rpc::ReplSetMetadata metadata(1, lastFetched.opTime, lastFetched.opTime, 1, OID::gen(), 2, 2);
BSONObjBuilder bob;
ASSERT_OK(metadata.writeToMetadata(&bob));
auto metadataObj = bob.obj();
processSingleBatch(
{makeCursorResponse(0, {makeNoopOplogEntry(lastFetched)}), metadataObj, Milliseconds(0)});
ASSERT_EQUALS(metadata.getPrimaryIndex(),
dataReplicatorExternalState->metadataProcessed.getPrimaryIndex());
}
TEST_F(OplogFetcherTest, EmptyFirstBatchStopsOplogFetcherWithRemoteOplogStaleError) {
ASSERT_EQUALS(ErrorCodes::RemoteOplogStale,
processSingleBatch(makeCursorResponse(0, {}))->getStatus());
}
TEST_F(OplogFetcherTest,
MissingOpTimeInFirstDocumentCausesOplogFetcherToStopWithOplogStartMissingError) {
ASSERT_EQUALS(ErrorCodes::OplogStartMissing,
processSingleBatch(makeCursorResponse(0, {BSONObj()}))->getStatus());
}
TEST_F(
OplogFetcherTest,
LastOpTimeFetchedDoesNotMatchFirstDocumentCausesOplogFetcherToStopWithOplogStartMissingError) {
ASSERT_EQUALS(ErrorCodes::OplogStartMissing,
processSingleBatch(
makeCursorResponse(0, {makeNoopOplogEntry(Seconds(456), lastFetched.value)}))
->getStatus());
}
TEST_F(OplogFetcherTest,
LastHashFetchedDoesNotMatchFirstDocumentCausesOplogFetcherToStopWithOplogStartMissingError) {
ASSERT_EQUALS(
ErrorCodes::OplogStartMissing,
processSingleBatch(
makeCursorResponse(0, {makeNoopOplogEntry(lastFetched.opTime, lastFetched.value + 1)}))
->getStatus());
}
TEST_F(OplogFetcherTest,
MissingOpTimeInSecondDocumentOfFirstBatchCausesOplogFetcherToStopWithNoSuchKey) {
ASSERT_EQUALS(
ErrorCodes::NoSuchKey,
processSingleBatch(makeCursorResponse(0,
{makeNoopOplogEntry(lastFetched),
BSON("o" << BSON("msg"
<< "oplog entry without optime"))}))
->getStatus());
}
TEST_F(OplogFetcherTest, TimestampsNotAdvancingInBatchCausesOplogFetcherStopWithOplogOutOfOrder) {
ASSERT_EQUALS(ErrorCodes::OplogOutOfOrder,
processSingleBatch(makeCursorResponse(0,
{makeNoopOplogEntry(lastFetched),
makeNoopOplogEntry(Seconds(1000), 1),
makeNoopOplogEntry(Seconds(2000), 1),
makeNoopOplogEntry(Seconds(1500), 1)}))
->getStatus());
}
TEST_F(OplogFetcherTest, OplogFetcherShouldExcludeFirstDocumentInFirstBatchWhenEnqueuingDocuments) {
auto firstEntry = makeNoopOplogEntry(lastFetched);
auto secondEntry = makeNoopOplogEntry({{Seconds(456), 0}, lastFetched.opTime.getTerm()}, 200);
auto thirdEntry = makeNoopOplogEntry({{Seconds(789), 0}, lastFetched.opTime.getTerm()}, 300);
Fetcher::Documents documents{firstEntry, secondEntry, thirdEntry};
Milliseconds elapsed(600);
auto shutdownState =
processSingleBatch({makeCursorResponse(0, documents), rpc::makeEmptyMetadata(), elapsed});
ASSERT_EQUALS(2U, lastEnqueuedDocuments.size());
ASSERT_EQUALS(secondEntry, lastEnqueuedDocuments[0]);
ASSERT_EQUALS(thirdEntry, lastEnqueuedDocuments[1]);
ASSERT_EQUALS(3U, lastEnqueuedDocumentsInfo.networkDocumentCount);
ASSERT_EQUALS(size_t(firstEntry.objsize() + secondEntry.objsize() + thirdEntry.objsize()),
lastEnqueuedDocumentsInfo.networkDocumentBytes);
ASSERT_EQUALS(2U, lastEnqueuedDocumentsInfo.toApplyDocumentCount);
ASSERT_EQUALS(size_t(secondEntry.objsize() + thirdEntry.objsize()),
lastEnqueuedDocumentsInfo.toApplyDocumentBytes);
ASSERT_EQUALS(thirdEntry["h"].numberLong(), lastEnqueuedDocumentsInfo.lastDocument.value);
ASSERT_EQUALS(unittest::assertGet(OpTime::parseFromOplogEntry(thirdEntry)),
lastEnqueuedDocumentsInfo.lastDocument.opTime);
ASSERT_EQUALS(elapsed, lastEnqueuedElapsed);
// The last fetched optime and hash should be updated after pushing the operations into the
// buffer and reflected in the shutdown callback arguments.
ASSERT_OK(shutdownState->getStatus());
ASSERT_EQUALS(OpTimeWithHash(thirdEntry["h"].numberLong(),
unittest::assertGet(OpTime::parseFromOplogEntry(thirdEntry))),
shutdownState->getLastFetched());
}
void OplogFetcherTest::testSyncSourceChecking(rpc::ReplSetMetadata* metadata) {
auto firstEntry = makeNoopOplogEntry(lastFetched);
auto secondEntry = makeNoopOplogEntry({{Seconds(456), 0}, lastFetched.opTime.getTerm()}, 200);
auto thirdEntry = makeNoopOplogEntry({{Seconds(789), 0}, lastFetched.opTime.getTerm()}, 300);
Fetcher::Documents documents{firstEntry, secondEntry, thirdEntry};
BSONObj metadataObj;
if (metadata) {
BSONObjBuilder bob;
ASSERT_OK(metadata->writeToMetadata(&bob));
metadataObj = bob.obj();
}
dataReplicatorExternalState->shouldStopFetchingResult = true;
auto shutdownState =
processSingleBatch({makeCursorResponse(0, documents), metadataObj, Milliseconds(0)});
// Sync source checking happens after we have successfully pushed the operations into
// the buffer for the next replication phase (eg. applier).
// The last fetched optime and hash should be reflected in the shutdown callback
// arguments.
ASSERT_EQUALS(ErrorCodes::InvalidSyncSource, shutdownState->getStatus());
ASSERT_EQUALS(OpTimeWithHash(thirdEntry["h"].numberLong(),
unittest::assertGet(OpTime::parseFromOplogEntry(thirdEntry))),
shutdownState->getLastFetched());
}
TEST_F(OplogFetcherTest, FailedSyncSourceCheckWithoutMetadataStopsTheOplogFetcher) {
testSyncSourceChecking(nullptr);
// Sync source optime and "hasSyncSource" are not available if the respone does not
// contain metadata.
ASSERT_EQUALS(source, dataReplicatorExternalState->lastSyncSourceChecked);
ASSERT_EQUALS(OpTime(), dataReplicatorExternalState->syncSourceLastOpTime);
ASSERT_FALSE(dataReplicatorExternalState->syncSourceHasSyncSource);
}
TEST_F(OplogFetcherTest, FailedSyncSourceCheckWithMetadataStopsTheOplogFetcher) {
rpc::ReplSetMetadata metadata(lastFetched.opTime.getTerm(),
{{Seconds(10000), 0}, 1},
{{Seconds(20000), 0}, 1},
1,
OID::gen(),
2,
2);
testSyncSourceChecking(&metadata);
// Sync source optime and "hasSyncSource" can be set if the respone contains metadata.
ASSERT_EQUALS(source, dataReplicatorExternalState->lastSyncSourceChecked);
ASSERT_EQUALS(metadata.getLastOpVisible(), dataReplicatorExternalState->syncSourceLastOpTime);
ASSERT_TRUE(dataReplicatorExternalState->syncSourceHasSyncSource);
}
TEST_F(OplogFetcherTest,
FailedSyncSourceCheckWithSyncSourceHavingNoSyncSourceStopsTheOplogFetcher) {
rpc::ReplSetMetadata metadata(lastFetched.opTime.getTerm(),
{{Seconds(10000), 0}, 1},
{{Seconds(20000), 0}, 1},
1,
OID::gen(),
2,
-1);
testSyncSourceChecking(&metadata);
// Sync source "hasSyncSource" is derived from metadata.
ASSERT_EQUALS(source, dataReplicatorExternalState->lastSyncSourceChecked);
ASSERT_EQUALS(metadata.getLastOpVisible(), dataReplicatorExternalState->syncSourceLastOpTime);
ASSERT_FALSE(dataReplicatorExternalState->syncSourceHasSyncSource);
}
RemoteCommandRequest OplogFetcherTest::testTwoBatchHandling(bool isV1ElectionProtocol) {
ShutdownState shutdownState;
if (!isV1ElectionProtocol) {
dataReplicatorExternalState->currentTerm = OpTime::kUninitializedTerm;
}
OplogFetcher oplogFetcher(&getExecutor(),
lastFetched,
source,
nss,
_createConfig(isV1ElectionProtocol),
dataReplicatorExternalState.get(),
enqueueDocumentsFn,
stdx::ref(shutdownState));
ASSERT_OK(oplogFetcher.startup());
CursorId cursorId = 22LL;
auto firstEntry = makeNoopOplogEntry(lastFetched);
auto secondEntry = makeNoopOplogEntry({{Seconds(456), 0}, lastFetched.opTime.getTerm()}, 200);
processNetworkResponse(makeCursorResponse(cursorId, {firstEntry, secondEntry}), true);
ASSERT_EQUALS(1U, lastEnqueuedDocuments.size());
ASSERT_EQUALS(secondEntry, lastEnqueuedDocuments[0]);
// Set cursor ID to 0 in getMore response to indicate no more data available.
auto thirdEntry = makeNoopOplogEntry({{Seconds(789), 0}, lastFetched.opTime.getTerm()}, 300);
auto fourthEntry = makeNoopOplogEntry({{Seconds(1200), 0}, lastFetched.opTime.getTerm()}, 300);
auto request = processNetworkResponse(makeCursorResponse(0, {thirdEntry, fourthEntry}, false));
ASSERT_EQUALS(std::string("getMore"), request.cmdObj.firstElementFieldName());
ASSERT_EQUALS(nss.coll(), request.cmdObj["collection"].String());
ASSERT_EQUALS(int(durationCount(oplogFetcher.getAwaitDataTimeout_forTest())),
request.cmdObj.getIntField("maxTimeMS"));
ASSERT_EQUALS(2U, lastEnqueuedDocuments.size());
ASSERT_EQUALS(thirdEntry, lastEnqueuedDocuments[0]);
ASSERT_EQUALS(fourthEntry, lastEnqueuedDocuments[1]);
oplogFetcher.shutdown();
oplogFetcher.join();
ASSERT_OK(shutdownState.getStatus());
ASSERT_EQUALS(OpTimeWithHash(fourthEntry["h"].numberLong(),
unittest::assertGet(OpTime::parseFromOplogEntry(fourthEntry))),
shutdownState.getLastFetched());
return request;
}
TEST_F(
OplogFetcherTest,
NoDataAvailableAfterFirstTwoBatchesShouldCauseTheOplogFetcherToShutDownWithSuccessfulStatus) {
auto request = testTwoBatchHandling(true);
ASSERT_EQUALS(dataReplicatorExternalState->currentTerm, request.cmdObj["term"].numberLong());
ASSERT_EQUALS(dataReplicatorExternalState->lastCommittedOpTime,
unittest::assertGet(OpTime::parseFromOplogEntry(
request.cmdObj["lastKnownCommittedOpTime"].Obj())));
}
TEST_F(OplogFetcherTest,
GetMoreRequestUnderProtocolVersionZeroDoesNotIncludeTermOrLastKnownCommittedOpTime) {
auto request = testTwoBatchHandling(false);
ASSERT_FALSE(request.cmdObj.hasField("term"));
ASSERT_FALSE(request.cmdObj.hasField("lastKnownCommittedOpTime"));
}
TEST_F(OplogFetcherTest, ValidateDocumentsReturnsNoSuchKeyIfTimestampIsNotFoundInAnyDocument) {
auto firstEntry = makeNoopOplogEntry(Seconds(123), 100);
auto secondEntry = BSON("o" << BSON("msg"
<< "oplog entry without optime"));
ASSERT_EQUALS(ErrorCodes::NoSuchKey,
OplogFetcher::validateDocuments(
{firstEntry, secondEntry},
true,
unittest::assertGet(OpTime::parseFromOplogEntry(firstEntry)).getTimestamp())
.getStatus());
}
TEST_F(
OplogFetcherTest,
ValidateDocumentsReturnsOutOfOrderIfTimestampInFirstEntryIsEqualToLastTimestampAndNotProcessingFirstBatch) {
auto firstEntry = makeNoopOplogEntry(Seconds(123), 100);
auto secondEntry = makeNoopOplogEntry(Seconds(456), 200);
ASSERT_EQUALS(ErrorCodes::OplogOutOfOrder,
OplogFetcher::validateDocuments(
{firstEntry, secondEntry},
false,
unittest::assertGet(OpTime::parseFromOplogEntry(firstEntry)).getTimestamp())
.getStatus());
}
TEST_F(OplogFetcherTest, ValidateDocumentsReturnsOutOfOrderIfTimestampInSecondEntryIsBeforeFirst) {
auto firstEntry = makeNoopOplogEntry(Seconds(456), 100);
auto secondEntry = makeNoopOplogEntry(Seconds(123), 200);
ASSERT_EQUALS(ErrorCodes::OplogOutOfOrder,
OplogFetcher::validateDocuments(
{firstEntry, secondEntry},
true,
unittest::assertGet(OpTime::parseFromOplogEntry(firstEntry)).getTimestamp())
.getStatus());
}
TEST_F(OplogFetcherTest, ValidateDocumentsReturnsOutOfOrderIfTimestampInThirdEntryIsBeforeSecond) {
auto firstEntry = makeNoopOplogEntry(Seconds(123), 100);
auto secondEntry = makeNoopOplogEntry(Seconds(789), 200);
auto thirdEntry = makeNoopOplogEntry(Seconds(456), 300);
ASSERT_EQUALS(ErrorCodes::OplogOutOfOrder,
OplogFetcher::validateDocuments(
{firstEntry, secondEntry, thirdEntry},
true,
unittest::assertGet(OpTime::parseFromOplogEntry(firstEntry)).getTimestamp())
.getStatus());
}
TEST_F(OplogFetcherTest,
ValidateDocumentsExcludesFirstDocumentInApplyCountAndBytesIfProcessingFirstBatch) {
auto firstEntry = makeNoopOplogEntry(Seconds(123), 100);
auto secondEntry = makeNoopOplogEntry(Seconds(456), 200);
auto thirdEntry = makeNoopOplogEntry(Seconds(789), 300);
auto info = unittest::assertGet(OplogFetcher::validateDocuments(
{firstEntry, secondEntry, thirdEntry},
true,
unittest::assertGet(OpTime::parseFromOplogEntry(firstEntry)).getTimestamp()));
ASSERT_EQUALS(3U, info.networkDocumentCount);
ASSERT_EQUALS(size_t(firstEntry.objsize() + secondEntry.objsize() + thirdEntry.objsize()),
info.networkDocumentBytes);
ASSERT_EQUALS(300LL, info.lastDocument.value);
ASSERT_EQUALS(unittest::assertGet(OpTime::parseFromOplogEntry(thirdEntry)),
info.lastDocument.opTime);
}
TEST_F(OplogFetcherTest,
ValidateDocumentsIncludesFirstDocumentInApplyCountAndBytesIfNotProcessingFirstBatch) {
auto firstEntry = makeNoopOplogEntry(Seconds(123), 100);
auto secondEntry = makeNoopOplogEntry(Seconds(456), 200);
auto thirdEntry = makeNoopOplogEntry(Seconds(789), 300);
auto info = unittest::assertGet(OplogFetcher::validateDocuments(
{firstEntry, secondEntry, thirdEntry}, false, Timestamp(Seconds(100), 0)));
ASSERT_EQUALS(3U, info.networkDocumentCount);
ASSERT_EQUALS(size_t(firstEntry.objsize() + secondEntry.objsize() + thirdEntry.objsize()),
info.networkDocumentBytes);
ASSERT_EQUALS(info.networkDocumentCount, info.toApplyDocumentCount);
ASSERT_EQUALS(info.networkDocumentBytes, info.toApplyDocumentBytes);
ASSERT_EQUALS(300LL, info.lastDocument.value);
ASSERT_EQUALS(unittest::assertGet(OpTime::parseFromOplogEntry(thirdEntry)),
info.lastDocument.opTime);
}
TEST_F(OplogFetcherTest,
ValidateDocumentsReturnsDefaultLastDocumentHashAndOpTimeWhenThereAreNoDocumentsToApply) {
auto firstEntry = makeNoopOplogEntry(Seconds(123), 100);
auto info = unittest::assertGet(OplogFetcher::validateDocuments(
{firstEntry},
true,
unittest::assertGet(OpTime::parseFromOplogEntry(firstEntry)).getTimestamp()));
ASSERT_EQUALS(1U, info.networkDocumentCount);
ASSERT_EQUALS(size_t(firstEntry.objsize()), info.networkDocumentBytes);
ASSERT_EQUALS(0U, info.toApplyDocumentCount);
ASSERT_EQUALS(0U, info.toApplyDocumentBytes);
ASSERT_EQUALS(0LL, info.lastDocument.value);
ASSERT_EQUALS(OpTime(), info.lastDocument.opTime);
}
TEST_F(OplogFetcherTest,
ValidateDocumentsReturnsOplogStartMissingWhenThereAreNoDocumentsWhenProcessingFirstBatch) {
ASSERT_EQUALS(
ErrorCodes::OplogStartMissing,
OplogFetcher::validateDocuments({}, true, Timestamp(Seconds(100), 0)).getStatus());
}
TEST_F(OplogFetcherTest,
ValidateDocumentsReturnsDefaultInfoWhenThereAreNoDocumentsWhenNotProcessingFirstBatch) {
auto info =
unittest::assertGet(OplogFetcher::validateDocuments({}, false, Timestamp(Seconds(100), 0)));
ASSERT_EQUALS(0U, info.networkDocumentCount);
ASSERT_EQUALS(0U, info.networkDocumentBytes);
ASSERT_EQUALS(0U, info.toApplyDocumentCount);
ASSERT_EQUALS(0U, info.toApplyDocumentBytes);
ASSERT_EQUALS(0LL, info.lastDocument.value);
ASSERT_EQUALS(OpTime(), info.lastDocument.opTime);
}
} // namespace