From f676daac201f886a3994c545f9bb8636e7ed15a0 Mon Sep 17 00:00:00 2001 From: Jack Mulrow Date: Mon, 8 Feb 2021 15:42:26 +0000 Subject: SERVER-53406 Add TTL for external keys --- src/mongo/db/catalog/database_impl.cpp | 1 - src/mongo/db/keys_collection_cache.cpp | 11 +- src/mongo/db/keys_collection_cache_test.cpp | 205 +++++++++++++++++++-- src/mongo/db/keys_collection_client_direct.h | 2 +- src/mongo/db/keys_collection_document.idl | 5 +- .../db/keys_collection_manager_sharding_test.cpp | 64 +++++-- src/mongo/db/namespace_string.cpp | 6 +- src/mongo/db/repl/repl_server_parameters.idl | 11 +- .../db/repl/tenant_migration_donor_service.cpp | 65 ++++++- src/mongo/db/repl/tenant_migration_donor_service.h | 19 +- .../db/repl/tenant_migration_recipient_service.cpp | 16 +- .../db/repl/tenant_migration_recipient_service.h | 2 +- src/mongo/db/repl/tenant_migration_util.cpp | 74 +++++++- src/mongo/db/repl/tenant_migration_util.h | 19 +- 14 files changed, 428 insertions(+), 72 deletions(-) (limited to 'src/mongo/db') diff --git a/src/mongo/db/catalog/database_impl.cpp b/src/mongo/db/catalog/database_impl.cpp index ea5ae5568b9..061962745b3 100644 --- a/src/mongo/db/catalog/database_impl.cpp +++ b/src/mongo/db/catalog/database_impl.cpp @@ -353,7 +353,6 @@ Status DatabaseImpl::dropCollection(OperationContext* opCtx, } else if (!(nss.isSystemDotViews() || nss.isHealthlog() || nss == NamespaceString::kLogicalSessionsNamespace || nss == NamespaceString::kKeysCollectionNamespace || - nss == NamespaceString::kExternalKeysCollectionNamespace || nss.isTemporaryReshardingCollection())) { return Status(ErrorCodes::IllegalOperation, str::stream() << "can't drop system collection " << nss); diff --git a/src/mongo/db/keys_collection_cache.cpp b/src/mongo/db/keys_collection_cache.cpp index 407e3168f37..1202a118164 100644 --- a/src/mongo/db/keys_collection_cache.cpp +++ b/src/mongo/db/keys_collection_cache.cpp @@ -120,6 +120,11 @@ Status 
KeysCollectionCache::_refreshExternalKeys(OperationContext* opCtx) { auto& newKeys = refreshStatus.getValue(); + std::multimap newExternalKeysCache; + for (auto&& key : newKeys) { + newExternalKeysCache.emplace(key.getKeyId(), std::move(key)); + } + stdx::lock_guard lk(_cacheMutex); if (originalSize > _externalKeysCache.size()) { // _externalKeysCache cleared while we were getting the new keys, just return so the next @@ -127,9 +132,9 @@ Status KeysCollectionCache::_refreshExternalKeys(OperationContext* opCtx) { return Status::OK(); } - for (auto&& key : newKeys) { - _externalKeysCache.emplace(key.getKeyId(), std::move(key)); - } + // Replace the cached keys with the newly loaded ones. Note because all external keys are loaded + // when refreshing them, this will remove keys that have been deleted from the collection. + std::swap(_externalKeysCache, newExternalKeysCache); return Status::OK(); } diff --git a/src/mongo/db/keys_collection_cache_test.cpp b/src/mongo/db/keys_collection_cache_test.cpp index c5818ba2e22..02204642c5a 100644 --- a/src/mongo/db/keys_collection_cache_test.cpp +++ b/src/mongo/db/keys_collection_cache_test.cpp @@ -29,6 +29,7 @@ #include "mongo/platform/basic.h" +#include "mongo/db/dbdirectclient.h" #include "mongo/db/dbhelpers.h" #include "mongo/db/jsobj.h" #include "mongo/db/keys_collection_cache.h" @@ -36,8 +37,10 @@ #include "mongo/db/keys_collection_client_sharded.h" #include "mongo/db/keys_collection_document_gen.h" #include "mongo/db/operation_context.h" +#include "mongo/db/ops/write_ops.h" #include "mongo/db/s/config/config_server_test_fixture.h" #include "mongo/db/time_proof_service.h" +#include "mongo/rpc/get_status_from_command_result.h" #include "mongo/s/grid.h" #include "mongo/unittest/unittest.h" #include "mongo/util/clock_source_mock.h" @@ -73,6 +76,47 @@ protected: ASSERT_EQ(0, updateResult.numDocsModified); } + void deleteDocument(OperationContext* opCtx, + const NamespaceString& nss, + const BSONObj& filter) { + auto cmdObj = 
[&] { + write_ops::Delete deleteOp(nss); + deleteOp.setDeletes({[&] { + write_ops::DeleteOpEntry entry; + entry.setQ(filter); + entry.setMulti(false); + return entry; + }()}); + return deleteOp.toBSON({}); + }(); + + DBDirectClient client(opCtx); + BSONObj result; + client.runCommand(nss.db().toString(), cmdObj, result); + ASSERT_OK(getStatusFromWriteCommandReply(result)); + } + + void updateDocument(OperationContext* opCtx, + const NamespaceString& nss, + const BSONObj& filter, + const BSONObj& update) { + auto cmdObj = [&] { + write_ops::Update updateOp(nss); + updateOp.setUpdates({[&] { + write_ops::UpdateOpEntry entry; + entry.setQ(filter); + entry.setU(write_ops::UpdateModification::parseFromClassicUpdate(update)); + return entry; + }()}); + return updateOp.toBSON({}); + }(); + + DBDirectClient client(opCtx); + BSONObj result; + client.runCommand(nss.db().toString(), cmdObj, result); + ASSERT_OK(getStatusFromWriteCommandReply(result)); + } + private: std::unique_ptr _catalogClient; std::unique_ptr _directClient; @@ -169,9 +213,6 @@ TEST_F(CacheTest, GetKeyShouldReturnCorrectKeyAfterRefreshSharded) { TEST_F(CacheTest, GetKeyShouldReturnCorrectKeysAfterRefreshDirectClient) { KeysCollectionCache cache("test", directClient()); - const auto externalKeysTTLExpiresAt = - ServiceContext().getFastClockSource()->now() + Seconds(30); - KeysCollectionDocument origKey0(1); origKey0.setKeysCollectionDocumentBase( {"test", TimeProofService::generateRandomKey(), LogicalTime(Timestamp(105, 0))}); @@ -180,13 +221,14 @@ TEST_F(CacheTest, GetKeyShouldReturnCorrectKeysAfterRefreshDirectClient) { // Use external keys with the same keyId and expiresAt as the internal key to test that the // cache correctly tackles key collisions. 
- ExternalKeysCollectionDocument origKey1(OID::gen(), 1, kMigrationId1, externalKeysTTLExpiresAt); + ExternalKeysCollectionDocument origKey1(OID::gen(), 1, kMigrationId1); origKey1.setKeysCollectionDocumentBase( {"test", TimeProofService::generateRandomKey(), LogicalTime(Timestamp(105, 0))}); + origKey1.setTTLExpiresAt(ServiceContext().getFastClockSource()->now() + Seconds(30)); insertDocument( operationContext(), NamespaceString::kExternalKeysCollectionNamespace, origKey1.toBSON()); - ExternalKeysCollectionDocument origKey2(OID::gen(), 1, kMigrationId2, externalKeysTTLExpiresAt); + ExternalKeysCollectionDocument origKey2(OID::gen(), 1, kMigrationId2); origKey2.setKeysCollectionDocumentBase( {"test", TimeProofService::generateRandomKey(), LogicalTime(Timestamp(205, 0))}); insertDocument( @@ -246,7 +288,12 @@ TEST_F(CacheTest, GetKeyShouldReturnCorrectKeysAfterRefreshDirectClient) { ASSERT_EQ(expectedKey.getId(), key.getId()); ASSERT_EQ(expectedKey.getPurpose(), key.getPurpose()); ASSERT_EQ(expectedKey.getExpiresAt().asTimestamp(), key.getExpiresAt().asTimestamp()); - ASSERT_EQ(expectedKey.getTTLExpiresAt(), key.getTTLExpiresAt()); + if (expectedKey.getTTLExpiresAt()) { + ASSERT_EQ(*expectedKey.getTTLExpiresAt(), *key.getTTLExpiresAt()); + } else { + ASSERT(!expectedKey.getTTLExpiresAt()), key.getTTLExpiresAt(); + ASSERT(!key.getTTLExpiresAt()); + } } } @@ -261,7 +308,7 @@ TEST_F(CacheTest, GetKeyShouldReturnCorrectKeysAfterRefreshDirectClient) { ASSERT_EQ(origKey2.getId(), key.getId()); ASSERT_EQ(origKey2.getPurpose(), key.getPurpose()); ASSERT_EQ(origKey2.getExpiresAt().asTimestamp(), key.getExpiresAt().asTimestamp()); - ASSERT_EQ(origKey2.getTTLExpiresAt(), key.getTTLExpiresAt()); + ASSERT(!key.getTTLExpiresAt()); } swExternalKeys = cache.getExternalKeysById(1, LogicalTime(Timestamp(300, 0))); @@ -391,10 +438,7 @@ TEST_F(CacheTest, RefreshShouldNotGetExternalKeysForOtherPurpose) { insertDocument( operationContext(), NamespaceString::kKeysCollectionNamespace, 
origKey0.toBSON()); - const auto externalKeysTTLExpiresAt = - ServiceContext().getFastClockSource()->now() + Seconds(30); - - ExternalKeysCollectionDocument origKey1(OID::gen(), 1, kMigrationId1, externalKeysTTLExpiresAt); + ExternalKeysCollectionDocument origKey1(OID::gen(), 1, kMigrationId1); origKey1.setKeysCollectionDocumentBase( {"dummy", TimeProofService::generateRandomKey(), LogicalTime(Timestamp(105, 0))}); insertDocument( @@ -408,7 +452,7 @@ TEST_F(CacheTest, RefreshShouldNotGetExternalKeysForOtherPurpose) { ASSERT_EQ(ErrorCodes::KeyNotFound, swKey.getStatus()); } - ExternalKeysCollectionDocument origKey2(OID::gen(), 2, kMigrationId1, externalKeysTTLExpiresAt); + ExternalKeysCollectionDocument origKey2(OID::gen(), 2, kMigrationId1); origKey2.setKeysCollectionDocumentBase( {"test", TimeProofService::generateRandomKey(), LogicalTime(Timestamp(110, 0))}); insertDocument( @@ -430,7 +474,6 @@ TEST_F(CacheTest, RefreshShouldNotGetExternalKeysForOtherPurpose) { ASSERT_EQ(origKey2.getKey(), key.getKey()); ASSERT_EQ("test", key.getPurpose()); ASSERT_EQ(Timestamp(110, 0), key.getExpiresAt().asTimestamp()); - ASSERT_EQ(externalKeysTTLExpiresAt, key.getTTLExpiresAt()); } } @@ -494,15 +537,13 @@ TEST_F(CacheTest, RefreshCanIncrementallyGetNewKeys) { } TEST_F(CacheTest, CacheExternalKeyBasic) { - const auto externalKeysTTLExpiresAt = - ServiceContext().getFastClockSource()->now() + Seconds(30); KeysCollectionCache cache("test", catalogClient()); auto swExternalKeys = cache.getExternalKeysById(5, LogicalTime(Timestamp(10, 1))); ASSERT_EQ(ErrorCodes::KeyNotFound, swExternalKeys.getStatus()); - ExternalKeysCollectionDocument externalKey( - OID::gen(), 5, kMigrationId1, externalKeysTTLExpiresAt); + ExternalKeysCollectionDocument externalKey(OID::gen(), 5, kMigrationId1); + externalKey.setTTLExpiresAt(ServiceContext().getFastClockSource()->now() + Seconds(30)); externalKey.setKeysCollectionDocumentBase( {"test", TimeProofService::generateRandomKey(), 
LogicalTime(Timestamp(100, 0))}); @@ -516,7 +557,135 @@ TEST_F(CacheTest, CacheExternalKeyBasic) { ASSERT_EQ(externalKey.getId(), cachedKey.getId()); ASSERT_EQ(externalKey.getPurpose(), cachedKey.getPurpose()); ASSERT_EQ(externalKey.getExpiresAt().asTimestamp(), cachedKey.getExpiresAt().asTimestamp()); - ASSERT_EQ(externalKey.getTTLExpiresAt(), cachedKey.getTTLExpiresAt()); + ASSERT_EQ(*externalKey.getTTLExpiresAt(), *cachedKey.getTTLExpiresAt()); +} + +TEST_F(CacheTest, RefreshClearsRemovedExternalKeys) { + KeysCollectionCache cache("test", directClient()); + + KeysCollectionDocument origKey0(1); + origKey0.setKeysCollectionDocumentBase( + {"test", TimeProofService::generateRandomKey(), LogicalTime(Timestamp(105, 0))}); + insertDocument( + operationContext(), NamespaceString::kKeysCollectionNamespace, origKey0.toBSON()); + + ExternalKeysCollectionDocument origKey1(OID::gen(), 1, kMigrationId1); + origKey1.setKeysCollectionDocumentBase( + {"test", TimeProofService::generateRandomKey(), LogicalTime(Timestamp(105, 0))}); + origKey1.setTTLExpiresAt(ServiceContext().getFastClockSource()->now() + Seconds(30)); + insertDocument( + operationContext(), NamespaceString::kExternalKeysCollectionNamespace, origKey1.toBSON()); + + ExternalKeysCollectionDocument origKey2(OID::gen(), 1, kMigrationId2); + origKey2.setKeysCollectionDocumentBase( + {"test", TimeProofService::generateRandomKey(), LogicalTime(Timestamp(205, 0))}); + insertDocument( + operationContext(), NamespaceString::kExternalKeysCollectionNamespace, origKey2.toBSON()); + + // After a refresh, both keys should be in the cache. 
+ { + auto refreshStatus = cache.refresh(operationContext()); + ASSERT_OK(refreshStatus.getStatus()); + + auto swExternalKeys = cache.getExternalKeysById(1, LogicalTime(Timestamp(1, 0))); + ASSERT_OK(swExternalKeys.getStatus()); + ASSERT_EQ(2, swExternalKeys.getValue().size()); + } + + // After a key is deleted from the underlying collection, the next refresh should remove it from + // the cache. + deleteDocument( + operationContext(), NamespaceString::kExternalKeysCollectionNamespace, origKey1.toBSON()); + + // The key is still cached until refresh. + { + auto swExternalKeys = cache.getExternalKeysById(1, LogicalTime(Timestamp(1, 0))); + ASSERT_OK(swExternalKeys.getStatus()); + ASSERT_EQ(2, swExternalKeys.getValue().size()); + } + + { + auto refreshStatus = cache.refresh(operationContext()); + ASSERT_OK(refreshStatus.getStatus()); + + // Now the key is no longer cached. + auto swExternalKeys = cache.getExternalKeysById(1, LogicalTime(Timestamp(1, 0))); + ASSERT_OK(swExternalKeys.getStatus()); + ASSERT_EQ(1, swExternalKeys.getValue().size()); + auto key = swExternalKeys.getValue().front(); + + ASSERT_EQ(origKey2.getId(), key.getId()); + ASSERT_EQ(origKey2.getPurpose(), key.getPurpose()); + ASSERT_EQ(origKey2.getExpiresAt().asTimestamp(), key.getExpiresAt().asTimestamp()); + ASSERT(!key.getTTLExpiresAt()); + } + + // Remove the final key and the external keys cache should be empty. 
+ deleteDocument( + operationContext(), NamespaceString::kExternalKeysCollectionNamespace, origKey2.toBSON()); + + { + auto refreshStatus = cache.refresh(operationContext()); + ASSERT_OK(refreshStatus.getStatus()); + + auto swExternalKeys = cache.getExternalKeysById(1, LogicalTime(Timestamp(1, 0))); + ASSERT_EQ(ErrorCodes::KeyNotFound, swExternalKeys.getStatus()); + } +} + +TEST_F(CacheTest, RefreshHandlesKeysReceivingTTLValue) { + KeysCollectionCache cache("test", directClient()); + + KeysCollectionDocument origKey0(1); + origKey0.setKeysCollectionDocumentBase( + {"test", TimeProofService::generateRandomKey(), LogicalTime(Timestamp(105, 0))}); + insertDocument( + operationContext(), NamespaceString::kKeysCollectionNamespace, origKey0.toBSON()); + + ExternalKeysCollectionDocument origKey1(OID::gen(), 1, kMigrationId1); + origKey1.setKeysCollectionDocumentBase( + {"test", TimeProofService::generateRandomKey(), LogicalTime(Timestamp(105, 0))}); + insertDocument( + operationContext(), NamespaceString::kExternalKeysCollectionNamespace, origKey1.toBSON()); + + // Refresh and the external key should be in the cache. 
+ { + auto refreshStatus = cache.refresh(operationContext()); + ASSERT_OK(refreshStatus.getStatus()); + + auto swExternalKeys = cache.getExternalKeysById(1, LogicalTime(Timestamp(1, 0))); + ASSERT_OK(swExternalKeys.getStatus()); + ASSERT_EQ(1, swExternalKeys.getValue().size()); + auto key = swExternalKeys.getValue().front(); + + ASSERT_EQ(origKey1.getId(), key.getId()); + ASSERT_EQ(origKey1.getPurpose(), key.getPurpose()); + ASSERT_EQ(origKey1.getExpiresAt().asTimestamp(), key.getExpiresAt().asTimestamp()); + ASSERT(!key.getTTLExpiresAt()); + } + + origKey1.setTTLExpiresAt(ServiceContext().getFastClockSource()->now() + Seconds(30)); + updateDocument(operationContext(), + NamespaceString::kExternalKeysCollectionNamespace, + BSON(ExternalKeysCollectionDocument::kIdFieldName << origKey1.getId()), + origKey1.toBSON()); + + // Refresh and the external key should have been updated. + { + auto refreshStatus = cache.refresh(operationContext()); + ASSERT_OK(refreshStatus.getStatus()); + + auto swExternalKeys = cache.getExternalKeysById(1, LogicalTime(Timestamp(1, 0))); + ASSERT_OK(swExternalKeys.getStatus()); + ASSERT_EQ(1, swExternalKeys.getValue().size()); + auto key = swExternalKeys.getValue().front(); + + ASSERT_EQ(origKey1.getId(), key.getId()); + ASSERT_EQ(origKey1.getPurpose(), key.getPurpose()); + ASSERT_EQ(origKey1.getExpiresAt().asTimestamp(), key.getExpiresAt().asTimestamp()); + ASSERT(key.getTTLExpiresAt()); + ASSERT_EQ(*origKey1.getTTLExpiresAt(), *key.getTTLExpiresAt()); + } } } // namespace diff --git a/src/mongo/db/keys_collection_client_direct.h b/src/mongo/db/keys_collection_client_direct.h index c166a964760..ceb321f405d 100644 --- a/src/mongo/db/keys_collection_client_direct.h +++ b/src/mongo/db/keys_collection_client_direct.h @@ -55,7 +55,7 @@ public: bool useMajority) override; /** - * Returns all keys in admin.system.external_validation_keys that match the given purpose. 
+ * Returns all keys in config.external_validation_keys that match the given purpose. */ StatusWith> getAllExternalKeys( OperationContext* opCtx, StringData purpose) override; diff --git a/src/mongo/db/keys_collection_document.idl b/src/mongo/db/keys_collection_document.idl index e2914519c83..671652ecdf5 100644 --- a/src/mongo/db/keys_collection_document.idl +++ b/src/mongo/db/keys_collection_document.idl @@ -46,7 +46,7 @@ structs: keysCollectionDocumentBase: description: >- Contains the fields shared by key documents stored in admin.system.keys and - admin.system.external_validation_keys. + config.external_validation_keys. strict: true fields: purpose: @@ -75,7 +75,7 @@ structs: externalKeysCollectionDocument: description: >- - Represents a key document stored in admin.system.external_validation_keys. + Represents a key document stored in config.external_validation_keys. strict: true inline_chained_structs: true chained_structs: @@ -100,3 +100,4 @@ structs: The wall-clock time at which this key document should be removed by the TTL monitor. 
cpp_name: TTLExpiresAt + optional: true diff --git a/src/mongo/db/keys_collection_manager_sharding_test.cpp b/src/mongo/db/keys_collection_manager_sharding_test.cpp index 71b91bb3c1f..ba22b2c0481 100644 --- a/src/mongo/db/keys_collection_manager_sharding_test.cpp +++ b/src/mongo/db/keys_collection_manager_sharding_test.cpp @@ -30,6 +30,7 @@ #include "mongo/platform/basic.h" #include "mongo/db/jsobj.h" +#include "mongo/db/keys_collection_client_direct.h" #include "mongo/db/keys_collection_client_sharded.h" #include "mongo/db/keys_collection_document_gen.h" #include "mongo/db/keys_collection_manager.h" @@ -50,9 +51,6 @@ public: } protected: - const UUID kMigrationId1 = UUID::gen(); - const UUID kMigrationId2 = UUID::gen(); - void setUp() override { ConfigServerTestFixture::setUp(); @@ -371,14 +369,50 @@ TEST_F(KeysManagerShardedTest, HasSeenKeysIsFalseUntilKeysAreFound) { ASSERT_EQ(true, keyManager()->hasSeenKeys()); } -TEST_F(KeysManagerShardedTest, CacheExternalKeyBasic) { +class KeysManagerDirectTest : public ConfigServerTestFixture { +protected: + const UUID kMigrationId1 = UUID::gen(); + const UUID kMigrationId2 = UUID::gen(); + + KeysCollectionManager* keyManager() { + return _keyManager.get(); + } + + void setUp() override { + ConfigServerTestFixture::setUp(); + + auto clockSource = std::make_unique(); + // Timestamps of "0 seconds" are not allowed, so we must advance our clock mock to the first + // real second. 
+ clockSource->advance(Seconds(1)); + + operationContext()->getServiceContext()->setFastClockSource(std::move(clockSource)); + _keyManager = std::make_unique( + "dummy", std::make_unique(), Seconds(1)); + } + + void tearDown() override { + _keyManager->stopMonitoring(); + + ConfigServerTestFixture::tearDown(); + } + +private: + std::unique_ptr _keyManager; +}; + +TEST_F(KeysManagerDirectTest, CacheExternalKeyBasic) { keyManager()->startMonitoring(getServiceContext()); - auto externalKeysTTLExpiresAt = getServiceContext()->getFastClockSource()->now() + Seconds(30); - ExternalKeysCollectionDocument externalKey1( - OID::gen(), 1, kMigrationId1, externalKeysTTLExpiresAt); + // Refresh immediately to prevent a refresh from discovering the inserted keys. + keyManager()->refreshNow(operationContext()); + + ExternalKeysCollectionDocument externalKey1(OID::gen(), 1, kMigrationId1); externalKey1.setKeysCollectionDocumentBase( {"dummy", TimeProofService::generateRandomKey(), LogicalTime(Timestamp(100, 0))}); + ASSERT_OK(insertToConfigCollection(operationContext(), + NamespaceString::kExternalKeysCollectionNamespace, + externalKey1.toBSON())); keyManager()->cacheExternalKey(externalKey1); @@ -393,13 +427,15 @@ TEST_F(KeysManagerShardedTest, CacheExternalKeyBasic) { ASSERT_EQ(externalKey1.getKeyId(), key.getKeyId()); ASSERT_EQ(externalKey1.getPurpose(), key.getPurpose()); - ASSERT_EQ(externalKey1.getExpiresAt().asTimestamp(), key.getExpiresAt().asTimestamp()); } } -TEST_F(KeysManagerShardedTest, WillNotCacheExternalKeyWhenMonitoringIsStopped) { +TEST_F(KeysManagerDirectTest, WillNotCacheExternalKeyWhenMonitoringIsStopped) { keyManager()->startMonitoring(getServiceContext()); + // Refresh immediately to prevent a refresh from discovering the inserted keys. + keyManager()->refreshNow(operationContext()); + // Insert an internal key so the key manager won't attempt to refresh after the refresher is // stopped. 
KeysCollectionDocument internalKey(1); @@ -408,11 +444,12 @@ TEST_F(KeysManagerShardedTest, WillNotCacheExternalKeyWhenMonitoringIsStopped) { ASSERT_OK(insertToConfigCollection( operationContext(), NamespaceString::kKeysCollectionNamespace, internalKey.toBSON())); - auto externalKeysTTLExpiresAt = getServiceContext()->getFastClockSource()->now() + Seconds(30); - ExternalKeysCollectionDocument externalKey1( - OID::gen(), 1, kMigrationId1, externalKeysTTLExpiresAt); + ExternalKeysCollectionDocument externalKey1(OID::gen(), 1, kMigrationId1); externalKey1.setKeysCollectionDocumentBase( {"dummy", TimeProofService::generateRandomKey(), LogicalTime(Timestamp(100, 0))}); + ASSERT_OK(insertToConfigCollection(operationContext(), + NamespaceString::kExternalKeysCollectionNamespace, + externalKey1.toBSON())); keyManager()->cacheExternalKey(externalKey1); @@ -425,8 +462,7 @@ TEST_F(KeysManagerShardedTest, WillNotCacheExternalKeyWhenMonitoringIsStopped) { keyManager()->stopMonitoring(); - ExternalKeysCollectionDocument externalKey2( - OID::gen(), 1, kMigrationId2, externalKeysTTLExpiresAt); + ExternalKeysCollectionDocument externalKey2(OID::gen(), 1, kMigrationId2); externalKey2.setKeysCollectionDocumentBase( {"dummy", TimeProofService::generateRandomKey(), LogicalTime(Timestamp(100, 0))}); diff --git a/src/mongo/db/namespace_string.cpp b/src/mongo/db/namespace_string.cpp index 4e6e598622f..ae08e4c640c 100644 --- a/src/mongo/db/namespace_string.cpp +++ b/src/mongo/db/namespace_string.cpp @@ -84,8 +84,8 @@ const NamespaceString NamespaceString::kShardConfigDatabasesNamespace(NamespaceS "cache.databases"); const NamespaceString NamespaceString::kKeysCollectionNamespace(NamespaceString::kAdminDb, "system.keys"); -const NamespaceString NamespaceString::kExternalKeysCollectionNamespace( - NamespaceString::kAdminDb, "system.external_validation_keys"); +const NamespaceString NamespaceString::kExternalKeysCollectionNamespace(NamespaceString::kConfigDb, + "external_validation_keys"); 
const NamespaceString NamespaceString::kRsOplogNamespace(NamespaceString::kLocalDb, "oplog.rs"); const NamespaceString NamespaceString::kSystemReplSetNamespace(NamespaceString::kLocalDb, "system.replset"); @@ -132,8 +132,6 @@ bool NamespaceString::isLegalClientSystemNS() const { return true; if (coll() == kKeysCollectionNamespace.coll()) return true; - if (coll() == kExternalKeysCollectionNamespace.coll()) - return true; if (coll() == "system.backup_users") return true; } else if (db() == kConfigDb) { diff --git a/src/mongo/db/repl/repl_server_parameters.idl b/src/mongo/db/repl/repl_server_parameters.idl index 0d950e4b1a6..e45e6289c72 100644 --- a/src/mongo/db/repl/repl_server_parameters.idl +++ b/src/mongo/db/repl/repl_server_parameters.idl @@ -382,15 +382,16 @@ server_parameters: default: expr: 15 * 60 * 1000 - tenantMigrationExternalKeysRemovalDelaySecs: + tenantMigrationExternalKeysRemovalBufferSecs: description: >- The amount of time in seconds that the donor or recipient should wait before - removing the cluster time keys cloned from each other. + removing the cluster time keys cloned from each other after receiving + donorForgetMigration or recipientForgetMigration set_at: [ startup, runtime ] cpp_vartype: AtomicWord - cpp_varname: tenantMigrationExternalKeysRemovalDelaySecs + cpp_varname: tenantMigrationExternalKeysRemovalBufferSecs default: - expr: 3 * 30 * 24 * 60 * 60 # ~3 months + expr: 60 * 60 * 24 # 24 hours tenantMigrationOplogBufferPeekCacheSize: description: >- @@ -479,4 +480,4 @@ feature_flags: If enabled, use field secondaryDelaySecs instead of slaveDelay in MemberConfig. 
cpp_varname: feature_flags::gUseSecondaryDelaySecs default: true - version: 4.9 \ No newline at end of file + version: 4.9 diff --git a/src/mongo/db/repl/tenant_migration_donor_service.cpp b/src/mongo/db/repl/tenant_migration_donor_service.cpp index 94d6cf0a802..370a143df54 100644 --- a/src/mongo/db/repl/tenant_migration_donor_service.cpp +++ b/src/mongo/db/repl/tenant_migration_donor_service.cpp @@ -63,8 +63,10 @@ MONGO_FAIL_POINT_DEFINE(pauseTenantMigrationAfterPersistingInitialDonorStateDoc) MONGO_FAIL_POINT_DEFINE(pauseTenantMigrationBeforeLeavingAbortingIndexBuildsState); MONGO_FAIL_POINT_DEFINE(pauseTenantMigrationBeforeLeavingBlockingState); MONGO_FAIL_POINT_DEFINE(pauseTenantMigrationBeforeLeavingDataSyncState); +MONGO_FAIL_POINT_DEFINE(pauseTenantMigrationDonorBeforeMarkingStateGarbageCollectable); const std::string kTTLIndexName = "TenantMigrationDonorTTLIndex"; +const std::string kExternalKeysTTLIndexName = "ExternalKeysTTLIndex"; const Backoff kExponentialBackoff(Seconds(1), Milliseconds::max()); const ReadPreferenceSetting kPrimaryOnlyReadPreference(ReadPreference::PrimaryOnly); @@ -98,7 +100,10 @@ void checkIfReceivedDonorAbortMigration(const CancelationToken& serviceToken, } // namespace -ExecutorFuture TenantMigrationDonorService::_rebuildService( +// Note this index is required on both the donor and recipient in a tenant migration, since each +// will copy cluster time keys from the other. The donor service is set up on all mongods on stepup +// to primary, so this index will be created on both donors and recipients. 
+ExecutorFuture TenantMigrationDonorService::createStateDocumentTTLIndex( std::shared_ptr executor, const CancelationToken& token) { return AsyncTry([this] { auto nss = getStateDocumentsNS(); @@ -123,10 +128,45 @@ ExecutorFuture TenantMigrationDonorService::_rebuildService( .on(**executor, CancelationToken::uncancelable()); } +ExecutorFuture TenantMigrationDonorService::createExternalKeysTTLIndex( + std::shared_ptr executor, const CancelationToken& token) { + return AsyncTry([this] { + const auto nss = NamespaceString::kExternalKeysCollectionNamespace; + + AllowOpCtxWhenServiceRebuildingBlock allowOpCtxBlock(Client::getCurrent()); + auto opCtxHolder = cc().makeOperationContext(); + auto opCtx = opCtxHolder.get(); + DBDirectClient client(opCtx); + + BSONObj result; + client.runCommand( + nss.db().toString(), + BSON("createIndexes" + << nss.coll().toString() << "indexes" + << BSON_ARRAY(BSON("key" << BSON("ttlExpiresAt" << 1) << "name" + << kExternalKeysTTLIndexName + << "expireAfterSeconds" << 0))), + result); + uassertStatusOK(getStatusFromCommandResult(result)); + }) + .until([token](Status status) { return shouldStopCreatingTTLIndex(status, token); }) + .withBackoffBetweenIterations(kExponentialBackoff) + .on(**executor, CancelationToken::uncancelable()); +} + +ExecutorFuture TenantMigrationDonorService::_rebuildService( + std::shared_ptr executor, const CancelationToken& token) { + return createStateDocumentTTLIndex(executor, token).then([this, executor, token] { + return createExternalKeysTTLIndex(executor, token); + }); +} + TenantMigrationDonorService::Instance::Instance(ServiceContext* const serviceContext, + const TenantMigrationDonorService* donorService, const BSONObj& initialState) : repl::PrimaryOnlyService::TypedInstance(), _serviceContext(serviceContext), + _donorService(donorService), _stateDoc(tenant_migration_access_blocker::parseDonorStateDocument(initialState)), _instanceName(kServiceName + "-" + _stateDoc.getTenantId()), _recipientUri( @@ -355,7 
+395,7 @@ TenantMigrationDonorService::Instance::_fetchAndStoreRecipientClusterTimeKeyDocs const auto& data = dataStatus.getValue(); for (const BSONObj& doc : data.documents) { keyDocs.push_back(tenant_migration_util::makeExternalClusterTimeKeyDoc( - _serviceContext, _stateDoc.getId(), doc.getOwned())); + _stateDoc.getId(), doc.getOwned())); } fetchStatus = Status::OK(); @@ -549,6 +589,8 @@ TenantMigrationDonorService::Instance::_markStateDocAsGarbageCollectable( auto opCtxHolder = cc().makeOperationContext(); auto opCtx = opCtxHolder.get(); + pauseTenantMigrationDonorBeforeMarkingStateGarbageCollectable.pauseWhileSet(opCtx); + AutoGetCollection collection(opCtx, _stateDocumentsNS, MODE_IX); writeConflictRetry( @@ -722,6 +764,10 @@ SemiFuture TenantMigrationDonorService::Instance::run( }); }) .then([this, self = shared_from_this(), executor, recipientTargeterRS, serviceToken] { + if (_stateDoc.getState() > TenantMigrationDonorStateEnum::kAbortingIndexBuilds) { + return ExecutorFuture(**executor, Status::OK()); + } + checkIfReceivedDonorAbortMigration(serviceToken, _abortMigrationSource.token()); return _fetchAndStoreRecipientClusterTimeKeyDocs( @@ -944,6 +990,18 @@ SemiFuture TenantMigrationDonorService::Instance::run( return _sendRecipientForgetMigrationCommand( executor, recipientTargeterRS, serviceToken); }) + .then([this, self = shared_from_this(), executor, serviceToken] { + // Note marking the keys as garbage collectable is not atomic with marking the + // state document garbage collectable, so an interleaved failover can lead the + // keys to be deleted before the state document has an expiration date. This is + // acceptable because the decision to forget a migration is not reversible. 
+ return tenant_migration_util::markExternalKeysAsGarbageCollectable( + _serviceContext, + executor, + _donorService->getInstanceCleanupExecutor(), + _stateDoc.getId(), + serviceToken); + }) .then([this, self = shared_from_this(), executor, serviceToken] { return _markStateDocAsGarbageCollectable(executor, serviceToken); }) @@ -955,7 +1013,8 @@ SemiFuture TenantMigrationDonorService::Instance::run( LOGV2(4920400, "Marked migration state as garbage collectable", "migrationId"_attr = _stateDoc.getId(), - "expireAt"_attr = _stateDoc.getExpireAt()); + "expireAt"_attr = _stateDoc.getExpireAt(), + "status"_attr = status); stdx::lock_guard lg(_mutex); if (_completionPromise.getFuture().isReady()) { diff --git a/src/mongo/db/repl/tenant_migration_donor_service.h b/src/mongo/db/repl/tenant_migration_donor_service.h index df3d9072528..a2b9b8e3fc5 100644 --- a/src/mongo/db/repl/tenant_migration_donor_service.h +++ b/src/mongo/db/repl/tenant_migration_donor_service.h @@ -65,8 +65,8 @@ public: std::shared_ptr constructInstance( BSONObj initialState) const override { - return std::make_shared(_serviceContext, - initialState); + return std::make_shared( + _serviceContext, this, initialState); } class Instance final : public PrimaryOnlyService::TypedInstance { @@ -76,7 +76,9 @@ public: boost::optional abortReason; }; - explicit Instance(ServiceContext* const serviceContext, const BSONObj& initialState); + explicit Instance(ServiceContext* const serviceContext, + const TenantMigrationDonorService* donorService, + const BSONObj& initialState); ~Instance(); @@ -157,7 +159,7 @@ public: /** * Fetches all key documents from the recipient's admin.system.keys collection, stores - * them in admin.system.external_validation_keys, and refreshes the keys cache. + * them in config.external_validation_keys, and refreshes the keys cache. 
*/ ExecutorFuture _fetchAndStoreRecipientClusterTimeKeyDocs( std::shared_ptr executor, @@ -227,7 +229,8 @@ public: return recipientCmdThreadPoolLimits; } - ServiceContext* _serviceContext; + ServiceContext* const _serviceContext; + const TenantMigrationDonorService* const _donorService; TenantMigrationDonorDocument _stateDoc; const std::string _instanceName; @@ -270,6 +273,12 @@ public: }; private: + ExecutorFuture createStateDocumentTTLIndex( + std::shared_ptr executor, const CancelationToken& token); + + ExecutorFuture createExternalKeysTTLIndex( + std::shared_ptr executor, const CancelationToken& token); + ExecutorFuture _rebuildService(std::shared_ptr executor, const CancelationToken& token) override; diff --git a/src/mongo/db/repl/tenant_migration_recipient_service.cpp b/src/mongo/db/repl/tenant_migration_recipient_service.cpp index 86a01ddae99..75f23b29ba9 100644 --- a/src/mongo/db/repl/tenant_migration_recipient_service.cpp +++ b/src/mongo/db/repl/tenant_migration_recipient_service.cpp @@ -1533,8 +1533,8 @@ void TenantMigrationRecipientService::Instance::_fetchAndStoreDonorClusterTimeKe Query().readPref(_readPreference.pref, _readPreference.tags.getTagBSON())); while (cursor->more()) { const auto doc = cursor->nextSafe().getOwned(); - keyDocs.push_back(tenant_migration_util::makeExternalClusterTimeKeyDoc( - _serviceContext, _migrationUuid, doc)); + keyDocs.push_back( + tenant_migration_util::makeExternalClusterTimeKeyDoc(_migrationUuid, doc)); } tenant_migration_util::storeExternalClusterTimeKeyDocs(_scopedExecutor, std::move(keyDocs)); @@ -1947,6 +1947,18 @@ SemiFuture TenantMigrationRecipientService::Instance::run( // waiting for the recipientForgetMigration command. 
return _receivedRecipientForgetMigrationPromise.getFuture(); }) + .then([this, self = shared_from_this(), token] { + // Note marking the keys as garbage collectable is not atomic with marking the + // state document garbage collectable, so an interleaved failover can lead the + // keys to be deleted before the state document has an expiration date. This is + // acceptable because the decision to forget a migration is not reversible. + return tenant_migration_util::markExternalKeysAsGarbageCollectable( + _serviceContext, + _scopedExecutor, + _recipientService->getInstanceCleanupExecutor(), + _migrationUuid, + token); + }) .then([this, self = shared_from_this()] { return _markStateDocAsGarbageCollectable(); }) .then([this, self = shared_from_this()] { _stopOrHangOnFailPoint(&fpBeforeDroppingOplogBufferCollection); diff --git a/src/mongo/db/repl/tenant_migration_recipient_service.h b/src/mongo/db/repl/tenant_migration_recipient_service.h index e45d1f153dc..637c7560c52 100644 --- a/src/mongo/db/repl/tenant_migration_recipient_service.h +++ b/src/mongo/db/repl/tenant_migration_recipient_service.h @@ -341,7 +341,7 @@ public: /** * Fetches all key documents from the donor's admin.system.keys collection, stores them in - * admin.system.external_validation_keys, and refreshes the keys cache. + * config.external_validation_keys, and refreshes the keys cache. 
*/ void _fetchAndStoreDonorClusterTimeKeyDocs(const CancelationToken& token); diff --git a/src/mongo/db/repl/tenant_migration_util.cpp b/src/mongo/db/repl/tenant_migration_util.cpp index 60a34587942..23022bec383 100644 --- a/src/mongo/db/repl/tenant_migration_util.cpp +++ b/src/mongo/db/repl/tenant_migration_util.cpp @@ -32,8 +32,11 @@ #include "mongo/bson/json.h" #include "mongo/db/concurrency/write_conflict_exception.h" #include "mongo/db/db_raii.h" +#include "mongo/db/dbdirectclient.h" #include "mongo/db/dbhelpers.h" #include "mongo/db/logical_time_validator.h" +#include "mongo/db/ops/update.h" +#include "mongo/db/ops/update_request.h" #include "mongo/db/pipeline/document_source_add_fields.h" #include "mongo/db/pipeline/document_source_graph_lookup.h" #include "mongo/db/pipeline/document_source_lookup.h" @@ -49,17 +52,15 @@ namespace mongo { namespace tenant_migration_util { -ExternalKeysCollectionDocument makeExternalClusterTimeKeyDoc(ServiceContext* serviceContext, - UUID migrationId, - BSONObj keyDoc) { +MONGO_FAIL_POINT_DEFINE(pauseTenantMigrationBeforeMarkingExternalKeysGarbageCollectable); + +const Backoff kExponentialBackoff(Seconds(1), Milliseconds::max()); + +ExternalKeysCollectionDocument makeExternalClusterTimeKeyDoc(UUID migrationId, BSONObj keyDoc) { auto originalKeyDoc = KeysCollectionDocument::parse(IDLParserErrorContext("keyDoc"), keyDoc); ExternalKeysCollectionDocument externalKeyDoc( - OID::gen(), - originalKeyDoc.getKeyId(), - migrationId, - serviceContext->getFastClockSource()->now() + - Seconds{repl::tenantMigrationExternalKeysRemovalDelaySecs.load()}); + OID::gen(), originalKeyDoc.getKeyId(), migrationId); externalKeyDoc.setKeysCollectionDocumentBase(originalKeyDoc.getKeysCollectionDocumentBase()); return externalKeyDoc; @@ -345,6 +346,63 @@ createRetryableWritesOplogFetchingPipelineForTenantMigrations( return Pipeline::create(std::move(stages), expCtx); } +bool shouldStopUpdatingExternalKeys(Status status, const CancelationToken& token) { 
+ return status.isOK() || token.isCanceled(); +} + +ExecutorFuture markExternalKeysAsGarbageCollectable( + ServiceContext* serviceContext, + std::shared_ptr executor, + std::shared_ptr parentExecutor, + UUID migrationId, + const CancelationToken& token) { + auto ttlExpiresAt = serviceContext->getFastClockSource()->now() + + Milliseconds{repl::tenantMigrationGarbageCollectionDelayMS.load()} + + Seconds{repl::tenantMigrationExternalKeysRemovalBufferSecs.load()}; + return AsyncTry([executor, migrationId, ttlExpiresAt] { + return ExecutorFuture(**executor).then([migrationId, ttlExpiresAt] { + auto opCtxHolder = cc().makeOperationContext(); + auto opCtx = opCtxHolder.get(); + + pauseTenantMigrationBeforeMarkingExternalKeysGarbageCollectable.pauseWhileSet( + opCtx); + + const auto& nss = NamespaceString::kExternalKeysCollectionNamespace; + AutoGetCollection coll(opCtx, nss, MODE_IX); + + writeConflictRetry( + opCtx, "TenantMigrationMarkExternalKeysAsGarbageCollectable", nss.ns(), [&] { + auto request = UpdateRequest(); + request.setNamespaceString(nss); + request.setQuery( + BSON(ExternalKeysCollectionDocument::kMigrationIdFieldName + << migrationId)); + request.setUpdateModification( + write_ops::UpdateModification::parseFromClassicUpdate(BSON( + "$set" + << BSON(ExternalKeysCollectionDocument::kTTLExpiresAtFieldName + << ttlExpiresAt)))); + request.setMulti(true); + + // Note marking keys garbage collectable is not atomic with marking the + // state document garbage collectable, so after a failover this update + // may fail to match any keys if they were previously marked garbage + // collectable and deleted by the TTL monitor. Because of this we can't + // assert on the update result's numMatched or numDocsModified. 
+ update(opCtx, coll.getDb(), request); + }); + }); + }) + .until([token](Status status) { return shouldStopUpdatingExternalKeys(status, token); }) + .withBackoffBetweenIterations(kExponentialBackoff) + // Due to the issue in SERVER-54735, using AsyncTry with a scoped executor can lead to a + // BrokenPromise error if the executor is shut down. To work around this, schedule the + // AsyncTry itself on an executor that won't shut down. + // + // TODO SERVER-54735: Stop using the parent executor here. + .on(parentExecutor, CancelationToken::uncancelable()); +} + } // namespace tenant_migration_util } // namespace mongo diff --git a/src/mongo/db/repl/tenant_migration_util.h b/src/mongo/db/repl/tenant_migration_util.h index a06ac2ef1f7..3d56479964b 100644 --- a/src/mongo/db/repl/tenant_migration_util.h +++ b/src/mongo/db/repl/tenant_migration_util.h @@ -130,21 +130,30 @@ inline Status validatePrivateKeyPEMPayload(const StringData& payload) { } /* - * Creates an ExternalKeysCollectionDocument representing an admin.system.external_validation_keys + * Creates an ExternalKeysCollectionDocument representing a config.external_validation_keys * document from the given the admin.system.keys document BSONObj. */ -ExternalKeysCollectionDocument makeExternalClusterTimeKeyDoc(ServiceContext* serviceContext, - UUID migrationId, - BSONObj keyDoc); +ExternalKeysCollectionDocument makeExternalClusterTimeKeyDoc(UUID migrationId, BSONObj keyDoc); /* * For each given ExternalKeysCollectionDocument, inserts it if there is not an existing document in - * admin.system.external_validation_keys for it with the same keyId and replicaSetName. Otherwise, + * config.external_validation_keys for it with the same keyId and replicaSetName. Otherwise, * updates the ttlExpiresAt of the existing document if it is less than the new ttlExpiresAt. 
*/ void storeExternalClusterTimeKeyDocs(std::shared_ptr executor, std::vector keyDocs); +/** + * Sets the "ttlExpiresAt" field for the external keys so they can be garbage collected by the TTL + * monitor. + */ +ExecutorFuture markExternalKeysAsGarbageCollectable( + ServiceContext* serviceContext, + std::shared_ptr executor, + std::shared_ptr parentExecutor, + UUID migrationId, + const CancelationToken& token); + /** * Creates a view on the oplog that allows a tenant migration recipient to fetch retryable writes * and transactions from a tenant migration donor. -- cgit v1.2.1