diff options
author | David Storch <david.storch@10gen.com> | 2015-09-22 18:08:36 -0400 |
---|---|---|
committer | David Storch <david.storch@10gen.com> | 2015-09-24 20:04:21 -0400 |
commit | 549d1080fe68fdef42182d4a4189fdd70f964eaf (patch) | |
tree | a72b834ba750998caaf23b54e7aee1f9976cec66 /src | |
parent | b332a0f555e0d332d3b3d77878187597a23e140b (diff) | |
download | mongo-549d1080fe68fdef42182d4a4189fdd70f964eaf.tar.gz |
SERVER-19842 support allowPartialResults query option in new mongos read path
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/s/query/async_results_merger.cpp | 22 | ||||
-rw-r--r-- | src/mongo/s/query/async_results_merger_test.cpp | 98 | ||||
-rw-r--r-- | src/mongo/s/query/cluster_client_cursor_params.h | 6 | ||||
-rw-r--r-- | src/mongo/s/query/cluster_find.cpp | 1 |
4 files changed, 122 insertions, 5 deletions
diff --git a/src/mongo/s/query/async_results_merger.cpp b/src/mongo/s/query/async_results_merger.cpp index ef039eadb2f..abecbba1a34 100644 --- a/src/mongo/s/query/async_results_merger.cpp +++ b/src/mongo/s/query/async_results_merger.cpp @@ -369,9 +369,14 @@ void AsyncResultsMerger::handleBatchResponse( if (!cbData.response.isOK()) { remote.status = cbData.response.getStatus(); - // If we failed to retrieve the batch because we couldn't contact the remote, we notify that - // targeter that the host is unreachable. The caller can then retry on a new host. - if (remote.status == ErrorCodes::HostUnreachable && remote.shardId) { + // Errors other than HostUnreachable have no special handling. + if (remote.status != ErrorCodes::HostUnreachable) { + return; + } + + // Notify that targeter that the host is unreachable. The caller can then retry on a new + // host. + if (remote.shardId) { auto shard = _params.shardRegistry->getShard(_params.txn, *remote.shardId); if (!shard) { remote.status = @@ -383,6 +388,17 @@ void AsyncResultsMerger::handleBatchResponse( } } + // Unreachable host errors are swallowed if the 'allowPartialResults' option is set. We + // remove the unreachable host entirely from consideration by marking it as exhausted. + if (_params.isAllowPartialResults) { + remote.status = Status::OK(); + + // Clear the results buffer and cursor id. + std::queue<BSONObj> emptyBuffer; + std::swap(remote.docBuffer, emptyBuffer); + remote.cursorId = 0; + } + return; } diff --git a/src/mongo/s/query/async_results_merger_test.cpp b/src/mongo/s/query/async_results_merger_test.cpp index 7daaa34d03b..75dfe2a2876 100644 --- a/src/mongo/s/query/async_results_merger_test.cpp +++ b/src/mongo/s/query/async_results_merger_test.cpp @@ -86,10 +86,16 @@ protected: params.batchSize = getMoreBatchSize ? getMoreBatchSize : lpq->getBatchSize(); params.skip = lpq->getSkip(); params.isTailable = lpq->isTailable(); + params.isAllowPartialResults = lpq->isAllowPartialResults(); params.isSecondaryOk = isSecondaryOk; for (const auto& hostAndPort : remotes) { - params.remotes.emplace_back(hostAndPort, "testShard", findCmd); + // Pass boost::none in place of a ShardId. If there is a ShardId and the ARM receives an + // UnreachableHost error, it will attempt to look inside the shard registry in order to + // find the Shard to which the host belongs and notify it of the unreachable host. Since + // this text fixture is not passing down the shard registry (or an OperationContext), + // we must skip this unreachable host handling. + params.remotes.emplace_back(hostAndPort, boost::none, findCmd); } arm = stdx::make_unique<AsyncResultsMerger>(executor, params); @@ -1045,6 +1051,96 @@ TEST_F(AsyncResultsMergerTest, SendsSecondaryOkAsMetadata) { ASSERT(!unittest::assertGet(arm->nextReady())); } +TEST_F(AsyncResultsMergerTest, AllowPartialResults) { + BSONObj findCmd = fromjson("{find: 'testcoll', allowPartialResults: true}"); + makeCursorFromFindCmd(findCmd, _remotes); + + ASSERT_FALSE(arm->ready()); + auto readyEvent = unittest::assertGet(arm->nextEvent()); + ASSERT_FALSE(arm->ready()); + + // The network layer reports that the first host is unreachable. + scheduleErrorResponse({ErrorCodes::HostUnreachable, "host unreachable"}); + ASSERT_FALSE(arm->ready()); + + // Instead of propagating the error, we should be willing to return results from the two + // remaining shards. + std::vector<CursorResponse> responses; + std::vector<BSONObj> batch1 = {fromjson("{_id: 1}")}; + responses.emplace_back(_nss, CursorId(98), batch1); + std::vector<BSONObj> batch2 = {fromjson("{_id: 2}")}; + responses.emplace_back(_nss, CursorId(99), batch2); + scheduleNetworkResponses(responses, CursorResponse::ResponseType::InitialResponse); + executor->waitForEvent(readyEvent); + + ASSERT_TRUE(arm->ready()); + ASSERT_EQ(fromjson("{_id: 1}"), *unittest::assertGet(arm->nextReady())); + ASSERT_TRUE(arm->ready()); + ASSERT_EQ(fromjson("{_id: 2}"), *unittest::assertGet(arm->nextReady())); + + ASSERT_FALSE(arm->ready()); + readyEvent = unittest::assertGet(arm->nextEvent()); + ASSERT_FALSE(arm->ready()); + + // Now the second host becomes unreachable. We should still be willing to return results from + // the third shard. + scheduleErrorResponse({ErrorCodes::HostUnreachable, "host unreachable"}); + ASSERT_FALSE(arm->ready()); + + responses.clear(); + std::vector<BSONObj> batch3 = {fromjson("{_id: 3}")}; + responses.emplace_back(_nss, CursorId(99), batch3); + scheduleNetworkResponses(responses, CursorResponse::ResponseType::SubsequentResponse); + executor->waitForEvent(readyEvent); + + ASSERT_TRUE(arm->ready()); + ASSERT_EQ(fromjson("{_id: 3}"), *unittest::assertGet(arm->nextReady())); + + ASSERT_FALSE(arm->ready()); + readyEvent = unittest::assertGet(arm->nextEvent()); + ASSERT_FALSE(arm->ready()); + + // Once the last reachable shard indicates that its cursor is closed, we're done. + responses.clear(); + std::vector<BSONObj> batch4 = {}; + responses.emplace_back(_nss, CursorId(0), batch4); + scheduleNetworkResponses(responses, CursorResponse::ResponseType::SubsequentResponse); + executor->waitForEvent(readyEvent); + + ASSERT_TRUE(arm->ready()); + ASSERT(!unittest::assertGet(arm->nextReady())); +} + +TEST_F(AsyncResultsMergerTest, AllowPartialResultsSingleNode) { + BSONObj findCmd = fromjson("{find: 'testcoll', allowPartialResults: true}"); + makeCursorFromFindCmd(findCmd, {_remotes[0]}); + + ASSERT_FALSE(arm->ready()); + auto readyEvent = unittest::assertGet(arm->nextEvent()); + ASSERT_FALSE(arm->ready()); + + std::vector<CursorResponse> responses; + std::vector<BSONObj> batch = {fromjson("{_id: 1}"), fromjson("{_id: 2}")}; + responses.emplace_back(_nss, CursorId(98), batch); + scheduleNetworkResponses(responses, CursorResponse::ResponseType::InitialResponse); + executor->waitForEvent(readyEvent); + + ASSERT_TRUE(arm->ready()); + ASSERT_EQ(fromjson("{_id: 1}"), *unittest::assertGet(arm->nextReady())); + ASSERT_TRUE(arm->ready()); + ASSERT_EQ(fromjson("{_id: 2}"), *unittest::assertGet(arm->nextReady())); + + ASSERT_FALSE(arm->ready()); + readyEvent = unittest::assertGet(arm->nextEvent()); + ASSERT_FALSE(arm->ready()); + + // The lone host involved in this query becomes unreachable. This should simply cause us to + // return EOF. + scheduleErrorResponse({ErrorCodes::HostUnreachable, "host unreachable"}); + ASSERT_TRUE(arm->ready()); + ASSERT(!unittest::assertGet(arm->nextReady())); +} + } // namespace } // namespace mongo diff --git a/src/mongo/s/query/cluster_client_cursor_params.h b/src/mongo/s/query/cluster_client_cursor_params.h index 8913e1001e5..3c73d47ba9b 100644 --- a/src/mongo/s/query/cluster_client_cursor_params.h +++ b/src/mongo/s/query/cluster_client_cursor_params.h @@ -53,7 +53,7 @@ struct ClusterClientCursorParams { /** * Use when a new cursor should be created on the remote. */ - Remote(HostAndPort hostAndPort, ShardId sid, BSONObj cmdObj) + Remote(HostAndPort hostAndPort, boost::optional<ShardId> sid, BSONObj cmdObj) : hostAndPort(std::move(hostAndPort)), shardId(std::move(sid)), cmdObj(std::move(cmdObj)) {} @@ -126,6 +126,10 @@ struct ClusterClientCursorParams { // Whether any of the remote nodes might be secondaries due to a read preference mode other // than "primary". bool isSecondaryOk = false; + + // Whether the client indicated that it is willing to receive partial results in the case of an + // unreachable host. + bool isAllowPartialResults = false; }; } // mongo diff --git a/src/mongo/s/query/cluster_find.cpp b/src/mongo/s/query/cluster_find.cpp index a30b9a55655..92ac88152ef 100644 --- a/src/mongo/s/query/cluster_find.cpp +++ b/src/mongo/s/query/cluster_find.cpp @@ -218,6 +218,7 @@ StatusWith<CursorId> runQueryWithoutRetrying(OperationContext* txn, params.skip = query.getParsed().getSkip(); params.isTailable = query.getParsed().isTailable(); params.isSecondaryOk = (readPref.pref != ReadPreference::PrimaryOnly); + params.isAllowPartialResults = query.getParsed().isAllowPartialResults(); // This is the batchSize passed to each subsequent getMore command issued by the cursor. We // usually use the batchSize associated with the initial find, but as it is illegal to send a |