diff options
author | Randolph Tan <randolph@10gen.com> | 2016-11-04 11:36:30 -0400 |
---|---|---|
committer | Randolph Tan <randolph@10gen.com> | 2016-11-08 13:13:51 -0500 |
commit | 102f68907ecad28cf8ed479bee61c3afd1a4f0f5 (patch) | |
tree | b749da3aec0879a9ed3ebc5446424825bde9bd1f | |
parent | 524cc9989bcf743b34bcc9e9d05d9f72cae48620 (diff) | |
download | mongo-102f68907ecad28cf8ed479bee61c3afd1a4f0f5.tar.gz |
SERVER-26859 AsyncResultsMerger replica set retargeting may block the ASIO callback threadsr3.2.11-rc0
When the handleResponse callback encounters a retriable error. Signal the merger thread for it to retry instead of trying to reschedule inline since rescheduling involves re-evaluating the target host which is a blocking operation.
(cherry picked from commit 5b2134f4ae4ea2d70b0ce89041fd11fd7810e40d)
Conflicts:
src/mongo/s/query/async_results_merger.cpp
src/mongo/s/query/async_results_merger_test.cpp
-rw-r--r-- | src/mongo/s/query/async_results_merger.cpp | 31 | ||||
-rw-r--r-- | src/mongo/s/query/async_results_merger.h | 6 | ||||
-rw-r--r-- | src/mongo/s/query/async_results_merger_test.cpp | 76 |
3 files changed, 101 insertions, 12 deletions
diff --git a/src/mongo/s/query/async_results_merger.cpp b/src/mongo/s/query/async_results_merger.cpp index c82de6a3bbe..984c2f85184 100644 --- a/src/mongo/s/query/async_results_merger.cpp +++ b/src/mongo/s/query/async_results_merger.cpp @@ -304,6 +304,14 @@ Status AsyncResultsMerger::askForNextBatch_inlock(size_t remoteIndex) { return Status::OK(); } +/* + * Note: When nextEvent() is called to do retries, only the remotes with retriable errors will + * be rescheduled because: + * + * 1. Other pending remotes still have callback assigned to them. + * 2. Remotes that already has some result will have a non-empty buffer. + * 3. Remotes that reached maximum retries will be in 'exhausted' state. + */ StatusWith<executor::TaskExecutor::EventHandle> AsyncResultsMerger::nextEvent() { stdx::lock_guard<stdx::mutex> lk(_mutex); @@ -441,24 +449,29 @@ void AsyncResultsMerger::handleBatchResponse( } } - // If the error is retriable, schedule another request. + // If we can still retry the initial cursor establishment, reset the state so it can be + // retried the next time nextEvent is called. Never retry getMores to avoid + // accidentally skipping results. if (!remote.cursorId && remote.retryCount < kMaxNumFailedHostRetryAttempts && ShardRegistry::kAllRetriableErrors.count(cursorResponseStatus.getStatus().code())) { + invariant(remote.docBuffer.empty()); + LOG(1) << "Initial cursor establishment failed with retriable error and will be retried" << causedBy(cursorResponseStatus.getStatus()); ++remote.retryCount; - // Since we potentially updated the targeter that the last host it chose might be - // faulty, the call below may end up getting a different host. - remote.status = askForNextBatch_inlock(remoteIndex); - if (remote.status.isOK()) { - return; + remote.status = Status::OK(); // Reset status so it can be retried. + + // Signal the merger thread to make it retry this remote again. + if (_currentEvent.isValid()) { + // To prevent ourselves from signalling the event twice, + // we set '_currentEvent' as invalid after signalling it. + _executor->signalEvent(_currentEvent); + _currentEvent = executor::TaskExecutor::EventHandle(); } - // If we end up here, it means we failed to schedule the retry request, which is a more - // severe error that should not be retried. Just pass through to the error handling - // logic below. + return; } else { remote.status = cursorResponseStatus.getStatus(); } diff --git a/src/mongo/s/query/async_results_merger.h b/src/mongo/s/query/async_results_merger.h index d30281aa8f6..4f1d8e7ca58 100644 --- a/src/mongo/s/query/async_results_merger.h +++ b/src/mongo/s/query/async_results_merger.h @@ -146,6 +146,12 @@ public: * Also invalid to call if there is an outstanding event, created by a previous call to this * function, that has not yet been signaled. If there is an outstanding unsignaled event, * returns an error. + * + * Conditions when event can be signaled: + * - Finished collecting results from all remotes. + * - One of the host failed with a retriable error. In this case, if ready() is false, then + * the caller should call nextEvent() to retry the request on the hosts that errored. If + * ready() is true, then either the error was not retriable or it has exhausted max retries. */ StatusWith<executor::TaskExecutor::EventHandle> nextEvent(); diff --git a/src/mongo/s/query/async_results_merger_test.cpp b/src/mongo/s/query/async_results_merger_test.cpp index 8f9e04664b1..31cf4228bcf 100644 --- a/src/mongo/s/query/async_results_merger_test.cpp +++ b/src/mongo/s/query/async_results_merger_test.cpp @@ -1220,17 +1220,21 @@ TEST_F(AsyncResultsMergerTest, RetryOnNotMasterNoSlaveOkSingleNode) { makeCursorFromFindCmd(findCmd, {kTestShardIds[0]}); ASSERT_FALSE(arm->ready()); - auto readyEvent = unittest::assertGet(arm->nextEvent()); - ASSERT_FALSE(arm->ready()); // First and second attempts return an error. + auto readyEvent = unittest::assertGet(arm->nextEvent()); scheduleErrorResponse({ErrorCodes::NotMasterNoSlaveOk, "not master and not slave"}); + executor->waitForEvent(readyEvent); ASSERT_FALSE(arm->ready()); + readyEvent = unittest::assertGet(arm->nextEvent()); scheduleErrorResponse({ErrorCodes::NotMasterNoSlaveOk, "not master and not slave"}); + + executor->waitForEvent(readyEvent); ASSERT_FALSE(arm->ready()); // Third attempt succeeds. + readyEvent = unittest::assertGet(arm->nextEvent()); std::vector<CursorResponse> responses; std::vector<BSONObj> batch = {fromjson("{_id: 1}")}; responses.emplace_back(_nss, CursorId(0), batch); @@ -1254,15 +1258,25 @@ TEST_F(AsyncResultsMergerTest, RetryOnNotMasterNoSlaveOkAllFailSingleNode) { // All attempts return an error (one attempt plus three retries) scheduleErrorResponse({ErrorCodes::NotMasterNoSlaveOk, "not master and not slave"}); + executor->waitForEvent(readyEvent); ASSERT_FALSE(arm->ready()); + readyEvent = unittest::assertGet(arm->nextEvent()); + ASSERT_FALSE(arm->ready()); scheduleErrorResponse({ErrorCodes::NotMasterNoSlaveOk, "not master and not slave"}); + executor->waitForEvent(readyEvent); ASSERT_FALSE(arm->ready()); + readyEvent = unittest::assertGet(arm->nextEvent()); + ASSERT_FALSE(arm->ready()); scheduleErrorResponse({ErrorCodes::NotMasterNoSlaveOk, "not master and not slave"}); + executor->waitForEvent(readyEvent); ASSERT_FALSE(arm->ready()); + readyEvent = unittest::assertGet(arm->nextEvent()); + ASSERT_FALSE(arm->ready()); scheduleErrorResponse({ErrorCodes::NotMasterNoSlaveOk, "not master and not slave"}); + executor->waitForEvent(readyEvent); ASSERT_TRUE(arm->ready()); auto status = arm->nextReady(); @@ -1289,19 +1303,75 @@ TEST_F(AsyncResultsMergerTest, RetryOnHostUnreachableAllowPartialResults) { // From the second host all attempts return an error (one attempt plus three retries) scheduleErrorResponse({ErrorCodes::HostUnreachable, "host unreachable"}); + executor->waitForEvent(readyEvent); ASSERT_FALSE(arm->ready()); + readyEvent = unittest::assertGet(arm->nextEvent()); + ASSERT_FALSE(arm->ready()); scheduleErrorResponse({ErrorCodes::HostUnreachable, "host unreachable"}); + executor->waitForEvent(readyEvent); ASSERT_FALSE(arm->ready()); + readyEvent = unittest::assertGet(arm->nextEvent()); + ASSERT_FALSE(arm->ready()); scheduleErrorResponse({ErrorCodes::HostUnreachable, "host unreachable"}); + executor->waitForEvent(readyEvent); ASSERT_FALSE(arm->ready()); + readyEvent = unittest::assertGet(arm->nextEvent()); + ASSERT_FALSE(arm->ready()); scheduleErrorResponse({ErrorCodes::HostUnreachable, "host unreachable"}); + executor->waitForEvent(readyEvent); + readyEvent = unittest::assertGet(arm->nextEvent()); + executor->waitForEvent(readyEvent); ASSERT_TRUE(arm->ready()); - ASSERT_TRUE(arm->ready()); ASSERT_EQ(fromjson("{_id: 1}"), *unittest::assertGet(arm->nextReady())); + ASSERT_TRUE(arm->remotesExhausted()); + ASSERT_TRUE(arm->ready()); +} + +TEST_F(AsyncResultsMergerTest, ErrorAtFirstAttemptAtSameTimeShouldEventuallyReturnResults) { + BSONObj findCmd = fromjson("{find: 'testcoll', sort: {_id: 1}}"); + makeCursorFromFindCmd(findCmd, {kTestShardIds[0], kTestShardIds[1]}); + + ASSERT_FALSE(arm->ready()); + auto readyEvent = unittest::assertGet(arm->nextEvent()); + ASSERT_FALSE(arm->ready()); + + // Both hosts return an error which indicates that the request should be retried. + scheduleErrorResponse({ErrorCodes::HostUnreachable, "host unreachable"}); + scheduleErrorResponse({ErrorCodes::HostUnreachable, "host unreachable"}); + + executor->waitForEvent(readyEvent); + ASSERT_FALSE(arm->ready()); + + readyEvent = unittest::assertGet(arm->nextEvent()); + ASSERT_FALSE(arm->ready()); + + + // Return valid data on both hosts. + { + std::vector<CursorResponse> responses; + std::vector<BSONObj> batch = {fromjson("{_id: 1, $sortKey: {'': 1}}")}; + responses.emplace_back(_nss, CursorId(0), batch); + scheduleNetworkResponses(std::move(responses), + CursorResponse::ResponseType::InitialResponse); + } + + { + std::vector<CursorResponse> responses; + std::vector<BSONObj> batch = {fromjson("{_id: 2, $sortKey: {'': 2}}}")}; + responses.emplace_back(_nss, CursorId(0), batch); + scheduleNetworkResponses(std::move(responses), + CursorResponse::ResponseType::InitialResponse); + } + + executor->waitForEvent(readyEvent); + ASSERT_TRUE(arm->ready()); + + ASSERT_EQ(fromjson("{_id: 1, $sortKey: {'': 1}}}"), *unittest::assertGet(arm->nextReady())); + ASSERT_EQ(fromjson("{_id: 2, $sortKey: {'': 2}}}}"), *unittest::assertGet(arm->nextReady())); ASSERT_TRUE(arm->remotesExhausted()); ASSERT_TRUE(arm->ready()); |