Diffstat (limited to 'src/mongo')
-rw-r--r--  src/mongo/db/catalog/collection.cpp | 2
-rw-r--r--  src/mongo/db/catalog/cursor_manager.cpp | 148
-rw-r--r--  src/mongo/db/catalog/cursor_manager.h | 30
-rw-r--r--  src/mongo/db/clientcursor.cpp | 24
-rw-r--r--  src/mongo/db/clientcursor.h | 77
-rw-r--r--  src/mongo/db/commands/find_and_modify.cpp | 4
-rw-r--r--  src/mongo/db/commands/find_cmd.cpp | 7
-rw-r--r--  src/mongo/db/commands/getmore_cmd.cpp | 182
-rw-r--r--  src/mongo/db/commands/killcursors_cmd.cpp | 67
-rw-r--r--  src/mongo/db/commands/list_collections.cpp | 9
-rw-r--r--  src/mongo/db/commands/list_indexes.cpp | 9
-rw-r--r--  src/mongo/db/commands/parallel_collection_scan.cpp | 9
-rw-r--r--  src/mongo/db/commands/repair_cursor.cpp | 7
-rw-r--r--  src/mongo/db/commands/run_aggregate.cpp | 167
-rw-r--r--  src/mongo/db/curop.cpp | 6
-rw-r--r--  src/mongo/db/curop.h | 2
-rw-r--r--  src/mongo/db/db_raii.cpp | 73
-rw-r--r--  src/mongo/db/db_raii.h | 48
-rw-r--r--  src/mongo/db/exec/pipeline_proxy.h | 10
-rw-r--r--  src/mongo/db/namespace_string.cpp | 11
-rw-r--r--  src/mongo/db/namespace_string.h | 20
-rw-r--r--  src/mongo/db/namespace_string_test.cpp | 35
-rw-r--r--  src/mongo/db/ops/write_ops_exec.cpp | 4
-rw-r--r--  src/mongo/db/pipeline/SConscript | 1
-rw-r--r--  src/mongo/db/pipeline/document_source_cursor.cpp | 104
-rw-r--r--  src/mongo/db/pipeline/document_source_cursor.h | 39
-rw-r--r--  src/mongo/db/pipeline/pipeline_d.cpp | 5
-rw-r--r--  src/mongo/db/query/explain.cpp | 2
-rw-r--r--  src/mongo/db/query/find.cpp | 131
-rw-r--r--  src/mongo/db/query/get_executor.cpp | 12
-rw-r--r--  src/mongo/db/query/internal_plans.cpp | 4
-rw-r--r--  src/mongo/db/query/plan_executor.cpp | 53
-rw-r--r--  src/mongo/db/query/plan_executor.h | 12
-rw-r--r--  src/mongo/db/query/plan_yield_policy.cpp | 4
-rw-r--r--  src/mongo/db/query/query_yield.cpp | 2
-rw-r--r--  src/mongo/db/query/query_yield.h | 4
-rw-r--r--  src/mongo/db/range_preserver.h | 10
-rw-r--r--  src/mongo/db/stats/top.cpp | 8
-rw-r--r--  src/mongo/db/stats/top.h | 10
-rw-r--r--  src/mongo/dbtests/documentsourcetests.cpp | 5
-rw-r--r--  src/mongo/dbtests/query_plan_executor.cpp | 33
-rw-r--r--  src/mongo/dbtests/querytests.cpp | 115
42 files changed, 782 insertions, 723 deletions
diff --git a/src/mongo/db/catalog/collection.cpp b/src/mongo/db/catalog/collection.cpp
index a86944badf3..de7d36b205d 100644
--- a/src/mongo/db/catalog/collection.cpp
+++ b/src/mongo/db/catalog/collection.cpp
@@ -229,7 +229,7 @@ Collection::Collection(OperationContext* opCtx,
parseValidationAction(_details->getCollectionOptions(opCtx).validationAction))),
_validationLevel(uassertStatusOK(
parseValidationLevel(_details->getCollectionOptions(opCtx).validationLevel))),
- _cursorManager(fullNS),
+ _cursorManager(_ns),
_cappedNotifier(_recordStore->isCapped() ? new CappedInsertNotifier() : nullptr),
_mustTakeCappedLockOnInsert(isCapped() && !_ns.isSystemDotProfile() && !_ns.isOplog()) {
diff --git a/src/mongo/db/catalog/cursor_manager.cpp b/src/mongo/db/catalog/cursor_manager.cpp
index b2079e9e03e..25f0be7edec 100644
--- a/src/mongo/db/catalog/cursor_manager.cpp
+++ b/src/mongo/db/catalog/cursor_manager.cpp
@@ -50,42 +50,23 @@
namespace mongo {
-using std::string;
using std::vector;
namespace {
-unsigned idFromCursorId(CursorId id) {
+uint32_t idFromCursorId(CursorId id) {
uint64_t x = static_cast<uint64_t>(id);
x = x >> 32;
- return static_cast<unsigned>(x);
+ return static_cast<uint32_t>(x);
}
-CursorId cursorIdFromParts(unsigned collection, unsigned cursor) {
- CursorId x = static_cast<CursorId>(collection) << 32;
+CursorId cursorIdFromParts(uint32_t collectionIdentifier, uint32_t cursor) {
+ // The leading two bits of a non-global CursorId should be 0.
+ invariant((collectionIdentifier & (0b11 << 30)) == 0);
+ CursorId x = static_cast<CursorId>(collectionIdentifier) << 32;
x |= cursor;
return x;
}
-
-class IdWorkTest : public StartupTest {
-public:
- void _run(unsigned a, unsigned b) {
- CursorId x = cursorIdFromParts(a, b);
- invariant(a == idFromCursorId(x));
- CursorId y = cursorIdFromParts(a, b + 1);
- invariant(x != y);
- }
-
- void run() {
- _run(123, 456);
- _run(0xdeadbeef, 0xcafecafe);
- _run(0, 0);
- _run(99999999, 999);
- _run(0xFFFFFFFF, 1);
- _run(0xFFFFFFFF, 0);
- _run(0xFFFFFFFF, 0xFFFFFFFF);
- }
-} idWorkTest;
-}
+} // namespace
class GlobalCursorIdCache {
public:
@@ -93,16 +74,16 @@ public:
~GlobalCursorIdCache();
/**
- * this gets called when a CursorManager gets created
- * @return the id the CursorManager should use when generating
- * cursor ids
+ * Returns a unique 32-bit identifier to be used as the first 32 bits of all cursor ids for a
+ * new CursorManager.
*/
- unsigned created(const std::string& ns);
+ uint32_t registerCursorManager(const NamespaceString& nss);
/**
- * called by CursorManager when its going away
+ * Must be called when a CursorManager is deleted. 'id' must be the identifier returned by
+ * registerCursorManager().
*/
- void destroyed(unsigned id, const std::string& ns);
+ void deregisterCursorManager(uint32_t id, const NamespaceString& nss);
/**
* works globally
@@ -118,8 +99,8 @@ public:
private:
SimpleMutex _mutex;
- typedef unordered_map<unsigned, string> Map;
- Map _idToNS;
+ typedef unordered_map<unsigned, NamespaceString> Map;
+ Map _idToNss;
unsigned _nextId;
std::unique_ptr<SecureRandom> _secureRandom;
@@ -137,7 +118,7 @@ MONGO_INITIALIZER(GlobalCursorIdCache)(InitializerContext* context) {
MONGO_INITIALIZER_WITH_PREREQUISITES(GlobalCursorManager, ("GlobalCursorIdCache"))
(InitializerContext* context) {
- globalCursorManager.reset(new CursorManager(""));
+ globalCursorManager.reset(new CursorManager({}));
return Status::OK();
}
@@ -152,56 +133,57 @@ int64_t GlobalCursorIdCache::nextSeed() {
return _secureRandom->nextInt64();
}
-unsigned GlobalCursorIdCache::created(const std::string& ns) {
- static const unsigned MAX_IDS = 1000 * 1000 * 1000;
+uint32_t GlobalCursorIdCache::registerCursorManager(const NamespaceString& nss) {
+ static const uint32_t kMaxIds = 1000 * 1000 * 1000;
+ static_assert((kMaxIds & (0b11 << 30)) == 0,
+ "the first two bits of a collection identifier must always be zeroes");
stdx::lock_guard<SimpleMutex> lk(_mutex);
- fassert(17359, _idToNS.size() < MAX_IDS);
+ fassert(17359, _idToNss.size() < kMaxIds);
- for (unsigned i = 0; i <= MAX_IDS; i++) {
- unsigned id = ++_nextId;
+ for (uint32_t i = 0; i <= kMaxIds; i++) {
+ uint32_t id = ++_nextId;
if (id == 0)
continue;
- if (_idToNS.count(id) > 0)
+ if (_idToNss.count(id) > 0)
continue;
- _idToNS[id] = ns;
+ _idToNss[id] = nss;
return id;
}
- invariant(false);
+ MONGO_UNREACHABLE;
}
-void GlobalCursorIdCache::destroyed(unsigned id, const std::string& ns) {
+void GlobalCursorIdCache::deregisterCursorManager(uint32_t id, const NamespaceString& nss) {
stdx::lock_guard<SimpleMutex> lk(_mutex);
- invariant(ns == _idToNS[id]);
- _idToNS.erase(id);
+ invariant(nss == _idToNss[id]);
+ _idToNss.erase(id);
}
bool GlobalCursorIdCache::eraseCursor(OperationContext* opCtx, CursorId id, bool checkAuth) {
// Figure out what the namespace of this cursor is.
- std::string ns;
- if (globalCursorManager->ownsCursorId(id)) {
- auto pin = globalCursorManager.get()->pinCursor(id);
+ NamespaceString nss;
+ if (CursorManager::isGloballyManagedCursor(id)) {
+ auto pin = globalCursorManager->pinCursor(id);
if (!pin.isOK()) {
invariant(pin == ErrorCodes::CursorNotFound);
// No such cursor. TODO: Consider writing to audit log here (even though we don't
// have a namespace).
return false;
}
- ns = pin.getValue().getCursor()->ns();
+ nss = pin.getValue().getCursor()->nss();
} else {
stdx::lock_guard<SimpleMutex> lk(_mutex);
- unsigned nsid = idFromCursorId(id);
- Map::const_iterator it = _idToNS.find(nsid);
- if (it == _idToNS.end()) {
+ uint32_t nsid = idFromCursorId(id);
+ Map::const_iterator it = _idToNss.find(nsid);
+ if (it == _idToNss.end()) {
// No namespace corresponding to this cursor id prefix. TODO: Consider writing to
// audit log here (even though we don't have a namespace).
return false;
}
- ns = it->second;
+ nss = it->second;
}
- const NamespaceString nss(ns);
invariant(nss.isValid());
// Check if we are authorized to erase this cursor.
@@ -215,7 +197,7 @@ bool GlobalCursorIdCache::eraseCursor(OperationContext* opCtx, CursorId id, bool
}
// If this cursor is owned by the global cursor manager, ask it to erase the cursor for us.
- if (globalCursorManager->ownsCursorId(id)) {
+ if (CursorManager::isGloballyManagedCursor(id)) {
Status eraseStatus = globalCursorManager->eraseCursor(opCtx, id, checkAuth);
massert(28697,
eraseStatus.reason(),
@@ -250,16 +232,11 @@ std::size_t GlobalCursorIdCache::timeoutCursors(OperationContext* opCtx, int mil
totalTimedOut += globalCursorManager->timeoutCursors(millisSinceLastCall);
// Compute the set of collection names that we have to time out cursors for.
- vector<string> todo;
+ vector<NamespaceString> todo;
{
stdx::lock_guard<SimpleMutex> lk(_mutex);
- for (Map::const_iterator i = _idToNS.begin(); i != _idToNS.end(); ++i) {
- if (globalCursorManager->ownsCursorId(cursorIdFromParts(i->first, 0))) {
- // Skip the global cursor manager, since we handle it above (and it's not
- // associated with a collection).
- continue;
- }
- todo.push_back(i->second);
+ for (auto&& entry : _idToNss) {
+ todo.push_back(entry.second);
}
}
@@ -314,14 +291,19 @@ bool CursorManager::eraseCursorGlobal(OperationContext* opCtx, CursorId id) {
// --------------------------
-CursorManager::CursorManager(StringData ns) : _nss(ns) {
- _collectionCacheRuntimeId = globalCursorIdCache->created(_nss.ns());
+CursorManager::CursorManager(NamespaceString nss) : _nss(std::move(nss)) {
+ if (!isGlobalManager()) {
+ // Generate a unique id for this collection.
+ _collectionCacheRuntimeId = globalCursorIdCache->registerCursorManager(_nss);
+ }
_random.reset(new PseudoRandom(globalCursorIdCache->nextSeed()));
}
CursorManager::~CursorManager() {
invalidateAll(true, "collection going away");
- globalCursorIdCache->destroyed(_collectionCacheRuntimeId, _nss.ns());
+ if (!isGlobalManager()) {
+ globalCursorIdCache->deregisterCursorManager(_collectionCacheRuntimeId, _nss);
+ }
}
void CursorManager::invalidateAll(bool collectionGoingAway, const std::string& reason) {
@@ -371,11 +353,8 @@ void CursorManager::invalidateAll(bool collectionGoingAway, const std::string& r
continue;
}
- if (cc->_isPinned || cc->isAggCursor()) {
- // Pinned cursors need to stay alive, so we leave them around. Aggregation
- // cursors also can stay alive (since they don't have their lifetime bound to
- // the underlying collection). However, if they have an associated executor, we
- // need to kill it, because it's now invalid.
+ if (cc->_isPinned) {
+ // Pinned cursors need to stay alive, so we leave them around.
if (cc->getExecutor())
cc->getExecutor()->kill(reason);
newMap.insert(*i);
@@ -484,10 +463,6 @@ void CursorManager::unpin(ClientCursor* cursor) {
cursor->_isPinned = false;
}
-bool CursorManager::ownsCursorId(CursorId cursorId) const {
- return _collectionCacheRuntimeId == idFromCursorId(cursorId);
-}
-
void CursorManager::getCursorIds(std::set<CursorId>* openCursors) const {
stdx::lock_guard<SimpleMutex> lk(_mutex);
@@ -504,19 +479,32 @@ size_t CursorManager::numCursors() const {
CursorId CursorManager::_allocateCursorId_inlock() {
for (int i = 0; i < 10000; i++) {
- unsigned mypart = static_cast<unsigned>(_random->nextInt32());
- CursorId id = cursorIdFromParts(_collectionCacheRuntimeId, mypart);
+ // The leading two bits of a CursorId are used to determine if the cursor is registered on
+ // the global cursor manager.
+ CursorId id;
+ if (isGlobalManager()) {
+ // This is the global cursor manager, so generate a random number and make sure the
+ // first two bits are 01.
+ uint64_t mask = 0x3FFFFFFFFFFFFFFF;
+ uint64_t bitToSet = 1ULL << 62;
+ id = ((_random->nextInt64() & mask) | bitToSet);
+ } else {
+ // The first 2 bits are 0, the next 30 bits are the collection identifier, the next 32
+ // bits are random.
+ uint32_t myPart = static_cast<uint32_t>(_random->nextInt32());
+ id = cursorIdFromParts(_collectionCacheRuntimeId, myPart);
+ }
if (_cursors.count(id) == 0)
return id;
}
fassertFailed(17360);
}
-ClientCursorPin CursorManager::registerCursor(const ClientCursorParams& cursorParams) {
+ClientCursorPin CursorManager::registerCursor(ClientCursorParams&& cursorParams) {
stdx::lock_guard<SimpleMutex> lk(_mutex);
CursorId cursorId = _allocateCursorId_inlock();
std::unique_ptr<ClientCursor, ClientCursor::Deleter> clientCursor(
- new ClientCursor(cursorParams, this, cursorId));
+ new ClientCursor(std::move(cursorParams), this, cursorId));
return _registerCursor_inlock(std::move(clientCursor));
}
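
Note: the cursor_manager.cpp changes above give cursor ids a fixed bit layout. Ids minted by a collection's CursorManager start with 00, followed by the 30-bit collection identifier issued by GlobalCursorIdCache and 32 random bits; ids minted by the global manager start with 01 followed by 62 random bits, so every id stays non-negative. Below is a minimal, self-contained sketch of that allocation scheme; the type alias and function names are illustrative stand-ins, not MongoDB's API.

#include <cassert>
#include <cstdint>
#include <random>

using CursorId = std::int64_t;  // stand-in for mongo::CursorId

// Collection-owned cursor id: 00 | 30-bit collection identifier | 32 random bits.
CursorId makeCollectionCursorId(std::uint32_t collectionIdentifier, std::uint32_t random32) {
    assert((collectionIdentifier & (0b11u << 30)) == 0);  // leading two bits must be clear
    return (static_cast<CursorId>(collectionIdentifier) << 32) | random32;
}

// Globally managed cursor id: 01 | 62 random bits.
CursorId makeGlobalCursorId(std::uint64_t random64) {
    const std::uint64_t mask = 0x3FFFFFFFFFFFFFFFull;  // keep only the low 62 bits
    const std::uint64_t bitToSet = 1ull << 62;          // tag bit marking "globally managed"
    return static_cast<CursorId>((random64 & mask) | bitToSet);
}

int main() {
    std::mt19937_64 rng{42};
    const CursorId collectionOwned = makeCollectionCursorId(17, static_cast<std::uint32_t>(rng()));
    const CursorId globallyManaged = makeGlobalCursorId(rng());
    // The leading bit is 0 in both layouts, so ids never appear negative.
    assert(collectionOwned >= 0 && globallyManaged >= 0);
    return 0;
}
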
diff --git a/src/mongo/db/catalog/cursor_manager.h b/src/mongo/db/catalog/cursor_manager.h
index ad4289d4f38..7e4da59b626 100644
--- a/src/mongo/db/catalog/cursor_manager.h
+++ b/src/mongo/db/catalog/cursor_manager.h
@@ -73,7 +73,7 @@ class PlanExecutor;
*/
class CursorManager {
public:
- CursorManager(StringData ns);
+ CursorManager(NamespaceString nss);
/**
* Destroys the CursorManager. Managed cursors which are not pinned are destroyed. Ownership of
@@ -124,7 +124,7 @@ public:
* Constructs a new ClientCursor according to the given 'cursorParams'. The cursor is atomically
* registered with the manager and returned in pinned state.
*/
- ClientCursorPin registerCursor(const ClientCursorParams& cursorParams);
+ ClientCursorPin registerCursor(ClientCursorParams&& cursorParams);
/**
* Constructs and pins a special ClientCursor used to track sharding state for the given
@@ -153,15 +153,6 @@ public:
*/
Status eraseCursor(OperationContext* opCtx, CursorId id, bool shouldAudit);
- /**
- * Returns true if the space of cursor ids that cursor manager is responsible for includes
- * the given cursor id. Otherwise, returns false.
- *
- * The return value of this method does not indicate any information about whether or not a
- * cursor actually exists with the given cursor id.
- */
- bool ownsCursorId(CursorId cursorId) const;
-
void getCursorIds(std::set<CursorId>* openCursors) const;
/**
@@ -172,6 +163,17 @@ public:
static CursorManager* getGlobalCursorManager();
+ /**
+ * Returns true if this CursorId would be registered with the global CursorManager. Note that if
+ * this method returns true it does not imply the cursor exists.
+ */
+ static bool isGloballyManagedCursor(CursorId cursorId) {
+ // The first two bits are 01 for globally managed cursors, and 00 for cursors owned by a
+ // collection. The leading bit is always 0 so that CursorIds do not appear as negative.
+ const long long mask = static_cast<long long>(0b11) << 62;
+ return (cursorId & mask) == (static_cast<long long>(0b01) << 62);
+ }
+
static int eraseCursorGlobalIfAuthorized(OperationContext* opCtx, int n, const char* ids);
static bool eraseCursorGlobalIfAuthorized(OperationContext* opCtx, CursorId id);
@@ -196,8 +198,12 @@ private:
void unpin(ClientCursor* cursor);
+ bool isGlobalManager() const {
+ return _nss.isEmpty();
+ }
+
NamespaceString _nss;
- unsigned _collectionCacheRuntimeId;
+ uint32_t _collectionCacheRuntimeId;
std::unique_ptr<PseudoRandom> _random;
mutable SimpleMutex _mutex;
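
Note: the header adds the static isGloballyManagedCursor() predicate so callers such as getMore and killCursors can route a cursor id to the right manager without any lookup. The sketch below mirrors the same mask check, written as a free function with unsigned arithmetic; it is a stand-in, not the class member itself.

#include <cassert>
#include <cstdint>

// Globally managed ids carry the bit pattern 01 in their two leading bits,
// collection-owned ids carry 00.
bool isGloballyManagedCursorId(std::int64_t cursorId) {
    const std::uint64_t mask = std::uint64_t{0b11} << 62;
    const std::uint64_t globalTag = std::uint64_t{0b01} << 62;
    return (static_cast<std::uint64_t>(cursorId) & mask) == globalTag;
}

int main() {
    const std::int64_t collectionOwned = (std::int64_t{17} << 32) | 12345;  // 00-prefixed
    const std::int64_t globallyManaged = (std::int64_t{1} << 62) | 67890;   // 01-prefixed
    assert(!isGloballyManagedCursorId(collectionOwned));
    assert(isGloballyManagedCursorId(globallyManaged));
    return 0;
}
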
diff --git a/src/mongo/db/clientcursor.cpp b/src/mongo/db/clientcursor.cpp
index 708a2bc38f7..70003c2da2b 100644
--- a/src/mongo/db/clientcursor.cpp
+++ b/src/mongo/db/clientcursor.cpp
@@ -77,17 +77,16 @@ long long ClientCursor::totalOpen() {
return cursorStatsOpen.get();
}
-ClientCursor::ClientCursor(const ClientCursorParams& params,
+ClientCursor::ClientCursor(ClientCursorParams&& params,
CursorManager* cursorManager,
CursorId cursorId)
: _cursorid(cursorId),
- _ns(params.ns),
+ _nss(std::move(params.nss)),
_isReadCommitted(params.isReadCommitted),
_cursorManager(cursorManager),
- _query(params.query),
- _queryOptions(params.qopts),
- _isAggCursor(params.isAggCursor) {
- _exec.reset(params.exec);
+ _originatingCommand(params.originatingCommandObj),
+ _queryOptions(params.queryOptions),
+ _exec(std::move(params.exec)) {
init();
}
@@ -95,7 +94,7 @@ ClientCursor::ClientCursor(const Collection* collection,
CursorManager* cursorManager,
CursorId cursorId)
: _cursorid(cursorId),
- _ns(collection->ns().ns()),
+ _nss(collection->ns()),
_cursorManager(cursorManager),
_queryOptions(QueryOption_NoCursorTimeout) {
init();
@@ -106,10 +105,9 @@ void ClientCursor::init() {
cursorStatsOpen.increment();
- if (_queryOptions & QueryOption_NoCursorTimeout) {
+ if (isNoTimeout()) {
// cursors normally timeout after an inactivity period to prevent excess memory use
// setting this prevents timeout of the cursor in question.
- _isNoTimeout = true;
cursorStatsOpenNoTimeout.increment();
}
}
@@ -120,7 +118,7 @@ ClientCursor::~ClientCursor() {
invariant(!_cursorManager);
cursorStatsOpen.decrement();
- if (_isNoTimeout) {
+ if (isNoTimeout()) {
cursorStatsOpenNoTimeout.decrement();
}
}
@@ -138,7 +136,7 @@ void ClientCursor::kill() {
bool ClientCursor::shouldTimeout(int millis) {
_idleAgeMillis += millis;
- if (_isNoTimeout || _isPinned) {
+ if (isNoTimeout() || _isPinned) {
return false;
}
return _idleAgeMillis > cursorTimeoutMillis.load();
@@ -152,7 +150,7 @@ void ClientCursor::updateSlaveLocation(OperationContext* opCtx) {
if (_slaveReadTill.isNull())
return;
- verify(str::startsWith(_ns.c_str(), "local.oplog."));
+ verify(_nss.isOplog());
Client* c = opCtx->getClient();
verify(c);
@@ -221,7 +219,7 @@ void ClientCursorPin::release() {
if (!_cursor->_cursorManager) {
// The ClientCursor was killed while we had it. Therefore, it is our responsibility to
- // kill it.
+ // delete it.
deleteUnderlying();
} else {
// Unpin the cursor under the collection cursor manager lock.
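
Note: clientcursor.cpp now derives the no-timeout behaviour from the _queryOptions bitmask via isNoTimeout() instead of caching a separate _isNoTimeout flag, so construction, destruction, and shouldTimeout() all consult one source of truth. A small sketch of that pattern, using stand-in flag values modeled on the legacy QueryOption_* bits:

#include <cassert>

// Stand-ins mirroring the legacy QueryOption_* wire-protocol flags.
enum QueryOptions : int {
    QueryOption_CursorTailable = 1 << 1,
    QueryOption_NoCursorTimeout = 1 << 4,
};

class Cursor {
public:
    explicit Cursor(int queryOptions) : _queryOptions(queryOptions) {}

    // Derived on demand from the bitmask rather than cached in a separate bool, so there
    // is a single source of truth for the flag.
    bool isNoTimeout() const {
        return (_queryOptions & QueryOption_NoCursorTimeout) != 0;
    }

    bool shouldTimeout(long long idleMillis, long long timeoutMillis) const {
        return !isNoTimeout() && idleMillis > timeoutMillis;
    }

private:
    const int _queryOptions;
};

int main() {
    const Cursor plain(0);
    const Cursor noTimeout(QueryOption_NoCursorTimeout);
    assert(plain.shouldTimeout(700000, 600000));
    assert(!noTimeout.shouldTimeout(700000, 600000));
    return 0;
}
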
diff --git a/src/mongo/db/clientcursor.h b/src/mongo/db/clientcursor.h
index 44ae4600ac6..91e7a0d325c 100644
--- a/src/mongo/db/clientcursor.h
+++ b/src/mongo/db/clientcursor.h
@@ -28,6 +28,7 @@
#pragma once
+#include "mongo/client/dbclientinterface.h"
#include "mongo/db/cursor_id.h"
#include "mongo/db/jsobj.h"
#include "mongo/db/query/plan_executor.h"
@@ -42,30 +43,30 @@ class CursorManager;
class RecoveryUnit;
/**
- * Parameters used for constructing a ClientCursor. ClientCursors cannot be constructed in
- * isolation, but rather must be constructed and managed using a CursorManager. See cursor_manager.h
- * for more details.
+ * Parameters used for constructing a ClientCursor. Makes an owned copy of 'originatingCommandObj'
+ * to be used across getMores.
+ *
+ * ClientCursors cannot be constructed in isolation, but rather must be
+ * constructed and managed using a CursorManager. See cursor_manager.h for more details.
*/
struct ClientCursorParams {
- ClientCursorParams(PlanExecutor* exec,
- std::string ns,
+ ClientCursorParams(std::unique_ptr<PlanExecutor> planExecutor,
+ NamespaceString nss,
bool isReadCommitted,
- int qopts = 0,
- const BSONObj query = BSONObj(),
- bool isAggCursor = false)
- : exec(exec),
- ns(std::move(ns)),
+ BSONObj originatingCommandObj)
+ : exec(std::move(planExecutor)),
+ nss(std::move(nss)),
isReadCommitted(isReadCommitted),
- qopts(qopts),
- query(query),
- isAggCursor(isAggCursor) {}
+ queryOptions(exec->getCanonicalQuery()
+ ? exec->getCanonicalQuery()->getQueryRequest().getOptions()
+ : 0),
+ originatingCommandObj(originatingCommandObj.getOwned()) {}
- PlanExecutor* exec = nullptr;
- const std::string ns;
+ std::unique_ptr<PlanExecutor> exec;
+ const NamespaceString nss;
bool isReadCommitted = false;
- int qopts = 0;
- const BSONObj query = BSONObj();
- bool isAggCursor = false;
+ int queryOptions = 0;
+ BSONObj originatingCommandObj;
};
/**
@@ -92,18 +93,14 @@ public:
return _cursorid;
}
- std::string ns() const {
- return _ns;
+ const NamespaceString& nss() const {
+ return _nss;
}
bool isReadCommitted() const {
return _isReadCommitted;
}
- bool isAggCursor() const {
- return _isAggCursor;
- }
-
PlanExecutor* getExecutor() const {
return _exec.get();
}
@@ -112,8 +109,8 @@ public:
return _queryOptions;
}
- const BSONObj& getQuery() const {
- return _query;
+ const BSONObj& getOriginatingCommandObj() const {
+ return _originatingCommand;
}
/**
@@ -223,7 +220,7 @@ private:
* Constructs a ClientCursor. Since cursors must come into being registered and pinned, this is
* private. See cursor_manager.h for more details.
*/
- ClientCursor(const ClientCursorParams& params, CursorManager* cursorManager, CursorId cursorId);
+ ClientCursor(ClientCursorParams&& params, CursorManager* cursorManager, CursorId cursorId);
/**
* Constructs a special ClientCursor used to track sharding state for the given collection.
@@ -246,11 +243,15 @@ private:
*/
void kill();
+ bool isNoTimeout() const {
+ return (_queryOptions & QueryOption_NoCursorTimeout);
+ }
+
// The ID of the ClientCursor. A value of 0 is used to mean that no cursor id has been assigned.
CursorId _cursorid = 0;
// The namespace we're operating on.
- std::string _ns;
+ const NamespaceString _nss;
const bool _isReadCommitted = false;
@@ -265,21 +266,11 @@ private:
// Tracks the number of results returned by this cursor so far.
long long _pos = 0;
- // If this cursor was created by a find operation, '_query' holds the query predicate for
- // the find. If this cursor was created by a command (e.g. the aggregate command), then
- // '_query' holds the command specification received from the client.
- BSONObj _query;
+ // Holds an owned copy of the command specification received from the client.
+ const BSONObj _originatingCommand;
// See the QueryOptions enum in dbclientinterface.h.
- int _queryOptions = 0;
-
- // Is this ClientCursor backed by an aggregation pipeline?
- //
- // Agg executors differ from others in that they manage their own locking internally and
- // should not be killed or destroyed when the underlying collection is deleted.
- //
- // Note: This should *not* be set for the internal cursor used as input to an aggregation.
- const bool _isAggCursor = false;
+ const int _queryOptions = 0;
// While a cursor is being used by a client, it is marked as "pinned". See ClientCursorPin
// below.
@@ -287,10 +278,6 @@ private:
// Cursors always come into existence in a pinned state.
bool _isPinned = true;
- // Is the "no timeout" flag set on this cursor? If false, this cursor may be automatically
- // deleted after an interval of inactivity.
- bool _isNoTimeout = false;
-
// The replication position only used in master-slave.
Timestamp _slaveReadTill;
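
Note: with the clientcursor.h changes, ClientCursorParams owns its PlanExecutor through a unique_ptr and keeps an owned copy of the originating command, and ClientCursor is built by moving the params. This is why callers such as find_cmd.cpp can assert invariant(!exec) after registering the cursor. A toy model of that ownership transfer follows; the types are simplified stand-ins, not MongoDB classes.

#include <cassert>
#include <memory>
#include <string>
#include <utility>

struct FakeExecutor {
    int queryOptions = 0;
};

struct CursorParams {
    CursorParams(std::unique_ptr<FakeExecutor> planExecutor,
                 std::string nss,
                 std::string originatingCommand)
        : exec(std::move(planExecutor)),
          nss(std::move(nss)),
          queryOptions(exec->queryOptions),  // derived from the executor, as in the patch
          originatingCommand(std::move(originatingCommand)) {}

    std::unique_ptr<FakeExecutor> exec;  // the params own the executor
    std::string nss;
    int queryOptions;
    std::string originatingCommand;  // owned copy, safe to reuse across getMores
};

struct Cursor {
    explicit Cursor(CursorParams&& params)
        : exec(std::move(params.exec)),
          originatingCommand(std::move(params.originatingCommand)) {}

    std::unique_ptr<FakeExecutor> exec;
    std::string originatingCommand;
};

int main() {
    auto exec = std::make_unique<FakeExecutor>();
    Cursor cursor{CursorParams{std::move(exec), "test.coll", "{ find: 'coll' }"}};
    assert(!exec);         // ownership moved into the cursor, mirroring invariant(!exec)
    assert(cursor.exec);   // the cursor now owns the executor
    return 0;
}
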
diff --git a/src/mongo/db/commands/find_and_modify.cpp b/src/mongo/db/commands/find_and_modify.cpp
index a2deb9db62a..cd66033e427 100644
--- a/src/mongo/db/commands/find_and_modify.cpp
+++ b/src/mongo/db/commands/find_and_modify.cpp
@@ -203,13 +203,11 @@ Status checkCanAcceptWritesForDatabase(OperationContext* opCtx, const NamespaceS
void recordStatsForTopCommand(OperationContext* opCtx) {
auto curOp = CurOp::get(opCtx);
- const int writeLocked = 1;
-
Top::get(opCtx->getClient()->getServiceContext())
.record(opCtx,
curOp->getNS(),
curOp->getLogicalOp(),
- writeLocked,
+ Top::LockType::WriteLocked,
curOp->elapsedMicros(),
curOp->isCommand(),
curOp->getReadWriteType());
diff --git a/src/mongo/db/commands/find_cmd.cpp b/src/mongo/db/commands/find_cmd.cpp
index d43e991950c..60ae2e2eba4 100644
--- a/src/mongo/db/commands/find_cmd.cpp
+++ b/src/mongo/db/commands/find_cmd.cpp
@@ -399,11 +399,10 @@ public:
// Create a ClientCursor containing this plan executor and register it with the cursor
// manager.
ClientCursorPin pinnedCursor = collection->getCursorManager()->registerCursor(
- {exec.release(),
- nss.ns(),
+ {std::move(exec),
+ nss,
opCtx->recoveryUnit()->isReadingFromMajorityCommittedSnapshot(),
- originalQR.getOptions(),
- cmdObj.getOwned()});
+ cmdObj});
cursorId = pinnedCursor.getCursor()->cursorid();
invariant(!exec);
diff --git a/src/mongo/db/commands/getmore_cmd.cpp b/src/mongo/db/commands/getmore_cmd.cpp
index 78c7d822404..448177a8068 100644
--- a/src/mongo/db/commands/getmore_cmd.cpp
+++ b/src/mongo/db/commands/getmore_cmd.cpp
@@ -53,6 +53,7 @@
#include "mongo/db/s/operation_sharding_state.h"
#include "mongo/db/service_context.h"
#include "mongo/db/stats/counters.h"
+#include "mongo/db/stats/top.h"
#include "mongo/s/chunk_version.h"
#include "mongo/stdx/memory.h"
#include "mongo/util/fail_point_service.h"
@@ -154,9 +155,6 @@ public:
auto curOp = CurOp::get(opCtx);
curOp->debug().cursorid = request.cursorid;
- // Disable shard version checking - getmore commands are always unversioned
- OperationShardingState::get(opCtx).setShardVersion(request.nss, ChunkVersion::IGNORED());
-
// Validate term before acquiring locks, if provided.
if (request.term) {
auto replCoord = repl::ReplicationCoordinator::get(opCtx);
@@ -167,67 +165,47 @@ public:
}
}
- // Depending on the type of cursor being operated on, we hold locks for the whole
- // getMore, or none of the getMore, or part of the getMore. The three cases in detail:
+ // Cursors come in one of two flavors:
+ // - Cursors owned by the collection cursor manager, such as those generated via the find
+ // command. For these cursors, we hold the appropriate collection lock for the duration of
+ // the getMore using AutoGetCollectionForRead. This will automatically update the CurOp
+ // object appropriately and record execution time via Top upon completion.
+ // - Cursors owned by the global cursor manager, such as those generated via the aggregate
+ // command. These cursors either hold no collection state or manage their collection state
+ // internally, so we acquire no locks. In this case we use the AutoStatsTracker object to
+ // update the CurOp object appropriately and record execution time via Top upon
+ // completion.
//
- // 1) Normal cursor: we lock with "ctx" and hold it for the whole getMore.
- // 2) Cursor owned by global cursor manager: we don't lock anything. These cursors
- // don't own any collection state. These cursors are generated either by the
- // listCollections or listIndexes commands, as these special cursor-generating commands
- // operate over catalog data rather than targeting the data within a collection.
- // 3) Agg cursor: we lock with "ctx", then release, then relock with "unpinDBLock" and
- // "unpinCollLock". This is because agg cursors handle locking internally (hence the
- // release), but the pin and unpin of the cursor must occur under the collection
- // lock. We don't use our AutoGetCollectionForRead "ctx" to relock, because
- // AutoGetCollectionForRead checks the sharding version (and we want the relock for
- // the unpin to succeed even if the sharding version has changed).
+ // Thus, only one of 'readLock' and 'statsTracker' will be populated as we populate
+ // 'cursorManager'.
//
// Note that we declare our locks before our ClientCursorPin, in order to ensure that
- // the pin's destructor is called before the lock destructors (so that the unpin occurs
- // under the lock).
- std::unique_ptr<AutoGetCollectionForReadCommand> ctx;
- std::unique_ptr<Lock::DBLock> unpinDBLock;
- std::unique_ptr<Lock::CollectionLock> unpinCollLock;
-
+ // the pin's destructor is called before the lock's destructor (if there is one) so that the
+ // cursor cleanup can occur under the lock.
+ boost::optional<AutoGetCollectionForReadCommand> readLock;
+ boost::optional<AutoStatsTracker> statsTracker;
CursorManager* cursorManager;
- if (request.nss.isListIndexesCursorNS() || request.nss.isListCollectionsCursorNS()) {
+
+ if (CursorManager::isGloballyManagedCursor(request.cursorid)) {
cursorManager = CursorManager::getGlobalCursorManager();
+
+ if (boost::optional<NamespaceString> nssForCurOp =
+ request.nss.isGloballyManagedNamespace()
+ ? request.nss.getTargetNSForGloballyManagedNamespace()
+ : request.nss) {
+ const boost::optional<int> dbProfilingLevel = boost::none;
+ statsTracker.emplace(
+ opCtx, *nssForCurOp, Top::LockType::NotLocked, dbProfilingLevel);
+ }
} else {
- ctx = stdx::make_unique<AutoGetCollectionOrViewForReadCommand>(opCtx, request.nss);
- auto viewCtx = static_cast<AutoGetCollectionOrViewForReadCommand*>(ctx.get());
- Collection* collection = ctx->getCollection();
+ // getMore commands are always unversioned, so prevent AutoGetCollectionForRead from
+ // checking the shard version.
+ OperationShardingState::get(opCtx).setShardVersion(request.nss,
+ ChunkVersion::IGNORED());
+
+ readLock.emplace(opCtx, request.nss);
+ Collection* collection = readLock->getCollection();
if (!collection) {
- // Rewrite a getMore on a view to a getMore on the original underlying collection.
- // If the view no longer exists, or has been rewritten, the cursor id will be
- // unknown, resulting in an appropriate error.
- if (viewCtx->getView()) {
- auto resolved =
- viewCtx->getDb()->getViewCatalog()->resolveView(opCtx, request.nss);
- if (!resolved.isOK()) {
- return appendCommandStatus(result, resolved.getStatus());
- }
- viewCtx->releaseLocksForView();
-
- // Only one shardversion can be set at a time for an operation, so unset it
- // here to allow setting it on the underlying namespace.
- OperationShardingState::get(opCtx).unsetShardVersion(request.nss);
-
- GetMoreRequest newRequest(resolved.getValue().getNamespace(),
- request.cursorid,
- request.batchSize,
- request.awaitDataTimeout,
- request.term,
- request.lastKnownCommittedOpTime);
-
- bool retVal = runParsed(opCtx, origNss, newRequest, cmdObj, errmsg, result);
- {
- // Set the namespace of the curop back to the view namespace so ctx records
- // stats on this view namespace on destruction.
- stdx::lock_guard<Client> lk(*opCtx->getClient());
- curOp->setNS_inlock(origNss.ns());
- }
- return retVal;
- }
return appendCommandStatus(result,
Status(ErrorCodes::OperationFailed,
"collection dropped between getMore calls"));
@@ -243,23 +221,24 @@ public:
ClientCursor* cursor = ccPin.getValue().getCursor();
- // If the fail point is enabled, busy wait until it is disabled. We unlock and re-acquire
- // the locks periodically in order to avoid deadlock (see SERVER-21997 for details).
+ // If the fail point is enabled, busy wait until it is disabled.
while (MONGO_FAIL_POINT(keepCursorPinnedDuringGetMore)) {
- invariant(ctx);
- invariant(!unpinDBLock);
- invariant(!unpinCollLock);
- sleepFor(Milliseconds(10));
- ctx.reset();
- ctx = stdx::make_unique<AutoGetCollectionForReadCommand>(opCtx, request.nss);
+ if (readLock) {
+ // We unlock and re-acquire the locks periodically in order to avoid deadlock (see
+ // SERVER-21997 for details).
+ sleepFor(Milliseconds(10));
+ readLock.reset();
+ readLock.emplace(opCtx, request.nss);
+ }
}
- if (request.nss.ns() != cursor->ns()) {
+ if (request.nss != cursor->nss()) {
return appendCommandStatus(
result,
Status(ErrorCodes::Unauthorized,
str::stream() << "Requested getMore on namespace '" << request.nss.ns()
- << "', but cursor belongs to a different namespace"));
+ << "', but cursor belongs to a different namespace "
+ << cursor->nss().ns()));
}
if (request.nss.isOplog() && MONGO_FAIL_POINT(rsStopGetMoreCmd)) {
@@ -274,9 +253,8 @@ public:
if (isCursorAwaitData(cursor)) {
invariant(isCursorTailable(cursor));
- if (cursor->isAggCursor()) {
- Status status(ErrorCodes::BadValue,
- "awaitData cannot be set on an aggregation cursor");
+ if (CursorManager::isGloballyManagedCursor(request.cursorid)) {
+ Status status(ErrorCodes::BadValue, "awaitData cannot be set on this cursor");
return appendCommandStatus(result, status);
}
}
@@ -288,8 +266,7 @@ public:
}
// On early return, get rid of the cursor.
- ScopeGuard cursorFreer =
- MakeGuard(&GetMoreCmd::cleanupCursor, opCtx, &ccPin.getValue(), request);
+ ScopeGuard cursorFreer = MakeGuard(&ClientCursorPin::deleteUnderlying, &ccPin.getValue());
if (cursor->isReadCommitted())
uassertStatusOK(opCtx->recoveryUnit()->setReadFromMajorityCommittedSnapshot());
@@ -318,11 +295,6 @@ public:
}
opCtx->checkForInterrupt(); // May trigger maxTimeAlwaysTimeOut fail point.
- if (cursor->isAggCursor()) {
- // Agg cursors handle their own locking internally.
- ctx.reset(); // unlocks
- }
-
PlanExecutor* exec = cursor->getExecutor();
exec->reattachToOperationContext(opCtx);
exec->restoreState();
@@ -334,7 +306,7 @@ public:
// Ensure that the original query or command object is available in the slow query log,
// profiler and currentOp.
- auto originatingCommand = cursor->getQuery();
+ auto originatingCommand = cursor->getOriginatingCommandObj();
if (!originatingCommand.isEmpty()) {
curOp->setOriginatingCommand_inlock(originatingCommand);
}
@@ -343,12 +315,12 @@ public:
uint64_t notifierVersion = 0;
std::shared_ptr<CappedInsertNotifier> notifier;
if (isCursorAwaitData(cursor)) {
- invariant(ctx->getCollection()->isCapped());
+ invariant(readLock->getCollection()->isCapped());
// Retrieve the notifier which we will wait on until new data arrives. We make sure
// to do this in the lock because once we drop the lock it is possible for the
// collection to become invalid. The notifier itself will outlive the collection if
// the collection is dropped, as we keep a shared_ptr to it.
- notifier = ctx->getCollection()->getCappedInsertNotifier();
+ notifier = readLock->getCollection()->getCappedInsertNotifier();
// Must get the version before we call generateBatch in case a write comes in after
// that call and before we call wait on the notifier.
@@ -386,11 +358,11 @@ public:
// to do this in the lock because once we drop the lock it is possible for the
// collection to become invalid. The notifier itself will outlive the collection if
// the collection is dropped, as we keep a shared_ptr to it.
- auto notifier = ctx->getCollection()->getCappedInsertNotifier();
+ auto notifier = readLock->getCollection()->getCappedInsertNotifier();
// Save the PlanExecutor and drop our locks.
exec->saveState();
- ctx.reset();
+ readLock.reset();
// Block waiting for data.
const auto timeout = opCtx->getRemainingMaxTimeMicros();
@@ -402,7 +374,7 @@ public:
// CappedInsertNotifier.
curOp->setExpectedLatencyMs(durationCount<Milliseconds>(timeout));
- ctx.reset(new AutoGetCollectionForReadCommand(opCtx, request.nss));
+ readLock.emplace(opCtx, request.nss);
exec->restoreState();
// We woke up because either the timed_wait expired, or there was more data. Either
@@ -420,11 +392,11 @@ public:
postExecutionStats.totalDocsExamined -= preExecutionStats.totalDocsExamined;
curOp->debug().setPlanSummaryMetrics(postExecutionStats);
- // We do not report 'execStats' for aggregation, both in the original request and
- // subsequent getMore. The reason for this is that aggregation's source PlanExecutor
- // could be destroyed before we know whether we need execStats and we do not want to
- // generate for all operations due to cost.
- if (!cursor->isAggCursor() && curOp->shouldDBProfile()) {
+ // We do not report 'execStats' for aggregation or other globally managed cursors, both in
+ // the original request and subsequent getMore. It would be useful to have this information
+ // for an aggregation, but the source PlanExecutor could be destroyed before we know whether
+ // we need execStats and we do not want to generate for all operations due to cost.
+ if (!CursorManager::isGloballyManagedCursor(request.cursorid) && curOp->shouldDBProfile()) {
BSONObjBuilder execStatsBob;
Explain::getWinningPlanStats(exec, &execStatsBob);
curOp->debug().execStats = execStatsBob.obj();
@@ -448,9 +420,7 @@ public:
curOp->debug().cursorExhausted = true;
}
- // Respond with the originally requested namespace, even if this is a getMore over a view
- // that was resolved to a different backing namespace.
- nextBatch.done(respondWithId, origNss.ns());
+ nextBatch.done(respondWithId, request.nss.ns());
// Ensure log and profiler include the number of results returned in this getMore's response
// batch.
@@ -458,15 +428,6 @@ public:
if (respondWithId) {
cursorFreer.Dismiss();
-
- // If we are operating on an aggregation cursor, then we dropped our collection lock
- // earlier and need to reacquire it in order to clean up our ClientCursorPin.
- if (cursor->isAggCursor()) {
- invariant(NULL == ctx.get());
- unpinDBLock.reset(new Lock::DBLock(opCtx, request.nss.db(), MODE_IS));
- unpinCollLock.reset(
- new Lock::CollectionLock(opCtx->lockState(), request.nss.ns(), MODE_IS));
- }
}
return true;
@@ -554,27 +515,6 @@ public:
return Status::OK();
}
- /**
- * Called via a ScopeGuard on early return in order to ensure that the ClientCursor gets
- * cleaned up properly.
- */
- static void cleanupCursor(OperationContext* opCtx,
- ClientCursorPin* ccPin,
- const GetMoreRequest& request) {
- ClientCursor* cursor = ccPin->getCursor();
-
- std::unique_ptr<Lock::DBLock> unpinDBLock;
- std::unique_ptr<Lock::CollectionLock> unpinCollLock;
-
- if (cursor->isAggCursor()) {
- unpinDBLock.reset(new Lock::DBLock(opCtx, request.nss.db(), MODE_IS));
- unpinCollLock.reset(
- new Lock::CollectionLock(opCtx->lockState(), request.nss.ns(), MODE_IS));
- }
-
- ccPin->deleteUnderlying();
- }
-
} getMoreCmd;
} // namespace mongo
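
Note: the reworked getMore populates exactly one of two optional RAII helpers depending on the cursor id: a collection read lock for collection-owned cursors, or a lock-free stats tracker for globally managed ones, and relies on their destructors running after the pin is cleaned up. A simplified sketch of that pattern using std::optional; ReadLock and StatsTracker are toy stand-ins for AutoGetCollectionForReadCommand and AutoStatsTracker.

#include <iostream>
#include <optional>
#include <string>

struct ReadLock {
    explicit ReadLock(std::string ns) : _ns(std::move(ns)) { std::cout << "lock " << _ns << '\n'; }
    ~ReadLock() { std::cout << "unlock " << _ns << '\n'; }
    std::string _ns;
};

struct StatsTracker {
    explicit StatsTracker(std::string ns) : _ns(std::move(ns)) {}
    ~StatsTracker() { std::cout << "record " << _ns << " to Top (NotLocked)\n"; }
    std::string _ns;
};

void runGetMoreLike(bool globallyManaged, const std::string& ns) {
    // Exactly one of the two optionals is populated, as in the rewritten getMore command.
    std::optional<ReadLock> readLock;
    std::optional<StatsTracker> statsTracker;
    if (globallyManaged) {
        statsTracker.emplace(ns);  // no collection lock for globally managed cursors
    } else {
        readLock.emplace(ns);      // collection-owned cursor: hold the lock for the whole getMore
    }
    // ... pin the cursor and generate the batch here ...
}  // destructors run here: the lock is released and/or the stats are recorded

int main() {
    runGetMoreLike(true, "test.aggCursor");
    runGetMoreLike(false, "test.findCursor");
    return 0;
}
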
diff --git a/src/mongo/db/commands/killcursors_cmd.cpp b/src/mongo/db/commands/killcursors_cmd.cpp
index c1526d990b5..a39f0dfaa42 100644
--- a/src/mongo/db/commands/killcursors_cmd.cpp
+++ b/src/mongo/db/commands/killcursors_cmd.cpp
@@ -35,6 +35,8 @@
#include "mongo/db/curop.h"
#include "mongo/db/db_raii.h"
#include "mongo/db/query/killcursors_request.h"
+#include "mongo/db/stats/top.h"
+#include "mongo/util/scopeguard.h"
namespace mongo {
@@ -48,34 +50,51 @@ private:
Status _killCursor(OperationContext* opCtx,
const NamespaceString& nss,
CursorId cursorId) final {
- std::unique_ptr<AutoGetCollectionOrViewForReadCommand> ctx;
-
+ // Cursors come in one of two flavors:
+ // - Cursors owned by the collection cursor manager, such as those generated via the find
+ // command. For these cursors, we hold the appropriate collection lock for the duration of
+ // the getMore using AutoGetCollectionForRead. This will automatically update the CurOp
+ // object appropriately and record execution time via Top upon completion.
+ // - Cursors owned by the global cursor manager, such as those generated via the aggregate
+ // command. These cursors either hold no collection state or manage their collection state
+ // internally, so we acquire no locks. In this case we use the AutoStatsTracker object to
+ // update the CurOp object appropriately and record execution time via Top upon
+ // completion.
+ //
+ // Thus, exactly one of 'readLock' and 'statsTracker' will be populated as we populate
+ // 'cursorManager'.
+ boost::optional<AutoGetCollectionForReadCommand> readLock;
+ boost::optional<AutoStatsTracker> statsTracker;
CursorManager* cursorManager;
- if (nss.isListIndexesCursorNS() || nss.isListCollectionsCursorNS()) {
- // listCollections and listIndexes are special cursor-generating commands whose cursors
- // are managed globally, as they operate over catalog data rather than targeting the
- // data within a collection.
+
+ if (CursorManager::isGloballyManagedCursor(cursorId)) {
cursorManager = CursorManager::getGlobalCursorManager();
- } else {
- ctx = stdx::make_unique<AutoGetCollectionOrViewForReadCommand>(opCtx, nss);
- Collection* collection = ctx->getCollection();
- ViewDefinition* view = ctx->getView();
- if (view) {
- Database* db = ctx->getDb();
- auto resolved = db->getViewCatalog()->resolveView(opCtx, nss);
- if (!resolved.isOK()) {
- return resolved.getStatus();
- }
- ctx->releaseLocksForView();
- Status status = _killCursor(opCtx, resolved.getValue().getNamespace(), cursorId);
- {
- // Set the namespace of the curop back to the view namespace so ctx records
- // stats on this view namespace on destruction.
- stdx::lock_guard<Client> lk(*opCtx->getClient());
- CurOp::get(opCtx)->setNS_inlock(nss.ns());
+
+ if (auto nssForCurOp = nss.isGloballyManagedNamespace()
+ ? nss.getTargetNSForGloballyManagedNamespace()
+ : nss) {
+ const boost::optional<int> dbProfilingLevel = boost::none;
+ statsTracker.emplace(
+ opCtx, *nssForCurOp, Top::LockType::NotLocked, dbProfilingLevel);
+ }
+
+ // Make sure the namespace of the cursor matches the namespace passed to the killCursors
+ // command so we can be sure we checked the correct privileges.
+ auto ccPin = cursorManager->pinCursor(cursorId);
+ if (ccPin.isOK()) {
+ auto cursorNs = ccPin.getValue().getCursor()->nss();
+ if (cursorNs != nss) {
+ return Status{ErrorCodes::Unauthorized,
+ str::stream() << "issued killCursors on namespace '" << nss.ns()
+ << "', but cursor with id "
+ << cursorId
+ << " belongs to a different namespace: "
+ << cursorNs.ns()};
}
- return status;
}
+ } else {
+ readLock.emplace(opCtx, nss);
+ Collection* collection = readLock->getCollection();
if (!collection) {
return {ErrorCodes::CursorNotFound,
str::stream() << "collection does not exist: " << nss.ns()};
diff --git a/src/mongo/db/commands/list_collections.cpp b/src/mongo/db/commands/list_collections.cpp
index da72422b4eb..a636ed51d17 100644
--- a/src/mongo/db/commands/list_collections.cpp
+++ b/src/mongo/db/commands/list_collections.cpp
@@ -298,7 +298,7 @@ public:
const NamespaceString cursorNss = NamespaceString::makeListCollectionsNSS(dbname);
auto statusWithPlanExecutor = PlanExecutor::make(
- opCtx, std::move(ws), std::move(root), cursorNss.ns(), PlanExecutor::YIELD_MANUAL);
+ opCtx, std::move(ws), std::move(root), cursorNss, PlanExecutor::YIELD_MANUAL);
if (!statusWithPlanExecutor.isOK()) {
return appendCommandStatus(result, statusWithPlanExecutor.getStatus());
}
@@ -328,9 +328,10 @@ public:
exec->saveState();
exec->detachFromOperationContext();
auto pinnedCursor = CursorManager::getGlobalCursorManager()->registerCursor(
- {exec.release(),
- cursorNss.ns(),
- opCtx->recoveryUnit()->isReadingFromMajorityCommittedSnapshot()});
+ {std::move(exec),
+ cursorNss,
+ opCtx->recoveryUnit()->isReadingFromMajorityCommittedSnapshot(),
+ jsobj});
cursorId = pinnedCursor.getCursor()->cursorid();
}
diff --git a/src/mongo/db/commands/list_indexes.cpp b/src/mongo/db/commands/list_indexes.cpp
index 73e3d19289e..7ea9b23ed0d 100644
--- a/src/mongo/db/commands/list_indexes.cpp
+++ b/src/mongo/db/commands/list_indexes.cpp
@@ -198,7 +198,7 @@ public:
dassert(ns == cursorNss.getTargetNSForListIndexes());
auto statusWithPlanExecutor = PlanExecutor::make(
- opCtx, std::move(ws), std::move(root), cursorNss.ns(), PlanExecutor::YIELD_MANUAL);
+ opCtx, std::move(ws), std::move(root), cursorNss, PlanExecutor::YIELD_MANUAL);
if (!statusWithPlanExecutor.isOK()) {
return appendCommandStatus(result, statusWithPlanExecutor.getStatus());
}
@@ -228,9 +228,10 @@ public:
exec->saveState();
exec->detachFromOperationContext();
auto pinnedCursor = CursorManager::getGlobalCursorManager()->registerCursor(
- {exec.release(),
- cursorNss.ns(),
- opCtx->recoveryUnit()->isReadingFromMajorityCommittedSnapshot()});
+ {std::move(exec),
+ cursorNss,
+ opCtx->recoveryUnit()->isReadingFromMajorityCommittedSnapshot(),
+ cmdObj});
cursorId = pinnedCursor.getCursor()->cursorid();
}
diff --git a/src/mongo/db/commands/parallel_collection_scan.cpp b/src/mongo/db/commands/parallel_collection_scan.cpp
index 8ff2e100a58..c46cb6d8c5b 100644
--- a/src/mongo/db/commands/parallel_collection_scan.cpp
+++ b/src/mongo/db/commands/parallel_collection_scan.cpp
@@ -148,11 +148,12 @@ public:
exec->saveState();
exec->detachFromOperationContext();
- // Create and regiter a new ClientCursor.
+ // Create and register a new ClientCursor.
auto pinnedCursor = collection->getCursorManager()->registerCursor(
- {exec.release(),
- ns.ns(),
- opCtx->recoveryUnit()->isReadingFromMajorityCommittedSnapshot()});
+ {std::move(exec),
+ ns,
+ opCtx->recoveryUnit()->isReadingFromMajorityCommittedSnapshot(),
+ cmdObj});
pinnedCursor.getCursor()->setLeftoverMaxTimeMicros(
opCtx->getRemainingMaxTimeMicros());
diff --git a/src/mongo/db/commands/repair_cursor.cpp b/src/mongo/db/commands/repair_cursor.cpp
index ac6155cc394..69ded443ec6 100644
--- a/src/mongo/db/commands/repair_cursor.cpp
+++ b/src/mongo/db/commands/repair_cursor.cpp
@@ -107,9 +107,10 @@ public:
exec->detachFromOperationContext();
auto pinnedCursor = collection->getCursorManager()->registerCursor(
- {exec.release(),
- ns.ns(),
- opCtx->recoveryUnit()->isReadingFromMajorityCommittedSnapshot()});
+ {std::move(exec),
+ ns,
+ opCtx->recoveryUnit()->isReadingFromMajorityCommittedSnapshot(),
+ cmdObj});
appendCursorResponseObject(
pinnedCursor.getCursor()->cursorid(), ns.ns(), BSONArray(), &result);
diff --git a/src/mongo/db/commands/run_aggregate.cpp b/src/mongo/db/commands/run_aggregate.cpp
index 35164560c16..f0ef162e9af 100644
--- a/src/mongo/db/commands/run_aggregate.cpp
+++ b/src/mongo/db/commands/run_aggregate.cpp
@@ -60,6 +60,7 @@
#include "mongo/db/views/view_sharding_check.h"
#include "mongo/stdx/memory.h"
#include "mongo/util/log.h"
+#include "mongo/util/scopeguard.h"
#include "mongo/util/string_map.h"
namespace mongo {
@@ -76,19 +77,15 @@ namespace {
/**
* Returns true if we need to keep a ClientCursor saved for this pipeline (for future getMore
* requests). Otherwise, returns false. The passed 'nsForCursor' is only used to determine the
- * namespace used in the returned cursor. In the case of views, this can be different from that
- * in 'request'.
+ * namespace used in the returned cursor, which will be registered with the global cursor manager,
+ * and thus will be different from that in 'request'.
*/
bool handleCursorCommand(OperationContext* opCtx,
- const string& nsForCursor,
+ const NamespaceString& nsForCursor,
ClientCursor* cursor,
- PlanExecutor* exec,
const AggregationRequest& request,
BSONObjBuilder& result) {
- if (cursor) {
- invariant(cursor->getExecutor() == exec);
- invariant(cursor->isAggCursor());
- }
+ invariant(cursor);
long long batchSize = request.getBatchSize();
@@ -99,45 +96,29 @@ bool handleCursorCommand(OperationContext* opCtx,
// The initial getNext() on a PipelineProxyStage may be very expensive so we don't
// do it when batchSize is 0 since that indicates a desire for a fast return.
PlanExecutor::ExecState state;
- if ((state = exec->getNext(&next, NULL)) == PlanExecutor::IS_EOF) {
+ if ((state = cursor->getExecutor()->getNext(&next, nullptr)) == PlanExecutor::IS_EOF) {
// make it an obvious error to use cursor or executor after this point
- cursor = NULL;
- exec = NULL;
+ cursor = nullptr;
break;
}
- uassert(34426,
- "Plan executor error during aggregation: " + WorkingSetCommon::toStatusString(next),
- PlanExecutor::ADVANCED == state);
+ if (PlanExecutor::ADVANCED != state) {
+ auto status = WorkingSetCommon::getMemberObjectStatus(next);
+ uasserted(status.code(),
+ "PlanExecutor error during aggregation: " +
+ WorkingSetCommon::toStatusString(next));
+ }
// If adding this object will cause us to exceed the message size limit, then we stash it
// for later.
if (!FindCommon::haveSpaceForNext(next, objCount, resultsArray.len())) {
- exec->enqueue(next);
+ cursor->getExecutor()->enqueue(next);
break;
}
resultsArray.append(next);
}
- // NOTE: exec->isEOF() can have side effects such as writing by $out. However, it should
- // be relatively quick since if there was no cursor then the input is empty. Also, this
- // violates the contract for batchSize==0. Sharding requires a cursor to be returned in that
- // case. This is ok for now however, since you can't have a sharded collection that doesn't
- // exist.
- const bool canReturnMoreBatches = cursor;
- if (!canReturnMoreBatches && exec && !exec->isEOF()) {
- // msgasserting since this shouldn't be possible to trigger from today's aggregation
- // language. The wording assumes that the only reason cursor would be null is if the
- // collection doesn't exist.
- msgasserted(
- 17391,
- str::stream() << "Aggregation has more results than fit in initial batch, but can't "
- << "create cursor since collection "
- << nsForCursor
- << " doesn't exist");
- }
-
if (cursor) {
// If a time limit was set on the pipeline, remaining time is "rolled over" to the
// cursor (for use by future getmore ops).
@@ -147,14 +128,14 @@ bool handleCursorCommand(OperationContext* opCtx,
// Cursor needs to be in a saved state while we yield locks for getmore. State
// will be restored in getMore().
- exec->saveState();
- exec->detachFromOperationContext();
+ cursor->getExecutor()->saveState();
+ cursor->getExecutor()->detachFromOperationContext();
} else {
CurOp::get(opCtx)->debug().cursorExhausted = true;
}
- const long long cursorId = cursor ? cursor->cursorid() : 0LL;
- appendCursorResponseObject(cursorId, nsForCursor, resultsArray.arr(), &result);
+ const CursorId cursorId = cursor ? cursor->cursorid() : 0LL;
+ appendCursorResponseObject(cursorId, nsForCursor.ns(), resultsArray.arr(), &result);
return static_cast<bool>(cursor);
}
@@ -296,7 +277,6 @@ Status runAggregate(OperationContext* opCtx,
: uassertStatusOK(CollatorFactoryInterface::get(opCtx->getServiceContext())
->makeFromBSON(request.getCollation()));
- boost::optional<ClientCursorPin> pin; // either this OR the exec will be non-null
unique_ptr<PlanExecutor> exec;
boost::intrusive_ptr<ExpressionContext> expCtx;
boost::intrusive_ptr<Pipeline> pipeline;
@@ -304,11 +284,6 @@ Status runAggregate(OperationContext* opCtx,
{
// This will throw if the sharding version for this connection is out of date. If the
// namespace is a view, the lock will be released before re-running the aggregation.
- // Otherwise, the lock must be held continuously from now until we have we created both
- // the output ClientCursor and the input executor. This ensures that both are using the
- // same sharding version that we synchronize on here. This is also why we always need to
- // create a ClientCursor even when we aren't outputting to a cursor. See the comment on
- // ShardFilterStage for more details.
AutoGetCollectionOrViewForReadCommand ctx(opCtx, nss);
Collection* collection = ctx.getCollection();
@@ -416,17 +391,15 @@ Status runAggregate(OperationContext* opCtx,
// it to the front of the pipeline if needed.
PipelineD::prepareCursorSource(collection, &request, pipeline);
- // Create the PlanExecutor which returns results from the pipeline. The WorkingSet
- // ('ws') and the PipelineProxyStage ('proxy') will be owned by the created
- // PlanExecutor.
auto ws = make_unique<WorkingSet>();
auto proxy = make_unique<PipelineProxyStage>(opCtx, pipeline, ws.get());
- auto statusWithPlanExecutor = (NULL == collection)
- ? PlanExecutor::make(
- opCtx, std::move(ws), std::move(proxy), nss.ns(), PlanExecutor::YIELD_MANUAL)
- : PlanExecutor::make(
- opCtx, std::move(ws), std::move(proxy), collection, PlanExecutor::YIELD_MANUAL);
+ // This PlanExecutor will simply forward requests to the Pipeline, so does not need to
+ // yield or to be registered with any collection's CursorManager to receive invalidations.
+ // The Pipeline may contain PlanExecutors which *are* yielding PlanExecutors and which *are*
+ // registered with their respective collection's CursorManager
+ auto statusWithPlanExecutor = PlanExecutor::make(
+ opCtx, std::move(ws), std::move(proxy), nss, PlanExecutor::YIELD_MANUAL);
invariant(statusWithPlanExecutor.isOK());
exec = std::move(statusWithPlanExecutor.getValue());
@@ -435,75 +408,39 @@ Status runAggregate(OperationContext* opCtx,
stdx::lock_guard<Client> lk(*opCtx->getClient());
curOp->setPlanSummary_inlock(std::move(planSummary));
}
-
- if (collection) {
- const bool isAggCursor = true; // enable special locking behavior
- pin.emplace(collection->getCursorManager()->registerCursor(
- {exec.release(),
- nss.ns(),
- opCtx->recoveryUnit()->isReadingFromMajorityCommittedSnapshot(),
- 0,
- cmdObj.getOwned(),
- isAggCursor}));
- // Don't add any code between here and the start of the try block.
- }
-
- // At this point, it is safe to release the collection lock.
- // - In the case where we have a collection: we will need to reacquire the
- // collection lock later when cleaning up our ClientCursorPin.
- // - In the case where we don't have a collection: our PlanExecutor won't be
- // registered, so it will be safe to clean it up outside the lock.
- invariant(!exec || !collection);
}
- try {
- // Unless set to true, the ClientCursor created above will be deleted on block exit.
- bool keepCursor = false;
-
- // If both explain and cursor are specified, explain wins.
- if (expCtx->explain) {
- result << "stages" << Value(pipeline->writeExplainOps(*expCtx->explain));
- } else {
- // Cursor must be specified, if explain is not.
- keepCursor = handleCursorCommand(opCtx,
- origNss.ns(),
- pin ? pin->getCursor() : nullptr,
- pin ? pin->getCursor()->getExecutor() : exec.get(),
- request,
- result);
- }
-
- if (!expCtx->explain) {
- PlanSummaryStats stats;
- Explain::getSummaryStats(pin ? *pin->getCursor()->getExecutor() : *exec.get(), &stats);
- curOp->debug().setPlanSummaryMetrics(stats);
- curOp->debug().nreturned = stats.nReturned;
+ // Having released the collection lock, we can now create a cursor that returns results from the
+ // pipeline. This cursor owns no collection state, and thus we register it with the global
+ // cursor manager. The global cursor manager does not deliver invalidations or kill
+ // notifications; the underlying PlanExecutor(s) used by the pipeline will be receiving
+ // invalidations and kill notifications themselves, not the cursor we create here.
+ auto pin = CursorManager::getGlobalCursorManager()->registerCursor(
+ {std::move(exec),
+ origNss,
+ opCtx->recoveryUnit()->isReadingFromMajorityCommittedSnapshot(),
+ cmdObj});
+ ScopeGuard cursorFreer = MakeGuard(&ClientCursorPin::deleteUnderlying, &pin);
+
+ // If both explain and cursor are specified, explain wins.
+ if (expCtx->explain) {
+ result << "stages" << Value(pipeline->writeExplainOps(*expCtx->explain));
+ } else {
+ // Cursor must be specified, if explain is not.
+ const bool keepCursor =
+ handleCursorCommand(opCtx, origNss, pin.getCursor(), request, result);
+ if (keepCursor) {
+ cursorFreer.Dismiss();
}
+ }
- // Clean up our ClientCursorPin, if needed. We must reacquire the collection lock
- // in order to do so.
- if (pin) {
- // We acquire locks here with DBLock and CollectionLock instead of using
- // AutoGetCollectionForRead. AutoGetCollectionForRead will throw if the
- // sharding version is out of date, and we don't care if the sharding version
- // has changed.
- Lock::DBLock dbLock(opCtx, nss.db(), MODE_IS);
- Lock::CollectionLock collLock(opCtx->lockState(), nss.ns(), MODE_IS);
- if (keepCursor) {
- pin->release();
- } else {
- pin->deleteUnderlying();
- }
- }
- } catch (...) {
- // On our way out of scope, we clean up our ClientCursorPin if needed.
- if (pin) {
- Lock::DBLock dbLock(opCtx, nss.db(), MODE_IS);
- Lock::CollectionLock collLock(opCtx->lockState(), nss.ns(), MODE_IS);
- pin->deleteUnderlying();
- }
- throw;
+ if (!expCtx->explain) {
+ PlanSummaryStats stats;
+ Explain::getSummaryStats(*(pin.getCursor()->getExecutor()), &stats);
+ curOp->debug().setPlanSummaryMetrics(stats);
+ curOp->debug().nreturned = stats.nReturned;
}
+
// Any code that needs the cursor pinned must be inside the try block, above.
return Status::OK();
}
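
Note: run_aggregate.cpp now registers the cursor with the global manager up front and protects it with a scope guard that deletes the cursor on any early exit, dismissing the guard only when the cursor should survive for future getMores. Below is a minimal dismissible scope guard illustrating the idiom; it is not MongoDB's MakeGuard implementation.

#include <functional>
#include <iostream>
#include <utility>

class ScopeGuard {
public:
    explicit ScopeGuard(std::function<void()> onExit) : _onExit(std::move(onExit)) {}
    ~ScopeGuard() {
        if (_active)
            _onExit();
    }
    void dismiss() {
        _active = false;
    }

private:
    std::function<void()> _onExit;
    bool _active = true;
};

struct CursorPin {
    void deleteUnderlying() { std::cout << "cursor deleted\n"; }
    void release() { std::cout << "cursor kept for future getMores\n"; }
};

void runAggregateLike(bool keepCursor) {
    CursorPin pin;
    // Delete the cursor on any exit path (explain, exhausted results, thrown exception) ...
    ScopeGuard cursorFreer([&pin] { pin.deleteUnderlying(); });
    if (keepCursor) {
        cursorFreer.dismiss();  // ... unless the client will issue getMores against it.
        pin.release();
    }
}

int main() {
    runAggregateLike(false);  // the guard fires: "cursor deleted"
    runAggregateLike(true);   // the guard is dismissed: "cursor kept for future getMores"
    return 0;
}
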
diff --git a/src/mongo/db/curop.cpp b/src/mongo/db/curop.cpp
index 51016d486ce..2af0907170d 100644
--- a/src/mongo/db/curop.cpp
+++ b/src/mongo/db/curop.cpp
@@ -266,10 +266,12 @@ void CurOp::ensureStarted() {
}
}
-void CurOp::enter_inlock(const char* ns, int dbProfileLevel) {
+void CurOp::enter_inlock(const char* ns, boost::optional<int> dbProfileLevel) {
ensureStarted();
_ns = ns;
- raiseDbProfileLevel(dbProfileLevel);
+ if (dbProfileLevel) {
+ raiseDbProfileLevel(*dbProfileLevel);
+ }
}
void CurOp::raiseDbProfileLevel(int dbProfileLevel) {
diff --git a/src/mongo/db/curop.h b/src/mongo/db/curop.h
index 4e0ef0d2d7b..158513bf6fe 100644
--- a/src/mongo/db/curop.h
+++ b/src/mongo/db/curop.h
@@ -192,7 +192,7 @@ public:
return _originatingCommand;
}
- void enter_inlock(const char* ns, int dbProfileLevel);
+ void enter_inlock(const char* ns, boost::optional<int> dbProfileLevel);
/**
* Sets the type of the current network operation.
diff --git a/src/mongo/db/db_raii.cpp b/src/mongo/db/db_raii.cpp
index 84b6584b540..8001b658a09 100644
--- a/src/mongo/db/db_raii.cpp
+++ b/src/mongo/db/db_raii.cpp
@@ -75,6 +75,35 @@ AutoGetOrCreateDb::AutoGetOrCreateDb(OperationContext* opCtx, StringData ns, Loc
}
}
+AutoStatsTracker::AutoStatsTracker(OperationContext* opCtx,
+ const NamespaceString& nss,
+ Top::LockType lockType,
+ boost::optional<int> dbProfilingLevel)
+ : _opCtx(opCtx), _lockType(lockType) {
+ if (!dbProfilingLevel) {
+ // No profiling level was determined, attempt to read the profiling level from the Database
+ // object.
+ AutoGetDb autoDb(_opCtx, nss.db(), MODE_IS);
+ if (autoDb.getDb()) {
+ dbProfilingLevel = autoDb.getDb()->getProfilingLevel();
+ }
+ }
+ stdx::lock_guard<Client> clientLock(*_opCtx->getClient());
+ CurOp::get(_opCtx)->enter_inlock(nss.ns().c_str(), dbProfilingLevel);
+}
+
+AutoStatsTracker::~AutoStatsTracker() {
+ auto curOp = CurOp::get(_opCtx);
+ Top::get(_opCtx->getServiceContext())
+ .record(_opCtx,
+ curOp->getNS(),
+ curOp->getLogicalOp(),
+ _lockType,
+ _timer.micros(),
+ curOp->isCommand(),
+ curOp->getReadWriteType());
+}
+
AutoGetCollectionForRead::AutoGetCollectionForRead(OperationContext* opCtx,
const NamespaceString& nss,
AutoGetCollection::ViewMode viewMode) {
@@ -84,19 +113,6 @@ AutoGetCollectionForRead::AutoGetCollectionForRead(OperationContext* opCtx,
_ensureMajorityCommittedSnapshotIsValid(nss, opCtx);
}
-AutoGetCollectionForReadCommand::~AutoGetCollectionForReadCommand() {
- // Report time spent in read lock
- auto currentOp = CurOp::get(_opCtx);
- Top::get(_opCtx->getClient()->getServiceContext())
- .record(_opCtx,
- currentOp->getNS(),
- currentOp->getLogicalOp(),
- -1, // "read locked"
- _timer.micros(),
- currentOp->isCommand(),
- currentOp->getReadWriteType());
-}
-
void AutoGetCollectionForRead::_ensureMajorityCommittedSnapshotIsValid(const NamespaceString& nss,
OperationContext* opCtx) {
while (true) {
@@ -134,25 +150,15 @@ void AutoGetCollectionForRead::_ensureMajorityCommittedSnapshotIsValid(const Nam
}
AutoGetCollectionForReadCommand::AutoGetCollectionForReadCommand(
- OperationContext* opCtx, const NamespaceString& nss, AutoGetCollection::ViewMode viewMode)
- : _opCtx(opCtx) {
- {
- _autoCollForRead.emplace(opCtx, nss, viewMode);
-
- auto curOp = CurOp::get(_opCtx);
- stdx::lock_guard<Client> lk(*_opCtx->getClient());
-
- // TODO: OldClientContext legacy, needs to be removed
- curOp->ensureStarted();
- curOp->setNS_inlock(nss.ns());
-
- // At this point, we are locked in shared mode for the database by the DB lock in the
- // constructor, so it is safe to load the DB pointer.
- if (_autoCollForRead->getDb()) {
- // TODO: OldClientContext legacy, needs to be removed
- curOp->enter_inlock(nss.ns().c_str(), _autoCollForRead->getDb()->getProfilingLevel());
- }
- }
+ OperationContext* opCtx, const NamespaceString& nss, AutoGetCollection::ViewMode viewMode) {
+
+ _autoCollForRead.emplace(opCtx, nss, viewMode);
+ const int doNotChangeProfilingLevel = 0;
+ _statsTracker.emplace(opCtx,
+ nss,
+ Top::LockType::ReadLocked,
+ _autoCollForRead->getDb() ? _autoCollForRead->getDb()->getProfilingLevel()
+ : doNotChangeProfilingLevel);
// We have both the DB and collection locked, which is the prerequisite to do a stable shard
// version check, but we'd like to do the check after we have a satisfactory snapshot.
@@ -231,7 +237,8 @@ OldClientContext::~OldClientContext() {
.record(_opCtx,
currentOp->getNS(),
currentOp->getLogicalOp(),
- _opCtx->lockState()->isWriteLocked() ? 1 : -1,
+ _opCtx->lockState()->isWriteLocked() ? Top::LockType::WriteLocked
+ : Top::LockType::ReadLocked,
_timer.micros(),
currentOp->isCommand(),
currentOp->getReadWriteType());
diff --git a/src/mongo/db/db_raii.h b/src/mongo/db/db_raii.h
index 8060d06f000..42444775bad 100644
--- a/src/mongo/db/db_raii.h
+++ b/src/mongo/db/db_raii.h
@@ -35,6 +35,7 @@
#include "mongo/db/concurrency/d_concurrency.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/operation_context.h"
+#include "mongo/db/stats/top.h"
#include "mongo/db/views/view.h"
#include "mongo/util/timer.h"
@@ -170,6 +171,36 @@ private:
};
/**
+ * RAII-style class which automatically tracks the operation namespace in CurrentOp and records the
+ * operation via Top upon destruction.
+ */
+class AutoStatsTracker {
+ MONGO_DISALLOW_COPYING(AutoStatsTracker);
+
+public:
+ /**
+ * Sets the namespace of the CurOp object associated with 'opCtx' to be 'nss' and starts the
+ * CurOp timer. 'lockType' describes which type of lock is held by this operation, and will be
+ * used for reporting via Top. If 'dbProfilingLevel' is not given, this constructor will acquire
+ * and then drop a database lock in order to determine the database's profiling level.
+ */
+ AutoStatsTracker(OperationContext* opCtx,
+ const NamespaceString& nss,
+ Top::LockType lockType,
+ boost::optional<int> dbProfilingLevel);
+
+ /**
+ * Records stats about the current operation via Top.
+ */
+ ~AutoStatsTracker();
+
+private:
+ const Timer _timer;
+ OperationContext* _opCtx;
+ Top::LockType _lockType;
+};
+
+/**
* RAII-style class, which would acquire the appropriate hierarchy of locks for obtaining
* a particular collection and would retrieve a reference to the collection. In addition, this
* utility will ensure that the read will be performed against an appropriately committed snapshot
@@ -236,8 +267,6 @@ public:
: AutoGetCollectionForReadCommand(
opCtx, nss, AutoGetCollection::ViewMode::kViewsForbidden) {}
- ~AutoGetCollectionForReadCommand();
-
Database* getDb() const {
return _autoCollForRead->getDb();
}
@@ -246,21 +275,18 @@ public:
return _autoCollForRead->getCollection();
}
-private:
- OperationContext* const _opCtx;
- const Timer _timer;
-
protected:
AutoGetCollectionForReadCommand(OperationContext* opCtx,
const NamespaceString& nss,
AutoGetCollection::ViewMode viewMode);
- /**
- * This protected section must come after the private section because
- * AutoGetCollectionOrViewForRead needs access to _autoColl, but _autoColl must be initialized
- * after _transaction.
- */
+ // '_autoCollForRead' may need to be reset by AutoGetCollectionOrViewForReadCommand, so needs to
+ // be a boost::optional.
boost::optional<AutoGetCollectionForRead> _autoCollForRead;
+
+ // This needs to be initialized after '_autoCollForRead', since we need to consult the Database
+ // object to get the profiling level. Thus, it needs to be a boost::optional.
+ boost::optional<AutoStatsTracker> _statsTracker;
};
/**
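
Note: AutoStatsTracker packages the "record the namespace in CurOp on construction, report elapsed time to Top on destruction" idea described above into a single RAII object. The sketch below is a standalone model of the same pattern using only the standard library; StatsTracker, LockType, and the printed output are illustrative stand-ins, not the mongod types.

// Standalone model of the AutoStatsTracker RAII pattern (illustrative only).
#include <chrono>
#include <iostream>
#include <optional>
#include <string>

enum class LockType { ReadLocked, WriteLocked, NotLocked };

class StatsTracker {
public:
    StatsTracker(std::string ns, LockType lockType, std::optional<int> profilingLevel)
        : _ns(std::move(ns)), _lockType(lockType), _start(std::chrono::steady_clock::now()) {
        // In mongod this is where CurOp::enter_inlock(ns, dbProfilingLevel) would run.
        if (profilingLevel) {
            std::cout << "profiling level for " << _ns << ": " << *profilingLevel << "\n";
        }
    }

    ~StatsTracker() {
        // In mongod this is where Top::record(...) would run with the elapsed time.
        auto micros = std::chrono::duration_cast<std::chrono::microseconds>(
                          std::chrono::steady_clock::now() - _start)
                          .count();
        std::cout << _ns << " took " << micros << "us, lockType="
                  << static_cast<int>(_lockType) << "\n";
    }

private:
    std::string _ns;
    LockType _lockType;
    std::chrono::steady_clock::time_point _start;
};

int main() {
    StatsTracker tracker("test.collection", LockType::NotLocked, std::nullopt);
    // ... the operation runs here; stats are reported when 'tracker' goes out of scope.
    return 0;
}
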
diff --git a/src/mongo/db/exec/pipeline_proxy.h b/src/mongo/db/exec/pipeline_proxy.h
index 9482f9ba565..fd5b99c4bae 100644
--- a/src/mongo/db/exec/pipeline_proxy.h
+++ b/src/mongo/db/exec/pipeline_proxy.h
@@ -37,6 +37,7 @@
#include "mongo/db/pipeline/pipeline.h"
#include "mongo/db/query/plan_summary_stats.h"
#include "mongo/db/record_id.h"
+#include "mongo/util/assert_util.h"
namespace mongo {
@@ -64,13 +65,18 @@ public:
// Not used.
SpecificStats* getSpecificStats() const final {
- return NULL;
+ MONGO_UNREACHABLE;
+ }
+
+ void doInvalidate(OperationContext* txn, const RecordId& rid, InvalidationType type) final {
+ // A PlanExecutor with a PipelineProxyStage should be registered with the global cursor
+ // manager, so should not receive invalidations.
+ MONGO_UNREACHABLE;
}
std::string getPlanSummaryStr() const;
void getPlanSummaryStats(PlanSummaryStats* statsOut) const;
- // Not used.
StageType stageType() const final {
return STAGE_PIPELINE_PROXY;
}
diff --git a/src/mongo/db/namespace_string.cpp b/src/mongo/db/namespace_string.cpp
index 6ee4d989ceb..0a453546a61 100644
--- a/src/mongo/db/namespace_string.cpp
+++ b/src/mongo/db/namespace_string.cpp
@@ -137,6 +137,17 @@ NamespaceString NamespaceString::getTargetNSForListIndexes() const {
return NamespaceString(db(), coll().substr(listIndexesCursorNSPrefix.size()));
}
+boost::optional<NamespaceString> NamespaceString::getTargetNSForGloballyManagedNamespace() const {
+ // Globally managed namespaces are of the form '$cmd.commandName.<targetNs>' or simply
+ // '$cmd.commandName'.
+ dassert(isGloballyManagedNamespace());
+ const size_t indexOfNextDot = coll().find('.', 5);
+ if (indexOfNextDot == std::string::npos) {
+ return boost::none;
+ }
+ return NamespaceString{db(), coll().substr(indexOfNextDot + 1)};
+}
+
string NamespaceString::escapeDbName(const StringData dbname) {
std::string escapedDbName;
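
Note: in getTargetNSForGloballyManagedNamespace() above, the offset 5 passed to coll().find('.', 5) skips the "$cmd." prefix, so the search lands on the dot separating the command name from its target collection, if one exists. The following standalone sketch models that parsing step over the collection part of the namespace; targetCollection() is a hypothetical helper, not part of NamespaceString.

// Standalone model of the target-namespace parsing (illustrative only).
#include <cassert>
#include <optional>
#include <string>

// Given the collection part of a "<db>.$cmd.<command>[.<target>]" namespace, return the
// target collection name, if any. The offset 5 skips the "$cmd." prefix, mirroring the
// coll().find('.', 5) call in the real implementation.
std::optional<std::string> targetCollection(const std::string& coll) {
    const size_t indexOfNextDot = coll.find('.', 5);
    if (indexOfNextDot == std::string::npos) {
        return std::nullopt;
    }
    return coll.substr(indexOfNextDot + 1);
}

int main() {
    assert(targetCollection("$cmd.aggregate.foo") == std::optional<std::string>("foo"));
    assert(!targetCollection("$cmd.listCollections"));
    return 0;
}
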
diff --git a/src/mongo/db/namespace_string.h b/src/mongo/db/namespace_string.h
index c623dbbac90..58f26d76b2c 100644
--- a/src/mongo/db/namespace_string.h
+++ b/src/mongo/db/namespace_string.h
@@ -31,6 +31,7 @@
#pragma once
#include <algorithm>
+#include <boost/optional.hpp>
#include <iosfwd>
#include <string>
@@ -93,13 +94,13 @@ public:
NamespaceString(StringData dbName, StringData collectionName);
/**
- * Contructs a NamespaceString representing a listCollections namespace. The format for this
+ * Constructs a NamespaceString representing a listCollections namespace. The format for this
* namespace is "<dbName>.$cmd.listCollections".
*/
static NamespaceString makeListCollectionsNSS(StringData dbName);
/**
- * Contructs a NamespaceString representing a listIndexes namespace. The format for this
+ * Constructs a NamespaceString representing a listIndexes namespace. The format for this
* namespace is "<dbName>.$cmd.listIndexes.<collectionName>".
*/
static NamespaceString makeListIndexesNSS(StringData dbName, StringData collectionName);
@@ -202,10 +203,25 @@ public:
bool isVirtualized() const {
return virtualized(_ns);
}
+
+ /**
+ * Returns true if cursors for this namespace are registered with the global cursor manager.
+ */
+ bool isGloballyManagedNamespace() const {
+ return coll().startsWith("$cmd."_sd);
+ }
+
bool isListCollectionsCursorNS() const;
bool isListIndexesCursorNS() const;
/**
+ * Given a NamespaceString for which isGloballyManagedNamespace() returns true, returns the
+ * namespace the command targets, or boost::none for commands like 'listCollections' which
+ * do not target a collection.
+ */
+ boost::optional<NamespaceString> getTargetNSForGloballyManagedNamespace() const;
+
+ /**
* Given a NamespaceString for which isListIndexesCursorNS() returns true, returns the
* NamespaceString for the collection that the "listIndexes" targets.
*/
diff --git a/src/mongo/db/namespace_string_test.cpp b/src/mongo/db/namespace_string_test.cpp
index 69884bd5ca9..3e4b489b508 100644
--- a/src/mongo/db/namespace_string_test.cpp
+++ b/src/mongo/db/namespace_string_test.cpp
@@ -173,6 +173,41 @@ TEST(NamespaceStringTest, ListIndexesCursorNS) {
ASSERT(!NamespaceString("test.$cmd.listCollections.foo").isListIndexesCursorNS());
}
+TEST(NamespaceStringTest, IsGloballyManagedNamespace) {
+ ASSERT_TRUE(NamespaceString{"test.$cmd.aggregate.foo"}.isGloballyManagedNamespace());
+ ASSERT_TRUE(NamespaceString{"test.$cmd.listIndexes.foo"}.isGloballyManagedNamespace());
+ ASSERT_TRUE(NamespaceString{"test.$cmd.otherCommand.foo"}.isGloballyManagedNamespace());
+ ASSERT_TRUE(NamespaceString{"test.$cmd.listCollections"}.isGloballyManagedNamespace());
+ ASSERT_TRUE(NamespaceString{"test.$cmd.otherCommand"}.isGloballyManagedNamespace());
+ ASSERT_TRUE(NamespaceString{"test.$cmd.aggregate"}.isGloballyManagedNamespace());
+ ASSERT_TRUE(NamespaceString{"test.$cmd.listIndexes"}.isGloballyManagedNamespace());
+
+ ASSERT_FALSE(NamespaceString{"test.foo"}.isGloballyManagedNamespace());
+ ASSERT_FALSE(NamespaceString{"test.$cmd"}.isGloballyManagedNamespace());
+
+ ASSERT_FALSE(NamespaceString{"$cmd.aggregate.foo"}.isGloballyManagedNamespace());
+ ASSERT_FALSE(NamespaceString{"$cmd.listCollections"}.isGloballyManagedNamespace());
+}
+
+TEST(NamespaceStringTest, GetTargetNSForGloballyManagedNamespace) {
+ ASSERT_EQ(
+ (NamespaceString{"test", "foo"}),
+ NamespaceString{"test.$cmd.aggregate.foo"}.getTargetNSForGloballyManagedNamespace().get());
+ ASSERT_EQ((NamespaceString{"test", "foo"}),
+ NamespaceString{"test.$cmd.listIndexes.foo"}
+ .getTargetNSForGloballyManagedNamespace()
+ .get());
+ ASSERT_EQ((NamespaceString{"test", "foo"}),
+ NamespaceString{"test.$cmd.otherCommand.foo"}
+ .getTargetNSForGloballyManagedNamespace()
+ .get());
+
+ ASSERT_FALSE(
+ NamespaceString{"test.$cmd.listCollections"}.getTargetNSForGloballyManagedNamespace());
+ ASSERT_FALSE(
+ NamespaceString{"test.$cmd.otherCommand"}.getTargetNSForGloballyManagedNamespace());
+}
+
TEST(NamespaceStringTest, CollectionComponentValidNames) {
ASSERT(NamespaceString::validCollectionComponent("a.b"));
ASSERT(NamespaceString::validCollectionComponent("a.b"));
diff --git a/src/mongo/db/ops/write_ops_exec.cpp b/src/mongo/db/ops/write_ops_exec.cpp
index 60f0b7c79b6..a29a42e4a52 100644
--- a/src/mongo/db/ops/write_ops_exec.cpp
+++ b/src/mongo/db/ops/write_ops_exec.cpp
@@ -94,7 +94,7 @@ void finishCurOp(OperationContext* opCtx, CurOp* curOp) {
.record(opCtx,
curOp->getNS(),
curOp->getLogicalOp(),
- 1, // "write locked"
+ Top::LockType::WriteLocked,
curOp->totalTimeMicros(),
curOp->isCommand(),
curOp->getReadWriteType());
@@ -412,7 +412,7 @@ WriteResult performInserts(OperationContext* opCtx, const InsertOp& wholeOp) {
.record(opCtx,
wholeOp.ns.ns(),
LogicalOp::opInsert,
- 1 /* write locked*/,
+ Top::LockType::WriteLocked,
curOp.totalTimeMicros(),
curOp.isCommand(),
curOp.getReadWriteType());
diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript
index e243adda24a..ee26ddc62fd 100644
--- a/src/mongo/db/pipeline/SConscript
+++ b/src/mongo/db/pipeline/SConscript
@@ -473,5 +473,6 @@ env.Library(
'$BUILD_DIR/mongo/db/index/index_access_methods',
'$BUILD_DIR/mongo/db/matcher/expressions_mongod_only',
'$BUILD_DIR/mongo/db/stats/serveronly',
+ '$BUILD_DIR/mongo/db/clientcursor',
],
)
diff --git a/src/mongo/db/pipeline/document_source_cursor.cpp b/src/mongo/db/pipeline/document_source_cursor.cpp
index ae62da0a978..edc49e4923e 100644
--- a/src/mongo/db/pipeline/document_source_cursor.cpp
+++ b/src/mongo/db/pipeline/document_source_cursor.cpp
@@ -31,12 +31,11 @@
#include "mongo/db/pipeline/document_source_cursor.h"
#include "mongo/db/catalog/collection.h"
+#include "mongo/db/curop.h"
#include "mongo/db/db_raii.h"
#include "mongo/db/exec/working_set_common.h"
#include "mongo/db/pipeline/document.h"
#include "mongo/db/query/explain.h"
-#include "mongo/db/query/find_common.h"
-#include "mongo/db/server_parameters.h"
#include "mongo/db/storage/storage_options.h"
#include "mongo/util/scopeguard.h"
@@ -66,20 +65,27 @@ DocumentSource::GetNextResult DocumentSourceCursor::getNext() {
}
void DocumentSourceCursor::dispose() {
- _exec.reset();
_currentBatch.clear();
+ if (!_exec) {
+ return;
+ }
+
+ // We must hold a collection lock to destroy a PlanExecutor to ensure the CursorManager will
+ // still exist and can be safely told to deregister the PlanExecutor.
+ invariant(pExpCtx->opCtx);
+ AutoGetCollection autoColl(pExpCtx->opCtx, _exec->nss(), MODE_IS);
+ _exec.reset();
+ _rangePreserver.release();
}
void DocumentSourceCursor::loadBatch() {
if (!_exec) {
+ // No more documents.
dispose();
return;
}
- // We have already validated the sharding version when we constructed the PlanExecutor
- // so we shouldn't check it again.
- AutoGetCollectionForRead autoColl(pExpCtx->opCtx, _nss);
-
+ AutoGetCollectionForRead autoColl(pExpCtx->opCtx, _exec->nss());
_exec->restoreState();
int memUsageBytes = 0;
@@ -88,7 +94,7 @@ void DocumentSourceCursor::loadBatch() {
{
ON_BLOCK_EXIT([this] { recordPlanSummaryStats(); });
- while ((state = _exec->getNext(&obj, NULL)) == PlanExecutor::ADVANCED) {
+ while ((state = _exec->getNext(&obj, nullptr)) == PlanExecutor::ADVANCED) {
if (_shouldProduceEmptyDocs) {
_currentBatch.push_back(Document());
} else if (_dependencies) {
@@ -114,27 +120,28 @@ void DocumentSourceCursor::loadBatch() {
}
}
- // If we got here, there won't be any more documents, so destroy the executor. Can't use
- // dispose since we want to keep the _currentBatch.
+ // If we got here, there won't be any more documents, so destroy our PlanExecutor. Note we can't
+ // use dispose() since we want to keep the current batch.
_exec.reset();
-
- uassert(16028,
- str::stream() << "collection or index disappeared when cursor yielded: "
- << WorkingSetCommon::toStatusString(obj),
- state != PlanExecutor::DEAD);
-
- uassert(17285,
- str::stream() << "cursor encountered an error: "
- << WorkingSetCommon::toStatusString(obj),
- state != PlanExecutor::FAILURE);
-
- massert(17286,
- str::stream() << "Unexpected return from PlanExecutor::getNext: " << state,
- state == PlanExecutor::IS_EOF || state == PlanExecutor::ADVANCED);
-}
-
-long long DocumentSourceCursor::getLimit() const {
- return _limit ? _limit->getLimit() : -1;
+ _rangePreserver.release();
+
+ switch (state) {
+ case PlanExecutor::ADVANCED:
+ case PlanExecutor::IS_EOF:
+ return; // We've reached our limit or exhausted the cursor.
+ case PlanExecutor::DEAD: {
+ uasserted(ErrorCodes::QueryPlanKilled,
+ str::stream() << "collection or index disappeared when cursor yielded: "
+ << WorkingSetCommon::toStatusString(obj));
+ }
+ case PlanExecutor::FAILURE: {
+ uasserted(17285,
+ str::stream() << "cursor encountered an error: "
+ << WorkingSetCommon::toStatusString(obj));
+ }
+ default:
+ MONGO_UNREACHABLE;
+ }
}
Pipeline::SourceContainer::iterator DocumentSourceCursor::doOptimizeAt(
@@ -156,12 +163,6 @@ Pipeline::SourceContainer::iterator DocumentSourceCursor::doOptimizeAt(
return std::next(itr);
}
-
-void DocumentSourceCursor::recordPlanSummaryStr() {
- invariant(_exec);
- _planSummary = Explain::getPlanSummary(_exec.get());
-}
-
void DocumentSourceCursor::recordPlanSummaryStats() {
invariant(_exec);
// Aggregation handles in-memory sort outside of the query sub-system. Given that we need to
@@ -175,20 +176,17 @@ void DocumentSourceCursor::recordPlanSummaryStats() {
}
Value DocumentSourceCursor::serialize(boost::optional<ExplainOptions::Verbosity> explain) const {
- // we never parse a documentSourceCursor, so we only serialize for explain
+ // We never parse a DocumentSourceCursor, so we only serialize for explain.
if (!explain)
return Value();
// Get planner-level explain info from the underlying PlanExecutor.
+ invariant(_exec);
BSONObjBuilder explainBuilder;
{
- AutoGetCollectionForRead autoColl(pExpCtx->opCtx, _nss);
-
- massert(17392, "No _exec. Were we disposed before explained?", _exec);
-
+ AutoGetCollectionForRead autoColl(pExpCtx->opCtx, _exec->nss());
_exec->restoreState();
Explain::explainStages(_exec.get(), autoColl.getCollection(), *explain, &explainBuilder);
-
_exec->saveState();
}
@@ -229,44 +227,28 @@ void DocumentSourceCursor::reattachToOperationContext(OperationContext* opCtx) {
}
DocumentSourceCursor::DocumentSourceCursor(Collection* collection,
- const string& ns,
std::unique_ptr<PlanExecutor> exec,
const intrusive_ptr<ExpressionContext>& pCtx)
: DocumentSource(pCtx),
_docsAddedToBatches(0),
- _nss(ns),
+ _rangePreserver(collection),
_exec(std::move(exec)),
_outputSorts(_exec->getOutputSorts()) {
- recordPlanSummaryStr();
- // We record execution metrics here to allow for capture of indexes used prior to execution.
+ _planSummary = Explain::getPlanSummary(_exec.get());
recordPlanSummaryStats();
+
if (collection) {
- collection->infoCache()->notifyOfQuery(pCtx->opCtx, _planSummaryStats.indexesUsed);
+ collection->infoCache()->notifyOfQuery(pExpCtx->opCtx, _planSummaryStats.indexesUsed);
}
}
intrusive_ptr<DocumentSourceCursor> DocumentSourceCursor::create(
Collection* collection,
- const string& ns,
std::unique_ptr<PlanExecutor> exec,
const intrusive_ptr<ExpressionContext>& pExpCtx) {
intrusive_ptr<DocumentSourceCursor> source(
- new DocumentSourceCursor(collection, ns, std::move(exec), pExpCtx));
+ new DocumentSourceCursor(collection, std::move(exec), pExpCtx));
return source;
}
-
-void DocumentSourceCursor::setProjection(const BSONObj& projection,
- const boost::optional<ParsedDeps>& deps) {
- _projection = projection;
- _dependencies = deps;
-}
-
-const std::string& DocumentSourceCursor::getPlanSummaryStr() const {
- return _planSummary;
-}
-
-const PlanSummaryStats& DocumentSourceCursor::getPlanSummaryStats() const {
- return _planSummaryStats;
-}
}
diff --git a/src/mongo/db/pipeline/document_source_cursor.h b/src/mongo/db/pipeline/document_source_cursor.h
index 3faeea86de6..348174b656a 100644
--- a/src/mongo/db/pipeline/document_source_cursor.h
+++ b/src/mongo/db/pipeline/document_source_cursor.h
@@ -34,16 +34,14 @@
#include "mongo/db/pipeline/document_source.h"
#include "mongo/db/pipeline/document_source_limit.h"
#include "mongo/db/query/plan_summary_stats.h"
+#include "mongo/db/range_preserver.h"
namespace mongo {
class PlanExecutor;
/**
- * Constructs and returns Documents from the BSONObj objects produced by a supplied
- * PlanExecutor.
- *
- * An object of this type may only be used by one thread, see SERVER-6123.
+ * Constructs and returns Documents from the BSONObj objects produced by a supplied PlanExecutor.
*/
class DocumentSourceCursor final : public DocumentSource {
public:
@@ -69,14 +67,11 @@ public:
void reattachToOperationContext(OperationContext* opCtx) final;
/**
- * Create a document source based on a passed-in PlanExecutor.
- *
- * This is usually put at the beginning of a chain of document sources
- * in order to fetch data from the database.
+ * Create a document source based on a passed-in PlanExecutor. 'exec' must be a yielding
+ * PlanExecutor, and must be registered with the associated collection's CursorManager.
*/
static boost::intrusive_ptr<DocumentSourceCursor> create(
Collection* collection,
- const std::string& ns,
std::unique_ptr<PlanExecutor> exec,
const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
@@ -116,10 +111,17 @@ public:
* @param projection The projection that has been passed down to the query system.
* @param deps The output of DepsTracker::toParsedDeps.
*/
- void setProjection(const BSONObj& projection, const boost::optional<ParsedDeps>& deps);
+ void setProjection(const BSONObj& projection, const boost::optional<ParsedDeps>& deps) {
+ _projection = projection;
+ _dependencies = deps;
+ }
- /// returns -1 for no limit
- long long getLimit() const;
+ /**
+ * Returns the limit associated with this cursor, or -1 if there is no limit.
+ */
+ long long getLimit() const {
+ return _limit ? _limit->getLimit() : -1;
+ }
/**
* If subsequent sources need no information from the cursor, the cursor can simply output empty
@@ -129,20 +131,21 @@ public:
_shouldProduceEmptyDocs = true;
}
- const std::string& getPlanSummaryStr() const;
+ const std::string& getPlanSummaryStr() const {
+ return _planSummary;
+ }
- const PlanSummaryStats& getPlanSummaryStats() const;
+ const PlanSummaryStats& getPlanSummaryStats() const {
+ return _planSummaryStats;
+ }
private:
DocumentSourceCursor(Collection* collection,
- const std::string& ns,
std::unique_ptr<PlanExecutor> exec,
const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
void loadBatch();
- void recordPlanSummaryStr();
-
void recordPlanSummaryStats();
std::deque<Document> _currentBatch;
@@ -156,7 +159,7 @@ private:
boost::intrusive_ptr<DocumentSourceLimit> _limit;
long long _docsAddedToBatches; // for _limit enforcement
- const NamespaceString _nss;
+ RangePreserver _rangePreserver;
std::unique_ptr<PlanExecutor> _exec;
BSONObjSet _outputSorts;
std::string _planSummary;
diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp
index 262deeb140b..c37c1301198 100644
--- a/src/mongo/db/pipeline/pipeline_d.cpp
+++ b/src/mongo/db/pipeline/pipeline_d.cpp
@@ -579,15 +579,12 @@ void PipelineD::addCursorSource(Collection* collection,
const BSONObj& queryObj,
const BSONObj& sortObj,
const BSONObj& projectionObj) {
- // Get the full "namespace" name.
- const string& fullName = expCtx->ns.ns();
-
// DocumentSourceCursor expects a yielding PlanExecutor that has had its state saved.
exec->saveState();
// Put the PlanExecutor into a DocumentSourceCursor and add it to the front of the pipeline.
intrusive_ptr<DocumentSourceCursor> pSource =
- DocumentSourceCursor::create(collection, fullName, std::move(exec), expCtx);
+ DocumentSourceCursor::create(collection, std::move(exec), expCtx);
// Note the query, sort, and projection for explain.
pSource->setQuery(queryObj);
diff --git a/src/mongo/db/query/explain.cpp b/src/mongo/db/query/explain.cpp
index eb43caf5047..b243336655b 100644
--- a/src/mongo/db/query/explain.cpp
+++ b/src/mongo/db/query/explain.cpp
@@ -596,7 +596,7 @@ void Explain::generatePlannerInfo(PlanExecutor* exec,
BSONObjBuilder plannerBob(out->subobjStart("queryPlanner"));
plannerBob.append("plannerVersion", QueryPlanner::kPlannerVersion);
- plannerBob.append("namespace", exec->ns());
+ plannerBob.append("namespace", exec->nss().ns());
// Find whether there is an index filter set for the query shape. The 'indexFilterSet'
// field will always be false in the case of EOF or idhack plans.
diff --git a/src/mongo/db/query/find.cpp b/src/mongo/db/query/find.cpp
index 44f69823096..62a08389e3f 100644
--- a/src/mongo/db/query/find.cpp
+++ b/src/mongo/db/query/find.cpp
@@ -55,6 +55,7 @@
#include "mongo/db/server_options.h"
#include "mongo/db/server_parameters.h"
#include "mongo/db/service_context.h"
+#include "mongo/db/stats/top.h"
#include "mongo/db/storage/storage_options.h"
#include "mongo/s/chunk_version.h"
#include "mongo/s/stale_exception.h"
@@ -62,10 +63,10 @@
#include "mongo/util/fail_point_service.h"
#include "mongo/util/log.h"
#include "mongo/util/mongoutils/str.h"
+#include "mongo/util/scopeguard.h"
namespace mongo {
-using std::endl;
using std::unique_ptr;
using stdx::make_unique;
@@ -238,47 +239,45 @@ Message getMore(OperationContext* opCtx,
const NamespaceString nss(ns);
- // Depending on the type of cursor being operated on, we hold locks for the whole getMore,
- // or none of the getMore, or part of the getMore. The three cases in detail:
+ // Cursors come in one of two flavors:
+ // - Cursors owned by the collection cursor manager, such as those generated via the find
+ // command. For these cursors, we hold the appropriate collection lock for the duration of
+ // the getMore using AutoGetCollectionForRead. This will automatically update the CurOp
+ // object appropriately and record execution time via Top upon completion.
+ // - Cursors owned by the global cursor manager, such as those generated via the aggregate
+ // command. These cursors either hold no collection state or manage their collection state
+ // internally, so we acquire no locks. In this case we use the AutoStatsTracker object to
+ // update the CurOp object appropriately and record execution time via Top upon
+ // completion.
//
- // 1) Normal cursor: we lock with "ctx" and hold it for the whole getMore.
- // 2) Cursor owned by global cursor manager: we don't lock anything. These cursors don't own
- // any collection state. These cursors are generated either by the listCollections or
- // listIndexes commands, as these special cursor-generating commands operate over catalog
- // data rather than targeting the data within a collection.
- // 3) Agg cursor: we lock with "ctx", then release, then relock with "unpinDBLock" and
- // "unpinCollLock". This is because agg cursors handle locking internally (hence the
- // release), but the pin and unpin of the cursor must occur under the collection lock.
- // We don't use our AutoGetCollectionForRead "ctx" to relock, because
- // AutoGetCollectionForRead checks the sharding version (and we want the relock for the
- // unpin to succeed even if the sharding version has changed).
- //
- // Note that we declare our locks before our ClientCursorPin, in order to ensure that the
- // pin's destructor is called before the lock destructors (so that the unpin occurs under
- // the lock).
- unique_ptr<AutoGetCollectionForReadCommand> ctx;
- unique_ptr<Lock::DBLock> unpinDBLock;
- unique_ptr<Lock::CollectionLock> unpinCollLock;
-
+ // Thus, exactly one of 'readLock' and 'statsTracker' will be populated on the way to
+ // setting 'cursorManager'.
+ boost::optional<AutoGetCollectionForReadCommand> readLock;
+ boost::optional<AutoStatsTracker> statsTracker;
CursorManager* cursorManager;
- if (nss.isListIndexesCursorNS() || nss.isListCollectionsCursorNS()) {
- // List collections and list indexes are special cursor-generating commands whose
- // cursors are managed globally, as they operate over catalog data rather than targeting
- // the data within a collection.
+
+ if (CursorManager::isGloballyManagedCursor(cursorid)) {
cursorManager = CursorManager::getGlobalCursorManager();
- } else {
- ctx = stdx::make_unique<AutoGetCollectionOrViewForReadCommand>(opCtx, nss);
- auto viewCtx = static_cast<AutoGetCollectionOrViewForReadCommand*>(ctx.get());
- if (viewCtx->getView()) {
- uasserted(
+
+ if (boost::optional<NamespaceString> nssForCurOp = nss.isGloballyManagedNamespace()
+ ? nss.getTargetNSForGloballyManagedNamespace()
+ : nss) {
+ AutoGetDb autoDb(opCtx, nssForCurOp->db(), MODE_IS);
+ const auto profilingLevel = autoDb.getDb()
+ ? boost::optional<int>{autoDb.getDb()->getProfilingLevel()}
+ : boost::none;
+ statsTracker.emplace(opCtx, *nssForCurOp, Top::LockType::NotLocked, profilingLevel);
+ uassert(
ErrorCodes::CommandNotSupportedOnView,
- str::stream() << "Namespace " << nss.ns()
+ str::stream() << "Namespace " << nssForCurOp->ns()
<< " is a view. OP_GET_MORE operations are not supported on views. "
<< "Only clients which support the getMore command can be used to "
- "query views.");
+ "query views.",
+ !autoDb.getDb()->getViewCatalog()->lookup(opCtx, nssForCurOp->ns()));
}
-
- Collection* collection = ctx->getCollection();
+ } else {
+ readLock.emplace(opCtx, nss);
+ Collection* collection = readLock->getCollection();
uassert(17356, "collection dropped between getMore calls", collection);
cursorManager = collection->getCursorManager();
}
@@ -323,8 +322,8 @@ Message getMore(OperationContext* opCtx,
str::stream() << "Requested getMore on namespace " << ns << ", but cursor "
<< cursorid
<< " belongs to namespace "
- << cc->ns(),
- ns == cc->ns());
+ << cc->nss().ns(),
+ nss == cc->nss());
*isCursorAuthorized = true;
if (cc->isReadCommitted())
@@ -345,11 +344,6 @@ Message getMore(OperationContext* opCtx,
cc->updateSlaveLocation(opCtx);
- if (cc->isAggCursor()) {
- // Agg cursors handle their own locking internally.
- ctx.reset(); // unlocks
- }
-
// If we're replaying the oplog, we save the last time that we read.
Timestamp slaveReadTill;
@@ -359,12 +353,12 @@ Message getMore(OperationContext* opCtx,
uint64_t notifierVersion = 0;
std::shared_ptr<CappedInsertNotifier> notifier;
if (isCursorAwaitData(cc)) {
- invariant(ctx->getCollection()->isCapped());
+ invariant(readLock->getCollection()->isCapped());
// Retrieve the notifier which we will wait on until new data arrives. We make sure
// to do this in the lock because once we drop the lock it is possible for the
// collection to become invalid. The notifier itself will outlive the collection if
// the collection is dropped, as we keep a shared_ptr to it.
- notifier = ctx->getCollection()->getCappedInsertNotifier();
+ notifier = readLock->getCollection()->getCappedInsertNotifier();
// Must get the version before we call generateBatch in case a write comes in after
// that call and before we call wait on the notifier.
@@ -384,7 +378,7 @@ Message getMore(OperationContext* opCtx,
// and currentOp. Upconvert _query to resemble a getMore command, and set the original
// command or upconverted legacy query in the originatingCommand field.
curOp.setQuery_inlock(upconvertGetMoreEntry(nss, cursorid, ntoreturn));
- curOp.setOriginatingCommand_inlock(cc->getQuery());
+ curOp.setOriginatingCommand_inlock(cc->getOriginatingCommandObj());
}
PlanExecutor::ExecState state;
@@ -402,7 +396,7 @@ Message getMore(OperationContext* opCtx,
if (isCursorAwaitData(cc) && state == PlanExecutor::IS_EOF && numResults == 0) {
// Save the PlanExecutor and drop our locks.
exec->saveState();
- ctx.reset();
+ readLock.reset();
// Block waiting for data for up to 1 second.
Seconds timeout(1);
@@ -414,7 +408,7 @@ Message getMore(OperationContext* opCtx,
curOp.setExpectedLatencyMs(durationCount<Milliseconds>(timeout));
// Reacquiring locks.
- ctx = make_unique<AutoGetCollectionForReadCommand>(opCtx, nss);
+ readLock.emplace(opCtx, nss);
exec->restoreState();
// We woke up because either the timed_wait expired, or there was more data. Either
@@ -428,44 +422,28 @@ Message getMore(OperationContext* opCtx,
postExecutionStats.totalDocsExamined -= preExecutionStats.totalDocsExamined;
curOp.debug().setPlanSummaryMetrics(postExecutionStats);
- // We do not report 'execStats' for aggregation, both in the original request and
- // subsequent getMore. The reason for this is that aggregation's source PlanExecutor
- // could be destroyed before we know whether we need execStats and we do not want to
- // generate for all operations due to cost.
- if (!cc->isAggCursor() && curOp.shouldDBProfile()) {
+ // We do not report 'execStats' for aggregation or other globally managed cursors, both in
+ // the original request and subsequent getMore. It would be useful to have this information
+ // for an aggregation, but the source PlanExecutor could be destroyed before we know whether
+ // we need execStats, and we do not want to generate them for all operations due to cost.
+ if (!CursorManager::isGloballyManagedCursor(cursorid) && curOp.shouldDBProfile()) {
BSONObjBuilder execStatsBob;
Explain::getWinningPlanStats(exec, &execStatsBob);
curOp.debug().execStats = execStatsBob.obj();
}
- // We have to do this before re-acquiring locks in the agg case because
- // shouldSaveCursorGetMore() can make a network call for agg cursors.
- //
- // TODO: Getting rid of PlanExecutor::isEOF() in favor of PlanExecutor::IS_EOF would mean
- // that this network operation is no longer necessary.
- const bool shouldSaveCursor = shouldSaveCursorGetMore(state, exec, isCursorTailable(cc));
-
- // In order to deregister a cursor, we need to be holding the DB + collection lock and
- // if the cursor is aggregation, we release these locks.
- if (cc->isAggCursor()) {
- invariant(NULL == ctx.get());
- unpinDBLock = make_unique<Lock::DBLock>(opCtx, nss.db(), MODE_IS);
- unpinCollLock =
- make_unique<Lock::CollectionLock>(opCtx->lockState(), nss.ns(), MODE_IS);
- }
-
// Our two possible ClientCursorPin cleanup paths are:
// 1) If the cursor is not going to be saved, we call deleteUnderlying() on the pin.
- // 2) If the cursor is going to be saved, we simply let the pin go out of scope. In
- // this case, the pin's destructor will be invoked, which will call release() on the
- // pin. Because our ClientCursorPin is declared after our lock is declared, this
- // will happen under the lock.
- if (!shouldSaveCursor) {
+ // 2) If the cursor is going to be saved, we simply let the pin go out of scope. In this
+ // case, the pin's destructor will be invoked, which will call release() on the pin.
+ // Because our ClientCursorPin is declared after our lock is declared, this will happen
+ // under the lock if any locking was necessary.
+ if (!shouldSaveCursorGetMore(state, exec, isCursorTailable(cc))) {
ccPin.getValue().deleteUnderlying();
// cc is now invalid, as is the executor
cursorid = 0;
- cc = NULL;
+ cc = nullptr;
curOp.debug().cursorExhausted = true;
LOG(5) << "getMore NOT saving client cursor, ended with state "
@@ -673,10 +651,9 @@ std::string runQuery(OperationContext* opCtx,
// Allocate a new ClientCursor and register it with the cursor manager.
ClientCursorPin pinnedCursor = collection->getCursorManager()->registerCursor(
- {exec.release(),
- nss.ns(),
+ {std::move(exec),
+ nss,
opCtx->recoveryUnit()->isReadingFromMajorityCommittedSnapshot(),
- qr.getOptions(),
upconvertQueryEntry(q.query, qr.nss(), q.ntoreturn, q.ntoskip)});
ccId = pinnedCursor.getCursor()->cursorid();
diff --git a/src/mongo/db/query/get_executor.cpp b/src/mongo/db/query/get_executor.cpp
index 9aa85a7d81a..be3673369a7 100644
--- a/src/mongo/db/query/get_executor.cpp
+++ b/src/mongo/db/query/get_executor.cpp
@@ -739,8 +739,7 @@ StatusWith<unique_ptr<PlanExecutor>> getExecutorDelete(OperationContext* opCtx,
<< " Using EOF stage: " << redact(unparsedQuery);
auto deleteStage = make_unique<DeleteStage>(
opCtx, deleteStageParams, ws.get(), nullptr, new EOFStage(opCtx));
- return PlanExecutor::make(
- opCtx, std::move(ws), std::move(deleteStage), nss.ns(), policy);
+ return PlanExecutor::make(opCtx, std::move(ws), std::move(deleteStage), nss, policy);
}
const IndexDescriptor* descriptor = collection->getIndexCatalog()->findIdIndex(opCtx);
@@ -906,7 +905,7 @@ StatusWith<unique_ptr<PlanExecutor>> getExecutorUpdate(OperationContext* opCtx,
auto updateStage = make_unique<UpdateStage>(
opCtx, updateStageParams, ws.get(), collection, new EOFStage(opCtx));
return PlanExecutor::make(
- opCtx, std::move(ws), std::move(updateStage), nsString.ns(), policy);
+ opCtx, std::move(ws), std::move(updateStage), nsString, policy);
}
const IndexDescriptor* descriptor = collection->getIndexCatalog()->findIdIndex(opCtx);
@@ -1004,8 +1003,7 @@ StatusWith<unique_ptr<PlanExecutor>> getExecutorGroup(OperationContext* opCtx,
unique_ptr<PlanStage> root =
make_unique<GroupStage>(opCtx, request, ws.get(), new EOFStage(opCtx));
- return PlanExecutor::make(
- opCtx, std::move(ws), std::move(root), request.ns.ns(), yieldPolicy);
+ return PlanExecutor::make(opCtx, std::move(ws), std::move(root), request.ns, yieldPolicy);
}
const NamespaceString nss(request.ns);
@@ -1263,7 +1261,7 @@ StatusWith<unique_ptr<PlanExecutor>> getExecutorCount(OperationContext* opCtx,
unique_ptr<PlanStage> root = make_unique<CountStage>(
opCtx, collection, std::move(params), ws.get(), new EOFStage(opCtx));
return PlanExecutor::make(
- opCtx, std::move(ws), std::move(root), request.getNs().ns(), yieldPolicy);
+ opCtx, std::move(ws), std::move(root), request.getNs(), yieldPolicy);
}
// If the query is empty, then we can determine the count by just asking the collection
@@ -1280,7 +1278,7 @@ StatusWith<unique_ptr<PlanExecutor>> getExecutorCount(OperationContext* opCtx,
unique_ptr<PlanStage> root =
make_unique<CountStage>(opCtx, collection, std::move(params), ws.get(), nullptr);
return PlanExecutor::make(
- opCtx, std::move(ws), std::move(root), request.getNs().ns(), yieldPolicy);
+ opCtx, std::move(ws), std::move(root), request.getNs(), yieldPolicy);
}
const size_t plannerOptions = QueryPlannerParams::IS_COUNT;
diff --git a/src/mongo/db/query/internal_plans.cpp b/src/mongo/db/query/internal_plans.cpp
index ade228a0223..f7c3e96bce1 100644
--- a/src/mongo/db/query/internal_plans.cpp
+++ b/src/mongo/db/query/internal_plans.cpp
@@ -52,8 +52,8 @@ std::unique_ptr<PlanExecutor> InternalPlanner::collectionScan(OperationContext*
if (NULL == collection) {
auto eof = stdx::make_unique<EOFStage>(opCtx);
// Takes ownership of 'ws' and 'eof'.
- auto statusWithPlanExecutor =
- PlanExecutor::make(opCtx, std::move(ws), std::move(eof), ns.toString(), yieldPolicy);
+ auto statusWithPlanExecutor = PlanExecutor::make(
+ opCtx, std::move(ws), std::move(eof), NamespaceString(ns), yieldPolicy);
invariant(statusWithPlanExecutor.isOK());
return std::move(statusWithPlanExecutor.getValue());
}
diff --git a/src/mongo/db/query/plan_executor.cpp b/src/mongo/db/query/plan_executor.cpp
index dd3f66164f4..8e79fe1a2c5 100644
--- a/src/mongo/db/query/plan_executor.cpp
+++ b/src/mongo/db/query/plan_executor.cpp
@@ -91,17 +91,23 @@ StatusWith<unique_ptr<PlanExecutor>> PlanExecutor::make(OperationContext* opCtx,
const Collection* collection,
YieldPolicy yieldPolicy) {
return PlanExecutor::make(
- opCtx, std::move(ws), std::move(rt), nullptr, nullptr, collection, "", yieldPolicy);
+ opCtx, std::move(ws), std::move(rt), nullptr, nullptr, collection, {}, yieldPolicy);
}
// static
StatusWith<unique_ptr<PlanExecutor>> PlanExecutor::make(OperationContext* opCtx,
unique_ptr<WorkingSet> ws,
unique_ptr<PlanStage> rt,
- const string& ns,
+ NamespaceString nss,
YieldPolicy yieldPolicy) {
- return PlanExecutor::make(
- opCtx, std::move(ws), std::move(rt), nullptr, nullptr, nullptr, ns, yieldPolicy);
+ return PlanExecutor::make(opCtx,
+ std::move(ws),
+ std::move(rt),
+ nullptr,
+ nullptr,
+ nullptr,
+ std::move(nss),
+ yieldPolicy);
}
// static
@@ -112,7 +118,7 @@ StatusWith<unique_ptr<PlanExecutor>> PlanExecutor::make(OperationContext* opCtx,
const Collection* collection,
YieldPolicy yieldPolicy) {
return PlanExecutor::make(
- opCtx, std::move(ws), std::move(rt), nullptr, std::move(cq), collection, "", yieldPolicy);
+ opCtx, std::move(ws), std::move(rt), nullptr, std::move(cq), collection, {}, yieldPolicy);
}
// static
@@ -129,7 +135,7 @@ StatusWith<unique_ptr<PlanExecutor>> PlanExecutor::make(OperationContext* opCtx,
std::move(qs),
std::move(cq),
collection,
- "",
+ {},
yieldPolicy);
}
@@ -140,10 +146,15 @@ StatusWith<unique_ptr<PlanExecutor>> PlanExecutor::make(OperationContext* opCtx,
unique_ptr<QuerySolution> qs,
unique_ptr<CanonicalQuery> cq,
const Collection* collection,
- const string& ns,
+ NamespaceString nss,
YieldPolicy yieldPolicy) {
- unique_ptr<PlanExecutor> exec(new PlanExecutor(
- opCtx, std::move(ws), std::move(rt), std::move(qs), std::move(cq), collection, ns));
+ unique_ptr<PlanExecutor> exec(new PlanExecutor(opCtx,
+ std::move(ws),
+ std::move(rt),
+ std::move(qs),
+ std::move(cq),
+ collection,
+ std::move(nss)));
// Perform plan selection, if necessary.
Status status = exec->pickBestPlan(yieldPolicy, collection);
@@ -160,25 +171,24 @@ PlanExecutor::PlanExecutor(OperationContext* opCtx,
unique_ptr<QuerySolution> qs,
unique_ptr<CanonicalQuery> cq,
const Collection* collection,
- const string& ns)
+ NamespaceString nss)
: _opCtx(opCtx),
_cq(std::move(cq)),
_workingSet(std::move(ws)),
_qs(std::move(qs)),
_root(std::move(rt)),
- _ns(ns),
+ _nss(std::move(nss)),
_yieldPolicy(new PlanYieldPolicy(this, YIELD_MANUAL)) {
- // We may still need to initialize _ns from either collection or _cq.
- if (!_ns.empty()) {
- // We already have an _ns set, so there's nothing more to do.
- return;
+ // We may still need to initialize _nss from either collection or _cq.
+ if (!_nss.isEmpty()) {
+ return; // We already have an _nss set, so there's nothing more to do.
}
if (collection) {
- _ns = collection->ns().ns();
+ _nss = collection->ns();
} else {
invariant(_cq);
- _ns = _cq->getQueryRequest().ns();
+ _nss = _cq->getQueryRequest().nss();
}
}
@@ -362,7 +372,7 @@ PlanExecutor::ExecState PlanExecutor::getNextImpl(Snapshotted<BSONObj>* objOut,
MONGO_FAIL_POINT_BLOCK(planExecutorAlwaysDead, customKill) {
const BSONObj& data = customKill.getData();
BSONElement customKillNS = data["namespace"];
- if (!customKillNS || _ns == customKillNS.str()) {
+ if (!customKillNS || _nss.ns() == customKillNS.str()) {
deregisterExec();
kill("hit planExecutorAlwaysDead fail point");
}
@@ -469,7 +479,8 @@ PlanExecutor::ExecState PlanExecutor::getNextImpl(Snapshotted<BSONObj>* objOut,
throw WriteConflictException();
CurOp::get(_opCtx)->debug().writeConflicts++;
writeConflictsInARow++;
- WriteConflictException::logAndBackoff(writeConflictsInARow, "plan execution", _ns);
+ WriteConflictException::logAndBackoff(
+ writeConflictsInARow, "plan execution", _nss.ns());
} else {
WorkingSetMember* member = _workingSet->get(id);
@@ -546,10 +557,6 @@ Status PlanExecutor::executePlan() {
return Status::OK();
}
-const string& PlanExecutor::ns() {
- return _ns;
-}
-
void PlanExecutor::setYieldPolicy(YieldPolicy policy,
const Collection* collection,
bool registerExecutor) {
diff --git a/src/mongo/db/query/plan_executor.h b/src/mongo/db/query/plan_executor.h
index 1e97963f67b..325d7642a91 100644
--- a/src/mongo/db/query/plan_executor.h
+++ b/src/mongo/db/query/plan_executor.h
@@ -153,7 +153,7 @@ public:
static StatusWith<std::unique_ptr<PlanExecutor>> make(OperationContext* opCtx,
std::unique_ptr<WorkingSet> ws,
std::unique_ptr<PlanStage> rt,
- const std::string& ns,
+ NamespaceString nss,
YieldPolicy yieldPolicy);
/**
@@ -203,7 +203,9 @@ public:
/**
* Return the NS that the query is running over.
*/
- const std::string& ns();
+ const NamespaceString& nss() const {
+ return _nss;
+ }
/**
* Return the OperationContext that the plan is currently executing within.
@@ -408,7 +410,7 @@ private:
std::unique_ptr<QuerySolution> qs,
std::unique_ptr<CanonicalQuery> cq,
const Collection* collection,
- const std::string& ns);
+ NamespaceString nss);
/**
* Public factory methods delegate to this private factory to do their work.
@@ -419,7 +421,7 @@ private:
std::unique_ptr<QuerySolution> qs,
std::unique_ptr<CanonicalQuery> cq,
const Collection* collection,
- const std::string& ns,
+ NamespaceString nss,
YieldPolicy yieldPolicy);
/**
@@ -462,7 +464,7 @@ private:
std::unique_ptr<ScopedExecutorRegistration> _safety;
// What namespace are we operating over?
- std::string _ns;
+ NamespaceString _nss;
// This is used to handle automatic yielding when allowed by the YieldPolicy. Never NULL.
// TODO make this a non-pointer member. This requires some header shuffling so that this
diff --git a/src/mongo/db/query/plan_yield_policy.cpp b/src/mongo/db/query/plan_yield_policy.cpp
index d84db2b225d..f315daec285 100644
--- a/src/mongo/db/query/plan_yield_policy.cpp
+++ b/src/mongo/db/query/plan_yield_policy.cpp
@@ -108,14 +108,14 @@ bool PlanYieldPolicy::yield(RecordFetcher* fetcher) {
opCtx->recoveryUnit()->abandonSnapshot();
} else {
// Release and reacquire locks.
- QueryYield::yieldAllLocks(opCtx, fetcher, _planYielding->ns());
+ QueryYield::yieldAllLocks(opCtx, fetcher, _planYielding->nss());
}
return _planYielding->restoreStateWithoutRetrying();
} catch (const WriteConflictException& wce) {
CurOp::get(opCtx)->debug().writeConflicts++;
WriteConflictException::logAndBackoff(
- attempt, "plan execution restoreState", _planYielding->ns());
+ attempt, "plan execution restoreState", _planYielding->nss().ns());
// retry
}
}
diff --git a/src/mongo/db/query/query_yield.cpp b/src/mongo/db/query/query_yield.cpp
index 5f43cf819ec..37ecc4bf528 100644
--- a/src/mongo/db/query/query_yield.cpp
+++ b/src/mongo/db/query/query_yield.cpp
@@ -47,7 +47,7 @@ MONGO_FP_DECLARE(setYieldAllLocksWait);
// static
void QueryYield::yieldAllLocks(OperationContext* opCtx,
RecordFetcher* fetcher,
- const std::string& planExecNS) {
+ const NamespaceString& planExecNS) {
// Things have to happen here in a specific order:
// 1) Tell the RecordFetcher to do any setup which needs to happen inside locks
// 2) Release lock mgr locks
diff --git a/src/mongo/db/query/query_yield.h b/src/mongo/db/query/query_yield.h
index a42e29800c9..7d98b299484 100644
--- a/src/mongo/db/query/query_yield.h
+++ b/src/mongo/db/query/query_yield.h
@@ -28,7 +28,7 @@
#pragma once
-#include <string>
+#include "mongo/db/namespace_string.h"
namespace mongo {
@@ -50,7 +50,7 @@ public:
*/
static void yieldAllLocks(OperationContext* opCtx,
RecordFetcher* fetcher,
- const std::string& planExecNS);
+ const NamespaceString& planExecNS);
};
} // namespace mongo
diff --git a/src/mongo/db/range_preserver.h b/src/mongo/db/range_preserver.h
index 55e290d52d1..35b7694234e 100644
--- a/src/mongo/db/range_preserver.h
+++ b/src/mongo/db/range_preserver.h
@@ -54,9 +54,15 @@ public:
}
}
- ~RangePreserver() {
- if (_pin)
+ void release() {
+ if (_pin) {
_pin->deleteUnderlying();
+ _pin.reset();
+ }
+ }
+
+ ~RangePreserver() {
+ release();
}
private:
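
Note: the new release() method lets callers such as DocumentSourceCursor drop the preserved range early, while the appropriate locks are held, and resetting _pin makes the later call from the destructor a no-op. The sketch below is a standalone model of that release()/destructor contract; Pin and Preserver are illustrative stand-ins, not the real ClientCursorPin or RangePreserver types.

// Standalone model of an idempotent release() with a releasing destructor (illustrative only).
#include <iostream>
#include <optional>

struct Pin {
    void deleteUnderlying() { std::cout << "underlying cursor deleted\n"; }
};

class Preserver {
public:
    explicit Preserver(bool acquire) {
        if (acquire) {
            _pin.emplace();
        }
    }

    // Safe to call early (e.g. while the right locks are held); the destructor then does nothing.
    void release() {
        if (_pin) {
            _pin->deleteUnderlying();
            _pin.reset();
        }
    }

    ~Preserver() {
        release();
    }

private:
    std::optional<Pin> _pin;
};

int main() {
    Preserver p(true);
    p.release();  // Prints once.
    // The destructor runs here and is a no-op because release() already reset the pin.
    return 0;
}
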
diff --git a/src/mongo/db/stats/top.cpp b/src/mongo/db/stats/top.cpp
index 8a8358030b8..45680f7db15 100644
--- a/src/mongo/db/stats/top.cpp
+++ b/src/mongo/db/stats/top.cpp
@@ -75,7 +75,7 @@ Top& Top::get(ServiceContext* service) {
void Top::record(OperationContext* opCtx,
StringData ns,
LogicalOp logicalOp,
- int lockType,
+ LockType lockType,
long long micros,
bool command,
Command::ReadWriteType readWriteType) {
@@ -97,7 +97,7 @@ void Top::record(OperationContext* opCtx,
void Top::_record(OperationContext* opCtx,
CollectionData& c,
LogicalOp logicalOp,
- int lockType,
+ LockType lockType,
long long micros,
Command::ReadWriteType readWriteType) {
@@ -105,9 +105,9 @@ void Top::_record(OperationContext* opCtx,
c.total.inc(micros);
- if (lockType > 0)
+ if (lockType == LockType::WriteLocked)
c.writeLock.inc(micros);
- else if (lockType < 0)
+ else if (lockType == LockType::ReadLocked)
c.readLock.inc(micros);
switch (logicalOp) {
diff --git a/src/mongo/db/stats/top.h b/src/mongo/db/stats/top.h
index 1f08e4784a5..463482199fe 100644
--- a/src/mongo/db/stats/top.h
+++ b/src/mongo/db/stats/top.h
@@ -84,13 +84,19 @@ public:
OperationLatencyHistogram opLatencyHistogram;
};
+ enum class LockType {
+ ReadLocked,
+ WriteLocked,
+ NotLocked,
+ };
+
typedef StringMap<CollectionData> UsageMap;
public:
void record(OperationContext* opCtx,
StringData ns,
LogicalOp logicalOp,
- int lockType,
+ LockType lockType,
long long micros,
bool command,
Command::ReadWriteType readWriteType);
@@ -126,7 +132,7 @@ private:
void _record(OperationContext* opCtx,
CollectionData& c,
LogicalOp logicalOp,
- int lockType,
+ LockType lockType,
long long micros,
Command::ReadWriteType readWriteType);
diff --git a/src/mongo/dbtests/documentsourcetests.cpp b/src/mongo/dbtests/documentsourcetests.cpp
index 58df14b77b4..a6588ffc6b2 100644
--- a/src/mongo/dbtests/documentsourcetests.cpp
+++ b/src/mongo/dbtests/documentsourcetests.cpp
@@ -108,10 +108,7 @@ protected:
getExecutor(&_opCtx, ctx.getCollection(), std::move(cq), PlanExecutor::YIELD_MANUAL));
exec->saveState();
- exec->registerExec(ctx.getCollection());
-
- _source =
- DocumentSourceCursor::create(ctx.getCollection(), nss.ns(), std::move(exec), _ctx);
+ _source = DocumentSourceCursor::create(ctx.getCollection(), std::move(exec), _ctx);
}
intrusive_ptr<ExpressionContextForTest> ctx() {
diff --git a/src/mongo/dbtests/query_plan_executor.cpp b/src/mongo/dbtests/query_plan_executor.cpp
index 5fbc726b51a..b2e15787c0d 100644
--- a/src/mongo/dbtests/query_plan_executor.cpp
+++ b/src/mongo/dbtests/query_plan_executor.cpp
@@ -91,13 +91,10 @@ public:
}
/**
- * Given a match expression, represented as the BSON object 'filterObj',
- * create a PlanExecutor capable of executing a simple collection
- * scan.
- *
- * The caller takes ownership of the returned PlanExecutor*.
+ * Given a match expression, represented as the BSON object 'filterObj', create a PlanExecutor
+ * capable of executing a simple collection scan.
*/
- PlanExecutor* makeCollScanExec(Collection* coll, BSONObj& filterObj) {
+ unique_ptr<PlanExecutor> makeCollScanExec(Collection* coll, BSONObj& filterObj) {
CollectionScanParams csparams;
csparams.collection = coll;
csparams.direction = CollectionScanParams::FORWARD;
@@ -124,7 +121,7 @@ public:
coll,
PlanExecutor::YIELD_MANUAL);
ASSERT_OK(statusWithPlanExecutor.getStatus());
- return statusWithPlanExecutor.getValue().release();
+ return std::move(statusWithPlanExecutor.getValue());
}
/**
@@ -302,8 +299,7 @@ public:
// Wrap the "inner" plan executor in a DocumentSourceCursor and add it as the first source
// in the pipeline.
innerExec->saveState();
- auto cursorSource =
- DocumentSourceCursor::create(collection, nss.ns(), std::move(innerExec), expCtx);
+ auto cursorSource = DocumentSourceCursor::create(collection, std::move(innerExec), expCtx);
auto pipeline = assertGet(Pipeline::create({cursorSource}, expCtx));
// Create the output PlanExecutor that pulls results from the pipeline.
@@ -323,7 +319,7 @@ public:
// Verify that the aggregation pipeline returns an error because its "inner" plan executor
// has been killed due to the collection being dropped.
- ASSERT_THROWS_CODE(pipeline->getNext(), UserException, 16028);
+ ASSERT_THROWS_CODE(pipeline->getNext(), UserException, ErrorCodes::QueryPlanKilled);
// Verify that the "outer" plan executor has been killed due to the collection being
// dropped.
@@ -450,10 +446,10 @@ public:
BSONObj filterObj = fromjson("{_id: {$gt: 0}, b: {$gt: 0}}");
Collection* coll = ctx.getCollection();
- PlanExecutor* exec = makeCollScanExec(coll, filterObj);
+ auto exec = makeCollScanExec(coll, filterObj);
// Make a client cursor from the plan executor.
- coll->getCursorManager()->registerCursor({exec, nss.ns(), false, 0, BSONObj()});
+ coll->getCursorManager()->registerCursor({std::move(exec), nss, false, BSONObj()});
// There should be one cursor before invalidation,
// and zero cursors after invalidation.
@@ -476,11 +472,11 @@ public:
Collection* collection = ctx.getCollection();
BSONObj filterObj = fromjson("{_id: {$gt: 0}, b: {$gt: 0}}");
- PlanExecutor* exec = makeCollScanExec(collection, filterObj);
+ auto exec = makeCollScanExec(collection, filterObj);
// Make a client cursor from the plan executor.
- auto ccPin =
- collection->getCursorManager()->registerCursor({exec, nss.ns(), false, 0, BSONObj()});
+ auto ccPin = collection->getCursorManager()->registerCursor(
+ {std::move(exec), nss, false, BSONObj()});
// If the cursor is pinned, it sticks around, even after invalidation.
ASSERT_EQUALS(1U, numCursors());
@@ -490,7 +486,7 @@ public:
// The invalidation should have killed the plan executor.
BSONObj objOut;
- ASSERT_EQUALS(PlanExecutor::DEAD, exec->getNext(&objOut, NULL));
+ ASSERT_EQUALS(PlanExecutor::DEAD, ccPin.getCursor()->getExecutor()->getNext(&objOut, NULL));
ASSERT(WorkingSetCommon::isValidStatusMemberObject(objOut));
const Status status = WorkingSetCommon::getMemberObjectStatus(objOut);
ASSERT(status.reason().find(invalidateReason) != string::npos);
@@ -519,10 +515,11 @@ public:
Collection* collection = ctx.getCollection();
BSONObj filterObj = fromjson("{_id: {$gt: 0}, b: {$gt: 0}}");
- PlanExecutor* exec = makeCollScanExec(collection, filterObj);
+ auto exec = makeCollScanExec(collection, filterObj);
// Make a client cursor from the plan executor.
- collection->getCursorManager()->registerCursor({exec, nss.ns(), false, 0, BSONObj()});
+ collection->getCursorManager()->registerCursor(
+ {std::move(exec), nss, false, BSONObj()});
}
// There should be one cursor before timeout,
diff --git a/src/mongo/dbtests/querytests.cpp b/src/mongo/dbtests/querytests.cpp
index 8171cb96cbe..d705399e27c 100644
--- a/src/mongo/dbtests/querytests.cpp
+++ b/src/mongo/dbtests/querytests.cpp
@@ -30,6 +30,7 @@
#include "mongo/platform/basic.h"
+#include <boost/optional.hpp>
#include <iostream>
#include "mongo/client/dbclientcursor.h"
@@ -39,6 +40,7 @@
#include "mongo/db/db_raii.h"
#include "mongo/db/dbdirectclient.h"
#include "mongo/db/dbhelpers.h"
+#include "mongo/db/exec/queued_data_stage.h"
#include "mongo/db/index/index_descriptor.h"
#include "mongo/db/json.h"
#include "mongo/db/lasterror.h"
@@ -244,8 +246,8 @@ protected:
return !_client.getPrevError().getField("err").isNull();
}
- const ServiceContext::UniqueOperationContext _txnPtr = cc().makeOperationContext();
- OperationContext& _opCtx = *_txnPtr;
+ const ServiceContext::UniqueOperationContext _opCtxPtr = cc().makeOperationContext();
+ OperationContext& _opCtx = *_opCtxPtr;
DBDirectClient _client;
};
@@ -1751,6 +1753,107 @@ public:
}
};
+class CursorManagerIsGloballyManagedCursorShouldReturnFalseIfLeadingBitsAreZeroes {
+public:
+ void run() {
+ ASSERT_FALSE(CursorManager::isGloballyManagedCursor(0x0000000000000000));
+ ASSERT_FALSE(CursorManager::isGloballyManagedCursor(0x000000000FFFFFFF));
+ ASSERT_FALSE(CursorManager::isGloballyManagedCursor(0x000000007FFFFFFF));
+ ASSERT_FALSE(CursorManager::isGloballyManagedCursor(0x0FFFFFFFFFFFFFFF));
+ ASSERT_FALSE(CursorManager::isGloballyManagedCursor(0x3FFFFFFFFFFFFFFF));
+ ASSERT_FALSE(CursorManager::isGloballyManagedCursor(0x3dedbeefdeadbeef));
+ }
+};
+
+class CursorManagerIsGloballyManagedCursorShouldReturnTrueIfLeadingBitsAreZeroAndOne {
+public:
+ void run() {
+ ASSERT_TRUE(CursorManager::isGloballyManagedCursor(0x4FFFFFFFFFFFFFFF));
+ ASSERT_TRUE(CursorManager::isGloballyManagedCursor(0x5FFFFFFFFFFFFFFF));
+ ASSERT_TRUE(CursorManager::isGloballyManagedCursor(0x6FFFFFFFFFFFFFFF));
+ ASSERT_TRUE(CursorManager::isGloballyManagedCursor(0x7FFFFFFFFFFFFFFF));
+ ASSERT_TRUE(CursorManager::isGloballyManagedCursor(0x4000000000000000));
+ ASSERT_TRUE(CursorManager::isGloballyManagedCursor(0x4dedbeefdeadbeef));
+ }
+};
+
+class CursorManagerIsGloballyManagedCursorShouldReturnFalseIfLeadingBitIsAOne {
+public:
+ void run() {
+ ASSERT_FALSE(CursorManager::isGloballyManagedCursor(~0LL));
+ ASSERT_FALSE(CursorManager::isGloballyManagedCursor(0xFFFFFFFFFFFFFFFF));
+ ASSERT_FALSE(CursorManager::isGloballyManagedCursor(0x8FFFFFFFFFFFFFFF));
+ ASSERT_FALSE(CursorManager::isGloballyManagedCursor(0x8dedbeefdeadbeef));
+ }
+};
+
+class CursorManagerTest {
+public:
+ std::unique_ptr<PlanExecutor> makeFakePlanExecutor(OperationContext* opCtx) {
+ auto workingSet = stdx::make_unique<WorkingSet>();
+ auto queuedDataStage = stdx::make_unique<QueuedDataStage>(opCtx, workingSet.get());
+ return unittest::assertGet(PlanExecutor::make(opCtx,
+ std::move(workingSet),
+ std::move(queuedDataStage),
+ NamespaceString{"test.collection"},
+ PlanExecutor::YieldPolicy::YIELD_MANUAL));
+ }
+};
+
+class GlobalCursorManagerShouldReportOwnershipOfCursorsItCreated : public CursorManagerTest {
+public:
+ void run() {
+ auto opCtx = cc().makeOperationContext();
+ for (int i = 0; i < 1000; i++) {
+ auto exec = makeFakePlanExecutor(opCtx.get());
+ auto cursorPin = CursorManager::getGlobalCursorManager()->registerCursor(
+ {std::move(exec), NamespaceString{"test.collection"}, false, BSONObj()});
+ ASSERT_TRUE(CursorManager::isGloballyManagedCursor(cursorPin.getCursor()->cursorid()));
+ }
+ }
+};
+
+class CursorsFromCollectionCursorManagerShouldNotReportBeingManagedByGlobalCursorManager
+ : public CursorManagerTest {
+public:
+ void run() {
+ CursorManager testManager(NamespaceString{"test.collection"});
+ auto opCtx = cc().makeOperationContext();
+ for (int i = 0; i < 1000; i++) {
+ auto exec = makeFakePlanExecutor(opCtx.get());
+ auto cursorPin = testManager.registerCursor(
+ {std::move(exec), NamespaceString{"test.collection"}, false, BSONObj()});
+ ASSERT_FALSE(CursorManager::isGloballyManagedCursor(cursorPin.getCursor()->cursorid()));
+ }
+ }
+};
+
+class AllCursorsFromCollectionCursorManagerShouldContainIdentical32BitPrefixes
+ : public CursorManagerTest {
+public:
+ void run() {
+ CursorManager testManager(NamespaceString{"test.collection"});
+ auto opCtx = cc().makeOperationContext();
+ boost::optional<uint32_t> prefix;
+ for (int i = 0; i < 1000; i++) {
+ auto exec = makeFakePlanExecutor(opCtx.get());
+ auto cursorPin = testManager.registerCursor(
+ {std::move(exec), NamespaceString{"test.collection"}, false, BSONObj()});
+ auto cursorId = cursorPin.getCursor()->cursorid();
+ if (prefix) {
+ ASSERT_EQ(*prefix, extractLeading32Bits(cursorId));
+ } else {
+ prefix = extractLeading32Bits(cursorId);
+ }
+ }
+ }
+
+private:
+ uint32_t extractLeading32Bits(CursorId cursorId) {
+ return static_cast<uint32_t>((cursorId & 0xFFFFFFFF00000000) >> 32);
+ }
+};
+
class All : public Suite {
public:
All() : Suite("query") {}
@@ -1808,10 +1911,14 @@ public:
add<QueryCursorTimeout>();
add<QueryReadsAll>();
add<KillPinnedCursor>();
-
add<queryobjecttests::names1>();
-
add<OrderingTest>();
+ add<CursorManagerIsGloballyManagedCursorShouldReturnFalseIfLeadingBitsAreZeroes>();
+ add<CursorManagerIsGloballyManagedCursorShouldReturnTrueIfLeadingBitsAreZeroAndOne>();
+ add<CursorManagerIsGloballyManagedCursorShouldReturnFalseIfLeadingBitIsAOne>();
+ add<GlobalCursorManagerShouldReportOwnershipOfCursorsItCreated>();
+ add<CursorsFromCollectionCursorManagerShouldNotReportBeingManagedByGlobalCursorManager>();
+ add<AllCursorsFromCollectionCursorManagerShouldContainIdentical32BitPrefixes>();
}
};
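
Note: the three isGloballyManagedCursor tests and AllCursorsFromCollectionCursorManagerShouldContainIdentical32BitPrefixes pin down the cursor-id layout these changes depend on: the top two bits distinguish globally managed cursors, and the leading 32 bits identify the owning collection cursor manager. The sketch below is a standalone model that is consistent with those tests; it is illustrative only, and the real logic lives in CursorManager.

// Standalone model of the 64-bit cursor id layout exercised by the tests above (illustrative only).
#include <cassert>
#include <cstdint>

using CursorId = int64_t;

// Per the tests: globally managed cursor ids have their top two bits set to 0b01.
bool isGloballyManagedCursor(CursorId id) {
    constexpr uint64_t kTopTwoBits = 0xC000000000000000ULL;
    constexpr uint64_t kGlobalPattern = 0x4000000000000000ULL;
    return (static_cast<uint64_t>(id) & kTopTwoBits) == kGlobalPattern;
}

// Cursors registered with the same collection cursor manager share the same leading
// 32 bits, which identify that manager.
uint32_t extractLeading32Bits(CursorId id) {
    return static_cast<uint32_t>((static_cast<uint64_t>(id) >> 32) & 0xFFFFFFFFULL);
}

int main() {
    assert(!isGloballyManagedCursor(0x3dedbeefdeadbeef));                          // top bits 00
    assert(isGloballyManagedCursor(0x4dedbeefdeadbeef));                           // top bits 01
    assert(!isGloballyManagedCursor(static_cast<CursorId>(0x8dedbeefdeadbeefULL)));  // top bits 10
    assert(extractLeading32Bits(0x4dedbeefdeadbeef) == 0x4dedbeefu);
    return 0;
}
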