summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorCharlie Swanson <charlie.swanson@mongodb.com>2016-11-15 16:17:19 -0500
committerCharlie Swanson <charlie.swanson@mongodb.com>2017-03-15 11:03:44 -0400
commit584ca76de9ee66b3e11987e640f5317ae40975e4 (patch)
treedb52f1717155c295437f1b4fa41a5db295183669 /src
parentf05b9437fbdc53deecf55ed3c20e36af3d733953 (diff)
downloadmongo-584ca76de9ee66b3e11987e640f5317ae40975e4.tar.gz
SERVER-22541 Manage aggregation cursors on global cursor manager.
Moves registration of aggregation cursors to the global cursor manager. This simplifies the logic for acquiring locks and resolving view namespaces within the getMore and killCursors commands.
Diffstat (limited to 'src')
-rw-r--r--src/mongo/db/catalog/collection.cpp2
-rw-r--r--src/mongo/db/catalog/cursor_manager.cpp148
-rw-r--r--src/mongo/db/catalog/cursor_manager.h30
-rw-r--r--src/mongo/db/clientcursor.cpp24
-rw-r--r--src/mongo/db/clientcursor.h77
-rw-r--r--src/mongo/db/commands/find_and_modify.cpp4
-rw-r--r--src/mongo/db/commands/find_cmd.cpp7
-rw-r--r--src/mongo/db/commands/getmore_cmd.cpp182
-rw-r--r--src/mongo/db/commands/killcursors_cmd.cpp67
-rw-r--r--src/mongo/db/commands/list_collections.cpp9
-rw-r--r--src/mongo/db/commands/list_indexes.cpp9
-rw-r--r--src/mongo/db/commands/parallel_collection_scan.cpp9
-rw-r--r--src/mongo/db/commands/repair_cursor.cpp7
-rw-r--r--src/mongo/db/commands/run_aggregate.cpp167
-rw-r--r--src/mongo/db/curop.cpp6
-rw-r--r--src/mongo/db/curop.h2
-rw-r--r--src/mongo/db/db_raii.cpp73
-rw-r--r--src/mongo/db/db_raii.h48
-rw-r--r--src/mongo/db/exec/pipeline_proxy.h10
-rw-r--r--src/mongo/db/namespace_string.cpp11
-rw-r--r--src/mongo/db/namespace_string.h20
-rw-r--r--src/mongo/db/namespace_string_test.cpp35
-rw-r--r--src/mongo/db/ops/write_ops_exec.cpp4
-rw-r--r--src/mongo/db/pipeline/SConscript1
-rw-r--r--src/mongo/db/pipeline/document_source_cursor.cpp104
-rw-r--r--src/mongo/db/pipeline/document_source_cursor.h39
-rw-r--r--src/mongo/db/pipeline/pipeline_d.cpp5
-rw-r--r--src/mongo/db/query/explain.cpp2
-rw-r--r--src/mongo/db/query/find.cpp131
-rw-r--r--src/mongo/db/query/get_executor.cpp12
-rw-r--r--src/mongo/db/query/internal_plans.cpp4
-rw-r--r--src/mongo/db/query/plan_executor.cpp53
-rw-r--r--src/mongo/db/query/plan_executor.h12
-rw-r--r--src/mongo/db/query/plan_yield_policy.cpp4
-rw-r--r--src/mongo/db/query/query_yield.cpp2
-rw-r--r--src/mongo/db/query/query_yield.h4
-rw-r--r--src/mongo/db/range_preserver.h10
-rw-r--r--src/mongo/db/stats/top.cpp8
-rw-r--r--src/mongo/db/stats/top.h10
-rw-r--r--src/mongo/dbtests/documentsourcetests.cpp5
-rw-r--r--src/mongo/dbtests/query_plan_executor.cpp33
-rw-r--r--src/mongo/dbtests/querytests.cpp115
42 files changed, 782 insertions, 723 deletions
diff --git a/src/mongo/db/catalog/collection.cpp b/src/mongo/db/catalog/collection.cpp
index a86944badf3..de7d36b205d 100644
--- a/src/mongo/db/catalog/collection.cpp
+++ b/src/mongo/db/catalog/collection.cpp
@@ -229,7 +229,7 @@ Collection::Collection(OperationContext* opCtx,
parseValidationAction(_details->getCollectionOptions(opCtx).validationAction))),
_validationLevel(uassertStatusOK(
parseValidationLevel(_details->getCollectionOptions(opCtx).validationLevel))),
- _cursorManager(fullNS),
+ _cursorManager(_ns),
_cappedNotifier(_recordStore->isCapped() ? new CappedInsertNotifier() : nullptr),
_mustTakeCappedLockOnInsert(isCapped() && !_ns.isSystemDotProfile() && !_ns.isOplog()) {
diff --git a/src/mongo/db/catalog/cursor_manager.cpp b/src/mongo/db/catalog/cursor_manager.cpp
index b2079e9e03e..25f0be7edec 100644
--- a/src/mongo/db/catalog/cursor_manager.cpp
+++ b/src/mongo/db/catalog/cursor_manager.cpp
@@ -50,42 +50,23 @@
namespace mongo {
-using std::string;
using std::vector;
namespace {
-unsigned idFromCursorId(CursorId id) {
+uint32_t idFromCursorId(CursorId id) {
uint64_t x = static_cast<uint64_t>(id);
x = x >> 32;
- return static_cast<unsigned>(x);
+ return static_cast<uint32_t>(x);
}
-CursorId cursorIdFromParts(unsigned collection, unsigned cursor) {
- CursorId x = static_cast<CursorId>(collection) << 32;
+CursorId cursorIdFromParts(uint32_t collectionIdentifier, uint32_t cursor) {
+ // The leading two bits of a non-global CursorId should be 0.
+ invariant((collectionIdentifier & (0b11 << 30)) == 0);
+ CursorId x = static_cast<CursorId>(collectionIdentifier) << 32;
x |= cursor;
return x;
}
-
-class IdWorkTest : public StartupTest {
-public:
- void _run(unsigned a, unsigned b) {
- CursorId x = cursorIdFromParts(a, b);
- invariant(a == idFromCursorId(x));
- CursorId y = cursorIdFromParts(a, b + 1);
- invariant(x != y);
- }
-
- void run() {
- _run(123, 456);
- _run(0xdeadbeef, 0xcafecafe);
- _run(0, 0);
- _run(99999999, 999);
- _run(0xFFFFFFFF, 1);
- _run(0xFFFFFFFF, 0);
- _run(0xFFFFFFFF, 0xFFFFFFFF);
- }
-} idWorkTest;
-}
+} // namespace
class GlobalCursorIdCache {
public:
@@ -93,16 +74,16 @@ public:
~GlobalCursorIdCache();
/**
- * this gets called when a CursorManager gets created
- * @return the id the CursorManager should use when generating
- * cursor ids
+ * Returns a unique 32-bit identifier to be used as the first 32 bits of all cursor ids for a
+ * new CursorManager.
*/
- unsigned created(const std::string& ns);
+ uint32_t registerCursorManager(const NamespaceString& nss);
/**
- * called by CursorManager when its going away
+ * Must be called when a CursorManager is deleted. 'id' must be the identifier returned by
+ * registerCursorManager().
*/
- void destroyed(unsigned id, const std::string& ns);
+ void deregisterCursorManager(uint32_t id, const NamespaceString& nss);
/**
* works globally
@@ -118,8 +99,8 @@ public:
private:
SimpleMutex _mutex;
- typedef unordered_map<unsigned, string> Map;
- Map _idToNS;
+ typedef unordered_map<unsigned, NamespaceString> Map;
+ Map _idToNss;
unsigned _nextId;
std::unique_ptr<SecureRandom> _secureRandom;
@@ -137,7 +118,7 @@ MONGO_INITIALIZER(GlobalCursorIdCache)(InitializerContext* context) {
MONGO_INITIALIZER_WITH_PREREQUISITES(GlobalCursorManager, ("GlobalCursorIdCache"))
(InitializerContext* context) {
- globalCursorManager.reset(new CursorManager(""));
+ globalCursorManager.reset(new CursorManager({}));
return Status::OK();
}
@@ -152,56 +133,57 @@ int64_t GlobalCursorIdCache::nextSeed() {
return _secureRandom->nextInt64();
}
-unsigned GlobalCursorIdCache::created(const std::string& ns) {
- static const unsigned MAX_IDS = 1000 * 1000 * 1000;
+uint32_t GlobalCursorIdCache::registerCursorManager(const NamespaceString& nss) {
+ static const uint32_t kMaxIds = 1000 * 1000 * 1000;
+ static_assert((kMaxIds & (0b11 << 30)) == 0,
+ "the first two bits of a collection identifier must always be zeroes");
stdx::lock_guard<SimpleMutex> lk(_mutex);
- fassert(17359, _idToNS.size() < MAX_IDS);
+ fassert(17359, _idToNss.size() < kMaxIds);
- for (unsigned i = 0; i <= MAX_IDS; i++) {
- unsigned id = ++_nextId;
+ for (uint32_t i = 0; i <= kMaxIds; i++) {
+ uint32_t id = ++_nextId;
if (id == 0)
continue;
- if (_idToNS.count(id) > 0)
+ if (_idToNss.count(id) > 0)
continue;
- _idToNS[id] = ns;
+ _idToNss[id] = nss;
return id;
}
- invariant(false);
+ MONGO_UNREACHABLE;
}
-void GlobalCursorIdCache::destroyed(unsigned id, const std::string& ns) {
+void GlobalCursorIdCache::deregisterCursorManager(uint32_t id, const NamespaceString& nss) {
stdx::lock_guard<SimpleMutex> lk(_mutex);
- invariant(ns == _idToNS[id]);
- _idToNS.erase(id);
+ invariant(nss == _idToNss[id]);
+ _idToNss.erase(id);
}
bool GlobalCursorIdCache::eraseCursor(OperationContext* opCtx, CursorId id, bool checkAuth) {
// Figure out what the namespace of this cursor is.
- std::string ns;
- if (globalCursorManager->ownsCursorId(id)) {
- auto pin = globalCursorManager.get()->pinCursor(id);
+ NamespaceString nss;
+ if (CursorManager::isGloballyManagedCursor(id)) {
+ auto pin = globalCursorManager->pinCursor(id);
if (!pin.isOK()) {
invariant(pin == ErrorCodes::CursorNotFound);
// No such cursor. TODO: Consider writing to audit log here (even though we don't
// have a namespace).
return false;
}
- ns = pin.getValue().getCursor()->ns();
+ nss = pin.getValue().getCursor()->nss();
} else {
stdx::lock_guard<SimpleMutex> lk(_mutex);
- unsigned nsid = idFromCursorId(id);
- Map::const_iterator it = _idToNS.find(nsid);
- if (it == _idToNS.end()) {
+ uint32_t nsid = idFromCursorId(id);
+ Map::const_iterator it = _idToNss.find(nsid);
+ if (it == _idToNss.end()) {
// No namespace corresponding to this cursor id prefix. TODO: Consider writing to
// audit log here (even though we don't have a namespace).
return false;
}
- ns = it->second;
+ nss = it->second;
}
- const NamespaceString nss(ns);
invariant(nss.isValid());
// Check if we are authorized to erase this cursor.
@@ -215,7 +197,7 @@ bool GlobalCursorIdCache::eraseCursor(OperationContext* opCtx, CursorId id, bool
}
// If this cursor is owned by the global cursor manager, ask it to erase the cursor for us.
- if (globalCursorManager->ownsCursorId(id)) {
+ if (CursorManager::isGloballyManagedCursor(id)) {
Status eraseStatus = globalCursorManager->eraseCursor(opCtx, id, checkAuth);
massert(28697,
eraseStatus.reason(),
@@ -250,16 +232,11 @@ std::size_t GlobalCursorIdCache::timeoutCursors(OperationContext* opCtx, int mil
totalTimedOut += globalCursorManager->timeoutCursors(millisSinceLastCall);
// Compute the set of collection names that we have to time out cursors for.
- vector<string> todo;
+ vector<NamespaceString> todo;
{
stdx::lock_guard<SimpleMutex> lk(_mutex);
- for (Map::const_iterator i = _idToNS.begin(); i != _idToNS.end(); ++i) {
- if (globalCursorManager->ownsCursorId(cursorIdFromParts(i->first, 0))) {
- // Skip the global cursor manager, since we handle it above (and it's not
- // associated with a collection).
- continue;
- }
- todo.push_back(i->second);
+ for (auto&& entry : _idToNss) {
+ todo.push_back(entry.second);
}
}
@@ -314,14 +291,19 @@ bool CursorManager::eraseCursorGlobal(OperationContext* opCtx, CursorId id) {
// --------------------------
-CursorManager::CursorManager(StringData ns) : _nss(ns) {
- _collectionCacheRuntimeId = globalCursorIdCache->created(_nss.ns());
+CursorManager::CursorManager(NamespaceString nss) : _nss(std::move(nss)) {
+ if (!isGlobalManager()) {
+ // Generate a unique id for this collection.
+ _collectionCacheRuntimeId = globalCursorIdCache->registerCursorManager(_nss);
+ }
_random.reset(new PseudoRandom(globalCursorIdCache->nextSeed()));
}
CursorManager::~CursorManager() {
invalidateAll(true, "collection going away");
- globalCursorIdCache->destroyed(_collectionCacheRuntimeId, _nss.ns());
+ if (!isGlobalManager()) {
+ globalCursorIdCache->deregisterCursorManager(_collectionCacheRuntimeId, _nss);
+ }
}
void CursorManager::invalidateAll(bool collectionGoingAway, const std::string& reason) {
@@ -371,11 +353,8 @@ void CursorManager::invalidateAll(bool collectionGoingAway, const std::string& r
continue;
}
- if (cc->_isPinned || cc->isAggCursor()) {
- // Pinned cursors need to stay alive, so we leave them around. Aggregation
- // cursors also can stay alive (since they don't have their lifetime bound to
- // the underlying collection). However, if they have an associated executor, we
- // need to kill it, because it's now invalid.
+ if (cc->_isPinned) {
+ // Pinned cursors need to stay alive, so we leave them around.
if (cc->getExecutor())
cc->getExecutor()->kill(reason);
newMap.insert(*i);
@@ -484,10 +463,6 @@ void CursorManager::unpin(ClientCursor* cursor) {
cursor->_isPinned = false;
}
-bool CursorManager::ownsCursorId(CursorId cursorId) const {
- return _collectionCacheRuntimeId == idFromCursorId(cursorId);
-}
-
void CursorManager::getCursorIds(std::set<CursorId>* openCursors) const {
stdx::lock_guard<SimpleMutex> lk(_mutex);
@@ -504,19 +479,32 @@ size_t CursorManager::numCursors() const {
CursorId CursorManager::_allocateCursorId_inlock() {
for (int i = 0; i < 10000; i++) {
- unsigned mypart = static_cast<unsigned>(_random->nextInt32());
- CursorId id = cursorIdFromParts(_collectionCacheRuntimeId, mypart);
+ // The leading two bits of a CursorId are used to determine if the cursor is registered on
+ // the global cursor manager.
+ CursorId id;
+ if (isGlobalManager()) {
+ // This is the global cursor manager, so generate a random number and make sure the
+ // first two bits are 01.
+ uint64_t mask = 0x3FFFFFFFFFFFFFFF;
+ uint64_t bitToSet = 1ULL << 62;
+ id = ((_random->nextInt64() & mask) | bitToSet);
+ } else {
+ // The first 2 bits are 0, the next 30 bits are the collection identifier, the next 32
+ // bits are random.
+ uint32_t myPart = static_cast<uint32_t>(_random->nextInt32());
+ id = cursorIdFromParts(_collectionCacheRuntimeId, myPart);
+ }
if (_cursors.count(id) == 0)
return id;
}
fassertFailed(17360);
}
-ClientCursorPin CursorManager::registerCursor(const ClientCursorParams& cursorParams) {
+ClientCursorPin CursorManager::registerCursor(ClientCursorParams&& cursorParams) {
stdx::lock_guard<SimpleMutex> lk(_mutex);
CursorId cursorId = _allocateCursorId_inlock();
std::unique_ptr<ClientCursor, ClientCursor::Deleter> clientCursor(
- new ClientCursor(cursorParams, this, cursorId));
+ new ClientCursor(std::move(cursorParams), this, cursorId));
return _registerCursor_inlock(std::move(clientCursor));
}
diff --git a/src/mongo/db/catalog/cursor_manager.h b/src/mongo/db/catalog/cursor_manager.h
index ad4289d4f38..7e4da59b626 100644
--- a/src/mongo/db/catalog/cursor_manager.h
+++ b/src/mongo/db/catalog/cursor_manager.h
@@ -73,7 +73,7 @@ class PlanExecutor;
*/
class CursorManager {
public:
- CursorManager(StringData ns);
+ CursorManager(NamespaceString nss);
/**
* Destroys the CursorManager. Managed cursors which are not pinned are destroyed. Ownership of
@@ -124,7 +124,7 @@ public:
* Constructs a new ClientCursor according to the given 'cursorParams'. The cursor is atomically
* registered with the manager and returned in pinned state.
*/
- ClientCursorPin registerCursor(const ClientCursorParams& cursorParams);
+ ClientCursorPin registerCursor(ClientCursorParams&& cursorParams);
/**
* Constructs and pins a special ClientCursor used to track sharding state for the given
@@ -153,15 +153,6 @@ public:
*/
Status eraseCursor(OperationContext* opCtx, CursorId id, bool shouldAudit);
- /**
- * Returns true if the space of cursor ids that cursor manager is responsible for includes
- * the given cursor id. Otherwise, returns false.
- *
- * The return value of this method does not indicate any information about whether or not a
- * cursor actually exists with the given cursor id.
- */
- bool ownsCursorId(CursorId cursorId) const;
-
void getCursorIds(std::set<CursorId>* openCursors) const;
/**
@@ -172,6 +163,17 @@ public:
static CursorManager* getGlobalCursorManager();
+ /**
+ * Returns true if this CursorId would be registered with the global CursorManager. Note that if
+ * this method returns true it does not imply the cursor exists.
+ */
+ static bool isGloballyManagedCursor(CursorId cursorId) {
+ // The first two bits are 01 for globally managed cursors, and 00 for cursors owned by a
+ // collection. The leading bit is always 0 so that CursorIds do not appear as negative.
+ const long long mask = static_cast<long long>(0b11) << 62;
+ return (cursorId & mask) == (static_cast<long long>(0b01) << 62);
+ }
+
static int eraseCursorGlobalIfAuthorized(OperationContext* opCtx, int n, const char* ids);
static bool eraseCursorGlobalIfAuthorized(OperationContext* opCtx, CursorId id);
@@ -196,8 +198,12 @@ private:
void unpin(ClientCursor* cursor);
+ bool isGlobalManager() const {
+ return _nss.isEmpty();
+ }
+
NamespaceString _nss;
- unsigned _collectionCacheRuntimeId;
+ uint32_t _collectionCacheRuntimeId;
std::unique_ptr<PseudoRandom> _random;
mutable SimpleMutex _mutex;
diff --git a/src/mongo/db/clientcursor.cpp b/src/mongo/db/clientcursor.cpp
index 708a2bc38f7..70003c2da2b 100644
--- a/src/mongo/db/clientcursor.cpp
+++ b/src/mongo/db/clientcursor.cpp
@@ -77,17 +77,16 @@ long long ClientCursor::totalOpen() {
return cursorStatsOpen.get();
}
-ClientCursor::ClientCursor(const ClientCursorParams& params,
+ClientCursor::ClientCursor(ClientCursorParams&& params,
CursorManager* cursorManager,
CursorId cursorId)
: _cursorid(cursorId),
- _ns(params.ns),
+ _nss(std::move(params.nss)),
_isReadCommitted(params.isReadCommitted),
_cursorManager(cursorManager),
- _query(params.query),
- _queryOptions(params.qopts),
- _isAggCursor(params.isAggCursor) {
- _exec.reset(params.exec);
+ _originatingCommand(params.originatingCommandObj),
+ _queryOptions(params.queryOptions),
+ _exec(std::move(params.exec)) {
init();
}
@@ -95,7 +94,7 @@ ClientCursor::ClientCursor(const Collection* collection,
CursorManager* cursorManager,
CursorId cursorId)
: _cursorid(cursorId),
- _ns(collection->ns().ns()),
+ _nss(collection->ns()),
_cursorManager(cursorManager),
_queryOptions(QueryOption_NoCursorTimeout) {
init();
@@ -106,10 +105,9 @@ void ClientCursor::init() {
cursorStatsOpen.increment();
- if (_queryOptions & QueryOption_NoCursorTimeout) {
+ if (isNoTimeout()) {
// cursors normally timeout after an inactivity period to prevent excess memory use
// setting this prevents timeout of the cursor in question.
- _isNoTimeout = true;
cursorStatsOpenNoTimeout.increment();
}
}
@@ -120,7 +118,7 @@ ClientCursor::~ClientCursor() {
invariant(!_cursorManager);
cursorStatsOpen.decrement();
- if (_isNoTimeout) {
+ if (isNoTimeout()) {
cursorStatsOpenNoTimeout.decrement();
}
}
@@ -138,7 +136,7 @@ void ClientCursor::kill() {
bool ClientCursor::shouldTimeout(int millis) {
_idleAgeMillis += millis;
- if (_isNoTimeout || _isPinned) {
+ if (isNoTimeout() || _isPinned) {
return false;
}
return _idleAgeMillis > cursorTimeoutMillis.load();
@@ -152,7 +150,7 @@ void ClientCursor::updateSlaveLocation(OperationContext* opCtx) {
if (_slaveReadTill.isNull())
return;
- verify(str::startsWith(_ns.c_str(), "local.oplog."));
+ verify(_nss.isOplog());
Client* c = opCtx->getClient();
verify(c);
@@ -221,7 +219,7 @@ void ClientCursorPin::release() {
if (!_cursor->_cursorManager) {
// The ClientCursor was killed while we had it. Therefore, it is our responsibility to
- // kill it.
+ // delete it.
deleteUnderlying();
} else {
// Unpin the cursor under the collection cursor manager lock.
diff --git a/src/mongo/db/clientcursor.h b/src/mongo/db/clientcursor.h
index 44ae4600ac6..91e7a0d325c 100644
--- a/src/mongo/db/clientcursor.h
+++ b/src/mongo/db/clientcursor.h
@@ -28,6 +28,7 @@
#pragma once
+#include "mongo/client/dbclientinterface.h"
#include "mongo/db/cursor_id.h"
#include "mongo/db/jsobj.h"
#include "mongo/db/query/plan_executor.h"
@@ -42,30 +43,30 @@ class CursorManager;
class RecoveryUnit;
/**
- * Parameters used for constructing a ClientCursor. ClientCursors cannot be constructed in
- * isolation, but rather must be constructed and managed using a CursorManager. See cursor_manager.h
- * for more details.
+ * Parameters used for constructing a ClientCursor. Makes an owned copy of 'originatingCommandObj'
+ * to be used across getMores.
+ *
+ * ClientCursors cannot be constructed in isolation, but rather must be
+ * constructed and managed using a CursorManager. See cursor_manager.h for more details.
*/
struct ClientCursorParams {
- ClientCursorParams(PlanExecutor* exec,
- std::string ns,
+ ClientCursorParams(std::unique_ptr<PlanExecutor> planExecutor,
+ NamespaceString nss,
bool isReadCommitted,
- int qopts = 0,
- const BSONObj query = BSONObj(),
- bool isAggCursor = false)
- : exec(exec),
- ns(std::move(ns)),
+ BSONObj originatingCommandObj)
+ : exec(std::move(planExecutor)),
+ nss(std::move(nss)),
isReadCommitted(isReadCommitted),
- qopts(qopts),
- query(query),
- isAggCursor(isAggCursor) {}
+ queryOptions(exec->getCanonicalQuery()
+ ? exec->getCanonicalQuery()->getQueryRequest().getOptions()
+ : 0),
+ originatingCommandObj(originatingCommandObj.getOwned()) {}
- PlanExecutor* exec = nullptr;
- const std::string ns;
+ std::unique_ptr<PlanExecutor> exec;
+ const NamespaceString nss;
bool isReadCommitted = false;
- int qopts = 0;
- const BSONObj query = BSONObj();
- bool isAggCursor = false;
+ int queryOptions = 0;
+ BSONObj originatingCommandObj;
};
/**
@@ -92,18 +93,14 @@ public:
return _cursorid;
}
- std::string ns() const {
- return _ns;
+ const NamespaceString& nss() const {
+ return _nss;
}
bool isReadCommitted() const {
return _isReadCommitted;
}
- bool isAggCursor() const {
- return _isAggCursor;
- }
-
PlanExecutor* getExecutor() const {
return _exec.get();
}
@@ -112,8 +109,8 @@ public:
return _queryOptions;
}
- const BSONObj& getQuery() const {
- return _query;
+ const BSONObj& getOriginatingCommandObj() const {
+ return _originatingCommand;
}
/**
@@ -223,7 +220,7 @@ private:
* Constructs a ClientCursor. Since cursors must come into being registered and pinned, this is
* private. See cursor_manager.h for more details.
*/
- ClientCursor(const ClientCursorParams& params, CursorManager* cursorManager, CursorId cursorId);
+ ClientCursor(ClientCursorParams&& params, CursorManager* cursorManager, CursorId cursorId);
/**
* Constructs a special ClientCursor used to track sharding state for the given collection.
@@ -246,11 +243,15 @@ private:
*/
void kill();
+ bool isNoTimeout() const {
+ return (_queryOptions & QueryOption_NoCursorTimeout);
+ }
+
// The ID of the ClientCursor. A value of 0 is used to mean that no cursor id has been assigned.
CursorId _cursorid = 0;
// The namespace we're operating on.
- std::string _ns;
+ const NamespaceString _nss;
const bool _isReadCommitted = false;
@@ -265,21 +266,11 @@ private:
// Tracks the number of results returned by this cursor so far.
long long _pos = 0;
- // If this cursor was created by a find operation, '_query' holds the query predicate for
- // the find. If this cursor was created by a command (e.g. the aggregate command), then
- // '_query' holds the command specification received from the client.
- BSONObj _query;
+ // Holds an owned copy of the command specification received from the client.
+ const BSONObj _originatingCommand;
// See the QueryOptions enum in dbclientinterface.h.
- int _queryOptions = 0;
-
- // Is this ClientCursor backed by an aggregation pipeline?
- //
- // Agg executors differ from others in that they manage their own locking internally and
- // should not be killed or destroyed when the underlying collection is deleted.
- //
- // Note: This should *not* be set for the internal cursor used as input to an aggregation.
- const bool _isAggCursor = false;
+ const int _queryOptions = 0;
// While a cursor is being used by a client, it is marked as "pinned". See ClientCursorPin
// below.
@@ -287,10 +278,6 @@ private:
// Cursors always come into existence in a pinned state.
bool _isPinned = true;
- // Is the "no timeout" flag set on this cursor? If false, this cursor may be automatically
- // deleted after an interval of inactivity.
- bool _isNoTimeout = false;
-
// The replication position only used in master-slave.
Timestamp _slaveReadTill;
diff --git a/src/mongo/db/commands/find_and_modify.cpp b/src/mongo/db/commands/find_and_modify.cpp
index a2deb9db62a..cd66033e427 100644
--- a/src/mongo/db/commands/find_and_modify.cpp
+++ b/src/mongo/db/commands/find_and_modify.cpp
@@ -203,13 +203,11 @@ Status checkCanAcceptWritesForDatabase(OperationContext* opCtx, const NamespaceS
void recordStatsForTopCommand(OperationContext* opCtx) {
auto curOp = CurOp::get(opCtx);
- const int writeLocked = 1;
-
Top::get(opCtx->getClient()->getServiceContext())
.record(opCtx,
curOp->getNS(),
curOp->getLogicalOp(),
- writeLocked,
+ Top::LockType::WriteLocked,
curOp->elapsedMicros(),
curOp->isCommand(),
curOp->getReadWriteType());
diff --git a/src/mongo/db/commands/find_cmd.cpp b/src/mongo/db/commands/find_cmd.cpp
index d43e991950c..60ae2e2eba4 100644
--- a/src/mongo/db/commands/find_cmd.cpp
+++ b/src/mongo/db/commands/find_cmd.cpp
@@ -399,11 +399,10 @@ public:
// Create a ClientCursor containing this plan executor and register it with the cursor
// manager.
ClientCursorPin pinnedCursor = collection->getCursorManager()->registerCursor(
- {exec.release(),
- nss.ns(),
+ {std::move(exec),
+ nss,
opCtx->recoveryUnit()->isReadingFromMajorityCommittedSnapshot(),
- originalQR.getOptions(),
- cmdObj.getOwned()});
+ cmdObj});
cursorId = pinnedCursor.getCursor()->cursorid();
invariant(!exec);
diff --git a/src/mongo/db/commands/getmore_cmd.cpp b/src/mongo/db/commands/getmore_cmd.cpp
index 78c7d822404..448177a8068 100644
--- a/src/mongo/db/commands/getmore_cmd.cpp
+++ b/src/mongo/db/commands/getmore_cmd.cpp
@@ -53,6 +53,7 @@
#include "mongo/db/s/operation_sharding_state.h"
#include "mongo/db/service_context.h"
#include "mongo/db/stats/counters.h"
+#include "mongo/db/stats/top.h"
#include "mongo/s/chunk_version.h"
#include "mongo/stdx/memory.h"
#include "mongo/util/fail_point_service.h"
@@ -154,9 +155,6 @@ public:
auto curOp = CurOp::get(opCtx);
curOp->debug().cursorid = request.cursorid;
- // Disable shard version checking - getmore commands are always unversioned
- OperationShardingState::get(opCtx).setShardVersion(request.nss, ChunkVersion::IGNORED());
-
// Validate term before acquiring locks, if provided.
if (request.term) {
auto replCoord = repl::ReplicationCoordinator::get(opCtx);
@@ -167,67 +165,47 @@ public:
}
}
- // Depending on the type of cursor being operated on, we hold locks for the whole
- // getMore, or none of the getMore, or part of the getMore. The three cases in detail:
+ // Cursors come in one of two flavors:
+ // - Cursors owned by the collection cursor manager, such as those generated via the find
+ // command. For these cursors, we hold the appropriate collection lock for the duration of
+ // the getMore using AutoGetCollectionForRead. This will automatically update the CurOp
+ // object appropriately and record execution time via Top upon completion.
+ // - Cursors owned by the global cursor manager, such as those generated via the aggregate
+ // command. These cursors either hold no collection state or manage their collection state
+ // internally, so we acquire no locks. In this case we use the AutoStatsTracker object to
+ // update the CurOp object appropriately and record execution time via Top upon
+ // completion.
//
- // 1) Normal cursor: we lock with "ctx" and hold it for the whole getMore.
- // 2) Cursor owned by global cursor manager: we don't lock anything. These cursors
- // don't own any collection state. These cursors are generated either by the
- // listCollections or listIndexes commands, as these special cursor-generating commands
- // operate over catalog data rather than targeting the data within a collection.
- // 3) Agg cursor: we lock with "ctx", then release, then relock with "unpinDBLock" and
- // "unpinCollLock". This is because agg cursors handle locking internally (hence the
- // release), but the pin and unpin of the cursor must occur under the collection
- // lock. We don't use our AutoGetCollectionForRead "ctx" to relock, because
- // AutoGetCollectionForRead checks the sharding version (and we want the relock for
- // the unpin to succeed even if the sharding version has changed).
+ // Thus, only one of 'readLock' and 'statsTracker' will be populated as we populate
+ // 'cursorManager'.
//
// Note that we declare our locks before our ClientCursorPin, in order to ensure that
- // the pin's destructor is called before the lock destructors (so that the unpin occurs
- // under the lock).
- std::unique_ptr<AutoGetCollectionForReadCommand> ctx;
- std::unique_ptr<Lock::DBLock> unpinDBLock;
- std::unique_ptr<Lock::CollectionLock> unpinCollLock;
-
+ // the pin's destructor is called before the lock's destructor (if there is one) so that the
+ // cursor cleanup can occur under the lock.
+ boost::optional<AutoGetCollectionForReadCommand> readLock;
+ boost::optional<AutoStatsTracker> statsTracker;
CursorManager* cursorManager;
- if (request.nss.isListIndexesCursorNS() || request.nss.isListCollectionsCursorNS()) {
+
+ if (CursorManager::isGloballyManagedCursor(request.cursorid)) {
cursorManager = CursorManager::getGlobalCursorManager();
+
+ if (boost::optional<NamespaceString> nssForCurOp =
+ request.nss.isGloballyManagedNamespace()
+ ? request.nss.getTargetNSForGloballyManagedNamespace()
+ : request.nss) {
+ const boost::optional<int> dbProfilingLevel = boost::none;
+ statsTracker.emplace(
+ opCtx, *nssForCurOp, Top::LockType::NotLocked, dbProfilingLevel);
+ }
} else {
- ctx = stdx::make_unique<AutoGetCollectionOrViewForReadCommand>(opCtx, request.nss);
- auto viewCtx = static_cast<AutoGetCollectionOrViewForReadCommand*>(ctx.get());
- Collection* collection = ctx->getCollection();
+ // getMore commands are always unversioned, so prevent AutoGetCollectionForRead from
+ // checking the shard version.
+ OperationShardingState::get(opCtx).setShardVersion(request.nss,
+ ChunkVersion::IGNORED());
+
+ readLock.emplace(opCtx, request.nss);
+ Collection* collection = readLock->getCollection();
if (!collection) {
- // Rewrite a getMore on a view to a getMore on the original underlying collection.
- // If the view no longer exists, or has been rewritten, the cursor id will be
- // unknown, resulting in an appropriate error.
- if (viewCtx->getView()) {
- auto resolved =
- viewCtx->getDb()->getViewCatalog()->resolveView(opCtx, request.nss);
- if (!resolved.isOK()) {
- return appendCommandStatus(result, resolved.getStatus());
- }
- viewCtx->releaseLocksForView();
-
- // Only one shardversion can be set at a time for an operation, so unset it
- // here to allow setting it on the underlying namespace.
- OperationShardingState::get(opCtx).unsetShardVersion(request.nss);
-
- GetMoreRequest newRequest(resolved.getValue().getNamespace(),
- request.cursorid,
- request.batchSize,
- request.awaitDataTimeout,
- request.term,
- request.lastKnownCommittedOpTime);
-
- bool retVal = runParsed(opCtx, origNss, newRequest, cmdObj, errmsg, result);
- {
- // Set the namespace of the curop back to the view namespace so ctx records
- // stats on this view namespace on destruction.
- stdx::lock_guard<Client> lk(*opCtx->getClient());
- curOp->setNS_inlock(origNss.ns());
- }
- return retVal;
- }
return appendCommandStatus(result,
Status(ErrorCodes::OperationFailed,
"collection dropped between getMore calls"));
@@ -243,23 +221,24 @@ public:
ClientCursor* cursor = ccPin.getValue().getCursor();
- // If the fail point is enabled, busy wait until it is disabled. We unlock and re-acquire
- // the locks periodically in order to avoid deadlock (see SERVER-21997 for details).
+ // If the fail point is enabled, busy wait until it is disabled.
while (MONGO_FAIL_POINT(keepCursorPinnedDuringGetMore)) {
- invariant(ctx);
- invariant(!unpinDBLock);
- invariant(!unpinCollLock);
- sleepFor(Milliseconds(10));
- ctx.reset();
- ctx = stdx::make_unique<AutoGetCollectionForReadCommand>(opCtx, request.nss);
+ if (readLock) {
+ // We unlock and re-acquire the locks periodically in order to avoid deadlock (see
+ // SERVER-21997 for details).
+ sleepFor(Milliseconds(10));
+ readLock.reset();
+ readLock.emplace(opCtx, request.nss);
+ }
}
- if (request.nss.ns() != cursor->ns()) {
+ if (request.nss != cursor->nss()) {
return appendCommandStatus(
result,
Status(ErrorCodes::Unauthorized,
str::stream() << "Requested getMore on namespace '" << request.nss.ns()
- << "', but cursor belongs to a different namespace"));
+ << "', but cursor belongs to a different namespace "
+ << cursor->nss().ns()));
}
if (request.nss.isOplog() && MONGO_FAIL_POINT(rsStopGetMoreCmd)) {
@@ -274,9 +253,8 @@ public:
if (isCursorAwaitData(cursor)) {
invariant(isCursorTailable(cursor));
- if (cursor->isAggCursor()) {
- Status status(ErrorCodes::BadValue,
- "awaitData cannot be set on an aggregation cursor");
+ if (CursorManager::isGloballyManagedCursor(request.cursorid)) {
+ Status status(ErrorCodes::BadValue, "awaitData cannot be set on this cursor");
return appendCommandStatus(result, status);
}
}
@@ -288,8 +266,7 @@ public:
}
// On early return, get rid of the cursor.
- ScopeGuard cursorFreer =
- MakeGuard(&GetMoreCmd::cleanupCursor, opCtx, &ccPin.getValue(), request);
+ ScopeGuard cursorFreer = MakeGuard(&ClientCursorPin::deleteUnderlying, &ccPin.getValue());
if (cursor->isReadCommitted())
uassertStatusOK(opCtx->recoveryUnit()->setReadFromMajorityCommittedSnapshot());
@@ -318,11 +295,6 @@ public:
}
opCtx->checkForInterrupt(); // May trigger maxTimeAlwaysTimeOut fail point.
- if (cursor->isAggCursor()) {
- // Agg cursors handle their own locking internally.
- ctx.reset(); // unlocks
- }
-
PlanExecutor* exec = cursor->getExecutor();
exec->reattachToOperationContext(opCtx);
exec->restoreState();
@@ -334,7 +306,7 @@ public:
// Ensure that the original query or command object is available in the slow query log,
// profiler and currentOp.
- auto originatingCommand = cursor->getQuery();
+ auto originatingCommand = cursor->getOriginatingCommandObj();
if (!originatingCommand.isEmpty()) {
curOp->setOriginatingCommand_inlock(originatingCommand);
}
@@ -343,12 +315,12 @@ public:
uint64_t notifierVersion = 0;
std::shared_ptr<CappedInsertNotifier> notifier;
if (isCursorAwaitData(cursor)) {
- invariant(ctx->getCollection()->isCapped());
+ invariant(readLock->getCollection()->isCapped());
// Retrieve the notifier which we will wait on until new data arrives. We make sure
// to do this in the lock because once we drop the lock it is possible for the
// collection to become invalid. The notifier itself will outlive the collection if
// the collection is dropped, as we keep a shared_ptr to it.
- notifier = ctx->getCollection()->getCappedInsertNotifier();
+ notifier = readLock->getCollection()->getCappedInsertNotifier();
// Must get the version before we call generateBatch in case a write comes in after
// that call and before we call wait on the notifier.
@@ -386,11 +358,11 @@ public:
// to do this in the lock because once we drop the lock it is possible for the
// collection to become invalid. The notifier itself will outlive the collection if
// the collection is dropped, as we keep a shared_ptr to it.
- auto notifier = ctx->getCollection()->getCappedInsertNotifier();
+ auto notifier = readLock->getCollection()->getCappedInsertNotifier();
// Save the PlanExecutor and drop our locks.
exec->saveState();
- ctx.reset();
+ readLock.reset();
// Block waiting for data.
const auto timeout = opCtx->getRemainingMaxTimeMicros();
@@ -402,7 +374,7 @@ public:
// CappedInsertNotifier.
curOp->setExpectedLatencyMs(durationCount<Milliseconds>(timeout));
- ctx.reset(new AutoGetCollectionForReadCommand(opCtx, request.nss));
+ readLock.emplace(opCtx, request.nss);
exec->restoreState();
// We woke up because either the timed_wait expired, or there was more data. Either
@@ -420,11 +392,11 @@ public:
postExecutionStats.totalDocsExamined -= preExecutionStats.totalDocsExamined;
curOp->debug().setPlanSummaryMetrics(postExecutionStats);
- // We do not report 'execStats' for aggregation, both in the original request and
- // subsequent getMore. The reason for this is that aggregation's source PlanExecutor
- // could be destroyed before we know whether we need execStats and we do not want to
- // generate for all operations due to cost.
- if (!cursor->isAggCursor() && curOp->shouldDBProfile()) {
+ // We do not report 'execStats' for aggregation or other globally managed cursors, both in
+ // the original request and subsequent getMore. It would be useful to have this information
+ // for an aggregation, but the source PlanExecutor could be destroyed before we know whether
+ // we need execStats and we do not want to generate for all operations due to cost.
+ if (!CursorManager::isGloballyManagedCursor(request.cursorid) && curOp->shouldDBProfile()) {
BSONObjBuilder execStatsBob;
Explain::getWinningPlanStats(exec, &execStatsBob);
curOp->debug().execStats = execStatsBob.obj();
@@ -448,9 +420,7 @@ public:
curOp->debug().cursorExhausted = true;
}
- // Respond with the originally requested namespace, even if this is a getMore over a view
- // that was resolved to a different backing namespace.
- nextBatch.done(respondWithId, origNss.ns());
+ nextBatch.done(respondWithId, request.nss.ns());
// Ensure log and profiler include the number of results returned in this getMore's response
// batch.
@@ -458,15 +428,6 @@ public:
if (respondWithId) {
cursorFreer.Dismiss();
-
- // If we are operating on an aggregation cursor, then we dropped our collection lock
- // earlier and need to reacquire it in order to clean up our ClientCursorPin.
- if (cursor->isAggCursor()) {
- invariant(NULL == ctx.get());
- unpinDBLock.reset(new Lock::DBLock(opCtx, request.nss.db(), MODE_IS));
- unpinCollLock.reset(
- new Lock::CollectionLock(opCtx->lockState(), request.nss.ns(), MODE_IS));
- }
}
return true;
@@ -554,27 +515,6 @@ public:
return Status::OK();
}
- /**
- * Called via a ScopeGuard on early return in order to ensure that the ClientCursor gets
- * cleaned up properly.
- */
- static void cleanupCursor(OperationContext* opCtx,
- ClientCursorPin* ccPin,
- const GetMoreRequest& request) {
- ClientCursor* cursor = ccPin->getCursor();
-
- std::unique_ptr<Lock::DBLock> unpinDBLock;
- std::unique_ptr<Lock::CollectionLock> unpinCollLock;
-
- if (cursor->isAggCursor()) {
- unpinDBLock.reset(new Lock::DBLock(opCtx, request.nss.db(), MODE_IS));
- unpinCollLock.reset(
- new Lock::CollectionLock(opCtx->lockState(), request.nss.ns(), MODE_IS));
- }
-
- ccPin->deleteUnderlying();
- }
-
} getMoreCmd;
} // namespace mongo
diff --git a/src/mongo/db/commands/killcursors_cmd.cpp b/src/mongo/db/commands/killcursors_cmd.cpp
index c1526d990b5..a39f0dfaa42 100644
--- a/src/mongo/db/commands/killcursors_cmd.cpp
+++ b/src/mongo/db/commands/killcursors_cmd.cpp
@@ -35,6 +35,8 @@
#include "mongo/db/curop.h"
#include "mongo/db/db_raii.h"
#include "mongo/db/query/killcursors_request.h"
+#include "mongo/db/stats/top.h"
+#include "mongo/util/scopeguard.h"
namespace mongo {
@@ -48,34 +50,51 @@ private:
Status _killCursor(OperationContext* opCtx,
const NamespaceString& nss,
CursorId cursorId) final {
- std::unique_ptr<AutoGetCollectionOrViewForReadCommand> ctx;
-
+ // Cursors come in one of two flavors:
+ // - Cursors owned by the collection cursor manager, such as those generated via the find
+ // command. For these cursors, we hold the appropriate collection lock for the duration of
+ // the getMore using AutoGetCollectionForRead. This will automatically update the CurOp
+ // object appropriately and record execution time via Top upon completion.
+ // - Cursors owned by the global cursor manager, such as those generated via the aggregate
+ // command. These cursors either hold no collection state or manage their collection state
+ // internally, so we acquire no locks. In this case we use the AutoStatsTracker object to
+ // update the CurOp object appropriately and record execution time via Top upon
+ // completion.
+ //
+ // Thus, exactly one of 'readLock' and 'statsTracker' will be populated as we populate
+ // 'cursorManager'.
+ boost::optional<AutoGetCollectionForReadCommand> readLock;
+ boost::optional<AutoStatsTracker> statsTracker;
CursorManager* cursorManager;
- if (nss.isListIndexesCursorNS() || nss.isListCollectionsCursorNS()) {
- // listCollections and listIndexes are special cursor-generating commands whose cursors
- // are managed globally, as they operate over catalog data rather than targeting the
- // data within a collection.
+
+ if (CursorManager::isGloballyManagedCursor(cursorId)) {
cursorManager = CursorManager::getGlobalCursorManager();
- } else {
- ctx = stdx::make_unique<AutoGetCollectionOrViewForReadCommand>(opCtx, nss);
- Collection* collection = ctx->getCollection();
- ViewDefinition* view = ctx->getView();
- if (view) {
- Database* db = ctx->getDb();
- auto resolved = db->getViewCatalog()->resolveView(opCtx, nss);
- if (!resolved.isOK()) {
- return resolved.getStatus();
- }
- ctx->releaseLocksForView();
- Status status = _killCursor(opCtx, resolved.getValue().getNamespace(), cursorId);
- {
- // Set the namespace of the curop back to the view namespace so ctx records
- // stats on this view namespace on destruction.
- stdx::lock_guard<Client> lk(*opCtx->getClient());
- CurOp::get(opCtx)->setNS_inlock(nss.ns());
+
+ if (auto nssForCurOp = nss.isGloballyManagedNamespace()
+ ? nss.getTargetNSForGloballyManagedNamespace()
+ : nss) {
+ const boost::optional<int> dbProfilingLevel = boost::none;
+ statsTracker.emplace(
+ opCtx, *nssForCurOp, Top::LockType::NotLocked, dbProfilingLevel);
+ }
+
+ // Make sure the namespace of the cursor matches the namespace passed to the killCursors
+ // command so we can be sure we checked the correct privileges.
+ auto ccPin = cursorManager->pinCursor(cursorId);
+ if (ccPin.isOK()) {
+ auto cursorNs = ccPin.getValue().getCursor()->nss();
+ if (cursorNs != nss) {
+ return Status{ErrorCodes::Unauthorized,
+ str::stream() << "issued killCursors on namespace '" << nss.ns()
+ << "', but cursor with id "
+ << cursorId
+ << " belongs to a different namespace: "
+ << cursorNs.ns()};
}
- return status;
}
+ } else {
+ readLock.emplace(opCtx, nss);
+ Collection* collection = readLock->getCollection();
if (!collection) {
return {ErrorCodes::CursorNotFound,
str::stream() << "collection does not exist: " << nss.ns()};
diff --git a/src/mongo/db/commands/list_collections.cpp b/src/mongo/db/commands/list_collections.cpp
index da72422b4eb..a636ed51d17 100644
--- a/src/mongo/db/commands/list_collections.cpp
+++ b/src/mongo/db/commands/list_collections.cpp
@@ -298,7 +298,7 @@ public:
const NamespaceString cursorNss = NamespaceString::makeListCollectionsNSS(dbname);
auto statusWithPlanExecutor = PlanExecutor::make(
- opCtx, std::move(ws), std::move(root), cursorNss.ns(), PlanExecutor::YIELD_MANUAL);
+ opCtx, std::move(ws), std::move(root), cursorNss, PlanExecutor::YIELD_MANUAL);
if (!statusWithPlanExecutor.isOK()) {
return appendCommandStatus(result, statusWithPlanExecutor.getStatus());
}
@@ -328,9 +328,10 @@ public:
exec->saveState();
exec->detachFromOperationContext();
auto pinnedCursor = CursorManager::getGlobalCursorManager()->registerCursor(
- {exec.release(),
- cursorNss.ns(),
- opCtx->recoveryUnit()->isReadingFromMajorityCommittedSnapshot()});
+ {std::move(exec),
+ cursorNss,
+ opCtx->recoveryUnit()->isReadingFromMajorityCommittedSnapshot(),
+ jsobj});
cursorId = pinnedCursor.getCursor()->cursorid();
}
diff --git a/src/mongo/db/commands/list_indexes.cpp b/src/mongo/db/commands/list_indexes.cpp
index 73e3d19289e..7ea9b23ed0d 100644
--- a/src/mongo/db/commands/list_indexes.cpp
+++ b/src/mongo/db/commands/list_indexes.cpp
@@ -198,7 +198,7 @@ public:
dassert(ns == cursorNss.getTargetNSForListIndexes());
auto statusWithPlanExecutor = PlanExecutor::make(
- opCtx, std::move(ws), std::move(root), cursorNss.ns(), PlanExecutor::YIELD_MANUAL);
+ opCtx, std::move(ws), std::move(root), cursorNss, PlanExecutor::YIELD_MANUAL);
if (!statusWithPlanExecutor.isOK()) {
return appendCommandStatus(result, statusWithPlanExecutor.getStatus());
}
@@ -228,9 +228,10 @@ public:
exec->saveState();
exec->detachFromOperationContext();
auto pinnedCursor = CursorManager::getGlobalCursorManager()->registerCursor(
- {exec.release(),
- cursorNss.ns(),
- opCtx->recoveryUnit()->isReadingFromMajorityCommittedSnapshot()});
+ {std::move(exec),
+ cursorNss,
+ opCtx->recoveryUnit()->isReadingFromMajorityCommittedSnapshot(),
+ cmdObj});
cursorId = pinnedCursor.getCursor()->cursorid();
}
diff --git a/src/mongo/db/commands/parallel_collection_scan.cpp b/src/mongo/db/commands/parallel_collection_scan.cpp
index 8ff2e100a58..c46cb6d8c5b 100644
--- a/src/mongo/db/commands/parallel_collection_scan.cpp
+++ b/src/mongo/db/commands/parallel_collection_scan.cpp
@@ -148,11 +148,12 @@ public:
exec->saveState();
exec->detachFromOperationContext();
- // Create and regiter a new ClientCursor.
+ // Create and register a new ClientCursor.
auto pinnedCursor = collection->getCursorManager()->registerCursor(
- {exec.release(),
- ns.ns(),
- opCtx->recoveryUnit()->isReadingFromMajorityCommittedSnapshot()});
+ {std::move(exec),
+ ns,
+ opCtx->recoveryUnit()->isReadingFromMajorityCommittedSnapshot(),
+ cmdObj});
pinnedCursor.getCursor()->setLeftoverMaxTimeMicros(
opCtx->getRemainingMaxTimeMicros());
diff --git a/src/mongo/db/commands/repair_cursor.cpp b/src/mongo/db/commands/repair_cursor.cpp
index ac6155cc394..69ded443ec6 100644
--- a/src/mongo/db/commands/repair_cursor.cpp
+++ b/src/mongo/db/commands/repair_cursor.cpp
@@ -107,9 +107,10 @@ public:
exec->detachFromOperationContext();
auto pinnedCursor = collection->getCursorManager()->registerCursor(
- {exec.release(),
- ns.ns(),
- opCtx->recoveryUnit()->isReadingFromMajorityCommittedSnapshot()});
+ {std::move(exec),
+ ns,
+ opCtx->recoveryUnit()->isReadingFromMajorityCommittedSnapshot(),
+ cmdObj});
appendCursorResponseObject(
pinnedCursor.getCursor()->cursorid(), ns.ns(), BSONArray(), &result);
diff --git a/src/mongo/db/commands/run_aggregate.cpp b/src/mongo/db/commands/run_aggregate.cpp
index 35164560c16..f0ef162e9af 100644
--- a/src/mongo/db/commands/run_aggregate.cpp
+++ b/src/mongo/db/commands/run_aggregate.cpp
@@ -60,6 +60,7 @@
#include "mongo/db/views/view_sharding_check.h"
#include "mongo/stdx/memory.h"
#include "mongo/util/log.h"
+#include "mongo/util/scopeguard.h"
#include "mongo/util/string_map.h"
namespace mongo {
@@ -76,19 +77,15 @@ namespace {
/**
* Returns true if we need to keep a ClientCursor saved for this pipeline (for future getMore
* requests). Otherwise, returns false. The passed 'nsForCursor' is only used to determine the
- * namespace used in the returned cursor. In the case of views, this can be different from that
- * in 'request'.
+ * namespace used in the returned cursor, which will be registered with the global cursor manager,
+ * and thus will be different from that in 'request'.
*/
bool handleCursorCommand(OperationContext* opCtx,
- const string& nsForCursor,
+ const NamespaceString& nsForCursor,
ClientCursor* cursor,
- PlanExecutor* exec,
const AggregationRequest& request,
BSONObjBuilder& result) {
- if (cursor) {
- invariant(cursor->getExecutor() == exec);
- invariant(cursor->isAggCursor());
- }
+ invariant(cursor);
long long batchSize = request.getBatchSize();
@@ -99,45 +96,29 @@ bool handleCursorCommand(OperationContext* opCtx,
// The initial getNext() on a PipelineProxyStage may be very expensive so we don't
// do it when batchSize is 0 since that indicates a desire for a fast return.
PlanExecutor::ExecState state;
- if ((state = exec->getNext(&next, NULL)) == PlanExecutor::IS_EOF) {
+ if ((state = cursor->getExecutor()->getNext(&next, nullptr)) == PlanExecutor::IS_EOF) {
// make it an obvious error to use cursor or executor after this point
- cursor = NULL;
- exec = NULL;
+ cursor = nullptr;
break;
}
- uassert(34426,
- "Plan executor error during aggregation: " + WorkingSetCommon::toStatusString(next),
- PlanExecutor::ADVANCED == state);
+ if (PlanExecutor::ADVANCED != state) {
+ auto status = WorkingSetCommon::getMemberObjectStatus(next);
+ uasserted(status.code(),
+ "PlanExecutor error during aggregation: " +
+ WorkingSetCommon::toStatusString(next));
+ }
// If adding this object will cause us to exceed the message size limit, then we stash it
// for later.
if (!FindCommon::haveSpaceForNext(next, objCount, resultsArray.len())) {
- exec->enqueue(next);
+ cursor->getExecutor()->enqueue(next);
break;
}
resultsArray.append(next);
}
- // NOTE: exec->isEOF() can have side effects such as writing by $out. However, it should
- // be relatively quick since if there was no cursor then the input is empty. Also, this
- // violates the contract for batchSize==0. Sharding requires a cursor to be returned in that
- // case. This is ok for now however, since you can't have a sharded collection that doesn't
- // exist.
- const bool canReturnMoreBatches = cursor;
- if (!canReturnMoreBatches && exec && !exec->isEOF()) {
- // msgasserting since this shouldn't be possible to trigger from today's aggregation
- // language. The wording assumes that the only reason cursor would be null is if the
- // collection doesn't exist.
- msgasserted(
- 17391,
- str::stream() << "Aggregation has more results than fit in initial batch, but can't "
- << "create cursor since collection "
- << nsForCursor
- << " doesn't exist");
- }
-
if (cursor) {
// If a time limit was set on the pipeline, remaining time is "rolled over" to the
// cursor (for use by future getmore ops).
@@ -147,14 +128,14 @@ bool handleCursorCommand(OperationContext* opCtx,
// Cursor needs to be in a saved state while we yield locks for getmore. State
// will be restored in getMore().
- exec->saveState();
- exec->detachFromOperationContext();
+ cursor->getExecutor()->saveState();
+ cursor->getExecutor()->detachFromOperationContext();
} else {
CurOp::get(opCtx)->debug().cursorExhausted = true;
}
- const long long cursorId = cursor ? cursor->cursorid() : 0LL;
- appendCursorResponseObject(cursorId, nsForCursor, resultsArray.arr(), &result);
+ const CursorId cursorId = cursor ? cursor->cursorid() : 0LL;
+ appendCursorResponseObject(cursorId, nsForCursor.ns(), resultsArray.arr(), &result);
return static_cast<bool>(cursor);
}
@@ -296,7 +277,6 @@ Status runAggregate(OperationContext* opCtx,
: uassertStatusOK(CollatorFactoryInterface::get(opCtx->getServiceContext())
->makeFromBSON(request.getCollation()));
- boost::optional<ClientCursorPin> pin; // either this OR the exec will be non-null
unique_ptr<PlanExecutor> exec;
boost::intrusive_ptr<ExpressionContext> expCtx;
boost::intrusive_ptr<Pipeline> pipeline;
@@ -304,11 +284,6 @@ Status runAggregate(OperationContext* opCtx,
{
// This will throw if the sharding version for this connection is out of date. If the
// namespace is a view, the lock will be released before re-running the aggregation.
- // Otherwise, the lock must be held continuously from now until we have we created both
- // the output ClientCursor and the input executor. This ensures that both are using the
- // same sharding version that we synchronize on here. This is also why we always need to
- // create a ClientCursor even when we aren't outputting to a cursor. See the comment on
- // ShardFilterStage for more details.
AutoGetCollectionOrViewForReadCommand ctx(opCtx, nss);
Collection* collection = ctx.getCollection();
@@ -416,17 +391,15 @@ Status runAggregate(OperationContext* opCtx,
// it to the front of the pipeline if needed.
PipelineD::prepareCursorSource(collection, &request, pipeline);
- // Create the PlanExecutor which returns results from the pipeline. The WorkingSet
- // ('ws') and the PipelineProxyStage ('proxy') will be owned by the created
- // PlanExecutor.
auto ws = make_unique<WorkingSet>();
auto proxy = make_unique<PipelineProxyStage>(opCtx, pipeline, ws.get());
- auto statusWithPlanExecutor = (NULL == collection)
- ? PlanExecutor::make(
- opCtx, std::move(ws), std::move(proxy), nss.ns(), PlanExecutor::YIELD_MANUAL)
- : PlanExecutor::make(
- opCtx, std::move(ws), std::move(proxy), collection, PlanExecutor::YIELD_MANUAL);
+ // This PlanExecutor will simply forward requests to the Pipeline, so does not need to
+ // yield or to be registered with any collection's CursorManager to receive invalidations.
+ // The Pipeline may contain PlanExecutors which *are* yielding PlanExecutors and which *are*
+ // registered with their respective collection's CursorManager
+ auto statusWithPlanExecutor = PlanExecutor::make(
+ opCtx, std::move(ws), std::move(proxy), nss, PlanExecutor::YIELD_MANUAL);
invariant(statusWithPlanExecutor.isOK());
exec = std::move(statusWithPlanExecutor.getValue());
@@ -435,75 +408,39 @@ Status runAggregate(OperationContext* opCtx,
stdx::lock_guard<Client> lk(*opCtx->getClient());
curOp->setPlanSummary_inlock(std::move(planSummary));
}
-
- if (collection) {
- const bool isAggCursor = true; // enable special locking behavior
- pin.emplace(collection->getCursorManager()->registerCursor(
- {exec.release(),
- nss.ns(),
- opCtx->recoveryUnit()->isReadingFromMajorityCommittedSnapshot(),
- 0,
- cmdObj.getOwned(),
- isAggCursor}));
- // Don't add any code between here and the start of the try block.
- }
-
- // At this point, it is safe to release the collection lock.
- // - In the case where we have a collection: we will need to reacquire the
- // collection lock later when cleaning up our ClientCursorPin.
- // - In the case where we don't have a collection: our PlanExecutor won't be
- // registered, so it will be safe to clean it up outside the lock.
- invariant(!exec || !collection);
}
- try {
- // Unless set to true, the ClientCursor created above will be deleted on block exit.
- bool keepCursor = false;
-
- // If both explain and cursor are specified, explain wins.
- if (expCtx->explain) {
- result << "stages" << Value(pipeline->writeExplainOps(*expCtx->explain));
- } else {
- // Cursor must be specified, if explain is not.
- keepCursor = handleCursorCommand(opCtx,
- origNss.ns(),
- pin ? pin->getCursor() : nullptr,
- pin ? pin->getCursor()->getExecutor() : exec.get(),
- request,
- result);
- }
-
- if (!expCtx->explain) {
- PlanSummaryStats stats;
- Explain::getSummaryStats(pin ? *pin->getCursor()->getExecutor() : *exec.get(), &stats);
- curOp->debug().setPlanSummaryMetrics(stats);
- curOp->debug().nreturned = stats.nReturned;
+ // Having released the collection lock, we can now create a cursor that returns results from the
+ // pipeline. This cursor owns no collection state, and thus we register it with the global
+ // cursor manager. The global cursor manager does not deliver invalidations or kill
+ // notifications; the underlying PlanExecutor(s) used by the pipeline will be receiving
+ // invalidations and kill notifications themselves, not the cursor we create here.
+ auto pin = CursorManager::getGlobalCursorManager()->registerCursor(
+ {std::move(exec),
+ origNss,
+ opCtx->recoveryUnit()->isReadingFromMajorityCommittedSnapshot(),
+ cmdObj});
+ ScopeGuard cursorFreer = MakeGuard(&ClientCursorPin::deleteUnderlying, &pin);
+
+ // If both explain and cursor are specified, explain wins.
+ if (expCtx->explain) {
+ result << "stages" << Value(pipeline->writeExplainOps(*expCtx->explain));
+ } else {
+ // Cursor must be specified, if explain is not.
+ const bool keepCursor =
+ handleCursorCommand(opCtx, origNss, pin.getCursor(), request, result);
+ if (keepCursor) {
+ cursorFreer.Dismiss();
}
+ }
- // Clean up our ClientCursorPin, if needed. We must reacquire the collection lock
- // in order to do so.
- if (pin) {
- // We acquire locks here with DBLock and CollectionLock instead of using
- // AutoGetCollectionForRead. AutoGetCollectionForRead will throw if the
- // sharding version is out of date, and we don't care if the sharding version
- // has changed.
- Lock::DBLock dbLock(opCtx, nss.db(), MODE_IS);
- Lock::CollectionLock collLock(opCtx->lockState(), nss.ns(), MODE_IS);
- if (keepCursor) {
- pin->release();
- } else {
- pin->deleteUnderlying();
- }
- }
- } catch (...) {
- // On our way out of scope, we clean up our ClientCursorPin if needed.
- if (pin) {
- Lock::DBLock dbLock(opCtx, nss.db(), MODE_IS);
- Lock::CollectionLock collLock(opCtx->lockState(), nss.ns(), MODE_IS);
- pin->deleteUnderlying();
- }
- throw;
+ if (!expCtx->explain) {
+ PlanSummaryStats stats;
+ Explain::getSummaryStats(*(pin.getCursor()->getExecutor()), &stats);
+ curOp->debug().setPlanSummaryMetrics(stats);
+ curOp->debug().nreturned = stats.nReturned;
}
+
// Any code that needs the cursor pinned must be inside the try block, above.
return Status::OK();
}
diff --git a/src/mongo/db/curop.cpp b/src/mongo/db/curop.cpp
index 51016d486ce..2af0907170d 100644
--- a/src/mongo/db/curop.cpp
+++ b/src/mongo/db/curop.cpp
@@ -266,10 +266,12 @@ void CurOp::ensureStarted() {
}
}
-void CurOp::enter_inlock(const char* ns, int dbProfileLevel) {
+void CurOp::enter_inlock(const char* ns, boost::optional<int> dbProfileLevel) {
ensureStarted();
_ns = ns;
- raiseDbProfileLevel(dbProfileLevel);
+ if (dbProfileLevel) {
+ raiseDbProfileLevel(*dbProfileLevel);
+ }
}
void CurOp::raiseDbProfileLevel(int dbProfileLevel) {
diff --git a/src/mongo/db/curop.h b/src/mongo/db/curop.h
index 4e0ef0d2d7b..158513bf6fe 100644
--- a/src/mongo/db/curop.h
+++ b/src/mongo/db/curop.h
@@ -192,7 +192,7 @@ public:
return _originatingCommand;
}
- void enter_inlock(const char* ns, int dbProfileLevel);
+ void enter_inlock(const char* ns, boost::optional<int> dbProfileLevel);
/**
* Sets the type of the current network operation.
diff --git a/src/mongo/db/db_raii.cpp b/src/mongo/db/db_raii.cpp
index 84b6584b540..8001b658a09 100644
--- a/src/mongo/db/db_raii.cpp
+++ b/src/mongo/db/db_raii.cpp
@@ -75,6 +75,35 @@ AutoGetOrCreateDb::AutoGetOrCreateDb(OperationContext* opCtx, StringData ns, Loc
}
}
+AutoStatsTracker::AutoStatsTracker(OperationContext* opCtx,
+ const NamespaceString& nss,
+ Top::LockType lockType,
+ boost::optional<int> dbProfilingLevel)
+ : _opCtx(opCtx), _lockType(lockType) {
+ if (!dbProfilingLevel) {
+ // No profiling level was determined, attempt to read the profiling level from the Database
+ // object.
+ AutoGetDb autoDb(_opCtx, nss.db(), MODE_IS);
+ if (autoDb.getDb()) {
+ dbProfilingLevel = autoDb.getDb()->getProfilingLevel();
+ }
+ }
+ stdx::lock_guard<Client> clientLock(*_opCtx->getClient());
+ CurOp::get(_opCtx)->enter_inlock(nss.ns().c_str(), dbProfilingLevel);
+}
+
+AutoStatsTracker::~AutoStatsTracker() {
+ auto curOp = CurOp::get(_opCtx);
+ Top::get(_opCtx->getServiceContext())
+ .record(_opCtx,
+ curOp->getNS(),
+ curOp->getLogicalOp(),
+ _lockType,
+ _timer.micros(),
+ curOp->isCommand(),
+ curOp->getReadWriteType());
+}
+
AutoGetCollectionForRead::AutoGetCollectionForRead(OperationContext* opCtx,
const NamespaceString& nss,
AutoGetCollection::ViewMode viewMode) {
@@ -84,19 +113,6 @@ AutoGetCollectionForRead::AutoGetCollectionForRead(OperationContext* opCtx,
_ensureMajorityCommittedSnapshotIsValid(nss, opCtx);
}
-AutoGetCollectionForReadCommand::~AutoGetCollectionForReadCommand() {
- // Report time spent in read lock
- auto currentOp = CurOp::get(_opCtx);
- Top::get(_opCtx->getClient()->getServiceContext())
- .record(_opCtx,
- currentOp->getNS(),
- currentOp->getLogicalOp(),
- -1, // "read locked"
- _timer.micros(),
- currentOp->isCommand(),
- currentOp->getReadWriteType());
-}
-
void AutoGetCollectionForRead::_ensureMajorityCommittedSnapshotIsValid(const NamespaceString& nss,
OperationContext* opCtx) {
while (true) {
@@ -134,25 +150,15 @@ void AutoGetCollectionForRead::_ensureMajorityCommittedSnapshotIsValid(const Nam
}
AutoGetCollectionForReadCommand::AutoGetCollectionForReadCommand(
- OperationContext* opCtx, const NamespaceString& nss, AutoGetCollection::ViewMode viewMode)
- : _opCtx(opCtx) {
- {
- _autoCollForRead.emplace(opCtx, nss, viewMode);
-
- auto curOp = CurOp::get(_opCtx);
- stdx::lock_guard<Client> lk(*_opCtx->getClient());
-
- // TODO: OldClientContext legacy, needs to be removed
- curOp->ensureStarted();
- curOp->setNS_inlock(nss.ns());
-
- // At this point, we are locked in shared mode for the database by the DB lock in the
- // constructor, so it is safe to load the DB pointer.
- if (_autoCollForRead->getDb()) {
- // TODO: OldClientContext legacy, needs to be removed
- curOp->enter_inlock(nss.ns().c_str(), _autoCollForRead->getDb()->getProfilingLevel());
- }
- }
+ OperationContext* opCtx, const NamespaceString& nss, AutoGetCollection::ViewMode viewMode) {
+
+ _autoCollForRead.emplace(opCtx, nss, viewMode);
+ const int doNotChangeProfilingLevel = 0;
+ _statsTracker.emplace(opCtx,
+ nss,
+ Top::LockType::ReadLocked,
+ _autoCollForRead->getDb() ? _autoCollForRead->getDb()->getProfilingLevel()
+ : doNotChangeProfilingLevel);
// We have both the DB and collection locked, which is the prerequisite to do a stable shard
// version check, but we'd like to do the check after we have a satisfactory snapshot.
@@ -231,7 +237,8 @@ OldClientContext::~OldClientContext() {
.record(_opCtx,
currentOp->getNS(),
currentOp->getLogicalOp(),
- _opCtx->lockState()->isWriteLocked() ? 1 : -1,
+ _opCtx->lockState()->isWriteLocked() ? Top::LockType::WriteLocked
+ : Top::LockType::ReadLocked,
_timer.micros(),
currentOp->isCommand(),
currentOp->getReadWriteType());
diff --git a/src/mongo/db/db_raii.h b/src/mongo/db/db_raii.h
index 8060d06f000..42444775bad 100644
--- a/src/mongo/db/db_raii.h
+++ b/src/mongo/db/db_raii.h
@@ -35,6 +35,7 @@
#include "mongo/db/concurrency/d_concurrency.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/operation_context.h"
+#include "mongo/db/stats/top.h"
#include "mongo/db/views/view.h"
#include "mongo/util/timer.h"
@@ -170,6 +171,36 @@ private:
};
/**
+ * RAII-style class which automatically tracks the operation namespace in CurrentOp and records the
+ * operation via Top upon destruction.
+ */
+class AutoStatsTracker {
+ MONGO_DISALLOW_COPYING(AutoStatsTracker);
+
+public:
+ /**
+ * Sets the namespace of the CurOp object associated with 'opCtx' to be 'nss' and starts the
+ * CurOp timer. 'lockType' describes which type of lock is held by this operation, and will be
+ * used for reporting via Top. If 'dbProfilingLevel' is not given, this constructor will acquire
+ * and then drop a database lock in order to determine the database's profiling level.
+ */
+ AutoStatsTracker(OperationContext* opCtx,
+ const NamespaceString& nss,
+ Top::LockType lockType,
+ boost::optional<int> dbProfilingLevel);
+
+ /**
+ * Records stats about the current operation via Top.
+ */
+ ~AutoStatsTracker();
+
+private:
+ const Timer _timer;
+ OperationContext* _opCtx;
+ Top::LockType _lockType;
+};
+
+/**
* RAII-style class, which would acquire the appropriate hierarchy of locks for obtaining
* a particular collection and would retrieve a reference to the collection. In addition, this
* utility will ensure that the read will be performed against an appropriately committed snapshot
@@ -236,8 +267,6 @@ public:
: AutoGetCollectionForReadCommand(
opCtx, nss, AutoGetCollection::ViewMode::kViewsForbidden) {}
- ~AutoGetCollectionForReadCommand();
-
Database* getDb() const {
return _autoCollForRead->getDb();
}
@@ -246,21 +275,18 @@ public:
return _autoCollForRead->getCollection();
}
-private:
- OperationContext* const _opCtx;
- const Timer _timer;
-
protected:
AutoGetCollectionForReadCommand(OperationContext* opCtx,
const NamespaceString& nss,
AutoGetCollection::ViewMode viewMode);
- /**
- * This protected section must come after the private section because
- * AutoGetCollectionOrViewForRead needs access to _autoColl, but _autoColl must be initialized
- * after _transaction.
- */
+ // '_autoCollForRead' may need to be reset by AutoGetCollectionOrViewForReadCommand, so needs to
+ // be a boost::optional.
boost::optional<AutoGetCollectionForRead> _autoCollForRead;
+
+ // This needs to be initialized after 'autoCollForRead', since we need to consult the Database
+ // object to get the profiling level. Thus, it needs to be a boost::optional.
+ boost::optional<AutoStatsTracker> _statsTracker;
};
/**
diff --git a/src/mongo/db/exec/pipeline_proxy.h b/src/mongo/db/exec/pipeline_proxy.h
index 9482f9ba565..fd5b99c4bae 100644
--- a/src/mongo/db/exec/pipeline_proxy.h
+++ b/src/mongo/db/exec/pipeline_proxy.h
@@ -37,6 +37,7 @@
#include "mongo/db/pipeline/pipeline.h"
#include "mongo/db/query/plan_summary_stats.h"
#include "mongo/db/record_id.h"
+#include "mongo/util/assert_util.h"
namespace mongo {
@@ -64,13 +65,18 @@ public:
// Not used.
SpecificStats* getSpecificStats() const final {
- return NULL;
+ MONGO_UNREACHABLE;
+ }
+
+ void doInvalidate(OperationContext* txn, const RecordId& rid, InvalidationType type) final {
+ // A PlanExecutor with a PipelineProxyStage should be registered with the global cursor
+ // manager, so should not receive invalidations.
+ MONGO_UNREACHABLE;
}
std::string getPlanSummaryStr() const;
void getPlanSummaryStats(PlanSummaryStats* statsOut) const;
- // Not used.
StageType stageType() const final {
return STAGE_PIPELINE_PROXY;
}
diff --git a/src/mongo/db/namespace_string.cpp b/src/mongo/db/namespace_string.cpp
index 6ee4d989ceb..0a453546a61 100644
--- a/src/mongo/db/namespace_string.cpp
+++ b/src/mongo/db/namespace_string.cpp
@@ -137,6 +137,17 @@ NamespaceString NamespaceString::getTargetNSForListIndexes() const {
return NamespaceString(db(), coll().substr(listIndexesCursorNSPrefix.size()));
}
+boost::optional<NamespaceString> NamespaceString::getTargetNSForGloballyManagedNamespace() const {
+ // Globally managed namespaces are of the form '$cmd.commandName.<targetNs>' or simply
+ // '$cmd.commandName'.
+ dassert(isGloballyManagedNamespace());
+ const size_t indexOfNextDot = coll().find('.', 5);
+ if (indexOfNextDot == std::string::npos) {
+ return boost::none;
+ }
+ return NamespaceString{db(), coll().substr(indexOfNextDot + 1)};
+}
+
string NamespaceString::escapeDbName(const StringData dbname) {
std::string escapedDbName;
diff --git a/src/mongo/db/namespace_string.h b/src/mongo/db/namespace_string.h
index c623dbbac90..58f26d76b2c 100644
--- a/src/mongo/db/namespace_string.h
+++ b/src/mongo/db/namespace_string.h
@@ -31,6 +31,7 @@
#pragma once
#include <algorithm>
+#include <boost/optional.hpp>
#include <iosfwd>
#include <string>
@@ -93,13 +94,13 @@ public:
NamespaceString(StringData dbName, StringData collectionName);
/**
- * Contructs a NamespaceString representing a listCollections namespace. The format for this
+ * Constructs a NamespaceString representing a listCollections namespace. The format for this
* namespace is "<dbName>.$cmd.listCollections".
*/
static NamespaceString makeListCollectionsNSS(StringData dbName);
/**
- * Contructs a NamespaceString representing a listIndexes namespace. The format for this
+ * Constructs a NamespaceString representing a listIndexes namespace. The format for this
* namespace is "<dbName>.$cmd.listIndexes.<collectionName>".
*/
static NamespaceString makeListIndexesNSS(StringData dbName, StringData collectionName);
@@ -202,10 +203,25 @@ public:
bool isVirtualized() const {
return virtualized(_ns);
}
+
+ /**
+ * Returns true if cursors for this namespace are registered with the global cursor manager.
+ */
+ bool isGloballyManagedNamespace() const {
+ return coll().startsWith("$cmd."_sd);
+ }
+
bool isListCollectionsCursorNS() const;
bool isListIndexesCursorNS() const;
/**
+ * Given a NamespaceString for which isGloballyManagedNamespace() returns true, returns the
+ * namespace the command targets, or boost::none for commands like 'listCollections' which
+ * do not target a collection.
+ */
+ boost::optional<NamespaceString> getTargetNSForGloballyManagedNamespace() const;
+
+ /**
* Given a NamespaceString for which isListIndexesCursorNS() returns true, returns the
* NamespaceString for the collection that the "listIndexes" targets.
*/
diff --git a/src/mongo/db/namespace_string_test.cpp b/src/mongo/db/namespace_string_test.cpp
index 69884bd5ca9..3e4b489b508 100644
--- a/src/mongo/db/namespace_string_test.cpp
+++ b/src/mongo/db/namespace_string_test.cpp
@@ -173,6 +173,41 @@ TEST(NamespaceStringTest, ListIndexesCursorNS) {
ASSERT(!NamespaceString("test.$cmd.listCollections.foo").isListIndexesCursorNS());
}
+TEST(NamespaceStringTest, IsGloballyManagedNamespace) {
+ ASSERT_TRUE(NamespaceString{"test.$cmd.aggregate.foo"}.isGloballyManagedNamespace());
+ ASSERT_TRUE(NamespaceString{"test.$cmd.listIndexes.foo"}.isGloballyManagedNamespace());
+ ASSERT_TRUE(NamespaceString{"test.$cmd.otherCommand.foo"}.isGloballyManagedNamespace());
+ ASSERT_TRUE(NamespaceString{"test.$cmd.listCollections"}.isGloballyManagedNamespace());
+ ASSERT_TRUE(NamespaceString{"test.$cmd.otherCommand"}.isGloballyManagedNamespace());
+ ASSERT_TRUE(NamespaceString{"test.$cmd.aggregate"}.isGloballyManagedNamespace());
+ ASSERT_TRUE(NamespaceString{"test.$cmd.listIndexes"}.isGloballyManagedNamespace());
+
+ ASSERT_FALSE(NamespaceString{"test.foo"}.isGloballyManagedNamespace());
+ ASSERT_FALSE(NamespaceString{"test.$cmd"}.isGloballyManagedNamespace());
+
+ ASSERT_FALSE(NamespaceString{"$cmd.aggregate.foo"}.isGloballyManagedNamespace());
+ ASSERT_FALSE(NamespaceString{"$cmd.listCollections"}.isGloballyManagedNamespace());
+}
+
+TEST(NamespaceStringTest, GetTargetNSForGloballyManagedNamespace) {
+ ASSERT_EQ(
+ (NamespaceString{"test", "foo"}),
+ NamespaceString{"test.$cmd.aggregate.foo"}.getTargetNSForGloballyManagedNamespace().get());
+ ASSERT_EQ((NamespaceString{"test", "foo"}),
+ NamespaceString{"test.$cmd.listIndexes.foo"}
+ .getTargetNSForGloballyManagedNamespace()
+ .get());
+ ASSERT_EQ((NamespaceString{"test", "foo"}),
+ NamespaceString{"test.$cmd.otherCommand.foo"}
+ .getTargetNSForGloballyManagedNamespace()
+ .get());
+
+ ASSERT_FALSE(
+ NamespaceString{"test.$cmd.listCollections"}.getTargetNSForGloballyManagedNamespace());
+ ASSERT_FALSE(
+ NamespaceString{"test.$cmd.otherCommand"}.getTargetNSForGloballyManagedNamespace());
+}
+
TEST(NamespaceStringTest, CollectionComponentValidNames) {
ASSERT(NamespaceString::validCollectionComponent("a.b"));
ASSERT(NamespaceString::validCollectionComponent("a.b"));
diff --git a/src/mongo/db/ops/write_ops_exec.cpp b/src/mongo/db/ops/write_ops_exec.cpp
index 60f0b7c79b6..a29a42e4a52 100644
--- a/src/mongo/db/ops/write_ops_exec.cpp
+++ b/src/mongo/db/ops/write_ops_exec.cpp
@@ -94,7 +94,7 @@ void finishCurOp(OperationContext* opCtx, CurOp* curOp) {
.record(opCtx,
curOp->getNS(),
curOp->getLogicalOp(),
- 1, // "write locked"
+ Top::LockType::WriteLocked,
curOp->totalTimeMicros(),
curOp->isCommand(),
curOp->getReadWriteType());
@@ -412,7 +412,7 @@ WriteResult performInserts(OperationContext* opCtx, const InsertOp& wholeOp) {
.record(opCtx,
wholeOp.ns.ns(),
LogicalOp::opInsert,
- 1 /* write locked*/,
+ Top::LockType::WriteLocked,
curOp.totalTimeMicros(),
curOp.isCommand(),
curOp.getReadWriteType());
diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript
index e243adda24a..ee26ddc62fd 100644
--- a/src/mongo/db/pipeline/SConscript
+++ b/src/mongo/db/pipeline/SConscript
@@ -473,5 +473,6 @@ env.Library(
'$BUILD_DIR/mongo/db/index/index_access_methods',
'$BUILD_DIR/mongo/db/matcher/expressions_mongod_only',
'$BUILD_DIR/mongo/db/stats/serveronly',
+ '$BUILD_DIR/mongo/db/clientcursor',
],
)
diff --git a/src/mongo/db/pipeline/document_source_cursor.cpp b/src/mongo/db/pipeline/document_source_cursor.cpp
index ae62da0a978..edc49e4923e 100644
--- a/src/mongo/db/pipeline/document_source_cursor.cpp
+++ b/src/mongo/db/pipeline/document_source_cursor.cpp
@@ -31,12 +31,11 @@
#include "mongo/db/pipeline/document_source_cursor.h"
#include "mongo/db/catalog/collection.h"
+#include "mongo/db/curop.h"
#include "mongo/db/db_raii.h"
#include "mongo/db/exec/working_set_common.h"
#include "mongo/db/pipeline/document.h"
#include "mongo/db/query/explain.h"
-#include "mongo/db/query/find_common.h"
-#include "mongo/db/server_parameters.h"
#include "mongo/db/storage/storage_options.h"
#include "mongo/util/scopeguard.h"
@@ -66,20 +65,27 @@ DocumentSource::GetNextResult DocumentSourceCursor::getNext() {
}
void DocumentSourceCursor::dispose() {
- _exec.reset();
_currentBatch.clear();
+ if (!_exec) {
+ return;
+ }
+
+ // We must hold a collection lock to destroy a PlanExecutor to ensure the CursorManager will
+ // still exist and can be safely told to deregister the PlanExecutor.
+ invariant(pExpCtx->opCtx);
+ AutoGetCollection autoColl(pExpCtx->opCtx, _exec->nss(), MODE_IS);
+ _exec.reset();
+ _rangePreserver.release();
}
void DocumentSourceCursor::loadBatch() {
if (!_exec) {
+ // No more documents.
dispose();
return;
}
- // We have already validated the sharding version when we constructed the PlanExecutor
- // so we shouldn't check it again.
- AutoGetCollectionForRead autoColl(pExpCtx->opCtx, _nss);
-
+ AutoGetCollectionForRead autoColl(pExpCtx->opCtx, _exec->nss());
_exec->restoreState();
int memUsageBytes = 0;
@@ -88,7 +94,7 @@ void DocumentSourceCursor::loadBatch() {
{
ON_BLOCK_EXIT([this] { recordPlanSummaryStats(); });
- while ((state = _exec->getNext(&obj, NULL)) == PlanExecutor::ADVANCED) {
+ while ((state = _exec->getNext(&obj, nullptr)) == PlanExecutor::ADVANCED) {
if (_shouldProduceEmptyDocs) {
_currentBatch.push_back(Document());
} else if (_dependencies) {
@@ -114,27 +120,28 @@ void DocumentSourceCursor::loadBatch() {
}
}
- // If we got here, there won't be any more documents, so destroy the executor. Can't use
- // dispose since we want to keep the _currentBatch.
+ // If we got here, there won't be any more documents, so destroy our PlanExecutor. Note we can't
+ // use dispose() since we want to keep the current batch.
_exec.reset();
-
- uassert(16028,
- str::stream() << "collection or index disappeared when cursor yielded: "
- << WorkingSetCommon::toStatusString(obj),
- state != PlanExecutor::DEAD);
-
- uassert(17285,
- str::stream() << "cursor encountered an error: "
- << WorkingSetCommon::toStatusString(obj),
- state != PlanExecutor::FAILURE);
-
- massert(17286,
- str::stream() << "Unexpected return from PlanExecutor::getNext: " << state,
- state == PlanExecutor::IS_EOF || state == PlanExecutor::ADVANCED);
-}
-
-long long DocumentSourceCursor::getLimit() const {
- return _limit ? _limit->getLimit() : -1;
+ _rangePreserver.release();
+
+ switch (state) {
+ case PlanExecutor::ADVANCED:
+ case PlanExecutor::IS_EOF:
+ return; // We've reached our limit or exhausted the cursor.
+ case PlanExecutor::DEAD: {
+ uasserted(ErrorCodes::QueryPlanKilled,
+ str::stream() << "collection or index disappeared when cursor yielded: "
+ << WorkingSetCommon::toStatusString(obj));
+ }
+ case PlanExecutor::FAILURE: {
+ uasserted(17285,
+ str::stream() << "cursor encountered an error: "
+ << WorkingSetCommon::toStatusString(obj));
+ }
+ default:
+ MONGO_UNREACHABLE;
+ }
}
Pipeline::SourceContainer::iterator DocumentSourceCursor::doOptimizeAt(
@@ -156,12 +163,6 @@ Pipeline::SourceContainer::iterator DocumentSourceCursor::doOptimizeAt(
return std::next(itr);
}
-
-void DocumentSourceCursor::recordPlanSummaryStr() {
- invariant(_exec);
- _planSummary = Explain::getPlanSummary(_exec.get());
-}
-
void DocumentSourceCursor::recordPlanSummaryStats() {
invariant(_exec);
// Aggregation handles in-memory sort outside of the query sub-system. Given that we need to
@@ -175,20 +176,17 @@ void DocumentSourceCursor::recordPlanSummaryStats() {
}
Value DocumentSourceCursor::serialize(boost::optional<ExplainOptions::Verbosity> explain) const {
- // we never parse a documentSourceCursor, so we only serialize for explain
+ // We never parse a DocumentSourceCursor, so we only serialize for explain.
if (!explain)
return Value();
// Get planner-level explain info from the underlying PlanExecutor.
+ invariant(_exec);
BSONObjBuilder explainBuilder;
{
- AutoGetCollectionForRead autoColl(pExpCtx->opCtx, _nss);
-
- massert(17392, "No _exec. Were we disposed before explained?", _exec);
-
+ AutoGetCollectionForRead autoColl(pExpCtx->opCtx, _exec->nss());
_exec->restoreState();
Explain::explainStages(_exec.get(), autoColl.getCollection(), *explain, &explainBuilder);
-
_exec->saveState();
}
@@ -229,44 +227,28 @@ void DocumentSourceCursor::reattachToOperationContext(OperationContext* opCtx) {
}
DocumentSourceCursor::DocumentSourceCursor(Collection* collection,
- const string& ns,
std::unique_ptr<PlanExecutor> exec,
const intrusive_ptr<ExpressionContext>& pCtx)
: DocumentSource(pCtx),
_docsAddedToBatches(0),
- _nss(ns),
+ _rangePreserver(collection),
_exec(std::move(exec)),
_outputSorts(_exec->getOutputSorts()) {
- recordPlanSummaryStr();
- // We record execution metrics here to allow for capture of indexes used prior to execution.
+ _planSummary = Explain::getPlanSummary(_exec.get());
recordPlanSummaryStats();
+
if (collection) {
- collection->infoCache()->notifyOfQuery(pCtx->opCtx, _planSummaryStats.indexesUsed);
+ collection->infoCache()->notifyOfQuery(pExpCtx->opCtx, _planSummaryStats.indexesUsed);
}
}
intrusive_ptr<DocumentSourceCursor> DocumentSourceCursor::create(
Collection* collection,
- const string& ns,
std::unique_ptr<PlanExecutor> exec,
const intrusive_ptr<ExpressionContext>& pExpCtx) {
intrusive_ptr<DocumentSourceCursor> source(
- new DocumentSourceCursor(collection, ns, std::move(exec), pExpCtx));
+ new DocumentSourceCursor(collection, std::move(exec), pExpCtx));
return source;
}
-
-void DocumentSourceCursor::setProjection(const BSONObj& projection,
- const boost::optional<ParsedDeps>& deps) {
- _projection = projection;
- _dependencies = deps;
-}
-
-const std::string& DocumentSourceCursor::getPlanSummaryStr() const {
- return _planSummary;
-}
-
-const PlanSummaryStats& DocumentSourceCursor::getPlanSummaryStats() const {
- return _planSummaryStats;
-}
}
diff --git a/src/mongo/db/pipeline/document_source_cursor.h b/src/mongo/db/pipeline/document_source_cursor.h
index 3faeea86de6..348174b656a 100644
--- a/src/mongo/db/pipeline/document_source_cursor.h
+++ b/src/mongo/db/pipeline/document_source_cursor.h
@@ -34,16 +34,14 @@
#include "mongo/db/pipeline/document_source.h"
#include "mongo/db/pipeline/document_source_limit.h"
#include "mongo/db/query/plan_summary_stats.h"
+#include "mongo/db/range_preserver.h"
namespace mongo {
class PlanExecutor;
/**
- * Constructs and returns Documents from the BSONObj objects produced by a supplied
- * PlanExecutor.
- *
- * An object of this type may only be used by one thread, see SERVER-6123.
+ * Constructs and returns Documents from the BSONObj objects produced by a supplied PlanExecutor.
*/
class DocumentSourceCursor final : public DocumentSource {
public:
@@ -69,14 +67,11 @@ public:
void reattachToOperationContext(OperationContext* opCtx) final;
/**
- * Create a document source based on a passed-in PlanExecutor.
- *
- * This is usually put at the beginning of a chain of document sources
- * in order to fetch data from the database.
+ * Create a document source based on a passed-in PlanExecutor. 'exec' must be a yielding
+ * PlanExecutor, and must be registered with the associated collection's CursorManager.
*/
static boost::intrusive_ptr<DocumentSourceCursor> create(
Collection* collection,
- const std::string& ns,
std::unique_ptr<PlanExecutor> exec,
const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
@@ -116,10 +111,17 @@ public:
* @param projection The projection that has been passed down to the query system.
* @param deps The output of DepsTracker::toParsedDeps.
*/
- void setProjection(const BSONObj& projection, const boost::optional<ParsedDeps>& deps);
+ void setProjection(const BSONObj& projection, const boost::optional<ParsedDeps>& deps) {
+ _projection = projection;
+ _dependencies = deps;
+ }
- /// returns -1 for no limit
- long long getLimit() const;
+ /**
+ * Returns the limit associated with this cursor, or -1 if there is no limit.
+ */
+ long long getLimit() const {
+ return _limit ? _limit->getLimit() : -1;
+ }
/**
* If subsequent sources need no information from the cursor, the cursor can simply output empty
@@ -129,20 +131,21 @@ public:
_shouldProduceEmptyDocs = true;
}
- const std::string& getPlanSummaryStr() const;
+ const std::string& getPlanSummaryStr() const {
+ return _planSummary;
+ }
- const PlanSummaryStats& getPlanSummaryStats() const;
+ const PlanSummaryStats& getPlanSummaryStats() const {
+ return _planSummaryStats;
+ }
private:
DocumentSourceCursor(Collection* collection,
- const std::string& ns,
std::unique_ptr<PlanExecutor> exec,
const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
void loadBatch();
- void recordPlanSummaryStr();
-
void recordPlanSummaryStats();
std::deque<Document> _currentBatch;
@@ -156,7 +159,7 @@ private:
boost::intrusive_ptr<DocumentSourceLimit> _limit;
long long _docsAddedToBatches; // for _limit enforcement
- const NamespaceString _nss;
+ RangePreserver _rangePreserver;
std::unique_ptr<PlanExecutor> _exec;
BSONObjSet _outputSorts;
std::string _planSummary;
diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp
index 262deeb140b..c37c1301198 100644
--- a/src/mongo/db/pipeline/pipeline_d.cpp
+++ b/src/mongo/db/pipeline/pipeline_d.cpp
@@ -579,15 +579,12 @@ void PipelineD::addCursorSource(Collection* collection,
const BSONObj& queryObj,
const BSONObj& sortObj,
const BSONObj& projectionObj) {
- // Get the full "namespace" name.
- const string& fullName = expCtx->ns.ns();
-
// DocumentSourceCursor expects a yielding PlanExecutor that has had its state saved.
exec->saveState();
// Put the PlanExecutor into a DocumentSourceCursor and add it to the front of the pipeline.
intrusive_ptr<DocumentSourceCursor> pSource =
- DocumentSourceCursor::create(collection, fullName, std::move(exec), expCtx);
+ DocumentSourceCursor::create(collection, std::move(exec), expCtx);
// Note the query, sort, and projection for explain.
pSource->setQuery(queryObj);
diff --git a/src/mongo/db/query/explain.cpp b/src/mongo/db/query/explain.cpp
index eb43caf5047..b243336655b 100644
--- a/src/mongo/db/query/explain.cpp
+++ b/src/mongo/db/query/explain.cpp
@@ -596,7 +596,7 @@ void Explain::generatePlannerInfo(PlanExecutor* exec,
BSONObjBuilder plannerBob(out->subobjStart("queryPlanner"));
plannerBob.append("plannerVersion", QueryPlanner::kPlannerVersion);
- plannerBob.append("namespace", exec->ns());
+ plannerBob.append("namespace", exec->nss().ns());
// Find whether there is an index filter set for the query shape. The 'indexFilterSet'
// field will always be false in the case of EOF or idhack plans.
diff --git a/src/mongo/db/query/find.cpp b/src/mongo/db/query/find.cpp
index 44f69823096..62a08389e3f 100644
--- a/src/mongo/db/query/find.cpp
+++ b/src/mongo/db/query/find.cpp
@@ -55,6 +55,7 @@
#include "mongo/db/server_options.h"
#include "mongo/db/server_parameters.h"
#include "mongo/db/service_context.h"
+#include "mongo/db/stats/top.h"
#include "mongo/db/storage/storage_options.h"
#include "mongo/s/chunk_version.h"
#include "mongo/s/stale_exception.h"
@@ -62,10 +63,10 @@
#include "mongo/util/fail_point_service.h"
#include "mongo/util/log.h"
#include "mongo/util/mongoutils/str.h"
+#include "mongo/util/scopeguard.h"
namespace mongo {
-using std::endl;
using std::unique_ptr;
using stdx::make_unique;
@@ -238,47 +239,45 @@ Message getMore(OperationContext* opCtx,
const NamespaceString nss(ns);
- // Depending on the type of cursor being operated on, we hold locks for the whole getMore,
- // or none of the getMore, or part of the getMore. The three cases in detail:
+ // Cursors come in one of two flavors:
+ // - Cursors owned by the collection cursor manager, such as those generated via the find
+ // command. For these cursors, we hold the appropriate collection lock for the duration of
+ // the getMore using AutoGetCollectionForRead. This will automatically update the CurOp
+ // object appropriately and record execution time via Top upon completion.
+ // - Cursors owned by the global cursor manager, such as those generated via the aggregate
+ // command. These cursors either hold no collection state or manage their collection state
+ // internally, so we acquire no locks. In this case we use the AutoStatsTracker object to
+ // update the CurOp object appropriately and record execution time via Top upon
+ // completion.
//
- // 1) Normal cursor: we lock with "ctx" and hold it for the whole getMore.
- // 2) Cursor owned by global cursor manager: we don't lock anything. These cursors don't own
- // any collection state. These cursors are generated either by the listCollections or
- // listIndexes commands, as these special cursor-generating commands operate over catalog
- // data rather than targeting the data within a collection.
- // 3) Agg cursor: we lock with "ctx", then release, then relock with "unpinDBLock" and
- // "unpinCollLock". This is because agg cursors handle locking internally (hence the
- // release), but the pin and unpin of the cursor must occur under the collection lock.
- // We don't use our AutoGetCollectionForRead "ctx" to relock, because
- // AutoGetCollectionForRead checks the sharding version (and we want the relock for the
- // unpin to succeed even if the sharding version has changed).
- //
- // Note that we declare our locks before our ClientCursorPin, in order to ensure that the
- // pin's destructor is called before the lock destructors (so that the unpin occurs under
- // the lock).
- unique_ptr<AutoGetCollectionForReadCommand> ctx;
- unique_ptr<Lock::DBLock> unpinDBLock;
- unique_ptr<Lock::CollectionLock> unpinCollLock;
-
+ // Thus, only one of 'readLock' and 'statsTracker' will be populated as we populate
+ // 'cursorManager'.
+ boost::optional<AutoGetCollectionForReadCommand> readLock;
+ boost::optional<AutoStatsTracker> statsTracker;
CursorManager* cursorManager;
- if (nss.isListIndexesCursorNS() || nss.isListCollectionsCursorNS()) {
- // List collections and list indexes are special cursor-generating commands whose
- // cursors are managed globally, as they operate over catalog data rather than targeting
- // the data within a collection.
+
+ if (CursorManager::isGloballyManagedCursor(cursorid)) {
cursorManager = CursorManager::getGlobalCursorManager();
- } else {
- ctx = stdx::make_unique<AutoGetCollectionOrViewForReadCommand>(opCtx, nss);
- auto viewCtx = static_cast<AutoGetCollectionOrViewForReadCommand*>(ctx.get());
- if (viewCtx->getView()) {
- uasserted(
+
+ if (boost::optional<NamespaceString> nssForCurOp = nss.isGloballyManagedNamespace()
+ ? nss.getTargetNSForGloballyManagedNamespace()
+ : nss) {
+ AutoGetDb autoDb(opCtx, nssForCurOp->db(), MODE_IS);
+ const auto profilingLevel = autoDb.getDb()
+ ? boost::optional<int>{autoDb.getDb()->getProfilingLevel()}
+ : boost::none;
+ statsTracker.emplace(opCtx, *nssForCurOp, Top::LockType::NotLocked, profilingLevel);
+ uassert(
ErrorCodes::CommandNotSupportedOnView,
- str::stream() << "Namespace " << nss.ns()
+ str::stream() << "Namespace " << nssForCurOp->ns()
<< " is a view. OP_GET_MORE operations are not supported on views. "
<< "Only clients which support the getMore command can be used to "
- "query views.");
+ "query views.",
+ !autoDb.getDb()->getViewCatalog()->lookup(opCtx, nssForCurOp->ns()));
}
-
- Collection* collection = ctx->getCollection();
+ } else {
+ readLock.emplace(opCtx, nss);
+ Collection* collection = readLock->getCollection();
uassert(17356, "collection dropped between getMore calls", collection);
cursorManager = collection->getCursorManager();
}
@@ -323,8 +322,8 @@ Message getMore(OperationContext* opCtx,
str::stream() << "Requested getMore on namespace " << ns << ", but cursor "
<< cursorid
<< " belongs to namespace "
- << cc->ns(),
- ns == cc->ns());
+ << cc->nss().ns(),
+ nss == cc->nss());
*isCursorAuthorized = true;
if (cc->isReadCommitted())
@@ -345,11 +344,6 @@ Message getMore(OperationContext* opCtx,
cc->updateSlaveLocation(opCtx);
- if (cc->isAggCursor()) {
- // Agg cursors handle their own locking internally.
- ctx.reset(); // unlocks
- }
-
// If we're replaying the oplog, we save the last time that we read.
Timestamp slaveReadTill;
@@ -359,12 +353,12 @@ Message getMore(OperationContext* opCtx,
uint64_t notifierVersion = 0;
std::shared_ptr<CappedInsertNotifier> notifier;
if (isCursorAwaitData(cc)) {
- invariant(ctx->getCollection()->isCapped());
+ invariant(readLock->getCollection()->isCapped());
// Retrieve the notifier which we will wait on until new data arrives. We make sure
// to do this in the lock because once we drop the lock it is possible for the
// collection to become invalid. The notifier itself will outlive the collection if
// the collection is dropped, as we keep a shared_ptr to it.
- notifier = ctx->getCollection()->getCappedInsertNotifier();
+ notifier = readLock->getCollection()->getCappedInsertNotifier();
// Must get the version before we call generateBatch in case a write comes in after
// that call and before we call wait on the notifier.
@@ -384,7 +378,7 @@ Message getMore(OperationContext* opCtx,
// and currentOp. Upconvert _query to resemble a getMore command, and set the original
// command or upconverted legacy query in the originatingCommand field.
curOp.setQuery_inlock(upconvertGetMoreEntry(nss, cursorid, ntoreturn));
- curOp.setOriginatingCommand_inlock(cc->getQuery());
+ curOp.setOriginatingCommand_inlock(cc->getOriginatingCommandObj());
}
PlanExecutor::ExecState state;
@@ -402,7 +396,7 @@ Message getMore(OperationContext* opCtx,
if (isCursorAwaitData(cc) && state == PlanExecutor::IS_EOF && numResults == 0) {
// Save the PlanExecutor and drop our locks.
exec->saveState();
- ctx.reset();
+ readLock.reset();
// Block waiting for data for up to 1 second.
Seconds timeout(1);
@@ -414,7 +408,7 @@ Message getMore(OperationContext* opCtx,
curOp.setExpectedLatencyMs(durationCount<Milliseconds>(timeout));
// Reacquiring locks.
- ctx = make_unique<AutoGetCollectionForReadCommand>(opCtx, nss);
+ readLock.emplace(opCtx, nss);
exec->restoreState();
// We woke up because either the timed_wait expired, or there was more data. Either
@@ -428,44 +422,28 @@ Message getMore(OperationContext* opCtx,
postExecutionStats.totalDocsExamined -= preExecutionStats.totalDocsExamined;
curOp.debug().setPlanSummaryMetrics(postExecutionStats);
- // We do not report 'execStats' for aggregation, both in the original request and
- // subsequent getMore. The reason for this is that aggregation's source PlanExecutor
- // could be destroyed before we know whether we need execStats and we do not want to
- // generate for all operations due to cost.
- if (!cc->isAggCursor() && curOp.shouldDBProfile()) {
+ // We do not report 'execStats' for aggregation or other globally managed cursors, both in
+ // the original request and subsequent getMore. It would be useful to have this information
+ // for an aggregation, but the source PlanExecutor could be destroyed before we know whether
+ // we need execStats and we do not want to generate for all operations due to cost.
+ if (!CursorManager::isGloballyManagedCursor(cursorid) && curOp.shouldDBProfile()) {
BSONObjBuilder execStatsBob;
Explain::getWinningPlanStats(exec, &execStatsBob);
curOp.debug().execStats = execStatsBob.obj();
}
- // We have to do this before re-acquiring locks in the agg case because
- // shouldSaveCursorGetMore() can make a network call for agg cursors.
- //
- // TODO: Getting rid of PlanExecutor::isEOF() in favor of PlanExecutor::IS_EOF would mean
- // that this network operation is no longer necessary.
- const bool shouldSaveCursor = shouldSaveCursorGetMore(state, exec, isCursorTailable(cc));
-
- // In order to deregister a cursor, we need to be holding the DB + collection lock and
- // if the cursor is aggregation, we release these locks.
- if (cc->isAggCursor()) {
- invariant(NULL == ctx.get());
- unpinDBLock = make_unique<Lock::DBLock>(opCtx, nss.db(), MODE_IS);
- unpinCollLock =
- make_unique<Lock::CollectionLock>(opCtx->lockState(), nss.ns(), MODE_IS);
- }
-
// Our two possible ClientCursorPin cleanup paths are:
// 1) If the cursor is not going to be saved, we call deleteUnderlying() on the pin.
- // 2) If the cursor is going to be saved, we simply let the pin go out of scope. In
- // this case, the pin's destructor will be invoked, which will call release() on the
- // pin. Because our ClientCursorPin is declared after our lock is declared, this
- // will happen under the lock.
- if (!shouldSaveCursor) {
+ // 2) If the cursor is going to be saved, we simply let the pin go out of scope. In this
+ // case, the pin's destructor will be invoked, which will call release() on the pin.
+ // Because our ClientCursorPin is declared after our lock is declared, this will happen
+ // under the lock if any locking was necessary.
+ if (!shouldSaveCursorGetMore(state, exec, isCursorTailable(cc))) {
ccPin.getValue().deleteUnderlying();
// cc is now invalid, as is the executor
cursorid = 0;
- cc = NULL;
+ cc = nullptr;
curOp.debug().cursorExhausted = true;
LOG(5) << "getMore NOT saving client cursor, ended with state "
@@ -673,10 +651,9 @@ std::string runQuery(OperationContext* opCtx,
// Allocate a new ClientCursor and register it with the cursor manager.
ClientCursorPin pinnedCursor = collection->getCursorManager()->registerCursor(
- {exec.release(),
- nss.ns(),
+ {std::move(exec),
+ nss,
opCtx->recoveryUnit()->isReadingFromMajorityCommittedSnapshot(),
- qr.getOptions(),
upconvertQueryEntry(q.query, qr.nss(), q.ntoreturn, q.ntoskip)});
ccId = pinnedCursor.getCursor()->cursorid();
diff --git a/src/mongo/db/query/get_executor.cpp b/src/mongo/db/query/get_executor.cpp
index 9aa85a7d81a..be3673369a7 100644
--- a/src/mongo/db/query/get_executor.cpp
+++ b/src/mongo/db/query/get_executor.cpp
@@ -739,8 +739,7 @@ StatusWith<unique_ptr<PlanExecutor>> getExecutorDelete(OperationContext* opCtx,
<< " Using EOF stage: " << redact(unparsedQuery);
auto deleteStage = make_unique<DeleteStage>(
opCtx, deleteStageParams, ws.get(), nullptr, new EOFStage(opCtx));
- return PlanExecutor::make(
- opCtx, std::move(ws), std::move(deleteStage), nss.ns(), policy);
+ return PlanExecutor::make(opCtx, std::move(ws), std::move(deleteStage), nss, policy);
}
const IndexDescriptor* descriptor = collection->getIndexCatalog()->findIdIndex(opCtx);
@@ -906,7 +905,7 @@ StatusWith<unique_ptr<PlanExecutor>> getExecutorUpdate(OperationContext* opCtx,
auto updateStage = make_unique<UpdateStage>(
opCtx, updateStageParams, ws.get(), collection, new EOFStage(opCtx));
return PlanExecutor::make(
- opCtx, std::move(ws), std::move(updateStage), nsString.ns(), policy);
+ opCtx, std::move(ws), std::move(updateStage), nsString, policy);
}
const IndexDescriptor* descriptor = collection->getIndexCatalog()->findIdIndex(opCtx);
@@ -1004,8 +1003,7 @@ StatusWith<unique_ptr<PlanExecutor>> getExecutorGroup(OperationContext* opCtx,
unique_ptr<PlanStage> root =
make_unique<GroupStage>(opCtx, request, ws.get(), new EOFStage(opCtx));
- return PlanExecutor::make(
- opCtx, std::move(ws), std::move(root), request.ns.ns(), yieldPolicy);
+ return PlanExecutor::make(opCtx, std::move(ws), std::move(root), request.ns, yieldPolicy);
}
const NamespaceString nss(request.ns);
@@ -1263,7 +1261,7 @@ StatusWith<unique_ptr<PlanExecutor>> getExecutorCount(OperationContext* opCtx,
unique_ptr<PlanStage> root = make_unique<CountStage>(
opCtx, collection, std::move(params), ws.get(), new EOFStage(opCtx));
return PlanExecutor::make(
- opCtx, std::move(ws), std::move(root), request.getNs().ns(), yieldPolicy);
+ opCtx, std::move(ws), std::move(root), request.getNs(), yieldPolicy);
}
// If the query is empty, then we can determine the count by just asking the collection
@@ -1280,7 +1278,7 @@ StatusWith<unique_ptr<PlanExecutor>> getExecutorCount(OperationContext* opCtx,
unique_ptr<PlanStage> root =
make_unique<CountStage>(opCtx, collection, std::move(params), ws.get(), nullptr);
return PlanExecutor::make(
- opCtx, std::move(ws), std::move(root), request.getNs().ns(), yieldPolicy);
+ opCtx, std::move(ws), std::move(root), request.getNs(), yieldPolicy);
}
const size_t plannerOptions = QueryPlannerParams::IS_COUNT;
diff --git a/src/mongo/db/query/internal_plans.cpp b/src/mongo/db/query/internal_plans.cpp
index ade228a0223..f7c3e96bce1 100644
--- a/src/mongo/db/query/internal_plans.cpp
+++ b/src/mongo/db/query/internal_plans.cpp
@@ -52,8 +52,8 @@ std::unique_ptr<PlanExecutor> InternalPlanner::collectionScan(OperationContext*
if (NULL == collection) {
auto eof = stdx::make_unique<EOFStage>(opCtx);
// Takes ownership of 'ws' and 'eof'.
- auto statusWithPlanExecutor =
- PlanExecutor::make(opCtx, std::move(ws), std::move(eof), ns.toString(), yieldPolicy);
+ auto statusWithPlanExecutor = PlanExecutor::make(
+ opCtx, std::move(ws), std::move(eof), NamespaceString(ns), yieldPolicy);
invariant(statusWithPlanExecutor.isOK());
return std::move(statusWithPlanExecutor.getValue());
}
diff --git a/src/mongo/db/query/plan_executor.cpp b/src/mongo/db/query/plan_executor.cpp
index dd3f66164f4..8e79fe1a2c5 100644
--- a/src/mongo/db/query/plan_executor.cpp
+++ b/src/mongo/db/query/plan_executor.cpp
@@ -91,17 +91,23 @@ StatusWith<unique_ptr<PlanExecutor>> PlanExecutor::make(OperationContext* opCtx,
const Collection* collection,
YieldPolicy yieldPolicy) {
return PlanExecutor::make(
- opCtx, std::move(ws), std::move(rt), nullptr, nullptr, collection, "", yieldPolicy);
+ opCtx, std::move(ws), std::move(rt), nullptr, nullptr, collection, {}, yieldPolicy);
}
// static
StatusWith<unique_ptr<PlanExecutor>> PlanExecutor::make(OperationContext* opCtx,
unique_ptr<WorkingSet> ws,
unique_ptr<PlanStage> rt,
- const string& ns,
+ NamespaceString nss,
YieldPolicy yieldPolicy) {
- return PlanExecutor::make(
- opCtx, std::move(ws), std::move(rt), nullptr, nullptr, nullptr, ns, yieldPolicy);
+ return PlanExecutor::make(opCtx,
+ std::move(ws),
+ std::move(rt),
+ nullptr,
+ nullptr,
+ nullptr,
+ std::move(nss),
+ yieldPolicy);
}
// static
@@ -112,7 +118,7 @@ StatusWith<unique_ptr<PlanExecutor>> PlanExecutor::make(OperationContext* opCtx,
const Collection* collection,
YieldPolicy yieldPolicy) {
return PlanExecutor::make(
- opCtx, std::move(ws), std::move(rt), nullptr, std::move(cq), collection, "", yieldPolicy);
+ opCtx, std::move(ws), std::move(rt), nullptr, std::move(cq), collection, {}, yieldPolicy);
}
// static
@@ -129,7 +135,7 @@ StatusWith<unique_ptr<PlanExecutor>> PlanExecutor::make(OperationContext* opCtx,
std::move(qs),
std::move(cq),
collection,
- "",
+ {},
yieldPolicy);
}
@@ -140,10 +146,15 @@ StatusWith<unique_ptr<PlanExecutor>> PlanExecutor::make(OperationContext* opCtx,
unique_ptr<QuerySolution> qs,
unique_ptr<CanonicalQuery> cq,
const Collection* collection,
- const string& ns,
+ NamespaceString nss,
YieldPolicy yieldPolicy) {
- unique_ptr<PlanExecutor> exec(new PlanExecutor(
- opCtx, std::move(ws), std::move(rt), std::move(qs), std::move(cq), collection, ns));
+ unique_ptr<PlanExecutor> exec(new PlanExecutor(opCtx,
+ std::move(ws),
+ std::move(rt),
+ std::move(qs),
+ std::move(cq),
+ collection,
+ std::move(nss)));
// Perform plan selection, if necessary.
Status status = exec->pickBestPlan(yieldPolicy, collection);
@@ -160,25 +171,24 @@ PlanExecutor::PlanExecutor(OperationContext* opCtx,
unique_ptr<QuerySolution> qs,
unique_ptr<CanonicalQuery> cq,
const Collection* collection,
- const string& ns)
+ NamespaceString nss)
: _opCtx(opCtx),
_cq(std::move(cq)),
_workingSet(std::move(ws)),
_qs(std::move(qs)),
_root(std::move(rt)),
- _ns(ns),
+ _nss(std::move(nss)),
_yieldPolicy(new PlanYieldPolicy(this, YIELD_MANUAL)) {
- // We may still need to initialize _ns from either collection or _cq.
- if (!_ns.empty()) {
- // We already have an _ns set, so there's nothing more to do.
- return;
+ // We may still need to initialize _nss from either collection or _cq.
+ if (!_nss.isEmpty()) {
+ return; // We already have an _nss set, so there's nothing more to do.
}
if (collection) {
- _ns = collection->ns().ns();
+ _nss = collection->ns();
} else {
invariant(_cq);
- _ns = _cq->getQueryRequest().ns();
+ _nss = _cq->getQueryRequest().nss();
}
}
@@ -362,7 +372,7 @@ PlanExecutor::ExecState PlanExecutor::getNextImpl(Snapshotted<BSONObj>* objOut,
MONGO_FAIL_POINT_BLOCK(planExecutorAlwaysDead, customKill) {
const BSONObj& data = customKill.getData();
BSONElement customKillNS = data["namespace"];
- if (!customKillNS || _ns == customKillNS.str()) {
+ if (!customKillNS || _nss.ns() == customKillNS.str()) {
deregisterExec();
kill("hit planExecutorAlwaysDead fail point");
}
@@ -469,7 +479,8 @@ PlanExecutor::ExecState PlanExecutor::getNextImpl(Snapshotted<BSONObj>* objOut,
throw WriteConflictException();
CurOp::get(_opCtx)->debug().writeConflicts++;
writeConflictsInARow++;
- WriteConflictException::logAndBackoff(writeConflictsInARow, "plan execution", _ns);
+ WriteConflictException::logAndBackoff(
+ writeConflictsInARow, "plan execution", _nss.ns());
} else {
WorkingSetMember* member = _workingSet->get(id);
@@ -546,10 +557,6 @@ Status PlanExecutor::executePlan() {
return Status::OK();
}
-const string& PlanExecutor::ns() {
- return _ns;
-}
-
void PlanExecutor::setYieldPolicy(YieldPolicy policy,
const Collection* collection,
bool registerExecutor) {
diff --git a/src/mongo/db/query/plan_executor.h b/src/mongo/db/query/plan_executor.h
index 1e97963f67b..325d7642a91 100644
--- a/src/mongo/db/query/plan_executor.h
+++ b/src/mongo/db/query/plan_executor.h
@@ -153,7 +153,7 @@ public:
static StatusWith<std::unique_ptr<PlanExecutor>> make(OperationContext* opCtx,
std::unique_ptr<WorkingSet> ws,
std::unique_ptr<PlanStage> rt,
- const std::string& ns,
+ NamespaceString nss,
YieldPolicy yieldPolicy);
/**
@@ -203,7 +203,9 @@ public:
/**
* Return the NS that the query is running over.
*/
- const std::string& ns();
+ const NamespaceString& nss() const {
+ return _nss;
+ }
/**
* Return the OperationContext that the plan is currently executing within.
@@ -408,7 +410,7 @@ private:
std::unique_ptr<QuerySolution> qs,
std::unique_ptr<CanonicalQuery> cq,
const Collection* collection,
- const std::string& ns);
+ NamespaceString nss);
/**
* Public factory methods delegate to this private factory to do their work.
@@ -419,7 +421,7 @@ private:
std::unique_ptr<QuerySolution> qs,
std::unique_ptr<CanonicalQuery> cq,
const Collection* collection,
- const std::string& ns,
+ NamespaceString nss,
YieldPolicy yieldPolicy);
/**
@@ -462,7 +464,7 @@ private:
std::unique_ptr<ScopedExecutorRegistration> _safety;
// What namespace are we operating over?
- std::string _ns;
+ NamespaceString _nss;
// This is used to handle automatic yielding when allowed by the YieldPolicy. Never NULL.
// TODO make this a non-pointer member. This requires some header shuffling so that this
diff --git a/src/mongo/db/query/plan_yield_policy.cpp b/src/mongo/db/query/plan_yield_policy.cpp
index d84db2b225d..f315daec285 100644
--- a/src/mongo/db/query/plan_yield_policy.cpp
+++ b/src/mongo/db/query/plan_yield_policy.cpp
@@ -108,14 +108,14 @@ bool PlanYieldPolicy::yield(RecordFetcher* fetcher) {
opCtx->recoveryUnit()->abandonSnapshot();
} else {
// Release and reacquire locks.
- QueryYield::yieldAllLocks(opCtx, fetcher, _planYielding->ns());
+ QueryYield::yieldAllLocks(opCtx, fetcher, _planYielding->nss());
}
return _planYielding->restoreStateWithoutRetrying();
} catch (const WriteConflictException& wce) {
CurOp::get(opCtx)->debug().writeConflicts++;
WriteConflictException::logAndBackoff(
- attempt, "plan execution restoreState", _planYielding->ns());
+ attempt, "plan execution restoreState", _planYielding->nss().ns());
// retry
}
}
diff --git a/src/mongo/db/query/query_yield.cpp b/src/mongo/db/query/query_yield.cpp
index 5f43cf819ec..37ecc4bf528 100644
--- a/src/mongo/db/query/query_yield.cpp
+++ b/src/mongo/db/query/query_yield.cpp
@@ -47,7 +47,7 @@ MONGO_FP_DECLARE(setYieldAllLocksWait);
// static
void QueryYield::yieldAllLocks(OperationContext* opCtx,
RecordFetcher* fetcher,
- const std::string& planExecNS) {
+ const NamespaceString& planExecNS) {
// Things have to happen here in a specific order:
// 1) Tell the RecordFetcher to do any setup which needs to happen inside locks
// 2) Release lock mgr locks
diff --git a/src/mongo/db/query/query_yield.h b/src/mongo/db/query/query_yield.h
index a42e29800c9..7d98b299484 100644
--- a/src/mongo/db/query/query_yield.h
+++ b/src/mongo/db/query/query_yield.h
@@ -28,7 +28,7 @@
#pragma once
-#include <string>
+#include "mongo/db/namespace_string.h"
namespace mongo {
@@ -50,7 +50,7 @@ public:
*/
static void yieldAllLocks(OperationContext* opCtx,
RecordFetcher* fetcher,
- const std::string& planExecNS);
+ const NamespaceString& planExecNS);
};
} // namespace mongo
diff --git a/src/mongo/db/range_preserver.h b/src/mongo/db/range_preserver.h
index 55e290d52d1..35b7694234e 100644
--- a/src/mongo/db/range_preserver.h
+++ b/src/mongo/db/range_preserver.h
@@ -54,9 +54,15 @@ public:
}
}
- ~RangePreserver() {
- if (_pin)
+ void release() {
+ if (_pin) {
_pin->deleteUnderlying();
+ _pin.reset();
+ }
+ }
+
+ ~RangePreserver() {
+ release();
}
private:
diff --git a/src/mongo/db/stats/top.cpp b/src/mongo/db/stats/top.cpp
index 8a8358030b8..45680f7db15 100644
--- a/src/mongo/db/stats/top.cpp
+++ b/src/mongo/db/stats/top.cpp
@@ -75,7 +75,7 @@ Top& Top::get(ServiceContext* service) {
void Top::record(OperationContext* opCtx,
StringData ns,
LogicalOp logicalOp,
- int lockType,
+ LockType lockType,
long long micros,
bool command,
Command::ReadWriteType readWriteType) {
@@ -97,7 +97,7 @@ void Top::record(OperationContext* opCtx,
void Top::_record(OperationContext* opCtx,
CollectionData& c,
LogicalOp logicalOp,
- int lockType,
+ LockType lockType,
long long micros,
Command::ReadWriteType readWriteType) {
@@ -105,9 +105,9 @@ void Top::_record(OperationContext* opCtx,
c.total.inc(micros);
- if (lockType > 0)
+ if (lockType == LockType::WriteLocked)
c.writeLock.inc(micros);
- else if (lockType < 0)
+ else if (lockType == LockType::ReadLocked)
c.readLock.inc(micros);
switch (logicalOp) {
diff --git a/src/mongo/db/stats/top.h b/src/mongo/db/stats/top.h
index 1f08e4784a5..463482199fe 100644
--- a/src/mongo/db/stats/top.h
+++ b/src/mongo/db/stats/top.h
@@ -84,13 +84,19 @@ public:
OperationLatencyHistogram opLatencyHistogram;
};
+ enum class LockType {
+ ReadLocked,
+ WriteLocked,
+ NotLocked,
+ };
+
typedef StringMap<CollectionData> UsageMap;
public:
void record(OperationContext* opCtx,
StringData ns,
LogicalOp logicalOp,
- int lockType,
+ LockType lockType,
long long micros,
bool command,
Command::ReadWriteType readWriteType);
@@ -126,7 +132,7 @@ private:
void _record(OperationContext* opCtx,
CollectionData& c,
LogicalOp logicalOp,
- int lockType,
+ LockType lockType,
long long micros,
Command::ReadWriteType readWriteType);
diff --git a/src/mongo/dbtests/documentsourcetests.cpp b/src/mongo/dbtests/documentsourcetests.cpp
index 58df14b77b4..a6588ffc6b2 100644
--- a/src/mongo/dbtests/documentsourcetests.cpp
+++ b/src/mongo/dbtests/documentsourcetests.cpp
@@ -108,10 +108,7 @@ protected:
getExecutor(&_opCtx, ctx.getCollection(), std::move(cq), PlanExecutor::YIELD_MANUAL));
exec->saveState();
- exec->registerExec(ctx.getCollection());
-
- _source =
- DocumentSourceCursor::create(ctx.getCollection(), nss.ns(), std::move(exec), _ctx);
+ _source = DocumentSourceCursor::create(ctx.getCollection(), std::move(exec), _ctx);
}
intrusive_ptr<ExpressionContextForTest> ctx() {
diff --git a/src/mongo/dbtests/query_plan_executor.cpp b/src/mongo/dbtests/query_plan_executor.cpp
index 5fbc726b51a..b2e15787c0d 100644
--- a/src/mongo/dbtests/query_plan_executor.cpp
+++ b/src/mongo/dbtests/query_plan_executor.cpp
@@ -91,13 +91,10 @@ public:
}
/**
- * Given a match expression, represented as the BSON object 'filterObj',
- * create a PlanExecutor capable of executing a simple collection
- * scan.
- *
- * The caller takes ownership of the returned PlanExecutor*.
+ * Given a match expression, represented as the BSON object 'filterObj', create a PlanExecutor
+ * capable of executing a simple collection scan.
*/
- PlanExecutor* makeCollScanExec(Collection* coll, BSONObj& filterObj) {
+ unique_ptr<PlanExecutor> makeCollScanExec(Collection* coll, BSONObj& filterObj) {
CollectionScanParams csparams;
csparams.collection = coll;
csparams.direction = CollectionScanParams::FORWARD;
@@ -124,7 +121,7 @@ public:
coll,
PlanExecutor::YIELD_MANUAL);
ASSERT_OK(statusWithPlanExecutor.getStatus());
- return statusWithPlanExecutor.getValue().release();
+ return std::move(statusWithPlanExecutor.getValue());
}
/**
@@ -302,8 +299,7 @@ public:
// Wrap the "inner" plan executor in a DocumentSourceCursor and add it as the first source
// in the pipeline.
innerExec->saveState();
- auto cursorSource =
- DocumentSourceCursor::create(collection, nss.ns(), std::move(innerExec), expCtx);
+ auto cursorSource = DocumentSourceCursor::create(collection, std::move(innerExec), expCtx);
auto pipeline = assertGet(Pipeline::create({cursorSource}, expCtx));
// Create the output PlanExecutor that pulls results from the pipeline.
@@ -323,7 +319,7 @@ public:
// Verify that the aggregation pipeline returns an error because its "inner" plan executor
// has been killed due to the collection being dropped.
- ASSERT_THROWS_CODE(pipeline->getNext(), UserException, 16028);
+ ASSERT_THROWS_CODE(pipeline->getNext(), UserException, ErrorCodes::QueryPlanKilled);
// Verify that the "outer" plan executor has been killed due to the collection being
// dropped.
@@ -450,10 +446,10 @@ public:
BSONObj filterObj = fromjson("{_id: {$gt: 0}, b: {$gt: 0}}");
Collection* coll = ctx.getCollection();
- PlanExecutor* exec = makeCollScanExec(coll, filterObj);
+ auto exec = makeCollScanExec(coll, filterObj);
// Make a client cursor from the plan executor.
- coll->getCursorManager()->registerCursor({exec, nss.ns(), false, 0, BSONObj()});
+ coll->getCursorManager()->registerCursor({std::move(exec), nss, false, BSONObj()});
// There should be one cursor before invalidation,
// and zero cursors after invalidation.
@@ -476,11 +472,11 @@ public:
Collection* collection = ctx.getCollection();
BSONObj filterObj = fromjson("{_id: {$gt: 0}, b: {$gt: 0}}");
- PlanExecutor* exec = makeCollScanExec(collection, filterObj);
+ auto exec = makeCollScanExec(collection, filterObj);
// Make a client cursor from the plan executor.
- auto ccPin =
- collection->getCursorManager()->registerCursor({exec, nss.ns(), false, 0, BSONObj()});
+ auto ccPin = collection->getCursorManager()->registerCursor(
+ {std::move(exec), nss, false, BSONObj()});
// If the cursor is pinned, it sticks around, even after invalidation.
ASSERT_EQUALS(1U, numCursors());
@@ -490,7 +486,7 @@ public:
// The invalidation should have killed the plan executor.
BSONObj objOut;
- ASSERT_EQUALS(PlanExecutor::DEAD, exec->getNext(&objOut, NULL));
+ ASSERT_EQUALS(PlanExecutor::DEAD, ccPin.getCursor()->getExecutor()->getNext(&objOut, NULL));
ASSERT(WorkingSetCommon::isValidStatusMemberObject(objOut));
const Status status = WorkingSetCommon::getMemberObjectStatus(objOut);
ASSERT(status.reason().find(invalidateReason) != string::npos);
@@ -519,10 +515,11 @@ public:
Collection* collection = ctx.getCollection();
BSONObj filterObj = fromjson("{_id: {$gt: 0}, b: {$gt: 0}}");
- PlanExecutor* exec = makeCollScanExec(collection, filterObj);
+ auto exec = makeCollScanExec(collection, filterObj);
// Make a client cursor from the plan executor.
- collection->getCursorManager()->registerCursor({exec, nss.ns(), false, 0, BSONObj()});
+ collection->getCursorManager()->registerCursor(
+ {std::move(exec), nss, false, BSONObj()});
}
// There should be one cursor before timeout,
diff --git a/src/mongo/dbtests/querytests.cpp b/src/mongo/dbtests/querytests.cpp
index 8171cb96cbe..d705399e27c 100644
--- a/src/mongo/dbtests/querytests.cpp
+++ b/src/mongo/dbtests/querytests.cpp
@@ -30,6 +30,7 @@
#include "mongo/platform/basic.h"
+#include <boost/optional.hpp>
#include <iostream>
#include "mongo/client/dbclientcursor.h"
@@ -39,6 +40,7 @@
#include "mongo/db/db_raii.h"
#include "mongo/db/dbdirectclient.h"
#include "mongo/db/dbhelpers.h"
+#include "mongo/db/exec/queued_data_stage.h"
#include "mongo/db/index/index_descriptor.h"
#include "mongo/db/json.h"
#include "mongo/db/lasterror.h"
@@ -244,8 +246,8 @@ protected:
return !_client.getPrevError().getField("err").isNull();
}
- const ServiceContext::UniqueOperationContext _txnPtr = cc().makeOperationContext();
- OperationContext& _opCtx = *_txnPtr;
+ const ServiceContext::UniqueOperationContext _opCtxPtr = cc().makeOperationContext();
+ OperationContext& _opCtx = *_opCtxPtr;
DBDirectClient _client;
};
@@ -1751,6 +1753,107 @@ public:
}
};
+class CursorManagerIsGloballyManagedCursorShouldReturnFalseIfLeadingBitsAreZeroes {
+public:
+ void run() {
+ ASSERT_FALSE(CursorManager::isGloballyManagedCursor(0x0000000000000000));
+ ASSERT_FALSE(CursorManager::isGloballyManagedCursor(0x000000000FFFFFFF));
+ ASSERT_FALSE(CursorManager::isGloballyManagedCursor(0x000000007FFFFFFF));
+ ASSERT_FALSE(CursorManager::isGloballyManagedCursor(0x0FFFFFFFFFFFFFFF));
+ ASSERT_FALSE(CursorManager::isGloballyManagedCursor(0x3FFFFFFFFFFFFFFF));
+ ASSERT_FALSE(CursorManager::isGloballyManagedCursor(0x3dedbeefdeadbeef));
+ }
+};
+
+class CursorManagerIsGloballyManagedCursorShouldReturnTrueIfLeadingBitsAreZeroAndOne {
+public:
+ void run() {
+ ASSERT_TRUE(CursorManager::isGloballyManagedCursor(0x4FFFFFFFFFFFFFFF));
+ ASSERT_TRUE(CursorManager::isGloballyManagedCursor(0x5FFFFFFFFFFFFFFF));
+ ASSERT_TRUE(CursorManager::isGloballyManagedCursor(0x6FFFFFFFFFFFFFFF));
+ ASSERT_TRUE(CursorManager::isGloballyManagedCursor(0x7FFFFFFFFFFFFFFF));
+ ASSERT_TRUE(CursorManager::isGloballyManagedCursor(0x4000000000000000));
+ ASSERT_TRUE(CursorManager::isGloballyManagedCursor(0x4dedbeefdeadbeef));
+ }
+};
+
+class CursorManagerIsGloballyManagedCursorShouldReturnFalseIfLeadingBitIsAOne {
+public:
+ void run() {
+ ASSERT_FALSE(CursorManager::isGloballyManagedCursor(~0LL));
+ ASSERT_FALSE(CursorManager::isGloballyManagedCursor(0xFFFFFFFFFFFFFFFF));
+ ASSERT_FALSE(CursorManager::isGloballyManagedCursor(0x8FFFFFFFFFFFFFFF));
+ ASSERT_FALSE(CursorManager::isGloballyManagedCursor(0x8dedbeefdeadbeef));
+ }
+};
+
+class CursorManagerTest {
+public:
+ std::unique_ptr<PlanExecutor> makeFakePlanExecutor(OperationContext* opCtx) {
+ auto workingSet = stdx::make_unique<WorkingSet>();
+ auto queuedDataStage = stdx::make_unique<QueuedDataStage>(opCtx, workingSet.get());
+ return unittest::assertGet(PlanExecutor::make(opCtx,
+ std::move(workingSet),
+ std::move(queuedDataStage),
+ NamespaceString{"test.collection"},
+ PlanExecutor::YieldPolicy::YIELD_MANUAL));
+ }
+};
+
+class GlobalCursorManagerShouldReportOwnershipOfCursorsItCreated : public CursorManagerTest {
+public:
+ void run() {
+ auto opCtx = cc().makeOperationContext();
+ for (int i = 0; i < 1000; i++) {
+ auto exec = makeFakePlanExecutor(opCtx.get());
+ auto cursorPin = CursorManager::getGlobalCursorManager()->registerCursor(
+ {std::move(exec), NamespaceString{"test.collection"}, false, BSONObj()});
+ ASSERT_TRUE(CursorManager::isGloballyManagedCursor(cursorPin.getCursor()->cursorid()));
+ }
+ }
+};
+
+class CursorsFromCollectionCursorManagerShouldNotReportBeingManagedByGlobalCursorManager
+ : public CursorManagerTest {
+public:
+ void run() {
+ CursorManager testManager(NamespaceString{"test.collection"});
+ auto opCtx = cc().makeOperationContext();
+ for (int i = 0; i < 1000; i++) {
+ auto exec = makeFakePlanExecutor(opCtx.get());
+ auto cursorPin = testManager.registerCursor(
+ {std::move(exec), NamespaceString{"test.collection"}, false, BSONObj()});
+ ASSERT_FALSE(CursorManager::isGloballyManagedCursor(cursorPin.getCursor()->cursorid()));
+ }
+ }
+};
+
+class AllCursorsFromCollectionCursorManagerShouldContainIdentical32BitPrefixes
+ : public CursorManagerTest {
+public:
+ void run() {
+ CursorManager testManager(NamespaceString{"test.collection"});
+ auto opCtx = cc().makeOperationContext();
+ boost::optional<uint32_t> prefix;
+ for (int i = 0; i < 1000; i++) {
+ auto exec = makeFakePlanExecutor(opCtx.get());
+ auto cursorPin = testManager.registerCursor(
+ {std::move(exec), NamespaceString{"test.collection"}, false, BSONObj()});
+ auto cursorId = cursorPin.getCursor()->cursorid();
+ if (prefix) {
+ ASSERT_EQ(*prefix, extractLeading32Bits(cursorId));
+ } else {
+ prefix = extractLeading32Bits(cursorId);
+ }
+ }
+ }
+
+private:
+ uint32_t extractLeading32Bits(CursorId cursorId) {
+ return static_cast<uint32_t>((cursorId & 0xFFFFFFFF00000000) >> 32);
+ }
+};
+
class All : public Suite {
public:
All() : Suite("query") {}
@@ -1808,10 +1911,14 @@ public:
add<QueryCursorTimeout>();
add<QueryReadsAll>();
add<KillPinnedCursor>();
-
add<queryobjecttests::names1>();
-
add<OrderingTest>();
+ add<CursorManagerIsGloballyManagedCursorShouldReturnFalseIfLeadingBitsAreZeroes>();
+ add<CursorManagerIsGloballyManagedCursorShouldReturnTrueIfLeadingBitsAreZeroAndOne>();
+ add<CursorManagerIsGloballyManagedCursorShouldReturnFalseIfLeadingBitIsAOne>();
+ add<GlobalCursorManagerShouldReportOwnershipOfCursorsItCreated>();
+ add<CursorsFromCollectionCursorManagerShouldNotReportBeingManagedByGlobalCursorManager>();
+ add<AllCursorsFromCollectionCursorManagerShouldContainIdentical32BitPrefixes>();
}
};