diff options
author | Jason Chan <jason.chan@10gen.com> | 2019-02-14 13:08:41 -0500 |
---|---|---|
committer | Jason Chan <jason.chan@10gen.com> | 2019-02-14 13:09:29 -0500 |
commit | 22e1ef5134181dd9f58e8408e04744f205d7b41d (patch) | |
tree | 7488b8bd8cf49777e0abd54b006be01b62e04f20 /src/mongo/db | |
parent | a6eab704282bd9c68249325d4fc38a9b2253724a (diff) | |
download | mongo-22e1ef5134181dd9f58e8408e04744f205d7b41d.tar.gz |
SERVER-37948 Satisfy linearizable read concern on getmore cursors.
Diffstat (limited to 'src/mongo/db')
-rw-r--r-- | src/mongo/db/commands/getmore_cmd.cpp | 69 | ||||
-rw-r--r-- | src/mongo/db/read_concern.h | 6 | ||||
-rw-r--r-- | src/mongo/db/read_concern_mongod.cpp | 6 | ||||
-rw-r--r-- | src/mongo/db/service_entry_point_mongod.cpp | 2 |
4 files changed, 55 insertions, 28 deletions
diff --git a/src/mongo/db/commands/getmore_cmd.cpp b/src/mongo/db/commands/getmore_cmd.cpp index 604d0535f5b..64beb7f2725 100644 --- a/src/mongo/db/commands/getmore_cmd.cpp +++ b/src/mongo/db/commands/getmore_cmd.cpp @@ -71,6 +71,9 @@ namespace { MONGO_FAIL_POINT_DEFINE(rsStopGetMoreCmd); +// The timeout when waiting for linearizable read concern on a getMore command. +static constexpr int kLinearizableReadConcernTimeout = 15000; + /** * Validates that the lsid of 'opCtx' matches that of 'cursor'. This must be called after * authenticating, so that it is safe to report the lsid of 'cursor'. @@ -269,19 +272,11 @@ public: MONGO_UNREACHABLE; } - void run(OperationContext* opCtx, rpc::ReplyBuilderInterface* reply) override { - // Counted as a getMore, not as a command. - globalOpCounters.gotGetMore(); - auto curOp = CurOp::get(opCtx); - curOp->debug().cursorid = _request.cursorid; - - // Validate term before acquiring locks, if provided. - if (_request.term) { - auto replCoord = repl::ReplicationCoordinator::get(opCtx); - // Note: updateTerm returns ok if term stayed the same. - uassertStatusOK(replCoord->updateTerm(opCtx, *_request.term)); - } - + void acquireLocksAndIterateCursor(OperationContext* opCtx, + rpc::ReplyBuilderInterface* reply, + CursorManager* cursorManager, + ClientCursorPin& cursorPin, + CurOp* curOp) { // Cursors come in one of two flavors: // // - Cursors which read from a single collection, such as those generated via the @@ -303,9 +298,6 @@ public: boost::optional<AutoGetCollectionForRead> readLock; boost::optional<AutoStatsTracker> statsTracker; - auto cursorManager = CursorManager::get(opCtx); - auto cursorPin = uassertStatusOK(cursorManager->pinCursor(opCtx, _request.cursorid)); - { // We call RecoveryUnit::setTimestampReadSource() before acquiring a lock on the // collection via AutoGetCollectionForRead in order to ensure the comparison to the @@ -331,7 +323,6 @@ public: opCtx->recoveryUnit()->setIgnorePrepared(false); } } - if (cursorPin->lockPolicy() == ClientCursorParams::LockPolicy::kLocksInternally) { if (!_request.nss.isCollectionlessCursorNamespace()) { const boost::optional<int> dbProfilingLevel = boost::none; @@ -355,7 +346,7 @@ public: } // Only used by the failpoints. - stdx::function<void()> dropAndReaquireReadLock = [&readLock, opCtx, this]() { + stdx::function<void()> dropAndReacquireReadLock = [&readLock, opCtx, this]() { // Make sure an interrupted operation does not prevent us from reacquiring the lock. UninterruptibleLockGuard noInterrupt(opCtx->lockState()); @@ -418,14 +409,14 @@ public: MONGO_FAIL_POINT_BLOCK(waitAfterPinningCursorBeforeGetMoreBatch, options) { const BSONObj& data = options.getData(); if (data["shouldNotdropLock"].booleanSafe()) { - dropAndReaquireReadLock = []() {}; + dropAndReacquireReadLock = []() {}; } CurOpFailpointHelpers::waitWhileFailPointEnabled( &waitAfterPinningCursorBeforeGetMoreBatch, opCtx, "waitAfterPinningCursorBeforeGetMoreBatch", - dropAndReaquireReadLock, + dropAndReacquireReadLock, false, _request.nss); } @@ -512,7 +503,7 @@ public: &waitWithPinnedCursorDuringGetMoreBatch, opCtx, "waitWithPinnedCursorDuringGetMoreBatch", - dropAndReaquireReadLock); + dropAndReacquireReadLock); } uassertStatusOK(generateBatch( @@ -558,6 +549,39 @@ public: if (respondWithId) { cursorFreer.dismiss(); } + } + + void run(OperationContext* opCtx, rpc::ReplyBuilderInterface* reply) override { + // Counted as a getMore, not as a command. + globalOpCounters.gotGetMore(); + auto curOp = CurOp::get(opCtx); + curOp->debug().cursorid = _request.cursorid; + + // Validate term before acquiring locks, if provided. + if (_request.term) { + auto replCoord = repl::ReplicationCoordinator::get(opCtx); + // Note: updateTerm returns ok if term stayed the same. + uassertStatusOK(replCoord->updateTerm(opCtx, *_request.term)); + } + + auto cursorManager = CursorManager::get(opCtx); + auto cursorPin = uassertStatusOK(cursorManager->pinCursor(opCtx, _request.cursorid)); + + // Get the read concern level here in case the cursor is exhausted while iterating. + const auto isLinearizableReadConcern = cursorPin->getReadConcernArgs().getLevel() == + repl::ReadConcernLevel::kLinearizableReadConcern; + + acquireLocksAndIterateCursor(opCtx, reply, cursorManager, cursorPin, curOp); + + if (isLinearizableReadConcern) { + // waitForLinearizableReadConcern performs a NoOp write and waits for that write + // to have been majority committed. awaitReplication requires that we release all + // locks to prevent blocking for a long time while doing network activity. Since + // getMores do not have support for a maxTimeout duration, we hardcode the timeout + // to avoid waiting indefinitely. + uassertStatusOK( + mongo::waitForLinearizableReadConcern(opCtx, kLinearizableReadConcernTimeout)); + } // We're about to unpin or delete the cursor as the ClientCursorPin goes out of scope. // If the 'waitBeforeUnpinningOrDeletingCursorAfterGetMoreBatch' failpoint is active, we @@ -567,8 +591,7 @@ public: CurOpFailpointHelpers::waitWhileFailPointEnabled( &waitBeforeUnpinningOrDeletingCursorAfterGetMoreBatch, opCtx, - "waitBeforeUnpinningOrDeletingCursorAfterGetMoreBatch", - dropAndReaquireReadLock); + "waitBeforeUnpinningOrDeletingCursorAfterGetMoreBatch"); } } diff --git a/src/mongo/db/read_concern.h b/src/mongo/db/read_concern.h index 49a0b92acb0..2fd42beb71f 100644 --- a/src/mongo/db/read_concern.h +++ b/src/mongo/db/read_concern.h @@ -58,8 +58,12 @@ extern MONGO_DECLARE_SHIM((OperationContext * opCtx, /* * Given a linearizable read command, confirm that * current primary is still the true primary of the replica set. + * + * A readConcernTimeout of 0 indicates that the operation will block indefinitely waiting for read + * concern. */ -extern MONGO_DECLARE_SHIM((OperationContext * opCtx)->Status) waitForLinearizableReadConcern; +extern MONGO_DECLARE_SHIM((OperationContext * opCtx, const int readConcernTimeout)->Status) + waitForLinearizableReadConcern; /** * Waits to satisfy a "speculative" majority read. diff --git a/src/mongo/db/read_concern_mongod.cpp b/src/mongo/db/read_concern_mongod.cpp index 0cd7add7a20..d20f4ad7b38 100644 --- a/src/mongo/db/read_concern_mongod.cpp +++ b/src/mongo/db/read_concern_mongod.cpp @@ -347,8 +347,8 @@ MONGO_REGISTER_SHIM(waitForReadConcern) return Status::OK(); } -MONGO_REGISTER_SHIM(waitForLinearizableReadConcern)(OperationContext* opCtx)->Status { - +MONGO_REGISTER_SHIM(waitForLinearizableReadConcern) +(OperationContext* opCtx, const int readConcernTimeout)->Status { CurOpFailpointHelpers::waitWhileFailPointEnabled( &hangBeforeLinearizableReadConcern, opCtx, "hangBeforeLinearizableReadConcern", [opCtx]() { log() << "batch update - hangBeforeLinearizableReadConcern fail point enabled. " @@ -377,7 +377,7 @@ MONGO_REGISTER_SHIM(waitForLinearizableReadConcern)(OperationContext* opCtx)->St }); } WriteConcernOptions wc = WriteConcernOptions( - WriteConcernOptions::kMajority, WriteConcernOptions::SyncMode::UNSET, 0); + WriteConcernOptions::kMajority, WriteConcernOptions::SyncMode::UNSET, readConcernTimeout); repl::OpTime lastOpApplied = repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(); auto awaitReplResult = replCoord->awaitReplication(opCtx, lastOpApplied, wc); diff --git a/src/mongo/db/service_entry_point_mongod.cpp b/src/mongo/db/service_entry_point_mongod.cpp index 4af38918684..0e870bf80a6 100644 --- a/src/mongo/db/service_entry_point_mongod.cpp +++ b/src/mongo/db/service_entry_point_mongod.cpp @@ -118,7 +118,7 @@ public: // from the primary. if (repl::ReadConcernArgs::get(opCtx).getLevel() == repl::ReadConcernLevel::kLinearizableReadConcern) { - uassertStatusOK(mongo::waitForLinearizableReadConcern(opCtx)); + uassertStatusOK(mongo::waitForLinearizableReadConcern(opCtx, 0)); } } |