diff options
Diffstat (limited to 'src/mongo/db/pipeline/document_source_cursor.cpp')
-rw-r--r-- | src/mongo/db/pipeline/document_source_cursor.cpp | 274 |
1 files changed, 135 insertions, 139 deletions
diff --git a/src/mongo/db/pipeline/document_source_cursor.cpp b/src/mongo/db/pipeline/document_source_cursor.cpp index d862663363d..702852f53b2 100644 --- a/src/mongo/db/pipeline/document_source_cursor.cpp +++ b/src/mongo/db/pipeline/document_source_cursor.cpp @@ -43,179 +43,175 @@ namespace mongo { - using boost::intrusive_ptr; - using std::shared_ptr; - using std::string; +using boost::intrusive_ptr; +using std::shared_ptr; +using std::string; - DocumentSourceCursor::~DocumentSourceCursor() { - dispose(); - } - - const char *DocumentSourceCursor::getSourceName() const { - return "$cursor"; - } +DocumentSourceCursor::~DocumentSourceCursor() { + dispose(); +} - boost::optional<Document> DocumentSourceCursor::getNext() { - pExpCtx->checkForInterrupt(); +const char* DocumentSourceCursor::getSourceName() const { + return "$cursor"; +} - if (_currentBatch.empty()) { - loadBatch(); +boost::optional<Document> DocumentSourceCursor::getNext() { + pExpCtx->checkForInterrupt(); - if (_currentBatch.empty()) // exhausted the cursor - return boost::none; - } + if (_currentBatch.empty()) { + loadBatch(); - Document out = _currentBatch.front(); - _currentBatch.pop_front(); - return out; + if (_currentBatch.empty()) // exhausted the cursor + return boost::none; } - void DocumentSourceCursor::dispose() { - // Can't call in to PlanExecutor or ClientCursor registries from this function since it - // will be called when an agg cursor is killed which would cause a deadlock. - _exec.reset(); - _currentBatch.clear(); - } - - void DocumentSourceCursor::loadBatch() { - if (!_exec) { - dispose(); - return; - } + Document out = _currentBatch.front(); + _currentBatch.pop_front(); + return out; +} - // We have already validated the sharding version when we constructed the PlanExecutor - // so we shouldn't check it again. - const NamespaceString nss(_ns); - AutoGetCollectionForRead autoColl(pExpCtx->opCtx, nss); +void DocumentSourceCursor::dispose() { + // Can't call in to PlanExecutor or ClientCursor registries from this function since it + // will be called when an agg cursor is killed which would cause a deadlock. + _exec.reset(); + _currentBatch.clear(); +} - _exec->restoreState(pExpCtx->opCtx); +void DocumentSourceCursor::loadBatch() { + if (!_exec) { + dispose(); + return; + } - int memUsageBytes = 0; - BSONObj obj; - PlanExecutor::ExecState state; - while ((state = _exec->getNext(&obj, NULL)) == PlanExecutor::ADVANCED) { - if (_dependencies) { - _currentBatch.push_back(_dependencies->extractFields(obj)); - } - else { - _currentBatch.push_back(Document::fromBsonWithMetaData(obj)); - } + // We have already validated the sharding version when we constructed the PlanExecutor + // so we shouldn't check it again. + const NamespaceString nss(_ns); + AutoGetCollectionForRead autoColl(pExpCtx->opCtx, nss); + + _exec->restoreState(pExpCtx->opCtx); + + int memUsageBytes = 0; + BSONObj obj; + PlanExecutor::ExecState state; + while ((state = _exec->getNext(&obj, NULL)) == PlanExecutor::ADVANCED) { + if (_dependencies) { + _currentBatch.push_back(_dependencies->extractFields(obj)); + } else { + _currentBatch.push_back(Document::fromBsonWithMetaData(obj)); + } - if (_limit) { - if (++_docsAddedToBatches == _limit->getLimit()) { - break; - } - verify(_docsAddedToBatches < _limit->getLimit()); + if (_limit) { + if (++_docsAddedToBatches == _limit->getLimit()) { + break; } + verify(_docsAddedToBatches < _limit->getLimit()); + } - memUsageBytes += _currentBatch.back().getApproximateSize(); + memUsageBytes += _currentBatch.back().getApproximateSize(); - if (memUsageBytes > MaxBytesToReturnToClientAtOnce) { - // End this batch and prepare PlanExecutor for yielding. - _exec->saveState(); - return; - } + if (memUsageBytes > MaxBytesToReturnToClientAtOnce) { + // End this batch and prepare PlanExecutor for yielding. + _exec->saveState(); + return; } + } - // If we got here, there won't be any more documents, so destroy the executor. Can't use - // dispose since we want to keep the _currentBatch. - _exec.reset(); - - uassert(16028, str::stream() << "collection or index disappeared when cursor yielded: " - << WorkingSetCommon::toStatusString(obj), - state != PlanExecutor::DEAD); + // If we got here, there won't be any more documents, so destroy the executor. Can't use + // dispose since we want to keep the _currentBatch. + _exec.reset(); - uassert(17285, str::stream() << "cursor encountered an error: " - << WorkingSetCommon::toStatusString(obj), - state != PlanExecutor::FAILURE); + uassert(16028, + str::stream() << "collection or index disappeared when cursor yielded: " + << WorkingSetCommon::toStatusString(obj), + state != PlanExecutor::DEAD); - massert(17286, str::stream() << "Unexpected return from PlanExecutor::getNext: " << state, - state == PlanExecutor::IS_EOF || state == PlanExecutor::ADVANCED); - } + uassert( + 17285, + str::stream() << "cursor encountered an error: " << WorkingSetCommon::toStatusString(obj), + state != PlanExecutor::FAILURE); - void DocumentSourceCursor::setSource(DocumentSource *pSource) { - /* this doesn't take a source */ - verify(false); - } + massert(17286, + str::stream() << "Unexpected return from PlanExecutor::getNext: " << state, + state == PlanExecutor::IS_EOF || state == PlanExecutor::ADVANCED); +} - long long DocumentSourceCursor::getLimit() const { - return _limit ? _limit->getLimit() : -1; - } +void DocumentSourceCursor::setSource(DocumentSource* pSource) { + /* this doesn't take a source */ + verify(false); +} - bool DocumentSourceCursor::coalesce(const intrusive_ptr<DocumentSource>& nextSource) { - // Note: Currently we assume the $limit is logically after any $sort or - // $match. If we ever pull in $match or $sort using this method, we - // will need to keep track of the order of the sub-stages. +long long DocumentSourceCursor::getLimit() const { + return _limit ? _limit->getLimit() : -1; +} - if (!_limit) { - _limit = dynamic_cast<DocumentSourceLimit*>(nextSource.get()); - return _limit.get(); // false if next is not a $limit - } - else { - return _limit->coalesce(nextSource); - } +bool DocumentSourceCursor::coalesce(const intrusive_ptr<DocumentSource>& nextSource) { + // Note: Currently we assume the $limit is logically after any $sort or + // $match. If we ever pull in $match or $sort using this method, we + // will need to keep track of the order of the sub-stages. - return false; + if (!_limit) { + _limit = dynamic_cast<DocumentSourceLimit*>(nextSource.get()); + return _limit.get(); // false if next is not a $limit + } else { + return _limit->coalesce(nextSource); } - Value DocumentSourceCursor::serialize(bool explain) const { - // we never parse a documentSourceCursor, so we only serialize for explain - if (!explain) - return Value(); + return false; +} - // Get planner-level explain info from the underlying PlanExecutor. - BSONObjBuilder explainBuilder; - { - const NamespaceString nss(_ns); - AutoGetCollectionForRead autoColl(pExpCtx->opCtx, nss); +Value DocumentSourceCursor::serialize(bool explain) const { + // we never parse a documentSourceCursor, so we only serialize for explain + if (!explain) + return Value(); - massert(17392, "No _exec. Were we disposed before explained?", _exec); + // Get planner-level explain info from the underlying PlanExecutor. + BSONObjBuilder explainBuilder; + { + const NamespaceString nss(_ns); + AutoGetCollectionForRead autoColl(pExpCtx->opCtx, nss); - _exec->restoreState(pExpCtx->opCtx); - Explain::explainStages(_exec.get(), ExplainCommon::QUERY_PLANNER, &explainBuilder); - _exec->saveState(); - } + massert(17392, "No _exec. Were we disposed before explained?", _exec); - MutableDocument out; - out["query"] = Value(_query); + _exec->restoreState(pExpCtx->opCtx); + Explain::explainStages(_exec.get(), ExplainCommon::QUERY_PLANNER, &explainBuilder); + _exec->saveState(); + } - if (!_sort.isEmpty()) - out["sort"] = Value(_sort); + MutableDocument out; + out["query"] = Value(_query); - if (_limit) - out["limit"] = Value(_limit->getLimit()); + if (!_sort.isEmpty()) + out["sort"] = Value(_sort); - if (!_projection.isEmpty()) - out["fields"] = Value(_projection); + if (_limit) + out["limit"] = Value(_limit->getLimit()); - // Add explain results from the query system into the agg explain output. - BSONObj explainObj = explainBuilder.obj(); - invariant(explainObj.hasField("queryPlanner")); - out["queryPlanner"] = Value(explainObj["queryPlanner"]); + if (!_projection.isEmpty()) + out["fields"] = Value(_projection); - return Value(DOC(getSourceName() << out.freezeToValue())); - } + // Add explain results from the query system into the agg explain output. + BSONObj explainObj = explainBuilder.obj(); + invariant(explainObj.hasField("queryPlanner")); + out["queryPlanner"] = Value(explainObj["queryPlanner"]); - DocumentSourceCursor::DocumentSourceCursor(const string& ns, - const std::shared_ptr<PlanExecutor>& exec, - const intrusive_ptr<ExpressionContext> &pCtx) - : DocumentSource(pCtx) - , _docsAddedToBatches(0) - , _ns(ns) - , _exec(exec) - {} - - intrusive_ptr<DocumentSourceCursor> DocumentSourceCursor::create( - const string& ns, - const std::shared_ptr<PlanExecutor>& exec, - const intrusive_ptr<ExpressionContext> &pExpCtx) { - return new DocumentSourceCursor(ns, exec, pExpCtx); - } + return Value(DOC(getSourceName() << out.freezeToValue())); +} - void DocumentSourceCursor::setProjection( - const BSONObj& projection, - const boost::optional<ParsedDeps>& deps) { - _projection = projection; - _dependencies = deps; - } +DocumentSourceCursor::DocumentSourceCursor(const string& ns, + const std::shared_ptr<PlanExecutor>& exec, + const intrusive_ptr<ExpressionContext>& pCtx) + : DocumentSource(pCtx), _docsAddedToBatches(0), _ns(ns), _exec(exec) {} + +intrusive_ptr<DocumentSourceCursor> DocumentSourceCursor::create( + const string& ns, + const std::shared_ptr<PlanExecutor>& exec, + const intrusive_ptr<ExpressionContext>& pExpCtx) { + return new DocumentSourceCursor(ns, exec, pExpCtx); +} + +void DocumentSourceCursor::setProjection(const BSONObj& projection, + const boost::optional<ParsedDeps>& deps) { + _projection = projection; + _dependencies = deps; +} } |