diff options
author | Dianna Hohensee <dianna.hohensee@10gen.com> | 2017-08-08 18:11:34 -0400 |
---|---|---|
committer | Dianna Hohensee <dianna.hohensee@10gen.com> | 2017-08-10 22:27:49 -0400 |
commit | 2198d955d83787d12a439369895e9868288a0604 (patch) | |
tree | 1941c9cb556c30fb5bf01acc275886b9300e5c08 /src/mongo/db | |
parent | 00f3af94a4d5125b48c57ee75d494712b6737c35 (diff) | |
download | mongo-2198d955d83787d12a439369895e9868288a0604.tar.gz |
SERVER-30437 Stop the shard chunk loader from scheduling persistence tasks with mixed version chunks
Diffstat (limited to 'src/mongo/db')
-rw-r--r-- | src/mongo/db/s/shard_metadata_util.cpp | 12 | ||||
-rw-r--r-- | src/mongo/db/s/shard_metadata_util_test.cpp | 16 | ||||
-rw-r--r-- | src/mongo/db/s/shard_server_catalog_cache_loader.cpp | 132 |
3 files changed, 56 insertions, 104 deletions
diff --git a/src/mongo/db/s/shard_metadata_util.cpp b/src/mongo/db/s/shard_metadata_util.cpp index c0627e2c48f..9e076435a4c 100644 --- a/src/mongo/db/s/shard_metadata_util.cpp +++ b/src/mongo/db/s/shard_metadata_util.cpp @@ -290,17 +290,7 @@ Status updateShardChunks(OperationContext* opCtx, * */ for (auto& chunk : chunks) { - // Check for a different epoch. - if (!chunk.getVersion().hasEqualEpoch(currEpoch)) { - return Status{ErrorCodes::ConflictingOperationInProgress, - str::stream() << "Invalid chunks found when reloading '" - << nss.toString() - << "'. Previous collection epoch was '" - << currEpoch.toString() - << "', but unexpectedly found a new epoch '" - << chunk.getVersion().epoch().toString() - << "'. Collection was dropped and recreated."}; - } + invariant(chunk.getVersion().hasEqualEpoch(currEpoch)); // Delete any overlapping chunk ranges. Overlapping chunks will have a min value // ("_id") between (chunk.min, chunk.max]. diff --git a/src/mongo/db/s/shard_metadata_util_test.cpp b/src/mongo/db/s/shard_metadata_util_test.cpp index d242684034d..fa30bda71af 100644 --- a/src/mongo/db/s/shard_metadata_util_test.cpp +++ b/src/mongo/db/s/shard_metadata_util_test.cpp @@ -273,22 +273,6 @@ TEST_F(ShardMetadataUtilTest, WriteAndReadChunks) { ASSERT_BSONOBJ_EQ(chunks.back().toShardBSON(), readChunks.front().toShardBSON()); } -TEST_F(ShardMetadataUtilTest, UpdatingChunksFindsNewEpoch) { - std::vector<ChunkType> chunks = makeFourChunks(); - ASSERT_OK(updateShardChunks(operationContext(), kNss, chunks, getCollectionVersion().epoch())); - checkChunks(kChunkMetadataNss, chunks); - - ChunkVersion originalChunkVersion = chunks.back().getVersion(); - chunks.back().setVersion(ChunkVersion(1, 0, OID::gen())); - ASSERT_EQUALS( - updateShardChunks(operationContext(), kNss, chunks, getCollectionVersion().epoch()).code(), - ErrorCodes::ConflictingOperationInProgress); - - // Check that the chunk with a different epoch did not get written. - chunks.back().setVersion(std::move(originalChunkVersion)); - checkChunks(kChunkMetadataNss, chunks); -} - TEST_F(ShardMetadataUtilTest, UpdateWithWriteNewChunks) { // Load some chunk metadata. diff --git a/src/mongo/db/s/shard_server_catalog_cache_loader.cpp b/src/mongo/db/s/shard_server_catalog_cache_loader.cpp index e935418432a..4ac73533bde 100644 --- a/src/mongo/db/s/shard_server_catalog_cache_loader.cpp +++ b/src/mongo/db/s/shard_server_catalog_cache_loader.cpp @@ -443,29 +443,12 @@ void ShardServerCatalogCacheLoader::_schedulePrimaryGetChunksSince( return getPersistedMaxVersion(opCtx, nss); }(); - auto remoteRefreshCallbackFn = [this, - nss, - catalogCacheSinceVersion, - maxLoaderVersion, - termScheduled, - callbackFn, - notify]( - OperationContext* opCtx, - StatusWith<CollectionAndChangedChunks> swCollectionAndChangedChunks) { - - if (!swCollectionAndChangedChunks.isOK() && - swCollectionAndChangedChunks != ErrorCodes::NamespaceNotFound) { - // No updates to apply. Do nothing. - } else { - // Enqueue a Task to apply the update retrieved from the config server, if new data was - // retrieved. - if (!swCollectionAndChangedChunks.isOK() || - (swCollectionAndChangedChunks.getValue() - .changedChunks.back() - .getVersion() - .epoch() != maxLoaderVersion.epoch()) || - (swCollectionAndChangedChunks.getValue().changedChunks.back().getVersion() > - maxLoaderVersion)) { + auto remoteRefreshCallbackFn = + [this, nss, catalogCacheSinceVersion, maxLoaderVersion, termScheduled, callbackFn, notify]( + OperationContext* opCtx, + StatusWith<CollectionAndChangedChunks> swCollectionAndChangedChunks) { + + if (swCollectionAndChangedChunks == ErrorCodes::NamespaceNotFound) { Status scheduleStatus = _scheduleTask( nss, Task{swCollectionAndChangedChunks, maxLoaderVersion, termScheduled}); if (!scheduleStatus.isOK()) { @@ -473,36 +456,60 @@ void ShardServerCatalogCacheLoader::_schedulePrimaryGetChunksSince( notify->set(); return; } - } - if (swCollectionAndChangedChunks.isOK()) { - log() << "Cache loader remotely refreshed for collection " << nss - << " from collection version " << maxLoaderVersion - << " and found collection version " - << swCollectionAndChangedChunks.getValue().changedChunks.back().getVersion(); - - // Metadata was found remotely -- otherwise would have received - // NamespaceNotFound rather than Status::OK(). Return metadata for CatalogCache - // that's GTE catalogCacheSinceVersion, from the loader's persisted and enqueued - // metadata. - - swCollectionAndChangedChunks = - _getLoaderMetadata(opCtx, nss, catalogCacheSinceVersion, termScheduled); - if (swCollectionAndChangedChunks.isOK()) { - // After finding metadata remotely, we must have found metadata locally. - invariant(!swCollectionAndChangedChunks.getValue().changedChunks.empty()); - } - } else { // NamespaceNotFound - invariant(swCollectionAndChangedChunks == ErrorCodes::NamespaceNotFound); log() << "Cache loader remotely refreshed for collection " << nss << " from version " << maxLoaderVersion << " and no metadata was found."; + } else if (swCollectionAndChangedChunks.isOK()) { + auto& collAndChunks = swCollectionAndChangedChunks.getValue(); + + if (collAndChunks.changedChunks.back().getVersion().epoch() != + collAndChunks.epoch) { + swCollectionAndChangedChunks = Status{ + ErrorCodes::ConflictingOperationInProgress, + str::stream() + << "Invalid chunks found when reloading '" + << nss.toString() + << "' Previous collection epoch was '" + << collAndChunks.epoch.toString() + << "', but found a new epoch '" + << collAndChunks.changedChunks.back().getVersion().epoch().toString() + << "'. Collection was dropped and recreated."}; + } else { + if ((collAndChunks.epoch != maxLoaderVersion.epoch()) || + (collAndChunks.changedChunks.back().getVersion() > maxLoaderVersion)) { + Status scheduleStatus = _scheduleTask( + nss, + Task{swCollectionAndChangedChunks, maxLoaderVersion, termScheduled}); + if (!scheduleStatus.isOK()) { + callbackFn(opCtx, scheduleStatus); + notify->set(); + return; + } + } + + log() << "Cache loader remotely refreshed for collection " << nss + << " from collection version " << maxLoaderVersion + << " and found collection version " + << collAndChunks.changedChunks.back().getVersion(); + + // Metadata was found remotely -- otherwise would have received + // NamespaceNotFound rather than Status::OK(). Return metadata for CatalogCache + // that's GTE catalogCacheSinceVersion, from the loader's persisted and enqueued + // metadata. + + swCollectionAndChangedChunks = + _getLoaderMetadata(opCtx, nss, catalogCacheSinceVersion, termScheduled); + if (swCollectionAndChangedChunks.isOK()) { + // After finding metadata remotely, we must have found metadata locally. + invariant(!collAndChunks.changedChunks.empty()); + } + } } - } - // Complete the callbackFn work. - callbackFn(opCtx, std::move(swCollectionAndChangedChunks)); - notify->set(); - }; + // Complete the callbackFn work. + callbackFn(opCtx, std::move(swCollectionAndChangedChunks)); + notify->set(); + }; // Refresh the loader's metadata from the config server. The caller's request will // then be serviced from the loader's up-to-date metadata. @@ -712,32 +719,8 @@ void ShardServerCatalogCacheLoader::_updatePersistedMetadata(OperationContext* o return; } - ChunkVersion persistedMaxVersion = getPersistedMaxVersion(opCtx, nss); - - // If the epoch of the update task does not match the persisted metadata, the persisted metadata - // -- from an old collection that was recreated -- must be cleared before applying the changes. - if (persistedMaxVersion.isSet() && - persistedMaxVersion.epoch() != task.maxQueryVersion.epoch()) { - Status status = dropChunksAndDeleteCollectionsEntry(opCtx, nss); - uassert(status.code(), - str::stream() << "Failed to clear persisted chunk metadata for collection '" - << nss.ns() - << "' due to '" - << status.reason() - << "'. Will be retried.", - status.isOK()); - } - Status status = persistCollectionAndChangedChunks(opCtx, nss, task.collectionAndChangedChunks.get()); - if (status == ErrorCodes::ConflictingOperationInProgress) { - // A new epoch was discovered while updating the persisted metadata. The getChunksSince - // which enqueued this task would have discovered that independently and also returned - // ConflictingOperationInProgress to the catalog cache, which means that the next enqueued - // task should have the new epoch, which in turn means that on the next invocation, the - // old collection entry will be dropped and recreated. - return; - } uassert(status.code(), str::stream() << "Failed to update the persisted chunk metadata for collection '" @@ -884,11 +867,6 @@ CollectionAndChangedChunks ShardServerCatalogCacheLoader::TaskList::getEnqueuedM collAndChunks = task.collectionAndChangedChunks.get(); } else { // Epochs match, so the new results should be appended. - // - // Note: it's okay if the new chunks change to a new version epoch in the middle of - // the chunks vector. This will be either reset by the next task with a total reload - // with a new epoch, or cause the original getChunksSince caller to throw out the - // results and refresh again. // Make sure we do not append a duplicate chunk. The diff query is GTE, so there can // be duplicates of the same exact versioned chunk across tasks. This is no problem |