summary refs log tree commit diff
path: root/src/mongo/db/repl
diff options
context:
space:
mode:
author Misha Tyulenev <misha@mongodb.com> 2017-03-13 17:14:43 -0400
committer Misha Tyulenev <misha@mongodb.com> 2017-03-13 17:15:06 -0400
commit a0516b5f896703682c98cf0b8c2e333f743f4dc1 (patch)
tree 543123f33d6bb0f2827d23b2fd1733fafa10ad12 /src/mongo/db/repl
parent a74ed820b13c39cc6a5eaf7d30489ffa41dfac2a (diff)
download mongo-a0516b5f896703682c98cf0b8c2e333f743f4dc1.tar.gz
SERVER-27771 add readConcern::afterClusterTime
Diffstat (limited to 'src/mongo/db/repl')
-rw-r--r-- src/mongo/db/repl/SConscript | 6
-rw-r--r-- src/mongo/db/repl/read_concern_args.cpp | 48
-rw-r--r-- src/mongo/db/repl/read_concern_args.h | 31
-rw-r--r-- src/mongo/db/repl/read_concern_args_test.cpp | 55
-rw-r--r-- src/mongo/db/repl/replication_coordinator_impl.cpp | 80
-rw-r--r-- src/mongo/db/repl/replication_coordinator_impl.h | 26
6 files changed, 220 insertions(+), 26 deletions(-)
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript
index e8d3fe79e65..e1bb059fe44 100644
--- a/src/mongo/db/repl/SConscript
+++ b/src/mongo/db/repl/SConscript
@@ -627,6 +627,7 @@ env.Library('read_concern_args',
LIBDEPS=[
'$BUILD_DIR/mongo/base',
'$BUILD_DIR/mongo/bson/util/bson_extract',
+ '$BUILD_DIR/mongo/db/logical_time',
'optime',
])
@@ -960,7 +961,10 @@ env.CppUnitTest(
source=[
'read_concern_args_test.cpp',
],
- LIBDEPS=['replica_set_messages']
+ LIBDEPS=[
+ 'replica_set_messages',
+ '$BUILD_DIR/mongo/db/logical_time',
+ ],
)
env.Library(target='optime',
diff --git a/src/mongo/db/repl/read_concern_args.cpp b/src/mongo/db/repl/read_concern_args.cpp
index 409560de922..2352547deb5 100644
--- a/src/mongo/db/repl/read_concern_args.cpp
+++ b/src/mongo/db/repl/read_concern_args.cpp
@@ -34,6 +34,7 @@
#include "mongo/bson/util/bson_extract.h"
#include "mongo/db/jsobj.h"
+#include "mongo/db/logical_time.h"
#include "mongo/db/repl/bson_extract_optime.h"
#include "mongo/util/mongoutils/str.h"
@@ -52,6 +53,7 @@ const char kLinearizableReadConcernStr[] = "linearizable";
const string ReadConcernArgs::kReadConcernFieldName("readConcern");
const string ReadConcernArgs::kAfterOpTimeFieldName("afterOpTime");
+const string ReadConcernArgs::kAfterClusterTimeFieldName("afterClusterTime");
const string ReadConcernArgs::kLevelFieldName("level");
ReadConcernArgs::ReadConcernArgs() = default;
@@ -60,6 +62,11 @@ ReadConcernArgs::ReadConcernArgs(boost::optional<OpTime> opTime,
boost::optional<ReadConcernLevel> level)
: _opTime(std::move(opTime)), _level(std::move(level)) {}
+ReadConcernArgs::ReadConcernArgs(boost::optional<OpTime> opTime,
+ boost::optional<LogicalTime> clusterTime,
+ boost::optional<ReadConcernLevel> level)
+ : _opTime(std::move(opTime)), _clusterTime(std::move(clusterTime)), _level(std::move(level)) {}
+
std::string ReadConcernArgs::toString() const {
return toBSON().toString();
}
@@ -71,19 +78,23 @@ BSONObj ReadConcernArgs::toBSON() const {
}
bool ReadConcernArgs::isEmpty() const {
- return getOpTime().isNull() && getLevel() == repl::ReadConcernLevel::kLocalReadConcern;
+ return !_clusterTime && !_opTime && !_level;
}
ReadConcernLevel ReadConcernArgs::getLevel() const {
return _level.value_or(ReadConcernLevel::kLocalReadConcern);
}
-OpTime ReadConcernArgs::getOpTime() const {
- return _opTime.value_or(OpTime());
+boost::optional<OpTime> ReadConcernArgs::getArgsOpTime() const {
+ return _opTime;
+}
+
+boost::optional<LogicalTime> ReadConcernArgs::getArgsClusterTime() const {
+ return _clusterTime;
}
Status ReadConcernArgs::initialize(const BSONElement& readConcernElem) {
- invariant(!_opTime && !_level); // only legal to call on uninitialized object.
+ invariant(isEmpty()); // only legal to call on uninitialized object.
if (readConcernElem.eoo()) {
return Status::OK();
@@ -108,14 +119,24 @@ Status ReadConcernArgs::initialize(const BSONElement& readConcernElem) {
return opTimeStatus;
}
_opTime = opTime;
+ } else if (fieldName == kAfterClusterTimeFieldName) {
+ Timestamp clusterTime;
+ auto clusterTimeStatus =
+ bsonExtractTimestampField(readConcernObj, kAfterClusterTimeFieldName, &clusterTime);
+ if (!clusterTimeStatus.isOK()) {
+ return clusterTimeStatus;
+ }
+ _clusterTime = LogicalTime(clusterTime);
} else if (fieldName == kLevelFieldName) {
std::string levelString;
// TODO pass field in rather than scanning again.
auto readCommittedStatus =
bsonExtractStringField(readConcernObj, kLevelFieldName, &levelString);
+
if (!readCommittedStatus.isOK()) {
return readCommittedStatus;
}
+
if (levelString == kLocalReadConcernStr) {
_level = ReadConcernLevel::kLocalReadConcern;
} else if (levelString == kMajorityReadConcernStr) {
@@ -136,6 +157,21 @@ Status ReadConcernArgs::initialize(const BSONElement& readConcernElem) {
}
}
+ if (_clusterTime && _opTime) {
+ return Status(ErrorCodes::InvalidOptions,
+ str::stream() << "Can not specify both " << kAfterClusterTimeFieldName
+ << " and "
+ << kAfterOpTimeFieldName);
+ }
+
+ if (_clusterTime && (getLevel() != ReadConcernLevel::kMajorityReadConcern)) {
+ return Status(ErrorCodes::InvalidOptions,
+ str::stream() << kAfterClusterTimeFieldName << " field can be set only if "
+ << kLevelFieldName
+ << " is equal to "
+ << kMajorityReadConcernStr);
+ }
+
return Status::OK();
}
@@ -168,6 +204,10 @@ void ReadConcernArgs::appendInfo(BSONObjBuilder* builder) const {
_opTime->append(&rcBuilder, kAfterOpTimeFieldName);
}
+ if (_clusterTime) {
+ rcBuilder.append(kAfterClusterTimeFieldName, _clusterTime->asTimestamp());
+ }
+
rcBuilder.done();
}
diff --git a/src/mongo/db/repl/read_concern_args.h b/src/mongo/db/repl/read_concern_args.h
index b258ce04461..6778a548228 100644
--- a/src/mongo/db/repl/read_concern_args.h
+++ b/src/mongo/db/repl/read_concern_args.h
@@ -33,6 +33,7 @@
#include "mongo/base/status.h"
#include "mongo/db/json.h"
+#include "mongo/db/logical_time.h"
#include "mongo/db/repl/optime.h"
#include "mongo/util/time_support.h"
@@ -48,19 +49,25 @@ class ReadConcernArgs {
public:
static const std::string kReadConcernFieldName;
static const std::string kAfterOpTimeFieldName;
+ static const std::string kAfterClusterTimeFieldName;
static const std::string kLevelFieldName;
ReadConcernArgs();
+
ReadConcernArgs(boost::optional<OpTime> opTime, boost::optional<ReadConcernLevel> level);
+ ReadConcernArgs(boost::optional<OpTime> opTime,
+ boost::optional<LogicalTime> clusterTime,
+ boost::optional<ReadConcernLevel> level);
/**
* Format:
* {
- * find: “coll”,
+ * find: "coll"
* filter: <Query Object>,
* readConcern: { // optional
* level: "[majority|local|linearizable]",
* afterOpTime: { ts: <timestamp>, term: <NumberLong> },
+ * afterClusterTime: <timestamp>,
* }
* }
*/
@@ -81,18 +88,34 @@ public:
void appendInfo(BSONObjBuilder* builder) const;
/**
- * Returns whether these arguments are 'empty' in the sense that no read concern has been
- * requested.
+ * Returns true if none of clusterTime, opTime or level arguments are set.
*/
bool isEmpty() const;
+ /**
+ * Returns default kLocalReadConcern if _level is not set.
+ */
ReadConcernLevel getLevel() const;
- OpTime getOpTime() const;
+
+ /**
+ * Returns the opTime. Deprecated: will be replaced with getArgsClusterTime.
+ */
+ boost::optional<OpTime> getArgsOpTime() const;
+
+ boost::optional<LogicalTime> getArgsClusterTime() const;
BSONObj toBSON() const;
std::string toString() const;
private:
+ /**
+ * Read data after the OpTime of an operation on this replica set. Deprecated.
+ * The only user is for read-after-optime calls using the config server optime.
+ */
boost::optional<OpTime> _opTime;
+ /**
+ * Read data after cluster-wide logical time.
+ */
+ boost::optional<LogicalTime> _clusterTime;
boost::optional<ReadConcernLevel> _level;
};
diff --git a/src/mongo/db/repl/read_concern_args_test.cpp b/src/mongo/db/repl/read_concern_args_test.cpp
index f915209e585..a41bc87d8ac 100644
--- a/src/mongo/db/repl/read_concern_args_test.cpp
+++ b/src/mongo/db/repl/read_concern_args_test.cpp
@@ -36,7 +36,7 @@ namespace mongo {
namespace repl {
namespace {
-TEST(ReadAfterParse, ReadAfterOnly) {
+TEST(ReadAfterParse, OpTimeOnly) {
ReadConcernArgs readAfterOpTime;
ASSERT_OK(readAfterOpTime.initialize(BSON(
"find"
@@ -46,12 +46,44 @@ TEST(ReadAfterParse, ReadAfterOnly) {
<< BSON(OpTime::kTimestampFieldName << Timestamp(20, 30) << OpTime::kTermFieldName
<< 2)))));
- ASSERT_EQ(Timestamp(20, 30), readAfterOpTime.getOpTime().getTimestamp());
- ASSERT_EQ(2, readAfterOpTime.getOpTime().getTerm());
+ ASSERT_TRUE(readAfterOpTime.getArgsOpTime());
+ ASSERT_TRUE(!readAfterOpTime.getArgsClusterTime());
+ auto argsOpTime = readAfterOpTime.getArgsOpTime();
+ ASSERT_EQ(Timestamp(20, 30), argsOpTime->getTimestamp());
+ ASSERT_EQ(2, argsOpTime->getTerm());
ASSERT(ReadConcernLevel::kLocalReadConcern == readAfterOpTime.getLevel());
}
-TEST(ReadAfterParse, ReadCommitLevelOnly) {
+TEST(ReadAfterParse, ClusterTimeOnly) {
+ ReadConcernArgs readAfterOpTime;
+ // Must have level=majority
+ ASSERT_NOT_OK(
+ readAfterOpTime.initialize(BSON("find"
+ << "test"
+ << ReadConcernArgs::kReadConcernFieldName
+ << BSON(ReadConcernArgs::kAfterClusterTimeFieldName
+ << Timestamp(20, 30)))));
+}
+
+TEST(ReadAfterParse, ClusterTimeAndLevel) {
+ ReadConcernArgs readAfterOpTime;
+ // Must have level=majority
+ auto clusterTime = LogicalTime(Timestamp(20, 30));
+ ASSERT_OK(readAfterOpTime.initialize(BSON("find"
+ << "test"
+ << ReadConcernArgs::kReadConcernFieldName
+ << BSON(ReadConcernArgs::kAfterClusterTimeFieldName
+ << clusterTime.asTimestamp()
+ << ReadConcernArgs::kLevelFieldName
+ << "majority"))));
+ auto argsClusterTime = readAfterOpTime.getArgsClusterTime();
+ ASSERT_TRUE(argsClusterTime);
+ ASSERT_TRUE(!readAfterOpTime.getArgsOpTime());
+ ASSERT_TRUE(clusterTime == *argsClusterTime);
+ ASSERT(ReadConcernLevel::kMajorityReadConcern == readAfterOpTime.getLevel());
+}
+
+TEST(ReadAfterParse, LevelOnly) {
ReadConcernArgs readAfterOpTime;
ASSERT_OK(
readAfterOpTime.initialize(BSON("find"
@@ -59,25 +91,25 @@ TEST(ReadAfterParse, ReadCommitLevelOnly) {
<< ReadConcernArgs::kReadConcernFieldName
<< BSON(ReadConcernArgs::kLevelFieldName << "majority"))));
- ASSERT_TRUE(readAfterOpTime.getOpTime().isNull());
+ ASSERT_TRUE(!readAfterOpTime.getArgsOpTime());
+ ASSERT_TRUE(!readAfterOpTime.getArgsClusterTime());
ASSERT_TRUE(ReadConcernLevel::kMajorityReadConcern == readAfterOpTime.getLevel());
}
TEST(ReadAfterParse, ReadCommittedFullSpecification) {
ReadConcernArgs readAfterOpTime;
- ASSERT_OK(readAfterOpTime.initialize(BSON(
+ auto clusterTime = LogicalTime(Timestamp(100, 200));
+ ASSERT_NOT_OK(readAfterOpTime.initialize(BSON(
"find"
<< "test"
<< ReadConcernArgs::kReadConcernFieldName
<< BSON(ReadConcernArgs::kAfterOpTimeFieldName
<< BSON(OpTime::kTimestampFieldName << Timestamp(20, 30) << OpTime::kTermFieldName
<< 2)
+ << ReadConcernArgs::kAfterClusterTimeFieldName
+ << clusterTime.asTimestamp()
<< ReadConcernArgs::kLevelFieldName
<< "majority"))));
-
- ASSERT_EQ(Timestamp(20, 30), readAfterOpTime.getOpTime().getTimestamp());
- ASSERT_EQ(2, readAfterOpTime.getOpTime().getTerm());
- ASSERT(ReadConcernLevel::kMajorityReadConcern == readAfterOpTime.getLevel());
}
TEST(ReadAfterParse, Empty) {
@@ -85,7 +117,8 @@ TEST(ReadAfterParse, Empty) {
ASSERT_OK(readAfterOpTime.initialize(BSON("find"
<< "test")));
- ASSERT(readAfterOpTime.getOpTime().getTimestamp().isNull());
+ ASSERT_TRUE(!readAfterOpTime.getArgsOpTime());
+ ASSERT_TRUE(!readAfterOpTime.getArgsClusterTime());
ASSERT(ReadConcernLevel::kLocalReadConcern == readAfterOpTime.getLevel());
}
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp
index 0b8efc13717..3600a08ea1a 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl.cpp
@@ -407,6 +407,10 @@ OpTime ReplicationCoordinatorImpl::_getCurrentCommittedSnapshotOpTime_inlock() c
return OpTime();
}
+LogicalTime ReplicationCoordinatorImpl::_getCurrentCommittedLogicalTime_inlock() const {
+ return LogicalTime(_getCurrentCommittedSnapshotOpTime_inlock().getTimestamp());
+}
+
void ReplicationCoordinatorImpl::appendConnectionStats(executor::ConnectionPoolStats* stats) const {
_replExecutor.appendConnectionStats(stats);
}
@@ -1223,8 +1227,8 @@ OpTime ReplicationCoordinatorImpl::getMyLastDurableOpTime() const {
return _getMyLastDurableOpTime_inlock();
}
-Status ReplicationCoordinatorImpl::waitUntilOpTimeForRead(OperationContext* opCtx,
- const ReadConcernArgs& settings) {
+Status ReplicationCoordinatorImpl::_validateReadConcern(OperationContext* opCtx,
+ const ReadConcernArgs& readConcern) {
// We should never wait for replication if we are holding any locks, because this can
// potentially block for long time while doing network activity.
if (opCtx->lockState()->isLocked()) {
@@ -1233,7 +1237,12 @@ Status ReplicationCoordinatorImpl::waitUntilOpTimeForRead(OperationContext* opCt
}
const bool isMajorityReadConcern =
- settings.getLevel() == ReadConcernLevel::kMajorityReadConcern;
+ readConcern.getLevel() == ReadConcernLevel::kMajorityReadConcern;
+
+ if (readConcern.getArgsClusterTime() && !isMajorityReadConcern) {
+ return {ErrorCodes::BadValue,
+ "only readConcern level majority is allowed when specifying afterClusterTime"};
+ }
if (isMajorityReadConcern && !getSettings().isMajorityReadConcernEnabled()) {
// This is an opt-in feature. Fail if the user didn't opt-in.
@@ -1242,9 +1251,18 @@ Status ReplicationCoordinatorImpl::waitUntilOpTimeForRead(OperationContext* opCt
"--enableMajorityReadConcern."};
}
- const auto targetOpTime = settings.getOpTime();
+ return Status::OK();
+}
+
+Status ReplicationCoordinatorImpl::waitUntilOpTimeForRead(OperationContext* opCtx,
+ const ReadConcernArgs& readConcern) {
+ auto verifyStatus = _validateReadConcern(opCtx, readConcern);
+ if (!verifyStatus.isOK()) {
+ return verifyStatus;
+ }
- if (targetOpTime.isNull()) {
+ // nothing to wait for
+ if (!readConcern.getArgsClusterTime() && !readConcern.getArgsOpTime()) {
return Status::OK();
}
@@ -1256,6 +1274,58 @@ Status ReplicationCoordinatorImpl::waitUntilOpTimeForRead(OperationContext* opCt
"node needs to be a replica set member to use read concern"};
}
+ if (readConcern.getArgsClusterTime()) {
+ auto targetTime = *readConcern.getArgsClusterTime();
+
+ if (readConcern.getArgsOpTime()) {
+ auto opTimeStamp = LogicalTime(readConcern.getArgsOpTime()->getTimestamp());
+ if (opTimeStamp > targetTime) {
+ targetTime = opTimeStamp;
+ }
+ }
+ return _waitUntilClusterTimeForRead(opCtx, targetTime);
+ } else {
+ return _waitUntilOpTimeForReadDeprecated(opCtx, readConcern);
+ }
+}
+
+Status ReplicationCoordinatorImpl::_waitUntilClusterTimeForRead(OperationContext* opCtx,
+ LogicalTime clusterTime) {
+ invariant(clusterTime != LogicalTime::kUninitialized);
+
+ stdx::unique_lock<stdx::mutex> lock(_mutex);
+
+ auto currentTime = _getCurrentCommittedLogicalTime_inlock();
+ if (clusterTime > currentTime) {
+ LOG(1) << "waitUntilClusterTime: waiting for clusterTime:" << clusterTime.toString()
+ << " to be in a snapshot -- current snapshot: " << currentTime.toString();
+ }
+
+ while (clusterTime > _getCurrentCommittedLogicalTime_inlock()) {
+ if (_inShutdown) {
+ return {ErrorCodes::ShutdownInProgress, "Shutdown in progress"};
+ }
+
+ LOG(3) << "waitUntilClusterTime: waiting for a new snapshot until " << opCtx->getDeadline();
+
+ auto waitStatus =
+ opCtx->waitForConditionOrInterruptNoAssert(_currentCommittedSnapshotCond, lock);
+ if (!waitStatus.isOK()) {
+ return waitStatus;
+ }
+ LOG(3) << "Got notified of new snapshot: " << _currentCommittedSnapshot->toString();
+ }
+
+ return Status::OK();
+}
+
+Status ReplicationCoordinatorImpl::_waitUntilOpTimeForReadDeprecated(
+ OperationContext* opCtx, const ReadConcernArgs& readConcern) {
+ const bool isMajorityReadConcern =
+ readConcern.getLevel() == ReadConcernLevel::kMajorityReadConcern;
+
+ const auto targetOpTime = readConcern.getArgsOpTime().value_or(OpTime());
+
stdx::unique_lock<stdx::mutex> lock(_mutex);
if (isMajorityReadConcern && !_externalState->snapshotsEnabled()) {
diff --git a/src/mongo/db/repl/replication_coordinator_impl.h b/src/mongo/db/repl/replication_coordinator_impl.h
index dcf29656c3f..5003435147e 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.h
+++ b/src/mongo/db/repl/replication_coordinator_impl.h
@@ -169,7 +169,7 @@ public:
virtual OpTime getMyLastDurableOpTime() const override;
virtual Status waitUntilOpTimeForRead(OperationContext* opCtx,
- const ReadConcernArgs& settings) override;
+ const ReadConcernArgs& readConcern) override;
virtual OID getElectionId() override;
@@ -582,6 +582,16 @@ private:
OpTime _getCurrentCommittedSnapshotOpTime_inlock() const;
/**
+ * Returns the OpTime of the current committed snapshot converted to LogicalTime.
+ */
+ LogicalTime _getCurrentCommittedLogicalTime_inlock() const;
+
+ /**
+ * Verifies that ReadConcernArgs match node's readConcern.
+ */
+ Status _validateReadConcern(OperationContext* opCtx, const ReadConcernArgs& readConcern);
+
+ /**
* Helper method that removes entries from _slaveInfo if they correspond to a node
* with a member ID that is not in the current replica set config. Will always leave an
* entry for ourself at the beginning of _slaveInfo, even if we aren't present in the
@@ -1171,6 +1181,20 @@ private:
*/
ReplicationExecutor::EventHandle _cancelElectionIfNeeded_inTopoLock();
+ /**
+ * Waits until the optime of the current node is at least the opTime specified in 'readConcern'.
+ * It supports local readConcern, which _waitUntilClusterTimeForRead does not.
+ * TODO: remove when SERVER-28150 is done.
+ */
+ Status _waitUntilOpTimeForReadDeprecated(OperationContext* opCtx,
+ const ReadConcernArgs& readConcern);
+
+ /**
+ * Waits until the logicalTime of the current node is at least the 'clusterTime'.
+ * TODO: Merge with waitUntilOpTimeForRead() when SERVER-28150 is done.
+ */
+ Status _waitUntilClusterTimeForRead(OperationContext* opCtx, LogicalTime clusterTime);
+
//
// All member variables are labeled with one of the following codes indicating the
// synchronization rules for accessing them.