author     Dianna Hohensee <dianna.hohensee@10gen.com>    2017-08-08 18:11:34 -0400
committer  Dianna Hohensee <dianna.hohensee@10gen.com>    2017-08-10 22:27:49 -0400
commit     2198d955d83787d12a439369895e9868288a0604
tree       1941c9cb556c30fb5bf01acc275886b9300e5c08 /src/mongo/db
parent     00f3af94a4d5125b48c57ee75d494712b6737c35
download   mongo-2198d955d83787d12a439369895e9868288a0604.tar.gz
SERVER-30437 Stop the shard chunk loader from scheduling persistence tasks with mixed version chunks
Diffstat (limited to 'src/mongo/db')
-rw-r--r--  src/mongo/db/s/shard_metadata_util.cpp               |  12
-rw-r--r--  src/mongo/db/s/shard_metadata_util_test.cpp          |  16
-rw-r--r--  src/mongo/db/s/shard_server_catalog_cache_loader.cpp | 132
3 files changed, 56 insertions(+), 104 deletions(-)
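
In rough terms, the commit moves the mixed-epoch check out of updateShardChunks, where it returned ConflictingOperationInProgress at persistence time, and into the loader's remote refresh callback, so a persistence task is never enqueued with chunks from more than one epoch. A minimal standalone sketch of that up-front check follows; Epoch, ChunkVersion and Chunk are simplified stand-ins, not MongoDB's actual OID/ChunkVersion/ChunkType classes.

    // Minimal sketch of the epoch check this commit hoists into the loader's
    // remote refresh callback. All types are simplified stand-ins.
    #include <cassert>
    #include <string>
    #include <vector>

    struct Epoch {
        std::string id;  // stand-in for the OID a collection gets at creation
        bool operator==(const Epoch& other) const { return id == other.id; }
        bool operator!=(const Epoch& other) const { return id != other.id; }
    };

    struct ChunkVersion {
        unsigned long long combined;  // stand-in for the packed major/minor version
        Epoch epoch;
    };

    struct Chunk {
        ChunkVersion version;
    };

    // Returns false -- ConflictingOperationInProgress in the real code -- when
    // the refreshed chunks carry an epoch other than the collection's, i.e. the
    // collection was dropped and recreated while the refresh was in flight.
    bool chunksMatchCollectionEpoch(const Epoch& collectionEpoch,
                                    const std::vector<Chunk>& changedChunks) {
        assert(!changedChunks.empty());
        // The config server returns chunks in ascending version order, so a new
        // epoch surfaces at the tail first; checking back() suffices.
        return changedChunks.back().version.epoch == collectionEpoch;
    }
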
diff --git a/src/mongo/db/s/shard_metadata_util.cpp b/src/mongo/db/s/shard_metadata_util.cpp
index c0627e2c48f..9e076435a4c 100644
--- a/src/mongo/db/s/shard_metadata_util.cpp
+++ b/src/mongo/db/s/shard_metadata_util.cpp
@@ -290,17 +290,7 @@ Status updateShardChunks(OperationContext* opCtx,
*
*/
for (auto& chunk : chunks) {
- // Check for a different epoch.
- if (!chunk.getVersion().hasEqualEpoch(currEpoch)) {
- return Status{ErrorCodes::ConflictingOperationInProgress,
- str::stream() << "Invalid chunks found when reloading '"
- << nss.toString()
- << "'. Previous collection epoch was '"
- << currEpoch.toString()
- << "', but unexpectedly found a new epoch '"
- << chunk.getVersion().epoch().toString()
- << "'. Collection was dropped and recreated."};
- }
+ invariant(chunk.getVersion().hasEqualEpoch(currEpoch));
// Delete any overlapping chunk ranges. Overlapping chunks will have a min value
// ("_id") between (chunk.min, chunk.max].
diff --git a/src/mongo/db/s/shard_metadata_util_test.cpp b/src/mongo/db/s/shard_metadata_util_test.cpp
index d242684034d..fa30bda71af 100644
--- a/src/mongo/db/s/shard_metadata_util_test.cpp
+++ b/src/mongo/db/s/shard_metadata_util_test.cpp
@@ -273,22 +273,6 @@ TEST_F(ShardMetadataUtilTest, WriteAndReadChunks) {
ASSERT_BSONOBJ_EQ(chunks.back().toShardBSON(), readChunks.front().toShardBSON());
}
-TEST_F(ShardMetadataUtilTest, UpdatingChunksFindsNewEpoch) {
- std::vector<ChunkType> chunks = makeFourChunks();
- ASSERT_OK(updateShardChunks(operationContext(), kNss, chunks, getCollectionVersion().epoch()));
- checkChunks(kChunkMetadataNss, chunks);
-
- ChunkVersion originalChunkVersion = chunks.back().getVersion();
- chunks.back().setVersion(ChunkVersion(1, 0, OID::gen()));
- ASSERT_EQUALS(
- updateShardChunks(operationContext(), kNss, chunks, getCollectionVersion().epoch()).code(),
- ErrorCodes::ConflictingOperationInProgress);
-
- // Check that the chunk with a different epoch did not get written.
- chunks.back().setVersion(std::move(originalChunkVersion));
- checkChunks(kChunkMetadataNss, chunks);
-}
-
TEST_F(ShardMetadataUtilTest, UpdateWithWriteNewChunks) {
// Load some chunk metadata.
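
The deleted test exercised exactly the error path that no longer exists at this layer; its intent now belongs with the loader's refresh callback. A hypothetical plain-assert version of the same check, exercising the chunksMatchCollectionEpoch sketch above rather than MongoDB's TEST_F fixtures:

    int main() {
        Epoch current{"epoch-A"};
        std::vector<Chunk> chunks{{ChunkVersion{1, current}}, {ChunkVersion{2, current}}};
        assert(chunksMatchCollectionEpoch(current, chunks));

        // Simulate a drop/recreate: the tail chunk now carries a new epoch.
        chunks.back().version.epoch = Epoch{"epoch-B"};
        assert(!chunksMatchCollectionEpoch(current, chunks));
        return 0;
    }
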
diff --git a/src/mongo/db/s/shard_server_catalog_cache_loader.cpp b/src/mongo/db/s/shard_server_catalog_cache_loader.cpp
index e935418432a..4ac73533bde 100644
--- a/src/mongo/db/s/shard_server_catalog_cache_loader.cpp
+++ b/src/mongo/db/s/shard_server_catalog_cache_loader.cpp
@@ -443,29 +443,12 @@ void ShardServerCatalogCacheLoader::_schedulePrimaryGetChunksSince(
return getPersistedMaxVersion(opCtx, nss);
}();
- auto remoteRefreshCallbackFn = [this,
- nss,
- catalogCacheSinceVersion,
- maxLoaderVersion,
- termScheduled,
- callbackFn,
- notify](
- OperationContext* opCtx,
- StatusWith<CollectionAndChangedChunks> swCollectionAndChangedChunks) {
-
- if (!swCollectionAndChangedChunks.isOK() &&
- swCollectionAndChangedChunks != ErrorCodes::NamespaceNotFound) {
- // No updates to apply. Do nothing.
- } else {
- // Enqueue a Task to apply the update retrieved from the config server, if new data was
- // retrieved.
- if (!swCollectionAndChangedChunks.isOK() ||
- (swCollectionAndChangedChunks.getValue()
- .changedChunks.back()
- .getVersion()
- .epoch() != maxLoaderVersion.epoch()) ||
- (swCollectionAndChangedChunks.getValue().changedChunks.back().getVersion() >
- maxLoaderVersion)) {
+ auto remoteRefreshCallbackFn =
+ [this, nss, catalogCacheSinceVersion, maxLoaderVersion, termScheduled, callbackFn, notify](
+ OperationContext* opCtx,
+ StatusWith<CollectionAndChangedChunks> swCollectionAndChangedChunks) {
+
+ if (swCollectionAndChangedChunks == ErrorCodes::NamespaceNotFound) {
Status scheduleStatus = _scheduleTask(
nss, Task{swCollectionAndChangedChunks, maxLoaderVersion, termScheduled});
if (!scheduleStatus.isOK()) {
@@ -473,36 +456,60 @@ void ShardServerCatalogCacheLoader::_schedulePrimaryGetChunksSince(
notify->set();
return;
}
- }
- if (swCollectionAndChangedChunks.isOK()) {
- log() << "Cache loader remotely refreshed for collection " << nss
- << " from collection version " << maxLoaderVersion
- << " and found collection version "
- << swCollectionAndChangedChunks.getValue().changedChunks.back().getVersion();
-
- // Metadata was found remotely -- otherwise would have received
- // NamespaceNotFound rather than Status::OK(). Return metadata for CatalogCache
- // that's GTE catalogCacheSinceVersion, from the loader's persisted and enqueued
- // metadata.
-
- swCollectionAndChangedChunks =
- _getLoaderMetadata(opCtx, nss, catalogCacheSinceVersion, termScheduled);
- if (swCollectionAndChangedChunks.isOK()) {
- // After finding metadata remotely, we must have found metadata locally.
- invariant(!swCollectionAndChangedChunks.getValue().changedChunks.empty());
- }
- } else { // NamespaceNotFound
- invariant(swCollectionAndChangedChunks == ErrorCodes::NamespaceNotFound);
log() << "Cache loader remotely refreshed for collection " << nss
<< " from version " << maxLoaderVersion << " and no metadata was found.";
+ } else if (swCollectionAndChangedChunks.isOK()) {
+ auto& collAndChunks = swCollectionAndChangedChunks.getValue();
+
+ if (collAndChunks.changedChunks.back().getVersion().epoch() !=
+ collAndChunks.epoch) {
+ swCollectionAndChangedChunks = Status{
+ ErrorCodes::ConflictingOperationInProgress,
+ str::stream()
+ << "Invalid chunks found when reloading '"
+ << nss.toString()
+                            << "'. Previous collection epoch was '"
+ << collAndChunks.epoch.toString()
+ << "', but found a new epoch '"
+ << collAndChunks.changedChunks.back().getVersion().epoch().toString()
+ << "'. Collection was dropped and recreated."};
+ } else {
+ if ((collAndChunks.epoch != maxLoaderVersion.epoch()) ||
+ (collAndChunks.changedChunks.back().getVersion() > maxLoaderVersion)) {
+ Status scheduleStatus = _scheduleTask(
+ nss,
+ Task{swCollectionAndChangedChunks, maxLoaderVersion, termScheduled});
+ if (!scheduleStatus.isOK()) {
+ callbackFn(opCtx, scheduleStatus);
+ notify->set();
+ return;
+ }
+ }
+
+ log() << "Cache loader remotely refreshed for collection " << nss
+ << " from collection version " << maxLoaderVersion
+ << " and found collection version "
+ << collAndChunks.changedChunks.back().getVersion();
+
+ // Metadata was found remotely -- otherwise would have received
+ // NamespaceNotFound rather than Status::OK(). Return metadata for CatalogCache
+ // that's GTE catalogCacheSinceVersion, from the loader's persisted and enqueued
+ // metadata.
+
+ swCollectionAndChangedChunks =
+ _getLoaderMetadata(opCtx, nss, catalogCacheSinceVersion, termScheduled);
+ if (swCollectionAndChangedChunks.isOK()) {
+ // After finding metadata remotely, we must have found metadata locally.
+                        invariant(!swCollectionAndChangedChunks.getValue().changedChunks.empty());
+ }
+ }
}
- }
- // Complete the callbackFn work.
- callbackFn(opCtx, std::move(swCollectionAndChangedChunks));
- notify->set();
- };
+ // Complete the callbackFn work.
+ callbackFn(opCtx, std::move(swCollectionAndChangedChunks));
+ notify->set();
+ };
// Refresh the loader's metadata from the config server. The caller's request will
// then be serviced from the loader's up-to-date metadata.
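
After this hunk, the callback branches three ways: NamespaceNotFound enqueues a "drop" task; an OK result whose tail chunk carries a foreign epoch is converted to ConflictingOperationInProgress before anything is enqueued; and an OK result with a new epoch or a strictly newer version enqueues a persistence task, then answers from the loader's own metadata. A simplified classification of those branches, reusing the earlier stand-in types (the names here are illustrative, not the real API):

    enum class RefreshOutcome {
        kNamespaceNotFound,  // collection gone: enqueue a drop task
        kEpochMismatch,      // mixed epochs: fail with ConflictingOperationInProgress
        kNewMetadata,        // new epoch or newer version: enqueue a persistence task
        kNothingNew          // remote == local: enqueue nothing, just read back
    };

    RefreshOutcome classifyRefresh(bool foundRemotely,
                                   const Epoch& collectionEpoch,
                                   const ChunkVersion& lastChangedChunk,
                                   const ChunkVersion& maxLoaderVersion) {
        if (!foundRemotely)
            return RefreshOutcome::kNamespaceNotFound;
        if (lastChangedChunk.epoch != collectionEpoch)
            return RefreshOutcome::kEpochMismatch;  // dropped and recreated mid-refresh
        if (collectionEpoch != maxLoaderVersion.epoch ||
            lastChangedChunk.combined > maxLoaderVersion.combined)
            return RefreshOutcome::kNewMetadata;
        return RefreshOutcome::kNothingNew;
    }
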
@@ -712,32 +719,8 @@ void ShardServerCatalogCacheLoader::_updatePersistedMetadata(OperationContext* o
return;
}
- ChunkVersion persistedMaxVersion = getPersistedMaxVersion(opCtx, nss);
-
- // If the epoch of the update task does not match the persisted metadata, the persisted metadata
- // -- from an old collection that was recreated -- must be cleared before applying the changes.
- if (persistedMaxVersion.isSet() &&
- persistedMaxVersion.epoch() != task.maxQueryVersion.epoch()) {
- Status status = dropChunksAndDeleteCollectionsEntry(opCtx, nss);
- uassert(status.code(),
- str::stream() << "Failed to clear persisted chunk metadata for collection '"
- << nss.ns()
- << "' due to '"
- << status.reason()
- << "'. Will be retried.",
- status.isOK());
- }
-
Status status =
persistCollectionAndChangedChunks(opCtx, nss, task.collectionAndChangedChunks.get());
- if (status == ErrorCodes::ConflictingOperationInProgress) {
- // A new epoch was discovered while updating the persisted metadata. The getChunksSince
- // which enqueued this task would have discovered that independently and also returned
- // ConflictingOperationInProgress to the catalog cache, which means that the next enqueued
- // task should have the new epoch, which in turn means that on the next invocation, the
- // old collection entry will be dropped and recreated.
- return;
- }
uassert(status.code(),
str::stream() << "Failed to update the persisted chunk metadata for collection '"
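
The block removed above existed to recover from tasks whose chunks crossed an epoch boundary; since the refresh callback now rejects those before enqueueing, every task is either a drop or a single-epoch update, and _updatePersistedMetadata can persist unconditionally. A sketch of that enqueue-time guarantee, reusing the stand-in types from the first sketch (TaskSketch is an illustrative name, not the loader's real Task struct):

    #include <utility>

    struct TaskSketch {
        bool dropped;               // true for a NamespaceNotFound refresh
        Epoch epoch;                // the one epoch every chunk in this task shares
        std::vector<Chunk> chunks;  // empty when dropped

        TaskSketch(Epoch e, std::vector<Chunk> c)
            : dropped(false), epoch(std::move(e)), chunks(std::move(c)) {
            // Enforced at enqueue time, so persistence may assume it.
            assert(chunksMatchCollectionEpoch(epoch, chunks));
        }
        explicit TaskSketch(Epoch e) : dropped(true), epoch(std::move(e)) {}
    };
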
@@ -884,11 +867,6 @@ CollectionAndChangedChunks ShardServerCatalogCacheLoader::TaskList::getEnqueuedM
collAndChunks = task.collectionAndChangedChunks.get();
} else {
// Epochs match, so the new results should be appended.
- //
- // Note: it's okay if the new chunks change to a new version epoch in the middle of
- // the chunks vector. This will be either reset by the next task with a total reload
- // with a new epoch, or cause the original getChunksSince caller to throw out the
- // results and refresh again.
// Make sure we do not append a duplicate chunk. The diff query is GTE, so there can
// be duplicates of the same exact versioned chunk across tasks. This is no problem
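
The surviving comment still matters: because the changed-chunks diff query is GTE on version, the first chunk of the next task's results can duplicate the chunk already at the tail of the accumulated vector. A sketch of the duplicate-safe append it describes, reusing the stand-in types from the first sketch (the equality test on (combined, epoch) is illustrative):

    void appendChangedChunks(std::vector<Chunk>& accumulated, const std::vector<Chunk>& next) {
        auto it = next.begin();
        if (!accumulated.empty() && it != next.end() &&
            it->version.combined == accumulated.back().version.combined &&
            it->version.epoch == accumulated.back().version.epoch) {
            ++it;  // skip the duplicate leading chunk
        }
        accumulated.insert(accumulated.end(), it, next.end());
    }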