summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMathias Stearn <mathias@10gen.com>2014-01-30 18:44:34 -0500
committerMathias Stearn <mathias@10gen.com>2014-02-12 11:39:35 -0500
commit6244c5e5ef1a285ea0a6a28a411caa41a2691197 (patch)
tree944ea64542f67273afbcd16307b874eceebe5e3b
parent20806b5757b5bf4dbf524df0f332170012086af7 (diff)
downloadmongo-6244c5e5ef1a285ea0a6a28a411caa41a2691197.tar.gz
SERVER-12530 Make DocumentSourceCursor use Runner directly
Now that the input Runner no longer is wrapped in a ClientCursor, the PipelineRunner is responsible for propagating kill and invalidate methods.
-rw-r--r--jstests/aggregation/bugs/server3253.js46
-rw-r--r--src/mongo/db/commands/pipeline_command.cpp38
-rw-r--r--src/mongo/db/pipeline/document_source.cpp6
-rw-r--r--src/mongo/db/pipeline/document_source.h29
-rw-r--r--src/mongo/db/pipeline/document_source_cursor.cpp79
-rw-r--r--src/mongo/db/pipeline/pipeline_d.cpp51
-rw-r--r--src/mongo/db/pipeline/pipeline_d.h12
-rw-r--r--src/mongo/dbtests/documentsourcetests.cpp47
8 files changed, 130 insertions, 178 deletions
diff --git a/jstests/aggregation/bugs/server3253.js b/jstests/aggregation/bugs/server3253.js
index 5f868e7d0d0..a689f4e321e 100644
--- a/jstests/aggregation/bugs/server3253.js
+++ b/jstests/aggregation/bugs/server3253.js
@@ -2,16 +2,23 @@
load('jstests/aggregation/extras/utils.js');
var input = db.server3253_in;
+var inputDoesntExist = db.server3253_doesnt_exist;
var output = db.server3253_out;
+var cappedOutput = db.server3253_out_capped;
input.drop();
+inputDoesntExist.drop(); // never created
output.drop();
+function collectionExists(coll) {
+ return Array.contains(coll.getDB().getCollectionNames(), coll.getName());
+}
+
function getOutputIndexes() {
return db.system.indexes.find({ns: output.getFullName()}).sort({"key":1}).toArray();
}
-function test(pipeline, expected) {
+function test(input, pipeline, expected) {
pipeline.push({$out: output.getName()});
var indexes = getOutputIndexes();
@@ -20,6 +27,7 @@ function test(pipeline, expected) {
assert.eq(cursor.itcount(), 0); // empty cursor returned
assert.eq(output.find().toArray(), expected); // correct results
assert.eq(getOutputIndexes(), indexes); // number of indexes maintained
+ assert(collectionExists(output));
}
@@ -34,42 +42,54 @@ output.insert({_id:1});
assert.eq([], db.system.namespaces.find({name: /tmp\.agg_out/}).toArray());
// basic test
-test([{$project: {a: {$add: ['$_id', '$_id']}}}],
+test(input,
+ [{$project: {a: {$add: ['$_id', '$_id']}}}],
[{_id:1, a:2},{_id:2, a:4},{_id:3, a:6}]);
// test with indexes
assert.eq(output.getIndexes().length, 1);
output.ensureIndex({a:1});
assert.eq(output.getIndexes().length, 2);
-test([{$project: {a: {$multiply: ['$_id', '$_id']}}}],
+test(input,
+ [{$project: {a: {$multiply: ['$_id', '$_id']}}}],
[{_id:1, a:1},{_id:2, a:4},{_id:3, a:9}]);
// test with empty result set and make sure old result is gone, but indexes remain
-test([{$match: {_id: 11}}],
+test(input,
+ [{$match: {_id: 11}}],
[]);
assert.eq(output.getIndexes().length, 2);
// test with geo index
output.ensureIndex({b:"2d"});
assert.eq(output.getIndexes().length, 3);
-test([{$project: {b: "$_id"}}],
+test(input,
+ [{$project: {b: "$_id"}}],
[{_id:1, b:1}, {_id:2, b:2}, {_id:3, b:3}]);
// test with full text index
output.ensureIndex({c:"text"});
assert.eq(output.getIndexes().length, 4);
-test([{$project: {c: {$concat: ["hello there ", "_id"]}}}],
+test(input,
+ [{$project: {c: {$concat: ["hello there ", "_id"]}}}],
[{_id:1, c:"hello there _id"}, {_id:2, c:"hello there _id"}, {_id:3, c:"hello there _id"}]);
// test with capped collection
-output.drop();
-db.createCollection(output.getName(), {capped: true, size: 2});
-assertErrorCode(input, {$out: output.getName()}, 17152);
+cappedOutput.drop();
+db.createCollection(cappedOutput.getName(), {capped: true, size: 2});
+assertErrorCode(input, {$out: cappedOutput.getName()}, 17152);
-// ensure we cant do dangerous things to system
-output = db.getSiblingDB("system").server3253_out;
-input = db.getSiblingDB("system").server3253_in;
-assertErrorCode(input, {$out: output.getName()}, 16994);
+// ensure everything works even if input doesn't exist.
+test(inputDoesntExist,
+ [],
+ []);
+
+if (0) { // SERVER-12586
+// ensure we cant do dangerous things to system collections
+outputInSystem = db.system.server3253_out;
+assertErrorCode(input, {$out: outputInSystem.getName()}, 16994);
+assert(!collectionExists(outputInSystem));
+}
// shoudn't leave temp collections laying around
assert.eq([], db.system.namespaces.find({name: /tmp\.agg_out/}).toArray());
diff --git a/src/mongo/db/commands/pipeline_command.cpp b/src/mongo/db/commands/pipeline_command.cpp
index 87deb59f496..395eb75f74e 100644
--- a/src/mongo/db/commands/pipeline_command.cpp
+++ b/src/mongo/db/commands/pipeline_command.cpp
@@ -28,6 +28,7 @@
#include "mongo/pch.h"
+#include <boost/smart_ptr.hpp>
#include <vector>
#include "mongo/db/auth/action_set.h"
@@ -46,6 +47,7 @@
#include "mongo/db/pipeline/pipeline_d.h"
#include "mongo/db/pipeline/pipeline.h"
#include "mongo/db/query/find_constants.h"
+#include "mongo/db/query/get_runner.h"
#include "mongo/db/storage_options.h"
namespace mongo {
@@ -57,9 +59,10 @@ namespace {
*/
class PipelineRunner : public Runner {
public:
- PipelineRunner(intrusive_ptr<Pipeline> pipeline)
+ PipelineRunner(intrusive_ptr<Pipeline> pipeline, const boost::shared_ptr<Runner>& child)
: _pipeline(pipeline)
, _includeMetaData(_pipeline->getContext()->inShard) // send metadata to merger
+ , _childRunner(child)
{}
virtual RunnerState getNext(BSONObj* objOut, DiskLoc* dlOut) {
@@ -101,12 +104,20 @@ namespace {
"PipelineCursor doesn't implement getExplainPlan");
}
- // These are all no-ops for PipelineRunners
- virtual void setYieldPolicy(YieldPolicy policy) {}
- virtual void invalidate(const DiskLoc& dl, InvalidationType type) {}
+ // propagate to child runner if still in use
+ virtual void invalidate(const DiskLoc& dl, InvalidationType type) {
+ if (boost::shared_ptr<Runner> runner = _childRunner.lock()) {
+ runner->invalidate(dl, type);
+ }
+ }
virtual void kill() {
- _pipeline->output()->kill();
+ if (boost::shared_ptr<Runner> runner = _childRunner.lock()) {
+ runner->kill();
+ }
}
+
+ // These are all no-ops for PipelineRunners
+ virtual void setYieldPolicy(YieldPolicy policy) {}
virtual void saveState() {}
virtual bool restoreState() { return true; }
virtual const Collection* collection() { return NULL; }
@@ -136,6 +147,7 @@ namespace {
const intrusive_ptr<Pipeline> _pipeline;
vector<BSONObj> _stash;
const bool _includeMetaData;
+ boost::weak_ptr<Runner> _childRunner;
};
}
@@ -291,8 +303,8 @@ namespace {
}
#endif
- scoped_ptr<ClientCursorPin> pin;
PipelineRunner* runner = NULL;
+ scoped_ptr<ClientCursorPin> pin; // either this OR the runnerHolder will be non-null
auto_ptr<PipelineRunner> runnerHolder;
{
// This will throw if the sharding version for this connection is out of date. The
@@ -305,13 +317,21 @@ namespace {
Collection* collection = ctx.ctx().db()->getCollection(ns);
- // This does mongod-specific stuff like creating the input Runner if needed
- PipelineD::prepareCursorSource(pPipeline, pCtx, collection);
+ // This does mongod-specific stuff like creating the input Runner and adding to the
+ // front of the pipeline if needed.
+ boost::shared_ptr<Runner> input = PipelineD::prepareCursorSource(pPipeline, pCtx);
pPipeline->stitch();
- runnerHolder.reset(new PipelineRunner(pPipeline));
+ runnerHolder.reset(new PipelineRunner(pPipeline, input));
runner = runnerHolder.get();
+ if (!collection && input) {
+ // If we don't have a collection, we won't be able to register any Runners, so
+ // make sure that the input Runner (likely an EOFRunner) doesn't need to be
+ // registered.
+ invariant(!input->collection());
+ }
+
if (collection) {
ClientCursor* cursor = new ClientCursor(collection, runnerHolder.release());
cursor->isAggCursor = true; // enable special locking behavior
diff --git a/src/mongo/db/pipeline/document_source.cpp b/src/mongo/db/pipeline/document_source.cpp
index 60ec8772db1..d4c21fca2fd 100644
--- a/src/mongo/db/pipeline/document_source.cpp
+++ b/src/mongo/db/pipeline/document_source.cpp
@@ -63,12 +63,6 @@ namespace mongo {
}
}
- void DocumentSource::kill() {
- if ( pSource ) {
- pSource->kill();
- }
- }
-
void DocumentSource::serializeToArray(vector<Value>& array, bool explain) const {
Value entry = serialize(explain);
if (!entry.missing()) {
diff --git a/src/mongo/db/pipeline/document_source.h b/src/mongo/db/pipeline/document_source.h
index 07ec5910bcf..1814a2a80d4 100644
--- a/src/mongo/db/pipeline/document_source.h
+++ b/src/mongo/db/pipeline/document_source.h
@@ -50,12 +50,12 @@
namespace mongo {
class Accumulator;
- class Cursor;
class Document;
class Expression;
class ExpressionFieldPath;
class ExpressionObject;
class DocumentSourceLimit;
+ class Runner;
class DocumentSource : public IntrusiveCounterUnsigned {
public:
@@ -76,11 +76,6 @@ namespace mongo {
virtual void dispose();
/**
- * See ClientCursor::kill()
- */
- virtual void kill();
-
- /**
Get the source's name.
@returns the string name of the source as a constant string;
@@ -335,7 +330,7 @@ namespace mongo {
/**
- * Constructs and returns Documents from the BSONObj objects produced by a supplied Cursor.
+ * Constructs and returns Documents from the BSONObj objects produced by a supplied Runner.
* An object of this type may only be used by one thread, see SERVER-6123.
*/
class DocumentSourceCursor :
@@ -350,25 +345,16 @@ namespace mongo {
virtual bool coalesce(const intrusive_ptr<DocumentSource>& nextSource);
virtual bool isValidInitialSource() const { return true; }
virtual void dispose();
- virtual void kill();
/**
- * Create a document source based on a passed-in cursor.
+ * Create a document source based on a passed-in Runner.
*
* This is usually put at the beginning of a chain of document sources
* in order to fetch data from the database.
- *
- * The DocumentSource takes ownership of the cursor and will destroy it
- * when the DocumentSource is finished with the cursor, if it hasn't
- * already been destroyed.
- *
- * @param ns the namespace the cursor is over
- * @param cursorId the id of the cursor to use
- * @param pExpCtx the expression context for the pipeline
*/
static intrusive_ptr<DocumentSourceCursor> create(
const string& ns,
- CursorId cursorId,
+ const boost::shared_ptr<Runner>& runner,
const intrusive_ptr<ExpressionContext> &pExpCtx);
/*
@@ -412,7 +398,7 @@ namespace mongo {
private:
DocumentSourceCursor(
const string& ns,
- CursorId cursorId,
+ const boost::shared_ptr<Runner>& runner,
const intrusive_ptr<ExpressionContext> &pExpCtx);
void loadBatch();
@@ -427,9 +413,8 @@ namespace mongo {
intrusive_ptr<DocumentSourceLimit> _limit;
long long _docsAddedToBatches; // for _limit enforcement
- string _ns; // namespace
- CursorId _cursorId;
- bool _killed;
+ const string _ns;
+ boost::shared_ptr<Runner> _runner; // PipelineRunner holds a weak_ptr to this.
};
diff --git a/src/mongo/db/pipeline/document_source_cursor.cpp b/src/mongo/db/pipeline/document_source_cursor.cpp
index 5596b9ca957..99fdf7d8d45 100644
--- a/src/mongo/db/pipeline/document_source_cursor.cpp
+++ b/src/mongo/db/pipeline/document_source_cursor.cpp
@@ -66,59 +66,30 @@ namespace mongo {
return out;
}
- void DocumentSourceCursor::kill() {
- _killed = true;
- _cursorId = 0;
- }
-
void DocumentSourceCursor::dispose() {
- if (_cursorId) {
- Lock::DBRead lk(_ns);
- Client::Context ctx(_ns, storageGlobalParams.dbpath, /*doVersion=*/false);
- Collection* collection = ctx.db()->getCollection( _ns );
- if ( collection ) {
- ClientCursor* cc = collection->cursorCache()->find( _cursorId );
- if ( cc ) {
- collection->cursorCache()->deregisterCursor( cc );
- delete cc;
- }
- }
- _cursorId = 0;
- }
-
+ // Can't call in to Runner or ClientCursor registries from this function since it will be
+ // called when an agg cursor is killed which would cause a deadlock.
+ _runner.reset();
_currentBatch.clear();
}
void DocumentSourceCursor::loadBatch() {
-
- Lock::DBRead lk(_ns);
-
- uassert( 17361, "collection or index disappeared when cursor yielded", !_killed );
-
- if (!_cursorId) {
+ if (!_runner) {
dispose();
return;
}
- // We have already validated the sharding version when we constructed the cursor
+ // We have already validated the sharding version when we constructed the Runner
// so we shouldn't check it again.
+ Lock::DBRead lk(_ns);
Client::Context ctx(_ns, storageGlobalParams.dbpath, /*doVersion=*/false);
- Collection* collection = ctx.db()->getCollection( _ns );
- uassert( 17358, "Collection dropped.", collection );
- ClientCursorPin pin(collection, _cursorId);
- ClientCursor* cursor = pin.c();
-
- uassert(16950, "Cursor deleted. Was the collection or database dropped?",
- cursor);
-
- Runner* runner = cursor->getRunner();
- runner->restoreState();
+ _runner->restoreState();
int memUsageBytes = 0;
BSONObj obj;
Runner::RunnerState state;
- while ((state = runner->getNext(&obj, NULL)) == Runner::RUNNER_ADVANCED) {
+ while ((state = _runner->getNext(&obj, NULL)) == Runner::RUNNER_ADVANCED) {
if (_dependencies) {
_currentBatch.push_back(_dependencies->extractFields(obj));
}
@@ -136,16 +107,16 @@ namespace mongo {
memUsageBytes += _currentBatch.back().getApproximateSize();
if (memUsageBytes > MaxBytesToReturnToClientAtOnce) {
- // End this batch and prepare cursor for yielding.
- runner->saveState();
+ // End this batch and prepare Runner for yielding.
+ _runner->saveState();
cc().curop()->yielded();
return;
}
}
- // If we got here, there won't be any more documents, so destroy the cursor and runner.
- _cursorId = 0;
- pin.deleteUnderlying();
+ // If we got here, there won't be any more documents, so destroy the runner. Can't use
+ // dispose since we want to keep the _currentBatch.
+ _runner.reset();
uassert(16028, "collection or index disappeared when cursor yielded",
state != Runner::RUNNER_DEAD);
@@ -236,21 +207,17 @@ namespace {
Collection* collection = ctx.db()->getCollection( _ns );
uassert( 17362, "Collection dropped.", collection );
- ClientCursorPin pin(collection, _cursorId);
- ClientCursor* cursor = pin.c();
+ massert(17392, "No _runner. Were we disposed before explained?",
+ _runner);
- uassert(17135, "Cursor deleted. Was the collection or database dropped?",
- cursor);
-
- Runner* runner = cursor->getRunner();
- runner->restoreState();
+ _runner->restoreState();
TypeExplain* explainRaw;
- explainStatus = runner->getInfo(&explainRaw, NULL);
+ explainStatus = _runner->getInfo(&explainRaw, NULL);
if (explainStatus.isOK())
plan.reset(explainRaw);
- runner->saveState();
+ _runner->saveState();
}
MutableDocument out;
@@ -271,25 +238,23 @@ namespace {
out["planError"] = Value(explainStatus.toString());
}
-
return Value(DOC(getSourceName() << out.freezeToValue()));
}
DocumentSourceCursor::DocumentSourceCursor(const string& ns,
- CursorId cursorId,
+ const boost::shared_ptr<Runner>& runner,
const intrusive_ptr<ExpressionContext> &pCtx)
: DocumentSource(pCtx)
, _docsAddedToBatches(0)
, _ns(ns)
- , _cursorId(cursorId)
- , _killed(false)
+ , _runner(runner)
{}
intrusive_ptr<DocumentSourceCursor> DocumentSourceCursor::create(
const string& ns,
- CursorId cursorId,
+ const boost::shared_ptr<Runner>& runner,
const intrusive_ptr<ExpressionContext> &pExpCtx) {
- return new DocumentSourceCursor(ns, cursorId, pExpCtx);
+ return new DocumentSourceCursor(ns, runner, pExpCtx);
}
void DocumentSourceCursor::setProjection(
diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp
index e85a5b3d3ee..d60a3259a15 100644
--- a/src/mongo/db/pipeline/pipeline_d.cpp
+++ b/src/mongo/db/pipeline/pipeline_d.cpp
@@ -63,9 +63,9 @@ namespace {
};
}
- void PipelineD::prepareCursorSource(const intrusive_ptr<Pipeline>& pPipeline,
- const intrusive_ptr<ExpressionContext>& pExpCtx,
- Collection* collection) {
+ boost::shared_ptr<Runner> PipelineD::prepareCursorSource(
+ const intrusive_ptr<Pipeline>& pPipeline,
+ const intrusive_ptr<ExpressionContext>& pExpCtx) {
// get the full "namespace" name
const string& fullName = pExpCtx->ns.ns();
Lock::assertAtLeastReadLocked(fullName);
@@ -90,7 +90,7 @@ namespace {
// on secondaries, this is needed.
ShardedConnectionInfo::addHook();
}
- return; // don't need a cursor
+ return boost::shared_ptr<Runner>(); // don't need a cursor
}
@@ -128,28 +128,6 @@ namespace {
}
}
- // for debugging purposes, show what the query and sort are
- DEV {
- (log() << "\n---- query BSON\n" <<
- queryObj.jsonString(Strict, 1) << "\n----\n");
- (log() << "\n---- sort BSON\n" <<
- sortObj.jsonString(Strict, 1) << "\n----\n");
- (log() << "\n---- fullName\n" <<
- fullName << "\n----\n");
- }
-
- if (!collection) {
- // Collection doesn't exist. Create a source that will return no results to simulate an
- // empty collection.
- intrusive_ptr<DocumentSource> source(DocumentSourceBsonArray::create(BSONObj(),
- pExpCtx));
- while (!sources.empty() && source->coalesce(sources.front())) {
- sources.pop_front();
- }
- pPipeline->addInitialSource( source );
- return;
- }
-
// Create the Runner.
//
// If we try to create a Runner that includes both the match and the
@@ -173,7 +151,7 @@ namespace {
| QueryPlannerParams::INCLUDE_SHARD_FILTER
| QueryPlannerParams::NO_BLOCKING_SORT
;
- auto_ptr<Runner> runner;
+ boost::shared_ptr<Runner> runner;
bool sortInRunner = false;
if (sortStage) {
CanonicalQuery* cq;
@@ -212,19 +190,14 @@ namespace {
runner.reset(rawRunner);
}
- // Now wrap the Runner in ClientCursor
- auto_ptr<ClientCursor> cursor(new ClientCursor(collection, runner.release()));
- verify(cursor->getRunner());
- CursorId cursorId = cursor->cursorid();
- // Prepare the cursor for data to change under it when we unlock
- cursor->getRunner()->setYieldPolicy(Runner::YIELD_AUTO);
- cursor->getRunner()->saveState();
- cursor.release(); // it is now owned by the client cursor manager
+ // DocumentSourceCursor expects a yielding Runner that has had its state saved.
+ runner->setYieldPolicy(Runner::YIELD_AUTO);
+ runner->saveState();
- /* wrap the cursor with a DocumentSource and return that */
- intrusive_ptr<DocumentSourceCursor> pSource(
- DocumentSourceCursor::create( fullName, cursorId, pExpCtx ) );
+ // Put the Runner into a DocumentSourceCursor and add it to the front of the pipeline.
+ intrusive_ptr<DocumentSourceCursor> pSource =
+ DocumentSourceCursor::create(fullName, runner, pExpCtx);
// Note the query, sort, and projection for explain.
pSource->setQuery(queryObj);
@@ -238,6 +211,8 @@ namespace {
}
pPipeline->addInitialSource(pSource);
+
+ return runner;
}
} // namespace mongo
diff --git a/src/mongo/db/pipeline/pipeline_d.h b/src/mongo/db/pipeline/pipeline_d.h
index 33cc6af35fa..98ea7537be3 100644
--- a/src/mongo/db/pipeline/pipeline_d.h
+++ b/src/mongo/db/pipeline/pipeline_d.h
@@ -28,13 +28,14 @@
#pragma once
-#include "mongo/pch.h"
+#include <boost/smart_ptr.hpp>
namespace mongo {
class Collection;
class DocumentSourceCursor;
struct ExpressionContext;
class Pipeline;
+ class Runner;
/*
PipelineD is an extension of the Pipeline class, but with additional
@@ -63,14 +64,15 @@ namespace mongo {
*
* Must have a ReadContext before entering.
*
+ * If the returned Runner is non-null, you are responsible for ensuring
+ * it receives appropriate invalidate and kill messages.
+ *
* @param pPipeline the logical "this" for this operation
* @param pExpCtx the expression context for this pipeline
- * @param collection the input collection. NULL if doesn't exist.
*/
- static void prepareCursorSource(
+ static boost::shared_ptr<Runner> prepareCursorSource(
const intrusive_ptr<Pipeline> &pPipeline,
- const intrusive_ptr<ExpressionContext> &pExpCtx,
- Collection* collection);
+ const intrusive_ptr<ExpressionContext> &pExpCtx);
private:
PipelineD(); // does not exist: prevent instantiation
diff --git a/src/mongo/dbtests/documentsourcetests.cpp b/src/mongo/dbtests/documentsourcetests.cpp
index 3d8ffa28a71..06ce6475e34 100644
--- a/src/mongo/dbtests/documentsourcetests.cpp
+++ b/src/mongo/dbtests/documentsourcetests.cpp
@@ -160,54 +160,45 @@ namespace DocumentSourceTests {
{ _ctx->tempDir = storageGlobalParams.dbpath + "/_tmp"; }
protected:
void createSource() {
+ // clean up first if this was called before
+ _source.reset();
+ _registration.reset();
+ _runner.reset();
+
Client::WriteContext ctx (ns);
- Collection* collection = ctx.ctx().db()->getOrCreateCollection( ns );
CanonicalQuery* cq;
uassertStatusOK(CanonicalQuery::canonicalize(ns, /*query=*/BSONObj(), &cq));
- Runner* runner;
- uassertStatusOK(getRunner(cq, &runner));
- auto_ptr<ClientCursor> cc(new ClientCursor(collection,
- runner,
- QueryOption_NoCursorTimeout));
- verify(cc->getRunner());
- cc->getRunner()->setYieldPolicy(Runner::YIELD_AUTO);
- CursorId cursorId = cc->cursorid();
- runner->saveState();
- cc.release(); // it is now owned by the client cursor manager
- _source = DocumentSourceCursor::create(ns, cursorId, _ctx);
+ Runner* runnerBare;
+ uassertStatusOK(getRunner(cq, &runnerBare));
+
+ _runner.reset(runnerBare);
+ _runner->setYieldPolicy(Runner::YIELD_AUTO);
+ _runner->saveState();
+ _registration.reset(new ScopedRunnerRegistration(_runner.get()));
+
+ _source = DocumentSourceCursor::create(ns, _runner, _ctx);
}
intrusive_ptr<ExpressionContext> ctx() { return _ctx; }
DocumentSourceCursor* source() { return _source.get(); }
private:
+ // It is important that these are ordered to ensure correct destruction order.
+ boost::shared_ptr<Runner> _runner;
+ boost::scoped_ptr<ScopedRunnerRegistration> _registration;
intrusive_ptr<ExpressionContext> _ctx;
intrusive_ptr<DocumentSourceCursor> _source;
};
/** Create a DocumentSourceCursor. */
- class Create : public Base {
+ class Empty : public Base {
public:
void run() {
createSource();
// The DocumentSourceCursor doesn't hold a read lock.
ASSERT( !Lock::isReadLocked() );
- // The DocumentSourceCursor holds a ClientCursor.
- assertNumClientCursors( 1 );
// The collection is empty, so the source produces no results.
ASSERT( !source()->getNext() );
// Exhausting the source releases the read lock.
ASSERT( !Lock::isReadLocked() );
- // The ClientCursor is also cleaned up.
- assertNumClientCursors( 0 );
- }
- private:
- void assertNumClientCursors( unsigned int expected ) {
- Client::ReadContext ctx( ns );
- Collection* collection = ctx.ctx().db()->getCollection( ns );
- if ( !collection ) {
- ASSERT( 0 == expected );
- return;
- }
- ASSERT_EQUALS( expected, collection->cursorCache()->numCursors() );
}
};
@@ -1936,7 +1927,7 @@ namespace DocumentSourceTests {
void setupTests() {
add<DocumentSourceClass::Deps>();
- add<DocumentSourceCursor::Create>();
+ add<DocumentSourceCursor::Empty>();
add<DocumentSourceCursor::Iterate>();
add<DocumentSourceCursor::Dispose>();
add<DocumentSourceCursor::IterateDispose>();