summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorJason Chan <jason.chan@10gen.com>2019-02-14 13:08:41 -0500
committerJason Chan <jason.chan@10gen.com>2019-02-14 13:09:29 -0500
commit22e1ef5134181dd9f58e8408e04744f205d7b41d (patch)
tree7488b8bd8cf49777e0abd54b006be01b62e04f20 /src
parenta6eab704282bd9c68249325d4fc38a9b2253724a (diff)
downloadmongo-22e1ef5134181dd9f58e8408e04744f205d7b41d.tar.gz
SERVER-37948 Satisfy linearizable read concern on getmore cursors.
Diffstat (limited to 'src')
-rw-r--r--src/mongo/db/commands/getmore_cmd.cpp69
-rw-r--r--src/mongo/db/read_concern.h6
-rw-r--r--src/mongo/db/read_concern_mongod.cpp6
-rw-r--r--src/mongo/db/service_entry_point_mongod.cpp2
-rw-r--r--src/mongo/embedded/read_concern_embedded.cpp3
-rw-r--r--src/mongo/embedded/service_entry_point_embedded.cpp2
6 files changed, 58 insertions, 30 deletions
diff --git a/src/mongo/db/commands/getmore_cmd.cpp b/src/mongo/db/commands/getmore_cmd.cpp
index 604d0535f5b..64beb7f2725 100644
--- a/src/mongo/db/commands/getmore_cmd.cpp
+++ b/src/mongo/db/commands/getmore_cmd.cpp
@@ -71,6 +71,9 @@ namespace {
MONGO_FAIL_POINT_DEFINE(rsStopGetMoreCmd);
+// The timeout when waiting for linearizable read concern on a getMore command.
+static constexpr int kLinearizableReadConcernTimeout = 15000;
+
/**
* Validates that the lsid of 'opCtx' matches that of 'cursor'. This must be called after
* authenticating, so that it is safe to report the lsid of 'cursor'.
@@ -269,19 +272,11 @@ public:
MONGO_UNREACHABLE;
}
- void run(OperationContext* opCtx, rpc::ReplyBuilderInterface* reply) override {
- // Counted as a getMore, not as a command.
- globalOpCounters.gotGetMore();
- auto curOp = CurOp::get(opCtx);
- curOp->debug().cursorid = _request.cursorid;
-
- // Validate term before acquiring locks, if provided.
- if (_request.term) {
- auto replCoord = repl::ReplicationCoordinator::get(opCtx);
- // Note: updateTerm returns ok if term stayed the same.
- uassertStatusOK(replCoord->updateTerm(opCtx, *_request.term));
- }
-
+ void acquireLocksAndIterateCursor(OperationContext* opCtx,
+ rpc::ReplyBuilderInterface* reply,
+ CursorManager* cursorManager,
+ ClientCursorPin& cursorPin,
+ CurOp* curOp) {
// Cursors come in one of two flavors:
//
// - Cursors which read from a single collection, such as those generated via the
@@ -303,9 +298,6 @@ public:
boost::optional<AutoGetCollectionForRead> readLock;
boost::optional<AutoStatsTracker> statsTracker;
- auto cursorManager = CursorManager::get(opCtx);
- auto cursorPin = uassertStatusOK(cursorManager->pinCursor(opCtx, _request.cursorid));
-
{
// We call RecoveryUnit::setTimestampReadSource() before acquiring a lock on the
// collection via AutoGetCollectionForRead in order to ensure the comparison to the
@@ -331,7 +323,6 @@ public:
opCtx->recoveryUnit()->setIgnorePrepared(false);
}
}
-
if (cursorPin->lockPolicy() == ClientCursorParams::LockPolicy::kLocksInternally) {
if (!_request.nss.isCollectionlessCursorNamespace()) {
const boost::optional<int> dbProfilingLevel = boost::none;
@@ -355,7 +346,7 @@ public:
}
// Only used by the failpoints.
- stdx::function<void()> dropAndReaquireReadLock = [&readLock, opCtx, this]() {
+ stdx::function<void()> dropAndReacquireReadLock = [&readLock, opCtx, this]() {
// Make sure an interrupted operation does not prevent us from reacquiring the lock.
UninterruptibleLockGuard noInterrupt(opCtx->lockState());
@@ -418,14 +409,14 @@ public:
MONGO_FAIL_POINT_BLOCK(waitAfterPinningCursorBeforeGetMoreBatch, options) {
const BSONObj& data = options.getData();
if (data["shouldNotdropLock"].booleanSafe()) {
- dropAndReaquireReadLock = []() {};
+ dropAndReacquireReadLock = []() {};
}
CurOpFailpointHelpers::waitWhileFailPointEnabled(
&waitAfterPinningCursorBeforeGetMoreBatch,
opCtx,
"waitAfterPinningCursorBeforeGetMoreBatch",
- dropAndReaquireReadLock,
+ dropAndReacquireReadLock,
false,
_request.nss);
}
@@ -512,7 +503,7 @@ public:
&waitWithPinnedCursorDuringGetMoreBatch,
opCtx,
"waitWithPinnedCursorDuringGetMoreBatch",
- dropAndReaquireReadLock);
+ dropAndReacquireReadLock);
}
uassertStatusOK(generateBatch(
@@ -558,6 +549,39 @@ public:
if (respondWithId) {
cursorFreer.dismiss();
}
+ }
+
+ void run(OperationContext* opCtx, rpc::ReplyBuilderInterface* reply) override {
+ // Counted as a getMore, not as a command.
+ globalOpCounters.gotGetMore();
+ auto curOp = CurOp::get(opCtx);
+ curOp->debug().cursorid = _request.cursorid;
+
+ // Validate term before acquiring locks, if provided.
+ if (_request.term) {
+ auto replCoord = repl::ReplicationCoordinator::get(opCtx);
+ // Note: updateTerm returns ok if term stayed the same.
+ uassertStatusOK(replCoord->updateTerm(opCtx, *_request.term));
+ }
+
+ auto cursorManager = CursorManager::get(opCtx);
+ auto cursorPin = uassertStatusOK(cursorManager->pinCursor(opCtx, _request.cursorid));
+
+ // Get the read concern level here in case the cursor is exhausted while iterating.
+ const auto isLinearizableReadConcern = cursorPin->getReadConcernArgs().getLevel() ==
+ repl::ReadConcernLevel::kLinearizableReadConcern;
+
+ acquireLocksAndIterateCursor(opCtx, reply, cursorManager, cursorPin, curOp);
+
+ if (isLinearizableReadConcern) {
+ // waitForLinearizableReadConcern performs a NoOp write and waits for that write
+ // to have been majority committed. awaitReplication requires that we release all
+ // locks to prevent blocking for a long time while doing network activity. Since
+ // getMores do not have support for a maxTimeout duration, we hardcode the timeout
+ // to avoid waiting indefinitely.
+ uassertStatusOK(
+ mongo::waitForLinearizableReadConcern(opCtx, kLinearizableReadConcernTimeout));
+ }
// We're about to unpin or delete the cursor as the ClientCursorPin goes out of scope.
// If the 'waitBeforeUnpinningOrDeletingCursorAfterGetMoreBatch' failpoint is active, we
@@ -567,8 +591,7 @@ public:
CurOpFailpointHelpers::waitWhileFailPointEnabled(
&waitBeforeUnpinningOrDeletingCursorAfterGetMoreBatch,
opCtx,
- "waitBeforeUnpinningOrDeletingCursorAfterGetMoreBatch",
- dropAndReaquireReadLock);
+ "waitBeforeUnpinningOrDeletingCursorAfterGetMoreBatch");
}
}
diff --git a/src/mongo/db/read_concern.h b/src/mongo/db/read_concern.h
index 49a0b92acb0..2fd42beb71f 100644
--- a/src/mongo/db/read_concern.h
+++ b/src/mongo/db/read_concern.h
@@ -58,8 +58,12 @@ extern MONGO_DECLARE_SHIM((OperationContext * opCtx,
/*
* Given a linearizable read command, confirm that
* current primary is still the true primary of the replica set.
+ *
+ * A readConcernTimeout of 0 indicates that the operation will block indefinitely waiting for read
+ * concern.
*/
-extern MONGO_DECLARE_SHIM((OperationContext * opCtx)->Status) waitForLinearizableReadConcern;
+extern MONGO_DECLARE_SHIM((OperationContext * opCtx, const int readConcernTimeout)->Status)
+ waitForLinearizableReadConcern;
/**
* Waits to satisfy a "speculative" majority read.
diff --git a/src/mongo/db/read_concern_mongod.cpp b/src/mongo/db/read_concern_mongod.cpp
index 0cd7add7a20..d20f4ad7b38 100644
--- a/src/mongo/db/read_concern_mongod.cpp
+++ b/src/mongo/db/read_concern_mongod.cpp
@@ -347,8 +347,8 @@ MONGO_REGISTER_SHIM(waitForReadConcern)
return Status::OK();
}
-MONGO_REGISTER_SHIM(waitForLinearizableReadConcern)(OperationContext* opCtx)->Status {
-
+MONGO_REGISTER_SHIM(waitForLinearizableReadConcern)
+(OperationContext* opCtx, const int readConcernTimeout)->Status {
CurOpFailpointHelpers::waitWhileFailPointEnabled(
&hangBeforeLinearizableReadConcern, opCtx, "hangBeforeLinearizableReadConcern", [opCtx]() {
log() << "batch update - hangBeforeLinearizableReadConcern fail point enabled. "
@@ -377,7 +377,7 @@ MONGO_REGISTER_SHIM(waitForLinearizableReadConcern)(OperationContext* opCtx)->St
});
}
WriteConcernOptions wc = WriteConcernOptions(
- WriteConcernOptions::kMajority, WriteConcernOptions::SyncMode::UNSET, 0);
+ WriteConcernOptions::kMajority, WriteConcernOptions::SyncMode::UNSET, readConcernTimeout);
repl::OpTime lastOpApplied = repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp();
auto awaitReplResult = replCoord->awaitReplication(opCtx, lastOpApplied, wc);
diff --git a/src/mongo/db/service_entry_point_mongod.cpp b/src/mongo/db/service_entry_point_mongod.cpp
index 4af38918684..0e870bf80a6 100644
--- a/src/mongo/db/service_entry_point_mongod.cpp
+++ b/src/mongo/db/service_entry_point_mongod.cpp
@@ -118,7 +118,7 @@ public:
// from the primary.
if (repl::ReadConcernArgs::get(opCtx).getLevel() ==
repl::ReadConcernLevel::kLinearizableReadConcern) {
- uassertStatusOK(mongo::waitForLinearizableReadConcern(opCtx));
+ uassertStatusOK(mongo::waitForLinearizableReadConcern(opCtx, 0));
}
}
diff --git a/src/mongo/embedded/read_concern_embedded.cpp b/src/mongo/embedded/read_concern_embedded.cpp
index 1d52d3d39fb..db4dbe3ef83 100644
--- a/src/mongo/embedded/read_concern_embedded.cpp
+++ b/src/mongo/embedded/read_concern_embedded.cpp
@@ -55,7 +55,8 @@ MONGO_REGISTER_SHIM(waitForSpeculativeMajorityReadConcern)
return Status::OK();
}
-MONGO_REGISTER_SHIM(waitForLinearizableReadConcern)(OperationContext* opCtx)->Status {
+MONGO_REGISTER_SHIM(waitForLinearizableReadConcern)
+(OperationContext* opCtx, const int readConcernTimeout)->Status {
return Status::OK();
}
diff --git a/src/mongo/embedded/service_entry_point_embedded.cpp b/src/mongo/embedded/service_entry_point_embedded.cpp
index 74bb9dfb034..bacdad22ffd 100644
--- a/src/mongo/embedded/service_entry_point_embedded.cpp
+++ b/src/mongo/embedded/service_entry_point_embedded.cpp
@@ -75,7 +75,7 @@ public:
void waitForLinearizableReadConcern(OperationContext* opCtx) const override {
if (repl::ReadConcernArgs::get(opCtx).getLevel() ==
repl::ReadConcernLevel::kLinearizableReadConcern) {
- uassertStatusOK(mongo::waitForLinearizableReadConcern(opCtx));
+ uassertStatusOK(mongo::waitForLinearizableReadConcern(opCtx, 0));
}
}