diff options
author | Mathias Stearn <mathias@10gen.com> | 2014-01-30 18:44:34 -0500 |
---|---|---|
committer | Mathias Stearn <mathias@10gen.com> | 2014-02-12 11:39:35 -0500 |
commit | 6244c5e5ef1a285ea0a6a28a411caa41a2691197 (patch) | |
tree | 944ea64542f67273afbcd16307b874eceebe5e3b /src/mongo/db/pipeline/document_source_cursor.cpp | |
parent | 20806b5757b5bf4dbf524df0f332170012086af7 (diff) | |
download | mongo-6244c5e5ef1a285ea0a6a28a411caa41a2691197.tar.gz |
SERVER-12530 Make DocumentSourceCursor use Runner directly
Now that the input Runner no longer is wrapped in a ClientCursor, the
PipelineRunner is responsible for propagating kill and invalidate methods.
Diffstat (limited to 'src/mongo/db/pipeline/document_source_cursor.cpp')
-rw-r--r-- | src/mongo/db/pipeline/document_source_cursor.cpp | 79 |
1 files changed, 22 insertions, 57 deletions
diff --git a/src/mongo/db/pipeline/document_source_cursor.cpp b/src/mongo/db/pipeline/document_source_cursor.cpp index 5596b9ca957..99fdf7d8d45 100644 --- a/src/mongo/db/pipeline/document_source_cursor.cpp +++ b/src/mongo/db/pipeline/document_source_cursor.cpp @@ -66,59 +66,30 @@ namespace mongo { return out; } - void DocumentSourceCursor::kill() { - _killed = true; - _cursorId = 0; - } - void DocumentSourceCursor::dispose() { - if (_cursorId) { - Lock::DBRead lk(_ns); - Client::Context ctx(_ns, storageGlobalParams.dbpath, /*doVersion=*/false); - Collection* collection = ctx.db()->getCollection( _ns ); - if ( collection ) { - ClientCursor* cc = collection->cursorCache()->find( _cursorId ); - if ( cc ) { - collection->cursorCache()->deregisterCursor( cc ); - delete cc; - } - } - _cursorId = 0; - } - + // Can't call in to Runner or ClientCursor registries from this function since it will be + // called when an agg cursor is killed which would cause a deadlock. + _runner.reset(); _currentBatch.clear(); } void DocumentSourceCursor::loadBatch() { - - Lock::DBRead lk(_ns); - - uassert( 17361, "collection or index disappeared when cursor yielded", !_killed ); - - if (!_cursorId) { + if (!_runner) { dispose(); return; } - // We have already validated the sharding version when we constructed the cursor + // We have already validated the sharding version when we constructed the Runner // so we shouldn't check it again. + Lock::DBRead lk(_ns); Client::Context ctx(_ns, storageGlobalParams.dbpath, /*doVersion=*/false); - Collection* collection = ctx.db()->getCollection( _ns ); - uassert( 17358, "Collection dropped.", collection ); - ClientCursorPin pin(collection, _cursorId); - ClientCursor* cursor = pin.c(); - - uassert(16950, "Cursor deleted. Was the collection or database dropped?", - cursor); - - Runner* runner = cursor->getRunner(); - runner->restoreState(); + _runner->restoreState(); int memUsageBytes = 0; BSONObj obj; Runner::RunnerState state; - while ((state = runner->getNext(&obj, NULL)) == Runner::RUNNER_ADVANCED) { + while ((state = _runner->getNext(&obj, NULL)) == Runner::RUNNER_ADVANCED) { if (_dependencies) { _currentBatch.push_back(_dependencies->extractFields(obj)); } @@ -136,16 +107,16 @@ namespace mongo { memUsageBytes += _currentBatch.back().getApproximateSize(); if (memUsageBytes > MaxBytesToReturnToClientAtOnce) { - // End this batch and prepare cursor for yielding. - runner->saveState(); + // End this batch and prepare Runner for yielding. + _runner->saveState(); cc().curop()->yielded(); return; } } - // If we got here, there won't be any more documents, so destroy the cursor and runner. - _cursorId = 0; - pin.deleteUnderlying(); + // If we got here, there won't be any more documents, so destroy the runner. Can't use + // dispose since we want to keep the _currentBatch. + _runner.reset(); uassert(16028, "collection or index disappeared when cursor yielded", state != Runner::RUNNER_DEAD); @@ -236,21 +207,17 @@ namespace { Collection* collection = ctx.db()->getCollection( _ns ); uassert( 17362, "Collection dropped.", collection ); - ClientCursorPin pin(collection, _cursorId); - ClientCursor* cursor = pin.c(); + massert(17392, "No _runner. Were we disposed before explained?", + _runner); - uassert(17135, "Cursor deleted. Was the collection or database dropped?", - cursor); - - Runner* runner = cursor->getRunner(); - runner->restoreState(); + _runner->restoreState(); TypeExplain* explainRaw; - explainStatus = runner->getInfo(&explainRaw, NULL); + explainStatus = _runner->getInfo(&explainRaw, NULL); if (explainStatus.isOK()) plan.reset(explainRaw); - runner->saveState(); + _runner->saveState(); } MutableDocument out; @@ -271,25 +238,23 @@ namespace { out["planError"] = Value(explainStatus.toString()); } - return Value(DOC(getSourceName() << out.freezeToValue())); } DocumentSourceCursor::DocumentSourceCursor(const string& ns, - CursorId cursorId, + const boost::shared_ptr<Runner>& runner, const intrusive_ptr<ExpressionContext> &pCtx) : DocumentSource(pCtx) , _docsAddedToBatches(0) , _ns(ns) - , _cursorId(cursorId) - , _killed(false) + , _runner(runner) {} intrusive_ptr<DocumentSourceCursor> DocumentSourceCursor::create( const string& ns, - CursorId cursorId, + const boost::shared_ptr<Runner>& runner, const intrusive_ptr<ExpressionContext> &pExpCtx) { - return new DocumentSourceCursor(ns, cursorId, pExpCtx); + return new DocumentSourceCursor(ns, runner, pExpCtx); } void DocumentSourceCursor::setProjection( |