/**
* Copyright 2011 (c) 10gen Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kQuery
#include "mongo/platform/basic.h"
#include "mongo/db/pipeline/document_source_cursor.h"
#include "mongo/db/catalog/collection.h"
#include "mongo/db/exec/working_set_common.h"
#include "mongo/db/pipeline/document.h"
#include "mongo/db/query/explain.h"
#include "mongo/db/query/find_common.h"
#include "mongo/db/storage/storage_options.h"
#include "mongo/util/fail_point_service.h"
#include "mongo/util/log.h"
#include "mongo/util/scopeguard.h"
namespace mongo {
// Failpoint checked in DocumentSourceCursor::loadBatch() before any documents are fetched; while
// it is enabled, loadBatch() logs a message and sleeps in 10ms increments.
MONGO_FAIL_POINT_DEFINE(hangBeforeDocumentSourceCursorLoadBatch);

using boost::intrusive_ptr;
using std::shared_ptr;
using std::string;
/**
 * Returns the stage name of this source. serialize() also uses it as the top-level key of the
 * explain output.
 */
const char* DocumentSourceCursor::getSourceName() const {
    static constexpr const char* kStageName = "$cursor";
    return kStageName;
}
/**
 * Returns the next buffered document, refilling the buffer from the PlanExecutor via
 * loadBatch() when it is empty. Returns EOF when a refill produces nothing.
 */
DocumentSource::GetNextResult DocumentSourceCursor::getNext() {
    pExpCtx->checkForInterrupt();

    // Refill lazily; only an attempted refill can make the buffer non-empty here.
    if (_currentBatch.empty()) {
        loadBatch();
    }
    if (_currentBatch.empty()) {
        return GetNextResult::makeEOF();
    }

    Document next = std::move(_currentBatch.front());
    _currentBatch.pop_front();
    // Explicit move: the return type differs from the local's type.
    return std::move(next);
}
/**
 * Converts a BSONObj produced by the PlanExecutor into a Document.
 *
 * If '_dependencies' is set, only the fields it extracts are kept. Otherwise the whole object
 * is converted with Document::fromBsonWithMetaData().
 */
Document DocumentSourceCursor::transformBSONObjToDocument(const BSONObj& obj) const {
    if (!_dependencies) {
        return Document::fromBsonWithMetaData(obj);
    }
    return _dependencies->extractFields(obj);
}
/**
 * Refills '_currentBatch' with documents pulled from '_exec'.
 *
 * Does nothing if there is no executor or it has already been disposed. Otherwise it:
 *  - takes a collection read lock (AutoGetCollectionForRead);
 *  - checks with the ReplicationCoordinator that reads can be served for the namespace;
 *  - restores the executor and pulls documents.
 *
 * Pulling stops when:
 *  - the absorbed '_limit' (if any) is reached;
 *  - the batch must end early (waiting for inserts, tailable awaitData + needsMerge, or the
 *    approximate batch size exceeds internalDocumentSourceCursorBatchSizeBytes). The executor
 *    is saved and kept for the next call;
 *  - a tailable awaitData cursor reaches EOF. The executor is saved and kept;
 *  - the executor is exhausted or reports DEAD/FAILURE.
 *
 * When no more documents can be produced (limit reached or non-tailable EOF), the executor is
 * cleaned up while the lock is still held. On DEAD/FAILURE the error status is stored in
 * '_execStatus' (later passed to explain by serialize()) and thrown via uassertStatusOK.
 */
