From b360179b6871b4ffac2c1995b7edfa104b67ab18 Mon Sep 17 00:00:00 2001 From: Kaloian Manassiev Date: Fri, 29 Sep 2017 15:53:57 -0400 Subject: SERVER-31333 Make ReplicaSetMonitor return ShutdownInProgress if the server is shutting down --- src/mongo/client/connpool.cpp | 4 +- src/mongo/client/replica_set_monitor.cpp | 87 ++++++++++++++++++-------------- src/mongo/client/replica_set_monitor.h | 16 +----- 3 files changed, 53 insertions(+), 54 deletions(-) (limited to 'src/mongo/client') diff --git a/src/mongo/client/connpool.cpp b/src/mongo/client/connpool.cpp index 72b6d0b5c84..30325079fab 100644 --- a/src/mongo/client/connpool.cpp +++ b/src/mongo/client/connpool.cpp @@ -196,7 +196,9 @@ DBConnectionPool::DBConnectionPool() _hooks(new list()) {} DBClientBase* DBConnectionPool::_get(const string& ident, double socketTimeout) { - uassert(17382, "Can't use connection pool during shutdown", !globalInShutdownDeprecated()); + uassert(ErrorCodes::ShutdownInProgress, + "Can't use connection pool during shutdown", + !globalInShutdownDeprecated()); stdx::lock_guard L(_mutex); PoolForHost& p = _pools[PoolKey(ident, socketTimeout)]; p.setMaxPoolSize(_maxPoolSize); diff --git a/src/mongo/client/replica_set_monitor.cpp b/src/mongo/client/replica_set_monitor.cpp index ee4dbe0657b..d6fefc3f492 100644 --- a/src/mongo/client/replica_set_monitor.cpp +++ b/src/mongo/client/replica_set_monitor.cpp @@ -227,44 +227,44 @@ void ReplicaSetMonitor::_refresh(const CallbackArgs& cbArgs) { Timer t; startOrContinueRefresh().refreshAll(); LOG(1) << "Refreshing replica set " << getName() << " took " << t.millis() << " msec"; - { - // reschedule itself - invariant(_executor); - if (_isRemovedFromManager.load()) { // already removed so no need to refresh - LOG(1) << "Stopping refresh for replica set " << getName() << " because its removed"; - return; - } - stdx::lock_guard lk(_mutex); - std::weak_ptr that(shared_from_this()); - auto status = _executor->scheduleWorkAt(_executor->now() + kRefreshPeriod, - [=](const CallbackArgs& cbArgs) { - if (auto ptr = that.lock()) { - ptr->_refresh(cbArgs); - } - }); - - if (status.getStatus() == ErrorCodes::ShutdownInProgress) { - LOG(1) << "Cant schedule refresh for " << getName() - << ". Executor shutdown in progress"; - return; - } + // Reschedule the refresh + invariant(_executor); - if (!status.isOK()) { - severe() << "Can't continue refresh for replica set " << getName() << " due to " - << redact(status.getStatus()); - fassertFailed(40140); - } + if (_isRemovedFromManager.load()) { // already removed so no need to refresh + LOG(1) << "Stopping refresh for replica set " << getName() << " because its removed"; + return; + } + + stdx::lock_guard lk(_mutex); + + std::weak_ptr that(shared_from_this()); + auto status = _executor->scheduleWorkAt(_executor->now() + kRefreshPeriod, + [=](const CallbackArgs& cbArgs) { + if (auto ptr = that.lock()) { + ptr->_refresh(cbArgs); + } + }); - _refresherHandle = status.getValue(); + if (status.getStatus() == ErrorCodes::ShutdownInProgress) { + LOG(1) << "Cant schedule refresh for " << getName() << ". Executor shutdown in progress"; + return; } + + if (!status.isOK()) { + severe() << "Can't continue refresh for replica set " << getName() << " due to " + << redact(status.getStatus()); + fassertFailed(40140); + } + + _refresherHandle = status.getValue(); } StatusWith ReplicaSetMonitor::getHostOrRefresh(const ReadPreferenceSetting& criteria, Milliseconds maxWait) { if (_isRemovedFromManager.load()) { - return Status(ErrorCodes::ReplicaSetMonitorRemoved, - str::stream() << "ReplicaSetMonitor for set " << getName() << " is removed"); + return {ErrorCodes::ReplicaSetMonitorRemoved, + str::stream() << "ReplicaSetMonitor for set " << getName() << " is removed"}; } { @@ -272,7 +272,7 @@ StatusWith ReplicaSetMonitor::getHostOrRefresh(const ReadPreference stdx::lock_guard lk(_state->mutex); HostAndPort out = _state->getMatchingHost(criteria); if (!out.empty()) - return out; + return {std::move(out)}; } const auto startTimeMs = Date_t::now(); @@ -286,7 +286,11 @@ StatusWith ReplicaSetMonitor::getHostOrRefresh(const ReadPreference HostAndPort out = refresher.refreshUntilMatches(criteria); if (!out.empty()) - return out; + return {std::move(out)}; + + if (globalInShutdownDeprecated()) { + return {ErrorCodes::ShutdownInProgress, str::stream() << "Server is shutting down"}; + } const Milliseconds remaining = maxWait - (Date_t::now() - startTimeMs); @@ -298,11 +302,10 @@ StatusWith ReplicaSetMonitor::getHostOrRefresh(const ReadPreference sleepFor(kFindHostMaxBackOffTime); } - return Status(ErrorCodes::FailedToSatisfyReadPreference, - str::stream() << "could not find host matching read preference " - << criteria.toString() - << " for set " - << getName()); + return {ErrorCodes::FailedToSatisfyReadPreference, + str::stream() << "Could not find host matching read preference " << criteria.toString() + << " for set " + << getName()}; } HostAndPort ReplicaSetMonitor::getMasterOrUassert() { @@ -468,15 +471,21 @@ void ReplicaSetMonitor::markAsRemoved() { _isRemovedFromManager.store(true); } -Refresher::Refresher(const SetStatePtr& setState) - : _set(setState), _scan(setState->currentScan), _startedNewScan(false) { +Refresher::Refresher(const SetStatePtr& setState) : _set(setState), _scan(setState->currentScan) { if (_scan) return; // participate in in-progress scan LOG(2) << "Starting new refresh of replica set " << _set->name; _scan = startNewScan(_set.get()); _set->currentScan = _scan; - _startedNewScan = true; +} + +HostAndPort Refresher::refreshUntilMatches(const ReadPreferenceSetting& criteria) { + return _refreshUntilMatches(&criteria); +}; + +void Refresher::refreshAll() { + _refreshUntilMatches(nullptr); } Refresher::NextStep Refresher::getNextStep() { diff --git a/src/mongo/client/replica_set_monitor.h b/src/mongo/client/replica_set_monitor.h index 93394f2386d..f770d5ce9be 100644 --- a/src/mongo/client/replica_set_monitor.h +++ b/src/mongo/client/replica_set_monitor.h @@ -296,9 +296,7 @@ public: * * This is called by ReplicaSetMonitor::getHostWithRefresh() */ - HostAndPort refreshUntilMatches(const ReadPreferenceSetting& criteria) { - return _refreshUntilMatches(&criteria); - }; + HostAndPort refreshUntilMatches(const ReadPreferenceSetting& criteria); /** * Refresh all hosts. Equivalent to refreshUntilMatches with a criteria that never @@ -306,9 +304,7 @@ public: * * This is intended to be called periodically, possibly from a background thread. */ - void refreshAll() { - _refreshUntilMatches(NULL); - } + void refreshAll(); // // Remaining methods are only for testing and internal use. @@ -355,13 +351,6 @@ public: */ void failedHost(const HostAndPort& host, const Status& status); - /** - * True if this Refresher started a new full scan rather than joining an existing one. - */ - bool startedNewScan() const { - return _startedNewScan; - } - /** * Starts a new scan over the hosts in set. */ @@ -398,7 +387,6 @@ private: // Both pointers are never NULL SetStatePtr _set; ScanStatePtr _scan; // May differ from _set->currentScan if a new scan has started. - bool _startedNewScan; }; } // namespace mongo -- cgit v1.2.1