diff options
author | Ben Caimano <ben.caimano@10gen.com> | 2018-06-19 15:46:57 -0400 |
---|---|---|
committer | Ben Caimano <ben.caimano@10gen.com> | 2018-09-07 16:52:31 -0400 |
commit | dbb45c36cde1d371a08a49d31fca189eb71e2b1d (patch) | |
tree | 5c2c6544814468037f90c857b6047e93aae13c1c /src/mongo/client/replica_set_monitor.cpp | |
parent | 5445067793ec31b5dbdf7076fe710586f9a63414 (diff) | |
download | mongo-dbb45c36cde1d371a08a49d31fca189eb71e2b1d.tar.gz |
SERVER-35216 Expose ReplicaSetManager refresh period to the mongo shell
(cherry picked from commit 3934aee73df7e514b60732bb64d5d4b748835f05)
Diffstat (limited to 'src/mongo/client/replica_set_monitor.cpp')
-rw-r--r-- | src/mongo/client/replica_set_monitor.cpp | 81 |
1 files changed, 38 insertions, 43 deletions
diff --git a/src/mongo/client/replica_set_monitor.cpp b/src/mongo/client/replica_set_monitor.cpp index a85952d7672..06e8141c304 100644 --- a/src/mongo/client/replica_set_monitor.cpp +++ b/src/mongo/client/replica_set_monitor.cpp @@ -65,6 +65,9 @@ using std::vector; // Failpoint for disabling AsyncConfigChangeHook calls on updated RS nodes. MONGO_FAIL_POINT_DEFINE(failAsyncConfigChangeHook); +// Failpoint for changing the default refresh period +MONGO_FAIL_POINT_DEFINE(modifyReplicaSetMonitorDefaultRefreshPeriod); + namespace { // Pull nested types to top-level scope @@ -163,7 +166,7 @@ struct HostNotIn { /** * Replica set refresh period on the task executor. */ -const Seconds kRefreshPeriod(30); +const Seconds kDefaultRefreshPeriod(30); } // namespace // If we cannot find a host after 15 seconds of refreshing, give up @@ -172,6 +175,16 @@ const Seconds ReplicaSetMonitor::kDefaultFindHostTimeout(15); // Defaults to random selection as required by the spec bool ReplicaSetMonitor::useDeterministicHostSelection = false; +Seconds ReplicaSetMonitor::getDefaultRefreshPeriod() { + MONGO_FAIL_POINT_BLOCK_IF(modifyReplicaSetMonitorDefaultRefreshPeriod, + data, + [&](const BSONObj& data) { return data.hasField("period"); }) { + return Seconds{data.getData().getIntField("period")}; + } + + return kDefaultRefreshPeriod; +} + ReplicaSetMonitor::ReplicaSetMonitor(StringData name, const std::set<HostAndPort>& seeds) : _state(std::make_shared<SetState>(name, seeds)), _executor(globalRSMonitorManager.getExecutor()) {} @@ -180,32 +193,11 @@ ReplicaSetMonitor::ReplicaSetMonitor(const MongoURI& uri) : _state(std::make_shared<SetState>(uri)), _executor(globalRSMonitorManager.getExecutor()) {} void ReplicaSetMonitor::init() { - stdx::lock_guard<stdx::mutex> lk(_mutex); - invariant(_executor); - std::weak_ptr<ReplicaSetMonitor> that(shared_from_this()); - auto status = _executor->scheduleWork([=](const CallbackArgs& cbArgs) { - if (auto ptr = that.lock()) { - ptr->_refresh(cbArgs); - } - }); - - if (status.getStatus() == ErrorCodes::ShutdownInProgress) { - LOG(1) << "Couldn't schedule refresh for " << getName() - << ". Executor shutdown in progress"; - return; - } - - if (!status.isOK()) { - severe() << "Can't start refresh for replica set " << getName() - << causedBy(redact(status.getStatus())); - fassertFailed(40139); - } - - _refresherHandle = status.getValue(); + _scheduleRefresh(_executor->now()); } ReplicaSetMonitor::~ReplicaSetMonitor() { - // need this lock because otherwise can get race with scheduling in _refresh + // need this lock because otherwise can get race with _scheduleRefresh() stdx::lock_guard<stdx::mutex> lk(_mutex); if (!_refresherHandle || !_executor) { return; @@ -220,15 +212,7 @@ ReplicaSetMonitor::~ReplicaSetMonitor() { _refresherHandle = {}; } -void ReplicaSetMonitor::_refresh(const CallbackArgs& cbArgs) { - if (!cbArgs.status.isOK()) { - return; - } - - Timer t; - startOrContinueRefresh().refreshAll(); - LOG(1) << "Refreshing replica set " << getName() << " took " << t.millis() << " msec"; - +void ReplicaSetMonitor::_scheduleRefresh(Date_t when) { // Reschedule the refresh invariant(_executor); @@ -238,14 +222,15 @@ void ReplicaSetMonitor::_refresh(const CallbackArgs& cbArgs) { } stdx::lock_guard<stdx::mutex> lk(_mutex); - std::weak_ptr<ReplicaSetMonitor> that(shared_from_this()); - auto status = _executor->scheduleWorkAt(_executor->now() + kRefreshPeriod, - [=](const CallbackArgs& cbArgs) { - if (auto ptr = that.lock()) { - ptr->_refresh(cbArgs); - } - }); + auto status = _executor->scheduleWorkAt(when, [that](const CallbackArgs& cbArgs) { + if (!cbArgs.status.isOK()) + return; + + if (auto ptr = that.lock()) { + ptr->_doScheduledRefresh(cbArgs.myHandle); + } + }); if (status.getStatus() == ErrorCodes::ShutdownInProgress) { LOG(1) << "Cant schedule refresh for " << getName() << ". Executor shutdown in progress"; @@ -261,6 +246,13 @@ void ReplicaSetMonitor::_refresh(const CallbackArgs& cbArgs) { _refresherHandle = status.getValue(); } +void ReplicaSetMonitor::_doScheduledRefresh(const CallbackHandle& currentHandle) { + startOrContinueRefresh().refreshAll(); + + // And now we set up the next one + _scheduleRefresh(_executor->now() + _state->refreshPeriod); +} + StatusWith<HostAndPort> ReplicaSetMonitor::getHostOrRefresh(const ReadPreferenceSetting& criteria, Milliseconds maxWait) { if (_isRemovedFromManager.load()) { @@ -490,7 +482,9 @@ HostAndPort Refresher::refreshUntilMatches(const ReadPreferenceSetting& criteria }; void Refresher::refreshAll() { + Timer t; _refreshUntilMatches(nullptr); + LOG(1) << "Refreshing replica set " << _set->name << " took " << t.millis() << " msec"; } Refresher::NextStep Refresher::getNextStep() { @@ -1004,7 +998,8 @@ SetState::SetState(StringData name, const std::set<HostAndPort>& seedNodes) seedNodes(seedNodes), latencyThresholdMicros(serverGlobalParams.defaultLocalThresholdMillis * 1000), rand(int64_t(time(0))), - roundRobin(0) { + roundRobin(0), + refreshPeriod(getDefaultRefreshPeriod()) { uassert(13642, "Replica set seed list can't be empty", !seedNodes.empty()); if (name.empty()) @@ -1088,7 +1083,7 @@ HostAndPort SetState::getMatchingHost(const ReadPreferenceSetting& criteria) con Date_t maxWriteTime = (*latestSecNode)->lastWriteDate; matchNode = [=](const Node& node) -> bool { return duration_cast<Seconds>(maxWriteTime - node.lastWriteDate) + - kRefreshPeriod <= + refreshPeriod <= criteria.maxStalenessSeconds; }; } @@ -1098,7 +1093,7 @@ HostAndPort SetState::getMatchingHost(const ReadPreferenceSetting& criteria) con matchNode = [=](const Node& node) -> bool { return duration_cast<Seconds>(node.lastWriteDateUpdateTime - node.lastWriteDate) - - primaryStaleness + kRefreshPeriod <= + primaryStaleness + refreshPeriod <= criteria.maxStalenessSeconds; }; } |