summaryrefslogtreecommitdiff
path: root/src/mongo/db/repl
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/repl')
-rw-r--r--src/mongo/db/repl/SConscript28
-rw-r--r--src/mongo/db/repl/operation_context_repl_mock.cpp44
-rw-r--r--src/mongo/db/repl/operation_context_repl_mock.h22
-rw-r--r--src/mongo/db/repl/read_after_optime_args.cpp134
-rw-r--r--src/mongo/db/repl/read_after_optime_args.h78
-rw-r--r--src/mongo/db/repl/read_after_optime_args_test.cpp178
-rw-r--r--src/mongo/db/repl/read_after_optime_response.cpp87
-rw-r--r--src/mongo/db/repl/read_after_optime_response.h91
-rw-r--r--src/mongo/db/repl/read_after_optime_response_test.cpp82
-rw-r--r--src/mongo/db/repl/replication_coordinator.h19
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl.cpp102
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl.h111
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl_test.cpp226
-rw-r--r--src/mongo/db/repl/replication_coordinator_mock.cpp8
-rw-r--r--src/mongo/db/repl/replication_coordinator_mock.h4
15 files changed, 1113 insertions, 101 deletions
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript
index fbb37662882..8eb73d7f8ba 100644
--- a/src/mongo/db/repl/SConscript
+++ b/src/mongo/db/repl/SConscript
@@ -246,11 +246,21 @@ env.Library('replmocks',
'replication_executor',
])
+# Library built from read_after_optime_args.cpp; depends on base and bson_extract.
+env.Library('read_after_optime_args',
+ [
+ 'read_after_optime_args.cpp'
+ ],
+ LIBDEPS=[
+ '$BUILD_DIR/mongo/base/base',
+ '$BUILD_DIR/mongo/bson/util/bson_extract',
+ ])
+
env.Library('replica_set_messages',
[
'handshake_args.cpp',
'is_master_response.cpp',
'member_config.cpp',
+ 'read_after_optime_response.cpp',
'repl_set_declare_election_winner_args.cpp',
'repl_set_heartbeat_args.cpp',
'repl_set_heartbeat_args_v1.cpp',
@@ -264,6 +274,7 @@ env.Library('replica_set_messages',
'last_vote.cpp',
],
LIBDEPS=[
+ 'read_after_optime_args',
'$BUILD_DIR/mongo/bson/bson',
'$BUILD_DIR/mongo/bson/util/bson_extract',
'$BUILD_DIR/mongo/util/net/hostandport',
@@ -449,3 +460,20 @@ env.CppUnitTest(
'task_runner_test_fixture',
],
)
+
+# Unit test binary for read_after_optime_args_test.cpp, linked against replica_set_messages.
+env.CppUnitTest(
+ target='read_after_optime_args_test',
+ source=[
+ 'read_after_optime_args_test.cpp',
+ ],
+ LIBDEPS=['replica_set_messages']
+)
+
+# Unit test binary for read_after_optime_response_test.cpp, linked against replica_set_messages.
+env.CppUnitTest(
+ target='read_after_optime_response_test',
+ source=[
+ 'read_after_optime_response_test.cpp',
+ ],
+ LIBDEPS=['replica_set_messages']
+)
+
diff --git a/src/mongo/db/repl/operation_context_repl_mock.cpp b/src/mongo/db/repl/operation_context_repl_mock.cpp
index b5c44316261..e1730eacf1c 100644
--- a/src/mongo/db/repl/operation_context_repl_mock.cpp
+++ b/src/mongo/db/repl/operation_context_repl_mock.cpp
@@ -36,10 +36,50 @@
namespace mongo {
namespace repl {
- OperationContextReplMock::OperationContextReplMock()
- : _lockState(new MMAPV1LockerImpl()) { }
+ // Initial state: a new MMAPV1LockerImpl, op ID 0, an OK interrupt status and
+ // 0 microseconds of max time remaining.
+ OperationContextReplMock::OperationContextReplMock():
+ _lockState(new MMAPV1LockerImpl()),
+ _opID(0),
+ _checkForInterruptStatus(Status::OK()),
+ _maxTimeMicrosRemaining(0) {
+ }
OperationContextReplMock::~OperationContextReplMock() {}
+ // Returns the raw pointer held by _lockState; ownership stays with this object.
+ Locker* OperationContextReplMock::lockState() const {
+ return _lockState.get();
+ }
+
+ // Returns the op ID stored by setOpID() (0 if it was never called).
+ unsigned int OperationContextReplMock::getOpID() const {
+ return _opID;
+ }
+
+ // Overrides the value reported by getOpID().
+ void OperationContextReplMock::setOpID(unsigned int opID) {
+ _opID = opID;
+ }
+
+ // Passes the result of checkForInterruptNoAssert() to uassertStatusOK()
+ // (presumably throwing when that status is not OK -- confirm against uassertStatusOK).
+ void OperationContextReplMock::checkForInterrupt() const {
+ uassertStatusOK(checkForInterruptNoAssert());
+ }
+
+    /**
+     * Reports the status installed through setCheckForInterruptStatus(), which is
+     * Status::OK() until a caller replaces it.
+     */
+    Status OperationContextReplMock::checkForInterruptNoAssert() const {
+        return _checkForInterruptStatus.isOK() ? Status::OK() : _checkForInterruptStatus;
+    }
+
+ // Sets the status that checkForInterruptNoAssert() (and so checkForInterrupt()) reports.
+ void OperationContextReplMock::setCheckForInterruptStatus(Status status) {
+ _checkForInterruptStatus = std::move(status);
+ }
+
+ // Returns the value stored by setRemainingMaxTimeMicros() (0 if it was never called).
+ uint64_t OperationContextReplMock::getRemainingMaxTimeMicros() const {
+ return _maxTimeMicrosRemaining;
+ }
+
+ // Overrides the value reported by getRemainingMaxTimeMicros().
+ void OperationContextReplMock::setRemainingMaxTimeMicros(uint64_t micros) {
+ _maxTimeMicrosRemaining = micros;
+ }
+
} // namespace repl
} // namespace mongo
diff --git a/src/mongo/db/repl/operation_context_repl_mock.h b/src/mongo/db/repl/operation_context_repl_mock.h
index 660fbdfc5db..fb53dff2e3c 100644
--- a/src/mongo/db/repl/operation_context_repl_mock.h
+++ b/src/mongo/db/repl/operation_context_repl_mock.h
@@ -40,16 +40,36 @@ namespace repl {
/**
* Mock implementation of OperationContext that can be used with real instances of LockManager.
+ * Note this is not thread safe and the setter methods should only be called in the context
+ * where access to this object is guaranteed to be serialized.
*/
class OperationContextReplMock : public OperationContextNoop {
public:
OperationContextReplMock();
virtual ~OperationContextReplMock();
- virtual Locker* lockState() const { return _lockState.get(); }
+ // Returns the locker owned by this mock.
+ virtual Locker* lockState() const override;
+
+ // Returns the op ID configured through setOpID().
+ virtual unsigned int getOpID() const override;
+
+ // Test hook: sets the value returned by getOpID().
+ void setOpID(unsigned int opID);
+
+ // Asserting variant of checkForInterruptNoAssert().
+ virtual void checkForInterrupt() const override;
+
+ // Returns the status configured through setCheckForInterruptStatus().
+ virtual Status checkForInterruptNoAssert() const override;
+
+ // Test hook: sets the status reported by the checkForInterrupt* methods.
+ void setCheckForInterruptStatus(Status status);
+
+ // Returns the value configured through setRemainingMaxTimeMicros().
+ virtual uint64_t getRemainingMaxTimeMicros() const override;
+
+ // Test hook: sets the value returned by getRemainingMaxTimeMicros().
+ void setRemainingMaxTimeMicros(uint64_t micros);
private:
boost::scoped_ptr<Locker> _lockState;
+ // Value handed back by getOpID().
+ unsigned int _opID;
+
+ // Status handed back by checkForInterruptNoAssert().
+ Status _checkForInterruptStatus;
+ // Value handed back by getRemainingMaxTimeMicros().
+ uint64_t _maxTimeMicrosRemaining;
};
} // namespace repl
diff --git a/src/mongo/db/repl/read_after_optime_args.cpp b/src/mongo/db/repl/read_after_optime_args.cpp
new file mode 100644
index 00000000000..b17c24f23fa
--- /dev/null
+++ b/src/mongo/db/repl/read_after_optime_args.cpp
@@ -0,0 +1,134 @@
+/**
+ * Copyright (C) 2015 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kReplication
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/repl/read_after_optime_args.h"
+
+#include "mongo/bson/util/bson_extract.h"
+#include "mongo/db/jsobj.h"
+#include "mongo/util/mongoutils/str.h"
+
+using std::string;
+
+namespace mongo {
+namespace repl {
+
+ // Field names of the parsed document:
+ // { after: { opTime: { ts: ..., term: ... }, timeoutMS: ... } }
+ const string ReadAfterOpTimeArgs::kRootFieldName("after");
+ const string ReadAfterOpTimeArgs::kOpTimeFieldName("opTime");
+ const string ReadAfterOpTimeArgs::kOpTimestampFieldName("ts");
+ const string ReadAfterOpTimeArgs::kOpTermFieldName("term");
+ const string ReadAfterOpTimeArgs::kTimeoutFieldName("timeoutMS");
+
+ // Default state: a default-constructed OpTime and a timeout of 0 milliseconds.
+ ReadAfterOpTimeArgs::ReadAfterOpTimeArgs(): ReadAfterOpTimeArgs(OpTime(), Milliseconds(0)) {
+ }
+
+ // Stores the given opTime and timeout as-is; no validation is performed here.
+ ReadAfterOpTimeArgs::ReadAfterOpTimeArgs(OpTime opTime, Milliseconds timeout):
+ _opTime(std::move(opTime)),
+ _timeout(std::move(timeout)) {
+ }
+
+ // Returns the stored opTime (set by the constructor or a successful initialize()).
+ const OpTime& ReadAfterOpTimeArgs::getOpTime() const {
+ return _opTime;
+ }
+
+ // Returns the stored timeout (set by the constructor or a successful initialize()).
+ const Milliseconds& ReadAfterOpTimeArgs::getTimeout() const {
+ return _timeout;
+ }
+
+    /**
+     * Populates this object from the optional 'after' section of a command object.
+     *
+     * - If 'cmdObj' has no 'after' field, returns OK and leaves this object untouched.
+     * - Otherwise 'after' must be an object containing 'opTime', itself an object with a
+     *   timestamp 'ts' and an integer 'term'. 'timeoutMS' is optional, defaults to 0
+     *   (no timeout) and must not be negative.
+     *
+     * Members are only assigned once every field has been validated, so a non-OK return
+     * leaves the previously stored opTime and timeout in place.
+     */
+    Status ReadAfterOpTimeArgs::initialize(const BSONObj& cmdObj) {
+        auto afterElem = cmdObj[ReadAfterOpTimeArgs::kRootFieldName];
+
+        // The whole section is optional.
+        if (afterElem.eoo()) {
+            return Status::OK();
+        }
+
+        if (!afterElem.isABSONObj()) {
+            return Status(ErrorCodes::FailedToParse, "'after' field should be an object");
+        }
+
+        BSONObj readAfterObj = afterElem.Obj();
+        BSONElement opTimeElem;
+        auto opTimeStatus = bsonExtractTypedField(readAfterObj,
+                                                  ReadAfterOpTimeArgs::kOpTimeFieldName,
+                                                  Object,
+                                                  &opTimeElem);
+
+        if (!opTimeStatus.isOK()) {
+            return opTimeStatus;
+        }
+
+        BSONObj opTimeObj = opTimeElem.Obj();
+
+        Timestamp timestamp;
+        auto timestampStatus = bsonExtractTimestampField(
+                opTimeObj,
+                ReadAfterOpTimeArgs::kOpTimestampFieldName,
+                &timestamp);
+
+        if (!timestampStatus.isOK()) {
+            return timestampStatus;
+        }
+
+        long long termNumber;
+        auto termStatus = bsonExtractIntegerField(opTimeObj,
+                                                  ReadAfterOpTimeArgs::kOpTermFieldName,
+                                                  &termNumber);
+
+        if (!termStatus.isOK()) {
+            return termStatus;
+        }
+
+        long long timeoutMS;
+        auto timeoutStatus = bsonExtractIntegerFieldWithDefault(
+                readAfterObj,
+                ReadAfterOpTimeArgs::kTimeoutFieldName,
+                0, // Default to no timeout.
+                &timeoutMS);
+
+        if (!timeoutStatus.isOK()) {
+            return timeoutStatus;
+        }
+
+        // Zero is accepted (it means "no timeout"), so only negative values are rejected.
+        if (timeoutMS < 0) {
+            return Status(ErrorCodes::BadValue,
+                          str::stream() << ReadAfterOpTimeArgs::kRootFieldName
+                                        << "." << ReadAfterOpTimeArgs::kTimeoutFieldName
+                                        << " value must be non-negative");
+        }
+
+        _opTime = OpTime(timestamp, termNumber);
+        _timeout = Milliseconds(timeoutMS); // Note: 'long long' -> 'long' down casting.
+
+        return Status::OK();
+    }
+
+} // namespace repl
+} // namespace mongo
diff --git a/src/mongo/db/repl/read_after_optime_args.h b/src/mongo/db/repl/read_after_optime_args.h
new file mode 100644
index 00000000000..a8703ad52f7
--- /dev/null
+++ b/src/mongo/db/repl/read_after_optime_args.h
@@ -0,0 +1,78 @@
+/**
+ * Copyright (C) 2015 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include <string>
+
+#include "mongo/base/status.h"
+#include "mongo/db/repl/optime.h"
+#include "mongo/util/time_support.h"
+
+namespace mongo {
+
+ class BSONObj;
+
+namespace repl {
+
+ /**
+ * Holds the opTime to wait for and the wait timeout taken from a command's optional
+ * 'after' field.
+ */
+ class ReadAfterOpTimeArgs {
+ public:
+
+ // Field names recognized by initialize(); see the format description below.
+ static const std::string kRootFieldName;
+ static const std::string kOpTimeFieldName;
+ static const std::string kOpTimestampFieldName;
+ static const std::string kOpTermFieldName;
+ static const std::string kTimeoutFieldName;
+
+ // Default-constructed OpTime and a zero timeout.
+ ReadAfterOpTimeArgs();
+ ReadAfterOpTimeArgs(OpTime opTime, Milliseconds timeout);
+
+ /**
+ * Parses the 'after' field of cmdObj, if present, into this object.
+ *
+ * Format:
+ * {
+ * find: "coll",
+ * filter: <Query Object>,
+ * after: { // optional
+ * opTime: { ts: <timestamp>, term: <NumberLong> },
+ * timeoutMS: <NumberLong> //optional
+ * }
+ * }
+ */
+ Status initialize(const BSONObj& cmdObj);
+
+ const OpTime& getOpTime() const;
+ const Milliseconds& getTimeout() const;
+
+ private:
+
+ OpTime _opTime;
+ Milliseconds _timeout;
+ };
+
+} // namespace repl
+} // namespace mongo
diff --git a/src/mongo/db/repl/read_after_optime_args_test.cpp b/src/mongo/db/repl/read_after_optime_args_test.cpp
new file mode 100644
index 00000000000..d780500d9c2
--- /dev/null
+++ b/src/mongo/db/repl/read_after_optime_args_test.cpp
@@ -0,0 +1,178 @@
+/**
+ * Copyright (C) 2015 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/db/jsobj.h"
+#include "mongo/db/repl/read_after_optime_args.h"
+#include "mongo/unittest/unittest.h"
+
+namespace mongo {
+namespace repl {
+namespace {
+
+ // All fields supplied: ts, term and timeoutMS are parsed into the object.
+ TEST(ReadAfterParse, BasicFullSpecification) {
+ ReadAfterOpTimeArgs readAfterOpTime;
+ ASSERT_OK(readAfterOpTime.initialize(BSON(
+ "find" << "test"
+ << ReadAfterOpTimeArgs::kRootFieldName
+ << BSON(ReadAfterOpTimeArgs::kOpTimeFieldName
+ << BSON(ReadAfterOpTimeArgs::kOpTimestampFieldName << Timestamp(20, 30)
+ << ReadAfterOpTimeArgs::kOpTermFieldName << 2)
+ << ReadAfterOpTimeArgs::kTimeoutFieldName << 100))));
+
+ ASSERT_EQ(Timestamp(20, 30), readAfterOpTime.getOpTime().getTimestamp());
+ ASSERT_EQ(2, readAfterOpTime.getOpTime().getTerm());
+ ASSERT_EQ(100, readAfterOpTime.getTimeout().total_milliseconds());
+ }
+
+ // No 'after' field: parsing succeeds and the defaults (null timestamp, 0 ms) remain.
+ TEST(ReadAfterParse, Empty) {
+ ReadAfterOpTimeArgs readAfterOpTime;
+ ASSERT_OK(readAfterOpTime.initialize(BSON("find" << "test")));
+
+ ASSERT(readAfterOpTime.getOpTime().getTimestamp().isNull());
+ ASSERT_EQ(0, readAfterOpTime.getTimeout().total_milliseconds());
+ }
+
+ // 'after' given as a string instead of an object is rejected.
+ TEST(ReadAfterParse, BadRootType) {
+ ReadAfterOpTimeArgs readAfterOpTime;
+ ASSERT_NOT_OK(readAfterOpTime.initialize(BSON(
+ "find" << "test"
+ << ReadAfterOpTimeArgs::kRootFieldName << "x")));
+ }
+
+ // 'after.opTime' given as a number instead of an object is rejected.
+ TEST(ReadAfterParse, BadOpTimeType) {
+ ReadAfterOpTimeArgs readAfterOpTime;
+ ASSERT_NOT_OK(readAfterOpTime.initialize(BSON(
+ "find" << "test"
+ << ReadAfterOpTimeArgs::kRootFieldName
+ << BSON(ReadAfterOpTimeArgs::kOpTimeFieldName << 2))));
+ }
+
+ // An 'after' object that only carries timeoutMS (no opTime) is rejected.
+ TEST(ReadAfterParse, OpTimeRequiredIfRootPresent) {
+ ReadAfterOpTimeArgs readAfterOpTime;
+ ASSERT_NOT_OK(readAfterOpTime.initialize(BSON(
+ "find" << "test"
+ << ReadAfterOpTimeArgs::kRootFieldName
+ << BSON(ReadAfterOpTimeArgs::kTimeoutFieldName << 100))));
+ }
+
+ // An opTime that has a term but no 'ts' timestamp is rejected.
+ TEST(ReadAfterParse, NoOpTimeTS) {
+ ReadAfterOpTimeArgs readAfterOpTime;
+ ASSERT_NOT_OK(readAfterOpTime.initialize(BSON(
+ "find" << "test"
+ << ReadAfterOpTimeArgs::kRootFieldName
+ << BSON(ReadAfterOpTimeArgs::kOpTimeFieldName
+ << BSON(ReadAfterOpTimeArgs::kOpTermFieldName << 2)
+ << ReadAfterOpTimeArgs::kTimeoutFieldName << 100))));
+ }
+
+    // An opTime that has a 'ts' timestamp but no term must be rejected. The opTime
+    // sub-document therefore carries only the timestamp, so the failure can only come
+    // from the missing term.
+    TEST(ReadAfterParse, NoOpTimeTerm) {
+        ReadAfterOpTimeArgs readAfterOpTime;
+        ASSERT_NOT_OK(readAfterOpTime.initialize(BSON(
+                "find" << "test"
+                << ReadAfterOpTimeArgs::kRootFieldName
+                << BSON(ReadAfterOpTimeArgs::kOpTimeFieldName
+                        << BSON(ReadAfterOpTimeArgs::kOpTimestampFieldName << Timestamp(1, 0))
+                        << ReadAfterOpTimeArgs::kTimeoutFieldName << 100))));
+    }
+
+ // 'ts' given as a sub-object instead of a timestamp is rejected.
+ TEST(ReadAfterParse, BadOpTimeTSType) {
+ ReadAfterOpTimeArgs readAfterOpTime;
+ ASSERT_NOT_OK(readAfterOpTime.initialize(BSON(
+ "find" << "test"
+ << ReadAfterOpTimeArgs::kRootFieldName
+ << BSON(ReadAfterOpTimeArgs::kOpTimeFieldName
+ << BSON(ReadAfterOpTimeArgs::kOpTimestampFieldName << BSON("x" << 1)
+ << ReadAfterOpTimeArgs::kOpTermFieldName << 2)
+ << ReadAfterOpTimeArgs::kTimeoutFieldName << 100))));
+ }
+
+ // 'term' given as a string instead of a number is rejected.
+ TEST(ReadAfterParse, BadOpTimeTermType) {
+ ReadAfterOpTimeArgs readAfterOpTime;
+ ASSERT_NOT_OK(readAfterOpTime.initialize(BSON(
+ "find" << "test"
+ << ReadAfterOpTimeArgs::kRootFieldName
+ << BSON(ReadAfterOpTimeArgs::kOpTimeFieldName
+ << BSON(ReadAfterOpTimeArgs::kOpTimestampFieldName << Timestamp(1, 0)
+ << ReadAfterOpTimeArgs::kOpTermFieldName << "y")
+ << ReadAfterOpTimeArgs::kTimeoutFieldName << 100))));
+ }
+
+ // timeoutMS omitted: parsing succeeds and the timeout is 0 ms.
+ TEST(ReadAfterParse, TimeoutDefault) {
+ ReadAfterOpTimeArgs readAfterOpTime;
+ ASSERT_OK(readAfterOpTime.initialize(BSON(
+ "find" << "test"
+ << ReadAfterOpTimeArgs::kRootFieldName
+ << BSON(ReadAfterOpTimeArgs::kOpTimeFieldName
+ << BSON(ReadAfterOpTimeArgs::kOpTimestampFieldName << Timestamp(1, 0)
+ << ReadAfterOpTimeArgs::kOpTermFieldName << 2)))));
+
+ ASSERT_EQ(Timestamp(1, 0), readAfterOpTime.getOpTime().getTimestamp());
+ ASSERT_EQ(2, readAfterOpTime.getOpTime().getTerm());
+ ASSERT_EQ(0, readAfterOpTime.getTimeout().total_milliseconds());
+ }
+
+ // timeoutMS given as a string instead of a number is rejected.
+ TEST(ReadAfterParse, BadTimeoutType) {
+ ReadAfterOpTimeArgs readAfterOpTime;
+ ASSERT_NOT_OK(readAfterOpTime.initialize(BSON(
+ "find" << "test"
+ << ReadAfterOpTimeArgs::kRootFieldName
+ << BSON(ReadAfterOpTimeArgs::kOpTimeFieldName
+ << BSON(ReadAfterOpTimeArgs::kOpTimestampFieldName << Timestamp(1, 0)
+ << ReadAfterOpTimeArgs::kOpTermFieldName << 2)
+ << ReadAfterOpTimeArgs::kTimeoutFieldName << "abc"))));
+ }
+
+ // A negative timeoutMS is rejected.
+ TEST(ReadAfterParse, NegativeTimeout) {
+ ReadAfterOpTimeArgs readAfterOpTime;
+ ASSERT_NOT_OK(readAfterOpTime.initialize(BSON(
+ "find" << "test"
+ << ReadAfterOpTimeArgs::kRootFieldName
+ << BSON(ReadAfterOpTimeArgs::kOpTimeFieldName
+ << BSON(ReadAfterOpTimeArgs::kOpTimestampFieldName << Timestamp(1, 0)
+ << ReadAfterOpTimeArgs::kOpTermFieldName << 2)
+ << ReadAfterOpTimeArgs::kTimeoutFieldName << -100))));
+ }
+
+ // An explicit timeoutMS of 0 is accepted and stored as 0 ms.
+ TEST(ReadAfterParse, ZeroTimeout) {
+ ReadAfterOpTimeArgs readAfterOpTime;
+ ASSERT_OK(readAfterOpTime.initialize(BSON(
+ "find" << "test"
+ << ReadAfterOpTimeArgs::kRootFieldName
+ << BSON(ReadAfterOpTimeArgs::kOpTimeFieldName
+ << BSON(ReadAfterOpTimeArgs::kOpTimestampFieldName << Timestamp(20, 30)
+ << ReadAfterOpTimeArgs::kOpTermFieldName << 2)
+ << ReadAfterOpTimeArgs::kTimeoutFieldName << 0))));
+
+ ASSERT_EQ(Timestamp(20, 30), readAfterOpTime.getOpTime().getTimestamp());
+ ASSERT_EQ(2, readAfterOpTime.getOpTime().getTerm());
+ ASSERT_EQ(0, readAfterOpTime.getTimeout().total_milliseconds());
+ }
+
+} // unnamed namespace
+} // namespace repl
+} // namespace mongo
diff --git a/src/mongo/db/repl/read_after_optime_response.cpp b/src/mongo/db/repl/read_after_optime_response.cpp
new file mode 100644
index 00000000000..7caffe09a96
--- /dev/null
+++ b/src/mongo/db/repl/read_after_optime_response.cpp
@@ -0,0 +1,87 @@
+/**
+ * Copyright (C) 2015 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kReplication
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/repl/read_after_optime_response.h"
+
+#include "mongo/bson/bsonobjbuilder.h"
+
+using std::string;
+
+namespace mongo {
+namespace repl {
+
+ // Name of the field appendInfo() writes the wait duration (in milliseconds) under.
+ const string ReadAfterOpTimeResponse::kWaitedMSFieldName("waitedMS");
+
+ // Response that did not wait: zero duration, didWait() == false, the given status.
+ ReadAfterOpTimeResponse::ReadAfterOpTimeResponse(Status status):
+ ReadAfterOpTimeResponse(status, boost::posix_time::milliseconds(0), false) {
+ }
+
+ // Default response: OK status, did not wait.
+ ReadAfterOpTimeResponse::ReadAfterOpTimeResponse():
+ ReadAfterOpTimeResponse(Status::OK()) {
+ }
+
+ // Response that did wait: didWait() == true with the given status and duration.
+ ReadAfterOpTimeResponse::ReadAfterOpTimeResponse(Status status,
+ boost::posix_time::milliseconds duration):
+ ReadAfterOpTimeResponse(status, duration, true) {
+ }
+
+ // Target of the delegating constructors above; stores all three members directly.
+ ReadAfterOpTimeResponse::ReadAfterOpTimeResponse(Status status,
+ boost::posix_time::milliseconds duration,
+ bool waited):
+ _waited(waited),
+ _duration(duration),
+ _status(status) {
+ }
+
+    /**
+     * Writes the wait duration, in milliseconds, to 'builder' under kWaitedMSFieldName.
+     * Nothing is appended when no wait took place.
+     */
+    void ReadAfterOpTimeResponse::appendInfo(BSONObjBuilder* builder) {
+        if (_waited) {
+            const auto waitedMS = static_cast<long long>(_duration.total_milliseconds());
+            builder->append(kWaitedMSFieldName, waitedMS);
+        }
+    }
+
+ // True when this response was built with the (status, duration) constructor.
+ bool ReadAfterOpTimeResponse::didWait() const {
+ return _waited;
+ }
+
+ // Returns the stored wait duration (zero when didWait() is false).
+ boost::posix_time::milliseconds ReadAfterOpTimeResponse::getDuration() const {
+ return _duration;
+ }
+
+ // Returns a copy of the status this response was constructed with.
+ Status ReadAfterOpTimeResponse::getStatus() const {
+ return _status;
+ }
+
+} // namespace repl
+} // namespace mongo
diff --git a/src/mongo/db/repl/read_after_optime_response.h b/src/mongo/db/repl/read_after_optime_response.h
new file mode 100644
index 00000000000..b906dc196d4
--- /dev/null
+++ b/src/mongo/db/repl/read_after_optime_response.h
@@ -0,0 +1,91 @@
+/**
+ * Copyright (C) 2015 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include <boost/date_time/posix_time/posix_time_types.hpp>
+#include <string>
+
+#include "mongo/base/status.h"
+
+namespace mongo {
+
+ class BSONObjBuilder;
+
+namespace repl {
+
+ /**
+ * Result of a read-after-opTime wait: a status, whether a wait was attempted, and how
+ * long it lasted.
+ */
+ class ReadAfterOpTimeResponse {
+ public:
+ // Field name used by appendInfo() for the wait duration in milliseconds.
+ static const std::string kWaitedMSFieldName;
+
+ /**
+ * Constructs a default response with OK status and didWait() == false.
+ */
+ ReadAfterOpTimeResponse();
+
+ /**
+ * Constructs a response with the given status and didWait() == false.
+ */
+ explicit ReadAfterOpTimeResponse(Status status);
+
+ /**
+ * Constructs a response with didWait() == true, the given status and the given
+ * wait duration.
+ */
+ ReadAfterOpTimeResponse(Status status, boost::posix_time::milliseconds duration);
+
+ /**
+ * Appends the wait duration (under kWaitedMSFieldName) to the builder if didWait()
+ * is true; appends nothing otherwise.
+ * Note: does not include status.
+ */
+ void appendInfo(BSONObjBuilder* builder);
+
+ bool didWait() const;
+
+ /**
+ * Returns how long was spent waiting for the opTime to pass.
+ * Valid only if didWait is true.
+ */
+ boost::posix_time::milliseconds getDuration() const;
+
+ /**
+ * Returns more details about an error if it occurred.
+ */
+ Status getStatus() const;
+
+ private:
+ // Common constructor that the public constructors delegate to.
+ ReadAfterOpTimeResponse(Status status,
+ boost::posix_time::milliseconds duration,
+ bool waited);
+
+ bool _waited;
+ boost::posix_time::milliseconds _duration;
+ Status _status;
+ };
+
+} // namespace repl
+} // namespace mongo
diff --git a/src/mongo/db/repl/read_after_optime_response_test.cpp b/src/mongo/db/repl/read_after_optime_response_test.cpp
new file mode 100644
index 00000000000..a30824a57b8
--- /dev/null
+++ b/src/mongo/db/repl/read_after_optime_response_test.cpp
@@ -0,0 +1,82 @@
+/**
+ * Copyright (C) 2015 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/db/jsobj.h"
+#include "mongo/db/repl/read_after_optime_response.h"
+#include "mongo/unittest/unittest.h"
+
+namespace mongo {
+namespace repl {
+namespace {
+
+ // Default-constructed response: did not wait, and appendInfo() adds no fields.
+ TEST(ReadAfterResponse, Default) {
+ ReadAfterOpTimeResponse response;
+
+ ASSERT_FALSE(response.didWait());
+
+ BSONObjBuilder builder;
+ response.appendInfo(&builder);
+
+ BSONObj obj(builder.done());
+ ASSERT_TRUE(obj.isEmpty());
+ }
+
+ // Status-only response: keeps the status, did not wait, appendInfo() adds no fields.
+ TEST(ReadAfterResponse, WithStatus) {
+ ReadAfterOpTimeResponse response(Status(ErrorCodes::InternalError, "test"));
+
+ ASSERT_FALSE(response.didWait());
+
+ ASSERT_EQ(ErrorCodes::InternalError, response.getStatus().code());
+
+ BSONObjBuilder builder;
+ response.appendInfo(&builder);
+
+ BSONObj obj(builder.done());
+ ASSERT_TRUE(obj.isEmpty());
+ }
+
+ // Status + duration response: did wait, and appendInfo() emits waitedMS as a number.
+ TEST(ReadAfterResponse, WaitedWithDuration) {
+ ReadAfterOpTimeResponse response(Status(ErrorCodes::InternalError, "test"),
+ boost::posix_time::milliseconds(7));
+
+ ASSERT_TRUE(response.didWait());
+ ASSERT_EQUALS(7, response.getDuration().total_milliseconds());
+ ASSERT_EQ(ErrorCodes::InternalError, response.getStatus().code());
+
+ BSONObjBuilder builder;
+ response.appendInfo(&builder);
+
+ BSONObj obj(builder.done());
+ auto waitedMSElem = obj[ReadAfterOpTimeResponse::kWaitedMSFieldName];
+ ASSERT_TRUE(waitedMSElem.isNumber());
+ ASSERT_EQ(7, waitedMSElem.numberLong());
+ }
+
+} // unnamed namespace
+} // namespace repl
+} // namespace mongo
diff --git a/src/mongo/db/repl/replication_coordinator.h b/src/mongo/db/repl/replication_coordinator.h
index 660c7b47407..9201f4af326 100644
--- a/src/mongo/db/repl/replication_coordinator.h
+++ b/src/mongo/db/repl/replication_coordinator.h
@@ -55,6 +55,8 @@ namespace repl {
class IsMasterResponse;
class OplogReader;
class OpTime;
+ class ReadAfterOpTimeArgs;
+ class ReadAfterOpTimeResponse;
class ReplSetDeclareElectionWinnerArgs;
class ReplSetDeclareElectionWinnerResponse;
class ReplSetHeartbeatArgs;
@@ -288,6 +290,23 @@ namespace repl {
virtual Timestamp getMyLastOptime() const = 0;
/**
+ * Waits until the optime of the current node is at least the opTime specified in
+ * 'settings'.
+ *
+ * The returned ReadAfterOpTimeResponse object's didWait() method returns true if
+ * an attempt was made to wait for the specified opTime. Cases when this can be
+ * false could include:
+ *
+ * 1. No read after opTime was specified.
+ * 2. Attempting to do read after opTime when node is not a replica set member.
+ *
+ * Note: getDuration() on the returned ReadAfterOpTimeResponse will only be valid if
+ * its didWait() method returns true.
+ *
+ * Errors hit while waiting (for example interruption of 'txn', shutdown or timeout)
+ * are reported through the returned object's getStatus().
+ */
+ virtual ReadAfterOpTimeResponse waitUntilOpTime(const OperationContext* txn,
+ const ReadAfterOpTimeArgs& settings) = 0;
+
+ /**
* Retrieves and returns the current election id, which is a unique id that is local to
* this node and changes every time we become primary.
* TODO(spencer): Use term instead.
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp
index 246c973dc5e..a83b37b9b76 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl.cpp
@@ -34,6 +34,7 @@
#include <algorithm>
#include <boost/thread.hpp>
+#include <limits>
#include "mongo/base/status.h"
#include "mongo/db/concurrency/d_concurrency.h"
@@ -45,6 +46,8 @@
#include "mongo/db/repl/freshness_checker.h"
#include "mongo/db/repl/handshake_args.h"
#include "mongo/db/repl/is_master_response.h"
+#include "mongo/db/repl/read_after_optime_args.h"
+#include "mongo/db/repl/read_after_optime_response.h"
#include "mongo/db/repl/repl_client_info.h"
#include "mongo/db/repl/repl_set_declare_election_winner_args.h"
#include "mongo/db/repl/repl_set_heartbeat_args.h"
@@ -736,10 +739,19 @@ namespace {
if (getReplicationMode() != modeReplSet) {
return;
}
+
+ for (auto& opTimeWaiter : _opTimeWaiterList) {
+ if (*(opTimeWaiter->opTime) <= ts) {
+ opTimeWaiter->condVar->notify_all();
+ }
+ }
+
if (_getMemberState_inlock().primary()) {
return;
}
+
lock->unlock();
+
_externalState->forwardSlaveProgress(); // Must do this outside _mutex
}
@@ -758,6 +770,85 @@ namespace {
return _getMyLastOptime_inlock();
}
+ ReadAfterOpTimeResponse ReplicationCoordinatorImpl::waitUntilOpTime(
+ const OperationContext* txn,
+ const ReadAfterOpTimeArgs& settings) {
+ // TODO: SERVER-18217 use OpTime directly.
+ const auto& ts = settings.getOpTime().getTimestamp();
+ const auto& timeout = settings.getTimeout();
+
+ if (ts.isNull()) {
+ return ReadAfterOpTimeResponse();
+ }
+
+ if (getReplicationMode() != repl::ReplicationCoordinator::modeReplSet) {
+ return ReadAfterOpTimeResponse(Status(ErrorCodes::NotAReplicaSet,
+ "node needs to be a replica set member to use read after opTime"));
+ }
+
+ // TODO: SERVER-18298 enable code once V1 protocol is fully implemented.
+#if 0
+ if (!isV1ElectionProtocol()) {
+ return ReadAfterOpTimeResponse(Status(ErrorCodes::IncompatibleElectionProtocol,
+ "node needs to be running on v1 election protocol to "
+ "use read after opTime"));
+ }
+#endif
+
+ Timer timer;
+ boost::unique_lock<boost::mutex> lock(_mutex);
+
+ while (ts > _getMyLastOptime_inlock()) {
+ Status interruptedStatus = txn->checkForInterruptNoAssert();
+ if (!interruptedStatus.isOK()) {
+ return ReadAfterOpTimeResponse(interruptedStatus,
+ Milliseconds(timer.millis()));
+ }
+
+ if (_inShutdown) {
+ return ReadAfterOpTimeResponse(
+ Status(ErrorCodes::ShutdownInProgress, "shutting down"),
+ Milliseconds(timer.millis()));
+ }
+
+ const auto elapsedMS = timer.millis();
+ if (timeout.total_milliseconds() > 0 &&
+ elapsedMS > timeout.total_milliseconds()) {
+ return ReadAfterOpTimeResponse(
+ Status(ErrorCodes::ReadAfterOptimeTimeout,
+ str::stream() << "timed out waiting for opTime: "
+ << ts.toStringPretty()),
+ Milliseconds(timer.millis()));
+ }
+
+ boost::condition_variable condVar;
+ WaiterInfo waitInfo(&_opTimeWaiterList,
+ txn->getOpID(),
+ &ts,
+ nullptr, // Don't care about write concern.
+ &condVar);
+
+ uint64_t maxTimeMicrosRemaining = txn->getRemainingMaxTimeMicros();
+ auto maxTimeMSRemaining = (maxTimeMicrosRemaining == 0) ?
+ std::numeric_limits<uint64_t>::max() : (maxTimeMicrosRemaining / 1000);
+
+ auto timeoutMSRemaining = (timeout.total_milliseconds() == 0) ?
+ std::numeric_limits<uint64_t>::max() :
+ static_cast<uint64_t>(timeout.total_milliseconds() - elapsedMS);
+
+ auto sleepTimeMS = std::min(maxTimeMSRemaining, timeoutMSRemaining);
+
+ if (sleepTimeMS == std::numeric_limits<uint64_t>::max()) {
+ condVar.wait(lock);
+ }
+ else {
+ condVar.timed_wait(lock, Milliseconds(sleepTimeMS));
+ }
+ }
+
+ return ReadAfterOpTimeResponse(Status::OK(), Milliseconds(timer.millis()));
+ }
+
Timestamp ReplicationCoordinatorImpl::_getMyLastOptime_inlock() const {
return _slaveInfo[_getMyIndexInSlaveInfo_inlock()].opTime;
}
@@ -847,6 +938,13 @@ namespace {
}
}
+ for (auto& opTimeWaiter : _opTimeWaiterList) {
+ if (opTimeWaiter->opID == opId) {
+ opTimeWaiter->condVar->notify_all();
+ return;
+ }
+ }
+
_replExecutor.scheduleWork(
stdx::bind(&ReplicationCoordinatorImpl::_signalStepDownWaitersFromCallback,
this,
@@ -861,6 +959,10 @@ namespace {
info->condVar->notify_all();
}
+ for (auto& opTimeWaiter : _opTimeWaiterList) {
+ opTimeWaiter->condVar->notify_all();
+ }
+
_replExecutor.scheduleWork(
stdx::bind(&ReplicationCoordinatorImpl::_signalStepDownWaitersFromCallback,
this,
diff --git a/src/mongo/db/repl/replication_coordinator_impl.h b/src/mongo/db/repl/replication_coordinator_impl.h
index 83c80b73bb7..16223c17e86 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.h
+++ b/src/mongo/db/repl/replication_coordinator_impl.h
@@ -91,21 +91,21 @@ namespace repl {
// ================== Members of public ReplicationCoordinator API ===================
- virtual void startReplication(OperationContext* txn);
+ virtual void startReplication(OperationContext* txn) override;
- virtual void shutdown();
+ virtual void shutdown() override;
- virtual const ReplSettings& getSettings() const;
+ virtual const ReplSettings& getSettings() const override;
- virtual Mode getReplicationMode() const;
+ virtual Mode getReplicationMode() const override;
- virtual MemberState getMemberState() const;
+ virtual MemberState getMemberState() const override;
- virtual bool isInPrimaryOrSecondaryState() const;
+ virtual bool isInPrimaryOrSecondaryState() const override;
- virtual Seconds getSlaveDelaySecs() const;
+ virtual Seconds getSlaveDelaySecs() const override;
- virtual void clearSyncSourceBlacklist();
+ virtual void clearSyncSourceBlacklist() override;
/*
* Implementation of the KillOpListenerInterface interrupt method so that we can wake up
@@ -156,107 +156,112 @@ namespace repl {
virtual Timestamp getMyLastOptime() const;
- virtual OpTime getMyLastOptimeV1() const;
+ virtual OpTime getMyLastOptimeV1() const override;
- virtual OID getElectionId();
+ virtual ReadAfterOpTimeResponse waitUntilOpTime(
+ const OperationContext* txn,
+ const ReadAfterOpTimeArgs& settings) override;
+
+ virtual OID getElectionId() override;
- virtual OID getMyRID() const;
+ virtual OID getMyRID() const override;
- virtual int getMyId() const;
+ virtual int getMyId() const override;
- virtual bool setFollowerMode(const MemberState& newState);
+ virtual bool setFollowerMode(const MemberState& newState) override;
- virtual bool isWaitingForApplierToDrain();
+ virtual bool isWaitingForApplierToDrain() override;
- virtual void signalDrainComplete(OperationContext* txn);
+ virtual void signalDrainComplete(OperationContext* txn) override;
- virtual void signalUpstreamUpdater();
+ virtual void signalUpstreamUpdater() override;
- virtual bool prepareReplSetUpdatePositionCommand(BSONObjBuilder* cmdBuilder);
+ virtual bool prepareReplSetUpdatePositionCommand(BSONObjBuilder* cmdBuilder) override;
- virtual Status processReplSetGetStatus(BSONObjBuilder* result);
+ virtual Status processReplSetGetStatus(BSONObjBuilder* result) override;
- virtual void fillIsMasterForReplSet(IsMasterResponse* result);
+ virtual void fillIsMasterForReplSet(IsMasterResponse* result) override;
- virtual void appendSlaveInfoData(BSONObjBuilder* result);
+ virtual void appendSlaveInfoData(BSONObjBuilder* result) override;
- virtual ReplicaSetConfig getConfig() const;
+ virtual ReplicaSetConfig getConfig() const override;
- virtual void processReplSetGetConfig(BSONObjBuilder* result);
+ virtual void processReplSetGetConfig(BSONObjBuilder* result) override;
- virtual Status setMaintenanceMode(bool activate);
+ virtual Status setMaintenanceMode(bool activate) override;
- virtual bool getMaintenanceMode();
+ virtual bool getMaintenanceMode() override;
virtual Status processReplSetSyncFrom(const HostAndPort& target,
- BSONObjBuilder* resultObj);
+ BSONObjBuilder* resultObj) override;
- virtual Status processReplSetFreeze(int secs, BSONObjBuilder* resultObj);
+ virtual Status processReplSetFreeze(int secs, BSONObjBuilder* resultObj) override;
virtual Status processHeartbeat(const ReplSetHeartbeatArgs& args,
- ReplSetHeartbeatResponse* response);
+ ReplSetHeartbeatResponse* response) override;
virtual Status processReplSetReconfig(OperationContext* txn,
const ReplSetReconfigArgs& args,
- BSONObjBuilder* resultObj);
+ BSONObjBuilder* resultObj) override;
virtual Status processReplSetInitiate(OperationContext* txn,
const BSONObj& configObj,
- BSONObjBuilder* resultObj);
+ BSONObjBuilder* resultObj) override;
- virtual Status processReplSetGetRBID(BSONObjBuilder* resultObj);
+ virtual Status processReplSetGetRBID(BSONObjBuilder* resultObj) override;
- virtual void incrementRollbackID();
+ virtual void incrementRollbackID() override;
virtual Status processReplSetFresh(const ReplSetFreshArgs& args,
- BSONObjBuilder* resultObj);
+ BSONObjBuilder* resultObj) override;
virtual Status processReplSetElect(const ReplSetElectArgs& args,
- BSONObjBuilder* response);
+ BSONObjBuilder* response) override;
virtual Status processReplSetUpdatePosition(const UpdatePositionArgs& updates,
- long long* configVersion);
+ long long* configVersion) override;
- virtual Status processHandshake(OperationContext* txn, const HandshakeArgs& handshake);
+ virtual Status processHandshake(OperationContext* txn,
+ const HandshakeArgs& handshake) override;
- virtual bool buildsIndexes();
+ virtual bool buildsIndexes() override;
- virtual std::vector<HostAndPort> getHostsWrittenTo(const Timestamp& op);
+ virtual std::vector<HostAndPort> getHostsWrittenTo(const Timestamp& op) override;
- virtual std::vector<HostAndPort> getOtherNodesInReplSet() const;
+ virtual std::vector<HostAndPort> getOtherNodesInReplSet() const override;
- virtual WriteConcernOptions getGetLastErrorDefault();
+ virtual WriteConcernOptions getGetLastErrorDefault() override;
- virtual Status checkReplEnabledForCommand(BSONObjBuilder* result);
+ virtual Status checkReplEnabledForCommand(BSONObjBuilder* result) override;
- virtual bool isReplEnabled() const;
+ virtual bool isReplEnabled() const override;
- virtual HostAndPort chooseNewSyncSource();
+ virtual HostAndPort chooseNewSyncSource() override;
- virtual void blacklistSyncSource(const HostAndPort& host, Date_t until);
+ virtual void blacklistSyncSource(const HostAndPort& host, Date_t until) override;
- virtual void resetLastOpTimeFromOplog(OperationContext* txn);
+ virtual void resetLastOpTimeFromOplog(OperationContext* txn) override;
- virtual bool shouldChangeSyncSource(const HostAndPort& currentSource);
+ virtual bool shouldChangeSyncSource(const HostAndPort& currentSource) override;
- virtual Timestamp getLastCommittedOpTime() const;
+ virtual Timestamp getLastCommittedOpTime() const override;
virtual Status processReplSetRequestVotes(OperationContext* txn,
const ReplSetRequestVotesArgs& args,
- ReplSetRequestVotesResponse* response);
+ ReplSetRequestVotesResponse* response) override;
virtual Status processReplSetDeclareElectionWinner(
const ReplSetDeclareElectionWinnerArgs& args,
- long long* responseTerm);
+ long long* responseTerm) override;
virtual void prepareCursorResponseInfo(BSONObjBuilder* objBuilder);
virtual Status processHeartbeatV1(const ReplSetHeartbeatArgsV1& args,
- ReplSetHeartbeatResponseV1* response);
+ ReplSetHeartbeatResponseV1* response) override;
- virtual bool isV1ElectionProtocol();
+ virtual bool isV1ElectionProtocol() override;
- virtual void summarizeAsHtml(ReplSetHtmlSummary* s);
+ virtual void summarizeAsHtml(ReplSetHtmlSummary* s) override;
// ================== Test support API ===================
@@ -880,6 +885,10 @@ namespace repl {
// WaiterInfos.
std::vector<WaiterInfo*> _replicationWaiterList; // (M)
+ // List of information about clients waiting for a particular opTime.
+ // Does *not* own the WaiterInfos.
+ std::vector<WaiterInfo*> _opTimeWaiterList; // (M)
+
// Set to true when we are in the process of shutting down replication.
bool _inShutdown; // (M)
diff --git a/src/mongo/db/repl/replication_coordinator_impl_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_test.cpp
index f486fed8694..b33b87b1a83 100644
--- a/src/mongo/db/repl/replication_coordinator_impl_test.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl_test.cpp
@@ -32,6 +32,7 @@
#include <boost/scoped_ptr.hpp>
#include <boost/thread.hpp>
+#include <future>
#include <iostream>
#include <memory>
#include <set>
@@ -42,6 +43,9 @@
#include "mongo/db/repl/is_master_response.h"
#include "mongo/db/repl/network_interface_mock.h"
#include "mongo/db/repl/operation_context_repl_mock.h"
+#include "mongo/db/repl/optime.h"
+#include "mongo/db/repl/read_after_optime_args.h"
+#include "mongo/db/repl/read_after_optime_response.h"
#include "mongo/db/repl/repl_set_heartbeat_args.h"
#include "mongo/db/repl/repl_settings.h"
#include "mongo/db/repl/replica_set_config.h"
@@ -57,12 +61,15 @@
#include "mongo/unittest/unittest.h"
#include "mongo/util/assert_util.h"
#include "mongo/util/log.h"
+#include "mongo/util/time_support.h"
+#include "mongo/util/timer.h"
namespace mongo {
namespace repl {
namespace {
typedef ReplicationCoordinator::ReplSetReconfigArgs ReplSetReconfigArgs;
+ Status kInterruptedStatus(ErrorCodes::Interrupted, "operation was interrupted");
TEST_F(ReplCoordTest, StartupWithValidLocalConfig) {
assertStartSuccess(
@@ -324,7 +331,7 @@ namespace {
ASSERT_EQUALS(MemberState::RS_STARTUP, getReplCoord()->getMemberState().s);
BSONObjBuilder result1;
- getExternalState()->setStoreLocalConfigDocumentStatus(Status(ErrorCodes::OutOfDiskSpace,
+ getExternalState()->setStoreLocalConfigDocumentStatus(Status(ErrorCodes::OutOfDiskSpace,
"The test set this"));
ASSERT_EQUALS(
ErrorCodes::OutOfDiskSpace,
@@ -840,50 +847,9 @@ namespace {
awaiter.reset();
}
- class OperationContextNoopWithInterrupt : public OperationContextReplMock {
- public:
-
- OperationContextNoopWithInterrupt() : _opID(0), _interruptOp(false) {}
-
- virtual unsigned int getOpID() const {
- return _opID;
- }
-
- /**
- * Can only be called before any multi-threaded access to this object has begun.
- */
- void setOpID(unsigned int opID) {
- _opID = opID;
- }
-
- virtual void checkForInterrupt() const {
- if (_interruptOp) {
- uasserted(ErrorCodes::Interrupted, "operation was interrupted");
- }
- }
-
- virtual Status checkForInterruptNoAssert() const {
- if (_interruptOp) {
- return Status(ErrorCodes::Interrupted, "operation was interrupted");
- }
- return Status::OK();
- }
-
- /**
- * Can only be called before any multi-threaded access to this object has begun.
- */
- void setInterruptOp(bool interrupt) {
- _interruptOp = interrupt;
- }
-
- private:
- unsigned int _opID;
- bool _interruptOp;
- };
-
TEST_F(ReplCoordTest, AwaitReplicationInterrupt) {
// Tests that a thread blocked in awaitReplication can be killed by a killOp operation
- OperationContextNoopWithInterrupt txn;
+ OperationContextReplMock txn;
assertStartSuccess(
BSON("_id" << "mySet" <<
"version" << 2 <<
@@ -914,7 +880,7 @@ namespace {
ASSERT_OK(getReplCoord()->setLastOptime_forTest(2, 1, time1));
ASSERT_OK(getReplCoord()->setLastOptime_forTest(2, 2, time1));
- txn.setInterruptOp(true);
+ txn.setCheckForInterruptStatus(kInterruptedStatus);
getReplCoord()->interrupt(opID);
ReplicationCoordinator::StatusAndDuration statusAndDur = awaiter.getResult();
ASSERT_EQUALS(ErrorCodes::Interrupted, statusAndDur.status);
@@ -1202,7 +1168,7 @@ namespace {
}
TEST_F(StepDownTest, InterruptStepDown) {
- OperationContextNoopWithInterrupt txn;
+ OperationContextReplMock txn;
Timestamp optime1(100, 1);
Timestamp optime2(100, 2);
// No secondary is caught up
@@ -1223,7 +1189,7 @@ namespace {
unsigned int opID = 100;
txn.setOpID(opID);
- txn.setInterruptOp(true);
+ txn.setCheckForInterruptStatus(kInterruptedStatus);
getReplCoord()->interrupt(opID);
ASSERT_EQUALS(ErrorCodes::Interrupted, runner.getResult());
@@ -1558,7 +1524,7 @@ namespace {
IsMasterResponse roundTripped;
ASSERT_OK(roundTripped.initialize(response.toBSON()));
}
-
+
TEST_F(ReplCoordTest, ShutDownBeforeStartUpFinished) {
init();
startCapturingLogMessages();
@@ -1945,6 +1911,172 @@ namespace {
ASSERT_EQUALS(newTime, getReplCoord()->getLastCommittedOpTime());
}
+ TEST_F(ReplCoordTest, CantUseReadAfterIfNotReplSet) {
+ init(ReplSettings());
+ OperationContextNoop txn;
+ auto result = getReplCoord()->waitUntilOpTime(&txn,
+ ReadAfterOpTimeArgs(OpTime(Timestamp(50, 0), 1),
+ Milliseconds(0)));
+
+ ASSERT_FALSE(result.didWait());
+ ASSERT_EQUALS(ErrorCodes::NotAReplicaSet, result.getStatus());
+ }
+
+ TEST_F(ReplCoordTest, ReadAfterWhileShutdown) {
+ OperationContextNoop txn;
+ assertStartSuccess(
+ BSON("_id" << "mySet" <<
+ "version" << 2 <<
+ "members" << BSON_ARRAY(BSON("host" << "node1:12345" << "_id" << 0))),
+ HostAndPort("node1", 12345));
+
+ getReplCoord()->setMyLastOptime(Timestamp(10, 0));
+
+ shutdown();
+
+ auto result = getReplCoord()->waitUntilOpTime(&txn,
+ ReadAfterOpTimeArgs(OpTime(Timestamp(50, 0), 1),
+ Milliseconds(0)));
+
+ ASSERT_TRUE(result.didWait());
+ ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, result.getStatus());
+ }
+
+ TEST_F(ReplCoordTest, ReadAfterInterrupted) {
+ OperationContextReplMock txn;
+ assertStartSuccess(
+ BSON("_id" << "mySet" <<
+ "version" << 2 <<
+ "members" << BSON_ARRAY(BSON("host" << "node1:12345" << "_id" << 0))),
+ HostAndPort("node1", 12345));
+
+ getReplCoord()->setMyLastOptime(Timestamp(10, 0));
+
+ txn.setCheckForInterruptStatus(Status(ErrorCodes::Interrupted, "test"));
+
+ auto result = getReplCoord()->waitUntilOpTime(&txn,
+ ReadAfterOpTimeArgs(OpTime(Timestamp(50, 0), 1),
+ Milliseconds(0)));
+
+ ASSERT_TRUE(result.didWait());
+ ASSERT_EQUALS(ErrorCodes::Interrupted, result.getStatus());
+ }
+
+ TEST_F(ReplCoordTest, ReadAfterNoOpTime) {
+ OperationContextNoop txn;
+ assertStartSuccess(
+ BSON("_id" << "mySet" <<
+ "version" << 2 <<
+ "members" << BSON_ARRAY(BSON("host" << "node1:12345" << "_id" << 0))),
+ HostAndPort("node1", 12345));
+
+ auto result = getReplCoord()->waitUntilOpTime(&txn, ReadAfterOpTimeArgs());
+
+ ASSERT_FALSE(result.didWait());
+ ASSERT_OK(result.getStatus());
+ }
+
+ TEST_F(ReplCoordTest, ReadAfterGreaterOpTime) {
+ OperationContextNoop txn;
+ assertStartSuccess(
+ BSON("_id" << "mySet" <<
+ "version" << 2 <<
+ "members" << BSON_ARRAY(BSON("host" << "node1:12345" << "_id" << 0))),
+ HostAndPort("node1", 12345));
+
+ getReplCoord()->setMyLastOptime(Timestamp(100, 0));
+ auto result = getReplCoord()->waitUntilOpTime(&txn,
+ ReadAfterOpTimeArgs(OpTime(Timestamp(50, 0), 1),
+ Milliseconds(100)));
+
+ ASSERT_TRUE(result.didWait());
+ ASSERT_OK(result.getStatus());
+ }
+
+ TEST_F(ReplCoordTest, ReadAfterEqualOpTime) {
+ OperationContextNoop txn;
+ assertStartSuccess(
+ BSON("_id" << "mySet" <<
+ "version" << 2 <<
+ "members" << BSON_ARRAY(BSON("host" << "node1:12345" << "_id" << 0))),
+ HostAndPort("node1", 12345));
+
+
+ OpTime time(Timestamp(100, 0), 1);
+ getReplCoord()->setMyLastOptime(time.getTimestamp());
+ auto result = getReplCoord()->waitUntilOpTime(&txn,
+ ReadAfterOpTimeArgs(time, Milliseconds(100)));
+
+ ASSERT_TRUE(result.didWait());
+ ASSERT_OK(result.getStatus());
+ }
+
+ TEST_F(ReplCoordTest, ReadAfterDeferredGreaterOpTime) {
+ OperationContextNoop txn;
+ assertStartSuccess(
+ BSON("_id" << "mySet" <<
+ "version" << 2 <<
+ "members" << BSON_ARRAY(BSON("host" << "node1:12345" << "_id" << 0))),
+ HostAndPort("node1", 12345));
+
+ getReplCoord()->setMyLastOptime(Timestamp(0, 0));
+
+ auto pseudoLogOp = std::async(std::launch::async, [this]() {
+ // Not guaranteed to be scheduled after waitUntilOpTime blocks...
+ getReplCoord()->setMyLastOptime(Timestamp(200, 0));
+ });
+
+ auto result = getReplCoord()->waitUntilOpTime(&txn,
+ ReadAfterOpTimeArgs(OpTime(Timestamp(100, 0), 1),
+ Milliseconds(0)));
+ pseudoLogOp.get();
+
+ ASSERT_TRUE(result.didWait());
+ ASSERT_OK(result.getStatus());
+ }
+
+ TEST_F(ReplCoordTest, ReadAfterDeferredEqualOpTime) {
+ OperationContextNoop txn;
+ assertStartSuccess(
+ BSON("_id" << "mySet" <<
+ "version" << 2 <<
+ "members" << BSON_ARRAY(BSON("host" << "node1:12345" << "_id" << 0))),
+ HostAndPort("node1", 12345));
+
+ getReplCoord()->setMyLastOptime(Timestamp(0, 0));
+
+ repl::OpTime opTimeToWait(Timestamp(100, 0), 1);
+
+ auto pseudoLogOp = std::async(std::launch::async, [this, &opTimeToWait]() {
+ // Not guaranteed to be scheduled after waitUntilOpTime blocks...
+ getReplCoord()->setMyLastOptime(opTimeToWait.getTimestamp());
+ });
+
+ auto result = getReplCoord()->waitUntilOpTime(&txn,
+ ReadAfterOpTimeArgs(opTimeToWait, Milliseconds(0)));
+ pseudoLogOp.get();
+
+ ASSERT_TRUE(result.didWait());
+ ASSERT_OK(result.getStatus());
+ }
+
+ TEST_F(ReplCoordTest, ReadAfterOpTimeTimeoutNoMaxTimeMS) {
+ OperationContextNoop txn;
+ assertStartSuccess(
+ BSON("_id" << "mySet" <<
+ "version" << 2 <<
+ "members" << BSON_ARRAY(BSON("host" << "node1:12345" << "_id" << 0))),
+ HostAndPort("node1", 12345));
+
+ getReplCoord()->setMyLastOptime(Timestamp(100, 0));
+
+ auto result = getReplCoord()->waitUntilOpTime(&txn,
+ ReadAfterOpTimeArgs(OpTime(Timestamp(200, 0), 1), Milliseconds(10)));
+
+ ASSERT_TRUE(result.didWait());
+ ASSERT_EQUALS(ErrorCodes::ReadAfterOptimeTimeout, result.getStatus());
+ }
+
// TODO(schwerin): Unit test election id updating
} // namespace
diff --git a/src/mongo/db/repl/replication_coordinator_mock.cpp b/src/mongo/db/repl/replication_coordinator_mock.cpp
index 9d7f28e62af..ca9f32605ed 100644
--- a/src/mongo/db/repl/replication_coordinator_mock.cpp
+++ b/src/mongo/db/repl/replication_coordinator_mock.cpp
@@ -32,6 +32,8 @@
#include "mongo/base/status.h"
#include "mongo/db/write_concern_options.h"
+#include "mongo/db/repl/read_after_optime_args.h"
+#include "mongo/db/repl/read_after_optime_response.h"
#include "mongo/db/repl/replica_set_config.h"
#include "mongo/db/repl/optime.h"
#include "mongo/util/assert_util.h"
@@ -145,6 +147,12 @@ namespace repl {
return Timestamp();
}
+ ReadAfterOpTimeResponse ReplicationCoordinatorMock::waitUntilOpTime(
+ const OperationContext* txn,
+ const ReadAfterOpTimeArgs& settings) {
+ return ReadAfterOpTimeResponse();
+ }
+
OID ReplicationCoordinatorMock::getElectionId() {
// TODO
diff --git a/src/mongo/db/repl/replication_coordinator_mock.h b/src/mongo/db/repl/replication_coordinator_mock.h
index 79cf56507c8..d4f8bd33754 100644
--- a/src/mongo/db/repl/replication_coordinator_mock.h
+++ b/src/mongo/db/repl/replication_coordinator_mock.h
@@ -103,6 +103,10 @@ namespace repl {
virtual Timestamp getMyLastOptime() const;
+ virtual ReadAfterOpTimeResponse waitUntilOpTime(
+ const OperationContext* txn,
+ const ReadAfterOpTimeArgs& settings) override;
+
virtual OID getElectionId();
virtual OID getMyRID() const;