diff options
author | Misha Tyulenev <misha@mongodb.com> | 2016-05-26 16:09:19 -0400 |
---|---|---|
committer | Misha Tyulenev <misha@mongodb.com> | 2016-05-26 17:03:51 -0400 |
commit | b6221d5e2f3e95221d73947bf0fba6772b19e49b (patch) | |
tree | 908f09fd4d18e63ad07e5618efece41b2dc26351 /src/mongo/client | |
parent | cbb1f07f40d8487bfeb5dfce4ccfb5d461d9a2f6 (diff) | |
download | mongo-b6221d5e2f3e95221d73947bf0fba6772b19e49b.tar.gz |
SERVER-22564 Move ReplicaSetMonitor to TaskExecutor thread
Diffstat (limited to 'src/mongo/client')
-rw-r--r-- | src/mongo/client/SConscript | 14 | ||||
-rw-r--r-- | src/mongo/client/replica_set_monitor.cpp | 212 | ||||
-rw-r--r-- | src/mongo/client/replica_set_monitor.h | 23 | ||||
-rw-r--r-- | src/mongo/client/replica_set_monitor_manager.cpp | 40 | ||||
-rw-r--r-- | src/mongo/client/replica_set_monitor_manager.h | 9 |
5 files changed, 177 insertions, 121 deletions
diff --git a/src/mongo/client/SConscript b/src/mongo/client/SConscript index 3145929e5f9..1506ad0b680 100644 --- a/src/mongo/client/SConscript +++ b/src/mongo/client/SConscript @@ -209,13 +209,23 @@ env.CppUnitTest( ], LIBDEPS=[ '$BUILD_DIR/mongo/db/write_concern_options', - 'clientdriver' + 'clientdriver', + '$BUILD_DIR/mongo/executor/thread_pool_task_executor', + '$BUILD_DIR/mongo/executor/network_interface_thread_pool', + '$BUILD_DIR/mongo/executor/network_interface_factory' ] ) env.CppUnitTest('dbclient_rs_test', ['dbclient_rs_test.cpp'], - LIBDEPS=['clientdriver', '$BUILD_DIR/mongo/dbtests/mocklib']) + LIBDEPS=[ + 'clientdriver', + '$BUILD_DIR/mongo/dbtests/mocklib', + '$BUILD_DIR/mongo/executor/thread_pool_task_executor', + '$BUILD_DIR/mongo/executor/network_interface_thread_pool', + '$BUILD_DIR/mongo/executor/network_interface_factory' + ] +) env.CppUnitTest( target='scoped_db_conn_test', diff --git a/src/mongo/client/replica_set_monitor.cpp b/src/mongo/client/replica_set_monitor.cpp index 970d2ca3818..250e09992d8 100644 --- a/src/mongo/client/replica_set_monitor.cpp +++ b/src/mongo/client/replica_set_monitor.cpp @@ -37,7 +37,9 @@ #include "mongo/client/connpool.h" #include "mongo/client/global_conn_pool.h" #include "mongo/client/replica_set_monitor_internal.h" +#include "mongo/db/operation_context.h" #include "mongo/db/server_options.h" +#include "mongo/s/grid.h" #include "mongo/stdx/condition_variable.h" #include "mongo/stdx/mutex.h" #include "mongo/stdx/thread.h" @@ -70,6 +72,9 @@ typedef ReplicaSetMonitor::Refresher Refresher; typedef ScanState::UnconfirmedReplies UnconfirmedReplies; typedef SetState::Node Node; typedef SetState::Nodes Nodes; +using executor::TaskExecutor; +using CallbackArgs = TaskExecutor::CallbackArgs; +using CallbackHandle = TaskExecutor::CallbackHandle; const double socketTimeoutSecs = 5; @@ -83,113 +88,6 @@ const Milliseconds kFindHostMaxBackOffTime(500); ReplicaSetMonitor::ConfigChangeHook asyncConfigChangeHook; ReplicaSetMonitor::ConfigChangeHook syncConfigChangeHook; -// global background job responsible for checking every X amount of time -class ReplicaSetMonitorWatcher : public BackgroundJob { -public: - ReplicaSetMonitorWatcher() : _started(false), _stopRequested(false) {} - - ~ReplicaSetMonitorWatcher() { - stop(); - - // We relying on the fact that if the monitor was rerun again, wait will not hang - // because _destroyingStatics will make the run method exit immediately. - dassert(StaticObserver::_destroyingStatics); - if (running()) { - wait(); - } - } - - virtual string name() const { - return "ReplicaSetMonitorWatcher"; - } - - void safeGo() { - stdx::lock_guard<stdx::mutex> lk(_monitorMutex); - if (_started) - return; - - _started = true; - _stopRequested = false; - - go(); - } - - /** - * Stops monitoring the sets and wait for the monitoring thread to terminate. - */ - void stop() { - stdx::lock_guard<stdx::mutex> sl(_monitorMutex); - _stopRequested = true; - _stopRequestedCV.notify_one(); - } - -protected: - void run() { - log() << "starting"; // includes thread name in output - - // Added only for patching timing problems in test. Remove after tests - // are fixed - see 392b933598668768bf12b1e41ad444aa3548d970. - // Should not be needed after SERVER-7533 gets implemented and tests start - // using it. - if (!inShutdown() && !StaticObserver::_destroyingStatics) { - stdx::unique_lock<stdx::mutex> sl(_monitorMutex); - _stopRequestedCV.wait_for(sl, Seconds(10).toSystemDuration()); - } - - while (!inShutdown() && !StaticObserver::_destroyingStatics) { - { - stdx::lock_guard<stdx::mutex> sl(_monitorMutex); - if (_stopRequested) { - break; - } - } - - try { - checkAllSets(); - } catch (const std::exception& e) { - error() << "check all sets failed: " << e.what(); - } catch (...) { - error() << "unknown error"; - } - - stdx::unique_lock<stdx::mutex> sl(_monitorMutex); - if (_stopRequested) { - break; - } - - _stopRequestedCV.wait_for(sl, Seconds(10).toSystemDuration()); - } - } - - void checkAllSets() { - for (const string& setName : globalRSMonitorManager.getAllSetNames()) { - shared_ptr<ReplicaSetMonitor> m = globalRSMonitorManager.getMonitor(setName); - if (!m) { - continue; - } - - Timer t; - m->startOrContinueRefresh().refreshAll(); - LOG(1) << "Refreshing replica set " << setName << " took " << t.millis() << " msec"; - - if (!m->isSetUsable()) { - log() << "Stopping periodic monitoring of set " << m->getName() - << " because none of the hosts could be contacted for an extended period of " - "time."; - - ReplicaSetMonitor::remove(m->getName()); - } - } - } - - // protects _started, _stopRequested - stdx::mutex _monitorMutex; - bool _started; - - stdx::condition_variable _stopRequestedCV; - bool _stopRequested; -} replicaSetMonitorWatcher; - StaticObserver staticObserver; // @@ -253,6 +151,11 @@ struct HostNotIn { } const std::set<HostAndPort>& _hosts; }; +/** + * Replica set refresh period on the task executor. + */ +const Seconds kRefreshPeriod(30); + } // namespace @@ -266,7 +169,94 @@ const Seconds ReplicaSetMonitor::kDefaultFindHostTimeout(15); bool ReplicaSetMonitor::useDeterministicHostSelection = false; ReplicaSetMonitor::ReplicaSetMonitor(StringData name, const std::set<HostAndPort>& seeds) - : _state(std::make_shared<SetState>(name, seeds)) {} + : _state(std::make_shared<SetState>(name, seeds)), + _executor(globalRSMonitorManager.getExecutor()) {} + +void ReplicaSetMonitor::init() { + stdx::lock_guard<stdx::mutex> lk(_mutex); + std::weak_ptr<ReplicaSetMonitor> that(shared_from_this()); + auto status = _executor->scheduleWork([=](const CallbackArgs& cbArgs) { + if (auto ptr = that.lock()) { + ptr->_refresh(cbArgs); + } + }); + + if (status.getStatus() == ErrorCodes::ShutdownInProgress) { + LOG(1) << "Couldn't schedule refresh for " << getName() + << ". Executor shutdown in progress"; + return; + } + + if (!status.isOK()) { + severe() << "Can't start refresh for replica set " << getName() + << causedBy(status.getStatus()); + fassertFailed(40139); + } + + _refresherHandle = status.getValue(); +} + +ReplicaSetMonitor::~ReplicaSetMonitor() { + // need this lock because otherwise can get race with scheduling in _refresh + stdx::lock_guard<stdx::mutex> lk(_mutex); + invariant(_executor); + + if (!_refresherHandle) { + return; + } + _executor->cancel(_refresherHandle); + // Note: calling _executor->wait(_refresherHandle); from the dispatcher thread will cause hang + // Its ok not to call it because the d-tor is called only when the last owning pointer goes out + // of scope, so as taskExecutor queue holds a weak pointer to RSM it will not be able to get a + // task to execute eliminating the need to call method "wait". + // + _refresherHandle = {}; +} + +void ReplicaSetMonitor::_refresh(const CallbackArgs& cbArgs) { + if (!cbArgs.status.isOK()) { + return; + } + + Timer t; + startOrContinueRefresh().refreshAll(); + LOG(1) << "Refreshing replica set " << getName() << " took " << t.millis() << " msec"; + + if (!isSetUsable()) { + log() << "Stopping periodic monitoring of set " << getName() + << " because none of the hosts could be contacted for an extended period of " + "time."; + + ReplicaSetMonitor::remove(getName()); + return; + } + + { + // reschedule itself + stdx::lock_guard<stdx::mutex> lk(_mutex); + std::weak_ptr<ReplicaSetMonitor> that(shared_from_this()); + auto status = _executor->scheduleWorkAt(_executor->now() + kRefreshPeriod, + [=](const CallbackArgs& cbArgs) { + if (auto ptr = that.lock()) { + ptr->_refresh(cbArgs); + } + }); + + if (status.getStatus() == ErrorCodes::ShutdownInProgress) { + LOG(1) << "Cant schedule refresh for " << getName() + << ". Executor shutdown in progress"; + return; + } + + if (!status.isOK()) { + severe() << "Can't continue refresh for replica set " << getName() << " due to " + << status.getStatus().toString(); + fassertFailed(40140); + } + + _refresherHandle = status.getValue(); + } +} StatusWith<HostAndPort> ReplicaSetMonitor::getHostOrRefresh(const ReadPreferenceSetting& criteria, Milliseconds maxWait) { @@ -391,8 +381,6 @@ bool ReplicaSetMonitor::contains(const HostAndPort& host) const { void ReplicaSetMonitor::createIfNeeded(const string& name, const set<HostAndPort>& servers) { globalRSMonitorManager.getOrCreateMonitor( ConnectionString::forReplicaSet(name, vector<HostAndPort>(servers.begin(), servers.end()))); - - replicaSetMonitorWatcher.safeGo(); } shared_ptr<ReplicaSetMonitor> ReplicaSetMonitor::get(const std::string& name) { @@ -452,10 +440,6 @@ void ReplicaSetMonitor::appendInfo(BSONObjBuilder& bsonObjBuilder) const { } void ReplicaSetMonitor::cleanup() { - // Call cancel first, in case the RSMW was never started. - replicaSetMonitorWatcher.cancel(); - replicaSetMonitorWatcher.stop(); - replicaSetMonitorWatcher.wait(); globalRSMonitorManager.removeAllMonitors(); asyncConfigChangeHook = ReplicaSetMonitor::ConfigChangeHook(); syncConfigChangeHook = ReplicaSetMonitor::ConfigChangeHook(); diff --git a/src/mongo/client/replica_set_monitor.h b/src/mongo/client/replica_set_monitor.h index 5847a6af6cd..fc415806b0f 100644 --- a/src/mongo/client/replica_set_monitor.h +++ b/src/mongo/client/replica_set_monitor.h @@ -31,9 +31,11 @@ #include <memory> #include <set> #include <string> +#include <memory> #include "mongo/base/disallow_copying.h" #include "mongo/base/string_data.h" +#include "mongo/executor/task_executor.h" #include "mongo/stdx/functional.h" #include "mongo/util/net/hostandport.h" #include "mongo/util/time_support.h" @@ -49,7 +51,7 @@ typedef std::shared_ptr<ReplicaSetMonitor> ReplicaSetMonitorPtr; * Holds state about a replica set and provides a means to refresh the local view. * All methods perform the required synchronization to allow callers from multiple threads. */ -class ReplicaSetMonitor { +class ReplicaSetMonitor : public std::enable_shared_from_this<ReplicaSetMonitor> { MONGO_DISALLOW_COPYING(ReplicaSetMonitor); public: @@ -66,6 +68,11 @@ public: ReplicaSetMonitor(StringData name, const std::set<HostAndPort>& seeds); /** + * Schedules the initial refresh task into task executor. + */ + void init(); + + /** * Returns a host matching the given read preference or an error, if no host matches. * * @param readPref Read preference to match against @@ -224,10 +231,11 @@ public: * Allows tests to set initial conditions and introspect the current state. */ explicit ReplicaSetMonitor(const SetStatePtr& initialState) : _state(initialState) {} + ~ReplicaSetMonitor(); /** * If a ReplicaSetMonitor has been refreshed more than this many times in a row without - * finding any live nodes claiming to be in the set, the ReplicaSetMonitorWatcher will stop + * finding any live nodes claiming to be in the set, the ReplicaSetMonitor will stop * periodic background refreshes of this set. */ static std::atomic<int> maxConsecutiveFailedChecks; // NOLINT @@ -248,7 +256,18 @@ public: static bool useDeterministicHostSelection; private: + /** + * A callback passed to a task executor to refresh the replica set. It reschedules itself until + * its canceled in d-tor. + */ + void _refresh(const executor::TaskExecutor::CallbackArgs&); + + // Serializes refresh and protects _refresherHandle + stdx::mutex _mutex; + executor::TaskExecutor::CallbackHandle _refresherHandle; + const SetStatePtr _state; + executor::TaskExecutor* _executor; }; diff --git a/src/mongo/client/replica_set_monitor_manager.cpp b/src/mongo/client/replica_set_monitor_manager.cpp index 1ef7ae0e254..2789d8c4df9 100644 --- a/src/mongo/client/replica_set_monitor_manager.cpp +++ b/src/mongo/client/replica_set_monitor_manager.cpp @@ -35,6 +35,12 @@ #include "mongo/bson/bsonobjbuilder.h" #include "mongo/client/connection_string.h" #include "mongo/client/replica_set_monitor.h" +#include "mongo/executor/network_interface_factory.h" +#include "mongo/executor/network_interface_thread_pool.h" +#include "mongo/executor/task_executor.h" +#include "mongo/executor/task_executor_pool.h" +#include "mongo/executor/thread_pool_task_executor.h" +#include "mongo/stdx/memory.h" #include "mongo/stdx/mutex.h" #include "mongo/util/log.h" #include "mongo/util/map_util.h" @@ -46,7 +52,13 @@ using std::set; using std::string; using std::vector; -ReplicaSetMonitorManager::ReplicaSetMonitorManager() = default; +using executor::NetworkInterface; +using executor::NetworkInterfaceThreadPool; +using executor::TaskExecutorPool; +using executor::TaskExecutor; +using executor::ThreadPoolTaskExecutor; + +ReplicaSetMonitorManager::ReplicaSetMonitorManager() {} ReplicaSetMonitorManager::~ReplicaSetMonitorManager() = default; @@ -61,6 +73,16 @@ shared_ptr<ReplicaSetMonitor> ReplicaSetMonitorManager::getOrCreateMonitor( invariant(connStr.type() == ConnectionString::SET); stdx::lock_guard<stdx::mutex> lk(_mutex); + if (!_taskExecutor) { + // construct task executor + auto net = executor::makeNetworkInterface("ReplicaSetMonitor-TaskExecutor"); + auto netPtr = net.get(); + _taskExecutor = stdx::make_unique<ThreadPoolTaskExecutor>( + stdx::make_unique<NetworkInterfaceThreadPool>(netPtr), std::move(net)); + LOG(1) << "Starting up task executor for monitoring replica sets in response to request to " + "monitor set: " << connStr.toString(); + _taskExecutor->startup(); + } shared_ptr<ReplicaSetMonitor>& monitor = _monitors[connStr.getSetName()]; if (!monitor) { @@ -70,6 +92,7 @@ shared_ptr<ReplicaSetMonitor> ReplicaSetMonitorManager::getOrCreateMonitor( log() << "Starting new replica set monitor for " << connStr.toString(); monitor = std::make_shared<ReplicaSetMonitor>(connStr.getSetName(), servers); + monitor->init(); } return monitor; @@ -98,9 +121,15 @@ void ReplicaSetMonitorManager::removeMonitor(StringData setName) { void ReplicaSetMonitorManager::removeAllMonitors() { stdx::lock_guard<stdx::mutex> lk(_mutex); - - // Reset the StringMap, which will release all registered monitors + // Reset the _monitors map, which will release all registered monitors _monitors = ReplicaSetMonitorsMap(); + + if (_taskExecutor) { + LOG(1) << "Shutting down task executor used for monitoring replica sets"; + _taskExecutor->shutdown(); + _taskExecutor->join(); + _taskExecutor.release(); + } } void ReplicaSetMonitorManager::report(BSONObjBuilder* builder) { @@ -119,4 +148,9 @@ void ReplicaSetMonitorManager::report(BSONObjBuilder* builder) { } } +TaskExecutor* ReplicaSetMonitorManager::getExecutor() { + invariant(_taskExecutor); + return _taskExecutor.get(); +} + } // namespace mongo diff --git a/src/mongo/client/replica_set_monitor_manager.h b/src/mongo/client/replica_set_monitor_manager.h index 0a69fe72a4c..a776ccda767 100644 --- a/src/mongo/client/replica_set_monitor_manager.h +++ b/src/mongo/client/replica_set_monitor_manager.h @@ -32,6 +32,7 @@ #include <vector> #include "mongo/base/disallow_copying.h" +#include "mongo/executor/task_executor.h" #include "mongo/stdx/mutex.h" #include "mongo/util/string_map.h" @@ -80,12 +81,20 @@ public: */ void report(BSONObjBuilder* builder); + /** + * Returns an executor for running RSM tasks. + */ + executor::TaskExecutor* getExecutor(); + private: using ReplicaSetMonitorsMap = StringMap<std::shared_ptr<ReplicaSetMonitor>>; // Protects access to the replica set monitors stdx::mutex _mutex; ReplicaSetMonitorsMap _monitors; + + // Executor for monitoring replica sets. + std::unique_ptr<executor::TaskExecutor> _taskExecutor; }; } // namespace mongo |