summaryrefslogtreecommitdiff
path: root/src/mongo
diff options
context:
space:
mode:
authorMisha Tyulenev <misha@mongodb.com>2017-06-21 22:18:58 -0400
committerMisha Tyulenev <misha@mongodb.com>2017-06-21 22:20:08 -0400
commit48dc747414e18d8caa56f0fba23c80871f67c31a (patch)
treedffe91c2fc1c792ce45b199a3deab9dedc702c07 /src/mongo
parent358cf1d12d8c3787fe14fe4458a242186d12b3c0 (diff)
downloadmongo-48dc747414e18d8caa56f0fba23c80871f67c31a.tar.gz
SERVER-28150 Add support for readConcern::afterClusterTime for level == local
Diffstat (limited to 'src/mongo')
-rw-r--r--src/mongo/db/repl/read_concern_args.cpp15
-rw-r--r--src/mongo/db/repl/read_concern_args.h5
-rw-r--r--src/mongo/db/repl/read_concern_args_test.cpp69
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl.cpp90
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl.h16
-rw-r--r--src/mongo/s/catalog/sharding_catalog_manager_collection_operations_impl.cpp2
-rw-r--r--src/mongo/shell/mongo.js15
7 files changed, 130 insertions, 82 deletions
diff --git a/src/mongo/db/repl/read_concern_args.cpp b/src/mongo/db/repl/read_concern_args.cpp
index fc85048d4b4..a10d66d1ed4 100644
--- a/src/mongo/db/repl/read_concern_args.cpp
+++ b/src/mongo/db/repl/read_concern_args.cpp
@@ -58,14 +58,16 @@ const string ReadConcernArgs::kLevelFieldName("level");
ReadConcernArgs::ReadConcernArgs() = default;
+ReadConcernArgs::ReadConcernArgs(boost::optional<ReadConcernLevel> level)
+ : _level(std::move(level)) {}
+
ReadConcernArgs::ReadConcernArgs(boost::optional<OpTime> opTime,
boost::optional<ReadConcernLevel> level)
: _opTime(std::move(opTime)), _level(std::move(level)) {}
-ReadConcernArgs::ReadConcernArgs(boost::optional<OpTime> opTime,
- boost::optional<LogicalTime> clusterTime,
+ReadConcernArgs::ReadConcernArgs(boost::optional<LogicalTime> clusterTime,
boost::optional<ReadConcernLevel> level)
- : _opTime(std::move(opTime)), _clusterTime(std::move(clusterTime)), _level(std::move(level)) {}
+ : _clusterTime(std::move(clusterTime)), _level(std::move(level)) {}
std::string ReadConcernArgs::toString() const {
return toBSON().toString();
@@ -164,12 +166,15 @@ Status ReadConcernArgs::initialize(const BSONElement& readConcernElem) {
<< kAfterOpTimeFieldName);
}
- if (_clusterTime && (getLevel() != ReadConcernLevel::kMajorityReadConcern)) {
+ if (_clusterTime && getLevel() != ReadConcernLevel::kMajorityReadConcern &&
+ getLevel() != ReadConcernLevel::kLocalReadConcern) {
return Status(ErrorCodes::InvalidOptions,
str::stream() << kAfterClusterTimeFieldName << " field can be set only if "
<< kLevelFieldName
<< " is equal to "
- << kMajorityReadConcernStr);
+ << kMajorityReadConcernStr
+ << " or "
+ << kLocalReadConcernStr);
}
if (_clusterTime && _clusterTime == LogicalTime::kUninitialized) {
diff --git a/src/mongo/db/repl/read_concern_args.h b/src/mongo/db/repl/read_concern_args.h
index 6778a548228..c6794b278fb 100644
--- a/src/mongo/db/repl/read_concern_args.h
+++ b/src/mongo/db/repl/read_concern_args.h
@@ -54,10 +54,11 @@ public:
ReadConcernArgs();
+ ReadConcernArgs(boost::optional<ReadConcernLevel> level);
+
ReadConcernArgs(boost::optional<OpTime> opTime, boost::optional<ReadConcernLevel> level);
- ReadConcernArgs(boost::optional<OpTime> opTime,
- boost::optional<LogicalTime> clusterTime,
+ ReadConcernArgs(boost::optional<LogicalTime> clusterTime,
boost::optional<ReadConcernLevel> level);
/**
* Format:
diff --git a/src/mongo/db/repl/read_concern_args_test.cpp b/src/mongo/db/repl/read_concern_args_test.cpp
index a41bc87d8ac..830c1533622 100644
--- a/src/mongo/db/repl/read_concern_args_test.cpp
+++ b/src/mongo/db/repl/read_concern_args_test.cpp
@@ -56,16 +56,37 @@ TEST(ReadAfterParse, OpTimeOnly) {
TEST(ReadAfterParse, ClusterTimeOnly) {
ReadConcernArgs readAfterOpTime;
+ auto clusterTime = LogicalTime(Timestamp(20, 30));
+ ASSERT_OK(readAfterOpTime.initialize(BSON("find"
+ << "test"
+ << ReadConcernArgs::kReadConcernFieldName
+ << BSON(ReadConcernArgs::kAfterClusterTimeFieldName
+ << clusterTime.asTimestamp()))));
+ auto argsClusterTime = readAfterOpTime.getArgsClusterTime();
+ ASSERT_TRUE(argsClusterTime);
+ ASSERT_TRUE(!readAfterOpTime.getArgsOpTime());
+ ASSERT_TRUE(clusterTime == *argsClusterTime);
+}
+
+TEST(ReadAfterParse, ClusterTimeAndLevelLocal) {
+ ReadConcernArgs readAfterOpTime;
    // afterClusterTime is now also accepted with level=local (SERVER-28150).
- ASSERT_NOT_OK(
- readAfterOpTime.initialize(BSON("find"
- << "test"
- << ReadConcernArgs::kReadConcernFieldName
- << BSON(ReadConcernArgs::kAfterClusterTimeFieldName
- << Timestamp(20, 30)))));
+ auto clusterTime = LogicalTime(Timestamp(20, 30));
+ ASSERT_OK(readAfterOpTime.initialize(BSON("find"
+ << "test"
+ << ReadConcernArgs::kReadConcernFieldName
+ << BSON(ReadConcernArgs::kAfterClusterTimeFieldName
+ << clusterTime.asTimestamp()
+ << ReadConcernArgs::kLevelFieldName
+ << "local"))));
+ auto argsClusterTime = readAfterOpTime.getArgsClusterTime();
+ ASSERT_TRUE(argsClusterTime);
+ ASSERT_TRUE(!readAfterOpTime.getArgsOpTime());
+ ASSERT_TRUE(clusterTime == *argsClusterTime);
+ ASSERT(ReadConcernLevel::kLocalReadConcern == readAfterOpTime.getLevel());
}
-TEST(ReadAfterParse, ClusterTimeAndLevel) {
+TEST(ReadAfterParse, ClusterTimeAndLevelMajority) {
ReadConcernArgs readAfterOpTime;
// Must have level=majority
auto clusterTime = LogicalTime(Timestamp(20, 30));
@@ -226,7 +247,20 @@ TEST(ReadAfterSerialize, Empty) {
ASSERT_BSONOBJ_EQ(BSON(ReadConcernArgs::kReadConcernFieldName << BSONObj()), obj);
}
-TEST(ReadAfterSerialize, ReadAfterOnly) {
+TEST(ReadAfterSerialize, AfterClusterTimeOnly) {
+ BSONObjBuilder builder;
+ auto clusterTime = LogicalTime(Timestamp(20, 30));
+ ReadConcernArgs readAfterClusterTime(clusterTime, boost::none);
+ readAfterClusterTime.appendInfo(&builder);
+
+ BSONObj expectedObj(
+ BSON(ReadConcernArgs::kReadConcernFieldName
+ << BSON(ReadConcernArgs::kAfterClusterTimeFieldName << clusterTime.asTimestamp())));
+
+ ASSERT_BSONOBJ_EQ(expectedObj, builder.done());
+}
+
+TEST(ReadAfterSerialize, AfterOpTimeOnly) {
BSONObjBuilder builder;
ReadConcernArgs readAfterOpTime(OpTime(Timestamp(20, 30), 2), boost::none);
readAfterOpTime.appendInfo(&builder);
@@ -241,7 +275,7 @@ TEST(ReadAfterSerialize, ReadAfterOnly) {
TEST(ReadAfterSerialize, CommitLevelOnly) {
BSONObjBuilder builder;
- ReadConcernArgs readAfterOpTime(boost::none, ReadConcernLevel::kLocalReadConcern);
+ ReadConcernArgs readAfterOpTime(ReadConcernLevel::kLocalReadConcern);
readAfterOpTime.appendInfo(&builder);
BSONObj expectedObj(BSON(ReadConcernArgs::kReadConcernFieldName
@@ -250,7 +284,22 @@ TEST(ReadAfterSerialize, CommitLevelOnly) {
ASSERT_BSONOBJ_EQ(expectedObj, builder.done());
}
-TEST(ReadAfterSerialize, FullSpecification) {
+TEST(ReadAfterSerialize, AfterClusterTimeAndLevel) {
+ BSONObjBuilder builder;
+ auto clusterTime = LogicalTime(Timestamp(20, 30));
+ ReadConcernArgs readAfterClusterTime(clusterTime, ReadConcernLevel::kMajorityReadConcern);
+ readAfterClusterTime.appendInfo(&builder);
+
+ BSONObj expectedObj(
+ BSON(ReadConcernArgs::kReadConcernFieldName
+ << BSON(ReadConcernArgs::kLevelFieldName << "majority"
+ << ReadConcernArgs::kAfterClusterTimeFieldName
+ << clusterTime.asTimestamp())));
+
+ ASSERT_BSONOBJ_EQ(expectedObj, builder.done());
+}
+
+TEST(ReadAfterSerialize, AfterOpTimeAndLevel) {
BSONObjBuilder builder;
ReadConcernArgs readAfterOpTime(OpTime(Timestamp(20, 30), 2),
ReadConcernLevel::kMajorityReadConcern);
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp
index 2d3ba978e54..d52920c0f45 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl.cpp
@@ -1052,9 +1052,12 @@ Status ReplicationCoordinatorImpl::_validateReadConcern(OperationContext* opCtx,
const bool isMajorityReadConcern =
readConcern.getLevel() == ReadConcernLevel::kMajorityReadConcern;
- if (readConcern.getArgsClusterTime() && !isMajorityReadConcern) {
+ if (readConcern.getArgsClusterTime() &&
+ readConcern.getLevel() != ReadConcernLevel::kMajorityReadConcern &&
+ readConcern.getLevel() != ReadConcernLevel::kLocalReadConcern) {
return {ErrorCodes::BadValue,
- "only readConcern level majority is allowed when specifying afterClusterTime"};
+ "Only readConcern level 'majority' or 'local' is allowed when specifying "
+ "afterClusterTime"};
}
if (isMajorityReadConcern && !getSettings().isMajorityReadConcernEnabled()) {
@@ -1088,57 +1091,15 @@ Status ReplicationCoordinatorImpl::waitUntilOpTimeForRead(OperationContext* opCt
}
if (readConcern.getArgsClusterTime()) {
- auto targetTime = *readConcern.getArgsClusterTime();
-
- if (readConcern.getArgsOpTime()) {
- auto opTimeStamp = LogicalTime(readConcern.getArgsOpTime()->getTimestamp());
- if (opTimeStamp > targetTime) {
- targetTime = opTimeStamp;
- }
- }
- return _waitUntilClusterTimeForRead(opCtx, targetTime);
+ return _waitUntilClusterTimeForRead(opCtx, readConcern);
} else {
return _waitUntilOpTimeForReadDeprecated(opCtx, readConcern);
}
}
-Status ReplicationCoordinatorImpl::_waitUntilClusterTimeForRead(OperationContext* opCtx,
- LogicalTime clusterTime) {
- invariant(clusterTime != LogicalTime::kUninitialized);
-
- stdx::unique_lock<stdx::mutex> lock(_mutex);
-
- auto currentTime = _getCurrentCommittedLogicalTime_inlock();
- if (clusterTime > currentTime) {
- LOG(1) << "waitUntilClusterTime: waiting for clusterTime:" << clusterTime.toString()
- << " to be in a snapshot -- current snapshot: " << currentTime.toString();
- }
-
- while (clusterTime > _getCurrentCommittedLogicalTime_inlock()) {
- if (_inShutdown) {
- return {ErrorCodes::ShutdownInProgress, "Shutdown in progress"};
- }
-
- LOG(3) << "waitUntilClusterTime: waiting for a new snapshot until " << opCtx->getDeadline();
-
- auto waitStatus =
- opCtx->waitForConditionOrInterruptNoAssert(_currentCommittedSnapshotCond, lock);
- if (!waitStatus.isOK()) {
- return waitStatus;
- }
- LOG(3) << "Got notified of new snapshot: " << _currentCommittedSnapshot->toString();
- }
-
- return Status::OK();
-}
-
-Status ReplicationCoordinatorImpl::_waitUntilOpTimeForReadDeprecated(
- OperationContext* opCtx, const ReadConcernArgs& readConcern) {
- const bool isMajorityReadConcern =
- readConcern.getLevel() == ReadConcernLevel::kMajorityReadConcern;
-
- const auto targetOpTime = readConcern.getArgsOpTime().value_or(OpTime());
-
+Status ReplicationCoordinatorImpl::_waitUntilOpTime(OperationContext* opCtx,
+ bool isMajorityReadConcern,
+ OpTime targetOpTime) {
stdx::unique_lock<stdx::mutex> lock(_mutex);
if (isMajorityReadConcern && !_externalState->snapshotsEnabled()) {
@@ -1147,9 +1108,8 @@ Status ReplicationCoordinatorImpl::_waitUntilOpTimeForReadDeprecated(
}
auto getCurrentOpTime = [this, isMajorityReadConcern]() {
- auto committedOptime =
- _currentCommittedSnapshot ? _currentCommittedSnapshot->opTime : OpTime();
- return isMajorityReadConcern ? committedOptime : _getMyLastAppliedOpTime_inlock();
+ return isMajorityReadConcern ? _getCurrentCommittedSnapshotOpTime_inlock()
+ : _getMyLastAppliedOpTime_inlock();
};
if (isMajorityReadConcern && targetOpTime > getCurrentOpTime()) {
@@ -1181,7 +1141,7 @@ Status ReplicationCoordinatorImpl::_waitUntilOpTimeForReadDeprecated(
ThreadWaiter waiter(targetOpTime, nullptr, &condVar);
WaiterGuard guard(&_opTimeWaiterList, &waiter);
- LOG(3) << "waituntilOpTime: OpID " << opCtx->getOpID() << " is waiting for OpTime "
+ LOG(3) << "waitUntilOpTime: OpID " << opCtx->getOpID() << " is waiting for OpTime "
<< waiter << " until " << opCtx->getDeadline();
auto waitStatus = opCtx->waitForConditionOrInterruptNoAssert(condVar, lock);
@@ -1193,6 +1153,32 @@ Status ReplicationCoordinatorImpl::_waitUntilOpTimeForReadDeprecated(
return Status::OK();
}
+Status ReplicationCoordinatorImpl::_waitUntilClusterTimeForRead(
+ OperationContext* opCtx, const ReadConcernArgs& readConcern) {
+ auto clusterTime = *readConcern.getArgsClusterTime();
+ invariant(clusterTime != LogicalTime::kUninitialized);
+
+    // Convert the clusterTime to an opTime so that the _opTimeWaiterList can be used to
+    // wait on it for readConcern level 'local'.
+ auto targetOpTime = OpTime(clusterTime.asTimestamp(), OpTime::kUninitializedTerm);
+ invariant(!readConcern.getArgsOpTime());
+
+ const bool isMajorityReadConcern =
+ readConcern.getLevel() == ReadConcernLevel::kMajorityReadConcern;
+
+ return _waitUntilOpTime(opCtx, isMajorityReadConcern, targetOpTime);
+}
+
+// TODO: remove when SERVER-29729 is done
+Status ReplicationCoordinatorImpl::_waitUntilOpTimeForReadDeprecated(
+ OperationContext* opCtx, const ReadConcernArgs& readConcern) {
+ const bool isMajorityReadConcern =
+ readConcern.getLevel() == ReadConcernLevel::kMajorityReadConcern;
+
+ const auto targetOpTime = readConcern.getArgsOpTime().value_or(OpTime());
+ return _waitUntilOpTime(opCtx, isMajorityReadConcern, targetOpTime);
+}
+
OpTime ReplicationCoordinatorImpl::_getMyLastAppliedOpTime_inlock() const {
return _topCoord->getMyLastAppliedOpTime();
}
diff --git a/src/mongo/db/repl/replication_coordinator_impl.h b/src/mongo/db/repl/replication_coordinator_impl.h
index 4bdb1cd4daf..e0ed3a93ff3 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.h
+++ b/src/mongo/db/repl/replication_coordinator_impl.h
@@ -1094,18 +1094,24 @@ private:
executor::TaskExecutor::EventHandle _cancelElectionIfNeeded_inlock();
/**
+ * Waits until the optime of the current node is at least the 'opTime'.
+ */
+ Status _waitUntilOpTime(OperationContext* opCtx, bool isMajorityReadConcern, OpTime opTime);
+
+ /**
* Waits until the optime of the current node is at least the opTime specified in 'readConcern'.
- * It supports local readConcern, which _waitUntilClusterTimeForRead does not.
- * TODO: remove when SERVER-28150 is done.
+ * Supports local and majority readConcern.
*/
+ // TODO: remove when SERVER-29729 is done
Status _waitUntilOpTimeForReadDeprecated(OperationContext* opCtx,
const ReadConcernArgs& readConcern);
/**
- * Waits until the logicalTime of the current node is at least the 'clusterTime'.
- * TODO: Merge with waitUntilOpTimeForRead() when SERVER-28150 is done.
+ * Waits until the optime of the current node is at least the clusterTime specified in
+ * 'readConcern'. Supports local and majority readConcern.
*/
- Status _waitUntilClusterTimeForRead(OperationContext* opCtx, LogicalTime clusterTime);
+ Status _waitUntilClusterTimeForRead(OperationContext* opCtx,
+ const ReadConcernArgs& readConcern);
/**
* Returns a pseudorandom number no less than 0 and less than limit (which must be positive).
diff --git a/src/mongo/s/catalog/sharding_catalog_manager_collection_operations_impl.cpp b/src/mongo/s/catalog/sharding_catalog_manager_collection_operations_impl.cpp
index 89804b6d920..d82db079052 100644
--- a/src/mongo/s/catalog/sharding_catalog_manager_collection_operations_impl.cpp
+++ b/src/mongo/s/catalog/sharding_catalog_manager_collection_operations_impl.cpp
@@ -188,7 +188,7 @@ void checkForExistingChunks(OperationContext* opCtx, const string& ns) {
// Use readConcern local to guarantee we see any chunks that have been written and may
// become committed; readConcern majority will not see the chunks if they have not made it
// to the majority snapshot.
- repl::ReadConcernArgs readConcern(boost::none, repl::ReadConcernLevel::kLocalReadConcern);
+ repl::ReadConcernArgs readConcern(repl::ReadConcernLevel::kLocalReadConcern);
readConcern.appendInfo(&countBuilder);
auto cmdResponse = uassertStatusOK(
diff --git a/src/mongo/shell/mongo.js b/src/mongo/shell/mongo.js
index 471a0e3285c..796bfdcbf92 100644
--- a/src/mongo/shell/mongo.js
+++ b/src/mongo/shell/mongo.js
@@ -57,7 +57,8 @@ Mongo.prototype.isCausalConsistencyEnabled = function(cmdName, cmdObj) {
return false;
}
- // Currently, read concern afterClusterTime is only supported for read concern level majority.
+ // Currently, read concern afterClusterTime is only supported for commands that support read
+ // concern level majority.
var commandsThatSupportMajorityReadConcern = [
"count",
"distinct",
@@ -137,13 +138,13 @@ Mongo.prototype._injectAfterClusterTime = function(cmdObj) {
const readConcern = Object.assign({}, cmdObj.readConcern);
    // The server supports afterClusterTime with readConcern level 'local' or 'majority';
    // inject the operationTime and default the level to 'local' when none is specified.
- if (!readConcern.hasOwnProperty("level") || readConcern.level === "majority") {
- if (!readConcern.hasOwnProperty("afterClusterTime")) {
- readConcern.afterClusterTime = operationTime;
- }
- readConcern.level = "majority";
- cmdObj.readConcern = readConcern;
+ if (!readConcern.hasOwnProperty("afterClusterTime")) {
+ readConcern.afterClusterTime = operationTime;
+ }
+ if (!readConcern.hasOwnProperty("level")) {
+ readConcern.level = "local";
}
+ cmdObj.readConcern = readConcern;
}
return cmdObj;
};