diff options
Diffstat (limited to 'src/mongo/db/commands/getmore_cmd.cpp')
-rw-r--r-- | src/mongo/db/commands/getmore_cmd.cpp | 112 |
1 files changed, 83 insertions, 29 deletions
diff --git a/src/mongo/db/commands/getmore_cmd.cpp b/src/mongo/db/commands/getmore_cmd.cpp index 1e57c922448..901eaea3306 100644 --- a/src/mongo/db/commands/getmore_cmd.cpp +++ b/src/mongo/db/commands/getmore_cmd.cpp @@ -67,6 +67,37 @@ namespace { MONGO_FP_DECLARE(rsStopGetMoreCmd); +// Helper function which sets the 'msg' field of the opCtx's CurOp to the specified string, and +// returns the original value of the field. +std::string updateCurOpMsg(OperationContext* opCtx, const std::string& newMsg) { + stdx::lock_guard<Client> lk(*opCtx->getClient()); + auto oldMsg = CurOp::get(opCtx)->getMessage(); + CurOp::get(opCtx)->setMessage_inlock(newMsg.c_str()); + return oldMsg; +} + +// This helper function works much like MONGO_FAIL_POINT_PAUSE_WHILE_SET, but it additionally +// releases and re-acquires the collection readLock at regular intervals, in order to avoid +// deadlocks caused by the pinned-cursor failpoints in this file (see SERVER-21997). Finally, it +// also sets the 'msg' field of the opCtx's CurOp to the given string while the failpoint is active. +void waitWhileFailPointEnabled(FailPoint* failPoint, + OperationContext* opCtx, + const NamespaceString& nss, + const std::string& curOpMsg, + boost::optional<AutoGetCollectionForRead>* readLock) { + invariant(failPoint); + auto origCurOpMsg = updateCurOpMsg(opCtx, curOpMsg); + + while (MONGO_FAIL_POINT((*failPoint))) { + sleepFor(Milliseconds(10)); + if (readLock && *readLock) { + readLock->reset(); + readLock->emplace(opCtx, nss); + } + } + updateCurOpMsg(opCtx, origCurOpMsg); +} + /** * A command for running getMore() against an existing cursor registered with a CursorManager. * Used to generate the next batch of results for a ClientCursor. @@ -228,15 +259,15 @@ public: ClientCursor* cursor = ccPin.getValue().getCursor(); - // If the fail point is enabled, busy wait until it is disabled. - while (MONGO_FAIL_POINT(keepCursorPinnedDuringGetMore)) { - if (readLock) { - // We unlock and re-acquire the locks periodically in order to avoid deadlock (see - // SERVER-21997 for details). - sleepFor(Milliseconds(10)); - readLock.reset(); - readLock.emplace(opCtx, request.nss); - } + // If the 'waitAfterPinningCursorBeforeGetMoreBatch' fail point is enabled, set the 'msg' + // field of this operation's CurOp to signal that we've hit this point and then spin until + // the failpoint is released. + if (MONGO_FAIL_POINT(waitAfterPinningCursorBeforeGetMoreBatch)) { + waitWhileFailPointEnabled(&waitAfterPinningCursorBeforeGetMoreBatch, + opCtx, + request.nss, + "waitAfterPinningCursorBeforeGetMoreBatch", + &readLock); } // A user can only call getMore on their own cursor. If there were multiple users @@ -344,6 +375,18 @@ public: awaitDataState(opCtx).shouldWaitForInserts = true; } + // We're about to begin running the PlanExecutor in order to fill the getMore batch. If the + // 'waitWithPinnedCursorDuringGetMoreBatch' failpoint is active, set the 'msg' field of this + // operation's CurOp to signal that we've hit this point and then spin until the failpoint + // is released. + if (MONGO_FAIL_POINT(waitWithPinnedCursorDuringGetMoreBatch)) { + waitWhileFailPointEnabled(&waitWithPinnedCursorDuringGetMoreBatch, + opCtx, + request.nss, + "waitWithPinnedCursorDuringGetMoreBatch", + &readLock); + } + Status batchStatus = generateBatch(opCtx, cursor, request, &nextBatch, &state, &numResults); if (!batchStatus.isOK()) { return CommandHelpers::appendCommandStatus(result, batchStatus); @@ -388,6 +431,18 @@ public: cursorFreer.Dismiss(); } + // We're about to unpin the cursor as the ClientCursorPin goes out of scope (or delete it, + // if it has been exhausted). If the 'waitBeforeUnpinningOrDeletingCursorAfterGetMoreBatch' + // failpoint is active, set the 'msg' field of this operation's CurOp to signal that we've + // hit this point and then spin until the failpoint is released. + if (MONGO_FAIL_POINT(waitBeforeUnpinningOrDeletingCursorAfterGetMoreBatch)) { + waitWhileFailPointEnabled(&waitBeforeUnpinningOrDeletingCursorAfterGetMoreBatch, + opCtx, + request.nss, + "waitBeforeUnpinningOrDeletingCursorAfterGetMoreBatch", + &readLock); + } + return true; } @@ -451,28 +506,27 @@ public: return Status::OK(); } - if (PlanExecutor::FAILURE == *state) { - nextBatch->abandon(); - - error() << "GetMore command executor error: " << PlanExecutor::statestr(*state) - << ", stats: " << redact(Explain::getWinningPlanStats(exec)); - - return Status(ErrorCodes::OperationFailed, - str::stream() << "GetMore command executor error: " - << WorkingSetCommon::toStatusString(obj)); - } else if (PlanExecutor::DEAD == *state) { - nextBatch->abandon(); - - return Status(ErrorCodes::QueryPlanKilled, - str::stream() << "PlanExecutor killed: " - << WorkingSetCommon::toStatusString(obj)); - } else if (PlanExecutor::IS_EOF == *state) { - // This causes the reported latest oplog timestamp to advance even when there are - // no results for this particular query. - nextBatch->setLatestOplogTimestamp(exec->getLatestOplogTimestamp()); + switch (*state) { + case PlanExecutor::FAILURE: + // Log an error message and then perform the same cleanup as DEAD. + error() << "GetMore command executor error: " << PlanExecutor::statestr(*state) + << ", stats: " << redact(Explain::getWinningPlanStats(exec)); + case PlanExecutor::DEAD: { + nextBatch->abandon(); + // We should always have a valid status member object at this point. + auto status = WorkingSetCommon::getMemberObjectStatus(obj); + invariant(!status.isOK()); + return status; + } + case PlanExecutor::IS_EOF: + // This causes the reported latest oplog timestamp to advance even when there are + // no results for this particular query. + nextBatch->setLatestOplogTimestamp(exec->getLatestOplogTimestamp()); + default: + return Status::OK(); } - return Status::OK(); + MONGO_UNREACHABLE; } } getMoreCmd; |