/**
 *    Copyright (c) 2012 10gen Inc.
 *
 *    This program is free software: you can redistribute it and/or modify
 *    it under the terms of the GNU Affero General Public License, version 3,
 *    as published by the Free Software Foundation.
 *
 *    This program is distributed in the hope that it will be useful,
 *    but WITHOUT ANY WARRANTY; without even the implied warranty of
 *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *    GNU Affero General Public License for more details.
 *
 *    You should have received a copy of the GNU Affero General Public License
 *    along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */

#include "mongo/pch.h"

#include "mongo/db/pipeline/pipeline_d.h"

#include "mongo/client/dbclientinterface.h"
#include "mongo/db/cursor.h"
#include "mongo/db/instance.h"
#include "mongo/db/parsed_query.h"
#include "mongo/db/pipeline/document_source.h"
#include "mongo/db/pipeline/pipeline.h"
#include "mongo/db/query_optimizer.h"
#include "mongo/s/d_logic.h"

namespace mongo {

namespace {

    /**
     * MongodInterface implementation handed to every pipeline stage that derives from
     * DocumentSourceNeedsMongod (see PipelineD::prepareCursorSource() below).
     */
    class MongodImplementation : public DocumentSourceNeedsMongod::MongodInterface {
    public:
        /** Returns the DBDirectClient owned by this object. */
        DBClientBase* directClient() { return &_client; }

        /**
         * Returns true unless the local sharding state reports a version for 'ns' that is
         * write-compatible with the "unsharded" version ChunkVersion(0, 0, OID()).
         */
        bool isSharded(const NamespaceString& ns) {
            const ChunkVersion unsharded(0, 0, OID());
            return !(shardingState.getVersion(ns.ns()).isWriteCompatibleWith(unsharded));
        }

    private:
        DBDirectClient _client;
    };

} // namespace

