summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorKaloian Manassiev <kaloian.manassiev@mongodb.com>2015-10-12 11:59:56 -0400
committerKaloian Manassiev <kaloian.manassiev@mongodb.com>2015-10-13 16:33:13 -0400
commit15350b5087dfd09cb2b25a22f422804abc8c2654 (patch)
tree3672f96555f1348736caae75805789223f2f21b6 /src
parent65afa27bd884f852a5ee8ae3e7174d1f648ae4a1 (diff)
downloadmongo-15350b5087dfd09cb2b25a22f422804abc8c2654.tar.gz
SERVER-20646 Move remote cursor host targeting inside the AsyncResultsMerger
No functional changes, just moves the resolution of the remote cursor's host to happen inside the AsyncResultsMerger instead of at the level of the entire find command. Also hooks the async merger tests with the sharding test fixture so they can access all objects.
Diffstat (limited to 'src')
-rw-r--r--src/mongo/s/query/SConscript1
-rw-r--r--src/mongo/s/query/async_results_merger.cpp132
-rw-r--r--src/mongo/s/query/async_results_merger.h38
-rw-r--r--src/mongo/s/query/async_results_merger_test.cpp148
-rw-r--r--src/mongo/s/query/cluster_client_cursor_params.h49
-rw-r--r--src/mongo/s/query/cluster_find.cpp14
6 files changed, 241 insertions, 141 deletions
diff --git a/src/mongo/s/query/SConscript b/src/mongo/s/query/SConscript
index f499423e99a..cf7c5e406dd 100644
--- a/src/mongo/s/query/SConscript
+++ b/src/mongo/s/query/SConscript
@@ -89,6 +89,7 @@ env.CppUnitTest(
"$BUILD_DIR/mongo/executor/thread_pool_task_executor_test_fixture",
"$BUILD_DIR/mongo/s/coreshard",
"$BUILD_DIR/mongo/s/mongoscore",
+ "$BUILD_DIR/mongo/s/sharding_test_fixture",
"async_results_merger",
],
)
diff --git a/src/mongo/s/query/async_results_merger.cpp b/src/mongo/s/query/async_results_merger.cpp
index d8a47c23eaa..aec7c898ada 100644
--- a/src/mongo/s/query/async_results_merger.cpp
+++ b/src/mongo/s/query/async_results_merger.cpp
@@ -39,6 +39,8 @@
#include "mongo/executor/remote_command_request.h"
#include "mongo/executor/remote_command_response.h"
#include "mongo/rpc/metadata/server_selection_metadata.h"
+#include "mongo/s/client/shard_registry.h"
+#include "mongo/s/grid.h"
#include "mongo/util/assert_util.h"
#include "mongo/util/scopeguard.h"
@@ -50,14 +52,27 @@ AsyncResultsMerger::AsyncResultsMerger(executor::TaskExecutor* executor,
_params(std::move(params)),
_mergeQueue(MergingComparator(_remotes, _params.sort)) {
for (const auto& remote : _params.remotes) {
- _remotes.emplace_back(remote);
+ if (remote.shardId) {
+ invariant(remote.cmdObj);
+ invariant(!remote.cursorId);
+ invariant(!remote.hostAndPort);
+ _remotes.emplace_back(*remote.shardId, *remote.cmdObj);
+ } else {
+ invariant(!remote.cmdObj);
+ invariant(remote.cursorId);
+ invariant(remote.hostAndPort);
+ _remotes.emplace_back(*remote.hostAndPort, *remote.cursorId);
+ }
}
- // Initialize command metadata to handle isSecondaryOk.
- BSONObjBuilder metadataBuilder;
- rpc::ServerSelectionMetadata metadata(_params.isSecondaryOk, boost::none);
- uassertStatusOK(metadata.writeToMetadata(&metadataBuilder));
- _metadataObj = metadataBuilder.obj();
+ // Initialize command metadata to handle the read preference.
+ if (_params.readPreference) {
+ BSONObjBuilder metadataBuilder;
+ rpc::ServerSelectionMetadata metadata(
+ _params.readPreference->pref != ReadPreference::PrimaryOnly, boost::none);
+ uassertStatusOK(metadata.writeToMetadata(&metadataBuilder));
+ _metadataObj = metadataBuilder.obj();
+ }
}
AsyncResultsMerger::~AsyncResultsMerger() {
@@ -228,22 +243,34 @@ Status AsyncResultsMerger::askForNextBatch_inlock(size_t remoteIndex) {
// request to fetch the remaining docs only. If the remote node has a plan with OR for top k and
// a full sort as is the case for the OP_QUERY find then this optimization will prevent
// switching to the full sort plan branch.
- auto adjustedBatchSize = _params.batchSize;
+ BSONObj cmdObj;
+
+ if (remote.cursorId) {
+ auto adjustedBatchSize = _params.batchSize;
- if (remote.cursorId && _params.batchSize && *_params.batchSize > remote.fetchedCount) {
- adjustedBatchSize = *_params.batchSize - remote.fetchedCount;
+ if (_params.batchSize && *_params.batchSize > remote.fetchedCount) {
+ adjustedBatchSize = *_params.batchSize - remote.fetchedCount;
+ }
+
+ cmdObj =
+ GetMoreRequest(
+ _params.nsString, *remote.cursorId, adjustedBatchSize, boost::none, boost::none)
+ .toBSON();
} else {
+ // Do the first time shard host resolution.
+ invariant(_params.readPreference);
+ Status resolveStatus =
+ remote.resolveShardIdToHostAndPort(_params.txn, *_params.readPreference);
+ if (!resolveStatus.isOK()) {
+ return resolveStatus;
+ }
+
remote.fetchedCount = 0;
+ cmdObj = *remote.initialCmdObj;
}
- BSONObj cmdObj = remote.cursorId
- ? GetMoreRequest(
- _params.nsString, *remote.cursorId, adjustedBatchSize, boost::none, boost::none)
- .toBSON()
- : *remote.cmdObj;
-
executor::RemoteCommandRequest request(
- remote.hostAndPort, _params.nsString.db().toString(), cmdObj, _metadataObj);
+ remote.getTargetHost(), _params.nsString.db().toString(), cmdObj, _metadataObj);
auto callbackStatus = _executor->scheduleRemoteCommand(
request,
@@ -368,31 +395,35 @@ void AsyncResultsMerger::handleBatchResponse(
_lifecycleState = kKillComplete;
}
+
return;
}
// Early return from this point on signal anyone waiting on an event, if ready() is true.
ScopeGuard signaller = MakeGuard(&AsyncResultsMerger::signalCurrentEventIfReady_inlock, this);
- if (!cbData.response.isOK()) {
- remote.status = cbData.response.getStatus();
+ StatusWith<CursorResponse> cursorResponseStatus(
+ cbData.response.isOK() ? parseCursorResponse(cbData.response.getValue().data, remote)
+ : cbData.response.getStatus());
+
+ if (!cursorResponseStatus.isOK()) {
+ remote.status = cursorResponseStatus.getStatus();
// Errors other than HostUnreachable have no special handling.
if (remote.status != ErrorCodes::HostUnreachable) {
return;
}
- // Notify that targeter that the host is unreachable. The caller can then retry on a new
- // host.
+ // Notify the shard registry of the failure.
if (remote.shardId) {
- auto shard = _params.shardRegistry->getShard(_params.txn, *remote.shardId);
+ auto shard = grid.shardRegistry()->getShard(_params.txn, *remote.shardId);
if (!shard) {
- remote.status =
- Status(ErrorCodes::HostUnreachable,
- str::stream() << "Could not find shard " << *remote.shardId
- << " containing host " << remote.hostAndPort.toString());
+ remote.status = Status(ErrorCodes::HostUnreachable,
+ str::stream() << "Could not find shard " << *remote.shardId
+ << " containing host "
+ << remote.getTargetHost().toString());
} else {
- shard->getTargeter()->markHostUnreachable(remote.hostAndPort);
+ shard->getTargeter()->markHostUnreachable(remote.getTargetHost());
}
}
@@ -410,15 +441,9 @@ void AsyncResultsMerger::handleBatchResponse(
return;
}
- auto cursorResponseStatus = parseCursorResponse(cbData.response.getValue().data, remote);
- if (!cursorResponseStatus.isOK()) {
- remote.status = cursorResponseStatus.getStatus();
- return;
- }
-
auto cursorResponse = cursorResponseStatus.getValue();
remote.cursorId = cursorResponse.cursorId;
- remote.cmdObj = boost::none;
+ remote.initialCmdObj = boost::none;
for (const auto& obj : cursorResponse.batch) {
// If there's a sort, we're expecting the remote node to give us back a sort key.
@@ -496,7 +521,7 @@ void AsyncResultsMerger::scheduleKillCursors_inlock() {
BSONObj cmdObj = KillCursorsRequest(_params.nsString, {*remote.cursorId}).toBSON();
executor::RemoteCommandRequest request(
- remote.hostAndPort, _params.nsString.db().toString(), cmdObj);
+ remote.getTargetHost(), _params.nsString.db().toString(), cmdObj);
_executor->scheduleRemoteCommand(
request,
@@ -548,14 +573,16 @@ executor::TaskExecutor::EventHandle AsyncResultsMerger::kill() {
// AsyncResultsMerger::RemoteCursorData
//
-AsyncResultsMerger::RemoteCursorData::RemoteCursorData(
- const ClusterClientCursorParams::Remote& params)
- : hostAndPort(params.hostAndPort),
- shardId(params.shardId),
- cmdObj(params.cmdObj),
- cursorId(params.cursorId) {
- // Either cmdObj or cursorId can be provided, but not both.
- invariant(static_cast<bool>(cmdObj) != static_cast<bool>(cursorId));
+AsyncResultsMerger::RemoteCursorData::RemoteCursorData(ShardId shardId, BSONObj cmdObj)
+ : shardId(std::move(shardId)), initialCmdObj(std::move(cmdObj)) {}
+
+AsyncResultsMerger::RemoteCursorData::RemoteCursorData(HostAndPort hostAndPort,
+ CursorId establishedCursorId)
+ : cursorId(establishedCursorId), _shardHostAndPort(std::move(hostAndPort)) {}
+
+const HostAndPort& AsyncResultsMerger::RemoteCursorData::getTargetHost() const {
+ invariant(_shardHostAndPort);
+ return *_shardHostAndPort;
}
bool AsyncResultsMerger::RemoteCursorData::hasNext() const {
@@ -566,6 +593,27 @@ bool AsyncResultsMerger::RemoteCursorData::exhausted() const {
return cursorId && (*cursorId == 0);
}
+Status AsyncResultsMerger::RemoteCursorData::resolveShardIdToHostAndPort(
+ OperationContext* txn, const ReadPreferenceSetting& readPref) {
+ invariant(shardId);
+ invariant(!cursorId);
+
+ const auto shard = grid.shardRegistry()->getShard(txn, *shardId);
+ if (!shard) {
+ return Status(ErrorCodes::ShardNotFound,
+ str::stream() << "Could not find shard " << *shardId);
+ }
+
+ auto findHostStatus = shard->getTargeter()->findHost(readPref);
+ if (!findHostStatus.isOK()) {
+ return findHostStatus.getStatus();
+ }
+
+ _shardHostAndPort = std::move(findHostStatus.getValue());
+
+ return Status::OK();
+}
+
//
// AsyncResultsMerger::MergingComparator
//
diff --git a/src/mongo/s/query/async_results_merger.h b/src/mongo/s/query/async_results_merger.h
index 4c6cc58f4d1..b51f57f0559 100644
--- a/src/mongo/s/query/async_results_merger.h
+++ b/src/mongo/s/query/async_results_merger.h
@@ -160,7 +160,23 @@ private:
* reported from the remote.
*/
struct RemoteCursorData {
- RemoteCursorData(const ClusterClientCursorParams::Remote& params);
+ /**
+ * Creates a new uninitialized remote cursor state, which will have to send a command in
+ * order to establish its cursor id. Must only be used if the remote cursor ids are not yet
+ * known.
+ */
+ RemoteCursorData(ShardId shardId, BSONObj cmdObj);
+
+ /**
+ * Instantiates a new initialized remote cursor, which has an established cursor id. It may
+ * only be used for getMore operations.
+ */
+ RemoteCursorData(HostAndPort hostAndPort, CursorId establishedCursorId);
+
+ /**
+ * Returns the resolved host and port on which the remote cursor resides.
+ */
+ const HostAndPort& getTargetHost() const;
/**
* Returns whether there is another buffered result available for this remote node.
@@ -173,13 +189,22 @@ private:
*/
bool exhausted() const;
- HostAndPort hostAndPort;
- boost::optional<ShardId> shardId;
+ /**
+ * Given the shard id with which the cursor was initialized and a read preference, selects
+ * a host on which the cursor should be created.
+ *
+ * May not be called once a cursor has already been established.
+ */
+ Status resolveShardIdToHostAndPort(OperationContext* txn,
+ const ReadPreferenceSetting& readPref);
+
+ // ShardId on which a cursor will be created.
+ const boost::optional<ShardId> shardId;
// The command object for sending to the remote to establish the cursor. If a remote cursor
// has not been established yet, this member will be set to a valid command object. If a
// remote cursor has already been established, this member will be unset.
- boost::optional<BSONObj> cmdObj;
+ boost::optional<BSONObj> initialCmdObj;
// The cursor id for the remote cursor. If a remote cursor has not been established yet,
// this member will be unset. If a remote cursor has been established and is not yet
@@ -194,6 +219,11 @@ private:
// Count of fetched docs during ARM processing of the current batch. Used to reduce the
// batchSize in getMore when mongod returned less docs than the requested batchSize.
long long fetchedCount = 0;
+
+ private:
+ // For a cursor, which has shard id associated contains the exact host on which the remote
+ // cursor resides.
+ boost::optional<HostAndPort> _shardHostAndPort;
};
class MergingComparator {
diff --git a/src/mongo/s/query/async_results_merger_test.cpp b/src/mongo/s/query/async_results_merger_test.cpp
index dd1d90dc636..291be1670f5 100644
--- a/src/mongo/s/query/async_results_merger_test.cpp
+++ b/src/mongo/s/query/async_results_merger_test.cpp
@@ -30,6 +30,8 @@
#include "mongo/s/query/async_results_merger.h"
+#include "mongo/client/remote_command_targeter_mock.h"
+#include "mongo/client/remote_command_targeter_factory_mock.h"
#include "mongo/db/json.h"
#include "mongo/db/query/cursor_response.h"
#include "mongo/db/query/getmore_request.h"
@@ -38,6 +40,9 @@
#include "mongo/executor/task_executor.h"
#include "mongo/executor/thread_pool_task_executor_test_fixture.h"
#include "mongo/rpc/metadata/server_selection_metadata.h"
+#include "mongo/s/catalog/type_shard.h"
+#include "mongo/s/client/shard_registry.h"
+#include "mongo/s/sharding_test_fixture.h"
#include "mongo/stdx/memory.h"
#include "mongo/unittest/unittest.h"
@@ -49,21 +54,44 @@ using executor::NetworkInterfaceMock;
using executor::RemoteCommandRequest;
using executor::RemoteCommandResponse;
-class AsyncResultsMergerTest : public executor::ThreadPoolExecutorTest {
+const HostAndPort kTestConfigShardHost = HostAndPort("FakeConfigHost", 12345);
+const std::vector<std::string> kTestShardIds = {"FakeShardId1", "FakeShardId2", "FakeShardId3"};
+const std::vector<HostAndPort> kTestShardHosts = {HostAndPort("FakeShardHost1", 12345),
+ HostAndPort("FakeShardHost2", 12345),
+ HostAndPort("FakeShardHost3", 12345)};
+
+class AsyncResultsMergerTest : public ShardingTestFixture {
public:
- AsyncResultsMergerTest()
- : _nss("testdb.testcoll"),
- _remotes({HostAndPort("localhost", -1),
- HostAndPort("localhost", -2),
- HostAndPort("localhost", -3)}) {}
-
- void setUp() final {
- ThreadPoolExecutorTest::setUp();
- launchExecutorThread();
- executor = &getExecutor();
- }
+ AsyncResultsMergerTest() : _nss("testdb.testcoll") {}
+
+ void setUp() override {
+ ShardingTestFixture::setUp();
+
+ executor = shardRegistry()->getExecutor();
+
+ getMessagingPort()->setRemote(HostAndPort("ClientHost", 12345));
+ configTargeter()->setFindHostReturnValue(kTestConfigShardHost);
+
+ std::vector<ShardType> shards;
+
+ for (size_t i = 0; i < kTestShardIds.size(); i++) {
+ ShardType shardType;
+ shardType.setName(kTestShardIds[i]);
+ shardType.setHost(kTestShardHosts[i].toString());
- void postExecutorThreadLaunch() final {}
+ shards.push_back(shardType);
+
+ std::unique_ptr<RemoteCommandTargeterMock> targeter(
+ stdx::make_unique<RemoteCommandTargeterMock>());
+ targeter->setConnectionStringReturnValue(ConnectionString(kTestConfigShardHost));
+ targeter->setFindHostReturnValue(kTestShardHosts[i]);
+
+ targeterFactory()->addTargeterToReturn(ConnectionString(kTestShardHosts[i]),
+ std::move(targeter));
+ }
+
+ setupShards(shards);
+ }
protected:
/**
@@ -73,30 +101,26 @@ protected:
* If 'batchSize' is set (i.e. not equal to boost::none), this batchSize is used for each
* getMore. If 'findCmd' has a batchSize, this is used just for the initial find operation.
*/
- void makeCursorFromFindCmd(const BSONObj& findCmd,
- const std::vector<HostAndPort>& remotes,
- boost::optional<long long> getMoreBatchSize = boost::none,
- bool isSecondaryOk = false) {
+ void makeCursorFromFindCmd(
+ const BSONObj& findCmd,
+ const std::vector<ShardId>& shardIds,
+ boost::optional<long long> getMoreBatchSize = boost::none,
+ ReadPreferenceSetting readPref = ReadPreferenceSetting(ReadPreference::PrimaryOnly)) {
const bool isExplain = true;
const auto lpq =
unittest::assertGet(LiteParsedQuery::makeFromFindCommand(_nss, findCmd, isExplain));
- params = ClusterClientCursorParams(_nss);
+ ClusterClientCursorParams params =
+ ClusterClientCursorParams(operationContext(), _nss, readPref);
params.sort = lpq->getSort();
params.limit = lpq->getLimit();
params.batchSize = getMoreBatchSize ? getMoreBatchSize : lpq->getBatchSize();
params.skip = lpq->getSkip();
params.isTailable = lpq->isTailable();
params.isAllowPartialResults = lpq->isAllowPartialResults();
- params.isSecondaryOk = isSecondaryOk;
-
- for (const auto& hostAndPort : remotes) {
- // Pass boost::none in place of a ShardId. If there is a ShardId and the ARM receives an
- // UnreachableHost error, it will attempt to look inside the shard registry in order to
- // find the Shard to which the host belongs and notify it of the unreachable host. Since
- // this text fixture is not passing down the shard registry (or an OperationContext),
- // we must skip this unreachable host handling.
- params.remotes.emplace_back(hostAndPort, boost::none, findCmd);
+
+ for (const auto& shardId : shardIds) {
+ params.remotes.emplace_back(shardId, findCmd);
}
arm = stdx::make_unique<AsyncResultsMerger>(executor, params);
@@ -108,7 +132,7 @@ protected:
*/
void makeCursorFromExistingCursors(
const std::vector<std::pair<HostAndPort, CursorId>>& remotes) {
- params = ClusterClientCursorParams(_nss);
+ ClusterClientCursorParams params = ClusterClientCursorParams(_nss);
for (const auto& hostIdPair : remotes) {
params.remotes.emplace_back(hostIdPair.first, hostIdPair.second);
@@ -133,7 +157,7 @@ protected:
* Schedules a list of raw BSON command responses to be returned by the mock network.
*/
void scheduleNetworkResponseObjs(std::vector<BSONObj> objs) {
- executor::NetworkInterfaceMock* net = getNet();
+ executor::NetworkInterfaceMock* net = network();
net->enterNetwork();
for (const auto& obj : objs) {
ASSERT_TRUE(net->hasReadyRequests());
@@ -147,7 +171,7 @@ protected:
}
RemoteCommandRequest getFirstPendingRequest() {
- executor::NetworkInterfaceMock* net = getNet();
+ executor::NetworkInterfaceMock* net = network();
net->enterNetwork();
ASSERT_TRUE(net->hasReadyRequests());
NetworkInterfaceMock::NetworkOperationIterator noi = net->getFrontOfUnscheduledQueue();
@@ -158,7 +182,7 @@ protected:
void scheduleErrorResponse(Status status) {
invariant(!status.isOK());
- executor::NetworkInterfaceMock* net = getNet();
+ executor::NetworkInterfaceMock* net = network();
net->enterNetwork();
ASSERT_TRUE(net->hasReadyRequests());
net->scheduleResponse(net->getNextReadyRequest(), net->now(), status);
@@ -167,7 +191,7 @@ protected:
}
void blackHoleNextRequest() {
- executor::NetworkInterfaceMock* net = getNet();
+ executor::NetworkInterfaceMock* net = network();
net->enterNetwork();
ASSERT_TRUE(net->hasReadyRequests());
net->blackHole(net->getNextReadyRequest());
@@ -175,17 +199,15 @@ protected:
}
const NamespaceString _nss;
- const std::vector<HostAndPort> _remotes;
executor::TaskExecutor* executor;
- ClusterClientCursorParams params;
std::unique_ptr<AsyncResultsMerger> arm;
};
TEST_F(AsyncResultsMergerTest, ClusterFind) {
BSONObj findCmd = fromjson("{find: 'testcoll'}");
- makeCursorFromFindCmd(findCmd, _remotes);
+ makeCursorFromFindCmd(findCmd, kTestShardIds);
ASSERT_FALSE(arm->ready());
auto readyEvent = unittest::assertGet(arm->nextEvent());
@@ -232,7 +254,7 @@ TEST_F(AsyncResultsMergerTest, ClusterFind) {
TEST_F(AsyncResultsMergerTest, ClusterFindAndGetMore) {
BSONObj findCmd = fromjson("{find: 'testcoll', batchSize: 2}");
- makeCursorFromFindCmd(findCmd, _remotes);
+ makeCursorFromFindCmd(findCmd, kTestShardIds);
ASSERT_FALSE(arm->ready());
auto readyEvent = unittest::assertGet(arm->nextEvent());
@@ -307,7 +329,7 @@ TEST_F(AsyncResultsMergerTest, ClusterFindAndGetMore) {
TEST_F(AsyncResultsMergerTest, ClusterFindSorted) {
BSONObj findCmd = fromjson("{find: 'testcoll', sort: {_id: 1}, batchSize: 2}");
- makeCursorFromFindCmd(findCmd, _remotes);
+ makeCursorFromFindCmd(findCmd, kTestShardIds);
ASSERT_FALSE(arm->ready());
auto readyEvent = unittest::assertGet(arm->nextEvent());
@@ -344,7 +366,7 @@ TEST_F(AsyncResultsMergerTest, ClusterFindSorted) {
TEST_F(AsyncResultsMergerTest, ClusterFindAndGetMoreSorted) {
BSONObj findCmd = fromjson("{find: 'testcoll', sort: {_id: 1}, batchSize: 2}");
- makeCursorFromFindCmd(findCmd, _remotes);
+ makeCursorFromFindCmd(findCmd, kTestShardIds);
ASSERT_FALSE(arm->ready());
auto readyEvent = unittest::assertGet(arm->nextEvent());
@@ -413,7 +435,7 @@ TEST_F(AsyncResultsMergerTest, ClusterFindAndGetMoreSorted) {
TEST_F(AsyncResultsMergerTest, ClusterFindCompoundSortKey) {
BSONObj findCmd = fromjson("{find: 'testcoll', sort: {a: -1, b: 1}, batchSize: 2}");
- makeCursorFromFindCmd(findCmd, _remotes);
+ makeCursorFromFindCmd(findCmd, kTestShardIds);
ASSERT_FALSE(arm->ready());
auto readyEvent = unittest::assertGet(arm->nextEvent());
@@ -450,7 +472,7 @@ TEST_F(AsyncResultsMergerTest, ClusterFindCompoundSortKey) {
TEST_F(AsyncResultsMergerTest, ClusterFindSortedButNoSortKey) {
BSONObj findCmd = fromjson("{find: 'testcoll', sort: {a: -1, b: 1}, batchSize: 2}");
- makeCursorFromFindCmd(findCmd, {_remotes[0]});
+ makeCursorFromFindCmd(findCmd, {kTestShardIds[0]});
ASSERT_FALSE(arm->ready());
auto readyEvent = unittest::assertGet(arm->nextEvent());
@@ -479,7 +501,7 @@ TEST_F(AsyncResultsMergerTest, ClusterFindInitialBatchSizeIsZero) {
// command is one.
BSONObj findCmd = fromjson("{find: 'testcoll', batchSize: 0}");
const long long getMoreBatchSize = 1LL;
- makeCursorFromFindCmd(findCmd, {_remotes[0], _remotes[1]}, getMoreBatchSize);
+ makeCursorFromFindCmd(findCmd, {kTestShardIds[0], kTestShardIds[1]}, getMoreBatchSize);
ASSERT_FALSE(arm->ready());
auto readyEvent = unittest::assertGet(arm->nextEvent());
@@ -530,7 +552,7 @@ TEST_F(AsyncResultsMergerTest, ClusterFindInitialBatchSizeIsZero) {
}
TEST_F(AsyncResultsMergerTest, ExistingCursors) {
- makeCursorFromExistingCursors({{_remotes[0], 5}, {_remotes[1], 6}});
+ makeCursorFromExistingCursors({{kTestShardHosts[0], 5}, {kTestShardHosts[1], 6}});
ASSERT_FALSE(arm->ready());
auto readyEvent = unittest::assertGet(arm->nextEvent());
@@ -560,7 +582,7 @@ TEST_F(AsyncResultsMergerTest, ExistingCursors) {
TEST_F(AsyncResultsMergerTest, StreamResultsFromOneShardIfOtherDoesntRespond) {
BSONObj findCmd = fromjson("{find: 'testcoll', batchSize: 2}");
- makeCursorFromFindCmd(findCmd, {_remotes[0], _remotes[1]});
+ makeCursorFromFindCmd(findCmd, {kTestShardHosts[0].toString(), kTestShardHosts[1].toString()});
ASSERT_FALSE(arm->ready());
auto readyEvent = unittest::assertGet(arm->nextEvent());
@@ -627,7 +649,7 @@ TEST_F(AsyncResultsMergerTest, StreamResultsFromOneShardIfOtherDoesntRespond) {
TEST_F(AsyncResultsMergerTest, ErrorOnMismatchedCursorIds) {
BSONObj findCmd = fromjson("{find: 'testcoll'}");
- makeCursorFromFindCmd(findCmd, {_remotes[0]});
+ makeCursorFromFindCmd(findCmd, {kTestShardIds[0]});
ASSERT_FALSE(arm->ready());
auto readyEvent = unittest::assertGet(arm->nextEvent());
@@ -668,7 +690,7 @@ TEST_F(AsyncResultsMergerTest, ErrorOnMismatchedCursorIds) {
TEST_F(AsyncResultsMergerTest, BadResponseReceivedFromShard) {
BSONObj findCmd = fromjson("{find: 'testcoll'}");
- makeCursorFromFindCmd(findCmd, _remotes);
+ makeCursorFromFindCmd(findCmd, kTestShardIds);
ASSERT_FALSE(arm->ready());
auto readyEvent = unittest::assertGet(arm->nextEvent());
@@ -695,7 +717,7 @@ TEST_F(AsyncResultsMergerTest, BadResponseReceivedFromShard) {
TEST_F(AsyncResultsMergerTest, ErrorReceivedFromShard) {
BSONObj findCmd = fromjson("{find: 'testcoll'}");
- makeCursorFromFindCmd(findCmd, _remotes);
+ makeCursorFromFindCmd(findCmd, kTestShardIds);
ASSERT_FALSE(arm->ready());
auto readyEvent = unittest::assertGet(arm->nextEvent());
@@ -724,7 +746,7 @@ TEST_F(AsyncResultsMergerTest, ErrorReceivedFromShard) {
TEST_F(AsyncResultsMergerTest, ErrorCantScheduleEventBeforeLastSignaled) {
BSONObj findCmd = fromjson("{find: 'testcoll'}");
- makeCursorFromFindCmd(findCmd, {_remotes[0]});
+ makeCursorFromFindCmd(findCmd, {kTestShardIds[0]});
ASSERT_FALSE(arm->ready());
auto readyEvent = unittest::assertGet(arm->nextEvent());
@@ -752,7 +774,7 @@ TEST_F(AsyncResultsMergerTest, ErrorCantScheduleEventBeforeLastSignaled) {
TEST_F(AsyncResultsMergerTest, NextEventAfterTaskExecutorShutdown) {
BSONObj findCmd = fromjson("{find: 'testcoll'}");
- makeCursorFromFindCmd(findCmd, _remotes);
+ makeCursorFromFindCmd(findCmd, kTestShardIds);
executor->shutdown();
ASSERT_NOT_OK(arm->nextEvent().getStatus());
auto killEvent = arm->kill();
@@ -761,7 +783,7 @@ TEST_F(AsyncResultsMergerTest, NextEventAfterTaskExecutorShutdown) {
TEST_F(AsyncResultsMergerTest, KillAfterTaskExecutorShutdownWithOutstandingBatches) {
BSONObj findCmd = fromjson("{find: 'testcoll'}");
- makeCursorFromFindCmd(findCmd, {_remotes[0]});
+ makeCursorFromFindCmd(findCmd, {kTestShardIds[0]});
// Make a request to the shard that will never get answered.
ASSERT_FALSE(arm->ready());
@@ -777,7 +799,7 @@ TEST_F(AsyncResultsMergerTest, KillAfterTaskExecutorShutdownWithOutstandingBatch
TEST_F(AsyncResultsMergerTest, KillNoBatchesRequested) {
BSONObj findCmd = fromjson("{find: 'testcoll'}");
- makeCursorFromFindCmd(findCmd, _remotes);
+ makeCursorFromFindCmd(findCmd, kTestShardIds);
ASSERT_FALSE(arm->ready());
auto killedEvent = arm->kill();
@@ -792,7 +814,7 @@ TEST_F(AsyncResultsMergerTest, KillNoBatchesRequested) {
TEST_F(AsyncResultsMergerTest, KillAllBatchesReceived) {
BSONObj findCmd = fromjson("{find: 'testcoll', batchSize: 2}");
- makeCursorFromFindCmd(findCmd, _remotes);
+ makeCursorFromFindCmd(findCmd, kTestShardIds);
ASSERT_FALSE(arm->ready());
auto readyEvent = unittest::assertGet(arm->nextEvent());
@@ -816,7 +838,7 @@ TEST_F(AsyncResultsMergerTest, KillAllBatchesReceived) {
TEST_F(AsyncResultsMergerTest, KillTwoOutstandingBatches) {
BSONObj findCmd = fromjson("{find: 'testcoll', batchSize: 2}");
- makeCursorFromFindCmd(findCmd, _remotes);
+ makeCursorFromFindCmd(findCmd, kTestShardIds);
ASSERT_FALSE(arm->ready());
auto readyEvent = unittest::assertGet(arm->nextEvent());
@@ -855,7 +877,7 @@ TEST_F(AsyncResultsMergerTest, KillTwoOutstandingBatches) {
TEST_F(AsyncResultsMergerTest, KillOutstandingGetMore) {
BSONObj findCmd = fromjson("{find: 'testcoll', batchSize: 2}");
- makeCursorFromFindCmd(findCmd, {_remotes[0]});
+ makeCursorFromFindCmd(findCmd, {kTestShardIds[0]});
ASSERT_FALSE(arm->ready());
auto readyEvent = unittest::assertGet(arm->nextEvent());
@@ -900,7 +922,7 @@ TEST_F(AsyncResultsMergerTest, KillOutstandingGetMore) {
TEST_F(AsyncResultsMergerTest, NextEventErrorsAfterKill) {
BSONObj findCmd = fromjson("{find: 'testcoll', batchSize: 2}");
- makeCursorFromFindCmd(findCmd, {_remotes[0]});
+ makeCursorFromFindCmd(findCmd, {kTestShardIds[0]});
ASSERT_FALSE(arm->ready());
auto readyEvent = unittest::assertGet(arm->nextEvent());
@@ -921,7 +943,7 @@ TEST_F(AsyncResultsMergerTest, NextEventErrorsAfterKill) {
TEST_F(AsyncResultsMergerTest, KillCalledTwice) {
BSONObj findCmd = fromjson("{find: 'testcoll', batchSize: 2}");
- makeCursorFromFindCmd(findCmd, {_remotes[0]});
+ makeCursorFromFindCmd(findCmd, {kTestShardIds[0]});
auto killedEvent1 = arm->kill();
ASSERT(killedEvent1.isValid());
auto killedEvent2 = arm->kill();
@@ -932,7 +954,7 @@ TEST_F(AsyncResultsMergerTest, KillCalledTwice) {
TEST_F(AsyncResultsMergerTest, TailableBasic) {
BSONObj findCmd = fromjson("{find: 'testcoll', tailable: true}");
- makeCursorFromFindCmd(findCmd, {_remotes[0]});
+ makeCursorFromFindCmd(findCmd, {kTestShardIds[0]});
ASSERT_FALSE(arm->ready());
auto readyEvent = unittest::assertGet(arm->nextEvent());
@@ -977,7 +999,7 @@ TEST_F(AsyncResultsMergerTest, TailableBasic) {
TEST_F(AsyncResultsMergerTest, TailableEmptyBatch) {
BSONObj findCmd = fromjson("{find: 'testcoll', tailable: true}");
- makeCursorFromFindCmd(findCmd, {_remotes[0]});
+ makeCursorFromFindCmd(findCmd, {kTestShardIds[0]});
ASSERT_FALSE(arm->ready());
auto readyEvent = unittest::assertGet(arm->nextEvent());
@@ -1002,7 +1024,7 @@ TEST_F(AsyncResultsMergerTest, TailableEmptyBatch) {
TEST_F(AsyncResultsMergerTest, TailableExhaustedCursor) {
BSONObj findCmd = fromjson("{find: 'testcoll', tailable: true}");
- makeCursorFromFindCmd(findCmd, {_remotes[0]});
+ makeCursorFromFindCmd(findCmd, {kTestShardIds[0]});
ASSERT_FALSE(arm->ready());
auto readyEvent = unittest::assertGet(arm->nextEvent());
@@ -1024,7 +1046,7 @@ TEST_F(AsyncResultsMergerTest, TailableExhaustedCursor) {
TEST_F(AsyncResultsMergerTest, GetMoreBatchSizes) {
BSONObj findCmd = fromjson("{find: 'testcoll', batchSize: 3}");
- makeCursorFromFindCmd(findCmd, {_remotes[0]});
+ makeCursorFromFindCmd(findCmd, {kTestShardIds[0]});
ASSERT_FALSE(arm->ready());
auto readyEvent = unittest::assertGet(arm->nextEvent());
@@ -1065,8 +1087,8 @@ TEST_F(AsyncResultsMergerTest, GetMoreBatchSizes) {
TEST_F(AsyncResultsMergerTest, SendsSecondaryOkAsMetadata) {
BSONObj findCmd = fromjson("{find: 'testcoll', batchSize: 2}");
- const bool isSecondaryOk = true;
- makeCursorFromFindCmd(findCmd, {_remotes[0]}, boost::none, isSecondaryOk);
+ makeCursorFromFindCmd(
+ findCmd, {kTestShardIds[0]}, boost::none, ReadPreferenceSetting(ReadPreference::Nearest));
ASSERT_FALSE(arm->ready());
auto readyEvent = unittest::assertGet(arm->nextEvent());
@@ -1089,7 +1111,7 @@ TEST_F(AsyncResultsMergerTest, SendsSecondaryOkAsMetadata) {
TEST_F(AsyncResultsMergerTest, AllowPartialResults) {
BSONObj findCmd = fromjson("{find: 'testcoll', allowPartialResults: true}");
- makeCursorFromFindCmd(findCmd, _remotes);
+ makeCursorFromFindCmd(findCmd, kTestShardIds);
ASSERT_FALSE(arm->ready());
auto readyEvent = unittest::assertGet(arm->nextEvent());
@@ -1149,7 +1171,7 @@ TEST_F(AsyncResultsMergerTest, AllowPartialResults) {
TEST_F(AsyncResultsMergerTest, AllowPartialResultsSingleNode) {
BSONObj findCmd = fromjson("{find: 'testcoll', allowPartialResults: true}");
- makeCursorFromFindCmd(findCmd, {_remotes[0]});
+ makeCursorFromFindCmd(findCmd, {kTestShardIds[0]});
ASSERT_FALSE(arm->ready());
auto readyEvent = unittest::assertGet(arm->nextEvent());
diff --git a/src/mongo/s/query/cluster_client_cursor_params.h b/src/mongo/s/query/cluster_client_cursor_params.h
index 3c73d47ba9b..0e497fd105a 100644
--- a/src/mongo/s/query/cluster_client_cursor_params.h
+++ b/src/mongo/s/query/cluster_client_cursor_params.h
@@ -33,14 +33,16 @@
#include <vector>
#include "mongo/bson/bsonobj.h"
+#include "mongo/client/read_preference.h"
#include "mongo/db/cursor_id.h"
#include "mongo/db/namespace_string.h"
#include "mongo/s/client/shard.h"
-#include "mongo/s/client/shard_registry.h"
#include "mongo/util/net/hostandport.h"
namespace mongo {
+class OperationContext;
+
struct ClusterClientCursorParams {
// When mongos has to do a merge in order to return results to the client in the correct sort
// order, it requests a sortKey meta-projection using this field name.
@@ -53,10 +55,7 @@ struct ClusterClientCursorParams {
/**
* Use when a new cursor should be created on the remote.
*/
- Remote(HostAndPort hostAndPort, boost::optional<ShardId> sid, BSONObj cmdObj)
- : hostAndPort(std::move(hostAndPort)),
- shardId(std::move(sid)),
- cmdObj(std::move(cmdObj)) {}
+ Remote(ShardId sid, BSONObj cmdObj) : shardId(std::move(sid)), cmdObj(std::move(cmdObj)) {}
/**
* Use when an a cursor already exists on the remote. The resulting CCC will take ownership
@@ -69,16 +68,17 @@ struct ClusterClientCursorParams {
Remote(HostAndPort hostAndPort, CursorId cursorId)
: hostAndPort(std::move(hostAndPort)), cursorId(cursorId) {}
- // How the networking layer should contact this remote.
- HostAndPort hostAndPort;
-
- // The id of the shard to which this remote belongs. If the cursor was already established
- // on the remote when the CCC was established, 'shardId' is boost::none. (Since a cursor has
- // already been successfully created on a particular host in this case, there is no need to
- // know or care to which shard this host belongs. No re-targeting of a different host within
- // the shard will take place.)
+ // If this is a regular query cursor, this value will be set and shard id retargeting may
+ // occur on certain networking or replication errors.
+ //
+ // If this is an externally-prepared cursor (as is in the case of aggregation cursors),
+ // this value will never be set and no retargeting will occur.
boost::optional<ShardId> shardId;
+ // If this is an externally-specified cursor (e.g. aggregation), this value will be set and
+ // used directly and no re-targeting may happen on errors.
+ boost::optional<HostAndPort> hostAndPort;
+
// The raw command parameters to send to this remote (e.g. the find command specification).
//
// Exactly one of 'cmdObj' or 'cursorId' must be set.
@@ -90,16 +90,25 @@ struct ClusterClientCursorParams {
boost::optional<CursorId> cursorId;
};
- ClusterClientCursorParams() {}
+ /**
+ * Constructor used for cases where initial shard host targeting is necessary (i.e., we don't
+ * know yet the remote cursor id).
+ */
+ ClusterClientCursorParams(OperationContext* opCtx,
+ NamespaceString nss,
+ ReadPreferenceSetting readPref)
+ : txn(opCtx), nsString(std::move(nss)), readPreference(std::move(readPref)) {}
+ /**
+ * Constructor used for cases, where the remote cursor ids are already known and no resolution
+ * or retargeting needs to happen.
+ */
ClusterClientCursorParams(NamespaceString nss) : nsString(std::move(nss)) {}
// Not owned.
OperationContext* txn = nullptr;
- // Unowned pointer to the global registry of shards.
- ShardRegistry* shardRegistry = nullptr;
-
+ // Namespace against which to query.
NamespaceString nsString;
// Per-remote node data.
@@ -123,9 +132,9 @@ struct ClusterClientCursorParams {
// Whether this cursor is tailing a capped collection.
bool isTailable = false;
- // Whether any of the remote nodes might be secondaries due to a read preference mode other
- // than "primary".
- bool isSecondaryOk = false;
+ // Read preference for where to target the query. This value is only set if initial shard host
+ // targeting is necessary and not used if using externally prepared cursor ids.
+ boost::optional<ReadPreferenceSetting> readPreference;
// Whether the client indicated that it is willing to receive partial results in the case of an
// unreachable host.
diff --git a/src/mongo/s/query/cluster_find.cpp b/src/mongo/s/query/cluster_find.cpp
index ebc04d3103a..e1f36f7a9a7 100644
--- a/src/mongo/s/query/cluster_find.cpp
+++ b/src/mongo/s/query/cluster_find.cpp
@@ -39,7 +39,6 @@
#include "mongo/bson/util/bson_extract.h"
#include "mongo/client/connpool.h"
#include "mongo/client/read_preference.h"
-#include "mongo/client/remote_command_targeter.h"
#include "mongo/db/commands.h"
#include "mongo/db/query/canonical_query.h"
#include "mongo/db/query/find_common.h"
@@ -215,14 +214,12 @@ StatusWith<CursorId> runQueryWithoutRetrying(OperationContext* txn,
}
}
- ClusterClientCursorParams params(query.nss());
+ ClusterClientCursorParams params(txn, query.nss(), readPref);
params.txn = txn;
- params.shardRegistry = shardRegistry;
params.limit = query.getParsed().getLimit();
params.batchSize = query.getParsed().getEffectiveBatchSize();
params.skip = query.getParsed().getSkip();
params.isTailable = query.getParsed().isTailable();
- params.isSecondaryOk = (readPref.pref != ReadPreference::PrimaryOnly);
params.isAllowPartialResults = query.getParsed().isAllowPartialResults();
// This is the batchSize passed to each subsequent getMore command issued by the cursor. We
@@ -254,12 +251,6 @@ StatusWith<CursorId> runQueryWithoutRetrying(OperationContext* txn,
return runConfigServerQuerySCCC(query, *shard, results);
}
- auto targeter = shard->getTargeter();
- auto hostAndPort = targeter->findHost(readPref);
- if (!hostAndPort.isOK()) {
- return hostAndPort.getStatus();
- }
-
// Build the find command, and attach shard version if necessary.
BSONObjBuilder cmdBuilder;
lpqToForward->asFindCommand(&cmdBuilder);
@@ -272,8 +263,7 @@ StatusWith<CursorId> runQueryWithoutRetrying(OperationContext* txn,
version.appendForCommands(&cmdBuilder);
}
- params.remotes.emplace_back(
- std::move(hostAndPort.getValue()), shard->getId(), cmdBuilder.obj());
+ params.remotes.emplace_back(shard->getId(), cmdBuilder.obj());
}
auto ccc =