summaryrefslogtreecommitdiff
path: root/src/mongo/db/s/config
diff options
context:
space:
mode:
authorMartin Neupauer <martin.neupauer@mongodb.com>2018-04-10 10:57:18 -0400
committerMartin Neupauer <martin.neupauer@mongodb.com>2018-04-13 12:53:11 -0400
commitfde45e48ef4a86fb19604843fdee272a22fcd475 (patch)
tree5902e57d0275a66fdafe65a3e15ad0b34d1ee640 /src/mongo/db/s/config
parentb1e0f61e6d78c56248997c26f505b4519c84833f (diff)
downloadmongo-fde45e48ef4a86fb19604843fdee272a22fcd475.tar.gz
SERVER-33781 upgrade/downgrade config.chunks metadata with a new history field.
Diffstat (limited to 'src/mongo/db/s/config')
-rw-r--r--src/mongo/db/s/config/sharding_catalog_manager.h33
-rw-r--r--src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp318
-rw-r--r--src/mongo/db/s/config/sharding_catalog_manager_collection_operations.cpp10
3 files changed, 306 insertions, 55 deletions
diff --git a/src/mongo/db/s/config/sharding_catalog_manager.h b/src/mongo/db/s/config/sharding_catalog_manager.h
index a41c1c2ce0c..c2fe589252b 100644
--- a/src/mongo/db/s/config/sharding_catalog_manager.h
+++ b/src/mongo/db/s/config/sharding_catalog_manager.h
@@ -353,6 +353,25 @@ public:
*/
static void clearForTests(ServiceContext* serviceContext);
+ //
+ // Upgrade/downgrade
+ //
+
+ /**
+ * Upgrade the chunk metadata to include the history field.
+ */
+ Status upgradeChunksHistory(OperationContext* opCtx,
+ const NamespaceString& nss,
+ const OID& collectionEpoch,
+ const Timestamp validAfter);
+
+ /**
+ * Remove the history field from the chunk metadata.
+ */
+ Status downgradeChunksHistory(OperationContext* opCtx,
+ const NamespaceString& nss,
+ const OID& collectionEpoch);
+
private:
/**
* Performs the necessary checks for version compatibility and creates a new config.version
@@ -455,6 +474,20 @@ private:
const std::vector<BSONObj>& initPoints,
const bool distributeInitialChunks);
+ /**
+ * Retrieve the full chunk description from the config.
+ */
+ StatusWith<ChunkType> _findChunkOnConfig(OperationContext* opCtx,
+ const NamespaceString& nss,
+ const BSONObj& key);
+
+ /**
+ * Retrieve the the latest collection version from the config.
+ */
+ StatusWith<ChunkVersion> _findCollectionVersion(OperationContext* opCtx,
+ const NamespaceString& nss,
+ const OID& collectionEpoch);
+
// The owning service context
ServiceContext* const _serviceContext;
diff --git a/src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp b/src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp
index 5f32fca980a..e4a73bc6d54 100644
--- a/src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp
+++ b/src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp
@@ -88,12 +88,13 @@ BSONArray buildMergeChunksTransactionUpdates(const std::vector<ChunkType>& chunk
// fill in additional details for sending through transaction
mergedChunk.setVersion(mergeVersion);
- // Clear the chunk history
- std::vector<ChunkHistory> history;
- if (validAfter) {
- history.emplace_back(ChunkHistory(validAfter.get(), mergedChunk.getShard()));
+ // FCV 3.6 does not have the history field in the persisted metadata
+ if (serverGlobalParams.featureCompatibility.getVersion() >=
+ ServerGlobalParams::FeatureCompatibility::Version::kUpgradingTo40) {
+
+ invariant(validAfter);
+ mergedChunk.setHistory({ChunkHistory(validAfter.get(), mergedChunk.getShard())});
}
- mergedChunk.setHistory(std::move(history));
// add the new chunk information as the update object
op.append("o", mergedChunk.toConfigBSON());
@@ -286,27 +287,7 @@ Status ShardingCatalogManager::commitChunkSplit(OperationContext* opCtx,
}
// Find the chunk history.
- auto findHistory = Grid::get(opCtx)->shardRegistry()->getConfigShard()->exhaustiveFindOnConfig(
- opCtx,
- ReadPreferenceSetting{ReadPreference::PrimaryOnly},
- repl::ReadConcernLevel::kLocalReadConcern,
- ChunkType::ConfigNS,
- BSON(ChunkType::name << ChunkType::genID(nss, range.getMin())),
- BSONObj(),
- 1);
- if (!findHistory.isOK()) {
- return findHistory.getStatus();
- }
-
- const auto origChunks = std::move(findHistory.getValue().docs);
- if (origChunks.size() != 1) {
- return {ErrorCodes::IncompatibleShardingMetadata,
- str::stream() << "Tried to find the chunk history for '"
- << ChunkType::genID(nss, range.getMin())
- << ", but found no chunks"};
- }
-
- const auto origChunk = ChunkType::fromConfigBSON(origChunks.front());
+ const auto origChunk = _findChunkOnConfig(opCtx, nss, range.getMin());
if (!origChunk.isOK()) {
return origChunk.getStatus();
}
@@ -372,7 +353,13 @@ Status ShardingCatalogManager::commitChunkSplit(OperationContext* opCtx,
n.append(ChunkType::min(), startKey);
n.append(ChunkType::max(), endKey);
n.append(ChunkType::shard(), shardName);
- origChunk.getValue().addHistoryToBSON(n);
+
+ // FCV 3.6 does not have the history field in the persisted metadata
+ if (serverGlobalParams.featureCompatibility.getVersion() >=
+ ServerGlobalParams::FeatureCompatibility::Version::kUpgradingTo40) {
+ origChunk.getValue().addHistoryToBSON(n);
+ }
+
n.done();
// add the chunk's _id as the query part of the update statement
@@ -479,6 +466,11 @@ Status ShardingCatalogManager::commitChunkMerge(OperationContext* opCtx,
// move chunks on different collections to proceed in parallel
Lock::ExclusiveLock lk(opCtx->lockState(), _kChunkOpLock);
+ if (serverGlobalParams.featureCompatibility.getVersion() >=
+ ServerGlobalParams::FeatureCompatibility::Version::kUpgradingTo40 &&
+ !validAfter) {
+ return {ErrorCodes::IllegalOperation, "chunk operation requires validAfter timestamp"};
+ }
// Get the chunk with the highest version for this namespace
auto findStatus = Grid::get(opCtx)->shardRegistry()->getConfigShard()->exhaustiveFindOnConfig(
opCtx,
@@ -597,6 +589,12 @@ StatusWith<BSONObj> ShardingCatalogManager::commitChunkMigration(
// (Note: This is not needed while we have a global lock, taken here only for consistency.)
Lock::ExclusiveLock lk(opCtx->lockState(), _kChunkOpLock);
+ if (serverGlobalParams.featureCompatibility.getVersion() >=
+ ServerGlobalParams::FeatureCompatibility::Version::kUpgradingTo40 &&
+ !validAfter) {
+ return {ErrorCodes::IllegalOperation, "chunk operation requires validAfter timestamp"};
+ }
+
// Must use local read concern because we will perform subsequent writes.
auto findResponse =
configShard->exhaustiveFindOnConfig(opCtx,
@@ -664,27 +662,7 @@ StatusWith<BSONObj> ShardingCatalogManager::commitChunkMigration(
}
// Find the chunk history.
- auto findHistory = configShard->exhaustiveFindOnConfig(
- opCtx,
- ReadPreferenceSetting{ReadPreference::PrimaryOnly},
- repl::ReadConcernLevel::kLocalReadConcern,
- ChunkType::ConfigNS,
- BSON(ChunkType::name << ChunkType::genID(nss, migratedChunk.getMin())),
- BSONObj(),
- 1);
- if (!findHistory.isOK()) {
- return findHistory.getStatus();
- }
-
- const auto origChunks = std::move(findHistory.getValue().docs);
- if (origChunks.size() != 1) {
- return {ErrorCodes::IncompatibleShardingMetadata,
- str::stream() << "Tried to find the chunk history for '"
- << ChunkType::genID(nss, migratedChunk.getMin())
- << ", but found no chunks"};
- }
-
- const auto origChunk = ChunkType::fromConfigBSON(origChunks.front());
+ const auto origChunk = _findChunkOnConfig(opCtx, nss, migratedChunk.getMin());
if (!origChunk.isOK()) {
return origChunk.getStatus();
}
@@ -701,7 +679,10 @@ StatusWith<BSONObj> ShardingCatalogManager::commitChunkMigration(
const int kHistorySecs = 10;
// Update the history of the migrated chunk.
- if (validAfter) {
+ if (serverGlobalParams.featureCompatibility.getVersion() >=
+ ServerGlobalParams::FeatureCompatibility::Version::kUpgradingTo40) {
+ invariant(validAfter);
+
// Drop the history that is too old (10 seconds of history for now).
// TODO SERVER-33831 to update the old history removal policy.
while (!newHistory.empty() &&
@@ -721,7 +702,7 @@ StatusWith<BSONObj> ShardingCatalogManager::commitChunkMigration(
}
newHistory.emplace(newHistory.begin(), ChunkHistory(validAfter.get(), toShard));
} else {
- // TODO: SERVER-33781 FCV 3.6 should not have any history
+ // FCV 3.6 does not have the history field in the persisted metadata
newHistory.clear();
}
newMigratedChunk.setHistory(std::move(newHistory));
@@ -729,9 +710,22 @@ StatusWith<BSONObj> ShardingCatalogManager::commitChunkMigration(
// Control chunk's minor version will be 1 (if control chunk is present).
boost::optional<ChunkType> newControlChunk = boost::none;
if (controlChunk) {
- newControlChunk = controlChunk.get();
+ // Find the chunk history.
+ const auto origControlChunk = _findChunkOnConfig(opCtx, nss, controlChunk->getMin());
+ if (!origControlChunk.isOK()) {
+ return origControlChunk.getStatus();
+ }
+
+ newControlChunk = origControlChunk.getValue();
newControlChunk->setVersion(ChunkVersion(
currentCollectionVersion.majorVersion() + 1, 1, currentCollectionVersion.epoch()));
+
+ // Copy the history of the control chunk.
+ if (serverGlobalParams.featureCompatibility.getVersion() <
+ ServerGlobalParams::FeatureCompatibility::Version::kUpgradingTo40) {
+ // FCV 3.6 does not have the history field in the persisted metadata
+ newControlChunk->setHistory({});
+ }
}
auto command = makeCommitChunkTransactionCommand(
@@ -762,4 +756,226 @@ StatusWith<BSONObj> ShardingCatalogManager::commitChunkMigration(
return result.obj();
}
+StatusWith<ChunkType> ShardingCatalogManager::_findChunkOnConfig(OperationContext* opCtx,
+ const NamespaceString& nss,
+ const BSONObj& key) {
+ auto const configShard = Grid::get(opCtx)->shardRegistry()->getConfigShard();
+
+ auto findResponse =
+ configShard->exhaustiveFindOnConfig(opCtx,
+ ReadPreferenceSetting{ReadPreference::PrimaryOnly},
+ repl::ReadConcernLevel::kLocalReadConcern,
+ ChunkType::ConfigNS,
+ BSON(ChunkType::name << ChunkType::genID(nss, key)),
+ BSONObj(),
+ 1);
+
+ if (!findResponse.isOK()) {
+ return findResponse.getStatus();
+ }
+
+ const auto origChunks = std::move(findResponse.getValue().docs);
+ if (origChunks.size() != 1) {
+ return {ErrorCodes::IncompatibleShardingMetadata,
+ str::stream() << "Tried to find the chunk for '" << ChunkType::genID(nss, key)
+ << ", but found no chunks"};
+ }
+
+ return ChunkType::fromConfigBSON(origChunks.front());
+}
+
+Status ShardingCatalogManager::upgradeChunksHistory(OperationContext* opCtx,
+ const NamespaceString& nss,
+ const OID& collectionEpoch,
+ const Timestamp validAfter) {
+ auto const configShard = Grid::get(opCtx)->shardRegistry()->getConfigShard();
+ auto const catalogClient = Grid::get(opCtx)->catalogClient();
+
+ // Take _kChunkOpLock in exclusive mode to prevent concurrent chunk splits, merges, and
+ // migrations.
+ Lock::ExclusiveLock lk(opCtx->lockState(), _kChunkOpLock);
+
+ auto findResponse =
+ configShard->exhaustiveFindOnConfig(opCtx,
+ ReadPreferenceSetting{ReadPreference::PrimaryOnly},
+ repl::ReadConcernLevel::kLocalReadConcern,
+ ChunkType::ConfigNS,
+ BSON("ns" << nss.ns()),
+ BSONObj(),
+ boost::none);
+ if (!findResponse.isOK()) {
+ return findResponse.getStatus();
+ }
+
+ const auto chunksVector = std::move(findResponse.getValue().docs);
+ if (chunksVector.empty()) {
+ return {ErrorCodes::IncompatibleShardingMetadata,
+ str::stream() << "Tried to find chunks for collection '" << nss.ns()
+ << ", but found no chunks"};
+ }
+
+ const auto currentCollectionVersion = _findCollectionVersion(opCtx, nss, collectionEpoch);
+ if (!currentCollectionVersion.isOK()) {
+ return currentCollectionVersion.getStatus();
+ }
+
+ // Bump the version.
+ auto newCollectionVersion = ChunkVersion(currentCollectionVersion.getValue().majorVersion() + 1,
+ 0,
+ currentCollectionVersion.getValue().epoch());
+
+ for (const auto& chunk : chunksVector) {
+ auto swChunk = ChunkType::fromConfigBSON(chunk);
+ if (!swChunk.isOK()) {
+ return swChunk.getStatus();
+ }
+ auto& upgradeChunk = swChunk.getValue();
+
+ if (upgradeChunk.getHistory().empty()) {
+
+ // Bump the version.
+ upgradeChunk.setVersion(newCollectionVersion);
+ newCollectionVersion.incMajor();
+
+ // Construct the fresh history.
+ upgradeChunk.setHistory({ChunkHistory{validAfter, upgradeChunk.getShard()}});
+
+ // Run the update.
+ auto status =
+ catalogClient->updateConfigDocument(opCtx,
+ ChunkType::ConfigNS,
+ BSON(ChunkType::name(upgradeChunk.getName())),
+ upgradeChunk.toConfigBSON(),
+ false,
+ ShardingCatalogClient::kLocalWriteConcern);
+
+ if (!status.isOK()) {
+ return status.getStatus();
+ }
+ }
+ }
+
+ return Status::OK();
+}
+
+Status ShardingCatalogManager::downgradeChunksHistory(OperationContext* opCtx,
+ const NamespaceString& nss,
+ const OID& collectionEpoch) {
+ auto const configShard = Grid::get(opCtx)->shardRegistry()->getConfigShard();
+ auto const catalogClient = Grid::get(opCtx)->catalogClient();
+
+ // Take _kChunkOpLock in exclusive mode to prevent concurrent chunk splits, merges, and
+ // migrations.
+ //
+ Lock::ExclusiveLock lk(opCtx->lockState(), _kChunkOpLock);
+
+ auto findResponse =
+ configShard->exhaustiveFindOnConfig(opCtx,
+ ReadPreferenceSetting{ReadPreference::PrimaryOnly},
+ repl::ReadConcernLevel::kLocalReadConcern,
+ ChunkType::ConfigNS,
+ BSON("ns" << nss.ns()),
+ BSONObj(),
+ boost::none);
+ if (!findResponse.isOK()) {
+ return findResponse.getStatus();
+ }
+
+ const auto chunksVector = std::move(findResponse.getValue().docs);
+ if (chunksVector.empty()) {
+ return {ErrorCodes::IncompatibleShardingMetadata,
+ str::stream() << "Tried to find chunks for collection '" << nss.ns()
+ << ", but found no chunks"};
+ }
+
+ const auto currentCollectionVersion = _findCollectionVersion(opCtx, nss, collectionEpoch);
+ if (!currentCollectionVersion.isOK()) {
+ return currentCollectionVersion.getStatus();
+ }
+
+ // Bump the version.
+ auto newCollectionVersion = ChunkVersion(currentCollectionVersion.getValue().majorVersion() + 1,
+ 0,
+ currentCollectionVersion.getValue().epoch());
+
+ for (const auto& chunk : chunksVector) {
+ auto swChunk = ChunkType::fromConfigBSON(chunk);
+ if (!swChunk.isOK()) {
+ return swChunk.getStatus();
+ }
+ auto& downgradeChunk = swChunk.getValue();
+
+ // Bump the version.
+ downgradeChunk.setVersion(newCollectionVersion);
+ newCollectionVersion.incMajor();
+
+ // Clear the history.
+ downgradeChunk.setHistory({});
+
+ // Run the update.
+ auto status =
+ catalogClient->updateConfigDocument(opCtx,
+ ChunkType::ConfigNS,
+ BSON(ChunkType::name(downgradeChunk.getName())),
+ downgradeChunk.toConfigBSON(),
+ false,
+ ShardingCatalogClient::kLocalWriteConcern);
+
+ if (!status.isOK()) {
+ return status.getStatus();
+ }
+ }
+
+ return Status::OK();
+}
+
+StatusWith<ChunkVersion> ShardingCatalogManager::_findCollectionVersion(
+ OperationContext* opCtx, const NamespaceString& nss, const OID& collectionEpoch) {
+ auto const configShard = Grid::get(opCtx)->shardRegistry()->getConfigShard();
+
+ // Must use local read concern because we will perform subsequent writes.
+ auto findResponse =
+ configShard->exhaustiveFindOnConfig(opCtx,
+ ReadPreferenceSetting{ReadPreference::PrimaryOnly},
+ repl::ReadConcernLevel::kLocalReadConcern,
+ ChunkType::ConfigNS,
+ BSON("ns" << nss.ns()),
+ BSON(ChunkType::lastmod << -1),
+ 1);
+ if (!findResponse.isOK()) {
+ return findResponse.getStatus();
+ }
+
+ const auto chunksVector = std::move(findResponse.getValue().docs);
+ if (chunksVector.empty()) {
+ return {ErrorCodes::IncompatibleShardingMetadata,
+ str::stream() << "Tried to find max chunk version for collection '" << nss.ns()
+ << ", but found no chunks"};
+ }
+
+ const auto swChunk = ChunkType::fromConfigBSON(chunksVector.front());
+ if (!swChunk.isOK()) {
+ return swChunk.getStatus();
+ }
+
+ const auto currentCollectionVersion = swChunk.getValue().getVersion();
+
+ // It is possible for a migration to end up running partly without the protection of the
+ // distributed lock if the config primary stepped down since the start of the migration and
+ // failed to recover the migration. Check that the collection has not been dropped and recreated
+ // since the migration began, unbeknown to the shard when the command was sent.
+ if (currentCollectionVersion.epoch() != collectionEpoch) {
+ return {ErrorCodes::StaleEpoch,
+ str::stream() << "The collection '" << nss.ns()
+ << "' has been dropped and recreated since the migration began."
+ " The config server's collection version epoch is now '"
+ << currentCollectionVersion.epoch().toString()
+ << "', but the shard's is "
+ << collectionEpoch.toString()
+ << "'."};
+ }
+
+ return currentCollectionVersion;
+}
+
} // namespace mongo
diff --git a/src/mongo/db/s/config/sharding_catalog_manager_collection_operations.cpp b/src/mongo/db/s/config/sharding_catalog_manager_collection_operations.cpp
index 73ee3e1c938..59d2511d671 100644
--- a/src/mongo/db/s/config/sharding_catalog_manager_collection_operations.cpp
+++ b/src/mongo/db/s/config/sharding_catalog_manager_collection_operations.cpp
@@ -293,10 +293,12 @@ ChunkVersion ShardingCatalogManager::_createFirstChunks(OperationContext* opCtx,
chunk.setMax(max);
chunk.setShard(shardIds[i % shardIds.size()]);
chunk.setVersion(version);
- // TODO SERVER-33781 write history only when FCV4.0 config.
- std::vector<ChunkHistory> initialHistory;
- initialHistory.emplace_back(ChunkHistory(validAfter, shardIds[i % shardIds.size()]));
- chunk.setHistory(std::move(initialHistory));
+ if (serverGlobalParams.featureCompatibility.getVersion() >=
+ ServerGlobalParams::FeatureCompatibility::Version::kUpgradingTo40) {
+ std::vector<ChunkHistory> initialHistory;
+ initialHistory.emplace_back(ChunkHistory(validAfter, shardIds[i % shardIds.size()]));
+ chunk.setHistory(std::move(initialHistory));
+ }
uassertStatusOK(Grid::get(opCtx)->catalogClient()->insertConfigDocument(
opCtx,