diff options
author | Mark Benvenuto <mark.benvenuto@mongodb.com> | 2015-06-20 00:22:50 -0400 |
---|---|---|
committer | Mark Benvenuto <mark.benvenuto@mongodb.com> | 2015-06-20 10:56:02 -0400 |
commit | 9c2ed42daa8fbbef4a919c21ec564e2db55e8d60 (patch) | |
tree | 3814f79c10d7b490948d8cb7b112ac1dd41ceff1 /src/mongo/db/commands/pipeline_command.cpp | |
parent | 01965cf52bce6976637ecb8f4a622aeb05ab256a (diff) | |
download | mongo-9c2ed42daa8fbbef4a919c21ec564e2db55e8d60.tar.gz |
SERVER-18579: Clang-Format - reformat code, no comment reflow
Diffstat (limited to 'src/mongo/db/commands/pipeline_command.cpp')
-rw-r--r-- | src/mongo/db/commands/pipeline_command.cpp | 509 |
1 files changed, 249 insertions, 260 deletions
diff --git a/src/mongo/db/commands/pipeline_command.cpp b/src/mongo/db/commands/pipeline_command.cpp index 4f9274dc6b7..95423cf2e7b 100644 --- a/src/mongo/db/commands/pipeline_command.cpp +++ b/src/mongo/db/commands/pipeline_command.cpp @@ -54,295 +54,284 @@ namespace mongo { - using boost::intrusive_ptr; - using std::endl; - using std::shared_ptr; - using std::string; - using std::stringstream; - using std::unique_ptr; - - /** - * Returns true if we need to keep a ClientCursor saved for this pipeline (for future getMore - * requests). Otherwise, returns false. - */ - static bool handleCursorCommand(OperationContext* txn, - const string& ns, - ClientCursorPin* pin, - PlanExecutor* exec, - const BSONObj& cmdObj, - BSONObjBuilder& result) { - - ClientCursor* cursor = pin ? pin->c() : NULL; - if (pin) { - invariant(cursor); - invariant(cursor->getExecutor() == exec); - invariant(cursor->isAggCursor()); - } +using boost::intrusive_ptr; +using std::endl; +using std::shared_ptr; +using std::string; +using std::stringstream; +using std::unique_ptr; - const long long defaultBatchSize = 101; // Same as query. - long long batchSize; - uassertStatusOK(Command::parseCommandCursorOptions(cmdObj, defaultBatchSize, &batchSize)); - - // can't use result BSONObjBuilder directly since it won't handle exceptions correctly. - BSONArrayBuilder resultsArray; - const int byteLimit = MaxBytesToReturnToClientAtOnce; - BSONObj next; - for (int objCount = 0; objCount < batchSize; objCount++) { - // The initial getNext() on a PipelineProxyStage may be very expensive so we don't - // do it when batchSize is 0 since that indicates a desire for a fast return. - if (exec->getNext(&next, NULL) != PlanExecutor::ADVANCED) { - // make it an obvious error to use cursor or executor after this point - cursor = NULL; - exec = NULL; - break; - } +/** + * Returns true if we need to keep a ClientCursor saved for this pipeline (for future getMore + * requests). Otherwise, returns false. + */ +static bool handleCursorCommand(OperationContext* txn, + const string& ns, + ClientCursorPin* pin, + PlanExecutor* exec, + const BSONObj& cmdObj, + BSONObjBuilder& result) { + ClientCursor* cursor = pin ? pin->c() : NULL; + if (pin) { + invariant(cursor); + invariant(cursor->getExecutor() == exec); + invariant(cursor->isAggCursor()); + } - if (resultsArray.len() + next.objsize() > byteLimit) { - // Get the pipeline proxy stage wrapped by this PlanExecutor. - PipelineProxyStage* proxy = static_cast<PipelineProxyStage*>(exec->getRootStage()); - // too big. next will be the first doc in the second batch - proxy->pushBack(next); - break; - } + const long long defaultBatchSize = 101; // Same as query. + long long batchSize; + uassertStatusOK(Command::parseCommandCursorOptions(cmdObj, defaultBatchSize, &batchSize)); + + // can't use result BSONObjBuilder directly since it won't handle exceptions correctly. + BSONArrayBuilder resultsArray; + const int byteLimit = MaxBytesToReturnToClientAtOnce; + BSONObj next; + for (int objCount = 0; objCount < batchSize; objCount++) { + // The initial getNext() on a PipelineProxyStage may be very expensive so we don't + // do it when batchSize is 0 since that indicates a desire for a fast return. + if (exec->getNext(&next, NULL) != PlanExecutor::ADVANCED) { + // make it an obvious error to use cursor or executor after this point + cursor = NULL; + exec = NULL; + break; + } - resultsArray.append(next); + if (resultsArray.len() + next.objsize() > byteLimit) { + // Get the pipeline proxy stage wrapped by this PlanExecutor. + PipelineProxyStage* proxy = static_cast<PipelineProxyStage*>(exec->getRootStage()); + // too big. next will be the first doc in the second batch + proxy->pushBack(next); + break; } - // NOTE: exec->isEOF() can have side effects such as writing by $out. However, it should - // be relatively quick since if there was no pin then the input is empty. Also, this - // violates the contract for batchSize==0. Sharding requires a cursor to be returned in that - // case. This is ok for now however, since you can't have a sharded collection that doesn't - // exist. - const bool canReturnMoreBatches = pin; - if (!canReturnMoreBatches && exec && !exec->isEOF()) { - // msgasserting since this shouldn't be possible to trigger from today's aggregation - // language. The wording assumes that the only reason pin would be null is if the - // collection doesn't exist. - msgasserted(17391, str::stream() - << "Aggregation has more results than fit in initial batch, but can't " - << "create cursor since collection " << ns << " doesn't exist"); + resultsArray.append(next); + } + + // NOTE: exec->isEOF() can have side effects such as writing by $out. However, it should + // be relatively quick since if there was no pin then the input is empty. Also, this + // violates the contract for batchSize==0. Sharding requires a cursor to be returned in that + // case. This is ok for now however, since you can't have a sharded collection that doesn't + // exist. + const bool canReturnMoreBatches = pin; + if (!canReturnMoreBatches && exec && !exec->isEOF()) { + // msgasserting since this shouldn't be possible to trigger from today's aggregation + // language. The wording assumes that the only reason pin would be null is if the + // collection doesn't exist. + msgasserted( + 17391, + str::stream() << "Aggregation has more results than fit in initial batch, but can't " + << "create cursor since collection " << ns << " doesn't exist"); + } + + if (cursor) { + // If a time limit was set on the pipeline, remaining time is "rolled over" to the + // cursor (for use by future getmore ops). + cursor->setLeftoverMaxTimeMicros(CurOp::get(txn)->getRemainingMaxTimeMicros()); + + CurOp::get(txn)->debug().cursorid = cursor->cursorid(); + + if (txn->getClient()->isInDirectClient()) { + cursor->setUnownedRecoveryUnit(txn->recoveryUnit()); + } else { + // We stash away the RecoveryUnit in the ClientCursor. It's used for subsequent + // getMore requests. The calling OpCtx gets a fresh RecoveryUnit. + txn->recoveryUnit()->abandonSnapshot(); + cursor->setOwnedRecoveryUnit(txn->releaseRecoveryUnit()); + StorageEngine* storageEngine = getGlobalServiceContext()->getGlobalStorageEngine(); + invariant(txn->setRecoveryUnit(storageEngine->newRecoveryUnit(), + OperationContext::kNotInUnitOfWork) == + OperationContext::kNotInUnitOfWork); } - if (cursor) { - // If a time limit was set on the pipeline, remaining time is "rolled over" to the - // cursor (for use by future getmore ops). - cursor->setLeftoverMaxTimeMicros( CurOp::get(txn)->getRemainingMaxTimeMicros() ); + // Cursor needs to be in a saved state while we yield locks for getmore. State + // will be restored in getMore(). + exec->saveState(); + } - CurOp::get(txn)->debug().cursorid = cursor->cursorid(); + const long long cursorId = cursor ? cursor->cursorid() : 0LL; + appendCursorResponseObject(cursorId, ns, resultsArray.arr(), &result); - if (txn->getClient()->isInDirectClient()) { - cursor->setUnownedRecoveryUnit(txn->recoveryUnit()); - } - else { - // We stash away the RecoveryUnit in the ClientCursor. It's used for subsequent - // getMore requests. The calling OpCtx gets a fresh RecoveryUnit. - txn->recoveryUnit()->abandonSnapshot(); - cursor->setOwnedRecoveryUnit(txn->releaseRecoveryUnit()); - StorageEngine* storageEngine = getGlobalServiceContext()->getGlobalStorageEngine(); - invariant(txn->setRecoveryUnit(storageEngine->newRecoveryUnit(), - OperationContext::kNotInUnitOfWork) - == OperationContext::kNotInUnitOfWork); - } + return static_cast<bool>(cursor); +} - // Cursor needs to be in a saved state while we yield locks for getmore. State - // will be restored in getMore(). - exec->saveState(); - } - const long long cursorId = cursor ? cursor->cursorid() : 0LL; - appendCursorResponseObject(cursorId, ns, resultsArray.arr(), &result); +class PipelineCommand : public Command { +public: + PipelineCommand() : Command(Pipeline::commandName) {} // command is called "aggregate" - return static_cast<bool>(cursor); + // Locks are managed manually, in particular by DocumentSourceCursor. + virtual bool isWriteCommandForConfigServer() const { + return false; + } + virtual bool slaveOk() const { + return false; + } + virtual bool slaveOverrideOk() const { + return true; + } + virtual void help(stringstream& help) const { + help << "{ pipeline: [ { $operator: {...}}, ... ]" + << ", explain: <bool>" + << ", allowDiskUse: <bool>" + << ", cursor: {batchSize: <number>}" + << " }" << endl + << "See http://dochub.mongodb.org/core/aggregation for more details."; } + virtual void addRequiredPrivileges(const std::string& dbname, + const BSONObj& cmdObj, + std::vector<Privilege>* out) { + Pipeline::addRequiredPrivileges(this, dbname, cmdObj, out); + } - class PipelineCommand : - public Command { - public: - PipelineCommand() :Command(Pipeline::commandName) {} // command is called "aggregate" - - // Locks are managed manually, in particular by DocumentSourceCursor. - virtual bool isWriteCommandForConfigServer() const { return false; } - virtual bool slaveOk() const { return false; } - virtual bool slaveOverrideOk() const { return true; } - virtual void help(stringstream &help) const { - help << "{ pipeline: [ { $operator: {...}}, ... ]" - << ", explain: <bool>" - << ", allowDiskUse: <bool>" - << ", cursor: {batchSize: <number>}" - << " }" - << endl - << "See http://dochub.mongodb.org/core/aggregation for more details." - ; + virtual bool run(OperationContext* txn, + const string& db, + BSONObj& cmdObj, + int options, + string& errmsg, + BSONObjBuilder& result) { + const std::string ns = parseNs(db, cmdObj); + if (nsToCollectionSubstring(ns).empty()) { + errmsg = "missing collection name"; + return false; } - - virtual void addRequiredPrivileges(const std::string& dbname, - const BSONObj& cmdObj, - std::vector<Privilege>* out) { - Pipeline::addRequiredPrivileges(this, dbname, cmdObj, out); + NamespaceString nss(ns); + + intrusive_ptr<ExpressionContext> pCtx = new ExpressionContext(txn, nss); + pCtx->tempDir = storageGlobalParams.dbpath + "/_tmp"; + + /* try to parse the command; if this fails, then we didn't run */ + intrusive_ptr<Pipeline> pPipeline = Pipeline::parseCommand(errmsg, cmdObj, pCtx); + if (!pPipeline.get()) + return false; + + // This is outside of the if block to keep the object alive until the pipeline is finished. + BSONObj parsed; + if (kDebugBuild && !pPipeline->isExplain() && !pCtx->inShard) { + // Make sure all operations round-trip through Pipeline::toBson() correctly by + // reparsing every command in debug builds. This is important because sharded + // aggregations rely on this ability. Skipping when inShard because this has + // already been through the transformation (and this unsets pCtx->inShard). + parsed = pPipeline->serialize().toBson(); + pPipeline = Pipeline::parseCommand(errmsg, parsed, pCtx); + verify(pPipeline); } - virtual bool run(OperationContext* txn, - const string &db, - BSONObj &cmdObj, - int options, - string &errmsg, - BSONObjBuilder &result) { - const std::string ns = parseNs(db, cmdObj); - if (nsToCollectionSubstring(ns).empty()) { - errmsg = "missing collection name"; - return false; + PlanExecutor* exec = NULL; + unique_ptr<ClientCursorPin> pin; // either this OR the execHolder will be non-null + unique_ptr<PlanExecutor> execHolder; + { + // This will throw if the sharding version for this connection is out of date. The + // lock must be held continuously from now until we have we created both the output + // ClientCursor and the input executor. This ensures that both are using the same + // sharding version that we synchronize on here. This is also why we always need to + // create a ClientCursor even when we aren't outputting to a cursor. See the comment + // on ShardFilterStage for more details. + AutoGetCollectionForRead ctx(txn, nss.ns()); + + Collection* collection = ctx.getCollection(); + + // This does mongod-specific stuff like creating the input PlanExecutor and adding + // it to the front of the pipeline if needed. + std::shared_ptr<PlanExecutor> input = + PipelineD::prepareCursorSource(txn, collection, pPipeline, pCtx); + pPipeline->stitch(); + + // Create the PlanExecutor which returns results from the pipeline. The WorkingSet + // ('ws') and the PipelineProxyStage ('proxy') will be owned by the created + // PlanExecutor. + unique_ptr<WorkingSet> ws(new WorkingSet()); + unique_ptr<PipelineProxyStage> proxy( + new PipelineProxyStage(pPipeline, input, ws.get())); + Status execStatus = Status::OK(); + if (NULL == collection) { + execStatus = PlanExecutor::make(txn, + ws.release(), + proxy.release(), + nss.ns(), + PlanExecutor::YIELD_MANUAL, + &exec); + } else { + execStatus = PlanExecutor::make(txn, + ws.release(), + proxy.release(), + collection, + PlanExecutor::YIELD_MANUAL, + &exec); } - NamespaceString nss(ns); - - intrusive_ptr<ExpressionContext> pCtx = new ExpressionContext(txn, nss); - pCtx->tempDir = storageGlobalParams.dbpath + "/_tmp"; - - /* try to parse the command; if this fails, then we didn't run */ - intrusive_ptr<Pipeline> pPipeline = Pipeline::parseCommand(errmsg, cmdObj, pCtx); - if (!pPipeline.get()) - return false; - - // This is outside of the if block to keep the object alive until the pipeline is finished. - BSONObj parsed; - if (kDebugBuild && !pPipeline->isExplain() && !pCtx->inShard) { - // Make sure all operations round-trip through Pipeline::toBson() correctly by - // reparsing every command in debug builds. This is important because sharded - // aggregations rely on this ability. Skipping when inShard because this has - // already been through the transformation (and this unsets pCtx->inShard). - parsed = pPipeline->serialize().toBson(); - pPipeline = Pipeline::parseCommand(errmsg, parsed, pCtx); - verify(pPipeline); + invariant(execStatus.isOK()); + execHolder.reset(exec); + + if (!collection && input) { + // If we don't have a collection, we won't be able to register any executors, so + // make sure that the input PlanExecutor (likely wrapping an EOFStage) doesn't + // need to be registered. + invariant(!input->collection()); } - PlanExecutor* exec = NULL; - unique_ptr<ClientCursorPin> pin; // either this OR the execHolder will be non-null - unique_ptr<PlanExecutor> execHolder; - { - // This will throw if the sharding version for this connection is out of date. The - // lock must be held continuously from now until we have we created both the output - // ClientCursor and the input executor. This ensures that both are using the same - // sharding version that we synchronize on here. This is also why we always need to - // create a ClientCursor even when we aren't outputting to a cursor. See the comment - // on ShardFilterStage for more details. - AutoGetCollectionForRead ctx(txn, nss.ns()); - - Collection* collection = ctx.getCollection(); - - // This does mongod-specific stuff like creating the input PlanExecutor and adding - // it to the front of the pipeline if needed. - std::shared_ptr<PlanExecutor> input = PipelineD::prepareCursorSource(txn, - collection, - pPipeline, - pCtx); - pPipeline->stitch(); - - // Create the PlanExecutor which returns results from the pipeline. The WorkingSet - // ('ws') and the PipelineProxyStage ('proxy') will be owned by the created - // PlanExecutor. - unique_ptr<WorkingSet> ws(new WorkingSet()); - unique_ptr<PipelineProxyStage> proxy( - new PipelineProxyStage(pPipeline, input, ws.get())); - Status execStatus = Status::OK(); - if (NULL == collection) { - execStatus = PlanExecutor::make(txn, - ws.release(), - proxy.release(), - nss.ns(), - PlanExecutor::YIELD_MANUAL, - &exec); - } - else { - execStatus = PlanExecutor::make(txn, - ws.release(), - proxy.release(), - collection, - PlanExecutor::YIELD_MANUAL, - &exec); - } - invariant(execStatus.isOK()); - execHolder.reset(exec); - - if (!collection && input) { - // If we don't have a collection, we won't be able to register any executors, so - // make sure that the input PlanExecutor (likely wrapping an EOFStage) doesn't - // need to be registered. - invariant(!input->collection()); - } - - if (collection) { - const bool isAggCursor = true; // enable special locking behavior - ClientCursor* cursor = new ClientCursor(collection->getCursorManager(), - execHolder.release(), - nss.ns(), - 0, - cmdObj.getOwned(), - isAggCursor); - pin.reset(new ClientCursorPin(collection->getCursorManager(), - cursor->cursorid())); - // Don't add any code between here and the start of the try block. - } - - // At this point, it is safe to release the collection lock. - // - In the case where we have a collection: we will need to reacquire the - // collection lock later when cleaning up our ClientCursorPin. - // - In the case where we don't have a collection: our PlanExecutor won't be - // registered, so it will be safe to clean it up outside the lock. - invariant(NULL == execHolder.get() || NULL == execHolder->collection()); + if (collection) { + const bool isAggCursor = true; // enable special locking behavior + ClientCursor* cursor = new ClientCursor(collection->getCursorManager(), + execHolder.release(), + nss.ns(), + 0, + cmdObj.getOwned(), + isAggCursor); + pin.reset(new ClientCursorPin(collection->getCursorManager(), cursor->cursorid())); + // Don't add any code between here and the start of the try block. } - try { - // Unless set to true, the ClientCursor created above will be deleted on block exit. - bool keepCursor = false; + // At this point, it is safe to release the collection lock. + // - In the case where we have a collection: we will need to reacquire the + // collection lock later when cleaning up our ClientCursorPin. + // - In the case where we don't have a collection: our PlanExecutor won't be + // registered, so it will be safe to clean it up outside the lock. + invariant(NULL == execHolder.get() || NULL == execHolder->collection()); + } - const bool isCursorCommand = !cmdObj["cursor"].eoo(); + try { + // Unless set to true, the ClientCursor created above will be deleted on block exit. + bool keepCursor = false; - // If both explain and cursor are specified, explain wins. - if (pPipeline->isExplain()) { - result << "stages" << Value(pPipeline->writeExplainOps()); - } - else if (isCursorCommand) { - keepCursor = handleCursorCommand(txn, - nss.ns(), - pin.get(), - exec, - cmdObj, - result); - } - else { - pPipeline->run(result); - } + const bool isCursorCommand = !cmdObj["cursor"].eoo(); - // Clean up our ClientCursorPin, if needed. We must reacquire the collection lock - // in order to do so. - if (pin) { - // We acquire locks here with DBLock and CollectionLock instead of using - // AutoGetCollectionForRead. AutoGetCollectionForRead will throw if the - // sharding version is out of date, and we don't care if the sharding version - // has changed. - Lock::DBLock dbLock(txn->lockState(), nss.db(), MODE_IS); - Lock::CollectionLock collLock(txn->lockState(), nss.ns(), MODE_IS); - if (keepCursor) { - pin->release(); - } - else { - pin->deleteUnderlying(); - } - } + // If both explain and cursor are specified, explain wins. + if (pPipeline->isExplain()) { + result << "stages" << Value(pPipeline->writeExplainOps()); + } else if (isCursorCommand) { + keepCursor = handleCursorCommand(txn, nss.ns(), pin.get(), exec, cmdObj, result); + } else { + pPipeline->run(result); } - catch (...) { - // On our way out of scope, we clean up our ClientCursorPin if needed. - if (pin) { - Lock::DBLock dbLock(txn->lockState(), nss.db(), MODE_IS); - Lock::CollectionLock collLock(txn->lockState(), nss.ns(), MODE_IS); + + // Clean up our ClientCursorPin, if needed. We must reacquire the collection lock + // in order to do so. + if (pin) { + // We acquire locks here with DBLock and CollectionLock instead of using + // AutoGetCollectionForRead. AutoGetCollectionForRead will throw if the + // sharding version is out of date, and we don't care if the sharding version + // has changed. + Lock::DBLock dbLock(txn->lockState(), nss.db(), MODE_IS); + Lock::CollectionLock collLock(txn->lockState(), nss.ns(), MODE_IS); + if (keepCursor) { + pin->release(); + } else { pin->deleteUnderlying(); } - throw; } - // Any code that needs the cursor pinned must be inside the try block, above. - - return true; + } catch (...) { + // On our way out of scope, we clean up our ClientCursorPin if needed. + if (pin) { + Lock::DBLock dbLock(txn->lockState(), nss.db(), MODE_IS); + Lock::CollectionLock collLock(txn->lockState(), nss.ns(), MODE_IS); + pin->deleteUnderlying(); + } + throw; } - } cmdPipeline; + // Any code that needs the cursor pinned must be inside the try block, above. + + return true; + } +} cmdPipeline; -} // namespace mongo +} // namespace mongo |