diff options
Diffstat (limited to 'src/mongo/db/commands/find_cmd.cpp')
-rw-r--r-- | src/mongo/db/commands/find_cmd.cpp | 541 |
1 files changed, 270 insertions, 271 deletions
diff --git a/src/mongo/db/commands/find_cmd.cpp b/src/mongo/db/commands/find_cmd.cpp index 0bc9589bbef..d40ff0f766e 100644 --- a/src/mongo/db/commands/find_cmd.cpp +++ b/src/mongo/db/commands/find_cmd.cpp @@ -55,313 +55,312 @@ namespace mongo { - /** - * A command for running .find() queries. - */ - class FindCmd : public Command { - MONGO_DISALLOW_COPYING(FindCmd); - public: - FindCmd() : Command("find") { } +/** + * A command for running .find() queries. + */ +class FindCmd : public Command { + MONGO_DISALLOW_COPYING(FindCmd); - bool isWriteCommandForConfigServer() const override { return false; } +public: + FindCmd() : Command("find") {} - bool slaveOk() const override { return false; } + bool isWriteCommandForConfigServer() const override { + return false; + } - bool slaveOverrideOk() const override { return true; } + bool slaveOk() const override { + return false; + } - bool maintenanceOk() const override { return false; } + bool slaveOverrideOk() const override { + return true; + } - bool adminOnly() const override { return false; } + bool maintenanceOk() const override { + return false; + } - void help(std::stringstream& help) const override { - help << "query for documents"; - } + bool adminOnly() const override { + return false; + } - /** - * A find command does not increment the command counter, but rather increments the - * query counter. - */ - bool shouldAffectCommandCounter() const override { return false; } + void help(std::stringstream& help) const override { + help << "query for documents"; + } - Status checkAuthForCommand(ClientBasic* client, - const std::string& dbname, - const BSONObj& cmdObj) override { - AuthorizationSession* authzSession = AuthorizationSession::get(client); - ResourcePattern pattern = parseResourcePattern(dbname, cmdObj); + /** + * A find command does not increment the command counter, but rather increments the + * query counter. + */ + bool shouldAffectCommandCounter() const override { + return false; + } - if (authzSession->isAuthorizedForActionsOnResource(pattern, ActionType::find)) { - return Status::OK(); - } + Status checkAuthForCommand(ClientBasic* client, + const std::string& dbname, + const BSONObj& cmdObj) override { + AuthorizationSession* authzSession = AuthorizationSession::get(client); + ResourcePattern pattern = parseResourcePattern(dbname, cmdObj); - return Status(ErrorCodes::Unauthorized, "unauthorized"); + if (authzSession->isAuthorizedForActionsOnResource(pattern, ActionType::find)) { + return Status::OK(); } - Status explain(OperationContext* txn, - const std::string& dbname, - const BSONObj& cmdObj, - ExplainCommon::Verbosity verbosity, - BSONObjBuilder* out) const override { - const std::string fullns = parseNs(dbname, cmdObj); - const NamespaceString nss(fullns); - if (!nss.isValid()) { - return {ErrorCodes::InvalidNamespace, - str::stream() << "Invalid collection name: " << nss.ns()}; - } + return Status(ErrorCodes::Unauthorized, "unauthorized"); + } + + Status explain(OperationContext* txn, + const std::string& dbname, + const BSONObj& cmdObj, + ExplainCommon::Verbosity verbosity, + BSONObjBuilder* out) const override { + const std::string fullns = parseNs(dbname, cmdObj); + const NamespaceString nss(fullns); + if (!nss.isValid()) { + return {ErrorCodes::InvalidNamespace, + str::stream() << "Invalid collection name: " << nss.ns()}; + } - // Parse the command BSON to a LiteParsedQuery. - const bool isExplain = true; - auto lpqStatus = LiteParsedQuery::makeFromFindCommand(nss, cmdObj, isExplain); - if (!lpqStatus.isOK()) { - return lpqStatus.getStatus(); - } + // Parse the command BSON to a LiteParsedQuery. + const bool isExplain = true; + auto lpqStatus = LiteParsedQuery::makeFromFindCommand(nss, cmdObj, isExplain); + if (!lpqStatus.isOK()) { + return lpqStatus.getStatus(); + } - // Finish the parsing step by using the LiteParsedQuery to create a CanonicalQuery. - std::unique_ptr<CanonicalQuery> cq; - { - CanonicalQuery* rawCq; - WhereCallbackReal whereCallback(txn, nss.db()); - Status canonStatus = CanonicalQuery::canonicalize(lpqStatus.getValue().release(), - &rawCq, - whereCallback); - if (!canonStatus.isOK()) { - return canonStatus; - } - cq.reset(rawCq); + // Finish the parsing step by using the LiteParsedQuery to create a CanonicalQuery. + std::unique_ptr<CanonicalQuery> cq; + { + CanonicalQuery* rawCq; + WhereCallbackReal whereCallback(txn, nss.db()); + Status canonStatus = + CanonicalQuery::canonicalize(lpqStatus.getValue().release(), &rawCq, whereCallback); + if (!canonStatus.isOK()) { + return canonStatus; } + cq.reset(rawCq); + } - AutoGetCollectionForRead ctx(txn, nss); - // The collection may be NULL. If so, getExecutor() should handle it by returning - // an execution tree with an EOFStage. - Collection* collection = ctx.getCollection(); - - // We have a parsed query. Time to get the execution plan for it. - std::unique_ptr<PlanExecutor> exec; - { - PlanExecutor* rawExec; - Status execStatus = getExecutorFind(txn, - collection, - nss, - cq.release(), - PlanExecutor::YIELD_AUTO, - &rawExec); - if (!execStatus.isOK()) { - return execStatus; - } - exec.reset(rawExec); + AutoGetCollectionForRead ctx(txn, nss); + // The collection may be NULL. If so, getExecutor() should handle it by returning + // an execution tree with an EOFStage. + Collection* collection = ctx.getCollection(); + + // We have a parsed query. Time to get the execution plan for it. + std::unique_ptr<PlanExecutor> exec; + { + PlanExecutor* rawExec; + Status execStatus = getExecutorFind( + txn, collection, nss, cq.release(), PlanExecutor::YIELD_AUTO, &rawExec); + if (!execStatus.isOK()) { + return execStatus; } + exec.reset(rawExec); + } - // Got the execution tree. Explain it. - Explain::explainStages(exec.get(), verbosity, out); - return Status::OK(); + // Got the execution tree. Explain it. + Explain::explainStages(exec.get(), verbosity, out); + return Status::OK(); + } + + /** + * Runs a query using the following steps: + * 1) Parsing. + * 2) Acquire locks. + * 3) Plan query, obtaining an executor that can run it. + * 4) Setup a cursor for the query, which may be used on subsequent getMores. + * 5) Generate the first batch. + * 6) Save state for getMore. + * 7) Generate response to send to the client. + * + * TODO: Rather than using the sharding version available in thread-local storage + * (i.e. call to shardingState.needCollectionMetadata() below), shard version + * information should be passed as part of the command parameter. + */ + bool run(OperationContext* txn, + const std::string& dbname, + BSONObj& cmdObj, + int options, + std::string& errmsg, + BSONObjBuilder& result) override { + const std::string fullns = parseNs(dbname, cmdObj); + const NamespaceString nss(fullns); + if (!nss.isValid()) { + return appendCommandStatus(result, + {ErrorCodes::InvalidNamespace, + str::stream() << "Invalid collection name: " << nss.ns()}); } - /** - * Runs a query using the following steps: - * 1) Parsing. - * 2) Acquire locks. - * 3) Plan query, obtaining an executor that can run it. - * 4) Setup a cursor for the query, which may be used on subsequent getMores. - * 5) Generate the first batch. - * 6) Save state for getMore. - * 7) Generate response to send to the client. - * - * TODO: Rather than using the sharding version available in thread-local storage - * (i.e. call to shardingState.needCollectionMetadata() below), shard version - * information should be passed as part of the command parameter. - */ - bool run(OperationContext* txn, - const std::string& dbname, - BSONObj& cmdObj, - int options, - std::string& errmsg, - BSONObjBuilder& result) override { - const std::string fullns = parseNs(dbname, cmdObj); - const NamespaceString nss(fullns); - if (!nss.isValid()) { - return appendCommandStatus(result, {ErrorCodes::InvalidNamespace, - str::stream() << "Invalid collection name: " - << nss.ns()}); - } + // Although it is a command, a find command gets counted as a query. + globalOpCounters.gotQuery(); - // Although it is a command, a find command gets counted as a query. - globalOpCounters.gotQuery(); + if (txn->getClient()->isInDirectClient()) { + return appendCommandStatus( + result, + Status(ErrorCodes::IllegalOperation, "Cannot run find command from eval()")); + } - if (txn->getClient()->isInDirectClient()) { - return appendCommandStatus(result, - Status(ErrorCodes::IllegalOperation, - "Cannot run find command from eval()")); - } + // 1a) Parse the command BSON to a LiteParsedQuery. + const bool isExplain = false; + auto lpqStatus = LiteParsedQuery::makeFromFindCommand(nss, cmdObj, isExplain); + if (!lpqStatus.isOK()) { + return appendCommandStatus(result, lpqStatus.getStatus()); + } - // 1a) Parse the command BSON to a LiteParsedQuery. - const bool isExplain = false; - auto lpqStatus = LiteParsedQuery::makeFromFindCommand(nss, cmdObj, isExplain); - if (!lpqStatus.isOK()) { - return appendCommandStatus(result, lpqStatus.getStatus()); - } + auto& lpq = lpqStatus.getValue(); - auto& lpq = lpqStatus.getValue(); - - // Fill out curop information. - int ntoreturn = lpq->getBatchSize().value_or(0); - beginQueryOp(txn, nss, cmdObj, ntoreturn, lpq->getSkip()); - - // 1b) Finish the parsing step by using the LiteParsedQuery to create a CanonicalQuery. - std::unique_ptr<CanonicalQuery> cq; - { - CanonicalQuery* rawCq; - WhereCallbackReal whereCallback(txn, nss.db()); - Status canonStatus = CanonicalQuery::canonicalize(lpq.release(), - &rawCq, - whereCallback); - if (!canonStatus.isOK()) { - return appendCommandStatus(result, canonStatus); - } - cq.reset(rawCq); - } + // Fill out curop information. + int ntoreturn = lpq->getBatchSize().value_or(0); + beginQueryOp(txn, nss, cmdObj, ntoreturn, lpq->getSkip()); - // 2) Acquire locks. - AutoGetCollectionForRead ctx(txn, nss); - Collection* collection = ctx.getCollection(); - - const int dbProfilingLevel = ctx.getDb() ? ctx.getDb()->getProfilingLevel() : - serverGlobalParams.defaultProfile; - - // It is possible that the sharding version will change during yield while we are - // retrieving a plan executor. If this happens we will throw an error and mongos will - // retry. - const ChunkVersion shardingVersionAtStart = shardingState.getVersion(nss.ns()); - - // 3) Get the execution plan for the query. - std::unique_ptr<PlanExecutor> execHolder; - { - PlanExecutor* rawExec; - Status execStatus = getExecutorFind(txn, - collection, - nss, - cq.release(), - PlanExecutor::YIELD_AUTO, - &rawExec); - if (!execStatus.isOK()) { - return appendCommandStatus(result, execStatus); - } - execHolder.reset(rawExec); + // 1b) Finish the parsing step by using the LiteParsedQuery to create a CanonicalQuery. + std::unique_ptr<CanonicalQuery> cq; + { + CanonicalQuery* rawCq; + WhereCallbackReal whereCallback(txn, nss.db()); + Status canonStatus = CanonicalQuery::canonicalize(lpq.release(), &rawCq, whereCallback); + if (!canonStatus.isOK()) { + return appendCommandStatus(result, canonStatus); } + cq.reset(rawCq); + } - // TODO: Currently, chunk ranges are kept around until all ClientCursors created while - // the chunk belonged on this node are gone. Separating chunk lifetime management from - // ClientCursor should allow this check to go away. - if (!shardingState.getVersion(nss.ns()).isWriteCompatibleWith(shardingVersionAtStart)) { - // Version changed while retrieving a PlanExecutor. Terminate the operation, - // signaling that mongos should retry. - throw SendStaleConfigException(nss.ns(), - "version changed during find command", - shardingVersionAtStart, - shardingState.getVersion(nss.ns())); + // 2) Acquire locks. + AutoGetCollectionForRead ctx(txn, nss); + Collection* collection = ctx.getCollection(); + + const int dbProfilingLevel = + ctx.getDb() ? ctx.getDb()->getProfilingLevel() : serverGlobalParams.defaultProfile; + + // It is possible that the sharding version will change during yield while we are + // retrieving a plan executor. If this happens we will throw an error and mongos will + // retry. + const ChunkVersion shardingVersionAtStart = shardingState.getVersion(nss.ns()); + + // 3) Get the execution plan for the query. + std::unique_ptr<PlanExecutor> execHolder; + { + PlanExecutor* rawExec; + Status execStatus = getExecutorFind( + txn, collection, nss, cq.release(), PlanExecutor::YIELD_AUTO, &rawExec); + if (!execStatus.isOK()) { + return appendCommandStatus(result, execStatus); } + execHolder.reset(rawExec); + } - if (!collection) { - // No collection. Just fill out curop indicating that there were zero results and - // there is no ClientCursor id, and then return. - const int numResults = 0; - const CursorId cursorId = 0; - endQueryOp(txn, execHolder.get(), dbProfilingLevel, numResults, cursorId); - appendCursorResponseObject(cursorId, nss.ns(), BSONArray(), &result); - return true; - } + // TODO: Currently, chunk ranges are kept around until all ClientCursors created while + // the chunk belonged on this node are gone. Separating chunk lifetime management from + // ClientCursor should allow this check to go away. + if (!shardingState.getVersion(nss.ns()).isWriteCompatibleWith(shardingVersionAtStart)) { + // Version changed while retrieving a PlanExecutor. Terminate the operation, + // signaling that mongos should retry. + throw SendStaleConfigException(nss.ns(), + "version changed during find command", + shardingVersionAtStart, + shardingState.getVersion(nss.ns())); + } - const LiteParsedQuery& pq = execHolder->getCanonicalQuery()->getParsed(); - - // 4) If possible, register the execution plan inside a ClientCursor, and pin that - // cursor. In this case, ownership of the PlanExecutor is transferred to the - // ClientCursor, and 'exec' becomes null. - // - // First unregister the PlanExecutor so it can be re-registered with ClientCursor. - execHolder->deregisterExec(); - - // Create a ClientCursor containing this plan executor. We don't have to worry - // about leaking it as it's inserted into a global map by its ctor. - ClientCursor* cursor = new ClientCursor(collection->getCursorManager(), - execHolder.release(), - nss.ns(), - pq.getOptions(), - pq.getFilter()); - CursorId cursorId = cursor->cursorid(); - ClientCursorPin ccPin(collection->getCursorManager(), cursorId); - - // On early return, get rid of the the cursor. - ScopeGuard cursorFreer = MakeGuard(&ClientCursorPin::deleteUnderlying, ccPin); - - invariant(!execHolder); - PlanExecutor* exec = cursor->getExecutor(); - - // 5) Stream query results, adding them to a BSONArray as we go. - BSONArrayBuilder firstBatch; - BSONObj obj; - PlanExecutor::ExecState state; - int numResults = 0; - while (!enoughForFirstBatch(pq, numResults, firstBatch.len()) - && PlanExecutor::ADVANCED == (state = exec->getNext(&obj, NULL))) { - // If adding this object will cause us to exceed the BSON size limit, then we stash - // it for later. - if (firstBatch.len() + obj.objsize() > BSONObjMaxUserSize && numResults > 0) { - exec->enqueue(obj); - break; - } - - // Add result to output buffer. - firstBatch.append(obj); - numResults++; - } + if (!collection) { + // No collection. Just fill out curop indicating that there were zero results and + // there is no ClientCursor id, and then return. + const int numResults = 0; + const CursorId cursorId = 0; + endQueryOp(txn, execHolder.get(), dbProfilingLevel, numResults, cursorId); + appendCursorResponseObject(cursorId, nss.ns(), BSONArray(), &result); + return true; + } - // Throw an assertion if query execution fails for any reason. - if (PlanExecutor::FAILURE == state || PlanExecutor::DEAD == state) { - const std::unique_ptr<PlanStageStats> stats(exec->getStats()); - error() << "Plan executor error during find command: " - << PlanExecutor::statestr(state) - << ", stats: " << Explain::statsToBSON(*stats); - - return appendCommandStatus(result, - Status(ErrorCodes::OperationFailed, - str::stream() - << "Executor error during find command: " - << WorkingSetCommon::toStatusString(obj))); + const LiteParsedQuery& pq = execHolder->getCanonicalQuery()->getParsed(); + + // 4) If possible, register the execution plan inside a ClientCursor, and pin that + // cursor. In this case, ownership of the PlanExecutor is transferred to the + // ClientCursor, and 'exec' becomes null. + // + // First unregister the PlanExecutor so it can be re-registered with ClientCursor. + execHolder->deregisterExec(); + + // Create a ClientCursor containing this plan executor. We don't have to worry + // about leaking it as it's inserted into a global map by its ctor. + ClientCursor* cursor = new ClientCursor(collection->getCursorManager(), + execHolder.release(), + nss.ns(), + pq.getOptions(), + pq.getFilter()); + CursorId cursorId = cursor->cursorid(); + ClientCursorPin ccPin(collection->getCursorManager(), cursorId); + + // On early return, get rid of the the cursor. + ScopeGuard cursorFreer = MakeGuard(&ClientCursorPin::deleteUnderlying, ccPin); + + invariant(!execHolder); + PlanExecutor* exec = cursor->getExecutor(); + + // 5) Stream query results, adding them to a BSONArray as we go. + BSONArrayBuilder firstBatch; + BSONObj obj; + PlanExecutor::ExecState state; + int numResults = 0; + while (!enoughForFirstBatch(pq, numResults, firstBatch.len()) && + PlanExecutor::ADVANCED == (state = exec->getNext(&obj, NULL))) { + // If adding this object will cause us to exceed the BSON size limit, then we stash + // it for later. + if (firstBatch.len() + obj.objsize() > BSONObjMaxUserSize && numResults > 0) { + exec->enqueue(obj); + break; } - // 6) Set up the cursor for getMore. - if (shouldSaveCursor(txn, collection, state, exec)) { - // State will be restored on getMore. - exec->saveState(); - - cursor->setLeftoverMaxTimeMicros(CurOp::get(txn)->getRemainingMaxTimeMicros()); - cursor->setPos(numResults); - - // Don't stash the RU for tailable cursors at EOF, let them get a new RU on their - // next getMore. - if (!(pq.isTailable() && state == PlanExecutor::IS_EOF)) { - // We stash away the RecoveryUnit in the ClientCursor. It's used for - // subsequent getMore requests. The calling OpCtx gets a fresh RecoveryUnit. - txn->recoveryUnit()->abandonSnapshot(); - cursor->setOwnedRecoveryUnit(txn->releaseRecoveryUnit()); - StorageEngine* engine = getGlobalServiceContext()->getGlobalStorageEngine(); - txn->setRecoveryUnit(engine->newRecoveryUnit(), - OperationContext::kNotInUnitOfWork); - } - } - else { - cursorId = 0; - } + // Add result to output buffer. + firstBatch.append(obj); + numResults++; + } - // Fill out curop based on the results. - endQueryOp(txn, exec, dbProfilingLevel, numResults, cursorId); + // Throw an assertion if query execution fails for any reason. + if (PlanExecutor::FAILURE == state || PlanExecutor::DEAD == state) { + const std::unique_ptr<PlanStageStats> stats(exec->getStats()); + error() << "Plan executor error during find command: " << PlanExecutor::statestr(state) + << ", stats: " << Explain::statsToBSON(*stats); + + return appendCommandStatus(result, + Status(ErrorCodes::OperationFailed, + str::stream() + << "Executor error during find command: " + << WorkingSetCommon::toStatusString(obj))); + } - // 7) Generate the response object to send to the client. - appendCursorResponseObject(cursorId, nss.ns(), firstBatch.arr(), &result); - if (cursorId) { - cursorFreer.Dismiss(); + // 6) Set up the cursor for getMore. + if (shouldSaveCursor(txn, collection, state, exec)) { + // State will be restored on getMore. + exec->saveState(); + + cursor->setLeftoverMaxTimeMicros(CurOp::get(txn)->getRemainingMaxTimeMicros()); + cursor->setPos(numResults); + + // Don't stash the RU for tailable cursors at EOF, let them get a new RU on their + // next getMore. + if (!(pq.isTailable() && state == PlanExecutor::IS_EOF)) { + // We stash away the RecoveryUnit in the ClientCursor. It's used for + // subsequent getMore requests. The calling OpCtx gets a fresh RecoveryUnit. + txn->recoveryUnit()->abandonSnapshot(); + cursor->setOwnedRecoveryUnit(txn->releaseRecoveryUnit()); + StorageEngine* engine = getGlobalServiceContext()->getGlobalStorageEngine(); + txn->setRecoveryUnit(engine->newRecoveryUnit(), OperationContext::kNotInUnitOfWork); } - return true; + } else { + cursorId = 0; + } + + // Fill out curop based on the results. + endQueryOp(txn, exec, dbProfilingLevel, numResults, cursorId); + + // 7) Generate the response object to send to the client. + appendCursorResponseObject(cursorId, nss.ns(), firstBatch.arr(), &result); + if (cursorId) { + cursorFreer.Dismiss(); } + return true; + } - } findCmd; +} findCmd; -} // namespace mongo +} // namespace mongo |