diff options
author | James Wahlin <james.wahlin@10gen.com> | 2016-07-25 16:56:22 -0400 |
---|---|---|
committer | James Wahlin <james.wahlin@10gen.com> | 2016-07-29 15:36:53 -0400 |
commit | dc7f50c520c5129709008568241274cb6d5ec231 (patch) | |
tree | cd38158bf08d17566e706eeb3eb4202d3dfc1044 /src | |
parent | d305e618162d37ccc16cf574fcc0388a1160af93 (diff) | |
download | mongo-dc7f50c520c5129709008568241274cb6d5ec231.tar.gz |
SERVER-24762 Support for views on sharded collections
Diffstat (limited to 'src')
52 files changed, 1243 insertions, 365 deletions
diff --git a/src/mongo/base/error_codes.err b/src/mongo/base/error_codes.err index c2c63c78aab..0871fe48a1f 100644 --- a/src/mongo/base/error_codes.err +++ b/src/mongo/base/error_codes.err @@ -167,7 +167,7 @@ error_code("ViewDepthLimitExceeded", 165) error_code("CommandNotSupportedOnView", 166) error_code("OptionNotSupportedOnView", 167) error_code("InvalidPipelineOperator", 168) -error_code("CommandOnShardedViewNotSupportedOnMongos", 169) +error_code("CommandOnShardedViewNotSupportedOnMongod", 169) error_code("TooManyMatchingDocuments", 170) error_code("CannotIndexParallelArrays", 171) error_code("TransportSessionNotFound", 172) diff --git a/src/mongo/db/commands/SConscript b/src/mongo/db/commands/SConscript index 87c472dc519..2f53565f93c 100644 --- a/src/mongo/db/commands/SConscript +++ b/src/mongo/db/commands/SConscript @@ -66,6 +66,7 @@ env.Library( '$BUILD_DIR/mongo/db/startup_warnings_common', '$BUILD_DIR/mongo/db/stats/counters', '$BUILD_DIR/mongo/db/stats/timer_stats', + '$BUILD_DIR/mongo/db/views/views', '$BUILD_DIR/mongo/logger/parse_log_component_settings', '$BUILD_DIR/mongo/s/client/sharding_client', '$BUILD_DIR/mongo/s/coreshard', diff --git a/src/mongo/db/commands/count_cmd.cpp b/src/mongo/db/commands/count_cmd.cpp index adcab164b79..3204900e26f 100644 --- a/src/mongo/db/commands/count_cmd.cpp +++ b/src/mongo/db/commands/count_cmd.cpp @@ -36,13 +36,13 @@ #include "mongo/db/curop.h" #include "mongo/db/db_raii.h" #include "mongo/db/exec/count.h" -#include "mongo/db/query/cursor_response.h" #include "mongo/db/query/explain.h" #include "mongo/db/query/get_executor.h" #include "mongo/db/query/plan_summary_stats.h" #include "mongo/db/query/view_response_formatter.h" #include "mongo/db/range_preserver.h" #include "mongo/db/repl/replication_coordinator_global.h" +#include "mongo/db/views/resolved_view.h" #include "mongo/util/log.h" namespace mongo { @@ -115,6 +115,7 @@ public: // Acquire the db read lock. AutoGetCollectionOrViewForRead ctx(txn, request.getValue().getNs()); Collection* collection = ctx.getCollection(); + if (ctx.getView()) { ctx.releaseLocksForView(); @@ -162,6 +163,7 @@ public: AutoGetCollectionOrViewForRead ctx(txn, request.getValue().getNs()); Collection* collection = ctx.getCollection(); + if (ctx.getView()) { ctx.releaseLocksForView(); @@ -174,6 +176,11 @@ public: (void)Command::findCommand("aggregate") ->run(txn, dbname, viewAggregation.getValue(), options, errmsg, aggResult); + if (ResolvedView::isResolvedViewErrorResponse(aggResult.asTempObj())) { + result.appendElements(aggResult.obj()); + return false; + } + ViewResponseFormatter formatter(aggResult.obj()); Status formatStatus = formatter.appendAsCountResponse(&result); if (!formatStatus.isOK()) { diff --git a/src/mongo/db/commands/distinct.cpp b/src/mongo/db/commands/distinct.cpp index a10e7cb3b1f..e571bfee3d3 100644 --- a/src/mongo/db/commands/distinct.cpp +++ b/src/mongo/db/commands/distinct.cpp @@ -30,6 +30,8 @@ #define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kQuery +#include "mongo/platform/basic.h" + #include <string> #include <vector> @@ -56,6 +58,7 @@ #include "mongo/db/query/plan_summary_stats.h" #include "mongo/db/query/query_planner_common.h" #include "mongo/db/query/view_response_formatter.h" +#include "mongo/db/views/resolved_view.h" #include "mongo/stdx/memory.h" #include "mongo/util/log.h" @@ -178,6 +181,11 @@ public: (void)Command::findCommand("aggregate") ->run(txn, dbname, viewAggregation.getValue(), options, errmsg, aggResult); + if (ResolvedView::isResolvedViewErrorResponse(aggResult.asTempObj())) { + result.appendElements(aggResult.obj()); + return false; + } + ViewResponseFormatter formatter(aggResult.obj()); Status formatStatus = formatter.appendAsDistinctResponse(&result); if (!formatStatus.isOK()) { diff --git a/src/mongo/db/commands/pipeline_command.cpp b/src/mongo/db/commands/pipeline_command.cpp index 64db549e8a7..34f7f5b45bc 100644 --- a/src/mongo/db/commands/pipeline_command.cpp +++ b/src/mongo/db/commands/pipeline_command.cpp @@ -59,6 +59,7 @@ #include "mongo/db/storage/storage_options.h" #include "mongo/db/views/view.h" #include "mongo/db/views/view_catalog.h" +#include "mongo/db/views/view_sharding_check.h" #include "mongo/stdx/memory.h" #include "mongo/util/log.h" @@ -283,7 +284,18 @@ public: // recursively calling run, which will re-acquire locks on the underlying collection. // (The lock must be released because recursively acquiring locks on the database will // prohibit yielding.) - if (ctx.getView()) { + if (auto view = ctx.getView()) { + auto viewDefinition = + ViewShardingCheck::getResolvedViewIfSharded(txn, ctx.getDb(), view); + if (!viewDefinition.isOK()) { + return appendCommandStatus(result, viewDefinition.getStatus()); + } + + if (!viewDefinition.getValue().isEmpty()) { + ViewShardingCheck::appendShardedViewStatus(viewDefinition.getValue(), &result); + return false; + } + auto resolvedView = ctx.getDb()->getViewCatalog()->resolveView(txn, nss); if (!resolvedView.isOK()) { return appendCommandStatus(result, resolvedView.getStatus()); @@ -293,10 +305,13 @@ public: ctx.releaseLocksForView(); // Parse the resolved view into a new aggregation request. - BSONObj viewCmd = + auto viewCmd = resolvedView.getValue().asExpandedViewAggregation(request.getValue()); + if (!viewCmd.isOK()) { + return appendCommandStatus(result, viewCmd.getStatus()); + } - return this->run(txn, db, viewCmd, options, errmsg, result); + return this->run(txn, db, viewCmd.getValue(), options, errmsg, result); } // If the pipeline does not have a user-specified collation, set it from the collection diff --git a/src/mongo/db/query/count_request.cpp b/src/mongo/db/query/count_request.cpp index c062e483ffa..f412065ab9c 100644 --- a/src/mongo/db/query/count_request.cpp +++ b/src/mongo/db/query/count_request.cpp @@ -149,12 +149,6 @@ StatusWith<BSONObj> CountRequest::asAggregationCommand() const { str::stream() << "Option " << kHintField << " not supported in aggregation."}; } - // TODO(SERVER-25186): Views may not override the collation of the underlying collection. - if (_collation) { - return {ErrorCodes::InvalidPipelineOperator, - str::stream() << kCollationField << " not supported on a view."}; - } - BSONObjBuilder aggregationBuilder; aggregationBuilder.append("aggregate", _nss.coll()); @@ -176,6 +170,7 @@ StatusWith<BSONObj> CountRequest::asAggregationCommand() const { limitBuilder.append("$limit", *_limit); limitBuilder.doneFast(); } + BSONObjBuilder countBuilder(pipelineBuilder.subobjStart()); countBuilder.append("$count", "count"); countBuilder.doneFast(); @@ -186,6 +181,10 @@ StatusWith<BSONObj> CountRequest::asAggregationCommand() const { aggregationBuilder.append(kExplainField, _explain); } + if (_collation) { + aggregationBuilder.append(kCollationField, *_collation); + } + // The 'cursor' option is always specified so that aggregation uses the cursor interface. aggregationBuilder.append("cursor", BSONObj()); diff --git a/src/mongo/db/query/parsed_distinct.h b/src/mongo/db/query/parsed_distinct.h index f26066abd83..8e07116809b 100644 --- a/src/mongo/db/query/parsed_distinct.h +++ b/src/mongo/db/query/parsed_distinct.h @@ -32,11 +32,11 @@ #include <string> #include "mongo/base/status_with.h" +#include "mongo/db/query/canonical_query.h" namespace mongo { class BSONObj; -class CanonicalQuery; class ExtensionsCallback; class NamespaceString; class OperationContext; diff --git a/src/mongo/db/s/collection_sharding_state.cpp b/src/mongo/db/s/collection_sharding_state.cpp index 47e66ca914b..018d3b2cdcf 100644 --- a/src/mongo/db/s/collection_sharding_state.cpp +++ b/src/mongo/db/s/collection_sharding_state.cpp @@ -203,6 +203,18 @@ void CollectionShardingState::checkShardVersionOrThrow(OperationContext* txn) { } } +bool CollectionShardingState::collectionIsSharded() { + auto metadata = getMetadata().getMetadata(); + if (metadata && (metadata->getCollVersion().isStrictlyEqualTo(ChunkVersion::UNSHARDED()))) { + return false; + } + + // If 'metadata' is null, then the shard doesn't know if this collection is sharded or not. In + // this scenario we will assume this collection is sharded. We will know sharding state + // definitively once SERVER-24960 has been fixed. + return true; +} + bool CollectionShardingState::isDocumentInMigratingChunk(OperationContext* txn, const BSONObj& doc) { dassert(txn->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_IX)); diff --git a/src/mongo/db/s/collection_sharding_state.h b/src/mongo/db/s/collection_sharding_state.h index 40ce2829e53..509893c8b33 100644 --- a/src/mongo/db/s/collection_sharding_state.h +++ b/src/mongo/db/s/collection_sharding_state.h @@ -130,6 +130,12 @@ public: */ void checkShardVersionOrThrow(OperationContext* txn); + /** + * Returns whether this collection is sharded. Valid only if mongoD is primary. + * TODO SERVER-24960: This method may return a false positive until SERVER-24960 is fixed. + */ + bool collectionIsSharded(); + // Replication subsystem hooks. If this collection is serving as a source for migration, these // methods inform it of any changes to its contents. diff --git a/src/mongo/db/s/set_shard_version_command.cpp b/src/mongo/db/s/set_shard_version_command.cpp index aa0ed124df2..5a0a8afd8c1 100644 --- a/src/mongo/db/s/set_shard_version_command.cpp +++ b/src/mongo/db/s/set_shard_version_command.cpp @@ -183,9 +183,16 @@ public: connectionVersion.addToBSON(result, "oldVersion"); { - // Use a stable collection metadata while performing the checks - boost::optional<AutoGetCollection> autoColl; - autoColl.emplace(txn, nss, MODE_IS); + boost::optional<AutoGetDb> autoDb; + autoDb.emplace(txn, nss.db(), MODE_IS); + + // Views do not require a shard version check. + if (autoDb->getDb() && autoDb->getDb()->getViewCatalog()->lookup(nss.ns())) { + return true; + } + + boost::optional<Lock::CollectionLock> collLock; + collLock.emplace(txn->lockState(), nss.ns(), MODE_IS); auto css = CollectionShardingState::get(txn, nss); const ChunkVersion collectionShardVersion = @@ -248,7 +255,8 @@ public: auto critSecSignal = css->getMigrationSourceManager()->getMigrationCriticalSectionSignal(); if (critSecSignal) { - autoColl.reset(); + collLock.reset(); + autoDb.reset(); log() << "waiting till out of critical section"; critSecSignal->waitFor(txn, Seconds(10)); } @@ -270,7 +278,8 @@ public: auto critSecSignal = css->getMigrationSourceManager()->getMigrationCriticalSectionSignal(); if (critSecSignal) { - autoColl.reset(); + collLock.reset(); + autoDb.reset(); log() << "waiting till out of critical section"; critSecSignal->waitFor(txn, Seconds(10)); } diff --git a/src/mongo/db/views/SConscript b/src/mongo/db/views/SConscript index 0a63322240c..ee25270984c 100644 --- a/src/mongo/db/views/SConscript +++ b/src/mongo/db/views/SConscript @@ -6,11 +6,13 @@ env.Library( target='views_mongod', source=[ 'durable_view_catalog.cpp', + 'view_sharding_check.cpp', ], LIBDEPS=[ '$BUILD_DIR/mongo/db/dbhelpers', '$BUILD_DIR/mongo/db/views/views', '$BUILD_DIR/mongo/db/catalog/catalog', + '$BUILD_DIR/mongo/db/s/sharding', ] ) @@ -19,10 +21,12 @@ env.Library( source=[ 'view.cpp', 'view_catalog.cpp', + 'resolved_view.cpp', ], LIBDEPS=[ '$BUILD_DIR/mongo/base', '$BUILD_DIR/mongo/db/server_parameters', - '$BUILD_DIR/mongo/db/pipeline/aggregation' + '$BUILD_DIR/mongo/db/pipeline/aggregation', + '$BUILD_DIR/mongo/db/repl/repl_coordinator_interface', ] ) diff --git a/src/mongo/db/views/resolved_view.cpp b/src/mongo/db/views/resolved_view.cpp new file mode 100644 index 00000000000..c1fef5c2370 --- /dev/null +++ b/src/mongo/db/views/resolved_view.cpp @@ -0,0 +1,116 @@ +/** +* Copyright (C) 2016 MongoDB Inc. +* +* This program is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License, version 3, +* as published by the Free Software Foundation. +* +* This program is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see <http://www.gnu.org/licenses/>. +* +* As a special exception, the copyright holders give permission to link the +* code of portions of this program with the OpenSSL library under certain +* conditions as described in each individual source file and distribute +* linked combinations including the program with the OpenSSL library. You +* must comply with the GNU Affero General Public License in all respects for +* all of the code used other than as permitted herein. If you modify file(s) +* with this exception, you may extend this exception to your version of the +* file(s), but you are not obligated to do so. If you do not wish to do so, +* delete this exception statement from your version. If you delete this +* exception statement from all source files in the program, then also delete +* it in the license file. +*/ + +#include "mongo/platform/basic.h" + +#include "mongo/db/views/resolved_view.h" + +#include "mongo/bson/bsonobjbuilder.h" +#include "mongo/db/pipeline/aggregation_request.h" +#include "mongo/rpc/get_status_from_command_result.h" + +namespace mongo { + +bool ResolvedView::isResolvedViewErrorResponse(BSONObj commandResponseObj) { + auto status = getStatusFromCommandResult(commandResponseObj); + return ErrorCodes::CommandOnShardedViewNotSupportedOnMongod == status; +} + +ResolvedView ResolvedView::fromBSON(BSONObj commandResponseObj) { + uassert(40248, + "command response expected to have a 'resolvedView' field", + commandResponseObj.hasField("resolvedView")); + + auto viewDef = commandResponseObj.getObjectField("resolvedView"); + uassert(40249, "resolvedView must be an object", !viewDef.isEmpty()); + + uassert(40250, + "View definition must have 'ns' field of type string", + viewDef.hasField("ns") && viewDef.getField("ns").type() == BSONType::String); + + uassert(40251, + "View definition must have 'pipeline' field of type array", + viewDef.hasField("pipeline") && viewDef.getField("pipeline").type() == BSONType::Array); + + std::vector<BSONObj> pipeline; + for (auto&& item : viewDef["pipeline"].Obj()) { + pipeline.push_back(item.Obj().getOwned()); + } + + return {ResolvedView(NamespaceString(viewDef["ns"].valueStringData()), pipeline)}; +} + +StatusWith<BSONObj> ResolvedView::asExpandedViewAggregation( + const AggregationRequest& request) const { + if (!request.getCollation().isEmpty()) { + // TODO(SERVER-25186): Views may not override the collation of the underlying collection. + return {ErrorCodes::InvalidPipelineOperator, + str::stream() << "collation not supported on a view."}; + } + + BSONObjBuilder aggregationBuilder; + // Perform the aggregation on the resolved namespace. + aggregationBuilder.append("aggregate", _namespace.coll()); + + // The new pipeline consists of two parts: first, 'pipeline' in this ResolvedView; + // then, the pipeline in 'request'. + BSONArrayBuilder pipelineBuilder(aggregationBuilder.subarrayStart("pipeline")); + for (auto&& item : _pipeline) { + pipelineBuilder.append(item); + } + + for (auto&& item : request.getPipeline()) { + pipelineBuilder.append(item); + } + pipelineBuilder.doneFast(); + + // The cursor option is always specified regardless of the presence of batchSize. + if (request.getBatchSize()) { + BSONObjBuilder batchSizeBuilder(aggregationBuilder.subobjStart("cursor")); + batchSizeBuilder.append(AggregationRequest::kBatchSizeName, *request.getBatchSize()); + batchSizeBuilder.doneFast(); + } else { + aggregationBuilder.append("cursor", BSONObj()); + } + + if (request.isExplain()) + aggregationBuilder.append("explain", true); + + return aggregationBuilder.obj(); +} + +StatusWith<BSONObj> ResolvedView::asExpandedViewAggregation(const BSONObj& aggCommand) const { + auto aggRequest = AggregationRequest::parseFromBSON(_namespace, aggCommand); + if (!aggRequest.isOK()) { + return aggRequest.getStatus(); + } + + return asExpandedViewAggregation(aggRequest.getValue()); +} + +} // namespace mongo diff --git a/src/mongo/db/views/resolved_view.h b/src/mongo/db/views/resolved_view.h new file mode 100644 index 00000000000..93ad3a7e709 --- /dev/null +++ b/src/mongo/db/views/resolved_view.h @@ -0,0 +1,78 @@ +/** + * Copyright (C) 2016 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include <vector> + +#include "mongo/base/status_with.h" +#include "mongo/bson/bsonobj.h" +#include "mongo/db/namespace_string.h" + +namespace mongo { + +class AggregationRequest; + +/** + * Represents a resolved definition, composed of a base collection namespace and a pipeline + * built from one or more views. + */ +class ResolvedView { +public: + ResolvedView(const NamespaceString& collectionNs, const std::vector<BSONObj>& pipeline) + : _namespace(std::move(collectionNs)), _pipeline(std::move(pipeline)) {} + + /** + * Returns whether 'commandResponseObj' contains a CommandOnShardedViewNotSupportedOnMongod + * error and resolved view definition. + */ + static bool isResolvedViewErrorResponse(BSONObj commandResponseObj); + + static ResolvedView fromBSON(BSONObj commandResponseObj); + + /** + * Convert an aggregation command on a view to the equivalent command against the views + * underlying collection. + */ + StatusWith<BSONObj> asExpandedViewAggregation(const BSONObj& aggCommand) const; + StatusWith<BSONObj> asExpandedViewAggregation(const AggregationRequest& aggRequest) const; + + const NamespaceString& getNamespace() const { + return _namespace; + } + + const std::vector<BSONObj>& getPipeline() const { + return _pipeline; + } + +private: + NamespaceString _namespace; + std::vector<BSONObj> _pipeline; +}; + +} // namespace mongo diff --git a/src/mongo/db/views/view_catalog.cpp b/src/mongo/db/views/view_catalog.cpp index 2f1ea88a1d1..fd738cb2f6d 100644 --- a/src/mongo/db/views/view_catalog.cpp +++ b/src/mongo/db/views/view_catalog.cpp @@ -36,13 +36,8 @@ #include "mongo/base/status_with.h" #include "mongo/base/string_data.h" -#include "mongo/db/catalog/collection.h" -#include "mongo/db/catalog/collection_options.h" #include "mongo/db/namespace_string.h" #include "mongo/db/operation_context.h" -#include "mongo/db/pipeline/aggregation_request.h" -#include "mongo/db/pipeline/document_source.h" -#include "mongo/db/pipeline/pipeline.h" #include "mongo/db/server_parameters.h" #include "mongo/db/storage/recovery_unit.h" #include "mongo/db/views/view.h" @@ -57,39 +52,6 @@ ExportedServerParameter<bool, ServerParameterType::kStartupOnly> enableViewsPara const std::uint32_t ViewCatalog::kMaxViewDepth = 20; -BSONObj ResolvedViewDefinition::asExpandedViewAggregation(const AggregationRequest& request) { - BSONObjBuilder aggregationBuilder; - - // Perform the aggregation on the resolved namespace. - aggregationBuilder.append("aggregate", collectionNss.coll()); - - // The new pipeline consists of two parts: first, 'pipeline' in this ResolvedViewDefinition; - // then, the pipeline in 'request'. - BSONArrayBuilder pipelineBuilder(aggregationBuilder.subarrayStart("pipeline")); - for (auto&& item : pipeline) { - pipelineBuilder.append(item); - } - - for (auto&& item : request.getPipeline()) { - pipelineBuilder.append(item); - } - pipelineBuilder.doneFast(); - - // The cursor option is always specified regardless of the presence of batchSize. - if (request.getBatchSize()) { - BSONObjBuilder batchSizeBuilder(aggregationBuilder.subobjStart("cursor")); - batchSizeBuilder.append(AggregationRequest::kBatchSizeName, *request.getBatchSize()); - batchSizeBuilder.doneFast(); - } else { - aggregationBuilder.append("cursor", BSONObj()); - } - - if (request.isExplain()) - aggregationBuilder.append("explain", true); - - return aggregationBuilder.obj(); -} - ViewCatalog::ViewCatalog(OperationContext* txn, DurableViewCatalog* durable) : _durable(durable) { durable->iterate(txn, [&](const BSONObj& view) { NamespaceString viewName(view["_id"].str()); @@ -149,15 +111,15 @@ ViewDefinition* ViewCatalog::lookup(StringData ns) { return nullptr; } -StatusWith<ResolvedViewDefinition> ViewCatalog::resolveView(OperationContext* txn, - const NamespaceString& nss) { +StatusWith<ResolvedView> ViewCatalog::resolveView(OperationContext* txn, + const NamespaceString& nss) { const NamespaceString* resolvedNss = &nss; std::vector<BSONObj> resolvedPipeline; for (std::uint32_t i = 0; i < ViewCatalog::kMaxViewDepth; i++) { ViewDefinition* view = lookup(resolvedNss->ns()); if (!view) - return StatusWith<ResolvedViewDefinition>({*resolvedNss, resolvedPipeline}); + return StatusWith<ResolvedView>({*resolvedNss, resolvedPipeline}); resolvedNss = &(view->viewOn()); diff --git a/src/mongo/db/views/view_catalog.h b/src/mongo/db/views/view_catalog.h index 91c9f694d48..c3cbb505f99 100644 --- a/src/mongo/db/views/view_catalog.h +++ b/src/mongo/db/views/view_catalog.h @@ -39,30 +39,14 @@ #include "mongo/base/string_data.h" #include "mongo/db/namespace_string.h" #include "mongo/db/views/durable_view_catalog.h" +#include "mongo/db/views/resolved_view.h" #include "mongo/db/views/view.h" #include "mongo/util/string_map.h" namespace mongo { -class AggregationRequest; -class Database; class OperationContext; /** - * Represents a fully-resolved view: a non-view namespace with a corresponding aggregation pipeline. - */ -struct ResolvedViewDefinition { - /** - * Creates a new aggregation command object for a view operation. The new command is an - * aggregation on 'collectionNss', and its pipeline is the concatenation of 'pipeline' with the - * pipeline of 'request'. - */ - BSONObj asExpandedViewAggregation(const AggregationRequest& request); - - NamespaceString collectionNss; - std::vector<BSONObj> pipeline; -}; - -/** * In-memory data structure for view definitions. Note that this structure is not thread-safe; you * must be holding a database lock to access a database's view catalog. */ @@ -121,8 +105,7 @@ public: * * It is illegal to call this function on a namespace that is not a view. */ - StatusWith<ResolvedViewDefinition> resolveView(OperationContext* txn, - const NamespaceString& nss); + StatusWith<ResolvedView> resolveView(OperationContext* txn, const NamespaceString& nss); private: ViewMap _viewMap; diff --git a/src/mongo/db/views/view_sharding_check.cpp b/src/mongo/db/views/view_sharding_check.cpp new file mode 100644 index 00000000000..cc662ff2549 --- /dev/null +++ b/src/mongo/db/views/view_sharding_check.cpp @@ -0,0 +1,97 @@ +/** + * Copyright (C) 2016 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/db/views/view_sharding_check.h" + +#include "mongo/db/catalog/database.h" +#include "mongo/db/commands.h" +#include "mongo/db/db_raii.h" +#include "mongo/db/operation_context.h" +#include "mongo/db/repl/replication_coordinator_global.h" +#include "mongo/db/s/collection_sharding_state.h" +#include "mongo/db/s/sharding_state.h" +#include "mongo/db/server_options.h" +#include "mongo/db/views/view_catalog.h" +#include "mongo/s/stale_exception.h" + +namespace mongo { + +StatusWith<BSONObj> ViewShardingCheck::getResolvedViewIfSharded(OperationContext* opCtx, + Database* db, + const ViewDefinition* view) { + invariant(opCtx); + invariant(db); + invariant(view); + + if (ClusterRole::ShardServer != serverGlobalParams.clusterRole) { + // This node is not part of a sharded cluster, so the collection cannot be sharded. + return BSONObj(); + } + + auto resolvedView = db->getViewCatalog()->resolveView(opCtx, view->name()); + if (!resolvedView.isOK()) { + return resolvedView.getStatus(); + } + + const auto& sourceNss = resolvedView.getValue().getNamespace(); + const auto isPrimary = + repl::ReplicationCoordinator::get(opCtx->getClient()->getServiceContext()) + ->canAcceptWritesForDatabase(db->name()); + + if (isPrimary && !collectionIsSharded(opCtx, sourceNss)) { + return BSONObj(); + } + + BSONObjBuilder viewDetailBob; + viewDetailBob.append("ns", sourceNss.ns()); + viewDetailBob.append("pipeline", resolvedView.getValue().getPipeline()); + + return viewDetailBob.obj(); +} + +void ViewShardingCheck::appendShardedViewStatus(const BSONObj& resolvedView, BSONObjBuilder* out) { + invariant(out); + invariant(!resolvedView.isEmpty()); + + out->append("resolvedView", resolvedView); + Status status{ErrorCodes::CommandOnShardedViewNotSupportedOnMongod, + str::stream() << "Command on view must be executed by mongos"}; + Command::appendCommandStatus(*out, status); +} + +bool ViewShardingCheck::collectionIsSharded(OperationContext* opCtx, const NamespaceString& nss) { + // The database is locked at this point but the collection underlying the given view is not + // and must be for a sharding check. + dassert(opCtx->lockState()->isDbLockedForMode(nss.db(), MODE_IS)); + AutoGetCollection autoGetCol(opCtx, nss, MODE_IS); + return CollectionShardingState::get(opCtx, nss)->collectionIsSharded(); +} + +} // namespace mongo diff --git a/src/mongo/db/views/view_sharding_check.h b/src/mongo/db/views/view_sharding_check.h new file mode 100644 index 00000000000..d3437353212 --- /dev/null +++ b/src/mongo/db/views/view_sharding_check.h @@ -0,0 +1,81 @@ +/** + * Copyright (C) 2016 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include "mongo/base/status_with.h" +#include "mongo/bson/bsonobj.h" +#include "mongo/bson/bsonobjbuilder.h" + +#include <string> +#include <vector> + +namespace mongo { + +class Database; +class NamespaceString; +class OperationContext; +class ViewDefinition; + +/** + * When a read against a view is forwarded from mongoS, it is done so without any awareness as to + * whether the underlying collection is sharded. If it is found that the underlying collection is + * sharded(*) we return an error to mongos with the view definition requesting + * that the resolved read operation be executed there. + * + * (*) We have incomplete sharding state on secondaries. If we are a secondary, then we have to + * assume that the collection backing the view could be sharded. + */ +class ViewShardingCheck { +public: + /** + * If it is determined that a view's underlying collection may be sharded this method returns + * a BSONObj containing the resolved view definition. If the underlying collection is not + * sharded an empty BSONObj is returned. + * + * Will return an error if the ViewCatalog is unable to generate a resolved view. + */ + static StatusWith<BSONObj> getResolvedViewIfSharded(OperationContext* opCtx, + Database* db, + const ViewDefinition* view); + + /** + * Appends the resolved view definition and CommandOnShardedViewNotSupportedOnMongod status to + * 'result'. + */ + static void appendShardedViewStatus(const BSONObj& resolvedView, BSONObjBuilder* result); + +private: + /** + * Confirms whether 'ns' represents a sharded collection. Only valid if the calling + * member is primary. + */ + static bool collectionIsSharded(OperationContext* opCtx, const NamespaceString& nss); +}; + +} // namespace mongo diff --git a/src/mongo/s/commands/SConscript b/src/mongo/s/commands/SConscript index 4d46ada00e6..49667af9ee4 100644 --- a/src/mongo/s/commands/SConscript +++ b/src/mongo/s/commands/SConscript @@ -76,6 +76,7 @@ env.Library( '$BUILD_DIR/mongo/client/parallel', '$BUILD_DIR/mongo/db/pipeline/pipeline', '$BUILD_DIR/mongo/db/commands/killcursors_common', + '$BUILD_DIR/mongo/db/views/views', '$BUILD_DIR/mongo/s/cluster_ops_impl', '$BUILD_DIR/mongo/s/coreshard', '$BUILD_DIR/mongo/s/mongoscore', diff --git a/src/mongo/s/commands/cluster_count_cmd.cpp b/src/mongo/s/commands/cluster_count_cmd.cpp index e2a31a05bfa..93d7f02bd69 100644 --- a/src/mongo/s/commands/cluster_count_cmd.cpp +++ b/src/mongo/s/commands/cluster_count_cmd.cpp @@ -31,6 +31,10 @@ #include <vector> #include "mongo/db/commands.h" +#include "mongo/db/query/count_request.h" +#include "mongo/db/query/view_response_formatter.h" +#include "mongo/db/views/resolved_view.h" +#include "mongo/rpc/get_status_from_command_result.h" #include "mongo/s/commands/cluster_commands_common.h" #include "mongo/s/commands/cluster_explain.h" #include "mongo/s/commands/strategy.h" @@ -156,6 +160,40 @@ public: Strategy::commandOp( txn, dbname, countCmdBuilder.done(), options, nss.ns(), filter, &countResult); + if (countResult.size() == 1 && + ResolvedView::isResolvedViewErrorResponse(countResult[0].result)) { + auto countRequest = CountRequest::parseFromBSON(dbname, cmdObj, false); + if (!countRequest.isOK()) { + return appendCommandStatus(result, countRequest.getStatus()); + } + + auto aggCmdOnView = countRequest.getValue().asAggregationCommand(); + if (!aggCmdOnView.isOK()) { + return appendCommandStatus(result, aggCmdOnView.getStatus()); + } + + auto resolvedView = ResolvedView::fromBSON(countResult[0].result); + auto aggCmd = resolvedView.asExpandedViewAggregation(aggCmdOnView.getValue()); + if (!aggCmd.isOK()) { + return appendCommandStatus(result, aggCmd.getStatus()); + } + + + BSONObjBuilder aggResult; + Command::findCommand("aggregate") + ->run(txn, dbname, aggCmd.getValue(), options, errmsg, aggResult); + + result.resetToEmpty(); + ViewResponseFormatter formatter(aggResult.obj()); + auto formatStatus = formatter.appendAsCountResponse(&result); + if (!formatStatus.isOK()) { + return appendCommandStatus(result, formatStatus); + } + + return true; + } + + long long total = 0; BSONObjBuilder shardSubTotal(result.subobjStart("shards")); @@ -224,6 +262,33 @@ public: long long millisElapsed = timer.millis(); + if (shardResults.size() == 1 && + ResolvedView::isResolvedViewErrorResponse(shardResults[0].result)) { + auto countRequest = CountRequest::parseFromBSON(dbname, cmdObj, true); + if (!countRequest.isOK()) { + return countRequest.getStatus(); + } + + auto aggCmdOnView = countRequest.getValue().asAggregationCommand(); + if (!aggCmdOnView.isOK()) { + return aggCmdOnView.getStatus(); + } + + auto resolvedView = ResolvedView::fromBSON(shardResults[0].result); + auto aggCmd = resolvedView.asExpandedViewAggregation(aggCmdOnView.getValue()); + if (!aggCmd.isOK()) { + return aggCmd.getStatus(); + } + + std::string errMsg; + if (Command::findCommand("aggregate") + ->run(txn, dbname, aggCmd.getValue(), 0, errMsg, *out)) { + return Status::OK(); + } + + return getStatusFromCommandResult(out->asTempObj()); + } + const char* mongosStageName = ClusterExplain::getStageNameForReadOp(shardResults, cmdObj); return ClusterExplain::buildExplainResult( diff --git a/src/mongo/s/commands/cluster_find_cmd.cpp b/src/mongo/s/commands/cluster_find_cmd.cpp index 1bd898c2c37..9a742c979e6 100644 --- a/src/mongo/s/commands/cluster_find_cmd.cpp +++ b/src/mongo/s/commands/cluster_find_cmd.cpp @@ -36,6 +36,8 @@ #include "mongo/db/matcher/extensions_callback_noop.h" #include "mongo/db/query/cursor_response.h" #include "mongo/db/stats/counters.h" +#include "mongo/db/views/resolved_view.h" +#include "mongo/rpc/get_status_from_command_result.h" #include "mongo/s/commands/strategy.h" #include "mongo/s/query/cluster_find.h" @@ -118,8 +120,36 @@ public: return qr.getStatus(); } - return Strategy::explainFind( + auto result = Strategy::explainFind( txn, cmdObj, *qr.getValue(), verbosity, serverSelectionMetadata, out); + + if (result == ErrorCodes::CommandOnShardedViewNotSupportedOnMongod) { + auto resolvedView = ResolvedView::fromBSON(out->asTempObj()); + out->resetToEmpty(); + + auto aggCmdOnView = qr.getValue().get()->asAggregationCommand(); + if (!aggCmdOnView.isOK()) { + return aggCmdOnView.getStatus(); + } + + auto aggCmd = resolvedView.asExpandedViewAggregation(aggCmdOnView.getValue()); + if (!aggCmd.isOK()) { + return aggCmd.getStatus(); + } + + Command* c = Command::findCommand("aggregate"); + int queryOptions = 0; + std::string errMsg; + + if (c->run(txn, dbname, aggCmd.getValue(), queryOptions, errMsg, *out)) { + return Status::OK(); + } + + BSONObj tmp = out->asTempObj(); + return getStatusFromCommandResult(out->asTempObj()); + } + + return result; } bool run(OperationContext* txn, @@ -161,8 +191,26 @@ public: // Do the work to generate the first batch of results. This blocks waiting to get responses // from the shard(s). std::vector<BSONObj> batch; - auto cursorId = ClusterFind::runQuery(txn, *cq.getValue(), readPref.getValue(), &batch); + BSONObj viewDefinition; + auto cursorId = ClusterFind::runQuery( + txn, *cq.getValue(), readPref.getValue(), &batch, &viewDefinition); if (!cursorId.isOK()) { + if (cursorId.getStatus() == ErrorCodes::CommandOnShardedViewNotSupportedOnMongod) { + auto aggCmdOnView = cq.getValue()->getQueryRequest().asAggregationCommand(); + if (!aggCmdOnView.isOK()) { + return appendCommandStatus(result, aggCmdOnView.getStatus()); + } + + auto resolvedView = ResolvedView::fromBSON(viewDefinition); + auto aggCmd = resolvedView.asExpandedViewAggregation(aggCmdOnView.getValue()); + if (!aggCmd.isOK()) { + return appendCommandStatus(result, aggCmd.getStatus()); + } + + return Command::findCommand("aggregate") + ->run(txn, dbname, aggCmd.getValue(), options, errmsg, result); + } + return appendCommandStatus(result, cursorId.getStatus()); } diff --git a/src/mongo/s/commands/cluster_pipeline_cmd.cpp b/src/mongo/s/commands/cluster_pipeline_cmd.cpp index 7b4adf1c9fe..bac39eceb29 100644 --- a/src/mongo/s/commands/cluster_pipeline_cmd.cpp +++ b/src/mongo/s/commands/cluster_pipeline_cmd.cpp @@ -43,6 +43,8 @@ #include "mongo/db/pipeline/document_source.h" #include "mongo/db/pipeline/expression_context.h" #include "mongo/db/pipeline/pipeline.h" +#include "mongo/db/views/resolved_view.h" +#include "mongo/db/views/view.h" #include "mongo/executor/task_executor_pool.h" #include "mongo/platform/random.h" #include "mongo/rpc/get_status_from_command_result.h" @@ -115,7 +117,7 @@ public: shared_ptr<DBConfig> conf = status.getValue(); if (!conf->isShardingEnabled()) { - return aggPassthrough(txn, conf, cmdObj, result, options); + return aggPassthrough(txn, dbname, conf, cmdObj, result, options, errmsg); } auto request = AggregationRequest::parseFromBSON(NamespaceString(fullns), cmdObj); @@ -140,7 +142,7 @@ public: } if (!conf->isSharded(fullns)) { - return aggPassthrough(txn, conf, cmdObj, result, options); + return aggPassthrough(txn, dbname, conf, cmdObj, result, options, errmsg); } // If the first $match stage is an exact match on the shard key, we only have to send it @@ -287,10 +289,12 @@ private: BSONObj aggRunCommand(DBClientBase* conn, const string& db, BSONObj cmd, int queryOptions); bool aggPassthrough(OperationContext* txn, + const std::string& dbname, shared_ptr<DBConfig> conf, BSONObj cmd, BSONObjBuilder& result, - int queryOptions); + int queryOptions, + std::string& errmsg); } clusterPipelineCmd; std::vector<DocumentSourceMergeCursors::CursorDescriptor> PipelineCommand::parseCursors( @@ -432,14 +436,16 @@ BSONObj PipelineCommand::aggRunCommand(DBClientBase* conn, } bool PipelineCommand::aggPassthrough(OperationContext* txn, + const std::string& dbname, shared_ptr<DBConfig> conf, - BSONObj cmd, + BSONObj cmdObj, BSONObjBuilder& out, - int queryOptions) { + int queryOptions, + std::string& errmsg) { // Temporary hack. See comment on declaration for details. const auto shard = grid.shardRegistry()->getShard(txn, conf->getPrimaryId()); ShardConnection conn(shard->getConnString(), ""); - BSONObj result = aggRunCommand(conn.get(), conf->name(), cmd, queryOptions); + BSONObj result = aggRunCommand(conn.get(), conf->name(), cmdObj, queryOptions); conn.done(); // First append the properly constructed writeConcernError. It will then be skipped @@ -449,6 +455,28 @@ bool PipelineCommand::aggPassthrough(OperationContext* txn, } out.appendElementsUnique(result); + + BSONObj responseObj = out.asTempObj(); + if (ResolvedView::isResolvedViewErrorResponse(responseObj)) { + auto resolvedView = ResolvedView::fromBSON(responseObj); + + auto request = AggregationRequest::parseFromBSON(resolvedView.getNamespace(), cmdObj); + if (!request.isOK()) { + out.resetToEmpty(); + return appendCommandStatus(out, request.getStatus()); + } + + auto aggCmd = resolvedView.asExpandedViewAggregation(request.getValue()); + if (!aggCmd.isOK()) { + out.resetToEmpty(); + return appendCommandStatus(out, aggCmd.getStatus()); + } + + out.resetToEmpty(); + return Command::findCommand("aggregate") + ->run(txn, dbname, aggCmd.getValue(), queryOptions, errmsg, out); + } + return result["ok"].trueValue(); } diff --git a/src/mongo/s/commands/commands_public.cpp b/src/mongo/s/commands/commands_public.cpp index 098aa63376c..09b94d8f0ae 100644 --- a/src/mongo/s/commands/commands_public.cpp +++ b/src/mongo/s/commands/commands_public.cpp @@ -42,6 +42,10 @@ #include "mongo/db/commands/copydb.h" #include "mongo/db/commands/rename_collection.h" #include "mongo/db/lasterror.h" +#include "mongo/db/matcher/extensions_callback_noop.h" +#include "mongo/db/query/parsed_distinct.h" +#include "mongo/db/query/view_response_formatter.h" +#include "mongo/db/views/resolved_view.h" #include "mongo/executor/task_executor_pool.h" #include "mongo/rpc/get_status_from_command_result.h" #include "mongo/s/catalog/catalog_cache.h" @@ -1124,7 +1128,45 @@ public: shared_ptr<DBConfig> conf = status.getValue(); if (!conf->isShardingEnabled() || !conf->isSharded(fullns)) { - return passthrough(txn, conf.get(), cmdObj, options, result); + + if (passthrough(txn, conf.get(), cmdObj, options, result)) { + return true; + } + + BSONObj resultObj = result.asTempObj(); + if (ResolvedView::isResolvedViewErrorResponse(resultObj)) { + auto resolvedView = ResolvedView::fromBSON(resultObj); + result.resetToEmpty(); + + auto parsedDistinct = ParsedDistinct::parse( + txn, resolvedView.getNamespace(), cmdObj, ExtensionsCallbackNoop(), false); + if (!parsedDistinct.isOK()) { + return appendCommandStatus(result, parsedDistinct.getStatus()); + } + + auto aggCmdOnView = parsedDistinct.getValue().asAggregationCommand(); + if (!aggCmdOnView.isOK()) { + return appendCommandStatus(result, aggCmdOnView.getStatus()); + } + + auto aggCmd = resolvedView.asExpandedViewAggregation(aggCmdOnView.getValue()); + if (!aggCmd.isOK()) { + return appendCommandStatus(result, aggCmd.getStatus()); + } + + BSONObjBuilder aggResult; + Command::findCommand("aggregate") + ->run(txn, dbName, aggCmd.getValue(), options, errmsg, aggResult); + + ViewResponseFormatter formatter(aggResult.obj()); + auto formatStatus = formatter.appendAsDistinctResponse(&result); + if (!formatStatus.isOK()) { + return appendCommandStatus(result, formatStatus); + } + return true; + } + + return false; } shared_ptr<ChunkManager> cm = conf->getChunkManager(txn, fullns); @@ -1210,6 +1252,34 @@ public: long long millisElapsed = timer.millis(); + if (shardResults.size() == 1 && + ResolvedView::isResolvedViewErrorResponse(shardResults[0].result)) { + auto resolvedView = ResolvedView::fromBSON(shardResults[0].result); + auto parsedDistinct = ParsedDistinct::parse( + txn, resolvedView.getNamespace(), cmdObj, ExtensionsCallbackNoop(), true); + if (!parsedDistinct.isOK()) { + return parsedDistinct.getStatus(); + } + + auto aggCmdOnView = parsedDistinct.getValue().asAggregationCommand(); + if (!aggCmdOnView.isOK()) { + return aggCmdOnView.getStatus(); + } + + auto aggCmd = resolvedView.asExpandedViewAggregation(aggCmdOnView.getValue()); + if (!aggCmd.isOK()) { + return aggCmd.getStatus(); + } + + std::string errMsg; + if (Command::findCommand("aggregate") + ->run(txn, dbname, aggCmd.getValue(), 0, errMsg, *out)) { + return Status::OK(); + } + + return getStatusFromCommandResult(out->asTempObj()); + } + const char* mongosStageName = ClusterExplain::getStageNameForReadOp(shardResults, cmdObj); return ClusterExplain::buildExplainResult( diff --git a/src/mongo/s/commands/strategy.cpp b/src/mongo/s/commands/strategy.cpp index 4bfccd4ce2c..d121c36346d 100644 --- a/src/mongo/s/commands/strategy.cpp +++ b/src/mongo/s/commands/strategy.cpp @@ -52,6 +52,8 @@ #include "mongo/db/query/query_request.h" #include "mongo/db/server_parameters.h" #include "mongo/db/stats/counters.h" +#include "mongo/db/views/resolved_view.h" +#include "mongo/rpc/get_status_from_command_result.h" #include "mongo/rpc/metadata/server_selection_metadata.h" #include "mongo/s/bson_serializable.h" #include "mongo/s/catalog/catalog_cache.h" @@ -192,7 +194,18 @@ void Strategy::queryOp(OperationContext* txn, Request& request) { // 0 means the cursor is exhausted. Otherwise we assume that a cursor with the returned id can // be retrieved via the ClusterCursorManager. - auto cursorId = ClusterFind::runQuery(txn, *canonicalQuery.getValue(), readPreference, &batch); + auto cursorId = + ClusterFind::runQuery(txn, + *canonicalQuery.getValue(), + readPreference, + &batch, + nullptr /*Argument is for views which OP_QUERY doesn't support*/); + + if (!cursorId.isOK() && + cursorId.getStatus() == ErrorCodes::CommandOnShardedViewNotSupportedOnMongod) { + uasserted(40247, "OP_QUERY not supported on views"); + } + uassertStatusOK(cursorId.getStatus()); // Fill out the response buffer. @@ -530,6 +543,12 @@ Status Strategy::explainFind(OperationContext* txn, long long millisElapsed = timer.millis(); + if (shardResults.size() == 1 && + ResolvedView::isResolvedViewErrorResponse(shardResults[0].result)) { + out->append("resolvedView", shardResults[0].result.getObjectField("resolvedView")); + return getStatusFromCommandResult(shardResults[0].result); + } + const char* mongosStageName = ClusterExplain::getStageNameForReadOp(shardResults, findCommand); return ClusterExplain::buildExplainResult( diff --git a/src/mongo/s/query/async_results_merger.cpp b/src/mongo/s/query/async_results_merger.cpp index 2d1919769ad..31d9337b10f 100644 --- a/src/mongo/s/query/async_results_merger.cpp +++ b/src/mongo/s/query/async_results_merger.cpp @@ -182,7 +182,7 @@ bool AsyncResultsMerger::readyUnsorted_inlock() { return allExhausted; } -StatusWith<boost::optional<BSONObj>> AsyncResultsMerger::nextReady() { +StatusWith<ClusterQueryResult> AsyncResultsMerger::nextReady() { stdx::lock_guard<stdx::mutex> lk(_mutex); dassert(ready_inlock()); if (_lifecycleState != kAlive) { @@ -195,19 +195,19 @@ StatusWith<boost::optional<BSONObj>> AsyncResultsMerger::nextReady() { if (_eofNext) { _eofNext = false; - return {boost::none}; + return {ClusterQueryResult()}; } const bool hasSort = !_params.sort.isEmpty(); return hasSort ? nextReadySorted() : nextReadyUnsorted(); } -boost::optional<BSONObj> AsyncResultsMerger::nextReadySorted() { +ClusterQueryResult AsyncResultsMerger::nextReadySorted() { // Tailable cursors cannot have a sort. invariant(!_params.isTailable); if (_mergeQueue.empty()) { - return boost::none; + return {}; } size_t smallestRemote = _mergeQueue.top(); @@ -216,7 +216,7 @@ boost::optional<BSONObj> AsyncResultsMerger::nextReadySorted() { invariant(!_remotes[smallestRemote].docBuffer.empty()); invariant(_remotes[smallestRemote].status.isOK()); - BSONObj front = _remotes[smallestRemote].docBuffer.front(); + ClusterQueryResult front = _remotes[smallestRemote].docBuffer.front(); _remotes[smallestRemote].docBuffer.pop(); // Re-populate the merging queue with the next result from 'smallestRemote', if it has a @@ -228,14 +228,14 @@ boost::optional<BSONObj> AsyncResultsMerger::nextReadySorted() { return front; } -boost::optional<BSONObj> AsyncResultsMerger::nextReadyUnsorted() { +ClusterQueryResult AsyncResultsMerger::nextReadyUnsorted() { size_t remotesAttempted = 0; while (remotesAttempted < _remotes.size()) { // It is illegal to call this method if there is an error received from any shard. invariant(_remotes[_gettingFromRemote].status.isOK()); if (_remotes[_gettingFromRemote].hasNext()) { - BSONObj front = _remotes[_gettingFromRemote].docBuffer.front(); + ClusterQueryResult front = _remotes[_gettingFromRemote].docBuffer.front(); _remotes[_gettingFromRemote].docBuffer.pop(); if (_params.isTailable && !_remotes[_gettingFromRemote].hasNext()) { @@ -255,7 +255,7 @@ boost::optional<BSONObj> AsyncResultsMerger::nextReadyUnsorted() { } } - return boost::none; + return {}; } Status AsyncResultsMerger::askForNextBatch_inlock(size_t remoteIndex) { @@ -436,6 +436,38 @@ void AsyncResultsMerger::handleBatchResponse( : cbData.response.getStatus()); if (!cursorResponseStatus.isOK()) { + // In the case a read is performed against a view, the shard primary can return an error + // indicating that the underlying collection may be sharded. When this occurs the return + // message will include an expanded view definition and collection namespace which we need + // to store. This allows for a second attempt at the read directly against the underlying + // collection. + if (cursorResponseStatus.getStatus() == + ErrorCodes::CommandOnShardedViewNotSupportedOnMongod) { + auto& responseObj = cbData.response.getValue().data; + if (!responseObj.hasField("resolvedView")) { + remote.status = Status(ErrorCodes::InternalError, + str::stream() << "Missing field 'resolvedView' in document: " + << responseObj); + return; + } + + auto resolvedViewObj = responseObj.getObjectField("resolvedView"); + if (resolvedViewObj.isEmpty()) { + remote.status = Status(ErrorCodes::InternalError, + str::stream() << "Field 'resolvedView' must be an object: " + << responseObj); + return; + } + + ClusterQueryResult result; + result.setViewDefinition(resolvedViewObj.getOwned()); + + remote.docBuffer.push(result); + remote.cursorId = 0; + remote.status = Status::OK(); + return; + } + auto shard = remote.getShard(); if (!shard) { remote.status = Status(cursorResponseStatus.getStatus().code(), @@ -479,7 +511,7 @@ void AsyncResultsMerger::handleBatchResponse( remote.status = Status::OK(); // Clear the results buffer and cursor id. - std::queue<BSONObj> emptyBuffer; + std::queue<ClusterQueryResult> emptyBuffer; std::swap(remote.docBuffer, emptyBuffer); remote.cursorId = 0; } @@ -504,7 +536,8 @@ void AsyncResultsMerger::handleBatchResponse( return; } - remote.docBuffer.push(obj); + ClusterQueryResult result(obj); + remote.docBuffer.push(result); ++remote.fetchedCount; } @@ -677,11 +710,11 @@ std::shared_ptr<Shard> AsyncResultsMerger::RemoteCursorData::getShard() { // bool AsyncResultsMerger::MergingComparator::operator()(const size_t& lhs, const size_t& rhs) { - const BSONObj& leftDoc = _remotes[lhs].docBuffer.front(); - const BSONObj& rightDoc = _remotes[rhs].docBuffer.front(); + const ClusterQueryResult& leftDoc = _remotes[lhs].docBuffer.front(); + const ClusterQueryResult& rightDoc = _remotes[rhs].docBuffer.front(); - BSONObj leftDocKey = leftDoc[ClusterClientCursorParams::kSortKeyField].Obj(); - BSONObj rightDocKey = rightDoc[ClusterClientCursorParams::kSortKeyField].Obj(); + BSONObj leftDocKey = (*leftDoc.getResult())[ClusterClientCursorParams::kSortKeyField].Obj(); + BSONObj rightDocKey = (*rightDoc.getResult())[ClusterClientCursorParams::kSortKeyField].Obj(); // This does not need to sort with a collator, since mongod has already mapped strings to their // ICU comparison keys as part of the $sortKey meta projection. diff --git a/src/mongo/s/query/async_results_merger.h b/src/mongo/s/query/async_results_merger.h index 8956dca52c8..85a3fbd30ed 100644 --- a/src/mongo/s/query/async_results_merger.h +++ b/src/mongo/s/query/async_results_merger.h @@ -38,6 +38,7 @@ #include "mongo/db/cursor_id.h" #include "mongo/executor/task_executor.h" #include "mongo/s/query/cluster_client_cursor_params.h" +#include "mongo/s/query/cluster_query_result.h" #include "mongo/stdx/mutex.h" #include "mongo/util/net/hostandport.h" #include "mongo/util/time_support.h" @@ -128,10 +129,10 @@ public: * status. * * If this AsyncResultsMerger is fetching results from a remote cursor tailing a capped - * collection, may return boost::none before end-of-stream. (Tailable cursors remain open even - * when there are no further results, and may subsequently return more results when they become - * available.) The calling code is responsible for handling multiple boost::none return values, - * keeping the cursor open in the tailable case. + * collection, may return an empty ClusterQueryResult before end-of-stream. (Tailable cursors + * remain open even when there are no further results, and may subsequently return more results + * when they become available.) The calling code is responsible for handling multiple empty, + * ClusterQueryResult return values, keeping the cursor open in the tailable case. * * If there has been an error received from one of the shards, or there is an error in * processing results from a shard, then a non-ok status is returned. @@ -139,7 +140,7 @@ public: * Invalid to call unless ready() has returned true (i.e., invalid to call if getting the next * result requires scheduling remote work). */ - StatusWith<boost::optional<BSONObj>> nextReady(); + StatusWith<ClusterQueryResult> nextReady(); /** * Schedules remote work as required in order to make further results available. If there is an @@ -234,7 +235,7 @@ private: // established but is now exhausted, this member will be set to zero. boost::optional<CursorId> cursorId; - std::queue<BSONObj> docBuffer; + std::queue<ClusterQueryResult> docBuffer; executor::TaskExecutor::CallbackHandle cbHandle; Status status = Status::OK(); @@ -309,8 +310,8 @@ private: // Helpers for nextReady(). // - boost::optional<BSONObj> nextReadySorted(); - boost::optional<BSONObj> nextReadyUnsorted(); + ClusterQueryResult nextReadySorted(); + ClusterQueryResult nextReadyUnsorted(); /** * When nextEvent() schedules remote work, it passes this method as a callback. The TaskExecutor diff --git a/src/mongo/s/query/async_results_merger_test.cpp b/src/mongo/s/query/async_results_merger_test.cpp index e2957554ad2..a5179f16c68 100644 --- a/src/mongo/s/query/async_results_merger_test.cpp +++ b/src/mongo/s/query/async_results_merger_test.cpp @@ -141,6 +141,24 @@ protected: } /** + * Schedules a "CommandOnShardedViewNotSupportedOnMongod" error response w/ view definition. + */ + void scheduleNetworkViewResponse(const std::string& ns, const std::string& pipelineJsonArr) { + BSONObjBuilder viewDefBob; + viewDefBob.append("ns", ns); + viewDefBob.append("pipeline", fromjson(pipelineJsonArr)); + + BSONObjBuilder bob; + bob.append("resolvedView", viewDefBob.obj()); + bob.append("ok", 0.0); + bob.append("errmsg", "Command on view must be executed by mongos"); + bob.append("code", 169); + + std::vector<BSONObj> batch = {bob.obj()}; + scheduleNetworkResponseObjs(batch); + } + + /** * Schedules a list of cursor responses to be returned by the mock network. */ void scheduleNetworkResponses(std::vector<CursorResponse> responses, @@ -235,19 +253,19 @@ TEST_F(AsyncResultsMergerTest, ClusterFind) { ASSERT_TRUE(arm->remotesExhausted()); ASSERT_TRUE(arm->ready()); - ASSERT_EQ(fromjson("{_id: 1}"), *unittest::assertGet(arm->nextReady())); + ASSERT_EQ(fromjson("{_id: 1}"), *unittest::assertGet(arm->nextReady()).getResult()); ASSERT_TRUE(arm->ready()); - ASSERT_EQ(fromjson("{_id: 2}"), *unittest::assertGet(arm->nextReady())); + ASSERT_EQ(fromjson("{_id: 2}"), *unittest::assertGet(arm->nextReady()).getResult()); ASSERT_TRUE(arm->ready()); - ASSERT_EQ(fromjson("{_id: 3}"), *unittest::assertGet(arm->nextReady())); + ASSERT_EQ(fromjson("{_id: 3}"), *unittest::assertGet(arm->nextReady()).getResult()); ASSERT_TRUE(arm->ready()); - ASSERT_EQ(fromjson("{_id: 4}"), *unittest::assertGet(arm->nextReady())); + ASSERT_EQ(fromjson("{_id: 4}"), *unittest::assertGet(arm->nextReady()).getResult()); ASSERT_TRUE(arm->ready()); - ASSERT_EQ(fromjson("{_id: 5}"), *unittest::assertGet(arm->nextReady())); + ASSERT_EQ(fromjson("{_id: 5}"), *unittest::assertGet(arm->nextReady()).getResult()); ASSERT_TRUE(arm->ready()); - ASSERT_EQ(fromjson("{_id: 6}"), *unittest::assertGet(arm->nextReady())); + ASSERT_EQ(fromjson("{_id: 6}"), *unittest::assertGet(arm->nextReady()).getResult()); ASSERT_TRUE(arm->ready()); - ASSERT(!unittest::assertGet(arm->nextReady())); + ASSERT_TRUE(unittest::assertGet(arm->nextReady()).isEOF()); } TEST_F(AsyncResultsMergerTest, ClusterFindAndGetMore) { @@ -270,17 +288,17 @@ TEST_F(AsyncResultsMergerTest, ClusterFindAndGetMore) { ASSERT_FALSE(arm->remotesExhausted()); ASSERT_TRUE(arm->ready()); - ASSERT_EQ(fromjson("{_id: 1}"), *unittest::assertGet(arm->nextReady())); + ASSERT_EQ(fromjson("{_id: 1}"), *unittest::assertGet(arm->nextReady()).getResult()); ASSERT_TRUE(arm->ready()); - ASSERT_EQ(fromjson("{_id: 2}"), *unittest::assertGet(arm->nextReady())); + ASSERT_EQ(fromjson("{_id: 2}"), *unittest::assertGet(arm->nextReady()).getResult()); ASSERT_TRUE(arm->ready()); - ASSERT_EQ(fromjson("{_id: 3}"), *unittest::assertGet(arm->nextReady())); + ASSERT_EQ(fromjson("{_id: 3}"), *unittest::assertGet(arm->nextReady()).getResult()); ASSERT_TRUE(arm->ready()); - ASSERT_EQ(fromjson("{_id: 4}"), *unittest::assertGet(arm->nextReady())); + ASSERT_EQ(fromjson("{_id: 4}"), *unittest::assertGet(arm->nextReady()).getResult()); ASSERT_TRUE(arm->ready()); - ASSERT_EQ(fromjson("{_id: 5}"), *unittest::assertGet(arm->nextReady())); + ASSERT_EQ(fromjson("{_id: 5}"), *unittest::assertGet(arm->nextReady()).getResult()); ASSERT_TRUE(arm->ready()); - ASSERT_EQ(fromjson("{_id: 6}"), *unittest::assertGet(arm->nextReady())); + ASSERT_EQ(fromjson("{_id: 6}"), *unittest::assertGet(arm->nextReady()).getResult()); ASSERT_FALSE(arm->ready()); readyEvent = unittest::assertGet(arm->nextEvent()); @@ -300,13 +318,13 @@ TEST_F(AsyncResultsMergerTest, ClusterFindAndGetMore) { ASSERT_FALSE(arm->remotesExhausted()); ASSERT_TRUE(arm->ready()); - ASSERT_EQ(fromjson("{_id: 10}"), *unittest::assertGet(arm->nextReady())); + ASSERT_EQ(fromjson("{_id: 10}"), *unittest::assertGet(arm->nextReady()).getResult()); ASSERT_TRUE(arm->ready()); - ASSERT_EQ(fromjson("{_id: 7}"), *unittest::assertGet(arm->nextReady())); + ASSERT_EQ(fromjson("{_id: 7}"), *unittest::assertGet(arm->nextReady()).getResult()); ASSERT_TRUE(arm->ready()); - ASSERT_EQ(fromjson("{_id: 8}"), *unittest::assertGet(arm->nextReady())); + ASSERT_EQ(fromjson("{_id: 8}"), *unittest::assertGet(arm->nextReady()).getResult()); ASSERT_TRUE(arm->ready()); - ASSERT_EQ(fromjson("{_id: 9}"), *unittest::assertGet(arm->nextReady())); + ASSERT_EQ(fromjson("{_id: 9}"), *unittest::assertGet(arm->nextReady()).getResult()); ASSERT_FALSE(arm->ready()); readyEvent = unittest::assertGet(arm->nextEvent()); @@ -322,9 +340,9 @@ TEST_F(AsyncResultsMergerTest, ClusterFindAndGetMore) { ASSERT_TRUE(arm->remotesExhausted()); ASSERT_TRUE(arm->ready()); - ASSERT_EQ(fromjson("{_id: 11}"), *unittest::assertGet(arm->nextReady())); + ASSERT_EQ(fromjson("{_id: 11}"), *unittest::assertGet(arm->nextReady()).getResult()); ASSERT_TRUE(arm->ready()); - ASSERT(!unittest::assertGet(arm->nextReady())); + ASSERT_TRUE(unittest::assertGet(arm->nextReady()).isEOF()); } TEST_F(AsyncResultsMergerTest, ClusterFindSorted) { @@ -349,19 +367,25 @@ TEST_F(AsyncResultsMergerTest, ClusterFindSorted) { executor()->waitForEvent(readyEvent); ASSERT_TRUE(arm->ready()); - ASSERT_EQ(fromjson("{_id: 3, $sortKey: {'': 3}}"), *unittest::assertGet(arm->nextReady())); + ASSERT_EQ(fromjson("{_id: 3, $sortKey: {'': 3}}"), + *unittest::assertGet(arm->nextReady()).getResult()); ASSERT_TRUE(arm->ready()); - ASSERT_EQ(fromjson("{_id: 4, $sortKey: {'': 4}}"), *unittest::assertGet(arm->nextReady())); + ASSERT_EQ(fromjson("{_id: 4, $sortKey: {'': 4}}"), + *unittest::assertGet(arm->nextReady()).getResult()); ASSERT_TRUE(arm->ready()); - ASSERT_EQ(fromjson("{_id: 5, $sortKey: {'': 5}}"), *unittest::assertGet(arm->nextReady())); + ASSERT_EQ(fromjson("{_id: 5, $sortKey: {'': 5}}"), + *unittest::assertGet(arm->nextReady()).getResult()); ASSERT_TRUE(arm->ready()); - ASSERT_EQ(fromjson("{_id: 6, $sortKey: {'': 6}}"), *unittest::assertGet(arm->nextReady())); + ASSERT_EQ(fromjson("{_id: 6, $sortKey: {'': 6}}"), + *unittest::assertGet(arm->nextReady()).getResult()); ASSERT_TRUE(arm->ready()); - ASSERT_EQ(fromjson("{_id: 8, $sortKey: {'': 8}}"), *unittest::assertGet(arm->nextReady())); + ASSERT_EQ(fromjson("{_id: 8, $sortKey: {'': 8}}"), + *unittest::assertGet(arm->nextReady()).getResult()); ASSERT_TRUE(arm->ready()); - ASSERT_EQ(fromjson("{_id: 9, $sortKey: {'': 9}}"), *unittest::assertGet(arm->nextReady())); + ASSERT_EQ(fromjson("{_id: 9, $sortKey: {'': 9}}"), + *unittest::assertGet(arm->nextReady()).getResult()); ASSERT_TRUE(arm->ready()); - ASSERT(!unittest::assertGet(arm->nextReady())); + ASSERT_TRUE(unittest::assertGet(arm->nextReady()).isEOF()); } TEST_F(AsyncResultsMergerTest, ClusterFindAndGetMoreSorted) { @@ -386,13 +410,13 @@ TEST_F(AsyncResultsMergerTest, ClusterFindAndGetMoreSorted) { executor()->waitForEvent(readyEvent); ASSERT_TRUE(arm->ready()); - ASSERT_EQ(fromjson("{$sortKey: {'': 3}}"), *unittest::assertGet(arm->nextReady())); + ASSERT_EQ(fromjson("{$sortKey: {'': 3}}"), *unittest::assertGet(arm->nextReady()).getResult()); ASSERT_TRUE(arm->ready()); - ASSERT_EQ(fromjson("{$sortKey: {'': 4}}"), *unittest::assertGet(arm->nextReady())); + ASSERT_EQ(fromjson("{$sortKey: {'': 4}}"), *unittest::assertGet(arm->nextReady()).getResult()); ASSERT_TRUE(arm->ready()); - ASSERT_EQ(fromjson("{$sortKey: {'': 5}}"), *unittest::assertGet(arm->nextReady())); + ASSERT_EQ(fromjson("{$sortKey: {'': 5}}"), *unittest::assertGet(arm->nextReady()).getResult()); ASSERT_TRUE(arm->ready()); - ASSERT_EQ(fromjson("{$sortKey: {'': 6}}"), *unittest::assertGet(arm->nextReady())); + ASSERT_EQ(fromjson("{$sortKey: {'': 6}}"), *unittest::assertGet(arm->nextReady()).getResult()); ASSERT_FALSE(arm->ready()); readyEvent = unittest::assertGet(arm->nextEvent()); @@ -407,11 +431,11 @@ TEST_F(AsyncResultsMergerTest, ClusterFindAndGetMoreSorted) { executor()->waitForEvent(readyEvent); ASSERT_TRUE(arm->ready()); - ASSERT_EQ(fromjson("{$sortKey: {'': 7}}"), *unittest::assertGet(arm->nextReady())); + ASSERT_EQ(fromjson("{$sortKey: {'': 7}}"), *unittest::assertGet(arm->nextReady()).getResult()); ASSERT_TRUE(arm->ready()); - ASSERT_EQ(fromjson("{$sortKey: {'': 7}}"), *unittest::assertGet(arm->nextReady())); + ASSERT_EQ(fromjson("{$sortKey: {'': 7}}"), *unittest::assertGet(arm->nextReady()).getResult()); ASSERT_TRUE(arm->ready()); - ASSERT_EQ(fromjson("{$sortKey: {'': 8}}"), *unittest::assertGet(arm->nextReady())); + ASSERT_EQ(fromjson("{$sortKey: {'': 8}}"), *unittest::assertGet(arm->nextReady()).getResult()); ASSERT_FALSE(arm->ready()); readyEvent = unittest::assertGet(arm->nextEvent()); @@ -426,13 +450,13 @@ TEST_F(AsyncResultsMergerTest, ClusterFindAndGetMoreSorted) { executor()->waitForEvent(readyEvent); ASSERT_TRUE(arm->ready()); - ASSERT_EQ(fromjson("{$sortKey: {'': 9}}"), *unittest::assertGet(arm->nextReady())); + ASSERT_EQ(fromjson("{$sortKey: {'': 9}}"), *unittest::assertGet(arm->nextReady()).getResult()); ASSERT_TRUE(arm->ready()); - ASSERT_EQ(fromjson("{$sortKey: {'': 10}}"), *unittest::assertGet(arm->nextReady())); + ASSERT_EQ(fromjson("{$sortKey: {'': 10}}"), *unittest::assertGet(arm->nextReady()).getResult()); ASSERT_TRUE(arm->ready()); - ASSERT_EQ(fromjson("{$sortKey: {'': 10}}"), *unittest::assertGet(arm->nextReady())); + ASSERT_EQ(fromjson("{$sortKey: {'': 10}}"), *unittest::assertGet(arm->nextReady()).getResult()); ASSERT_TRUE(arm->ready()); - ASSERT(!unittest::assertGet(arm->nextReady())); + ASSERT_TRUE(unittest::assertGet(arm->nextReady()).isEOF()); } TEST_F(AsyncResultsMergerTest, ClusterFindCompoundSortKey) { @@ -457,19 +481,25 @@ TEST_F(AsyncResultsMergerTest, ClusterFindCompoundSortKey) { executor()->waitForEvent(readyEvent); ASSERT_TRUE(arm->ready()); - ASSERT_EQ(fromjson("{$sortKey: {'': 10, '': 11}}"), *unittest::assertGet(arm->nextReady())); + ASSERT_EQ(fromjson("{$sortKey: {'': 10, '': 11}}"), + *unittest::assertGet(arm->nextReady()).getResult()); ASSERT_TRUE(arm->ready()); - ASSERT_EQ(fromjson("{$sortKey: {'': 10, '': 12}}"), *unittest::assertGet(arm->nextReady())); + ASSERT_EQ(fromjson("{$sortKey: {'': 10, '': 12}}"), + *unittest::assertGet(arm->nextReady()).getResult()); ASSERT_TRUE(arm->ready()); - ASSERT_EQ(fromjson("{$sortKey: {'': 5, '': 9}}"), *unittest::assertGet(arm->nextReady())); + ASSERT_EQ(fromjson("{$sortKey: {'': 5, '': 9}}"), + *unittest::assertGet(arm->nextReady()).getResult()); ASSERT_TRUE(arm->ready()); - ASSERT_EQ(fromjson("{$sortKey: {'': 5, '': 9}}"), *unittest::assertGet(arm->nextReady())); + ASSERT_EQ(fromjson("{$sortKey: {'': 5, '': 9}}"), + *unittest::assertGet(arm->nextReady()).getResult()); ASSERT_TRUE(arm->ready()); - ASSERT_EQ(fromjson("{$sortKey: {'': 4, '': 4}}"), *unittest::assertGet(arm->nextReady())); + ASSERT_EQ(fromjson("{$sortKey: {'': 4, '': 4}}"), + *unittest::assertGet(arm->nextReady()).getResult()); ASSERT_TRUE(arm->ready()); - ASSERT_EQ(fromjson("{$sortKey: {'': 4, '': 20}}"), *unittest::assertGet(arm->nextReady())); + ASSERT_EQ(fromjson("{$sortKey: {'': 4, '': 20}}"), + *unittest::assertGet(arm->nextReady()).getResult()); ASSERT_TRUE(arm->ready()); - ASSERT(!unittest::assertGet(arm->nextReady())); + ASSERT_TRUE(unittest::assertGet(arm->nextReady()).isEOF()); } TEST_F(AsyncResultsMergerTest, ClusterFindSortedButNoSortKey) { @@ -528,7 +558,7 @@ TEST_F(AsyncResultsMergerTest, ClusterFindInitialBatchSizeIsZero) { executor()->waitForEvent(readyEvent); ASSERT_TRUE(arm->ready()); - ASSERT_EQ(fromjson("{_id: 1}"), *unittest::assertGet(arm->nextReady())); + ASSERT_EQ(fromjson("{_id: 1}"), *unittest::assertGet(arm->nextReady()).getResult()); ASSERT_FALSE(arm->ready()); readyEvent = unittest::assertGet(arm->nextEvent()); @@ -551,9 +581,40 @@ TEST_F(AsyncResultsMergerTest, ClusterFindInitialBatchSizeIsZero) { executor()->waitForEvent(readyEvent); ASSERT_TRUE(arm->ready()); - ASSERT_EQ(fromjson("{_id: 2}"), *unittest::assertGet(arm->nextReady())); + ASSERT_EQ(fromjson("{_id: 2}"), *unittest::assertGet(arm->nextReady()).getResult()); + ASSERT_TRUE(arm->ready()); + ASSERT_TRUE(unittest::assertGet(arm->nextReady()).isEOF()); +} + +TEST_F(AsyncResultsMergerTest, ReceivedViewDefinitionFromShard) { + BSONObj findCmd = fromjson("{find: 'testcoll'}"); + makeCursorFromFindCmd(findCmd, {kTestShardIds[0]}); + + ASSERT_FALSE(arm->ready()); + auto readyEvent = unittest::assertGet(arm->nextEvent()); + ASSERT_FALSE(arm->ready()); + + std::string inputNs = "views_sharded.coll"; + std::string inputPipeline = "[ { $match: { a: { $gte: 5.0 } } } ]"; + scheduleNetworkViewResponse(inputNs, inputPipeline); + + executor()->waitForEvent(readyEvent); + ASSERT_TRUE(arm->ready()); - ASSERT(!unittest::assertGet(arm->nextReady())); + + auto statusWithNext = arm->nextReady(); + ASSERT(statusWithNext.isOK()); + + auto viewDef = statusWithNext.getValue().getViewDefinition(); + ASSERT(viewDef); + + auto outputPipeline = (*viewDef)["pipeline"]; + ASSERT(!outputPipeline.eoo()); + ASSERT_EQ(fromjson(inputPipeline), outputPipeline.Obj()); + + auto outputNs = (*viewDef)["ns"]; + ASSERT(!outputNs.eoo()); + ASSERT_EQ(outputNs.String(), inputNs); } TEST_F(AsyncResultsMergerTest, ExistingCursors) { @@ -573,15 +634,15 @@ TEST_F(AsyncResultsMergerTest, ExistingCursors) { executor()->waitForEvent(readyEvent); ASSERT_TRUE(arm->ready()); - ASSERT_EQ(fromjson("{_id: 1}"), *unittest::assertGet(arm->nextReady())); + ASSERT_EQ(fromjson("{_id: 1}"), *unittest::assertGet(arm->nextReady()).getResult()); ASSERT_TRUE(arm->ready()); - ASSERT_EQ(fromjson("{_id: 2}"), *unittest::assertGet(arm->nextReady())); + ASSERT_EQ(fromjson("{_id: 2}"), *unittest::assertGet(arm->nextReady()).getResult()); ASSERT_TRUE(arm->ready()); - ASSERT_EQ(fromjson("{_id: 3}"), *unittest::assertGet(arm->nextReady())); + ASSERT_EQ(fromjson("{_id: 3}"), *unittest::assertGet(arm->nextReady()).getResult()); ASSERT_TRUE(arm->ready()); - ASSERT_EQ(fromjson("{_id: 4}"), *unittest::assertGet(arm->nextReady())); + ASSERT_EQ(fromjson("{_id: 4}"), *unittest::assertGet(arm->nextReady()).getResult()); ASSERT_TRUE(arm->ready()); - ASSERT(!unittest::assertGet(arm->nextReady())); + ASSERT(unittest::assertGet(arm->nextReady()).isEOF()); } @@ -603,13 +664,13 @@ TEST_F(AsyncResultsMergerTest, StreamResultsFromOneShardIfOtherDoesntRespond) { executor()->waitForEvent(readyEvent); ASSERT_TRUE(arm->ready()); - ASSERT_EQ(fromjson("{_id: 1}"), *unittest::assertGet(arm->nextReady())); + ASSERT_EQ(fromjson("{_id: 1}"), *unittest::assertGet(arm->nextReady()).getResult()); ASSERT_TRUE(arm->ready()); - ASSERT_EQ(fromjson("{_id: 2}"), *unittest::assertGet(arm->nextReady())); + ASSERT_EQ(fromjson("{_id: 2}"), *unittest::assertGet(arm->nextReady()).getResult()); ASSERT_TRUE(arm->ready()); - ASSERT_EQ(fromjson("{_id: 3}"), *unittest::assertGet(arm->nextReady())); + ASSERT_EQ(fromjson("{_id: 3}"), *unittest::assertGet(arm->nextReady()).getResult()); ASSERT_TRUE(arm->ready()); - ASSERT_EQ(fromjson("{_id: 4}"), *unittest::assertGet(arm->nextReady())); + ASSERT_EQ(fromjson("{_id: 4}"), *unittest::assertGet(arm->nextReady()).getResult()); ASSERT_FALSE(arm->ready()); readyEvent = unittest::assertGet(arm->nextEvent()); @@ -626,9 +687,9 @@ TEST_F(AsyncResultsMergerTest, StreamResultsFromOneShardIfOtherDoesntRespond) { executor()->waitForEvent(readyEvent); ASSERT_TRUE(arm->ready()); - ASSERT_EQ(fromjson("{_id: 5}"), *unittest::assertGet(arm->nextReady())); + ASSERT_EQ(fromjson("{_id: 5}"), *unittest::assertGet(arm->nextReady()).getResult()); ASSERT_TRUE(arm->ready()); - ASSERT_EQ(fromjson("{_id: 6}"), *unittest::assertGet(arm->nextReady())); + ASSERT_EQ(fromjson("{_id: 6}"), *unittest::assertGet(arm->nextReady()).getResult()); ASSERT_FALSE(arm->ready()); readyEvent = unittest::assertGet(arm->nextEvent()); @@ -643,9 +704,9 @@ TEST_F(AsyncResultsMergerTest, StreamResultsFromOneShardIfOtherDoesntRespond) { executor()->waitForEvent(readyEvent); ASSERT_TRUE(arm->ready()); - ASSERT_EQ(fromjson("{_id: 7}"), *unittest::assertGet(arm->nextReady())); + ASSERT_EQ(fromjson("{_id: 7}"), *unittest::assertGet(arm->nextReady()).getResult()); ASSERT_TRUE(arm->ready()); - ASSERT_EQ(fromjson("{_id: 8}"), *unittest::assertGet(arm->nextReady())); + ASSERT_EQ(fromjson("{_id: 8}"), *unittest::assertGet(arm->nextReady()).getResult()); // Kill cursor before deleting it, as the second remote cursor has not been exhausted. We don't // wait on 'killEvent' here, as the blackholed request's callback will only run on shutdown of @@ -670,11 +731,11 @@ TEST_F(AsyncResultsMergerTest, ErrorOnMismatchedCursorIds) { executor()->waitForEvent(readyEvent); ASSERT_TRUE(arm->ready()); - ASSERT_EQ(fromjson("{_id: 1}"), *unittest::assertGet(arm->nextReady())); + ASSERT_EQ(fromjson("{_id: 1}"), *unittest::assertGet(arm->nextReady()).getResult()); ASSERT_TRUE(arm->ready()); - ASSERT_EQ(fromjson("{_id: 2}"), *unittest::assertGet(arm->nextReady())); + ASSERT_EQ(fromjson("{_id: 2}"), *unittest::assertGet(arm->nextReady()).getResult()); ASSERT_TRUE(arm->ready()); - ASSERT_EQ(fromjson("{_id: 3}"), *unittest::assertGet(arm->nextReady())); + ASSERT_EQ(fromjson("{_id: 3}"), *unittest::assertGet(arm->nextReady()).getResult()); ASSERT_FALSE(arm->ready()); readyEvent = unittest::assertGet(arm->nextEvent()); @@ -769,11 +830,11 @@ TEST_F(AsyncResultsMergerTest, ErrorCantScheduleEventBeforeLastSignaled) { executor()->waitForEvent(readyEvent); ASSERT_TRUE(arm->ready()); - ASSERT_EQ(fromjson("{_id: 1}"), *unittest::assertGet(arm->nextReady())); + ASSERT_EQ(fromjson("{_id: 1}"), *unittest::assertGet(arm->nextReady()).getResult()); ASSERT_TRUE(arm->ready()); - ASSERT_EQ(fromjson("{_id: 2}"), *unittest::assertGet(arm->nextReady())); + ASSERT_EQ(fromjson("{_id: 2}"), *unittest::assertGet(arm->nextReady()).getResult()); ASSERT_TRUE(arm->ready()); - ASSERT(!unittest::assertGet(arm->nextReady())); + ASSERT_TRUE(unittest::assertGet(arm->nextReady()).isEOF()); // Required to kill the 'arm' on error before destruction. auto killEvent = arm->kill(); @@ -899,9 +960,9 @@ TEST_F(AsyncResultsMergerTest, KillOutstandingGetMore) { // First batch received. ASSERT_TRUE(arm->ready()); - ASSERT_EQ(fromjson("{_id: 1}"), *unittest::assertGet(arm->nextReady())); + ASSERT_EQ(fromjson("{_id: 1}"), *unittest::assertGet(arm->nextReady()).getResult()); ASSERT_TRUE(arm->ready()); - ASSERT_EQ(fromjson("{_id: 2}"), *unittest::assertGet(arm->nextReady())); + ASSERT_EQ(fromjson("{_id: 2}"), *unittest::assertGet(arm->nextReady()).getResult()); // This will schedule a getMore on cursor id 123. ASSERT_FALSE(arm->ready()); @@ -978,13 +1039,13 @@ TEST_F(AsyncResultsMergerTest, TailableBasic) { executor()->waitForEvent(readyEvent); ASSERT_TRUE(arm->ready()); - ASSERT_EQ(fromjson("{_id: 1}"), *unittest::assertGet(arm->nextReady())); + ASSERT_EQ(fromjson("{_id: 1}"), *unittest::assertGet(arm->nextReady()).getResult()); ASSERT_TRUE(arm->ready()); - ASSERT_EQ(fromjson("{_id: 2}"), *unittest::assertGet(arm->nextReady())); + ASSERT_EQ(fromjson("{_id: 2}"), *unittest::assertGet(arm->nextReady()).getResult()); - // In the tailable case, we expect boost::none after every batch. + // In the tailable case, we expect EOF after every batch. ASSERT_TRUE(arm->ready()); - ASSERT(!unittest::assertGet(arm->nextReady())); + ASSERT_TRUE(unittest::assertGet(arm->nextReady()).isEOF()); ASSERT_FALSE(arm->remotesExhausted()); ASSERT_FALSE(arm->ready()); @@ -1000,9 +1061,9 @@ TEST_F(AsyncResultsMergerTest, TailableBasic) { ASSERT_TRUE(arm->ready()); ASSERT_FALSE(arm->remotesExhausted()); - ASSERT_EQ(fromjson("{_id: 3}"), *unittest::assertGet(arm->nextReady())); + ASSERT_EQ(fromjson("{_id: 3}"), *unittest::assertGet(arm->nextReady()).getResult()); ASSERT_TRUE(arm->ready()); - ASSERT(!unittest::assertGet(arm->nextReady())); + ASSERT_TRUE(unittest::assertGet(arm->nextReady()).isEOF()); ASSERT_FALSE(arm->remotesExhausted()); auto killedEvent = arm->kill(); @@ -1027,7 +1088,7 @@ TEST_F(AsyncResultsMergerTest, TailableEmptyBatch) { // After receiving an empty batch, the ARM should return boost::none, but remotes should not be // marked as exhausted. ASSERT_TRUE(arm->ready()); - ASSERT(!unittest::assertGet(arm->nextReady())); + ASSERT_TRUE(unittest::assertGet(arm->nextReady()).isEOF()); ASSERT_FALSE(arm->remotesExhausted()); auto killedEvent = arm->kill(); @@ -1052,7 +1113,7 @@ TEST_F(AsyncResultsMergerTest, TailableExhaustedCursor) { // Afterwards, the ARM should return boost::none and remote cursors should be marked as // exhausted. ASSERT_TRUE(arm->ready()); - ASSERT(!unittest::assertGet(arm->nextReady())); + ASSERT_TRUE(unittest::assertGet(arm->nextReady()).isEOF()); ASSERT_TRUE(arm->remotesExhausted()); } @@ -1072,9 +1133,9 @@ TEST_F(AsyncResultsMergerTest, GetMoreBatchSizes) { executor()->waitForEvent(readyEvent); ASSERT_TRUE(arm->ready()); - ASSERT_EQ(fromjson("{_id: 1}"), *unittest::assertGet(arm->nextReady())); + ASSERT_EQ(fromjson("{_id: 1}"), *unittest::assertGet(arm->nextReady()).getResult()); ASSERT_TRUE(arm->ready()); - ASSERT_EQ(fromjson("{_id: 2}"), *unittest::assertGet(arm->nextReady())); + ASSERT_EQ(fromjson("{_id: 2}"), *unittest::assertGet(arm->nextReady()).getResult()); ASSERT_FALSE(arm->ready()); responses.clear(); @@ -1093,9 +1154,9 @@ TEST_F(AsyncResultsMergerTest, GetMoreBatchSizes) { executor()->waitForEvent(readyEvent); ASSERT_TRUE(arm->ready()); - ASSERT_EQ(fromjson("{_id: 3}"), *unittest::assertGet(arm->nextReady())); + ASSERT_EQ(fromjson("{_id: 3}"), *unittest::assertGet(arm->nextReady()).getResult()); ASSERT_TRUE(arm->ready()); - ASSERT(!unittest::assertGet(arm->nextReady())); + ASSERT_TRUE(unittest::assertGet(arm->nextReady()).isEOF()); } TEST_F(AsyncResultsMergerTest, SendsSecondaryOkAsMetadata) { @@ -1117,9 +1178,9 @@ TEST_F(AsyncResultsMergerTest, SendsSecondaryOkAsMetadata) { executor()->waitForEvent(readyEvent); ASSERT_TRUE(arm->ready()); - ASSERT_EQ(fromjson("{_id: 1}"), *unittest::assertGet(arm->nextReady())); + ASSERT_EQ(fromjson("{_id: 1}"), *unittest::assertGet(arm->nextReady()).getResult()); ASSERT_TRUE(arm->ready()); - ASSERT(!unittest::assertGet(arm->nextReady())); + ASSERT_TRUE(unittest::assertGet(arm->nextReady()).isEOF()); } TEST_F(AsyncResultsMergerTest, AllowPartialResults) { @@ -1145,9 +1206,9 @@ TEST_F(AsyncResultsMergerTest, AllowPartialResults) { executor()->waitForEvent(readyEvent); ASSERT_TRUE(arm->ready()); - ASSERT_EQ(fromjson("{_id: 1}"), *unittest::assertGet(arm->nextReady())); + ASSERT_EQ(fromjson("{_id: 1}"), *unittest::assertGet(arm->nextReady()).getResult()); ASSERT_TRUE(arm->ready()); - ASSERT_EQ(fromjson("{_id: 2}"), *unittest::assertGet(arm->nextReady())); + ASSERT_EQ(fromjson("{_id: 2}"), *unittest::assertGet(arm->nextReady()).getResult()); ASSERT_FALSE(arm->ready()); readyEvent = unittest::assertGet(arm->nextEvent()); @@ -1166,7 +1227,7 @@ TEST_F(AsyncResultsMergerTest, AllowPartialResults) { executor()->waitForEvent(readyEvent); ASSERT_TRUE(arm->ready()); - ASSERT_EQ(fromjson("{_id: 3}"), *unittest::assertGet(arm->nextReady())); + ASSERT_EQ(fromjson("{_id: 3}"), *unittest::assertGet(arm->nextReady()).getResult()); ASSERT_FALSE(arm->ready()); readyEvent = unittest::assertGet(arm->nextEvent()); @@ -1181,7 +1242,7 @@ TEST_F(AsyncResultsMergerTest, AllowPartialResults) { executor()->waitForEvent(readyEvent); ASSERT_TRUE(arm->ready()); - ASSERT(!unittest::assertGet(arm->nextReady())); + ASSERT_TRUE(unittest::assertGet(arm->nextReady()).isEOF()); } TEST_F(AsyncResultsMergerTest, AllowPartialResultsSingleNode) { @@ -1199,9 +1260,9 @@ TEST_F(AsyncResultsMergerTest, AllowPartialResultsSingleNode) { executor()->waitForEvent(readyEvent); ASSERT_TRUE(arm->ready()); - ASSERT_EQ(fromjson("{_id: 1}"), *unittest::assertGet(arm->nextReady())); + ASSERT_EQ(fromjson("{_id: 1}"), *unittest::assertGet(arm->nextReady()).getResult()); ASSERT_TRUE(arm->ready()); - ASSERT_EQ(fromjson("{_id: 2}"), *unittest::assertGet(arm->nextReady())); + ASSERT_EQ(fromjson("{_id: 2}"), *unittest::assertGet(arm->nextReady()).getResult()); ASSERT_FALSE(arm->ready()); readyEvent = unittest::assertGet(arm->nextEvent()); @@ -1211,7 +1272,7 @@ TEST_F(AsyncResultsMergerTest, AllowPartialResultsSingleNode) { // EOF. scheduleErrorResponse({ErrorCodes::AuthenticationFailed, "authentication failed"}); ASSERT_TRUE(arm->ready()); - ASSERT(!unittest::assertGet(arm->nextReady())); + ASSERT_TRUE(unittest::assertGet(arm->nextReady()).isEOF()); } TEST_F(AsyncResultsMergerTest, RetryOnNotMasterNoSlaveOkSingleNode) { @@ -1237,7 +1298,7 @@ TEST_F(AsyncResultsMergerTest, RetryOnNotMasterNoSlaveOkSingleNode) { executor()->waitForEvent(readyEvent); ASSERT_TRUE(arm->ready()); - ASSERT_EQ(fromjson("{_id: 1}"), *unittest::assertGet(arm->nextReady())); + ASSERT_EQ(fromjson("{_id: 1}"), *unittest::assertGet(arm->nextReady()).getResult()); ASSERT_TRUE(arm->remotesExhausted()); ASSERT_TRUE(arm->ready()); @@ -1300,7 +1361,7 @@ TEST_F(AsyncResultsMergerTest, RetryOnHostUnreachableAllowPartialResults) { ASSERT_TRUE(arm->ready()); ASSERT_TRUE(arm->ready()); - ASSERT_EQ(fromjson("{_id: 1}"), *unittest::assertGet(arm->nextReady())); + ASSERT_EQ(fromjson("{_id: 1}"), *unittest::assertGet(arm->nextReady()).getResult()); ASSERT_TRUE(arm->remotesExhausted()); ASSERT_TRUE(arm->ready()); @@ -1321,9 +1382,9 @@ TEST_F(AsyncResultsMergerTest, GetMoreRequestIncludesMaxTimeMS) { executor()->waitForEvent(readyEvent); ASSERT_TRUE(arm->ready()); - ASSERT_EQ(fromjson("{_id: 1}"), *unittest::assertGet(arm->nextReady())); + ASSERT_EQ(fromjson("{_id: 1}"), *unittest::assertGet(arm->nextReady()).getResult()); ASSERT_TRUE(arm->ready()); - ASSERT(!unittest::assertGet(arm->nextReady())); + ASSERT_TRUE(unittest::assertGet(arm->nextReady()).isEOF()); ASSERT_OK(arm->setAwaitDataTimeout(Milliseconds(789))); @@ -1346,9 +1407,9 @@ TEST_F(AsyncResultsMergerTest, GetMoreRequestIncludesMaxTimeMS) { executor()->waitForEvent(readyEvent); ASSERT_TRUE(arm->ready()); - ASSERT_EQ(fromjson("{_id: 2}"), *unittest::assertGet(arm->nextReady())); + ASSERT_EQ(fromjson("{_id: 2}"), *unittest::assertGet(arm->nextReady()).getResult()); ASSERT_TRUE(arm->ready()); - ASSERT(!unittest::assertGet(arm->nextReady())); + ASSERT_TRUE(unittest::assertGet(arm->nextReady()).isEOF()); } TEST_F(AsyncResultsMergerTest, GetMoreRequestWithoutTailableCantHaveMaxTime) { diff --git a/src/mongo/s/query/cluster_client_cursor.h b/src/mongo/s/query/cluster_client_cursor.h index 47f4e46f89a..c4bb1c9373e 100644 --- a/src/mongo/s/query/cluster_client_cursor.h +++ b/src/mongo/s/query/cluster_client_cursor.h @@ -31,6 +31,7 @@ #include <boost/optional.hpp> #include "mongo/db/jsobj.h" +#include "mongo/s/query/cluster_query_result.h" #include "mongo/util/time_support.h" namespace mongo { @@ -58,12 +59,12 @@ public: * Returns the next available result document (along with an ok status). May block waiting * for results from remote nodes. * - * If there are no further results, the end of the stream is indicated with boost::none and - * an ok status. + * If there are no further results, the end of the stream is indicated with an empty + * QueryResult and an ok status. * * A non-ok status is returned in case of any error. */ - virtual StatusWith<boost::optional<BSONObj>> next() = 0; + virtual StatusWith<ClusterQueryResult> next() = 0; /** * Must be called before destruction to abandon a not-yet-exhausted cursor. If next() has @@ -84,14 +85,15 @@ public: virtual long long getNumReturnedSoFar() const = 0; /** - * Stash the BSONObj so that it gets returned from the CCC on a later call to next(). + * Stash the ClusterQueryResult so that it gets returned from the CCC on a later call to + * next(). * * Queued documents are returned in FIFO order. The queued results are exhausted before * generating further results from the underlying mongos query stages. * * 'obj' must be owned BSON. */ - virtual void queueResult(const BSONObj& obj) = 0; + virtual void queueResult(const ClusterQueryResult& result) = 0; /** * Returns whether or not all the remote cursors underlying this cursor have been exhausted. diff --git a/src/mongo/s/query/cluster_client_cursor_impl.cpp b/src/mongo/s/query/cluster_client_cursor_impl.cpp index d37b78f2a58..1498164f9ac 100644 --- a/src/mongo/s/query/cluster_client_cursor_impl.cpp +++ b/src/mongo/s/query/cluster_client_cursor_impl.cpp @@ -72,17 +72,17 @@ ClusterClientCursorImpl::ClusterClientCursorImpl(executor::TaskExecutor* executo ClusterClientCursorImpl::ClusterClientCursorImpl(std::unique_ptr<RouterStageMock> root) : _root(std::move(root)) {} -StatusWith<boost::optional<BSONObj>> ClusterClientCursorImpl::next() { +StatusWith<ClusterQueryResult> ClusterClientCursorImpl::next() { // First return stashed results, if there are any. if (!_stash.empty()) { - BSONObj front = std::move(_stash.front()); + auto front = std::move(_stash.front()); _stash.pop(); ++_numReturnedSoFar; return {front}; } auto next = _root->next(); - if (next.isOK() && next.getValue()) { + if (next.isOK() && !next.getValue().isEOF()) { ++_numReturnedSoFar; } return next; @@ -100,9 +100,18 @@ long long ClusterClientCursorImpl::getNumReturnedSoFar() const { return _numReturnedSoFar; } -void ClusterClientCursorImpl::queueResult(const BSONObj& obj) { - invariant(obj.isOwned()); - _stash.push(obj); +void ClusterClientCursorImpl::queueResult(const ClusterQueryResult& result) { + auto resultObj = result.getResult(); + if (resultObj) { + invariant(resultObj->isOwned()); + } + + auto viewDef = result.getViewDefinition(); + if (viewDef) { + invariant(viewDef->isOwned()); + } + + _stash.push(result); } bool ClusterClientCursorImpl::remotesExhausted() { diff --git a/src/mongo/s/query/cluster_client_cursor_impl.h b/src/mongo/s/query/cluster_client_cursor_impl.h index d8645bb7834..71f018deec7 100644 --- a/src/mongo/s/query/cluster_client_cursor_impl.h +++ b/src/mongo/s/query/cluster_client_cursor_impl.h @@ -34,6 +34,7 @@ #include "mongo/executor/task_executor.h" #include "mongo/s/query/cluster_client_cursor.h" #include "mongo/s/query/cluster_client_cursor_params.h" +#include "mongo/s/query/cluster_query_result.h" #include "mongo/s/query/router_exec_stage.h" #include "mongo/util/net/hostandport.h" @@ -91,7 +92,7 @@ public: */ ClusterClientCursorImpl(std::unique_ptr<RouterStageMock> root); - StatusWith<boost::optional<BSONObj>> next() final; + StatusWith<ClusterQueryResult> next() final; void kill() final; @@ -99,7 +100,7 @@ public: long long getNumReturnedSoFar() const final; - void queueResult(const BSONObj& obj) final; + void queueResult(const ClusterQueryResult& result) final; bool remotesExhausted() final; @@ -127,8 +128,8 @@ private: // The root stage of the pipeline used to return the result set, merged from the remote nodes. std::unique_ptr<RouterExecStage> _root; - // Stores documents queued by queueResult(). Stashed BSONObjs must be owned. - std::queue<BSONObj> _stash; + // Stores documents queued by queueResult(). BSONObjs within the stashed results must be owned. + std::queue<ClusterQueryResult> _stash; }; } // namespace mongo diff --git a/src/mongo/s/query/cluster_client_cursor_impl_test.cpp b/src/mongo/s/query/cluster_client_cursor_impl_test.cpp index 50b47f47817..06a193f3298 100644 --- a/src/mongo/s/query/cluster_client_cursor_impl_test.cpp +++ b/src/mongo/s/query/cluster_client_cursor_impl_test.cpp @@ -53,13 +53,13 @@ TEST(ClusterClientCursorImpl, NumReturnedSoFar) { for (int i = 1; i < 10; ++i) { auto result = cursor.next(); ASSERT(result.isOK()); - ASSERT_EQ(*result.getValue(), BSON("a" << i)); + ASSERT_EQ(*result.getValue().getResult(), BSON("a" << i)); ASSERT_EQ(cursor.getNumReturnedSoFar(), i); } // Now check that if nothing is fetched the getNumReturnedSoFar stays the same. auto result = cursor.next(); ASSERT_OK(result.getStatus()); - ASSERT_FALSE(result.getValue()); + ASSERT_TRUE(result.getValue().isEOF()); ASSERT_EQ(cursor.getNumReturnedSoFar(), 9LL); } @@ -72,34 +72,55 @@ TEST(ClusterClientCursorImpl, QueueResult) { auto firstResult = cursor.next(); ASSERT_OK(firstResult.getStatus()); - ASSERT(firstResult.getValue()); - ASSERT_EQ(*firstResult.getValue(), BSON("a" << 1)); + ASSERT(firstResult.getValue().getResult()); + ASSERT_EQ(*firstResult.getValue().getResult(), BSON("a" << 1)); cursor.queueResult(BSON("a" << 2)); cursor.queueResult(BSON("a" << 3)); auto secondResult = cursor.next(); ASSERT_OK(secondResult.getStatus()); - ASSERT(secondResult.getValue()); - ASSERT_EQ(*secondResult.getValue(), BSON("a" << 2)); + ASSERT(secondResult.getValue().getResult()); + ASSERT_EQ(*secondResult.getValue().getResult(), BSON("a" << 2)); auto thirdResult = cursor.next(); ASSERT_OK(thirdResult.getStatus()); - ASSERT(thirdResult.getValue()); - ASSERT_EQ(*thirdResult.getValue(), BSON("a" << 3)); + ASSERT(thirdResult.getValue().getResult()); + ASSERT_EQ(*thirdResult.getValue().getResult(), BSON("a" << 3)); auto fourthResult = cursor.next(); ASSERT_OK(fourthResult.getStatus()); - ASSERT(fourthResult.getValue()); - ASSERT_EQ(*fourthResult.getValue(), BSON("a" << 4)); + ASSERT(fourthResult.getValue().getResult()); + ASSERT_EQ(*fourthResult.getValue().getResult(), BSON("a" << 4)); auto fifthResult = cursor.next(); ASSERT_OK(fifthResult.getStatus()); - ASSERT(!fifthResult.getValue()); + ASSERT(fifthResult.getValue().isEOF()); ASSERT_EQ(cursor.getNumReturnedSoFar(), 4LL); } +TEST(ClusterClientCursorImpl, CursorPropagatesViewDefinition) { + auto mockStage = stdx::make_unique<RouterStageMock>(); + + auto viewDef = BSON("ns" + << "view_ns" + << "pipeline" + << BSON_ARRAY(BSON("$match" << BSONNULL))); + + ClusterQueryResult cqResult; + cqResult.setViewDefinition(viewDef); + mockStage->queueResult(cqResult); + + ClusterClientCursorImpl cursor(std::move(mockStage)); + + auto result = cursor.next(); + ASSERT_OK(result.getStatus()); + ASSERT(!result.getValue().getResult()); + ASSERT(result.getValue().getViewDefinition()); + ASSERT_EQ(*result.getValue().getViewDefinition(), viewDef); +} + TEST(ClusterClientCursorImpl, RemotesExhausted) { auto mockStage = stdx::make_unique<RouterStageMock>(); mockStage->queueResult(BSON("a" << 1)); @@ -111,19 +132,19 @@ TEST(ClusterClientCursorImpl, RemotesExhausted) { auto firstResult = cursor.next(); ASSERT_OK(firstResult.getStatus()); - ASSERT(firstResult.getValue()); - ASSERT_EQ(*firstResult.getValue(), BSON("a" << 1)); + ASSERT(firstResult.getValue().getResult()); + ASSERT_EQ(*firstResult.getValue().getResult(), BSON("a" << 1)); ASSERT_TRUE(cursor.remotesExhausted()); auto secondResult = cursor.next(); ASSERT_OK(secondResult.getStatus()); - ASSERT(secondResult.getValue()); - ASSERT_EQ(*secondResult.getValue(), BSON("a" << 2)); + ASSERT(secondResult.getValue().getResult()); + ASSERT_EQ(*secondResult.getValue().getResult(), BSON("a" << 2)); ASSERT_TRUE(cursor.remotesExhausted()); auto thirdResult = cursor.next(); ASSERT_OK(thirdResult.getStatus()); - ASSERT(!thirdResult.getValue()); + ASSERT_TRUE(thirdResult.getValue().isEOF()); ASSERT_TRUE(cursor.remotesExhausted()); ASSERT_EQ(cursor.getNumReturnedSoFar(), 2LL); diff --git a/src/mongo/s/query/cluster_client_cursor_mock.cpp b/src/mongo/s/query/cluster_client_cursor_mock.cpp index 2b10928449e..bd4136ab7f2 100644 --- a/src/mongo/s/query/cluster_client_cursor_mock.cpp +++ b/src/mongo/s/query/cluster_client_cursor_mock.cpp @@ -43,12 +43,12 @@ ClusterClientCursorMock::~ClusterClientCursorMock() { invariant(_exhausted || _killed); } -StatusWith<boost::optional<BSONObj>> ClusterClientCursorMock::next() { +StatusWith<ClusterQueryResult> ClusterClientCursorMock::next() { invariant(!_killed); if (_resultsQueue.empty()) { _exhausted = true; - return {boost::none}; + return {ClusterQueryResult()}; } auto out = _resultsQueue.front(); @@ -59,7 +59,7 @@ StatusWith<boost::optional<BSONObj>> ClusterClientCursorMock::next() { } ++_numReturnedSoFar; - return boost::optional<BSONObj>(out.getValue()); + return out.getValue(); } long long ClusterClientCursorMock::getNumReturnedSoFar() const { @@ -77,8 +77,8 @@ bool ClusterClientCursorMock::isTailable() const { return false; } -void ClusterClientCursorMock::queueResult(const BSONObj& obj) { - _resultsQueue.push({obj}); +void ClusterClientCursorMock::queueResult(const ClusterQueryResult& result) { + _resultsQueue.push({result}); } bool ClusterClientCursorMock::remotesExhausted() { diff --git a/src/mongo/s/query/cluster_client_cursor_mock.h b/src/mongo/s/query/cluster_client_cursor_mock.h index 3749a8abb19..d9e0ba789e3 100644 --- a/src/mongo/s/query/cluster_client_cursor_mock.h +++ b/src/mongo/s/query/cluster_client_cursor_mock.h @@ -43,7 +43,7 @@ public: ~ClusterClientCursorMock(); - StatusWith<boost::optional<BSONObj>> next() final; + StatusWith<ClusterQueryResult> next() final; void kill() final; @@ -51,7 +51,7 @@ public: long long getNumReturnedSoFar() const final; - void queueResult(const BSONObj& obj) final; + void queueResult(const ClusterQueryResult& result) final; Status setAwaitDataTimeout(Milliseconds awaitDataTimeout) final; @@ -73,7 +73,7 @@ public: private: bool _killed = false; bool _exhausted = false; - std::queue<StatusWith<BSONObj>> _resultsQueue; + std::queue<StatusWith<ClusterQueryResult>> _resultsQueue; stdx::function<void(void)> _killCallback; // Number of returned documents. diff --git a/src/mongo/s/query/cluster_cursor_manager.cpp b/src/mongo/s/query/cluster_cursor_manager.cpp index cca70a8dbdf..48d233239d9 100644 --- a/src/mongo/s/query/cluster_cursor_manager.cpp +++ b/src/mongo/s/query/cluster_cursor_manager.cpp @@ -110,7 +110,7 @@ ClusterCursorManager::PinnedCursor& ClusterCursorManager::PinnedCursor::operator return *this; } -StatusWith<boost::optional<BSONObj>> ClusterCursorManager::PinnedCursor::next() { +StatusWith<ClusterQueryResult> ClusterCursorManager::PinnedCursor::next() { invariant(_cursor); return _cursor->next(); } @@ -137,9 +137,9 @@ long long ClusterCursorManager::PinnedCursor::getNumReturnedSoFar() const { return _cursor->getNumReturnedSoFar(); } -void ClusterCursorManager::PinnedCursor::queueResult(const BSONObj& obj) { +void ClusterCursorManager::PinnedCursor::queueResult(const ClusterQueryResult& result) { invariant(_cursor); - _cursor->queueResult(obj); + _cursor->queueResult(result); } bool ClusterCursorManager::PinnedCursor::remotesExhausted() { diff --git a/src/mongo/s/query/cluster_cursor_manager.h b/src/mongo/s/query/cluster_cursor_manager.h index a1d4b28ba68..d8f8899c386 100644 --- a/src/mongo/s/query/cluster_cursor_manager.h +++ b/src/mongo/s/query/cluster_cursor_manager.h @@ -154,7 +154,7 @@ public: * * Can block. */ - StatusWith<boost::optional<BSONObj>> next(); + StatusWith<ClusterQueryResult> next(); /** * Returns whether or not the underlying cursor is tailing a capped collection. Cannot be @@ -185,7 +185,7 @@ public: /** * Stashes 'obj' to be returned later by this cursor. A cursor must be owned. */ - void queueResult(const BSONObj& obj); + void queueResult(const ClusterQueryResult& result); /** * Returns whether or not all the remote cursors underlying this cursor have been diff --git a/src/mongo/s/query/cluster_cursor_manager_test.cpp b/src/mongo/s/query/cluster_cursor_manager_test.cpp index ce65558f865..3cb88e9a813 100644 --- a/src/mongo/s/query/cluster_cursor_manager_test.cpp +++ b/src/mongo/s/query/cluster_cursor_manager_test.cpp @@ -117,11 +117,11 @@ TEST_F(ClusterCursorManagerTest, RegisterCursor) { ASSERT_OK(pinnedCursor.getStatus()); auto nextResult = pinnedCursor.getValue().next(); ASSERT_OK(nextResult.getStatus()); - ASSERT(nextResult.getValue()); - ASSERT_EQ(BSON("a" << 1), *nextResult.getValue()); + ASSERT(nextResult.getValue().getResult()); + ASSERT_EQ(BSON("a" << 1), *nextResult.getValue().getResult()); nextResult = pinnedCursor.getValue().next(); ASSERT_OK(nextResult.getStatus()); - ASSERT_FALSE(nextResult.getValue()); + ASSERT_TRUE(nextResult.getValue().isEOF()); } // Test that registering a cursor returns a non-zero cursor id. @@ -148,11 +148,11 @@ TEST_F(ClusterCursorManagerTest, CheckOutCursorBasic) { ASSERT_EQ(cursorId, checkedOutCursor.getValue().getCursorId()); auto nextResult = checkedOutCursor.getValue().next(); ASSERT_OK(nextResult.getStatus()); - ASSERT(nextResult.getValue()); - ASSERT_EQ(BSON("a" << 1), *nextResult.getValue()); + ASSERT(nextResult.getValue().getResult()); + ASSERT_EQ(BSON("a" << 1), *nextResult.getValue().getResult()); nextResult = checkedOutCursor.getValue().next(); ASSERT_OK(nextResult.getStatus()); - ASSERT_FALSE(nextResult.getValue()); + ASSERT_TRUE(nextResult.getValue().isEOF()); } // Test that checking out a cursor returns a pin to the correct cursor, when multiple cursors are @@ -174,11 +174,11 @@ TEST_F(ClusterCursorManagerTest, CheckOutCursorMultipleCursors) { ASSERT_OK(pinnedCursor.getStatus()); auto nextResult = pinnedCursor.getValue().next(); ASSERT_OK(nextResult.getStatus()); - ASSERT(nextResult.getValue()); - ASSERT_EQ(BSON("a" << i), *nextResult.getValue()); + ASSERT(nextResult.getValue().getResult()); + ASSERT_EQ(BSON("a" << i), *nextResult.getValue().getResult()); nextResult = pinnedCursor.getValue().next(); ASSERT_OK(nextResult.getStatus()); - ASSERT_FALSE(nextResult.getValue()); + ASSERT_TRUE(nextResult.getValue().isEOF()); } } diff --git a/src/mongo/s/query/cluster_find.cpp b/src/mongo/s/query/cluster_find.cpp index 57d6ece2c87..e4ebc311a1c 100644 --- a/src/mongo/s/query/cluster_find.cpp +++ b/src/mongo/s/query/cluster_find.cpp @@ -150,7 +150,8 @@ StatusWith<CursorId> runQueryWithoutRetrying(OperationContext* txn, const ReadPreferenceSetting& readPref, ChunkManager* chunkManager, std::shared_ptr<Shard> primary, - std::vector<BSONObj>* results) { + std::vector<BSONObj>* results, + BSONObj* viewDefinition) { auto shardRegistry = grid.shardRegistry(); // Get the set of shards on which we will run the query. @@ -235,7 +236,7 @@ StatusWith<CursorId> runQueryWithoutRetrying(OperationContext* txn, return next.getStatus(); } - if (!next.getValue()) { + if (next.getValue().isEOF()) { // We reached end-of-stream. If the cursor is not tailable, then we mark it as // exhausted. If it is tailable, usually we keep it open (i.e. "NotExhausted") even // when we reach end-of-stream. However, if all the remote cursors are exhausted, there @@ -246,17 +247,27 @@ StatusWith<CursorId> runQueryWithoutRetrying(OperationContext* txn, break; } + if (next.getValue().getViewDefinition()) { + if (viewDefinition) { + *viewDefinition = BSON("resolvedView" << *next.getValue().getViewDefinition()); + } + return {ErrorCodes::CommandOnShardedViewNotSupportedOnMongod, + "Find must be transformed for view and run against base collection"}; + } + + auto nextObj = *next.getValue().getResult(); + // If adding this object will cause us to exceed the message size limit, then we stash it // for later. - if (!FindCommon::haveSpaceForNext(*next.getValue(), results->size(), bytesBuffered)) { - ccc->queueResult(*next.getValue()); + if (!FindCommon::haveSpaceForNext(nextObj, results->size(), bytesBuffered)) { + ccc->queueResult(nextObj); break; } // Add doc to the batch. Account for the space overhead associated with returning this doc // inside a BSON array. - bytesBuffered += (next.getValue()->objsize() + kPerDocumentOverheadBytesUpperBound); - results->push_back(std::move(*next.getValue())); + bytesBuffered += (nextObj.objsize() + kPerDocumentOverheadBytesUpperBound); + results->push_back(std::move(nextObj)); } if (!query.getQueryRequest().wantMore() && !ccc->isTailable()) { @@ -287,7 +298,8 @@ const size_t ClusterFind::kMaxStaleConfigRetries = 10; StatusWith<CursorId> ClusterFind::runQuery(OperationContext* txn, const CanonicalQuery& query, const ReadPreferenceSetting& readPref, - std::vector<BSONObj>* results) { + std::vector<BSONObj>* results, + BSONObj* viewDefinition) { invariant(results); // Projection on the reserved sort key field is illegal in mongos. @@ -316,7 +328,7 @@ StatusWith<CursorId> ClusterFind::runQuery(OperationContext* txn, // shard version. for (size_t retries = 1; retries <= kMaxStaleConfigRetries; ++retries) { auto cursorId = runQueryWithoutRetrying( - txn, query, readPref, chunkManager.get(), std::move(primary), results); + txn, query, readPref, chunkManager.get(), std::move(primary), results, viewDefinition); if (cursorId.isOK()) { return cursorId; } @@ -387,7 +399,7 @@ StatusWith<CursorResponse> ClusterFind::runGetMore(OperationContext* txn, return next.getStatus(); } - if (!next.getValue()) { + if (next.getValue().isEOF()) { // We reached end-of-stream. if (!pinnedCursor.getValue().isTailable()) { cursorState = ClusterCursorManager::CursorState::Exhausted; @@ -395,15 +407,17 @@ StatusWith<CursorResponse> ClusterFind::runGetMore(OperationContext* txn, break; } - if (!FindCommon::haveSpaceForNext(*next.getValue(), batch.size(), bytesBuffered)) { - pinnedCursor.getValue().queueResult(*next.getValue()); + if (!FindCommon::haveSpaceForNext( + *next.getValue().getResult(), batch.size(), bytesBuffered)) { + pinnedCursor.getValue().queueResult(*next.getValue().getResult()); break; } // Add doc to the batch. Account for the space overhead associated with returning this doc // inside a BSON array. - bytesBuffered += (next.getValue()->objsize() + kPerDocumentOverheadBytesUpperBound); - batch.push_back(std::move(*next.getValue())); + bytesBuffered += + (next.getValue().getResult()->objsize() + kPerDocumentOverheadBytesUpperBound); + batch.push_back(std::move(*next.getValue().getResult())); } // Transfer ownership of the cursor back to the cursor manager. diff --git a/src/mongo/s/query/cluster_find.h b/src/mongo/s/query/cluster_find.h index 64821512ded..22d7ad89b04 100644 --- a/src/mongo/s/query/cluster_find.h +++ b/src/mongo/s/query/cluster_find.h @@ -63,11 +63,14 @@ public: * On success, fills out 'results' with the first batch of query results and returns the cursor * id which the caller can use on subsequent getMore operations. If no cursor needed to be saved * (e.g. the cursor was exhausted without need for a getMore), returns a cursor id of 0. + * If a CommandOnShardedViewNotSupportedOnMongod error is returned, then 'viewDefinition', if + * not null, will contain a view definition. */ static StatusWith<CursorId> runQuery(OperationContext* txn, const CanonicalQuery& query, const ReadPreferenceSetting& readPref, - std::vector<BSONObj>* results); + std::vector<BSONObj>* results, + BSONObj* viewDefinition); /** * Executes the getMore request 'request', and on success returns a CursorResponse. diff --git a/src/mongo/s/query/cluster_query_result.h b/src/mongo/s/query/cluster_query_result.h new file mode 100644 index 00000000000..43faee4392e --- /dev/null +++ b/src/mongo/s/query/cluster_query_result.h @@ -0,0 +1,72 @@ +/** + * Copyright (C) 2016 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include <boost/optional.hpp> + +#include "mongo/bson/bsonobj.h" + +namespace mongo { + +/** + * Holds a single result from a mongoS find command shard request. This result can represent one of + * several states: + * - Contains collection data, stored in '_resultObj'. + * - Contains a view definition, stored in '_viewDefinition'. + * - EOF. Both '_resultObj' and '_viewDefinition' are isEOF() returns true. + */ +class ClusterQueryResult { +public: + ClusterQueryResult() = default; + + ClusterQueryResult(BSONObj resObj) : _resultObj(resObj) {} + + bool isEOF() const { + return !_resultObj && !_viewDefinition; + } + + boost::optional<BSONObj> getResult() const { + return _resultObj; + } + + boost::optional<BSONObj> getViewDefinition() const { + return _viewDefinition; + } + + void setViewDefinition(BSONObj viewDef) { + invariant(isEOF()); + _viewDefinition = viewDef; + } + +private: + boost::optional<BSONObj> _resultObj; + boost::optional<BSONObj> _viewDefinition; +}; + +} // namespace mongo diff --git a/src/mongo/s/query/router_exec_stage.h b/src/mongo/s/query/router_exec_stage.h index 0e10d9edff2..726bd2df97b 100644 --- a/src/mongo/s/query/router_exec_stage.h +++ b/src/mongo/s/query/router_exec_stage.h @@ -33,6 +33,7 @@ #include "mongo/base/status_with.h" #include "mongo/bson/bsonobj.h" +#include "mongo/s/query/cluster_query_result.h" #include "mongo/util/time_support.h" namespace mongo { @@ -59,13 +60,13 @@ public: /** * Returns the next query result, or an error. * - * If there are no more results, returns boost::none. + * If there are no more results, returns an EOF ClusterQueryResult. * * All returned BSONObjs are owned. They may own a buffer larger than the object. If you are * holding on to a subset of the returned results and need to minimize memory usage, call copy() * on the BSONObjs. */ - virtual StatusWith<boost::optional<BSONObj>> next() = 0; + virtual StatusWith<ClusterQueryResult> next() = 0; /** * Must be called before destruction to abandon a not-yet-exhausted plan. May block waiting for diff --git a/src/mongo/s/query/router_stage_limit.cpp b/src/mongo/s/query/router_stage_limit.cpp index 5f7db02ec7b..4756423249c 100644 --- a/src/mongo/s/query/router_stage_limit.cpp +++ b/src/mongo/s/query/router_stage_limit.cpp @@ -39,9 +39,9 @@ RouterStageLimit::RouterStageLimit(std::unique_ptr<RouterExecStage> child, long invariant(limit > 0); } -StatusWith<boost::optional<BSONObj>> RouterStageLimit::next() { +StatusWith<ClusterQueryResult> RouterStageLimit::next() { if (_returnedSoFar >= _limit) { - return {boost::none}; + return {ClusterQueryResult()}; } auto childResult = getChildStage()->next(); @@ -49,7 +49,7 @@ StatusWith<boost::optional<BSONObj>> RouterStageLimit::next() { return childResult; } - if (childResult.getValue()) { + if (!childResult.getValue().isEOF()) { ++_returnedSoFar; } return childResult; diff --git a/src/mongo/s/query/router_stage_limit.h b/src/mongo/s/query/router_stage_limit.h index 366a964f2a4..8b29b56f291 100644 --- a/src/mongo/s/query/router_stage_limit.h +++ b/src/mongo/s/query/router_stage_limit.h @@ -39,7 +39,7 @@ class RouterStageLimit final : public RouterExecStage { public: RouterStageLimit(std::unique_ptr<RouterExecStage> child, long long limit); - StatusWith<boost::optional<BSONObj>> next() final; + StatusWith<ClusterQueryResult> next() final; void kill() final; diff --git a/src/mongo/s/query/router_stage_limit_test.cpp b/src/mongo/s/query/router_stage_limit_test.cpp index fd8fa335e7e..658b6b4d48d 100644 --- a/src/mongo/s/query/router_stage_limit_test.cpp +++ b/src/mongo/s/query/router_stage_limit_test.cpp @@ -42,25 +42,25 @@ namespace { TEST(RouterStageLimitTest, LimitIsOne) { auto mockStage = stdx::make_unique<RouterStageMock>(); - mockStage->queueResult(BSON("a" << 1)); - mockStage->queueResult(BSON("a" << 2)); - mockStage->queueResult(BSON("a" << 3)); + mockStage->queueResult({BSON("a" << 1)}); + mockStage->queueResult({BSON("a" << 2)}); + mockStage->queueResult({BSON("a" << 3)}); auto limitStage = stdx::make_unique<RouterStageLimit>(std::move(mockStage), 1); auto firstResult = limitStage->next(); ASSERT_OK(firstResult.getStatus()); - ASSERT(firstResult.getValue()); - ASSERT_EQ(*firstResult.getValue(), BSON("a" << 1)); + ASSERT(firstResult.getValue().getResult()); + ASSERT_EQ(*firstResult.getValue().getResult(), BSON("a" << 1)); auto secondResult = limitStage->next(); ASSERT_OK(secondResult.getStatus()); - ASSERT(!secondResult.getValue()); + ASSERT(!secondResult.getValue().getResult()); - // Once end-of-stream is reached, the limit stage should keep returning boost::none. + // Once end-of-stream is reached, the limit stage should keep returning no results. auto thirdResult = limitStage->next(); ASSERT_OK(thirdResult.getStatus()); - ASSERT(!thirdResult.getValue()); + ASSERT(!thirdResult.getValue().getResult()); } TEST(RouterStageLimitTest, LimitIsTwo) { @@ -73,17 +73,17 @@ TEST(RouterStageLimitTest, LimitIsTwo) { auto firstResult = limitStage->next(); ASSERT_OK(firstResult.getStatus()); - ASSERT(firstResult.getValue()); - ASSERT_EQ(*firstResult.getValue(), BSON("a" << 1)); + ASSERT(firstResult.getValue().getResult()); + ASSERT_EQ(*firstResult.getValue().getResult(), BSON("a" << 1)); auto secondResult = limitStage->next(); ASSERT_OK(secondResult.getStatus()); - ASSERT(secondResult.getValue()); - ASSERT_EQ(*secondResult.getValue(), BSON("a" << 2)); + ASSERT(firstResult.getValue().getResult()); + ASSERT_EQ(*secondResult.getValue().getResult(), BSON("a" << 2)); auto thirdResult = limitStage->next(); ASSERT_OK(thirdResult.getStatus()); - ASSERT(!thirdResult.getValue()); + ASSERT(!thirdResult.getValue().getResult()); } TEST(RouterStageLimitTest, LimitStagePropagatesError) { @@ -97,8 +97,8 @@ TEST(RouterStageLimitTest, LimitStagePropagatesError) { auto firstResult = limitStage->next(); ASSERT_OK(firstResult.getStatus()); - ASSERT(firstResult.getValue()); - ASSERT_EQ(*firstResult.getValue(), BSON("a" << 1)); + ASSERT(firstResult.getValue().getResult()); + ASSERT_EQ(*firstResult.getValue().getResult(), BSON("a" << 1)); auto secondResult = limitStage->next(); ASSERT_NOT_OK(secondResult.getStatus()); @@ -106,6 +106,27 @@ TEST(RouterStageLimitTest, LimitStagePropagatesError) { ASSERT_EQ(secondResult.getStatus().reason(), "bad thing happened"); } +TEST(RouterStageLimitTest, LimitStagePropagatesViewDefinition) { + auto mockStage = stdx::make_unique<RouterStageMock>(); + + auto viewDef = BSON("ns" + << "view_ns" + << "pipeline" + << BSON_ARRAY(BSON("$match" << BSONNULL))); + + ClusterQueryResult cqResult; + cqResult.setViewDefinition(viewDef); + mockStage->queueResult(cqResult); + + auto limitStage = stdx::make_unique<RouterStageLimit>(std::move(mockStage), 3); + + auto result = limitStage->next(); + ASSERT_OK(result.getStatus()); + ASSERT(!result.getValue().getResult()); + ASSERT(result.getValue().getViewDefinition()); + ASSERT_EQ(*result.getValue().getViewDefinition(), viewDef); +} + TEST(RouterStageLimitTest, LimitStageToleratesMidStreamEOF) { // Here we're mocking the tailable case, where there may be a boost::none returned before the // remote cursor is closed. Our goal is to make sure that the limit stage handles this properly, @@ -120,21 +141,21 @@ TEST(RouterStageLimitTest, LimitStageToleratesMidStreamEOF) { auto firstResult = limitStage->next(); ASSERT_OK(firstResult.getStatus()); - ASSERT(firstResult.getValue()); - ASSERT_EQ(*firstResult.getValue(), BSON("a" << 1)); + ASSERT(firstResult.getValue().getResult()); + ASSERT_EQ(*firstResult.getValue().getResult(), BSON("a" << 1)); auto secondResult = limitStage->next(); ASSERT_OK(secondResult.getStatus()); - ASSERT(!secondResult.getValue()); + ASSERT(secondResult.getValue().isEOF()); auto thirdResult = limitStage->next(); ASSERT_OK(thirdResult.getStatus()); - ASSERT(thirdResult.getValue()); - ASSERT_EQ(*thirdResult.getValue(), BSON("a" << 2)); + ASSERT(thirdResult.getValue().getResult()); + ASSERT_EQ(*thirdResult.getValue().getResult(), BSON("a" << 2)); auto fourthResult = limitStage->next(); ASSERT_OK(fourthResult.getStatus()); - ASSERT(!fourthResult.getValue()); + ASSERT(fourthResult.getValue().isEOF()); } TEST(RouterStageLimitTest, LimitStageRemotesExhausted) { @@ -148,19 +169,19 @@ TEST(RouterStageLimitTest, LimitStageRemotesExhausted) { auto firstResult = limitStage->next(); ASSERT_OK(firstResult.getStatus()); - ASSERT(firstResult.getValue()); - ASSERT_EQ(*firstResult.getValue(), BSON("a" << 1)); + ASSERT(firstResult.getValue().getResult()); + ASSERT_EQ(*firstResult.getValue().getResult(), BSON("a" << 1)); ASSERT_TRUE(limitStage->remotesExhausted()); auto secondResult = limitStage->next(); ASSERT_OK(secondResult.getStatus()); - ASSERT(secondResult.getValue()); - ASSERT_EQ(*secondResult.getValue(), BSON("a" << 2)); + ASSERT(secondResult.getValue().getResult()); + ASSERT_EQ(*secondResult.getValue().getResult(), BSON("a" << 2)); ASSERT_TRUE(limitStage->remotesExhausted()); auto thirdResult = limitStage->next(); ASSERT_OK(thirdResult.getStatus()); - ASSERT(!thirdResult.getValue()); + ASSERT(thirdResult.getValue().isEOF()); ASSERT_TRUE(limitStage->remotesExhausted()); } diff --git a/src/mongo/s/query/router_stage_merge.cpp b/src/mongo/s/query/router_stage_merge.cpp index 0e9304d9952..e3b8fd299a6 100644 --- a/src/mongo/s/query/router_stage_merge.cpp +++ b/src/mongo/s/query/router_stage_merge.cpp @@ -40,7 +40,7 @@ RouterStageMerge::RouterStageMerge(executor::TaskExecutor* executor, ClusterClientCursorParams&& params) : _executor(executor), _arm(executor, std::move(params)) {} -StatusWith<boost::optional<BSONObj>> RouterStageMerge::next() { +StatusWith<ClusterQueryResult> RouterStageMerge::next() { while (!_arm.ready()) { auto nextEventStatus = _arm.nextEvent(); if (!nextEventStatus.isOK()) { diff --git a/src/mongo/s/query/router_stage_merge.h b/src/mongo/s/query/router_stage_merge.h index a75d5a46f8d..9f2c2e9e0c4 100644 --- a/src/mongo/s/query/router_stage_merge.h +++ b/src/mongo/s/query/router_stage_merge.h @@ -45,7 +45,7 @@ class RouterStageMerge final : public RouterExecStage { public: RouterStageMerge(executor::TaskExecutor* executor, ClusterClientCursorParams&& params); - StatusWith<boost::optional<BSONObj>> next() final; + StatusWith<ClusterQueryResult> next() final; void kill() final; diff --git a/src/mongo/s/query/router_stage_mock.cpp b/src/mongo/s/query/router_stage_mock.cpp index daad6fe6d07..b2aa83e3ed6 100644 --- a/src/mongo/s/query/router_stage_mock.cpp +++ b/src/mongo/s/query/router_stage_mock.cpp @@ -34,8 +34,8 @@ namespace mongo { -void RouterStageMock::queueResult(BSONObj obj) { - _resultsQueue.push({obj}); +void RouterStageMock::queueResult(const ClusterQueryResult& result) { + _resultsQueue.push({result}); } void RouterStageMock::queueError(Status status) { @@ -43,16 +43,16 @@ void RouterStageMock::queueError(Status status) { } void RouterStageMock::queueEOF() { - _resultsQueue.push({boost::none}); + _resultsQueue.push({ClusterQueryResult()}); } void RouterStageMock::markRemotesExhausted() { _remotesExhausted = true; } -StatusWith<boost::optional<BSONObj>> RouterStageMock::next() { +StatusWith<ClusterQueryResult> RouterStageMock::next() { if (_resultsQueue.empty()) { - return {boost::none}; + return {ClusterQueryResult()}; } auto out = _resultsQueue.front(); diff --git a/src/mongo/s/query/router_stage_mock.h b/src/mongo/s/query/router_stage_mock.h index 255ae75b595..ef093b04fe4 100644 --- a/src/mongo/s/query/router_stage_mock.h +++ b/src/mongo/s/query/router_stage_mock.h @@ -31,6 +31,7 @@ #include <boost/optional.hpp> #include <queue> +#include "mongo/s/query/cluster_query_result.h" #include "mongo/s/query/router_exec_stage.h" namespace mongo { @@ -43,7 +44,7 @@ class RouterStageMock final : public RouterExecStage { public: ~RouterStageMock() final {} - StatusWith<boost::optional<BSONObj>> next() final; + StatusWith<ClusterQueryResult> next() final; void kill() final; @@ -56,7 +57,7 @@ public: /** * Queues a BSONObj to be returned. */ - void queueResult(BSONObj obj); + void queueResult(const ClusterQueryResult& result); /** * Queues an error response. @@ -80,7 +81,7 @@ public: StatusWith<Milliseconds> getAwaitDataTimeout(); private: - std::queue<StatusWith<boost::optional<BSONObj>>> _resultsQueue; + std::queue<StatusWith<ClusterQueryResult>> _resultsQueue; bool _remotesExhausted = false; boost::optional<Milliseconds> _awaitDataTimeout; }; diff --git a/src/mongo/s/query/router_stage_remove_sortkey.cpp b/src/mongo/s/query/router_stage_remove_sortkey.cpp index 949182e3a1a..77d3e26afd0 100644 --- a/src/mongo/s/query/router_stage_remove_sortkey.cpp +++ b/src/mongo/s/query/router_stage_remove_sortkey.cpp @@ -41,14 +41,16 @@ namespace mongo { RouterStageRemoveSortKey::RouterStageRemoveSortKey(std::unique_ptr<RouterExecStage> child) : RouterExecStage(std::move(child)) {} -StatusWith<boost::optional<BSONObj>> RouterStageRemoveSortKey::next() { +StatusWith<ClusterQueryResult> RouterStageRemoveSortKey::next() { auto childResult = getChildStage()->next(); - if (!childResult.isOK() || !childResult.getValue()) { + if (!childResult.isOK() || !childResult.getValue().getResult()) { return childResult; } + const auto& childObj = childResult.getValue().getResult(); + BSONObjBuilder builder; - for (BSONElement elt : *childResult.getValue()) { + for (BSONElement elt : *childObj) { if (!str::equals(elt.fieldName(), ClusterClientCursorParams::kSortKeyField)) { builder.append(elt); } diff --git a/src/mongo/s/query/router_stage_remove_sortkey.h b/src/mongo/s/query/router_stage_remove_sortkey.h index f7965312d47..79294aeb20a 100644 --- a/src/mongo/s/query/router_stage_remove_sortkey.h +++ b/src/mongo/s/query/router_stage_remove_sortkey.h @@ -41,7 +41,7 @@ class RouterStageRemoveSortKey final : public RouterExecStage { public: RouterStageRemoveSortKey(std::unique_ptr<RouterExecStage> child); - StatusWith<boost::optional<BSONObj>> next() final; + StatusWith<ClusterQueryResult> next() final; void kill() final; diff --git a/src/mongo/s/query/router_stage_remove_sortkey_test.cpp b/src/mongo/s/query/router_stage_remove_sortkey_test.cpp index 255bcb3cba4..ad6ee3f55a2 100644 --- a/src/mongo/s/query/router_stage_remove_sortkey_test.cpp +++ b/src/mongo/s/query/router_stage_remove_sortkey_test.cpp @@ -52,29 +52,29 @@ TEST(RouterStageRemoveSortKeyTest, RemovesSortKey) { auto firstResult = sortKeyStage->next(); ASSERT_OK(firstResult.getStatus()); - ASSERT(firstResult.getValue()); - ASSERT_EQ(*firstResult.getValue(), BSON("a" << 4 << "b" << 3)); + ASSERT(firstResult.getValue().getResult()); + ASSERT_EQ(*firstResult.getValue().getResult(), BSON("a" << 4 << "b" << 3)); auto secondResult = sortKeyStage->next(); ASSERT_OK(secondResult.getStatus()); - ASSERT(secondResult.getValue()); - ASSERT_EQ(*secondResult.getValue(), + ASSERT(secondResult.getValue().getResult()); + ASSERT_EQ(*secondResult.getValue().getResult(), BSON("c" << BSON("d" << "foo"))); auto thirdResult = sortKeyStage->next(); ASSERT_OK(thirdResult.getStatus()); - ASSERT(thirdResult.getValue()); - ASSERT_EQ(*thirdResult.getValue(), BSON("a" << 3)); + ASSERT(thirdResult.getValue().getResult()); + ASSERT_EQ(*thirdResult.getValue().getResult(), BSON("a" << 3)); auto fourthResult = sortKeyStage->next(); ASSERT_OK(fourthResult.getStatus()); - ASSERT(fourthResult.getValue()); - ASSERT_EQ(*fourthResult.getValue(), BSONObj()); + ASSERT(fourthResult.getValue().getResult()); + ASSERT_EQ(*fourthResult.getValue().getResult(), BSONObj()); auto fifthResult = sortKeyStage->next(); ASSERT_OK(fifthResult.getStatus()); - ASSERT(!fifthResult.getValue()); + ASSERT(fifthResult.getValue().isEOF()); } TEST(RouterStageRemoveSortKeyTest, PropagatesError) { @@ -86,8 +86,8 @@ TEST(RouterStageRemoveSortKeyTest, PropagatesError) { auto firstResult = sortKeyStage->next(); ASSERT_OK(firstResult.getStatus()); - ASSERT(firstResult.getValue()); - ASSERT_EQ(*firstResult.getValue(), BSONObj()); + ASSERT(firstResult.getValue().getResult()); + ASSERT_EQ(*firstResult.getValue().getResult(), BSONObj()); auto secondResult = sortKeyStage->next(); ASSERT_NOT_OK(secondResult.getStatus()); @@ -105,21 +105,21 @@ TEST(RouterStageRemoveSortKeyTest, ToleratesMidStreamEOF) { auto firstResult = sortKeyStage->next(); ASSERT_OK(firstResult.getStatus()); - ASSERT(firstResult.getValue()); - ASSERT_EQ(*firstResult.getValue(), BSON("a" << 1 << "b" << 1)); + ASSERT(firstResult.getValue().getResult()); + ASSERT_EQ(*firstResult.getValue().getResult(), BSON("a" << 1 << "b" << 1)); auto secondResult = sortKeyStage->next(); ASSERT_OK(secondResult.getStatus()); - ASSERT(!secondResult.getValue()); + ASSERT(secondResult.getValue().isEOF()); auto thirdResult = sortKeyStage->next(); ASSERT_OK(thirdResult.getStatus()); - ASSERT(thirdResult.getValue()); - ASSERT_EQ(*thirdResult.getValue(), BSON("a" << 2 << "b" << 2)); + ASSERT(thirdResult.getValue().getResult()); + ASSERT_EQ(*thirdResult.getValue().getResult(), BSON("a" << 2 << "b" << 2)); auto fourthResult = sortKeyStage->next(); ASSERT_OK(fourthResult.getStatus()); - ASSERT(!fourthResult.getValue()); + ASSERT(fourthResult.getValue().isEOF()); } TEST(RouterStageRemoveSortKeyTest, RemotesExhausted) { @@ -133,19 +133,19 @@ TEST(RouterStageRemoveSortKeyTest, RemotesExhausted) { auto firstResult = sortKeyStage->next(); ASSERT_OK(firstResult.getStatus()); - ASSERT(firstResult.getValue()); - ASSERT_EQ(*firstResult.getValue(), BSON("a" << 1 << "b" << 1)); + ASSERT(firstResult.getValue().getResult()); + ASSERT_EQ(*firstResult.getValue().getResult(), BSON("a" << 1 << "b" << 1)); ASSERT_TRUE(sortKeyStage->remotesExhausted()); auto secondResult = sortKeyStage->next(); ASSERT_OK(secondResult.getStatus()); - ASSERT(secondResult.getValue()); - ASSERT_EQ(*secondResult.getValue(), BSON("a" << 2 << "b" << 2)); + ASSERT(secondResult.getValue().getResult()); + ASSERT_EQ(*secondResult.getValue().getResult(), BSON("a" << 2 << "b" << 2)); ASSERT_TRUE(sortKeyStage->remotesExhausted()); auto thirdResult = sortKeyStage->next(); ASSERT_OK(thirdResult.getStatus()); - ASSERT(!thirdResult.getValue()); + ASSERT(thirdResult.getValue().isEOF()); ASSERT_TRUE(sortKeyStage->remotesExhausted()); } diff --git a/src/mongo/s/query/router_stage_skip.cpp b/src/mongo/s/query/router_stage_skip.cpp index af746d5e430..f9a63e515ff 100644 --- a/src/mongo/s/query/router_stage_skip.cpp +++ b/src/mongo/s/query/router_stage_skip.cpp @@ -39,14 +39,18 @@ RouterStageSkip::RouterStageSkip(std::unique_ptr<RouterExecStage> child, long lo invariant(skip > 0); } -StatusWith<boost::optional<BSONObj>> RouterStageSkip::next() { +StatusWith<ClusterQueryResult> RouterStageSkip::next() { while (_skippedSoFar < _skip) { auto next = getChildStage()->next(); if (!next.isOK()) { return next; } - if (!next.getValue()) { + if (next.getValue().isEOF()) { + return next; + } + + if (next.getValue().getViewDefinition()) { return next; } diff --git a/src/mongo/s/query/router_stage_skip.h b/src/mongo/s/query/router_stage_skip.h index 430d3748c91..fda4201f9cb 100644 --- a/src/mongo/s/query/router_stage_skip.h +++ b/src/mongo/s/query/router_stage_skip.h @@ -39,7 +39,7 @@ class RouterStageSkip final : public RouterExecStage { public: RouterStageSkip(std::unique_ptr<RouterExecStage> child, long long skip); - StatusWith<boost::optional<BSONObj>> next() final; + StatusWith<ClusterQueryResult> next() final; void kill() final; diff --git a/src/mongo/s/query/router_stage_skip_test.cpp b/src/mongo/s/query/router_stage_skip_test.cpp index 7aca1f600bd..efd57eaf111 100644 --- a/src/mongo/s/query/router_stage_skip_test.cpp +++ b/src/mongo/s/query/router_stage_skip_test.cpp @@ -50,22 +50,22 @@ TEST(RouterStageSkipTest, SkipIsOne) { auto firstResult = skipStage->next(); ASSERT_OK(firstResult.getStatus()); - ASSERT(firstResult.getValue()); - ASSERT_EQ(*firstResult.getValue(), BSON("a" << 2)); + ASSERT(firstResult.getValue().getResult()); + ASSERT_EQ(*firstResult.getValue().getResult(), BSON("a" << 2)); auto secondResult = skipStage->next(); ASSERT_OK(secondResult.getStatus()); - ASSERT(secondResult.getValue()); - ASSERT_EQ(*secondResult.getValue(), BSON("a" << 3)); + ASSERT(secondResult.getValue().getResult()); + ASSERT_EQ(*secondResult.getValue().getResult(), BSON("a" << 3)); // Once end-of-stream is reached, the skip stage should keep returning boost::none. auto thirdResult = skipStage->next(); ASSERT_OK(thirdResult.getStatus()); - ASSERT(!thirdResult.getValue()); + ASSERT(thirdResult.getValue().isEOF()); auto fourthResult = skipStage->next(); ASSERT_OK(thirdResult.getStatus()); - ASSERT(!thirdResult.getValue()); + ASSERT(thirdResult.getValue().isEOF()); } TEST(RouterStageSkipTest, SkipIsThree) { @@ -79,12 +79,12 @@ TEST(RouterStageSkipTest, SkipIsThree) { auto firstResult = skipStage->next(); ASSERT_OK(firstResult.getStatus()); - ASSERT(firstResult.getValue()); - ASSERT_EQ(*firstResult.getValue(), BSON("a" << 4)); + ASSERT(firstResult.getValue().getResult()); + ASSERT_EQ(*firstResult.getValue().getResult(), BSON("a" << 4)); auto secondResult = skipStage->next(); ASSERT_OK(secondResult.getStatus()); - ASSERT(!secondResult.getValue()); + ASSERT(secondResult.getValue().isEOF()); } TEST(RouterStageSkipTest, SkipEqualToResultSetSize) { @@ -97,7 +97,7 @@ TEST(RouterStageSkipTest, SkipEqualToResultSetSize) { auto firstResult = skipStage->next(); ASSERT_OK(firstResult.getStatus()); - ASSERT(!firstResult.getValue()); + ASSERT(firstResult.getValue().isEOF()); } TEST(RouterStageSkipTest, SkipExceedsResultSetSize) { @@ -110,7 +110,7 @@ TEST(RouterStageSkipTest, SkipExceedsResultSetSize) { auto firstResult = skipStage->next(); ASSERT_OK(firstResult.getStatus()); - ASSERT(!firstResult.getValue()); + ASSERT(firstResult.getValue().isEOF()); } TEST(RouterStageSkipTest, ErrorWhileSkippingResults) { @@ -139,8 +139,8 @@ TEST(RouterStageSkipTest, ErrorAfterSkippingResults) { auto firstResult = skipStage->next(); ASSERT_OK(firstResult.getStatus()); - ASSERT(firstResult.getValue()); - ASSERT_EQ(*firstResult.getValue(), BSON("a" << 3)); + ASSERT(firstResult.getValue().getResult()); + ASSERT_EQ(*firstResult.getValue().getResult(), BSON("a" << 3)); auto secondResult = skipStage->next(); ASSERT_NOT_OK(secondResult.getStatus()); @@ -148,6 +148,29 @@ TEST(RouterStageSkipTest, ErrorAfterSkippingResults) { ASSERT_EQ(secondResult.getStatus().reason(), "bad thing happened"); } +TEST(RouterStageSkipTest, SkipStagePropagatesViewDefinition) { + auto mockStage = stdx::make_unique<RouterStageMock>(); + + ClusterQueryResult cqResult; + cqResult.setViewDefinition(BSON("ns" + << "view_ns" + << "pipeline" + << BSON_ARRAY(BSON("$match" << BSONNULL)))); + mockStage->queueResult(cqResult); + + auto skipStage = stdx::make_unique<RouterStageSkip>(std::move(mockStage), 3); + + auto result = skipStage->next(); + ASSERT_OK(result.getStatus()); + ASSERT(!result.getValue().getResult()); + ASSERT(result.getValue().getViewDefinition()); + ASSERT_EQ(*result.getValue().getViewDefinition(), + BSON("ns" + << "view_ns" + << "pipeline" + << BSON_ARRAY(BSON("$match" << BSONNULL)))); +} + TEST(RouterStageSkipTest, SkipStageToleratesMidStreamEOF) { // Skip stage must propagate a boost::none, but not count it towards the skip value. auto mockStage = stdx::make_unique<RouterStageMock>(); @@ -160,16 +183,16 @@ TEST(RouterStageSkipTest, SkipStageToleratesMidStreamEOF) { auto firstResult = skipStage->next(); ASSERT_OK(firstResult.getStatus()); - ASSERT(!firstResult.getValue()); + ASSERT(firstResult.getValue().isEOF()); auto secondResult = skipStage->next(); ASSERT_OK(secondResult.getStatus()); - ASSERT(secondResult.getValue()); - ASSERT_EQ(*secondResult.getValue(), BSON("a" << 3)); + ASSERT(secondResult.getValue().getResult()); + ASSERT_EQ(*secondResult.getValue().getResult(), BSON("a" << 3)); auto thirdResult = skipStage->next(); ASSERT_OK(thirdResult.getStatus()); - ASSERT(!thirdResult.getValue()); + ASSERT(thirdResult.getValue().isEOF()); } TEST(RouterStageSkipTest, SkipStageRemotesExhausted) { @@ -184,19 +207,19 @@ TEST(RouterStageSkipTest, SkipStageRemotesExhausted) { auto firstResult = skipStage->next(); ASSERT_OK(firstResult.getStatus()); - ASSERT(firstResult.getValue()); - ASSERT_EQ(*firstResult.getValue(), BSON("a" << 2)); + ASSERT(firstResult.getValue().getResult()); + ASSERT_EQ(*firstResult.getValue().getResult(), BSON("a" << 2)); ASSERT_TRUE(skipStage->remotesExhausted()); auto secondResult = skipStage->next(); ASSERT_OK(secondResult.getStatus()); - ASSERT(secondResult.getValue()); - ASSERT_EQ(*secondResult.getValue(), BSON("a" << 3)); + ASSERT(secondResult.getValue().getResult()); + ASSERT_EQ(*secondResult.getValue().getResult(), BSON("a" << 3)); ASSERT_TRUE(skipStage->remotesExhausted()); auto thirdResult = skipStage->next(); ASSERT_OK(thirdResult.getStatus()); - ASSERT(!thirdResult.getValue()); + ASSERT(thirdResult.getValue().isEOF()); ASSERT_TRUE(skipStage->remotesExhausted()); } |