/**
* Copyright 2017 MongoDB Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#include "mongo/platform/basic.h"
#include "mongo/db/repl/abstract_oplog_fetcher.h"
#include "mongo/db/repl/abstract_oplog_fetcher_test_fixture.h"
#include "mongo/db/repl/oplog_entry.h"
#include "mongo/db/repl/task_executor_mock.h"
#include "mongo/stdx/memory.h"
#include "mongo/unittest/task_executor_proxy.h"
#include "mongo/unittest/unittest.h"
#include "mongo/util/scopeguard.h"
namespace {
using namespace mongo;
using namespace mongo::repl;
using executor::RemoteCommandRequest;
using executor::RemoteCommandResponse;
using NetworkGuard = executor::NetworkInterfaceMock::InNetworkGuard;
// Remote host passed as the 'source' argument to every MockOplogFetcher constructed in this file.
HostAndPort source("localhost:12345");
// Namespace passed as the 'nss' argument to every MockOplogFetcher constructed in this file.
NamespaceString nss("local.oplog.rs");
/**
 * This class is the minimal implementation of an oplog fetcher. It has the simplest `find` command
 * possible, no metadata, and the _onSuccessfulBatch function simply returns a `getMore` command
 * on the fetcher's cursor.
 */
class MockOplogFetcher : public AbstractOplogFetcher {
public:
    /**
     * Forwards every argument unchanged to the AbstractOplogFetcher constructor and labels the
     * component "mock oplog fetcher".
     */
    explicit MockOplogFetcher(executor::TaskExecutor* executor,
                              OpTimeWithHash lastFetched,
                              HostAndPort source,
                              NamespaceString nss,
                              std::size_t maxFetcherRestarts,
                              OnShutdownCallbackFn onShutdownCallbackFn);

private:
    /**
     * Builds `{find: <collection>, filter: {ts: {$gte: <timestamp of lastOpTimeFetched>}}}`.
     */
    BSONObj _makeFindCommandObject(const NamespaceString& nss,
                                   OpTime lastOpTimeFetched) const override;

    /**
     * Returns an empty object: this mock sends no metadata.
     */
    BSONObj _makeMetadataObject() const override;

    /**
     * Builds the `getMore` command for the cursor id reported in 'queryResponse'.
     * This mock never produces an error status.
     */
    StatusWith<BSONObj> _onSuccessfulBatch(const Fetcher::QueryResponse& queryResponse) override;
};

MockOplogFetcher::MockOplogFetcher(executor::TaskExecutor* executor,
                                   OpTimeWithHash lastFetched,
                                   HostAndPort source,
                                   NamespaceString nss,
                                   std::size_t maxFetcherRestarts,
                                   OnShutdownCallbackFn onShutdownCallbackFn)
    : AbstractOplogFetcher(executor,
                           lastFetched,
                           source,
                           nss,
                           maxFetcherRestarts,
                           onShutdownCallbackFn,
                           "mock oplog fetcher") {}

BSONObj MockOplogFetcher::_makeFindCommandObject(const NamespaceString& nss,
                                                 OpTime lastOpTimeFetched) const {
    BSONObjBuilder cmdBob;
    cmdBob.append("find", nss.coll());
    // Resume from (and including) the last fetched timestamp.
    cmdBob.append("filter", BSON("ts" << BSON("$gte" << lastOpTimeFetched.getTimestamp())));
    return cmdBob.obj();
}

BSONObj MockOplogFetcher::_makeMetadataObject() const {
    return BSONObj();
}

