author    Martin Neupauer <martin.neupauer@mongodb.com>  2018-02-28 17:13:53 -0500
committer Martin Neupauer <martin.neupauer@mongodb.com>  2018-03-14 10:15:08 -0400
commit    5b42a86b74380953db29455dc68e133166c81005 (patch)
tree      9ffbeb25cce1ab78308bce7a086d22b90f1f07fe /src
parent    e8f97b7adb06f49c6092b8e3234e658386cd4471 (diff)
SERVER-33455 SERVER-33452 - Add history to the chunk type and update move/split/merge methods.
The ChunkType now carries an array of past placements, maintained by the sharding catalog manager. When a chunk is moved, a new entry is prepended to its history; a chunk split copies the history to the resulting chunks; and a merge drops any previous history and records only the latest entry.
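
(Editor's illustration, not part of the commit: a hedged sketch of these semantics using the ChunkHistory type introduced below; timestamps and shard names are hypothetical.)

    // Move: the new placement is prepended, so index 0 is always the current shard.
    std::vector<ChunkHistory> history{ChunkHistory(Timestamp(100, 0), ShardId("shard0"))};
    history.emplace(history.begin(), ChunkHistory(Timestamp(200, 0), ShardId("shard1")));

    // Split: each resulting chunk copies the parent's history verbatim.
    std::vector<ChunkHistory> leftHistory = history;
    std::vector<ChunkHistory> rightHistory = history;

    // Merge: prior history is discarded; only the merge time and owning shard remain.
    history = {ChunkHistory(Timestamp(300, 0), ShardId("shard1"))};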
Diffstat (limited to 'src')
-rw-r--r--src/mongo/db/s/config/configsvr_commit_chunk_migration_command.cpp3
-rw-r--r--src/mongo/db/s/config/configsvr_merge_chunk_command.cpp3
-rw-r--r--src/mongo/db/s/config/sharding_catalog_manager.h10
-rw-r--r--src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp99
-rw-r--r--src/mongo/db/s/config/sharding_catalog_manager_collection_operations.cpp7
-rw-r--r--src/mongo/db/s/config/sharding_catalog_manager_commit_chunk_migration_test.cpp145
-rw-r--r--src/mongo/db/s/config/sharding_catalog_manager_merge_chunks_test.cpp52
-rw-r--r--src/mongo/db/s/config/sharding_catalog_manager_split_chunk_test.cpp26
-rw-r--r--src/mongo/db/s/merge_chunks_command.cpp8
-rw-r--r--src/mongo/db/s/migration_source_manager.cpp17
-rw-r--r--src/mongo/s/SConscript1
-rw-r--r--src/mongo/s/catalog/type_chunk.cpp63
-rw-r--r--src/mongo/s/catalog/type_chunk.h29
-rw-r--r--src/mongo/s/catalog/type_chunk_base.idl44
-rw-r--r--src/mongo/s/chunk.cpp1
-rw-r--r--src/mongo/s/chunk.h2
-rw-r--r--src/mongo/s/request_types/commit_chunk_migration_request_test.cpp10
-rw-r--r--src/mongo/s/request_types/commit_chunk_migration_request_type.cpp19
-rw-r--r--src/mongo/s/request_types/commit_chunk_migration_request_type.h9
-rw-r--r--src/mongo/s/request_types/merge_chunk_request_test.cpp4
-rw-r--r--src/mongo/s/request_types/merge_chunk_request_type.cpp30
-rw-r--r--src/mongo/s/request_types/merge_chunk_request_type.h9
22 files changed, 548 insertions, 43 deletions
diff --git a/src/mongo/db/s/config/configsvr_commit_chunk_migration_command.cpp b/src/mongo/db/s/config/configsvr_commit_chunk_migration_command.cpp
index 28119fe682e..5756821d11e 100644
--- a/src/mongo/db/s/config/configsvr_commit_chunk_migration_command.cpp
+++ b/src/mongo/db/s/config/configsvr_commit_chunk_migration_command.cpp
@@ -134,7 +134,8 @@ public:
commitRequest.getControlChunk(),
commitRequest.getCollectionEpoch(),
commitRequest.getFromShard(),
- commitRequest.getToShard());
+ commitRequest.getToShard(),
+ commitRequest.getValidAfter());
if (!response.isOK()) {
return CommandHelpers::appendCommandStatus(result, response.getStatus());
}
diff --git a/src/mongo/db/s/config/configsvr_merge_chunk_command.cpp b/src/mongo/db/s/config/configsvr_merge_chunk_command.cpp
index 74d11ef2eb5..91c7fe9de31 100644
--- a/src/mongo/db/s/config/configsvr_merge_chunk_command.cpp
+++ b/src/mongo/db/s/config/configsvr_merge_chunk_command.cpp
@@ -114,7 +114,8 @@ public:
parsedRequest.getNamespace(),
parsedRequest.getEpoch(),
parsedRequest.getChunkBoundaries(),
- parsedRequest.getShardName());
+ parsedRequest.getShardName(),
+ parsedRequest.getValidAfter());
if (!mergeChunkResult.isOK()) {
return CommandHelpers::appendCommandStatus(result, mergeChunkResult);
diff --git a/src/mongo/db/s/config/sharding_catalog_manager.h b/src/mongo/db/s/config/sharding_catalog_manager.h
index 786b862f7ee..a244f701133 100644
--- a/src/mongo/db/s/config/sharding_catalog_manager.h
+++ b/src/mongo/db/s/config/sharding_catalog_manager.h
@@ -171,15 +171,20 @@ public:
/**
* Updates metadata in the config.chunks collection so the chunks with given boundaries are seen
* merged into a single larger chunk.
+ * If 'validAfter' is not set, this means the commit request came from an older server version,
+ * which is not history-aware.
*/
Status commitChunkMerge(OperationContext* opCtx,
const NamespaceString& nss,
const OID& requestEpoch,
const std::vector<BSONObj>& chunkBoundaries,
- const std::string& shardName);
+ const std::string& shardName,
+ const boost::optional<Timestamp>& validAfter);
/**
* Updates metadata in config.chunks collection to show the given chunk in its new shard.
+ * If 'validAfter' is not set, this means the commit request came from an older server version,
+ * which is not history-aware.
*/
StatusWith<BSONObj> commitChunkMigration(OperationContext* opCtx,
const NamespaceString& nss,
@@ -187,7 +192,8 @@ public:
const boost::optional<ChunkType>& controlChunk,
const OID& collectionEpoch,
const ShardId& fromShard,
- const ShardId& toShard);
+ const ShardId& toShard,
+ const boost::optional<Timestamp>& validAfter);
//
// Database Operations
diff --git a/src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp b/src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp
index ea34e097844..c4fc7ca563e 100644
--- a/src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp
+++ b/src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp
@@ -70,7 +70,8 @@ void appendShortVersion(BufBuilder* b, const ChunkType& chunk) {
}
BSONArray buildMergeChunksTransactionUpdates(const std::vector<ChunkType>& chunksToMerge,
- const ChunkVersion& mergeVersion) {
+ const ChunkVersion& mergeVersion,
+ const boost::optional<Timestamp>& validAfter) {
BSONArrayBuilder updates;
// Build an update operation to expand the first chunk into the newly merged chunk
@@ -87,6 +88,13 @@ BSONArray buildMergeChunksTransactionUpdates(const std::vector<ChunkType>& chunk
// fill in additional details for sending through transaction
mergedChunk.setVersion(mergeVersion);
+ // Replace the history: drop any previous entries and, if the request is
+ // history-aware ('validAfter' is set), record the merged chunk's current placement.
+ std::vector<ChunkHistory> history;
+ if (validAfter) {
+ history.emplace_back(ChunkHistory(validAfter.get(), mergedChunk.getShard()));
+ }
+ mergedChunk.setHistory(std::move(history));
+
// add the new chunk information as the update object
op.append("o", mergedChunk.toConfigBSON());
@@ -188,6 +196,7 @@ BSONObj makeCommitChunkTransactionCommand(const NamespaceString& nss,
n.append(ChunkType::min(), migratedChunk.getMin());
n.append(ChunkType::max(), migratedChunk.getMax());
n.append(ChunkType::shard(), toShard);
+ migratedChunk.addHistoryToBSON(n);
n.done();
BSONObjBuilder q(op.subobjStart("o2"));
@@ -275,6 +284,32 @@ Status ShardingCatalogManager::commitChunkSplit(OperationContext* opCtx,
return {ErrorCodes::StaleEpoch, errmsg};
}
+ // Find the chunk history.
+ auto findHistory = Grid::get(opCtx)->shardRegistry()->getConfigShard()->exhaustiveFindOnConfig(
+ opCtx,
+ ReadPreferenceSetting{ReadPreference::PrimaryOnly},
+ repl::ReadConcernLevel::kLocalReadConcern,
+ ChunkType::ConfigNS,
+ BSON(ChunkType::name << ChunkType::genID(nss, range.getMin())),
+ BSONObj(),
+ 1);
+ if (!findHistory.isOK()) {
+ return findHistory.getStatus();
+ }
+
+ const auto origChunks = std::move(findHistory.getValue().docs);
+ if (origChunks.size() != 1) {
+ return {ErrorCodes::IncompatibleShardingMetadata,
+ str::stream() << "Tried to find the chunk history for '"
+ << ChunkType::genID(nss, range.getMin())
+ << "', but found " << origChunks.size() << " chunks"};
+ }
+
+ const auto origChunk = ChunkType::fromConfigBSON(origChunks.front());
+ if (!origChunk.isOK()) {
+ return origChunk.getStatus();
+ }
+
std::vector<ChunkType> newChunks;
ChunkVersion currentMaxVersion = collVersion;
@@ -336,6 +371,7 @@ Status ShardingCatalogManager::commitChunkSplit(OperationContext* opCtx,
n.append(ChunkType::min(), startKey);
n.append(ChunkType::max(), endKey);
n.append(ChunkType::shard(), shardName);
+ origChunk.getValue().addHistoryToBSON(n);
n.done();
// add the chunk's _id as the query part of the update statement
@@ -431,7 +467,8 @@ Status ShardingCatalogManager::commitChunkMerge(OperationContext* opCtx,
const NamespaceString& nss,
const OID& requestEpoch,
const std::vector<BSONObj>& chunkBoundaries,
- const std::string& shardName) {
+ const std::string& shardName,
+ const boost::optional<Timestamp>& validAfter) {
// This method must never be called with empty chunks to merge
invariant(!chunkBoundaries.empty());
@@ -500,7 +537,7 @@ Status ShardingCatalogManager::commitChunkMerge(OperationContext* opCtx,
ChunkVersion mergeVersion = collVersion;
mergeVersion.incMinor();
- auto updates = buildMergeChunksTransactionUpdates(chunksToMerge, mergeVersion);
+ auto updates = buildMergeChunksTransactionUpdates(chunksToMerge, mergeVersion, validAfter);
auto preCond = buildMergeChunksTransactionPrecond(chunksToMerge, collVersion);
// apply the batch of updates to local metadata
@@ -542,7 +579,8 @@ StatusWith<BSONObj> ShardingCatalogManager::commitChunkMigration(
const boost::optional<ChunkType>& controlChunk,
const OID& collectionEpoch,
const ShardId& fromShard,
- const ShardId& toShard) {
+ const ShardId& toShard,
+ const boost::optional<Timestamp>& validAfter) {
auto const configShard = Grid::get(opCtx)->shardRegistry()->getConfigShard();
@@ -624,12 +662,65 @@ StatusWith<BSONObj> ShardingCatalogManager::commitChunkMigration(
}
}
+ // Find the chunk history.
+ auto findHistory = configShard->exhaustiveFindOnConfig(
+ opCtx,
+ ReadPreferenceSetting{ReadPreference::PrimaryOnly},
+ repl::ReadConcernLevel::kLocalReadConcern,
+ ChunkType::ConfigNS,
+ BSON(ChunkType::name << ChunkType::genID(nss, migratedChunk.getMin())),
+ BSONObj(),
+ 1);
+ if (!findHistory.isOK()) {
+ return findHistory.getStatus();
+ }
+
+ const auto origChunks = std::move(findHistory.getValue().docs);
+ if (origChunks.size() != 1) {
+ return {ErrorCodes::IncompatibleShardingMetadata,
+ str::stream() << "Tried to find the chunk history for '"
+ << ChunkType::genID(nss, migratedChunk.getMin())
+ << "', but found " << origChunks.size() << " chunks"};
+ }
+
+ const auto origChunk = ChunkType::fromConfigBSON(origChunks.front());
+ if (!origChunk.isOK()) {
+ return origChunk.getStatus();
+ }
+
// Generate the new versions of migratedChunk and controlChunk. Migrating chunk's minor version
// will be 0.
ChunkType newMigratedChunk = migratedChunk;
newMigratedChunk.setVersion(ChunkVersion(
currentCollectionVersion.majorVersion() + 1, 0, currentCollectionVersion.epoch()));
+ // Copy the complete history.
+ auto newHistory = origChunk.getValue().getHistory();
+ const int kHistorySecs = 10;
+
+ // Update the history of the migrated chunk.
+ if (validAfter) {
+ // Drop the history that is too old (10 seconds of history for now).
+ // TODO SERVER-33831 to update the old history removal policy.
+ while (!newHistory.empty() &&
+ newHistory.back().getValidAfter().getSecs() + kHistorySecs <
+ validAfter.get().getSecs()) {
+ newHistory.pop_back();
+ }
+
+ if (!newHistory.empty() && newHistory.front().getValidAfter() >= validAfter.get()) {
+ return {ErrorCodes::IncompatibleShardingMetadata,
+ str::stream() << "The chunk history for '"
+ << ChunkType::genID(nss, migratedChunk.getMin())
+ << "' is corrupted. The most recent validAfter "
+ << newHistory.front().getValidAfter().toString()
+ << " is greater than or equal to the new validAfter "
+ << validAfter.get().toString()};
+ }
+ newHistory.emplace(newHistory.begin(), ChunkHistory(validAfter.get(), toShard));
+ }
+ newMigratedChunk.setHistory(std::move(newHistory));
+
// Control chunk's minor version will be 1 (if control chunk is present).
boost::optional<ChunkType> newControlChunk = boost::none;
if (controlChunk) {
diff --git a/src/mongo/db/s/config/sharding_catalog_manager_collection_operations.cpp b/src/mongo/db/s/config/sharding_catalog_manager_collection_operations.cpp
index e990e35f7dd..802faba9f97 100644
--- a/src/mongo/db/s/config/sharding_catalog_manager_collection_operations.cpp
+++ b/src/mongo/db/s/config/sharding_catalog_manager_collection_operations.cpp
@@ -44,6 +44,7 @@
#include "mongo/db/catalog/collection_options.h"
#include "mongo/db/client.h"
#include "mongo/db/commands.h"
+#include "mongo/db/logical_clock.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/query/collation/collator_factory_interface.h"
@@ -274,6 +275,8 @@ ChunkVersion ShardingCatalogManager::_createFirstChunks(OperationContext* opCtx,
log() << "going to create " << splitPoints.size() + 1 << " chunk(s) for: " << nss
<< " using new epoch " << version.epoch();
+ const auto validAfter = LogicalClock::get(opCtx)->getClusterTime().asTimestamp();
+
for (unsigned i = 0; i <= splitPoints.size(); i++) {
const BSONObj min = (i == 0) ? keyPattern.globalMin() : splitPoints[i - 1];
const BSONObj max = (i < splitPoints.size()) ? splitPoints[i] : keyPattern.globalMax();
@@ -290,6 +293,10 @@ ChunkVersion ShardingCatalogManager::_createFirstChunks(OperationContext* opCtx,
chunk.setMax(max);
chunk.setShard(shardIds[i % shardIds.size()]);
chunk.setVersion(version);
+ // TODO SERVER-33781: write history only when the config server is in FCV 4.0.
+ std::vector<ChunkHistory> initialHistory;
+ initialHistory.emplace_back(ChunkHistory(validAfter, shardIds[i % shardIds.size()]));
+ chunk.setHistory(std::move(initialHistory));
uassertStatusOK(Grid::get(opCtx)->catalogClient()->insertConfigDocument(
opCtx,
diff --git a/src/mongo/db/s/config/sharding_catalog_manager_commit_chunk_migration_test.cpp b/src/mongo/db/s/config/sharding_catalog_manager_commit_chunk_migration_test.cpp
index 76340441d2c..dd1357e8b4a 100644
--- a/src/mongo/db/s/config/sharding_catalog_manager_commit_chunk_migration_test.cpp
+++ b/src/mongo/db/s/config/sharding_catalog_manager_commit_chunk_migration_test.cpp
@@ -64,6 +64,7 @@ TEST_F(CommitChunkMigrate, CheckCorrectOpsCommandWithCtl) {
chunk0.setNS(kNamespace);
chunk0.setVersion(origVersion);
chunk0.setShard(shard0.getName());
+ chunk0.setHistory({ChunkHistory(Timestamp(100, 0), ShardId("shardX"))});
// apportion
auto chunkMin = BSON("a" << 1);
@@ -86,6 +87,8 @@ TEST_F(CommitChunkMigrate, CheckCorrectOpsCommandWithCtl) {
ChunkType const& chunk0cref = chunk0;
ChunkType const& chunk1cref = chunk1;
+ Timestamp validAfter{101, 0};
+
StatusWith<BSONObj> resultBSON = ShardingCatalogManager::get(operationContext())
->commitChunkMigration(operationContext(),
chunk0.getNS(),
@@ -93,7 +96,8 @@ TEST_F(CommitChunkMigrate, CheckCorrectOpsCommandWithCtl) {
chunk1cref,
origVersion.epoch(),
ShardId(shard0.getName()),
- ShardId(shard1.getName()));
+ ShardId(shard1.getName()),
+ validAfter);
ASSERT_OK(resultBSON.getStatus());
@@ -111,6 +115,9 @@ TEST_F(CommitChunkMigrate, CheckCorrectOpsCommandWithCtl) {
auto chunkDoc0 = uassertStatusOK(getChunkDoc(operationContext(), chunkMin));
ASSERT_EQ("shard1", chunkDoc0.getShard().toString());
ASSERT_EQ(mver.getValue(), chunkDoc0.getVersion());
+ // The history should be updated.
+ ASSERT_EQ(2UL, chunkDoc0.getHistory().size());
+ ASSERT_EQ(validAfter, chunkDoc0.getHistory().front().getValidAfter());
auto chunkDoc1 = uassertStatusOK(getChunkDoc(operationContext(), chunkMax));
ASSERT_EQ("shard0", chunkDoc1.getShard().toString());
@@ -136,6 +143,7 @@ TEST_F(CommitChunkMigrate, CheckCorrectOpsCommandNoCtl) {
chunk0.setNS(kNamespace);
chunk0.setVersion(origVersion);
chunk0.setShard(shard0.getName());
+ chunk0.setHistory({ChunkHistory(Timestamp(100, 0), ShardId("shardX"))});
// apportion
auto chunkMin = BSON("a" << 1);
@@ -145,6 +153,8 @@ TEST_F(CommitChunkMigrate, CheckCorrectOpsCommandNoCtl) {
setupChunks({chunk0}).transitional_ignore();
+ Timestamp validAfter{101, 0};
+
StatusWith<BSONObj> resultBSON = ShardingCatalogManager::get(operationContext())
->commitChunkMigration(operationContext(),
chunk0.getNS(),
@@ -152,7 +162,8 @@ TEST_F(CommitChunkMigrate, CheckCorrectOpsCommandNoCtl) {
boost::none,
origVersion.epoch(),
ShardId(shard0.getName()),
- ShardId(shard1.getName()));
+ ShardId(shard1.getName()),
+ validAfter);
ASSERT_OK(resultBSON.getStatus());
@@ -169,6 +180,116 @@ TEST_F(CommitChunkMigrate, CheckCorrectOpsCommandNoCtl) {
auto chunkDoc0 = uassertStatusOK(getChunkDoc(operationContext(), chunkMin));
ASSERT_EQ("shard1", chunkDoc0.getShard().toString());
ASSERT_EQ(mver.getValue(), chunkDoc0.getVersion());
+ // The history should be updated.
+ ASSERT_EQ(2UL, chunkDoc0.getHistory().size());
+ ASSERT_EQ(validAfter, chunkDoc0.getHistory().front().getValidAfter());
+}
+
+TEST_F(CommitChunkMigrate, CheckCorrectOpsCommandNoCtlTrimHistory) {
+
+ ShardType shard0;
+ shard0.setName("shard0");
+ shard0.setHost("shard0:12");
+
+ ShardType shard1;
+ shard1.setName("shard1");
+ shard1.setHost("shard1:12");
+
+ setupShards({shard0, shard1}).transitional_ignore();
+
+ int origMajorVersion = 15;
+ auto const origVersion = ChunkVersion(origMajorVersion, 4, OID::gen());
+
+ ChunkType chunk0;
+ chunk0.setNS(kNamespace);
+ chunk0.setVersion(origVersion);
+ chunk0.setShard(shard0.getName());
+ chunk0.setHistory({ChunkHistory(Timestamp(100, 0), ShardId("shardX"))});
+
+ // apportion
+ auto chunkMin = BSON("a" << 1);
+ chunk0.setMin(chunkMin);
+ auto chunkMax = BSON("a" << 10);
+ chunk0.setMax(chunkMax);
+
+ setupChunks({chunk0}).transitional_ignore();
+
+ // Make the time distance from the last history entry large enough for it to be trimmed.
+ Timestamp validAfter{200, 0};
+
+ StatusWith<BSONObj> resultBSON = ShardingCatalogManager::get(operationContext())
+ ->commitChunkMigration(operationContext(),
+ chunk0.getNS(),
+ chunk0,
+ boost::none,
+ origVersion.epoch(),
+ ShardId(shard0.getName()),
+ ShardId(shard1.getName()),
+ validAfter);
+
+ ASSERT_OK(resultBSON.getStatus());
+
+ // Verify the version returned matches expected value.
+ BSONObj versions = resultBSON.getValue();
+ auto mver = ChunkVersion::parseFromBSONWithFieldForCommands(versions, "migratedChunkVersion");
+ ASSERT_OK(mver.getStatus());
+ ASSERT_EQ(ChunkVersion(origMajorVersion + 1, 0, origVersion.epoch()), mver.getValue());
+
+ auto cver = ChunkVersion::parseFromBSONWithFieldForCommands(versions, "controlChunkVersion");
+ ASSERT_NOT_OK(cver.getStatus());
+
+ // Verify the chunk ended up in the right shard, and version matches the value returned.
+ auto chunkDoc0 = uassertStatusOK(getChunkDoc(operationContext(), chunkMin));
+ ASSERT_EQ("shard1", chunkDoc0.getShard().toString());
+ ASSERT_EQ(mver.getValue(), chunkDoc0.getVersion());
+ // The history should be updated.
+ ASSERT_EQ(1UL, chunkDoc0.getHistory().size());
+ ASSERT_EQ(validAfter, chunkDoc0.getHistory().front().getValidAfter());
+}
+
+TEST_F(CommitChunkMigrate, RejectOutOfOrderHistory) {
+
+ ShardType shard0;
+ shard0.setName("shard0");
+ shard0.setHost("shard0:12");
+
+ ShardType shard1;
+ shard1.setName("shard1");
+ shard1.setHost("shard1:12");
+
+ setupShards({shard0, shard1}).transitional_ignore();
+
+ int origMajorVersion = 15;
+ auto const origVersion = ChunkVersion(origMajorVersion, 4, OID::gen());
+
+ ChunkType chunk0;
+ chunk0.setNS(kNamespace);
+ chunk0.setVersion(origVersion);
+ chunk0.setShard(shard0.getName());
+ chunk0.setHistory({ChunkHistory(Timestamp(100, 0), ShardId("shardX"))});
+
+ // apportion
+ auto chunkMin = BSON("a" << 1);
+ chunk0.setMin(chunkMin);
+ auto chunkMax = BSON("a" << 10);
+ chunk0.setMax(chunkMax);
+
+ setupChunks({chunk0}).transitional_ignore();
+
+ // Set the time to before the last history entry to trigger the failure.
+ Timestamp validAfter{99, 0};
+
+ StatusWith<BSONObj> resultBSON = ShardingCatalogManager::get(operationContext())
+ ->commitChunkMigration(operationContext(),
+ chunk0.getNS(),
+ chunk0,
+ boost::none,
+ origVersion.epoch(),
+ ShardId(shard0.getName()),
+ ShardId(shard1.getName()),
+ validAfter);
+
+ ASSERT_EQ(ErrorCodes::IncompatibleShardingMetadata, resultBSON.getStatus());
}
TEST_F(CommitChunkMigrate, RejectWrongCollectionEpoch0) {
@@ -208,6 +329,8 @@ TEST_F(CommitChunkMigrate, RejectWrongCollectionEpoch0) {
setupChunks({chunk0, chunk1}).transitional_ignore();
+ Timestamp validAfter{1};
+
StatusWith<BSONObj> resultBSON = ShardingCatalogManager::get(operationContext())
->commitChunkMigration(operationContext(),
chunk0.getNS(),
@@ -215,7 +338,8 @@ TEST_F(CommitChunkMigrate, RejectWrongCollectionEpoch0) {
chunk1,
OID::gen(),
ShardId(shard0.getName()),
- ShardId(shard1.getName()));
+ ShardId(shard1.getName()),
+ validAfter);
ASSERT_EQ(ErrorCodes::StaleEpoch, resultBSON.getStatus());
}
@@ -259,6 +383,8 @@ TEST_F(CommitChunkMigrate, RejectWrongCollectionEpoch1) {
// get version from the control chunk this time
setupChunks({chunk1, chunk0}).transitional_ignore();
+ Timestamp validAfter{1};
+
StatusWith<BSONObj> resultBSON = ShardingCatalogManager::get(operationContext())
->commitChunkMigration(operationContext(),
chunk0.getNS(),
@@ -266,7 +392,8 @@ TEST_F(CommitChunkMigrate, RejectWrongCollectionEpoch1) {
chunk1,
origVersion.epoch(),
ShardId(shard0.getName()),
- ShardId(shard1.getName()));
+ ShardId(shard1.getName()),
+ validAfter);
ASSERT_EQ(ErrorCodes::StaleEpoch, resultBSON.getStatus());
}
@@ -308,6 +435,8 @@ TEST_F(CommitChunkMigrate, RejectChunkMissing0) {
setupChunks({chunk1}).transitional_ignore();
+ Timestamp validAfter{1};
+
StatusWith<BSONObj> resultBSON = ShardingCatalogManager::get(operationContext())
->commitChunkMigration(operationContext(),
chunk0.getNS(),
@@ -315,7 +444,8 @@ TEST_F(CommitChunkMigrate, RejectChunkMissing0) {
chunk1,
origVersion.epoch(),
ShardId(shard0.getName()),
- ShardId(shard1.getName()));
+ ShardId(shard1.getName()),
+ validAfter);
ASSERT_EQ(40165, resultBSON.getStatus().code());
}
@@ -357,6 +487,8 @@ TEST_F(CommitChunkMigrate, RejectChunkMissing1) {
setupChunks({chunk0}).transitional_ignore();
+ Timestamp validAfter{1};
+
StatusWith<BSONObj> resultBSON = ShardingCatalogManager::get(operationContext())
->commitChunkMigration(operationContext(),
chunk0.getNS(),
@@ -364,7 +496,8 @@ TEST_F(CommitChunkMigrate, RejectChunkMissing1) {
chunk1,
origVersion.epoch(),
ShardId(shard0.getName()),
- ShardId(shard1.getName()));
+ ShardId(shard1.getName()),
+ validAfter);
ASSERT_EQ(40165, resultBSON.getStatus().code());
}
diff --git a/src/mongo/db/s/config/sharding_catalog_manager_merge_chunks_test.cpp b/src/mongo/db/s/config/sharding_catalog_manager_merge_chunks_test.cpp
index 3d115ec0a80..fee827a420f 100644
--- a/src/mongo/db/s/config/sharding_catalog_manager_merge_chunks_test.cpp
+++ b/src/mongo/db/s/config/sharding_catalog_manager_merge_chunks_test.cpp
@@ -67,12 +67,15 @@ TEST_F(MergeChunkTest, MergeExistingChunksCorrectlyShouldSucceed) {
setupChunks({chunk, chunk2}).transitional_ignore();
+ Timestamp validAfter{100, 0};
+
ASSERT_OK(ShardingCatalogManager::get(operationContext())
->commitChunkMerge(operationContext(),
NamespaceString("TestDB.TestColl"),
origVersion.epoch(),
chunkBoundaries,
- "shard0000"));
+ "shard0000",
+ validAfter));
auto findResponse = uassertStatusOK(
getConfigShard()->exhaustiveFindOnConfig(operationContext(),
@@ -98,6 +101,10 @@ TEST_F(MergeChunkTest, MergeExistingChunksCorrectlyShouldSucceed) {
ASSERT_EQ(origVersion.majorVersion(), mergedChunk.getVersion().majorVersion());
ASSERT_EQ(origVersion.minorVersion() + 1, mergedChunk.getVersion().minorVersion());
}
+
+ // Make sure history is there
+ ASSERT_EQ(1UL, mergedChunk.getHistory().size());
+ ASSERT_EQ(validAfter, mergedChunk.getHistory().front().getValidAfter());
}
TEST_F(MergeChunkTest, MergeSeveralChunksCorrectlyShouldSucceed) {
@@ -131,12 +138,15 @@ TEST_F(MergeChunkTest, MergeSeveralChunksCorrectlyShouldSucceed) {
setupChunks({chunk, chunk2, chunk3}).transitional_ignore();
+ Timestamp validAfter{100, 0};
+
ASSERT_OK(ShardingCatalogManager::get(operationContext())
->commitChunkMerge(operationContext(),
NamespaceString("TestDB.TestColl"),
origVersion.epoch(),
chunkBoundaries,
- "shard0000"));
+ "shard0000",
+ validAfter));
auto findResponse = uassertStatusOK(
getConfigShard()->exhaustiveFindOnConfig(operationContext(),
@@ -162,6 +172,10 @@ TEST_F(MergeChunkTest, MergeSeveralChunksCorrectlyShouldSucceed) {
ASSERT_EQ(origVersion.majorVersion(), mergedChunk.getVersion().majorVersion());
ASSERT_EQ(origVersion.minorVersion() + 1, mergedChunk.getVersion().minorVersion());
}
+
+ // Make sure history is there
+ ASSERT_EQ(1UL, mergedChunk.getHistory().size());
+ ASSERT_EQ(validAfter, mergedChunk.getHistory().front().getValidAfter());
}
TEST_F(MergeChunkTest, NewMergeShouldClaimHighestVersion) {
@@ -199,12 +213,15 @@ TEST_F(MergeChunkTest, NewMergeShouldClaimHighestVersion) {
setupChunks({chunk, chunk2, otherChunk}).transitional_ignore();
+ Timestamp validAfter{100, 0};
+
ASSERT_OK(ShardingCatalogManager::get(operationContext())
->commitChunkMerge(operationContext(),
NamespaceString("TestDB.TestColl"),
collEpoch,
chunkBoundaries,
- "shard0000"));
+ "shard0000",
+ validAfter));
auto findResponse = uassertStatusOK(
getConfigShard()->exhaustiveFindOnConfig(operationContext(),
@@ -230,6 +247,10 @@ TEST_F(MergeChunkTest, NewMergeShouldClaimHighestVersion) {
ASSERT_EQ(competingVersion.majorVersion(), mergedChunk.getVersion().majorVersion());
ASSERT_EQ(competingVersion.minorVersion() + 1, mergedChunk.getVersion().minorVersion());
}
+
+ // Make sure history is there
+ ASSERT_EQ(1UL, mergedChunk.getHistory().size());
+ ASSERT_EQ(validAfter, mergedChunk.getHistory().front().getValidAfter());
}
TEST_F(MergeChunkTest, MergeLeavesOtherChunksAlone) {
@@ -263,12 +284,15 @@ TEST_F(MergeChunkTest, MergeLeavesOtherChunksAlone) {
setupChunks({chunk, chunk2, otherChunk}).transitional_ignore();
+ Timestamp validAfter{1};
+
ASSERT_OK(ShardingCatalogManager::get(operationContext())
->commitChunkMerge(operationContext(),
NamespaceString("TestDB.TestColl"),
origVersion.epoch(),
chunkBoundaries,
- "shard0000"));
+ "shard0000",
+ validAfter));
auto findResponse = uassertStatusOK(
getConfigShard()->exhaustiveFindOnConfig(operationContext(),
@@ -326,12 +350,15 @@ TEST_F(MergeChunkTest, NonExistingNamespace) {
setupChunks({chunk, chunk2}).transitional_ignore();
+ Timestamp validAfter{1};
+
auto mergeStatus = ShardingCatalogManager::get(operationContext())
->commitChunkMerge(operationContext(),
NamespaceString("TestDB.NonExistingColl"),
origVersion.epoch(),
chunkBoundaries,
- "shard0000");
+ "shard0000",
+ validAfter);
ASSERT_EQ(ErrorCodes::IllegalOperation, mergeStatus);
}
@@ -360,12 +387,15 @@ TEST_F(MergeChunkTest, NonMatchingEpochsOfChunkAndRequestErrors) {
setupChunks({chunk, chunk2}).transitional_ignore();
+ Timestamp validAfter{1};
+
auto mergeStatus = ShardingCatalogManager::get(operationContext())
->commitChunkMerge(operationContext(),
NamespaceString("TestDB.TestColl"),
OID::gen(),
chunkBoundaries,
- "shard0000");
+ "shard0000",
+ validAfter);
ASSERT_EQ(ErrorCodes::StaleEpoch, mergeStatus);
}
@@ -400,13 +430,16 @@ TEST_F(MergeChunkTest, MergeAlreadyHappenedFailsPrecondition) {
setupChunks({mergedChunk}).transitional_ignore();
+ Timestamp validAfter{1};
+
ASSERT_EQ(ErrorCodes::BadValue,
ShardingCatalogManager::get(operationContext())
->commitChunkMerge(operationContext(),
NamespaceString("TestDB.TestColl"),
origVersion.epoch(),
chunkBoundaries,
- "shard0000"));
+ "shard0000",
+ validAfter));
// Verify that no change to config.chunks happened.
auto findResponse = uassertStatusOK(
@@ -461,13 +494,16 @@ TEST_F(MergeChunkTest, ChunkBoundariesOutOfOrderFails) {
setupChunks(originalChunks).transitional_ignore();
}
+ Timestamp validAfter{1};
+
ASSERT_EQ(ErrorCodes::InvalidOptions,
ShardingCatalogManager::get(operationContext())
->commitChunkMerge(operationContext(),
NamespaceString("TestDB.TestColl"),
epoch,
chunkBoundaries,
- "shard0000"));
+ "shard0000",
+ validAfter));
}
} // namespace
diff --git a/src/mongo/db/s/config/sharding_catalog_manager_split_chunk_test.cpp b/src/mongo/db/s/config/sharding_catalog_manager_split_chunk_test.cpp
index 31d3a794d41..932bc374745 100644
--- a/src/mongo/db/s/config/sharding_catalog_manager_split_chunk_test.cpp
+++ b/src/mongo/db/s/config/sharding_catalog_manager_split_chunk_test.cpp
@@ -53,6 +53,8 @@ TEST_F(SplitChunkTest, SplitExistingChunkCorrectlyShouldSucceed) {
auto chunkMax = BSON("a" << 10);
chunk.setMin(chunkMin);
chunk.setMax(chunkMax);
+ chunk.setHistory({ChunkHistory(Timestamp(100, 0), ShardId("shardX")),
+ ChunkHistory(Timestamp(90, 0), ShardId("shardY"))});
auto chunkSplitPoint = BSON("a" << 5);
std::vector<BSONObj> splitPoints{chunkSplitPoint};
@@ -78,6 +80,9 @@ TEST_F(SplitChunkTest, SplitExistingChunkCorrectlyShouldSucceed) {
ASSERT_EQ(origVersion.majorVersion(), chunkDoc.getVersion().majorVersion());
ASSERT_EQ(origVersion.minorVersion() + 1, chunkDoc.getVersion().minorVersion());
+ // Make sure the history is there
+ ASSERT_EQ(2UL, chunkDoc.getHistory().size());
+
// Second chunkDoc should have range [chunkSplitPoint, chunkMax]
auto otherChunkDocStatus = getChunkDoc(operationContext(), chunkSplitPoint);
ASSERT_OK(otherChunkDocStatus.getStatus());
@@ -88,6 +93,12 @@ TEST_F(SplitChunkTest, SplitExistingChunkCorrectlyShouldSucceed) {
// Check for increment on second chunkDoc's minor version
ASSERT_EQ(origVersion.majorVersion(), otherChunkDoc.getVersion().majorVersion());
ASSERT_EQ(origVersion.minorVersion() + 2, otherChunkDoc.getVersion().minorVersion());
+
+ // Make sure the history is there
+ ASSERT_EQ(2UL, otherChunkDoc.getHistory().size());
+
+ // Both chunks should have the same history
+ ASSERT(chunkDoc.getHistory() == otherChunkDoc.getHistory());
}
TEST_F(SplitChunkTest, MultipleSplitsOnExistingChunkShouldSucceed) {
@@ -102,6 +113,8 @@ TEST_F(SplitChunkTest, MultipleSplitsOnExistingChunkShouldSucceed) {
auto chunkMax = BSON("a" << 10);
chunk.setMin(chunkMin);
chunk.setMax(chunkMax);
+ chunk.setHistory({ChunkHistory(Timestamp(100, 0), ShardId("shardX")),
+ ChunkHistory(Timestamp(90, 0), ShardId("shardY"))});
auto chunkSplitPoint = BSON("a" << 5);
auto chunkSplitPoint2 = BSON("a" << 7);
@@ -128,6 +141,9 @@ TEST_F(SplitChunkTest, MultipleSplitsOnExistingChunkShouldSucceed) {
ASSERT_EQ(origVersion.majorVersion(), chunkDoc.getVersion().majorVersion());
ASSERT_EQ(origVersion.minorVersion() + 1, chunkDoc.getVersion().minorVersion());
+ // Make sure the history is there
+ ASSERT_EQ(2UL, chunkDoc.getHistory().size());
+
// Second chunkDoc should have range [chunkSplitPoint, chunkSplitPoint2]
auto midChunkDocStatus = getChunkDoc(operationContext(), chunkSplitPoint);
ASSERT_OK(midChunkDocStatus.getStatus());
@@ -139,6 +155,9 @@ TEST_F(SplitChunkTest, MultipleSplitsOnExistingChunkShouldSucceed) {
ASSERT_EQ(origVersion.majorVersion(), midChunkDoc.getVersion().majorVersion());
ASSERT_EQ(origVersion.minorVersion() + 2, midChunkDoc.getVersion().minorVersion());
+ // Make sure the history is there
+ ASSERT_EQ(2UL, midChunkDoc.getHistory().size());
+
// Third chunkDoc should have range [chunkSplitPoint2, chunkMax]
auto lastChunkDocStatus = getChunkDoc(operationContext(), chunkSplitPoint2);
ASSERT_OK(lastChunkDocStatus.getStatus());
@@ -149,6 +168,13 @@ TEST_F(SplitChunkTest, MultipleSplitsOnExistingChunkShouldSucceed) {
// Check for increment on third chunkDoc's minor version
ASSERT_EQ(origVersion.majorVersion(), lastChunkDoc.getVersion().majorVersion());
ASSERT_EQ(origVersion.minorVersion() + 3, lastChunkDoc.getVersion().minorVersion());
+
+ // Make sure the history is there
+ ASSERT_EQ(2UL, lastChunkDoc.getHistory().size());
+
+ // All three chunks should have the same history
+ ASSERT(chunkDoc.getHistory() == midChunkDoc.getHistory());
+ ASSERT(midChunkDoc.getHistory() == lastChunkDoc.getHistory());
}
TEST_F(SplitChunkTest, NewSplitShouldClaimHighestVersion) {
diff --git a/src/mongo/db/s/merge_chunks_command.cpp b/src/mongo/db/s/merge_chunks_command.cpp
index 0458815e990..7d523de8854 100644
--- a/src/mongo/db/s/merge_chunks_command.cpp
+++ b/src/mongo/db/s/merge_chunks_command.cpp
@@ -36,6 +36,7 @@
#include "mongo/db/catalog_raii.h"
#include "mongo/db/commands.h"
#include "mongo/db/field_parser.h"
+#include "mongo/db/logical_clock.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/s/collection_sharding_state.h"
#include "mongo/db/s/shard_filtering_metadata_refresh.h"
@@ -240,8 +241,11 @@ Status mergeChunks(OperationContext* opCtx,
//
// Run _configsvrCommitChunkMerge.
//
- MergeChunkRequest request{
- nss, shardingState->getShardName(), shardVersion.epoch(), chunkBoundaries};
+ MergeChunkRequest request{nss,
+ shardingState->getShardName(),
+ shardVersion.epoch(),
+ chunkBoundaries,
+ LogicalClock::get(opCtx)->getClusterTime().asTimestamp()};
auto configCmdObj =
request.toConfigCommandBSON(ShardingCatalogClient::kMajorityWriteConcern.toBSON());
diff --git a/src/mongo/db/s/migration_source_manager.cpp b/src/mongo/db/s/migration_source_manager.cpp
index 8f113193f1e..bb0dc85c807 100644
--- a/src/mongo/db/s/migration_source_manager.cpp
+++ b/src/mongo/db/s/migration_source_manager.cpp
@@ -35,6 +35,7 @@
#include "mongo/bson/bsonobjbuilder.h"
#include "mongo/db/catalog_raii.h"
#include "mongo/db/concurrency/write_conflict_exception.h"
+#include "mongo/db/logical_clock.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/repl/replication_coordinator.h"
#include "mongo/db/s/migration_chunk_cloner_source_legacy.h"
@@ -408,13 +409,15 @@ Status MigrationSourceManager::commitChunkMetadataOnConfig(OperationContext* opC
migratedChunkType.setMin(_args.getMinKey());
migratedChunkType.setMax(_args.getMaxKey());
- CommitChunkMigrationRequest::appendAsCommand(&builder,
- getNss(),
- _args.getFromShardId(),
- _args.getToShardId(),
- migratedChunkType,
- controlChunkType,
- metadata->getCollVersion());
+ CommitChunkMigrationRequest::appendAsCommand(
+ &builder,
+ getNss(),
+ _args.getFromShardId(),
+ _args.getToShardId(),
+ migratedChunkType,
+ controlChunkType,
+ metadata->getCollVersion(),
+ LogicalClock::get(opCtx)->getClusterTime().asTimestamp());
builder.append(kWriteConcernField, kMajorityWriteConcern.toBSON());
}
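
(Editor's note: the sender side stamps 'validAfter' with the current cluster time, the same one-liner used in merge_chunks_command.cpp above; restated here as a hedged sketch, assuming only the LogicalClock API this commit already includes.)

    // The shard proposes the current cluster time as the chunk's new validAfter;
    // the config server then validates it against the stored history before committing.
    const Timestamp validAfter = LogicalClock::get(opCtx)->getClusterTime().asTimestamp();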
diff --git a/src/mongo/s/SConscript b/src/mongo/s/SConscript
index 6b668e0c4ca..59aeadf2017 100644
--- a/src/mongo/s/SConscript
+++ b/src/mongo/s/SConscript
@@ -130,6 +130,7 @@ env.Library(
'request_types/update_zone_key_range_request_type.cpp',
'shard_id.cpp',
'versioning.cpp',
+ env.Idlc('catalog/type_chunk_base.idl')[0],
env.Idlc('request_types/create_collection.idl')[0],
env.Idlc('request_types/create_database.idl')[0],
env.Idlc('request_types/flush_routing_table_cache_updates.idl')[0],
diff --git a/src/mongo/s/catalog/type_chunk.cpp b/src/mongo/s/catalog/type_chunk.cpp
index 58a181240a9..5aa3132a78e 100644
--- a/src/mongo/s/catalog/type_chunk.cpp
+++ b/src/mongo/s/catalog/type_chunk.cpp
@@ -54,6 +54,7 @@ const BSONField<std::string> ChunkType::shard("shard");
const BSONField<bool> ChunkType::jumbo("jumbo");
const BSONField<Date_t> ChunkType::lastmod("lastmod");
const BSONField<OID> ChunkType::epoch("lastmodEpoch");
+const BSONField<BSONObj> ChunkType::history("history");
namespace {
@@ -157,6 +158,23 @@ ChunkRange ChunkRange::unionWith(ChunkRange const& other) const {
le(_maxKey, other._maxKey) ? other._maxKey : _maxKey);
}
+StatusWith<std::vector<ChunkHistory>> ChunkHistory::fromBSON(const BSONArray& source) {
+ std::vector<ChunkHistory> values;
+
+ for (const auto& arrayElement : source) {
+ if (arrayElement.type() == Object) {
+ IDLParserErrorContext tempContext("chunk history array");
+ values.emplace_back(ChunkHistoryBase::parse(tempContext, arrayElement.Obj()));
+ } else {
+ return {ErrorCodes::BadValue,
+ str::stream() << "array element does not have the object type: "
+ << arrayElement.type()};
+ }
+ }
+
+ return values;
+}
+
// ChunkType
ChunkType::ChunkType() = default;
@@ -217,6 +235,22 @@ StatusWith<ChunkType> ChunkType::fromConfigBSON(const BSONObj& source) {
chunk._version = std::move(versionStatus.getValue());
}
+ {
+ BSONElement historyObj;
+ Status status = bsonExtractTypedField(source, history.name(), Array, &historyObj);
+ if (status.isOK()) {
+ auto history = ChunkHistory::fromBSON(BSONArray(historyObj.Obj()));
+ if (!history.isOK())
+ return history.getStatus();
+
+ chunk._history = std::move(history.getValue());
+ } else if (status == ErrorCodes::NoSuchKey) {
+ // History is missing, so it will be presumed empty
+ } else {
+ return status;
+ }
+ }
+
return chunk;
}
@@ -236,7 +270,7 @@ BSONObj ChunkType::toConfigBSON() const {
_version->appendForChunk(&builder);
if (_jumbo)
builder.append(jumbo.name(), getJumbo());
-
+ addHistoryToBSON(builder);
return builder.obj();
}
@@ -283,6 +317,22 @@ StatusWith<ChunkType> ChunkType::fromShardBSON(const BSONObj& source, const OID&
chunk._version = std::move(statusWithChunkVersion.getValue());
}
+ {
+ BSONElement historyObj;
+ Status status = bsonExtractTypedField(source, history.name(), Array, &historyObj);
+ if (status.isOK()) {
+ auto history = ChunkHistory::fromBSON(BSONArray(historyObj.Obj()));
+ if (!history.isOK())
+ return history.getStatus();
+
+ chunk._history = std::move(history.getValue());
+ } else if (status == ErrorCodes::NoSuchKey) {
+ // History is missing, so it will be presumed empty
+ } else {
+ return status;
+ }
+ }
+
return chunk;
}
@@ -296,6 +346,7 @@ BSONObj ChunkType::toShardBSON() const {
builder.append(max.name(), getMax());
builder.append(shard.name(), getShard().toString());
builder.appendTimestamp(lastmod.name(), _version->toLong());
+ addHistoryToBSON(builder);
return builder.obj();
}
@@ -334,6 +385,16 @@ void ChunkType::setJumbo(bool jumbo) {
_jumbo = jumbo;
}
+void ChunkType::addHistoryToBSON(BSONObjBuilder& builder) const {
+ if (!_history.empty()) {
+ BSONArrayBuilder arrayBuilder(builder.subarrayStart(history.name()));
+ for (const auto& item : _history) {
+ BSONObjBuilder subObjBuilder(arrayBuilder.subobjStart());
+ item.serialize(&subObjBuilder);
+ }
+ }
+}
+
std::string ChunkType::genID(const NamespaceString& nss, const BSONObj& o) {
StringBuilder buf;
buf << nss.ns() << "-";
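
(Editor's note: a hedged round-trip sketch of the new history serialization helpers, not part of the commit. Because addHistoryToBSON() emits nothing for an empty history, pre-upgrade chunk documents parse unchanged.)

    // Serialize a chunk's history and parse it back.
    ChunkType chunk;
    chunk.setHistory({ChunkHistory(Timestamp(200, 0), ShardId("shard1")),
                      ChunkHistory(Timestamp(100, 0), ShardId("shard0"))});

    BSONObjBuilder builder;
    chunk.addHistoryToBSON(builder);  // appends: history: [{validAfter: ..., shard: ...}, ...]
    const BSONObj doc = builder.obj();

    auto parsed = ChunkHistory::fromBSON(BSONArray(doc["history"].Obj()));
    invariant(parsed.isOK() && parsed.getValue() == chunk.getHistory());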
diff --git a/src/mongo/s/catalog/type_chunk.h b/src/mongo/s/catalog/type_chunk.h
index b4998af3c9a..139e2d1dfa9 100644
--- a/src/mongo/s/catalog/type_chunk.h
+++ b/src/mongo/s/catalog/type_chunk.h
@@ -33,6 +33,7 @@
#include "mongo/bson/bsonobj.h"
#include "mongo/db/namespace_string.h"
+#include "mongo/s/catalog/type_chunk_base_gen.h"
#include "mongo/s/chunk_version.h"
#include "mongo/s/shard_id.h"
@@ -104,6 +105,22 @@ private:
BSONObj _maxKey;
};
+class ChunkHistory : public ChunkHistoryBase {
+public:
+ ChunkHistory() : ChunkHistoryBase() {}
+ ChunkHistory(mongo::Timestamp ts, mongo::ShardId shard) : ChunkHistoryBase() {
+ setValidAfter(std::move(ts));
+ setShard(std::move(shard));
+ }
+ ChunkHistory(const ChunkHistoryBase& b) : ChunkHistoryBase(b) {}
+
+ static StatusWith<std::vector<ChunkHistory>> fromBSON(const BSONArray& source);
+
+ bool operator==(const ChunkHistory& other) const {
+ return getValidAfter() == other.getValidAfter() && getShard() == other.getShard();
+ }
+};
+
/**
* This class represents the layouts and contents of documents contained in the config server's
* config.chunks and shard server's config.chunks.uuid collections. All manipulation of documents
@@ -162,6 +179,7 @@ public:
static const BSONField<bool> jumbo;
static const BSONField<Date_t> lastmod;
static const BSONField<OID> epoch;
+ static const BSONField<BSONObj> history;
ChunkType();
ChunkType(NamespaceString nss, ChunkRange range, ChunkVersion version, ShardId shardId);
@@ -236,6 +254,15 @@ public:
}
void setJumbo(bool jumbo);
+ void setHistory(std::vector<ChunkHistory>&& history) {
+ _history = std::move(history);
+ }
+ const std::vector<ChunkHistory>& getHistory() const {
+ return _history;
+ }
+
+ void addHistoryToBSON(BSONObjBuilder& builder) const;
+
/**
* Generates chunk id based on the namespace name and the lower bound of the chunk.
*/
@@ -267,6 +294,8 @@ private:
boost::optional<ShardId> _shard;
// (O)(C) too big to move?
boost::optional<bool> _jumbo;
+ // history of the chunk
+ std::vector<ChunkHistory> _history;
};
} // namespace mongo
diff --git a/src/mongo/s/catalog/type_chunk_base.idl b/src/mongo/s/catalog/type_chunk_base.idl
new file mode 100644
index 00000000000..c1741a14f3f
--- /dev/null
+++ b/src/mongo/s/catalog/type_chunk_base.idl
@@ -0,0 +1,44 @@
+# Copyright (C) 2018 MongoDB Inc.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License, version 3,
+# as published by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+
+# ChunkTypeBase type
+
+global:
+ cpp_namespace: "mongo"
+ cpp_includes:
+ - "mongo/s/shard_id.h"
+
+imports:
+ - "mongo/idl/basic_types.idl"
+
+types:
+ shardid:
+ bson_serialization_type: string
+ description: "A BSON UTF-8 string"
+ cpp_type: "ShardId"
+ deserializer: "mongo::BSONElement::str"
+ serializer: "mongo::ShardId::toString"
+
+structs:
+ ChunkHistoryBase:
+ description: "An element of a chunk history array. The array holds the history ordered from
+ the most recent (index 0) to the oldest retained entry."
+ fields:
+ validAfter:
+ type: timestamp
+ description: The time after which this chunk is at this shard.
+ shard:
+ type: shardid
+ description: Shard this chunk lives in, starting at the "validAfter" time.
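
(Editor's note: the IDL compiler turns this spec into a generated ChunkHistoryBase class; a hedged sketch of its use, with the accessor and parse names exercised elsewhere in this commit.)

    // Build, serialize, and re-parse one history element via the generated base class.
    ChunkHistoryBase entry;
    entry.setValidAfter(Timestamp(100, 0));
    entry.setShard(ShardId("shard0"));

    BSONObjBuilder bob;
    entry.serialize(&bob);  // {validAfter: Timestamp(100, 0), shard: "shard0"}
    const BSONObj obj = bob.obj();

    IDLParserErrorContext ctx("chunk history array");
    ChunkHistoryBase roundTrip = ChunkHistoryBase::parse(ctx, obj);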
diff --git a/src/mongo/s/chunk.cpp b/src/mongo/s/chunk.cpp
index 527440594eb..21067acd7e0 100644
--- a/src/mongo/s/chunk.cpp
+++ b/src/mongo/s/chunk.cpp
@@ -61,6 +61,7 @@ Chunk::Chunk(const ChunkType& from)
: _range(from.getMin(), from.getMax()),
_shardId(from.getShard()),
_lastmod(from.getVersion()),
+ _history(from.getHistory()),
_jumbo(from.getJumbo()),
_dataWritten(mkDataWritten()) {
invariantOK(from.validate());
diff --git a/src/mongo/s/chunk.h b/src/mongo/s/chunk.h
index 540238463b5..31205aefc25 100644
--- a/src/mongo/s/chunk.h
+++ b/src/mongo/s/chunk.h
@@ -98,6 +98,8 @@ private:
const ChunkVersion _lastmod;
+ const std::vector<ChunkHistory> _history;
+
// Indicates whether this chunk should be treated as jumbo and not attempted to be moved or
// split
mutable bool _jumbo;
diff --git a/src/mongo/s/request_types/commit_chunk_migration_request_test.cpp b/src/mongo/s/request_types/commit_chunk_migration_request_test.cpp
index 9c5f513ac60..afc887745f0 100644
--- a/src/mongo/s/request_types/commit_chunk_migration_request_test.cpp
+++ b/src/mongo/s/request_types/commit_chunk_migration_request_test.cpp
@@ -65,13 +65,16 @@ TEST(CommitChunkMigrationRequest, WithControlChunk) {
controlChunk.setMax(kKey3);
boost::optional<ChunkType> controlChunkOpt = controlChunk;
+ Timestamp validAfter{1};
+
CommitChunkMigrationRequest::appendAsCommand(&builder,
kNamespaceString,
kShardId0,
kShardId1,
migratedChunk,
controlChunkOpt,
- fromShardCollectionVersion);
+ fromShardCollectionVersion,
+ validAfter);
BSONObj cmdObj = builder.obj();
@@ -98,13 +101,16 @@ TEST(CommitChunkMigrationRequest, WithoutControlChunk) {
ChunkVersion fromShardCollectionVersion(1, 2, OID::gen());
+ Timestamp validAfter{1};
+
CommitChunkMigrationRequest::appendAsCommand(&builder,
kNamespaceString,
kShardId0,
kShardId1,
migratedChunk,
boost::none,
- fromShardCollectionVersion);
+ fromShardCollectionVersion,
+ validAfter);
BSONObj cmdObj = builder.obj();
diff --git a/src/mongo/s/request_types/commit_chunk_migration_request_type.cpp b/src/mongo/s/request_types/commit_chunk_migration_request_type.cpp
index 1f1125529cb..463ee0d4a92 100644
--- a/src/mongo/s/request_types/commit_chunk_migration_request_type.cpp
+++ b/src/mongo/s/request_types/commit_chunk_migration_request_type.cpp
@@ -41,6 +41,7 @@ const char kToShard[] = "toShard";
const char kMigratedChunk[] = "migratedChunk";
const char kControlChunk[] = "controlChunk";
const char kFromShardCollectionVersion[] = "fromShardCollectionVersion";
+const char kValidAfter[] = "validAfter";
/**
* Attempts to parse a (range-only!) ChunkType from "field" in "source".
@@ -132,6 +133,20 @@ StatusWith<CommitChunkMigrationRequest> CommitChunkMigrationRequest::createFromC
request._collectionEpoch = statusWithChunkVersion.getValue().epoch();
}
+ {
+ Timestamp validAfter;
+ auto status = bsonExtractTimestampField(obj, kValidAfter, &validAfter);
+ if (!status.isOK() && status != ErrorCodes::NoSuchKey) {
+ return status;
+ }
+
+ if (status.isOK()) {
+ request._validAfter = validAfter;
+ } else {
+ request._validAfter = boost::none;
+ }
+ }
+
return request;
}
@@ -141,7 +156,8 @@ void CommitChunkMigrationRequest::appendAsCommand(BSONObjBuilder* builder,
const ShardId& toShard,
const ChunkType& migratedChunk,
const boost::optional<ChunkType>& controlChunk,
- const ChunkVersion& fromShardCollectionVersion) {
+ const ChunkVersion& fromShardCollectionVersion,
+ const Timestamp& validAfter) {
invariant(builder->asTempObj().isEmpty());
invariant(nss.isValid());
@@ -154,6 +170,7 @@ void CommitChunkMigrationRequest::appendAsCommand(BSONObjBuilder* builder,
if (controlChunk) {
builder->append(kControlChunk, controlChunk->toConfigBSON());
}
+ builder->append(kValidAfter, validAfter);
}
} // namespace mongo
diff --git a/src/mongo/s/request_types/commit_chunk_migration_request_type.h b/src/mongo/s/request_types/commit_chunk_migration_request_type.h
index 919db71870d..f403572e46e 100644
--- a/src/mongo/s/request_types/commit_chunk_migration_request_type.h
+++ b/src/mongo/s/request_types/commit_chunk_migration_request_type.h
@@ -60,7 +60,8 @@ struct CommitChunkMigrationRequest {
const ShardId& toShard,
const ChunkType& migratedChunkType,
const boost::optional<ChunkType>& controlChunkType,
- const ChunkVersion& fromShardChunkVersion);
+ const ChunkVersion& fromShardChunkVersion,
+ const Timestamp& validAfter);
const NamespaceString& getNss() const {
return _nss;
@@ -80,6 +81,9 @@ struct CommitChunkMigrationRequest {
const OID& getCollectionEpoch() {
return _collectionEpoch;
}
+ const boost::optional<Timestamp>& getValidAfter() {
+ return _validAfter;
+ }
// The collection for which this request applies.
NamespaceString _nss;
@@ -97,6 +101,9 @@ struct CommitChunkMigrationRequest {
boost::optional<ChunkType> _controlChunk;
OID _collectionEpoch;
+
+ // The time of the move
+ boost::optional<Timestamp> _validAfter;
};
} // namespace mongo
diff --git a/src/mongo/s/request_types/merge_chunk_request_test.cpp b/src/mongo/s/request_types/merge_chunk_request_test.cpp
index de9ecd7990c..ce8b7366a9e 100644
--- a/src/mongo/s/request_types/merge_chunk_request_test.cpp
+++ b/src/mongo/s/request_types/merge_chunk_request_test.cpp
@@ -64,7 +64,9 @@ TEST(MergeChunkRequest, ConfigCommandtoBSON) {
<< "chunkBoundaries"
<< BSON_ARRAY(BSON("a" << 1) << BSON("a" << 5) << BSON("a" << 10))
<< "shard"
- << "shard0000");
+ << "shard0000"
+ << "validAfter"
+ << Timestamp{100});
BSONObj writeConcernObj = BSON("writeConcern" << BSON("w"
<< "majority"));
diff --git a/src/mongo/s/request_types/merge_chunk_request_type.cpp b/src/mongo/s/request_types/merge_chunk_request_type.cpp
index 64200b85460..abace857a99 100644
--- a/src/mongo/s/request_types/merge_chunk_request_type.cpp
+++ b/src/mongo/s/request_types/merge_chunk_request_type.cpp
@@ -40,17 +40,19 @@ const char kConfigsvrMergeChunk[] = "_configsvrCommitChunkMerge";
const char kCollEpoch[] = "collEpoch";
const char kChunkBoundaries[] = "chunkBoundaries";
const char kShardName[] = "shard";
-
+const char kValidAfter[] = "validAfter";
} // namespace
MergeChunkRequest::MergeChunkRequest(NamespaceString nss,
std::string shardName,
OID epoch,
- std::vector<BSONObj> chunkBoundaries)
+ std::vector<BSONObj> chunkBoundaries,
+ boost::optional<Timestamp> validAfter)
: _nss(std::move(nss)),
_epoch(std::move(epoch)),
_chunkBoundaries(std::move(chunkBoundaries)),
- _shardName(std::move(shardName)) {}
+ _shardName(std::move(shardName)),
+ _validAfter(validAfter) {}
StatusWith<MergeChunkRequest> MergeChunkRequest::parseFromConfigCommand(const BSONObj& cmdObj) {
std::string ns;
@@ -103,8 +105,24 @@ StatusWith<MergeChunkRequest> MergeChunkRequest::parseFromConfigCommand(const BS
}
}
- return MergeChunkRequest(
- std::move(nss), std::move(shardName), std::move(epoch), std::move(chunkBoundaries));
+ boost::optional<Timestamp> validAfter = boost::none;
+ {
+ Timestamp ts{0};
+ auto status = bsonExtractTimestampField(cmdObj, kValidAfter, &ts);
+ if (!status.isOK() && status != ErrorCodes::NoSuchKey) {
+ return status;
+ }
+
+ if (status.isOK()) {
+ validAfter = ts;
+ }
+ }
+
+ return MergeChunkRequest(std::move(nss),
+ std::move(shardName),
+ std::move(epoch),
+ std::move(chunkBoundaries),
+ validAfter);
}
BSONObj MergeChunkRequest::toConfigCommandBSON(const BSONObj& writeConcern) {
@@ -127,6 +145,8 @@ void MergeChunkRequest::appendAsConfigCommand(BSONObjBuilder* cmdBuilder) {
}
}
cmdBuilder->append(kShardName, _shardName);
+ invariant(_validAfter.is_initialized());
+ cmdBuilder->append(kValidAfter, _validAfter.get());
}
} // namespace mongo
diff --git a/src/mongo/s/request_types/merge_chunk_request_type.h b/src/mongo/s/request_types/merge_chunk_request_type.h
index 2ead5f3a9b1..2722c53b057 100644
--- a/src/mongo/s/request_types/merge_chunk_request_type.h
+++ b/src/mongo/s/request_types/merge_chunk_request_type.h
@@ -46,7 +46,8 @@ public:
MergeChunkRequest(NamespaceString nss,
std::string shardName,
OID epoch,
- std::vector<BSONObj> chunkBoundaries);
+ std::vector<BSONObj> chunkBoundaries,
+ boost::optional<Timestamp> validAfter);
/**
* Parses the provided BSON content as the internal _configsvrCommitChunkMerge command, and if
@@ -94,6 +95,10 @@ public:
return _shardName;
}
+ const boost::optional<Timestamp>& getValidAfter() const {
+ return _validAfter;
+ }
+
private:
NamespaceString _nss;
OID _epoch;
@@ -102,6 +107,8 @@ private:
std::vector<BSONObj> _chunkBoundaries;
std::string _shardName;
+
+ boost::optional<Timestamp> _validAfter;
};
} // namespace mongo