Diffstat (limited to 'src/mongo/db/pipeline/document_source_cursor.cpp')
-rw-r--r--  src/mongo/db/pipeline/document_source_cursor.cpp  274
1 file changed, 135 insertions, 139 deletions
diff --git a/src/mongo/db/pipeline/document_source_cursor.cpp b/src/mongo/db/pipeline/document_source_cursor.cpp
index d862663363d..702852f53b2 100644
--- a/src/mongo/db/pipeline/document_source_cursor.cpp
+++ b/src/mongo/db/pipeline/document_source_cursor.cpp
@@ -43,179 +43,175 @@
namespace mongo {
- using boost::intrusive_ptr;
- using std::shared_ptr;
- using std::string;
+using boost::intrusive_ptr;
+using std::shared_ptr;
+using std::string;
- DocumentSourceCursor::~DocumentSourceCursor() {
- dispose();
- }
-
- const char *DocumentSourceCursor::getSourceName() const {
- return "$cursor";
- }
+DocumentSourceCursor::~DocumentSourceCursor() {
+ dispose();
+}
- boost::optional<Document> DocumentSourceCursor::getNext() {
- pExpCtx->checkForInterrupt();
+const char* DocumentSourceCursor::getSourceName() const {
+ return "$cursor";
+}
- if (_currentBatch.empty()) {
- loadBatch();
+boost::optional<Document> DocumentSourceCursor::getNext() {
+ pExpCtx->checkForInterrupt();
- if (_currentBatch.empty()) // exhausted the cursor
- return boost::none;
- }
+ if (_currentBatch.empty()) {
+ loadBatch();
- Document out = _currentBatch.front();
- _currentBatch.pop_front();
- return out;
+ if (_currentBatch.empty()) // exhausted the cursor
+ return boost::none;
}
- void DocumentSourceCursor::dispose() {
- // Can't call in to PlanExecutor or ClientCursor registries from this function since it
- // will be called when an agg cursor is killed which would cause a deadlock.
- _exec.reset();
- _currentBatch.clear();
- }
-
- void DocumentSourceCursor::loadBatch() {
- if (!_exec) {
- dispose();
- return;
- }
+ Document out = _currentBatch.front();
+ _currentBatch.pop_front();
+ return out;
+}
- // We have already validated the sharding version when we constructed the PlanExecutor
- // so we shouldn't check it again.
- const NamespaceString nss(_ns);
- AutoGetCollectionForRead autoColl(pExpCtx->opCtx, nss);
+void DocumentSourceCursor::dispose() {
+ // Can't call in to PlanExecutor or ClientCursor registries from this function since it
+ // will be called when an agg cursor is killed which would cause a deadlock.
+ _exec.reset();
+ _currentBatch.clear();
+}
- _exec->restoreState(pExpCtx->opCtx);
+void DocumentSourceCursor::loadBatch() {
+ if (!_exec) {
+ dispose();
+ return;
+ }
- int memUsageBytes = 0;
- BSONObj obj;
- PlanExecutor::ExecState state;
- while ((state = _exec->getNext(&obj, NULL)) == PlanExecutor::ADVANCED) {
- if (_dependencies) {
- _currentBatch.push_back(_dependencies->extractFields(obj));
- }
- else {
- _currentBatch.push_back(Document::fromBsonWithMetaData(obj));
- }
+ // We have already validated the sharding version when we constructed the PlanExecutor
+ // so we shouldn't check it again.
+ const NamespaceString nss(_ns);
+ AutoGetCollectionForRead autoColl(pExpCtx->opCtx, nss);
+
+ _exec->restoreState(pExpCtx->opCtx);
+
+ int memUsageBytes = 0;
+ BSONObj obj;
+ PlanExecutor::ExecState state;
+ while ((state = _exec->getNext(&obj, NULL)) == PlanExecutor::ADVANCED) {
+ if (_dependencies) {
+ _currentBatch.push_back(_dependencies->extractFields(obj));
+ } else {
+ _currentBatch.push_back(Document::fromBsonWithMetaData(obj));
+ }
- if (_limit) {
- if (++_docsAddedToBatches == _limit->getLimit()) {
- break;
- }
- verify(_docsAddedToBatches < _limit->getLimit());
+ if (_limit) {
+ if (++_docsAddedToBatches == _limit->getLimit()) {
+ break;
}
+ verify(_docsAddedToBatches < _limit->getLimit());
+ }
- memUsageBytes += _currentBatch.back().getApproximateSize();
+ memUsageBytes += _currentBatch.back().getApproximateSize();
- if (memUsageBytes > MaxBytesToReturnToClientAtOnce) {
- // End this batch and prepare PlanExecutor for yielding.
- _exec->saveState();
- return;
- }
+ if (memUsageBytes > MaxBytesToReturnToClientAtOnce) {
+ // End this batch and prepare PlanExecutor for yielding.
+ _exec->saveState();
+ return;
}
+ }
- // If we got here, there won't be any more documents, so destroy the executor. Can't use
- // dispose since we want to keep the _currentBatch.
- _exec.reset();
-
- uassert(16028, str::stream() << "collection or index disappeared when cursor yielded: "
- << WorkingSetCommon::toStatusString(obj),
- state != PlanExecutor::DEAD);
+ // If we got here, there won't be any more documents, so destroy the executor. Can't use
+ // dispose since we want to keep the _currentBatch.
+ _exec.reset();
- uassert(17285, str::stream() << "cursor encountered an error: "
- << WorkingSetCommon::toStatusString(obj),
- state != PlanExecutor::FAILURE);
+ uassert(16028,
+ str::stream() << "collection or index disappeared when cursor yielded: "
+ << WorkingSetCommon::toStatusString(obj),
+ state != PlanExecutor::DEAD);
- massert(17286, str::stream() << "Unexpected return from PlanExecutor::getNext: " << state,
- state == PlanExecutor::IS_EOF || state == PlanExecutor::ADVANCED);
- }
+ uassert(
+ 17285,
+ str::stream() << "cursor encountered an error: " << WorkingSetCommon::toStatusString(obj),
+ state != PlanExecutor::FAILURE);
- void DocumentSourceCursor::setSource(DocumentSource *pSource) {
- /* this doesn't take a source */
- verify(false);
- }
+ massert(17286,
+ str::stream() << "Unexpected return from PlanExecutor::getNext: " << state,
+ state == PlanExecutor::IS_EOF || state == PlanExecutor::ADVANCED);
+}
- long long DocumentSourceCursor::getLimit() const {
- return _limit ? _limit->getLimit() : -1;
- }
+void DocumentSourceCursor::setSource(DocumentSource* pSource) {
+ /* this doesn't take a source */
+ verify(false);
+}
- bool DocumentSourceCursor::coalesce(const intrusive_ptr<DocumentSource>& nextSource) {
- // Note: Currently we assume the $limit is logically after any $sort or
- // $match. If we ever pull in $match or $sort using this method, we
- // will need to keep track of the order of the sub-stages.
+long long DocumentSourceCursor::getLimit() const {
+ return _limit ? _limit->getLimit() : -1;
+}
- if (!_limit) {
- _limit = dynamic_cast<DocumentSourceLimit*>(nextSource.get());
- return _limit.get(); // false if next is not a $limit
- }
- else {
- return _limit->coalesce(nextSource);
- }
+bool DocumentSourceCursor::coalesce(const intrusive_ptr<DocumentSource>& nextSource) {
+ // Note: Currently we assume the $limit is logically after any $sort or
+ // $match. If we ever pull in $match or $sort using this method, we
+ // will need to keep track of the order of the sub-stages.
- return false;
+ if (!_limit) {
+ _limit = dynamic_cast<DocumentSourceLimit*>(nextSource.get());
+ return _limit.get(); // false if next is not a $limit
+ } else {
+ return _limit->coalesce(nextSource);
}
- Value DocumentSourceCursor::serialize(bool explain) const {
- // we never parse a documentSourceCursor, so we only serialize for explain
- if (!explain)
- return Value();
+ return false;
+}
- // Get planner-level explain info from the underlying PlanExecutor.
- BSONObjBuilder explainBuilder;
- {
- const NamespaceString nss(_ns);
- AutoGetCollectionForRead autoColl(pExpCtx->opCtx, nss);
+Value DocumentSourceCursor::serialize(bool explain) const {
+ // we never parse a documentSourceCursor, so we only serialize for explain
+ if (!explain)
+ return Value();
- massert(17392, "No _exec. Were we disposed before explained?", _exec);
+ // Get planner-level explain info from the underlying PlanExecutor.
+ BSONObjBuilder explainBuilder;
+ {
+ const NamespaceString nss(_ns);
+ AutoGetCollectionForRead autoColl(pExpCtx->opCtx, nss);
- _exec->restoreState(pExpCtx->opCtx);
- Explain::explainStages(_exec.get(), ExplainCommon::QUERY_PLANNER, &explainBuilder);
- _exec->saveState();
- }
+ massert(17392, "No _exec. Were we disposed before explained?", _exec);
- MutableDocument out;
- out["query"] = Value(_query);
+ _exec->restoreState(pExpCtx->opCtx);
+ Explain::explainStages(_exec.get(), ExplainCommon::QUERY_PLANNER, &explainBuilder);
+ _exec->saveState();
+ }
- if (!_sort.isEmpty())
- out["sort"] = Value(_sort);
+ MutableDocument out;
+ out["query"] = Value(_query);
- if (_limit)
- out["limit"] = Value(_limit->getLimit());
+ if (!_sort.isEmpty())
+ out["sort"] = Value(_sort);
- if (!_projection.isEmpty())
- out["fields"] = Value(_projection);
+ if (_limit)
+ out["limit"] = Value(_limit->getLimit());
- // Add explain results from the query system into the agg explain output.
- BSONObj explainObj = explainBuilder.obj();
- invariant(explainObj.hasField("queryPlanner"));
- out["queryPlanner"] = Value(explainObj["queryPlanner"]);
+ if (!_projection.isEmpty())
+ out["fields"] = Value(_projection);
- return Value(DOC(getSourceName() << out.freezeToValue()));
- }
+ // Add explain results from the query system into the agg explain output.
+ BSONObj explainObj = explainBuilder.obj();
+ invariant(explainObj.hasField("queryPlanner"));
+ out["queryPlanner"] = Value(explainObj["queryPlanner"]);
- DocumentSourceCursor::DocumentSourceCursor(const string& ns,
- const std::shared_ptr<PlanExecutor>& exec,
- const intrusive_ptr<ExpressionContext> &pCtx)
- : DocumentSource(pCtx)
- , _docsAddedToBatches(0)
- , _ns(ns)
- , _exec(exec)
- {}
-
- intrusive_ptr<DocumentSourceCursor> DocumentSourceCursor::create(
- const string& ns,
- const std::shared_ptr<PlanExecutor>& exec,
- const intrusive_ptr<ExpressionContext> &pExpCtx) {
- return new DocumentSourceCursor(ns, exec, pExpCtx);
- }
+ return Value(DOC(getSourceName() << out.freezeToValue()));
+}
- void DocumentSourceCursor::setProjection(
- const BSONObj& projection,
- const boost::optional<ParsedDeps>& deps) {
- _projection = projection;
- _dependencies = deps;
- }
+DocumentSourceCursor::DocumentSourceCursor(const string& ns,
+ const std::shared_ptr<PlanExecutor>& exec,
+ const intrusive_ptr<ExpressionContext>& pCtx)
+ : DocumentSource(pCtx), _docsAddedToBatches(0), _ns(ns), _exec(exec) {}
+
+intrusive_ptr<DocumentSourceCursor> DocumentSourceCursor::create(
+ const string& ns,
+ const std::shared_ptr<PlanExecutor>& exec,
+ const intrusive_ptr<ExpressionContext>& pExpCtx) {
+ return new DocumentSourceCursor(ns, exec, pExpCtx);
+}
+
+void DocumentSourceCursor::setProjection(const BSONObj& projection,
+ const boost::optional<ParsedDeps>& deps) {
+ _projection = projection;
+ _dependencies = deps;
+}
}
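
Note on the change above: the diff is a whitespace/formatting-only reindentation, and the one piece of logic worth keeping in mind while reading it is the batching contract in loadBatch(): documents are appended to _currentBatch until the executor is exhausted, an optional $limit is hit, or the batch grows past a size threshold, at which point the PlanExecutor saves its state so it can yield. The sketch below is a minimal, standalone illustration of that pattern only; FakeExecutor, Doc, and kMaxBatchBytes are hypothetical stand-ins for PlanExecutor, Document, and MaxBytesToReturnToClientAtOnce, and this is not MongoDB's implementation.

```cpp
// Minimal sketch of the batch-loading pattern shown in loadBatch() above.
// FakeExecutor, Doc, and kMaxBatchBytes are hypothetical stand-ins.
#include <deque>
#include <iostream>
#include <optional>
#include <string>
#include <vector>

struct Doc {
    std::string payload;
    size_t approximateSize() const { return payload.size(); }
};

// Stand-in for PlanExecutor: yields documents until its source is exhausted.
class FakeExecutor {
public:
    explicit FakeExecutor(std::vector<Doc> docs) : _docs(std::move(docs)) {}
    std::optional<Doc> getNext() {
        if (_pos == _docs.size())
            return std::nullopt;  // analogous to IS_EOF
        return _docs[_pos++];     // analogous to ADVANCED
    }
    void saveState() { std::cout << "executor state saved (would yield here)\n"; }

private:
    std::vector<Doc> _docs;
    size_t _pos = 0;
};

constexpr size_t kMaxBatchBytes = 16;  // tiny threshold for demonstration only

// Fill `batch` until the executor is exhausted or the accumulated size
// exceeds the threshold; in the latter case save executor state and stop.
void loadBatch(FakeExecutor& exec, std::deque<Doc>& batch) {
    size_t memUsageBytes = 0;
    while (auto doc = exec.getNext()) {
        batch.push_back(*doc);
        memUsageBytes += batch.back().approximateSize();
        if (memUsageBytes > kMaxBatchBytes) {
            exec.saveState();  // end this batch and allow a yield
            return;
        }
    }
    // Exhausted: the real code would now release the executor but keep the batch.
}

int main() {
    FakeExecutor exec({{"alpha"}, {"bravo"}, {"charlie"}, {"delta"}});
    std::deque<Doc> batch;
    loadBatch(exec, batch);
    std::cout << "first batch holds " << batch.size() << " docs\n";
}
```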