diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/db/s/collection_sharding_state.cpp | 95 | ||||
-rw-r--r-- | src/mongo/db/s/shard_metadata_util.cpp | 15 | ||||
-rw-r--r-- | src/mongo/db/s/shard_metadata_util.h | 11 | ||||
-rw-r--r-- | src/mongo/db/s/shard_server_catalog_cache_loader.cpp | 191 | ||||
-rw-r--r-- | src/mongo/db/s/shard_server_catalog_cache_loader.h | 25 |
5 files changed, 200 insertions, 137 deletions
diff --git a/src/mongo/db/s/collection_sharding_state.cpp b/src/mongo/db/s/collection_sharding_state.cpp index 348d76e6886..27d00b58b91 100644 --- a/src/mongo/db/s/collection_sharding_state.cpp +++ b/src/mongo/db/s/collection_sharding_state.cpp @@ -108,18 +108,6 @@ private: const ChunkVersion _updatedVersion; }; -/** - * Checks via the ReplicationCoordinator whether this server is a primary/standalone that can do - * writes. This function may return false if the server is primary but in drain mode. - * - * Note: expects the global lock to be held so that a meaningful answer is returned -- replica set - * state cannot change under a lock. - */ -bool isPrimary(OperationContext* opCtx, const NamespaceString& nss) { - // If the server can execute writes, then it is either a primary or standalone. - return repl::ReplicationCoordinator::get(opCtx)->canAcceptWritesFor(opCtx, nss); -} - } // unnamed namespace CollectionShardingState::CollectionShardingState(ServiceContext* sc, NamespaceString nss) @@ -412,43 +400,37 @@ void CollectionShardingState::_onConfigRefreshCompleteInvalidateCachedMetadataAn dassert(opCtx->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_IX)); invariant(serverGlobalParams.clusterRole == ClusterRole::ShardServer); - if (!isPrimary(opCtx, _nss)) { - // Extract which collection entry is being updated - std::string refreshCollection; - fassertStatusOK( - 40477, - bsonExtractStringField(query, ShardCollectionType::uuid.name(), &refreshCollection)); - - // Parse the '$set' update, which will contain the 'lastRefreshedCollectionVersion' if it is - // present. - BSONElement updateElement; - fassertStatusOK(40478, - bsonExtractTypedField(update, StringData("$set"), Object, &updateElement)); - BSONObj setField = updateElement.Obj(); - - // If 'lastRefreshedCollectionVersion' is present, then a refresh completed and the catalog - // cache must be invalidated and the catalog cache loader notified of the new version. 
- if (setField.hasField(ShardCollectionType::lastRefreshedCollectionVersion.name())) { - // Get the version epoch from the 'updatedDoc', since it didn't get updated and won't be - // in 'update'. - BSONElement oidElem; - fassert(40513, - bsonExtractTypedField( - updatedDoc, ShardCollectionType::epoch(), BSONType::jstOID, &oidElem)); - - // Get the new collection version. - auto statusWithLastRefreshedChunkVersion = - ChunkVersion::parseFromBSONWithFieldAndSetEpoch( - updatedDoc, - ShardCollectionType::lastRefreshedCollectionVersion(), - oidElem.OID()); - fassert(40514, statusWithLastRefreshedChunkVersion.isOK()); - - opCtx->recoveryUnit()->registerChange( - new CollectionVersionLogOpHandler(opCtx, - NamespaceString(refreshCollection), - statusWithLastRefreshedChunkVersion.getValue())); - } + // Extract which collection entry is being updated + std::string refreshCollection; + fassertStatusOK( + 40477, bsonExtractStringField(query, ShardCollectionType::uuid.name(), &refreshCollection)); + + // Parse the '$set' update, which will contain the 'lastRefreshedCollectionVersion' if it is + // present. + BSONElement updateElement; + fassertStatusOK(40478, + bsonExtractTypedField(update, StringData("$set"), Object, &updateElement)); + BSONObj setField = updateElement.Obj(); + + // If 'lastRefreshedCollectionVersion' is present, then a refresh completed and the catalog + // cache must be invalidated and the catalog cache loader notified of the new version. + if (setField.hasField(ShardCollectionType::lastRefreshedCollectionVersion.name())) { + // Get the version epoch from the 'updatedDoc', since it didn't get updated and won't be + // in 'update'. + BSONElement oidElem; + fassert(40513, + bsonExtractTypedField( + updatedDoc, ShardCollectionType::epoch(), BSONType::jstOID, &oidElem)); + + // Get the new collection version. 
+ auto statusWithLastRefreshedChunkVersion = ChunkVersion::parseFromBSONWithFieldAndSetEpoch( + updatedDoc, ShardCollectionType::lastRefreshedCollectionVersion(), oidElem.OID()); + fassert(40514, statusWithLastRefreshedChunkVersion.isOK()); + + opCtx->recoveryUnit()->registerChange( + new CollectionVersionLogOpHandler(opCtx, + NamespaceString(refreshCollection), + statusWithLastRefreshedChunkVersion.getValue())); } } @@ -457,16 +439,13 @@ void CollectionShardingState::_onConfigDeleteInvalidateCachedMetadataAndNotify( dassert(opCtx->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_IX)); invariant(serverGlobalParams.clusterRole == ClusterRole::ShardServer); - if (!isPrimary(opCtx, _nss)) { - // Extract which collection entry is being deleted from the _id field. - std::string deletedCollection; - fassertStatusOK( - 40479, - bsonExtractStringField(query, ShardCollectionType::uuid.name(), &deletedCollection)); + // Extract which collection entry is being deleted from the _id field. + std::string deletedCollection; + fassertStatusOK( + 40479, bsonExtractStringField(query, ShardCollectionType::uuid.name(), &deletedCollection)); - opCtx->recoveryUnit()->registerChange(new CollectionVersionLogOpHandler( - opCtx, NamespaceString(deletedCollection), ChunkVersion::UNSHARDED())); - } + opCtx->recoveryUnit()->registerChange(new CollectionVersionLogOpHandler( + opCtx, NamespaceString(deletedCollection), ChunkVersion::UNSHARDED())); } bool CollectionShardingState::_checkShardVersionOk(OperationContext* opCtx, diff --git a/src/mongo/db/s/shard_metadata_util.cpp b/src/mongo/db/s/shard_metadata_util.cpp index 937528403e7..cb614accc31 100644 --- a/src/mongo/db/s/shard_metadata_util.cpp +++ b/src/mongo/db/s/shard_metadata_util.cpp @@ -72,11 +72,18 @@ QueryAndSort createShardChunkDiffQuery(const ChunkVersion& collectionVersion) { BSON(ChunkType::lastmod() << 1)}; } -bool RefreshState::operator==(RefreshState& other) { +bool RefreshState::operator==(RefreshState& other) const { 
return (other.epoch == epoch) && (other.refreshing == refreshing) && (other.lastRefreshedCollectionVersion == lastRefreshedCollectionVersion); } +std::string RefreshState::toString() const { + return str::stream() << "epoch: " << epoch + << ", refreshing: " << (refreshing ? "true" : "false") + << ", lastRefreshedCollectionVersion: " + << lastRefreshedCollectionVersion.toString(); +} + Status setPersistedRefreshFlags(OperationContext* opCtx, const NamespaceString& nss) { // Set 'refreshing' to true. BSONObj update = BSON(ShardCollectionType::refreshing() << true); @@ -119,7 +126,11 @@ StatusWith<RefreshState> getPersistedRefreshFlags(OperationContext* opCtx, } return RefreshState{entry.getEpoch(), - entry.hasRefreshing() ? entry.getRefreshing() : false, + // If the refreshing field has not yet been added, this means that the first + // refresh has started, but no chunks have ever yet been applied, around + // which these flags are set. So default to refreshing true because the + // chunk metadata is being updated and is not yet ready to be read. + entry.hasRefreshing() ? entry.getRefreshing() : true, entry.hasLastRefreshedCollectionVersion() ? entry.getLastRefreshedCollectionVersion() : ChunkVersion(0, 0, entry.getEpoch())}; diff --git a/src/mongo/db/s/shard_metadata_util.h b/src/mongo/db/s/shard_metadata_util.h index 6a742f8d75a..6a1b8cb5cd6 100644 --- a/src/mongo/db/s/shard_metadata_util.h +++ b/src/mongo/db/s/shard_metadata_util.h @@ -60,13 +60,20 @@ struct QueryAndSort { }; /** - * Subset of the shard's collections collection related to refresh state. + * Subset of the shard's collections collection document that relates to refresh state. */ struct RefreshState { - bool operator==(RefreshState& other); + bool operator==(RefreshState& other) const; + std::string toString() const; + // The current epoch of the collection metadata. OID epoch; + + // Whether a refresh is currently in progress. 
bool refreshing; + + // The collection version after the last complete refresh. Indicates change if refreshing has + // started and finished since last loaded. ChunkVersion lastRefreshedCollectionVersion; }; diff --git a/src/mongo/db/s/shard_server_catalog_cache_loader.cpp b/src/mongo/db/s/shard_server_catalog_cache_loader.cpp index edc41a3c66d..52385c7cc8d 100644 --- a/src/mongo/db/s/shard_server_catalog_cache_loader.cpp +++ b/src/mongo/db/s/shard_server_catalog_cache_loader.cpp @@ -113,11 +113,15 @@ Status persistCollectionAndChangedChunks(OperationContext* opCtx, } /** + * This function will throw on error! + * * Retrieves the persisted max chunk version for 'nss', if there are any persisted chunks. If there * are none -- meaning there's no persisted metadata for 'nss' --, returns a * ChunkVersion::UNSHARDED() version. * - * It is unsafe to call this when a task for 'nss' is running concurrently. + * It is unsafe to call this when a task for 'nss' is running concurrently because the collection + * could be dropped and recreated between reading the collection epoch and retrieving the chunk, + * which would make the returned ChunkVersion corrupt. */ ChunkVersion getPersistedMaxVersion(OperationContext* opCtx, const NamespaceString& nss) { // Must read the collections entry to get the epoch to pass into ChunkType for shard's chunk @@ -155,28 +159,22 @@ ChunkVersion getPersistedMaxVersion(OperationContext* opCtx, const NamespaceStri } /** - * Tries to find persisted chunk metadata with chunk versions GTE to 'version'. Should always - * return metadata if the collection exists. + * This function will throw on error! + * + * Tries to find persisted chunk metadata with chunk versions GTE to 'version'. + * * * If 'version's epoch matches persisted metadata, returns persisted metadata GTE 'version'. * If 'version's epoch doesn't match persisted metadata, returns all persisted metadata. 
- * If there is no persisted metadata, returns an empty CollectionAndChangedChunks object. + * If collections entry does not exist, throws NamespaceNotFound error. Can return an empty + * chunks vector in CollectionAndChangedChunks without erroring, if collections entry IS found. */ -StatusWith<CollectionAndChangedChunks> getPersistedMetadataSinceVersion(OperationContext* opCtx, - const NamespaceString& nss, - ChunkVersion version) { - auto swShardCollectionEntry = readShardCollectionsEntry(opCtx, nss); - if (swShardCollectionEntry == ErrorCodes::NamespaceNotFound) { - // If there is no metadata, collection does not exist. Return empty results. - return CollectionAndChangedChunks(); - } else if (!swShardCollectionEntry.isOK()) { - return StatusWith<CollectionAndChangedChunks>( - ErrorCodes::OperationFailed, - str::stream() << "Failed to load local collection metadata due to '" - << swShardCollectionEntry.getStatus().toString() - << "'."); - } - auto shardCollectionEntry = std::move(swShardCollectionEntry.getValue()); +CollectionAndChangedChunks getPersistedMetadataSinceVersion(OperationContext* opCtx, + const NamespaceString& nss, + ChunkVersion version, + const bool okToReadWhileRefreshing) { + ShardCollectionType shardCollectionEntry = + uassertStatusOK(readShardCollectionsEntry(opCtx, nss)); // If the persisted epoch doesn't match what the CatalogCache requested, read everything. 
ChunkVersion startingVersion = (shardCollectionEntry.getEpoch() == version.epoch()) @@ -185,44 +183,14 @@ StatusWith<CollectionAndChangedChunks> getPersistedMetadataSinceVersion(Operatio QueryAndSort diff = createShardChunkDiffQuery(startingVersion); - auto swChangedChunks = - readShardChunks(opCtx, nss, diff.query, diff.sort, boost::none, startingVersion.epoch()); - if (!swChangedChunks.isOK()) { - return StatusWith<CollectionAndChangedChunks>( - ErrorCodes::OperationFailed, - str::stream() << "Failed to load local collection metadata due to '" - << swChangedChunks.getStatus().toString() - << "'."); - } else if (swChangedChunks.getValue().empty()) { - // No chunks were found, collection was dropped. Return empty results. - return CollectionAndChangedChunks(); - } - - // Make sure the collections entry epoch has not changed. Otherwise an epoch changing update was - // applied after we originally read the entry and the chunks may not match the original epoch. - - auto swAfterShardCollectionEntry = readShardCollectionsEntry(opCtx, nss); - if (swAfterShardCollectionEntry == ErrorCodes::NamespaceNotFound) { - // The collection has been dropped since we began loading, return empty results. - return CollectionAndChangedChunks(); - } else if (!swAfterShardCollectionEntry.isOK()) { - return StatusWith<CollectionAndChangedChunks>( - ErrorCodes::OperationFailed, - str::stream() << "Failed to reload local collection metadata due to '" - << swAfterShardCollectionEntry.getStatus().toString() - << "'."); - } - - if (shardCollectionEntry.getEpoch() != swAfterShardCollectionEntry.getValue().getEpoch()) { - // The collection was dropped and recreated since we began. Return empty results. 
- return CollectionAndChangedChunks(); - } + auto changedChunks = uassertStatusOK( + readShardChunks(opCtx, nss, diff.query, diff.sort, boost::none, startingVersion.epoch())); return CollectionAndChangedChunks{shardCollectionEntry.getEpoch(), shardCollectionEntry.getKeyPattern().toBSON(), shardCollectionEntry.getDefaultCollation(), shardCollectionEntry.getUnique(), - std::move(swChangedChunks.getValue())}; + std::move(changedChunks)}; } } // namespace @@ -282,7 +250,6 @@ void ShardServerCatalogCacheLoader::waitForVersion(OperationContext* opCtx, } if (refreshState.lastRefreshedCollectionVersion <= version) { - opCtx->waitForConditionOrInterrupt(*(notification.condVar()), lock, [&]() -> bool { return notification.version().epoch() != version.epoch() || notification.version() >= version; @@ -344,27 +311,37 @@ std::shared_ptr<Notification<void>> ShardServerCatalogCacheLoader::getChunksSinc isPrimary = (_role == ReplicaSetRole::Primary); } - // TODO: add and plug in secondary machinery - auto notify = std::make_shared<Notification<void>>(); - if (isPrimary) { - uassertStatusOK(_threadPool.schedule( - [ this, nss, version, currentTerm, callbackFn, notify ]() noexcept { - auto opCtx = Client::getCurrent()->makeOperationContext(); - try { + uassertStatusOK(_threadPool.schedule( + [ this, nss, version, callbackFn, notify, isPrimary, currentTerm ]() noexcept { + auto opCtx = Client::getCurrent()->makeOperationContext(); + try { + if (isPrimary) { _schedulePrimaryGetChunksSince( opCtx.get(), nss, version, currentTerm, callbackFn, notify); - } catch (const DBException& ex) { - callbackFn(opCtx.get(), ex.toStatus()); - notify->set(); + } else { + _runSecondaryGetChunksSince(opCtx.get(), nss, version, callbackFn); } - })); - } + } catch (const DBException& ex) { + callbackFn(opCtx.get(), ex.toStatus()); + notify->set(); + } + })); return notify; } +void ShardServerCatalogCacheLoader::_runSecondaryGetChunksSince( + OperationContext* opCtx, + const NamespaceString& nss, + const 
ChunkVersion& catalogCacheSinceVersion, + stdx::function<void(OperationContext*, StatusWith<CollectionAndChangedChunks>)> callbackFn) { + auto swCollAndChunks = + _getCompletePersistedMetadataForSecondarySinceVersion(opCtx, nss, catalogCacheSinceVersion); + callbackFn(opCtx, std::move(swCollAndChunks)); +} + void ShardServerCatalogCacheLoader::_schedulePrimaryGetChunksSince( OperationContext* opCtx, const NamespaceString& nss, @@ -404,15 +381,25 @@ void ShardServerCatalogCacheLoader::_schedulePrimaryGetChunksSince( swCollectionAndChangedChunks != ErrorCodes::NamespaceNotFound) { // No updates to apply. Do nothing. } else { - // Enqueue a Task to apply the update retrieved from the config server. - Status scheduleStatus = _scheduleTask( - nss, Task{swCollectionAndChangedChunks, maxLoaderVersion, termScheduled}); - if (!scheduleStatus.isOK()) { - callbackFn(opCtx, scheduleStatus); - notify->set(); - return; + // Enqueue a Task to apply the update retrieved from the config server, if new data was + // retrieved. 
+ if (!swCollectionAndChangedChunks.isOK() || + (swCollectionAndChangedChunks.getValue() + .changedChunks.back() + .getVersion() + .epoch() != maxLoaderVersion.epoch()) || + (swCollectionAndChangedChunks.getValue().changedChunks.back().getVersion() > + maxLoaderVersion)) { + Status scheduleStatus = _scheduleTask( + nss, Task{swCollectionAndChangedChunks, maxLoaderVersion, termScheduled}); + if (!scheduleStatus.isOK()) { + callbackFn(opCtx, scheduleStatus); + notify->set(); + return; + } } + if (swCollectionAndChangedChunks.isOK()) { log() << "Cache loader remotely refreshed for collection " << nss << " from collection version " << maxLoaderVersion @@ -460,11 +447,20 @@ StatusWith<CollectionAndChangedChunks> ShardServerCatalogCacheLoader::_getLoader bool tasksAreEnqueued = std::move(enqueuedRes.first); CollectionAndChangedChunks enqueued = std::move(enqueuedRes.second); - auto swPersisted = getPersistedMetadataSinceVersion(opCtx, nss, catalogCacheSinceVersion); - if (!swPersisted.isOK()) { + // TODO: a primary can load metadata while updates are being applied once we have indexes on the + // chunk collections that ensure new data is seen after query yields. This function keeps + // retrying until no updates are applied concurrently. Waiting on SERVER-27714 to add indexes. + auto swPersisted = + _getCompletePersistedMetadataForSecondarySinceVersion(opCtx, nss, catalogCacheSinceVersion); + CollectionAndChangedChunks persisted; + if (swPersisted == ErrorCodes::NamespaceNotFound) { + // No persisted metadata found, create an empty object. 
+ persisted = CollectionAndChangedChunks(); + } else if (!swPersisted.isOK()) { return swPersisted; + } else { + persisted = std::move(swPersisted.getValue()); } - CollectionAndChangedChunks persisted = std::move(swPersisted.getValue()); log() << "Cache loader found " << (enqueued.changedChunks.empty() @@ -683,6 +679,51 @@ bool ShardServerCatalogCacheLoader::_updatePersistedMetadata(OperationContext* o return true; } +StatusWith<CollectionAndChangedChunks> +ShardServerCatalogCacheLoader::_getCompletePersistedMetadataForSecondarySinceVersion( + OperationContext* opCtx, const NamespaceString& nss, const ChunkVersion& version) { + try { + // Keep trying to load the metadata until we get a complete view without updates being + // concurrently applied. + while (true) { + // Wait until a point when updates are not being applied to the 'nss' metadata. + RefreshState beginRefreshState = uassertStatusOK(getPersistedRefreshFlags(opCtx, nss)); + while (beginRefreshState.refreshing) { + waitForVersion(opCtx, nss, beginRefreshState.lastRefreshedCollectionVersion, true); + beginRefreshState = uassertStatusOK(getPersistedRefreshFlags(opCtx, nss)); + } + + // Load the metadata. + CollectionAndChangedChunks collAndChangedChunks = + getPersistedMetadataSinceVersion(opCtx, nss, version, true); + + // Check that no updates were concurrently applied while we were loading the metadata: + // this could cause the loaded metadata to provide an incomplete view of the chunk + // ranges. + RefreshState endRefreshState = uassertStatusOK(getPersistedRefreshFlags(opCtx, nss)); + if (beginRefreshState == endRefreshState) { + return collAndChangedChunks; + } + LOG(1) << "Cache loader read meatadata while updates were being applied: this" + << " metadata may be incomplete. Retrying. Refresh state before read: " + << beginRefreshState << ". 
Current refresh state: '" << endRefreshState << "'."; + } + } catch (const DBException& ex) { + Status status = ex.toStatus(); + + // NamespaceNotFound errors are expected and must be returned. + if (status == ErrorCodes::NamespaceNotFound) { + return status; + } + + // All other errors are unhandled. + return Status(ErrorCodes::OperationFailed, + str::stream() << "Failed to load local metadata due to '" + << ex.toStatus().toString() + << "'."); + } +} + ShardServerCatalogCacheLoader::Task::Task( StatusWith<CollectionAndChangedChunks> statusWithCollectionAndChangedChunks, ChunkVersion minimumQueryVersion, diff --git a/src/mongo/db/s/shard_server_catalog_cache_loader.h b/src/mongo/db/s/shard_server_catalog_cache_loader.h index b193ab6d422..43593811e42 100644 --- a/src/mongo/db/s/shard_server_catalog_cache_loader.h +++ b/src/mongo/db/s/shard_server_catalog_cache_loader.h @@ -218,6 +218,16 @@ private: typedef std::map<NamespaceString, TaskList> TaskLists; /** + * Retrieves chunk metadata from the shard's persisted metadata store, then passes the results + * to the 'callbackFn'. + */ + void _runSecondaryGetChunksSince( + OperationContext* opCtx, + const NamespaceString& nss, + const ChunkVersion& catalogCacheSinceVersion, + stdx::function<void(OperationContext*, StatusWith<CollectionAndChangedChunks>)> callbackFn); + + /** * Refreshes chunk metadata from the config server's metadata store, and schedules maintenance * of the shard's persisted metadata store with the latest updates retrieved from the config * server. @@ -292,6 +302,21 @@ private: */ bool _updatePersistedMetadata(OperationContext* opCtx, const NamespaceString& nss); + /** + * Attempt to read the collection and chunk metadata since version 'sinceVersion' from the shard + * persisted metadata store. + * + * Retries reading the metadata if the shard persisted metadata store becomes incomplete due to + * concurrent updates being applied -- complete here means that every chunk range is accounted + * for. 
+ * + * May return: a complete metadata update, which when applied to a complete metadata store up to + * 'sinceVersion' again produces a complete metadata store; or a NamespaceNotFound error, which + * means no metadata was found and the collection was dropped. + */ + StatusWith<CollectionAndChangedChunks> _getCompletePersistedMetadataForSecondarySinceVersion( + OperationContext* opCtx, const NamespaceString& nss, const ChunkVersion& version); + // Used by the shard primary to retrieve chunk metadata from the config server. const std::unique_ptr<CatalogCacheLoader> _configServerLoader; |