diff options
Diffstat (limited to 'src/mongo/db/repl')
-rw-r--r-- | src/mongo/db/repl/SConscript | 6 | ||||
-rw-r--r-- | src/mongo/db/repl/read_concern_args.cpp | 48 | ||||
-rw-r--r-- | src/mongo/db/repl/read_concern_args.h | 31 | ||||
-rw-r--r-- | src/mongo/db/repl/read_concern_args_test.cpp | 55 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_impl.cpp | 80 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_impl.h | 26 |
6 files changed, 220 insertions, 26 deletions
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript index e8d3fe79e65..e1bb059fe44 100644 --- a/src/mongo/db/repl/SConscript +++ b/src/mongo/db/repl/SConscript @@ -627,6 +627,7 @@ env.Library('read_concern_args', LIBDEPS=[ '$BUILD_DIR/mongo/base', '$BUILD_DIR/mongo/bson/util/bson_extract', + '$BUILD_DIR/mongo/db/logical_time', 'optime', ]) @@ -960,7 +961,10 @@ env.CppUnitTest( source=[ 'read_concern_args_test.cpp', ], - LIBDEPS=['replica_set_messages'] + LIBDEPS=[ + 'replica_set_messages', + '$BUILD_DIR/mongo/db/logical_time', + ], ) env.Library(target='optime', diff --git a/src/mongo/db/repl/read_concern_args.cpp b/src/mongo/db/repl/read_concern_args.cpp index 409560de922..2352547deb5 100644 --- a/src/mongo/db/repl/read_concern_args.cpp +++ b/src/mongo/db/repl/read_concern_args.cpp @@ -34,6 +34,7 @@ #include "mongo/bson/util/bson_extract.h" #include "mongo/db/jsobj.h" +#include "mongo/db/logical_time.h" #include "mongo/db/repl/bson_extract_optime.h" #include "mongo/util/mongoutils/str.h" @@ -52,6 +53,7 @@ const char kLinearizableReadConcernStr[] = "linearizable"; const string ReadConcernArgs::kReadConcernFieldName("readConcern"); const string ReadConcernArgs::kAfterOpTimeFieldName("afterOpTime"); +const string ReadConcernArgs::kAfterClusterTimeFieldName("afterClusterTime"); const string ReadConcernArgs::kLevelFieldName("level"); ReadConcernArgs::ReadConcernArgs() = default; @@ -60,6 +62,11 @@ ReadConcernArgs::ReadConcernArgs(boost::optional<OpTime> opTime, boost::optional<ReadConcernLevel> level) : _opTime(std::move(opTime)), _level(std::move(level)) {} +ReadConcernArgs::ReadConcernArgs(boost::optional<OpTime> opTime, + boost::optional<LogicalTime> clusterTime, + boost::optional<ReadConcernLevel> level) + : _opTime(std::move(opTime)), _clusterTime(std::move(clusterTime)), _level(std::move(level)) {} + std::string ReadConcernArgs::toString() const { return toBSON().toString(); } @@ -71,19 +78,23 @@ BSONObj ReadConcernArgs::toBSON() const { } 
bool ReadConcernArgs::isEmpty() const { - return getOpTime().isNull() && getLevel() == repl::ReadConcernLevel::kLocalReadConcern; + return !_clusterTime && !_opTime && !_level; } ReadConcernLevel ReadConcernArgs::getLevel() const { return _level.value_or(ReadConcernLevel::kLocalReadConcern); } -OpTime ReadConcernArgs::getOpTime() const { - return _opTime.value_or(OpTime()); +boost::optional<OpTime> ReadConcernArgs::getArgsOpTime() const { + return _opTime; +} + +boost::optional<LogicalTime> ReadConcernArgs::getArgsClusterTime() const { + return _clusterTime; } Status ReadConcernArgs::initialize(const BSONElement& readConcernElem) { - invariant(!_opTime && !_level); // only legal to call on uninitialized object. + invariant(isEmpty()); // only legal to call on uninitialized object. if (readConcernElem.eoo()) { return Status::OK(); @@ -108,14 +119,24 @@ Status ReadConcernArgs::initialize(const BSONElement& readConcernElem) { return opTimeStatus; } _opTime = opTime; + } else if (fieldName == kAfterClusterTimeFieldName) { + Timestamp clusterTime; + auto clusterTimeStatus = + bsonExtractTimestampField(readConcernObj, kAfterClusterTimeFieldName, &clusterTime); + if (!clusterTimeStatus.isOK()) { + return clusterTimeStatus; + } + _clusterTime = LogicalTime(clusterTime); } else if (fieldName == kLevelFieldName) { std::string levelString; // TODO pass field in rather than scanning again. 
auto readCommittedStatus = bsonExtractStringField(readConcernObj, kLevelFieldName, &levelString); + if (!readCommittedStatus.isOK()) { return readCommittedStatus; } + if (levelString == kLocalReadConcernStr) { _level = ReadConcernLevel::kLocalReadConcern; } else if (levelString == kMajorityReadConcernStr) { @@ -136,6 +157,21 @@ Status ReadConcernArgs::initialize(const BSONElement& readConcernElem) { } } + if (_clusterTime && _opTime) { + return Status(ErrorCodes::InvalidOptions, + str::stream() << "Can not specify both " << kAfterClusterTimeFieldName + << " and " + << kAfterOpTimeFieldName); + } + + if (_clusterTime && (getLevel() != ReadConcernLevel::kMajorityReadConcern)) { + return Status(ErrorCodes::InvalidOptions, + str::stream() << kAfterClusterTimeFieldName << " field can be set only if " + << kLevelFieldName + << " is equal to " + << kMajorityReadConcernStr); + } + return Status::OK(); } @@ -168,6 +204,10 @@ void ReadConcernArgs::appendInfo(BSONObjBuilder* builder) const { _opTime->append(&rcBuilder, kAfterOpTimeFieldName); } + if (_clusterTime) { + rcBuilder.append(kAfterClusterTimeFieldName, _clusterTime->asTimestamp()); + } + rcBuilder.done(); } diff --git a/src/mongo/db/repl/read_concern_args.h b/src/mongo/db/repl/read_concern_args.h index b258ce04461..6778a548228 100644 --- a/src/mongo/db/repl/read_concern_args.h +++ b/src/mongo/db/repl/read_concern_args.h @@ -33,6 +33,7 @@ #include "mongo/base/status.h" #include "mongo/db/json.h" +#include "mongo/db/logical_time.h" #include "mongo/db/repl/optime.h" #include "mongo/util/time_support.h" @@ -48,19 +49,25 @@ class ReadConcernArgs { public: static const std::string kReadConcernFieldName; static const std::string kAfterOpTimeFieldName; + static const std::string kAfterClusterTimeFieldName; static const std::string kLevelFieldName; ReadConcernArgs(); + ReadConcernArgs(boost::optional<OpTime> opTime, boost::optional<ReadConcernLevel> level); + ReadConcernArgs(boost::optional<OpTime> opTime, + 
boost::optional<LogicalTime> clusterTime, + boost::optional<ReadConcernLevel> level); /** * Format: * { - * find: “coll”, + * find: "coll", * filter: <Query Object>, * readConcern: { // optional * level: "[majority|local|linearizable]", * afterOpTime: { ts: <timestamp>, term: <NumberLong> }, + * afterClusterTime: <timestamp>, * } * } */ @@ -81,18 +88,34 @@ public: void appendInfo(BSONObjBuilder* builder) const; /** - * Returns whether these arguments are 'empty' in the sense that no read concern has been - * requested. + * Returns true if none of the clusterTime, opTime or level arguments are set. */ bool isEmpty() const; + /** + * Returns default kLocalReadConcern if _level is not set. + */ ReadConcernLevel getLevel() const; - OpTime getOpTime() const; + + /** + * Returns the opTime. Deprecated: will be replaced with getArgsClusterTime. + */ + boost::optional<OpTime> getArgsOpTime() const; + + boost::optional<LogicalTime> getArgsClusterTime() const; BSONObj toBSON() const; std::string toString() const; private: + /** + * Read data after the OpTime of an operation on this replica set. Deprecated. + * The only user is for read-after-optime calls using the config server optime. + */ boost::optional<OpTime> _opTime; + /** + * Read data after cluster-wide logical time. 
+ */ + boost::optional<LogicalTime> _clusterTime; boost::optional<ReadConcernLevel> _level; }; diff --git a/src/mongo/db/repl/read_concern_args_test.cpp b/src/mongo/db/repl/read_concern_args_test.cpp index f915209e585..a41bc87d8ac 100644 --- a/src/mongo/db/repl/read_concern_args_test.cpp +++ b/src/mongo/db/repl/read_concern_args_test.cpp @@ -36,7 +36,7 @@ namespace mongo { namespace repl { namespace { -TEST(ReadAfterParse, ReadAfterOnly) { +TEST(ReadAfterParse, OpTimeOnly) { ReadConcernArgs readAfterOpTime; ASSERT_OK(readAfterOpTime.initialize(BSON( "find" @@ -46,12 +46,44 @@ TEST(ReadAfterParse, ReadAfterOnly) { << BSON(OpTime::kTimestampFieldName << Timestamp(20, 30) << OpTime::kTermFieldName << 2))))); - ASSERT_EQ(Timestamp(20, 30), readAfterOpTime.getOpTime().getTimestamp()); - ASSERT_EQ(2, readAfterOpTime.getOpTime().getTerm()); + ASSERT_TRUE(readAfterOpTime.getArgsOpTime()); + ASSERT_TRUE(!readAfterOpTime.getArgsClusterTime()); + auto argsOpTime = readAfterOpTime.getArgsOpTime(); + ASSERT_EQ(Timestamp(20, 30), argsOpTime->getTimestamp()); + ASSERT_EQ(2, argsOpTime->getTerm()); ASSERT(ReadConcernLevel::kLocalReadConcern == readAfterOpTime.getLevel()); } -TEST(ReadAfterParse, ReadCommitLevelOnly) { +TEST(ReadAfterParse, ClusterTimeOnly) { + ReadConcernArgs readAfterOpTime; + // Must have level=majority + ASSERT_NOT_OK( + readAfterOpTime.initialize(BSON("find" + << "test" + << ReadConcernArgs::kReadConcernFieldName + << BSON(ReadConcernArgs::kAfterClusterTimeFieldName + << Timestamp(20, 30))))); +} + +TEST(ReadAfterParse, ClusterTimeAndLevel) { + ReadConcernArgs readAfterOpTime; + // Must have level=majority + auto clusterTime = LogicalTime(Timestamp(20, 30)); + ASSERT_OK(readAfterOpTime.initialize(BSON("find" + << "test" + << ReadConcernArgs::kReadConcernFieldName + << BSON(ReadConcernArgs::kAfterClusterTimeFieldName + << clusterTime.asTimestamp() + << ReadConcernArgs::kLevelFieldName + << "majority")))); + auto argsClusterTime = 
readAfterOpTime.getArgsClusterTime(); + ASSERT_TRUE(argsClusterTime); + ASSERT_TRUE(!readAfterOpTime.getArgsOpTime()); + ASSERT_TRUE(clusterTime == *argsClusterTime); + ASSERT(ReadConcernLevel::kMajorityReadConcern == readAfterOpTime.getLevel()); +} + +TEST(ReadAfterParse, LevelOnly) { ReadConcernArgs readAfterOpTime; ASSERT_OK( readAfterOpTime.initialize(BSON("find" @@ -59,25 +91,25 @@ TEST(ReadAfterParse, ReadCommitLevelOnly) { << ReadConcernArgs::kReadConcernFieldName << BSON(ReadConcernArgs::kLevelFieldName << "majority")))); - ASSERT_TRUE(readAfterOpTime.getOpTime().isNull()); + ASSERT_TRUE(!readAfterOpTime.getArgsOpTime()); + ASSERT_TRUE(!readAfterOpTime.getArgsClusterTime()); ASSERT_TRUE(ReadConcernLevel::kMajorityReadConcern == readAfterOpTime.getLevel()); } TEST(ReadAfterParse, ReadCommittedFullSpecification) { ReadConcernArgs readAfterOpTime; - ASSERT_OK(readAfterOpTime.initialize(BSON( + auto clusterTime = LogicalTime(Timestamp(100, 200)); + ASSERT_NOT_OK(readAfterOpTime.initialize(BSON( "find" << "test" << ReadConcernArgs::kReadConcernFieldName << BSON(ReadConcernArgs::kAfterOpTimeFieldName << BSON(OpTime::kTimestampFieldName << Timestamp(20, 30) << OpTime::kTermFieldName << 2) + << ReadConcernArgs::kAfterClusterTimeFieldName + << clusterTime.asTimestamp() << ReadConcernArgs::kLevelFieldName << "majority")))); - - ASSERT_EQ(Timestamp(20, 30), readAfterOpTime.getOpTime().getTimestamp()); - ASSERT_EQ(2, readAfterOpTime.getOpTime().getTerm()); - ASSERT(ReadConcernLevel::kMajorityReadConcern == readAfterOpTime.getLevel()); } TEST(ReadAfterParse, Empty) { @@ -85,7 +117,8 @@ TEST(ReadAfterParse, Empty) { ASSERT_OK(readAfterOpTime.initialize(BSON("find" << "test"))); - ASSERT(readAfterOpTime.getOpTime().getTimestamp().isNull()); + ASSERT_TRUE(!readAfterOpTime.getArgsOpTime()); + ASSERT_TRUE(!readAfterOpTime.getArgsClusterTime()); ASSERT(ReadConcernLevel::kLocalReadConcern == readAfterOpTime.getLevel()); } diff --git 
a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp index 0b8efc13717..3600a08ea1a 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl.cpp @@ -407,6 +407,10 @@ OpTime ReplicationCoordinatorImpl::_getCurrentCommittedSnapshotOpTime_inlock() c return OpTime(); } +LogicalTime ReplicationCoordinatorImpl::_getCurrentCommittedLogicalTime_inlock() const { + return LogicalTime(_getCurrentCommittedSnapshotOpTime_inlock().getTimestamp()); +} + void ReplicationCoordinatorImpl::appendConnectionStats(executor::ConnectionPoolStats* stats) const { _replExecutor.appendConnectionStats(stats); } @@ -1223,8 +1227,8 @@ OpTime ReplicationCoordinatorImpl::getMyLastDurableOpTime() const { return _getMyLastDurableOpTime_inlock(); } -Status ReplicationCoordinatorImpl::waitUntilOpTimeForRead(OperationContext* opCtx, - const ReadConcernArgs& settings) { +Status ReplicationCoordinatorImpl::_validateReadConcern(OperationContext* opCtx, + const ReadConcernArgs& readConcern) { // We should never wait for replication if we are holding any locks, because this can // potentially block for long time while doing network activity. if (opCtx->lockState()->isLocked()) { @@ -1233,7 +1237,12 @@ Status ReplicationCoordinatorImpl::waitUntilOpTimeForRead(OperationContext* opCt } const bool isMajorityReadConcern = - settings.getLevel() == ReadConcernLevel::kMajorityReadConcern; + readConcern.getLevel() == ReadConcernLevel::kMajorityReadConcern; + + if (readConcern.getArgsClusterTime() && !isMajorityReadConcern) { + return {ErrorCodes::BadValue, + "only readConcern level majority is allowed when specifying afterClusterTime"}; + } if (isMajorityReadConcern && !getSettings().isMajorityReadConcernEnabled()) { // This is an opt-in feature. Fail if the user didn't opt-in. 
@@ -1242,9 +1251,18 @@ Status ReplicationCoordinatorImpl::waitUntilOpTimeForRead(OperationContext* opCt "--enableMajorityReadConcern."}; } - const auto targetOpTime = settings.getOpTime(); + return Status::OK(); +} + +Status ReplicationCoordinatorImpl::waitUntilOpTimeForRead(OperationContext* opCtx, + const ReadConcernArgs& readConcern) { + auto verifyStatus = _validateReadConcern(opCtx, readConcern); + if (!verifyStatus.isOK()) { + return verifyStatus; + } - if (targetOpTime.isNull()) { + // nothing to wait for + if (!readConcern.getArgsClusterTime() && !readConcern.getArgsOpTime()) { return Status::OK(); } @@ -1256,6 +1274,58 @@ Status ReplicationCoordinatorImpl::waitUntilOpTimeForRead(OperationContext* opCt "node needs to be a replica set member to use read concern"}; } + if (readConcern.getArgsClusterTime()) { + auto targetTime = *readConcern.getArgsClusterTime(); + + if (readConcern.getArgsOpTime()) { + auto opTimeStamp = LogicalTime(readConcern.getArgsOpTime()->getTimestamp()); + if (opTimeStamp > targetTime) { + targetTime = opTimeStamp; + } + } + return _waitUntilClusterTimeForRead(opCtx, targetTime); + } else { + return _waitUntilOpTimeForReadDeprecated(opCtx, readConcern); + } +} + +Status ReplicationCoordinatorImpl::_waitUntilClusterTimeForRead(OperationContext* opCtx, + LogicalTime clusterTime) { + invariant(clusterTime != LogicalTime::kUninitialized); + + stdx::unique_lock<stdx::mutex> lock(_mutex); + + auto currentTime = _getCurrentCommittedLogicalTime_inlock(); + if (clusterTime > currentTime) { + LOG(1) << "waitUntilClusterTime: waiting for clusterTime:" << clusterTime.toString() + << " to be in a snapshot -- current snapshot: " << currentTime.toString(); + } + + while (clusterTime > _getCurrentCommittedLogicalTime_inlock()) { + if (_inShutdown) { + return {ErrorCodes::ShutdownInProgress, "Shutdown in progress"}; + } + + LOG(3) << "waitUntilClusterTime: waiting for a new snapshot until " << opCtx->getDeadline(); + + auto waitStatus = + 
opCtx->waitForConditionOrInterruptNoAssert(_currentCommittedSnapshotCond, lock); + if (!waitStatus.isOK()) { + return waitStatus; + } + LOG(3) << "Got notified of new snapshot: " << _currentCommittedSnapshot->toString(); + } + + return Status::OK(); +} + +Status ReplicationCoordinatorImpl::_waitUntilOpTimeForReadDeprecated( + OperationContext* opCtx, const ReadConcernArgs& readConcern) { + const bool isMajorityReadConcern = + readConcern.getLevel() == ReadConcernLevel::kMajorityReadConcern; + + const auto targetOpTime = readConcern.getArgsOpTime().value_or(OpTime()); + stdx::unique_lock<stdx::mutex> lock(_mutex); if (isMajorityReadConcern && !_externalState->snapshotsEnabled()) { diff --git a/src/mongo/db/repl/replication_coordinator_impl.h b/src/mongo/db/repl/replication_coordinator_impl.h index dcf29656c3f..5003435147e 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.h +++ b/src/mongo/db/repl/replication_coordinator_impl.h @@ -169,7 +169,7 @@ public: virtual OpTime getMyLastDurableOpTime() const override; virtual Status waitUntilOpTimeForRead(OperationContext* opCtx, - const ReadConcernArgs& settings) override; + const ReadConcernArgs& readConcern) override; virtual OID getElectionId() override; @@ -582,6 +582,16 @@ private: OpTime _getCurrentCommittedSnapshotOpTime_inlock() const; /** + * Returns the OpTime of the current committed snapshot converted to LogicalTime. + */ + LogicalTime _getCurrentCommittedLogicalTime_inlock() const; + + /** + * Verifies that ReadConcernArgs match node's readConcern. + */ + Status _validateReadConcern(OperationContext* opCtx, const ReadConcernArgs& readConcern); + + /** * Helper method that removes entries from _slaveInfo if they correspond to a node * with a member ID that is not in the current replica set config. 
Will always leave an * entry for ourself at the beginning of _slaveInfo, even if we aren't present in the @@ -1171,6 +1181,20 @@ private: */ ReplicationExecutor::EventHandle _cancelElectionIfNeeded_inTopoLock(); + /** + * Waits until the optime of the current node is at least the opTime specified in 'readConcern'. + * It supports local readConcern, which _waitUntilClusterTimeForRead does not. + * TODO: remove when SERVER-28150 is done. + */ + Status _waitUntilOpTimeForReadDeprecated(OperationContext* opCtx, + const ReadConcernArgs& readConcern); + + /** + * Waits until the logicalTime of the current node is at least the 'clusterTime'. + * TODO: Merge with waitUntilOpTimeForRead() when SERVER-28150 is done. + */ + Status _waitUntilClusterTimeForRead(OperationContext* opCtx, LogicalTime clusterTime); + // // All member variables are labeled with one of the following codes indicating the // synchronization rules for accessing them. |