StatusWith<BSONObj> MockOplogFetcher::_onSuccessfulBatch(
    const Fetcher::QueryResponse& queryResponse) {
    BSONObjBuilder cmdBob;
    cmdBob.append("getMore", queryResponse.cursorId);
    cmdBob.append("collection", _getNamespace().coll());
    return cmdBob.obj();
}
TEST_F(AbstractOplogFetcherTest, ShuttingExecutorDownShouldPreventOplogFetcherFromStarting) {
    // The executor is already shut down by the time the fetcher is constructed.
    getExecutor().shutdown();

    const auto ignoreShutdownStatus = [](Status) {};
    MockOplogFetcher fetcher(&getExecutor(), lastFetched, source, nss, 0, ignoreShutdownStatus);

    // Before startup: the fetcher reports the constructor's optime/hash and is inactive.
    ASSERT_EQUALS(lastFetched, fetcher.getLastOpTimeWithHashFetched_forTest());
    ASSERT_FALSE(fetcher.isActive());

    // startup() must be rejected and must leave the fetcher inactive.
    ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, fetcher.startup());
    ASSERT_FALSE(fetcher.isActive());

    // The failed startup must not have touched the last fetched optime/hash.
    ASSERT_EQUALS(lastFetched, fetcher.getLastOpTimeWithHashFetched_forTest());
}
TEST_F(AbstractOplogFetcherTest, StartupReturnsOperationFailedIfExecutorFailsToScheduleFetcher) {
    ShutdownState onShutdown;

    // Wrap the real executor in a mock that is told to fail its schedule-work requests.
    TaskExecutorMock failingExecutor(&getExecutor());
    failingExecutor.shouldFailScheduleWorkRequest = []() { return true; };

    MockOplogFetcher fetcher(
        &failingExecutor, lastFetched, source, nss, 0, stdx::ref(onShutdown));

    // The scheduling failure is reported directly by startup().
    ASSERT_EQUALS(ErrorCodes::OperationFailed, fetcher.startup());
}
TEST_F(AbstractOplogFetcherTest, OplogFetcherReturnsOperationFailedIfExecutorFailsToScheduleFind) {
    ShutdownState onShutdown;

    // Wrap the real executor in a mock that is told to fail every remote command request.
    TaskExecutorMock failingExecutor(&getExecutor());
    failingExecutor.shouldFailScheduleRemoteCommandRequest =
        [](const executor::RemoteCommandRequest&) { return true; };

    MockOplogFetcher fetcher(
        &failingExecutor, lastFetched, source, nss, 0, stdx::ref(onShutdown));

    // startup() itself succeeds and the fetcher becomes active.
    ASSERT_FALSE(fetcher.isActive());
    ASSERT_OK(fetcher.startup());
    ASSERT_TRUE(fetcher.isActive());

    // Once the fetcher has finished, the failure is visible through the shutdown callback.
    fetcher.join();
    ASSERT_EQUALS(ErrorCodes::OperationFailed, onShutdown.getStatus());
}
TEST_F(AbstractOplogFetcherTest, ShuttingExecutorDownAfterStartupStopsTheOplogFetcher) {
    ShutdownState onShutdown;

    // Per the flag's name, the mock defers schedule-work requests by one second; presumably this
    // keeps the fetcher's scheduled work pending while the executor is shut down below.
    TaskExecutorMock deferringExecutor(&getExecutor());
    deferringExecutor.shouldDeferScheduleWorkRequestByOneSecond = []() { return true; };

    MockOplogFetcher fetcher(
        &deferringExecutor, lastFetched, source, nss, 0, stdx::ref(onShutdown));

    ASSERT_FALSE(fetcher.isActive());
    ASSERT_OK(fetcher.startup());
    ASSERT_TRUE(fetcher.isActive());

    // Shut down the underlying executor (not the fetcher) and wait for the fetcher to finish.
    getExecutor().shutdown();
    fetcher.join();

    ASSERT_EQUALS(ErrorCodes::CallbackCanceled, onShutdown.getStatus());
}
TEST_F(AbstractOplogFetcherTest, OplogFetcherReturnsCallbackCanceledIfShutdownAfterStartup) {
    ShutdownState onShutdown;

    // Per the flag's name, the mock defers schedule-work requests by one second; presumably this
    // keeps the fetcher's scheduled work pending while the fetcher is shut down below.
    TaskExecutorMock deferringExecutor(&getExecutor());
    deferringExecutor.shouldDeferScheduleWorkRequestByOneSecond = []() { return true; };

    MockOplogFetcher fetcher(
        &deferringExecutor, lastFetched, source, nss, 0, stdx::ref(onShutdown));

    ASSERT_FALSE(fetcher.isActive());
    ASSERT_OK(fetcher.startup());
    ASSERT_TRUE(fetcher.isActive());

    // Shut down the fetcher itself (the executor keeps running) and wait for it to finish.
    fetcher.shutdown();
    fetcher.join();

    ASSERT_EQUALS(ErrorCodes::CallbackCanceled, onShutdown.getStatus());
}
long long _getHash(const BSONObj& oplogEntry) {
return oplogEntry["h"].numberLong();
}
/**
 * Parses 'oplogEntry' as an OplogEntry and returns the timestamp of its optime.
 */
