diff options
author | Mark Benvenuto <mark.benvenuto@mongodb.com> | 2015-06-20 00:22:50 -0400 |
---|---|---|
committer | Mark Benvenuto <mark.benvenuto@mongodb.com> | 2015-06-20 10:56:02 -0400 |
commit | 9c2ed42daa8fbbef4a919c21ec564e2db55e8d60 (patch) | |
tree | 3814f79c10d7b490948d8cb7b112ac1dd41ceff1 /src/mongo/db/commands/getmore_cmd.cpp | |
parent | 01965cf52bce6976637ecb8f4a622aeb05ab256a (diff) | |
download | mongo-9c2ed42daa8fbbef4a919c21ec564e2db55e8d60.tar.gz |
SERVER-18579: Clang-Format - reformat code, no comment reflow
Diffstat (limited to 'src/mongo/db/commands/getmore_cmd.cpp')
-rw-r--r-- | src/mongo/db/commands/getmore_cmd.cpp | 590 |
1 files changed, 301 insertions, 289 deletions
diff --git a/src/mongo/db/commands/getmore_cmd.cpp b/src/mongo/db/commands/getmore_cmd.cpp index e075fbd047e..23805bf1123 100644 --- a/src/mongo/db/commands/getmore_cmd.cpp +++ b/src/mongo/db/commands/getmore_cmd.cpp @@ -57,351 +57,363 @@ namespace mongo { - /** - * A command for running getMore() against an existing cursor registered with a CursorManager. - * Used to generate the next batch of results for a ClientCursor. - * - * Can be used in combination with any cursor-generating command (e.g. find, aggregate, - * listIndexes). - */ - class GetMoreCmd : public Command { - MONGO_DISALLOW_COPYING(GetMoreCmd); - public: - GetMoreCmd() : Command("getMore") { } +/** + * A command for running getMore() against an existing cursor registered with a CursorManager. + * Used to generate the next batch of results for a ClientCursor. + * + * Can be used in combination with any cursor-generating command (e.g. find, aggregate, + * listIndexes). + */ +class GetMoreCmd : public Command { + MONGO_DISALLOW_COPYING(GetMoreCmd); - bool isWriteCommandForConfigServer() const override { return false; } +public: + GetMoreCmd() : Command("getMore") {} - bool slaveOk() const override { return false; } + bool isWriteCommandForConfigServer() const override { + return false; + } - bool slaveOverrideOk() const override { return true; } + bool slaveOk() const override { + return false; + } - bool maintenanceOk() const override { return false; } + bool slaveOverrideOk() const override { + return true; + } - bool adminOnly() const override { return false; } + bool maintenanceOk() const override { + return false; + } - void help(std::stringstream& help) const override { - help << "retrieve more results from an existing cursor"; - } + bool adminOnly() const override { + return false; + } - /** - * A getMore command increments the getMore counter, not the command counter. - */ - bool shouldAffectCommandCounter() const override { return false; } + void help(std::stringstream& help) const override { + help << "retrieve more results from an existing cursor"; + } - std::string parseNs(const std::string& dbname, const BSONObj& cmdObj) const override { - return GetMoreRequest::parseNs(dbname, cmdObj); + /** + * A getMore command increments the getMore counter, not the command counter. + */ + bool shouldAffectCommandCounter() const override { + return false; + } + + std::string parseNs(const std::string& dbname, const BSONObj& cmdObj) const override { + return GetMoreRequest::parseNs(dbname, cmdObj); + } + + Status checkAuthForCommand(ClientBasic* client, + const std::string& dbname, + const BSONObj& cmdObj) override { + StatusWith<GetMoreRequest> parseStatus = GetMoreRequest::parseFromBSON(dbname, cmdObj); + if (!parseStatus.isOK()) { + return parseStatus.getStatus(); + } + const GetMoreRequest& request = parseStatus.getValue(); + + return AuthorizationSession::get(client) + ->checkAuthForGetMore(request.nss, request.cursorid); + } + + bool run(OperationContext* txn, + const std::string& dbname, + BSONObj& cmdObj, + int options, + std::string& errmsg, + BSONObjBuilder& result) override { + // Counted as a getMore, not as a command. + globalOpCounters.gotGetMore(); + + if (txn->getClient()->isInDirectClient()) { + return appendCommandStatus( + result, + Status(ErrorCodes::IllegalOperation, "Cannot run getMore command from eval()")); } - Status checkAuthForCommand(ClientBasic* client, - const std::string& dbname, - const BSONObj& cmdObj) override { - StatusWith<GetMoreRequest> parseStatus = GetMoreRequest::parseFromBSON(dbname, cmdObj); - if (!parseStatus.isOK()) { - return parseStatus.getStatus(); + StatusWith<GetMoreRequest> parseStatus = GetMoreRequest::parseFromBSON(dbname, cmdObj); + if (!parseStatus.isOK()) { + return appendCommandStatus(result, parseStatus.getStatus()); + } + const GetMoreRequest& request = parseStatus.getValue(); + + // Depending on the type of cursor being operated on, we hold locks for the whole + // getMore, or none of the getMore, or part of the getMore. The three cases in detail: + // + // 1) Normal cursor: we lock with "ctx" and hold it for the whole getMore. + // 2) Cursor owned by global cursor manager: we don't lock anything. These cursors + // don't own any collection state. + // 3) Agg cursor: we lock with "ctx", then release, then relock with "unpinDBLock" and + // "unpinCollLock". This is because agg cursors handle locking internally (hence the + // release), but the pin and unpin of the cursor must occur under the collection + // lock. We don't use our AutoGetCollectionForRead "ctx" to relock, because + // AutoGetCollectionForRead checks the sharding version (and we want the relock for + // the unpin to succeed even if the sharding version has changed). + // + // Note that we declare our locks before our ClientCursorPin, in order to ensure that + // the pin's destructor is called before the lock destructors (so that the unpin occurs + // under the lock). + std::unique_ptr<AutoGetCollectionForRead> ctx; + std::unique_ptr<Lock::DBLock> unpinDBLock; + std::unique_ptr<Lock::CollectionLock> unpinCollLock; + + CursorManager* cursorManager; + CursorManager* globalCursorManager = CursorManager::getGlobalCursorManager(); + if (globalCursorManager->ownsCursorId(request.cursorid)) { + cursorManager = globalCursorManager; + } else { + ctx.reset(new AutoGetCollectionForRead(txn, request.nss)); + Collection* collection = ctx->getCollection(); + if (!collection) { + return appendCommandStatus(result, + Status(ErrorCodes::OperationFailed, + "collection dropped between getMore calls")); } - const GetMoreRequest& request = parseStatus.getValue(); + cursorManager = collection->getCursorManager(); + } - return AuthorizationSession::get(client)->checkAuthForGetMore(request.nss, - request.cursorid); + ClientCursorPin ccPin(cursorManager, request.cursorid); + ClientCursor* cursor = ccPin.c(); + if (!cursor) { + // We didn't find the cursor. + return appendCommandStatus( + result, + Status(ErrorCodes::CursorNotFound, + str::stream() << "Cursor not found, cursor id: " << request.cursorid)); } - bool run(OperationContext* txn, - const std::string& dbname, - BSONObj& cmdObj, - int options, - std::string& errmsg, - BSONObjBuilder& result) override { - // Counted as a getMore, not as a command. - globalOpCounters.gotGetMore(); + if (request.nss.ns() != cursor->ns()) { + return appendCommandStatus( + result, + Status(ErrorCodes::Unauthorized, + str::stream() << "Requested getMore on namespace '" << request.nss.ns() + << "', but cursor belongs to a different namespace")); + } - if (txn->getClient()->isInDirectClient()) { - return appendCommandStatus(result, - Status(ErrorCodes::IllegalOperation, - "Cannot run getMore command from eval()")); - } + const bool hasOwnMaxTime = CurOp::get(txn)->isMaxTimeSet(); - StatusWith<GetMoreRequest> parseStatus = GetMoreRequest::parseFromBSON(dbname, cmdObj); - if (!parseStatus.isOK()) { - return appendCommandStatus(result, parseStatus.getStatus()); - } - const GetMoreRequest& request = parseStatus.getValue(); - - // Depending on the type of cursor being operated on, we hold locks for the whole - // getMore, or none of the getMore, or part of the getMore. The three cases in detail: - // - // 1) Normal cursor: we lock with "ctx" and hold it for the whole getMore. - // 2) Cursor owned by global cursor manager: we don't lock anything. These cursors - // don't own any collection state. - // 3) Agg cursor: we lock with "ctx", then release, then relock with "unpinDBLock" and - // "unpinCollLock". This is because agg cursors handle locking internally (hence the - // release), but the pin and unpin of the cursor must occur under the collection - // lock. We don't use our AutoGetCollectionForRead "ctx" to relock, because - // AutoGetCollectionForRead checks the sharding version (and we want the relock for - // the unpin to succeed even if the sharding version has changed). - // - // Note that we declare our locks before our ClientCursorPin, in order to ensure that - // the pin's destructor is called before the lock destructors (so that the unpin occurs - // under the lock). - std::unique_ptr<AutoGetCollectionForRead> ctx; - std::unique_ptr<Lock::DBLock> unpinDBLock; - std::unique_ptr<Lock::CollectionLock> unpinCollLock; - - CursorManager* cursorManager; - CursorManager* globalCursorManager = CursorManager::getGlobalCursorManager(); - if (globalCursorManager->ownsCursorId(request.cursorid)) { - cursorManager = globalCursorManager; - } - else { - ctx.reset(new AutoGetCollectionForRead(txn, request.nss)); - Collection* collection = ctx->getCollection(); - if (!collection) { - return appendCommandStatus(result, - Status(ErrorCodes::OperationFailed, - "collection dropped between getMore calls")); - } - cursorManager = collection->getCursorManager(); - } + // Validation related to awaitData. + if (isCursorAwaitData(cursor)) { + invariant(isCursorTailable(cursor)); - ClientCursorPin ccPin(cursorManager, request.cursorid); - ClientCursor* cursor = ccPin.c(); - if (!cursor) { - // We didn't find the cursor. - return appendCommandStatus(result, Status(ErrorCodes::CursorNotFound, str::stream() - << "Cursor not found, cursor id: " << request.cursorid)); + if (!hasOwnMaxTime) { + Status status(ErrorCodes::BadValue, + str::stream() << "Must set maxTimeMS on a getMore if the initial " + << "query had 'awaitData' set: " << cmdObj); + return appendCommandStatus(result, status); } - if (request.nss.ns() != cursor->ns()) { - return appendCommandStatus(result, Status(ErrorCodes::Unauthorized, str::stream() - << "Requested getMore on namespace '" << request.nss.ns() - << "', but cursor belongs to a different namespace")); + if (cursor->isAggCursor()) { + Status status(ErrorCodes::BadValue, + "awaitData cannot be set on an aggregation cursor"); + return appendCommandStatus(result, status); } + } - const bool hasOwnMaxTime = CurOp::get(txn)->isMaxTimeSet(); - - // Validation related to awaitData. - if (isCursorAwaitData(cursor)) { - invariant(isCursorTailable(cursor)); + // On early return, get rid of the cursor. + ScopeGuard cursorFreer = MakeGuard(&GetMoreCmd::cleanupCursor, txn, &ccPin, request); - if (!hasOwnMaxTime) { - Status status(ErrorCodes::BadValue, - str::stream() << "Must set maxTimeMS on a getMore if the initial " - << "query had 'awaitData' set: " << cmdObj); - return appendCommandStatus(result, status); - } + if (!cursor->hasRecoveryUnit()) { + // Start using a new RecoveryUnit. + cursor->setOwnedRecoveryUnit( + getGlobalServiceContext()->getGlobalStorageEngine()->newRecoveryUnit()); + } - if (cursor->isAggCursor()) { - Status status(ErrorCodes::BadValue, - "awaitData cannot be set on an aggregation cursor"); - return appendCommandStatus(result, status); - } - } + // Swap RecoveryUnit(s) between the ClientCursor and OperationContext. + ScopedRecoveryUnitSwapper ruSwapper(cursor, txn); - // On early return, get rid of the cursor. - ScopeGuard cursorFreer = MakeGuard(&GetMoreCmd::cleanupCursor, txn, &ccPin, request); + // Reset timeout timer on the cursor since the cursor is still in use. + cursor->setIdleTime(0); - if (!cursor->hasRecoveryUnit()) { - // Start using a new RecoveryUnit. - cursor->setOwnedRecoveryUnit( - getGlobalServiceContext()->getGlobalStorageEngine()->newRecoveryUnit()); - } + // If there is no time limit set directly on this getMore command, but the operation + // that spawned this cursor had a time limit set, then we have to apply any leftover + // time to this getMore. + if (!hasOwnMaxTime) { + CurOp::get(txn)->setMaxTimeMicros(cursor->getLeftoverMaxTimeMicros()); + } + txn->checkForInterrupt(); // May trigger maxTimeAlwaysTimeOut fail point. - // Swap RecoveryUnit(s) between the ClientCursor and OperationContext. - ScopedRecoveryUnitSwapper ruSwapper(cursor, txn); + if (cursor->isAggCursor()) { + // Agg cursors handle their own locking internally. + ctx.reset(); // unlocks + } - // Reset timeout timer on the cursor since the cursor is still in use. - cursor->setIdleTime(0); + PlanExecutor* exec = cursor->getExecutor(); + exec->restoreState(txn); - // If there is no time limit set directly on this getMore command, but the operation - // that spawned this cursor had a time limit set, then we have to apply any leftover - // time to this getMore. - if (!hasOwnMaxTime) { - CurOp::get(txn)->setMaxTimeMicros(cursor->getLeftoverMaxTimeMicros()); - } - txn->checkForInterrupt(); // May trigger maxTimeAlwaysTimeOut fail point. + // If we're tailing a capped collection, retrieve a monotonically increasing insert + // counter. + uint64_t lastInsertCount = 0; + if (isCursorAwaitData(cursor)) { + invariant(ctx->getCollection()->isCapped()); + lastInsertCount = ctx->getCollection()->getCappedInsertNotifier()->getCount(); + } - if (cursor->isAggCursor()) { - // Agg cursors handle their own locking internally. - ctx.reset(); // unlocks - } + CursorId respondWithId = 0; + BSONArrayBuilder nextBatch; + BSONObj obj; + PlanExecutor::ExecState state; + int numResults = 0; + Status batchStatus = generateBatch(cursor, request, &nextBatch, &state, &numResults); + if (!batchStatus.isOK()) { + return appendCommandStatus(result, batchStatus); + } - PlanExecutor* exec = cursor->getExecutor(); + // If this is an await data cursor, and we hit EOF without generating any results, then + // we block waiting for new oplog data to arrive. + if (isCursorAwaitData(cursor) && state == PlanExecutor::IS_EOF && numResults == 0) { + // Retrieve the notifier which we will wait on until new data arrives. We make sure + // to do this in the lock because once we drop the lock it is possible for the + // collection to become invalid. The notifier itself will outlive the collection if + // the collection is dropped, as we keep a shared_ptr to it. + auto notifier = ctx->getCollection()->getCappedInsertNotifier(); + + // Save the PlanExecutor and drop our locks. + exec->saveState(); + ctx.reset(); + + // Block waiting for data. + Microseconds timeout(CurOp::get(txn)->getRemainingMaxTimeMicros()); + notifier->waitForInsert(lastInsertCount, timeout); + notifier.reset(); + + ctx.reset(new AutoGetCollectionForRead(txn, request.nss)); exec->restoreState(txn); - // If we're tailing a capped collection, retrieve a monotonically increasing insert - // counter. - uint64_t lastInsertCount = 0; - if (isCursorAwaitData(cursor)) { - invariant(ctx->getCollection()->isCapped()); - lastInsertCount = ctx->getCollection()->getCappedInsertNotifier()->getCount(); - } - - CursorId respondWithId = 0; - BSONArrayBuilder nextBatch; - BSONObj obj; - PlanExecutor::ExecState state; - int numResults = 0; - Status batchStatus = generateBatch(cursor, request, &nextBatch, &state, &numResults); + // We woke up because either the timed_wait expired, or there was more data. Either + // way, attempt to generate another batch of results. + batchStatus = generateBatch(cursor, request, &nextBatch, &state, &numResults); if (!batchStatus.isOK()) { return appendCommandStatus(result, batchStatus); } + } - // If this is an await data cursor, and we hit EOF without generating any results, then - // we block waiting for new oplog data to arrive. - if (isCursorAwaitData(cursor) && state == PlanExecutor::IS_EOF && numResults == 0) { - // Retrieve the notifier which we will wait on until new data arrives. We make sure - // to do this in the lock because once we drop the lock it is possible for the - // collection to become invalid. The notifier itself will outlive the collection if - // the collection is dropped, as we keep a shared_ptr to it. - auto notifier = ctx->getCollection()->getCappedInsertNotifier(); - - // Save the PlanExecutor and drop our locks. - exec->saveState(); - ctx.reset(); - - // Block waiting for data. - Microseconds timeout(CurOp::get(txn)->getRemainingMaxTimeMicros()); - notifier->waitForInsert(lastInsertCount, timeout); - notifier.reset(); - - ctx.reset(new AutoGetCollectionForRead(txn, request.nss)); - exec->restoreState(txn); - - // We woke up because either the timed_wait expired, or there was more data. Either - // way, attempt to generate another batch of results. - batchStatus = generateBatch(cursor, request, &nextBatch, &state, &numResults); - if (!batchStatus.isOK()) { - return appendCommandStatus(result, batchStatus); - } - } - - if (shouldSaveCursorGetMore(state, exec, isCursorTailable(cursor))) { - respondWithId = request.cursorid; + if (shouldSaveCursorGetMore(state, exec, isCursorTailable(cursor))) { + respondWithId = request.cursorid; - exec->saveState(); + exec->saveState(); - // If maxTimeMS was set directly on the getMore rather than being rolled over - // from a previous find, then don't roll remaining micros over to the next - // getMore. - if (!hasOwnMaxTime) { - cursor->setLeftoverMaxTimeMicros(CurOp::get(txn)->getRemainingMaxTimeMicros()); - } + // If maxTimeMS was set directly on the getMore rather than being rolled over + // from a previous find, then don't roll remaining micros over to the next + // getMore. + if (!hasOwnMaxTime) { + cursor->setLeftoverMaxTimeMicros(CurOp::get(txn)->getRemainingMaxTimeMicros()); + } - cursor->incPos(numResults); + cursor->incPos(numResults); - if (isCursorTailable(cursor) && state == PlanExecutor::IS_EOF) { - // Rather than swapping their existing RU into the client cursor, tailable - // cursors should get a new recovery unit. - ruSwapper.dismiss(); - } - } - else { - CurOp::get(txn)->debug().cursorExhausted = true; + if (isCursorTailable(cursor) && state == PlanExecutor::IS_EOF) { + // Rather than swapping their existing RU into the client cursor, tailable + // cursors should get a new recovery unit. + ruSwapper.dismiss(); } + } else { + CurOp::get(txn)->debug().cursorExhausted = true; + } - appendGetMoreResponseObject(respondWithId, request.nss.ns(), nextBatch.arr(), &result); + appendGetMoreResponseObject(respondWithId, request.nss.ns(), nextBatch.arr(), &result); - if (respondWithId) { - cursorFreer.Dismiss(); + if (respondWithId) { + cursorFreer.Dismiss(); - // If we are operating on an aggregation cursor, then we dropped our collection lock - // earlier and need to reacquire it in order to clean up our ClientCursorPin. - if (cursor->isAggCursor()) { - invariant(NULL == ctx.get()); - unpinDBLock.reset( - new Lock::DBLock(txn->lockState(), request.nss.db(), MODE_IS)); - unpinCollLock.reset( - new Lock::CollectionLock(txn->lockState(), request.nss.ns(), MODE_IS)); - } + // If we are operating on an aggregation cursor, then we dropped our collection lock + // earlier and need to reacquire it in order to clean up our ClientCursorPin. + if (cursor->isAggCursor()) { + invariant(NULL == ctx.get()); + unpinDBLock.reset(new Lock::DBLock(txn->lockState(), request.nss.db(), MODE_IS)); + unpinCollLock.reset( + new Lock::CollectionLock(txn->lockState(), request.nss.ns(), MODE_IS)); } - - return true; } - /** - * Uses 'cursor' and 'request' to fill out 'nextBatch' with the batch of result documents to - * be returned by this getMore. - * - * Returns the number of documents in the batch in *numResults, which must be initialized to - * zero by the caller. Returns the final ExecState returned by the cursor in *state. - * - * Returns an OK status if the batch was successfully generated, and a non-OK status if the - * PlanExecutor encounters a failure. - */ - Status generateBatch(ClientCursor* cursor, - const GetMoreRequest& request, - BSONArrayBuilder* nextBatch, - PlanExecutor::ExecState* state, - int* numResults) { - PlanExecutor* exec = cursor->getExecutor(); - const bool isAwaitData = isCursorAwaitData(cursor); - - // If an awaitData getMore is killed during this process due to our max time expiring at - // an interrupt point, we just continue as normal and return rather than reporting a - // timeout to the user. - BSONObj obj; - try { - while (PlanExecutor::ADVANCED == (*state = exec->getNext(&obj, NULL))) { - // If adding this object will cause us to exceed the BSON size limit, then we - // stash it for later. - if (nextBatch->len() + obj.objsize() > BSONObjMaxUserSize && *numResults > 0) { - exec->enqueue(obj); - break; - } - - // Add result to output buffer. - nextBatch->append(obj); - (*numResults)++; - - if (enoughForGetMore(request.batchSize.value_or(0), - *numResults, nextBatch->len())) { - break; - } - } - } - catch (const UserException& except) { - if (isAwaitData && except.getCode() == ErrorCodes::ExceededTimeLimit) { - // We ignore exceptions from interrupt points due to max time expiry for - // awaitData cursors. - } - else { - throw; + return true; + } + + /** + * Uses 'cursor' and 'request' to fill out 'nextBatch' with the batch of result documents to + * be returned by this getMore. + * + * Returns the number of documents in the batch in *numResults, which must be initialized to + * zero by the caller. Returns the final ExecState returned by the cursor in *state. + * + * Returns an OK status if the batch was successfully generated, and a non-OK status if the + * PlanExecutor encounters a failure. + */ + Status generateBatch(ClientCursor* cursor, + const GetMoreRequest& request, + BSONArrayBuilder* nextBatch, + PlanExecutor::ExecState* state, + int* numResults) { + PlanExecutor* exec = cursor->getExecutor(); + const bool isAwaitData = isCursorAwaitData(cursor); + + // If an awaitData getMore is killed during this process due to our max time expiring at + // an interrupt point, we just continue as normal and return rather than reporting a + // timeout to the user. + BSONObj obj; + try { + while (PlanExecutor::ADVANCED == (*state = exec->getNext(&obj, NULL))) { + // If adding this object will cause us to exceed the BSON size limit, then we + // stash it for later. + if (nextBatch->len() + obj.objsize() > BSONObjMaxUserSize && *numResults > 0) { + exec->enqueue(obj); + break; } - } - if (PlanExecutor::FAILURE == *state || PlanExecutor::DEAD == *state) { - const std::unique_ptr<PlanStageStats> stats(exec->getStats()); - error() << "GetMore command executor error: " << PlanExecutor::statestr(*state) - << ", stats: " << Explain::statsToBSON(*stats); + // Add result to output buffer. + nextBatch->append(obj); + (*numResults)++; - return Status(ErrorCodes::OperationFailed, - str::stream() << "GetMore command executor error: " - << WorkingSetCommon::toStatusString(obj)); + if (enoughForGetMore( + request.batchSize.value_or(0), *numResults, nextBatch->len())) { + break; + } + } + } catch (const UserException& except) { + if (isAwaitData && except.getCode() == ErrorCodes::ExceededTimeLimit) { + // We ignore exceptions from interrupt points due to max time expiry for + // awaitData cursors. + } else { + throw; } - - return Status::OK(); } - /** - * Called via a ScopeGuard on early return in order to ensure that the ClientCursor gets - * cleaned up properly. - */ - static void cleanupCursor(OperationContext* txn, - ClientCursorPin* ccPin, - const GetMoreRequest& request) { - ClientCursor* cursor = ccPin->c(); + if (PlanExecutor::FAILURE == *state || PlanExecutor::DEAD == *state) { + const std::unique_ptr<PlanStageStats> stats(exec->getStats()); + error() << "GetMore command executor error: " << PlanExecutor::statestr(*state) + << ", stats: " << Explain::statsToBSON(*stats); - std::unique_ptr<Lock::DBLock> unpinDBLock; - std::unique_ptr<Lock::CollectionLock> unpinCollLock; + return Status(ErrorCodes::OperationFailed, + str::stream() << "GetMore command executor error: " + << WorkingSetCommon::toStatusString(obj)); + } - if (cursor->isAggCursor()) { - unpinDBLock.reset(new Lock::DBLock(txn->lockState(), request.nss.db(), MODE_IS)); - unpinCollLock.reset( - new Lock::CollectionLock(txn->lockState(), request.nss.ns(), MODE_IS)); - } + return Status::OK(); + } - ccPin->deleteUnderlying(); + /** + * Called via a ScopeGuard on early return in order to ensure that the ClientCursor gets + * cleaned up properly. + */ + static void cleanupCursor(OperationContext* txn, + ClientCursorPin* ccPin, + const GetMoreRequest& request) { + ClientCursor* cursor = ccPin->c(); + + std::unique_ptr<Lock::DBLock> unpinDBLock; + std::unique_ptr<Lock::CollectionLock> unpinCollLock; + + if (cursor->isAggCursor()) { + unpinDBLock.reset(new Lock::DBLock(txn->lockState(), request.nss.db(), MODE_IS)); + unpinCollLock.reset( + new Lock::CollectionLock(txn->lockState(), request.nss.ns(), MODE_IS)); } - } getMoreCmd; + ccPin->deleteUnderlying(); + } + +} getMoreCmd; -} // namespace mongo +} // namespace mongo |