summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRandolph Tan <randolph@10gen.com>2016-11-04 11:36:30 -0400
committerRandolph Tan <randolph@10gen.com>2016-11-08 13:13:51 -0500
commit102f68907ecad28cf8ed479bee61c3afd1a4f0f5 (patch)
treeb749da3aec0879a9ed3ebc5446424825bde9bd1f
parent524cc9989bcf743b34bcc9e9d05d9f72cae48620 (diff)
downloadmongo-r3.2.11-rc0.tar.gz
SERVER-26859 AsyncResultsMerger replica set retargeting may block the ASIO callback threadsr3.2.11-rc0
When the handleResponse callback encounters a retriable error. Signal the merger thread for it to retry instead of trying to reschedule inline since rescheduling involves re-evaluating the target host which is a blocking operation. (cherry picked from commit 5b2134f4ae4ea2d70b0ce89041fd11fd7810e40d) Conflicts: src/mongo/s/query/async_results_merger.cpp src/mongo/s/query/async_results_merger_test.cpp
-rw-r--r--src/mongo/s/query/async_results_merger.cpp31
-rw-r--r--src/mongo/s/query/async_results_merger.h6
-rw-r--r--src/mongo/s/query/async_results_merger_test.cpp76
3 files changed, 101 insertions, 12 deletions
diff --git a/src/mongo/s/query/async_results_merger.cpp b/src/mongo/s/query/async_results_merger.cpp
index c82de6a3bbe..984c2f85184 100644
--- a/src/mongo/s/query/async_results_merger.cpp
+++ b/src/mongo/s/query/async_results_merger.cpp
@@ -304,6 +304,14 @@ Status AsyncResultsMerger::askForNextBatch_inlock(size_t remoteIndex) {
return Status::OK();
}
+/*
+ * Note: When nextEvent() is called to do retries, only the remotes with retriable errors will
+ * be rescheduled because:
+ *
+ * 1. Other pending remotes still have callback assigned to them.
+ * 2. Remotes that already has some result will have a non-empty buffer.
+ * 3. Remotes that reached maximum retries will be in 'exhausted' state.
+ */
StatusWith<executor::TaskExecutor::EventHandle> AsyncResultsMerger::nextEvent() {
stdx::lock_guard<stdx::mutex> lk(_mutex);
@@ -441,24 +449,29 @@ void AsyncResultsMerger::handleBatchResponse(
}
}
- // If the error is retriable, schedule another request.
+ // If we can still retry the initial cursor establishment, reset the state so it can be
+ // retried the next time nextEvent is called. Never retry getMores to avoid
+ // accidentally skipping results.
if (!remote.cursorId && remote.retryCount < kMaxNumFailedHostRetryAttempts &&
ShardRegistry::kAllRetriableErrors.count(cursorResponseStatus.getStatus().code())) {
+ invariant(remote.docBuffer.empty());
+
LOG(1) << "Initial cursor establishment failed with retriable error and will be retried"
<< causedBy(cursorResponseStatus.getStatus());
++remote.retryCount;
- // Since we potentially updated the targeter that the last host it chose might be
- // faulty, the call below may end up getting a different host.
- remote.status = askForNextBatch_inlock(remoteIndex);
- if (remote.status.isOK()) {
- return;
+ remote.status = Status::OK(); // Reset status so it can be retried.
+
+ // Signal the merger thread to make it retry this remote again.
+ if (_currentEvent.isValid()) {
+ // To prevent ourselves from signalling the event twice,
+ // we set '_currentEvent' as invalid after signalling it.
+ _executor->signalEvent(_currentEvent);
+ _currentEvent = executor::TaskExecutor::EventHandle();
}
- // If we end up here, it means we failed to schedule the retry request, which is a more
- // severe error that should not be retried. Just pass through to the error handling
- // logic below.
+ return;
} else {
remote.status = cursorResponseStatus.getStatus();
}
diff --git a/src/mongo/s/query/async_results_merger.h b/src/mongo/s/query/async_results_merger.h
index d30281aa8f6..4f1d8e7ca58 100644
--- a/src/mongo/s/query/async_results_merger.h
+++ b/src/mongo/s/query/async_results_merger.h
@@ -146,6 +146,12 @@ public:
* Also invalid to call if there is an outstanding event, created by a previous call to this
* function, that has not yet been signaled. If there is an outstanding unsignaled event,
* returns an error.
+ *
+ * Conditions when event can be signaled:
+ * - Finished collecting results from all remotes.
+ * - One of the host failed with a retriable error. In this case, if ready() is false, then
+ * the caller should call nextEvent() to retry the request on the hosts that errored. If
+ * ready() is true, then either the error was not retriable or it has exhausted max retries.
*/
StatusWith<executor::TaskExecutor::EventHandle> nextEvent();
diff --git a/src/mongo/s/query/async_results_merger_test.cpp b/src/mongo/s/query/async_results_merger_test.cpp
index 8f9e04664b1..31cf4228bcf 100644
--- a/src/mongo/s/query/async_results_merger_test.cpp
+++ b/src/mongo/s/query/async_results_merger_test.cpp
@@ -1220,17 +1220,21 @@ TEST_F(AsyncResultsMergerTest, RetryOnNotMasterNoSlaveOkSingleNode) {
makeCursorFromFindCmd(findCmd, {kTestShardIds[0]});
ASSERT_FALSE(arm->ready());
- auto readyEvent = unittest::assertGet(arm->nextEvent());
- ASSERT_FALSE(arm->ready());
// First and second attempts return an error.
+ auto readyEvent = unittest::assertGet(arm->nextEvent());
scheduleErrorResponse({ErrorCodes::NotMasterNoSlaveOk, "not master and not slave"});
+ executor->waitForEvent(readyEvent);
ASSERT_FALSE(arm->ready());
+ readyEvent = unittest::assertGet(arm->nextEvent());
scheduleErrorResponse({ErrorCodes::NotMasterNoSlaveOk, "not master and not slave"});
+
+ executor->waitForEvent(readyEvent);
ASSERT_FALSE(arm->ready());
// Third attempt succeeds.
+ readyEvent = unittest::assertGet(arm->nextEvent());
std::vector<CursorResponse> responses;
std::vector<BSONObj> batch = {fromjson("{_id: 1}")};
responses.emplace_back(_nss, CursorId(0), batch);
@@ -1254,15 +1258,25 @@ TEST_F(AsyncResultsMergerTest, RetryOnNotMasterNoSlaveOkAllFailSingleNode) {
// All attempts return an error (one attempt plus three retries)
scheduleErrorResponse({ErrorCodes::NotMasterNoSlaveOk, "not master and not slave"});
+ executor->waitForEvent(readyEvent);
ASSERT_FALSE(arm->ready());
+ readyEvent = unittest::assertGet(arm->nextEvent());
+ ASSERT_FALSE(arm->ready());
scheduleErrorResponse({ErrorCodes::NotMasterNoSlaveOk, "not master and not slave"});
+ executor->waitForEvent(readyEvent);
ASSERT_FALSE(arm->ready());
+ readyEvent = unittest::assertGet(arm->nextEvent());
+ ASSERT_FALSE(arm->ready());
scheduleErrorResponse({ErrorCodes::NotMasterNoSlaveOk, "not master and not slave"});
+ executor->waitForEvent(readyEvent);
ASSERT_FALSE(arm->ready());
+ readyEvent = unittest::assertGet(arm->nextEvent());
+ ASSERT_FALSE(arm->ready());
scheduleErrorResponse({ErrorCodes::NotMasterNoSlaveOk, "not master and not slave"});
+ executor->waitForEvent(readyEvent);
ASSERT_TRUE(arm->ready());
auto status = arm->nextReady();
@@ -1289,19 +1303,75 @@ TEST_F(AsyncResultsMergerTest, RetryOnHostUnreachableAllowPartialResults) {
// From the second host all attempts return an error (one attempt plus three retries)
scheduleErrorResponse({ErrorCodes::HostUnreachable, "host unreachable"});
+ executor->waitForEvent(readyEvent);
ASSERT_FALSE(arm->ready());
+ readyEvent = unittest::assertGet(arm->nextEvent());
+ ASSERT_FALSE(arm->ready());
scheduleErrorResponse({ErrorCodes::HostUnreachable, "host unreachable"});
+ executor->waitForEvent(readyEvent);
ASSERT_FALSE(arm->ready());
+ readyEvent = unittest::assertGet(arm->nextEvent());
+ ASSERT_FALSE(arm->ready());
scheduleErrorResponse({ErrorCodes::HostUnreachable, "host unreachable"});
+ executor->waitForEvent(readyEvent);
ASSERT_FALSE(arm->ready());
+ readyEvent = unittest::assertGet(arm->nextEvent());
+ ASSERT_FALSE(arm->ready());
scheduleErrorResponse({ErrorCodes::HostUnreachable, "host unreachable"});
+ executor->waitForEvent(readyEvent);
+ readyEvent = unittest::assertGet(arm->nextEvent());
+ executor->waitForEvent(readyEvent);
ASSERT_TRUE(arm->ready());
- ASSERT_TRUE(arm->ready());
ASSERT_EQ(fromjson("{_id: 1}"), *unittest::assertGet(arm->nextReady()));
+ ASSERT_TRUE(arm->remotesExhausted());
+ ASSERT_TRUE(arm->ready());
+}
+
+TEST_F(AsyncResultsMergerTest, ErrorAtFirstAttemptAtSameTimeShouldEventuallyReturnResults) {
+ BSONObj findCmd = fromjson("{find: 'testcoll', sort: {_id: 1}}");
+ makeCursorFromFindCmd(findCmd, {kTestShardIds[0], kTestShardIds[1]});
+
+ ASSERT_FALSE(arm->ready());
+ auto readyEvent = unittest::assertGet(arm->nextEvent());
+ ASSERT_FALSE(arm->ready());
+
+ // Both hosts return an error which indicates that the request should be retried.
+ scheduleErrorResponse({ErrorCodes::HostUnreachable, "host unreachable"});
+ scheduleErrorResponse({ErrorCodes::HostUnreachable, "host unreachable"});
+
+ executor->waitForEvent(readyEvent);
+ ASSERT_FALSE(arm->ready());
+
+ readyEvent = unittest::assertGet(arm->nextEvent());
+ ASSERT_FALSE(arm->ready());
+
+
+ // Return valid data on both hosts.
+ {
+ std::vector<CursorResponse> responses;
+ std::vector<BSONObj> batch = {fromjson("{_id: 1, $sortKey: {'': 1}}")};
+ responses.emplace_back(_nss, CursorId(0), batch);
+ scheduleNetworkResponses(std::move(responses),
+ CursorResponse::ResponseType::InitialResponse);
+ }
+
+ {
+ std::vector<CursorResponse> responses;
+ std::vector<BSONObj> batch = {fromjson("{_id: 2, $sortKey: {'': 2}}}")};
+ responses.emplace_back(_nss, CursorId(0), batch);
+ scheduleNetworkResponses(std::move(responses),
+ CursorResponse::ResponseType::InitialResponse);
+ }
+
+ executor->waitForEvent(readyEvent);
+ ASSERT_TRUE(arm->ready());
+
+ ASSERT_EQ(fromjson("{_id: 1, $sortKey: {'': 1}}}"), *unittest::assertGet(arm->nextReady()));
+ ASSERT_EQ(fromjson("{_id: 2, $sortKey: {'': 2}}}}"), *unittest::assertGet(arm->nextReady()));
ASSERT_TRUE(arm->remotesExhausted());
ASSERT_TRUE(arm->ready());