diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/client/replica_set_monitor.cpp | 88 | ||||
-rw-r--r-- | src/mongo/client/replica_set_monitor.h | 15 | ||||
-rw-r--r-- | src/mongo/client/replica_set_monitor_internal.h | 1 |
3 files changed, 58 insertions, 46 deletions
diff --git a/src/mongo/client/replica_set_monitor.cpp b/src/mongo/client/replica_set_monitor.cpp index f074df22fb5..e77474194a0 100644 --- a/src/mongo/client/replica_set_monitor.cpp +++ b/src/mongo/client/replica_set_monitor.cpp @@ -65,6 +65,9 @@ using std::vector; // Failpoint for disabling AsyncConfigChangeHook calls on updated RS nodes. MONGO_FAIL_POINT_DEFINE(failAsyncConfigChangeHook); +// Failpoint for changing the default refresh period +MONGO_FAIL_POINT_DEFINE(modifyReplicaSetMonitorDefaultRefreshPeriod); + namespace { // Pull nested types to top-level scope @@ -163,7 +166,7 @@ struct HostNotIn { /** * Replica set refresh period on the task executor. */ -const Seconds kRefreshPeriod(30); +const Seconds kDefaultRefreshPeriod(30); } // namespace // If we cannot find a host after 15 seconds of refreshing, give up @@ -172,6 +175,16 @@ const Seconds ReplicaSetMonitor::kDefaultFindHostTimeout(15); // Defaults to random selection as required by the spec bool ReplicaSetMonitor::useDeterministicHostSelection = false; +Seconds ReplicaSetMonitor::getDefaultRefreshPeriod() { + MONGO_FAIL_POINT_BLOCK_IF(modifyReplicaSetMonitorDefaultRefreshPeriod, + data, + [&](const BSONObj& data) { return data.hasField("period"); }) { + return Seconds{data.getData().getIntField("period")}; + } + + return kDefaultRefreshPeriod; +} + ReplicaSetMonitor::ReplicaSetMonitor(StringData name, const std::set<HostAndPort>& seeds) : _state(std::make_shared<SetState>(name, seeds)), _executor(globalRSMonitorManager.getExecutor()) {} @@ -180,32 +193,11 @@ ReplicaSetMonitor::ReplicaSetMonitor(const MongoURI& uri) : _state(std::make_shared<SetState>(uri)), _executor(globalRSMonitorManager.getExecutor()) {} void ReplicaSetMonitor::init() { - stdx::lock_guard<stdx::mutex> lk(_mutex); - invariant(_executor); - std::weak_ptr<ReplicaSetMonitor> that(shared_from_this()); - auto status = _executor->scheduleWork([=](const CallbackArgs& cbArgs) { - if (auto ptr = that.lock()) { - ptr->_refresh(cbArgs); - } - }); - - if (status.getStatus() == ErrorCodes::ShutdownInProgress) { - LOG(1) << "Couldn't schedule refresh for " << getName() - << ". Executor shutdown in progress"; - return; - } - - if (!status.isOK()) { - severe() << "Can't start refresh for replica set " << getName() - << causedBy(redact(status.getStatus())); - fassertFailed(40139); - } - - _refresherHandle = status.getValue(); + _scheduleRefresh(_executor->now()); } ReplicaSetMonitor::~ReplicaSetMonitor() { - // need this lock because otherwise can get race with scheduling in _refresh + // need this lock because otherwise can get race with _scheduleRefresh() stdx::lock_guard<stdx::mutex> lk(_mutex); if (!_refresherHandle || !_executor) { return; @@ -220,15 +212,7 @@ ReplicaSetMonitor::~ReplicaSetMonitor() { _refresherHandle = {}; } -void ReplicaSetMonitor::_refresh(const CallbackArgs& cbArgs) { - if (!cbArgs.status.isOK()) { - return; - } - - Timer t; - startOrContinueRefresh().refreshAll(); - LOG(1) << "Refreshing replica set " << getName() << " took " << t.millis() << " msec"; - +void ReplicaSetMonitor::_scheduleRefresh(Date_t when) { // Reschedule the refresh invariant(_executor); @@ -238,14 +222,15 @@ void ReplicaSetMonitor::_refresh(const CallbackArgs& cbArgs) { } stdx::lock_guard<stdx::mutex> lk(_mutex); - std::weak_ptr<ReplicaSetMonitor> that(shared_from_this()); - auto status = _executor->scheduleWorkAt(_executor->now() + kRefreshPeriod, - [=](const CallbackArgs& cbArgs) { - if (auto ptr = that.lock()) { - ptr->_refresh(cbArgs); - } - }); + auto status = _executor->scheduleWorkAt(when, [that](const CallbackArgs& cbArgs) { + if (!cbArgs.status.isOK()) + return; + + if (auto ptr = that.lock()) { + ptr->_doScheduledRefresh(cbArgs.myHandle); + } + }); if (status.getStatus() == ErrorCodes::ShutdownInProgress) { LOG(1) << "Cant schedule refresh for " << getName() << ". Executor shutdown in progress"; @@ -261,6 +246,20 @@ void ReplicaSetMonitor::_refresh(const CallbackArgs& cbArgs) { _refresherHandle = status.getValue(); } +void ReplicaSetMonitor::_doScheduledRefresh(const CallbackHandle& currentHandle) { + startOrContinueRefresh().refreshAll(); + + // We're calling this from outside the scheduling loop, always cancel first. + // This doesn't happen right now. But it does in my dreams. + if (currentHandle != _refresherHandle) { + LOG(1) << "Canceling scheduled refresh."; + _executor->cancel(_refresherHandle); + } + + // And now we set up the next one + _scheduleRefresh(_executor->now() + _state->refreshPeriod); +} + StatusWith<HostAndPort> ReplicaSetMonitor::getHostOrRefresh(const ReadPreferenceSetting& criteria, Milliseconds maxWait) { if (_isRemovedFromManager.load()) { @@ -490,7 +489,9 @@ HostAndPort Refresher::refreshUntilMatches(const ReadPreferenceSetting& criteria }; void Refresher::refreshAll() { + Timer t; _refreshUntilMatches(nullptr); + LOG(1) << "Refreshing replica set " << _set->name << " took " << t.millis() << " msec"; } Refresher::NextStep Refresher::getNextStep() { @@ -1004,7 +1005,8 @@ SetState::SetState(StringData name, const std::set<HostAndPort>& seedNodes) seedNodes(seedNodes), latencyThresholdMicros(serverGlobalParams.defaultLocalThresholdMillis * 1000), rand(int64_t(time(0))), - roundRobin(0) { + roundRobin(0), + refreshPeriod(getDefaultRefreshPeriod()) { uassert(13642, "Replica set seed list can't be empty", !seedNodes.empty()); if (name.empty()) @@ -1088,7 +1090,7 @@ HostAndPort SetState::getMatchingHost(const ReadPreferenceSetting& criteria) con Date_t maxWriteTime = (*latestSecNode)->lastWriteDate; matchNode = [=](const Node& node) -> bool { return duration_cast<Seconds>(maxWriteTime - node.lastWriteDate) + - kRefreshPeriod <= + refreshPeriod <= criteria.maxStalenessSeconds; }; } @@ -1098,7 +1100,7 @@ HostAndPort SetState::getMatchingHost(const ReadPreferenceSetting& criteria) con matchNode = [=](const Node& node) -> bool { return duration_cast<Seconds>(node.lastWriteDateUpdateTime - node.lastWriteDate) - - primaryStaleness + kRefreshPeriod <= + primaryStaleness + refreshPeriod <= criteria.maxStalenessSeconds; }; } diff --git a/src/mongo/client/replica_set_monitor.h b/src/mongo/client/replica_set_monitor.h index 1c9560b56a3..ab6afcb4be2 100644 --- a/src/mongo/client/replica_set_monitor.h +++ b/src/mongo/client/replica_set_monitor.h @@ -235,6 +235,11 @@ public: */ static void shutdown(); + /** + * Returns the refresh period that is given to all new SetStates. + */ + static Seconds getDefaultRefreshPeriod(); + // // internal types (defined in replica_set_monitor_internal.h) // @@ -268,10 +273,14 @@ public: private: /** - * A callback passed to a task executor to refresh the replica set. It reschedules itself until - * its canceled in d-tor. + * Schedules a refresh via the task executor. (Task is automatically canceled in the d-tor.) + */ + void _scheduleRefresh(Date_t when); + + /** + * This function refreshes the replica set and calls _scheduleRefresh() again. */ - void _refresh(const executor::TaskExecutor::CallbackArgs&); + void _doScheduledRefresh(const executor::TaskExecutor::CallbackHandle& currentHandle); // Serializes refresh and protects _refresherHandle stdx::mutex _mutex; diff --git a/src/mongo/client/replica_set_monitor_internal.h b/src/mongo/client/replica_set_monitor_internal.h index f14080a16a7..a03ed7a2de0 100644 --- a/src/mongo/client/replica_set_monitor_internal.h +++ b/src/mongo/client/replica_set_monitor_internal.h @@ -206,6 +206,7 @@ public: mutable PseudoRandom rand; // only used for host selection to balance load mutable int roundRobin; // used when useDeterministicHostSelection is true MongoURI setUri; // URI that may have constructed this + Seconds refreshPeriod; }; struct ReplicaSetMonitor::ScanState { |