diff options
author | Kaitlin Mahar <kaitlin.mahar@mongodb.com> | 2017-12-07 14:56:25 -0500 |
---|---|---|
committer | Kaitlin Mahar <kaitlin.mahar@mongodb.com> | 2017-12-13 11:41:02 -0500 |
commit | 65c64a7845ca8159028148b25d3ec1867158108d (patch) | |
tree | dc8f4abd70848187b906d9b76bcad4b15faaf05b /src/mongo/db/keys_collection_manager.cpp | |
parent | 33bf1580977524b1beb84f9bcedd51547c68aeb4 (diff) | |
download | mongo-65c64a7845ca8159028148b25d3ec1867158108d.tar.gz |
SERVER-31316 Remove unused code and unnecessary inheritance in KeysCollectionManager* classes
Diffstat (limited to 'src/mongo/db/keys_collection_manager.cpp')
-rw-r--r-- | src/mongo/db/keys_collection_manager.cpp | 315 |
1 files changed, 314 insertions, 1 deletions
diff --git a/src/mongo/db/keys_collection_manager.cpp b/src/mongo/db/keys_collection_manager.cpp index 1ef6a8bb673..24fb4a8b6da 100644 --- a/src/mongo/db/keys_collection_manager.cpp +++ b/src/mongo/db/keys_collection_manager.cpp @@ -25,12 +25,27 @@ * exception statement from all source files in the program, then also delete * it in the license file. */ +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kDefault #include "mongo/platform/basic.h" #include "mongo/db/keys_collection_manager.h" +#include "mongo/db/keys_collection_cache_reader.h" +#include "mongo/db/keys_collection_cache_reader_and_updater.h" +#include "mongo/db/keys_collection_client.h" +#include "mongo/db/logical_clock.h" +#include "mongo/db/logical_time.h" +#include "mongo/db/operation_context.h" +#include "mongo/db/server_options.h" #include "mongo/db/server_parameters.h" +#include "mongo/db/service_context.h" +#include "mongo/stdx/memory.h" +#include "mongo/util/concurrency/idle_thread_block.h" +#include "mongo/util/fail_point_service.h" +#include "mongo/util/log.h" +#include "mongo/util/mongoutils/str.h" +#include "mongo/util/time_support.h" namespace mongo { @@ -41,6 +56,304 @@ MONGO_EXPORT_STARTUP_SERVER_PARAMETER(KeysRotationIntervalSec, int, KeysCollectionManager::kKeyValidInterval.count()); -KeysCollectionManager::~KeysCollectionManager() = default; +namespace { + +Milliseconds kDefaultRefreshWaitTime(30 * 1000); +Milliseconds kRefreshIntervalIfErrored(200); +Milliseconds kMaxRefreshWaitTime(10 * 60 * 1000); + +// Prevents the refresher thread from waiting longer than the given number of milliseconds, even on +// a successful refresh. +MONGO_FP_DECLARE(maxKeyRefreshWaitTimeOverrideMS); + +/** + * Returns the amount of time to wait until the monitoring thread should attempt to refresh again. + */ +Milliseconds howMuchSleepNeedFor(const LogicalTime& currentTime, + const LogicalTime& latestExpiredAt, + const Milliseconds& interval) { + auto currentSecs = currentTime.asTimestamp().getSecs(); + auto expiredSecs = latestExpiredAt.asTimestamp().getSecs(); + + if (currentSecs >= expiredSecs) { + // This means that the last round didn't generate a usable key for the current time. + // However, we don't want to poll too hard as well, so use a low interval. + return kRefreshIntervalIfErrored; + } + + auto millisBeforeExpire = 1000 * (expiredSecs - currentSecs); + + if (interval.count() <= millisBeforeExpire) { + return interval; + } + + return Milliseconds(millisBeforeExpire); +} + +} // unnamed namespace + +KeysCollectionManager::KeysCollectionManager(std::string purpose, + std::unique_ptr<KeysCollectionClient> client, + Seconds keyValidForInterval) + : _client(std::move(client)), + _purpose(std::move(purpose)), + _keyValidForInterval(keyValidForInterval), + _keysCache(_purpose, _client.get()) {} + + +StatusWith<KeysCollectionDocument> KeysCollectionManager::getKeyForValidation( + OperationContext* opCtx, long long keyId, const LogicalTime& forThisTime) { + auto keyStatus = _getKeyWithKeyIdCheck(keyId, forThisTime); + + if (keyStatus != ErrorCodes::KeyNotFound) { + return keyStatus; + } + + _refresher.refreshNow(opCtx); + + return _getKeyWithKeyIdCheck(keyId, forThisTime); +} + +StatusWith<KeysCollectionDocument> KeysCollectionManager::getKeyForSigning( + OperationContext* opCtx, const LogicalTime& forThisTime) { + return _getKey(forThisTime); +} + +StatusWith<KeysCollectionDocument> KeysCollectionManager::_getKeyWithKeyIdCheck( + long long keyId, const LogicalTime& forThisTime) { + auto keyStatus = _keysCache.getKeyById(keyId, forThisTime); + + if (!keyStatus.isOK()) { + return keyStatus; + } + + return keyStatus.getValue(); +} + +StatusWith<KeysCollectionDocument> KeysCollectionManager::_getKey(const LogicalTime& forThisTime) { + auto keyStatus = _keysCache.getKey(forThisTime); + + if (!keyStatus.isOK()) { + return keyStatus; + } + + const auto& key = keyStatus.getValue(); + + if (key.getExpiresAt() < forThisTime) { + return {ErrorCodes::KeyNotFound, + str::stream() << "No keys found for " << _purpose << " that is valid for " + << forThisTime.toString()}; + } + + return key; +} + +void KeysCollectionManager::refreshNow(OperationContext* opCtx) { + _refresher.refreshNow(opCtx); +} + +void KeysCollectionManager::startMonitoring(ServiceContext* service) { + _keysCache.resetCache(); + _refresher.setFunc([this](OperationContext* opCtx) { return _keysCache.refresh(opCtx); }); + _refresher.start( + service, str::stream() << "monitoring keys for " << _purpose, _keyValidForInterval); +} + +void KeysCollectionManager::stopMonitoring() { + _refresher.stop(); +} + +void KeysCollectionManager::enableKeyGenerator(OperationContext* opCtx, bool doEnable) { + if (doEnable) { + _refresher.switchFunc(opCtx, [this](OperationContext* opCtx) { + KeysCollectionCacheReaderAndUpdater keyGenerator( + _purpose, _client.get(), _keyValidForInterval); + auto keyGenerationStatus = keyGenerator.refresh(opCtx); + + if (ErrorCodes::isShutdownError(keyGenerationStatus.getStatus().code())) { + return keyGenerationStatus; + } + + // An error encountered by the keyGenerator should not prevent refreshing the cache + auto cacheRefreshStatus = _keysCache.refresh(opCtx); + + if (!keyGenerationStatus.isOK()) { + return keyGenerationStatus; + } + + return cacheRefreshStatus; + }); + } else { + _refresher.switchFunc( + opCtx, [this](OperationContext* opCtx) { return _keysCache.refresh(opCtx); }); + } +} + +bool KeysCollectionManager::hasSeenKeys() { + return _refresher.hasSeenKeys(); +} + +void KeysCollectionManager::PeriodicRunner::refreshNow(OperationContext* opCtx) { + auto refreshRequest = [this]() { + stdx::lock_guard<stdx::mutex> lk(_mutex); + + if (_inShutdown) { + uasserted(ErrorCodes::ShutdownInProgress, + "aborting keys cache refresh because node is shutting down"); + } + + if (_refreshRequest) { + return _refreshRequest; + } + + _refreshNeededCV.notify_all(); + _refreshRequest = std::make_shared<Notification<void>>(); + return _refreshRequest; + }(); + + // note: waitFor waits min(maxTimeMS, kDefaultRefreshWaitTime). + // waitFor also throws if timeout, so also throw when notification was not satisfied after + // waiting. + if (!refreshRequest->waitFor(opCtx, kDefaultRefreshWaitTime)) { + uasserted(ErrorCodes::ExceededTimeLimit, "timed out waiting for refresh"); + } +} + +void KeysCollectionManager::PeriodicRunner::_doPeriodicRefresh(ServiceContext* service, + std::string threadName, + Milliseconds refreshInterval) { + Client::initThreadIfNotAlready(threadName); + + while (true) { + bool hasRefreshRequestInitially = false; + unsigned errorCount = 0; + std::shared_ptr<RefreshFunc> doRefresh; + { + stdx::lock_guard<stdx::mutex> lock(_mutex); + + if (_inShutdown) { + break; + } + + invariant(_doRefresh.get() != nullptr); + doRefresh = _doRefresh; + hasRefreshRequestInitially = _refreshRequest.get() != nullptr; + } + + Milliseconds nextWakeup = kRefreshIntervalIfErrored; + + // No need to refresh keys in FCV 3.4, since key generation will be disabled. + if (serverGlobalParams.featureCompatibility.getVersion() == + ServerGlobalParams::FeatureCompatibility::Version::kFullyUpgradedTo36) { + auto opCtx = cc().makeOperationContext(); + + auto latestKeyStatusWith = (*doRefresh)(opCtx.get()); + if (latestKeyStatusWith.getStatus().isOK()) { + errorCount = 0; + const auto& latestKey = latestKeyStatusWith.getValue(); + auto currentTime = LogicalClock::get(service)->getClusterTime(); + + { + stdx::unique_lock<stdx::mutex> lock(_mutex); + _hasSeenKeys = true; + } + + nextWakeup = + howMuchSleepNeedFor(currentTime, latestKey.getExpiresAt(), refreshInterval); + } else { + errorCount += 1; + nextWakeup = Milliseconds(kRefreshIntervalIfErrored.count() * errorCount); + if (nextWakeup > kMaxRefreshWaitTime) { + nextWakeup = kMaxRefreshWaitTime; + } + } + } else { + nextWakeup = kDefaultRefreshWaitTime; + } + + MONGO_FAIL_POINT_BLOCK(maxKeyRefreshWaitTimeOverrideMS, data) { + const BSONObj& dataObj = data.getData(); + auto overrideMS = Milliseconds(dataObj["overrideMS"].numberInt()); + if (nextWakeup > overrideMS) { + nextWakeup = overrideMS; + } + } + + stdx::unique_lock<stdx::mutex> lock(_mutex); + + if (_refreshRequest) { + if (!hasRefreshRequestInitially) { + // A fresh request came in, fulfill the request before going to sleep. + continue; + } + + _refreshRequest->set(); + _refreshRequest.reset(); + } + + if (_inShutdown) { + break; + } + + // Use a new opCtx so we won't be holding any RecoveryUnit while this thread goes to sleep. + auto opCtx = cc().makeOperationContext(); + + MONGO_IDLE_THREAD_BLOCK; + auto sleepStatus = opCtx->waitForConditionOrInterruptNoAssertUntil( + _refreshNeededCV, lock, Date_t::now() + nextWakeup); + + if (ErrorCodes::isShutdownError(sleepStatus.getStatus().code())) { + break; + } + } + + stdx::unique_lock<stdx::mutex> lock(_mutex); + if (_refreshRequest) { + _refreshRequest->set(); + _refreshRequest.reset(); + } +} + +void KeysCollectionManager::PeriodicRunner::setFunc(RefreshFunc newRefreshStrategy) { + stdx::lock_guard<stdx::mutex> lock(_mutex); + _doRefresh = std::make_shared<RefreshFunc>(std::move(newRefreshStrategy)); + _refreshNeededCV.notify_all(); +} + +void KeysCollectionManager::PeriodicRunner::switchFunc(OperationContext* opCtx, + RefreshFunc newRefreshStrategy) { + setFunc(newRefreshStrategy); +} + +void KeysCollectionManager::PeriodicRunner::start(ServiceContext* service, + const std::string& threadName, + Milliseconds refreshInterval) { + stdx::lock_guard<stdx::mutex> lock(_mutex); + invariant(!_backgroundThread.joinable()); + invariant(!_inShutdown); + + _backgroundThread = stdx::thread([this, service, threadName, refreshInterval] { + _doPeriodicRefresh(service, threadName, refreshInterval); + }); +} + +void KeysCollectionManager::PeriodicRunner::stop() { + { + stdx::lock_guard<stdx::mutex> lock(_mutex); + if (!_backgroundThread.joinable()) { + return; + } + + _inShutdown = true; + _refreshNeededCV.notify_all(); + } + + _backgroundThread.join(); +} + +bool KeysCollectionManager::PeriodicRunner::hasSeenKeys() { + stdx::lock_guard<stdx::mutex> lock(_mutex); + return _hasSeenKeys; +} } // namespace mongo |