summaryrefslogtreecommitdiff
path: root/src/mongo/db/pipeline/document_source_cursor.cpp
diff options
context:
space:
mode:
authorMathias Stearn <mathias@10gen.com>2014-01-30 18:44:34 -0500
committerMathias Stearn <mathias@10gen.com>2014-02-12 11:39:35 -0500
commit6244c5e5ef1a285ea0a6a28a411caa41a2691197 (patch)
tree944ea64542f67273afbcd16307b874eceebe5e3b /src/mongo/db/pipeline/document_source_cursor.cpp
parent20806b5757b5bf4dbf524df0f332170012086af7 (diff)
downloadmongo-6244c5e5ef1a285ea0a6a28a411caa41a2691197.tar.gz
SERVER-12530 Make DocumentSourceCursor use Runner directly
Now that the input Runner no longer is wrapped in a ClientCursor, the PipelineRunner is responsible for propagating kill and invalidate methods.
Diffstat (limited to 'src/mongo/db/pipeline/document_source_cursor.cpp')
-rw-r--r--src/mongo/db/pipeline/document_source_cursor.cpp79
1 files changed, 22 insertions, 57 deletions
diff --git a/src/mongo/db/pipeline/document_source_cursor.cpp b/src/mongo/db/pipeline/document_source_cursor.cpp
index 5596b9ca957..99fdf7d8d45 100644
--- a/src/mongo/db/pipeline/document_source_cursor.cpp
+++ b/src/mongo/db/pipeline/document_source_cursor.cpp
@@ -66,59 +66,30 @@ namespace mongo {
return out;
}
- void DocumentSourceCursor::kill() {
- _killed = true;
- _cursorId = 0;
- }
-
void DocumentSourceCursor::dispose() {
- if (_cursorId) {
- Lock::DBRead lk(_ns);
- Client::Context ctx(_ns, storageGlobalParams.dbpath, /*doVersion=*/false);
- Collection* collection = ctx.db()->getCollection( _ns );
- if ( collection ) {
- ClientCursor* cc = collection->cursorCache()->find( _cursorId );
- if ( cc ) {
- collection->cursorCache()->deregisterCursor( cc );
- delete cc;
- }
- }
- _cursorId = 0;
- }
-
+ // Can't call in to Runner or ClientCursor registries from this function since it will be
+ // called when an agg cursor is killed which would cause a deadlock.
+ _runner.reset();
_currentBatch.clear();
}
void DocumentSourceCursor::loadBatch() {
-
- Lock::DBRead lk(_ns);
-
- uassert( 17361, "collection or index disappeared when cursor yielded", !_killed );
-
- if (!_cursorId) {
+ if (!_runner) {
dispose();
return;
}
- // We have already validated the sharding version when we constructed the cursor
+ // We have already validated the sharding version when we constructed the Runner
// so we shouldn't check it again.
+ Lock::DBRead lk(_ns);
Client::Context ctx(_ns, storageGlobalParams.dbpath, /*doVersion=*/false);
- Collection* collection = ctx.db()->getCollection( _ns );
- uassert( 17358, "Collection dropped.", collection );
- ClientCursorPin pin(collection, _cursorId);
- ClientCursor* cursor = pin.c();
-
- uassert(16950, "Cursor deleted. Was the collection or database dropped?",
- cursor);
-
- Runner* runner = cursor->getRunner();
- runner->restoreState();
+ _runner->restoreState();
int memUsageBytes = 0;
BSONObj obj;
Runner::RunnerState state;
- while ((state = runner->getNext(&obj, NULL)) == Runner::RUNNER_ADVANCED) {
+ while ((state = _runner->getNext(&obj, NULL)) == Runner::RUNNER_ADVANCED) {
if (_dependencies) {
_currentBatch.push_back(_dependencies->extractFields(obj));
}
@@ -136,16 +107,16 @@ namespace mongo {
memUsageBytes += _currentBatch.back().getApproximateSize();
if (memUsageBytes > MaxBytesToReturnToClientAtOnce) {
- // End this batch and prepare cursor for yielding.
- runner->saveState();
+ // End this batch and prepare Runner for yielding.
+ _runner->saveState();
cc().curop()->yielded();
return;
}
}
- // If we got here, there won't be any more documents, so destroy the cursor and runner.
- _cursorId = 0;
- pin.deleteUnderlying();
+ // If we got here, there won't be any more documents, so destroy the runner. Can't use
+ // dispose since we want to keep the _currentBatch.
+ _runner.reset();
uassert(16028, "collection or index disappeared when cursor yielded",
state != Runner::RUNNER_DEAD);
@@ -236,21 +207,17 @@ namespace {
Collection* collection = ctx.db()->getCollection( _ns );
uassert( 17362, "Collection dropped.", collection );
- ClientCursorPin pin(collection, _cursorId);
- ClientCursor* cursor = pin.c();
+ massert(17392, "No _runner. Were we disposed before explained?",
+ _runner);
- uassert(17135, "Cursor deleted. Was the collection or database dropped?",
- cursor);
-
- Runner* runner = cursor->getRunner();
- runner->restoreState();
+ _runner->restoreState();
TypeExplain* explainRaw;
- explainStatus = runner->getInfo(&explainRaw, NULL);
+ explainStatus = _runner->getInfo(&explainRaw, NULL);
if (explainStatus.isOK())
plan.reset(explainRaw);
- runner->saveState();
+ _runner->saveState();
}
MutableDocument out;
@@ -271,25 +238,23 @@ namespace {
out["planError"] = Value(explainStatus.toString());
}
-
return Value(DOC(getSourceName() << out.freezeToValue()));
}
DocumentSourceCursor::DocumentSourceCursor(const string& ns,
- CursorId cursorId,
+ const boost::shared_ptr<Runner>& runner,
const intrusive_ptr<ExpressionContext> &pCtx)
: DocumentSource(pCtx)
, _docsAddedToBatches(0)
, _ns(ns)
- , _cursorId(cursorId)
- , _killed(false)
+ , _runner(runner)
{}
intrusive_ptr<DocumentSourceCursor> DocumentSourceCursor::create(
const string& ns,
- CursorId cursorId,
+ const boost::shared_ptr<Runner>& runner,
const intrusive_ptr<ExpressionContext> &pExpCtx) {
- return new DocumentSourceCursor(ns, cursorId, pExpCtx);
+ return new DocumentSourceCursor(ns, runner, pExpCtx);
}
void DocumentSourceCursor::setProjection(