diff options
author | Benety Goh <benety@mongodb.com> | 2016-09-23 14:43:39 -0400 |
---|---|---|
committer | Benety Goh <benety@mongodb.com> | 2016-09-26 16:13:25 -0400 |
commit | 9a4693efbe83f3c07c751dd608dec69829978ee6 (patch) | |
tree | 2c0f2181d4e7d4d306851ab4067921b43589d549 /src/mongo | |
parent | aa9ea14a57181f098a32f24a734f2c81563bee5c (diff) | |
download | mongo-9a4693efbe83f3c07c751dd608dec69829978ee6.tar.gz |
SERVER-25702 added support to OplogFetcher for restarting oplog query
Diffstat (limited to 'src/mongo')
-rw-r--r-- | src/mongo/db/repl/SConscript | 1 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_fetcher.cpp | 138 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_fetcher.h | 35 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_fetcher_test.cpp | 257 |
4 files changed, 399 insertions, 32 deletions
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript index c90a311c033..2ade8283ac6 100644 --- a/src/mongo/db/repl/SConscript +++ b/src/mongo/db/repl/SConscript @@ -721,6 +721,7 @@ env.CppUnitTest( LIBDEPS=[ 'oplog_fetcher', 'data_replicator_external_state_mock', + '$BUILD_DIR/mongo/unittest/task_executor_proxy', '$BUILD_DIR/mongo/executor/thread_pool_task_executor_test_fixture', ], ) diff --git a/src/mongo/db/repl/oplog_fetcher.cpp b/src/mongo/db/repl/oplog_fetcher.cpp index afb48bedde9..7ea9dd5e196 100644 --- a/src/mongo/db/repl/oplog_fetcher.cpp +++ b/src/mongo/db/repl/oplog_fetcher.cpp @@ -262,14 +262,12 @@ OplogFetcher::OplogFetcher(executor::TaskExecutor* executor, _onShutdownCallbackFn(onShutdownCallbackFn), _lastFetched(lastFetched), _fetcher(_makeFetcher(_lastFetched.opTime)) { - uassert(ErrorCodes::BadValue, "null last optime fetched", !lastFetched.opTime.isNull()); + uassert(ErrorCodes::BadValue, "null last optime fetched", !_lastFetched.opTime.isNull()); uassert(ErrorCodes::InvalidReplicaSetConfig, "uninitialized replica set configuration", config.isInitialized()); uassert(ErrorCodes::BadValue, "null enqueueDocuments function", enqueueDocumentsFn); uassert(ErrorCodes::BadValue, "null onShutdownCallback function", onShutdownCallbackFn); - - readersCreatedStats.increment(); } OplogFetcher::~OplogFetcher() { @@ -284,19 +282,40 @@ std::string OplogFetcher::toString() const { } bool OplogFetcher::isActive() const { - return _fetcher->isActive(); + stdx::lock_guard<stdx::mutex> lock(_mutex); + return _active; } Status OplogFetcher::startup() { + stdx::lock_guard<stdx::mutex> lock(_mutex); + if (_active) { + return Status(ErrorCodes::IllegalOperation, "oplog fetcher already active"); + } + if (_inShutdown) { + return Status(ErrorCodes::ShutdownInProgress, "oplog fetcher shutting down"); + } + + auto status = _scheduleFetcher_inlock(); + if (status.isOK()) { + _active = true; + } + return status; +} + +Status OplogFetcher::_scheduleFetcher_inlock() { + readersCreatedStats.increment(); return _fetcher->schedule(); } void OplogFetcher::shutdown() { + stdx::lock_guard<stdx::mutex> lock(_mutex); + _inShutdown = true; _fetcher->shutdown(); } void OplogFetcher::join() { - _fetcher->join(); + stdx::unique_lock<stdx::mutex> lock(_mutex); + _condition.wait(lock, [this]() { return !_active; }); } OpTimeWithHash OplogFetcher::getLastOpTimeWithHashFetched() const { @@ -305,6 +324,7 @@ OpTimeWithHash OplogFetcher::getLastOpTimeWithHashFetched() const { } BSONObj OplogFetcher::getCommandObject_forTest() const { + stdx::lock_guard<stdx::mutex> lock(_mutex); return _fetcher->getCommandObject(); } @@ -320,18 +340,71 @@ Milliseconds OplogFetcher::getAwaitDataTimeout_forTest() const { return _awaitDataTimeout; } +bool OplogFetcher::inShutdown_forTest() const { + return _isInShutdown(); +} + void OplogFetcher::_callback(const Fetcher::QueryResponseStatus& result, BSONObjBuilder* getMoreBob) { - // if target cut connections between connecting and querying (for - // example, because it stepped down) we might not have a cursor - if (!result.isOK()) { - LOG(1) << "Error returned from oplog query: " << redact(result.getStatus()); - _onShutdown(result.getStatus()); + const auto& responseStatus = result.getStatus(); + if (ErrorCodes::CallbackCanceled == responseStatus) { + LOG(1) << "oplog query cancelled"; + _finishCallback(responseStatus); + return; + } + + // If target cut connections between connecting and querying (for + // example, because it stepped down) we might not have a cursor. + if (!responseStatus.isOK()) { + { + stdx::lock_guard<stdx::mutex> lock(_mutex); + if (_inShutdown) { + log() << "Error returned from oplog query while canceling query: " + << redact(responseStatus); + } else if (_fetcherRestarts == _maxFetcherRestarts) { + log() << "Error returned from oplog query (no more query restarts left): " + << redact(responseStatus); + } else { + log() << "Restarting oplog query due to error: " << redact(responseStatus) + << ". Last fetched optime (with hash): " << _lastFetched + << ". Restarts remaining: " << (_maxFetcherRestarts - _fetcherRestarts); + _fetcherRestarts++; + // Destroying current instance in _shuttingDownFetcher will possibly block. + _shuttingDownFetcher.reset(); + // Move the old fetcher into the shutting down instance. + _shuttingDownFetcher.swap(_fetcher); + // Create and start fetcher with new starting optime. + _fetcher = _makeFetcher(_lastFetched.opTime); + auto scheduleStatus = _scheduleFetcher_inlock(); + if (scheduleStatus.isOK()) { + log() << "Scheduled new oplog query " << _fetcher->toString(); + return; + } + error() << "Error scheduling new oplog query: " << redact(scheduleStatus) + << ". Returning current oplog query error: " << redact(responseStatus); + } + } + _finishCallback(responseStatus); + return; + } + + // Reset fetcher restart counter on successful response. + { + stdx::lock_guard<stdx::mutex> lock(_mutex); + invariant(_active); + _fetcherRestarts = 0; + } + + if (_isInShutdown()) { + _finishCallback(Status(ErrorCodes::CallbackCanceled, "oplog fetcher shutting down")); return; } // Stop fetching and return immediately on fail point. + // This fail point is intended to make the oplog fetcher ignore the downloaded batch of + // operations and not error out. if (MONGO_FAIL_POINT(stopOplogFetcher)) { + _finishCallback(Status::OK()); return; } @@ -347,7 +420,7 @@ void OplogFetcher::_callback(const Fetcher::QueryResponseStatus& result, if (!metadataResult.isOK()) { error() << "invalid replication metadata from sync source " << _fetcher->getSource() << ": " << metadataResult.getStatus() << ": " << metadataObj; - _onShutdown(metadataResult.getStatus()); + _finishCallback(metadataResult.getStatus()); return; } metadata = metadataResult.getValue(); @@ -372,7 +445,7 @@ void OplogFetcher::_callback(const Fetcher::QueryResponseStatus& result, auto status = checkRemoteOplogStart(documents, opTimeWithHash); if (!status.isOK()) { // Stop oplog fetcher and execute rollback. - _onShutdown(status, opTimeWithHash); + _finishCallback(status, opTimeWithHash); return; } @@ -385,7 +458,7 @@ void OplogFetcher::_callback(const Fetcher::QueryResponseStatus& result, auto validateResult = OplogFetcher::validateDocuments( documents, queryResponse.first, opTimeWithHash.opTime.getTimestamp()); if (!validateResult.isOK()) { - _onShutdown(validateResult.getStatus(), opTimeWithHash); + _finishCallback(validateResult.getStatus(), opTimeWithHash); return; } auto info = validateResult.getValue(); @@ -411,23 +484,23 @@ void OplogFetcher::_callback(const Fetcher::QueryResponseStatus& result, } if (_dataReplicatorExternalState->shouldStopFetching(_fetcher->getSource(), metadata)) { - _onShutdown(Status(ErrorCodes::InvalidSyncSource, - str::stream() << "sync source " << _fetcher->getSource().toString() - << " (last optime: " - << metadata.getLastOpVisible().toString() - << "; sync source index: " - << metadata.getSyncSourceIndex() - << "; primary index: " - << metadata.getPrimaryIndex() - << ") is no longer valid"), - opTimeWithHash); + _finishCallback(Status(ErrorCodes::InvalidSyncSource, + str::stream() << "sync source " << _fetcher->getSource().toString() + << " (last optime: " + << metadata.getLastOpVisible().toString() + << "; sync source index: " + << metadata.getSyncSourceIndex() + << "; primary index: " + << metadata.getPrimaryIndex() + << ") is no longer valid"), + opTimeWithHash); return; } // No more data. Stop processing and return Status::OK along with last // fetch info. if (!getMoreBob) { - _onShutdown(Status::OK(), opTimeWithHash); + _finishCallback(Status::OK(), opTimeWithHash); return; } @@ -437,12 +510,18 @@ void OplogFetcher::_callback(const Fetcher::QueryResponseStatus& result, _awaitDataTimeout)); } -void OplogFetcher::_onShutdown(Status status) { - _onShutdown(status, getLastOpTimeWithHashFetched()); +void OplogFetcher::_finishCallback(Status status) { + _finishCallback(status, getLastOpTimeWithHashFetched()); } -void OplogFetcher::_onShutdown(Status status, OpTimeWithHash opTimeWithHash) { +void OplogFetcher::_finishCallback(Status status, OpTimeWithHash opTimeWithHash) { + invariant(isActive()); + _onShutdownCallbackFn(status, opTimeWithHash); + + stdx::lock_guard<stdx::mutex> lock(_mutex); + _active = false; + _condition.notify_all(); } std::unique_ptr<Fetcher> OplogFetcher::_makeFetcher(OpTime lastFetchedOpTime) { @@ -456,5 +535,10 @@ std::unique_ptr<Fetcher> OplogFetcher::_makeFetcher(OpTime lastFetchedOpTime) { _remoteCommandTimeout); } +bool OplogFetcher::_isInShutdown() const { + stdx::lock_guard<stdx::mutex> lock(_mutex); + return _inShutdown; +} + } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/oplog_fetcher.h b/src/mongo/db/repl/oplog_fetcher.h index e7414c0467c..8d17b6a71d1 100644 --- a/src/mongo/db/repl/oplog_fetcher.h +++ b/src/mongo/db/repl/oplog_fetcher.h @@ -39,7 +39,9 @@ #include "mongo/db/repl/data_replicator_external_state.h" #include "mongo/db/repl/optime_with.h" #include "mongo/db/repl/replica_set_config.h" +#include "mongo/stdx/condition_variable.h" #include "mongo/stdx/functional.h" +#include "mongo/stdx/mutex.h" namespace mongo { namespace repl { @@ -188,8 +190,20 @@ public: */ Milliseconds getAwaitDataTimeout_forTest() const; + /** + * Returns whether the oplog fetcher is in shutdown. + * + * For testing only. + */ + bool inShutdown_forTest() const; + private: /** + * Schedules fetcher and updates counters. + */ + Status _scheduleFetcher_inlock(); + + /** * Processes each batch of results from the tailable cursor started by the fetcher on the sync * source. * @@ -202,17 +216,24 @@ private: * Notifies caller that the oplog fetcher has completed processing operations from * the remote oplog. */ - void _onShutdown(Status status); - void _onShutdown(Status status, OpTimeWithHash opTimeWithHash); + void _finishCallback(Status status); + void _finishCallback(Status status, OpTimeWithHash opTimeWithHash); /** * Creates a new instance of the fetcher to tail the remote oplog starting at the given optime. */ std::unique_ptr<Fetcher> _makeFetcher(OpTime lastFetchedOpTime); + /** + * Returns whether the oplog fetcher is in shutdown. + */ + bool _isInShutdown() const; + // Protects member data of this OplogFetcher. mutable stdx::mutex _mutex; + mutable stdx::condition_variable _condition; + executor::TaskExecutor* const _executor; const HostAndPort _source; const NamespaceString _nss; @@ -232,7 +253,17 @@ private: // "_enqueueDocumentsFn". OpTimeWithHash _lastFetched; + // _active is true when a fetcher is scheduled to be run by the executor. + bool _active = false; + + // _inShutdown is true after shutdown() is called. + bool _inShutdown = false; + + // Fetcher restarts since the last successful oplog query response. + std::size_t _fetcherRestarts = 0; + std::unique_ptr<Fetcher> _fetcher; + std::unique_ptr<Fetcher> _shuttingDownFetcher; }; } // namespace repl diff --git a/src/mongo/db/repl/oplog_fetcher_test.cpp b/src/mongo/db/repl/oplog_fetcher_test.cpp index 0399cc32dd3..d513e452ff8 100644 --- a/src/mongo/db/repl/oplog_fetcher_test.cpp +++ b/src/mongo/db/repl/oplog_fetcher_test.cpp @@ -38,15 +38,19 @@ #include "mongo/rpc/metadata/repl_set_metadata.h" #include "mongo/rpc/metadata/server_selection_metadata.h" #include "mongo/stdx/memory.h" +#include "mongo/unittest/task_executor_proxy.h" #include "mongo/unittest/unittest.h" +#include "mongo/util/scopeguard.h" namespace { using namespace mongo; using namespace mongo::repl; +using namespace unittest; using executor::RemoteCommandRequest; using executor::RemoteCommandResponse; +using NetworkGuard = executor::NetworkInterfaceMock::InNetworkGuard; class ShutdownState { MONGO_DISALLOW_COPYING(ShutdownState); @@ -148,12 +152,16 @@ void OplogFetcherTest::tearDown() { RemoteCommandRequest OplogFetcherTest::processNetworkResponse( RemoteCommandResponse response, bool expectReadyRequestsAfterProcessing) { + auto net = getNet(); - net->enterNetwork(); + NetworkGuard guard(net); + log() << "scheduling response."; auto request = net->scheduleSuccessfulResponse(response); + log() << "running network ops."; net->runReadyNetworkOperations(); + log() << "checking for more requests"; ASSERT_EQUALS(expectReadyRequestsAfterProcessing, net->hasReadyRequests()); - net->exitNetwork(); + log() << "returning consumed request"; return request; } @@ -280,6 +288,38 @@ TEST_F(OplogFetcherTest, InvalidConstruction) { "null onShutdownCallback function"); } +TEST_F(OplogFetcherTest, StartupWhenActiveReturnsIllegalOperation) { + OplogFetcher oplogFetcher(&getExecutor(), + lastFetched, + source, + nss, + _createConfig(true), + 0, + dataReplicatorExternalState.get(), + enqueueDocumentsFn, + [](Status, OpTimeWithHash) {}); + ASSERT_OK(oplogFetcher.startup()); + ASSERT_TRUE(oplogFetcher.isActive()); + auto status = oplogFetcher.startup(); + getExecutor().shutdown(); + ASSERT_EQUALS(ErrorCodes::IllegalOperation, status); + ASSERT_STRING_CONTAINS(status.reason(), "oplog fetcher already active"); +} + +TEST_F(OplogFetcherTest, StartupWhenShuttingDownReturnsShutdownInProgress) { + OplogFetcher oplogFetcher(&getExecutor(), + lastFetched, + source, + nss, + _createConfig(true), + 0, + dataReplicatorExternalState.get(), + enqueueDocumentsFn, + [](Status, OpTimeWithHash) {}); + oplogFetcher.shutdown(); + ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, oplogFetcher.startup()); +} + void _checkDefaultCommandObjectFields(BSONObj cmdObj) { ASSERT_EQUALS(std::string("find"), cmdObj.firstElementFieldName()); ASSERT_TRUE(cmdObj.getBoolField("tailable")); @@ -624,7 +664,7 @@ void OplogFetcherTest::testSyncSourceChecking(rpc::ReplSetMetadata* metadata) { TEST_F(OplogFetcherTest, FailedSyncSourceCheckWithoutMetadataStopsTheOplogFetcher) { testSyncSourceChecking(nullptr); - // Sync source optime and "hasSyncSource" are not available if the respone does not + // Sync source optime and "hasSyncSource" are not available if the response does not // contain metadata. ASSERT_EQUALS(source, dataReplicatorExternalState->lastSyncSourceChecked); ASSERT_EQUALS(OpTime(), dataReplicatorExternalState->syncSourceLastOpTime); @@ -709,6 +749,8 @@ RemoteCommandRequest OplogFetcherTest::testTwoBatchHandling(bool isV1ElectionPro ASSERT_BSONOBJ_EQ(fourthEntry, lastEnqueuedDocuments[1]); oplogFetcher.shutdown(); + ASSERT_TRUE(oplogFetcher.inShutdown_forTest()); + oplogFetcher.join(); ASSERT_OK(shutdownState.getStatus()); @@ -870,4 +912,213 @@ TEST_F(OplogFetcherTest, ASSERT_EQUALS(OpTime(), info.lastDocument.opTime); } +long long _getHash(const BSONObj& oplogEntry) { + return oplogEntry["h"].numberLong(); +} + +Timestamp _getTimestamp(const BSONObj& oplogEntry) { + return OplogEntry(oplogEntry).getOpTime().getTimestamp(); +} + +OpTimeWithHash _getOpTimeWithHash(const BSONObj& oplogEntry) { + return {_getHash(oplogEntry), OplogEntry(oplogEntry).getOpTime()}; +} + +std::vector<BSONObj> _generateOplogEntries(std::size_t size) { + std::vector<BSONObj> ops(size); + for (std::size_t i = 0; i < size; ++i) { + ops[i] = makeNoopOplogEntry(Seconds(100 + int(i)), 123LL); + } + return ops; +} + +void _assertFindCommandTimestampEquals(const Timestamp& timestamp, + const RemoteCommandRequest& request) { + executor::TaskExecutorTest::assertRemoteCommandNameEquals("find", request); + ASSERT_EQUALS(timestamp, request.cmdObj["filter"].Obj()["ts"].Obj()["$gte"].timestamp()); +} + +void _assertFindCommandTimestampEquals(const BSONObj& oplogEntry, + const RemoteCommandRequest& request) { + _assertFindCommandTimestampEquals(_getTimestamp(oplogEntry), request); +} + +TEST_F(OplogFetcherTest, OplogFetcherCreatesNewFetcherOnCallbackErrorDuringGetMoreNumberOne) { + auto ops = _generateOplogEntries(5U); + std::size_t maxFetcherRestarts = 1U; + auto shutdownState = stdx::make_unique<ShutdownState>(); + OplogFetcher oplogFetcher(&getExecutor(), + _getOpTimeWithHash(ops[0]), + source, + nss, + _createConfig(true), + maxFetcherRestarts, + dataReplicatorExternalState.get(), + enqueueDocumentsFn, + stdx::ref(*shutdownState)); + ON_BLOCK_EXIT([this] { getExecutor().shutdown(); }); + + ASSERT_OK(oplogFetcher.startup()); + + // Send first batch from FIND. + _assertFindCommandTimestampEquals( + ops[0], processNetworkResponse(makeCursorResponse(1, {ops[0], ops[1], ops[2]}), true)); + + // Send error during GETMORE. + processNetworkResponse({ErrorCodes::CursorNotFound, "blah"}, true); + + // Send first batch from FIND, and Check that it started from the end of the last FIND response. + // Check that the optimes match for the query and last oplog entry. + _assertFindCommandTimestampEquals( + ops[2], processNetworkResponse(makeCursorResponse(0, {ops[2], ops[3], ops[4]}), false)); + + // Done. + oplogFetcher.join(); + ASSERT_OK(shutdownState->getStatus()); + ASSERT_EQUALS(_getOpTimeWithHash(ops[4]), shutdownState->getLastFetched()); +} + +TEST_F(OplogFetcherTest, OplogFetcherStopsRestartingFetcherIfRestartLimitIsReached) { + auto ops = _generateOplogEntries(3U); + std::size_t maxFetcherRestarts = 2U; + auto shutdownState = stdx::make_unique<ShutdownState>(); + OplogFetcher oplogFetcher(&getExecutor(), + _getOpTimeWithHash(ops[0]), + source, + nss, + _createConfig(true), + maxFetcherRestarts, + dataReplicatorExternalState.get(), + enqueueDocumentsFn, + stdx::ref(*shutdownState)); + ON_BLOCK_EXIT([this] { getExecutor().shutdown(); }); + + ASSERT_OK(oplogFetcher.startup()); + + unittest::log() << "processing find request from first fetcher"; + _assertFindCommandTimestampEquals( + ops[0], processNetworkResponse(makeCursorResponse(1, {ops[0], ops[1], ops[2]}), true)); + + unittest::log() << "sending error response to getMore request from first fetcher"; + assertRemoteCommandNameEquals( + "getMore", processNetworkResponse({ErrorCodes::CappedPositionLost, "fail 1"}, true)); + + unittest::log() << "sending error response to find request from second fetcher"; + _assertFindCommandTimestampEquals( + ops[2], processNetworkResponse({ErrorCodes::IllegalOperation, "fail 2"}, true)); + + unittest::log() << "sending error response to find request from third fetcher"; + _assertFindCommandTimestampEquals( + ops[2], processNetworkResponse({ErrorCodes::OperationFailed, "fail 3"}, false)); + + oplogFetcher.join(); + ASSERT_EQUALS(ErrorCodes::OperationFailed, shutdownState->getStatus()); + ASSERT_EQUALS(_getOpTimeWithHash(ops[2]), shutdownState->getLastFetched()); +} + +TEST_F(OplogFetcherTest, OplogFetcherResetsRestartCounterOnSuccessfulFetcherResponse) { + auto ops = _generateOplogEntries(5U); + std::size_t maxFetcherRestarts = 2U; + auto shutdownState = stdx::make_unique<ShutdownState>(); + OplogFetcher oplogFetcher(&getExecutor(), + _getOpTimeWithHash(ops[0]), + source, + nss, + _createConfig(true), + maxFetcherRestarts, + dataReplicatorExternalState.get(), + enqueueDocumentsFn, + stdx::ref(*shutdownState)); + ON_BLOCK_EXIT([this] { getExecutor().shutdown(); }); + + ASSERT_OK(oplogFetcher.startup()); + + unittest::log() << "processing find request from first fetcher"; + _assertFindCommandTimestampEquals( + ops[0], processNetworkResponse(makeCursorResponse(1, {ops[0], ops[1], ops[2]}), true)); + + unittest::log() << "sending error response to getMore request from first fetcher"; + assertRemoteCommandNameEquals( + "getMore", processNetworkResponse({ErrorCodes::CappedPositionLost, "fail 1"}, true)); + + unittest::log() << "processing find request from second fetcher"; + _assertFindCommandTimestampEquals( + ops[2], processNetworkResponse(makeCursorResponse(1, {ops[2], ops[3], ops[4]}), true)); + + unittest::log() << "sending error response to getMore request from second fetcher"; + assertRemoteCommandNameEquals( + "getMore", processNetworkResponse({ErrorCodes::IllegalOperation, "fail 2"}, true)); + + unittest::log() << "sending error response to find request from third fetcher"; + _assertFindCommandTimestampEquals( + ops[4], processNetworkResponse({ErrorCodes::InternalError, "fail 3"}, true)); + + unittest::log() << "sending error response to find request from fourth fetcher"; + _assertFindCommandTimestampEquals( + ops[4], processNetworkResponse({ErrorCodes::OperationFailed, "fail 4"}, false)); + + oplogFetcher.join(); + ASSERT_EQUALS(ErrorCodes::OperationFailed, shutdownState->getStatus()); + ASSERT_EQUALS(_getOpTimeWithHash(ops[4]), shutdownState->getLastFetched()); +} + +class TaskExecutorWithFailureInScheduleRemoteCommand : public unittest::TaskExecutorProxy { +public: + using ShouldFailRequestFn = stdx::function<bool(const executor::RemoteCommandRequest&)>; + + TaskExecutorWithFailureInScheduleRemoteCommand(executor::TaskExecutor* executor, + ShouldFailRequestFn shouldFailRequest) + : unittest::TaskExecutorProxy(executor), _shouldFailRequest(shouldFailRequest) {} + + StatusWith<CallbackHandle> scheduleRemoteCommand(const executor::RemoteCommandRequest& request, + const RemoteCommandCallbackFn& cb) override { + if (_shouldFailRequest(request)) { + return Status(ErrorCodes::OperationFailed, "failed to schedule remote command"); + } + return getExecutor()->scheduleRemoteCommand(request, cb); + } + +private: + ShouldFailRequestFn _shouldFailRequest; +}; + +TEST_F(OplogFetcherTest, OplogFetcherAbortsWithOriginalResponseErrorOnFailureToScheduleNewFetcher) { + auto ops = _generateOplogEntries(3U); + std::size_t maxFetcherRestarts = 2U; + auto shutdownState = stdx::make_unique<ShutdownState>(); + bool shouldFailSchedule = false; + TaskExecutorWithFailureInScheduleRemoteCommand _executorProxy( + &getExecutor(), [&shouldFailSchedule](const executor::RemoteCommandRequest& request) { + return shouldFailSchedule; + }); + OplogFetcher oplogFetcher(&_executorProxy, + _getOpTimeWithHash(ops[0]), + source, + nss, + _createConfig(true), + maxFetcherRestarts, + dataReplicatorExternalState.get(), + enqueueDocumentsFn, + stdx::ref(*shutdownState)); + ON_BLOCK_EXIT([this] { getExecutor().shutdown(); }); + + ASSERT_OK(oplogFetcher.startup()); + ASSERT_TRUE(oplogFetcher.isActive()); + + unittest::log() << "processing find request from first fetcher"; + _assertFindCommandTimestampEquals( + ops[0], processNetworkResponse(makeCursorResponse(1, {ops[0], ops[1], ops[2]}), true)); + + unittest::log() << "sending error response to getMore request from first fetcher"; + shouldFailSchedule = true; + assertRemoteCommandNameEquals( + "getMore", processNetworkResponse({ErrorCodes::CappedPositionLost, "dead cursor"}, false)); + + oplogFetcher.join(); + // Status in shutdown callback should match error for dead cursor instead of error from failed + // schedule request. + ASSERT_EQUALS(ErrorCodes::CappedPositionLost, shutdownState->getStatus()); + ASSERT_EQUALS(_getOpTimeWithHash(ops[2]), shutdownState->getLastFetched()); +} + } // namespace |