diff options
author | Misha Tyulenev <misha@mongodb.com> | 2017-06-21 22:18:58 -0400 |
---|---|---|
committer | Misha Tyulenev <misha@mongodb.com> | 2017-06-21 22:20:08 -0400 |
commit | 48dc747414e18d8caa56f0fba23c80871f67c31a (patch) | |
tree | dffe91c2fc1c792ce45b199a3deab9dedc702c07 /src/mongo | |
parent | 358cf1d12d8c3787fe14fe4458a242186d12b3c0 (diff) | |
download | mongo-48dc747414e18d8caa56f0fba23c80871f67c31a.tar.gz |
SERVER-28150 Add support for readConcern::afterClusterTime for level == local
Diffstat (limited to 'src/mongo')
-rw-r--r-- | src/mongo/db/repl/read_concern_args.cpp | 15 | ||||
-rw-r--r-- | src/mongo/db/repl/read_concern_args.h | 5 | ||||
-rw-r--r-- | src/mongo/db/repl/read_concern_args_test.cpp | 69 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_impl.cpp | 90 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_impl.h | 16 | ||||
-rw-r--r-- | src/mongo/s/catalog/sharding_catalog_manager_collection_operations_impl.cpp | 2 | ||||
-rw-r--r-- | src/mongo/shell/mongo.js | 15 |
7 files changed, 130 insertions, 82 deletions
diff --git a/src/mongo/db/repl/read_concern_args.cpp b/src/mongo/db/repl/read_concern_args.cpp index fc85048d4b4..a10d66d1ed4 100644 --- a/src/mongo/db/repl/read_concern_args.cpp +++ b/src/mongo/db/repl/read_concern_args.cpp @@ -58,14 +58,16 @@ const string ReadConcernArgs::kLevelFieldName("level"); ReadConcernArgs::ReadConcernArgs() = default; +ReadConcernArgs::ReadConcernArgs(boost::optional<ReadConcernLevel> level) + : _level(std::move(level)) {} + ReadConcernArgs::ReadConcernArgs(boost::optional<OpTime> opTime, boost::optional<ReadConcernLevel> level) : _opTime(std::move(opTime)), _level(std::move(level)) {} -ReadConcernArgs::ReadConcernArgs(boost::optional<OpTime> opTime, - boost::optional<LogicalTime> clusterTime, +ReadConcernArgs::ReadConcernArgs(boost::optional<LogicalTime> clusterTime, boost::optional<ReadConcernLevel> level) - : _opTime(std::move(opTime)), _clusterTime(std::move(clusterTime)), _level(std::move(level)) {} + : _clusterTime(std::move(clusterTime)), _level(std::move(level)) {} std::string ReadConcernArgs::toString() const { return toBSON().toString(); @@ -164,12 +166,15 @@ Status ReadConcernArgs::initialize(const BSONElement& readConcernElem) { << kAfterOpTimeFieldName); } - if (_clusterTime && (getLevel() != ReadConcernLevel::kMajorityReadConcern)) { + if (_clusterTime && getLevel() != ReadConcernLevel::kMajorityReadConcern && + getLevel() != ReadConcernLevel::kLocalReadConcern) { return Status(ErrorCodes::InvalidOptions, str::stream() << kAfterClusterTimeFieldName << " field can be set only if " << kLevelFieldName << " is equal to " - << kMajorityReadConcernStr); + << kMajorityReadConcernStr + << " or " + << kLocalReadConcernStr); } if (_clusterTime && _clusterTime == LogicalTime::kUninitialized) { diff --git a/src/mongo/db/repl/read_concern_args.h b/src/mongo/db/repl/read_concern_args.h index 6778a548228..c6794b278fb 100644 --- a/src/mongo/db/repl/read_concern_args.h +++ b/src/mongo/db/repl/read_concern_args.h @@ -54,10 +54,11 @@ public: 
ReadConcernArgs(); + ReadConcernArgs(boost::optional<ReadConcernLevel> level); + ReadConcernArgs(boost::optional<OpTime> opTime, boost::optional<ReadConcernLevel> level); - ReadConcernArgs(boost::optional<OpTime> opTime, - boost::optional<LogicalTime> clusterTime, + ReadConcernArgs(boost::optional<LogicalTime> clusterTime, boost::optional<ReadConcernLevel> level); /** * Format: diff --git a/src/mongo/db/repl/read_concern_args_test.cpp b/src/mongo/db/repl/read_concern_args_test.cpp index a41bc87d8ac..830c1533622 100644 --- a/src/mongo/db/repl/read_concern_args_test.cpp +++ b/src/mongo/db/repl/read_concern_args_test.cpp @@ -56,16 +56,37 @@ TEST(ReadAfterParse, OpTimeOnly) { TEST(ReadAfterParse, ClusterTimeOnly) { ReadConcernArgs readAfterOpTime; + auto clusterTime = LogicalTime(Timestamp(20, 30)); + ASSERT_OK(readAfterOpTime.initialize(BSON("find" + << "test" + << ReadConcernArgs::kReadConcernFieldName + << BSON(ReadConcernArgs::kAfterClusterTimeFieldName + << clusterTime.asTimestamp())))); + auto argsClusterTime = readAfterOpTime.getArgsClusterTime(); + ASSERT_TRUE(argsClusterTime); + ASSERT_TRUE(!readAfterOpTime.getArgsOpTime()); + ASSERT_TRUE(clusterTime == *argsClusterTime); +} + +TEST(ReadAfterParse, ClusterTimeAndLevelLocal) { + ReadConcernArgs readAfterOpTime; // afterClusterTime is now allowed with level=local - ASSERT_NOT_OK( - readAfterOpTime.initialize(BSON("find" - << "test" - << ReadConcernArgs::kReadConcernFieldName - << BSON(ReadConcernArgs::kAfterClusterTimeFieldName - << Timestamp(20, 30))))); + auto clusterTime = LogicalTime(Timestamp(20, 30)); + ASSERT_OK(readAfterOpTime.initialize(BSON("find" + << "test" + << ReadConcernArgs::kReadConcernFieldName + << BSON(ReadConcernArgs::kAfterClusterTimeFieldName + << clusterTime.asTimestamp() + << ReadConcernArgs::kLevelFieldName + << "local")))); + auto argsClusterTime = readAfterOpTime.getArgsClusterTime(); + ASSERT_TRUE(argsClusterTime); + ASSERT_TRUE(!readAfterOpTime.getArgsOpTime()); + ASSERT_TRUE(clusterTime == 
*argsClusterTime); + ASSERT(ReadConcernLevel::kLocalReadConcern == readAfterOpTime.getLevel()); } -TEST(ReadAfterParse, ClusterTimeAndLevel) { +TEST(ReadAfterParse, ClusterTimeAndLevelMajority) { ReadConcernArgs readAfterOpTime; // Must have level=majority auto clusterTime = LogicalTime(Timestamp(20, 30)); @@ -226,7 +247,20 @@ TEST(ReadAfterSerialize, Empty) { ASSERT_BSONOBJ_EQ(BSON(ReadConcernArgs::kReadConcernFieldName << BSONObj()), obj); } -TEST(ReadAfterSerialize, ReadAfterOnly) { +TEST(ReadAfterSerialize, AfterClusterTimeOnly) { + BSONObjBuilder builder; + auto clusterTime = LogicalTime(Timestamp(20, 30)); + ReadConcernArgs readAfterClusterTime(clusterTime, boost::none); + readAfterClusterTime.appendInfo(&builder); + + BSONObj expectedObj( + BSON(ReadConcernArgs::kReadConcernFieldName + << BSON(ReadConcernArgs::kAfterClusterTimeFieldName << clusterTime.asTimestamp()))); + + ASSERT_BSONOBJ_EQ(expectedObj, builder.done()); +} + +TEST(ReadAfterSerialize, AfterOpTimeOnly) { BSONObjBuilder builder; ReadConcernArgs readAfterOpTime(OpTime(Timestamp(20, 30), 2), boost::none); readAfterOpTime.appendInfo(&builder); @@ -241,7 +275,7 @@ TEST(ReadAfterSerialize, ReadAfterOnly) { TEST(ReadAfterSerialize, CommitLevelOnly) { BSONObjBuilder builder; - ReadConcernArgs readAfterOpTime(boost::none, ReadConcernLevel::kLocalReadConcern); + ReadConcernArgs readAfterOpTime(ReadConcernLevel::kLocalReadConcern); readAfterOpTime.appendInfo(&builder); BSONObj expectedObj(BSON(ReadConcernArgs::kReadConcernFieldName @@ -250,7 +284,22 @@ TEST(ReadAfterSerialize, CommitLevelOnly) { ASSERT_BSONOBJ_EQ(expectedObj, builder.done()); } -TEST(ReadAfterSerialize, FullSpecification) { +TEST(ReadAfterSerialize, AfterClusterTimeAndLevel) { + BSONObjBuilder builder; + auto clusterTime = LogicalTime(Timestamp(20, 30)); + ReadConcernArgs readAfterClusterTime(clusterTime, ReadConcernLevel::kMajorityReadConcern); + readAfterClusterTime.appendInfo(&builder); + + BSONObj expectedObj( + 
BSON(ReadConcernArgs::kReadConcernFieldName + << BSON(ReadConcernArgs::kLevelFieldName << "majority" + << ReadConcernArgs::kAfterClusterTimeFieldName + << clusterTime.asTimestamp()))); + + ASSERT_BSONOBJ_EQ(expectedObj, builder.done()); +} + +TEST(ReadAfterSerialize, AfterOpTimeAndLevel) { BSONObjBuilder builder; ReadConcernArgs readAfterOpTime(OpTime(Timestamp(20, 30), 2), ReadConcernLevel::kMajorityReadConcern); diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp index 2d3ba978e54..d52920c0f45 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl.cpp @@ -1052,9 +1052,12 @@ Status ReplicationCoordinatorImpl::_validateReadConcern(OperationContext* opCtx, const bool isMajorityReadConcern = readConcern.getLevel() == ReadConcernLevel::kMajorityReadConcern; - if (readConcern.getArgsClusterTime() && !isMajorityReadConcern) { + if (readConcern.getArgsClusterTime() && + readConcern.getLevel() != ReadConcernLevel::kMajorityReadConcern && + readConcern.getLevel() != ReadConcernLevel::kLocalReadConcern) { return {ErrorCodes::BadValue, - "only readConcern level majority is allowed when specifying afterClusterTime"}; + "Only readConcern level 'majority' or 'local' is allowed when specifying " + "afterClusterTime"}; } if (isMajorityReadConcern && !getSettings().isMajorityReadConcernEnabled()) { @@ -1088,57 +1091,15 @@ Status ReplicationCoordinatorImpl::waitUntilOpTimeForRead(OperationContext* opCt } if (readConcern.getArgsClusterTime()) { - auto targetTime = *readConcern.getArgsClusterTime(); - - if (readConcern.getArgsOpTime()) { - auto opTimeStamp = LogicalTime(readConcern.getArgsOpTime()->getTimestamp()); - if (opTimeStamp > targetTime) { - targetTime = opTimeStamp; - } - } - return _waitUntilClusterTimeForRead(opCtx, targetTime); + return _waitUntilClusterTimeForRead(opCtx, readConcern); } else { return _waitUntilOpTimeForReadDeprecated(opCtx, 
readConcern); } } -Status ReplicationCoordinatorImpl::_waitUntilClusterTimeForRead(OperationContext* opCtx, - LogicalTime clusterTime) { - invariant(clusterTime != LogicalTime::kUninitialized); - - stdx::unique_lock<stdx::mutex> lock(_mutex); - - auto currentTime = _getCurrentCommittedLogicalTime_inlock(); - if (clusterTime > currentTime) { - LOG(1) << "waitUntilClusterTime: waiting for clusterTime:" << clusterTime.toString() - << " to be in a snapshot -- current snapshot: " << currentTime.toString(); - } - - while (clusterTime > _getCurrentCommittedLogicalTime_inlock()) { - if (_inShutdown) { - return {ErrorCodes::ShutdownInProgress, "Shutdown in progress"}; - } - - LOG(3) << "waitUntilClusterTime: waiting for a new snapshot until " << opCtx->getDeadline(); - - auto waitStatus = - opCtx->waitForConditionOrInterruptNoAssert(_currentCommittedSnapshotCond, lock); - if (!waitStatus.isOK()) { - return waitStatus; - } - LOG(3) << "Got notified of new snapshot: " << _currentCommittedSnapshot->toString(); - } - - return Status::OK(); -} - -Status ReplicationCoordinatorImpl::_waitUntilOpTimeForReadDeprecated( - OperationContext* opCtx, const ReadConcernArgs& readConcern) { - const bool isMajorityReadConcern = - readConcern.getLevel() == ReadConcernLevel::kMajorityReadConcern; - - const auto targetOpTime = readConcern.getArgsOpTime().value_or(OpTime()); - +Status ReplicationCoordinatorImpl::_waitUntilOpTime(OperationContext* opCtx, + bool isMajorityReadConcern, + OpTime targetOpTime) { stdx::unique_lock<stdx::mutex> lock(_mutex); if (isMajorityReadConcern && !_externalState->snapshotsEnabled()) { @@ -1147,9 +1108,8 @@ Status ReplicationCoordinatorImpl::_waitUntilOpTimeForReadDeprecated( } auto getCurrentOpTime = [this, isMajorityReadConcern]() { - auto committedOptime = - _currentCommittedSnapshot ? _currentCommittedSnapshot->opTime : OpTime(); - return isMajorityReadConcern ? committedOptime : _getMyLastAppliedOpTime_inlock(); + return isMajorityReadConcern ? 
_getCurrentCommittedSnapshotOpTime_inlock() + : _getMyLastAppliedOpTime_inlock(); }; if (isMajorityReadConcern && targetOpTime > getCurrentOpTime()) { @@ -1181,7 +1141,7 @@ Status ReplicationCoordinatorImpl::_waitUntilOpTimeForReadDeprecated( ThreadWaiter waiter(targetOpTime, nullptr, &condVar); WaiterGuard guard(&_opTimeWaiterList, &waiter); - LOG(3) << "waituntilOpTime: OpID " << opCtx->getOpID() << " is waiting for OpTime " + LOG(3) << "waitUntilOpTime: OpID " << opCtx->getOpID() << " is waiting for OpTime " << waiter << " until " << opCtx->getDeadline(); auto waitStatus = opCtx->waitForConditionOrInterruptNoAssert(condVar, lock); @@ -1193,6 +1153,32 @@ Status ReplicationCoordinatorImpl::_waitUntilOpTimeForReadDeprecated( return Status::OK(); } +Status ReplicationCoordinatorImpl::_waitUntilClusterTimeForRead( + OperationContext* opCtx, const ReadConcernArgs& readConcern) { + auto clusterTime = *readConcern.getArgsClusterTime(); + invariant(clusterTime != LogicalTime::kUninitialized); + + // convert clusterTime to opTime so it can be used by the _opTimeWaiterList for wait on + // readConcern level local. 
+ auto targetOpTime = OpTime(clusterTime.asTimestamp(), OpTime::kUninitializedTerm); + invariant(!readConcern.getArgsOpTime()); + + const bool isMajorityReadConcern = + readConcern.getLevel() == ReadConcernLevel::kMajorityReadConcern; + + return _waitUntilOpTime(opCtx, isMajorityReadConcern, targetOpTime); +} + +// TODO: remove when SERVER-29729 is done +Status ReplicationCoordinatorImpl::_waitUntilOpTimeForReadDeprecated( + OperationContext* opCtx, const ReadConcernArgs& readConcern) { + const bool isMajorityReadConcern = + readConcern.getLevel() == ReadConcernLevel::kMajorityReadConcern; + + const auto targetOpTime = readConcern.getArgsOpTime().value_or(OpTime()); + return _waitUntilOpTime(opCtx, isMajorityReadConcern, targetOpTime); +} + OpTime ReplicationCoordinatorImpl::_getMyLastAppliedOpTime_inlock() const { return _topCoord->getMyLastAppliedOpTime(); } diff --git a/src/mongo/db/repl/replication_coordinator_impl.h b/src/mongo/db/repl/replication_coordinator_impl.h index 4bdb1cd4daf..e0ed3a93ff3 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.h +++ b/src/mongo/db/repl/replication_coordinator_impl.h @@ -1094,18 +1094,24 @@ private: executor::TaskExecutor::EventHandle _cancelElectionIfNeeded_inlock(); /** + * Waits until the optime of the current node is at least the 'opTime'. + */ + Status _waitUntilOpTime(OperationContext* opCtx, bool isMajorityReadConcern, OpTime opTime); + + /** * Waits until the optime of the current node is at least the opTime specified in 'readConcern'. - * It supports local readConcern, which _waitUntilClusterTimeForRead does not. - * TODO: remove when SERVER-28150 is done. + * Supports local and majority readConcern. */ + // TODO: remove when SERVER-29729 is done Status _waitUntilOpTimeForReadDeprecated(OperationContext* opCtx, const ReadConcernArgs& readConcern); /** - * Waits until the logicalTime of the current node is at least the 'clusterTime'. - * TODO: Merge with waitUntilOpTimeForRead() when SERVER-28150 is done. 
+ * Waits until the optime of the current node is at least the clusterTime specified in + * 'readConcern'. Supports local and majority readConcern. */ - Status _waitUntilClusterTimeForRead(OperationContext* opCtx, LogicalTime clusterTime); + Status _waitUntilClusterTimeForRead(OperationContext* opCtx, + const ReadConcernArgs& readConcern); /** * Returns a pseudorandom number no less than 0 and less than limit (which must be positive). diff --git a/src/mongo/s/catalog/sharding_catalog_manager_collection_operations_impl.cpp b/src/mongo/s/catalog/sharding_catalog_manager_collection_operations_impl.cpp index 89804b6d920..d82db079052 100644 --- a/src/mongo/s/catalog/sharding_catalog_manager_collection_operations_impl.cpp +++ b/src/mongo/s/catalog/sharding_catalog_manager_collection_operations_impl.cpp @@ -188,7 +188,7 @@ void checkForExistingChunks(OperationContext* opCtx, const string& ns) { // Use readConcern local to guarantee we see any chunks that have been written and may // become committed; readConcern majority will not see the chunks if they have not made it // to the majority snapshot. - repl::ReadConcernArgs readConcern(boost::none, repl::ReadConcernLevel::kLocalReadConcern); + repl::ReadConcernArgs readConcern(repl::ReadConcernLevel::kLocalReadConcern); readConcern.appendInfo(&countBuilder); auto cmdResponse = uassertStatusOK( diff --git a/src/mongo/shell/mongo.js b/src/mongo/shell/mongo.js index 471a0e3285c..796bfdcbf92 100644 --- a/src/mongo/shell/mongo.js +++ b/src/mongo/shell/mongo.js @@ -57,7 +57,8 @@ Mongo.prototype.isCausalConsistencyEnabled = function(cmdName, cmdObj) { return false; } - // Currently, read concern afterClusterTime is only supported for read concern level majority. + // Currently, read concern afterClusterTime is only supported for commands that support read + // concern level majority. 
var commandsThatSupportMajorityReadConcern = [ "count", "distinct", @@ -137,13 +138,13 @@ Mongo.prototype._injectAfterClusterTime = function(cmdObj) { const readConcern = Object.assign({}, cmdObj.readConcern); // The server accepts afterClusterTime with readConcern level "local" or "majority"; // when the command specifies no level, default to "local". - if (!readConcern.hasOwnProperty("level") || readConcern.level === "majority") { - if (!readConcern.hasOwnProperty("afterClusterTime")) { - readConcern.afterClusterTime = operationTime; - } - readConcern.level = "majority"; - cmdObj.readConcern = readConcern; + if (!readConcern.hasOwnProperty("afterClusterTime")) { + readConcern.afterClusterTime = operationTime; + } + if (!readConcern.hasOwnProperty("level")) { + readConcern.level = "local"; + } + cmdObj.readConcern = readConcern; } return cmdObj; }; |