diff options
Diffstat (limited to 'src/mongo/s/catalog/replset/sharding_catalog_manager_impl.cpp')
-rw-r--r-- | src/mongo/s/catalog/replset/sharding_catalog_manager_impl.cpp | 144 |
1 files changed, 143 insertions, 1 deletions
diff --git a/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.cpp b/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.cpp index dc288b6cde8..c85d0464d90 100644 --- a/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.cpp +++ b/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.cpp @@ -1014,7 +1014,7 @@ Status ShardingCatalogManagerImpl::commitChunkSplit(OperationContext* txn, Lock::ExclusiveLock lk(txn->lockState(), _kChunkOpLock); // Acquire GlobalLock in MODE_X twice to prevent yielding. - // GlobalLock and the following lock on config.chunks are only needed to support + // GLobalLock and the following lock on config.chunks are only needed to support // mixed-mode operation with mongoses from 3.2 // TODO(SERVER-25337): Remove GlobalLock and config.chunks lock after 3.4 Lock::GlobalLock firstGlobalLock(txn->lockState(), MODE_X, UINT_MAX); @@ -1158,6 +1158,148 @@ Status ShardingCatalogManagerImpl::commitChunkSplit(OperationContext* txn, return applyOpsStatus; } +Status ShardingCatalogManagerImpl::commitChunkMerge(OperationContext* txn, + const NamespaceString& ns, + const OID& requestEpoch, + const std::vector<BSONObj>& chunkBoundaries, + const std::string& shardName) { + // Take _kChunkOpLock in exclusive mode to prevent concurrent chunk splits, merges, and + // migrations + // TODO(SERVER-25359): Replace with a collection-specific lock map to allow splits/merges/ + // move chunks on different collections to proceed in parallel + Lock::ExclusiveLock lk(txn->lockState(), _kChunkOpLock); + + // Acquire GlobalLock in MODE_X twice to prevent yielding. + // GLobalLock and the following lock on config.chunks are only needed to support + // mixed-mode operation with mongoses from 3.2 + // TODO(SERVER-25337): Remove GlobalLock and config.chunks lock after 3.4 + Lock::GlobalLock firstGlobalLock(txn->lockState(), MODE_X, UINT_MAX); + Lock::GlobalLock secondGlobalLock(txn->lockState(), MODE_X, UINT_MAX); + + // Acquire lock on config.chunks in MODE_X + AutoGetCollection autoColl(txn, NamespaceString(ChunkType::ConfigNS), MODE_X); + + // Get the chunk with the highest version for this namespace + auto findStatus = grid.shardRegistry()->getConfigShard()->exhaustiveFindOnConfig( + txn, + ReadPreferenceSetting{ReadPreference::PrimaryOnly}, + repl::ReadConcernLevel::kLocalReadConcern, + NamespaceString(ChunkType::ConfigNS), + BSON("ns" << ns.ns()), + BSON(ChunkType::DEPRECATED_lastmod << -1), + 1); + + if (!findStatus.isOK()) { + return findStatus.getStatus(); + } + + const auto& chunksVector = findStatus.getValue().docs; + if (chunksVector.empty()) + return {ErrorCodes::IllegalOperation, + "collection does not exist, isn't sharded, or has no chunks"}; + + ChunkVersion collVersion = + ChunkVersion::fromBSON(chunksVector.front(), ChunkType::DEPRECATED_lastmod()); + + // Return an error if epoch of chunk does not match epoch of request + if (collVersion.epoch() != requestEpoch) { + return {ErrorCodes::StaleEpoch, + "epoch of chunk does not match epoch of request. This most likely means " + "that the collection was dropped and re-created."}; + } + + // Build chunks to be merged + std::vector<ChunkType> chunksToMerge; + + ChunkType itChunk; + itChunk.setMax(chunkBoundaries.front()); + itChunk.setNS(ns.ns()); + itChunk.setShard(shardName); + + // Do not use the first chunk boundary as a max bound while building chunks + for (size_t i = 1; i < chunkBoundaries.size(); ++i) { + itChunk.setMin(itChunk.getMax()); + itChunk.setMax(chunkBoundaries[i]); + chunksToMerge.push_back(itChunk); + } + + ChunkVersion mergeVersion = collVersion; + mergeVersion.incMinor(); + + BSONArrayBuilder updates; + + // Build an update operation to expand the first chunk into the newly merged chunk + { + BSONObjBuilder op; + op.append("op", "u"); + op.appendBool("b", false); + op.append("ns", ChunkType::ConfigNS); + + // expand first chunk into newly merged chunk + ChunkType mergedChunk(chunksToMerge.front()); + mergedChunk.setMax(chunksToMerge.back().getMax()); + + // fill in additional details for sending through applyOps + mergedChunk.setVersion(mergeVersion); + + // add the new chunk information as the update object + op.append("o", mergedChunk.toBSON()); + + // query object + op.append("o2", BSON(ChunkType::name(mergedChunk.getName()))); + + updates.append(op.obj()); + } + + // Build update operations to delete the rest of the chunks to be merged. Remember not + // to delete the first chunk we're expanding + for (size_t i = 1; i < chunksToMerge.size(); ++i) { + BSONObjBuilder op; + op.append("op", "d"); + op.append("ns", ChunkType::ConfigNS); + + op.append("o", BSON(ChunkType::name(chunksToMerge[i].getName()))); + + updates.append(op.obj()); + } + + BSONArrayBuilder preCond; + { + BSONObjBuilder b; + b.append("ns", ChunkType::ConfigNS); + b.append("q", + BSON("query" << BSON(ChunkType::ns(ns.ns())) << "orderby" + << BSON(ChunkType::DEPRECATED_lastmod() << -1))); + { + BSONObjBuilder bb(b.subobjStart("res")); + collVersion.addToBSON(bb, ChunkType::DEPRECATED_lastmod()); + } + preCond.append(b.obj()); + } + + // apply the batch of updates to remote and local metadata + Status applyOpsStatus = grid.catalogClient(txn)->applyChunkOpsDeprecated( + txn, updates.arr(), preCond.arr(), ns.ns(), mergeVersion); + if (!applyOpsStatus.isOK()) { + return applyOpsStatus; + } + + // log changes + BSONObjBuilder logDetail; + { + BSONArrayBuilder b(logDetail.subarrayStart("merged")); + for (auto chunkToMerge : chunksToMerge) { + b.append(chunkToMerge.toBSON()); + } + } + collVersion.addToBSON(logDetail, "prevShardVersion"); + mergeVersion.addToBSON(logDetail, "mergedVersion"); + + grid.catalogClient(txn)->logChange(txn, "merge", ns.ns(), logDetail.obj()); + + return applyOpsStatus; +} + void ShardingCatalogManagerImpl::appendConnectionStats(executor::ConnectionPoolStats* stats) { _executorForAddShard->appendConnectionStats(stats); } |