summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/mongo/client/connpool.cpp4
-rw-r--r--src/mongo/client/replica_set_monitor.cpp87
-rw-r--r--src/mongo/client/replica_set_monitor.h16
3 files changed, 53 insertions, 54 deletions
diff --git a/src/mongo/client/connpool.cpp b/src/mongo/client/connpool.cpp
index 72b6d0b5c84..30325079fab 100644
--- a/src/mongo/client/connpool.cpp
+++ b/src/mongo/client/connpool.cpp
@@ -196,7 +196,9 @@ DBConnectionPool::DBConnectionPool()
_hooks(new list<DBConnectionHook*>()) {}
DBClientBase* DBConnectionPool::_get(const string& ident, double socketTimeout) {
- uassert(17382, "Can't use connection pool during shutdown", !globalInShutdownDeprecated());
+ uassert(ErrorCodes::ShutdownInProgress,
+ "Can't use connection pool during shutdown",
+ !globalInShutdownDeprecated());
stdx::lock_guard<stdx::mutex> L(_mutex);
PoolForHost& p = _pools[PoolKey(ident, socketTimeout)];
p.setMaxPoolSize(_maxPoolSize);
diff --git a/src/mongo/client/replica_set_monitor.cpp b/src/mongo/client/replica_set_monitor.cpp
index ee4dbe0657b..d6fefc3f492 100644
--- a/src/mongo/client/replica_set_monitor.cpp
+++ b/src/mongo/client/replica_set_monitor.cpp
@@ -227,44 +227,44 @@ void ReplicaSetMonitor::_refresh(const CallbackArgs& cbArgs) {
Timer t;
startOrContinueRefresh().refreshAll();
LOG(1) << "Refreshing replica set " << getName() << " took " << t.millis() << " msec";
- {
- // reschedule itself
- invariant(_executor);
- if (_isRemovedFromManager.load()) { // already removed so no need to refresh
- LOG(1) << "Stopping refresh for replica set " << getName() << " because its removed";
- return;
- }
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- std::weak_ptr<ReplicaSetMonitor> that(shared_from_this());
- auto status = _executor->scheduleWorkAt(_executor->now() + kRefreshPeriod,
- [=](const CallbackArgs& cbArgs) {
- if (auto ptr = that.lock()) {
- ptr->_refresh(cbArgs);
- }
- });
-
- if (status.getStatus() == ErrorCodes::ShutdownInProgress) {
- LOG(1) << "Cant schedule refresh for " << getName()
- << ". Executor shutdown in progress";
- return;
- }
+ // Reschedule the refresh
+ invariant(_executor);
- if (!status.isOK()) {
- severe() << "Can't continue refresh for replica set " << getName() << " due to "
- << redact(status.getStatus());
- fassertFailed(40140);
- }
+ if (_isRemovedFromManager.load()) { // already removed so no need to refresh
+ LOG(1) << "Stopping refresh for replica set " << getName() << " because its removed";
+ return;
+ }
+
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+
+ std::weak_ptr<ReplicaSetMonitor> that(shared_from_this());
+ auto status = _executor->scheduleWorkAt(_executor->now() + kRefreshPeriod,
+ [=](const CallbackArgs& cbArgs) {
+ if (auto ptr = that.lock()) {
+ ptr->_refresh(cbArgs);
+ }
+ });
- _refresherHandle = status.getValue();
+ if (status.getStatus() == ErrorCodes::ShutdownInProgress) {
+ LOG(1) << "Cant schedule refresh for " << getName() << ". Executor shutdown in progress";
+ return;
}
+
+ if (!status.isOK()) {
+ severe() << "Can't continue refresh for replica set " << getName() << " due to "
+ << redact(status.getStatus());
+ fassertFailed(40140);
+ }
+
+ _refresherHandle = status.getValue();
}
StatusWith<HostAndPort> ReplicaSetMonitor::getHostOrRefresh(const ReadPreferenceSetting& criteria,
Milliseconds maxWait) {
if (_isRemovedFromManager.load()) {
- return Status(ErrorCodes::ReplicaSetMonitorRemoved,
- str::stream() << "ReplicaSetMonitor for set " << getName() << " is removed");
+ return {ErrorCodes::ReplicaSetMonitorRemoved,
+ str::stream() << "ReplicaSetMonitor for set " << getName() << " is removed"};
}
{
@@ -272,7 +272,7 @@ StatusWith<HostAndPort> ReplicaSetMonitor::getHostOrRefresh(const ReadPreference
stdx::lock_guard<stdx::mutex> lk(_state->mutex);
HostAndPort out = _state->getMatchingHost(criteria);
if (!out.empty())
- return out;
+ return {std::move(out)};
}
const auto startTimeMs = Date_t::now();
@@ -286,7 +286,11 @@ StatusWith<HostAndPort> ReplicaSetMonitor::getHostOrRefresh(const ReadPreference
HostAndPort out = refresher.refreshUntilMatches(criteria);
if (!out.empty())
- return out;
+ return {std::move(out)};
+
+ if (globalInShutdownDeprecated()) {
+ return {ErrorCodes::ShutdownInProgress, str::stream() << "Server is shutting down"};
+ }
const Milliseconds remaining = maxWait - (Date_t::now() - startTimeMs);
@@ -298,11 +302,10 @@ StatusWith<HostAndPort> ReplicaSetMonitor::getHostOrRefresh(const ReadPreference
sleepFor(kFindHostMaxBackOffTime);
}
- return Status(ErrorCodes::FailedToSatisfyReadPreference,
- str::stream() << "could not find host matching read preference "
- << criteria.toString()
- << " for set "
- << getName());
+ return {ErrorCodes::FailedToSatisfyReadPreference,
+ str::stream() << "Could not find host matching read preference " << criteria.toString()
+ << " for set "
+ << getName()};
}
HostAndPort ReplicaSetMonitor::getMasterOrUassert() {
@@ -468,15 +471,21 @@ void ReplicaSetMonitor::markAsRemoved() {
_isRemovedFromManager.store(true);
}
-Refresher::Refresher(const SetStatePtr& setState)
- : _set(setState), _scan(setState->currentScan), _startedNewScan(false) {
+Refresher::Refresher(const SetStatePtr& setState) : _set(setState), _scan(setState->currentScan) {
if (_scan)
return; // participate in in-progress scan
LOG(2) << "Starting new refresh of replica set " << _set->name;
_scan = startNewScan(_set.get());
_set->currentScan = _scan;
- _startedNewScan = true;
+}
+
+HostAndPort Refresher::refreshUntilMatches(const ReadPreferenceSetting& criteria) {
+ return _refreshUntilMatches(&criteria);
+};
+
+void Refresher::refreshAll() {
+ _refreshUntilMatches(nullptr);
}
Refresher::NextStep Refresher::getNextStep() {
diff --git a/src/mongo/client/replica_set_monitor.h b/src/mongo/client/replica_set_monitor.h
index 93394f2386d..f770d5ce9be 100644
--- a/src/mongo/client/replica_set_monitor.h
+++ b/src/mongo/client/replica_set_monitor.h
@@ -296,9 +296,7 @@ public:
*
* This is called by ReplicaSetMonitor::getHostWithRefresh()
*/
- HostAndPort refreshUntilMatches(const ReadPreferenceSetting& criteria) {
- return _refreshUntilMatches(&criteria);
- };
+ HostAndPort refreshUntilMatches(const ReadPreferenceSetting& criteria);
/**
* Refresh all hosts. Equivalent to refreshUntilMatches with a criteria that never
@@ -306,9 +304,7 @@ public:
*
* This is intended to be called periodically, possibly from a background thread.
*/
- void refreshAll() {
- _refreshUntilMatches(NULL);
- }
+ void refreshAll();
//
// Remaining methods are only for testing and internal use.
@@ -356,13 +352,6 @@ public:
void failedHost(const HostAndPort& host, const Status& status);
/**
- * True if this Refresher started a new full scan rather than joining an existing one.
- */
- bool startedNewScan() const {
- return _startedNewScan;
- }
-
- /**
* Starts a new scan over the hosts in set.
*/
static ScanStatePtr startNewScan(const SetState* set);
@@ -398,7 +387,6 @@ private:
// Both pointers are never NULL
SetStatePtr _set;
ScanStatePtr _scan; // May differ from _set->currentScan if a new scan has started.
- bool _startedNewScan;
};
} // namespace mongo