diff options
Diffstat (limited to 'src/mongo')
66 files changed, 854 insertions, 1376 deletions
diff --git a/src/mongo/client/dbclientcursor.cpp b/src/mongo/client/dbclientcursor.cpp index 23a9b12a792..399d6cb9143 100644 --- a/src/mongo/client/dbclientcursor.cpp +++ b/src/mongo/client/dbclientcursor.cpp @@ -137,7 +137,7 @@ Message DBClientCursor::_assembleInit() { Message DBClientCursor::_assembleGetMore() { invariant(cursorId); if (_useFindCommand) { - std::int64_t batchSize = nextBatchSize(); + long long batchSize = nextBatchSize(); auto gmr = GetMoreRequest(ns, cursorId, boost::make_optional(batchSize != 0, batchSize), diff --git a/src/mongo/db/commands/run_aggregate.cpp b/src/mongo/db/commands/run_aggregate.cpp index 4bc9e9b35cc..ac8a6fbf2f2 100644 --- a/src/mongo/db/commands/run_aggregate.cpp +++ b/src/mongo/db/commands/run_aggregate.cpp @@ -519,7 +519,7 @@ Status runAggregate(OperationContext* opCtx, AuthorizationSession::get(opCtx->getClient())->getAuthenticatedUserNames(), opCtx->recoveryUnit()->getReadConcernLevel(), cmdObj); - if (expCtx->tailableMode == TailableModeEnum::kTailableAndAwaitData) { + if (expCtx->tailableMode == TailableMode::kTailableAndAwaitData) { cursorParams.setTailable(true); cursorParams.setAwaitData(true); } diff --git a/src/mongo/db/matcher/expression_optimize_test.cpp b/src/mongo/db/matcher/expression_optimize_test.cpp index e2f82257a67..9091ad48e93 100644 --- a/src/mongo/db/matcher/expression_optimize_test.cpp +++ b/src/mongo/db/matcher/expression_optimize_test.cpp @@ -133,7 +133,7 @@ TEST(ExpressionOptimizeTest, IsValidText) { TEST(ExpressionOptimizeTest, IsValidTextTailable) { // Filter inside QueryRequest is not used. auto qr = stdx::make_unique<QueryRequest>(nss); - qr->setTailableMode(TailableModeEnum::kTailable); + qr->setTailableMode(TailableMode::kTailable); ASSERT_OK(qr->validate()); // Invalid: TEXT and tailable. diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript index 88c56ab45b9..3d4481ebbc4 100644 --- a/src/mongo/db/pipeline/SConscript +++ b/src/mongo/db/pipeline/SConscript @@ -301,7 +301,6 @@ pipelineeEnv.Library( 'document_source_skip.cpp', 'document_source_sort.cpp', 'document_source_sort_by_count.cpp', - "cluster_aggregation_planner.cpp", 'document_source_tee_consumer.cpp', 'document_source_unwind.cpp', 'mongo_process_common.cpp', diff --git a/src/mongo/db/pipeline/cluster_aggregation_planner.cpp b/src/mongo/db/pipeline/cluster_aggregation_planner.cpp deleted file mode 100644 index 3653b5b2638..00000000000 --- a/src/mongo/db/pipeline/cluster_aggregation_planner.cpp +++ /dev/null @@ -1,182 +0,0 @@ -/** - * Copyright (C) 2018 MongoDB Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects - * for all of the code used other than as permitted herein. If you modify - * file(s) with this exception, you may extend this exception to your - * version of the file(s), but you are not obligated to do so. If you do not - * wish to do so, delete this exception statement from your version. If you - * delete this exception statement from all source files in the program, - * then also delete it in the license file. - */ - -#include "mongo/platform/basic.h" - -#include "mongo/db/pipeline/cluster_aggregation_planner.h" - -#include "mongo/db/pipeline/document_source_match.h" -#include "mongo/db/pipeline/document_source_merge_cursors.h" -#include "mongo/db/pipeline/document_source_project.h" -#include "mongo/db/pipeline/document_source_sort.h" -#include "mongo/db/pipeline/document_source_unwind.h" - -namespace mongo { -namespace cluster_aggregation_planner { - -namespace { -/** - * Moves everything before a splittable stage to the shards. If there are no splittable stages, - * moves everything to the shards. - * - * It is not safe to call this optimization multiple times. - * - * NOTE: looks for SplittableDocumentSources and uses that API - */ -void findSplitPoint(Pipeline* shardPipe, Pipeline* mergePipe) { - while (!mergePipe->getSources().empty()) { - boost::intrusive_ptr<DocumentSource> current = mergePipe->popFront(); - - // Check if this source is splittable. - SplittableDocumentSource* splittable = - dynamic_cast<SplittableDocumentSource*>(current.get()); - - if (!splittable) { - // Move the source from the merger _sources to the shard _sources. - shardPipe->pushBack(current); - } else { - // Split this source into 'merge' and 'shard' _sources. - boost::intrusive_ptr<DocumentSource> shardSource = splittable->getShardSource(); - auto mergeSources = splittable->getMergeSources(); - - // A source may not simultaneously be present on both sides of the split. - invariant(std::find(mergeSources.begin(), mergeSources.end(), shardSource) == - mergeSources.end()); - - if (shardSource) - shardPipe->pushBack(shardSource); - - // Add the stages in reverse order, so that they appear in the pipeline in the same - // order as they were returned by the stage. - for (auto it = mergeSources.rbegin(); it != mergeSources.rend(); ++it) { - mergePipe->addInitialSource(*it); - } - - break; - } - } -} - -/** - * If the final stage on shards is to unwind an array, move that stage to the merger. This cuts down - * on network traffic and allows us to take advantage of reduced copying in unwind. - */ -void moveFinalUnwindFromShardsToMerger(Pipeline* shardPipe, Pipeline* mergePipe) { - while (!shardPipe->getSources().empty() && - dynamic_cast<DocumentSourceUnwind*>(shardPipe->getSources().back().get())) { - mergePipe->addInitialSource(shardPipe->popBack()); - } -} - -/** - * Adds a stage to the end of 'shardPipe' explicitly requesting all fields that 'mergePipe' needs. - * This is only done if it heuristically determines that it is needed. This optimization can reduce - * the amount of network traffic and can also enable the shards to convert less source BSON into - * Documents. - */ -void limitFieldsSentFromShardsToMerger(Pipeline* shardPipe, Pipeline* mergePipe) { - auto depsMetadata = DocumentSourceMatch::isTextQuery(shardPipe->getInitialQuery()) - ? DepsTracker::MetadataAvailable::kTextScore - : DepsTracker::MetadataAvailable::kNoMetadata; - DepsTracker mergeDeps(mergePipe->getDependencies(depsMetadata)); - if (mergeDeps.needWholeDocument) - return; // the merge needs all fields, so nothing we can do. - - // Empty project is "special" so if no fields are needed, we just ask for _id instead. - if (mergeDeps.fields.empty()) - mergeDeps.fields.insert("_id"); - - // Remove metadata from dependencies since it automatically flows through projection and we - // don't want to project it in to the document. - mergeDeps.setNeedTextScore(false); - - // HEURISTIC: only apply optimization if none of the shard stages have an exhaustive list of - // field dependencies. While this may not be 100% ideal in all cases, it is simple and - // avoids the worst cases by ensuring that: - // 1) Optimization IS applied when the shards wouldn't have known their exhaustive list of - // dependencies. This situation can happen when a $sort is before the first $project or - // $group. Without the optimization, the shards would have to reify and transmit full - // objects even though only a subset of fields are needed. - // 2) Optimization IS NOT applied immediately following a $project or $group since it would - // add an unnecessary project (and therefore a deep-copy). - for (auto&& source : shardPipe->getSources()) { - DepsTracker dt(depsMetadata); - if (source->getDependencies(&dt) & DocumentSource::EXHAUSTIVE_FIELDS) - return; - } - // if we get here, add the project. - boost::intrusive_ptr<DocumentSource> project = DocumentSourceProject::createFromBson( - BSON("$project" << mergeDeps.toProjection()).firstElement(), shardPipe->getContext()); - shardPipe->pushBack(project); -} -} // namespace - -void performSplitPipelineOptimizations(Pipeline* shardPipeline, Pipeline* mergingPipeline) { - // The order in which optimizations are applied can have significant impact on the - // efficiency of the final pipeline. Be Careful! - findSplitPoint(shardPipeline, mergingPipeline); - moveFinalUnwindFromShardsToMerger(shardPipeline, mergingPipeline); - limitFieldsSentFromShardsToMerger(shardPipeline, mergingPipeline); -} - -boost::optional<BSONObj> popLeadingMergeSort(Pipeline* pipeline) { - // Remove a leading $sort iff it is a mergesort, since the ARM cannot handle blocking $sort. - auto frontSort = pipeline->popFrontWithNameAndCriteria( - DocumentSourceSort::kStageName, [](const DocumentSource* const source) { - return static_cast<const DocumentSourceSort* const>(source)->mergingPresorted(); - }); - - if (frontSort) { - auto sortStage = static_cast<DocumentSourceSort*>(frontSort.get()); - if (auto sortLimit = sortStage->getLimitSrc()) { - // There was a limit stage absorbed into the sort stage, so we need to preserve that. - pipeline->addInitialSource(sortLimit); - } - return sortStage - ->sortKeyPattern(DocumentSourceSort::SortKeySerialization::kForSortKeyMerging) - .toBson(); - } - return boost::none; -} - -void addMergeCursorsSource(Pipeline* mergePipeline, - std::vector<RemoteCursor> remoteCursors, - executor::TaskExecutor* executor) { - AsyncResultsMergerParams armParams; - if (auto sort = popLeadingMergeSort(mergePipeline)) { - armParams.setSort(std::move(*sort)); - } - armParams.setRemotes(std::move(remoteCursors)); - armParams.setTailableMode(mergePipeline->getContext()->tailableMode); - armParams.setNss(mergePipeline->getContext()->ns); - mergePipeline->addInitialSource(DocumentSourceMergeCursors::create( - executor, std::move(armParams), mergePipeline->getContext())); -} - -} // namespace cluster_aggregation_planner -} // namespace mongo diff --git a/src/mongo/db/pipeline/cluster_aggregation_planner.h b/src/mongo/db/pipeline/cluster_aggregation_planner.h deleted file mode 100644 index f62f158f6ca..00000000000 --- a/src/mongo/db/pipeline/cluster_aggregation_planner.h +++ /dev/null @@ -1,58 +0,0 @@ -/** - * Copyright (C) 2018 MongoDB Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects - * for all of the code used other than as permitted herein. If you modify - * file(s) with this exception, you may extend this exception to your - * version of the file(s), but you are not obligated to do so. If you do not - * wish to do so, delete this exception statement from your version. If you - * delete this exception statement from all source files in the program, - * then also delete it in the license file. - */ - -#pragma once - -#include "mongo/db/pipeline/pipeline.h" - -namespace mongo { -namespace cluster_aggregation_planner { - -/** - * Performs optimizations with the aim of reducing computing time and network traffic when a - * pipeline has been split into two pieces. Modifies 'shardPipeline' and 'mergingPipeline' such that - * they may contain different stages, but still compute the same results when executed. - */ -void performSplitPipelineOptimizations(Pipeline* shardPipeline, Pipeline* mergingPipeline); - -/** - * Rips off an initial $sort stage that can be handled by cursor merging machinery. Returns the - * sort key pattern of such a $sort stage if there was one, and boost::none otherwise. - */ -boost::optional<BSONObj> popLeadingMergeSort(Pipeline* mergePipeline); - -/** - * Creates a new DocumentSourceMergeCursors from the provided 'remoteCursors' and adds it to the - * front of 'mergePipeline'. - */ -void addMergeCursorsSource(Pipeline* mergePipeline, - std::vector<RemoteCursor> remoteCursors, - executor::TaskExecutor*); - -} // namespace cluster_aggregation_planner -} // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_change_stream.cpp b/src/mongo/db/pipeline/document_source_change_stream.cpp index b227afeac5f..df7b31b330e 100644 --- a/src/mongo/db/pipeline/document_source_change_stream.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream.cpp @@ -395,7 +395,7 @@ void parseResumeOptions(const intrusive_ptr<ExpressionContext>& expCtx, list<intrusive_ptr<DocumentSource>> DocumentSourceChangeStream::createFromBson( BSONElement elem, const intrusive_ptr<ExpressionContext>& expCtx) { // A change stream is a tailable + awaitData cursor. - expCtx->tailableMode = TailableModeEnum::kTailableAndAwaitData; + expCtx->tailableMode = TailableMode::kTailableAndAwaitData; // Prevent $changeStream from running on an entire database (or cluster-wide) unless we are in // test mode. diff --git a/src/mongo/db/pipeline/document_source_lookup_test.cpp b/src/mongo/db/pipeline/document_source_lookup_test.cpp index 32efd87cdda..dfe4ece48ac 100644 --- a/src/mongo/db/pipeline/document_source_lookup_test.cpp +++ b/src/mongo/db/pipeline/document_source_lookup_test.cpp @@ -520,8 +520,9 @@ public: Status attachCursorSourceToPipeline(const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* pipeline) final { while (_removeLeadingQueryStages && !pipeline->getSources().empty()) { - if (pipeline->popFrontWithName("$match") || pipeline->popFrontWithName("$sort") || - pipeline->popFrontWithName("$project")) { + if (pipeline->popFrontWithCriteria("$match") || + pipeline->popFrontWithCriteria("$sort") || + pipeline->popFrontWithCriteria("$project")) { continue; } break; diff --git a/src/mongo/db/pipeline/document_source_merge_cursors.cpp b/src/mongo/db/pipeline/document_source_merge_cursors.cpp index 3e34fd366a0..06a314e8c2e 100644 --- a/src/mongo/db/pipeline/document_source_merge_cursors.cpp +++ b/src/mongo/db/pipeline/document_source_merge_cursors.cpp @@ -43,20 +43,15 @@ constexpr StringData DocumentSourceMergeCursors::kStageName; DocumentSourceMergeCursors::DocumentSourceMergeCursors( executor::TaskExecutor* executor, - AsyncResultsMergerParams armParams, - const boost::intrusive_ptr<ExpressionContext>& expCtx, - boost::optional<BSONObj> ownedParamsSpec) - : DocumentSource(expCtx), - _armParamsObj(std::move(ownedParamsSpec)), - _executor(executor), - _armParams(std::move(armParams)) {} + std::unique_ptr<ClusterClientCursorParams> cursorParams, + const boost::intrusive_ptr<ExpressionContext>& expCtx) + : DocumentSource(expCtx), _executor(executor), _armParams(std::move(cursorParams)) {} DocumentSource::GetNextResult DocumentSourceMergeCursors::getNext() { // We don't expect or support tailable cursors to be executing through this stage. - invariant(pExpCtx->tailableMode == TailableModeEnum::kNormal); + invariant(pExpCtx->tailableMode == TailableMode::kNormal); if (!_arm) { - _arm.emplace(pExpCtx->opCtx, _executor, std::move(*_armParams)); - _armParams = boost::none; + _arm.emplace(pExpCtx->opCtx, _executor, _armParams.get()); } auto next = uassertStatusOK(_arm->blockingNext()); if (next.isEOF()) { @@ -65,19 +60,24 @@ DocumentSource::GetNextResult DocumentSourceMergeCursors::getNext() { return Document::fromBsonWithMetaData(*next.getResult()); } -Value DocumentSourceMergeCursors::serialize( - boost::optional<ExplainOptions::Verbosity> explain) const { +void DocumentSourceMergeCursors::serializeToArray( + std::vector<Value>& array, boost::optional<ExplainOptions::Verbosity> explain) const { invariant(!_arm); invariant(_armParams); - return Value(Document{{kStageName, _armParams->toBSON()}}); + std::vector<Value> cursors; + for (auto&& remote : _armParams->remotes) { + cursors.emplace_back(Document{{"host", remote.hostAndPort.toString()}, + {"ns", remote.cursorResponse.getNSS().toString()}, + {"id", remote.cursorResponse.getCursorId()}}); + } + array.push_back(Value(Document{{kStageName, Value(std::move(cursors))}})); + if (!_armParams->sort.isEmpty()) { + array.push_back(Value(Document{{DocumentSourceSort::kStageName, Value(_armParams->sort)}})); + } } Pipeline::SourceContainer::iterator DocumentSourceMergeCursors::doOptimizeAt( Pipeline::SourceContainer::iterator itr, Pipeline::SourceContainer* container) { - // TODO SERVER-34009 3.6 and earlier mongos processes serialize $mergeCursors and $sort with - // $mergingPresorted as separate stages. This code is responsible for coalescing these two - // stages. New mongos processes serialize $mergeCursors with an absorbed sort as one stage. Once - // we no longer have to support the old format, this code can be deleted. invariant(*itr == this); invariant(!_arm); invariant(_armParams); @@ -92,9 +92,9 @@ Pipeline::SourceContainer::iterator DocumentSourceMergeCursors::doOptimizeAt( return next; } - _armParams->setSort( + _armParams->sort = nextSort->sortKeyPattern(DocumentSourceSort::SortKeySerialization::kForSortKeyMerging) - .toBson()); + .toBson(); if (auto sortLimit = nextSort->getLimitSrc()) { // There was a limit stage absorbed into the sort stage, so we need to preserve that. container->insert(std::next(next), sortLimit); @@ -105,31 +105,15 @@ Pipeline::SourceContainer::iterator DocumentSourceMergeCursors::doOptimizeAt( boost::intrusive_ptr<DocumentSource> DocumentSourceMergeCursors::createFromBson( BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& expCtx) { - if (elem.type() == BSONType::Object) { - // This is the modern serialization format. We de-serialize using the IDL. - auto ownedObj = elem.embeddedObject().getOwned(); - auto armParams = - AsyncResultsMergerParams::parse(IDLParserErrorContext(kStageName), ownedObj); - return new DocumentSourceMergeCursors( - Grid::get(expCtx->opCtx)->getExecutorPool()->getArbitraryExecutor(), - std::move(armParams), - expCtx, - std::move(ownedObj)); - } - - // This is the old serialization format which can still be generated by mongos processes - // older than 4.0. - // TODO SERVER-34009 Remove support for this format. - uassert(17026, - "$mergeCursors stage expected either an array or an object as argument", - elem.type() == BSONType::Array); + uassert( + 17026, "$mergeCursors stage expected array as argument", elem.type() == BSONType::Array); const auto serializedRemotes = elem.Array(); uassert(50729, "$mergeCursors stage expected array with at least one entry", serializedRemotes.size() > 0); boost::optional<NamespaceString> nss; - std::vector<RemoteCursor> remotes; + std::vector<ClusterClientCursorParams::RemoteCursor> remotes; for (auto&& cursor : serializedRemotes) { BSONElement nsElem; BSONElement hostElem; @@ -180,30 +164,28 @@ boost::intrusive_ptr<DocumentSource> DocumentSourceMergeCursors::createFromBson( // any data in the initial batch. // TODO SERVER-33323 We use a fake shard id because the AsyncResultsMerger won't use it for // anything, and finding the real one is non-trivial. - RemoteCursor remoteCursor; - remoteCursor.setShardId(ShardId("fakeShardIdForMergeCursors")); - remoteCursor.setHostAndPort(std::move(host)); std::vector<BSONObj> emptyBatch; - remoteCursor.setCursorResponse(CursorResponse{*nss, cursorId, emptyBatch}); - remotes.push_back(std::move(remoteCursor)); + remotes.push_back({ShardId("fakeShardIdForMergeCursors"), + std::move(host), + CursorResponse{*nss, cursorId, emptyBatch}}); } invariant(nss); // We know there is at least one cursor in 'serializedRemotes', and we require // each cursor to have a 'ns' field. - AsyncResultsMergerParams params; - params.setRemotes(std::move(remotes)); - params.setNss(*nss); + auto params = stdx::make_unique<ClusterClientCursorParams>(*nss); + params->remotes = std::move(remotes); return new DocumentSourceMergeCursors( Grid::get(expCtx->opCtx)->getExecutorPool()->getArbitraryExecutor(), std::move(params), - expCtx, - elem.embeddedObject().getOwned()); + expCtx); } boost::intrusive_ptr<DocumentSource> DocumentSourceMergeCursors::create( + std::vector<ClusterClientCursorParams::RemoteCursor>&& remoteCursors, executor::TaskExecutor* executor, - AsyncResultsMergerParams params, const boost::intrusive_ptr<ExpressionContext>& expCtx) { + auto params = stdx::make_unique<ClusterClientCursorParams>(expCtx->ns); + params->remotes = std::move(remoteCursors); return new DocumentSourceMergeCursors(executor, std::move(params), expCtx); } diff --git a/src/mongo/db/pipeline/document_source_merge_cursors.h b/src/mongo/db/pipeline/document_source_merge_cursors.h index 611d09bcd34..cb74f4bd9ef 100644 --- a/src/mongo/db/pipeline/document_source_merge_cursors.h +++ b/src/mongo/db/pipeline/document_source_merge_cursors.h @@ -55,11 +55,11 @@ public: BSONElement, const boost::intrusive_ptr<ExpressionContext>&); /** - * Creates a new DocumentSourceMergeCursors from the given parameters. + * Creates a new DocumentSourceMergeCursors from the given 'remoteCursors'. */ static boost::intrusive_ptr<DocumentSource> create( + std::vector<ClusterClientCursorParams::RemoteCursor>&& remoteCursors, executor::TaskExecutor*, - AsyncResultsMergerParams, const boost::intrusive_ptr<ExpressionContext>&); const char* getSourceName() const final { @@ -77,7 +77,9 @@ public: /** * Serializes this stage to be sent to perform the merging on a different host. */ - Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final; + void serializeToArray( + std::vector<Value>& array, + boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final; StageConstraints constraints(Pipeline::SplitState pipeState) const final { StageConstraints constraints(StreamType::kStreaming, @@ -100,23 +102,21 @@ protected: private: DocumentSourceMergeCursors(executor::TaskExecutor*, - AsyncResultsMergerParams, - const boost::intrusive_ptr<ExpressionContext>&, - boost::optional<BSONObj> ownedParamsSpec = boost::none); + std::unique_ptr<ClusterClientCursorParams>, + const boost::intrusive_ptr<ExpressionContext>&); - // When we have parsed the params out of a BSONObj, the object needs to stay around while the - // params are in use. We store them here. - boost::optional<BSONObj> _armParamsObj; + Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final { + MONGO_UNREACHABLE; // Should call serializeToArray instead. + } executor::TaskExecutor* _executor; - // '_armParams' is populated until the first call to getNext(). Upon the first call to getNext() - // '_arm' will be populated using '_armParams', and '_armParams' will become boost::none. So if - // getNext() is never called we will never populate '_arm'. If we did so the destruction of this - // stage would cause the cursors within the ARM to be killed prematurely. For example, if this - // stage is parsed on mongos then forwarded to the shards, it should not kill the cursors when - // it goes out of scope on mongos. - boost::optional<AsyncResultsMergerParams> _armParams; + // '_arm' is not populated until the first call to getNext(). If getNext() is never called we + // will not create an AsyncResultsMerger. If we did so the destruction of this stage would cause + // the cursors within the ARM to be killed prematurely. For example, if this stage is parsed on + // mongos then forwarded to the shards, it should not kill the cursors when it goes out of scope + // on mongos. + std::unique_ptr<ClusterClientCursorParams> _armParams; boost::optional<AsyncResultsMerger> _arm; }; diff --git a/src/mongo/db/pipeline/document_source_merge_cursors_test.cpp b/src/mongo/db/pipeline/document_source_merge_cursors_test.cpp index 3614324d27d..4809ac54b50 100644 --- a/src/mongo/db/pipeline/document_source_merge_cursors_test.cpp +++ b/src/mongo/db/pipeline/document_source_merge_cursors_test.cpp @@ -111,7 +111,9 @@ private: }; TEST_F(DocumentSourceMergeCursorsTest, ShouldRejectNonArray) { - auto spec = BSON("$mergeCursors" << 2); + auto spec = BSON("$mergeCursors" << BSON( + "cursors" << BSON_ARRAY(BSON("ns" << kTestNss.ns() << "id" << 0LL << "host" + << kTestHost.toString())))); ASSERT_THROWS_CODE(DocumentSourceMergeCursors::createFromBson(spec.firstElement(), getExpCtx()), AssertionException, 17026); @@ -211,66 +213,16 @@ TEST_F(DocumentSourceMergeCursorsTest, ShouldBeAbleToParseTheSerializedVersionOf ASSERT(DocumentSourceMergeCursors::createFromBson(newSpec.firstElement(), getExpCtx())); } -RemoteCursor makeRemoteCursor(ShardId shardId, HostAndPort host, CursorResponse response) { - RemoteCursor remoteCursor; - remoteCursor.setShardId(std::move(shardId)); - remoteCursor.setHostAndPort(std::move(host)); - remoteCursor.setCursorResponse(std::move(response)); - return remoteCursor; -} - -TEST_F(DocumentSourceMergeCursorsTest, ShouldBeAbleToParseSerializedARMParams) { - AsyncResultsMergerParams params; - params.setSort(BSON("y" << 1 << "z" << 1)); - params.setNss(kTestNss); - std::vector<RemoteCursor> cursors; - cursors.emplace_back(makeRemoteCursor( - kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, kExhaustedCursorID, {}))); - params.setRemotes(std::move(cursors)); - auto spec = BSON("$mergeCursors" << params.toBSON()); - auto mergeCursors = - DocumentSourceMergeCursors::createFromBson(spec.firstElement(), getExpCtx()); - std::vector<Value> serializationArray; - mergeCursors->serializeToArray(serializationArray); - ASSERT_EQ(serializationArray.size(), 1UL); - - // Make sure the serialized version can be parsed into an identical AsyncResultsMergerParams. - auto newSpec = serializationArray[0].getDocument().toBson(); - ASSERT(newSpec["$mergeCursors"].type() == BSONType::Object); - auto newParams = AsyncResultsMergerParams::parse(IDLParserErrorContext("$mergeCursors test"), - newSpec["$mergeCursors"].Obj()); - ASSERT_TRUE(params.getSort()); - ASSERT_BSONOBJ_EQ(*params.getSort(), *newParams.getSort()); - ASSERT_EQ(params.getCompareWholeSortKey(), newParams.getCompareWholeSortKey()); - ASSERT(params.getTailableMode() == newParams.getTailableMode()); - ASSERT(params.getBatchSize() == newParams.getBatchSize()); - ASSERT_EQ(params.getNss(), newParams.getNss()); - ASSERT_EQ(params.getAllowPartialResults(), newParams.getAllowPartialResults()); - ASSERT_EQ(newParams.getRemotes().size(), 1UL); - ASSERT(newParams.getRemotes()[0].getShardId() == kTestShardIds[0].toString()); - ASSERT(newParams.getRemotes()[0].getHostAndPort() == kTestShardHosts[0]); - ASSERT_EQ(newParams.getRemotes()[0].getCursorResponse().getNSS(), kTestNss); - ASSERT_EQ(newParams.getRemotes()[0].getCursorResponse().getCursorId(), kExhaustedCursorID); - ASSERT(newParams.getRemotes()[0].getCursorResponse().getBatch().empty()); - - // Test that the $mergeCursors stage will accept the serialized format of - // AsyncResultsMergerParams. - ASSERT(DocumentSourceMergeCursors::createFromBson(newSpec.firstElement(), getExpCtx())); -} - TEST_F(DocumentSourceMergeCursorsTest, ShouldReportEOFWithNoCursors) { auto expCtx = getExpCtx(); - AsyncResultsMergerParams armParams; - armParams.setNss(kTestNss); - std::vector<RemoteCursor> cursors; - cursors.emplace_back(makeRemoteCursor( - kTestShardIds[0], kTestShardHosts[0], CursorResponse(expCtx->ns, kExhaustedCursorID, {}))); - cursors.emplace_back(makeRemoteCursor( - kTestShardIds[1], kTestShardHosts[1], CursorResponse(expCtx->ns, kExhaustedCursorID, {}))); - armParams.setRemotes(std::move(cursors)); + std::vector<ClusterClientCursorParams::RemoteCursor> cursors; + cursors.emplace_back( + kTestShardIds[0], kTestShardHosts[0], CursorResponse(expCtx->ns, kExhaustedCursorID, {})); + cursors.emplace_back( + kTestShardIds[1], kTestShardHosts[1], CursorResponse(expCtx->ns, kExhaustedCursorID, {})); auto pipeline = uassertStatusOK(Pipeline::create({}, expCtx)); auto mergeCursorsStage = - DocumentSourceMergeCursors::create(executor(), std::move(armParams), expCtx); + DocumentSourceMergeCursors::create(std::move(cursors), executor(), expCtx); ASSERT_TRUE(mergeCursorsStage->getNext().isEOF()); } @@ -284,17 +236,12 @@ BSONObj cursorResponseObj(const NamespaceString& nss, TEST_F(DocumentSourceMergeCursorsTest, ShouldBeAbleToIterateCursorsUntilEOF) { auto expCtx = getExpCtx(); - AsyncResultsMergerParams armParams; - armParams.setNss(kTestNss); - std::vector<RemoteCursor> cursors; - cursors.emplace_back( - makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(expCtx->ns, 1, {}))); - cursors.emplace_back( - makeRemoteCursor(kTestShardIds[1], kTestShardHosts[1], CursorResponse(expCtx->ns, 2, {}))); - armParams.setRemotes(std::move(cursors)); + std::vector<ClusterClientCursorParams::RemoteCursor> cursors; + cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(expCtx->ns, 1, {})); + cursors.emplace_back(kTestShardIds[1], kTestShardHosts[1], CursorResponse(expCtx->ns, 2, {})); auto pipeline = uassertStatusOK(Pipeline::create({}, expCtx)); pipeline->addInitialSource( - DocumentSourceMergeCursors::create(executor(), std::move(armParams), expCtx)); + DocumentSourceMergeCursors::create(std::move(cursors), executor(), expCtx)); // Iterate the $mergeCursors stage asynchronously on a different thread, since it will block // waiting for network responses, which we will manually schedule below. @@ -333,17 +280,12 @@ TEST_F(DocumentSourceMergeCursorsTest, ShouldBeAbleToIterateCursorsUntilEOF) { TEST_F(DocumentSourceMergeCursorsTest, ShouldNotKillCursorsIfNeverIterated) { auto expCtx = getExpCtx(); - AsyncResultsMergerParams armParams; - armParams.setNss(kTestNss); - std::vector<RemoteCursor> cursors; - cursors.emplace_back( - makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(expCtx->ns, 1, {}))); - cursors.emplace_back( - makeRemoteCursor(kTestShardIds[1], kTestShardHosts[1], CursorResponse(expCtx->ns, 2, {}))); - armParams.setRemotes(std::move(cursors)); + std::vector<ClusterClientCursorParams::RemoteCursor> cursors; + cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(expCtx->ns, 1, {})); + cursors.emplace_back(kTestShardIds[1], kTestShardHosts[1], CursorResponse(expCtx->ns, 2, {})); auto pipeline = uassertStatusOK(Pipeline::create({}, expCtx)); pipeline->addInitialSource( - DocumentSourceMergeCursors::create(executor(), std::move(armParams), expCtx)); + DocumentSourceMergeCursors::create(std::move(cursors), executor(), expCtx)); pipeline.reset(); // Delete the pipeline before using it. @@ -353,15 +295,11 @@ TEST_F(DocumentSourceMergeCursorsTest, ShouldNotKillCursorsIfNeverIterated) { TEST_F(DocumentSourceMergeCursorsTest, ShouldKillCursorIfPartiallyIterated) { auto expCtx = getExpCtx(); - AsyncResultsMergerParams armParams; - armParams.setNss(kTestNss); - std::vector<RemoteCursor> cursors; - cursors.emplace_back( - makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(expCtx->ns, 1, {}))); - armParams.setRemotes(std::move(cursors)); + std::vector<ClusterClientCursorParams::RemoteCursor> cursors; + cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(expCtx->ns, 1, {})); auto pipeline = uassertStatusOK(Pipeline::create({}, expCtx)); pipeline->addInitialSource( - DocumentSourceMergeCursors::create(executor(), std::move(armParams), expCtx)); + DocumentSourceMergeCursors::create(std::move(cursors), executor(), expCtx)); // Iterate the pipeline asynchronously on a different thread, since it will block waiting for // network responses, which we will manually schedule below. @@ -406,16 +344,12 @@ TEST_F(DocumentSourceMergeCursorsTest, ShouldOptimizeWithASortToEnsureCorrectOrd auto pipeline = uassertStatusOK(Pipeline::create({std::move(sortStage)}, expCtx)); // Make a $mergeCursors stage and add it to the front of the pipeline. - AsyncResultsMergerParams armParams; - armParams.setNss(kTestNss); - std::vector<RemoteCursor> cursors; - cursors.emplace_back( - makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(expCtx->ns, 1, {}))); - cursors.emplace_back( - makeRemoteCursor(kTestShardIds[1], kTestShardHosts[1], CursorResponse(expCtx->ns, 2, {}))); - armParams.setRemotes(std::move(cursors)); - pipeline->addInitialSource( - DocumentSourceMergeCursors::create(executor(), std::move(armParams), expCtx)); + std::vector<ClusterClientCursorParams::RemoteCursor> cursors; + cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(expCtx->ns, 1, {})); + cursors.emplace_back(kTestShardIds[1], kTestShardHosts[1], CursorResponse(expCtx->ns, 2, {})); + auto mergeCursorsStage = + DocumentSourceMergeCursors::create(std::move(cursors), executor(), expCtx); + pipeline->addInitialSource(std::move(mergeCursorsStage)); // After optimization we should only have a $mergeCursors stage. pipeline->optimizePipeline(); @@ -430,7 +364,6 @@ TEST_F(DocumentSourceMergeCursorsTest, ShouldOptimizeWithASortToEnsureCorrectOrd ASSERT_DOCUMENT_EQ(*pipeline->getNext(), (Document{{"x", 3}})); ASSERT_DOCUMENT_EQ(*pipeline->getNext(), (Document{{"x", 4}})); ASSERT_FALSE(static_cast<bool>(pipeline->getNext())); - std::cout << "Finished"; }); onCommand([&](const auto& request) { @@ -449,35 +382,36 @@ TEST_F(DocumentSourceMergeCursorsTest, ShouldOptimizeWithASortToEnsureCorrectOrd future.timed_get(kFutureTimeout); } -TEST_F(DocumentSourceMergeCursorsTest, ShouldEnforceSortSpecifiedViaARMParams) { +TEST_F(DocumentSourceMergeCursorsTest, ShouldNotRemoveLimitWhenOptimizingWithLeadingSort) { auto expCtx = getExpCtx(); - auto pipeline = uassertStatusOK(Pipeline::create({}, expCtx)); - // Make a $mergeCursors stage with a sort on "x" and add it to the front of the pipeline. - AsyncResultsMergerParams armParams; - armParams.setNss(kTestNss); - armParams.setSort(BSON("x" << 1)); - std::vector<RemoteCursor> cursors; - cursors.emplace_back( - makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(expCtx->ns, 1, {}))); - cursors.emplace_back( - makeRemoteCursor(kTestShardIds[1], kTestShardHosts[1], CursorResponse(expCtx->ns, 2, {}))); - armParams.setRemotes(std::move(cursors)); - pipeline->addInitialSource( - DocumentSourceMergeCursors::create(executor(), std::move(armParams), expCtx)); + // Make a pipeline with a single $sort stage that is merging pre-sorted results. + const bool mergingPresorted = true; + const long long limit = 3; + auto sortStage = DocumentSourceSort::create( + expCtx, BSON("x" << 1), limit, DocumentSourceSort::kMaxMemoryUsageBytes, mergingPresorted); + auto pipeline = uassertStatusOK(Pipeline::create({std::move(sortStage)}, expCtx)); - // After optimization we should only have a $mergeCursors stage. + // Make a $mergeCursors stage and add it to the front of the pipeline. + std::vector<ClusterClientCursorParams::RemoteCursor> cursors; + cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(expCtx->ns, 1, {})); + cursors.emplace_back(kTestShardIds[1], kTestShardHosts[1], CursorResponse(expCtx->ns, 2, {})); + auto mergeCursorsStage = + DocumentSourceMergeCursors::create(std::move(cursors), executor(), expCtx); + pipeline->addInitialSource(std::move(mergeCursorsStage)); + + // After optimization, we should still have a $limit stage. pipeline->optimizePipeline(); - ASSERT_EQ(pipeline->getSources().size(), 1UL); + ASSERT_EQ(pipeline->getSources().size(), 2UL); ASSERT_TRUE(dynamic_cast<DocumentSourceMergeCursors*>(pipeline->getSources().front().get())); + ASSERT_TRUE(dynamic_cast<DocumentSourceLimit*>(pipeline->getSources().back().get())); // Iterate the pipeline asynchronously on a different thread, since it will block waiting for // network responses, which we will manually schedule below. - auto future = launchAsync([&pipeline]() { - ASSERT_DOCUMENT_EQ(*pipeline->getNext(), (Document{{"x", 1}})); - ASSERT_DOCUMENT_EQ(*pipeline->getNext(), (Document{{"x", 2}})); - ASSERT_DOCUMENT_EQ(*pipeline->getNext(), (Document{{"x", 3}})); - ASSERT_DOCUMENT_EQ(*pipeline->getNext(), (Document{{"x", 4}})); + auto future = launchAsync([&]() { + for (int i = 1; i <= limit; ++i) { + ASSERT_DOCUMENT_EQ(*pipeline->getNext(), (Document{{"x", i}})); + } ASSERT_FALSE(static_cast<bool>(pipeline->getNext())); }); @@ -497,7 +431,7 @@ TEST_F(DocumentSourceMergeCursorsTest, ShouldEnforceSortSpecifiedViaARMParams) { future.timed_get(kFutureTimeout); } -TEST_F(DocumentSourceMergeCursorsTest, ShouldNotRemoveLimitWhenOptimizingWithLeadingSort) { +TEST_F(DocumentSourceMergeCursorsTest, ShouldSerializeSortIfAbsorbedViaOptimize) { auto expCtx = getExpCtx(); // Make a pipeline with a single $sort stage that is merging pre-sorted results. @@ -508,16 +442,12 @@ TEST_F(DocumentSourceMergeCursorsTest, ShouldNotRemoveLimitWhenOptimizingWithLea auto pipeline = uassertStatusOK(Pipeline::create({std::move(sortStage)}, expCtx)); // Make a $mergeCursors stage and add it to the front of the pipeline. - AsyncResultsMergerParams armParams; - armParams.setNss(kTestNss); - std::vector<RemoteCursor> cursors; - cursors.emplace_back( - makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(expCtx->ns, 1, {}))); - cursors.emplace_back( - makeRemoteCursor(kTestShardIds[1], kTestShardHosts[1], CursorResponse(expCtx->ns, 2, {}))); - armParams.setRemotes(std::move(cursors)); - pipeline->addInitialSource( - DocumentSourceMergeCursors::create(executor(), std::move(armParams), expCtx)); + std::vector<ClusterClientCursorParams::RemoteCursor> cursors; + cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(expCtx->ns, 1, {})); + cursors.emplace_back(kTestShardIds[1], kTestShardHosts[1], CursorResponse(expCtx->ns, 2, {})); + auto mergeCursorsStage = + DocumentSourceMergeCursors::create(std::move(cursors), executor(), expCtx); + pipeline->addInitialSource(std::move(mergeCursorsStage)); // After optimization, we should still have a $limit stage. pipeline->optimizePipeline(); @@ -525,29 +455,11 @@ TEST_F(DocumentSourceMergeCursorsTest, ShouldNotRemoveLimitWhenOptimizingWithLea ASSERT_TRUE(dynamic_cast<DocumentSourceMergeCursors*>(pipeline->getSources().front().get())); ASSERT_TRUE(dynamic_cast<DocumentSourceLimit*>(pipeline->getSources().back().get())); - // Iterate the pipeline asynchronously on a different thread, since it will block waiting for - // network responses, which we will manually schedule below. - auto future = launchAsync([&]() { - for (int i = 1; i <= limit; ++i) { - ASSERT_DOCUMENT_EQ(*pipeline->getNext(), (Document{{"x", i}})); - } - ASSERT_FALSE(static_cast<bool>(pipeline->getNext())); - }); - - onCommand([&](const auto& request) { - return cursorResponseObj(expCtx->ns, - kExhaustedCursorID, - {BSON("x" << 1 << "$sortKey" << BSON("" << 1)), - BSON("x" << 3 << "$sortKey" << BSON("" << 3))}); - }); - onCommand([&](const auto& request) { - return cursorResponseObj(expCtx->ns, - kExhaustedCursorID, - {BSON("x" << 2 << "$sortKey" << BSON("" << 2)), - BSON("x" << 4 << "$sortKey" << BSON("" << 4))}); - }); - - future.timed_get(kFutureTimeout); + auto serialized = pipeline->serialize(); + ASSERT_EQ(serialized.size(), 3UL); + ASSERT_FALSE(serialized[0]["$mergeCursors"].missing()); + ASSERT_FALSE(serialized[1]["$sort"].missing()); + ASSERT_FALSE(serialized[2]["$limit"].missing()); } } // namespace diff --git a/src/mongo/db/pipeline/document_source_sort.h b/src/mongo/db/pipeline/document_source_sort.h index c5989718d5e..d2ad1cd9ec4 100644 --- a/src/mongo/db/pipeline/document_source_sort.h +++ b/src/mongo/db/pipeline/document_source_sort.h @@ -262,7 +262,7 @@ private: uint64_t _maxMemoryUsageBytes; bool _done; - bool _mergingPresorted; // TODO SERVER-34009 Remove this flag. + bool _mergingPresorted; std::unique_ptr<MySorter> _sorter; std::unique_ptr<MySorter::Iterator> _output; }; diff --git a/src/mongo/db/pipeline/expression_context.h b/src/mongo/db/pipeline/expression_context.h index b5e490e91d7..6847d43e8a4 100644 --- a/src/mongo/db/pipeline/expression_context.h +++ b/src/mongo/db/pipeline/expression_context.h @@ -161,7 +161,7 @@ public: * query. */ bool isTailableAwaitData() const { - return tailableMode == TailableModeEnum::kTailableAndAwaitData; + return tailableMode == TailableMode::kTailableAndAwaitData; } // The explain verbosity requested by the user, or boost::none if no explain was requested. @@ -198,7 +198,7 @@ public: Variables variables; VariablesParseState variablesParseState; - TailableModeEnum tailableMode = TailableModeEnum::kNormal; + TailableMode tailableMode = TailableMode::kNormal; // Tracks the depth of nested aggregation sub-pipelines. Used to enforce depth limits. size_t subPipelineDepth = 0; diff --git a/src/mongo/db/pipeline/pipeline.cpp b/src/mongo/db/pipeline/pipeline.cpp index 335ca4f5655..2f6fce9e526 100644 --- a/src/mongo/db/pipeline/pipeline.cpp +++ b/src/mongo/db/pipeline/pipeline.cpp @@ -28,7 +28,9 @@ #include "mongo/platform/basic.h" +// This file defines functions from both of these headers #include "mongo/db/pipeline/pipeline.h" +#include "mongo/db/pipeline/pipeline_optimizations.h" #include <algorithm> @@ -38,7 +40,6 @@ #include "mongo/db/jsobj.h" #include "mongo/db/operation_context.h" #include "mongo/db/pipeline/accumulator.h" -#include "mongo/db/pipeline/cluster_aggregation_planner.h" #include "mongo/db/pipeline/document.h" #include "mongo/db/pipeline/document_source.h" #include "mongo/db/pipeline/document_source_geo_near.h" @@ -46,7 +47,6 @@ #include "mongo/db/pipeline/document_source_merge_cursors.h" #include "mongo/db/pipeline/document_source_out.h" #include "mongo/db/pipeline/document_source_project.h" -#include "mongo/db/pipeline/document_source_sort.h" #include "mongo/db/pipeline/document_source_unwind.h" #include "mongo/db/pipeline/expression.h" #include "mongo/db/pipeline/expression_context.h" @@ -329,7 +329,13 @@ std::unique_ptr<Pipeline, PipelineDeleter> Pipeline::splitForSharded() { // Keep a copy of the original source list in case we need to reset the pipeline from split to // unsplit later. shardPipeline->_unsplitSources.emplace(_sources); - cluster_aggregation_planner::performSplitPipelineOptimizations(shardPipeline.get(), this); + + // The order in which optimizations are applied can have significant impact on the + // efficiency of the final pipeline. Be Careful! + Optimizations::Sharded::findSplitPoint(shardPipeline.get(), this); + Optimizations::Sharded::moveFinalUnwindFromShardsToMerger(shardPipeline.get(), this); + Optimizations::Sharded::limitFieldsSentFromShardsToMerger(shardPipeline.get(), this); + shardPipeline->_splitState = SplitState::kSplitForShards; _splitState = SplitState::kSplitForMerge; @@ -360,6 +366,87 @@ void Pipeline::unsplitFromSharded( stitch(); } +void Pipeline::Optimizations::Sharded::findSplitPoint(Pipeline* shardPipe, Pipeline* mergePipe) { + while (!mergePipe->_sources.empty()) { + intrusive_ptr<DocumentSource> current = mergePipe->_sources.front(); + mergePipe->_sources.pop_front(); + + // Check if this source is splittable. + SplittableDocumentSource* splittable = + dynamic_cast<SplittableDocumentSource*>(current.get()); + + if (!splittable) { + // Move the source from the merger _sources to the shard _sources. + shardPipe->_sources.push_back(current); + } else { + // Split this source into 'merge' and 'shard' _sources. + intrusive_ptr<DocumentSource> shardSource = splittable->getShardSource(); + auto mergeSources = splittable->getMergeSources(); + + // A source may not simultaneously be present on both sides of the split. + invariant(std::find(mergeSources.begin(), mergeSources.end(), shardSource) == + mergeSources.end()); + + if (shardSource) + shardPipe->_sources.push_back(shardSource); + + // Add the stages in reverse order, so that they appear in the pipeline in the same + // order as they were returned by the stage. + for (auto it = mergeSources.rbegin(); it != mergeSources.rend(); ++it) { + mergePipe->_sources.push_front(*it); + } + + break; + } + } +} + +void Pipeline::Optimizations::Sharded::moveFinalUnwindFromShardsToMerger(Pipeline* shardPipe, + Pipeline* mergePipe) { + while (!shardPipe->_sources.empty() && + dynamic_cast<DocumentSourceUnwind*>(shardPipe->_sources.back().get())) { + mergePipe->_sources.push_front(shardPipe->_sources.back()); + shardPipe->_sources.pop_back(); + } +} + +void Pipeline::Optimizations::Sharded::limitFieldsSentFromShardsToMerger(Pipeline* shardPipe, + Pipeline* mergePipe) { + auto depsMetadata = DocumentSourceMatch::isTextQuery(shardPipe->getInitialQuery()) + ? DepsTracker::MetadataAvailable::kTextScore + : DepsTracker::MetadataAvailable::kNoMetadata; + DepsTracker mergeDeps(mergePipe->getDependencies(depsMetadata)); + if (mergeDeps.needWholeDocument) + return; // the merge needs all fields, so nothing we can do. + + // Empty project is "special" so if no fields are needed, we just ask for _id instead. + if (mergeDeps.fields.empty()) + mergeDeps.fields.insert("_id"); + + // Remove metadata from dependencies since it automatically flows through projection and we + // don't want to project it in to the document. + mergeDeps.setNeedTextScore(false); + + // HEURISTIC: only apply optimization if none of the shard stages have an exhaustive list of + // field dependencies. While this may not be 100% ideal in all cases, it is simple and + // avoids the worst cases by ensuring that: + // 1) Optimization IS applied when the shards wouldn't have known their exhaustive list of + // dependencies. This situation can happen when a $sort is before the first $project or + // $group. Without the optimization, the shards would have to reify and transmit full + // objects even though only a subset of fields are needed. + // 2) Optimization IS NOT applied immediately following a $project or $group since it would + // add an unnecessary project (and therefore a deep-copy). + for (auto&& source : shardPipe->_sources) { + DepsTracker dt(depsMetadata); + if (source->getDependencies(&dt) & DocumentSource::EXHAUSTIVE_FIELDS) + return; + } + // if we get here, add the project. + boost::intrusive_ptr<DocumentSource> project = DocumentSourceProject::createFromBson( + BSON("$project" << mergeDeps.toProjection()).firstElement(), shardPipe->pCtx); + shardPipe->_sources.push_back(project); +} + BSONObj Pipeline::getInitialQuery() const { if (_sources.empty()) return BSONObj(); @@ -612,35 +699,7 @@ Status Pipeline::_pipelineCanRunOnMongoS() const { return Status::OK(); } -void Pipeline::pushBack(boost::intrusive_ptr<DocumentSource> newStage) { - newStage->setSource(_sources.back().get()); - _sources.push_back(std::move(newStage)); -} - -boost::intrusive_ptr<DocumentSource> Pipeline::popBack() { - if (_sources.empty()) { - return nullptr; - } - auto targetStage = _sources.back(); - _sources.pop_back(); - return targetStage; -} - -boost::intrusive_ptr<DocumentSource> Pipeline::popFront() { - if (_sources.empty()) { - return nullptr; - } - auto targetStage = _sources.front(); - _sources.pop_front(); - stitch(); - return targetStage; -} - -boost::intrusive_ptr<DocumentSource> Pipeline::popFrontWithName(StringData targetStageName) { - return popFrontWithNameAndCriteria(targetStageName, nullptr); -} - -boost::intrusive_ptr<DocumentSource> Pipeline::popFrontWithNameAndCriteria( +boost::intrusive_ptr<DocumentSource> Pipeline::popFrontWithCriteria( StringData targetStageName, stdx::function<bool(const DocumentSource* const)> predicate) { if (_sources.empty() || _sources.front()->getSourceName() != targetStageName) { return nullptr; @@ -651,7 +710,8 @@ boost::intrusive_ptr<DocumentSource> Pipeline::popFrontWithNameAndCriteria( return nullptr; } - return popFront(); + _sources.pop_front(); + stitch(); + return targetStage; } - } // namespace mongo diff --git a/src/mongo/db/pipeline/pipeline.h b/src/mongo/db/pipeline/pipeline.h index 04b5769792d..bcb7ee64521 100644 --- a/src/mongo/db/pipeline/pipeline.h +++ b/src/mongo/db/pipeline/pipeline.h @@ -39,8 +39,6 @@ #include "mongo/db/pipeline/value.h" #include "mongo/db/query/explain_options.h" #include "mongo/db/query/query_knobs.h" -#include "mongo/executor/task_executor.h" -#include "mongo/s/query/async_results_merger_params_gen.h" #include "mongo/stdx/functional.h" #include "mongo/util/intrusive_counter.h" #include "mongo/util/timer.h" @@ -270,34 +268,13 @@ public: } /** - * Removes and returns the first stage of the pipeline. Returns nullptr if the pipeline is - * empty. - */ - boost::intrusive_ptr<DocumentSource> popFront(); - - /** - * Removes and returns the last stage of the pipeline. Returns nullptr if the pipeline is empty. - */ - boost::intrusive_ptr<DocumentSource> popBack(); - - /** - * Adds the given stage to the end of the pipeline. - */ - void pushBack(boost::intrusive_ptr<DocumentSource>); - - /** - * Removes and returns the first stage of the pipeline if its name is 'targetStageName'. - * Returns nullptr if there is no first stage with that name. - */ - boost::intrusive_ptr<DocumentSource> popFrontWithName(StringData targetStageName); - - /** * Removes and returns the first stage of the pipeline if its name is 'targetStageName' and the * given 'predicate' function, if present, returns 'true' when called with a pointer to the * stage. Returns nullptr if there is no first stage which meets these criteria. */ - boost::intrusive_ptr<DocumentSource> popFrontWithNameAndCriteria( - StringData targetStageName, stdx::function<bool(const DocumentSource* const)> predicate); + boost::intrusive_ptr<DocumentSource> popFrontWithCriteria( + StringData targetStageName, + stdx::function<bool(const DocumentSource* const)> predicate = nullptr); /** * PipelineD is a "sister" class that has additional functionality for the Pipeline. It exists @@ -309,6 +286,15 @@ public: friend class PipelineD; private: + class Optimizations { + public: + // This contains static functions that optimize pipelines in various ways. + // This is a class rather than a namespace so that it can be a friend of Pipeline. + // It is defined in pipeline_optimizations.h. + class Sharded; + }; + + friend class Optimizations::Sharded; friend class PipelineDeleter; /** diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp index 043452c3e0f..244946bd877 100644 --- a/src/mongo/db/pipeline/pipeline_d.cpp +++ b/src/mongo/db/pipeline/pipeline_d.cpp @@ -386,7 +386,7 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> PipelineD::prep plannerOpts |= QueryPlannerParams::NO_UNCOVERED_PROJECTIONS; } - if (expCtx->needsMerge && expCtx->tailableMode == TailableModeEnum::kTailableAndAwaitData) { + if (expCtx->needsMerge && expCtx->tailableMode == TailableMode::kTailableAndAwaitData) { plannerOpts |= QueryPlannerParams::TRACK_LATEST_OPLOG_TS; } diff --git a/src/mongo/db/pipeline/pipeline_optimizations.h b/src/mongo/db/pipeline/pipeline_optimizations.h new file mode 100644 index 00000000000..af85fdff4b0 --- /dev/null +++ b/src/mongo/db/pipeline/pipeline_optimizations.h @@ -0,0 +1,71 @@ +/** + * Copyright 2013 (c) 10gen Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +/** + * This file declares optimizations available on Pipelines. For now they should be considered part + * of Pipeline's implementation rather than it's interface. + */ + +#pragma once + +#include "mongo/db/pipeline/pipeline.h" + +namespace mongo { +/** + * This class holds optimizations applied to a shard Pipeline and a merger Pipeline. + * + * Each function has the same signature and takes two Pipelines, both as an in/out parameters. + */ +class Pipeline::Optimizations::Sharded { +public: + /** + * Moves everything before a splittable stage to the shards. If there + * are no splittable stages, moves everything to the shards. + * + * It is not safe to call this optimization multiple times. + * + * NOTE: looks for SplittableDocumentSources and uses that API + */ + static void findSplitPoint(Pipeline* shardPipe, Pipeline* mergePipe); + + /** + * If the final stage on shards is to unwind an array, move that stage to the merger. This + * cuts down on network traffic and allows us to take advantage of reduced copying in + * unwind. + */ + static void moveFinalUnwindFromShardsToMerger(Pipeline* shardPipe, Pipeline* mergePipe); + + /** + * Adds a stage to the end of shardPipe explicitly requesting all fields that mergePipe + * needs. This is only done if it heuristically determines that it is needed. This + * optimization can reduce the amount of network traffic and can also enable the shards to + * convert less source BSON into Documents. + */ + static void limitFieldsSentFromShardsToMerger(Pipeline* shardPipe, Pipeline* mergePipe); +}; +} // namespace mongo diff --git a/src/mongo/db/query/SConscript b/src/mongo/db/query/SConscript index 3638d5ba00b..81c4ac22cf9 100644 --- a/src/mongo/db/query/SConscript +++ b/src/mongo/db/query/SConscript @@ -187,8 +187,7 @@ env.Library( target="query_request", source=[ "query_request.cpp", - "tailable_mode.cpp", - env.Idlc("tailable_mode.idl")[0], + "tailable_mode.cpp" ], LIBDEPS=[ "$BUILD_DIR/mongo/base", diff --git a/src/mongo/db/query/cursor_response.h b/src/mongo/db/query/cursor_response.h index b2dd828f4c2..d3852f2ee21 100644 --- a/src/mongo/db/query/cursor_response.h +++ b/src/mongo/db/query/cursor_response.h @@ -141,23 +141,6 @@ public: }; /** - * Constructs a CursorResponse from the command BSON response. - */ - static StatusWith<CursorResponse> parseFromBSON(const BSONObj& cmdResponse); - - /** - * A throwing version of 'parseFromBSON'. - */ - static CursorResponse parseFromBSONThrowing(const BSONObj& cmdResponse) { - return uassertStatusOK(parseFromBSON(cmdResponse)); - } - - /** - * Constructs an empty cursor response. - */ - CursorResponse() = default; - - /** * Constructs from values for each of the fields. */ CursorResponse(NamespaceString nss, @@ -203,13 +186,15 @@ public: } /** + * Constructs a CursorResponse from the command BSON response. + */ + static StatusWith<CursorResponse> parseFromBSON(const BSONObj& cmdResponse); + + /** * Converts this response to its raw BSON representation. */ BSONObj toBSON(ResponseType responseType) const; void addToBSON(ResponseType responseType, BSONObjBuilder* builder) const; - BSONObj toBSONAsInitialResponse() const { - return toBSON(ResponseType::InitialResponse); - } private: NamespaceString _nss; diff --git a/src/mongo/db/query/getmore_request.cpp b/src/mongo/db/query/getmore_request.cpp index ea2515123f6..a6b29ea520b 100644 --- a/src/mongo/db/query/getmore_request.cpp +++ b/src/mongo/db/query/getmore_request.cpp @@ -59,7 +59,7 @@ GetMoreRequest::GetMoreRequest() : cursorid(0), batchSize(0) {} GetMoreRequest::GetMoreRequest(NamespaceString namespaceString, CursorId id, - boost::optional<std::int64_t> sizeOfBatch, + boost::optional<long long> sizeOfBatch, boost::optional<Milliseconds> awaitDataTimeout, boost::optional<long long> term, boost::optional<repl::OpTime> lastKnownCommittedOpTime) @@ -108,7 +108,7 @@ StatusWith<GetMoreRequest> GetMoreRequest::parseFromBSON(const std::string& dbna boost::optional<NamespaceString> nss; // Optional fields. - boost::optional<std::int64_t> batchSize; + boost::optional<long long> batchSize; boost::optional<Milliseconds> awaitDataTimeout; boost::optional<long long> term; boost::optional<repl::OpTime> lastKnownCommittedOpTime; diff --git a/src/mongo/db/query/getmore_request.h b/src/mongo/db/query/getmore_request.h index 4da4839abbd..8fa2d0fc5dd 100644 --- a/src/mongo/db/query/getmore_request.h +++ b/src/mongo/db/query/getmore_request.h @@ -52,7 +52,7 @@ struct GetMoreRequest { */ GetMoreRequest(NamespaceString namespaceString, CursorId id, - boost::optional<std::int64_t> sizeOfBatch, + boost::optional<long long> sizeOfBatch, boost::optional<Milliseconds> awaitDataTimeout, boost::optional<long long> term, boost::optional<repl::OpTime> lastKnownCommittedOpTime); @@ -76,7 +76,7 @@ struct GetMoreRequest { // The batch size is optional. If not provided, we will put as many documents into the batch // as fit within the byte limit. - const boost::optional<std::int64_t> batchSize; + const boost::optional<long long> batchSize; // The number of milliseconds for which a getMore on a tailable, awaitData query should block. const boost::optional<Milliseconds> awaitDataTimeout; diff --git a/src/mongo/db/query/query_request.cpp b/src/mongo/db/query/query_request.cpp index 56ffe5dbb42..6268785c6a6 100644 --- a/src/mongo/db/query/query_request.cpp +++ b/src/mongo/db/query/query_request.cpp @@ -495,16 +495,16 @@ void QueryRequest::asFindCommand(BSONObjBuilder* cmdBuilder) const { } switch (_tailableMode) { - case TailableModeEnum::kTailable: { + case TailableMode::kTailable: { cmdBuilder->append(kTailableField, true); break; } - case TailableModeEnum::kTailableAndAwaitData: { + case TailableMode::kTailableAndAwaitData: { cmdBuilder->append(kTailableField, true); cmdBuilder->append(kAwaitDataField, true); break; } - case TailableModeEnum::kNormal: { + case TailableMode::kNormal: { break; } } @@ -623,7 +623,7 @@ Status QueryRequest::validate() const { << _maxTimeMS); } - if (_tailableMode != TailableModeEnum::kNormal) { + if (_tailableMode != TailableMode::kNormal) { // Tailable cursors cannot have any sort other than {$natural: 1}. const BSONObj expectedSort = BSON(kNaturalSortField << 1); if (!_sort.isEmpty() && @@ -911,9 +911,9 @@ Status QueryRequest::initFullQuery(const BSONObj& top) { int QueryRequest::getOptions() const { int options = 0; - if (_tailableMode == TailableModeEnum::kTailable) { + if (_tailableMode == TailableMode::kTailable) { options |= QueryOption_CursorTailable; - } else if (_tailableMode == TailableModeEnum::kTailableAndAwaitData) { + } else if (_tailableMode == TailableMode::kTailableAndAwaitData) { options |= QueryOption_CursorTailable; options |= QueryOption_AwaitData; } diff --git a/src/mongo/db/query/query_request.h b/src/mongo/db/query/query_request.h index 29abf54aaca..7a89e466faf 100644 --- a/src/mongo/db/query/query_request.h +++ b/src/mongo/db/query/query_request.h @@ -319,19 +319,19 @@ public: } bool isTailable() const { - return _tailableMode == TailableModeEnum::kTailable || - _tailableMode == TailableModeEnum::kTailableAndAwaitData; + return _tailableMode == TailableMode::kTailable || + _tailableMode == TailableMode::kTailableAndAwaitData; } bool isTailableAndAwaitData() const { - return _tailableMode == TailableModeEnum::kTailableAndAwaitData; + return _tailableMode == TailableMode::kTailableAndAwaitData; } - void setTailableMode(TailableModeEnum tailableMode) { + void setTailableMode(TailableMode tailableMode) { _tailableMode = tailableMode; } - TailableModeEnum getTailableMode() const { + TailableMode getTailableMode() const { return _tailableMode; } @@ -498,7 +498,7 @@ private: bool _hasReadPref = false; // Options that can be specified in the OP_QUERY 'flags' header. - TailableModeEnum _tailableMode = TailableModeEnum::kNormal; + TailableMode _tailableMode = TailableMode::kNormal; bool _slaveOk = false; bool _oplogReplay = false; bool _noCursorTimeout = false; diff --git a/src/mongo/db/query/query_request_test.cpp b/src/mongo/db/query/query_request_test.cpp index 9992fdb155e..edcf58cecb1 100644 --- a/src/mongo/db/query/query_request_test.cpp +++ b/src/mongo/db/query/query_request_test.cpp @@ -1165,7 +1165,7 @@ TEST(QueryRequestTest, ConvertToAggregationWithShowRecordIdFails) { TEST(QueryRequestTest, ConvertToAggregationWithTailableFails) { QueryRequest qr(testns); - qr.setTailableMode(TailableModeEnum::kTailable); + qr.setTailableMode(TailableMode::kTailable); ASSERT_NOT_OK(qr.asAggregationCommand()); } @@ -1183,7 +1183,7 @@ TEST(QueryRequestTest, ConvertToAggregationWithNoCursorTimeoutFails) { TEST(QueryRequestTest, ConvertToAggregationWithAwaitDataFails) { QueryRequest qr(testns); - qr.setTailableMode(TailableModeEnum::kTailableAndAwaitData); + qr.setTailableMode(TailableMode::kTailableAndAwaitData); ASSERT_NOT_OK(qr.asAggregationCommand()); } diff --git a/src/mongo/db/query/tailable_mode.cpp b/src/mongo/db/query/tailable_mode.cpp index f09a7b59c41..b19a1988672 100644 --- a/src/mongo/db/query/tailable_mode.cpp +++ b/src/mongo/db/query/tailable_mode.cpp @@ -32,17 +32,17 @@ namespace mongo { -StatusWith<TailableModeEnum> tailableModeFromBools(bool isTailable, bool isAwaitData) { +StatusWith<TailableMode> tailableModeFromBools(bool isTailable, bool isAwaitData) { if (isTailable) { if (isAwaitData) { - return TailableModeEnum::kTailableAndAwaitData; + return TailableMode::kTailableAndAwaitData; } - return TailableModeEnum::kTailable; + return TailableMode::kTailable; } else if (isAwaitData) { return {ErrorCodes::FailedToParse, "Cannot set 'awaitData' without also setting 'tailable'"}; } - return TailableModeEnum::kNormal; + return TailableMode::kNormal; } } // namespace mongo diff --git a/src/mongo/db/query/tailable_mode.h b/src/mongo/db/query/tailable_mode.h index 531191c6f76..92c0fe9292e 100644 --- a/src/mongo/db/query/tailable_mode.h +++ b/src/mongo/db/query/tailable_mode.h @@ -30,14 +30,19 @@ #include "mongo/base/error_codes.h" #include "mongo/base/status_with.h" -#include "mongo/db/query/tailable_mode_gen.h" namespace mongo { +enum class TailableMode { + kNormal, + kTailable, + kTailableAndAwaitData, +}; + /** * Returns a TailableMode from two booleans, returning ErrorCodes::FailedToParse if awaitData is * set without tailable. */ -StatusWith<TailableModeEnum> tailableModeFromBools(bool isTailable, bool isAwaitData); +StatusWith<TailableMode> tailableModeFromBools(bool isTailable, bool isAwaitData); } // namespace mongo diff --git a/src/mongo/db/query/tailable_mode.idl b/src/mongo/db/query/tailable_mode.idl deleted file mode 100644 index 33d5b6989dd..00000000000 --- a/src/mongo/db/query/tailable_mode.idl +++ /dev/null @@ -1,37 +0,0 @@ -# Copyright (C) 2018 MongoDB Inc. -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU Affero General Public License, version 3, -# as published by the Free Software Foundation. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU Affero General Public License for more details. -# -# You should have received a copy of the GNU Affero General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. -# -# As a special exception, the copyright holders give permission to link the -# code of portions of this program with the OpenSSL library under certain -# conditions as described in each individual source file and distribute -# linked combinations including the program with the OpenSSL library. You -# must comply with the GNU Affero General Public License in all respects for -# all of the code used other than as permitted herein. If you modify file(s) -# with this exception, you may extend this exception to your version of the -# file(s), but you are not obligated to do so. If you do not wish to do so, -# delete this exception statement from your version. If you delete this -# exception statement from all source files in the program, then also delete -# it in the license file. - -global: - cpp_namespace: "mongo" - -enums: - TailableMode: - description: "Describes the tailablility of a cursor." - type: string - values: - kNormal: "normal" - kTailable: "tailable" - kTailableAndAwaitData: "tailableAndAwaitData" diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript index fb0890b87a5..bcb0da9d789 100644 --- a/src/mongo/db/repl/SConscript +++ b/src/mongo/db/repl/SConscript @@ -1241,6 +1241,7 @@ env.Library( '$BUILD_DIR/mongo/executor/task_executor_interface', '$BUILD_DIR/mongo/rpc/command_status', '$BUILD_DIR/mongo/s/query/async_results_merger', + '$BUILD_DIR/mongo/s/query/cluster_client_cursor', '$BUILD_DIR/mongo/util/progress_meter', ], ) diff --git a/src/mongo/db/repl/collection_cloner.cpp b/src/mongo/db/repl/collection_cloner.cpp index 8aee68a4c58..773f54a0b8b 100644 --- a/src/mongo/db/repl/collection_cloner.cpp +++ b/src/mongo/db/repl/collection_cloner.cpp @@ -43,6 +43,7 @@ #include "mongo/db/repl/storage_interface_mock.h" #include "mongo/db/server_parameters.h" #include "mongo/rpc/get_status_from_command_result.h" +#include "mongo/s/query/cluster_client_cursor_params.h" #include "mongo/util/assert_util.h" #include "mongo/util/destructor_guard.h" #include "mongo/util/fail_point_service.h" @@ -612,25 +613,20 @@ void CollectionCloner::_establishCollectionCursorsCallback(const RemoteCommandCa << " cursors established."; // Initialize the 'AsyncResultsMerger'(ARM). - std::vector<RemoteCursor> remoteCursors; + std::vector<ClusterClientCursorParams::RemoteCursor> remoteCursors; for (auto&& cursorResponse : cursorResponses) { // A placeholder 'ShardId' is used until the ARM is made less sharding specific. - remoteCursors.emplace_back(); - auto& newCursor = remoteCursors.back(); - newCursor.setShardId("CollectionClonerSyncSource"); - newCursor.setHostAndPort(_source); - newCursor.setCursorResponse(std::move(cursorResponse)); + remoteCursors.emplace_back( + ShardId("CollectionClonerSyncSource"), _source, std::move(cursorResponse)); } - AsyncResultsMergerParams armParams; - armParams.setNss(_sourceNss); - armParams.setRemotes(std::move(remoteCursors)); - if (_collectionCloningBatchSize > 0) { - armParams.setBatchSize(_collectionCloningBatchSize); - } + _clusterClientCursorParams = stdx::make_unique<ClusterClientCursorParams>(_sourceNss); + _clusterClientCursorParams->remotes = std::move(remoteCursors); + if (_collectionCloningBatchSize > 0) + _clusterClientCursorParams->batchSize = _collectionCloningBatchSize; Client::initThreadIfNotAlready(); _arm = stdx::make_unique<AsyncResultsMerger>( - cc().getOperationContext(), _executor, std::move(armParams)); + cc().getOperationContext(), _executor, _clusterClientCursorParams.get()); // This completion guard invokes _finishCallback on destruction. auto cancelRemainingWorkInLock = [this]() { _cancelRemainingWork_inlock(); }; diff --git a/src/mongo/db/repl/collection_cloner.h b/src/mongo/db/repl/collection_cloner.h index d97c5c0048f..e79258e3734 100644 --- a/src/mongo/db/repl/collection_cloner.h +++ b/src/mongo/db/repl/collection_cloner.h @@ -301,6 +301,8 @@ private: const int _maxNumClonerCursors; // (M) Component responsible for fetching the documents from the collection cloner cursor(s). std::unique_ptr<AsyncResultsMerger> _arm; + // (R) The cursor parameters used by the 'AsyncResultsMerger'. + std::unique_ptr<ClusterClientCursorParams> _clusterClientCursorParams; // (M) The event handle for the 'kill' event of the 'AsyncResultsMerger'. executor::TaskExecutor::EventHandle _killArmHandle; diff --git a/src/mongo/dbtests/documentsourcetests.cpp b/src/mongo/dbtests/documentsourcetests.cpp index 5a834ed8aa5..ebfaccbde5c 100644 --- a/src/mongo/dbtests/documentsourcetests.cpp +++ b/src/mongo/dbtests/documentsourcetests.cpp @@ -369,7 +369,7 @@ TEST_F(DocumentSourceCursorTest, TailableAwaitDataCursorShouldErrorAfterTimeout) opCtx(), collScanParams, workingSet.get(), matchExpression.get()); auto queryRequest = stdx::make_unique<QueryRequest>(nss); queryRequest->setFilter(filter); - queryRequest->setTailableMode(TailableModeEnum::kTailableAndAwaitData); + queryRequest->setTailableMode(TailableMode::kTailableAndAwaitData); auto canonicalQuery = unittest::assertGet( CanonicalQuery::canonicalize(opCtx(), std::move(queryRequest), nullptr)); auto planExecutor = @@ -381,7 +381,7 @@ TEST_F(DocumentSourceCursorTest, TailableAwaitDataCursorShouldErrorAfterTimeout) PlanExecutor::YieldPolicy::ALWAYS_TIME_OUT)); // Make a DocumentSourceCursor. - ctx()->tailableMode = TailableModeEnum::kTailableAndAwaitData; + ctx()->tailableMode = TailableMode::kTailableAndAwaitData; // DocumentSourceCursor expects a PlanExecutor that has had its state saved. planExecutor->saveState(); auto cursor = @@ -419,7 +419,7 @@ TEST_F(DocumentSourceCursorTest, NonAwaitDataCursorShouldErrorAfterTimeout) { PlanExecutor::YieldPolicy::ALWAYS_TIME_OUT)); // Make a DocumentSourceCursor. - ctx()->tailableMode = TailableModeEnum::kNormal; + ctx()->tailableMode = TailableMode::kNormal; // DocumentSourceCursor expects a PlanExecutor that has had its state saved. planExecutor->saveState(); auto cursor = @@ -449,7 +449,7 @@ TEST_F(DocumentSourceCursorTest, TailableAwaitDataCursorShouldErrorAfterBeingKil opCtx(), collScanParams, workingSet.get(), matchExpression.get()); auto queryRequest = stdx::make_unique<QueryRequest>(nss); queryRequest->setFilter(filter); - queryRequest->setTailableMode(TailableModeEnum::kTailableAndAwaitData); + queryRequest->setTailableMode(TailableMode::kTailableAndAwaitData); auto canonicalQuery = unittest::assertGet( CanonicalQuery::canonicalize(opCtx(), std::move(queryRequest), nullptr)); auto planExecutor = @@ -461,7 +461,7 @@ TEST_F(DocumentSourceCursorTest, TailableAwaitDataCursorShouldErrorAfterBeingKil PlanExecutor::YieldPolicy::ALWAYS_MARK_KILLED)); // Make a DocumentSourceCursor. - ctx()->tailableMode = TailableModeEnum::kTailableAndAwaitData; + ctx()->tailableMode = TailableMode::kTailableAndAwaitData; // DocumentSourceCursor expects a PlanExecutor that has had its state saved. planExecutor->saveState(); auto cursor = @@ -498,7 +498,7 @@ TEST_F(DocumentSourceCursorTest, NormalCursorShouldErrorAfterBeingKilled) { PlanExecutor::YieldPolicy::ALWAYS_MARK_KILLED)); // Make a DocumentSourceCursor. - ctx()->tailableMode = TailableModeEnum::kNormal; + ctx()->tailableMode = TailableMode::kNormal; // DocumentSourceCursor expects a PlanExecutor that has had its state saved. planExecutor->saveState(); auto cursor = diff --git a/src/mongo/dbtests/query_plan_executor.cpp b/src/mongo/dbtests/query_plan_executor.cpp index 51a5f7723cd..8f8025e4b9d 100644 --- a/src/mongo/dbtests/query_plan_executor.cpp +++ b/src/mongo/dbtests/query_plan_executor.cpp @@ -99,7 +99,7 @@ public: Collection* coll, BSONObj& filterObj, PlanExecutor::YieldPolicy yieldPolicy = PlanExecutor::YieldPolicy::YIELD_MANUAL, - TailableModeEnum tailableMode = TailableModeEnum::kNormal) { + TailableMode tailableMode = TailableMode::kNormal) { CollectionScanParams csparams; csparams.collection = coll; csparams.direction = CollectionScanParams::FORWARD; @@ -307,7 +307,7 @@ TEST_F(PlanExecutorTest, ShouldReportErrorIfKilledDuringYieldButIsTailableAndAwa auto exec = makeCollScanExec(coll, filterObj, PlanExecutor::YieldPolicy::ALWAYS_TIME_OUT, - TailableModeEnum::kTailableAndAwaitData); + TailableMode::kTailableAndAwaitData); BSONObj resultObj; ASSERT_EQ(PlanExecutor::DEAD, exec->getNext(&resultObj, nullptr)); @@ -323,7 +323,7 @@ TEST_F(PlanExecutorTest, ShouldNotSwallowExceedsTimeLimitDuringYieldButIsTailabl Collection* coll = ctx.getCollection(); auto exec = makeCollScanExec( - coll, filterObj, PlanExecutor::YieldPolicy::ALWAYS_TIME_OUT, TailableModeEnum::kTailable); + coll, filterObj, PlanExecutor::YieldPolicy::ALWAYS_TIME_OUT, TailableMode::kTailable); BSONObj resultObj; ASSERT_EQ(PlanExecutor::DEAD, exec->getNext(&resultObj, nullptr)); diff --git a/src/mongo/s/commands/cluster_aggregate.cpp b/src/mongo/s/commands/cluster_aggregate.cpp index c0643205001..7c673f73b30 100644 --- a/src/mongo/s/commands/cluster_aggregate.cpp +++ b/src/mongo/s/commands/cluster_aggregate.cpp @@ -41,7 +41,6 @@ #include "mongo/db/curop.h" #include "mongo/db/logical_clock.h" #include "mongo/db/operation_context.h" -#include "mongo/db/pipeline/cluster_aggregation_planner.h" #include "mongo/db/pipeline/document_source_change_stream.h" #include "mongo/db/pipeline/document_source_merge_cursors.h" #include "mongo/db/pipeline/document_source_out.h" @@ -264,14 +263,26 @@ BSONObj createCommandForMergingShard( return appendAllowImplicitCreate(mergeCmd.freeze().toBson(), true); } -std::vector<RemoteCursor> establishShardCursors(OperationContext* opCtx, - const NamespaceString& nss, - const LiteParsedPipeline& litePipe, - CachedCollectionRoutingInfo* routingInfo, - const BSONObj& cmdObj, - const ReadPreferenceSetting& readPref, - const BSONObj& shardQuery, - const BSONObj& collation) { +/** + * Verifies that the shardIds are the same as they were atClusterTime using versioned table. + * TODO: SERVER-33767 + */ +bool verifyTargetedShardsAtClusterTime(OperationContext* opCtx, + const std::set<ShardId>& shardIds, + LogicalTime atClusterTime) { + return true; +} + +std::vector<ClusterClientCursorParams::RemoteCursor> establishShardCursors( + OperationContext* opCtx, + const NamespaceString& nss, + const LiteParsedPipeline& litePipe, + CachedCollectionRoutingInfo* routingInfo, + const BSONObj& cmdObj, + const ReadPreferenceSetting& readPref, + const BSONObj& shardQuery, + const BSONObj& collation) { +>>>>>>> parent of 7d09f27... SERVER-33323 Use the IDL to serialize the ARM LOG(1) << "Dispatching command " << redact(cmdObj) << " to establish cursors on shards"; bool mustRunOnAll = mustRunOnAllShards(nss, *routingInfo, litePipe); @@ -341,7 +352,7 @@ struct DispatchShardPipelineResults { // Populated if this *is not* an explain, this vector represents the cursors on the remote // shards. - std::vector<RemoteCursor> remoteCursors; + std::vector<ClusterClientCursorParams::RemoteCursor> remoteCursors; // Populated if this *is* an explain, this vector represents the results from each shard. std::vector<AsyncRequestsSender::Response> remoteExplainOutput; @@ -379,7 +390,7 @@ DispatchShardPipelineResults dispatchShardPipeline( // pipeline is already split and we now only need to target a single shard, reassemble the // original pipeline. // - After exhausting 10 attempts to establish the cursors, we give up and throw. - auto cursors = std::vector<RemoteCursor>(); + auto cursors = std::vector<ClusterClientCursorParams::RemoteCursor>(); auto shardResults = std::vector<AsyncRequestsSender::Response>(); auto opCtx = expCtx->opCtx; @@ -537,7 +548,7 @@ BSONObj establishMergingMongosCursor(OperationContext* opCtx, BSONObj cmdToRunOnNewShards, const LiteParsedPipeline& liteParsedPipeline, std::unique_ptr<Pipeline, PipelineDeleter> pipelineForMerging, - std::vector<RemoteCursor> cursors) { + std::vector<ClusterClientCursorParams::RemoteCursor> cursors) { ClusterClientCursorParams params(requestedNss, ReadPreferenceSetting::get(opCtx)); @@ -545,6 +556,7 @@ BSONObj establishMergingMongosCursor(OperationContext* opCtx, params.tailableMode = pipelineForMerging->getContext()->tailableMode; params.mergePipeline = std::move(pipelineForMerging); params.remotes = std::move(cursors); + // A batch size of 0 is legal for the initial aggregate, but not valid for getMores, the batch // size we pass here is used for getMores, so do not specify a batch size if the initial request // had a batch size of 0. @@ -554,19 +566,12 @@ BSONObj establishMergingMongosCursor(OperationContext* opCtx, if (liteParsedPipeline.hasChangeStream()) { // For change streams, we need to set up a custom stage to establish cursors on new shards - // when they are added. Be careful to extract the targeted shard IDs before the remote - // cursors are transferred from the ClusterClientCursorParams to the AsyncResultsMerger. - std::vector<ShardId> shardIds; - for (const auto& remote : params.remotes) { - shardIds.emplace_back(remote.getShardId().toString()); - } - - params.createCustomCursorSource = [cmdToRunOnNewShards, - shardIds](OperationContext* opCtx, - executor::TaskExecutor* executor, - ClusterClientCursorParams* params) { + // when they are added. + params.createCustomCursorSource = [cmdToRunOnNewShards](OperationContext* opCtx, + executor::TaskExecutor* executor, + ClusterClientCursorParams* params) { return stdx::make_unique<RouterStageUpdateOnAddShard>( - opCtx, executor, params, std::move(shardIds), cmdToRunOnNewShards); + opCtx, executor, params, cmdToRunOnNewShards); }; } auto ccc = ClusterClientCursorImpl::make( @@ -615,7 +620,7 @@ BSONObj establishMergingMongosCursor(OperationContext* opCtx, ccc->detachFromOperationContext(); - int nShards = ccc->getNumRemotes(); + int nShards = ccc->getRemotes().size(); CursorId clusterCursorId = 0; if (cursorState == ClusterCursorManager::CursorState::NotExhausted) { @@ -681,8 +686,7 @@ ShardId pickMergingShard(OperationContext* opCtx, return dispatchResults.needsPrimaryShardMerge ? primaryShard : dispatchResults.remoteCursors[prng.nextInt32(dispatchResults.remoteCursors.size())] - .getShardId() - .toString(); + .shardId; } } // namespace @@ -835,16 +839,15 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx, auto executorPool = Grid::get(opCtx)->getExecutorPool(); const BSONObj reply = uassertStatusOK(storePossibleCursor( opCtx, - remoteCursor.getShardId().toString(), - remoteCursor.getHostAndPort(), - remoteCursor.getCursorResponse().toBSON(CursorResponse::ResponseType::InitialResponse), + remoteCursor.shardId, + remoteCursor.hostAndPort, + remoteCursor.cursorResponse.toBSON(CursorResponse::ResponseType::InitialResponse), namespaces.requestedNss, executorPool->getArbitraryExecutor(), Grid::get(opCtx)->getCursorManager(), mergeCtx->tailableMode)); - return appendCursorResponseToCommandResult( - remoteCursor.getShardId().toString(), reply, result); + return appendCursorResponseToCommandResult(remoteCursor.shardId, reply, result); } // If we reach here, we have a merge pipeline to dispatch. @@ -880,10 +883,10 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx, ShardId mergingShardId = pickMergingShard(opCtx, dispatchResults, executionNsRoutingInfo.db().primaryId()); - cluster_aggregation_planner::addMergeCursorsSource( - mergingPipeline.get(), + mergingPipeline->addInitialSource(DocumentSourceMergeCursors::create( std::move(dispatchResults.remoteCursors), - Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor()); + Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(), + mergeCtx)); auto mergeCmdObj = createCommandForMergingShard(request, mergeCtx, cmdObj, mergingPipeline); auto mergeResponse = @@ -978,8 +981,8 @@ Status ClusterAggregate::aggPassthrough(OperationContext* opCtx, namespaces.requestedNss, Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(), Grid::get(opCtx)->getCursorManager(), - liteParsedPipeline.hasChangeStream() ? TailableModeEnum::kTailableAndAwaitData - : TailableModeEnum::kNormal)); + liteParsedPipeline.hasChangeStream() ? TailableMode::kTailableAndAwaitData + : TailableMode::kNormal)); } // First append the properly constructed writeConcernError. It will then be skipped diff --git a/src/mongo/s/commands/pipeline_s.cpp b/src/mongo/s/commands/pipeline_s.cpp index 4d91ddfff08..060c5533877 100644 --- a/src/mongo/s/commands/pipeline_s.cpp +++ b/src/mongo/s/commands/pipeline_s.cpp @@ -114,7 +114,7 @@ boost::optional<Document> PipelineS::MongoSInterface::lookupSingleDocument( cmdBuilder.append(repl::ReadConcernArgs::kReadConcernFieldName, *readConcern); } - auto shardResult = std::vector<RemoteCursor>(); + auto shardResult = std::vector<ClusterClientCursorParams::RemoteCursor>(); auto findCmd = cmdBuilder.obj(); size_t numAttempts = 0; while (++numAttempts <= kMaxNumStaleVersionRetries) { @@ -164,13 +164,13 @@ boost::optional<Document> PipelineS::MongoSInterface::lookupSingleDocument( invariant(shardResult.size() == 1u); - auto& cursor = shardResult.front().getCursorResponse(); + auto& cursor = shardResult.front().cursorResponse; auto& batch = cursor.getBatch(); // We should have at most 1 result, and the cursor should be exhausted. uassert(ErrorCodes::InternalError, str::stream() << "Shard cursor was unexpectedly open after lookup: " - << shardResult.front().getHostAndPort() + << shardResult.front().hostAndPort << ", id: " << cursor.getCursorId(), cursor.getCursorId() == 0); diff --git a/src/mongo/s/commands/strategy.cpp b/src/mongo/s/commands/strategy.cpp index fa70221c4fd..708f52ac576 100644 --- a/src/mongo/s/commands/strategy.cpp +++ b/src/mongo/s/commands/strategy.cpp @@ -585,7 +585,7 @@ DbResponse Strategy::getMore(OperationContext* opCtx, const NamespaceString& nss } uassertStatusOK(statusGetDb); - boost::optional<std::int64_t> batchSize; + boost::optional<long long> batchSize; if (ntoreturn) { batchSize = ntoreturn; } diff --git a/src/mongo/s/query/SConscript b/src/mongo/s/query/SConscript index aa9a1713255..d9148577f5a 100644 --- a/src/mongo/s/query/SConscript +++ b/src/mongo/s/query/SConscript @@ -86,7 +86,6 @@ env.Library( source=[ "async_results_merger.cpp", "establish_cursors.cpp", - env.Idlc('async_results_merger_params.idl')[0], ], LIBDEPS=[ "$BUILD_DIR/mongo/db/query/command_request_response", diff --git a/src/mongo/s/query/async_results_merger.cpp b/src/mongo/s/query/async_results_merger.cpp index 45403399e8c..59d9a133d72 100644 --- a/src/mongo/s/query/async_results_merger.cpp +++ b/src/mongo/s/query/async_results_merger.cpp @@ -82,27 +82,20 @@ int compareSortKeys(BSONObj leftSortKey, BSONObj rightSortKey, BSONObj sortKeyPa AsyncResultsMerger::AsyncResultsMerger(OperationContext* opCtx, executor::TaskExecutor* executor, - AsyncResultsMergerParams params) + ClusterClientCursorParams* params) : _opCtx(opCtx), _executor(executor), - // This strange initialization is to work around the fact that the IDL does not currently - // support a default value for an enum. The default tailable mode should be 'kNormal', but - // since that is not supported we treat boost::none (unspecified) to mean 'kNormal'. - _tailableMode(params.getTailableMode() ? *params.getTailableMode() - : TailableModeEnum::kNormal), - _params(std::move(params)), - _mergeQueue(MergingComparator(_remotes, - _params.getSort() ? *_params.getSort() : BSONObj(), - _params.getCompareWholeSortKey())) { + _params(params), + _mergeQueue(MergingComparator(_remotes, _params->sort, _params->compareWholeSortKey)) { size_t remoteIndex = 0; - for (const auto& remote : _params.getRemotes()) { - _remotes.emplace_back(remote.getHostAndPort(), - remote.getCursorResponse().getNSS(), - remote.getCursorResponse().getCursorId()); + for (const auto& remote : _params->remotes) { + _remotes.emplace_back(remote.hostAndPort, + remote.cursorResponse.getNSS(), + remote.cursorResponse.getCursorId()); // We don't check the return value of _addBatchToBuffer here; if there was an error, // it will be stored in the remote and the first call to ready() will return true. - _addBatchToBuffer(WithLock::withoutLock(), remoteIndex, remote.getCursorResponse()); + _addBatchToBuffer(WithLock::withoutLock(), remoteIndex, remote.cursorResponse); ++remoteIndex; } } @@ -130,7 +123,7 @@ bool AsyncResultsMerger::_remotesExhausted(WithLock) { Status AsyncResultsMerger::setAwaitDataTimeout(Milliseconds awaitDataTimeout) { stdx::lock_guard<stdx::mutex> lk(_mutex); - if (_tailableMode != TailableModeEnum::kTailableAndAwaitData) { + if (_params->tailableMode != TailableMode::kTailableAndAwaitData) { return Status(ErrorCodes::BadValue, "maxTimeMS can only be used with getMore for tailable, awaitData cursors"); } @@ -140,9 +133,9 @@ Status AsyncResultsMerger::setAwaitDataTimeout(Milliseconds awaitDataTimeout) { // recent optimes, which allows us to return sorted $changeStream results even if some shards // are yet to provide a batch of data. If the timeout specified by the client is greater than // 1000ms, then it will be enforced elsewhere. - _awaitDataTimeout = - (_params.getSort() && _remotes.size() > 1u ? std::min(awaitDataTimeout, Milliseconds{1000}) - : awaitDataTimeout); + _awaitDataTimeout = (!_params->sort.isEmpty() && _remotes.size() > 1u + ? std::min(awaitDataTimeout, Milliseconds{1000}) + : awaitDataTimeout); return Status::OK(); } @@ -168,12 +161,13 @@ void AsyncResultsMerger::reattachToOperationContext(OperationContext* opCtx) { _opCtx = opCtx; } -void AsyncResultsMerger::addNewShardCursors(std::vector<RemoteCursor>&& newCursors) { +void AsyncResultsMerger::addNewShardCursors( + const std::vector<ClusterClientCursorParams::RemoteCursor>& newCursors) { stdx::lock_guard<stdx::mutex> lk(_mutex); for (auto&& remote : newCursors) { - _remotes.emplace_back(remote.getHostAndPort(), - remote.getCursorResponse().getNSS(), - remote.getCursorResponse().getCursorId()); + _remotes.emplace_back(remote.hostAndPort, + remote.cursorResponse.getNSS(), + remote.cursorResponse.getCursorId()); } } @@ -196,15 +190,16 @@ bool AsyncResultsMerger::_ready(WithLock lk) { } } - return _params.getSort() ? _readySorted(lk) : _readyUnsorted(lk); + const bool hasSort = !_params->sort.isEmpty(); + return hasSort ? _readySorted(lk) : _readyUnsorted(lk); } bool AsyncResultsMerger::_readySorted(WithLock lk) { - if (_tailableMode == TailableModeEnum::kTailableAndAwaitData) { + if (_params->tailableMode == TailableMode::kTailableAndAwaitData) { return _readySortedTailable(lk); } // Tailable non-awaitData cursors cannot have a sort. - invariant(_tailableMode == TailableModeEnum::kNormal); + invariant(_params->tailableMode == TailableMode::kNormal); for (const auto& remote : _remotes) { if (!remote.hasNext() && !remote.exhausted()) { @@ -223,14 +218,13 @@ bool AsyncResultsMerger::_readySortedTailable(WithLock) { auto smallestRemote = _mergeQueue.top(); auto smallestResult = _remotes[smallestRemote].docBuffer.front(); auto keyWeWantToReturn = - extractSortKey(*smallestResult.getResult(), _params.getCompareWholeSortKey()); + extractSortKey(*smallestResult.getResult(), _params->compareWholeSortKey); for (const auto& remote : _remotes) { if (!remote.promisedMinSortKey) { // In order to merge sorted tailable cursors, we need this value to be populated. return false; } - if (compareSortKeys(keyWeWantToReturn, *remote.promisedMinSortKey, *_params.getSort()) > - 0) { + if (compareSortKeys(keyWeWantToReturn, *remote.promisedMinSortKey, _params->sort) > 0) { // The key we want to return is not guaranteed to be smaller than future results from // this remote, so we can't yet return it. return false; @@ -270,12 +264,13 @@ StatusWith<ClusterQueryResult> AsyncResultsMerger::nextReady() { return {ClusterQueryResult()}; } - return _params.getSort() ? _nextReadySorted(lk) : _nextReadyUnsorted(lk); + const bool hasSort = !_params->sort.isEmpty(); + return hasSort ? _nextReadySorted(lk) : _nextReadyUnsorted(lk); } ClusterQueryResult AsyncResultsMerger::_nextReadySorted(WithLock) { // Tailable non-awaitData cursors cannot have a sort. - invariant(_tailableMode != TailableModeEnum::kTailable); + invariant(_params->tailableMode != TailableMode::kTailable); if (_mergeQueue.empty()) { return {}; @@ -309,7 +304,7 @@ ClusterQueryResult AsyncResultsMerger::_nextReadyUnsorted(WithLock) { ClusterQueryResult front = _remotes[_gettingFromRemote].docBuffer.front(); _remotes[_gettingFromRemote].docBuffer.pop(); - if (_tailableMode == TailableModeEnum::kTailable && + if (_params->tailableMode == TailableMode::kTailable && !_remotes[_gettingFromRemote].hasNext()) { // The cursor is tailable and we're about to return the last buffered result. This // means that the next value returned should be boost::none to indicate the end of @@ -339,9 +334,9 @@ Status AsyncResultsMerger::_askForNextBatch(WithLock, size_t remoteIndex) { // request to fetch the remaining docs only. If the remote node has a plan with OR for top k and // a full sort as is the case for the OP_QUERY find then this optimization will prevent // switching to the full sort plan branch. - auto adjustedBatchSize = _params.getBatchSize(); - if (_params.getBatchSize() && *_params.getBatchSize() > remote.fetchedCount) { - adjustedBatchSize = *_params.getBatchSize() - remote.fetchedCount; + auto adjustedBatchSize = _params->batchSize; + if (_params->batchSize && *_params->batchSize > remote.fetchedCount) { + adjustedBatchSize = *_params->batchSize - remote.fetchedCount; } BSONObj cmdObj = GetMoreRequest(remote.cursorNss, @@ -353,7 +348,7 @@ Status AsyncResultsMerger::_askForNextBatch(WithLock, size_t remoteIndex) { .toBSON(); executor::RemoteCommandRequest request( - remote.getTargetHost(), _params.getNss().db().toString(), cmdObj, _opCtx); + remote.getTargetHost(), _params->nsString.db().toString(), cmdObj, _opCtx); auto callbackStatus = _executor->scheduleRemoteCommand(request, [this, remoteIndex](auto const& cbData) { @@ -453,8 +448,7 @@ void AsyncResultsMerger::updateRemoteMetadata(RemoteCursorData* remote, remote->cursorId = response.getCursorId(); if (response.getLastOplogTimestamp() && !response.getLastOplogTimestamp()->isNull()) { // We only expect to see this for change streams. - invariant(_params.getSort()); - invariant(SimpleBSONObjComparator::kInstance.evaluate(*_params.getSort() == + invariant(SimpleBSONObjComparator::kInstance.evaluate(_params->sort == change_stream_constants::kSortSpec)); auto newLatestTimestamp = *response.getLastOplogTimestamp(); @@ -481,7 +475,7 @@ void AsyncResultsMerger::updateRemoteMetadata(RemoteCursorData* remote, auto maxSortKeyFromResponse = (response.getBatch().empty() ? BSONObj() - : extractSortKey(response.getBatch().back(), _params.getCompareWholeSortKey())); + : extractSortKey(response.getBatch().back(), _params->compareWholeSortKey)); remote->promisedMinSortKey = (compareSortKeys( @@ -531,7 +525,7 @@ void AsyncResultsMerger::_cleanUpFailedBatch(WithLock lk, Status status, size_t remote.status = std::move(status); // Unreachable host errors are swallowed if the 'allowPartialResults' option is set. We // remove the unreachable host entirely from consideration by marking it as exhausted. - if (_params.getAllowPartialResults()) { + if (_params->isAllowPartialResults) { remote.status = Status::OK(); // Clear the results buffer and cursor id. @@ -571,7 +565,7 @@ void AsyncResultsMerger::_processBatchResults(WithLock lk, // through to the client as-is. // (Note: tailable cursors are only valid on unsharded collections, so the end of the batch from // one shard means the end of the overall batch). - if (_tailableMode == TailableModeEnum::kTailable && !remote.hasNext()) { + if (_params->tailableMode == TailableMode::kTailable && !remote.hasNext()) { invariant(_remotes.size() == 1); _eofNext = true; } else if (!remote.hasNext() && !remote.exhausted() && _lifecycleState == kAlive) { @@ -588,7 +582,7 @@ bool AsyncResultsMerger::_addBatchToBuffer(WithLock lk, updateRemoteMetadata(&remote, response); for (const auto& obj : response.getBatch()) { // If there's a sort, we're expecting the remote node to have given us back a sort key. - if (_params.getSort()) { + if (!_params->sort.isEmpty()) { auto key = obj[AsyncResultsMerger::kSortKeyField]; if (!key) { remote.status = @@ -597,7 +591,7 @@ bool AsyncResultsMerger::_addBatchToBuffer(WithLock lk, << "' in document: " << obj); return false; - } else if (!_params.getCompareWholeSortKey() && key.type() != BSONType::Object) { + } else if (!_params->compareWholeSortKey && key.type() != BSONType::Object) { remote.status = Status(ErrorCodes::InternalError, str::stream() << "Field '" << AsyncResultsMerger::kSortKeyField @@ -612,9 +606,9 @@ bool AsyncResultsMerger::_addBatchToBuffer(WithLock lk, ++remote.fetchedCount; } - // If we're doing a sorted merge, then we have to make sure to put this remote onto the merge - // queue. - if (_params.getSort() && !response.getBatch().empty()) { + // If we're doing a sorted merge, then we have to make sure to put this remote onto the + // merge queue. + if (!_params->sort.isEmpty() && !response.getBatch().empty()) { _mergeQueue.push(remoteIndex); } return true; @@ -644,10 +638,10 @@ void AsyncResultsMerger::_scheduleKillCursors(WithLock, OperationContext* opCtx) for (const auto& remote : _remotes) { if (remote.status.isOK() && remote.cursorId && !remote.exhausted()) { - BSONObj cmdObj = KillCursorsRequest(_params.getNss(), {remote.cursorId}).toBSON(); + BSONObj cmdObj = KillCursorsRequest(_params->nsString, {remote.cursorId}).toBSON(); executor::RemoteCommandRequest request( - remote.getTargetHost(), _params.getNss().db().toString(), cmdObj, opCtx); + remote.getTargetHost(), _params->nsString.db().toString(), cmdObj, opCtx); // Send kill request; discard callback handle, if any, or failure report, if not. _executor->scheduleRemoteCommand(request, [](auto const&) {}).getStatus().ignore(); diff --git a/src/mongo/s/query/async_results_merger.h b/src/mongo/s/query/async_results_merger.h index 1653374b5bc..e2551a3c7dd 100644 --- a/src/mongo/s/query/async_results_merger.h +++ b/src/mongo/s/query/async_results_merger.h @@ -37,7 +37,7 @@ #include "mongo/bson/bsonobj.h" #include "mongo/db/cursor_id.h" #include "mongo/executor/task_executor.h" -#include "mongo/s/query/async_results_merger_params_gen.h" +#include "mongo/s/query/cluster_client_cursor_params.h" #include "mongo/s/query/cluster_query_result.h" #include "mongo/stdx/mutex.h" #include "mongo/util/concurrency/with_lock.h" @@ -98,7 +98,7 @@ public: */ AsyncResultsMerger(OperationContext* opCtx, executor::TaskExecutor* executor, - AsyncResultsMergerParams params); + ClusterClientCursorParams* params); /** * In order to be destroyed, either the ARM must have been kill()'ed or all cursors must have @@ -195,11 +195,7 @@ public: * Adds the specified shard cursors to the set of cursors to be merged. The results from the * new cursors will be returned as normal through nextReady(). */ - void addNewShardCursors(std::vector<RemoteCursor>&& newCursors); - - std::size_t getNumRemotes() const { - return _remotes.size(); - } + void addNewShardCursors(const std::vector<ClusterClientCursorParams::RemoteCursor>& newCursors); /** * Starts shutting down this ARM by canceling all pending requests and scheduling killCursors @@ -297,7 +293,7 @@ private: private: const std::vector<RemoteCursorData>& _remotes; - const BSONObj _sort; + const BSONObj& _sort; // When '_compareWholeSortKey' is true, $sortKey is a scalar value, rather than an object. // We extract the sort key {$sortKey: <value>}. The sort key pattern '_sort' is verified to @@ -405,8 +401,7 @@ private: OperationContext* _opCtx; executor::TaskExecutor* _executor; - TailableModeEnum _tailableMode; - AsyncResultsMergerParams _params; + ClusterClientCursorParams* _params; // Must be acquired before accessing any data members (other than _params, which is read-only). stdx::mutex _mutex; diff --git a/src/mongo/s/query/async_results_merger_params.idl b/src/mongo/s/query/async_results_merger_params.idl deleted file mode 100644 index dafc9b53c1c..00000000000 --- a/src/mongo/s/query/async_results_merger_params.idl +++ /dev/null @@ -1,89 +0,0 @@ -# Copyright (C) 2018 MongoDB Inc. -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU Affero General Public License, version 3, -# as published by the Free Software Foundation. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU Affero General Public License for more details. -# -# You should have received a copy of the GNU Affero General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. -# -# As a special exception, the copyright holders give permission to link the -# code of portions of this program with the OpenSSL library under certain -# conditions as described in each individual source file and distribute -# linked combinations including the program with the OpenSSL library. You -# must comply with the GNU Affero General Public License in all respects for -# all of the code used other than as permitted herein. If you modify file(s) -# with this exception, you may extend this exception to your version of the -# file(s), but you are not obligated to do so. If you do not wish to do so, -# delete this exception statement from your version. If you delete this -# exception statement from all source files in the program, then also delete -# it in the license file. - -global: - cpp_namespace: "mongo" - cpp_includes: - - "mongo/s/shard_id.h" - - "mongo/util/net/hostandport.h" - - "mongo/db/query/cursor_response.h" - -imports: - - "mongo/db/query/tailable_mode.idl" - - "mongo/idl/basic_types.idl" - - "mongo/util/net/hostandport.idl" - -types: - CursorResponse: - bson_serialization_type: object - description: The first batch returned after establishing cursors on a shard. - cpp_type: CursorResponse - serializer: CursorResponse::toBSONAsInitialResponse - deserializer: CursorResponse::parseFromBSONThrowing - -structs: - RemoteCursor: - description: A description of a cursor opened on a remote server. - fields: - shardId: - type: string - description: The shardId of the shard on which the cursor resides. - hostAndPort: - type: HostAndPort - description: The exact host (within the shard) on which the cursor resides. - cursorResponse: - type: CursorResponse - description: The response after establishing a cursor on the remote shard, including - the first batch. - - AsyncResultsMergerParams: - description: The parameters needed to establish an AsyncResultsMerger. - fields: - sort: - type: object - description: The sort requested on the merging operation. Empty if there is no sort. - optional: true - compareWholeSortKey: - type: bool - default: false - description: >- - When 'compareWholeSortKey' is true, $sortKey is a scalar value, rather than an - object. We extract the sort key {$sortKey: <value>}. The sort key pattern is - verified to be {$sortKey: 1}. - remotes: array<RemoteCursor> - tailableMode: - type: TailableMode - optional: true - description: If set, the tailability mode of this cursor. - batchSize: - type: safeInt64 - optional: true - description: The batch size for this cursor. - nss: namespacestring - allowPartialResults: - type: bool - default: false - description: If set, error responses are ignored. diff --git a/src/mongo/s/query/async_results_merger_test.cpp b/src/mongo/s/query/async_results_merger_test.cpp index 6fd81715e90..4c0e32cba51 100644 --- a/src/mongo/s/query/async_results_merger_test.cpp +++ b/src/mongo/s/query/async_results_merger_test.cpp @@ -62,11 +62,9 @@ const std::vector<HostAndPort> kTestShardHosts = {HostAndPort("FakeShard1Host", HostAndPort("FakeShard2Host", 12345), HostAndPort("FakeShard3Host", 12345)}; -const NamespaceString kTestNss("testdb.testcoll"); - class AsyncResultsMergerTest : public ShardingTestFixture { public: - AsyncResultsMergerTest() {} + AsyncResultsMergerTest() : _nss("testdb.testcoll") {} void setUp() override { ShardingTestFixture::setUp(); @@ -97,48 +95,42 @@ public: void tearDown() override { ShardingTestFixture::tearDown(); + // Reset _params only after shutting down the network interface (through + // ShardingTestFixture::tearDown()), because shutting down the network interface will + // deliver blackholed responses to the AsyncResultsMerger, and the AsyncResultsMerger's + // callback may still access _params. + _params.reset(); } protected: /** * Constructs an ARM with the given vector of existing cursors. * - * If 'findCmd' is not set, the default AsyncResultsMergerParams are used. - * Otherwise, the 'findCmd' is used to construct the AsyncResultsMergerParams. + * If 'findCmd' is not set, the default ClusterClientCursorParams are used. + * Otherwise, the 'findCmd' is used to construct the ClusterClientCursorParams. * * 'findCmd' should not have a 'batchSize', since the find's batchSize is used just in the * initial find. The getMore 'batchSize' can be passed in through 'getMoreBatchSize.' */ - std::unique_ptr<AsyncResultsMerger> makeARMFromExistingCursors( - std::vector<RemoteCursor> remoteCursors, + void makeCursorFromExistingCursors( + std::vector<ClusterClientCursorParams::RemoteCursor> remoteCursors, boost::optional<BSONObj> findCmd = boost::none, - boost::optional<std::int64_t> getMoreBatchSize = boost::none) { - AsyncResultsMergerParams params; - params.setNss(kTestNss); - params.setRemotes(std::move(remoteCursors)); - + boost::optional<long long> getMoreBatchSize = boost::none) { + _params = stdx::make_unique<ClusterClientCursorParams>(_nss); + _params->remotes = std::move(remoteCursors); if (findCmd) { const auto qr = unittest::assertGet( - QueryRequest::makeFromFindCommand(kTestNss, *findCmd, false /* isExplain */)); - if (!qr->getSort().isEmpty()) { - params.setSort(qr->getSort().getOwned()); - } - - if (getMoreBatchSize) { - params.setBatchSize(getMoreBatchSize); - } else { - params.setBatchSize(qr->getBatchSize() - ? boost::optional<std::int64_t>( - static_cast<std::int64_t>(*qr->getBatchSize())) - : boost::none); - } - params.setTailableMode(qr->getTailableMode()); - params.setAllowPartialResults(qr->isAllowPartialResults()); + QueryRequest::makeFromFindCommand(_nss, *findCmd, false /* isExplain */)); + _params->sort = qr->getSort(); + _params->limit = qr->getLimit(); + _params->batchSize = getMoreBatchSize ? getMoreBatchSize : qr->getBatchSize(); + _params->skip = qr->getSkip(); + _params->tailableMode = qr->getTailableMode(); + _params->isAllowPartialResults = qr->isAllowPartialResults(); } - return stdx::make_unique<AsyncResultsMerger>( - operationContext(), executor(), std::move(params)); + arm = stdx::make_unique<AsyncResultsMerger>(operationContext(), executor(), _params.get()); } /** @@ -228,6 +220,11 @@ protected: net->blackHole(net->getNextReadyRequest()); net->exitNetwork(); } + + const NamespaceString _nss; + std::unique_ptr<ClusterClientCursorParams> _params; + + std::unique_ptr<AsyncResultsMerger> arm; }; void assertKillCusorsCmdHasCursorId(const BSONObj& killCmd, CursorId cursorId) { @@ -243,19 +240,10 @@ void assertKillCusorsCmdHasCursorId(const BSONObj& killCmd, CursorId cursorId) { ASSERT_EQ(numCursors, 1u); } -RemoteCursor makeRemoteCursor(ShardId shardId, HostAndPort host, CursorResponse response) { - RemoteCursor remoteCursor; - remoteCursor.setShardId(std::move(shardId)); - remoteCursor.setHostAndPort(std::move(host)); - remoteCursor.setCursorResponse(std::move(response)); - return remoteCursor; -} - TEST_F(AsyncResultsMergerTest, SingleShardUnsorted) { - std::vector<RemoteCursor> cursors; - cursors.push_back( - makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 5, {}))); - auto arm = makeARMFromExistingCursors(std::move(cursors)); + std::vector<ClusterClientCursorParams::RemoteCursor> cursors; + cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 5, {})); + makeCursorFromExistingCursors(std::move(cursors)); // Before any requests are scheduled, ARM is not ready to return results. ASSERT_FALSE(arm->ready()); @@ -271,7 +259,7 @@ TEST_F(AsyncResultsMergerTest, SingleShardUnsorted) { // Shard responds; the handleBatchResponse callbacks are run and ARM's remotes get updated. std::vector<CursorResponse> responses; std::vector<BSONObj> batch = {fromjson("{_id: 1}"), fromjson("{_id: 2}"), fromjson("{_id: 3}")}; - responses.emplace_back(kTestNss, CursorId(0), batch); + responses.emplace_back(_nss, CursorId(0), batch); scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::SubsequentResponse); @@ -297,10 +285,9 @@ TEST_F(AsyncResultsMergerTest, SingleShardUnsorted) { TEST_F(AsyncResultsMergerTest, SingleShardSorted) { BSONObj findCmd = fromjson("{find: 'testcoll', sort: {_id: 1}}"); - std::vector<RemoteCursor> cursors; - cursors.push_back( - makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 5, {}))); - auto arm = makeARMFromExistingCursors(std::move(cursors), findCmd); + std::vector<ClusterClientCursorParams::RemoteCursor> cursors; + cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 5, {})); + makeCursorFromExistingCursors(std::move(cursors), findCmd); // Before any requests are scheduled, ARM is not ready to return results. ASSERT_FALSE(arm->ready()); @@ -316,7 +303,7 @@ TEST_F(AsyncResultsMergerTest, SingleShardSorted) { // Shard responds; the handleBatchResponse callbacks are run and ARM's remotes get updated. std::vector<CursorResponse> responses; std::vector<BSONObj> batch = {fromjson("{$sortKey: {'': 5}}"), fromjson("{$sortKey: {'': 6}}")}; - responses.emplace_back(kTestNss, CursorId(0), batch); + responses.emplace_back(_nss, CursorId(0), batch); scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::SubsequentResponse); @@ -341,12 +328,10 @@ TEST_F(AsyncResultsMergerTest, SingleShardSorted) { } TEST_F(AsyncResultsMergerTest, MultiShardUnsorted) { - std::vector<RemoteCursor> cursors; - cursors.push_back( - makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 5, {}))); - cursors.push_back( - makeRemoteCursor(kTestShardIds[1], kTestShardHosts[1], CursorResponse(kTestNss, 6, {}))); - auto arm = makeARMFromExistingCursors(std::move(cursors)); + std::vector<ClusterClientCursorParams::RemoteCursor> cursors; + cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 5, {})); + cursors.emplace_back(kTestShardIds[1], kTestShardHosts[1], CursorResponse(_nss, 6, {})); + makeCursorFromExistingCursors(std::move(cursors)); // Before any requests are scheduled, ARM is not ready to return results. ASSERT_FALSE(arm->ready()); @@ -363,7 +348,7 @@ TEST_F(AsyncResultsMergerTest, MultiShardUnsorted) { std::vector<CursorResponse> responses; std::vector<BSONObj> batch1 = { fromjson("{_id: 1}"), fromjson("{_id: 2}"), fromjson("{_id: 3}")}; - responses.emplace_back(kTestNss, CursorId(0), batch1); + responses.emplace_back(_nss, CursorId(0), batch1); scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::SubsequentResponse); @@ -391,7 +376,7 @@ TEST_F(AsyncResultsMergerTest, MultiShardUnsorted) { responses.clear(); std::vector<BSONObj> batch2 = { fromjson("{_id: 4}"), fromjson("{_id: 5}"), fromjson("{_id: 6}")}; - responses.emplace_back(kTestNss, CursorId(0), batch2); + responses.emplace_back(_nss, CursorId(0), batch2); scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::SubsequentResponse); @@ -407,20 +392,18 @@ TEST_F(AsyncResultsMergerTest, MultiShardUnsorted) { ASSERT_TRUE(arm->ready()); ASSERT_BSONOBJ_EQ(fromjson("{_id: 6}"), *unittest::assertGet(arm->nextReady()).getResult()); - // After returning all the buffered results, the ARM returns EOF immediately because both shards - // cursors were exhausted. + // After returning all the buffered results, the ARM returns EOF immediately because both + // shards cursors were exhausted. ASSERT_TRUE(arm->ready()); ASSERT_TRUE(unittest::assertGet(arm->nextReady()).isEOF()); } TEST_F(AsyncResultsMergerTest, MultiShardSorted) { BSONObj findCmd = fromjson("{find: 'testcoll', sort: {_id: 1}}"); - std::vector<RemoteCursor> cursors; - cursors.push_back( - makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 5, {}))); - cursors.push_back( - makeRemoteCursor(kTestShardIds[1], kTestShardHosts[1], CursorResponse(kTestNss, 6, {}))); - auto arm = makeARMFromExistingCursors(std::move(cursors), findCmd); + std::vector<ClusterClientCursorParams::RemoteCursor> cursors; + cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 5, {})); + cursors.emplace_back(kTestShardIds[1], kTestShardHosts[1], CursorResponse(_nss, 6, {})); + makeCursorFromExistingCursors(std::move(cursors), findCmd); // Before any requests are scheduled, ARM is not ready to return results. ASSERT_FALSE(arm->ready()); @@ -437,7 +420,7 @@ TEST_F(AsyncResultsMergerTest, MultiShardSorted) { std::vector<CursorResponse> responses; std::vector<BSONObj> batch1 = {fromjson("{$sortKey: {'': 5}}"), fromjson("{$sortKey: {'': 6}}")}; - responses.emplace_back(kTestNss, CursorId(0), batch1); + responses.emplace_back(_nss, CursorId(0), batch1); scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::SubsequentResponse); @@ -451,7 +434,7 @@ TEST_F(AsyncResultsMergerTest, MultiShardSorted) { responses.clear(); std::vector<BSONObj> batch2 = {fromjson("{$sortKey: {'': 3}}"), fromjson("{$sortKey: {'': 9}}")}; - responses.emplace_back(kTestNss, CursorId(0), batch2); + responses.emplace_back(_nss, CursorId(0), batch2); scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::SubsequentResponse); @@ -473,19 +456,17 @@ TEST_F(AsyncResultsMergerTest, MultiShardSorted) { ASSERT_BSONOBJ_EQ(fromjson("{$sortKey: {'': 9}}"), *unittest::assertGet(arm->nextReady()).getResult()); - // After returning all the buffered results, the ARM returns EOF immediately because both shards - // cursors were exhausted. + // After returning all the buffered results, the ARM returns EOF immediately because both + // shards cursors were exhausted. ASSERT_TRUE(arm->ready()); ASSERT_TRUE(unittest::assertGet(arm->nextReady()).isEOF()); } TEST_F(AsyncResultsMergerTest, MultiShardMultipleGets) { - std::vector<RemoteCursor> cursors; - cursors.push_back( - makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 5, {}))); - cursors.push_back( - makeRemoteCursor(kTestShardIds[1], kTestShardHosts[1], CursorResponse(kTestNss, 6, {}))); - auto arm = makeARMFromExistingCursors(std::move(cursors)); + std::vector<ClusterClientCursorParams::RemoteCursor> cursors; + cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 5, {})); + cursors.emplace_back(kTestShardIds[1], kTestShardHosts[1], CursorResponse(_nss, 6, {})); + makeCursorFromExistingCursors(std::move(cursors)); // Before any requests are scheduled, ARM is not ready to return results. ASSERT_FALSE(arm->ready()); @@ -498,7 +479,7 @@ TEST_F(AsyncResultsMergerTest, MultiShardMultipleGets) { std::vector<CursorResponse> responses; std::vector<BSONObj> batch1 = { fromjson("{_id: 1}"), fromjson("{_id: 2}"), fromjson("{_id: 3}")}; - responses.emplace_back(kTestNss, CursorId(5), batch1); + responses.emplace_back(_nss, CursorId(5), batch1); scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::SubsequentResponse); @@ -527,7 +508,7 @@ TEST_F(AsyncResultsMergerTest, MultiShardMultipleGets) { responses.clear(); std::vector<BSONObj> batch2 = { fromjson("{_id: 4}"), fromjson("{_id: 5}"), fromjson("{_id: 6}")}; - responses.emplace_back(kTestNss, CursorId(0), batch2); + responses.emplace_back(_nss, CursorId(0), batch2); scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::SubsequentResponse); @@ -555,7 +536,7 @@ TEST_F(AsyncResultsMergerTest, MultiShardMultipleGets) { responses.clear(); std::vector<BSONObj> batch3 = { fromjson("{_id: 7}"), fromjson("{_id: 8}"), fromjson("{_id: 9}")}; - responses.emplace_back(kTestNss, CursorId(0), batch3); + responses.emplace_back(_nss, CursorId(0), batch3); scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::SubsequentResponse); @@ -571,22 +552,19 @@ TEST_F(AsyncResultsMergerTest, MultiShardMultipleGets) { ASSERT_TRUE(arm->ready()); ASSERT_BSONOBJ_EQ(fromjson("{_id: 9}"), *unittest::assertGet(arm->nextReady()).getResult()); - // After returning all the buffered results, the ARM returns EOF immediately because both shards - // cursors were exhausted. + // After returning all the buffered results, the ARM returns EOF immediately because both + // shards cursors were exhausted. ASSERT_TRUE(arm->ready()); ASSERT_TRUE(unittest::assertGet(arm->nextReady()).isEOF()); } TEST_F(AsyncResultsMergerTest, CompoundSortKey) { BSONObj findCmd = fromjson("{find: 'testcoll', sort: {a: -1, b: 1}}"); - std::vector<RemoteCursor> cursors; - cursors.push_back( - makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 5, {}))); - cursors.push_back( - makeRemoteCursor(kTestShardIds[1], kTestShardHosts[1], CursorResponse(kTestNss, 6, {}))); - cursors.push_back( - makeRemoteCursor(kTestShardIds[2], kTestShardHosts[2], CursorResponse(kTestNss, 7, {}))); - auto arm = makeARMFromExistingCursors(std::move(cursors), findCmd); + std::vector<ClusterClientCursorParams::RemoteCursor> cursors; + cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 5, {})); + cursors.emplace_back(kTestShardIds[1], kTestShardHosts[1], CursorResponse(_nss, 6, {})); + cursors.emplace_back(kTestShardIds[2], kTestShardHosts[2], CursorResponse(_nss, 7, {})); + makeCursorFromExistingCursors(std::move(cursors), findCmd); // Schedule requests. ASSERT_FALSE(arm->ready()); @@ -597,13 +575,13 @@ TEST_F(AsyncResultsMergerTest, CompoundSortKey) { std::vector<CursorResponse> responses; std::vector<BSONObj> batch1 = {fromjson("{$sortKey: {'': 5, '': 9}}"), fromjson("{$sortKey: {'': 4, '': 20}}")}; - responses.emplace_back(kTestNss, CursorId(0), batch1); + responses.emplace_back(_nss, CursorId(0), batch1); std::vector<BSONObj> batch2 = {fromjson("{$sortKey: {'': 10, '': 11}}"), fromjson("{$sortKey: {'': 4, '': 4}}")}; - responses.emplace_back(kTestNss, CursorId(0), batch2); + responses.emplace_back(_nss, CursorId(0), batch2); std::vector<BSONObj> batch3 = {fromjson("{$sortKey: {'': 10, '': 12}}"), fromjson("{$sortKey: {'': 5, '': 9}}")}; - responses.emplace_back(kTestNss, CursorId(0), batch3); + responses.emplace_back(_nss, CursorId(0), batch3); scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::SubsequentResponse); executor()->waitForEvent(readyEvent); @@ -629,18 +607,17 @@ TEST_F(AsyncResultsMergerTest, CompoundSortKey) { ASSERT_BSONOBJ_EQ(fromjson("{$sortKey: {'': 4, '': 20}}"), *unittest::assertGet(arm->nextReady()).getResult()); - // After returning all the buffered results, the ARM returns EOF immediately because both shards - // cursors were exhausted. + // After returning all the buffered results, the ARM returns EOF immediately because both + // shards cursors were exhausted. ASSERT_TRUE(arm->ready()); ASSERT_TRUE(unittest::assertGet(arm->nextReady()).isEOF()); } TEST_F(AsyncResultsMergerTest, SortedButNoSortKey) { BSONObj findCmd = fromjson("{find: 'testcoll', sort: {a: -1, b: 1}}"); - std::vector<RemoteCursor> cursors; - cursors.push_back( - makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {}))); - auto arm = makeARMFromExistingCursors(std::move(cursors), findCmd); + std::vector<ClusterClientCursorParams::RemoteCursor> cursors; + cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 1, {})); + makeCursorFromExistingCursors(std::move(cursors), findCmd); ASSERT_FALSE(arm->ready()); auto readyEvent = unittest::assertGet(arm->nextEvent()); @@ -649,7 +626,7 @@ TEST_F(AsyncResultsMergerTest, SortedButNoSortKey) { // Parsing the batch results in an error because the sort key is missing. std::vector<CursorResponse> responses; std::vector<BSONObj> batch1 = {fromjson("{a: 2, b: 1}"), fromjson("{a: 1, b: 2}")}; - responses.emplace_back(kTestNss, CursorId(1), batch1); + responses.emplace_back(_nss, CursorId(1), batch1); scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::SubsequentResponse); executor()->waitForEvent(readyEvent); @@ -667,10 +644,10 @@ TEST_F(AsyncResultsMergerTest, SortedButNoSortKey) { TEST_F(AsyncResultsMergerTest, HasFirstBatch) { std::vector<BSONObj> firstBatch = { fromjson("{_id: 1}"), fromjson("{_id: 2}"), fromjson("{_id: 3}")}; - std::vector<RemoteCursor> cursors; - cursors.push_back(makeRemoteCursor( - kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 5, std::move(firstBatch)))); - auto arm = makeARMFromExistingCursors(std::move(cursors)); + std::vector<ClusterClientCursorParams::RemoteCursor> cursors; + cursors.emplace_back( + kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 5, std::move(firstBatch))); + makeCursorFromExistingCursors(std::move(cursors)); // Because there was firstBatch, ARM is immediately ready to return results. ASSERT_TRUE(arm->ready()); @@ -698,7 +675,7 @@ TEST_F(AsyncResultsMergerTest, HasFirstBatch) { // Shard responds; the handleBatchResponse callbacks are run and ARM's remotes get updated. std::vector<CursorResponse> responses; std::vector<BSONObj> batch = {fromjson("{_id: 4}"), fromjson("{_id: 5}"), fromjson("{_id: 6}")}; - responses.emplace_back(kTestNss, CursorId(0), batch); + responses.emplace_back(_nss, CursorId(0), batch); scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::SubsequentResponse); @@ -725,12 +702,11 @@ TEST_F(AsyncResultsMergerTest, HasFirstBatch) { TEST_F(AsyncResultsMergerTest, OneShardHasInitialBatchOtherShardExhausted) { std::vector<BSONObj> firstBatch = { fromjson("{_id: 1}"), fromjson("{_id: 2}"), fromjson("{_id: 3}")}; - std::vector<RemoteCursor> cursors; - cursors.push_back(makeRemoteCursor( - kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 5, std::move(firstBatch)))); - cursors.push_back( - makeRemoteCursor(kTestShardIds[1], kTestShardHosts[1], CursorResponse(kTestNss, 0, {}))); - auto arm = makeARMFromExistingCursors(std::move(cursors)); + std::vector<ClusterClientCursorParams::RemoteCursor> cursors; + cursors.emplace_back( + kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 5, std::move(firstBatch))); + cursors.emplace_back(kTestShardIds[1], kTestShardHosts[1], CursorResponse(_nss, 0, {})); + makeCursorFromExistingCursors(std::move(cursors)); // Because there was firstBatch, ARM is immediately ready to return results. ASSERT_TRUE(arm->ready()); @@ -758,7 +734,7 @@ TEST_F(AsyncResultsMergerTest, OneShardHasInitialBatchOtherShardExhausted) { // Shard responds; the handleBatchResponse callbacks are run and ARM's remotes get updated. std::vector<CursorResponse> responses; std::vector<BSONObj> batch = {fromjson("{_id: 4}"), fromjson("{_id: 5}"), fromjson("{_id: 6}")}; - responses.emplace_back(kTestNss, CursorId(0), batch); + responses.emplace_back(_nss, CursorId(0), batch); scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::SubsequentResponse); @@ -783,12 +759,10 @@ TEST_F(AsyncResultsMergerTest, OneShardHasInitialBatchOtherShardExhausted) { } TEST_F(AsyncResultsMergerTest, StreamResultsFromOneShardIfOtherDoesntRespond) { - std::vector<RemoteCursor> cursors; - cursors.push_back( - makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {}))); - cursors.push_back( - makeRemoteCursor(kTestShardIds[1], kTestShardHosts[1], CursorResponse(kTestNss, 2, {}))); - auto arm = makeARMFromExistingCursors(std::move(cursors)); + std::vector<ClusterClientCursorParams::RemoteCursor> cursors; + cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 1, {})); + cursors.emplace_back(kTestShardIds[1], kTestShardHosts[1], CursorResponse(_nss, 2, {})); + makeCursorFromExistingCursors(std::move(cursors)); ASSERT_FALSE(arm->ready()); auto readyEvent = unittest::assertGet(arm->nextEvent()); @@ -797,9 +771,9 @@ TEST_F(AsyncResultsMergerTest, StreamResultsFromOneShardIfOtherDoesntRespond) { // Both shards respond with the first batch. std::vector<CursorResponse> responses; std::vector<BSONObj> batch1 = {fromjson("{_id: 1}"), fromjson("{_id: 2}")}; - responses.emplace_back(kTestNss, CursorId(1), batch1); + responses.emplace_back(_nss, CursorId(1), batch1); std::vector<BSONObj> batch2 = {fromjson("{_id: 3}"), fromjson("{_id: 4}")}; - responses.emplace_back(kTestNss, CursorId(2), batch2); + responses.emplace_back(_nss, CursorId(2), batch2); scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::SubsequentResponse); executor()->waitForEvent(readyEvent); @@ -821,7 +795,7 @@ TEST_F(AsyncResultsMergerTest, StreamResultsFromOneShardIfOtherDoesntRespond) { // never responds. responses.clear(); std::vector<BSONObj> batch3 = {fromjson("{_id: 5}"), fromjson("{_id: 6}")}; - responses.emplace_back(kTestNss, CursorId(1), batch3); + responses.emplace_back(_nss, CursorId(1), batch3); scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::SubsequentResponse); blackHoleNextRequest(); @@ -839,7 +813,7 @@ TEST_F(AsyncResultsMergerTest, StreamResultsFromOneShardIfOtherDoesntRespond) { // We can continue to return results from first shard, while second shard remains unresponsive. responses.clear(); std::vector<BSONObj> batch4 = {fromjson("{_id: 7}"), fromjson("{_id: 8}")}; - responses.emplace_back(kTestNss, CursorId(0), batch4); + responses.emplace_back(_nss, CursorId(0), batch4); scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::SubsequentResponse); executor()->waitForEvent(readyEvent); @@ -854,15 +828,12 @@ TEST_F(AsyncResultsMergerTest, StreamResultsFromOneShardIfOtherDoesntRespond) { // the network interface. auto killEvent = arm->kill(operationContext()); ASSERT_TRUE(killEvent.isValid()); - executor()->shutdown(); - executor()->waitForEvent(killEvent); } TEST_F(AsyncResultsMergerTest, ErrorOnMismatchedCursorIds) { - std::vector<RemoteCursor> cursors; - cursors.push_back( - makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 123, {}))); - auto arm = makeARMFromExistingCursors(std::move(cursors)); + std::vector<ClusterClientCursorParams::RemoteCursor> cursors; + cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 123, {})); + makeCursorFromExistingCursors(std::move(cursors)); ASSERT_FALSE(arm->ready()); auto readyEvent = unittest::assertGet(arm->nextEvent()); @@ -870,7 +841,7 @@ TEST_F(AsyncResultsMergerTest, ErrorOnMismatchedCursorIds) { std::vector<CursorResponse> responses; std::vector<BSONObj> batch = {fromjson("{_id: 4}"), fromjson("{_id: 5}"), fromjson("{_id: 6}")}; - responses.emplace_back(kTestNss, CursorId(456), batch); + responses.emplace_back(_nss, CursorId(456), batch); scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::SubsequentResponse); executor()->waitForEvent(readyEvent); @@ -884,25 +855,22 @@ TEST_F(AsyncResultsMergerTest, ErrorOnMismatchedCursorIds) { } TEST_F(AsyncResultsMergerTest, BadResponseReceivedFromShard) { - std::vector<RemoteCursor> cursors; - cursors.push_back( - makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 123, {}))); - cursors.push_back( - makeRemoteCursor(kTestShardIds[1], kTestShardHosts[1], CursorResponse(kTestNss, 456, {}))); - cursors.push_back( - makeRemoteCursor(kTestShardIds[2], kTestShardHosts[2], CursorResponse(kTestNss, 789, {}))); - auto arm = makeARMFromExistingCursors(std::move(cursors)); + std::vector<ClusterClientCursorParams::RemoteCursor> cursors; + cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 123, {})); + cursors.emplace_back(kTestShardIds[1], kTestShardHosts[1], CursorResponse(_nss, 456, {})); + cursors.emplace_back(kTestShardIds[2], kTestShardHosts[2], CursorResponse(_nss, 789, {})); + makeCursorFromExistingCursors(std::move(cursors)); ASSERT_FALSE(arm->ready()); auto readyEvent = unittest::assertGet(arm->nextEvent()); ASSERT_FALSE(arm->ready()); std::vector<BSONObj> batch1 = {fromjson("{_id: 1}"), fromjson("{_id: 2}")}; - BSONObj response1 = CursorResponse(kTestNss, CursorId(123), batch1) + BSONObj response1 = CursorResponse(_nss, CursorId(123), batch1) .toBSON(CursorResponse::ResponseType::SubsequentResponse); BSONObj response2 = fromjson("{foo: 'bar'}"); std::vector<BSONObj> batch3 = {fromjson("{_id: 4}"), fromjson("{_id: 5}")}; - BSONObj response3 = CursorResponse(kTestNss, CursorId(789), batch3) + BSONObj response3 = CursorResponse(_nss, CursorId(789), batch3) .toBSON(CursorResponse::ResponseType::SubsequentResponse); scheduleNetworkResponseObjs({response1, response2, response3}); runReadyCallbacks(); @@ -917,14 +885,11 @@ TEST_F(AsyncResultsMergerTest, BadResponseReceivedFromShard) { } TEST_F(AsyncResultsMergerTest, ErrorReceivedFromShard) { - std::vector<RemoteCursor> cursors; - cursors.push_back( - makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {}))); - cursors.push_back( - makeRemoteCursor(kTestShardIds[1], kTestShardHosts[1], CursorResponse(kTestNss, 2, {}))); - cursors.push_back( - makeRemoteCursor(kTestShardIds[2], kTestShardHosts[2], CursorResponse(kTestNss, 3, {}))); - auto arm = makeARMFromExistingCursors(std::move(cursors)); + std::vector<ClusterClientCursorParams::RemoteCursor> cursors; + cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 1, {})); + cursors.emplace_back(kTestShardIds[1], kTestShardHosts[1], CursorResponse(_nss, 2, {})); + cursors.emplace_back(kTestShardIds[2], kTestShardHosts[2], CursorResponse(_nss, 3, {})); + makeCursorFromExistingCursors(std::move(cursors)); ASSERT_FALSE(arm->ready()); auto readyEvent = unittest::assertGet(arm->nextEvent()); @@ -932,9 +897,9 @@ TEST_F(AsyncResultsMergerTest, ErrorReceivedFromShard) { std::vector<CursorResponse> responses; std::vector<BSONObj> batch1 = {fromjson("{_id: 1}"), fromjson("{_id: 2}")}; - responses.emplace_back(kTestNss, CursorId(1), batch1); + responses.emplace_back(_nss, CursorId(1), batch1); std::vector<BSONObj> batch2 = {fromjson("{_id: 3}"), fromjson("{_id: 4}")}; - responses.emplace_back(kTestNss, CursorId(2), batch2); + responses.emplace_back(_nss, CursorId(2), batch2); scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::SubsequentResponse); @@ -953,10 +918,9 @@ TEST_F(AsyncResultsMergerTest, ErrorReceivedFromShard) { } TEST_F(AsyncResultsMergerTest, ErrorCantScheduleEventBeforeLastSignaled) { - std::vector<RemoteCursor> cursors; - cursors.push_back( - makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {}))); - auto arm = makeARMFromExistingCursors(std::move(cursors)); + std::vector<ClusterClientCursorParams::RemoteCursor> cursors; + cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 1, {})); + makeCursorFromExistingCursors(std::move(cursors)); ASSERT_FALSE(arm->ready()); auto readyEvent = unittest::assertGet(arm->nextEvent()); @@ -966,7 +930,7 @@ TEST_F(AsyncResultsMergerTest, ErrorCantScheduleEventBeforeLastSignaled) { std::vector<CursorResponse> responses; std::vector<BSONObj> batch = {fromjson("{_id: 1}"), fromjson("{_id: 2}")}; - responses.emplace_back(kTestNss, CursorId(0), batch); + responses.emplace_back(_nss, CursorId(0), batch); scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::SubsequentResponse); executor()->waitForEvent(readyEvent); @@ -984,10 +948,9 @@ TEST_F(AsyncResultsMergerTest, ErrorCantScheduleEventBeforeLastSignaled) { } TEST_F(AsyncResultsMergerTest, NextEventAfterTaskExecutorShutdown) { - std::vector<RemoteCursor> cursors; - cursors.push_back( - makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {}))); - auto arm = makeARMFromExistingCursors(std::move(cursors)); + std::vector<ClusterClientCursorParams::RemoteCursor> cursors; + cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 1, {})); + makeCursorFromExistingCursors(std::move(cursors)); executor()->shutdown(); ASSERT_EQ(ErrorCodes::ShutdownInProgress, arm->nextEvent().getStatus()); @@ -996,10 +959,9 @@ TEST_F(AsyncResultsMergerTest, NextEventAfterTaskExecutorShutdown) { } TEST_F(AsyncResultsMergerTest, KillAfterTaskExecutorShutdownWithOutstandingBatches) { - std::vector<RemoteCursor> cursors; - cursors.push_back( - makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {}))); - auto arm = makeARMFromExistingCursors(std::move(cursors)); + std::vector<ClusterClientCursorParams::RemoteCursor> cursors; + cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 1, {})); + makeCursorFromExistingCursors(std::move(cursors)); // Make a request to the shard that will never get answered. ASSERT_FALSE(arm->ready()); @@ -1017,10 +979,9 @@ TEST_F(AsyncResultsMergerTest, KillAfterTaskExecutorShutdownWithOutstandingBatch } TEST_F(AsyncResultsMergerTest, KillNoBatchesRequested) { - std::vector<RemoteCursor> cursors; - cursors.push_back( - makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {}))); - auto arm = makeARMFromExistingCursors(std::move(cursors)); + std::vector<ClusterClientCursorParams::RemoteCursor> cursors; + cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 1, {})); + makeCursorFromExistingCursors(std::move(cursors)); ASSERT_FALSE(arm->ready()); auto killedEvent = arm->kill(operationContext()); @@ -1035,14 +996,11 @@ TEST_F(AsyncResultsMergerTest, KillNoBatchesRequested) { } TEST_F(AsyncResultsMergerTest, KillAllRemotesExhausted) { - std::vector<RemoteCursor> cursors; - cursors.push_back( - makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {}))); - cursors.push_back( - makeRemoteCursor(kTestShardIds[1], kTestShardHosts[1], CursorResponse(kTestNss, 2, {}))); - cursors.push_back( - makeRemoteCursor(kTestShardIds[2], kTestShardHosts[2], CursorResponse(kTestNss, 3, {}))); - auto arm = makeARMFromExistingCursors(std::move(cursors)); + std::vector<ClusterClientCursorParams::RemoteCursor> cursors; + cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 1, {})); + cursors.emplace_back(kTestShardIds[1], kTestShardHosts[1], CursorResponse(_nss, 2, {})); + cursors.emplace_back(kTestShardIds[2], kTestShardHosts[2], CursorResponse(_nss, 3, {})); + makeCursorFromExistingCursors(std::move(cursors)); ASSERT_FALSE(arm->ready()); auto readyEvent = unittest::assertGet(arm->nextEvent()); @@ -1050,11 +1008,11 @@ TEST_F(AsyncResultsMergerTest, KillAllRemotesExhausted) { std::vector<CursorResponse> responses; std::vector<BSONObj> batch1 = {fromjson("{_id: 1}"), fromjson("{_id: 2}")}; - responses.emplace_back(kTestNss, CursorId(0), batch1); + responses.emplace_back(_nss, CursorId(0), batch1); std::vector<BSONObj> batch2 = {fromjson("{_id: 3}"), fromjson("{_id: 4}")}; - responses.emplace_back(kTestNss, CursorId(0), batch2); + responses.emplace_back(_nss, CursorId(0), batch2); std::vector<BSONObj> batch3 = {fromjson("{_id: 3}"), fromjson("{_id: 4}")}; - responses.emplace_back(kTestNss, CursorId(0), batch3); + responses.emplace_back(_nss, CursorId(0), batch3); scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::SubsequentResponse); @@ -1069,14 +1027,11 @@ TEST_F(AsyncResultsMergerTest, KillAllRemotesExhausted) { } TEST_F(AsyncResultsMergerTest, KillNonExhaustedCursorWithoutPendingRequest) { - std::vector<RemoteCursor> cursors; - cursors.push_back( - makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {}))); - cursors.push_back( - makeRemoteCursor(kTestShardIds[1], kTestShardHosts[1], CursorResponse(kTestNss, 2, {}))); - cursors.push_back( - makeRemoteCursor(kTestShardIds[2], kTestShardHosts[2], CursorResponse(kTestNss, 123, {}))); - auto arm = makeARMFromExistingCursors(std::move(cursors)); + std::vector<ClusterClientCursorParams::RemoteCursor> cursors; + cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 1, {})); + cursors.emplace_back(kTestShardIds[1], kTestShardHosts[1], CursorResponse(_nss, 2, {})); + cursors.emplace_back(kTestShardIds[2], kTestShardHosts[2], CursorResponse(_nss, 123, {})); + makeCursorFromExistingCursors(std::move(cursors)); ASSERT_FALSE(arm->ready()); auto readyEvent = unittest::assertGet(arm->nextEvent()); @@ -1084,12 +1039,12 @@ TEST_F(AsyncResultsMergerTest, KillNonExhaustedCursorWithoutPendingRequest) { std::vector<CursorResponse> responses; std::vector<BSONObj> batch1 = {fromjson("{_id: 1}"), fromjson("{_id: 2}")}; - responses.emplace_back(kTestNss, CursorId(0), batch1); + responses.emplace_back(_nss, CursorId(0), batch1); std::vector<BSONObj> batch2 = {fromjson("{_id: 3}"), fromjson("{_id: 4}")}; - responses.emplace_back(kTestNss, CursorId(0), batch2); + responses.emplace_back(_nss, CursorId(0), batch2); // Cursor 3 is not exhausted. std::vector<BSONObj> batch3 = {fromjson("{_id: 3}"), fromjson("{_id: 4}")}; - responses.emplace_back(kTestNss, CursorId(123), batch3); + responses.emplace_back(_nss, CursorId(123), batch3); scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::SubsequentResponse); @@ -1104,14 +1059,11 @@ TEST_F(AsyncResultsMergerTest, KillNonExhaustedCursorWithoutPendingRequest) { } TEST_F(AsyncResultsMergerTest, KillTwoOutstandingBatches) { - std::vector<RemoteCursor> cursors; - cursors.push_back( - makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {}))); - cursors.push_back( - makeRemoteCursor(kTestShardIds[1], kTestShardHosts[1], CursorResponse(kTestNss, 2, {}))); - cursors.push_back( - makeRemoteCursor(kTestShardIds[2], kTestShardHosts[2], CursorResponse(kTestNss, 3, {}))); - auto arm = makeARMFromExistingCursors(std::move(cursors)); + std::vector<ClusterClientCursorParams::RemoteCursor> cursors; + cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 1, {})); + cursors.emplace_back(kTestShardIds[1], kTestShardHosts[1], CursorResponse(_nss, 2, {})); + cursors.emplace_back(kTestShardIds[2], kTestShardHosts[2], CursorResponse(_nss, 3, {})); + makeCursorFromExistingCursors(std::move(cursors)); ASSERT_FALSE(arm->ready()); auto readyEvent = unittest::assertGet(arm->nextEvent()); @@ -1119,7 +1071,7 @@ TEST_F(AsyncResultsMergerTest, KillTwoOutstandingBatches) { std::vector<CursorResponse> responses; std::vector<BSONObj> batch1 = {fromjson("{_id: 1}"), fromjson("{_id: 2}")}; - responses.emplace_back(kTestNss, CursorId(0), batch1); + responses.emplace_back(_nss, CursorId(0), batch1); scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::SubsequentResponse); @@ -1139,10 +1091,9 @@ TEST_F(AsyncResultsMergerTest, KillTwoOutstandingBatches) { } TEST_F(AsyncResultsMergerTest, NextEventErrorsAfterKill) { - std::vector<RemoteCursor> cursors; - cursors.push_back( - makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {}))); - auto arm = makeARMFromExistingCursors(std::move(cursors)); + std::vector<ClusterClientCursorParams::RemoteCursor> cursors; + cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 1, {})); + makeCursorFromExistingCursors(std::move(cursors)); ASSERT_FALSE(arm->ready()); auto readyEvent = unittest::assertGet(arm->nextEvent()); @@ -1150,7 +1101,7 @@ TEST_F(AsyncResultsMergerTest, NextEventErrorsAfterKill) { std::vector<CursorResponse> responses; std::vector<BSONObj> batch1 = {fromjson("{_id: 1}"), fromjson("{_id: 2}")}; - responses.emplace_back(kTestNss, CursorId(1), batch1); + responses.emplace_back(_nss, CursorId(1), batch1); scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::SubsequentResponse); @@ -1163,10 +1114,9 @@ TEST_F(AsyncResultsMergerTest, NextEventErrorsAfterKill) { } TEST_F(AsyncResultsMergerTest, KillCalledTwice) { - std::vector<RemoteCursor> cursors; - cursors.push_back( - makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {}))); - auto arm = makeARMFromExistingCursors(std::move(cursors)); + std::vector<ClusterClientCursorParams::RemoteCursor> cursors; + cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 1, {})); + makeCursorFromExistingCursors(std::move(cursors)); auto killedEvent1 = arm->kill(operationContext()); ASSERT(killedEvent1.isValid()); auto killedEvent2 = arm->kill(operationContext()); @@ -1177,10 +1127,9 @@ TEST_F(AsyncResultsMergerTest, KillCalledTwice) { TEST_F(AsyncResultsMergerTest, TailableBasic) { BSONObj findCmd = fromjson("{find: 'testcoll', tailable: true}"); - std::vector<RemoteCursor> cursors; - cursors.push_back( - makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 123, {}))); - auto arm = makeARMFromExistingCursors(std::move(cursors), findCmd); + std::vector<ClusterClientCursorParams::RemoteCursor> cursors; + cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 123, {})); + makeCursorFromExistingCursors(std::move(cursors), findCmd); ASSERT_FALSE(arm->ready()); auto readyEvent = unittest::assertGet(arm->nextEvent()); @@ -1188,7 +1137,7 @@ TEST_F(AsyncResultsMergerTest, TailableBasic) { std::vector<CursorResponse> responses; std::vector<BSONObj> batch1 = {fromjson("{_id: 1}"), fromjson("{_id: 2}")}; - responses.emplace_back(kTestNss, CursorId(123), batch1); + responses.emplace_back(_nss, CursorId(123), batch1); scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::SubsequentResponse); executor()->waitForEvent(readyEvent); @@ -1209,7 +1158,7 @@ TEST_F(AsyncResultsMergerTest, TailableBasic) { responses.clear(); std::vector<BSONObj> batch2 = {fromjson("{_id: 3}")}; - responses.emplace_back(kTestNss, CursorId(123), batch2); + responses.emplace_back(_nss, CursorId(123), batch2); scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::SubsequentResponse); executor()->waitForEvent(readyEvent); @@ -1227,10 +1176,9 @@ TEST_F(AsyncResultsMergerTest, TailableBasic) { TEST_F(AsyncResultsMergerTest, TailableEmptyBatch) { BSONObj findCmd = fromjson("{find: 'testcoll', tailable: true}"); - std::vector<RemoteCursor> cursors; - cursors.push_back( - makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 123, {}))); - auto arm = makeARMFromExistingCursors(std::move(cursors), findCmd); + std::vector<ClusterClientCursorParams::RemoteCursor> cursors; + cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 123, {})); + makeCursorFromExistingCursors(std::move(cursors), findCmd); ASSERT_FALSE(arm->ready()); auto readyEvent = unittest::assertGet(arm->nextEvent()); @@ -1239,7 +1187,7 @@ TEST_F(AsyncResultsMergerTest, TailableEmptyBatch) { // Remote responds with an empty batch and a non-zero cursor id. std::vector<CursorResponse> responses; std::vector<BSONObj> batch; - responses.emplace_back(kTestNss, CursorId(123), batch); + responses.emplace_back(_nss, CursorId(123), batch); scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::SubsequentResponse); executor()->waitForEvent(readyEvent); @@ -1256,10 +1204,9 @@ TEST_F(AsyncResultsMergerTest, TailableEmptyBatch) { TEST_F(AsyncResultsMergerTest, TailableExhaustedCursor) { BSONObj findCmd = fromjson("{find: 'testcoll', tailable: true}"); - std::vector<RemoteCursor> cursors; - cursors.push_back( - makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 123, {}))); - auto arm = makeARMFromExistingCursors(std::move(cursors), findCmd); + std::vector<ClusterClientCursorParams::RemoteCursor> cursors; + cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 123, {})); + makeCursorFromExistingCursors(std::move(cursors), findCmd); ASSERT_FALSE(arm->ready()); auto readyEvent = unittest::assertGet(arm->nextEvent()); @@ -1268,7 +1215,7 @@ TEST_F(AsyncResultsMergerTest, TailableExhaustedCursor) { // Remote responds with an empty batch and a zero cursor id. std::vector<CursorResponse> responses; std::vector<BSONObj> batch; - responses.emplace_back(kTestNss, CursorId(0), batch); + responses.emplace_back(_nss, CursorId(0), batch); scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::SubsequentResponse); executor()->waitForEvent(readyEvent); @@ -1282,10 +1229,9 @@ TEST_F(AsyncResultsMergerTest, TailableExhaustedCursor) { TEST_F(AsyncResultsMergerTest, GetMoreBatchSizes) { BSONObj findCmd = fromjson("{find: 'testcoll', batchSize: 3}"); - std::vector<RemoteCursor> cursors; - cursors.push_back( - makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {}))); - auto arm = makeARMFromExistingCursors(std::move(cursors), findCmd); + std::vector<ClusterClientCursorParams::RemoteCursor> cursors; + cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 1, {})); + makeCursorFromExistingCursors(std::move(cursors), findCmd); ASSERT_FALSE(arm->ready()); auto readyEvent = unittest::assertGet(arm->nextEvent()); @@ -1293,7 +1239,7 @@ TEST_F(AsyncResultsMergerTest, GetMoreBatchSizes) { std::vector<CursorResponse> responses; std::vector<BSONObj> batch1 = {fromjson("{_id: 1}"), fromjson("{_id: 2}")}; - responses.emplace_back(kTestNss, CursorId(1), batch1); + responses.emplace_back(_nss, CursorId(1), batch1); scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::SubsequentResponse); @@ -1308,7 +1254,7 @@ TEST_F(AsyncResultsMergerTest, GetMoreBatchSizes) { responses.clear(); std::vector<BSONObj> batch2 = {fromjson("{_id: 3}")}; - responses.emplace_back(kTestNss, CursorId(0), batch2); + responses.emplace_back(_nss, CursorId(0), batch2); readyEvent = unittest::assertGet(arm->nextEvent()); BSONObj scheduledCmd = getNthPendingRequest(0).cmdObj; @@ -1328,14 +1274,11 @@ TEST_F(AsyncResultsMergerTest, GetMoreBatchSizes) { TEST_F(AsyncResultsMergerTest, AllowPartialResults) { BSONObj findCmd = fromjson("{find: 'testcoll', allowPartialResults: true}"); - std::vector<RemoteCursor> cursors; - cursors.push_back( - makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 97, {}))); - cursors.push_back( - makeRemoteCursor(kTestShardIds[1], kTestShardHosts[1], CursorResponse(kTestNss, 98, {}))); - cursors.push_back( - makeRemoteCursor(kTestShardIds[2], kTestShardHosts[2], CursorResponse(kTestNss, 99, {}))); - auto arm = makeARMFromExistingCursors(std::move(cursors), findCmd); + std::vector<ClusterClientCursorParams::RemoteCursor> cursors; + cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 97, {})); + cursors.emplace_back(kTestShardIds[1], kTestShardHosts[1], CursorResponse(_nss, 98, {})); + cursors.emplace_back(kTestShardIds[2], kTestShardHosts[2], CursorResponse(_nss, 99, {})); + makeCursorFromExistingCursors(std::move(cursors), findCmd); ASSERT_FALSE(arm->ready()); auto readyEvent = unittest::assertGet(arm->nextEvent()); @@ -1349,9 +1292,9 @@ TEST_F(AsyncResultsMergerTest, AllowPartialResults) { // remaining shards. std::vector<CursorResponse> responses; std::vector<BSONObj> batch1 = {fromjson("{_id: 1}")}; - responses.emplace_back(kTestNss, CursorId(98), batch1); + responses.emplace_back(_nss, CursorId(98), batch1); std::vector<BSONObj> batch2 = {fromjson("{_id: 2}")}; - responses.emplace_back(kTestNss, CursorId(99), batch2); + responses.emplace_back(_nss, CursorId(99), batch2); scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::SubsequentResponse); executor()->waitForEvent(readyEvent); @@ -1372,7 +1315,7 @@ TEST_F(AsyncResultsMergerTest, AllowPartialResults) { responses.clear(); std::vector<BSONObj> batch3 = {fromjson("{_id: 3}")}; - responses.emplace_back(kTestNss, CursorId(99), batch3); + responses.emplace_back(_nss, CursorId(99), batch3); scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::SubsequentResponse); executor()->waitForEvent(readyEvent); @@ -1387,7 +1330,7 @@ TEST_F(AsyncResultsMergerTest, AllowPartialResults) { // Once the last reachable shard indicates that its cursor is closed, we're done. responses.clear(); std::vector<BSONObj> batch4 = {}; - responses.emplace_back(kTestNss, CursorId(0), batch4); + responses.emplace_back(_nss, CursorId(0), batch4); scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::SubsequentResponse); executor()->waitForEvent(readyEvent); @@ -1398,10 +1341,9 @@ TEST_F(AsyncResultsMergerTest, AllowPartialResults) { TEST_F(AsyncResultsMergerTest, AllowPartialResultsSingleNode) { BSONObj findCmd = fromjson("{find: 'testcoll', allowPartialResults: true}"); - std::vector<RemoteCursor> cursors; - cursors.push_back( - makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 98, {}))); - auto arm = makeARMFromExistingCursors(std::move(cursors), findCmd); + std::vector<ClusterClientCursorParams::RemoteCursor> cursors; + cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 98, {})); + makeCursorFromExistingCursors(std::move(cursors), findCmd); ASSERT_FALSE(arm->ready()); auto readyEvent = unittest::assertGet(arm->nextEvent()); @@ -1409,7 +1351,7 @@ TEST_F(AsyncResultsMergerTest, AllowPartialResultsSingleNode) { std::vector<CursorResponse> responses; std::vector<BSONObj> batch = {fromjson("{_id: 1}"), fromjson("{_id: 2}")}; - responses.emplace_back(kTestNss, CursorId(98), batch); + responses.emplace_back(_nss, CursorId(98), batch); scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::SubsequentResponse); executor()->waitForEvent(readyEvent); @@ -1432,12 +1374,10 @@ TEST_F(AsyncResultsMergerTest, AllowPartialResultsSingleNode) { TEST_F(AsyncResultsMergerTest, AllowPartialResultsOnRetriableErrorNoRetries) { BSONObj findCmd = fromjson("{find: 'testcoll', allowPartialResults: true}"); - std::vector<RemoteCursor> cursors; - cursors.push_back( - makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {}))); - cursors.push_back( - makeRemoteCursor(kTestShardIds[2], kTestShardHosts[2], CursorResponse(kTestNss, 2, {}))); - auto arm = makeARMFromExistingCursors(std::move(cursors), findCmd); + std::vector<ClusterClientCursorParams::RemoteCursor> cursors; + cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 1, {})); + cursors.emplace_back(kTestShardIds[2], kTestShardHosts[2], CursorResponse(_nss, 2, {})); + makeCursorFromExistingCursors(std::move(cursors), findCmd); ASSERT_FALSE(arm->ready()); auto readyEvent = unittest::assertGet(arm->nextEvent()); @@ -1446,7 +1386,7 @@ TEST_F(AsyncResultsMergerTest, AllowPartialResultsOnRetriableErrorNoRetries) { // First host returns single result std::vector<CursorResponse> responses; std::vector<BSONObj> batch = {fromjson("{_id: 1}")}; - responses.emplace_back(kTestNss, CursorId(0), batch); + responses.emplace_back(_nss, CursorId(0), batch); scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::SubsequentResponse); @@ -1464,12 +1404,10 @@ TEST_F(AsyncResultsMergerTest, AllowPartialResultsOnRetriableErrorNoRetries) { TEST_F(AsyncResultsMergerTest, ReturnsErrorOnRetriableError) { BSONObj findCmd = fromjson("{find: 'testcoll', sort: {_id: 1}}"); - std::vector<RemoteCursor> cursors; - cursors.push_back( - makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {}))); - cursors.push_back( - makeRemoteCursor(kTestShardIds[2], kTestShardHosts[2], CursorResponse(kTestNss, 2, {}))); - auto arm = makeARMFromExistingCursors(std::move(cursors), findCmd); + std::vector<ClusterClientCursorParams::RemoteCursor> cursors; + cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 1, {})); + cursors.emplace_back(kTestShardIds[2], kTestShardHosts[2], CursorResponse(_nss, 2, {})); + makeCursorFromExistingCursors(std::move(cursors), findCmd); ASSERT_FALSE(arm->ready()); auto readyEvent = unittest::assertGet(arm->nextEvent()); @@ -1494,10 +1432,9 @@ TEST_F(AsyncResultsMergerTest, ReturnsErrorOnRetriableError) { TEST_F(AsyncResultsMergerTest, GetMoreRequestIncludesMaxTimeMS) { BSONObj findCmd = fromjson("{find: 'testcoll', tailable: true, awaitData: true}"); - std::vector<RemoteCursor> cursors; - cursors.push_back( - makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 123, {}))); - auto arm = makeARMFromExistingCursors(std::move(cursors), findCmd); + std::vector<ClusterClientCursorParams::RemoteCursor> cursors; + cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 123, {})); + makeCursorFromExistingCursors(std::move(cursors), findCmd); ASSERT_FALSE(arm->ready()); auto readyEvent = unittest::assertGet(arm->nextEvent()); @@ -1505,7 +1442,7 @@ TEST_F(AsyncResultsMergerTest, GetMoreRequestIncludesMaxTimeMS) { std::vector<CursorResponse> responses; std::vector<BSONObj> batch1 = {fromjson("{_id: 1}")}; - responses.emplace_back(kTestNss, CursorId(123), batch1); + responses.emplace_back(_nss, CursorId(123), batch1); scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::SubsequentResponse); executor()->waitForEvent(readyEvent); @@ -1527,7 +1464,7 @@ TEST_F(AsyncResultsMergerTest, GetMoreRequestIncludesMaxTimeMS) { responses.clear(); std::vector<BSONObj> batch2 = {fromjson("{_id: 2}")}; - responses.emplace_back(kTestNss, CursorId(123), batch2); + responses.emplace_back(_nss, CursorId(123), batch2); scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::SubsequentResponse); executor()->waitForEvent(readyEvent); @@ -1548,24 +1485,20 @@ TEST_F(AsyncResultsMergerTest, GetMoreRequestIncludesMaxTimeMS) { // Clean up. responses.clear(); std::vector<BSONObj> batch3 = {fromjson("{_id: 3}")}; - responses.emplace_back(kTestNss, CursorId(0), batch3); + responses.emplace_back(_nss, CursorId(0), batch3); scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::SubsequentResponse); } TEST_F(AsyncResultsMergerTest, SortedTailableCursorNotReadyIfOneOrMoreRemotesHasNoOplogTimestamp) { - AsyncResultsMergerParams params; - params.setNss(kTestNss); - std::vector<RemoteCursor> cursors; - cursors.push_back( - makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 123, {}))); - cursors.push_back( - makeRemoteCursor(kTestShardIds[1], kTestShardHosts[1], CursorResponse(kTestNss, 456, {}))); - params.setRemotes(std::move(cursors)); - params.setTailableMode(TailableModeEnum::kTailableAndAwaitData); - params.setSort(fromjson("{'_id.clusterTime.ts': 1, '_id.uuid': 1, '_id.documentKey': 1}")); - auto arm = - stdx::make_unique<AsyncResultsMerger>(operationContext(), executor(), std::move(params)); + auto params = stdx::make_unique<ClusterClientCursorParams>(_nss, boost::none); + std::vector<ClusterClientCursorParams::RemoteCursor> cursors; + cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 123, {})); + cursors.emplace_back(kTestShardIds[1], kTestShardHosts[1], CursorResponse(_nss, 456, {})); + params->remotes = std::move(cursors); + params->tailableMode = TailableMode::kTailableAndAwaitData; + params->sort = fromjson("{'_id.clusterTime.ts': 1, '_id.uuid': 1, '_id.documentKey': 1}"); + arm = stdx::make_unique<AsyncResultsMerger>(operationContext(), executor(), params.get()); auto readyEvent = unittest::assertGet(arm->nextEvent()); @@ -1577,7 +1510,7 @@ TEST_F(AsyncResultsMergerTest, SortedTailableCursorNotReadyIfOneOrMoreRemotesHas fromjson("{_id: {clusterTime: {ts: Timestamp(1, 4)}, uuid: 1, documentKey: {_id: 1}}, " "$sortKey: {'': Timestamp(1, 4), '': 1, '': 1}}")}; const Timestamp lastObservedFirstCursor = Timestamp(1, 6); - responses.emplace_back(kTestNss, CursorId(123), batch1, boost::none, lastObservedFirstCursor); + responses.emplace_back(_nss, CursorId(123), batch1, boost::none, lastObservedFirstCursor); scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::SubsequentResponse); @@ -1590,7 +1523,7 @@ TEST_F(AsyncResultsMergerTest, SortedTailableCursorNotReadyIfOneOrMoreRemotesHas fromjson("{_id: {clusterTime: {ts: Timestamp(1, 5)}, uuid: 1, documentKey: {_id: 2}}, " "$sortKey: {'': Timestamp(1, 5), '': 1, '': 2}}")}; const Timestamp lastObservedSecondCursor = Timestamp(1, 5); - responses.emplace_back(kTestNss, CursorId(456), batch2, boost::none, lastObservedSecondCursor); + responses.emplace_back(_nss, CursorId(456), batch2, boost::none, lastObservedSecondCursor); scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::SubsequentResponse); executor()->waitForEvent(readyEvent); @@ -1610,40 +1543,37 @@ TEST_F(AsyncResultsMergerTest, SortedTailableCursorNotReadyIfOneOrMoreRemotesHas // Clean up the cursors. responses.clear(); std::vector<BSONObj> batch3 = {}; - responses.emplace_back(kTestNss, CursorId(0), batch3); + responses.emplace_back(_nss, CursorId(0), batch3); scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::SubsequentResponse); responses.clear(); std::vector<BSONObj> batch4 = {}; - responses.emplace_back(kTestNss, CursorId(0), batch4); + responses.emplace_back(_nss, CursorId(0), batch4); scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::SubsequentResponse); } TEST_F(AsyncResultsMergerTest, SortedTailableCursorNotReadyIfOneOrMoreRemotesHasNullOplogTimestamp) { - AsyncResultsMergerParams params; - params.setNss(kTestNss); - std::vector<RemoteCursor> cursors; - cursors.push_back(makeRemoteCursor( + auto params = stdx::make_unique<ClusterClientCursorParams>(_nss, boost::none); + std::vector<ClusterClientCursorParams::RemoteCursor> cursors; + cursors.emplace_back( kTestShardIds[0], kTestShardHosts[0], CursorResponse( - kTestNss, + _nss, 123, {fromjson("{_id: {clusterTime: {ts: Timestamp(1, 4)}, uuid: 1, documentKey: {_id: 1}}, " "$sortKey: {'': Timestamp(1, 4), '': 1, '': 1}}")}, boost::none, - Timestamp(1, 5)))); - cursors.push_back( - makeRemoteCursor(kTestShardIds[1], + Timestamp(1, 5))); + cursors.emplace_back(kTestShardIds[1], kTestShardHosts[1], - CursorResponse(kTestNss, 456, {}, boost::none, Timestamp()))); - params.setRemotes(std::move(cursors)); - params.setTailableMode(TailableModeEnum::kTailableAndAwaitData); - params.setSort(fromjson("{'_id.clusterTime.ts': 1, '_id.uuid': 1, '_id.documentKey': 1}")); - auto arm = - stdx::make_unique<AsyncResultsMerger>(operationContext(), executor(), std::move(params)); + CursorResponse(_nss, 456, {}, boost::none, Timestamp())); + params->remotes = std::move(cursors); + params->tailableMode = TailableMode::kTailableAndAwaitData; + params->sort = fromjson("{'_id.clusterTime.ts': 1, '_id.uuid': 1, '_id.documentKey': 1}"); + arm = stdx::make_unique<AsyncResultsMerger>(operationContext(), executor(), params.get()); auto readyEvent = unittest::assertGet(arm->nextEvent()); @@ -1651,7 +1581,7 @@ TEST_F(AsyncResultsMergerTest, std::vector<CursorResponse> responses; std::vector<BSONObj> batch3 = {}; - responses.emplace_back(kTestNss, CursorId(0), batch3, boost::none, Timestamp(1, 8)); + responses.emplace_back(_nss, CursorId(0), batch3, boost::none, Timestamp(1, 8)); scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::SubsequentResponse); executor()->waitForEvent(unittest::assertGet(arm->nextEvent())); @@ -1667,34 +1597,31 @@ TEST_F(AsyncResultsMergerTest, // Clean up. responses.clear(); std::vector<BSONObj> batch4 = {}; - responses.emplace_back(kTestNss, CursorId(0), batch4); + responses.emplace_back(_nss, CursorId(0), batch4); scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::SubsequentResponse); } TEST_F(AsyncResultsMergerTest, SortedTailableCursorNotReadyIfOneRemoteHasLowerOplogTime) { - AsyncResultsMergerParams params; - params.setNss(kTestNss); - std::vector<RemoteCursor> cursors; + auto params = stdx::make_unique<ClusterClientCursorParams>(_nss, boost::none); + std::vector<ClusterClientCursorParams::RemoteCursor> cursors; Timestamp tooLow = Timestamp(1, 2); - cursors.push_back(makeRemoteCursor( + cursors.emplace_back( kTestShardIds[0], kTestShardHosts[0], CursorResponse( - kTestNss, + _nss, 123, {fromjson("{_id: {clusterTime: {ts: Timestamp(1, 4)}, uuid: 1, documentKey: {_id: 1}}, " "$sortKey: {'': Timestamp(1, 4), '': 1, '': 1}}")}, boost::none, - Timestamp(1, 5)))); - cursors.push_back(makeRemoteCursor(kTestShardIds[1], - kTestShardHosts[1], - CursorResponse(kTestNss, 456, {}, boost::none, tooLow))); - params.setRemotes(std::move(cursors)); - params.setTailableMode(TailableModeEnum::kTailableAndAwaitData); - params.setSort(fromjson("{'_id.clusterTime.ts': 1, '_id.uuid': 1, '_id.documentKey': 1}")); - auto arm = - stdx::make_unique<AsyncResultsMerger>(operationContext(), executor(), std::move(params)); + Timestamp(1, 5))); + cursors.emplace_back( + kTestShardIds[1], kTestShardHosts[1], CursorResponse(_nss, 456, {}, boost::none, tooLow)); + params->remotes = std::move(cursors); + params->tailableMode = TailableMode::kTailableAndAwaitData; + params->sort = fromjson("{'_id.clusterTime.ts': 1, '_id.uuid': 1, '_id.documentKey': 1}"); + arm = stdx::make_unique<AsyncResultsMerger>(operationContext(), executor(), params.get()); auto readyEvent = unittest::assertGet(arm->nextEvent()); @@ -1702,7 +1629,7 @@ TEST_F(AsyncResultsMergerTest, SortedTailableCursorNotReadyIfOneRemoteHasLowerOp // Clean up the cursors. std::vector<CursorResponse> responses; - responses.emplace_back(kTestNss, CursorId(0), std::vector<BSONObj>{}); + responses.emplace_back(_nss, CursorId(0), std::vector<BSONObj>{}); scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::SubsequentResponse); auto killEvent = arm->kill(operationContext()); @@ -1710,16 +1637,13 @@ TEST_F(AsyncResultsMergerTest, SortedTailableCursorNotReadyIfOneRemoteHasLowerOp } TEST_F(AsyncResultsMergerTest, SortedTailableCursorNewShardOrderedAfterExisting) { - AsyncResultsMergerParams params; - params.setNss(kTestNss); - std::vector<RemoteCursor> cursors; - cursors.push_back( - makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 123, {}))); - params.setRemotes(std::move(cursors)); - params.setTailableMode(TailableModeEnum::kTailableAndAwaitData); - params.setSort(fromjson("{'_id.clusterTime.ts': 1, '_id.uuid': 1, '_id.documentKey': 1}")); - auto arm = - stdx::make_unique<AsyncResultsMerger>(operationContext(), executor(), std::move(params)); + auto params = stdx::make_unique<ClusterClientCursorParams>(_nss, boost::none); + std::vector<ClusterClientCursorParams::RemoteCursor> cursors; + cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 123, {})); + params->remotes = std::move(cursors); + params->tailableMode = TailableMode::kTailableAndAwaitData; + params->sort = fromjson("{'_id.clusterTime.ts': 1, '_id.uuid': 1, '_id.documentKey': 1}"); + arm = stdx::make_unique<AsyncResultsMerger>(operationContext(), executor(), params.get()); auto readyEvent = unittest::assertGet(arm->nextEvent()); @@ -1731,7 +1655,7 @@ TEST_F(AsyncResultsMergerTest, SortedTailableCursorNewShardOrderedAfterExisting) fromjson("{_id: {clusterTime: {ts: Timestamp(1, 4)}, uuid: 1, documentKey: {_id: 1}}, " "$sortKey: {'': Timestamp(1, 4), '': 1, '': 1}}")}; const Timestamp lastObservedFirstCursor = Timestamp(1, 6); - responses.emplace_back(kTestNss, CursorId(123), batch1, boost::none, lastObservedFirstCursor); + responses.emplace_back(_nss, CursorId(123), batch1, boost::none, lastObservedFirstCursor); scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::SubsequentResponse); @@ -1739,10 +1663,9 @@ TEST_F(AsyncResultsMergerTest, SortedTailableCursorNewShardOrderedAfterExisting) ASSERT_TRUE(arm->ready()); // Add the new shard. - std::vector<RemoteCursor> newCursors; - newCursors.push_back( - makeRemoteCursor(kTestShardIds[1], kTestShardHosts[1], CursorResponse(kTestNss, 456, {}))); - arm->addNewShardCursors(std::move(newCursors)); + std::vector<ClusterClientCursorParams::RemoteCursor> newCursors; + newCursors.emplace_back(kTestShardIds[1], kTestShardHosts[1], CursorResponse(_nss, 456, {})); + arm->addNewShardCursors(newCursors); // Now shouldn't be ready, we don't have a guarantee from each shard. ASSERT_FALSE(arm->ready()); @@ -1754,7 +1677,7 @@ TEST_F(AsyncResultsMergerTest, SortedTailableCursorNewShardOrderedAfterExisting) fromjson("{_id: {clusterTime: {ts: Timestamp(1, 5)}, uuid: 1, documentKey: {_id: 2}}, " "$sortKey: {'': Timestamp(1, 5), '': 1, '': 2}}")}; const Timestamp lastObservedSecondCursor = Timestamp(1, 5); - responses.emplace_back(kTestNss, CursorId(456), batch2, boost::none, lastObservedSecondCursor); + responses.emplace_back(_nss, CursorId(456), batch2, boost::none, lastObservedSecondCursor); scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::SubsequentResponse); executor()->waitForEvent(readyEvent); @@ -1774,27 +1697,24 @@ TEST_F(AsyncResultsMergerTest, SortedTailableCursorNewShardOrderedAfterExisting) // Clean up the cursors. responses.clear(); std::vector<BSONObj> batch3 = {}; - responses.emplace_back(kTestNss, CursorId(0), batch3); + responses.emplace_back(_nss, CursorId(0), batch3); scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::SubsequentResponse); responses.clear(); std::vector<BSONObj> batch4 = {}; - responses.emplace_back(kTestNss, CursorId(0), batch4); + responses.emplace_back(_nss, CursorId(0), batch4); scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::SubsequentResponse); } TEST_F(AsyncResultsMergerTest, SortedTailableCursorNewShardOrderedBeforeExisting) { - AsyncResultsMergerParams params; - params.setNss(kTestNss); - std::vector<RemoteCursor> cursors; - cursors.push_back( - makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 123, {}))); - params.setRemotes(std::move(cursors)); - params.setTailableMode(TailableModeEnum::kTailableAndAwaitData); - params.setSort(fromjson("{'_id.clusterTime.ts': 1, '_id.uuid': 1, '_id.documentKey': 1}")); - auto arm = - stdx::make_unique<AsyncResultsMerger>(operationContext(), executor(), std::move(params)); + auto params = stdx::make_unique<ClusterClientCursorParams>(_nss, boost::none); + std::vector<ClusterClientCursorParams::RemoteCursor> cursors; + cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 123, {})); + params->remotes = std::move(cursors); + params->tailableMode = TailableMode::kTailableAndAwaitData; + params->sort = fromjson("{'_id.clusterTime.ts': 1, '_id.uuid': 1, '_id.documentKey': 1}"); + arm = stdx::make_unique<AsyncResultsMerger>(operationContext(), executor(), params.get()); auto readyEvent = unittest::assertGet(arm->nextEvent()); @@ -1806,7 +1726,7 @@ TEST_F(AsyncResultsMergerTest, SortedTailableCursorNewShardOrderedBeforeExisting fromjson("{_id: {clusterTime: {ts: Timestamp(1, 4)}, uuid: 1, documentKey: {_id: 1}}, " "$sortKey: {'': Timestamp(1, 4), '': 1, '': 1}}")}; const Timestamp lastObservedFirstCursor = Timestamp(1, 6); - responses.emplace_back(kTestNss, CursorId(123), batch1, boost::none, lastObservedFirstCursor); + responses.emplace_back(_nss, CursorId(123), batch1, boost::none, lastObservedFirstCursor); scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::SubsequentResponse); @@ -1814,10 +1734,9 @@ TEST_F(AsyncResultsMergerTest, SortedTailableCursorNewShardOrderedBeforeExisting ASSERT_TRUE(arm->ready()); // Add the new shard. - std::vector<RemoteCursor> newCursors; - newCursors.push_back( - makeRemoteCursor(kTestShardIds[1], kTestShardHosts[1], CursorResponse(kTestNss, 456, {}))); - arm->addNewShardCursors(std::move(newCursors)); + std::vector<ClusterClientCursorParams::RemoteCursor> newCursors; + newCursors.emplace_back(kTestShardIds[1], kTestShardHosts[1], CursorResponse(_nss, 456, {})); + arm->addNewShardCursors(newCursors); // Now shouldn't be ready, we don't have a guarantee from each shard. ASSERT_FALSE(arm->ready()); @@ -1831,7 +1750,7 @@ TEST_F(AsyncResultsMergerTest, SortedTailableCursorNewShardOrderedBeforeExisting // The last observed time should still be later than the first shard, so we can get the data // from it. const Timestamp lastObservedSecondCursor = Timestamp(1, 5); - responses.emplace_back(kTestNss, CursorId(456), batch2, boost::none, lastObservedSecondCursor); + responses.emplace_back(_nss, CursorId(456), batch2, boost::none, lastObservedSecondCursor); scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::SubsequentResponse); executor()->waitForEvent(readyEvent); @@ -1851,22 +1770,21 @@ TEST_F(AsyncResultsMergerTest, SortedTailableCursorNewShardOrderedBeforeExisting // Clean up the cursors. responses.clear(); std::vector<BSONObj> batch3 = {}; - responses.emplace_back(kTestNss, CursorId(0), batch3); + responses.emplace_back(_nss, CursorId(0), batch3); scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::SubsequentResponse); responses.clear(); std::vector<BSONObj> batch4 = {}; - responses.emplace_back(kTestNss, CursorId(0), batch4); + responses.emplace_back(_nss, CursorId(0), batch4); scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::SubsequentResponse); } TEST_F(AsyncResultsMergerTest, GetMoreRequestWithoutTailableCantHaveMaxTime) { BSONObj findCmd = fromjson("{find: 'testcoll'}"); - std::vector<RemoteCursor> cursors; - cursors.push_back( - makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 123, {}))); - auto arm = makeARMFromExistingCursors(std::move(cursors), findCmd); + std::vector<ClusterClientCursorParams::RemoteCursor> cursors; + cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 123, {})); + makeCursorFromExistingCursors(std::move(cursors), findCmd); ASSERT_NOT_OK(arm->setAwaitDataTimeout(Milliseconds(789))); auto killEvent = arm->kill(operationContext()); @@ -1875,10 +1793,9 @@ TEST_F(AsyncResultsMergerTest, GetMoreRequestWithoutTailableCantHaveMaxTime) { TEST_F(AsyncResultsMergerTest, GetMoreRequestWithoutAwaitDataCantHaveMaxTime) { BSONObj findCmd = fromjson("{find: 'testcoll', tailable: true}"); - std::vector<RemoteCursor> cursors; - cursors.push_back( - makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 123, {}))); - auto arm = makeARMFromExistingCursors(std::move(cursors), findCmd); + std::vector<ClusterClientCursorParams::RemoteCursor> cursors; + cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 123, {})); + makeCursorFromExistingCursors(std::move(cursors), findCmd); ASSERT_NOT_OK(arm->setAwaitDataTimeout(Milliseconds(789))); auto killEvent = arm->kill(operationContext()); @@ -1887,10 +1804,9 @@ TEST_F(AsyncResultsMergerTest, GetMoreRequestWithoutAwaitDataCantHaveMaxTime) { TEST_F(AsyncResultsMergerTest, ShardCanErrorInBetweenReadyAndNextEvent) { BSONObj findCmd = fromjson("{find: 'testcoll', tailable: true}"); - std::vector<RemoteCursor> cursors; - cursors.push_back( - makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 123, {}))); - auto arm = makeARMFromExistingCursors(std::move(cursors), findCmd); + std::vector<ClusterClientCursorParams::RemoteCursor> cursors; + cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 123, {})); + makeCursorFromExistingCursors(std::move(cursors), findCmd); ASSERT_FALSE(arm->ready()); auto readyEvent = unittest::assertGet(arm->nextEvent()); @@ -1904,10 +1820,9 @@ TEST_F(AsyncResultsMergerTest, ShardCanErrorInBetweenReadyAndNextEvent) { } TEST_F(AsyncResultsMergerTest, KillShouldNotWaitForRemoteCommandsBeforeSchedulingKillCursors) { - std::vector<RemoteCursor> cursors; - cursors.push_back( - makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {}))); - auto arm = makeARMFromExistingCursors(std::move(cursors)); + std::vector<ClusterClientCursorParams::RemoteCursor> cursors; + cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 1, {})); + makeCursorFromExistingCursors(std::move(cursors)); // Before any requests are scheduled, ARM is not ready to return results. ASSERT_FALSE(arm->ready()); @@ -1935,10 +1850,9 @@ TEST_F(AsyncResultsMergerTest, KillShouldNotWaitForRemoteCommandsBeforeSchedulin } TEST_F(AsyncResultsMergerTest, ShouldBeAbleToBlockUntilNextResultIsReady) { - std::vector<RemoteCursor> cursors; - cursors.emplace_back( - makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {}))); - auto arm = makeARMFromExistingCursors(std::move(cursors)); + std::vector<ClusterClientCursorParams::RemoteCursor> cursors; + cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 1, {})); + makeCursorFromExistingCursors(std::move(cursors)); // Before any requests are scheduled, ARM is not ready to return results. ASSERT_FALSE(arm->ready()); @@ -1957,7 +1871,7 @@ TEST_F(AsyncResultsMergerTest, ShouldBeAbleToBlockUntilNextResultIsReady) { // exhausted. onCommand([&](const auto& request) { ASSERT(request.cmdObj["getMore"]); - return CursorResponse(kTestNss, 0LL, {BSON("x" << 1)}) + return CursorResponse(_nss, 0LL, {BSON("x" << 1)}) .toBSON(CursorResponse::ResponseType::SubsequentResponse); }); @@ -1965,10 +1879,9 @@ TEST_F(AsyncResultsMergerTest, ShouldBeAbleToBlockUntilNextResultIsReady) { } TEST_F(AsyncResultsMergerTest, ShouldBeInterruptableDuringBlockingNext) { - std::vector<RemoteCursor> cursors; - cursors.emplace_back( - makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {}))); - auto arm = makeARMFromExistingCursors(std::move(cursors)); + std::vector<ClusterClientCursorParams::RemoteCursor> cursors; + cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 1, {})); + makeCursorFromExistingCursors(std::move(cursors)); // Issue a blocking wait for the next result asynchronously on a different thread. auto future = launchAsync([&]() { @@ -1992,10 +1905,9 @@ TEST_F(AsyncResultsMergerTest, ShouldBeInterruptableDuringBlockingNext) { } TEST_F(AsyncResultsMergerTest, ShouldBeAbleToBlockUntilKilled) { - std::vector<RemoteCursor> cursors; - cursors.emplace_back( - makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {}))); - auto arm = makeARMFromExistingCursors(std::move(cursors)); + std::vector<ClusterClientCursorParams::RemoteCursor> cursors; + cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 1, {})); + makeCursorFromExistingCursors(std::move(cursors)); // Before any requests are scheduled, ARM is not ready to return results. ASSERT_FALSE(arm->ready()); diff --git a/src/mongo/s/query/cluster_client_cursor.h b/src/mongo/s/query/cluster_client_cursor.h index 9c01c013ce6..653599def7e 100644 --- a/src/mongo/s/query/cluster_client_cursor.h +++ b/src/mongo/s/query/cluster_client_cursor.h @@ -113,7 +113,7 @@ public: /** * Returns a reference to the vector of remote hosts involved in this operation. */ - virtual const std::size_t getNumRemotes() const = 0; + virtual const std::vector<ClusterClientCursorParams::RemoteCursor>& getRemotes() const = 0; /** * Returns the number of result documents returned so far by this cursor via the next() method. diff --git a/src/mongo/s/query/cluster_client_cursor_impl.cpp b/src/mongo/s/query/cluster_client_cursor_impl.cpp index 1a4be45f1be..58484e87bfa 100644 --- a/src/mongo/s/query/cluster_client_cursor_impl.cpp +++ b/src/mongo/s/query/cluster_client_cursor_impl.cpp @@ -30,7 +30,6 @@ #include "mongo/s/query/cluster_client_cursor_impl.h" -#include "mongo/db/pipeline/cluster_aggregation_planner.h" #include "mongo/db/pipeline/document_source_limit.h" #include "mongo/db/pipeline/document_source_skip.h" #include "mongo/db/pipeline/document_source_sort.h" @@ -136,19 +135,20 @@ OperationContext* ClusterClientCursorImpl::getCurrentOperationContext() const { } bool ClusterClientCursorImpl::isTailable() const { - return _params.tailableMode != TailableModeEnum::kNormal; + return _params.tailableMode != TailableMode::kNormal; } bool ClusterClientCursorImpl::isTailableAndAwaitData() const { - return _params.tailableMode == TailableModeEnum::kTailableAndAwaitData; + return _params.tailableMode == TailableMode::kTailableAndAwaitData; } BSONObj ClusterClientCursorImpl::getOriginatingCommand() const { return _params.originatingCommandObj; } -const std::size_t ClusterClientCursorImpl::getNumRemotes() const { - return _root->getNumRemotes(); +const std::vector<ClusterClientCursorParams::RemoteCursor>& ClusterClientCursorImpl::getRemotes() + const { + return _params.remotes; } long long ClusterClientCursorImpl::getNumReturnedSoFar() const { @@ -181,6 +181,30 @@ boost::optional<ReadPreferenceSetting> ClusterClientCursorImpl::getReadPreferenc namespace { +/** + * Rips off an initial $sort stage that will be handled by mongos execution machinery. Returns the + * sort key pattern of such a $sort stage if there was one, and boost::none otherwise. + */ +boost::optional<BSONObj> extractLeadingSort(Pipeline* mergePipeline) { + // Remove a leading $sort iff it is a mergesort, since the ARM cannot handle blocking $sort. + auto frontSort = mergePipeline->popFrontWithCriteria( + DocumentSourceSort::kStageName, [](const DocumentSource* const source) { + return static_cast<const DocumentSourceSort* const>(source)->mergingPresorted(); + }); + + if (frontSort) { + auto sortStage = static_cast<DocumentSourceSort*>(frontSort.get()); + if (auto sortLimit = sortStage->getLimitSrc()) { + // There was a limit stage absorbed into the sort stage, so we need to preserve that. + mergePipeline->addInitialSource(sortLimit); + } + return sortStage + ->sortKeyPattern(DocumentSourceSort::SortKeySerialization::kForSortKeyMerging) + .toBson(); + } + return boost::none; +} + bool isSkipOrLimit(const boost::intrusive_ptr<DocumentSource>& stage) { return (dynamic_cast<DocumentSourceLimit*>(stage.get()) || dynamic_cast<DocumentSourceSkip*>(stage.get())); @@ -226,10 +250,10 @@ std::unique_ptr<RouterExecStage> buildPipelinePlan(executor::TaskExecutor* execu // instead. while (!pipeline->getSources().empty()) { invariant(isSkipOrLimit(pipeline->getSources().front())); - if (auto skip = pipeline->popFrontWithName(DocumentSourceSkip::kStageName)) { + if (auto skip = pipeline->popFrontWithCriteria(DocumentSourceSkip::kStageName)) { root = stdx::make_unique<RouterStageSkip>( opCtx, std::move(root), static_cast<DocumentSourceSkip*>(skip.get())->getSkip()); - } else if (auto limit = pipeline->popFrontWithName(DocumentSourceLimit::kStageName)) { + } else if (auto limit = pipeline->popFrontWithCriteria(DocumentSourceLimit::kStageName)) { root = stdx::make_unique<RouterStageLimit>( opCtx, std::move(root), static_cast<DocumentSourceLimit*>(limit.get())->getLimit()); } @@ -246,8 +270,7 @@ std::unique_ptr<RouterExecStage> ClusterClientCursorImpl::buildMergerPlan( const auto skip = params->skip; const auto limit = params->limit; if (params->mergePipeline) { - if (auto sort = - cluster_aggregation_planner::popLeadingMergeSort(params->mergePipeline.get())) { + if (auto sort = extractLeadingSort(params->mergePipeline.get())) { params->sort = *sort; } return buildPipelinePlan(executor, params); diff --git a/src/mongo/s/query/cluster_client_cursor_impl.h b/src/mongo/s/query/cluster_client_cursor_impl.h index d3c9349233b..c685a383307 100644 --- a/src/mongo/s/query/cluster_client_cursor_impl.h +++ b/src/mongo/s/query/cluster_client_cursor_impl.h @@ -105,7 +105,7 @@ public: BSONObj getOriginatingCommand() const final; - const std::size_t getNumRemotes() const final; + const std::vector<ClusterClientCursorParams::RemoteCursor>& getRemotes() const final; long long getNumReturnedSoFar() const final; diff --git a/src/mongo/s/query/cluster_client_cursor_mock.cpp b/src/mongo/s/query/cluster_client_cursor_mock.cpp index 6e624f36b84..0e3a3fd6731 100644 --- a/src/mongo/s/query/cluster_client_cursor_mock.cpp +++ b/src/mongo/s/query/cluster_client_cursor_mock.cpp @@ -68,7 +68,8 @@ BSONObj ClusterClientCursorMock::getOriginatingCommand() const { return _originatingCommand; } -const std::size_t ClusterClientCursorMock::getNumRemotes() const { +const std::vector<ClusterClientCursorParams::RemoteCursor>& ClusterClientCursorMock::getRemotes() + const { MONGO_UNREACHABLE; } diff --git a/src/mongo/s/query/cluster_client_cursor_mock.h b/src/mongo/s/query/cluster_client_cursor_mock.h index 1c50403c3ae..7364240112d 100644 --- a/src/mongo/s/query/cluster_client_cursor_mock.h +++ b/src/mongo/s/query/cluster_client_cursor_mock.h @@ -69,7 +69,7 @@ public: BSONObj getOriginatingCommand() const final; - const std::size_t getNumRemotes() const final; + const std::vector<ClusterClientCursorParams::RemoteCursor>& getRemotes() const final; long long getNumReturnedSoFar() const final; diff --git a/src/mongo/s/query/cluster_client_cursor_params.h b/src/mongo/s/query/cluster_client_cursor_params.h index 71a7f17c282..116634bcee8 100644 --- a/src/mongo/s/query/cluster_client_cursor_params.h +++ b/src/mongo/s/query/cluster_client_cursor_params.h @@ -42,7 +42,6 @@ #include "mongo/db/query/cursor_response.h" #include "mongo/db/query/tailable_mode.h" #include "mongo/s/client/shard.h" -#include "mongo/s/query/async_results_merger_params_gen.h" #include "mongo/util/net/hostandport.h" namespace mongo { @@ -62,6 +61,22 @@ class RouterExecStage; * this cursor have been processed. */ struct ClusterClientCursorParams { + struct RemoteCursor { + RemoteCursor(ShardId shardId, HostAndPort hostAndPort, CursorResponse cursorResponse) + : shardId(std::move(shardId)), + hostAndPort(std::move(hostAndPort)), + cursorResponse(std::move(cursorResponse)) {} + + // The shardId of the shard on which the cursor resides. + ShardId shardId; + + // The exact host (within the shard) on which the cursor resides. + HostAndPort hostAndPort; + + // Encompasses the state of the established cursor. + CursorResponse cursorResponse; + }; + ClusterClientCursorParams(NamespaceString nss, boost::optional<ReadPreferenceSetting> readPref = boost::none) : nsString(std::move(nss)) { @@ -70,24 +85,6 @@ struct ClusterClientCursorParams { } } - /** - * Extracts the subset of fields here needed by the AsyncResultsMerger. The returned - * AsyncResultsMergerParams will assume ownership of 'remotes'. - */ - AsyncResultsMergerParams extractARMParams() { - AsyncResultsMergerParams armParams; - if (!sort.isEmpty()) { - armParams.setSort(sort); - } - armParams.setCompareWholeSortKey(compareWholeSortKey); - armParams.setRemotes(std::move(remotes)); - armParams.setTailableMode(tailableMode); - armParams.setBatchSize(batchSize); - armParams.setNss(nsString); - armParams.setAllowPartialResults(isAllowPartialResults); - return armParams; - } - // Namespace against which the cursors exist. NamespaceString nsString; @@ -111,7 +108,7 @@ struct ClusterClientCursorParams { // The number of results per batch. Optional. If specified, will be specified as the batch for // each getMore. - boost::optional<std::int64_t> batchSize; + boost::optional<long long> batchSize; // Limits the number of results returned by the ClusterClientCursor to this many. Optional. // Should be forwarded to the remote hosts in 'cmdObj'. @@ -122,7 +119,7 @@ struct ClusterClientCursorParams { // Whether this cursor is tailing a capped collection, and whether it has the awaitData option // set. - TailableModeEnum tailableMode = TailableModeEnum::kNormal; + TailableMode tailableMode = TailableMode::kNormal; // Set if a readPreference must be respected throughout the lifetime of the cursor. boost::optional<ReadPreferenceSetting> readPreference; diff --git a/src/mongo/s/query/cluster_cursor_manager.cpp b/src/mongo/s/query/cluster_cursor_manager.cpp index ca66ee04e08..cd5ce0f9bc8 100644 --- a/src/mongo/s/query/cluster_cursor_manager.cpp +++ b/src/mongo/s/query/cluster_cursor_manager.cpp @@ -147,9 +147,10 @@ BSONObj ClusterCursorManager::PinnedCursor::getOriginatingCommand() const { return _cursor->getOriginatingCommand(); } -const std::size_t ClusterCursorManager::PinnedCursor::getNumRemotes() const { +const std::vector<ClusterClientCursorParams::RemoteCursor>& +ClusterCursorManager::PinnedCursor::getRemotes() const { invariant(_cursor); - return _cursor->getNumRemotes(); + return _cursor->getRemotes(); } CursorId ClusterCursorManager::PinnedCursor::getCursorId() const { diff --git a/src/mongo/s/query/cluster_cursor_manager.h b/src/mongo/s/query/cluster_cursor_manager.h index cf6dc54f21e..b9b4ee81ef6 100644 --- a/src/mongo/s/query/cluster_cursor_manager.h +++ b/src/mongo/s/query/cluster_cursor_manager.h @@ -198,7 +198,7 @@ public: /** * Returns a reference to the vector of remote hosts involved in this operation. */ - const std::size_t getNumRemotes() const; + const std::vector<ClusterClientCursorParams::RemoteCursor>& getRemotes() const; /** * Returns the cursor id for the underlying cursor, or zero if no cursor is owned. diff --git a/src/mongo/s/query/cluster_find.cpp b/src/mongo/s/query/cluster_find.cpp index 8c9e654f5bb..f8faaabaa96 100644 --- a/src/mongo/s/query/cluster_find.cpp +++ b/src/mongo/s/query/cluster_find.cpp @@ -346,7 +346,7 @@ CursorId runQueryWithoutRetrying(OperationContext* opCtx, } // Fill out query exec properties. - CurOp::get(opCtx)->debug().nShards = ccc->getNumRemotes(); + CurOp::get(opCtx)->debug().nShards = ccc->getRemotes().size(); CurOp::get(opCtx)->debug().nreturned = results->size(); // If the cursor is exhausted, then there are no more results to return and we don't need to @@ -490,7 +490,7 @@ StatusWith<CursorResponse> ClusterFind::runGetMore(OperationContext* opCtx, // Set the originatingCommand object and the cursorID in CurOp. { - CurOp::get(opCtx)->debug().nShards = pinnedCursor.getValue().getNumRemotes(); + CurOp::get(opCtx)->debug().nShards = pinnedCursor.getValue().getRemotes().size(); CurOp::get(opCtx)->debug().cursorid = request.cursorid; stdx::lock_guard<Client> lk(*opCtx->getClient()); CurOp::get(opCtx)->setOriginatingCommand_inlock( diff --git a/src/mongo/s/query/document_source_router_adapter.cpp b/src/mongo/s/query/document_source_router_adapter.cpp index 26a944ed5cc..4e144751dcb 100644 --- a/src/mongo/s/query/document_source_router_adapter.cpp +++ b/src/mongo/s/query/document_source_router_adapter.cpp @@ -67,10 +67,6 @@ Value DocumentSourceRouterAdapter::serialize( return Value(); // Return the empty value to hide this stage from explain output. } -std::size_t DocumentSourceRouterAdapter::getNumRemotes() const { - return _child->getNumRemotes(); -} - bool DocumentSourceRouterAdapter::remotesExhausted() { return _child->remotesExhausted(); } diff --git a/src/mongo/s/query/document_source_router_adapter.h b/src/mongo/s/query/document_source_router_adapter.h index a7db7734539..5c1a6a0935c 100644 --- a/src/mongo/s/query/document_source_router_adapter.h +++ b/src/mongo/s/query/document_source_router_adapter.h @@ -59,7 +59,6 @@ public: void detachFromOperationContext() final; Value serialize(boost::optional<ExplainOptions::Verbosity> explain) const final; bool remotesExhausted(); - std::size_t getNumRemotes() const; void setExecContext(RouterExecStage::ExecContext execContext) { _execContext = execContext; diff --git a/src/mongo/s/query/establish_cursors.cpp b/src/mongo/s/query/establish_cursors.cpp index 08ce5a2cb5b..f186bcb1bb5 100644 --- a/src/mongo/s/query/establish_cursors.cpp +++ b/src/mongo/s/query/establish_cursors.cpp @@ -47,12 +47,13 @@ namespace mongo { -std::vector<RemoteCursor> establishCursors(OperationContext* opCtx, - executor::TaskExecutor* executor, - const NamespaceString& nss, - const ReadPreferenceSetting readPref, - const std::vector<std::pair<ShardId, BSONObj>>& remotes, - bool allowPartialResults) { +std::vector<ClusterClientCursorParams::RemoteCursor> establishCursors( + OperationContext* opCtx, + executor::TaskExecutor* executor, + const NamespaceString& nss, + const ReadPreferenceSetting readPref, + const std::vector<std::pair<ShardId, BSONObj>>& remotes, + bool allowPartialResults) { // Construct the requests std::vector<AsyncRequestsSender::Request> requests; for (const auto& remote : remotes) { @@ -67,23 +68,20 @@ std::vector<RemoteCursor> establishCursors(OperationContext* opCtx, readPref, Shard::RetryPolicy::kIdempotent); - std::vector<RemoteCursor> remoteCursors; + std::vector<ClusterClientCursorParams::RemoteCursor> remoteCursors; try { // Get the responses while (!ars.done()) { try { auto response = ars.next(); - // Note the shardHostAndPort may not be populated if there was an error, so be sure - // to do this after parsing the cursor response to ensure the response was ok. - // Additionally, be careful not to push into 'remoteCursors' until we are sure we - // have a valid cursor, since the error handling path will attempt to clean up - // anything in 'remoteCursors' - RemoteCursor cursor; - cursor.setCursorResponse(CursorResponse::parseFromBSONThrowing( + + // uasserts must happen before attempting to access the optional shardHostAndPort. + auto cursorResponse = uassertStatusOK(CursorResponse::parseFromBSON( uassertStatusOK(std::move(response.swResponse)).data)); - cursor.setShardId(std::move(response.shardId)); - cursor.setHostAndPort(*response.shardHostAndPort); - remoteCursors.push_back(std::move(cursor)); + + remoteCursors.emplace_back(std::move(response.shardId), + std::move(*response.shardHostAndPort), + std::move(cursorResponse)); } catch (const DBException& ex) { // Retriable errors are swallowed if 'allowPartialResults' is true. if (allowPartialResults && @@ -116,21 +114,18 @@ std::vector<RemoteCursor> establishCursors(OperationContext* opCtx, : response.swResponse.getStatus()); if (swCursorResponse.isOK()) { - RemoteCursor cursor; - cursor.setShardId(std::move(response.shardId)); - cursor.setHostAndPort(*response.shardHostAndPort); - cursor.setCursorResponse(std::move(swCursorResponse.getValue())); - remoteCursors.push_back(std::move(cursor)); + remoteCursors.emplace_back(std::move(response.shardId), + *response.shardHostAndPort, + std::move(swCursorResponse.getValue())); } } // Schedule killCursors against all cursors that were established. for (const auto& remoteCursor : remoteCursors) { BSONObj cmdObj = - KillCursorsRequest(nss, {remoteCursor.getCursorResponse().getCursorId()}) - .toBSON(); + KillCursorsRequest(nss, {remoteCursor.cursorResponse.getCursorId()}).toBSON(); executor::RemoteCommandRequest request( - remoteCursor.getHostAndPort(), nss.db().toString(), cmdObj, opCtx); + remoteCursor.hostAndPort, nss.db().toString(), cmdObj, opCtx); // We do not process the response to the killCursors request (we make a good-faith // attempt at cleaning up the cursors, but ignore any returned errors). diff --git a/src/mongo/s/query/establish_cursors.h b/src/mongo/s/query/establish_cursors.h index e88ddc2682e..b75a750d7b7 100644 --- a/src/mongo/s/query/establish_cursors.h +++ b/src/mongo/s/query/establish_cursors.h @@ -36,7 +36,7 @@ #include "mongo/bson/bsonobj.h" #include "mongo/db/cursor_id.h" #include "mongo/executor/task_executor.h" -#include "mongo/s/query/async_results_merger_params_gen.h" +#include "mongo/s/query/cluster_client_cursor_params.h" #include "mongo/stdx/mutex.h" #include "mongo/util/net/hostandport.h" #include "mongo/util/time_support.h" @@ -61,11 +61,12 @@ class CursorResponse; * on reachable hosts are returned. * */ -std::vector<RemoteCursor> establishCursors(OperationContext* opCtx, - executor::TaskExecutor* executor, - const NamespaceString& nss, - const ReadPreferenceSetting readPref, - const std::vector<std::pair<ShardId, BSONObj>>& remotes, - bool allowPartialResults); +std::vector<ClusterClientCursorParams::RemoteCursor> establishCursors( + OperationContext* opCtx, + executor::TaskExecutor* executor, + const NamespaceString& nss, + const ReadPreferenceSetting readPref, + const std::vector<std::pair<ShardId, BSONObj>>& remotes, + bool allowPartialResults); } // namespace mongo diff --git a/src/mongo/s/query/router_exec_stage.h b/src/mongo/s/query/router_exec_stage.h index 515da5a358c..418419fdbef 100644 --- a/src/mongo/s/query/router_exec_stage.h +++ b/src/mongo/s/query/router_exec_stage.h @@ -89,14 +89,6 @@ public: } /** - * Returns the number of remote hosts involved in this execution plan. - */ - virtual std::size_t getNumRemotes() const { - invariant(_child); // The default implementation forwards to the child stage. - return _child->getNumRemotes(); - } - - /** * Returns whether or not all the remote cursors are exhausted. */ virtual bool remotesExhausted() { diff --git a/src/mongo/s/query/router_stage_merge.cpp b/src/mongo/s/query/router_stage_merge.cpp index 48abb1452ec..4f17927483b 100644 --- a/src/mongo/s/query/router_stage_merge.cpp +++ b/src/mongo/s/query/router_stage_merge.cpp @@ -40,21 +40,18 @@ namespace mongo { RouterStageMerge::RouterStageMerge(OperationContext* opCtx, executor::TaskExecutor* executor, ClusterClientCursorParams* params) - : RouterExecStage(opCtx), - _executor(executor), - _params(params), - _arm(opCtx, executor, params->extractARMParams()) {} + : RouterExecStage(opCtx), _executor(executor), _params(params), _arm(opCtx, executor, params) {} StatusWith<ClusterQueryResult> RouterStageMerge::next(ExecContext execCtx) { // Non-tailable and tailable non-awaitData cursors always block until ready(). AwaitData // cursors wait for ready() only until a specified time limit is exceeded. - return (_params->tailableMode == TailableModeEnum::kTailableAndAwaitData + return (_params->tailableMode == TailableMode::kTailableAndAwaitData ? awaitNextWithTimeout(execCtx) : _arm.blockingNext()); } StatusWith<ClusterQueryResult> RouterStageMerge::awaitNextWithTimeout(ExecContext execCtx) { - invariant(_params->tailableMode == TailableModeEnum::kTailableAndAwaitData); + invariant(_params->tailableMode == TailableMode::kTailableAndAwaitData); // If we are in kInitialFind or kGetMoreWithAtLeastOneResultInBatch context and the ARM is not // ready, we don't block. Fall straight through to the return statement. while (!_arm.ready() && execCtx == ExecContext::kGetMoreNoResultsYet) { @@ -88,7 +85,7 @@ StatusWith<ClusterQueryResult> RouterStageMerge::awaitNextWithTimeout(ExecContex StatusWith<EventHandle> RouterStageMerge::getNextEvent() { // If we abandoned a previous event due to a mongoS-side timeout, wait for it first. if (_leftoverEventFromLastTimeout) { - invariant(_params->tailableMode == TailableModeEnum::kTailableAndAwaitData); + invariant(_params->tailableMode == TailableMode::kTailableAndAwaitData); auto event = _leftoverEventFromLastTimeout; _leftoverEventFromLastTimeout = EventHandle(); return event; @@ -105,16 +102,14 @@ bool RouterStageMerge::remotesExhausted() { return _arm.remotesExhausted(); } -std::size_t RouterStageMerge::getNumRemotes() const { - return _arm.getNumRemotes(); -} - Status RouterStageMerge::doSetAwaitDataTimeout(Milliseconds awaitDataTimeout) { return _arm.setAwaitDataTimeout(awaitDataTimeout); } -void RouterStageMerge::addNewShardCursors(std::vector<RemoteCursor>&& newShards) { - _arm.addNewShardCursors(std::move(newShards)); +void RouterStageMerge::addNewShardCursors( + std::vector<ClusterClientCursorParams::RemoteCursor>&& newShards) { + _arm.addNewShardCursors(newShards); + std::move(newShards.begin(), newShards.end(), std::back_inserter(_params->remotes)); } } // namespace mongo diff --git a/src/mongo/s/query/router_stage_merge.h b/src/mongo/s/query/router_stage_merge.h index b6bfee146b6..efd397b8c7e 100644 --- a/src/mongo/s/query/router_stage_merge.h +++ b/src/mongo/s/query/router_stage_merge.h @@ -57,12 +57,10 @@ public: bool remotesExhausted() final; - std::size_t getNumRemotes() const final; - /** * Adds the cursors in 'newShards' to those being merged by the ARM. */ - void addNewShardCursors(std::vector<RemoteCursor>&& newShards); + void addNewShardCursors(std::vector<ClusterClientCursorParams::RemoteCursor>&& newShards); protected: Status doSetAwaitDataTimeout(Milliseconds awaitDataTimeout) final; diff --git a/src/mongo/s/query/router_stage_pipeline.cpp b/src/mongo/s/query/router_stage_pipeline.cpp index 5e94274b9ac..97febc62173 100644 --- a/src/mongo/s/query/router_stage_pipeline.cpp +++ b/src/mongo/s/query/router_stage_pipeline.cpp @@ -84,10 +84,6 @@ void RouterStagePipeline::kill(OperationContext* opCtx) { _mergePipeline->dispose(opCtx); } -std::size_t RouterStagePipeline::getNumRemotes() const { - return _mongosOnlyPipeline ? 0 : _routerAdapter->getNumRemotes(); -} - bool RouterStagePipeline::remotesExhausted() { return _mongosOnlyPipeline || _routerAdapter->remotesExhausted(); } diff --git a/src/mongo/s/query/router_stage_pipeline.h b/src/mongo/s/query/router_stage_pipeline.h index c14ddf9f80b..e876dc816a2 100644 --- a/src/mongo/s/query/router_stage_pipeline.h +++ b/src/mongo/s/query/router_stage_pipeline.h @@ -51,8 +51,6 @@ public: bool remotesExhausted() final; - std::size_t getNumRemotes() const final; - protected: Status doSetAwaitDataTimeout(Milliseconds awaitDataTimeout) final; diff --git a/src/mongo/s/query/router_stage_update_on_add_shard.cpp b/src/mongo/s/query/router_stage_update_on_add_shard.cpp index 61fa2a9176d..451a1ee9699 100644 --- a/src/mongo/s/query/router_stage_update_on_add_shard.cpp +++ b/src/mongo/s/query/router_stage_update_on_add_shard.cpp @@ -56,11 +56,9 @@ bool needsUpdate(const StatusWith<ClusterQueryResult>& childResult) { RouterStageUpdateOnAddShard::RouterStageUpdateOnAddShard(OperationContext* opCtx, executor::TaskExecutor* executor, ClusterClientCursorParams* params, - std::vector<ShardId> shardIds, BSONObj cmdToRunOnNewShards) : RouterExecStage(opCtx, stdx::make_unique<RouterStageMerge>(opCtx, executor, params)), _params(params), - _shardIds(std::move(shardIds)), _cmdToRunOnNewShards(cmdToRunOnNewShards) {} StatusWith<ClusterQueryResult> RouterStageUpdateOnAddShard::next( @@ -75,12 +73,18 @@ StatusWith<ClusterQueryResult> RouterStageUpdateOnAddShard::next( } void RouterStageUpdateOnAddShard::addNewShardCursors(BSONObj newShardDetectedObj) { + std::vector<ShardId> existingShardIds; + for (const auto& remote : _params->remotes) { + existingShardIds.push_back(remote.shardId); + } checked_cast<RouterStageMerge*>(getChildStage()) - ->addNewShardCursors(establishShardCursorsOnNewShards(newShardDetectedObj)); + ->addNewShardCursors( + establishShardCursorsOnNewShards(std::move(existingShardIds), newShardDetectedObj)); } -std::vector<RemoteCursor> RouterStageUpdateOnAddShard::establishShardCursorsOnNewShards( - const BSONObj& newShardDetectedObj) { +std::vector<ClusterClientCursorParams::RemoteCursor> +RouterStageUpdateOnAddShard::establishShardCursorsOnNewShards(std::vector<ShardId> existingShardIds, + const BSONObj& newShardDetectedObj) { auto* opCtx = getOpCtx(); // Reload the shard registry. We need to ensure a reload initiated after calling this method // caused the reload, otherwise we aren't guaranteed to get all the new shards. @@ -94,12 +98,12 @@ std::vector<RemoteCursor> RouterStageUpdateOnAddShard::establishShardCursorsOnNe std::vector<ShardId> shardIds, newShardIds; shardRegistry->getAllShardIdsNoReload(&shardIds); - std::sort(_shardIds.begin(), _shardIds.end()); + std::sort(existingShardIds.begin(), existingShardIds.end()); std::sort(shardIds.begin(), shardIds.end()); std::set_difference(shardIds.begin(), shardIds.end(), - _shardIds.begin(), - _shardIds.end(), + existingShardIds.begin(), + existingShardIds.end(), std::back_inserter(newShardIds)); auto cmdObj = DocumentSourceChangeStream::replaceResumeTokenInCommand( @@ -108,7 +112,6 @@ std::vector<RemoteCursor> RouterStageUpdateOnAddShard::establishShardCursorsOnNe std::vector<std::pair<ShardId, BSONObj>> requests; for (const auto& shardId : newShardIds) { requests.emplace_back(shardId, cmdObj); - _shardIds.push_back(shardId); } const bool allowPartialResults = false; // partial results are not allowed return establishCursors(opCtx, diff --git a/src/mongo/s/query/router_stage_update_on_add_shard.h b/src/mongo/s/query/router_stage_update_on_add_shard.h index 00ee921e2af..1128dc83430 100644 --- a/src/mongo/s/query/router_stage_update_on_add_shard.h +++ b/src/mongo/s/query/router_stage_update_on_add_shard.h @@ -44,7 +44,6 @@ public: RouterStageUpdateOnAddShard(OperationContext* opCtx, executor::TaskExecutor* executor, ClusterClientCursorParams* params, - std::vector<ShardId> shardIds, BSONObj cmdToRunOnNewShards); StatusWith<ClusterQueryResult> next(ExecContext) final; @@ -59,10 +58,10 @@ private: /** * Open the cursors on the new shards. */ - std::vector<RemoteCursor> establishShardCursorsOnNewShards(const BSONObj& newShardDetectedObj); + std::vector<ClusterClientCursorParams::RemoteCursor> establishShardCursorsOnNewShards( + std::vector<ShardId> existingShardIds, const BSONObj& newShardDetectedObj); ClusterClientCursorParams* _params; - std::vector<ShardId> _shardIds; BSONObj _cmdToRunOnNewShards; }; } diff --git a/src/mongo/s/query/store_possible_cursor.cpp b/src/mongo/s/query/store_possible_cursor.cpp index 9a754364412..bdf4283c104 100644 --- a/src/mongo/s/query/store_possible_cursor.cpp +++ b/src/mongo/s/query/store_possible_cursor.cpp @@ -38,7 +38,6 @@ #include "mongo/s/query/cluster_client_cursor_impl.h" #include "mongo/s/query/cluster_client_cursor_params.h" #include "mongo/s/query/cluster_cursor_manager.h" -#include "mongo/s/shard_id.h" namespace mongo { @@ -49,7 +48,7 @@ StatusWith<BSONObj> storePossibleCursor(OperationContext* opCtx, const NamespaceString& requestedNss, executor::TaskExecutor* executor, ClusterCursorManager* cursorManager, - TailableModeEnum tailableMode) { + TailableMode tailableMode) { if (!cmdResult["ok"].trueValue() || !cmdResult.hasField("cursor")) { return cmdResult; } @@ -72,13 +71,11 @@ StatusWith<BSONObj> storePossibleCursor(OperationContext* opCtx, } ClusterClientCursorParams params(incomingCursorResponse.getValue().getNSS()); - params.remotes.emplace_back(); - auto& remoteCursor = params.remotes.back(); - remoteCursor.setShardId(shardId.toString()); - remoteCursor.setHostAndPort(server); - remoteCursor.setCursorResponse(CursorResponse(incomingCursorResponse.getValue().getNSS(), - incomingCursorResponse.getValue().getCursorId(), - {})); + params.remotes.emplace_back(shardId, + server, + CursorResponse(incomingCursorResponse.getValue().getNSS(), + incomingCursorResponse.getValue().getCursorId(), + {})); params.originatingCommandObj = CurOp::get(opCtx)->opDescription().getOwned(); params.tailableMode = tailableMode; diff --git a/src/mongo/s/query/store_possible_cursor.h b/src/mongo/s/query/store_possible_cursor.h index b9756be44f7..75a4e76bf24 100644 --- a/src/mongo/s/query/store_possible_cursor.h +++ b/src/mongo/s/query/store_possible_cursor.h @@ -74,6 +74,6 @@ StatusWith<BSONObj> storePossibleCursor(OperationContext* opCtx, const NamespaceString& requestedNss, executor::TaskExecutor* executor, ClusterCursorManager* cursorManager, - TailableModeEnum tailableMode = TailableModeEnum::kNormal); + TailableMode tailableMode = TailableMode::kNormal); } // namespace mongo diff --git a/src/mongo/shell/bench.cpp b/src/mongo/shell/bench.cpp index 27b75dbb06c..b096affd2cd 100644 --- a/src/mongo/shell/bench.cpp +++ b/src/mongo/shell/bench.cpp @@ -214,15 +214,12 @@ int runQueryWithReadCommands(DBClientBase* conn, } while (cursorResponse.getCursorId() != 0) { - GetMoreRequest getMoreRequest( - qr->nss(), - cursorResponse.getCursorId(), - qr->getBatchSize() - ? boost::optional<std::int64_t>(static_cast<std::int64_t>(*qr->getBatchSize())) - : boost::none, - boost::none, // maxTimeMS - boost::none, // term - boost::none); // lastKnownCommittedOpTime + GetMoreRequest getMoreRequest(qr->nss(), + cursorResponse.getCursorId(), + qr->getBatchSize(), + boost::none, // maxTimeMS + boost::none, // term + boost::none); // lastKnownCommittedOpTime BSONObj getMoreCommandResult; uassert(ErrorCodes::CommandFailed, str::stream() << "getMore command failed; reply was: " << getMoreCommandResult, diff --git a/src/mongo/util/net/SConscript b/src/mongo/util/net/SConscript index 133ee826998..b7dd19b3953 100644 --- a/src/mongo/util/net/SConscript +++ b/src/mongo/util/net/SConscript @@ -12,7 +12,6 @@ env.Library( "hostandport.cpp", "hostname_canonicalization.cpp", "sockaddr.cpp", - env.Idlc('hostandport.idl')[0], ], LIBDEPS=[ '$BUILD_DIR/mongo/base', diff --git a/src/mongo/util/net/hostandport.h b/src/mongo/util/net/hostandport.h index da0af575bc3..540f57f3b46 100644 --- a/src/mongo/util/net/hostandport.h +++ b/src/mongo/util/net/hostandport.h @@ -54,19 +54,12 @@ class StatusWith; */ struct HostAndPort { /** - * Parses "text" to produce a HostAndPort. Returns either that or an error status describing - * the parse failure. + * Parses "text" to produce a HostAndPort. Returns either that or an error + * status describing the parse failure. */ static StatusWith<HostAndPort> parse(StringData text); /** - * A version of 'parse' that throws a UserException if a parsing error is encountered. - */ - static HostAndPort parseThrowing(StringData text) { - return uassertStatusOK(parse(text)); - } - - /** * Construct an empty/invalid HostAndPort. */ HostAndPort(); diff --git a/src/mongo/util/net/hostandport.idl b/src/mongo/util/net/hostandport.idl deleted file mode 100644 index 6edda963f6e..00000000000 --- a/src/mongo/util/net/hostandport.idl +++ /dev/null @@ -1,38 +0,0 @@ -# Copyright (C) 2018 MongoDB Inc. -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU Affero General Public License, version 3, -# as published by the Free Software Foundation. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU Affero General Public License for more details. -# -# You should have received a copy of the GNU Affero General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. -# -# As a special exception, the copyright holders give permission to link the -# code of portions of this program with the OpenSSL library under certain -# conditions as described in each individual source file and distribute -# linked combinations including the program with the OpenSSL library. You -# must comply with the GNU Affero General Public License in all respects for -# all of the code used other than as permitted herein. If you modify file(s) -# with this exception, you may extend this exception to your version of the -# file(s), but you are not obligated to do so. If you do not wish to do so, -# delete this exception statement from your version. If you delete this -# exception statement from all source files in the program, then also delete -# it in the license file. - -global: - cpp_namespace: "mongo" - cpp_includes: - - "mongo/util/net/hostandport.h" -types: - HostAndPort: - bson_serialization_type: string - description: A string representing a host and port of a mongod or mongos process. - cpp_type: HostAndPort - serializer: HostAndPort::toString - deserializer: HostAndPort::parseThrowing - |