diff options
author | Mathias Stearn <mathias@10gen.com> | 2014-01-30 17:16:36 -0500 |
---|---|---|
committer | Mathias Stearn <mathias@10gen.com> | 2014-02-12 11:38:38 -0500 |
commit | 20806b5757b5bf4dbf524df0f332170012086af7 (patch) | |
tree | c0f71847c38417ae835bd2dff99eef4d59f9d4ce /src/mongo | |
parent | 8c99c9ad5cd5ef14f808c1c135f807255330f025 (diff) | |
download | mongo-20806b5757b5bf4dbf524df0f332170012086af7.tar.gz |
SERVER-12530 clean up agg output cursor creation and locking
Diffstat (limited to 'src/mongo')
-rw-r--r-- | src/mongo/db/commands/pipeline_command.cpp | 188 | ||||
-rw-r--r-- | src/mongo/db/pipeline/pipeline_d.cpp | 20 | ||||
-rw-r--r-- | src/mongo/db/pipeline/pipeline_d.h | 7 |
3 files changed, 109 insertions, 106 deletions
diff --git a/src/mongo/db/commands/pipeline_command.cpp b/src/mongo/db/commands/pipeline_command.cpp index 7a024de270d..87deb59f496 100644 --- a/src/mongo/db/commands/pipeline_command.cpp +++ b/src/mongo/db/commands/pipeline_command.cpp @@ -166,28 +166,16 @@ namespace { } static void handleCursorCommand(const string& ns, - intrusive_ptr<Pipeline>& pPipeline, - BSONObj& cmdObj, + ClientCursorPin* pin, + PipelineRunner* runner, + const BSONObj& cmdObj, BSONObjBuilder& result) { - scoped_ptr<ClientCursorPin> pin; - string cursorNs = ns; - - { - // Set up cursor - Client::ReadContext ctx(ns); - Collection* collection = ctx.ctx().db()->getCollection( ns ); - if ( collection ) { - ClientCursor* cc = new ClientCursor(collection, - new PipelineRunner(pPipeline)); - // enable special locking and ns deletion behavior - cc->isAggCursor = true; - - pin.reset( new ClientCursorPin( collection, cc->cursorid() ) ); - - // we need this after cursor may have been deleted - cursorNs = cc->ns(); - } + ClientCursor* cursor = pin ? pin->c() : NULL; + if (pin) { + invariant(cursor); + invariant(cursor->getRunner() == runner); + invariant(cursor->isAggCursor); } BSONElement batchSizeElem = cmdObj.getFieldDotted("cursor.batchSize"); @@ -195,74 +183,56 @@ namespace { ? batchSizeElem.numberLong() : 101; // same as query - ClientCursor* cursor = NULL; - PipelineRunner* runner = NULL; - if ( pin ) { - cursor = pin->c(); - massert(16958, - "Cursor shouldn't have been deleted", - cursor); - verify(cursor->isAggCursor); - - runner = dynamic_cast<PipelineRunner*>(cursor->getRunner()); - verify(runner); - } - - try { - - // can't use result BSONObjBuilder directly since it won't handle exceptions correctly. 
- BSONArrayBuilder resultsArray; - const int byteLimit = MaxBytesToReturnToClientAtOnce; - BSONObj next; - if ( runner ) { - for (int objCount = 0; objCount < batchSize; objCount++) { - // The initial getNext() on a PipelineRunner may be very expensive so we don't - // do it when batchSize is 0 since that indicates a desire for a fast return. - if (runner->getNext(&next, NULL) != Runner::RUNNER_ADVANCED) { - pin->deleteUnderlying(); - cursor = NULL; // make it an obvious error to use cursor after this point - break; - } - - if (resultsArray.len() + next.objsize() > byteLimit) { - // too big. next will be the first doc in the second batch - runner->pushBack(next); - break; - } - - resultsArray.append(next); - } - } - else { - // this is to ensure that side-effects such as $out occur, - // and that an empty output set is the correct result of this pipeline - invariant( pPipeline.get() ); - invariant( pPipeline->output() ); - invariant( !pPipeline->output()->getNext() ); + // can't use result BSONObjBuilder directly since it won't handle exceptions correctly. + BSONArrayBuilder resultsArray; + const int byteLimit = MaxBytesToReturnToClientAtOnce; + BSONObj next; + for (int objCount = 0; objCount < batchSize; objCount++) { + // The initial getNext() on a PipelineRunner may be very expensive so we don't + // do it when batchSize is 0 since that indicates a desire for a fast return. + if (runner->getNext(&next, NULL) != Runner::RUNNER_ADVANCED) { + if (pin) pin->deleteUnderlying(); + // make it an obvious error to use cursor or runner after this point + cursor = NULL; + runner = NULL; + break; } - if (cursor) { - // If a time limit was set on the pipeline, remaining time is "rolled over" to the - // cursor (for use by future getmore ops). - cursor->setLeftoverMaxTimeMicros( cc().curop()->getRemainingMaxTimeMicros() ); + if (resultsArray.len() + next.objsize() > byteLimit) { + // too big. 
next will be the first doc in the second batch + runner->pushBack(next); + break; } - BSONObjBuilder cursorObj(result.subobjStart("cursor")); - if ( cursor ) - cursorObj.append("id", cursor->cursorid() ); - else - cursorObj.append("id", 0LL ); - cursorObj.append("ns", cursorNs); - cursorObj.append("firstBatch", resultsArray.arr()); - cursorObj.done(); + resultsArray.append(next); } - catch (...) { - // Clean up cursor on way out of scope. - if ( pin ) { - pin->deleteUnderlying(); - } - throw; + + // NOTE: runner->isEOF() can have side effects such as writing by $out. However, it should + // be relatively quick since if there was no pin then the input is empty. Also, this + // violates the contract for batchSize==0. Sharding requires a cursor to be returned in that + // case. This is ok for now however, since you can't have a sharded collection that doesn't + // exist. + const bool canReturnMoreBatches = pin; + if (!canReturnMoreBatches && runner && !runner->isEOF()) { + // msgasserting since this shouldn't be possible to trigger from today's aggregation + // language. The wording assumes that the only reason pin would be null is if the + // collection doesn't exist. + msgasserted(17391, str::stream() + << "Aggregation has more results than fit in initial batch, but can't " + << "create cursor since collection " << ns << " doesn't exist"); } + + if (cursor) { + // If a time limit was set on the pipeline, remaining time is "rolled over" to the + // cursor (for use by future getmore ops). + cursor->setLeftoverMaxTimeMicros( cc().curop()->getRemainingMaxTimeMicros() ); + } + + BSONObjBuilder cursorObj(result.subobjStart("cursor")); + cursorObj.append("id", cursor ? 
cursor->cursorid() : 0LL); + cursorObj.append("ns", ns); + cursorObj.append("firstBatch", resultsArray.arr()); + cursorObj.done(); } @@ -321,22 +291,52 @@ namespace { } #endif - // This does the mongod-specific stuff like creating a cursor - PipelineD::prepareCursorSource(pPipeline, pCtx); - - pPipeline->stitch(); - - if (pPipeline->isExplain()) { - result << "stages" << Value(pPipeline->writeExplainOps()); - return true; // don't do any actual execution + scoped_ptr<ClientCursorPin> pin; + PipelineRunner* runner = NULL; + auto_ptr<PipelineRunner> runnerHolder; + { + // This will throw if the sharding version for this connection is out of date. The + // lock must be held continuously from now until we have created both the output + // ClientCursor and the input Runner. This ensures that both are using the same + // sharding version that we synchronize on here. This is also why we always need to + // create a ClientCursor even when we aren't outputting to a cursor. See the comment + // on ShardFilterStage for more details. + Client::ReadContext ctx(ns); + + Collection* collection = ctx.ctx().db()->getCollection(ns); + + // This does mongod-specific stuff like creating the input Runner if needed + PipelineD::prepareCursorSource(pPipeline, pCtx, collection); + pPipeline->stitch(); + + runnerHolder.reset(new PipelineRunner(pPipeline)); + runner = runnerHolder.get(); + + if (collection) { + ClientCursor* cursor = new ClientCursor(collection, runnerHolder.release()); + cursor->isAggCursor = true; // enable special locking behavior + pin.reset(new ClientCursorPin(collection, cursor->cursorid())); + // Don't add any code between here and the start of the try block. 
+ } } - if (isCursorCommand(cmdObj)) { - handleCursorCommand(ns, pPipeline, cmdObj, result); + try { + if (pPipeline->isExplain()) { + result << "stages" << Value(pPipeline->writeExplainOps()); + } + else if (isCursorCommand(cmdObj)) { + handleCursorCommand(ns, pin.get(), runner, cmdObj, result); + } + else { + pPipeline->run(result); + } } - else { - pPipeline->run(result); + catch (...) { + // Clean up cursor on way out of scope. + if (pin) pin->deleteUnderlying(); + throw; } + // Any code that needs the cursor pinned must be inside the try block, above. return true; } diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp index 7bb6f0940d2..e85a5b3d3ee 100644 --- a/src/mongo/db/pipeline/pipeline_d.cpp +++ b/src/mongo/db/pipeline/pipeline_d.cpp @@ -63,9 +63,12 @@ namespace { }; } - void PipelineD::prepareCursorSource( - const intrusive_ptr<Pipeline> &pPipeline, - const intrusive_ptr<ExpressionContext> &pExpCtx) { + void PipelineD::prepareCursorSource(const intrusive_ptr<Pipeline>& pPipeline, + const intrusive_ptr<ExpressionContext>& pExpCtx, + Collection* collection) { + // get the full "namespace" name + const string& fullName = pExpCtx->ns.ns(); + Lock::assertAtLeastReadLocked(fullName); // We will be modifying the source vector as we go Pipeline::SourceContainer& sources = pPipeline->sources; @@ -125,9 +128,6 @@ namespace { } } - // get the full "namespace" name - const string& fullName = pExpCtx->ns.ns(); - // for debugging purposes, show what the query and sort are DEV { (log() << "\n---- query BSON\n" << @@ -138,11 +138,9 @@ namespace { fullName << "\n----\n"); } - // Create the necessary context to use a Runner, including taking a namespace read lock. - // Note: this may throw if the sharding version for this connection is out of date. - Client::ReadContext context(fullName); - Collection* collection = context.ctx().db()->getCollection(fullName); - if ( !collection ) { + if (!collection) { + // Collection doesn't exist. 
Create a source that will return no results to simulate an + // empty collection. intrusive_ptr<DocumentSource> source(DocumentSourceBsonArray::create(BSONObj(), pExpCtx)); while (!sources.empty() && source->coalesce(sources.front())) { diff --git a/src/mongo/db/pipeline/pipeline_d.h b/src/mongo/db/pipeline/pipeline_d.h index feaca0fc815..33cc6af35fa 100644 --- a/src/mongo/db/pipeline/pipeline_d.h +++ b/src/mongo/db/pipeline/pipeline_d.h @@ -31,6 +31,7 @@ #include "mongo/pch.h" namespace mongo { + class Collection; class DocumentSourceCursor; struct ExpressionContext; class Pipeline; @@ -60,12 +61,16 @@ namespace mongo { * * The cursor is added to the front of the pipeline's sources. * + * Must have a ReadContext before entering. + * * @param pPipeline the logical "this" for this operation * @param pExpCtx the expression context for this pipeline + * @param collection the input collection. NULL if doesn't exist. */ static void prepareCursorSource( const intrusive_ptr<Pipeline> &pPipeline, - const intrusive_ptr<ExpressionContext> &pExpCtx); + const intrusive_ptr<ExpressionContext> &pExpCtx, + Collection* collection); private: PipelineD(); // does not exist: prevent instantiation |