void DocumentSourceCursor::loadBatch() {
    if (!_exec || _exec->isDisposed()) {
        // No more documents.
        return;
    }

    // While the 'hangBeforeDocumentSourceCursorLoadBatch' failpoint is enabled, log and sleep in
    // 10ms increments before fetching anything.
    while (MONGO_FAIL_POINT(hangBeforeDocumentSourceCursorLoadBatch)) {
        log() << "Hanging aggregation due to 'hangBeforeDocumentSourceCursorLoadBatch' failpoint";
        sleepmillis(10);
    }

    PlanExecutor::ExecState state;
    BSONObj resultObj;
    {
        AutoGetCollectionForRead autoColl(pExpCtx->opCtx, _exec->nss());
        // Re-check, under the lock, that this node may serve reads for the namespace before
        // resuming the executor.
        uassertStatusOK(repl::ReplicationCoordinator::get(pExpCtx->opCtx)
                            ->checkCanServeReadsFor(pExpCtx->opCtx, _exec->nss(), true));

        // The executor is left in a saved state between batches (see the constructor and the
        // saveState() calls below), so restore it before pulling documents.
        uassertStatusOK(_exec->restoreState());

        // Running total of getApproximateSize() for the documents added during this call.
        int memUsageBytes = 0;
        {
            // Refresh the plan summary stats whenever this scope is exited, including via the
            // early returns below.
            ON_BLOCK_EXIT([this] { recordPlanSummaryStats(); });

            while ((state = _exec->getNext(&resultObj, nullptr)) == PlanExecutor::ADVANCED) {
                if (_shouldProduceEmptyDocs) {
                    // Presumably downstream needs no fields (e.g. only a count); emit an empty
                    // Document per result.
                    _currentBatch.push_back(Document());
                } else {
                    _currentBatch.push_back(transformBSONObjToDocument(resultObj));
                }

                if (_limit) {
                    if (++_docsAddedToBatches == _limit->getLimit()) {
                        // Limit reached: leave the loop with state == ADVANCED. The executor is
                        // cleaned up below because no further documents will be needed.
                        break;
                    }
                    verify(_docsAddedToBatches < _limit->getLimit());
                }

                memUsageBytes += _currentBatch.back().getApproximateSize();

                // As long as we're waiting for inserts, we shouldn't do any batching at this
                // level; the whole pipeline needs to see each document to decide whether we
                // should stop waiting. Furthermore, if we need to return the latest oplog time
                // (in the tailable and needs-merge case), batching would report a wrong time.
                // Otherwise, end the batch once its approximate size exceeds
                // internalDocumentSourceCursorBatchSizeBytes.
                if (awaitDataState(pExpCtx->opCtx).shouldWaitForInserts ||
                    (pExpCtx->isTailableAwaitData() && pExpCtx->needsMerge) ||
                    memUsageBytes > internalDocumentSourceCursorBatchSizeBytes.load()) {
                    // End this batch and prepare PlanExecutor for yielding.
                    _exec->saveState();
                    return;
                }
            }

            // Special case for tailable cursor -- EOF doesn't preclude more results, so keep
            // the PlanExecutor alive.
            if (state == PlanExecutor::IS_EOF && pExpCtx->isTailableAwaitData()) {
                _exec->saveState();
                return;
            }
        }

        // If we got here, there won't be any more documents, so destroy our PlanExecutor. Note we
        // must hold a collection lock to destroy '_exec', but we can only assume that our locks are
        // still held if '_exec' did not end in an error. If '_exec' encountered an error during a
        // yield, the locks might be yielded.
        if (state != PlanExecutor::DEAD && state != PlanExecutor::FAILURE) {
            cleanupExecutor(autoColl);
        }
    }

    switch (state) {
        case PlanExecutor::ADVANCED:
        case PlanExecutor::IS_EOF:
            return;  // We've reached our limit or exhausted the cursor.
        case PlanExecutor::DEAD:
        case PlanExecutor::FAILURE: {
            // Keep the error so serialize() can pass it to explain, then throw it. This falls
            // through to MONGO_UNREACHABLE, so the status is expected to be non-OK here (and
            // uassertStatusOK to throw) -- NOTE(review): relies on getMemberObjectStatus never
            // returning OK for a failed executor.
            _execStatus = WorkingSetCommon::getMemberObjectStatus(resultObj).withContext(
                "Error in $cursor stage");
            uassertStatusOK(_execStatus);
        }
        default:
            MONGO_UNREACHABLE;
    }
}
/**
 * Absorbs a $limit stage that directly follows this $cursor into '_limit'. loadBatch() then
 * enforces the limit while pulling documents from the PlanExecutor.
 *
 * If a limit was already absorbed, the more restrictive of the two is kept. The absorbed
 * $limit is erased from 'container', and the position of this stage is returned so that
 * optimization resumes here (another $limit may now directly follow). If the next stage is not
 * a $limit, the iterator to the next stage is returned. If this is the last stage, end() is
 * returned without touching anything.
 */
Pipeline::SourceContainer::iterator DocumentSourceCursor::doOptimizeAt(
    Pipeline::SourceContainer::iterator itr, Pipeline::SourceContainer* container) {
    invariant(*itr == this);

    auto next = std::next(itr);
    // Dereferencing end() is undefined behavior, so bail out when $cursor is the last stage.
    if (next == container->end()) {
        return container->end();
    }

    auto nextLimit = dynamic_cast<DocumentSourceLimit*>(next->get());
    if (!nextLimit) {
        return next;
    }

    if (_limit) {
        // We already have an internal limit, set it to the more restrictive of the two.
        _limit->setLimit(std::min(_limit->getLimit(), nextLimit->getLimit()));
    } else {
        _limit = nextLimit;
    }
    container->erase(next);
    return itr;
}
/**
 * Refreshes '_planSummaryStats' from the PlanExecutor. The previously recorded 'hasSortStage'
 * value is left unchanged rather than overwritten by the executor's value.
 */
