/** * Copyright 2011 (c) 10gen Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License, version 3, * as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . * * As a special exception, the copyright holders give permission to link the * code of portions of this program with the OpenSSL library under certain * conditions as described in each individual source file and distribute * linked combinations including the program with the OpenSSL library. You * must comply with the GNU Affero General Public License in all respects for * all of the code used other than as permitted herein. If you modify file(s) * with this exception, you may extend this exception to your version of the * file(s), but you are not obligated to do so. If you do not wish to do so, * delete this exception statement from your version. If you delete this * exception statement from all source files in the program, then also delete * it in the license file. */ #include "mongo/platform/basic.h" #include "mongo/db/pipeline/document_source.h" #include "mongo/db/exec/working_set_common.h" #include "mongo/db/instance.h" #include "mongo/db/pipeline/document.h" #include "mongo/db/query/explain.h" #include "mongo/db/query/find_constants.h" #include "mongo/db/storage_options.h" #include "mongo/s/d_state.h" namespace mongo { DocumentSourceCursor::~DocumentSourceCursor() { dispose(); } const char *DocumentSourceCursor::getSourceName() const { return "$cursor"; } boost::optional DocumentSourceCursor::getNext() { pExpCtx->checkForInterrupt(); if (_currentBatch.empty()) { loadBatch(); if (_currentBatch.empty()) // exhausted the cursor return boost::none; } Document out = _currentBatch.front(); _currentBatch.pop_front(); return out; } void DocumentSourceCursor::dispose() { // Can't call in to PlanExecutor or ClientCursor registries from this function since it // will be called when an agg cursor is killed which would cause a deadlock. _exec.reset(); _currentBatch.clear(); } void DocumentSourceCursor::loadBatch() { if (!_exec) { dispose(); return; } // We have already validated the sharding version when we constructed the PlanExecutor // so we shouldn't check it again. Lock::DBRead lk(pExpCtx->opCtx->lockState(), _ns); Client::Context ctx(pExpCtx->opCtx, _ns, /*doVersion=*/false); _exec->restoreState(pExpCtx->opCtx); int memUsageBytes = 0; BSONObj obj; PlanExecutor::ExecState state; while ((state = _exec->getNext(&obj, NULL)) == PlanExecutor::ADVANCED) { if (_dependencies) { _currentBatch.push_back(_dependencies->extractFields(obj)); } else { _currentBatch.push_back(Document::fromBsonWithMetaData(obj)); } if (_limit) { if (++_docsAddedToBatches == _limit->getLimit()) { break; } verify(_docsAddedToBatches < _limit->getLimit()); } memUsageBytes += _currentBatch.back().getApproximateSize(); if (memUsageBytes > MaxBytesToReturnToClientAtOnce) { // End this batch and prepare PlanExecutor for yielding. _exec->saveState(); return; } } // If we got here, there won't be any more documents, so destroy the executor. Can't use // dispose since we want to keep the _currentBatch. _exec.reset(); uassert(16028, "collection or index disappeared when cursor yielded", state != PlanExecutor::DEAD); uassert(17285, "cursor encountered an error: " + WorkingSetCommon::toStatusString(obj), state != PlanExecutor::EXEC_ERROR); massert(17286, str::stream() << "Unexpected return from PlanExecutor::getNext: " << state, state == PlanExecutor::IS_EOF || state == PlanExecutor::ADVANCED); } void DocumentSourceCursor::setSource(DocumentSource *pSource) { /* this doesn't take a source */ verify(false); } long long DocumentSourceCursor::getLimit() const { return _limit ? _limit->getLimit() : -1; } bool DocumentSourceCursor::coalesce(const intrusive_ptr& nextSource) { // Note: Currently we assume the $limit is logically after any $sort or // $match. If we ever pull in $match or $sort using this method, we // will need to keep track of the order of the sub-stages. if (!_limit) { _limit = dynamic_cast(nextSource.get()); return _limit.get(); // false if next is not a $limit } else { return _limit->coalesce(nextSource); } return false; } Value DocumentSourceCursor::serialize(bool explain) const { // we never parse a documentSourceCursor, so we only serialize for explain if (!explain) return Value(); // Get planner-level explain info from the underlying PlanExecutor. BSONObjBuilder explainBuilder; Status explainStatus(ErrorCodes::InternalError, ""); { Lock::DBRead lk(pExpCtx->opCtx->lockState(), _ns); Client::Context ctx(pExpCtx->opCtx, _ns, /*doVersion=*/ false); massert(17392, "No _exec. Were we disposed before explained?", _exec); _exec->restoreState(pExpCtx->opCtx); explainStatus = Explain::explainStages(pExpCtx->opCtx, _exec.get(), ExplainCommon::QUERY_PLANNER, &explainBuilder); _exec->saveState(); } MutableDocument out; out["query"] = Value(_query); if (!_sort.isEmpty()) out["sort"] = Value(_sort); if (_limit) out["limit"] = Value(_limit->getLimit()); if (!_projection.isEmpty()) out["fields"] = Value(_projection); // Add explain results from the query system into the agg explain output. if (explainStatus.isOK()) { BSONObj explainObj = explainBuilder.obj(); invariant(explainObj.hasField("queryPlanner")); out["queryPlanner"] = Value(explainObj["queryPlanner"]); } else { out["planError"] = Value(explainStatus.toString()); } return Value(DOC(getSourceName() << out.freezeToValue())); } DocumentSourceCursor::DocumentSourceCursor(const string& ns, const boost::shared_ptr& exec, const intrusive_ptr &pCtx) : DocumentSource(pCtx) , _docsAddedToBatches(0) , _ns(ns) , _exec(exec) {} intrusive_ptr DocumentSourceCursor::create( const string& ns, const boost::shared_ptr& exec, const intrusive_ptr &pExpCtx) { return new DocumentSourceCursor(ns, exec, pExpCtx); } void DocumentSourceCursor::setProjection( const BSONObj& projection, const boost::optional& deps) { _projection = projection; _dependencies = deps; } }