/** * Copyright (C) 2017 MongoDB Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License, version 3, * as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . * * As a special exception, the copyright holders give permission to link the * code of portions of this program with the OpenSSL library under certain * conditions as described in each individual source file and distribute * linked combinations including the program with the OpenSSL library. You * must comply with the GNU Affero General Public License in all respects for * all of the code used other than as permitted herein. If you modify file(s) * with this exception, you may extend this exception to your version of the * file(s), but you are not obligated to do so. If you do not wish to do so, * delete this exception statement from your version. If you delete this * exception statement from all source files in the program, then also delete * it in the license file. */ #define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kDefault #include "mongo/platform/basic.h" #include "mongo/db/keys_collection_manager_sharding.h" #include "mongo/db/keys_collection_cache_reader.h" #include "mongo/db/keys_collection_cache_reader_and_updater.h" #include "mongo/db/keys_collection_client.h" #include "mongo/db/logical_clock.h" #include "mongo/db/logical_time.h" #include "mongo/db/operation_context.h" #include "mongo/db/server_options.h" #include "mongo/db/service_context.h" #include "mongo/stdx/memory.h" #include "mongo/util/concurrency/idle_thread_block.h" #include "mongo/util/fail_point_service.h" #include "mongo/util/log.h" #include "mongo/util/mongoutils/str.h" #include "mongo/util/time_support.h" namespace mongo { namespace { Milliseconds kDefaultRefreshWaitTime(30 * 1000); Milliseconds kRefreshIntervalIfErrored(200); Milliseconds kMaxRefreshWaitTime(10 * 60 * 1000); // Prevents the refresher thread from waiting longer than the given number of milliseconds, even on // a successful refresh. MONGO_FP_DECLARE(maxKeyRefreshWaitTimeOverrideMS); /** * Returns the amount of time to wait until the monitoring thread should attempt to refresh again. */ Milliseconds howMuchSleepNeedFor(const LogicalTime& currentTime, const LogicalTime& latestExpiredAt, const Milliseconds& interval) { auto currentSecs = currentTime.asTimestamp().getSecs(); auto expiredSecs = latestExpiredAt.asTimestamp().getSecs(); if (currentSecs >= expiredSecs) { // This means that the last round didn't generate a usable key for the current time. // However, we don't want to poll too hard as well, so use a low interval. return kRefreshIntervalIfErrored; } auto millisBeforeExpire = 1000 * (expiredSecs - currentSecs); if (interval.count() <= millisBeforeExpire) { return interval; } return Milliseconds(millisBeforeExpire); } } // unnamed namespace KeysCollectionManagerSharding::KeysCollectionManagerSharding( std::string purpose, std::unique_ptr client, Seconds keyValidForInterval) : _client(std::move(client)), _purpose(std::move(purpose)), _keyValidForInterval(keyValidForInterval), _keysCache(_purpose, _client.get()) {} StatusWith KeysCollectionManagerSharding::getKeyForValidation( OperationContext* opCtx, long long keyId, const LogicalTime& forThisTime) { auto keyStatus = _getKeyWithKeyIdCheck(keyId, forThisTime); if (keyStatus != ErrorCodes::KeyNotFound) { return keyStatus; } _refresher.refreshNow(opCtx); return _getKeyWithKeyIdCheck(keyId, forThisTime); } StatusWith KeysCollectionManagerSharding::getKeyForSigning( OperationContext* opCtx, const LogicalTime& forThisTime) { return _getKey(forThisTime); } StatusWith KeysCollectionManagerSharding::_getKeyWithKeyIdCheck( long long keyId, const LogicalTime& forThisTime) { auto keyStatus = _keysCache.getKeyById(keyId, forThisTime); if (!keyStatus.isOK()) { return keyStatus; } return keyStatus.getValue(); } StatusWith KeysCollectionManagerSharding::_getKey( const LogicalTime& forThisTime) { auto keyStatus = _keysCache.getKey(forThisTime); if (!keyStatus.isOK()) { return keyStatus; } const auto& key = keyStatus.getValue(); if (key.getExpiresAt() < forThisTime) { return {ErrorCodes::KeyNotFound, str::stream() << "No keys found for " << _purpose << " that is valid for " << forThisTime.toString()}; } return key; } void KeysCollectionManagerSharding::refreshNow(OperationContext* opCtx) { _refresher.refreshNow(opCtx); } void KeysCollectionManagerSharding::startMonitoring(ServiceContext* service) { _keysCache.resetCache(); _refresher.setFunc([this](OperationContext* opCtx) { return _keysCache.refresh(opCtx); }); _refresher.start( service, str::stream() << "monitoring keys for " << _purpose, _keyValidForInterval); } void KeysCollectionManagerSharding::stopMonitoring() { _refresher.stop(); } void KeysCollectionManagerSharding::enableKeyGenerator(OperationContext* opCtx, bool doEnable) { if (doEnable) { _refresher.switchFunc(opCtx, [this](OperationContext* opCtx) { KeysCollectionCacheReaderAndUpdater keyGenerator( _purpose, _client.get(), _keyValidForInterval); auto keyGenerationStatus = keyGenerator.refresh(opCtx); if (ErrorCodes::isShutdownError(keyGenerationStatus.getStatus().code())) { return keyGenerationStatus; } // An error encountered by the keyGenerator should not prevent refreshing the cache auto cacheRefreshStatus = _keysCache.refresh(opCtx); if (!keyGenerationStatus.isOK()) { return keyGenerationStatus; } return cacheRefreshStatus; }); } else { _refresher.switchFunc( opCtx, [this](OperationContext* opCtx) { return _keysCache.refresh(opCtx); }); } } bool KeysCollectionManagerSharding::hasSeenKeys() { return _refresher.hasSeenKeys(); } void KeysCollectionManagerSharding::PeriodicRunner::refreshNow(OperationContext* opCtx) { auto refreshRequest = [this]() { stdx::lock_guard lk(_mutex); if (_inShutdown) { uasserted(ErrorCodes::ShutdownInProgress, "aborting keys cache refresh because node is shutting down"); } if (_refreshRequest) { return _refreshRequest; } _refreshNeededCV.notify_all(); _refreshRequest = std::make_shared>(); return _refreshRequest; }(); // note: waitFor waits min(maxTimeMS, kDefaultRefreshWaitTime). // waitFor also throws if timeout, so also throw when notification was not satisfied after // waiting. if (!refreshRequest->waitFor(opCtx, kDefaultRefreshWaitTime)) { uasserted(ErrorCodes::ExceededTimeLimit, "timed out waiting for refresh"); } } void KeysCollectionManagerSharding::PeriodicRunner::_doPeriodicRefresh( ServiceContext* service, std::string threadName, Milliseconds refreshInterval) { Client::initThreadIfNotAlready(threadName); while (true) { bool hasRefreshRequestInitially = false; unsigned errorCount = 0; std::shared_ptr doRefresh; { stdx::lock_guard lock(_mutex); if (_inShutdown) { break; } invariant(_doRefresh.get() != nullptr); doRefresh = _doRefresh; hasRefreshRequestInitially = _refreshRequest.get() != nullptr; } Milliseconds nextWakeup = kRefreshIntervalIfErrored; // No need to refresh keys in FCV 3.4, since key generation will be disabled. if (serverGlobalParams.featureCompatibility.getVersion() == ServerGlobalParams::FeatureCompatibility::Version::kFullyUpgradedTo36) { auto opCtx = cc().makeOperationContext(); auto latestKeyStatusWith = (*doRefresh)(opCtx.get()); if (latestKeyStatusWith.getStatus().isOK()) { errorCount = 0; const auto& latestKey = latestKeyStatusWith.getValue(); auto currentTime = LogicalClock::get(service)->getClusterTime(); { stdx::unique_lock lock(_mutex); _hasSeenKeys = true; } nextWakeup = howMuchSleepNeedFor(currentTime, latestKey.getExpiresAt(), refreshInterval); } else { errorCount += 1; nextWakeup = Milliseconds(kRefreshIntervalIfErrored.count() * errorCount); if (nextWakeup > kMaxRefreshWaitTime) { nextWakeup = kMaxRefreshWaitTime; } } } else { nextWakeup = kDefaultRefreshWaitTime; } MONGO_FAIL_POINT_BLOCK(maxKeyRefreshWaitTimeOverrideMS, data) { const BSONObj& dataObj = data.getData(); auto overrideMS = Milliseconds(dataObj["overrideMS"].numberInt()); if (nextWakeup > overrideMS) { nextWakeup = overrideMS; } } stdx::unique_lock lock(_mutex); if (_refreshRequest) { if (!hasRefreshRequestInitially) { // A fresh request came in, fulfill the request before going to sleep. continue; } _refreshRequest->set(); _refreshRequest.reset(); } if (_inShutdown) { break; } // Use a new opCtx so we won't be holding any RecoveryUnit while this thread goes to sleep. auto opCtx = cc().makeOperationContext(); MONGO_IDLE_THREAD_BLOCK; auto sleepStatus = opCtx->waitForConditionOrInterruptNoAssertUntil( _refreshNeededCV, lock, Date_t::now() + nextWakeup); if (ErrorCodes::isShutdownError(sleepStatus.getStatus().code())) { break; } } stdx::unique_lock lock(_mutex); if (_refreshRequest) { _refreshRequest->set(); _refreshRequest.reset(); } } void KeysCollectionManagerSharding::PeriodicRunner::setFunc(RefreshFunc newRefreshStrategy) { stdx::lock_guard lock(_mutex); _doRefresh = std::make_shared(std::move(newRefreshStrategy)); _refreshNeededCV.notify_all(); } void KeysCollectionManagerSharding::PeriodicRunner::switchFunc(OperationContext* opCtx, RefreshFunc newRefreshStrategy) { setFunc(newRefreshStrategy); } void KeysCollectionManagerSharding::PeriodicRunner::start(ServiceContext* service, const std::string& threadName, Milliseconds refreshInterval) { stdx::lock_guard lock(_mutex); invariant(!_backgroundThread.joinable()); invariant(!_inShutdown); _backgroundThread = stdx::thread(stdx::bind(&KeysCollectionManagerSharding::PeriodicRunner::_doPeriodicRefresh, this, service, threadName, refreshInterval)); } void KeysCollectionManagerSharding::PeriodicRunner::stop() { { stdx::lock_guard lock(_mutex); if (!_backgroundThread.joinable()) { return; } _inShutdown = true; _hasSeenKeys = false; _refreshNeededCV.notify_all(); } _backgroundThread.join(); } bool KeysCollectionManagerSharding::PeriodicRunner::hasSeenKeys() { stdx::lock_guard lock(_mutex); return _hasSeenKeys; } void KeysCollectionManagerSharding::clearCache() { _keysCache.resetCache(); } } // namespace mongo