diff options
author | Charlie Swanson <charlie.swanson@mongodb.com> | 2017-04-18 18:05:25 -0400 |
---|---|---|
committer | Charlie Swanson <charlie.swanson@mongodb.com> | 2017-05-26 10:20:26 -0400 |
commit | fe2046a0eb715cec4d978454458f79b7588ee7a7 (patch) | |
tree | 50311590ddec96f9487dc4069891700cb4b9d23c | |
parent | 58486605c09672b3bfce8608dca403a145413bba (diff) | |
download | mongo-fe2046a0eb715cec4d978454458f79b7588ee7a7.tar.gz |
SERVER-21754 Partition CursorManager's structures
-rw-r--r-- | src/mongo/db/catalog/SConscript | 9 | ||||
-rw-r--r-- | src/mongo/db/catalog/cursor_manager.cpp | 227 | ||||
-rw-r--r-- | src/mongo/db/catalog/cursor_manager.h | 61 | ||||
-rw-r--r-- | src/mongo/db/catalog/util/SConscript | 14 | ||||
-rw-r--r-- | src/mongo/db/catalog/util/partitioned.h | 386 | ||||
-rw-r--r-- | src/mongo/db/catalog/util/partitioned_test.cpp | 273 | ||||
-rw-r--r-- | src/mongo/db/clientcursor.h | 4 | ||||
-rw-r--r-- | src/mongo/db/query/plan_executor.cpp | 5 | ||||
-rw-r--r-- | src/mongo/db/query/plan_executor.h | 22 | ||||
-rw-r--r-- | src/mongo/dbtests/querytests.cpp | 11 |
10 files changed, 875 insertions, 137 deletions
diff --git a/src/mongo/db/catalog/SConscript b/src/mongo/db/catalog/SConscript index 271301d4d5f..5e37439c403 100644 --- a/src/mongo/db/catalog/SConscript +++ b/src/mongo/db/catalog/SConscript @@ -4,6 +4,15 @@ Import("env") env = env.Clone() +env.SConscript( + dirs=[ + 'util', + ], + exports=[ + 'env', + ], +) + env.Library( target='collection_options', source=[ diff --git a/src/mongo/db/catalog/cursor_manager.cpp b/src/mongo/db/catalog/cursor_manager.cpp index ef73ab4a342..918134b057d 100644 --- a/src/mongo/db/catalog/cursor_manager.cpp +++ b/src/mongo/db/catalog/cursor_manager.cpp @@ -46,11 +46,11 @@ #include "mongo/db/server_parameters.h" #include "mongo/db/service_context.h" #include "mongo/platform/random.h" +#include "mongo/stdx/memory.h" #include "mongo/util/exit.h" #include "mongo/util/startup_test.h" namespace mongo { - using std::vector; constexpr Minutes CursorManager::kDefaultCursorTimeoutMinutes; @@ -60,6 +60,8 @@ MONGO_EXPORT_SERVER_PARAMETER( int, durationCount<Milliseconds>(CursorManager::kDefaultCursorTimeoutMinutes)); +constexpr int CursorManager::kNumPartitions; + namespace { uint32_t idFromCursorId(CursorId id) { uint64_t x = static_cast<uint64_t>(id); @@ -226,7 +228,7 @@ bool GlobalCursorIdCache::eraseCursor(OperationContext* opCtx, CursorId id, bool } Status eraseStatus = collection->getCursorManager()->eraseCursor(opCtx, id, checkAuth); - massert(16089, + uassert(16089, eraseStatus.reason(), eraseStatus.code() == ErrorCodes::OK || eraseStatus.code() == ErrorCodes::CursorNotFound); @@ -298,21 +300,29 @@ bool CursorManager::eraseCursorGlobal(OperationContext* opCtx, CursorId id) { // -------------------------- - -CursorManager::CursorManager(NamespaceString nss) : _nss(std::move(nss)) { - if (!isGlobalManager()) { - // Generate a unique id for this collection. - _collectionCacheRuntimeId = globalCursorIdCache->registerCursorManager(_nss); - } - _random.reset(new PseudoRandom(globalCursorIdCache->nextSeed())); +std::size_t CursorManager::PlanExecutorPartitioner::operator()(const PlanExecutor* exec, + const std::size_t nPartitions) { + auto token = exec->getRegistrationToken(); + invariant(token); + return (*token) % nPartitions; } +CursorManager::CursorManager(NamespaceString nss) + : _nss(std::move(nss)), + _collectionCacheRuntimeId(_nss.isEmpty() ? 0 + : globalCursorIdCache->registerCursorManager(_nss)), + _random(stdx::make_unique<PseudoRandom>(globalCursorIdCache->nextSeed())), + _registeredPlanExecutors(), + _cursorMap(stdx::make_unique<Partitioned<unordered_map<CursorId, ClientCursor*>>>()) {} + CursorManager::~CursorManager() { + // All cursors and PlanExecutors should have been deleted already. + invariant(_registeredPlanExecutors.empty()); + invariant(_cursorMap->empty()); + if (!isGlobalManager()) { globalCursorIdCache->deregisterCursorManager(_collectionCacheRuntimeId, _nss); } - invariant(_cursors.empty()); - invariant(_nonCachedExecutors.empty()); } void CursorManager::invalidateAll(OperationContext* opCtx, @@ -320,43 +330,49 @@ void CursorManager::invalidateAll(OperationContext* opCtx, const std::string& reason) { invariant(!isGlobalManager()); // The global cursor manager should never need to kill cursors. dassert(opCtx->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_X)); - - stdx::lock_guard<SimpleMutex> lk(_mutex); fassert(28819, !BackgroundOperation::inProgForNs(_nss)); - - for (auto&& exec : _nonCachedExecutors) { - // We kill the executor, but it deletes itself. - exec->markAsKilled(reason); - } - _nonCachedExecutors.clear(); - - CursorMap newMap; - for (auto&& entry : _cursors) { - auto* cursor = entry.second; - cursor->markAsKilled(reason); - - if (cursor->_isPinned) { - // There is an active user of this cursor, who is now responsible for cleaning it up. - // This CursorManager will no longer track this cursor. - continue; + auto allExecPartitions = _registeredPlanExecutors.lockAllPartitions(); + for (auto&& partition : allExecPartitions) { + for (auto&& exec : partition) { + // The PlanExecutor is owned elsewhere, so we just mark it as killed and let it be + // cleaned up later. + exec->markAsKilled(reason); } - - if (!collectionGoingAway) { - // We keep around unpinned cursors so that future attempts to use the cursor will result - // in a useful error message. - newMap.insert(entry); - } else { - // The collection is going away, so there's no point in keeping any state. - cursor->dispose(opCtx); - delete cursor; + } + allExecPartitions.clear(); + + // Mark all cursors as killed, but keep around those we can in order to provide a useful error + // message to the user when they attempt to use it next time. + auto allCurrentPartitions = _cursorMap->lockAllPartitions(); + for (auto&& partition : allCurrentPartitions) { + for (auto it = partition.begin(); it != partition.end();) { + auto* cursor = it->second; + cursor->markAsKilled(reason); + + // If pinned, there is an active user of this cursor, who is now responsible for + // cleaning it up. Otherwise, we can immediately dispose of it. + if (cursor->_isPinned) { + it = partition.erase(it); + continue; + } + + if (!collectionGoingAway) { + // We keep around unpinned cursors so that future attempts to use the cursor will + // result in a useful error message. + ++it; + } else { + cursor->dispose(opCtx); + delete cursor; + it = partition.erase(it); + } } } - _cursors = newMap; } void CursorManager::invalidateDocument(OperationContext* opCtx, const RecordId& dl, InvalidationType type) { + dassert(opCtx->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_IX)); invariant(!isGlobalManager()); // The global cursor manager should never receive invalidations. if (supportsDocLocking()) { // If a storage engine supports doc locking, then we do not need to invalidate. @@ -364,15 +380,19 @@ void CursorManager::invalidateDocument(OperationContext* opCtx, return; } - stdx::lock_guard<SimpleMutex> lk(_mutex); - - for (ExecSet::iterator it = _nonCachedExecutors.begin(); it != _nonCachedExecutors.end(); - ++it) { - (*it)->invalidate(opCtx, dl, type); + auto allExecPartitions = _registeredPlanExecutors.lockAllPartitions(); + for (auto&& partition : allExecPartitions) { + for (auto&& exec : partition) { + exec->invalidate(opCtx, dl, type); + } } - for (CursorMap::const_iterator i = _cursors.begin(); i != _cursors.end(); ++i) { - i->second->getExecutor()->invalidate(opCtx, dl, type); + auto allPartitions = _cursorMap->lockAllPartitions(); + for (auto&& partition : allPartitions) { + for (auto&& entry : partition) { + auto exec = entry.second->getExecutor(); + exec->invalidate(opCtx, dl, type); + } } } @@ -384,42 +404,48 @@ bool CursorManager::cursorShouldTimeout_inlock(const ClientCursor* cursor, Date_ } std::size_t CursorManager::timeoutCursors(OperationContext* opCtx, Date_t now) { - vector<ClientCursor*> toDelete; - - stdx::lock_guard<SimpleMutex> lk(_mutex); - - for (CursorMap::const_iterator i = _cursors.begin(); i != _cursors.end(); ++i) { - ClientCursor* cc = i->second; - if (cursorShouldTimeout_inlock(cc, now)) - toDelete.push_back(cc); - } - - // Properly dispose of each cursor that was timed out. - for (vector<ClientCursor*>::const_iterator i = toDelete.begin(); i != toDelete.end(); ++i) { - ClientCursor* cc = *i; - _deregisterCursor_inlock(cc); - cc->dispose(opCtx); - delete cc; + std::vector<std::unique_ptr<ClientCursor, ClientCursor::Deleter>> toDelete; + + for (size_t partitionId = 0; partitionId < kNumPartitions; ++partitionId) { + auto lockedPartition = _cursorMap->lockOnePartitionById(partitionId); + for (auto it = lockedPartition->begin(); it != lockedPartition->end();) { + auto* cursor = it->second; + if (cursorShouldTimeout_inlock(cursor, now)) { + // Dispose of the cursor and remove it from the partition. + cursor->dispose(opCtx); + toDelete.push_back(std::unique_ptr<ClientCursor, ClientCursor::Deleter>{cursor}); + it = lockedPartition->erase(it); + } else { + ++it; + } + } } return toDelete.size(); } -void CursorManager::registerExecutor(PlanExecutor* exec) { - stdx::lock_guard<SimpleMutex> lk(_mutex); - const std::pair<ExecSet::iterator, bool> result = _nonCachedExecutors.insert(exec); - invariant(result.second); // make sure this was inserted +namespace { +static AtomicUInt32 registeredPlanExecutorId; +} // namespace + +Partitioned<unordered_set<PlanExecutor*>>::PartitionId CursorManager::registerExecutor( + PlanExecutor* exec) { + auto partitionId = registeredPlanExecutorId.fetchAndAdd(1); + exec->setRegistrationToken(partitionId); + _registeredPlanExecutors.insert(exec); + return partitionId; } void CursorManager::deregisterExecutor(PlanExecutor* exec) { - stdx::lock_guard<SimpleMutex> lk(_mutex); - _nonCachedExecutors.erase(exec); + if (auto partitionId = exec->getRegistrationToken()) { + _registeredPlanExecutors.erase(exec); + } } StatusWith<ClientCursorPin> CursorManager::pinCursor(OperationContext* opCtx, CursorId id) { - stdx::lock_guard<SimpleMutex> lk(_mutex); - CursorMap::const_iterator it = _cursors.find(id); - if (it == _cursors.end()) { + auto lockedPartition = _cursorMap->lockOnePartition(id); + auto it = lockedPartition->find(id); + if (it == lockedPartition->end()) { return {ErrorCodes::CursorNotFound, str::stream() << "cursor id " << id << " not found"}; } @@ -430,7 +456,7 @@ StatusWith<ClientCursorPin> CursorManager::pinCursor(OperationContext* opCtx, Cu Status error{ErrorCodes::QueryPlanKilled, str::stream() << "cursor killed because: " << cursor->getExecutor()->getKillReason()}; - _deregisterCursor_inlock(cursor); + lockedPartition->erase(cursor->cursorid()); cursor->dispose(opCtx); delete cursor; return error; @@ -443,28 +469,26 @@ void CursorManager::unpin(OperationContext* opCtx, ClientCursor* cursor) { // Avoid computing the current time within the critical section. auto now = opCtx->getServiceContext()->getPreciseClockSource()->now(); - stdx::lock_guard<SimpleMutex> lk(_mutex); - + auto partitionLock = _cursorMap->lockOnePartition(cursor->cursorid()); invariant(cursor->_isPinned); cursor->_isPinned = false; cursor->_lastUseDate = now; } void CursorManager::getCursorIds(std::set<CursorId>* openCursors) const { - stdx::lock_guard<SimpleMutex> lk(_mutex); - - for (CursorMap::const_iterator i = _cursors.begin(); i != _cursors.end(); ++i) { - ClientCursor* cc = i->second; - openCursors->insert(cc->cursorid()); + auto allPartitions = _cursorMap->lockAllPartitions(); + for (auto&& partition : allPartitions) { + for (auto&& entry : partition) { + openCursors->insert(entry.first); + } } } size_t CursorManager::numCursors() const { - stdx::lock_guard<SimpleMutex> lk(_mutex); - return _cursors.size(); + return _cursorMap->size(); } -CursorId CursorManager::_allocateCursorId_inlock() { +CursorId CursorManager::allocateCursorId_inlock() { for (int i = 0; i < 10000; i++) { // The leading two bits of a CursorId are used to determine if the cursor is registered on // the global cursor manager. @@ -481,7 +505,8 @@ CursorId CursorManager::_allocateCursorId_inlock() { uint32_t myPart = static_cast<uint32_t>(_random->nextInt32()); id = cursorIdFromParts(_collectionCacheRuntimeId, myPart); } - if (_cursors.count(id) == 0) + auto partition = _cursorMap->lockOnePartition(id); + if (partition->count(id) == 0) return id; } fassertFailed(17360); @@ -492,42 +517,41 @@ ClientCursorPin CursorManager::registerCursor(OperationContext* opCtx, // Avoid computing the current time within the critical section. auto now = opCtx->getServiceContext()->getPreciseClockSource()->now(); - stdx::lock_guard<SimpleMutex> lk(_mutex); // Make sure the PlanExecutor isn't registered, since we will register the ClientCursor wrapping // it. invariant(cursorParams.exec); - _nonCachedExecutors.erase(cursorParams.exec.get()); + deregisterExecutor(cursorParams.exec.get()); cursorParams.exec.get_deleter().dismissDisposal(); cursorParams.exec->unsetRegistered(); - CursorId cursorId = _allocateCursorId_inlock(); - invariant(cursorId); + // Note we must hold the registration lock from now until insertion into '_cursorMap' to ensure + // we don't insert two cursors with the same cursor id. + stdx::lock_guard<SimpleMutex> lock(_registrationLock); + CursorId cursorId = allocateCursorId_inlock(); std::unique_ptr<ClientCursor, ClientCursor::Deleter> clientCursor( new ClientCursor(std::move(cursorParams), this, cursorId, now)); - // Transfer ownership of the cursor to '_cursors'. + // Transfer ownership of the cursor to '_cursorMap'. + auto partition = _cursorMap->lockOnePartition(cursorId); ClientCursor* unownedCursor = clientCursor.release(); - _cursors[cursorId] = unownedCursor; + partition->emplace(cursorId, unownedCursor); return ClientCursorPin(opCtx, unownedCursor); } void CursorManager::deregisterCursor(ClientCursor* cc) { - stdx::lock_guard<SimpleMutex> lk(_mutex); - _deregisterCursor_inlock(cc); + _cursorMap->erase(cc->cursorid()); } Status CursorManager::eraseCursor(OperationContext* opCtx, CursorId id, bool shouldAudit) { - stdx::lock_guard<SimpleMutex> lk(_mutex); - - CursorMap::iterator it = _cursors.find(id); - if (it == _cursors.end()) { + auto lockedPartition = _cursorMap->lockOnePartition(id); + auto it = lockedPartition->find(id); + if (it == lockedPartition->end()) { if (shouldAudit) { audit::logKillCursorsAuthzCheck( opCtx->getClient(), _nss, id, ErrorCodes::CursorNotFound); } return {ErrorCodes::CursorNotFound, str::stream() << "Cursor id not found: " << id}; } - auto cursor = it->second; if (cursor->_isPinned) { @@ -537,20 +561,15 @@ Status CursorManager::eraseCursor(OperationContext* opCtx, CursorId id, bool sho } return {ErrorCodes::OperationFailed, str::stream() << "Cannot kill pinned cursor: " << id}; } + std::unique_ptr<ClientCursor, ClientCursor::Deleter> ownedCursor(cursor); if (shouldAudit) { audit::logKillCursorsAuthzCheck(opCtx->getClient(), _nss, id, ErrorCodes::OK); } - _deregisterCursor_inlock(cursor); - cursor->dispose(opCtx); - delete cursor; + lockedPartition->erase(ownedCursor->cursorid()); + ownedCursor->dispose(opCtx); return Status::OK(); } -void CursorManager::_deregisterCursor_inlock(ClientCursor* cc) { - invariant(cc); - CursorId id = cc->cursorid(); - _cursors.erase(id); -} } // namespace mongo diff --git a/src/mongo/db/catalog/cursor_manager.h b/src/mongo/db/catalog/cursor_manager.h index 59b8c5fbf17..70fce8f3804 100644 --- a/src/mongo/db/catalog/cursor_manager.h +++ b/src/mongo/db/catalog/cursor_manager.h @@ -30,11 +30,13 @@ #pragma once - +#include "mongo/db/catalog/util/partitioned.h" #include "mongo/db/clientcursor.h" +#include "mongo/db/cursor_id.h" #include "mongo/db/invalidation_type.h" #include "mongo/db/namespace_string.h" #include "mongo/db/record_id.h" +#include "mongo/platform/unordered_map.h" #include "mongo/platform/unordered_set.h" #include "mongo/util/concurrency/mutex.h" #include "mongo/util/duration.h" @@ -76,6 +78,7 @@ class CursorManager { public: // The number of minutes a cursor is allowed to be idle before timing out. static constexpr Minutes kDefaultCursorTimeoutMinutes{10}; + using RegistrationToken = Partitioned<unordered_set<PlanExecutor*>>::PartitionId; CursorManager(NamespaceString nss); @@ -114,11 +117,12 @@ public: std::size_t timeoutCursors(OperationContext* opCtx, Date_t now); /** - * Register an executor so that it can be notified of deletion/invalidation during yields. - * Must be called before an executor yields. If an executor is registered inside a - * ClientCursor it must not be itself registered; the two are mutually exclusive. + * Register an executor so that it can be notified of deletions, invalidations, collection + * drops, or the like during yields. Must be called before an executor yields. Registration + * happens automatically for yielding PlanExecutors, so this should only be called by a + * PlanExecutor itself. Returns a token that must be stored for use during deregistration. */ - void registerExecutor(PlanExecutor* exec); + Partitioned<unordered_set<PlanExecutor*>>::PartitionId registerExecutor(PlanExecutor* exec); /** * Remove an executor from the registry. It is legal to call this even if 'exec' is not @@ -188,10 +192,16 @@ public: static std::size_t timeoutCursorsGlobal(OperationContext* opCtx, Date_t now); private: + static constexpr int kNumPartitions = 16; friend class ClientCursorPin; - CursorId _allocateCursorId_inlock(); - void _deregisterCursor_inlock(ClientCursor* cc); + struct PlanExecutorPartitioner { + std::size_t operator()(const PlanExecutor* exec, std::size_t nPartitions); + }; + CursorId allocateCursorId_inlock(); + + ClientCursorPin _registerCursor( + OperationContext* opCtx, std::unique_ptr<ClientCursor, ClientCursor::Deleter> clientCursor); void deregisterCursor(ClientCursor* cc); @@ -203,16 +213,33 @@ private: return _nss.isEmpty(); } - NamespaceString _nss; - uint32_t _collectionCacheRuntimeId; + // No locks are needed to consult these data members. + const NamespaceString _nss; + const uint32_t _collectionCacheRuntimeId; + + // A CursorManager holds a pointer to all open PlanExecutors and all open ClientCursors. All + // pointers to PlanExecutors are unowned, and a PlanExecutor will notify the CursorManager when + // it is being destroyed. ClientCursors are owned by the CursorManager, except when they are in + // use by a ClientCursorPin. When in use by a pin, an unowned pointer remains to ensure they + // still receive invalidations while in use. + // + // There are several mutexes at work to protect concurrent access to data structures managed by + // this cursor manager. The two registration data structures '_registeredPlanExecutors' and + // '_cursorMap' are partitioned to decrease contention, and each partition of the structure is + // protected by its own mutex. Separately, there is a '_registrationLock' which protects + // concurrent access to '_random' for cursor id generation, and must be held from cursor id + // generation until insertion into '_cursorMap'. If you ever need to acquire more than one of + // these mutexes at once, you must follow the following rules: + // - '_registrationLock' must be acquired first, if at all. + // - Mutex(es) for '_registeredPlanExecutors' must be acquired next. + // - Mutex(es) for '_cursorMap' must be acquired next. + // - If you need to access multiple partitions within '_registeredPlanExecutors' or '_cursorMap' + // at once, you must acquire the mutexes for those partitions in ascending order, or use the + // partition helpers to acquire mutexes for all partitions. + mutable SimpleMutex _registrationLock; std::unique_ptr<PseudoRandom> _random; - - mutable SimpleMutex _mutex; - - typedef unordered_set<PlanExecutor*> ExecSet; - ExecSet _nonCachedExecutors; - - typedef std::map<CursorId, ClientCursor*> CursorMap; - CursorMap _cursors; + Partitioned<unordered_set<PlanExecutor*>, kNumPartitions, PlanExecutorPartitioner> + _registeredPlanExecutors; + std::unique_ptr<Partitioned<unordered_map<CursorId, ClientCursor*>, kNumPartitions>> _cursorMap; }; } // namespace mongo diff --git a/src/mongo/db/catalog/util/SConscript b/src/mongo/db/catalog/util/SConscript new file mode 100644 index 00000000000..320ea3396cf --- /dev/null +++ b/src/mongo/db/catalog/util/SConscript @@ -0,0 +1,14 @@ +# -*- mode: python; -*- + +Import("env") + +env = env.Clone() + +env.CppUnitTest( + target='partitioned_test', + source=[ + 'partitioned_test.cpp' + ], + LIBDEPS=[ + ] +) diff --git a/src/mongo/db/catalog/util/partitioned.h b/src/mongo/db/catalog/util/partitioned.h new file mode 100644 index 00000000000..475ea415f48 --- /dev/null +++ b/src/mongo/db/catalog/util/partitioned.h @@ -0,0 +1,386 @@ +/** + * Copyright (C) 2017 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#pragma once + +#include <algorithm> +#include <cstdlib> +#include <iterator> +#include <memory> +#include <numeric> +#include <utility> +#include <vector> + +#include "mongo/platform/atomic_word.h" +#include "mongo/stdx/memory.h" +#include "mongo/stdx/mutex.h" +#include "mongo/util/assert_util.h" + +namespace mongo { + +inline std::size_t partitionOf(const char x, const std::size_t nPartitions) { + return static_cast<unsigned char>(x) % nPartitions; +} +inline std::size_t partitionOf(const unsigned char x, const std::size_t nPartitions) { + return x % nPartitions; +} +inline std::size_t partitionOf(const signed char x, const std::size_t nPartitions) { + return static_cast<unsigned char>(x) % nPartitions; +} +inline std::size_t partitionOf(const int x, const std::size_t nPartitions) { + return static_cast<unsigned int>(x) % nPartitions; +} +inline std::size_t partitionOf(const unsigned int x, const std::size_t nPartitions) { + return x % nPartitions; +} +inline std::size_t partitionOf(const short x, const std::size_t nPartitions) { + return static_cast<unsigned short>(x) % nPartitions; +} +inline std::size_t partitionOf(const unsigned short x, const std::size_t nPartitions) { + return x % nPartitions; +} +inline std::size_t partitionOf(const long x, const std::size_t nPartitions) { + return static_cast<unsigned long>(x) % nPartitions; +} +inline std::size_t partitionOf(const unsigned long x, const std::size_t nPartitions) { + return x % nPartitions; +} +inline std::size_t partitionOf(const long long x, const std::size_t nPartitions) { + return static_cast<unsigned long long>(x) % nPartitions; +} +inline std::size_t partitionOf(const unsigned long long x, const std::size_t nPartitions) { + return x % nPartitions; +} +inline std::size_t partitionOf(const wchar_t x, const std::size_t nPartitions) { + return x % nPartitions; +} +inline std::size_t partitionOf(const char16_t x, const std::size_t nPartitions) { + return x % nPartitions; +} +inline std::size_t partitionOf(const char32_t x, const std::size_t nPartitions) { + return x % nPartitions; +} + +/** + * The default partitioning policy: If using a numeric built-in type, will use the lower bits of a + * number to decide which partition to assign it to. If using any other type T, you must define + * partitionOf(const T&, std::size_t) or specialize this template. + */ +template <typename T> +struct Partitioner { + std::size_t operator()(const T& x, const std::size_t nPartitions) { + return partitionOf(x, nPartitions); + } +}; + +namespace partitioned_detail { + +template <typename Key, typename Value> +Key getKey(const std::pair<Key, Value>& pair) { + return std::get<0>(pair); +} + +template <typename Key> +Key getKey(const Key& key) { + return key; +} + +inline std::vector<stdx::unique_lock<stdx::mutex>> lockAllPartitions( + std::vector<stdx::mutex>& mutexes) { + std::vector<stdx::unique_lock<stdx::mutex>> result; + std::transform(mutexes.begin(), mutexes.end(), std::back_inserter(result), [](auto&& mutex) { + return stdx::unique_lock<stdx::mutex>{mutex}; + }); + return result; +} +} // namespace partitioned_detail + +/** + * A templated class used to partition an associative container like a set or a map to increase + * scalability. `AssociativeContainer` is a type like a std::map or std::set that meets the + * requirements of either the AssociativeContainer or UnorderedAssociativeContainer concept. + * `nPartitions` determines how many partitions to make. `Partitioner` can be provided to customize + * how the partition of each entry is computed. + */ +template <typename AssociativeContainer, + std::size_t nPartitions = 16, + typename KeyPartitioner = Partitioner<typename AssociativeContainer::key_type>> +class Partitioned { +private: + // Used to create an iterator representing the end of the partitioned structure. + struct IteratorEndTag {}; + +public: + static_assert(nPartitions > 0, "cannot create partitioned structure with 0 partitions"); + using value_type = typename AssociativeContainer::value_type; + using key_type = typename AssociativeContainer::key_type; + using PartitionId = std::size_t; + + /** + * Used to protect access to all partitions of this partitioned associative structure. For + * example, may be used to empty each partition in the structure, or to provide a snapshotted + * count of the number of entries across all partitions. + */ + class All { + private: + /** + * Acquires locks for all partitions. The lifetime of this `All` object must be shorter than + * that of `partitionedContainer`. + */ + explicit All(Partitioned& partitionedContainer) + : _lockGuards(partitioned_detail::lockAllPartitions(partitionedContainer._mutexes)), + _partitionedContainer(&partitionedContainer) {} + + public: + /** + * Returns an iterator at the start of the partitions. + */ + auto begin() & { + return this->_partitionedContainer->_partitions.begin(); + } + + /** + * Returns an iterator at the end of the partitions. + */ + auto end() & { + return this->_partitionedContainer->_partitions.end(); + } + + /** + * Returns an iterator at the start of the partitions. + */ + auto begin() const& { + return this->_partitionedContainer->_partitions.begin(); + } + void begin() && = delete; + + /** + * Returns an iterator at the end of the partitions. + */ + auto end() const& { + return this->_partitionedContainer->_partitions.end(); + } + void end() && = delete; + + /** + * Returns the number of elements in all partitions, summed together. + */ + std::size_t size() const { + return std::accumulate( + this->_partitionedContainer->_partitions.begin(), + this->_partitionedContainer->_partitions.end(), + std::size_t{0}, + [](auto&& total, auto&& partition) { return total + partition.size(); }); + } + + /** + * Returns true if each partition is empty. + */ + bool empty() const { + return std::all_of(this->_partitionedContainer->_partitions.begin(), + this->_partitionedContainer->_partitions.end(), + [](auto&& partition) { return partition.empty(); }); + } + + /** + * Returns the number of entries with the given key. + */ + std::size_t count(const key_type& key) const { + auto partitionId = KeyPartitioner()(key, nPartitions); + return this->_partitionedContainer->_partitions[partitionId].count(key); + } + + /** + * Empties each container within each partition. + */ + void clear() { + for (auto&& partition : this->_partitionedContainer->_partitions) { + partition.clear(); + } + } + + /** + * Inserts `value` into its designated partition. + */ + void insert(value_type value) & { + const auto partitionId = + KeyPartitioner()(partitioned_detail::getKey(value), nPartitions); + this->_partitionedContainer->_partitions[partitionId].insert(std::move(value)); + } + void insert(value_type)&& = delete; + + /** + * Erases one entry from the partitioned structure, returns the number of entries removed. + */ + std::size_t erase(const key_type& key) & { + const auto partitionId = KeyPartitioner()(key, nPartitions); + return this->_partitionedContainer->_partitions[partitionId].erase(key); + } + void erase(const key_type&) && = delete; + + private: + friend class Partitioned; + + std::vector<stdx::unique_lock<stdx::mutex>> _lockGuards; + Partitioned* _partitionedContainer; + }; + + /** + * Used to protect access to a single partition of a Partitioned. For example, can be used to do + * a series of reads and/or modifications to a single entry without interference from other + * threads. + */ + class OnePartition { + public: + /** + * Returns a pointer to the structure in this partition. + */ + AssociativeContainer* operator->() const& { + return &this->_partitioned->_partitions[_id]; + } + void operator->() && = delete; + + /** + * Returns a reference to the structure in this partition. + */ + AssociativeContainer& operator*() const& { + return this->_partitioned->_partitions[_id]; + } + void operator*() && = delete; + + private: + friend class Partitioned; + + /** + * Acquires locks for the ith partition. `partitionedAssociativeContainer` must outlive + * this GuardedPartition. If a single thread needs access to multiple partitions, it must + * use GuardedAssociativeContainer, or acquire them in ascending order. + */ + OnePartition(Partitioned& partitioned, PartitionId partitionId) + : _partitionLock(partitioned._mutexes[partitionId]), + _partitioned(&partitioned), + _id(partitionId) {} + + stdx::unique_lock<stdx::mutex> _partitionLock; + Partitioned* _partitioned; + PartitionId _id; + }; + + Partitioned(const Partitioned&) = delete; + Partitioned(Partitioned&&) = default; + Partitioned& operator=(const Partitioned&) = delete; + Partitioned& operator=(Partitioned&&) = default; + ~Partitioned() = default; + + /** + * Returns true if each partition is empty. Locks the all partitions to perform this check, but + * insertions can occur as soon as this method returns. + */ + bool empty() const { + const auto all = partitioned_detail::lockAllPartitions(this->_mutexes); + return std::all_of(this->_partitions.begin(), + this->_partitions.end(), + [](auto&& partition) { return partition.empty(); }); + } + + /** + * Returns the number of elements in all partitions, summed together. Locks all partitions to + * do this computation, but the size can change as soon as this method returns. + */ + std::size_t size() const { + const auto all = partitioned_detail::lockAllPartitions(this->_mutexes); + return std::accumulate( + this->_partitions.begin(), + this->_partitions.end(), + std::size_t{0}, + [](auto&& total, auto&& partition) { return total + partition.size(); }); + } + + /** + * Returns the number of entries with the given key. Acquires locks for only the partition + * determined by that key. + */ + std::size_t count(const key_type& key) & { + auto partition = this->lockOnePartition(key); + return partition->count(key); + } + void count(const key_type&) && = delete; + + /** + * Empties all partitions. + */ + void clear() { + auto all = this->lockAllPartitions(); + all.clear(); + } + + /** + * Inserts a single value into the partitioned structure. Locks a single partition determined by + * the value itself. Will not lock any partitions besides the one inserted into. + */ + void insert(const value_type value) & { + auto partition = this->lockOnePartitionById( + KeyPartitioner()(partitioned_detail::getKey(value), nPartitions)); + partition->insert(std::move(value)); + } + void insert(const value_type) && = delete; + + /** + * Erases one entry from the partitioned structure. Locks only the partition given by the key. + * Returns the number of entries removed. + */ + std::size_t erase(const key_type& key) & { + auto partition = this->lockOnePartition(key); + return partition->erase(key); + } + void erase(const key_type&) && = delete; + + All lockAllPartitions() & { + return All{*this}; + } + + OnePartition lockOnePartition(const key_type key) & { + return OnePartition{*this, KeyPartitioner()(key, nPartitions)}; + } + + OnePartition lockOnePartitionById(PartitionId id) & { + return OnePartition{*this, id}; + } + + /** + * Constructs a partitioned version of a AssociativeContainer, with `nPartitions` partitions. + */ + Partitioned() : _mutexes(nPartitions), _partitions(nPartitions) {} + +private: + // These two vectors parallel each other, but we keep them separate so that we can return an + // iterator over `_partitions` from within All. + mutable std::vector<stdx::mutex> _mutexes; + std::vector<AssociativeContainer> _partitions; +}; +} // namespace mongo diff --git a/src/mongo/db/catalog/util/partitioned_test.cpp b/src/mongo/db/catalog/util/partitioned_test.cpp new file mode 100644 index 00000000000..e37988d4d83 --- /dev/null +++ b/src/mongo/db/catalog/util/partitioned_test.cpp @@ -0,0 +1,273 @@ +/** + * Copyright (C) 2016 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include <set> + +#include "mongo/db/catalog/util/partitioned.h" +#include "mongo/stdx/thread.h" +#include "mongo/unittest/unittest.h" + +namespace mongo { +namespace { + +const std::size_t nPartitions = 3; +using PartitionedIntSet = Partitioned<std::set<std::size_t>, nPartitions>; + +TEST(Partitioned, DefaultConstructedPartitionedShouldBeEmpty) { + PartitionedIntSet test; + ASSERT_TRUE(test.empty()); +} + +TEST(Partitioned, InsertionShouldModifySize) { + PartitionedIntSet test; + test.insert(4); + ASSERT_EQ(test.size(), 1UL); + ASSERT_EQ(test.count(4), 1UL); + ASSERT_FALSE(test.empty()); +} + +TEST(Partitioned, DuplicateInsertionShouldNotModifySize) { + PartitionedIntSet test; + test.insert(4); + test.insert(4); + ASSERT_EQ(test.size(), 1UL); + ASSERT_EQ(test.count(4), 1UL); +} + +TEST(Partitioned, ClearShouldResetSizeToZero) { + PartitionedIntSet test; + test.insert(0); + test.insert(1); + test.insert(2); + ASSERT_EQ(test.size(), 3UL); + ASSERT_EQ(test.count(0), 1UL); + ASSERT_EQ(test.count(1), 1UL); + ASSERT_EQ(test.count(2), 1UL); + test.clear(); + ASSERT_EQ(test.size(), 0UL); + ASSERT_TRUE(test.empty()); +} + +TEST(Partitioned, ErasingEntryShouldModifySize) { + PartitionedIntSet test; + test.insert(0); + test.insert(1); + test.insert(2); + ASSERT_EQ(test.size(), 3UL); + ASSERT_EQ(test.count(0), 1UL); + ASSERT_EQ(1UL, test.erase(0)); + ASSERT_EQ(test.count(0), 0UL); + ASSERT_EQ(test.size(), 2UL); +} + +TEST(Partitioned, ErasingEntryThatDoesNotExistShouldNotModifySize) { + PartitionedIntSet test; + test.insert(0); + test.insert(1); + test.insert(2); + ASSERT_EQ(test.size(), 3UL); + ASSERT_EQ(test.count(5), 0UL); + ASSERT_EQ(0UL, test.erase(5)); + ASSERT_EQ(test.count(5), 0UL); + ASSERT_EQ(test.size(), 3UL); +} + +TEST(PartitionedAll, DefaultConstructedPartitionedShouldBeEmpty) { + PartitionedIntSet test; + auto all = test.lockAllPartitions(); + ASSERT_TRUE(all.empty()); +} + +TEST(PartitionedAll, InsertionShouldModifySize) { + PartitionedIntSet test; + auto all = test.lockAllPartitions(); + all.insert(4); + ASSERT_EQ(all.size(), 1UL); + ASSERT_EQ(all.count(4), 1UL); + ASSERT_FALSE(all.empty()); +} + +TEST(PartitionedAll, DuplicateInsertionShouldNotModifySize) { + PartitionedIntSet test; + auto all = test.lockAllPartitions(); + all.insert(4); + all.insert(4); + ASSERT_EQ(all.size(), 1UL); + ASSERT_EQ(all.count(4), 1UL); +} + +TEST(PartitionedAll, ClearShouldResetSizeToZero) { + PartitionedIntSet test; + auto all = test.lockAllPartitions(); + all.insert(0); + all.insert(1); + all.insert(2); + ASSERT_EQ(all.size(), 3UL); + ASSERT_EQ(all.count(0), 1UL); + ASSERT_EQ(all.count(1), 1UL); + ASSERT_EQ(all.count(2), 1UL); + all.clear(); + ASSERT_EQ(all.size(), 0UL); + ASSERT_TRUE(all.empty()); +} + +TEST(PartitionedAll, ErasingEntryShouldModifySize) { + PartitionedIntSet test; + auto all = test.lockAllPartitions(); + all.insert(0); + all.insert(1); + all.insert(2); + ASSERT_EQ(all.size(), 3UL); + ASSERT_EQ(all.count(0), 1UL); + ASSERT_EQ(1UL, all.erase(0)); + ASSERT_EQ(all.count(0), 0UL); + ASSERT_EQ(all.size(), 2UL); +} + +TEST(PartitionedAll, ErasingEntryThatDoesNotExistShouldNotModifySize) { + PartitionedIntSet test; + auto all = test.lockAllPartitions(); + all.insert(0); + all.insert(1); + all.insert(2); + ASSERT_EQ(all.size(), 3UL); + ASSERT_EQ(all.count(5), 0UL); + ASSERT_EQ(0UL, all.erase(5)); + ASSERT_EQ(all.count(5), 0UL); + ASSERT_EQ(all.size(), 3UL); +} + +TEST(PartitionedConcurrency, ShouldBeAbleToGuardSeparatePartitionsSimultaneously) { + PartitionedIntSet test; + { + auto zeroth = test.lockOnePartition(0); + auto first = test.lockOnePartition(1); + } +} + +TEST(PartitionedConcurrency, ModificationsFromOnePartitionShouldBeVisible) { + PartitionedIntSet test; + { + auto zeroth = test.lockOnePartition(0); + zeroth->insert(0); + } + + // Make sure a All can see the modifications. + { + auto all = test.lockAllPartitions(); + ASSERT_EQ(1UL, all.size()); + } + + // Make sure a OnePartition can see the modifications. + { + auto guardedPartition = test.lockOnePartition(0); + ASSERT_EQ(1UL, guardedPartition->count(0)); + } +} + +TEST(PartitionedConcurrency, ModificationsFromAllShouldBeVisible) { + PartitionedIntSet test; + { + auto all = test.lockAllPartitions(); + all.insert(0); + all.insert(1); + all.insert(2); + for (auto&& partition : all) { + ASSERT_EQ(1UL, partition.size()); + } + } + + // Make sure a OnePartition can see the modifications. + { + auto zeroth = test.lockOnePartition(0); + ASSERT_EQ(1UL, zeroth->count(0)); + } + { + auto first = test.lockOnePartition(1); + ASSERT_EQ(1UL, first->count(1)); + } + { + auto second = test.lockOnePartition(2); + ASSERT_EQ(1UL, second->count(2)); + } + + // Make sure a All can see the modifications. + { + auto all = test.lockAllPartitions(); + ASSERT_EQ(3UL, all.size()); + } +} + +TEST(PartitionedConcurrency, ShouldProtectConcurrentAccesses) { + PartitionedIntSet test; + + // 4 threads will be accessing each partition. + const size_t numThreads = nPartitions * 4; + std::vector<stdx::thread> threads; + const size_t opsPerThread = 1000; + + AtomicUInt32 ready{0}; + for (size_t threadId = 1; threadId <= numThreads; ++threadId) { + auto workerThreadBody = [&, threadId, opsPerThread]() { + + // Busy-wait until everybody is ready + ready.fetchAndAdd(1); + while (ready.load() < numThreads) { + } + + for (size_t op = 0; op < opsPerThread; ++op) { + size_t partitionId = threadId % nPartitions; + size_t uniqueVal = (nPartitions * opsPerThread * threadId + op) + partitionId; + if (op % 3 == 0) { + auto all = test.lockAllPartitions(); + all.insert(uniqueVal); + } else if (op % 3 == 1) { + auto partition = test.lockOnePartition(partitionId); + partition->insert(uniqueVal); + } else if (op % 3 == 2) { + test.insert(uniqueVal); + } + } + }; + + threads.emplace_back(workerThreadBody); + } + for (auto& thread : threads) + thread.join(); + + // Make sure each insert was successful. + for (std::size_t partitionId = 0; partitionId < nPartitions; ++partitionId) { + auto partition = test.lockOnePartition(std::size_t{partitionId}); + ASSERT_EQ(opsPerThread * 4, partition->size()); + } + ASSERT_EQ(numThreads * opsPerThread, test.size()); +} +} // namespace +} // namespace mongo diff --git a/src/mongo/db/clientcursor.h b/src/mongo/db/clientcursor.h index 9aa3398d330..d2f216b01e1 100644 --- a/src/mongo/db/clientcursor.h +++ b/src/mongo/db/clientcursor.h @@ -196,6 +196,10 @@ public: */ static long long totalOpen(); + friend std::size_t partitionOf(const ClientCursor* cursor) { + return cursor->cursorid(); + } + private: friend class CursorManager; friend class ClientCursorPin; diff --git a/src/mongo/db/query/plan_executor.cpp b/src/mongo/db/query/plan_executor.cpp index 135c6026137..2789e660b76 100644 --- a/src/mongo/db/query/plan_executor.cpp +++ b/src/mongo/db/query/plan_executor.cpp @@ -197,8 +197,7 @@ PlanExecutor::PlanExecutor(OperationContext* opCtx, if (collection) { _nss = collection->ns(); if (_yieldPolicy->canReleaseLocksDuringExecution()) { - collection->getCursorManager()->registerExecutor(this); - _registered = true; + _registrationToken = collection->getCursorManager()->registerExecutor(this); } } else { invariant(_cq); @@ -543,7 +542,7 @@ void PlanExecutor::dispose(OperationContext* opCtx, CursorManager* cursorManager // caller of markAsKilled() will have done that already, and the CursorManager may no longer // exist. Note that the caller's collection lock prevents us from being marked as killed during // this method, since any interruption event requires a lock in at least MODE_IX. - if (cursorManager && _registered && !isMarkedAsKilled()) { + if (cursorManager && _registrationToken && !isMarkedAsKilled()) { dassert(opCtx->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_IS)); cursorManager->deregisterExecutor(this); } diff --git a/src/mongo/db/query/plan_executor.h b/src/mongo/db/query/plan_executor.h index edefc56465a..48fed0eab0b 100644 --- a/src/mongo/db/query/plan_executor.h +++ b/src/mongo/db/query/plan_executor.h @@ -32,9 +32,11 @@ #include <queue> #include "mongo/base/status.h" +#include "mongo/db/catalog/util/partitioned.h" #include "mongo/db/invalidation_type.h" #include "mongo/db/query/query_solution.h" #include "mongo/db/storage/snapshot.h" +#include "mongo/platform/unordered_set.h" namespace mongo { @@ -399,12 +401,23 @@ public: * 'non-cached PlanExecutor'. */ void unsetRegistered() { - _registered = false; + _registrationToken.reset(); + } + + boost::optional<Partitioned<unordered_set<PlanExecutor*>>::PartitionId> getRegistrationToken() + const& { + return _registrationToken; + } + void getRegistrationToken() && = delete; + + void setRegistrationToken(Partitioned<unordered_set<PlanExecutor*>>::PartitionId token) & { + invariant(!_registrationToken); + _registrationToken = token; } bool isMarkedAsKilled() const { return static_cast<bool>(_killReason); - }; + } const std::string& getKillReason() { invariant(isMarkedAsKilled()); @@ -490,9 +503,8 @@ private: enum { kUsable, kSaved, kDetached, kDisposed } _currentState = kUsable; - // Set to true if this PlanExecutor is registered with the CursorManager as a 'non-cached - // PlanExecutor' to receive invalidations. - bool _registered = false; + // Set if this PlanExecutor is registered with the CursorManager. + boost::optional<Partitioned<unordered_set<PlanExecutor*>>::PartitionId> _registrationToken; bool _everDetachedFromOperationContext = false; }; diff --git a/src/mongo/dbtests/querytests.cpp b/src/mongo/dbtests/querytests.cpp index eae94dbb7ca..101aa23f552 100644 --- a/src/mongo/dbtests/querytests.cpp +++ b/src/mongo/dbtests/querytests.cpp @@ -50,10 +50,10 @@ #include "mongo/dbtests/dbtests.h" #include "mongo/util/timer.h" +namespace { namespace QueryTests { using std::unique_ptr; -using std::cout; using std::endl; using std::string; using std::vector; @@ -1188,7 +1188,6 @@ public: while (cursor->more()) { BSONObj o = cursor->next(); verify(o.valid(BSONVersion::kLatest)); - // cout << " foo " << o << endl; } } void run() { @@ -1406,7 +1405,7 @@ public: fast = t.micros(); } - cout << "HelperTest slow:" << slow << " fast:" << fast << endl; + std::cout << "HelperTest slow:" << slow << " fast:" << fast << endl; } }; @@ -1449,7 +1448,6 @@ public: FindingStart() : CollectionBase("findingstart") {} void run() { - cout << "1 SFDSDF" << endl; BSONObj info; ASSERT(_client.runCommand("unittests", BSON("create" @@ -1488,7 +1486,6 @@ public: ASSERT(!next["ts"].eoo()); ASSERT_EQUALS((j > min ? j : min), next["ts"].numberInt()); } - cout << k << endl; } } }; @@ -1498,7 +1495,6 @@ public: FindingStartPartiallyFull() : CollectionBase("findingstart") {} void run() { - cout << "2 ;kljsdf" << endl; size_t startNumCursors = numCursorsOpen(); BSONObj info; @@ -1529,7 +1525,6 @@ public: ASSERT(!next["ts"].eoo()); ASSERT_EQUALS((j > min ? j : min), next["ts"].numberInt()); } - cout << k << endl; } ASSERT_EQUALS(startNumCursors, numCursorsOpen()); @@ -1545,7 +1540,6 @@ public: FindingStartStale() : CollectionBase("findingstart") {} void run() { - cout << "3 xcxcv" << endl; size_t startNumCursors = numCursorsOpen(); // Check OplogReplay mode with missing collection. @@ -1763,3 +1757,4 @@ public: SuiteInstance<All> myall; } // namespace QueryTests +} // namespace |