summaryrefslogtreecommitdiff
path: root/src/mongo/db/query/find.cpp
diff options
context:
space:
mode:
authorIrina Yatsenko <irina.yatsenko@mongodb.com>2021-07-07 15:44:54 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2021-07-16 05:38:55 +0000
commitbeeab6beaf18232e52bb3094f5f31fe83fbae2a4 (patch)
treedce5b9fefa813283212757dcf16f59e4b8bffe9e /src/mongo/db/query/find.cpp
parent23ecc48f89f4ec03d7b42e637c5969802efdb261 (diff)
downloadmongo-beeab6beaf18232e52bb3094f5f31fe83fbae2a4.tar.gz
SERVER-57391 Return error response to OP_QUERY and OP_GET_MORE messages
Diffstat (limited to 'src/mongo/db/query/find.cpp')
-rw-r--r--src/mongo/db/query/find.cpp673
1 files changed, 0 insertions, 673 deletions
diff --git a/src/mongo/db/query/find.cpp b/src/mongo/db/query/find.cpp
index c4dac17cfec..0f791f2d11e 100644
--- a/src/mongo/db/query/find.cpp
+++ b/src/mongo/db/query/find.cpp
@@ -78,9 +78,6 @@ using std::unique_ptr;
// Failpoint for checking whether we've received a getmore.
MONGO_FAIL_POINT_DEFINE(failReceivedGetmore);
-// Failpoint to keep a cursor pinned.
-MONGO_FAIL_POINT_DEFINE(legacyGetMoreWaitWithCursor)
-
bool shouldSaveCursor(OperationContext* opCtx,
const CollectionPtr& collection,
PlanExecutor::ExecState finalState,
@@ -148,674 +145,4 @@ void endQueryOp(OperationContext* opCtx,
}
}
-namespace {
-
-/**
- * Uses 'cursor' to fill out 'bb' with the batch of result documents to be returned by this getMore.
- *
- * Returns the number of documents in the batch in 'numResults', which must be initialized to
- * zero by the caller. Returns the final ExecState returned by the cursor in *state.
- *
- * Throws an exception if the PlanExecutor encounters a failure.
- */
-void generateBatch(int ntoreturn,
- ClientCursor* cursor,
- BufBuilder* bb,
- std::uint64_t* numResults,
- ResourceConsumption::DocumentUnitCounter* docUnitsReturned,
- PlanExecutor::ExecState* state) {
- PlanExecutor* exec = cursor->getExecutor();
-
- try {
- BSONObj obj;
- while (!FindCommon::enoughForGetMore(ntoreturn, *numResults) &&
- PlanExecutor::ADVANCED == (*state = exec->getNext(&obj, nullptr))) {
-
- // If we can't fit this result inside the current batch, then we stash it for later.
- if (!FindCommon::haveSpaceForNext(obj, *numResults, bb->len())) {
- exec->enqueue(obj);
- break;
- }
-
- // Add result to output buffer.
- bb->appendBuf((void*)obj.objdata(), obj.objsize());
-
- // Count the result.
- (*numResults)++;
-
- docUnitsReturned->observeOne(obj.objsize());
- }
- } catch (DBException& exception) {
- auto&& explainer = exec->getPlanExplainer();
- auto&& [stats, _] = explainer.getWinningPlanStats(ExplainOptions::Verbosity::kExecStats);
- LOGV2_ERROR(20918, "getMore executor error", "stats"_attr = redact(stats));
- exception.addContext("Executor error during OP_GET_MORE");
- throw;
- }
-}
-
-Message makeCursorNotFoundResponse() {
- const int initialBufSize = 512 + sizeof(QueryResult::Value);
- BufBuilder bb(initialBufSize);
- bb.skip(sizeof(QueryResult::Value));
- QueryResult::View qr = bb.buf();
- qr.msgdata().setLen(bb.len());
- qr.msgdata().setOperation(opReply);
- qr.setResultFlags(ResultFlag_CursorNotFound);
- qr.setCursorId(0);
- qr.setStartingFrom(0);
- qr.setNReturned(0);
- return Message(bb.release());
-}
-
-} // namespace
-
-/**
- * Called by db/instance.cpp. This is the getMore entry point.
- */
-Message getMore(OperationContext* opCtx,
- const char* ns,
- int ntoreturn,
- long long cursorid,
- bool* exhaust,
- bool* isCursorAuthorized) {
- invariant(ntoreturn >= 0);
-
- LOGV2_DEBUG(20909, 5, "Running getMore", "cursorId"_attr = cursorid);
-
- CurOp& curOp = *CurOp::get(opCtx);
- curOp.ensureStarted();
-
- // For testing, we may want to fail if we receive a getmore.
- if (MONGO_unlikely(failReceivedGetmore.shouldFail())) {
- MONGO_UNREACHABLE;
- }
-
- *exhaust = false;
-
- const NamespaceString nss(ns);
-
- ResourceConsumption::ScopedMetricsCollector scopedMetrics(opCtx, nss.db().toString());
-
- // Cursors come in one of two flavors:
- //
- // - Cursors which read from a single collection, such as those generated via the find command.
- // For these cursors, we hold the appropriate collection lock for the duration of the getMore
- // using AutoGetCollectionForRead. These cursors have the 'kLockExternally' lock policy.
- //
- // - Cursors which may read from many collections, e.g. those generated via the aggregate
- // command, or which do not read from a collection at all, e.g. those generated by the
- // listIndexes command. We don't need to acquire locks to use these cursors, since they either
- // manage locking themselves or don't access data protected by collection locks. These cursors
- // have the 'kLocksInternally' lock policy.
- //
- // While we only need to acquire locks for 'kLockExternally' cursors, we need to create an
- // AutoStatsTracker in either case. This is responsible for updating statistics in CurOp and
- // Top. We avoid using AutoGetCollectionForReadCommand because we may need to drop and reacquire
- // locks when the cursor is awaitData, but we don't want to update the stats twice.
- UninterruptibleLockGuard noInterrupt(opCtx->lockState());
- boost::optional<AutoGetCollectionForReadMaybeLockFree> readLock;
- boost::optional<AutoStatsTracker> statsTracker;
-
- // These are set in the QueryResult msg we return.
- int resultFlags = ResultFlag_AwaitCapable;
-
- auto cursorManager = CursorManager::get(opCtx);
- auto statusWithCursorPin = cursorManager->pinCursor(opCtx, cursorid);
- if (statusWithCursorPin == ErrorCodes::CursorNotFound) {
- return makeCursorNotFoundResponse();
- }
- uassertStatusOK(statusWithCursorPin.getStatus());
- auto cursorPin = std::move(statusWithCursorPin.getValue());
-
- // Set kMajorityCommitted before we instantiate readLock. We should not override readSource
- // after storage snapshot is setup.
- const auto replicationMode = repl::ReplicationCoordinator::get(opCtx)->getReplicationMode();
- if (replicationMode == repl::ReplicationCoordinator::modeReplSet &&
- cursorPin->getReadConcernArgs().getLevel() ==
- repl::ReadConcernLevel::kMajorityReadConcern) {
- opCtx->recoveryUnit()->setTimestampReadSource(RecoveryUnit::ReadSource::kMajorityCommitted);
- uassertStatusOK(opCtx->recoveryUnit()->majorityCommittedSnapshotAvailable());
- }
-
- opCtx->setExhaust(cursorPin->queryOptions() & QueryOption_Exhaust);
-
- if (cursorPin->getExecutor()->lockPolicy() == PlanExecutor::LockPolicy::kLocksInternally) {
- if (!nss.isCollectionlessCursorNamespace()) {
- AutoGetDb autoDb(opCtx, nss.db(), MODE_IS);
- statsTracker.emplace(opCtx,
- nss,
- Top::LockType::NotLocked,
- AutoStatsTracker::LogMode::kUpdateTopAndCurOp,
- CollectionCatalog::get(opCtx)->getDatabaseProfileLevel(nss.db()));
- auto view =
- autoDb.getDb() ? ViewCatalog::get(autoDb.getDb())->lookup(opCtx, nss) : nullptr;
- uassert(
- ErrorCodes::CommandNotSupportedOnView,
- str::stream() << "Namespace " << nss.ns()
- << " is a view. OP_GET_MORE operations are not supported on views. "
- << "Only clients which support the getMore command can be used to "
- "query views.",
- !view);
- }
- } else {
- readLock.emplace(opCtx, nss);
- statsTracker.emplace(opCtx,
- nss,
- Top::LockType::ReadLocked,
- AutoStatsTracker::LogMode::kUpdateTopAndCurOp,
- CollectionCatalog::get(opCtx)->getDatabaseProfileLevel(nss.db()));
-
- // This checks to make sure the operation is allowed on a replicated node. Since we are not
- // passing in a query object (necessary to check SlaveOK query option), we allow reads
- // whether we are PRIMARY or SECONDARY.
- uassertStatusOK(
- repl::ReplicationCoordinator::get(opCtx)->checkCanServeReadsFor(opCtx, nss, true));
- }
-
- std::uint64_t numResults = 0;
- int startingResult = 0;
- ResourceConsumption::DocumentUnitCounter docUnitsReturned;
-
- const int initialBufSize =
- 512 + sizeof(QueryResult::Value) + FindCommon::kMaxBytesToReturnToClientAtOnce;
-
- BufBuilder bb(initialBufSize);
- bb.skip(sizeof(QueryResult::Value));
-
- // Check for spoofing of the ns such that it does not match the one originally there for the
- // cursor.
- uassert(ErrorCodes::Unauthorized,
- str::stream() << "Requested getMore on namespace " << ns << ", but cursor " << cursorid
- << " belongs to namespace " << cursorPin->nss().ns(),
- nss == cursorPin->nss());
-
- // A user can only call getMore on their own cursor. If there were multiple users authenticated
- // when the cursor was created, then at least one of them must be authenticated in order to run
- // getMore on the cursor.
- uassert(ErrorCodes::Unauthorized,
- str::stream() << "cursor id " << cursorid
- << " was not created by the authenticated user",
- AuthorizationSession::get(opCtx->getClient())
- ->isCoauthorizedWith(cursorPin->getAuthenticatedUsers()));
-
- *isCursorAuthorized = true;
-
- // Only used by the failpoints.
- std::function<void()> dropAndReaquireReadLock = [&] {
- // Make sure an interrupted operation does not prevent us from reacquiring the lock.
- UninterruptibleLockGuard noInterrupt(opCtx->lockState());
-
- readLock.reset();
- readLock.emplace(opCtx, nss);
- };
-
- // On early return, get rid of the cursor.
- auto cursorFreer = makeGuard([&] { cursorPin.deleteUnderlying(); });
-
- // If the 'waitAfterPinningCursorBeforeGetMoreBatch' fail point is enabled, set the
- // 'msg' field of this operation's CurOp to signal that we've hit this point and then
- // repeatedly release and re-acquire the collection readLock at regular intervals until
- // the failpoint is released. This is done in order to avoid deadlocks caused by the
- // pinned-cursor failpoints in this file (see SERVER-21997).
- waitAfterPinningCursorBeforeGetMoreBatch.execute([&](const BSONObj& data) {
- if (data["shouldNotdropLock"].booleanSafe()) {
- dropAndReaquireReadLock = []() {};
- }
-
- CurOpFailpointHelpers::waitWhileFailPointEnabled(&waitAfterPinningCursorBeforeGetMoreBatch,
- opCtx,
- "waitAfterPinningCursorBeforeGetMoreBatch",
- dropAndReaquireReadLock,
- nss);
- });
-
- uassert(40548,
- "OP_GET_MORE operations are not supported on tailable aggregations. Only clients "
- "which support the getMore command can be used on tailable aggregations.",
- readLock || !cursorPin->isAwaitData());
- uassert(
- 31124,
- str::stream()
- << "OP_GET_MORE does not support cursors with a write concern other than the default."
- " Use the getMore command instead. Write concern was: "
- << cursorPin->getWriteConcernOptions().toBSON(),
- cursorPin->getWriteConcernOptions().isImplicitDefaultWriteConcern());
-
- // If the operation that spawned this cursor had a time limit set, apply leftover time to this
- // getmore.
- if (cursorPin->getLeftoverMaxTimeMicros() < Microseconds::max()) {
- uassert(40136,
- "Illegal attempt to set operation deadline within DBDirectClient",
- !opCtx->getClient()->isInDirectClient());
- opCtx->setDeadlineAfterNowBy(cursorPin->getLeftoverMaxTimeMicros(),
- ErrorCodes::MaxTimeMSExpired);
- }
- opCtx->checkForInterrupt(); // May trigger maxTimeAlwaysTimeOut fail point.
-
- // What number result are we starting at? Used to fill out the reply.
- startingResult = cursorPin->nReturnedSoFar();
-
- uint64_t notifierVersion = 0;
- std::shared_ptr<CappedInsertNotifier> notifier;
- if (cursorPin->isAwaitData()) {
- invariant(readLock->getCollection()->isCapped());
- // Retrieve the notifier which we will wait on until new data arrives. We make sure to do
- // this in the lock because once we drop the lock it is possible for the collection to
- // become invalid. The notifier itself will outlive the collection if the collection is
- // dropped, as we keep a shared_ptr to it.
- notifier = readLock->getCollection()->getCappedInsertNotifier();
-
- // Must get the version before we call generateBatch in case a write comes in after that
- // call and before we call wait on the notifier.
- notifierVersion = notifier->getVersion();
- }
-
- PlanExecutor* exec = cursorPin->getExecutor();
- exec->reattachToOperationContext(opCtx);
- exec->restoreState(readLock ? &readLock->getCollection() : nullptr);
-
- auto planSummary = exec->getPlanExplainer().getPlanSummary();
- {
- stdx::lock_guard<Client> lk(*opCtx->getClient());
- curOp.setPlanSummary_inlock(planSummary);
-
- // Ensure that the original query object is available in the slow query log, profiler and
- // currentOp. Upconvert _query to resemble a getMore command, and set the original command
- // or upconverted legacy query in the originatingCommand field.
- curOp.setOpDescription_inlock(upconvertGetMoreEntry(nss, cursorid, ntoreturn));
- curOp.setOriginatingCommand_inlock(cursorPin->getOriginatingCommandObj());
- // Update the generic cursor in curOp.
- curOp.setGenericCursor_inlock(cursorPin->toGenericCursor());
- }
-
- // If the 'failGetMoreAfterCursorCheckout' failpoint is enabled, throw an exception with the
- // specified 'errorCode' value, or ErrorCodes::InternalError if 'errorCode' is omitted.
- failGetMoreAfterCursorCheckout.executeIf(
- [](const BSONObj& data) {
- auto errorCode = (data["errorCode"] ? data["errorCode"].safeNumberLong()
- : ErrorCodes::InternalError);
- uasserted(errorCode, "Hit the 'failGetMoreAfterCursorCheckout' failpoint");
- },
- [&opCtx, &nss](const BSONObj& data) {
- auto dataForFailCommand =
- data.addField(BSON("failCommands" << BSON_ARRAY("getMore")).firstElement());
- auto* getMoreCommand = CommandHelpers::findCommand("getMore");
- return CommandHelpers::shouldActivateFailCommandFailPoint(
- dataForFailCommand, nss, getMoreCommand, opCtx->getClient());
- });
-
- PlanExecutor::ExecState state;
-
- // We report keysExamined and docsExamined to OpDebug for a given getMore operation. To obtain
- // these values we need to take a diff of the pre-execution and post-execution metrics, as they
- // accumulate over the course of a cursor's lifetime.
- PlanSummaryStats preExecutionStats;
- exec->getPlanExplainer().getSummaryStats(&preExecutionStats);
- if (MONGO_unlikely(waitWithPinnedCursorDuringGetMoreBatch.shouldFail())) {
- CurOpFailpointHelpers::waitWhileFailPointEnabled(&waitWithPinnedCursorDuringGetMoreBatch,
- opCtx,
- "waitWithPinnedCursorDuringGetMoreBatch",
- nullptr);
- }
-
- generateBatch(ntoreturn, cursorPin.getCursor(), &bb, &numResults, &docUnitsReturned, &state);
-
- // If this is an await data cursor, and we hit EOF without generating any results, then we block
- // waiting for new data to arrive.
- if (cursorPin->isAwaitData() && state == PlanExecutor::IS_EOF && numResults == 0) {
- // Save the PlanExecutor and drop our locks.
- exec->saveState();
- readLock.reset();
-
- // Block waiting for data for up to 1 second. Time spent blocking is not counted towards the
- // total operation latency.
- curOp.pauseTimer();
- Seconds timeout(1);
- notifier->waitUntil(notifierVersion,
- opCtx->getServiceContext()->getPreciseClockSource()->now() + timeout);
- notifier.reset();
- curOp.resumeTimer();
-
- // Reacquiring locks.
- readLock.emplace(opCtx, nss);
- exec->restoreState(&readLock->getCollection());
-
- // We woke up because either the timed_wait expired, or there was more data. Either way,
- // attempt to generate another batch of results.
- generateBatch(
- ntoreturn, cursorPin.getCursor(), &bb, &numResults, &docUnitsReturned, &state);
- }
-
- PlanSummaryStats postExecutionStats;
- auto&& explainer = exec->getPlanExplainer();
- explainer.getSummaryStats(&postExecutionStats);
- postExecutionStats.totalKeysExamined -= preExecutionStats.totalKeysExamined;
- postExecutionStats.totalDocsExamined -= preExecutionStats.totalDocsExamined;
- curOp.debug().setPlanSummaryMetrics(postExecutionStats);
-
- // We do not report 'execStats' for aggregation or other cursors with the 'kLocksInternally'
- // policy, both in the original request and subsequent getMore. It would be useful to have this
- // info for an aggregation, but the source PlanExecutor could be destroyed before we know if we
- // need 'execStats' and we do not want to generate the stats eagerly for all operations due to
- // cost.
- if (cursorPin->getExecutor()->lockPolicy() != PlanExecutor::LockPolicy::kLocksInternally &&
- curOp.shouldDBProfile(opCtx)) {
- auto&& [stats, _] = explainer.getWinningPlanStats(ExplainOptions::Verbosity::kExecStats);
-
- curOp.debug().execStats = std::move(stats);
- }
-
- // Our two possible ClientCursorPin cleanup paths are:
- // 1) If the cursor is not going to be saved, we call deleteUnderlying() on the pin.
- // 2) If the cursor is going to be saved, we simply let the pin go out of scope. In this case,
- // the pin's destructor will be invoked, which will call release() on the pin. Because our
- // ClientCursorPin is declared after our lock is declared, this will happen under the lock if
- // any locking was necessary.
- if (!shouldSaveCursorGetMore(exec, cursorPin->isTailable())) {
- // cc is now invalid, as is the executor
- cursorid = 0;
- curOp.debug().cursorExhausted = true;
-
- LOGV2_DEBUG(20910,
- 5,
- "getMore NOT saving client cursor",
- "planExecutorState"_attr = PlanExecutor::stateToStr(state));
- } else {
- cursorFreer.dismiss();
- // Continue caching the ClientCursor.
- cursorPin->incNReturnedSoFar(numResults);
- cursorPin->incNBatches();
- exec->saveState();
- exec->detachFromOperationContext();
- LOGV2_DEBUG(20911,
- 5,
- "getMore saving client cursor",
- "planExecutorState"_attr = PlanExecutor::stateToStr(state));
-
- // Set 'exhaust' if the client requested exhaust and the cursor is not exhausted.
- *exhaust = opCtx->isExhaust();
-
- // We assume that cursors created through a DBDirectClient are always used from their
- // original OperationContext, so we do not need to move time to and from the cursor.
- if (!opCtx->getClient()->isInDirectClient()) {
- // If the getmore had a time limit, remaining time is "rolled over" back to the cursor
- // (for use by future getmore ops).
- cursorPin->setLeftoverMaxTimeMicros(opCtx->getRemainingMaxTimeMicros());
- }
- }
-
- // We're about to unpin or delete the cursor as the ClientCursorPin goes out of scope.
- // If the 'waitBeforeUnpinningOrDeletingCursorAfterGetMoreBatch' failpoint is active, we
- // set the 'msg' field of this operation's CurOp to signal that we've hit this point and
- // then spin until the failpoint is released.
- if (MONGO_unlikely(waitBeforeUnpinningOrDeletingCursorAfterGetMoreBatch.shouldFail())) {
- CurOpFailpointHelpers::waitWhileFailPointEnabled(
- &waitBeforeUnpinningOrDeletingCursorAfterGetMoreBatch,
- opCtx,
- "waitBeforeUnpinningOrDeletingCursorAfterGetMoreBatch",
- dropAndReaquireReadLock);
- }
-
- // Increment this metric once the command succeeds and we know it will return documents.
- auto& metricsCollector = ResourceConsumption::MetricsCollector::get(opCtx);
- metricsCollector.incrementDocUnitsReturned(docUnitsReturned);
-
- QueryResult::View qr = bb.buf();
- qr.msgdata().setLen(bb.len());
- qr.msgdata().setOperation(opReply);
- qr.setResultFlags(resultFlags);
- qr.setCursorId(cursorid);
- qr.setStartingFrom(startingResult);
- qr.setNReturned(numResults);
- LOGV2_DEBUG(20912, 5, "getMore returned results", "numResults"_attr = numResults);
- return Message(bb.release());
-}
-
-bool runQuery(OperationContext* opCtx,
- QueryMessage& q,
- const NamespaceString& nss,
- Message& result) {
- CurOp& curOp = *CurOp::get(opCtx);
- curOp.ensureStarted();
-
- uassert(ErrorCodes::InvalidNamespace,
- str::stream() << "Invalid ns [" << nss.ns() << "]",
- nss.isValid());
- invariant(!nss.isCommand());
-
- ResourceConsumption::ScopedMetricsCollector scopedMetrics(opCtx, nss.db().toString());
-
- // Set CurOp information.
- const auto upconvertedQuery = upconvertQueryEntry(q.query, nss, q.ntoreturn, q.ntoskip);
-
- // Extract the 'comment' parameter from the upconverted query, if it exists.
- if (auto commentField = upconvertedQuery["comment"]) {
- opCtx->setComment(commentField.wrap());
- }
-
- beginQueryOp(opCtx, nss, upconvertedQuery, q.ntoreturn, q.ntoskip);
-
- // Parse the qm into a CanonicalQuery.
- const boost::intrusive_ptr<ExpressionContext> expCtx =
- make_intrusive<ExpressionContext>(opCtx, nullptr /* collator */, nss);
- auto cq = uassertStatusOKWithContext(
- CanonicalQuery::canonicalize(opCtx,
- q,
- expCtx,
- ExtensionsCallbackReal(opCtx, &nss),
- MatchExpressionParser::kAllowAllSpecialFeatures),
- "Can't canonicalize query");
- invariant(cq.get());
-
- LOGV2_DEBUG(20913, 5, "Running query", "query"_attr = redact(cq->toString()));
- LOGV2_DEBUG(20914, 2, "Running query", "query"_attr = redact(cq->toStringShort()));
-
- // Parse, canonicalize, plan, transcribe, and get a plan executor.
- AutoGetCollectionForReadCommandMaybeLockFree collection(
- opCtx, nss, AutoGetCollectionViewMode::kViewsForbidden);
-
- const bool isExhaust = (q.queryOptions & QueryOption_Exhaust) != 0;
- opCtx->setExhaust(isExhaust);
-
- {
- // Allow the query to run on secondaries if the read preference permits it. If no read
- // preference was specified, allow the query to run iff slaveOk has been set.
- const bool isSecondaryOk = (q.queryOptions & QueryOption_SecondaryOk) != 0;
- const bool hasReadPref = q.query.hasField(query_request_helper::kWrappedReadPrefField);
- const bool secondaryOk = hasReadPref
- ? uassertStatusOK(ReadPreferenceSetting::fromContainingBSON(q.query))
- .canRunOnSecondary()
- : isSecondaryOk;
- uassertStatusOK(repl::ReplicationCoordinator::get(opCtx)->checkCanServeReadsFor(
- opCtx, nss, secondaryOk));
- }
-
- const FindCommandRequest& findCommand = cq->getFindCommandRequest();
- // Get the execution plan for the query.
- constexpr auto verbosity = ExplainOptions::Verbosity::kExecAllPlans;
- const bool isExplain = cq->getExplain();
- expCtx->explain = isExplain ? boost::make_optional(verbosity) : boost::none;
- auto exec =
- uassertStatusOK(getExecutorLegacyFind(opCtx, &collection.getCollection(), std::move(cq)));
-
- // If it's actually an explain, do the explain and return rather than falling through
- // to the normal query execution loop.
- if (isExplain) {
- BufBuilder bb;
- bb.skip(sizeof(QueryResult::Value));
-
- BSONObjBuilder explainBob;
- Explain::explainStages(exec.get(),
- collection.getCollection(),
- verbosity,
- BSONObj(),
- upconvertedQuery,
- &explainBob);
-
- // Add the resulting object to the return buffer.
- BSONObj explainObj = explainBob.obj();
- bb.appendBuf((void*)explainObj.objdata(), explainObj.objsize());
-
- // Set query result fields.
- QueryResult::View qr = bb.buf();
- qr.setResultFlagsToOk();
- qr.msgdata().setLen(bb.len());
- curOp.debug().responseLength = bb.len();
- qr.msgdata().setOperation(opReply);
- qr.setCursorId(0);
- qr.setStartingFrom(0);
- qr.setNReturned(1);
- result.setData(bb.release());
- return false;
- }
-
- int maxTimeMS = findCommand.getMaxTimeMS() ? static_cast<int>(*findCommand.getMaxTimeMS()) : 0;
- // Handle query option $maxTimeMS (not used with commands).
- if (maxTimeMS > 0) {
- uassert(40116,
- "Illegal attempt to set operation deadline within DBDirectClient",
- !opCtx->getClient()->isInDirectClient());
- opCtx->setDeadlineAfterNowBy(Milliseconds{maxTimeMS}, ErrorCodes::MaxTimeMSExpired);
- }
- opCtx->checkForInterrupt(); // May trigger maxTimeAlwaysTimeOut fail point.
-
- FindCommon::waitInFindBeforeMakingBatch(opCtx, *exec->getCanonicalQuery());
-
- // Run the query.
- // bb is used to hold query results
- // this buffer should contain either requested documents per query or
- // explain information, but not both
- BufBuilder bb(FindCommon::kInitReplyBufferSize);
- bb.skip(sizeof(QueryResult::Value));
-
- // How many results have we obtained from the executor?
- int numResults = 0;
- ResourceConsumption::DocumentUnitCounter docUnitsReturned;
-
- BSONObj obj;
- PlanExecutor::ExecState state;
-
- // Get summary info about which plan the executor is using.
- {
- stdx::lock_guard<Client> lk(*opCtx->getClient());
- curOp.setPlanSummary_inlock(exec->getPlanExplainer().getPlanSummary());
- }
-
- try {
- while (PlanExecutor::ADVANCED == (state = exec->getNext(&obj, nullptr))) {
- // If we can't fit this result inside the current batch, then we stash it for later.
- if (!FindCommon::haveSpaceForNext(obj, numResults, bb.len())) {
- exec->enqueue(obj);
- break;
- }
-
- // Add result to output buffer.
- bb.appendBuf((void*)obj.objdata(), obj.objsize());
-
- // Count the result.
- ++numResults;
-
- docUnitsReturned.observeOne(obj.objsize());
-
- if (FindCommon::enoughForFirstBatch(findCommand, numResults)) {
- LOGV2_DEBUG(20915,
- 5,
- "Enough for first batch",
- "wantMore"_attr = !findCommand.getSingleBatch(),
- "numToReturn"_attr = findCommand.getNtoreturn().value_or(0),
- "numResults"_attr = numResults);
- break;
- }
- }
- } catch (DBException& exception) {
- auto&& explainer = exec->getPlanExplainer();
- auto&& [stats, _] = explainer.getWinningPlanStats(ExplainOptions::Verbosity::kExecStats);
- LOGV2_ERROR(20919,
- "Plan executor error during find",
- "error"_attr = redact(exception.toStatus()),
- "stats"_attr = redact(stats));
-
- exception.addContext("Executor error during find");
- throw;
- }
-
- // Fill out CurOp based on query results. If we have a cursorid, we will fill out CurOp with
- // this cursorid later.
- long long ccId = 0;
-
- if (shouldSaveCursor(opCtx, collection.getCollection(), state, exec.get())) {
- // We won't use the executor until it's getMore'd.
- exec->saveState();
- exec->detachFromOperationContext();
-
- const auto& readConcernArgs = repl::ReadConcernArgs::get(opCtx);
- // Allocate a new ClientCursor and register it with the cursor manager.
- ClientCursorPin pinnedCursor = CursorManager::get(opCtx)->registerCursor(
- opCtx,
- {std::move(exec),
- nss,
- AuthorizationSession::get(opCtx->getClient())->getAuthenticatedUserNames(),
- APIParameters::get(opCtx),
- opCtx->getWriteConcern(),
- readConcernArgs,
- upconvertedQuery,
- {Privilege(ResourcePattern::forExactNamespace(nss), ActionType::find)}});
- ccId = pinnedCursor.getCursor()->cursorid();
-
- LOGV2_DEBUG(20916,
- 5,
- "Caching executor after returning results",
- "cursorId"_attr = ccId,
- "numResults"_attr = numResults);
-
- // Set curOp.debug().exhaust if the client requested exhaust and the cursor is not
- // exhausted.
- if (opCtx->isExhaust()) {
- curOp.debug().exhaust = true;
- }
-
- pinnedCursor.getCursor()->setNReturnedSoFar(numResults);
- pinnedCursor.getCursor()->incNBatches();
-
- // We assume that cursors created through a DBDirectClient are always used from their
- // original OperationContext, so we do not need to move time to and from the cursor.
- if (!opCtx->getClient()->isInDirectClient()) {
- // If the query had a time limit, remaining time is "rolled over" to the cursor (for
- // use by future getmore ops).
- pinnedCursor.getCursor()->setLeftoverMaxTimeMicros(opCtx->getRemainingMaxTimeMicros());
- }
-
- endQueryOp(opCtx,
- collection.getCollection(),
- *pinnedCursor.getCursor()->getExecutor(),
- numResults,
- ccId);
- } else {
- LOGV2_DEBUG(
- 20917, 5, "Not caching executor but returning results", "numResults"_attr = numResults);
- endQueryOp(opCtx, collection.getCollection(), *exec, numResults, ccId);
- }
-
- // Increment this metric once it has succeeded and we know it will return documents.
- auto& metricsCollector = ResourceConsumption::MetricsCollector::get(opCtx);
- metricsCollector.incrementDocUnitsReturned(docUnitsReturned);
-
- // Fill out the output buffer's header.
- QueryResult::View queryResultView = bb.buf();
- queryResultView.setCursorId(ccId);
- queryResultView.setResultFlagsToOk();
- queryResultView.msgdata().setLen(bb.len());
- queryResultView.msgdata().setOperation(opReply);
- queryResultView.setStartingFrom(0);
- queryResultView.setNReturned(numResults);
-
- // Add the results from the query into the output buffer.
- result.setData(bb.release());
-
- // curOp.debug().exhaust is set above if the client requested exhaust and the cursor is not
- // exhausted.
- return curOp.debug().exhaust;
-}
-
} // namespace mongo