diff options
author | David Storch <david.storch@10gen.com> | 2018-11-28 17:25:24 -0500 |
---|---|---|
committer | David Storch <david.storch@10gen.com> | 2019-01-15 17:54:33 -0500 |
commit | de2a803ca492261cac1d7f43a9f7c847cd0ea24d (patch) | |
tree | 03cb6ea2b304463e7458f557246a95978d1ef96a /src/mongo/db/query | |
parent | af8fa6034f8a989cb47ee890c6a6b3e87e1bcf7b (diff) | |
download | mongo-de2a803ca492261cac1d7f43a9f7c847cd0ea24d.tar.gz |
SERVER-37451 Move all ClientCursor ownership to the global CursorManager.
Deleting the per-collection CursorManagers, and other
related cleanup, is left as future work.
Diffstat (limited to 'src/mongo/db/query')
-rw-r--r-- | src/mongo/db/query/find.cpp | 428 |
1 file changed, 211 insertions, 217 deletions
diff --git a/src/mongo/db/query/find.cpp b/src/mongo/db/query/find.cpp index a4ad2ad5d91..765e79a7183 100644 --- a/src/mongo/db/query/find.cpp +++ b/src/mongo/db/query/find.cpp @@ -215,6 +215,20 @@ void generateBatch(int ntoreturn, MONGO_UNREACHABLE; } +Message makeCursorNotFoundResponse() { + const int initialBufSize = 512 + sizeof(QueryResult::Value); + BufBuilder bb(initialBufSize); + bb.skip(sizeof(QueryResult::Value)); + QueryResult::View qr = bb.buf(); + qr.msgdata().setLen(bb.len()); + qr.msgdata().setOperation(opReply); + qr.setResultFlags(ResultFlag_CursorNotFound); + qr.setCursorId(0); + qr.setStartingFrom(0); + qr.setNReturned(0); + return Message(bb.release()); +} + } // namespace /** @@ -228,6 +242,8 @@ Message getMore(OperationContext* opCtx, bool* isCursorAuthorized) { invariant(ntoreturn >= 0); + LOG(5) << "Running getMore, cursorid: " << cursorid; + CurOp& curOp = *CurOp::get(opCtx); curOp.ensureStarted(); @@ -241,48 +257,52 @@ Message getMore(OperationContext* opCtx, const NamespaceString nss(ns); // Cursors come in one of two flavors: - // - Cursors owned by the collection cursor manager, such as those generated via the find - // command. For these cursors, we hold the appropriate collection lock for the duration of the - // getMore using AutoGetCollectionForRead. - // - Cursors owned by the global cursor manager, such as those generated via the aggregate - // command. These cursors either hold no collection state or manage their collection state - // internally, so we acquire no locks. // - // While we only need to acquire locks in the case of a cursor which is *not* globally owned, we - // need to create an AutoStatsTracker in either case. This is responsible for updating - // statistics in CurOp and Top. We avoid using AutoGetCollectionForReadCommand because we may - // need to drop and reacquire locks when the cursor is awaitData, but we don't want to update - // the stats twice. 
+ // - Cursors which read from a single collection, such as those generated via the find command. + // For these cursors, we hold the appropriate collection lock for the duration of the getMore + // using AutoGetCollectionForRead. These cursors have the 'kLockExternally' lock policy. + // + // - Cursors which may read from many collections, e.g. those generated via the aggregate + // command, or which do not read from a collection at all, e.g. those generated by the + // listIndexes command. We don't need to acquire locks to use these cursors, since they either + // manage locking themselves or don't access data protected by collection locks. These cursors + // have the 'kLocksInternally' lock policy. // - // Note that we acquire our locks before our ClientCursorPin, in order to ensure that the pin's - // destructor is called before the lock's destructor (if there is one) so that the cursor - // cleanup can occur under the lock. + // While we only need to acquire locks for 'kLockExternally' cursors, we need to create an + // AutoStatsTracker in either case. This is responsible for updating statistics in CurOp and + // Top. We avoid using AutoGetCollectionForReadCommand because we may need to drop and reacquire + // locks when the cursor is awaitData, but we don't want to update the stats twice. UninterruptibleLockGuard noInterrupt(opCtx->lockState()); boost::optional<AutoGetCollectionForRead> readLock; boost::optional<AutoStatsTracker> statsTracker; - CursorManager* cursorManager; - if (CursorManager::isGloballyManagedCursor(cursorid)) { - cursorManager = CursorManager::getGlobalCursorManager(); + // These are set in the QueryResult msg we return. + int resultFlags = ResultFlag_AwaitCapable; - if (boost::optional<NamespaceString> nssForCurOp = nss.isGloballyManagedNamespace() - ? 
nss.getTargetNSForGloballyManagedNamespace() - : nss) { - AutoGetDb autoDb(opCtx, nssForCurOp->db(), MODE_IS); + auto cursorManager = CursorManager::getGlobalCursorManager(); + auto statusWithCursorPin = cursorManager->pinCursor(opCtx, cursorid); + if (statusWithCursorPin == ErrorCodes::CursorNotFound) { + return makeCursorNotFoundResponse(); + } + uassertStatusOK(statusWithCursorPin.getStatus()); + auto cursorPin = std::move(statusWithCursorPin.getValue()); + + if (cursorPin->lockPolicy() == ClientCursorParams::LockPolicy::kLocksInternally) { + if (!nss.isCollectionlessCursorNamespace()) { + AutoGetDb autoDb(opCtx, nss.db(), MODE_IS); const auto profilingLevel = autoDb.getDb() ? boost::optional<int>{autoDb.getDb()->getProfilingLevel()} : boost::none; statsTracker.emplace(opCtx, - *nssForCurOp, + nss, Top::LockType::NotLocked, AutoStatsTracker::LogMode::kUpdateTopAndCurop, profilingLevel); - auto view = autoDb.getDb() - ? autoDb.getDb()->getViewCatalog()->lookup(opCtx, nssForCurOp->ns()) - : nullptr; + auto view = autoDb.getDb() ? autoDb.getDb()->getViewCatalog()->lookup(opCtx, nss.ns()) + : nullptr; uassert( ErrorCodes::CommandNotSupportedOnView, - str::stream() << "Namespace " << nssForCurOp->ns() + str::stream() << "Namespace " << nss.ns() << " is a view. OP_GET_MORE operations are not supported on views. " << "Only clients which support the getMore command can be used to " "query views.", @@ -297,10 +317,6 @@ Message getMore(OperationContext* opCtx, AutoStatsTracker::LogMode::kUpdateTopAndCurop, readLock->getDb() ? readLock->getDb()->getProfilingLevel() : doNotChangeProfilingLevel); - Collection* collection = readLock->getCollection(); - uassert( - ErrorCodes::OperationFailed, "collection dropped between getMore calls", collection); - cursorManager = collection->getCursorManager(); // This checks to make sure the operation is allowed on a replicated node. 
Since we are not // passing in a query object (necessary to check SlaveOK query option), we allow reads @@ -309,206 +325,183 @@ Message getMore(OperationContext* opCtx, repl::ReplicationCoordinator::get(opCtx)->checkCanServeReadsFor(opCtx, nss, true)); } - LOG(5) << "Running getMore, cursorid: " << cursorid; - - // A pin performs a CC lookup and if there is a CC, increments the CC's pin value so it - // doesn't time out. Also informs ClientCursor that there is somebody actively holding the - // CC, so don't delete it. - auto ccPin = cursorManager->pinCursor(opCtx, cursorid); - - // These are set in the QueryResult msg we return. - int resultFlags = ResultFlag_AwaitCapable; - std::uint64_t numResults = 0; int startingResult = 0; - const int InitialBufSize = + const int initialBufSize = 512 + sizeof(QueryResult::Value) + FindCommon::kMaxBytesToReturnToClientAtOnce; - BufBuilder bb(InitialBufSize); + BufBuilder bb(initialBufSize); bb.skip(sizeof(QueryResult::Value)); - if (!ccPin.isOK()) { - if (ccPin == ErrorCodes::CursorNotFound) { - cursorid = 0; - resultFlags = ResultFlag_CursorNotFound; - } else { - uassertStatusOK(ccPin.getStatus()); - } - } else { - ClientCursor* cc = ccPin.getValue().getCursor(); - - // Check for spoofing of the ns such that it does not match the one originally - // there for the cursor. - uassert(ErrorCodes::Unauthorized, - str::stream() << "Requested getMore on namespace " << ns << ", but cursor " - << cursorid - << " belongs to namespace " - << cc->nss().ns(), - nss == cc->nss()); - - // A user can only call getMore on their own cursor. If there were multiple users - // authenticated when the cursor was created, then at least one of them must be - // authenticated in order to run getMore on the cursor. 
- uassert(ErrorCodes::Unauthorized, - str::stream() << "cursor id " << cursorid - << " was not created by the authenticated user", - AuthorizationSession::get(opCtx->getClient()) - ->isCoauthorizedWith(cc->getAuthenticatedUsers())); - - *isCursorAuthorized = true; - - const auto replicationMode = repl::ReplicationCoordinator::get(opCtx)->getReplicationMode(); - - if (replicationMode == repl::ReplicationCoordinator::modeReplSet && - cc->getReadConcernArgs().getLevel() == repl::ReadConcernLevel::kMajorityReadConcern) { - opCtx->recoveryUnit()->setTimestampReadSource( - RecoveryUnit::ReadSource::kMajorityCommitted); - uassertStatusOK(opCtx->recoveryUnit()->obtainMajorityCommittedSnapshot()); - } + // Check for spoofing of the ns such that it does not match the one originally there for the + // cursor. + uassert(ErrorCodes::Unauthorized, + str::stream() << "Requested getMore on namespace " << ns << ", but cursor " << cursorid + << " belongs to namespace " + << cursorPin->nss().ns(), + nss == cursorPin->nss()); + + // A user can only call getMore on their own cursor. If there were multiple users authenticated + // when the cursor was created, then at least one of them must be authenticated in order to run + // getMore on the cursor. 
+ uassert(ErrorCodes::Unauthorized, + str::stream() << "cursor id " << cursorid + << " was not created by the authenticated user", + AuthorizationSession::get(opCtx->getClient()) + ->isCoauthorizedWith(cursorPin->getAuthenticatedUsers())); + + *isCursorAuthorized = true; + + const auto replicationMode = repl::ReplicationCoordinator::get(opCtx)->getReplicationMode(); + + if (replicationMode == repl::ReplicationCoordinator::modeReplSet && + cursorPin->getReadConcernArgs().getLevel() == + repl::ReadConcernLevel::kMajorityReadConcern) { + opCtx->recoveryUnit()->setTimestampReadSource(RecoveryUnit::ReadSource::kMajorityCommitted); + uassertStatusOK(opCtx->recoveryUnit()->obtainMajorityCommittedSnapshot()); + } - uassert(40548, - "OP_GET_MORE operations are not supported on tailable aggregations. Only clients " - "which support the getMore command can be used on tailable aggregations.", - readLock || !cc->isAwaitData()); - - // If the operation that spawned this cursor had a time limit set, apply leftover - // time to this getmore. - if (cc->getLeftoverMaxTimeMicros() < Microseconds::max()) { - uassert(40136, - "Illegal attempt to set operation deadline within DBDirectClient", - !opCtx->getClient()->isInDirectClient()); - opCtx->setDeadlineAfterNowBy(cc->getLeftoverMaxTimeMicros(), - ErrorCodes::MaxTimeMSExpired); - } - opCtx->checkForInterrupt(); // May trigger maxTimeAlwaysTimeOut fail point. - - // What number result are we starting at? Used to fill out the reply. - startingResult = cc->nReturnedSoFar(); - - uint64_t notifierVersion = 0; - std::shared_ptr<CappedInsertNotifier> notifier; - if (cc->isAwaitData()) { - invariant(readLock->getCollection()->isCapped()); - // Retrieve the notifier which we will wait on until new data arrives. We make sure - // to do this in the lock because once we drop the lock it is possible for the - // collection to become invalid. 
The notifier itself will outlive the collection if - // the collection is dropped, as we keep a shared_ptr to it. - notifier = readLock->getCollection()->getCappedInsertNotifier(); - - // Must get the version before we call generateBatch in case a write comes in after - // that call and before we call wait on the notifier. - notifierVersion = notifier->getVersion(); - } + uassert(40548, + "OP_GET_MORE operations are not supported on tailable aggregations. Only clients " + "which support the getMore command can be used on tailable aggregations.", + readLock || !cursorPin->isAwaitData()); - PlanExecutor* exec = cc->getExecutor(); - exec->reattachToOperationContext(opCtx); - exec->restoreState(); + // If the operation that spawned this cursor had a time limit set, apply leftover time to this + // getmore. + if (cursorPin->getLeftoverMaxTimeMicros() < Microseconds::max()) { + uassert(40136, + "Illegal attempt to set operation deadline within DBDirectClient", + !opCtx->getClient()->isInDirectClient()); + opCtx->setDeadlineAfterNowBy(cursorPin->getLeftoverMaxTimeMicros(), + ErrorCodes::MaxTimeMSExpired); + } + opCtx->checkForInterrupt(); // May trigger maxTimeAlwaysTimeOut fail point. - auto planSummary = Explain::getPlanSummary(exec); - { - stdx::lock_guard<Client> lk(*opCtx->getClient()); - curOp.setPlanSummary_inlock(planSummary); - - // Ensure that the original query object is available in the slow query log, profiler - // and currentOp. Upconvert _query to resemble a getMore command, and set the original - // command or upconverted legacy query in the originatingCommand field. - curOp.setOpDescription_inlock(upconvertGetMoreEntry(nss, cursorid, ntoreturn)); - curOp.setOriginatingCommand_inlock(cc->getOriginatingCommandObj()); - // Update the generic cursor in curOp. - curOp.setGenericCursor_inlock(cc->toGenericCursor()); - } + // What number result are we starting at? Used to fill out the reply. 
+ startingResult = cursorPin->nReturnedSoFar(); + + uint64_t notifierVersion = 0; + std::shared_ptr<CappedInsertNotifier> notifier; + if (cursorPin->isAwaitData()) { + invariant(readLock->getCollection()->isCapped()); + // Retrieve the notifier which we will wait on until new data arrives. We make sure to do + // this in the lock because once we drop the lock it is possible for the collection to + // become invalid. The notifier itself will outlive the collection if the collection is + // dropped, as we keep a shared_ptr to it. + notifier = readLock->getCollection()->getCappedInsertNotifier(); + + // Must get the version before we call generateBatch in case a write comes in after that + // call and before we call wait on the notifier. + notifierVersion = notifier->getVersion(); + } - PlanExecutor::ExecState state; + PlanExecutor* exec = cursorPin->getExecutor(); + exec->reattachToOperationContext(opCtx); + exec->restoreState(); - // We report keysExamined and docsExamined to OpDebug for a given getMore operation. To - // obtain these values we need to take a diff of the pre-execution and post-execution - // metrics, as they accumulate over the course of a cursor's lifetime. - PlanSummaryStats preExecutionStats; - Explain::getSummaryStats(*exec, &preExecutionStats); - if (MONGO_FAIL_POINT(legacyGetMoreWaitWithCursor)) { - CurOpFailpointHelpers::waitWhileFailPointEnabled( - &legacyGetMoreWaitWithCursor, opCtx, "legacyGetMoreWaitWithCursor", nullptr); - } + auto planSummary = Explain::getPlanSummary(exec); + { + stdx::lock_guard<Client> lk(*opCtx->getClient()); + curOp.setPlanSummary_inlock(planSummary); + + // Ensure that the original query object is available in the slow query log, profiler and + // currentOp. Upconvert _query to resemble a getMore command, and set the original command + // or upconverted legacy query in the originatingCommand field. 
+ curOp.setOpDescription_inlock(upconvertGetMoreEntry(nss, cursorid, ntoreturn)); + curOp.setOriginatingCommand_inlock(cursorPin->getOriginatingCommandObj()); + // Update the generic cursor in curOp. + curOp.setGenericCursor_inlock(cursorPin->toGenericCursor()); + } - generateBatch(ntoreturn, cc, &bb, &numResults, &state); - - // If this is an await data cursor, and we hit EOF without generating any results, then - // we block waiting for new data to arrive. - if (cc->isAwaitData() && state == PlanExecutor::IS_EOF && numResults == 0) { - // Save the PlanExecutor and drop our locks. - exec->saveState(); - readLock.reset(); - - // Block waiting for data for up to 1 second. Time spent blocking is not counted towards - // the total operation latency. - curOp.pauseTimer(); - Seconds timeout(1); - notifier->waitUntil(notifierVersion, - opCtx->getServiceContext()->getPreciseClockSource()->now() + - timeout); - notifier.reset(); - curOp.resumeTimer(); - - // Reacquiring locks. - readLock.emplace(opCtx, nss); - exec->restoreState(); - - // We woke up because either the timed_wait expired, or there was more data. Either - // way, attempt to generate another batch of results. - generateBatch(ntoreturn, cc, &bb, &numResults, &state); - } + PlanExecutor::ExecState state; - PlanSummaryStats postExecutionStats; - Explain::getSummaryStats(*exec, &postExecutionStats); - postExecutionStats.totalKeysExamined -= preExecutionStats.totalKeysExamined; - postExecutionStats.totalDocsExamined -= preExecutionStats.totalDocsExamined; - curOp.debug().setPlanSummaryMetrics(postExecutionStats); - - // We do not report 'execStats' for aggregation or other globally managed cursors, both in - // the original request and subsequent getMore. It would be useful to have this information - // for an aggregation, but the source PlanExecutor could be destroyed before we know whether - // we need execStats and we do not want to generate for all operations due to cost. 
- if (!CursorManager::isGloballyManagedCursor(cursorid) && curOp.shouldDBProfile()) { - BSONObjBuilder execStatsBob; - Explain::getWinningPlanStats(exec, &execStatsBob); - curOp.debug().execStats = execStatsBob.obj(); - } + // We report keysExamined and docsExamined to OpDebug for a given getMore operation. To obtain + // these values we need to take a diff of the pre-execution and post-execution metrics, as they + // accumulate over the course of a cursor's lifetime. + PlanSummaryStats preExecutionStats; + Explain::getSummaryStats(*exec, &preExecutionStats); + if (MONGO_FAIL_POINT(legacyGetMoreWaitWithCursor)) { + CurOpFailpointHelpers::waitWhileFailPointEnabled( + &legacyGetMoreWaitWithCursor, opCtx, "legacyGetMoreWaitWithCursor", nullptr); + } - // Our two possible ClientCursorPin cleanup paths are: - // 1) If the cursor is not going to be saved, we call deleteUnderlying() on the pin. - // 2) If the cursor is going to be saved, we simply let the pin go out of scope. In this - // case, the pin's destructor will be invoked, which will call release() on the pin. - // Because our ClientCursorPin is declared after our lock is declared, this will happen - // under the lock if any locking was necessary. - if (!shouldSaveCursorGetMore(state, exec, cc->isTailable())) { - ccPin.getValue().deleteUnderlying(); - - // cc is now invalid, as is the executor - cursorid = 0; - cc = nullptr; - curOp.debug().cursorExhausted = true; - - LOG(5) << "getMore NOT saving client cursor, ended with state " - << PlanExecutor::statestr(state); - } else { - // Continue caching the ClientCursor. 
- cc->incNReturnedSoFar(numResults); - cc->incNBatches(); - exec->saveState(); - exec->detachFromOperationContext(); - LOG(5) << "getMore saving client cursor ended with state " - << PlanExecutor::statestr(state); - - *exhaust = cc->queryOptions() & QueryOption_Exhaust; - - // We assume that cursors created through a DBDirectClient are always used from their - // original OperationContext, so we do not need to move time to and from the cursor. - if (!opCtx->getClient()->isInDirectClient()) { - // If the getmore had a time limit, remaining time is "rolled over" back to the - // cursor (for use by future getmore ops). - cc->setLeftoverMaxTimeMicros(opCtx->getRemainingMaxTimeMicros()); - } + generateBatch(ntoreturn, cursorPin.getCursor(), &bb, &numResults, &state); + + // If this is an await data cursor, and we hit EOF without generating any results, then we block + // waiting for new data to arrive. + if (cursorPin->isAwaitData() && state == PlanExecutor::IS_EOF && numResults == 0) { + // Save the PlanExecutor and drop our locks. + exec->saveState(); + readLock.reset(); + + // Block waiting for data for up to 1 second. Time spent blocking is not counted towards the + // total operation latency. + curOp.pauseTimer(); + Seconds timeout(1); + notifier->waitUntil(notifierVersion, + opCtx->getServiceContext()->getPreciseClockSource()->now() + timeout); + notifier.reset(); + curOp.resumeTimer(); + + // Reacquiring locks. + readLock.emplace(opCtx, nss); + exec->restoreState(); + + // We woke up because either the timed_wait expired, or there was more data. Either way, + // attempt to generate another batch of results. 
+ generateBatch(ntoreturn, cursorPin.getCursor(), &bb, &numResults, &state); + } + + PlanSummaryStats postExecutionStats; + Explain::getSummaryStats(*exec, &postExecutionStats); + postExecutionStats.totalKeysExamined -= preExecutionStats.totalKeysExamined; + postExecutionStats.totalDocsExamined -= preExecutionStats.totalDocsExamined; + curOp.debug().setPlanSummaryMetrics(postExecutionStats); + + // We do not report 'execStats' for aggregation or other cursors with the 'kLocksInternally' + // policy, both in the original request and subsequent getMore. It would be useful to have this + // info for an aggregation, but the source PlanExecutor could be destroyed before we know if we + // need 'execStats' and we do not want to generate the stats eagerly for all operations due to + // cost. + if (cursorPin->lockPolicy() != ClientCursorParams::LockPolicy::kLocksInternally && + curOp.shouldDBProfile()) { + BSONObjBuilder execStatsBob; + Explain::getWinningPlanStats(exec, &execStatsBob); + curOp.debug().execStats = execStatsBob.obj(); + } + + // Our two possible ClientCursorPin cleanup paths are: + // 1) If the cursor is not going to be saved, we call deleteUnderlying() on the pin. + // 2) If the cursor is going to be saved, we simply let the pin go out of scope. In this case, + // the pin's destructor will be invoked, which will call release() on the pin. Because our + // ClientCursorPin is declared after our lock is declared, this will happen under the lock if + // any locking was necessary. + if (!shouldSaveCursorGetMore(state, exec, cursorPin->isTailable())) { + cursorPin.deleteUnderlying(); + + // cc is now invalid, as is the executor + cursorid = 0; + curOp.debug().cursorExhausted = true; + + LOG(5) << "getMore NOT saving client cursor, ended with state " + << PlanExecutor::statestr(state); + } else { + // Continue caching the ClientCursor. 
+ cursorPin->incNReturnedSoFar(numResults); + cursorPin->incNBatches(); + exec->saveState(); + exec->detachFromOperationContext(); + LOG(5) << "getMore saving client cursor ended with state " << PlanExecutor::statestr(state); + + *exhaust = cursorPin->queryOptions() & QueryOption_Exhaust; + + // We assume that cursors created through a DBDirectClient are always used from their + // original OperationContext, so we do not need to move time to and from the cursor. + if (!opCtx->getClient()->isInDirectClient()) { + // If the getmore had a time limit, remaining time is "rolled over" back to the cursor + // (for use by future getmore ops). + cursorPin->setLeftoverMaxTimeMicros(opCtx->getRemainingMaxTimeMicros()); } } @@ -676,13 +669,14 @@ std::string runQuery(OperationContext* opCtx, const auto& readConcernArgs = repl::ReadConcernArgs::get(opCtx); // Allocate a new ClientCursor and register it with the cursor manager. - ClientCursorPin pinnedCursor = collection->getCursorManager()->registerCursor( + ClientCursorPin pinnedCursor = CursorManager::getGlobalCursorManager()->registerCursor( opCtx, {std::move(exec), nss, AuthorizationSession::get(opCtx->getClient())->getAuthenticatedUserNames(), readConcernArgs, - upconvertedQuery}); + upconvertedQuery, + ClientCursorParams::LockPolicy::kLockExternally}); ccId = pinnedCursor.getCursor()->cursorid(); LOG(5) << "caching executor with cursorid " << ccId << " after returning " << numResults |