diff options
author | Jason Rassi <rassi@10gen.com> | 2015-08-27 19:39:09 -0400 |
---|---|---|
committer | Jason Rassi <rassi@10gen.com> | 2015-08-28 17:42:01 -0400 |
commit | 9eb318778fa0d16d2156db5f9cee3c6ad17d507c (patch) | |
tree | 3b27a388c89025341e9fbdad6fb843fab004f25b /src/mongo/s | |
parent | 1f73154b39e5d404f92558d3ca6baebaef6bfacc (diff) | |
download | mongo-9eb318778fa0d16d2156db5f9cee3c6ad17d507c.tar.gz |
SERVER-19569 AsyncResultsMerger ability to merge existing cursors
Diffstat (limited to 'src/mongo/s')
-rw-r--r-- | src/mongo/s/query/async_results_merger.cpp | 13 | ||||
-rw-r--r-- | src/mongo/s/query/async_results_merger.h | 15 | ||||
-rw-r--r-- | src/mongo/s/query/async_results_merger_test.cpp | 53 | ||||
-rw-r--r-- | src/mongo/s/query/cluster_client_cursor_params.h | 29 | ||||
-rw-r--r-- | src/mongo/s/query/cluster_find.cpp | 8 |
5 files changed, 94 insertions, 24 deletions
diff --git a/src/mongo/s/query/async_results_merger.cpp b/src/mongo/s/query/async_results_merger.cpp index 5a1992cc280..9ad91a611ab 100644 --- a/src/mongo/s/query/async_results_merger.cpp +++ b/src/mongo/s/query/async_results_merger.cpp @@ -91,7 +91,7 @@ bool AsyncResultsMerger::ready_inlock() { // We don't return any results until we have received at least one response from each remote // node. This is necessary for versioned commands: we have to ensure that we've properly // established the shard version on each node before we can start returning results. - if (!remote.gotFirstResponse) { + if (!remote.cursorId) { return false; } } @@ -224,7 +224,7 @@ Status AsyncResultsMerger::askForNextBatch_inlock(size_t remoteIndex) { BSONObj cmdObj = remote.cursorId ? GetMoreRequest(_params.nsString, *remote.cursorId, adjustedBatchSize, boost::none) .toBSON() - : remote.cmdObj; + : *remote.cmdObj; executor::RemoteCommandRequest request( remote.hostAndPort, _params.nsString.db().toString(), cmdObj); @@ -351,10 +351,8 @@ void AsyncResultsMerger::handleBatchResponse( return; } - // Mark that we've gotten a valid response back from 'remote' at least once. - remote.gotFirstResponse = true; - remote.cursorId = cursorResponse.cursorId; + remote.cmdObj = boost::none; for (const auto& obj : cursorResponse.batch) { remote.docBuffer.push(obj); @@ -483,7 +481,10 @@ executor::TaskExecutor::EventHandle AsyncResultsMerger::kill() { AsyncResultsMerger::RemoteCursorData::RemoteCursorData( const ClusterClientCursorParams::Remote& params) - : hostAndPort(params.hostAndPort), cmdObj(params.cmdObj) {} + : hostAndPort(params.hostAndPort), cmdObj(params.cmdObj), cursorId(params.cursorId) { + // Either cmdObj or cursorId can be provided, but not both. + invariant(static_cast<bool>(cmdObj) != static_cast<bool>(cursorId)); +} bool AsyncResultsMerger::RemoteCursorData::hasNext() const { return !docBuffer.empty(); diff --git a/src/mongo/s/query/async_results_merger.h b/src/mongo/s/query/async_results_merger.h index 1070e40e0a8..ba52c5ed08b 100644 --- a/src/mongo/s/query/async_results_merger.h +++ b/src/mongo/s/query/async_results_merger.h @@ -167,15 +167,22 @@ private: bool exhausted() const; HostAndPort hostAndPort; - BSONObj cmdObj; + + // The command object for sending to the remote to establish the cursor. If a remote cursor + // has not been established yet, this member will be set to a valid command object. If a + // remote cursor has already been established, this member will be unset. + boost::optional<BSONObj> cmdObj; + + // The cursor id for the remote cursor. If a remote cursor has not been established yet, + // this member will be unset. If a remote cursor has been established and is not yet + // exhausted, this member will be set to a valid non-zero cursor id. If a remote cursor was + // established but is now exhausted, this member will be set to zero. boost::optional<CursorId> cursorId; + std::queue<BSONObj> docBuffer; executor::TaskExecutor::CallbackHandle cbHandle; Status status = Status::OK(); - // Set to true once we have heard from the remote node at least once. - bool gotFirstResponse = false; - // Count of fetched docs during ARM processing of the current batch. Used to reduce the // batchSize in getMore when mongod returned less docs than the requested batchSize. long long fetchedCount = 0; diff --git a/src/mongo/s/query/async_results_merger_test.cpp b/src/mongo/s/query/async_results_merger_test.cpp index b72a1f61729..f5ae25e197e 100644 --- a/src/mongo/s/query/async_results_merger_test.cpp +++ b/src/mongo/s/query/async_results_merger_test.cpp @@ -76,7 +76,8 @@ protected: const std::vector<HostAndPort>& remotes, boost::optional<long long> getMoreBatchSize = boost::none) { const bool isExplain = true; - lpq = unittest::assertGet(LiteParsedQuery::makeFromFindCommand(_nss, findCmd, isExplain)); + const auto lpq = + unittest::assertGet(LiteParsedQuery::makeFromFindCommand(_nss, findCmd, isExplain)); params = ClusterClientCursorParams(_nss); params.sort = lpq->getSort(); @@ -86,10 +87,22 @@ protected: params.isTailable = lpq->isTailable(); for (const auto& hostAndPort : remotes) { - ClusterClientCursorParams::Remote remoteParams; - remoteParams.hostAndPort = hostAndPort; - remoteParams.cmdObj = findCmd; - params.remotes.push_back(remoteParams); + params.remotes.emplace_back(hostAndPort, findCmd); + } + + arm = stdx::make_unique<AsyncResultsMerger>(executor, params); + } + + /** + * Given a vector of (HostAndPort, CursorIds) representing a set of existing cursors, constructs + * the appropriate ARM. The default CCC parameters are used. + */ + void makeCursorFromExistingCursors( + const std::vector<std::pair<HostAndPort, CursorId>>& remotes) { + params = ClusterClientCursorParams(_nss); + + for (const auto& hostIdPair : remotes) { + params.remotes.emplace_back(hostIdPair.first, hostIdPair.second); } arm = stdx::make_unique<AsyncResultsMerger>(executor, params); @@ -163,7 +176,6 @@ protected: const std::vector<HostAndPort> _remotes; executor::TaskExecutor* executor; - std::unique_ptr<LiteParsedQuery> lpq; ClusterClientCursorParams params; std::unique_ptr<AsyncResultsMerger> arm; @@ -436,6 +448,35 @@ TEST_F(AsyncResultsMergerTest, ClusterFindInitialBatchSizeIsZero) { ASSERT(!unittest::assertGet(arm->nextReady())); } +TEST_F(AsyncResultsMergerTest, ExistingCursors) { + makeCursorFromExistingCursors({{_remotes[0], 5}, {_remotes[1], 6}}); + + ASSERT_FALSE(arm->ready()); + auto readyEvent = unittest::assertGet(arm->nextEvent()); + ASSERT_FALSE(arm->ready()); + + std::vector<CursorResponse> responses; + std::vector<BSONObj> batch1 = {fromjson("{_id: 1}"), fromjson("{_id: 2}")}; + responses.emplace_back(_nss, CursorId(0), batch1); + std::vector<BSONObj> batch2 = {fromjson("{_id: 3}"), fromjson("{_id: 4}")}; + responses.emplace_back(_nss, CursorId(0), batch2); + scheduleNetworkResponses(responses, CursorResponse::ResponseType::InitialResponse); + + executor->waitForEvent(readyEvent); + + ASSERT_TRUE(arm->ready()); + ASSERT_EQ(fromjson("{_id: 1}"), *unittest::assertGet(arm->nextReady())); + ASSERT_TRUE(arm->ready()); + ASSERT_EQ(fromjson("{_id: 2}"), *unittest::assertGet(arm->nextReady())); + ASSERT_TRUE(arm->ready()); + ASSERT_EQ(fromjson("{_id: 3}"), *unittest::assertGet(arm->nextReady())); + ASSERT_TRUE(arm->ready()); + ASSERT_EQ(fromjson("{_id: 4}"), *unittest::assertGet(arm->nextReady())); + ASSERT_TRUE(arm->ready()); + ASSERT(!unittest::assertGet(arm->nextReady())); +} + + TEST_F(AsyncResultsMergerTest, StreamResultsFromOneShardIfOtherDoesntRespond) { BSONObj findCmd = fromjson("{find: 'testcoll', batchSize: 2}"); makeCursorFromFindCmd(findCmd, {_remotes[0], _remotes[1]}); diff --git a/src/mongo/s/query/cluster_client_cursor_params.h b/src/mongo/s/query/cluster_client_cursor_params.h index e73a6d5d9ab..85921c8e280 100644 --- a/src/mongo/s/query/cluster_client_cursor_params.h +++ b/src/mongo/s/query/cluster_client_cursor_params.h @@ -32,6 +32,7 @@ #include <vector> #include "mongo/bson/bsonobj.h" +#include "mongo/db/cursor_id.h" #include "mongo/db/namespace_string.h" namespace mongo { @@ -41,11 +42,35 @@ struct ClusterClientCursorParams { * Contains any CCC parameters that are specified per-remote node. */ struct Remote { + /** + * Use when a new cursor should be created on the remote. + */ + Remote(HostAndPort hostAndPort, BSONObj cmdObj) + : hostAndPort(std::move(hostAndPort)), cmdObj(std::move(cmdObj)) {} + + /** + * Use when an a cursor already exists on the remote. The resulting CCC will take ownership + * of the existing remote cursor, generating results based on its current state. + * + * Note that any results already generated from this cursor will not be returned by the + * resulting CCC. The caller is responsible for ensuring that results previously generated + * by this cursor have been processed. + */ + Remote(HostAndPort hostAndPort, CursorId cursorId) + : hostAndPort(std::move(hostAndPort)), cursorId(cursorId) {} + // How the networking layer should contact this remote. - HostAndPort hostAndPort; + const HostAndPort hostAndPort; // The raw command parameters to send to this remote (e.g. the find command specification). - BSONObj cmdObj; + // + // Exactly one of 'cmdObj' or 'cursorId' must be set. + const boost::optional<BSONObj> cmdObj; + + // The cursorId for the remote node, if one already exists. + // + // Exactly one of 'cmdObj' or 'cursorId' must be set. + const boost::optional<CursorId> cursorId; }; ClusterClientCursorParams() {} diff --git a/src/mongo/s/query/cluster_find.cpp b/src/mongo/s/query/cluster_find.cpp index 71286aacf33..9fdb706f4b4 100644 --- a/src/mongo/s/query/cluster_find.cpp +++ b/src/mongo/s/query/cluster_find.cpp @@ -143,10 +143,7 @@ StatusWith<CursorId> runQueryWithoutRetrying(OperationContext* txn, // Use read pref to target a particular host from each shard. Also construct the find command // that we will forward to each shard. - params.remotes.resize(shards.size()); - for (size_t i = 0; i < shards.size(); ++i) { - const auto& shard = shards[i]; - + for (const auto& shard : shards) { // The find command cannot be used to query config server content with legacy 3-host config // servers, because the new targeting logic only works for config server replica sets. if (shard->isConfig() && shard->getConnString().type() == ConnectionString::SYNC) { @@ -159,7 +156,6 @@ StatusWith<CursorId> runQueryWithoutRetrying(OperationContext* txn, if (!hostAndPort.isOK()) { return hostAndPort.getStatus(); } - params.remotes[i].hostAndPort = std::move(hostAndPort.getValue()); // Build the find command, and attach shard version if necessary. BSONObjBuilder cmdBuilder; @@ -170,7 +166,7 @@ StatusWith<CursorId> runQueryWithoutRetrying(OperationContext* txn, cmdBuilder.appendArray(LiteParsedQuery::kShardVersionField, shardVersion.toBSON()); } - params.remotes[i].cmdObj = cmdBuilder.obj(); + params.remotes.emplace_back(std::move(hostAndPort.getValue()), cmdBuilder.obj()); } auto ccc = |