summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBenety Goh <benety@mongodb.com>2016-09-23 14:43:39 -0400
committerBenety Goh <benety@mongodb.com>2016-09-26 16:13:25 -0400
commit9a4693efbe83f3c07c751dd608dec69829978ee6 (patch)
tree2c0f2181d4e7d4d306851ab4067921b43589d549
parentaa9ea14a57181f098a32f24a734f2c81563bee5c (diff)
downloadmongo-9a4693efbe83f3c07c751dd608dec69829978ee6.tar.gz
SERVER-25702 added support to OplogFetcher for restarting oplog query
-rw-r--r--src/mongo/db/repl/SConscript1
-rw-r--r--src/mongo/db/repl/oplog_fetcher.cpp138
-rw-r--r--src/mongo/db/repl/oplog_fetcher.h35
-rw-r--r--src/mongo/db/repl/oplog_fetcher_test.cpp257
4 files changed, 399 insertions, 32 deletions
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript
index c90a311c033..2ade8283ac6 100644
--- a/src/mongo/db/repl/SConscript
+++ b/src/mongo/db/repl/SConscript
@@ -721,6 +721,7 @@ env.CppUnitTest(
LIBDEPS=[
'oplog_fetcher',
'data_replicator_external_state_mock',
+ '$BUILD_DIR/mongo/unittest/task_executor_proxy',
'$BUILD_DIR/mongo/executor/thread_pool_task_executor_test_fixture',
],
)
diff --git a/src/mongo/db/repl/oplog_fetcher.cpp b/src/mongo/db/repl/oplog_fetcher.cpp
index afb48bedde9..7ea9dd5e196 100644
--- a/src/mongo/db/repl/oplog_fetcher.cpp
+++ b/src/mongo/db/repl/oplog_fetcher.cpp
@@ -262,14 +262,12 @@ OplogFetcher::OplogFetcher(executor::TaskExecutor* executor,
_onShutdownCallbackFn(onShutdownCallbackFn),
_lastFetched(lastFetched),
_fetcher(_makeFetcher(_lastFetched.opTime)) {
- uassert(ErrorCodes::BadValue, "null last optime fetched", !lastFetched.opTime.isNull());
+ uassert(ErrorCodes::BadValue, "null last optime fetched", !_lastFetched.opTime.isNull());
uassert(ErrorCodes::InvalidReplicaSetConfig,
"uninitialized replica set configuration",
config.isInitialized());
uassert(ErrorCodes::BadValue, "null enqueueDocuments function", enqueueDocumentsFn);
uassert(ErrorCodes::BadValue, "null onShutdownCallback function", onShutdownCallbackFn);
-
- readersCreatedStats.increment();
}
OplogFetcher::~OplogFetcher() {
@@ -284,19 +282,40 @@ std::string OplogFetcher::toString() const {
}
bool OplogFetcher::isActive() const {
- return _fetcher->isActive();
+ stdx::lock_guard<stdx::mutex> lock(_mutex);
+ return _active;
}
Status OplogFetcher::startup() {
+ stdx::lock_guard<stdx::mutex> lock(_mutex);
+ if (_active) {
+ return Status(ErrorCodes::IllegalOperation, "oplog fetcher already active");
+ }
+ if (_inShutdown) {
+ return Status(ErrorCodes::ShutdownInProgress, "oplog fetcher shutting down");
+ }
+
+ auto status = _scheduleFetcher_inlock();
+ if (status.isOK()) {
+ _active = true;
+ }
+ return status;
+}
+
+Status OplogFetcher::_scheduleFetcher_inlock() {
+ readersCreatedStats.increment();
return _fetcher->schedule();
}
void OplogFetcher::shutdown() {
+ stdx::lock_guard<stdx::mutex> lock(_mutex);
+ _inShutdown = true;
_fetcher->shutdown();
}
void OplogFetcher::join() {
- _fetcher->join();
+ stdx::unique_lock<stdx::mutex> lock(_mutex);
+ _condition.wait(lock, [this]() { return !_active; });
}
OpTimeWithHash OplogFetcher::getLastOpTimeWithHashFetched() const {
@@ -305,6 +324,7 @@ OpTimeWithHash OplogFetcher::getLastOpTimeWithHashFetched() const {
}
BSONObj OplogFetcher::getCommandObject_forTest() const {
+ stdx::lock_guard<stdx::mutex> lock(_mutex);
return _fetcher->getCommandObject();
}
@@ -320,18 +340,71 @@ Milliseconds OplogFetcher::getAwaitDataTimeout_forTest() const {
return _awaitDataTimeout;
}
+bool OplogFetcher::inShutdown_forTest() const {
+ return _isInShutdown();
+}
+
void OplogFetcher::_callback(const Fetcher::QueryResponseStatus& result,
BSONObjBuilder* getMoreBob) {
- // if target cut connections between connecting and querying (for
- // example, because it stepped down) we might not have a cursor
- if (!result.isOK()) {
- LOG(1) << "Error returned from oplog query: " << redact(result.getStatus());
- _onShutdown(result.getStatus());
+ const auto& responseStatus = result.getStatus();
+ if (ErrorCodes::CallbackCanceled == responseStatus) {
+ LOG(1) << "oplog query cancelled";
+ _finishCallback(responseStatus);
+ return;
+ }
+
+ // If target cut connections between connecting and querying (for
+ // example, because it stepped down) we might not have a cursor.
+ if (!responseStatus.isOK()) {
+ {
+ stdx::lock_guard<stdx::mutex> lock(_mutex);
+ if (_inShutdown) {
+ log() << "Error returned from oplog query while canceling query: "
+ << redact(responseStatus);
+ } else if (_fetcherRestarts == _maxFetcherRestarts) {
+ log() << "Error returned from oplog query (no more query restarts left): "
+ << redact(responseStatus);
+ } else {
+ log() << "Restarting oplog query due to error: " << redact(responseStatus)
+ << ". Last fetched optime (with hash): " << _lastFetched
+ << ". Restarts remaining: " << (_maxFetcherRestarts - _fetcherRestarts);
+ _fetcherRestarts++;
+ // Destroying current instance in _shuttingDownFetcher will possibly block.
+ _shuttingDownFetcher.reset();
+ // Move the old fetcher into the shutting down instance.
+ _shuttingDownFetcher.swap(_fetcher);
+ // Create and start fetcher with new starting optime.
+ _fetcher = _makeFetcher(_lastFetched.opTime);
+ auto scheduleStatus = _scheduleFetcher_inlock();
+ if (scheduleStatus.isOK()) {
+ log() << "Scheduled new oplog query " << _fetcher->toString();
+ return;
+ }
+ error() << "Error scheduling new oplog query: " << redact(scheduleStatus)
+ << ". Returning current oplog query error: " << redact(responseStatus);
+ }
+ }
+ _finishCallback(responseStatus);
+ return;
+ }
+
+ // Reset fetcher restart counter on successful response.
+ {
+ stdx::lock_guard<stdx::mutex> lock(_mutex);
+ invariant(_active);
+ _fetcherRestarts = 0;
+ }
+
+ if (_isInShutdown()) {
+ _finishCallback(Status(ErrorCodes::CallbackCanceled, "oplog fetcher shutting down"));
return;
}
// Stop fetching and return immediately on fail point.
+ // This fail point is intended to make the oplog fetcher ignore the downloaded batch of
+ // operations and not error out.
if (MONGO_FAIL_POINT(stopOplogFetcher)) {
+ _finishCallback(Status::OK());
return;
}
@@ -347,7 +420,7 @@ void OplogFetcher::_callback(const Fetcher::QueryResponseStatus& result,
if (!metadataResult.isOK()) {
error() << "invalid replication metadata from sync source " << _fetcher->getSource()
<< ": " << metadataResult.getStatus() << ": " << metadataObj;
- _onShutdown(metadataResult.getStatus());
+ _finishCallback(metadataResult.getStatus());
return;
}
metadata = metadataResult.getValue();
@@ -372,7 +445,7 @@ void OplogFetcher::_callback(const Fetcher::QueryResponseStatus& result,
auto status = checkRemoteOplogStart(documents, opTimeWithHash);
if (!status.isOK()) {
// Stop oplog fetcher and execute rollback.
- _onShutdown(status, opTimeWithHash);
+ _finishCallback(status, opTimeWithHash);
return;
}
@@ -385,7 +458,7 @@ void OplogFetcher::_callback(const Fetcher::QueryResponseStatus& result,
auto validateResult = OplogFetcher::validateDocuments(
documents, queryResponse.first, opTimeWithHash.opTime.getTimestamp());
if (!validateResult.isOK()) {
- _onShutdown(validateResult.getStatus(), opTimeWithHash);
+ _finishCallback(validateResult.getStatus(), opTimeWithHash);
return;
}
auto info = validateResult.getValue();
@@ -411,23 +484,23 @@ void OplogFetcher::_callback(const Fetcher::QueryResponseStatus& result,
}
if (_dataReplicatorExternalState->shouldStopFetching(_fetcher->getSource(), metadata)) {
- _onShutdown(Status(ErrorCodes::InvalidSyncSource,
- str::stream() << "sync source " << _fetcher->getSource().toString()
- << " (last optime: "
- << metadata.getLastOpVisible().toString()
- << "; sync source index: "
- << metadata.getSyncSourceIndex()
- << "; primary index: "
- << metadata.getPrimaryIndex()
- << ") is no longer valid"),
- opTimeWithHash);
+ _finishCallback(Status(ErrorCodes::InvalidSyncSource,
+ str::stream() << "sync source " << _fetcher->getSource().toString()
+ << " (last optime: "
+ << metadata.getLastOpVisible().toString()
+ << "; sync source index: "
+ << metadata.getSyncSourceIndex()
+ << "; primary index: "
+ << metadata.getPrimaryIndex()
+ << ") is no longer valid"),
+ opTimeWithHash);
return;
}
// No more data. Stop processing and return Status::OK along with last
// fetch info.
if (!getMoreBob) {
- _onShutdown(Status::OK(), opTimeWithHash);
+ _finishCallback(Status::OK(), opTimeWithHash);
return;
}
@@ -437,12 +510,18 @@ void OplogFetcher::_callback(const Fetcher::QueryResponseStatus& result,
_awaitDataTimeout));
}
-void OplogFetcher::_onShutdown(Status status) {
- _onShutdown(status, getLastOpTimeWithHashFetched());
+void OplogFetcher::_finishCallback(Status status) {
+ _finishCallback(status, getLastOpTimeWithHashFetched());
}
-void OplogFetcher::_onShutdown(Status status, OpTimeWithHash opTimeWithHash) {
+void OplogFetcher::_finishCallback(Status status, OpTimeWithHash opTimeWithHash) {
+ invariant(isActive());
+
_onShutdownCallbackFn(status, opTimeWithHash);
+
+ stdx::lock_guard<stdx::mutex> lock(_mutex);
+ _active = false;
+ _condition.notify_all();
}
std::unique_ptr<Fetcher> OplogFetcher::_makeFetcher(OpTime lastFetchedOpTime) {
@@ -456,5 +535,10 @@ std::unique_ptr<Fetcher> OplogFetcher::_makeFetcher(OpTime lastFetchedOpTime) {
_remoteCommandTimeout);
}
+bool OplogFetcher::_isInShutdown() const {
+ stdx::lock_guard<stdx::mutex> lock(_mutex);
+ return _inShutdown;
+}
+
} // namespace repl
} // namespace mongo
diff --git a/src/mongo/db/repl/oplog_fetcher.h b/src/mongo/db/repl/oplog_fetcher.h
index e7414c0467c..8d17b6a71d1 100644
--- a/src/mongo/db/repl/oplog_fetcher.h
+++ b/src/mongo/db/repl/oplog_fetcher.h
@@ -39,7 +39,9 @@
#include "mongo/db/repl/data_replicator_external_state.h"
#include "mongo/db/repl/optime_with.h"
#include "mongo/db/repl/replica_set_config.h"
+#include "mongo/stdx/condition_variable.h"
#include "mongo/stdx/functional.h"
+#include "mongo/stdx/mutex.h"
namespace mongo {
namespace repl {
@@ -188,8 +190,20 @@ public:
*/
Milliseconds getAwaitDataTimeout_forTest() const;
+ /**
+ * Returns whether the oplog fetcher is in shutdown.
+ *
+ * For testing only.
+ */
+ bool inShutdown_forTest() const;
+
private:
/**
+ * Schedules fetcher and updates counters.
+ */
+ Status _scheduleFetcher_inlock();
+
+ /**
* Processes each batch of results from the tailable cursor started by the fetcher on the sync
* source.
*
@@ -202,17 +216,24 @@ private:
* Notifies caller that the oplog fetcher has completed processing operations from
* the remote oplog.
*/
- void _onShutdown(Status status);
- void _onShutdown(Status status, OpTimeWithHash opTimeWithHash);
+ void _finishCallback(Status status);
+ void _finishCallback(Status status, OpTimeWithHash opTimeWithHash);
/**
* Creates a new instance of the fetcher to tail the remote oplog starting at the given optime.
*/
std::unique_ptr<Fetcher> _makeFetcher(OpTime lastFetchedOpTime);
+ /**
+ * Returns whether the oplog fetcher is in shutdown.
+ */
+ bool _isInShutdown() const;
+
// Protects member data of this OplogFetcher.
mutable stdx::mutex _mutex;
+ mutable stdx::condition_variable _condition;
+
executor::TaskExecutor* const _executor;
const HostAndPort _source;
const NamespaceString _nss;
@@ -232,7 +253,17 @@ private:
// "_enqueueDocumentsFn".
OpTimeWithHash _lastFetched;
+ // _active is true when a fetcher is scheduled to be run by the executor.
+ bool _active = false;
+
+ // _inShutdown is true after shutdown() is called.
+ bool _inShutdown = false;
+
+ // Fetcher restarts since the last successful oplog query response.
+ std::size_t _fetcherRestarts = 0;
+
std::unique_ptr<Fetcher> _fetcher;
+ std::unique_ptr<Fetcher> _shuttingDownFetcher;
};
} // namespace repl
diff --git a/src/mongo/db/repl/oplog_fetcher_test.cpp b/src/mongo/db/repl/oplog_fetcher_test.cpp
index 0399cc32dd3..d513e452ff8 100644
--- a/src/mongo/db/repl/oplog_fetcher_test.cpp
+++ b/src/mongo/db/repl/oplog_fetcher_test.cpp
@@ -38,15 +38,19 @@
#include "mongo/rpc/metadata/repl_set_metadata.h"
#include "mongo/rpc/metadata/server_selection_metadata.h"
#include "mongo/stdx/memory.h"
+#include "mongo/unittest/task_executor_proxy.h"
#include "mongo/unittest/unittest.h"
+#include "mongo/util/scopeguard.h"
namespace {
using namespace mongo;
using namespace mongo::repl;
+using namespace unittest;
using executor::RemoteCommandRequest;
using executor::RemoteCommandResponse;
+using NetworkGuard = executor::NetworkInterfaceMock::InNetworkGuard;
class ShutdownState {
MONGO_DISALLOW_COPYING(ShutdownState);
@@ -148,12 +152,16 @@ void OplogFetcherTest::tearDown() {
RemoteCommandRequest OplogFetcherTest::processNetworkResponse(
RemoteCommandResponse response, bool expectReadyRequestsAfterProcessing) {
+
auto net = getNet();
- net->enterNetwork();
+ NetworkGuard guard(net);
+ log() << "scheduling response.";
auto request = net->scheduleSuccessfulResponse(response);
+ log() << "running network ops.";
net->runReadyNetworkOperations();
+ log() << "checking for more requests";
ASSERT_EQUALS(expectReadyRequestsAfterProcessing, net->hasReadyRequests());
- net->exitNetwork();
+ log() << "returning consumed request";
return request;
}
@@ -280,6 +288,38 @@ TEST_F(OplogFetcherTest, InvalidConstruction) {
"null onShutdownCallback function");
}
+TEST_F(OplogFetcherTest, StartupWhenActiveReturnsIllegalOperation) {
+ OplogFetcher oplogFetcher(&getExecutor(),
+ lastFetched,
+ source,
+ nss,
+ _createConfig(true),
+ 0,
+ dataReplicatorExternalState.get(),
+ enqueueDocumentsFn,
+ [](Status, OpTimeWithHash) {});
+ ASSERT_OK(oplogFetcher.startup());
+ ASSERT_TRUE(oplogFetcher.isActive());
+ auto status = oplogFetcher.startup();
+ getExecutor().shutdown();
+ ASSERT_EQUALS(ErrorCodes::IllegalOperation, status);
+ ASSERT_STRING_CONTAINS(status.reason(), "oplog fetcher already active");
+}
+
+TEST_F(OplogFetcherTest, StartupWhenShuttingDownReturnsShutdownInProgress) {
+ OplogFetcher oplogFetcher(&getExecutor(),
+ lastFetched,
+ source,
+ nss,
+ _createConfig(true),
+ 0,
+ dataReplicatorExternalState.get(),
+ enqueueDocumentsFn,
+ [](Status, OpTimeWithHash) {});
+ oplogFetcher.shutdown();
+ ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, oplogFetcher.startup());
+}
+
void _checkDefaultCommandObjectFields(BSONObj cmdObj) {
ASSERT_EQUALS(std::string("find"), cmdObj.firstElementFieldName());
ASSERT_TRUE(cmdObj.getBoolField("tailable"));
@@ -624,7 +664,7 @@ void OplogFetcherTest::testSyncSourceChecking(rpc::ReplSetMetadata* metadata) {
TEST_F(OplogFetcherTest, FailedSyncSourceCheckWithoutMetadataStopsTheOplogFetcher) {
testSyncSourceChecking(nullptr);
- // Sync source optime and "hasSyncSource" are not available if the respone does not
+ // Sync source optime and "hasSyncSource" are not available if the response does not
// contain metadata.
ASSERT_EQUALS(source, dataReplicatorExternalState->lastSyncSourceChecked);
ASSERT_EQUALS(OpTime(), dataReplicatorExternalState->syncSourceLastOpTime);
@@ -709,6 +749,8 @@ RemoteCommandRequest OplogFetcherTest::testTwoBatchHandling(bool isV1ElectionPro
ASSERT_BSONOBJ_EQ(fourthEntry, lastEnqueuedDocuments[1]);
oplogFetcher.shutdown();
+ ASSERT_TRUE(oplogFetcher.inShutdown_forTest());
+
oplogFetcher.join();
ASSERT_OK(shutdownState.getStatus());
@@ -870,4 +912,213 @@ TEST_F(OplogFetcherTest,
ASSERT_EQUALS(OpTime(), info.lastDocument.opTime);
}
+long long _getHash(const BSONObj& oplogEntry) {
+ return oplogEntry["h"].numberLong();
+}
+
+Timestamp _getTimestamp(const BSONObj& oplogEntry) {
+ return OplogEntry(oplogEntry).getOpTime().getTimestamp();
+}
+
+OpTimeWithHash _getOpTimeWithHash(const BSONObj& oplogEntry) {
+ return {_getHash(oplogEntry), OplogEntry(oplogEntry).getOpTime()};
+}
+
+std::vector<BSONObj> _generateOplogEntries(std::size_t size) {
+ std::vector<BSONObj> ops(size);
+ for (std::size_t i = 0; i < size; ++i) {
+ ops[i] = makeNoopOplogEntry(Seconds(100 + int(i)), 123LL);
+ }
+ return ops;
+}
+
+void _assertFindCommandTimestampEquals(const Timestamp& timestamp,
+ const RemoteCommandRequest& request) {
+ executor::TaskExecutorTest::assertRemoteCommandNameEquals("find", request);
+ ASSERT_EQUALS(timestamp, request.cmdObj["filter"].Obj()["ts"].Obj()["$gte"].timestamp());
+}
+
+void _assertFindCommandTimestampEquals(const BSONObj& oplogEntry,
+ const RemoteCommandRequest& request) {
+ _assertFindCommandTimestampEquals(_getTimestamp(oplogEntry), request);
+}
+
+TEST_F(OplogFetcherTest, OplogFetcherCreatesNewFetcherOnCallbackErrorDuringGetMoreNumberOne) {
+ auto ops = _generateOplogEntries(5U);
+ std::size_t maxFetcherRestarts = 1U;
+ auto shutdownState = stdx::make_unique<ShutdownState>();
+ OplogFetcher oplogFetcher(&getExecutor(),
+ _getOpTimeWithHash(ops[0]),
+ source,
+ nss,
+ _createConfig(true),
+ maxFetcherRestarts,
+ dataReplicatorExternalState.get(),
+ enqueueDocumentsFn,
+ stdx::ref(*shutdownState));
+ ON_BLOCK_EXIT([this] { getExecutor().shutdown(); });
+
+ ASSERT_OK(oplogFetcher.startup());
+
+ // Send first batch from FIND.
+ _assertFindCommandTimestampEquals(
+ ops[0], processNetworkResponse(makeCursorResponse(1, {ops[0], ops[1], ops[2]}), true));
+
+ // Send error during GETMORE.
+ processNetworkResponse({ErrorCodes::CursorNotFound, "blah"}, true);
+
+    // Send first batch from FIND, and check that it started from the end of the last FIND response.
+ // Check that the optimes match for the query and last oplog entry.
+ _assertFindCommandTimestampEquals(
+ ops[2], processNetworkResponse(makeCursorResponse(0, {ops[2], ops[3], ops[4]}), false));
+
+ // Done.
+ oplogFetcher.join();
+ ASSERT_OK(shutdownState->getStatus());
+ ASSERT_EQUALS(_getOpTimeWithHash(ops[4]), shutdownState->getLastFetched());
+}
+
+TEST_F(OplogFetcherTest, OplogFetcherStopsRestartingFetcherIfRestartLimitIsReached) {
+ auto ops = _generateOplogEntries(3U);
+ std::size_t maxFetcherRestarts = 2U;
+ auto shutdownState = stdx::make_unique<ShutdownState>();
+ OplogFetcher oplogFetcher(&getExecutor(),
+ _getOpTimeWithHash(ops[0]),
+ source,
+ nss,
+ _createConfig(true),
+ maxFetcherRestarts,
+ dataReplicatorExternalState.get(),
+ enqueueDocumentsFn,
+ stdx::ref(*shutdownState));
+ ON_BLOCK_EXIT([this] { getExecutor().shutdown(); });
+
+ ASSERT_OK(oplogFetcher.startup());
+
+ unittest::log() << "processing find request from first fetcher";
+ _assertFindCommandTimestampEquals(
+ ops[0], processNetworkResponse(makeCursorResponse(1, {ops[0], ops[1], ops[2]}), true));
+
+ unittest::log() << "sending error response to getMore request from first fetcher";
+ assertRemoteCommandNameEquals(
+ "getMore", processNetworkResponse({ErrorCodes::CappedPositionLost, "fail 1"}, true));
+
+ unittest::log() << "sending error response to find request from second fetcher";
+ _assertFindCommandTimestampEquals(
+ ops[2], processNetworkResponse({ErrorCodes::IllegalOperation, "fail 2"}, true));
+
+ unittest::log() << "sending error response to find request from third fetcher";
+ _assertFindCommandTimestampEquals(
+ ops[2], processNetworkResponse({ErrorCodes::OperationFailed, "fail 3"}, false));
+
+ oplogFetcher.join();
+ ASSERT_EQUALS(ErrorCodes::OperationFailed, shutdownState->getStatus());
+ ASSERT_EQUALS(_getOpTimeWithHash(ops[2]), shutdownState->getLastFetched());
+}
+
+TEST_F(OplogFetcherTest, OplogFetcherResetsRestartCounterOnSuccessfulFetcherResponse) {
+ auto ops = _generateOplogEntries(5U);
+ std::size_t maxFetcherRestarts = 2U;
+ auto shutdownState = stdx::make_unique<ShutdownState>();
+ OplogFetcher oplogFetcher(&getExecutor(),
+ _getOpTimeWithHash(ops[0]),
+ source,
+ nss,
+ _createConfig(true),
+ maxFetcherRestarts,
+ dataReplicatorExternalState.get(),
+ enqueueDocumentsFn,
+ stdx::ref(*shutdownState));
+ ON_BLOCK_EXIT([this] { getExecutor().shutdown(); });
+
+ ASSERT_OK(oplogFetcher.startup());
+
+ unittest::log() << "processing find request from first fetcher";
+ _assertFindCommandTimestampEquals(
+ ops[0], processNetworkResponse(makeCursorResponse(1, {ops[0], ops[1], ops[2]}), true));
+
+ unittest::log() << "sending error response to getMore request from first fetcher";
+ assertRemoteCommandNameEquals(
+ "getMore", processNetworkResponse({ErrorCodes::CappedPositionLost, "fail 1"}, true));
+
+ unittest::log() << "processing find request from second fetcher";
+ _assertFindCommandTimestampEquals(
+ ops[2], processNetworkResponse(makeCursorResponse(1, {ops[2], ops[3], ops[4]}), true));
+
+ unittest::log() << "sending error response to getMore request from second fetcher";
+ assertRemoteCommandNameEquals(
+ "getMore", processNetworkResponse({ErrorCodes::IllegalOperation, "fail 2"}, true));
+
+ unittest::log() << "sending error response to find request from third fetcher";
+ _assertFindCommandTimestampEquals(
+ ops[4], processNetworkResponse({ErrorCodes::InternalError, "fail 3"}, true));
+
+ unittest::log() << "sending error response to find request from fourth fetcher";
+ _assertFindCommandTimestampEquals(
+ ops[4], processNetworkResponse({ErrorCodes::OperationFailed, "fail 4"}, false));
+
+ oplogFetcher.join();
+ ASSERT_EQUALS(ErrorCodes::OperationFailed, shutdownState->getStatus());
+ ASSERT_EQUALS(_getOpTimeWithHash(ops[4]), shutdownState->getLastFetched());
+}
+
+class TaskExecutorWithFailureInScheduleRemoteCommand : public unittest::TaskExecutorProxy {
+public:
+ using ShouldFailRequestFn = stdx::function<bool(const executor::RemoteCommandRequest&)>;
+
+ TaskExecutorWithFailureInScheduleRemoteCommand(executor::TaskExecutor* executor,
+ ShouldFailRequestFn shouldFailRequest)
+ : unittest::TaskExecutorProxy(executor), _shouldFailRequest(shouldFailRequest) {}
+
+ StatusWith<CallbackHandle> scheduleRemoteCommand(const executor::RemoteCommandRequest& request,
+ const RemoteCommandCallbackFn& cb) override {
+ if (_shouldFailRequest(request)) {
+ return Status(ErrorCodes::OperationFailed, "failed to schedule remote command");
+ }
+ return getExecutor()->scheduleRemoteCommand(request, cb);
+ }
+
+private:
+ ShouldFailRequestFn _shouldFailRequest;
+};
+
+TEST_F(OplogFetcherTest, OplogFetcherAbortsWithOriginalResponseErrorOnFailureToScheduleNewFetcher) {
+ auto ops = _generateOplogEntries(3U);
+ std::size_t maxFetcherRestarts = 2U;
+ auto shutdownState = stdx::make_unique<ShutdownState>();
+ bool shouldFailSchedule = false;
+ TaskExecutorWithFailureInScheduleRemoteCommand _executorProxy(
+ &getExecutor(), [&shouldFailSchedule](const executor::RemoteCommandRequest& request) {
+ return shouldFailSchedule;
+ });
+ OplogFetcher oplogFetcher(&_executorProxy,
+ _getOpTimeWithHash(ops[0]),
+ source,
+ nss,
+ _createConfig(true),
+ maxFetcherRestarts,
+ dataReplicatorExternalState.get(),
+ enqueueDocumentsFn,
+ stdx::ref(*shutdownState));
+ ON_BLOCK_EXIT([this] { getExecutor().shutdown(); });
+
+ ASSERT_OK(oplogFetcher.startup());
+ ASSERT_TRUE(oplogFetcher.isActive());
+
+ unittest::log() << "processing find request from first fetcher";
+ _assertFindCommandTimestampEquals(
+ ops[0], processNetworkResponse(makeCursorResponse(1, {ops[0], ops[1], ops[2]}), true));
+
+ unittest::log() << "sending error response to getMore request from first fetcher";
+ shouldFailSchedule = true;
+ assertRemoteCommandNameEquals(
+ "getMore", processNetworkResponse({ErrorCodes::CappedPositionLost, "dead cursor"}, false));
+
+ oplogFetcher.join();
+ // Status in shutdown callback should match error for dead cursor instead of error from failed
+ // schedule request.
+ ASSERT_EQUALS(ErrorCodes::CappedPositionLost, shutdownState->getStatus());
+ ASSERT_EQUALS(_getOpTimeWithHash(ops[2]), shutdownState->getLastFetched());
+}
+
} // namespace