diff options
author | Kaloian Manassiev <kaloian.manassiev@mongodb.com> | 2018-05-16 11:46:25 -0400 |
---|---|---|
committer | Kaloian Manassiev <kaloian.manassiev@mongodb.com> | 2019-07-03 05:56:59 -0400 |
commit | 059b8a9dc777e3940caa26f2a909d4988b605645 (patch) | |
tree | 709ccc99b8094354489a55f4a19bf6e88cb65a92 | |
parent | de8b171fbd730be5f82d0c4b6a3c378000167246 (diff) | |
download | mongo-059b8a9dc777e3940caa26f2a909d4988b605645.tar.gz |
SERVER-41869 On term mismatch do not invoke the getChunksSince callback under mutex
This change also backports the following cleanups:
- Use alias for the callback of CatalogCacheLoader::getChunksSince
- Use StringMap in CollectionShardingState instead of std::unordered_map
(cherry picked from commit cb0393248d26e21e69efde15d9d3965293ead29b)
-rw-r--r-- | src/mongo/db/s/catalog_cache_loader_mock.cpp | 5 | ||||
-rw-r--r-- | src/mongo/db/s/catalog_cache_loader_mock.h | 3 | ||||
-rw-r--r-- | src/mongo/db/s/collection_sharding_state.cpp | 1 | ||||
-rw-r--r-- | src/mongo/db/s/read_only_catalog_cache_loader.cpp | 4 | ||||
-rw-r--r-- | src/mongo/db/s/read_only_catalog_cache_loader.h | 3 | ||||
-rw-r--r-- | src/mongo/db/s/shard_server_catalog_cache_loader.cpp | 65 | ||||
-rw-r--r-- | src/mongo/db/s/shard_server_catalog_cache_loader.h | 31 | ||||
-rw-r--r-- | src/mongo/s/catalog_cache_loader.h | 9 | ||||
-rw-r--r-- | src/mongo/s/config_server_catalog_cache_loader.cpp | 7 | ||||
-rw-r--r-- | src/mongo/s/config_server_catalog_cache_loader.h | 3 |
10 files changed, 52 insertions, 79 deletions
diff --git a/src/mongo/db/s/catalog_cache_loader_mock.cpp b/src/mongo/db/s/catalog_cache_loader_mock.cpp index 8b6e06723d5..d798d1b137a 100644 --- a/src/mongo/db/s/catalog_cache_loader_mock.cpp +++ b/src/mongo/db/s/catalog_cache_loader_mock.cpp @@ -97,10 +97,7 @@ void CatalogCacheLoaderMock::waitForDatabaseFlush(OperationContext* opCtx, Strin } std::shared_ptr<Notification<void>> CatalogCacheLoaderMock::getChunksSince( - const NamespaceString& nss, - ChunkVersion version, - stdx::function<void(OperationContext*, StatusWith<CollectionAndChangedChunks>)> callbackFn) { - + const NamespaceString& nss, ChunkVersion version, GetChunksSinceCallbackFn callbackFn) { auto notify = std::make_shared<Notification<void>>(); uassertStatusOK(_threadPool.schedule([ this, notify, callbackFn ]() noexcept { diff --git a/src/mongo/db/s/catalog_cache_loader_mock.h b/src/mongo/db/s/catalog_cache_loader_mock.h index f6b7b5e2115..76a0fc17644 100644 --- a/src/mongo/db/s/catalog_cache_loader_mock.h +++ b/src/mongo/db/s/catalog_cache_loader_mock.h @@ -59,8 +59,7 @@ public: std::shared_ptr<Notification<void>> getChunksSince( const NamespaceString& nss, ChunkVersion version, - stdx::function<void(OperationContext*, StatusWith<CollectionAndChangedChunks>)> callbackFn) - override; + GetChunksSinceCallbackFn callbackFn) override; void getDatabase( StringData dbName, diff --git a/src/mongo/db/s/collection_sharding_state.cpp b/src/mongo/db/s/collection_sharding_state.cpp index 85cd60486d3..7ba81f81709 100644 --- a/src/mongo/db/s/collection_sharding_state.cpp +++ b/src/mongo/db/s/collection_sharding_state.cpp @@ -38,6 +38,7 @@ #include "mongo/db/s/sharded_connection_info.h" #include "mongo/s/stale_exception.h" #include "mongo/util/log.h" +#include "mongo/util/string_map.h" namespace mongo { namespace { diff --git a/src/mongo/db/s/read_only_catalog_cache_loader.cpp b/src/mongo/db/s/read_only_catalog_cache_loader.cpp index e730c055694..f7b0996bc7a 100644 --- 
a/src/mongo/db/s/read_only_catalog_cache_loader.cpp +++ b/src/mongo/db/s/read_only_catalog_cache_loader.cpp @@ -46,9 +46,7 @@ void ReadOnlyCatalogCacheLoader::waitForDatabaseFlush(OperationContext* opCtx, S } std::shared_ptr<Notification<void>> ReadOnlyCatalogCacheLoader::getChunksSince( - const NamespaceString& nss, - ChunkVersion version, - stdx::function<void(OperationContext*, StatusWith<CollectionAndChangedChunks>)> callbackFn) { + const NamespaceString& nss, ChunkVersion version, GetChunksSinceCallbackFn callbackFn) { return _configServerLoader.getChunksSince(nss, version, callbackFn); } diff --git a/src/mongo/db/s/read_only_catalog_cache_loader.h b/src/mongo/db/s/read_only_catalog_cache_loader.h index 49bff910bda..43c8aaf1037 100644 --- a/src/mongo/db/s/read_only_catalog_cache_loader.h +++ b/src/mongo/db/s/read_only_catalog_cache_loader.h @@ -51,8 +51,7 @@ public: std::shared_ptr<Notification<void>> getChunksSince( const NamespaceString& nss, ChunkVersion version, - stdx::function<void(OperationContext*, StatusWith<CollectionAndChangedChunks>)> callbackFn) - override; + GetChunksSinceCallbackFn callbackFn) override; void getDatabase( StringData dbName, diff --git a/src/mongo/db/s/shard_server_catalog_cache_loader.cpp b/src/mongo/db/s/shard_server_catalog_cache_loader.cpp index 880ae109ddf..24f42034580 100644 --- a/src/mongo/db/s/shard_server_catalog_cache_loader.cpp +++ b/src/mongo/db/s/shard_server_catalog_cache_loader.cpp @@ -360,7 +360,7 @@ void ShardServerCatalogCacheLoader::notifyOfCollectionVersionUpdate(const Namesp } void ShardServerCatalogCacheLoader::initializeReplicaSetRole(bool isPrimary) { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<stdx::mutex> lg(_mutex); invariant(_role == ReplicaSetRole::None); if (isPrimary) { @@ -371,7 +371,7 @@ void ShardServerCatalogCacheLoader::initializeReplicaSetRole(bool isPrimary) { } void ShardServerCatalogCacheLoader::onStepDown() { - stdx::lock_guard<stdx::mutex> lock(_mutex); + 
stdx::lock_guard<stdx::mutex> lg(_mutex); invariant(_role != ReplicaSetRole::None); _contexts.interrupt(ErrorCodes::PrimarySteppedDown); ++_term; @@ -379,58 +379,47 @@ void ShardServerCatalogCacheLoader::onStepDown() { } void ShardServerCatalogCacheLoader::onStepUp() { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<stdx::mutex> lg(_mutex); invariant(_role != ReplicaSetRole::None); ++_term; _role = ReplicaSetRole::Primary; } std::shared_ptr<Notification<void>> ShardServerCatalogCacheLoader::getChunksSince( - const NamespaceString& nss, - ChunkVersion version, - stdx::function<void(OperationContext*, StatusWith<CollectionAndChangedChunks>)> callbackFn) { - long long currentTerm; + const NamespaceString& nss, ChunkVersion version, GetChunksSinceCallbackFn callbackFn) { + auto notify = std::make_shared<Notification<void>>(); + bool isPrimary; - { - // Take the mutex so that we can discern whether we're primary or secondary and schedule a - // task with the corresponding _term value. + long long term; + std::tie(isPrimary, term) = [&] { stdx::lock_guard<stdx::mutex> lock(_mutex); - invariant(_role != ReplicaSetRole::None); - - currentTerm = _term; - isPrimary = (_role == ReplicaSetRole::Primary); - } - - auto notify = std::make_shared<Notification<void>>(); + return std::make_tuple(_role == ReplicaSetRole::Primary, _term); + }(); uassertStatusOK(_threadPool.schedule( - [ this, nss, version, callbackFn, notify, isPrimary, currentTerm ]() noexcept { + [ this, nss, version, callbackFn, notify, isPrimary, term ]() noexcept { auto context = _contexts.makeOperationContext(*Client::getCurrent()); + auto const opCtx = context.opCtx(); - { - stdx::lock_guard<stdx::mutex> lock(_mutex); - // We may have missed an OperationContextGroup interrupt since this operation began - // but before the OperationContext was added to the group. So we'll check that - // we're still in the same _term. 
- if (_term != currentTerm) { - callbackFn(context.opCtx(), - Status{ErrorCodes::Interrupted, - "Unable to refresh routing table because replica set state " - "changed or node is shutting down."}); - notify->set(); - return; + try { + { + // We may have missed an OperationContextGroup interrupt since this operation + // began but before the OperationContext was added to the group. So we'll check + // that we're still in the same _term. + stdx::lock_guard<stdx::mutex> lock(_mutex); + uassert(ErrorCodes::Interrupted, + "Unable to refresh routing table because replica set state changed or " + "the node is shutting down.", + _term == term); } - } - try { if (isPrimary) { - _schedulePrimaryGetChunksSince( - context.opCtx(), nss, version, currentTerm, callbackFn, notify); + _schedulePrimaryGetChunksSince(opCtx, nss, version, term, callbackFn, notify); } else { - _runSecondaryGetChunksSince(context.opCtx(), nss, version, callbackFn); + _runSecondaryGetChunksSince(opCtx, nss, version, callbackFn, notify); } } catch (const DBException& ex) { - callbackFn(context.opCtx(), ex.toStatus()); + callbackFn(opCtx, ex.toStatus()); notify->set(); } })); @@ -603,13 +592,15 @@ void ShardServerCatalogCacheLoader::_runSecondaryGetChunksSince( OperationContext* opCtx, const NamespaceString& nss, const ChunkVersion& catalogCacheSinceVersion, - stdx::function<void(OperationContext*, StatusWith<CollectionAndChangedChunks>)> callbackFn) { + stdx::function<void(OperationContext*, StatusWith<CollectionAndChangedChunks>)> callbackFn, + std::shared_ptr<Notification<void>> notify) { forcePrimaryCollectionRefreshAndWaitForReplication(opCtx, nss); // Read the local metadata. 
auto swCollAndChunks = _getCompletePersistedMetadataForSecondarySinceVersion(opCtx, nss, catalogCacheSinceVersion); callbackFn(opCtx, std::move(swCollAndChunks)); + notify->set(); } void ShardServerCatalogCacheLoader::_schedulePrimaryGetChunksSince( diff --git a/src/mongo/db/s/shard_server_catalog_cache_loader.h b/src/mongo/db/s/shard_server_catalog_cache_loader.h index ff246c0666e..045e9e0c1aa 100644 --- a/src/mongo/db/s/shard_server_catalog_cache_loader.h +++ b/src/mongo/db/s/shard_server_catalog_cache_loader.h @@ -75,21 +75,10 @@ public: */ void notifyOfCollectionVersionUpdate(const NamespaceString& nss) override; - /** - * This must be called serially, never in parallel, including waiting for the returned - * Notification to be signalled. - * - * This function is robust to unexpected version requests from the CatalogCache. Requesting - * versions with epoches that do not match anything on the config server will not affect or - * clear the locally persisted metadata. Requesting versions higher than anything previous - * requested, or versions lower than already requested, will not mess up the locally persisted - * metadata, and will return what was requested if it exists. 
- */ std::shared_ptr<Notification<void>> getChunksSince( const NamespaceString& nss, ChunkVersion version, - stdx::function<void(OperationContext*, StatusWith<CollectionAndChangedChunks>)> callbackFn) - override; + GetChunksSinceCallbackFn callbackFn) override; void getDatabase( StringData dbName, @@ -351,7 +340,8 @@ private: OperationContext* opCtx, const NamespaceString& nss, const ChunkVersion& catalogCacheSinceVersion, - stdx::function<void(OperationContext*, StatusWith<CollectionAndChangedChunks>)> callbackFn); + stdx::function<void(OperationContext*, StatusWith<CollectionAndChangedChunks>)> callbackFn, + std::shared_ptr<Notification<void>> notify); /** * Refreshes chunk metadata from the config server's metadata store, and schedules maintenance @@ -479,21 +469,19 @@ private: CollectionAndChangedChunks _getCompletePersistedMetadataForSecondarySinceVersion( OperationContext* opCtx, const NamespaceString& nss, const ChunkVersion& version); - // Used by the shard primary to retrieve chunk metadata from the config server. + // Loader used by the shard primary to retrieve the authoritative routing metadata from the + // config server const std::unique_ptr<CatalogCacheLoader> _configServerLoader; - // Thread pool used to load chunk metadata. + // Thread pool used to run blocking tasks which perform disk reads and writes ThreadPool _threadPool; + // Registry of notifications for changes happening to the shard's on-disk routing information NamespaceMetadataChangeNotifications _namespaceNotifications; - // Protects the class state below. + // Protects the class state below stdx::mutex _mutex; - // Map to track in progress persisted cache updates on the shard primary. - CollAndChunkTaskLists _collAndChunkTaskLists; - DbTaskLists _dbTaskLists; - // This value is bumped every time the set of currently scheduled tasks should no longer be // running. This includes, replica set state transitions and shutdown. 
long long _term{0}; @@ -504,6 +492,9 @@ private: // The collection of operation contexts in use by all threads. OperationContextGroup _contexts; + + CollAndChunkTaskLists _collAndChunkTaskLists; + DbTaskLists _dbTaskLists; }; } // namespace mongo diff --git a/src/mongo/s/catalog_cache_loader.h b/src/mongo/s/catalog_cache_loader.h index 94d8ea5a93e..ef292552df1 100644 --- a/src/mongo/s/catalog_cache_loader.h +++ b/src/mongo/s/catalog_cache_loader.h @@ -68,7 +68,6 @@ public: static CatalogCacheLoader& get(ServiceContext* serviceContext); static CatalogCacheLoader& get(OperationContext* opCtx); - /** * Used as a return value for getChunksSince. */ @@ -93,6 +92,9 @@ public: std::vector<ChunkType> changedChunks; }; + using GetChunksSinceCallbackFn = + stdx::function<void(OperationContext*, StatusWith<CollectionAndChangedChunks>)>; + /** * Initializes internal state. Must be called only once when sharding state is initialized. */ @@ -126,10 +128,7 @@ public: * Notification object can be waited on in order to ensure that. */ virtual std::shared_ptr<Notification<void>> getChunksSince( - const NamespaceString& nss, - ChunkVersion version, - stdx::function<void(OperationContext*, StatusWith<CollectionAndChangedChunks>)> - callbackFn) = 0; + const NamespaceString& nss, ChunkVersion version, GetChunksSinceCallbackFn callbackFn) = 0; /** * Non-blocking call, which requests the most recent db version for the given dbName from the diff --git a/src/mongo/s/config_server_catalog_cache_loader.cpp b/src/mongo/s/config_server_catalog_cache_loader.cpp index 4de200959e9..97b5ee9ca9d 100644 --- a/src/mongo/s/config_server_catalog_cache_loader.cpp +++ b/src/mongo/s/config_server_catalog_cache_loader.cpp @@ -27,7 +27,9 @@ * exception statement from all source files in the program, then also delete * it in the license file. 
*/ + #define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding + #include "mongo/platform/basic.h" #include "mongo/s/config_server_catalog_cache_loader.h" @@ -172,10 +174,7 @@ void ConfigServerCatalogCacheLoader::waitForDatabaseFlush(OperationContext* opCt } std::shared_ptr<Notification<void>> ConfigServerCatalogCacheLoader::getChunksSince( - const NamespaceString& nss, - ChunkVersion version, - stdx::function<void(OperationContext*, StatusWith<CollectionAndChangedChunks>)> callbackFn) { - + const NamespaceString& nss, ChunkVersion version, GetChunksSinceCallbackFn callbackFn) { auto notify = std::make_shared<Notification<void>>(); uassertStatusOK(_threadPool.schedule([ nss, version, notify, callbackFn ]() noexcept { diff --git a/src/mongo/s/config_server_catalog_cache_loader.h b/src/mongo/s/config_server_catalog_cache_loader.h index 50d771d1681..fd7d104d80a 100644 --- a/src/mongo/s/config_server_catalog_cache_loader.h +++ b/src/mongo/s/config_server_catalog_cache_loader.h @@ -53,8 +53,7 @@ public: std::shared_ptr<Notification<void>> getChunksSince( const NamespaceString& nss, ChunkVersion version, - stdx::function<void(OperationContext*, StatusWith<CollectionAndChangedChunks>)> callbackFn) - override; + GetChunksSinceCallbackFn callbackFn) override; void getDatabase( StringData dbName, |