diff options
author | Charlie Swanson <charlie.swanson@mongodb.com> | 2018-03-01 17:37:52 -0500 |
---|---|---|
committer | Charlie Swanson <charlie.swanson@mongodb.com> | 2018-04-04 17:23:50 -0400 |
commit | 7d09f278a2acf9791b36927d6af1d30347d60391 (patch) | |
tree | 91138c5de49ba3f300df43f840e63c9979a9cebd | |
parent | 09253ad8f4187f4e7e4c453cc157362d751e0918 (diff) | |
download | mongo-7d09f278a2acf9791b36927d6af1d30347d60391.tar.gz |
SERVER-33323 Use the IDL to serialize the ARM
66 files changed, 1376 insertions, 843 deletions
diff --git a/src/mongo/client/dbclientcursor.cpp b/src/mongo/client/dbclientcursor.cpp index 399d6cb9143..23a9b12a792 100644 --- a/src/mongo/client/dbclientcursor.cpp +++ b/src/mongo/client/dbclientcursor.cpp @@ -137,7 +137,7 @@ Message DBClientCursor::_assembleInit() { Message DBClientCursor::_assembleGetMore() { invariant(cursorId); if (_useFindCommand) { - long long batchSize = nextBatchSize(); + std::int64_t batchSize = nextBatchSize(); auto gmr = GetMoreRequest(ns, cursorId, boost::make_optional(batchSize != 0, batchSize), diff --git a/src/mongo/db/commands/run_aggregate.cpp b/src/mongo/db/commands/run_aggregate.cpp index ac8a6fbf2f2..4bc9e9b35cc 100644 --- a/src/mongo/db/commands/run_aggregate.cpp +++ b/src/mongo/db/commands/run_aggregate.cpp @@ -519,7 +519,7 @@ Status runAggregate(OperationContext* opCtx, AuthorizationSession::get(opCtx->getClient())->getAuthenticatedUserNames(), opCtx->recoveryUnit()->getReadConcernLevel(), cmdObj); - if (expCtx->tailableMode == TailableMode::kTailableAndAwaitData) { + if (expCtx->tailableMode == TailableModeEnum::kTailableAndAwaitData) { cursorParams.setTailable(true); cursorParams.setAwaitData(true); } diff --git a/src/mongo/db/matcher/expression_optimize_test.cpp b/src/mongo/db/matcher/expression_optimize_test.cpp index 9091ad48e93..e2f82257a67 100644 --- a/src/mongo/db/matcher/expression_optimize_test.cpp +++ b/src/mongo/db/matcher/expression_optimize_test.cpp @@ -133,7 +133,7 @@ TEST(ExpressionOptimizeTest, IsValidText) { TEST(ExpressionOptimizeTest, IsValidTextTailable) { // Filter inside QueryRequest is not used. auto qr = stdx::make_unique<QueryRequest>(nss); - qr->setTailableMode(TailableMode::kTailable); + qr->setTailableMode(TailableModeEnum::kTailable); ASSERT_OK(qr->validate()); // Invalid: TEXT and tailable. diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript index 9712f86cd53..5b3f46972cd 100644 --- a/src/mongo/db/pipeline/SConscript +++ b/src/mongo/db/pipeline/SConscript @@ -300,6 +300,7 @@ pipelineeEnv.Library( 'document_source_skip.cpp', 'document_source_sort.cpp', 'document_source_sort_by_count.cpp', + "cluster_aggregation_planner.cpp", 'document_source_tee_consumer.cpp', 'document_source_unwind.cpp', 'mongo_process_common.cpp', diff --git a/src/mongo/db/pipeline/cluster_aggregation_planner.cpp b/src/mongo/db/pipeline/cluster_aggregation_planner.cpp new file mode 100644 index 00000000000..3653b5b2638 --- /dev/null +++ b/src/mongo/db/pipeline/cluster_aggregation_planner.cpp @@ -0,0 +1,182 @@ +/** + * Copyright (C) 2018 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/db/pipeline/cluster_aggregation_planner.h" + +#include "mongo/db/pipeline/document_source_match.h" +#include "mongo/db/pipeline/document_source_merge_cursors.h" +#include "mongo/db/pipeline/document_source_project.h" +#include "mongo/db/pipeline/document_source_sort.h" +#include "mongo/db/pipeline/document_source_unwind.h" + +namespace mongo { +namespace cluster_aggregation_planner { + +namespace { +/** + * Moves everything before a splittable stage to the shards. If there are no splittable stages, + * moves everything to the shards. + * + * It is not safe to call this optimization multiple times. + * + * NOTE: looks for SplittableDocumentSources and uses that API + */ +void findSplitPoint(Pipeline* shardPipe, Pipeline* mergePipe) { + while (!mergePipe->getSources().empty()) { + boost::intrusive_ptr<DocumentSource> current = mergePipe->popFront(); + + // Check if this source is splittable. + SplittableDocumentSource* splittable = + dynamic_cast<SplittableDocumentSource*>(current.get()); + + if (!splittable) { + // Move the source from the merger _sources to the shard _sources. + shardPipe->pushBack(current); + } else { + // Split this source into 'merge' and 'shard' _sources. + boost::intrusive_ptr<DocumentSource> shardSource = splittable->getShardSource(); + auto mergeSources = splittable->getMergeSources(); + + // A source may not simultaneously be present on both sides of the split. + invariant(std::find(mergeSources.begin(), mergeSources.end(), shardSource) == + mergeSources.end()); + + if (shardSource) + shardPipe->pushBack(shardSource); + + // Add the stages in reverse order, so that they appear in the pipeline in the same + // order as they were returned by the stage. + for (auto it = mergeSources.rbegin(); it != mergeSources.rend(); ++it) { + mergePipe->addInitialSource(*it); + } + + break; + } + } +} + +/** + * If the final stage on shards is to unwind an array, move that stage to the merger. This cuts down + * on network traffic and allows us to take advantage of reduced copying in unwind. + */ +void moveFinalUnwindFromShardsToMerger(Pipeline* shardPipe, Pipeline* mergePipe) { + while (!shardPipe->getSources().empty() && + dynamic_cast<DocumentSourceUnwind*>(shardPipe->getSources().back().get())) { + mergePipe->addInitialSource(shardPipe->popBack()); + } +} + +/** + * Adds a stage to the end of 'shardPipe' explicitly requesting all fields that 'mergePipe' needs. + * This is only done if it heuristically determines that it is needed. This optimization can reduce + * the amount of network traffic and can also enable the shards to convert less source BSON into + * Documents. + */ +void limitFieldsSentFromShardsToMerger(Pipeline* shardPipe, Pipeline* mergePipe) { + auto depsMetadata = DocumentSourceMatch::isTextQuery(shardPipe->getInitialQuery()) + ? DepsTracker::MetadataAvailable::kTextScore + : DepsTracker::MetadataAvailable::kNoMetadata; + DepsTracker mergeDeps(mergePipe->getDependencies(depsMetadata)); + if (mergeDeps.needWholeDocument) + return; // the merge needs all fields, so nothing we can do. + + // Empty project is "special" so if no fields are needed, we just ask for _id instead. + if (mergeDeps.fields.empty()) + mergeDeps.fields.insert("_id"); + + // Remove metadata from dependencies since it automatically flows through projection and we + // don't want to project it in to the document. + mergeDeps.setNeedTextScore(false); + + // HEURISTIC: only apply optimization if none of the shard stages have an exhaustive list of + // field dependencies. While this may not be 100% ideal in all cases, it is simple and + // avoids the worst cases by ensuring that: + // 1) Optimization IS applied when the shards wouldn't have known their exhaustive list of + // dependencies. This situation can happen when a $sort is before the first $project or + // $group. Without the optimization, the shards would have to reify and transmit full + // objects even though only a subset of fields are needed. + // 2) Optimization IS NOT applied immediately following a $project or $group since it would + // add an unnecessary project (and therefore a deep-copy). + for (auto&& source : shardPipe->getSources()) { + DepsTracker dt(depsMetadata); + if (source->getDependencies(&dt) & DocumentSource::EXHAUSTIVE_FIELDS) + return; + } + // if we get here, add the project. + boost::intrusive_ptr<DocumentSource> project = DocumentSourceProject::createFromBson( + BSON("$project" << mergeDeps.toProjection()).firstElement(), shardPipe->getContext()); + shardPipe->pushBack(project); +} +} // namespace + +void performSplitPipelineOptimizations(Pipeline* shardPipeline, Pipeline* mergingPipeline) { + // The order in which optimizations are applied can have significant impact on the + // efficiency of the final pipeline. Be Careful! + findSplitPoint(shardPipeline, mergingPipeline); + moveFinalUnwindFromShardsToMerger(shardPipeline, mergingPipeline); + limitFieldsSentFromShardsToMerger(shardPipeline, mergingPipeline); +} + +boost::optional<BSONObj> popLeadingMergeSort(Pipeline* pipeline) { + // Remove a leading $sort iff it is a mergesort, since the ARM cannot handle blocking $sort. + auto frontSort = pipeline->popFrontWithNameAndCriteria( + DocumentSourceSort::kStageName, [](const DocumentSource* const source) { + return static_cast<const DocumentSourceSort* const>(source)->mergingPresorted(); + }); + + if (frontSort) { + auto sortStage = static_cast<DocumentSourceSort*>(frontSort.get()); + if (auto sortLimit = sortStage->getLimitSrc()) { + // There was a limit stage absorbed into the sort stage, so we need to preserve that. + pipeline->addInitialSource(sortLimit); + } + return sortStage + ->sortKeyPattern(DocumentSourceSort::SortKeySerialization::kForSortKeyMerging) + .toBson(); + } + return boost::none; +} + +void addMergeCursorsSource(Pipeline* mergePipeline, + std::vector<RemoteCursor> remoteCursors, + executor::TaskExecutor* executor) { + AsyncResultsMergerParams armParams; + if (auto sort = popLeadingMergeSort(mergePipeline)) { + armParams.setSort(std::move(*sort)); + } + armParams.setRemotes(std::move(remoteCursors)); + armParams.setTailableMode(mergePipeline->getContext()->tailableMode); + armParams.setNss(mergePipeline->getContext()->ns); + mergePipeline->addInitialSource(DocumentSourceMergeCursors::create( + executor, std::move(armParams), mergePipeline->getContext())); +} + +} // namespace cluster_aggregation_planner +} // namespace mongo diff --git a/src/mongo/db/pipeline/cluster_aggregation_planner.h b/src/mongo/db/pipeline/cluster_aggregation_planner.h new file mode 100644 index 00000000000..f62f158f6ca --- /dev/null +++ b/src/mongo/db/pipeline/cluster_aggregation_planner.h @@ -0,0 +1,58 @@ +/** + * Copyright (C) 2018 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#pragma once + +#include "mongo/db/pipeline/pipeline.h" + +namespace mongo { +namespace cluster_aggregation_planner { + +/** + * Performs optimizations with the aim of reducing computing time and network traffic when a + * pipeline has been split into two pieces. Modifies 'shardPipeline' and 'mergingPipeline' such that + * they may contain different stages, but still compute the same results when executed. + */ +void performSplitPipelineOptimizations(Pipeline* shardPipeline, Pipeline* mergingPipeline); + +/** + * Rips off an initial $sort stage that can be handled by cursor merging machinery. Returns the + * sort key pattern of such a $sort stage if there was one, and boost::none otherwise. + */ +boost::optional<BSONObj> popLeadingMergeSort(Pipeline* mergePipeline); + +/** + * Creates a new DocumentSourceMergeCursors from the provided 'remoteCursors' and adds it to the + * front of 'mergePipeline'. + */ +void addMergeCursorsSource(Pipeline* mergePipeline, + std::vector<RemoteCursor> remoteCursors, + executor::TaskExecutor*); + +} // namespace cluster_aggregation_planner +} // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_change_stream.cpp b/src/mongo/db/pipeline/document_source_change_stream.cpp index f393efbeae9..a9c4a9c5667 100644 --- a/src/mongo/db/pipeline/document_source_change_stream.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream.cpp @@ -394,7 +394,7 @@ void parseResumeOptions(const intrusive_ptr<ExpressionContext>& expCtx, list<intrusive_ptr<DocumentSource>> DocumentSourceChangeStream::createFromBson( BSONElement elem, const intrusive_ptr<ExpressionContext>& expCtx) { // A change stream is a tailable + awaitData cursor. - expCtx->tailableMode = TailableMode::kTailableAndAwaitData; + expCtx->tailableMode = TailableModeEnum::kTailableAndAwaitData; // Change stream on an entire database is a new 4.0 feature. uassert(ErrorCodes::QueryFeatureNotAllowed, diff --git a/src/mongo/db/pipeline/document_source_lookup_test.cpp b/src/mongo/db/pipeline/document_source_lookup_test.cpp index dfe4ece48ac..32efd87cdda 100644 --- a/src/mongo/db/pipeline/document_source_lookup_test.cpp +++ b/src/mongo/db/pipeline/document_source_lookup_test.cpp @@ -520,9 +520,8 @@ public: Status attachCursorSourceToPipeline(const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* pipeline) final { while (_removeLeadingQueryStages && !pipeline->getSources().empty()) { - if (pipeline->popFrontWithCriteria("$match") || - pipeline->popFrontWithCriteria("$sort") || - pipeline->popFrontWithCriteria("$project")) { + if (pipeline->popFrontWithName("$match") || pipeline->popFrontWithName("$sort") || + pipeline->popFrontWithName("$project")) { continue; } break; diff --git a/src/mongo/db/pipeline/document_source_merge_cursors.cpp b/src/mongo/db/pipeline/document_source_merge_cursors.cpp index 06a314e8c2e..3e34fd366a0 100644 --- a/src/mongo/db/pipeline/document_source_merge_cursors.cpp +++ b/src/mongo/db/pipeline/document_source_merge_cursors.cpp @@ -43,15 +43,20 @@ constexpr StringData DocumentSourceMergeCursors::kStageName; DocumentSourceMergeCursors::DocumentSourceMergeCursors( executor::TaskExecutor* executor, - std::unique_ptr<ClusterClientCursorParams> cursorParams, - const boost::intrusive_ptr<ExpressionContext>& expCtx) - : DocumentSource(expCtx), _executor(executor), _armParams(std::move(cursorParams)) {} + AsyncResultsMergerParams armParams, + const boost::intrusive_ptr<ExpressionContext>& expCtx, + boost::optional<BSONObj> ownedParamsSpec) + : DocumentSource(expCtx), + _armParamsObj(std::move(ownedParamsSpec)), + _executor(executor), + _armParams(std::move(armParams)) {} DocumentSource::GetNextResult DocumentSourceMergeCursors::getNext() { // We don't expect or support tailable cursors to be executing through this stage. - invariant(pExpCtx->tailableMode == TailableMode::kNormal); + invariant(pExpCtx->tailableMode == TailableModeEnum::kNormal); if (!_arm) { - _arm.emplace(pExpCtx->opCtx, _executor, _armParams.get()); + _arm.emplace(pExpCtx->opCtx, _executor, std::move(*_armParams)); + _armParams = boost::none; } auto next = uassertStatusOK(_arm->blockingNext()); if (next.isEOF()) { @@ -60,24 +65,19 @@ DocumentSource::GetNextResult DocumentSourceMergeCursors::getNext() { return Document::fromBsonWithMetaData(*next.getResult()); } -void DocumentSourceMergeCursors::serializeToArray( - std::vector<Value>& array, boost::optional<ExplainOptions::Verbosity> explain) const { +Value DocumentSourceMergeCursors::serialize( + boost::optional<ExplainOptions::Verbosity> explain) const { invariant(!_arm); invariant(_armParams); - std::vector<Value> cursors; - for (auto&& remote : _armParams->remotes) { - cursors.emplace_back(Document{{"host", remote.hostAndPort.toString()}, - {"ns", remote.cursorResponse.getNSS().toString()}, - {"id", remote.cursorResponse.getCursorId()}}); - } - array.push_back(Value(Document{{kStageName, Value(std::move(cursors))}})); - if (!_armParams->sort.isEmpty()) { - array.push_back(Value(Document{{DocumentSourceSort::kStageName, Value(_armParams->sort)}})); - } + return Value(Document{{kStageName, _armParams->toBSON()}}); } Pipeline::SourceContainer::iterator DocumentSourceMergeCursors::doOptimizeAt( Pipeline::SourceContainer::iterator itr, Pipeline::SourceContainer* container) { + // TODO SERVER-34009 3.6 and earlier mongos processes serialize $mergeCursors and $sort with + // $mergingPresorted as separate stages. This code is responsible for coalescing these two + // stages. New mongos processes serialize $mergeCursors with an absorbed sort as one stage. Once + // we no longer have to support the old format, this code can be deleted. invariant(*itr == this); invariant(!_arm); invariant(_armParams); @@ -92,9 +92,9 @@ Pipeline::SourceContainer::iterator DocumentSourceMergeCursors::doOptimizeAt( return next; } - _armParams->sort = + _armParams->setSort( nextSort->sortKeyPattern(DocumentSourceSort::SortKeySerialization::kForSortKeyMerging) - .toBson(); + .toBson()); if (auto sortLimit = nextSort->getLimitSrc()) { // There was a limit stage absorbed into the sort stage, so we need to preserve that. container->insert(std::next(next), sortLimit); @@ -105,15 +105,31 @@ Pipeline::SourceContainer::iterator DocumentSourceMergeCursors::doOptimizeAt( boost::intrusive_ptr<DocumentSource> DocumentSourceMergeCursors::createFromBson( BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& expCtx) { - uassert( - 17026, "$mergeCursors stage expected array as argument", elem.type() == BSONType::Array); + if (elem.type() == BSONType::Object) { + // This is the modern serialization format. We de-serialize using the IDL. + auto ownedObj = elem.embeddedObject().getOwned(); + auto armParams = + AsyncResultsMergerParams::parse(IDLParserErrorContext(kStageName), ownedObj); + return new DocumentSourceMergeCursors( + Grid::get(expCtx->opCtx)->getExecutorPool()->getArbitraryExecutor(), + std::move(armParams), + expCtx, + std::move(ownedObj)); + } + + // This is the old serialization format which can still be generated by mongos processes + // older than 4.0. + // TODO SERVER-34009 Remove support for this format. + uassert(17026, + "$mergeCursors stage expected either an array or an object as argument", + elem.type() == BSONType::Array); const auto serializedRemotes = elem.Array(); uassert(50729, "$mergeCursors stage expected array with at least one entry", serializedRemotes.size() > 0); boost::optional<NamespaceString> nss; - std::vector<ClusterClientCursorParams::RemoteCursor> remotes; + std::vector<RemoteCursor> remotes; for (auto&& cursor : serializedRemotes) { BSONElement nsElem; BSONElement hostElem; @@ -164,28 +180,30 @@ boost::intrusive_ptr<DocumentSource> DocumentSourceMergeCursors::createFromBson( // any data in the initial batch. // TODO SERVER-33323 We use a fake shard id because the AsyncResultsMerger won't use it for // anything, and finding the real one is non-trivial. + RemoteCursor remoteCursor; + remoteCursor.setShardId(ShardId("fakeShardIdForMergeCursors")); + remoteCursor.setHostAndPort(std::move(host)); std::vector<BSONObj> emptyBatch; - remotes.push_back({ShardId("fakeShardIdForMergeCursors"), - std::move(host), - CursorResponse{*nss, cursorId, emptyBatch}}); + remoteCursor.setCursorResponse(CursorResponse{*nss, cursorId, emptyBatch}); + remotes.push_back(std::move(remoteCursor)); } invariant(nss); // We know there is at least one cursor in 'serializedRemotes', and we require // each cursor to have a 'ns' field. - auto params = stdx::make_unique<ClusterClientCursorParams>(*nss); - params->remotes = std::move(remotes); + AsyncResultsMergerParams params; + params.setRemotes(std::move(remotes)); + params.setNss(*nss); return new DocumentSourceMergeCursors( Grid::get(expCtx->opCtx)->getExecutorPool()->getArbitraryExecutor(), std::move(params), - expCtx); + expCtx, + elem.embeddedObject().getOwned()); } boost::intrusive_ptr<DocumentSource> DocumentSourceMergeCursors::create( - std::vector<ClusterClientCursorParams::RemoteCursor>&& remoteCursors, executor::TaskExecutor* executor, + AsyncResultsMergerParams params, const boost::intrusive_ptr<ExpressionContext>& expCtx) { - auto params = stdx::make_unique<ClusterClientCursorParams>(expCtx->ns); - params->remotes = std::move(remoteCursors); return new DocumentSourceMergeCursors(executor, std::move(params), expCtx); } diff --git a/src/mongo/db/pipeline/document_source_merge_cursors.h b/src/mongo/db/pipeline/document_source_merge_cursors.h index cb74f4bd9ef..611d09bcd34 100644 --- a/src/mongo/db/pipeline/document_source_merge_cursors.h +++ b/src/mongo/db/pipeline/document_source_merge_cursors.h @@ -55,11 +55,11 @@ public: BSONElement, const boost::intrusive_ptr<ExpressionContext>&); /** - * Creates a new DocumentSourceMergeCursors from the given 'remoteCursors'. + * Creates a new DocumentSourceMergeCursors from the given parameters. */ static boost::intrusive_ptr<DocumentSource> create( - std::vector<ClusterClientCursorParams::RemoteCursor>&& remoteCursors, executor::TaskExecutor*, + AsyncResultsMergerParams, const boost::intrusive_ptr<ExpressionContext>&); const char* getSourceName() const final { @@ -77,9 +77,7 @@ public: /** * Serializes this stage to be sent to perform the merging on a different host. */ - void serializeToArray( - std::vector<Value>& array, - boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final; + Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final; StageConstraints constraints(Pipeline::SplitState pipeState) const final { StageConstraints constraints(StreamType::kStreaming, @@ -102,21 +100,23 @@ protected: private: DocumentSourceMergeCursors(executor::TaskExecutor*, - std::unique_ptr<ClusterClientCursorParams>, - const boost::intrusive_ptr<ExpressionContext>&); + AsyncResultsMergerParams, + const boost::intrusive_ptr<ExpressionContext>&, + boost::optional<BSONObj> ownedParamsSpec = boost::none); - Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final { - MONGO_UNREACHABLE; // Should call serializeToArray instead. - } + // When we have parsed the params out of a BSONObj, the object needs to stay around while the + // params are in use. We store them here. + boost::optional<BSONObj> _armParamsObj; executor::TaskExecutor* _executor; - // '_arm' is not populated until the first call to getNext(). If getNext() is never called we - // will not create an AsyncResultsMerger. If we did so the destruction of this stage would cause - // the cursors within the ARM to be killed prematurely. For example, if this stage is parsed on - // mongos then forwarded to the shards, it should not kill the cursors when it goes out of scope - // on mongos. - std::unique_ptr<ClusterClientCursorParams> _armParams; + // '_armParams' is populated until the first call to getNext(). Upon the first call to getNext() + // '_arm' will be populated using '_armParams', and '_armParams' will become boost::none. So if + // getNext() is never called we will never populate '_arm'. If we did so the destruction of this + // stage would cause the cursors within the ARM to be killed prematurely. For example, if this + // stage is parsed on mongos then forwarded to the shards, it should not kill the cursors when + // it goes out of scope on mongos. + boost::optional<AsyncResultsMergerParams> _armParams; boost::optional<AsyncResultsMerger> _arm; }; diff --git a/src/mongo/db/pipeline/document_source_merge_cursors_test.cpp b/src/mongo/db/pipeline/document_source_merge_cursors_test.cpp index 4809ac54b50..3614324d27d 100644 --- a/src/mongo/db/pipeline/document_source_merge_cursors_test.cpp +++ b/src/mongo/db/pipeline/document_source_merge_cursors_test.cpp @@ -111,9 +111,7 @@ private: }; TEST_F(DocumentSourceMergeCursorsTest, ShouldRejectNonArray) { - auto spec = BSON("$mergeCursors" << BSON( - "cursors" << BSON_ARRAY(BSON("ns" << kTestNss.ns() << "id" << 0LL << "host" - << kTestHost.toString())))); + auto spec = BSON("$mergeCursors" << 2); ASSERT_THROWS_CODE(DocumentSourceMergeCursors::createFromBson(spec.firstElement(), getExpCtx()), AssertionException, 17026); @@ -213,16 +211,66 @@ TEST_F(DocumentSourceMergeCursorsTest, ShouldBeAbleToParseTheSerializedVersionOf ASSERT(DocumentSourceMergeCursors::createFromBson(newSpec.firstElement(), getExpCtx())); } +RemoteCursor makeRemoteCursor(ShardId shardId, HostAndPort host, CursorResponse response) { + RemoteCursor remoteCursor; + remoteCursor.setShardId(std::move(shardId)); + remoteCursor.setHostAndPort(std::move(host)); + remoteCursor.setCursorResponse(std::move(response)); + return remoteCursor; +} + +TEST_F(DocumentSourceMergeCursorsTest, ShouldBeAbleToParseSerializedARMParams) { + AsyncResultsMergerParams params; + params.setSort(BSON("y" << 1 << "z" << 1)); + params.setNss(kTestNss); + std::vector<RemoteCursor> cursors; + cursors.emplace_back(makeRemoteCursor( + kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, kExhaustedCursorID, {}))); + params.setRemotes(std::move(cursors)); + auto spec = BSON("$mergeCursors" << params.toBSON()); + auto mergeCursors = + DocumentSourceMergeCursors::createFromBson(spec.firstElement(), getExpCtx()); + std::vector<Value> serializationArray; + mergeCursors->serializeToArray(serializationArray); + ASSERT_EQ(serializationArray.size(), 1UL); + + // Make sure the serialized version can be parsed into an identical AsyncResultsMergerParams. + auto newSpec = serializationArray[0].getDocument().toBson(); + ASSERT(newSpec["$mergeCursors"].type() == BSONType::Object); + auto newParams = AsyncResultsMergerParams::parse(IDLParserErrorContext("$mergeCursors test"), + newSpec["$mergeCursors"].Obj()); + ASSERT_TRUE(params.getSort()); + ASSERT_BSONOBJ_EQ(*params.getSort(), *newParams.getSort()); + ASSERT_EQ(params.getCompareWholeSortKey(), newParams.getCompareWholeSortKey()); + ASSERT(params.getTailableMode() == newParams.getTailableMode()); + ASSERT(params.getBatchSize() == newParams.getBatchSize()); + ASSERT_EQ(params.getNss(), newParams.getNss()); + ASSERT_EQ(params.getAllowPartialResults(), newParams.getAllowPartialResults()); + ASSERT_EQ(newParams.getRemotes().size(), 1UL); + ASSERT(newParams.getRemotes()[0].getShardId() == kTestShardIds[0].toString()); + ASSERT(newParams.getRemotes()[0].getHostAndPort() == kTestShardHosts[0]); + ASSERT_EQ(newParams.getRemotes()[0].getCursorResponse().getNSS(), kTestNss); + ASSERT_EQ(newParams.getRemotes()[0].getCursorResponse().getCursorId(), kExhaustedCursorID); + ASSERT(newParams.getRemotes()[0].getCursorResponse().getBatch().empty()); + + // Test that the $mergeCursors stage will accept the serialized format of + // AsyncResultsMergerParams. + ASSERT(DocumentSourceMergeCursors::createFromBson(newSpec.firstElement(), getExpCtx())); +} + TEST_F(DocumentSourceMergeCursorsTest, ShouldReportEOFWithNoCursors) { auto expCtx = getExpCtx(); - std::vector<ClusterClientCursorParams::RemoteCursor> cursors; - cursors.emplace_back( - kTestShardIds[0], kTestShardHosts[0], CursorResponse(expCtx->ns, kExhaustedCursorID, {})); - cursors.emplace_back( - kTestShardIds[1], kTestShardHosts[1], CursorResponse(expCtx->ns, kExhaustedCursorID, {})); + AsyncResultsMergerParams armParams; + armParams.setNss(kTestNss); + std::vector<RemoteCursor> cursors; + cursors.emplace_back(makeRemoteCursor( + kTestShardIds[0], kTestShardHosts[0], CursorResponse(expCtx->ns, kExhaustedCursorID, {}))); + cursors.emplace_back(makeRemoteCursor( + kTestShardIds[1], kTestShardHosts[1], CursorResponse(expCtx->ns, kExhaustedCursorID, {}))); + armParams.setRemotes(std::move(cursors)); auto pipeline = uassertStatusOK(Pipeline::create({}, expCtx)); auto mergeCursorsStage = - DocumentSourceMergeCursors::create(std::move(cursors), executor(), expCtx); + DocumentSourceMergeCursors::create(executor(), std::move(armParams), expCtx); ASSERT_TRUE(mergeCursorsStage->getNext().isEOF()); } @@ -236,12 +284,17 @@ BSONObj cursorResponseObj(const NamespaceString& nss, TEST_F(DocumentSourceMergeCursorsTest, ShouldBeAbleToIterateCursorsUntilEOF) { auto expCtx = getExpCtx(); - std::vector<ClusterClientCursorParams::RemoteCursor> cursors; - cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(expCtx->ns, 1, {})); - cursors.emplace_back(kTestShardIds[1], kTestShardHosts[1], CursorResponse(expCtx->ns, 2, {})); + AsyncResultsMergerParams armParams; + armParams.setNss(kTestNss); + std::vector<RemoteCursor> cursors; + cursors.emplace_back( + makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(expCtx->ns, 1, {}))); + cursors.emplace_back( + makeRemoteCursor(kTestShardIds[1], kTestShardHosts[1], CursorResponse(expCtx->ns, 2, {}))); + armParams.setRemotes(std::move(cursors)); auto pipeline = uassertStatusOK(Pipeline::create({}, expCtx)); pipeline->addInitialSource( - DocumentSourceMergeCursors::create(std::move(cursors), executor(), expCtx)); + DocumentSourceMergeCursors::create(executor(), std::move(armParams), expCtx)); // Iterate the $mergeCursors stage asynchronously on a different thread, since it will block // waiting for network responses, which we will manually schedule below. @@ -280,12 +333,17 @@ TEST_F(DocumentSourceMergeCursorsTest, ShouldBeAbleToIterateCursorsUntilEOF) { TEST_F(DocumentSourceMergeCursorsTest, ShouldNotKillCursorsIfNeverIterated) { auto expCtx = getExpCtx(); - std::vector<ClusterClientCursorParams::RemoteCursor> cursors; - cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(expCtx->ns, 1, {})); - cursors.emplace_back(kTestShardIds[1], kTestShardHosts[1], CursorResponse(expCtx->ns, 2, {})); + AsyncResultsMergerParams armParams; + armParams.setNss(kTestNss); + std::vector<RemoteCursor> cursors; + cursors.emplace_back( + makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(expCtx->ns, 1, {}))); + cursors.emplace_back( + makeRemoteCursor(kTestShardIds[1], kTestShardHosts[1], CursorResponse(expCtx->ns, 2, {}))); + armParams.setRemotes(std::move(cursors)); auto pipeline = uassertStatusOK(Pipeline::create({}, expCtx)); pipeline->addInitialSource( - DocumentSourceMergeCursors::create(std::move(cursors), executor(), expCtx)); + DocumentSourceMergeCursors::create(executor(), std::move(armParams), expCtx)); pipeline.reset(); // Delete the pipeline before using it. @@ -295,11 +353,15 @@ TEST_F(DocumentSourceMergeCursorsTest, ShouldNotKillCursorsIfNeverIterated) { TEST_F(DocumentSourceMergeCursorsTest, ShouldKillCursorIfPartiallyIterated) { auto expCtx = getExpCtx(); - std::vector<ClusterClientCursorParams::RemoteCursor> cursors; - cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(expCtx->ns, 1, {})); + AsyncResultsMergerParams armParams; + armParams.setNss(kTestNss); + std::vector<RemoteCursor> cursors; + cursors.emplace_back( + makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(expCtx->ns, 1, {}))); + armParams.setRemotes(std::move(cursors)); auto pipeline = uassertStatusOK(Pipeline::create({}, expCtx)); pipeline->addInitialSource( - DocumentSourceMergeCursors::create(std::move(cursors), executor(), expCtx)); + DocumentSourceMergeCursors::create(executor(), std::move(armParams), expCtx)); // Iterate the pipeline asynchronously on a different thread, since it will block waiting for // network responses, which we will manually schedule below. @@ -344,12 +406,16 @@ TEST_F(DocumentSourceMergeCursorsTest, ShouldOptimizeWithASortToEnsureCorrectOrd auto pipeline = uassertStatusOK(Pipeline::create({std::move(sortStage)}, expCtx)); // Make a $mergeCursors stage and add it to the front of the pipeline. - std::vector<ClusterClientCursorParams::RemoteCursor> cursors; - cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(expCtx->ns, 1, {})); - cursors.emplace_back(kTestShardIds[1], kTestShardHosts[1], CursorResponse(expCtx->ns, 2, {})); - auto mergeCursorsStage = - DocumentSourceMergeCursors::create(std::move(cursors), executor(), expCtx); - pipeline->addInitialSource(std::move(mergeCursorsStage)); + AsyncResultsMergerParams armParams; + armParams.setNss(kTestNss); + std::vector<RemoteCursor> cursors; + cursors.emplace_back( + makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(expCtx->ns, 1, {}))); + cursors.emplace_back( + makeRemoteCursor(kTestShardIds[1], kTestShardHosts[1], CursorResponse(expCtx->ns, 2, {}))); + armParams.setRemotes(std::move(cursors)); + pipeline->addInitialSource( + DocumentSourceMergeCursors::create(executor(), std::move(armParams), expCtx)); // After optimization we should only have a $mergeCursors stage. pipeline->optimizePipeline(); @@ -364,6 +430,7 @@ TEST_F(DocumentSourceMergeCursorsTest, ShouldOptimizeWithASortToEnsureCorrectOrd ASSERT_DOCUMENT_EQ(*pipeline->getNext(), (Document{{"x", 3}})); ASSERT_DOCUMENT_EQ(*pipeline->getNext(), (Document{{"x", 4}})); ASSERT_FALSE(static_cast<bool>(pipeline->getNext())); + std::cout << "Finished"; }); onCommand([&](const auto& request) { @@ -382,36 +449,35 @@ TEST_F(DocumentSourceMergeCursorsTest, ShouldOptimizeWithASortToEnsureCorrectOrd future.timed_get(kFutureTimeout); } -TEST_F(DocumentSourceMergeCursorsTest, ShouldNotRemoveLimitWhenOptimizingWithLeadingSort) { +TEST_F(DocumentSourceMergeCursorsTest, ShouldEnforceSortSpecifiedViaARMParams) { auto expCtx = getExpCtx(); + auto pipeline = uassertStatusOK(Pipeline::create({}, expCtx)); - // Make a pipeline with a single $sort stage that is merging pre-sorted results. - const bool mergingPresorted = true; - const long long limit = 3; - auto sortStage = DocumentSourceSort::create( - expCtx, BSON("x" << 1), limit, DocumentSourceSort::kMaxMemoryUsageBytes, mergingPresorted); - auto pipeline = uassertStatusOK(Pipeline::create({std::move(sortStage)}, expCtx)); - - // Make a $mergeCursors stage and add it to the front of the pipeline. - std::vector<ClusterClientCursorParams::RemoteCursor> cursors; - cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(expCtx->ns, 1, {})); - cursors.emplace_back(kTestShardIds[1], kTestShardHosts[1], CursorResponse(expCtx->ns, 2, {})); - auto mergeCursorsStage = - DocumentSourceMergeCursors::create(std::move(cursors), executor(), expCtx); - pipeline->addInitialSource(std::move(mergeCursorsStage)); + // Make a $mergeCursors stage with a sort on "x" and add it to the front of the pipeline. + AsyncResultsMergerParams armParams; + armParams.setNss(kTestNss); + armParams.setSort(BSON("x" << 1)); + std::vector<RemoteCursor> cursors; + cursors.emplace_back( + makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(expCtx->ns, 1, {}))); + cursors.emplace_back( + makeRemoteCursor(kTestShardIds[1], kTestShardHosts[1], CursorResponse(expCtx->ns, 2, {}))); + armParams.setRemotes(std::move(cursors)); + pipeline->addInitialSource( + DocumentSourceMergeCursors::create(executor(), std::move(armParams), expCtx)); - // After optimization, we should still have a $limit stage. + // After optimization we should only have a $mergeCursors stage. pipeline->optimizePipeline(); - ASSERT_EQ(pipeline->getSources().size(), 2UL); + ASSERT_EQ(pipeline->getSources().size(), 1UL); ASSERT_TRUE(dynamic_cast<DocumentSourceMergeCursors*>(pipeline->getSources().front().get())); - ASSERT_TRUE(dynamic_cast<DocumentSourceLimit*>(pipeline->getSources().back().get())); // Iterate the pipeline asynchronously on a different thread, since it will block waiting for // network responses, which we will manually schedule below. - auto future = launchAsync([&]() { - for (int i = 1; i <= limit; ++i) { - ASSERT_DOCUMENT_EQ(*pipeline->getNext(), (Document{{"x", i}})); - } + auto future = launchAsync([&pipeline]() { + ASSERT_DOCUMENT_EQ(*pipeline->getNext(), (Document{{"x", 1}})); + ASSERT_DOCUMENT_EQ(*pipeline->getNext(), (Document{{"x", 2}})); + ASSERT_DOCUMENT_EQ(*pipeline->getNext(), (Document{{"x", 3}})); + ASSERT_DOCUMENT_EQ(*pipeline->getNext(), (Document{{"x", 4}})); ASSERT_FALSE(static_cast<bool>(pipeline->getNext())); }); @@ -431,7 +497,7 @@ TEST_F(DocumentSourceMergeCursorsTest, ShouldNotRemoveLimitWhenOptimizingWithLea future.timed_get(kFutureTimeout); } -TEST_F(DocumentSourceMergeCursorsTest, ShouldSerializeSortIfAbsorbedViaOptimize) { +TEST_F(DocumentSourceMergeCursorsTest, ShouldNotRemoveLimitWhenOptimizingWithLeadingSort) { auto expCtx = getExpCtx(); // Make a pipeline with a single $sort stage that is merging pre-sorted results. @@ -442,12 +508,16 @@ TEST_F(DocumentSourceMergeCursorsTest, ShouldSerializeSortIfAbsorbedViaOptimize) auto pipeline = uassertStatusOK(Pipeline::create({std::move(sortStage)}, expCtx)); // Make a $mergeCursors stage and add it to the front of the pipeline. - std::vector<ClusterClientCursorParams::RemoteCursor> cursors; - cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(expCtx->ns, 1, {})); - cursors.emplace_back(kTestShardIds[1], kTestShardHosts[1], CursorResponse(expCtx->ns, 2, {})); - auto mergeCursorsStage = - DocumentSourceMergeCursors::create(std::move(cursors), executor(), expCtx); - pipeline->addInitialSource(std::move(mergeCursorsStage)); + AsyncResultsMergerParams armParams; + armParams.setNss(kTestNss); + std::vector<RemoteCursor> cursors; + cursors.emplace_back( + makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(expCtx->ns, 1, {}))); + cursors.emplace_back( + makeRemoteCursor(kTestShardIds[1], kTestShardHosts[1], CursorResponse(expCtx->ns, 2, {}))); + armParams.setRemotes(std::move(cursors)); + pipeline->addInitialSource( + DocumentSourceMergeCursors::create(executor(), std::move(armParams), expCtx)); // After optimization, we should still have a $limit stage. pipeline->optimizePipeline(); @@ -455,11 +525,29 @@ TEST_F(DocumentSourceMergeCursorsTest, ShouldSerializeSortIfAbsorbedViaOptimize) ASSERT_TRUE(dynamic_cast<DocumentSourceMergeCursors*>(pipeline->getSources().front().get())); ASSERT_TRUE(dynamic_cast<DocumentSourceLimit*>(pipeline->getSources().back().get())); - auto serialized = pipeline->serialize(); - ASSERT_EQ(serialized.size(), 3UL); - ASSERT_FALSE(serialized[0]["$mergeCursors"].missing()); - ASSERT_FALSE(serialized[1]["$sort"].missing()); - ASSERT_FALSE(serialized[2]["$limit"].missing()); + // Iterate the pipeline asynchronously on a different thread, since it will block waiting for + // network responses, which we will manually schedule below. + auto future = launchAsync([&]() { + for (int i = 1; i <= limit; ++i) { + ASSERT_DOCUMENT_EQ(*pipeline->getNext(), (Document{{"x", i}})); + } + ASSERT_FALSE(static_cast<bool>(pipeline->getNext())); + }); + + onCommand([&](const auto& request) { + return cursorResponseObj(expCtx->ns, + kExhaustedCursorID, + {BSON("x" << 1 << "$sortKey" << BSON("" << 1)), + BSON("x" << 3 << "$sortKey" << BSON("" << 3))}); + }); + onCommand([&](const auto& request) { + return cursorResponseObj(expCtx->ns, + kExhaustedCursorID, + {BSON("x" << 2 << "$sortKey" << BSON("" << 2)), + BSON("x" << 4 << "$sortKey" << BSON("" << 4))}); + }); + + future.timed_get(kFutureTimeout); } } // namespace diff --git a/src/mongo/db/pipeline/document_source_sort.h b/src/mongo/db/pipeline/document_source_sort.h index d2ad1cd9ec4..c5989718d5e 100644 --- a/src/mongo/db/pipeline/document_source_sort.h +++ b/src/mongo/db/pipeline/document_source_sort.h @@ -262,7 +262,7 @@ private: uint64_t _maxMemoryUsageBytes; bool _done; - bool _mergingPresorted; + bool _mergingPresorted; // TODO SERVER-34009 Remove this flag. std::unique_ptr<MySorter> _sorter; std::unique_ptr<MySorter::Iterator> _output; }; diff --git a/src/mongo/db/pipeline/expression_context.h b/src/mongo/db/pipeline/expression_context.h index 6847d43e8a4..b5e490e91d7 100644 --- a/src/mongo/db/pipeline/expression_context.h +++ b/src/mongo/db/pipeline/expression_context.h @@ -161,7 +161,7 @@ public: * query. */ bool isTailableAwaitData() const { - return tailableMode == TailableMode::kTailableAndAwaitData; + return tailableMode == TailableModeEnum::kTailableAndAwaitData; } // The explain verbosity requested by the user, or boost::none if no explain was requested. @@ -198,7 +198,7 @@ public: Variables variables; VariablesParseState variablesParseState; - TailableMode tailableMode = TailableMode::kNormal; + TailableModeEnum tailableMode = TailableModeEnum::kNormal; // Tracks the depth of nested aggregation sub-pipelines. Used to enforce depth limits. size_t subPipelineDepth = 0; diff --git a/src/mongo/db/pipeline/pipeline.cpp b/src/mongo/db/pipeline/pipeline.cpp index 2f6fce9e526..335ca4f5655 100644 --- a/src/mongo/db/pipeline/pipeline.cpp +++ b/src/mongo/db/pipeline/pipeline.cpp @@ -28,9 +28,7 @@ #include "mongo/platform/basic.h" -// This file defines functions from both of these headers #include "mongo/db/pipeline/pipeline.h" -#include "mongo/db/pipeline/pipeline_optimizations.h" #include <algorithm> @@ -40,6 +38,7 @@ #include "mongo/db/jsobj.h" #include "mongo/db/operation_context.h" #include "mongo/db/pipeline/accumulator.h" +#include "mongo/db/pipeline/cluster_aggregation_planner.h" #include "mongo/db/pipeline/document.h" #include "mongo/db/pipeline/document_source.h" #include "mongo/db/pipeline/document_source_geo_near.h" @@ -47,6 +46,7 @@ #include "mongo/db/pipeline/document_source_merge_cursors.h" #include "mongo/db/pipeline/document_source_out.h" #include "mongo/db/pipeline/document_source_project.h" +#include "mongo/db/pipeline/document_source_sort.h" #include "mongo/db/pipeline/document_source_unwind.h" #include "mongo/db/pipeline/expression.h" #include "mongo/db/pipeline/expression_context.h" @@ -329,13 +329,7 @@ std::unique_ptr<Pipeline, PipelineDeleter> Pipeline::splitForSharded() { // Keep a copy of the original source list in case we need to reset the pipeline from split to // unsplit later. shardPipeline->_unsplitSources.emplace(_sources); - - // The order in which optimizations are applied can have significant impact on the - // efficiency of the final pipeline. Be Careful! - Optimizations::Sharded::findSplitPoint(shardPipeline.get(), this); - Optimizations::Sharded::moveFinalUnwindFromShardsToMerger(shardPipeline.get(), this); - Optimizations::Sharded::limitFieldsSentFromShardsToMerger(shardPipeline.get(), this); - + cluster_aggregation_planner::performSplitPipelineOptimizations(shardPipeline.get(), this); shardPipeline->_splitState = SplitState::kSplitForShards; _splitState = SplitState::kSplitForMerge; @@ -366,87 +360,6 @@ void Pipeline::unsplitFromSharded( stitch(); } -void Pipeline::Optimizations::Sharded::findSplitPoint(Pipeline* shardPipe, Pipeline* mergePipe) { - while (!mergePipe->_sources.empty()) { - intrusive_ptr<DocumentSource> current = mergePipe->_sources.front(); - mergePipe->_sources.pop_front(); - - // Check if this source is splittable. - SplittableDocumentSource* splittable = - dynamic_cast<SplittableDocumentSource*>(current.get()); - - if (!splittable) { - // Move the source from the merger _sources to the shard _sources. - shardPipe->_sources.push_back(current); - } else { - // Split this source into 'merge' and 'shard' _sources. - intrusive_ptr<DocumentSource> shardSource = splittable->getShardSource(); - auto mergeSources = splittable->getMergeSources(); - - // A source may not simultaneously be present on both sides of the split. - invariant(std::find(mergeSources.begin(), mergeSources.end(), shardSource) == - mergeSources.end()); - - if (shardSource) - shardPipe->_sources.push_back(shardSource); - - // Add the stages in reverse order, so that they appear in the pipeline in the same - // order as they were returned by the stage. - for (auto it = mergeSources.rbegin(); it != mergeSources.rend(); ++it) { - mergePipe->_sources.push_front(*it); - } - - break; - } - } -} - -void Pipeline::Optimizations::Sharded::moveFinalUnwindFromShardsToMerger(Pipeline* shardPipe, - Pipeline* mergePipe) { - while (!shardPipe->_sources.empty() && - dynamic_cast<DocumentSourceUnwind*>(shardPipe->_sources.back().get())) { - mergePipe->_sources.push_front(shardPipe->_sources.back()); - shardPipe->_sources.pop_back(); - } -} - -void Pipeline::Optimizations::Sharded::limitFieldsSentFromShardsToMerger(Pipeline* shardPipe, - Pipeline* mergePipe) { - auto depsMetadata = DocumentSourceMatch::isTextQuery(shardPipe->getInitialQuery()) - ? DepsTracker::MetadataAvailable::kTextScore - : DepsTracker::MetadataAvailable::kNoMetadata; - DepsTracker mergeDeps(mergePipe->getDependencies(depsMetadata)); - if (mergeDeps.needWholeDocument) - return; // the merge needs all fields, so nothing we can do. - - // Empty project is "special" so if no fields are needed, we just ask for _id instead. - if (mergeDeps.fields.empty()) - mergeDeps.fields.insert("_id"); - - // Remove metadata from dependencies since it automatically flows through projection and we - // don't want to project it in to the document. - mergeDeps.setNeedTextScore(false); - - // HEURISTIC: only apply optimization if none of the shard stages have an exhaustive list of - // field dependencies. While this may not be 100% ideal in all cases, it is simple and - // avoids the worst cases by ensuring that: - // 1) Optimization IS applied when the shards wouldn't have known their exhaustive list of - // dependencies. This situation can happen when a $sort is before the first $project or - // $group. Without the optimization, the shards would have to reify and transmit full - // objects even though only a subset of fields are needed. - // 2) Optimization IS NOT applied immediately following a $project or $group since it would - // add an unnecessary project (and therefore a deep-copy). - for (auto&& source : shardPipe->_sources) { - DepsTracker dt(depsMetadata); - if (source->getDependencies(&dt) & DocumentSource::EXHAUSTIVE_FIELDS) - return; - } - // if we get here, add the project. - boost::intrusive_ptr<DocumentSource> project = DocumentSourceProject::createFromBson( - BSON("$project" << mergeDeps.toProjection()).firstElement(), shardPipe->pCtx); - shardPipe->_sources.push_back(project); -} - BSONObj Pipeline::getInitialQuery() const { if (_sources.empty()) return BSONObj(); @@ -699,7 +612,35 @@ Status Pipeline::_pipelineCanRunOnMongoS() const { return Status::OK(); } -boost::intrusive_ptr<DocumentSource> Pipeline::popFrontWithCriteria( +void Pipeline::pushBack(boost::intrusive_ptr<DocumentSource> newStage) { + newStage->setSource(_sources.back().get()); + _sources.push_back(std::move(newStage)); +} + +boost::intrusive_ptr<DocumentSource> Pipeline::popBack() { + if (_sources.empty()) { + return nullptr; + } + auto targetStage = _sources.back(); + _sources.pop_back(); + return targetStage; +} + +boost::intrusive_ptr<DocumentSource> Pipeline::popFront() { + if (_sources.empty()) { + return nullptr; + } + auto targetStage = _sources.front(); + _sources.pop_front(); + stitch(); + return targetStage; +} + +boost::intrusive_ptr<DocumentSource> Pipeline::popFrontWithName(StringData targetStageName) { + return popFrontWithNameAndCriteria(targetStageName, nullptr); +} + +boost::intrusive_ptr<DocumentSource> Pipeline::popFrontWithNameAndCriteria( StringData targetStageName, stdx::function<bool(const DocumentSource* const)> predicate) { if (_sources.empty() || _sources.front()->getSourceName() != targetStageName) { return nullptr; @@ -710,8 +651,7 @@ boost::intrusive_ptr<DocumentSource> Pipeline::popFrontWithCriteria( return nullptr; } - _sources.pop_front(); - stitch(); - return targetStage; + return popFront(); } + } // namespace mongo diff --git a/src/mongo/db/pipeline/pipeline.h b/src/mongo/db/pipeline/pipeline.h index bcb7ee64521..04b5769792d 100644 --- a/src/mongo/db/pipeline/pipeline.h +++ b/src/mongo/db/pipeline/pipeline.h @@ -39,6 +39,8 @@ #include "mongo/db/pipeline/value.h" #include "mongo/db/query/explain_options.h" #include "mongo/db/query/query_knobs.h" +#include "mongo/executor/task_executor.h" +#include "mongo/s/query/async_results_merger_params_gen.h" #include "mongo/stdx/functional.h" #include "mongo/util/intrusive_counter.h" #include "mongo/util/timer.h" @@ -268,13 +270,34 @@ public: } /** + * Removes and returns the first stage of the pipeline. Returns nullptr if the pipeline is + * empty. + */ + boost::intrusive_ptr<DocumentSource> popFront(); + + /** + * Removes and returns the last stage of the pipeline. Returns nullptr if the pipeline is empty. + */ + boost::intrusive_ptr<DocumentSource> popBack(); + + /** + * Adds the given stage to the end of the pipeline. + */ + void pushBack(boost::intrusive_ptr<DocumentSource>); + + /** + * Removes and returns the first stage of the pipeline if its name is 'targetStageName'. + * Returns nullptr if there is no first stage with that name. + */ + boost::intrusive_ptr<DocumentSource> popFrontWithName(StringData targetStageName); + + /** * Removes and returns the first stage of the pipeline if its name is 'targetStageName' and the * given 'predicate' function, if present, returns 'true' when called with a pointer to the * stage. Returns nullptr if there is no first stage which meets these criteria. */ - boost::intrusive_ptr<DocumentSource> popFrontWithCriteria( - StringData targetStageName, - stdx::function<bool(const DocumentSource* const)> predicate = nullptr); + boost::intrusive_ptr<DocumentSource> popFrontWithNameAndCriteria( + StringData targetStageName, stdx::function<bool(const DocumentSource* const)> predicate); /** * PipelineD is a "sister" class that has additional functionality for the Pipeline. It exists @@ -286,15 +309,6 @@ public: friend class PipelineD; private: - class Optimizations { - public: - // This contains static functions that optimize pipelines in various ways. - // This is a class rather than a namespace so that it can be a friend of Pipeline. - // It is defined in pipeline_optimizations.h. - class Sharded; - }; - - friend class Optimizations::Sharded; friend class PipelineDeleter; /** diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp index e460407f89e..9b4d7f6904c 100644 --- a/src/mongo/db/pipeline/pipeline_d.cpp +++ b/src/mongo/db/pipeline/pipeline_d.cpp @@ -384,7 +384,7 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> PipelineD::prep plannerOpts |= QueryPlannerParams::NO_UNCOVERED_PROJECTIONS; } - if (expCtx->needsMerge && expCtx->tailableMode == TailableMode::kTailableAndAwaitData) { + if (expCtx->needsMerge && expCtx->tailableMode == TailableModeEnum::kTailableAndAwaitData) { plannerOpts |= QueryPlannerParams::TRACK_LATEST_OPLOG_TS; } diff --git a/src/mongo/db/pipeline/pipeline_optimizations.h b/src/mongo/db/pipeline/pipeline_optimizations.h deleted file mode 100644 index af85fdff4b0..00000000000 --- a/src/mongo/db/pipeline/pipeline_optimizations.h +++ /dev/null @@ -1,71 +0,0 @@ -/** - * Copyright 2013 (c) 10gen Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -/** - * This file declares optimizations available on Pipelines. For now they should be considered part - * of Pipeline's implementation rather than it's interface. - */ - -#pragma once - -#include "mongo/db/pipeline/pipeline.h" - -namespace mongo { -/** - * This class holds optimizations applied to a shard Pipeline and a merger Pipeline. - * - * Each function has the same signature and takes two Pipelines, both as an in/out parameters. - */ -class Pipeline::Optimizations::Sharded { -public: - /** - * Moves everything before a splittable stage to the shards. If there - * are no splittable stages, moves everything to the shards. - * - * It is not safe to call this optimization multiple times. - * - * NOTE: looks for SplittableDocumentSources and uses that API - */ - static void findSplitPoint(Pipeline* shardPipe, Pipeline* mergePipe); - - /** - * If the final stage on shards is to unwind an array, move that stage to the merger. This - * cuts down on network traffic and allows us to take advantage of reduced copying in - * unwind. - */ - static void moveFinalUnwindFromShardsToMerger(Pipeline* shardPipe, Pipeline* mergePipe); - - /** - * Adds a stage to the end of shardPipe explicitly requesting all fields that mergePipe - * needs. This is only done if it heuristically determines that it is needed. This - * optimization can reduce the amount of network traffic and can also enable the shards to - * convert less source BSON into Documents. - */ - static void limitFieldsSentFromShardsToMerger(Pipeline* shardPipe, Pipeline* mergePipe); -}; -} // namespace mongo diff --git a/src/mongo/db/query/SConscript b/src/mongo/db/query/SConscript index 81c4ac22cf9..3638d5ba00b 100644 --- a/src/mongo/db/query/SConscript +++ b/src/mongo/db/query/SConscript @@ -187,7 +187,8 @@ env.Library( target="query_request", source=[ "query_request.cpp", - "tailable_mode.cpp" + "tailable_mode.cpp", + env.Idlc("tailable_mode.idl")[0], ], LIBDEPS=[ "$BUILD_DIR/mongo/base", diff --git a/src/mongo/db/query/cursor_response.h b/src/mongo/db/query/cursor_response.h index d3852f2ee21..b2dd828f4c2 100644 --- a/src/mongo/db/query/cursor_response.h +++ b/src/mongo/db/query/cursor_response.h @@ -141,6 +141,23 @@ public: }; /** + * Constructs a CursorResponse from the command BSON response. + */ + static StatusWith<CursorResponse> parseFromBSON(const BSONObj& cmdResponse); + + /** + * A throwing version of 'parseFromBSON'. + */ + static CursorResponse parseFromBSONThrowing(const BSONObj& cmdResponse) { + return uassertStatusOK(parseFromBSON(cmdResponse)); + } + + /** + * Constructs an empty cursor response. + */ + CursorResponse() = default; + + /** * Constructs from values for each of the fields. */ CursorResponse(NamespaceString nss, @@ -186,15 +203,13 @@ public: } /** - * Constructs a CursorResponse from the command BSON response. - */ - static StatusWith<CursorResponse> parseFromBSON(const BSONObj& cmdResponse); - - /** * Converts this response to its raw BSON representation. */ BSONObj toBSON(ResponseType responseType) const; void addToBSON(ResponseType responseType, BSONObjBuilder* builder) const; + BSONObj toBSONAsInitialResponse() const { + return toBSON(ResponseType::InitialResponse); + } private: NamespaceString _nss; diff --git a/src/mongo/db/query/getmore_request.cpp b/src/mongo/db/query/getmore_request.cpp index a6b29ea520b..ea2515123f6 100644 --- a/src/mongo/db/query/getmore_request.cpp +++ b/src/mongo/db/query/getmore_request.cpp @@ -59,7 +59,7 @@ GetMoreRequest::GetMoreRequest() : cursorid(0), batchSize(0) {} GetMoreRequest::GetMoreRequest(NamespaceString namespaceString, CursorId id, - boost::optional<long long> sizeOfBatch, + boost::optional<std::int64_t> sizeOfBatch, boost::optional<Milliseconds> awaitDataTimeout, boost::optional<long long> term, boost::optional<repl::OpTime> lastKnownCommittedOpTime) @@ -108,7 +108,7 @@ StatusWith<GetMoreRequest> GetMoreRequest::parseFromBSON(const std::string& dbna boost::optional<NamespaceString> nss; // Optional fields. - boost::optional<long long> batchSize; + boost::optional<std::int64_t> batchSize; boost::optional<Milliseconds> awaitDataTimeout; boost::optional<long long> term; boost::optional<repl::OpTime> lastKnownCommittedOpTime; diff --git a/src/mongo/db/query/getmore_request.h b/src/mongo/db/query/getmore_request.h index 8fa2d0fc5dd..4da4839abbd 100644 --- a/src/mongo/db/query/getmore_request.h +++ b/src/mongo/db/query/getmore_request.h @@ -52,7 +52,7 @@ struct GetMoreRequest { */ GetMoreRequest(NamespaceString namespaceString, CursorId id, - boost::optional<long long> sizeOfBatch, + boost::optional<std::int64_t> sizeOfBatch, boost::optional<Milliseconds> awaitDataTimeout, boost::optional<long long> term, boost::optional<repl::OpTime> lastKnownCommittedOpTime); @@ -76,7 +76,7 @@ struct GetMoreRequest { // The batch size is optional. If not provided, we will put as many documents into the batch // as fit within the byte limit. - const boost::optional<long long> batchSize; + const boost::optional<std::int64_t> batchSize; // The number of milliseconds for which a getMore on a tailable, awaitData query should block. const boost::optional<Milliseconds> awaitDataTimeout; diff --git a/src/mongo/db/query/query_request.cpp b/src/mongo/db/query/query_request.cpp index 6268785c6a6..56ffe5dbb42 100644 --- a/src/mongo/db/query/query_request.cpp +++ b/src/mongo/db/query/query_request.cpp @@ -495,16 +495,16 @@ void QueryRequest::asFindCommand(BSONObjBuilder* cmdBuilder) const { } switch (_tailableMode) { - case TailableMode::kTailable: { + case TailableModeEnum::kTailable: { cmdBuilder->append(kTailableField, true); break; } - case TailableMode::kTailableAndAwaitData: { + case TailableModeEnum::kTailableAndAwaitData: { cmdBuilder->append(kTailableField, true); cmdBuilder->append(kAwaitDataField, true); break; } - case TailableMode::kNormal: { + case TailableModeEnum::kNormal: { break; } } @@ -623,7 +623,7 @@ Status QueryRequest::validate() const { << _maxTimeMS); } - if (_tailableMode != TailableMode::kNormal) { + if (_tailableMode != TailableModeEnum::kNormal) { // Tailable cursors cannot have any sort other than {$natural: 1}. const BSONObj expectedSort = BSON(kNaturalSortField << 1); if (!_sort.isEmpty() && @@ -911,9 +911,9 @@ Status QueryRequest::initFullQuery(const BSONObj& top) { int QueryRequest::getOptions() const { int options = 0; - if (_tailableMode == TailableMode::kTailable) { + if (_tailableMode == TailableModeEnum::kTailable) { options |= QueryOption_CursorTailable; - } else if (_tailableMode == TailableMode::kTailableAndAwaitData) { + } else if (_tailableMode == TailableModeEnum::kTailableAndAwaitData) { options |= QueryOption_CursorTailable; options |= QueryOption_AwaitData; } diff --git a/src/mongo/db/query/query_request.h b/src/mongo/db/query/query_request.h index 7a89e466faf..29abf54aaca 100644 --- a/src/mongo/db/query/query_request.h +++ b/src/mongo/db/query/query_request.h @@ -319,19 +319,19 @@ public: } bool isTailable() const { - return _tailableMode == TailableMode::kTailable || - _tailableMode == TailableMode::kTailableAndAwaitData; + return _tailableMode == TailableModeEnum::kTailable || + _tailableMode == TailableModeEnum::kTailableAndAwaitData; } bool isTailableAndAwaitData() const { - return _tailableMode == TailableMode::kTailableAndAwaitData; + return _tailableMode == TailableModeEnum::kTailableAndAwaitData; } - void setTailableMode(TailableMode tailableMode) { + void setTailableMode(TailableModeEnum tailableMode) { _tailableMode = tailableMode; } - TailableMode getTailableMode() const { + TailableModeEnum getTailableMode() const { return _tailableMode; } @@ -498,7 +498,7 @@ private: bool _hasReadPref = false; // Options that can be specified in the OP_QUERY 'flags' header. - TailableMode _tailableMode = TailableMode::kNormal; + TailableModeEnum _tailableMode = TailableModeEnum::kNormal; bool _slaveOk = false; bool _oplogReplay = false; bool _noCursorTimeout = false; diff --git a/src/mongo/db/query/query_request_test.cpp b/src/mongo/db/query/query_request_test.cpp index edcf58cecb1..9992fdb155e 100644 --- a/src/mongo/db/query/query_request_test.cpp +++ b/src/mongo/db/query/query_request_test.cpp @@ -1165,7 +1165,7 @@ TEST(QueryRequestTest, ConvertToAggregationWithShowRecordIdFails) { TEST(QueryRequestTest, ConvertToAggregationWithTailableFails) { QueryRequest qr(testns); - qr.setTailableMode(TailableMode::kTailable); + qr.setTailableMode(TailableModeEnum::kTailable); ASSERT_NOT_OK(qr.asAggregationCommand()); } @@ -1183,7 +1183,7 @@ TEST(QueryRequestTest, ConvertToAggregationWithNoCursorTimeoutFails) { TEST(QueryRequestTest, ConvertToAggregationWithAwaitDataFails) { QueryRequest qr(testns); - qr.setTailableMode(TailableMode::kTailableAndAwaitData); + qr.setTailableMode(TailableModeEnum::kTailableAndAwaitData); ASSERT_NOT_OK(qr.asAggregationCommand()); } diff --git a/src/mongo/db/query/tailable_mode.cpp b/src/mongo/db/query/tailable_mode.cpp index b19a1988672..f09a7b59c41 100644 --- a/src/mongo/db/query/tailable_mode.cpp +++ b/src/mongo/db/query/tailable_mode.cpp @@ -32,17 +32,17 @@ namespace mongo { -StatusWith<TailableMode> tailableModeFromBools(bool isTailable, bool isAwaitData) { +StatusWith<TailableModeEnum> tailableModeFromBools(bool isTailable, bool isAwaitData) { if (isTailable) { if (isAwaitData) { - return TailableMode::kTailableAndAwaitData; + return TailableModeEnum::kTailableAndAwaitData; } - return TailableMode::kTailable; + return TailableModeEnum::kTailable; } else if (isAwaitData) { return {ErrorCodes::FailedToParse, "Cannot set 'awaitData' without also setting 'tailable'"}; } - return TailableMode::kNormal; + return TailableModeEnum::kNormal; } } // namespace mongo diff --git a/src/mongo/db/query/tailable_mode.h b/src/mongo/db/query/tailable_mode.h index 92c0fe9292e..531191c6f76 100644 --- a/src/mongo/db/query/tailable_mode.h +++ b/src/mongo/db/query/tailable_mode.h @@ -30,19 +30,14 @@ #include "mongo/base/error_codes.h" #include "mongo/base/status_with.h" +#include "mongo/db/query/tailable_mode_gen.h" namespace mongo { -enum class TailableMode { - kNormal, - kTailable, - kTailableAndAwaitData, -}; - /** * Returns a TailableMode from two booleans, returning ErrorCodes::FailedToParse if awaitData is * set without tailable. */ -StatusWith<TailableMode> tailableModeFromBools(bool isTailable, bool isAwaitData); +StatusWith<TailableModeEnum> tailableModeFromBools(bool isTailable, bool isAwaitData); } // namespace mongo diff --git a/src/mongo/db/query/tailable_mode.idl b/src/mongo/db/query/tailable_mode.idl new file mode 100644 index 00000000000..33d5b6989dd --- /dev/null +++ b/src/mongo/db/query/tailable_mode.idl @@ -0,0 +1,37 @@ +# Copyright (C) 2018 MongoDB Inc. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License, version 3, +# as published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +# +# As a special exception, the copyright holders give permission to link the +# code of portions of this program with the OpenSSL library under certain +# conditions as described in each individual source file and distribute +# linked combinations including the program with the OpenSSL library. You +# must comply with the GNU Affero General Public License in all respects for +# all of the code used other than as permitted herein. If you modify file(s) +# with this exception, you may extend this exception to your version of the +# file(s), but you are not obligated to do so. If you do not wish to do so, +# delete this exception statement from your version. If you delete this +# exception statement from all source files in the program, then also delete +# it in the license file. + +global: + cpp_namespace: "mongo" + +enums: + TailableMode: + description: "Describes the tailablility of a cursor." + type: string + values: + kNormal: "normal" + kTailable: "tailable" + kTailableAndAwaitData: "tailableAndAwaitData" diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript index bcb0da9d789..fb0890b87a5 100644 --- a/src/mongo/db/repl/SConscript +++ b/src/mongo/db/repl/SConscript @@ -1241,7 +1241,6 @@ env.Library( '$BUILD_DIR/mongo/executor/task_executor_interface', '$BUILD_DIR/mongo/rpc/command_status', '$BUILD_DIR/mongo/s/query/async_results_merger', - '$BUILD_DIR/mongo/s/query/cluster_client_cursor', '$BUILD_DIR/mongo/util/progress_meter', ], ) diff --git a/src/mongo/db/repl/collection_cloner.cpp b/src/mongo/db/repl/collection_cloner.cpp index 773f54a0b8b..8aee68a4c58 100644 --- a/src/mongo/db/repl/collection_cloner.cpp +++ b/src/mongo/db/repl/collection_cloner.cpp @@ -43,7 +43,6 @@ #include "mongo/db/repl/storage_interface_mock.h" #include "mongo/db/server_parameters.h" #include "mongo/rpc/get_status_from_command_result.h" -#include "mongo/s/query/cluster_client_cursor_params.h" #include "mongo/util/assert_util.h" #include "mongo/util/destructor_guard.h" #include "mongo/util/fail_point_service.h" @@ -613,20 +612,25 @@ void CollectionCloner::_establishCollectionCursorsCallback(const RemoteCommandCa << " cursors established."; // Initialize the 'AsyncResultsMerger'(ARM). - std::vector<ClusterClientCursorParams::RemoteCursor> remoteCursors; + std::vector<RemoteCursor> remoteCursors; for (auto&& cursorResponse : cursorResponses) { // A placeholder 'ShardId' is used until the ARM is made less sharding specific. - remoteCursors.emplace_back( - ShardId("CollectionClonerSyncSource"), _source, std::move(cursorResponse)); + remoteCursors.emplace_back(); + auto& newCursor = remoteCursors.back(); + newCursor.setShardId("CollectionClonerSyncSource"); + newCursor.setHostAndPort(_source); + newCursor.setCursorResponse(std::move(cursorResponse)); } - _clusterClientCursorParams = stdx::make_unique<ClusterClientCursorParams>(_sourceNss); - _clusterClientCursorParams->remotes = std::move(remoteCursors); - if (_collectionCloningBatchSize > 0) - _clusterClientCursorParams->batchSize = _collectionCloningBatchSize; + AsyncResultsMergerParams armParams; + armParams.setNss(_sourceNss); + armParams.setRemotes(std::move(remoteCursors)); + if (_collectionCloningBatchSize > 0) { + armParams.setBatchSize(_collectionCloningBatchSize); + } Client::initThreadIfNotAlready(); _arm = stdx::make_unique<AsyncResultsMerger>( - cc().getOperationContext(), _executor, _clusterClientCursorParams.get()); + cc().getOperationContext(), _executor, std::move(armParams)); // This completion guard invokes _finishCallback on destruction. auto cancelRemainingWorkInLock = [this]() { _cancelRemainingWork_inlock(); }; diff --git a/src/mongo/db/repl/collection_cloner.h b/src/mongo/db/repl/collection_cloner.h index e79258e3734..d97c5c0048f 100644 --- a/src/mongo/db/repl/collection_cloner.h +++ b/src/mongo/db/repl/collection_cloner.h @@ -301,8 +301,6 @@ private: const int _maxNumClonerCursors; // (M) Component responsible for fetching the documents from the collection cloner cursor(s). std::unique_ptr<AsyncResultsMerger> _arm; - // (R) The cursor parameters used by the 'AsyncResultsMerger'. - std::unique_ptr<ClusterClientCursorParams> _clusterClientCursorParams; // (M) The event handle for the 'kill' event of the 'AsyncResultsMerger'. executor::TaskExecutor::EventHandle _killArmHandle; diff --git a/src/mongo/dbtests/documentsourcetests.cpp b/src/mongo/dbtests/documentsourcetests.cpp index ebfaccbde5c..5a834ed8aa5 100644 --- a/src/mongo/dbtests/documentsourcetests.cpp +++ b/src/mongo/dbtests/documentsourcetests.cpp @@ -369,7 +369,7 @@ TEST_F(DocumentSourceCursorTest, TailableAwaitDataCursorShouldErrorAfterTimeout) opCtx(), collScanParams, workingSet.get(), matchExpression.get()); auto queryRequest = stdx::make_unique<QueryRequest>(nss); queryRequest->setFilter(filter); - queryRequest->setTailableMode(TailableMode::kTailableAndAwaitData); + queryRequest->setTailableMode(TailableModeEnum::kTailableAndAwaitData); auto canonicalQuery = unittest::assertGet( CanonicalQuery::canonicalize(opCtx(), std::move(queryRequest), nullptr)); auto planExecutor = @@ -381,7 +381,7 @@ TEST_F(DocumentSourceCursorTest, TailableAwaitDataCursorShouldErrorAfterTimeout) PlanExecutor::YieldPolicy::ALWAYS_TIME_OUT)); // Make a DocumentSourceCursor. - ctx()->tailableMode = TailableMode::kTailableAndAwaitData; + ctx()->tailableMode = TailableModeEnum::kTailableAndAwaitData; // DocumentSourceCursor expects a PlanExecutor that has had its state saved. planExecutor->saveState(); auto cursor = @@ -419,7 +419,7 @@ TEST_F(DocumentSourceCursorTest, NonAwaitDataCursorShouldErrorAfterTimeout) { PlanExecutor::YieldPolicy::ALWAYS_TIME_OUT)); // Make a DocumentSourceCursor. - ctx()->tailableMode = TailableMode::kNormal; + ctx()->tailableMode = TailableModeEnum::kNormal; // DocumentSourceCursor expects a PlanExecutor that has had its state saved. planExecutor->saveState(); auto cursor = @@ -449,7 +449,7 @@ TEST_F(DocumentSourceCursorTest, TailableAwaitDataCursorShouldErrorAfterBeingKil opCtx(), collScanParams, workingSet.get(), matchExpression.get()); auto queryRequest = stdx::make_unique<QueryRequest>(nss); queryRequest->setFilter(filter); - queryRequest->setTailableMode(TailableMode::kTailableAndAwaitData); + queryRequest->setTailableMode(TailableModeEnum::kTailableAndAwaitData); auto canonicalQuery = unittest::assertGet( CanonicalQuery::canonicalize(opCtx(), std::move(queryRequest), nullptr)); auto planExecutor = @@ -461,7 +461,7 @@ TEST_F(DocumentSourceCursorTest, TailableAwaitDataCursorShouldErrorAfterBeingKil PlanExecutor::YieldPolicy::ALWAYS_MARK_KILLED)); // Make a DocumentSourceCursor. - ctx()->tailableMode = TailableMode::kTailableAndAwaitData; + ctx()->tailableMode = TailableModeEnum::kTailableAndAwaitData; // DocumentSourceCursor expects a PlanExecutor that has had its state saved. planExecutor->saveState(); auto cursor = @@ -498,7 +498,7 @@ TEST_F(DocumentSourceCursorTest, NormalCursorShouldErrorAfterBeingKilled) { PlanExecutor::YieldPolicy::ALWAYS_MARK_KILLED)); // Make a DocumentSourceCursor. - ctx()->tailableMode = TailableMode::kNormal; + ctx()->tailableMode = TailableModeEnum::kNormal; // DocumentSourceCursor expects a PlanExecutor that has had its state saved. planExecutor->saveState(); auto cursor = diff --git a/src/mongo/dbtests/query_plan_executor.cpp b/src/mongo/dbtests/query_plan_executor.cpp index 8f8025e4b9d..51a5f7723cd 100644 --- a/src/mongo/dbtests/query_plan_executor.cpp +++ b/src/mongo/dbtests/query_plan_executor.cpp @@ -99,7 +99,7 @@ public: Collection* coll, BSONObj& filterObj, PlanExecutor::YieldPolicy yieldPolicy = PlanExecutor::YieldPolicy::YIELD_MANUAL, - TailableMode tailableMode = TailableMode::kNormal) { + TailableModeEnum tailableMode = TailableModeEnum::kNormal) { CollectionScanParams csparams; csparams.collection = coll; csparams.direction = CollectionScanParams::FORWARD; @@ -307,7 +307,7 @@ TEST_F(PlanExecutorTest, ShouldReportErrorIfKilledDuringYieldButIsTailableAndAwa auto exec = makeCollScanExec(coll, filterObj, PlanExecutor::YieldPolicy::ALWAYS_TIME_OUT, - TailableMode::kTailableAndAwaitData); + TailableModeEnum::kTailableAndAwaitData); BSONObj resultObj; ASSERT_EQ(PlanExecutor::DEAD, exec->getNext(&resultObj, nullptr)); @@ -323,7 +323,7 @@ TEST_F(PlanExecutorTest, ShouldNotSwallowExceedsTimeLimitDuringYieldButIsTailabl Collection* coll = ctx.getCollection(); auto exec = makeCollScanExec( - coll, filterObj, PlanExecutor::YieldPolicy::ALWAYS_TIME_OUT, TailableMode::kTailable); + coll, filterObj, PlanExecutor::YieldPolicy::ALWAYS_TIME_OUT, TailableModeEnum::kTailable); BSONObj resultObj; ASSERT_EQ(PlanExecutor::DEAD, exec->getNext(&resultObj, nullptr)); diff --git a/src/mongo/s/commands/cluster_aggregate.cpp b/src/mongo/s/commands/cluster_aggregate.cpp index 88c888c586a..d6dbd316763 100644 --- a/src/mongo/s/commands/cluster_aggregate.cpp +++ b/src/mongo/s/commands/cluster_aggregate.cpp @@ -41,6 +41,7 @@ #include "mongo/db/curop.h" #include "mongo/db/logical_clock.h" #include "mongo/db/operation_context.h" +#include "mongo/db/pipeline/cluster_aggregation_planner.h" #include "mongo/db/pipeline/document_source_change_stream.h" #include "mongo/db/pipeline/document_source_merge_cursors.h" #include "mongo/db/pipeline/document_source_out.h" @@ -274,15 +275,14 @@ bool verifyTargetedShardsAtClusterTime(OperationContext* opCtx, return true; } -std::vector<ClusterClientCursorParams::RemoteCursor> establishShardCursors( - OperationContext* opCtx, - const NamespaceString& nss, - const LiteParsedPipeline& litePipe, - CachedCollectionRoutingInfo* routingInfo, - const BSONObj& cmdObj, - const ReadPreferenceSetting& readPref, - const BSONObj& shardQuery, - const BSONObj& collation) { +std::vector<RemoteCursor> establishShardCursors(OperationContext* opCtx, + const NamespaceString& nss, + const LiteParsedPipeline& litePipe, + CachedCollectionRoutingInfo* routingInfo, + const BSONObj& cmdObj, + const ReadPreferenceSetting& readPref, + const BSONObj& shardQuery, + const BSONObj& collation) { LOG(1) << "Dispatching command " << redact(cmdObj) << " to establish cursors on shards"; std::set<ShardId> shardIds = @@ -351,7 +351,7 @@ struct DispatchShardPipelineResults { // Populated if this *is not* an explain, this vector represents the cursors on the remote // shards. - std::vector<ClusterClientCursorParams::RemoteCursor> remoteCursors; + std::vector<RemoteCursor> remoteCursors; // Populated if this *is* an explain, this vector represents the results from each shard. std::vector<AsyncRequestsSender::Response> remoteExplainOutput; @@ -389,7 +389,7 @@ DispatchShardPipelineResults dispatchShardPipeline( // pipeline is already split and we now only need to target a single shard, reassemble the // original pipeline. // - After exhausting 10 attempts to establish the cursors, we give up and throw. - auto cursors = std::vector<ClusterClientCursorParams::RemoteCursor>(); + auto cursors = std::vector<RemoteCursor>(); auto shardResults = std::vector<AsyncRequestsSender::Response>(); auto opCtx = expCtx->opCtx; @@ -558,7 +558,7 @@ BSONObj establishMergingMongosCursor(OperationContext* opCtx, BSONObj cmdToRunOnNewShards, const LiteParsedPipeline& liteParsedPipeline, std::unique_ptr<Pipeline, PipelineDeleter> pipelineForMerging, - std::vector<ClusterClientCursorParams::RemoteCursor> cursors) { + std::vector<RemoteCursor> cursors) { ClusterClientCursorParams params(requestedNss, ReadPreferenceSetting::get(opCtx)); @@ -566,7 +566,6 @@ BSONObj establishMergingMongosCursor(OperationContext* opCtx, params.tailableMode = pipelineForMerging->getContext()->tailableMode; params.mergePipeline = std::move(pipelineForMerging); params.remotes = std::move(cursors); - // A batch size of 0 is legal for the initial aggregate, but not valid for getMores, the batch // size we pass here is used for getMores, so do not specify a batch size if the initial request // had a batch size of 0. @@ -576,12 +575,19 @@ BSONObj establishMergingMongosCursor(OperationContext* opCtx, if (liteParsedPipeline.hasChangeStream()) { // For change streams, we need to set up a custom stage to establish cursors on new shards - // when they are added. - params.createCustomCursorSource = [cmdToRunOnNewShards](OperationContext* opCtx, - executor::TaskExecutor* executor, - ClusterClientCursorParams* params) { + // when they are added. Be careful to extract the targeted shard IDs before the remote + // cursors are transferred from the ClusterClientCursorParams to the AsyncResultsMerger. + std::vector<ShardId> shardIds; + for (const auto& remote : params.remotes) { + shardIds.emplace_back(remote.getShardId().toString()); + } + + params.createCustomCursorSource = [cmdToRunOnNewShards, + shardIds](OperationContext* opCtx, + executor::TaskExecutor* executor, + ClusterClientCursorParams* params) { return stdx::make_unique<RouterStageUpdateOnAddShard>( - opCtx, executor, params, cmdToRunOnNewShards); + opCtx, executor, params, std::move(shardIds), cmdToRunOnNewShards); }; } auto ccc = ClusterClientCursorImpl::make( @@ -630,7 +636,7 @@ BSONObj establishMergingMongosCursor(OperationContext* opCtx, ccc->detachFromOperationContext(); - int nShards = ccc->getRemotes().size(); + int nShards = ccc->getNumRemotes(); CursorId clusterCursorId = 0; if (cursorState == ClusterCursorManager::CursorState::NotExhausted) { @@ -696,7 +702,8 @@ ShardId pickMergingShard(OperationContext* opCtx, return dispatchResults.needsPrimaryShardMerge ? primaryShard : dispatchResults.remoteCursors[prng.nextInt32(dispatchResults.remoteCursors.size())] - .shardId; + .getShardId() + .toString(); } } // namespace @@ -849,15 +856,16 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx, auto executorPool = Grid::get(opCtx)->getExecutorPool(); const BSONObj reply = uassertStatusOK(storePossibleCursor( opCtx, - remoteCursor.shardId, - remoteCursor.hostAndPort, - remoteCursor.cursorResponse.toBSON(CursorResponse::ResponseType::InitialResponse), + remoteCursor.getShardId().toString(), + remoteCursor.getHostAndPort(), + remoteCursor.getCursorResponse().toBSON(CursorResponse::ResponseType::InitialResponse), namespaces.requestedNss, executorPool->getArbitraryExecutor(), Grid::get(opCtx)->getCursorManager(), mergeCtx->tailableMode)); - return appendCursorResponseToCommandResult(remoteCursor.shardId, reply, result); + return appendCursorResponseToCommandResult( + remoteCursor.getShardId().toString(), reply, result); } // If we reach here, we have a merge pipeline to dispatch. @@ -893,10 +901,10 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx, ShardId mergingShardId = pickMergingShard(opCtx, dispatchResults, executionNsRoutingInfo.db().primaryId()); - mergingPipeline->addInitialSource(DocumentSourceMergeCursors::create( + cluster_aggregation_planner::addMergeCursorsSource( + mergingPipeline.get(), std::move(dispatchResults.remoteCursors), - Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(), - mergeCtx)); + Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor()); auto mergeCmdObj = createCommandForMergingShard(request, mergeCtx, cmdObj, mergingPipeline); auto mergeResponse = @@ -999,8 +1007,8 @@ Status ClusterAggregate::aggPassthrough(OperationContext* opCtx, namespaces.requestedNss, Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(), Grid::get(opCtx)->getCursorManager(), - liteParsedPipeline.hasChangeStream() ? TailableMode::kTailableAndAwaitData - : TailableMode::kNormal)); + liteParsedPipeline.hasChangeStream() ? TailableModeEnum::kTailableAndAwaitData + : TailableModeEnum::kNormal)); } // First append the properly constructed writeConcernError. It will then be skipped diff --git a/src/mongo/s/commands/pipeline_s.cpp b/src/mongo/s/commands/pipeline_s.cpp index 060c5533877..4d91ddfff08 100644 --- a/src/mongo/s/commands/pipeline_s.cpp +++ b/src/mongo/s/commands/pipeline_s.cpp @@ -114,7 +114,7 @@ boost::optional<Document> PipelineS::MongoSInterface::lookupSingleDocument( cmdBuilder.append(repl::ReadConcernArgs::kReadConcernFieldName, *readConcern); } - auto shardResult = std::vector<ClusterClientCursorParams::RemoteCursor>(); + auto shardResult = std::vector<RemoteCursor>(); auto findCmd = cmdBuilder.obj(); size_t numAttempts = 0; while (++numAttempts <= kMaxNumStaleVersionRetries) { @@ -164,13 +164,13 @@ boost::optional<Document> PipelineS::MongoSInterface::lookupSingleDocument( invariant(shardResult.size() == 1u); - auto& cursor = shardResult.front().cursorResponse; + auto& cursor = shardResult.front().getCursorResponse(); auto& batch = cursor.getBatch(); // We should have at most 1 result, and the cursor should be exhausted. uassert(ErrorCodes::InternalError, str::stream() << "Shard cursor was unexpectedly open after lookup: " - << shardResult.front().hostAndPort + << shardResult.front().getHostAndPort() << ", id: " << cursor.getCursorId(), cursor.getCursorId() == 0); diff --git a/src/mongo/s/commands/strategy.cpp b/src/mongo/s/commands/strategy.cpp index 8ae277237ae..baafa3bbce6 100644 --- a/src/mongo/s/commands/strategy.cpp +++ b/src/mongo/s/commands/strategy.cpp @@ -579,7 +579,7 @@ DbResponse Strategy::getMore(OperationContext* opCtx, const NamespaceString& nss } uassertStatusOK(statusGetDb); - boost::optional<long long> batchSize; + boost::optional<std::int64_t> batchSize; if (ntoreturn) { batchSize = ntoreturn; } diff --git a/src/mongo/s/query/SConscript b/src/mongo/s/query/SConscript index d9148577f5a..aa9a1713255 100644 --- a/src/mongo/s/query/SConscript +++ b/src/mongo/s/query/SConscript @@ -86,6 +86,7 @@ env.Library( source=[ "async_results_merger.cpp", "establish_cursors.cpp", + env.Idlc('async_results_merger_params.idl')[0], ], LIBDEPS=[ "$BUILD_DIR/mongo/db/query/command_request_response", diff --git a/src/mongo/s/query/async_results_merger.cpp b/src/mongo/s/query/async_results_merger.cpp index 59d9a133d72..45403399e8c 100644 --- a/src/mongo/s/query/async_results_merger.cpp +++ b/src/mongo/s/query/async_results_merger.cpp @@ -82,20 +82,27 @@ int compareSortKeys(BSONObj leftSortKey, BSONObj rightSortKey, BSONObj sortKeyPa AsyncResultsMerger::AsyncResultsMerger(OperationContext* opCtx, executor::TaskExecutor* executor, - ClusterClientCursorParams* params) + AsyncResultsMergerParams params) : _opCtx(opCtx), _executor(executor), - _params(params), - _mergeQueue(MergingComparator(_remotes, _params->sort, _params->compareWholeSortKey)) { + // This strange initialization is to work around the fact that the IDL does not currently + // support a default value for an enum. The default tailable mode should be 'kNormal', but + // since that is not supported we treat boost::none (unspecified) to mean 'kNormal'. + _tailableMode(params.getTailableMode() ? *params.getTailableMode() + : TailableModeEnum::kNormal), + _params(std::move(params)), + _mergeQueue(MergingComparator(_remotes, + _params.getSort() ? *_params.getSort() : BSONObj(), + _params.getCompareWholeSortKey())) { size_t remoteIndex = 0; - for (const auto& remote : _params->remotes) { - _remotes.emplace_back(remote.hostAndPort, - remote.cursorResponse.getNSS(), - remote.cursorResponse.getCursorId()); + for (const auto& remote : _params.getRemotes()) { + _remotes.emplace_back(remote.getHostAndPort(), + remote.getCursorResponse().getNSS(), + remote.getCursorResponse().getCursorId()); // We don't check the return value of _addBatchToBuffer here; if there was an error, // it will be stored in the remote and the first call to ready() will return true. - _addBatchToBuffer(WithLock::withoutLock(), remoteIndex, remote.cursorResponse); + _addBatchToBuffer(WithLock::withoutLock(), remoteIndex, remote.getCursorResponse()); ++remoteIndex; } } @@ -123,7 +130,7 @@ bool AsyncResultsMerger::_remotesExhausted(WithLock) { Status AsyncResultsMerger::setAwaitDataTimeout(Milliseconds awaitDataTimeout) { stdx::lock_guard<stdx::mutex> lk(_mutex); - if (_params->tailableMode != TailableMode::kTailableAndAwaitData) { + if (_tailableMode != TailableModeEnum::kTailableAndAwaitData) { return Status(ErrorCodes::BadValue, "maxTimeMS can only be used with getMore for tailable, awaitData cursors"); } @@ -133,9 +140,9 @@ Status AsyncResultsMerger::setAwaitDataTimeout(Milliseconds awaitDataTimeout) { // recent optimes, which allows us to return sorted $changeStream results even if some shards // are yet to provide a batch of data. If the timeout specified by the client is greater than // 1000ms, then it will be enforced elsewhere. - _awaitDataTimeout = (!_params->sort.isEmpty() && _remotes.size() > 1u - ? std::min(awaitDataTimeout, Milliseconds{1000}) - : awaitDataTimeout); + _awaitDataTimeout = + (_params.getSort() && _remotes.size() > 1u ? std::min(awaitDataTimeout, Milliseconds{1000}) + : awaitDataTimeout); return Status::OK(); } @@ -161,13 +168,12 @@ void AsyncResultsMerger::reattachToOperationContext(OperationContext* opCtx) { _opCtx = opCtx; } -void AsyncResultsMerger::addNewShardCursors( - const std::vector<ClusterClientCursorParams::RemoteCursor>& newCursors) { +void AsyncResultsMerger::addNewShardCursors(std::vector<RemoteCursor>&& newCursors) { stdx::lock_guard<stdx::mutex> lk(_mutex); for (auto&& remote : newCursors) { - _remotes.emplace_back(remote.hostAndPort, - remote.cursorResponse.getNSS(), - remote.cursorResponse.getCursorId()); + _remotes.emplace_back(remote.getHostAndPort(), + remote.getCursorResponse().getNSS(), + remote.getCursorResponse().getCursorId()); } } @@ -190,16 +196,15 @@ bool AsyncResultsMerger::_ready(WithLock lk) { } } - const bool hasSort = !_params->sort.isEmpty(); - return hasSort ? _readySorted(lk) : _readyUnsorted(lk); + return _params.getSort() ? _readySorted(lk) : _readyUnsorted(lk); } bool AsyncResultsMerger::_readySorted(WithLock lk) { - if (_params->tailableMode == TailableMode::kTailableAndAwaitData) { + if (_tailableMode == TailableModeEnum::kTailableAndAwaitData) { return _readySortedTailable(lk); } // Tailable non-awaitData cursors cannot have a sort. - invariant(_params->tailableMode == TailableMode::kNormal); + invariant(_tailableMode == TailableModeEnum::kNormal); for (const auto& remote : _remotes) { if (!remote.hasNext() && !remote.exhausted()) { @@ -218,13 +223,14 @@ bool AsyncResultsMerger::_readySortedTailable(WithLock) { auto smallestRemote = _mergeQueue.top(); auto smallestResult = _remotes[smallestRemote].docBuffer.front(); auto keyWeWantToReturn = - extractSortKey(*smallestResult.getResult(), _params->compareWholeSortKey); + extractSortKey(*smallestResult.getResult(), _params.getCompareWholeSortKey()); for (const auto& remote : _remotes) { if (!remote.promisedMinSortKey) { // In order to merge sorted tailable cursors, we need this value to be populated. return false; } - if (compareSortKeys(keyWeWantToReturn, *remote.promisedMinSortKey, _params->sort) > 0) { + if (compareSortKeys(keyWeWantToReturn, *remote.promisedMinSortKey, *_params.getSort()) > + 0) { // The key we want to return is not guaranteed to be smaller than future results from // this remote, so we can't yet return it. return false; @@ -264,13 +270,12 @@ StatusWith<ClusterQueryResult> AsyncResultsMerger::nextReady() { return {ClusterQueryResult()}; } - const bool hasSort = !_params->sort.isEmpty(); - return hasSort ? _nextReadySorted(lk) : _nextReadyUnsorted(lk); + return _params.getSort() ? _nextReadySorted(lk) : _nextReadyUnsorted(lk); } ClusterQueryResult AsyncResultsMerger::_nextReadySorted(WithLock) { // Tailable non-awaitData cursors cannot have a sort. - invariant(_params->tailableMode != TailableMode::kTailable); + invariant(_tailableMode != TailableModeEnum::kTailable); if (_mergeQueue.empty()) { return {}; @@ -304,7 +309,7 @@ ClusterQueryResult AsyncResultsMerger::_nextReadyUnsorted(WithLock) { ClusterQueryResult front = _remotes[_gettingFromRemote].docBuffer.front(); _remotes[_gettingFromRemote].docBuffer.pop(); - if (_params->tailableMode == TailableMode::kTailable && + if (_tailableMode == TailableModeEnum::kTailable && !_remotes[_gettingFromRemote].hasNext()) { // The cursor is tailable and we're about to return the last buffered result. This // means that the next value returned should be boost::none to indicate the end of @@ -334,9 +339,9 @@ Status AsyncResultsMerger::_askForNextBatch(WithLock, size_t remoteIndex) { // request to fetch the remaining docs only. If the remote node has a plan with OR for top k and // a full sort as is the case for the OP_QUERY find then this optimization will prevent // switching to the full sort plan branch. - auto adjustedBatchSize = _params->batchSize; - if (_params->batchSize && *_params->batchSize > remote.fetchedCount) { - adjustedBatchSize = *_params->batchSize - remote.fetchedCount; + auto adjustedBatchSize = _params.getBatchSize(); + if (_params.getBatchSize() && *_params.getBatchSize() > remote.fetchedCount) { + adjustedBatchSize = *_params.getBatchSize() - remote.fetchedCount; } BSONObj cmdObj = GetMoreRequest(remote.cursorNss, @@ -348,7 +353,7 @@ Status AsyncResultsMerger::_askForNextBatch(WithLock, size_t remoteIndex) { .toBSON(); executor::RemoteCommandRequest request( - remote.getTargetHost(), _params->nsString.db().toString(), cmdObj, _opCtx); + remote.getTargetHost(), _params.getNss().db().toString(), cmdObj, _opCtx); auto callbackStatus = _executor->scheduleRemoteCommand(request, [this, remoteIndex](auto const& cbData) { @@ -448,7 +453,8 @@ void AsyncResultsMerger::updateRemoteMetadata(RemoteCursorData* remote, remote->cursorId = response.getCursorId(); if (response.getLastOplogTimestamp() && !response.getLastOplogTimestamp()->isNull()) { // We only expect to see this for change streams. - invariant(SimpleBSONObjComparator::kInstance.evaluate(_params->sort == + invariant(_params.getSort()); + invariant(SimpleBSONObjComparator::kInstance.evaluate(*_params.getSort() == change_stream_constants::kSortSpec)); auto newLatestTimestamp = *response.getLastOplogTimestamp(); @@ -475,7 +481,7 @@ void AsyncResultsMerger::updateRemoteMetadata(RemoteCursorData* remote, auto maxSortKeyFromResponse = (response.getBatch().empty() ? BSONObj() - : extractSortKey(response.getBatch().back(), _params->compareWholeSortKey)); + : extractSortKey(response.getBatch().back(), _params.getCompareWholeSortKey())); remote->promisedMinSortKey = (compareSortKeys( @@ -525,7 +531,7 @@ void AsyncResultsMerger::_cleanUpFailedBatch(WithLock lk, Status status, size_t remote.status = std::move(status); // Unreachable host errors are swallowed if the 'allowPartialResults' option is set. We // remove the unreachable host entirely from consideration by marking it as exhausted. - if (_params->isAllowPartialResults) { + if (_params.getAllowPartialResults()) { remote.status = Status::OK(); // Clear the results buffer and cursor id. @@ -565,7 +571,7 @@ void AsyncResultsMerger::_processBatchResults(WithLock lk, // through to the client as-is. // (Note: tailable cursors are only valid on unsharded collections, so the end of the batch from // one shard means the end of the overall batch). - if (_params->tailableMode == TailableMode::kTailable && !remote.hasNext()) { + if (_tailableMode == TailableModeEnum::kTailable && !remote.hasNext()) { invariant(_remotes.size() == 1); _eofNext = true; } else if (!remote.hasNext() && !remote.exhausted() && _lifecycleState == kAlive) { @@ -582,7 +588,7 @@ bool AsyncResultsMerger::_addBatchToBuffer(WithLock lk, updateRemoteMetadata(&remote, response); for (const auto& obj : response.getBatch()) { // If there's a sort, we're expecting the remote node to have given us back a sort key. - if (!_params->sort.isEmpty()) { + if (_params.getSort()) { auto key = obj[AsyncResultsMerger::kSortKeyField]; if (!key) { remote.status = @@ -591,7 +597,7 @@ bool AsyncResultsMerger::_addBatchToBuffer(WithLock lk, << "' in document: " << obj); return false; - } else if (!_params->compareWholeSortKey && key.type() != BSONType::Object) { + } else if (!_params.getCompareWholeSortKey() && key.type() != BSONType::Object) { remote.status = Status(ErrorCodes::InternalError, str::stream() << "Field '" << AsyncResultsMerger::kSortKeyField @@ -606,9 +612,9 @@ bool AsyncResultsMerger::_addBatchToBuffer(WithLock lk, ++remote.fetchedCount; } - // If we're doing a sorted merge, then we have to make sure to put this remote onto the - // merge queue. - if (!_params->sort.isEmpty() && !response.getBatch().empty()) { + // If we're doing a sorted merge, then we have to make sure to put this remote onto the merge + // queue. + if (_params.getSort() && !response.getBatch().empty()) { _mergeQueue.push(remoteIndex); } return true; @@ -638,10 +644,10 @@ void AsyncResultsMerger::_scheduleKillCursors(WithLock, OperationContext* opCtx) for (const auto& remote : _remotes) { if (remote.status.isOK() && remote.cursorId && !remote.exhausted()) { - BSONObj cmdObj = KillCursorsRequest(_params->nsString, {remote.cursorId}).toBSON(); + BSONObj cmdObj = KillCursorsRequest(_params.getNss(), {remote.cursorId}).toBSON(); executor::RemoteCommandRequest request( - remote.getTargetHost(), _params->nsString.db().toString(), cmdObj, opCtx); + remote.getTargetHost(), _params.getNss().db().toString(), cmdObj, opCtx); // Send kill request; discard callback handle, if any, or failure report, if not. _executor->scheduleRemoteCommand(request, [](auto const&) {}).getStatus().ignore(); diff --git a/src/mongo/s/query/async_results_merger.h b/src/mongo/s/query/async_results_merger.h index e2551a3c7dd..1653374b5bc 100644 --- a/src/mongo/s/query/async_results_merger.h +++ b/src/mongo/s/query/async_results_merger.h @@ -37,7 +37,7 @@ #include "mongo/bson/bsonobj.h" #include "mongo/db/cursor_id.h" #include "mongo/executor/task_executor.h" -#include "mongo/s/query/cluster_client_cursor_params.h" +#include "mongo/s/query/async_results_merger_params_gen.h" #include "mongo/s/query/cluster_query_result.h" #include "mongo/stdx/mutex.h" #include "mongo/util/concurrency/with_lock.h" @@ -98,7 +98,7 @@ public: */ AsyncResultsMerger(OperationContext* opCtx, executor::TaskExecutor* executor, - ClusterClientCursorParams* params); + AsyncResultsMergerParams params); /** * In order to be destroyed, either the ARM must have been kill()'ed or all cursors must have @@ -195,7 +195,11 @@ public: * Adds the specified shard cursors to the set of cursors to be merged. The results from the * new cursors will be returned as normal through nextReady(). */ - void addNewShardCursors(const std::vector<ClusterClientCursorParams::RemoteCursor>& newCursors); + void addNewShardCursors(std::vector<RemoteCursor>&& newCursors); + + std::size_t getNumRemotes() const { + return _remotes.size(); + } /** * Starts shutting down this ARM by canceling all pending requests and scheduling killCursors @@ -293,7 +297,7 @@ private: private: const std::vector<RemoteCursorData>& _remotes; - const BSONObj& _sort; + const BSONObj _sort; // When '_compareWholeSortKey' is true, $sortKey is a scalar value, rather than an object. // We extract the sort key {$sortKey: <value>}. The sort key pattern '_sort' is verified to @@ -401,7 +405,8 @@ private: OperationContext* _opCtx; executor::TaskExecutor* _executor; - ClusterClientCursorParams* _params; + TailableModeEnum _tailableMode; + AsyncResultsMergerParams _params; // Must be acquired before accessing any data members (other than _params, which is read-only). stdx::mutex _mutex; diff --git a/src/mongo/s/query/async_results_merger_params.idl b/src/mongo/s/query/async_results_merger_params.idl new file mode 100644 index 00000000000..dafc9b53c1c --- /dev/null +++ b/src/mongo/s/query/async_results_merger_params.idl @@ -0,0 +1,89 @@ +# Copyright (C) 2018 MongoDB Inc. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License, version 3, +# as published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +# +# As a special exception, the copyright holders give permission to link the +# code of portions of this program with the OpenSSL library under certain +# conditions as described in each individual source file and distribute +# linked combinations including the program with the OpenSSL library. You +# must comply with the GNU Affero General Public License in all respects for +# all of the code used other than as permitted herein. If you modify file(s) +# with this exception, you may extend this exception to your version of the +# file(s), but you are not obligated to do so. If you do not wish to do so, +# delete this exception statement from your version. If you delete this +# exception statement from all source files in the program, then also delete +# it in the license file. + +global: + cpp_namespace: "mongo" + cpp_includes: + - "mongo/s/shard_id.h" + - "mongo/util/net/hostandport.h" + - "mongo/db/query/cursor_response.h" + +imports: + - "mongo/db/query/tailable_mode.idl" + - "mongo/idl/basic_types.idl" + - "mongo/util/net/hostandport.idl" + +types: + CursorResponse: + bson_serialization_type: object + description: The first batch returned after establishing cursors on a shard. + cpp_type: CursorResponse + serializer: CursorResponse::toBSONAsInitialResponse + deserializer: CursorResponse::parseFromBSONThrowing + +structs: + RemoteCursor: + description: A description of a cursor opened on a remote server. + fields: + shardId: + type: string + description: The shardId of the shard on which the cursor resides. + hostAndPort: + type: HostAndPort + description: The exact host (within the shard) on which the cursor resides. + cursorResponse: + type: CursorResponse + description: The response after establishing a cursor on the remote shard, including + the first batch. + + AsyncResultsMergerParams: + description: The parameters needed to establish an AsyncResultsMerger. + fields: + sort: + type: object + description: The sort requested on the merging operation. Empty if there is no sort. + optional: true + compareWholeSortKey: + type: bool + default: false + description: >- + When 'compareWholeSortKey' is true, $sortKey is a scalar value, rather than an + object. We extract the sort key {$sortKey: <value>}. The sort key pattern is + verified to be {$sortKey: 1}. + remotes: array<RemoteCursor> + tailableMode: + type: TailableMode + optional: true + description: If set, the tailability mode of this cursor. + batchSize: + type: safeInt64 + optional: true + description: The batch size for this cursor. + nss: namespacestring + allowPartialResults: + type: bool + default: false + description: If set, error responses are ignored. diff --git a/src/mongo/s/query/async_results_merger_test.cpp b/src/mongo/s/query/async_results_merger_test.cpp index 4c0e32cba51..6fd81715e90 100644 --- a/src/mongo/s/query/async_results_merger_test.cpp +++ b/src/mongo/s/query/async_results_merger_test.cpp @@ -62,9 +62,11 @@ const std::vector<HostAndPort> kTestShardHosts = {HostAndPort("FakeShard1Host", HostAndPort("FakeShard2Host", 12345), HostAndPort("FakeShard3Host", 12345)}; +const NamespaceString kTestNss("testdb.testcoll"); + class AsyncResultsMergerTest : public ShardingTestFixture { public: - AsyncResultsMergerTest() : _nss("testdb.testcoll") {} + AsyncResultsMergerTest() {} void setUp() override { ShardingTestFixture::setUp(); @@ -95,42 +97,48 @@ public: void tearDown() override { ShardingTestFixture::tearDown(); - // Reset _params only after shutting down the network interface (through - // ShardingTestFixture::tearDown()), because shutting down the network interface will - // deliver blackholed responses to the AsyncResultsMerger, and the AsyncResultsMerger's - // callback may still access _params. - _params.reset(); } protected: /** * Constructs an ARM with the given vector of existing cursors. * - * If 'findCmd' is not set, the default ClusterClientCursorParams are used. - * Otherwise, the 'findCmd' is used to construct the ClusterClientCursorParams. + * If 'findCmd' is not set, the default AsyncResultsMergerParams are used. + * Otherwise, the 'findCmd' is used to construct the AsyncResultsMergerParams. * * 'findCmd' should not have a 'batchSize', since the find's batchSize is used just in the * initial find. The getMore 'batchSize' can be passed in through 'getMoreBatchSize.' */ - void makeCursorFromExistingCursors( - std::vector<ClusterClientCursorParams::RemoteCursor> remoteCursors, + std::unique_ptr<AsyncResultsMerger> makeARMFromExistingCursors( + std::vector<RemoteCursor> remoteCursors, boost::optional<BSONObj> findCmd = boost::none, - boost::optional<long long> getMoreBatchSize = boost::none) { - _params = stdx::make_unique<ClusterClientCursorParams>(_nss); - _params->remotes = std::move(remoteCursors); + boost::optional<std::int64_t> getMoreBatchSize = boost::none) { + AsyncResultsMergerParams params; + params.setNss(kTestNss); + params.setRemotes(std::move(remoteCursors)); + if (findCmd) { const auto qr = unittest::assertGet( - QueryRequest::makeFromFindCommand(_nss, *findCmd, false /* isExplain */)); - _params->sort = qr->getSort(); - _params->limit = qr->getLimit(); - _params->batchSize = getMoreBatchSize ? getMoreBatchSize : qr->getBatchSize(); - _params->skip = qr->getSkip(); - _params->tailableMode = qr->getTailableMode(); - _params->isAllowPartialResults = qr->isAllowPartialResults(); + QueryRequest::makeFromFindCommand(kTestNss, *findCmd, false /* isExplain */)); + if (!qr->getSort().isEmpty()) { + params.setSort(qr->getSort().getOwned()); + } + + if (getMoreBatchSize) { + params.setBatchSize(getMoreBatchSize); + } else { + params.setBatchSize(qr->getBatchSize() + ? boost::optional<std::int64_t>( + static_cast<std::int64_t>(*qr->getBatchSize())) + : boost::none); + } + params.setTailableMode(qr->getTailableMode()); + params.setAllowPartialResults(qr->isAllowPartialResults()); } - arm = stdx::make_unique<AsyncResultsMerger>(operationContext(), executor(), _params.get()); + return stdx::make_unique<AsyncResultsMerger>( + operationContext(), executor(), std::move(params)); } /** @@ -220,11 +228,6 @@ protected: net->blackHole(net->getNextReadyRequest()); net->exitNetwork(); } - - const NamespaceString _nss; - std::unique_ptr<ClusterClientCursorParams> _params; - - std::unique_ptr<AsyncResultsMerger> arm; }; void assertKillCusorsCmdHasCursorId(const BSONObj& killCmd, CursorId cursorId) { @@ -240,10 +243,19 @@ void assertKillCusorsCmdHasCursorId(const BSONObj& killCmd, CursorId cursorId) { ASSERT_EQ(numCursors, 1u); } +RemoteCursor makeRemoteCursor(ShardId shardId, HostAndPort host, CursorResponse response) { + RemoteCursor remoteCursor; + remoteCursor.setShardId(std::move(shardId)); + remoteCursor.setHostAndPort(std::move(host)); + remoteCursor.setCursorResponse(std::move(response)); + return remoteCursor; +} + TEST_F(AsyncResultsMergerTest, SingleShardUnsorted) { - std::vector<ClusterClientCursorParams::RemoteCursor> cursors; - cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 5, {})); - makeCursorFromExistingCursors(std::move(cursors)); + std::vector<RemoteCursor> cursors; + cursors.push_back( + makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 5, {}))); + auto arm = makeARMFromExistingCursors(std::move(cursors)); // Before any requests are scheduled, ARM is not ready to return results. ASSERT_FALSE(arm->ready()); @@ -259,7 +271,7 @@ TEST_F(AsyncResultsMergerTest, SingleShardUnsorted) { // Shard responds; the handleBatchResponse callbacks are run and ARM's remotes get updated. std::vector<CursorResponse> responses; std::vector<BSONObj> batch = {fromjson("{_id: 1}"), fromjson("{_id: 2}"), fromjson("{_id: 3}")}; - responses.emplace_back(_nss, CursorId(0), batch); + responses.emplace_back(kTestNss, CursorId(0), batch); scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::SubsequentResponse); @@ -285,9 +297,10 @@ TEST_F(AsyncResultsMergerTest, SingleShardUnsorted) { TEST_F(AsyncResultsMergerTest, SingleShardSorted) { BSONObj findCmd = fromjson("{find: 'testcoll', sort: {_id: 1}}"); - std::vector<ClusterClientCursorParams::RemoteCursor> cursors; - cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 5, {})); - makeCursorFromExistingCursors(std::move(cursors), findCmd); + std::vector<RemoteCursor> cursors; + cursors.push_back( + makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 5, {}))); + auto arm = makeARMFromExistingCursors(std::move(cursors), findCmd); // Before any requests are scheduled, ARM is not ready to return results. ASSERT_FALSE(arm->ready()); @@ -303,7 +316,7 @@ TEST_F(AsyncResultsMergerTest, SingleShardSorted) { // Shard responds; the handleBatchResponse callbacks are run and ARM's remotes get updated. std::vector<CursorResponse> responses; std::vector<BSONObj> batch = {fromjson("{$sortKey: {'': 5}}"), fromjson("{$sortKey: {'': 6}}")}; - responses.emplace_back(_nss, CursorId(0), batch); + responses.emplace_back(kTestNss, CursorId(0), batch); scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::SubsequentResponse); @@ -328,10 +341,12 @@ TEST_F(AsyncResultsMergerTest, SingleShardSorted) { } TEST_F(AsyncResultsMergerTest, MultiShardUnsorted) { - std::vector<ClusterClientCursorParams::RemoteCursor> cursors; - cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 5, {})); - cursors.emplace_back(kTestShardIds[1], kTestShardHosts[1], CursorResponse(_nss, 6, {})); - makeCursorFromExistingCursors(std::move(cursors)); + std::vector<RemoteCursor> cursors; + cursors.push_back( + makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 5, {}))); + cursors.push_back( + makeRemoteCursor(kTestShardIds[1], kTestShardHosts[1], CursorResponse(kTestNss, 6, {}))); + auto arm = makeARMFromExistingCursors(std::move(cursors)); // Before any requests are scheduled, ARM is not ready to return results. ASSERT_FALSE(arm->ready()); @@ -348,7 +363,7 @@ TEST_F(AsyncResultsMergerTest, MultiShardUnsorted) { std::vector<CursorResponse> responses; std::vector<BSONObj> batch1 = { fromjson("{_id: 1}"), fromjson("{_id: 2}"), fromjson("{_id: 3}")}; - responses.emplace_back(_nss, CursorId(0), batch1); + responses.emplace_back(kTestNss, CursorId(0), batch1); scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::SubsequentResponse); @@ -376,7 +391,7 @@ TEST_F(AsyncResultsMergerTest, MultiShardUnsorted) { responses.clear(); std::vector<BSONObj> batch2 = { fromjson("{_id: 4}"), fromjson("{_id: 5}"), fromjson("{_id: 6}")}; - responses.emplace_back(_nss, CursorId(0), batch2); + responses.emplace_back(kTestNss, CursorId(0), batch2); scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::SubsequentResponse); @@ -392,18 +407,20 @@ TEST_F(AsyncResultsMergerTest, MultiShardUnsorted) { ASSERT_TRUE(arm->ready()); ASSERT_BSONOBJ_EQ(fromjson("{_id: 6}"), *unittest::assertGet(arm->nextReady()).getResult()); - // After returning all the buffered results, the ARM returns EOF immediately because both - // shards cursors were exhausted. + // After returning all the buffered results, the ARM returns EOF immediately because both shards + // cursors were exhausted. ASSERT_TRUE(arm->ready()); ASSERT_TRUE(unittest::assertGet(arm->nextReady()).isEOF()); } TEST_F(AsyncResultsMergerTest, MultiShardSorted) { BSONObj findCmd = fromjson("{find: 'testcoll', sort: {_id: 1}}"); - std::vector<ClusterClientCursorParams::RemoteCursor> cursors; - cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 5, {})); - cursors.emplace_back(kTestShardIds[1], kTestShardHosts[1], CursorResponse(_nss, 6, {})); - makeCursorFromExistingCursors(std::move(cursors), findCmd); + std::vector<RemoteCursor> cursors; + cursors.push_back( + makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 5, {}))); + cursors.push_back( + makeRemoteCursor(kTestShardIds[1], kTestShardHosts[1], CursorResponse(kTestNss, 6, {}))); + auto arm = makeARMFromExistingCursors(std::move(cursors), findCmd); // Before any requests are scheduled, ARM is not ready to return results. ASSERT_FALSE(arm->ready()); @@ -420,7 +437,7 @@ TEST_F(AsyncResultsMergerTest, MultiShardSorted) { std::vector<CursorResponse> responses; std::vector<BSONObj> batch1 = {fromjson("{$sortKey: {'': 5}}"), fromjson("{$sortKey: {'': 6}}")}; - responses.emplace_back(_nss, CursorId(0), batch1); + responses.emplace_back(kTestNss, CursorId(0), batch1); scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::SubsequentResponse); @@ -434,7 +451,7 @@ TEST_F(AsyncResultsMergerTest, MultiShardSorted) { responses.clear(); std::vector<BSONObj> batch2 = {fromjson("{$sortKey: {'': 3}}"), fromjson("{$sortKey: {'': 9}}")}; - responses.emplace_back(_nss, CursorId(0), batch2); + responses.emplace_back(kTestNss, CursorId(0), batch2); scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::SubsequentResponse); @@ -456,17 +473,19 @@ TEST_F(AsyncResultsMergerTest, MultiShardSorted) { ASSERT_BSONOBJ_EQ(fromjson("{$sortKey: {'': 9}}"), *unittest::assertGet(arm->nextReady()).getResult()); - // After returning all the buffered results, the ARM returns EOF immediately because both - // shards cursors were exhausted. + // After returning all the buffered results, the ARM returns EOF immediately because both shards + // cursors were exhausted. ASSERT_TRUE(arm->ready()); ASSERT_TRUE(unittest::assertGet(arm->nextReady()).isEOF()); } TEST_F(AsyncResultsMergerTest, MultiShardMultipleGets) { - std::vector<ClusterClientCursorParams::RemoteCursor> cursors; - cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 5, {})); - cursors.emplace_back(kTestShardIds[1], kTestShardHosts[1], CursorResponse(_nss, 6, {})); - makeCursorFromExistingCursors(std::move(cursors)); + std::vector<RemoteCursor> cursors; + cursors.push_back( + makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 5, {}))); + cursors.push_back( + makeRemoteCursor(kTestShardIds[1], kTestShardHosts[1], CursorResponse(kTestNss, 6, {}))); + auto arm = makeARMFromExistingCursors(std::move(cursors)); // Before any requests are scheduled, ARM is not ready to return results. ASSERT_FALSE(arm->ready()); @@ -479,7 +498,7 @@ TEST_F(AsyncResultsMergerTest, MultiShardMultipleGets) { std::vector<CursorResponse> responses; std::vector<BSONObj> batch1 = { fromjson("{_id: 1}"), fromjson("{_id: 2}"), fromjson("{_id: 3}")}; - responses.emplace_back(_nss, CursorId(5), batch1); + responses.emplace_back(kTestNss, CursorId(5), batch1); scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::SubsequentResponse); @@ -508,7 +527,7 @@ TEST_F(AsyncResultsMergerTest, MultiShardMultipleGets) { responses.clear(); std::vector<BSONObj> batch2 = { fromjson("{_id: 4}"), fromjson("{_id: 5}"), fromjson("{_id: 6}")}; - responses.emplace_back(_nss, CursorId(0), batch2); + responses.emplace_back(kTestNss, CursorId(0), batch2); scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::SubsequentResponse); @@ -536,7 +555,7 @@ TEST_F(AsyncResultsMergerTest, MultiShardMultipleGets) { responses.clear(); std::vector<BSONObj> batch3 = { fromjson("{_id: 7}"), fromjson("{_id: 8}"), fromjson("{_id: 9}")}; - responses.emplace_back(_nss, CursorId(0), batch3); + responses.emplace_back(kTestNss, CursorId(0), batch3); scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::SubsequentResponse); @@ -552,19 +571,22 @@ TEST_F(AsyncResultsMergerTest, MultiShardMultipleGets) { ASSERT_TRUE(arm->ready()); ASSERT_BSONOBJ_EQ(fromjson("{_id: 9}"), *unittest::assertGet(arm->nextReady()).getResult()); - // After returning all the buffered results, the ARM returns EOF immediately because both - // shards cursors were exhausted. + // After returning all the buffered results, the ARM returns EOF immediately because both shards + // cursors were exhausted. ASSERT_TRUE(arm->ready()); ASSERT_TRUE(unittest::assertGet(arm->nextReady()).isEOF()); } TEST_F(AsyncResultsMergerTest, CompoundSortKey) { BSONObj findCmd = fromjson("{find: 'testcoll', sort: {a: -1, b: 1}}"); - std::vector<ClusterClientCursorParams::RemoteCursor> cursors; - cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 5, {})); - cursors.emplace_back(kTestShardIds[1], kTestShardHosts[1], CursorResponse(_nss, 6, {})); - cursors.emplace_back(kTestShardIds[2], kTestShardHosts[2], CursorResponse(_nss, 7, {})); - makeCursorFromExistingCursors(std::move(cursors), findCmd); + std::vector<RemoteCursor> cursors; + cursors.push_back( + makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 5, {}))); + cursors.push_back( + makeRemoteCursor(kTestShardIds[1], kTestShardHosts[1], CursorResponse(kTestNss, 6, {}))); + cursors.push_back( + makeRemoteCursor(kTestShardIds[2], kTestShardHosts[2], CursorResponse(kTestNss, 7, {}))); + auto arm = makeARMFromExistingCursors(std::move(cursors), findCmd); // Schedule requests. ASSERT_FALSE(arm->ready()); @@ -575,13 +597,13 @@ TEST_F(AsyncResultsMergerTest, CompoundSortKey) { std::vector<CursorResponse> responses; std::vector<BSONObj> batch1 = {fromjson("{$sortKey: {'': 5, '': 9}}"), fromjson("{$sortKey: {'': 4, '': 20}}")}; - responses.emplace_back(_nss, CursorId(0), batch1); + responses.emplace_back(kTestNss, CursorId(0), batch1); std::vector<BSONObj> batch2 = {fromjson("{$sortKey: {'': 10, '': 11}}"), fromjson("{$sortKey: {'': 4, '': 4}}")}; - responses.emplace_back(_nss, CursorId(0), batch2); + responses.emplace_back(kTestNss, CursorId(0), batch2); std::vector<BSONObj> batch3 = {fromjson("{$sortKey: {'': 10, '': 12}}"), fromjson("{$sortKey: {'': 5, '': 9}}")}; - responses.emplace_back(_nss, CursorId(0), batch3); + responses.emplace_back(kTestNss, CursorId(0), batch3); scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::SubsequentResponse); executor()->waitForEvent(readyEvent); @@ -607,17 +629,18 @@ TEST_F(AsyncResultsMergerTest, CompoundSortKey) { ASSERT_BSONOBJ_EQ(fromjson("{$sortKey: {'': 4, '': 20}}"), *unittest::assertGet(arm->nextReady()).getResult()); - // After returning all the buffered results, the ARM returns EOF immediately because both - // shards cursors were exhausted. + // After returning all the buffered results, the ARM returns EOF immediately because both shards + // cursors were exhausted. ASSERT_TRUE(arm->ready()); ASSERT_TRUE(unittest::assertGet(arm->nextReady()).isEOF()); } TEST_F(AsyncResultsMergerTest, SortedButNoSortKey) { BSONObj findCmd = fromjson("{find: 'testcoll', sort: {a: -1, b: 1}}"); - std::vector<ClusterClientCursorParams::RemoteCursor> cursors; - cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 1, {})); - makeCursorFromExistingCursors(std::move(cursors), findCmd); + std::vector<RemoteCursor> cursors; + cursors.push_back( + makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {}))); + auto arm = makeARMFromExistingCursors(std::move(cursors), findCmd); ASSERT_FALSE(arm->ready()); auto readyEvent = unittest::assertGet(arm->nextEvent()); @@ -626,7 +649,7 @@ TEST_F(AsyncResultsMergerTest, SortedButNoSortKey) { // Parsing the batch results in an error because the sort key is missing. std::vector<CursorResponse> responses; std::vector<BSONObj> batch1 = {fromjson("{a: 2, b: 1}"), fromjson("{a: 1, b: 2}")}; - responses.emplace_back(_nss, CursorId(1), batch1); + responses.emplace_back(kTestNss, CursorId(1), batch1); scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::SubsequentResponse); executor()->waitForEvent(readyEvent); @@ -644,10 +667,10 @@ TEST_F(AsyncResultsMergerTest, SortedButNoSortKey) { TEST_F(AsyncResultsMergerTest, HasFirstBatch) { std::vector<BSONObj> firstBatch = { fromjson("{_id: 1}"), fromjson("{_id: 2}"), fromjson("{_id: 3}")}; - std::vector<ClusterClientCursorParams::RemoteCursor> cursors; - cursors.emplace_back( - kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 5, std::move(firstBatch))); - makeCursorFromExistingCursors(std::move(cursors)); + std::vector<RemoteCursor> cursors; + cursors.push_back(makeRemoteCursor( + kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 5, std::move(firstBatch)))); + auto arm = makeARMFromExistingCursors(std::move(cursors)); // Because there was firstBatch, ARM is immediately ready to return results. ASSERT_TRUE(arm->ready()); @@ -675,7 +698,7 @@ TEST_F(AsyncResultsMergerTest, HasFirstBatch) { // Shard responds; the handleBatchResponse callbacks are run and ARM's remotes get updated. std::vector<CursorResponse> responses; std::vector<BSONObj> batch = {fromjson("{_id: 4}"), fromjson("{_id: 5}"), fromjson("{_id: 6}")}; - responses.emplace_back(_nss, CursorId(0), batch); + responses.emplace_back(kTestNss, CursorId(0), batch); scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::SubsequentResponse); @@ -702,11 +725,12 @@ TEST_F(AsyncResultsMergerTest, HasFirstBatch) { TEST_F(AsyncResultsMergerTest, OneShardHasInitialBatchOtherShardExhausted) { std::vector<BSONObj> firstBatch = { fromjson("{_id: 1}"), fromjson("{_id: 2}"), fromjson("{_id: 3}")}; - std::vector<ClusterClientCursorParams::RemoteCursor> cursors; - cursors.emplace_back( - kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 5, std::move(firstBatch))); - cursors.emplace_back(kTestShardIds[1], kTestShardHosts[1], CursorResponse(_nss, 0, {})); - makeCursorFromExistingCursors(std::move(cursors)); + std::vector<RemoteCursor> cursors; + cursors.push_back(makeRemoteCursor( + kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 5, std::move(firstBatch)))); + cursors.push_back( + makeRemoteCursor(kTestShardIds[1], kTestShardHosts[1], CursorResponse(kTestNss, 0, {}))); + auto arm = makeARMFromExistingCursors(std::move(cursors)); // Because there was firstBatch, ARM is immediately ready to return results. ASSERT_TRUE(arm->ready()); @@ -734,7 +758,7 @@ TEST_F(AsyncResultsMergerTest, OneShardHasInitialBatchOtherShardExhausted) { // Shard responds; the handleBatchResponse callbacks are run and ARM's remotes get updated. std::vector<CursorResponse> responses; std::vector<BSONObj> batch = {fromjson("{_id: 4}"), fromjson("{_id: 5}"), fromjson("{_id: 6}")}; - responses.emplace_back(_nss, CursorId(0), batch); + responses.emplace_back(kTestNss, CursorId(0), batch); scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::SubsequentResponse); @@ -759,10 +783,12 @@ TEST_F(AsyncResultsMergerTest, OneShardHasInitialBatchOtherShardExhausted) { } TEST_F(AsyncResultsMergerTest, StreamResultsFromOneShardIfOtherDoesntRespond) { - std::vector<ClusterClientCursorParams::RemoteCursor> cursors; - cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 1, {})); - cursors.emplace_back(kTestShardIds[1], kTestShardHosts[1], CursorResponse(_nss, 2, {})); - makeCursorFromExistingCursors(std::move(cursors)); + std::vector<RemoteCursor> cursors; + cursors.push_back( + makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {}))); + cursors.push_back( + makeRemoteCursor(kTestShardIds[1], kTestShardHosts[1], CursorResponse(kTestNss, 2, {}))); + auto arm = makeARMFromExistingCursors(std::move(cursors)); ASSERT_FALSE(arm->ready()); auto readyEvent = unittest::assertGet(arm->nextEvent()); @@ -771,9 +797,9 @@ TEST_F(AsyncResultsMergerTest, StreamResultsFromOneShardIfOtherDoesntRespond) { // Both shards respond with the first batch. std::vector<CursorResponse> responses; std::vector<BSONObj> batch1 = {fromjson("{_id: 1}"), fromjson("{_id: 2}")}; - responses.emplace_back(_nss, CursorId(1), batch1); + responses.emplace_back(kTestNss, CursorId(1), batch1); std::vector<BSONObj> batch2 = {fromjson("{_id: 3}"), fromjson("{_id: 4}")}; - responses.emplace_back(_nss, CursorId(2), batch2); + responses.emplace_back(kTestNss, CursorId(2), batch2); scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::SubsequentResponse); executor()->waitForEvent(readyEvent); @@ -795,7 +821,7 @@ TEST_F(AsyncResultsMergerTest, StreamResultsFromOneShardIfOtherDoesntRespond) { // never responds. responses.clear(); std::vector<BSONObj> batch3 = {fromjson("{_id: 5}"), fromjson("{_id: 6}")}; - responses.emplace_back(_nss, CursorId(1), batch3); + responses.emplace_back(kTestNss, CursorId(1), batch3); scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::SubsequentResponse); blackHoleNextRequest(); @@ -813,7 +839,7 @@ TEST_F(AsyncResultsMergerTest, StreamResultsFromOneShardIfOtherDoesntRespond) { // We can continue to return results from first shard, while second shard remains unresponsive. responses.clear(); std::vector<BSONObj> batch4 = {fromjson("{_id: 7}"), fromjson("{_id: 8}")}; - responses.emplace_back(_nss, CursorId(0), batch4); + responses.emplace_back(kTestNss, CursorId(0), batch4); scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::SubsequentResponse); executor()->waitForEvent(readyEvent); @@ -828,12 +854,15 @@ TEST_F(AsyncResultsMergerTest, StreamResultsFromOneShardIfOtherDoesntRespond) { // the network interface. auto killEvent = arm->kill(operationContext()); ASSERT_TRUE(killEvent.isValid()); + executor()->shutdown(); + executor()->waitForEvent(killEvent); } TEST_F(AsyncResultsMergerTest, ErrorOnMismatchedCursorIds) { - std::vector<ClusterClientCursorParams::RemoteCursor> cursors; - cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 123, {})); - makeCursorFromExistingCursors(std::move(cursors)); + std::vector<RemoteCursor> cursors; + cursors.push_back( + makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 123, {}))); + auto arm = makeARMFromExistingCursors(std::move(cursors)); ASSERT_FALSE(arm->ready()); auto readyEvent = unittest::assertGet(arm->nextEvent()); @@ -841,7 +870,7 @@ TEST_F(AsyncResultsMergerTest, ErrorOnMismatchedCursorIds) { std::vector<CursorResponse> responses; std::vector<BSONObj> batch = {fromjson("{_id: 4}"), fromjson("{_id: 5}"), fromjson("{_id: 6}")}; - responses.emplace_back(_nss, CursorId(456), batch); + responses.emplace_back(kTestNss, CursorId(456), batch); scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::SubsequentResponse); executor()->waitForEvent(readyEvent); @@ -855,22 +884,25 @@ TEST_F(AsyncResultsMergerTest, ErrorOnMismatchedCursorIds) { } TEST_F(AsyncResultsMergerTest, BadResponseReceivedFromShard) { - std::vector<ClusterClientCursorParams::RemoteCursor> cursors; - cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 123, {})); - cursors.emplace_back(kTestShardIds[1], kTestShardHosts[1], CursorResponse(_nss, 456, {})); - cursors.emplace_back(kTestShardIds[2], kTestShardHosts[2], CursorResponse(_nss, 789, {})); - makeCursorFromExistingCursors(std::move(cursors)); + std::vector<RemoteCursor> cursors; + cursors.push_back( + makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 123, {}))); + cursors.push_back( + makeRemoteCursor(kTestShardIds[1], kTestShardHosts[1], CursorResponse(kTestNss, 456, {}))); + cursors.push_back( + makeRemoteCursor(kTestShardIds[2], kTestShardHosts[2], CursorResponse(kTestNss, 789, {}))); + auto arm = makeARMFromExistingCursors(std::move(cursors)); ASSERT_FALSE(arm->ready()); auto readyEvent = unittest::assertGet(arm->nextEvent()); ASSERT_FALSE(arm->ready()); std::vector<BSONObj> batch1 = {fromjson("{_id: 1}"), fromjson("{_id: 2}")}; - BSONObj response1 = CursorResponse(_nss, CursorId(123), batch1) + BSONObj response1 = CursorResponse(kTestNss, CursorId(123), batch1) .toBSON(CursorResponse::ResponseType::SubsequentResponse); BSONObj response2 = fromjson("{foo: 'bar'}"); std::vector<BSONObj> batch3 = {fromjson("{_id: 4}"), fromjson("{_id: 5}")}; - BSONObj response3 = CursorResponse(_nss, CursorId(789), batch3) + BSONObj response3 = CursorResponse(kTestNss, CursorId(789), batch3) .toBSON(CursorResponse::ResponseType::SubsequentResponse); scheduleNetworkResponseObjs({response1, response2, response3}); runReadyCallbacks(); @@ -885,11 +917,14 @@ TEST_F(AsyncResultsMergerTest, BadResponseReceivedFromShard) { } TEST_F(AsyncResultsMergerTest, ErrorReceivedFromShard) { - std::vector<ClusterClientCursorParams::RemoteCursor> cursors; - cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 1, {})); - cursors.emplace_back(kTestShardIds[1], kTestShardHosts[1], CursorResponse(_nss, 2, {})); - cursors.emplace_back(kTestShardIds[2], kTestShardHosts[2], CursorResponse(_nss, 3, {})); - makeCursorFromExistingCursors(std::move(cursors)); + std::vector<RemoteCursor> cursors; + cursors.push_back( + makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {}))); + cursors.push_back( + makeRemoteCursor(kTestShardIds[1], kTestShardHosts[1], CursorResponse(kTestNss, 2, {}))); + cursors.push_back( + makeRemoteCursor(kTestShardIds[2], kTestShardHosts[2], CursorResponse(kTestNss, 3, {}))); + auto arm = makeARMFromExistingCursors(std::move(cursors)); ASSERT_FALSE(arm->ready()); auto readyEvent = unittest::assertGet(arm->nextEvent()); @@ -897,9 +932,9 @@ TEST_F(AsyncResultsMergerTest, ErrorReceivedFromShard) { std::vector<CursorResponse> responses; std::vector<BSONObj> batch1 = {fromjson("{_id: 1}"), fromjson("{_id: 2}")}; - responses.emplace_back(_nss, CursorId(1), batch1); + responses.emplace_back(kTestNss, CursorId(1), batch1); std::vector<BSONObj> batch2 = {fromjson("{_id: 3}"), fromjson("{_id: 4}")}; - responses.emplace_back(_nss, CursorId(2), batch2); + responses.emplace_back(kTestNss, CursorId(2), batch2); scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::SubsequentResponse); @@ -918,9 +953,10 @@ TEST_F(AsyncResultsMergerTest, ErrorReceivedFromShard) { } TEST_F(AsyncResultsMergerTest, ErrorCantScheduleEventBeforeLastSignaled) { - std::vector<ClusterClientCursorParams::RemoteCursor> cursors; - cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 1, {})); - makeCursorFromExistingCursors(std::move(cursors)); + std::vector<RemoteCursor> cursors; + cursors.push_back( + makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {}))); + auto arm = makeARMFromExistingCursors(std::move(cursors)); ASSERT_FALSE(arm->ready()); auto readyEvent = unittest::assertGet(arm->nextEvent()); @@ -930,7 +966,7 @@ TEST_F(AsyncResultsMergerTest, ErrorCantScheduleEventBeforeLastSignaled) { std::vector<CursorResponse> responses; std::vector<BSONObj> batch = {fromjson("{_id: 1}"), fromjson("{_id: 2}")}; - responses.emplace_back(_nss, CursorId(0), batch); + responses.emplace_back(kTestNss, CursorId(0), batch); scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::SubsequentResponse); executor()->waitForEvent(readyEvent); @@ -948,9 +984,10 @@ TEST_F(AsyncResultsMergerTest, ErrorCantScheduleEventBeforeLastSignaled) { } TEST_F(AsyncResultsMergerTest, NextEventAfterTaskExecutorShutdown) { - std::vector<ClusterClientCursorParams::RemoteCursor> cursors; - cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 1, {})); - makeCursorFromExistingCursors(std::move(cursors)); + std::vector<RemoteCursor> cursors; + cursors.push_back( + makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {}))); + auto arm = makeARMFromExistingCursors(std::move(cursors)); executor()->shutdown(); ASSERT_EQ(ErrorCodes::ShutdownInProgress, arm->nextEvent().getStatus()); @@ -959,9 +996,10 @@ TEST_F(AsyncResultsMergerTest, NextEventAfterTaskExecutorShutdown) { } TEST_F(AsyncResultsMergerTest, KillAfterTaskExecutorShutdownWithOutstandingBatches) { - std::vector<ClusterClientCursorParams::RemoteCursor> cursors; - cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 1, {})); - makeCursorFromExistingCursors(std::move(cursors)); + std::vector<RemoteCursor> cursors; + cursors.push_back( + makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {}))); + auto arm = makeARMFromExistingCursors(std::move(cursors)); // Make a request to the shard that will never get answered. ASSERT_FALSE(arm->ready()); @@ -979,9 +1017,10 @@ TEST_F(AsyncResultsMergerTest, KillAfterTaskExecutorShutdownWithOutstandingBatch } TEST_F(AsyncResultsMergerTest, KillNoBatchesRequested) { - std::vector<ClusterClientCursorParams::RemoteCursor> cursors; - cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 1, {})); - makeCursorFromExistingCursors(std::move(cursors)); + std::vector<RemoteCursor> cursors; + cursors.push_back( + makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {}))); + auto arm = makeARMFromExistingCursors(std::move(cursors)); ASSERT_FALSE(arm->ready()); auto killedEvent = arm->kill(operationContext()); @@ -996,11 +1035,14 @@ TEST_F(AsyncResultsMergerTest, KillNoBatchesRequested) { } TEST_F(AsyncResultsMergerTest, KillAllRemotesExhausted) { - std::vector<ClusterClientCursorParams::RemoteCursor> cursors; - cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 1, {})); - cursors.emplace_back(kTestShardIds[1], kTestShardHosts[1], CursorResponse(_nss, 2, {})); - cursors.emplace_back(kTestShardIds[2], kTestShardHosts[2], CursorResponse(_nss, 3, {})); - makeCursorFromExistingCursors(std::move(cursors)); + std::vector<RemoteCursor> cursors; + cursors.push_back( + makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {}))); + cursors.push_back( + makeRemoteCursor(kTestShardIds[1], kTestShardHosts[1], CursorResponse(kTestNss, 2, {}))); + cursors.push_back( + makeRemoteCursor(kTestShardIds[2], kTestShardHosts[2], CursorResponse(kTestNss, 3, {}))); + auto arm = makeARMFromExistingCursors(std::move(cursors)); ASSERT_FALSE(arm->ready()); auto readyEvent = unittest::assertGet(arm->nextEvent()); @@ -1008,11 +1050,11 @@ TEST_F(AsyncResultsMergerTest, KillAllRemotesExhausted) { std::vector<CursorResponse> responses; std::vector<BSONObj> batch1 = {fromjson("{_id: 1}"), fromjson("{_id: 2}")}; - responses.emplace_back(_nss, CursorId(0), batch1); + responses.emplace_back(kTestNss, CursorId(0), batch1); std::vector<BSONObj> batch2 = {fromjson("{_id: 3}"), fromjson("{_id: 4}")}; - responses.emplace_back(_nss, CursorId(0), batch2); + responses.emplace_back(kTestNss, CursorId(0), batch2); std::vector<BSONObj> batch3 = {fromjson("{_id: 3}"), fromjson("{_id: 4}")}; - responses.emplace_back(_nss, CursorId(0), batch3); + responses.emplace_back(kTestNss, CursorId(0), batch3); scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::SubsequentResponse); @@ -1027,11 +1069,14 @@ TEST_F(AsyncResultsMergerTest, KillAllRemotesExhausted) { } TEST_F(AsyncResultsMergerTest, KillNonExhaustedCursorWithoutPendingRequest) { - std::vector<ClusterClientCursorParams::RemoteCursor> cursors; - cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 1, {})); - cursors.emplace_back(kTestShardIds[1], kTestShardHosts[1], CursorResponse(_nss, 2, {})); - cursors.emplace_back(kTestShardIds[2], kTestShardHosts[2], CursorResponse(_nss, 123, {})); - makeCursorFromExistingCursors(std::move(cursors)); + std::vector<RemoteCursor> cursors; + cursors.push_back( + makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {}))); + cursors.push_back( + makeRemoteCursor(kTestShardIds[1], kTestShardHosts[1], CursorResponse(kTestNss, 2, {}))); + cursors.push_back( + makeRemoteCursor(kTestShardIds[2], kTestShardHosts[2], CursorResponse(kTestNss, 123, {}))); + auto arm = makeARMFromExistingCursors(std::move(cursors)); ASSERT_FALSE(arm->ready()); auto readyEvent = unittest::assertGet(arm->nextEvent()); @@ -1039,12 +1084,12 @@ TEST_F(AsyncResultsMergerTest, KillNonExhaustedCursorWithoutPendingRequest) { std::vector<CursorResponse> responses; std::vector<BSONObj> batch1 = {fromjson("{_id: 1}"), fromjson("{_id: 2}")}; - responses.emplace_back(_nss, CursorId(0), batch1); + responses.emplace_back(kTestNss, CursorId(0), batch1); std::vector<BSONObj> batch2 = {fromjson("{_id: 3}"), fromjson("{_id: 4}")}; - responses.emplace_back(_nss, CursorId(0), batch2); + responses.emplace_back(kTestNss, CursorId(0), batch2); // Cursor 3 is not exhausted. std::vector<BSONObj> batch3 = {fromjson("{_id: 3}"), fromjson("{_id: 4}")}; - responses.emplace_back(_nss, CursorId(123), batch3); + responses.emplace_back(kTestNss, CursorId(123), batch3); scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::SubsequentResponse); @@ -1059,11 +1104,14 @@ TEST_F(AsyncResultsMergerTest, KillNonExhaustedCursorWithoutPendingRequest) { } TEST_F(AsyncResultsMergerTest, KillTwoOutstandingBatches) { - std::vector<ClusterClientCursorParams::RemoteCursor> cursors; - cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 1, {})); - cursors.emplace_back(kTestShardIds[1], kTestShardHosts[1], CursorResponse(_nss, 2, {})); - cursors.emplace_back(kTestShardIds[2], kTestShardHosts[2], CursorResponse(_nss, 3, {})); - makeCursorFromExistingCursors(std::move(cursors)); + std::vector<RemoteCursor> cursors; + cursors.push_back( + makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {}))); + cursors.push_back( + makeRemoteCursor(kTestShardIds[1], kTestShardHosts[1], CursorResponse(kTestNss, 2, {}))); + cursors.push_back( + makeRemoteCursor(kTestShardIds[2], kTestShardHosts[2], CursorResponse(kTestNss, 3, {}))); + auto arm = makeARMFromExistingCursors(std::move(cursors)); ASSERT_FALSE(arm->ready()); auto readyEvent = unittest::assertGet(arm->nextEvent()); @@ -1071,7 +1119,7 @@ TEST_F(AsyncResultsMergerTest, KillTwoOutstandingBatches) { std::vector<CursorResponse> responses; std::vector<BSONObj> batch1 = {fromjson("{_id: 1}"), fromjson("{_id: 2}")}; - responses.emplace_back(_nss, CursorId(0), batch1); + responses.emplace_back(kTestNss, CursorId(0), batch1); scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::SubsequentResponse); @@ -1091,9 +1139,10 @@ TEST_F(AsyncResultsMergerTest, KillTwoOutstandingBatches) { } TEST_F(AsyncResultsMergerTest, NextEventErrorsAfterKill) { - std::vector<ClusterClientCursorParams::RemoteCursor> cursors; - cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 1, {})); - makeCursorFromExistingCursors(std::move(cursors)); + std::vector<RemoteCursor> cursors; + cursors.push_back( + makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {}))); + auto arm = makeARMFromExistingCursors(std::move(cursors)); ASSERT_FALSE(arm->ready()); auto readyEvent = unittest::assertGet(arm->nextEvent()); @@ -1101,7 +1150,7 @@ TEST_F(AsyncResultsMergerTest, NextEventErrorsAfterKill) { std::vector<CursorResponse> responses; std::vector<BSONObj> batch1 = {fromjson("{_id: 1}"), fromjson("{_id: 2}")}; - responses.emplace_back(_nss, CursorId(1), batch1); + responses.emplace_back(kTestNss, CursorId(1), batch1); scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::SubsequentResponse); @@ -1114,9 +1163,10 @@ TEST_F(AsyncResultsMergerTest, NextEventErrorsAfterKill) { } TEST_F(AsyncResultsMergerTest, KillCalledTwice) { - std::vector<ClusterClientCursorParams::RemoteCursor> cursors; - cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 1, {})); - makeCursorFromExistingCursors(std::move(cursors)); + std::vector<RemoteCursor> cursors; + cursors.push_back( + makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {}))); + auto arm = makeARMFromExistingCursors(std::move(cursors)); auto killedEvent1 = arm->kill(operationContext()); ASSERT(killedEvent1.isValid()); auto killedEvent2 = arm->kill(operationContext()); @@ -1127,9 +1177,10 @@ TEST_F(AsyncResultsMergerTest, KillCalledTwice) { TEST_F(AsyncResultsMergerTest, TailableBasic) { BSONObj findCmd = fromjson("{find: 'testcoll', tailable: true}"); - std::vector<ClusterClientCursorParams::RemoteCursor> cursors; - cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 123, {})); - makeCursorFromExistingCursors(std::move(cursors), findCmd); + std::vector<RemoteCursor> cursors; + cursors.push_back( + makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 123, {}))); + auto arm = makeARMFromExistingCursors(std::move(cursors), findCmd); ASSERT_FALSE(arm->ready()); auto readyEvent = unittest::assertGet(arm->nextEvent()); @@ -1137,7 +1188,7 @@ TEST_F(AsyncResultsMergerTest, TailableBasic) { std::vector<CursorResponse> responses; std::vector<BSONObj> batch1 = {fromjson("{_id: 1}"), fromjson("{_id: 2}")}; - responses.emplace_back(_nss, CursorId(123), batch1); + responses.emplace_back(kTestNss, CursorId(123), batch1); scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::SubsequentResponse); executor()->waitForEvent(readyEvent); @@ -1158,7 +1209,7 @@ TEST_F(AsyncResultsMergerTest, TailableBasic) { responses.clear(); std::vector<BSONObj> batch2 = {fromjson("{_id: 3}")}; - responses.emplace_back(_nss, CursorId(123), batch2); + responses.emplace_back(kTestNss, CursorId(123), batch2); scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::SubsequentResponse); executor()->waitForEvent(readyEvent); @@ -1176,9 +1227,10 @@ TEST_F(AsyncResultsMergerTest, TailableBasic) { TEST_F(AsyncResultsMergerTest, TailableEmptyBatch) { BSONObj findCmd = fromjson("{find: 'testcoll', tailable: true}"); - std::vector<ClusterClientCursorParams::RemoteCursor> cursors; - cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 123, {})); - makeCursorFromExistingCursors(std::move(cursors), findCmd); + std::vector<RemoteCursor> cursors; + cursors.push_back( + makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 123, {}))); + auto arm = makeARMFromExistingCursors(std::move(cursors), findCmd); ASSERT_FALSE(arm->ready()); auto readyEvent = unittest::assertGet(arm->nextEvent()); @@ -1187,7 +1239,7 @@ TEST_F(AsyncResultsMergerTest, TailableEmptyBatch) { // Remote responds with an empty batch and a non-zero cursor id. std::vector<CursorResponse> responses; std::vector<BSONObj> batch; - responses.emplace_back(_nss, CursorId(123), batch); + responses.emplace_back(kTestNss, CursorId(123), batch); scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::SubsequentResponse); executor()->waitForEvent(readyEvent); @@ -1204,9 +1256,10 @@ TEST_F(AsyncResultsMergerTest, TailableEmptyBatch) { TEST_F(AsyncResultsMergerTest, TailableExhaustedCursor) { BSONObj findCmd = fromjson("{find: 'testcoll', tailable: true}"); - std::vector<ClusterClientCursorParams::RemoteCursor> cursors; - cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 123, {})); - makeCursorFromExistingCursors(std::move(cursors), findCmd); + std::vector<RemoteCursor> cursors; + cursors.push_back( + makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 123, {}))); + auto arm = makeARMFromExistingCursors(std::move(cursors), findCmd); ASSERT_FALSE(arm->ready()); auto readyEvent = unittest::assertGet(arm->nextEvent()); @@ -1215,7 +1268,7 @@ TEST_F(AsyncResultsMergerTest, TailableExhaustedCursor) { // Remote responds with an empty batch and a zero cursor id. std::vector<CursorResponse> responses; std::vector<BSONObj> batch; - responses.emplace_back(_nss, CursorId(0), batch); + responses.emplace_back(kTestNss, CursorId(0), batch); scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::SubsequentResponse); executor()->waitForEvent(readyEvent); @@ -1229,9 +1282,10 @@ TEST_F(AsyncResultsMergerTest, TailableExhaustedCursor) { TEST_F(AsyncResultsMergerTest, GetMoreBatchSizes) { BSONObj findCmd = fromjson("{find: 'testcoll', batchSize: 3}"); - std::vector<ClusterClientCursorParams::RemoteCursor> cursors; - cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 1, {})); - makeCursorFromExistingCursors(std::move(cursors), findCmd); + std::vector<RemoteCursor> cursors; + cursors.push_back( + makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {}))); + auto arm = makeARMFromExistingCursors(std::move(cursors), findCmd); ASSERT_FALSE(arm->ready()); auto readyEvent = unittest::assertGet(arm->nextEvent()); @@ -1239,7 +1293,7 @@ TEST_F(AsyncResultsMergerTest, GetMoreBatchSizes) { std::vector<CursorResponse> responses; std::vector<BSONObj> batch1 = {fromjson("{_id: 1}"), fromjson("{_id: 2}")}; - responses.emplace_back(_nss, CursorId(1), batch1); + responses.emplace_back(kTestNss, CursorId(1), batch1); scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::SubsequentResponse); @@ -1254,7 +1308,7 @@ TEST_F(AsyncResultsMergerTest, GetMoreBatchSizes) { responses.clear(); std::vector<BSONObj> batch2 = {fromjson("{_id: 3}")}; - responses.emplace_back(_nss, CursorId(0), batch2); + responses.emplace_back(kTestNss, CursorId(0), batch2); readyEvent = unittest::assertGet(arm->nextEvent()); BSONObj scheduledCmd = getNthPendingRequest(0).cmdObj; @@ -1274,11 +1328,14 @@ TEST_F(AsyncResultsMergerTest, GetMoreBatchSizes) { TEST_F(AsyncResultsMergerTest, AllowPartialResults) { BSONObj findCmd = fromjson("{find: 'testcoll', allowPartialResults: true}"); - std::vector<ClusterClientCursorParams::RemoteCursor> cursors; - cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 97, {})); - cursors.emplace_back(kTestShardIds[1], kTestShardHosts[1], CursorResponse(_nss, 98, {})); - cursors.emplace_back(kTestShardIds[2], kTestShardHosts[2], CursorResponse(_nss, 99, {})); - makeCursorFromExistingCursors(std::move(cursors), findCmd); + std::vector<RemoteCursor> cursors; + cursors.push_back( + makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 97, {}))); + cursors.push_back( + makeRemoteCursor(kTestShardIds[1], kTestShardHosts[1], CursorResponse(kTestNss, 98, {}))); + cursors.push_back( + makeRemoteCursor(kTestShardIds[2], kTestShardHosts[2], CursorResponse(kTestNss, 99, {}))); + auto arm = makeARMFromExistingCursors(std::move(cursors), findCmd); ASSERT_FALSE(arm->ready()); auto readyEvent = unittest::assertGet(arm->nextEvent()); @@ -1292,9 +1349,9 @@ TEST_F(AsyncResultsMergerTest, AllowPartialResults) { // remaining shards. std::vector<CursorResponse> responses; std::vector<BSONObj> batch1 = {fromjson("{_id: 1}")}; - responses.emplace_back(_nss, CursorId(98), batch1); + responses.emplace_back(kTestNss, CursorId(98), batch1); std::vector<BSONObj> batch2 = {fromjson("{_id: 2}")}; - responses.emplace_back(_nss, CursorId(99), batch2); + responses.emplace_back(kTestNss, CursorId(99), batch2); scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::SubsequentResponse); executor()->waitForEvent(readyEvent); @@ -1315,7 +1372,7 @@ TEST_F(AsyncResultsMergerTest, AllowPartialResults) { responses.clear(); std::vector<BSONObj> batch3 = {fromjson("{_id: 3}")}; - responses.emplace_back(_nss, CursorId(99), batch3); + responses.emplace_back(kTestNss, CursorId(99), batch3); scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::SubsequentResponse); executor()->waitForEvent(readyEvent); @@ -1330,7 +1387,7 @@ TEST_F(AsyncResultsMergerTest, AllowPartialResults) { // Once the last reachable shard indicates that its cursor is closed, we're done. responses.clear(); std::vector<BSONObj> batch4 = {}; - responses.emplace_back(_nss, CursorId(0), batch4); + responses.emplace_back(kTestNss, CursorId(0), batch4); scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::SubsequentResponse); executor()->waitForEvent(readyEvent); @@ -1341,9 +1398,10 @@ TEST_F(AsyncResultsMergerTest, AllowPartialResults) { TEST_F(AsyncResultsMergerTest, AllowPartialResultsSingleNode) { BSONObj findCmd = fromjson("{find: 'testcoll', allowPartialResults: true}"); - std::vector<ClusterClientCursorParams::RemoteCursor> cursors; - cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 98, {})); - makeCursorFromExistingCursors(std::move(cursors), findCmd); + std::vector<RemoteCursor> cursors; + cursors.push_back( + makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 98, {}))); + auto arm = makeARMFromExistingCursors(std::move(cursors), findCmd); ASSERT_FALSE(arm->ready()); auto readyEvent = unittest::assertGet(arm->nextEvent()); @@ -1351,7 +1409,7 @@ TEST_F(AsyncResultsMergerTest, AllowPartialResultsSingleNode) { std::vector<CursorResponse> responses; std::vector<BSONObj> batch = {fromjson("{_id: 1}"), fromjson("{_id: 2}")}; - responses.emplace_back(_nss, CursorId(98), batch); + responses.emplace_back(kTestNss, CursorId(98), batch); scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::SubsequentResponse); executor()->waitForEvent(readyEvent); @@ -1374,10 +1432,12 @@ TEST_F(AsyncResultsMergerTest, AllowPartialResultsSingleNode) { TEST_F(AsyncResultsMergerTest, AllowPartialResultsOnRetriableErrorNoRetries) { BSONObj findCmd = fromjson("{find: 'testcoll', allowPartialResults: true}"); - std::vector<ClusterClientCursorParams::RemoteCursor> cursors; - cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 1, {})); - cursors.emplace_back(kTestShardIds[2], kTestShardHosts[2], CursorResponse(_nss, 2, {})); - makeCursorFromExistingCursors(std::move(cursors), findCmd); + std::vector<RemoteCursor> cursors; + cursors.push_back( + makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {}))); + cursors.push_back( + makeRemoteCursor(kTestShardIds[2], kTestShardHosts[2], CursorResponse(kTestNss, 2, {}))); + auto arm = makeARMFromExistingCursors(std::move(cursors), findCmd); ASSERT_FALSE(arm->ready()); auto readyEvent = unittest::assertGet(arm->nextEvent()); @@ -1386,7 +1446,7 @@ TEST_F(AsyncResultsMergerTest, AllowPartialResultsOnRetriableErrorNoRetries) { // First host returns single result std::vector<CursorResponse> responses; std::vector<BSONObj> batch = {fromjson("{_id: 1}")}; - responses.emplace_back(_nss, CursorId(0), batch); + responses.emplace_back(kTestNss, CursorId(0), batch); scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::SubsequentResponse); @@ -1404,10 +1464,12 @@ TEST_F(AsyncResultsMergerTest, AllowPartialResultsOnRetriableErrorNoRetries) { TEST_F(AsyncResultsMergerTest, ReturnsErrorOnRetriableError) { BSONObj findCmd = fromjson("{find: 'testcoll', sort: {_id: 1}}"); - std::vector<ClusterClientCursorParams::RemoteCursor> cursors; - cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 1, {})); - cursors.emplace_back(kTestShardIds[2], kTestShardHosts[2], CursorResponse(_nss, 2, {})); - makeCursorFromExistingCursors(std::move(cursors), findCmd); + std::vector<RemoteCursor> cursors; + cursors.push_back( + makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {}))); + cursors.push_back( + makeRemoteCursor(kTestShardIds[2], kTestShardHosts[2], CursorResponse(kTestNss, 2, {}))); + auto arm = makeARMFromExistingCursors(std::move(cursors), findCmd); ASSERT_FALSE(arm->ready()); auto readyEvent = unittest::assertGet(arm->nextEvent()); @@ -1432,9 +1494,10 @@ TEST_F(AsyncResultsMergerTest, ReturnsErrorOnRetriableError) { TEST_F(AsyncResultsMergerTest, GetMoreRequestIncludesMaxTimeMS) { BSONObj findCmd = fromjson("{find: 'testcoll', tailable: true, awaitData: true}"); - std::vector<ClusterClientCursorParams::RemoteCursor> cursors; - cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 123, {})); - makeCursorFromExistingCursors(std::move(cursors), findCmd); + std::vector<RemoteCursor> cursors; + cursors.push_back( + makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 123, {}))); + auto arm = makeARMFromExistingCursors(std::move(cursors), findCmd); ASSERT_FALSE(arm->ready()); auto readyEvent = unittest::assertGet(arm->nextEvent()); @@ -1442,7 +1505,7 @@ TEST_F(AsyncResultsMergerTest, GetMoreRequestIncludesMaxTimeMS) { std::vector<CursorResponse> responses; std::vector<BSONObj> batch1 = {fromjson("{_id: 1}")}; - responses.emplace_back(_nss, CursorId(123), batch1); + responses.emplace_back(kTestNss, CursorId(123), batch1); scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::SubsequentResponse); executor()->waitForEvent(readyEvent); @@ -1464,7 +1527,7 @@ TEST_F(AsyncResultsMergerTest, GetMoreRequestIncludesMaxTimeMS) { responses.clear(); std::vector<BSONObj> batch2 = {fromjson("{_id: 2}")}; - responses.emplace_back(_nss, CursorId(123), batch2); + responses.emplace_back(kTestNss, CursorId(123), batch2); scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::SubsequentResponse); executor()->waitForEvent(readyEvent); @@ -1485,20 +1548,24 @@ TEST_F(AsyncResultsMergerTest, GetMoreRequestIncludesMaxTimeMS) { // Clean up. responses.clear(); std::vector<BSONObj> batch3 = {fromjson("{_id: 3}")}; - responses.emplace_back(_nss, CursorId(0), batch3); + responses.emplace_back(kTestNss, CursorId(0), batch3); scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::SubsequentResponse); } TEST_F(AsyncResultsMergerTest, SortedTailableCursorNotReadyIfOneOrMoreRemotesHasNoOplogTimestamp) { - auto params = stdx::make_unique<ClusterClientCursorParams>(_nss, boost::none); - std::vector<ClusterClientCursorParams::RemoteCursor> cursors; - cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 123, {})); - cursors.emplace_back(kTestShardIds[1], kTestShardHosts[1], CursorResponse(_nss, 456, {})); - params->remotes = std::move(cursors); - params->tailableMode = TailableMode::kTailableAndAwaitData; - params->sort = fromjson("{'_id.clusterTime.ts': 1, '_id.uuid': 1, '_id.documentKey': 1}"); - arm = stdx::make_unique<AsyncResultsMerger>(operationContext(), executor(), params.get()); + AsyncResultsMergerParams params; + params.setNss(kTestNss); + std::vector<RemoteCursor> cursors; + cursors.push_back( + makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 123, {}))); + cursors.push_back( + makeRemoteCursor(kTestShardIds[1], kTestShardHosts[1], CursorResponse(kTestNss, 456, {}))); + params.setRemotes(std::move(cursors)); + params.setTailableMode(TailableModeEnum::kTailableAndAwaitData); + params.setSort(fromjson("{'_id.clusterTime.ts': 1, '_id.uuid': 1, '_id.documentKey': 1}")); + auto arm = + stdx::make_unique<AsyncResultsMerger>(operationContext(), executor(), std::move(params)); auto readyEvent = unittest::assertGet(arm->nextEvent()); @@ -1510,7 +1577,7 @@ TEST_F(AsyncResultsMergerTest, SortedTailableCursorNotReadyIfOneOrMoreRemotesHas fromjson("{_id: {clusterTime: {ts: Timestamp(1, 4)}, uuid: 1, documentKey: {_id: 1}}, " "$sortKey: {'': Timestamp(1, 4), '': 1, '': 1}}")}; const Timestamp lastObservedFirstCursor = Timestamp(1, 6); - responses.emplace_back(_nss, CursorId(123), batch1, boost::none, lastObservedFirstCursor); + responses.emplace_back(kTestNss, CursorId(123), batch1, boost::none, lastObservedFirstCursor); scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::SubsequentResponse); @@ -1523,7 +1590,7 @@ TEST_F(AsyncResultsMergerTest, SortedTailableCursorNotReadyIfOneOrMoreRemotesHas fromjson("{_id: {clusterTime: {ts: Timestamp(1, 5)}, uuid: 1, documentKey: {_id: 2}}, " "$sortKey: {'': Timestamp(1, 5), '': 1, '': 2}}")}; const Timestamp lastObservedSecondCursor = Timestamp(1, 5); - responses.emplace_back(_nss, CursorId(456), batch2, boost::none, lastObservedSecondCursor); + responses.emplace_back(kTestNss, CursorId(456), batch2, boost::none, lastObservedSecondCursor); scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::SubsequentResponse); executor()->waitForEvent(readyEvent); @@ -1543,37 +1610,40 @@ TEST_F(AsyncResultsMergerTest, SortedTailableCursorNotReadyIfOneOrMoreRemotesHas // Clean up the cursors. responses.clear(); std::vector<BSONObj> batch3 = {}; - responses.emplace_back(_nss, CursorId(0), batch3); + responses.emplace_back(kTestNss, CursorId(0), batch3); scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::SubsequentResponse); responses.clear(); std::vector<BSONObj> batch4 = {}; - responses.emplace_back(_nss, CursorId(0), batch4); + responses.emplace_back(kTestNss, CursorId(0), batch4); scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::SubsequentResponse); } TEST_F(AsyncResultsMergerTest, SortedTailableCursorNotReadyIfOneOrMoreRemotesHasNullOplogTimestamp) { - auto params = stdx::make_unique<ClusterClientCursorParams>(_nss, boost::none); - std::vector<ClusterClientCursorParams::RemoteCursor> cursors; - cursors.emplace_back( + AsyncResultsMergerParams params; + params.setNss(kTestNss); + std::vector<RemoteCursor> cursors; + cursors.push_back(makeRemoteCursor( kTestShardIds[0], kTestShardHosts[0], CursorResponse( - _nss, + kTestNss, 123, {fromjson("{_id: {clusterTime: {ts: Timestamp(1, 4)}, uuid: 1, documentKey: {_id: 1}}, " "$sortKey: {'': Timestamp(1, 4), '': 1, '': 1}}")}, boost::none, - Timestamp(1, 5))); - cursors.emplace_back(kTestShardIds[1], + Timestamp(1, 5)))); + cursors.push_back( + makeRemoteCursor(kTestShardIds[1], kTestShardHosts[1], - CursorResponse(_nss, 456, {}, boost::none, Timestamp())); - params->remotes = std::move(cursors); - params->tailableMode = TailableMode::kTailableAndAwaitData; - params->sort = fromjson("{'_id.clusterTime.ts': 1, '_id.uuid': 1, '_id.documentKey': 1}"); - arm = stdx::make_unique<AsyncResultsMerger>(operationContext(), executor(), params.get()); + CursorResponse(kTestNss, 456, {}, boost::none, Timestamp()))); + params.setRemotes(std::move(cursors)); + params.setTailableMode(TailableModeEnum::kTailableAndAwaitData); + params.setSort(fromjson("{'_id.clusterTime.ts': 1, '_id.uuid': 1, '_id.documentKey': 1}")); + auto arm = + stdx::make_unique<AsyncResultsMerger>(operationContext(), executor(), std::move(params)); auto readyEvent = unittest::assertGet(arm->nextEvent()); @@ -1581,7 +1651,7 @@ TEST_F(AsyncResultsMergerTest, std::vector<CursorResponse> responses; std::vector<BSONObj> batch3 = {}; - responses.emplace_back(_nss, CursorId(0), batch3, boost::none, Timestamp(1, 8)); + responses.emplace_back(kTestNss, CursorId(0), batch3, boost::none, Timestamp(1, 8)); scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::SubsequentResponse); executor()->waitForEvent(unittest::assertGet(arm->nextEvent())); @@ -1597,31 +1667,34 @@ TEST_F(AsyncResultsMergerTest, // Clean up. responses.clear(); std::vector<BSONObj> batch4 = {}; - responses.emplace_back(_nss, CursorId(0), batch4); + responses.emplace_back(kTestNss, CursorId(0), batch4); scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::SubsequentResponse); } TEST_F(AsyncResultsMergerTest, SortedTailableCursorNotReadyIfOneRemoteHasLowerOplogTime) { - auto params = stdx::make_unique<ClusterClientCursorParams>(_nss, boost::none); - std::vector<ClusterClientCursorParams::RemoteCursor> cursors; + AsyncResultsMergerParams params; + params.setNss(kTestNss); + std::vector<RemoteCursor> cursors; Timestamp tooLow = Timestamp(1, 2); - cursors.emplace_back( + cursors.push_back(makeRemoteCursor( kTestShardIds[0], kTestShardHosts[0], CursorResponse( - _nss, + kTestNss, 123, {fromjson("{_id: {clusterTime: {ts: Timestamp(1, 4)}, uuid: 1, documentKey: {_id: 1}}, " "$sortKey: {'': Timestamp(1, 4), '': 1, '': 1}}")}, boost::none, - Timestamp(1, 5))); - cursors.emplace_back( - kTestShardIds[1], kTestShardHosts[1], CursorResponse(_nss, 456, {}, boost::none, tooLow)); - params->remotes = std::move(cursors); - params->tailableMode = TailableMode::kTailableAndAwaitData; - params->sort = fromjson("{'_id.clusterTime.ts': 1, '_id.uuid': 1, '_id.documentKey': 1}"); - arm = stdx::make_unique<AsyncResultsMerger>(operationContext(), executor(), params.get()); + Timestamp(1, 5)))); + cursors.push_back(makeRemoteCursor(kTestShardIds[1], + kTestShardHosts[1], + CursorResponse(kTestNss, 456, {}, boost::none, tooLow))); + params.setRemotes(std::move(cursors)); + params.setTailableMode(TailableModeEnum::kTailableAndAwaitData); + params.setSort(fromjson("{'_id.clusterTime.ts': 1, '_id.uuid': 1, '_id.documentKey': 1}")); + auto arm = + stdx::make_unique<AsyncResultsMerger>(operationContext(), executor(), std::move(params)); auto readyEvent = unittest::assertGet(arm->nextEvent()); @@ -1629,7 +1702,7 @@ TEST_F(AsyncResultsMergerTest, SortedTailableCursorNotReadyIfOneRemoteHasLowerOp // Clean up the cursors. std::vector<CursorResponse> responses; - responses.emplace_back(_nss, CursorId(0), std::vector<BSONObj>{}); + responses.emplace_back(kTestNss, CursorId(0), std::vector<BSONObj>{}); scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::SubsequentResponse); auto killEvent = arm->kill(operationContext()); @@ -1637,13 +1710,16 @@ TEST_F(AsyncResultsMergerTest, SortedTailableCursorNotReadyIfOneRemoteHasLowerOp } TEST_F(AsyncResultsMergerTest, SortedTailableCursorNewShardOrderedAfterExisting) { - auto params = stdx::make_unique<ClusterClientCursorParams>(_nss, boost::none); - std::vector<ClusterClientCursorParams::RemoteCursor> cursors; - cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 123, {})); - params->remotes = std::move(cursors); - params->tailableMode = TailableMode::kTailableAndAwaitData; - params->sort = fromjson("{'_id.clusterTime.ts': 1, '_id.uuid': 1, '_id.documentKey': 1}"); - arm = stdx::make_unique<AsyncResultsMerger>(operationContext(), executor(), params.get()); + AsyncResultsMergerParams params; + params.setNss(kTestNss); + std::vector<RemoteCursor> cursors; + cursors.push_back( + makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 123, {}))); + params.setRemotes(std::move(cursors)); + params.setTailableMode(TailableModeEnum::kTailableAndAwaitData); + params.setSort(fromjson("{'_id.clusterTime.ts': 1, '_id.uuid': 1, '_id.documentKey': 1}")); + auto arm = + stdx::make_unique<AsyncResultsMerger>(operationContext(), executor(), std::move(params)); auto readyEvent = unittest::assertGet(arm->nextEvent()); @@ -1655,7 +1731,7 @@ TEST_F(AsyncResultsMergerTest, SortedTailableCursorNewShardOrderedAfterExisting) fromjson("{_id: {clusterTime: {ts: Timestamp(1, 4)}, uuid: 1, documentKey: {_id: 1}}, " "$sortKey: {'': Timestamp(1, 4), '': 1, '': 1}}")}; const Timestamp lastObservedFirstCursor = Timestamp(1, 6); - responses.emplace_back(_nss, CursorId(123), batch1, boost::none, lastObservedFirstCursor); + responses.emplace_back(kTestNss, CursorId(123), batch1, boost::none, lastObservedFirstCursor); scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::SubsequentResponse); @@ -1663,9 +1739,10 @@ TEST_F(AsyncResultsMergerTest, SortedTailableCursorNewShardOrderedAfterExisting) ASSERT_TRUE(arm->ready()); // Add the new shard. - std::vector<ClusterClientCursorParams::RemoteCursor> newCursors; - newCursors.emplace_back(kTestShardIds[1], kTestShardHosts[1], CursorResponse(_nss, 456, {})); - arm->addNewShardCursors(newCursors); + std::vector<RemoteCursor> newCursors; + newCursors.push_back( + makeRemoteCursor(kTestShardIds[1], kTestShardHosts[1], CursorResponse(kTestNss, 456, {}))); + arm->addNewShardCursors(std::move(newCursors)); // Now shouldn't be ready, we don't have a guarantee from each shard. ASSERT_FALSE(arm->ready()); @@ -1677,7 +1754,7 @@ TEST_F(AsyncResultsMergerTest, SortedTailableCursorNewShardOrderedAfterExisting) fromjson("{_id: {clusterTime: {ts: Timestamp(1, 5)}, uuid: 1, documentKey: {_id: 2}}, " "$sortKey: {'': Timestamp(1, 5), '': 1, '': 2}}")}; const Timestamp lastObservedSecondCursor = Timestamp(1, 5); - responses.emplace_back(_nss, CursorId(456), batch2, boost::none, lastObservedSecondCursor); + responses.emplace_back(kTestNss, CursorId(456), batch2, boost::none, lastObservedSecondCursor); scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::SubsequentResponse); executor()->waitForEvent(readyEvent); @@ -1697,24 +1774,27 @@ TEST_F(AsyncResultsMergerTest, SortedTailableCursorNewShardOrderedAfterExisting) // Clean up the cursors. responses.clear(); std::vector<BSONObj> batch3 = {}; - responses.emplace_back(_nss, CursorId(0), batch3); + responses.emplace_back(kTestNss, CursorId(0), batch3); scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::SubsequentResponse); responses.clear(); std::vector<BSONObj> batch4 = {}; - responses.emplace_back(_nss, CursorId(0), batch4); + responses.emplace_back(kTestNss, CursorId(0), batch4); scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::SubsequentResponse); } TEST_F(AsyncResultsMergerTest, SortedTailableCursorNewShardOrderedBeforeExisting) { - auto params = stdx::make_unique<ClusterClientCursorParams>(_nss, boost::none); - std::vector<ClusterClientCursorParams::RemoteCursor> cursors; - cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 123, {})); - params->remotes = std::move(cursors); - params->tailableMode = TailableMode::kTailableAndAwaitData; - params->sort = fromjson("{'_id.clusterTime.ts': 1, '_id.uuid': 1, '_id.documentKey': 1}"); - arm = stdx::make_unique<AsyncResultsMerger>(operationContext(), executor(), params.get()); + AsyncResultsMergerParams params; + params.setNss(kTestNss); + std::vector<RemoteCursor> cursors; + cursors.push_back( + makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 123, {}))); + params.setRemotes(std::move(cursors)); + params.setTailableMode(TailableModeEnum::kTailableAndAwaitData); + params.setSort(fromjson("{'_id.clusterTime.ts': 1, '_id.uuid': 1, '_id.documentKey': 1}")); + auto arm = + stdx::make_unique<AsyncResultsMerger>(operationContext(), executor(), std::move(params)); auto readyEvent = unittest::assertGet(arm->nextEvent()); @@ -1726,7 +1806,7 @@ TEST_F(AsyncResultsMergerTest, SortedTailableCursorNewShardOrderedBeforeExisting fromjson("{_id: {clusterTime: {ts: Timestamp(1, 4)}, uuid: 1, documentKey: {_id: 1}}, " "$sortKey: {'': Timestamp(1, 4), '': 1, '': 1}}")}; const Timestamp lastObservedFirstCursor = Timestamp(1, 6); - responses.emplace_back(_nss, CursorId(123), batch1, boost::none, lastObservedFirstCursor); + responses.emplace_back(kTestNss, CursorId(123), batch1, boost::none, lastObservedFirstCursor); scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::SubsequentResponse); @@ -1734,9 +1814,10 @@ TEST_F(AsyncResultsMergerTest, SortedTailableCursorNewShardOrderedBeforeExisting ASSERT_TRUE(arm->ready()); // Add the new shard. - std::vector<ClusterClientCursorParams::RemoteCursor> newCursors; - newCursors.emplace_back(kTestShardIds[1], kTestShardHosts[1], CursorResponse(_nss, 456, {})); - arm->addNewShardCursors(newCursors); + std::vector<RemoteCursor> newCursors; + newCursors.push_back( + makeRemoteCursor(kTestShardIds[1], kTestShardHosts[1], CursorResponse(kTestNss, 456, {}))); + arm->addNewShardCursors(std::move(newCursors)); // Now shouldn't be ready, we don't have a guarantee from each shard. ASSERT_FALSE(arm->ready()); @@ -1750,7 +1831,7 @@ TEST_F(AsyncResultsMergerTest, SortedTailableCursorNewShardOrderedBeforeExisting // The last observed time should still be later than the first shard, so we can get the data // from it. const Timestamp lastObservedSecondCursor = Timestamp(1, 5); - responses.emplace_back(_nss, CursorId(456), batch2, boost::none, lastObservedSecondCursor); + responses.emplace_back(kTestNss, CursorId(456), batch2, boost::none, lastObservedSecondCursor); scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::SubsequentResponse); executor()->waitForEvent(readyEvent); @@ -1770,21 +1851,22 @@ TEST_F(AsyncResultsMergerTest, SortedTailableCursorNewShardOrderedBeforeExisting // Clean up the cursors. responses.clear(); std::vector<BSONObj> batch3 = {}; - responses.emplace_back(_nss, CursorId(0), batch3); + responses.emplace_back(kTestNss, CursorId(0), batch3); scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::SubsequentResponse); responses.clear(); std::vector<BSONObj> batch4 = {}; - responses.emplace_back(_nss, CursorId(0), batch4); + responses.emplace_back(kTestNss, CursorId(0), batch4); scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::SubsequentResponse); } TEST_F(AsyncResultsMergerTest, GetMoreRequestWithoutTailableCantHaveMaxTime) { BSONObj findCmd = fromjson("{find: 'testcoll'}"); - std::vector<ClusterClientCursorParams::RemoteCursor> cursors; - cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 123, {})); - makeCursorFromExistingCursors(std::move(cursors), findCmd); + std::vector<RemoteCursor> cursors; + cursors.push_back( + makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 123, {}))); + auto arm = makeARMFromExistingCursors(std::move(cursors), findCmd); ASSERT_NOT_OK(arm->setAwaitDataTimeout(Milliseconds(789))); auto killEvent = arm->kill(operationContext()); @@ -1793,9 +1875,10 @@ TEST_F(AsyncResultsMergerTest, GetMoreRequestWithoutTailableCantHaveMaxTime) { TEST_F(AsyncResultsMergerTest, GetMoreRequestWithoutAwaitDataCantHaveMaxTime) { BSONObj findCmd = fromjson("{find: 'testcoll', tailable: true}"); - std::vector<ClusterClientCursorParams::RemoteCursor> cursors; - cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 123, {})); - makeCursorFromExistingCursors(std::move(cursors), findCmd); + std::vector<RemoteCursor> cursors; + cursors.push_back( + makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 123, {}))); + auto arm = makeARMFromExistingCursors(std::move(cursors), findCmd); ASSERT_NOT_OK(arm->setAwaitDataTimeout(Milliseconds(789))); auto killEvent = arm->kill(operationContext()); @@ -1804,9 +1887,10 @@ TEST_F(AsyncResultsMergerTest, GetMoreRequestWithoutAwaitDataCantHaveMaxTime) { TEST_F(AsyncResultsMergerTest, ShardCanErrorInBetweenReadyAndNextEvent) { BSONObj findCmd = fromjson("{find: 'testcoll', tailable: true}"); - std::vector<ClusterClientCursorParams::RemoteCursor> cursors; - cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 123, {})); - makeCursorFromExistingCursors(std::move(cursors), findCmd); + std::vector<RemoteCursor> cursors; + cursors.push_back( + makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 123, {}))); + auto arm = makeARMFromExistingCursors(std::move(cursors), findCmd); ASSERT_FALSE(arm->ready()); auto readyEvent = unittest::assertGet(arm->nextEvent()); @@ -1820,9 +1904,10 @@ TEST_F(AsyncResultsMergerTest, ShardCanErrorInBetweenReadyAndNextEvent) { } TEST_F(AsyncResultsMergerTest, KillShouldNotWaitForRemoteCommandsBeforeSchedulingKillCursors) { - std::vector<ClusterClientCursorParams::RemoteCursor> cursors; - cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 1, {})); - makeCursorFromExistingCursors(std::move(cursors)); + std::vector<RemoteCursor> cursors; + cursors.push_back( + makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {}))); + auto arm = makeARMFromExistingCursors(std::move(cursors)); // Before any requests are scheduled, ARM is not ready to return results. ASSERT_FALSE(arm->ready()); @@ -1850,9 +1935,10 @@ TEST_F(AsyncResultsMergerTest, KillShouldNotWaitForRemoteCommandsBeforeSchedulin } TEST_F(AsyncResultsMergerTest, ShouldBeAbleToBlockUntilNextResultIsReady) { - std::vector<ClusterClientCursorParams::RemoteCursor> cursors; - cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 1, {})); - makeCursorFromExistingCursors(std::move(cursors)); + std::vector<RemoteCursor> cursors; + cursors.emplace_back( + makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {}))); + auto arm = makeARMFromExistingCursors(std::move(cursors)); // Before any requests are scheduled, ARM is not ready to return results. ASSERT_FALSE(arm->ready()); @@ -1871,7 +1957,7 @@ TEST_F(AsyncResultsMergerTest, ShouldBeAbleToBlockUntilNextResultIsReady) { // exhausted. onCommand([&](const auto& request) { ASSERT(request.cmdObj["getMore"]); - return CursorResponse(_nss, 0LL, {BSON("x" << 1)}) + return CursorResponse(kTestNss, 0LL, {BSON("x" << 1)}) .toBSON(CursorResponse::ResponseType::SubsequentResponse); }); @@ -1879,9 +1965,10 @@ TEST_F(AsyncResultsMergerTest, ShouldBeAbleToBlockUntilNextResultIsReady) { } TEST_F(AsyncResultsMergerTest, ShouldBeInterruptableDuringBlockingNext) { - std::vector<ClusterClientCursorParams::RemoteCursor> cursors; - cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 1, {})); - makeCursorFromExistingCursors(std::move(cursors)); + std::vector<RemoteCursor> cursors; + cursors.emplace_back( + makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {}))); + auto arm = makeARMFromExistingCursors(std::move(cursors)); // Issue a blocking wait for the next result asynchronously on a different thread. auto future = launchAsync([&]() { @@ -1905,9 +1992,10 @@ TEST_F(AsyncResultsMergerTest, ShouldBeInterruptableDuringBlockingNext) { } TEST_F(AsyncResultsMergerTest, ShouldBeAbleToBlockUntilKilled) { - std::vector<ClusterClientCursorParams::RemoteCursor> cursors; - cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 1, {})); - makeCursorFromExistingCursors(std::move(cursors)); + std::vector<RemoteCursor> cursors; + cursors.emplace_back( + makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {}))); + auto arm = makeARMFromExistingCursors(std::move(cursors)); // Before any requests are scheduled, ARM is not ready to return results. ASSERT_FALSE(arm->ready()); diff --git a/src/mongo/s/query/cluster_client_cursor.h b/src/mongo/s/query/cluster_client_cursor.h index 653599def7e..9c01c013ce6 100644 --- a/src/mongo/s/query/cluster_client_cursor.h +++ b/src/mongo/s/query/cluster_client_cursor.h @@ -113,7 +113,7 @@ public: /** * Returns a reference to the vector of remote hosts involved in this operation. */ - virtual const std::vector<ClusterClientCursorParams::RemoteCursor>& getRemotes() const = 0; + virtual const std::size_t getNumRemotes() const = 0; /** * Returns the number of result documents returned so far by this cursor via the next() method. diff --git a/src/mongo/s/query/cluster_client_cursor_impl.cpp b/src/mongo/s/query/cluster_client_cursor_impl.cpp index 58484e87bfa..1a4be45f1be 100644 --- a/src/mongo/s/query/cluster_client_cursor_impl.cpp +++ b/src/mongo/s/query/cluster_client_cursor_impl.cpp @@ -30,6 +30,7 @@ #include "mongo/s/query/cluster_client_cursor_impl.h" +#include "mongo/db/pipeline/cluster_aggregation_planner.h" #include "mongo/db/pipeline/document_source_limit.h" #include "mongo/db/pipeline/document_source_skip.h" #include "mongo/db/pipeline/document_source_sort.h" @@ -135,20 +136,19 @@ OperationContext* ClusterClientCursorImpl::getCurrentOperationContext() const { } bool ClusterClientCursorImpl::isTailable() const { - return _params.tailableMode != TailableMode::kNormal; + return _params.tailableMode != TailableModeEnum::kNormal; } bool ClusterClientCursorImpl::isTailableAndAwaitData() const { - return _params.tailableMode == TailableMode::kTailableAndAwaitData; + return _params.tailableMode == TailableModeEnum::kTailableAndAwaitData; } BSONObj ClusterClientCursorImpl::getOriginatingCommand() const { return _params.originatingCommandObj; } -const std::vector<ClusterClientCursorParams::RemoteCursor>& ClusterClientCursorImpl::getRemotes() - const { - return _params.remotes; +const std::size_t ClusterClientCursorImpl::getNumRemotes() const { + return _root->getNumRemotes(); } long long ClusterClientCursorImpl::getNumReturnedSoFar() const { @@ -181,30 +181,6 @@ boost::optional<ReadPreferenceSetting> ClusterClientCursorImpl::getReadPreferenc namespace { -/** - * Rips off an initial $sort stage that will be handled by mongos execution machinery. Returns the - * sort key pattern of such a $sort stage if there was one, and boost::none otherwise. - */ -boost::optional<BSONObj> extractLeadingSort(Pipeline* mergePipeline) { - // Remove a leading $sort iff it is a mergesort, since the ARM cannot handle blocking $sort. - auto frontSort = mergePipeline->popFrontWithCriteria( - DocumentSourceSort::kStageName, [](const DocumentSource* const source) { - return static_cast<const DocumentSourceSort* const>(source)->mergingPresorted(); - }); - - if (frontSort) { - auto sortStage = static_cast<DocumentSourceSort*>(frontSort.get()); - if (auto sortLimit = sortStage->getLimitSrc()) { - // There was a limit stage absorbed into the sort stage, so we need to preserve that. - mergePipeline->addInitialSource(sortLimit); - } - return sortStage - ->sortKeyPattern(DocumentSourceSort::SortKeySerialization::kForSortKeyMerging) - .toBson(); - } - return boost::none; -} - bool isSkipOrLimit(const boost::intrusive_ptr<DocumentSource>& stage) { return (dynamic_cast<DocumentSourceLimit*>(stage.get()) || dynamic_cast<DocumentSourceSkip*>(stage.get())); @@ -250,10 +226,10 @@ std::unique_ptr<RouterExecStage> buildPipelinePlan(executor::TaskExecutor* execu // instead. while (!pipeline->getSources().empty()) { invariant(isSkipOrLimit(pipeline->getSources().front())); - if (auto skip = pipeline->popFrontWithCriteria(DocumentSourceSkip::kStageName)) { + if (auto skip = pipeline->popFrontWithName(DocumentSourceSkip::kStageName)) { root = stdx::make_unique<RouterStageSkip>( opCtx, std::move(root), static_cast<DocumentSourceSkip*>(skip.get())->getSkip()); - } else if (auto limit = pipeline->popFrontWithCriteria(DocumentSourceLimit::kStageName)) { + } else if (auto limit = pipeline->popFrontWithName(DocumentSourceLimit::kStageName)) { root = stdx::make_unique<RouterStageLimit>( opCtx, std::move(root), static_cast<DocumentSourceLimit*>(limit.get())->getLimit()); } @@ -270,7 +246,8 @@ std::unique_ptr<RouterExecStage> ClusterClientCursorImpl::buildMergerPlan( const auto skip = params->skip; const auto limit = params->limit; if (params->mergePipeline) { - if (auto sort = extractLeadingSort(params->mergePipeline.get())) { + if (auto sort = + cluster_aggregation_planner::popLeadingMergeSort(params->mergePipeline.get())) { params->sort = *sort; } return buildPipelinePlan(executor, params); diff --git a/src/mongo/s/query/cluster_client_cursor_impl.h b/src/mongo/s/query/cluster_client_cursor_impl.h index c685a383307..d3c9349233b 100644 --- a/src/mongo/s/query/cluster_client_cursor_impl.h +++ b/src/mongo/s/query/cluster_client_cursor_impl.h @@ -105,7 +105,7 @@ public: BSONObj getOriginatingCommand() const final; - const std::vector<ClusterClientCursorParams::RemoteCursor>& getRemotes() const final; + const std::size_t getNumRemotes() const final; long long getNumReturnedSoFar() const final; diff --git a/src/mongo/s/query/cluster_client_cursor_mock.cpp b/src/mongo/s/query/cluster_client_cursor_mock.cpp index 0e3a3fd6731..6e624f36b84 100644 --- a/src/mongo/s/query/cluster_client_cursor_mock.cpp +++ b/src/mongo/s/query/cluster_client_cursor_mock.cpp @@ -68,8 +68,7 @@ BSONObj ClusterClientCursorMock::getOriginatingCommand() const { return _originatingCommand; } -const std::vector<ClusterClientCursorParams::RemoteCursor>& ClusterClientCursorMock::getRemotes() - const { +const std::size_t ClusterClientCursorMock::getNumRemotes() const { MONGO_UNREACHABLE; } diff --git a/src/mongo/s/query/cluster_client_cursor_mock.h b/src/mongo/s/query/cluster_client_cursor_mock.h index 7364240112d..1c50403c3ae 100644 --- a/src/mongo/s/query/cluster_client_cursor_mock.h +++ b/src/mongo/s/query/cluster_client_cursor_mock.h @@ -69,7 +69,7 @@ public: BSONObj getOriginatingCommand() const final; - const std::vector<ClusterClientCursorParams::RemoteCursor>& getRemotes() const final; + const std::size_t getNumRemotes() const final; long long getNumReturnedSoFar() const final; diff --git a/src/mongo/s/query/cluster_client_cursor_params.h b/src/mongo/s/query/cluster_client_cursor_params.h index 116634bcee8..71a7f17c282 100644 --- a/src/mongo/s/query/cluster_client_cursor_params.h +++ b/src/mongo/s/query/cluster_client_cursor_params.h @@ -42,6 +42,7 @@ #include "mongo/db/query/cursor_response.h" #include "mongo/db/query/tailable_mode.h" #include "mongo/s/client/shard.h" +#include "mongo/s/query/async_results_merger_params_gen.h" #include "mongo/util/net/hostandport.h" namespace mongo { @@ -61,22 +62,6 @@ class RouterExecStage; * this cursor have been processed. */ struct ClusterClientCursorParams { - struct RemoteCursor { - RemoteCursor(ShardId shardId, HostAndPort hostAndPort, CursorResponse cursorResponse) - : shardId(std::move(shardId)), - hostAndPort(std::move(hostAndPort)), - cursorResponse(std::move(cursorResponse)) {} - - // The shardId of the shard on which the cursor resides. - ShardId shardId; - - // The exact host (within the shard) on which the cursor resides. - HostAndPort hostAndPort; - - // Encompasses the state of the established cursor. - CursorResponse cursorResponse; - }; - ClusterClientCursorParams(NamespaceString nss, boost::optional<ReadPreferenceSetting> readPref = boost::none) : nsString(std::move(nss)) { @@ -85,6 +70,24 @@ struct ClusterClientCursorParams { } } + /** + * Extracts the subset of fields here needed by the AsyncResultsMerger. The returned + * AsyncResultsMergerParams will assume ownership of 'remotes'. + */ + AsyncResultsMergerParams extractARMParams() { + AsyncResultsMergerParams armParams; + if (!sort.isEmpty()) { + armParams.setSort(sort); + } + armParams.setCompareWholeSortKey(compareWholeSortKey); + armParams.setRemotes(std::move(remotes)); + armParams.setTailableMode(tailableMode); + armParams.setBatchSize(batchSize); + armParams.setNss(nsString); + armParams.setAllowPartialResults(isAllowPartialResults); + return armParams; + } + // Namespace against which the cursors exist. NamespaceString nsString; @@ -108,7 +111,7 @@ struct ClusterClientCursorParams { // The number of results per batch. Optional. If specified, will be specified as the batch for // each getMore. - boost::optional<long long> batchSize; + boost::optional<std::int64_t> batchSize; // Limits the number of results returned by the ClusterClientCursor to this many. Optional. // Should be forwarded to the remote hosts in 'cmdObj'. @@ -119,7 +122,7 @@ struct ClusterClientCursorParams { // Whether this cursor is tailing a capped collection, and whether it has the awaitData option // set. - TailableMode tailableMode = TailableMode::kNormal; + TailableModeEnum tailableMode = TailableModeEnum::kNormal; // Set if a readPreference must be respected throughout the lifetime of the cursor. boost::optional<ReadPreferenceSetting> readPreference; diff --git a/src/mongo/s/query/cluster_cursor_manager.cpp b/src/mongo/s/query/cluster_cursor_manager.cpp index cd5ce0f9bc8..ca66ee04e08 100644 --- a/src/mongo/s/query/cluster_cursor_manager.cpp +++ b/src/mongo/s/query/cluster_cursor_manager.cpp @@ -147,10 +147,9 @@ BSONObj ClusterCursorManager::PinnedCursor::getOriginatingCommand() const { return _cursor->getOriginatingCommand(); } -const std::vector<ClusterClientCursorParams::RemoteCursor>& -ClusterCursorManager::PinnedCursor::getRemotes() const { +const std::size_t ClusterCursorManager::PinnedCursor::getNumRemotes() const { invariant(_cursor); - return _cursor->getRemotes(); + return _cursor->getNumRemotes(); } CursorId ClusterCursorManager::PinnedCursor::getCursorId() const { diff --git a/src/mongo/s/query/cluster_cursor_manager.h b/src/mongo/s/query/cluster_cursor_manager.h index b9b4ee81ef6..cf6dc54f21e 100644 --- a/src/mongo/s/query/cluster_cursor_manager.h +++ b/src/mongo/s/query/cluster_cursor_manager.h @@ -198,7 +198,7 @@ public: /** * Returns a reference to the vector of remote hosts involved in this operation. */ - const std::vector<ClusterClientCursorParams::RemoteCursor>& getRemotes() const; + const std::size_t getNumRemotes() const; /** * Returns the cursor id for the underlying cursor, or zero if no cursor is owned. diff --git a/src/mongo/s/query/cluster_find.cpp b/src/mongo/s/query/cluster_find.cpp index 7979f127e7c..3e68415f1a4 100644 --- a/src/mongo/s/query/cluster_find.cpp +++ b/src/mongo/s/query/cluster_find.cpp @@ -344,7 +344,7 @@ CursorId runQueryWithoutRetrying(OperationContext* opCtx, } // Fill out query exec properties. - CurOp::get(opCtx)->debug().nShards = ccc->getRemotes().size(); + CurOp::get(opCtx)->debug().nShards = ccc->getNumRemotes(); CurOp::get(opCtx)->debug().nreturned = results->size(); // If the cursor is exhausted, then there are no more results to return and we don't need to @@ -491,7 +491,7 @@ StatusWith<CursorResponse> ClusterFind::runGetMore(OperationContext* opCtx, // Set the originatingCommand object and the cursorID in CurOp. { - CurOp::get(opCtx)->debug().nShards = pinnedCursor.getValue().getRemotes().size(); + CurOp::get(opCtx)->debug().nShards = pinnedCursor.getValue().getNumRemotes(); CurOp::get(opCtx)->debug().cursorid = request.cursorid; stdx::lock_guard<Client> lk(*opCtx->getClient()); CurOp::get(opCtx)->setOriginatingCommand_inlock( diff --git a/src/mongo/s/query/document_source_router_adapter.cpp b/src/mongo/s/query/document_source_router_adapter.cpp index 4e144751dcb..26a944ed5cc 100644 --- a/src/mongo/s/query/document_source_router_adapter.cpp +++ b/src/mongo/s/query/document_source_router_adapter.cpp @@ -67,6 +67,10 @@ Value DocumentSourceRouterAdapter::serialize( return Value(); // Return the empty value to hide this stage from explain output. } +std::size_t DocumentSourceRouterAdapter::getNumRemotes() const { + return _child->getNumRemotes(); +} + bool DocumentSourceRouterAdapter::remotesExhausted() { return _child->remotesExhausted(); } diff --git a/src/mongo/s/query/document_source_router_adapter.h b/src/mongo/s/query/document_source_router_adapter.h index 5c1a6a0935c..a7db7734539 100644 --- a/src/mongo/s/query/document_source_router_adapter.h +++ b/src/mongo/s/query/document_source_router_adapter.h @@ -59,6 +59,7 @@ public: void detachFromOperationContext() final; Value serialize(boost::optional<ExplainOptions::Verbosity> explain) const final; bool remotesExhausted(); + std::size_t getNumRemotes() const; void setExecContext(RouterExecStage::ExecContext execContext) { _execContext = execContext; diff --git a/src/mongo/s/query/establish_cursors.cpp b/src/mongo/s/query/establish_cursors.cpp index f186bcb1bb5..08ce5a2cb5b 100644 --- a/src/mongo/s/query/establish_cursors.cpp +++ b/src/mongo/s/query/establish_cursors.cpp @@ -47,13 +47,12 @@ namespace mongo { -std::vector<ClusterClientCursorParams::RemoteCursor> establishCursors( - OperationContext* opCtx, - executor::TaskExecutor* executor, - const NamespaceString& nss, - const ReadPreferenceSetting readPref, - const std::vector<std::pair<ShardId, BSONObj>>& remotes, - bool allowPartialResults) { +std::vector<RemoteCursor> establishCursors(OperationContext* opCtx, + executor::TaskExecutor* executor, + const NamespaceString& nss, + const ReadPreferenceSetting readPref, + const std::vector<std::pair<ShardId, BSONObj>>& remotes, + bool allowPartialResults) { // Construct the requests std::vector<AsyncRequestsSender::Request> requests; for (const auto& remote : remotes) { @@ -68,20 +67,23 @@ std::vector<ClusterClientCursorParams::RemoteCursor> establishCursors( readPref, Shard::RetryPolicy::kIdempotent); - std::vector<ClusterClientCursorParams::RemoteCursor> remoteCursors; + std::vector<RemoteCursor> remoteCursors; try { // Get the responses while (!ars.done()) { try { auto response = ars.next(); - - // uasserts must happen before attempting to access the optional shardHostAndPort. - auto cursorResponse = uassertStatusOK(CursorResponse::parseFromBSON( + // Note the shardHostAndPort may not be populated if there was an error, so be sure + // to do this after parsing the cursor response to ensure the response was ok. + // Additionally, be careful not to push into 'remoteCursors' until we are sure we + // have a valid cursor, since the error handling path will attempt to clean up + // anything in 'remoteCursors' + RemoteCursor cursor; + cursor.setCursorResponse(CursorResponse::parseFromBSONThrowing( uassertStatusOK(std::move(response.swResponse)).data)); - - remoteCursors.emplace_back(std::move(response.shardId), - std::move(*response.shardHostAndPort), - std::move(cursorResponse)); + cursor.setShardId(std::move(response.shardId)); + cursor.setHostAndPort(*response.shardHostAndPort); + remoteCursors.push_back(std::move(cursor)); } catch (const DBException& ex) { // Retriable errors are swallowed if 'allowPartialResults' is true. if (allowPartialResults && @@ -114,18 +116,21 @@ std::vector<ClusterClientCursorParams::RemoteCursor> establishCursors( : response.swResponse.getStatus()); if (swCursorResponse.isOK()) { - remoteCursors.emplace_back(std::move(response.shardId), - *response.shardHostAndPort, - std::move(swCursorResponse.getValue())); + RemoteCursor cursor; + cursor.setShardId(std::move(response.shardId)); + cursor.setHostAndPort(*response.shardHostAndPort); + cursor.setCursorResponse(std::move(swCursorResponse.getValue())); + remoteCursors.push_back(std::move(cursor)); } } // Schedule killCursors against all cursors that were established. for (const auto& remoteCursor : remoteCursors) { BSONObj cmdObj = - KillCursorsRequest(nss, {remoteCursor.cursorResponse.getCursorId()}).toBSON(); + KillCursorsRequest(nss, {remoteCursor.getCursorResponse().getCursorId()}) + .toBSON(); executor::RemoteCommandRequest request( - remoteCursor.hostAndPort, nss.db().toString(), cmdObj, opCtx); + remoteCursor.getHostAndPort(), nss.db().toString(), cmdObj, opCtx); // We do not process the response to the killCursors request (we make a good-faith // attempt at cleaning up the cursors, but ignore any returned errors). diff --git a/src/mongo/s/query/establish_cursors.h b/src/mongo/s/query/establish_cursors.h index b75a750d7b7..e88ddc2682e 100644 --- a/src/mongo/s/query/establish_cursors.h +++ b/src/mongo/s/query/establish_cursors.h @@ -36,7 +36,7 @@ #include "mongo/bson/bsonobj.h" #include "mongo/db/cursor_id.h" #include "mongo/executor/task_executor.h" -#include "mongo/s/query/cluster_client_cursor_params.h" +#include "mongo/s/query/async_results_merger_params_gen.h" #include "mongo/stdx/mutex.h" #include "mongo/util/net/hostandport.h" #include "mongo/util/time_support.h" @@ -61,12 +61,11 @@ class CursorResponse; * on reachable hosts are returned. * */ -std::vector<ClusterClientCursorParams::RemoteCursor> establishCursors( - OperationContext* opCtx, - executor::TaskExecutor* executor, - const NamespaceString& nss, - const ReadPreferenceSetting readPref, - const std::vector<std::pair<ShardId, BSONObj>>& remotes, - bool allowPartialResults); +std::vector<RemoteCursor> establishCursors(OperationContext* opCtx, + executor::TaskExecutor* executor, + const NamespaceString& nss, + const ReadPreferenceSetting readPref, + const std::vector<std::pair<ShardId, BSONObj>>& remotes, + bool allowPartialResults); } // namespace mongo diff --git a/src/mongo/s/query/router_exec_stage.h b/src/mongo/s/query/router_exec_stage.h index 418419fdbef..515da5a358c 100644 --- a/src/mongo/s/query/router_exec_stage.h +++ b/src/mongo/s/query/router_exec_stage.h @@ -89,6 +89,14 @@ public: } /** + * Returns the number of remote hosts involved in this execution plan. + */ + virtual std::size_t getNumRemotes() const { + invariant(_child); // The default implementation forwards to the child stage. + return _child->getNumRemotes(); + } + + /** * Returns whether or not all the remote cursors are exhausted. */ virtual bool remotesExhausted() { diff --git a/src/mongo/s/query/router_stage_merge.cpp b/src/mongo/s/query/router_stage_merge.cpp index 4f17927483b..48abb1452ec 100644 --- a/src/mongo/s/query/router_stage_merge.cpp +++ b/src/mongo/s/query/router_stage_merge.cpp @@ -40,18 +40,21 @@ namespace mongo { RouterStageMerge::RouterStageMerge(OperationContext* opCtx, executor::TaskExecutor* executor, ClusterClientCursorParams* params) - : RouterExecStage(opCtx), _executor(executor), _params(params), _arm(opCtx, executor, params) {} + : RouterExecStage(opCtx), + _executor(executor), + _params(params), + _arm(opCtx, executor, params->extractARMParams()) {} StatusWith<ClusterQueryResult> RouterStageMerge::next(ExecContext execCtx) { // Non-tailable and tailable non-awaitData cursors always block until ready(). AwaitData // cursors wait for ready() only until a specified time limit is exceeded. - return (_params->tailableMode == TailableMode::kTailableAndAwaitData + return (_params->tailableMode == TailableModeEnum::kTailableAndAwaitData ? awaitNextWithTimeout(execCtx) : _arm.blockingNext()); } StatusWith<ClusterQueryResult> RouterStageMerge::awaitNextWithTimeout(ExecContext execCtx) { - invariant(_params->tailableMode == TailableMode::kTailableAndAwaitData); + invariant(_params->tailableMode == TailableModeEnum::kTailableAndAwaitData); // If we are in kInitialFind or kGetMoreWithAtLeastOneResultInBatch context and the ARM is not // ready, we don't block. Fall straight through to the return statement. while (!_arm.ready() && execCtx == ExecContext::kGetMoreNoResultsYet) { @@ -85,7 +88,7 @@ StatusWith<ClusterQueryResult> RouterStageMerge::awaitNextWithTimeout(ExecContex StatusWith<EventHandle> RouterStageMerge::getNextEvent() { // If we abandoned a previous event due to a mongoS-side timeout, wait for it first. if (_leftoverEventFromLastTimeout) { - invariant(_params->tailableMode == TailableMode::kTailableAndAwaitData); + invariant(_params->tailableMode == TailableModeEnum::kTailableAndAwaitData); auto event = _leftoverEventFromLastTimeout; _leftoverEventFromLastTimeout = EventHandle(); return event; @@ -102,14 +105,16 @@ bool RouterStageMerge::remotesExhausted() { return _arm.remotesExhausted(); } +std::size_t RouterStageMerge::getNumRemotes() const { + return _arm.getNumRemotes(); +} + Status RouterStageMerge::doSetAwaitDataTimeout(Milliseconds awaitDataTimeout) { return _arm.setAwaitDataTimeout(awaitDataTimeout); } -void RouterStageMerge::addNewShardCursors( - std::vector<ClusterClientCursorParams::RemoteCursor>&& newShards) { - _arm.addNewShardCursors(newShards); - std::move(newShards.begin(), newShards.end(), std::back_inserter(_params->remotes)); +void RouterStageMerge::addNewShardCursors(std::vector<RemoteCursor>&& newShards) { + _arm.addNewShardCursors(std::move(newShards)); } } // namespace mongo diff --git a/src/mongo/s/query/router_stage_merge.h b/src/mongo/s/query/router_stage_merge.h index efd397b8c7e..b6bfee146b6 100644 --- a/src/mongo/s/query/router_stage_merge.h +++ b/src/mongo/s/query/router_stage_merge.h @@ -57,10 +57,12 @@ public: bool remotesExhausted() final; + std::size_t getNumRemotes() const final; + /** * Adds the cursors in 'newShards' to those being merged by the ARM. */ - void addNewShardCursors(std::vector<ClusterClientCursorParams::RemoteCursor>&& newShards); + void addNewShardCursors(std::vector<RemoteCursor>&& newShards); protected: Status doSetAwaitDataTimeout(Milliseconds awaitDataTimeout) final; diff --git a/src/mongo/s/query/router_stage_pipeline.cpp b/src/mongo/s/query/router_stage_pipeline.cpp index 97febc62173..5e94274b9ac 100644 --- a/src/mongo/s/query/router_stage_pipeline.cpp +++ b/src/mongo/s/query/router_stage_pipeline.cpp @@ -84,6 +84,10 @@ void RouterStagePipeline::kill(OperationContext* opCtx) { _mergePipeline->dispose(opCtx); } +std::size_t RouterStagePipeline::getNumRemotes() const { + return _mongosOnlyPipeline ? 0 : _routerAdapter->getNumRemotes(); +} + bool RouterStagePipeline::remotesExhausted() { return _mongosOnlyPipeline || _routerAdapter->remotesExhausted(); } diff --git a/src/mongo/s/query/router_stage_pipeline.h b/src/mongo/s/query/router_stage_pipeline.h index e876dc816a2..c14ddf9f80b 100644 --- a/src/mongo/s/query/router_stage_pipeline.h +++ b/src/mongo/s/query/router_stage_pipeline.h @@ -51,6 +51,8 @@ public: bool remotesExhausted() final; + std::size_t getNumRemotes() const final; + protected: Status doSetAwaitDataTimeout(Milliseconds awaitDataTimeout) final; diff --git a/src/mongo/s/query/router_stage_update_on_add_shard.cpp b/src/mongo/s/query/router_stage_update_on_add_shard.cpp index 451a1ee9699..61fa2a9176d 100644 --- a/src/mongo/s/query/router_stage_update_on_add_shard.cpp +++ b/src/mongo/s/query/router_stage_update_on_add_shard.cpp @@ -56,9 +56,11 @@ bool needsUpdate(const StatusWith<ClusterQueryResult>& childResult) { RouterStageUpdateOnAddShard::RouterStageUpdateOnAddShard(OperationContext* opCtx, executor::TaskExecutor* executor, ClusterClientCursorParams* params, + std::vector<ShardId> shardIds, BSONObj cmdToRunOnNewShards) : RouterExecStage(opCtx, stdx::make_unique<RouterStageMerge>(opCtx, executor, params)), _params(params), + _shardIds(std::move(shardIds)), _cmdToRunOnNewShards(cmdToRunOnNewShards) {} StatusWith<ClusterQueryResult> RouterStageUpdateOnAddShard::next( @@ -73,18 +75,12 @@ StatusWith<ClusterQueryResult> RouterStageUpdateOnAddShard::next( } void RouterStageUpdateOnAddShard::addNewShardCursors(BSONObj newShardDetectedObj) { - std::vector<ShardId> existingShardIds; - for (const auto& remote : _params->remotes) { - existingShardIds.push_back(remote.shardId); - } checked_cast<RouterStageMerge*>(getChildStage()) - ->addNewShardCursors( - establishShardCursorsOnNewShards(std::move(existingShardIds), newShardDetectedObj)); + ->addNewShardCursors(establishShardCursorsOnNewShards(newShardDetectedObj)); } -std::vector<ClusterClientCursorParams::RemoteCursor> -RouterStageUpdateOnAddShard::establishShardCursorsOnNewShards(std::vector<ShardId> existingShardIds, - const BSONObj& newShardDetectedObj) { +std::vector<RemoteCursor> RouterStageUpdateOnAddShard::establishShardCursorsOnNewShards( + const BSONObj& newShardDetectedObj) { auto* opCtx = getOpCtx(); // Reload the shard registry. We need to ensure a reload initiated after calling this method // caused the reload, otherwise we aren't guaranteed to get all the new shards. @@ -98,12 +94,12 @@ RouterStageUpdateOnAddShard::establishShardCursorsOnNewShards(std::vector<ShardI std::vector<ShardId> shardIds, newShardIds; shardRegistry->getAllShardIdsNoReload(&shardIds); - std::sort(existingShardIds.begin(), existingShardIds.end()); + std::sort(_shardIds.begin(), _shardIds.end()); std::sort(shardIds.begin(), shardIds.end()); std::set_difference(shardIds.begin(), shardIds.end(), - existingShardIds.begin(), - existingShardIds.end(), + _shardIds.begin(), + _shardIds.end(), std::back_inserter(newShardIds)); auto cmdObj = DocumentSourceChangeStream::replaceResumeTokenInCommand( @@ -112,6 +108,7 @@ RouterStageUpdateOnAddShard::establishShardCursorsOnNewShards(std::vector<ShardI std::vector<std::pair<ShardId, BSONObj>> requests; for (const auto& shardId : newShardIds) { requests.emplace_back(shardId, cmdObj); + _shardIds.push_back(shardId); } const bool allowPartialResults = false; // partial results are not allowed return establishCursors(opCtx, diff --git a/src/mongo/s/query/router_stage_update_on_add_shard.h b/src/mongo/s/query/router_stage_update_on_add_shard.h index 1128dc83430..00ee921e2af 100644 --- a/src/mongo/s/query/router_stage_update_on_add_shard.h +++ b/src/mongo/s/query/router_stage_update_on_add_shard.h @@ -44,6 +44,7 @@ public: RouterStageUpdateOnAddShard(OperationContext* opCtx, executor::TaskExecutor* executor, ClusterClientCursorParams* params, + std::vector<ShardId> shardIds, BSONObj cmdToRunOnNewShards); StatusWith<ClusterQueryResult> next(ExecContext) final; @@ -58,10 +59,10 @@ private: /** * Open the cursors on the new shards. */ - std::vector<ClusterClientCursorParams::RemoteCursor> establishShardCursorsOnNewShards( - std::vector<ShardId> existingShardIds, const BSONObj& newShardDetectedObj); + std::vector<RemoteCursor> establishShardCursorsOnNewShards(const BSONObj& newShardDetectedObj); ClusterClientCursorParams* _params; + std::vector<ShardId> _shardIds; BSONObj _cmdToRunOnNewShards; }; } diff --git a/src/mongo/s/query/store_possible_cursor.cpp b/src/mongo/s/query/store_possible_cursor.cpp index bdf4283c104..9a754364412 100644 --- a/src/mongo/s/query/store_possible_cursor.cpp +++ b/src/mongo/s/query/store_possible_cursor.cpp @@ -38,6 +38,7 @@ #include "mongo/s/query/cluster_client_cursor_impl.h" #include "mongo/s/query/cluster_client_cursor_params.h" #include "mongo/s/query/cluster_cursor_manager.h" +#include "mongo/s/shard_id.h" namespace mongo { @@ -48,7 +49,7 @@ StatusWith<BSONObj> storePossibleCursor(OperationContext* opCtx, const NamespaceString& requestedNss, executor::TaskExecutor* executor, ClusterCursorManager* cursorManager, - TailableMode tailableMode) { + TailableModeEnum tailableMode) { if (!cmdResult["ok"].trueValue() || !cmdResult.hasField("cursor")) { return cmdResult; } @@ -71,11 +72,13 @@ StatusWith<BSONObj> storePossibleCursor(OperationContext* opCtx, } ClusterClientCursorParams params(incomingCursorResponse.getValue().getNSS()); - params.remotes.emplace_back(shardId, - server, - CursorResponse(incomingCursorResponse.getValue().getNSS(), - incomingCursorResponse.getValue().getCursorId(), - {})); + params.remotes.emplace_back(); + auto& remoteCursor = params.remotes.back(); + remoteCursor.setShardId(shardId.toString()); + remoteCursor.setHostAndPort(server); + remoteCursor.setCursorResponse(CursorResponse(incomingCursorResponse.getValue().getNSS(), + incomingCursorResponse.getValue().getCursorId(), + {})); params.originatingCommandObj = CurOp::get(opCtx)->opDescription().getOwned(); params.tailableMode = tailableMode; diff --git a/src/mongo/s/query/store_possible_cursor.h b/src/mongo/s/query/store_possible_cursor.h index 75a4e76bf24..b9756be44f7 100644 --- a/src/mongo/s/query/store_possible_cursor.h +++ b/src/mongo/s/query/store_possible_cursor.h @@ -74,6 +74,6 @@ StatusWith<BSONObj> storePossibleCursor(OperationContext* opCtx, const NamespaceString& requestedNss, executor::TaskExecutor* executor, ClusterCursorManager* cursorManager, - TailableMode tailableMode = TailableMode::kNormal); + TailableModeEnum tailableMode = TailableModeEnum::kNormal); } // namespace mongo diff --git a/src/mongo/shell/bench.cpp b/src/mongo/shell/bench.cpp index b096affd2cd..27b75dbb06c 100644 --- a/src/mongo/shell/bench.cpp +++ b/src/mongo/shell/bench.cpp @@ -214,12 +214,15 @@ int runQueryWithReadCommands(DBClientBase* conn, } while (cursorResponse.getCursorId() != 0) { - GetMoreRequest getMoreRequest(qr->nss(), - cursorResponse.getCursorId(), - qr->getBatchSize(), - boost::none, // maxTimeMS - boost::none, // term - boost::none); // lastKnownCommittedOpTime + GetMoreRequest getMoreRequest( + qr->nss(), + cursorResponse.getCursorId(), + qr->getBatchSize() + ? boost::optional<std::int64_t>(static_cast<std::int64_t>(*qr->getBatchSize())) + : boost::none, + boost::none, // maxTimeMS + boost::none, // term + boost::none); // lastKnownCommittedOpTime BSONObj getMoreCommandResult; uassert(ErrorCodes::CommandFailed, str::stream() << "getMore command failed; reply was: " << getMoreCommandResult, diff --git a/src/mongo/util/net/SConscript b/src/mongo/util/net/SConscript index b7dd19b3953..133ee826998 100644 --- a/src/mongo/util/net/SConscript +++ b/src/mongo/util/net/SConscript @@ -12,6 +12,7 @@ env.Library( "hostandport.cpp", "hostname_canonicalization.cpp", "sockaddr.cpp", + env.Idlc('hostandport.idl')[0], ], LIBDEPS=[ '$BUILD_DIR/mongo/base', diff --git a/src/mongo/util/net/hostandport.h b/src/mongo/util/net/hostandport.h index 540f57f3b46..da0af575bc3 100644 --- a/src/mongo/util/net/hostandport.h +++ b/src/mongo/util/net/hostandport.h @@ -54,12 +54,19 @@ class StatusWith; */ struct HostAndPort { /** - * Parses "text" to produce a HostAndPort. Returns either that or an error - * status describing the parse failure. + * Parses "text" to produce a HostAndPort. Returns either that or an error status describing + * the parse failure. */ static StatusWith<HostAndPort> parse(StringData text); /** + * A version of 'parse' that throws a UserException if a parsing error is encountered. + */ + static HostAndPort parseThrowing(StringData text) { + return uassertStatusOK(parse(text)); + } + + /** * Construct an empty/invalid HostAndPort. */ HostAndPort(); diff --git a/src/mongo/util/net/hostandport.idl b/src/mongo/util/net/hostandport.idl new file mode 100644 index 00000000000..6edda963f6e --- /dev/null +++ b/src/mongo/util/net/hostandport.idl @@ -0,0 +1,38 @@ +# Copyright (C) 2018 MongoDB Inc. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License, version 3, +# as published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +# +# As a special exception, the copyright holders give permission to link the +# code of portions of this program with the OpenSSL library under certain +# conditions as described in each individual source file and distribute +# linked combinations including the program with the OpenSSL library. You +# must comply with the GNU Affero General Public License in all respects for +# all of the code used other than as permitted herein. If you modify file(s) +# with this exception, you may extend this exception to your version of the +# file(s), but you are not obligated to do so. If you do not wish to do so, +# delete this exception statement from your version. If you delete this +# exception statement from all source files in the program, then also delete +# it in the license file. + +global: + cpp_namespace: "mongo" + cpp_includes: + - "mongo/util/net/hostandport.h" +types: + HostAndPort: + bson_serialization_type: string + description: A string representing a host and port of a mongod or mongos process. + cpp_type: HostAndPort + serializer: HostAndPort::toString + deserializer: HostAndPort::parseThrowing + |