summaryrefslogtreecommitdiff
path: root/src/mongo
diff options
context:
space:
mode:
authorMathias Stearn <mathias@10gen.com>2014-01-30 17:16:36 -0500
committerMathias Stearn <mathias@10gen.com>2014-02-12 11:38:38 -0500
commit20806b5757b5bf4dbf524df0f332170012086af7 (patch)
treec0f71847c38417ae835bd2dff99eef4d59f9d4ce /src/mongo
parent8c99c9ad5cd5ef14f808c1c135f807255330f025 (diff)
downloadmongo-20806b5757b5bf4dbf524df0f332170012086af7.tar.gz
SERVER-12530 clean up agg output cursor creation and locking
Diffstat (limited to 'src/mongo')
-rw-r--r--src/mongo/db/commands/pipeline_command.cpp188
-rw-r--r--src/mongo/db/pipeline/pipeline_d.cpp20
-rw-r--r--src/mongo/db/pipeline/pipeline_d.h7
3 files changed, 109 insertions, 106 deletions
diff --git a/src/mongo/db/commands/pipeline_command.cpp b/src/mongo/db/commands/pipeline_command.cpp
index 7a024de270d..87deb59f496 100644
--- a/src/mongo/db/commands/pipeline_command.cpp
+++ b/src/mongo/db/commands/pipeline_command.cpp
@@ -166,28 +166,16 @@ namespace {
}
static void handleCursorCommand(const string& ns,
- intrusive_ptr<Pipeline>& pPipeline,
- BSONObj& cmdObj,
+ ClientCursorPin* pin,
+ PipelineRunner* runner,
+ const BSONObj& cmdObj,
BSONObjBuilder& result) {
- scoped_ptr<ClientCursorPin> pin;
- string cursorNs = ns;
-
- {
- // Set up cursor
- Client::ReadContext ctx(ns);
- Collection* collection = ctx.ctx().db()->getCollection( ns );
- if ( collection ) {
- ClientCursor* cc = new ClientCursor(collection,
- new PipelineRunner(pPipeline));
- // enable special locking and ns deletion behavior
- cc->isAggCursor = true;
-
- pin.reset( new ClientCursorPin( collection, cc->cursorid() ) );
-
- // we need this after cursor may have been deleted
- cursorNs = cc->ns();
- }
+ ClientCursor* cursor = pin ? pin->c() : NULL;
+ if (pin) {
+ invariant(cursor);
+ invariant(cursor->getRunner() == runner);
+ invariant(cursor->isAggCursor);
}
BSONElement batchSizeElem = cmdObj.getFieldDotted("cursor.batchSize");
@@ -195,74 +183,56 @@ namespace {
? batchSizeElem.numberLong()
: 101; // same as query
- ClientCursor* cursor = NULL;
- PipelineRunner* runner = NULL;
- if ( pin ) {
- cursor = pin->c();
- massert(16958,
- "Cursor shouldn't have been deleted",
- cursor);
- verify(cursor->isAggCursor);
-
- runner = dynamic_cast<PipelineRunner*>(cursor->getRunner());
- verify(runner);
- }
-
- try {
-
- // can't use result BSONObjBuilder directly since it won't handle exceptions correctly.
- BSONArrayBuilder resultsArray;
- const int byteLimit = MaxBytesToReturnToClientAtOnce;
- BSONObj next;
- if ( runner ) {
- for (int objCount = 0; objCount < batchSize; objCount++) {
- // The initial getNext() on a PipelineRunner may be very expensive so we don't
- // do it when batchSize is 0 since that indicates a desire for a fast return.
- if (runner->getNext(&next, NULL) != Runner::RUNNER_ADVANCED) {
- pin->deleteUnderlying();
- cursor = NULL; // make it an obvious error to use cursor after this point
- break;
- }
-
- if (resultsArray.len() + next.objsize() > byteLimit) {
- // too big. next will be the first doc in the second batch
- runner->pushBack(next);
- break;
- }
-
- resultsArray.append(next);
- }
- }
- else {
- // this is to ensure that side-effects such as $out occur,
- // and that an empty output set is the correct result of this pipeline
- invariant( pPipeline.get() );
- invariant( pPipeline->output() );
- invariant( !pPipeline->output()->getNext() );
+ // can't use result BSONObjBuilder directly since it won't handle exceptions correctly.
+ BSONArrayBuilder resultsArray;
+ const int byteLimit = MaxBytesToReturnToClientAtOnce;
+ BSONObj next;
+ for (int objCount = 0; objCount < batchSize; objCount++) {
+ // The initial getNext() on a PipelineRunner may be very expensive so we don't
+ // do it when batchSize is 0 since that indicates a desire for a fast return.
+ if (runner->getNext(&next, NULL) != Runner::RUNNER_ADVANCED) {
+ if (pin) pin->deleteUnderlying();
+ // make it an obvious error to use cursor or runner after this point
+ cursor = NULL;
+ runner = NULL;
+ break;
}
- if (cursor) {
- // If a time limit was set on the pipeline, remaining time is "rolled over" to the
- // cursor (for use by future getmore ops).
- cursor->setLeftoverMaxTimeMicros( cc().curop()->getRemainingMaxTimeMicros() );
+ if (resultsArray.len() + next.objsize() > byteLimit) {
+ // too big. next will be the first doc in the second batch
+ runner->pushBack(next);
+ break;
}
- BSONObjBuilder cursorObj(result.subobjStart("cursor"));
- if ( cursor )
- cursorObj.append("id", cursor->cursorid() );
- else
- cursorObj.append("id", 0LL );
- cursorObj.append("ns", cursorNs);
- cursorObj.append("firstBatch", resultsArray.arr());
- cursorObj.done();
+ resultsArray.append(next);
}
- catch (...) {
- // Clean up cursor on way out of scope.
- if ( pin ) {
- pin->deleteUnderlying();
- }
- throw;
+
+ // NOTE: runner->isEOF() can have side effects such as writing by $out. However, it should
+ // be relatively quick since if there was no pin then the input is empty. Also, this
+ // violates the contract for batchSize==0. Sharding requires a cursor to be returned in that
+ // case. This is ok for now however, since you can't have a sharded collection that doesn't
+ // exist.
+ const bool canReturnMoreBatches = pin;
+ if (!canReturnMoreBatches && runner && !runner->isEOF()) {
+ // msgasserting since this shouldn't be possible to trigger from today's aggregation
+ // language. The wording assumes that the only reason pin would be null is if the
+ // collection doesn't exist.
+ msgasserted(17391, str::stream()
+ << "Aggregation has more results than fit in initial batch, but can't "
+ << "create cursor since collection " << ns << " doesn't exist");
}
+
+ if (cursor) {
+ // If a time limit was set on the pipeline, remaining time is "rolled over" to the
+ // cursor (for use by future getmore ops).
+ cursor->setLeftoverMaxTimeMicros( cc().curop()->getRemainingMaxTimeMicros() );
+ }
+
+ BSONObjBuilder cursorObj(result.subobjStart("cursor"));
+ cursorObj.append("id", cursor ? cursor->cursorid() : 0LL);
+ cursorObj.append("ns", ns);
+ cursorObj.append("firstBatch", resultsArray.arr());
+ cursorObj.done();
}
@@ -321,22 +291,52 @@ namespace {
}
#endif
- // This does the mongod-specific stuff like creating a cursor
- PipelineD::prepareCursorSource(pPipeline, pCtx);
-
- pPipeline->stitch();
-
- if (pPipeline->isExplain()) {
- result << "stages" << Value(pPipeline->writeExplainOps());
- return true; // don't do any actual execution
+ scoped_ptr<ClientCursorPin> pin;
+ PipelineRunner* runner = NULL;
+ auto_ptr<PipelineRunner> runnerHolder;
+ {
+ // This will throw if the sharding version for this connection is out of date. The
+ // lock must be held continuously from now until we have we created both the output
+ // ClientCursor and the input Runner. This ensures that both are using the same
+ // sharding version that we synchronize on here. This is also why we always need to
+ // create a ClientCursor even when we aren't outputting to a cursor. See the comment
+ // on ShardFilterStage for more details.
+ Client::ReadContext ctx(ns);
+
+ Collection* collection = ctx.ctx().db()->getCollection(ns);
+
+ // This does mongod-specific stuff like creating the input Runner if needed
+ PipelineD::prepareCursorSource(pPipeline, pCtx, collection);
+ pPipeline->stitch();
+
+ runnerHolder.reset(new PipelineRunner(pPipeline));
+ runner = runnerHolder.get();
+
+ if (collection) {
+ ClientCursor* cursor = new ClientCursor(collection, runnerHolder.release());
+ cursor->isAggCursor = true; // enable special locking behavior
+ pin.reset(new ClientCursorPin(collection, cursor->cursorid()));
+ // Don't add any code between here and the start of the try block.
+ }
}
- if (isCursorCommand(cmdObj)) {
- handleCursorCommand(ns, pPipeline, cmdObj, result);
+ try {
+ if (pPipeline->isExplain()) {
+ result << "stages" << Value(pPipeline->writeExplainOps());
+ }
+ else if (isCursorCommand(cmdObj)) {
+ handleCursorCommand(ns, pin.get(), runner, cmdObj, result);
+ }
+ else {
+ pPipeline->run(result);
+ }
}
- else {
- pPipeline->run(result);
+ catch (...) {
+ // Clean up cursor on way out of scope.
+ if (pin) pin->deleteUnderlying();
+ throw;
}
+ // Any code that needs the cursor pinned must be inside the try block, above.
return true;
}
diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp
index 7bb6f0940d2..e85a5b3d3ee 100644
--- a/src/mongo/db/pipeline/pipeline_d.cpp
+++ b/src/mongo/db/pipeline/pipeline_d.cpp
@@ -63,9 +63,12 @@ namespace {
};
}
- void PipelineD::prepareCursorSource(
- const intrusive_ptr<Pipeline> &pPipeline,
- const intrusive_ptr<ExpressionContext> &pExpCtx) {
+ void PipelineD::prepareCursorSource(const intrusive_ptr<Pipeline>& pPipeline,
+ const intrusive_ptr<ExpressionContext>& pExpCtx,
+ Collection* collection) {
+ // get the full "namespace" name
+ const string& fullName = pExpCtx->ns.ns();
+ Lock::assertAtLeastReadLocked(fullName);
// We will be modifying the source vector as we go
Pipeline::SourceContainer& sources = pPipeline->sources;
@@ -125,9 +128,6 @@ namespace {
}
}
- // get the full "namespace" name
- const string& fullName = pExpCtx->ns.ns();
-
// for debugging purposes, show what the query and sort are
DEV {
(log() << "\n---- query BSON\n" <<
@@ -138,11 +138,9 @@ namespace {
fullName << "\n----\n");
}
- // Create the necessary context to use a Runner, including taking a namespace read lock.
- // Note: this may throw if the sharding version for this connection is out of date.
- Client::ReadContext context(fullName);
- Collection* collection = context.ctx().db()->getCollection(fullName);
- if ( !collection ) {
+ if (!collection) {
+ // Collection doesn't exist. Create a source that will return no results to simulate an
+ // empty collection.
intrusive_ptr<DocumentSource> source(DocumentSourceBsonArray::create(BSONObj(),
pExpCtx));
while (!sources.empty() && source->coalesce(sources.front())) {
diff --git a/src/mongo/db/pipeline/pipeline_d.h b/src/mongo/db/pipeline/pipeline_d.h
index feaca0fc815..33cc6af35fa 100644
--- a/src/mongo/db/pipeline/pipeline_d.h
+++ b/src/mongo/db/pipeline/pipeline_d.h
@@ -31,6 +31,7 @@
#include "mongo/pch.h"
namespace mongo {
+ class Collection;
class DocumentSourceCursor;
struct ExpressionContext;
class Pipeline;
@@ -60,12 +61,16 @@ namespace mongo {
*
* The cursor is added to the front of the pipeline's sources.
*
+ * Must have a ReadContext before entering.
+ *
* @param pPipeline the logical "this" for this operation
* @param pExpCtx the expression context for this pipeline
+ * @param collection the input collection. NULL if doesn't exist.
*/
static void prepareCursorSource(
const intrusive_ptr<Pipeline> &pPipeline,
- const intrusive_ptr<ExpressionContext> &pExpCtx);
+ const intrusive_ptr<ExpressionContext> &pExpCtx,
+ Collection* collection);
private:
PipelineD(); // does not exist: prevent instantiation