summaryrefslogtreecommitdiff
path: root/src/mongo/client
diff options
context:
space:
mode:
authorMisha Tyulenev <misha@mongodb.com>2016-05-26 16:09:19 -0400
committerMisha Tyulenev <misha@mongodb.com>2016-05-26 17:03:51 -0400
commitb6221d5e2f3e95221d73947bf0fba6772b19e49b (patch)
tree908f09fd4d18e63ad07e5618efece41b2dc26351 /src/mongo/client
parentcbb1f07f40d8487bfeb5dfce4ccfb5d461d9a2f6 (diff)
downloadmongo-b6221d5e2f3e95221d73947bf0fba6772b19e49b.tar.gz
SERVER-22564 Move ReplicaSetMonitor to TaskExecutor thread
Diffstat (limited to 'src/mongo/client')
-rw-r--r--src/mongo/client/SConscript14
-rw-r--r--src/mongo/client/replica_set_monitor.cpp212
-rw-r--r--src/mongo/client/replica_set_monitor.h23
-rw-r--r--src/mongo/client/replica_set_monitor_manager.cpp40
-rw-r--r--src/mongo/client/replica_set_monitor_manager.h9
5 files changed, 177 insertions, 121 deletions
diff --git a/src/mongo/client/SConscript b/src/mongo/client/SConscript
index 3145929e5f9..1506ad0b680 100644
--- a/src/mongo/client/SConscript
+++ b/src/mongo/client/SConscript
@@ -209,13 +209,23 @@ env.CppUnitTest(
],
LIBDEPS=[
'$BUILD_DIR/mongo/db/write_concern_options',
- 'clientdriver'
+ 'clientdriver',
+ '$BUILD_DIR/mongo/executor/thread_pool_task_executor',
+ '$BUILD_DIR/mongo/executor/network_interface_thread_pool',
+ '$BUILD_DIR/mongo/executor/network_interface_factory'
]
)
env.CppUnitTest('dbclient_rs_test',
['dbclient_rs_test.cpp'],
- LIBDEPS=['clientdriver', '$BUILD_DIR/mongo/dbtests/mocklib'])
+ LIBDEPS=[
+ 'clientdriver',
+ '$BUILD_DIR/mongo/dbtests/mocklib',
+ '$BUILD_DIR/mongo/executor/thread_pool_task_executor',
+ '$BUILD_DIR/mongo/executor/network_interface_thread_pool',
+ '$BUILD_DIR/mongo/executor/network_interface_factory'
+ ]
+)
env.CppUnitTest(
target='scoped_db_conn_test',
diff --git a/src/mongo/client/replica_set_monitor.cpp b/src/mongo/client/replica_set_monitor.cpp
index 970d2ca3818..250e09992d8 100644
--- a/src/mongo/client/replica_set_monitor.cpp
+++ b/src/mongo/client/replica_set_monitor.cpp
@@ -37,7 +37,9 @@
#include "mongo/client/connpool.h"
#include "mongo/client/global_conn_pool.h"
#include "mongo/client/replica_set_monitor_internal.h"
+#include "mongo/db/operation_context.h"
#include "mongo/db/server_options.h"
+#include "mongo/s/grid.h"
#include "mongo/stdx/condition_variable.h"
#include "mongo/stdx/mutex.h"
#include "mongo/stdx/thread.h"
@@ -70,6 +72,9 @@ typedef ReplicaSetMonitor::Refresher Refresher;
typedef ScanState::UnconfirmedReplies UnconfirmedReplies;
typedef SetState::Node Node;
typedef SetState::Nodes Nodes;
+using executor::TaskExecutor;
+using CallbackArgs = TaskExecutor::CallbackArgs;
+using CallbackHandle = TaskExecutor::CallbackHandle;
const double socketTimeoutSecs = 5;
@@ -83,113 +88,6 @@ const Milliseconds kFindHostMaxBackOffTime(500);
ReplicaSetMonitor::ConfigChangeHook asyncConfigChangeHook;
ReplicaSetMonitor::ConfigChangeHook syncConfigChangeHook;
-// global background job responsible for checking every X amount of time
-class ReplicaSetMonitorWatcher : public BackgroundJob {
-public:
- ReplicaSetMonitorWatcher() : _started(false), _stopRequested(false) {}
-
- ~ReplicaSetMonitorWatcher() {
- stop();
-
- // We relying on the fact that if the monitor was rerun again, wait will not hang
- // because _destroyingStatics will make the run method exit immediately.
- dassert(StaticObserver::_destroyingStatics);
- if (running()) {
- wait();
- }
- }
-
- virtual string name() const {
- return "ReplicaSetMonitorWatcher";
- }
-
- void safeGo() {
- stdx::lock_guard<stdx::mutex> lk(_monitorMutex);
- if (_started)
- return;
-
- _started = true;
- _stopRequested = false;
-
- go();
- }
-
- /**
- * Stops monitoring the sets and wait for the monitoring thread to terminate.
- */
- void stop() {
- stdx::lock_guard<stdx::mutex> sl(_monitorMutex);
- _stopRequested = true;
- _stopRequestedCV.notify_one();
- }
-
-protected:
- void run() {
- log() << "starting"; // includes thread name in output
-
- // Added only for patching timing problems in test. Remove after tests
- // are fixed - see 392b933598668768bf12b1e41ad444aa3548d970.
- // Should not be needed after SERVER-7533 gets implemented and tests start
- // using it.
- if (!inShutdown() && !StaticObserver::_destroyingStatics) {
- stdx::unique_lock<stdx::mutex> sl(_monitorMutex);
- _stopRequestedCV.wait_for(sl, Seconds(10).toSystemDuration());
- }
-
- while (!inShutdown() && !StaticObserver::_destroyingStatics) {
- {
- stdx::lock_guard<stdx::mutex> sl(_monitorMutex);
- if (_stopRequested) {
- break;
- }
- }
-
- try {
- checkAllSets();
- } catch (const std::exception& e) {
- error() << "check all sets failed: " << e.what();
- } catch (...) {
- error() << "unknown error";
- }
-
- stdx::unique_lock<stdx::mutex> sl(_monitorMutex);
- if (_stopRequested) {
- break;
- }
-
- _stopRequestedCV.wait_for(sl, Seconds(10).toSystemDuration());
- }
- }
-
- void checkAllSets() {
- for (const string& setName : globalRSMonitorManager.getAllSetNames()) {
- shared_ptr<ReplicaSetMonitor> m = globalRSMonitorManager.getMonitor(setName);
- if (!m) {
- continue;
- }
-
- Timer t;
- m->startOrContinueRefresh().refreshAll();
- LOG(1) << "Refreshing replica set " << setName << " took " << t.millis() << " msec";
-
- if (!m->isSetUsable()) {
- log() << "Stopping periodic monitoring of set " << m->getName()
- << " because none of the hosts could be contacted for an extended period of "
- "time.";
-
- ReplicaSetMonitor::remove(m->getName());
- }
- }
- }
-
- // protects _started, _stopRequested
- stdx::mutex _monitorMutex;
- bool _started;
-
- stdx::condition_variable _stopRequestedCV;
- bool _stopRequested;
-} replicaSetMonitorWatcher;
-
StaticObserver staticObserver;
//
@@ -253,6 +151,11 @@ struct HostNotIn {
}
const std::set<HostAndPort>& _hosts;
};
+/**
+ * Replica set refresh period on the task executor.
+ */
+const Seconds kRefreshPeriod(30);
+
} // namespace
@@ -266,7 +169,94 @@ const Seconds ReplicaSetMonitor::kDefaultFindHostTimeout(15);
bool ReplicaSetMonitor::useDeterministicHostSelection = false;
ReplicaSetMonitor::ReplicaSetMonitor(StringData name, const std::set<HostAndPort>& seeds)
- : _state(std::make_shared<SetState>(name, seeds)) {}
+ : _state(std::make_shared<SetState>(name, seeds)),
+ _executor(globalRSMonitorManager.getExecutor()) {}
+
+void ReplicaSetMonitor::init() {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ std::weak_ptr<ReplicaSetMonitor> that(shared_from_this());
+ auto status = _executor->scheduleWork([=](const CallbackArgs& cbArgs) {
+ if (auto ptr = that.lock()) {
+ ptr->_refresh(cbArgs);
+ }
+ });
+
+ if (status.getStatus() == ErrorCodes::ShutdownInProgress) {
+ LOG(1) << "Couldn't schedule refresh for " << getName()
+ << ". Executor shutdown in progress";
+ return;
+ }
+
+ if (!status.isOK()) {
+ severe() << "Can't start refresh for replica set " << getName()
+ << causedBy(status.getStatus());
+ fassertFailed(40139);
+ }
+
+ _refresherHandle = status.getValue();
+}
+
+ReplicaSetMonitor::~ReplicaSetMonitor() {
+ // need this lock because otherwise can get race with scheduling in _refresh
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ invariant(_executor);
+
+ if (!_refresherHandle) {
+ return;
+ }
+ _executor->cancel(_refresherHandle);
+ // Note: calling _executor->wait(_refresherHandle); from the dispatcher thread will cause hang
+ // Its ok not to call it because the d-tor is called only when the last owning pointer goes out
+ // of scope, so as taskExecutor queue holds a weak pointer to RSM it will not be able to get a
+ // task to execute eliminating the need to call method "wait".
+ //
+ _refresherHandle = {};
+}
+
+void ReplicaSetMonitor::_refresh(const CallbackArgs& cbArgs) {
+ if (!cbArgs.status.isOK()) {
+ return;
+ }
+
+ Timer t;
+ startOrContinueRefresh().refreshAll();
+ LOG(1) << "Refreshing replica set " << getName() << " took " << t.millis() << " msec";
+
+ if (!isSetUsable()) {
+ log() << "Stopping periodic monitoring of set " << getName()
+ << " because none of the hosts could be contacted for an extended period of "
+ "time.";
+
+ ReplicaSetMonitor::remove(getName());
+ return;
+ }
+
+ {
+ // reschedule itself
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ std::weak_ptr<ReplicaSetMonitor> that(shared_from_this());
+ auto status = _executor->scheduleWorkAt(_executor->now() + kRefreshPeriod,
+ [=](const CallbackArgs& cbArgs) {
+ if (auto ptr = that.lock()) {
+ ptr->_refresh(cbArgs);
+ }
+ });
+
+ if (status.getStatus() == ErrorCodes::ShutdownInProgress) {
+ LOG(1) << "Cant schedule refresh for " << getName()
+ << ". Executor shutdown in progress";
+ return;
+ }
+
+ if (!status.isOK()) {
+ severe() << "Can't continue refresh for replica set " << getName() << " due to "
+ << status.getStatus().toString();
+ fassertFailed(40140);
+ }
+
+ _refresherHandle = status.getValue();
+ }
+}
StatusWith<HostAndPort> ReplicaSetMonitor::getHostOrRefresh(const ReadPreferenceSetting& criteria,
Milliseconds maxWait) {
@@ -391,8 +381,6 @@ bool ReplicaSetMonitor::contains(const HostAndPort& host) const {
void ReplicaSetMonitor::createIfNeeded(const string& name, const set<HostAndPort>& servers) {
globalRSMonitorManager.getOrCreateMonitor(
ConnectionString::forReplicaSet(name, vector<HostAndPort>(servers.begin(), servers.end())));
-
- replicaSetMonitorWatcher.safeGo();
}
shared_ptr<ReplicaSetMonitor> ReplicaSetMonitor::get(const std::string& name) {
@@ -452,10 +440,6 @@ void ReplicaSetMonitor::appendInfo(BSONObjBuilder& bsonObjBuilder) const {
}
void ReplicaSetMonitor::cleanup() {
- // Call cancel first, in case the RSMW was never started.
- replicaSetMonitorWatcher.cancel();
- replicaSetMonitorWatcher.stop();
- replicaSetMonitorWatcher.wait();
globalRSMonitorManager.removeAllMonitors();
asyncConfigChangeHook = ReplicaSetMonitor::ConfigChangeHook();
syncConfigChangeHook = ReplicaSetMonitor::ConfigChangeHook();
diff --git a/src/mongo/client/replica_set_monitor.h b/src/mongo/client/replica_set_monitor.h
index 5847a6af6cd..fc415806b0f 100644
--- a/src/mongo/client/replica_set_monitor.h
+++ b/src/mongo/client/replica_set_monitor.h
@@ -31,9 +31,11 @@
#include <memory>
#include <set>
#include <string>
+#include <memory>
#include "mongo/base/disallow_copying.h"
#include "mongo/base/string_data.h"
+#include "mongo/executor/task_executor.h"
#include "mongo/stdx/functional.h"
#include "mongo/util/net/hostandport.h"
#include "mongo/util/time_support.h"
@@ -49,7 +51,7 @@ typedef std::shared_ptr<ReplicaSetMonitor> ReplicaSetMonitorPtr;
* Holds state about a replica set and provides a means to refresh the local view.
* All methods perform the required synchronization to allow callers from multiple threads.
*/
-class ReplicaSetMonitor {
+class ReplicaSetMonitor : public std::enable_shared_from_this<ReplicaSetMonitor> {
MONGO_DISALLOW_COPYING(ReplicaSetMonitor);
public:
@@ -66,6 +68,11 @@ public:
ReplicaSetMonitor(StringData name, const std::set<HostAndPort>& seeds);
/**
+ * Schedules the initial refresh task into task executor.
+ */
+ void init();
+
+ /**
* Returns a host matching the given read preference or an error, if no host matches.
*
* @param readPref Read preference to match against
@@ -224,10 +231,11 @@ public:
* Allows tests to set initial conditions and introspect the current state.
*/
explicit ReplicaSetMonitor(const SetStatePtr& initialState) : _state(initialState) {}
+ ~ReplicaSetMonitor();
/**
* If a ReplicaSetMonitor has been refreshed more than this many times in a row without
- * finding any live nodes claiming to be in the set, the ReplicaSetMonitorWatcher will stop
+ * finding any live nodes claiming to be in the set, the ReplicaSetMonitor will stop
* periodic background refreshes of this set.
*/
static std::atomic<int> maxConsecutiveFailedChecks; // NOLINT
@@ -248,7 +256,18 @@ public:
static bool useDeterministicHostSelection;
private:
+ /**
+ * A callback passed to a task executor to refresh the replica set. It reschedules itself until
+ * its canceled in d-tor.
+ */
+ void _refresh(const executor::TaskExecutor::CallbackArgs&);
+
+ // Serializes refresh and protects _refresherHandle
+ stdx::mutex _mutex;
+ executor::TaskExecutor::CallbackHandle _refresherHandle;
+
const SetStatePtr _state;
+ executor::TaskExecutor* _executor;
};
diff --git a/src/mongo/client/replica_set_monitor_manager.cpp b/src/mongo/client/replica_set_monitor_manager.cpp
index 1ef7ae0e254..2789d8c4df9 100644
--- a/src/mongo/client/replica_set_monitor_manager.cpp
+++ b/src/mongo/client/replica_set_monitor_manager.cpp
@@ -35,6 +35,12 @@
#include "mongo/bson/bsonobjbuilder.h"
#include "mongo/client/connection_string.h"
#include "mongo/client/replica_set_monitor.h"
+#include "mongo/executor/network_interface_factory.h"
+#include "mongo/executor/network_interface_thread_pool.h"
+#include "mongo/executor/task_executor.h"
+#include "mongo/executor/task_executor_pool.h"
+#include "mongo/executor/thread_pool_task_executor.h"
+#include "mongo/stdx/memory.h"
#include "mongo/stdx/mutex.h"
#include "mongo/util/log.h"
#include "mongo/util/map_util.h"
@@ -46,7 +52,13 @@ using std::set;
using std::string;
using std::vector;
-ReplicaSetMonitorManager::ReplicaSetMonitorManager() = default;
+using executor::NetworkInterface;
+using executor::NetworkInterfaceThreadPool;
+using executor::TaskExecutorPool;
+using executor::TaskExecutor;
+using executor::ThreadPoolTaskExecutor;
+
+ReplicaSetMonitorManager::ReplicaSetMonitorManager() {}
ReplicaSetMonitorManager::~ReplicaSetMonitorManager() = default;
@@ -61,6 +73,16 @@ shared_ptr<ReplicaSetMonitor> ReplicaSetMonitorManager::getOrCreateMonitor(
invariant(connStr.type() == ConnectionString::SET);
stdx::lock_guard<stdx::mutex> lk(_mutex);
+ if (!_taskExecutor) {
+ // construct task executor
+ auto net = executor::makeNetworkInterface("ReplicaSetMonitor-TaskExecutor");
+ auto netPtr = net.get();
+ _taskExecutor = stdx::make_unique<ThreadPoolTaskExecutor>(
+ stdx::make_unique<NetworkInterfaceThreadPool>(netPtr), std::move(net));
+ LOG(1) << "Starting up task executor for monitoring replica sets in response to request to "
+ "monitor set: " << connStr.toString();
+ _taskExecutor->startup();
+ }
shared_ptr<ReplicaSetMonitor>& monitor = _monitors[connStr.getSetName()];
if (!monitor) {
@@ -70,6 +92,7 @@ shared_ptr<ReplicaSetMonitor> ReplicaSetMonitorManager::getOrCreateMonitor(
log() << "Starting new replica set monitor for " << connStr.toString();
monitor = std::make_shared<ReplicaSetMonitor>(connStr.getSetName(), servers);
+ monitor->init();
}
return monitor;
@@ -98,9 +121,15 @@ void ReplicaSetMonitorManager::removeMonitor(StringData setName) {
void ReplicaSetMonitorManager::removeAllMonitors() {
stdx::lock_guard<stdx::mutex> lk(_mutex);
-
- // Reset the StringMap, which will release all registered monitors
+ // Reset the _monitors map, which will release all registered monitors
_monitors = ReplicaSetMonitorsMap();
+
+ if (_taskExecutor) {
+ LOG(1) << "Shutting down task executor used for monitoring replica sets";
+ _taskExecutor->shutdown();
+ _taskExecutor->join();
+ _taskExecutor.release();
+ }
}
void ReplicaSetMonitorManager::report(BSONObjBuilder* builder) {
@@ -119,4 +148,9 @@ void ReplicaSetMonitorManager::report(BSONObjBuilder* builder) {
}
}
+TaskExecutor* ReplicaSetMonitorManager::getExecutor() {
+ invariant(_taskExecutor);
+ return _taskExecutor.get();
+}
+
} // namespace mongo
diff --git a/src/mongo/client/replica_set_monitor_manager.h b/src/mongo/client/replica_set_monitor_manager.h
index 0a69fe72a4c..a776ccda767 100644
--- a/src/mongo/client/replica_set_monitor_manager.h
+++ b/src/mongo/client/replica_set_monitor_manager.h
@@ -32,6 +32,7 @@
#include <vector>
#include "mongo/base/disallow_copying.h"
+#include "mongo/executor/task_executor.h"
#include "mongo/stdx/mutex.h"
#include "mongo/util/string_map.h"
@@ -80,12 +81,20 @@ public:
*/
void report(BSONObjBuilder* builder);
+ /**
+ * Returns an executor for running RSM tasks.
+ */
+ executor::TaskExecutor* getExecutor();
+
private:
using ReplicaSetMonitorsMap = StringMap<std::shared_ptr<ReplicaSetMonitor>>;
// Protects access to the replica set monitors
stdx::mutex _mutex;
ReplicaSetMonitorsMap _monitors;
+
+ // Executor for monitoring replica sets.
+ std::unique_ptr<executor::TaskExecutor> _taskExecutor;
};
} // namespace mongo