diff options
author | matt dannenberg <matt.dannenberg@10gen.com> | 2015-07-09 08:58:29 -0400 |
---|---|---|
committer | matt dannenberg <matt.dannenberg@10gen.com> | 2015-07-10 10:35:39 -0400 |
commit | 917e4197593077116a0e4e8d1db5ce96c3f444a6 (patch) | |
tree | 6f3dead0312530c4d5f4643868dee578cc8665dd /src/mongo | |
parent | 27e0f7a4b5b407d43de33db941cd413ea46399cd (diff) | |
download | mongo-917e4197593077116a0e4e8d1db5ce96c3f444a6.tar.gz |
SERVER-19211: Support read-committed semantics in read-after-optime (the `committed` option of `$readConcern`)
Diffstat (limited to 'src/mongo')
-rw-r--r-- | src/mongo/db/repl/read_after_optime_args.cpp | 14 | ||||
-rw-r--r-- | src/mongo/db/repl/read_after_optime_args.h | 6 | ||||
-rw-r--r-- | src/mongo/db/repl/read_after_optime_args_test.cpp | 16 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_impl.cpp | 12 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_impl_test.cpp | 146 |
5 files changed, 190 insertions, 4 deletions
diff --git a/src/mongo/db/repl/read_after_optime_args.cpp b/src/mongo/db/repl/read_after_optime_args.cpp index e3dcd87274b..532a2b2b9c5 100644 --- a/src/mongo/db/repl/read_after_optime_args.cpp +++ b/src/mongo/db/repl/read_after_optime_args.cpp @@ -45,10 +45,16 @@ const string ReadAfterOpTimeArgs::kRootFieldName("$readConcern"); const string ReadAfterOpTimeArgs::kOpTimeFieldName("afterOpTime"); const string ReadAfterOpTimeArgs::kOpTimestampFieldName("ts"); const string ReadAfterOpTimeArgs::kOpTermFieldName("term"); +const string ReadAfterOpTimeArgs::kReadCommittedFieldName("committed"); ReadAfterOpTimeArgs::ReadAfterOpTimeArgs() : ReadAfterOpTimeArgs(OpTime()) {} -ReadAfterOpTimeArgs::ReadAfterOpTimeArgs(OpTime opTime) : _opTime(std::move(opTime)) {} +ReadAfterOpTimeArgs::ReadAfterOpTimeArgs(OpTime opTime, bool readCommitted) + : _opTime(std::move(opTime)), _isReadCommitted(readCommitted) {} + +bool ReadAfterOpTimeArgs::isReadCommitted() const { + return _isReadCommitted; +} const OpTime& ReadAfterOpTimeArgs::getOpTime() const { return _opTime; @@ -95,6 +101,12 @@ Status ReadAfterOpTimeArgs::initialize(const BSONObj& cmdObj) { _opTime = OpTime(timestamp, termNumber); + auto readCommittedStatus = bsonExtractBooleanFieldWithDefault( + cmdObj, ReadAfterOpTimeArgs::kReadCommittedFieldName, false, &_isReadCommitted); + if (!readCommittedStatus.isOK()) { + return readCommittedStatus; + } + return Status::OK(); } diff --git a/src/mongo/db/repl/read_after_optime_args.h b/src/mongo/db/repl/read_after_optime_args.h index 04536ce8702..1369c8145ba 100644 --- a/src/mongo/db/repl/read_after_optime_args.h +++ b/src/mongo/db/repl/read_after_optime_args.h @@ -46,9 +46,10 @@ public: static const std::string kOpTimeFieldName; static const std::string kOpTimestampFieldName; static const std::string kOpTermFieldName; + static const std::string kReadCommittedFieldName; ReadAfterOpTimeArgs(); - explicit ReadAfterOpTimeArgs(OpTime opTime); + explicit ReadAfterOpTimeArgs(OpTime opTime, 
bool readCommitted = false); /** * Format: @@ -56,17 +57,20 @@ public: * find: “coll”, * filter: <Query Object>, * $readConcern: { // optional + * committed: 1, // optional * afterOpTime: { ts: <timestamp>, term: <NumberLong> }, * } * } */ Status initialize(const BSONObj& cmdObj); + bool isReadCommitted() const; const OpTime& getOpTime() const; const Milliseconds& getTimeout() const; private: OpTime _opTime; + bool _isReadCommitted = false; }; } // namespace repl diff --git a/src/mongo/db/repl/read_after_optime_args_test.cpp b/src/mongo/db/repl/read_after_optime_args_test.cpp index e79a9ff5dbb..a63ae023dad 100644 --- a/src/mongo/db/repl/read_after_optime_args_test.cpp +++ b/src/mongo/db/repl/read_after_optime_args_test.cpp @@ -45,6 +45,22 @@ TEST(ReadAfterParse, BasicFullSpecification) { ASSERT_EQ(Timestamp(20, 30), readAfterOpTime.getOpTime().getTimestamp()); ASSERT_EQ(2, readAfterOpTime.getOpTime().getTerm()); + ASSERT_FALSE(readAfterOpTime.isReadCommitted()); +} + +TEST(ReadAfterParse, ReadCommittedFullSpecification) { + ReadAfterOpTimeArgs readAfterOpTime; + ASSERT_OK(readAfterOpTime.initialize(BSON("find" + << "test" << ReadAfterOpTimeArgs::kRootFieldName + << BSON(ReadAfterOpTimeArgs::kOpTimeFieldName << BSON( + ReadAfterOpTimeArgs::kOpTimestampFieldName + << Timestamp(20, 30) + << ReadAfterOpTimeArgs::kOpTermFieldName + << 2)) << "committed" << true))); + + ASSERT_EQ(Timestamp(20, 30), readAfterOpTime.getOpTime().getTimestamp()); + ASSERT_EQ(2, readAfterOpTime.getOpTime().getTerm()); + ASSERT(readAfterOpTime.isReadCommitted()); } TEST(ReadAfterParse, Empty) { diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp index 80bedd0cd26..510c4e49884 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl.cpp @@ -793,8 +793,13 @@ ReadAfterOpTimeResponse ReplicationCoordinatorImpl::waitUntilOpTime( Timer timer; stdx::unique_lock<stdx::mutex> 
lock(_mutex); + auto loopCondition = [this, settings, ts] { + return settings.isReadCommitted() + ? !_currentCommittedSnapshot || ts > *_currentCommittedSnapshot + : ts > _getMyLastOptime_inlock(); + }; - while (ts > _getMyLastOptime_inlock()) { + while (loopCondition()) { Status interruptedStatus = txn->checkForInterruptNoAssert(); if (!interruptedStatus.isOK()) { return ReadAfterOpTimeResponse(interruptedStatus, Milliseconds(timer.millis())); @@ -806,10 +811,13 @@ ReadAfterOpTimeResponse ReplicationCoordinatorImpl::waitUntilOpTime( } stdx::condition_variable condVar; + WriteConcernOptions writeConcern; + writeConcern.wMode = WriteConcernOptions::kMajority; + WaiterInfo waitInfo(&_opTimeWaiterList, txn->getOpID(), &ts, - nullptr, // Don't care about write concern. + settings.isReadCommitted() ? &writeConcern : nullptr, &condVar); if (CurOp::get(txn)->isMaxTimeSet()) { diff --git a/src/mongo/db/repl/replication_coordinator_impl_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_test.cpp index f39877f1bc1..3e89da2ce09 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_test.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_test.cpp @@ -2209,6 +2209,152 @@ TEST_F(ReplCoordTest, ReadAfterDeferredEqualOpTime) { ASSERT_OK(result.getStatus()); } +TEST_F(ReplCoordTest, CantUseReadAfterCommittedIfNotReplSet) { + init(ReplSettings()); + OperationContextNoop txn; + auto result = getReplCoord()->waitUntilOpTime( + &txn, ReadAfterOpTimeArgs(OpTime(Timestamp(50, 0), 0), true)); + + ASSERT_FALSE(result.didWait()); + ASSERT_EQUALS(ErrorCodes::NotAReplicaSet, result.getStatus()); +} + +TEST_F(ReplCoordTest, ReadAfterCommittedWhileShutdown) { + OperationContextNoop txn; + assertStartSuccess(BSON("_id" + << "mySet" + << "version" << 2 << "members" << BSON_ARRAY(BSON("host" + << "node1:12345" + << "_id" << 0))), + HostAndPort("node1", 12345)); + getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY); + + getReplCoord()->setMyLastOptime(OpTime(Timestamp(10, 
0), 0)); + + shutdown(); + + auto result = getReplCoord()->waitUntilOpTime( + &txn, ReadAfterOpTimeArgs(OpTime(Timestamp(50, 0), 0), true)); + + ASSERT_TRUE(result.didWait()); + ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, result.getStatus()); +} + +TEST_F(ReplCoordTest, ReadAfterCommittedInterrupted) { + OperationContextReplMock txn; + assertStartSuccess(BSON("_id" + << "mySet" + << "version" << 2 << "members" << BSON_ARRAY(BSON("host" + << "node1:12345" + << "_id" << 0))), + HostAndPort("node1", 12345)); + getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY); + + getReplCoord()->setMyLastOptime(OpTime(Timestamp(10, 0), 0)); + + txn.setCheckForInterruptStatus(Status(ErrorCodes::Interrupted, "test")); + + auto result = getReplCoord()->waitUntilOpTime( + &txn, ReadAfterOpTimeArgs(OpTime(Timestamp(50, 0), 0), true)); + + ASSERT_TRUE(result.didWait()); + ASSERT_EQUALS(ErrorCodes::Interrupted, result.getStatus()); +} + +TEST_F(ReplCoordTest, ReadAfterCommittedGreaterOpTime) { + OperationContextNoop txn; + assertStartSuccess(BSON("_id" + << "mySet" + << "version" << 2 << "members" << BSON_ARRAY(BSON("host" + << "node1:12345" + << "_id" << 0))), + HostAndPort("node1", 12345)); + getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY); + + getReplCoord()->setMyLastOptime(OpTime(Timestamp(100, 0), 0)); + getReplCoord()->onSnapshotCreate(OpTime(Timestamp(100, 0), 0)); + auto result = getReplCoord()->waitUntilOpTime( + &txn, ReadAfterOpTimeArgs(OpTime(Timestamp(50, 0), 0), true)); + + ASSERT_TRUE(result.didWait()); + ASSERT_OK(result.getStatus()); +} + +TEST_F(ReplCoordTest, ReadAfterCommittedEqualOpTime) { + OperationContextNoop txn; + assertStartSuccess(BSON("_id" + << "mySet" + << "version" << 2 << "members" << BSON_ARRAY(BSON("host" + << "node1:12345" + << "_id" << 0))), + HostAndPort("node1", 12345)); + getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY); + + + OpTime time(Timestamp(100, 0), 0); + getReplCoord()->setMyLastOptime(time); + 
getReplCoord()->onSnapshotCreate(time); + auto result = getReplCoord()->waitUntilOpTime(&txn, ReadAfterOpTimeArgs(time, true)); + + ASSERT_TRUE(result.didWait()); + ASSERT_OK(result.getStatus()); +} + +TEST_F(ReplCoordTest, ReadAfterCommittedDeferredGreaterOpTime) { + OperationContextNoop txn; + assertStartSuccess(BSON("_id" + << "mySet" + << "version" << 2 << "members" << BSON_ARRAY(BSON("host" + << "node1:12345" + << "_id" << 0))), + HostAndPort("node1", 12345)); + getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY); + + getReplCoord()->setMyLastOptime(OpTime(Timestamp(0, 0), 0)); + OpTime committedOpTime(Timestamp(200, 0), 0); + auto pseudoLogOp = stdx::async(stdx::launch::async, + [this, &committedOpTime]() { + // Not guaranteed to be scheduled after waitUntil blocks... + getReplCoord()->setMyLastOptime(committedOpTime); + getReplCoord()->onSnapshotCreate(committedOpTime); + }); + + auto result = getReplCoord()->waitUntilOpTime( + &txn, ReadAfterOpTimeArgs(OpTime(Timestamp(100, 0), 0), true)); + pseudoLogOp.get(); + + ASSERT_TRUE(result.didWait()); + ASSERT_OK(result.getStatus()); +} + +TEST_F(ReplCoordTest, ReadAfterCommittedDeferredEqualOpTime) { + OperationContextNoop txn; + assertStartSuccess(BSON("_id" + << "mySet" + << "version" << 2 << "members" << BSON_ARRAY(BSON("host" + << "node1:12345" + << "_id" << 0))), + HostAndPort("node1", 12345)); + getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY); + + getReplCoord()->setMyLastOptime(OpTime(Timestamp(0, 0), 0)); + + OpTime opTimeToWait(Timestamp(100, 0), 0); + + auto pseudoLogOp = stdx::async(stdx::launch::async, + [this, &opTimeToWait]() { + // Not guaranteed to be scheduled after waitUntil blocks... 
+ getReplCoord()->setMyLastOptime(opTimeToWait); + getReplCoord()->onSnapshotCreate(opTimeToWait); + }); + + auto result = getReplCoord()->waitUntilOpTime(&txn, ReadAfterOpTimeArgs(opTimeToWait, true)); + pseudoLogOp.get(); + + ASSERT_TRUE(result.didWait()); + ASSERT_OK(result.getStatus()); +} + TEST_F(ReplCoordTest, MetadataWrongConfigVersion) { // Ensure that we do not process ReplicationMetadata when ConfigVersions do not match. assertStartSuccess(BSON("_id" |