diff options
Diffstat (limited to 'src/mongo/db')
110 files changed, 1322 insertions, 1042 deletions
diff --git a/src/mongo/db/catalog/capped_utils.cpp b/src/mongo/db/catalog/capped_utils.cpp index 1140c54b1e9..6b4119d5a12 100644 --- a/src/mongo/db/catalog/capped_utils.cpp +++ b/src/mongo/db/catalog/capped_utils.cpp @@ -164,10 +164,11 @@ Status cloneCollectionAsCapped(OperationContext* opCtx, long long excessSize = fromCollection->dataSize(opCtx) - allocatedSpaceGuess; - std::unique_ptr<PlanExecutor> exec(InternalPlanner::collectionScan( - opCtx, fromNss.ns(), fromCollection, PlanExecutor::YIELD_MANUAL, InternalPlanner::FORWARD)); - - exec->setYieldPolicy(PlanExecutor::WRITE_CONFLICT_RETRY_ONLY, fromCollection); + auto exec = InternalPlanner::collectionScan(opCtx, + fromNss.ns(), + fromCollection, + PlanExecutor::WRITE_CONFLICT_RETRY_ONLY, + InternalPlanner::FORWARD); Snapshotted<BSONObj> objToClone; RecordId loc; diff --git a/src/mongo/db/catalog/collection_impl.cpp b/src/mongo/db/catalog/collection_impl.cpp index 97d22b504b1..9b5254464a6 100644 --- a/src/mongo/db/catalog/collection_impl.cpp +++ b/src/mongo/db/catalog/collection_impl.cpp @@ -924,7 +924,7 @@ Status CollectionImpl::truncate(OperationContext* opCtx) { Status status = _indexCatalog.dropAllIndexes(opCtx, true); if (!status.isOK()) return status; - _cursorManager.invalidateAll(false, "collection truncated"); + _cursorManager.invalidateAll(opCtx, false, "collection truncated"); // 3) truncate record store status = _recordStore->truncate(opCtx); @@ -947,7 +947,7 @@ void CollectionImpl::cappedTruncateAfter(OperationContext* opCtx, RecordId end, BackgroundOperation::assertNoBgOpInProgForNs(ns()); invariant(_indexCatalog.numIndexesInProgress(opCtx) == 0); - _cursorManager.invalidateAll(false, "capped collection truncated"); + _cursorManager.invalidateAll(opCtx, false, "capped collection truncated"); _recordStore->cappedTruncateAfter(opCtx, end, inclusive); } diff --git a/src/mongo/db/catalog/cursor_manager.cpp b/src/mongo/db/catalog/cursor_manager.cpp index 25f0be7edec..e3498f55f54 100644 --- a/src/mongo/db/catalog/cursor_manager.cpp +++ b/src/mongo/db/catalog/cursor_manager.cpp @@ -165,7 +165,7 @@ bool GlobalCursorIdCache::eraseCursor(OperationContext* opCtx, CursorId id, bool // Figure out what the namespace of this cursor is. NamespaceString nss; if (CursorManager::isGloballyManagedCursor(id)) { - auto pin = globalCursorManager->pinCursor(id); + auto pin = globalCursorManager->pinCursor(opCtx, id); if (!pin.isOK()) { invariant(pin == ErrorCodes::CursorNotFound); // No such cursor. TODO: Consider writing to audit log here (even though we don't @@ -229,7 +229,7 @@ std::size_t GlobalCursorIdCache::timeoutCursors(OperationContext* opCtx, int mil size_t totalTimedOut = 0; // Time out the cursors from the global cursor manager. - totalTimedOut += globalCursorManager->timeoutCursors(millisSinceLastCall); + totalTimedOut += globalCursorManager->timeoutCursors(opCtx, millisSinceLastCall); // Compute the set of collection names that we have to time out cursors for. vector<NamespaceString> todo; @@ -253,7 +253,7 @@ std::size_t GlobalCursorIdCache::timeoutCursors(OperationContext* opCtx, int mil continue; } - totalTimedOut += collection->getCursorManager()->timeoutCursors(millisSinceLastCall); + totalTimedOut += collection->getCursorManager()->timeoutCursors(opCtx, millisSinceLastCall); } return totalTimedOut; @@ -300,86 +300,54 @@ CursorManager::CursorManager(NamespaceString nss) : _nss(std::move(nss)) { } CursorManager::~CursorManager() { - invalidateAll(true, "collection going away"); if (!isGlobalManager()) { globalCursorIdCache->deregisterCursorManager(_collectionCacheRuntimeId, _nss); } + invariant(_cursors.empty()); + invariant(_nonCachedExecutors.empty()); } -void CursorManager::invalidateAll(bool collectionGoingAway, const std::string& reason) { - vector<ClientCursor*> toDelete; +void CursorManager::invalidateAll(OperationContext* opCtx, + bool collectionGoingAway, + const std::string& reason) { + invariant(!isGlobalManager()); // The global cursor manager should never need to kill cursors. + stdx::lock_guard<SimpleMutex> lk(_mutex); + fassert(28819, !BackgroundOperation::inProgForNs(_nss)); - { - stdx::lock_guard<SimpleMutex> lk(_mutex); - fassert(28819, !BackgroundOperation::inProgForNs(_nss)); + for (auto&& exec : _nonCachedExecutors) { + // We kill the executor, but it deletes itself. + exec->markAsKilled(reason); + } + _nonCachedExecutors.clear(); - for (ExecSet::iterator it = _nonCachedExecutors.begin(); it != _nonCachedExecutors.end(); - ++it) { - // we kill the executor, but it deletes itself - PlanExecutor* exec = *it; - exec->kill(reason); + CursorMap newMap; + for (auto&& entry : _cursors) { + auto* cursor = entry.second; + cursor->markAsKilled(reason); + + if (cursor->_isPinned) { + // There is an active user of this cursor, who is now responsible for cleaning it up. + // This CursorManager will no longer track this cursor. + continue; } - _nonCachedExecutors.clear(); - - if (collectionGoingAway) { - // we're going to wipe out the world - for (CursorMap::const_iterator i = _cursors.begin(); i != _cursors.end(); ++i) { - ClientCursor* cc = i->second; - - cc->kill(); - - // If the CC is pinned, somebody is actively using it and we do not delete it. - // Instead we notify the holder that we killed it. The holder will then delete the - // CC. - // - // If the CC is not pinned, there is nobody actively holding it. We can safely - // delete it. - if (!cc->_isPinned) { - toDelete.push_back(cc); - } - } + + if (!collectionGoingAway) { + // We keep around unpinned cursors so that future attempts to use the cursor will result + // in a useful error message. + newMap.insert(entry); } else { - CursorMap newMap; - - // collection will still be around, just all PlanExecutors are invalid - for (CursorMap::const_iterator i = _cursors.begin(); i != _cursors.end(); ++i) { - ClientCursor* cc = i->second; - - // Note that a valid ClientCursor state is "no cursor no executor." This is because - // the set of active cursor IDs in ClientCursor is used as representation of query - // state. - if (!cc->getExecutor()) { - newMap.insert(*i); - continue; - } - - if (cc->_isPinned) { - // Pinned cursors need to stay alive, so we leave them around. - if (cc->getExecutor()) - cc->getExecutor()->kill(reason); - newMap.insert(*i); - } else { - cc->kill(); - toDelete.push_back(cc); - } - } - - _cursors = newMap; + // The collection is going away, so there's no point in keeping any state. + cursor->dispose(opCtx); + delete cursor; } } - - // ClientCursors must be destroyed without holding '_mutex'. This is because the destruction of - // a ClientCursor may itself require accessing another CursorManager (e.g. when deregistering a - // non-cached PlanExecutor from a $lookup stage). We won't access this CursorManger when - // destroying a ClientCursor because we've already killed all of its non-cached PlanExecutors. - for (auto* cursor : toDelete) { - delete cursor; - } + _cursors = newMap; } void CursorManager::invalidateDocument(OperationContext* opCtx, const RecordId& dl, InvalidationType type) { + invariant(!isGlobalManager()); // The global cursor manager should never receive invalidations. if (supportsDocLocking()) { // If a storage engine supports doc locking, then we do not need to invalidate. // The transactional boundaries of the operation protect us. @@ -402,31 +370,24 @@ void CursorManager::invalidateDocument(OperationContext* opCtx, } } -std::size_t CursorManager::timeoutCursors(int millisSinceLastCall) { +std::size_t CursorManager::timeoutCursors(OperationContext* opCtx, int millisSinceLastCall) { vector<ClientCursor*> toDelete; - { - stdx::lock_guard<SimpleMutex> lk(_mutex); - - for (CursorMap::const_iterator i = _cursors.begin(); i != _cursors.end(); ++i) { - ClientCursor* cc = i->second; - // shouldTimeout() ensures that we skip pinned cursors. - if (cc->shouldTimeout(millisSinceLastCall)) - toDelete.push_back(cc); - } + stdx::lock_guard<SimpleMutex> lk(_mutex); - for (vector<ClientCursor*>::const_iterator i = toDelete.begin(); i != toDelete.end(); ++i) { - ClientCursor* cc = *i; - _deregisterCursor_inlock(cc); - cc->kill(); - } + for (CursorMap::const_iterator i = _cursors.begin(); i != _cursors.end(); ++i) { + ClientCursor* cc = i->second; + // shouldTimeout() ensures that we skip pinned cursors. + if (cc->shouldTimeout(millisSinceLastCall)) + toDelete.push_back(cc); } - // ClientCursors must be destroyed without holding '_mutex'. This is because the destruction of - // a ClientCursor may itself require accessing this CursorManager (e.g. when deregistering a - // non-cached PlanExecutor). - for (auto* cursor : toDelete) { - delete cursor; + // Properly dispose of each cursor that was timed out. + for (vector<ClientCursor*>::const_iterator i = toDelete.begin(); i != toDelete.end(); ++i) { + ClientCursor* cc = *i; + _deregisterCursor_inlock(cc); + cc->dispose(opCtx); + delete cc; } return toDelete.size(); @@ -443,7 +404,7 @@ void CursorManager::deregisterExecutor(PlanExecutor* exec) { _nonCachedExecutors.erase(exec); } -StatusWith<ClientCursorPin> CursorManager::pinCursor(CursorId id) { +StatusWith<ClientCursorPin> CursorManager::pinCursor(OperationContext* opCtx, CursorId id) { stdx::lock_guard<SimpleMutex> lk(_mutex); CursorMap::const_iterator it = _cursors.find(id); if (it == _cursors.end()) { @@ -452,8 +413,19 @@ StatusWith<ClientCursorPin> CursorManager::pinCursor(CursorId id) { ClientCursor* cursor = it->second; uassert(12051, str::stream() << "cursor id " << id << " is already in use", !cursor->_isPinned); + if (cursor->_killed) { + // This cursor was killed while it was idle. + invariant(cursor->getExecutor()); // We should never unpin RangePreserver cursors. + Status error{ErrorCodes::QueryPlanKilled, + str::stream() << "cursor killed because: " + << cursor->getExecutor()->getKillReason()}; + _deregisterCursor_inlock(cursor); + cursor->dispose(opCtx); + delete cursor; + return error; + } cursor->_isPinned = true; - return ClientCursorPin(cursor); + return ClientCursorPin(opCtx, cursor); } void CursorManager::unpin(ClientCursor* cursor) { @@ -500,31 +472,40 @@ CursorId CursorManager::_allocateCursorId_inlock() { fassertFailed(17360); } -ClientCursorPin CursorManager::registerCursor(ClientCursorParams&& cursorParams) { +ClientCursorPin CursorManager::registerCursor(OperationContext* opCtx, + ClientCursorParams&& cursorParams) { stdx::lock_guard<SimpleMutex> lk(_mutex); + // Make sure the PlanExecutor isn't registered, since we will register the ClientCursor wrapping + // it. + invariant(cursorParams.exec); + _nonCachedExecutors.erase(cursorParams.exec.get()); + cursorParams.exec.get_deleter().dismissDisposal(); + cursorParams.exec->unsetRegistered(); + CursorId cursorId = _allocateCursorId_inlock(); std::unique_ptr<ClientCursor, ClientCursor::Deleter> clientCursor( new ClientCursor(std::move(cursorParams), this, cursorId)); - return _registerCursor_inlock(std::move(clientCursor)); + return _registerCursor_inlock(opCtx, std::move(clientCursor)); } -ClientCursorPin CursorManager::registerRangePreserverCursor(const Collection* collection) { +ClientCursorPin CursorManager::registerRangePreserverCursor(OperationContext* opCtx, + const Collection* collection) { stdx::lock_guard<SimpleMutex> lk(_mutex); CursorId cursorId = _allocateCursorId_inlock(); std::unique_ptr<ClientCursor, ClientCursor::Deleter> clientCursor( new ClientCursor(collection, this, cursorId)); - return _registerCursor_inlock(std::move(clientCursor)); + return _registerCursor_inlock(opCtx, std::move(clientCursor)); } ClientCursorPin CursorManager::_registerCursor_inlock( - std::unique_ptr<ClientCursor, ClientCursor::Deleter> clientCursor) { + OperationContext* opCtx, std::unique_ptr<ClientCursor, ClientCursor::Deleter> clientCursor) { CursorId cursorId = clientCursor->cursorid(); invariant(cursorId); // Transfer ownership of the cursor to '_cursors'. ClientCursor* unownedCursor = clientCursor.release(); _cursors[cursorId] = unownedCursor; - return ClientCursorPin(unownedCursor); + return ClientCursorPin(opCtx, unownedCursor); } void CursorManager::deregisterCursor(ClientCursor* cc) { @@ -533,42 +514,33 @@ void CursorManager::deregisterCursor(ClientCursor* cc) { } Status CursorManager::eraseCursor(OperationContext* opCtx, CursorId id, bool shouldAudit) { - ClientCursor* cursor; - - { - stdx::lock_guard<SimpleMutex> lk(_mutex); + stdx::lock_guard<SimpleMutex> lk(_mutex); - CursorMap::iterator it = _cursors.find(id); - if (it == _cursors.end()) { - if (shouldAudit) { - audit::logKillCursorsAuthzCheck( - opCtx->getClient(), _nss, id, ErrorCodes::CursorNotFound); - } - return {ErrorCodes::CursorNotFound, str::stream() << "Cursor id not found: " << id}; + CursorMap::iterator it = _cursors.find(id); + if (it == _cursors.end()) { + if (shouldAudit) { + audit::logKillCursorsAuthzCheck( + opCtx->getClient(), _nss, id, ErrorCodes::CursorNotFound); } + return {ErrorCodes::CursorNotFound, str::stream() << "Cursor id not found: " << id}; + } - cursor = it->second; - - if (cursor->_isPinned) { - if (shouldAudit) { - audit::logKillCursorsAuthzCheck( - opCtx->getClient(), _nss, id, ErrorCodes::OperationFailed); - } - return {ErrorCodes::OperationFailed, - str::stream() << "Cannot kill pinned cursor: " << id}; - } + auto cursor = it->second; + if (cursor->_isPinned) { if (shouldAudit) { - audit::logKillCursorsAuthzCheck(opCtx->getClient(), _nss, id, ErrorCodes::OK); + audit::logKillCursorsAuthzCheck( + opCtx->getClient(), _nss, id, ErrorCodes::OperationFailed); } + return {ErrorCodes::OperationFailed, str::stream() << "Cannot kill pinned cursor: " << id}; + } - cursor->kill(); - _deregisterCursor_inlock(cursor); + if (shouldAudit) { + audit::logKillCursorsAuthzCheck(opCtx->getClient(), _nss, id, ErrorCodes::OK); } - // ClientCursors must be destroyed without holding '_mutex'. This is because the destruction of - // a ClientCursor may itself require accessing this CursorManager (e.g. when deregistering a - // non-cached PlanExecutor). + _deregisterCursor_inlock(cursor); + cursor->dispose(opCtx); delete cursor; return Status::OK(); } diff --git a/src/mongo/db/catalog/cursor_manager.h b/src/mongo/db/catalog/cursor_manager.h index 7e4da59b626..d8aa4be0bed 100644 --- a/src/mongo/db/catalog/cursor_manager.h +++ b/src/mongo/db/catalog/cursor_manager.h @@ -76,8 +76,8 @@ public: CursorManager(NamespaceString nss); /** - * Destroys the CursorManager. Managed cursors which are not pinned are destroyed. Ownership of - * pinned cursors is transferred to the corresponding ClientCursorPin. + * Destroys the CursorManager. All cursors and PlanExecutors must be cleaned up via + * invalidateAll() before destruction. */ ~CursorManager(); @@ -93,7 +93,9 @@ public: * reporting and logging when an operation finds that the cursor it was operating on has been * killed. */ - void invalidateAll(bool collectionGoingAway, const std::string& reason); + void invalidateAll(OperationContext* opCtx, + bool collectionGoingAway, + const std::string& reason); /** * Broadcast a document invalidation to all relevant PlanExecutor(s). invalidateDocument @@ -106,7 +108,7 @@ public: * * Returns the number of cursors that were timed out. */ - std::size_t timeoutCursors(int millisSinceLastCall); + std::size_t timeoutCursors(OperationContext* opCtx, int millisSinceLastCall); /** * Register an executor so that it can be notified of deletion/invalidation during yields. @@ -116,7 +118,8 @@ public: void registerExecutor(PlanExecutor* exec); /** - * Remove an executor from the registry. + * Remove an executor from the registry. It is legal to call this even if 'exec' is not + * registered. */ void deregisterExecutor(PlanExecutor* exec); @@ -124,30 +127,32 @@ public: * Constructs a new ClientCursor according to the given 'cursorParams'. The cursor is atomically * registered with the manager and returned in pinned state. */ - ClientCursorPin registerCursor(ClientCursorParams&& cursorParams); + ClientCursorPin registerCursor(OperationContext* opCtx, ClientCursorParams&& cursorParams); /** * Constructs and pins a special ClientCursor used to track sharding state for the given * collection. See range_preserver.h for more details. */ - ClientCursorPin registerRangePreserverCursor(const Collection* collection); + ClientCursorPin registerRangePreserverCursor(OperationContext* opCtx, + const Collection* collection); /** * Pins and returns the cursor with the given id. * - * Returns ErrorCodes::CursorNotFound if the cursor does not exist. + * Returns ErrorCodes::CursorNotFound if the cursor does not exist or + * ErrorCodes::QueryPlanKilled if the cursor was killed in between uses. * * Throws a UserException if the cursor is already pinned. Callers need not specially handle * this error, as it should only happen if a misbehaving client attempts to simultaneously issue * two operations against the same cursor id. */ - StatusWith<ClientCursorPin> pinCursor(CursorId id); + StatusWith<ClientCursorPin> pinCursor(OperationContext* opCtx, CursorId id); /** * Returns an OK status if the cursor was successfully erased. * - * Returns error code CursorNotFound if the cursor id is not owned by this manager. Returns - * error code OperationFailed if attempting to erase a pinned cursor. + * Returns ErrorCodes::CursorNotFound if the cursor id is not owned by this manager. Returns + * ErrorCodes::OperationFailed if attempting to erase a pinned cursor. * * If 'shouldAudit' is true, will perform audit logging. */ @@ -192,7 +197,7 @@ private: CursorId _allocateCursorId_inlock(); void _deregisterCursor_inlock(ClientCursor* cc); ClientCursorPin _registerCursor_inlock( - std::unique_ptr<ClientCursor, ClientCursor::Deleter> clientCursor); + OperationContext* opCtx, std::unique_ptr<ClientCursor, ClientCursor::Deleter> clientCursor); void deregisterCursor(ClientCursor* cc); diff --git a/src/mongo/db/catalog/database.h b/src/mongo/db/catalog/database.h index f483ceca79c..e5b83da2ba0 100644 --- a/src/mongo/db/catalog/database.h +++ b/src/mongo/db/catalog/database.h @@ -60,7 +60,7 @@ public: virtual void init(OperationContext* opCtx) = 0; - virtual void close(OperationContext* opCtx) = 0; + virtual void close(OperationContext* opCtx, const std::string& reason) = 0; virtual const std::string& name() const = 0; @@ -190,8 +190,8 @@ public: } // closes files and other cleanup see below. - inline void close(OperationContext* const opCtx) { - return this->_impl().close(opCtx); + inline void close(OperationContext* const opCtx, const std::string& reason) { + return this->_impl().close(opCtx, reason); } inline const std::string& name() const { diff --git a/src/mongo/db/catalog/database_holder.cpp b/src/mongo/db/catalog/database_holder.cpp index d3ee0e2e37f..bf13aea1860 100644 --- a/src/mongo/db/catalog/database_holder.cpp +++ b/src/mongo/db/catalog/database_holder.cpp @@ -169,7 +169,7 @@ Database* DatabaseHolder::openDb(OperationContext* opCtx, StringData ns, bool* j return it->second; } -void DatabaseHolder::close(OperationContext* opCtx, StringData ns) { +void DatabaseHolder::close(OperationContext* opCtx, StringData ns, const std::string& reason) { invariant(opCtx->lockState()->isW()); const StringData dbName = _todb(ns); @@ -181,14 +181,17 @@ void DatabaseHolder::close(OperationContext* opCtx, StringData ns) { return; } - it->second->close(opCtx); + it->second->close(opCtx, reason); delete it->second; _dbs.erase(it); getGlobalServiceContext()->getGlobalStorageEngine()->closeDatabase(opCtx, dbName.toString()); } -bool DatabaseHolder::closeAll(OperationContext* opCtx, BSONObjBuilder& result, bool force) { +bool DatabaseHolder::closeAll(OperationContext* opCtx, + BSONObjBuilder& result, + bool force, + const std::string& reason) { invariant(opCtx->lockState()->isW()); stdx::lock_guard<SimpleMutex> lk(_m); @@ -213,7 +216,7 @@ bool DatabaseHolder::closeAll(OperationContext* opCtx, BSONObjBuilder& result, b } Database* db = _dbs[name]; - db->close(opCtx); + db->close(opCtx, reason); delete db; _dbs.erase(name); diff --git a/src/mongo/db/catalog/database_holder.h b/src/mongo/db/catalog/database_holder.h index fd574832d6c..ecde0496b76 100644 --- a/src/mongo/db/catalog/database_holder.h +++ b/src/mongo/db/catalog/database_holder.h @@ -67,15 +67,19 @@ public: /** * Closes the specified database. Must be called with the database locked in X-mode. */ - void close(OperationContext* opCtx, StringData ns); + void close(OperationContext* opCtx, StringData ns, const std::string& reason); /** * Closes all opened databases. Must be called with the global lock acquired in X-mode. * * @param result Populated with the names of the databases, which were closed. * @param force Force close even if something underway - use at shutdown + * @param reason The reason for close. */ - bool closeAll(OperationContext* opCtx, BSONObjBuilder& result, bool force); + bool closeAll(OperationContext* opCtx, + BSONObjBuilder& result, + bool force, + const std::string& reason); /** * Returns the set of existing database names that differ only in casing. diff --git a/src/mongo/db/catalog/database_impl.cpp b/src/mongo/db/catalog/database_impl.cpp index 477540d9bb7..39b39bdb265 100644 --- a/src/mongo/db/catalog/database_impl.cpp +++ b/src/mongo/db/catalog/database_impl.cpp @@ -151,7 +151,7 @@ DatabaseImpl::~DatabaseImpl() { delete i->second; } -void DatabaseImpl::close(OperationContext* opCtx) { +void DatabaseImpl::close(OperationContext* opCtx, const std::string& reason) { // XXX? - Do we need to close database under global lock or just DB-lock is sufficient ? invariant(opCtx->lockState()->isW()); @@ -161,6 +161,11 @@ void DatabaseImpl::close(OperationContext* opCtx) { if (BackgroundOperation::inProgForDb(_name)) { log() << "warning: bg op in prog during close db? " << _name; } + + for (auto&& pair : _collections) { + auto* coll = pair.second; + coll->getCursorManager()->invalidateAll(opCtx, true, reason); + } } Status DatabaseImpl::validateDBName(StringData dbname) { @@ -417,6 +422,7 @@ Status DatabaseImpl::dropCollectionEvenIfSystem(OperationContext* opCtx, audit::logDropCollection(&cc(), fullns.toString()); + collection->getCursorManager()->invalidateAll(opCtx, true, "collection dropped"); Status s = collection->getIndexCatalog()->dropAllIndexes(opCtx, true); if (!s.isOK()) { @@ -475,7 +481,7 @@ void DatabaseImpl::_clearCollectionCache(OperationContext* opCtx, // Takes ownership of the collection opCtx->recoveryUnit()->registerChange(new RemoveCollectionChange(this, it->second)); - it->second->getCursorManager()->invalidateAll(false, reason); + it->second->getCursorManager()->invalidateAll(opCtx, false, reason); _collections.erase(it); } @@ -667,7 +673,7 @@ void DatabaseImpl::dropDatabase(OperationContext* opCtx, Database* db) { Top::get(opCtx->getClient()->getServiceContext()).collectionDropped(coll->ns().ns(), true); } - dbHolder().close(opCtx, name); + dbHolder().close(opCtx, name, "database dropped"); db = NULL; // d is now deleted MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { diff --git a/src/mongo/db/catalog/database_impl.h b/src/mongo/db/catalog/database_impl.h index 6f22deb3786..13b8a77cb7c 100644 --- a/src/mongo/db/catalog/database_impl.h +++ b/src/mongo/db/catalog/database_impl.h @@ -125,7 +125,7 @@ public: } // closes files and other cleanup see below. - void close(OperationContext* opCtx) final; + void close(OperationContext* opCtx, const std::string& reason) final; const std::string& name() const final { return _name; diff --git a/src/mongo/db/catalog/index_catalog.cpp b/src/mongo/db/catalog/index_catalog.cpp index dab3e9d4cb0..f3b644d8253 100644 --- a/src/mongo/db/catalog/index_catalog.cpp +++ b/src/mongo/db/catalog/index_catalog.cpp @@ -822,7 +822,8 @@ Status IndexCatalog::dropAllIndexes(OperationContext* opCtx, // there may be pointers pointing at keys in the btree(s). kill them. // TODO: can this can only clear cursors on this index? - _collection->getCursorManager()->invalidateAll(false, "all indexes on collection dropped"); + _collection->getCursorManager()->invalidateAll( + opCtx, false, "all indexes on collection dropped"); // make sure nothing in progress massert(17348, @@ -963,7 +964,7 @@ Status IndexCatalog::_dropIndex(OperationContext* opCtx, IndexCatalogEntry* entr // collection. if (entry->isReady(opCtx)) { _collection->getCursorManager()->invalidateAll( - false, str::stream() << "index '" << indexName << "' dropped"); + opCtx, false, str::stream() << "index '" << indexName << "' dropped"); } // --------- START REAL WORK ---------- @@ -1231,7 +1232,9 @@ const IndexDescriptor* IndexCatalog::refreshEntry(OperationContext* opCtx, // Notify other users of the IndexCatalog that we're about to invalidate 'oldDesc'. const bool collectionGoingAway = false; _collection->getCursorManager()->invalidateAll( - collectionGoingAway, str::stream() << "definition of index '" << indexName << "' changed"); + opCtx, + collectionGoingAway, + str::stream() << "definition of index '" << indexName << "' changed"); // Delete the IndexCatalogEntry that owns this descriptor. After deletion, 'oldDesc' is // invalid and should not be dereferenced. diff --git a/src/mongo/db/catalog/index_create.cpp b/src/mongo/db/catalog/index_create.cpp index 622706776f5..12726f3a950 100644 --- a/src/mongo/db/catalog/index_create.cpp +++ b/src/mongo/db/catalog/index_create.cpp @@ -295,14 +295,15 @@ Status MultiIndexBlock::insertAllDocumentsInCollection(std::set<RecordId>* dupsO unsigned long long n = 0; - unique_ptr<PlanExecutor> exec(InternalPlanner::collectionScan( - _opCtx, _collection->ns().ns(), _collection, PlanExecutor::YIELD_MANUAL)); + PlanExecutor::YieldPolicy yieldPolicy; if (_buildInBackground) { invariant(_allowInterruption); - exec->setYieldPolicy(PlanExecutor::YIELD_AUTO, _collection); + yieldPolicy = PlanExecutor::YIELD_AUTO; } else { - exec->setYieldPolicy(PlanExecutor::WRITE_CONFLICT_RETRY_ONLY, _collection); + yieldPolicy = PlanExecutor::WRITE_CONFLICT_RETRY_ONLY; } + auto exec = + InternalPlanner::collectionScan(_opCtx, _collection->ns().ns(), _collection, yieldPolicy); Snapshotted<BSONObj> objToIndex; RecordId loc; diff --git a/src/mongo/db/clientcursor.cpp b/src/mongo/db/clientcursor.cpp index 9bf48a6d379..48730e12b8c 100644 --- a/src/mongo/db/clientcursor.cpp +++ b/src/mongo/db/clientcursor.cpp @@ -117,7 +117,7 @@ void ClientCursor::init() { ClientCursor::~ClientCursor() { // Cursors must be unpinned and deregistered from their cursor manager before being deleted. invariant(!_isPinned); - invariant(!_cursorManager); + invariant(_disposed); cursorStatsOpen.decrement(); if (isNoTimeout()) { @@ -125,11 +125,23 @@ ClientCursor::~ClientCursor() { } } -void ClientCursor::kill() { - if (_exec.get()) - _exec->kill("cursor killed"); +void ClientCursor::markAsKilled(const std::string& reason) { + if (_exec) { + _exec->markAsKilled(reason); + } + _killed = true; +} + +void ClientCursor::dispose(OperationContext* opCtx) { + if (_disposed) { + return; + } + + if (_exec) { + _exec->dispose(opCtx, _cursorManager); + } - _cursorManager = nullptr; + _disposed = true; } // @@ -167,10 +179,12 @@ void ClientCursor::updateSlaveLocation(OperationContext* opCtx) { // Pin methods // -ClientCursorPin::ClientCursorPin(ClientCursor* cursor) : _cursor(cursor) { +ClientCursorPin::ClientCursorPin(OperationContext* opCtx, ClientCursor* cursor) + : _opCtx(opCtx), _cursor(cursor) { invariant(_cursor); invariant(_cursor->_isPinned); invariant(_cursor->_cursorManager); + invariant(!_cursor->_disposed); // We keep track of the number of cursors currently pinned. The cursor can become unpinned // either by being released back to the cursor manager or by being deleted. A cursor may be @@ -179,15 +193,16 @@ ClientCursorPin::ClientCursorPin(ClientCursor* cursor) : _cursor(cursor) { cursorStatsOpenPinned.increment(); } -ClientCursorPin::ClientCursorPin(ClientCursorPin&& other) : _cursor(other._cursor) { +ClientCursorPin::ClientCursorPin(ClientCursorPin&& other) + : _opCtx(other._opCtx), _cursor(other._cursor) { // The pinned cursor is being transferred to us from another pin. The 'other' pin must have a // pinned cursor. invariant(other._cursor); invariant(other._cursor->_isPinned); // Be sure to set the 'other' pin's cursor to null in order to transfer ownership to ourself. - _cursor = other._cursor; other._cursor = nullptr; + other._opCtx = nullptr; } ClientCursorPin& ClientCursorPin::operator=(ClientCursorPin&& other) { @@ -206,6 +221,10 @@ ClientCursorPin& ClientCursorPin::operator=(ClientCursorPin&& other) { // Be sure to set the 'other' pin's cursor to null in order to transfer ownership to ourself. _cursor = other._cursor; other._cursor = nullptr; + + _opCtx = other._opCtx; + other._opCtx = nullptr; + return *this; } @@ -219,16 +238,16 @@ void ClientCursorPin::release() { invariant(_cursor->_isPinned); - if (!_cursor->_cursorManager) { + if (_cursor->_killed) { // The ClientCursor was killed while we had it. Therefore, it is our responsibility to - // delete it. + // call dispose() and delete it. deleteUnderlying(); } else { // Unpin the cursor under the collection cursor manager lock. _cursor->_cursorManager->unpin(_cursor); + cursorStatsOpenPinned.decrement(); } - cursorStatsOpenPinned.decrement(); _cursor = nullptr; } @@ -236,16 +255,19 @@ void ClientCursorPin::deleteUnderlying() { invariant(_cursor); invariant(_cursor->_isPinned); // Note the following subtleties of this method's implementation: - // - We must unpin the cursor before destruction, since it is an error to destroy a pinned + // - We must unpin the cursor before destruction, since it is an error to delete a pinned // cursor. // - In addition, we must deregister the cursor before unpinning, since it is an // error to unpin a registered cursor without holding the cursor manager lock (note that // we can't simply unpin with the cursor manager lock here, since we need to guarantee // exclusive ownership of the cursor when we are deleting it). - if (_cursor->_cursorManager) { + + if (!_cursor->_killed) { _cursor->_cursorManager->deregisterCursor(_cursor); - _cursor->kill(); } + + // Make sure the cursor is disposed and unpinned before being destroyed. + _cursor->dispose(_opCtx); _cursor->_isPinned = false; delete _cursor; diff --git a/src/mongo/db/clientcursor.h b/src/mongo/db/clientcursor.h index 384cede1fd6..e23c0101775 100644 --- a/src/mongo/db/clientcursor.h +++ b/src/mongo/db/clientcursor.h @@ -51,7 +51,7 @@ class RecoveryUnit; * constructed and managed using a CursorManager. See cursor_manager.h for more details. */ struct ClientCursorParams { - ClientCursorParams(std::unique_ptr<PlanExecutor> planExecutor, + ClientCursorParams(std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> planExecutor, NamespaceString nss, UserNameIterator authenticatedUsersIter, bool isReadCommitted, @@ -68,7 +68,7 @@ struct ClientCursorParams { } } - std::unique_ptr<PlanExecutor> exec; + std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> exec; const NamespaceString nss; std::vector<UserName> authenticatedUsers; bool isReadCommitted = false; @@ -250,9 +250,17 @@ private: void init(); /** - * Marks the cursor, and its underlying query plan, as killed. + * Marks this cursor as killed, so any future uses will return an error status including + * 'reason'. */ - void kill(); + void markAsKilled(const std::string& reason); + + /** + * Disposes this ClientCursor's PlanExecutor. Must be called before deleting a ClientCursor to + * ensure it has a chance to clean up any resources it is using. Can be called multiple times. + * It is an error to call any other method after calling dispose(). + */ + void dispose(OperationContext* opCtx); bool isNoTimeout() const { return (_queryOptions & QueryOption_NoCursorTimeout); @@ -271,12 +279,15 @@ private: // A pointer to the CursorManager which owns this cursor. This must be filled out when the // cursor is constructed via the CursorManager. - // - // If '_cursorManager' is destroyed while this cursor is pinned, then ownership of the cursor is - // transferred to the ClientCursorPin. In this case, '_cursorManager' set back to null in order - // to indicate the ownership transfer. CursorManager* _cursorManager = nullptr; + // Tracks whether dispose() has been called, to make sure it happens before destruction. It is + // an error to use a ClientCursor once it has been disposed. + bool _disposed = false; + + // TODO SERVER-28309 Remove this field and instead use _exec->markedAsKilled(). + bool _killed = false; + // Tracks the number of results returned by this cursor so far. long long _pos = 0; @@ -302,7 +313,7 @@ private: Microseconds _leftoverMaxTimeMicros = Microseconds::max(); // The underlying query execution machinery. - std::unique_ptr<PlanExecutor> _exec; + std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> _exec; }; /** @@ -322,10 +333,10 @@ private: * * Example usage: * { - * StatusWith<ClientCursorPin> pin = cursorManager->pinCursor(cursorid); + * StatusWith<ClientCursorPin> pin = cursorManager->pinCursor(opCtx, cursorid); * if (!pin.isOK()) { - * // No cursor with id 'cursorid' exists. Handle the error here. Pin automatically released - * // on block exit. + * // No cursor with id 'cursorid' exists, or it was killed while inactive. Handle the error + * here. * return pin.getStatus(); * } * @@ -384,8 +395,9 @@ public: private: friend class CursorManager; - ClientCursorPin(ClientCursor* cursor); + ClientCursorPin(OperationContext* opCtx, ClientCursor* cursor); + OperationContext* _opCtx = nullptr; ClientCursor* _cursor = nullptr; }; diff --git a/src/mongo/db/commands/count_cmd.cpp b/src/mongo/db/commands/count_cmd.cpp index 3b61e081ab4..e0c782779e5 100644 --- a/src/mongo/db/commands/count_cmd.cpp +++ b/src/mongo/db/commands/count_cmd.cpp @@ -148,7 +148,7 @@ public: // Prevent chunks from being cleaned up during yields - this allows us to only check the // version on initial entry into count. - RangePreserver preserver(collection); + RangePreserver preserver(opCtx, collection); auto statusWithPlanExecutor = getExecutorCount(opCtx, collection, @@ -159,7 +159,7 @@ public: return statusWithPlanExecutor.getStatus(); } - unique_ptr<PlanExecutor> exec = std::move(statusWithPlanExecutor.getValue()); + auto exec = std::move(statusWithPlanExecutor.getValue()); Explain::explainStages(exec.get(), collection, verbosity, out); return Status::OK(); @@ -216,7 +216,7 @@ public: // Prevent chunks from being cleaned up during yields - this allows us to only check the // version on initial entry into count. - RangePreserver preserver(collection); + RangePreserver preserver(opCtx, collection); auto statusWithPlanExecutor = getExecutorCount(opCtx, collection, @@ -227,7 +227,7 @@ public: return appendCommandStatus(result, statusWithPlanExecutor.getStatus()); } - unique_ptr<PlanExecutor> exec = std::move(statusWithPlanExecutor.getValue()); + auto exec = std::move(statusWithPlanExecutor.getValue()); // Store the plan summary string in CurOp. auto curOp = CurOp::get(opCtx); diff --git a/src/mongo/db/commands/dbcommands.cpp b/src/mongo/db/commands/dbcommands.cpp index ebd5ca79cb7..5e6fef242fc 100644 --- a/src/mongo/db/commands/dbcommands.cpp +++ b/src/mongo/db/commands/dbcommands.cpp @@ -973,9 +973,7 @@ public: return 0; } - unique_ptr<PlanExecutor> exec = std::move(statusWithPlanExecutor.getValue()); - // Process notifications when the lock is released/reacquired in the loop below - exec->registerExec(coll); + auto exec = std::move(statusWithPlanExecutor.getValue()); BSONObj obj; PlanExecutor::ExecState state; @@ -1122,7 +1120,7 @@ public: result.appendBool("estimate", estimate); - unique_ptr<PlanExecutor> exec; + unique_ptr<PlanExecutor, PlanExecutor::Deleter> exec; if (min.isEmpty() && max.isEmpty()) { if (estimate) { result.appendNumber("size", static_cast<long long>(collection->dataSize(opCtx))); @@ -1130,8 +1128,7 @@ public: result.append("millis", timer.millis()); return 1; } - exec = - InternalPlanner::collectionScan(opCtx, ns, collection, PlanExecutor::YIELD_MANUAL); + exec = InternalPlanner::collectionScan(opCtx, ns, collection, PlanExecutor::NO_YIELD); } else if (min.isEmpty() || max.isEmpty()) { errmsg = "only one of min or max specified"; return false; @@ -1161,7 +1158,7 @@ public: min, max, BoundInclusion::kIncludeStartKeyOnly, - PlanExecutor::YIELD_MANUAL); + PlanExecutor::NO_YIELD); } long long avgObjSize = collection->dataSize(opCtx) / numRecords; diff --git a/src/mongo/db/commands/dbhash.cpp b/src/mongo/db/commands/dbhash.cpp index ea41c959bb2..7a9e9d18a9d 100644 --- a/src/mongo/db/commands/dbhash.cpp +++ b/src/mongo/db/commands/dbhash.cpp @@ -218,7 +218,7 @@ private: IndexDescriptor* desc = collection->getIndexCatalog()->findIdIndex(opCtx); - unique_ptr<PlanExecutor> exec; + unique_ptr<PlanExecutor, PlanExecutor::Deleter> exec; if (desc) { exec = InternalPlanner::indexScan(opCtx, collection, @@ -226,12 +226,12 @@ private: BSONObj(), BSONObj(), BoundInclusion::kIncludeStartKeyOnly, - PlanExecutor::YIELD_MANUAL, + PlanExecutor::NO_YIELD, InternalPlanner::FORWARD, InternalPlanner::IXSCAN_FETCH); } else if (collection->isCapped()) { exec = InternalPlanner::collectionScan( - opCtx, fullCollectionName, collection, PlanExecutor::YIELD_MANUAL); + opCtx, fullCollectionName, collection, PlanExecutor::NO_YIELD); } else { log() << "can't find _id index for: " << fullCollectionName; return "no _id _index"; diff --git a/src/mongo/db/commands/find_and_modify.cpp b/src/mongo/db/commands/find_and_modify.cpp index ce8ceb39fee..05eedc835a8 100644 --- a/src/mongo/db/commands/find_and_modify.cpp +++ b/src/mongo/db/commands/find_and_modify.cpp @@ -298,7 +298,7 @@ public: if (!statusWithPlanExecutor.isOK()) { return statusWithPlanExecutor.getStatus(); } - const std::unique_ptr<PlanExecutor> exec = std::move(statusWithPlanExecutor.getValue()); + const auto exec = std::move(statusWithPlanExecutor.getValue()); Explain::explainStages(exec.get(), collection, verbosity, out); } else { UpdateRequest request(nsString); @@ -329,7 +329,7 @@ public: if (!statusWithPlanExecutor.isOK()) { return statusWithPlanExecutor.getStatus(); } - const std::unique_ptr<PlanExecutor> exec = std::move(statusWithPlanExecutor.getValue()); + const auto exec = std::move(statusWithPlanExecutor.getValue()); Explain::explainStages(exec.get(), collection, verbosity, out); } @@ -421,8 +421,7 @@ public: if (!statusWithPlanExecutor.isOK()) { return appendCommandStatus(result, statusWithPlanExecutor.getStatus()); } - const std::unique_ptr<PlanExecutor> exec = - std::move(statusWithPlanExecutor.getValue()); + const auto exec = std::move(statusWithPlanExecutor.getValue()); { stdx::lock_guard<Client> lk(*opCtx->getClient()); @@ -527,8 +526,7 @@ public: if (!statusWithPlanExecutor.isOK()) { return appendCommandStatus(result, statusWithPlanExecutor.getStatus()); } - const std::unique_ptr<PlanExecutor> exec = - std::move(statusWithPlanExecutor.getValue()); + const auto exec = std::move(statusWithPlanExecutor.getValue()); { stdx::lock_guard<Client> lk(*opCtx->getClient()); diff --git a/src/mongo/db/commands/find_cmd.cpp b/src/mongo/db/commands/find_cmd.cpp index 5cb37781525..a8f40cbe2b2 100644 --- a/src/mongo/db/commands/find_cmd.cpp +++ b/src/mongo/db/commands/find_cmd.cpp @@ -211,7 +211,7 @@ public: if (!statusWithPlanExecutor.isOK()) { return statusWithPlanExecutor.getStatus(); } - std::unique_ptr<PlanExecutor> exec = std::move(statusWithPlanExecutor.getValue()); + auto exec = std::move(statusWithPlanExecutor.getValue()); // Got the execution tree. Explain it. Explain::explainStages(exec.get(), collection, verbosity, out); @@ -331,7 +331,7 @@ public: return appendCommandStatus(result, statusWithPlanExecutor.getStatus()); } - std::unique_ptr<PlanExecutor> exec = std::move(statusWithPlanExecutor.getValue()); + auto exec = std::move(statusWithPlanExecutor.getValue()); { stdx::lock_guard<Client> lk(*opCtx->getClient()); @@ -389,15 +389,10 @@ public: // Set up the cursor for getMore. CursorId cursorId = 0; if (shouldSaveCursor(opCtx, collection, state, exec.get())) { - // Register the execution plan inside a ClientCursor. Ownership of the PlanExecutor is - // transferred to the ClientCursor. - // - // First unregister the PlanExecutor so it can be re-registered with ClientCursor. - exec->deregisterExec(); - // Create a ClientCursor containing this plan executor and register it with the cursor // manager. ClientCursorPin pinnedCursor = collection->getCursorManager()->registerCursor( + opCtx, {std::move(exec), nss, AuthorizationSession::get(opCtx->getClient())->getAuthenticatedUserNames(), diff --git a/src/mongo/db/commands/geo_near_cmd.cpp b/src/mongo/db/commands/geo_near_cmd.cpp index 340cd80bd0e..43b57c06b68 100644 --- a/src/mongo/db/commands/geo_near_cmd.cpp +++ b/src/mongo/db/commands/geo_near_cmd.cpp @@ -234,7 +234,7 @@ public: // Prevent chunks from being cleaned up during yields - this allows us to only check the // version on initial entry into geoNear. - RangePreserver preserver(collection); + RangePreserver preserver(opCtx, collection); auto statusWithPlanExecutor = getExecutor(opCtx, collection, std::move(cq), PlanExecutor::YIELD_AUTO, 0); @@ -243,7 +243,7 @@ public: return false; } - unique_ptr<PlanExecutor> exec = std::move(statusWithPlanExecutor.getValue()); + auto exec = std::move(statusWithPlanExecutor.getValue()); auto curOp = CurOp::get(opCtx); { diff --git a/src/mongo/db/commands/getmore_cmd.cpp b/src/mongo/db/commands/getmore_cmd.cpp index 2f69dc7b905..aa2ca491783 100644 --- a/src/mongo/db/commands/getmore_cmd.cpp +++ b/src/mongo/db/commands/getmore_cmd.cpp @@ -213,9 +213,8 @@ public: cursorManager = collection->getCursorManager(); } - auto ccPin = cursorManager->pinCursor(request.cursorid); + auto ccPin = cursorManager->pinCursor(opCtx, request.cursorid); if (!ccPin.isOK()) { - // We didn't find the cursor. return appendCommandStatus(result, ccPin.getStatus()); } @@ -512,7 +511,7 @@ public: } } - if (PlanExecutor::FAILURE == *state || PlanExecutor::DEAD == *state) { + if (PlanExecutor::FAILURE == *state) { nextBatch->abandon(); error() << "GetMore command executor error: " << PlanExecutor::statestr(*state) @@ -521,6 +520,12 @@ public: return Status(ErrorCodes::OperationFailed, str::stream() << "GetMore command executor error: " << WorkingSetCommon::toStatusString(obj)); + } else if (PlanExecutor::DEAD == *state) { + nextBatch->abandon(); + + return Status(ErrorCodes::QueryPlanKilled, + str::stream() << "PlanExecutor killed: " + << WorkingSetCommon::toStatusString(obj)); } return Status::OK(); diff --git a/src/mongo/db/commands/group_cmd.cpp b/src/mongo/db/commands/group_cmd.cpp index b2d1735c06b..c7f0b70c6ba 100644 --- a/src/mongo/db/commands/group_cmd.cpp +++ b/src/mongo/db/commands/group_cmd.cpp @@ -143,7 +143,7 @@ private: return statusWithPlanExecutor.getStatus(); } - unique_ptr<PlanExecutor> planExecutor = std::move(statusWithPlanExecutor.getValue()); + auto planExecutor = std::move(statusWithPlanExecutor.getValue()); Explain::explainStages(planExecutor.get(), coll, verbosity, out); return Status::OK(); @@ -174,7 +174,7 @@ private: return appendCommandStatus(result, statusWithPlanExecutor.getStatus()); } - unique_ptr<PlanExecutor> planExecutor = std::move(statusWithPlanExecutor.getValue()); + auto planExecutor = std::move(statusWithPlanExecutor.getValue()); auto curOp = CurOp::get(opCtx); { diff --git a/src/mongo/db/commands/killcursors_cmd.cpp b/src/mongo/db/commands/killcursors_cmd.cpp index a39f0dfaa42..394039ab124 100644 --- a/src/mongo/db/commands/killcursors_cmd.cpp +++ b/src/mongo/db/commands/killcursors_cmd.cpp @@ -80,7 +80,7 @@ private: // Make sure the namespace of the cursor matches the namespace passed to the killCursors // command so we can be sure we checked the correct privileges. - auto ccPin = cursorManager->pinCursor(cursorId); + auto ccPin = cursorManager->pinCursor(opCtx, cursorId); if (ccPin.isOK()) { auto cursorNs = ccPin.getValue().getCursor()->nss(); if (cursorNs != nss) { diff --git a/src/mongo/db/commands/list_collections.cpp b/src/mongo/db/commands/list_collections.cpp index 5610a67ad66..b58ff9339fd 100644 --- a/src/mongo/db/commands/list_collections.cpp +++ b/src/mongo/db/commands/list_collections.cpp @@ -297,11 +297,11 @@ public: const NamespaceString cursorNss = NamespaceString::makeListCollectionsNSS(dbname); auto statusWithPlanExecutor = PlanExecutor::make( - opCtx, std::move(ws), std::move(root), cursorNss, PlanExecutor::YIELD_MANUAL); + opCtx, std::move(ws), std::move(root), cursorNss, PlanExecutor::NO_YIELD); if (!statusWithPlanExecutor.isOK()) { return appendCommandStatus(result, statusWithPlanExecutor.getStatus()); } - unique_ptr<PlanExecutor> exec = std::move(statusWithPlanExecutor.getValue()); + auto exec = std::move(statusWithPlanExecutor.getValue()); BSONArrayBuilder firstBatch; @@ -327,6 +327,7 @@ public: exec->saveState(); exec->detachFromOperationContext(); auto pinnedCursor = CursorManager::getGlobalCursorManager()->registerCursor( + opCtx, {std::move(exec), cursorNss, AuthorizationSession::get(opCtx->getClient())->getAuthenticatedUserNames(), diff --git a/src/mongo/db/commands/list_indexes.cpp b/src/mongo/db/commands/list_indexes.cpp index 387cb7a376d..46d5d842d97 100644 --- a/src/mongo/db/commands/list_indexes.cpp +++ b/src/mongo/db/commands/list_indexes.cpp @@ -197,11 +197,11 @@ public: dassert(ns == cursorNss.getTargetNSForListIndexes()); auto statusWithPlanExecutor = PlanExecutor::make( - opCtx, std::move(ws), std::move(root), cursorNss, PlanExecutor::YIELD_MANUAL); + opCtx, std::move(ws), std::move(root), cursorNss, PlanExecutor::NO_YIELD); if (!statusWithPlanExecutor.isOK()) { return appendCommandStatus(result, statusWithPlanExecutor.getStatus()); } - unique_ptr<PlanExecutor> exec = std::move(statusWithPlanExecutor.getValue()); + auto exec = std::move(statusWithPlanExecutor.getValue()); BSONArrayBuilder firstBatch; @@ -227,6 +227,7 @@ public: exec->saveState(); exec->detachFromOperationContext(); auto pinnedCursor = CursorManager::getGlobalCursorManager()->registerCursor( + opCtx, {std::move(exec), cursorNss, AuthorizationSession::get(opCtx->getClient())->getAuthenticatedUserNames(), diff --git a/src/mongo/db/commands/mr.cpp b/src/mongo/db/commands/mr.cpp index f6338b0a2a7..ce900a7fba7 100644 --- a/src/mongo/db/commands/mr.cpp +++ b/src/mongo/db/commands/mr.cpp @@ -1114,7 +1114,15 @@ void State::finalReduce(OperationContext* opCtx, CurOp* curOp, ProgressMeterHold _opCtx, coll, std::move(cq), PlanExecutor::YIELD_AUTO, QueryPlannerParams::NO_TABLE_SCAN); verify(statusWithPlanExecutor.isOK()); - unique_ptr<PlanExecutor> exec = std::move(statusWithPlanExecutor.getValue()); + auto exec = std::move(statusWithPlanExecutor.getValue()); + + // Make sure the PlanExecutor is destroyed while holding a collection lock. + ON_BLOCK_EXIT([&exec, &ctx, opCtx, this] { + if (!ctx) { + AutoGetCollection autoColl(opCtx, _config.incLong, MODE_IS); + exec.reset(); + } + }); // iterate over all sorted objects BSONObj o; @@ -1404,7 +1412,7 @@ public: Collection* collection = ctx.getCollection(); if (collection) { - rangePreserver.reset(new RangePreserver(collection)); + rangePreserver.reset(new RangePreserver(opCtx, collection)); } // Get metadata before we check our version, to make sure it doesn't increment @@ -1502,7 +1510,7 @@ public: } std::unique_ptr<CanonicalQuery> cq = std::move(statusWithCQ.getValue()); - unique_ptr<PlanExecutor> exec; + unique_ptr<PlanExecutor, PlanExecutor::Deleter> exec; { Database* db = scopedAutoDb->getDb(); Collection* coll = State::getCollectionOrUassert(opCtx, db, config.nss); diff --git a/src/mongo/db/commands/parallel_collection_scan.cpp b/src/mongo/db/commands/parallel_collection_scan.cpp index 725e76b19ab..41471f656aa 100644 --- a/src/mongo/db/commands/parallel_collection_scan.cpp +++ b/src/mongo/db/commands/parallel_collection_scan.cpp @@ -115,7 +115,7 @@ public: numCursors = iterators.size(); } - std::vector<std::unique_ptr<PlanExecutor>> execs; + std::vector<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> execs; for (size_t i = 0; i < numCursors; i++) { unique_ptr<WorkingSet> ws = make_unique<WorkingSet>(); unique_ptr<MultiIteratorStage> mis = @@ -139,16 +139,13 @@ public: { BSONArrayBuilder bucketsBuilder; for (auto&& exec : execs) { - // The PlanExecutor was registered on construction due to the YIELD_AUTO policy. - // We have to deregister it, as it will be registered with ClientCursor. - exec->deregisterExec(); - // Need to save state while yielding locks between now and getMore(). exec->saveState(); exec->detachFromOperationContext(); // Create and register a new ClientCursor. auto pinnedCursor = collection->getCursorManager()->registerCursor( + opCtx, {std::move(exec), ns, AuthorizationSession::get(opCtx->getClient())->getAuthenticatedUserNames(), diff --git a/src/mongo/db/commands/repair_cursor.cpp b/src/mongo/db/commands/repair_cursor.cpp index bf45608ccff..031370f4316 100644 --- a/src/mongo/db/commands/repair_cursor.cpp +++ b/src/mongo/db/commands/repair_cursor.cpp @@ -96,16 +96,13 @@ public: auto statusWithPlanExecutor = PlanExecutor::make( opCtx, std::move(ws), std::move(stage), collection, PlanExecutor::YIELD_AUTO); invariant(statusWithPlanExecutor.isOK()); - std::unique_ptr<PlanExecutor> exec = std::move(statusWithPlanExecutor.getValue()); + auto exec = std::move(statusWithPlanExecutor.getValue()); - // 'exec' will be used in getMore(). It was automatically registered on construction - // due to the auto yield policy, so it could yield during plan selection. We deregister - // it now so that it can be registed with ClientCursor. - exec->deregisterExec(); exec->saveState(); exec->detachFromOperationContext(); auto pinnedCursor = collection->getCursorManager()->registerCursor( + opCtx, {std::move(exec), ns, AuthorizationSession::get(opCtx->getClient())->getAuthenticatedUserNames(), diff --git a/src/mongo/db/commands/run_aggregate.cpp b/src/mongo/db/commands/run_aggregate.cpp index 4b26126d68d..02dd220d340 100644 --- a/src/mongo/db/commands/run_aggregate.cpp +++ b/src/mongo/db/commands/run_aggregate.cpp @@ -208,8 +208,8 @@ StatusWith<StringMap<ExpressionContext::ResolvedNamespace>> resolveInvolvedNames * Round trips the pipeline through serialization by calling serialize(), then Pipeline::parse(). * fasserts if it fails to parse after being serialized. */ -boost::intrusive_ptr<Pipeline> reparsePipeline( - const boost::intrusive_ptr<Pipeline>& pipeline, +std::unique_ptr<Pipeline, Pipeline::Deleter> reparsePipeline( + const Pipeline* pipeline, const AggregationRequest& request, const boost::intrusive_ptr<ExpressionContext>& expCtx) { auto serialized = pipeline->serialize(); @@ -231,7 +231,7 @@ boost::intrusive_ptr<Pipeline> reparsePipeline( } reparsedPipeline.getValue()->optimizePipeline(); - return reparsedPipeline.getValue(); + return std::move(reparsedPipeline.getValue()); } /** @@ -241,7 +241,7 @@ boost::intrusive_ptr<Pipeline> reparsePipeline( Status collatorCompatibleWithPipeline(OperationContext* opCtx, Database* db, const CollatorInterface* collator, - const intrusive_ptr<Pipeline> pipeline) { + const Pipeline* pipeline) { if (!db || !pipeline) { return Status::OK(); } @@ -278,9 +278,9 @@ Status runAggregate(OperationContext* opCtx, : uassertStatusOK(CollatorFactoryInterface::get(opCtx->getServiceContext()) ->makeFromBSON(request.getCollation())); - unique_ptr<PlanExecutor> exec; + unique_ptr<PlanExecutor, PlanExecutor::Deleter> exec; boost::intrusive_ptr<ExpressionContext> expCtx; - boost::intrusive_ptr<Pipeline> pipeline; + Pipeline* unownedPipeline; auto curOp = CurOp::get(opCtx); { // This will throw if the sharding version for this connection is out of date. If the @@ -368,12 +368,12 @@ Status runAggregate(OperationContext* opCtx, if (!statusWithPipeline.isOK()) { return statusWithPipeline.getStatus(); } - pipeline = std::move(statusWithPipeline.getValue()); + auto pipeline = std::move(statusWithPipeline.getValue()); // Check that the view's collation matches the collation of any views involved // in the pipeline. - auto pipelineCollationStatus = - collatorCompatibleWithPipeline(opCtx, ctx.getDb(), expCtx->getCollator(), pipeline); + auto pipelineCollationStatus = collatorCompatibleWithPipeline( + opCtx, ctx.getDb(), expCtx->getCollator(), pipeline.get()); if (!pipelineCollationStatus.isOK()) { return pipelineCollationStatus; } @@ -385,22 +385,24 @@ Status runAggregate(OperationContext* opCtx, // re-parsing every command in debug builds. This is important because sharded // aggregations rely on this ability. Skipping when inShard because this has // already been through the transformation (and this un-sets expCtx->inShard). - pipeline = reparsePipeline(pipeline, request, expCtx); + pipeline = reparsePipeline(pipeline.get(), request, expCtx); } // This does mongod-specific stuff like creating the input PlanExecutor and adding // it to the front of the pipeline if needed. - PipelineD::prepareCursorSource(collection, &request, pipeline); + PipelineD::prepareCursorSource(collection, &request, pipeline.get()); + // Transfer ownership of the Pipeline to the PipelineProxyStage. + unownedPipeline = pipeline.get(); auto ws = make_unique<WorkingSet>(); - auto proxy = make_unique<PipelineProxyStage>(opCtx, pipeline, ws.get()); + auto proxy = make_unique<PipelineProxyStage>(opCtx, std::move(pipeline), ws.get()); // This PlanExecutor will simply forward requests to the Pipeline, so does not need to // yield or to be registered with any collection's CursorManager to receive invalidations. // The Pipeline may contain PlanExecutors which *are* yielding PlanExecutors and which *are* // registered with their respective collection's CursorManager - auto statusWithPlanExecutor = PlanExecutor::make( - opCtx, std::move(ws), std::move(proxy), nss, PlanExecutor::YIELD_MANUAL); + auto statusWithPlanExecutor = + PlanExecutor::make(opCtx, std::move(ws), std::move(proxy), nss, PlanExecutor::NO_YIELD); invariant(statusWithPlanExecutor.isOK()); exec = std::move(statusWithPlanExecutor.getValue()); @@ -417,6 +419,7 @@ Status runAggregate(OperationContext* opCtx, // notifications; the underlying PlanExecutor(s) used by the pipeline will be receiving // invalidations and kill notifications themselves, not the cursor we create here. auto pin = CursorManager::getGlobalCursorManager()->registerCursor( + opCtx, {std::move(exec), origNss, AuthorizationSession::get(opCtx->getClient())->getAuthenticatedUserNames(), @@ -426,7 +429,7 @@ Status runAggregate(OperationContext* opCtx, // If both explain and cursor are specified, explain wins. if (expCtx->explain) { - result << "stages" << Value(pipeline->writeExplainOps(*expCtx->explain)); + result << "stages" << Value(unownedPipeline->writeExplainOps(*expCtx->explain)); } else { // Cursor must be specified, if explain is not. const bool keepCursor = diff --git a/src/mongo/db/commands/test_commands.cpp b/src/mongo/db/commands/test_commands.cpp index 0c12a8c22ce..05d011580b6 100644 --- a/src/mongo/db/commands/test_commands.cpp +++ b/src/mongo/db/commands/test_commands.cpp @@ -252,12 +252,8 @@ public: // Scan backwards through the collection to find the document to start truncating from. // We will remove 'n' documents, so start truncating from the (n + 1)th document to the // end. - std::unique_ptr<PlanExecutor> exec( - InternalPlanner::collectionScan(opCtx, - fullNs.ns(), - collection, - PlanExecutor::YIELD_MANUAL, - InternalPlanner::BACKWARD)); + auto exec = InternalPlanner::collectionScan( + opCtx, fullNs.ns(), collection, PlanExecutor::NO_YIELD, InternalPlanner::BACKWARD); for (int i = 0; i < n + 1; ++i) { PlanExecutor::ExecState state = exec->getNext(nullptr, &end); diff --git a/src/mongo/db/db.cpp b/src/mongo/db/db.cpp index 31adaba7b18..44aba032a48 100644 --- a/src/mongo/db/db.cpp +++ b/src/mongo/db/db.cpp @@ -372,8 +372,8 @@ void repairDatabasesAndCheckVersion(OperationContext* opCtx) { const NamespaceString systemIndexes(db->name(), "system.indexes"); Collection* coll = db->getCollection(opCtx, systemIndexes); - unique_ptr<PlanExecutor> exec(InternalPlanner::collectionScan( - opCtx, systemIndexes.ns(), coll, PlanExecutor::YIELD_MANUAL)); + auto exec = InternalPlanner::collectionScan( + opCtx, systemIndexes.ns(), coll, PlanExecutor::NO_YIELD); BSONObj index; PlanExecutor::ExecState state; diff --git a/src/mongo/db/dbhelpers.cpp b/src/mongo/db/dbhelpers.cpp index e8a5df00702..34ecdfbbefa 100644 --- a/src/mongo/db/dbhelpers.cpp +++ b/src/mongo/db/dbhelpers.cpp @@ -119,12 +119,12 @@ RecordId Helpers::findOne(OperationContext* opCtx, size_t options = requireIndex ? QueryPlannerParams::NO_TABLE_SCAN : QueryPlannerParams::DEFAULT; auto statusWithPlanExecutor = - getExecutor(opCtx, collection, std::move(cq), PlanExecutor::YIELD_MANUAL, options); + getExecutor(opCtx, collection, std::move(cq), PlanExecutor::NO_YIELD, options); massert(17245, "Could not get executor for query " + query.toString(), statusWithPlanExecutor.isOK()); - unique_ptr<PlanExecutor> exec = std::move(statusWithPlanExecutor.getValue()); + auto exec = std::move(statusWithPlanExecutor.getValue()); PlanExecutor::ExecState state; BSONObj obj; RecordId loc; @@ -182,8 +182,8 @@ RecordId Helpers::findById(OperationContext* opCtx, bool Helpers::getSingleton(OperationContext* opCtx, const char* ns, BSONObj& result) { AutoGetCollectionForReadCommand ctx(opCtx, NamespaceString(ns)); - unique_ptr<PlanExecutor> exec(InternalPlanner::collectionScan( - opCtx, ns, ctx.getCollection(), PlanExecutor::YIELD_MANUAL)); + auto exec = + InternalPlanner::collectionScan(opCtx, ns, ctx.getCollection(), PlanExecutor::NO_YIELD); PlanExecutor::ExecState state = exec->getNext(&result, NULL); CurOp::get(opCtx)->done(); @@ -201,11 +201,8 @@ bool Helpers::getSingleton(OperationContext* opCtx, const char* ns, BSONObj& res bool Helpers::getLast(OperationContext* opCtx, const char* ns, BSONObj& result) { AutoGetCollectionForReadCommand autoColl(opCtx, NamespaceString(ns)); - unique_ptr<PlanExecutor> exec(InternalPlanner::collectionScan(opCtx, - ns, - autoColl.getCollection(), - PlanExecutor::YIELD_MANUAL, - InternalPlanner::BACKWARD)); + auto exec = InternalPlanner::collectionScan( + opCtx, ns, autoColl.getCollection(), PlanExecutor::NO_YIELD, InternalPlanner::BACKWARD); PlanExecutor::ExecState state = exec->getNext(&result, NULL); // Non-yielding collection scans from InternalPlanner will never error. @@ -353,17 +350,15 @@ long long Helpers::removeRange(OperationContext* opCtx, return -1; } - unique_ptr<PlanExecutor> exec( - InternalPlanner::indexScan(opCtx, - collection, - desc, - min, - max, - boundInclusion, - PlanExecutor::YIELD_MANUAL, - InternalPlanner::FORWARD, - InternalPlanner::IXSCAN_FETCH)); - exec->setYieldPolicy(PlanExecutor::YIELD_AUTO, collection); + auto exec = InternalPlanner::indexScan(opCtx, + collection, + desc, + min, + max, + boundInclusion, + PlanExecutor::YIELD_AUTO, + InternalPlanner::FORWARD, + InternalPlanner::IXSCAN_FETCH); RecordId rloc; BSONObj obj; @@ -469,7 +464,7 @@ void Helpers::emptyCollection(OperationContext* opCtx, const NamespaceString& ns OldClientContext context(opCtx, nss.ns()); repl::UnreplicatedWritesBlock uwb(opCtx); Collection* collection = context.db() ? context.db()->getCollection(opCtx, nss) : nullptr; - deleteObjects(opCtx, collection, nss, BSONObj(), PlanExecutor::YIELD_MANUAL, false); + deleteObjects(opCtx, collection, nss, BSONObj(), false); } Helpers::RemoveSaver::RemoveSaver(const string& a, const string& b, const string& why) { diff --git a/src/mongo/db/exec/cached_plan.cpp b/src/mongo/db/exec/cached_plan.cpp index fd371a3c6c7..3106994547e 100644 --- a/src/mongo/db/exec/cached_plan.cpp +++ b/src/mongo/db/exec/cached_plan.cpp @@ -115,7 +115,7 @@ Status CachedPlanStage::pickBestPlan(PlanYieldPolicy* yieldPolicy) { return Status::OK(); } else if (PlanStage::NEED_YIELD == state) { if (id == WorkingSet::INVALID_ID) { - if (!yieldPolicy->allowedToYield()) { + if (!yieldPolicy->canAutoYield()) { throw WriteConflictException(); } } else { @@ -125,7 +125,7 @@ Status CachedPlanStage::pickBestPlan(PlanYieldPolicy* yieldPolicy) { _fetcher.reset(member->releaseFetcher()); } - if (yieldPolicy->allowedToYield()) { + if (yieldPolicy->canAutoYield()) { yieldPolicy->forceYield(); } diff --git a/src/mongo/db/exec/multi_plan.cpp b/src/mongo/db/exec/multi_plan.cpp index d4d9d21ca47..a01521610ea 100644 --- a/src/mongo/db/exec/multi_plan.cpp +++ b/src/mongo/db/exec/multi_plan.cpp @@ -365,7 +365,7 @@ bool MultiPlanStage::workAllPlans(size_t numResults, PlanYieldPolicy* yieldPolic doneWorking = true; } else if (PlanStage::NEED_YIELD == state) { if (id == WorkingSet::INVALID_ID) { - if (!yieldPolicy->allowedToYield()) + if (!yieldPolicy->canAutoYield()) throw WriteConflictException(); } else { WorkingSetMember* member = candidate.ws->get(id); @@ -374,7 +374,7 @@ bool MultiPlanStage::workAllPlans(size_t numResults, PlanYieldPolicy* yieldPolic _fetcher.reset(member->releaseFetcher()); } - if (yieldPolicy->allowedToYield()) { + if (yieldPolicy->canAutoYield()) { yieldPolicy->forceYield(); } diff --git a/src/mongo/db/exec/pipeline_proxy.cpp b/src/mongo/db/exec/pipeline_proxy.cpp index 0937f96faa4..d6cbf80718b 100644 --- a/src/mongo/db/exec/pipeline_proxy.cpp +++ b/src/mongo/db/exec/pipeline_proxy.cpp @@ -47,12 +47,16 @@ using stdx::make_unique; const char* PipelineProxyStage::kStageType = "PIPELINE_PROXY"; PipelineProxyStage::PipelineProxyStage(OperationContext* opCtx, - intrusive_ptr<Pipeline> pipeline, + std::unique_ptr<Pipeline, Pipeline::Deleter> pipeline, WorkingSet* ws) : PlanStage(kStageType, opCtx), - _pipeline(pipeline), + _pipeline(std::move(pipeline)), _includeMetaData(_pipeline->getContext()->inShard), // send metadata to merger - _ws(ws) {} + _ws(ws) { + // We take over responsibility for disposing of the Pipeline, since it is required that + // doDispose() will be called before destruction of this PipelineProxyStage. + _pipeline.get_deleter().dismissDisposal(); +} PlanStage::StageState PipelineProxyStage::doWork(WorkingSetID* out) { if (!out) { @@ -99,6 +103,10 @@ void PipelineProxyStage::doReattachToOperationContext() { _pipeline->reattachToOperationContext(getOpCtx()); } +void PipelineProxyStage::doDispose() { + _pipeline->dispose(getOpCtx()); +} + unique_ptr<PlanStageStats> PipelineProxyStage::getStats() { unique_ptr<PlanStageStats> ret = make_unique<PlanStageStats>(CommonStats(kStageType), STAGE_PIPELINE_PROXY); @@ -119,12 +127,12 @@ boost::optional<BSONObj> PipelineProxyStage::getNextBson() { } std::string PipelineProxyStage::getPlanSummaryStr() const { - return PipelineD::getPlanSummaryStr(_pipeline); + return PipelineD::getPlanSummaryStr(_pipeline.get()); } void PipelineProxyStage::getPlanSummaryStats(PlanSummaryStats* statsOut) const { invariant(statsOut); - PipelineD::getPlanSummaryStats(_pipeline, statsOut); + PipelineD::getPlanSummaryStats(_pipeline.get(), statsOut); statsOut->nReturned = getCommonStats()->advanced; } } // namespace mongo diff --git a/src/mongo/db/exec/pipeline_proxy.h b/src/mongo/db/exec/pipeline_proxy.h index fd5b99c4bae..ab33c963fa1 100644 --- a/src/mongo/db/exec/pipeline_proxy.h +++ b/src/mongo/db/exec/pipeline_proxy.h @@ -47,7 +47,7 @@ namespace mongo { class PipelineProxyStage final : public PlanStage { public: PipelineProxyStage(OperationContext* opCtx, - boost::intrusive_ptr<Pipeline> pipeline, + std::unique_ptr<Pipeline, Pipeline::Deleter> pipeline, WorkingSet* ws); PlanStage::StageState doWork(WorkingSetID* out) final; @@ -68,7 +68,7 @@ public: MONGO_UNREACHABLE; } - void doInvalidate(OperationContext* txn, const RecordId& rid, InvalidationType type) final { + void doInvalidate(OperationContext* opCtx, const RecordId& rid, InvalidationType type) final { // A PlanExecutor with a PipelineProxyStage should be registered with the global cursor // manager, so should not receive invalidations. MONGO_UNREACHABLE; @@ -83,11 +83,14 @@ public: static const char* kStageType; +protected: + void doDispose() final; + private: boost::optional<BSONObj> getNextBson(); // Things in the _stash should be returned before pulling items from _pipeline. - const boost::intrusive_ptr<Pipeline> _pipeline; + std::unique_ptr<Pipeline, Pipeline::Deleter> _pipeline; std::vector<BSONObj> _stash; const bool _includeMetaData; diff --git a/src/mongo/db/exec/plan_stage.h b/src/mongo/db/exec/plan_stage.h index 6b3a67a2c04..33204c84839 100644 --- a/src/mongo/db/exec/plan_stage.h +++ b/src/mongo/db/exec/plan_stage.h @@ -261,6 +261,29 @@ public: */ void invalidate(OperationContext* opCtx, const RecordId& dl, InvalidationType type); + /* + * Releases any resources held by this stage. It is an error to use a PlanStage in any way after + * calling dispose(). Does not throw exceptions. + * + * Propagates to all children, then calls doDispose(). + */ + void dispose(OperationContext* opCtx) { + try { + // We may or may not be attached during disposal. We can't call + // reattachToOperationContext() + // directly, since that will assert that '_opCtx' is not set. + _opCtx = opCtx; + invariant(!_opCtx || opCtx == opCtx); + + for (auto&& child : _children) { + child->dispose(opCtx); + } + doDispose(); + } catch (...) { + std::terminate(); + } + } + /** * Retrieve a list of this stage's children. This stage keeps ownership of * its children. @@ -354,6 +377,11 @@ protected: virtual void doReattachToOperationContext() {} /** + * Does stage-specific destruction. Must not throw exceptions. + */ + virtual void doDispose() {} + + /** * Does the stage-specific invalidation work. */ virtual void doInvalidate(OperationContext* opCtx, const RecordId& dl, InvalidationType type) {} diff --git a/src/mongo/db/exec/stagedebug_cmd.cpp b/src/mongo/db/exec/stagedebug_cmd.cpp index 0cd1f5d7a42..e9f2c255375 100644 --- a/src/mongo/db/exec/stagedebug_cmd.cpp +++ b/src/mongo/db/exec/stagedebug_cmd.cpp @@ -193,7 +193,7 @@ public: auto statusWithPlanExecutor = PlanExecutor::make( opCtx, std::move(ws), std::move(rootFetch), collection, PlanExecutor::YIELD_AUTO); fassert(28536, statusWithPlanExecutor.getStatus()); - std::unique_ptr<PlanExecutor> exec = std::move(statusWithPlanExecutor.getValue()); + auto exec = std::move(statusWithPlanExecutor.getValue()); BSONArrayBuilder resultBuilder(result.subarrayStart("results")); diff --git a/src/mongo/db/index/haystack_access_method.cpp b/src/mongo/db/index/haystack_access_method.cpp index 213c9f05baa..8d80fdc5363 100644 --- a/src/mongo/db/index/haystack_access_method.cpp +++ b/src/mongo/db/index/haystack_access_method.cpp @@ -110,14 +110,13 @@ void HaystackAccessMethod::searchCommand(OperationContext* opCtx, unordered_set<RecordId, RecordId::Hasher> thisPass; - unique_ptr<PlanExecutor> exec( - InternalPlanner::indexScan(opCtx, - collection, - _descriptor, - key, - key, - BoundInclusion::kIncludeBothStartAndEndKeys, - PlanExecutor::YIELD_MANUAL)); + auto exec = InternalPlanner::indexScan(opCtx, + collection, + _descriptor, + key, + key, + BoundInclusion::kIncludeBothStartAndEndKeys, + PlanExecutor::NO_YIELD); PlanExecutor::ExecState state; BSONObj obj; RecordId loc; diff --git a/src/mongo/db/ops/delete.cpp b/src/mongo/db/ops/delete.cpp index 19fc3eb7458..1ac92749e3f 100644 --- a/src/mongo/db/ops/delete.cpp +++ b/src/mongo/db/ops/delete.cpp @@ -39,16 +39,10 @@ namespace mongo { -/* ns: namespace, e.g. <database>.<collection> - pattern: the "where" clause / criteria - justOne: stop after 1 match - god: allow access to system namespaces, and don't yield -*/ long long deleteObjects(OperationContext* opCtx, Collection* collection, const NamespaceString& ns, BSONObj pattern, - PlanExecutor::YieldPolicy policy, bool justOne, bool god, bool fromMigrate) { @@ -57,7 +51,6 @@ long long deleteObjects(OperationContext* opCtx, request.setMulti(!justOne); request.setGod(god); request.setFromMigrate(fromMigrate); - request.setYieldPolicy(policy); ParsedDelete parsedDelete(opCtx, &request); uassertStatusOK(parsedDelete.parseRequest()); @@ -65,7 +58,7 @@ long long deleteObjects(OperationContext* opCtx, auto client = opCtx->getClient(); auto lastOpAtOperationStart = repl::ReplClientInfo::forClient(client).getLastOp(); - std::unique_ptr<PlanExecutor> exec = uassertStatusOK( + auto exec = uassertStatusOK( getExecutorDelete(opCtx, &CurOp::get(opCtx)->debug(), collection, &parsedDelete)); uassertStatusOK(exec->executePlan()); diff --git a/src/mongo/db/ops/delete.h b/src/mongo/db/ops/delete.h index bb7427f5bb5..bbbe23cbdbc 100644 --- a/src/mongo/db/ops/delete.h +++ b/src/mongo/db/ops/delete.h @@ -39,11 +39,15 @@ namespace mongo { class Database; class OperationContext; +/** + * Deletes objects from 'collection' that match the query predicate given by 'pattern'. If 'justOne' + * is true, deletes only the first matching object. The PlanExecutor used to do the deletion will + * not yield. If 'god' is true, deletes are allowed on system namespaces. + */ long long deleteObjects(OperationContext* opCtx, Collection* collection, const NamespaceString& ns, BSONObj pattern, - PlanExecutor::YieldPolicy policy, bool justOne, bool god = false, bool fromMigrate = false); diff --git a/src/mongo/db/ops/delete_request.h b/src/mongo/db/ops/delete_request.h index 67514bb99b9..54c55984894 100644 --- a/src/mongo/db/ops/delete_request.h +++ b/src/mongo/db/ops/delete_request.h @@ -48,7 +48,7 @@ public: _fromMigrate(false), _isExplain(false), _returnDeleted(false), - _yieldPolicy(PlanExecutor::YIELD_MANUAL) {} + _yieldPolicy(PlanExecutor::NO_YIELD) {} void setQuery(const BSONObj& query) { _query = query; diff --git a/src/mongo/db/ops/parsed_delete.cpp b/src/mongo/db/ops/parsed_delete.cpp index 854ff7bb480..e53988ea78e 100644 --- a/src/mongo/db/ops/parsed_delete.cpp +++ b/src/mongo/db/ops/parsed_delete.cpp @@ -114,7 +114,7 @@ const DeleteRequest* ParsedDelete::getRequest() const { PlanExecutor::YieldPolicy ParsedDelete::yieldPolicy() const { if (_request->isGod()) { - return PlanExecutor::YIELD_MANUAL; + return PlanExecutor::NO_YIELD; } if (_request->getYieldPolicy() == PlanExecutor::YIELD_AUTO && isIsolated()) { return PlanExecutor::WRITE_CONFLICT_RETRY_ONLY; // Don't yield locks. diff --git a/src/mongo/db/ops/parsed_update.cpp b/src/mongo/db/ops/parsed_update.cpp index a1e0a258e8a..a6dfb2373ac 100644 --- a/src/mongo/db/ops/parsed_update.cpp +++ b/src/mongo/db/ops/parsed_update.cpp @@ -169,7 +169,7 @@ Status ParsedUpdate::parseArrayFilters() { PlanExecutor::YieldPolicy ParsedUpdate::yieldPolicy() const { if (_request->isGod()) { - return PlanExecutor::YIELD_MANUAL; + return PlanExecutor::NO_YIELD; } if (_request->getYieldPolicy() == PlanExecutor::YIELD_AUTO && isIsolated()) { return PlanExecutor::WRITE_CONFLICT_RETRY_ONLY; // Don't yield locks. diff --git a/src/mongo/db/ops/update.cpp b/src/mongo/db/ops/update.cpp index 5150eb7a66c..eaeb014009b 100644 --- a/src/mongo/db/ops/update.cpp +++ b/src/mongo/db/ops/update.cpp @@ -111,8 +111,7 @@ UpdateResult update(OperationContext* opCtx, Database* db, const UpdateRequest& uassertStatusOK(parsedUpdate.parseRequest()); OpDebug* const nullOpDebug = nullptr; - std::unique_ptr<PlanExecutor> exec = - uassertStatusOK(getExecutorUpdate(opCtx, nullOpDebug, collection, &parsedUpdate)); + auto exec = uassertStatusOK(getExecutorUpdate(opCtx, nullOpDebug, collection, &parsedUpdate)); uassertStatusOK(exec->executePlan()); if (repl::ReplClientInfo::forClient(client).getLastOp() != lastOpAtOperationStart) { diff --git a/src/mongo/db/ops/update_request.h b/src/mongo/db/ops/update_request.h index 5d3bf8b9d14..4b821db9996 100644 --- a/src/mongo/db/ops/update_request.h +++ b/src/mongo/db/ops/update_request.h @@ -54,6 +54,7 @@ public: // Return the document as it is after the update. RETURN_NEW }; + inline UpdateRequest(const NamespaceString& nsString) : _nsString(nsString), _god(false), @@ -63,7 +64,7 @@ public: _lifecycle(NULL), _isExplain(false), _returnDocs(ReturnDocOption::RETURN_NONE), - _yieldPolicy(PlanExecutor::YIELD_MANUAL) {} + _yieldPolicy(PlanExecutor::NO_YIELD) {} const NamespaceString& getNamespaceString() const { return _nsString; @@ -275,7 +276,7 @@ private: // without another query before or after the update. ReturnDocOption _returnDocs; - // Whether or not the update should yield. Defaults to YIELD_MANUAL. + // Whether or not the update should yield. Defaults to NO_YIELD. PlanExecutor::YieldPolicy _yieldPolicy; }; diff --git a/src/mongo/db/pipeline/document_source.cpp b/src/mongo/db/pipeline/document_source.cpp index ec1d8edfcb3..23e98350197 100644 --- a/src/mongo/db/pipeline/document_source.cpp +++ b/src/mongo/db/pipeline/document_source.cpp @@ -83,11 +83,6 @@ const char* DocumentSource::getSourceName() const { return unknown; } -void DocumentSource::setSource(DocumentSource* pTheSource) { - verify(!isValidInitialSource()); - pSource = pTheSource; -} - intrusive_ptr<DocumentSource> DocumentSource::optimize() { return this; } @@ -192,12 +187,6 @@ Pipeline::SourceContainer::iterator DocumentSource::optimizeAt( return doOptimizeAt(itr, container); } -void DocumentSource::dispose() { - if (pSource) { - pSource->dispose(); - } -} - void DocumentSource::serializeToArray(vector<Value>& array, boost::optional<ExplainOptions::Verbosity> explain) const { Value entry = serialize(explain); diff --git a/src/mongo/db/pipeline/document_source.h b/src/mongo/db/pipeline/document_source.h index b6ef71d1480..2b014c37a61 100644 --- a/src/mongo/db/pipeline/document_source.h +++ b/src/mongo/db/pipeline/document_source.h @@ -209,38 +209,31 @@ public: virtual GetNextResult getNext() = 0; /** - * Inform the source that it is no longer needed and may release its resources. After - * dispose() is called the source must still be able to handle iteration requests, but may - * become eof(). - * NOTE: For proper mutex yielding, dispose() must be called on any DocumentSource that will - * not be advanced until eof(), see SERVER-6123. + * Informs the stage that it is no longer needed and can release its resources. After dispose() + * is called the stage must still be able to handle calls to getNext(), but can return kEOF. + * + * This is a non-virtual public interface to ensure dispose() is threaded through the entire + * pipeline. Subclasses should override doDispose() to implement their disposal. */ - virtual void dispose(); + void dispose() { + doDispose(); + if (pSource) { + pSource->dispose(); + } + } /** - Get the source's name. - - @returns the std::string name of the source as a constant string; - this is static, and there's no need to worry about adopting it + * Get the stage's name. */ virtual const char* getSourceName() const; /** - Set the underlying source this source should use to get Documents - from. - - It is an error to set the source more than once. This is to - prevent changing sources once the original source has been started; - this could break the state maintained by the DocumentSource. - - This pointer is not reference counted because that has led to - some circular references. As a result, this doesn't keep - sources alive, and is only intended to be used temporarily for - the lifetime of a Pipeline::run(). - - @param pSource the underlying source to use + * Set the underlying source this source should use to get Documents from. Must not throw + * exceptions. */ - virtual void setSource(DocumentSource* pSource); + virtual void setSource(DocumentSource* source) { + pSource = source; + } /** * In the default case, serializes the DocumentSource and adds it to the std::vector<Value>. @@ -436,9 +429,6 @@ public: } protected: - /** - Base constructor. - */ explicit DocumentSource(const boost::intrusive_ptr<ExpressionContext>& pExpCtx); /** @@ -458,6 +448,11 @@ protected: return std::next(itr); }; + /** + * Release any resources held by this stage. After doDispose() is called the stage must still be + * able to handle calls to getNext(), but can return kEOF. + */ + virtual void doDispose() {} /* Most DocumentSources have an underlying source they get their data @@ -580,7 +575,7 @@ public: * * This function returns a non-OK status if parsing the pipeline failed. */ - virtual StatusWith<boost::intrusive_ptr<Pipeline>> makePipeline( + virtual StatusWith<std::unique_ptr<Pipeline, Pipeline::Deleter>> makePipeline( const std::vector<BSONObj>& rawPipeline, const boost::intrusive_ptr<ExpressionContext>& expCtx) = 0; diff --git a/src/mongo/db/pipeline/document_source_bucket_auto.cpp b/src/mongo/db/pipeline/document_source_bucket_auto.cpp index f2015df5f64..b00feee2b68 100644 --- a/src/mongo/db/pipeline/document_source_bucket_auto.cpp +++ b/src/mongo/db/pipeline/document_source_bucket_auto.cpp @@ -327,10 +327,9 @@ Document DocumentSourceBucketAuto::makeDocument(const Bucket& bucket) { return out.freeze(); } -void DocumentSourceBucketAuto::dispose() { +void DocumentSourceBucketAuto::doDispose() { _sortedInput.reset(); _bucketsIterator = _buckets.end(); - pSource->dispose(); } Value DocumentSourceBucketAuto::serialize( diff --git a/src/mongo/db/pipeline/document_source_bucket_auto.h b/src/mongo/db/pipeline/document_source_bucket_auto.h index 75c3f07b683..59fec253134 100644 --- a/src/mongo/db/pipeline/document_source_bucket_auto.h +++ b/src/mongo/db/pipeline/document_source_bucket_auto.h @@ -46,7 +46,6 @@ public: Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final; GetDepsReturn getDependencies(DepsTracker* deps) const final; GetNextResult getNext() final; - void dispose() final; const char* getSourceName() const final; /** @@ -82,6 +81,9 @@ public: static boost::intrusive_ptr<DocumentSource> createFromBson( BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx); +protected: + void doDispose() final; + private: DocumentSourceBucketAuto(const boost::intrusive_ptr<ExpressionContext>& pExpCtx, const boost::intrusive_ptr<Expression>& groupByExpression, diff --git a/src/mongo/db/pipeline/document_source_cursor.cpp b/src/mongo/db/pipeline/document_source_cursor.cpp index edc49e4923e..aaa362575a4 100644 --- a/src/mongo/db/pipeline/document_source_cursor.cpp +++ b/src/mongo/db/pipeline/document_source_cursor.cpp @@ -64,20 +64,6 @@ DocumentSource::GetNextResult DocumentSourceCursor::getNext() { return std::move(out); } -void DocumentSourceCursor::dispose() { - _currentBatch.clear(); - if (!_exec) { - return; - } - - // We must hold a collection lock to destroy a PlanExecutor to ensure the CursorManager will - // still exist and can be safely told to deregister the PlanExecutor. - invariant(pExpCtx->opCtx); - AutoGetCollection autoColl(pExpCtx->opCtx, _exec->nss(), MODE_IS); - _exec.reset(); - _rangePreserver.release(); -} - void DocumentSourceCursor::loadBatch() { if (!_exec) { // No more documents. @@ -85,45 +71,46 @@ void DocumentSourceCursor::loadBatch() { return; } - AutoGetCollectionForRead autoColl(pExpCtx->opCtx, _exec->nss()); - _exec->restoreState(); - - int memUsageBytes = 0; - BSONObj obj; PlanExecutor::ExecState state; + BSONObj resultObj; { - ON_BLOCK_EXIT([this] { recordPlanSummaryStats(); }); - - while ((state = _exec->getNext(&obj, nullptr)) == PlanExecutor::ADVANCED) { - if (_shouldProduceEmptyDocs) { - _currentBatch.push_back(Document()); - } else if (_dependencies) { - _currentBatch.push_back(_dependencies->extractFields(obj)); - } else { - _currentBatch.push_back(Document::fromBsonWithMetaData(obj)); - } + AutoGetCollectionForRead autoColl(pExpCtx->opCtx, _exec->nss()); + _exec->restoreState(); - if (_limit) { - if (++_docsAddedToBatches == _limit->getLimit()) { - break; + int memUsageBytes = 0; + { + ON_BLOCK_EXIT([this] { recordPlanSummaryStats(); }); + + while ((state = _exec->getNext(&resultObj, nullptr)) == PlanExecutor::ADVANCED) { + if (_shouldProduceEmptyDocs) { + _currentBatch.push_back(Document()); + } else if (_dependencies) { + _currentBatch.push_back(_dependencies->extractFields(resultObj)); + } else { + _currentBatch.push_back(Document::fromBsonWithMetaData(resultObj)); + } + + if (_limit) { + if (++_docsAddedToBatches == _limit->getLimit()) { + break; + } + verify(_docsAddedToBatches < _limit->getLimit()); } - verify(_docsAddedToBatches < _limit->getLimit()); - } - memUsageBytes += _currentBatch.back().getApproximateSize(); + memUsageBytes += _currentBatch.back().getApproximateSize(); - if (memUsageBytes > internalDocumentSourceCursorBatchSizeBytes.load()) { - // End this batch and prepare PlanExecutor for yielding. - _exec->saveState(); - return; + if (memUsageBytes > internalDocumentSourceCursorBatchSizeBytes.load()) { + // End this batch and prepare PlanExecutor for yielding. + _exec->saveState(); + return; + } } } } // If we got here, there won't be any more documents, so destroy our PlanExecutor. Note we can't // use dispose() since we want to keep the current batch. - _exec.reset(); - _rangePreserver.release(); + cleanupExecutor(); switch (state) { case PlanExecutor::ADVANCED: @@ -132,12 +119,12 @@ void DocumentSourceCursor::loadBatch() { case PlanExecutor::DEAD: { uasserted(ErrorCodes::QueryPlanKilled, str::stream() << "collection or index disappeared when cursor yielded: " - << WorkingSetCommon::toStatusString(obj)); + << WorkingSetCommon::toStatusString(resultObj)); } case PlanExecutor::FAILURE: { uasserted(17285, str::stream() << "cursor encountered an error: " - << WorkingSetCommon::toStatusString(obj)); + << WorkingSetCommon::toStatusString(resultObj)); } default: MONGO_UNREACHABLE; @@ -226,12 +213,44 @@ void DocumentSourceCursor::reattachToOperationContext(OperationContext* opCtx) { } } -DocumentSourceCursor::DocumentSourceCursor(Collection* collection, - std::unique_ptr<PlanExecutor> exec, - const intrusive_ptr<ExpressionContext>& pCtx) +void DocumentSourceCursor::doDispose() { + _currentBatch.clear(); + if (!_exec) { + // We've already properly disposed of our PlanExecutor. + return; + } + cleanupExecutor(); +} + +void DocumentSourceCursor::cleanupExecutor() { + invariant(_exec); + auto* opCtx = pExpCtx->opCtx; + // We need to be careful to not use AutoGetCollection here, since we only need the lock to + // protect potential access to the Collection's CursorManager, and AutoGetCollection may throw + // if this namespace has since turned into a view. Using Database::getCollection() will simply + // return nullptr if the collection has since turned into a view. In this case, '_exec' will + // already have been marked as killed when the collection was dropped, and we won't need to + // access the CursorManager to properly dispose of it. + AutoGetDb dbLock(opCtx, _exec->nss().db(), MODE_IS); + Lock::CollectionLock collLock(opCtx->lockState(), _exec->nss().ns(), MODE_IS); + auto collection = dbLock.getDb() ? dbLock.getDb()->getCollection(opCtx, _exec->nss()) : nullptr; + auto cursorManager = collection ? collection->getCursorManager() : nullptr; + _exec->dispose(opCtx, cursorManager); + _exec.reset(); + _rangePreserver.release(); +} + +DocumentSourceCursor::~DocumentSourceCursor() { + invariant(!_exec); // '_exec' should have been cleaned up via dispose() before destruction. +} + +DocumentSourceCursor::DocumentSourceCursor( + Collection* collection, + std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> exec, + const intrusive_ptr<ExpressionContext>& pCtx) : DocumentSource(pCtx), _docsAddedToBatches(0), - _rangePreserver(collection), + _rangePreserver(pExpCtx->opCtx, collection), _exec(std::move(exec)), _outputSorts(_exec->getOutputSorts()) { @@ -245,7 +264,7 @@ DocumentSourceCursor::DocumentSourceCursor(Collection* collection, intrusive_ptr<DocumentSourceCursor> DocumentSourceCursor::create( Collection* collection, - std::unique_ptr<PlanExecutor> exec, + std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> exec, const intrusive_ptr<ExpressionContext>& pExpCtx) { intrusive_ptr<DocumentSourceCursor> source( new DocumentSourceCursor(collection, std::move(exec), pExpCtx)); diff --git a/src/mongo/db/pipeline/document_source_cursor.h b/src/mongo/db/pipeline/document_source_cursor.h index 348174b656a..11489897b72 100644 --- a/src/mongo/db/pipeline/document_source_cursor.h +++ b/src/mongo/db/pipeline/document_source_cursor.h @@ -51,16 +51,10 @@ public: BSONObjSet getOutputSorts() final { return _outputSorts; } - /** - * Attempts to combine with any subsequent $limit stages by setting the internal '_limit' field. - */ - Pipeline::SourceContainer::iterator doOptimizeAt(Pipeline::SourceContainer::iterator itr, - Pipeline::SourceContainer* container) final; Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final; bool isValidInitialSource() const final { return true; } - void dispose() final; void detachFromOperationContext() final; @@ -72,7 +66,7 @@ public: */ static boost::intrusive_ptr<DocumentSourceCursor> create( Collection* collection, - std::unique_ptr<PlanExecutor> exec, + std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> exec, const boost::intrusive_ptr<ExpressionContext>& pExpCtx); /* @@ -139,11 +133,33 @@ public: return _planSummaryStats; } +protected: + /** + * Disposes of '_exec' and '_rangePreserver' if they haven't been disposed already. This + * involves taking a collection lock. + */ + void doDispose() final; + + /** + * Attempts to combine with any subsequent $limit stages by setting the internal '_limit' field. + */ + Pipeline::SourceContainer::iterator doOptimizeAt(Pipeline::SourceContainer::iterator itr, + Pipeline::SourceContainer* container) final; + private: DocumentSourceCursor(Collection* collection, - std::unique_ptr<PlanExecutor> exec, + std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> exec, const boost::intrusive_ptr<ExpressionContext>& pExpCtx); + ~DocumentSourceCursor(); + + /** + * Acquires locks to properly destroy and de-register '_exec'. '_exec' must be non-null. + */ + void cleanupExecutor(); + /** + * Reads a batch of data from '_exec'. + */ void loadBatch(); void recordPlanSummaryStats(); @@ -159,8 +175,10 @@ private: boost::intrusive_ptr<DocumentSourceLimit> _limit; long long _docsAddedToBatches; // for _limit enforcement + // Both '_rangePreserver' and '_exec' must be destroyed while holding the collection lock. RangePreserver _rangePreserver; - std::unique_ptr<PlanExecutor> _exec; + std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> _exec; + BSONObjSet _outputSorts; std::string _planSummary; PlanSummaryStats _planSummaryStats; diff --git a/src/mongo/db/pipeline/document_source_facet.cpp b/src/mongo/db/pipeline/document_source_facet.cpp index 0842f7f68bf..99dcbd85827 100644 --- a/src/mongo/db/pipeline/document_source_facet.cpp +++ b/src/mongo/db/pipeline/document_source_facet.cpp @@ -149,6 +149,13 @@ void DocumentSourceFacet::setSource(DocumentSource* source) { _teeBuffer->setSource(source); } +void DocumentSourceFacet::doDispose() { + for (auto&& facet : _facets) { + facet.pipeline.get_deleter().dismissDisposal(); + facet.pipeline->dispose(pExpCtx->opCtx); + } +} + DocumentSource::GetNextResult DocumentSourceFacet::getNext() { pExpCtx->checkForInterrupt(); diff --git a/src/mongo/db/pipeline/document_source_facet.h b/src/mongo/db/pipeline/document_source_facet.h index 4da558e37a1..37c2fc5cb8f 100644 --- a/src/mongo/db/pipeline/document_source_facet.h +++ b/src/mongo/db/pipeline/document_source_facet.h @@ -59,11 +59,11 @@ class DocumentSourceFacet final : public DocumentSourceNeedsMongod, public SplittableDocumentSource { public: struct FacetPipeline { - FacetPipeline(std::string name, boost::intrusive_ptr<Pipeline> pipeline) + FacetPipeline(std::string name, std::unique_ptr<Pipeline, Pipeline::Deleter> pipeline) : name(std::move(name)), pipeline(std::move(pipeline)) {} std::string name; - boost::intrusive_ptr<Pipeline> pipeline; + std::unique_ptr<Pipeline, Pipeline::Deleter> pipeline; }; class LiteParsed : public LiteParsedDocumentSource { @@ -131,6 +131,9 @@ public: void doReattachToOperationContext(OperationContext* opCtx) final; bool needsPrimaryShard() const final; +protected: + void doDispose() final; + private: DocumentSourceFacet(std::vector<FacetPipeline> facetPipelines, const boost::intrusive_ptr<ExpressionContext>& expCtx); diff --git a/src/mongo/db/pipeline/document_source_facet_test.cpp b/src/mongo/db/pipeline/document_source_facet_test.cpp index 733634b8994..54c22a306fc 100644 --- a/src/mongo/db/pipeline/document_source_facet_test.cpp +++ b/src/mongo/db/pipeline/document_source_facet_test.cpp @@ -186,17 +186,19 @@ public: TEST_F(DocumentSourceFacetTest, SingleFacetShouldReceiveAllDocuments) { auto ctx = getExpCtx(); + deque<DocumentSource::GetNextResult> inputs = { + Document{{"_id", 0}}, Document{{"_id", 1}}, Document{{"_id", 2}}}; + auto mock = DocumentSourceMock::create(inputs); + auto dummy = DocumentSourcePassthrough::create(); auto statusWithPipeline = Pipeline::create({dummy}, ctx); ASSERT_OK(statusWithPipeline.getStatus()); auto pipeline = std::move(statusWithPipeline.getValue()); - auto facetStage = DocumentSourceFacet::create({{"results", pipeline}}, ctx); - - deque<DocumentSource::GetNextResult> inputs = { - Document{{"_id", 0}}, Document{{"_id", 1}}, Document{{"_id", 2}}}; - auto mock = DocumentSourceMock::create(inputs); + std::vector<DocumentSourceFacet::FacetPipeline> facets; + facets.emplace_back("results", std::move(pipeline)); + auto facetStage = DocumentSourceFacet::create(std::move(facets), ctx); facetStage->setSource(mock.get()); auto output = facetStage->getNext(); @@ -213,18 +215,21 @@ TEST_F(DocumentSourceFacetTest, SingleFacetShouldReceiveAllDocuments) { TEST_F(DocumentSourceFacetTest, MultipleFacetsShouldSeeTheSameDocuments) { auto ctx = getExpCtx(); + deque<DocumentSource::GetNextResult> inputs = { + Document{{"_id", 0}}, Document{{"_id", 1}}, Document{{"_id", 2}}}; + auto mock = DocumentSourceMock::create(inputs); + auto firstDummy = DocumentSourcePassthrough::create(); auto firstPipeline = uassertStatusOK(Pipeline::create({firstDummy}, ctx)); auto secondDummy = DocumentSourcePassthrough::create(); auto secondPipeline = uassertStatusOK(Pipeline::create({secondDummy}, ctx)); - auto facetStage = - DocumentSourceFacet::create({{"first", firstPipeline}, {"second", secondPipeline}}, ctx); + std::vector<DocumentSourceFacet::FacetPipeline> facets; + facets.emplace_back("first", std::move(firstPipeline)); + facets.emplace_back("second", std::move(secondPipeline)); + auto facetStage = DocumentSourceFacet::create(std::move(facets), ctx); - deque<DocumentSource::GetNextResult> inputs = { - Document{{"_id", 0}}, Document{{"_id", 1}}, Document{{"_id", 2}}}; - auto mock = DocumentSourceMock::create(inputs); facetStage->setSource(mock.get()); auto output = facetStage->getNext(); @@ -249,18 +254,21 @@ TEST_F(DocumentSourceFacetTest, ShouldCorrectlyHandleSubPipelinesYieldingDifferentNumbersOfResults) { auto ctx = getExpCtx(); + deque<DocumentSource::GetNextResult> inputs = { + Document{{"_id", 0}}, Document{{"_id", 1}}, Document{{"_id", 2}}, Document{{"_id", 3}}}; + auto mock = DocumentSourceMock::create(inputs); + auto passthrough = DocumentSourcePassthrough::create(); auto passthroughPipe = uassertStatusOK(Pipeline::create({passthrough}, ctx)); auto limit = DocumentSourceLimit::create(ctx, 1); auto limitedPipe = uassertStatusOK(Pipeline::create({limit}, ctx)); - auto facetStage = - DocumentSourceFacet::create({{"all", passthroughPipe}, {"first", limitedPipe}}, ctx); + std::vector<DocumentSourceFacet::FacetPipeline> facets; + facets.emplace_back("all", std::move(passthroughPipe)); + facets.emplace_back("first", std::move(limitedPipe)); + auto facetStage = DocumentSourceFacet::create(std::move(facets), ctx); - deque<DocumentSource::GetNextResult> inputs = { - Document{{"_id", 0}}, Document{{"_id", 1}}, Document{{"_id", 2}}, Document{{"_id", 3}}}; - auto mock = DocumentSourceMock::create(inputs); facetStage->setSource(mock.get()); vector<Value> expectedPassthroughOutput; @@ -285,14 +293,17 @@ TEST_F(DocumentSourceFacetTest, TEST_F(DocumentSourceFacetTest, ShouldBeAbleToEvaluateMultipleStagesWithinOneSubPipeline) { auto ctx = getExpCtx(); + deque<DocumentSource::GetNextResult> inputs = {Document{{"_id", 0}}, Document{{"_id", 1}}}; + auto mock = DocumentSourceMock::create(inputs); + auto firstDummy = DocumentSourcePassthrough::create(); auto secondDummy = DocumentSourcePassthrough::create(); auto pipeline = uassertStatusOK(Pipeline::create({firstDummy, secondDummy}, ctx)); - auto facetStage = DocumentSourceFacet::create({{"subPipe", pipeline}}, ctx); + std::vector<DocumentSourceFacet::FacetPipeline> facets; + facets.emplace_back("subPipe", std::move(pipeline)); + auto facetStage = DocumentSourceFacet::create(std::move(facets), ctx); - deque<DocumentSource::GetNextResult> inputs = {Document{{"_id", 0}}, Document{{"_id", 1}}}; - auto mock = DocumentSourceMock::create(inputs); facetStage->setSource(mock.get()); auto output = facetStage->getNext(); @@ -300,18 +311,41 @@ TEST_F(DocumentSourceFacetTest, ShouldBeAbleToEvaluateMultipleStagesWithinOneSub ASSERT_DOCUMENT_EQ(output.getDocument(), Document(fromjson("{subPipe: [{_id: 0}, {_id: 1}]}"))); } +TEST_F(DocumentSourceFacetTest, ShouldPropagateDisposeThroughToSource) { + auto ctx = getExpCtx(); + + auto mockSource = DocumentSourceMock::create(); + + auto firstDummy = DocumentSourcePassthrough::create(); + auto firstPipe = uassertStatusOK(Pipeline::create({firstDummy}, ctx)); + auto secondDummy = DocumentSourcePassthrough::create(); + auto secondPipe = uassertStatusOK(Pipeline::create({secondDummy}, ctx)); + + std::vector<DocumentSourceFacet::FacetPipeline> facets; + facets.emplace_back("firstPipe", std::move(firstPipe)); + facets.emplace_back("secondPipe", std::move(secondPipe)); + auto facetStage = DocumentSourceFacet::create(std::move(facets), ctx); + + facetStage->setSource(mockSource.get()); + + facetStage->dispose(); + ASSERT_TRUE(mockSource->isDisposed); +} + // TODO: DocumentSourceFacet will have to propagate pauses if we ever allow nested $facets. DEATH_TEST_F(DocumentSourceFacetTest, ShouldFailIfGivenPausedInput, "Invariant failure !input.isPaused()") { auto ctx = getExpCtx(); + auto mock = DocumentSourceMock::create(DocumentSource::GetNextResult::makePauseExecution()); auto firstDummy = DocumentSourcePassthrough::create(); auto pipeline = uassertStatusOK(Pipeline::create({firstDummy}, ctx)); - auto facetStage = DocumentSourceFacet::create({{"subPipe", pipeline}}, ctx); + std::vector<DocumentSourceFacet::FacetPipeline> facets; + facets.emplace_back("subPipe", std::move(pipeline)); + auto facetStage = DocumentSourceFacet::create(std::move(facets), ctx); - auto mock = DocumentSourceMock::create(DocumentSource::GetNextResult::makePauseExecution()); facetStage->setSource(mock.get()); facetStage->getNext(); // This should cause a crash. @@ -335,8 +369,10 @@ TEST_F(DocumentSourceFacetTest, ShouldBeAbleToReParseSerializedStage) { auto secondSkip = DocumentSourceSkip::create(ctx, 2); auto secondPipeline = uassertStatusOK(Pipeline::create({secondSkip}, ctx)); - auto facetStage = DocumentSourceFacet::create( - {{"skippedOne", firstPipeline}, {"skippedTwo", secondPipeline}}, ctx); + std::vector<DocumentSourceFacet::FacetPipeline> facets; + facets.emplace_back("skippedOne", std::move(firstPipeline)); + facets.emplace_back("skippedTwo", std::move(secondPipeline)); + auto facetStage = DocumentSourceFacet::create(std::move(facets), ctx); // Serialize the facet stage. vector<Value> serialization; @@ -373,7 +409,9 @@ TEST_F(DocumentSourceFacetTest, ShouldOptimizeInnerPipelines) { auto dummy = DocumentSourcePassthrough::create(); auto pipeline = unittest::assertGet(Pipeline::create({dummy}, ctx)); - auto facetStage = DocumentSourceFacet::create({{"subPipe", pipeline}}, ctx); + std::vector<DocumentSourceFacet::FacetPipeline> facets; + facets.emplace_back("subPipe", std::move(pipeline)); + auto facetStage = DocumentSourceFacet::create(std::move(facets), ctx); ASSERT_FALSE(dummy->isOptimized); facetStage->optimize(); @@ -389,8 +427,10 @@ TEST_F(DocumentSourceFacetTest, ShouldPropogateDetachingAndReattachingOfOpCtx) { auto secondDummy = DocumentSourcePassthrough::create(); auto secondPipeline = unittest::assertGet(Pipeline::create({secondDummy}, ctx)); - auto facetStage = - DocumentSourceFacet::create({{"one", firstPipeline}, {"two", secondPipeline}}, ctx); + std::vector<DocumentSourceFacet::FacetPipeline> facets; + facets.emplace_back("one", std::move(firstPipeline)); + facets.emplace_back("two", std::move(secondPipeline)); + auto facetStage = DocumentSourceFacet::create(std::move(facets), ctx); // Test detaching. ASSERT_FALSE(firstDummy->isDetachedFromOpCtx); @@ -456,8 +496,10 @@ TEST_F(DocumentSourceFacetTest, ShouldUnionDependenciesOfInnerPipelines) { ASSERT_EQ(secondPipelineDeps.fields.size(), 1UL); ASSERT_EQ(secondPipelineDeps.fields.count("b"), 1UL); - auto facetStage = - DocumentSourceFacet::create({{"needsA", firstPipeline}, {"needsB", secondPipeline}}, ctx); + std::vector<DocumentSourceFacet::FacetPipeline> facets; + facets.emplace_back("needsA", std::move(firstPipeline)); + facets.emplace_back("needsB", std::move(secondPipeline)); + auto facetStage = DocumentSourceFacet::create(std::move(facets), ctx); DepsTracker deps(DepsTracker::MetadataAvailable::kNoMetadata); ASSERT_EQ(facetStage->getDependencies(&deps), DocumentSource::GetDepsReturn::EXHAUSTIVE_ALL); @@ -491,8 +533,10 @@ TEST_F(DocumentSourceFacetTest, ShouldRequireWholeDocumentIfAnyPipelineRequiresW auto needsWholeDocument = DocumentSourceNeedsWholeDocument::create(); auto secondPipeline = unittest::assertGet(Pipeline::create({needsWholeDocument}, ctx)); - auto facetStage = DocumentSourceFacet::create( - {{"needsA", firstPipeline}, {"needsWholeDocument", secondPipeline}}, ctx); + std::vector<DocumentSourceFacet::FacetPipeline> facets; + facets.emplace_back("needsA", std::move(firstPipeline)); + facets.emplace_back("needsWholeDocument", std::move(secondPipeline)); + auto facetStage = DocumentSourceFacet::create(std::move(facets), ctx); DepsTracker deps(DepsTracker::MetadataAvailable::kNoMetadata); ASSERT_EQ(facetStage->getDependencies(&deps), DocumentSource::GetDepsReturn::EXHAUSTIVE_ALL); @@ -526,10 +570,11 @@ TEST_F(DocumentSourceFacetTest, ShouldRequireTextScoreIfAnyPipelineRequiresTextS auto needsTextScore = DocumentSourceNeedsOnlyTextScore::create(); auto thirdPipeline = unittest::assertGet(Pipeline::create({needsTextScore}, ctx)); - auto facetStage = DocumentSourceFacet::create({{"needsA", firstPipeline}, - {"needsWholeDocument", secondPipeline}, - {"needsTextScore", thirdPipeline}}, - ctx); + std::vector<DocumentSourceFacet::FacetPipeline> facets; + facets.emplace_back("needsA", std::move(firstPipeline)); + facets.emplace_back("needsWholeDocument", std::move(secondPipeline)); + facets.emplace_back("needsTextScore", std::move(thirdPipeline)); + auto facetStage = DocumentSourceFacet::create(std::move(facets), ctx); DepsTracker deps(DepsTracker::MetadataAvailable::kTextScore); ASSERT_EQ(facetStage->getDependencies(&deps), DocumentSource::GetDepsReturn::EXHAUSTIVE_ALL); @@ -546,8 +591,10 @@ TEST_F(DocumentSourceFacetTest, ShouldThrowIfAnyPipelineRequiresTextScoreButItIs auto needsTextScore = DocumentSourceNeedsOnlyTextScore::create(); auto secondPipeline = unittest::assertGet(Pipeline::create({needsTextScore}, ctx)); - auto facetStage = DocumentSourceFacet::create( - {{"needsA", firstPipeline}, {"needsTextScore", secondPipeline}}, ctx); + std::vector<DocumentSourceFacet::FacetPipeline> facets; + facets.emplace_back("needsA", std::move(firstPipeline)); + facets.emplace_back("needsTextScore", std::move(secondPipeline)); + auto facetStage = DocumentSourceFacet::create(std::move(facets), ctx); DepsTracker deps(DepsTracker::MetadataAvailable::kNoMetadata); ASSERT_THROWS(facetStage->getDependencies(&deps), UserException); @@ -576,8 +623,10 @@ TEST_F(DocumentSourceFacetTest, ShouldRequirePrimaryShardIfAnyStageRequiresPrima auto needsPrimaryShard = DocumentSourceNeedsPrimaryShard::create(); auto secondPipeline = unittest::assertGet(Pipeline::create({needsPrimaryShard}, ctx)); - auto facetStage = DocumentSourceFacet::create( - {{"passthrough", firstPipeline}, {"needsPrimaryShard", secondPipeline}}, ctx); + std::vector<DocumentSourceFacet::FacetPipeline> facets; + facets.emplace_back("passthrough", std::move(firstPipeline)); + facets.emplace_back("needsPrimaryShard", std::move(secondPipeline)); + auto facetStage = DocumentSourceFacet::create(std::move(facets), ctx); ASSERT_TRUE(facetStage->needsPrimaryShard()); } @@ -591,8 +640,10 @@ TEST_F(DocumentSourceFacetTest, ShouldNotRequirePrimaryShardIfNoStagesRequiresPr auto secondPassthrough = DocumentSourcePassthrough::create(); auto secondPipeline = unittest::assertGet(Pipeline::create({secondPassthrough}, ctx)); - auto facetStage = - DocumentSourceFacet::create({{"first", firstPipeline}, {"second", secondPipeline}}, ctx); + std::vector<DocumentSourceFacet::FacetPipeline> facets; + facets.emplace_back("first", std::move(firstPipeline)); + facets.emplace_back("second", std::move(secondPipeline)); + auto facetStage = DocumentSourceFacet::create(std::move(facets), ctx); ASSERT_FALSE(facetStage->needsPrimaryShard()); } diff --git a/src/mongo/db/pipeline/document_source_graph_lookup.cpp b/src/mongo/db/pipeline/document_source_graph_lookup.cpp index e8e2cacc108..717b82d59aa 100644 --- a/src/mongo/db/pipeline/document_source_graph_lookup.cpp +++ b/src/mongo/db/pipeline/document_source_graph_lookup.cpp @@ -164,11 +164,10 @@ DocumentSource::GetNextResult DocumentSourceGraphLookUp::getNextUnwound() { } } -void DocumentSourceGraphLookUp::dispose() { +void DocumentSourceGraphLookUp::doDispose() { _cache.clear(); _frontier.clear(); _visited.clear(); - pSource->dispose(); } void DocumentSourceGraphLookUp::doBreadthFirstSearch() { diff --git a/src/mongo/db/pipeline/document_source_graph_lookup.h b/src/mongo/db/pipeline/document_source_graph_lookup.h index 8c14a2ab8fa..e196b4578ca 100644 --- a/src/mongo/db/pipeline/document_source_graph_lookup.h +++ b/src/mongo/db/pipeline/document_source_graph_lookup.h @@ -43,7 +43,6 @@ public: GetNextResult getNext() final; const char* getSourceName() const final; - void dispose() final; BSONObjSet getOutputSorts() final; void serializeToArray( std::vector<Value>& array, @@ -58,12 +57,6 @@ public: return true; } - /** - * Attempts to combine with a subsequent $unwind stage, setting the internal '_unwind' field. - */ - Pipeline::SourceContainer::iterator doOptimizeAt(Pipeline::SourceContainer::iterator itr, - Pipeline::SourceContainer* container) final; - GetDepsReturn getDependencies(DepsTracker* deps) const final { _startWith->addDependencies(deps); return SEE_NEXT; @@ -96,6 +89,15 @@ public: static boost::intrusive_ptr<DocumentSource> createFromBson( BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx); +protected: + void doDispose() final; + + /** + * Attempts to combine with a subsequent $unwind stage, setting the internal '_unwind' field. + */ + Pipeline::SourceContainer::iterator doOptimizeAt(Pipeline::SourceContainer::iterator itr, + Pipeline::SourceContainer* container) final; + private: DocumentSourceGraphLookUp( const boost::intrusive_ptr<ExpressionContext>& expCtx, diff --git a/src/mongo/db/pipeline/document_source_graph_lookup_test.cpp b/src/mongo/db/pipeline/document_source_graph_lookup_test.cpp index 4c190b24f6e..0d693d81e25 100644 --- a/src/mongo/db/pipeline/document_source_graph_lookup_test.cpp +++ b/src/mongo/db/pipeline/document_source_graph_lookup_test.cpp @@ -61,7 +61,7 @@ public: MockMongodImplementation(std::deque<DocumentSource::GetNextResult> results) : _results(std::move(results)) {} - StatusWith<boost::intrusive_ptr<Pipeline>> makePipeline( + StatusWith<std::unique_ptr<Pipeline, Pipeline::Deleter>> makePipeline( const std::vector<BSONObj>& rawPipeline, const boost::intrusive_ptr<ExpressionContext>& expCtx) final { auto pipeline = Pipeline::parse(rawPipeline, expCtx); diff --git a/src/mongo/db/pipeline/document_source_group.cpp b/src/mongo/db/pipeline/document_source_group.cpp index d4f44850a8f..2d625dc2f1f 100644 --- a/src/mongo/db/pipeline/document_source_group.cpp +++ b/src/mongo/db/pipeline/document_source_group.cpp @@ -169,7 +169,7 @@ DocumentSource::GetNextResult DocumentSourceGroup::getNextStreaming() { return std::move(out); } -void DocumentSourceGroup::dispose() { +void DocumentSourceGroup::doDispose() { // Free our resources. _groups = pExpCtx->getValueComparator().makeUnorderedValueMap<Accumulators>(); _sorterIterator.reset(); @@ -178,9 +178,6 @@ void DocumentSourceGroup::dispose() { groupsIterator = _groups->end(); _firstDocOfNextGroup = boost::none; - - // Free our source's resources. - pSource->dispose(); } intrusive_ptr<DocumentSource> DocumentSourceGroup::optimize() { diff --git a/src/mongo/db/pipeline/document_source_group.h b/src/mongo/db/pipeline/document_source_group.h index 86b973b9e04..643e291415b 100644 --- a/src/mongo/db/pipeline/document_source_group.h +++ b/src/mongo/db/pipeline/document_source_group.h @@ -50,7 +50,6 @@ public: GetDepsReturn getDependencies(DepsTracker* deps) const final; Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final; GetNextResult getNext() final; - void dispose() final; const char* getSourceName() const final; BSONObjSet getOutputSorts() final; @@ -96,6 +95,9 @@ public: boost::intrusive_ptr<DocumentSource> getShardSource() final; boost::intrusive_ptr<DocumentSource> getMergeSource() final; +protected: + void doDispose() final; + private: explicit DocumentSourceGroup(const boost::intrusive_ptr<ExpressionContext>& pExpCtx, size_t maxMemoryUsageBytes = kDefaultMaxMemoryUsageBytes); diff --git a/src/mongo/db/pipeline/document_source_lookup.cpp b/src/mongo/db/pipeline/document_source_lookup.cpp index a474baefe6a..b6cf88172c3 100644 --- a/src/mongo/db/pipeline/document_source_lookup.cpp +++ b/src/mongo/db/pipeline/document_source_lookup.cpp @@ -280,9 +280,11 @@ Pipeline::SourceContainer::iterator DocumentSourceLookUp::doOptimizeAt( return itr; } -void DocumentSourceLookUp::dispose() { - _pipeline.reset(); - pSource->dispose(); +void DocumentSourceLookUp::doDispose() { + if (_pipeline) { + _pipeline->dispose(pExpCtx->opCtx); + _pipeline.reset(); + } } BSONObj DocumentSourceLookUp::makeMatchStageFromInput(const Document& input, @@ -379,8 +381,17 @@ DocumentSource::GetNextResult DocumentSourceLookUp::unwindResult() { makeMatchStageFromInput(*_input, _localField, _foreignFieldFieldName, filter); // We've already allocated space for the trailing $match stage in '_fromPipeline'. _fromPipeline.back() = matchStage; + + if (_pipeline) { + _pipeline->dispose(pExpCtx->opCtx); + } _pipeline = uassertStatusOK(_mongod->makePipeline(_fromPipeline, _fromExpCtx)); + // The $lookup stage takes responsibility for disposing of its Pipeline, since it will + // potentially be used by multiple OperationContexts, and the $lookup stage is part of an + // outer Pipeline that will propagate dispose() calls before being destroyed. + _pipeline.get_deleter().dismissDisposal(); + _cursorIndex = 0; _nextValue = _pipeline->getNext(); diff --git a/src/mongo/db/pipeline/document_source_lookup.h b/src/mongo/db/pipeline/document_source_lookup.h index febdedc0472..e696e8bb4e5 100644 --- a/src/mongo/db/pipeline/document_source_lookup.h +++ b/src/mongo/db/pipeline/document_source_lookup.h @@ -62,14 +62,7 @@ public: return true; } - /** - * Attempts to combine with a subsequent $unwind stage, setting the internal '_unwindSrc' - * field. - */ - Pipeline::SourceContainer::iterator doOptimizeAt(Pipeline::SourceContainer::iterator itr, - Pipeline::SourceContainer* container) final; GetDepsReturn getDependencies(DepsTracker* deps) const final; - void dispose() final; BSONObjSet getOutputSorts() final { return DocumentSource::truncateSortSet(pSource->getOutputSorts(), {_as.fullPath()}); @@ -115,6 +108,16 @@ public: _handlingUnwind = true; } +protected: + void doDispose() final; + + /** + * Attempts to combine with a subsequent $unwind stage, setting the internal '_unwindSrc' + * field. + */ + Pipeline::SourceContainer::iterator doOptimizeAt(Pipeline::SourceContainer::iterator itr, + Pipeline::SourceContainer* container) final; + private: DocumentSourceLookUp(NamespaceString fromNs, std::string as, @@ -152,7 +155,7 @@ private: // The following members are used to hold onto state across getNext() calls when // '_handlingUnwind' is true. long long _cursorIndex = 0; - boost::intrusive_ptr<Pipeline> _pipeline; + std::unique_ptr<Pipeline, Pipeline::Deleter> _pipeline; boost::optional<Document> _input; boost::optional<Document> _nextValue; }; diff --git a/src/mongo/db/pipeline/document_source_lookup_test.cpp b/src/mongo/db/pipeline/document_source_lookup_test.cpp index 9da414ec6f6..4e8a59bd75a 100644 --- a/src/mongo/db/pipeline/document_source_lookup_test.cpp +++ b/src/mongo/db/pipeline/document_source_lookup_test.cpp @@ -157,7 +157,7 @@ public: return false; } - StatusWith<boost::intrusive_ptr<Pipeline>> makePipeline( + StatusWith<std::unique_ptr<Pipeline, Pipeline::Deleter>> makePipeline( const std::vector<BSONObj>& rawPipeline, const boost::intrusive_ptr<ExpressionContext>& expCtx) final { auto pipeline = Pipeline::parse(rawPipeline, expCtx); @@ -223,6 +223,7 @@ TEST_F(DocumentSourceLookUpTest, ShouldPropagatePauses) { ASSERT_TRUE(lookup->getNext().isEOF()); ASSERT_TRUE(lookup->getNext().isEOF()); + lookup->dispose(); } TEST_F(DocumentSourceLookUpTest, ShouldPropagatePausesWhileUnwinding) { @@ -275,6 +276,7 @@ TEST_F(DocumentSourceLookUpTest, ShouldPropagatePausesWhileUnwinding) { ASSERT_TRUE(lookup->getNext().isEOF()); ASSERT_TRUE(lookup->getNext().isEOF()); + lookup->dispose(); } TEST_F(DocumentSourceLookUpTest, LookupReportsAsFieldIsModified) { @@ -296,6 +298,7 @@ TEST_F(DocumentSourceLookUpTest, LookupReportsAsFieldIsModified) { ASSERT(modifiedPaths.type == DocumentSource::GetModPathsReturn::Type::kFiniteSet); ASSERT_EQ(1U, modifiedPaths.paths.size()); ASSERT_EQ(1U, modifiedPaths.paths.count("foreignDocs")); + lookup->dispose(); } TEST_F(DocumentSourceLookUpTest, LookupReportsFieldsModifiedByAbsorbedUnwind) { @@ -323,6 +326,7 @@ TEST_F(DocumentSourceLookUpTest, LookupReportsFieldsModifiedByAbsorbedUnwind) { ASSERT_EQ(2U, modifiedPaths.paths.size()); ASSERT_EQ(1U, modifiedPaths.paths.count("foreignDoc")); ASSERT_EQ(1U, modifiedPaths.paths.count("arrIndex")); + lookup->dispose(); } } // namespace diff --git a/src/mongo/db/pipeline/document_source_match.cpp b/src/mongo/db/pipeline/document_source_match.cpp index e72377ca04e..0f611b04423 100644 --- a/src/mongo/db/pipeline/document_source_match.cpp +++ b/src/mongo/db/pipeline/document_source_match.cpp @@ -103,7 +103,11 @@ Pipeline::SourceContainer::iterator DocumentSourceMatch::doOptimizeAt( // Since a text search must use an index, it must be the first stage in the pipeline. We cannot // combine a non-text stage with a text stage, as that may turn an invalid pipeline into a // valid one, unbeknownst to the user. - if (nextMatch && !nextMatch->_isTextQuery) { + if (nextMatch) { + // Text queries are not allowed anywhere except as the first stage. This is checked before + // optimization. + invariant(!nextMatch->_isTextQuery); + // Merge 'nextMatch' into this stage. joinMatchWith(nextMatch); @@ -344,11 +348,6 @@ BSONObj DocumentSourceMatch::redactSafePortion() const { return redactSafePortionTopLevel(getQuery()).toBson(); } -void DocumentSourceMatch::setSource(DocumentSource* source) { - uassert(17313, "$match with $text is only allowed as the first pipeline stage", !_isTextQuery); - DocumentSource::setSource(source); -} - bool DocumentSourceMatch::isTextQuery(const BSONObj& query) { BSONForEach(e, query) { const StringData fieldName = e.fieldNameStringData(); diff --git a/src/mongo/db/pipeline/document_source_match.h b/src/mongo/db/pipeline/document_source_match.h index faeda447cb3..4abd4862992 100644 --- a/src/mongo/db/pipeline/document_source_match.h +++ b/src/mongo/db/pipeline/document_source_match.h @@ -55,7 +55,6 @@ public: */ Pipeline::SourceContainer::iterator doOptimizeAt(Pipeline::SourceContainer::iterator itr, Pipeline::SourceContainer* container) final; - void setSource(DocumentSource* Source) final; GetDepsReturn getDependencies(DepsTracker* deps) const final; @@ -154,7 +153,7 @@ private: DepsTracker _dependencies; BSONObj _predicate; - bool _isTextQuery; + const bool _isTextQuery; }; } // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_merge_cursors.cpp b/src/mongo/db/pipeline/document_source_merge_cursors.cpp index a9f06ef6aea..e26f83d6e3c 100644 --- a/src/mongo/db/pipeline/document_source_merge_cursors.cpp +++ b/src/mongo/db/pipeline/document_source_merge_cursors.cpp @@ -174,7 +174,7 @@ DocumentSource::GetNextResult DocumentSourceMergeCursors::getNext() { return std::move(next); } -void DocumentSourceMergeCursors::dispose() { +void DocumentSourceMergeCursors::doDispose() { // Note it is an error to call done() on a connection before consuming the response from a // request. Therefore it is an error to call dispose() if there are any outstanding connections // which have not received a reply. diff --git a/src/mongo/db/pipeline/document_source_merge_cursors.h b/src/mongo/db/pipeline/document_source_merge_cursors.h index 0842ca89bdd..b32e1170573 100644 --- a/src/mongo/db/pipeline/document_source_merge_cursors.h +++ b/src/mongo/db/pipeline/document_source_merge_cursors.h @@ -51,7 +51,6 @@ public: // virtuals from DocumentSource GetNextResult getNext() final; const char* getSourceName() const final; - void dispose() final; Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final; bool isValidInitialSource() const final { return true; @@ -76,6 +75,9 @@ public: */ static Document nextSafeFrom(DBClientCursor* cursor); +protected: + void doDispose() final; + private: struct CursorAndConnection { CursorAndConnection(const CursorDescriptor& cursorDescriptor); diff --git a/src/mongo/db/pipeline/document_source_mock.cpp b/src/mongo/db/pipeline/document_source_mock.cpp index 9f1aab2977b..79deb67c00b 100644 --- a/src/mongo/db/pipeline/document_source_mock.cpp +++ b/src/mongo/db/pipeline/document_source_mock.cpp @@ -58,7 +58,7 @@ Value DocumentSourceMock::serialize(boost::optional<ExplainOptions::Verbosity> e return Value(Document{{getSourceName(), Document()}}); } -void DocumentSourceMock::dispose() { +void DocumentSourceMock::doDispose() { isDisposed = true; } diff --git a/src/mongo/db/pipeline/document_source_mock.h b/src/mongo/db/pipeline/document_source_mock.h index 74973f3fd27..bdde16b711f 100644 --- a/src/mongo/db/pipeline/document_source_mock.h +++ b/src/mongo/db/pipeline/document_source_mock.h @@ -48,7 +48,6 @@ public: const char* getSourceName() const override; Value serialize( boost::optional<ExplainOptions::Verbosity> explain = boost::none) const override; - void dispose() override; bool isValidInitialSource() const override { return true; } @@ -89,6 +88,9 @@ public: bool isExpCtxInjected = false; BSONObjSet sorts; + +protected: + void doDispose() override; }; } // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_single_document_transformation.cpp b/src/mongo/db/pipeline/document_source_single_document_transformation.cpp index 5fcd0ba5400..df363f8fde2 100644 --- a/src/mongo/db/pipeline/document_source_single_document_transformation.cpp +++ b/src/mongo/db/pipeline/document_source_single_document_transformation.cpp @@ -70,7 +70,7 @@ intrusive_ptr<DocumentSource> DocumentSourceSingleDocumentTransformation::optimi return this; } -void DocumentSourceSingleDocumentTransformation::dispose() { +void DocumentSourceSingleDocumentTransformation::doDispose() { _parsedTransform.reset(); } diff --git a/src/mongo/db/pipeline/document_source_single_document_transformation.h b/src/mongo/db/pipeline/document_source_single_document_transformation.h index 86f4607513c..5e09b34e639 100644 --- a/src/mongo/db/pipeline/document_source_single_document_transformation.h +++ b/src/mongo/db/pipeline/document_source_single_document_transformation.h @@ -69,10 +69,7 @@ public: const char* getSourceName() const final; GetNextResult getNext() final; boost::intrusive_ptr<DocumentSource> optimize() final; - void dispose() final; Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final; - Pipeline::SourceContainer::iterator doOptimizeAt(Pipeline::SourceContainer::iterator itr, - Pipeline::SourceContainer* container) final; DocumentSource::GetDepsReturn getDependencies(DepsTracker* deps) const final; GetModPathsReturn getModifiedPaths() const final; @@ -80,6 +77,12 @@ public: return true; } +protected: + void doDispose() final; + + Pipeline::SourceContainer::iterator doOptimizeAt(Pipeline::SourceContainer::iterator itr, + Pipeline::SourceContainer* container) final; + private: // Stores transformation logic. std::unique_ptr<TransformerInterface> _parsedTransform; diff --git a/src/mongo/db/pipeline/document_source_sort.cpp b/src/mongo/db/pipeline/document_source_sort.cpp index b32b94760d9..b785c89b347 100644 --- a/src/mongo/db/pipeline/document_source_sort.cpp +++ b/src/mongo/db/pipeline/document_source_sort.cpp @@ -100,11 +100,8 @@ void DocumentSourceSort::serializeToArray( } } -void DocumentSourceSort::dispose() { +void DocumentSourceSort::doDispose() { _output.reset(); - if (pSource) { - pSource->dispose(); - } } long long DocumentSourceSort::getLimit() const { diff --git a/src/mongo/db/pipeline/document_source_sort.h b/src/mongo/db/pipeline/document_source_sort.h index 52d2ff2b687..83af9a4bcb6 100644 --- a/src/mongo/db/pipeline/document_source_sort.h +++ b/src/mongo/db/pipeline/document_source_sort.h @@ -60,13 +60,6 @@ public: return allPrefixes(_sort); } - /** - * Attempts to absorb a subsequent $limit stage so that it an perform a top-k sort. - */ - Pipeline::SourceContainer::iterator doOptimizeAt(Pipeline::SourceContainer::iterator itr, - Pipeline::SourceContainer* container) final; - void dispose() final; - GetDepsReturn getDependencies(DepsTracker* deps) const final; boost::intrusive_ptr<DocumentSource> getShardSource() final; @@ -122,6 +115,14 @@ public: return limitSrc; } +protected: + /** + * Attempts to absorb a subsequent $limit stage so that it an perform a top-k sort. + */ + Pipeline::SourceContainer::iterator doOptimizeAt(Pipeline::SourceContainer::iterator itr, + Pipeline::SourceContainer* container) final; + void doDispose() final; + private: explicit DocumentSourceSort(const boost::intrusive_ptr<ExpressionContext>& pExpCtx); diff --git a/src/mongo/db/pipeline/document_source_tee_consumer.cpp b/src/mongo/db/pipeline/document_source_tee_consumer.cpp index c620fba222a..88d940df8ad 100644 --- a/src/mongo/db/pipeline/document_source_tee_consumer.cpp +++ b/src/mongo/db/pipeline/document_source_tee_consumer.cpp @@ -58,7 +58,7 @@ DocumentSource::GetNextResult DocumentSourceTeeConsumer::getNext() { return _bufferSource->getNext(_facetId); } -void DocumentSourceTeeConsumer::dispose() { +void DocumentSourceTeeConsumer::doDispose() { _bufferSource->dispose(_facetId); } diff --git a/src/mongo/db/pipeline/document_source_tee_consumer.h b/src/mongo/db/pipeline/document_source_tee_consumer.h index cebeaee9a2e..826967a5c48 100644 --- a/src/mongo/db/pipeline/document_source_tee_consumer.h +++ b/src/mongo/db/pipeline/document_source_tee_consumer.h @@ -53,7 +53,6 @@ public: size_t facetId, const boost::intrusive_ptr<TeeBuffer>& bufferSource); - void dispose() final; GetNextResult getNext() final; /** @@ -65,6 +64,9 @@ public: Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final; +protected: + void doDispose() final; + private: DocumentSourceTeeConsumer(const boost::intrusive_ptr<ExpressionContext>& expCtx, size_t facetId, diff --git a/src/mongo/db/pipeline/pipeline.cpp b/src/mongo/db/pipeline/pipeline.cpp index 027f4527aa0..8c486e9ca4d 100644 --- a/src/mongo/db/pipeline/pipeline.cpp +++ b/src/mongo/db/pipeline/pipeline.cpp @@ -64,9 +64,14 @@ Pipeline::Pipeline(const intrusive_ptr<ExpressionContext>& pTheCtx) : pCtx(pTheC Pipeline::Pipeline(SourceContainer stages, const intrusive_ptr<ExpressionContext>& expCtx) : _sources(stages), pCtx(expCtx) {} -StatusWith<intrusive_ptr<Pipeline>> Pipeline::parse( +Pipeline::~Pipeline() { + invariant(_disposed); +} + +StatusWith<std::unique_ptr<Pipeline, Pipeline::Deleter>> Pipeline::parse( const std::vector<BSONObj>& rawPipeline, const intrusive_ptr<ExpressionContext>& expCtx) { - intrusive_ptr<Pipeline> pipeline(new Pipeline(expCtx)); + std::unique_ptr<Pipeline, Pipeline::Deleter> pipeline(new Pipeline(expCtx), + Pipeline::Deleter(expCtx->opCtx)); for (auto&& stageObj : rawPipeline) { auto parsedSources = DocumentSource::parse(expCtx, stageObj); @@ -79,18 +84,19 @@ StatusWith<intrusive_ptr<Pipeline>> Pipeline::parse( return status; } pipeline->stitch(); - return pipeline; + return std::move(pipeline); } -StatusWith<intrusive_ptr<Pipeline>> Pipeline::create( +StatusWith<std::unique_ptr<Pipeline, Pipeline::Deleter>> Pipeline::create( SourceContainer stages, const intrusive_ptr<ExpressionContext>& expCtx) { - intrusive_ptr<Pipeline> pipeline(new Pipeline(stages, expCtx)); + std::unique_ptr<Pipeline, Pipeline::Deleter> pipeline(new Pipeline(stages, expCtx), + Pipeline::Deleter(expCtx->opCtx)); auto status = pipeline->ensureAllStagesAreInLegalPositions(); if (!status.isOK()) { return status; } pipeline->stitch(); - return pipeline; + return std::move(pipeline); } Status Pipeline::ensureAllStagesAreInLegalPositions() const { @@ -101,6 +107,12 @@ Status Pipeline::ensureAllStagesAreInLegalPositions() const { str::stream() << stage->getSourceName() << " is only valid as the first stage in a pipeline."}; } + auto matchStage = dynamic_cast<DocumentSourceMatch*>(stage.get()); + if (i != 0 && matchStage && matchStage->isTextQuery()) { + return {ErrorCodes::BadValue, + "$match with $text is only allowed as the first pipeline stage", + 17313}; + } if (dynamic_cast<DocumentSourceOut*>(stage.get()) && i != _sources.size() - 1) { return {ErrorCodes::BadValue, "$out can only be the final stage in the pipeline"}; @@ -115,6 +127,9 @@ void Pipeline::optimizePipeline() { SourceContainer::iterator itr = _sources.begin(); + // We could be swapping around stages during this process, so disconnect the pipeline to prevent + // us from entering a state with dangling pointers. + unstitch(); while (itr != _sources.end() && std::next(itr) != _sources.end()) { invariant((*itr).get()); itr = (*itr).get()->optimizeAt(itr, &_sources); @@ -165,11 +180,30 @@ void Pipeline::reattachToOperationContext(OperationContext* opCtx) { } } -intrusive_ptr<Pipeline> Pipeline::splitForSharded() { +void Pipeline::dispose(OperationContext* opCtx) { + try { + pCtx->opCtx = opCtx; + + // Make sure all stages are connected, in case we are being disposed via an error path and + // were + // not stitched at the time of the error. + stitch(); + + if (!_sources.empty()) { + _sources.back()->dispose(); + } + _disposed = true; + } catch (...) { + std::terminate(); + } +} + +std::unique_ptr<Pipeline, Pipeline::Deleter> Pipeline::splitForSharded() { // Create and initialize the shard spec we'll return. We start with an empty pipeline on the // shards and all work being done in the merger. Optimizations can move operations between // the pipelines to be more efficient. - intrusive_ptr<Pipeline> shardPipeline(new Pipeline(pCtx)); + std::unique_ptr<Pipeline, Pipeline::Deleter> shardPipeline(new Pipeline(pCtx), + Pipeline::Deleter(pCtx->opCtx)); // The order in which optimizations are applied can have significant impact on the // efficiency of the final pipeline. Be Careful! @@ -295,12 +329,19 @@ vector<Value> Pipeline::serialize() const { return serializedSources; } +void Pipeline::unstitch() { + for (auto&& stage : _sources) { + stage->setSource(nullptr); + } +} + void Pipeline::stitch() { if (_sources.empty()) { return; } // Chain together all the stages. DocumentSource* prevSource = _sources.front().get(); + prevSource->setSource(nullptr); for (SourceContainer::iterator iter(++_sources.begin()), listEnd(_sources.end()); iter != listEnd; ++iter) { diff --git a/src/mongo/db/pipeline/pipeline.h b/src/mongo/db/pipeline/pipeline.h index 4f15289efef..b32b5df76d5 100644 --- a/src/mongo/db/pipeline/pipeline.h +++ b/src/mongo/db/pipeline/pipeline.h @@ -43,20 +43,60 @@ namespace mongo { class BSONObj; class BSONObjBuilder; -class CollatorInterface; +class ExpressionContext; class DocumentSource; +class CollatorInterface; class OperationContext; -class ExpressionContext; /** * A Pipeline object represents a list of DocumentSources and is responsible for optimizing the * pipeline. */ -class Pipeline : public IntrusiveCounterUnsigned { +class Pipeline { public: typedef std::list<boost::intrusive_ptr<DocumentSource>> SourceContainer; /** + * This class will ensure a Pipeline is disposed before it is deleted. + */ + class Deleter { + public: + /** + * Constructs an empty deleter. Useful for creating a + * unique_ptr<Pipeline, Pipeline::Deleter> without populating it. + */ + Deleter() {} + + explicit Deleter(OperationContext* opCtx) : _opCtx(opCtx) {} + + /** + * If an owner of a std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> wants to assume + * responsibility for calling PlanExecutor::dispose(), they can call dismissDisposal(). If + * dismissed, a Deleter will not call dispose() when deleting the PlanExecutor. + */ + void dismissDisposal() { + _dismissed = true; + } + + /** + * Calls dispose() on 'pipeline', unless this Deleter has been dismissed. + */ + void operator()(Pipeline* pipeline) { + // It is illegal to call this method on a default-constructed Deleter. + invariant(_opCtx); + if (!_dismissed) { + pipeline->dispose(_opCtx); + } + delete pipeline; + } + + private: + OperationContext* _opCtx = nullptr; + + bool _dismissed = false; + }; + + /** * Parses a Pipeline from a BSONElement representing a list of DocumentSources. Returns a non-OK * status if it failed to parse. The returned pipeline is not optimized, but the caller may * convert it to an optimized pipeline by calling optimizePipeline(). @@ -65,7 +105,7 @@ public: * will not be used during execution of the pipeline. Doing so may cause comparisons made during * parse-time to return the wrong results. */ - static StatusWith<boost::intrusive_ptr<Pipeline>> parse( + static StatusWith<std::unique_ptr<Pipeline, Pipeline::Deleter>> parse( const std::vector<BSONObj>& rawPipeline, const boost::intrusive_ptr<ExpressionContext>& expCtx); @@ -75,7 +115,7 @@ public: * Returns a non-OK status if any stage is in an invalid position. For example, if an $out stage * is present but is not the last stage. */ - static StatusWith<boost::intrusive_ptr<Pipeline>> create( + static StatusWith<std::unique_ptr<Pipeline, Pipeline::Deleter>> create( SourceContainer sources, const boost::intrusive_ptr<ExpressionContext>& expCtx); /** @@ -104,15 +144,25 @@ public: void reattachToOperationContext(OperationContext* opCtx); /** - Split the current Pipeline into a Pipeline for each shard, and - a Pipeline that combines the results within mongos. - - This permanently alters this pipeline for the merging operation. + * Releases any resources held by this pipeline such as PlanExecutors or in-memory structures. + * Must be called before deleting a Pipeline. + * + * There are multiple cleanup scenarios: + * - This Pipeline will only ever use one OperationContext. In this case the Pipeline::Deleter + * will automatically call dispose() before deleting the Pipeline, and the owner need not + * call dispose(). + * - This Pipeline may use multiple OperationContexts over its lifetime. In this case it + * is the owner's responsibility to call dispose() with a valid OperationContext before + * deleting the Pipeline. + */ + void dispose(OperationContext* opCtx); - @returns the Spec for the pipeline command that should be sent - to the shards + /** + * Split the current Pipeline into a Pipeline for each shard, and a Pipeline that combines the + * results within mongos. This permanently alters this pipeline for the merging operation, and + * returns a Pipeline object that should be executed on each targeted shard. */ - boost::intrusive_ptr<Pipeline> splitForSharded(); + std::unique_ptr<Pipeline, Pipeline::Deleter> splitForSharded(); /** If the pipeline starts with a $match, return its BSON predicate. * Returns empty BSON if the first stage isn't $match. @@ -164,14 +214,12 @@ public: return _sources; } - /* - PipelineD is a "sister" class that has additional functionality - for the Pipeline. It exists because of linkage requirements. - Pipeline needs to function in mongod and mongos. PipelineD - contains extra functionality required in mongod, and which can't - appear in mongos because the required symbols are unavailable - for linking there. Consider PipelineD to be an extension of this - class for mongod only. + /** + * PipelineD is a "sister" class that has additional functionality for the Pipeline. It exists + * because of linkage requirements. Pipeline needs to function in mongod and mongos. PipelineD + * contains extra functionality required in mongod, and which can't appear in mongos because the + * required symbols are unavailable for linking there. Consider PipelineD to be an extension of + * this class for mongod only. */ friend class PipelineD; @@ -189,6 +237,8 @@ private: Pipeline(const boost::intrusive_ptr<ExpressionContext>& pCtx); Pipeline(SourceContainer stages, const boost::intrusive_ptr<ExpressionContext>& pCtx); + ~Pipeline(); + /** * Stitch together the source pointers by calling setSource() for each source in '_sources'. * This function must be called any time the order of stages within the pipeline changes, e.g. @@ -197,6 +247,12 @@ private: void stitch(); /** + * Reset all stages' child pointers to nullptr. Used to prevent dangling pointers during the + * optimization process, where we might swap or destroy stages. + */ + void unstitch(); + + /** * Returns a non-OK status if any stage is in an invalid position. For example, if an $out stage * is present but is not the last stage in the pipeline. */ @@ -205,5 +261,6 @@ private: SourceContainer _sources; boost::intrusive_ptr<ExpressionContext> pCtx; + bool _disposed = false; }; } // namespace mongo diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp index 57cfe0f53d7..236e747a463 100644 --- a/src/mongo/db/pipeline/pipeline_d.cpp +++ b/src/mongo/db/pipeline/pipeline_d.cpp @@ -178,7 +178,7 @@ public: str::stream() << "renameCollection failed: " << info}; } - StatusWith<boost::intrusive_ptr<Pipeline>> makePipeline( + StatusWith<std::unique_ptr<Pipeline, Pipeline::Deleter>> makePipeline( const std::vector<BSONObj>& rawPipeline, const boost::intrusive_ptr<ExpressionContext>& expCtx) final { // 'expCtx' may represent the settings for an aggregation pipeline on a different namespace @@ -205,7 +205,8 @@ public: auto css = CollectionShardingState::get(_ctx->opCtx, expCtx->ns); uassert(4567, "from collection cannot be sharded", !bool(css->getMetadata())); - PipelineD::prepareCursorSource(autoColl.getCollection(), nullptr, pipeline.getValue()); + PipelineD::prepareCursorSource( + autoColl.getCollection(), nullptr, pipeline.getValue().get()); return pipeline; } @@ -220,10 +221,8 @@ private: * if the storage engine doesn't support random cursors, or if 'sampleSize' is a large enough * percentage of the collection. */ -StatusWith<unique_ptr<PlanExecutor>> createRandomCursorExecutor(Collection* collection, - OperationContext* opCtx, - long long sampleSize, - long long numRecords) { +StatusWith<unique_ptr<PlanExecutor, PlanExecutor::Deleter>> createRandomCursorExecutor( + Collection* collection, OperationContext* opCtx, long long sampleSize, long long numRecords) { double kMaxSampleRatioForRandCursor = 0.05; if (sampleSize > numRecords * kMaxSampleRatioForRandCursor || numRecords <= 100) { return {nullptr}; @@ -290,7 +289,7 @@ StatusWith<unique_ptr<PlanExecutor>> createRandomCursorExecutor(Collection* coll opCtx, std::move(ws), std::move(stage), collection, PlanExecutor::YIELD_AUTO); } -StatusWith<std::unique_ptr<PlanExecutor>> attemptToGetExecutor( +StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> attemptToGetExecutor( OperationContext* opCtx, Collection* collection, const intrusive_ptr<ExpressionContext>& pExpCtx, @@ -338,7 +337,7 @@ StatusWith<std::unique_ptr<PlanExecutor>> attemptToGetExecutor( void PipelineD::prepareCursorSource(Collection* collection, const AggregationRequest* aggRequest, - const intrusive_ptr<Pipeline>& pipeline) { + Pipeline* pipeline) { auto expCtx = pipeline->getContext(); dassert(expCtx->opCtx->lockState()->isCollectionLockedForMode(expCtx->ns.ns(), MODE_IS)); @@ -440,11 +439,11 @@ void PipelineD::prepareCursorSource(Collection* collection, collection, pipeline, expCtx, std::move(exec), deps, queryObj, sortObj, projForQuery); } -StatusWith<std::unique_ptr<PlanExecutor>> PipelineD::prepareExecutor( +StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> PipelineD::prepareExecutor( OperationContext* opCtx, Collection* collection, const NamespaceString& nss, - const intrusive_ptr<Pipeline>& pipeline, + Pipeline* pipeline, const intrusive_ptr<ExpressionContext>& expCtx, const intrusive_ptr<DocumentSourceSort>& sortStage, const DepsTracker& deps, @@ -514,7 +513,7 @@ StatusWith<std::unique_ptr<PlanExecutor>> PipelineD::prepareExecutor( aggRequest, plannerOpts); - std::unique_ptr<PlanExecutor> exec; + std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> exec; if (swExecutorSortAndProj.isOK()) { // Success! We have a non-blocking sort and a covered projection. exec = std::move(swExecutorSortAndProj.getValue()); @@ -573,9 +572,9 @@ StatusWith<std::unique_ptr<PlanExecutor>> PipelineD::prepareExecutor( } void PipelineD::addCursorSource(Collection* collection, - const intrusive_ptr<Pipeline>& pipeline, + Pipeline* pipeline, const intrusive_ptr<ExpressionContext>& expCtx, - unique_ptr<PlanExecutor> exec, + unique_ptr<PlanExecutor, PlanExecutor::Deleter> exec, DepsTracker deps, const BSONObj& queryObj, const BSONObj& sortObj, @@ -614,7 +613,7 @@ void PipelineD::addCursorSource(Collection* collection, pipeline->optimizePipeline(); } -std::string PipelineD::getPlanSummaryStr(const boost::intrusive_ptr<Pipeline>& pPipeline) { +std::string PipelineD::getPlanSummaryStr(const Pipeline* pPipeline) { if (auto docSourceCursor = dynamic_cast<DocumentSourceCursor*>(pPipeline->_sources.front().get())) { return docSourceCursor->getPlanSummaryStr(); @@ -623,8 +622,7 @@ std::string PipelineD::getPlanSummaryStr(const boost::intrusive_ptr<Pipeline>& p return ""; } -void PipelineD::getPlanSummaryStats(const boost::intrusive_ptr<Pipeline>& pPipeline, - PlanSummaryStats* statsOut) { +void PipelineD::getPlanSummaryStats(const Pipeline* pPipeline, PlanSummaryStats* statsOut) { invariant(statsOut); if (auto docSourceCursor = diff --git a/src/mongo/db/pipeline/pipeline_d.h b/src/mongo/db/pipeline/pipeline_d.h index 6117fd14cff..8c06a9e2f27 100644 --- a/src/mongo/db/pipeline/pipeline_d.h +++ b/src/mongo/db/pipeline/pipeline_d.h @@ -34,6 +34,7 @@ #include "mongo/bson/bsonobj.h" #include "mongo/db/namespace_string.h" #include "mongo/db/pipeline/aggregation_request.h" +#include "mongo/db/query/plan_executor.h" namespace mongo { class Collection; @@ -42,7 +43,6 @@ class DocumentSourceSort; class ExpressionContext; class OperationContext; class Pipeline; -class PlanExecutor; struct PlanSummaryStats; class BSONObj; struct DepsTracker; @@ -77,12 +77,11 @@ public: */ static void prepareCursorSource(Collection* collection, const AggregationRequest* aggRequest, - const boost::intrusive_ptr<Pipeline>& pipeline); + Pipeline* pipeline); - static std::string getPlanSummaryStr(const boost::intrusive_ptr<Pipeline>& pPipeline); + static std::string getPlanSummaryStr(const Pipeline* pipeline); - static void getPlanSummaryStats(const boost::intrusive_ptr<Pipeline>& pPipeline, - PlanSummaryStats* statsOut); + static void getPlanSummaryStats(const Pipeline* pipeline, PlanSummaryStats* statsOut); private: PipelineD(); // does not exist: prevent instantiation @@ -96,11 +95,11 @@ private: * sort, and 'projectionObj' will be set to an empty object if the query system cannot provide a * covered projection. */ - static StatusWith<std::unique_ptr<PlanExecutor>> prepareExecutor( + static StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> prepareExecutor( OperationContext* opCtx, Collection* collection, const NamespaceString& nss, - const boost::intrusive_ptr<Pipeline>& pipeline, + Pipeline* pipeline, const boost::intrusive_ptr<ExpressionContext>& expCtx, const boost::intrusive_ptr<DocumentSourceSort>& sortStage, const DepsTracker& deps, @@ -114,9 +113,9 @@ private: * Pipeline. */ static void addCursorSource(Collection* collection, - const boost::intrusive_ptr<Pipeline>& pipeline, + Pipeline* pipeline, const boost::intrusive_ptr<ExpressionContext>& expCtx, - std::unique_ptr<PlanExecutor> exec, + std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> exec, DepsTracker deps, const BSONObj& queryObj = BSONObj(), const BSONObj& sortObj = BSONObj(), diff --git a/src/mongo/db/pipeline/pipeline_test.cpp b/src/mongo/db/pipeline/pipeline_test.cpp index 73b5dfb5eaf..380a5824fd6 100644 --- a/src/mongo/db/pipeline/pipeline_test.cpp +++ b/src/mongo/db/pipeline/pipeline_test.cpp @@ -806,8 +806,8 @@ public: virtual ~Base() {} protected: - intrusive_ptr<Pipeline> mergePipe; - intrusive_ptr<Pipeline> shardPipe; + std::unique_ptr<Pipeline, Pipeline::Deleter> mergePipe; + std::unique_ptr<Pipeline, Pipeline::Deleter> shardPipe; private: OperationContextNoop _opCtx; diff --git a/src/mongo/db/pipeline/stub_mongod_interface.h b/src/mongo/db/pipeline/stub_mongod_interface.h index 57279650f43..e21ccc4c822 100644 --- a/src/mongo/db/pipeline/stub_mongod_interface.h +++ b/src/mongo/db/pipeline/stub_mongod_interface.h @@ -87,7 +87,7 @@ public: MONGO_UNREACHABLE; } - StatusWith<boost::intrusive_ptr<Pipeline>> makePipeline( + StatusWith<std::unique_ptr<Pipeline, Pipeline::Deleter>> makePipeline( const std::vector<BSONObj>& rawPipeline, const boost::intrusive_ptr<ExpressionContext>& expCtx) override { MONGO_UNREACHABLE; diff --git a/src/mongo/db/pipeline/tee_buffer.h b/src/mongo/db/pipeline/tee_buffer.h index bc4dfcd57b3..60fee309002 100644 --- a/src/mongo/db/pipeline/tee_buffer.h +++ b/src/mongo/db/pipeline/tee_buffer.h @@ -54,7 +54,7 @@ public: static boost::intrusive_ptr<TeeBuffer> create( size_t nConsumers, int bufferSizeBytes = internalQueryFacetBufferSizeBytes.load()); - void setSource(const boost::intrusive_ptr<DocumentSource>& source) { + void setSource(DocumentSource* source) { _source = source; } @@ -69,7 +69,9 @@ public: return info.stillInUse; })) { _buffer.clear(); - _source->dispose(); + if (_source) { + _source->dispose(); + } } } @@ -90,7 +92,7 @@ private: */ void loadNextBatch(); - boost::intrusive_ptr<DocumentSource> _source; + DocumentSource* _source = nullptr; const size_t _bufferSizeBytes; std::vector<DocumentSource::GetNextResult> _buffer; diff --git a/src/mongo/db/query/find.cpp b/src/mongo/db/query/find.cpp index 903153d0824..93bccd5f6cb 100644 --- a/src/mongo/db/query/find.cpp +++ b/src/mongo/db/query/find.cpp @@ -32,6 +32,7 @@ #include "mongo/db/query/find.h" +#include "mongo/base/error_codes.h" #include "mongo/client/dbclientinterface.h" #include "mongo/db/auth/authorization_session.h" #include "mongo/db/catalog/collection.h" @@ -209,10 +210,14 @@ void generateBatch(int ntoreturn, } } - if (PlanExecutor::DEAD == *state || PlanExecutor::FAILURE == *state) { - // Propagate this error to caller. + // Propagate any errors to the caller. + if (PlanExecutor::FAILURE == *state) { error() << "getMore executor error, stats: " << redact(Explain::getWinningPlanStats(exec)); uasserted(17406, "getMore executor error: " + WorkingSetCommon::toStatusString(obj)); + } else if (PlanExecutor::DEAD == *state) { + uasserted(ErrorCodes::QueryPlanKilled, + str::stream() << "PlanExecutor killed: " + << WorkingSetCommon::toStatusString(obj)); } } @@ -279,7 +284,8 @@ Message getMore(OperationContext* opCtx, } else { readLock.emplace(opCtx, nss); Collection* collection = readLock->getCollection(); - uassert(17356, "collection dropped between getMore calls", collection); + uassert( + ErrorCodes::OperationFailed, "collection dropped between getMore calls", collection); cursorManager = collection->getCursorManager(); } @@ -296,7 +302,7 @@ Message getMore(OperationContext* opCtx, // A pin performs a CC lookup and if there is a CC, increments the CC's pin value so it // doesn't time out. Also informs ClientCursor that there is somebody actively holding the // CC, so don't delete it. - auto ccPin = cursorManager->pinCursor(cursorid); + auto ccPin = cursorManager->pinCursor(opCtx, cursorid); // These are set in the QueryResult msg we return. int resultFlags = ResultFlag_AwaitCapable; @@ -311,9 +317,13 @@ Message getMore(OperationContext* opCtx, bb.skip(sizeof(QueryResult::Value)); if (!ccPin.isOK()) { - invariant(ccPin == ErrorCodes::CursorNotFound); - cursorid = 0; - resultFlags = ResultFlag_CursorNotFound; + if (ccPin == ErrorCodes::CursorNotFound) { + cursorid = 0; + resultFlags = ResultFlag_CursorNotFound; + } else { + invariant(ccPin == ErrorCodes::QueryPlanKilled); + uassertStatusOK(ccPin.getStatus()); + } } else { ClientCursor* cc = ccPin.getValue().getCursor(); @@ -533,7 +543,7 @@ std::string runQuery(OperationContext* opCtx, } // We have a parsed query. Time to get the execution plan for it. - std::unique_ptr<PlanExecutor> exec = uassertStatusOK( + auto exec = uassertStatusOK( getExecutorFind(opCtx, collection, nss, std::move(cq), PlanExecutor::YIELD_AUTO)); const QueryRequest& qr = exec->getCanonicalQuery()->getQueryRequest(); @@ -631,14 +641,6 @@ std::string runQuery(OperationContext* opCtx, } } - // If we cache the executor later, we want to deregister it as it receives notifications - // anyway by virtue of being cached. - // - // If we don't cache the executor later, we are deleting it, so it must be deregistered. - // - // So, no matter what, deregister the executor. - exec->deregisterExec(); - // Caller expects exceptions thrown in certain cases. if (PlanExecutor::FAILURE == state || PlanExecutor::DEAD == state) { error() << "Plan executor error during find: " << PlanExecutor::statestr(state) @@ -662,6 +664,7 @@ std::string runQuery(OperationContext* opCtx, // Allocate a new ClientCursor and register it with the cursor manager. ClientCursorPin pinnedCursor = collection->getCursorManager()->registerCursor( + opCtx, {std::move(exec), nss, AuthorizationSession::get(opCtx->getClient())->getAuthenticatedUserNames(), diff --git a/src/mongo/db/query/get_executor.cpp b/src/mongo/db/query/get_executor.cpp index 84dcfb3e372..90e4087d63c 100644 --- a/src/mongo/db/query/get_executor.cpp +++ b/src/mongo/db/query/get_executor.cpp @@ -460,11 +460,12 @@ StatusWith<PrepareExecutionResult> prepareExecution(OperationContext* opCtx, } // namespace -StatusWith<unique_ptr<PlanExecutor>> getExecutor(OperationContext* opCtx, - Collection* collection, - unique_ptr<CanonicalQuery> canonicalQuery, - PlanExecutor::YieldPolicy yieldPolicy, - size_t plannerOptions) { +StatusWith<unique_ptr<PlanExecutor, PlanExecutor::Deleter>> getExecutor( + OperationContext* opCtx, + Collection* collection, + unique_ptr<CanonicalQuery> canonicalQuery, + PlanExecutor::YieldPolicy yieldPolicy, + size_t plannerOptions) { unique_ptr<WorkingSet> ws = make_unique<WorkingSet>(); StatusWith<PrepareExecutionResult> executionResult = prepareExecution(opCtx, collection, ws.get(), std::move(canonicalQuery), plannerOptions); @@ -507,9 +508,8 @@ mongo::BSONElement extractOplogTsOptime(const mongo::MatchExpression* me) { return static_cast<const mongo::ComparisonMatchExpression*>(me)->getData(); } -StatusWith<unique_ptr<PlanExecutor>> getOplogStartHack(OperationContext* opCtx, - Collection* collection, - unique_ptr<CanonicalQuery> cq) { +StatusWith<unique_ptr<PlanExecutor, PlanExecutor::Deleter>> getOplogStartHack( + OperationContext* opCtx, Collection* collection, unique_ptr<CanonicalQuery> cq) { invariant(collection); invariant(cq.get()); @@ -573,7 +573,8 @@ StatusWith<unique_ptr<PlanExecutor>> getOplogStartHack(OperationContext* opCtx, auto statusWithPlanExecutor = PlanExecutor::make( opCtx, std::move(oplogws), std::move(stage), collection, PlanExecutor::YIELD_AUTO); invariant(statusWithPlanExecutor.isOK()); - unique_ptr<PlanExecutor> exec = std::move(statusWithPlanExecutor.getValue()); + unique_ptr<PlanExecutor, PlanExecutor::Deleter> exec = + std::move(statusWithPlanExecutor.getValue()); // The stage returns a RecordId of where to start. startLoc = RecordId(); @@ -615,11 +616,12 @@ StatusWith<unique_ptr<PlanExecutor>> getOplogStartHack(OperationContext* opCtx, } // namespace -StatusWith<unique_ptr<PlanExecutor>> getExecutorFind(OperationContext* opCtx, - Collection* collection, - const NamespaceString& nss, - unique_ptr<CanonicalQuery> canonicalQuery, - PlanExecutor::YieldPolicy yieldPolicy) { +StatusWith<unique_ptr<PlanExecutor, PlanExecutor::Deleter>> getExecutorFind( + OperationContext* opCtx, + Collection* collection, + const NamespaceString& nss, + unique_ptr<CanonicalQuery> canonicalQuery, + PlanExecutor::YieldPolicy yieldPolicy) { if (NULL != collection && canonicalQuery->getQueryRequest().isOplogReplay()) { return getOplogStartHack(opCtx, collection, std::move(canonicalQuery)); } @@ -685,10 +687,8 @@ StatusWith<unique_ptr<PlanStage>> applyProjection(OperationContext* opCtx, // Delete // -StatusWith<unique_ptr<PlanExecutor>> getExecutorDelete(OperationContext* opCtx, - OpDebug* opDebug, - Collection* collection, - ParsedDelete* parsedDelete) { +StatusWith<unique_ptr<PlanExecutor, PlanExecutor::Deleter>> getExecutorDelete( + OperationContext* opCtx, OpDebug* opDebug, Collection* collection, ParsedDelete* parsedDelete) { const DeleteRequest* request = parsedDelete->getRequest(); const NamespaceString& nss(request->getNamespaceString()); @@ -839,10 +839,8 @@ inline void validateUpdate(const char* ns, const BSONObj& updateobj, const BSONO } // namespace -StatusWith<unique_ptr<PlanExecutor>> getExecutorUpdate(OperationContext* opCtx, - OpDebug* opDebug, - Collection* collection, - ParsedUpdate* parsedUpdate) { +StatusWith<unique_ptr<PlanExecutor, PlanExecutor::Deleter>> getExecutorUpdate( + OperationContext* opCtx, OpDebug* opDebug, Collection* collection, ParsedUpdate* parsedUpdate) { const UpdateRequest* request = parsedUpdate->getRequest(); UpdateDriver* driver = parsedUpdate->getDriver(); @@ -989,10 +987,11 @@ StatusWith<unique_ptr<PlanExecutor>> getExecutorUpdate(OperationContext* opCtx, // Group // -StatusWith<unique_ptr<PlanExecutor>> getExecutorGroup(OperationContext* opCtx, - Collection* collection, - const GroupRequest& request, - PlanExecutor::YieldPolicy yieldPolicy) { +StatusWith<unique_ptr<PlanExecutor, PlanExecutor::Deleter>> getExecutorGroup( + OperationContext* opCtx, + Collection* collection, + const GroupRequest& request, + PlanExecutor::YieldPolicy yieldPolicy) { if (!getGlobalScriptEngine()) { return Status(ErrorCodes::BadValue, "server-side JavaScript execution is disabled"); } @@ -1231,11 +1230,12 @@ BSONObj getDistinctProjection(const std::string& field) { } // namespace -StatusWith<unique_ptr<PlanExecutor>> getExecutorCount(OperationContext* opCtx, - Collection* collection, - const CountRequest& request, - bool explain, - PlanExecutor::YieldPolicy yieldPolicy) { +StatusWith<unique_ptr<PlanExecutor, PlanExecutor::Deleter>> getExecutorCount( + OperationContext* opCtx, + Collection* collection, + const CountRequest& request, + bool explain, + PlanExecutor::YieldPolicy yieldPolicy) { unique_ptr<WorkingSet> ws = make_unique<WorkingSet>(); auto qr = stdx::make_unique<QueryRequest>(request.getNs()); @@ -1410,11 +1410,12 @@ bool turnIxscanIntoDistinctIxscan(QuerySolution* soln, const string& field) { return true; } -StatusWith<unique_ptr<PlanExecutor>> getExecutorDistinct(OperationContext* opCtx, - Collection* collection, - const std::string& ns, - ParsedDistinct* parsedDistinct, - PlanExecutor::YieldPolicy yieldPolicy) { +StatusWith<unique_ptr<PlanExecutor, PlanExecutor::Deleter>> getExecutorDistinct( + OperationContext* opCtx, + Collection* collection, + const std::string& ns, + ParsedDistinct* parsedDistinct, + PlanExecutor::YieldPolicy yieldPolicy) { if (!collection) { // Treat collections that do not exist as empty collections. return PlanExecutor::make(opCtx, diff --git a/src/mongo/db/query/get_executor.h b/src/mongo/db/query/get_executor.h index 65708d135fd..6a13341c111 100644 --- a/src/mongo/db/query/get_executor.h +++ b/src/mongo/db/query/get_executor.h @@ -71,7 +71,7 @@ void fillOutPlannerParams(OperationContext* opCtx, * * If the query cannot be executed, returns a Status indicating why. */ -StatusWith<std::unique_ptr<PlanExecutor>> getExecutor( +StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> getExecutor( OperationContext* opCtx, Collection* collection, std::unique_ptr<CanonicalQuery> canonicalQuery, @@ -86,7 +86,7 @@ StatusWith<std::unique_ptr<PlanExecutor>> getExecutor( * * If the query cannot be executed, returns a Status indicating why. */ -StatusWith<std::unique_ptr<PlanExecutor>> getExecutorFind( +StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> getExecutorFind( OperationContext* opCtx, Collection* collection, const NamespaceString& nss, @@ -109,7 +109,7 @@ bool turnIxscanIntoDistinctIxscan(QuerySolution* soln, const std::string& field) * possible values of a certain field. As such, we can skip lots of data in certain cases (see * body of method for detail). */ -StatusWith<std::unique_ptr<PlanExecutor>> getExecutorDistinct( +StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> getExecutorDistinct( OperationContext* opCtx, Collection* collection, const std::string& ns, @@ -123,11 +123,12 @@ StatusWith<std::unique_ptr<PlanExecutor>> getExecutorDistinct( * As such, with certain covered queries, we can skip the overhead of fetching etc. when * executing a count. */ -StatusWith<std::unique_ptr<PlanExecutor>> getExecutorCount(OperationContext* opCtx, - Collection* collection, - const CountRequest& request, - bool explain, - PlanExecutor::YieldPolicy yieldPolicy); +StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> getExecutorCount( + OperationContext* opCtx, + Collection* collection, + const CountRequest& request, + bool explain, + PlanExecutor::YieldPolicy yieldPolicy); /** * Get a PlanExecutor for a delete operation. 'parsedDelete' describes the query predicate @@ -145,10 +146,8 @@ StatusWith<std::unique_ptr<PlanExecutor>> getExecutorCount(OperationContext* opC * * If the query cannot be executed, returns a Status indicating why. */ -StatusWith<std::unique_ptr<PlanExecutor>> getExecutorDelete(OperationContext* opCtx, - OpDebug* opDebug, - Collection* collection, - ParsedDelete* parsedDelete); +StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> getExecutorDelete( + OperationContext* opCtx, OpDebug* opDebug, Collection* collection, ParsedDelete* parsedDelete); /** * Get a PlanExecutor for an update operation. 'parsedUpdate' describes the query predicate @@ -167,10 +166,8 @@ StatusWith<std::unique_ptr<PlanExecutor>> getExecutorDelete(OperationContext* op * * If the query cannot be executed, returns a Status indicating why. */ -StatusWith<std::unique_ptr<PlanExecutor>> getExecutorUpdate(OperationContext* opCtx, - OpDebug* opDebug, - Collection* collection, - ParsedUpdate* parsedUpdate); +StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> getExecutorUpdate( + OperationContext* opCtx, OpDebug* opDebug, Collection* collection, ParsedUpdate* parsedUpdate); /** * Get a PlanExecutor for a group operation. @@ -180,9 +177,10 @@ StatusWith<std::unique_ptr<PlanExecutor>> getExecutorUpdate(OperationContext* op * * If an executor could not be created, returns a Status indicating why. */ -StatusWith<std::unique_ptr<PlanExecutor>> getExecutorGroup(OperationContext* opCtx, - Collection* collection, - const GroupRequest& request, - PlanExecutor::YieldPolicy yieldPolicy); +StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> getExecutorGroup( + OperationContext* opCtx, + Collection* collection, + const GroupRequest& request, + PlanExecutor::YieldPolicy yieldPolicy); } // namespace mongo diff --git a/src/mongo/db/query/internal_plans.cpp b/src/mongo/db/query/internal_plans.cpp index b09e550b863..6b1ae0857b7 100644 --- a/src/mongo/db/query/internal_plans.cpp +++ b/src/mongo/db/query/internal_plans.cpp @@ -43,12 +43,13 @@ namespace mongo { -std::unique_ptr<PlanExecutor> InternalPlanner::collectionScan(OperationContext* opCtx, - StringData ns, - Collection* collection, - PlanExecutor::YieldPolicy yieldPolicy, - const Direction direction, - const RecordId startLoc) { +std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> InternalPlanner::collectionScan( + OperationContext* opCtx, + StringData ns, + Collection* collection, + PlanExecutor::YieldPolicy yieldPolicy, + const Direction direction, + const RecordId startLoc) { std::unique_ptr<WorkingSet> ws = stdx::make_unique<WorkingSet>(); if (NULL == collection) { @@ -71,7 +72,7 @@ std::unique_ptr<PlanExecutor> InternalPlanner::collectionScan(OperationContext* return std::move(statusWithPlanExecutor.getValue()); } -std::unique_ptr<PlanExecutor> InternalPlanner::deleteWithCollectionScan( +std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> InternalPlanner::deleteWithCollectionScan( OperationContext* opCtx, Collection* collection, const DeleteStageParams& params, @@ -91,15 +92,16 @@ std::unique_ptr<PlanExecutor> InternalPlanner::deleteWithCollectionScan( } -std::unique_ptr<PlanExecutor> InternalPlanner::indexScan(OperationContext* opCtx, - const Collection* collection, - const IndexDescriptor* descriptor, - const BSONObj& startKey, - const BSONObj& endKey, - BoundInclusion boundInclusion, - PlanExecutor::YieldPolicy yieldPolicy, - Direction direction, - int options) { +std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> InternalPlanner::indexScan( + OperationContext* opCtx, + const Collection* collection, + const IndexDescriptor* descriptor, + const BSONObj& startKey, + const BSONObj& endKey, + BoundInclusion boundInclusion, + PlanExecutor::YieldPolicy yieldPolicy, + Direction direction, + int options) { auto ws = stdx::make_unique<WorkingSet>(); std::unique_ptr<PlanStage> root = _indexScan(opCtx, @@ -118,7 +120,7 @@ std::unique_ptr<PlanExecutor> InternalPlanner::indexScan(OperationContext* opCtx return std::move(executor.getValue()); } -std::unique_ptr<PlanExecutor> InternalPlanner::deleteWithIndexScan( +std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> InternalPlanner::deleteWithIndexScan( OperationContext* opCtx, Collection* collection, const DeleteStageParams& params, @@ -148,7 +150,7 @@ std::unique_ptr<PlanExecutor> InternalPlanner::deleteWithIndexScan( return std::move(executor.getValue()); } -std::unique_ptr<PlanExecutor> InternalPlanner::updateWithIdHack( +std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> InternalPlanner::updateWithIdHack( OperationContext* opCtx, Collection* collection, const UpdateStageParams& params, diff --git a/src/mongo/db/query/internal_plans.h b/src/mongo/db/query/internal_plans.h index cc9eb7f0d86..800026b8578 100644 --- a/src/mongo/db/query/internal_plans.h +++ b/src/mongo/db/query/internal_plans.h @@ -67,17 +67,18 @@ public: /** * Returns a collection scan. Caller owns pointer. */ - static std::unique_ptr<PlanExecutor> collectionScan(OperationContext* opCtx, - StringData ns, - Collection* collection, - PlanExecutor::YieldPolicy yieldPolicy, - const Direction direction = FORWARD, - const RecordId startLoc = RecordId()); + static std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> collectionScan( + OperationContext* opCtx, + StringData ns, + Collection* collection, + PlanExecutor::YieldPolicy yieldPolicy, + const Direction direction = FORWARD, + const RecordId startLoc = RecordId()); /** * Returns a FETCH => DELETE plan. */ - static std::unique_ptr<PlanExecutor> deleteWithCollectionScan( + static std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> deleteWithCollectionScan( OperationContext* opCtx, Collection* collection, const DeleteStageParams& params, @@ -88,38 +89,41 @@ public: /** * Returns an index scan. Caller owns returned pointer. */ - static std::unique_ptr<PlanExecutor> indexScan(OperationContext* opCtx, - const Collection* collection, - const IndexDescriptor* descriptor, - const BSONObj& startKey, - const BSONObj& endKey, - BoundInclusion boundInclusion, - PlanExecutor::YieldPolicy yieldPolicy, - Direction direction = FORWARD, - int options = IXSCAN_DEFAULT); + static std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> indexScan( + OperationContext* opCtx, + const Collection* collection, + const IndexDescriptor* descriptor, + const BSONObj& startKey, + const BSONObj& endKey, + BoundInclusion boundInclusion, + PlanExecutor::YieldPolicy yieldPolicy, + Direction direction = FORWARD, + int options = IXSCAN_DEFAULT); /** * Returns an IXSCAN => FETCH => DELETE plan. */ - static std::unique_ptr<PlanExecutor> deleteWithIndexScan(OperationContext* opCtx, - Collection* collection, - const DeleteStageParams& params, - const IndexDescriptor* descriptor, - const BSONObj& startKey, - const BSONObj& endKey, - BoundInclusion boundInclusion, - PlanExecutor::YieldPolicy yieldPolicy, - Direction direction = FORWARD); + static std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> deleteWithIndexScan( + OperationContext* opCtx, + Collection* collection, + const DeleteStageParams& params, + const IndexDescriptor* descriptor, + const BSONObj& startKey, + const BSONObj& endKey, + BoundInclusion boundInclusion, + PlanExecutor::YieldPolicy yieldPolicy, + Direction direction = FORWARD); /** * Returns an IDHACK => UPDATE plan. */ - static std::unique_ptr<PlanExecutor> updateWithIdHack(OperationContext* opCtx, - Collection* collection, - const UpdateStageParams& params, - const IndexDescriptor* descriptor, - const BSONObj& key, - PlanExecutor::YieldPolicy yieldPolicy); + static std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> updateWithIdHack( + OperationContext* opCtx, + Collection* collection, + const UpdateStageParams& params, + const IndexDescriptor* descriptor, + const BSONObj& key, + PlanExecutor::YieldPolicy yieldPolicy); private: /** diff --git a/src/mongo/db/query/plan_executor.cpp b/src/mongo/db/query/plan_executor.cpp index 8e79fe1a2c5..7a47a801b38 100644 --- a/src/mongo/db/query/plan_executor.cpp +++ b/src/mongo/db/query/plan_executor.cpp @@ -55,12 +55,11 @@ using std::shared_ptr; using std::string; using std::unique_ptr; using std::vector; -using stdx::make_unique; namespace { namespace { -MONGO_FP_DECLARE(planExecutorAlwaysDead); +MONGO_FP_DECLARE(planExecutorAlwaysFails); } // namespace /** @@ -85,21 +84,23 @@ PlanStage* getStageByType(PlanStage* root, StageType type) { } // static -StatusWith<unique_ptr<PlanExecutor>> PlanExecutor::make(OperationContext* opCtx, - unique_ptr<WorkingSet> ws, - unique_ptr<PlanStage> rt, - const Collection* collection, - YieldPolicy yieldPolicy) { +StatusWith<unique_ptr<PlanExecutor, PlanExecutor::Deleter>> PlanExecutor::make( + OperationContext* opCtx, + unique_ptr<WorkingSet> ws, + unique_ptr<PlanStage> rt, + const Collection* collection, + YieldPolicy yieldPolicy) { return PlanExecutor::make( opCtx, std::move(ws), std::move(rt), nullptr, nullptr, collection, {}, yieldPolicy); } // static -StatusWith<unique_ptr<PlanExecutor>> PlanExecutor::make(OperationContext* opCtx, - unique_ptr<WorkingSet> ws, - unique_ptr<PlanStage> rt, - NamespaceString nss, - YieldPolicy yieldPolicy) { +StatusWith<unique_ptr<PlanExecutor, PlanExecutor::Deleter>> PlanExecutor::make( + OperationContext* opCtx, + unique_ptr<WorkingSet> ws, + unique_ptr<PlanStage> rt, + NamespaceString nss, + YieldPolicy yieldPolicy) { return PlanExecutor::make(opCtx, std::move(ws), std::move(rt), @@ -111,24 +112,26 @@ StatusWith<unique_ptr<PlanExecutor>> PlanExecutor::make(OperationContext* opCtx, } // static -StatusWith<unique_ptr<PlanExecutor>> PlanExecutor::make(OperationContext* opCtx, - unique_ptr<WorkingSet> ws, - unique_ptr<PlanStage> rt, - unique_ptr<CanonicalQuery> cq, - const Collection* collection, - YieldPolicy yieldPolicy) { +StatusWith<unique_ptr<PlanExecutor, PlanExecutor::Deleter>> PlanExecutor::make( + OperationContext* opCtx, + unique_ptr<WorkingSet> ws, + unique_ptr<PlanStage> rt, + unique_ptr<CanonicalQuery> cq, + const Collection* collection, + YieldPolicy yieldPolicy) { return PlanExecutor::make( opCtx, std::move(ws), std::move(rt), nullptr, std::move(cq), collection, {}, yieldPolicy); } // static -StatusWith<unique_ptr<PlanExecutor>> PlanExecutor::make(OperationContext* opCtx, - unique_ptr<WorkingSet> ws, - unique_ptr<PlanStage> rt, - unique_ptr<QuerySolution> qs, - unique_ptr<CanonicalQuery> cq, - const Collection* collection, - YieldPolicy yieldPolicy) { +StatusWith<unique_ptr<PlanExecutor, PlanExecutor::Deleter>> PlanExecutor::make( + OperationContext* opCtx, + unique_ptr<WorkingSet> ws, + unique_ptr<PlanStage> rt, + unique_ptr<QuerySolution> qs, + unique_ptr<CanonicalQuery> cq, + const Collection* collection, + YieldPolicy yieldPolicy) { return PlanExecutor::make(opCtx, std::move(ws), std::move(rt), @@ -140,24 +143,29 @@ StatusWith<unique_ptr<PlanExecutor>> PlanExecutor::make(OperationContext* opCtx, } // static -StatusWith<unique_ptr<PlanExecutor>> PlanExecutor::make(OperationContext* opCtx, - unique_ptr<WorkingSet> ws, - unique_ptr<PlanStage> rt, - unique_ptr<QuerySolution> qs, - unique_ptr<CanonicalQuery> cq, - const Collection* collection, - NamespaceString nss, - YieldPolicy yieldPolicy) { - unique_ptr<PlanExecutor> exec(new PlanExecutor(opCtx, - std::move(ws), - std::move(rt), - std::move(qs), - std::move(cq), - collection, - std::move(nss))); +StatusWith<unique_ptr<PlanExecutor, PlanExecutor::Deleter>> PlanExecutor::make( + OperationContext* opCtx, + unique_ptr<WorkingSet> ws, + unique_ptr<PlanStage> rt, + unique_ptr<QuerySolution> qs, + unique_ptr<CanonicalQuery> cq, + const Collection* collection, + NamespaceString nss, + YieldPolicy yieldPolicy) { + + std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> exec( + new PlanExecutor(opCtx, + std::move(ws), + std::move(rt), + std::move(qs), + std::move(cq), + collection, + std::move(nss), + yieldPolicy), + PlanExecutor::Deleter(opCtx, collection)); // Perform plan selection, if necessary. - Status status = exec->pickBestPlan(yieldPolicy, collection); + Status status = exec->pickBestPlan(collection); if (!status.isOK()) { return status; } @@ -171,14 +179,16 @@ PlanExecutor::PlanExecutor(OperationContext* opCtx, unique_ptr<QuerySolution> qs, unique_ptr<CanonicalQuery> cq, const Collection* collection, - NamespaceString nss) + NamespaceString nss, + YieldPolicy yieldPolicy) : _opCtx(opCtx), _cq(std::move(cq)), _workingSet(std::move(ws)), _qs(std::move(qs)), _root(std::move(rt)), _nss(std::move(nss)), - _yieldPolicy(new PlanYieldPolicy(this, YIELD_MANUAL)) { + // There's no point in yielding if the collection doesn't exist. + _yieldPolicy(new PlanYieldPolicy(this, collection ? yieldPolicy : NO_YIELD)) { // We may still need to initialize _nss from either collection or _cq. if (!_nss.isEmpty()) { return; // We already have an _nss set, so there's nothing more to do. @@ -186,17 +196,18 @@ PlanExecutor::PlanExecutor(OperationContext* opCtx, if (collection) { _nss = collection->ns(); + if (_yieldPolicy->canReleaseLocksDuringExecution()) { + collection->getCursorManager()->registerExecutor(this); + _registered = true; + } } else { invariant(_cq); _nss = _cq->getQueryRequest().nss(); } } -Status PlanExecutor::pickBestPlan(YieldPolicy policy, const Collection* collection) { +Status PlanExecutor::pickBestPlan(const Collection* collection) { invariant(_currentState == kUsable); - // For YIELD_AUTO, this will both set an auto yield policy on the PlanExecutor and - // register it to receive notifications. - this->setYieldPolicy(policy, collection); // First check if we need to do subplanning. PlanStage* foundStage = getStageByType(_root.get(), STAGE_SUBPLAN); @@ -226,7 +237,9 @@ Status PlanExecutor::pickBestPlan(YieldPolicy policy, const Collection* collecti return Status::OK(); } -PlanExecutor::~PlanExecutor() {} +PlanExecutor::~PlanExecutor() { + invariant(_currentState = kDisposed); +} // static string PlanExecutor::statestr(ExecState s) { @@ -295,7 +308,7 @@ void PlanExecutor::saveState() { // boundaries. WorkingSetCommon::prepareForSnapshotChange(_workingSet.get()); - if (!killed()) { + if (!isMarkedAsKilled()) { _root->saveState(); } _currentState = kSaved; @@ -305,7 +318,7 @@ bool PlanExecutor::restoreState() { try { return restoreStateWithoutRetrying(); } catch (const WriteConflictException& wce) { - if (!_yieldPolicy->allowedToYield()) + if (!_yieldPolicy->canAutoYield()) throw; // Handles retries by calling restoreStateWithoutRetrying() in a loop. @@ -316,12 +329,12 @@ bool PlanExecutor::restoreState() { bool PlanExecutor::restoreStateWithoutRetrying() { invariant(_currentState == kSaved); - if (!killed()) { + if (!isMarkedAsKilled()) { _root->restoreState(); } _currentState = kUsable; - return !killed(); + return !isMarkedAsKilled(); } void PlanExecutor::detachFromOperationContext() { @@ -345,7 +358,7 @@ void PlanExecutor::reattachToOperationContext(OperationContext* opCtx) { } void PlanExecutor::invalidate(OperationContext* opCtx, const RecordId& dl, InvalidationType type) { - if (!killed()) { + if (!isMarkedAsKilled()) { _root->invalidate(opCtx, dl, type); } } @@ -369,17 +382,17 @@ PlanExecutor::ExecState PlanExecutor::getNextSnapshotted(Snapshotted<BSONObj>* o } PlanExecutor::ExecState PlanExecutor::getNextImpl(Snapshotted<BSONObj>* objOut, RecordId* dlOut) { - MONGO_FAIL_POINT_BLOCK(planExecutorAlwaysDead, customKill) { - const BSONObj& data = customKill.getData(); - BSONElement customKillNS = data["namespace"]; - if (!customKillNS || _nss.ns() == customKillNS.str()) { - deregisterExec(); - kill("hit planExecutorAlwaysDead fail point"); - } + if (MONGO_FAIL_POINT(planExecutorAlwaysFails)) { + Status status(ErrorCodes::OperationFailed, + str::stream() << "PlanExecutor hit planExecutorAlwaysFails fail point"); + *objOut = + Snapshotted<BSONObj>(SnapshotId(), WorkingSetCommon::buildMemberStatusObject(status)); + + return PlanExecutor::FAILURE; } invariant(_currentState == kUsable); - if (killed()) { + if (isMarkedAsKilled()) { if (NULL != objOut) { Status status(ErrorCodes::OperationFailed, str::stream() << "Operation aborted because: " << *_killReason); @@ -415,7 +428,7 @@ PlanExecutor::ExecState PlanExecutor::getNextImpl(Snapshotted<BSONObj>* objOut, if (!_yieldPolicy->yield(fetcher.get())) { // A return of false from a yield should only happen if we've been killed during the // yield. - invariant(killed()); + invariant(isMarkedAsKilled()); if (NULL != objOut) { Status status(ErrorCodes::OperationFailed, @@ -475,7 +488,7 @@ PlanExecutor::ExecState PlanExecutor::getNextImpl(Snapshotted<BSONObj>* objOut, // This result didn't have the data the caller wanted, try again. } else if (PlanStage::NEED_YIELD == code) { if (id == WorkingSet::INVALID_ID) { - if (!_yieldPolicy->allowedToYield()) + if (!_yieldPolicy->canAutoYield()) throw WriteConflictException(); CurOp::get(_opCtx)->debug().writeConflicts++; writeConflictsInARow++; @@ -491,7 +504,7 @@ PlanExecutor::ExecState PlanExecutor::getNextImpl(Snapshotted<BSONObj>* objOut, } // If we're allowed to, we will yield next time through the loop. - if (_yieldPolicy->allowedToYield()) + if (_yieldPolicy->canAutoYield()) _yieldPolicy->forceYield(); } else if (PlanStage::NEED_TIME == code) { // Fall through to yield check at end of large conditional. @@ -513,23 +526,29 @@ PlanExecutor::ExecState PlanExecutor::getNextImpl(Snapshotted<BSONObj>* objOut, bool PlanExecutor::isEOF() { invariant(_currentState == kUsable); - return killed() || (_stash.empty() && _root->isEOF()); + return isMarkedAsKilled() || (_stash.empty() && _root->isEOF()); } -void PlanExecutor::registerExec(const Collection* collection) { - // There's no need to register a PlanExecutor for which the underlying collection - // doesn't exist. - if (collection) { - _safety.reset(new ScopedExecutorRegistration(this, collection)); - } +void PlanExecutor::markAsKilled(string reason) { + _killReason = std::move(reason); } -void PlanExecutor::deregisterExec() { - _safety.reset(); -} +void PlanExecutor::dispose(OperationContext* opCtx, CursorManager* cursorManager) { + if (_currentState == kDisposed) { + return; + } -void PlanExecutor::kill(string reason) { - _killReason = std::move(reason); + // If we are registered with the CursorManager we need to be sure to deregister ourselves. + // However, if we have been killed we should not attempt to deregister ourselves, since the + // caller of markAsKilled() will have done that already, and the CursorManager may no longer + // exist. Note that the caller's collection lock prevents us from being marked as killed during + // this method, since any interruption event requires a lock in at least MODE_IX. + if (cursorManager && _registered && !isMarkedAsKilled()) { + dassert(opCtx->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_IS)); + cursorManager->deregisterExecutor(this); + } + _root->dispose(opCtx); + _currentState = kDisposed; } Status PlanExecutor::executePlan() { @@ -541,7 +560,7 @@ Status PlanExecutor::executePlan() { } if (PlanExecutor::DEAD == state || PlanExecutor::FAILURE == state) { - if (killed()) { + if (isMarkedAsKilled()) { return Status(ErrorCodes::QueryPlanKilled, str::stream() << "Operation aborted because: " << *_killReason); } @@ -552,56 +571,33 @@ Status PlanExecutor::executePlan() { << PlanExecutor::statestr(state)); } - invariant(!killed()); + invariant(!isMarkedAsKilled()); invariant(PlanExecutor::IS_EOF == state); return Status::OK(); } -void PlanExecutor::setYieldPolicy(YieldPolicy policy, - const Collection* collection, - bool registerExecutor) { - if (!collection) { - // If the collection doesn't exist, then there's no need to yield at all. - invariant(!_yieldPolicy->allowedToYield()); - return; - } - - _yieldPolicy->setPolicy(policy); - if (PlanExecutor::YIELD_AUTO == policy) { - // Runners that yield automatically generally need to be registered so that - // after yielding, they receive notifications of events like deletions and - // index drops. The only exception is that a few PlanExecutors get registered - // by ClientCursor instead of being registered here. This is unneeded if we only do - // partial "yields" for WriteConflict retrying. - if (registerExecutor) { - this->registerExec(collection); - } - } -} void PlanExecutor::enqueue(const BSONObj& obj) { _stash.push(obj.getOwned()); } // -// ScopedExecutorRegistration +// PlanExecutor::Deleter // -// PlanExecutor::ScopedExecutorRegistration -PlanExecutor::ScopedExecutorRegistration::ScopedExecutorRegistration(PlanExecutor* exec, - const Collection* collection) - : _exec(exec), _collection(collection) { - invariant(_collection); - _collection->getCursorManager()->registerExecutor(_exec); -} +PlanExecutor::Deleter::Deleter(OperationContext* opCtx, const Collection* collection) + : _opCtx(opCtx), _cursorManager(collection ? collection->getCursorManager() : nullptr) {} -PlanExecutor::ScopedExecutorRegistration::~ScopedExecutorRegistration() { - if (_exec->killed()) { - // If the plan executor has been killed, then it's possible that the collection - // no longer exists. - return; +void PlanExecutor::Deleter::operator()(PlanExecutor* execPtr) { + try { + invariant(_opCtx); // It is illegal to invoke operator() on a default constructed Deleter. + if (!_dismissed) { + execPtr->dispose(_opCtx, _cursorManager); + } + delete execPtr; + } catch (...) { + std::terminate(); } - _collection->getCursorManager()->deregisterExecutor(_exec); } } // namespace mongo diff --git a/src/mongo/db/query/plan_executor.h b/src/mongo/db/query/plan_executor.h index 325d7642a91..4e9a39b426c 100644 --- a/src/mongo/db/query/plan_executor.h +++ b/src/mongo/db/query/plan_executor.h @@ -40,11 +40,12 @@ namespace mongo { class BSONObj; class Collection; -class RecordId; -class PlanStage; +class CursorManager; class PlanExecutor; -struct PlanStageStats; +class PlanStage; class PlanYieldPolicy; +class RecordId; +struct PlanStageStats; class WorkingSet; /** @@ -80,8 +81,8 @@ public: }; /** - * The yielding policy of the plan executor. By default, an executor does not yield itself - * (YIELD_MANUAL). + * The yielding policy of the plan executor. By default, an executor does not yield itself + * (NO_YIELD). */ enum YieldPolicy { // Any call to getNext() may yield. In particular, the executor may be killed during any @@ -91,37 +92,63 @@ public: // This will handle WriteConflictExceptions that occur while processing the query, but // will not yield locks. abandonSnapshot() will be called if a WriteConflictException - // occurs so callers must be prepared to get a new snapshot. + // occurs so callers must be prepared to get a new snapshot. A PlanExecutor constructed with + // this yield policy will not be registered to receive invalidations, so the caller must + // hold their locks continuously from construction to destruction. WRITE_CONFLICT_RETRY_ONLY, - // Owner must yield manually if yields are requested. How to yield yourself: - // - // 0. Let's say you have PlanExecutor* exec. - // - // 1. Register your PlanExecutor with ClientCursor. Registered executors are informed - // about RecordId deletions and namespace invalidation, as well as other important - // events. Do this by calling registerExec() on the executor. Alternatively, this can - // be done per-yield (as described below). - // - // 2. Construct a PlanYieldPolicy 'policy', passing 'exec' to the constructor. - // - // 3. Call PlanYieldPolicy::yield() on 'policy'. If your PlanExecutor is not yet - // registered (because you want to register on a per-yield basis), then pass - // 'true' to yield(). - // - // 4. The call to yield() returns a boolean indicating whether or not 'exec' is - // still alove. If it is false, then 'exec' was killed during the yield and is - // no longer valid. - // - // It is not possible to handle WriteConflictExceptions in this mode without restarting - // the query. + // Use this policy if you want to disable auto-yielding, but will release locks while using + // the PlanExecutor. Any WriteConflictExceptions will be raised to the caller of getNext(). YIELD_MANUAL, + + // Can be used in one of the following scenarios: + // - The caller will hold a lock continuously for the lifetime of this PlanExecutor. + // - This PlanExecutor doesn't logically belong to a Collection, and so does not need to be + // locked during execution. For example, a PlanExecutor containing a PipelineProxyStage + // which is being used to execute an aggregation pipeline. + NO_YIELD, + }; + + /** + * This class will ensure a PlanExecutor is disposed before it is deleted. + */ + class Deleter { + public: + /** + * Constructs an empty deleter. Useful for creating a + * unique_ptr<PlanExecutor, PlanExecutor::Deleter> without populating it. + */ + Deleter() {} + + Deleter(OperationContext* opCtx, const Collection* collection); + + /** + * If an owner of a std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> wants to assume + * responsibility for calling PlanExecutor::dispose(), they can call dismissDisposal(). If + * dismissed, a Deleter will not call dispose() when deleting the PlanExecutor. + */ + void dismissDisposal() { + _dismissed = true; + } + + /** + * If 'execPtr' hasn't already been disposed, will call dispose(). Also, if 'execPtr' has + * been registered with the CursorManager, will deregister it. If 'execPtr' is a yielding + * PlanExecutor, callers must hold a lock on the collection in at least MODE_IS. + */ + void operator()(PlanExecutor* execPtr); + + private: + OperationContext* _opCtx = nullptr; + CursorManager* _cursorManager = nullptr; + + bool _dismissed = false; }; // // Factory methods. // - // On success, return a new PlanExecutor, owned by the caller, through 'out'. + // On success, return a new PlanExecutor, owned by the caller. // // Passing YIELD_AUTO to any of these factories will construct a yielding executor which // may yield in the following circumstances: @@ -129,57 +156,58 @@ public: // 2) On any call to getNext(). // 3) While executing the plan inside executePlan(). // - // The executor will also be automatically registered to receive notifications in the - // case of YIELD_AUTO, so no further calls to registerExec() or setYieldPolicy() are - // necessary. + // The executor will also be automatically registered to receive notifications in the case of + // YIELD_AUTO or YIELD_MANUAL. // /** * Used when there is no canonical query and no query solution. * - * Right now this is only for idhack updates which neither canonicalize - * nor go through normal planning. + * Right now this is only for idhack updates which neither canonicalize nor go through normal + * planning. */ - static StatusWith<std::unique_ptr<PlanExecutor>> make(OperationContext* opCtx, - std::unique_ptr<WorkingSet> ws, - std::unique_ptr<PlanStage> rt, - const Collection* collection, - YieldPolicy yieldPolicy); + static StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> make( + OperationContext* opCtx, + std::unique_ptr<WorkingSet> ws, + std::unique_ptr<PlanStage> rt, + const Collection* collection, + YieldPolicy yieldPolicy); /** - * Used when we have a NULL collection and no canonical query. In this case, - * we need to explicitly pass a namespace to the plan executor. + * Used when we have a NULL collection and no canonical query. In this case, we need to + * explicitly pass a namespace to the plan executor. */ - static StatusWith<std::unique_ptr<PlanExecutor>> make(OperationContext* opCtx, - std::unique_ptr<WorkingSet> ws, - std::unique_ptr<PlanStage> rt, - NamespaceString nss, - YieldPolicy yieldPolicy); + static StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> make( + OperationContext* opCtx, + std::unique_ptr<WorkingSet> ws, + std::unique_ptr<PlanStage> rt, + NamespaceString nss, + YieldPolicy yieldPolicy); /** - * Used when there is a canonical query but no query solution (e.g. idhack - * queries, queries against a NULL collection, queries using the subplan stage). + * Used when there is a canonical query but no query solution (e.g. idhack queries, queries + * against a NULL collection, queries using the subplan stage). */ - static StatusWith<std::unique_ptr<PlanExecutor>> make(OperationContext* opCtx, - std::unique_ptr<WorkingSet> ws, - std::unique_ptr<PlanStage> rt, - std::unique_ptr<CanonicalQuery> cq, - const Collection* collection, - YieldPolicy yieldPolicy); + static StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> make( + OperationContext* opCtx, + std::unique_ptr<WorkingSet> ws, + std::unique_ptr<PlanStage> rt, + std::unique_ptr<CanonicalQuery> cq, + const Collection* collection, + YieldPolicy yieldPolicy); /** - * The constructor for the normal case, when you have a collection, a canonical query, - * and a query solution. + * The constructor for the normal case, when you have a collection, a canonical query, and a + * query solution. */ - static StatusWith<std::unique_ptr<PlanExecutor>> make(OperationContext* opCtx, - std::unique_ptr<WorkingSet> ws, - std::unique_ptr<PlanStage> rt, - std::unique_ptr<QuerySolution> qs, - std::unique_ptr<CanonicalQuery> cq, - const Collection* collection, - YieldPolicy yieldPolicy); - - ~PlanExecutor(); + static StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> make( + OperationContext* opCtx, + std::unique_ptr<WorkingSet> ws, + std::unique_ptr<PlanStage> rt, + std::unique_ptr<QuerySolution> qs, + std::unique_ptr<CanonicalQuery> cq, + const Collection* collection, + YieldPolicy yieldPolicy); // // Accessors @@ -312,26 +340,27 @@ public: // /** - * Register this plan executor with the collection cursor manager so that it - * receives notifications for events that happen while yielding any locks. - * - * Deregistration happens automatically when this plan executor is destroyed. - */ - void registerExec(const Collection* collection); - - /** - * Unregister this PlanExecutor. Normally you want the PlanExecutor to be registered - * for its lifetime, and you shouldn't have to call this explicitly. + * If we're yielding locks, the database we're operating over or any collection we're relying on + * may be dropped. Plan executors are notified of such events by calling markAsKilled(). + * Callers must specify the 'reason' for why this executor is being killed. Subsequent calls to + * getNext() will return DEAD, and fill 'objOut' with an error detail including 'reason'. */ - void deregisterExec(); + void markAsKilled(std::string reason); /** - * If we're yielding locks, the database we're operating over or any collection we're - * relying on may be dropped. When this happens all cursors and plan executors on that - * database and collection are killed or deleted in some fashion. Callers must specify - * the 'reason' for why this executor is being killed. + * Cleans up any state associated with this PlanExecutor. Must be called before deleting this + * PlanExecutor. It is illegal to use a PlanExecutor after calling dispose(). 'cursorManager' + * may be null. + * + * There are multiple cleanup scenarios: + * - This PlanExecutor will only ever use one OperationContext. In this case the + * PlanExecutor::Deleter will automatically call dispose() before deleting the PlanExecutor, + * and the owner need not call dispose(). + * - This PlanExecutor may use multiple OperationContexts over its lifetime. In this case it + * is the owner's responsibility to call dispose() with a valid OperationContext before + * deleting the PlanExecutor. */ - void kill(std::string reason); + void dispose(OperationContext* opCtx, CursorManager* cursorManager); /** * If we're yielding locks, writes may occur to documents that we rely on to keep valid @@ -346,20 +375,6 @@ public: static std::string statestr(ExecState s); /** - * Change the yield policy of the PlanExecutor to 'policy'. If 'registerExecutor' is true, - * and the yield policy is YIELD_AUTO, then the plan executor gets registered to receive - * notifications of events from other threads. - * - * Everybody who sets the policy to YIELD_AUTO really wants to call registerExec() - * immediately after EXCEPT commands that create cursors...so we expose the ability to - * register (or not) here, rather than require all users to have yet another RAII object. - * Only cursor-creating things like find.cpp set registerExecutor to false. - */ - void setYieldPolicy(YieldPolicy policy, - const Collection* collection, - bool registerExecutor = true); - - /** * Stash the BSONObj so that it gets returned from the PlanExecutor on a later call to * getNext(). * @@ -379,28 +394,26 @@ public: */ BSONObjSet getOutputSorts() const; -private: - ExecState getNextImpl(Snapshotted<BSONObj>* objOut, RecordId* dlOut); - /** - * RAII approach to ensuring that plan executors are deregistered. - * - * While retrieving the first batch of results, runQuery manually registers the executor - * with ClientCursor. Certain query execution paths, namely $where, can throw an exception. - * If we fail to deregister the executor, we will call invalidate/kill on the - * still-registered-yet-deleted executor. - * - * For any subsequent calls to getMore, the executor is already registered with ClientCursor - * by virtue of being cached, so this exception-proofing is not required. + * Communicate to this PlanExecutor that it is no longer registered with the CursorManager as a + * 'non-cached PlanExecutor'. */ - struct ScopedExecutorRegistration { - ScopedExecutorRegistration(PlanExecutor* exec, const Collection* collection); - ~ScopedExecutorRegistration(); + void unsetRegistered() { + _registered = false; + } - PlanExecutor* const _exec; - const Collection* const _collection; + bool isMarkedAsKilled() { + return static_cast<bool>(_killReason); }; + const std::string& getKillReason() { + invariant(isMarkedAsKilled()); + return *_killReason; + } + +private: + ExecState getNextImpl(Snapshotted<BSONObj>* objOut, RecordId* dlOut); + /** * New PlanExecutor instances are created with the static make() methods above. */ @@ -410,19 +423,27 @@ private: std::unique_ptr<QuerySolution> qs, std::unique_ptr<CanonicalQuery> cq, const Collection* collection, - NamespaceString nss); + NamespaceString nss, + YieldPolicy yieldPolicy); + + /** + * A PlanExecutor must be disposed before destruction. In most cases, this will happen + * automatically through a PlanExecutor::Deleter or a ClientCursor. + */ + ~PlanExecutor(); /** * Public factory methods delegate to this private factory to do their work. */ - static StatusWith<std::unique_ptr<PlanExecutor>> make(OperationContext* opCtx, - std::unique_ptr<WorkingSet> ws, - std::unique_ptr<PlanStage> rt, - std::unique_ptr<QuerySolution> qs, - std::unique_ptr<CanonicalQuery> cq, - const Collection* collection, - NamespaceString nss, - YieldPolicy yieldPolicy); + static StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> make( + OperationContext* opCtx, + std::unique_ptr<WorkingSet> ws, + std::unique_ptr<PlanStage> rt, + std::unique_ptr<QuerySolution> qs, + std::unique_ptr<CanonicalQuery> cq, + const Collection* collection, + NamespaceString nss, + YieldPolicy yieldPolicy); /** * Clients of PlanExecutor expect that on receiving a new instance from one of the make() @@ -439,14 +460,10 @@ private: * ErrorCodes::QueryPlanKilled if plan execution cannot proceed due to a concurrent write or * catalog operation. */ - Status pickBestPlan(YieldPolicy policy, const Collection* collection); + Status pickBestPlan(const Collection* collection); - bool killed() { - return static_cast<bool>(_killReason); - }; - - // The OperationContext that we're executing within. We need this in order to release - // locks. + // The OperationContext that we're executing within. This can be updated if necessary by using + // detachFromOperationContext() and reattachToOperationContext(). OperationContext* _opCtx; std::unique_ptr<CanonicalQuery> _cq; @@ -454,15 +471,10 @@ private: std::unique_ptr<QuerySolution> _qs; std::unique_ptr<PlanStage> _root; - // If _killReason has a value, then we have been killed and the value represents the reason - // for the kill. - // The ScopedExecutorRegistration skips dereigstering the plan executor when the plan executor - // has been killed, so _killReason must outlive _safety. + // If _killReason has a value, then we have been killed and the value represents the reason for + // the kill. boost::optional<std::string> _killReason; - // Deregisters this executor when it is destroyed. - std::unique_ptr<ScopedExecutorRegistration> _safety; - // What namespace are we operating over? NamespaceString _nss; @@ -476,7 +488,11 @@ private: // stages. std::queue<BSONObj> _stash; - enum { kUsable, kSaved, kDetached } _currentState = kUsable; + enum { kUsable, kSaved, kDetached, kDisposed } _currentState = kUsable; + + // Set to true if this PlanExecutor is registered with the CursorManager as a 'non-cached + // PlanExecutor' to receive invalidations. + bool _registered = false; bool _everDetachedFromOperationContext = false; }; diff --git a/src/mongo/db/query/plan_yield_policy.cpp b/src/mongo/db/query/plan_yield_policy.cpp index f315daec285..b7a8f0225ea 100644 --- a/src/mongo/db/query/plan_yield_policy.cpp +++ b/src/mongo/db/query/plan_yield_policy.cpp @@ -59,7 +59,7 @@ PlanYieldPolicy::PlanYieldPolicy(PlanExecutor::YieldPolicy policy, ClockSource* _planYielding(nullptr) {} bool PlanYieldPolicy::shouldYield() { - if (!allowedToYield()) + if (!canAutoYield()) return false; invariant(!_planYielding->getOpCtx()->lockState()->inAWriteUnitOfWork()); if (_forceYield) @@ -73,7 +73,7 @@ void PlanYieldPolicy::resetTimer() { bool PlanYieldPolicy::yield(RecordFetcher* fetcher) { invariant(_planYielding); - invariant(allowedToYield()); + invariant(canAutoYield()); // After we finish yielding (or in any early return), call resetTimer() to prevent yielding // again right away. We delay the resetTimer() call so that the clock doesn't start ticking diff --git a/src/mongo/db/query/plan_yield_policy.h b/src/mongo/db/query/plan_yield_policy.h index ec6dca7a0f2..892fbafe350 100644 --- a/src/mongo/db/query/plan_yield_policy.h +++ b/src/mongo/db/query/plan_yield_policy.h @@ -39,12 +39,8 @@ class RecordFetcher; class PlanYieldPolicy { public: - /** - * If policy == WRITE_CONFLICT_RETRY_ONLY, shouldYield will only return true after - * forceYield has been called, and yield will only abandonSnapshot without releasing any - * locks. - */ PlanYieldPolicy(PlanExecutor* exec, PlanExecutor::YieldPolicy policy); + /** * Only used in dbtests since we don't have access to a PlanExecutor. Since we don't have * access to the PlanExecutor to grab a ClockSource from, we pass in a ClockSource directly @@ -65,8 +61,8 @@ public: void resetTimer(); /** - * Used to cause a plan executor to give up locks and go to sleep. The PlanExecutor - * must *not* be in saved state. Handles calls to save/restore state internally. + * Used to cause a plan executor to release locks or storage engine state. The PlanExecutor must + * *not* be in saved state. Handles calls to save/restore state internally. * * If 'fetcher' is non-NULL, then we are yielding because the storage engine told us * that we will page fault on this record. We use 'fetcher' to retrieve the record @@ -78,23 +74,37 @@ public: bool yield(RecordFetcher* fetcher = NULL); /** - * All calls to shouldYield will return true until the next call to yield. + * All calls to shouldYield() will return true until the next call to yield. */ void forceYield() { - dassert(allowedToYield()); + dassert(canAutoYield()); _forceYield = true; } - bool allowedToYield() const { - return _policy != PlanExecutor::YIELD_MANUAL; + /** + * Returns true if there is a possibility that a collection lock will be yielded at some point + * during this PlanExecutor's lifetime. + */ + bool canReleaseLocksDuringExecution() const { + return _policy == PlanExecutor::YIELD_AUTO || _policy == PlanExecutor::YIELD_MANUAL; + } + + /** + * Returns true if this yield policy performs automatic yielding. Note 'yielding' here refers to + * either releasing storage engine resources via abandonSnapshot() OR yielding LockManager + * locks. + */ + bool canAutoYield() const { + return _policy == PlanExecutor::YIELD_AUTO || + _policy == PlanExecutor::WRITE_CONFLICT_RETRY_ONLY; } - void setPolicy(PlanExecutor::YieldPolicy policy) { - _policy = policy; + PlanExecutor::YieldPolicy getPolicy() const { + return _policy; } private: - PlanExecutor::YieldPolicy _policy; + const PlanExecutor::YieldPolicy _policy; bool _forceYield; ElapsedTracker _elapsedTracker; diff --git a/src/mongo/db/query/query_yield.cpp b/src/mongo/db/query/query_yield.cpp index 37ecc4bf528..6e1de753010 100644 --- a/src/mongo/db/query/query_yield.cpp +++ b/src/mongo/db/query/query_yield.cpp @@ -72,6 +72,9 @@ void QueryYield::yieldAllLocks(OperationContext* opCtx, // locks). If we are yielding, we are at a safe place to do so. opCtx->recoveryUnit()->abandonSnapshot(); + // Track the number of yields in CurOp. + CurOp::get(opCtx)->yielded(); + MONGO_FAIL_POINT_PAUSE_WHILE_SET(setYieldAllLocksHang); MONGO_FAIL_POINT_BLOCK(setYieldAllLocksWait, customWait) { @@ -82,9 +85,6 @@ void QueryYield::yieldAllLocks(OperationContext* opCtx, } } - // Track the number of yields in CurOp. - CurOp::get(opCtx)->yielded(); - if (fetcher) { fetcher->fetch(); } diff --git a/src/mongo/db/range_preserver.h b/src/mongo/db/range_preserver.h index 35b7694234e..2a893c61b87 100644 --- a/src/mongo/db/range_preserver.h +++ b/src/mongo/db/range_preserver.h @@ -46,11 +46,12 @@ public: * object does. The ClientCursorPin guarantees that the underlying ClientCursor is not deleted * until this object goes out of scope. */ - RangePreserver(const Collection* collection) { + RangePreserver(OperationContext* opCtx, const Collection* collection) { // Empty collections don't have any data we need to preserve if (collection) { // Pin keeps the CC from being deleted while it's in scope. We delete it ourselves. - _pin.emplace(collection->getCursorManager()->registerRangePreserverCursor(collection)); + _pin.emplace( + collection->getCursorManager()->registerRangePreserverCursor(opCtx, collection)); } } diff --git a/src/mongo/db/repair_database.cpp b/src/mongo/db/repair_database.cpp index 5755d9a69f2..615604ba8dc 100644 --- a/src/mongo/db/repair_database.cpp +++ b/src/mongo/db/repair_database.cpp @@ -242,7 +242,7 @@ Status repairDatabase(OperationContext* opCtx, } // Close the db to invalidate all current users and caches. - dbHolder().close(opCtx, dbName); + dbHolder().close(opCtx, dbName, "database closed for repair"); ON_BLOCK_EXIT([&dbName, &opCtx] { try { // Open the db after everything finishes. diff --git a/src/mongo/db/repl/master_slave.cpp b/src/mongo/db/repl/master_slave.cpp index 1c68848814b..1e1ec17962c 100644 --- a/src/mongo/db/repl/master_slave.cpp +++ b/src/mongo/db/repl/master_slave.cpp @@ -274,11 +274,10 @@ void ReplSource::loadAll(OperationContext* opCtx, SourceVector& v) { // check that no items are in sources other than that // add if missing int n = 0; - unique_ptr<PlanExecutor> exec( - InternalPlanner::collectionScan(opCtx, - localSources, - ctx.db()->getCollection(opCtx, localSources), - PlanExecutor::YIELD_MANUAL)); + auto exec = InternalPlanner::collectionScan(opCtx, + localSources, + ctx.db()->getCollection(opCtx, localSources), + PlanExecutor::NO_YIELD); BSONObj obj; PlanExecutor::ExecState state; while (PlanExecutor::ADVANCED == (state = exec->getNext(&obj, NULL))) { @@ -318,11 +317,8 @@ void ReplSource::loadAll(OperationContext* opCtx, SourceVector& v) { } } - unique_ptr<PlanExecutor> exec( - InternalPlanner::collectionScan(opCtx, - localSources, - ctx.db()->getCollection(opCtx, localSources), - PlanExecutor::YIELD_MANUAL)); + auto exec = InternalPlanner::collectionScan( + opCtx, localSources, ctx.db()->getCollection(opCtx, localSources), PlanExecutor::NO_YIELD); BSONObj obj; PlanExecutor::ExecState state; while (PlanExecutor::ADVANCED == (state = exec->getNext(&obj, NULL))) { diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp index dcc7875fc88..04517ae91a6 100644 --- a/src/mongo/db/repl/oplog.cpp +++ b/src/mongo/db/repl/oplog.cpp @@ -974,8 +974,7 @@ Status applyOperation_inlock(OperationContext* opCtx, o.hasField("_id")); if (opType[1] == 0) { - deleteObjects( - opCtx, collection, requestNss, o, PlanExecutor::YIELD_MANUAL, /*justOne*/ valueB); + deleteObjects(opCtx, collection, requestNss, o, /*justOne*/ valueB); } else verify(opType[1] == 'b'); // "db" advertisement if (incrementOpsAppliedStats) { diff --git a/src/mongo/db/repl/oplog_interface_local.cpp b/src/mongo/db/repl/oplog_interface_local.cpp index b878b2c583f..3ab6e6413c3 100644 --- a/src/mongo/db/repl/oplog_interface_local.cpp +++ b/src/mongo/db/repl/oplog_interface_local.cpp @@ -51,7 +51,7 @@ private: Lock::DBLock _dbLock; Lock::CollectionLock _collectionLock; OldClientContext _ctx; - std::unique_ptr<PlanExecutor> _exec; + std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> _exec; }; OplogIteratorLocal::OplogIteratorLocal(OperationContext* opCtx, const std::string& collectionName) @@ -61,7 +61,7 @@ OplogIteratorLocal::OplogIteratorLocal(OperationContext* opCtx, const std::strin _exec(InternalPlanner::collectionScan(opCtx, collectionName, _ctx.db()->getCollection(opCtx, collectionName), - PlanExecutor::YIELD_MANUAL, + PlanExecutor::NO_YIELD, InternalPlanner::BACKWARD)) {} StatusWith<OplogInterface::Iterator::Value> OplogIteratorLocal::next() { diff --git a/src/mongo/db/repl/replication_info.cpp b/src/mongo/db/repl/replication_info.cpp index 84eed6ba699..18ad4ab2d11 100644 --- a/src/mongo/db/repl/replication_info.cpp +++ b/src/mongo/db/repl/replication_info.cpp @@ -96,8 +96,8 @@ void appendReplicationInfo(OperationContext* opCtx, BSONObjBuilder& result, int { const NamespaceString localSources{"local.sources"}; AutoGetCollectionForReadCommand ctx(opCtx, localSources); - unique_ptr<PlanExecutor> exec(InternalPlanner::collectionScan( - opCtx, localSources.ns(), ctx.getCollection(), PlanExecutor::YIELD_MANUAL)); + auto exec = InternalPlanner::collectionScan( + opCtx, localSources.ns(), ctx.getCollection(), PlanExecutor::NO_YIELD); BSONObj obj; PlanExecutor::ExecState state; while (PlanExecutor::ADVANCED == (state = exec->getNext(&obj, NULL))) { diff --git a/src/mongo/db/repl/rs_rollback.cpp b/src/mongo/db/repl/rs_rollback.cpp index 9d6c85faad7..54e233728f1 100644 --- a/src/mongo/db/repl/rs_rollback.cpp +++ b/src/mongo/db/repl/rs_rollback.cpp @@ -539,8 +539,8 @@ void syncFixUp(OperationContext* opCtx, Helpers::RemoveSaver removeSaver("rollback", "", *it); // perform a collection scan and write all documents in the collection to disk - std::unique_ptr<PlanExecutor> exec(InternalPlanner::collectionScan( - opCtx, *it, db->getCollection(opCtx, *it), PlanExecutor::YIELD_AUTO)); + auto exec = InternalPlanner::collectionScan( + opCtx, *it, db->getCollection(opCtx, *it), PlanExecutor::YIELD_AUTO); BSONObj curObj; PlanExecutor::ExecState execState; while (PlanExecutor::ADVANCED == (execState = exec->getNext(&curObj, NULL))) { @@ -720,7 +720,6 @@ void syncFixUp(OperationContext* opCtx, collection, docNss, pattern, - PlanExecutor::YIELD_MANUAL, true, // justone true); // god } diff --git a/src/mongo/db/repl/storage_interface_impl.cpp b/src/mongo/db/repl/storage_interface_impl.cpp index 858c49027ae..5700f0a4fcd 100644 --- a/src/mongo/db/repl/storage_interface_impl.cpp +++ b/src/mongo/db/repl/storage_interface_impl.cpp @@ -495,7 +495,7 @@ StatusWith<std::vector<BSONObj>> _findOrDeleteDocuments( auto isForward = scanDirection == StorageInterface::ScanDirection::kForward; auto direction = isForward ? InternalPlanner::FORWARD : InternalPlanner::BACKWARD; - std::unique_ptr<PlanExecutor> planExecutor; + std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> planExecutor; if (!indexName) { if (!startKey.isEmpty()) { return {ErrorCodes::NoSuchKey, @@ -509,12 +509,12 @@ StatusWith<std::vector<BSONObj>> _findOrDeleteDocuments( // Use collection scan. planExecutor = isFind ? InternalPlanner::collectionScan( - opCtx, nss.ns(), collection, PlanExecutor::YIELD_MANUAL, direction) + opCtx, nss.ns(), collection, PlanExecutor::NO_YIELD, direction) : InternalPlanner::deleteWithCollectionScan( opCtx, collection, makeDeleteStageParamsForDeleteDocuments(), - PlanExecutor::YIELD_MANUAL, + PlanExecutor::NO_YIELD, direction); } else { // Use index scan. @@ -551,7 +551,7 @@ StatusWith<std::vector<BSONObj>> _findOrDeleteDocuments( bounds.first, bounds.second, boundInclusion, - PlanExecutor::YIELD_MANUAL, + PlanExecutor::NO_YIELD, direction, InternalPlanner::IXSCAN_FETCH) : InternalPlanner::deleteWithIndexScan(opCtx, @@ -561,7 +561,7 @@ StatusWith<std::vector<BSONObj>> _findOrDeleteDocuments( bounds.first, bounds.second, boundInclusion, - PlanExecutor::YIELD_MANUAL, + PlanExecutor::NO_YIELD, direction); } @@ -659,7 +659,7 @@ Status StorageInterfaceImpl::upsertById(OperationContext* opCtx, request.setUpsert(true); invariant(!request.isMulti()); // This follows from using an exact _id query. invariant(!request.shouldReturnAnyDocs()); - invariant(PlanExecutor::YIELD_MANUAL == request.getYieldPolicy()); + invariant(PlanExecutor::NO_YIELD == request.getYieldPolicy()); MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { // ParsedUpdate needs to be inside the write conflict retry loop because it contains diff --git a/src/mongo/db/repl/sync_tail_test.cpp b/src/mongo/db/repl/sync_tail_test.cpp index 5ce737f5d11..b9853a776e5 100644 --- a/src/mongo/db/repl/sync_tail_test.cpp +++ b/src/mongo/db/repl/sync_tail_test.cpp @@ -1070,7 +1070,7 @@ std::string IdempotencyTest::validate() { BSONObj(), BSONObj(), BoundInclusion::kIncludeStartKeyOnly, - PlanExecutor::YIELD_MANUAL, + PlanExecutor::NO_YIELD, InternalPlanner::FORWARD, InternalPlanner::IXSCAN_FETCH); ASSERT(NULL != exec.get()); diff --git a/src/mongo/db/s/check_sharding_index_command.cpp b/src/mongo/db/s/check_sharding_index_command.cpp index db6b727aa2f..61a126928ce 100644 --- a/src/mongo/db/s/check_sharding_index_command.cpp +++ b/src/mongo/db/s/check_sharding_index_command.cpp @@ -134,16 +134,14 @@ public: max = Helpers::toKeyFormat(kp.extendRangeBound(max, false)); } - unique_ptr<PlanExecutor> exec( - InternalPlanner::indexScan(opCtx, - collection, - idx, - min, - max, - BoundInclusion::kIncludeStartKeyOnly, - PlanExecutor::YIELD_MANUAL, - InternalPlanner::FORWARD)); - exec->setYieldPolicy(PlanExecutor::YIELD_AUTO, collection); + auto exec = InternalPlanner::indexScan(opCtx, + collection, + idx, + min, + max, + BoundInclusion::kIncludeStartKeyOnly, + PlanExecutor::YIELD_AUTO, + InternalPlanner::FORWARD); // Find the 'missingField' value used to represent a missing document field in a key of // this index. diff --git a/src/mongo/db/s/collection_range_deleter.cpp b/src/mongo/db/s/collection_range_deleter.cpp index be37e510a9d..0a267cd263d 100644 --- a/src/mongo/db/s/collection_range_deleter.cpp +++ b/src/mongo/db/s/collection_range_deleter.cpp @@ -181,7 +181,7 @@ int CollectionRangeDeleter::_doDeletion(OperationContext* opCtx, min, max, BoundInclusion::kIncludeStartKeyOnly, - PlanExecutor::YIELD_MANUAL, + PlanExecutor::NO_YIELD, InternalPlanner::FORWARD, InternalPlanner::IXSCAN_FETCH); RecordId rloc; diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp index ff635fb59ff..46bfc059de2 100644 --- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp +++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp @@ -436,7 +436,10 @@ Status MigrationChunkClonerSourceLegacy::nextCloneBatch(OperationContext* opCtx, // If we have drained all the cloned data, there is no need to keep the delete notify executor // around - if (_cloneLocs.empty()) { + if (_cloneLocs.empty() && _deleteNotifyExec) { + // We have a different OperationContext than when we created the PlanExecutor, so need to + // manually destroy it ourselves. + _deleteNotifyExec->dispose(opCtx, collection->getCursorManager()); _deleteNotifyExec.reset(); } @@ -473,7 +476,9 @@ void MigrationChunkClonerSourceLegacy::_cleanup(OperationContext* opCtx) { if (_deleteNotifyExec) { AutoGetCollection autoColl(opCtx, _args.getNss(), MODE_IS); - + const auto cursorManager = + autoColl.getCollection() ? autoColl.getCollection()->getCursorManager() : nullptr; + _deleteNotifyExec->dispose(opCtx, cursorManager); _deleteNotifyExec.reset(); } } @@ -542,7 +547,6 @@ Status MigrationChunkClonerSourceLegacy::_storeCurrentLocs(OperationContext* opC } _deleteNotifyExec = std::move(statusWithDeleteNotificationPlanExecutor.getValue()); - _deleteNotifyExec->registerExec(collection); // Assume both min and max non-empty, append MinKey's to make them fit chosen index const KeyPattern kp(idx->keyPattern()); @@ -550,18 +554,15 @@ Status MigrationChunkClonerSourceLegacy::_storeCurrentLocs(OperationContext* opC BSONObj min = Helpers::toKeyFormat(kp.extendRangeBound(_args.getMinKey(), false)); BSONObj max = Helpers::toKeyFormat(kp.extendRangeBound(_args.getMaxKey(), false)); - std::unique_ptr<PlanExecutor> exec( - InternalPlanner::indexScan(opCtx, - collection, - idx, - min, - max, - BoundInclusion::kIncludeStartKeyOnly, - PlanExecutor::YIELD_MANUAL)); - // We can afford to yield here because any change to the base data that we might miss is already // being queued and will migrate in the 'transferMods' stage. - exec->setYieldPolicy(PlanExecutor::YIELD_AUTO, collection); + auto exec = InternalPlanner::indexScan(opCtx, + collection, + idx, + min, + max, + BoundInclusion::kIncludeStartKeyOnly, + PlanExecutor::YIELD_AUTO); // Use the average object size to estimate how many objects a full chunk would carry do that // while traversing the chunk's range using the sharding index, below there's a fair amount of diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.h b/src/mongo/db/s/migration_chunk_cloner_source_legacy.h index 7f8b7bf5468..28eca3989c5 100644 --- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.h +++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.h @@ -34,6 +34,7 @@ #include "mongo/bson/bsonobj.h" #include "mongo/client/connection_string.h" #include "mongo/db/namespace_string.h" +#include "mongo/db/query/plan_executor.h" #include "mongo/db/s/migration_chunk_cloner_source.h" #include "mongo/db/s/migration_session_id.h" #include "mongo/s/move_chunk_request.h" @@ -48,7 +49,6 @@ class BSONArrayBuilder; class BSONObjBuilder; class Collection; class Database; -class PlanExecutor; class RecordId; class MigrationChunkClonerSourceLegacy final : public MigrationChunkClonerSource { @@ -181,7 +181,7 @@ private: // Registered deletion notifications plan executor, which will listen for document deletions // during the cloning stage - std::unique_ptr<PlanExecutor> _deleteNotifyExec; + std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> _deleteNotifyExec; // Protects the entries below stdx::mutex _mutex; diff --git a/src/mongo/db/s/migration_destination_manager.cpp b/src/mongo/db/s/migration_destination_manager.cpp index 0ef8981a337..14853dd8a35 100644 --- a/src/mongo/db/s/migration_destination_manager.cpp +++ b/src/mongo/db/s/migration_destination_manager.cpp @@ -922,7 +922,6 @@ bool MigrationDestinationManager::_applyMigrateOp(OperationContext* opCtx, ctx.db() ? ctx.db()->getCollection(opCtx, nss) : nullptr, nss, id, - PlanExecutor::YIELD_MANUAL, true /* justOne */, false /* god */, true /* fromMigrate */); diff --git a/src/mongo/db/s/split_chunk_command.cpp b/src/mongo/db/s/split_chunk_command.cpp index a2009bdf53b..01e712cf95b 100644 --- a/src/mongo/db/s/split_chunk_command.cpp +++ b/src/mongo/db/s/split_chunk_command.cpp @@ -73,13 +73,13 @@ bool checkIfSingleDoc(OperationContext* opCtx, BSONObj newmin = Helpers::toKeyFormat(kp.extendRangeBound(chunk->getMin(), false)); BSONObj newmax = Helpers::toKeyFormat(kp.extendRangeBound(chunk->getMax(), true)); - unique_ptr<PlanExecutor> exec(InternalPlanner::indexScan(opCtx, - collection, - idx, - newmin, - newmax, - BoundInclusion::kIncludeStartKeyOnly, - PlanExecutor::YIELD_MANUAL)); + auto exec = InternalPlanner::indexScan(opCtx, + collection, + idx, + newmin, + newmax, + BoundInclusion::kIncludeStartKeyOnly, + PlanExecutor::NO_YIELD); // check if exactly one document found PlanExecutor::ExecState state; BSONObj obj; diff --git a/src/mongo/db/s/split_vector_command.cpp b/src/mongo/db/s/split_vector_command.cpp index 71bbe04e7dc..d79dd271227 100644 --- a/src/mongo/db/s/split_vector_command.cpp +++ b/src/mongo/db/s/split_vector_command.cpp @@ -257,15 +257,14 @@ public: long long currCount = 0; long long numChunks = 0; - unique_ptr<PlanExecutor> exec( - InternalPlanner::indexScan(opCtx, - collection, - idx, - min, - max, - BoundInclusion::kIncludeStartKeyOnly, - PlanExecutor::YIELD_MANUAL, - InternalPlanner::FORWARD)); + auto exec = InternalPlanner::indexScan(opCtx, + collection, + idx, + min, + max, + BoundInclusion::kIncludeStartKeyOnly, + PlanExecutor::YIELD_AUTO, + InternalPlanner::FORWARD); BSONObj currKey; PlanExecutor::ExecState state = exec->getNext(&currKey, NULL); @@ -281,7 +280,6 @@ public: splitKeys.push_back(dotted_path_support::extractElementsBasedOnTemplate( prettyKey(idx->keyPattern(), currKey.getOwned()), keyPattern)); - exec->setYieldPolicy(PlanExecutor::YIELD_AUTO, collection); while (1) { while (PlanExecutor::ADVANCED == state) { currCount++; @@ -340,10 +338,9 @@ public: min, max, BoundInclusion::kIncludeStartKeyOnly, - PlanExecutor::YIELD_MANUAL, + PlanExecutor::YIELD_AUTO, InternalPlanner::FORWARD); - exec->setYieldPolicy(PlanExecutor::YIELD_AUTO, collection); state = exec->getNext(&currKey, NULL); } diff --git a/src/mongo/db/service_context_d_test_fixture.cpp b/src/mongo/db/service_context_d_test_fixture.cpp index b90d58fb72a..49efce677fb 100644 --- a/src/mongo/db/service_context_d_test_fixture.cpp +++ b/src/mongo/db/service_context_d_test_fixture.cpp @@ -105,7 +105,7 @@ void ServiceContextMongoDTest::_dropAllDBs(OperationContext* opCtx) { // allocates resources to track these empty databases. These resources not released by // dropAllDatabasesExceptLocal() will be leaked at exit unless we call DatabaseHolder::closeAll. BSONObjBuilder unused; - invariant(dbHolder().closeAll(opCtx, unused, false)); + invariant(dbHolder().closeAll(opCtx, unused, false, "all databases dropped")); } } // namespace mongo diff --git a/src/mongo/db/storage/mmap_v1/repair_database.cpp b/src/mongo/db/storage/mmap_v1/repair_database.cpp index 212a77abf4a..175afb53298 100644 --- a/src/mongo/db/storage/mmap_v1/repair_database.cpp +++ b/src/mongo/db/storage/mmap_v1/repair_database.cpp @@ -448,7 +448,7 @@ Status MMAPV1Engine::repairDatabase(OperationContext* opCtx, repairFileDeleter->success(); // Close the database so we can rename/delete the original data files - dbHolder().close(opCtx, dbName); + dbHolder().close(opCtx, dbName, "database closed for repair"); if (backupOriginalFiles) { _renameForBackup(dbName, reservedPath); diff --git a/src/mongo/db/ttl.cpp b/src/mongo/db/ttl.cpp index 446f73500fa..59599f109c7 100644 --- a/src/mongo/db/ttl.cpp +++ b/src/mongo/db/ttl.cpp @@ -247,7 +247,7 @@ private: params.isMulti = true; params.canonicalQuery = canonicalQuery.getValue().get(); - std::unique_ptr<PlanExecutor> exec = + auto exec = InternalPlanner::deleteWithIndexScan(opCtx, collection, params, |