summary refs log tree commit diff
path: root/src/mongo
diff options
context:
space:
mode:
author matt dannenberg <matt.dannenberg@10gen.com> 2015-07-09 08:58:29 -0400
committer matt dannenberg <matt.dannenberg@10gen.com> 2015-07-10 10:35:39 -0400
commit 917e4197593077116a0e4e8d1db5ce96c3f444a6 (patch)
tree 6f3dead0312530c4d5f4643868dee578cc8665dd /src/mongo
parent 27e0f7a4b5b407d43de33db941cd413ea46399cd (diff)
download mongo-917e4197593077116a0e4e8d1db5ce96c3f444a6.tar.gz
SERVER-19211 support read committed in read after optime
Diffstat (limited to 'src/mongo')
-rw-r--r-- src/mongo/db/repl/read_after_optime_args.cpp 14
-rw-r--r-- src/mongo/db/repl/read_after_optime_args.h 6
-rw-r--r-- src/mongo/db/repl/read_after_optime_args_test.cpp 16
-rw-r--r-- src/mongo/db/repl/replication_coordinator_impl.cpp 12
-rw-r--r-- src/mongo/db/repl/replication_coordinator_impl_test.cpp 146
5 files changed, 190 insertions, 4 deletions
diff --git a/src/mongo/db/repl/read_after_optime_args.cpp b/src/mongo/db/repl/read_after_optime_args.cpp
index e3dcd87274b..532a2b2b9c5 100644
--- a/src/mongo/db/repl/read_after_optime_args.cpp
+++ b/src/mongo/db/repl/read_after_optime_args.cpp
@@ -45,10 +45,16 @@ const string ReadAfterOpTimeArgs::kRootFieldName("$readConcern");
const string ReadAfterOpTimeArgs::kOpTimeFieldName("afterOpTime");
const string ReadAfterOpTimeArgs::kOpTimestampFieldName("ts");
const string ReadAfterOpTimeArgs::kOpTermFieldName("term");
+const string ReadAfterOpTimeArgs::kReadCommittedFieldName("committed");
ReadAfterOpTimeArgs::ReadAfterOpTimeArgs() : ReadAfterOpTimeArgs(OpTime()) {}
-ReadAfterOpTimeArgs::ReadAfterOpTimeArgs(OpTime opTime) : _opTime(std::move(opTime)) {}
+ReadAfterOpTimeArgs::ReadAfterOpTimeArgs(OpTime opTime, bool readCommitted)
+ : _opTime(std::move(opTime)), _isReadCommitted(readCommitted) {}
+
+bool ReadAfterOpTimeArgs::isReadCommitted() const {
+ return _isReadCommitted;
+}
const OpTime& ReadAfterOpTimeArgs::getOpTime() const {
return _opTime;
@@ -95,6 +101,12 @@ Status ReadAfterOpTimeArgs::initialize(const BSONObj& cmdObj) {
_opTime = OpTime(timestamp, termNumber);
+ auto readCommittedStatus = bsonExtractBooleanFieldWithDefault(
+ cmdObj, ReadAfterOpTimeArgs::kReadCommittedFieldName, false, &_isReadCommitted);
+ if (!readCommittedStatus.isOK()) {
+ return readCommittedStatus;
+ }
+
return Status::OK();
}
diff --git a/src/mongo/db/repl/read_after_optime_args.h b/src/mongo/db/repl/read_after_optime_args.h
index 04536ce8702..1369c8145ba 100644
--- a/src/mongo/db/repl/read_after_optime_args.h
+++ b/src/mongo/db/repl/read_after_optime_args.h
@@ -46,9 +46,10 @@ public:
static const std::string kOpTimeFieldName;
static const std::string kOpTimestampFieldName;
static const std::string kOpTermFieldName;
+ static const std::string kReadCommittedFieldName;
ReadAfterOpTimeArgs();
- explicit ReadAfterOpTimeArgs(OpTime opTime);
+ explicit ReadAfterOpTimeArgs(OpTime opTime, bool readCommitted = false);
/**
* Format:
@@ -56,17 +57,20 @@ public:
* find: “coll”,
* filter: <Query Object>,
* $readConcern: { // optional
+ * committed: 1, // optional
* afterOpTime: { ts: <timestamp>, term: <NumberLong> },
* }
* }
*/
Status initialize(const BSONObj& cmdObj);
+ bool isReadCommitted() const;
const OpTime& getOpTime() const;
const Milliseconds& getTimeout() const;
private:
OpTime _opTime;
+ bool _isReadCommitted = false;
};
} // namespace repl
diff --git a/src/mongo/db/repl/read_after_optime_args_test.cpp b/src/mongo/db/repl/read_after_optime_args_test.cpp
index e79a9ff5dbb..a63ae023dad 100644
--- a/src/mongo/db/repl/read_after_optime_args_test.cpp
+++ b/src/mongo/db/repl/read_after_optime_args_test.cpp
@@ -45,6 +45,22 @@ TEST(ReadAfterParse, BasicFullSpecification) {
ASSERT_EQ(Timestamp(20, 30), readAfterOpTime.getOpTime().getTimestamp());
ASSERT_EQ(2, readAfterOpTime.getOpTime().getTerm());
+ ASSERT_FALSE(readAfterOpTime.isReadCommitted());
+}
+
+TEST(ReadAfterParse, ReadCommittedFullSpecification) {
+ ReadAfterOpTimeArgs readAfterOpTime;
+ ASSERT_OK(readAfterOpTime.initialize(BSON("find"
+ << "test" << ReadAfterOpTimeArgs::kRootFieldName
+ << BSON(ReadAfterOpTimeArgs::kOpTimeFieldName << BSON(
+ ReadAfterOpTimeArgs::kOpTimestampFieldName
+ << Timestamp(20, 30)
+ << ReadAfterOpTimeArgs::kOpTermFieldName
+ << 2)) << "committed" << true)));
+
+ ASSERT_EQ(Timestamp(20, 30), readAfterOpTime.getOpTime().getTimestamp());
+ ASSERT_EQ(2, readAfterOpTime.getOpTime().getTerm());
+ ASSERT(readAfterOpTime.isReadCommitted());
}
TEST(ReadAfterParse, Empty) {
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp
index 80bedd0cd26..510c4e49884 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl.cpp
@@ -793,8 +793,13 @@ ReadAfterOpTimeResponse ReplicationCoordinatorImpl::waitUntilOpTime(
Timer timer;
stdx::unique_lock<stdx::mutex> lock(_mutex);
+ auto loopCondition = [this, settings, ts] {
+ return settings.isReadCommitted()
+ ? !_currentCommittedSnapshot || ts > *_currentCommittedSnapshot
+ : ts > _getMyLastOptime_inlock();
+ };
- while (ts > _getMyLastOptime_inlock()) {
+ while (loopCondition()) {
Status interruptedStatus = txn->checkForInterruptNoAssert();
if (!interruptedStatus.isOK()) {
return ReadAfterOpTimeResponse(interruptedStatus, Milliseconds(timer.millis()));
@@ -806,10 +811,13 @@ ReadAfterOpTimeResponse ReplicationCoordinatorImpl::waitUntilOpTime(
}
stdx::condition_variable condVar;
+ WriteConcernOptions writeConcern;
+ writeConcern.wMode = WriteConcernOptions::kMajority;
+
WaiterInfo waitInfo(&_opTimeWaiterList,
txn->getOpID(),
&ts,
- nullptr, // Don't care about write concern.
+ settings.isReadCommitted() ? &writeConcern : nullptr,
&condVar);
if (CurOp::get(txn)->isMaxTimeSet()) {
diff --git a/src/mongo/db/repl/replication_coordinator_impl_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_test.cpp
index f39877f1bc1..3e89da2ce09 100644
--- a/src/mongo/db/repl/replication_coordinator_impl_test.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl_test.cpp
@@ -2209,6 +2209,152 @@ TEST_F(ReplCoordTest, ReadAfterDeferredEqualOpTime) {
ASSERT_OK(result.getStatus());
}
+TEST_F(ReplCoordTest, CantUseReadAfterCommittedIfNotReplSet) {
+ init(ReplSettings());
+ OperationContextNoop txn;
+ auto result = getReplCoord()->waitUntilOpTime(
+ &txn, ReadAfterOpTimeArgs(OpTime(Timestamp(50, 0), 0), true));
+
+ ASSERT_FALSE(result.didWait());
+ ASSERT_EQUALS(ErrorCodes::NotAReplicaSet, result.getStatus());
+}
+
+TEST_F(ReplCoordTest, ReadAfterCommittedWhileShutdown) {
+ OperationContextNoop txn;
+ assertStartSuccess(BSON("_id"
+ << "mySet"
+ << "version" << 2 << "members" << BSON_ARRAY(BSON("host"
+ << "node1:12345"
+ << "_id" << 0))),
+ HostAndPort("node1", 12345));
+ getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY);
+
+ getReplCoord()->setMyLastOptime(OpTime(Timestamp(10, 0), 0));
+
+ shutdown();
+
+ auto result = getReplCoord()->waitUntilOpTime(
+ &txn, ReadAfterOpTimeArgs(OpTime(Timestamp(50, 0), 0), true));
+
+ ASSERT_TRUE(result.didWait());
+ ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, result.getStatus());
+}
+
+TEST_F(ReplCoordTest, ReadAfterCommittedInterrupted) {
+ OperationContextReplMock txn;
+ assertStartSuccess(BSON("_id"
+ << "mySet"
+ << "version" << 2 << "members" << BSON_ARRAY(BSON("host"
+ << "node1:12345"
+ << "_id" << 0))),
+ HostAndPort("node1", 12345));
+ getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY);
+
+ getReplCoord()->setMyLastOptime(OpTime(Timestamp(10, 0), 0));
+
+ txn.setCheckForInterruptStatus(Status(ErrorCodes::Interrupted, "test"));
+
+ auto result = getReplCoord()->waitUntilOpTime(
+ &txn, ReadAfterOpTimeArgs(OpTime(Timestamp(50, 0), 0), true));
+
+ ASSERT_TRUE(result.didWait());
+ ASSERT_EQUALS(ErrorCodes::Interrupted, result.getStatus());
+}
+
+TEST_F(ReplCoordTest, ReadAfterCommittedGreaterOpTime) {
+ OperationContextNoop txn;
+ assertStartSuccess(BSON("_id"
+ << "mySet"
+ << "version" << 2 << "members" << BSON_ARRAY(BSON("host"
+ << "node1:12345"
+ << "_id" << 0))),
+ HostAndPort("node1", 12345));
+ getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY);
+
+ getReplCoord()->setMyLastOptime(OpTime(Timestamp(100, 0), 0));
+ getReplCoord()->onSnapshotCreate(OpTime(Timestamp(100, 0), 0));
+ auto result = getReplCoord()->waitUntilOpTime(
+ &txn, ReadAfterOpTimeArgs(OpTime(Timestamp(50, 0), 0), true));
+
+ ASSERT_TRUE(result.didWait());
+ ASSERT_OK(result.getStatus());
+}
+
+TEST_F(ReplCoordTest, ReadAfterCommittedEqualOpTime) {
+ OperationContextNoop txn;
+ assertStartSuccess(BSON("_id"
+ << "mySet"
+ << "version" << 2 << "members" << BSON_ARRAY(BSON("host"
+ << "node1:12345"
+ << "_id" << 0))),
+ HostAndPort("node1", 12345));
+ getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY);
+
+
+ OpTime time(Timestamp(100, 0), 0);
+ getReplCoord()->setMyLastOptime(time);
+ getReplCoord()->onSnapshotCreate(time);
+ auto result = getReplCoord()->waitUntilOpTime(&txn, ReadAfterOpTimeArgs(time, true));
+
+ ASSERT_TRUE(result.didWait());
+ ASSERT_OK(result.getStatus());
+}
+
+TEST_F(ReplCoordTest, ReadAfterCommittedDeferredGreaterOpTime) {
+ OperationContextNoop txn;
+ assertStartSuccess(BSON("_id"
+ << "mySet"
+ << "version" << 2 << "members" << BSON_ARRAY(BSON("host"
+ << "node1:12345"
+ << "_id" << 0))),
+ HostAndPort("node1", 12345));
+ getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY);
+
+ getReplCoord()->setMyLastOptime(OpTime(Timestamp(0, 0), 0));
+ OpTime committedOpTime(Timestamp(200, 0), 0);
+ auto pseudoLogOp = stdx::async(stdx::launch::async,
+ [this, &committedOpTime]() {
+ // Not guaranteed to be scheduled after waitUntil blocks...
+ getReplCoord()->setMyLastOptime(committedOpTime);
+ getReplCoord()->onSnapshotCreate(committedOpTime);
+ });
+
+ auto result = getReplCoord()->waitUntilOpTime(
+ &txn, ReadAfterOpTimeArgs(OpTime(Timestamp(100, 0), 0), true));
+ pseudoLogOp.get();
+
+ ASSERT_TRUE(result.didWait());
+ ASSERT_OK(result.getStatus());
+}
+
+TEST_F(ReplCoordTest, ReadAfterCommittedDeferredEqualOpTime) {
+ OperationContextNoop txn;
+ assertStartSuccess(BSON("_id"
+ << "mySet"
+ << "version" << 2 << "members" << BSON_ARRAY(BSON("host"
+ << "node1:12345"
+ << "_id" << 0))),
+ HostAndPort("node1", 12345));
+ getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY);
+
+ getReplCoord()->setMyLastOptime(OpTime(Timestamp(0, 0), 0));
+
+ OpTime opTimeToWait(Timestamp(100, 0), 0);
+
+ auto pseudoLogOp = stdx::async(stdx::launch::async,
+ [this, &opTimeToWait]() {
+ // Not guaranteed to be scheduled after waitUntil blocks...
+ getReplCoord()->setMyLastOptime(opTimeToWait);
+ getReplCoord()->onSnapshotCreate(opTimeToWait);
+ });
+
+ auto result = getReplCoord()->waitUntilOpTime(&txn, ReadAfterOpTimeArgs(opTimeToWait, true));
+ pseudoLogOp.get();
+
+ ASSERT_TRUE(result.didWait());
+ ASSERT_OK(result.getStatus());
+}
+
TEST_F(ReplCoordTest, MetadataWrongConfigVersion) {
// Ensure that we do not process ReplicationMetadata when ConfigVersions do not match.
assertStartSuccess(BSON("_id"