From 1e8f34fc476705888f7dec8d06c780de4e556988 Mon Sep 17 00:00:00 2001 From: David Storch Date: Thu, 17 Nov 2016 18:11:23 -0500 Subject: SERVER-27065 cleanup ClientCursor, ClientCursorPin, and CursorManager - Makes cursors come into existence pinned. This fixes a race condition in which a cursor could time out in between being constructed/retrieved and being pinned. - Reduces the public interface of ClientCursor. In particular, makes ClientCursor's constructor and destructor private. - Cleans up header file comments in order to more clearly indicate expected usage. --- src/mongo/db/commands/find_cmd.cpp | 25 +++++++------- src/mongo/db/commands/getmore_cmd.cpp | 19 +++++------ src/mongo/db/commands/list_collections.cpp | 11 +++---- src/mongo/db/commands/list_indexes.cpp | 11 +++---- src/mongo/db/commands/parallel_collection_scan.cpp | 18 +++++----- src/mongo/db/commands/pipeline_command.cpp | 38 ++++++++++------------ src/mongo/db/commands/repair_cursor.cpp | 16 ++++----- 7 files changed, 65 insertions(+), 73 deletions(-) (limited to 'src/mongo/db/commands') diff --git a/src/mongo/db/commands/find_cmd.cpp b/src/mongo/db/commands/find_cmd.cpp index e86f3aa9cc6..dc9e5e977cd 100644 --- a/src/mongo/db/commands/find_cmd.cpp +++ b/src/mongo/db/commands/find_cmd.cpp @@ -390,26 +390,25 @@ public: // First unregister the PlanExecutor so it can be re-registered with ClientCursor. exec->deregisterExec(); - // Create a ClientCursor containing this plan executor. We don't have to worry about - // leaking it as it's inserted into a global map by its ctor. - ClientCursor* cursor = - new ClientCursor(collection->getCursorManager(), - exec.release(), - nss.ns(), - txn->recoveryUnit()->isReadingFromMajorityCommittedSnapshot(), - originalQR.getOptions(), - cmdObj.getOwned()); - cursorId = cursor->cursorid(); + // Create a ClientCursor containing this plan executor and register it with the cursor + // manager. + ClientCursorPin pinnedCursor = collection->getCursorManager()->registerCursor( + {exec.release(), + nss.ns(), + txn->recoveryUnit()->isReadingFromMajorityCommittedSnapshot(), + originalQR.getOptions(), + cmdObj.getOwned()}); + cursorId = pinnedCursor.getCursor()->cursorid(); invariant(!exec); - PlanExecutor* cursorExec = cursor->getExecutor(); + PlanExecutor* cursorExec = pinnedCursor.getCursor()->getExecutor(); // State will be restored on getMore. cursorExec->saveState(); cursorExec->detachFromOperationContext(); - cursor->setLeftoverMaxTimeMicros(txn->getRemainingMaxTimeMicros()); - cursor->setPos(numResults); + pinnedCursor.getCursor()->setLeftoverMaxTimeMicros(txn->getRemainingMaxTimeMicros()); + pinnedCursor.getCursor()->setPos(numResults); // Fill out curop based on the results. endQueryOp(txn, collection, *cursorExec, numResults, cursorId); diff --git a/src/mongo/db/commands/getmore_cmd.cpp b/src/mongo/db/commands/getmore_cmd.cpp index fa09cc4438d..cb93562e5df 100644 --- a/src/mongo/db/commands/getmore_cmd.cpp +++ b/src/mongo/db/commands/getmore_cmd.cpp @@ -236,16 +236,14 @@ public: cursorManager = collection->getCursorManager(); } - ClientCursorPin ccPin(cursorManager, request.cursorid); - ClientCursor* cursor = ccPin.c(); - if (!cursor) { + auto ccPin = cursorManager->pinCursor(request.cursorid); + if (!ccPin.isOK()) { // We didn't find the cursor. - return appendCommandStatus( - result, - Status(ErrorCodes::CursorNotFound, - str::stream() << "Cursor not found, cursor id: " << request.cursorid)); + return appendCommandStatus(result, ccPin.getStatus()); } + ClientCursor* cursor = ccPin.getValue().getCursor(); + // If the fail point is enabled, busy wait until it is disabled. We unlock and re-acquire // the locks periodically in order to avoid deadlock (see SERVER-21997 for details). while (MONGO_FAIL_POINT(keepCursorPinnedDuringGetMore)) { @@ -291,13 +289,14 @@ public: } // On early return, get rid of the cursor. - ScopeGuard cursorFreer = MakeGuard(&GetMoreCmd::cleanupCursor, txn, &ccPin, request); + ScopeGuard cursorFreer = + MakeGuard(&GetMoreCmd::cleanupCursor, txn, &ccPin.getValue(), request); if (cursor->isReadCommitted()) uassertStatusOK(txn->recoveryUnit()->setReadFromMajorityCommittedSnapshot()); // Reset timeout timer on the cursor since the cursor is still in use. - cursor->setIdleTime(0); + cursor->resetIdleTime(); const bool hasOwnMaxTime = txn->hasDeadline(); @@ -563,7 +562,7 @@ public: static void cleanupCursor(OperationContext* txn, ClientCursorPin* ccPin, const GetMoreRequest& request) { - ClientCursor* cursor = ccPin->c(); + ClientCursor* cursor = ccPin->getCursor(); std::unique_ptr unpinDBLock; std::unique_ptr unpinCollLock; diff --git a/src/mongo/db/commands/list_collections.cpp b/src/mongo/db/commands/list_collections.cpp index 114d98faf55..0309abe8c7c 100644 --- a/src/mongo/db/commands/list_collections.cpp +++ b/src/mongo/db/commands/list_collections.cpp @@ -320,12 +320,11 @@ public: if (!exec->isEOF()) { exec->saveState(); exec->detachFromOperationContext(); - ClientCursor* cursor = - new ClientCursor(CursorManager::getGlobalCursorManager(), - exec.release(), - cursorNss.ns(), - txn->recoveryUnit()->isReadingFromMajorityCommittedSnapshot()); - cursorId = cursor->cursorid(); + auto pinnedCursor = CursorManager::getGlobalCursorManager()->registerCursor( + {exec.release(), + cursorNss.ns(), + txn->recoveryUnit()->isReadingFromMajorityCommittedSnapshot()}); + cursorId = pinnedCursor.getCursor()->cursorid(); } appendCursorResponseObject(cursorId, cursorNss.ns(), firstBatch.arr(), &result); diff --git a/src/mongo/db/commands/list_indexes.cpp b/src/mongo/db/commands/list_indexes.cpp index 53e563ab7bd..2fc5e07545b 100644 --- a/src/mongo/db/commands/list_indexes.cpp +++ b/src/mongo/db/commands/list_indexes.cpp @@ -227,12 +227,11 @@ public: if (!exec->isEOF()) { exec->saveState(); exec->detachFromOperationContext(); - ClientCursor* cursor = - new ClientCursor(CursorManager::getGlobalCursorManager(), - exec.release(), - cursorNss.ns(), - txn->recoveryUnit()->isReadingFromMajorityCommittedSnapshot()); - cursorId = cursor->cursorid(); + auto pinnedCursor = CursorManager::getGlobalCursorManager()->registerCursor( + {exec.release(), + cursorNss.ns(), + txn->recoveryUnit()->isReadingFromMajorityCommittedSnapshot()}); + cursorId = pinnedCursor.getCursor()->cursorid(); } appendCursorResponseObject(cursorId, cursorNss.ns(), firstBatch.arr(), &result); diff --git a/src/mongo/db/commands/parallel_collection_scan.cpp b/src/mongo/db/commands/parallel_collection_scan.cpp index 77dcfefaa26..6fee00a712f 100644 --- a/src/mongo/db/commands/parallel_collection_scan.cpp +++ b/src/mongo/db/commands/parallel_collection_scan.cpp @@ -147,17 +147,17 @@ public: exec->saveState(); exec->detachFromOperationContext(); - // transfer ownership of an executor to the ClientCursor (which manages its own - // lifetime). - ClientCursor* cc = - new ClientCursor(collection->getCursorManager(), - exec.release(), - ns.ns(), - txn->recoveryUnit()->isReadingFromMajorityCommittedSnapshot()); - cc->setLeftoverMaxTimeMicros(txn->getRemainingMaxTimeMicros()); + // Create and regiter a new ClientCursor. + auto pinnedCursor = collection->getCursorManager()->registerCursor( + {exec.release(), + ns.ns(), + txn->recoveryUnit()->isReadingFromMajorityCommittedSnapshot()}); + pinnedCursor.getCursor()->setLeftoverMaxTimeMicros( + txn->getRemainingMaxTimeMicros()); BSONObjBuilder threadResult; - appendCursorResponseObject(cc->cursorid(), ns.ns(), BSONArray(), &threadResult); + appendCursorResponseObject( + pinnedCursor.getCursor()->cursorid(), ns.ns(), BSONArray(), &threadResult); threadResult.appendBool("ok", 1); bucketsBuilder.append(threadResult.obj()); diff --git a/src/mongo/db/commands/pipeline_command.cpp b/src/mongo/db/commands/pipeline_command.cpp index 061c76b4883..2eb1c49649d 100644 --- a/src/mongo/db/commands/pipeline_command.cpp +++ b/src/mongo/db/commands/pipeline_command.cpp @@ -30,6 +30,7 @@ #include "mongo/platform/basic.h" +#include #include #include @@ -89,13 +90,11 @@ namespace { */ bool handleCursorCommand(OperationContext* txn, const string& nsForCursor, - ClientCursorPin* pin, + ClientCursor* cursor, PlanExecutor* exec, const AggregationRequest& request, BSONObjBuilder& result) { - ClientCursor* cursor = pin ? pin->c() : NULL; - if (pin) { - invariant(cursor); + if (cursor) { invariant(cursor->getExecutor() == exec); invariant(cursor->isAggCursor()); } @@ -132,14 +131,14 @@ bool handleCursorCommand(OperationContext* txn, } // NOTE: exec->isEOF() can have side effects such as writing by $out. However, it should - // be relatively quick since if there was no pin then the input is empty. Also, this + // be relatively quick since if there was no cursor then the input is empty. Also, this // violates the contract for batchSize==0. Sharding requires a cursor to be returned in that // case. This is ok for now however, since you can't have a sharded collection that doesn't // exist. - const bool canReturnMoreBatches = pin; + const bool canReturnMoreBatches = cursor; if (!canReturnMoreBatches && exec && !exec->isEOF()) { // msgasserting since this shouldn't be possible to trigger from today's aggregation - // language. The wording assumes that the only reason pin would be null is if the + // language. The wording assumes that the only reason cursor would be null is if the // collection doesn't exist. msgasserted( 17391, @@ -361,7 +360,7 @@ public: } expCtx->resolvedNamespaces = std::move(resolvedNamespaces.getValue()); - unique_ptr pin; // either this OR the exec will be non-null + boost::optional pin; // either this OR the exec will be non-null unique_ptr exec; boost::intrusive_ptr pipeline; auto curOp = CurOp::get(txn); @@ -514,15 +513,13 @@ public: if (collection) { const bool isAggCursor = true; // enable special locking behavior - ClientCursor* cursor = - new ClientCursor(collection->getCursorManager(), - exec.release(), - nss.ns(), - txn->recoveryUnit()->isReadingFromMajorityCommittedSnapshot(), - 0, - cmdObj.getOwned(), - isAggCursor); - pin.reset(new ClientCursorPin(collection->getCursorManager(), cursor->cursorid())); + pin.emplace(collection->getCursorManager()->registerCursor( + {exec.release(), + nss.ns(), + txn->recoveryUnit()->isReadingFromMajorityCommittedSnapshot(), + 0, + cmdObj.getOwned(), + isAggCursor})); // Don't add any code between here and the start of the try block. } @@ -559,8 +556,8 @@ public: } else if (request.isCursorCommand()) { keepCursor = handleCursorCommand(txn, origNss.ns(), - pin.get(), - pin ? pin->c()->getExecutor() : exec.get(), + pin ? pin->getCursor() : nullptr, + pin ? pin->getCursor()->getExecutor() : exec.get(), request, result); } else { @@ -569,7 +566,8 @@ public: if (!expCtx->isExplain) { PlanSummaryStats stats; - Explain::getSummaryStats(pin ? *pin->c()->getExecutor() : *exec.get(), &stats); + Explain::getSummaryStats(pin ? *pin->getCursor()->getExecutor() : *exec.get(), + &stats); curOp->debug().setPlanSummaryMetrics(stats); curOp->debug().nreturned = stats.nReturned; } diff --git a/src/mongo/db/commands/repair_cursor.cpp b/src/mongo/db/commands/repair_cursor.cpp index 1e04075ab58..1db01e81dd2 100644 --- a/src/mongo/db/commands/repair_cursor.cpp +++ b/src/mongo/db/commands/repair_cursor.cpp @@ -105,15 +105,13 @@ public: exec->saveState(); exec->detachFromOperationContext(); - // ClientCursors' constructor inserts them into a global map that manages their - // lifetimes. That is why the next line isn't leaky. - ClientCursor* cc = - new ClientCursor(collection->getCursorManager(), - exec.release(), - ns.ns(), - txn->recoveryUnit()->isReadingFromMajorityCommittedSnapshot()); - - appendCursorResponseObject(cc->cursorid(), ns.ns(), BSONArray(), &result); + auto pinnedCursor = collection->getCursorManager()->registerCursor( + {exec.release(), + ns.ns(), + txn->recoveryUnit()->isReadingFromMajorityCommittedSnapshot()}); + + appendCursorResponseObject( + pinnedCursor.getCursor()->cursorid(), ns.ns(), BSONArray(), &result); return true; } -- cgit v1.2.1