    /**
     * Sets up the data source at the head of 'pPipeline', modifying pPipeline->sources in place.
     *
     *  1. Every stage that is a DocumentSourceNeedsMongod gets its own MongodImplementation.
     *  2. If the first stage is already a valid initial source, no cursor is created and the
     *     function returns. When that stage is a DocumentSourceMergeCursors, it first enables
     *     the ShardedConnectionInfo auth hook.
     *  3. Otherwise, a leading $match (as reported by Pipeline::getInitialQuery()) is folded
     *     into the cursor's query and removed from the pipeline. If the remaining stages'
     *     dependencies are EXHAUSTIVE, a projection is derived from them.
     *  4. A leading $sort is folded into the cursor when getOptimizedCursor() can produce a
     *     cursor for query + sort together. The $sort is then removed, and any $limit it had
     *     absorbed is pushed back onto the front of the pipeline.
     *  5. The Cursor is registered as a ClientCursor, wrapped in a DocumentSourceCursor,
     *     coalesced with following stages where possible, disposed if this is an explain,
     *     and installed as the pipeline's initial source.
     *
     * @param pPipeline  pipeline whose 'sources' are rewritten in place.
     * @param dbName     database name; joined with the pipeline's collection name to form
     *                   the full namespace.
     * @param pExpCtx    expression context forwarded to DocumentSourceCursor::create().
     *
     * Holds a namespace read lock (Client::ReadContext) from the cursor-creation step until
     * return. Constructing that context may throw if this connection's sharding version is
     * out of date. Also fails via massert (16917, 16915) if the chosen cursor cannot back an
     * aggregation.
     */
    void PipelineD::prepareCursorSource(
        const intrusive_ptr<Pipeline> &pPipeline,
        const string &dbName,
        const intrusive_ptr<ExpressionContext> &pExpCtx) {

        // We will be modifying the source container as we go.
        Pipeline::SourceContainer& sources = pPipeline->sources;

        // Inject a MongodImplementation into every stage that needs one.
        for (size_t i = 0; i < sources.size(); i++) {
            DocumentSourceNeedsMongod* needsMongod =
                dynamic_cast<DocumentSourceNeedsMongod*>(sources[i].get());
            if (needsMongod) {
                needsMongod->injectMongodInterface(boost::make_shared<MongodImplementation>());
            }
        }

        if (!sources.empty() && sources.front()->isValidInitialSource()) {
            if (dynamic_cast<DocumentSourceMergeCursors*>(sources.front().get())) {
                // Enable the hooks for setting up authentication on the subsequent internal
                // connections we are going to create. This would normally have been done
                // when SetShardVersion was called, but since SetShardVersion is never called
                // on secondaries, this is needed.
                ShardedConnectionInfo::addHook();
            }
            return; // don't need a cursor
        }

        // Look for an initial $match. It will be built into the Cursor we create below,
        // so remove it from the pipeline.
        BSONObjBuilder queryBuilder;
        const bool initQuery = pPipeline->getInitialQuery(&queryBuilder);
        if (initQuery) {
            sources.pop_front();
        }

        // Build the query object. If there was no initial $match the builder is empty and
        // this yields a "{}" query, which is exactly what we want in that case.
        BSONObj queryObj = queryBuilder.obj();

        // Look for an initial simple projection: if the remaining stages' dependencies are
        // known exhaustively, we avoid constructing Values for fields that won't make it
        // through the pipeline.
        bool haveProjection = false;
        BSONObj projection;
        DocumentSource::ParsedDeps dependencies;
        {
            set<string> deps;
            DocumentSource::GetDepsReturn status = DocumentSource::SEE_NEXT;
            for (size_t i = 0; i < sources.size() && status == DocumentSource::SEE_NEXT; i++) {
                status = sources[i]->getDependencies(deps);
            }

            if (status == DocumentSource::EXHAUSTIVE) {
                projection = DocumentSource::depsToProjection(deps);
                dependencies = DocumentSource::parseDeps(deps);
                haveProjection = true;
            }
        }

        // Look for an initial $sort; we'll try to add it to the Cursor we create. If that
        // succeeds (further down), the $sort is removed from the pipeline, because the
        // documents will already come back in the requested order from the index scan.
        intrusive_ptr<DocumentSourceSort> pSort;
        BSONObj sortObj;
        if (!sources.empty()) {
            const intrusive_ptr<DocumentSource> &pSC = sources.front();
            pSort = dynamic_cast<DocumentSourceSort *>(pSC.get());

            if (pSort) {
                // build the sort key
                sortObj = pSort->serializeSortKey().toBson();
            }
        }

        // get the full "namespace" name
        string fullName(dbName + "." + pPipeline->getCollectionName());

        // for debugging purposes, show what the query and sort are
        DEV {
            (log() << "\n---- query BSON\n" <<
             queryObj.jsonString(Strict, 1) << "\n----\n");
            (log() << "\n---- sort BSON\n" <<
             sortObj.jsonString(Strict, 1) << "\n----\n");
            (log() << "\n---- fullName\n" <<
             fullName << "\n----\n");
        }

        // Create the necessary context to use a Cursor, including taking a namespace read lock,
        // see SERVER-6123.
        // Note: this may throw if the sharding version for this connection is out of date.
        Client::ReadContext context(fullName);

        /*
          Create the cursor.

          If we try to create a cursor that includes both the match and the sort, and the two
          are incompatible wrt the available indexes, then we don't get a cursor back. So we
          try to use both first. If that fails, we try again without the sort. If there is no
          sort, we go straight to creating a cursor without one.

          If we are able to incorporate the sort into the cursor, remove it from the head of
          the pipeline.

          LATER - we should be able to find this out before we create the cursor. Either way,
          we can then apply other optimizations there are tickets for, such as SERVER-4507.
        */
        shared_ptr<Cursor> pCursor;
        bool initSort = false;
        if (pSort) {
            const BSONObj queryAndSort = BSON("$query" << queryObj << "$orderby" << sortObj);
            shared_ptr<ParsedQuery> pq(new ParsedQuery(
                fullName.c_str(), 0, 0, QueryOption_NoCursorTimeout, queryAndSort, projection));

            // try to create the cursor with the query and the sort
            shared_ptr<Cursor> pSortedCursor(
                getOptimizedCursor(
                    fullName.c_str(), queryObj, sortObj,
                    QueryPlanSelectionPolicy::any(), pq));

            if (pSortedCursor.get()) {
                // success: remove the sort from the pipeline
                sources.pop_front();

                if (pSort->getLimitSrc()) {
                    // need to reinsert coalesced $limit after removing $sort
                    sources.push_front(pSort->getLimitSrc());
                }

                pCursor = pSortedCursor;
                initSort = true;
            }
        }

        if (!pCursor.get()) {
            shared_ptr<ParsedQuery> pq(new ParsedQuery(
                fullName.c_str(), 0, 0, QueryOption_NoCursorTimeout, queryObj, projection));

            // try to create the cursor without the sort
            shared_ptr<Cursor> pUnsortedCursor(
                getOptimizedCursor(
                    fullName.c_str(), queryObj, BSONObj(),
                    QueryPlanSelectionPolicy::any(), pq));

            pCursor = pUnsortedCursor;
        }

        // Now wrap the Cursor in a ClientCursor.
        ClientCursorHolder cursor(
            new ClientCursor(QueryOption_NoCursorTimeout, pCursor, fullName));
        CursorId cursorId = cursor->cursorid();
        massert(16917, str::stream()
                << "cursor " << cursor->c()->toString()
                << " does its own locking so it can't be used with aggregation",
                cursor->c()->requiresLock());

        // Prepare the cursor for data to change under it when we unlock.
        if (cursor->c()->supportYields()) {
            ClientCursor::YieldData data;
            cursor->prepareToYield(data);
        }
        else {
            massert(16915, str::stream()
                    << "cursor " << cursor->c()->toString()
                    << " supports neither yields nor getMore, one of which"
                    << " must be supported in an aggregation source",
                    cursor->c()->supportGetMore());
            cursor->c()->noteLocation();
        }
        cursor.release(); // it is now owned by the client cursor manager

        // Wrap the cursor with a DocumentSource.
        intrusive_ptr<DocumentSourceCursor> pSource(
            DocumentSourceCursor::create(fullName, cursorId, pExpCtx));

        // Record the query and sort on the source. This makes them available to explain and
        // keeps them alive for the cursor, which refers to them rather than copying them.
        pSource->setQuery(queryObj);
        if (initSort)
            pSource->setSort(sortObj);

        if (haveProjection) {
            pSource->setProjection(projection, dependencies);
        }

        // Let the cursor source absorb as many of the following stages as it can.
        while (!sources.empty() && pSource->coalesce(sources.front())) {
            sources.pop_front();
        }

        // If we are in an explain, we won't actually use the created cursor so release it.
        if (pPipeline->isExplain())
            pSource->dispose();

        pPipeline->addInitialSource(pSource);
    }

} // namespace mongo