Timestamp _getTimestamp(const BSONObj& oplogEntry) {
    const OplogEntry parsedEntry(oplogEntry);
    return parsedEntry.getOpTime().getTimestamp();
}
/**
 * Combines the hash (see _getHash) and the parsed optime of 'oplogEntry' into an OpTimeWithHash.
 */
OpTimeWithHash _getOpTimeWithHash(const BSONObj& oplogEntry) {
    const auto hash = _getHash(oplogEntry);
    const auto opTime = OplogEntry(oplogEntry).getOpTime();
    return {hash, opTime};
}
std::vector _generateOplogEntries(std::size_t size) {
std::vector ops(size);
for (std::size_t i = 0; i < size; ++i) {
ops[i] = AbstractOplogFetcherTest::makeNoopOplogEntry(Seconds(100 + int(i)), 123LL);
}
return ops;
}
void _assertFindCommandTimestampEquals(const Timestamp& timestamp,
const RemoteCommandRequest& request) {
executor::TaskExecutorTest::assertRemoteCommandNameEquals("find", request);
ASSERT_EQUALS(timestamp, request.cmdObj["filter"].Obj()["ts"].Obj()["$gte"].timestamp());
}
void _assertFindCommandTimestampEquals(const BSONObj& oplogEntry,
const RemoteCommandRequest& request) {
_assertFindCommandTimestampEquals(_getTimestamp(oplogEntry), request);
}
TEST_F(AbstractOplogFetcherTest,
       OplogFetcherCreatesNewFetcherOnCallbackErrorDuringGetMoreNumberOne) {
    auto ops = _generateOplogEntries(5U);
    std::size_t maxFetcherRestarts = 1U;
    auto shutdownState = stdx::make_unique<ShutdownState>();
    MockOplogFetcher oplogFetcher(&getExecutor(),
                                  _getOpTimeWithHash(ops[0]),
                                  source,
                                  nss,
                                  maxFetcherRestarts,
                                  stdx::ref(*shutdownState));
    // Make sure the executor is shut down even if an assertion below fails.
    ON_BLOCK_EXIT([this] { getExecutor().shutdown(); });

    ASSERT_OK(oplogFetcher.startup());

    // Send first batch from FIND.
    _assertFindCommandTimestampEquals(
        ops[0], processNetworkResponse({makeCursorResponse(1, {ops[0], ops[1], ops[2]})}, true));

    // Send error during GETMORE.
    processNetworkResponse({ErrorCodes::CursorNotFound, "cursor not found"}, true);

    // Send the batch for the second FIND and check that the query started from the last oplog
    // entry of the first FIND response, i.e. that the optimes match.
    _assertFindCommandTimestampEquals(
        ops[2], processNetworkResponse({makeCursorResponse(0, {ops[2], ops[3], ops[4]})}, false));

    // Done.
    oplogFetcher.join();
    ASSERT_OK(shutdownState->getStatus());
}
TEST_F(AbstractOplogFetcherTest, OplogFetcherStopsRestartingFetcherIfRestartLimitIsReached) {
    auto ops = _generateOplogEntries(3U);
    std::size_t maxFetcherRestarts = 2U;
    auto shutdownState = stdx::make_unique<ShutdownState>();
    MockOplogFetcher oplogFetcher(&getExecutor(),
                                  _getOpTimeWithHash(ops[0]),
                                  source,
                                  nss,
                                  maxFetcherRestarts,
                                  stdx::ref(*shutdownState));
    // Make sure the executor is shut down even if an assertion below fails.
    ON_BLOCK_EXIT([this] { getExecutor().shutdown(); });

    ASSERT_OK(oplogFetcher.startup());

    unittest::log() << "processing find request from first fetcher";
    _assertFindCommandTimestampEquals(
        ops[0], processNetworkResponse({makeCursorResponse(1, {ops[0], ops[1], ops[2]})}, true));

    unittest::log() << "sending error response to getMore request from first fetcher";
    assertRemoteCommandNameEquals(
        "getMore", processNetworkResponse({ErrorCodes::CappedPositionLost, "fail 1"}, true));

    // Both retried find commands must resume from the last entry of the successful batch.
    unittest::log() << "sending error response to find request from second fetcher";
    _assertFindCommandTimestampEquals(
        ops[2], processNetworkResponse({ErrorCodes::IllegalOperation, "fail 2"}, true));

    unittest::log() << "sending error response to find request from third fetcher";
    _assertFindCommandTimestampEquals(
        ops[2], processNetworkResponse({ErrorCodes::OperationFailed, "fail 3"}, false));

    // The shutdown status must carry the error from the last failed attempt.
    oplogFetcher.join();
    ASSERT_EQUALS(ErrorCodes::OperationFailed, shutdownState->getStatus());
}
TEST_F(AbstractOplogFetcherTest, OplogFetcherResetsRestartCounterOnSuccessfulFetcherResponse) {
    auto ops = _generateOplogEntries(5U);
    std::size_t maxFetcherRestarts = 2U;
    auto shutdownState = stdx::make_unique<ShutdownState>();
    MockOplogFetcher oplogFetcher(&getExecutor(),
                                  _getOpTimeWithHash(ops[0]),
                                  source,
                                  nss,
                                  maxFetcherRestarts,
                                  stdx::ref(*shutdownState));
    // Make sure the executor is shut down even if an assertion below fails.
    ON_BLOCK_EXIT([this] { getExecutor().shutdown(); });

    ASSERT_OK(oplogFetcher.startup());

    unittest::log() << "processing find request from first fetcher";
    _assertFindCommandTimestampEquals(
        ops[0], processNetworkResponse({makeCursorResponse(1, {ops[0], ops[1], ops[2]})}, true));

    unittest::log() << "sending error response to getMore request from first fetcher";
    assertRemoteCommandNameEquals(
        "getMore", processNetworkResponse({ErrorCodes::CappedPositionLost, "fail 1"}, true));

    // A successful find in between the failures; the find requests after it resume from ops[4].
    unittest::log() << "processing find request from second fetcher";
    _assertFindCommandTimestampEquals(
        ops[2], processNetworkResponse({makeCursorResponse(1, {ops[2], ops[3], ops[4]})}, true));

    unittest::log() << "sending error response to getMore request from second fetcher";
    assertRemoteCommandNameEquals(
        "getMore", processNetworkResponse({ErrorCodes::IllegalOperation, "fail 2"}, true));

    unittest::log() << "sending error response to find request from third fetcher";
    _assertFindCommandTimestampEquals(
        ops[4], processNetworkResponse({ErrorCodes::InternalError, "fail 3"}, true));

    unittest::log() << "sending error response to find request from fourth fetcher";
    _assertFindCommandTimestampEquals(
        ops[4], processNetworkResponse({ErrorCodes::OperationFailed, "fail 4"}, false));

    // The shutdown status must carry the error from the last failed attempt.
    oplogFetcher.join();
    ASSERT_EQUALS(ErrorCodes::OperationFailed, shutdownState->getStatus());
}
/**
 * Task executor proxy that fails scheduleRemoteCommand() with OperationFailed whenever the
 * supplied predicate returns true for the request, and otherwise forwards the request to the
 * wrapped executor.
 */
