summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/mongo/client/replica_set_monitor.cpp88
-rw-r--r--src/mongo/client/replica_set_monitor.h15
-rw-r--r--src/mongo/client/replica_set_monitor_internal.h1
3 files changed, 58 insertions, 46 deletions
diff --git a/src/mongo/client/replica_set_monitor.cpp b/src/mongo/client/replica_set_monitor.cpp
index f074df22fb5..e77474194a0 100644
--- a/src/mongo/client/replica_set_monitor.cpp
+++ b/src/mongo/client/replica_set_monitor.cpp
@@ -65,6 +65,9 @@ using std::vector;
// Failpoint for disabling AsyncConfigChangeHook calls on updated RS nodes.
MONGO_FAIL_POINT_DEFINE(failAsyncConfigChangeHook);
+// Failpoint for changing the default refresh period
+MONGO_FAIL_POINT_DEFINE(modifyReplicaSetMonitorDefaultRefreshPeriod);
+
namespace {
// Pull nested types to top-level scope
@@ -163,7 +166,7 @@ struct HostNotIn {
/**
* Replica set refresh period on the task executor.
*/
-const Seconds kRefreshPeriod(30);
+const Seconds kDefaultRefreshPeriod(30);
} // namespace
// If we cannot find a host after 15 seconds of refreshing, give up
@@ -172,6 +175,16 @@ const Seconds ReplicaSetMonitor::kDefaultFindHostTimeout(15);
// Defaults to random selection as required by the spec
bool ReplicaSetMonitor::useDeterministicHostSelection = false;
+Seconds ReplicaSetMonitor::getDefaultRefreshPeriod() {
+ MONGO_FAIL_POINT_BLOCK_IF(modifyReplicaSetMonitorDefaultRefreshPeriod,
+ data,
+ [&](const BSONObj& data) { return data.hasField("period"); }) {
+ return Seconds{data.getData().getIntField("period")};
+ }
+
+ return kDefaultRefreshPeriod;
+}
+
ReplicaSetMonitor::ReplicaSetMonitor(StringData name, const std::set<HostAndPort>& seeds)
: _state(std::make_shared<SetState>(name, seeds)),
_executor(globalRSMonitorManager.getExecutor()) {}
@@ -180,32 +193,11 @@ ReplicaSetMonitor::ReplicaSetMonitor(const MongoURI& uri)
: _state(std::make_shared<SetState>(uri)), _executor(globalRSMonitorManager.getExecutor()) {}
void ReplicaSetMonitor::init() {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- invariant(_executor);
- std::weak_ptr<ReplicaSetMonitor> that(shared_from_this());
- auto status = _executor->scheduleWork([=](const CallbackArgs& cbArgs) {
- if (auto ptr = that.lock()) {
- ptr->_refresh(cbArgs);
- }
- });
-
- if (status.getStatus() == ErrorCodes::ShutdownInProgress) {
- LOG(1) << "Couldn't schedule refresh for " << getName()
- << ". Executor shutdown in progress";
- return;
- }
-
- if (!status.isOK()) {
- severe() << "Can't start refresh for replica set " << getName()
- << causedBy(redact(status.getStatus()));
- fassertFailed(40139);
- }
-
- _refresherHandle = status.getValue();
+ _scheduleRefresh(_executor->now());
}
ReplicaSetMonitor::~ReplicaSetMonitor() {
- // need this lock because otherwise can get race with scheduling in _refresh
+ // need this lock because otherwise can get race with _scheduleRefresh()
stdx::lock_guard<stdx::mutex> lk(_mutex);
if (!_refresherHandle || !_executor) {
return;
@@ -220,15 +212,7 @@ ReplicaSetMonitor::~ReplicaSetMonitor() {
_refresherHandle = {};
}
-void ReplicaSetMonitor::_refresh(const CallbackArgs& cbArgs) {
- if (!cbArgs.status.isOK()) {
- return;
- }
-
- Timer t;
- startOrContinueRefresh().refreshAll();
- LOG(1) << "Refreshing replica set " << getName() << " took " << t.millis() << " msec";
-
+void ReplicaSetMonitor::_scheduleRefresh(Date_t when) {
// Reschedule the refresh
invariant(_executor);
@@ -238,14 +222,15 @@ void ReplicaSetMonitor::_refresh(const CallbackArgs& cbArgs) {
}
stdx::lock_guard<stdx::mutex> lk(_mutex);
-
std::weak_ptr<ReplicaSetMonitor> that(shared_from_this());
- auto status = _executor->scheduleWorkAt(_executor->now() + kRefreshPeriod,
- [=](const CallbackArgs& cbArgs) {
- if (auto ptr = that.lock()) {
- ptr->_refresh(cbArgs);
- }
- });
+ auto status = _executor->scheduleWorkAt(when, [that](const CallbackArgs& cbArgs) {
+ if (!cbArgs.status.isOK())
+ return;
+
+ if (auto ptr = that.lock()) {
+ ptr->_doScheduledRefresh(cbArgs.myHandle);
+ }
+ });
if (status.getStatus() == ErrorCodes::ShutdownInProgress) {
LOG(1) << "Cant schedule refresh for " << getName() << ". Executor shutdown in progress";
@@ -261,6 +246,20 @@ void ReplicaSetMonitor::_refresh(const CallbackArgs& cbArgs) {
_refresherHandle = status.getValue();
}
+void ReplicaSetMonitor::_doScheduledRefresh(const CallbackHandle& currentHandle) {
+ startOrContinueRefresh().refreshAll();
+
+ // We're calling this from outside the scheduling loop, always cancel first.
+ // This doesn't happen right now. But it does in my dreams.
+ if (currentHandle != _refresherHandle) {
+ LOG(1) << "Canceling scheduled refresh.";
+ _executor->cancel(_refresherHandle);
+ }
+
+ // And now we set up the next one
+ _scheduleRefresh(_executor->now() + _state->refreshPeriod);
+}
+
StatusWith<HostAndPort> ReplicaSetMonitor::getHostOrRefresh(const ReadPreferenceSetting& criteria,
Milliseconds maxWait) {
if (_isRemovedFromManager.load()) {
@@ -490,7 +489,9 @@ HostAndPort Refresher::refreshUntilMatches(const ReadPreferenceSetting& criteria
};
void Refresher::refreshAll() {
+ Timer t;
_refreshUntilMatches(nullptr);
+ LOG(1) << "Refreshing replica set " << _set->name << " took " << t.millis() << " msec";
}
Refresher::NextStep Refresher::getNextStep() {
@@ -1004,7 +1005,8 @@ SetState::SetState(StringData name, const std::set<HostAndPort>& seedNodes)
seedNodes(seedNodes),
latencyThresholdMicros(serverGlobalParams.defaultLocalThresholdMillis * 1000),
rand(int64_t(time(0))),
- roundRobin(0) {
+ roundRobin(0),
+ refreshPeriod(getDefaultRefreshPeriod()) {
uassert(13642, "Replica set seed list can't be empty", !seedNodes.empty());
if (name.empty())
@@ -1088,7 +1090,7 @@ HostAndPort SetState::getMatchingHost(const ReadPreferenceSetting& criteria) con
Date_t maxWriteTime = (*latestSecNode)->lastWriteDate;
matchNode = [=](const Node& node) -> bool {
return duration_cast<Seconds>(maxWriteTime - node.lastWriteDate) +
- kRefreshPeriod <=
+ refreshPeriod <=
criteria.maxStalenessSeconds;
};
}
@@ -1098,7 +1100,7 @@ HostAndPort SetState::getMatchingHost(const ReadPreferenceSetting& criteria) con
matchNode = [=](const Node& node) -> bool {
return duration_cast<Seconds>(node.lastWriteDateUpdateTime -
node.lastWriteDate) -
- primaryStaleness + kRefreshPeriod <=
+ primaryStaleness + refreshPeriod <=
criteria.maxStalenessSeconds;
};
}
diff --git a/src/mongo/client/replica_set_monitor.h b/src/mongo/client/replica_set_monitor.h
index 1c9560b56a3..ab6afcb4be2 100644
--- a/src/mongo/client/replica_set_monitor.h
+++ b/src/mongo/client/replica_set_monitor.h
@@ -235,6 +235,11 @@ public:
*/
static void shutdown();
+ /**
+ * Returns the refresh period that is given to all new SetStates.
+ */
+ static Seconds getDefaultRefreshPeriod();
+
//
// internal types (defined in replica_set_monitor_internal.h)
//
@@ -268,10 +273,14 @@ public:
private:
/**
- * A callback passed to a task executor to refresh the replica set. It reschedules itself until
- * its canceled in d-tor.
+ * Schedules a refresh via the task executor. (Task is automatically canceled in the d-tor.)
+ */
+ void _scheduleRefresh(Date_t when);
+
+ /**
+ * This function refreshes the replica set and calls _scheduleRefresh() again.
*/
- void _refresh(const executor::TaskExecutor::CallbackArgs&);
+ void _doScheduledRefresh(const executor::TaskExecutor::CallbackHandle& currentHandle);
// Serializes refresh and protects _refresherHandle
stdx::mutex _mutex;
diff --git a/src/mongo/client/replica_set_monitor_internal.h b/src/mongo/client/replica_set_monitor_internal.h
index f14080a16a7..a03ed7a2de0 100644
--- a/src/mongo/client/replica_set_monitor_internal.h
+++ b/src/mongo/client/replica_set_monitor_internal.h
@@ -206,6 +206,7 @@ public:
mutable PseudoRandom rand; // only used for host selection to balance load
mutable int roundRobin; // used when useDeterministicHostSelection is true
MongoURI setUri; // URI that may have constructed this
+ Seconds refreshPeriod;
};
struct ReplicaSetMonitor::ScanState {