void DocumentSourceCursor::recordPlanSummaryStats() {
    invariant(_exec);

    // Aggregation performs in-memory sorts outside the query subsystem, so the executor's view
    // of 'hasSortStage' must not overwrite the value we already hold.
    const auto savedHasSortStage = _planSummaryStats.hasSortStage;
    Explain::getSummaryStats(*_exec, &_planSummaryStats);
    _planSummaryStats.hasSortStage = savedHasSortStage;
}
/**
 * Serializes this stage for explain output only. A $cursor stage is never parsed, so an empty
 * Value is returned when 'verbosity' is not set.
 *
 * The result has the form
 *   {$cursor: {query, sort?, limit?, fields?, queryPlanner, executionStats?}}
 * where 'executionStats' is included only for verbosity kExecStats or higher. Database and
 * collection locks (mode from getLockModeForQuery()) are held while the executor's stages are
 * explained.
 *
 * Throws (code 50660) if 'verbosity' differs from the expression context's explain verbosity.
 */
Value DocumentSourceCursor::serialize(
    boost::optional<ExplainOptions::Verbosity> verbosity) const {
    // We never parse a DocumentSourceCursor, so we only serialize for explain.
    if (!verbosity)
        return Value();

    invariant(_exec);

    uassert(50660,
            "Mismatch between verbosity passed to serialize() and expression context verbosity",
            verbosity == pExpCtx->explain);

    MutableDocument out;
    out["query"] = Value(_query);

    if (!_sort.isEmpty())
        out["sort"] = Value(_sort);

    if (_limit)
        out["limit"] = Value(_limit->getLimit());

    if (!_projection.isEmpty())
        out["fields"] = Value(_projection);

    BSONObjBuilder explainStatsBuilder;

    {
        // Lock the database and collection while explaining the executor's stages; the
        // collection may no longer exist, in which case nullptr is passed.
        auto opCtx = pExpCtx->opCtx;
        auto lockMode = getLockModeForQuery(opCtx);
        AutoGetDb dbLock(opCtx, _exec->nss().db(), lockMode);
        Lock::CollectionLock collLock(opCtx->lockState(), _exec->nss().ns(), lockMode);
        auto collection =
            dbLock.getDb() ? dbLock.getDb()->getCollection(opCtx, _exec->nss()) : nullptr;

        Explain::explainStages(_exec.get(),
                               collection,
                               verbosity.get(),
                               _execStatus,
                               _winningPlanTrialStats.get(),
                               &explainStatsBuilder);
    }

    BSONObj explainStats = explainStatsBuilder.obj();
    invariant(explainStats["queryPlanner"]);
    out["queryPlanner"] = Value(explainStats["queryPlanner"]);

    if (verbosity.get() >= ExplainOptions::Verbosity::kExecStats) {
        invariant(explainStats["executionStats"]);
        out["executionStats"] = Value(explainStats["executionStats"]);
    }

    return Value(DOC(getSourceName() << out.freezeToValue()));
}
/**
 * Detaches the PlanExecutor from its OperationContext, if an executor is present and not
 * already detached.
 */
void DocumentSourceCursor::detachFromOperationContext() {
    if (!_exec || _exec->isDetached()) {
        return;
    }
    _exec->detachFromOperationContext();
}
/**
 * Reattaches the PlanExecutor to 'opCtx'. This is a no-op if the executor has already been
 * released (cleanupExecutor() resets it outside explain mode).
 */
void DocumentSourceCursor::reattachToOperationContext(OperationContext* opCtx) {
    if (!_exec) {
        return;
    }
    _exec->reattachToOperationContext(opCtx);
}
/**
 * Drops all buffered documents and disposes of the PlanExecutor unless it is absent or was
 * already disposed.
 */
void DocumentSourceCursor::doDispose() {
    _currentBatch.clear();

    const bool executorNeedsCleanup = _exec && !_exec->isDisposed();
    if (executorNeedsCleanup) {
        cleanupExecutor();
    }
}
/**
 * Disposes of '_exec', acquiring the necessary locks itself. Compare the overload taking an
 * AutoGetCollectionForRead, which reuses a lock already held by the caller.
 *
 * Under an UninterruptibleLockGuard (presumably so lock acquisition here cannot be interrupted
 * during cleanup), this overload takes database and collection locks in the mode returned by
 * getLockModeForQuery(). If the collection still exists, its CursorManager is passed to
 * PlanExecutor::dispose(). Outside explain mode the executor is then released.
 */
