author     James Wahlin <james.wahlin@10gen.com>    2016-07-25 16:56:22 -0400
committer  James Wahlin <james.wahlin@10gen.com>    2016-07-29 15:36:53 -0400
commit     dc7f50c520c5129709008568241274cb6d5ec231
tree       cd38158bf08d17566e706eeb3eb4202d3dfc1044 /src
parent     d305e618162d37ccc16cf574fcc0388a1160af93
SERVER-24762 Support for views on sharded collections
Diffstat (limited to 'src')
-rw-r--r--  src/mongo/base/error_codes.err | 2
-rw-r--r--  src/mongo/db/commands/SConscript | 1
-rw-r--r--  src/mongo/db/commands/count_cmd.cpp | 9
-rw-r--r--  src/mongo/db/commands/distinct.cpp | 8
-rw-r--r--  src/mongo/db/commands/pipeline_command.cpp | 21
-rw-r--r--  src/mongo/db/query/count_request.cpp | 11
-rw-r--r--  src/mongo/db/query/parsed_distinct.h | 2
-rw-r--r--  src/mongo/db/s/collection_sharding_state.cpp | 12
-rw-r--r--  src/mongo/db/s/collection_sharding_state.h | 6
-rw-r--r--  src/mongo/db/s/set_shard_version_command.cpp | 19
-rw-r--r--  src/mongo/db/views/SConscript | 6
-rw-r--r--  src/mongo/db/views/resolved_view.cpp | 116
-rw-r--r--  src/mongo/db/views/resolved_view.h | 78
-rw-r--r--  src/mongo/db/views/view_catalog.cpp | 44
-rw-r--r--  src/mongo/db/views/view_catalog.h | 21
-rw-r--r--  src/mongo/db/views/view_sharding_check.cpp | 97
-rw-r--r--  src/mongo/db/views/view_sharding_check.h | 81
-rw-r--r--  src/mongo/s/commands/SConscript | 1
-rw-r--r--  src/mongo/s/commands/cluster_count_cmd.cpp | 65
-rw-r--r--  src/mongo/s/commands/cluster_find_cmd.cpp | 52
-rw-r--r--  src/mongo/s/commands/cluster_pipeline_cmd.cpp | 40
-rw-r--r--  src/mongo/s/commands/commands_public.cpp | 72
-rw-r--r--  src/mongo/s/commands/strategy.cpp | 21
-rw-r--r--  src/mongo/s/query/async_results_merger.cpp | 61
-rw-r--r--  src/mongo/s/query/async_results_merger.h | 17
-rw-r--r--  src/mongo/s/query/async_results_merger_test.cpp | 251
-rw-r--r--  src/mongo/s/query/cluster_client_cursor.h | 12
-rw-r--r--  src/mongo/s/query/cluster_client_cursor_impl.cpp | 21
-rw-r--r--  src/mongo/s/query/cluster_client_cursor_impl.h | 9
-rw-r--r--  src/mongo/s/query/cluster_client_cursor_impl_test.cpp | 53
-rw-r--r--  src/mongo/s/query/cluster_client_cursor_mock.cpp | 10
-rw-r--r--  src/mongo/s/query/cluster_client_cursor_mock.h | 6
-rw-r--r--  src/mongo/s/query/cluster_cursor_manager.cpp | 6
-rw-r--r--  src/mongo/s/query/cluster_cursor_manager.h | 4
-rw-r--r--  src/mongo/s/query/cluster_cursor_manager_test.cpp | 18
-rw-r--r--  src/mongo/s/query/cluster_find.cpp | 40
-rw-r--r--  src/mongo/s/query/cluster_find.h | 5
-rw-r--r--  src/mongo/s/query/cluster_query_result.h | 72
-rw-r--r--  src/mongo/s/query/router_exec_stage.h | 5
-rw-r--r--  src/mongo/s/query/router_stage_limit.cpp | 6
-rw-r--r--  src/mongo/s/query/router_stage_limit.h | 2
-rw-r--r--  src/mongo/s/query/router_stage_limit_test.cpp | 73
-rw-r--r--  src/mongo/s/query/router_stage_merge.cpp | 2
-rw-r--r--  src/mongo/s/query/router_stage_merge.h | 2
-rw-r--r--  src/mongo/s/query/router_stage_mock.cpp | 10
-rw-r--r--  src/mongo/s/query/router_stage_mock.h | 7
-rw-r--r--  src/mongo/s/query/router_stage_remove_sortkey.cpp | 8
-rw-r--r--  src/mongo/s/query/router_stage_remove_sortkey.h | 2
-rw-r--r--  src/mongo/s/query/router_stage_remove_sortkey_test.cpp | 44
-rw-r--r--  src/mongo/s/query/router_stage_skip.cpp | 8
-rw-r--r--  src/mongo/s/query/router_stage_skip.h | 2
-rw-r--r--  src/mongo/s/query/router_stage_skip_test.cpp | 67
52 files changed, 1243 insertions(+), 365 deletions(-)
diff --git a/src/mongo/base/error_codes.err b/src/mongo/base/error_codes.err
index c2c63c78aab..0871fe48a1f 100644
--- a/src/mongo/base/error_codes.err
+++ b/src/mongo/base/error_codes.err
@@ -167,7 +167,7 @@ error_code("ViewDepthLimitExceeded", 165)
error_code("CommandNotSupportedOnView", 166)
error_code("OptionNotSupportedOnView", 167)
error_code("InvalidPipelineOperator", 168)
-error_code("CommandOnShardedViewNotSupportedOnMongos", 169)
+error_code("CommandOnShardedViewNotSupportedOnMongod", 169)
error_code("TooManyMatchingDocuments", 170)
error_code("CannotIndexParallelArrays", 171)
error_code("TransportSessionNotFound", 172)
diff --git a/src/mongo/db/commands/SConscript b/src/mongo/db/commands/SConscript
index 87c472dc519..2f53565f93c 100644
--- a/src/mongo/db/commands/SConscript
+++ b/src/mongo/db/commands/SConscript
@@ -66,6 +66,7 @@ env.Library(
'$BUILD_DIR/mongo/db/startup_warnings_common',
'$BUILD_DIR/mongo/db/stats/counters',
'$BUILD_DIR/mongo/db/stats/timer_stats',
+ '$BUILD_DIR/mongo/db/views/views',
'$BUILD_DIR/mongo/logger/parse_log_component_settings',
'$BUILD_DIR/mongo/s/client/sharding_client',
'$BUILD_DIR/mongo/s/coreshard',
diff --git a/src/mongo/db/commands/count_cmd.cpp b/src/mongo/db/commands/count_cmd.cpp
index adcab164b79..3204900e26f 100644
--- a/src/mongo/db/commands/count_cmd.cpp
+++ b/src/mongo/db/commands/count_cmd.cpp
@@ -36,13 +36,13 @@
#include "mongo/db/curop.h"
#include "mongo/db/db_raii.h"
#include "mongo/db/exec/count.h"
-#include "mongo/db/query/cursor_response.h"
#include "mongo/db/query/explain.h"
#include "mongo/db/query/get_executor.h"
#include "mongo/db/query/plan_summary_stats.h"
#include "mongo/db/query/view_response_formatter.h"
#include "mongo/db/range_preserver.h"
#include "mongo/db/repl/replication_coordinator_global.h"
+#include "mongo/db/views/resolved_view.h"
#include "mongo/util/log.h"
namespace mongo {
@@ -115,6 +115,7 @@ public:
// Acquire the db read lock.
AutoGetCollectionOrViewForRead ctx(txn, request.getValue().getNs());
Collection* collection = ctx.getCollection();
+
if (ctx.getView()) {
ctx.releaseLocksForView();
@@ -162,6 +163,7 @@ public:
AutoGetCollectionOrViewForRead ctx(txn, request.getValue().getNs());
Collection* collection = ctx.getCollection();
+
if (ctx.getView()) {
ctx.releaseLocksForView();
@@ -174,6 +176,11 @@ public:
(void)Command::findCommand("aggregate")
->run(txn, dbname, viewAggregation.getValue(), options, errmsg, aggResult);
+ if (ResolvedView::isResolvedViewErrorResponse(aggResult.asTempObj())) {
+ result.appendElements(aggResult.obj());
+ return false;
+ }
+
ViewResponseFormatter formatter(aggResult.obj());
Status formatStatus = formatter.appendAsCountResponse(&result);
if (!formatStatus.isOK()) {
diff --git a/src/mongo/db/commands/distinct.cpp b/src/mongo/db/commands/distinct.cpp
index a10e7cb3b1f..e571bfee3d3 100644
--- a/src/mongo/db/commands/distinct.cpp
+++ b/src/mongo/db/commands/distinct.cpp
@@ -30,6 +30,8 @@
#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kQuery
+#include "mongo/platform/basic.h"
+
#include <string>
#include <vector>
@@ -56,6 +58,7 @@
#include "mongo/db/query/plan_summary_stats.h"
#include "mongo/db/query/query_planner_common.h"
#include "mongo/db/query/view_response_formatter.h"
+#include "mongo/db/views/resolved_view.h"
#include "mongo/stdx/memory.h"
#include "mongo/util/log.h"
@@ -178,6 +181,11 @@ public:
(void)Command::findCommand("aggregate")
->run(txn, dbname, viewAggregation.getValue(), options, errmsg, aggResult);
+ if (ResolvedView::isResolvedViewErrorResponse(aggResult.asTempObj())) {
+ result.appendElements(aggResult.obj());
+ return false;
+ }
+
ViewResponseFormatter formatter(aggResult.obj());
Status formatStatus = formatter.appendAsDistinctResponse(&result);
if (!formatStatus.isOK()) {
diff --git a/src/mongo/db/commands/pipeline_command.cpp b/src/mongo/db/commands/pipeline_command.cpp
index 64db549e8a7..34f7f5b45bc 100644
--- a/src/mongo/db/commands/pipeline_command.cpp
+++ b/src/mongo/db/commands/pipeline_command.cpp
@@ -59,6 +59,7 @@
#include "mongo/db/storage/storage_options.h"
#include "mongo/db/views/view.h"
#include "mongo/db/views/view_catalog.h"
+#include "mongo/db/views/view_sharding_check.h"
#include "mongo/stdx/memory.h"
#include "mongo/util/log.h"
@@ -283,7 +284,18 @@ public:
// recursively calling run, which will re-acquire locks on the underlying collection.
// (The lock must be released because recursively acquiring locks on the database will
// prohibit yielding.)
- if (ctx.getView()) {
+ if (auto view = ctx.getView()) {
+ auto viewDefinition =
+ ViewShardingCheck::getResolvedViewIfSharded(txn, ctx.getDb(), view);
+ if (!viewDefinition.isOK()) {
+ return appendCommandStatus(result, viewDefinition.getStatus());
+ }
+
+ if (!viewDefinition.getValue().isEmpty()) {
+ ViewShardingCheck::appendShardedViewStatus(viewDefinition.getValue(), &result);
+ return false;
+ }
+
auto resolvedView = ctx.getDb()->getViewCatalog()->resolveView(txn, nss);
if (!resolvedView.isOK()) {
return appendCommandStatus(result, resolvedView.getStatus());
@@ -293,10 +305,13 @@ public:
ctx.releaseLocksForView();
// Parse the resolved view into a new aggregation request.
- BSONObj viewCmd =
+ auto viewCmd =
resolvedView.getValue().asExpandedViewAggregation(request.getValue());
+ if (!viewCmd.isOK()) {
+ return appendCommandStatus(result, viewCmd.getStatus());
+ }
- return this->run(txn, db, viewCmd, options, errmsg, result);
+ return this->run(txn, db, viewCmd.getValue(), options, errmsg, result);
}
// If the pipeline does not have a user-specified collation, set it from the collection
diff --git a/src/mongo/db/query/count_request.cpp b/src/mongo/db/query/count_request.cpp
index c062e483ffa..f412065ab9c 100644
--- a/src/mongo/db/query/count_request.cpp
+++ b/src/mongo/db/query/count_request.cpp
@@ -149,12 +149,6 @@ StatusWith<BSONObj> CountRequest::asAggregationCommand() const {
str::stream() << "Option " << kHintField << " not supported in aggregation."};
}
- // TODO(SERVER-25186): Views may not override the collation of the underlying collection.
- if (_collation) {
- return {ErrorCodes::InvalidPipelineOperator,
- str::stream() << kCollationField << " not supported on a view."};
- }
-
BSONObjBuilder aggregationBuilder;
aggregationBuilder.append("aggregate", _nss.coll());
@@ -176,6 +170,7 @@ StatusWith<BSONObj> CountRequest::asAggregationCommand() const {
limitBuilder.append("$limit", *_limit);
limitBuilder.doneFast();
}
+
BSONObjBuilder countBuilder(pipelineBuilder.subobjStart());
countBuilder.append("$count", "count");
countBuilder.doneFast();
@@ -186,6 +181,10 @@ StatusWith<BSONObj> CountRequest::asAggregationCommand() const {
aggregationBuilder.append(kExplainField, _explain);
}
+ if (_collation) {
+ aggregationBuilder.append(kCollationField, *_collation);
+ }
+
// The 'cursor' option is always specified so that aggregation uses the cursor interface.
aggregationBuilder.append("cursor", BSONObj());
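As a rough illustration of the translation performed by CountRequest::asAggregationCommand after this change, the sketch below (not part of the patch) feeds a count command carrying a limit and a collation through the same entry points used by the cluster_count_cmd.cpp call sites added later; the exact pipeline, including the $match built from the query, follows the builder code above.

    auto countRequest = CountRequest::parseFromBSON(
        "test",
        fromjson("{count: 'coll', query: {x: 1}, limit: 5, collation: {locale: 'fr'}}"),
        false /* mirrors the non-explain call site in cluster_count_cmd.cpp */);
    auto aggCmd = countRequest.getValue().asAggregationCommand();
    // aggCmd.getValue() is roughly:
    //   { aggregate: "coll",
    //     pipeline: [ { $match: { x: 1 } }, { $limit: 5 }, { $count: "count" } ],
    //     collation: { locale: "fr" },
    //     cursor: {} }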
diff --git a/src/mongo/db/query/parsed_distinct.h b/src/mongo/db/query/parsed_distinct.h
index f26066abd83..8e07116809b 100644
--- a/src/mongo/db/query/parsed_distinct.h
+++ b/src/mongo/db/query/parsed_distinct.h
@@ -32,11 +32,11 @@
#include <string>
#include "mongo/base/status_with.h"
+#include "mongo/db/query/canonical_query.h"
namespace mongo {
class BSONObj;
-class CanonicalQuery;
class ExtensionsCallback;
class NamespaceString;
class OperationContext;
diff --git a/src/mongo/db/s/collection_sharding_state.cpp b/src/mongo/db/s/collection_sharding_state.cpp
index 47e66ca914b..018d3b2cdcf 100644
--- a/src/mongo/db/s/collection_sharding_state.cpp
+++ b/src/mongo/db/s/collection_sharding_state.cpp
@@ -203,6 +203,18 @@ void CollectionShardingState::checkShardVersionOrThrow(OperationContext* txn) {
}
}
+bool CollectionShardingState::collectionIsSharded() {
+ auto metadata = getMetadata().getMetadata();
+ if (metadata && (metadata->getCollVersion().isStrictlyEqualTo(ChunkVersion::UNSHARDED()))) {
+ return false;
+ }
+
+ // If 'metadata' is null, then the shard doesn't know if this collection is sharded or not. In
+ // this scenario we will assume this collection is sharded. We will know sharding state
+ // definitively once SERVER-24960 has been fixed.
+ return true;
+}
+
bool CollectionShardingState::isDocumentInMigratingChunk(OperationContext* txn,
const BSONObj& doc) {
dassert(txn->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_IX));
diff --git a/src/mongo/db/s/collection_sharding_state.h b/src/mongo/db/s/collection_sharding_state.h
index 40ce2829e53..509893c8b33 100644
--- a/src/mongo/db/s/collection_sharding_state.h
+++ b/src/mongo/db/s/collection_sharding_state.h
@@ -130,6 +130,12 @@ public:
*/
void checkShardVersionOrThrow(OperationContext* txn);
+ /**
+ * Returns whether this collection is sharded. Valid only if mongoD is primary.
+ * TODO SERVER-24960: This method may return a false positive until SERVER-24960 is fixed.
+ */
+ bool collectionIsSharded();
+
// Replication subsystem hooks. If this collection is serving as a source for migration, these
// methods inform it of any changes to its contents.
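A minimal sketch (not from the patch) of how a caller holding an intent lock consults the new helper; it mirrors ViewShardingCheck::collectionIsSharded, which is added later in this change.

    AutoGetCollection autoColl(txn, nss, MODE_IS);
    const bool maybeSharded = CollectionShardingState::get(txn, nss)->collectionIsSharded();
    // Until SERVER-24960 is fixed, 'maybeSharded' can be a false positive when the
    // shard's metadata is unknown, so callers treat it as "possibly sharded".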
diff --git a/src/mongo/db/s/set_shard_version_command.cpp b/src/mongo/db/s/set_shard_version_command.cpp
index aa0ed124df2..5a0a8afd8c1 100644
--- a/src/mongo/db/s/set_shard_version_command.cpp
+++ b/src/mongo/db/s/set_shard_version_command.cpp
@@ -183,9 +183,16 @@ public:
connectionVersion.addToBSON(result, "oldVersion");
{
- // Use a stable collection metadata while performing the checks
- boost::optional<AutoGetCollection> autoColl;
- autoColl.emplace(txn, nss, MODE_IS);
+ boost::optional<AutoGetDb> autoDb;
+ autoDb.emplace(txn, nss.db(), MODE_IS);
+
+ // Views do not require a shard version check.
+ if (autoDb->getDb() && autoDb->getDb()->getViewCatalog()->lookup(nss.ns())) {
+ return true;
+ }
+
+ boost::optional<Lock::CollectionLock> collLock;
+ collLock.emplace(txn->lockState(), nss.ns(), MODE_IS);
auto css = CollectionShardingState::get(txn, nss);
const ChunkVersion collectionShardVersion =
@@ -248,7 +255,8 @@ public:
auto critSecSignal =
css->getMigrationSourceManager()->getMigrationCriticalSectionSignal();
if (critSecSignal) {
- autoColl.reset();
+ collLock.reset();
+ autoDb.reset();
log() << "waiting till out of critical section";
critSecSignal->waitFor(txn, Seconds(10));
}
@@ -270,7 +278,8 @@ public:
auto critSecSignal =
css->getMigrationSourceManager()->getMigrationCriticalSectionSignal();
if (critSecSignal) {
- autoColl.reset();
+ collLock.reset();
+ autoDb.reset();
log() << "waiting till out of critical section";
critSecSignal->waitFor(txn, Seconds(10));
}
diff --git a/src/mongo/db/views/SConscript b/src/mongo/db/views/SConscript
index 0a63322240c..ee25270984c 100644
--- a/src/mongo/db/views/SConscript
+++ b/src/mongo/db/views/SConscript
@@ -6,11 +6,13 @@ env.Library(
target='views_mongod',
source=[
'durable_view_catalog.cpp',
+ 'view_sharding_check.cpp',
],
LIBDEPS=[
'$BUILD_DIR/mongo/db/dbhelpers',
'$BUILD_DIR/mongo/db/views/views',
'$BUILD_DIR/mongo/db/catalog/catalog',
+ '$BUILD_DIR/mongo/db/s/sharding',
]
)
@@ -19,10 +21,12 @@ env.Library(
source=[
'view.cpp',
'view_catalog.cpp',
+ 'resolved_view.cpp',
],
LIBDEPS=[
'$BUILD_DIR/mongo/base',
'$BUILD_DIR/mongo/db/server_parameters',
- '$BUILD_DIR/mongo/db/pipeline/aggregation'
+ '$BUILD_DIR/mongo/db/pipeline/aggregation',
+ '$BUILD_DIR/mongo/db/repl/repl_coordinator_interface',
]
)
diff --git a/src/mongo/db/views/resolved_view.cpp b/src/mongo/db/views/resolved_view.cpp
new file mode 100644
index 00000000000..c1fef5c2370
--- /dev/null
+++ b/src/mongo/db/views/resolved_view.cpp
@@ -0,0 +1,116 @@
+/**
+* Copyright (C) 2016 MongoDB Inc.
+*
+* This program is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License, version 3,
+* as published by the Free Software Foundation.
+*
+* This program is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see <http://www.gnu.org/licenses/>.
+*
+* As a special exception, the copyright holders give permission to link the
+* code of portions of this program with the OpenSSL library under certain
+* conditions as described in each individual source file and distribute
+* linked combinations including the program with the OpenSSL library. You
+* must comply with the GNU Affero General Public License in all respects for
+* all of the code used other than as permitted herein. If you modify file(s)
+* with this exception, you may extend this exception to your version of the
+* file(s), but you are not obligated to do so. If you do not wish to do so,
+* delete this exception statement from your version. If you delete this
+* exception statement from all source files in the program, then also delete
+* it in the license file.
+*/
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/views/resolved_view.h"
+
+#include "mongo/bson/bsonobjbuilder.h"
+#include "mongo/db/pipeline/aggregation_request.h"
+#include "mongo/rpc/get_status_from_command_result.h"
+
+namespace mongo {
+
+bool ResolvedView::isResolvedViewErrorResponse(BSONObj commandResponseObj) {
+ auto status = getStatusFromCommandResult(commandResponseObj);
+ return ErrorCodes::CommandOnShardedViewNotSupportedOnMongod == status;
+}
+
+ResolvedView ResolvedView::fromBSON(BSONObj commandResponseObj) {
+ uassert(40248,
+ "command response expected to have a 'resolvedView' field",
+ commandResponseObj.hasField("resolvedView"));
+
+ auto viewDef = commandResponseObj.getObjectField("resolvedView");
+ uassert(40249, "resolvedView must be an object", !viewDef.isEmpty());
+
+ uassert(40250,
+ "View definition must have 'ns' field of type string",
+ viewDef.hasField("ns") && viewDef.getField("ns").type() == BSONType::String);
+
+ uassert(40251,
+ "View definition must have 'pipeline' field of type array",
+ viewDef.hasField("pipeline") && viewDef.getField("pipeline").type() == BSONType::Array);
+
+ std::vector<BSONObj> pipeline;
+ for (auto&& item : viewDef["pipeline"].Obj()) {
+ pipeline.push_back(item.Obj().getOwned());
+ }
+
+ return {ResolvedView(NamespaceString(viewDef["ns"].valueStringData()), pipeline)};
+}
+
+StatusWith<BSONObj> ResolvedView::asExpandedViewAggregation(
+ const AggregationRequest& request) const {
+ if (!request.getCollation().isEmpty()) {
+ // TODO(SERVER-25186): Views may not override the collation of the underlying collection.
+ return {ErrorCodes::InvalidPipelineOperator,
+ str::stream() << "collation not supported on a view."};
+ }
+
+ BSONObjBuilder aggregationBuilder;
+ // Perform the aggregation on the resolved namespace.
+ aggregationBuilder.append("aggregate", _namespace.coll());
+
+ // The new pipeline consists of two parts: first, 'pipeline' in this ResolvedView;
+ // then, the pipeline in 'request'.
+ BSONArrayBuilder pipelineBuilder(aggregationBuilder.subarrayStart("pipeline"));
+ for (auto&& item : _pipeline) {
+ pipelineBuilder.append(item);
+ }
+
+ for (auto&& item : request.getPipeline()) {
+ pipelineBuilder.append(item);
+ }
+ pipelineBuilder.doneFast();
+
+ // The cursor option is always specified regardless of the presence of batchSize.
+ if (request.getBatchSize()) {
+ BSONObjBuilder batchSizeBuilder(aggregationBuilder.subobjStart("cursor"));
+ batchSizeBuilder.append(AggregationRequest::kBatchSizeName, *request.getBatchSize());
+ batchSizeBuilder.doneFast();
+ } else {
+ aggregationBuilder.append("cursor", BSONObj());
+ }
+
+ if (request.isExplain())
+ aggregationBuilder.append("explain", true);
+
+ return aggregationBuilder.obj();
+}
+
+StatusWith<BSONObj> ResolvedView::asExpandedViewAggregation(const BSONObj& aggCommand) const {
+ auto aggRequest = AggregationRequest::parseFromBSON(_namespace, aggCommand);
+ if (!aggRequest.isOK()) {
+ return aggRequest.getStatus();
+ }
+
+ return asExpandedViewAggregation(aggRequest.getValue());
+}
+
+} // namespace mongo
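To make the pipeline concatenation concrete, here is an illustrative expansion (not part of the patch): the view resolves to collection "db.coll" with a single $match stage, and the incoming aggregate contributes a $group.

    ResolvedView resolved(NamespaceString("db.coll"),
                          {fromjson("{$match: {region: 'US'}}")});
    auto expanded = resolved.asExpandedViewAggregation(
        fromjson("{aggregate: 'myView', pipeline: [{$group: {_id: '$state'}}], cursor: {}}"));
    // expanded.getValue() is roughly:
    //   { aggregate: "coll",
    //     pipeline: [ { $match: { region: "US" } }, { $group: { _id: "$state" } } ],
    //     cursor: {} }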
diff --git a/src/mongo/db/views/resolved_view.h b/src/mongo/db/views/resolved_view.h
new file mode 100644
index 00000000000..93ad3a7e709
--- /dev/null
+++ b/src/mongo/db/views/resolved_view.h
@@ -0,0 +1,78 @@
+/**
+ * Copyright (C) 2016 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include <vector>
+
+#include "mongo/base/status_with.h"
+#include "mongo/bson/bsonobj.h"
+#include "mongo/db/namespace_string.h"
+
+namespace mongo {
+
+class AggregationRequest;
+
+/**
+ * Represents a resolved definition, composed of a base collection namespace and a pipeline
+ * built from one or more views.
+ */
+class ResolvedView {
+public:
+ ResolvedView(const NamespaceString& collectionNs, const std::vector<BSONObj>& pipeline)
+ : _namespace(std::move(collectionNs)), _pipeline(std::move(pipeline)) {}
+
+ /**
+ * Returns whether 'commandResponseObj' contains a CommandOnShardedViewNotSupportedOnMongod
+ * error and resolved view definition.
+ */
+ static bool isResolvedViewErrorResponse(BSONObj commandResponseObj);
+
+ static ResolvedView fromBSON(BSONObj commandResponseObj);
+
+ /**
+ * Convert an aggregation command on a view to the equivalent command against the view's
+ * underlying collection.
+ */
+ StatusWith<BSONObj> asExpandedViewAggregation(const BSONObj& aggCommand) const;
+ StatusWith<BSONObj> asExpandedViewAggregation(const AggregationRequest& aggRequest) const;
+
+ const NamespaceString& getNamespace() const {
+ return _namespace;
+ }
+
+ const std::vector<BSONObj>& getPipeline() const {
+ return _pipeline;
+ }
+
+private:
+ NamespaceString _namespace;
+ std::vector<BSONObj> _pipeline;
+};
+
+} // namespace mongo
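The mongos-side commands later in this patch all follow the same retry shape against this interface; a condensed sketch with placeholder variables ('shardResponse', 'aggCmdOnView') and most error handling elided:

    if (ResolvedView::isResolvedViewErrorResponse(shardResponse)) {
        auto resolvedView = ResolvedView::fromBSON(shardResponse);
        auto aggCmd = resolvedView.asExpandedViewAggregation(aggCmdOnView);
        if (!aggCmd.isOK()) {
            return appendCommandStatus(result, aggCmd.getStatus());
        }
        // Retry as an aggregation against the view's underlying (possibly
        // sharded) collection via the cluster-level "aggregate" command.
        return Command::findCommand("aggregate")
            ->run(txn, dbname, aggCmd.getValue(), options, errmsg, result);
    }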
diff --git a/src/mongo/db/views/view_catalog.cpp b/src/mongo/db/views/view_catalog.cpp
index 2f1ea88a1d1..fd738cb2f6d 100644
--- a/src/mongo/db/views/view_catalog.cpp
+++ b/src/mongo/db/views/view_catalog.cpp
@@ -36,13 +36,8 @@
#include "mongo/base/status_with.h"
#include "mongo/base/string_data.h"
-#include "mongo/db/catalog/collection.h"
-#include "mongo/db/catalog/collection_options.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/operation_context.h"
-#include "mongo/db/pipeline/aggregation_request.h"
-#include "mongo/db/pipeline/document_source.h"
-#include "mongo/db/pipeline/pipeline.h"
#include "mongo/db/server_parameters.h"
#include "mongo/db/storage/recovery_unit.h"
#include "mongo/db/views/view.h"
@@ -57,39 +52,6 @@ ExportedServerParameter<bool, ServerParameterType::kStartupOnly> enableViewsPara
const std::uint32_t ViewCatalog::kMaxViewDepth = 20;
-BSONObj ResolvedViewDefinition::asExpandedViewAggregation(const AggregationRequest& request) {
- BSONObjBuilder aggregationBuilder;
-
- // Perform the aggregation on the resolved namespace.
- aggregationBuilder.append("aggregate", collectionNss.coll());
-
- // The new pipeline consists of two parts: first, 'pipeline' in this ResolvedViewDefinition;
- // then, the pipeline in 'request'.
- BSONArrayBuilder pipelineBuilder(aggregationBuilder.subarrayStart("pipeline"));
- for (auto&& item : pipeline) {
- pipelineBuilder.append(item);
- }
-
- for (auto&& item : request.getPipeline()) {
- pipelineBuilder.append(item);
- }
- pipelineBuilder.doneFast();
-
- // The cursor option is always specified regardless of the presence of batchSize.
- if (request.getBatchSize()) {
- BSONObjBuilder batchSizeBuilder(aggregationBuilder.subobjStart("cursor"));
- batchSizeBuilder.append(AggregationRequest::kBatchSizeName, *request.getBatchSize());
- batchSizeBuilder.doneFast();
- } else {
- aggregationBuilder.append("cursor", BSONObj());
- }
-
- if (request.isExplain())
- aggregationBuilder.append("explain", true);
-
- return aggregationBuilder.obj();
-}
-
ViewCatalog::ViewCatalog(OperationContext* txn, DurableViewCatalog* durable) : _durable(durable) {
durable->iterate(txn, [&](const BSONObj& view) {
NamespaceString viewName(view["_id"].str());
@@ -149,15 +111,15 @@ ViewDefinition* ViewCatalog::lookup(StringData ns) {
return nullptr;
}
-StatusWith<ResolvedViewDefinition> ViewCatalog::resolveView(OperationContext* txn,
- const NamespaceString& nss) {
+StatusWith<ResolvedView> ViewCatalog::resolveView(OperationContext* txn,
+ const NamespaceString& nss) {
const NamespaceString* resolvedNss = &nss;
std::vector<BSONObj> resolvedPipeline;
for (std::uint32_t i = 0; i < ViewCatalog::kMaxViewDepth; i++) {
ViewDefinition* view = lookup(resolvedNss->ns());
if (!view)
- return StatusWith<ResolvedViewDefinition>({*resolvedNss, resolvedPipeline});
+ return StatusWith<ResolvedView>({*resolvedNss, resolvedPipeline});
resolvedNss = &(view->viewOn());
diff --git a/src/mongo/db/views/view_catalog.h b/src/mongo/db/views/view_catalog.h
index 91c9f694d48..c3cbb505f99 100644
--- a/src/mongo/db/views/view_catalog.h
+++ b/src/mongo/db/views/view_catalog.h
@@ -39,30 +39,14 @@
#include "mongo/base/string_data.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/views/durable_view_catalog.h"
+#include "mongo/db/views/resolved_view.h"
#include "mongo/db/views/view.h"
#include "mongo/util/string_map.h"
namespace mongo {
-class AggregationRequest;
-class Database;
class OperationContext;
/**
- * Represents a fully-resolved view: a non-view namespace with a corresponding aggregation pipeline.
- */
-struct ResolvedViewDefinition {
- /**
- * Creates a new aggregation command object for a view operation. The new command is an
- * aggregation on 'collectionNss', and its pipeline is the concatenation of 'pipeline' with the
- * pipeline of 'request'.
- */
- BSONObj asExpandedViewAggregation(const AggregationRequest& request);
-
- NamespaceString collectionNss;
- std::vector<BSONObj> pipeline;
-};
-
-/**
* In-memory data structure for view definitions. Note that this structure is not thread-safe; you
* must be holding a database lock to access a database's view catalog.
*/
@@ -121,8 +105,7 @@ public:
*
* It is illegal to call this function on a namespace that is not a view.
*/
- StatusWith<ResolvedViewDefinition> resolveView(OperationContext* txn,
- const NamespaceString& nss);
+ StatusWith<ResolvedView> resolveView(OperationContext* txn, const NamespaceString& nss);
private:
ViewMap _viewMap;
diff --git a/src/mongo/db/views/view_sharding_check.cpp b/src/mongo/db/views/view_sharding_check.cpp
new file mode 100644
index 00000000000..cc662ff2549
--- /dev/null
+++ b/src/mongo/db/views/view_sharding_check.cpp
@@ -0,0 +1,97 @@
+/**
+ * Copyright (C) 2016 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/views/view_sharding_check.h"
+
+#include "mongo/db/catalog/database.h"
+#include "mongo/db/commands.h"
+#include "mongo/db/db_raii.h"
+#include "mongo/db/operation_context.h"
+#include "mongo/db/repl/replication_coordinator_global.h"
+#include "mongo/db/s/collection_sharding_state.h"
+#include "mongo/db/s/sharding_state.h"
+#include "mongo/db/server_options.h"
+#include "mongo/db/views/view_catalog.h"
+#include "mongo/s/stale_exception.h"
+
+namespace mongo {
+
+StatusWith<BSONObj> ViewShardingCheck::getResolvedViewIfSharded(OperationContext* opCtx,
+ Database* db,
+ const ViewDefinition* view) {
+ invariant(opCtx);
+ invariant(db);
+ invariant(view);
+
+ if (ClusterRole::ShardServer != serverGlobalParams.clusterRole) {
+ // This node is not part of a sharded cluster, so the collection cannot be sharded.
+ return BSONObj();
+ }
+
+ auto resolvedView = db->getViewCatalog()->resolveView(opCtx, view->name());
+ if (!resolvedView.isOK()) {
+ return resolvedView.getStatus();
+ }
+
+ const auto& sourceNss = resolvedView.getValue().getNamespace();
+ const auto isPrimary =
+ repl::ReplicationCoordinator::get(opCtx->getClient()->getServiceContext())
+ ->canAcceptWritesForDatabase(db->name());
+
+ if (isPrimary && !collectionIsSharded(opCtx, sourceNss)) {
+ return BSONObj();
+ }
+
+ BSONObjBuilder viewDetailBob;
+ viewDetailBob.append("ns", sourceNss.ns());
+ viewDetailBob.append("pipeline", resolvedView.getValue().getPipeline());
+
+ return viewDetailBob.obj();
+}
+
+void ViewShardingCheck::appendShardedViewStatus(const BSONObj& resolvedView, BSONObjBuilder* out) {
+ invariant(out);
+ invariant(!resolvedView.isEmpty());
+
+ out->append("resolvedView", resolvedView);
+ Status status{ErrorCodes::CommandOnShardedViewNotSupportedOnMongod,
+ str::stream() << "Command on view must be executed by mongos"};
+ Command::appendCommandStatus(*out, status);
+}
+
+bool ViewShardingCheck::collectionIsSharded(OperationContext* opCtx, const NamespaceString& nss) {
+ // The database is locked at this point but the collection underlying the given view is not
+ // and must be for a sharding check.
+ dassert(opCtx->lockState()->isDbLockedForMode(nss.db(), MODE_IS));
+ AutoGetCollection autoGetCol(opCtx, nss, MODE_IS);
+ return CollectionShardingState::get(opCtx, nss)->collectionIsSharded();
+}
+
+} // namespace mongo
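For reference, the response built by appendShardedViewStatus has roughly the following shape (field values illustrative; the async_results_merger test later in this patch reconstructs the same document):

    {
        resolvedView: { ns: "db.underlyingCollection", pipeline: [ ... ] },
        ok: 0,
        errmsg: "Command on view must be executed by mongos",
        code: 169    // CommandOnShardedViewNotSupportedOnMongod
    }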
diff --git a/src/mongo/db/views/view_sharding_check.h b/src/mongo/db/views/view_sharding_check.h
new file mode 100644
index 00000000000..d3437353212
--- /dev/null
+++ b/src/mongo/db/views/view_sharding_check.h
@@ -0,0 +1,81 @@
+/**
+ * Copyright (C) 2016 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/base/status_with.h"
+#include "mongo/bson/bsonobj.h"
+#include "mongo/bson/bsonobjbuilder.h"
+
+#include <string>
+#include <vector>
+
+namespace mongo {
+
+class Database;
+class NamespaceString;
+class OperationContext;
+class ViewDefinition;
+
+/**
+ * When a read against a view is forwarded from mongoS, it arrives without any awareness of
+ * whether the underlying collection is sharded. If the underlying collection is found to be
+ * sharded (*), we return an error to mongos, along with the view definition, requesting that
+ * the resolved read operation be executed there.
+ *
+ * (*) We have incomplete sharding state on secondaries. If we are a secondary, then we have to
+ * assume that the collection backing the view could be sharded.
+ */
+class ViewShardingCheck {
+public:
+ /**
+ * If it is determined that a view's underlying collection may be sharded, this method returns
+ * a BSONObj containing the resolved view definition. If the underlying collection is not
+ * sharded, an empty BSONObj is returned.
+ *
+ * Will return an error if the ViewCatalog is unable to generate a resolved view.
+ */
+ static StatusWith<BSONObj> getResolvedViewIfSharded(OperationContext* opCtx,
+ Database* db,
+ const ViewDefinition* view);
+
+ /**
+ * Appends the resolved view definition and CommandOnShardedViewNotSupportedOnMongod status to
+ * 'result'.
+ */
+ static void appendShardedViewStatus(const BSONObj& resolvedView, BSONObjBuilder* result);
+
+private:
+ /**
+ * Confirms whether 'nss' represents a sharded collection. Only valid if the calling
+ * member is the primary.
+ */
+ static bool collectionIsSharded(OperationContext* opCtx, const NamespaceString& nss);
+};
+
+} // namespace mongo
diff --git a/src/mongo/s/commands/SConscript b/src/mongo/s/commands/SConscript
index 4d46ada00e6..49667af9ee4 100644
--- a/src/mongo/s/commands/SConscript
+++ b/src/mongo/s/commands/SConscript
@@ -76,6 +76,7 @@ env.Library(
'$BUILD_DIR/mongo/client/parallel',
'$BUILD_DIR/mongo/db/pipeline/pipeline',
'$BUILD_DIR/mongo/db/commands/killcursors_common',
+ '$BUILD_DIR/mongo/db/views/views',
'$BUILD_DIR/mongo/s/cluster_ops_impl',
'$BUILD_DIR/mongo/s/coreshard',
'$BUILD_DIR/mongo/s/mongoscore',
diff --git a/src/mongo/s/commands/cluster_count_cmd.cpp b/src/mongo/s/commands/cluster_count_cmd.cpp
index e2a31a05bfa..93d7f02bd69 100644
--- a/src/mongo/s/commands/cluster_count_cmd.cpp
+++ b/src/mongo/s/commands/cluster_count_cmd.cpp
@@ -31,6 +31,10 @@
#include <vector>
#include "mongo/db/commands.h"
+#include "mongo/db/query/count_request.h"
+#include "mongo/db/query/view_response_formatter.h"
+#include "mongo/db/views/resolved_view.h"
+#include "mongo/rpc/get_status_from_command_result.h"
#include "mongo/s/commands/cluster_commands_common.h"
#include "mongo/s/commands/cluster_explain.h"
#include "mongo/s/commands/strategy.h"
@@ -156,6 +160,40 @@ public:
Strategy::commandOp(
txn, dbname, countCmdBuilder.done(), options, nss.ns(), filter, &countResult);
+ if (countResult.size() == 1 &&
+ ResolvedView::isResolvedViewErrorResponse(countResult[0].result)) {
+ auto countRequest = CountRequest::parseFromBSON(dbname, cmdObj, false);
+ if (!countRequest.isOK()) {
+ return appendCommandStatus(result, countRequest.getStatus());
+ }
+
+ auto aggCmdOnView = countRequest.getValue().asAggregationCommand();
+ if (!aggCmdOnView.isOK()) {
+ return appendCommandStatus(result, aggCmdOnView.getStatus());
+ }
+
+ auto resolvedView = ResolvedView::fromBSON(countResult[0].result);
+ auto aggCmd = resolvedView.asExpandedViewAggregation(aggCmdOnView.getValue());
+ if (!aggCmd.isOK()) {
+ return appendCommandStatus(result, aggCmd.getStatus());
+ }
+
+
+ BSONObjBuilder aggResult;
+ Command::findCommand("aggregate")
+ ->run(txn, dbname, aggCmd.getValue(), options, errmsg, aggResult);
+
+ result.resetToEmpty();
+ ViewResponseFormatter formatter(aggResult.obj());
+ auto formatStatus = formatter.appendAsCountResponse(&result);
+ if (!formatStatus.isOK()) {
+ return appendCommandStatus(result, formatStatus);
+ }
+
+ return true;
+ }
+
+
long long total = 0;
BSONObjBuilder shardSubTotal(result.subobjStart("shards"));
@@ -224,6 +262,33 @@ public:
long long millisElapsed = timer.millis();
+ if (shardResults.size() == 1 &&
+ ResolvedView::isResolvedViewErrorResponse(shardResults[0].result)) {
+ auto countRequest = CountRequest::parseFromBSON(dbname, cmdObj, true);
+ if (!countRequest.isOK()) {
+ return countRequest.getStatus();
+ }
+
+ auto aggCmdOnView = countRequest.getValue().asAggregationCommand();
+ if (!aggCmdOnView.isOK()) {
+ return aggCmdOnView.getStatus();
+ }
+
+ auto resolvedView = ResolvedView::fromBSON(shardResults[0].result);
+ auto aggCmd = resolvedView.asExpandedViewAggregation(aggCmdOnView.getValue());
+ if (!aggCmd.isOK()) {
+ return aggCmd.getStatus();
+ }
+
+ std::string errMsg;
+ if (Command::findCommand("aggregate")
+ ->run(txn, dbname, aggCmd.getValue(), 0, errMsg, *out)) {
+ return Status::OK();
+ }
+
+ return getStatusFromCommandResult(out->asTempObj());
+ }
+
const char* mongosStageName = ClusterExplain::getStageNameForReadOp(shardResults, cmdObj);
return ClusterExplain::buildExplainResult(
diff --git a/src/mongo/s/commands/cluster_find_cmd.cpp b/src/mongo/s/commands/cluster_find_cmd.cpp
index 1bd898c2c37..9a742c979e6 100644
--- a/src/mongo/s/commands/cluster_find_cmd.cpp
+++ b/src/mongo/s/commands/cluster_find_cmd.cpp
@@ -36,6 +36,8 @@
#include "mongo/db/matcher/extensions_callback_noop.h"
#include "mongo/db/query/cursor_response.h"
#include "mongo/db/stats/counters.h"
+#include "mongo/db/views/resolved_view.h"
+#include "mongo/rpc/get_status_from_command_result.h"
#include "mongo/s/commands/strategy.h"
#include "mongo/s/query/cluster_find.h"
@@ -118,8 +120,36 @@ public:
return qr.getStatus();
}
- return Strategy::explainFind(
+ auto result = Strategy::explainFind(
txn, cmdObj, *qr.getValue(), verbosity, serverSelectionMetadata, out);
+
+ if (result == ErrorCodes::CommandOnShardedViewNotSupportedOnMongod) {
+ auto resolvedView = ResolvedView::fromBSON(out->asTempObj());
+ out->resetToEmpty();
+
+ auto aggCmdOnView = qr.getValue().get()->asAggregationCommand();
+ if (!aggCmdOnView.isOK()) {
+ return aggCmdOnView.getStatus();
+ }
+
+ auto aggCmd = resolvedView.asExpandedViewAggregation(aggCmdOnView.getValue());
+ if (!aggCmd.isOK()) {
+ return aggCmd.getStatus();
+ }
+
+ Command* c = Command::findCommand("aggregate");
+ int queryOptions = 0;
+ std::string errMsg;
+
+ if (c->run(txn, dbname, aggCmd.getValue(), queryOptions, errMsg, *out)) {
+ return Status::OK();
+ }
+
+ BSONObj tmp = out->asTempObj();
+ return getStatusFromCommandResult(out->asTempObj());
+ }
+
+ return result;
}
bool run(OperationContext* txn,
@@ -161,8 +191,26 @@ public:
// Do the work to generate the first batch of results. This blocks waiting to get responses
// from the shard(s).
std::vector<BSONObj> batch;
- auto cursorId = ClusterFind::runQuery(txn, *cq.getValue(), readPref.getValue(), &batch);
+ BSONObj viewDefinition;
+ auto cursorId = ClusterFind::runQuery(
+ txn, *cq.getValue(), readPref.getValue(), &batch, &viewDefinition);
if (!cursorId.isOK()) {
+ if (cursorId.getStatus() == ErrorCodes::CommandOnShardedViewNotSupportedOnMongod) {
+ auto aggCmdOnView = cq.getValue()->getQueryRequest().asAggregationCommand();
+ if (!aggCmdOnView.isOK()) {
+ return appendCommandStatus(result, aggCmdOnView.getStatus());
+ }
+
+ auto resolvedView = ResolvedView::fromBSON(viewDefinition);
+ auto aggCmd = resolvedView.asExpandedViewAggregation(aggCmdOnView.getValue());
+ if (!aggCmd.isOK()) {
+ return appendCommandStatus(result, aggCmd.getStatus());
+ }
+
+ return Command::findCommand("aggregate")
+ ->run(txn, dbname, aggCmd.getValue(), options, errmsg, result);
+ }
+
return appendCommandStatus(result, cursorId.getStatus());
}
diff --git a/src/mongo/s/commands/cluster_pipeline_cmd.cpp b/src/mongo/s/commands/cluster_pipeline_cmd.cpp
index 7b4adf1c9fe..bac39eceb29 100644
--- a/src/mongo/s/commands/cluster_pipeline_cmd.cpp
+++ b/src/mongo/s/commands/cluster_pipeline_cmd.cpp
@@ -43,6 +43,8 @@
#include "mongo/db/pipeline/document_source.h"
#include "mongo/db/pipeline/expression_context.h"
#include "mongo/db/pipeline/pipeline.h"
+#include "mongo/db/views/resolved_view.h"
+#include "mongo/db/views/view.h"
#include "mongo/executor/task_executor_pool.h"
#include "mongo/platform/random.h"
#include "mongo/rpc/get_status_from_command_result.h"
@@ -115,7 +117,7 @@ public:
shared_ptr<DBConfig> conf = status.getValue();
if (!conf->isShardingEnabled()) {
- return aggPassthrough(txn, conf, cmdObj, result, options);
+ return aggPassthrough(txn, dbname, conf, cmdObj, result, options, errmsg);
}
auto request = AggregationRequest::parseFromBSON(NamespaceString(fullns), cmdObj);
@@ -140,7 +142,7 @@ public:
}
if (!conf->isSharded(fullns)) {
- return aggPassthrough(txn, conf, cmdObj, result, options);
+ return aggPassthrough(txn, dbname, conf, cmdObj, result, options, errmsg);
}
// If the first $match stage is an exact match on the shard key, we only have to send it
@@ -287,10 +289,12 @@ private:
BSONObj aggRunCommand(DBClientBase* conn, const string& db, BSONObj cmd, int queryOptions);
bool aggPassthrough(OperationContext* txn,
+ const std::string& dbname,
shared_ptr<DBConfig> conf,
BSONObj cmd,
BSONObjBuilder& result,
- int queryOptions);
+ int queryOptions,
+ std::string& errmsg);
} clusterPipelineCmd;
std::vector<DocumentSourceMergeCursors::CursorDescriptor> PipelineCommand::parseCursors(
@@ -432,14 +436,16 @@ BSONObj PipelineCommand::aggRunCommand(DBClientBase* conn,
}
bool PipelineCommand::aggPassthrough(OperationContext* txn,
+ const std::string& dbname,
shared_ptr<DBConfig> conf,
- BSONObj cmd,
+ BSONObj cmdObj,
BSONObjBuilder& out,
- int queryOptions) {
+ int queryOptions,
+ std::string& errmsg) {
// Temporary hack. See comment on declaration for details.
const auto shard = grid.shardRegistry()->getShard(txn, conf->getPrimaryId());
ShardConnection conn(shard->getConnString(), "");
- BSONObj result = aggRunCommand(conn.get(), conf->name(), cmd, queryOptions);
+ BSONObj result = aggRunCommand(conn.get(), conf->name(), cmdObj, queryOptions);
conn.done();
// First append the properly constructed writeConcernError. It will then be skipped
@@ -449,6 +455,28 @@ bool PipelineCommand::aggPassthrough(OperationContext* txn,
}
out.appendElementsUnique(result);
+
+ BSONObj responseObj = out.asTempObj();
+ if (ResolvedView::isResolvedViewErrorResponse(responseObj)) {
+ auto resolvedView = ResolvedView::fromBSON(responseObj);
+
+ auto request = AggregationRequest::parseFromBSON(resolvedView.getNamespace(), cmdObj);
+ if (!request.isOK()) {
+ out.resetToEmpty();
+ return appendCommandStatus(out, request.getStatus());
+ }
+
+ auto aggCmd = resolvedView.asExpandedViewAggregation(request.getValue());
+ if (!aggCmd.isOK()) {
+ out.resetToEmpty();
+ return appendCommandStatus(out, aggCmd.getStatus());
+ }
+
+ out.resetToEmpty();
+ return Command::findCommand("aggregate")
+ ->run(txn, dbname, aggCmd.getValue(), queryOptions, errmsg, out);
+ }
+
return result["ok"].trueValue();
}
diff --git a/src/mongo/s/commands/commands_public.cpp b/src/mongo/s/commands/commands_public.cpp
index 098aa63376c..09b94d8f0ae 100644
--- a/src/mongo/s/commands/commands_public.cpp
+++ b/src/mongo/s/commands/commands_public.cpp
@@ -42,6 +42,10 @@
#include "mongo/db/commands/copydb.h"
#include "mongo/db/commands/rename_collection.h"
#include "mongo/db/lasterror.h"
+#include "mongo/db/matcher/extensions_callback_noop.h"
+#include "mongo/db/query/parsed_distinct.h"
+#include "mongo/db/query/view_response_formatter.h"
+#include "mongo/db/views/resolved_view.h"
#include "mongo/executor/task_executor_pool.h"
#include "mongo/rpc/get_status_from_command_result.h"
#include "mongo/s/catalog/catalog_cache.h"
@@ -1124,7 +1128,45 @@ public:
shared_ptr<DBConfig> conf = status.getValue();
if (!conf->isShardingEnabled() || !conf->isSharded(fullns)) {
- return passthrough(txn, conf.get(), cmdObj, options, result);
+
+ if (passthrough(txn, conf.get(), cmdObj, options, result)) {
+ return true;
+ }
+
+ BSONObj resultObj = result.asTempObj();
+ if (ResolvedView::isResolvedViewErrorResponse(resultObj)) {
+ auto resolvedView = ResolvedView::fromBSON(resultObj);
+ result.resetToEmpty();
+
+ auto parsedDistinct = ParsedDistinct::parse(
+ txn, resolvedView.getNamespace(), cmdObj, ExtensionsCallbackNoop(), false);
+ if (!parsedDistinct.isOK()) {
+ return appendCommandStatus(result, parsedDistinct.getStatus());
+ }
+
+ auto aggCmdOnView = parsedDistinct.getValue().asAggregationCommand();
+ if (!aggCmdOnView.isOK()) {
+ return appendCommandStatus(result, aggCmdOnView.getStatus());
+ }
+
+ auto aggCmd = resolvedView.asExpandedViewAggregation(aggCmdOnView.getValue());
+ if (!aggCmd.isOK()) {
+ return appendCommandStatus(result, aggCmd.getStatus());
+ }
+
+ BSONObjBuilder aggResult;
+ Command::findCommand("aggregate")
+ ->run(txn, dbName, aggCmd.getValue(), options, errmsg, aggResult);
+
+ ViewResponseFormatter formatter(aggResult.obj());
+ auto formatStatus = formatter.appendAsDistinctResponse(&result);
+ if (!formatStatus.isOK()) {
+ return appendCommandStatus(result, formatStatus);
+ }
+ return true;
+ }
+
+ return false;
}
shared_ptr<ChunkManager> cm = conf->getChunkManager(txn, fullns);
@@ -1210,6 +1252,34 @@ public:
long long millisElapsed = timer.millis();
+ if (shardResults.size() == 1 &&
+ ResolvedView::isResolvedViewErrorResponse(shardResults[0].result)) {
+ auto resolvedView = ResolvedView::fromBSON(shardResults[0].result);
+ auto parsedDistinct = ParsedDistinct::parse(
+ txn, resolvedView.getNamespace(), cmdObj, ExtensionsCallbackNoop(), true);
+ if (!parsedDistinct.isOK()) {
+ return parsedDistinct.getStatus();
+ }
+
+ auto aggCmdOnView = parsedDistinct.getValue().asAggregationCommand();
+ if (!aggCmdOnView.isOK()) {
+ return aggCmdOnView.getStatus();
+ }
+
+ auto aggCmd = resolvedView.asExpandedViewAggregation(aggCmdOnView.getValue());
+ if (!aggCmd.isOK()) {
+ return aggCmd.getStatus();
+ }
+
+ std::string errMsg;
+ if (Command::findCommand("aggregate")
+ ->run(txn, dbname, aggCmd.getValue(), 0, errMsg, *out)) {
+ return Status::OK();
+ }
+
+ return getStatusFromCommandResult(out->asTempObj());
+ }
+
const char* mongosStageName = ClusterExplain::getStageNameForReadOp(shardResults, cmdObj);
return ClusterExplain::buildExplainResult(
diff --git a/src/mongo/s/commands/strategy.cpp b/src/mongo/s/commands/strategy.cpp
index 4bfccd4ce2c..d121c36346d 100644
--- a/src/mongo/s/commands/strategy.cpp
+++ b/src/mongo/s/commands/strategy.cpp
@@ -52,6 +52,8 @@
#include "mongo/db/query/query_request.h"
#include "mongo/db/server_parameters.h"
#include "mongo/db/stats/counters.h"
+#include "mongo/db/views/resolved_view.h"
+#include "mongo/rpc/get_status_from_command_result.h"
#include "mongo/rpc/metadata/server_selection_metadata.h"
#include "mongo/s/bson_serializable.h"
#include "mongo/s/catalog/catalog_cache.h"
@@ -192,7 +194,18 @@ void Strategy::queryOp(OperationContext* txn, Request& request) {
// 0 means the cursor is exhausted. Otherwise we assume that a cursor with the returned id can
// be retrieved via the ClusterCursorManager.
- auto cursorId = ClusterFind::runQuery(txn, *canonicalQuery.getValue(), readPreference, &batch);
+ auto cursorId =
+ ClusterFind::runQuery(txn,
+ *canonicalQuery.getValue(),
+ readPreference,
+ &batch,
+ nullptr /*Argument is for views which OP_QUERY doesn't support*/);
+
+ if (!cursorId.isOK() &&
+ cursorId.getStatus() == ErrorCodes::CommandOnShardedViewNotSupportedOnMongod) {
+ uasserted(40247, "OP_QUERY not supported on views");
+ }
+
uassertStatusOK(cursorId.getStatus());
// Fill out the response buffer.
@@ -530,6 +543,12 @@ Status Strategy::explainFind(OperationContext* txn,
long long millisElapsed = timer.millis();
+ if (shardResults.size() == 1 &&
+ ResolvedView::isResolvedViewErrorResponse(shardResults[0].result)) {
+ out->append("resolvedView", shardResults[0].result.getObjectField("resolvedView"));
+ return getStatusFromCommandResult(shardResults[0].result);
+ }
+
const char* mongosStageName = ClusterExplain::getStageNameForReadOp(shardResults, findCommand);
return ClusterExplain::buildExplainResult(
diff --git a/src/mongo/s/query/async_results_merger.cpp b/src/mongo/s/query/async_results_merger.cpp
index 2d1919769ad..31d9337b10f 100644
--- a/src/mongo/s/query/async_results_merger.cpp
+++ b/src/mongo/s/query/async_results_merger.cpp
@@ -182,7 +182,7 @@ bool AsyncResultsMerger::readyUnsorted_inlock() {
return allExhausted;
}
-StatusWith<boost::optional<BSONObj>> AsyncResultsMerger::nextReady() {
+StatusWith<ClusterQueryResult> AsyncResultsMerger::nextReady() {
stdx::lock_guard<stdx::mutex> lk(_mutex);
dassert(ready_inlock());
if (_lifecycleState != kAlive) {
@@ -195,19 +195,19 @@ StatusWith<boost::optional<BSONObj>> AsyncResultsMerger::nextReady() {
if (_eofNext) {
_eofNext = false;
- return {boost::none};
+ return {ClusterQueryResult()};
}
const bool hasSort = !_params.sort.isEmpty();
return hasSort ? nextReadySorted() : nextReadyUnsorted();
}
-boost::optional<BSONObj> AsyncResultsMerger::nextReadySorted() {
+ClusterQueryResult AsyncResultsMerger::nextReadySorted() {
// Tailable cursors cannot have a sort.
invariant(!_params.isTailable);
if (_mergeQueue.empty()) {
- return boost::none;
+ return {};
}
size_t smallestRemote = _mergeQueue.top();
@@ -216,7 +216,7 @@ boost::optional<BSONObj> AsyncResultsMerger::nextReadySorted() {
invariant(!_remotes[smallestRemote].docBuffer.empty());
invariant(_remotes[smallestRemote].status.isOK());
- BSONObj front = _remotes[smallestRemote].docBuffer.front();
+ ClusterQueryResult front = _remotes[smallestRemote].docBuffer.front();
_remotes[smallestRemote].docBuffer.pop();
// Re-populate the merging queue with the next result from 'smallestRemote', if it has a
@@ -228,14 +228,14 @@ boost::optional<BSONObj> AsyncResultsMerger::nextReadySorted() {
return front;
}
-boost::optional<BSONObj> AsyncResultsMerger::nextReadyUnsorted() {
+ClusterQueryResult AsyncResultsMerger::nextReadyUnsorted() {
size_t remotesAttempted = 0;
while (remotesAttempted < _remotes.size()) {
// It is illegal to call this method if there is an error received from any shard.
invariant(_remotes[_gettingFromRemote].status.isOK());
if (_remotes[_gettingFromRemote].hasNext()) {
- BSONObj front = _remotes[_gettingFromRemote].docBuffer.front();
+ ClusterQueryResult front = _remotes[_gettingFromRemote].docBuffer.front();
_remotes[_gettingFromRemote].docBuffer.pop();
if (_params.isTailable && !_remotes[_gettingFromRemote].hasNext()) {
@@ -255,7 +255,7 @@ boost::optional<BSONObj> AsyncResultsMerger::nextReadyUnsorted() {
}
}
- return boost::none;
+ return {};
}
Status AsyncResultsMerger::askForNextBatch_inlock(size_t remoteIndex) {
@@ -436,6 +436,38 @@ void AsyncResultsMerger::handleBatchResponse(
: cbData.response.getStatus());
if (!cursorResponseStatus.isOK()) {
+ // When a read is performed against a view, the shard primary can return an error
+ // indicating that the underlying collection may be sharded. In that case the response
+ // includes an expanded view definition and collection namespace, which we store so that
+ // a second attempt at the read can be made directly against the underlying collection.
+ if (cursorResponseStatus.getStatus() ==
+ ErrorCodes::CommandOnShardedViewNotSupportedOnMongod) {
+ auto& responseObj = cbData.response.getValue().data;
+ if (!responseObj.hasField("resolvedView")) {
+ remote.status = Status(ErrorCodes::InternalError,
+ str::stream() << "Missing field 'resolvedView' in document: "
+ << responseObj);
+ return;
+ }
+
+ auto resolvedViewObj = responseObj.getObjectField("resolvedView");
+ if (resolvedViewObj.isEmpty()) {
+ remote.status = Status(ErrorCodes::InternalError,
+ str::stream() << "Field 'resolvedView' must be an object: "
+ << responseObj);
+ return;
+ }
+
+ ClusterQueryResult result;
+ result.setViewDefinition(resolvedViewObj.getOwned());
+
+ remote.docBuffer.push(result);
+ remote.cursorId = 0;
+ remote.status = Status::OK();
+ return;
+ }
+
auto shard = remote.getShard();
if (!shard) {
remote.status = Status(cursorResponseStatus.getStatus().code(),
@@ -479,7 +511,7 @@ void AsyncResultsMerger::handleBatchResponse(
remote.status = Status::OK();
// Clear the results buffer and cursor id.
- std::queue<BSONObj> emptyBuffer;
+ std::queue<ClusterQueryResult> emptyBuffer;
std::swap(remote.docBuffer, emptyBuffer);
remote.cursorId = 0;
}
@@ -504,7 +536,8 @@ void AsyncResultsMerger::handleBatchResponse(
return;
}
- remote.docBuffer.push(obj);
+ ClusterQueryResult result(obj);
+ remote.docBuffer.push(result);
++remote.fetchedCount;
}
@@ -677,11 +710,11 @@ std::shared_ptr<Shard> AsyncResultsMerger::RemoteCursorData::getShard() {
//
bool AsyncResultsMerger::MergingComparator::operator()(const size_t& lhs, const size_t& rhs) {
- const BSONObj& leftDoc = _remotes[lhs].docBuffer.front();
- const BSONObj& rightDoc = _remotes[rhs].docBuffer.front();
+ const ClusterQueryResult& leftDoc = _remotes[lhs].docBuffer.front();
+ const ClusterQueryResult& rightDoc = _remotes[rhs].docBuffer.front();
- BSONObj leftDocKey = leftDoc[ClusterClientCursorParams::kSortKeyField].Obj();
- BSONObj rightDocKey = rightDoc[ClusterClientCursorParams::kSortKeyField].Obj();
+ BSONObj leftDocKey = (*leftDoc.getResult())[ClusterClientCursorParams::kSortKeyField].Obj();
+ BSONObj rightDocKey = (*rightDoc.getResult())[ClusterClientCursorParams::kSortKeyField].Obj();
// This does not need to sort with a collator, since mongod has already mapped strings to their
// ICU comparison keys as part of the $sortKey meta projection.
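Given the buffering above, a consumer of AsyncResultsMerger now distinguishes three kinds of ClusterQueryResult. A hedged sketch; the getViewDefinition() accessor is assumed from cluster_query_result.h, which this patch adds but whose contents are not shown here.

    auto next = uassertStatusOK(arm->nextReady());
    if (next.isEOF()) {
        // Stream exhausted (or a tailable cursor has paused).
    } else if (next.getViewDefinition()) {
        // A shard reported CommandOnShardedViewNotSupportedOnMongod; the caller
        // retries by expanding the attached view definition into an aggregation.
    } else {
        BSONObj doc = *next.getResult();  // an ordinary result document
    }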
diff --git a/src/mongo/s/query/async_results_merger.h b/src/mongo/s/query/async_results_merger.h
index 8956dca52c8..85a3fbd30ed 100644
--- a/src/mongo/s/query/async_results_merger.h
+++ b/src/mongo/s/query/async_results_merger.h
@@ -38,6 +38,7 @@
#include "mongo/db/cursor_id.h"
#include "mongo/executor/task_executor.h"
#include "mongo/s/query/cluster_client_cursor_params.h"
+#include "mongo/s/query/cluster_query_result.h"
#include "mongo/stdx/mutex.h"
#include "mongo/util/net/hostandport.h"
#include "mongo/util/time_support.h"
@@ -128,10 +129,10 @@ public:
* status.
*
* If this AsyncResultsMerger is fetching results from a remote cursor tailing a capped
- * collection, may return boost::none before end-of-stream. (Tailable cursors remain open even
- * when there are no further results, and may subsequently return more results when they become
- * available.) The calling code is responsible for handling multiple boost::none return values,
- * keeping the cursor open in the tailable case.
+ * collection, may return an empty ClusterQueryResult before end-of-stream. (Tailable cursors
+ * remain open even when there are no further results, and may subsequently return more results
+ * when they become available.) The calling code is responsible for handling multiple empty
+ * ClusterQueryResult return values, keeping the cursor open in the tailable case.
*
* If there has been an error received from one of the shards, or there is an error in
* processing results from a shard, then a non-ok status is returned.
@@ -139,7 +140,7 @@ public:
* Invalid to call unless ready() has returned true (i.e., invalid to call if getting the next
* result requires scheduling remote work).
*/
- StatusWith<boost::optional<BSONObj>> nextReady();
+ StatusWith<ClusterQueryResult> nextReady();
/**
* Schedules remote work as required in order to make further results available. If there is an
@@ -234,7 +235,7 @@ private:
// established but is now exhausted, this member will be set to zero.
boost::optional<CursorId> cursorId;
- std::queue<BSONObj> docBuffer;
+ std::queue<ClusterQueryResult> docBuffer;
executor::TaskExecutor::CallbackHandle cbHandle;
Status status = Status::OK();
@@ -309,8 +310,8 @@ private:
// Helpers for nextReady().
//
- boost::optional<BSONObj> nextReadySorted();
- boost::optional<BSONObj> nextReadyUnsorted();
+ ClusterQueryResult nextReadySorted();
+ ClusterQueryResult nextReadyUnsorted();
/**
* When nextEvent() schedules remote work, it passes this method as a callback. The TaskExecutor
diff --git a/src/mongo/s/query/async_results_merger_test.cpp b/src/mongo/s/query/async_results_merger_test.cpp
index e2957554ad2..a5179f16c68 100644
--- a/src/mongo/s/query/async_results_merger_test.cpp
+++ b/src/mongo/s/query/async_results_merger_test.cpp
@@ -141,6 +141,24 @@ protected:
}
/**
+ * Schedules a "CommandOnShardedViewNotSupportedOnMongod" error response w/ view definition.
+ */
+ void scheduleNetworkViewResponse(const std::string& ns, const std::string& pipelineJsonArr) {
+ BSONObjBuilder viewDefBob;
+ viewDefBob.append("ns", ns);
+ viewDefBob.append("pipeline", fromjson(pipelineJsonArr));
+
+ BSONObjBuilder bob;
+ bob.append("resolvedView", viewDefBob.obj());
+ bob.append("ok", 0.0);
+ bob.append("errmsg", "Command on view must be executed by mongos");
+ bob.append("code", 169);
+
+ std::vector<BSONObj> batch = {bob.obj()};
+ scheduleNetworkResponseObjs(batch);
+ }
+
+ /**
* Schedules a list of cursor responses to be returned by the mock network.
*/
void scheduleNetworkResponses(std::vector<CursorResponse> responses,
@@ -235,19 +253,19 @@ TEST_F(AsyncResultsMergerTest, ClusterFind) {
ASSERT_TRUE(arm->remotesExhausted());
ASSERT_TRUE(arm->ready());
- ASSERT_EQ(fromjson("{_id: 1}"), *unittest::assertGet(arm->nextReady()));
+ ASSERT_EQ(fromjson("{_id: 1}"), *unittest::assertGet(arm->nextReady()).getResult());
ASSERT_TRUE(arm->ready());
- ASSERT_EQ(fromjson("{_id: 2}"), *unittest::assertGet(arm->nextReady()));
+ ASSERT_EQ(fromjson("{_id: 2}"), *unittest::assertGet(arm->nextReady()).getResult());
ASSERT_TRUE(arm->ready());
- ASSERT_EQ(fromjson("{_id: 3}"), *unittest::assertGet(arm->nextReady()));
+ ASSERT_EQ(fromjson("{_id: 3}"), *unittest::assertGet(arm->nextReady()).getResult());
ASSERT_TRUE(arm->ready());
- ASSERT_EQ(fromjson("{_id: 4}"), *unittest::assertGet(arm->nextReady()));
+ ASSERT_EQ(fromjson("{_id: 4}"), *unittest::assertGet(arm->nextReady()).getResult());
ASSERT_TRUE(arm->ready());
- ASSERT_EQ(fromjson("{_id: 5}"), *unittest::assertGet(arm->nextReady()));
+ ASSERT_EQ(fromjson("{_id: 5}"), *unittest::assertGet(arm->nextReady()).getResult());
ASSERT_TRUE(arm->ready());
- ASSERT_EQ(fromjson("{_id: 6}"), *unittest::assertGet(arm->nextReady()));
+ ASSERT_EQ(fromjson("{_id: 6}"), *unittest::assertGet(arm->nextReady()).getResult());
ASSERT_TRUE(arm->ready());
- ASSERT(!unittest::assertGet(arm->nextReady()));
+ ASSERT_TRUE(unittest::assertGet(arm->nextReady()).isEOF());
}
TEST_F(AsyncResultsMergerTest, ClusterFindAndGetMore) {
@@ -270,17 +288,17 @@ TEST_F(AsyncResultsMergerTest, ClusterFindAndGetMore) {
ASSERT_FALSE(arm->remotesExhausted());
ASSERT_TRUE(arm->ready());
- ASSERT_EQ(fromjson("{_id: 1}"), *unittest::assertGet(arm->nextReady()));
+ ASSERT_EQ(fromjson("{_id: 1}"), *unittest::assertGet(arm->nextReady()).getResult());
ASSERT_TRUE(arm->ready());
- ASSERT_EQ(fromjson("{_id: 2}"), *unittest::assertGet(arm->nextReady()));
+ ASSERT_EQ(fromjson("{_id: 2}"), *unittest::assertGet(arm->nextReady()).getResult());
ASSERT_TRUE(arm->ready());
- ASSERT_EQ(fromjson("{_id: 3}"), *unittest::assertGet(arm->nextReady()));
+ ASSERT_EQ(fromjson("{_id: 3}"), *unittest::assertGet(arm->nextReady()).getResult());
ASSERT_TRUE(arm->ready());
- ASSERT_EQ(fromjson("{_id: 4}"), *unittest::assertGet(arm->nextReady()));
+ ASSERT_EQ(fromjson("{_id: 4}"), *unittest::assertGet(arm->nextReady()).getResult());
ASSERT_TRUE(arm->ready());
- ASSERT_EQ(fromjson("{_id: 5}"), *unittest::assertGet(arm->nextReady()));
+ ASSERT_EQ(fromjson("{_id: 5}"), *unittest::assertGet(arm->nextReady()).getResult());
ASSERT_TRUE(arm->ready());
- ASSERT_EQ(fromjson("{_id: 6}"), *unittest::assertGet(arm->nextReady()));
+ ASSERT_EQ(fromjson("{_id: 6}"), *unittest::assertGet(arm->nextReady()).getResult());
ASSERT_FALSE(arm->ready());
readyEvent = unittest::assertGet(arm->nextEvent());
@@ -300,13 +318,13 @@ TEST_F(AsyncResultsMergerTest, ClusterFindAndGetMore) {
ASSERT_FALSE(arm->remotesExhausted());
ASSERT_TRUE(arm->ready());
- ASSERT_EQ(fromjson("{_id: 10}"), *unittest::assertGet(arm->nextReady()));
+ ASSERT_EQ(fromjson("{_id: 10}"), *unittest::assertGet(arm->nextReady()).getResult());
ASSERT_TRUE(arm->ready());
- ASSERT_EQ(fromjson("{_id: 7}"), *unittest::assertGet(arm->nextReady()));
+ ASSERT_EQ(fromjson("{_id: 7}"), *unittest::assertGet(arm->nextReady()).getResult());
ASSERT_TRUE(arm->ready());
- ASSERT_EQ(fromjson("{_id: 8}"), *unittest::assertGet(arm->nextReady()));
+ ASSERT_EQ(fromjson("{_id: 8}"), *unittest::assertGet(arm->nextReady()).getResult());
ASSERT_TRUE(arm->ready());
- ASSERT_EQ(fromjson("{_id: 9}"), *unittest::assertGet(arm->nextReady()));
+ ASSERT_EQ(fromjson("{_id: 9}"), *unittest::assertGet(arm->nextReady()).getResult());
ASSERT_FALSE(arm->ready());
readyEvent = unittest::assertGet(arm->nextEvent());
@@ -322,9 +340,9 @@ TEST_F(AsyncResultsMergerTest, ClusterFindAndGetMore) {
ASSERT_TRUE(arm->remotesExhausted());
ASSERT_TRUE(arm->ready());
- ASSERT_EQ(fromjson("{_id: 11}"), *unittest::assertGet(arm->nextReady()));
+ ASSERT_EQ(fromjson("{_id: 11}"), *unittest::assertGet(arm->nextReady()).getResult());
ASSERT_TRUE(arm->ready());
- ASSERT(!unittest::assertGet(arm->nextReady()));
+ ASSERT_TRUE(unittest::assertGet(arm->nextReady()).isEOF());
}
TEST_F(AsyncResultsMergerTest, ClusterFindSorted) {
@@ -349,19 +367,25 @@ TEST_F(AsyncResultsMergerTest, ClusterFindSorted) {
executor()->waitForEvent(readyEvent);
ASSERT_TRUE(arm->ready());
- ASSERT_EQ(fromjson("{_id: 3, $sortKey: {'': 3}}"), *unittest::assertGet(arm->nextReady()));
+ ASSERT_EQ(fromjson("{_id: 3, $sortKey: {'': 3}}"),
+ *unittest::assertGet(arm->nextReady()).getResult());
ASSERT_TRUE(arm->ready());
- ASSERT_EQ(fromjson("{_id: 4, $sortKey: {'': 4}}"), *unittest::assertGet(arm->nextReady()));
+ ASSERT_EQ(fromjson("{_id: 4, $sortKey: {'': 4}}"),
+ *unittest::assertGet(arm->nextReady()).getResult());
ASSERT_TRUE(arm->ready());
- ASSERT_EQ(fromjson("{_id: 5, $sortKey: {'': 5}}"), *unittest::assertGet(arm->nextReady()));
+ ASSERT_EQ(fromjson("{_id: 5, $sortKey: {'': 5}}"),
+ *unittest::assertGet(arm->nextReady()).getResult());
ASSERT_TRUE(arm->ready());
- ASSERT_EQ(fromjson("{_id: 6, $sortKey: {'': 6}}"), *unittest::assertGet(arm->nextReady()));
+ ASSERT_EQ(fromjson("{_id: 6, $sortKey: {'': 6}}"),
+ *unittest::assertGet(arm->nextReady()).getResult());
ASSERT_TRUE(arm->ready());
- ASSERT_EQ(fromjson("{_id: 8, $sortKey: {'': 8}}"), *unittest::assertGet(arm->nextReady()));
+ ASSERT_EQ(fromjson("{_id: 8, $sortKey: {'': 8}}"),
+ *unittest::assertGet(arm->nextReady()).getResult());
ASSERT_TRUE(arm->ready());
- ASSERT_EQ(fromjson("{_id: 9, $sortKey: {'': 9}}"), *unittest::assertGet(arm->nextReady()));
+ ASSERT_EQ(fromjson("{_id: 9, $sortKey: {'': 9}}"),
+ *unittest::assertGet(arm->nextReady()).getResult());
ASSERT_TRUE(arm->ready());
- ASSERT(!unittest::assertGet(arm->nextReady()));
+ ASSERT_TRUE(unittest::assertGet(arm->nextReady()).isEOF());
}
TEST_F(AsyncResultsMergerTest, ClusterFindAndGetMoreSorted) {
@@ -386,13 +410,13 @@ TEST_F(AsyncResultsMergerTest, ClusterFindAndGetMoreSorted) {
executor()->waitForEvent(readyEvent);
ASSERT_TRUE(arm->ready());
- ASSERT_EQ(fromjson("{$sortKey: {'': 3}}"), *unittest::assertGet(arm->nextReady()));
+ ASSERT_EQ(fromjson("{$sortKey: {'': 3}}"), *unittest::assertGet(arm->nextReady()).getResult());
ASSERT_TRUE(arm->ready());
- ASSERT_EQ(fromjson("{$sortKey: {'': 4}}"), *unittest::assertGet(arm->nextReady()));
+ ASSERT_EQ(fromjson("{$sortKey: {'': 4}}"), *unittest::assertGet(arm->nextReady()).getResult());
ASSERT_TRUE(arm->ready());
- ASSERT_EQ(fromjson("{$sortKey: {'': 5}}"), *unittest::assertGet(arm->nextReady()));
+ ASSERT_EQ(fromjson("{$sortKey: {'': 5}}"), *unittest::assertGet(arm->nextReady()).getResult());
ASSERT_TRUE(arm->ready());
- ASSERT_EQ(fromjson("{$sortKey: {'': 6}}"), *unittest::assertGet(arm->nextReady()));
+ ASSERT_EQ(fromjson("{$sortKey: {'': 6}}"), *unittest::assertGet(arm->nextReady()).getResult());
ASSERT_FALSE(arm->ready());
readyEvent = unittest::assertGet(arm->nextEvent());
@@ -407,11 +431,11 @@ TEST_F(AsyncResultsMergerTest, ClusterFindAndGetMoreSorted) {
executor()->waitForEvent(readyEvent);
ASSERT_TRUE(arm->ready());
- ASSERT_EQ(fromjson("{$sortKey: {'': 7}}"), *unittest::assertGet(arm->nextReady()));
+ ASSERT_EQ(fromjson("{$sortKey: {'': 7}}"), *unittest::assertGet(arm->nextReady()).getResult());
ASSERT_TRUE(arm->ready());
- ASSERT_EQ(fromjson("{$sortKey: {'': 7}}"), *unittest::assertGet(arm->nextReady()));
+ ASSERT_EQ(fromjson("{$sortKey: {'': 7}}"), *unittest::assertGet(arm->nextReady()).getResult());
ASSERT_TRUE(arm->ready());
- ASSERT_EQ(fromjson("{$sortKey: {'': 8}}"), *unittest::assertGet(arm->nextReady()));
+ ASSERT_EQ(fromjson("{$sortKey: {'': 8}}"), *unittest::assertGet(arm->nextReady()).getResult());
ASSERT_FALSE(arm->ready());
readyEvent = unittest::assertGet(arm->nextEvent());
@@ -426,13 +450,13 @@ TEST_F(AsyncResultsMergerTest, ClusterFindAndGetMoreSorted) {
executor()->waitForEvent(readyEvent);
ASSERT_TRUE(arm->ready());
- ASSERT_EQ(fromjson("{$sortKey: {'': 9}}"), *unittest::assertGet(arm->nextReady()));
+ ASSERT_EQ(fromjson("{$sortKey: {'': 9}}"), *unittest::assertGet(arm->nextReady()).getResult());
ASSERT_TRUE(arm->ready());
- ASSERT_EQ(fromjson("{$sortKey: {'': 10}}"), *unittest::assertGet(arm->nextReady()));
+ ASSERT_EQ(fromjson("{$sortKey: {'': 10}}"), *unittest::assertGet(arm->nextReady()).getResult());
ASSERT_TRUE(arm->ready());
- ASSERT_EQ(fromjson("{$sortKey: {'': 10}}"), *unittest::assertGet(arm->nextReady()));
+ ASSERT_EQ(fromjson("{$sortKey: {'': 10}}"), *unittest::assertGet(arm->nextReady()).getResult());
ASSERT_TRUE(arm->ready());
- ASSERT(!unittest::assertGet(arm->nextReady()));
+ ASSERT_TRUE(unittest::assertGet(arm->nextReady()).isEOF());
}
TEST_F(AsyncResultsMergerTest, ClusterFindCompoundSortKey) {
@@ -457,19 +481,25 @@ TEST_F(AsyncResultsMergerTest, ClusterFindCompoundSortKey) {
executor()->waitForEvent(readyEvent);
ASSERT_TRUE(arm->ready());
- ASSERT_EQ(fromjson("{$sortKey: {'': 10, '': 11}}"), *unittest::assertGet(arm->nextReady()));
+ ASSERT_EQ(fromjson("{$sortKey: {'': 10, '': 11}}"),
+ *unittest::assertGet(arm->nextReady()).getResult());
ASSERT_TRUE(arm->ready());
- ASSERT_EQ(fromjson("{$sortKey: {'': 10, '': 12}}"), *unittest::assertGet(arm->nextReady()));
+ ASSERT_EQ(fromjson("{$sortKey: {'': 10, '': 12}}"),
+ *unittest::assertGet(arm->nextReady()).getResult());
ASSERT_TRUE(arm->ready());
- ASSERT_EQ(fromjson("{$sortKey: {'': 5, '': 9}}"), *unittest::assertGet(arm->nextReady()));
+ ASSERT_EQ(fromjson("{$sortKey: {'': 5, '': 9}}"),
+ *unittest::assertGet(arm->nextReady()).getResult());
ASSERT_TRUE(arm->ready());
- ASSERT_EQ(fromjson("{$sortKey: {'': 5, '': 9}}"), *unittest::assertGet(arm->nextReady()));
+ ASSERT_EQ(fromjson("{$sortKey: {'': 5, '': 9}}"),
+ *unittest::assertGet(arm->nextReady()).getResult());
ASSERT_TRUE(arm->ready());
- ASSERT_EQ(fromjson("{$sortKey: {'': 4, '': 4}}"), *unittest::assertGet(arm->nextReady()));
+ ASSERT_EQ(fromjson("{$sortKey: {'': 4, '': 4}}"),
+ *unittest::assertGet(arm->nextReady()).getResult());
ASSERT_TRUE(arm->ready());
- ASSERT_EQ(fromjson("{$sortKey: {'': 4, '': 20}}"), *unittest::assertGet(arm->nextReady()));
+ ASSERT_EQ(fromjson("{$sortKey: {'': 4, '': 20}}"),
+ *unittest::assertGet(arm->nextReady()).getResult());
ASSERT_TRUE(arm->ready());
- ASSERT(!unittest::assertGet(arm->nextReady()));
+ ASSERT_TRUE(unittest::assertGet(arm->nextReady()).isEOF());
}
TEST_F(AsyncResultsMergerTest, ClusterFindSortedButNoSortKey) {
@@ -528,7 +558,7 @@ TEST_F(AsyncResultsMergerTest, ClusterFindInitialBatchSizeIsZero) {
executor()->waitForEvent(readyEvent);
ASSERT_TRUE(arm->ready());
- ASSERT_EQ(fromjson("{_id: 1}"), *unittest::assertGet(arm->nextReady()));
+ ASSERT_EQ(fromjson("{_id: 1}"), *unittest::assertGet(arm->nextReady()).getResult());
ASSERT_FALSE(arm->ready());
readyEvent = unittest::assertGet(arm->nextEvent());
@@ -551,9 +581,40 @@ TEST_F(AsyncResultsMergerTest, ClusterFindInitialBatchSizeIsZero) {
executor()->waitForEvent(readyEvent);
ASSERT_TRUE(arm->ready());
- ASSERT_EQ(fromjson("{_id: 2}"), *unittest::assertGet(arm->nextReady()));
+ ASSERT_EQ(fromjson("{_id: 2}"), *unittest::assertGet(arm->nextReady()).getResult());
+ ASSERT_TRUE(arm->ready());
+ ASSERT_TRUE(unittest::assertGet(arm->nextReady()).isEOF());
+}
+
+TEST_F(AsyncResultsMergerTest, ReceivedViewDefinitionFromShard) {
+ BSONObj findCmd = fromjson("{find: 'testcoll'}");
+ makeCursorFromFindCmd(findCmd, {kTestShardIds[0]});
+
+ ASSERT_FALSE(arm->ready());
+ auto readyEvent = unittest::assertGet(arm->nextEvent());
+ ASSERT_FALSE(arm->ready());
+
+ std::string inputNs = "views_sharded.coll";
+ std::string inputPipeline = "[ { $match: { a: { $gte: 5.0 } } } ]";
+ scheduleNetworkViewResponse(inputNs, inputPipeline);
+
+ executor()->waitForEvent(readyEvent);
+
ASSERT_TRUE(arm->ready());
- ASSERT(!unittest::assertGet(arm->nextReady()));
+
+ auto statusWithNext = arm->nextReady();
+ ASSERT(statusWithNext.isOK());
+
+ auto viewDef = statusWithNext.getValue().getViewDefinition();
+ ASSERT(viewDef);
+
+ auto outputPipeline = (*viewDef)["pipeline"];
+ ASSERT(!outputPipeline.eoo());
+ ASSERT_EQ(fromjson(inputPipeline), outputPipeline.Obj());
+
+ auto outputNs = (*viewDef)["ns"];
+ ASSERT(!outputNs.eoo());
+ ASSERT_EQ(outputNs.String(), inputNs);
}
TEST_F(AsyncResultsMergerTest, ExistingCursors) {
@@ -573,15 +634,15 @@ TEST_F(AsyncResultsMergerTest, ExistingCursors) {
executor()->waitForEvent(readyEvent);
ASSERT_TRUE(arm->ready());
- ASSERT_EQ(fromjson("{_id: 1}"), *unittest::assertGet(arm->nextReady()));
+ ASSERT_EQ(fromjson("{_id: 1}"), *unittest::assertGet(arm->nextReady()).getResult());
ASSERT_TRUE(arm->ready());
- ASSERT_EQ(fromjson("{_id: 2}"), *unittest::assertGet(arm->nextReady()));
+ ASSERT_EQ(fromjson("{_id: 2}"), *unittest::assertGet(arm->nextReady()).getResult());
ASSERT_TRUE(arm->ready());
- ASSERT_EQ(fromjson("{_id: 3}"), *unittest::assertGet(arm->nextReady()));
+ ASSERT_EQ(fromjson("{_id: 3}"), *unittest::assertGet(arm->nextReady()).getResult());
ASSERT_TRUE(arm->ready());
- ASSERT_EQ(fromjson("{_id: 4}"), *unittest::assertGet(arm->nextReady()));
+ ASSERT_EQ(fromjson("{_id: 4}"), *unittest::assertGet(arm->nextReady()).getResult());
ASSERT_TRUE(arm->ready());
- ASSERT(!unittest::assertGet(arm->nextReady()));
+ ASSERT(unittest::assertGet(arm->nextReady()).isEOF());
}
@@ -603,13 +664,13 @@ TEST_F(AsyncResultsMergerTest, StreamResultsFromOneShardIfOtherDoesntRespond) {
executor()->waitForEvent(readyEvent);
ASSERT_TRUE(arm->ready());
- ASSERT_EQ(fromjson("{_id: 1}"), *unittest::assertGet(arm->nextReady()));
+ ASSERT_EQ(fromjson("{_id: 1}"), *unittest::assertGet(arm->nextReady()).getResult());
ASSERT_TRUE(arm->ready());
- ASSERT_EQ(fromjson("{_id: 2}"), *unittest::assertGet(arm->nextReady()));
+ ASSERT_EQ(fromjson("{_id: 2}"), *unittest::assertGet(arm->nextReady()).getResult());
ASSERT_TRUE(arm->ready());
- ASSERT_EQ(fromjson("{_id: 3}"), *unittest::assertGet(arm->nextReady()));
+ ASSERT_EQ(fromjson("{_id: 3}"), *unittest::assertGet(arm->nextReady()).getResult());
ASSERT_TRUE(arm->ready());
- ASSERT_EQ(fromjson("{_id: 4}"), *unittest::assertGet(arm->nextReady()));
+ ASSERT_EQ(fromjson("{_id: 4}"), *unittest::assertGet(arm->nextReady()).getResult());
ASSERT_FALSE(arm->ready());
readyEvent = unittest::assertGet(arm->nextEvent());
@@ -626,9 +687,9 @@ TEST_F(AsyncResultsMergerTest, StreamResultsFromOneShardIfOtherDoesntRespond) {
executor()->waitForEvent(readyEvent);
ASSERT_TRUE(arm->ready());
- ASSERT_EQ(fromjson("{_id: 5}"), *unittest::assertGet(arm->nextReady()));
+ ASSERT_EQ(fromjson("{_id: 5}"), *unittest::assertGet(arm->nextReady()).getResult());
ASSERT_TRUE(arm->ready());
- ASSERT_EQ(fromjson("{_id: 6}"), *unittest::assertGet(arm->nextReady()));
+ ASSERT_EQ(fromjson("{_id: 6}"), *unittest::assertGet(arm->nextReady()).getResult());
ASSERT_FALSE(arm->ready());
readyEvent = unittest::assertGet(arm->nextEvent());
@@ -643,9 +704,9 @@ TEST_F(AsyncResultsMergerTest, StreamResultsFromOneShardIfOtherDoesntRespond) {
executor()->waitForEvent(readyEvent);
ASSERT_TRUE(arm->ready());
- ASSERT_EQ(fromjson("{_id: 7}"), *unittest::assertGet(arm->nextReady()));
+ ASSERT_EQ(fromjson("{_id: 7}"), *unittest::assertGet(arm->nextReady()).getResult());
ASSERT_TRUE(arm->ready());
- ASSERT_EQ(fromjson("{_id: 8}"), *unittest::assertGet(arm->nextReady()));
+ ASSERT_EQ(fromjson("{_id: 8}"), *unittest::assertGet(arm->nextReady()).getResult());
// Kill cursor before deleting it, as the second remote cursor has not been exhausted. We don't
// wait on 'killEvent' here, as the blackholed request's callback will only run on shutdown of
@@ -670,11 +731,11 @@ TEST_F(AsyncResultsMergerTest, ErrorOnMismatchedCursorIds) {
executor()->waitForEvent(readyEvent);
ASSERT_TRUE(arm->ready());
- ASSERT_EQ(fromjson("{_id: 1}"), *unittest::assertGet(arm->nextReady()));
+ ASSERT_EQ(fromjson("{_id: 1}"), *unittest::assertGet(arm->nextReady()).getResult());
ASSERT_TRUE(arm->ready());
- ASSERT_EQ(fromjson("{_id: 2}"), *unittest::assertGet(arm->nextReady()));
+ ASSERT_EQ(fromjson("{_id: 2}"), *unittest::assertGet(arm->nextReady()).getResult());
ASSERT_TRUE(arm->ready());
- ASSERT_EQ(fromjson("{_id: 3}"), *unittest::assertGet(arm->nextReady()));
+ ASSERT_EQ(fromjson("{_id: 3}"), *unittest::assertGet(arm->nextReady()).getResult());
ASSERT_FALSE(arm->ready());
readyEvent = unittest::assertGet(arm->nextEvent());
@@ -769,11 +830,11 @@ TEST_F(AsyncResultsMergerTest, ErrorCantScheduleEventBeforeLastSignaled) {
executor()->waitForEvent(readyEvent);
ASSERT_TRUE(arm->ready());
- ASSERT_EQ(fromjson("{_id: 1}"), *unittest::assertGet(arm->nextReady()));
+ ASSERT_EQ(fromjson("{_id: 1}"), *unittest::assertGet(arm->nextReady()).getResult());
ASSERT_TRUE(arm->ready());
- ASSERT_EQ(fromjson("{_id: 2}"), *unittest::assertGet(arm->nextReady()));
+ ASSERT_EQ(fromjson("{_id: 2}"), *unittest::assertGet(arm->nextReady()).getResult());
ASSERT_TRUE(arm->ready());
- ASSERT(!unittest::assertGet(arm->nextReady()));
+ ASSERT_TRUE(unittest::assertGet(arm->nextReady()).isEOF());
// Required to kill the 'arm' on error before destruction.
auto killEvent = arm->kill();
@@ -899,9 +960,9 @@ TEST_F(AsyncResultsMergerTest, KillOutstandingGetMore) {
// First batch received.
ASSERT_TRUE(arm->ready());
- ASSERT_EQ(fromjson("{_id: 1}"), *unittest::assertGet(arm->nextReady()));
+ ASSERT_EQ(fromjson("{_id: 1}"), *unittest::assertGet(arm->nextReady()).getResult());
ASSERT_TRUE(arm->ready());
- ASSERT_EQ(fromjson("{_id: 2}"), *unittest::assertGet(arm->nextReady()));
+ ASSERT_EQ(fromjson("{_id: 2}"), *unittest::assertGet(arm->nextReady()).getResult());
// This will schedule a getMore on cursor id 123.
ASSERT_FALSE(arm->ready());
@@ -978,13 +1039,13 @@ TEST_F(AsyncResultsMergerTest, TailableBasic) {
executor()->waitForEvent(readyEvent);
ASSERT_TRUE(arm->ready());
- ASSERT_EQ(fromjson("{_id: 1}"), *unittest::assertGet(arm->nextReady()));
+ ASSERT_EQ(fromjson("{_id: 1}"), *unittest::assertGet(arm->nextReady()).getResult());
ASSERT_TRUE(arm->ready());
- ASSERT_EQ(fromjson("{_id: 2}"), *unittest::assertGet(arm->nextReady()));
+ ASSERT_EQ(fromjson("{_id: 2}"), *unittest::assertGet(arm->nextReady()).getResult());
- // In the tailable case, we expect boost::none after every batch.
+ // In the tailable case, we expect EOF after every batch.
ASSERT_TRUE(arm->ready());
- ASSERT(!unittest::assertGet(arm->nextReady()));
+ ASSERT_TRUE(unittest::assertGet(arm->nextReady()).isEOF());
ASSERT_FALSE(arm->remotesExhausted());
ASSERT_FALSE(arm->ready());
@@ -1000,9 +1061,9 @@ TEST_F(AsyncResultsMergerTest, TailableBasic) {
ASSERT_TRUE(arm->ready());
ASSERT_FALSE(arm->remotesExhausted());
- ASSERT_EQ(fromjson("{_id: 3}"), *unittest::assertGet(arm->nextReady()));
+ ASSERT_EQ(fromjson("{_id: 3}"), *unittest::assertGet(arm->nextReady()).getResult());
ASSERT_TRUE(arm->ready());
- ASSERT(!unittest::assertGet(arm->nextReady()));
+ ASSERT_TRUE(unittest::assertGet(arm->nextReady()).isEOF());
ASSERT_FALSE(arm->remotesExhausted());
auto killedEvent = arm->kill();
@@ -1027,7 +1088,7 @@ TEST_F(AsyncResultsMergerTest, TailableEmptyBatch) {
// After receiving an empty batch, the ARM should return boost::none, but remotes should not be
// marked as exhausted.
ASSERT_TRUE(arm->ready());
- ASSERT(!unittest::assertGet(arm->nextReady()));
+ ASSERT_TRUE(unittest::assertGet(arm->nextReady()).isEOF());
ASSERT_FALSE(arm->remotesExhausted());
auto killedEvent = arm->kill();
@@ -1052,7 +1113,7 @@ TEST_F(AsyncResultsMergerTest, TailableExhaustedCursor) {
// Afterwards, the ARM should return boost::none and remote cursors should be marked as
// exhausted.
ASSERT_TRUE(arm->ready());
- ASSERT(!unittest::assertGet(arm->nextReady()));
+ ASSERT_TRUE(unittest::assertGet(arm->nextReady()).isEOF());
ASSERT_TRUE(arm->remotesExhausted());
}
@@ -1072,9 +1133,9 @@ TEST_F(AsyncResultsMergerTest, GetMoreBatchSizes) {
executor()->waitForEvent(readyEvent);
ASSERT_TRUE(arm->ready());
- ASSERT_EQ(fromjson("{_id: 1}"), *unittest::assertGet(arm->nextReady()));
+ ASSERT_EQ(fromjson("{_id: 1}"), *unittest::assertGet(arm->nextReady()).getResult());
ASSERT_TRUE(arm->ready());
- ASSERT_EQ(fromjson("{_id: 2}"), *unittest::assertGet(arm->nextReady()));
+ ASSERT_EQ(fromjson("{_id: 2}"), *unittest::assertGet(arm->nextReady()).getResult());
ASSERT_FALSE(arm->ready());
responses.clear();
@@ -1093,9 +1154,9 @@ TEST_F(AsyncResultsMergerTest, GetMoreBatchSizes) {
executor()->waitForEvent(readyEvent);
ASSERT_TRUE(arm->ready());
- ASSERT_EQ(fromjson("{_id: 3}"), *unittest::assertGet(arm->nextReady()));
+ ASSERT_EQ(fromjson("{_id: 3}"), *unittest::assertGet(arm->nextReady()).getResult());
ASSERT_TRUE(arm->ready());
- ASSERT(!unittest::assertGet(arm->nextReady()));
+ ASSERT_TRUE(unittest::assertGet(arm->nextReady()).isEOF());
}
TEST_F(AsyncResultsMergerTest, SendsSecondaryOkAsMetadata) {
@@ -1117,9 +1178,9 @@ TEST_F(AsyncResultsMergerTest, SendsSecondaryOkAsMetadata) {
executor()->waitForEvent(readyEvent);
ASSERT_TRUE(arm->ready());
- ASSERT_EQ(fromjson("{_id: 1}"), *unittest::assertGet(arm->nextReady()));
+ ASSERT_EQ(fromjson("{_id: 1}"), *unittest::assertGet(arm->nextReady()).getResult());
ASSERT_TRUE(arm->ready());
- ASSERT(!unittest::assertGet(arm->nextReady()));
+ ASSERT_TRUE(unittest::assertGet(arm->nextReady()).isEOF());
}
TEST_F(AsyncResultsMergerTest, AllowPartialResults) {
@@ -1145,9 +1206,9 @@ TEST_F(AsyncResultsMergerTest, AllowPartialResults) {
executor()->waitForEvent(readyEvent);
ASSERT_TRUE(arm->ready());
- ASSERT_EQ(fromjson("{_id: 1}"), *unittest::assertGet(arm->nextReady()));
+ ASSERT_EQ(fromjson("{_id: 1}"), *unittest::assertGet(arm->nextReady()).getResult());
ASSERT_TRUE(arm->ready());
- ASSERT_EQ(fromjson("{_id: 2}"), *unittest::assertGet(arm->nextReady()));
+ ASSERT_EQ(fromjson("{_id: 2}"), *unittest::assertGet(arm->nextReady()).getResult());
ASSERT_FALSE(arm->ready());
readyEvent = unittest::assertGet(arm->nextEvent());
@@ -1166,7 +1227,7 @@ TEST_F(AsyncResultsMergerTest, AllowPartialResults) {
executor()->waitForEvent(readyEvent);
ASSERT_TRUE(arm->ready());
- ASSERT_EQ(fromjson("{_id: 3}"), *unittest::assertGet(arm->nextReady()));
+ ASSERT_EQ(fromjson("{_id: 3}"), *unittest::assertGet(arm->nextReady()).getResult());
ASSERT_FALSE(arm->ready());
readyEvent = unittest::assertGet(arm->nextEvent());
@@ -1181,7 +1242,7 @@ TEST_F(AsyncResultsMergerTest, AllowPartialResults) {
executor()->waitForEvent(readyEvent);
ASSERT_TRUE(arm->ready());
- ASSERT(!unittest::assertGet(arm->nextReady()));
+ ASSERT_TRUE(unittest::assertGet(arm->nextReady()).isEOF());
}
TEST_F(AsyncResultsMergerTest, AllowPartialResultsSingleNode) {
@@ -1199,9 +1260,9 @@ TEST_F(AsyncResultsMergerTest, AllowPartialResultsSingleNode) {
executor()->waitForEvent(readyEvent);
ASSERT_TRUE(arm->ready());
- ASSERT_EQ(fromjson("{_id: 1}"), *unittest::assertGet(arm->nextReady()));
+ ASSERT_EQ(fromjson("{_id: 1}"), *unittest::assertGet(arm->nextReady()).getResult());
ASSERT_TRUE(arm->ready());
- ASSERT_EQ(fromjson("{_id: 2}"), *unittest::assertGet(arm->nextReady()));
+ ASSERT_EQ(fromjson("{_id: 2}"), *unittest::assertGet(arm->nextReady()).getResult());
ASSERT_FALSE(arm->ready());
readyEvent = unittest::assertGet(arm->nextEvent());
@@ -1211,7 +1272,7 @@ TEST_F(AsyncResultsMergerTest, AllowPartialResultsSingleNode) {
// EOF.
scheduleErrorResponse({ErrorCodes::AuthenticationFailed, "authentication failed"});
ASSERT_TRUE(arm->ready());
- ASSERT(!unittest::assertGet(arm->nextReady()));
+ ASSERT_TRUE(unittest::assertGet(arm->nextReady()).isEOF());
}
TEST_F(AsyncResultsMergerTest, RetryOnNotMasterNoSlaveOkSingleNode) {
@@ -1237,7 +1298,7 @@ TEST_F(AsyncResultsMergerTest, RetryOnNotMasterNoSlaveOkSingleNode) {
executor()->waitForEvent(readyEvent);
ASSERT_TRUE(arm->ready());
- ASSERT_EQ(fromjson("{_id: 1}"), *unittest::assertGet(arm->nextReady()));
+ ASSERT_EQ(fromjson("{_id: 1}"), *unittest::assertGet(arm->nextReady()).getResult());
ASSERT_TRUE(arm->remotesExhausted());
ASSERT_TRUE(arm->ready());
@@ -1300,7 +1361,7 @@ TEST_F(AsyncResultsMergerTest, RetryOnHostUnreachableAllowPartialResults) {
ASSERT_TRUE(arm->ready());
ASSERT_TRUE(arm->ready());
- ASSERT_EQ(fromjson("{_id: 1}"), *unittest::assertGet(arm->nextReady()));
+ ASSERT_EQ(fromjson("{_id: 1}"), *unittest::assertGet(arm->nextReady()).getResult());
ASSERT_TRUE(arm->remotesExhausted());
ASSERT_TRUE(arm->ready());
@@ -1321,9 +1382,9 @@ TEST_F(AsyncResultsMergerTest, GetMoreRequestIncludesMaxTimeMS) {
executor()->waitForEvent(readyEvent);
ASSERT_TRUE(arm->ready());
- ASSERT_EQ(fromjson("{_id: 1}"), *unittest::assertGet(arm->nextReady()));
+ ASSERT_EQ(fromjson("{_id: 1}"), *unittest::assertGet(arm->nextReady()).getResult());
ASSERT_TRUE(arm->ready());
- ASSERT(!unittest::assertGet(arm->nextReady()));
+ ASSERT_TRUE(unittest::assertGet(arm->nextReady()).isEOF());
ASSERT_OK(arm->setAwaitDataTimeout(Milliseconds(789)));
@@ -1346,9 +1407,9 @@ TEST_F(AsyncResultsMergerTest, GetMoreRequestIncludesMaxTimeMS) {
executor()->waitForEvent(readyEvent);
ASSERT_TRUE(arm->ready());
- ASSERT_EQ(fromjson("{_id: 2}"), *unittest::assertGet(arm->nextReady()));
+ ASSERT_EQ(fromjson("{_id: 2}"), *unittest::assertGet(arm->nextReady()).getResult());
ASSERT_TRUE(arm->ready());
- ASSERT(!unittest::assertGet(arm->nextReady()));
+ ASSERT_TRUE(unittest::assertGet(arm->nextReady()).isEOF());
}
TEST_F(AsyncResultsMergerTest, GetMoreRequestWithoutTailableCantHaveMaxTime) {
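
The updated nextReady() contract means a tailable stream can report an empty (EOF) ClusterQueryResult between batches while the cursor stays open, as the tests above exercise. Below is a hedged, self-contained sketch of the consumer loop that contract implies; the Result and Source types are simplified stand-ins, not the AsyncResultsMerger API.

// Simplified sketch of consuming a tailable stream that may report EOF between
// batches: EOF alone does not end consumption unless the remotes are exhausted.
#include <iostream>
#include <optional>
#include <queue>

struct Result {
    std::optional<int> doc;                       // stand-in for an owned BSONObj
    bool isEOF() const { return !doc.has_value(); }
};

struct Source {
    std::queue<Result> batches;
    bool exhausted = false;
    Result next() {
        if (batches.empty()) {
            exhausted = true;
            return Result{};                      // EOF marker
        }
        Result r = batches.front();
        batches.pop();
        return r;
    }
};

int main() {
    Source source;
    source.batches.push(Result{1});
    source.batches.push(Result{});                // end-of-batch EOF, cursor stays open
    source.batches.push(Result{2});

    while (true) {
        Result r = source.next();
        if (r.isEOF()) {
            if (source.exhausted) break;          // truly done
            continue;                             // tailable: keep the cursor open
        }
        std::cout << "doc: " << *r.doc << '\n';
    }
    return 0;
}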
diff --git a/src/mongo/s/query/cluster_client_cursor.h b/src/mongo/s/query/cluster_client_cursor.h
index 47f4e46f89a..c4bb1c9373e 100644
--- a/src/mongo/s/query/cluster_client_cursor.h
+++ b/src/mongo/s/query/cluster_client_cursor.h
@@ -31,6 +31,7 @@
#include <boost/optional.hpp>
#include "mongo/db/jsobj.h"
+#include "mongo/s/query/cluster_query_result.h"
#include "mongo/util/time_support.h"
namespace mongo {
@@ -58,12 +59,12 @@ public:
* Returns the next available result document (along with an ok status). May block waiting
* for results from remote nodes.
*
- * If there are no further results, the end of the stream is indicated with boost::none and
- * an ok status.
+ * If there are no further results, the end of the stream is indicated with an empty
+ * ClusterQueryResult and an ok status.
*
* A non-ok status is returned in case of any error.
*/
- virtual StatusWith<boost::optional<BSONObj>> next() = 0;
+ virtual StatusWith<ClusterQueryResult> next() = 0;
/**
* Must be called before destruction to abandon a not-yet-exhausted cursor. If next() has
@@ -84,14 +85,15 @@ public:
virtual long long getNumReturnedSoFar() const = 0;
/**
- * Stash the BSONObj so that it gets returned from the CCC on a later call to next().
+ * Stash the ClusterQueryResult so that it gets returned from the CCC on a later call to
+ * next().
*
* Queued documents are returned in FIFO order. The queued results are exhausted before
* generating further results from the underlying mongos query stages.
*
* 'obj' must be owned BSON.
*/
- virtual void queueResult(const BSONObj& obj) = 0;
+ virtual void queueResult(const ClusterQueryResult& result) = 0;
/**
* Returns whether or not all the remote cursors underlying this cursor have been exhausted.
diff --git a/src/mongo/s/query/cluster_client_cursor_impl.cpp b/src/mongo/s/query/cluster_client_cursor_impl.cpp
index d37b78f2a58..1498164f9ac 100644
--- a/src/mongo/s/query/cluster_client_cursor_impl.cpp
+++ b/src/mongo/s/query/cluster_client_cursor_impl.cpp
@@ -72,17 +72,17 @@ ClusterClientCursorImpl::ClusterClientCursorImpl(executor::TaskExecutor* executo
ClusterClientCursorImpl::ClusterClientCursorImpl(std::unique_ptr<RouterStageMock> root)
: _root(std::move(root)) {}
-StatusWith<boost::optional<BSONObj>> ClusterClientCursorImpl::next() {
+StatusWith<ClusterQueryResult> ClusterClientCursorImpl::next() {
// First return stashed results, if there are any.
if (!_stash.empty()) {
- BSONObj front = std::move(_stash.front());
+ auto front = std::move(_stash.front());
_stash.pop();
++_numReturnedSoFar;
return {front};
}
auto next = _root->next();
- if (next.isOK() && next.getValue()) {
+ if (next.isOK() && !next.getValue().isEOF()) {
++_numReturnedSoFar;
}
return next;
@@ -100,9 +100,18 @@ long long ClusterClientCursorImpl::getNumReturnedSoFar() const {
return _numReturnedSoFar;
}
-void ClusterClientCursorImpl::queueResult(const BSONObj& obj) {
- invariant(obj.isOwned());
- _stash.push(obj);
+void ClusterClientCursorImpl::queueResult(const ClusterQueryResult& result) {
+ auto resultObj = result.getResult();
+ if (resultObj) {
+ invariant(resultObj->isOwned());
+ }
+
+ auto viewDef = result.getViewDefinition();
+ if (viewDef) {
+ invariant(viewDef->isOwned());
+ }
+
+ _stash.push(result);
}
bool ClusterClientCursorImpl::remotesExhausted() {
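
queueResult() now validates ownership of whichever BSONObj the result carries, whether document data or a view definition, before stashing it. The following standalone sketch shows that guard pattern under simplified assumptions: Owned, Result, and Stash are toy types, with assert standing in for invariant.

// Sketch of the "validate ownership, then stash" pattern from queueResult().
// Owned is a toy stand-in for BSONObj's isOwned() semantics.
#include <cassert>
#include <optional>
#include <queue>
#include <string>

struct Owned {
    std::string data;
    bool owned = true;
    bool isOwned() const { return owned; }
};

struct Result {
    std::optional<Owned> resultObj;
    std::optional<Owned> viewDefinition;
};

class Stash {
public:
    void queueResult(const Result& result) {
        // Every buffer the result points at must outlive the stash, so both the
        // document and the view definition (if present) are required to be owned.
        if (result.resultObj) {
            assert(result.resultObj->isOwned());
        }
        if (result.viewDefinition) {
            assert(result.viewDefinition->isOwned());
        }
        _stash.push(result);
    }

private:
    std::queue<Result> _stash;
};

int main() {
    Stash stash;
    stash.queueResult(Result{Owned{"{a: 1}"}, std::nullopt});
    return 0;
}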
diff --git a/src/mongo/s/query/cluster_client_cursor_impl.h b/src/mongo/s/query/cluster_client_cursor_impl.h
index d8645bb7834..71f018deec7 100644
--- a/src/mongo/s/query/cluster_client_cursor_impl.h
+++ b/src/mongo/s/query/cluster_client_cursor_impl.h
@@ -34,6 +34,7 @@
#include "mongo/executor/task_executor.h"
#include "mongo/s/query/cluster_client_cursor.h"
#include "mongo/s/query/cluster_client_cursor_params.h"
+#include "mongo/s/query/cluster_query_result.h"
#include "mongo/s/query/router_exec_stage.h"
#include "mongo/util/net/hostandport.h"
@@ -91,7 +92,7 @@ public:
*/
ClusterClientCursorImpl(std::unique_ptr<RouterStageMock> root);
- StatusWith<boost::optional<BSONObj>> next() final;
+ StatusWith<ClusterQueryResult> next() final;
void kill() final;
@@ -99,7 +100,7 @@ public:
long long getNumReturnedSoFar() const final;
- void queueResult(const BSONObj& obj) final;
+ void queueResult(const ClusterQueryResult& result) final;
bool remotesExhausted() final;
@@ -127,8 +128,8 @@ private:
// The root stage of the pipeline used to return the result set, merged from the remote nodes.
std::unique_ptr<RouterExecStage> _root;
- // Stores documents queued by queueResult(). Stashed BSONObjs must be owned.
- std::queue<BSONObj> _stash;
+ // Stores documents queued by queueResult(). BSONObjs within the stashed results must be owned.
+ std::queue<ClusterQueryResult> _stash;
};
} // namespace mongo
diff --git a/src/mongo/s/query/cluster_client_cursor_impl_test.cpp b/src/mongo/s/query/cluster_client_cursor_impl_test.cpp
index 50b47f47817..06a193f3298 100644
--- a/src/mongo/s/query/cluster_client_cursor_impl_test.cpp
+++ b/src/mongo/s/query/cluster_client_cursor_impl_test.cpp
@@ -53,13 +53,13 @@ TEST(ClusterClientCursorImpl, NumReturnedSoFar) {
for (int i = 1; i < 10; ++i) {
auto result = cursor.next();
ASSERT(result.isOK());
- ASSERT_EQ(*result.getValue(), BSON("a" << i));
+ ASSERT_EQ(*result.getValue().getResult(), BSON("a" << i));
ASSERT_EQ(cursor.getNumReturnedSoFar(), i);
}
// Now check that if nothing is fetched the getNumReturnedSoFar stays the same.
auto result = cursor.next();
ASSERT_OK(result.getStatus());
- ASSERT_FALSE(result.getValue());
+ ASSERT_TRUE(result.getValue().isEOF());
ASSERT_EQ(cursor.getNumReturnedSoFar(), 9LL);
}
@@ -72,34 +72,55 @@ TEST(ClusterClientCursorImpl, QueueResult) {
auto firstResult = cursor.next();
ASSERT_OK(firstResult.getStatus());
- ASSERT(firstResult.getValue());
- ASSERT_EQ(*firstResult.getValue(), BSON("a" << 1));
+ ASSERT(firstResult.getValue().getResult());
+ ASSERT_EQ(*firstResult.getValue().getResult(), BSON("a" << 1));
cursor.queueResult(BSON("a" << 2));
cursor.queueResult(BSON("a" << 3));
auto secondResult = cursor.next();
ASSERT_OK(secondResult.getStatus());
- ASSERT(secondResult.getValue());
- ASSERT_EQ(*secondResult.getValue(), BSON("a" << 2));
+ ASSERT(secondResult.getValue().getResult());
+ ASSERT_EQ(*secondResult.getValue().getResult(), BSON("a" << 2));
auto thirdResult = cursor.next();
ASSERT_OK(thirdResult.getStatus());
- ASSERT(thirdResult.getValue());
- ASSERT_EQ(*thirdResult.getValue(), BSON("a" << 3));
+ ASSERT(thirdResult.getValue().getResult());
+ ASSERT_EQ(*thirdResult.getValue().getResult(), BSON("a" << 3));
auto fourthResult = cursor.next();
ASSERT_OK(fourthResult.getStatus());
- ASSERT(fourthResult.getValue());
- ASSERT_EQ(*fourthResult.getValue(), BSON("a" << 4));
+ ASSERT(fourthResult.getValue().getResult());
+ ASSERT_EQ(*fourthResult.getValue().getResult(), BSON("a" << 4));
auto fifthResult = cursor.next();
ASSERT_OK(fifthResult.getStatus());
- ASSERT(!fifthResult.getValue());
+ ASSERT(fifthResult.getValue().isEOF());
ASSERT_EQ(cursor.getNumReturnedSoFar(), 4LL);
}
+TEST(ClusterClientCursorImpl, CursorPropagatesViewDefinition) {
+ auto mockStage = stdx::make_unique<RouterStageMock>();
+
+ auto viewDef = BSON("ns"
+ << "view_ns"
+ << "pipeline"
+ << BSON_ARRAY(BSON("$match" << BSONNULL)));
+
+ ClusterQueryResult cqResult;
+ cqResult.setViewDefinition(viewDef);
+ mockStage->queueResult(cqResult);
+
+ ClusterClientCursorImpl cursor(std::move(mockStage));
+
+ auto result = cursor.next();
+ ASSERT_OK(result.getStatus());
+ ASSERT(!result.getValue().getResult());
+ ASSERT(result.getValue().getViewDefinition());
+ ASSERT_EQ(*result.getValue().getViewDefinition(), viewDef);
+}
+
TEST(ClusterClientCursorImpl, RemotesExhausted) {
auto mockStage = stdx::make_unique<RouterStageMock>();
mockStage->queueResult(BSON("a" << 1));
@@ -111,19 +132,19 @@ TEST(ClusterClientCursorImpl, RemotesExhausted) {
auto firstResult = cursor.next();
ASSERT_OK(firstResult.getStatus());
- ASSERT(firstResult.getValue());
- ASSERT_EQ(*firstResult.getValue(), BSON("a" << 1));
+ ASSERT(firstResult.getValue().getResult());
+ ASSERT_EQ(*firstResult.getValue().getResult(), BSON("a" << 1));
ASSERT_TRUE(cursor.remotesExhausted());
auto secondResult = cursor.next();
ASSERT_OK(secondResult.getStatus());
- ASSERT(secondResult.getValue());
- ASSERT_EQ(*secondResult.getValue(), BSON("a" << 2));
+ ASSERT(secondResult.getValue().getResult());
+ ASSERT_EQ(*secondResult.getValue().getResult(), BSON("a" << 2));
ASSERT_TRUE(cursor.remotesExhausted());
auto thirdResult = cursor.next();
ASSERT_OK(thirdResult.getStatus());
- ASSERT(!thirdResult.getValue());
+ ASSERT_TRUE(thirdResult.getValue().isEOF());
ASSERT_TRUE(cursor.remotesExhausted());
ASSERT_EQ(cursor.getNumReturnedSoFar(), 2LL);
diff --git a/src/mongo/s/query/cluster_client_cursor_mock.cpp b/src/mongo/s/query/cluster_client_cursor_mock.cpp
index 2b10928449e..bd4136ab7f2 100644
--- a/src/mongo/s/query/cluster_client_cursor_mock.cpp
+++ b/src/mongo/s/query/cluster_client_cursor_mock.cpp
@@ -43,12 +43,12 @@ ClusterClientCursorMock::~ClusterClientCursorMock() {
invariant(_exhausted || _killed);
}
-StatusWith<boost::optional<BSONObj>> ClusterClientCursorMock::next() {
+StatusWith<ClusterQueryResult> ClusterClientCursorMock::next() {
invariant(!_killed);
if (_resultsQueue.empty()) {
_exhausted = true;
- return {boost::none};
+ return {ClusterQueryResult()};
}
auto out = _resultsQueue.front();
@@ -59,7 +59,7 @@ StatusWith<boost::optional<BSONObj>> ClusterClientCursorMock::next() {
}
++_numReturnedSoFar;
- return boost::optional<BSONObj>(out.getValue());
+ return out.getValue();
}
long long ClusterClientCursorMock::getNumReturnedSoFar() const {
@@ -77,8 +77,8 @@ bool ClusterClientCursorMock::isTailable() const {
return false;
}
-void ClusterClientCursorMock::queueResult(const BSONObj& obj) {
- _resultsQueue.push({obj});
+void ClusterClientCursorMock::queueResult(const ClusterQueryResult& result) {
+ _resultsQueue.push({result});
}
bool ClusterClientCursorMock::remotesExhausted() {
diff --git a/src/mongo/s/query/cluster_client_cursor_mock.h b/src/mongo/s/query/cluster_client_cursor_mock.h
index 3749a8abb19..d9e0ba789e3 100644
--- a/src/mongo/s/query/cluster_client_cursor_mock.h
+++ b/src/mongo/s/query/cluster_client_cursor_mock.h
@@ -43,7 +43,7 @@ public:
~ClusterClientCursorMock();
- StatusWith<boost::optional<BSONObj>> next() final;
+ StatusWith<ClusterQueryResult> next() final;
void kill() final;
@@ -51,7 +51,7 @@ public:
long long getNumReturnedSoFar() const final;
- void queueResult(const BSONObj& obj) final;
+ void queueResult(const ClusterQueryResult& result) final;
Status setAwaitDataTimeout(Milliseconds awaitDataTimeout) final;
@@ -73,7 +73,7 @@ public:
private:
bool _killed = false;
bool _exhausted = false;
- std::queue<StatusWith<BSONObj>> _resultsQueue;
+ std::queue<StatusWith<ClusterQueryResult>> _resultsQueue;
stdx::function<void(void)> _killCallback;
// Number of returned documents.
diff --git a/src/mongo/s/query/cluster_cursor_manager.cpp b/src/mongo/s/query/cluster_cursor_manager.cpp
index cca70a8dbdf..48d233239d9 100644
--- a/src/mongo/s/query/cluster_cursor_manager.cpp
+++ b/src/mongo/s/query/cluster_cursor_manager.cpp
@@ -110,7 +110,7 @@ ClusterCursorManager::PinnedCursor& ClusterCursorManager::PinnedCursor::operator
return *this;
}
-StatusWith<boost::optional<BSONObj>> ClusterCursorManager::PinnedCursor::next() {
+StatusWith<ClusterQueryResult> ClusterCursorManager::PinnedCursor::next() {
invariant(_cursor);
return _cursor->next();
}
@@ -137,9 +137,9 @@ long long ClusterCursorManager::PinnedCursor::getNumReturnedSoFar() const {
return _cursor->getNumReturnedSoFar();
}
-void ClusterCursorManager::PinnedCursor::queueResult(const BSONObj& obj) {
+void ClusterCursorManager::PinnedCursor::queueResult(const ClusterQueryResult& result) {
invariant(_cursor);
- _cursor->queueResult(obj);
+ _cursor->queueResult(result);
}
bool ClusterCursorManager::PinnedCursor::remotesExhausted() {
diff --git a/src/mongo/s/query/cluster_cursor_manager.h b/src/mongo/s/query/cluster_cursor_manager.h
index a1d4b28ba68..d8f8899c386 100644
--- a/src/mongo/s/query/cluster_cursor_manager.h
+++ b/src/mongo/s/query/cluster_cursor_manager.h
@@ -154,7 +154,7 @@ public:
*
* Can block.
*/
- StatusWith<boost::optional<BSONObj>> next();
+ StatusWith<ClusterQueryResult> next();
/**
* Returns whether or not the underlying cursor is tailing a capped collection. Cannot be
@@ -185,7 +185,7 @@ public:
/**
* Stashes 'obj' to be returned later by this cursor. A cursor must be owned.
*/
- void queueResult(const BSONObj& obj);
+ void queueResult(const ClusterQueryResult& result);
/**
* Returns whether or not all the remote cursors underlying this cursor have been
diff --git a/src/mongo/s/query/cluster_cursor_manager_test.cpp b/src/mongo/s/query/cluster_cursor_manager_test.cpp
index ce65558f865..3cb88e9a813 100644
--- a/src/mongo/s/query/cluster_cursor_manager_test.cpp
+++ b/src/mongo/s/query/cluster_cursor_manager_test.cpp
@@ -117,11 +117,11 @@ TEST_F(ClusterCursorManagerTest, RegisterCursor) {
ASSERT_OK(pinnedCursor.getStatus());
auto nextResult = pinnedCursor.getValue().next();
ASSERT_OK(nextResult.getStatus());
- ASSERT(nextResult.getValue());
- ASSERT_EQ(BSON("a" << 1), *nextResult.getValue());
+ ASSERT(nextResult.getValue().getResult());
+ ASSERT_EQ(BSON("a" << 1), *nextResult.getValue().getResult());
nextResult = pinnedCursor.getValue().next();
ASSERT_OK(nextResult.getStatus());
- ASSERT_FALSE(nextResult.getValue());
+ ASSERT_TRUE(nextResult.getValue().isEOF());
}
// Test that registering a cursor returns a non-zero cursor id.
@@ -148,11 +148,11 @@ TEST_F(ClusterCursorManagerTest, CheckOutCursorBasic) {
ASSERT_EQ(cursorId, checkedOutCursor.getValue().getCursorId());
auto nextResult = checkedOutCursor.getValue().next();
ASSERT_OK(nextResult.getStatus());
- ASSERT(nextResult.getValue());
- ASSERT_EQ(BSON("a" << 1), *nextResult.getValue());
+ ASSERT(nextResult.getValue().getResult());
+ ASSERT_EQ(BSON("a" << 1), *nextResult.getValue().getResult());
nextResult = checkedOutCursor.getValue().next();
ASSERT_OK(nextResult.getStatus());
- ASSERT_FALSE(nextResult.getValue());
+ ASSERT_TRUE(nextResult.getValue().isEOF());
}
// Test that checking out a cursor returns a pin to the correct cursor, when multiple cursors are
@@ -174,11 +174,11 @@ TEST_F(ClusterCursorManagerTest, CheckOutCursorMultipleCursors) {
ASSERT_OK(pinnedCursor.getStatus());
auto nextResult = pinnedCursor.getValue().next();
ASSERT_OK(nextResult.getStatus());
- ASSERT(nextResult.getValue());
- ASSERT_EQ(BSON("a" << i), *nextResult.getValue());
+ ASSERT(nextResult.getValue().getResult());
+ ASSERT_EQ(BSON("a" << i), *nextResult.getValue().getResult());
nextResult = pinnedCursor.getValue().next();
ASSERT_OK(nextResult.getStatus());
- ASSERT_FALSE(nextResult.getValue());
+ ASSERT_TRUE(nextResult.getValue().isEOF());
}
}
diff --git a/src/mongo/s/query/cluster_find.cpp b/src/mongo/s/query/cluster_find.cpp
index 57d6ece2c87..e4ebc311a1c 100644
--- a/src/mongo/s/query/cluster_find.cpp
+++ b/src/mongo/s/query/cluster_find.cpp
@@ -150,7 +150,8 @@ StatusWith<CursorId> runQueryWithoutRetrying(OperationContext* txn,
const ReadPreferenceSetting& readPref,
ChunkManager* chunkManager,
std::shared_ptr<Shard> primary,
- std::vector<BSONObj>* results) {
+ std::vector<BSONObj>* results,
+ BSONObj* viewDefinition) {
auto shardRegistry = grid.shardRegistry();
// Get the set of shards on which we will run the query.
@@ -235,7 +236,7 @@ StatusWith<CursorId> runQueryWithoutRetrying(OperationContext* txn,
return next.getStatus();
}
- if (!next.getValue()) {
+ if (next.getValue().isEOF()) {
// We reached end-of-stream. If the cursor is not tailable, then we mark it as
// exhausted. If it is tailable, usually we keep it open (i.e. "NotExhausted") even
// when we reach end-of-stream. However, if all the remote cursors are exhausted, there
@@ -246,17 +247,27 @@ StatusWith<CursorId> runQueryWithoutRetrying(OperationContext* txn,
break;
}
+ if (next.getValue().getViewDefinition()) {
+ if (viewDefinition) {
+ *viewDefinition = BSON("resolvedView" << *next.getValue().getViewDefinition());
+ }
+ return {ErrorCodes::CommandOnShardedViewNotSupportedOnMongod,
+ "Find must be transformed for view and run against base collection"};
+ }
+
+ auto nextObj = *next.getValue().getResult();
+
// If adding this object will cause us to exceed the message size limit, then we stash it
// for later.
- if (!FindCommon::haveSpaceForNext(*next.getValue(), results->size(), bytesBuffered)) {
- ccc->queueResult(*next.getValue());
+ if (!FindCommon::haveSpaceForNext(nextObj, results->size(), bytesBuffered)) {
+ ccc->queueResult(nextObj);
break;
}
// Add doc to the batch. Account for the space overhead associated with returning this doc
// inside a BSON array.
- bytesBuffered += (next.getValue()->objsize() + kPerDocumentOverheadBytesUpperBound);
- results->push_back(std::move(*next.getValue()));
+ bytesBuffered += (nextObj.objsize() + kPerDocumentOverheadBytesUpperBound);
+ results->push_back(std::move(nextObj));
}
if (!query.getQueryRequest().wantMore() && !ccc->isTailable()) {
@@ -287,7 +298,8 @@ const size_t ClusterFind::kMaxStaleConfigRetries = 10;
StatusWith<CursorId> ClusterFind::runQuery(OperationContext* txn,
const CanonicalQuery& query,
const ReadPreferenceSetting& readPref,
- std::vector<BSONObj>* results) {
+ std::vector<BSONObj>* results,
+ BSONObj* viewDefinition) {
invariant(results);
// Projection on the reserved sort key field is illegal in mongos.
@@ -316,7 +328,7 @@ StatusWith<CursorId> ClusterFind::runQuery(OperationContext* txn,
// shard version.
for (size_t retries = 1; retries <= kMaxStaleConfigRetries; ++retries) {
auto cursorId = runQueryWithoutRetrying(
- txn, query, readPref, chunkManager.get(), std::move(primary), results);
+ txn, query, readPref, chunkManager.get(), std::move(primary), results, viewDefinition);
if (cursorId.isOK()) {
return cursorId;
}
@@ -387,7 +399,7 @@ StatusWith<CursorResponse> ClusterFind::runGetMore(OperationContext* txn,
return next.getStatus();
}
- if (!next.getValue()) {
+ if (next.getValue().isEOF()) {
// We reached end-of-stream.
if (!pinnedCursor.getValue().isTailable()) {
cursorState = ClusterCursorManager::CursorState::Exhausted;
@@ -395,15 +407,17 @@ StatusWith<CursorResponse> ClusterFind::runGetMore(OperationContext* txn,
break;
}
- if (!FindCommon::haveSpaceForNext(*next.getValue(), batch.size(), bytesBuffered)) {
- pinnedCursor.getValue().queueResult(*next.getValue());
+ if (!FindCommon::haveSpaceForNext(
+ *next.getValue().getResult(), batch.size(), bytesBuffered)) {
+ pinnedCursor.getValue().queueResult(*next.getValue().getResult());
break;
}
// Add doc to the batch. Account for the space overhead associated with returning this doc
// inside a BSON array.
- bytesBuffered += (next.getValue()->objsize() + kPerDocumentOverheadBytesUpperBound);
- batch.push_back(std::move(*next.getValue()));
+ bytesBuffered +=
+ (next.getValue().getResult()->objsize() + kPerDocumentOverheadBytesUpperBound);
+ batch.push_back(std::move(*next.getValue().getResult()));
}
// Transfer ownership of the cursor back to the cursor manager.
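
The loop in runQueryWithoutRetrying() now treats a result carrying a view definition as an error path, handing the definition back through the out-parameter so the caller can rerun the query against the view's base collection. Here is a hedged standalone sketch of that control flow; drainResults() and its types are illustrative assumptions, not the real function.

// Standalone sketch of the control flow added to the query loop: buffer ordinary
// documents, but surface a view definition as an error plus an out-parameter.
#include <cassert>
#include <optional>
#include <string>
#include <vector>

enum class Code { OK, CommandOnShardedViewNotSupportedOnMongod };

struct Result {
    std::optional<std::string> doc;             // stand-in for an owned BSONObj
    std::optional<std::string> viewDefinition;  // stand-in for {ns, pipeline}
    bool isEOF() const { return !doc && !viewDefinition; }
};

Code drainResults(const std::vector<Result>& stream,
                  std::vector<std::string>* results,
                  std::string* viewDefinition) {
    for (const Result& next : stream) {
        if (next.isEOF()) {
            break;  // end-of-stream
        }
        if (next.viewDefinition) {
            if (viewDefinition) {
                // Mirrors wrapping the definition as {resolvedView: ...} for the caller.
                *viewDefinition = "resolvedView: " + *next.viewDefinition;
            }
            return Code::CommandOnShardedViewNotSupportedOnMongod;
        }
        results->push_back(*next.doc);
    }
    return Code::OK;
}

int main() {
    std::vector<std::string> docs;
    std::string viewDef;
    Code code = drainResults({Result{{}, std::string("ns: db.view")}}, &docs, &viewDef);
    assert(code == Code::CommandOnShardedViewNotSupportedOnMongod);
    assert(viewDef == "resolvedView: ns: db.view");
    return 0;
}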
diff --git a/src/mongo/s/query/cluster_find.h b/src/mongo/s/query/cluster_find.h
index 64821512ded..22d7ad89b04 100644
--- a/src/mongo/s/query/cluster_find.h
+++ b/src/mongo/s/query/cluster_find.h
@@ -63,11 +63,14 @@ public:
* On success, fills out 'results' with the first batch of query results and returns the cursor
* id which the caller can use on subsequent getMore operations. If no cursor needed to be saved
* (e.g. the cursor was exhausted without need for a getMore), returns a cursor id of 0.
+ * If a CommandOnShardedViewNotSupportedOnMongod error is returned, then 'viewDefinition', if
+ * not null, will contain a view definition.
*/
static StatusWith<CursorId> runQuery(OperationContext* txn,
const CanonicalQuery& query,
const ReadPreferenceSetting& readPref,
- std::vector<BSONObj>* results);
+ std::vector<BSONObj>* results,
+ BSONObj* viewDefinition);
/**
* Executes the getMore request 'request', and on success returns a CursorResponse.
diff --git a/src/mongo/s/query/cluster_query_result.h b/src/mongo/s/query/cluster_query_result.h
new file mode 100644
index 00000000000..43faee4392e
--- /dev/null
+++ b/src/mongo/s/query/cluster_query_result.h
@@ -0,0 +1,72 @@
+/**
+ * Copyright (C) 2016 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include <boost/optional.hpp>
+
+#include "mongo/bson/bsonobj.h"
+
+namespace mongo {
+
+/**
+ * Holds a single result from a mongoS find command shard request. This result can represent one of
+ * several states:
+ * - Contains collection data, stored in '_resultObj'.
+ * - Contains a view definition, stored in '_viewDefinition'.
+ * - EOF. Neither '_resultObj' nor '_viewDefinition' is set, and isEOF() returns true.
+ */
+class ClusterQueryResult {
+public:
+ ClusterQueryResult() = default;
+
+ ClusterQueryResult(BSONObj resObj) : _resultObj(resObj) {}
+
+ bool isEOF() const {
+ return !_resultObj && !_viewDefinition;
+ }
+
+ boost::optional<BSONObj> getResult() const {
+ return _resultObj;
+ }
+
+ boost::optional<BSONObj> getViewDefinition() const {
+ return _viewDefinition;
+ }
+
+ void setViewDefinition(BSONObj viewDef) {
+ invariant(isEOF());
+ _viewDefinition = viewDef;
+ }
+
+private:
+ boost::optional<BSONObj> _resultObj;
+ boost::optional<BSONObj> _viewDefinition;
+};
+
+} // namespace mongo
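
To make the three states of the new class concrete, the following is a small usage sketch written as a standalone analogue: QueryResult below mirrors the interface shown above, with std::optional and assert standing in for boost::optional and invariant so it compiles outside the MongoDB tree.

// Standalone analogue of ClusterQueryResult's three states: EOF, data, view definition.
#include <cassert>
#include <optional>
#include <string>

class QueryResult {
public:
    QueryResult() = default;                                   // EOF
    explicit QueryResult(std::string doc) : _resultObj(std::move(doc)) {}

    bool isEOF() const { return !_resultObj && !_viewDefinition; }
    const std::optional<std::string>& getResult() const { return _resultObj; }
    const std::optional<std::string>& getViewDefinition() const { return _viewDefinition; }

    void setViewDefinition(std::string viewDef) {
        assert(isEOF());                                       // only an EOF result may be upgraded
        _viewDefinition = std::move(viewDef);
    }

private:
    std::optional<std::string> _resultObj;
    std::optional<std::string> _viewDefinition;
};

int main() {
    QueryResult eof;
    assert(eof.isEOF());

    QueryResult data("{_id: 1}");
    assert(!data.isEOF() && data.getResult());

    QueryResult view;
    view.setViewDefinition("{ns: 'db.view', pipeline: []}");
    assert(!view.isEOF() && view.getViewDefinition());
    return 0;
}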
diff --git a/src/mongo/s/query/router_exec_stage.h b/src/mongo/s/query/router_exec_stage.h
index 0e10d9edff2..726bd2df97b 100644
--- a/src/mongo/s/query/router_exec_stage.h
+++ b/src/mongo/s/query/router_exec_stage.h
@@ -33,6 +33,7 @@
#include "mongo/base/status_with.h"
#include "mongo/bson/bsonobj.h"
+#include "mongo/s/query/cluster_query_result.h"
#include "mongo/util/time_support.h"
namespace mongo {
@@ -59,13 +60,13 @@ public:
/**
* Returns the next query result, or an error.
*
- * If there are no more results, returns boost::none.
+ * If there are no more results, returns an EOF ClusterQueryResult.
*
* All returned BSONObjs are owned. They may own a buffer larger than the object. If you are
* holding on to a subset of the returned results and need to minimize memory usage, call copy()
* on the BSONObjs.
*/
- virtual StatusWith<boost::optional<BSONObj>> next() = 0;
+ virtual StatusWith<ClusterQueryResult> next() = 0;
/**
* Must be called before destruction to abandon a not-yet-exhausted plan. May block waiting for
diff --git a/src/mongo/s/query/router_stage_limit.cpp b/src/mongo/s/query/router_stage_limit.cpp
index 5f7db02ec7b..4756423249c 100644
--- a/src/mongo/s/query/router_stage_limit.cpp
+++ b/src/mongo/s/query/router_stage_limit.cpp
@@ -39,9 +39,9 @@ RouterStageLimit::RouterStageLimit(std::unique_ptr<RouterExecStage> child, long
invariant(limit > 0);
}
-StatusWith<boost::optional<BSONObj>> RouterStageLimit::next() {
+StatusWith<ClusterQueryResult> RouterStageLimit::next() {
if (_returnedSoFar >= _limit) {
- return {boost::none};
+ return {ClusterQueryResult()};
}
auto childResult = getChildStage()->next();
@@ -49,7 +49,7 @@ StatusWith<boost::optional<BSONObj>> RouterStageLimit::next() {
return childResult;
}
- if (childResult.getValue()) {
+ if (!childResult.getValue().isEOF()) {
++_returnedSoFar;
}
return childResult;
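
RouterStageLimit::next() now charges a result against the limit only when it is not EOF, which is what lets a tailable stream's mid-batch EOFs pass through without consuming the budget. A minimal standalone sketch of that rule, under simplified Result and LimitStage types that are assumptions of this example, follows.

// Sketch of a limit stage that only charges non-EOF results against the limit.
#include <cassert>
#include <optional>
#include <queue>

struct Result {
    std::optional<int> doc;
    bool isEOF() const { return !doc.has_value(); }
};

class LimitStage {
public:
    LimitStage(std::queue<Result> child, long long limit)
        : _child(std::move(child)), _limit(limit) {}

    Result next() {
        if (_returnedSoFar >= _limit) {
            return Result{};  // past the limit: report EOF
        }
        Result childResult = _child.empty() ? Result{} : _child.front();
        if (!_child.empty()) {
            _child.pop();
        }
        if (!childResult.isEOF()) {
            ++_returnedSoFar;  // mid-stream EOFs do not count toward the limit
        }
        return childResult;
    }

private:
    std::queue<Result> _child;
    long long _limit;
    long long _returnedSoFar = 0;
};

int main() {
    std::queue<Result> child;
    child.push(Result{1});
    child.push(Result{});   // tailable-style EOF in the middle of the stream
    child.push(Result{2});

    LimitStage limit(std::move(child), 2);
    Result first = limit.next();
    Result second = limit.next();
    Result third = limit.next();
    Result fourth = limit.next();
    assert(*first.doc == 1);   // counted against the limit
    assert(second.isEOF());    // mid-stream EOF passes through uncounted
    assert(*third.doc == 2);   // second counted result
    assert(fourth.isEOF());    // limit of 2 reached
    return 0;
}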
diff --git a/src/mongo/s/query/router_stage_limit.h b/src/mongo/s/query/router_stage_limit.h
index 366a964f2a4..8b29b56f291 100644
--- a/src/mongo/s/query/router_stage_limit.h
+++ b/src/mongo/s/query/router_stage_limit.h
@@ -39,7 +39,7 @@ class RouterStageLimit final : public RouterExecStage {
public:
RouterStageLimit(std::unique_ptr<RouterExecStage> child, long long limit);
- StatusWith<boost::optional<BSONObj>> next() final;
+ StatusWith<ClusterQueryResult> next() final;
void kill() final;
diff --git a/src/mongo/s/query/router_stage_limit_test.cpp b/src/mongo/s/query/router_stage_limit_test.cpp
index fd8fa335e7e..658b6b4d48d 100644
--- a/src/mongo/s/query/router_stage_limit_test.cpp
+++ b/src/mongo/s/query/router_stage_limit_test.cpp
@@ -42,25 +42,25 @@ namespace {
TEST(RouterStageLimitTest, LimitIsOne) {
auto mockStage = stdx::make_unique<RouterStageMock>();
- mockStage->queueResult(BSON("a" << 1));
- mockStage->queueResult(BSON("a" << 2));
- mockStage->queueResult(BSON("a" << 3));
+ mockStage->queueResult({BSON("a" << 1)});
+ mockStage->queueResult({BSON("a" << 2)});
+ mockStage->queueResult({BSON("a" << 3)});
auto limitStage = stdx::make_unique<RouterStageLimit>(std::move(mockStage), 1);
auto firstResult = limitStage->next();
ASSERT_OK(firstResult.getStatus());
- ASSERT(firstResult.getValue());
- ASSERT_EQ(*firstResult.getValue(), BSON("a" << 1));
+ ASSERT(firstResult.getValue().getResult());
+ ASSERT_EQ(*firstResult.getValue().getResult(), BSON("a" << 1));
auto secondResult = limitStage->next();
ASSERT_OK(secondResult.getStatus());
- ASSERT(!secondResult.getValue());
+ ASSERT(!secondResult.getValue().getResult());
- // Once end-of-stream is reached, the limit stage should keep returning boost::none.
+ // Once end-of-stream is reached, the limit stage should keep returning no results.
auto thirdResult = limitStage->next();
ASSERT_OK(thirdResult.getStatus());
- ASSERT(!thirdResult.getValue());
+ ASSERT(!thirdResult.getValue().getResult());
}
TEST(RouterStageLimitTest, LimitIsTwo) {
@@ -73,17 +73,17 @@ TEST(RouterStageLimitTest, LimitIsTwo) {
auto firstResult = limitStage->next();
ASSERT_OK(firstResult.getStatus());
- ASSERT(firstResult.getValue());
- ASSERT_EQ(*firstResult.getValue(), BSON("a" << 1));
+ ASSERT(firstResult.getValue().getResult());
+ ASSERT_EQ(*firstResult.getValue().getResult(), BSON("a" << 1));
auto secondResult = limitStage->next();
ASSERT_OK(secondResult.getStatus());
- ASSERT(secondResult.getValue());
- ASSERT_EQ(*secondResult.getValue(), BSON("a" << 2));
+ ASSERT(secondResult.getValue().getResult());
+ ASSERT_EQ(*secondResult.getValue().getResult(), BSON("a" << 2));
auto thirdResult = limitStage->next();
ASSERT_OK(thirdResult.getStatus());
- ASSERT(!thirdResult.getValue());
+ ASSERT(!thirdResult.getValue().getResult());
}
TEST(RouterStageLimitTest, LimitStagePropagatesError) {
@@ -97,8 +97,8 @@ TEST(RouterStageLimitTest, LimitStagePropagatesError) {
auto firstResult = limitStage->next();
ASSERT_OK(firstResult.getStatus());
- ASSERT(firstResult.getValue());
- ASSERT_EQ(*firstResult.getValue(), BSON("a" << 1));
+ ASSERT(firstResult.getValue().getResult());
+ ASSERT_EQ(*firstResult.getValue().getResult(), BSON("a" << 1));
auto secondResult = limitStage->next();
ASSERT_NOT_OK(secondResult.getStatus());
@@ -106,6 +106,27 @@ TEST(RouterStageLimitTest, LimitStagePropagatesError) {
ASSERT_EQ(secondResult.getStatus().reason(), "bad thing happened");
}
+TEST(RouterStageLimitTest, LimitStagePropagatesViewDefinition) {
+ auto mockStage = stdx::make_unique<RouterStageMock>();
+
+ auto viewDef = BSON("ns"
+ << "view_ns"
+ << "pipeline"
+ << BSON_ARRAY(BSON("$match" << BSONNULL)));
+
+ ClusterQueryResult cqResult;
+ cqResult.setViewDefinition(viewDef);
+ mockStage->queueResult(cqResult);
+
+ auto limitStage = stdx::make_unique<RouterStageLimit>(std::move(mockStage), 3);
+
+ auto result = limitStage->next();
+ ASSERT_OK(result.getStatus());
+ ASSERT(!result.getValue().getResult());
+ ASSERT(result.getValue().getViewDefinition());
+ ASSERT_EQ(*result.getValue().getViewDefinition(), viewDef);
+}
+
TEST(RouterStageLimitTest, LimitStageToleratesMidStreamEOF) {
// Here we're mocking the tailable case, where there may be a boost::none returned before the
// remote cursor is closed. Our goal is to make sure that the limit stage handles this properly,
@@ -120,21 +141,21 @@ TEST(RouterStageLimitTest, LimitStageToleratesMidStreamEOF) {
auto firstResult = limitStage->next();
ASSERT_OK(firstResult.getStatus());
- ASSERT(firstResult.getValue());
- ASSERT_EQ(*firstResult.getValue(), BSON("a" << 1));
+ ASSERT(firstResult.getValue().getResult());
+ ASSERT_EQ(*firstResult.getValue().getResult(), BSON("a" << 1));
auto secondResult = limitStage->next();
ASSERT_OK(secondResult.getStatus());
- ASSERT(!secondResult.getValue());
+ ASSERT(secondResult.getValue().isEOF());
auto thirdResult = limitStage->next();
ASSERT_OK(thirdResult.getStatus());
- ASSERT(thirdResult.getValue());
- ASSERT_EQ(*thirdResult.getValue(), BSON("a" << 2));
+ ASSERT(thirdResult.getValue().getResult());
+ ASSERT_EQ(*thirdResult.getValue().getResult(), BSON("a" << 2));
auto fourthResult = limitStage->next();
ASSERT_OK(fourthResult.getStatus());
- ASSERT(!fourthResult.getValue());
+ ASSERT(fourthResult.getValue().isEOF());
}
TEST(RouterStageLimitTest, LimitStageRemotesExhausted) {
@@ -148,19 +169,19 @@ TEST(RouterStageLimitTest, LimitStageRemotesExhausted) {
auto firstResult = limitStage->next();
ASSERT_OK(firstResult.getStatus());
- ASSERT(firstResult.getValue());
- ASSERT_EQ(*firstResult.getValue(), BSON("a" << 1));
+ ASSERT(firstResult.getValue().getResult());
+ ASSERT_EQ(*firstResult.getValue().getResult(), BSON("a" << 1));
ASSERT_TRUE(limitStage->remotesExhausted());
auto secondResult = limitStage->next();
ASSERT_OK(secondResult.getStatus());
- ASSERT(secondResult.getValue());
- ASSERT_EQ(*secondResult.getValue(), BSON("a" << 2));
+ ASSERT(secondResult.getValue().getResult());
+ ASSERT_EQ(*secondResult.getValue().getResult(), BSON("a" << 2));
ASSERT_TRUE(limitStage->remotesExhausted());
auto thirdResult = limitStage->next();
ASSERT_OK(thirdResult.getStatus());
- ASSERT(!thirdResult.getValue());
+ ASSERT(thirdResult.getValue().isEOF());
ASSERT_TRUE(limitStage->remotesExhausted());
}
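The assertions above exercise the new ClusterQueryResult return type that replaces StatusWith<boost::optional<BSONObj>> throughout the router stages. The class itself is added in src/mongo/s/query/cluster_query_result.h, which is not reproduced in this excerpt; the following is a minimal sketch of the interface implied by the calls used in these tests (default construction as an EOF marker, implicit construction from a BSONObj, getResult(), getViewDefinition(), setViewDefinition(), isEOF()). Member layout and exact signatures are assumptions, not the committed definition.

#include <utility>

#include <boost/optional.hpp>

#include "mongo/bson/bsonobj.h"

namespace mongo {

// Sketch only: shape of ClusterQueryResult inferred from its usage in this patch.
class ClusterQueryResult {
public:
    ClusterQueryResult() = default;  // default-constructed result acts as the EOF marker

    /*implicit*/ ClusterQueryResult(BSONObj obj) : _result(std::move(obj)) {}

    bool isEOF() const {
        return !_result && !_viewDefinition;
    }

    const boost::optional<BSONObj>& getResult() const {
        return _result;
    }

    const boost::optional<BSONObj>& getViewDefinition() const {
        return _viewDefinition;
    }

    void setViewDefinition(BSONObj viewDef) {
        _viewDefinition = std::move(viewDef);
    }

private:
    boost::optional<BSONObj> _result;
    boost::optional<BSONObj> _viewDefinition;
};

}  // namespace mongo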
diff --git a/src/mongo/s/query/router_stage_merge.cpp b/src/mongo/s/query/router_stage_merge.cpp
index 0e9304d9952..e3b8fd299a6 100644
--- a/src/mongo/s/query/router_stage_merge.cpp
+++ b/src/mongo/s/query/router_stage_merge.cpp
@@ -40,7 +40,7 @@ RouterStageMerge::RouterStageMerge(executor::TaskExecutor* executor,
ClusterClientCursorParams&& params)
: _executor(executor), _arm(executor, std::move(params)) {}
-StatusWith<boost::optional<BSONObj>> RouterStageMerge::next() {
+StatusWith<ClusterQueryResult> RouterStageMerge::next() {
while (!_arm.ready()) {
auto nextEventStatus = _arm.nextEvent();
if (!nextEventStatus.isOK()) {
diff --git a/src/mongo/s/query/router_stage_merge.h b/src/mongo/s/query/router_stage_merge.h
index a75d5a46f8d..9f2c2e9e0c4 100644
--- a/src/mongo/s/query/router_stage_merge.h
+++ b/src/mongo/s/query/router_stage_merge.h
@@ -45,7 +45,7 @@ class RouterStageMerge final : public RouterExecStage {
public:
RouterStageMerge(executor::TaskExecutor* executor, ClusterClientCursorParams&& params);
- StatusWith<boost::optional<BSONObj>> next() final;
+ StatusWith<ClusterQueryResult> next() final;
void kill() final;
diff --git a/src/mongo/s/query/router_stage_mock.cpp b/src/mongo/s/query/router_stage_mock.cpp
index daad6fe6d07..b2aa83e3ed6 100644
--- a/src/mongo/s/query/router_stage_mock.cpp
+++ b/src/mongo/s/query/router_stage_mock.cpp
@@ -34,8 +34,8 @@
namespace mongo {
-void RouterStageMock::queueResult(BSONObj obj) {
- _resultsQueue.push({obj});
+void RouterStageMock::queueResult(const ClusterQueryResult& result) {
+ _resultsQueue.push({result});
}
void RouterStageMock::queueError(Status status) {
@@ -43,16 +43,16 @@ void RouterStageMock::queueError(Status status) {
}
void RouterStageMock::queueEOF() {
- _resultsQueue.push({boost::none});
+ _resultsQueue.push({ClusterQueryResult()});
}
void RouterStageMock::markRemotesExhausted() {
_remotesExhausted = true;
}
-StatusWith<boost::optional<BSONObj>> RouterStageMock::next() {
+StatusWith<ClusterQueryResult> RouterStageMock::next() {
if (_resultsQueue.empty()) {
- return {boost::none};
+ return {ClusterQueryResult()};
}
auto out = _resultsQueue.front();
diff --git a/src/mongo/s/query/router_stage_mock.h b/src/mongo/s/query/router_stage_mock.h
index 255ae75b595..ef093b04fe4 100644
--- a/src/mongo/s/query/router_stage_mock.h
+++ b/src/mongo/s/query/router_stage_mock.h
@@ -31,6 +31,7 @@
#include <boost/optional.hpp>
#include <queue>
+#include "mongo/s/query/cluster_query_result.h"
#include "mongo/s/query/router_exec_stage.h"
namespace mongo {
@@ -43,7 +44,7 @@ class RouterStageMock final : public RouterExecStage {
public:
~RouterStageMock() final {}
- StatusWith<boost::optional<BSONObj>> next() final;
+ StatusWith<ClusterQueryResult> next() final;
void kill() final;
@@ -56,7 +57,7 @@ public:
/**
* Queues a BSONObj to be returned.
*/
- void queueResult(BSONObj obj);
+ void queueResult(const ClusterQueryResult& result);
/**
* Queues an error response.
@@ -80,7 +81,7 @@ public:
StatusWith<Milliseconds> getAwaitDataTimeout();
private:
- std::queue<StatusWith<boost::optional<BSONObj>>> _resultsQueue;
+ std::queue<StatusWith<ClusterQueryResult>> _resultsQueue;
bool _remotesExhausted = false;
boost::optional<Milliseconds> _awaitDataTimeout;
};
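Because queueResult() now accepts a ClusterQueryResult (implicitly constructible from a BSONObj), existing tests that queue plain documents keep compiling, while new tests can also queue EOF markers and view definitions. For callers, the practical contract of the new next() is: check the Status, then distinguish a document, an EOF, and a view definition. A hedged sketch of that consumption pattern follows; drainStage is a hypothetical helper for illustration, not part of this patch.

#include <vector>

#include "mongo/s/query/cluster_query_result.h"
#include "mongo/s/query/router_exec_stage.h"
#include "mongo/util/assert_util.h"

namespace mongo {

// Hypothetical helper illustrating how a caller consumes the new return type.
std::vector<BSONObj> drainStage(RouterExecStage* stage) {
    std::vector<BSONObj> docs;
    while (true) {
        auto next = stage->next();
        uassertStatusOK(next.getStatus());  // remote errors still surface through the Status
        if (next.getValue().isEOF()) {
            break;  // end of stream (for a tailable cursor this may only be a pause)
        }
        if (next.getValue().getViewDefinition()) {
            break;  // the shard reported a view; the caller must resolve it and retry
        }
        docs.push_back(*next.getValue().getResult());
    }
    return docs;
}

}  // namespace mongo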
diff --git a/src/mongo/s/query/router_stage_remove_sortkey.cpp b/src/mongo/s/query/router_stage_remove_sortkey.cpp
index 949182e3a1a..77d3e26afd0 100644
--- a/src/mongo/s/query/router_stage_remove_sortkey.cpp
+++ b/src/mongo/s/query/router_stage_remove_sortkey.cpp
@@ -41,14 +41,16 @@ namespace mongo {
RouterStageRemoveSortKey::RouterStageRemoveSortKey(std::unique_ptr<RouterExecStage> child)
: RouterExecStage(std::move(child)) {}
-StatusWith<boost::optional<BSONObj>> RouterStageRemoveSortKey::next() {
+StatusWith<ClusterQueryResult> RouterStageRemoveSortKey::next() {
auto childResult = getChildStage()->next();
- if (!childResult.isOK() || !childResult.getValue()) {
+ if (!childResult.isOK() || !childResult.getValue().getResult()) {
return childResult;
}
+ const auto& childObj = childResult.getValue().getResult();
+
BSONObjBuilder builder;
- for (BSONElement elt : *childResult.getValue()) {
+ for (BSONElement elt : *childObj) {
if (!str::equals(elt.fieldName(), ClusterClientCursorParams::kSortKeyField)) {
builder.append(elt);
}
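The hunk above stops inside the rebuild loop, so the tail of the function is not visible here. Read end to end, the new next() presumably looks roughly like the following; the final return that wraps the rebuilt document back into a ClusterQueryResult is an assumption, not part of the excerpt.

// Sketch of the full RouterStageRemoveSortKey::next() implied by the hunk above.
StatusWith<ClusterQueryResult> RouterStageRemoveSortKey::next() {
    auto childResult = getChildStage()->next();
    if (!childResult.isOK() || !childResult.getValue().getResult()) {
        return childResult;  // errors, EOF markers, and view definitions pass through as-is
    }

    const auto& childObj = childResult.getValue().getResult();

    BSONObjBuilder builder;
    for (BSONElement elt : *childObj) {
        if (!str::equals(elt.fieldName(), ClusterClientCursorParams::kSortKeyField)) {
            builder.append(elt);  // copy every field except the injected sort key
        }
    }
    return {ClusterQueryResult(builder.obj())};  // assumed: rebuilt document returned as a result
}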
diff --git a/src/mongo/s/query/router_stage_remove_sortkey.h b/src/mongo/s/query/router_stage_remove_sortkey.h
index f7965312d47..79294aeb20a 100644
--- a/src/mongo/s/query/router_stage_remove_sortkey.h
+++ b/src/mongo/s/query/router_stage_remove_sortkey.h
@@ -41,7 +41,7 @@ class RouterStageRemoveSortKey final : public RouterExecStage {
public:
RouterStageRemoveSortKey(std::unique_ptr<RouterExecStage> child);
- StatusWith<boost::optional<BSONObj>> next() final;
+ StatusWith<ClusterQueryResult> next() final;
void kill() final;
diff --git a/src/mongo/s/query/router_stage_remove_sortkey_test.cpp b/src/mongo/s/query/router_stage_remove_sortkey_test.cpp
index 255bcb3cba4..ad6ee3f55a2 100644
--- a/src/mongo/s/query/router_stage_remove_sortkey_test.cpp
+++ b/src/mongo/s/query/router_stage_remove_sortkey_test.cpp
@@ -52,29 +52,29 @@ TEST(RouterStageRemoveSortKeyTest, RemovesSortKey) {
auto firstResult = sortKeyStage->next();
ASSERT_OK(firstResult.getStatus());
- ASSERT(firstResult.getValue());
- ASSERT_EQ(*firstResult.getValue(), BSON("a" << 4 << "b" << 3));
+ ASSERT(firstResult.getValue().getResult());
+ ASSERT_EQ(*firstResult.getValue().getResult(), BSON("a" << 4 << "b" << 3));
auto secondResult = sortKeyStage->next();
ASSERT_OK(secondResult.getStatus());
- ASSERT(secondResult.getValue());
- ASSERT_EQ(*secondResult.getValue(),
+ ASSERT(secondResult.getValue().getResult());
+ ASSERT_EQ(*secondResult.getValue().getResult(),
BSON("c" << BSON("d"
<< "foo")));
auto thirdResult = sortKeyStage->next();
ASSERT_OK(thirdResult.getStatus());
- ASSERT(thirdResult.getValue());
- ASSERT_EQ(*thirdResult.getValue(), BSON("a" << 3));
+ ASSERT(thirdResult.getValue().getResult());
+ ASSERT_EQ(*thirdResult.getValue().getResult(), BSON("a" << 3));
auto fourthResult = sortKeyStage->next();
ASSERT_OK(fourthResult.getStatus());
- ASSERT(fourthResult.getValue());
- ASSERT_EQ(*fourthResult.getValue(), BSONObj());
+ ASSERT(fourthResult.getValue().getResult());
+ ASSERT_EQ(*fourthResult.getValue().getResult(), BSONObj());
auto fifthResult = sortKeyStage->next();
ASSERT_OK(fifthResult.getStatus());
- ASSERT(!fifthResult.getValue());
+ ASSERT(fifthResult.getValue().isEOF());
}
TEST(RouterStageRemoveSortKeyTest, PropagatesError) {
@@ -86,8 +86,8 @@ TEST(RouterStageRemoveSortKeyTest, PropagatesError) {
auto firstResult = sortKeyStage->next();
ASSERT_OK(firstResult.getStatus());
- ASSERT(firstResult.getValue());
- ASSERT_EQ(*firstResult.getValue(), BSONObj());
+ ASSERT(firstResult.getValue().getResult());
+ ASSERT_EQ(*firstResult.getValue().getResult(), BSONObj());
auto secondResult = sortKeyStage->next();
ASSERT_NOT_OK(secondResult.getStatus());
@@ -105,21 +105,21 @@ TEST(RouterStageRemoveSortKeyTest, ToleratesMidStreamEOF) {
auto firstResult = sortKeyStage->next();
ASSERT_OK(firstResult.getStatus());
- ASSERT(firstResult.getValue());
- ASSERT_EQ(*firstResult.getValue(), BSON("a" << 1 << "b" << 1));
+ ASSERT(firstResult.getValue().getResult());
+ ASSERT_EQ(*firstResult.getValue().getResult(), BSON("a" << 1 << "b" << 1));
auto secondResult = sortKeyStage->next();
ASSERT_OK(secondResult.getStatus());
- ASSERT(!secondResult.getValue());
+ ASSERT(secondResult.getValue().isEOF());
auto thirdResult = sortKeyStage->next();
ASSERT_OK(thirdResult.getStatus());
- ASSERT(thirdResult.getValue());
- ASSERT_EQ(*thirdResult.getValue(), BSON("a" << 2 << "b" << 2));
+ ASSERT(thirdResult.getValue().getResult());
+ ASSERT_EQ(*thirdResult.getValue().getResult(), BSON("a" << 2 << "b" << 2));
auto fourthResult = sortKeyStage->next();
ASSERT_OK(fourthResult.getStatus());
- ASSERT(!fourthResult.getValue());
+ ASSERT(fourthResult.getValue().isEOF());
}
TEST(RouterStageRemoveSortKeyTest, RemotesExhausted) {
@@ -133,19 +133,19 @@ TEST(RouterStageRemoveSortKeyTest, RemotesExhausted) {
auto firstResult = sortKeyStage->next();
ASSERT_OK(firstResult.getStatus());
- ASSERT(firstResult.getValue());
- ASSERT_EQ(*firstResult.getValue(), BSON("a" << 1 << "b" << 1));
+ ASSERT(firstResult.getValue().getResult());
+ ASSERT_EQ(*firstResult.getValue().getResult(), BSON("a" << 1 << "b" << 1));
ASSERT_TRUE(sortKeyStage->remotesExhausted());
auto secondResult = sortKeyStage->next();
ASSERT_OK(secondResult.getStatus());
- ASSERT(secondResult.getValue());
- ASSERT_EQ(*secondResult.getValue(), BSON("a" << 2 << "b" << 2));
+ ASSERT(secondResult.getValue().getResult());
+ ASSERT_EQ(*secondResult.getValue().getResult(), BSON("a" << 2 << "b" << 2));
ASSERT_TRUE(sortKeyStage->remotesExhausted());
auto thirdResult = sortKeyStage->next();
ASSERT_OK(thirdResult.getStatus());
- ASSERT(!thirdResult.getValue());
+ ASSERT(thirdResult.getValue().isEOF());
ASSERT_TRUE(sortKeyStage->remotesExhausted());
}
diff --git a/src/mongo/s/query/router_stage_skip.cpp b/src/mongo/s/query/router_stage_skip.cpp
index af746d5e430..f9a63e515ff 100644
--- a/src/mongo/s/query/router_stage_skip.cpp
+++ b/src/mongo/s/query/router_stage_skip.cpp
@@ -39,14 +39,18 @@ RouterStageSkip::RouterStageSkip(std::unique_ptr<RouterExecStage> child, long lo
invariant(skip > 0);
}
-StatusWith<boost::optional<BSONObj>> RouterStageSkip::next() {
+StatusWith<ClusterQueryResult> RouterStageSkip::next() {
while (_skippedSoFar < _skip) {
auto next = getChildStage()->next();
if (!next.isOK()) {
return next;
}
- if (!next.getValue()) {
+ if (next.getValue().isEOF()) {
+ return next;
+ }
+
+ if (next.getValue().getViewDefinition()) {
return next;
}
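The hunk above ends inside the skip loop. The early returns for errors, EOF, and view definitions come straight from the diff; the loop increment and the final delegation to the child stage are assumptions about how the function continues.

// Sketch of the complete RouterStageSkip::next() implied by the hunk above.
StatusWith<ClusterQueryResult> RouterStageSkip::next() {
    while (_skippedSoFar < _skip) {
        auto next = getChildStage()->next();
        if (!next.isOK()) {
            return next;  // propagate remote errors immediately
        }
        if (next.getValue().isEOF()) {
            return next;  // nothing left to skip past
        }
        if (next.getValue().getViewDefinition()) {
            return next;  // short-circuit so mongos can resolve the view and retarget the query
        }
        ++_skippedSoFar;  // assumed: only real documents count toward the skip
    }
    return getChildStage()->next();  // assumed: once the skip is satisfied, pass results through
}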
diff --git a/src/mongo/s/query/router_stage_skip.h b/src/mongo/s/query/router_stage_skip.h
index 430d3748c91..fda4201f9cb 100644
--- a/src/mongo/s/query/router_stage_skip.h
+++ b/src/mongo/s/query/router_stage_skip.h
@@ -39,7 +39,7 @@ class RouterStageSkip final : public RouterExecStage {
public:
RouterStageSkip(std::unique_ptr<RouterExecStage> child, long long skip);
- StatusWith<boost::optional<BSONObj>> next() final;
+ StatusWith<ClusterQueryResult> next() final;
void kill() final;
diff --git a/src/mongo/s/query/router_stage_skip_test.cpp b/src/mongo/s/query/router_stage_skip_test.cpp
index 7aca1f600bd..efd57eaf111 100644
--- a/src/mongo/s/query/router_stage_skip_test.cpp
+++ b/src/mongo/s/query/router_stage_skip_test.cpp
@@ -50,22 +50,22 @@ TEST(RouterStageSkipTest, SkipIsOne) {
auto firstResult = skipStage->next();
ASSERT_OK(firstResult.getStatus());
- ASSERT(firstResult.getValue());
- ASSERT_EQ(*firstResult.getValue(), BSON("a" << 2));
+ ASSERT(firstResult.getValue().getResult());
+ ASSERT_EQ(*firstResult.getValue().getResult(), BSON("a" << 2));
auto secondResult = skipStage->next();
ASSERT_OK(secondResult.getStatus());
- ASSERT(secondResult.getValue());
- ASSERT_EQ(*secondResult.getValue(), BSON("a" << 3));
+ ASSERT(secondResult.getValue().getResult());
+ ASSERT_EQ(*secondResult.getValue().getResult(), BSON("a" << 3));
// Once end-of-stream is reached, the skip stage should keep returning boost::none.
auto thirdResult = skipStage->next();
ASSERT_OK(thirdResult.getStatus());
- ASSERT(!thirdResult.getValue());
+ ASSERT(thirdResult.getValue().isEOF());
auto fourthResult = skipStage->next();
ASSERT_OK(fourthResult.getStatus());
- ASSERT(!thirdResult.getValue());
+ ASSERT(fourthResult.getValue().isEOF());
}
TEST(RouterStageSkipTest, SkipIsThree) {
@@ -79,12 +79,12 @@ TEST(RouterStageSkipTest, SkipIsThree) {
auto firstResult = skipStage->next();
ASSERT_OK(firstResult.getStatus());
- ASSERT(firstResult.getValue());
- ASSERT_EQ(*firstResult.getValue(), BSON("a" << 4));
+ ASSERT(firstResult.getValue().getResult());
+ ASSERT_EQ(*firstResult.getValue().getResult(), BSON("a" << 4));
auto secondResult = skipStage->next();
ASSERT_OK(secondResult.getStatus());
- ASSERT(!secondResult.getValue());
+ ASSERT(secondResult.getValue().isEOF());
}
TEST(RouterStageSkipTest, SkipEqualToResultSetSize) {
@@ -97,7 +97,7 @@ TEST(RouterStageSkipTest, SkipEqualToResultSetSize) {
auto firstResult = skipStage->next();
ASSERT_OK(firstResult.getStatus());
- ASSERT(!firstResult.getValue());
+ ASSERT(firstResult.getValue().isEOF());
}
TEST(RouterStageSkipTest, SkipExceedsResultSetSize) {
@@ -110,7 +110,7 @@ TEST(RouterStageSkipTest, SkipExceedsResultSetSize) {
auto firstResult = skipStage->next();
ASSERT_OK(firstResult.getStatus());
- ASSERT(!firstResult.getValue());
+ ASSERT(firstResult.getValue().isEOF());
}
TEST(RouterStageSkipTest, ErrorWhileSkippingResults) {
@@ -139,8 +139,8 @@ TEST(RouterStageSkipTest, ErrorAfterSkippingResults) {
auto firstResult = skipStage->next();
ASSERT_OK(firstResult.getStatus());
- ASSERT(firstResult.getValue());
- ASSERT_EQ(*firstResult.getValue(), BSON("a" << 3));
+ ASSERT(firstResult.getValue().getResult());
+ ASSERT_EQ(*firstResult.getValue().getResult(), BSON("a" << 3));
auto secondResult = skipStage->next();
ASSERT_NOT_OK(secondResult.getStatus());
@@ -148,6 +148,29 @@ TEST(RouterStageSkipTest, ErrorAfterSkippingResults) {
ASSERT_EQ(secondResult.getStatus().reason(), "bad thing happened");
}
+TEST(RouterStageSkipTest, SkipStagePropagatesViewDefinition) {
+ auto mockStage = stdx::make_unique<RouterStageMock>();
+
+ ClusterQueryResult cqResult;
+ cqResult.setViewDefinition(BSON("ns"
+ << "view_ns"
+ << "pipeline"
+ << BSON_ARRAY(BSON("$match" << BSONNULL))));
+ mockStage->queueResult(cqResult);
+
+ auto skipStage = stdx::make_unique<RouterStageSkip>(std::move(mockStage), 3);
+
+ auto result = skipStage->next();
+ ASSERT_OK(result.getStatus());
+ ASSERT(!result.getValue().getResult());
+ ASSERT(result.getValue().getViewDefinition());
+ ASSERT_EQ(*result.getValue().getViewDefinition(),
+ BSON("ns"
+ << "view_ns"
+ << "pipeline"
+ << BSON_ARRAY(BSON("$match" << BSONNULL))));
+}
+
TEST(RouterStageSkipTest, SkipStageToleratesMidStreamEOF) {
// Skip stage must propagate a boost::none, but not count it towards the skip value.
auto mockStage = stdx::make_unique<RouterStageMock>();
@@ -160,16 +183,16 @@ TEST(RouterStageSkipTest, SkipStageToleratesMidStreamEOF) {
auto firstResult = skipStage->next();
ASSERT_OK(firstResult.getStatus());
- ASSERT(!firstResult.getValue());
+ ASSERT(firstResult.getValue().isEOF());
auto secondResult = skipStage->next();
ASSERT_OK(secondResult.getStatus());
- ASSERT(secondResult.getValue());
- ASSERT_EQ(*secondResult.getValue(), BSON("a" << 3));
+ ASSERT(secondResult.getValue().getResult());
+ ASSERT_EQ(*secondResult.getValue().getResult(), BSON("a" << 3));
auto thirdResult = skipStage->next();
ASSERT_OK(thirdResult.getStatus());
- ASSERT(!thirdResult.getValue());
+ ASSERT(thirdResult.getValue().isEOF());
}
TEST(RouterStageSkipTest, SkipStageRemotesExhausted) {
@@ -184,19 +207,19 @@ TEST(RouterStageSkipTest, SkipStageRemotesExhausted) {
auto firstResult = skipStage->next();
ASSERT_OK(firstResult.getStatus());
- ASSERT(firstResult.getValue());
- ASSERT_EQ(*firstResult.getValue(), BSON("a" << 2));
+ ASSERT(firstResult.getValue().getResult());
+ ASSERT_EQ(*firstResult.getValue().getResult(), BSON("a" << 2));
ASSERT_TRUE(skipStage->remotesExhausted());
auto secondResult = skipStage->next();
ASSERT_OK(secondResult.getStatus());
- ASSERT(secondResult.getValue());
- ASSERT_EQ(*secondResult.getValue(), BSON("a" << 3));
+ ASSERT(secondResult.getValue().getResult());
+ ASSERT_EQ(*secondResult.getValue().getResult(), BSON("a" << 3));
ASSERT_TRUE(skipStage->remotesExhausted());
auto thirdResult = skipStage->next();
ASSERT_OK(thirdResult.getStatus());
- ASSERT(!thirdResult.getValue());
+ ASSERT(thirdResult.getValue().isEOF());
ASSERT_TRUE(skipStage->remotesExhausted());
}