diff options
Diffstat (limited to 'src/mongo/db/repl')
-rw-r--r-- | src/mongo/db/repl/SConscript | 28 | ||||
-rw-r--r-- | src/mongo/db/repl/operation_context_repl_mock.cpp | 44 | ||||
-rw-r--r-- | src/mongo/db/repl/operation_context_repl_mock.h | 22 | ||||
-rw-r--r-- | src/mongo/db/repl/read_after_optime_args.cpp | 134 | ||||
-rw-r--r-- | src/mongo/db/repl/read_after_optime_args.h | 78 | ||||
-rw-r--r-- | src/mongo/db/repl/read_after_optime_args_test.cpp | 178 | ||||
-rw-r--r-- | src/mongo/db/repl/read_after_optime_response.cpp | 87 | ||||
-rw-r--r-- | src/mongo/db/repl/read_after_optime_response.h | 91 | ||||
-rw-r--r-- | src/mongo/db/repl/read_after_optime_response_test.cpp | 82 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator.h | 19 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_impl.cpp | 102 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_impl.h | 111 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_impl_test.cpp | 226 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_mock.cpp | 8 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_mock.h | 4 |
15 files changed, 1113 insertions, 101 deletions
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript index fbb37662882..8eb73d7f8ba 100644 --- a/src/mongo/db/repl/SConscript +++ b/src/mongo/db/repl/SConscript @@ -246,11 +246,21 @@ env.Library('replmocks', 'replication_executor', ]) +env.Library('read_after_optime_args', + [ + 'read_after_optime_args.cpp' + ], + LIBDEPS=[ + '$BUILD_DIR/mongo/base/base', + '$BUILD_DIR/mongo/bson/util/bson_extract', + ]) + env.Library('replica_set_messages', [ 'handshake_args.cpp', 'is_master_response.cpp', 'member_config.cpp', + 'read_after_optime_response.cpp', 'repl_set_declare_election_winner_args.cpp', 'repl_set_heartbeat_args.cpp', 'repl_set_heartbeat_args_v1.cpp', @@ -264,6 +274,7 @@ env.Library('replica_set_messages', 'last_vote.cpp', ], LIBDEPS=[ + 'read_after_optime_args', '$BUILD_DIR/mongo/bson/bson', '$BUILD_DIR/mongo/bson/util/bson_extract', '$BUILD_DIR/mongo/util/net/hostandport', @@ -449,3 +460,20 @@ env.CppUnitTest( 'task_runner_test_fixture', ], ) + +env.CppUnitTest( + target='read_after_optime_args_test', + source=[ + 'read_after_optime_args_test.cpp', + ], + LIBDEPS=['replica_set_messages'] +) + +env.CppUnitTest( + target='read_after_optime_response_test', + source=[ + 'read_after_optime_response_test.cpp', + ], + LIBDEPS=['replica_set_messages'] +) + diff --git a/src/mongo/db/repl/operation_context_repl_mock.cpp b/src/mongo/db/repl/operation_context_repl_mock.cpp index b5c44316261..e1730eacf1c 100644 --- a/src/mongo/db/repl/operation_context_repl_mock.cpp +++ b/src/mongo/db/repl/operation_context_repl_mock.cpp @@ -36,10 +36,50 @@ namespace mongo { namespace repl { - OperationContextReplMock::OperationContextReplMock() - : _lockState(new MMAPV1LockerImpl()) { } + OperationContextReplMock::OperationContextReplMock(): + _lockState(new MMAPV1LockerImpl()), + _opID(0), + _checkForInterruptStatus(Status::OK()), + _maxTimeMicrosRemaining(0) { + } OperationContextReplMock::~OperationContextReplMock() {} + Locker* OperationContextReplMock::lockState() const { + return _lockState.get(); + } + + unsigned int OperationContextReplMock::getOpID() const { + return _opID; + } + + void OperationContextReplMock::setOpID(unsigned int opID) { + _opID = opID; + } + + void OperationContextReplMock::checkForInterrupt() const { + uassertStatusOK(checkForInterruptNoAssert()); + } + + Status OperationContextReplMock::checkForInterruptNoAssert() const { + if (!_checkForInterruptStatus.isOK()) { + return _checkForInterruptStatus; + } + + return Status::OK(); + } + + void OperationContextReplMock::setCheckForInterruptStatus(Status status) { + _checkForInterruptStatus = std::move(status); + } + + uint64_t OperationContextReplMock::getRemainingMaxTimeMicros() const { + return _maxTimeMicrosRemaining; + } + + void OperationContextReplMock::setRemainingMaxTimeMicros(uint64_t micros) { + _maxTimeMicrosRemaining = micros; + } + } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/operation_context_repl_mock.h b/src/mongo/db/repl/operation_context_repl_mock.h index 660fbdfc5db..fb53dff2e3c 100644 --- a/src/mongo/db/repl/operation_context_repl_mock.h +++ b/src/mongo/db/repl/operation_context_repl_mock.h @@ -40,16 +40,36 @@ namespace repl { /** * Mock implementation of OperationContext that can be used with real instances of LockManager. + * Note this is not thread safe and the setter methods should only be called in the context + * where access to this object is guaranteed to be serialized. */ class OperationContextReplMock : public OperationContextNoop { public: OperationContextReplMock(); virtual ~OperationContextReplMock(); - virtual Locker* lockState() const { return _lockState.get(); } + virtual Locker* lockState() const override; + + virtual unsigned int getOpID() const override; + + void setOpID(unsigned int opID); + + virtual void checkForInterrupt() const override; + + virtual Status checkForInterruptNoAssert() const override; + + void setCheckForInterruptStatus(Status status); + + virtual uint64_t getRemainingMaxTimeMicros() const override; + + void setRemainingMaxTimeMicros(uint64_t micros); private: boost::scoped_ptr<Locker> _lockState; + unsigned int _opID; + + Status _checkForInterruptStatus; + uint64_t _maxTimeMicrosRemaining; }; } // namespace repl diff --git a/src/mongo/db/repl/read_after_optime_args.cpp b/src/mongo/db/repl/read_after_optime_args.cpp new file mode 100644 index 00000000000..b17c24f23fa --- /dev/null +++ b/src/mongo/db/repl/read_after_optime_args.cpp @@ -0,0 +1,134 @@ +/** + * Copyright (C) 2015 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kReplication + +#include "mongo/platform/basic.h" + +#include "mongo/db/repl/read_after_optime_args.h" + +#include "mongo/bson/util/bson_extract.h" +#include "mongo/db/jsobj.h" +#include "mongo/util/mongoutils/str.h" + +using std::string; + +namespace mongo { +namespace repl { + + const string ReadAfterOpTimeArgs::kRootFieldName("after"); + const string ReadAfterOpTimeArgs::kOpTimeFieldName("opTime"); + const string ReadAfterOpTimeArgs::kOpTimestampFieldName("ts"); + const string ReadAfterOpTimeArgs::kOpTermFieldName("term"); + const string ReadAfterOpTimeArgs::kTimeoutFieldName("timeoutMS"); + + ReadAfterOpTimeArgs::ReadAfterOpTimeArgs(): ReadAfterOpTimeArgs(OpTime(), Milliseconds(0)) { + } + + ReadAfterOpTimeArgs::ReadAfterOpTimeArgs(OpTime opTime, Milliseconds timeout): + _opTime(std::move(opTime)), + _timeout(std::move(timeout)) { + } + + const OpTime& ReadAfterOpTimeArgs::getOpTime() const { + return _opTime; + } + + const Milliseconds& ReadAfterOpTimeArgs::getTimeout() const { + return _timeout; + } + + Status ReadAfterOpTimeArgs::initialize(const BSONObj& cmdObj) { + auto afterElem = cmdObj[ReadAfterOpTimeArgs::kRootFieldName]; + + if (afterElem.eoo()) { + return Status::OK(); + } + + if (!afterElem.isABSONObj()) { + return Status(ErrorCodes::FailedToParse, "'after' field should be an object"); + } + + BSONObj readAfterObj = afterElem.Obj(); + BSONElement opTimeElem; + auto opTimeStatus = bsonExtractTypedField(readAfterObj, + ReadAfterOpTimeArgs::kOpTimeFieldName, + Object, + &opTimeElem); + + if (!opTimeStatus.isOK()) { + return opTimeStatus; + } + + BSONObj opTimeObj = opTimeElem.Obj(); + BSONElement timestampElem; + + Timestamp timestamp; + auto timestampStatus = bsonExtractTimestampField(opTimeObj, + ReadAfterOpTimeArgs::kOpTimestampFieldName, + ×tamp); + + if (!timestampStatus.isOK()) { + return timestampStatus; + } + + long long termNumber; + auto termStatus = bsonExtractIntegerField(opTimeObj, + ReadAfterOpTimeArgs::kOpTermFieldName, + &termNumber); + + if (!termStatus.isOK()) { + return termStatus; + } + + long long timeoutMS; + auto timeoutStatus = bsonExtractIntegerFieldWithDefault( + readAfterObj, + ReadAfterOpTimeArgs::kTimeoutFieldName, + 0, // Default to no timeout. + &timeoutMS); + + if (!timeoutStatus.isOK()) { + return timeoutStatus; + } + + if (timeoutMS < 0) { + return Status(ErrorCodes::BadValue, + str::stream() << ReadAfterOpTimeArgs::kRootFieldName + << "." << ReadAfterOpTimeArgs::kTimeoutFieldName + << " value must be positive"); + } + + _opTime = OpTime(timestamp, termNumber); + _timeout = Milliseconds(timeoutMS); // Note: 'long long' -> 'long' down casting. + + return Status::OK(); + } + +} // namespace repl +} // namespace mongo diff --git a/src/mongo/db/repl/read_after_optime_args.h b/src/mongo/db/repl/read_after_optime_args.h new file mode 100644 index 00000000000..a8703ad52f7 --- /dev/null +++ b/src/mongo/db/repl/read_after_optime_args.h @@ -0,0 +1,78 @@ +/** + * Copyright (C) 2015 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include <string> + +#include "mongo/base/status.h" +#include "mongo/db/repl/optime.h" +#include "mongo/util/time_support.h" + +namespace mongo { + + class BSONObj; + +namespace repl { + + class ReadAfterOpTimeArgs { + public: + + static const std::string kRootFieldName; + static const std::string kOpTimeFieldName; + static const std::string kOpTimestampFieldName; + static const std::string kOpTermFieldName; + static const std::string kTimeoutFieldName; + + ReadAfterOpTimeArgs(); + ReadAfterOpTimeArgs(OpTime opTime, Milliseconds timeout); + + /** + * Format: + * { + * find: “coll”, + * filter: <Query Object>, + * after: { // optional + * opTime: { ts: <timestamp>, term: <NumberLong> }, + * timeoutMS: <NumberLong> //optional + * } + * } + */ + Status initialize(const BSONObj& cmdObj); + + const OpTime& getOpTime() const; + const Milliseconds& getTimeout() const; + + private: + + OpTime _opTime; + Milliseconds _timeout; + }; + +} // namespace repl +} // namespace mongo diff --git a/src/mongo/db/repl/read_after_optime_args_test.cpp b/src/mongo/db/repl/read_after_optime_args_test.cpp new file mode 100644 index 00000000000..d780500d9c2 --- /dev/null +++ b/src/mongo/db/repl/read_after_optime_args_test.cpp @@ -0,0 +1,178 @@ +/** + * Copyright (C) 2015 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/db/jsobj.h" +#include "mongo/db/repl/read_after_optime_args.h" +#include "mongo/unittest/unittest.h" + +namespace mongo { +namespace repl { +namespace { + + TEST(ReadAfterParse, BasicFullSpecification) { + ReadAfterOpTimeArgs readAfterOpTime; + ASSERT_OK(readAfterOpTime.initialize(BSON( + "find" << "test" + << ReadAfterOpTimeArgs::kRootFieldName + << BSON(ReadAfterOpTimeArgs::kOpTimeFieldName + << BSON(ReadAfterOpTimeArgs::kOpTimestampFieldName << Timestamp(20, 30) + << ReadAfterOpTimeArgs::kOpTermFieldName << 2) + << ReadAfterOpTimeArgs::kTimeoutFieldName << 100)))); + + ASSERT_EQ(Timestamp(20, 30), readAfterOpTime.getOpTime().getTimestamp()); + ASSERT_EQ(2, readAfterOpTime.getOpTime().getTerm()); + ASSERT_EQ(100, readAfterOpTime.getTimeout().total_milliseconds()); + } + + TEST(ReadAfterParse, Empty) { + ReadAfterOpTimeArgs readAfterOpTime; + ASSERT_OK(readAfterOpTime.initialize(BSON("find" << "test"))); + + ASSERT(readAfterOpTime.getOpTime().getTimestamp().isNull()); + ASSERT_EQ(0, readAfterOpTime.getTimeout().total_milliseconds()); + } + + TEST(ReadAfterParse, BadRootType) { + ReadAfterOpTimeArgs readAfterOpTime; + ASSERT_NOT_OK(readAfterOpTime.initialize(BSON( + "find" << "test" + << ReadAfterOpTimeArgs::kRootFieldName << "x"))); + } + + TEST(ReadAfterParse, BadOpTimeType) { + ReadAfterOpTimeArgs readAfterOpTime; + ASSERT_NOT_OK(readAfterOpTime.initialize(BSON( + "find" << "test" + << ReadAfterOpTimeArgs::kRootFieldName + << BSON(ReadAfterOpTimeArgs::kOpTimeFieldName << 2)))); + } + + TEST(ReadAfterParse, OpTimeRequiredIfRootPresent) { + ReadAfterOpTimeArgs readAfterOpTime; + ASSERT_NOT_OK(readAfterOpTime.initialize(BSON( + "find" << "test" + << ReadAfterOpTimeArgs::kRootFieldName + << BSON(ReadAfterOpTimeArgs::kTimeoutFieldName << 100)))); + } + + TEST(ReadAfterParse, NoOpTimeTS) { + ReadAfterOpTimeArgs readAfterOpTime; + ASSERT_NOT_OK(readAfterOpTime.initialize(BSON( + "find" << "test" + << ReadAfterOpTimeArgs::kRootFieldName + << BSON(ReadAfterOpTimeArgs::kOpTimeFieldName + << BSON(ReadAfterOpTimeArgs::kOpTermFieldName << 2) + << ReadAfterOpTimeArgs::kTimeoutFieldName << 100)))); + } + + TEST(ReadAfterParse, NoOpTimeTerm) { + ReadAfterOpTimeArgs readAfterOpTime; + ASSERT_NOT_OK(readAfterOpTime.initialize(BSON( + "find" << "test" + << ReadAfterOpTimeArgs::kRootFieldName + << BSON(ReadAfterOpTimeArgs::kOpTimeFieldName + << BSON(ReadAfterOpTimeArgs::kOpTermFieldName << 2) + << ReadAfterOpTimeArgs::kTimeoutFieldName << 100)))); + } + + TEST(ReadAfterParse, BadOpTimeTSType) { + ReadAfterOpTimeArgs readAfterOpTime; + ASSERT_NOT_OK(readAfterOpTime.initialize(BSON( + "find" << "test" + << ReadAfterOpTimeArgs::kRootFieldName + << BSON(ReadAfterOpTimeArgs::kOpTimeFieldName + << BSON(ReadAfterOpTimeArgs::kOpTimestampFieldName << BSON("x" << 1) + << ReadAfterOpTimeArgs::kOpTermFieldName << 2) + << ReadAfterOpTimeArgs::kTimeoutFieldName << 100)))); + } + + TEST(ReadAfterParse, BadOpTimeTermType) { + ReadAfterOpTimeArgs readAfterOpTime; + ASSERT_NOT_OK(readAfterOpTime.initialize(BSON( + "find" << "test" + << ReadAfterOpTimeArgs::kRootFieldName + << BSON(ReadAfterOpTimeArgs::kOpTimeFieldName + << BSON(ReadAfterOpTimeArgs::kOpTimestampFieldName << Timestamp(1, 0) + << ReadAfterOpTimeArgs::kOpTermFieldName << "y") + << ReadAfterOpTimeArgs::kTimeoutFieldName << 100)))); + } + + TEST(ReadAfterParse, TimeoutDefault) { + ReadAfterOpTimeArgs readAfterOpTime; + ASSERT_OK(readAfterOpTime.initialize(BSON( + "find" << "test" + << ReadAfterOpTimeArgs::kRootFieldName + << BSON(ReadAfterOpTimeArgs::kOpTimeFieldName + << BSON(ReadAfterOpTimeArgs::kOpTimestampFieldName << Timestamp(1, 0) + << ReadAfterOpTimeArgs::kOpTermFieldName << 2))))); + + ASSERT_EQ(Timestamp(1, 0), readAfterOpTime.getOpTime().getTimestamp()); + ASSERT_EQ(2, readAfterOpTime.getOpTime().getTerm()); + ASSERT_EQ(0, readAfterOpTime.getTimeout().total_milliseconds()); + } + + TEST(ReadAfterParse, BadTimeoutType) { + ReadAfterOpTimeArgs readAfterOpTime; + ASSERT_NOT_OK(readAfterOpTime.initialize(BSON( + "find" << "test" + << ReadAfterOpTimeArgs::kRootFieldName + << BSON(ReadAfterOpTimeArgs::kOpTimeFieldName + << BSON(ReadAfterOpTimeArgs::kOpTimestampFieldName << Timestamp(1, 0) + << ReadAfterOpTimeArgs::kOpTermFieldName << 2) + << ReadAfterOpTimeArgs::kTimeoutFieldName << "abc")))); + } + + TEST(ReadAfterParse, NegativeTimeout) { + ReadAfterOpTimeArgs readAfterOpTime; + ASSERT_NOT_OK(readAfterOpTime.initialize(BSON( + "find" << "test" + << ReadAfterOpTimeArgs::kRootFieldName + << BSON(ReadAfterOpTimeArgs::kOpTimeFieldName + << BSON(ReadAfterOpTimeArgs::kOpTimestampFieldName << Timestamp(1, 0) + << ReadAfterOpTimeArgs::kOpTermFieldName << 2) + << ReadAfterOpTimeArgs::kTimeoutFieldName << -100)))); + } + + TEST(ReadAfterParse, ZeroTimeout) { + ReadAfterOpTimeArgs readAfterOpTime; + ASSERT_OK(readAfterOpTime.initialize(BSON( + "find" << "test" + << ReadAfterOpTimeArgs::kRootFieldName + << BSON(ReadAfterOpTimeArgs::kOpTimeFieldName + << BSON(ReadAfterOpTimeArgs::kOpTimestampFieldName << Timestamp(20, 30) + << ReadAfterOpTimeArgs::kOpTermFieldName << 2) + << ReadAfterOpTimeArgs::kTimeoutFieldName << 0)))); + + ASSERT_EQ(Timestamp(20, 30), readAfterOpTime.getOpTime().getTimestamp()); + ASSERT_EQ(2, readAfterOpTime.getOpTime().getTerm()); + ASSERT_EQ(0, readAfterOpTime.getTimeout().total_milliseconds()); + } + +} // unnamed namespace +} // namespace repl +} // namespace mongo diff --git a/src/mongo/db/repl/read_after_optime_response.cpp b/src/mongo/db/repl/read_after_optime_response.cpp new file mode 100644 index 00000000000..7caffe09a96 --- /dev/null +++ b/src/mongo/db/repl/read_after_optime_response.cpp @@ -0,0 +1,87 @@ +/** + * Copyright (C) 2015 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kReplication + +#include "mongo/platform/basic.h" + +#include "mongo/db/repl/read_after_optime_response.h" + +#include "mongo/bson/bsonobjbuilder.h" + +using std::string; + +namespace mongo { +namespace repl { + + const string ReadAfterOpTimeResponse::kWaitedMSFieldName("waitedMS"); + + ReadAfterOpTimeResponse::ReadAfterOpTimeResponse(Status status): + ReadAfterOpTimeResponse(status, boost::posix_time::milliseconds(0), false) { + } + + ReadAfterOpTimeResponse::ReadAfterOpTimeResponse(): + ReadAfterOpTimeResponse(Status::OK()) { + } + + ReadAfterOpTimeResponse::ReadAfterOpTimeResponse(Status status, + boost::posix_time::milliseconds duration): + ReadAfterOpTimeResponse(status, duration, true) { + } + + ReadAfterOpTimeResponse::ReadAfterOpTimeResponse(Status status, + boost::posix_time::milliseconds duration, + bool waited): + _waited(waited), + _duration(duration), + _status(status) { + } + + void ReadAfterOpTimeResponse::appendInfo(BSONObjBuilder* builder) { + if (!_waited) { + return; + } + + builder->append(kWaitedMSFieldName, + static_cast<long long>(_duration.total_milliseconds())); + } + + bool ReadAfterOpTimeResponse::didWait() const { + return _waited; + } + + boost::posix_time::milliseconds ReadAfterOpTimeResponse::getDuration() const { + return _duration; + } + + Status ReadAfterOpTimeResponse::getStatus() const { + return _status; + } + +} // namespace repl +} // namespace mongo diff --git a/src/mongo/db/repl/read_after_optime_response.h b/src/mongo/db/repl/read_after_optime_response.h new file mode 100644 index 00000000000..b906dc196d4 --- /dev/null +++ b/src/mongo/db/repl/read_after_optime_response.h @@ -0,0 +1,91 @@ +/** + * Copyright (C) 2015 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include <boost/date_time/posix_time/posix_time_types.hpp> +#include <string> + +#include "mongo/base/status.h" + +namespace mongo { + + class BSONObjBuilder; + +namespace repl { + + class ReadAfterOpTimeResponse { + public: + static const std::string kWaitedMSFieldName; + + /** + * Constructs a default response that has OK status, and wait is false. + */ + ReadAfterOpTimeResponse(); + + /** + * Constructs a response with the given status with wait equals to false. + */ + explicit ReadAfterOpTimeResponse(Status status); + + /** + * Constructs a response with wait set to true along with the given parameters. + */ + ReadAfterOpTimeResponse(Status status, boost::posix_time::milliseconds duration); + + /** + * Appends to the builder the timeout and duration info if didWait() is true. + * Note: does not include status. + */ + void appendInfo(BSONObjBuilder* builder); + + bool didWait() const; + + /** + * Returns the amount of duration waiting for opTime to pass. + * Valid only if didWait is true. + */ + boost::posix_time::milliseconds getDuration() const; + + /** + * Returns more details about an error if it occurred. + */ + Status getStatus() const; + + private: + ReadAfterOpTimeResponse(Status status, + boost::posix_time::milliseconds duration, + bool waited); + + bool _waited; + boost::posix_time::milliseconds _duration; + Status _status; + }; + +} // namespace repl +} // namespace mongo diff --git a/src/mongo/db/repl/read_after_optime_response_test.cpp b/src/mongo/db/repl/read_after_optime_response_test.cpp new file mode 100644 index 00000000000..a30824a57b8 --- /dev/null +++ b/src/mongo/db/repl/read_after_optime_response_test.cpp @@ -0,0 +1,82 @@ +/** + * Copyright (C) 2015 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/db/jsobj.h" +#include "mongo/db/repl/read_after_optime_response.h" +#include "mongo/unittest/unittest.h" + +namespace mongo { +namespace repl { +namespace { + + TEST(ReadAfterResponse, Default) { + ReadAfterOpTimeResponse response; + + ASSERT_FALSE(response.didWait()); + + BSONObjBuilder builder; + response.appendInfo(&builder); + + BSONObj obj(builder.done()); + ASSERT_TRUE(obj.isEmpty()); + } + + TEST(ReadAfterResponse, WithStatus) { + ReadAfterOpTimeResponse response(Status(ErrorCodes::InternalError, "test")); + + ASSERT_FALSE(response.didWait()); + + ASSERT_EQ(ErrorCodes::InternalError, response.getStatus().code()); + + BSONObjBuilder builder; + response.appendInfo(&builder); + + BSONObj obj(builder.done()); + ASSERT_TRUE(obj.isEmpty()); + } + + TEST(ReadAfterResponse, WaitedWithDuration) { + ReadAfterOpTimeResponse response(Status(ErrorCodes::InternalError, "test"), + boost::posix_time::milliseconds(7)); + + ASSERT_TRUE(response.didWait()); + ASSERT_EQUALS(7, response.getDuration().total_milliseconds()); + ASSERT_EQ(ErrorCodes::InternalError, response.getStatus().code()); + + BSONObjBuilder builder; + response.appendInfo(&builder); + + BSONObj obj(builder.done()); + auto waitedMSElem = obj[ReadAfterOpTimeResponse::kWaitedMSFieldName]; + ASSERT_TRUE(waitedMSElem.isNumber()); + ASSERT_EQ(7, waitedMSElem.numberLong()); + } + +} // unnamed namespace +} // namespace repl +} // namespace mongo diff --git a/src/mongo/db/repl/replication_coordinator.h b/src/mongo/db/repl/replication_coordinator.h index 660c7b47407..9201f4af326 100644 --- a/src/mongo/db/repl/replication_coordinator.h +++ b/src/mongo/db/repl/replication_coordinator.h @@ -55,6 +55,8 @@ namespace repl { class IsMasterResponse; class OplogReader; class OpTime; + class ReadAfterOpTimeArgs; + class ReadAfterOpTimeResponse; class ReplSetDeclareElectionWinnerArgs; class ReplSetDeclareElectionWinnerResponse; class ReplSetHeartbeatArgs; @@ -288,6 +290,23 @@ namespace repl { virtual Timestamp getMyLastOptime() const = 0; /** + * Waits until the optime of the current node is at least the opTime specified in + * 'settings'. + * + * The returned ReadAfterOpTimeResponse object's didWait() method returns true if + * an attempt was made to wait for the specified opTime. Cases when this can be + * false could include: + * + * 1. No read after opTime was specified. + * 2. Attempting to do read after opTime when node is not a replica set member. + * + * Note: getDuration() on the returned ReadAfterOpTimeResponse will only be valid if + * its didWait() method returns true. + */ + virtual ReadAfterOpTimeResponse waitUntilOpTime(const OperationContext* txn, + const ReadAfterOpTimeArgs& settings) = 0; + + /** * Retrieves and returns the current election id, which is a unique id that is local to * this node and changes every time we become primary. * TODO(spencer): Use term instead. diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp index 246c973dc5e..a83b37b9b76 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl.cpp @@ -34,6 +34,7 @@ #include <algorithm> #include <boost/thread.hpp> +#include <limits> #include "mongo/base/status.h" #include "mongo/db/concurrency/d_concurrency.h" @@ -45,6 +46,8 @@ #include "mongo/db/repl/freshness_checker.h" #include "mongo/db/repl/handshake_args.h" #include "mongo/db/repl/is_master_response.h" +#include "mongo/db/repl/read_after_optime_args.h" +#include "mongo/db/repl/read_after_optime_response.h" #include "mongo/db/repl/repl_client_info.h" #include "mongo/db/repl/repl_set_declare_election_winner_args.h" #include "mongo/db/repl/repl_set_heartbeat_args.h" @@ -736,10 +739,19 @@ namespace { if (getReplicationMode() != modeReplSet) { return; } + + for (auto& opTimeWaiter : _opTimeWaiterList) { + if (*(opTimeWaiter->opTime) <= ts) { + opTimeWaiter->condVar->notify_all(); + } + } + if (_getMemberState_inlock().primary()) { return; } + lock->unlock(); + _externalState->forwardSlaveProgress(); // Must do this outside _mutex } @@ -758,6 +770,85 @@ namespace { return _getMyLastOptime_inlock(); } + ReadAfterOpTimeResponse ReplicationCoordinatorImpl::waitUntilOpTime( + const OperationContext* txn, + const ReadAfterOpTimeArgs& settings) { + // TODO: SERVER-18217 use OpTime directly. + const auto& ts = settings.getOpTime().getTimestamp(); + const auto& timeout = settings.getTimeout(); + + if (ts.isNull()) { + return ReadAfterOpTimeResponse(); + } + + if (getReplicationMode() != repl::ReplicationCoordinator::modeReplSet) { + return ReadAfterOpTimeResponse(Status(ErrorCodes::NotAReplicaSet, + "node needs to be a replica set member to use read after opTime")); + } + + // TODO: SERVER-18298 enable code once V1 protocol is fully implemented. +#if 0 + if (!isV1ElectionProtocol()) { + return ReadAfterOpTimeResponse(Status(ErrorCodes::IncompatibleElectionProtocol, + "node needs to be running on v1 election protocol to " + "use read after opTime")); + } +#endif + + Timer timer; + boost::unique_lock<boost::mutex> lock(_mutex); + + while (ts > _getMyLastOptime_inlock()) { + Status interruptedStatus = txn->checkForInterruptNoAssert(); + if (!interruptedStatus.isOK()) { + return ReadAfterOpTimeResponse(interruptedStatus, + Milliseconds(timer.millis())); + } + + if (_inShutdown) { + return ReadAfterOpTimeResponse( + Status(ErrorCodes::ShutdownInProgress, "shutting down"), + Milliseconds(timer.millis())); + } + + const auto elapsedMS = timer.millis(); + if (timeout.total_milliseconds() > 0 && + elapsedMS > timeout.total_milliseconds()) { + return ReadAfterOpTimeResponse( + Status(ErrorCodes::ReadAfterOptimeTimeout, + str::stream() << "timed out waiting for opTime: " + << ts.toStringPretty()), + Milliseconds(timer.millis())); + } + + boost::condition_variable condVar; + WaiterInfo waitInfo(&_opTimeWaiterList, + txn->getOpID(), + &ts, + nullptr, // Don't care about write concern. + &condVar); + + uint64_t maxTimeMicrosRemaining = txn->getRemainingMaxTimeMicros(); + auto maxTimeMSRemaining = (maxTimeMicrosRemaining == 0) ? + std::numeric_limits<uint64_t>::max() : (maxTimeMicrosRemaining / 1000); + + auto timeoutMSRemaining = (timeout.total_milliseconds() == 0) ? + std::numeric_limits<uint64_t>::max() : + static_cast<uint64_t>(timeout.total_milliseconds() - elapsedMS); + + auto sleepTimeMS = std::min(maxTimeMSRemaining, timeoutMSRemaining); + + if (sleepTimeMS == std::numeric_limits<uint64_t>::max()) { + condVar.wait(lock); + } + else { + condVar.timed_wait(lock, Milliseconds(sleepTimeMS)); + } + } + + return ReadAfterOpTimeResponse(Status::OK(), Milliseconds(timer.millis())); + } + Timestamp ReplicationCoordinatorImpl::_getMyLastOptime_inlock() const { return _slaveInfo[_getMyIndexInSlaveInfo_inlock()].opTime; } @@ -847,6 +938,13 @@ namespace { } } + for (auto& opTimeWaiter : _opTimeWaiterList) { + if (opTimeWaiter->opID == opId) { + opTimeWaiter->condVar->notify_all(); + return; + } + } + _replExecutor.scheduleWork( stdx::bind(&ReplicationCoordinatorImpl::_signalStepDownWaitersFromCallback, this, @@ -861,6 +959,10 @@ namespace { info->condVar->notify_all(); } + for (auto& opTimeWaiter : _opTimeWaiterList) { + opTimeWaiter->condVar->notify_all(); + } + _replExecutor.scheduleWork( stdx::bind(&ReplicationCoordinatorImpl::_signalStepDownWaitersFromCallback, this, diff --git a/src/mongo/db/repl/replication_coordinator_impl.h b/src/mongo/db/repl/replication_coordinator_impl.h index 83c80b73bb7..16223c17e86 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.h +++ b/src/mongo/db/repl/replication_coordinator_impl.h @@ -91,21 +91,21 @@ namespace repl { // ================== Members of public ReplicationCoordinator API =================== - virtual void startReplication(OperationContext* txn); + virtual void startReplication(OperationContext* txn) override; - virtual void shutdown(); + virtual void shutdown() override; - virtual const ReplSettings& getSettings() const; + virtual const ReplSettings& getSettings() const override; - virtual Mode getReplicationMode() const; + virtual Mode getReplicationMode() const override; - virtual MemberState getMemberState() const; + virtual MemberState getMemberState() const override; - virtual bool isInPrimaryOrSecondaryState() const; + virtual bool isInPrimaryOrSecondaryState() const override; - virtual Seconds getSlaveDelaySecs() const; + virtual Seconds getSlaveDelaySecs() const override; - virtual void clearSyncSourceBlacklist(); + virtual void clearSyncSourceBlacklist() override; /* * Implementation of the KillOpListenerInterface interrupt method so that we can wake up @@ -156,107 +156,112 @@ namespace repl { virtual Timestamp getMyLastOptime() const; - virtual OpTime getMyLastOptimeV1() const; + virtual OpTime getMyLastOptimeV1() const override; - virtual OID getElectionId(); + virtual ReadAfterOpTimeResponse waitUntilOpTime( + const OperationContext* txn, + const ReadAfterOpTimeArgs& settings) override; + + virtual OID getElectionId() override; - virtual OID getMyRID() const; + virtual OID getMyRID() const override; - virtual int getMyId() const; + virtual int getMyId() const override; - virtual bool setFollowerMode(const MemberState& newState); + virtual bool setFollowerMode(const MemberState& newState) override; - virtual bool isWaitingForApplierToDrain(); + virtual bool isWaitingForApplierToDrain() override; - virtual void signalDrainComplete(OperationContext* txn); + virtual void signalDrainComplete(OperationContext* txn) override; - virtual void signalUpstreamUpdater(); + virtual void signalUpstreamUpdater() override; - virtual bool prepareReplSetUpdatePositionCommand(BSONObjBuilder* cmdBuilder); + virtual bool prepareReplSetUpdatePositionCommand(BSONObjBuilder* cmdBuilder) override; - virtual Status processReplSetGetStatus(BSONObjBuilder* result); + virtual Status processReplSetGetStatus(BSONObjBuilder* result) override; - virtual void fillIsMasterForReplSet(IsMasterResponse* result); + virtual void fillIsMasterForReplSet(IsMasterResponse* result) override; - virtual void appendSlaveInfoData(BSONObjBuilder* result); + virtual void appendSlaveInfoData(BSONObjBuilder* result) override; - virtual ReplicaSetConfig getConfig() const; + virtual ReplicaSetConfig getConfig() const override; - virtual void processReplSetGetConfig(BSONObjBuilder* result); + virtual void processReplSetGetConfig(BSONObjBuilder* result) override; - virtual Status setMaintenanceMode(bool activate); + virtual Status setMaintenanceMode(bool activate) override; - virtual bool getMaintenanceMode(); + virtual bool getMaintenanceMode() override; virtual Status processReplSetSyncFrom(const HostAndPort& target, - BSONObjBuilder* resultObj); + BSONObjBuilder* resultObj) override; - virtual Status processReplSetFreeze(int secs, BSONObjBuilder* resultObj); + virtual Status processReplSetFreeze(int secs, BSONObjBuilder* resultObj) override; virtual Status processHeartbeat(const ReplSetHeartbeatArgs& args, - ReplSetHeartbeatResponse* response); + ReplSetHeartbeatResponse* response) override; virtual Status processReplSetReconfig(OperationContext* txn, const ReplSetReconfigArgs& args, - BSONObjBuilder* resultObj); + BSONObjBuilder* resultObj) override; virtual Status processReplSetInitiate(OperationContext* txn, const BSONObj& configObj, - BSONObjBuilder* resultObj); + BSONObjBuilder* resultObj) override; - virtual Status processReplSetGetRBID(BSONObjBuilder* resultObj); + virtual Status processReplSetGetRBID(BSONObjBuilder* resultObj) override; - virtual void incrementRollbackID(); + virtual void incrementRollbackID() override; virtual Status processReplSetFresh(const ReplSetFreshArgs& args, - BSONObjBuilder* resultObj); + BSONObjBuilder* resultObj) override; virtual Status processReplSetElect(const ReplSetElectArgs& args, - BSONObjBuilder* response); + BSONObjBuilder* response) override; virtual Status processReplSetUpdatePosition(const UpdatePositionArgs& updates, - long long* configVersion); + long long* configVersion) override; - virtual Status processHandshake(OperationContext* txn, const HandshakeArgs& handshake); + virtual Status processHandshake(OperationContext* txn, + const HandshakeArgs& handshake) override; - virtual bool buildsIndexes(); + virtual bool buildsIndexes() override; - virtual std::vector<HostAndPort> getHostsWrittenTo(const Timestamp& op); + virtual std::vector<HostAndPort> getHostsWrittenTo(const Timestamp& op) override; - virtual std::vector<HostAndPort> getOtherNodesInReplSet() const; + virtual std::vector<HostAndPort> getOtherNodesInReplSet() const override; - virtual WriteConcernOptions getGetLastErrorDefault(); + virtual WriteConcernOptions getGetLastErrorDefault() override; - virtual Status checkReplEnabledForCommand(BSONObjBuilder* result); + virtual Status checkReplEnabledForCommand(BSONObjBuilder* result) override; - virtual bool isReplEnabled() const; + virtual bool isReplEnabled() const override; - virtual HostAndPort chooseNewSyncSource(); + virtual HostAndPort chooseNewSyncSource() override; - virtual void blacklistSyncSource(const HostAndPort& host, Date_t until); + virtual void blacklistSyncSource(const HostAndPort& host, Date_t until) override; - virtual void resetLastOpTimeFromOplog(OperationContext* txn); + virtual void resetLastOpTimeFromOplog(OperationContext* txn) override; - virtual bool shouldChangeSyncSource(const HostAndPort& currentSource); + virtual bool shouldChangeSyncSource(const HostAndPort& currentSource) override; - virtual Timestamp getLastCommittedOpTime() const; + virtual Timestamp getLastCommittedOpTime() const override; virtual Status processReplSetRequestVotes(OperationContext* txn, const ReplSetRequestVotesArgs& args, - ReplSetRequestVotesResponse* response); + ReplSetRequestVotesResponse* response) override; virtual Status processReplSetDeclareElectionWinner( const ReplSetDeclareElectionWinnerArgs& args, - long long* responseTerm); + long long* responseTerm) override; virtual void prepareCursorResponseInfo(BSONObjBuilder* objBuilder); virtual Status processHeartbeatV1(const ReplSetHeartbeatArgsV1& args, - ReplSetHeartbeatResponseV1* response); + ReplSetHeartbeatResponseV1* response) override; - virtual bool isV1ElectionProtocol(); + virtual bool isV1ElectionProtocol() override; - virtual void summarizeAsHtml(ReplSetHtmlSummary* s); + virtual void summarizeAsHtml(ReplSetHtmlSummary* s) override; // ================== Test support API =================== @@ -880,6 +885,10 @@ namespace repl { // WaiterInfos. std::vector<WaiterInfo*> _replicationWaiterList; // (M) + // list of information about clients waiting for a particular opTime. + // Does *not* own the WaiterInfos. + std::vector<WaiterInfo*> _opTimeWaiterList; // (M) + // Set to true when we are in the process of shutting down replication. bool _inShutdown; // (M) diff --git a/src/mongo/db/repl/replication_coordinator_impl_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_test.cpp index f486fed8694..b33b87b1a83 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_test.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_test.cpp @@ -32,6 +32,7 @@ #include <boost/scoped_ptr.hpp> #include <boost/thread.hpp> +#include <future> #include <iostream> #include <memory> #include <set> @@ -42,6 +43,9 @@ #include "mongo/db/repl/is_master_response.h" #include "mongo/db/repl/network_interface_mock.h" #include "mongo/db/repl/operation_context_repl_mock.h" +#include "mongo/db/repl/optime.h" +#include "mongo/db/repl/read_after_optime_args.h" +#include "mongo/db/repl/read_after_optime_response.h" #include "mongo/db/repl/repl_set_heartbeat_args.h" #include "mongo/db/repl/repl_settings.h" #include "mongo/db/repl/replica_set_config.h" @@ -57,12 +61,15 @@ #include "mongo/unittest/unittest.h" #include "mongo/util/assert_util.h" #include "mongo/util/log.h" +#include "mongo/util/time_support.h" +#include "mongo/util/timer.h" namespace mongo { namespace repl { namespace { typedef ReplicationCoordinator::ReplSetReconfigArgs ReplSetReconfigArgs; + Status kInterruptedStatus(ErrorCodes::Interrupted, "operation was interrupted"); TEST_F(ReplCoordTest, StartupWithValidLocalConfig) { assertStartSuccess( @@ -324,7 +331,7 @@ namespace { ASSERT_EQUALS(MemberState::RS_STARTUP, getReplCoord()->getMemberState().s); BSONObjBuilder result1; - getExternalState()->setStoreLocalConfigDocumentStatus(Status(ErrorCodes::OutOfDiskSpace, + getExternalState()->setStoreLocalConfigDocumentStatus(Status(ErrorCodes::OutOfDiskSpace, "The test set this")); ASSERT_EQUALS( ErrorCodes::OutOfDiskSpace, @@ -840,50 +847,9 @@ namespace { awaiter.reset(); } - class OperationContextNoopWithInterrupt : public OperationContextReplMock { - public: - - OperationContextNoopWithInterrupt() : _opID(0), _interruptOp(false) {} - - virtual unsigned int getOpID() const { - return _opID; - } - - /** - * Can only be called before any multi-threaded access to this object has begun. - */ - void setOpID(unsigned int opID) { - _opID = opID; - } - - virtual void checkForInterrupt() const { - if (_interruptOp) { - uasserted(ErrorCodes::Interrupted, "operation was interrupted"); - } - } - - virtual Status checkForInterruptNoAssert() const { - if (_interruptOp) { - return Status(ErrorCodes::Interrupted, "operation was interrupted"); - } - return Status::OK(); - } - - /** - * Can only be called before any multi-threaded access to this object has begun. - */ - void setInterruptOp(bool interrupt) { - _interruptOp = interrupt; - } - - private: - unsigned int _opID; - bool _interruptOp; - }; - TEST_F(ReplCoordTest, AwaitReplicationInterrupt) { // Tests that a thread blocked in awaitReplication can be killed by a killOp operation - OperationContextNoopWithInterrupt txn; + OperationContextReplMock txn; assertStartSuccess( BSON("_id" << "mySet" << "version" << 2 << @@ -914,7 +880,7 @@ namespace { ASSERT_OK(getReplCoord()->setLastOptime_forTest(2, 1, time1)); ASSERT_OK(getReplCoord()->setLastOptime_forTest(2, 2, time1)); - txn.setInterruptOp(true); + txn.setCheckForInterruptStatus(kInterruptedStatus); getReplCoord()->interrupt(opID); ReplicationCoordinator::StatusAndDuration statusAndDur = awaiter.getResult(); ASSERT_EQUALS(ErrorCodes::Interrupted, statusAndDur.status); @@ -1202,7 +1168,7 @@ namespace { } TEST_F(StepDownTest, InterruptStepDown) { - OperationContextNoopWithInterrupt txn; + OperationContextReplMock txn; Timestamp optime1(100, 1); Timestamp optime2(100, 2); // No secondary is caught up @@ -1223,7 +1189,7 @@ namespace { unsigned int opID = 100; txn.setOpID(opID); - txn.setInterruptOp(true); + txn.setCheckForInterruptStatus(kInterruptedStatus); getReplCoord()->interrupt(opID); ASSERT_EQUALS(ErrorCodes::Interrupted, runner.getResult()); @@ -1558,7 +1524,7 @@ namespace { IsMasterResponse roundTripped; ASSERT_OK(roundTripped.initialize(response.toBSON())); } - + TEST_F(ReplCoordTest, ShutDownBeforeStartUpFinished) { init(); startCapturingLogMessages(); @@ -1945,6 +1911,172 @@ namespace { ASSERT_EQUALS(newTime, getReplCoord()->getLastCommittedOpTime()); } + TEST_F(ReplCoordTest, CantUseReadAfterIfNotReplSet) { + init(ReplSettings()); + OperationContextNoop txn; + auto result = getReplCoord()->waitUntilOpTime(&txn, + ReadAfterOpTimeArgs(OpTime(Timestamp(50, 0), 1), + Milliseconds(0))); + + ASSERT_FALSE(result.didWait()); + ASSERT_EQUALS(ErrorCodes::NotAReplicaSet, result.getStatus()); + } + + TEST_F(ReplCoordTest, ReadAfterWhileShutdown) { + OperationContextNoop txn; + assertStartSuccess( + BSON("_id" << "mySet" << + "version" << 2 << + "members" << BSON_ARRAY(BSON("host" << "node1:12345" << "_id" << 0))), + HostAndPort("node1", 12345)); + + getReplCoord()->setMyLastOptime(Timestamp(10, 0)); + + shutdown(); + + auto result = getReplCoord()->waitUntilOpTime(&txn, + ReadAfterOpTimeArgs(OpTime(Timestamp(50, 0), 1), + Milliseconds(0))); + + ASSERT_TRUE(result.didWait()); + ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, result.getStatus()); + } + + TEST_F(ReplCoordTest, ReadAfterInterrupted) { + OperationContextReplMock txn; + assertStartSuccess( + BSON("_id" << "mySet" << + "version" << 2 << + "members" << BSON_ARRAY(BSON("host" << "node1:12345" << "_id" << 0))), + HostAndPort("node1", 12345)); + + getReplCoord()->setMyLastOptime(Timestamp(10, 0)); + + txn.setCheckForInterruptStatus(Status(ErrorCodes::Interrupted, "test")); + + auto result = getReplCoord()->waitUntilOpTime(&txn, + ReadAfterOpTimeArgs(OpTime(Timestamp(50, 0), 1), + Milliseconds(0))); + + ASSERT_TRUE(result.didWait()); + ASSERT_EQUALS(ErrorCodes::Interrupted, result.getStatus()); + } + + TEST_F(ReplCoordTest, ReadAfterNoOpTime) { + OperationContextNoop txn; + assertStartSuccess( + BSON("_id" << "mySet" << + "version" << 2 << + "members" << BSON_ARRAY(BSON("host" << "node1:12345" << "_id" << 0))), + HostAndPort("node1", 12345)); + + auto result = getReplCoord()->waitUntilOpTime(&txn, ReadAfterOpTimeArgs()); + + ASSERT_FALSE(result.didWait()); + ASSERT_OK(result.getStatus()); + } + + TEST_F(ReplCoordTest, ReadAfterGreaterOpTime) { + OperationContextNoop txn; + assertStartSuccess( + BSON("_id" << "mySet" << + "version" << 2 << + "members" << BSON_ARRAY(BSON("host" << "node1:12345" << "_id" << 0))), + HostAndPort("node1", 12345)); + + getReplCoord()->setMyLastOptime(Timestamp(100, 0)); + auto result = getReplCoord()->waitUntilOpTime(&txn, + ReadAfterOpTimeArgs(OpTime(Timestamp(50, 0), 1), + Milliseconds(100))); + + ASSERT_TRUE(result.didWait()); + ASSERT_OK(result.getStatus()); + } + + TEST_F(ReplCoordTest, ReadAfterEqualOpTime) { + OperationContextNoop txn; + assertStartSuccess( + BSON("_id" << "mySet" << + "version" << 2 << + "members" << BSON_ARRAY(BSON("host" << "node1:12345" << "_id" << 0))), + HostAndPort("node1", 12345)); + + + OpTime time(Timestamp(100, 0), 1); + getReplCoord()->setMyLastOptime(time.getTimestamp()); + auto result = getReplCoord()->waitUntilOpTime(&txn, + ReadAfterOpTimeArgs(time, Milliseconds(100))); + + ASSERT_TRUE(result.didWait()); + ASSERT_OK(result.getStatus()); + } + + TEST_F(ReplCoordTest, ReadAfterDeferredGreaterOpTime) { + OperationContextNoop txn; + assertStartSuccess( + BSON("_id" << "mySet" << + "version" << 2 << + "members" << BSON_ARRAY(BSON("host" << "node1:12345" << "_id" << 0))), + HostAndPort("node1", 12345)); + + getReplCoord()->setMyLastOptime(Timestamp(0, 0)); + + auto pseudoLogOp = std::async(std::launch::async, [this]() { + // Not guaranteed to be scheduled after waitUnitl blocks... + getReplCoord()->setMyLastOptime(Timestamp(200, 0)); + }); + + auto result = getReplCoord()->waitUntilOpTime(&txn, + ReadAfterOpTimeArgs(OpTime(Timestamp(100, 0), 1), + Milliseconds(0))); + pseudoLogOp.get(); + + ASSERT_TRUE(result.didWait()); + ASSERT_OK(result.getStatus()); + } + + TEST_F(ReplCoordTest, ReadAfterDeferredEqualOpTime) { + OperationContextNoop txn; + assertStartSuccess( + BSON("_id" << "mySet" << + "version" << 2 << + "members" << BSON_ARRAY(BSON("host" << "node1:12345" << "_id" << 0))), + HostAndPort("node1", 12345)); + + getReplCoord()->setMyLastOptime(Timestamp(0, 0)); + + repl::OpTime opTimeToWait(Timestamp(100, 0), 1); + + auto pseudoLogOp = std::async(std::launch::async, [this, &opTimeToWait]() { + // Not guaranteed to be scheduled after waitUnitl blocks... + getReplCoord()->setMyLastOptime(opTimeToWait.getTimestamp()); + }); + + auto result = getReplCoord()->waitUntilOpTime(&txn, + ReadAfterOpTimeArgs(opTimeToWait, Milliseconds(0))); + pseudoLogOp.get(); + + ASSERT_TRUE(result.didWait()); + ASSERT_OK(result.getStatus()); + } + + TEST_F(ReplCoordTest, ReadAfterOpTimeTimeoutNoMaxTimeMS) { + OperationContextNoop txn; + assertStartSuccess( + BSON("_id" << "mySet" << + "version" << 2 << + "members" << BSON_ARRAY(BSON("host" << "node1:12345" << "_id" << 0))), + HostAndPort("node1", 12345)); + + getReplCoord()->setMyLastOptime(Timestamp(100, 0)); + + auto result = getReplCoord()->waitUntilOpTime(&txn, + ReadAfterOpTimeArgs(OpTime(Timestamp(200, 0), 1), Milliseconds(10))); + + ASSERT_TRUE(result.didWait()); + ASSERT_EQUALS(ErrorCodes::ReadAfterOptimeTimeout, result.getStatus()); + } + // TODO(schwerin): Unit test election id updating } // namespace diff --git a/src/mongo/db/repl/replication_coordinator_mock.cpp b/src/mongo/db/repl/replication_coordinator_mock.cpp index 9d7f28e62af..ca9f32605ed 100644 --- a/src/mongo/db/repl/replication_coordinator_mock.cpp +++ b/src/mongo/db/repl/replication_coordinator_mock.cpp @@ -32,6 +32,8 @@ #include "mongo/base/status.h" #include "mongo/db/write_concern_options.h" +#include "mongo/db/repl/read_after_optime_args.h" +#include "mongo/db/repl/read_after_optime_response.h" #include "mongo/db/repl/replica_set_config.h" #include "mongo/db/repl/optime.h" #include "mongo/util/assert_util.h" @@ -145,6 +147,12 @@ namespace repl { return Timestamp(); } + ReadAfterOpTimeResponse ReplicationCoordinatorMock::waitUntilOpTime( + const OperationContext* txn, + const ReadAfterOpTimeArgs& settings) { + return ReadAfterOpTimeResponse(); + } + OID ReplicationCoordinatorMock::getElectionId() { // TODO diff --git a/src/mongo/db/repl/replication_coordinator_mock.h b/src/mongo/db/repl/replication_coordinator_mock.h index 79cf56507c8..d4f8bd33754 100644 --- a/src/mongo/db/repl/replication_coordinator_mock.h +++ b/src/mongo/db/repl/replication_coordinator_mock.h @@ -103,6 +103,10 @@ namespace repl { virtual Timestamp getMyLastOptime() const; + virtual ReadAfterOpTimeResponse waitUntilOpTime( + const OperationContext* txn, + const ReadAfterOpTimeArgs& settings) override; + virtual OID getElectionId(); virtual OID getMyRID() const; |