summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/mongo/db/s/config/configsvr_commit_chunk_migration_command.cpp3
-rw-r--r--src/mongo/db/s/config/configsvr_merge_chunk_command.cpp3
-rw-r--r--src/mongo/db/s/config/sharding_catalog_manager.h10
-rw-r--r--src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp99
-rw-r--r--src/mongo/db/s/config/sharding_catalog_manager_collection_operations.cpp7
-rw-r--r--src/mongo/db/s/config/sharding_catalog_manager_commit_chunk_migration_test.cpp145
-rw-r--r--src/mongo/db/s/config/sharding_catalog_manager_merge_chunks_test.cpp52
-rw-r--r--src/mongo/db/s/config/sharding_catalog_manager_split_chunk_test.cpp26
-rw-r--r--src/mongo/db/s/merge_chunks_command.cpp8
-rw-r--r--src/mongo/db/s/migration_source_manager.cpp17
-rw-r--r--src/mongo/s/SConscript1
-rw-r--r--src/mongo/s/catalog/type_chunk.cpp63
-rw-r--r--src/mongo/s/catalog/type_chunk.h29
-rw-r--r--src/mongo/s/catalog/type_chunk_base.idl44
-rw-r--r--src/mongo/s/chunk.cpp1
-rw-r--r--src/mongo/s/chunk.h2
-rw-r--r--src/mongo/s/request_types/commit_chunk_migration_request_test.cpp10
-rw-r--r--src/mongo/s/request_types/commit_chunk_migration_request_type.cpp19
-rw-r--r--src/mongo/s/request_types/commit_chunk_migration_request_type.h9
-rw-r--r--src/mongo/s/request_types/merge_chunk_request_test.cpp4
-rw-r--r--src/mongo/s/request_types/merge_chunk_request_type.cpp30
-rw-r--r--src/mongo/s/request_types/merge_chunk_request_type.h9
22 files changed, 548 insertions, 43 deletions
diff --git a/src/mongo/db/s/config/configsvr_commit_chunk_migration_command.cpp b/src/mongo/db/s/config/configsvr_commit_chunk_migration_command.cpp
index 28119fe682e..5756821d11e 100644
--- a/src/mongo/db/s/config/configsvr_commit_chunk_migration_command.cpp
+++ b/src/mongo/db/s/config/configsvr_commit_chunk_migration_command.cpp
@@ -134,7 +134,8 @@ public:
commitRequest.getControlChunk(),
commitRequest.getCollectionEpoch(),
commitRequest.getFromShard(),
- commitRequest.getToShard());
+ commitRequest.getToShard(),
+ commitRequest.getValidAfter());
if (!response.isOK()) {
return CommandHelpers::appendCommandStatus(result, response.getStatus());
}
diff --git a/src/mongo/db/s/config/configsvr_merge_chunk_command.cpp b/src/mongo/db/s/config/configsvr_merge_chunk_command.cpp
index 74d11ef2eb5..91c7fe9de31 100644
--- a/src/mongo/db/s/config/configsvr_merge_chunk_command.cpp
+++ b/src/mongo/db/s/config/configsvr_merge_chunk_command.cpp
@@ -114,7 +114,8 @@ public:
parsedRequest.getNamespace(),
parsedRequest.getEpoch(),
parsedRequest.getChunkBoundaries(),
- parsedRequest.getShardName());
+ parsedRequest.getShardName(),
+ parsedRequest.getValidAfter());
if (!mergeChunkResult.isOK()) {
return CommandHelpers::appendCommandStatus(result, mergeChunkResult);
diff --git a/src/mongo/db/s/config/sharding_catalog_manager.h b/src/mongo/db/s/config/sharding_catalog_manager.h
index 786b862f7ee..a244f701133 100644
--- a/src/mongo/db/s/config/sharding_catalog_manager.h
+++ b/src/mongo/db/s/config/sharding_catalog_manager.h
@@ -171,15 +171,20 @@ public:
/**
* Updates metadata in the config.chunks collection so the chunks with given boundaries are seen
* merged into a single larger chunk.
+ * If 'validAfter' is not set, this means the commit request came from an older server version,
+ * which is not history-aware.
*/
Status commitChunkMerge(OperationContext* opCtx,
const NamespaceString& nss,
const OID& requestEpoch,
const std::vector<BSONObj>& chunkBoundaries,
- const std::string& shardName);
+ const std::string& shardName,
+ const boost::optional<Timestamp>& validAfter);
/**
* Updates metadata in config.chunks collection to show the given chunk in its new shard.
+ * If 'validAfter' is not set, this means the commit request came from an older server version,
+ * which is not history-aware.
*/
StatusWith<BSONObj> commitChunkMigration(OperationContext* opCtx,
const NamespaceString& nss,
@@ -187,7 +192,8 @@ public:
const boost::optional<ChunkType>& controlChunk,
const OID& collectionEpoch,
const ShardId& fromShard,
- const ShardId& toShard);
+ const ShardId& toShard,
+ const boost::optional<Timestamp>& validAfter);
//
// Database Operations
diff --git a/src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp b/src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp
index ea34e097844..c4fc7ca563e 100644
--- a/src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp
+++ b/src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp
@@ -70,7 +70,8 @@ void appendShortVersion(BufBuilder* b, const ChunkType& chunk) {
}
BSONArray buildMergeChunksTransactionUpdates(const std::vector<ChunkType>& chunksToMerge,
- const ChunkVersion& mergeVersion) {
+ const ChunkVersion& mergeVersion,
+ const boost::optional<Timestamp>& validAfter) {
BSONArrayBuilder updates;
// Build an update operation to expand the first chunk into the newly merged chunk
@@ -87,6 +88,13 @@ BSONArray buildMergeChunksTransactionUpdates(const std::vector<ChunkType>& chunk
// fill in additional details for sending through transaction
mergedChunk.setVersion(mergeVersion);
+ // Clear the chunk history
+ std::vector<ChunkHistory> history;
+ if (validAfter) {
+ history.emplace_back(ChunkHistory(validAfter.get(), mergedChunk.getShard()));
+ }
+ mergedChunk.setHistory(std::move(history));
+
// add the new chunk information as the update object
op.append("o", mergedChunk.toConfigBSON());
@@ -188,6 +196,7 @@ BSONObj makeCommitChunkTransactionCommand(const NamespaceString& nss,
n.append(ChunkType::min(), migratedChunk.getMin());
n.append(ChunkType::max(), migratedChunk.getMax());
n.append(ChunkType::shard(), toShard);
+ migratedChunk.addHistoryToBSON(n);
n.done();
BSONObjBuilder q(op.subobjStart("o2"));
@@ -275,6 +284,32 @@ Status ShardingCatalogManager::commitChunkSplit(OperationContext* opCtx,
return {ErrorCodes::StaleEpoch, errmsg};
}
+ // Find the chunk history.
+ auto findHistory = Grid::get(opCtx)->shardRegistry()->getConfigShard()->exhaustiveFindOnConfig(
+ opCtx,
+ ReadPreferenceSetting{ReadPreference::PrimaryOnly},
+ repl::ReadConcernLevel::kLocalReadConcern,
+ ChunkType::ConfigNS,
+ BSON(ChunkType::name << ChunkType::genID(nss, range.getMin())),
+ BSONObj(),
+ 1);
+ if (!findHistory.isOK()) {
+ return findHistory.getStatus();
+ }
+
+ const auto origChunks = std::move(findHistory.getValue().docs);
+ if (origChunks.size() != 1) {
+ return {ErrorCodes::IncompatibleShardingMetadata,
+ str::stream() << "Tried to find the chunk history for '"
+ << ChunkType::genID(nss, range.getMin())
+ << ", but found no chunks"};
+ }
+
+ const auto origChunk = ChunkType::fromConfigBSON(origChunks.front());
+ if (!origChunk.isOK()) {
+ return origChunk.getStatus();
+ }
+
std::vector<ChunkType> newChunks;
ChunkVersion currentMaxVersion = collVersion;
@@ -336,6 +371,7 @@ Status ShardingCatalogManager::commitChunkSplit(OperationContext* opCtx,
n.append(ChunkType::min(), startKey);
n.append(ChunkType::max(), endKey);
n.append(ChunkType::shard(), shardName);
+ origChunk.getValue().addHistoryToBSON(n);
n.done();
// add the chunk's _id as the query part of the update statement
@@ -431,7 +467,8 @@ Status ShardingCatalogManager::commitChunkMerge(OperationContext* opCtx,
const NamespaceString& nss,
const OID& requestEpoch,
const std::vector<BSONObj>& chunkBoundaries,
- const std::string& shardName) {
+ const std::string& shardName,
+ const boost::optional<Timestamp>& validAfter) {
// This method must never be called with empty chunks to merge
invariant(!chunkBoundaries.empty());
@@ -500,7 +537,7 @@ Status ShardingCatalogManager::commitChunkMerge(OperationContext* opCtx,
ChunkVersion mergeVersion = collVersion;
mergeVersion.incMinor();
- auto updates = buildMergeChunksTransactionUpdates(chunksToMerge, mergeVersion);
+ auto updates = buildMergeChunksTransactionUpdates(chunksToMerge, mergeVersion, validAfter);
auto preCond = buildMergeChunksTransactionPrecond(chunksToMerge, collVersion);
// apply the batch of updates to local metadata
@@ -542,7 +579,8 @@ StatusWith<BSONObj> ShardingCatalogManager::commitChunkMigration(
const boost::optional<ChunkType>& controlChunk,
const OID& collectionEpoch,
const ShardId& fromShard,
- const ShardId& toShard) {
+ const ShardId& toShard,
+ const boost::optional<Timestamp>& validAfter) {
auto const configShard = Grid::get(opCtx)->shardRegistry()->getConfigShard();
@@ -624,12 +662,65 @@ StatusWith<BSONObj> ShardingCatalogManager::commitChunkMigration(
}
}
+ // Find the chunk history.
+ auto findHistory = configShard->exhaustiveFindOnConfig(
+ opCtx,
+ ReadPreferenceSetting{ReadPreference::PrimaryOnly},
+ repl::ReadConcernLevel::kLocalReadConcern,
+ ChunkType::ConfigNS,
+ BSON(ChunkType::name << ChunkType::genID(nss, migratedChunk.getMin())),
+ BSONObj(),
+ 1);
+ if (!findHistory.isOK()) {
+ return findHistory.getStatus();
+ }
+
+ const auto origChunks = std::move(findHistory.getValue().docs);
+ if (origChunks.size() != 1) {
+ return {ErrorCodes::IncompatibleShardingMetadata,
+ str::stream() << "Tried to find the chunk history for '"
+ << ChunkType::genID(nss, migratedChunk.getMin())
+ << ", but found no chunks"};
+ }
+
+ const auto origChunk = ChunkType::fromConfigBSON(origChunks.front());
+ if (!origChunk.isOK()) {
+ return origChunk.getStatus();
+ }
+
// Generate the new versions of migratedChunk and controlChunk. Migrating chunk's minor version
// will be 0.
ChunkType newMigratedChunk = migratedChunk;
newMigratedChunk.setVersion(ChunkVersion(
currentCollectionVersion.majorVersion() + 1, 0, currentCollectionVersion.epoch()));
+ // Copy the complete history.
+ auto newHistory = origChunk.getValue().getHistory();
+ const int kHistorySecs = 10;
+
+ // Update the history of the migrated chunk.
+ if (validAfter) {
+ // Drop the history that is too old (10 seconds of history for now).
+ // TODO SERVER-33831 to update the old history removal policy.
+ while (!newHistory.empty() &&
+ newHistory.back().getValidAfter().getSecs() + kHistorySecs <
+ validAfter.get().getSecs()) {
+ newHistory.pop_back();
+ }
+
+ if (!newHistory.empty() && newHistory.front().getValidAfter() >= validAfter.get()) {
+ return {ErrorCodes::IncompatibleShardingMetadata,
+ str::stream() << "The chunk history for '"
+ << ChunkType::genID(nss, migratedChunk.getMin())
+ << " is corrupted. The last validAfter "
+ << newHistory.back().getValidAfter().toString()
+ << " is greater or equal to the new validAfter "
+ << validAfter.get().toString()};
+ }
+ newHistory.emplace(newHistory.begin(), ChunkHistory(validAfter.get(), toShard));
+ }
+ newMigratedChunk.setHistory(std::move(newHistory));
+
// Control chunk's minor version will be 1 (if control chunk is present).
boost::optional<ChunkType> newControlChunk = boost::none;
if (controlChunk) {
diff --git a/src/mongo/db/s/config/sharding_catalog_manager_collection_operations.cpp b/src/mongo/db/s/config/sharding_catalog_manager_collection_operations.cpp
index e990e35f7dd..802faba9f97 100644
--- a/src/mongo/db/s/config/sharding_catalog_manager_collection_operations.cpp
+++ b/src/mongo/db/s/config/sharding_catalog_manager_collection_operations.cpp
@@ -44,6 +44,7 @@
#include "mongo/db/catalog/collection_options.h"
#include "mongo/db/client.h"
#include "mongo/db/commands.h"
+#include "mongo/db/logical_clock.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/query/collation/collator_factory_interface.h"
@@ -274,6 +275,8 @@ ChunkVersion ShardingCatalogManager::_createFirstChunks(OperationContext* opCtx,
log() << "going to create " << splitPoints.size() + 1 << " chunk(s) for: " << nss
<< " using new epoch " << version.epoch();
+ const auto validAfter = LogicalClock::get(opCtx)->getClusterTime().asTimestamp();
+
for (unsigned i = 0; i <= splitPoints.size(); i++) {
const BSONObj min = (i == 0) ? keyPattern.globalMin() : splitPoints[i - 1];
const BSONObj max = (i < splitPoints.size()) ? splitPoints[i] : keyPattern.globalMax();
@@ -290,6 +293,10 @@ ChunkVersion ShardingCatalogManager::_createFirstChunks(OperationContext* opCtx,
chunk.setMax(max);
chunk.setShard(shardIds[i % shardIds.size()]);
chunk.setVersion(version);
+ // TODO SERVER-33781 write history only when FCV4.0 config.
+ std::vector<ChunkHistory> initialHistory;
+ initialHistory.emplace_back(ChunkHistory(validAfter, shardIds[i % shardIds.size()]));
+ chunk.setHistory(std::move(initialHistory));
uassertStatusOK(Grid::get(opCtx)->catalogClient()->insertConfigDocument(
opCtx,
diff --git a/src/mongo/db/s/config/sharding_catalog_manager_commit_chunk_migration_test.cpp b/src/mongo/db/s/config/sharding_catalog_manager_commit_chunk_migration_test.cpp
index 76340441d2c..dd1357e8b4a 100644
--- a/src/mongo/db/s/config/sharding_catalog_manager_commit_chunk_migration_test.cpp
+++ b/src/mongo/db/s/config/sharding_catalog_manager_commit_chunk_migration_test.cpp
@@ -64,6 +64,7 @@ TEST_F(CommitChunkMigrate, CheckCorrectOpsCommandWithCtl) {
chunk0.setNS(kNamespace);
chunk0.setVersion(origVersion);
chunk0.setShard(shard0.getName());
+ chunk0.setHistory({ChunkHistory(Timestamp(100, 0), ShardId("shardX"))});
// apportion
auto chunkMin = BSON("a" << 1);
@@ -86,6 +87,8 @@ TEST_F(CommitChunkMigrate, CheckCorrectOpsCommandWithCtl) {
ChunkType const& chunk0cref = chunk0;
ChunkType const& chunk1cref = chunk1;
+ Timestamp validAfter{101, 0};
+
StatusWith<BSONObj> resultBSON = ShardingCatalogManager::get(operationContext())
->commitChunkMigration(operationContext(),
chunk0.getNS(),
@@ -93,7 +96,8 @@ TEST_F(CommitChunkMigrate, CheckCorrectOpsCommandWithCtl) {
chunk1cref,
origVersion.epoch(),
ShardId(shard0.getName()),
- ShardId(shard1.getName()));
+ ShardId(shard1.getName()),
+ validAfter);
ASSERT_OK(resultBSON.getStatus());
@@ -111,6 +115,9 @@ TEST_F(CommitChunkMigrate, CheckCorrectOpsCommandWithCtl) {
auto chunkDoc0 = uassertStatusOK(getChunkDoc(operationContext(), chunkMin));
ASSERT_EQ("shard1", chunkDoc0.getShard().toString());
ASSERT_EQ(mver.getValue(), chunkDoc0.getVersion());
+ // The history should be updated.
+ ASSERT_EQ(2UL, chunkDoc0.getHistory().size());
+ ASSERT_EQ(validAfter, chunkDoc0.getHistory().front().getValidAfter());
auto chunkDoc1 = uassertStatusOK(getChunkDoc(operationContext(), chunkMax));
ASSERT_EQ("shard0", chunkDoc1.getShard().toString());
@@ -136,6 +143,7 @@ TEST_F(CommitChunkMigrate, CheckCorrectOpsCommandNoCtl) {
chunk0.setNS(kNamespace);
chunk0.setVersion(origVersion);
chunk0.setShard(shard0.getName());
+ chunk0.setHistory({ChunkHistory(Timestamp(100, 0), ShardId("shardX"))});
// apportion
auto chunkMin = BSON("a" << 1);
@@ -145,6 +153,8 @@ TEST_F(CommitChunkMigrate, CheckCorrectOpsCommandNoCtl) {
setupChunks({chunk0}).transitional_ignore();
+ Timestamp validAfter{101, 0};
+
StatusWith<BSONObj> resultBSON = ShardingCatalogManager::get(operationContext())
->commitChunkMigration(operationContext(),
chunk0.getNS(),
@@ -152,7 +162,8 @@ TEST_F(CommitChunkMigrate, CheckCorrectOpsCommandNoCtl) {
boost::none,
origVersion.epoch(),
ShardId(shard0.getName()),
- ShardId(shard1.getName()));
+ ShardId(shard1.getName()),
+ validAfter);
ASSERT_OK(resultBSON.getStatus());
@@ -169,6 +180,116 @@ TEST_F(CommitChunkMigrate, CheckCorrectOpsCommandNoCtl) {
auto chunkDoc0 = uassertStatusOK(getChunkDoc(operationContext(), chunkMin));
ASSERT_EQ("shard1", chunkDoc0.getShard().toString());
ASSERT_EQ(mver.getValue(), chunkDoc0.getVersion());
+ // The history should be updated.
+ ASSERT_EQ(2UL, chunkDoc0.getHistory().size());
+ ASSERT_EQ(validAfter, chunkDoc0.getHistory().front().getValidAfter());
+}
+
+TEST_F(CommitChunkMigrate, CheckCorrectOpsCommandNoCtlTrimHistory) {
+
+ ShardType shard0;
+ shard0.setName("shard0");
+ shard0.setHost("shard0:12");
+
+ ShardType shard1;
+ shard1.setName("shard1");
+ shard1.setHost("shard1:12");
+
+ setupShards({shard0, shard1}).transitional_ignore();
+
+ int origMajorVersion = 15;
+ auto const origVersion = ChunkVersion(origMajorVersion, 4, OID::gen());
+
+ ChunkType chunk0;
+ chunk0.setNS(kNamespace);
+ chunk0.setVersion(origVersion);
+ chunk0.setShard(shard0.getName());
+ chunk0.setHistory({ChunkHistory(Timestamp(100, 0), ShardId("shardX"))});
+
+ // apportion
+ auto chunkMin = BSON("a" << 1);
+ chunk0.setMin(chunkMin);
+ auto chunkMax = BSON("a" << 10);
+ chunk0.setMax(chunkMax);
+
+ setupChunks({chunk0}).transitional_ignore();
+
+ // Make the time distance between the last history element large enough.
+ Timestamp validAfter{200, 0};
+
+ StatusWith<BSONObj> resultBSON = ShardingCatalogManager::get(operationContext())
+ ->commitChunkMigration(operationContext(),
+ chunk0.getNS(),
+ chunk0,
+ boost::none,
+ origVersion.epoch(),
+ ShardId(shard0.getName()),
+ ShardId(shard1.getName()),
+ validAfter);
+
+ ASSERT_OK(resultBSON.getStatus());
+
+ // Verify the version returned matches expected value.
+ BSONObj versions = resultBSON.getValue();
+ auto mver = ChunkVersion::parseFromBSONWithFieldForCommands(versions, "migratedChunkVersion");
+ ASSERT_OK(mver.getStatus());
+ ASSERT_EQ(ChunkVersion(origMajorVersion + 1, 0, origVersion.epoch()), mver.getValue());
+
+ auto cver = ChunkVersion::parseFromBSONWithFieldForCommands(versions, "controlChunkVersion");
+ ASSERT_NOT_OK(cver.getStatus());
+
+ // Verify the chunk ended up in the right shard, and version matches the value returned.
+ auto chunkDoc0 = uassertStatusOK(getChunkDoc(operationContext(), chunkMin));
+ ASSERT_EQ("shard1", chunkDoc0.getShard().toString());
+ ASSERT_EQ(mver.getValue(), chunkDoc0.getVersion());
+ // The history should be updated.
+ ASSERT_EQ(1UL, chunkDoc0.getHistory().size());
+ ASSERT_EQ(validAfter, chunkDoc0.getHistory().front().getValidAfter());
+}
+
+TEST_F(CommitChunkMigrate, RejectOutOfOrderHistory) {
+
+ ShardType shard0;
+ shard0.setName("shard0");
+ shard0.setHost("shard0:12");
+
+ ShardType shard1;
+ shard1.setName("shard1");
+ shard1.setHost("shard1:12");
+
+ setupShards({shard0, shard1}).transitional_ignore();
+
+ int origMajorVersion = 15;
+ auto const origVersion = ChunkVersion(origMajorVersion, 4, OID::gen());
+
+ ChunkType chunk0;
+ chunk0.setNS(kNamespace);
+ chunk0.setVersion(origVersion);
+ chunk0.setShard(shard0.getName());
+ chunk0.setHistory({ChunkHistory(Timestamp(100, 0), ShardId("shardX"))});
+
+ // apportion
+ auto chunkMin = BSON("a" << 1);
+ chunk0.setMin(chunkMin);
+ auto chunkMax = BSON("a" << 10);
+ chunk0.setMax(chunkMax);
+
+ setupChunks({chunk0}).transitional_ignore();
+
+ // Make the time before the last change to trigger the failure.
+ Timestamp validAfter{99, 0};
+
+ StatusWith<BSONObj> resultBSON = ShardingCatalogManager::get(operationContext())
+ ->commitChunkMigration(operationContext(),
+ chunk0.getNS(),
+ chunk0,
+ boost::none,
+ origVersion.epoch(),
+ ShardId(shard0.getName()),
+ ShardId(shard1.getName()),
+ validAfter);
+
+ ASSERT_EQ(ErrorCodes::IncompatibleShardingMetadata, resultBSON.getStatus());
}
TEST_F(CommitChunkMigrate, RejectWrongCollectionEpoch0) {
@@ -208,6 +329,8 @@ TEST_F(CommitChunkMigrate, RejectWrongCollectionEpoch0) {
setupChunks({chunk0, chunk1}).transitional_ignore();
+ Timestamp validAfter{1};
+
StatusWith<BSONObj> resultBSON = ShardingCatalogManager::get(operationContext())
->commitChunkMigration(operationContext(),
chunk0.getNS(),
@@ -215,7 +338,8 @@ TEST_F(CommitChunkMigrate, RejectWrongCollectionEpoch0) {
chunk1,
OID::gen(),
ShardId(shard0.getName()),
- ShardId(shard1.getName()));
+ ShardId(shard1.getName()),
+ validAfter);
ASSERT_EQ(ErrorCodes::StaleEpoch, resultBSON.getStatus());
}
@@ -259,6 +383,8 @@ TEST_F(CommitChunkMigrate, RejectWrongCollectionEpoch1) {
// get version from the control chunk this time
setupChunks({chunk1, chunk0}).transitional_ignore();
+ Timestamp validAfter{1};
+
StatusWith<BSONObj> resultBSON = ShardingCatalogManager::get(operationContext())
->commitChunkMigration(operationContext(),
chunk0.getNS(),
@@ -266,7 +392,8 @@ TEST_F(CommitChunkMigrate, RejectWrongCollectionEpoch1) {
chunk1,
origVersion.epoch(),
ShardId(shard0.getName()),
- ShardId(shard1.getName()));
+ ShardId(shard1.getName()),
+ validAfter);
ASSERT_EQ(ErrorCodes::StaleEpoch, resultBSON.getStatus());
}
@@ -308,6 +435,8 @@ TEST_F(CommitChunkMigrate, RejectChunkMissing0) {
setupChunks({chunk1}).transitional_ignore();
+ Timestamp validAfter{1};
+
StatusWith<BSONObj> resultBSON = ShardingCatalogManager::get(operationContext())
->commitChunkMigration(operationContext(),
chunk0.getNS(),
@@ -315,7 +444,8 @@ TEST_F(CommitChunkMigrate, RejectChunkMissing0) {
chunk1,
origVersion.epoch(),
ShardId(shard0.getName()),
- ShardId(shard1.getName()));
+ ShardId(shard1.getName()),
+ validAfter);
ASSERT_EQ(40165, resultBSON.getStatus().code());
}
@@ -357,6 +487,8 @@ TEST_F(CommitChunkMigrate, RejectChunkMissing1) {
setupChunks({chunk0}).transitional_ignore();
+ Timestamp validAfter{1};
+
StatusWith<BSONObj> resultBSON = ShardingCatalogManager::get(operationContext())
->commitChunkMigration(operationContext(),
chunk0.getNS(),
@@ -364,7 +496,8 @@ TEST_F(CommitChunkMigrate, RejectChunkMissing1) {
chunk1,
origVersion.epoch(),
ShardId(shard0.getName()),
- ShardId(shard1.getName()));
+ ShardId(shard1.getName()),
+ validAfter);
ASSERT_EQ(40165, resultBSON.getStatus().code());
}
diff --git a/src/mongo/db/s/config/sharding_catalog_manager_merge_chunks_test.cpp b/src/mongo/db/s/config/sharding_catalog_manager_merge_chunks_test.cpp
index 3d115ec0a80..fee827a420f 100644
--- a/src/mongo/db/s/config/sharding_catalog_manager_merge_chunks_test.cpp
+++ b/src/mongo/db/s/config/sharding_catalog_manager_merge_chunks_test.cpp
@@ -67,12 +67,15 @@ TEST_F(MergeChunkTest, MergeExistingChunksCorrectlyShouldSucceed) {
setupChunks({chunk, chunk2}).transitional_ignore();
+ Timestamp validAfter{100, 0};
+
ASSERT_OK(ShardingCatalogManager::get(operationContext())
->commitChunkMerge(operationContext(),
NamespaceString("TestDB.TestColl"),
origVersion.epoch(),
chunkBoundaries,
- "shard0000"));
+ "shard0000",
+ validAfter));
auto findResponse = uassertStatusOK(
getConfigShard()->exhaustiveFindOnConfig(operationContext(),
@@ -98,6 +101,10 @@ TEST_F(MergeChunkTest, MergeExistingChunksCorrectlyShouldSucceed) {
ASSERT_EQ(origVersion.majorVersion(), mergedChunk.getVersion().majorVersion());
ASSERT_EQ(origVersion.minorVersion() + 1, mergedChunk.getVersion().minorVersion());
}
+
+ // Make sure history is there
+ ASSERT_EQ(1UL, mergedChunk.getHistory().size());
+ ASSERT_EQ(validAfter, mergedChunk.getHistory().front().getValidAfter());
}
TEST_F(MergeChunkTest, MergeSeveralChunksCorrectlyShouldSucceed) {
@@ -131,12 +138,15 @@ TEST_F(MergeChunkTest, MergeSeveralChunksCorrectlyShouldSucceed) {
setupChunks({chunk, chunk2, chunk3}).transitional_ignore();
+ Timestamp validAfter{100, 0};
+
ASSERT_OK(ShardingCatalogManager::get(operationContext())
->commitChunkMerge(operationContext(),
NamespaceString("TestDB.TestColl"),
origVersion.epoch(),
chunkBoundaries,
- "shard0000"));
+ "shard0000",
+ validAfter));
auto findResponse = uassertStatusOK(
getConfigShard()->exhaustiveFindOnConfig(operationContext(),
@@ -162,6 +172,10 @@ TEST_F(MergeChunkTest, MergeSeveralChunksCorrectlyShouldSucceed) {
ASSERT_EQ(origVersion.majorVersion(), mergedChunk.getVersion().majorVersion());
ASSERT_EQ(origVersion.minorVersion() + 1, mergedChunk.getVersion().minorVersion());
}
+
+ // Make sure history is there
+ ASSERT_EQ(1UL, mergedChunk.getHistory().size());
+ ASSERT_EQ(validAfter, mergedChunk.getHistory().front().getValidAfter());
}
TEST_F(MergeChunkTest, NewMergeShouldClaimHighestVersion) {
@@ -199,12 +213,15 @@ TEST_F(MergeChunkTest, NewMergeShouldClaimHighestVersion) {
setupChunks({chunk, chunk2, otherChunk}).transitional_ignore();
+ Timestamp validAfter{100, 0};
+
ASSERT_OK(ShardingCatalogManager::get(operationContext())
->commitChunkMerge(operationContext(),
NamespaceString("TestDB.TestColl"),
collEpoch,
chunkBoundaries,
- "shard0000"));
+ "shard0000",
+ validAfter));
auto findResponse = uassertStatusOK(
getConfigShard()->exhaustiveFindOnConfig(operationContext(),
@@ -230,6 +247,10 @@ TEST_F(MergeChunkTest, NewMergeShouldClaimHighestVersion) {
ASSERT_EQ(competingVersion.majorVersion(), mergedChunk.getVersion().majorVersion());
ASSERT_EQ(competingVersion.minorVersion() + 1, mergedChunk.getVersion().minorVersion());
}
+
+ // Make sure history is there
+ ASSERT_EQ(1UL, mergedChunk.getHistory().size());
+ ASSERT_EQ(validAfter, mergedChunk.getHistory().front().getValidAfter());
}
TEST_F(MergeChunkTest, MergeLeavesOtherChunksAlone) {
@@ -263,12 +284,15 @@ TEST_F(MergeChunkTest, MergeLeavesOtherChunksAlone) {
setupChunks({chunk, chunk2, otherChunk}).transitional_ignore();
+ Timestamp validAfter{1};
+
ASSERT_OK(ShardingCatalogManager::get(operationContext())
->commitChunkMerge(operationContext(),
NamespaceString("TestDB.TestColl"),
origVersion.epoch(),
chunkBoundaries,
- "shard0000"));
+ "shard0000",
+ validAfter));
auto findResponse = uassertStatusOK(
getConfigShard()->exhaustiveFindOnConfig(operationContext(),
@@ -326,12 +350,15 @@ TEST_F(MergeChunkTest, NonExistingNamespace) {
setupChunks({chunk, chunk2}).transitional_ignore();
+ Timestamp validAfter{1};
+
auto mergeStatus = ShardingCatalogManager::get(operationContext())
->commitChunkMerge(operationContext(),
NamespaceString("TestDB.NonExistingColl"),
origVersion.epoch(),
chunkBoundaries,
- "shard0000");
+ "shard0000",
+ validAfter);
ASSERT_EQ(ErrorCodes::IllegalOperation, mergeStatus);
}
@@ -360,12 +387,15 @@ TEST_F(MergeChunkTest, NonMatchingEpochsOfChunkAndRequestErrors) {
setupChunks({chunk, chunk2}).transitional_ignore();
+ Timestamp validAfter{1};
+
auto mergeStatus = ShardingCatalogManager::get(operationContext())
->commitChunkMerge(operationContext(),
NamespaceString("TestDB.TestColl"),
OID::gen(),
chunkBoundaries,
- "shard0000");
+ "shard0000",
+ validAfter);
ASSERT_EQ(ErrorCodes::StaleEpoch, mergeStatus);
}
@@ -400,13 +430,16 @@ TEST_F(MergeChunkTest, MergeAlreadyHappenedFailsPrecondition) {
setupChunks({mergedChunk}).transitional_ignore();
+ Timestamp validAfter{1};
+
ASSERT_EQ(ErrorCodes::BadValue,
ShardingCatalogManager::get(operationContext())
->commitChunkMerge(operationContext(),
NamespaceString("TestDB.TestColl"),
origVersion.epoch(),
chunkBoundaries,
- "shard0000"));
+ "shard0000",
+ validAfter));
// Verify that no change to config.chunks happened.
auto findResponse = uassertStatusOK(
@@ -461,13 +494,16 @@ TEST_F(MergeChunkTest, ChunkBoundariesOutOfOrderFails) {
setupChunks(originalChunks).transitional_ignore();
}
+ Timestamp validAfter{1};
+
ASSERT_EQ(ErrorCodes::InvalidOptions,
ShardingCatalogManager::get(operationContext())
->commitChunkMerge(operationContext(),
NamespaceString("TestDB.TestColl"),
epoch,
chunkBoundaries,
- "shard0000"));
+ "shard0000",
+ validAfter));
}
} // namespace
diff --git a/src/mongo/db/s/config/sharding_catalog_manager_split_chunk_test.cpp b/src/mongo/db/s/config/sharding_catalog_manager_split_chunk_test.cpp
index 31d3a794d41..932bc374745 100644
--- a/src/mongo/db/s/config/sharding_catalog_manager_split_chunk_test.cpp
+++ b/src/mongo/db/s/config/sharding_catalog_manager_split_chunk_test.cpp
@@ -53,6 +53,8 @@ TEST_F(SplitChunkTest, SplitExistingChunkCorrectlyShouldSucceed) {
auto chunkMax = BSON("a" << 10);
chunk.setMin(chunkMin);
chunk.setMax(chunkMax);
+ chunk.setHistory({ChunkHistory(Timestamp(100, 0), ShardId("shardX")),
+ ChunkHistory(Timestamp(90, 0), ShardId("shardY"))});
auto chunkSplitPoint = BSON("a" << 5);
std::vector<BSONObj> splitPoints{chunkSplitPoint};
@@ -78,6 +80,9 @@ TEST_F(SplitChunkTest, SplitExistingChunkCorrectlyShouldSucceed) {
ASSERT_EQ(origVersion.majorVersion(), chunkDoc.getVersion().majorVersion());
ASSERT_EQ(origVersion.minorVersion() + 1, chunkDoc.getVersion().minorVersion());
+ // Make sure the history is there
+ ASSERT_EQ(2UL, chunkDoc.getHistory().size());
+
// Second chunkDoc should have range [chunkSplitPoint, chunkMax]
auto otherChunkDocStatus = getChunkDoc(operationContext(), chunkSplitPoint);
ASSERT_OK(otherChunkDocStatus.getStatus());
@@ -88,6 +93,12 @@ TEST_F(SplitChunkTest, SplitExistingChunkCorrectlyShouldSucceed) {
// Check for increment on second chunkDoc's minor version
ASSERT_EQ(origVersion.majorVersion(), otherChunkDoc.getVersion().majorVersion());
ASSERT_EQ(origVersion.minorVersion() + 2, otherChunkDoc.getVersion().minorVersion());
+
+ // Make sure the history is there
+ ASSERT_EQ(2UL, otherChunkDoc.getHistory().size());
+
+ // Both chunks should have the same history
+ ASSERT(chunkDoc.getHistory() == otherChunkDoc.getHistory());
}
TEST_F(SplitChunkTest, MultipleSplitsOnExistingChunkShouldSucceed) {
@@ -102,6 +113,8 @@ TEST_F(SplitChunkTest, MultipleSplitsOnExistingChunkShouldSucceed) {
auto chunkMax = BSON("a" << 10);
chunk.setMin(chunkMin);
chunk.setMax(chunkMax);
+ chunk.setHistory({ChunkHistory(Timestamp(100, 0), ShardId("shardX")),
+ ChunkHistory(Timestamp(90, 0), ShardId("shardY"))});
auto chunkSplitPoint = BSON("a" << 5);
auto chunkSplitPoint2 = BSON("a" << 7);
@@ -128,6 +141,9 @@ TEST_F(SplitChunkTest, MultipleSplitsOnExistingChunkShouldSucceed) {
ASSERT_EQ(origVersion.majorVersion(), chunkDoc.getVersion().majorVersion());
ASSERT_EQ(origVersion.minorVersion() + 1, chunkDoc.getVersion().minorVersion());
+ // Make sure the history is there
+ ASSERT_EQ(2UL, chunkDoc.getHistory().size());
+
// Second chunkDoc should have range [chunkSplitPoint, chunkSplitPoint2]
auto midChunkDocStatus = getChunkDoc(operationContext(), chunkSplitPoint);
ASSERT_OK(midChunkDocStatus.getStatus());
@@ -139,6 +155,9 @@ TEST_F(SplitChunkTest, MultipleSplitsOnExistingChunkShouldSucceed) {
ASSERT_EQ(origVersion.majorVersion(), midChunkDoc.getVersion().majorVersion());
ASSERT_EQ(origVersion.minorVersion() + 2, midChunkDoc.getVersion().minorVersion());
+ // Make sure the history is there
+ ASSERT_EQ(2UL, midChunkDoc.getHistory().size());
+
// Third chunkDoc should have range [chunkSplitPoint2, chunkMax]
auto lastChunkDocStatus = getChunkDoc(operationContext(), chunkSplitPoint2);
ASSERT_OK(lastChunkDocStatus.getStatus());
@@ -149,6 +168,13 @@ TEST_F(SplitChunkTest, MultipleSplitsOnExistingChunkShouldSucceed) {
// Check for increment on third chunkDoc's minor version
ASSERT_EQ(origVersion.majorVersion(), lastChunkDoc.getVersion().majorVersion());
ASSERT_EQ(origVersion.minorVersion() + 3, lastChunkDoc.getVersion().minorVersion());
+
+ // Make sure the history is there
+ ASSERT_EQ(2UL, lastChunkDoc.getHistory().size());
+
+ // Both chunks should have the same history
+ ASSERT(chunkDoc.getHistory() == midChunkDoc.getHistory());
+ ASSERT(midChunkDoc.getHistory() == lastChunkDoc.getHistory());
}
TEST_F(SplitChunkTest, NewSplitShouldClaimHighestVersion) {
diff --git a/src/mongo/db/s/merge_chunks_command.cpp b/src/mongo/db/s/merge_chunks_command.cpp
index 0458815e990..7d523de8854 100644
--- a/src/mongo/db/s/merge_chunks_command.cpp
+++ b/src/mongo/db/s/merge_chunks_command.cpp
@@ -36,6 +36,7 @@
#include "mongo/db/catalog_raii.h"
#include "mongo/db/commands.h"
#include "mongo/db/field_parser.h"
+#include "mongo/db/logical_clock.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/s/collection_sharding_state.h"
#include "mongo/db/s/shard_filtering_metadata_refresh.h"
@@ -240,8 +241,11 @@ Status mergeChunks(OperationContext* opCtx,
//
// Run _configsvrCommitChunkMerge.
//
- MergeChunkRequest request{
- nss, shardingState->getShardName(), shardVersion.epoch(), chunkBoundaries};
+ MergeChunkRequest request{nss,
+ shardingState->getShardName(),
+ shardVersion.epoch(),
+ chunkBoundaries,
+ LogicalClock::get(opCtx)->getClusterTime().asTimestamp()};
auto configCmdObj =
request.toConfigCommandBSON(ShardingCatalogClient::kMajorityWriteConcern.toBSON());
diff --git a/src/mongo/db/s/migration_source_manager.cpp b/src/mongo/db/s/migration_source_manager.cpp
index 8f113193f1e..bb0dc85c807 100644
--- a/src/mongo/db/s/migration_source_manager.cpp
+++ b/src/mongo/db/s/migration_source_manager.cpp
@@ -35,6 +35,7 @@
#include "mongo/bson/bsonobjbuilder.h"
#include "mongo/db/catalog_raii.h"
#include "mongo/db/concurrency/write_conflict_exception.h"
+#include "mongo/db/logical_clock.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/repl/replication_coordinator.h"
#include "mongo/db/s/migration_chunk_cloner_source_legacy.h"
@@ -408,13 +409,15 @@ Status MigrationSourceManager::commitChunkMetadataOnConfig(OperationContext* opC
migratedChunkType.setMin(_args.getMinKey());
migratedChunkType.setMax(_args.getMaxKey());
- CommitChunkMigrationRequest::appendAsCommand(&builder,
- getNss(),
- _args.getFromShardId(),
- _args.getToShardId(),
- migratedChunkType,
- controlChunkType,
- metadata->getCollVersion());
+ CommitChunkMigrationRequest::appendAsCommand(
+ &builder,
+ getNss(),
+ _args.getFromShardId(),
+ _args.getToShardId(),
+ migratedChunkType,
+ controlChunkType,
+ metadata->getCollVersion(),
+ LogicalClock::get(opCtx)->getClusterTime().asTimestamp());
builder.append(kWriteConcernField, kMajorityWriteConcern.toBSON());
}
diff --git a/src/mongo/s/SConscript b/src/mongo/s/SConscript
index 6b668e0c4ca..59aeadf2017 100644
--- a/src/mongo/s/SConscript
+++ b/src/mongo/s/SConscript
@@ -130,6 +130,7 @@ env.Library(
'request_types/update_zone_key_range_request_type.cpp',
'shard_id.cpp',
'versioning.cpp',
+ env.Idlc('catalog/type_chunk_base.idl')[0],
env.Idlc('request_types/create_collection.idl')[0],
env.Idlc('request_types/create_database.idl')[0],
env.Idlc('request_types/flush_routing_table_cache_updates.idl')[0],
diff --git a/src/mongo/s/catalog/type_chunk.cpp b/src/mongo/s/catalog/type_chunk.cpp
index 58a181240a9..5aa3132a78e 100644
--- a/src/mongo/s/catalog/type_chunk.cpp
+++ b/src/mongo/s/catalog/type_chunk.cpp
@@ -54,6 +54,7 @@ const BSONField<std::string> ChunkType::shard("shard");
const BSONField<bool> ChunkType::jumbo("jumbo");
const BSONField<Date_t> ChunkType::lastmod("lastmod");
const BSONField<OID> ChunkType::epoch("lastmodEpoch");
+const BSONField<BSONObj> ChunkType::history("history");
namespace {
@@ -157,6 +158,23 @@ ChunkRange ChunkRange::unionWith(ChunkRange const& other) const {
le(_maxKey, other._maxKey) ? other._maxKey : _maxKey);
}
+StatusWith<std::vector<ChunkHistory>> ChunkHistory::fromBSON(const BSONArray& source) {
+ std::vector<ChunkHistory> values;
+
+ for (const auto& arrayElement : source) {
+ if (arrayElement.type() == Object) {
+ IDLParserErrorContext tempContext("chunk history array");
+ values.emplace_back(ChunkHistoryBase::parse(tempContext, arrayElement.Obj()));
+ } else {
+ return {ErrorCodes::BadValue,
+ str::stream() << "array element does not have the object type: "
+ << arrayElement.type()};
+ }
+ }
+
+ return values;
+}
+
// ChunkType
ChunkType::ChunkType() = default;
@@ -217,6 +235,22 @@ StatusWith<ChunkType> ChunkType::fromConfigBSON(const BSONObj& source) {
chunk._version = std::move(versionStatus.getValue());
}
+ {
+ BSONElement historyObj;
+ Status status = bsonExtractTypedField(source, history.name(), Array, &historyObj);
+ if (status.isOK()) {
+ auto history = std::move(ChunkHistory::fromBSON(BSONArray(historyObj.Obj())));
+ if (!history.isOK())
+ return history.getStatus();
+
+ chunk._history = std::move(history.getValue());
+ } else if (status == ErrorCodes::NoSuchKey) {
+ // History is missing, so it will be presumed empty
+ } else {
+ return status;
+ }
+ }
+
return chunk;
}
@@ -236,7 +270,7 @@ BSONObj ChunkType::toConfigBSON() const {
_version->appendForChunk(&builder);
if (_jumbo)
builder.append(jumbo.name(), getJumbo());
-
+ addHistoryToBSON(builder);
return builder.obj();
}
@@ -283,6 +317,22 @@ StatusWith<ChunkType> ChunkType::fromShardBSON(const BSONObj& source, const OID&
chunk._version = std::move(statusWithChunkVersion.getValue());
}
+ {
+ BSONElement historyObj;
+ Status status = bsonExtractTypedField(source, history.name(), Array, &historyObj);
+ if (status.isOK()) {
+ auto history = std::move(ChunkHistory::fromBSON(BSONArray(historyObj.Obj())));
+ if (!history.isOK())
+ return history.getStatus();
+
+ chunk._history = std::move(history.getValue());
+ } else if (status == ErrorCodes::NoSuchKey) {
+ // History is missing, so it will be presumed empty
+ } else {
+ return status;
+ }
+ }
+
return chunk;
}
@@ -296,6 +346,7 @@ BSONObj ChunkType::toShardBSON() const {
builder.append(max.name(), getMax());
builder.append(shard.name(), getShard().toString());
builder.appendTimestamp(lastmod.name(), _version->toLong());
+ addHistoryToBSON(builder);
return builder.obj();
}
@@ -334,6 +385,16 @@ void ChunkType::setJumbo(bool jumbo) {
_jumbo = jumbo;
}
+void ChunkType::addHistoryToBSON(BSONObjBuilder& builder) const {
+ if (_history.size()) {
+ BSONArrayBuilder arrayBuilder(builder.subarrayStart(history.name()));
+ for (const auto& item : _history) {
+ BSONObjBuilder subObjBuilder(arrayBuilder.subobjStart());
+ item.serialize(&subObjBuilder);
+ }
+ }
+}
+
std::string ChunkType::genID(const NamespaceString& nss, const BSONObj& o) {
StringBuilder buf;
buf << nss.ns() << "-";
diff --git a/src/mongo/s/catalog/type_chunk.h b/src/mongo/s/catalog/type_chunk.h
index b4998af3c9a..139e2d1dfa9 100644
--- a/src/mongo/s/catalog/type_chunk.h
+++ b/src/mongo/s/catalog/type_chunk.h
@@ -33,6 +33,7 @@
#include "mongo/bson/bsonobj.h"
#include "mongo/db/namespace_string.h"
+#include "mongo/s/catalog/type_chunk_base_gen.h"
#include "mongo/s/chunk_version.h"
#include "mongo/s/shard_id.h"
@@ -104,6 +105,22 @@ private:
BSONObj _maxKey;
};
+class ChunkHistory : public ChunkHistoryBase {
+public:
+ ChunkHistory() : ChunkHistoryBase() {}
+ ChunkHistory(mongo::Timestamp ts, mongo::ShardId shard) : ChunkHistoryBase() {
+ setValidAfter(std::move(ts));
+ setShard(std::move(shard));
+ }
+ ChunkHistory(const ChunkHistoryBase& b) : ChunkHistoryBase(b) {}
+
+ static StatusWith<std::vector<ChunkHistory>> fromBSON(const BSONArray& source);
+
+ bool operator==(const ChunkHistory& other) const {
+ return getValidAfter() == other.getValidAfter() && getShard() == other.getShard();
+ }
+};
+
/**
* This class represents the layouts and contents of documents contained in the config server's
* config.chunks and shard server's config.chunks.uuid collections. All manipulation of documents
@@ -162,6 +179,7 @@ public:
static const BSONField<bool> jumbo;
static const BSONField<Date_t> lastmod;
static const BSONField<OID> epoch;
+ static const BSONField<BSONObj> history;
ChunkType();
ChunkType(NamespaceString nss, ChunkRange range, ChunkVersion version, ShardId shardId);
@@ -236,6 +254,15 @@ public:
}
void setJumbo(bool jumbo);
+ void setHistory(std::vector<ChunkHistory>&& history) {
+ _history = std::move(history);
+ }
+ const std::vector<ChunkHistory>& getHistory() const {
+ return _history;
+ }
+
+ void addHistoryToBSON(BSONObjBuilder& builder) const;
+
/**
* Generates chunk id based on the namespace name and the lower bound of the chunk.
*/
@@ -267,6 +294,8 @@ private:
boost::optional<ShardId> _shard;
// (O)(C) too big to move?
boost::optional<bool> _jumbo;
+ // history of the chunk
+ std::vector<ChunkHistory> _history;
};
} // namespace mongo
diff --git a/src/mongo/s/catalog/type_chunk_base.idl b/src/mongo/s/catalog/type_chunk_base.idl
new file mode 100644
index 00000000000..c1741a14f3f
--- /dev/null
+++ b/src/mongo/s/catalog/type_chunk_base.idl
@@ -0,0 +1,44 @@
+# Copyright (C) 2018 MongoDB Inc.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License, version 3,
+# as published by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+
+# ChunkTypeBase type
+
+global:
+ cpp_namespace: "mongo"
+ cpp_includes:
+ - "mongo/s/shard_id.h"
+
+imports:
+ - "mongo/idl/basic_types.idl"
+
+types:
+ shardid:
+ bson_serialization_type: string
+ description: "A BSON UTF-8 string"
+ cpp_type: "ShardId"
+ deserializer: "mongo::BSONElement::str"
+ serializer: "mongo::ShardId::toString"
+
+structs:
+ ChunkHistoryBase:
+ description: "An element of a chunk history array. The array holds the history ordered from
+ the most recent (the index 0) to the oldest supported."
+ fields:
+ validAfter:
+ type: timestamp
+ description: The time after which this chunk is at this shard.
+ shard:
+ type: shardid
+ description: Shard this chunk lives in, starting at the "validAfter" time.
diff --git a/src/mongo/s/chunk.cpp b/src/mongo/s/chunk.cpp
index 527440594eb..21067acd7e0 100644
--- a/src/mongo/s/chunk.cpp
+++ b/src/mongo/s/chunk.cpp
@@ -61,6 +61,7 @@ Chunk::Chunk(const ChunkType& from)
: _range(from.getMin(), from.getMax()),
_shardId(from.getShard()),
_lastmod(from.getVersion()),
+ _history(from.getHistory()),
_jumbo(from.getJumbo()),
_dataWritten(mkDataWritten()) {
invariantOK(from.validate());
diff --git a/src/mongo/s/chunk.h b/src/mongo/s/chunk.h
index 540238463b5..31205aefc25 100644
--- a/src/mongo/s/chunk.h
+++ b/src/mongo/s/chunk.h
@@ -98,6 +98,8 @@ private:
const ChunkVersion _lastmod;
+ const std::vector<ChunkHistory> _history;
+
// Indicates whether this chunk should be treated as jumbo and not attempted to be moved or
// split
mutable bool _jumbo;
diff --git a/src/mongo/s/request_types/commit_chunk_migration_request_test.cpp b/src/mongo/s/request_types/commit_chunk_migration_request_test.cpp
index 9c5f513ac60..afc887745f0 100644
--- a/src/mongo/s/request_types/commit_chunk_migration_request_test.cpp
+++ b/src/mongo/s/request_types/commit_chunk_migration_request_test.cpp
@@ -65,13 +65,16 @@ TEST(CommitChunkMigrationRequest, WithControlChunk) {
controlChunk.setMax(kKey3);
boost::optional<ChunkType> controlChunkOpt = controlChunk;
+ Timestamp validAfter{1};
+
CommitChunkMigrationRequest::appendAsCommand(&builder,
kNamespaceString,
kShardId0,
kShardId1,
migratedChunk,
controlChunkOpt,
- fromShardCollectionVersion);
+ fromShardCollectionVersion,
+ validAfter);
BSONObj cmdObj = builder.obj();
@@ -98,13 +101,16 @@ TEST(CommitChunkMigrationRequest, WithoutControlChunk) {
ChunkVersion fromShardCollectionVersion(1, 2, OID::gen());
+ Timestamp validAfter{1};
+
CommitChunkMigrationRequest::appendAsCommand(&builder,
kNamespaceString,
kShardId0,
kShardId1,
migratedChunk,
boost::none,
- fromShardCollectionVersion);
+ fromShardCollectionVersion,
+ validAfter);
BSONObj cmdObj = builder.obj();
diff --git a/src/mongo/s/request_types/commit_chunk_migration_request_type.cpp b/src/mongo/s/request_types/commit_chunk_migration_request_type.cpp
index 1f1125529cb..463ee0d4a92 100644
--- a/src/mongo/s/request_types/commit_chunk_migration_request_type.cpp
+++ b/src/mongo/s/request_types/commit_chunk_migration_request_type.cpp
@@ -41,6 +41,7 @@ const char kToShard[] = "toShard";
const char kMigratedChunk[] = "migratedChunk";
const char kControlChunk[] = "controlChunk";
const char kFromShardCollectionVersion[] = "fromShardCollectionVersion";
+const char kValidAfter[] = "validAfter";
/**
* Attempts to parse a (range-only!) ChunkType from "field" in "source".
@@ -132,6 +133,20 @@ StatusWith<CommitChunkMigrationRequest> CommitChunkMigrationRequest::createFromC
request._collectionEpoch = statusWithChunkVersion.getValue().epoch();
}
+ {
+ Timestamp validAfter;
+ auto status = bsonExtractTimestampField(obj, kValidAfter, &validAfter);
+ if (!status.isOK() && status != ErrorCodes::NoSuchKey) {
+ return status;
+ }
+
+ if (status.isOK()) {
+ request._validAfter = validAfter;
+ } else {
+ request._validAfter = boost::none;
+ }
+ }
+
return request;
}
@@ -141,7 +156,8 @@ void CommitChunkMigrationRequest::appendAsCommand(BSONObjBuilder* builder,
const ShardId& toShard,
const ChunkType& migratedChunk,
const boost::optional<ChunkType>& controlChunk,
- const ChunkVersion& fromShardCollectionVersion) {
+ const ChunkVersion& fromShardCollectionVersion,
+ const Timestamp& validAfter) {
invariant(builder->asTempObj().isEmpty());
invariant(nss.isValid());
@@ -154,6 +170,7 @@ void CommitChunkMigrationRequest::appendAsCommand(BSONObjBuilder* builder,
if (controlChunk) {
builder->append(kControlChunk, controlChunk->toConfigBSON());
}
+ builder->append(kValidAfter, validAfter);
}
} // namespace mongo
diff --git a/src/mongo/s/request_types/commit_chunk_migration_request_type.h b/src/mongo/s/request_types/commit_chunk_migration_request_type.h
index 919db71870d..f403572e46e 100644
--- a/src/mongo/s/request_types/commit_chunk_migration_request_type.h
+++ b/src/mongo/s/request_types/commit_chunk_migration_request_type.h
@@ -60,7 +60,8 @@ struct CommitChunkMigrationRequest {
const ShardId& toShard,
const ChunkType& migratedChunkType,
const boost::optional<ChunkType>& controlChunkType,
- const ChunkVersion& fromShardChunkVersion);
+ const ChunkVersion& fromShardChunkVersion,
+ const Timestamp& validAfter);
const NamespaceString& getNss() const {
return _nss;
@@ -80,6 +81,9 @@ struct CommitChunkMigrationRequest {
const OID& getCollectionEpoch() {
return _collectionEpoch;
}
+ const boost::optional<Timestamp>& getValidAfter() {
+ return _validAfter;
+ }
// The collection for which this request applies.
NamespaceString _nss;
@@ -97,6 +101,9 @@ struct CommitChunkMigrationRequest {
boost::optional<ChunkType> _controlChunk;
OID _collectionEpoch;
+
+ // The time of the move
+ boost::optional<Timestamp> _validAfter;
};
} // namespace mongo
diff --git a/src/mongo/s/request_types/merge_chunk_request_test.cpp b/src/mongo/s/request_types/merge_chunk_request_test.cpp
index de9ecd7990c..ce8b7366a9e 100644
--- a/src/mongo/s/request_types/merge_chunk_request_test.cpp
+++ b/src/mongo/s/request_types/merge_chunk_request_test.cpp
@@ -64,7 +64,9 @@ TEST(MergeChunkRequest, ConfigCommandtoBSON) {
<< "chunkBoundaries"
<< BSON_ARRAY(BSON("a" << 1) << BSON("a" << 5) << BSON("a" << 10))
<< "shard"
- << "shard0000");
+ << "shard0000"
+ << "validAfter"
+ << Timestamp{100});
BSONObj writeConcernObj = BSON("writeConcern" << BSON("w"
<< "majority"));
diff --git a/src/mongo/s/request_types/merge_chunk_request_type.cpp b/src/mongo/s/request_types/merge_chunk_request_type.cpp
index 64200b85460..abace857a99 100644
--- a/src/mongo/s/request_types/merge_chunk_request_type.cpp
+++ b/src/mongo/s/request_types/merge_chunk_request_type.cpp
@@ -40,17 +40,19 @@ const char kConfigsvrMergeChunk[] = "_configsvrCommitChunkMerge";
const char kCollEpoch[] = "collEpoch";
const char kChunkBoundaries[] = "chunkBoundaries";
const char kShardName[] = "shard";
-
+const char kValidAfter[] = "validAfter";
} // namespace
MergeChunkRequest::MergeChunkRequest(NamespaceString nss,
std::string shardName,
OID epoch,
- std::vector<BSONObj> chunkBoundaries)
+ std::vector<BSONObj> chunkBoundaries,
+ boost::optional<Timestamp> validAfter)
: _nss(std::move(nss)),
_epoch(std::move(epoch)),
_chunkBoundaries(std::move(chunkBoundaries)),
- _shardName(std::move(shardName)) {}
+ _shardName(std::move(shardName)),
+ _validAfter(validAfter) {}
StatusWith<MergeChunkRequest> MergeChunkRequest::parseFromConfigCommand(const BSONObj& cmdObj) {
std::string ns;
@@ -103,8 +105,24 @@ StatusWith<MergeChunkRequest> MergeChunkRequest::parseFromConfigCommand(const BS
}
}
- return MergeChunkRequest(
- std::move(nss), std::move(shardName), std::move(epoch), std::move(chunkBoundaries));
+ boost::optional<Timestamp> validAfter = boost::none;
+ {
+ Timestamp ts{0};
+ auto status = bsonExtractTimestampField(cmdObj, kValidAfter, &ts);
+ if (!status.isOK() && status != ErrorCodes::NoSuchKey) {
+ return status;
+ }
+
+ if (status.isOK()) {
+ validAfter = ts;
+ }
+ }
+
+ return MergeChunkRequest(std::move(nss),
+ std::move(shardName),
+ std::move(epoch),
+ std::move(chunkBoundaries),
+ validAfter);
}
BSONObj MergeChunkRequest::toConfigCommandBSON(const BSONObj& writeConcern) {
@@ -127,6 +145,8 @@ void MergeChunkRequest::appendAsConfigCommand(BSONObjBuilder* cmdBuilder) {
}
}
cmdBuilder->append(kShardName, _shardName);
+ invariant(_validAfter.is_initialized());
+ cmdBuilder->append(kValidAfter, _validAfter.get());
}
} // namespace mongo
diff --git a/src/mongo/s/request_types/merge_chunk_request_type.h b/src/mongo/s/request_types/merge_chunk_request_type.h
index 2ead5f3a9b1..2722c53b057 100644
--- a/src/mongo/s/request_types/merge_chunk_request_type.h
+++ b/src/mongo/s/request_types/merge_chunk_request_type.h
@@ -46,7 +46,8 @@ public:
MergeChunkRequest(NamespaceString nss,
std::string shardName,
OID epoch,
- std::vector<BSONObj> chunkBoundaries);
+ std::vector<BSONObj> chunkBoundaries,
+ boost::optional<Timestamp> validAfter);
/**
* Parses the provided BSON content as the internal _configsvrCommitChunkMerge command, and if
@@ -94,6 +95,10 @@ public:
return _shardName;
}
+ const boost::optional<Timestamp>& getValidAfter() const {
+ return _validAfter;
+ }
+
private:
NamespaceString _nss;
OID _epoch;
@@ -102,6 +107,8 @@ private:
std::vector<BSONObj> _chunkBoundaries;
std::string _shardName;
+
+ boost::optional<Timestamp> _validAfter;
};
} // namespace mongo