diff options
author | Misha Tyulenev <misha@mongodb.com> | 2017-06-21 22:18:58 -0400 |
---|---|---|
committer | Misha Tyulenev <misha@mongodb.com> | 2017-06-21 22:20:08 -0400 |
commit | 48dc747414e18d8caa56f0fba23c80871f67c31a (patch) | |
tree | dffe91c2fc1c792ce45b199a3deab9dedc702c07 /src/mongo/db/repl/replication_coordinator_impl.cpp | |
parent | 358cf1d12d8c3787fe14fe4458a242186d12b3c0 (diff) | |
download | mongo-48dc747414e18d8caa56f0fba23c80871f67c31a.tar.gz |
SERVER-28150 Add support for readConcern::afterClusterTime for level == local
Diffstat (limited to 'src/mongo/db/repl/replication_coordinator_impl.cpp')
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_impl.cpp | 90 |
1 files changed, 38 insertions, 52 deletions
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp index 2d3ba978e54..d52920c0f45 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl.cpp @@ -1052,9 +1052,12 @@ Status ReplicationCoordinatorImpl::_validateReadConcern(OperationContext* opCtx, const bool isMajorityReadConcern = readConcern.getLevel() == ReadConcernLevel::kMajorityReadConcern; - if (readConcern.getArgsClusterTime() && !isMajorityReadConcern) { + if (readConcern.getArgsClusterTime() && + readConcern.getLevel() != ReadConcernLevel::kMajorityReadConcern && + readConcern.getLevel() != ReadConcernLevel::kLocalReadConcern) { return {ErrorCodes::BadValue, - "only readConcern level majority is allowed when specifying afterClusterTime"}; + "Only readConcern level 'majority' or 'local' is allowed when specifying " + "afterClusterTime"}; } if (isMajorityReadConcern && !getSettings().isMajorityReadConcernEnabled()) { @@ -1088,57 +1091,15 @@ Status ReplicationCoordinatorImpl::waitUntilOpTimeForRead(OperationContext* opCt } if (readConcern.getArgsClusterTime()) { - auto targetTime = *readConcern.getArgsClusterTime(); - - if (readConcern.getArgsOpTime()) { - auto opTimeStamp = LogicalTime(readConcern.getArgsOpTime()->getTimestamp()); - if (opTimeStamp > targetTime) { - targetTime = opTimeStamp; - } - } - return _waitUntilClusterTimeForRead(opCtx, targetTime); + return _waitUntilClusterTimeForRead(opCtx, readConcern); } else { return _waitUntilOpTimeForReadDeprecated(opCtx, readConcern); } } -Status ReplicationCoordinatorImpl::_waitUntilClusterTimeForRead(OperationContext* opCtx, - LogicalTime clusterTime) { - invariant(clusterTime != LogicalTime::kUninitialized); - - stdx::unique_lock<stdx::mutex> lock(_mutex); - - auto currentTime = _getCurrentCommittedLogicalTime_inlock(); - if (clusterTime > currentTime) { - LOG(1) << "waitUntilClusterTime: waiting for clusterTime:" << clusterTime.toString() - << " to be in a snapshot -- current snapshot: " << currentTime.toString(); - } - - while (clusterTime > _getCurrentCommittedLogicalTime_inlock()) { - if (_inShutdown) { - return {ErrorCodes::ShutdownInProgress, "Shutdown in progress"}; - } - - LOG(3) << "waitUntilClusterTime: waiting for a new snapshot until " << opCtx->getDeadline(); - - auto waitStatus = - opCtx->waitForConditionOrInterruptNoAssert(_currentCommittedSnapshotCond, lock); - if (!waitStatus.isOK()) { - return waitStatus; - } - LOG(3) << "Got notified of new snapshot: " << _currentCommittedSnapshot->toString(); - } - - return Status::OK(); -} - -Status ReplicationCoordinatorImpl::_waitUntilOpTimeForReadDeprecated( - OperationContext* opCtx, const ReadConcernArgs& readConcern) { - const bool isMajorityReadConcern = - readConcern.getLevel() == ReadConcernLevel::kMajorityReadConcern; - - const auto targetOpTime = readConcern.getArgsOpTime().value_or(OpTime()); - +Status ReplicationCoordinatorImpl::_waitUntilOpTime(OperationContext* opCtx, + bool isMajorityReadConcern, + OpTime targetOpTime) { stdx::unique_lock<stdx::mutex> lock(_mutex); if (isMajorityReadConcern && !_externalState->snapshotsEnabled()) { @@ -1147,9 +1108,8 @@ Status ReplicationCoordinatorImpl::_waitUntilOpTimeForReadDeprecated( } auto getCurrentOpTime = [this, isMajorityReadConcern]() { - auto committedOptime = - _currentCommittedSnapshot ? _currentCommittedSnapshot->opTime : OpTime(); - return isMajorityReadConcern ? committedOptime : _getMyLastAppliedOpTime_inlock(); + return isMajorityReadConcern ? _getCurrentCommittedSnapshotOpTime_inlock() + : _getMyLastAppliedOpTime_inlock(); }; if (isMajorityReadConcern && targetOpTime > getCurrentOpTime()) { @@ -1181,7 +1141,7 @@ Status ReplicationCoordinatorImpl::_waitUntilOpTimeForReadDeprecated( ThreadWaiter waiter(targetOpTime, nullptr, &condVar); WaiterGuard guard(&_opTimeWaiterList, &waiter); - LOG(3) << "waituntilOpTime: OpID " << opCtx->getOpID() << " is waiting for OpTime " + LOG(3) << "waitUntilOpTime: OpID " << opCtx->getOpID() << " is waiting for OpTime " << waiter << " until " << opCtx->getDeadline(); auto waitStatus = opCtx->waitForConditionOrInterruptNoAssert(condVar, lock); @@ -1193,6 +1153,32 @@ Status ReplicationCoordinatorImpl::_waitUntilOpTimeForReadDeprecated( return Status::OK(); } +Status ReplicationCoordinatorImpl::_waitUntilClusterTimeForRead( + OperationContext* opCtx, const ReadConcernArgs& readConcern) { + auto clusterTime = *readConcern.getArgsClusterTime(); + invariant(clusterTime != LogicalTime::kUninitialized); + + // convert clusterTime to opTime so it can be used by the _opTimeWaiterList for wait on + // readConcern level local. + auto targetOpTime = OpTime(clusterTime.asTimestamp(), OpTime::kUninitializedTerm); + invariant(!readConcern.getArgsOpTime()); + + const bool isMajorityReadConcern = + readConcern.getLevel() == ReadConcernLevel::kMajorityReadConcern; + + return _waitUntilOpTime(opCtx, isMajorityReadConcern, targetOpTime); +} + +// TODO: remove when SERVER-29729 is done +Status ReplicationCoordinatorImpl::_waitUntilOpTimeForReadDeprecated( + OperationContext* opCtx, const ReadConcernArgs& readConcern) { + const bool isMajorityReadConcern = + readConcern.getLevel() == ReadConcernLevel::kMajorityReadConcern; + + const auto targetOpTime = readConcern.getArgsOpTime().value_or(OpTime()); + return _waitUntilOpTime(opCtx, isMajorityReadConcern, targetOpTime); +} + OpTime ReplicationCoordinatorImpl::_getMyLastAppliedOpTime_inlock() const { return _topCoord->getMyLastAppliedOpTime(); } |