diff options
Diffstat (limited to 'src')
22 files changed, 548 insertions, 43 deletions
diff --git a/src/mongo/db/s/config/configsvr_commit_chunk_migration_command.cpp b/src/mongo/db/s/config/configsvr_commit_chunk_migration_command.cpp index 28119fe682e..5756821d11e 100644 --- a/src/mongo/db/s/config/configsvr_commit_chunk_migration_command.cpp +++ b/src/mongo/db/s/config/configsvr_commit_chunk_migration_command.cpp @@ -134,7 +134,8 @@ public: commitRequest.getControlChunk(), commitRequest.getCollectionEpoch(), commitRequest.getFromShard(), - commitRequest.getToShard()); + commitRequest.getToShard(), + commitRequest.getValidAfter()); if (!response.isOK()) { return CommandHelpers::appendCommandStatus(result, response.getStatus()); } diff --git a/src/mongo/db/s/config/configsvr_merge_chunk_command.cpp b/src/mongo/db/s/config/configsvr_merge_chunk_command.cpp index 74d11ef2eb5..91c7fe9de31 100644 --- a/src/mongo/db/s/config/configsvr_merge_chunk_command.cpp +++ b/src/mongo/db/s/config/configsvr_merge_chunk_command.cpp @@ -114,7 +114,8 @@ public: parsedRequest.getNamespace(), parsedRequest.getEpoch(), parsedRequest.getChunkBoundaries(), - parsedRequest.getShardName()); + parsedRequest.getShardName(), + parsedRequest.getValidAfter()); if (!mergeChunkResult.isOK()) { return CommandHelpers::appendCommandStatus(result, mergeChunkResult); diff --git a/src/mongo/db/s/config/sharding_catalog_manager.h b/src/mongo/db/s/config/sharding_catalog_manager.h index 786b862f7ee..a244f701133 100644 --- a/src/mongo/db/s/config/sharding_catalog_manager.h +++ b/src/mongo/db/s/config/sharding_catalog_manager.h @@ -171,15 +171,20 @@ public: /** * Updates metadata in the config.chunks collection so the chunks with given boundaries are seen * merged into a single larger chunk. + * If 'validAfter' is not set, this means the commit request came from an older server version, + * which is not history-aware. */ Status commitChunkMerge(OperationContext* opCtx, const NamespaceString& nss, const OID& requestEpoch, const std::vector<BSONObj>& chunkBoundaries, - const std::string& shardName); + const std::string& shardName, + const boost::optional<Timestamp>& validAfter); /** * Updates metadata in config.chunks collection to show the given chunk in its new shard. + * If 'validAfter' is not set, this means the commit request came from an older server version, + * which is not history-aware. */ StatusWith<BSONObj> commitChunkMigration(OperationContext* opCtx, const NamespaceString& nss, @@ -187,7 +192,8 @@ public: const boost::optional<ChunkType>& controlChunk, const OID& collectionEpoch, const ShardId& fromShard, - const ShardId& toShard); + const ShardId& toShard, + const boost::optional<Timestamp>& validAfter); // // Database Operations diff --git a/src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp b/src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp index ea34e097844..c4fc7ca563e 100644 --- a/src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp +++ b/src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp @@ -70,7 +70,8 @@ void appendShortVersion(BufBuilder* b, const ChunkType& chunk) { } BSONArray buildMergeChunksTransactionUpdates(const std::vector<ChunkType>& chunksToMerge, - const ChunkVersion& mergeVersion) { + const ChunkVersion& mergeVersion, + const boost::optional<Timestamp>& validAfter) { BSONArrayBuilder updates; // Build an update operation to expand the first chunk into the newly merged chunk @@ -87,6 +88,13 @@ BSONArray buildMergeChunksTransactionUpdates(const std::vector<ChunkType>& chunk // fill in additional details for sending through transaction mergedChunk.setVersion(mergeVersion); + // Clear the chunk history + std::vector<ChunkHistory> history; + if (validAfter) { + history.emplace_back(ChunkHistory(validAfter.get(), mergedChunk.getShard())); + } + mergedChunk.setHistory(std::move(history)); + // add the new chunk information as the update object op.append("o", mergedChunk.toConfigBSON()); @@ -188,6 +196,7 @@ BSONObj makeCommitChunkTransactionCommand(const NamespaceString& nss, n.append(ChunkType::min(), migratedChunk.getMin()); n.append(ChunkType::max(), migratedChunk.getMax()); n.append(ChunkType::shard(), toShard); + migratedChunk.addHistoryToBSON(n); n.done(); BSONObjBuilder q(op.subobjStart("o2")); @@ -275,6 +284,32 @@ Status ShardingCatalogManager::commitChunkSplit(OperationContext* opCtx, return {ErrorCodes::StaleEpoch, errmsg}; } + // Find the chunk history. + auto findHistory = Grid::get(opCtx)->shardRegistry()->getConfigShard()->exhaustiveFindOnConfig( + opCtx, + ReadPreferenceSetting{ReadPreference::PrimaryOnly}, + repl::ReadConcernLevel::kLocalReadConcern, + ChunkType::ConfigNS, + BSON(ChunkType::name << ChunkType::genID(nss, range.getMin())), + BSONObj(), + 1); + if (!findHistory.isOK()) { + return findHistory.getStatus(); + } + + const auto origChunks = std::move(findHistory.getValue().docs); + if (origChunks.size() != 1) { + return {ErrorCodes::IncompatibleShardingMetadata, + str::stream() << "Tried to find the chunk history for '" + << ChunkType::genID(nss, range.getMin()) + << ", but found no chunks"}; + } + + const auto origChunk = ChunkType::fromConfigBSON(origChunks.front()); + if (!origChunk.isOK()) { + return origChunk.getStatus(); + } + std::vector<ChunkType> newChunks; ChunkVersion currentMaxVersion = collVersion; @@ -336,6 +371,7 @@ Status ShardingCatalogManager::commitChunkSplit(OperationContext* opCtx, n.append(ChunkType::min(), startKey); n.append(ChunkType::max(), endKey); n.append(ChunkType::shard(), shardName); + origChunk.getValue().addHistoryToBSON(n); n.done(); // add the chunk's _id as the query part of the update statement @@ -431,7 +467,8 @@ Status ShardingCatalogManager::commitChunkMerge(OperationContext* opCtx, const NamespaceString& nss, const OID& requestEpoch, const std::vector<BSONObj>& chunkBoundaries, - const std::string& shardName) { + const std::string& shardName, + const boost::optional<Timestamp>& validAfter) { // This method must never be called with empty chunks to merge invariant(!chunkBoundaries.empty()); @@ -500,7 +537,7 @@ Status ShardingCatalogManager::commitChunkMerge(OperationContext* opCtx, ChunkVersion mergeVersion = collVersion; mergeVersion.incMinor(); - auto updates = buildMergeChunksTransactionUpdates(chunksToMerge, mergeVersion); + auto updates = buildMergeChunksTransactionUpdates(chunksToMerge, mergeVersion, validAfter); auto preCond = buildMergeChunksTransactionPrecond(chunksToMerge, collVersion); // apply the batch of updates to local metadata @@ -542,7 +579,8 @@ StatusWith<BSONObj> ShardingCatalogManager::commitChunkMigration( const boost::optional<ChunkType>& controlChunk, const OID& collectionEpoch, const ShardId& fromShard, - const ShardId& toShard) { + const ShardId& toShard, + const boost::optional<Timestamp>& validAfter) { auto const configShard = Grid::get(opCtx)->shardRegistry()->getConfigShard(); @@ -624,12 +662,65 @@ StatusWith<BSONObj> ShardingCatalogManager::commitChunkMigration( } } + // Find the chunk history. + auto findHistory = configShard->exhaustiveFindOnConfig( + opCtx, + ReadPreferenceSetting{ReadPreference::PrimaryOnly}, + repl::ReadConcernLevel::kLocalReadConcern, + ChunkType::ConfigNS, + BSON(ChunkType::name << ChunkType::genID(nss, migratedChunk.getMin())), + BSONObj(), + 1); + if (!findHistory.isOK()) { + return findHistory.getStatus(); + } + + const auto origChunks = std::move(findHistory.getValue().docs); + if (origChunks.size() != 1) { + return {ErrorCodes::IncompatibleShardingMetadata, + str::stream() << "Tried to find the chunk history for '" + << ChunkType::genID(nss, migratedChunk.getMin()) + << ", but found no chunks"}; + } + + const auto origChunk = ChunkType::fromConfigBSON(origChunks.front()); + if (!origChunk.isOK()) { + return origChunk.getStatus(); + } + // Generate the new versions of migratedChunk and controlChunk. Migrating chunk's minor version // will be 0. ChunkType newMigratedChunk = migratedChunk; newMigratedChunk.setVersion(ChunkVersion( currentCollectionVersion.majorVersion() + 1, 0, currentCollectionVersion.epoch())); + // Copy the complete history. + auto newHistory = origChunk.getValue().getHistory(); + const int kHistorySecs = 10; + + // Update the history of the migrated chunk. + if (validAfter) { + // Drop the history that is too old (10 seconds of history for now). + // TODO SERVER-33831 to update the old history removal policy. + while (!newHistory.empty() && + newHistory.back().getValidAfter().getSecs() + kHistorySecs < + validAfter.get().getSecs()) { + newHistory.pop_back(); + } + + if (!newHistory.empty() && newHistory.front().getValidAfter() >= validAfter.get()) { + return {ErrorCodes::IncompatibleShardingMetadata, + str::stream() << "The chunk history for '" + << ChunkType::genID(nss, migratedChunk.getMin()) + << " is corrupted. The last validAfter " + << newHistory.back().getValidAfter().toString() + << " is greater or equal to the new validAfter " + << validAfter.get().toString()}; + } + newHistory.emplace(newHistory.begin(), ChunkHistory(validAfter.get(), toShard)); + } + newMigratedChunk.setHistory(std::move(newHistory)); + // Control chunk's minor version will be 1 (if control chunk is present). boost::optional<ChunkType> newControlChunk = boost::none; if (controlChunk) { diff --git a/src/mongo/db/s/config/sharding_catalog_manager_collection_operations.cpp b/src/mongo/db/s/config/sharding_catalog_manager_collection_operations.cpp index e990e35f7dd..802faba9f97 100644 --- a/src/mongo/db/s/config/sharding_catalog_manager_collection_operations.cpp +++ b/src/mongo/db/s/config/sharding_catalog_manager_collection_operations.cpp @@ -44,6 +44,7 @@ #include "mongo/db/catalog/collection_options.h" #include "mongo/db/client.h" #include "mongo/db/commands.h" +#include "mongo/db/logical_clock.h" #include "mongo/db/namespace_string.h" #include "mongo/db/operation_context.h" #include "mongo/db/query/collation/collator_factory_interface.h" @@ -274,6 +275,8 @@ ChunkVersion ShardingCatalogManager::_createFirstChunks(OperationContext* opCtx, log() << "going to create " << splitPoints.size() + 1 << " chunk(s) for: " << nss << " using new epoch " << version.epoch(); + const auto validAfter = LogicalClock::get(opCtx)->getClusterTime().asTimestamp(); + for (unsigned i = 0; i <= splitPoints.size(); i++) { const BSONObj min = (i == 0) ? keyPattern.globalMin() : splitPoints[i - 1]; const BSONObj max = (i < splitPoints.size()) ? splitPoints[i] : keyPattern.globalMax(); @@ -290,6 +293,10 @@ ChunkVersion ShardingCatalogManager::_createFirstChunks(OperationContext* opCtx, chunk.setMax(max); chunk.setShard(shardIds[i % shardIds.size()]); chunk.setVersion(version); + // TODO SERVER-33781 write history only when FCV4.0 config. + std::vector<ChunkHistory> initialHistory; + initialHistory.emplace_back(ChunkHistory(validAfter, shardIds[i % shardIds.size()])); + chunk.setHistory(std::move(initialHistory)); uassertStatusOK(Grid::get(opCtx)->catalogClient()->insertConfigDocument( opCtx, diff --git a/src/mongo/db/s/config/sharding_catalog_manager_commit_chunk_migration_test.cpp b/src/mongo/db/s/config/sharding_catalog_manager_commit_chunk_migration_test.cpp index 76340441d2c..dd1357e8b4a 100644 --- a/src/mongo/db/s/config/sharding_catalog_manager_commit_chunk_migration_test.cpp +++ b/src/mongo/db/s/config/sharding_catalog_manager_commit_chunk_migration_test.cpp @@ -64,6 +64,7 @@ TEST_F(CommitChunkMigrate, CheckCorrectOpsCommandWithCtl) { chunk0.setNS(kNamespace); chunk0.setVersion(origVersion); chunk0.setShard(shard0.getName()); + chunk0.setHistory({ChunkHistory(Timestamp(100, 0), ShardId("shardX"))}); // apportion auto chunkMin = BSON("a" << 1); @@ -86,6 +87,8 @@ TEST_F(CommitChunkMigrate, CheckCorrectOpsCommandWithCtl) { ChunkType const& chunk0cref = chunk0; ChunkType const& chunk1cref = chunk1; + Timestamp validAfter{101, 0}; + StatusWith<BSONObj> resultBSON = ShardingCatalogManager::get(operationContext()) ->commitChunkMigration(operationContext(), chunk0.getNS(), @@ -93,7 +96,8 @@ TEST_F(CommitChunkMigrate, CheckCorrectOpsCommandWithCtl) { chunk1cref, origVersion.epoch(), ShardId(shard0.getName()), - ShardId(shard1.getName())); + ShardId(shard1.getName()), + validAfter); ASSERT_OK(resultBSON.getStatus()); @@ -111,6 +115,9 @@ TEST_F(CommitChunkMigrate, CheckCorrectOpsCommandWithCtl) { auto chunkDoc0 = uassertStatusOK(getChunkDoc(operationContext(), chunkMin)); ASSERT_EQ("shard1", chunkDoc0.getShard().toString()); ASSERT_EQ(mver.getValue(), chunkDoc0.getVersion()); + // The history should be updated. + ASSERT_EQ(2UL, chunkDoc0.getHistory().size()); + ASSERT_EQ(validAfter, chunkDoc0.getHistory().front().getValidAfter()); auto chunkDoc1 = uassertStatusOK(getChunkDoc(operationContext(), chunkMax)); ASSERT_EQ("shard0", chunkDoc1.getShard().toString()); @@ -136,6 +143,7 @@ TEST_F(CommitChunkMigrate, CheckCorrectOpsCommandNoCtl) { chunk0.setNS(kNamespace); chunk0.setVersion(origVersion); chunk0.setShard(shard0.getName()); + chunk0.setHistory({ChunkHistory(Timestamp(100, 0), ShardId("shardX"))}); // apportion auto chunkMin = BSON("a" << 1); @@ -145,6 +153,8 @@ TEST_F(CommitChunkMigrate, CheckCorrectOpsCommandNoCtl) { setupChunks({chunk0}).transitional_ignore(); + Timestamp validAfter{101, 0}; + StatusWith<BSONObj> resultBSON = ShardingCatalogManager::get(operationContext()) ->commitChunkMigration(operationContext(), chunk0.getNS(), @@ -152,7 +162,8 @@ TEST_F(CommitChunkMigrate, CheckCorrectOpsCommandNoCtl) { boost::none, origVersion.epoch(), ShardId(shard0.getName()), - ShardId(shard1.getName())); + ShardId(shard1.getName()), + validAfter); ASSERT_OK(resultBSON.getStatus()); @@ -169,6 +180,116 @@ TEST_F(CommitChunkMigrate, CheckCorrectOpsCommandNoCtl) { auto chunkDoc0 = uassertStatusOK(getChunkDoc(operationContext(), chunkMin)); ASSERT_EQ("shard1", chunkDoc0.getShard().toString()); ASSERT_EQ(mver.getValue(), chunkDoc0.getVersion()); + // The history should be updated. + ASSERT_EQ(2UL, chunkDoc0.getHistory().size()); + ASSERT_EQ(validAfter, chunkDoc0.getHistory().front().getValidAfter()); +} + +TEST_F(CommitChunkMigrate, CheckCorrectOpsCommandNoCtlTrimHistory) { + + ShardType shard0; + shard0.setName("shard0"); + shard0.setHost("shard0:12"); + + ShardType shard1; + shard1.setName("shard1"); + shard1.setHost("shard1:12"); + + setupShards({shard0, shard1}).transitional_ignore(); + + int origMajorVersion = 15; + auto const origVersion = ChunkVersion(origMajorVersion, 4, OID::gen()); + + ChunkType chunk0; + chunk0.setNS(kNamespace); + chunk0.setVersion(origVersion); + chunk0.setShard(shard0.getName()); + chunk0.setHistory({ChunkHistory(Timestamp(100, 0), ShardId("shardX"))}); + + // apportion + auto chunkMin = BSON("a" << 1); + chunk0.setMin(chunkMin); + auto chunkMax = BSON("a" << 10); + chunk0.setMax(chunkMax); + + setupChunks({chunk0}).transitional_ignore(); + + // Make the time distance between the last history element large enough. + Timestamp validAfter{200, 0}; + + StatusWith<BSONObj> resultBSON = ShardingCatalogManager::get(operationContext()) + ->commitChunkMigration(operationContext(), + chunk0.getNS(), + chunk0, + boost::none, + origVersion.epoch(), + ShardId(shard0.getName()), + ShardId(shard1.getName()), + validAfter); + + ASSERT_OK(resultBSON.getStatus()); + + // Verify the version returned matches expected value. + BSONObj versions = resultBSON.getValue(); + auto mver = ChunkVersion::parseFromBSONWithFieldForCommands(versions, "migratedChunkVersion"); + ASSERT_OK(mver.getStatus()); + ASSERT_EQ(ChunkVersion(origMajorVersion + 1, 0, origVersion.epoch()), mver.getValue()); + + auto cver = ChunkVersion::parseFromBSONWithFieldForCommands(versions, "controlChunkVersion"); + ASSERT_NOT_OK(cver.getStatus()); + + // Verify the chunk ended up in the right shard, and version matches the value returned. + auto chunkDoc0 = uassertStatusOK(getChunkDoc(operationContext(), chunkMin)); + ASSERT_EQ("shard1", chunkDoc0.getShard().toString()); + ASSERT_EQ(mver.getValue(), chunkDoc0.getVersion()); + // The history should be updated. + ASSERT_EQ(1UL, chunkDoc0.getHistory().size()); + ASSERT_EQ(validAfter, chunkDoc0.getHistory().front().getValidAfter()); +} + +TEST_F(CommitChunkMigrate, RejectOutOfOrderHistory) { + + ShardType shard0; + shard0.setName("shard0"); + shard0.setHost("shard0:12"); + + ShardType shard1; + shard1.setName("shard1"); + shard1.setHost("shard1:12"); + + setupShards({shard0, shard1}).transitional_ignore(); + + int origMajorVersion = 15; + auto const origVersion = ChunkVersion(origMajorVersion, 4, OID::gen()); + + ChunkType chunk0; + chunk0.setNS(kNamespace); + chunk0.setVersion(origVersion); + chunk0.setShard(shard0.getName()); + chunk0.setHistory({ChunkHistory(Timestamp(100, 0), ShardId("shardX"))}); + + // apportion + auto chunkMin = BSON("a" << 1); + chunk0.setMin(chunkMin); + auto chunkMax = BSON("a" << 10); + chunk0.setMax(chunkMax); + + setupChunks({chunk0}).transitional_ignore(); + + // Make the time before the last change to trigger the failure. + Timestamp validAfter{99, 0}; + + StatusWith<BSONObj> resultBSON = ShardingCatalogManager::get(operationContext()) + ->commitChunkMigration(operationContext(), + chunk0.getNS(), + chunk0, + boost::none, + origVersion.epoch(), + ShardId(shard0.getName()), + ShardId(shard1.getName()), + validAfter); + + ASSERT_EQ(ErrorCodes::IncompatibleShardingMetadata, resultBSON.getStatus()); } TEST_F(CommitChunkMigrate, RejectWrongCollectionEpoch0) { @@ -208,6 +329,8 @@ TEST_F(CommitChunkMigrate, RejectWrongCollectionEpoch0) { setupChunks({chunk0, chunk1}).transitional_ignore(); + Timestamp validAfter{1}; + StatusWith<BSONObj> resultBSON = ShardingCatalogManager::get(operationContext()) ->commitChunkMigration(operationContext(), chunk0.getNS(), @@ -215,7 +338,8 @@ TEST_F(CommitChunkMigrate, RejectWrongCollectionEpoch0) { chunk1, OID::gen(), ShardId(shard0.getName()), - ShardId(shard1.getName())); + ShardId(shard1.getName()), + validAfter); ASSERT_EQ(ErrorCodes::StaleEpoch, resultBSON.getStatus()); } @@ -259,6 +383,8 @@ TEST_F(CommitChunkMigrate, RejectWrongCollectionEpoch1) { // get version from the control chunk this time setupChunks({chunk1, chunk0}).transitional_ignore(); + Timestamp validAfter{1}; + StatusWith<BSONObj> resultBSON = ShardingCatalogManager::get(operationContext()) ->commitChunkMigration(operationContext(), chunk0.getNS(), @@ -266,7 +392,8 @@ TEST_F(CommitChunkMigrate, RejectWrongCollectionEpoch1) { chunk1, origVersion.epoch(), ShardId(shard0.getName()), - ShardId(shard1.getName())); + ShardId(shard1.getName()), + validAfter); ASSERT_EQ(ErrorCodes::StaleEpoch, resultBSON.getStatus()); } @@ -308,6 +435,8 @@ TEST_F(CommitChunkMigrate, RejectChunkMissing0) { setupChunks({chunk1}).transitional_ignore(); + Timestamp validAfter{1}; + StatusWith<BSONObj> resultBSON = ShardingCatalogManager::get(operationContext()) ->commitChunkMigration(operationContext(), chunk0.getNS(), @@ -315,7 +444,8 @@ TEST_F(CommitChunkMigrate, RejectChunkMissing0) { chunk1, origVersion.epoch(), ShardId(shard0.getName()), - ShardId(shard1.getName())); + ShardId(shard1.getName()), + validAfter); ASSERT_EQ(40165, resultBSON.getStatus().code()); } @@ -357,6 +487,8 @@ TEST_F(CommitChunkMigrate, RejectChunkMissing1) { setupChunks({chunk0}).transitional_ignore(); + Timestamp validAfter{1}; + StatusWith<BSONObj> resultBSON = ShardingCatalogManager::get(operationContext()) ->commitChunkMigration(operationContext(), chunk0.getNS(), @@ -364,7 +496,8 @@ TEST_F(CommitChunkMigrate, RejectChunkMissing1) { chunk1, origVersion.epoch(), ShardId(shard0.getName()), - ShardId(shard1.getName())); + ShardId(shard1.getName()), + validAfter); ASSERT_EQ(40165, resultBSON.getStatus().code()); } diff --git a/src/mongo/db/s/config/sharding_catalog_manager_merge_chunks_test.cpp b/src/mongo/db/s/config/sharding_catalog_manager_merge_chunks_test.cpp index 3d115ec0a80..fee827a420f 100644 --- a/src/mongo/db/s/config/sharding_catalog_manager_merge_chunks_test.cpp +++ b/src/mongo/db/s/config/sharding_catalog_manager_merge_chunks_test.cpp @@ -67,12 +67,15 @@ TEST_F(MergeChunkTest, MergeExistingChunksCorrectlyShouldSucceed) { setupChunks({chunk, chunk2}).transitional_ignore(); + Timestamp validAfter{100, 0}; + ASSERT_OK(ShardingCatalogManager::get(operationContext()) ->commitChunkMerge(operationContext(), NamespaceString("TestDB.TestColl"), origVersion.epoch(), chunkBoundaries, - "shard0000")); + "shard0000", + validAfter)); auto findResponse = uassertStatusOK( getConfigShard()->exhaustiveFindOnConfig(operationContext(), @@ -98,6 +101,10 @@ TEST_F(MergeChunkTest, MergeExistingChunksCorrectlyShouldSucceed) { ASSERT_EQ(origVersion.majorVersion(), mergedChunk.getVersion().majorVersion()); ASSERT_EQ(origVersion.minorVersion() + 1, mergedChunk.getVersion().minorVersion()); } + + // Make sure history is there + ASSERT_EQ(1UL, mergedChunk.getHistory().size()); + ASSERT_EQ(validAfter, mergedChunk.getHistory().front().getValidAfter()); } TEST_F(MergeChunkTest, MergeSeveralChunksCorrectlyShouldSucceed) { @@ -131,12 +138,15 @@ TEST_F(MergeChunkTest, MergeSeveralChunksCorrectlyShouldSucceed) { setupChunks({chunk, chunk2, chunk3}).transitional_ignore(); + Timestamp validAfter{100, 0}; + ASSERT_OK(ShardingCatalogManager::get(operationContext()) ->commitChunkMerge(operationContext(), NamespaceString("TestDB.TestColl"), origVersion.epoch(), chunkBoundaries, - "shard0000")); + "shard0000", + validAfter)); auto findResponse = uassertStatusOK( getConfigShard()->exhaustiveFindOnConfig(operationContext(), @@ -162,6 +172,10 @@ TEST_F(MergeChunkTest, MergeSeveralChunksCorrectlyShouldSucceed) { ASSERT_EQ(origVersion.majorVersion(), mergedChunk.getVersion().majorVersion()); ASSERT_EQ(origVersion.minorVersion() + 1, mergedChunk.getVersion().minorVersion()); } + + // Make sure history is there + ASSERT_EQ(1UL, mergedChunk.getHistory().size()); + ASSERT_EQ(validAfter, mergedChunk.getHistory().front().getValidAfter()); } TEST_F(MergeChunkTest, NewMergeShouldClaimHighestVersion) { @@ -199,12 +213,15 @@ TEST_F(MergeChunkTest, NewMergeShouldClaimHighestVersion) { setupChunks({chunk, chunk2, otherChunk}).transitional_ignore(); + Timestamp validAfter{100, 0}; + ASSERT_OK(ShardingCatalogManager::get(operationContext()) ->commitChunkMerge(operationContext(), NamespaceString("TestDB.TestColl"), collEpoch, chunkBoundaries, - "shard0000")); + "shard0000", + validAfter)); auto findResponse = uassertStatusOK( getConfigShard()->exhaustiveFindOnConfig(operationContext(), @@ -230,6 +247,10 @@ TEST_F(MergeChunkTest, NewMergeShouldClaimHighestVersion) { ASSERT_EQ(competingVersion.majorVersion(), mergedChunk.getVersion().majorVersion()); ASSERT_EQ(competingVersion.minorVersion() + 1, mergedChunk.getVersion().minorVersion()); } + + // Make sure history is there + ASSERT_EQ(1UL, mergedChunk.getHistory().size()); + ASSERT_EQ(validAfter, mergedChunk.getHistory().front().getValidAfter()); } TEST_F(MergeChunkTest, MergeLeavesOtherChunksAlone) { @@ -263,12 +284,15 @@ TEST_F(MergeChunkTest, MergeLeavesOtherChunksAlone) { setupChunks({chunk, chunk2, otherChunk}).transitional_ignore(); + Timestamp validAfter{1}; + ASSERT_OK(ShardingCatalogManager::get(operationContext()) ->commitChunkMerge(operationContext(), NamespaceString("TestDB.TestColl"), origVersion.epoch(), chunkBoundaries, - "shard0000")); + "shard0000", + validAfter)); auto findResponse = uassertStatusOK( getConfigShard()->exhaustiveFindOnConfig(operationContext(), @@ -326,12 +350,15 @@ TEST_F(MergeChunkTest, NonExistingNamespace) { setupChunks({chunk, chunk2}).transitional_ignore(); + Timestamp validAfter{1}; + auto mergeStatus = ShardingCatalogManager::get(operationContext()) ->commitChunkMerge(operationContext(), NamespaceString("TestDB.NonExistingColl"), origVersion.epoch(), chunkBoundaries, - "shard0000"); + "shard0000", + validAfter); ASSERT_EQ(ErrorCodes::IllegalOperation, mergeStatus); } @@ -360,12 +387,15 @@ TEST_F(MergeChunkTest, NonMatchingEpochsOfChunkAndRequestErrors) { setupChunks({chunk, chunk2}).transitional_ignore(); + Timestamp validAfter{1}; + auto mergeStatus = ShardingCatalogManager::get(operationContext()) ->commitChunkMerge(operationContext(), NamespaceString("TestDB.TestColl"), OID::gen(), chunkBoundaries, - "shard0000"); + "shard0000", + validAfter); ASSERT_EQ(ErrorCodes::StaleEpoch, mergeStatus); } @@ -400,13 +430,16 @@ TEST_F(MergeChunkTest, MergeAlreadyHappenedFailsPrecondition) { setupChunks({mergedChunk}).transitional_ignore(); + Timestamp validAfter{1}; + ASSERT_EQ(ErrorCodes::BadValue, ShardingCatalogManager::get(operationContext()) ->commitChunkMerge(operationContext(), NamespaceString("TestDB.TestColl"), origVersion.epoch(), chunkBoundaries, - "shard0000")); + "shard0000", + validAfter)); // Verify that no change to config.chunks happened. auto findResponse = uassertStatusOK( @@ -461,13 +494,16 @@ TEST_F(MergeChunkTest, ChunkBoundariesOutOfOrderFails) { setupChunks(originalChunks).transitional_ignore(); } + Timestamp validAfter{1}; + ASSERT_EQ(ErrorCodes::InvalidOptions, ShardingCatalogManager::get(operationContext()) ->commitChunkMerge(operationContext(), NamespaceString("TestDB.TestColl"), epoch, chunkBoundaries, - "shard0000")); + "shard0000", + validAfter)); } } // namespace diff --git a/src/mongo/db/s/config/sharding_catalog_manager_split_chunk_test.cpp b/src/mongo/db/s/config/sharding_catalog_manager_split_chunk_test.cpp index 31d3a794d41..932bc374745 100644 --- a/src/mongo/db/s/config/sharding_catalog_manager_split_chunk_test.cpp +++ b/src/mongo/db/s/config/sharding_catalog_manager_split_chunk_test.cpp @@ -53,6 +53,8 @@ TEST_F(SplitChunkTest, SplitExistingChunkCorrectlyShouldSucceed) { auto chunkMax = BSON("a" << 10); chunk.setMin(chunkMin); chunk.setMax(chunkMax); + chunk.setHistory({ChunkHistory(Timestamp(100, 0), ShardId("shardX")), + ChunkHistory(Timestamp(90, 0), ShardId("shardY"))}); auto chunkSplitPoint = BSON("a" << 5); std::vector<BSONObj> splitPoints{chunkSplitPoint}; @@ -78,6 +80,9 @@ TEST_F(SplitChunkTest, SplitExistingChunkCorrectlyShouldSucceed) { ASSERT_EQ(origVersion.majorVersion(), chunkDoc.getVersion().majorVersion()); ASSERT_EQ(origVersion.minorVersion() + 1, chunkDoc.getVersion().minorVersion()); + // Make sure the history is there + ASSERT_EQ(2UL, chunkDoc.getHistory().size()); + // Second chunkDoc should have range [chunkSplitPoint, chunkMax] auto otherChunkDocStatus = getChunkDoc(operationContext(), chunkSplitPoint); ASSERT_OK(otherChunkDocStatus.getStatus()); @@ -88,6 +93,12 @@ TEST_F(SplitChunkTest, SplitExistingChunkCorrectlyShouldSucceed) { // Check for increment on second chunkDoc's minor version ASSERT_EQ(origVersion.majorVersion(), otherChunkDoc.getVersion().majorVersion()); ASSERT_EQ(origVersion.minorVersion() + 2, otherChunkDoc.getVersion().minorVersion()); + + // Make sure the history is there + ASSERT_EQ(2UL, otherChunkDoc.getHistory().size()); + + // Both chunks should have the same history + ASSERT(chunkDoc.getHistory() == otherChunkDoc.getHistory()); } TEST_F(SplitChunkTest, MultipleSplitsOnExistingChunkShouldSucceed) { @@ -102,6 +113,8 @@ TEST_F(SplitChunkTest, MultipleSplitsOnExistingChunkShouldSucceed) { auto chunkMax = BSON("a" << 10); chunk.setMin(chunkMin); chunk.setMax(chunkMax); + chunk.setHistory({ChunkHistory(Timestamp(100, 0), ShardId("shardX")), + ChunkHistory(Timestamp(90, 0), ShardId("shardY"))}); auto chunkSplitPoint = BSON("a" << 5); auto chunkSplitPoint2 = BSON("a" << 7); @@ -128,6 +141,9 @@ TEST_F(SplitChunkTest, MultipleSplitsOnExistingChunkShouldSucceed) { ASSERT_EQ(origVersion.majorVersion(), chunkDoc.getVersion().majorVersion()); ASSERT_EQ(origVersion.minorVersion() + 1, chunkDoc.getVersion().minorVersion()); + // Make sure the history is there + ASSERT_EQ(2UL, chunkDoc.getHistory().size()); + // Second chunkDoc should have range [chunkSplitPoint, chunkSplitPoint2] auto midChunkDocStatus = getChunkDoc(operationContext(), chunkSplitPoint); ASSERT_OK(midChunkDocStatus.getStatus()); @@ -139,6 +155,9 @@ TEST_F(SplitChunkTest, MultipleSplitsOnExistingChunkShouldSucceed) { ASSERT_EQ(origVersion.majorVersion(), midChunkDoc.getVersion().majorVersion()); ASSERT_EQ(origVersion.minorVersion() + 2, midChunkDoc.getVersion().minorVersion()); + // Make sure the history is there + ASSERT_EQ(2UL, midChunkDoc.getHistory().size()); + // Third chunkDoc should have range [chunkSplitPoint2, chunkMax] auto lastChunkDocStatus = getChunkDoc(operationContext(), chunkSplitPoint2); ASSERT_OK(lastChunkDocStatus.getStatus()); @@ -149,6 +168,13 @@ TEST_F(SplitChunkTest, MultipleSplitsOnExistingChunkShouldSucceed) { // Check for increment on third chunkDoc's minor version ASSERT_EQ(origVersion.majorVersion(), lastChunkDoc.getVersion().majorVersion()); ASSERT_EQ(origVersion.minorVersion() + 3, lastChunkDoc.getVersion().minorVersion()); + + // Make sure the history is there + ASSERT_EQ(2UL, lastChunkDoc.getHistory().size()); + + // Both chunks should have the same history + ASSERT(chunkDoc.getHistory() == midChunkDoc.getHistory()); + ASSERT(midChunkDoc.getHistory() == lastChunkDoc.getHistory()); } TEST_F(SplitChunkTest, NewSplitShouldClaimHighestVersion) { diff --git a/src/mongo/db/s/merge_chunks_command.cpp b/src/mongo/db/s/merge_chunks_command.cpp index 0458815e990..7d523de8854 100644 --- a/src/mongo/db/s/merge_chunks_command.cpp +++ b/src/mongo/db/s/merge_chunks_command.cpp @@ -36,6 +36,7 @@ #include "mongo/db/catalog_raii.h" #include "mongo/db/commands.h" #include "mongo/db/field_parser.h" +#include "mongo/db/logical_clock.h" #include "mongo/db/namespace_string.h" #include "mongo/db/s/collection_sharding_state.h" #include "mongo/db/s/shard_filtering_metadata_refresh.h" @@ -240,8 +241,11 @@ Status mergeChunks(OperationContext* opCtx, // // Run _configsvrCommitChunkMerge. // - MergeChunkRequest request{ - nss, shardingState->getShardName(), shardVersion.epoch(), chunkBoundaries}; + MergeChunkRequest request{nss, + shardingState->getShardName(), + shardVersion.epoch(), + chunkBoundaries, + LogicalClock::get(opCtx)->getClusterTime().asTimestamp()}; auto configCmdObj = request.toConfigCommandBSON(ShardingCatalogClient::kMajorityWriteConcern.toBSON()); diff --git a/src/mongo/db/s/migration_source_manager.cpp b/src/mongo/db/s/migration_source_manager.cpp index 8f113193f1e..bb0dc85c807 100644 --- a/src/mongo/db/s/migration_source_manager.cpp +++ b/src/mongo/db/s/migration_source_manager.cpp @@ -35,6 +35,7 @@ #include "mongo/bson/bsonobjbuilder.h" #include "mongo/db/catalog_raii.h" #include "mongo/db/concurrency/write_conflict_exception.h" +#include "mongo/db/logical_clock.h" #include "mongo/db/operation_context.h" #include "mongo/db/repl/replication_coordinator.h" #include "mongo/db/s/migration_chunk_cloner_source_legacy.h" @@ -408,13 +409,15 @@ Status MigrationSourceManager::commitChunkMetadataOnConfig(OperationContext* opC migratedChunkType.setMin(_args.getMinKey()); migratedChunkType.setMax(_args.getMaxKey()); - CommitChunkMigrationRequest::appendAsCommand(&builder, - getNss(), - _args.getFromShardId(), - _args.getToShardId(), - migratedChunkType, - controlChunkType, - metadata->getCollVersion()); + CommitChunkMigrationRequest::appendAsCommand( + &builder, + getNss(), + _args.getFromShardId(), + _args.getToShardId(), + migratedChunkType, + controlChunkType, + metadata->getCollVersion(), + LogicalClock::get(opCtx)->getClusterTime().asTimestamp()); builder.append(kWriteConcernField, kMajorityWriteConcern.toBSON()); } diff --git a/src/mongo/s/SConscript b/src/mongo/s/SConscript index 6b668e0c4ca..59aeadf2017 100644 --- a/src/mongo/s/SConscript +++ b/src/mongo/s/SConscript @@ -130,6 +130,7 @@ env.Library( 'request_types/update_zone_key_range_request_type.cpp', 'shard_id.cpp', 'versioning.cpp', + env.Idlc('catalog/type_chunk_base.idl')[0], env.Idlc('request_types/create_collection.idl')[0], env.Idlc('request_types/create_database.idl')[0], env.Idlc('request_types/flush_routing_table_cache_updates.idl')[0], diff --git a/src/mongo/s/catalog/type_chunk.cpp b/src/mongo/s/catalog/type_chunk.cpp index 58a181240a9..5aa3132a78e 100644 --- a/src/mongo/s/catalog/type_chunk.cpp +++ b/src/mongo/s/catalog/type_chunk.cpp @@ -54,6 +54,7 @@ const BSONField<std::string> ChunkType::shard("shard"); const BSONField<bool> ChunkType::jumbo("jumbo"); const BSONField<Date_t> ChunkType::lastmod("lastmod"); const BSONField<OID> ChunkType::epoch("lastmodEpoch"); +const BSONField<BSONObj> ChunkType::history("history"); namespace { @@ -157,6 +158,23 @@ ChunkRange ChunkRange::unionWith(ChunkRange const& other) const { le(_maxKey, other._maxKey) ? other._maxKey : _maxKey); } +StatusWith<std::vector<ChunkHistory>> ChunkHistory::fromBSON(const BSONArray& source) { + std::vector<ChunkHistory> values; + + for (const auto& arrayElement : source) { + if (arrayElement.type() == Object) { + IDLParserErrorContext tempContext("chunk history array"); + values.emplace_back(ChunkHistoryBase::parse(tempContext, arrayElement.Obj())); + } else { + return {ErrorCodes::BadValue, + str::stream() << "array element does not have the object type: " + << arrayElement.type()}; + } + } + + return values; +} + // ChunkType ChunkType::ChunkType() = default; @@ -217,6 +235,22 @@ StatusWith<ChunkType> ChunkType::fromConfigBSON(const BSONObj& source) { chunk._version = std::move(versionStatus.getValue()); } + { + BSONElement historyObj; + Status status = bsonExtractTypedField(source, history.name(), Array, &historyObj); + if (status.isOK()) { + auto history = std::move(ChunkHistory::fromBSON(BSONArray(historyObj.Obj()))); + if (!history.isOK()) + return history.getStatus(); + + chunk._history = std::move(history.getValue()); + } else if (status == ErrorCodes::NoSuchKey) { + // History is missing, so it will be presumed empty + } else { + return status; + } + } + return chunk; } @@ -236,7 +270,7 @@ BSONObj ChunkType::toConfigBSON() const { _version->appendForChunk(&builder); if (_jumbo) builder.append(jumbo.name(), getJumbo()); - + addHistoryToBSON(builder); return builder.obj(); } @@ -283,6 +317,22 @@ StatusWith<ChunkType> ChunkType::fromShardBSON(const BSONObj& source, const OID& chunk._version = std::move(statusWithChunkVersion.getValue()); } + { + BSONElement historyObj; + Status status = bsonExtractTypedField(source, history.name(), Array, &historyObj); + if (status.isOK()) { + auto history = std::move(ChunkHistory::fromBSON(BSONArray(historyObj.Obj()))); + if (!history.isOK()) + return history.getStatus(); + + chunk._history = std::move(history.getValue()); + } else if (status == ErrorCodes::NoSuchKey) { + // History is missing, so it will be presumed empty + } else { + return status; + } + } + return chunk; } @@ -296,6 +346,7 @@ BSONObj ChunkType::toShardBSON() const { builder.append(max.name(), getMax()); builder.append(shard.name(), getShard().toString()); builder.appendTimestamp(lastmod.name(), _version->toLong()); + addHistoryToBSON(builder); return builder.obj(); } @@ -334,6 +385,16 @@ void ChunkType::setJumbo(bool jumbo) { _jumbo = jumbo; } +void ChunkType::addHistoryToBSON(BSONObjBuilder& builder) const { + if (_history.size()) { + BSONArrayBuilder arrayBuilder(builder.subarrayStart(history.name())); + for (const auto& item : _history) { + BSONObjBuilder subObjBuilder(arrayBuilder.subobjStart()); + item.serialize(&subObjBuilder); + } + } +} + std::string ChunkType::genID(const NamespaceString& nss, const BSONObj& o) { StringBuilder buf; buf << nss.ns() << "-"; diff --git a/src/mongo/s/catalog/type_chunk.h b/src/mongo/s/catalog/type_chunk.h index b4998af3c9a..139e2d1dfa9 100644 --- a/src/mongo/s/catalog/type_chunk.h +++ b/src/mongo/s/catalog/type_chunk.h @@ -33,6 +33,7 @@ #include "mongo/bson/bsonobj.h" #include "mongo/db/namespace_string.h" +#include "mongo/s/catalog/type_chunk_base_gen.h" #include "mongo/s/chunk_version.h" #include "mongo/s/shard_id.h" @@ -104,6 +105,22 @@ private: BSONObj _maxKey; }; +class ChunkHistory : public ChunkHistoryBase { +public: + ChunkHistory() : ChunkHistoryBase() {} + ChunkHistory(mongo::Timestamp ts, mongo::ShardId shard) : ChunkHistoryBase() { + setValidAfter(std::move(ts)); + setShard(std::move(shard)); + } + ChunkHistory(const ChunkHistoryBase& b) : ChunkHistoryBase(b) {} + + static StatusWith<std::vector<ChunkHistory>> fromBSON(const BSONArray& source); + + bool operator==(const ChunkHistory& other) const { + return getValidAfter() == other.getValidAfter() && getShard() == other.getShard(); + } +}; + /** * This class represents the layouts and contents of documents contained in the config server's * config.chunks and shard server's config.chunks.uuid collections. All manipulation of documents @@ -162,6 +179,7 @@ public: static const BSONField<bool> jumbo; static const BSONField<Date_t> lastmod; static const BSONField<OID> epoch; + static const BSONField<BSONObj> history; ChunkType(); ChunkType(NamespaceString nss, ChunkRange range, ChunkVersion version, ShardId shardId); @@ -236,6 +254,15 @@ public: } void setJumbo(bool jumbo); + void setHistory(std::vector<ChunkHistory>&& history) { + _history = std::move(history); + } + const std::vector<ChunkHistory>& getHistory() const { + return _history; + } + + void addHistoryToBSON(BSONObjBuilder& builder) const; + /** * Generates chunk id based on the namespace name and the lower bound of the chunk. */ @@ -267,6 +294,8 @@ private: boost::optional<ShardId> _shard; // (O)(C) too big to move? boost::optional<bool> _jumbo; + // history of the chunk + std::vector<ChunkHistory> _history; }; } // namespace mongo diff --git a/src/mongo/s/catalog/type_chunk_base.idl b/src/mongo/s/catalog/type_chunk_base.idl new file mode 100644 index 00000000000..c1741a14f3f --- /dev/null +++ b/src/mongo/s/catalog/type_chunk_base.idl @@ -0,0 +1,44 @@ +# Copyright (C) 2018 MongoDB Inc. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License, version 3, +# as published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +# + +# ChunkTypeBase type + +global: + cpp_namespace: "mongo" + cpp_includes: + - "mongo/s/shard_id.h" + +imports: + - "mongo/idl/basic_types.idl" + +types: + shardid: + bson_serialization_type: string + description: "A BSON UTF-8 string" + cpp_type: "ShardId" + deserializer: "mongo::BSONElement::str" + serializer: "mongo::ShardId::toString" + +structs: + ChunkHistoryBase: + description: "An element of a chunk history array. The array holds the history ordered from + the most recent (the index 0) to the oldest supported." + fields: + validAfter: + type: timestamp + description: The time after which this chunk is at this shard. + shard: + type: shardid + description: Shard this chunk lives in, starting at the "validAfter" time. diff --git a/src/mongo/s/chunk.cpp b/src/mongo/s/chunk.cpp index 527440594eb..21067acd7e0 100644 --- a/src/mongo/s/chunk.cpp +++ b/src/mongo/s/chunk.cpp @@ -61,6 +61,7 @@ Chunk::Chunk(const ChunkType& from) : _range(from.getMin(), from.getMax()), _shardId(from.getShard()), _lastmod(from.getVersion()), + _history(from.getHistory()), _jumbo(from.getJumbo()), _dataWritten(mkDataWritten()) { invariantOK(from.validate()); diff --git a/src/mongo/s/chunk.h b/src/mongo/s/chunk.h index 540238463b5..31205aefc25 100644 --- a/src/mongo/s/chunk.h +++ b/src/mongo/s/chunk.h @@ -98,6 +98,8 @@ private: const ChunkVersion _lastmod; + const std::vector<ChunkHistory> _history; + // Indicates whether this chunk should be treated as jumbo and not attempted to be moved or // split mutable bool _jumbo; diff --git a/src/mongo/s/request_types/commit_chunk_migration_request_test.cpp b/src/mongo/s/request_types/commit_chunk_migration_request_test.cpp index 9c5f513ac60..afc887745f0 100644 --- a/src/mongo/s/request_types/commit_chunk_migration_request_test.cpp +++ b/src/mongo/s/request_types/commit_chunk_migration_request_test.cpp @@ -65,13 +65,16 @@ TEST(CommitChunkMigrationRequest, WithControlChunk) { controlChunk.setMax(kKey3); boost::optional<ChunkType> controlChunkOpt = controlChunk; + Timestamp validAfter{1}; + CommitChunkMigrationRequest::appendAsCommand(&builder, kNamespaceString, kShardId0, kShardId1, migratedChunk, controlChunkOpt, - fromShardCollectionVersion); + fromShardCollectionVersion, + validAfter); BSONObj cmdObj = builder.obj(); @@ -98,13 +101,16 @@ TEST(CommitChunkMigrationRequest, WithoutControlChunk) { ChunkVersion fromShardCollectionVersion(1, 2, OID::gen()); + Timestamp validAfter{1}; + CommitChunkMigrationRequest::appendAsCommand(&builder, kNamespaceString, kShardId0, kShardId1, migratedChunk, boost::none, - fromShardCollectionVersion); + fromShardCollectionVersion, + validAfter); BSONObj cmdObj = builder.obj(); diff --git a/src/mongo/s/request_types/commit_chunk_migration_request_type.cpp b/src/mongo/s/request_types/commit_chunk_migration_request_type.cpp index 1f1125529cb..463ee0d4a92 100644 --- a/src/mongo/s/request_types/commit_chunk_migration_request_type.cpp +++ b/src/mongo/s/request_types/commit_chunk_migration_request_type.cpp @@ -41,6 +41,7 @@ const char kToShard[] = "toShard"; const char kMigratedChunk[] = "migratedChunk"; const char kControlChunk[] = "controlChunk"; const char kFromShardCollectionVersion[] = "fromShardCollectionVersion"; +const char kValidAfter[] = "validAfter"; /** * Attempts to parse a (range-only!) ChunkType from "field" in "source". @@ -132,6 +133,20 @@ StatusWith<CommitChunkMigrationRequest> CommitChunkMigrationRequest::createFromC request._collectionEpoch = statusWithChunkVersion.getValue().epoch(); } + { + Timestamp validAfter; + auto status = bsonExtractTimestampField(obj, kValidAfter, &validAfter); + if (!status.isOK() && status != ErrorCodes::NoSuchKey) { + return status; + } + + if (status.isOK()) { + request._validAfter = validAfter; + } else { + request._validAfter = boost::none; + } + } + return request; } @@ -141,7 +156,8 @@ void CommitChunkMigrationRequest::appendAsCommand(BSONObjBuilder* builder, const ShardId& toShard, const ChunkType& migratedChunk, const boost::optional<ChunkType>& controlChunk, - const ChunkVersion& fromShardCollectionVersion) { + const ChunkVersion& fromShardCollectionVersion, + const Timestamp& validAfter) { invariant(builder->asTempObj().isEmpty()); invariant(nss.isValid()); @@ -154,6 +170,7 @@ void CommitChunkMigrationRequest::appendAsCommand(BSONObjBuilder* builder, if (controlChunk) { builder->append(kControlChunk, controlChunk->toConfigBSON()); } + builder->append(kValidAfter, validAfter); } } // namespace mongo diff --git a/src/mongo/s/request_types/commit_chunk_migration_request_type.h b/src/mongo/s/request_types/commit_chunk_migration_request_type.h index 919db71870d..f403572e46e 100644 --- a/src/mongo/s/request_types/commit_chunk_migration_request_type.h +++ b/src/mongo/s/request_types/commit_chunk_migration_request_type.h @@ -60,7 +60,8 @@ struct CommitChunkMigrationRequest { const ShardId& toShard, const ChunkType& migratedChunkType, const boost::optional<ChunkType>& controlChunkType, - const ChunkVersion& fromShardChunkVersion); + const ChunkVersion& fromShardChunkVersion, + const Timestamp& validAfter); const NamespaceString& getNss() const { return _nss; @@ -80,6 +81,9 @@ struct CommitChunkMigrationRequest { const OID& getCollectionEpoch() { return _collectionEpoch; } + const boost::optional<Timestamp>& getValidAfter() { + return _validAfter; + } // The collection for which this request applies. NamespaceString _nss; @@ -97,6 +101,9 @@ struct CommitChunkMigrationRequest { boost::optional<ChunkType> _controlChunk; OID _collectionEpoch; + + // The time of the move + boost::optional<Timestamp> _validAfter; }; } // namespace mongo diff --git a/src/mongo/s/request_types/merge_chunk_request_test.cpp b/src/mongo/s/request_types/merge_chunk_request_test.cpp index de9ecd7990c..ce8b7366a9e 100644 --- a/src/mongo/s/request_types/merge_chunk_request_test.cpp +++ b/src/mongo/s/request_types/merge_chunk_request_test.cpp @@ -64,7 +64,9 @@ TEST(MergeChunkRequest, ConfigCommandtoBSON) { << "chunkBoundaries" << BSON_ARRAY(BSON("a" << 1) << BSON("a" << 5) << BSON("a" << 10)) << "shard" - << "shard0000"); + << "shard0000" + << "validAfter" + << Timestamp{100}); BSONObj writeConcernObj = BSON("writeConcern" << BSON("w" << "majority")); diff --git a/src/mongo/s/request_types/merge_chunk_request_type.cpp b/src/mongo/s/request_types/merge_chunk_request_type.cpp index 64200b85460..abace857a99 100644 --- a/src/mongo/s/request_types/merge_chunk_request_type.cpp +++ b/src/mongo/s/request_types/merge_chunk_request_type.cpp @@ -40,17 +40,19 @@ const char kConfigsvrMergeChunk[] = "_configsvrCommitChunkMerge"; const char kCollEpoch[] = "collEpoch"; const char kChunkBoundaries[] = "chunkBoundaries"; const char kShardName[] = "shard"; - +const char kValidAfter[] = "validAfter"; } // namespace MergeChunkRequest::MergeChunkRequest(NamespaceString nss, std::string shardName, OID epoch, - std::vector<BSONObj> chunkBoundaries) + std::vector<BSONObj> chunkBoundaries, + boost::optional<Timestamp> validAfter) : _nss(std::move(nss)), _epoch(std::move(epoch)), _chunkBoundaries(std::move(chunkBoundaries)), - _shardName(std::move(shardName)) {} + _shardName(std::move(shardName)), + _validAfter(validAfter) {} StatusWith<MergeChunkRequest> MergeChunkRequest::parseFromConfigCommand(const BSONObj& cmdObj) { std::string ns; @@ -103,8 +105,24 @@ StatusWith<MergeChunkRequest> MergeChunkRequest::parseFromConfigCommand(const BS } } - return MergeChunkRequest( - std::move(nss), std::move(shardName), std::move(epoch), std::move(chunkBoundaries)); + boost::optional<Timestamp> validAfter = boost::none; + { + Timestamp ts{0}; + auto status = bsonExtractTimestampField(cmdObj, kValidAfter, &ts); + if (!status.isOK() && status != ErrorCodes::NoSuchKey) { + return status; + } + + if (status.isOK()) { + validAfter = ts; + } + } + + return MergeChunkRequest(std::move(nss), + std::move(shardName), + std::move(epoch), + std::move(chunkBoundaries), + validAfter); } BSONObj MergeChunkRequest::toConfigCommandBSON(const BSONObj& writeConcern) { @@ -127,6 +145,8 @@ void MergeChunkRequest::appendAsConfigCommand(BSONObjBuilder* cmdBuilder) { } } cmdBuilder->append(kShardName, _shardName); + invariant(_validAfter.is_initialized()); + cmdBuilder->append(kValidAfter, _validAfter.get()); } } // namespace mongo diff --git a/src/mongo/s/request_types/merge_chunk_request_type.h b/src/mongo/s/request_types/merge_chunk_request_type.h index 2ead5f3a9b1..2722c53b057 100644 --- a/src/mongo/s/request_types/merge_chunk_request_type.h +++ b/src/mongo/s/request_types/merge_chunk_request_type.h @@ -46,7 +46,8 @@ public: MergeChunkRequest(NamespaceString nss, std::string shardName, OID epoch, - std::vector<BSONObj> chunkBoundaries); + std::vector<BSONObj> chunkBoundaries, + boost::optional<Timestamp> validAfter); /** * Parses the provided BSON content as the internal _configsvrCommitChunkMerge command, and if @@ -94,6 +95,10 @@ public: return _shardName; } + const boost::optional<Timestamp>& getValidAfter() const { + return _validAfter; + } + private: NamespaceString _nss; OID _epoch; @@ -102,6 +107,8 @@ private: std::vector<BSONObj> _chunkBoundaries; std::string _shardName; + + boost::optional<Timestamp> _validAfter; }; } // namespace mongo |