diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/db/s/collection_sharding_state.h | 4 | ||||
-rw-r--r-- | src/mongo/db/s/migration_source_manager.cpp | 14 | ||||
-rw-r--r-- | src/mongo/db/s/move_chunk_command.cpp | 55 | ||||
-rw-r--r-- | src/mongo/db/s/shard_server_catalog_cache_loader.cpp | 22 | ||||
-rw-r--r-- | src/mongo/db/s/shard_server_catalog_cache_loader.h | 14 | ||||
-rw-r--r-- | src/mongo/s/catalog_cache.cpp | 6 | ||||
-rw-r--r-- | src/mongo/s/catalog_cache.h | 18 | ||||
-rw-r--r-- | src/mongo/s/catalog_cache_loader.h | 7 | ||||
-rw-r--r-- | src/mongo/s/config_server_catalog_cache_loader.cpp | 6 | ||||
-rw-r--r-- | src/mongo/s/config_server_catalog_cache_loader.h | 3 |
10 files changed, 123 insertions, 26 deletions
diff --git a/src/mongo/db/s/collection_sharding_state.h b/src/mongo/db/s/collection_sharding_state.h index 033cde19623..979a638599e 100644 --- a/src/mongo/db/s/collection_sharding_state.h +++ b/src/mongo/db/s/collection_sharding_state.h @@ -142,8 +142,8 @@ public: * is added to permit (most) dependent queries on secondaries to complete, too. * * Call result.waitStatus(opCtx) to wait for the deletion to complete or fail. If that succeeds, - * call waitForClean to ensure no other deletions are pending for the range. Call - * result.abandon(), instead, to ignore the outcome. + * waitForClean can be called to ensure no other deletions are pending for the range. Call + * result.abandon(), instead of waitStatus, to ignore the outcome. */ enum CleanWhen { kNow, kDelayed }; auto cleanUpRange(ChunkRange const& range, CleanWhen) -> CleanupNotification; diff --git a/src/mongo/db/s/migration_source_manager.cpp b/src/mongo/db/s/migration_source_manager.cpp index 75c77e016cc..bbbd4f4fabe 100644 --- a/src/mongo/db/s/migration_source_manager.cpp +++ b/src/mongo/db/s/migration_source_manager.cpp @@ -403,20 +403,6 @@ Status MigrationSourceManager::commitChunkMetadataOnConfig(OperationContext* opC // Migration succeeded log() << "Migration succeeded and updated collection version to " << refreshedMetadata->getCollVersion(); - - // Schedule clearing out orphaned documents when they are no longer in active use. - const auto orphans = ChunkRange(_args.getMinKey(), _args.getMaxKey()); - auto const now = CollectionShardingState::kNow, later = CollectionShardingState::kDelayed; - auto whenToClean = _args.getWaitForDelete() ? now : later; - - auto notification = css->cleanUpRange(orphans, whenToClean); - if (notification.ready() && !notification.waitStatus(opCtx).isOK()) { - // if it fails immediately, report that and continue. 
- warning() << "Failed to initiate cleanup of " << getNss().ns() << " orphan range " - << redact(orphans.toString()) << ": " - << redact(notification.waitStatus(opCtx).reason()); - } - notification.abandon(); } else { AutoGetCollection autoColl(opCtx, getNss(), MODE_IX, MODE_X); diff --git a/src/mongo/db/s/move_chunk_command.cpp b/src/mongo/db/s/move_chunk_command.cpp index 556e1e32853..01a89dbb476 100644 --- a/src/mongo/db/s/move_chunk_command.cpp +++ b/src/mongo/db/s/move_chunk_command.cpp @@ -44,6 +44,7 @@ #include "mongo/db/s/migration_source_manager.h" #include "mongo/db/s/move_timing_helper.h" #include "mongo/db/s/sharding_state.h" +#include "mongo/s/catalog_cache.h" #include "mongo/s/client/shard_registry.h" #include "mongo/s/grid.h" #include "mongo/s/migration_secondary_throttle_options.h" @@ -229,21 +230,57 @@ private: MONGO_FAIL_POINT_PAUSE_WHILE_SET(moveChunkHangAtStep6); auto nss = moveChunkRequest.getNss(); - auto range = ChunkRange(moveChunkRequest.getMinKey(), moveChunkRequest.getMaxKey()); - auto const now = CollectionShardingState::kNow, later = CollectionShardingState::kDelayed; - auto whenToClean = moveChunkRequest.getWaitForDelete() ? now : later; - if (whenToClean == now) { + const auto range = ChunkRange(moveChunkRequest.getMinKey(), moveChunkRequest.getMaxKey()); + + // Wait for the metadata update to be persisted before scheduling the range deletion. + // + // This is necessary to prevent a race on the secondary because both metadata persistence + // and range deletion are done asynchronously and we must prevent the data deletion from + // being propagated before the metadata update. 
+ ScopedCollectionMetadata metadata = [&]() { + AutoGetCollection autoColl(opCtx, nss, MODE_IS); + return CollectionShardingState::get(opCtx, nss)->getMetadata(); + }(); + uassert(ErrorCodes::NamespaceNotSharded, + str::stream() << "Chunk move failed because collection '" << nss.ns() + << "' is no longer sharded.", + metadata); + uassertStatusOK(Grid::get(opCtx)->catalogCache()->waitForCollectionVersion( + opCtx, nss, metadata->getCollVersion())); + + // Now schedule the range deletion clean up. + CollectionShardingState::CleanupNotification notification; + { + AutoGetCollection autoColl(opCtx, nss, MODE_IS); + + auto const now = CollectionShardingState::kNow, + later = CollectionShardingState::kDelayed; + auto whenToClean = moveChunkRequest.getWaitForDelete() ? now : later; + notification = + CollectionShardingState::get(opCtx, nss)->cleanUpRange(range, whenToClean); + + // Check for immediate failure on scheduling range deletion. + if (notification.ready() && !notification.waitStatus(opCtx).isOK()) { + warning() << "Failed to initiate cleanup of " << nss.ns() << " range " + << redact(range.toString()) + << " due to: " << redact(notification.waitStatus(opCtx)); + } + notification.abandon(); + } + + if (moveChunkRequest.getWaitForDelete()) { log() << "Waiting for cleanup of " << nss.ns() << " range " << redact(range.toString()); - CollectionShardingState::waitForClean( - opCtx, moveChunkRequest.getNss(), moveChunkRequest.getVersionEpoch(), range) - .transitional_ignore(); - // Ensure that wait for write concern for the chunk cleanup will include - // the deletes performed by the range deleter thread. + uassertStatusOK(CollectionShardingState::waitForClean( + opCtx, moveChunkRequest.getNss(), moveChunkRequest.getVersionEpoch(), range)); + + // Ensure that wait for write concern for the chunk cleanup will include the deletes + // performed by the range deleter thread. 
repl::ReplClientInfo::forClient(opCtx->getClient()).setLastOpToSystemLastOpTime(opCtx); } else { log() << "Leaving cleanup of " << nss.ns() << " range " << redact(range.toString()) << " to complete in background"; } + moveTimingHelper.done(7); MONGO_FAIL_POINT_PAUSE_WHILE_SET(moveChunkHangAtStep7); } diff --git a/src/mongo/db/s/shard_server_catalog_cache_loader.cpp b/src/mongo/db/s/shard_server_catalog_cache_loader.cpp index fe2d9eca260..2a6b956a17b 100644 --- a/src/mongo/db/s/shard_server_catalog_cache_loader.cpp +++ b/src/mongo/db/s/shard_server_catalog_cache_loader.cpp @@ -242,6 +242,28 @@ void ShardServerCatalogCacheLoader::notifyOfCollectionVersionUpdate(OperationCon _namespaceNotifications.notifyChange(nss); } +Status ShardServerCatalogCacheLoader::waitForCollectionVersion(OperationContext* opCtx, + const NamespaceString& nss, + const ChunkVersion& version) { + invariant(!opCtx->lockState()->isLocked()); + while (true) { + auto scopedNotification = _namespaceNotifications.createNotification(nss); + + auto swRefreshState = getPersistedRefreshFlags(opCtx, nss); + if (!swRefreshState.isOK()) { + return swRefreshState.getStatus(); + } + RefreshState refreshState = swRefreshState.getValue(); + + if (refreshState.lastRefreshedCollectionVersion.epoch() != version.epoch() || + refreshState.lastRefreshedCollectionVersion >= version) { + return Status::OK(); + } + + scopedNotification.get(opCtx); + } +} + ShardServerCatalogCacheLoader::ShardServerCatalogCacheLoader( std::unique_ptr<CatalogCacheLoader> configLoader) : _configServerLoader(std::move(configLoader)), _threadPool(makeDefaultThreadPoolOptions()) { diff --git a/src/mongo/db/s/shard_server_catalog_cache_loader.h b/src/mongo/db/s/shard_server_catalog_cache_loader.h index 48e0e2baf1e..9a978a5f87e 100644 --- a/src/mongo/db/s/shard_server_catalog_cache_loader.h +++ b/src/mongo/db/s/shard_server_catalog_cache_loader.h @@ -71,7 +71,19 @@ public: */ void notifyOfCollectionVersionUpdate(OperationContext* opCtx, const 
NamespaceString& nss, - const ChunkVersion& version); + const ChunkVersion& version) override; + + /** + * This function can throw a DBException if the opCtx is interrupted. A lock must not be held + * when calling this because it would prevent using the latest snapshot and actually seeing the + * change after it arrives. + * + * See CatalogCache::waitForCollectionVersion for function details: it's a passthrough function + * to give external access to this function, and so it is the interface. + */ + Status waitForCollectionVersion(OperationContext* opCtx, + const NamespaceString& nss, + const ChunkVersion& version) override; /** * This must be called serially, never in parallel, including waiting for the returned diff --git a/src/mongo/s/catalog_cache.cpp b/src/mongo/s/catalog_cache.cpp index 255ed96df69..9677b8bc58f 100644 --- a/src/mongo/s/catalog_cache.cpp +++ b/src/mongo/s/catalog_cache.cpp @@ -178,6 +178,12 @@ void CatalogCache::notifyOfCollectionVersionUpdate(OperationContext* opCtx, _cacheLoader->notifyOfCollectionVersionUpdate(opCtx, nss, version); } +Status CatalogCache::waitForCollectionVersion(OperationContext* opCtx, + const NamespaceString& nss, + const ChunkVersion& version) { + return _cacheLoader->waitForCollectionVersion(opCtx, nss, version); +} + StatusWith<CachedDatabaseInfo> CatalogCache::getDatabase(OperationContext* opCtx, StringData dbName) { try { diff --git a/src/mongo/s/catalog_cache.h b/src/mongo/s/catalog_cache.h index a978642c825..9b261de3a43 100644 --- a/src/mongo/s/catalog_cache.h +++ b/src/mongo/s/catalog_cache.h @@ -98,6 +98,24 @@ public: const ChunkVersion& version); /** + * Waits for the persisted collection version to be greater than or equal to 'version', or for an epoch change. Only + * call this function if you KNOW that a version greater than or equal to 'version' WILL eventually be persisted. + * + * This function cannot wait for a version if nothing is persisted because a collection can + * become unsharded after we start waiting and 'version' will then never be reached. 
If 'nss' + * has no persisted metadata, even if it will shortly, a NamespaceNotFound error will be + * returned. + * + * A lock must not be held when calling this because it would prevent using the latest snapshot + * and actually seeing the change after it arrives. + * This function can throw a DBException if the opCtx is interrupted. + * This can only be called on a shard! + */ + Status waitForCollectionVersion(OperationContext* opCtx, + const NamespaceString& nss, + const ChunkVersion& version); + + /** * Retrieves the cached metadata for the specified database. The returned value is still owned * by the cache and should not be kept elsewhere. I.e., it should only be used as a local * variable. The reason for this is so that if the cache gets invalidated, the caller does not diff --git a/src/mongo/s/catalog_cache_loader.h b/src/mongo/s/catalog_cache_loader.h index 7405d225d5b..3257cb8fbc5 100644 --- a/src/mongo/s/catalog_cache_loader.h +++ b/src/mongo/s/catalog_cache_loader.h @@ -96,6 +96,13 @@ public: const ChunkVersion& version) = 0; /** + * Waits for the persisted collection version to be greater than or equal to 'version', or for an epoch change. + */ + virtual Status waitForCollectionVersion(OperationContext* opCtx, + const NamespaceString& nss, + const ChunkVersion& version) = 0; + + /** * Non-blocking call, which requests the chunks changed since the specified version to be * fetched from the persistent metadata store and invokes the callback function with the result. * The callback function must never throw - it is a fatal error to do so. 
diff --git a/src/mongo/s/config_server_catalog_cache_loader.cpp b/src/mongo/s/config_server_catalog_cache_loader.cpp index 79cea4d8211..e0ca6968b91 100644 --- a/src/mongo/s/config_server_catalog_cache_loader.cpp +++ b/src/mongo/s/config_server_catalog_cache_loader.cpp @@ -159,6 +159,12 @@ void ConfigServerCatalogCacheLoader::notifyOfCollectionVersionUpdate(OperationCo MONGO_UNREACHABLE; } +Status ConfigServerCatalogCacheLoader::waitForCollectionVersion(OperationContext* opCtx, + const NamespaceString& nss, + const ChunkVersion& version) { + MONGO_UNREACHABLE; +} + std::shared_ptr<Notification<void>> ConfigServerCatalogCacheLoader::getChunksSince( const NamespaceString& nss, ChunkVersion version, diff --git a/src/mongo/s/config_server_catalog_cache_loader.h b/src/mongo/s/config_server_catalog_cache_loader.h index 9f7512f30bb..1a2451628da 100644 --- a/src/mongo/s/config_server_catalog_cache_loader.h +++ b/src/mongo/s/config_server_catalog_cache_loader.h @@ -47,6 +47,9 @@ public: void notifyOfCollectionVersionUpdate(OperationContext* opCtx, const NamespaceString& nss, const ChunkVersion& version) override; + Status waitForCollectionVersion(OperationContext* opCtx, + const NamespaceString& nss, + const ChunkVersion& version) override; std::shared_ptr<Notification<void>> getChunksSince( const NamespaceString& nss, |