diff options
author | Kyle Suarez <kyle.suarez@mongodb.com> | 2016-07-18 10:51:40 -0400 |
---|---|---|
committer | Kyle Suarez <kyle.suarez@mongodb.com> | 2016-07-19 11:14:50 -0400 |
commit | cef83c2fddc2d8cfffa76d3e8680fc6c069e75f3 (patch) | |
tree | 8120a74875f92757338615fe20ba1905ba35e65c /src/mongo | |
parent | 97c43492de001c1bfd9426107c919ab50f1829ab (diff) | |
download | mongo-cef83c2fddc2d8cfffa76d3e8680fc6c069e75f3.tar.gz |
SERVER-24766 find command support for views
Diffstat (limited to 'src/mongo')
-rw-r--r-- | src/mongo/db/commands/find_cmd.cpp | 64 | ||||
-rw-r--r-- | src/mongo/db/query/query_request.cpp | 119 | ||||
-rw-r--r-- | src/mongo/db/query/query_request.h | 6 |
3 files changed, 184 insertions, 5 deletions
diff --git a/src/mongo/db/commands/find_cmd.cpp b/src/mongo/db/commands/find_cmd.cpp index 7f10d506c23..0d3b0d4381b 100644 --- a/src/mongo/db/commands/find_cmd.cpp +++ b/src/mongo/db/commands/find_cmd.cpp @@ -157,9 +157,37 @@ public: } std::unique_ptr<CanonicalQuery> cq = std::move(statusWithCQ.getValue()); - AutoGetCollectionForRead ctx(txn, nss); - // The collection may be NULL. If so, getExecutor() should handle it by returning - // an execution tree with an EOFStage. + // Acquire locks. If the namespace is a view, we release our locks and convert the query + // request into an aggregation command. + AutoGetCollectionOrViewForRead ctx(txn, nss); + if (ctx.getView()) { + // Relinquish locks. The aggregation command will re-acquire them. + ctx.releaseLocksForView(); + + // Convert the find command into an aggregation using $match (and other stages, as + // necessary), if possible. + const auto& qr = cq->getQueryRequest(); + auto viewAggregationCommand = qr.asAggregationCommand(); + if (!viewAggregationCommand.isOK()) + return viewAggregationCommand.getStatus(); + + Command* agg = Command::findCommand("aggregate"); + std::string errmsg; + + try { + agg->run(txn, dbname, viewAggregationCommand.getValue(), 0, errmsg, *out); + } catch (DBException& error) { + if (error.getCode() == ErrorCodes::InvalidPipelineOperator) { + return {ErrorCodes::InvalidPipelineOperator, + str::stream() << "Unsupported in view pipeline: " << error.what()}; + } + return error.toStatus(); + } + return Status::OK(); + } + + // The collection may be NULL. If so, getExecutor() should handle it by returning an + // execution tree with an EOFStage. Collection* collection = ctx.getCollection(); // We have a parsed query. Time to get the execution plan for it. @@ -242,9 +270,35 @@ public: } std::unique_ptr<CanonicalQuery> cq = std::move(statusWithCQ.getValue()); - // Acquire locks. - AutoGetCollectionForRead ctx(txn, nss); + // Acquire locks. If the query is on a view, we release our locks and convert the query + // request into an aggregation command. + AutoGetCollectionOrViewForRead ctx(txn, nss); Collection* collection = ctx.getCollection(); + if (ctx.getView()) { + // Relinquish locks. The aggregation command will re-acquire them. + ctx.releaseLocksForView(); + + // Convert the find command into an aggregation using $match (and other stages, as + // necessary), if possible. + const auto& qr = cq->getQueryRequest(); + auto viewAggregationCommand = qr.asAggregationCommand(); + if (!viewAggregationCommand.isOK()) + return appendCommandStatus(result, viewAggregationCommand.getStatus()); + + Command* agg = Command::findCommand("aggregate"); + try { + agg->run(txn, dbname, viewAggregationCommand.getValue(), options, errmsg, result); + } catch (DBException& error) { + if (error.getCode() == ErrorCodes::InvalidPipelineOperator) { + return appendCommandStatus( + result, + {ErrorCodes::InvalidPipelineOperator, + str::stream() << "Unsupported in view pipeline: " << error.what()}); + } + return appendCommandStatus(result, error.toStatus()); + } + return true; + } // Get the execution plan for the query. auto statusWithPlanExecutor = diff --git a/src/mongo/db/query/query_request.cpp b/src/mongo/db/query/query_request.cpp index f42d97d326d..9d76e502ec3 100644 --- a/src/mongo/db/query/query_request.cpp +++ b/src/mongo/db/query/query_request.cpp @@ -910,4 +910,123 @@ boost::optional<long long> QueryRequest::getEffectiveBatchSize() const { return _batchSize ? _batchSize : _ntoreturn; } +StatusWith<BSONObj> QueryRequest::asAggregationCommand() const { + BSONObjBuilder aggregationBuilder; + + // First, check if this query has options that are not supported in aggregation. + if (!_min.isEmpty()) { + return {ErrorCodes::InvalidPipelineOperator, + str::stream() << "Option " << kMinField << " not supported in aggregation."}; + } + if (!_max.isEmpty()) { + return {ErrorCodes::InvalidPipelineOperator, + str::stream() << "Option " << kMaxField << " not supported in aggregation."}; + } + if (!_wantMore) { + return {ErrorCodes::InvalidPipelineOperator, + str::stream() << "Option " << kSingleBatchField + << " not supported in aggregation."}; + } + if (_maxScan != 0) { + return {ErrorCodes::InvalidPipelineOperator, + str::stream() << "Option " << kMaxScanField << " not supported in aggregation."}; + } + if (_returnKey) { + return {ErrorCodes::InvalidPipelineOperator, + str::stream() << "Option " << kReturnKeyField << " not supported in aggregation."}; + } + if (!_hint.isEmpty()) { + return {ErrorCodes::InvalidPipelineOperator, + str::stream() << "Option " << kHintField << " not supported in aggregation."}; + } + if (!_comment.empty()) { + return {ErrorCodes::InvalidPipelineOperator, + str::stream() << "Option " << kCommentField << " not supported in aggregation."}; + } + if (_showRecordId) { + return {ErrorCodes::InvalidPipelineOperator, + str::stream() << "Option " << kShowRecordIdField + << " not supported in aggregation."}; + } + if (_snapshot) { + return {ErrorCodes::InvalidPipelineOperator, + str::stream() << "Option " << kSnapshotField << " not supported in aggregation."}; + } + if (_tailable) { + return {ErrorCodes::InvalidPipelineOperator, + str::stream() << "Option " << kTailableField << " not supported in aggregation."}; + } + if (_oplogReplay) { + return {ErrorCodes::InvalidPipelineOperator, + str::stream() << "Option " << kOplogReplayField + << " not supported in aggregation."}; + } + if (_noCursorTimeout) { + return {ErrorCodes::InvalidPipelineOperator, + str::stream() << "Option " << kNoCursorTimeoutField + << " not supported in aggregation."}; + } + if (_awaitData) { + return {ErrorCodes::InvalidPipelineOperator, + str::stream() << "Option " << kAwaitDataField << " not supported in aggregation."}; + } + if (_allowPartialResults) { + return {ErrorCodes::InvalidPipelineOperator, + str::stream() << "Option " << kPartialResultsField + << " not supported in aggregation."}; + } + if (_ntoreturn) { + return {ErrorCodes::BadValue, + str::stream() << "Cannot convert to an aggregation if ntoreturn is set."}; + } + + // Now that we've successfully validated this QR, begin building the aggregation command. + aggregationBuilder.append("aggregate", _nss.coll()); + + // Construct an aggregation pipeline that finds the equivalent documents to this query request. + BSONArrayBuilder pipelineBuilder(aggregationBuilder.subarrayStart("pipeline")); + if (!_filter.isEmpty()) { + BSONObjBuilder matchBuilder(pipelineBuilder.subobjStart()); + matchBuilder.append("$match", _filter); + matchBuilder.doneFast(); + } + if (!_sort.isEmpty()) { + BSONObjBuilder sortBuilder(pipelineBuilder.subobjStart()); + sortBuilder.append("$sort", _sort); + sortBuilder.doneFast(); + } + if (_skip) { + BSONObjBuilder skipBuilder(pipelineBuilder.subobjStart()); + skipBuilder.append("$skip", *_skip); + skipBuilder.doneFast(); + } + if (_limit) { + BSONObjBuilder limitBuilder(pipelineBuilder.subobjStart()); + limitBuilder.append("$limit", *_limit); + limitBuilder.doneFast(); + } + if (!_proj.isEmpty()) { + BSONObjBuilder projectBuilder(pipelineBuilder.subobjStart()); + projectBuilder.append("$project", _proj); + projectBuilder.doneFast(); + } + pipelineBuilder.doneFast(); + + // The aggregation 'cursor' option is always set, regardless of the presence of batchSize. + BSONObjBuilder batchSizeBuilder(aggregationBuilder.subobjStart("cursor")); + if (_batchSize) { + batchSizeBuilder.append(kBatchSizeField, *_batchSize); + } + batchSizeBuilder.doneFast(); + + // Other options. + aggregationBuilder.append("collation", _collation); + if (_explain) { + aggregationBuilder.append("explain", _explain); + } + if (_maxTimeMS > 0) { + aggregationBuilder.append(cmdOptionMaxTimeMS, _maxTimeMS); + } + return StatusWith<BSONObj>(aggregationBuilder.obj()); +} } // namespace mongo diff --git a/src/mongo/db/query/query_request.h b/src/mongo/db/query/query_request.h index cd7fa62f393..01d356a5837 100644 --- a/src/mongo/db/query/query_request.h +++ b/src/mongo/db/query/query_request.h @@ -77,6 +77,12 @@ public: void asFindCommand(BSONObjBuilder* cmdBuilder) const; /** + * Converts this QR into an aggregation using $match. If this QR has options that cannot be + * satisfied by aggregation, a non-OK status is returned and 'cmdBuilder' is not modified. + */ + StatusWith<BSONObj> asAggregationCommand() const; + + /** * Parses maxTimeMS from the BSONElement containing its value. */ static StatusWith<int> parseMaxTimeMS(BSONElement maxTimeMSElt); |