Diffstat (limited to 'src/mongo/s/catalog/replset/sharding_catalog_manager_impl.cpp')
 src/mongo/s/catalog/replset/sharding_catalog_manager_impl.cpp | 144 +++++++++++-
 1 file changed, 143 insertions(+), 1 deletion(-)
diff --git a/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.cpp b/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.cpp
index dc288b6cde8..c85d0464d90 100644
--- a/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.cpp
+++ b/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.cpp
@@ -1014,7 +1014,7 @@ Status ShardingCatalogManagerImpl::commitChunkSplit(OperationContext* txn,
Lock::ExclusiveLock lk(txn->lockState(), _kChunkOpLock);
// Acquire GlobalLock in MODE_X twice to prevent yielding.
- // GLobalLock and the following lock on config.chunks are only needed to support
+ // GlobalLock and the following lock on config.chunks are only needed to support
// mixed-mode operation with mongoses from 3.2
// TODO(SERVER-25337): Remove GlobalLock and config.chunks lock after 3.4
Lock::GlobalLock firstGlobalLock(txn->lockState(), MODE_X, UINT_MAX);
@@ -1158,6 +1158,148 @@ Status ShardingCatalogManagerImpl::commitChunkSplit(OperationContext* txn,
return applyOpsStatus;
}
+Status ShardingCatalogManagerImpl::commitChunkMerge(OperationContext* txn,
+ const NamespaceString& ns,
+ const OID& requestEpoch,
+ const std::vector<BSONObj>& chunkBoundaries,
+ const std::string& shardName) {
+ // Take _kChunkOpLock in exclusive mode to prevent concurrent chunk splits, merges, and
+ // migrations
+ // TODO(SERVER-25359): Replace with a collection-specific lock map to allow
+ // splits, merges, and migrations on different collections to proceed in parallel
+ Lock::ExclusiveLock lk(txn->lockState(), _kChunkOpLock);
+
+ // Acquire GlobalLock in MODE_X twice to prevent yielding.
+ // GlobalLock and the following lock on config.chunks are only needed to support
+ // mixed-mode operation with mongoses from 3.2
+ // TODO(SERVER-25337): Remove GlobalLock and config.chunks lock after 3.4
+ Lock::GlobalLock firstGlobalLock(txn->lockState(), MODE_X, UINT_MAX);
+ Lock::GlobalLock secondGlobalLock(txn->lockState(), MODE_X, UINT_MAX);
+
+ // Acquire lock on config.chunks in MODE_X
+ AutoGetCollection autoColl(txn, NamespaceString(ChunkType::ConfigNS), MODE_X);
+
+ // Get the chunk with the highest version for this namespace
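+ // (query shape: {ns: <ns>}, sorted by lastmod descending, limit 1; the single
+ // returned document carries the collection's current highest ChunkVersion)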
+ auto findStatus = grid.shardRegistry()->getConfigShard()->exhaustiveFindOnConfig(
+ txn,
+ ReadPreferenceSetting{ReadPreference::PrimaryOnly},
+ repl::ReadConcernLevel::kLocalReadConcern,
+ NamespaceString(ChunkType::ConfigNS),
+ BSON("ns" << ns.ns()),
+ BSON(ChunkType::DEPRECATED_lastmod() << -1),
+ 1);
+
+ if (!findStatus.isOK()) {
+ return findStatus.getStatus();
+ }
+
+ const auto& chunksVector = findStatus.getValue().docs;
+ if (chunksVector.empty()) {
+ return {ErrorCodes::IllegalOperation,
+ "collection does not exist, isn't sharded, or has no chunks"};
+ }
+
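+ // Parse the collection version out of the newest chunk document's lastmod field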
+ ChunkVersion collVersion =
+ ChunkVersion::fromBSON(chunksVector.front(), ChunkType::DEPRECATED_lastmod());
+
+ // Return an error if the epoch of the chunk does not match the epoch of the request
+ if (collVersion.epoch() != requestEpoch) {
+ return {ErrorCodes::StaleEpoch,
+ "epoch of chunk does not match epoch of request. This most likely means "
+ "that the collection was dropped and re-created."};
+ }
+
+ // Build chunks to be merged
+ std::vector<ChunkType> chunksToMerge;
+
+ ChunkType itChunk;
+ itChunk.setMax(chunkBoundaries.front());
+ itChunk.setNS(ns.ns());
+ itChunk.setShard(shardName);
+
+ // The first boundary is the min of the first chunk, so skip it when assigning
+ // max bounds
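+ // For example (illustrative): boundaries [b0, b1, b2, b3] yield the contiguous
+ // chunks [b0, b1), [b1, b2), [b2, b3), all owned by shardName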
+ for (size_t i = 1; i < chunkBoundaries.size(); ++i) {
+ itChunk.setMin(itChunk.getMax());
+ itChunk.setMax(chunkBoundaries[i]);
+ chunksToMerge.push_back(itChunk);
+ }
+
+ ChunkVersion mergeVersion = collVersion;
+ mergeVersion.incMinor();
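+ // A minor-version bump is sufficient for a merge: the resulting chunk stays on
+ // the same shard, and major-version bumps are reserved for migrations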
+
+ BSONArrayBuilder updates;
+
+ // Build an update operation to expand the first chunk into the newly merged chunk
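+ // (illustrative shape of the entry being built:
+ // { op: "u", b: false, ns: "config.chunks",
+ // o: { _id: <chunk name>, ns: ..., min: ..., max: ..., lastmod: ... },
+ // o2: { _id: <chunk name> } })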
+ {
+ BSONObjBuilder op;
+ op.append("op", "u");
+ op.appendBool("b", false);
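+ // "b" is the upsert flag; false means the chunk document must already exist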
+ op.append("ns", ChunkType::ConfigNS);
+
+ // expand first chunk into newly merged chunk
+ ChunkType mergedChunk(chunksToMerge.front());
+ mergedChunk.setMax(chunksToMerge.back().getMax());
+
+ // fill in additional details for sending through applyOps
+ mergedChunk.setVersion(mergeVersion);
+
+ // add the new chunk information as the update object
+ op.append("o", mergedChunk.toBSON());
+
+ // query object
+ op.append("o2", BSON(ChunkType::name(mergedChunk.getName())));
+
+ updates.append(op.obj());
+ }
+
+ // Build update operations to delete the rest of the chunks being merged,
+ // skipping the first chunk, which was expanded into the merged chunk above
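+ // (each delete entry, roughly: { op: "d", ns: "config.chunks", o: { _id: <chunk name> } })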
+ for (size_t i = 1; i < chunksToMerge.size(); ++i) {
+ BSONObjBuilder op;
+ op.append("op", "d");
+ op.append("ns", ChunkType::ConfigNS);
+
+ op.append("o", BSON(ChunkType::name(chunksToMerge[i].getName())));
+
+ updates.append(op.obj());
+ }
+
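+ // Build the applyOps precondition: re-read the collection's highest lastmod and
+ // require that it still equals collVersion, so the whole batch aborts if another
+ // metadata operation has slipped in since the read above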
+ BSONArrayBuilder preCond;
+ {
+ BSONObjBuilder b;
+ b.append("ns", ChunkType::ConfigNS);
+ b.append("q",
+ BSON("query" << BSON(ChunkType::ns(ns.ns())) << "orderby"
+ << BSON(ChunkType::DEPRECATED_lastmod() << -1)));
+ {
+ BSONObjBuilder bb(b.subobjStart("res"));
+ collVersion.addToBSON(bb, ChunkType::DEPRECATED_lastmod());
+ }
+ preCond.append(b.obj());
+ }
+
+ // apply the batch of updates to remote and local metadata
+ Status applyOpsStatus = grid.catalogClient(txn)->applyChunkOpsDeprecated(
+ txn, updates.arr(), preCond.arr(), ns.ns(), mergeVersion);
+ if (!applyOpsStatus.isOK()) {
+ return applyOpsStatus;
+ }
+
+ // Log the merge to the config server changelog (config.changelog)
+ BSONObjBuilder logDetail;
+ {
+ BSONArrayBuilder b(logDetail.subarrayStart("merged"));
+ for (const auto& chunkToMerge : chunksToMerge) {
+ b.append(chunkToMerge.toBSON());
+ }
+ }
+ collVersion.addToBSON(logDetail, "prevShardVersion");
+ mergeVersion.addToBSON(logDetail, "mergedVersion");
+
+ grid.catalogClient(txn)->logChange(txn, "merge", ns.ns(), logDetail.obj());
+
+ return applyOpsStatus;
+}
+
void ShardingCatalogManagerImpl::appendConnectionStats(executor::ConnectionPoolStats* stats) {
_executorForAddShard->appendConnectionStats(stats);
}