summaryrefslogtreecommitdiff
path: root/src/mongo/s/query
diff options
context:
space:
mode:
authorDavid Storch <david.storch@10gen.com>2015-09-22 18:08:36 -0400
committerDavid Storch <david.storch@10gen.com>2015-09-24 20:04:21 -0400
commit549d1080fe68fdef42182d4a4189fdd70f964eaf (patch)
treea72b834ba750998caaf23b54e7aee1f9976cec66 /src/mongo/s/query
parentb332a0f555e0d332d3b3d77878187597a23e140b (diff)
downloadmongo-549d1080fe68fdef42182d4a4189fdd70f964eaf.tar.gz
SERVER-19842 support allowPartialResults query option in new mongos read path
Diffstat (limited to 'src/mongo/s/query')
-rw-r--r--src/mongo/s/query/async_results_merger.cpp22
-rw-r--r--src/mongo/s/query/async_results_merger_test.cpp98
-rw-r--r--src/mongo/s/query/cluster_client_cursor_params.h6
-rw-r--r--src/mongo/s/query/cluster_find.cpp1
4 files changed, 122 insertions, 5 deletions
diff --git a/src/mongo/s/query/async_results_merger.cpp b/src/mongo/s/query/async_results_merger.cpp
index ef039eadb2f..abecbba1a34 100644
--- a/src/mongo/s/query/async_results_merger.cpp
+++ b/src/mongo/s/query/async_results_merger.cpp
@@ -369,9 +369,14 @@ void AsyncResultsMerger::handleBatchResponse(
if (!cbData.response.isOK()) {
remote.status = cbData.response.getStatus();
- // If we failed to retrieve the batch because we couldn't contact the remote, we notify that
- // targeter that the host is unreachable. The caller can then retry on a new host.
- if (remote.status == ErrorCodes::HostUnreachable && remote.shardId) {
+ // Errors other than HostUnreachable have no special handling.
+ if (remote.status != ErrorCodes::HostUnreachable) {
+ return;
+ }
+
+ // Notify the targeter that the host is unreachable. The caller can then retry on a new
+ // host.
+ if (remote.shardId) {
auto shard = _params.shardRegistry->getShard(_params.txn, *remote.shardId);
if (!shard) {
remote.status =
@@ -383,6 +388,17 @@ void AsyncResultsMerger::handleBatchResponse(
}
}
+ // Unreachable host errors are swallowed if the 'allowPartialResults' option is set. We
+ // remove the unreachable host entirely from consideration by marking it as exhausted.
+ if (_params.isAllowPartialResults) {
+ remote.status = Status::OK();
+
+ // Clear the results buffer and cursor id.
+ std::queue<BSONObj> emptyBuffer;
+ std::swap(remote.docBuffer, emptyBuffer);
+ remote.cursorId = 0;
+ }
+
return;
}
diff --git a/src/mongo/s/query/async_results_merger_test.cpp b/src/mongo/s/query/async_results_merger_test.cpp
index 7daaa34d03b..75dfe2a2876 100644
--- a/src/mongo/s/query/async_results_merger_test.cpp
+++ b/src/mongo/s/query/async_results_merger_test.cpp
@@ -86,10 +86,16 @@ protected:
params.batchSize = getMoreBatchSize ? getMoreBatchSize : lpq->getBatchSize();
params.skip = lpq->getSkip();
params.isTailable = lpq->isTailable();
+ params.isAllowPartialResults = lpq->isAllowPartialResults();
params.isSecondaryOk = isSecondaryOk;
for (const auto& hostAndPort : remotes) {
- params.remotes.emplace_back(hostAndPort, "testShard", findCmd);
+ // Pass boost::none in place of a ShardId. If there is a ShardId and the ARM receives a
+ // HostUnreachable error, it will attempt to look inside the shard registry in order to
+ // find the Shard to which the host belongs and notify it of the unreachable host. Since
+ // this test fixture is not passing down the shard registry (or an OperationContext),
+ // we must skip this unreachable host handling.
+ params.remotes.emplace_back(hostAndPort, boost::none, findCmd);
}
arm = stdx::make_unique<AsyncResultsMerger>(executor, params);
@@ -1045,6 +1051,96 @@ TEST_F(AsyncResultsMergerTest, SendsSecondaryOkAsMetadata) {
ASSERT(!unittest::assertGet(arm->nextReady()));
}
+TEST_F(AsyncResultsMergerTest, AllowPartialResults) {
+ BSONObj findCmd = fromjson("{find: 'testcoll', allowPartialResults: true}");
+ makeCursorFromFindCmd(findCmd, _remotes);
+
+ ASSERT_FALSE(arm->ready());
+ auto readyEvent = unittest::assertGet(arm->nextEvent());
+ ASSERT_FALSE(arm->ready());
+
+ // The network layer reports that the first host is unreachable.
+ scheduleErrorResponse({ErrorCodes::HostUnreachable, "host unreachable"});
+ ASSERT_FALSE(arm->ready());
+
+ // Instead of propagating the error, we should be willing to return results from the two
+ // remaining shards.
+ std::vector<CursorResponse> responses;
+ std::vector<BSONObj> batch1 = {fromjson("{_id: 1}")};
+ responses.emplace_back(_nss, CursorId(98), batch1);
+ std::vector<BSONObj> batch2 = {fromjson("{_id: 2}")};
+ responses.emplace_back(_nss, CursorId(99), batch2);
+ scheduleNetworkResponses(responses, CursorResponse::ResponseType::InitialResponse);
+ executor->waitForEvent(readyEvent);
+
+ ASSERT_TRUE(arm->ready());
+ ASSERT_EQ(fromjson("{_id: 1}"), *unittest::assertGet(arm->nextReady()));
+ ASSERT_TRUE(arm->ready());
+ ASSERT_EQ(fromjson("{_id: 2}"), *unittest::assertGet(arm->nextReady()));
+
+ ASSERT_FALSE(arm->ready());
+ readyEvent = unittest::assertGet(arm->nextEvent());
+ ASSERT_FALSE(arm->ready());
+
+ // Now the second host becomes unreachable. We should still be willing to return results from
+ // the third shard.
+ scheduleErrorResponse({ErrorCodes::HostUnreachable, "host unreachable"});
+ ASSERT_FALSE(arm->ready());
+
+ responses.clear();
+ std::vector<BSONObj> batch3 = {fromjson("{_id: 3}")};
+ responses.emplace_back(_nss, CursorId(99), batch3);
+ scheduleNetworkResponses(responses, CursorResponse::ResponseType::SubsequentResponse);
+ executor->waitForEvent(readyEvent);
+
+ ASSERT_TRUE(arm->ready());
+ ASSERT_EQ(fromjson("{_id: 3}"), *unittest::assertGet(arm->nextReady()));
+
+ ASSERT_FALSE(arm->ready());
+ readyEvent = unittest::assertGet(arm->nextEvent());
+ ASSERT_FALSE(arm->ready());
+
+ // Once the last reachable shard indicates that its cursor is closed, we're done.
+ responses.clear();
+ std::vector<BSONObj> batch4 = {};
+ responses.emplace_back(_nss, CursorId(0), batch4);
+ scheduleNetworkResponses(responses, CursorResponse::ResponseType::SubsequentResponse);
+ executor->waitForEvent(readyEvent);
+
+ ASSERT_TRUE(arm->ready());
+ ASSERT(!unittest::assertGet(arm->nextReady()));
+}
+
+TEST_F(AsyncResultsMergerTest, AllowPartialResultsSingleNode) {
+ BSONObj findCmd = fromjson("{find: 'testcoll', allowPartialResults: true}");
+ makeCursorFromFindCmd(findCmd, {_remotes[0]});
+
+ ASSERT_FALSE(arm->ready());
+ auto readyEvent = unittest::assertGet(arm->nextEvent());
+ ASSERT_FALSE(arm->ready());
+
+ std::vector<CursorResponse> responses;
+ std::vector<BSONObj> batch = {fromjson("{_id: 1}"), fromjson("{_id: 2}")};
+ responses.emplace_back(_nss, CursorId(98), batch);
+ scheduleNetworkResponses(responses, CursorResponse::ResponseType::InitialResponse);
+ executor->waitForEvent(readyEvent);
+
+ ASSERT_TRUE(arm->ready());
+ ASSERT_EQ(fromjson("{_id: 1}"), *unittest::assertGet(arm->nextReady()));
+ ASSERT_TRUE(arm->ready());
+ ASSERT_EQ(fromjson("{_id: 2}"), *unittest::assertGet(arm->nextReady()));
+
+ ASSERT_FALSE(arm->ready());
+ readyEvent = unittest::assertGet(arm->nextEvent());
+ ASSERT_FALSE(arm->ready());
+
+ // The lone host involved in this query becomes unreachable. This should simply cause us to
+ // return EOF.
+ scheduleErrorResponse({ErrorCodes::HostUnreachable, "host unreachable"});
+ ASSERT_TRUE(arm->ready());
+ ASSERT(!unittest::assertGet(arm->nextReady()));
+}
+
} // namespace
} // namespace mongo
diff --git a/src/mongo/s/query/cluster_client_cursor_params.h b/src/mongo/s/query/cluster_client_cursor_params.h
index 8913e1001e5..3c73d47ba9b 100644
--- a/src/mongo/s/query/cluster_client_cursor_params.h
+++ b/src/mongo/s/query/cluster_client_cursor_params.h
@@ -53,7 +53,7 @@ struct ClusterClientCursorParams {
/**
* Use when a new cursor should be created on the remote.
*/
- Remote(HostAndPort hostAndPort, ShardId sid, BSONObj cmdObj)
+ Remote(HostAndPort hostAndPort, boost::optional<ShardId> sid, BSONObj cmdObj)
: hostAndPort(std::move(hostAndPort)),
shardId(std::move(sid)),
cmdObj(std::move(cmdObj)) {}
@@ -126,6 +126,10 @@ struct ClusterClientCursorParams {
// Whether any of the remote nodes might be secondaries due to a read preference mode other
// than "primary".
bool isSecondaryOk = false;
+
+ // Whether the client indicated that it is willing to receive partial results in the case of an
+ // unreachable host.
+ bool isAllowPartialResults = false;
};
} // mongo
diff --git a/src/mongo/s/query/cluster_find.cpp b/src/mongo/s/query/cluster_find.cpp
index a30b9a55655..92ac88152ef 100644
--- a/src/mongo/s/query/cluster_find.cpp
+++ b/src/mongo/s/query/cluster_find.cpp
@@ -218,6 +218,7 @@ StatusWith<CursorId> runQueryWithoutRetrying(OperationContext* txn,
params.skip = query.getParsed().getSkip();
params.isTailable = query.getParsed().isTailable();
params.isSecondaryOk = (readPref.pref != ReadPreference::PrimaryOnly);
+ params.isAllowPartialResults = query.getParsed().isAllowPartialResults();
// This is the batchSize passed to each subsequent getMore command issued by the cursor. We
// usually use the batchSize associated with the initial find, but as it is illegal to send a