summaryrefslogtreecommitdiff
path: root/src/mongo/db/commands/pipeline_command.cpp
diff options
context:
space:
mode:
authorMark Benvenuto <mark.benvenuto@mongodb.com>2015-06-20 00:22:50 -0400
committerMark Benvenuto <mark.benvenuto@mongodb.com>2015-06-20 10:56:02 -0400
commit9c2ed42daa8fbbef4a919c21ec564e2db55e8d60 (patch)
tree3814f79c10d7b490948d8cb7b112ac1dd41ceff1 /src/mongo/db/commands/pipeline_command.cpp
parent01965cf52bce6976637ecb8f4a622aeb05ab256a (diff)
downloadmongo-9c2ed42daa8fbbef4a919c21ec564e2db55e8d60.tar.gz
SERVER-18579: Clang-Format - reformat code, no comment reflow
Diffstat (limited to 'src/mongo/db/commands/pipeline_command.cpp')
-rw-r--r--src/mongo/db/commands/pipeline_command.cpp509
1 files changed, 249 insertions, 260 deletions
diff --git a/src/mongo/db/commands/pipeline_command.cpp b/src/mongo/db/commands/pipeline_command.cpp
index 4f9274dc6b7..95423cf2e7b 100644
--- a/src/mongo/db/commands/pipeline_command.cpp
+++ b/src/mongo/db/commands/pipeline_command.cpp
@@ -54,295 +54,284 @@
namespace mongo {
- using boost::intrusive_ptr;
- using std::endl;
- using std::shared_ptr;
- using std::string;
- using std::stringstream;
- using std::unique_ptr;
-
- /**
- * Returns true if we need to keep a ClientCursor saved for this pipeline (for future getMore
- * requests). Otherwise, returns false.
- */
- static bool handleCursorCommand(OperationContext* txn,
- const string& ns,
- ClientCursorPin* pin,
- PlanExecutor* exec,
- const BSONObj& cmdObj,
- BSONObjBuilder& result) {
-
- ClientCursor* cursor = pin ? pin->c() : NULL;
- if (pin) {
- invariant(cursor);
- invariant(cursor->getExecutor() == exec);
- invariant(cursor->isAggCursor());
- }
+using boost::intrusive_ptr;
+using std::endl;
+using std::shared_ptr;
+using std::string;
+using std::stringstream;
+using std::unique_ptr;
- const long long defaultBatchSize = 101; // Same as query.
- long long batchSize;
- uassertStatusOK(Command::parseCommandCursorOptions(cmdObj, defaultBatchSize, &batchSize));
-
- // can't use result BSONObjBuilder directly since it won't handle exceptions correctly.
- BSONArrayBuilder resultsArray;
- const int byteLimit = MaxBytesToReturnToClientAtOnce;
- BSONObj next;
- for (int objCount = 0; objCount < batchSize; objCount++) {
- // The initial getNext() on a PipelineProxyStage may be very expensive so we don't
- // do it when batchSize is 0 since that indicates a desire for a fast return.
- if (exec->getNext(&next, NULL) != PlanExecutor::ADVANCED) {
- // make it an obvious error to use cursor or executor after this point
- cursor = NULL;
- exec = NULL;
- break;
- }
+/**
+ * Returns true if we need to keep a ClientCursor saved for this pipeline (for future getMore
+ * requests). Otherwise, returns false.
+ */
+static bool handleCursorCommand(OperationContext* txn,
+ const string& ns,
+ ClientCursorPin* pin,
+ PlanExecutor* exec,
+ const BSONObj& cmdObj,
+ BSONObjBuilder& result) {
+ ClientCursor* cursor = pin ? pin->c() : NULL;
+ if (pin) {
+ invariant(cursor);
+ invariant(cursor->getExecutor() == exec);
+ invariant(cursor->isAggCursor());
+ }
- if (resultsArray.len() + next.objsize() > byteLimit) {
- // Get the pipeline proxy stage wrapped by this PlanExecutor.
- PipelineProxyStage* proxy = static_cast<PipelineProxyStage*>(exec->getRootStage());
- // too big. next will be the first doc in the second batch
- proxy->pushBack(next);
- break;
- }
+ const long long defaultBatchSize = 101; // Same as query.
+ long long batchSize;
+ uassertStatusOK(Command::parseCommandCursorOptions(cmdObj, defaultBatchSize, &batchSize));
+
+ // can't use result BSONObjBuilder directly since it won't handle exceptions correctly.
+ BSONArrayBuilder resultsArray;
+ const int byteLimit = MaxBytesToReturnToClientAtOnce;
+ BSONObj next;
+ for (int objCount = 0; objCount < batchSize; objCount++) {
+ // The initial getNext() on a PipelineProxyStage may be very expensive so we don't
+ // do it when batchSize is 0 since that indicates a desire for a fast return.
+ if (exec->getNext(&next, NULL) != PlanExecutor::ADVANCED) {
+ // make it an obvious error to use cursor or executor after this point
+ cursor = NULL;
+ exec = NULL;
+ break;
+ }
- resultsArray.append(next);
+ if (resultsArray.len() + next.objsize() > byteLimit) {
+ // Get the pipeline proxy stage wrapped by this PlanExecutor.
+ PipelineProxyStage* proxy = static_cast<PipelineProxyStage*>(exec->getRootStage());
+ // too big. next will be the first doc in the second batch
+ proxy->pushBack(next);
+ break;
}
- // NOTE: exec->isEOF() can have side effects such as writing by $out. However, it should
- // be relatively quick since if there was no pin then the input is empty. Also, this
- // violates the contract for batchSize==0. Sharding requires a cursor to be returned in that
- // case. This is ok for now however, since you can't have a sharded collection that doesn't
- // exist.
- const bool canReturnMoreBatches = pin;
- if (!canReturnMoreBatches && exec && !exec->isEOF()) {
- // msgasserting since this shouldn't be possible to trigger from today's aggregation
- // language. The wording assumes that the only reason pin would be null is if the
- // collection doesn't exist.
- msgasserted(17391, str::stream()
- << "Aggregation has more results than fit in initial batch, but can't "
- << "create cursor since collection " << ns << " doesn't exist");
+ resultsArray.append(next);
+ }
+
+ // NOTE: exec->isEOF() can have side effects such as writing by $out. However, it should
+ // be relatively quick since if there was no pin then the input is empty. Also, this
+ // violates the contract for batchSize==0. Sharding requires a cursor to be returned in that
+ // case. This is ok for now however, since you can't have a sharded collection that doesn't
+ // exist.
+ const bool canReturnMoreBatches = pin;
+ if (!canReturnMoreBatches && exec && !exec->isEOF()) {
+ // msgasserting since this shouldn't be possible to trigger from today's aggregation
+ // language. The wording assumes that the only reason pin would be null is if the
+ // collection doesn't exist.
+ msgasserted(
+ 17391,
+ str::stream() << "Aggregation has more results than fit in initial batch, but can't "
+ << "create cursor since collection " << ns << " doesn't exist");
+ }
+
+ if (cursor) {
+ // If a time limit was set on the pipeline, remaining time is "rolled over" to the
+ // cursor (for use by future getmore ops).
+ cursor->setLeftoverMaxTimeMicros(CurOp::get(txn)->getRemainingMaxTimeMicros());
+
+ CurOp::get(txn)->debug().cursorid = cursor->cursorid();
+
+ if (txn->getClient()->isInDirectClient()) {
+ cursor->setUnownedRecoveryUnit(txn->recoveryUnit());
+ } else {
+ // We stash away the RecoveryUnit in the ClientCursor. It's used for subsequent
+ // getMore requests. The calling OpCtx gets a fresh RecoveryUnit.
+ txn->recoveryUnit()->abandonSnapshot();
+ cursor->setOwnedRecoveryUnit(txn->releaseRecoveryUnit());
+ StorageEngine* storageEngine = getGlobalServiceContext()->getGlobalStorageEngine();
+ invariant(txn->setRecoveryUnit(storageEngine->newRecoveryUnit(),
+ OperationContext::kNotInUnitOfWork) ==
+ OperationContext::kNotInUnitOfWork);
}
- if (cursor) {
- // If a time limit was set on the pipeline, remaining time is "rolled over" to the
- // cursor (for use by future getmore ops).
- cursor->setLeftoverMaxTimeMicros( CurOp::get(txn)->getRemainingMaxTimeMicros() );
+ // Cursor needs to be in a saved state while we yield locks for getmore. State
+ // will be restored in getMore().
+ exec->saveState();
+ }
- CurOp::get(txn)->debug().cursorid = cursor->cursorid();
+ const long long cursorId = cursor ? cursor->cursorid() : 0LL;
+ appendCursorResponseObject(cursorId, ns, resultsArray.arr(), &result);
- if (txn->getClient()->isInDirectClient()) {
- cursor->setUnownedRecoveryUnit(txn->recoveryUnit());
- }
- else {
- // We stash away the RecoveryUnit in the ClientCursor. It's used for subsequent
- // getMore requests. The calling OpCtx gets a fresh RecoveryUnit.
- txn->recoveryUnit()->abandonSnapshot();
- cursor->setOwnedRecoveryUnit(txn->releaseRecoveryUnit());
- StorageEngine* storageEngine = getGlobalServiceContext()->getGlobalStorageEngine();
- invariant(txn->setRecoveryUnit(storageEngine->newRecoveryUnit(),
- OperationContext::kNotInUnitOfWork)
- == OperationContext::kNotInUnitOfWork);
- }
+ return static_cast<bool>(cursor);
+}
- // Cursor needs to be in a saved state while we yield locks for getmore. State
- // will be restored in getMore().
- exec->saveState();
- }
- const long long cursorId = cursor ? cursor->cursorid() : 0LL;
- appendCursorResponseObject(cursorId, ns, resultsArray.arr(), &result);
+class PipelineCommand : public Command {
+public:
+ PipelineCommand() : Command(Pipeline::commandName) {} // command is called "aggregate"
- return static_cast<bool>(cursor);
+ // Locks are managed manually, in particular by DocumentSourceCursor.
+ virtual bool isWriteCommandForConfigServer() const {
+ return false;
+ }
+ virtual bool slaveOk() const {
+ return false;
+ }
+ virtual bool slaveOverrideOk() const {
+ return true;
+ }
+ virtual void help(stringstream& help) const {
+ help << "{ pipeline: [ { $operator: {...}}, ... ]"
+ << ", explain: <bool>"
+ << ", allowDiskUse: <bool>"
+ << ", cursor: {batchSize: <number>}"
+ << " }" << endl
+ << "See http://dochub.mongodb.org/core/aggregation for more details.";
}
+ virtual void addRequiredPrivileges(const std::string& dbname,
+ const BSONObj& cmdObj,
+ std::vector<Privilege>* out) {
+ Pipeline::addRequiredPrivileges(this, dbname, cmdObj, out);
+ }
- class PipelineCommand :
- public Command {
- public:
- PipelineCommand() :Command(Pipeline::commandName) {} // command is called "aggregate"
-
- // Locks are managed manually, in particular by DocumentSourceCursor.
- virtual bool isWriteCommandForConfigServer() const { return false; }
- virtual bool slaveOk() const { return false; }
- virtual bool slaveOverrideOk() const { return true; }
- virtual void help(stringstream &help) const {
- help << "{ pipeline: [ { $operator: {...}}, ... ]"
- << ", explain: <bool>"
- << ", allowDiskUse: <bool>"
- << ", cursor: {batchSize: <number>}"
- << " }"
- << endl
- << "See http://dochub.mongodb.org/core/aggregation for more details."
- ;
+ virtual bool run(OperationContext* txn,
+ const string& db,
+ BSONObj& cmdObj,
+ int options,
+ string& errmsg,
+ BSONObjBuilder& result) {
+ const std::string ns = parseNs(db, cmdObj);
+ if (nsToCollectionSubstring(ns).empty()) {
+ errmsg = "missing collection name";
+ return false;
}
-
- virtual void addRequiredPrivileges(const std::string& dbname,
- const BSONObj& cmdObj,
- std::vector<Privilege>* out) {
- Pipeline::addRequiredPrivileges(this, dbname, cmdObj, out);
+ NamespaceString nss(ns);
+
+ intrusive_ptr<ExpressionContext> pCtx = new ExpressionContext(txn, nss);
+ pCtx->tempDir = storageGlobalParams.dbpath + "/_tmp";
+
+ /* try to parse the command; if this fails, then we didn't run */
+ intrusive_ptr<Pipeline> pPipeline = Pipeline::parseCommand(errmsg, cmdObj, pCtx);
+ if (!pPipeline.get())
+ return false;
+
+ // This is outside of the if block to keep the object alive until the pipeline is finished.
+ BSONObj parsed;
+ if (kDebugBuild && !pPipeline->isExplain() && !pCtx->inShard) {
+ // Make sure all operations round-trip through Pipeline::toBson() correctly by
+ // reparsing every command in debug builds. This is important because sharded
+ // aggregations rely on this ability. Skipping when inShard because this has
+ // already been through the transformation (and this unsets pCtx->inShard).
+ parsed = pPipeline->serialize().toBson();
+ pPipeline = Pipeline::parseCommand(errmsg, parsed, pCtx);
+ verify(pPipeline);
}
- virtual bool run(OperationContext* txn,
- const string &db,
- BSONObj &cmdObj,
- int options,
- string &errmsg,
- BSONObjBuilder &result) {
- const std::string ns = parseNs(db, cmdObj);
- if (nsToCollectionSubstring(ns).empty()) {
- errmsg = "missing collection name";
- return false;
+ PlanExecutor* exec = NULL;
+ unique_ptr<ClientCursorPin> pin; // either this OR the execHolder will be non-null
+ unique_ptr<PlanExecutor> execHolder;
+ {
+ // This will throw if the sharding version for this connection is out of date. The
+ // lock must be held continuously from now until we have we created both the output
+ // ClientCursor and the input executor. This ensures that both are using the same
+ // sharding version that we synchronize on here. This is also why we always need to
+ // create a ClientCursor even when we aren't outputting to a cursor. See the comment
+ // on ShardFilterStage for more details.
+ AutoGetCollectionForRead ctx(txn, nss.ns());
+
+ Collection* collection = ctx.getCollection();
+
+ // This does mongod-specific stuff like creating the input PlanExecutor and adding
+ // it to the front of the pipeline if needed.
+ std::shared_ptr<PlanExecutor> input =
+ PipelineD::prepareCursorSource(txn, collection, pPipeline, pCtx);
+ pPipeline->stitch();
+
+ // Create the PlanExecutor which returns results from the pipeline. The WorkingSet
+ // ('ws') and the PipelineProxyStage ('proxy') will be owned by the created
+ // PlanExecutor.
+ unique_ptr<WorkingSet> ws(new WorkingSet());
+ unique_ptr<PipelineProxyStage> proxy(
+ new PipelineProxyStage(pPipeline, input, ws.get()));
+ Status execStatus = Status::OK();
+ if (NULL == collection) {
+ execStatus = PlanExecutor::make(txn,
+ ws.release(),
+ proxy.release(),
+ nss.ns(),
+ PlanExecutor::YIELD_MANUAL,
+ &exec);
+ } else {
+ execStatus = PlanExecutor::make(txn,
+ ws.release(),
+ proxy.release(),
+ collection,
+ PlanExecutor::YIELD_MANUAL,
+ &exec);
}
- NamespaceString nss(ns);
-
- intrusive_ptr<ExpressionContext> pCtx = new ExpressionContext(txn, nss);
- pCtx->tempDir = storageGlobalParams.dbpath + "/_tmp";
-
- /* try to parse the command; if this fails, then we didn't run */
- intrusive_ptr<Pipeline> pPipeline = Pipeline::parseCommand(errmsg, cmdObj, pCtx);
- if (!pPipeline.get())
- return false;
-
- // This is outside of the if block to keep the object alive until the pipeline is finished.
- BSONObj parsed;
- if (kDebugBuild && !pPipeline->isExplain() && !pCtx->inShard) {
- // Make sure all operations round-trip through Pipeline::toBson() correctly by
- // reparsing every command in debug builds. This is important because sharded
- // aggregations rely on this ability. Skipping when inShard because this has
- // already been through the transformation (and this unsets pCtx->inShard).
- parsed = pPipeline->serialize().toBson();
- pPipeline = Pipeline::parseCommand(errmsg, parsed, pCtx);
- verify(pPipeline);
+ invariant(execStatus.isOK());
+ execHolder.reset(exec);
+
+ if (!collection && input) {
+ // If we don't have a collection, we won't be able to register any executors, so
+ // make sure that the input PlanExecutor (likely wrapping an EOFStage) doesn't
+ // need to be registered.
+ invariant(!input->collection());
}
- PlanExecutor* exec = NULL;
- unique_ptr<ClientCursorPin> pin; // either this OR the execHolder will be non-null
- unique_ptr<PlanExecutor> execHolder;
- {
- // This will throw if the sharding version for this connection is out of date. The
- // lock must be held continuously from now until we have we created both the output
- // ClientCursor and the input executor. This ensures that both are using the same
- // sharding version that we synchronize on here. This is also why we always need to
- // create a ClientCursor even when we aren't outputting to a cursor. See the comment
- // on ShardFilterStage for more details.
- AutoGetCollectionForRead ctx(txn, nss.ns());
-
- Collection* collection = ctx.getCollection();
-
- // This does mongod-specific stuff like creating the input PlanExecutor and adding
- // it to the front of the pipeline if needed.
- std::shared_ptr<PlanExecutor> input = PipelineD::prepareCursorSource(txn,
- collection,
- pPipeline,
- pCtx);
- pPipeline->stitch();
-
- // Create the PlanExecutor which returns results from the pipeline. The WorkingSet
- // ('ws') and the PipelineProxyStage ('proxy') will be owned by the created
- // PlanExecutor.
- unique_ptr<WorkingSet> ws(new WorkingSet());
- unique_ptr<PipelineProxyStage> proxy(
- new PipelineProxyStage(pPipeline, input, ws.get()));
- Status execStatus = Status::OK();
- if (NULL == collection) {
- execStatus = PlanExecutor::make(txn,
- ws.release(),
- proxy.release(),
- nss.ns(),
- PlanExecutor::YIELD_MANUAL,
- &exec);
- }
- else {
- execStatus = PlanExecutor::make(txn,
- ws.release(),
- proxy.release(),
- collection,
- PlanExecutor::YIELD_MANUAL,
- &exec);
- }
- invariant(execStatus.isOK());
- execHolder.reset(exec);
-
- if (!collection && input) {
- // If we don't have a collection, we won't be able to register any executors, so
- // make sure that the input PlanExecutor (likely wrapping an EOFStage) doesn't
- // need to be registered.
- invariant(!input->collection());
- }
-
- if (collection) {
- const bool isAggCursor = true; // enable special locking behavior
- ClientCursor* cursor = new ClientCursor(collection->getCursorManager(),
- execHolder.release(),
- nss.ns(),
- 0,
- cmdObj.getOwned(),
- isAggCursor);
- pin.reset(new ClientCursorPin(collection->getCursorManager(),
- cursor->cursorid()));
- // Don't add any code between here and the start of the try block.
- }
-
- // At this point, it is safe to release the collection lock.
- // - In the case where we have a collection: we will need to reacquire the
- // collection lock later when cleaning up our ClientCursorPin.
- // - In the case where we don't have a collection: our PlanExecutor won't be
- // registered, so it will be safe to clean it up outside the lock.
- invariant(NULL == execHolder.get() || NULL == execHolder->collection());
+ if (collection) {
+ const bool isAggCursor = true; // enable special locking behavior
+ ClientCursor* cursor = new ClientCursor(collection->getCursorManager(),
+ execHolder.release(),
+ nss.ns(),
+ 0,
+ cmdObj.getOwned(),
+ isAggCursor);
+ pin.reset(new ClientCursorPin(collection->getCursorManager(), cursor->cursorid()));
+ // Don't add any code between here and the start of the try block.
}
- try {
- // Unless set to true, the ClientCursor created above will be deleted on block exit.
- bool keepCursor = false;
+ // At this point, it is safe to release the collection lock.
+ // - In the case where we have a collection: we will need to reacquire the
+ // collection lock later when cleaning up our ClientCursorPin.
+ // - In the case where we don't have a collection: our PlanExecutor won't be
+ // registered, so it will be safe to clean it up outside the lock.
+ invariant(NULL == execHolder.get() || NULL == execHolder->collection());
+ }
- const bool isCursorCommand = !cmdObj["cursor"].eoo();
+ try {
+ // Unless set to true, the ClientCursor created above will be deleted on block exit.
+ bool keepCursor = false;
- // If both explain and cursor are specified, explain wins.
- if (pPipeline->isExplain()) {
- result << "stages" << Value(pPipeline->writeExplainOps());
- }
- else if (isCursorCommand) {
- keepCursor = handleCursorCommand(txn,
- nss.ns(),
- pin.get(),
- exec,
- cmdObj,
- result);
- }
- else {
- pPipeline->run(result);
- }
+ const bool isCursorCommand = !cmdObj["cursor"].eoo();
- // Clean up our ClientCursorPin, if needed. We must reacquire the collection lock
- // in order to do so.
- if (pin) {
- // We acquire locks here with DBLock and CollectionLock instead of using
- // AutoGetCollectionForRead. AutoGetCollectionForRead will throw if the
- // sharding version is out of date, and we don't care if the sharding version
- // has changed.
- Lock::DBLock dbLock(txn->lockState(), nss.db(), MODE_IS);
- Lock::CollectionLock collLock(txn->lockState(), nss.ns(), MODE_IS);
- if (keepCursor) {
- pin->release();
- }
- else {
- pin->deleteUnderlying();
- }
- }
+ // If both explain and cursor are specified, explain wins.
+ if (pPipeline->isExplain()) {
+ result << "stages" << Value(pPipeline->writeExplainOps());
+ } else if (isCursorCommand) {
+ keepCursor = handleCursorCommand(txn, nss.ns(), pin.get(), exec, cmdObj, result);
+ } else {
+ pPipeline->run(result);
}
- catch (...) {
- // On our way out of scope, we clean up our ClientCursorPin if needed.
- if (pin) {
- Lock::DBLock dbLock(txn->lockState(), nss.db(), MODE_IS);
- Lock::CollectionLock collLock(txn->lockState(), nss.ns(), MODE_IS);
+
+ // Clean up our ClientCursorPin, if needed. We must reacquire the collection lock
+ // in order to do so.
+ if (pin) {
+ // We acquire locks here with DBLock and CollectionLock instead of using
+ // AutoGetCollectionForRead. AutoGetCollectionForRead will throw if the
+ // sharding version is out of date, and we don't care if the sharding version
+ // has changed.
+ Lock::DBLock dbLock(txn->lockState(), nss.db(), MODE_IS);
+ Lock::CollectionLock collLock(txn->lockState(), nss.ns(), MODE_IS);
+ if (keepCursor) {
+ pin->release();
+ } else {
pin->deleteUnderlying();
}
- throw;
}
- // Any code that needs the cursor pinned must be inside the try block, above.
-
- return true;
+ } catch (...) {
+ // On our way out of scope, we clean up our ClientCursorPin if needed.
+ if (pin) {
+ Lock::DBLock dbLock(txn->lockState(), nss.db(), MODE_IS);
+ Lock::CollectionLock collLock(txn->lockState(), nss.ns(), MODE_IS);
+ pin->deleteUnderlying();
+ }
+ throw;
}
- } cmdPipeline;
+ // Any code that needs the cursor pinned must be inside the try block, above.
+
+ return true;
+ }
+} cmdPipeline;
-} // namespace mongo
+} // namespace mongo