diff options
author | matt dannenberg <matt.dannenberg@10gen.com> | 2015-04-17 11:26:35 -0400 |
---|---|---|
committer | matt dannenberg <matt.dannenberg@10gen.com> | 2015-04-20 13:42:49 -0400 |
commit | 86841632812fa09ec74df84884773812c91b1f36 (patch) | |
tree | 995a7d92567ca7389ca74f178744b266782c1e26 | |
parent | c303b3739ba35fb3747143379b066ac1c3389cee (diff) | |
download | mongo-86841632812fa09ec74df84884773812c91b1f36.tar.gz |
SERVER-18110 create Reporter to replace SyncSourceFeedback
-rw-r--r-- | src/mongo/db/repl/SConscript | 24 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator.h | 3 | ||||
-rw-r--r-- | src/mongo/db/repl/reporter.cpp | 138 | ||||
-rw-r--r-- | src/mongo/db/repl/reporter.h | 118 | ||||
-rw-r--r-- | src/mongo/db/repl/reporter_test.cpp | 269 |
5 files changed, 551 insertions, 1 deletions
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript index ee47a02bb78..78672a5f87f 100644 --- a/src/mongo/db/repl/SConscript +++ b/src/mongo/db/repl/SConscript @@ -113,6 +113,7 @@ env.Library('repl_coordinator_impl', 'repl_coordinator_interface', 'replica_set_messages', 'replication_executor', + 'reporter', 'rslog', 'topology_coordinator', ]) @@ -182,6 +183,7 @@ env.Library('repl_coordinator_interface', 'replication_coordinator_external_state.cpp'], LIBDEPS=[ '$BUILD_DIR/mongo/hostandport', + 'reporter', ]) env.Library('repl_coordinator_global', @@ -253,6 +255,19 @@ env.Library( ) env.Library( + target='reporter', + source=[ + 'reporter.cpp', + ], + LIBDEPS=[ + 'replication_executor', + '$BUILD_DIR/mongo/logger/logger', + '$BUILD_DIR/mongo/namespace_string', + '$BUILD_DIR/mongo/util/net/command_status', + ], +) + +env.Library( target='fetcher', source=[ 'fetcher.cpp', @@ -266,6 +281,15 @@ env.Library( ) env.CppUnitTest( + target='reporter_test', + source='reporter_test.cpp', + LIBDEPS=[ + 'reporter', + 'replication_executor_test_fixture', + ], +) + +env.CppUnitTest( target='fetcher_test', source='fetcher_test.cpp', LIBDEPS=[ diff --git a/src/mongo/db/repl/replication_coordinator.h b/src/mongo/db/repl/replication_coordinator.h index 6b86010122a..8748f764b02 100644 --- a/src/mongo/db/repl/replication_coordinator.h +++ b/src/mongo/db/repl/replication_coordinator.h @@ -35,6 +35,7 @@ #include "mongo/base/status.h" #include "mongo/db/repl/member_state.h" #include "mongo/db/repl/repl_settings.h" +#include "mongo/db/repl/reporter.h" #include "mongo/util/net/hostandport.h" namespace mongo { @@ -75,7 +76,7 @@ namespace repl { * with the rest of the system. The public methods on ReplicationCoordinator are the public * API that the replication subsystem presents to the rest of the codebase. 
*/ - class ReplicationCoordinator { + class ReplicationCoordinator : ReplicationProgressManager { MONGO_DISALLOW_COPYING(ReplicationCoordinator); public: diff --git a/src/mongo/db/repl/reporter.cpp b/src/mongo/db/repl/reporter.cpp new file mode 100644 index 00000000000..c4dd0daac14 --- /dev/null +++ b/src/mongo/db/repl/reporter.cpp @@ -0,0 +1,138 @@ +/** + * Copyright (C) 2015 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. 
+ */ + +#include "mongo/platform/basic.h" + +#include "mongo/db/repl/reporter.h" + +#include "mongo/db/repl/replication_executor.h" + +namespace mongo { +namespace repl { + + ReplicationProgressManager::~ReplicationProgressManager() {} + + Reporter::Reporter(ReplicationExecutor* executor, + ReplicationProgressManager* ReplicationProgressManager, + const HostAndPort& target) + : _executor(executor), + _updatePositionSource(ReplicationProgressManager), + _target(target), + _status(Status::OK()), + _willRunAgain(false), + _active(false) { + + uassert(ErrorCodes::BadValue, "null replication executor", executor); + uassert(ErrorCodes::BadValue, + "null replication progress manager", + ReplicationProgressManager); + uassert(ErrorCodes::BadValue, "target name cannot be empty", !target.empty()); + } + + Reporter::~Reporter() { + cancel(); + } + + void Reporter::cancel() { + boost::lock_guard<boost::mutex> lk(_mutex); + + if (!_active) { + return; + } + + _status = Status(ErrorCodes::CallbackCanceled, "Reporter no longer valid"); + _active = false; + _willRunAgain = false; + invariant(_remoteCommandCallbackHandle.isValid()); + _executor->cancel(_remoteCommandCallbackHandle); + } + + Status Reporter::trigger() { + boost::lock_guard<boost::mutex> lk(_mutex); + return _schedule_inlock(); + } + + Status Reporter::_schedule_inlock() { + if (!_status.isOK()) { + return _status; + } + + if (_active) { + _willRunAgain = true; + return _status; + } + + _willRunAgain = false; + + BSONObjBuilder cmd; + _updatePositionSource->prepareReplSetUpdatePositionCommand(&cmd); + StatusWith<ReplicationExecutor::CallbackHandle> scheduleResult = + _executor->scheduleRemoteCommand( + ReplicationExecutor::RemoteCommandRequest(_target, "admin", cmd.obj()), + stdx::bind(&Reporter::_callback, this, stdx::placeholders::_1)); + + if (!scheduleResult.isOK()) { + _status = scheduleResult.getStatus(); + return _status; + } + + _active = true; + _remoteCommandCallbackHandle = scheduleResult.getValue(); + 
return Status::OK(); + } + + void Reporter::_callback(const ReplicationExecutor::RemoteCommandCallbackData& rcbd) { + boost::lock_guard<boost::mutex> lk(_mutex); + + _status = rcbd.response.getStatus(); + _active = false; + + if (_status.isOK() && _willRunAgain) { + _schedule_inlock(); + } + else { + _willRunAgain = false; + } + } + + Status Reporter::previousReturnStatus() const { + boost::lock_guard<boost::mutex> lk(_mutex); + return _status; + } + + bool Reporter::isActive() const { + boost::lock_guard<boost::mutex> lk(_mutex); + return _active; + } + + bool Reporter::willRunAgain() const { + boost::lock_guard<boost::mutex> lk(_mutex); + return _willRunAgain; + } +} // namespace repl +} // namespace mongo diff --git a/src/mongo/db/repl/reporter.h b/src/mongo/db/repl/reporter.h new file mode 100644 index 00000000000..c96a896cd88 --- /dev/null +++ b/src/mongo/db/repl/reporter.h @@ -0,0 +1,118 @@ +/** + * Copyright (C) 2015 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. 
You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include "mongo/base/status.h" +#include "mongo/db/repl/replication_executor.h" +#include "mongo/stdx/functional.h" + +namespace mongo { +namespace repl { + + class ReplicationProgressManager { + public: + virtual bool prepareReplSetUpdatePositionCommand(BSONObjBuilder* cmdBuilder) = 0; + virtual ~ReplicationProgressManager(); + }; + + class Reporter { + MONGO_DISALLOW_COPYING(Reporter); + + public: + Reporter(ReplicationExecutor* executor, + ReplicationProgressManager* ReplicationProgressManager, + const HostAndPort& target); + virtual ~Reporter(); + + /** + * Returns true if a remote command has been scheduled (but not completed) + * with the executor. + */ + bool isActive() const; + + /** + * Returns true if a remote command should be scheduled once the current one returns + * from the executor. + */ + bool willRunAgain() const; + + /** + * Cancels remote command request. + * Returns immediately if the Reporter is not active. + */ + void cancel(); + + /** + * Signals to the Reporter that there is new information to be sent to the "_target" server. + * Returns the _status, indicating any error the Reporter has encountered. + */ + Status trigger(); + + /** + * Returns the previous return status so that the owner can decide whether the Reporter + * needs a new target to whom it can report. 
+ */ + Status previousReturnStatus() const; + + private: + /** + * Schedules remote command to be run by the executor + */ + Status _schedule_inlock(); + + /** + * Callback for remote command. + */ + void _callback(const ReplicationExecutor::RemoteCommandCallbackData& rcbd); + + // Not owned by us. + ReplicationExecutor* _executor; + ReplicationProgressManager* _updatePositionSource; + + // Host to whom the Reporter sends updates. + HostAndPort _target; + + // Protects member data of this Reporter. + mutable boost::mutex _mutex; + + // Stores the most recent Status returned from the ReplicationExecutor. + Status _status; + + // _willRunAgain is true when Reporter is scheduled to be run by the executor and subsequent + // updates have come in. + bool _willRunAgain; + // _active is true when Reporter is scheduled to be run by the executor. + bool _active; + + // Callback handle to the scheduled remote command. + ReplicationExecutor::CallbackHandle _remoteCommandCallbackHandle; + }; + +} // namespace repl +} // namespace mongo diff --git a/src/mongo/db/repl/reporter_test.cpp b/src/mongo/db/repl/reporter_test.cpp new file mode 100644 index 00000000000..d0a9bc51638 --- /dev/null +++ b/src/mongo/db/repl/reporter_test.cpp @@ -0,0 +1,269 @@ +/** + * Copyright 2015 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. 
+ * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/db/repl/network_interface_mock.h" +#include "mongo/db/repl/replication_executor_test_fixture.h" +#include "mongo/db/repl/reporter.h" + +#include "mongo/unittest/unittest.h" + +namespace { + + using namespace mongo; + using namespace mongo::repl; + + class MockProgressManager : public ReplicationProgressManager { + public: + void updateMap(int memberId, const Timestamp& ts) { + progressMap[memberId] = ts; + } + + bool prepareReplSetUpdatePositionCommand(BSONObjBuilder* cmdBuilder) { + cmdBuilder->append("replSetUpdatePosition", 1); + BSONArrayBuilder arrayBuilder(cmdBuilder->subarrayStart("optimes")); + for (auto itr = progressMap.begin(); itr != progressMap.end(); ++itr) { + BSONObjBuilder entry(arrayBuilder.subobjStart()); + entry.append("optime", itr->second); + entry.append("memberId", itr->first); + entry.append("cfgver", 1); + } + return true; + } + private: + std::map<int, Timestamp> progressMap; + }; + + class ReporterTest : public ReplicationExecutorTest { + public: + static Status getDefaultStatus(); + ReporterTest(); + void setUp() override; + void tearDown() override; + void scheduleNetworkResponse(const 
BSONObj& obj); + void scheduleNetworkResponse(ErrorCodes::Error code, const std::string& reason); + void finishProcessingNetworkResponse(); + + protected: + std::unique_ptr<Reporter> reporter; + std::unique_ptr<MockProgressManager> posUpdater; + + }; + + ReporterTest::ReporterTest() {} + + Status ReporterTest::getDefaultStatus() { + return Status(ErrorCodes::InternalError, "Not mutated"); + } + + void ReporterTest::setUp() { + ReplicationExecutorTest::setUp(); + posUpdater.reset(new MockProgressManager()); + reporter.reset(new Reporter(&getExecutor(), posUpdater.get(), HostAndPort("h1"))); + launchExecutorThread(); + } + + void ReporterTest::tearDown() { + ReplicationExecutorTest::tearDown(); + // Executor may still invoke reporter's callback before shutting down. + posUpdater.reset(); + reporter.reset(); + } + + void ReporterTest::scheduleNetworkResponse(const BSONObj& obj) { + NetworkInterfaceMock* net = getNet(); + ASSERT_TRUE(net->hasReadyRequests()); + ReplicationExecutor::Milliseconds millis(0); + ReplicationExecutor::RemoteCommandResponse response(obj, millis); + ReplicationExecutor::ResponseStatus responseStatus(response); + net->scheduleResponse(net->getNextReadyRequest(), net->now(), responseStatus); + } + + void ReporterTest::scheduleNetworkResponse(ErrorCodes::Error code, const std::string& reason) { + NetworkInterfaceMock* net = getNet(); + ASSERT_TRUE(net->hasReadyRequests()); + ReplicationExecutor::ResponseStatus responseStatus(code, reason); + net->scheduleResponse(net->getNextReadyRequest(), net->now(), responseStatus); + } + + TEST_F(ReporterTest, InvalidConstruction) { + // null ReplicationProgressManager + ASSERT_THROWS(Reporter(&getExecutor(), nullptr, HostAndPort("h1")), UserException); + + // null ReplicationExecutor + ASSERT_THROWS(Reporter(nullptr, posUpdater.get(), HostAndPort("h1")), UserException); + + // empty HostAndPort + ASSERT_THROWS(Reporter(&getExecutor(), posUpdater.get(), HostAndPort()), UserException); + } + + 
TEST_F(ReporterTest, IsActiveOnceScheduled) { + ASSERT_FALSE(reporter->isActive()); + ASSERT_OK(reporter->trigger()); + ASSERT_TRUE(reporter->isActive()); + } + + TEST_F(ReporterTest, CancelWithoutScheduled) { + ASSERT_FALSE(reporter->isActive()); + reporter->cancel(); + ASSERT_FALSE(reporter->isActive()); + } + + TEST_F(ReporterTest, ShutdownBeforeSchedule) { + getExecutor().shutdown(); + ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, reporter->trigger()); + ASSERT_FALSE(reporter->isActive()); + } + + // If an error is returned, it should be recorded in the Reporter and be returned when triggered + TEST_F(ReporterTest, ErrorsAreStoredInTheReporter) { + posUpdater->updateMap(0, Timestamp(3,0)); + ASSERT_OK(reporter->trigger()); + ASSERT_TRUE(reporter->isActive()); + scheduleNetworkResponse(ErrorCodes::NoSuchKey, "waaaah"); + getNet()->runReadyNetworkOperations(); + ASSERT_EQUALS(ErrorCodes::NoSuchKey, reporter->previousReturnStatus()); + ASSERT_EQUALS(ErrorCodes::NoSuchKey, reporter->trigger()); + ASSERT_FALSE(reporter->isActive()); + ASSERT_FALSE(getNet()->hasReadyRequests()); + } + + // If an error is returned, it should be recorded in the Reporter and not run again. + TEST_F(ReporterTest, ErrorsStopTheReporter) { + posUpdater->updateMap(0, Timestamp(3,0)); + ASSERT_OK(reporter->trigger()); + ASSERT_TRUE(reporter->isActive()); + ASSERT_FALSE(reporter->willRunAgain()); + ASSERT_OK(reporter->trigger()); + ASSERT_TRUE(reporter->isActive()); + ASSERT_TRUE(reporter->willRunAgain()); + + scheduleNetworkResponse(ErrorCodes::NoSuchKey, "waaaah"); + getNet()->runReadyNetworkOperations(); + ASSERT_EQUALS(ErrorCodes::NoSuchKey, reporter->previousReturnStatus()); + ASSERT_FALSE(reporter->willRunAgain()); + ASSERT_FALSE(reporter->isActive()); + ASSERT_FALSE(getNet()->hasReadyRequests()); + } + + // Schedule while we are already scheduled, it should set willRunAgain, then automatically + // schedule itself after finishing. 
+ TEST_F(ReporterTest, DoubleScheduleShouldCauseRescheduleImmediatelyAfterRespondedTo) { + posUpdater->updateMap(0, Timestamp(3,0)); + ASSERT_OK(reporter->trigger()); + ASSERT_TRUE(reporter->isActive()); + ASSERT_FALSE(reporter->willRunAgain()); + ASSERT_OK(reporter->trigger()); + ASSERT_TRUE(reporter->isActive()); + ASSERT_TRUE(reporter->willRunAgain()); + + scheduleNetworkResponse(BSON("ok" << 1)); + getNet()->runReadyNetworkOperations(); + ASSERT_TRUE(getNet()->hasReadyRequests()); + ASSERT_TRUE(reporter->isActive()); + ASSERT_FALSE(reporter->willRunAgain()); + + scheduleNetworkResponse(BSON("ok" << 1)); + getNet()->runReadyNetworkOperations(); + ASSERT_FALSE(getNet()->hasReadyRequests()); + ASSERT_FALSE(reporter->isActive()); + ASSERT_FALSE(reporter->willRunAgain()); + } + + // Schedule multiple times while we are already scheduled, it should set willRunAgain, + // then automatically schedule itself after finishing, but not a third time since the latter two + // will contain the same batch of updates. 
+ TEST_F(ReporterTest, TripleScheduleShouldCauseRescheduleImmediatelyAfterRespondedToOnlyOnce) { + posUpdater->updateMap(0, Timestamp(3,0)); + ASSERT_OK(reporter->trigger()); + ASSERT_TRUE(reporter->isActive()); + ASSERT_FALSE(reporter->willRunAgain()); + ASSERT_OK(reporter->trigger()); + ASSERT_TRUE(reporter->isActive()); + ASSERT_TRUE(reporter->willRunAgain()); + ASSERT_OK(reporter->trigger()); + ASSERT_TRUE(reporter->isActive()); + ASSERT_TRUE(reporter->willRunAgain()); + + scheduleNetworkResponse(BSON("ok" << 1)); + getNet()->runReadyNetworkOperations(); + ASSERT_TRUE(getNet()->hasReadyRequests()); + ASSERT_TRUE(reporter->isActive()); + ASSERT_FALSE(reporter->willRunAgain()); + + scheduleNetworkResponse(BSON("ok" << 1)); + getNet()->runReadyNetworkOperations(); + ASSERT_FALSE(getNet()->hasReadyRequests()); + ASSERT_FALSE(reporter->isActive()); + ASSERT_FALSE(reporter->willRunAgain()); + } + + TEST_F(ReporterTest, CancelWhileScheduled) { + posUpdater->updateMap(0, Timestamp(3,0)); + ASSERT_OK(reporter->trigger()); + ASSERT_TRUE(reporter->isActive()); + ASSERT_FALSE(reporter->willRunAgain()); + ASSERT_OK(reporter->trigger()); + ASSERT_TRUE(reporter->isActive()); + ASSERT_TRUE(reporter->willRunAgain()); + + reporter->cancel(); + ASSERT_FALSE(reporter->isActive()); + ASSERT_FALSE(reporter->willRunAgain()); + ASSERT_FALSE(getNet()->hasReadyRequests()); + ASSERT_EQUALS(ErrorCodes::CallbackCanceled, reporter->previousReturnStatus()); + + ASSERT_EQUALS(ErrorCodes::CallbackCanceled, reporter->trigger()); + } + + TEST_F(ReporterTest, CancelAfterFirstReturns) { + posUpdater->updateMap(0, Timestamp(3,0)); + ASSERT_OK(reporter->trigger()); + ASSERT_TRUE(reporter->isActive()); + ASSERT_FALSE(reporter->willRunAgain()); + ASSERT_OK(reporter->trigger()); + ASSERT_TRUE(reporter->isActive()); + ASSERT_TRUE(reporter->willRunAgain()); + + scheduleNetworkResponse(BSON("ok" << 1)); + getNet()->runReadyNetworkOperations(); + ASSERT_TRUE(getNet()->hasReadyRequests()); + 
ASSERT_TRUE(reporter->isActive()); + ASSERT_FALSE(reporter->willRunAgain()); + + reporter->cancel(); + ASSERT_FALSE(reporter->isActive()); + ASSERT_FALSE(reporter->willRunAgain()); + ASSERT_FALSE(getNet()->hasReadyRequests()); + ASSERT_EQUALS(ErrorCodes::CallbackCanceled, reporter->previousReturnStatus()); + + ASSERT_EQUALS(ErrorCodes::CallbackCanceled, reporter->trigger()); + } + +} // namespace |