diff options
Diffstat (limited to 'src')
42 files changed, 782 insertions, 723 deletions
diff --git a/src/mongo/db/catalog/collection.cpp b/src/mongo/db/catalog/collection.cpp index a86944badf3..de7d36b205d 100644 --- a/src/mongo/db/catalog/collection.cpp +++ b/src/mongo/db/catalog/collection.cpp @@ -229,7 +229,7 @@ Collection::Collection(OperationContext* opCtx, parseValidationAction(_details->getCollectionOptions(opCtx).validationAction))), _validationLevel(uassertStatusOK( parseValidationLevel(_details->getCollectionOptions(opCtx).validationLevel))), - _cursorManager(fullNS), + _cursorManager(_ns), _cappedNotifier(_recordStore->isCapped() ? new CappedInsertNotifier() : nullptr), _mustTakeCappedLockOnInsert(isCapped() && !_ns.isSystemDotProfile() && !_ns.isOplog()) { diff --git a/src/mongo/db/catalog/cursor_manager.cpp b/src/mongo/db/catalog/cursor_manager.cpp index b2079e9e03e..25f0be7edec 100644 --- a/src/mongo/db/catalog/cursor_manager.cpp +++ b/src/mongo/db/catalog/cursor_manager.cpp @@ -50,42 +50,23 @@ namespace mongo { -using std::string; using std::vector; namespace { -unsigned idFromCursorId(CursorId id) { +uint32_t idFromCursorId(CursorId id) { uint64_t x = static_cast<uint64_t>(id); x = x >> 32; - return static_cast<unsigned>(x); + return static_cast<uint32_t>(x); } -CursorId cursorIdFromParts(unsigned collection, unsigned cursor) { - CursorId x = static_cast<CursorId>(collection) << 32; +CursorId cursorIdFromParts(uint32_t collectionIdentifier, uint32_t cursor) { + // The leading two bits of a non-global CursorId should be 0. + invariant((collectionIdentifier & (0b11 << 30)) == 0); + CursorId x = static_cast<CursorId>(collectionIdentifier) << 32; x |= cursor; return x; } - -class IdWorkTest : public StartupTest { -public: - void _run(unsigned a, unsigned b) { - CursorId x = cursorIdFromParts(a, b); - invariant(a == idFromCursorId(x)); - CursorId y = cursorIdFromParts(a, b + 1); - invariant(x != y); - } - - void run() { - _run(123, 456); - _run(0xdeadbeef, 0xcafecafe); - _run(0, 0); - _run(99999999, 999); - _run(0xFFFFFFFF, 1); - _run(0xFFFFFFFF, 0); - _run(0xFFFFFFFF, 0xFFFFFFFF); - } -} idWorkTest; -} +} // namespace class GlobalCursorIdCache { public: @@ -93,16 +74,16 @@ public: ~GlobalCursorIdCache(); /** - * this gets called when a CursorManager gets created - * @return the id the CursorManager should use when generating - * cursor ids + * Returns a unique 32-bit identifier to be used as the first 32 bits of all cursor ids for a + * new CursorManager. */ - unsigned created(const std::string& ns); + uint32_t registerCursorManager(const NamespaceString& nss); /** - * called by CursorManager when its going away + * Must be called when a CursorManager is deleted. 'id' must be the identifier returned by + * registerCursorManager(). */ - void destroyed(unsigned id, const std::string& ns); + void deregisterCursorManager(uint32_t id, const NamespaceString& nss); /** * works globally @@ -118,8 +99,8 @@ public: private: SimpleMutex _mutex; - typedef unordered_map<unsigned, string> Map; - Map _idToNS; + typedef unordered_map<unsigned, NamespaceString> Map; + Map _idToNss; unsigned _nextId; std::unique_ptr<SecureRandom> _secureRandom; @@ -137,7 +118,7 @@ MONGO_INITIALIZER(GlobalCursorIdCache)(InitializerContext* context) { MONGO_INITIALIZER_WITH_PREREQUISITES(GlobalCursorManager, ("GlobalCursorIdCache")) (InitializerContext* context) { - globalCursorManager.reset(new CursorManager("")); + globalCursorManager.reset(new CursorManager({})); return Status::OK(); } @@ -152,56 +133,57 @@ int64_t GlobalCursorIdCache::nextSeed() { return _secureRandom->nextInt64(); } -unsigned GlobalCursorIdCache::created(const std::string& ns) { - static const unsigned MAX_IDS = 1000 * 1000 * 1000; +uint32_t GlobalCursorIdCache::registerCursorManager(const NamespaceString& nss) { + static const uint32_t kMaxIds = 1000 * 1000 * 1000; + static_assert((kMaxIds & (0b11 << 30)) == 0, + "the first two bits of a collection identifier must always be zeroes"); stdx::lock_guard<SimpleMutex> lk(_mutex); - fassert(17359, _idToNS.size() < MAX_IDS); + fassert(17359, _idToNss.size() < kMaxIds); - for (unsigned i = 0; i <= MAX_IDS; i++) { - unsigned id = ++_nextId; + for (uint32_t i = 0; i <= kMaxIds; i++) { + uint32_t id = ++_nextId; if (id == 0) continue; - if (_idToNS.count(id) > 0) + if (_idToNss.count(id) > 0) continue; - _idToNS[id] = ns; + _idToNss[id] = nss; return id; } - invariant(false); + MONGO_UNREACHABLE; } -void GlobalCursorIdCache::destroyed(unsigned id, const std::string& ns) { +void GlobalCursorIdCache::deregisterCursorManager(uint32_t id, const NamespaceString& nss) { stdx::lock_guard<SimpleMutex> lk(_mutex); - invariant(ns == _idToNS[id]); - _idToNS.erase(id); + invariant(nss == _idToNss[id]); + _idToNss.erase(id); } bool GlobalCursorIdCache::eraseCursor(OperationContext* opCtx, CursorId id, bool checkAuth) { // Figure out what the namespace of this cursor is. - std::string ns; - if (globalCursorManager->ownsCursorId(id)) { - auto pin = globalCursorManager.get()->pinCursor(id); + NamespaceString nss; + if (CursorManager::isGloballyManagedCursor(id)) { + auto pin = globalCursorManager->pinCursor(id); if (!pin.isOK()) { invariant(pin == ErrorCodes::CursorNotFound); // No such cursor. TODO: Consider writing to audit log here (even though we don't // have a namespace). return false; } - ns = pin.getValue().getCursor()->ns(); + nss = pin.getValue().getCursor()->nss(); } else { stdx::lock_guard<SimpleMutex> lk(_mutex); - unsigned nsid = idFromCursorId(id); - Map::const_iterator it = _idToNS.find(nsid); - if (it == _idToNS.end()) { + uint32_t nsid = idFromCursorId(id); + Map::const_iterator it = _idToNss.find(nsid); + if (it == _idToNss.end()) { // No namespace corresponding to this cursor id prefix. TODO: Consider writing to // audit log here (even though we don't have a namespace). return false; } - ns = it->second; + nss = it->second; } - const NamespaceString nss(ns); invariant(nss.isValid()); // Check if we are authorized to erase this cursor. @@ -215,7 +197,7 @@ bool GlobalCursorIdCache::eraseCursor(OperationContext* opCtx, CursorId id, bool } // If this cursor is owned by the global cursor manager, ask it to erase the cursor for us. - if (globalCursorManager->ownsCursorId(id)) { + if (CursorManager::isGloballyManagedCursor(id)) { Status eraseStatus = globalCursorManager->eraseCursor(opCtx, id, checkAuth); massert(28697, eraseStatus.reason(), @@ -250,16 +232,11 @@ std::size_t GlobalCursorIdCache::timeoutCursors(OperationContext* opCtx, int mil totalTimedOut += globalCursorManager->timeoutCursors(millisSinceLastCall); // Compute the set of collection names that we have to time out cursors for. - vector<string> todo; + vector<NamespaceString> todo; { stdx::lock_guard<SimpleMutex> lk(_mutex); - for (Map::const_iterator i = _idToNS.begin(); i != _idToNS.end(); ++i) { - if (globalCursorManager->ownsCursorId(cursorIdFromParts(i->first, 0))) { - // Skip the global cursor manager, since we handle it above (and it's not - // associated with a collection). - continue; - } - todo.push_back(i->second); + for (auto&& entry : _idToNss) { + todo.push_back(entry.second); } } @@ -314,14 +291,19 @@ bool CursorManager::eraseCursorGlobal(OperationContext* opCtx, CursorId id) { // -------------------------- -CursorManager::CursorManager(StringData ns) : _nss(ns) { - _collectionCacheRuntimeId = globalCursorIdCache->created(_nss.ns()); +CursorManager::CursorManager(NamespaceString nss) : _nss(std::move(nss)) { + if (!isGlobalManager()) { + // Generate a unique id for this collection. + _collectionCacheRuntimeId = globalCursorIdCache->registerCursorManager(_nss); + } _random.reset(new PseudoRandom(globalCursorIdCache->nextSeed())); } CursorManager::~CursorManager() { invalidateAll(true, "collection going away"); - globalCursorIdCache->destroyed(_collectionCacheRuntimeId, _nss.ns()); + if (!isGlobalManager()) { + globalCursorIdCache->deregisterCursorManager(_collectionCacheRuntimeId, _nss); + } } void CursorManager::invalidateAll(bool collectionGoingAway, const std::string& reason) { @@ -371,11 +353,8 @@ void CursorManager::invalidateAll(bool collectionGoingAway, const std::string& r continue; } - if (cc->_isPinned || cc->isAggCursor()) { - // Pinned cursors need to stay alive, so we leave them around. Aggregation - // cursors also can stay alive (since they don't have their lifetime bound to - // the underlying collection). However, if they have an associated executor, we - // need to kill it, because it's now invalid. + if (cc->_isPinned) { + // Pinned cursors need to stay alive, so we leave them around. if (cc->getExecutor()) cc->getExecutor()->kill(reason); newMap.insert(*i); @@ -484,10 +463,6 @@ void CursorManager::unpin(ClientCursor* cursor) { cursor->_isPinned = false; } -bool CursorManager::ownsCursorId(CursorId cursorId) const { - return _collectionCacheRuntimeId == idFromCursorId(cursorId); -} - void CursorManager::getCursorIds(std::set<CursorId>* openCursors) const { stdx::lock_guard<SimpleMutex> lk(_mutex); @@ -504,19 +479,32 @@ size_t CursorManager::numCursors() const { CursorId CursorManager::_allocateCursorId_inlock() { for (int i = 0; i < 10000; i++) { - unsigned mypart = static_cast<unsigned>(_random->nextInt32()); - CursorId id = cursorIdFromParts(_collectionCacheRuntimeId, mypart); + // The leading two bits of a CursorId are used to determine if the cursor is registered on + // the global cursor manager. + CursorId id; + if (isGlobalManager()) { + // This is the global cursor manager, so generate a random number and make sure the + // first two bits are 01. + uint64_t mask = 0x3FFFFFFFFFFFFFFF; + uint64_t bitToSet = 1ULL << 62; + id = ((_random->nextInt64() & mask) | bitToSet); + } else { + // The first 2 bits are 0, the next 30 bits are the collection identifier, the next 32 + // bits are random. + uint32_t myPart = static_cast<uint32_t>(_random->nextInt32()); + id = cursorIdFromParts(_collectionCacheRuntimeId, myPart); + } if (_cursors.count(id) == 0) return id; } fassertFailed(17360); } -ClientCursorPin CursorManager::registerCursor(const ClientCursorParams& cursorParams) { +ClientCursorPin CursorManager::registerCursor(ClientCursorParams&& cursorParams) { stdx::lock_guard<SimpleMutex> lk(_mutex); CursorId cursorId = _allocateCursorId_inlock(); std::unique_ptr<ClientCursor, ClientCursor::Deleter> clientCursor( - new ClientCursor(cursorParams, this, cursorId)); + new ClientCursor(std::move(cursorParams), this, cursorId)); return _registerCursor_inlock(std::move(clientCursor)); } diff --git a/src/mongo/db/catalog/cursor_manager.h b/src/mongo/db/catalog/cursor_manager.h index ad4289d4f38..7e4da59b626 100644 --- a/src/mongo/db/catalog/cursor_manager.h +++ b/src/mongo/db/catalog/cursor_manager.h @@ -73,7 +73,7 @@ class PlanExecutor; */ class CursorManager { public: - CursorManager(StringData ns); + CursorManager(NamespaceString nss); /** * Destroys the CursorManager. Managed cursors which are not pinned are destroyed. Ownership of @@ -124,7 +124,7 @@ public: * Constructs a new ClientCursor according to the given 'cursorParams'. The cursor is atomically * registered with the manager and returned in pinned state. */ - ClientCursorPin registerCursor(const ClientCursorParams& cursorParams); + ClientCursorPin registerCursor(ClientCursorParams&& cursorParams); /** * Constructs and pins a special ClientCursor used to track sharding state for the given @@ -153,15 +153,6 @@ public: */ Status eraseCursor(OperationContext* opCtx, CursorId id, bool shouldAudit); - /** - * Returns true if the space of cursor ids that cursor manager is responsible for includes - * the given cursor id. Otherwise, returns false. - * - * The return value of this method does not indicate any information about whether or not a - * cursor actually exists with the given cursor id. - */ - bool ownsCursorId(CursorId cursorId) const; - void getCursorIds(std::set<CursorId>* openCursors) const; /** @@ -172,6 +163,17 @@ public: static CursorManager* getGlobalCursorManager(); + /** + * Returns true if this CursorId would be registered with the global CursorManager. Note that if + * this method returns true it does not imply the cursor exists. + */ + static bool isGloballyManagedCursor(CursorId cursorId) { + // The first two bits are 01 for globally managed cursors, and 00 for cursors owned by a + // collection. The leading bit is always 0 so that CursorIds do not appear as negative. + const long long mask = static_cast<long long>(0b11) << 62; + return (cursorId & mask) == (static_cast<long long>(0b01) << 62); + } + static int eraseCursorGlobalIfAuthorized(OperationContext* opCtx, int n, const char* ids); static bool eraseCursorGlobalIfAuthorized(OperationContext* opCtx, CursorId id); @@ -196,8 +198,12 @@ private: void unpin(ClientCursor* cursor); + bool isGlobalManager() const { + return _nss.isEmpty(); + } + NamespaceString _nss; - unsigned _collectionCacheRuntimeId; + uint32_t _collectionCacheRuntimeId; std::unique_ptr<PseudoRandom> _random; mutable SimpleMutex _mutex; diff --git a/src/mongo/db/clientcursor.cpp b/src/mongo/db/clientcursor.cpp index 708a2bc38f7..70003c2da2b 100644 --- a/src/mongo/db/clientcursor.cpp +++ b/src/mongo/db/clientcursor.cpp @@ -77,17 +77,16 @@ long long ClientCursor::totalOpen() { return cursorStatsOpen.get(); } -ClientCursor::ClientCursor(const ClientCursorParams& params, +ClientCursor::ClientCursor(ClientCursorParams&& params, CursorManager* cursorManager, CursorId cursorId) : _cursorid(cursorId), - _ns(params.ns), + _nss(std::move(params.nss)), _isReadCommitted(params.isReadCommitted), _cursorManager(cursorManager), - _query(params.query), - _queryOptions(params.qopts), - _isAggCursor(params.isAggCursor) { - _exec.reset(params.exec); + _originatingCommand(params.originatingCommandObj), + _queryOptions(params.queryOptions), + _exec(std::move(params.exec)) { init(); } @@ -95,7 +94,7 @@ ClientCursor::ClientCursor(const Collection* collection, CursorManager* cursorManager, CursorId cursorId) : _cursorid(cursorId), - _ns(collection->ns().ns()), + _nss(collection->ns()), _cursorManager(cursorManager), _queryOptions(QueryOption_NoCursorTimeout) { init(); @@ -106,10 +105,9 @@ void ClientCursor::init() { cursorStatsOpen.increment(); - if (_queryOptions & QueryOption_NoCursorTimeout) { + if (isNoTimeout()) { // cursors normally timeout after an inactivity period to prevent excess memory use // setting this prevents timeout of the cursor in question. - _isNoTimeout = true; cursorStatsOpenNoTimeout.increment(); } } @@ -120,7 +118,7 @@ ClientCursor::~ClientCursor() { invariant(!_cursorManager); cursorStatsOpen.decrement(); - if (_isNoTimeout) { + if (isNoTimeout()) { cursorStatsOpenNoTimeout.decrement(); } } @@ -138,7 +136,7 @@ void ClientCursor::kill() { bool ClientCursor::shouldTimeout(int millis) { _idleAgeMillis += millis; - if (_isNoTimeout || _isPinned) { + if (isNoTimeout() || _isPinned) { return false; } return _idleAgeMillis > cursorTimeoutMillis.load(); @@ -152,7 +150,7 @@ void ClientCursor::updateSlaveLocation(OperationContext* opCtx) { if (_slaveReadTill.isNull()) return; - verify(str::startsWith(_ns.c_str(), "local.oplog.")); + verify(_nss.isOplog()); Client* c = opCtx->getClient(); verify(c); @@ -221,7 +219,7 @@ void ClientCursorPin::release() { if (!_cursor->_cursorManager) { // The ClientCursor was killed while we had it. Therefore, it is our responsibility to - // kill it. + // delete it. deleteUnderlying(); } else { // Unpin the cursor under the collection cursor manager lock. diff --git a/src/mongo/db/clientcursor.h b/src/mongo/db/clientcursor.h index 44ae4600ac6..91e7a0d325c 100644 --- a/src/mongo/db/clientcursor.h +++ b/src/mongo/db/clientcursor.h @@ -28,6 +28,7 @@ #pragma once +#include "mongo/client/dbclientinterface.h" #include "mongo/db/cursor_id.h" #include "mongo/db/jsobj.h" #include "mongo/db/query/plan_executor.h" @@ -42,30 +43,30 @@ class CursorManager; class RecoveryUnit; /** - * Parameters used for constructing a ClientCursor. ClientCursors cannot be constructed in - * isolation, but rather must be constructed and managed using a CursorManager. See cursor_manager.h - * for more details. + * Parameters used for constructing a ClientCursor. Makes an owned copy of 'originatingCommandObj' + * to be used across getMores. + * + * ClientCursors cannot be constructed in isolation, but rather must be + * constructed and managed using a CursorManager. See cursor_manager.h for more details. */ struct ClientCursorParams { - ClientCursorParams(PlanExecutor* exec, - std::string ns, + ClientCursorParams(std::unique_ptr<PlanExecutor> planExecutor, + NamespaceString nss, bool isReadCommitted, - int qopts = 0, - const BSONObj query = BSONObj(), - bool isAggCursor = false) - : exec(exec), - ns(std::move(ns)), + BSONObj originatingCommandObj) + : exec(std::move(planExecutor)), + nss(std::move(nss)), isReadCommitted(isReadCommitted), - qopts(qopts), - query(query), - isAggCursor(isAggCursor) {} + queryOptions(exec->getCanonicalQuery() + ? exec->getCanonicalQuery()->getQueryRequest().getOptions() + : 0), + originatingCommandObj(originatingCommandObj.getOwned()) {} - PlanExecutor* exec = nullptr; - const std::string ns; + std::unique_ptr<PlanExecutor> exec; + const NamespaceString nss; bool isReadCommitted = false; - int qopts = 0; - const BSONObj query = BSONObj(); - bool isAggCursor = false; + int queryOptions = 0; + BSONObj originatingCommandObj; }; /** @@ -92,18 +93,14 @@ public: return _cursorid; } - std::string ns() const { - return _ns; + const NamespaceString& nss() const { + return _nss; } bool isReadCommitted() const { return _isReadCommitted; } - bool isAggCursor() const { - return _isAggCursor; - } - PlanExecutor* getExecutor() const { return _exec.get(); } @@ -112,8 +109,8 @@ public: return _queryOptions; } - const BSONObj& getQuery() const { - return _query; + const BSONObj& getOriginatingCommandObj() const { + return _originatingCommand; } /** @@ -223,7 +220,7 @@ private: * Constructs a ClientCursor. Since cursors must come into being registered and pinned, this is * private. See cursor_manager.h for more details. */ - ClientCursor(const ClientCursorParams& params, CursorManager* cursorManager, CursorId cursorId); + ClientCursor(ClientCursorParams&& params, CursorManager* cursorManager, CursorId cursorId); /** * Constructs a special ClientCursor used to track sharding state for the given collection. @@ -246,11 +243,15 @@ private: */ void kill(); + bool isNoTimeout() const { + return (_queryOptions & QueryOption_NoCursorTimeout); + } + // The ID of the ClientCursor. A value of 0 is used to mean that no cursor id has been assigned. CursorId _cursorid = 0; // The namespace we're operating on. - std::string _ns; + const NamespaceString _nss; const bool _isReadCommitted = false; @@ -265,21 +266,11 @@ private: // Tracks the number of results returned by this cursor so far. long long _pos = 0; - // If this cursor was created by a find operation, '_query' holds the query predicate for - // the find. If this cursor was created by a command (e.g. the aggregate command), then - // '_query' holds the command specification received from the client. - BSONObj _query; + // Holds an owned copy of the command specification received from the client. + const BSONObj _originatingCommand; // See the QueryOptions enum in dbclientinterface.h. - int _queryOptions = 0; - - // Is this ClientCursor backed by an aggregation pipeline? - // - // Agg executors differ from others in that they manage their own locking internally and - // should not be killed or destroyed when the underlying collection is deleted. - // - // Note: This should *not* be set for the internal cursor used as input to an aggregation. - const bool _isAggCursor = false; + const int _queryOptions = 0; // While a cursor is being used by a client, it is marked as "pinned". See ClientCursorPin // below. @@ -287,10 +278,6 @@ private: // Cursors always come into existence in a pinned state. bool _isPinned = true; - // Is the "no timeout" flag set on this cursor? If false, this cursor may be automatically - // deleted after an interval of inactivity. - bool _isNoTimeout = false; - // The replication position only used in master-slave. Timestamp _slaveReadTill; diff --git a/src/mongo/db/commands/find_and_modify.cpp b/src/mongo/db/commands/find_and_modify.cpp index a2deb9db62a..cd66033e427 100644 --- a/src/mongo/db/commands/find_and_modify.cpp +++ b/src/mongo/db/commands/find_and_modify.cpp @@ -203,13 +203,11 @@ Status checkCanAcceptWritesForDatabase(OperationContext* opCtx, const NamespaceS void recordStatsForTopCommand(OperationContext* opCtx) { auto curOp = CurOp::get(opCtx); - const int writeLocked = 1; - Top::get(opCtx->getClient()->getServiceContext()) .record(opCtx, curOp->getNS(), curOp->getLogicalOp(), - writeLocked, + Top::LockType::WriteLocked, curOp->elapsedMicros(), curOp->isCommand(), curOp->getReadWriteType()); diff --git a/src/mongo/db/commands/find_cmd.cpp b/src/mongo/db/commands/find_cmd.cpp index d43e991950c..60ae2e2eba4 100644 --- a/src/mongo/db/commands/find_cmd.cpp +++ b/src/mongo/db/commands/find_cmd.cpp @@ -399,11 +399,10 @@ public: // Create a ClientCursor containing this plan executor and register it with the cursor // manager. ClientCursorPin pinnedCursor = collection->getCursorManager()->registerCursor( - {exec.release(), - nss.ns(), + {std::move(exec), + nss, opCtx->recoveryUnit()->isReadingFromMajorityCommittedSnapshot(), - originalQR.getOptions(), - cmdObj.getOwned()}); + cmdObj}); cursorId = pinnedCursor.getCursor()->cursorid(); invariant(!exec); diff --git a/src/mongo/db/commands/getmore_cmd.cpp b/src/mongo/db/commands/getmore_cmd.cpp index 78c7d822404..448177a8068 100644 --- a/src/mongo/db/commands/getmore_cmd.cpp +++ b/src/mongo/db/commands/getmore_cmd.cpp @@ -53,6 +53,7 @@ #include "mongo/db/s/operation_sharding_state.h" #include "mongo/db/service_context.h" #include "mongo/db/stats/counters.h" +#include "mongo/db/stats/top.h" #include "mongo/s/chunk_version.h" #include "mongo/stdx/memory.h" #include "mongo/util/fail_point_service.h" @@ -154,9 +155,6 @@ public: auto curOp = CurOp::get(opCtx); curOp->debug().cursorid = request.cursorid; - // Disable shard version checking - getmore commands are always unversioned - OperationShardingState::get(opCtx).setShardVersion(request.nss, ChunkVersion::IGNORED()); - // Validate term before acquiring locks, if provided. if (request.term) { auto replCoord = repl::ReplicationCoordinator::get(opCtx); @@ -167,67 +165,47 @@ public: } } - // Depending on the type of cursor being operated on, we hold locks for the whole - // getMore, or none of the getMore, or part of the getMore. The three cases in detail: + // Cursors come in one of two flavors: + // - Cursors owned by the collection cursor manager, such as those generated via the find + // command. For these cursors, we hold the appropriate collection lock for the duration of + // the getMore using AutoGetCollectionForRead. This will automatically update the CurOp + // object appropriately and record execution time via Top upon completion. + // - Cursors owned by the global cursor manager, such as those generated via the aggregate + // command. These cursors either hold no collection state or manage their collection state + // internally, so we acquire no locks. In this case we use the AutoStatsTracker object to + // update the CurOp object appropriately and record execution time via Top upon + // completion. // - // 1) Normal cursor: we lock with "ctx" and hold it for the whole getMore. - // 2) Cursor owned by global cursor manager: we don't lock anything. These cursors - // don't own any collection state. These cursors are generated either by the - // listCollections or listIndexes commands, as these special cursor-generating commands - // operate over catalog data rather than targeting the data within a collection. - // 3) Agg cursor: we lock with "ctx", then release, then relock with "unpinDBLock" and - // "unpinCollLock". This is because agg cursors handle locking internally (hence the - // release), but the pin and unpin of the cursor must occur under the collection - // lock. We don't use our AutoGetCollectionForRead "ctx" to relock, because - // AutoGetCollectionForRead checks the sharding version (and we want the relock for - // the unpin to succeed even if the sharding version has changed). + // Thus, only one of 'readLock' and 'statsTracker' will be populated as we populate + // 'cursorManager'. // // Note that we declare our locks before our ClientCursorPin, in order to ensure that - // the pin's destructor is called before the lock destructors (so that the unpin occurs - // under the lock). - std::unique_ptr<AutoGetCollectionForReadCommand> ctx; - std::unique_ptr<Lock::DBLock> unpinDBLock; - std::unique_ptr<Lock::CollectionLock> unpinCollLock; - + // the pin's destructor is called before the lock's destructor (if there is one) so that the + // cursor cleanup can occur under the lock. + boost::optional<AutoGetCollectionForReadCommand> readLock; + boost::optional<AutoStatsTracker> statsTracker; CursorManager* cursorManager; - if (request.nss.isListIndexesCursorNS() || request.nss.isListCollectionsCursorNS()) { + + if (CursorManager::isGloballyManagedCursor(request.cursorid)) { cursorManager = CursorManager::getGlobalCursorManager(); + + if (boost::optional<NamespaceString> nssForCurOp = + request.nss.isGloballyManagedNamespace() + ? request.nss.getTargetNSForGloballyManagedNamespace() + : request.nss) { + const boost::optional<int> dbProfilingLevel = boost::none; + statsTracker.emplace( + opCtx, *nssForCurOp, Top::LockType::NotLocked, dbProfilingLevel); + } } else { - ctx = stdx::make_unique<AutoGetCollectionOrViewForReadCommand>(opCtx, request.nss); - auto viewCtx = static_cast<AutoGetCollectionOrViewForReadCommand*>(ctx.get()); - Collection* collection = ctx->getCollection(); + // getMore commands are always unversioned, so prevent AutoGetCollectionForRead from + // checking the shard version. + OperationShardingState::get(opCtx).setShardVersion(request.nss, + ChunkVersion::IGNORED()); + + readLock.emplace(opCtx, request.nss); + Collection* collection = readLock->getCollection(); if (!collection) { - // Rewrite a getMore on a view to a getMore on the original underlying collection. - // If the view no longer exists, or has been rewritten, the cursor id will be - // unknown, resulting in an appropriate error. - if (viewCtx->getView()) { - auto resolved = - viewCtx->getDb()->getViewCatalog()->resolveView(opCtx, request.nss); - if (!resolved.isOK()) { - return appendCommandStatus(result, resolved.getStatus()); - } - viewCtx->releaseLocksForView(); - - // Only one shardversion can be set at a time for an operation, so unset it - // here to allow setting it on the underlying namespace. - OperationShardingState::get(opCtx).unsetShardVersion(request.nss); - - GetMoreRequest newRequest(resolved.getValue().getNamespace(), - request.cursorid, - request.batchSize, - request.awaitDataTimeout, - request.term, - request.lastKnownCommittedOpTime); - - bool retVal = runParsed(opCtx, origNss, newRequest, cmdObj, errmsg, result); - { - // Set the namespace of the curop back to the view namespace so ctx records - // stats on this view namespace on destruction. - stdx::lock_guard<Client> lk(*opCtx->getClient()); - curOp->setNS_inlock(origNss.ns()); - } - return retVal; - } return appendCommandStatus(result, Status(ErrorCodes::OperationFailed, "collection dropped between getMore calls")); @@ -243,23 +221,24 @@ public: ClientCursor* cursor = ccPin.getValue().getCursor(); - // If the fail point is enabled, busy wait until it is disabled. We unlock and re-acquire - // the locks periodically in order to avoid deadlock (see SERVER-21997 for details). + // If the fail point is enabled, busy wait until it is disabled. while (MONGO_FAIL_POINT(keepCursorPinnedDuringGetMore)) { - invariant(ctx); - invariant(!unpinDBLock); - invariant(!unpinCollLock); - sleepFor(Milliseconds(10)); - ctx.reset(); - ctx = stdx::make_unique<AutoGetCollectionForReadCommand>(opCtx, request.nss); + if (readLock) { + // We unlock and re-acquire the locks periodically in order to avoid deadlock (see + // SERVER-21997 for details). + sleepFor(Milliseconds(10)); + readLock.reset(); + readLock.emplace(opCtx, request.nss); + } } - if (request.nss.ns() != cursor->ns()) { + if (request.nss != cursor->nss()) { return appendCommandStatus( result, Status(ErrorCodes::Unauthorized, str::stream() << "Requested getMore on namespace '" << request.nss.ns() - << "', but cursor belongs to a different namespace")); + << "', but cursor belongs to a different namespace " + << cursor->nss().ns())); } if (request.nss.isOplog() && MONGO_FAIL_POINT(rsStopGetMoreCmd)) { @@ -274,9 +253,8 @@ public: if (isCursorAwaitData(cursor)) { invariant(isCursorTailable(cursor)); - if (cursor->isAggCursor()) { - Status status(ErrorCodes::BadValue, - "awaitData cannot be set on an aggregation cursor"); + if (CursorManager::isGloballyManagedCursor(request.cursorid)) { + Status status(ErrorCodes::BadValue, "awaitData cannot be set on this cursor"); return appendCommandStatus(result, status); } } @@ -288,8 +266,7 @@ public: } // On early return, get rid of the cursor. - ScopeGuard cursorFreer = - MakeGuard(&GetMoreCmd::cleanupCursor, opCtx, &ccPin.getValue(), request); + ScopeGuard cursorFreer = MakeGuard(&ClientCursorPin::deleteUnderlying, &ccPin.getValue()); if (cursor->isReadCommitted()) uassertStatusOK(opCtx->recoveryUnit()->setReadFromMajorityCommittedSnapshot()); @@ -318,11 +295,6 @@ public: } opCtx->checkForInterrupt(); // May trigger maxTimeAlwaysTimeOut fail point. - if (cursor->isAggCursor()) { - // Agg cursors handle their own locking internally. - ctx.reset(); // unlocks - } - PlanExecutor* exec = cursor->getExecutor(); exec->reattachToOperationContext(opCtx); exec->restoreState(); @@ -334,7 +306,7 @@ public: // Ensure that the original query or command object is available in the slow query log, // profiler and currentOp. - auto originatingCommand = cursor->getQuery(); + auto originatingCommand = cursor->getOriginatingCommandObj(); if (!originatingCommand.isEmpty()) { curOp->setOriginatingCommand_inlock(originatingCommand); } @@ -343,12 +315,12 @@ public: uint64_t notifierVersion = 0; std::shared_ptr<CappedInsertNotifier> notifier; if (isCursorAwaitData(cursor)) { - invariant(ctx->getCollection()->isCapped()); + invariant(readLock->getCollection()->isCapped()); // Retrieve the notifier which we will wait on until new data arrives. We make sure // to do this in the lock because once we drop the lock it is possible for the // collection to become invalid. The notifier itself will outlive the collection if // the collection is dropped, as we keep a shared_ptr to it. - notifier = ctx->getCollection()->getCappedInsertNotifier(); + notifier = readLock->getCollection()->getCappedInsertNotifier(); // Must get the version before we call generateBatch in case a write comes in after // that call and before we call wait on the notifier. @@ -386,11 +358,11 @@ public: // to do this in the lock because once we drop the lock it is possible for the // collection to become invalid. The notifier itself will outlive the collection if // the collection is dropped, as we keep a shared_ptr to it. - auto notifier = ctx->getCollection()->getCappedInsertNotifier(); + auto notifier = readLock->getCollection()->getCappedInsertNotifier(); // Save the PlanExecutor and drop our locks. exec->saveState(); - ctx.reset(); + readLock.reset(); // Block waiting for data. const auto timeout = opCtx->getRemainingMaxTimeMicros(); @@ -402,7 +374,7 @@ public: // CappedInsertNotifier. curOp->setExpectedLatencyMs(durationCount<Milliseconds>(timeout)); - ctx.reset(new AutoGetCollectionForReadCommand(opCtx, request.nss)); + readLock.emplace(opCtx, request.nss); exec->restoreState(); // We woke up because either the timed_wait expired, or there was more data. Either @@ -420,11 +392,11 @@ public: postExecutionStats.totalDocsExamined -= preExecutionStats.totalDocsExamined; curOp->debug().setPlanSummaryMetrics(postExecutionStats); - // We do not report 'execStats' for aggregation, both in the original request and - // subsequent getMore. The reason for this is that aggregation's source PlanExecutor - // could be destroyed before we know whether we need execStats and we do not want to - // generate for all operations due to cost. - if (!cursor->isAggCursor() && curOp->shouldDBProfile()) { + // We do not report 'execStats' for aggregation or other globally managed cursors, both in + // the original request and subsequent getMore. It would be useful to have this information + // for an aggregation, but the source PlanExecutor could be destroyed before we know whether + // we need execStats and we do not want to generate for all operations due to cost. + if (!CursorManager::isGloballyManagedCursor(request.cursorid) && curOp->shouldDBProfile()) { BSONObjBuilder execStatsBob; Explain::getWinningPlanStats(exec, &execStatsBob); curOp->debug().execStats = execStatsBob.obj(); @@ -448,9 +420,7 @@ public: curOp->debug().cursorExhausted = true; } - // Respond with the originally requested namespace, even if this is a getMore over a view - // that was resolved to a different backing namespace. - nextBatch.done(respondWithId, origNss.ns()); + nextBatch.done(respondWithId, request.nss.ns()); // Ensure log and profiler include the number of results returned in this getMore's response // batch. @@ -458,15 +428,6 @@ public: if (respondWithId) { cursorFreer.Dismiss(); - - // If we are operating on an aggregation cursor, then we dropped our collection lock - // earlier and need to reacquire it in order to clean up our ClientCursorPin. - if (cursor->isAggCursor()) { - invariant(NULL == ctx.get()); - unpinDBLock.reset(new Lock::DBLock(opCtx, request.nss.db(), MODE_IS)); - unpinCollLock.reset( - new Lock::CollectionLock(opCtx->lockState(), request.nss.ns(), MODE_IS)); - } } return true; @@ -554,27 +515,6 @@ public: return Status::OK(); } - /** - * Called via a ScopeGuard on early return in order to ensure that the ClientCursor gets - * cleaned up properly. - */ - static void cleanupCursor(OperationContext* opCtx, - ClientCursorPin* ccPin, - const GetMoreRequest& request) { - ClientCursor* cursor = ccPin->getCursor(); - - std::unique_ptr<Lock::DBLock> unpinDBLock; - std::unique_ptr<Lock::CollectionLock> unpinCollLock; - - if (cursor->isAggCursor()) { - unpinDBLock.reset(new Lock::DBLock(opCtx, request.nss.db(), MODE_IS)); - unpinCollLock.reset( - new Lock::CollectionLock(opCtx->lockState(), request.nss.ns(), MODE_IS)); - } - - ccPin->deleteUnderlying(); - } - } getMoreCmd; } // namespace mongo diff --git a/src/mongo/db/commands/killcursors_cmd.cpp b/src/mongo/db/commands/killcursors_cmd.cpp index c1526d990b5..a39f0dfaa42 100644 --- a/src/mongo/db/commands/killcursors_cmd.cpp +++ b/src/mongo/db/commands/killcursors_cmd.cpp @@ -35,6 +35,8 @@ #include "mongo/db/curop.h" #include "mongo/db/db_raii.h" #include "mongo/db/query/killcursors_request.h" +#include "mongo/db/stats/top.h" +#include "mongo/util/scopeguard.h" namespace mongo { @@ -48,34 +50,51 @@ private: Status _killCursor(OperationContext* opCtx, const NamespaceString& nss, CursorId cursorId) final { - std::unique_ptr<AutoGetCollectionOrViewForReadCommand> ctx; - + // Cursors come in one of two flavors: + // - Cursors owned by the collection cursor manager, such as those generated via the find + // command. For these cursors, we hold the appropriate collection lock for the duration of + // the getMore using AutoGetCollectionForRead. This will automatically update the CurOp + // object appropriately and record execution time via Top upon completion. + // - Cursors owned by the global cursor manager, such as those generated via the aggregate + // command. These cursors either hold no collection state or manage their collection state + // internally, so we acquire no locks. In this case we use the AutoStatsTracker object to + // update the CurOp object appropriately and record execution time via Top upon + // completion. + // + // Thus, exactly one of 'readLock' and 'statsTracker' will be populated as we populate + // 'cursorManager'. + boost::optional<AutoGetCollectionForReadCommand> readLock; + boost::optional<AutoStatsTracker> statsTracker; CursorManager* cursorManager; - if (nss.isListIndexesCursorNS() || nss.isListCollectionsCursorNS()) { - // listCollections and listIndexes are special cursor-generating commands whose cursors - // are managed globally, as they operate over catalog data rather than targeting the - // data within a collection. + + if (CursorManager::isGloballyManagedCursor(cursorId)) { cursorManager = CursorManager::getGlobalCursorManager(); - } else { - ctx = stdx::make_unique<AutoGetCollectionOrViewForReadCommand>(opCtx, nss); - Collection* collection = ctx->getCollection(); - ViewDefinition* view = ctx->getView(); - if (view) { - Database* db = ctx->getDb(); - auto resolved = db->getViewCatalog()->resolveView(opCtx, nss); - if (!resolved.isOK()) { - return resolved.getStatus(); - } - ctx->releaseLocksForView(); - Status status = _killCursor(opCtx, resolved.getValue().getNamespace(), cursorId); - { - // Set the namespace of the curop back to the view namespace so ctx records - // stats on this view namespace on destruction. - stdx::lock_guard<Client> lk(*opCtx->getClient()); - CurOp::get(opCtx)->setNS_inlock(nss.ns()); + + if (auto nssForCurOp = nss.isGloballyManagedNamespace() + ? nss.getTargetNSForGloballyManagedNamespace() + : nss) { + const boost::optional<int> dbProfilingLevel = boost::none; + statsTracker.emplace( + opCtx, *nssForCurOp, Top::LockType::NotLocked, dbProfilingLevel); + } + + // Make sure the namespace of the cursor matches the namespace passed to the killCursors + // command so we can be sure we checked the correct privileges. + auto ccPin = cursorManager->pinCursor(cursorId); + if (ccPin.isOK()) { + auto cursorNs = ccPin.getValue().getCursor()->nss(); + if (cursorNs != nss) { + return Status{ErrorCodes::Unauthorized, + str::stream() << "issued killCursors on namespace '" << nss.ns() + << "', but cursor with id " + << cursorId + << " belongs to a different namespace: " + << cursorNs.ns()}; } - return status; } + } else { + readLock.emplace(opCtx, nss); + Collection* collection = readLock->getCollection(); if (!collection) { return {ErrorCodes::CursorNotFound, str::stream() << "collection does not exist: " << nss.ns()}; diff --git a/src/mongo/db/commands/list_collections.cpp b/src/mongo/db/commands/list_collections.cpp index da72422b4eb..a636ed51d17 100644 --- a/src/mongo/db/commands/list_collections.cpp +++ b/src/mongo/db/commands/list_collections.cpp @@ -298,7 +298,7 @@ public: const NamespaceString cursorNss = NamespaceString::makeListCollectionsNSS(dbname); auto statusWithPlanExecutor = PlanExecutor::make( - opCtx, std::move(ws), std::move(root), cursorNss.ns(), PlanExecutor::YIELD_MANUAL); + opCtx, std::move(ws), std::move(root), cursorNss, PlanExecutor::YIELD_MANUAL); if (!statusWithPlanExecutor.isOK()) { return appendCommandStatus(result, statusWithPlanExecutor.getStatus()); } @@ -328,9 +328,10 @@ public: exec->saveState(); exec->detachFromOperationContext(); auto pinnedCursor = CursorManager::getGlobalCursorManager()->registerCursor( - {exec.release(), - cursorNss.ns(), - opCtx->recoveryUnit()->isReadingFromMajorityCommittedSnapshot()}); + {std::move(exec), + cursorNss, + opCtx->recoveryUnit()->isReadingFromMajorityCommittedSnapshot(), + jsobj}); cursorId = pinnedCursor.getCursor()->cursorid(); } diff --git a/src/mongo/db/commands/list_indexes.cpp b/src/mongo/db/commands/list_indexes.cpp index 73e3d19289e..7ea9b23ed0d 100644 --- a/src/mongo/db/commands/list_indexes.cpp +++ b/src/mongo/db/commands/list_indexes.cpp @@ -198,7 +198,7 @@ public: dassert(ns == cursorNss.getTargetNSForListIndexes()); auto statusWithPlanExecutor = PlanExecutor::make( - opCtx, std::move(ws), std::move(root), cursorNss.ns(), PlanExecutor::YIELD_MANUAL); + opCtx, std::move(ws), std::move(root), cursorNss, PlanExecutor::YIELD_MANUAL); if (!statusWithPlanExecutor.isOK()) { return appendCommandStatus(result, statusWithPlanExecutor.getStatus()); } @@ -228,9 +228,10 @@ public: exec->saveState(); exec->detachFromOperationContext(); auto pinnedCursor = CursorManager::getGlobalCursorManager()->registerCursor( - {exec.release(), - cursorNss.ns(), - opCtx->recoveryUnit()->isReadingFromMajorityCommittedSnapshot()}); + {std::move(exec), + cursorNss, + opCtx->recoveryUnit()->isReadingFromMajorityCommittedSnapshot(), + cmdObj}); cursorId = pinnedCursor.getCursor()->cursorid(); } diff --git a/src/mongo/db/commands/parallel_collection_scan.cpp b/src/mongo/db/commands/parallel_collection_scan.cpp index 8ff2e100a58..c46cb6d8c5b 100644 --- a/src/mongo/db/commands/parallel_collection_scan.cpp +++ b/src/mongo/db/commands/parallel_collection_scan.cpp @@ -148,11 +148,12 @@ public: exec->saveState(); exec->detachFromOperationContext(); - // Create and regiter a new ClientCursor. + // Create and register a new ClientCursor. auto pinnedCursor = collection->getCursorManager()->registerCursor( - {exec.release(), - ns.ns(), - opCtx->recoveryUnit()->isReadingFromMajorityCommittedSnapshot()}); + {std::move(exec), + ns, + opCtx->recoveryUnit()->isReadingFromMajorityCommittedSnapshot(), + cmdObj}); pinnedCursor.getCursor()->setLeftoverMaxTimeMicros( opCtx->getRemainingMaxTimeMicros()); diff --git a/src/mongo/db/commands/repair_cursor.cpp b/src/mongo/db/commands/repair_cursor.cpp index ac6155cc394..69ded443ec6 100644 --- a/src/mongo/db/commands/repair_cursor.cpp +++ b/src/mongo/db/commands/repair_cursor.cpp @@ -107,9 +107,10 @@ public: exec->detachFromOperationContext(); auto pinnedCursor = collection->getCursorManager()->registerCursor( - {exec.release(), - ns.ns(), - opCtx->recoveryUnit()->isReadingFromMajorityCommittedSnapshot()}); + {std::move(exec), + ns, + opCtx->recoveryUnit()->isReadingFromMajorityCommittedSnapshot(), + cmdObj}); appendCursorResponseObject( pinnedCursor.getCursor()->cursorid(), ns.ns(), BSONArray(), &result); diff --git a/src/mongo/db/commands/run_aggregate.cpp b/src/mongo/db/commands/run_aggregate.cpp index 35164560c16..f0ef162e9af 100644 --- a/src/mongo/db/commands/run_aggregate.cpp +++ b/src/mongo/db/commands/run_aggregate.cpp @@ -60,6 +60,7 @@ #include "mongo/db/views/view_sharding_check.h" #include "mongo/stdx/memory.h" #include "mongo/util/log.h" +#include "mongo/util/scopeguard.h" #include "mongo/util/string_map.h" namespace mongo { @@ -76,19 +77,15 @@ namespace { /** * Returns true if we need to keep a ClientCursor saved for this pipeline (for future getMore * requests). Otherwise, returns false. The passed 'nsForCursor' is only used to determine the - * namespace used in the returned cursor. In the case of views, this can be different from that - * in 'request'. + * namespace used in the returned cursor, which will be registered with the global cursor manager, + * and thus will be different from that in 'request'. */ bool handleCursorCommand(OperationContext* opCtx, - const string& nsForCursor, + const NamespaceString& nsForCursor, ClientCursor* cursor, - PlanExecutor* exec, const AggregationRequest& request, BSONObjBuilder& result) { - if (cursor) { - invariant(cursor->getExecutor() == exec); - invariant(cursor->isAggCursor()); - } + invariant(cursor); long long batchSize = request.getBatchSize(); @@ -99,45 +96,29 @@ bool handleCursorCommand(OperationContext* opCtx, // The initial getNext() on a PipelineProxyStage may be very expensive so we don't // do it when batchSize is 0 since that indicates a desire for a fast return. PlanExecutor::ExecState state; - if ((state = exec->getNext(&next, NULL)) == PlanExecutor::IS_EOF) { + if ((state = cursor->getExecutor()->getNext(&next, nullptr)) == PlanExecutor::IS_EOF) { // make it an obvious error to use cursor or executor after this point - cursor = NULL; - exec = NULL; + cursor = nullptr; break; } - uassert(34426, - "Plan executor error during aggregation: " + WorkingSetCommon::toStatusString(next), - PlanExecutor::ADVANCED == state); + if (PlanExecutor::ADVANCED != state) { + auto status = WorkingSetCommon::getMemberObjectStatus(next); + uasserted(status.code(), + "PlanExecutor error during aggregation: " + + WorkingSetCommon::toStatusString(next)); + } // If adding this object will cause us to exceed the message size limit, then we stash it // for later. if (!FindCommon::haveSpaceForNext(next, objCount, resultsArray.len())) { - exec->enqueue(next); + cursor->getExecutor()->enqueue(next); break; } resultsArray.append(next); } - // NOTE: exec->isEOF() can have side effects such as writing by $out. However, it should - // be relatively quick since if there was no cursor then the input is empty. Also, this - // violates the contract for batchSize==0. Sharding requires a cursor to be returned in that - // case. This is ok for now however, since you can't have a sharded collection that doesn't - // exist. - const bool canReturnMoreBatches = cursor; - if (!canReturnMoreBatches && exec && !exec->isEOF()) { - // msgasserting since this shouldn't be possible to trigger from today's aggregation - // language. The wording assumes that the only reason cursor would be null is if the - // collection doesn't exist. - msgasserted( - 17391, - str::stream() << "Aggregation has more results than fit in initial batch, but can't " - << "create cursor since collection " - << nsForCursor - << " doesn't exist"); - } - if (cursor) { // If a time limit was set on the pipeline, remaining time is "rolled over" to the // cursor (for use by future getmore ops). @@ -147,14 +128,14 @@ bool handleCursorCommand(OperationContext* opCtx, // Cursor needs to be in a saved state while we yield locks for getmore. State // will be restored in getMore(). - exec->saveState(); - exec->detachFromOperationContext(); + cursor->getExecutor()->saveState(); + cursor->getExecutor()->detachFromOperationContext(); } else { CurOp::get(opCtx)->debug().cursorExhausted = true; } - const long long cursorId = cursor ? cursor->cursorid() : 0LL; - appendCursorResponseObject(cursorId, nsForCursor, resultsArray.arr(), &result); + const CursorId cursorId = cursor ? cursor->cursorid() : 0LL; + appendCursorResponseObject(cursorId, nsForCursor.ns(), resultsArray.arr(), &result); return static_cast<bool>(cursor); } @@ -296,7 +277,6 @@ Status runAggregate(OperationContext* opCtx, : uassertStatusOK(CollatorFactoryInterface::get(opCtx->getServiceContext()) ->makeFromBSON(request.getCollation())); - boost::optional<ClientCursorPin> pin; // either this OR the exec will be non-null unique_ptr<PlanExecutor> exec; boost::intrusive_ptr<ExpressionContext> expCtx; boost::intrusive_ptr<Pipeline> pipeline; @@ -304,11 +284,6 @@ Status runAggregate(OperationContext* opCtx, { // This will throw if the sharding version for this connection is out of date. If the // namespace is a view, the lock will be released before re-running the aggregation. - // Otherwise, the lock must be held continuously from now until we have we created both - // the output ClientCursor and the input executor. This ensures that both are using the - // same sharding version that we synchronize on here. This is also why we always need to - // create a ClientCursor even when we aren't outputting to a cursor. See the comment on - // ShardFilterStage for more details. AutoGetCollectionOrViewForReadCommand ctx(opCtx, nss); Collection* collection = ctx.getCollection(); @@ -416,17 +391,15 @@ Status runAggregate(OperationContext* opCtx, // it to the front of the pipeline if needed. PipelineD::prepareCursorSource(collection, &request, pipeline); - // Create the PlanExecutor which returns results from the pipeline. The WorkingSet - // ('ws') and the PipelineProxyStage ('proxy') will be owned by the created - // PlanExecutor. auto ws = make_unique<WorkingSet>(); auto proxy = make_unique<PipelineProxyStage>(opCtx, pipeline, ws.get()); - auto statusWithPlanExecutor = (NULL == collection) - ? PlanExecutor::make( - opCtx, std::move(ws), std::move(proxy), nss.ns(), PlanExecutor::YIELD_MANUAL) - : PlanExecutor::make( - opCtx, std::move(ws), std::move(proxy), collection, PlanExecutor::YIELD_MANUAL); + // This PlanExecutor will simply forward requests to the Pipeline, so does not need to + // yield or to be registered with any collection's CursorManager to receive invalidations. + // The Pipeline may contain PlanExecutors which *are* yielding PlanExecutors and which *are* + // registered with their respective collection's CursorManager + auto statusWithPlanExecutor = PlanExecutor::make( + opCtx, std::move(ws), std::move(proxy), nss, PlanExecutor::YIELD_MANUAL); invariant(statusWithPlanExecutor.isOK()); exec = std::move(statusWithPlanExecutor.getValue()); @@ -435,75 +408,39 @@ Status runAggregate(OperationContext* opCtx, stdx::lock_guard<Client> lk(*opCtx->getClient()); curOp->setPlanSummary_inlock(std::move(planSummary)); } - - if (collection) { - const bool isAggCursor = true; // enable special locking behavior - pin.emplace(collection->getCursorManager()->registerCursor( - {exec.release(), - nss.ns(), - opCtx->recoveryUnit()->isReadingFromMajorityCommittedSnapshot(), - 0, - cmdObj.getOwned(), - isAggCursor})); - // Don't add any code between here and the start of the try block. - } - - // At this point, it is safe to release the collection lock. - // - In the case where we have a collection: we will need to reacquire the - // collection lock later when cleaning up our ClientCursorPin. - // - In the case where we don't have a collection: our PlanExecutor won't be - // registered, so it will be safe to clean it up outside the lock. - invariant(!exec || !collection); } - try { - // Unless set to true, the ClientCursor created above will be deleted on block exit. - bool keepCursor = false; - - // If both explain and cursor are specified, explain wins. - if (expCtx->explain) { - result << "stages" << Value(pipeline->writeExplainOps(*expCtx->explain)); - } else { - // Cursor must be specified, if explain is not. - keepCursor = handleCursorCommand(opCtx, - origNss.ns(), - pin ? pin->getCursor() : nullptr, - pin ? pin->getCursor()->getExecutor() : exec.get(), - request, - result); - } - - if (!expCtx->explain) { - PlanSummaryStats stats; - Explain::getSummaryStats(pin ? *pin->getCursor()->getExecutor() : *exec.get(), &stats); - curOp->debug().setPlanSummaryMetrics(stats); - curOp->debug().nreturned = stats.nReturned; + // Having released the collection lock, we can now create a cursor that returns results from the + // pipeline. This cursor owns no collection state, and thus we register it with the global + // cursor manager. The global cursor manager does not deliver invalidations or kill + // notifications; the underlying PlanExecutor(s) used by the pipeline will be receiving + // invalidations and kill notifications themselves, not the cursor we create here. + auto pin = CursorManager::getGlobalCursorManager()->registerCursor( + {std::move(exec), + origNss, + opCtx->recoveryUnit()->isReadingFromMajorityCommittedSnapshot(), + cmdObj}); + ScopeGuard cursorFreer = MakeGuard(&ClientCursorPin::deleteUnderlying, &pin); + + // If both explain and cursor are specified, explain wins. + if (expCtx->explain) { + result << "stages" << Value(pipeline->writeExplainOps(*expCtx->explain)); + } else { + // Cursor must be specified, if explain is not. + const bool keepCursor = + handleCursorCommand(opCtx, origNss, pin.getCursor(), request, result); + if (keepCursor) { + cursorFreer.Dismiss(); } + } - // Clean up our ClientCursorPin, if needed. We must reacquire the collection lock - // in order to do so. - if (pin) { - // We acquire locks here with DBLock and CollectionLock instead of using - // AutoGetCollectionForRead. AutoGetCollectionForRead will throw if the - // sharding version is out of date, and we don't care if the sharding version - // has changed. - Lock::DBLock dbLock(opCtx, nss.db(), MODE_IS); - Lock::CollectionLock collLock(opCtx->lockState(), nss.ns(), MODE_IS); - if (keepCursor) { - pin->release(); - } else { - pin->deleteUnderlying(); - } - } - } catch (...) { - // On our way out of scope, we clean up our ClientCursorPin if needed. - if (pin) { - Lock::DBLock dbLock(opCtx, nss.db(), MODE_IS); - Lock::CollectionLock collLock(opCtx->lockState(), nss.ns(), MODE_IS); - pin->deleteUnderlying(); - } - throw; + if (!expCtx->explain) { + PlanSummaryStats stats; + Explain::getSummaryStats(*(pin.getCursor()->getExecutor()), &stats); + curOp->debug().setPlanSummaryMetrics(stats); + curOp->debug().nreturned = stats.nReturned; } + // Any code that needs the cursor pinned must be inside the try block, above. return Status::OK(); } diff --git a/src/mongo/db/curop.cpp b/src/mongo/db/curop.cpp index 51016d486ce..2af0907170d 100644 --- a/src/mongo/db/curop.cpp +++ b/src/mongo/db/curop.cpp @@ -266,10 +266,12 @@ void CurOp::ensureStarted() { } } -void CurOp::enter_inlock(const char* ns, int dbProfileLevel) { +void CurOp::enter_inlock(const char* ns, boost::optional<int> dbProfileLevel) { ensureStarted(); _ns = ns; - raiseDbProfileLevel(dbProfileLevel); + if (dbProfileLevel) { + raiseDbProfileLevel(*dbProfileLevel); + } } void CurOp::raiseDbProfileLevel(int dbProfileLevel) { diff --git a/src/mongo/db/curop.h b/src/mongo/db/curop.h index 4e0ef0d2d7b..158513bf6fe 100644 --- a/src/mongo/db/curop.h +++ b/src/mongo/db/curop.h @@ -192,7 +192,7 @@ public: return _originatingCommand; } - void enter_inlock(const char* ns, int dbProfileLevel); + void enter_inlock(const char* ns, boost::optional<int> dbProfileLevel); /** * Sets the type of the current network operation. diff --git a/src/mongo/db/db_raii.cpp b/src/mongo/db/db_raii.cpp index 84b6584b540..8001b658a09 100644 --- a/src/mongo/db/db_raii.cpp +++ b/src/mongo/db/db_raii.cpp @@ -75,6 +75,35 @@ AutoGetOrCreateDb::AutoGetOrCreateDb(OperationContext* opCtx, StringData ns, Loc } } +AutoStatsTracker::AutoStatsTracker(OperationContext* opCtx, + const NamespaceString& nss, + Top::LockType lockType, + boost::optional<int> dbProfilingLevel) + : _opCtx(opCtx), _lockType(lockType) { + if (!dbProfilingLevel) { + // No profiling level was determined, attempt to read the profiling level from the Database + // object. + AutoGetDb autoDb(_opCtx, nss.db(), MODE_IS); + if (autoDb.getDb()) { + dbProfilingLevel = autoDb.getDb()->getProfilingLevel(); + } + } + stdx::lock_guard<Client> clientLock(*_opCtx->getClient()); + CurOp::get(_opCtx)->enter_inlock(nss.ns().c_str(), dbProfilingLevel); +} + +AutoStatsTracker::~AutoStatsTracker() { + auto curOp = CurOp::get(_opCtx); + Top::get(_opCtx->getServiceContext()) + .record(_opCtx, + curOp->getNS(), + curOp->getLogicalOp(), + _lockType, + _timer.micros(), + curOp->isCommand(), + curOp->getReadWriteType()); +} + AutoGetCollectionForRead::AutoGetCollectionForRead(OperationContext* opCtx, const NamespaceString& nss, AutoGetCollection::ViewMode viewMode) { @@ -84,19 +113,6 @@ AutoGetCollectionForRead::AutoGetCollectionForRead(OperationContext* opCtx, _ensureMajorityCommittedSnapshotIsValid(nss, opCtx); } -AutoGetCollectionForReadCommand::~AutoGetCollectionForReadCommand() { - // Report time spent in read lock - auto currentOp = CurOp::get(_opCtx); - Top::get(_opCtx->getClient()->getServiceContext()) - .record(_opCtx, - currentOp->getNS(), - currentOp->getLogicalOp(), - -1, // "read locked" - _timer.micros(), - currentOp->isCommand(), - currentOp->getReadWriteType()); -} - void AutoGetCollectionForRead::_ensureMajorityCommittedSnapshotIsValid(const NamespaceString& nss, OperationContext* opCtx) { while (true) { @@ -134,25 +150,15 @@ void AutoGetCollectionForRead::_ensureMajorityCommittedSnapshotIsValid(const Nam } AutoGetCollectionForReadCommand::AutoGetCollectionForReadCommand( - OperationContext* opCtx, const NamespaceString& nss, AutoGetCollection::ViewMode viewMode) - : _opCtx(opCtx) { - { - _autoCollForRead.emplace(opCtx, nss, viewMode); - - auto curOp = CurOp::get(_opCtx); - stdx::lock_guard<Client> lk(*_opCtx->getClient()); - - // TODO: OldClientContext legacy, needs to be removed - curOp->ensureStarted(); - curOp->setNS_inlock(nss.ns()); - - // At this point, we are locked in shared mode for the database by the DB lock in the - // constructor, so it is safe to load the DB pointer. - if (_autoCollForRead->getDb()) { - // TODO: OldClientContext legacy, needs to be removed - curOp->enter_inlock(nss.ns().c_str(), _autoCollForRead->getDb()->getProfilingLevel()); - } - } + OperationContext* opCtx, const NamespaceString& nss, AutoGetCollection::ViewMode viewMode) { + + _autoCollForRead.emplace(opCtx, nss, viewMode); + const int doNotChangeProfilingLevel = 0; + _statsTracker.emplace(opCtx, + nss, + Top::LockType::ReadLocked, + _autoCollForRead->getDb() ? _autoCollForRead->getDb()->getProfilingLevel() + : doNotChangeProfilingLevel); // We have both the DB and collection locked, which is the prerequisite to do a stable shard // version check, but we'd like to do the check after we have a satisfactory snapshot. @@ -231,7 +237,8 @@ OldClientContext::~OldClientContext() { .record(_opCtx, currentOp->getNS(), currentOp->getLogicalOp(), - _opCtx->lockState()->isWriteLocked() ? 1 : -1, + _opCtx->lockState()->isWriteLocked() ? Top::LockType::WriteLocked + : Top::LockType::ReadLocked, _timer.micros(), currentOp->isCommand(), currentOp->getReadWriteType()); diff --git a/src/mongo/db/db_raii.h b/src/mongo/db/db_raii.h index 8060d06f000..42444775bad 100644 --- a/src/mongo/db/db_raii.h +++ b/src/mongo/db/db_raii.h @@ -35,6 +35,7 @@ #include "mongo/db/concurrency/d_concurrency.h" #include "mongo/db/namespace_string.h" #include "mongo/db/operation_context.h" +#include "mongo/db/stats/top.h" #include "mongo/db/views/view.h" #include "mongo/util/timer.h" @@ -170,6 +171,36 @@ private: }; /** + * RAII-style class which automatically tracks the operation namespace in CurrentOp and records the + * operation via Top upon destruction. + */ +class AutoStatsTracker { + MONGO_DISALLOW_COPYING(AutoStatsTracker); + +public: + /** + * Sets the namespace of the CurOp object associated with 'opCtx' to be 'nss' and starts the + * CurOp timer. 'lockType' describes which type of lock is held by this operation, and will be + * used for reporting via Top. If 'dbProfilingLevel' is not given, this constructor will acquire + * and then drop a database lock in order to determine the database's profiling level. + */ + AutoStatsTracker(OperationContext* opCtx, + const NamespaceString& nss, + Top::LockType lockType, + boost::optional<int> dbProfilingLevel); + + /** + * Records stats about the current operation via Top. + */ + ~AutoStatsTracker(); + +private: + const Timer _timer; + OperationContext* _opCtx; + Top::LockType _lockType; +}; + +/** * RAII-style class, which would acquire the appropriate hierarchy of locks for obtaining * a particular collection and would retrieve a reference to the collection. In addition, this * utility will ensure that the read will be performed against an appropriately committed snapshot @@ -236,8 +267,6 @@ public: : AutoGetCollectionForReadCommand( opCtx, nss, AutoGetCollection::ViewMode::kViewsForbidden) {} - ~AutoGetCollectionForReadCommand(); - Database* getDb() const { return _autoCollForRead->getDb(); } @@ -246,21 +275,18 @@ public: return _autoCollForRead->getCollection(); } -private: - OperationContext* const _opCtx; - const Timer _timer; - protected: AutoGetCollectionForReadCommand(OperationContext* opCtx, const NamespaceString& nss, AutoGetCollection::ViewMode viewMode); - /** - * This protected section must come after the private section because - * AutoGetCollectionOrViewForRead needs access to _autoColl, but _autoColl must be initialized - * after _transaction. - */ + // '_autoCollForRead' may need to be reset by AutoGetCollectionOrViewForReadCommand, so needs to + // be a boost::optional. boost::optional<AutoGetCollectionForRead> _autoCollForRead; + + // This needs to be initialized after 'autoCollForRead', since we need to consult the Database + // object to get the profiling level. Thus, it needs to be a boost::optional. + boost::optional<AutoStatsTracker> _statsTracker; }; /** diff --git a/src/mongo/db/exec/pipeline_proxy.h b/src/mongo/db/exec/pipeline_proxy.h index 9482f9ba565..fd5b99c4bae 100644 --- a/src/mongo/db/exec/pipeline_proxy.h +++ b/src/mongo/db/exec/pipeline_proxy.h @@ -37,6 +37,7 @@ #include "mongo/db/pipeline/pipeline.h" #include "mongo/db/query/plan_summary_stats.h" #include "mongo/db/record_id.h" +#include "mongo/util/assert_util.h" namespace mongo { @@ -64,13 +65,18 @@ public: // Not used. SpecificStats* getSpecificStats() const final { - return NULL; + MONGO_UNREACHABLE; + } + + void doInvalidate(OperationContext* txn, const RecordId& rid, InvalidationType type) final { + // A PlanExecutor with a PipelineProxyStage should be registered with the global cursor + // manager, so should not receive invalidations. + MONGO_UNREACHABLE; } std::string getPlanSummaryStr() const; void getPlanSummaryStats(PlanSummaryStats* statsOut) const; - // Not used. StageType stageType() const final { return STAGE_PIPELINE_PROXY; } diff --git a/src/mongo/db/namespace_string.cpp b/src/mongo/db/namespace_string.cpp index 6ee4d989ceb..0a453546a61 100644 --- a/src/mongo/db/namespace_string.cpp +++ b/src/mongo/db/namespace_string.cpp @@ -137,6 +137,17 @@ NamespaceString NamespaceString::getTargetNSForListIndexes() const { return NamespaceString(db(), coll().substr(listIndexesCursorNSPrefix.size())); } +boost::optional<NamespaceString> NamespaceString::getTargetNSForGloballyManagedNamespace() const { + // Globally managed namespaces are of the form '$cmd.commandName.<targetNs>' or simply + // '$cmd.commandName'. + dassert(isGloballyManagedNamespace()); + const size_t indexOfNextDot = coll().find('.', 5); + if (indexOfNextDot == std::string::npos) { + return boost::none; + } + return NamespaceString{db(), coll().substr(indexOfNextDot + 1)}; +} + string NamespaceString::escapeDbName(const StringData dbname) { std::string escapedDbName; diff --git a/src/mongo/db/namespace_string.h b/src/mongo/db/namespace_string.h index c623dbbac90..58f26d76b2c 100644 --- a/src/mongo/db/namespace_string.h +++ b/src/mongo/db/namespace_string.h @@ -31,6 +31,7 @@ #pragma once #include <algorithm> +#include <boost/optional.hpp> #include <iosfwd> #include <string> @@ -93,13 +94,13 @@ public: NamespaceString(StringData dbName, StringData collectionName); /** - * Contructs a NamespaceString representing a listCollections namespace. The format for this + * Constructs a NamespaceString representing a listCollections namespace. The format for this * namespace is "<dbName>.$cmd.listCollections". */ static NamespaceString makeListCollectionsNSS(StringData dbName); /** - * Contructs a NamespaceString representing a listIndexes namespace. The format for this + * Constructs a NamespaceString representing a listIndexes namespace. The format for this * namespace is "<dbName>.$cmd.listIndexes.<collectionName>". */ static NamespaceString makeListIndexesNSS(StringData dbName, StringData collectionName); @@ -202,10 +203,25 @@ public: bool isVirtualized() const { return virtualized(_ns); } + + /** + * Returns true if cursors for this namespace are registered with the global cursor manager. + */ + bool isGloballyManagedNamespace() const { + return coll().startsWith("$cmd."_sd); + } + bool isListCollectionsCursorNS() const; bool isListIndexesCursorNS() const; /** + * Given a NamespaceString for which isGloballyManagedNamespace() returns true, returns the + * namespace the command targets, or boost::none for commands like 'listCollections' which + * do not target a collection. + */ + boost::optional<NamespaceString> getTargetNSForGloballyManagedNamespace() const; + + /** * Given a NamespaceString for which isListIndexesCursorNS() returns true, returns the * NamespaceString for the collection that the "listIndexes" targets. */ diff --git a/src/mongo/db/namespace_string_test.cpp b/src/mongo/db/namespace_string_test.cpp index 69884bd5ca9..3e4b489b508 100644 --- a/src/mongo/db/namespace_string_test.cpp +++ b/src/mongo/db/namespace_string_test.cpp @@ -173,6 +173,41 @@ TEST(NamespaceStringTest, ListIndexesCursorNS) { ASSERT(!NamespaceString("test.$cmd.listCollections.foo").isListIndexesCursorNS()); } +TEST(NamespaceStringTest, IsGloballyManagedNamespace) { + ASSERT_TRUE(NamespaceString{"test.$cmd.aggregate.foo"}.isGloballyManagedNamespace()); + ASSERT_TRUE(NamespaceString{"test.$cmd.listIndexes.foo"}.isGloballyManagedNamespace()); + ASSERT_TRUE(NamespaceString{"test.$cmd.otherCommand.foo"}.isGloballyManagedNamespace()); + ASSERT_TRUE(NamespaceString{"test.$cmd.listCollections"}.isGloballyManagedNamespace()); + ASSERT_TRUE(NamespaceString{"test.$cmd.otherCommand"}.isGloballyManagedNamespace()); + ASSERT_TRUE(NamespaceString{"test.$cmd.aggregate"}.isGloballyManagedNamespace()); + ASSERT_TRUE(NamespaceString{"test.$cmd.listIndexes"}.isGloballyManagedNamespace()); + + ASSERT_FALSE(NamespaceString{"test.foo"}.isGloballyManagedNamespace()); + ASSERT_FALSE(NamespaceString{"test.$cmd"}.isGloballyManagedNamespace()); + + ASSERT_FALSE(NamespaceString{"$cmd.aggregate.foo"}.isGloballyManagedNamespace()); + ASSERT_FALSE(NamespaceString{"$cmd.listCollections"}.isGloballyManagedNamespace()); +} + +TEST(NamespaceStringTest, GetTargetNSForGloballyManagedNamespace) { + ASSERT_EQ( + (NamespaceString{"test", "foo"}), + NamespaceString{"test.$cmd.aggregate.foo"}.getTargetNSForGloballyManagedNamespace().get()); + ASSERT_EQ((NamespaceString{"test", "foo"}), + NamespaceString{"test.$cmd.listIndexes.foo"} + .getTargetNSForGloballyManagedNamespace() + .get()); + ASSERT_EQ((NamespaceString{"test", "foo"}), + NamespaceString{"test.$cmd.otherCommand.foo"} + .getTargetNSForGloballyManagedNamespace() + .get()); + + ASSERT_FALSE( + NamespaceString{"test.$cmd.listCollections"}.getTargetNSForGloballyManagedNamespace()); + ASSERT_FALSE( + NamespaceString{"test.$cmd.otherCommand"}.getTargetNSForGloballyManagedNamespace()); +} + TEST(NamespaceStringTest, CollectionComponentValidNames) { ASSERT(NamespaceString::validCollectionComponent("a.b")); ASSERT(NamespaceString::validCollectionComponent("a.b")); diff --git a/src/mongo/db/ops/write_ops_exec.cpp b/src/mongo/db/ops/write_ops_exec.cpp index 60f0b7c79b6..a29a42e4a52 100644 --- a/src/mongo/db/ops/write_ops_exec.cpp +++ b/src/mongo/db/ops/write_ops_exec.cpp @@ -94,7 +94,7 @@ void finishCurOp(OperationContext* opCtx, CurOp* curOp) { .record(opCtx, curOp->getNS(), curOp->getLogicalOp(), - 1, // "write locked" + Top::LockType::WriteLocked, curOp->totalTimeMicros(), curOp->isCommand(), curOp->getReadWriteType()); @@ -412,7 +412,7 @@ WriteResult performInserts(OperationContext* opCtx, const InsertOp& wholeOp) { .record(opCtx, wholeOp.ns.ns(), LogicalOp::opInsert, - 1 /* write locked*/, + Top::LockType::WriteLocked, curOp.totalTimeMicros(), curOp.isCommand(), curOp.getReadWriteType()); diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript index e243adda24a..ee26ddc62fd 100644 --- a/src/mongo/db/pipeline/SConscript +++ b/src/mongo/db/pipeline/SConscript @@ -473,5 +473,6 @@ env.Library( '$BUILD_DIR/mongo/db/index/index_access_methods', '$BUILD_DIR/mongo/db/matcher/expressions_mongod_only', '$BUILD_DIR/mongo/db/stats/serveronly', + '$BUILD_DIR/mongo/db/clientcursor', ], ) diff --git a/src/mongo/db/pipeline/document_source_cursor.cpp b/src/mongo/db/pipeline/document_source_cursor.cpp index ae62da0a978..edc49e4923e 100644 --- a/src/mongo/db/pipeline/document_source_cursor.cpp +++ b/src/mongo/db/pipeline/document_source_cursor.cpp @@ -31,12 +31,11 @@ #include "mongo/db/pipeline/document_source_cursor.h" #include "mongo/db/catalog/collection.h" +#include "mongo/db/curop.h" #include "mongo/db/db_raii.h" #include "mongo/db/exec/working_set_common.h" #include "mongo/db/pipeline/document.h" #include "mongo/db/query/explain.h" -#include "mongo/db/query/find_common.h" -#include "mongo/db/server_parameters.h" #include "mongo/db/storage/storage_options.h" #include "mongo/util/scopeguard.h" @@ -66,20 +65,27 @@ DocumentSource::GetNextResult DocumentSourceCursor::getNext() { } void DocumentSourceCursor::dispose() { - _exec.reset(); _currentBatch.clear(); + if (!_exec) { + return; + } + + // We must hold a collection lock to destroy a PlanExecutor to ensure the CursorManager will + // still exist and can be safely told to deregister the PlanExecutor. + invariant(pExpCtx->opCtx); + AutoGetCollection autoColl(pExpCtx->opCtx, _exec->nss(), MODE_IS); + _exec.reset(); + _rangePreserver.release(); } void DocumentSourceCursor::loadBatch() { if (!_exec) { + // No more documents. dispose(); return; } - // We have already validated the sharding version when we constructed the PlanExecutor - // so we shouldn't check it again. - AutoGetCollectionForRead autoColl(pExpCtx->opCtx, _nss); - + AutoGetCollectionForRead autoColl(pExpCtx->opCtx, _exec->nss()); _exec->restoreState(); int memUsageBytes = 0; @@ -88,7 +94,7 @@ void DocumentSourceCursor::loadBatch() { { ON_BLOCK_EXIT([this] { recordPlanSummaryStats(); }); - while ((state = _exec->getNext(&obj, NULL)) == PlanExecutor::ADVANCED) { + while ((state = _exec->getNext(&obj, nullptr)) == PlanExecutor::ADVANCED) { if (_shouldProduceEmptyDocs) { _currentBatch.push_back(Document()); } else if (_dependencies) { @@ -114,27 +120,28 @@ void DocumentSourceCursor::loadBatch() { } } - // If we got here, there won't be any more documents, so destroy the executor. Can't use - // dispose since we want to keep the _currentBatch. + // If we got here, there won't be any more documents, so destroy our PlanExecutor. Note we can't + // use dispose() since we want to keep the current batch. _exec.reset(); - - uassert(16028, - str::stream() << "collection or index disappeared when cursor yielded: " - << WorkingSetCommon::toStatusString(obj), - state != PlanExecutor::DEAD); - - uassert(17285, - str::stream() << "cursor encountered an error: " - << WorkingSetCommon::toStatusString(obj), - state != PlanExecutor::FAILURE); - - massert(17286, - str::stream() << "Unexpected return from PlanExecutor::getNext: " << state, - state == PlanExecutor::IS_EOF || state == PlanExecutor::ADVANCED); -} - -long long DocumentSourceCursor::getLimit() const { - return _limit ? _limit->getLimit() : -1; + _rangePreserver.release(); + + switch (state) { + case PlanExecutor::ADVANCED: + case PlanExecutor::IS_EOF: + return; // We've reached our limit or exhausted the cursor. + case PlanExecutor::DEAD: { + uasserted(ErrorCodes::QueryPlanKilled, + str::stream() << "collection or index disappeared when cursor yielded: " + << WorkingSetCommon::toStatusString(obj)); + } + case PlanExecutor::FAILURE: { + uasserted(17285, + str::stream() << "cursor encountered an error: " + << WorkingSetCommon::toStatusString(obj)); + } + default: + MONGO_UNREACHABLE; + } } Pipeline::SourceContainer::iterator DocumentSourceCursor::doOptimizeAt( @@ -156,12 +163,6 @@ Pipeline::SourceContainer::iterator DocumentSourceCursor::doOptimizeAt( return std::next(itr); } - -void DocumentSourceCursor::recordPlanSummaryStr() { - invariant(_exec); - _planSummary = Explain::getPlanSummary(_exec.get()); -} - void DocumentSourceCursor::recordPlanSummaryStats() { invariant(_exec); // Aggregation handles in-memory sort outside of the query sub-system. Given that we need to @@ -175,20 +176,17 @@ void DocumentSourceCursor::recordPlanSummaryStats() { } Value DocumentSourceCursor::serialize(boost::optional<ExplainOptions::Verbosity> explain) const { - // we never parse a documentSourceCursor, so we only serialize for explain + // We never parse a DocumentSourceCursor, so we only serialize for explain. if (!explain) return Value(); // Get planner-level explain info from the underlying PlanExecutor. + invariant(_exec); BSONObjBuilder explainBuilder; { - AutoGetCollectionForRead autoColl(pExpCtx->opCtx, _nss); - - massert(17392, "No _exec. Were we disposed before explained?", _exec); - + AutoGetCollectionForRead autoColl(pExpCtx->opCtx, _exec->nss()); _exec->restoreState(); Explain::explainStages(_exec.get(), autoColl.getCollection(), *explain, &explainBuilder); - _exec->saveState(); } @@ -229,44 +227,28 @@ void DocumentSourceCursor::reattachToOperationContext(OperationContext* opCtx) { } DocumentSourceCursor::DocumentSourceCursor(Collection* collection, - const string& ns, std::unique_ptr<PlanExecutor> exec, const intrusive_ptr<ExpressionContext>& pCtx) : DocumentSource(pCtx), _docsAddedToBatches(0), - _nss(ns), + _rangePreserver(collection), _exec(std::move(exec)), _outputSorts(_exec->getOutputSorts()) { - recordPlanSummaryStr(); - // We record execution metrics here to allow for capture of indexes used prior to execution. + _planSummary = Explain::getPlanSummary(_exec.get()); recordPlanSummaryStats(); + if (collection) { - collection->infoCache()->notifyOfQuery(pCtx->opCtx, _planSummaryStats.indexesUsed); + collection->infoCache()->notifyOfQuery(pExpCtx->opCtx, _planSummaryStats.indexesUsed); } } intrusive_ptr<DocumentSourceCursor> DocumentSourceCursor::create( Collection* collection, - const string& ns, std::unique_ptr<PlanExecutor> exec, const intrusive_ptr<ExpressionContext>& pExpCtx) { intrusive_ptr<DocumentSourceCursor> source( - new DocumentSourceCursor(collection, ns, std::move(exec), pExpCtx)); + new DocumentSourceCursor(collection, std::move(exec), pExpCtx)); return source; } - -void DocumentSourceCursor::setProjection(const BSONObj& projection, - const boost::optional<ParsedDeps>& deps) { - _projection = projection; - _dependencies = deps; -} - -const std::string& DocumentSourceCursor::getPlanSummaryStr() const { - return _planSummary; -} - -const PlanSummaryStats& DocumentSourceCursor::getPlanSummaryStats() const { - return _planSummaryStats; -} } diff --git a/src/mongo/db/pipeline/document_source_cursor.h b/src/mongo/db/pipeline/document_source_cursor.h index 3faeea86de6..348174b656a 100644 --- a/src/mongo/db/pipeline/document_source_cursor.h +++ b/src/mongo/db/pipeline/document_source_cursor.h @@ -34,16 +34,14 @@ #include "mongo/db/pipeline/document_source.h" #include "mongo/db/pipeline/document_source_limit.h" #include "mongo/db/query/plan_summary_stats.h" +#include "mongo/db/range_preserver.h" namespace mongo { class PlanExecutor; /** - * Constructs and returns Documents from the BSONObj objects produced by a supplied - * PlanExecutor. - * - * An object of this type may only be used by one thread, see SERVER-6123. + * Constructs and returns Documents from the BSONObj objects produced by a supplied PlanExecutor. */ class DocumentSourceCursor final : public DocumentSource { public: @@ -69,14 +67,11 @@ public: void reattachToOperationContext(OperationContext* opCtx) final; /** - * Create a document source based on a passed-in PlanExecutor. - * - * This is usually put at the beginning of a chain of document sources - * in order to fetch data from the database. + * Create a document source based on a passed-in PlanExecutor. 'exec' must be a yielding + * PlanExecutor, and must be registered with the associated collection's CursorManager. */ static boost::intrusive_ptr<DocumentSourceCursor> create( Collection* collection, - const std::string& ns, std::unique_ptr<PlanExecutor> exec, const boost::intrusive_ptr<ExpressionContext>& pExpCtx); @@ -116,10 +111,17 @@ public: * @param projection The projection that has been passed down to the query system. * @param deps The output of DepsTracker::toParsedDeps. */ - void setProjection(const BSONObj& projection, const boost::optional<ParsedDeps>& deps); + void setProjection(const BSONObj& projection, const boost::optional<ParsedDeps>& deps) { + _projection = projection; + _dependencies = deps; + } - /// returns -1 for no limit - long long getLimit() const; + /** + * Returns the limit associated with this cursor, or -1 if there is no limit. + */ + long long getLimit() const { + return _limit ? _limit->getLimit() : -1; + } /** * If subsequent sources need no information from the cursor, the cursor can simply output empty @@ -129,20 +131,21 @@ public: _shouldProduceEmptyDocs = true; } - const std::string& getPlanSummaryStr() const; + const std::string& getPlanSummaryStr() const { + return _planSummary; + } - const PlanSummaryStats& getPlanSummaryStats() const; + const PlanSummaryStats& getPlanSummaryStats() const { + return _planSummaryStats; + } private: DocumentSourceCursor(Collection* collection, - const std::string& ns, std::unique_ptr<PlanExecutor> exec, const boost::intrusive_ptr<ExpressionContext>& pExpCtx); void loadBatch(); - void recordPlanSummaryStr(); - void recordPlanSummaryStats(); std::deque<Document> _currentBatch; @@ -156,7 +159,7 @@ private: boost::intrusive_ptr<DocumentSourceLimit> _limit; long long _docsAddedToBatches; // for _limit enforcement - const NamespaceString _nss; + RangePreserver _rangePreserver; std::unique_ptr<PlanExecutor> _exec; BSONObjSet _outputSorts; std::string _planSummary; diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp index 262deeb140b..c37c1301198 100644 --- a/src/mongo/db/pipeline/pipeline_d.cpp +++ b/src/mongo/db/pipeline/pipeline_d.cpp @@ -579,15 +579,12 @@ void PipelineD::addCursorSource(Collection* collection, const BSONObj& queryObj, const BSONObj& sortObj, const BSONObj& projectionObj) { - // Get the full "namespace" name. - const string& fullName = expCtx->ns.ns(); - // DocumentSourceCursor expects a yielding PlanExecutor that has had its state saved. exec->saveState(); // Put the PlanExecutor into a DocumentSourceCursor and add it to the front of the pipeline. intrusive_ptr<DocumentSourceCursor> pSource = - DocumentSourceCursor::create(collection, fullName, std::move(exec), expCtx); + DocumentSourceCursor::create(collection, std::move(exec), expCtx); // Note the query, sort, and projection for explain. pSource->setQuery(queryObj); diff --git a/src/mongo/db/query/explain.cpp b/src/mongo/db/query/explain.cpp index eb43caf5047..b243336655b 100644 --- a/src/mongo/db/query/explain.cpp +++ b/src/mongo/db/query/explain.cpp @@ -596,7 +596,7 @@ void Explain::generatePlannerInfo(PlanExecutor* exec, BSONObjBuilder plannerBob(out->subobjStart("queryPlanner")); plannerBob.append("plannerVersion", QueryPlanner::kPlannerVersion); - plannerBob.append("namespace", exec->ns()); + plannerBob.append("namespace", exec->nss().ns()); // Find whether there is an index filter set for the query shape. The 'indexFilterSet' // field will always be false in the case of EOF or idhack plans. diff --git a/src/mongo/db/query/find.cpp b/src/mongo/db/query/find.cpp index 44f69823096..62a08389e3f 100644 --- a/src/mongo/db/query/find.cpp +++ b/src/mongo/db/query/find.cpp @@ -55,6 +55,7 @@ #include "mongo/db/server_options.h" #include "mongo/db/server_parameters.h" #include "mongo/db/service_context.h" +#include "mongo/db/stats/top.h" #include "mongo/db/storage/storage_options.h" #include "mongo/s/chunk_version.h" #include "mongo/s/stale_exception.h" @@ -62,10 +63,10 @@ #include "mongo/util/fail_point_service.h" #include "mongo/util/log.h" #include "mongo/util/mongoutils/str.h" +#include "mongo/util/scopeguard.h" namespace mongo { -using std::endl; using std::unique_ptr; using stdx::make_unique; @@ -238,47 +239,45 @@ Message getMore(OperationContext* opCtx, const NamespaceString nss(ns); - // Depending on the type of cursor being operated on, we hold locks for the whole getMore, - // or none of the getMore, or part of the getMore. The three cases in detail: + // Cursors come in one of two flavors: + // - Cursors owned by the collection cursor manager, such as those generated via the find + // command. For these cursors, we hold the appropriate collection lock for the duration of + // the getMore using AutoGetCollectionForRead. This will automatically update the CurOp + // object appropriately and record execution time via Top upon completion. + // - Cursors owned by the global cursor manager, such as those generated via the aggregate + // command. These cursors either hold no collection state or manage their collection state + // internally, so we acquire no locks. In this case we use the AutoStatsTracker object to + // update the CurOp object appropriately and record execution time via Top upon + // completion. // - // 1) Normal cursor: we lock with "ctx" and hold it for the whole getMore. - // 2) Cursor owned by global cursor manager: we don't lock anything. These cursors don't own - // any collection state. These cursors are generated either by the listCollections or - // listIndexes commands, as these special cursor-generating commands operate over catalog - // data rather than targeting the data within a collection. - // 3) Agg cursor: we lock with "ctx", then release, then relock with "unpinDBLock" and - // "unpinCollLock". This is because agg cursors handle locking internally (hence the - // release), but the pin and unpin of the cursor must occur under the collection lock. - // We don't use our AutoGetCollectionForRead "ctx" to relock, because - // AutoGetCollectionForRead checks the sharding version (and we want the relock for the - // unpin to succeed even if the sharding version has changed). - // - // Note that we declare our locks before our ClientCursorPin, in order to ensure that the - // pin's destructor is called before the lock destructors (so that the unpin occurs under - // the lock). - unique_ptr<AutoGetCollectionForReadCommand> ctx; - unique_ptr<Lock::DBLock> unpinDBLock; - unique_ptr<Lock::CollectionLock> unpinCollLock; - + // Thus, only one of 'readLock' and 'statsTracker' will be populated as we populate + // 'cursorManager'. + boost::optional<AutoGetCollectionForReadCommand> readLock; + boost::optional<AutoStatsTracker> statsTracker; CursorManager* cursorManager; - if (nss.isListIndexesCursorNS() || nss.isListCollectionsCursorNS()) { - // List collections and list indexes are special cursor-generating commands whose - // cursors are managed globally, as they operate over catalog data rather than targeting - // the data within a collection. + + if (CursorManager::isGloballyManagedCursor(cursorid)) { cursorManager = CursorManager::getGlobalCursorManager(); - } else { - ctx = stdx::make_unique<AutoGetCollectionOrViewForReadCommand>(opCtx, nss); - auto viewCtx = static_cast<AutoGetCollectionOrViewForReadCommand*>(ctx.get()); - if (viewCtx->getView()) { - uasserted( + + if (boost::optional<NamespaceString> nssForCurOp = nss.isGloballyManagedNamespace() + ? nss.getTargetNSForGloballyManagedNamespace() + : nss) { + AutoGetDb autoDb(opCtx, nssForCurOp->db(), MODE_IS); + const auto profilingLevel = autoDb.getDb() + ? boost::optional<int>{autoDb.getDb()->getProfilingLevel()} + : boost::none; + statsTracker.emplace(opCtx, *nssForCurOp, Top::LockType::NotLocked, profilingLevel); + uassert( ErrorCodes::CommandNotSupportedOnView, - str::stream() << "Namespace " << nss.ns() + str::stream() << "Namespace " << nssForCurOp->ns() << " is a view. OP_GET_MORE operations are not supported on views. " << "Only clients which support the getMore command can be used to " - "query views."); + "query views.", + !autoDb.getDb()->getViewCatalog()->lookup(opCtx, nssForCurOp->ns())); } - - Collection* collection = ctx->getCollection(); + } else { + readLock.emplace(opCtx, nss); + Collection* collection = readLock->getCollection(); uassert(17356, "collection dropped between getMore calls", collection); cursorManager = collection->getCursorManager(); } @@ -323,8 +322,8 @@ Message getMore(OperationContext* opCtx, str::stream() << "Requested getMore on namespace " << ns << ", but cursor " << cursorid << " belongs to namespace " - << cc->ns(), - ns == cc->ns()); + << cc->nss().ns(), + nss == cc->nss()); *isCursorAuthorized = true; if (cc->isReadCommitted()) @@ -345,11 +344,6 @@ Message getMore(OperationContext* opCtx, cc->updateSlaveLocation(opCtx); - if (cc->isAggCursor()) { - // Agg cursors handle their own locking internally. - ctx.reset(); // unlocks - } - // If we're replaying the oplog, we save the last time that we read. Timestamp slaveReadTill; @@ -359,12 +353,12 @@ Message getMore(OperationContext* opCtx, uint64_t notifierVersion = 0; std::shared_ptr<CappedInsertNotifier> notifier; if (isCursorAwaitData(cc)) { - invariant(ctx->getCollection()->isCapped()); + invariant(readLock->getCollection()->isCapped()); // Retrieve the notifier which we will wait on until new data arrives. We make sure // to do this in the lock because once we drop the lock it is possible for the // collection to become invalid. The notifier itself will outlive the collection if // the collection is dropped, as we keep a shared_ptr to it. - notifier = ctx->getCollection()->getCappedInsertNotifier(); + notifier = readLock->getCollection()->getCappedInsertNotifier(); // Must get the version before we call generateBatch in case a write comes in after // that call and before we call wait on the notifier. @@ -384,7 +378,7 @@ Message getMore(OperationContext* opCtx, // and currentOp. Upconvert _query to resemble a getMore command, and set the original // command or upconverted legacy query in the originatingCommand field. curOp.setQuery_inlock(upconvertGetMoreEntry(nss, cursorid, ntoreturn)); - curOp.setOriginatingCommand_inlock(cc->getQuery()); + curOp.setOriginatingCommand_inlock(cc->getOriginatingCommandObj()); } PlanExecutor::ExecState state; @@ -402,7 +396,7 @@ Message getMore(OperationContext* opCtx, if (isCursorAwaitData(cc) && state == PlanExecutor::IS_EOF && numResults == 0) { // Save the PlanExecutor and drop our locks. exec->saveState(); - ctx.reset(); + readLock.reset(); // Block waiting for data for up to 1 second. Seconds timeout(1); @@ -414,7 +408,7 @@ Message getMore(OperationContext* opCtx, curOp.setExpectedLatencyMs(durationCount<Milliseconds>(timeout)); // Reacquiring locks. - ctx = make_unique<AutoGetCollectionForReadCommand>(opCtx, nss); + readLock.emplace(opCtx, nss); exec->restoreState(); // We woke up because either the timed_wait expired, or there was more data. Either @@ -428,44 +422,28 @@ Message getMore(OperationContext* opCtx, postExecutionStats.totalDocsExamined -= preExecutionStats.totalDocsExamined; curOp.debug().setPlanSummaryMetrics(postExecutionStats); - // We do not report 'execStats' for aggregation, both in the original request and - // subsequent getMore. The reason for this is that aggregation's source PlanExecutor - // could be destroyed before we know whether we need execStats and we do not want to - // generate for all operations due to cost. - if (!cc->isAggCursor() && curOp.shouldDBProfile()) { + // We do not report 'execStats' for aggregation or other globally managed cursors, both in + // the original request and subsequent getMore. It would be useful to have this information + // for an aggregation, but the source PlanExecutor could be destroyed before we know whether + // we need execStats and we do not want to generate for all operations due to cost. + if (!CursorManager::isGloballyManagedCursor(cursorid) && curOp.shouldDBProfile()) { BSONObjBuilder execStatsBob; Explain::getWinningPlanStats(exec, &execStatsBob); curOp.debug().execStats = execStatsBob.obj(); } - // We have to do this before re-acquiring locks in the agg case because - // shouldSaveCursorGetMore() can make a network call for agg cursors. - // - // TODO: Getting rid of PlanExecutor::isEOF() in favor of PlanExecutor::IS_EOF would mean - // that this network operation is no longer necessary. - const bool shouldSaveCursor = shouldSaveCursorGetMore(state, exec, isCursorTailable(cc)); - - // In order to deregister a cursor, we need to be holding the DB + collection lock and - // if the cursor is aggregation, we release these locks. - if (cc->isAggCursor()) { - invariant(NULL == ctx.get()); - unpinDBLock = make_unique<Lock::DBLock>(opCtx, nss.db(), MODE_IS); - unpinCollLock = - make_unique<Lock::CollectionLock>(opCtx->lockState(), nss.ns(), MODE_IS); - } - // Our two possible ClientCursorPin cleanup paths are: // 1) If the cursor is not going to be saved, we call deleteUnderlying() on the pin. - // 2) If the cursor is going to be saved, we simply let the pin go out of scope. In - // this case, the pin's destructor will be invoked, which will call release() on the - // pin. Because our ClientCursorPin is declared after our lock is declared, this - // will happen under the lock. - if (!shouldSaveCursor) { + // 2) If the cursor is going to be saved, we simply let the pin go out of scope. In this + // case, the pin's destructor will be invoked, which will call release() on the pin. + // Because our ClientCursorPin is declared after our lock is declared, this will happen + // under the lock if any locking was necessary. + if (!shouldSaveCursorGetMore(state, exec, isCursorTailable(cc))) { ccPin.getValue().deleteUnderlying(); // cc is now invalid, as is the executor cursorid = 0; - cc = NULL; + cc = nullptr; curOp.debug().cursorExhausted = true; LOG(5) << "getMore NOT saving client cursor, ended with state " @@ -673,10 +651,9 @@ std::string runQuery(OperationContext* opCtx, // Allocate a new ClientCursor and register it with the cursor manager. ClientCursorPin pinnedCursor = collection->getCursorManager()->registerCursor( - {exec.release(), - nss.ns(), + {std::move(exec), + nss, opCtx->recoveryUnit()->isReadingFromMajorityCommittedSnapshot(), - qr.getOptions(), upconvertQueryEntry(q.query, qr.nss(), q.ntoreturn, q.ntoskip)}); ccId = pinnedCursor.getCursor()->cursorid(); diff --git a/src/mongo/db/query/get_executor.cpp b/src/mongo/db/query/get_executor.cpp index 9aa85a7d81a..be3673369a7 100644 --- a/src/mongo/db/query/get_executor.cpp +++ b/src/mongo/db/query/get_executor.cpp @@ -739,8 +739,7 @@ StatusWith<unique_ptr<PlanExecutor>> getExecutorDelete(OperationContext* opCtx, << " Using EOF stage: " << redact(unparsedQuery); auto deleteStage = make_unique<DeleteStage>( opCtx, deleteStageParams, ws.get(), nullptr, new EOFStage(opCtx)); - return PlanExecutor::make( - opCtx, std::move(ws), std::move(deleteStage), nss.ns(), policy); + return PlanExecutor::make(opCtx, std::move(ws), std::move(deleteStage), nss, policy); } const IndexDescriptor* descriptor = collection->getIndexCatalog()->findIdIndex(opCtx); @@ -906,7 +905,7 @@ StatusWith<unique_ptr<PlanExecutor>> getExecutorUpdate(OperationContext* opCtx, auto updateStage = make_unique<UpdateStage>( opCtx, updateStageParams, ws.get(), collection, new EOFStage(opCtx)); return PlanExecutor::make( - opCtx, std::move(ws), std::move(updateStage), nsString.ns(), policy); + opCtx, std::move(ws), std::move(updateStage), nsString, policy); } const IndexDescriptor* descriptor = collection->getIndexCatalog()->findIdIndex(opCtx); @@ -1004,8 +1003,7 @@ StatusWith<unique_ptr<PlanExecutor>> getExecutorGroup(OperationContext* opCtx, unique_ptr<PlanStage> root = make_unique<GroupStage>(opCtx, request, ws.get(), new EOFStage(opCtx)); - return PlanExecutor::make( - opCtx, std::move(ws), std::move(root), request.ns.ns(), yieldPolicy); + return PlanExecutor::make(opCtx, std::move(ws), std::move(root), request.ns, yieldPolicy); } const NamespaceString nss(request.ns); @@ -1263,7 +1261,7 @@ StatusWith<unique_ptr<PlanExecutor>> getExecutorCount(OperationContext* opCtx, unique_ptr<PlanStage> root = make_unique<CountStage>( opCtx, collection, std::move(params), ws.get(), new EOFStage(opCtx)); return PlanExecutor::make( - opCtx, std::move(ws), std::move(root), request.getNs().ns(), yieldPolicy); + opCtx, std::move(ws), std::move(root), request.getNs(), yieldPolicy); } // If the query is empty, then we can determine the count by just asking the collection @@ -1280,7 +1278,7 @@ StatusWith<unique_ptr<PlanExecutor>> getExecutorCount(OperationContext* opCtx, unique_ptr<PlanStage> root = make_unique<CountStage>(opCtx, collection, std::move(params), ws.get(), nullptr); return PlanExecutor::make( - opCtx, std::move(ws), std::move(root), request.getNs().ns(), yieldPolicy); + opCtx, std::move(ws), std::move(root), request.getNs(), yieldPolicy); } const size_t plannerOptions = QueryPlannerParams::IS_COUNT; diff --git a/src/mongo/db/query/internal_plans.cpp b/src/mongo/db/query/internal_plans.cpp index ade228a0223..f7c3e96bce1 100644 --- a/src/mongo/db/query/internal_plans.cpp +++ b/src/mongo/db/query/internal_plans.cpp @@ -52,8 +52,8 @@ std::unique_ptr<PlanExecutor> InternalPlanner::collectionScan(OperationContext* if (NULL == collection) { auto eof = stdx::make_unique<EOFStage>(opCtx); // Takes ownership of 'ws' and 'eof'. - auto statusWithPlanExecutor = - PlanExecutor::make(opCtx, std::move(ws), std::move(eof), ns.toString(), yieldPolicy); + auto statusWithPlanExecutor = PlanExecutor::make( + opCtx, std::move(ws), std::move(eof), NamespaceString(ns), yieldPolicy); invariant(statusWithPlanExecutor.isOK()); return std::move(statusWithPlanExecutor.getValue()); } diff --git a/src/mongo/db/query/plan_executor.cpp b/src/mongo/db/query/plan_executor.cpp index dd3f66164f4..8e79fe1a2c5 100644 --- a/src/mongo/db/query/plan_executor.cpp +++ b/src/mongo/db/query/plan_executor.cpp @@ -91,17 +91,23 @@ StatusWith<unique_ptr<PlanExecutor>> PlanExecutor::make(OperationContext* opCtx, const Collection* collection, YieldPolicy yieldPolicy) { return PlanExecutor::make( - opCtx, std::move(ws), std::move(rt), nullptr, nullptr, collection, "", yieldPolicy); + opCtx, std::move(ws), std::move(rt), nullptr, nullptr, collection, {}, yieldPolicy); } // static StatusWith<unique_ptr<PlanExecutor>> PlanExecutor::make(OperationContext* opCtx, unique_ptr<WorkingSet> ws, unique_ptr<PlanStage> rt, - const string& ns, + NamespaceString nss, YieldPolicy yieldPolicy) { - return PlanExecutor::make( - opCtx, std::move(ws), std::move(rt), nullptr, nullptr, nullptr, ns, yieldPolicy); + return PlanExecutor::make(opCtx, + std::move(ws), + std::move(rt), + nullptr, + nullptr, + nullptr, + std::move(nss), + yieldPolicy); } // static @@ -112,7 +118,7 @@ StatusWith<unique_ptr<PlanExecutor>> PlanExecutor::make(OperationContext* opCtx, const Collection* collection, YieldPolicy yieldPolicy) { return PlanExecutor::make( - opCtx, std::move(ws), std::move(rt), nullptr, std::move(cq), collection, "", yieldPolicy); + opCtx, std::move(ws), std::move(rt), nullptr, std::move(cq), collection, {}, yieldPolicy); } // static @@ -129,7 +135,7 @@ StatusWith<unique_ptr<PlanExecutor>> PlanExecutor::make(OperationContext* opCtx, std::move(qs), std::move(cq), collection, - "", + {}, yieldPolicy); } @@ -140,10 +146,15 @@ StatusWith<unique_ptr<PlanExecutor>> PlanExecutor::make(OperationContext* opCtx, unique_ptr<QuerySolution> qs, unique_ptr<CanonicalQuery> cq, const Collection* collection, - const string& ns, + NamespaceString nss, YieldPolicy yieldPolicy) { - unique_ptr<PlanExecutor> exec(new PlanExecutor( - opCtx, std::move(ws), std::move(rt), std::move(qs), std::move(cq), collection, ns)); + unique_ptr<PlanExecutor> exec(new PlanExecutor(opCtx, + std::move(ws), + std::move(rt), + std::move(qs), + std::move(cq), + collection, + std::move(nss))); // Perform plan selection, if necessary. Status status = exec->pickBestPlan(yieldPolicy, collection); @@ -160,25 +171,24 @@ PlanExecutor::PlanExecutor(OperationContext* opCtx, unique_ptr<QuerySolution> qs, unique_ptr<CanonicalQuery> cq, const Collection* collection, - const string& ns) + NamespaceString nss) : _opCtx(opCtx), _cq(std::move(cq)), _workingSet(std::move(ws)), _qs(std::move(qs)), _root(std::move(rt)), - _ns(ns), + _nss(std::move(nss)), _yieldPolicy(new PlanYieldPolicy(this, YIELD_MANUAL)) { - // We may still need to initialize _ns from either collection or _cq. - if (!_ns.empty()) { - // We already have an _ns set, so there's nothing more to do. - return; + // We may still need to initialize _nss from either collection or _cq. + if (!_nss.isEmpty()) { + return; // We already have an _nss set, so there's nothing more to do. } if (collection) { - _ns = collection->ns().ns(); + _nss = collection->ns(); } else { invariant(_cq); - _ns = _cq->getQueryRequest().ns(); + _nss = _cq->getQueryRequest().nss(); } } @@ -362,7 +372,7 @@ PlanExecutor::ExecState PlanExecutor::getNextImpl(Snapshotted<BSONObj>* objOut, MONGO_FAIL_POINT_BLOCK(planExecutorAlwaysDead, customKill) { const BSONObj& data = customKill.getData(); BSONElement customKillNS = data["namespace"]; - if (!customKillNS || _ns == customKillNS.str()) { + if (!customKillNS || _nss.ns() == customKillNS.str()) { deregisterExec(); kill("hit planExecutorAlwaysDead fail point"); } @@ -469,7 +479,8 @@ PlanExecutor::ExecState PlanExecutor::getNextImpl(Snapshotted<BSONObj>* objOut, throw WriteConflictException(); CurOp::get(_opCtx)->debug().writeConflicts++; writeConflictsInARow++; - WriteConflictException::logAndBackoff(writeConflictsInARow, "plan execution", _ns); + WriteConflictException::logAndBackoff( + writeConflictsInARow, "plan execution", _nss.ns()); } else { WorkingSetMember* member = _workingSet->get(id); @@ -546,10 +557,6 @@ Status PlanExecutor::executePlan() { return Status::OK(); } -const string& PlanExecutor::ns() { - return _ns; -} - void PlanExecutor::setYieldPolicy(YieldPolicy policy, const Collection* collection, bool registerExecutor) { diff --git a/src/mongo/db/query/plan_executor.h b/src/mongo/db/query/plan_executor.h index 1e97963f67b..325d7642a91 100644 --- a/src/mongo/db/query/plan_executor.h +++ b/src/mongo/db/query/plan_executor.h @@ -153,7 +153,7 @@ public: static StatusWith<std::unique_ptr<PlanExecutor>> make(OperationContext* opCtx, std::unique_ptr<WorkingSet> ws, std::unique_ptr<PlanStage> rt, - const std::string& ns, + NamespaceString nss, YieldPolicy yieldPolicy); /** @@ -203,7 +203,9 @@ public: /** * Return the NS that the query is running over. */ - const std::string& ns(); + const NamespaceString& nss() const { + return _nss; + } /** * Return the OperationContext that the plan is currently executing within. @@ -408,7 +410,7 @@ private: std::unique_ptr<QuerySolution> qs, std::unique_ptr<CanonicalQuery> cq, const Collection* collection, - const std::string& ns); + NamespaceString nss); /** * Public factory methods delegate to this private factory to do their work. @@ -419,7 +421,7 @@ private: std::unique_ptr<QuerySolution> qs, std::unique_ptr<CanonicalQuery> cq, const Collection* collection, - const std::string& ns, + NamespaceString nss, YieldPolicy yieldPolicy); /** @@ -462,7 +464,7 @@ private: std::unique_ptr<ScopedExecutorRegistration> _safety; // What namespace are we operating over? - std::string _ns; + NamespaceString _nss; // This is used to handle automatic yielding when allowed by the YieldPolicy. Never NULL. // TODO make this a non-pointer member. This requires some header shuffling so that this diff --git a/src/mongo/db/query/plan_yield_policy.cpp b/src/mongo/db/query/plan_yield_policy.cpp index d84db2b225d..f315daec285 100644 --- a/src/mongo/db/query/plan_yield_policy.cpp +++ b/src/mongo/db/query/plan_yield_policy.cpp @@ -108,14 +108,14 @@ bool PlanYieldPolicy::yield(RecordFetcher* fetcher) { opCtx->recoveryUnit()->abandonSnapshot(); } else { // Release and reacquire locks. - QueryYield::yieldAllLocks(opCtx, fetcher, _planYielding->ns()); + QueryYield::yieldAllLocks(opCtx, fetcher, _planYielding->nss()); } return _planYielding->restoreStateWithoutRetrying(); } catch (const WriteConflictException& wce) { CurOp::get(opCtx)->debug().writeConflicts++; WriteConflictException::logAndBackoff( - attempt, "plan execution restoreState", _planYielding->ns()); + attempt, "plan execution restoreState", _planYielding->nss().ns()); // retry } } diff --git a/src/mongo/db/query/query_yield.cpp b/src/mongo/db/query/query_yield.cpp index 5f43cf819ec..37ecc4bf528 100644 --- a/src/mongo/db/query/query_yield.cpp +++ b/src/mongo/db/query/query_yield.cpp @@ -47,7 +47,7 @@ MONGO_FP_DECLARE(setYieldAllLocksWait); // static void QueryYield::yieldAllLocks(OperationContext* opCtx, RecordFetcher* fetcher, - const std::string& planExecNS) { + const NamespaceString& planExecNS) { // Things have to happen here in a specific order: // 1) Tell the RecordFetcher to do any setup which needs to happen inside locks // 2) Release lock mgr locks diff --git a/src/mongo/db/query/query_yield.h b/src/mongo/db/query/query_yield.h index a42e29800c9..7d98b299484 100644 --- a/src/mongo/db/query/query_yield.h +++ b/src/mongo/db/query/query_yield.h @@ -28,7 +28,7 @@ #pragma once -#include <string> +#include "mongo/db/namespace_string.h" namespace mongo { @@ -50,7 +50,7 @@ public: */ static void yieldAllLocks(OperationContext* opCtx, RecordFetcher* fetcher, - const std::string& planExecNS); + const NamespaceString& planExecNS); }; } // namespace mongo diff --git a/src/mongo/db/range_preserver.h b/src/mongo/db/range_preserver.h index 55e290d52d1..35b7694234e 100644 --- a/src/mongo/db/range_preserver.h +++ b/src/mongo/db/range_preserver.h @@ -54,9 +54,15 @@ public: } } - ~RangePreserver() { - if (_pin) + void release() { + if (_pin) { _pin->deleteUnderlying(); + _pin.reset(); + } + } + + ~RangePreserver() { + release(); } private: diff --git a/src/mongo/db/stats/top.cpp b/src/mongo/db/stats/top.cpp index 8a8358030b8..45680f7db15 100644 --- a/src/mongo/db/stats/top.cpp +++ b/src/mongo/db/stats/top.cpp @@ -75,7 +75,7 @@ Top& Top::get(ServiceContext* service) { void Top::record(OperationContext* opCtx, StringData ns, LogicalOp logicalOp, - int lockType, + LockType lockType, long long micros, bool command, Command::ReadWriteType readWriteType) { @@ -97,7 +97,7 @@ void Top::record(OperationContext* opCtx, void Top::_record(OperationContext* opCtx, CollectionData& c, LogicalOp logicalOp, - int lockType, + LockType lockType, long long micros, Command::ReadWriteType readWriteType) { @@ -105,9 +105,9 @@ void Top::_record(OperationContext* opCtx, c.total.inc(micros); - if (lockType > 0) + if (lockType == LockType::WriteLocked) c.writeLock.inc(micros); - else if (lockType < 0) + else if (lockType == LockType::ReadLocked) c.readLock.inc(micros); switch (logicalOp) { diff --git a/src/mongo/db/stats/top.h b/src/mongo/db/stats/top.h index 1f08e4784a5..463482199fe 100644 --- a/src/mongo/db/stats/top.h +++ b/src/mongo/db/stats/top.h @@ -84,13 +84,19 @@ public: OperationLatencyHistogram opLatencyHistogram; }; + enum class LockType { + ReadLocked, + WriteLocked, + NotLocked, + }; + typedef StringMap<CollectionData> UsageMap; public: void record(OperationContext* opCtx, StringData ns, LogicalOp logicalOp, - int lockType, + LockType lockType, long long micros, bool command, Command::ReadWriteType readWriteType); @@ -126,7 +132,7 @@ private: void _record(OperationContext* opCtx, CollectionData& c, LogicalOp logicalOp, - int lockType, + LockType lockType, long long micros, Command::ReadWriteType readWriteType); diff --git a/src/mongo/dbtests/documentsourcetests.cpp b/src/mongo/dbtests/documentsourcetests.cpp index 58df14b77b4..a6588ffc6b2 100644 --- a/src/mongo/dbtests/documentsourcetests.cpp +++ b/src/mongo/dbtests/documentsourcetests.cpp @@ -108,10 +108,7 @@ protected: getExecutor(&_opCtx, ctx.getCollection(), std::move(cq), PlanExecutor::YIELD_MANUAL)); exec->saveState(); - exec->registerExec(ctx.getCollection()); - - _source = - DocumentSourceCursor::create(ctx.getCollection(), nss.ns(), std::move(exec), _ctx); + _source = DocumentSourceCursor::create(ctx.getCollection(), std::move(exec), _ctx); } intrusive_ptr<ExpressionContextForTest> ctx() { diff --git a/src/mongo/dbtests/query_plan_executor.cpp b/src/mongo/dbtests/query_plan_executor.cpp index 5fbc726b51a..b2e15787c0d 100644 --- a/src/mongo/dbtests/query_plan_executor.cpp +++ b/src/mongo/dbtests/query_plan_executor.cpp @@ -91,13 +91,10 @@ public: } /** - * Given a match expression, represented as the BSON object 'filterObj', - * create a PlanExecutor capable of executing a simple collection - * scan. - * - * The caller takes ownership of the returned PlanExecutor*. + * Given a match expression, represented as the BSON object 'filterObj', create a PlanExecutor + * capable of executing a simple collection scan. */ - PlanExecutor* makeCollScanExec(Collection* coll, BSONObj& filterObj) { + unique_ptr<PlanExecutor> makeCollScanExec(Collection* coll, BSONObj& filterObj) { CollectionScanParams csparams; csparams.collection = coll; csparams.direction = CollectionScanParams::FORWARD; @@ -124,7 +121,7 @@ public: coll, PlanExecutor::YIELD_MANUAL); ASSERT_OK(statusWithPlanExecutor.getStatus()); - return statusWithPlanExecutor.getValue().release(); + return std::move(statusWithPlanExecutor.getValue()); } /** @@ -302,8 +299,7 @@ public: // Wrap the "inner" plan executor in a DocumentSourceCursor and add it as the first source // in the pipeline. innerExec->saveState(); - auto cursorSource = - DocumentSourceCursor::create(collection, nss.ns(), std::move(innerExec), expCtx); + auto cursorSource = DocumentSourceCursor::create(collection, std::move(innerExec), expCtx); auto pipeline = assertGet(Pipeline::create({cursorSource}, expCtx)); // Create the output PlanExecutor that pulls results from the pipeline. @@ -323,7 +319,7 @@ public: // Verify that the aggregation pipeline returns an error because its "inner" plan executor // has been killed due to the collection being dropped. - ASSERT_THROWS_CODE(pipeline->getNext(), UserException, 16028); + ASSERT_THROWS_CODE(pipeline->getNext(), UserException, ErrorCodes::QueryPlanKilled); // Verify that the "outer" plan executor has been killed due to the collection being // dropped. @@ -450,10 +446,10 @@ public: BSONObj filterObj = fromjson("{_id: {$gt: 0}, b: {$gt: 0}}"); Collection* coll = ctx.getCollection(); - PlanExecutor* exec = makeCollScanExec(coll, filterObj); + auto exec = makeCollScanExec(coll, filterObj); // Make a client cursor from the plan executor. - coll->getCursorManager()->registerCursor({exec, nss.ns(), false, 0, BSONObj()}); + coll->getCursorManager()->registerCursor({std::move(exec), nss, false, BSONObj()}); // There should be one cursor before invalidation, // and zero cursors after invalidation. @@ -476,11 +472,11 @@ public: Collection* collection = ctx.getCollection(); BSONObj filterObj = fromjson("{_id: {$gt: 0}, b: {$gt: 0}}"); - PlanExecutor* exec = makeCollScanExec(collection, filterObj); + auto exec = makeCollScanExec(collection, filterObj); // Make a client cursor from the plan executor. - auto ccPin = - collection->getCursorManager()->registerCursor({exec, nss.ns(), false, 0, BSONObj()}); + auto ccPin = collection->getCursorManager()->registerCursor( + {std::move(exec), nss, false, BSONObj()}); // If the cursor is pinned, it sticks around, even after invalidation. ASSERT_EQUALS(1U, numCursors()); @@ -490,7 +486,7 @@ public: // The invalidation should have killed the plan executor. BSONObj objOut; - ASSERT_EQUALS(PlanExecutor::DEAD, exec->getNext(&objOut, NULL)); + ASSERT_EQUALS(PlanExecutor::DEAD, ccPin.getCursor()->getExecutor()->getNext(&objOut, NULL)); ASSERT(WorkingSetCommon::isValidStatusMemberObject(objOut)); const Status status = WorkingSetCommon::getMemberObjectStatus(objOut); ASSERT(status.reason().find(invalidateReason) != string::npos); @@ -519,10 +515,11 @@ public: Collection* collection = ctx.getCollection(); BSONObj filterObj = fromjson("{_id: {$gt: 0}, b: {$gt: 0}}"); - PlanExecutor* exec = makeCollScanExec(collection, filterObj); + auto exec = makeCollScanExec(collection, filterObj); // Make a client cursor from the plan executor. - collection->getCursorManager()->registerCursor({exec, nss.ns(), false, 0, BSONObj()}); + collection->getCursorManager()->registerCursor( + {std::move(exec), nss, false, BSONObj()}); } // There should be one cursor before timeout, diff --git a/src/mongo/dbtests/querytests.cpp b/src/mongo/dbtests/querytests.cpp index 8171cb96cbe..d705399e27c 100644 --- a/src/mongo/dbtests/querytests.cpp +++ b/src/mongo/dbtests/querytests.cpp @@ -30,6 +30,7 @@ #include "mongo/platform/basic.h" +#include <boost/optional.hpp> #include <iostream> #include "mongo/client/dbclientcursor.h" @@ -39,6 +40,7 @@ #include "mongo/db/db_raii.h" #include "mongo/db/dbdirectclient.h" #include "mongo/db/dbhelpers.h" +#include "mongo/db/exec/queued_data_stage.h" #include "mongo/db/index/index_descriptor.h" #include "mongo/db/json.h" #include "mongo/db/lasterror.h" @@ -244,8 +246,8 @@ protected: return !_client.getPrevError().getField("err").isNull(); } - const ServiceContext::UniqueOperationContext _txnPtr = cc().makeOperationContext(); - OperationContext& _opCtx = *_txnPtr; + const ServiceContext::UniqueOperationContext _opCtxPtr = cc().makeOperationContext(); + OperationContext& _opCtx = *_opCtxPtr; DBDirectClient _client; }; @@ -1751,6 +1753,107 @@ public: } }; +class CursorManagerIsGloballyManagedCursorShouldReturnFalseIfLeadingBitsAreZeroes { +public: + void run() { + ASSERT_FALSE(CursorManager::isGloballyManagedCursor(0x0000000000000000)); + ASSERT_FALSE(CursorManager::isGloballyManagedCursor(0x000000000FFFFFFF)); + ASSERT_FALSE(CursorManager::isGloballyManagedCursor(0x000000007FFFFFFF)); + ASSERT_FALSE(CursorManager::isGloballyManagedCursor(0x0FFFFFFFFFFFFFFF)); + ASSERT_FALSE(CursorManager::isGloballyManagedCursor(0x3FFFFFFFFFFFFFFF)); + ASSERT_FALSE(CursorManager::isGloballyManagedCursor(0x3dedbeefdeadbeef)); + } +}; + +class CursorManagerIsGloballyManagedCursorShouldReturnTrueIfLeadingBitsAreZeroAndOne { +public: + void run() { + ASSERT_TRUE(CursorManager::isGloballyManagedCursor(0x4FFFFFFFFFFFFFFF)); + ASSERT_TRUE(CursorManager::isGloballyManagedCursor(0x5FFFFFFFFFFFFFFF)); + ASSERT_TRUE(CursorManager::isGloballyManagedCursor(0x6FFFFFFFFFFFFFFF)); + ASSERT_TRUE(CursorManager::isGloballyManagedCursor(0x7FFFFFFFFFFFFFFF)); + ASSERT_TRUE(CursorManager::isGloballyManagedCursor(0x4000000000000000)); + ASSERT_TRUE(CursorManager::isGloballyManagedCursor(0x4dedbeefdeadbeef)); + } +}; + +class CursorManagerIsGloballyManagedCursorShouldReturnFalseIfLeadingBitIsAOne { +public: + void run() { + ASSERT_FALSE(CursorManager::isGloballyManagedCursor(~0LL)); + ASSERT_FALSE(CursorManager::isGloballyManagedCursor(0xFFFFFFFFFFFFFFFF)); + ASSERT_FALSE(CursorManager::isGloballyManagedCursor(0x8FFFFFFFFFFFFFFF)); + ASSERT_FALSE(CursorManager::isGloballyManagedCursor(0x8dedbeefdeadbeef)); + } +}; + +class CursorManagerTest { +public: + std::unique_ptr<PlanExecutor> makeFakePlanExecutor(OperationContext* opCtx) { + auto workingSet = stdx::make_unique<WorkingSet>(); + auto queuedDataStage = stdx::make_unique<QueuedDataStage>(opCtx, workingSet.get()); + return unittest::assertGet(PlanExecutor::make(opCtx, + std::move(workingSet), + std::move(queuedDataStage), + NamespaceString{"test.collection"}, + PlanExecutor::YieldPolicy::YIELD_MANUAL)); + } +}; + +class GlobalCursorManagerShouldReportOwnershipOfCursorsItCreated : public CursorManagerTest { +public: + void run() { + auto opCtx = cc().makeOperationContext(); + for (int i = 0; i < 1000; i++) { + auto exec = makeFakePlanExecutor(opCtx.get()); + auto cursorPin = CursorManager::getGlobalCursorManager()->registerCursor( + {std::move(exec), NamespaceString{"test.collection"}, false, BSONObj()}); + ASSERT_TRUE(CursorManager::isGloballyManagedCursor(cursorPin.getCursor()->cursorid())); + } + } +}; + +class CursorsFromCollectionCursorManagerShouldNotReportBeingManagedByGlobalCursorManager + : public CursorManagerTest { +public: + void run() { + CursorManager testManager(NamespaceString{"test.collection"}); + auto opCtx = cc().makeOperationContext(); + for (int i = 0; i < 1000; i++) { + auto exec = makeFakePlanExecutor(opCtx.get()); + auto cursorPin = testManager.registerCursor( + {std::move(exec), NamespaceString{"test.collection"}, false, BSONObj()}); + ASSERT_FALSE(CursorManager::isGloballyManagedCursor(cursorPin.getCursor()->cursorid())); + } + } +}; + +class AllCursorsFromCollectionCursorManagerShouldContainIdentical32BitPrefixes + : public CursorManagerTest { +public: + void run() { + CursorManager testManager(NamespaceString{"test.collection"}); + auto opCtx = cc().makeOperationContext(); + boost::optional<uint32_t> prefix; + for (int i = 0; i < 1000; i++) { + auto exec = makeFakePlanExecutor(opCtx.get()); + auto cursorPin = testManager.registerCursor( + {std::move(exec), NamespaceString{"test.collection"}, false, BSONObj()}); + auto cursorId = cursorPin.getCursor()->cursorid(); + if (prefix) { + ASSERT_EQ(*prefix, extractLeading32Bits(cursorId)); + } else { + prefix = extractLeading32Bits(cursorId); + } + } + } + +private: + uint32_t extractLeading32Bits(CursorId cursorId) { + return static_cast<uint32_t>((cursorId & 0xFFFFFFFF00000000) >> 32); + } +}; + class All : public Suite { public: All() : Suite("query") {} @@ -1808,10 +1911,14 @@ public: add<QueryCursorTimeout>(); add<QueryReadsAll>(); add<KillPinnedCursor>(); - add<queryobjecttests::names1>(); - add<OrderingTest>(); + add<CursorManagerIsGloballyManagedCursorShouldReturnFalseIfLeadingBitsAreZeroes>(); + add<CursorManagerIsGloballyManagedCursorShouldReturnTrueIfLeadingBitsAreZeroAndOne>(); + add<CursorManagerIsGloballyManagedCursorShouldReturnFalseIfLeadingBitIsAOne>(); + add<GlobalCursorManagerShouldReportOwnershipOfCursorsItCreated>(); + add<CursorsFromCollectionCursorManagerShouldNotReportBeingManagedByGlobalCursorManager>(); + add<AllCursorsFromCollectionCursorManagerShouldContainIdentical32BitPrefixes>(); } }; |