class TaskExecutorWithFailureInScheduleRemoteCommand : public unittest::TaskExecutorProxy {
public:
    // Predicate deciding, per request, whether scheduling should fail.
    using ShouldFailRequestFn = stdx::function<bool(const executor::RemoteCommandRequest&)>;

    TaskExecutorWithFailureInScheduleRemoteCommand(executor::TaskExecutor* executor,
                                                   ShouldFailRequestFn shouldFailRequest)
        : unittest::TaskExecutorProxy(executor), _shouldFailRequest(shouldFailRequest) {}

    /**
     * Returns OperationFailed without scheduling anything if the predicate rejects 'request';
     * otherwise returns whatever the wrapped executor's scheduleRemoteCommand() returns.
     */
    StatusWith<executor::TaskExecutor::CallbackHandle> scheduleRemoteCommand(
        const executor::RemoteCommandRequest& request,
        const RemoteCommandCallbackFn& cb) override {
        if (_shouldFailRequest(request)) {
            return Status(ErrorCodes::OperationFailed, "failed to schedule remote command");
        }
        return getExecutor()->scheduleRemoteCommand(request, cb);
    }

private:
    ShouldFailRequestFn _shouldFailRequest;
};
TEST_F(AbstractOplogFetcherTest,
       OplogFetcherAbortsWithOriginalResponseErrorOnFailureToScheduleNewFetcher) {
    auto ops = _generateOplogEntries(3U);
    std::size_t maxFetcherRestarts = 2U;
    auto shutdownState = stdx::make_unique<ShutdownState>();

    // The proxy starts out forwarding requests; flipping this flag makes every later
    // scheduleRemoteCommand() call fail.
    bool shouldFailSchedule = false;
    TaskExecutorWithFailureInScheduleRemoteCommand _executorProxy(
        &getExecutor(), [&shouldFailSchedule](const executor::RemoteCommandRequest&) {
            return shouldFailSchedule;
        });

    MockOplogFetcher oplogFetcher(&_executorProxy,
                                  _getOpTimeWithHash(ops[0]),
                                  source,
                                  nss,
                                  maxFetcherRestarts,
                                  stdx::ref(*shutdownState));
    // Make sure the executor is shut down even if an assertion below fails.
    ON_BLOCK_EXIT([this] { getExecutor().shutdown(); });

    ASSERT_OK(oplogFetcher.startup());
    ASSERT_TRUE(oplogFetcher.isActive());

    unittest::log() << "processing find request from first fetcher";
    _assertFindCommandTimestampEquals(
        ops[0], processNetworkResponse({makeCursorResponse(1, {ops[0], ops[1], ops[2]})}, true));

    unittest::log() << "sending error response to getMore request from first fetcher";
    shouldFailSchedule = true;
    assertRemoteCommandNameEquals(
        "getMore", processNetworkResponse({ErrorCodes::CappedPositionLost, "dead cursor"}, false));

    oplogFetcher.join();

    // Status in shutdown callback should match error for dead cursor instead of error from failed
    // schedule request.
    ASSERT_EQUALS(ErrorCodes::CappedPositionLost, shutdownState->getStatus());
}
// Set to true by ~SharedCallbackState().
bool sharedCallbackStateDestroyed = false;

