/** * Copyright (C) 2017 MongoDB Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License, version 3, * as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . * * As a special exception, the copyright holders give permission to link the * code of portions of this program with the OpenSSL library under certain * conditions as described in each individual source file and distribute * linked combinations including the program with the OpenSSL library. You * must comply with the GNU Affero General Public License in all respects * for all of the code used other than as permitted herein. If you modify * file(s) with this exception, you may extend this exception to your * version of the file(s), but you are not obligated to do so. If you do not * wish to do so, delete this exception statement from your version. If you * delete this exception statement from all source files in the program, * then also delete it in the license file. */ #include "mongo/platform/basic.h" #include "mongo/db/pipeline/document_source_sequential_document_cache.h" #include "mongo/db/pipeline/document_source_match.h" namespace mongo { constexpr StringData DocumentSourceSequentialDocumentCache::kStageName; DocumentSourceSequentialDocumentCache::DocumentSourceSequentialDocumentCache( const boost::intrusive_ptr& expCtx, SequentialDocumentCache* cache) : DocumentSource(expCtx), _cache(cache) { invariant(_cache); invariant(!_cache->isAbandoned()); if (_cache->isServing()) { _cache->restartIteration(); } } DocumentSource::GetNextResult DocumentSourceSequentialDocumentCache::getNext() { // Either we're reading from the cache, or we have an input source to build the cache from. invariant(pSource || _cache->isServing()); pExpCtx->checkForInterrupt(); if (_cache->isServing()) { auto nextDoc = _cache->getNext(); return (nextDoc ? std::move(*nextDoc) : GetNextResult::makeEOF()); } auto nextResult = pSource->getNext(); if (!_cache->isAbandoned()) { if (nextResult.isEOF()) { _cache->freeze(); } else { _cache->add(nextResult.getDocument()); } } return nextResult; } Pipeline::SourceContainer::iterator DocumentSourceSequentialDocumentCache::doOptimizeAt( Pipeline::SourceContainer::iterator itr, Pipeline::SourceContainer* container) { // The DocumentSourceSequentialDocumentCache should always be the last stage in the pipeline // pre-optimization. By the time optimization reaches this point, all preceding stages are in // the final positions which they would have occupied if no cache stage was present. invariant(_hasOptimizedPos || std::next(itr) == container->end()); invariant((*itr).get() == this); // If we have already optimized our position, stay where we are. if (_hasOptimizedPos) { return std::next(itr); } // Mark this stage as having optimized itself. _hasOptimizedPos = true; // If the cache is the only stage in the pipeline, return immediately. if (itr == container->begin()) { return container->end(); } // Pop the cache stage off the back of the pipeline. auto cacheStage = std::move(*itr); container->erase(itr); // Get all variable IDs defined in this scope. auto varIDs = pExpCtx->variablesParseState.getDefinedVariableIDs(); auto prefixSplit = container->begin(); DepsTracker deps; // Iterate through the pipeline stages until we find one which references an external variable. for (; prefixSplit != container->end(); ++prefixSplit) { (*prefixSplit)->getDependencies(&deps); if (deps.hasVariableReferenceTo(varIDs)) { break; } } // The 'prefixSplit' iterator is now pointing to the first stage of the correlated suffix. If // the split point is the first stage, then the entire pipeline is correlated and we should not // attempt to perform any caching. Abandon the cache and return. if (prefixSplit == container->begin()) { _cache->abandon(); return container->end(); } // If the cache has been populated and is serving results, remove the non-correlated prefix. if (_cache->isServing()) { container->erase(container->begin(), prefixSplit); } container->insert(prefixSplit, std::move(cacheStage)); return container->end(); } Value DocumentSourceSequentialDocumentCache::serialize( boost::optional explain) const { if (explain) { return Value(Document{ {kStageName, Document{{"maxSizeBytes"_sd, Value(static_cast(_cache->maxSizeBytes()))}, {"status"_sd, _cache->isBuilding() ? "kBuilding"_sd : _cache->isServing() ? "kServing"_sd : "kAbandoned"_sd}}}}); } return Value(); } } // namesace mongo