/** * Copyright (C) 2015 MongoDB Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License, version 3, * as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . * * As a special exception, the copyright holders give permission to link the * code of portions of this program with the OpenSSL library under certain * conditions as described in each individual source file and distribute * linked combinations including the program with the OpenSSL library. You * must comply with the GNU Affero General Public License in all respects for * all of the code used other than as permitted herein. If you modify file(s) * with this exception, you may extend this exception to your version of the * file(s), but you are not obligated to do so. If you do not wish to do so, * delete this exception statement from your version. If you delete this * exception statement from all source files in the program, then also delete * it in the license file. */ #define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kQuery #include "mongo/platform/basic.h" #include "mongo/s/query/cluster_cursor_manager.h" #include #include "mongo/util/clock_source.h" #include "mongo/util/log.h" #include "mongo/util/mongoutils/str.h" namespace mongo { namespace { // // Helpers to construct a user-friendly error Status from a (nss, cursorId) pair. // Status cursorNotFoundStatus(const NamespaceString& nss, CursorId cursorId) { return {ErrorCodes::CursorNotFound, str::stream() << "Cursor not found (namespace: '" << nss.ns() << "', id: " << cursorId << ")."}; } Status cursorInUseStatus(const NamespaceString& nss, CursorId cursorId) { return {ErrorCodes::CursorInUse, str::stream() << "Cursor already in use (namespace: '" << nss.ns() << "', id: " << cursorId << ")."}; } // // CursorId is a 64-bit type, made up of a 32-bit prefix and a 32-bit suffix. The below helpers // convert between a CursorId and its prefix/suffix. // CursorId createCursorId(uint32_t prefix, uint32_t suffix) { return (static_cast(prefix) << 32) | suffix; } uint32_t extractPrefixFromCursorId(CursorId cursorId) { return static_cast(cursorId) >> 32; } } // namespace ClusterCursorManager::PinnedCursor::PinnedCursor(ClusterCursorManager* manager, std::unique_ptr cursor, const NamespaceString& nss, CursorId cursorId) : _manager(manager), _cursor(std::move(cursor)), _nss(nss), _cursorId(cursorId) { invariant(_manager); invariant(_cursor); invariant(_cursorId); // Zero is not a valid cursor id. } ClusterCursorManager::PinnedCursor::~PinnedCursor() { if (_cursor) { // The underlying cursor has not yet been returned. returnAndKillCursor(); } } ClusterCursorManager::PinnedCursor::PinnedCursor(PinnedCursor&& other) : _manager(std::move(other._manager)), _cursor(std::move(other._cursor)), _nss(std::move(other._nss)), _cursorId(std::move(other._cursorId)) {} ClusterCursorManager::PinnedCursor& ClusterCursorManager::PinnedCursor::operator=( ClusterCursorManager::PinnedCursor&& other) { if (_cursor) { // The underlying cursor has not yet been returned. returnAndKillCursor(); } _manager = std::move(other._manager); _cursor = std::move(other._cursor); _nss = std::move(other._nss); _cursorId = std::move(other._cursorId); return *this; } StatusWith ClusterCursorManager::PinnedCursor::next() { invariant(_cursor); return _cursor->next(); } bool ClusterCursorManager::PinnedCursor::isTailable() const { invariant(_cursor); return _cursor->isTailable(); } void ClusterCursorManager::PinnedCursor::returnCursor(CursorState cursorState) { invariant(_cursor); // Note that unpinning a cursor transfers ownership of the underlying ClusterClientCursor object // back to the manager. _manager->checkInCursor(std::move(_cursor), _nss, _cursorId, cursorState); *this = PinnedCursor(); } CursorId ClusterCursorManager::PinnedCursor::getCursorId() const { return _cursorId; } long long ClusterCursorManager::PinnedCursor::getNumReturnedSoFar() const { invariant(_cursor); return _cursor->getNumReturnedSoFar(); } void ClusterCursorManager::PinnedCursor::queueResult(const ClusterQueryResult& result) { invariant(_cursor); _cursor->queueResult(result); } bool ClusterCursorManager::PinnedCursor::remotesExhausted() { invariant(_cursor); return _cursor->remotesExhausted(); } Status ClusterCursorManager::PinnedCursor::setAwaitDataTimeout(Milliseconds awaitDataTimeout) { invariant(_cursor); return _cursor->setAwaitDataTimeout(awaitDataTimeout); } void ClusterCursorManager::PinnedCursor::setOperationContext(OperationContext* txn) { return _cursor->setOperationContext(txn); } void ClusterCursorManager::PinnedCursor::returnAndKillCursor() { invariant(_cursor); // Inform the manager that the cursor should be killed. invariantOK(_manager->killCursor(_nss, _cursorId)); // Return the cursor to the manager. It will be deleted on the next call to // ClusterCursorManager::reapZombieCursors(). // // The value of the argument to returnCursor() doesn't matter; the cursor will be kept as a // zombie. returnCursor(CursorState::NotExhausted); } ClusterCursorManager::ClusterCursorManager(ClockSource* clockSource) : _clockSource(clockSource), _pseudoRandom(std::unique_ptr(SecureRandom::create())->nextInt64()) { invariant(_clockSource); } ClusterCursorManager::~ClusterCursorManager() { invariant(_cursorIdPrefixToNamespaceMap.empty()); invariant(_namespaceToContainerMap.empty()); } void ClusterCursorManager::shutdown() { stdx::unique_lock lk(_mutex); _inShutdown = true; lk.unlock(); killAllCursors(); reapZombieCursors(); } StatusWith ClusterCursorManager::registerCursor( std::unique_ptr cursor, const NamespaceString& nss, CursorType cursorType, CursorLifetime cursorLifetime) { // Read the clock out of the lock. const auto now = _clockSource->now(); stdx::unique_lock lk(_mutex); if (_inShutdown) { lk.unlock(); cursor->kill(); return Status(ErrorCodes::ShutdownInProgress, "Cannot register new cursors as we are in the process of shutting down"); } invariant(cursor); // Find the CursorEntryContainer for this namespace. If none exists, create one. auto nsToContainerIt = _namespaceToContainerMap.find(nss); if (nsToContainerIt == _namespaceToContainerMap.end()) { uint32_t containerPrefix = 0; do { // The server has always generated positive values for CursorId (which is a signed // type), so we use std::abs() here on the prefix for consistency with this historical // behavior. containerPrefix = static_cast(std::abs(_pseudoRandom.nextInt32())); } while (_cursorIdPrefixToNamespaceMap.count(containerPrefix) > 0); _cursorIdPrefixToNamespaceMap[containerPrefix] = nss; auto emplaceResult = _namespaceToContainerMap.emplace(nss, CursorEntryContainer(containerPrefix)); invariant(emplaceResult.second); invariant(_namespaceToContainerMap.size() == _cursorIdPrefixToNamespaceMap.size()); nsToContainerIt = emplaceResult.first; } else { invariant(!nsToContainerIt->second.entryMap.empty()); // If exists, shouldn't be empty. } CursorEntryContainer& container = nsToContainerIt->second; // Generate a CursorId (which can't be the invalid value zero). CursorEntryMap& entryMap = container.entryMap; CursorId cursorId = 0; do { const uint32_t cursorSuffix = static_cast(_pseudoRandom.nextInt32()); cursorId = createCursorId(container.containerPrefix, cursorSuffix); } while (cursorId == 0 || entryMap.count(cursorId) > 0); // Create a new CursorEntry and register it in the CursorEntryContainer's map. auto emplaceResult = entryMap.emplace(cursorId, CursorEntry(std::move(cursor), cursorType, cursorLifetime, now)); invariant(emplaceResult.second); return cursorId; } StatusWith ClusterCursorManager::checkOutCursor( const NamespaceString& nss, CursorId cursorId, OperationContext* txn) { // Read the clock out of the lock. const auto now = _clockSource->now(); stdx::lock_guard lk(_mutex); if (_inShutdown) { return Status(ErrorCodes::ShutdownInProgress, "Cannot check out cursor as we are in the process of shutting down"); } CursorEntry* entry = getEntry_inlock(nss, cursorId); if (!entry) { return cursorNotFoundStatus(nss, cursorId); } if (entry->getKillPending()) { return cursorNotFoundStatus(nss, cursorId); } std::unique_ptr cursor = entry->releaseCursor(); if (!cursor) { return cursorInUseStatus(nss, cursorId); } entry->setLastActive(now); cursor->setOperationContext(txn); // Note that pinning a cursor transfers ownership of the underlying ClusterClientCursor object // to the pin; the CursorEntry is left with a null ClusterClientCursor. return PinnedCursor(this, std::move(cursor), nss, cursorId); } void ClusterCursorManager::checkInCursor(std::unique_ptr cursor, const NamespaceString& nss, CursorId cursorId, CursorState cursorState) { stdx::unique_lock lk(_mutex); invariant(cursor); // Reset OperationContext so that non-user initiated operations do not try to use an invalid // operation context cursor->setOperationContext(nullptr); const bool remotesExhausted = cursor->remotesExhausted(); CursorEntry* entry = getEntry_inlock(nss, cursorId); invariant(entry); entry->returnCursor(std::move(cursor)); if (cursorState == CursorState::NotExhausted || entry->getKillPending()) { return; } if (!remotesExhausted) { // The cursor still has open remote cursors that need to be cleaned up. Schedule for // deletion by the reaper thread by setting the kill pending flag. entry->setKillPending(); return; } // The cursor is exhausted, is not already scheduled for deletion, and does not have any // remote cursor state left to clean up. We can delete the cursor right away. auto detachedCursor = detachCursor_inlock(nss, cursorId); invariantOK(detachedCursor.getStatus()); // Deletion of the cursor can happen out of the lock. lk.unlock(); detachedCursor.getValue().reset(); } Status ClusterCursorManager::killCursor(const NamespaceString& nss, CursorId cursorId) { stdx::lock_guard lk(_mutex); CursorEntry* entry = getEntry_inlock(nss, cursorId); if (!entry) { return cursorNotFoundStatus(nss, cursorId); } entry->setKillPending(); return Status::OK(); } void ClusterCursorManager::killMortalCursorsInactiveSince(Date_t cutoff) { stdx::lock_guard lk(_mutex); for (auto& nsContainerPair : _namespaceToContainerMap) { for (auto& cursorIdEntryPair : nsContainerPair.second.entryMap) { CursorEntry& entry = cursorIdEntryPair.second; if (entry.getLifetimeType() == CursorLifetime::Mortal && entry.getLastActive() <= cutoff) { entry.setInactive(); log() << "Marking cursor id " << cursorIdEntryPair.first << " for deletion, idle since " << entry.getLastActive().toString(); entry.setKillPending(); } } } } void ClusterCursorManager::killAllCursors() { stdx::lock_guard lk(_mutex); for (auto& nsContainerPair : _namespaceToContainerMap) { for (auto& cursorIdEntryPair : nsContainerPair.second.entryMap) { cursorIdEntryPair.second.setKillPending(); } } } std::size_t ClusterCursorManager::reapZombieCursors() { struct CursorDescriptor { CursorDescriptor(NamespaceString ns, CursorId cursorId, bool isInactive) : ns(std::move(ns)), cursorId(cursorId), isInactive(isInactive) {} NamespaceString ns; CursorId cursorId; bool isInactive; }; // List all zombie cursors under the manager lock, and kill them one-by-one while not holding // the lock (ClusterClientCursor::kill() is blocking, so we don't want to hold a lock while // issuing the kill). stdx::unique_lock lk(_mutex); std::vector zombieCursorDescriptors; for (auto& nsContainerPair : _namespaceToContainerMap) { const NamespaceString& nss = nsContainerPair.first; for (auto& cursorIdEntryPair : nsContainerPair.second.entryMap) { CursorId cursorId = cursorIdEntryPair.first; const CursorEntry& entry = cursorIdEntryPair.second; if (!entry.getKillPending()) { continue; } zombieCursorDescriptors.emplace_back(nss, cursorId, entry.isInactive()); } } std::size_t cursorsTimedOut = 0; for (auto& cursorDescriptor : zombieCursorDescriptors) { StatusWith> zombieCursor = detachCursor_inlock(cursorDescriptor.ns, cursorDescriptor.cursorId); if (!zombieCursor.isOK()) { // Cursor in use, or has already been deleted. continue; } lk.unlock(); zombieCursor.getValue()->setOperationContext(nullptr); zombieCursor.getValue()->kill(); zombieCursor.getValue().reset(); lk.lock(); if (cursorDescriptor.isInactive) { ++cursorsTimedOut; } } return cursorsTimedOut; } ClusterCursorManager::Stats ClusterCursorManager::stats() const { stdx::lock_guard lk(_mutex); Stats stats; for (auto& nsContainerPair : _namespaceToContainerMap) { for (auto& cursorIdEntryPair : nsContainerPair.second.entryMap) { const CursorEntry& entry = cursorIdEntryPair.second; if (entry.getKillPending()) { // Killed cursors do not count towards the number of pinned cursors or the number of // open cursors. continue; } if (!entry.isCursorOwned()) { ++stats.cursorsPinned; } switch (entry.getCursorType()) { case CursorType::NamespaceNotSharded: ++stats.cursorsNotSharded; break; case CursorType::NamespaceSharded: ++stats.cursorsSharded; break; } } } return stats; } boost::optional ClusterCursorManager::getNamespaceForCursorId( CursorId cursorId) const { stdx::lock_guard lk(_mutex); const auto it = _cursorIdPrefixToNamespaceMap.find(extractPrefixFromCursorId(cursorId)); if (it == _cursorIdPrefixToNamespaceMap.end()) { return boost::none; } return it->second; } ClusterCursorManager::CursorEntry* ClusterCursorManager::getEntry_inlock(const NamespaceString& nss, CursorId cursorId) { auto nsToContainerIt = _namespaceToContainerMap.find(nss); if (nsToContainerIt == _namespaceToContainerMap.end()) { return nullptr; } CursorEntryMap& entryMap = nsToContainerIt->second.entryMap; auto entryMapIt = entryMap.find(cursorId); if (entryMapIt == entryMap.end()) { return nullptr; } return &entryMapIt->second; } StatusWith> ClusterCursorManager::detachCursor_inlock( const NamespaceString& nss, CursorId cursorId) { CursorEntry* entry = getEntry_inlock(nss, cursorId); if (!entry) { return cursorNotFoundStatus(nss, cursorId); } std::unique_ptr cursor = entry->releaseCursor(); if (!cursor) { return cursorInUseStatus(nss, cursorId); } auto nsToContainerIt = _namespaceToContainerMap.find(nss); invariant(nsToContainerIt != _namespaceToContainerMap.end()); CursorEntryMap& entryMap = nsToContainerIt->second.entryMap; size_t eraseResult = entryMap.erase(cursorId); invariant(1 == eraseResult); if (entryMap.empty()) { // This was the last cursor remaining in the given namespace. Erase all state associated // with this namespace. size_t numDeleted = _cursorIdPrefixToNamespaceMap.erase(nsToContainerIt->second.containerPrefix); invariant(numDeleted == 1); _namespaceToContainerMap.erase(nsToContainerIt); invariant(_namespaceToContainerMap.size() == _cursorIdPrefixToNamespaceMap.size()); } return std::move(cursor); } } // namespace mongo