/**
 * Non-copyable marker object whose destructor records, through the flag above, that it ran.
 */
class SharedCallbackState {
    MONGO_DISALLOW_COPYING(SharedCallbackState);

public:
    SharedCallbackState() = default;

    ~SharedCallbackState() {
        sharedCallbackStateDestroyed = true;
    }
};
TEST_F(AbstractOplogFetcherTest, OplogFetcherResetsOnShutdownCallbackFunctionOnCompletion) {
    // Start from a known state so the destruction check below cannot be satisfied by a
    // SharedCallbackState destroyed earlier in the same process.
    sharedCallbackStateDestroyed = false;

    auto sharedCallbackData = std::make_shared<SharedCallbackState>();
    auto callbackInvoked = false;
    auto status = getDetectableErrorStatus();

    // The callback holds its own copy of 'sharedCallbackData', so the SharedCallbackState stays
    // alive for as long as the fetcher keeps the callback.
    MockOplogFetcher oplogFetcher(
        &getExecutor(),
        lastFetched,
        source,
        nss,
        0,
        [&callbackInvoked, sharedCallbackData, &status](const Status& shutdownStatus) {
            status = shutdownStatus;
            callbackInvoked = true;
        });
    // Make sure the executor is shut down even if an assertion below fails.
    ON_BLOCK_EXIT([this] { getExecutor().shutdown(); });

    ASSERT_FALSE(oplogFetcher.isActive());
    ASSERT_OK(oplogFetcher.startup());
    ASSERT_TRUE(oplogFetcher.isActive());

    // Drop the test's reference; the copy captured by the callback is now the only owner.
    sharedCallbackData.reset();
    ASSERT_FALSE(sharedCallbackStateDestroyed);

    processNetworkResponse({ErrorCodes::OperationFailed, "oplog tailing query failed"}, false);

    oplogFetcher.join();

    ASSERT_EQUALS(ErrorCodes::OperationFailed, status);

    // Oplog fetcher should reset 'OplogFetcher::_onShutdownCallbackFn' after running callback
    // function before becoming inactive.
    // This ensures that we release resources associated with
    // 'OplogFetcher::_onShutdownCallbackFn'.
    ASSERT_TRUE(callbackInvoked);
    ASSERT_TRUE(sharedCallbackStateDestroyed);
}
} // namespace