diff options
author | jannaerin <golden.janna@gmail.com> | 2018-04-03 13:30:30 -0400 |
---|---|---|
committer | jannaerin <golden.janna@gmail.com> | 2018-04-06 17:16:55 -0400 |
commit | 47e4b6b791cce13a36ea499a9311d54f100412ee (patch) | |
tree | 0e120305baab1a4e90863020ccaf3182b0c9f8d4 /src | |
parent | 1ed0056534da6c9a7995f243bea508c9aca48776 (diff) | |
download | mongo-47e4b6b791cce13a36ea499a9311d54f100412ee.tar.gz |
SERVER-32608 Make secondaries refresh their database metadata from the persisted database cache
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/db/s/shard_server_catalog_cache_loader.cpp | 55 | ||||
-rw-r--r-- | src/mongo/db/s/shard_server_catalog_cache_loader.h | 14 |
2 files changed, 64 insertions, 5 deletions
diff --git a/src/mongo/db/s/shard_server_catalog_cache_loader.cpp b/src/mongo/db/s/shard_server_catalog_cache_loader.cpp index d469bf78829..170a3c39c53 100644 --- a/src/mongo/db/s/shard_server_catalog_cache_loader.cpp +++ b/src/mongo/db/s/shard_server_catalog_cache_loader.cpp @@ -243,6 +243,17 @@ CollectionAndChangedChunks getPersistedMetadataSinceVersion(OperationContext* op std::move(changedChunks)}; } +DatabaseType getPersistedDbMetadata(OperationContext* opCtx, StringData dbName) { + ShardDatabaseType shardDatabaseEntry = uassertStatusOK(readShardDatabasesEntry(opCtx, dbName)); + + DatabaseType dbt(shardDatabaseEntry.getDbName(), + shardDatabaseEntry.getPrimary(), + shardDatabaseEntry.getPartitioned(), + shardDatabaseEntry.getDbVersion()); + + return dbt; +} + /** * Attempts to read the collection and chunk metadata. May not read a complete diff if the metadata * for the collection is being updated concurrently. This is safe if those updates are appended. @@ -286,7 +297,8 @@ StatusWith<CollectionAndChangedChunks> getIncompletePersistedMetadataSinceVersio * Sends _flushRoutingTableCacheUpdates to the primary to force it to refresh its routing table for * collection 'nss' and then waits for the refresh to replicate to this node. */ -void forcePrimaryRefreshAndWaitForReplication(OperationContext* opCtx, const NamespaceString& nss) { +void forcePrimaryCollectionRefreshAndWaitForReplication(OperationContext* opCtx, + const NamespaceString& nss) { auto const shardingState = ShardingState::get(opCtx); invariant(shardingState->enabled()); @@ -308,6 +320,31 @@ void forcePrimaryRefreshAndWaitForReplication(OperationContext* opCtx, const Nam } /** + * Sends _flushDatabaseCacheUpdates to the primary to force it to refresh its routing table for + * database 'dbName' and then waits for the refresh to replicate to this node. + */ +void forcePrimaryDatabaseRefreshAndWaitForReplication(OperationContext* opCtx, StringData dbName) { + auto const shardingState = ShardingState::get(opCtx); + invariant(shardingState->enabled()); + + auto selfShard = uassertStatusOK( + Grid::get(opCtx)->shardRegistry()->getShard(opCtx, shardingState->getShardName())); + + auto cmdResponse = uassertStatusOK(selfShard->runCommandWithFixedRetryAttempts( + opCtx, + ReadPreferenceSetting{ReadPreference::PrimaryOnly}, + "admin", + BSON("_flushDatabaseCacheUpdates" << dbName.toString()), + Seconds{30}, + Shard::RetryPolicy::kIdempotent)); + + uassertStatusOK(cmdResponse.commandStatus); + + uassertStatusOK(repl::ReplicationCoordinator::get(opCtx)->waitUntilOpTimeForRead( + opCtx, {LogicalTime::fromOperationTime(cmdResponse.response), boost::none})); +} + +/** * Reads the local chunk metadata to obtain the current ChunkVersion. If there is no local * metadata for the namespace, returns ChunkVersion::UNSHARDED(), since only metadata for sharded * collections is persisted. @@ -463,6 +500,8 @@ void ShardServerCatalogCacheLoader::getDatabase( if (isPrimary) { _schedulePrimaryGetDatabase( context.opCtx(), StringData(name), currentTerm, callbackFn); + } else { + _runSecondaryGetDatabase(context.opCtx(), StringData(name), callbackFn); } } catch (const DBException& ex) { callbackFn(context.opCtx(), ex.toStatus()); @@ -577,7 +616,7 @@ void ShardServerCatalogCacheLoader::_runSecondaryGetChunksSince( const NamespaceString& nss, const ChunkVersion& catalogCacheSinceVersion, stdx::function<void(OperationContext*, StatusWith<CollectionAndChangedChunks>)> callbackFn) { - forcePrimaryRefreshAndWaitForReplication(opCtx, nss); + forcePrimaryCollectionRefreshAndWaitForReplication(opCtx, nss); // Read the local metadata. auto swCollAndChunks = @@ -690,6 +729,17 @@ void ShardServerCatalogCacheLoader::_schedulePrimaryGetChunksSince( _configServerLoader->getChunksSince(nss, maxLoaderVersion, remoteRefreshCallbackFn); } +void ShardServerCatalogCacheLoader::_runSecondaryGetDatabase( + OperationContext* opCtx, + StringData dbName, + stdx::function<void(OperationContext*, StatusWith<DatabaseType>)> callbackFn) { + forcePrimaryDatabaseRefreshAndWaitForReplication(opCtx, dbName); + + // Read the local metadata. + auto swDatabaseType = getPersistedDbMetadata(opCtx, dbName); + callbackFn(opCtx, std::move(swDatabaseType)); +} + void ShardServerCatalogCacheLoader::_schedulePrimaryGetDatabase( OperationContext* opCtx, StringData dbName, @@ -1187,7 +1237,6 @@ void ShardServerCatalogCacheLoader::CollAndChunkTaskList::addTask(collAndChunkTa void ShardServerCatalogCacheLoader::DbTaskList::addTask(dbTask task) { if (_tasks.empty()) { _tasks.emplace_back(std::move(task)); - return; } diff --git a/src/mongo/db/s/shard_server_catalog_cache_loader.h b/src/mongo/db/s/shard_server_catalog_cache_loader.h index 87bd3dea31c..1732176140c 100644 --- a/src/mongo/db/s/shard_server_catalog_cache_loader.h +++ b/src/mongo/db/s/shard_server_catalog_cache_loader.h @@ -155,8 +155,7 @@ private: /** * This represents an update task for the persisted database metadata. The task will either be - * to - * persist an update to the shard persisted metadata store or to drop the persisted + * to persist an update to the shard persisted metadata store or to drop the persisted * metadata for a specific database. */ struct dbTask { @@ -401,6 +400,17 @@ private: std::shared_ptr<Notification<void>> notify); /** + * Forces the primary to refresh its metadata for 'dbName' and waits until this node's metadata + * has caught up to the primary's. + * Then retrieves the db version from this node's persisted metadata store and passes it to + * 'callbackFn'. + */ + void _runSecondaryGetDatabase( + OperationContext* opCtx, + StringData dbName, + stdx::function<void(OperationContext*, StatusWith<DatabaseType>)> callbackFn); + + /** * Refreshes db version from the config server's metadata store, and schedules maintenance * of the shard's persisted metadata store with the latest updates retrieved from the config * server. |