void DocumentSourceCursor::cleanupExecutor() {
    invariant(_exec);
    auto* opCtx = pExpCtx->opCtx;
    // We need to be careful to not use AutoGetCollection here, since we only need the lock to
    // protect potential access to the Collection's CursorManager, and AutoGetCollection may throw
    // if this namespace has since turned into a view. Using Database::getCollection() will simply
    // return nullptr if the collection has since turned into a view. In this case, '_exec' will
    // already have been marked as killed when the collection was dropped, and we won't need to
    // access the CursorManager to properly dispose of it.
    UninterruptibleLockGuard noInterrupt(opCtx->lockState());
    auto lockMode = getLockModeForQuery(opCtx);
    AutoGetDb dbLock(opCtx, _exec->nss().db(), lockMode);
    Lock::CollectionLock collLock(opCtx->lockState(), _exec->nss().ns(), lockMode);
    auto collection =
        dbLock.getDb() ? dbLock.getDb()->getCollection(opCtx, _exec->nss()) : nullptr;
    auto cursorManager = collection ? collection->getCursorManager() : nullptr;
    _exec->dispose(opCtx, cursorManager);

    // Not freeing _exec if we're in explain mode since it will be used in serialize() to gather
    // execution stats.
    if (!pExpCtx->explain) {
        _exec.reset();
    }
}
/**
 * Disposes of '_exec' using a collection read lock the caller already holds. If the locked
 * collection exists, its CursorManager is passed to PlanExecutor::dispose(). Outside explain
 * mode the executor is then released.
 */
void DocumentSourceCursor::cleanupExecutor(const AutoGetCollectionForRead& readLock) {
    invariant(_exec);

    const auto collection = readLock.getCollection();
    auto cursorManager = collection ? collection->getCursorManager() : nullptr;
    _exec->dispose(pExpCtx->opCtx, cursorManager);

    // In explain mode the disposed executor is kept: serialize() still reads its execution stats.
    if (pExpCtx->explain) {
        return;
    }
    _exec.reset();
}
/**
 * Checks that the executor was cleaned up before destruction. Outside explain mode it must
 * already have been released. In explain mode it is retained for serialize(), but must have
 * been disposed.
 */
DocumentSourceCursor::~DocumentSourceCursor() {
    if (!pExpCtx->explain) {
        invariant(!_exec);  // '_exec' should have been cleaned up via dispose() before destruction.
        return;
    }
    invariant(_exec->isDisposed());  // _exec should have at least been disposed.
}
/**
 * Takes ownership of 'exec' and leaves it in a saved state, which later code in this stage
 * expects (loadBatch() calls restoreState() before using it).
 *
 * It also:
 *  - records the plan summary and the plan summary stats up front;
 *  - in explain mode, captures the winning plan's trial stats
 *    (Explain::getWinningPlanTrialStats);
 *  - if 'collection' is non-null, notifies its info cache of the indexes used by the plan.
 */
DocumentSourceCursor::DocumentSourceCursor(
    Collection* collection,
    std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> exec,
    const intrusive_ptr<ExpressionContext>& pCtx)
    : DocumentSource(pCtx),
      _docsAddedToBatches(0),
      _exec(std::move(exec)),
      _outputSorts(_exec->getOutputSorts()) {
    // Later code in the DocumentSourceCursor lifecycle expects that '_exec' is in a saved state.
    _exec->saveState();

    _planSummary = Explain::getPlanSummary(_exec.get());
    recordPlanSummaryStats();

    if (pExpCtx->explain) {
        // It's safe to access the executor even if we don't have the collection lock since we're
        // just going to call getStats() on it.
        _winningPlanTrialStats = Explain::getWinningPlanTrialStats(_exec.get());
    }

    if (collection) {
        collection->infoCache()->notifyOfQuery(pExpCtx->opCtx, _planSummaryStats.indexesUsed);
    }
}
/**
 * Factory for a $cursor stage that takes ownership of 'exec'. 'collection' may be null; see the
 * constructor for what happens when it is set.
 */
intrusive_ptr<DocumentSourceCursor> DocumentSourceCursor::create(
    Collection* collection,
    std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> exec,
    const intrusive_ptr<ExpressionContext>& pExpCtx) {
    return intrusive_ptr<DocumentSourceCursor>(
        new DocumentSourceCursor(collection, std::move(exec), pExpCtx));
}
}