summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author Charlie Swanson <charlie.swanson@mongodb.com> 2017-04-18 18:05:25 -0400
committer Charlie Swanson <charlie.swanson@mongodb.com> 2017-05-26 10:20:26 -0400
commit fe2046a0eb715cec4d978454458f79b7588ee7a7 (patch)
tree 50311590ddec96f9487dc4069891700cb4b9d23c
parent 58486605c09672b3bfce8608dca403a145413bba (diff)
download mongo-fe2046a0eb715cec4d978454458f79b7588ee7a7.tar.gz
SERVER-21754 Partition CursorManager's structures
-rw-r--r-- src/mongo/db/catalog/SConscript 9
-rw-r--r-- src/mongo/db/catalog/cursor_manager.cpp 227
-rw-r--r-- src/mongo/db/catalog/cursor_manager.h 61
-rw-r--r-- src/mongo/db/catalog/util/SConscript 14
-rw-r--r-- src/mongo/db/catalog/util/partitioned.h 386
-rw-r--r-- src/mongo/db/catalog/util/partitioned_test.cpp 273
-rw-r--r-- src/mongo/db/clientcursor.h 4
-rw-r--r-- src/mongo/db/query/plan_executor.cpp 5
-rw-r--r-- src/mongo/db/query/plan_executor.h 22
-rw-r--r-- src/mongo/dbtests/querytests.cpp 11
10 files changed, 875 insertions, 137 deletions
diff --git a/src/mongo/db/catalog/SConscript b/src/mongo/db/catalog/SConscript
index 271301d4d5f..5e37439c403 100644
--- a/src/mongo/db/catalog/SConscript
+++ b/src/mongo/db/catalog/SConscript
@@ -4,6 +4,15 @@ Import("env")
env = env.Clone()
+env.SConscript(
+ dirs=[
+ 'util',
+ ],
+ exports=[
+ 'env',
+ ],
+)
+
env.Library(
target='collection_options',
source=[
diff --git a/src/mongo/db/catalog/cursor_manager.cpp b/src/mongo/db/catalog/cursor_manager.cpp
index ef73ab4a342..918134b057d 100644
--- a/src/mongo/db/catalog/cursor_manager.cpp
+++ b/src/mongo/db/catalog/cursor_manager.cpp
@@ -46,11 +46,11 @@
#include "mongo/db/server_parameters.h"
#include "mongo/db/service_context.h"
#include "mongo/platform/random.h"
+#include "mongo/stdx/memory.h"
#include "mongo/util/exit.h"
#include "mongo/util/startup_test.h"
namespace mongo {
-
using std::vector;
constexpr Minutes CursorManager::kDefaultCursorTimeoutMinutes;
@@ -60,6 +60,8 @@ MONGO_EXPORT_SERVER_PARAMETER(
int,
durationCount<Milliseconds>(CursorManager::kDefaultCursorTimeoutMinutes));
+constexpr int CursorManager::kNumPartitions;
+
namespace {
uint32_t idFromCursorId(CursorId id) {
uint64_t x = static_cast<uint64_t>(id);
@@ -226,7 +228,7 @@ bool GlobalCursorIdCache::eraseCursor(OperationContext* opCtx, CursorId id, bool
}
Status eraseStatus = collection->getCursorManager()->eraseCursor(opCtx, id, checkAuth);
- massert(16089,
+ uassert(16089,
eraseStatus.reason(),
eraseStatus.code() == ErrorCodes::OK ||
eraseStatus.code() == ErrorCodes::CursorNotFound);
@@ -298,21 +300,29 @@ bool CursorManager::eraseCursorGlobal(OperationContext* opCtx, CursorId id) {
// --------------------------
-
-CursorManager::CursorManager(NamespaceString nss) : _nss(std::move(nss)) {
- if (!isGlobalManager()) {
- // Generate a unique id for this collection.
- _collectionCacheRuntimeId = globalCursorIdCache->registerCursorManager(_nss);
- }
- _random.reset(new PseudoRandom(globalCursorIdCache->nextSeed()));
+std::size_t CursorManager::PlanExecutorPartitioner::operator()(const PlanExecutor* exec,
+ const std::size_t nPartitions) {
+ auto token = exec->getRegistrationToken();
+ invariant(token);
+ return (*token) % nPartitions;
}
+CursorManager::CursorManager(NamespaceString nss)
+ : _nss(std::move(nss)),
+ _collectionCacheRuntimeId(_nss.isEmpty() ? 0
+ : globalCursorIdCache->registerCursorManager(_nss)),
+ _random(stdx::make_unique<PseudoRandom>(globalCursorIdCache->nextSeed())),
+ _registeredPlanExecutors(),
+ _cursorMap(stdx::make_unique<Partitioned<unordered_map<CursorId, ClientCursor*>>>()) {}
+
CursorManager::~CursorManager() {
+ // All cursors and PlanExecutors should have been deleted already.
+ invariant(_registeredPlanExecutors.empty());
+ invariant(_cursorMap->empty());
+
if (!isGlobalManager()) {
globalCursorIdCache->deregisterCursorManager(_collectionCacheRuntimeId, _nss);
}
- invariant(_cursors.empty());
- invariant(_nonCachedExecutors.empty());
}
void CursorManager::invalidateAll(OperationContext* opCtx,
@@ -320,43 +330,49 @@ void CursorManager::invalidateAll(OperationContext* opCtx,
const std::string& reason) {
invariant(!isGlobalManager()); // The global cursor manager should never need to kill cursors.
dassert(opCtx->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_X));
-
- stdx::lock_guard<SimpleMutex> lk(_mutex);
fassert(28819, !BackgroundOperation::inProgForNs(_nss));
-
- for (auto&& exec : _nonCachedExecutors) {
- // We kill the executor, but it deletes itself.
- exec->markAsKilled(reason);
- }
- _nonCachedExecutors.clear();
-
- CursorMap newMap;
- for (auto&& entry : _cursors) {
- auto* cursor = entry.second;
- cursor->markAsKilled(reason);
-
- if (cursor->_isPinned) {
- // There is an active user of this cursor, who is now responsible for cleaning it up.
- // This CursorManager will no longer track this cursor.
- continue;
+ auto allExecPartitions = _registeredPlanExecutors.lockAllPartitions();
+ for (auto&& partition : allExecPartitions) {
+ for (auto&& exec : partition) {
+ // The PlanExecutor is owned elsewhere, so we just mark it as killed and let it be
+ // cleaned up later.
+ exec->markAsKilled(reason);
}
-
- if (!collectionGoingAway) {
- // We keep around unpinned cursors so that future attempts to use the cursor will result
- // in a useful error message.
- newMap.insert(entry);
- } else {
- // The collection is going away, so there's no point in keeping any state.
- cursor->dispose(opCtx);
- delete cursor;
+ }
+ allExecPartitions.clear();
+
+ // Mark all cursors as killed, but keep around those we can in order to provide a useful error
+ // message to the user when they attempt to use them next time.
+ auto allCurrentPartitions = _cursorMap->lockAllPartitions();
+ for (auto&& partition : allCurrentPartitions) {
+ for (auto it = partition.begin(); it != partition.end();) {
+ auto* cursor = it->second;
+ cursor->markAsKilled(reason);
+
+ // If pinned, there is an active user of this cursor, who is now responsible for
+ // cleaning it up, so this manager stops tracking it. Unpinned cursors are handled below.
+ if (cursor->_isPinned) {
+ it = partition.erase(it);
+ continue;
+ }
+
+ if (!collectionGoingAway) {
+ // We keep around unpinned cursors so that future attempts to use the cursor will
+ // result in a useful error message.
+ ++it;
+ } else {
+ cursor->dispose(opCtx);
+ delete cursor;
+ it = partition.erase(it);
+ }
}
}
- _cursors = newMap;
}
void CursorManager::invalidateDocument(OperationContext* opCtx,
const RecordId& dl,
InvalidationType type) {
+ dassert(opCtx->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_IX));
invariant(!isGlobalManager()); // The global cursor manager should never receive invalidations.
if (supportsDocLocking()) {
// If a storage engine supports doc locking, then we do not need to invalidate.
@@ -364,15 +380,19 @@ void CursorManager::invalidateDocument(OperationContext* opCtx,
return;
}
- stdx::lock_guard<SimpleMutex> lk(_mutex);
-
- for (ExecSet::iterator it = _nonCachedExecutors.begin(); it != _nonCachedExecutors.end();
- ++it) {
- (*it)->invalidate(opCtx, dl, type);
+ auto allExecPartitions = _registeredPlanExecutors.lockAllPartitions();
+ for (auto&& partition : allExecPartitions) {
+ for (auto&& exec : partition) {
+ exec->invalidate(opCtx, dl, type);
+ }
}
- for (CursorMap::const_iterator i = _cursors.begin(); i != _cursors.end(); ++i) {
- i->second->getExecutor()->invalidate(opCtx, dl, type);
+ auto allPartitions = _cursorMap->lockAllPartitions();
+ for (auto&& partition : allPartitions) {
+ for (auto&& entry : partition) {
+ auto exec = entry.second->getExecutor();
+ exec->invalidate(opCtx, dl, type);
+ }
}
}
@@ -384,42 +404,48 @@ bool CursorManager::cursorShouldTimeout_inlock(const ClientCursor* cursor, Date_
}
std::size_t CursorManager::timeoutCursors(OperationContext* opCtx, Date_t now) {
- vector<ClientCursor*> toDelete;
-
- stdx::lock_guard<SimpleMutex> lk(_mutex);
-
- for (CursorMap::const_iterator i = _cursors.begin(); i != _cursors.end(); ++i) {
- ClientCursor* cc = i->second;
- if (cursorShouldTimeout_inlock(cc, now))
- toDelete.push_back(cc);
- }
-
- // Properly dispose of each cursor that was timed out.
- for (vector<ClientCursor*>::const_iterator i = toDelete.begin(); i != toDelete.end(); ++i) {
- ClientCursor* cc = *i;
- _deregisterCursor_inlock(cc);
- cc->dispose(opCtx);
- delete cc;
+ std::vector<std::unique_ptr<ClientCursor, ClientCursor::Deleter>> toDelete;
+
+ for (size_t partitionId = 0; partitionId < kNumPartitions; ++partitionId) {
+ auto lockedPartition = _cursorMap->lockOnePartitionById(partitionId);
+ for (auto it = lockedPartition->begin(); it != lockedPartition->end();) {
+ auto* cursor = it->second;
+ if (cursorShouldTimeout_inlock(cursor, now)) {
+ // Dispose of the cursor and remove it from the partition.
+ cursor->dispose(opCtx);
+ toDelete.push_back(std::unique_ptr<ClientCursor, ClientCursor::Deleter>{cursor});
+ it = lockedPartition->erase(it);
+ } else {
+ ++it;
+ }
+ }
}
return toDelete.size();
}
-void CursorManager::registerExecutor(PlanExecutor* exec) {
- stdx::lock_guard<SimpleMutex> lk(_mutex);
- const std::pair<ExecSet::iterator, bool> result = _nonCachedExecutors.insert(exec);
- invariant(result.second); // make sure this was inserted
+namespace {
+static AtomicUInt32 registeredPlanExecutorId;
+} // namespace
+
+Partitioned<unordered_set<PlanExecutor*>>::PartitionId CursorManager::registerExecutor(
+ PlanExecutor* exec) {
+ auto partitionId = registeredPlanExecutorId.fetchAndAdd(1);
+ exec->setRegistrationToken(partitionId);
+ _registeredPlanExecutors.insert(exec);
+ return partitionId;
}
void CursorManager::deregisterExecutor(PlanExecutor* exec) {
- stdx::lock_guard<SimpleMutex> lk(_mutex);
- _nonCachedExecutors.erase(exec);
+ if (auto partitionId = exec->getRegistrationToken()) {
+ _registeredPlanExecutors.erase(exec);
+ }
}
StatusWith<ClientCursorPin> CursorManager::pinCursor(OperationContext* opCtx, CursorId id) {
- stdx::lock_guard<SimpleMutex> lk(_mutex);
- CursorMap::const_iterator it = _cursors.find(id);
- if (it == _cursors.end()) {
+ auto lockedPartition = _cursorMap->lockOnePartition(id);
+ auto it = lockedPartition->find(id);
+ if (it == lockedPartition->end()) {
return {ErrorCodes::CursorNotFound, str::stream() << "cursor id " << id << " not found"};
}
@@ -430,7 +456,7 @@ StatusWith<ClientCursorPin> CursorManager::pinCursor(OperationContext* opCtx, Cu
Status error{ErrorCodes::QueryPlanKilled,
str::stream() << "cursor killed because: "
<< cursor->getExecutor()->getKillReason()};
- _deregisterCursor_inlock(cursor);
+ lockedPartition->erase(cursor->cursorid());
cursor->dispose(opCtx);
delete cursor;
return error;
@@ -443,28 +469,26 @@ void CursorManager::unpin(OperationContext* opCtx, ClientCursor* cursor) {
// Avoid computing the current time within the critical section.
auto now = opCtx->getServiceContext()->getPreciseClockSource()->now();
- stdx::lock_guard<SimpleMutex> lk(_mutex);
-
+ auto partitionLock = _cursorMap->lockOnePartition(cursor->cursorid());
invariant(cursor->_isPinned);
cursor->_isPinned = false;
cursor->_lastUseDate = now;
}
void CursorManager::getCursorIds(std::set<CursorId>* openCursors) const {
- stdx::lock_guard<SimpleMutex> lk(_mutex);
-
- for (CursorMap::const_iterator i = _cursors.begin(); i != _cursors.end(); ++i) {
- ClientCursor* cc = i->second;
- openCursors->insert(cc->cursorid());
+ auto allPartitions = _cursorMap->lockAllPartitions();
+ for (auto&& partition : allPartitions) {
+ for (auto&& entry : partition) {
+ openCursors->insert(entry.first);
+ }
}
}
size_t CursorManager::numCursors() const {
- stdx::lock_guard<SimpleMutex> lk(_mutex);
- return _cursors.size();
+ return _cursorMap->size();
}
-CursorId CursorManager::_allocateCursorId_inlock() {
+CursorId CursorManager::allocateCursorId_inlock() {
for (int i = 0; i < 10000; i++) {
// The leading two bits of a CursorId are used to determine if the cursor is registered on
// the global cursor manager.
@@ -481,7 +505,8 @@ CursorId CursorManager::_allocateCursorId_inlock() {
uint32_t myPart = static_cast<uint32_t>(_random->nextInt32());
id = cursorIdFromParts(_collectionCacheRuntimeId, myPart);
}
- if (_cursors.count(id) == 0)
+ auto partition = _cursorMap->lockOnePartition(id);
+ if (partition->count(id) == 0)
return id;
}
fassertFailed(17360);
@@ -492,42 +517,41 @@ ClientCursorPin CursorManager::registerCursor(OperationContext* opCtx,
// Avoid computing the current time within the critical section.
auto now = opCtx->getServiceContext()->getPreciseClockSource()->now();
- stdx::lock_guard<SimpleMutex> lk(_mutex);
// Make sure the PlanExecutor isn't registered, since we will register the ClientCursor wrapping
// it.
invariant(cursorParams.exec);
- _nonCachedExecutors.erase(cursorParams.exec.get());
+ deregisterExecutor(cursorParams.exec.get());
cursorParams.exec.get_deleter().dismissDisposal();
cursorParams.exec->unsetRegistered();
- CursorId cursorId = _allocateCursorId_inlock();
- invariant(cursorId);
+ // Note we must hold the registration lock from now until insertion into '_cursorMap' to ensure
+ // we don't insert two cursors with the same cursor id.
+ stdx::lock_guard<SimpleMutex> lock(_registrationLock);
+ CursorId cursorId = allocateCursorId_inlock();
std::unique_ptr<ClientCursor, ClientCursor::Deleter> clientCursor(
new ClientCursor(std::move(cursorParams), this, cursorId, now));
- // Transfer ownership of the cursor to '_cursors'.
+ // Transfer ownership of the cursor to '_cursorMap'.
+ auto partition = _cursorMap->lockOnePartition(cursorId);
ClientCursor* unownedCursor = clientCursor.release();
- _cursors[cursorId] = unownedCursor;
+ partition->emplace(cursorId, unownedCursor);
return ClientCursorPin(opCtx, unownedCursor);
}
void CursorManager::deregisterCursor(ClientCursor* cc) {
- stdx::lock_guard<SimpleMutex> lk(_mutex);
- _deregisterCursor_inlock(cc);
+ _cursorMap->erase(cc->cursorid());
}
Status CursorManager::eraseCursor(OperationContext* opCtx, CursorId id, bool shouldAudit) {
- stdx::lock_guard<SimpleMutex> lk(_mutex);
-
- CursorMap::iterator it = _cursors.find(id);
- if (it == _cursors.end()) {
+ auto lockedPartition = _cursorMap->lockOnePartition(id);
+ auto it = lockedPartition->find(id);
+ if (it == lockedPartition->end()) {
if (shouldAudit) {
audit::logKillCursorsAuthzCheck(
opCtx->getClient(), _nss, id, ErrorCodes::CursorNotFound);
}
return {ErrorCodes::CursorNotFound, str::stream() << "Cursor id not found: " << id};
}
-
auto cursor = it->second;
if (cursor->_isPinned) {
@@ -537,20 +561,15 @@ Status CursorManager::eraseCursor(OperationContext* opCtx, CursorId id, bool sho
}
return {ErrorCodes::OperationFailed, str::stream() << "Cannot kill pinned cursor: " << id};
}
+ std::unique_ptr<ClientCursor, ClientCursor::Deleter> ownedCursor(cursor);
if (shouldAudit) {
audit::logKillCursorsAuthzCheck(opCtx->getClient(), _nss, id, ErrorCodes::OK);
}
- _deregisterCursor_inlock(cursor);
- cursor->dispose(opCtx);
- delete cursor;
+ lockedPartition->erase(ownedCursor->cursorid());
+ ownedCursor->dispose(opCtx);
return Status::OK();
}
-void CursorManager::_deregisterCursor_inlock(ClientCursor* cc) {
- invariant(cc);
- CursorId id = cc->cursorid();
- _cursors.erase(id);
-}
} // namespace mongo
diff --git a/src/mongo/db/catalog/cursor_manager.h b/src/mongo/db/catalog/cursor_manager.h
index 59b8c5fbf17..70fce8f3804 100644
--- a/src/mongo/db/catalog/cursor_manager.h
+++ b/src/mongo/db/catalog/cursor_manager.h
@@ -30,11 +30,13 @@
#pragma once
-
+#include "mongo/db/catalog/util/partitioned.h"
#include "mongo/db/clientcursor.h"
+#include "mongo/db/cursor_id.h"
#include "mongo/db/invalidation_type.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/record_id.h"
+#include "mongo/platform/unordered_map.h"
#include "mongo/platform/unordered_set.h"
#include "mongo/util/concurrency/mutex.h"
#include "mongo/util/duration.h"
@@ -76,6 +78,7 @@ class CursorManager {
public:
// The number of minutes a cursor is allowed to be idle before timing out.
static constexpr Minutes kDefaultCursorTimeoutMinutes{10};
+ using RegistrationToken = Partitioned<unordered_set<PlanExecutor*>>::PartitionId;
CursorManager(NamespaceString nss);
@@ -114,11 +117,12 @@ public:
std::size_t timeoutCursors(OperationContext* opCtx, Date_t now);
/**
- * Register an executor so that it can be notified of deletion/invalidation during yields.
- * Must be called before an executor yields. If an executor is registered inside a
- * ClientCursor it must not be itself registered; the two are mutually exclusive.
+ * Register an executor so that it can be notified of deletions, invalidations, collection
+ * drops, or the like during yields. Must be called before an executor yields. Registration
+ * happens automatically for yielding PlanExecutors, so this should only be called by a
+ * PlanExecutor itself. Returns a token that must be stored for use during deregistration.
*/
- void registerExecutor(PlanExecutor* exec);
+ Partitioned<unordered_set<PlanExecutor*>>::PartitionId registerExecutor(PlanExecutor* exec);
/**
* Remove an executor from the registry. It is legal to call this even if 'exec' is not
@@ -188,10 +192,16 @@ public:
static std::size_t timeoutCursorsGlobal(OperationContext* opCtx, Date_t now);
private:
+ static constexpr int kNumPartitions = 16;
friend class ClientCursorPin;
- CursorId _allocateCursorId_inlock();
- void _deregisterCursor_inlock(ClientCursor* cc);
+ struct PlanExecutorPartitioner {
+ std::size_t operator()(const PlanExecutor* exec, std::size_t nPartitions);
+ };
+ CursorId allocateCursorId_inlock();
+
+ ClientCursorPin _registerCursor(
+ OperationContext* opCtx, std::unique_ptr<ClientCursor, ClientCursor::Deleter> clientCursor);
void deregisterCursor(ClientCursor* cc);
@@ -203,16 +213,33 @@ private:
return _nss.isEmpty();
}
- NamespaceString _nss;
- uint32_t _collectionCacheRuntimeId;
+ // No locks are needed to consult these data members.
+ const NamespaceString _nss;
+ const uint32_t _collectionCacheRuntimeId;
+
+ // A CursorManager holds a pointer to all open PlanExecutors and all open ClientCursors. All
+ // pointers to PlanExecutors are unowned, and a PlanExecutor will notify the CursorManager when
+ // it is being destroyed. ClientCursors are owned by the CursorManager, except when they are in
+ // use by a ClientCursorPin. When in use by a pin, an unowned pointer remains to ensure they
+ // still receive invalidations while in use.
+ //
+ // There are several mutexes at work to protect concurrent access to data structures managed by
+ // this cursor manager. The two registration data structures '_registeredPlanExecutors' and
+ // '_cursorMap' are partitioned to decrease contention, and each partition of the structure is
+ // protected by its own mutex. Separately, there is a '_registrationLock' which protects
+ // concurrent access to '_random' for cursor id generation, and must be held from cursor id
+ // generation until insertion into '_cursorMap'. If you ever need to acquire more than one of
+ // these mutexes at once, you must follow the following rules:
+ // - '_registrationLock' must be acquired first, if at all.
+ // - Mutex(es) for '_registeredPlanExecutors' must be acquired next.
+ // - Mutex(es) for '_cursorMap' must be acquired next.
+ // - If you need to access multiple partitions within '_registeredPlanExecutors' or '_cursorMap'
+ // at once, you must acquire the mutexes for those partitions in ascending order, or use the
+ // partition helpers to acquire mutexes for all partitions.
+ mutable SimpleMutex _registrationLock;
std::unique_ptr<PseudoRandom> _random;
-
- mutable SimpleMutex _mutex;
-
- typedef unordered_set<PlanExecutor*> ExecSet;
- ExecSet _nonCachedExecutors;
-
- typedef std::map<CursorId, ClientCursor*> CursorMap;
- CursorMap _cursors;
+ Partitioned<unordered_set<PlanExecutor*>, kNumPartitions, PlanExecutorPartitioner>
+ _registeredPlanExecutors;
+ std::unique_ptr<Partitioned<unordered_map<CursorId, ClientCursor*>, kNumPartitions>> _cursorMap;
};
} // namespace mongo
diff --git a/src/mongo/db/catalog/util/SConscript b/src/mongo/db/catalog/util/SConscript
new file mode 100644
index 00000000000..320ea3396cf
--- /dev/null
+++ b/src/mongo/db/catalog/util/SConscript
@@ -0,0 +1,14 @@
+# -*- mode: python; -*-
+
+Import("env")
+
+env = env.Clone()
+
+env.CppUnitTest(
+ target='partitioned_test',
+ source=[
+ 'partitioned_test.cpp'
+ ],
+ LIBDEPS=[
+ ]
+)
diff --git a/src/mongo/db/catalog/util/partitioned.h b/src/mongo/db/catalog/util/partitioned.h
new file mode 100644
index 00000000000..475ea415f48
--- /dev/null
+++ b/src/mongo/db/catalog/util/partitioned.h
@@ -0,0 +1,386 @@
+/**
+ * Copyright (C) 2017 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#pragma once
+
+#include <algorithm>
+#include <cstdlib>
+#include <iterator>
+#include <memory>
+#include <numeric>
+#include <utility>
+#include <vector>
+
+#include "mongo/platform/atomic_word.h"
+#include "mongo/stdx/memory.h"
+#include "mongo/stdx/mutex.h"
+#include "mongo/util/assert_util.h"
+
+namespace mongo {
+
+inline std::size_t partitionOf(const char x, const std::size_t nPartitions) {
+ return static_cast<unsigned char>(x) % nPartitions;
+}
+inline std::size_t partitionOf(const unsigned char x, const std::size_t nPartitions) {
+ return x % nPartitions;
+}
+inline std::size_t partitionOf(const signed char x, const std::size_t nPartitions) {
+ return static_cast<unsigned char>(x) % nPartitions;
+}
+inline std::size_t partitionOf(const int x, const std::size_t nPartitions) {
+ return static_cast<unsigned int>(x) % nPartitions;
+}
+inline std::size_t partitionOf(const unsigned int x, const std::size_t nPartitions) {
+ return x % nPartitions;
+}
+inline std::size_t partitionOf(const short x, const std::size_t nPartitions) {
+ return static_cast<unsigned short>(x) % nPartitions;
+}
+inline std::size_t partitionOf(const unsigned short x, const std::size_t nPartitions) {
+ return x % nPartitions;
+}
+inline std::size_t partitionOf(const long x, const std::size_t nPartitions) {
+ return static_cast<unsigned long>(x) % nPartitions;
+}
+inline std::size_t partitionOf(const unsigned long x, const std::size_t nPartitions) {
+ return x % nPartitions;
+}
+inline std::size_t partitionOf(const long long x, const std::size_t nPartitions) {
+ return static_cast<unsigned long long>(x) % nPartitions;
+}
+inline std::size_t partitionOf(const unsigned long long x, const std::size_t nPartitions) {
+ return x % nPartitions;
+}
+inline std::size_t partitionOf(const wchar_t x, const std::size_t nPartitions) {
+ return x % nPartitions;
+}
+inline std::size_t partitionOf(const char16_t x, const std::size_t nPartitions) {
+ return x % nPartitions;
+}
+inline std::size_t partitionOf(const char32_t x, const std::size_t nPartitions) {
+ return x % nPartitions;
+}
+
+/**
+ * The default partitioning policy: If using a numeric built-in type, the partition is the value
+ * modulo the number of partitions (signed values are first converted to unsigned). If using any
+ * other type T, you must define partitionOf(const T&, std::size_t) or specialize this template.
+ */
+template <typename T>
+struct Partitioner {
+ std::size_t operator()(const T& x, const std::size_t nPartitions) {
+ return partitionOf(x, nPartitions);
+ }
+};
+
+namespace partitioned_detail {
+
+template <typename Key, typename Value>
+Key getKey(const std::pair<Key, Value>& pair) {
+ return std::get<0>(pair);
+}
+
+template <typename Key>
+Key getKey(const Key& key) {
+ return key;
+}
+
+inline std::vector<stdx::unique_lock<stdx::mutex>> lockAllPartitions(
+ std::vector<stdx::mutex>& mutexes) {
+ std::vector<stdx::unique_lock<stdx::mutex>> result;
+ std::transform(mutexes.begin(), mutexes.end(), std::back_inserter(result), [](auto&& mutex) {
+ return stdx::unique_lock<stdx::mutex>{mutex};
+ });
+ return result;
+}
+} // namespace partitioned_detail
+
+/**
+ * A templated class used to partition an associative container like a set or a map to increase
+ * scalability. `AssociativeContainer` is a type like a std::map or std::set that meets the
+ * requirements of either the AssociativeContainer or UnorderedAssociativeContainer concept.
+ * `nPartitions` determines how many partitions to make. `KeyPartitioner` can be provided to
+ * customize how the partition of each entry is computed.
+ */
+template <typename AssociativeContainer,
+ std::size_t nPartitions = 16,
+ typename KeyPartitioner = Partitioner<typename AssociativeContainer::key_type>>
+class Partitioned {
+private:
+ // Used to create an iterator representing the end of the partitioned structure.
+ struct IteratorEndTag {};
+
+public:
+ static_assert(nPartitions > 0, "cannot create partitioned structure with 0 partitions");
+ using value_type = typename AssociativeContainer::value_type;
+ using key_type = typename AssociativeContainer::key_type;
+ using PartitionId = std::size_t;
+
+ /**
+ * Used to protect access to all partitions of this partitioned associative structure. For
+ * example, may be used to empty each partition in the structure, or to provide a snapshotted
+ * count of the number of entries across all partitions.
+ */
+ class All {
+ private:
+ /**
+ * Acquires locks for all partitions. The lifetime of this `All` object must be shorter than
+ * that of `partitionedContainer`.
+ */
+ explicit All(Partitioned& partitionedContainer)
+ : _lockGuards(partitioned_detail::lockAllPartitions(partitionedContainer._mutexes)),
+ _partitionedContainer(&partitionedContainer) {}
+
+ public:
+ /**
+ * Returns an iterator at the start of the partitions.
+ */
+ auto begin() & {
+ return this->_partitionedContainer->_partitions.begin();
+ }
+
+ /**
+ * Returns an iterator at the end of the partitions.
+ */
+ auto end() & {
+ return this->_partitionedContainer->_partitions.end();
+ }
+
+ /**
+ * Returns an iterator at the start of the partitions.
+ */
+ auto begin() const& {
+ return this->_partitionedContainer->_partitions.begin();
+ }
+ void begin() && = delete;
+
+ /**
+ * Returns an iterator at the end of the partitions.
+ */
+ auto end() const& {
+ return this->_partitionedContainer->_partitions.end();
+ }
+ void end() && = delete;
+
+ /**
+ * Returns the number of elements in all partitions, summed together.
+ */
+ std::size_t size() const {
+ return std::accumulate(
+ this->_partitionedContainer->_partitions.begin(),
+ this->_partitionedContainer->_partitions.end(),
+ std::size_t{0},
+ [](auto&& total, auto&& partition) { return total + partition.size(); });
+ }
+
+ /**
+ * Returns true if each partition is empty.
+ */
+ bool empty() const {
+ return std::all_of(this->_partitionedContainer->_partitions.begin(),
+ this->_partitionedContainer->_partitions.end(),
+ [](auto&& partition) { return partition.empty(); });
+ }
+
+ /**
+ * Returns the number of entries with the given key.
+ */
+ std::size_t count(const key_type& key) const {
+ auto partitionId = KeyPartitioner()(key, nPartitions);
+ return this->_partitionedContainer->_partitions[partitionId].count(key);
+ }
+
+ /**
+ * Empties each container within each partition.
+ */
+ void clear() {
+ for (auto&& partition : this->_partitionedContainer->_partitions) {
+ partition.clear();
+ }
+ }
+
+ /**
+ * Inserts `value` into its designated partition.
+ */
+ void insert(value_type value) & {
+ const auto partitionId =
+ KeyPartitioner()(partitioned_detail::getKey(value), nPartitions);
+ this->_partitionedContainer->_partitions[partitionId].insert(std::move(value));
+ }
+ void insert(value_type)&& = delete;
+
+ /**
+ * Erases one entry from the partitioned structure, returns the number of entries removed.
+ */
+ std::size_t erase(const key_type& key) & {
+ const auto partitionId = KeyPartitioner()(key, nPartitions);
+ return this->_partitionedContainer->_partitions[partitionId].erase(key);
+ }
+ void erase(const key_type&) && = delete;
+
+ private:
+ friend class Partitioned;
+
+ std::vector<stdx::unique_lock<stdx::mutex>> _lockGuards;
+ Partitioned* _partitionedContainer;
+ };
+
+ /**
+ * Used to protect access to a single partition of a Partitioned. For example, can be used to do
+ * a series of reads and/or modifications to a single entry without interference from other
+ * threads.
+ */
+ class OnePartition {
+ public:
+ /**
+ * Returns a pointer to the structure in this partition.
+ */
+ AssociativeContainer* operator->() const& {
+ return &this->_partitioned->_partitions[_id];
+ }
+ void operator->() && = delete;
+
+ /**
+ * Returns a reference to the structure in this partition.
+ */
+ AssociativeContainer& operator*() const& {
+ return this->_partitioned->_partitions[_id];
+ }
+ void operator*() && = delete;
+
+ private:
+ friend class Partitioned;
+
+ /**
+ * Acquires the lock for the partition with id `partitionId`. `partitioned` must outlive
+ * this OnePartition. If a single thread needs access to multiple partitions, it must
+ * use `All` (see lockAllPartitions()), or acquire them in ascending order.
+ */
+ OnePartition(Partitioned& partitioned, PartitionId partitionId)
+ : _partitionLock(partitioned._mutexes[partitionId]),
+ _partitioned(&partitioned),
+ _id(partitionId) {}
+
+ stdx::unique_lock<stdx::mutex> _partitionLock;
+ Partitioned* _partitioned;
+ PartitionId _id;
+ };
+
+ Partitioned(const Partitioned&) = delete;
+ Partitioned(Partitioned&&) = default;
+ Partitioned& operator=(const Partitioned&) = delete;
+ Partitioned& operator=(Partitioned&&) = default;
+ ~Partitioned() = default;
+
+ /**
+ * Returns true if each partition is empty. Locks all partitions to perform this check, but
+ * insertions can occur as soon as this method returns.
+ */
+ bool empty() const {
+ const auto all = partitioned_detail::lockAllPartitions(this->_mutexes);
+ return std::all_of(this->_partitions.begin(),
+ this->_partitions.end(),
+ [](auto&& partition) { return partition.empty(); });
+ }
+
+ /**
+ * Returns the number of elements in all partitions, summed together. Locks all partitions to
+ * do this computation, but the size can change as soon as this method returns.
+ */
+ std::size_t size() const {
+ const auto all = partitioned_detail::lockAllPartitions(this->_mutexes);
+ return std::accumulate(
+ this->_partitions.begin(),
+ this->_partitions.end(),
+ std::size_t{0},
+ [](auto&& total, auto&& partition) { return total + partition.size(); });
+ }
+
+ /**
+ * Returns the number of entries with the given key. Acquires locks for only the partition
+ * determined by that key.
+ */
+ std::size_t count(const key_type& key) & {
+ auto partition = this->lockOnePartition(key);
+ return partition->count(key);
+ }
+ void count(const key_type&) && = delete;
+
+ /**
+ * Empties all partitions.
+ */
+ void clear() {
+ auto all = this->lockAllPartitions();
+ all.clear();
+ }
+
+ /**
+ * Inserts a single value into the partitioned structure. Locks a single partition determined by
+ * the value itself. Will not lock any partitions besides the one inserted into.
+ */
+ void insert(const value_type value) & {
+ auto partition = this->lockOnePartitionById(
+ KeyPartitioner()(partitioned_detail::getKey(value), nPartitions));
+ partition->insert(std::move(value));
+ }
+ void insert(const value_type) && = delete;
+
+ /**
+ * Erases one entry from the partitioned structure. Locks only the partition given by the key.
+ * Returns the number of entries removed.
+ */
+ std::size_t erase(const key_type& key) & {
+ auto partition = this->lockOnePartition(key);
+ return partition->erase(key);
+ }
+ void erase(const key_type&) && = delete;
+
+ All lockAllPartitions() & {
+ return All{*this};
+ }
+
+ OnePartition lockOnePartition(const key_type key) & {
+ return OnePartition{*this, KeyPartitioner()(key, nPartitions)};
+ }
+
+ OnePartition lockOnePartitionById(PartitionId id) & {
+ return OnePartition{*this, id};
+ }
+
+ /**
+ * Constructs a partitioned version of a AssociativeContainer, with `nPartitions` partitions.
+ */
+ Partitioned() : _mutexes(nPartitions), _partitions(nPartitions) {}
+
+private:
+ // These two vectors parallel each other, but we keep them separate so that we can return an
+ // iterator over `_partitions` from within All.
+ mutable std::vector<stdx::mutex> _mutexes;
+ std::vector<AssociativeContainer> _partitions;
+};
+} // namespace mongo
diff --git a/src/mongo/db/catalog/util/partitioned_test.cpp b/src/mongo/db/catalog/util/partitioned_test.cpp
new file mode 100644
index 00000000000..e37988d4d83
--- /dev/null
+++ b/src/mongo/db/catalog/util/partitioned_test.cpp
@@ -0,0 +1,273 @@
+/**
+ * Copyright (C) 2016 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include <set>
+
+#include "mongo/db/catalog/util/partitioned.h"
+#include "mongo/stdx/thread.h"
+#include "mongo/unittest/unittest.h"
+
+namespace mongo {
+namespace {
+
+// Number of partitions used by the partitioned set under test.
+const std::size_t nPartitions = 3;
+// A std::set of std::size_t values split across 'nPartitions' partitions.
+using PartitionedIntSet = Partitioned<std::set<std::size_t>, nPartitions>;
+
+TEST(Partitioned, DefaultConstructedPartitionedShouldBeEmpty) {
+    // Nothing has been inserted yet, so the structure must report itself as empty.
+    PartitionedIntSet freshSet;
+    ASSERT_TRUE(freshSet.empty());
+}
+
+TEST(Partitioned, InsertionShouldModifySize) {
+    // A single insert must be reflected by size(), count() and empty().
+    PartitionedIntSet intSet;
+    intSet.insert(4);
+    ASSERT_EQ(intSet.size(), 1UL);
+    ASSERT_EQ(intSet.count(4), 1UL);
+    ASSERT_FALSE(intSet.empty());
+}
+
+TEST(Partitioned, DuplicateInsertionShouldNotModifySize) {
+    // Inserting the same value twice must leave exactly one copy behind.
+    PartitionedIntSet intSet;
+    for (int attempt = 0; attempt < 2; ++attempt) {
+        intSet.insert(4);
+    }
+    ASSERT_EQ(intSet.size(), 1UL);
+    ASSERT_EQ(intSet.count(4), 1UL);
+}
+
+TEST(Partitioned, ClearShouldResetSizeToZero) {
+    PartitionedIntSet intSet;
+    // Populate the set with the values 0, 1 and 2.
+    for (std::size_t value = 0; value < 3; ++value) {
+        intSet.insert(value);
+    }
+    ASSERT_EQ(intSet.size(), 3UL);
+    for (std::size_t value = 0; value < 3; ++value) {
+        ASSERT_EQ(intSet.count(value), 1UL);
+    }
+    // Once clear() has run, nothing may remain.
+    intSet.clear();
+    ASSERT_EQ(intSet.size(), 0UL);
+    ASSERT_TRUE(intSet.empty());
+}
+
+TEST(Partitioned, ErasingEntryShouldModifySize) {
+    PartitionedIntSet intSet;
+    for (std::size_t value = 0; value < 3; ++value) {
+        intSet.insert(value);
+    }
+    ASSERT_EQ(intSet.size(), 3UL);
+    ASSERT_EQ(intSet.count(0), 1UL);
+    // erase() reports how many entries it removed; here that is the single entry for 0.
+    ASSERT_EQ(1UL, intSet.erase(0));
+    ASSERT_EQ(intSet.count(0), 0UL);
+    ASSERT_EQ(intSet.size(), 2UL);
+}
+
+TEST(Partitioned, ErasingEntryThatDoesNotExistShouldNotModifySize) {
+    PartitionedIntSet intSet;
+    for (std::size_t value = 0; value < 3; ++value) {
+        intSet.insert(value);
+    }
+    ASSERT_EQ(intSet.size(), 3UL);
+    ASSERT_EQ(intSet.count(5), 0UL);
+    // 5 was never inserted, so erase() must remove nothing and leave the size untouched.
+    ASSERT_EQ(0UL, intSet.erase(5));
+    ASSERT_EQ(intSet.count(5), 0UL);
+    ASSERT_EQ(intSet.size(), 3UL);
+}
+
+TEST(PartitionedAll, DefaultConstructedPartitionedShouldBeEmpty) {
+    // Same emptiness check as for Partitioned itself, but made through an All.
+    PartitionedIntSet freshSet;
+    auto allGuard = freshSet.lockAllPartitions();
+    ASSERT_TRUE(allGuard.empty());
+}
+
+TEST(PartitionedAll, InsertionShouldModifySize) {
+    // A single insert through an All must be reflected by its size(), count() and empty().
+    PartitionedIntSet intSet;
+    auto allGuard = intSet.lockAllPartitions();
+    allGuard.insert(4);
+    ASSERT_EQ(allGuard.size(), 1UL);
+    ASSERT_EQ(allGuard.count(4), 1UL);
+    ASSERT_FALSE(allGuard.empty());
+}
+
+TEST(PartitionedAll, DuplicateInsertionShouldNotModifySize) {
+    // Inserting the same value twice through an All must leave exactly one copy behind.
+    PartitionedIntSet intSet;
+    auto allGuard = intSet.lockAllPartitions();
+    for (int attempt = 0; attempt < 2; ++attempt) {
+        allGuard.insert(4);
+    }
+    ASSERT_EQ(allGuard.size(), 1UL);
+    ASSERT_EQ(allGuard.count(4), 1UL);
+}
+
+TEST(PartitionedAll, ClearShouldResetSizeToZero) {
+    PartitionedIntSet intSet;
+    auto allGuard = intSet.lockAllPartitions();
+    // Populate the set with the values 0, 1 and 2.
+    for (std::size_t value = 0; value < 3; ++value) {
+        allGuard.insert(value);
+    }
+    ASSERT_EQ(allGuard.size(), 3UL);
+    for (std::size_t value = 0; value < 3; ++value) {
+        ASSERT_EQ(allGuard.count(value), 1UL);
+    }
+    // Once clear() has run, nothing may remain.
+    allGuard.clear();
+    ASSERT_EQ(allGuard.size(), 0UL);
+    ASSERT_TRUE(allGuard.empty());
+}
+
+TEST(PartitionedAll, ErasingEntryShouldModifySize) {
+    PartitionedIntSet intSet;
+    auto allGuard = intSet.lockAllPartitions();
+    for (std::size_t value = 0; value < 3; ++value) {
+        allGuard.insert(value);
+    }
+    ASSERT_EQ(allGuard.size(), 3UL);
+    ASSERT_EQ(allGuard.count(0), 1UL);
+    // erase() reports how many entries it removed; here that is the single entry for 0.
+    ASSERT_EQ(1UL, allGuard.erase(0));
+    ASSERT_EQ(allGuard.count(0), 0UL);
+    ASSERT_EQ(allGuard.size(), 2UL);
+}
+
+TEST(PartitionedAll, ErasingEntryThatDoesNotExistShouldNotModifySize) {
+    PartitionedIntSet intSet;
+    auto allGuard = intSet.lockAllPartitions();
+    for (std::size_t value = 0; value < 3; ++value) {
+        allGuard.insert(value);
+    }
+    ASSERT_EQ(allGuard.size(), 3UL);
+    ASSERT_EQ(allGuard.count(5), 0UL);
+    // 5 was never inserted, so erase() must remove nothing and leave the size untouched.
+    ASSERT_EQ(0UL, allGuard.erase(5));
+    ASSERT_EQ(allGuard.count(5), 0UL);
+    ASSERT_EQ(allGuard.size(), 3UL);
+}
+
+TEST(PartitionedConcurrency, ShouldBeAbleToGuardSeparatePartitionsSimultaneously) {
+    PartitionedIntSet intSet;
+    // Both guards are alive at the same time. NOTE(review): this presumably relies on the default
+    // partitioner (defined outside this file) mapping keys 0 and 1 to different partitions.
+    auto guardForZero = intSet.lockOnePartition(0);
+    auto guardForOne = intSet.lockOnePartition(1);
+}
+
+TEST(PartitionedConcurrency, ModificationsFromOnePartitionShouldBeVisible) {
+    PartitionedIntSet intSet;
+    {
+        // Insert through a guard on a single partition, then let the guard go out of scope.
+        auto writer = intSet.lockOnePartition(0);
+        writer->insert(0);
+    }
+
+    // The insertion must be observable through an All.
+    {
+        auto allGuard = intSet.lockAllPartitions();
+        ASSERT_EQ(1UL, allGuard.size());
+    }
+
+    // The insertion must also be observable through a fresh OnePartition.
+    {
+        auto reader = intSet.lockOnePartition(0);
+        ASSERT_EQ(1UL, reader->count(0));
+    }
+}
+
+TEST(PartitionedConcurrency, ModificationsFromAllShouldBeVisible) {
+    PartitionedIntSet intSet;
+    {
+        auto allGuard = intSet.lockAllPartitions();
+        for (std::size_t value = 0; value < 3; ++value) {
+            allGuard.insert(value);
+        }
+        // Each partition is expected to have received exactly one of the three values.
+        for (auto&& onePartition : allGuard) {
+            ASSERT_EQ(1UL, onePartition.size());
+        }
+    }
+
+    // Every value must be visible through a OnePartition locked with that value as its key. The
+    // guard is released at the end of each iteration before the next partition is locked.
+    for (std::size_t key = 0; key < 3; ++key) {
+        auto onePartition = intSet.lockOnePartition(key);
+        ASSERT_EQ(1UL, onePartition->count(key));
+    }
+
+    // The values must also be visible through a new All.
+    {
+        auto allGuard = intSet.lockAllPartitions();
+        ASSERT_EQ(3UL, allGuard.size());
+    }
+}
+
+// Starts nPartitions * 4 threads that insert distinct values concurrently through the three
+// insertion paths (All, OnePartition, and Partitioned::insert()), then checks the resulting sizes.
+TEST(PartitionedConcurrency, ShouldProtectConcurrentAccesses) {
+ PartitionedIntSet test;
+
+ // 4 threads will be accessing each partition.
+ const size_t numThreads = nPartitions * 4;
+ std::vector<stdx::thread> threads;
+ const size_t opsPerThread = 1000;
+
+ // Counts the threads that have started; used as a start barrier below.
+ AtomicUInt32 ready{0};
+ for (size_t threadId = 1; threadId <= numThreads; ++threadId) {
+ auto workerThreadBody = [&, threadId, opsPerThread]() {
+
+ // Busy-wait until everybody is ready
+ ready.fetchAndAdd(1);
+ while (ready.load() < numThreads) {
+ }
+
+ for (size_t op = 0; op < opsPerThread; ++op) {
+ size_t partitionId = threadId % nPartitions;
+ // Distinct for every (threadId, op) pair: op + partitionId is at most 1001, which is
+ // smaller than the nPartitions * opsPerThread (3000) stride between thread ids.
+ size_t uniqueVal = (nPartitions * opsPerThread * threadId + op) + partitionId;
+ // Rotate between the three ways of inserting a value.
+ if (op % 3 == 0) {
+ auto all = test.lockAllPartitions();
+ all.insert(uniqueVal);
+ } else if (op % 3 == 1) {
+ auto partition = test.lockOnePartition(partitionId);
+ partition->insert(uniqueVal);
+ } else if (op % 3 == 2) {
+ test.insert(uniqueVal);
+ }
+ }
+ };
+
+ threads.emplace_back(workerThreadBody);
+ }
+ for (auto& thread : threads)
+ thread.join();
+
+ // Make sure each insert was successful.
+ // NOTE(review): the per-partition expectation depends on how the default partitioner (not shown
+ // in this file) distributes the inserted values -- confirm against partitioned.h.
+ for (std::size_t partitionId = 0; partitionId < nPartitions; ++partitionId) {
+ auto partition = test.lockOnePartition(std::size_t{partitionId});
+ ASSERT_EQ(opsPerThread * 4, partition->size());
+ }
+ ASSERT_EQ(numThreads * opsPerThread, test.size());
+}
+} // namespace
+} // namespace mongo
diff --git a/src/mongo/db/clientcursor.h b/src/mongo/db/clientcursor.h
index 9aa3398d330..d2f216b01e1 100644
--- a/src/mongo/db/clientcursor.h
+++ b/src/mongo/db/clientcursor.h
@@ -196,6 +196,10 @@ public:
*/
static long long totalOpen();
+    /**
+     * Returns the id of 'cursor' converted to a std::size_t.
+     * NOTE(review): presumably this is the hook Partitioned uses to pick a partition for a
+     * ClientCursor* -- confirm against partitioned.h.
+     */
+    friend std::size_t partitionOf(const ClientCursor* cursor) {
+        return static_cast<std::size_t>(cursor->cursorid());
+    }
+
private:
friend class CursorManager;
friend class ClientCursorPin;
diff --git a/src/mongo/db/query/plan_executor.cpp b/src/mongo/db/query/plan_executor.cpp
index 135c6026137..2789e660b76 100644
--- a/src/mongo/db/query/plan_executor.cpp
+++ b/src/mongo/db/query/plan_executor.cpp
@@ -197,8 +197,7 @@ PlanExecutor::PlanExecutor(OperationContext* opCtx,
if (collection) {
_nss = collection->ns();
if (_yieldPolicy->canReleaseLocksDuringExecution()) {
- collection->getCursorManager()->registerExecutor(this);
- _registered = true;
+ _registrationToken = collection->getCursorManager()->registerExecutor(this);
}
} else {
invariant(_cq);
@@ -543,7 +542,7 @@ void PlanExecutor::dispose(OperationContext* opCtx, CursorManager* cursorManager
// caller of markAsKilled() will have done that already, and the CursorManager may no longer
// exist. Note that the caller's collection lock prevents us from being marked as killed during
// this method, since any interruption event requires a lock in at least MODE_IX.
- if (cursorManager && _registered && !isMarkedAsKilled()) {
+ if (cursorManager && _registrationToken && !isMarkedAsKilled()) {
dassert(opCtx->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_IS));
cursorManager->deregisterExecutor(this);
}
diff --git a/src/mongo/db/query/plan_executor.h b/src/mongo/db/query/plan_executor.h
index edefc56465a..48fed0eab0b 100644
--- a/src/mongo/db/query/plan_executor.h
+++ b/src/mongo/db/query/plan_executor.h
@@ -32,9 +32,11 @@
#include <queue>
#include "mongo/base/status.h"
+#include "mongo/db/catalog/util/partitioned.h"
#include "mongo/db/invalidation_type.h"
#include "mongo/db/query/query_solution.h"
#include "mongo/db/storage/snapshot.h"
+#include "mongo/platform/unordered_set.h"
namespace mongo {
@@ -399,12 +401,23 @@ public:
* 'non-cached PlanExecutor'.
*/
void unsetRegistered() {
- _registered = false;
+ _registrationToken.reset();
+ }
+
+ /**
+ * Returns a copy of the registration token, which is unset when this PlanExecutor holds no
+ * token. Deleted for rvalue executors.
+ */
+ boost::optional<Partitioned<unordered_set<PlanExecutor*>>::PartitionId> getRegistrationToken()
+ const& {
+ return _registrationToken;
+ }
+ void getRegistrationToken() && = delete;
+
+ /**
+ * Stores 'token' as this executor's registration token. It is an invariant failure to call this
+ * while a token is already set.
+ */
+ void setRegistrationToken(Partitioned<unordered_set<PlanExecutor*>>::PartitionId token) & {
+ invariant(!_registrationToken);
+ _registrationToken = token;
 }
bool isMarkedAsKilled() const {
return static_cast<bool>(_killReason);
- };
+ }
const std::string& getKillReason() {
invariant(isMarkedAsKilled());
@@ -490,9 +503,8 @@ private:
enum { kUsable, kSaved, kDetached, kDisposed } _currentState = kUsable;
- // Set to true if this PlanExecutor is registered with the CursorManager as a 'non-cached
- // PlanExecutor' to receive invalidations.
- bool _registered = false;
+ // Set if this PlanExecutor is registered with the CursorManager.
+ boost::optional<Partitioned<unordered_set<PlanExecutor*>>::PartitionId> _registrationToken;
bool _everDetachedFromOperationContext = false;
};
diff --git a/src/mongo/dbtests/querytests.cpp b/src/mongo/dbtests/querytests.cpp
index eae94dbb7ca..101aa23f552 100644
--- a/src/mongo/dbtests/querytests.cpp
+++ b/src/mongo/dbtests/querytests.cpp
@@ -50,10 +50,10 @@
#include "mongo/dbtests/dbtests.h"
#include "mongo/util/timer.h"
+namespace {
namespace QueryTests {
using std::unique_ptr;
-using std::cout;
using std::endl;
using std::string;
using std::vector;
@@ -1188,7 +1188,6 @@ public:
while (cursor->more()) {
BSONObj o = cursor->next();
verify(o.valid(BSONVersion::kLatest));
- // cout << " foo " << o << endl;
}
}
void run() {
@@ -1406,7 +1405,7 @@ public:
fast = t.micros();
}
- cout << "HelperTest slow:" << slow << " fast:" << fast << endl;
+ std::cout << "HelperTest slow:" << slow << " fast:" << fast << endl;
}
};
@@ -1449,7 +1448,6 @@ public:
FindingStart() : CollectionBase("findingstart") {}
void run() {
- cout << "1 SFDSDF" << endl;
BSONObj info;
ASSERT(_client.runCommand("unittests",
BSON("create"
@@ -1488,7 +1486,6 @@ public:
ASSERT(!next["ts"].eoo());
ASSERT_EQUALS((j > min ? j : min), next["ts"].numberInt());
}
- cout << k << endl;
}
}
};
@@ -1498,7 +1495,6 @@ public:
FindingStartPartiallyFull() : CollectionBase("findingstart") {}
void run() {
- cout << "2 ;kljsdf" << endl;
size_t startNumCursors = numCursorsOpen();
BSONObj info;
@@ -1529,7 +1525,6 @@ public:
ASSERT(!next["ts"].eoo());
ASSERT_EQUALS((j > min ? j : min), next["ts"].numberInt());
}
- cout << k << endl;
}
ASSERT_EQUALS(startNumCursors, numCursorsOpen());
@@ -1545,7 +1540,6 @@ public:
FindingStartStale() : CollectionBase("findingstart") {}
void run() {
- cout << "3 xcxcv" << endl;
size_t startNumCursors = numCursorsOpen();
// Check OplogReplay mode with missing collection.
@@ -1763,3 +1757,4 @@ public:
SuiteInstance<All> myall;
} // namespace QueryTests
+} // namespace