diff options
author | Anthony Roy <anthony.roy@10gen.com> | 2018-07-10 14:02:52 -0400 |
---|---|---|
committer | Anthony Roy <anthony.roy@10gen.com> | 2018-07-11 13:51:43 -0400 |
commit | 143b979d16031cdcb0d4b6ecc70a11da52822960 (patch) | |
tree | 123047393980c7855bbf9e4b1c1fcf1e84617835 | |
parent | c27e72a4979b1b2ef241d23d2ad1434cdb3ff747 (diff) | |
download | mongo-143b979d16031cdcb0d4b6ecc70a11da52822960.tar.gz |
SERVER-35911 Upgraded GetMoreCmd and ClusterGetMoreCmd to use TypedCommand
Upgraded for access to DocumentSequences and to reduce the number of
GetMoreRequest parses required.
-rw-r--r-- | src/mongo/db/commands/getmore_cmd.cpp | 745 | ||||
-rw-r--r-- | src/mongo/db/query/getmore_request.cpp | 14 | ||||
-rw-r--r-- | src/mongo/db/query/getmore_request.h | 2 | ||||
-rw-r--r-- | src/mongo/s/commands/cluster_getmore_cmd.cpp | 91 |
4 files changed, 421 insertions, 431 deletions
diff --git a/src/mongo/db/commands/getmore_cmd.cpp b/src/mongo/db/commands/getmore_cmd.cpp index 71b7f548baf..4f0d297fc55 100644 --- a/src/mongo/db/commands/getmore_cmd.cpp +++ b/src/mongo/db/commands/getmore_cmd.cpp @@ -136,430 +136,433 @@ void validateTxnNumber(OperationContext* opCtx, * Can be used in combination with any cursor-generating command (e.g. find, aggregate, * listIndexes). */ -class GetMoreCmd : public BasicCommand { - MONGO_DISALLOW_COPYING(GetMoreCmd); - +class GetMoreCmd final : public Command { public: - GetMoreCmd() : BasicCommand("getMore") {} - - - virtual bool supportsWriteConcern(const BSONObj& cmd) const override { - return false; - } - - virtual bool allowsAfterClusterTime(const BSONObj& cmdObj) const override { - return false; - } - - AllowedOnSecondary secondaryAllowed(ServiceContext*) const override { - return AllowedOnSecondary::kAlways; - } - - bool maintenanceOk() const override { - return false; - } + GetMoreCmd() : Command("getMore") {} - bool adminOnly() const override { - return false; - } - - bool supportsReadConcern(const std::string& dbName, - const BSONObj& cmdObj, - repl::ReadConcernLevel level) const final { - // Uses the readConcern setting from whatever created the cursor. 
- return level == repl::ReadConcernLevel::kLocalReadConcern; - } - - ReadWriteType getReadWriteType() const { - return ReadWriteType::kRead; + std::unique_ptr<CommandInvocation> parse(OperationContext* opCtx, + const OpMsgRequest& opMsgRequest) override { + return std::make_unique<Invocation>(this, opMsgRequest); } - std::string help() const override { - return "retrieve more results from an existing cursor"; - } - - LogicalOp getLogicalOp() const override { - return LogicalOp::opGetMore; - } + class Invocation final : public CommandInvocation { + public: + Invocation(Command* cmd, const OpMsgRequest& request) + : CommandInvocation(cmd), + _request(uassertStatusOK( + GetMoreRequest::parseFromBSON(request.getDatabase().toString(), request.body))) {} - std::size_t reserveBytesForReply() const override { - // The extra 1K is an artifact of how we construct batches. We consider a batch to be full - // when it exceeds the goal batch size. In the case that we are just below the limit and - // then read a large document, the extra 1K helps prevent a final realloc+memcpy. - return FindCommon::kMaxBytesToReturnToClientAtOnce + 1024u; - } + private: + bool supportsWriteConcern() const override { + return false; + } - /** - * A getMore command increments the getMore counter, not the command counter. 
- */ - bool shouldAffectCommandCounter() const override { - return false; - } + bool allowsAfterClusterTime() const override { + return false; + } - std::string parseNs(const std::string& dbname, const BSONObj& cmdObj) const override { - return GetMoreRequest::parseNs(dbname, cmdObj).ns(); - } + NamespaceString ns() const override { + return _request.nss; + } - Status checkAuthForCommand(Client* client, - const std::string& dbname, - const BSONObj& cmdObj) const override { - StatusWith<GetMoreRequest> parseStatus = GetMoreRequest::parseFromBSON(dbname, cmdObj); - if (!parseStatus.isOK()) { - return parseStatus.getStatus(); + void doCheckAuthorization(OperationContext* opCtx) const override { + uassertStatusOK(AuthorizationSession::get(opCtx->getClient()) + ->checkAuthForGetMore(_request.nss, + _request.cursorid, + _request.term.is_initialized())); } - const GetMoreRequest& request = parseStatus.getValue(); - return AuthorizationSession::get(client)->checkAuthForGetMore( - request.nss, request.cursorid, request.term.is_initialized()); - } + /** + * Uses 'cursor' and 'request' to fill out 'nextBatch' with the batch of result documents to + * be returned by this getMore. + * + * Returns the number of documents in the batch in *numResults, which must be initialized to + * zero by the caller. Returns the final ExecState returned by the cursor in *state. + * + * Returns an OK status if the batch was successfully generated, and a non-OK status if the + * PlanExecutor encounters a failure. + */ + Status generateBatch(OperationContext* opCtx, + ClientCursor* cursor, + const GetMoreRequest& request, + CursorResponseBuilder* nextBatch, + PlanExecutor::ExecState* state, + long long* numResults) { + PlanExecutor* exec = cursor->getExecutor(); + + // If an awaitData getMore is killed during this process due to our max time expiring at + // an interrupt point, we just continue as normal and return rather than reporting a + // timeout to the user. 
+ BSONObj obj; + try { + while (!FindCommon::enoughForGetMore(request.batchSize.value_or(0), *numResults) && + PlanExecutor::ADVANCED == (*state = exec->getNext(&obj, NULL))) { + // If adding this object will cause us to exceed the message size limit, then we + // stash it for later. + if (!FindCommon::haveSpaceForNext(obj, *numResults, nextBatch->bytesUsed())) { + exec->enqueue(obj); + break; + } + + // As soon as we get a result, this operation no longer waits. + awaitDataState(opCtx).shouldWaitForInserts = false; + // Add result to output buffer. + nextBatch->setLatestOplogTimestamp(exec->getLatestOplogTimestamp()); + nextBatch->append(obj); + (*numResults)++; + } + } catch (const ExceptionFor<ErrorCodes::CloseChangeStream>&) { + // FAILURE state will make getMore command close the cursor even if it's tailable. + *state = PlanExecutor::FAILURE; + return Status::OK(); + } + + switch (*state) { + case PlanExecutor::FAILURE: + // Log an error message and then perform the same cleanup as DEAD. + error() << "GetMore command executor error: " << PlanExecutor::statestr(*state) + << ", stats: " << redact(Explain::getWinningPlanStats(exec)); + case PlanExecutor::DEAD: { + nextBatch->abandon(); + // We should always have a valid status member object at this point. + auto status = WorkingSetCommon::getMemberObjectStatus(obj); + invariant(!status.isOK()); + return status; + } + case PlanExecutor::IS_EOF: + // This causes the reported latest oplog timestamp to advance even when there + // are + // no results for this particular query. + nextBatch->setLatestOplogTimestamp(exec->getLatestOplogTimestamp()); + default: + return Status::OK(); + } - bool runParsed(OperationContext* opCtx, - const NamespaceString& origNss, - const GetMoreRequest& request, - const BSONObj& cmdObj, - BSONObjBuilder& result) { - auto curOp = CurOp::get(opCtx); - curOp->debug().cursorid = request.cursorid; - - // Validate term before acquiring locks, if provided. 
- if (request.term) { - auto replCoord = repl::ReplicationCoordinator::get(opCtx); - Status status = replCoord->updateTerm(opCtx, *request.term); - // Note: updateTerm returns ok if term stayed the same. - uassertStatusOK(status); + MONGO_UNREACHABLE; } - // Cursors come in one of two flavors: - // - Cursors owned by the collection cursor manager, such as those generated via the find - // command. For these cursors, we hold the appropriate collection lock for the duration of - // the getMore using AutoGetCollectionForRead. - // - Cursors owned by the global cursor manager, such as those generated via the aggregate - // command. These cursors either hold no collection state or manage their collection state - // internally, so we acquire no locks. - // - // While we only need to acquire locks in the case of a cursor which is *not* globally - // owned, we need to create an AutoStatsTracker in either case. This is responsible for - // updating statistics in CurOp and Top. We avoid using AutoGetCollectionForReadCommand - // because we may need to drop and reacquire locks when the cursor is awaitData, but we - // don't want to update the stats twice. - // - // Note that we acquire our locks before our ClientCursorPin, in order to ensure that - // the pin's destructor is called before the lock's destructor (if there is one) so that the - // cursor cleanup can occur under the lock. - boost::optional<AutoGetCollectionForRead> readLock; - boost::optional<AutoStatsTracker> statsTracker; - CursorManager* cursorManager; - - if (CursorManager::isGloballyManagedCursor(request.cursorid)) { - cursorManager = CursorManager::getGlobalCursorManager(); - - if (boost::optional<NamespaceString> nssForCurOp = - request.nss.isGloballyManagedNamespace() - ? 
request.nss.getTargetNSForGloballyManagedNamespace() - : request.nss) { - const boost::optional<int> dbProfilingLevel = boost::none; - statsTracker.emplace( - opCtx, *nssForCurOp, Top::LockType::NotLocked, dbProfilingLevel); + void run(OperationContext* opCtx, rpc::ReplyBuilderInterface* reply) override { + // Counted as a getMore, not as a command. + globalOpCounters.gotGetMore(); + auto result = reply->getBodyBuilder(); + auto curOp = CurOp::get(opCtx); + curOp->debug().cursorid = _request.cursorid; + + // Validate term before acquiring locks, if provided. + if (_request.term) { + auto replCoord = repl::ReplicationCoordinator::get(opCtx); + // Note: updateTerm returns ok if term stayed the same. + uassertStatusOK(replCoord->updateTerm(opCtx, *_request.term)); } - } else { - readLock.emplace(opCtx, request.nss); - const int doNotChangeProfilingLevel = 0; - statsTracker.emplace(opCtx, - request.nss, - Top::LockType::ReadLocked, - readLock->getDb() ? readLock->getDb()->getProfilingLevel() - : doNotChangeProfilingLevel); - - Collection* collection = readLock->getCollection(); - if (!collection) { - uasserted(ErrorCodes::OperationFailed, "collection dropped between getMore calls"); + + // Cursors come in one of two flavors: + // - Cursors owned by the collection cursor manager, such as those generated via the + // find + // command. For these cursors, we hold the appropriate collection lock for the + // duration of + // the getMore using AutoGetCollectionForRead. + // - Cursors owned by the global cursor manager, such as those generated via the + // aggregate + // command. These cursors either hold no collection state or manage their collection + // state + // internally, so we acquire no locks. + // + // While we only need to acquire locks in the case of a cursor which is *not* globally + // owned, we need to create an AutoStatsTracker in either case. This is responsible for + // updating statistics in CurOp and Top. 
We avoid using AutoGetCollectionForReadCommand + // because we may need to drop and reacquire locks when the cursor is awaitData, but we + // don't want to update the stats twice. + // + // Note that we acquire our locks before our ClientCursorPin, in order to ensure that + // the pin's destructor is called before the lock's destructor (if there is one) so that + // the + // cursor cleanup can occur under the lock. + boost::optional<AutoGetCollectionForRead> readLock; + boost::optional<AutoStatsTracker> statsTracker; + CursorManager* cursorManager; + + if (CursorManager::isGloballyManagedCursor(_request.cursorid)) { + cursorManager = CursorManager::getGlobalCursorManager(); + + if (boost::optional<NamespaceString> nssForCurOp = + _request.nss.isGloballyManagedNamespace() + ? _request.nss.getTargetNSForGloballyManagedNamespace() + : _request.nss) { + const boost::optional<int> dbProfilingLevel = boost::none; + statsTracker.emplace( + opCtx, *nssForCurOp, Top::LockType::NotLocked, dbProfilingLevel); + } + } else { + readLock.emplace(opCtx, _request.nss); + const int doNotChangeProfilingLevel = 0; + statsTracker.emplace(opCtx, + _request.nss, + Top::LockType::ReadLocked, + readLock->getDb() ? readLock->getDb()->getProfilingLevel() + : doNotChangeProfilingLevel); + + Collection* collection = readLock->getCollection(); + if (!collection) { + uasserted(ErrorCodes::OperationFailed, + "collection dropped between getMore calls"); + } + cursorManager = collection->getCursorManager(); } - cursorManager = collection->getCursorManager(); - } - auto ccPin = cursorManager->pinCursor(opCtx, request.cursorid); - uassertStatusOK(ccPin.getStatus()); - - ClientCursor* cursor = ccPin.getValue().getCursor(); - - // Only used by the failpoints. - const auto dropAndReaquireReadLock = [&readLock, opCtx, &request]() { - // Make sure an interrupted operation does not prevent us from reacquiring the lock. 
- UninterruptibleLockGuard noInterrupt(opCtx->lockState()); - - readLock.reset(); - readLock.emplace(opCtx, request.nss); - }; - - // If the 'waitAfterPinningCursorBeforeGetMoreBatch' fail point is enabled, set the 'msg' - // field of this operation's CurOp to signal that we've hit this point and then repeatedly - // release and re-acquire the collection readLock at regular intervals until the failpoint - // is released. This is done in order to avoid deadlocks caused by the pinned-cursor - // failpoints in this file (see SERVER-21997). - if (MONGO_FAIL_POINT(waitAfterPinningCursorBeforeGetMoreBatch)) { - CurOpFailpointHelpers::waitWhileFailPointEnabled( - &waitAfterPinningCursorBeforeGetMoreBatch, - opCtx, - "waitAfterPinningCursorBeforeGetMoreBatch", - dropAndReaquireReadLock); - } + auto ccPin = uassertStatusOK(cursorManager->pinCursor(opCtx, _request.cursorid)); + ClientCursor* cursor = ccPin.getCursor(); + + // Only used by the failpoints. + const auto dropAndReaquireReadLock = [&readLock, opCtx, this]() { + // Make sure an interrupted operation does not prevent us from reacquiring the lock. + UninterruptibleLockGuard noInterrupt(opCtx->lockState()); + + readLock.reset(); + readLock.emplace(opCtx, _request.nss); + }; + + // If the 'waitAfterPinningCursorBeforeGetMoreBatch' fail point is enabled, set the + // 'msg' + // field of this operation's CurOp to signal that we've hit this point and then + // repeatedly + // release and re-acquire the collection readLock at regular intervals until the + // failpoint + // is released. This is done in order to avoid deadlocks caused by the pinned-cursor + // failpoints in this file (see SERVER-21997). + if (MONGO_FAIL_POINT(waitAfterPinningCursorBeforeGetMoreBatch)) { + CurOpFailpointHelpers::waitWhileFailPointEnabled( + &waitAfterPinningCursorBeforeGetMoreBatch, + opCtx, + "waitAfterPinningCursorBeforeGetMoreBatch", + dropAndReaquireReadLock); + } - // A user can only call getMore on their own cursor. 
If there were multiple users - // authenticated when the cursor was created, then at least one of them must be - // authenticated in order to run getMore on the cursor. - if (!AuthorizationSession::get(opCtx->getClient()) - ->isCoauthorizedWith(cursor->getAuthenticatedUsers())) { - uasserted(ErrorCodes::Unauthorized, - str::stream() << "cursor id " << request.cursorid - << " was not created by the authenticated user"); - } + // A user can only call getMore on their own cursor. If there were multiple users + // authenticated when the cursor was created, then at least one of them must be + // authenticated in order to run getMore on the cursor. + if (!AuthorizationSession::get(opCtx->getClient()) + ->isCoauthorizedWith(cursor->getAuthenticatedUsers())) { + uasserted(ErrorCodes::Unauthorized, + str::stream() << "cursor id " << _request.cursorid + << " was not created by the authenticated user"); + } - if (request.nss != cursor->nss()) { - uasserted(ErrorCodes::Unauthorized, - str::stream() << "Requested getMore on namespace '" << request.nss.ns() - << "', but cursor belongs to a different namespace " - << cursor->nss().ns()); - } + if (_request.nss != cursor->nss()) { + uasserted(ErrorCodes::Unauthorized, + str::stream() << "Requested getMore on namespace '" << _request.nss.ns() + << "', but cursor belongs to a different namespace " + << cursor->nss().ns()); + } - // Ensure the lsid and txnNumber of the getMore match that of the originating command. - validateLSID(opCtx, request, cursor); - validateTxnNumber(opCtx, request, cursor); + // Ensure the lsid and txnNumber of the getMore match that of the originating command. 
+ validateLSID(opCtx, _request, cursor); + validateTxnNumber(opCtx, _request, cursor); - if (request.nss.isOplog() && MONGO_FAIL_POINT(rsStopGetMoreCmd)) { - uasserted(ErrorCodes::CommandFailed, - str::stream() << "getMore on " << request.nss.ns() - << " rejected due to active fail point rsStopGetMoreCmd"); - } + if (_request.nss.isOplog() && MONGO_FAIL_POINT(rsStopGetMoreCmd)) { + uasserted(ErrorCodes::CommandFailed, + str::stream() << "getMore on " << _request.nss.ns() + << " rejected due to active fail point rsStopGetMoreCmd"); + } - // Validation related to awaitData. - if (cursor->isAwaitData()) { - invariant(cursor->isTailable()); - } + // Validation related to awaitData. + if (cursor->isAwaitData()) { + invariant(cursor->isTailable()); + } - if (request.awaitDataTimeout && !cursor->isAwaitData()) { - Status status(ErrorCodes::BadValue, + if (_request.awaitDataTimeout && !cursor->isAwaitData()) { + uasserted(ErrorCodes::BadValue, "cannot set maxTimeMS on getMore command for a non-awaitData cursor"); - uassertStatusOK(status); - } + } - // On early return, get rid of the cursor. - ScopeGuard cursorFreer = MakeGuard(&ClientCursorPin::deleteUnderlying, &ccPin.getValue()); + // On early return, get rid of the cursor. 
+ ScopeGuard cursorFreer = MakeGuard(&ClientCursorPin::deleteUnderlying, &ccPin); - const auto replicationMode = repl::ReplicationCoordinator::get(opCtx)->getReplicationMode(); - if (replicationMode == repl::ReplicationCoordinator::modeReplSet && - cursor->getReadConcernLevel() == repl::ReadConcernLevel::kMajorityReadConcern) { - opCtx->recoveryUnit()->setTimestampReadSource( - RecoveryUnit::ReadSource::kMajorityCommitted); - uassertStatusOK(opCtx->recoveryUnit()->obtainMajorityCommittedSnapshot()); - } + const auto replicationMode = + repl::ReplicationCoordinator::get(opCtx)->getReplicationMode(); + if (replicationMode == repl::ReplicationCoordinator::modeReplSet && + cursor->getReadConcernLevel() == repl::ReadConcernLevel::kMajorityReadConcern) { + opCtx->recoveryUnit()->setTimestampReadSource( + RecoveryUnit::ReadSource::kMajorityCommitted); + uassertStatusOK(opCtx->recoveryUnit()->obtainMajorityCommittedSnapshot()); + } - const bool disableAwaitDataFailpointActive = - MONGO_FAIL_POINT(disableAwaitDataForGetMoreCmd); + const bool disableAwaitDataFailpointActive = + MONGO_FAIL_POINT(disableAwaitDataForGetMoreCmd); + + // We assume that cursors created through a DBDirectClient are always used from their + // original OperationContext, so we do not need to move time to and from the cursor. + if (!opCtx->getClient()->isInDirectClient()) { + // There is no time limit set directly on this getMore command. If the cursor is + // awaitData, then we supply a default time of one second. Otherwise we roll over + // any leftover time from the maxTimeMS of the operation that spawned this cursor, + // applying it to this getMore. 
+ if (cursor->isAwaitData() && !disableAwaitDataFailpointActive) { + awaitDataState(opCtx).waitForInsertsDeadline = + opCtx->getServiceContext()->getPreciseClockSource()->now() + + _request.awaitDataTimeout.value_or(Seconds{1}); + } else if (cursor->getLeftoverMaxTimeMicros() < Microseconds::max()) { + opCtx->setDeadlineAfterNowBy(cursor->getLeftoverMaxTimeMicros()); + } + } + if (!cursor->isAwaitData()) { + opCtx->checkForInterrupt(); // May trigger maxTimeAlwaysTimeOut fail point. + } - // We assume that cursors created through a DBDirectClient are always used from their - // original OperationContext, so we do not need to move time to and from the cursor. - if (!opCtx->getClient()->isInDirectClient()) { - // There is no time limit set directly on this getMore command. If the cursor is - // awaitData, then we supply a default time of one second. Otherwise we roll over - // any leftover time from the maxTimeMS of the operation that spawned this cursor, - // applying it to this getMore. - if (cursor->isAwaitData() && !disableAwaitDataFailpointActive) { - awaitDataState(opCtx).waitForInsertsDeadline = - opCtx->getServiceContext()->getPreciseClockSource()->now() + - request.awaitDataTimeout.value_or(Seconds{1}); - } else if (cursor->getLeftoverMaxTimeMicros() < Microseconds::max()) { - opCtx->setDeadlineAfterNowBy(cursor->getLeftoverMaxTimeMicros()); + PlanExecutor* exec = cursor->getExecutor(); + exec->reattachToOperationContext(opCtx); + uassertStatusOK(exec->restoreState()); + + auto planSummary = Explain::getPlanSummary(exec); + { + stdx::lock_guard<Client> lk(*opCtx->getClient()); + curOp->setPlanSummary_inlock(planSummary); + + // Ensure that the original query or command object is available in the slow query + // log, + // profiler and currentOp. 
+ auto originatingCommand = cursor->getOriginatingCommandObj(); + if (!originatingCommand.isEmpty()) { + curOp->setOriginatingCommand_inlock(originatingCommand); + } } - } - if (!cursor->isAwaitData()) { - opCtx->checkForInterrupt(); // May trigger maxTimeAlwaysTimeOut fail point. - } - PlanExecutor* exec = cursor->getExecutor(); - exec->reattachToOperationContext(opCtx); - uassertStatusOK(exec->restoreState()); + CursorId respondWithId = 0; + CursorResponseBuilder nextBatch(/*isInitialResponse*/ false, &result); + BSONObj obj; + PlanExecutor::ExecState state = PlanExecutor::ADVANCED; + long long numResults = 0; - auto planSummary = Explain::getPlanSummary(exec); - { - stdx::lock_guard<Client> lk(*opCtx->getClient()); - curOp->setPlanSummary_inlock(planSummary); + // We report keysExamined and docsExamined to OpDebug for a given getMore operation. To + // obtain these values we need to take a diff of the pre-execution and post-execution + // metrics, as they accumulate over the course of a cursor's lifetime. + PlanSummaryStats preExecutionStats; + Explain::getSummaryStats(*exec, &preExecutionStats); - // Ensure that the original query or command object is available in the slow query log, - // profiler and currentOp. - auto originatingCommand = cursor->getOriginatingCommandObj(); - if (!originatingCommand.isEmpty()) { - curOp->setOriginatingCommand_inlock(originatingCommand); + // Mark this as an AwaitData operation if appropriate. 
+ if (cursor->isAwaitData() && !disableAwaitDataFailpointActive) { + if (_request.lastKnownCommittedOpTime) + clientsLastKnownCommittedOpTime(opCtx) = + _request.lastKnownCommittedOpTime.get(); + awaitDataState(opCtx).shouldWaitForInserts = true; } - } - CursorId respondWithId = 0; - CursorResponseBuilder nextBatch(/*isInitialResponse*/ false, &result); - BSONObj obj; - PlanExecutor::ExecState state = PlanExecutor::ADVANCED; - long long numResults = 0; - - // We report keysExamined and docsExamined to OpDebug for a given getMore operation. To - // obtain these values we need to take a diff of the pre-execution and post-execution - // metrics, as they accumulate over the course of a cursor's lifetime. - PlanSummaryStats preExecutionStats; - Explain::getSummaryStats(*exec, &preExecutionStats); - - // Mark this as an AwaitData operation if appropriate. - if (cursor->isAwaitData() && !disableAwaitDataFailpointActive) { - if (request.lastKnownCommittedOpTime) - clientsLastKnownCommittedOpTime(opCtx) = request.lastKnownCommittedOpTime.get(); - awaitDataState(opCtx).shouldWaitForInserts = true; - } + // We're about to begin running the PlanExecutor in order to fill the getMore batch. If + // the + // 'waitWithPinnedCursorDuringGetMoreBatch' failpoint is active, set the 'msg' field of + // this + // operation's CurOp to signal that we've hit this point and then spin until the + // failpoint + // is released. + if (MONGO_FAIL_POINT(waitWithPinnedCursorDuringGetMoreBatch)) { + CurOpFailpointHelpers::waitWhileFailPointEnabled( + &waitWithPinnedCursorDuringGetMoreBatch, + opCtx, + "waitWithPinnedCursorDuringGetMoreBatch", + dropAndReaquireReadLock); + } - // We're about to begin running the PlanExecutor in order to fill the getMore batch. If the - // 'waitWithPinnedCursorDuringGetMoreBatch' failpoint is active, set the 'msg' field of this - // operation's CurOp to signal that we've hit this point and then spin until the failpoint - // is released. 
- if (MONGO_FAIL_POINT(waitWithPinnedCursorDuringGetMoreBatch)) { - CurOpFailpointHelpers::waitWhileFailPointEnabled( - &waitWithPinnedCursorDuringGetMoreBatch, - opCtx, - "waitWithPinnedCursorDuringGetMoreBatch", - dropAndReaquireReadLock); - } + uassertStatusOK( + generateBatch(opCtx, cursor, _request, &nextBatch, &state, &numResults)); + + PlanSummaryStats postExecutionStats; + Explain::getSummaryStats(*exec, &postExecutionStats); + postExecutionStats.totalKeysExamined -= preExecutionStats.totalKeysExamined; + postExecutionStats.totalDocsExamined -= preExecutionStats.totalDocsExamined; + curOp->debug().setPlanSummaryMetrics(postExecutionStats); + + // We do not report 'execStats' for aggregation or other globally managed cursors, both + // in + // the original request and subsequent getMore. It would be useful to have this + // information + // for an aggregation, but the source PlanExecutor could be destroyed before we know + // whether + // we need execStats and we do not want to generate for all operations due to cost. + if (!CursorManager::isGloballyManagedCursor(_request.cursorid) && + curOp->shouldDBProfile()) { + BSONObjBuilder execStatsBob; + Explain::getWinningPlanStats(exec, &execStatsBob); + curOp->debug().execStats = execStatsBob.obj(); + } - Status batchStatus = generateBatch(opCtx, cursor, request, &nextBatch, &state, &numResults); - uassertStatusOK(batchStatus); - - PlanSummaryStats postExecutionStats; - Explain::getSummaryStats(*exec, &postExecutionStats); - postExecutionStats.totalKeysExamined -= preExecutionStats.totalKeysExamined; - postExecutionStats.totalDocsExamined -= preExecutionStats.totalDocsExamined; - curOp->debug().setPlanSummaryMetrics(postExecutionStats); - - // We do not report 'execStats' for aggregation or other globally managed cursors, both in - // the original request and subsequent getMore. 
It would be useful to have this information - // for an aggregation, but the source PlanExecutor could be destroyed before we know whether - // we need execStats and we do not want to generate for all operations due to cost. - if (!CursorManager::isGloballyManagedCursor(request.cursorid) && curOp->shouldDBProfile()) { - BSONObjBuilder execStatsBob; - Explain::getWinningPlanStats(exec, &execStatsBob); - curOp->debug().execStats = execStatsBob.obj(); - } + if (shouldSaveCursorGetMore(state, exec, cursor->isTailable())) { + respondWithId = _request.cursorid; - if (shouldSaveCursorGetMore(state, exec, cursor->isTailable())) { - respondWithId = request.cursorid; + exec->saveState(); + exec->detachFromOperationContext(); - exec->saveState(); - exec->detachFromOperationContext(); + cursor->setLeftoverMaxTimeMicros(opCtx->getRemainingMaxTimeMicros()); + cursor->incPos(numResults); + } else { + curOp->debug().cursorExhausted = true; + } - cursor->setLeftoverMaxTimeMicros(opCtx->getRemainingMaxTimeMicros()); - cursor->incPos(numResults); - } else { - curOp->debug().cursorExhausted = true; - } + nextBatch.done(respondWithId, _request.nss.ns()); - nextBatch.done(respondWithId, request.nss.ns()); + // Ensure log and profiler include the number of results returned in this getMore's + // response + // batch. + curOp->debug().nreturned = numResults; - // Ensure log and profiler include the number of results returned in this getMore's response - // batch. - curOp->debug().nreturned = numResults; + if (respondWithId) { + cursorFreer.Dismiss(); + } - if (respondWithId) { - cursorFreer.Dismiss(); + // We're about to unpin the cursor as the ClientCursorPin goes out of scope (or delete + // it, + // if it has been exhausted). If the + // 'waitBeforeUnpinningOrDeletingCursorAfterGetMoreBatch' + // failpoint is active, set the 'msg' field of this operation's CurOp to signal that + // we've + // hit this point and then spin until the failpoint is released. 
+ if (MONGO_FAIL_POINT(waitBeforeUnpinningOrDeletingCursorAfterGetMoreBatch)) { + CurOpFailpointHelpers::waitWhileFailPointEnabled( + &waitBeforeUnpinningOrDeletingCursorAfterGetMoreBatch, + opCtx, + "waitBeforeUnpinningOrDeletingCursorAfterGetMoreBatch", + dropAndReaquireReadLock); + } } - // We're about to unpin the cursor as the ClientCursorPin goes out of scope (or delete it, - // if it has been exhausted). If the 'waitBeforeUnpinningOrDeletingCursorAfterGetMoreBatch' - // failpoint is active, set the 'msg' field of this operation's CurOp to signal that we've - // hit this point and then spin until the failpoint is released. - if (MONGO_FAIL_POINT(waitBeforeUnpinningOrDeletingCursorAfterGetMoreBatch)) { - CurOpFailpointHelpers::waitWhileFailPointEnabled( - &waitBeforeUnpinningOrDeletingCursorAfterGetMoreBatch, - opCtx, - "waitBeforeUnpinningOrDeletingCursorAfterGetMoreBatch", - dropAndReaquireReadLock); - } + const GetMoreRequest _request; + }; - return true; + bool maintenanceOk() const override { + return false; } - bool run(OperationContext* opCtx, - const std::string& dbname, - const BSONObj& cmdObj, - BSONObjBuilder& result) override { - // Counted as a getMore, not as a command. - globalOpCounters.gotGetMore(); - - StatusWith<GetMoreRequest> parsedRequest = GetMoreRequest::parseFromBSON(dbname, cmdObj); - uassertStatusOK(parsedRequest.getStatus()); - auto request = parsedRequest.getValue(); - return runParsed(opCtx, request.nss, request, cmdObj, result); + AllowedOnSecondary secondaryAllowed(ServiceContext*) const override { + return AllowedOnSecondary::kAlways; } - /** - * Uses 'cursor' and 'request' to fill out 'nextBatch' with the batch of result documents to - * be returned by this getMore. - * - * Returns the number of documents in the batch in *numResults, which must be initialized to - * zero by the caller. Returns the final ExecState returned by the cursor in *state. 
- * - * Returns an OK status if the batch was successfully generated, and a non-OK status if the - * PlanExecutor encounters a failure. - */ - Status generateBatch(OperationContext* opCtx, - ClientCursor* cursor, - const GetMoreRequest& request, - CursorResponseBuilder* nextBatch, - PlanExecutor::ExecState* state, - long long* numResults) { - PlanExecutor* exec = cursor->getExecutor(); - - // If an awaitData getMore is killed during this process due to our max time expiring at - // an interrupt point, we just continue as normal and return rather than reporting a - // timeout to the user. - BSONObj obj; - try { - while (!FindCommon::enoughForGetMore(request.batchSize.value_or(0), *numResults) && - PlanExecutor::ADVANCED == (*state = exec->getNext(&obj, NULL))) { - // If adding this object will cause us to exceed the message size limit, then we - // stash it for later. - if (!FindCommon::haveSpaceForNext(obj, *numResults, nextBatch->bytesUsed())) { - exec->enqueue(obj); - break; - } + ReadWriteType getReadWriteType() const { + return ReadWriteType::kRead; + } - // As soon as we get a result, this operation no longer waits. - awaitDataState(opCtx).shouldWaitForInserts = false; - // Add result to output buffer. - nextBatch->setLatestOplogTimestamp(exec->getLatestOplogTimestamp()); - nextBatch->append(obj); - (*numResults)++; - } - } catch (const ExceptionFor<ErrorCodes::CloseChangeStream>&) { - // FAILURE state will make getMore command close the cursor even if it's tailable. - *state = PlanExecutor::FAILURE; - return Status::OK(); - } + std::string help() const override { + return "retrieve more results from an existing cursor"; + } - switch (*state) { - case PlanExecutor::FAILURE: - // Log an error message and then perform the same cleanup as DEAD. 
- error() << "GetMore command executor error: " << PlanExecutor::statestr(*state) - << ", stats: " << redact(Explain::getWinningPlanStats(exec)); - case PlanExecutor::DEAD: { - nextBatch->abandon(); - // We should always have a valid status member object at this point. - auto status = WorkingSetCommon::getMemberObjectStatus(obj); - invariant(!status.isOK()); - return status; - } - case PlanExecutor::IS_EOF: - // This causes the reported latest oplog timestamp to advance even when there are - // no results for this particular query. - nextBatch->setLatestOplogTimestamp(exec->getLatestOplogTimestamp()); - default: - return Status::OK(); - } + LogicalOp getLogicalOp() const override { + return LogicalOp::opGetMore; + } - MONGO_UNREACHABLE; + std::size_t reserveBytesForReply() const override { + // The extra 1K is an artifact of how we construct batches. We consider a batch to be full + // when it exceeds the goal batch size. In the case that we are just below the limit and + // then read a large document, the extra 1K helps prevent a final realloc+memcpy. + return FindCommon::kMaxBytesToReturnToClientAtOnce + 1024u; } + /** + * A getMore command increments the getMore counter, not the command counter. + */ + bool shouldAffectCommandCounter() const override { + return false; + } + + bool adminOnly() const override { + return false; + } } getMoreCmd; } // namespace diff --git a/src/mongo/db/query/getmore_request.cpp b/src/mongo/db/query/getmore_request.cpp index ea2515123f6..d27b84521b4 100644 --- a/src/mongo/db/query/getmore_request.cpp +++ b/src/mongo/db/query/getmore_request.cpp @@ -91,18 +91,8 @@ Status GetMoreRequest::isValid() const { } // static -NamespaceString GetMoreRequest::parseNs(const std::string& dbname, const BSONObj& cmdObj) { - BSONElement collElt = cmdObj["collection"]; - const std::string coll = (collElt.type() == BSONType::String) ? 
collElt.String() : ""; - - return NamespaceString(dbname, coll); -} - -// static StatusWith<GetMoreRequest> GetMoreRequest::parseFromBSON(const std::string& dbname, const BSONObj& cmdObj) { - invariant(!dbname.empty()); - // Required fields. boost::optional<CursorId> cursorid; boost::optional<NamespaceString> nss; @@ -129,7 +119,9 @@ StatusWith<GetMoreRequest> GetMoreRequest::parseFromBSON(const std::string& dbna << cmdObj}; } - nss = parseNs(dbname, cmdObj); + BSONElement collElt = cmdObj["collection"]; + const std::string coll = (collElt.type() == BSONType::String) ? collElt.String() : ""; + nss = NamespaceString(dbname, coll); } else if (fieldName == kBatchSizeField) { if (!el.isNumber()) { return {ErrorCodes::TypeMismatch, diff --git a/src/mongo/db/query/getmore_request.h b/src/mongo/db/query/getmore_request.h index 4da4839abbd..9759dd0a5b1 100644 --- a/src/mongo/db/query/getmore_request.h +++ b/src/mongo/db/query/getmore_request.h @@ -69,8 +69,6 @@ struct GetMoreRequest { */ BSONObj toBSON() const; - static NamespaceString parseNs(const std::string& dbname, const BSONObj& cmdObj); - const NamespaceString nss; const CursorId cursorid; diff --git a/src/mongo/s/commands/cluster_getmore_cmd.cpp b/src/mongo/s/commands/cluster_getmore_cmd.cpp index 688cacd28ef..fb8068eb395 100644 --- a/src/mongo/s/commands/cluster_getmore_cmd.cpp +++ b/src/mongo/s/commands/cluster_getmore_cmd.cpp @@ -44,78 +44,75 @@ namespace { * corresponding to the cursor id passed from the application. In order to generate these results, * may issue getMore commands to remote nodes in one or more shards. 
*/ -class ClusterGetMoreCmd final : public BasicCommand { - MONGO_DISALLOW_COPYING(ClusterGetMoreCmd); - +class ClusterGetMoreCmd final : public Command { public: - ClusterGetMoreCmd() : BasicCommand("getMore") {} + ClusterGetMoreCmd() : Command("getMore") {} - std::string parseNs(const std::string& dbname, const BSONObj& cmdObj) const final { - return GetMoreRequest::parseNs(dbname, cmdObj).ns(); + std::unique_ptr<CommandInvocation> parse(OperationContext* opCtx, + const OpMsgRequest& opMsgRequest) override { + return std::make_unique<Invocation>(this, opMsgRequest); } - virtual bool supportsWriteConcern(const BSONObj& cmd) const override { - return false; - } + class Invocation final : public CommandInvocation { + public: + Invocation(Command* cmd, const OpMsgRequest& request) + : CommandInvocation(cmd), + _request(uassertStatusOK( + GetMoreRequest::parseFromBSON(request.getDatabase().toString(), request.body))) {} + + private: + NamespaceString ns() const override { + return _request.nss; + } + + bool supportsWriteConcern() const override { + return false; + } + + void doCheckAuthorization(OperationContext* opCtx) const override { + uassertStatusOK(AuthorizationSession::get(opCtx->getClient()) + ->checkAuthForGetMore(_request.nss, + _request.cursorid, + _request.term.is_initialized())); + } + + void run(OperationContext* opCtx, rpc::ReplyBuilderInterface* reply) override { + // Counted as a getMore, not as a command. 
+ globalOpCounters.gotGetMore(); + auto bob = reply->getBodyBuilder(); + auto response = uassertStatusOK(ClusterFind::runGetMore(opCtx, _request)); + response.addToBSON(CursorResponse::ResponseType::SubsequentResponse, &bob); + } - AllowedOnSecondary secondaryAllowed(ServiceContext*) const final { + const GetMoreRequest _request; + }; + + AllowedOnSecondary secondaryAllowed(ServiceContext*) const override { return AllowedOnSecondary::kAlways; } - bool maintenanceOk() const final { + bool maintenanceOk() const override { return false; } - bool adminOnly() const final { + bool adminOnly() const override { return false; } /** * A getMore command increments the getMore counter, not the command counter. */ - bool shouldAffectCommandCounter() const final { + bool shouldAffectCommandCounter() const override { return false; } - std::string help() const final { + std::string help() const override { return "retrieve more documents for a cursor id"; } - LogicalOp getLogicalOp() const final { + LogicalOp getLogicalOp() const override { return LogicalOp::opGetMore; } - - Status checkAuthForCommand(Client* client, - const std::string& dbname, - const BSONObj& cmdObj) const final { - StatusWith<GetMoreRequest> parseStatus = GetMoreRequest::parseFromBSON(dbname, cmdObj); - if (!parseStatus.isOK()) { - return parseStatus.getStatus(); - } - const GetMoreRequest& request = parseStatus.getValue(); - - return AuthorizationSession::get(client)->checkAuthForGetMore( - request.nss, request.cursorid, request.term.is_initialized()); - } - - bool run(OperationContext* opCtx, - const std::string& dbname, - const BSONObj& cmdObj, - BSONObjBuilder& result) final { - // Counted as a getMore, not as a command. 
- globalOpCounters.gotGetMore(); - - StatusWith<GetMoreRequest> parseStatus = GetMoreRequest::parseFromBSON(dbname, cmdObj); - uassertStatusOK(parseStatus.getStatus()); - const GetMoreRequest& request = parseStatus.getValue(); - - auto response = ClusterFind::runGetMore(opCtx, request); - uassertStatusOK(response.getStatus()); - - response.getValue().addToBSON(CursorResponse::ResponseType::SubsequentResponse, &result); - return true; - } - } cmdGetMoreCluster; } // namespace |