summaryrefslogtreecommitdiff
path: root/src/mongo/s
diff options
context:
space:
mode:
authorJason Rassi <rassi@10gen.com>2015-08-27 19:39:09 -0400
committerJason Rassi <rassi@10gen.com>2015-08-28 17:42:01 -0400
commit9eb318778fa0d16d2156db5f9cee3c6ad17d507c (patch)
tree3b27a388c89025341e9fbdad6fb843fab004f25b /src/mongo/s
parent1f73154b39e5d404f92558d3ca6baebaef6bfacc (diff)
downloadmongo-9eb318778fa0d16d2156db5f9cee3c6ad17d507c.tar.gz
SERVER-19569 AsyncResultsMerger ability to merge existing cursors
Diffstat (limited to 'src/mongo/s')
-rw-r--r--src/mongo/s/query/async_results_merger.cpp13
-rw-r--r--src/mongo/s/query/async_results_merger.h15
-rw-r--r--src/mongo/s/query/async_results_merger_test.cpp53
-rw-r--r--src/mongo/s/query/cluster_client_cursor_params.h29
-rw-r--r--src/mongo/s/query/cluster_find.cpp8
5 files changed, 94 insertions, 24 deletions
diff --git a/src/mongo/s/query/async_results_merger.cpp b/src/mongo/s/query/async_results_merger.cpp
index 5a1992cc280..9ad91a611ab 100644
--- a/src/mongo/s/query/async_results_merger.cpp
+++ b/src/mongo/s/query/async_results_merger.cpp
@@ -91,7 +91,7 @@ bool AsyncResultsMerger::ready_inlock() {
// We don't return any results until we have received at least one response from each remote
// node. This is necessary for versioned commands: we have to ensure that we've properly
// established the shard version on each node before we can start returning results.
- if (!remote.gotFirstResponse) {
+ if (!remote.cursorId) {
return false;
}
}
@@ -224,7 +224,7 @@ Status AsyncResultsMerger::askForNextBatch_inlock(size_t remoteIndex) {
BSONObj cmdObj = remote.cursorId
? GetMoreRequest(_params.nsString, *remote.cursorId, adjustedBatchSize, boost::none)
.toBSON()
- : remote.cmdObj;
+ : *remote.cmdObj;
executor::RemoteCommandRequest request(
remote.hostAndPort, _params.nsString.db().toString(), cmdObj);
@@ -351,10 +351,8 @@ void AsyncResultsMerger::handleBatchResponse(
return;
}
- // Mark that we've gotten a valid response back from 'remote' at least once.
- remote.gotFirstResponse = true;
-
remote.cursorId = cursorResponse.cursorId;
+ remote.cmdObj = boost::none;
for (const auto& obj : cursorResponse.batch) {
remote.docBuffer.push(obj);
@@ -483,7 +481,10 @@ executor::TaskExecutor::EventHandle AsyncResultsMerger::kill() {
AsyncResultsMerger::RemoteCursorData::RemoteCursorData(
const ClusterClientCursorParams::Remote& params)
- : hostAndPort(params.hostAndPort), cmdObj(params.cmdObj) {}
+ : hostAndPort(params.hostAndPort), cmdObj(params.cmdObj), cursorId(params.cursorId) {
+ // Exactly one of cmdObj or cursorId must be provided (never both, never neither).
+ invariant(static_cast<bool>(cmdObj) != static_cast<bool>(cursorId));
+}
bool AsyncResultsMerger::RemoteCursorData::hasNext() const {
return !docBuffer.empty();
diff --git a/src/mongo/s/query/async_results_merger.h b/src/mongo/s/query/async_results_merger.h
index 1070e40e0a8..ba52c5ed08b 100644
--- a/src/mongo/s/query/async_results_merger.h
+++ b/src/mongo/s/query/async_results_merger.h
@@ -167,15 +167,22 @@ private:
bool exhausted() const;
HostAndPort hostAndPort;
- BSONObj cmdObj;
+
+ // The command object for sending to the remote to establish the cursor. If a remote cursor
+ // has not been established yet, this member will be set to a valid command object. If a
+ // remote cursor has already been established, this member will be unset.
+ boost::optional<BSONObj> cmdObj;
+
+ // The cursor id for the remote cursor. If a remote cursor has not been established yet,
+ // this member will be unset. If a remote cursor has been established and is not yet
+ // exhausted, this member will be set to a valid non-zero cursor id. If a remote cursor was
+ // established but is now exhausted, this member will be set to zero.
boost::optional<CursorId> cursorId;
+
std::queue<BSONObj> docBuffer;
executor::TaskExecutor::CallbackHandle cbHandle;
Status status = Status::OK();
- // Set to true once we have heard from the remote node at least once.
- bool gotFirstResponse = false;
-
// Count of fetched docs during ARM processing of the current batch. Used to reduce the
// batchSize in getMore when mongod returned less docs than the requested batchSize.
long long fetchedCount = 0;
diff --git a/src/mongo/s/query/async_results_merger_test.cpp b/src/mongo/s/query/async_results_merger_test.cpp
index b72a1f61729..f5ae25e197e 100644
--- a/src/mongo/s/query/async_results_merger_test.cpp
+++ b/src/mongo/s/query/async_results_merger_test.cpp
@@ -76,7 +76,8 @@ protected:
const std::vector<HostAndPort>& remotes,
boost::optional<long long> getMoreBatchSize = boost::none) {
const bool isExplain = true;
- lpq = unittest::assertGet(LiteParsedQuery::makeFromFindCommand(_nss, findCmd, isExplain));
+ const auto lpq =
+ unittest::assertGet(LiteParsedQuery::makeFromFindCommand(_nss, findCmd, isExplain));
params = ClusterClientCursorParams(_nss);
params.sort = lpq->getSort();
@@ -86,10 +87,22 @@ protected:
params.isTailable = lpq->isTailable();
for (const auto& hostAndPort : remotes) {
- ClusterClientCursorParams::Remote remoteParams;
- remoteParams.hostAndPort = hostAndPort;
- remoteParams.cmdObj = findCmd;
- params.remotes.push_back(remoteParams);
+ params.remotes.emplace_back(hostAndPort, findCmd);
+ }
+
+ arm = stdx::make_unique<AsyncResultsMerger>(executor, params);
+ }
+
+ /**
+ * Given a vector of (HostAndPort, CursorIds) representing a set of existing cursors, constructs
+ * the appropriate ARM. The default CCC parameters are used.
+ */
+ void makeCursorFromExistingCursors(
+ const std::vector<std::pair<HostAndPort, CursorId>>& remotes) {
+ params = ClusterClientCursorParams(_nss);
+
+ for (const auto& hostIdPair : remotes) {
+ params.remotes.emplace_back(hostIdPair.first, hostIdPair.second);
}
arm = stdx::make_unique<AsyncResultsMerger>(executor, params);
@@ -163,7 +176,6 @@ protected:
const std::vector<HostAndPort> _remotes;
executor::TaskExecutor* executor;
- std::unique_ptr<LiteParsedQuery> lpq;
ClusterClientCursorParams params;
std::unique_ptr<AsyncResultsMerger> arm;
@@ -436,6 +448,35 @@ TEST_F(AsyncResultsMergerTest, ClusterFindInitialBatchSizeIsZero) {
ASSERT(!unittest::assertGet(arm->nextReady()));
}
+TEST_F(AsyncResultsMergerTest, ExistingCursors) {
+ makeCursorFromExistingCursors({{_remotes[0], 5}, {_remotes[1], 6}});
+
+ ASSERT_FALSE(arm->ready());
+ auto readyEvent = unittest::assertGet(arm->nextEvent());
+ ASSERT_FALSE(arm->ready());
+
+ std::vector<CursorResponse> responses;
+ std::vector<BSONObj> batch1 = {fromjson("{_id: 1}"), fromjson("{_id: 2}")};
+ responses.emplace_back(_nss, CursorId(0), batch1);
+ std::vector<BSONObj> batch2 = {fromjson("{_id: 3}"), fromjson("{_id: 4}")};
+ responses.emplace_back(_nss, CursorId(0), batch2);
+ scheduleNetworkResponses(responses, CursorResponse::ResponseType::InitialResponse);
+
+ executor->waitForEvent(readyEvent);
+
+ ASSERT_TRUE(arm->ready());
+ ASSERT_EQ(fromjson("{_id: 1}"), *unittest::assertGet(arm->nextReady()));
+ ASSERT_TRUE(arm->ready());
+ ASSERT_EQ(fromjson("{_id: 2}"), *unittest::assertGet(arm->nextReady()));
+ ASSERT_TRUE(arm->ready());
+ ASSERT_EQ(fromjson("{_id: 3}"), *unittest::assertGet(arm->nextReady()));
+ ASSERT_TRUE(arm->ready());
+ ASSERT_EQ(fromjson("{_id: 4}"), *unittest::assertGet(arm->nextReady()));
+ ASSERT_TRUE(arm->ready());
+ ASSERT(!unittest::assertGet(arm->nextReady()));
+}
+
+
TEST_F(AsyncResultsMergerTest, StreamResultsFromOneShardIfOtherDoesntRespond) {
BSONObj findCmd = fromjson("{find: 'testcoll', batchSize: 2}");
makeCursorFromFindCmd(findCmd, {_remotes[0], _remotes[1]});
diff --git a/src/mongo/s/query/cluster_client_cursor_params.h b/src/mongo/s/query/cluster_client_cursor_params.h
index e73a6d5d9ab..85921c8e280 100644
--- a/src/mongo/s/query/cluster_client_cursor_params.h
+++ b/src/mongo/s/query/cluster_client_cursor_params.h
@@ -32,6 +32,7 @@
#include <vector>
#include "mongo/bson/bsonobj.h"
+#include "mongo/db/cursor_id.h"
#include "mongo/db/namespace_string.h"
namespace mongo {
@@ -41,11 +42,35 @@ struct ClusterClientCursorParams {
* Contains any CCC parameters that are specified per-remote node.
*/
struct Remote {
+ /**
+ * Use when a new cursor should be created on the remote.
+ */
+ Remote(HostAndPort hostAndPort, BSONObj cmdObj)
+ : hostAndPort(std::move(hostAndPort)), cmdObj(std::move(cmdObj)) {}
+
+ /**
+ * Use when a cursor already exists on the remote. The resulting CCC will take ownership
+ * of the existing remote cursor, generating results based on its current state.
+ *
+ * Note that any results already generated from this cursor will not be returned by the
+ * resulting CCC. The caller is responsible for ensuring that results previously generated
+ * by this cursor have been processed.
+ */
+ Remote(HostAndPort hostAndPort, CursorId cursorId)
+ : hostAndPort(std::move(hostAndPort)), cursorId(cursorId) {}
+
// How the networking layer should contact this remote.
- HostAndPort hostAndPort;
+ const HostAndPort hostAndPort;
// The raw command parameters to send to this remote (e.g. the find command specification).
- BSONObj cmdObj;
+ //
+ // Exactly one of 'cmdObj' or 'cursorId' must be set.
+ const boost::optional<BSONObj> cmdObj;
+
+ // The cursorId for the remote node, if one already exists.
+ //
+ // Exactly one of 'cmdObj' or 'cursorId' must be set.
+ const boost::optional<CursorId> cursorId;
};
ClusterClientCursorParams() {}
diff --git a/src/mongo/s/query/cluster_find.cpp b/src/mongo/s/query/cluster_find.cpp
index 71286aacf33..9fdb706f4b4 100644
--- a/src/mongo/s/query/cluster_find.cpp
+++ b/src/mongo/s/query/cluster_find.cpp
@@ -143,10 +143,7 @@ StatusWith<CursorId> runQueryWithoutRetrying(OperationContext* txn,
// Use read pref to target a particular host from each shard. Also construct the find command
// that we will forward to each shard.
- params.remotes.resize(shards.size());
- for (size_t i = 0; i < shards.size(); ++i) {
- const auto& shard = shards[i];
-
+ for (const auto& shard : shards) {
// The find command cannot be used to query config server content with legacy 3-host config
// servers, because the new targeting logic only works for config server replica sets.
if (shard->isConfig() && shard->getConnString().type() == ConnectionString::SYNC) {
@@ -159,7 +156,6 @@ StatusWith<CursorId> runQueryWithoutRetrying(OperationContext* txn,
if (!hostAndPort.isOK()) {
return hostAndPort.getStatus();
}
- params.remotes[i].hostAndPort = std::move(hostAndPort.getValue());
// Build the find command, and attach shard version if necessary.
BSONObjBuilder cmdBuilder;
@@ -170,7 +166,7 @@ StatusWith<CursorId> runQueryWithoutRetrying(OperationContext* txn,
cmdBuilder.appendArray(LiteParsedQuery::kShardVersionField, shardVersion.toBSON());
}
- params.remotes[i].cmdObj = cmdBuilder.obj();
+ params.remotes.emplace_back(std::move(hostAndPort.getValue()), cmdBuilder.obj());
}
auto ccc =