summaryrefslogtreecommitdiff
path: root/src/mongo/db/repl/replication_coordinator_impl.cpp
diff options
context:
space:
mode:
authorMisha Tyulenev <misha@mongodb.com>2017-06-21 22:18:58 -0400
committerMisha Tyulenev <misha@mongodb.com>2017-06-21 22:20:08 -0400
commit48dc747414e18d8caa56f0fba23c80871f67c31a (patch)
treedffe91c2fc1c792ce45b199a3deab9dedc702c07 /src/mongo/db/repl/replication_coordinator_impl.cpp
parent358cf1d12d8c3787fe14fe4458a242186d12b3c0 (diff)
downloadmongo-48dc747414e18d8caa56f0fba23c80871f67c31a.tar.gz
SERVER-28150 Add support for readConcern::afterClusterTime for level == local
Diffstat (limited to 'src/mongo/db/repl/replication_coordinator_impl.cpp')
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl.cpp90
1 files changed, 38 insertions, 52 deletions
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp
index 2d3ba978e54..d52920c0f45 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl.cpp
@@ -1052,9 +1052,12 @@ Status ReplicationCoordinatorImpl::_validateReadConcern(OperationContext* opCtx,
const bool isMajorityReadConcern =
readConcern.getLevel() == ReadConcernLevel::kMajorityReadConcern;
- if (readConcern.getArgsClusterTime() && !isMajorityReadConcern) {
+ if (readConcern.getArgsClusterTime() &&
+ readConcern.getLevel() != ReadConcernLevel::kMajorityReadConcern &&
+ readConcern.getLevel() != ReadConcernLevel::kLocalReadConcern) {
return {ErrorCodes::BadValue,
- "only readConcern level majority is allowed when specifying afterClusterTime"};
+ "Only readConcern level 'majority' or 'local' is allowed when specifying "
+ "afterClusterTime"};
}
if (isMajorityReadConcern && !getSettings().isMajorityReadConcernEnabled()) {
@@ -1088,57 +1091,15 @@ Status ReplicationCoordinatorImpl::waitUntilOpTimeForRead(OperationContext* opCt
}
if (readConcern.getArgsClusterTime()) {
- auto targetTime = *readConcern.getArgsClusterTime();
-
- if (readConcern.getArgsOpTime()) {
- auto opTimeStamp = LogicalTime(readConcern.getArgsOpTime()->getTimestamp());
- if (opTimeStamp > targetTime) {
- targetTime = opTimeStamp;
- }
- }
- return _waitUntilClusterTimeForRead(opCtx, targetTime);
+ return _waitUntilClusterTimeForRead(opCtx, readConcern);
} else {
return _waitUntilOpTimeForReadDeprecated(opCtx, readConcern);
}
}
-Status ReplicationCoordinatorImpl::_waitUntilClusterTimeForRead(OperationContext* opCtx,
- LogicalTime clusterTime) {
- invariant(clusterTime != LogicalTime::kUninitialized);
-
- stdx::unique_lock<stdx::mutex> lock(_mutex);
-
- auto currentTime = _getCurrentCommittedLogicalTime_inlock();
- if (clusterTime > currentTime) {
- LOG(1) << "waitUntilClusterTime: waiting for clusterTime:" << clusterTime.toString()
- << " to be in a snapshot -- current snapshot: " << currentTime.toString();
- }
-
- while (clusterTime > _getCurrentCommittedLogicalTime_inlock()) {
- if (_inShutdown) {
- return {ErrorCodes::ShutdownInProgress, "Shutdown in progress"};
- }
-
- LOG(3) << "waitUntilClusterTime: waiting for a new snapshot until " << opCtx->getDeadline();
-
- auto waitStatus =
- opCtx->waitForConditionOrInterruptNoAssert(_currentCommittedSnapshotCond, lock);
- if (!waitStatus.isOK()) {
- return waitStatus;
- }
- LOG(3) << "Got notified of new snapshot: " << _currentCommittedSnapshot->toString();
- }
-
- return Status::OK();
-}
-
-Status ReplicationCoordinatorImpl::_waitUntilOpTimeForReadDeprecated(
- OperationContext* opCtx, const ReadConcernArgs& readConcern) {
- const bool isMajorityReadConcern =
- readConcern.getLevel() == ReadConcernLevel::kMajorityReadConcern;
-
- const auto targetOpTime = readConcern.getArgsOpTime().value_or(OpTime());
-
+Status ReplicationCoordinatorImpl::_waitUntilOpTime(OperationContext* opCtx,
+ bool isMajorityReadConcern,
+ OpTime targetOpTime) {
stdx::unique_lock<stdx::mutex> lock(_mutex);
if (isMajorityReadConcern && !_externalState->snapshotsEnabled()) {
@@ -1147,9 +1108,8 @@ Status ReplicationCoordinatorImpl::_waitUntilOpTimeForReadDeprecated(
}
auto getCurrentOpTime = [this, isMajorityReadConcern]() {
- auto committedOptime =
- _currentCommittedSnapshot ? _currentCommittedSnapshot->opTime : OpTime();
- return isMajorityReadConcern ? committedOptime : _getMyLastAppliedOpTime_inlock();
+ return isMajorityReadConcern ? _getCurrentCommittedSnapshotOpTime_inlock()
+ : _getMyLastAppliedOpTime_inlock();
};
if (isMajorityReadConcern && targetOpTime > getCurrentOpTime()) {
@@ -1181,7 +1141,7 @@ Status ReplicationCoordinatorImpl::_waitUntilOpTimeForReadDeprecated(
ThreadWaiter waiter(targetOpTime, nullptr, &condVar);
WaiterGuard guard(&_opTimeWaiterList, &waiter);
- LOG(3) << "waituntilOpTime: OpID " << opCtx->getOpID() << " is waiting for OpTime "
+ LOG(3) << "waitUntilOpTime: OpID " << opCtx->getOpID() << " is waiting for OpTime "
<< waiter << " until " << opCtx->getDeadline();
auto waitStatus = opCtx->waitForConditionOrInterruptNoAssert(condVar, lock);
@@ -1193,6 +1153,32 @@ Status ReplicationCoordinatorImpl::_waitUntilOpTimeForReadDeprecated(
return Status::OK();
}
+Status ReplicationCoordinatorImpl::_waitUntilClusterTimeForRead(
+ OperationContext* opCtx, const ReadConcernArgs& readConcern) {
+ auto clusterTime = *readConcern.getArgsClusterTime();
+ invariant(clusterTime != LogicalTime::kUninitialized);
+
+ // convert clusterTime to opTime so it can be used by the _opTimeWaiterList for wait on
+ // readConcern level local.
+ auto targetOpTime = OpTime(clusterTime.asTimestamp(), OpTime::kUninitializedTerm);
+ invariant(!readConcern.getArgsOpTime());
+
+ const bool isMajorityReadConcern =
+ readConcern.getLevel() == ReadConcernLevel::kMajorityReadConcern;
+
+ return _waitUntilOpTime(opCtx, isMajorityReadConcern, targetOpTime);
+}
+
+// TODO: remove when SERVER-29729 is done
+Status ReplicationCoordinatorImpl::_waitUntilOpTimeForReadDeprecated(
+ OperationContext* opCtx, const ReadConcernArgs& readConcern) {
+ const bool isMajorityReadConcern =
+ readConcern.getLevel() == ReadConcernLevel::kMajorityReadConcern;
+
+ const auto targetOpTime = readConcern.getArgsOpTime().value_or(OpTime());
+ return _waitUntilOpTime(opCtx, isMajorityReadConcern, targetOpTime);
+}
+
OpTime ReplicationCoordinatorImpl::_getMyLastAppliedOpTime_inlock() const {
return _topCoord->getMyLastAppliedOpTime();
}