diff options
Diffstat (limited to 'src/mongo/db/query/find.cpp')
-rw-r--r-- | src/mongo/db/query/find.cpp | 83 |
1 files changed, 42 insertions, 41 deletions
diff --git a/src/mongo/db/query/find.cpp b/src/mongo/db/query/find.cpp index f60bbe33ae5..d8c9feb8fde 100644 --- a/src/mongo/db/query/find.cpp +++ b/src/mongo/db/query/find.cpp @@ -80,7 +80,7 @@ bool isCursorAwaitData(const ClientCursor* cursor) { return cursor->queryOptions() & QueryOption_AwaitData; } -bool shouldSaveCursor(OperationContext* txn, +bool shouldSaveCursor(OperationContext* opCtx, const Collection* collection, PlanExecutor::ExecState finalState, PlanExecutor* exec) { @@ -100,7 +100,7 @@ bool shouldSaveCursor(OperationContext* txn, // an empty collection. Right now we do not keep a cursor if the collection // has zero records. if (qr.isTailable()) { - return collection && collection->numRecords(txn) != 0U; + return collection && collection->numRecords(opCtx) != 0U; } return !exec->isEOF(); @@ -120,25 +120,25 @@ bool shouldSaveCursorGetMore(PlanExecutor::ExecState finalState, return !exec->isEOF(); } -void beginQueryOp(OperationContext* txn, +void beginQueryOp(OperationContext* opCtx, const NamespaceString& nss, const BSONObj& queryObj, long long ntoreturn, long long ntoskip) { - auto curOp = CurOp::get(txn); + auto curOp = CurOp::get(opCtx); curOp->debug().ntoreturn = ntoreturn; curOp->debug().ntoskip = ntoskip; - stdx::lock_guard<Client> lk(*txn->getClient()); + stdx::lock_guard<Client> lk(*opCtx->getClient()); curOp->setQuery_inlock(queryObj); curOp->setNS_inlock(nss.ns()); } -void endQueryOp(OperationContext* txn, +void endQueryOp(OperationContext* opCtx, Collection* collection, const PlanExecutor& exec, long long numResults, CursorId cursorId) { - auto curOp = CurOp::get(txn); + auto curOp = CurOp::get(opCtx); // Fill out basic CurOp query exec properties. curOp->debug().nreturned = numResults; @@ -151,7 +151,7 @@ void endQueryOp(OperationContext* txn, curOp->debug().setPlanSummaryMetrics(summaryStats); if (collection) { - collection->infoCache()->notifyOfQuery(txn, summaryStats.indexesUsed); + collection->infoCache()->notifyOfQuery(opCtx, summaryStats.indexesUsed); } if (curOp->shouldDBProfile()) { @@ -219,7 +219,7 @@ void generateBatch(int ntoreturn, /** * Called by db/instance.cpp. This is the getMore entry point. */ -Message getMore(OperationContext* txn, +Message getMore(OperationContext* opCtx, const char* ns, int ntoreturn, long long cursorid, @@ -227,7 +227,7 @@ Message getMore(OperationContext* txn, bool* isCursorAuthorized) { invariant(ntoreturn >= 0); - CurOp& curOp = *CurOp::get(txn); + CurOp& curOp = *CurOp::get(opCtx); // For testing, we may want to fail if we receive a getmore. if (MONGO_FAIL_POINT(failReceivedGetmore)) { @@ -267,7 +267,7 @@ Message getMore(OperationContext* txn, // the data within a collection. cursorManager = CursorManager::getGlobalCursorManager(); } else { - ctx = stdx::make_unique<AutoGetCollectionOrViewForRead>(txn, nss); + ctx = stdx::make_unique<AutoGetCollectionOrViewForRead>(opCtx, nss); auto viewCtx = static_cast<AutoGetCollectionOrViewForRead*>(ctx.get()); if (viewCtx->getView()) { uasserted( @@ -290,7 +290,7 @@ Message getMore(OperationContext* txn, // reads are allowed is PRIMARY (or master in master/slave). This function uasserts if // reads are not okay. Status status = - repl::getGlobalReplicationCoordinator()->checkCanServeReadsFor_UNSAFE(txn, nss, true); + repl::getGlobalReplicationCoordinator()->checkCanServeReadsFor_UNSAFE(opCtx, nss, true); uassertStatusOK(status); // A pin performs a CC lookup and if there is a CC, increments the CC's pin value so it @@ -328,7 +328,7 @@ Message getMore(OperationContext* txn, *isCursorAuthorized = true; if (cc->isReadCommitted()) - uassertStatusOK(txn->recoveryUnit()->setReadFromMajorityCommittedSnapshot()); + uassertStatusOK(opCtx->recoveryUnit()->setReadFromMajorityCommittedSnapshot()); // Reset timeout timer on the cursor since the cursor is still in use. cc->resetIdleTime(); @@ -338,12 +338,12 @@ Message getMore(OperationContext* txn, if (cc->getLeftoverMaxTimeMicros() < Microseconds::max()) { uassert(40136, "Illegal attempt to set operation deadline within DBDirectClient", - !txn->getClient()->isInDirectClient()); - txn->setDeadlineAfterNowBy(cc->getLeftoverMaxTimeMicros()); + !opCtx->getClient()->isInDirectClient()); + opCtx->setDeadlineAfterNowBy(cc->getLeftoverMaxTimeMicros()); } - txn->checkForInterrupt(); // May trigger maxTimeAlwaysTimeOut fail point. + opCtx->checkForInterrupt(); // May trigger maxTimeAlwaysTimeOut fail point. - cc->updateSlaveLocation(txn); + cc->updateSlaveLocation(opCtx); if (cc->isAggCursor()) { // Agg cursors handle their own locking internally. @@ -372,12 +372,12 @@ Message getMore(OperationContext* txn, } PlanExecutor* exec = cc->getExecutor(); - exec->reattachToOperationContext(txn); + exec->reattachToOperationContext(opCtx); exec->restoreState(); auto planSummary = Explain::getPlanSummary(exec); { - stdx::lock_guard<Client> lk(*txn->getClient()); + stdx::lock_guard<Client> lk(*opCtx->getClient()); curOp.setPlanSummary_inlock(planSummary); // Ensure that the original query object is available in the slow query log, profiler @@ -414,7 +414,7 @@ Message getMore(OperationContext* txn, curOp.setExpectedLatencyMs(durationCount<Milliseconds>(timeout)); // Reacquiring locks. - ctx = make_unique<AutoGetCollectionForRead>(txn, nss); + ctx = make_unique<AutoGetCollectionForRead>(opCtx, nss); exec->restoreState(); // We woke up because either the timed_wait expired, or there was more data. Either @@ -449,8 +449,9 @@ Message getMore(OperationContext* txn, // if the cursor is aggregation, we release these locks. if (cc->isAggCursor()) { invariant(NULL == ctx.get()); - unpinDBLock = make_unique<Lock::DBLock>(txn->lockState(), nss.db(), MODE_IS); - unpinCollLock = make_unique<Lock::CollectionLock>(txn->lockState(), nss.ns(), MODE_IS); + unpinDBLock = make_unique<Lock::DBLock>(opCtx->lockState(), nss.db(), MODE_IS); + unpinCollLock = + make_unique<Lock::CollectionLock>(opCtx->lockState(), nss.ns(), MODE_IS); } // Our two possible ClientCursorPin cleanup paths are: @@ -486,7 +487,7 @@ Message getMore(OperationContext* txn, // If the getmore had a time limit, remaining time is "rolled over" back to the // cursor (for use by future getmore ops). - cc->setLeftoverMaxTimeMicros(txn->getRemainingMaxTimeMicros()); + cc->setLeftoverMaxTimeMicros(opCtx->getRemainingMaxTimeMicros()); } } @@ -501,11 +502,11 @@ Message getMore(OperationContext* txn, return Message(bb.release()); } -std::string runQuery(OperationContext* txn, +std::string runQuery(OperationContext* opCtx, QueryMessage& q, const NamespaceString& nss, Message& result) { - CurOp& curOp = *CurOp::get(txn); + CurOp& curOp = *CurOp::get(opCtx); uassert(ErrorCodes::InvalidNamespace, str::stream() << "Invalid ns [" << nss.ns() << "]", @@ -513,11 +514,11 @@ std::string runQuery(OperationContext* txn, invariant(!nss.isCommand()); // Set CurOp information. - beginQueryOp(txn, nss, q.query, q.ntoreturn, q.ntoskip); + beginQueryOp(opCtx, nss, q.query, q.ntoreturn, q.ntoskip); // Parse the qm into a CanonicalQuery. - auto statusWithCQ = CanonicalQuery::canonicalize(txn, q, ExtensionsCallbackReal(txn, &nss)); + auto statusWithCQ = CanonicalQuery::canonicalize(opCtx, q, ExtensionsCallbackReal(opCtx, &nss)); if (!statusWithCQ.isOK()) { uasserted(17287, str::stream() << "Can't canonicalize query: " @@ -530,7 +531,7 @@ std::string runQuery(OperationContext* txn, LOG(2) << "Running query: " << redact(cq->toStringShort()); // Parse, canonicalize, plan, transcribe, and get a plan executor. - AutoGetCollectionOrViewForRead ctx(txn, nss); + AutoGetCollectionOrViewForRead ctx(opCtx, nss); Collection* collection = ctx.getCollection(); if (ctx.getView()) { @@ -544,7 +545,7 @@ std::string runQuery(OperationContext* txn, // We have a parsed query. Time to get the execution plan for it. std::unique_ptr<PlanExecutor> exec = uassertStatusOK( - getExecutorFind(txn, collection, nss, std::move(cq), PlanExecutor::YIELD_AUTO)); + getExecutorFind(opCtx, collection, nss, std::move(cq), PlanExecutor::YIELD_AUTO)); const QueryRequest& qr = exec->getCanonicalQuery()->getQueryRequest(); @@ -578,15 +579,15 @@ std::string runQuery(OperationContext* txn, if (qr.getMaxTimeMS() > 0) { uassert(40116, "Illegal attempt to set operation deadline within DBDirectClient", - !txn->getClient()->isInDirectClient()); - txn->setDeadlineAfterNowBy(Milliseconds{qr.getMaxTimeMS()}); + !opCtx->getClient()->isInDirectClient()); + opCtx->setDeadlineAfterNowBy(Milliseconds{qr.getMaxTimeMS()}); } - txn->checkForInterrupt(); // May trigger maxTimeAlwaysTimeOut fail point. + opCtx->checkForInterrupt(); // May trigger maxTimeAlwaysTimeOut fail point. // uassert if we are not on a primary, and not a secondary with SlaveOk query parameter set. bool slaveOK = qr.isSlaveOk() || qr.hasReadPref(); Status serveReadsStatus = - repl::getGlobalReplicationCoordinator()->checkCanServeReadsFor_UNSAFE(txn, nss, slaveOK); + repl::getGlobalReplicationCoordinator()->checkCanServeReadsFor_UNSAFE(opCtx, nss, slaveOK); uassertStatusOK(serveReadsStatus); // Run the query. @@ -607,7 +608,7 @@ std::string runQuery(OperationContext* txn, // Get summary info about which plan the executor is using. { - stdx::lock_guard<Client> lk(*txn->getClient()); + stdx::lock_guard<Client> lk(*opCtx->getClient()); curOp.setPlanSummary_inlock(Explain::getPlanSummary(exec.get())); } @@ -657,14 +658,14 @@ std::string runQuery(OperationContext* txn, // Before saving the cursor, ensure that whatever plan we established happened with the expected // collection version - auto css = CollectionShardingState::get(txn, nss); - css->checkShardVersionOrThrow(txn); + auto css = CollectionShardingState::get(opCtx, nss); + css->checkShardVersionOrThrow(opCtx); // Fill out CurOp based on query results. If we have a cursorid, we will fill out CurOp with // this cursorid later. long long ccId = 0; - if (shouldSaveCursor(txn, collection, state, exec.get())) { + if (shouldSaveCursor(opCtx, collection, state, exec.get())) { // We won't use the executor until it's getMore'd. exec->saveState(); exec->detachFromOperationContext(); @@ -673,7 +674,7 @@ std::string runQuery(OperationContext* txn, ClientCursorPin pinnedCursor = collection->getCursorManager()->registerCursor( {exec.release(), nss.ns(), - txn->recoveryUnit()->isReadingFromMajorityCommittedSnapshot(), + opCtx->recoveryUnit()->isReadingFromMajorityCommittedSnapshot(), qr.getOptions(), upconvertQueryEntry(q.query, qr.nss(), q.ntoreturn, q.ntoskip)}); ccId = pinnedCursor.getCursor()->cursorid(); @@ -695,12 +696,12 @@ std::string runQuery(OperationContext* txn, // If the query had a time limit, remaining time is "rolled over" to the cursor (for // use by future getmore ops). - pinnedCursor.getCursor()->setLeftoverMaxTimeMicros(txn->getRemainingMaxTimeMicros()); + pinnedCursor.getCursor()->setLeftoverMaxTimeMicros(opCtx->getRemainingMaxTimeMicros()); - endQueryOp(txn, collection, *pinnedCursor.getCursor()->getExecutor(), numResults, ccId); + endQueryOp(opCtx, collection, *pinnedCursor.getCursor()->getExecutor(), numResults, ccId); } else { LOG(5) << "Not caching executor but returning " << numResults << " results."; - endQueryOp(txn, collection, *exec, numResults, ccId); + endQueryOp(opCtx, collection, *exec, numResults, ccId); } // Fill out the output buffer's header. |