diff options
author | Misha Tyulenev <misha@mongodb.com> | 2017-09-27 23:23:24 -0400 |
---|---|---|
committer | Misha Tyulenev <misha@mongodb.com> | 2017-09-27 23:23:46 -0400 |
commit | eeee1e2b64f70e8487f017ba579f3ca861c81e4f (patch) | |
tree | e1452828e142748f1f03be61a00c32dbb3ed6bc1 /src | |
parent | 55637833c707998f685f997d43624c52cde99b45 (diff) | |
download | mongo-eeee1e2b64f70e8487f017ba579f3ca861c81e4f.tar.gz |
SERVER-30977 add clusterTime to standalone replica set
Diffstat (limited to 'src')
32 files changed, 726 insertions, 154 deletions
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript index 815116e21a5..5ee0bc4cc7a 100644 --- a/src/mongo/db/SConscript +++ b/src/mongo/db/SConscript @@ -815,6 +815,7 @@ env.Library( "index/index_descriptor", "index_d", "introspect", + 'keys_collection_client_direct', "matcher/expressions_mongod_only", "op_observer_d", "write_ops", @@ -1222,6 +1223,7 @@ env.Library( 'keys_collection_manager.cpp', ], LIBDEPS=[ + 'server_parameters', ], ) @@ -1250,6 +1252,34 @@ env.Library( ) env.Library( + target='keys_collection_client_direct', + source=[ + 'keys_collection_client_direct.cpp', + ], + LIBDEPS=[ + 'logical_clock', + 'keys_collection_document', + 'logical_time', + 'server_options', + '$BUILD_DIR/mongo/s/client/rs_local_client', + ], +) + +env.Library( + target='keys_collection_client_sharded', + source=[ + 'keys_collection_client_sharded.cpp', + ], + LIBDEPS=[ + 'logical_clock', + 'keys_collection_document', + 'logical_time', + 'server_options', + '$BUILD_DIR/mongo/s/catalog/sharding_catalog_client', + ], +) + +env.Library( target='keys_collection_manager_sharding', source=[ 'keys_collection_manager_sharding.cpp', @@ -1262,7 +1292,7 @@ env.Library( 'keys_collection_manager', 'logical_time', 'server_options', - '$BUILD_DIR/mongo/s/catalog/sharding_catalog_client', + 'keys_collection_client_sharded', ], ) diff --git a/src/mongo/db/db.cpp b/src/mongo/db/db.cpp index b71537cb235..8f29f0605bc 100644 --- a/src/mongo/db/db.cpp +++ b/src/mongo/db/db.cpp @@ -73,7 +73,10 @@ #include "mongo/db/initialize_snmp.h" #include "mongo/db/introspect.h" #include "mongo/db/json.h" +#include "mongo/db/keys_collection_client_direct.h" +#include "mongo/db/keys_collection_client_sharded.h" #include "mongo/db/keys_collection_manager.h" +#include "mongo/db/keys_collection_manager_sharding.h" #include "mongo/db/kill_sessions.h" #include "mongo/db/kill_sessions_local.h" #include "mongo/db/log_process_details.h" @@ -736,10 +739,19 @@ ExitCode _initAndListen(int listenPort) { ShardingCatalogManager::create( startupOpCtx->getServiceContext(), makeShardingTaskExecutor(executor::makeNetworkInterface("AddShard-TaskExecutor"))); + } else if (replSettings.usingReplSets()) { // standalone replica set + auto keysCollectionClient = stdx::make_unique<KeysCollectionClientDirect>(); + auto keyManager = std::make_shared<KeysCollectionManagerSharding>( + KeysCollectionManager::kKeyManagerPurposeString, + std::move(keysCollectionClient), + Seconds(KeysRotationIntervalSec)); + keyManager->startMonitoring(startupOpCtx->getServiceContext()); + + LogicalTimeValidator::set(startupOpCtx->getServiceContext(), + stdx::make_unique<LogicalTimeValidator>(keyManager)); } repl::ReplicationCoordinator::get(startupOpCtx.get())->startup(startupOpCtx.get()); - const unsigned long long missingRepl = checkIfReplMissingFromCommandLine(startupOpCtx.get()); if (missingRepl) { diff --git a/src/mongo/db/keys_collection_cache_reader.cpp b/src/mongo/db/keys_collection_cache_reader.cpp index 4aab840d162..a05f5c23233 100644 --- a/src/mongo/db/keys_collection_cache_reader.cpp +++ b/src/mongo/db/keys_collection_cache_reader.cpp @@ -30,14 +30,15 @@ #include "mongo/db/keys_collection_cache_reader.h" -#include "mongo/s/catalog/sharding_catalog_client.h" +#include "mongo/db/keys_collection_client.h" +#include "mongo/db/keys_collection_document.h" #include "mongo/util/mongoutils/str.h" namespace mongo { KeysCollectionCacheReader::KeysCollectionCacheReader(std::string purpose, - ShardingCatalogClient* client) - : _purpose(std::move(purpose)), _catalogClient(client) {} + KeysCollectionClient* client) + : _purpose(std::move(purpose)), _client(client) {} StatusWith<KeysCollectionDocument> KeysCollectionCacheReader::refresh(OperationContext* opCtx) { LogicalTime newerThanThis; @@ -50,8 +51,7 @@ StatusWith<KeysCollectionDocument> KeysCollectionCacheReader::refresh(OperationC } } - auto refreshStatus = _catalogClient->getNewKeys( - opCtx, _purpose, newerThanThis, repl::ReadConcernLevel::kMajorityReadConcern); + auto refreshStatus = _client->getNewKeys(opCtx, _purpose, newerThanThis); if (!refreshStatus.isOK()) { return refreshStatus.getStatus(); @@ -82,7 +82,8 @@ StatusWith<KeysCollectionDocument> KeysCollectionCacheReader::getKeyById( } return {ErrorCodes::KeyNotFound, - str::stream() << "No keys found for " << _purpose << " that is valid for time: " + str::stream() << "Cache Reader No keys found for " << _purpose + << " that is valid for time: " << forThisTime.toString() << " with id: " << keyId}; @@ -102,4 +103,11 @@ StatusWith<KeysCollectionDocument> KeysCollectionCacheReader::getKey( return iter->second; } +void KeysCollectionCacheReader::resetCache() { + // keys that read with non majority readConcern level can be rolled back. + if (!_client->supportsMajorityReads()) { + _cache.clear(); + } +} + } // namespace mongo diff --git a/src/mongo/db/keys_collection_cache_reader.h b/src/mongo/db/keys_collection_cache_reader.h index 12a215bb1d5..038c24384c8 100644 --- a/src/mongo/db/keys_collection_cache_reader.h +++ b/src/mongo/db/keys_collection_cache_reader.h @@ -35,7 +35,7 @@ namespace mongo { -class ShardingCatalogClient; +class KeysCollectionClient; /** * Keeps a local cache of the keys with the ability to refresh. @@ -44,7 +44,7 @@ class ShardingCatalogClient; */ class KeysCollectionCacheReader : public KeysCollectionCache { public: - KeysCollectionCacheReader(std::string purpose, ShardingCatalogClient* client); + KeysCollectionCacheReader(std::string purpose, KeysCollectionClient* client); ~KeysCollectionCacheReader() = default; /** @@ -56,9 +56,15 @@ public: StatusWith<KeysCollectionDocument> getKeyById(long long keyId, const LogicalTime& forThisTime) override; + /** + * Resets the cache of keys if the client doesnt allow readConcern level:majority reads. + * This method intended to be called on the rollback of the node. + */ + void resetCache(); + private: const std::string _purpose; - ShardingCatalogClient* const _catalogClient; + KeysCollectionClient* const _client; stdx::mutex _cacheMutex; std::map<LogicalTime, KeysCollectionDocument> _cache; // expiresAt -> KeysDocument diff --git a/src/mongo/db/keys_collection_cache_reader_and_updater.cpp b/src/mongo/db/keys_collection_cache_reader_and_updater.cpp index b9ae9ca78ac..ce20c5d5993 100644 --- a/src/mongo/db/keys_collection_cache_reader_and_updater.cpp +++ b/src/mongo/db/keys_collection_cache_reader_and_updater.cpp @@ -31,10 +31,10 @@ #include "mongo/db/keys_collection_cache_reader_and_updater.h" #include "mongo/client/read_preference.h" +#include "mongo/db/keys_collection_client.h" #include "mongo/db/logical_clock.h" #include "mongo/db/operation_context.h" #include "mongo/db/server_options.h" -#include "mongo/s/catalog/sharding_catalog_client.h" #include "mongo/s/client/shard_registry.h" #include "mongo/util/fail_point_service.h" @@ -52,15 +52,12 @@ MONGO_FP_DECLARE(disableKeyGeneration); * locally and never remotely even if this node is no longer primary. */ Status insertNewKey(OperationContext* opCtx, - ShardingCatalogClient* client, + KeysCollectionClient* client, long long keyId, const std::string& purpose, const LogicalTime& expiresAt) { KeysCollectionDocument newKey(keyId, purpose, TimeProofService::generateRandomKey(), expiresAt); - return client->insertConfigDocument(opCtx, - KeysCollectionDocument::ConfigNS, - newKey.toBSON(), - ShardingCatalogClient::kMajorityWriteConcern); + return client->insertNewKey(opCtx, newKey.toBSON()); } /** @@ -73,11 +70,11 @@ LogicalTime addSeconds(const LogicalTime& logicalTime, const Seconds& seconds) { } // unnamed namespace KeysCollectionCacheReaderAndUpdater::KeysCollectionCacheReaderAndUpdater( - std::string purpose, ShardingCatalogClient* client, Seconds keyValidForInterval) + std::string purpose, KeysCollectionClient* client, Seconds keyValidForInterval) : KeysCollectionCacheReader(purpose, client), + _client(client), _purpose(std::move(purpose)), - _keyValidForInterval(keyValidForInterval), - _catalogClient(client) {} + _keyValidForInterval(keyValidForInterval) {} StatusWith<KeysCollectionDocument> KeysCollectionCacheReaderAndUpdater::refresh( OperationContext* opCtx) { @@ -92,8 +89,7 @@ StatusWith<KeysCollectionDocument> KeysCollectionCacheReaderAndUpdater::refresh( } auto currentTime = LogicalClock::get(opCtx)->getClusterTime(); - auto keyStatus = _catalogClient->getNewKeys( - opCtx, _purpose, currentTime, repl::ReadConcernLevel::kLocalReadConcern); + auto keyStatus = _client->getNewKeys(opCtx, _purpose, currentTime); if (!keyStatus.isOK()) { return keyStatus.getStatus(); @@ -108,7 +104,7 @@ StatusWith<KeysCollectionDocument> KeysCollectionCacheReaderAndUpdater::refresh( if (keyIter == newKeys.cend()) { currentKeyExpiresAt = addSeconds(currentTime, _keyValidForInterval); - auto status = insertNewKey(opCtx, _catalogClient, keyId, _purpose, currentKeyExpiresAt); + auto status = insertNewKey(opCtx, _client, keyId, _purpose, currentKeyExpiresAt); if (!status.isOK()) { return status; @@ -117,7 +113,7 @@ StatusWith<KeysCollectionDocument> KeysCollectionCacheReaderAndUpdater::refresh( keyId++; } else if (keyIter->getExpiresAt() < currentTime) { currentKeyExpiresAt = addSeconds(currentTime, _keyValidForInterval); - auto status = insertNewKey(opCtx, _catalogClient, keyId, _purpose, currentKeyExpiresAt); + auto status = insertNewKey(opCtx, _client, keyId, _purpose, currentKeyExpiresAt); if (!status.isOK()) { return status; @@ -135,14 +131,14 @@ StatusWith<KeysCollectionDocument> KeysCollectionCacheReaderAndUpdater::refresh( // Note: Convert this block into a loop if more reserved keys are desired. if (keyIter == newKeys.cend()) { auto reserveKeyExpiresAt = addSeconds(currentKeyExpiresAt, _keyValidForInterval); - auto status = insertNewKey(opCtx, _catalogClient, keyId, _purpose, reserveKeyExpiresAt); + auto status = insertNewKey(opCtx, _client, keyId, _purpose, reserveKeyExpiresAt); if (!status.isOK()) { return status; } } else if (keyIter->getExpiresAt() < currentTime) { currentKeyExpiresAt = addSeconds(currentKeyExpiresAt, _keyValidForInterval); - auto status = insertNewKey(opCtx, _catalogClient, keyId, _purpose, currentKeyExpiresAt); + auto status = insertNewKey(opCtx, _client, keyId, _purpose, currentKeyExpiresAt); if (!status.isOK()) { return status; diff --git a/src/mongo/db/keys_collection_cache_reader_and_updater.h b/src/mongo/db/keys_collection_cache_reader_and_updater.h index 4719f58c48b..fc079ae4334 100644 --- a/src/mongo/db/keys_collection_cache_reader_and_updater.h +++ b/src/mongo/db/keys_collection_cache_reader_and_updater.h @@ -35,7 +35,7 @@ namespace mongo { -class ShardingCatalogClient; +class KeysCollectionClient; /** * Keeps a local cache of the keys with the ability to refresh. The refresh method also makes sure @@ -49,7 +49,7 @@ class ShardingCatalogClient; class KeysCollectionCacheReaderAndUpdater : public KeysCollectionCacheReader { public: KeysCollectionCacheReaderAndUpdater(std::string purpose, - ShardingCatalogClient* client, + KeysCollectionClient* client, Seconds keyValidForInterval); ~KeysCollectionCacheReaderAndUpdater() = default; @@ -64,10 +64,9 @@ public: const LogicalTime& forThisTime) override; private: + KeysCollectionClient* const _client; const std::string _purpose; const Seconds _keyValidForInterval; - - ShardingCatalogClient* const _catalogClient; }; } // namespace mongo diff --git a/src/mongo/db/keys_collection_cache_reader_and_updater_test.cpp b/src/mongo/db/keys_collection_cache_reader_and_updater_test.cpp index 1a12ecc1a2d..75cf3d993e2 100644 --- a/src/mongo/db/keys_collection_cache_reader_and_updater_test.cpp +++ b/src/mongo/db/keys_collection_cache_reader_and_updater_test.cpp @@ -33,6 +33,7 @@ #include "mongo/db/jsobj.h" #include "mongo/db/keys_collection_cache_reader_and_updater.h" +#include "mongo/db/keys_collection_client_sharded.h" #include "mongo/db/keys_collection_document.h" #include "mongo/db/logical_clock.h" #include "mongo/db/namespace_string.h" @@ -59,6 +60,8 @@ protected: auto clockSource = stdx::make_unique<ClockSourceMock>(); operationContext()->getServiceContext()->setFastClockSource(std::move(clockSource)); + _catalogClient = stdx::make_unique<KeysCollectionClientSharded>( + Grid::get(operationContext())->catalogClient()); } std::unique_ptr<DistLockManager> makeDistLockManager( @@ -66,11 +69,17 @@ protected: invariant(distLockCatalog); return stdx::make_unique<DistLockManagerMock>(std::move(distLockCatalog)); } + + KeysCollectionClient* catalogClient() const { + return _catalogClient.get(); + } + +private: + std::unique_ptr<KeysCollectionClient> _catalogClient; }; TEST_F(CacheUpdaterTest, ShouldCreate2KeysFromEmpty) { - auto catalogClient = Grid::get(operationContext())->catalogClient(); - KeysCollectionCacheReaderAndUpdater updater("dummy", catalogClient, Seconds(5)); + KeysCollectionCacheReaderAndUpdater updater("dummy", catalogClient(), Seconds(5)); const LogicalTime currentTime(LogicalTime(Timestamp(100, 2))); LogicalClock::get(operationContext())->setClusterTimeFromTrustedSource(currentTime); @@ -103,8 +112,7 @@ TEST_F(CacheUpdaterTest, ShouldCreate2KeysFromEmpty) { } TEST_F(CacheUpdaterTest, ShouldPropagateWriteError) { - auto catalogClient = Grid::get(operationContext())->catalogClient(); - KeysCollectionCacheReaderAndUpdater updater("dummy", catalogClient, Seconds(5)); + KeysCollectionCacheReaderAndUpdater updater("dummy", catalogClient(), Seconds(5)); const LogicalTime currentTime(LogicalTime(Timestamp(100, 2))); LogicalClock::get(operationContext())->setClusterTimeFromTrustedSource(currentTime); @@ -116,8 +124,7 @@ TEST_F(CacheUpdaterTest, ShouldPropagateWriteError) { } TEST_F(CacheUpdaterTest, ShouldCreateAnotherKeyIfOnlyOneKeyExists) { - auto catalogClient = Grid::get(operationContext())->catalogClient(); - KeysCollectionCacheReaderAndUpdater updater("dummy", catalogClient, Seconds(5)); + KeysCollectionCacheReaderAndUpdater updater("dummy", catalogClient(), Seconds(5)); LogicalClock::get(operationContext()) ->setClusterTimeFromTrustedSource(LogicalTime(Timestamp(100, 2))); @@ -171,8 +178,7 @@ TEST_F(CacheUpdaterTest, ShouldCreateAnotherKeyIfOnlyOneKeyExists) { } TEST_F(CacheUpdaterTest, ShouldCreateAnotherKeyIfNoValidKeyAfterCurrent) { - auto catalogClient = Grid::get(operationContext())->catalogClient(); - KeysCollectionCacheReaderAndUpdater updater("dummy", catalogClient, Seconds(5)); + KeysCollectionCacheReaderAndUpdater updater("dummy", catalogClient(), Seconds(5)); LogicalClock::get(operationContext()) ->setClusterTimeFromTrustedSource(LogicalTime(Timestamp(108, 2))); @@ -263,8 +269,7 @@ TEST_F(CacheUpdaterTest, ShouldCreateAnotherKeyIfNoValidKeyAfterCurrent) { } TEST_F(CacheUpdaterTest, ShouldCreate2KeysIfAllKeysAreExpired) { - auto catalogClient = Grid::get(operationContext())->catalogClient(); - KeysCollectionCacheReaderAndUpdater updater("dummy", catalogClient, Seconds(5)); + KeysCollectionCacheReaderAndUpdater updater("dummy", catalogClient(), Seconds(5)); LogicalClock::get(operationContext()) ->setClusterTimeFromTrustedSource(LogicalTime(Timestamp(120, 2))); @@ -368,8 +373,7 @@ TEST_F(CacheUpdaterTest, ShouldCreate2KeysIfAllKeysAreExpired) { } TEST_F(CacheUpdaterTest, ShouldNotCreateNewKeyIfThereAre2UnexpiredKeys) { - auto catalogClient = Grid::get(operationContext())->catalogClient(); - KeysCollectionCacheReaderAndUpdater updater("dummy", catalogClient, Seconds(5)); + KeysCollectionCacheReaderAndUpdater updater("dummy", catalogClient(), Seconds(5)); LogicalClock::get(operationContext()) ->setClusterTimeFromTrustedSource(LogicalTime(Timestamp(100, 2))); @@ -435,8 +439,7 @@ TEST_F(CacheUpdaterTest, ShouldNotCreateNewKeyIfThereAre2UnexpiredKeys) { } TEST_F(CacheUpdaterTest, ShouldNotCreateKeysWithDisableKeyGenerationFailPoint) { - auto catalogClient = Grid::get(operationContext())->catalogClient(); - KeysCollectionCacheReaderAndUpdater updater("dummy", catalogClient, Seconds(5)); + KeysCollectionCacheReaderAndUpdater updater("dummy", catalogClient(), Seconds(5)); const LogicalTime currentTime(LogicalTime(Timestamp(100, 0))); LogicalClock::get(operationContext())->setClusterTimeFromTrustedSource(currentTime); @@ -457,8 +460,7 @@ TEST_F(CacheUpdaterTest, ShouldNotCreateNewKeysInFeatureCompatiblityVersion34) { serverGlobalParams.featureCompatibility.version.store( ServerGlobalParams::FeatureCompatibility::Version::k34); - auto catalogClient = Grid::get(operationContext())->catalogClient(); - KeysCollectionCacheReaderAndUpdater updater("dummy", catalogClient, Seconds(5)); + KeysCollectionCacheReaderAndUpdater updater("dummy", catalogClient(), Seconds(5)); const LogicalTime currentTime(LogicalTime(Timestamp(100, 0))); LogicalClock::get(operationContext())->setClusterTimeFromTrustedSource(currentTime); diff --git a/src/mongo/db/keys_collection_cache_reader_test.cpp b/src/mongo/db/keys_collection_cache_reader_test.cpp index 87100bca834..599c9351808 100644 --- a/src/mongo/db/keys_collection_cache_reader_test.cpp +++ b/src/mongo/db/keys_collection_cache_reader_test.cpp @@ -30,35 +30,50 @@ #include "mongo/db/jsobj.h" #include "mongo/db/keys_collection_cache_reader.h" +#include "mongo/db/keys_collection_client_sharded.h" #include "mongo/db/keys_collection_document.h" #include "mongo/db/operation_context.h" +#include "mongo/s/catalog/dist_lock_manager_mock.h" #include "mongo/s/config_server_test_fixture.h" #include "mongo/s/grid.h" #include "mongo/unittest/unittest.h" +#include "mongo/util/clock_source_mock.h" namespace mongo { -using CacheReaderTest = ConfigServerTestFixture; +class CacheReaderTest : public ConfigServerTestFixture { +protected: + void setUp() override { + ConfigServerTestFixture::setUp(); + + _catalogClient = stdx::make_unique<KeysCollectionClientSharded>( + Grid::get(operationContext())->catalogClient()); + } + + KeysCollectionClient* catalogClient() const { + return _catalogClient.get(); + } + +private: + std::unique_ptr<KeysCollectionClient> _catalogClient; +}; TEST_F(CacheReaderTest, ErrorsIfCacheIsEmpty) { - auto catalogClient = Grid::get(operationContext())->catalogClient(); - KeysCollectionCacheReader reader("test", catalogClient); + KeysCollectionCacheReader reader("test", catalogClient()); auto status = reader.getKey(LogicalTime(Timestamp(1, 0))).getStatus(); ASSERT_EQ(ErrorCodes::KeyNotFound, status.code()); ASSERT_FALSE(status.reason().empty()); } TEST_F(CacheReaderTest, RefreshErrorsIfCacheIsEmpty) { - auto catalogClient = Grid::get(operationContext())->catalogClient(); - KeysCollectionCacheReader reader("test", catalogClient); + KeysCollectionCacheReader reader("test", catalogClient()); auto status = reader.refresh(operationContext()).getStatus(); ASSERT_EQ(ErrorCodes::KeyNotFound, status.code()); ASSERT_FALSE(status.reason().empty()); } TEST_F(CacheReaderTest, GetKeyShouldReturnCorrectKeyAfterRefresh) { - auto catalogClient = Grid::get(operationContext())->catalogClient(); - KeysCollectionCacheReader reader("test", catalogClient); + KeysCollectionCacheReader reader("test", catalogClient()); KeysCollectionDocument origKey1( 1, "test", TimeProofService::generateRandomKey(), LogicalTime(Timestamp(105, 0))); @@ -89,8 +104,7 @@ TEST_F(CacheReaderTest, GetKeyShouldReturnCorrectKeyAfterRefresh) { } TEST_F(CacheReaderTest, GetKeyShouldReturnErrorIfNoKeyIsValidForGivenTime) { - auto catalogClient = Grid::get(operationContext())->catalogClient(); - KeysCollectionCacheReader reader("test", catalogClient); + KeysCollectionCacheReader reader("test", catalogClient()); KeysCollectionDocument origKey1( 1, "test", TimeProofService::generateRandomKey(), LogicalTime(Timestamp(105, 0))); @@ -113,8 +127,7 @@ TEST_F(CacheReaderTest, GetKeyShouldReturnErrorIfNoKeyIsValidForGivenTime) { } TEST_F(CacheReaderTest, GetKeyShouldReturnOldestKeyPossible) { - auto catalogClient = Grid::get(operationContext())->catalogClient(); - KeysCollectionCacheReader reader("test", catalogClient); + KeysCollectionCacheReader reader("test", catalogClient()); KeysCollectionDocument origKey0( 0, "test", TimeProofService::generateRandomKey(), LogicalTime(Timestamp(100, 0))); @@ -155,8 +168,7 @@ TEST_F(CacheReaderTest, GetKeyShouldReturnOldestKeyPossible) { } TEST_F(CacheReaderTest, RefreshShouldNotGetKeysForOtherPurpose) { - auto catalogClient = Grid::get(operationContext())->catalogClient(); - KeysCollectionCacheReader reader("test", catalogClient); + KeysCollectionCacheReader reader("test", catalogClient()); KeysCollectionDocument origKey0( 0, "dummy", TimeProofService::generateRandomKey(), LogicalTime(Timestamp(100, 0))); @@ -200,8 +212,7 @@ TEST_F(CacheReaderTest, RefreshShouldNotGetKeysForOtherPurpose) { } TEST_F(CacheReaderTest, RefreshCanIncrementallyGetNewKeys) { - auto catalogClient = Grid::get(operationContext())->catalogClient(); - KeysCollectionCacheReader reader("test", catalogClient); + KeysCollectionCacheReader reader("test", catalogClient()); KeysCollectionDocument origKey0( 0, "test", TimeProofService::generateRandomKey(), LogicalTime(Timestamp(100, 0))); diff --git a/src/mongo/db/keys_collection_client.h b/src/mongo/db/keys_collection_client.h new file mode 100644 index 00000000000..4e93343ff3f --- /dev/null +++ b/src/mongo/db/keys_collection_client.h @@ -0,0 +1,64 @@ +/** + * Copyright (C) 2017 MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include <memory> +#include <string> + +#include "mongo/base/status.h" +#include "mongo/db/keys_collection_document.h" + +namespace mongo { + +class OperationContext; +class LogicalTime; +class BSONObj; + +class KeysCollectionClient { +public: + virtual ~KeysCollectionClient() = default; + + /** + * Returns keys for the given purpose and with an expiresAt value greater than newerThanThis. + */ + virtual StatusWith<std::vector<KeysCollectionDocument>> getNewKeys( + OperationContext* opCtx, StringData purpose, const LogicalTime& newerThanThis) = 0; + + /** + * Directly inserts a key document to the storage + */ + virtual Status insertNewKey(OperationContext* opCtx, const BSONObj& doc) = 0; + + /** + * Returns true if it performs majority reads + */ + virtual bool supportsMajorityReads() const = 0; +}; + +} // namespace mongo diff --git a/src/mongo/db/keys_collection_client_direct.cpp b/src/mongo/db/keys_collection_client_direct.cpp new file mode 100644 index 00000000000..bd97a92a20f --- /dev/null +++ b/src/mongo/db/keys_collection_client_direct.cpp @@ -0,0 +1,169 @@ +/** + * Copyright (C) 2017 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kReplication + +#include "mongo/platform/basic.h" + +#include <boost/optional.hpp> +#include <vector> + +#include "mongo/db/keys_collection_client_direct.h" + +#include "mongo/bson/bsonobjbuilder.h" +#include "mongo/bson/util/bson_extract.h" +#include "mongo/client/read_preference.h" +#include "mongo/db/dbdirectclient.h" +#include "mongo/db/keys_collection_document.h" +#include "mongo/db/logical_clock.h" +#include "mongo/db/logical_time.h" +#include "mongo/db/operation_context.h" +#include "mongo/db/server_options.h" +#include "mongo/db/service_context.h" +#include "mongo/rpc/get_status_from_command_result.h" +#include "mongo/s/catalog/sharding_catalog_client.h" +#include "mongo/s/write_ops/batched_command_request.h" +#include "mongo/s/write_ops/batched_command_response.h" +#include "mongo/util/log.h" + +namespace mongo { + +namespace { +const int kOnErrorNumRetries = 3; + +bool isRetriableError(ErrorCodes::Error code, Shard::RetryPolicy options) { + if (options == Shard::RetryPolicy::kNoRetry) { + return false; + } + + if (options == Shard::RetryPolicy::kIdempotent) { + return code == ErrorCodes::WriteConcernFailed; + } else { + invariant(options == Shard::RetryPolicy::kNotIdempotent); + return false; + } +} +} + +KeysCollectionClientDirect::KeysCollectionClientDirect() : _rsLocalClient() {} + +StatusWith<std::vector<KeysCollectionDocument>> KeysCollectionClientDirect::getNewKeys( + OperationContext* opCtx, StringData purpose, const LogicalTime& newerThanThis) { + + + BSONObjBuilder queryBuilder; + queryBuilder.append("purpose", purpose); + queryBuilder.append("expiresAt", BSON("$gt" << newerThanThis.asTimestamp())); + + auto findStatus = _query(opCtx, + ReadPreferenceSetting(ReadPreference::Nearest, TagSet{}), + repl::ReadConcernLevel::kLocalReadConcern, + NamespaceString(KeysCollectionDocument::ConfigNS), + queryBuilder.obj(), + BSON("expiresAt" << 1), + boost::none); + + if (!findStatus.isOK()) { + return findStatus.getStatus(); + } + + const auto& keyDocs = findStatus.getValue().docs; + std::vector<KeysCollectionDocument> keys; + for (auto&& keyDoc : keyDocs) { + auto parseStatus = KeysCollectionDocument::fromBSON(keyDoc); + if (!parseStatus.isOK()) { + return parseStatus.getStatus(); + } + + keys.push_back(std::move(parseStatus.getValue())); + } + + return keys; +} + +StatusWith<Shard::QueryResponse> KeysCollectionClientDirect::_query( + OperationContext* opCtx, + const ReadPreferenceSetting& readPref, + const repl::ReadConcernLevel& readConcernLevel, + const NamespaceString& nss, + const BSONObj& query, + const BSONObj& sort, + boost::optional<long long> limit) { + + for (int retry = 1; retry <= kOnErrorNumRetries; retry++) { + auto result = + _rsLocalClient.queryOnce(opCtx, readPref, readConcernLevel, nss, query, sort, limit); + + if (retry < kOnErrorNumRetries && + isRetriableError(result.getStatus().code(), Shard::RetryPolicy::kIdempotent)) { + continue; + } + + return result; + } + MONGO_UNREACHABLE; +} + +Status KeysCollectionClientDirect::_insert(OperationContext* opCtx, + const std::string& ns, + const BSONObj& doc, + const WriteConcernOptions& writeConcern) { + const NamespaceString nss(ns); + BatchedCommandRequest request([&] { + write_ops::Insert insertOp(nss); + insertOp.setDocuments({doc}); + return insertOp; + }()); + request.setWriteConcern(writeConcern.toBSON()); + const BSONObj cmdObj = request.toBSON(); + + for (int retry = 1; retry <= kOnErrorNumRetries; ++retry) { + // Note: write commands can only be issued against a primary. + auto swResponse = _rsLocalClient.runCommandOnce(opCtx, nss.db().toString(), cmdObj); + + BatchedCommandResponse batchResponse; + auto writeStatus = + Shard::CommandResponse::processBatchWriteResponse(swResponse, &batchResponse); + if (retry < kOnErrorNumRetries && + isRetriableError(writeStatus.code(), Shard::RetryPolicy::kIdempotent)) { + LOG(2) << "Batch write command to " << nss.db() + << "failed with retriable error and will be retried" + << causedBy(redact(writeStatus)); + continue; + } + + return batchResponse.toStatus(); + } + MONGO_UNREACHABLE; +} + +Status KeysCollectionClientDirect::insertNewKey(OperationContext* opCtx, const BSONObj& doc) { + return _insert( + opCtx, KeysCollectionDocument::ConfigNS, doc, ShardingCatalogClient::kMajorityWriteConcern); +} +} // namespace mongo diff --git a/src/mongo/db/keys_collection_client_direct.h b/src/mongo/db/keys_collection_client_direct.h new file mode 100644 index 00000000000..7bc02ed1c4f --- /dev/null +++ b/src/mongo/db/keys_collection_client_direct.h @@ -0,0 +1,83 @@ +/** + * Copyright (C) 2017 MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include <memory> +#include <string> + +#include "mongo/base/status.h" +#include "mongo/db/keys_collection_client.h" +#include "mongo/s/client/rs_local_client.h" + +namespace mongo { + +class OperationContext; +class LogicalTime; +class BSONObj; + +class KeysCollectionClientDirect : public KeysCollectionClient { +public: + KeysCollectionClientDirect(); + /** + * Returns keys for the given purpose and with an expiresAt value greater than newerThanThis. + */ + StatusWith<std::vector<KeysCollectionDocument>> getNewKeys( + OperationContext* opCtx, StringData purpose, const LogicalTime& newerThanThis) override; + + /** + * Directly inserts a key document to the storage + */ + Status insertNewKey(OperationContext* opCtx, const BSONObj& doc) override; + + /** + * Returns false if getNewKeys uses readConcern level:local, so the documents returned can be + * rolled back. For level:majority support the nodes must always start with + * enableMajorityReadConcern parameter set to true. + */ + bool supportsMajorityReads() const final { + return false; + } + +private: + StatusWith<Shard::QueryResponse> _query(OperationContext* opCtx, + const ReadPreferenceSetting& readPref, + const repl::ReadConcernLevel& readConcernLevel, + const NamespaceString& nss, + const BSONObj& query, + const BSONObj& sort, + boost::optional<long long> limit); + + Status _insert(OperationContext* opCtx, + const std::string& ns, + const BSONObj& doc, + const WriteConcernOptions& writeConcern); + + RSLocalClient _rsLocalClient; +}; +} // namespace mongo diff --git a/src/mongo/db/keys_collection_client_sharded.cpp b/src/mongo/db/keys_collection_client_sharded.cpp new file mode 100644 index 00000000000..2f5bb8d6c9c --- /dev/null +++ b/src/mongo/db/keys_collection_client_sharded.cpp @@ -0,0 +1,53 @@ +/** + * Copyright (C) 2017 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/db/keys_collection_client_sharded.h" +#include "mongo/s/catalog/sharding_catalog_client.h" + +namespace mongo { + +namespace {} // namespace + +KeysCollectionClientSharded::KeysCollectionClientSharded(ShardingCatalogClient* client) + : _catalogClient(client) {} + + +StatusWith<std::vector<KeysCollectionDocument>> KeysCollectionClientSharded::getNewKeys( + OperationContext* opCtx, StringData purpose, const LogicalTime& newerThanThis) { + + return _catalogClient->getNewKeys( + opCtx, purpose, newerThanThis, repl::ReadConcernLevel::kMajorityReadConcern); +} + +Status KeysCollectionClientSharded::insertNewKey(OperationContext* opCtx, const BSONObj& doc) { + return _catalogClient->insertConfigDocument( + opCtx, KeysCollectionDocument::ConfigNS, doc, ShardingCatalogClient::kMajorityWriteConcern); +} +} // namespace mongo diff --git a/src/mongo/db/keys_collection_client_sharded.h b/src/mongo/db/keys_collection_client_sharded.h new file mode 100644 index 00000000000..2ced14f2135 --- /dev/null +++ b/src/mongo/db/keys_collection_client_sharded.h @@ -0,0 +1,65 @@ +/** + * Copyright (C) 2017 MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include <memory> +#include <string> + +#include "mongo/base/status.h" +#include "mongo/db/keys_collection_client.h" + +namespace mongo { + +class OperationContext; +class LogicalTime; +class BSONObj; +class ShardingCatalogClient; + +class KeysCollectionClientSharded : public KeysCollectionClient { +public: + KeysCollectionClientSharded(ShardingCatalogClient*); + /** + * Returns keys for the given purpose and with an expiresAt value greater than newerThanThis. + */ + StatusWith<std::vector<KeysCollectionDocument>> getNewKeys( + OperationContext* opCtx, StringData purpose, const LogicalTime& newerThanThis) override; + + /** + * Directly inserts a key document to the storage + */ + Status insertNewKey(OperationContext* opCtx, const BSONObj& doc) override; + + bool supportsMajorityReads() const final { + return true; + } + +private: + ShardingCatalogClient* const _catalogClient; +}; +} // namespace mongo diff --git a/src/mongo/db/keys_collection_manager.cpp b/src/mongo/db/keys_collection_manager.cpp index bed56f87847..1ef6a8bb673 100644 --- a/src/mongo/db/keys_collection_manager.cpp +++ b/src/mongo/db/keys_collection_manager.cpp @@ -30,9 +30,16 @@ #include "mongo/db/keys_collection_manager.h" +#include "mongo/db/server_parameters.h" + namespace mongo { const Seconds KeysCollectionManager::kKeyValidInterval{3 * 30 * 24 * 60 * 60}; // ~3 months +const std::string KeysCollectionManager::kKeyManagerPurposeString = "HMAC"; + +MONGO_EXPORT_STARTUP_SERVER_PARAMETER(KeysRotationIntervalSec, + int, + KeysCollectionManager::kKeyValidInterval.count()); KeysCollectionManager::~KeysCollectionManager() = default; diff --git a/src/mongo/db/keys_collection_manager.h b/src/mongo/db/keys_collection_manager.h index 87b57ef1250..4d6771be838 100644 --- a/src/mongo/db/keys_collection_manager.h +++ b/src/mongo/db/keys_collection_manager.h @@ -39,6 +39,7 @@ class OperationContext; class LogicalTime; class ServiceContext; +extern int KeysRotationIntervalSec; /** * This is responsible for providing keys that can be used for HMAC computation. This also supports * automatic key rotation that happens on a configurable interval. @@ -46,6 +47,7 @@ class ServiceContext; class KeysCollectionManager { public: static const Seconds kKeyValidInterval; + static const std::string kKeyManagerPurposeString; virtual ~KeysCollectionManager(); diff --git a/src/mongo/db/keys_collection_manager_sharding.cpp b/src/mongo/db/keys_collection_manager_sharding.cpp index 42a4aaa961a..2c4ab5dfa55 100644 --- a/src/mongo/db/keys_collection_manager_sharding.cpp +++ b/src/mongo/db/keys_collection_manager_sharding.cpp @@ -25,6 +25,7 @@ * exception statement from all source files in the program, then also delete * it in the license file. */ +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kDefault #include "mongo/platform/basic.h" @@ -32,6 +33,7 @@ #include "mongo/db/keys_collection_cache_reader.h" #include "mongo/db/keys_collection_cache_reader_and_updater.h" +#include "mongo/db/keys_collection_client.h" #include "mongo/db/logical_clock.h" #include "mongo/db/logical_time.h" #include "mongo/db/operation_context.h" @@ -40,6 +42,7 @@ #include "mongo/stdx/memory.h" #include "mongo/util/concurrency/idle_thread_block.h" #include "mongo/util/fail_point_service.h" +#include "mongo/util/log.h" #include "mongo/util/mongoutils/str.h" #include "mongo/util/time_support.h" @@ -81,13 +84,13 @@ Milliseconds howMuchSleepNeedFor(const LogicalTime& currentTime, } // unnamed namespace -KeysCollectionManagerSharding::KeysCollectionManagerSharding(std::string purpose, - ShardingCatalogClient* client, - Seconds keyValidForInterval) - : _purpose(std::move(purpose)), +KeysCollectionManagerSharding::KeysCollectionManagerSharding( + std::string purpose, std::unique_ptr<KeysCollectionClient> client, Seconds keyValidForInterval) + : _client(std::move(client)), + _purpose(std::move(purpose)), _keyValidForInterval(keyValidForInterval), - _catalogClient(client), - _keysCache(_purpose, client) {} + _keysCache(_purpose, _client.get()) {} + StatusWith<KeysCollectionDocument> KeysCollectionManagerSharding::getKeyForValidation( OperationContext* opCtx, long long keyId, const LogicalTime& forThisTime) { @@ -142,6 +145,7 @@ void KeysCollectionManagerSharding::refreshNow(OperationContext* opCtx) { } void KeysCollectionManagerSharding::startMonitoring(ServiceContext* service) { + _keysCache.resetCache(); _refresher.setFunc([this](OperationContext* opCtx) { return _keysCache.refresh(opCtx); }); _refresher.start( service, str::stream() << "monitoring keys for " << _purpose, _keyValidForInterval); @@ -155,7 +159,7 @@ void KeysCollectionManagerSharding::enableKeyGenerator(OperationContext* opCtx, if (doEnable) { _refresher.switchFunc(opCtx, [this](OperationContext* opCtx) { KeysCollectionCacheReaderAndUpdater keyGenerator( - _purpose, _catalogClient, _keyValidForInterval); + _purpose, _client.get(), _keyValidForInterval); auto keyGenerationStatus = keyGenerator.refresh(opCtx); if (ErrorCodes::isShutdownError(keyGenerationStatus.getStatus().code())) { diff --git a/src/mongo/db/keys_collection_manager_sharding.h b/src/mongo/db/keys_collection_manager_sharding.h index 82f358ebe18..d1ebd96e540 100644 --- a/src/mongo/db/keys_collection_manager_sharding.h +++ b/src/mongo/db/keys_collection_manager_sharding.h @@ -46,7 +46,7 @@ namespace mongo { class OperationContext; class LogicalTime; class ServiceContext; -class ShardingCatalogClient; +class KeysCollectionClient; /** * This implementation of the KeysCollectionManager queries the config servers for keys. @@ -58,7 +58,7 @@ public: static const Seconds kKeyValidInterval; KeysCollectionManagerSharding(std::string purpose, - ShardingCatalogClient* client, + std::unique_ptr<KeysCollectionClient> client, Seconds keyValidForInterval); /** @@ -185,9 +185,9 @@ private: */ StatusWith<KeysCollectionDocument> _getKey(const LogicalTime& forThisTime); + std::unique_ptr<KeysCollectionClient> _client; const std::string _purpose; const Seconds _keyValidForInterval; - ShardingCatalogClient* _catalogClient; // No mutex needed since the members below have their own mutexes. KeysCollectionCacheReader _keysCache; diff --git a/src/mongo/db/keys_collection_manager_sharding_test.cpp b/src/mongo/db/keys_collection_manager_sharding_test.cpp index d7c88d097f3..4c209a87e62 100644 --- a/src/mongo/db/keys_collection_manager_sharding_test.cpp +++ b/src/mongo/db/keys_collection_manager_sharding_test.cpp @@ -32,6 +32,7 @@ #include <string> #include "mongo/db/jsobj.h" +#include "mongo/db/keys_collection_client_sharded.h" #include "mongo/db/keys_collection_document.h" #include "mongo/db/keys_collection_manager_sharding.h" #include "mongo/db/logical_clock.h" @@ -68,9 +69,10 @@ protected: clockSource->advance(Seconds(1)); operationContext()->getServiceContext()->setFastClockSource(std::move(clockSource)); - auto catalogClient = Grid::get(operationContext())->catalogClient(); - _keyManager = - stdx::make_unique<KeysCollectionManagerSharding>("dummy", catalogClient, Seconds(1)); + auto catalogClient = stdx::make_unique<KeysCollectionClientSharded>( + Grid::get(operationContext())->catalogClient()); + _keyManager = stdx::make_unique<KeysCollectionManagerSharding>( + "dummy", std::move(catalogClient), Seconds(1)); } void tearDown() override { diff --git a/src/mongo/db/logical_time.cpp b/src/mongo/db/logical_time.cpp index 1bb5ce5b62f..709c43db2f6 100644 --- a/src/mongo/db/logical_time.cpp +++ b/src/mongo/db/logical_time.cpp @@ -68,9 +68,7 @@ LogicalTime LogicalTime::addTicks(uint64_t ticks) const { } std::string LogicalTime::toString() const { - StringBuilder buf; - buf << asTimestamp().toString(); - return buf.str(); + return toBSON().toString(); } std::array<unsigned char, sizeof(uint64_t)> LogicalTime::toUnsignedArray() const { @@ -79,4 +77,10 @@ std::array<unsigned char, sizeof(uint64_t)> LogicalTime::toUnsignedArray() const return output; } +BSONObj LogicalTime::toBSON() const { + BSONObjBuilder bldr; + bldr.append("ts", asTimestamp()); + return bldr.obj(); +} + } // namespace mongo diff --git a/src/mongo/db/logical_time.h b/src/mongo/db/logical_time.h index 719673cf231..f5f7e25d292 100644 --- a/src/mongo/db/logical_time.h +++ b/src/mongo/db/logical_time.h @@ -78,6 +78,11 @@ public: std::array<unsigned char, sizeof(uint64_t)> toUnsignedArray() const; /** + * serialize into BSON object. + */ + BSONObj toBSON() const; + + /** * An uninitialized value of LogicalTime. Default constructed. */ static const LogicalTime kUninitialized; diff --git a/src/mongo/db/logical_time_validator.cpp b/src/mongo/db/logical_time_validator.cpp index 9bbb9021ac7..2b336a5e561 100644 --- a/src/mongo/db/logical_time_validator.cpp +++ b/src/mongo/db/logical_time_validator.cpp @@ -26,6 +26,8 @@ * it in the license file. */ +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kReplication + #include "mongo/platform/basic.h" #include "mongo/db/logical_time_validator.h" @@ -39,6 +41,7 @@ #include "mongo/db/operation_context.h" #include "mongo/db/service_context.h" #include "mongo/util/assert_util.h" +#include "mongo/util/log.h" namespace mongo { @@ -104,7 +107,7 @@ SignedLogicalTime LogicalTimeValidator::_getProof(const KeysCollectionDocument& } SignedLogicalTime LogicalTimeValidator::trySignLogicalTime(const LogicalTime& newTime) { - auto keyStatusWith = _keyManager->getKeyForSigning(nullptr, newTime); + auto keyStatusWith = _getKeyManagerCopy()->getKeyForSigning(nullptr, newTime); auto keyStatus = keyStatusWith.getStatus(); if (keyStatus == ErrorCodes::KeyNotFound) { @@ -118,13 +121,14 @@ SignedLogicalTime LogicalTimeValidator::trySignLogicalTime(const LogicalTime& ne SignedLogicalTime LogicalTimeValidator::signLogicalTime(OperationContext* opCtx, const LogicalTime& newTime) { - auto keyStatusWith = _keyManager->getKeyForSigning(nullptr, newTime); + auto keyManager = _getKeyManagerCopy(); + auto keyStatusWith = keyManager->getKeyForSigning(nullptr, newTime); auto keyStatus = keyStatusWith.getStatus(); while (keyStatus == ErrorCodes::KeyNotFound) { - _keyManager->refreshNow(opCtx); + keyManager->refreshNow(opCtx); - keyStatusWith = _keyManager->getKeyForSigning(nullptr, newTime); + keyStatusWith = keyManager->getKeyForSigning(nullptr, newTime); keyStatus = keyStatusWith.getStatus(); if (keyStatus == ErrorCodes::KeyNotFound) { @@ -144,7 +148,8 @@ Status LogicalTimeValidator::validate(OperationContext* opCtx, const SignedLogic } } - auto keyStatus = _keyManager->getKeyForValidation(opCtx, newTime.getKeyId(), newTime.getTime()); + auto keyStatus = + _getKeyManagerCopy()->getKeyForValidation(opCtx, newTime.getKeyId(), newTime.getTime()); uassertStatusOK(keyStatus.getStatus()); const auto& key = keyStatus.getValue().getKey(); @@ -163,15 +168,15 @@ Status LogicalTimeValidator::validate(OperationContext* opCtx, const SignedLogic } void LogicalTimeValidator::init(ServiceContext* service) { - _keyManager->startMonitoring(service); + _getKeyManagerCopy()->startMonitoring(service); } void LogicalTimeValidator::shutDown() { - _keyManager->stopMonitoring(); + _getKeyManagerCopy()->stopMonitoring(); } void LogicalTimeValidator::enableKeyGenerator(OperationContext* opCtx, bool doEnable) { - _keyManager->enableKeyGenerator(opCtx, doEnable); + _getKeyManagerCopy()->enableKeyGenerator(opCtx, doEnable); } bool LogicalTimeValidator::isAuthorizedToAdvanceClock(OperationContext* opCtx) { @@ -183,11 +188,38 @@ bool LogicalTimeValidator::isAuthorizedToAdvanceClock(OperationContext* opCtx) { } bool LogicalTimeValidator::shouldGossipLogicalTime() { - return _keyManager->hasSeenKeys(); + return _getKeyManagerCopy()->hasSeenKeys(); } void LogicalTimeValidator::forceKeyRefreshNow(OperationContext* opCtx) { - _keyManager->refreshNow(opCtx); + _getKeyManagerCopy()->refreshNow(opCtx); +} + +void LogicalTimeValidator::resetKeyManagerCache(ServiceContext* service) { + log() << "XXX resetting key manager cache"; + if (auto keyManager = _getKeyManagerCopy()) { + keyManager->stopMonitoring(); + keyManager->startMonitoring(service); + _lastSeenValidTime = SignedLogicalTime(); + _timeProofService.resetCache(); + } +} + +void LogicalTimeValidator::resetKeyManager() { + log() << "XXX resetting key manager"; + stdx::lock_guard<stdx::mutex> lk(_mutexKeyManager); + if (_keyManager) { + _keyManager->stopMonitoring(); + _keyManager.reset(); + _lastSeenValidTime = SignedLogicalTime(); + _timeProofService.resetCache(); + } +} + +std::shared_ptr<KeysCollectionManagerSharding> LogicalTimeValidator::_getKeyManagerCopy() { + stdx::lock_guard<stdx::mutex> lk(_mutexKeyManager); + invariant(_keyManager); + return _keyManager; } } // namespace mongo diff --git a/src/mongo/db/logical_time_validator.h b/src/mongo/db/logical_time_validator.h index dd171c5de2f..43d1c1c2678 100644 --- a/src/mongo/db/logical_time_validator.h +++ b/src/mongo/db/logical_time_validator.h @@ -106,10 +106,28 @@ public: */ void forceKeyRefreshNow(OperationContext* opCtx); + /** + * Reset the key manager to prevent the former members of standalone replica set to use old + * keys with sharded cluster. + */ + void resetKeyManager(); + + /** + * Reset the key manager cache of keys. + */ + void resetKeyManagerCache(ServiceContext* service); + private: + /** + * Returns the copy of the _keyManager to work when its reset by resetKeyManager call. + */ + std::shared_ptr<KeysCollectionManagerSharding> _getKeyManagerCopy(); + + SignedLogicalTime _getProof(const KeysCollectionDocument& keyDoc, LogicalTime newTime); - stdx::mutex _mutex; + stdx::mutex _mutex; // protects _lastSeenValidTime + stdx::mutex _mutexKeyManager; // protects _keyManager SignedLogicalTime _lastSeenValidTime; TimeProofService _timeProofService; std::shared_ptr<KeysCollectionManagerSharding> _keyManager; diff --git a/src/mongo/db/logical_time_validator_test.cpp b/src/mongo/db/logical_time_validator_test.cpp index 8292ee75db4..2f029d272c9 100644 --- a/src/mongo/db/logical_time_validator_test.cpp +++ b/src/mongo/db/logical_time_validator_test.cpp @@ -29,6 +29,7 @@ #include "mongo/platform/basic.h" #include "mongo/bson/timestamp.h" +#include "mongo/db/keys_collection_client_sharded.h" #include "mongo/db/keys_collection_manager.h" #include "mongo/db/keys_collection_manager_sharding.h" #include "mongo/db/logical_clock.h" @@ -66,13 +67,14 @@ protected: auto clockSource = stdx::make_unique<ClockSourceMock>(); operationContext()->getServiceContext()->setFastClockSource(std::move(clockSource)); - auto catalogClient = Grid::get(operationContext())->catalogClient(); + auto catalogClient = stdx::make_unique<KeysCollectionClientSharded>( + Grid::get(operationContext())->catalogClient()); const LogicalTime currentTime(LogicalTime(Timestamp(1, 0))); LogicalClock::get(operationContext())->setClusterTimeFromTrustedSource(currentTime); - _keyManager = - std::make_shared<KeysCollectionManagerSharding>("dummy", catalogClient, Seconds(1000)); + _keyManager = std::make_shared<KeysCollectionManagerSharding>( + "dummy", std::move(catalogClient), Seconds(1000)); _validator = stdx::make_unique<LogicalTimeValidator>(_keyManager); _validator->init(operationContext()->getServiceContext()); } diff --git a/src/mongo/db/read_concern.cpp b/src/mongo/db/read_concern.cpp index 17bc41419c7..9b034fcb566 100644 --- a/src/mongo/db/read_concern.cpp +++ b/src/mongo/db/read_concern.cpp @@ -82,6 +82,8 @@ Status makeNoopWriteIfNeeded(OperationContext* opCtx, LogicalTime clusterTime) { auto shardingState = ShardingState::get(opCtx); // standalone replica set, so there is no need to advance the OpLog on the primary. if (!shardingState->enabled()) { + log() << "XXX: attempting to make a write for clusterTIme: " << clusterTime + << " but is in standalone RS"; return Status::OK(); } diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp index 674ec4d5940..2e99b48c2e8 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp @@ -657,16 +657,14 @@ void ReplicationCoordinatorExternalStateImpl::shardingOnStepDownHook() { ShardingState::get(_service)->markCollectionsNotShardedAtStepdown(); - if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer) { - if (auto validator = LogicalTimeValidator::get(_service)) { - auto opCtx = cc().getOperationContext(); + if (auto validator = LogicalTimeValidator::get(_service)) { + auto opCtx = cc().getOperationContext(); - if (opCtx != nullptr) { - validator->enableKeyGenerator(opCtx, false); - } else { - auto opCtxPtr = cc().makeOperationContext(); - validator->enableKeyGenerator(opCtxPtr.get(), false); - } + if (opCtx != nullptr) { + validator->enableKeyGenerator(opCtx, false); + } else { + auto opCtxPtr = cc().makeOperationContext(); + validator->enableKeyGenerator(opCtxPtr.get(), false); } } } @@ -743,6 +741,10 @@ void ReplicationCoordinatorExternalStateImpl::_shardingOnTransitionToPrimaryHook CatalogCacheLoader::get(_service).onStepUp(); PeriodicBalancerSettingsRefresher::get(_service)->start(); ShardingState::get(_service)->initiateChunkSplitter(); + } else { // unsharded + if (auto validator = LogicalTimeValidator::get(_service)) { + validator->enableKeyGenerator(opCtx, true); + } } SessionCatalog::get(_service)->onStepUp(opCtx); diff --git a/src/mongo/db/repl/rollback_impl.cpp b/src/mongo/db/repl/rollback_impl.cpp index c3a192c8d7a..c08fd9e6ced 100644 --- a/src/mongo/db/repl/rollback_impl.cpp +++ b/src/mongo/db/repl/rollback_impl.cpp @@ -33,6 +33,7 @@ #include "mongo/db/repl/rollback_impl.h" #include "mongo/db/concurrency/d_concurrency.h" +#include "mongo/db/logical_time_validator.h" #include "mongo/db/operation_context.h" #include "mongo/db/repl/replication_coordinator.h" #include "mongo/db/repl/replication_process.h" @@ -131,6 +132,11 @@ Status RollbackImpl::runRollback(OperationContext* opCtx) { // At this point these functions need to always be called before returning, even on failure. // These functions fassert on failure. ON_BLOCK_EXIT([this, opCtx] { + auto validator = LogicalTimeValidator::get(opCtx); + if (validator) { + validator->resetKeyManagerCache(opCtx->getClient()->getServiceContext()); + } + _checkShardIdentityRollback(opCtx); _resetSessions(opCtx); _transitionFromRollbackToSecondary(opCtx); diff --git a/src/mongo/db/s/sharding_initialization_mongod.cpp b/src/mongo/db/s/sharding_initialization_mongod.cpp index 50bff361b30..116ce540a41 100644 --- a/src/mongo/db/s/sharding_initialization_mongod.cpp +++ b/src/mongo/db/s/sharding_initialization_mongod.cpp @@ -37,6 +37,7 @@ #include "mongo/client/remote_command_targeter.h" #include "mongo/client/remote_command_targeter_factory_impl.h" #include "mongo/db/logical_time_metadata_hook.h" +#include "mongo/db/logical_time_validator.h" #include "mongo/db/operation_context.h" #include "mongo/db/s/read_only_catalog_cache_loader.h" #include "mongo/db/s/shard_server_catalog_cache_loader.h" @@ -88,18 +89,22 @@ Status initializeGlobalShardingStateForMongod(OperationContext* opCtx, auto shardFactory = stdx::make_unique<ShardFactory>(std::move(buildersMap), std::move(targeterFactory)); + auto service = opCtx->getServiceContext(); if (serverGlobalParams.clusterRole == ClusterRole::ShardServer) { if (storageGlobalParams.readOnly) { - CatalogCacheLoader::set(opCtx->getServiceContext(), - stdx::make_unique<ReadOnlyCatalogCacheLoader>()); + CatalogCacheLoader::set(service, stdx::make_unique<ReadOnlyCatalogCacheLoader>()); } else { - CatalogCacheLoader::set(opCtx->getServiceContext(), + CatalogCacheLoader::set(service, stdx::make_unique<ShardServerCatalogCacheLoader>( stdx::make_unique<ConfigServerCatalogCacheLoader>())); } } else { - CatalogCacheLoader::set(opCtx->getServiceContext(), - stdx::make_unique<ConfigServerCatalogCacheLoader>()); + CatalogCacheLoader::set(service, stdx::make_unique<ConfigServerCatalogCacheLoader>()); + } + + auto validator = LogicalTimeValidator::get(service); + if (validator) { // The keyManager may be existing if the node was a part of a standalone RS. + validator->resetKeyManager(); } return initializeGlobalShardingState( diff --git a/src/mongo/db/service_context.h b/src/mongo/db/service_context.h index b33b3aeeb4b..6fa9fa96157 100644 --- a/src/mongo/db/service_context.h +++ b/src/mongo/db/service_context.h @@ -273,28 +273,6 @@ public: virtual StorageEngine* getGlobalStorageEngine() = 0; // - // Key manager, for HMAC keys. - // - - /** - * Sets the key manager on this service context. - */ - void setKeyManager(std::shared_ptr<KeysCollectionManager> keyManager) & { - stdx::lock_guard<stdx::mutex> lk(_mutex); - _keyManager = std::move(keyManager); - } - - /** - * Returns a pointer to the keys collection manager owned by this service context. - */ - std::shared_ptr<KeysCollectionManager> getKeyManager() & { - stdx::lock_guard<stdx::mutex> lk(_mutex); - return _keyManager; - } - - std::shared_ptr<KeysCollectionManager> getKeyManager() && = delete; - - // // Global operation management. This may not belong here and there may be too many methods // here. // @@ -483,11 +461,6 @@ private: virtual std::unique_ptr<OperationContext> _newOpCtx(Client* client, unsigned opId) = 0; /** - * The key manager. - */ - std::shared_ptr<KeysCollectionManager> _keyManager; - - /** * The periodic runner. */ std::unique_ptr<PeriodicRunner> _runner; diff --git a/src/mongo/db/service_entry_point_mongod.cpp b/src/mongo/db/service_entry_point_mongod.cpp index 3bd917f6d02..5cc7dd8eafb 100644 --- a/src/mongo/db/service_entry_point_mongod.cpp +++ b/src/mongo/db/service_entry_point_mongod.cpp @@ -265,21 +265,20 @@ void appendReplyMetadata(OperationContext* opCtx, rpc::ShardingMetadata(lastOpTimeFromClient, replCoord->getElectionId()) .writeToMetadata(metadataBob) .transitional_ignore(); - if (serverGlobalParams.featureCompatibility.version.load() == - ServerGlobalParams::FeatureCompatibility::Version::k36) { - if (LogicalTimeValidator::isAuthorizedToAdvanceClock(opCtx)) { - // No need to sign cluster times for internal clients. - SignedLogicalTime currentTime(LogicalClock::get(opCtx)->getClusterTime(), - TimeProofService::TimeProof(), - 0); - rpc::LogicalTimeMetadata logicalTimeMetadata(currentTime); - logicalTimeMetadata.writeToMetadata(metadataBob); - } else if (auto validator = LogicalTimeValidator::get(opCtx)) { - auto currentTime = - validator->trySignLogicalTime(LogicalClock::get(opCtx)->getClusterTime()); - rpc::LogicalTimeMetadata logicalTimeMetadata(currentTime); - logicalTimeMetadata.writeToMetadata(metadataBob); - } + } + if (serverGlobalParams.featureCompatibility.version.load() == + ServerGlobalParams::FeatureCompatibility::Version::k36) { + if (LogicalTimeValidator::isAuthorizedToAdvanceClock(opCtx)) { + // No need to sign cluster times for internal clients. + SignedLogicalTime currentTime( + LogicalClock::get(opCtx)->getClusterTime(), TimeProofService::TimeProof(), 0); + rpc::LogicalTimeMetadata logicalTimeMetadata(currentTime); + logicalTimeMetadata.writeToMetadata(metadataBob); + } else if (auto validator = LogicalTimeValidator::get(opCtx)) { + auto currentTime = + validator->trySignLogicalTime(LogicalClock::get(opCtx)->getClusterTime()); + rpc::LogicalTimeMetadata logicalTimeMetadata(currentTime); + logicalTimeMetadata.writeToMetadata(metadataBob); } } } diff --git a/src/mongo/db/time_proof_service.cpp b/src/mongo/db/time_proof_service.cpp index 19cf095acd4..078a82da135 100644 --- a/src/mongo/db/time_proof_service.cpp +++ b/src/mongo/db/time_proof_service.cpp @@ -80,4 +80,11 @@ Status TimeProofService::checkProof(LogicalTime time, const TimeProof& proof, co return Status::OK(); } +void TimeProofService::resetCache() { + stdx::lock_guard<stdx::mutex> lk(_cacheMutex); + if (_cache) { + _cache = boost::none; + } +} + } // namespace mongo diff --git a/src/mongo/db/time_proof_service.h b/src/mongo/db/time_proof_service.h index 0a981632d09..185a2394302 100644 --- a/src/mongo/db/time_proof_service.h +++ b/src/mongo/db/time_proof_service.h @@ -63,6 +63,11 @@ public: */ Status checkProof(LogicalTime time, const TimeProof& proof, const Key& key); + /** + * Resets the cache. + */ + void resetCache(); + private: /** * Nested class to cache TimeProof. It holds proof for the greatest time allowed. diff --git a/src/mongo/s/sharding_initialization.cpp b/src/mongo/s/sharding_initialization.cpp index f229f6baf66..66b28202403 100644 --- a/src/mongo/s/sharding_initialization.cpp +++ b/src/mongo/s/sharding_initialization.cpp @@ -37,6 +37,7 @@ #include "mongo/base/status.h" #include "mongo/client/remote_command_targeter_factory_impl.h" #include "mongo/db/audit.h" +#include "mongo/db/keys_collection_client_sharded.h" #include "mongo/db/keys_collection_manager.h" #include "mongo/db/keys_collection_manager_sharding.h" #include "mongo/db/logical_clock.h" @@ -98,9 +99,6 @@ MONGO_EXPORT_STARTUP_SERVER_PARAMETER(ShardingTaskExecutorPoolRefreshRequirement MONGO_EXPORT_STARTUP_SERVER_PARAMETER(ShardingTaskExecutorPoolRefreshTimeoutMS, int, ConnectionPool::kDefaultRefreshTimeout.count()); -MONGO_EXPORT_STARTUP_SERVER_PARAMETER(KeysRotationIntervalSec, - int, - KeysCollectionManager::kKeyValidInterval.count()); namespace { @@ -110,8 +108,6 @@ using executor::TaskExecutorPool; using executor::ThreadPoolTaskExecutor; static constexpr auto kRetryInterval = Seconds{2}; -const std::string kKeyManagerPurposeString = "HMAC"; -const Seconds kKeyValidInterval(3 * 30 * 24 * 60 * 60); // ~3 months std::unique_ptr<ShardingCatalogClient> makeCatalogClient(ServiceContext* service, StringData distLockProcessId) { @@ -222,13 +218,16 @@ Status initializeGlobalShardingState(OperationContext* opCtx, // The catalog client must be started after the shard registry has been started up grid->catalogClient()->startup(); + auto keysCollectionClient = + stdx::make_unique<KeysCollectionClientSharded>(grid->catalogClient()); auto keyManager = std::make_shared<KeysCollectionManagerSharding>( - kKeyManagerPurposeString, grid->catalogClient(), Seconds(KeysRotationIntervalSec)); + KeysCollectionManager::kKeyManagerPurposeString, + std::move(keysCollectionClient), + Seconds(KeysRotationIntervalSec)); keyManager->startMonitoring(opCtx->getServiceContext()); LogicalTimeValidator::set(opCtx->getServiceContext(), stdx::make_unique<LogicalTimeValidator>(keyManager)); - opCtx->getServiceContext()->setKeyManager(keyManager); auto replCoord = repl::ReplicationCoordinator::get(opCtx->getClient()->getServiceContext()); if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer && |