path: root/src/mongo/client/replica_set_monitor.cpp
author    Ben Caimano <ben.caimano@10gen.com>  2018-06-19 15:46:57 -0400
committer Ben Caimano <ben.caimano@10gen.com>  2018-09-07 16:52:31 -0400
commit    dbb45c36cde1d371a08a49d31fca189eb71e2b1d
tree      5c2c6544814468037f90c857b6047e93aae13c1c /src/mongo/client/replica_set_monitor.cpp
parent    5445067793ec31b5dbdf7076fe710586f9a63414
download  mongo-dbb45c36cde1d371a08a49d31fca189eb71e2b1d.tar.gz
SERVER-35216 Expose ReplicaSetManager refresh period to the mongo shell
(cherry picked from commit 3934aee73df7e514b60732bb64d5d4b748835f05)
Diffstat (limited to 'src/mongo/client/replica_set_monitor.cpp')
-rw-r--r--  src/mongo/client/replica_set_monitor.cpp | 81
1 file changed, 38 insertions(+), 43 deletions(-)
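
The commit adds a `modifyReplicaSetMonitorDefaultRefreshPeriod` failpoint so the monitor's 30-second default refresh period can be overridden, typically from the mongo shell via the `configureFailPoint` admin command with a `data: {period: <seconds>}` payload. As a rough C++ sketch of the same thing driven from a test, assuming the 2018-era failpoint API (`getGlobalFailPointRegistry()->getFailPoint()` and `FailPoint::setMode()` with a BSON "extra" argument) — exact spellings may differ, and the helper function name below is illustrative only:

```cpp
// Hypothetical test helper: shorten the ReplicaSetMonitor refresh period to 1 second
// by activating the failpoint introduced in this commit. Not part of the commit itself.
#include "mongo/bson/bsonobjbuilder.h"
#include "mongo/util/fail_point_service.h"

namespace mongo {
void shortenRsmRefreshPeriodForTest() {
    auto* failPoint = getGlobalFailPointRegistry()->getFailPoint(
        "modifyReplicaSetMonitorDefaultRefreshPeriod");
    invariant(failPoint);

    // getDefaultRefreshPeriod() only honors the failpoint when the payload carries a
    // "period" field (see the first hunk below), interpreted as seconds.
    failPoint->setMode(FailPoint::alwaysOn, 0, BSON("period" << 1));
}
}  // namespace mongo
```
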
diff --git a/src/mongo/client/replica_set_monitor.cpp b/src/mongo/client/replica_set_monitor.cpp
index a85952d7672..06e8141c304 100644
--- a/src/mongo/client/replica_set_monitor.cpp
+++ b/src/mongo/client/replica_set_monitor.cpp
@@ -65,6 +65,9 @@ using std::vector;
// Failpoint for disabling AsyncConfigChangeHook calls on updated RS nodes.
MONGO_FAIL_POINT_DEFINE(failAsyncConfigChangeHook);
+// Failpoint for changing the default refresh period
+MONGO_FAIL_POINT_DEFINE(modifyReplicaSetMonitorDefaultRefreshPeriod);
+
namespace {
// Pull nested types to top-level scope
@@ -163,7 +166,7 @@ struct HostNotIn {
/**
* Replica set refresh period on the task executor.
*/
-const Seconds kRefreshPeriod(30);
+const Seconds kDefaultRefreshPeriod(30);
} // namespace
// If we cannot find a host after 15 seconds of refreshing, give up
@@ -172,6 +175,16 @@ const Seconds ReplicaSetMonitor::kDefaultFindHostTimeout(15);
// Defaults to random selection as required by the spec
bool ReplicaSetMonitor::useDeterministicHostSelection = false;
+Seconds ReplicaSetMonitor::getDefaultRefreshPeriod() {
+ MONGO_FAIL_POINT_BLOCK_IF(modifyReplicaSetMonitorDefaultRefreshPeriod,
+ data,
+ [&](const BSONObj& data) { return data.hasField("period"); }) {
+ return Seconds{data.getData().getIntField("period")};
+ }
+
+ return kDefaultRefreshPeriod;
+}
+
ReplicaSetMonitor::ReplicaSetMonitor(StringData name, const std::set<HostAndPort>& seeds)
: _state(std::make_shared<SetState>(name, seeds)),
_executor(globalRSMonitorManager.getExecutor()) {}
@@ -180,32 +193,11 @@ ReplicaSetMonitor::ReplicaSetMonitor(const MongoURI& uri)
: _state(std::make_shared<SetState>(uri)), _executor(globalRSMonitorManager.getExecutor()) {}
void ReplicaSetMonitor::init() {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- invariant(_executor);
- std::weak_ptr<ReplicaSetMonitor> that(shared_from_this());
- auto status = _executor->scheduleWork([=](const CallbackArgs& cbArgs) {
- if (auto ptr = that.lock()) {
- ptr->_refresh(cbArgs);
- }
- });
-
- if (status.getStatus() == ErrorCodes::ShutdownInProgress) {
- LOG(1) << "Couldn't schedule refresh for " << getName()
- << ". Executor shutdown in progress";
- return;
- }
-
- if (!status.isOK()) {
- severe() << "Can't start refresh for replica set " << getName()
- << causedBy(redact(status.getStatus()));
- fassertFailed(40139);
- }
-
- _refresherHandle = status.getValue();
+ _scheduleRefresh(_executor->now());
}
ReplicaSetMonitor::~ReplicaSetMonitor() {
- // need this lock because otherwise can get race with scheduling in _refresh
+ // need this lock because otherwise can get race with _scheduleRefresh()
stdx::lock_guard<stdx::mutex> lk(_mutex);
if (!_refresherHandle || !_executor) {
return;
@@ -220,15 +212,7 @@ ReplicaSetMonitor::~ReplicaSetMonitor() {
_refresherHandle = {};
}
-void ReplicaSetMonitor::_refresh(const CallbackArgs& cbArgs) {
- if (!cbArgs.status.isOK()) {
- return;
- }
-
- Timer t;
- startOrContinueRefresh().refreshAll();
- LOG(1) << "Refreshing replica set " << getName() << " took " << t.millis() << " msec";
-
+void ReplicaSetMonitor::_scheduleRefresh(Date_t when) {
// Reschedule the refresh
invariant(_executor);
@@ -238,14 +222,15 @@ void ReplicaSetMonitor::_refresh(const CallbackArgs& cbArgs) {
}
stdx::lock_guard<stdx::mutex> lk(_mutex);
-
std::weak_ptr<ReplicaSetMonitor> that(shared_from_this());
- auto status = _executor->scheduleWorkAt(_executor->now() + kRefreshPeriod,
- [=](const CallbackArgs& cbArgs) {
- if (auto ptr = that.lock()) {
- ptr->_refresh(cbArgs);
- }
- });
+ auto status = _executor->scheduleWorkAt(when, [that](const CallbackArgs& cbArgs) {
+ if (!cbArgs.status.isOK())
+ return;
+
+ if (auto ptr = that.lock()) {
+ ptr->_doScheduledRefresh(cbArgs.myHandle);
+ }
+ });
if (status.getStatus() == ErrorCodes::ShutdownInProgress) {
LOG(1) << "Cant schedule refresh for " << getName() << ". Executor shutdown in progress";
@@ -261,6 +246,13 @@ void ReplicaSetMonitor::_refresh(const CallbackArgs& cbArgs) {
_refresherHandle = status.getValue();
}
+void ReplicaSetMonitor::_doScheduledRefresh(const CallbackHandle& currentHandle) {
+ startOrContinueRefresh().refreshAll();
+
+ // And now we set up the next one
+ _scheduleRefresh(_executor->now() + _state->refreshPeriod);
+}
+
StatusWith<HostAndPort> ReplicaSetMonitor::getHostOrRefresh(const ReadPreferenceSetting& criteria,
Milliseconds maxWait) {
if (_isRemovedFromManager.load()) {
@@ -490,7 +482,9 @@ HostAndPort Refresher::refreshUntilMatches(const ReadPreferenceSetting& criteria
};
void Refresher::refreshAll() {
+ Timer t;
_refreshUntilMatches(nullptr);
+ LOG(1) << "Refreshing replica set " << _set->name << " took " << t.millis() << " msec";
}
Refresher::NextStep Refresher::getNextStep() {
@@ -1004,7 +998,8 @@ SetState::SetState(StringData name, const std::set<HostAndPort>& seedNodes)
seedNodes(seedNodes),
latencyThresholdMicros(serverGlobalParams.defaultLocalThresholdMillis * 1000),
rand(int64_t(time(0))),
- roundRobin(0) {
+ roundRobin(0),
+ refreshPeriod(getDefaultRefreshPeriod()) {
uassert(13642, "Replica set seed list can't be empty", !seedNodes.empty());
if (name.empty())
@@ -1088,7 +1083,7 @@ HostAndPort SetState::getMatchingHost(const ReadPreferenceSetting& criteria) con
Date_t maxWriteTime = (*latestSecNode)->lastWriteDate;
matchNode = [=](const Node& node) -> bool {
return duration_cast<Seconds>(maxWriteTime - node.lastWriteDate) +
- kRefreshPeriod <=
+ refreshPeriod <=
criteria.maxStalenessSeconds;
};
}
@@ -1098,7 +1093,7 @@ HostAndPort SetState::getMatchingHost(const ReadPreferenceSetting& criteria) con
matchNode = [=](const Node& node) -> bool {
return duration_cast<Seconds>(node.lastWriteDateUpdateTime -
node.lastWriteDate) -
- primaryStaleness + kRefreshPeriod <=
+ primaryStaleness + refreshPeriod <=
criteria.maxStalenessSeconds;
};
}
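
The refactor splits the old `_refresh()` callback into `_scheduleRefresh()` and `_doScheduledRefresh()`, but keeps the same lifetime guard: the executor callback captures only a `std::weak_ptr` to the monitor, so a pending refresh can neither extend the monitor's lifetime nor touch a destroyed one. Below is a self-contained sketch of that rescheduling pattern; `ToyExecutor`, `Monitor`, and the method names are stand-ins for illustration, not MongoDB's real `TaskExecutor` API.

```cpp
// Sketch of the weak_ptr self-rescheduling pattern used by _scheduleRefresh().
#include <chrono>
#include <functional>
#include <iostream>
#include <memory>
#include <queue>

class ToyExecutor {
public:
    void scheduleWorkAt(std::chrono::steady_clock::time_point, std::function<void()> work) {
        // A real executor would wait until the deadline; the toy just queues the work.
        _work.push(std::move(work));
    }
    void runOne() {
        if (_work.empty())
            return;
        auto work = std::move(_work.front());
        _work.pop();
        work();
    }

private:
    std::queue<std::function<void()>> _work;
};

class Monitor : public std::enable_shared_from_this<Monitor> {
public:
    explicit Monitor(ToyExecutor* executor) : _executor(executor) {}

    // Mirrors _scheduleRefresh(): capture a weak_ptr so the queued callback
    // does not keep the monitor alive.
    void scheduleRefresh(std::chrono::steady_clock::time_point when) {
        std::weak_ptr<Monitor> that(shared_from_this());
        _executor->scheduleWorkAt(when, [that] {
            if (auto ptr = that.lock()) {
                ptr->doScheduledRefresh();
            }
            // If the monitor is already gone, the callback is a no-op.
        });
    }

private:
    // Mirrors _doScheduledRefresh(): do the work, then set up the next round.
    void doScheduledRefresh() {
        std::cout << "refreshing replica set view\n";
        scheduleRefresh(std::chrono::steady_clock::now() + _refreshPeriod);
    }

    ToyExecutor* _executor;
    std::chrono::seconds _refreshPeriod{30};
};

int main() {
    ToyExecutor executor;
    {
        auto monitor = std::make_shared<Monitor>(&executor);
        monitor->scheduleRefresh(std::chrono::steady_clock::now());
        executor.runOne();  // runs one refresh and queues the next
    }                       // monitor destroyed here
    executor.runOne();      // queued callback sees an expired weak_ptr and does nothing
}
```
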