summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAnthony Roy <anthony.roy@10gen.com>2018-07-10 14:02:52 -0400
committerAnthony Roy <anthony.roy@10gen.com>2018-07-11 13:51:43 -0400
commit143b979d16031cdcb0d4b6ecc70a11da52822960 (patch)
tree123047393980c7855bbf9e4b1c1fcf1e84617835
parentc27e72a4979b1b2ef241d23d2ad1434cdb3ff747 (diff)
downloadmongo-143b979d16031cdcb0d4b6ecc70a11da52822960.tar.gz
SERVER-35911 Upgraded GetMoreCmd and ClusterGetMoreCmd to use TypedCommand
Upgraded for access to DocumentSequences and to reduce the amount of GetMoreRequest parses requred.
-rw-r--r--src/mongo/db/commands/getmore_cmd.cpp745
-rw-r--r--src/mongo/db/query/getmore_request.cpp14
-rw-r--r--src/mongo/db/query/getmore_request.h2
-rw-r--r--src/mongo/s/commands/cluster_getmore_cmd.cpp91
4 files changed, 421 insertions, 431 deletions
diff --git a/src/mongo/db/commands/getmore_cmd.cpp b/src/mongo/db/commands/getmore_cmd.cpp
index 71b7f548baf..4f0d297fc55 100644
--- a/src/mongo/db/commands/getmore_cmd.cpp
+++ b/src/mongo/db/commands/getmore_cmd.cpp
@@ -136,430 +136,433 @@ void validateTxnNumber(OperationContext* opCtx,
* Can be used in combination with any cursor-generating command (e.g. find, aggregate,
* listIndexes).
*/
-class GetMoreCmd : public BasicCommand {
- MONGO_DISALLOW_COPYING(GetMoreCmd);
-
+class GetMoreCmd final : public Command {
public:
- GetMoreCmd() : BasicCommand("getMore") {}
-
-
- virtual bool supportsWriteConcern(const BSONObj& cmd) const override {
- return false;
- }
-
- virtual bool allowsAfterClusterTime(const BSONObj& cmdObj) const override {
- return false;
- }
-
- AllowedOnSecondary secondaryAllowed(ServiceContext*) const override {
- return AllowedOnSecondary::kAlways;
- }
-
- bool maintenanceOk() const override {
- return false;
- }
+ GetMoreCmd() : Command("getMore") {}
- bool adminOnly() const override {
- return false;
- }
-
- bool supportsReadConcern(const std::string& dbName,
- const BSONObj& cmdObj,
- repl::ReadConcernLevel level) const final {
- // Uses the readConcern setting from whatever created the cursor.
- return level == repl::ReadConcernLevel::kLocalReadConcern;
- }
-
- ReadWriteType getReadWriteType() const {
- return ReadWriteType::kRead;
+ std::unique_ptr<CommandInvocation> parse(OperationContext* opCtx,
+ const OpMsgRequest& opMsgRequest) override {
+ return std::make_unique<Invocation>(this, opMsgRequest);
}
- std::string help() const override {
- return "retrieve more results from an existing cursor";
- }
-
- LogicalOp getLogicalOp() const override {
- return LogicalOp::opGetMore;
- }
+ class Invocation final : public CommandInvocation {
+ public:
+ Invocation(Command* cmd, const OpMsgRequest& request)
+ : CommandInvocation(cmd),
+ _request(uassertStatusOK(
+ GetMoreRequest::parseFromBSON(request.getDatabase().toString(), request.body))) {}
- std::size_t reserveBytesForReply() const override {
- // The extra 1K is an artifact of how we construct batches. We consider a batch to be full
- // when it exceeds the goal batch size. In the case that we are just below the limit and
- // then read a large document, the extra 1K helps prevent a final realloc+memcpy.
- return FindCommon::kMaxBytesToReturnToClientAtOnce + 1024u;
- }
+ private:
+ bool supportsWriteConcern() const override {
+ return false;
+ }
- /**
- * A getMore command increments the getMore counter, not the command counter.
- */
- bool shouldAffectCommandCounter() const override {
- return false;
- }
+ bool allowsAfterClusterTime() const override {
+ return false;
+ }
- std::string parseNs(const std::string& dbname, const BSONObj& cmdObj) const override {
- return GetMoreRequest::parseNs(dbname, cmdObj).ns();
- }
+ NamespaceString ns() const override {
+ return _request.nss;
+ }
- Status checkAuthForCommand(Client* client,
- const std::string& dbname,
- const BSONObj& cmdObj) const override {
- StatusWith<GetMoreRequest> parseStatus = GetMoreRequest::parseFromBSON(dbname, cmdObj);
- if (!parseStatus.isOK()) {
- return parseStatus.getStatus();
+ void doCheckAuthorization(OperationContext* opCtx) const override {
+ uassertStatusOK(AuthorizationSession::get(opCtx->getClient())
+ ->checkAuthForGetMore(_request.nss,
+ _request.cursorid,
+ _request.term.is_initialized()));
}
- const GetMoreRequest& request = parseStatus.getValue();
- return AuthorizationSession::get(client)->checkAuthForGetMore(
- request.nss, request.cursorid, request.term.is_initialized());
- }
+ /**
+ * Uses 'cursor' and 'request' to fill out 'nextBatch' with the batch of result documents to
+ * be returned by this getMore.
+ *
+ * Returns the number of documents in the batch in *numResults, which must be initialized to
+ * zero by the caller. Returns the final ExecState returned by the cursor in *state.
+ *
+ * Returns an OK status if the batch was successfully generated, and a non-OK status if the
+ * PlanExecutor encounters a failure.
+ */
+ Status generateBatch(OperationContext* opCtx,
+ ClientCursor* cursor,
+ const GetMoreRequest& request,
+ CursorResponseBuilder* nextBatch,
+ PlanExecutor::ExecState* state,
+ long long* numResults) {
+ PlanExecutor* exec = cursor->getExecutor();
+
+ // If an awaitData getMore is killed during this process due to our max time expiring at
+ // an interrupt point, we just continue as normal and return rather than reporting a
+ // timeout to the user.
+ BSONObj obj;
+ try {
+ while (!FindCommon::enoughForGetMore(request.batchSize.value_or(0), *numResults) &&
+ PlanExecutor::ADVANCED == (*state = exec->getNext(&obj, NULL))) {
+ // If adding this object will cause us to exceed the message size limit, then we
+ // stash it for later.
+ if (!FindCommon::haveSpaceForNext(obj, *numResults, nextBatch->bytesUsed())) {
+ exec->enqueue(obj);
+ break;
+ }
+
+ // As soon as we get a result, this operation no longer waits.
+ awaitDataState(opCtx).shouldWaitForInserts = false;
+ // Add result to output buffer.
+ nextBatch->setLatestOplogTimestamp(exec->getLatestOplogTimestamp());
+ nextBatch->append(obj);
+ (*numResults)++;
+ }
+ } catch (const ExceptionFor<ErrorCodes::CloseChangeStream>&) {
+ // FAILURE state will make getMore command close the cursor even if it's tailable.
+ *state = PlanExecutor::FAILURE;
+ return Status::OK();
+ }
+
+ switch (*state) {
+ case PlanExecutor::FAILURE:
+ // Log an error message and then perform the same cleanup as DEAD.
+ error() << "GetMore command executor error: " << PlanExecutor::statestr(*state)
+ << ", stats: " << redact(Explain::getWinningPlanStats(exec));
+ case PlanExecutor::DEAD: {
+ nextBatch->abandon();
+ // We should always have a valid status member object at this point.
+ auto status = WorkingSetCommon::getMemberObjectStatus(obj);
+ invariant(!status.isOK());
+ return status;
+ }
+ case PlanExecutor::IS_EOF:
+ // This causes the reported latest oplog timestamp to advance even when there
+ // are
+ // no results for this particular query.
+ nextBatch->setLatestOplogTimestamp(exec->getLatestOplogTimestamp());
+ default:
+ return Status::OK();
+ }
- bool runParsed(OperationContext* opCtx,
- const NamespaceString& origNss,
- const GetMoreRequest& request,
- const BSONObj& cmdObj,
- BSONObjBuilder& result) {
- auto curOp = CurOp::get(opCtx);
- curOp->debug().cursorid = request.cursorid;
-
- // Validate term before acquiring locks, if provided.
- if (request.term) {
- auto replCoord = repl::ReplicationCoordinator::get(opCtx);
- Status status = replCoord->updateTerm(opCtx, *request.term);
- // Note: updateTerm returns ok if term stayed the same.
- uassertStatusOK(status);
+ MONGO_UNREACHABLE;
}
- // Cursors come in one of two flavors:
- // - Cursors owned by the collection cursor manager, such as those generated via the find
- // command. For these cursors, we hold the appropriate collection lock for the duration of
- // the getMore using AutoGetCollectionForRead.
- // - Cursors owned by the global cursor manager, such as those generated via the aggregate
- // command. These cursors either hold no collection state or manage their collection state
- // internally, so we acquire no locks.
- //
- // While we only need to acquire locks in the case of a cursor which is *not* globally
- // owned, we need to create an AutoStatsTracker in either case. This is responsible for
- // updating statistics in CurOp and Top. We avoid using AutoGetCollectionForReadCommand
- // because we may need to drop and reacquire locks when the cursor is awaitData, but we
- // don't want to update the stats twice.
- //
- // Note that we acquire our locks before our ClientCursorPin, in order to ensure that
- // the pin's destructor is called before the lock's destructor (if there is one) so that the
- // cursor cleanup can occur under the lock.
- boost::optional<AutoGetCollectionForRead> readLock;
- boost::optional<AutoStatsTracker> statsTracker;
- CursorManager* cursorManager;
-
- if (CursorManager::isGloballyManagedCursor(request.cursorid)) {
- cursorManager = CursorManager::getGlobalCursorManager();
-
- if (boost::optional<NamespaceString> nssForCurOp =
- request.nss.isGloballyManagedNamespace()
- ? request.nss.getTargetNSForGloballyManagedNamespace()
- : request.nss) {
- const boost::optional<int> dbProfilingLevel = boost::none;
- statsTracker.emplace(
- opCtx, *nssForCurOp, Top::LockType::NotLocked, dbProfilingLevel);
+ void run(OperationContext* opCtx, rpc::ReplyBuilderInterface* reply) override {
+ // Counted as a getMore, not as a command.
+ globalOpCounters.gotGetMore();
+ auto result = reply->getBodyBuilder();
+ auto curOp = CurOp::get(opCtx);
+ curOp->debug().cursorid = _request.cursorid;
+
+ // Validate term before acquiring locks, if provided.
+ if (_request.term) {
+ auto replCoord = repl::ReplicationCoordinator::get(opCtx);
+ // Note: updateTerm returns ok if term stayed the same.
+ uassertStatusOK(replCoord->updateTerm(opCtx, *_request.term));
}
- } else {
- readLock.emplace(opCtx, request.nss);
- const int doNotChangeProfilingLevel = 0;
- statsTracker.emplace(opCtx,
- request.nss,
- Top::LockType::ReadLocked,
- readLock->getDb() ? readLock->getDb()->getProfilingLevel()
- : doNotChangeProfilingLevel);
-
- Collection* collection = readLock->getCollection();
- if (!collection) {
- uasserted(ErrorCodes::OperationFailed, "collection dropped between getMore calls");
+
+ // Cursors come in one of two flavors:
+ // - Cursors owned by the collection cursor manager, such as those generated via the
+ // find
+ // command. For these cursors, we hold the appropriate collection lock for the
+ // duration of
+ // the getMore using AutoGetCollectionForRead.
+ // - Cursors owned by the global cursor manager, such as those generated via the
+ // aggregate
+ // command. These cursors either hold no collection state or manage their collection
+ // state
+ // internally, so we acquire no locks.
+ //
+ // While we only need to acquire locks in the case of a cursor which is *not* globally
+ // owned, we need to create an AutoStatsTracker in either case. This is responsible for
+ // updating statistics in CurOp and Top. We avoid using AutoGetCollectionForReadCommand
+ // because we may need to drop and reacquire locks when the cursor is awaitData, but we
+ // don't want to update the stats twice.
+ //
+ // Note that we acquire our locks before our ClientCursorPin, in order to ensure that
+ // the pin's destructor is called before the lock's destructor (if there is one) so that
+ // the
+ // cursor cleanup can occur under the lock.
+ boost::optional<AutoGetCollectionForRead> readLock;
+ boost::optional<AutoStatsTracker> statsTracker;
+ CursorManager* cursorManager;
+
+ if (CursorManager::isGloballyManagedCursor(_request.cursorid)) {
+ cursorManager = CursorManager::getGlobalCursorManager();
+
+ if (boost::optional<NamespaceString> nssForCurOp =
+ _request.nss.isGloballyManagedNamespace()
+ ? _request.nss.getTargetNSForGloballyManagedNamespace()
+ : _request.nss) {
+ const boost::optional<int> dbProfilingLevel = boost::none;
+ statsTracker.emplace(
+ opCtx, *nssForCurOp, Top::LockType::NotLocked, dbProfilingLevel);
+ }
+ } else {
+ readLock.emplace(opCtx, _request.nss);
+ const int doNotChangeProfilingLevel = 0;
+ statsTracker.emplace(opCtx,
+ _request.nss,
+ Top::LockType::ReadLocked,
+ readLock->getDb() ? readLock->getDb()->getProfilingLevel()
+ : doNotChangeProfilingLevel);
+
+ Collection* collection = readLock->getCollection();
+ if (!collection) {
+ uasserted(ErrorCodes::OperationFailed,
+ "collection dropped between getMore calls");
+ }
+ cursorManager = collection->getCursorManager();
}
- cursorManager = collection->getCursorManager();
- }
- auto ccPin = cursorManager->pinCursor(opCtx, request.cursorid);
- uassertStatusOK(ccPin.getStatus());
-
- ClientCursor* cursor = ccPin.getValue().getCursor();
-
- // Only used by the failpoints.
- const auto dropAndReaquireReadLock = [&readLock, opCtx, &request]() {
- // Make sure an interrupted operation does not prevent us from reacquiring the lock.
- UninterruptibleLockGuard noInterrupt(opCtx->lockState());
-
- readLock.reset();
- readLock.emplace(opCtx, request.nss);
- };
-
- // If the 'waitAfterPinningCursorBeforeGetMoreBatch' fail point is enabled, set the 'msg'
- // field of this operation's CurOp to signal that we've hit this point and then repeatedly
- // release and re-acquire the collection readLock at regular intervals until the failpoint
- // is released. This is done in order to avoid deadlocks caused by the pinned-cursor
- // failpoints in this file (see SERVER-21997).
- if (MONGO_FAIL_POINT(waitAfterPinningCursorBeforeGetMoreBatch)) {
- CurOpFailpointHelpers::waitWhileFailPointEnabled(
- &waitAfterPinningCursorBeforeGetMoreBatch,
- opCtx,
- "waitAfterPinningCursorBeforeGetMoreBatch",
- dropAndReaquireReadLock);
- }
+ auto ccPin = uassertStatusOK(cursorManager->pinCursor(opCtx, _request.cursorid));
+ ClientCursor* cursor = ccPin.getCursor();
+
+ // Only used by the failpoints.
+ const auto dropAndReaquireReadLock = [&readLock, opCtx, this]() {
+ // Make sure an interrupted operation does not prevent us from reacquiring the lock.
+ UninterruptibleLockGuard noInterrupt(opCtx->lockState());
+
+ readLock.reset();
+ readLock.emplace(opCtx, _request.nss);
+ };
+
+ // If the 'waitAfterPinningCursorBeforeGetMoreBatch' fail point is enabled, set the
+ // 'msg'
+ // field of this operation's CurOp to signal that we've hit this point and then
+ // repeatedly
+ // release and re-acquire the collection readLock at regular intervals until the
+ // failpoint
+ // is released. This is done in order to avoid deadlocks caused by the pinned-cursor
+ // failpoints in this file (see SERVER-21997).
+ if (MONGO_FAIL_POINT(waitAfterPinningCursorBeforeGetMoreBatch)) {
+ CurOpFailpointHelpers::waitWhileFailPointEnabled(
+ &waitAfterPinningCursorBeforeGetMoreBatch,
+ opCtx,
+ "waitAfterPinningCursorBeforeGetMoreBatch",
+ dropAndReaquireReadLock);
+ }
- // A user can only call getMore on their own cursor. If there were multiple users
- // authenticated when the cursor was created, then at least one of them must be
- // authenticated in order to run getMore on the cursor.
- if (!AuthorizationSession::get(opCtx->getClient())
- ->isCoauthorizedWith(cursor->getAuthenticatedUsers())) {
- uasserted(ErrorCodes::Unauthorized,
- str::stream() << "cursor id " << request.cursorid
- << " was not created by the authenticated user");
- }
+ // A user can only call getMore on their own cursor. If there were multiple users
+ // authenticated when the cursor was created, then at least one of them must be
+ // authenticated in order to run getMore on the cursor.
+ if (!AuthorizationSession::get(opCtx->getClient())
+ ->isCoauthorizedWith(cursor->getAuthenticatedUsers())) {
+ uasserted(ErrorCodes::Unauthorized,
+ str::stream() << "cursor id " << _request.cursorid
+ << " was not created by the authenticated user");
+ }
- if (request.nss != cursor->nss()) {
- uasserted(ErrorCodes::Unauthorized,
- str::stream() << "Requested getMore on namespace '" << request.nss.ns()
- << "', but cursor belongs to a different namespace "
- << cursor->nss().ns());
- }
+ if (_request.nss != cursor->nss()) {
+ uasserted(ErrorCodes::Unauthorized,
+ str::stream() << "Requested getMore on namespace '" << _request.nss.ns()
+ << "', but cursor belongs to a different namespace "
+ << cursor->nss().ns());
+ }
- // Ensure the lsid and txnNumber of the getMore match that of the originating command.
- validateLSID(opCtx, request, cursor);
- validateTxnNumber(opCtx, request, cursor);
+ // Ensure the lsid and txnNumber of the getMore match that of the originating command.
+ validateLSID(opCtx, _request, cursor);
+ validateTxnNumber(opCtx, _request, cursor);
- if (request.nss.isOplog() && MONGO_FAIL_POINT(rsStopGetMoreCmd)) {
- uasserted(ErrorCodes::CommandFailed,
- str::stream() << "getMore on " << request.nss.ns()
- << " rejected due to active fail point rsStopGetMoreCmd");
- }
+ if (_request.nss.isOplog() && MONGO_FAIL_POINT(rsStopGetMoreCmd)) {
+ uasserted(ErrorCodes::CommandFailed,
+ str::stream() << "getMore on " << _request.nss.ns()
+ << " rejected due to active fail point rsStopGetMoreCmd");
+ }
- // Validation related to awaitData.
- if (cursor->isAwaitData()) {
- invariant(cursor->isTailable());
- }
+ // Validation related to awaitData.
+ if (cursor->isAwaitData()) {
+ invariant(cursor->isTailable());
+ }
- if (request.awaitDataTimeout && !cursor->isAwaitData()) {
- Status status(ErrorCodes::BadValue,
+ if (_request.awaitDataTimeout && !cursor->isAwaitData()) {
+ uasserted(ErrorCodes::BadValue,
"cannot set maxTimeMS on getMore command for a non-awaitData cursor");
- uassertStatusOK(status);
- }
+ }
- // On early return, get rid of the cursor.
- ScopeGuard cursorFreer = MakeGuard(&ClientCursorPin::deleteUnderlying, &ccPin.getValue());
+ // On early return, get rid of the cursor.
+ ScopeGuard cursorFreer = MakeGuard(&ClientCursorPin::deleteUnderlying, &ccPin);
- const auto replicationMode = repl::ReplicationCoordinator::get(opCtx)->getReplicationMode();
- if (replicationMode == repl::ReplicationCoordinator::modeReplSet &&
- cursor->getReadConcernLevel() == repl::ReadConcernLevel::kMajorityReadConcern) {
- opCtx->recoveryUnit()->setTimestampReadSource(
- RecoveryUnit::ReadSource::kMajorityCommitted);
- uassertStatusOK(opCtx->recoveryUnit()->obtainMajorityCommittedSnapshot());
- }
+ const auto replicationMode =
+ repl::ReplicationCoordinator::get(opCtx)->getReplicationMode();
+ if (replicationMode == repl::ReplicationCoordinator::modeReplSet &&
+ cursor->getReadConcernLevel() == repl::ReadConcernLevel::kMajorityReadConcern) {
+ opCtx->recoveryUnit()->setTimestampReadSource(
+ RecoveryUnit::ReadSource::kMajorityCommitted);
+ uassertStatusOK(opCtx->recoveryUnit()->obtainMajorityCommittedSnapshot());
+ }
- const bool disableAwaitDataFailpointActive =
- MONGO_FAIL_POINT(disableAwaitDataForGetMoreCmd);
+ const bool disableAwaitDataFailpointActive =
+ MONGO_FAIL_POINT(disableAwaitDataForGetMoreCmd);
+
+ // We assume that cursors created through a DBDirectClient are always used from their
+ // original OperationContext, so we do not need to move time to and from the cursor.
+ if (!opCtx->getClient()->isInDirectClient()) {
+ // There is no time limit set directly on this getMore command. If the cursor is
+ // awaitData, then we supply a default time of one second. Otherwise we roll over
+ // any leftover time from the maxTimeMS of the operation that spawned this cursor,
+ // applying it to this getMore.
+ if (cursor->isAwaitData() && !disableAwaitDataFailpointActive) {
+ awaitDataState(opCtx).waitForInsertsDeadline =
+ opCtx->getServiceContext()->getPreciseClockSource()->now() +
+ _request.awaitDataTimeout.value_or(Seconds{1});
+ } else if (cursor->getLeftoverMaxTimeMicros() < Microseconds::max()) {
+ opCtx->setDeadlineAfterNowBy(cursor->getLeftoverMaxTimeMicros());
+ }
+ }
+ if (!cursor->isAwaitData()) {
+ opCtx->checkForInterrupt(); // May trigger maxTimeAlwaysTimeOut fail point.
+ }
- // We assume that cursors created through a DBDirectClient are always used from their
- // original OperationContext, so we do not need to move time to and from the cursor.
- if (!opCtx->getClient()->isInDirectClient()) {
- // There is no time limit set directly on this getMore command. If the cursor is
- // awaitData, then we supply a default time of one second. Otherwise we roll over
- // any leftover time from the maxTimeMS of the operation that spawned this cursor,
- // applying it to this getMore.
- if (cursor->isAwaitData() && !disableAwaitDataFailpointActive) {
- awaitDataState(opCtx).waitForInsertsDeadline =
- opCtx->getServiceContext()->getPreciseClockSource()->now() +
- request.awaitDataTimeout.value_or(Seconds{1});
- } else if (cursor->getLeftoverMaxTimeMicros() < Microseconds::max()) {
- opCtx->setDeadlineAfterNowBy(cursor->getLeftoverMaxTimeMicros());
+ PlanExecutor* exec = cursor->getExecutor();
+ exec->reattachToOperationContext(opCtx);
+ uassertStatusOK(exec->restoreState());
+
+ auto planSummary = Explain::getPlanSummary(exec);
+ {
+ stdx::lock_guard<Client> lk(*opCtx->getClient());
+ curOp->setPlanSummary_inlock(planSummary);
+
+ // Ensure that the original query or command object is available in the slow query
+ // log,
+ // profiler and currentOp.
+ auto originatingCommand = cursor->getOriginatingCommandObj();
+ if (!originatingCommand.isEmpty()) {
+ curOp->setOriginatingCommand_inlock(originatingCommand);
+ }
}
- }
- if (!cursor->isAwaitData()) {
- opCtx->checkForInterrupt(); // May trigger maxTimeAlwaysTimeOut fail point.
- }
- PlanExecutor* exec = cursor->getExecutor();
- exec->reattachToOperationContext(opCtx);
- uassertStatusOK(exec->restoreState());
+ CursorId respondWithId = 0;
+ CursorResponseBuilder nextBatch(/*isInitialResponse*/ false, &result);
+ BSONObj obj;
+ PlanExecutor::ExecState state = PlanExecutor::ADVANCED;
+ long long numResults = 0;
- auto planSummary = Explain::getPlanSummary(exec);
- {
- stdx::lock_guard<Client> lk(*opCtx->getClient());
- curOp->setPlanSummary_inlock(planSummary);
+ // We report keysExamined and docsExamined to OpDebug for a given getMore operation. To
+ // obtain these values we need to take a diff of the pre-execution and post-execution
+ // metrics, as they accumulate over the course of a cursor's lifetime.
+ PlanSummaryStats preExecutionStats;
+ Explain::getSummaryStats(*exec, &preExecutionStats);
- // Ensure that the original query or command object is available in the slow query log,
- // profiler and currentOp.
- auto originatingCommand = cursor->getOriginatingCommandObj();
- if (!originatingCommand.isEmpty()) {
- curOp->setOriginatingCommand_inlock(originatingCommand);
+ // Mark this as an AwaitData operation if appropriate.
+ if (cursor->isAwaitData() && !disableAwaitDataFailpointActive) {
+ if (_request.lastKnownCommittedOpTime)
+ clientsLastKnownCommittedOpTime(opCtx) =
+ _request.lastKnownCommittedOpTime.get();
+ awaitDataState(opCtx).shouldWaitForInserts = true;
}
- }
- CursorId respondWithId = 0;
- CursorResponseBuilder nextBatch(/*isInitialResponse*/ false, &result);
- BSONObj obj;
- PlanExecutor::ExecState state = PlanExecutor::ADVANCED;
- long long numResults = 0;
-
- // We report keysExamined and docsExamined to OpDebug for a given getMore operation. To
- // obtain these values we need to take a diff of the pre-execution and post-execution
- // metrics, as they accumulate over the course of a cursor's lifetime.
- PlanSummaryStats preExecutionStats;
- Explain::getSummaryStats(*exec, &preExecutionStats);
-
- // Mark this as an AwaitData operation if appropriate.
- if (cursor->isAwaitData() && !disableAwaitDataFailpointActive) {
- if (request.lastKnownCommittedOpTime)
- clientsLastKnownCommittedOpTime(opCtx) = request.lastKnownCommittedOpTime.get();
- awaitDataState(opCtx).shouldWaitForInserts = true;
- }
+ // We're about to begin running the PlanExecutor in order to fill the getMore batch. If
+ // the
+ // 'waitWithPinnedCursorDuringGetMoreBatch' failpoint is active, set the 'msg' field of
+ // this
+ // operation's CurOp to signal that we've hit this point and then spin until the
+ // failpoint
+ // is released.
+ if (MONGO_FAIL_POINT(waitWithPinnedCursorDuringGetMoreBatch)) {
+ CurOpFailpointHelpers::waitWhileFailPointEnabled(
+ &waitWithPinnedCursorDuringGetMoreBatch,
+ opCtx,
+ "waitWithPinnedCursorDuringGetMoreBatch",
+ dropAndReaquireReadLock);
+ }
- // We're about to begin running the PlanExecutor in order to fill the getMore batch. If the
- // 'waitWithPinnedCursorDuringGetMoreBatch' failpoint is active, set the 'msg' field of this
- // operation's CurOp to signal that we've hit this point and then spin until the failpoint
- // is released.
- if (MONGO_FAIL_POINT(waitWithPinnedCursorDuringGetMoreBatch)) {
- CurOpFailpointHelpers::waitWhileFailPointEnabled(
- &waitWithPinnedCursorDuringGetMoreBatch,
- opCtx,
- "waitWithPinnedCursorDuringGetMoreBatch",
- dropAndReaquireReadLock);
- }
+ uassertStatusOK(
+ generateBatch(opCtx, cursor, _request, &nextBatch, &state, &numResults));
+
+ PlanSummaryStats postExecutionStats;
+ Explain::getSummaryStats(*exec, &postExecutionStats);
+ postExecutionStats.totalKeysExamined -= preExecutionStats.totalKeysExamined;
+ postExecutionStats.totalDocsExamined -= preExecutionStats.totalDocsExamined;
+ curOp->debug().setPlanSummaryMetrics(postExecutionStats);
+
+ // We do not report 'execStats' for aggregation or other globally managed cursors, both
+ // in
+ // the original request and subsequent getMore. It would be useful to have this
+ // information
+ // for an aggregation, but the source PlanExecutor could be destroyed before we know
+ // whether
+ // we need execStats and we do not want to generate for all operations due to cost.
+ if (!CursorManager::isGloballyManagedCursor(_request.cursorid) &&
+ curOp->shouldDBProfile()) {
+ BSONObjBuilder execStatsBob;
+ Explain::getWinningPlanStats(exec, &execStatsBob);
+ curOp->debug().execStats = execStatsBob.obj();
+ }
- Status batchStatus = generateBatch(opCtx, cursor, request, &nextBatch, &state, &numResults);
- uassertStatusOK(batchStatus);
-
- PlanSummaryStats postExecutionStats;
- Explain::getSummaryStats(*exec, &postExecutionStats);
- postExecutionStats.totalKeysExamined -= preExecutionStats.totalKeysExamined;
- postExecutionStats.totalDocsExamined -= preExecutionStats.totalDocsExamined;
- curOp->debug().setPlanSummaryMetrics(postExecutionStats);
-
- // We do not report 'execStats' for aggregation or other globally managed cursors, both in
- // the original request and subsequent getMore. It would be useful to have this information
- // for an aggregation, but the source PlanExecutor could be destroyed before we know whether
- // we need execStats and we do not want to generate for all operations due to cost.
- if (!CursorManager::isGloballyManagedCursor(request.cursorid) && curOp->shouldDBProfile()) {
- BSONObjBuilder execStatsBob;
- Explain::getWinningPlanStats(exec, &execStatsBob);
- curOp->debug().execStats = execStatsBob.obj();
- }
+ if (shouldSaveCursorGetMore(state, exec, cursor->isTailable())) {
+ respondWithId = _request.cursorid;
- if (shouldSaveCursorGetMore(state, exec, cursor->isTailable())) {
- respondWithId = request.cursorid;
+ exec->saveState();
+ exec->detachFromOperationContext();
- exec->saveState();
- exec->detachFromOperationContext();
+ cursor->setLeftoverMaxTimeMicros(opCtx->getRemainingMaxTimeMicros());
+ cursor->incPos(numResults);
+ } else {
+ curOp->debug().cursorExhausted = true;
+ }
- cursor->setLeftoverMaxTimeMicros(opCtx->getRemainingMaxTimeMicros());
- cursor->incPos(numResults);
- } else {
- curOp->debug().cursorExhausted = true;
- }
+ nextBatch.done(respondWithId, _request.nss.ns());
- nextBatch.done(respondWithId, request.nss.ns());
+ // Ensure log and profiler include the number of results returned in this getMore's
+ // response
+ // batch.
+ curOp->debug().nreturned = numResults;
- // Ensure log and profiler include the number of results returned in this getMore's response
- // batch.
- curOp->debug().nreturned = numResults;
+ if (respondWithId) {
+ cursorFreer.Dismiss();
+ }
- if (respondWithId) {
- cursorFreer.Dismiss();
+ // We're about to unpin the cursor as the ClientCursorPin goes out of scope (or delete
+ // it,
+ // if it has been exhausted). If the
+ // 'waitBeforeUnpinningOrDeletingCursorAfterGetMoreBatch'
+ // failpoint is active, set the 'msg' field of this operation's CurOp to signal that
+ // we've
+ // hit this point and then spin until the failpoint is released.
+ if (MONGO_FAIL_POINT(waitBeforeUnpinningOrDeletingCursorAfterGetMoreBatch)) {
+ CurOpFailpointHelpers::waitWhileFailPointEnabled(
+ &waitBeforeUnpinningOrDeletingCursorAfterGetMoreBatch,
+ opCtx,
+ "waitBeforeUnpinningOrDeletingCursorAfterGetMoreBatch",
+ dropAndReaquireReadLock);
+ }
}
- // We're about to unpin the cursor as the ClientCursorPin goes out of scope (or delete it,
- // if it has been exhausted). If the 'waitBeforeUnpinningOrDeletingCursorAfterGetMoreBatch'
- // failpoint is active, set the 'msg' field of this operation's CurOp to signal that we've
- // hit this point and then spin until the failpoint is released.
- if (MONGO_FAIL_POINT(waitBeforeUnpinningOrDeletingCursorAfterGetMoreBatch)) {
- CurOpFailpointHelpers::waitWhileFailPointEnabled(
- &waitBeforeUnpinningOrDeletingCursorAfterGetMoreBatch,
- opCtx,
- "waitBeforeUnpinningOrDeletingCursorAfterGetMoreBatch",
- dropAndReaquireReadLock);
- }
+ const GetMoreRequest _request;
+ };
- return true;
+ bool maintenanceOk() const override {
+ return false;
}
- bool run(OperationContext* opCtx,
- const std::string& dbname,
- const BSONObj& cmdObj,
- BSONObjBuilder& result) override {
- // Counted as a getMore, not as a command.
- globalOpCounters.gotGetMore();
-
- StatusWith<GetMoreRequest> parsedRequest = GetMoreRequest::parseFromBSON(dbname, cmdObj);
- uassertStatusOK(parsedRequest.getStatus());
- auto request = parsedRequest.getValue();
- return runParsed(opCtx, request.nss, request, cmdObj, result);
+ AllowedOnSecondary secondaryAllowed(ServiceContext*) const override {
+ return AllowedOnSecondary::kAlways;
}
- /**
- * Uses 'cursor' and 'request' to fill out 'nextBatch' with the batch of result documents to
- * be returned by this getMore.
- *
- * Returns the number of documents in the batch in *numResults, which must be initialized to
- * zero by the caller. Returns the final ExecState returned by the cursor in *state.
- *
- * Returns an OK status if the batch was successfully generated, and a non-OK status if the
- * PlanExecutor encounters a failure.
- */
- Status generateBatch(OperationContext* opCtx,
- ClientCursor* cursor,
- const GetMoreRequest& request,
- CursorResponseBuilder* nextBatch,
- PlanExecutor::ExecState* state,
- long long* numResults) {
- PlanExecutor* exec = cursor->getExecutor();
-
- // If an awaitData getMore is killed during this process due to our max time expiring at
- // an interrupt point, we just continue as normal and return rather than reporting a
- // timeout to the user.
- BSONObj obj;
- try {
- while (!FindCommon::enoughForGetMore(request.batchSize.value_or(0), *numResults) &&
- PlanExecutor::ADVANCED == (*state = exec->getNext(&obj, NULL))) {
- // If adding this object will cause us to exceed the message size limit, then we
- // stash it for later.
- if (!FindCommon::haveSpaceForNext(obj, *numResults, nextBatch->bytesUsed())) {
- exec->enqueue(obj);
- break;
- }
+ ReadWriteType getReadWriteType() const {
+ return ReadWriteType::kRead;
+ }
- // As soon as we get a result, this operation no longer waits.
- awaitDataState(opCtx).shouldWaitForInserts = false;
- // Add result to output buffer.
- nextBatch->setLatestOplogTimestamp(exec->getLatestOplogTimestamp());
- nextBatch->append(obj);
- (*numResults)++;
- }
- } catch (const ExceptionFor<ErrorCodes::CloseChangeStream>&) {
- // FAILURE state will make getMore command close the cursor even if it's tailable.
- *state = PlanExecutor::FAILURE;
- return Status::OK();
- }
+ std::string help() const override {
+ return "retrieve more results from an existing cursor";
+ }
- switch (*state) {
- case PlanExecutor::FAILURE:
- // Log an error message and then perform the same cleanup as DEAD.
- error() << "GetMore command executor error: " << PlanExecutor::statestr(*state)
- << ", stats: " << redact(Explain::getWinningPlanStats(exec));
- case PlanExecutor::DEAD: {
- nextBatch->abandon();
- // We should always have a valid status member object at this point.
- auto status = WorkingSetCommon::getMemberObjectStatus(obj);
- invariant(!status.isOK());
- return status;
- }
- case PlanExecutor::IS_EOF:
- // This causes the reported latest oplog timestamp to advance even when there are
- // no results for this particular query.
- nextBatch->setLatestOplogTimestamp(exec->getLatestOplogTimestamp());
- default:
- return Status::OK();
- }
+ LogicalOp getLogicalOp() const override {
+ return LogicalOp::opGetMore;
+ }
- MONGO_UNREACHABLE;
+ std::size_t reserveBytesForReply() const override {
+ // The extra 1K is an artifact of how we construct batches. We consider a batch to be full
+ // when it exceeds the goal batch size. In the case that we are just below the limit and
+ // then read a large document, the extra 1K helps prevent a final realloc+memcpy.
+ return FindCommon::kMaxBytesToReturnToClientAtOnce + 1024u;
}
+ /**
+ * A getMore command increments the getMore counter, not the command counter.
+ */
+ bool shouldAffectCommandCounter() const override {
+ return false;
+ }
+
+ bool adminOnly() const override {
+ return false;
+ }
} getMoreCmd;
} // namespace
diff --git a/src/mongo/db/query/getmore_request.cpp b/src/mongo/db/query/getmore_request.cpp
index ea2515123f6..d27b84521b4 100644
--- a/src/mongo/db/query/getmore_request.cpp
+++ b/src/mongo/db/query/getmore_request.cpp
@@ -91,18 +91,8 @@ Status GetMoreRequest::isValid() const {
}
// static
-NamespaceString GetMoreRequest::parseNs(const std::string& dbname, const BSONObj& cmdObj) {
- BSONElement collElt = cmdObj["collection"];
- const std::string coll = (collElt.type() == BSONType::String) ? collElt.String() : "";
-
- return NamespaceString(dbname, coll);
-}
-
-// static
StatusWith<GetMoreRequest> GetMoreRequest::parseFromBSON(const std::string& dbname,
const BSONObj& cmdObj) {
- invariant(!dbname.empty());
-
// Required fields.
boost::optional<CursorId> cursorid;
boost::optional<NamespaceString> nss;
@@ -129,7 +119,9 @@ StatusWith<GetMoreRequest> GetMoreRequest::parseFromBSON(const std::string& dbna
<< cmdObj};
}
- nss = parseNs(dbname, cmdObj);
+ BSONElement collElt = cmdObj["collection"];
+ const std::string coll = (collElt.type() == BSONType::String) ? collElt.String() : "";
+ nss = NamespaceString(dbname, coll);
} else if (fieldName == kBatchSizeField) {
if (!el.isNumber()) {
return {ErrorCodes::TypeMismatch,
diff --git a/src/mongo/db/query/getmore_request.h b/src/mongo/db/query/getmore_request.h
index 4da4839abbd..9759dd0a5b1 100644
--- a/src/mongo/db/query/getmore_request.h
+++ b/src/mongo/db/query/getmore_request.h
@@ -69,8 +69,6 @@ struct GetMoreRequest {
*/
BSONObj toBSON() const;
- static NamespaceString parseNs(const std::string& dbname, const BSONObj& cmdObj);
-
const NamespaceString nss;
const CursorId cursorid;
diff --git a/src/mongo/s/commands/cluster_getmore_cmd.cpp b/src/mongo/s/commands/cluster_getmore_cmd.cpp
index 688cacd28ef..fb8068eb395 100644
--- a/src/mongo/s/commands/cluster_getmore_cmd.cpp
+++ b/src/mongo/s/commands/cluster_getmore_cmd.cpp
@@ -44,78 +44,75 @@ namespace {
* corresponding to the cursor id passed from the application. In order to generate these results,
* may issue getMore commands to remote nodes in one or more shards.
*/
-class ClusterGetMoreCmd final : public BasicCommand {
- MONGO_DISALLOW_COPYING(ClusterGetMoreCmd);
-
+class ClusterGetMoreCmd final : public Command {
public:
- ClusterGetMoreCmd() : BasicCommand("getMore") {}
+ ClusterGetMoreCmd() : Command("getMore") {}
- std::string parseNs(const std::string& dbname, const BSONObj& cmdObj) const final {
- return GetMoreRequest::parseNs(dbname, cmdObj).ns();
+ std::unique_ptr<CommandInvocation> parse(OperationContext* opCtx,
+ const OpMsgRequest& opMsgRequest) override {
+ return std::make_unique<Invocation>(this, opMsgRequest);
}
- virtual bool supportsWriteConcern(const BSONObj& cmd) const override {
- return false;
- }
+ class Invocation final : public CommandInvocation {
+ public:
+ Invocation(Command* cmd, const OpMsgRequest& request)
+ : CommandInvocation(cmd),
+ _request(uassertStatusOK(
+ GetMoreRequest::parseFromBSON(request.getDatabase().toString(), request.body))) {}
+
+ private:
+ NamespaceString ns() const override {
+ return _request.nss;
+ }
+
+ bool supportsWriteConcern() const override {
+ return false;
+ }
+
+ void doCheckAuthorization(OperationContext* opCtx) const override {
+ uassertStatusOK(AuthorizationSession::get(opCtx->getClient())
+ ->checkAuthForGetMore(_request.nss,
+ _request.cursorid,
+ _request.term.is_initialized()));
+ }
+
+ void run(OperationContext* opCtx, rpc::ReplyBuilderInterface* reply) override {
+ // Counted as a getMore, not as a command.
+ globalOpCounters.gotGetMore();
+ auto bob = reply->getBodyBuilder();
+ auto response = uassertStatusOK(ClusterFind::runGetMore(opCtx, _request));
+ response.addToBSON(CursorResponse::ResponseType::SubsequentResponse, &bob);
+ }
- AllowedOnSecondary secondaryAllowed(ServiceContext*) const final {
+ const GetMoreRequest _request;
+ };
+
+ AllowedOnSecondary secondaryAllowed(ServiceContext*) const override {
return AllowedOnSecondary::kAlways;
}
- bool maintenanceOk() const final {
+ bool maintenanceOk() const override {
return false;
}
- bool adminOnly() const final {
+ bool adminOnly() const override {
return false;
}
/**
* A getMore command increments the getMore counter, not the command counter.
*/
- bool shouldAffectCommandCounter() const final {
+ bool shouldAffectCommandCounter() const override {
return false;
}
- std::string help() const final {
+ std::string help() const override {
return "retrieve more documents for a cursor id";
}
- LogicalOp getLogicalOp() const final {
+ LogicalOp getLogicalOp() const override {
return LogicalOp::opGetMore;
}
-
- Status checkAuthForCommand(Client* client,
- const std::string& dbname,
- const BSONObj& cmdObj) const final {
- StatusWith<GetMoreRequest> parseStatus = GetMoreRequest::parseFromBSON(dbname, cmdObj);
- if (!parseStatus.isOK()) {
- return parseStatus.getStatus();
- }
- const GetMoreRequest& request = parseStatus.getValue();
-
- return AuthorizationSession::get(client)->checkAuthForGetMore(
- request.nss, request.cursorid, request.term.is_initialized());
- }
-
- bool run(OperationContext* opCtx,
- const std::string& dbname,
- const BSONObj& cmdObj,
- BSONObjBuilder& result) final {
- // Counted as a getMore, not as a command.
- globalOpCounters.gotGetMore();
-
- StatusWith<GetMoreRequest> parseStatus = GetMoreRequest::parseFromBSON(dbname, cmdObj);
- uassertStatusOK(parseStatus.getStatus());
- const GetMoreRequest& request = parseStatus.getValue();
-
- auto response = ClusterFind::runGetMore(opCtx, request);
- uassertStatusOK(response.getStatus());
-
- response.getValue().addToBSON(CursorResponse::ResponseType::SubsequentResponse, &result);
- return true;
- }
-
} cmdGetMoreCluster;
} // namespace