/** * Copyright (C) 2015 MongoDB Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License, version 3, * as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . * * As a special exception, the copyright holders give permission to link the * code of portions of this program with the OpenSSL library under certain * conditions as described in each individual source file and distribute * linked combinations including the program with the OpenSSL library. You * must comply with the GNU Affero General Public License in all respects for * all of the code used other than as permitted herein. If you modify file(s) * with this exception, you may extend this exception to your version of the * file(s), but you are not obligated to do so. If you do not wish to do so, * delete this exception statement from your version. If you delete this * exception statement from all source files in the program, then also delete * it in the license file. */ #define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kQuery #include "mongo/platform/basic.h" #include #include #include "mongo/base/disallow_copying.h" #include "mongo/db/auth/authorization_session.h" #include "mongo/db/catalog/collection.h" #include "mongo/db/client.h" #include "mongo/db/clientcursor.h" #include "mongo/db/commands.h" #include "mongo/db/curop.h" #include "mongo/db/curop_failpoint_helpers.h" #include "mongo/db/cursor_manager.h" #include "mongo/db/db_raii.h" #include "mongo/db/exec/working_set_common.h" #include "mongo/db/query/cursor_response.h" #include "mongo/db/query/find.h" #include "mongo/db/query/find_common.h" #include "mongo/db/query/getmore_request.h" #include "mongo/db/query/plan_executor.h" #include "mongo/db/query/plan_summary_stats.h" #include "mongo/db/repl/oplog.h" #include "mongo/db/repl/replication_coordinator.h" #include "mongo/db/service_context.h" #include "mongo/db/stats/counters.h" #include "mongo/db/stats/top.h" #include "mongo/s/chunk_version.h" #include "mongo/stdx/memory.h" #include "mongo/util/fail_point_service.h" #include "mongo/util/log.h" #include "mongo/util/scopeguard.h" #include "mongo/util/time_support.h" namespace mongo { namespace { MONGO_FP_DECLARE(rsStopGetMoreCmd); MONGO_FP_DECLARE(waitWithPinnedCursorDuringGetMoreBatch); /** * Validates that the lsid of 'opCtx' matches that of 'cursor'. This must be called after * authenticating, so that it is safe to report the lsid of 'cursor'. */ void validateLSID(OperationContext* opCtx, const GetMoreRequest& request, ClientCursor* cursor) { uassert(50736, str::stream() << "Cannot run getMore on cursor " << request.cursorid << ", which was not created in a session, in session " << *opCtx->getLogicalSessionId(), !opCtx->getLogicalSessionId() || cursor->getSessionId()); uassert(50737, str::stream() << "Cannot run getMore on cursor " << request.cursorid << ", which was created in session " << *cursor->getSessionId() << ", without an lsid", opCtx->getLogicalSessionId() || !cursor->getSessionId()); uassert(50738, str::stream() << "Cannot run getMore on cursor " << request.cursorid << ", which was created in session " << *cursor->getSessionId() << ", in session " << *opCtx->getLogicalSessionId(), !opCtx->getLogicalSessionId() || !cursor->getSessionId() || (*opCtx->getLogicalSessionId() == *cursor->getSessionId())); } /** * Validates that the txnNumber of 'opCtx' matches that of 'cursor'. This must be called after * authenticating, so that it is safe to report the txnNumber of 'cursor'. */ void validateTxnNumber(OperationContext* opCtx, const GetMoreRequest& request, ClientCursor* cursor) { uassert(50739, str::stream() << "Cannot run getMore on cursor " << request.cursorid << ", which was not created in a transaction, in transaction " << *opCtx->getTxnNumber(), !opCtx->getTxnNumber() || cursor->getTxnNumber()); uassert(50740, str::stream() << "Cannot run getMore on cursor " << request.cursorid << ", which was created in transaction " << *cursor->getTxnNumber() << ", without a txnNumber", opCtx->getTxnNumber() || !cursor->getTxnNumber()); uassert(50741, str::stream() << "Cannot run getMore on cursor " << request.cursorid << ", which was created in transaction " << *cursor->getTxnNumber() << ", in transaction " << *opCtx->getTxnNumber(), !opCtx->getTxnNumber() || !cursor->getTxnNumber() || (*opCtx->getTxnNumber() == *cursor->getTxnNumber())); } /** * A command for running getMore() against an existing cursor registered with a CursorManager. * Used to generate the next batch of results for a ClientCursor. * * Can be used in combination with any cursor-generating command (e.g. find, aggregate, * listIndexes). */ class GetMoreCmd : public BasicCommand { MONGO_DISALLOW_COPYING(GetMoreCmd); public: GetMoreCmd() : BasicCommand("getMore") {} virtual bool supportsWriteConcern(const BSONObj& cmd) const override { return false; } virtual bool allowsAfterClusterTime(const BSONObj& cmdObj) const override { return false; } AllowedOnSecondary secondaryAllowed(ServiceContext*) const override { return AllowedOnSecondary::kAlways; } bool maintenanceOk() const override { return false; } bool adminOnly() const override { return false; } bool supportsReadConcern(const std::string& dbName, const BSONObj& cmdObj, repl::ReadConcernLevel level) const final { // Uses the readConcern setting from whatever created the cursor. return level == repl::ReadConcernLevel::kLocalReadConcern; } ReadWriteType getReadWriteType() const { return ReadWriteType::kRead; } std::string help() const override { return "retrieve more results from an existing cursor"; } LogicalOp getLogicalOp() const override { return LogicalOp::opGetMore; } std::size_t reserveBytesForReply() const override { // The extra 1K is an artifact of how we construct batches. We consider a batch to be full // when it exceeds the goal batch size. In the case that we are just below the limit and // then read a large document, the extra 1K helps prevent a final realloc+memcpy. return FindCommon::kMaxBytesToReturnToClientAtOnce + 1024u; } /** * A getMore command increments the getMore counter, not the command counter. */ bool shouldAffectCommandCounter() const override { return false; } std::string parseNs(const std::string& dbname, const BSONObj& cmdObj) const override { return GetMoreRequest::parseNs(dbname, cmdObj).ns(); } Status checkAuthForCommand(Client* client, const std::string& dbname, const BSONObj& cmdObj) const override { StatusWith parseStatus = GetMoreRequest::parseFromBSON(dbname, cmdObj); if (!parseStatus.isOK()) { return parseStatus.getStatus(); } const GetMoreRequest& request = parseStatus.getValue(); return AuthorizationSession::get(client)->checkAuthForGetMore( request.nss, request.cursorid, request.term.is_initialized()); } bool runParsed(OperationContext* opCtx, const NamespaceString& origNss, const GetMoreRequest& request, const BSONObj& cmdObj, BSONObjBuilder& result) { // If we return early without freeing the cursor, indicate we have a stashed cursor, so that // transaction state is stashed. ScopeGuard stashedCursorIndicator = MakeGuard(&OperationContext::setStashedCursor, opCtx); auto curOp = CurOp::get(opCtx); curOp->debug().cursorid = request.cursorid; // Validate term before acquiring locks, if provided. if (request.term) { auto replCoord = repl::ReplicationCoordinator::get(opCtx); Status status = replCoord->updateTerm(opCtx, *request.term); // Note: updateTerm returns ok if term stayed the same. if (!status.isOK()) { return CommandHelpers::appendCommandStatus(result, status); } } // Cursors come in one of two flavors: // - Cursors owned by the collection cursor manager, such as those generated via the find // command. For these cursors, we hold the appropriate collection lock for the duration of // the getMore using AutoGetCollectionForRead. // - Cursors owned by the global cursor manager, such as those generated via the aggregate // command. These cursors either hold no collection state or manage their collection state // internally, so we acquire no locks. // // While we only need to acquire locks in the case of a cursor which is *not* globally // owned, we need to create an AutoStatsTracker in either case. This is responsible for // updating statistics in CurOp and Top. We avoid using AutoGetCollectionForReadCommand // because we may need to drop and reacquire locks when the cursor is awaitData, but we // don't want to update the stats twice. // // Note that we acquire our locks before our ClientCursorPin, in order to ensure that // the pin's destructor is called before the lock's destructor (if there is one) so that the // cursor cleanup can occur under the lock. boost::optional readLock; boost::optional statsTracker; CursorManager* cursorManager; if (CursorManager::isGloballyManagedCursor(request.cursorid)) { cursorManager = CursorManager::getGlobalCursorManager(); if (boost::optional nssForCurOp = request.nss.isGloballyManagedNamespace() ? request.nss.getTargetNSForGloballyManagedNamespace() : request.nss) { const boost::optional dbProfilingLevel = boost::none; statsTracker.emplace( opCtx, *nssForCurOp, Top::LockType::NotLocked, dbProfilingLevel); } } else { readLock.emplace(opCtx, request.nss); const int doNotChangeProfilingLevel = 0; statsTracker.emplace(opCtx, request.nss, Top::LockType::ReadLocked, readLock->getDb() ? readLock->getDb()->getProfilingLevel() : doNotChangeProfilingLevel); Collection* collection = readLock->getCollection(); if (!collection) { return CommandHelpers::appendCommandStatus( result, Status(ErrorCodes::OperationFailed, "collection dropped between getMore calls")); } cursorManager = collection->getCursorManager(); } auto ccPin = cursorManager->pinCursor(opCtx, request.cursorid); if (!ccPin.isOK()) { return CommandHelpers::appendCommandStatus(result, ccPin.getStatus()); } ClientCursor* cursor = ccPin.getValue().getCursor(); // Only used by the failpoints. const auto dropAndReaquireReadLock = [&readLock, opCtx, &request]() { readLock.reset(); readLock.emplace(opCtx, request.nss); }; // If the 'waitAfterPinningCursorBeforeGetMoreBatch' fail point is enabled, set the 'msg' // field of this operation's CurOp to signal that we've hit this point and then repeatedly // release and re-acquire the collection readLock at regular intervals until the failpoint // is released. This is done in order to avoid deadlocks caused by the pinned-cursor // failpoints in this file (see SERVER-21997). if (MONGO_FAIL_POINT(waitAfterPinningCursorBeforeGetMoreBatch)) { CurOpFailpointHelpers::waitWhileFailPointEnabled( &waitAfterPinningCursorBeforeGetMoreBatch, opCtx, "waitAfterPinningCursorBeforeGetMoreBatch", dropAndReaquireReadLock); } // A user can only call getMore on their own cursor. If there were multiple users // authenticated when the cursor was created, then at least one of them must be // authenticated in order to run getMore on the cursor. if (!AuthorizationSession::get(opCtx->getClient()) ->isCoauthorizedWith(cursor->getAuthenticatedUsers())) { return CommandHelpers::appendCommandStatus( result, Status(ErrorCodes::Unauthorized, str::stream() << "cursor id " << request.cursorid << " was not created by the authenticated user")); } if (request.nss != cursor->nss()) { return CommandHelpers::appendCommandStatus( result, Status(ErrorCodes::Unauthorized, str::stream() << "Requested getMore on namespace '" << request.nss.ns() << "', but cursor belongs to a different namespace " << cursor->nss().ns())); } // Ensure the lsid and txnNumber of the getMore match that of the originating command. validateLSID(opCtx, request, cursor); validateTxnNumber(opCtx, request, cursor); if (request.nss.isOplog() && MONGO_FAIL_POINT(rsStopGetMoreCmd)) { return CommandHelpers::appendCommandStatus( result, Status(ErrorCodes::CommandFailed, str::stream() << "getMore on " << request.nss.ns() << " rejected due to active fail point rsStopGetMoreCmd")); } // Validation related to awaitData. if (cursor->isAwaitData()) { invariant(cursor->isTailable()); } if (request.awaitDataTimeout && !cursor->isAwaitData()) { Status status(ErrorCodes::BadValue, "cannot set maxTimeMS on getMore command for a non-awaitData cursor"); return CommandHelpers::appendCommandStatus(result, status); } // On early return, get rid of the cursor. We should no longer indicate we have a stashed // cursor on early return. ScopeGuard cursorFreer = MakeGuard(&ClientCursorPin::deleteUnderlying, &ccPin.getValue()); stashedCursorIndicator.Dismiss(); const auto replicationMode = repl::ReplicationCoordinator::get(opCtx)->getReplicationMode(); opCtx->recoveryUnit()->setReadConcernLevelAndReplicationMode(cursor->getReadConcernLevel(), replicationMode); // TODO SERVER-33698: Remove kSnapshotReadConcern clause once we can guarantee that a // readConcern level snapshot getMore will have an established point-in-time WiredTiger // snapshot. if (replicationMode == repl::ReplicationCoordinator::modeReplSet && (cursor->getReadConcernLevel() == repl::ReadConcernLevel::kMajorityReadConcern || cursor->getReadConcernLevel() == repl::ReadConcernLevel::kSnapshotReadConcern)) { uassertStatusOK(opCtx->recoveryUnit()->obtainMajorityCommittedSnapshot()); } const bool disableAwaitDataFailpointActive = MONGO_FAIL_POINT(disableAwaitDataForGetMoreCmd); // We assume that cursors created through a DBDirectClient are always used from their // original OperationContext, so we do not need to move time to and from the cursor. if (!opCtx->getClient()->isInDirectClient()) { // There is no time limit set directly on this getMore command. If the cursor is // awaitData, then we supply a default time of one second. Otherwise we roll over // any leftover time from the maxTimeMS of the operation that spawned this cursor, // applying it to this getMore. if (cursor->isAwaitData() && !disableAwaitDataFailpointActive) { awaitDataState(opCtx).waitForInsertsDeadline = opCtx->getServiceContext()->getPreciseClockSource()->now() + request.awaitDataTimeout.value_or(Seconds{1}); } else if (cursor->getLeftoverMaxTimeMicros() < Microseconds::max()) { opCtx->setDeadlineAfterNowBy(cursor->getLeftoverMaxTimeMicros()); } } if (!cursor->isAwaitData()) { opCtx->checkForInterrupt(); // May trigger maxTimeAlwaysTimeOut fail point. } PlanExecutor* exec = cursor->getExecutor(); exec->reattachToOperationContext(opCtx); uassertStatusOK(exec->restoreState()); auto planSummary = Explain::getPlanSummary(exec); { stdx::lock_guard lk(*opCtx->getClient()); curOp->setPlanSummary_inlock(planSummary); // Ensure that the original query or command object is available in the slow query log, // profiler and currentOp. auto originatingCommand = cursor->getOriginatingCommandObj(); if (!originatingCommand.isEmpty()) { curOp->setOriginatingCommand_inlock(originatingCommand); } } CursorId respondWithId = 0; CursorResponseBuilder nextBatch(/*isInitialResponse*/ false, &result); BSONObj obj; PlanExecutor::ExecState state = PlanExecutor::ADVANCED; long long numResults = 0; // We report keysExamined and docsExamined to OpDebug for a given getMore operation. To // obtain these values we need to take a diff of the pre-execution and post-execution // metrics, as they accumulate over the course of a cursor's lifetime. PlanSummaryStats preExecutionStats; Explain::getSummaryStats(*exec, &preExecutionStats); // Mark this as an AwaitData operation if appropriate. if (cursor->isAwaitData() && !disableAwaitDataFailpointActive) { if (request.lastKnownCommittedOpTime) clientsLastKnownCommittedOpTime(opCtx) = request.lastKnownCommittedOpTime.get(); awaitDataState(opCtx).shouldWaitForInserts = true; } // We're about to begin running the PlanExecutor in order to fill the getMore batch. If the // 'waitWithPinnedCursorDuringGetMoreBatch' failpoint is active, set the 'msg' field of this // operation's CurOp to signal that we've hit this point and then spin until the failpoint // is released. if (MONGO_FAIL_POINT(waitWithPinnedCursorDuringGetMoreBatch)) { CurOpFailpointHelpers::waitWhileFailPointEnabled( &waitWithPinnedCursorDuringGetMoreBatch, opCtx, "waitWithPinnedCursorDuringGetMoreBatch", dropAndReaquireReadLock); } Status batchStatus = generateBatch(opCtx, cursor, request, &nextBatch, &state, &numResults); if (!batchStatus.isOK()) { return CommandHelpers::appendCommandStatus(result, batchStatus); } PlanSummaryStats postExecutionStats; Explain::getSummaryStats(*exec, &postExecutionStats); postExecutionStats.totalKeysExamined -= preExecutionStats.totalKeysExamined; postExecutionStats.totalDocsExamined -= preExecutionStats.totalDocsExamined; curOp->debug().setPlanSummaryMetrics(postExecutionStats); // We do not report 'execStats' for aggregation or other globally managed cursors, both in // the original request and subsequent getMore. It would be useful to have this information // for an aggregation, but the source PlanExecutor could be destroyed before we know whether // we need execStats and we do not want to generate for all operations due to cost. if (!CursorManager::isGloballyManagedCursor(request.cursorid) && curOp->shouldDBProfile()) { BSONObjBuilder execStatsBob; Explain::getWinningPlanStats(exec, &execStatsBob); curOp->debug().execStats = execStatsBob.obj(); } if (shouldSaveCursorGetMore(state, exec, cursor->isTailable())) { respondWithId = request.cursorid; exec->saveState(); exec->detachFromOperationContext(); cursor->setLeftoverMaxTimeMicros(opCtx->getRemainingMaxTimeMicros()); cursor->incPos(numResults); opCtx->setStashedCursor(); } else { curOp->debug().cursorExhausted = true; } nextBatch.done(respondWithId, request.nss.ns()); // Ensure log and profiler include the number of results returned in this getMore's response // batch. curOp->debug().nreturned = numResults; if (respondWithId) { cursorFreer.Dismiss(); } // We're about to unpin the cursor as the ClientCursorPin goes out of scope (or delete it, // if it has been exhausted). If the 'waitBeforeUnpinningOrDeletingCursorAfterGetMoreBatch' // failpoint is active, set the 'msg' field of this operation's CurOp to signal that we've // hit this point and then spin until the failpoint is released. if (MONGO_FAIL_POINT(waitBeforeUnpinningOrDeletingCursorAfterGetMoreBatch)) { CurOpFailpointHelpers::waitWhileFailPointEnabled( &waitBeforeUnpinningOrDeletingCursorAfterGetMoreBatch, opCtx, "waitBeforeUnpinningOrDeletingCursorAfterGetMoreBatch", dropAndReaquireReadLock); } return true; } bool run(OperationContext* opCtx, const std::string& dbname, const BSONObj& cmdObj, BSONObjBuilder& result) override { // Counted as a getMore, not as a command. globalOpCounters.gotGetMore(); StatusWith parsedRequest = GetMoreRequest::parseFromBSON(dbname, cmdObj); if (!parsedRequest.isOK()) { return CommandHelpers::appendCommandStatus(result, parsedRequest.getStatus()); } auto request = parsedRequest.getValue(); return runParsed(opCtx, request.nss, request, cmdObj, result); } /** * Uses 'cursor' and 'request' to fill out 'nextBatch' with the batch of result documents to * be returned by this getMore. * * Returns the number of documents in the batch in *numResults, which must be initialized to * zero by the caller. Returns the final ExecState returned by the cursor in *state. * * Returns an OK status if the batch was successfully generated, and a non-OK status if the * PlanExecutor encounters a failure. */ Status generateBatch(OperationContext* opCtx, ClientCursor* cursor, const GetMoreRequest& request, CursorResponseBuilder* nextBatch, PlanExecutor::ExecState* state, long long* numResults) { PlanExecutor* exec = cursor->getExecutor(); // If an awaitData getMore is killed during this process due to our max time expiring at // an interrupt point, we just continue as normal and return rather than reporting a // timeout to the user. BSONObj obj; try { while (!FindCommon::enoughForGetMore(request.batchSize.value_or(0), *numResults) && PlanExecutor::ADVANCED == (*state = exec->getNext(&obj, NULL))) { // If adding this object will cause us to exceed the message size limit, then we // stash it for later. if (!FindCommon::haveSpaceForNext(obj, *numResults, nextBatch->bytesUsed())) { exec->enqueue(obj); break; } // As soon as we get a result, this operation no longer waits. awaitDataState(opCtx).shouldWaitForInserts = false; // Add result to output buffer. nextBatch->setLatestOplogTimestamp(exec->getLatestOplogTimestamp()); nextBatch->append(obj); (*numResults)++; } } catch (const ExceptionFor&) { // FAILURE state will make getMore command close the cursor even if it's tailable. *state = PlanExecutor::FAILURE; return Status::OK(); } switch (*state) { case PlanExecutor::FAILURE: // Log an error message and then perform the same cleanup as DEAD. error() << "GetMore command executor error: " << PlanExecutor::statestr(*state) << ", stats: " << redact(Explain::getWinningPlanStats(exec)); case PlanExecutor::DEAD: { nextBatch->abandon(); // We should always have a valid status member object at this point. auto status = WorkingSetCommon::getMemberObjectStatus(obj); invariant(!status.isOK()); return status; } case PlanExecutor::IS_EOF: // This causes the reported latest oplog timestamp to advance even when there are // no results for this particular query. nextBatch->setLatestOplogTimestamp(exec->getLatestOplogTimestamp()); default: return Status::OK(); } MONGO_UNREACHABLE; } } getMoreCmd; } // namespace } // namespace mongo