diff options
Diffstat (limited to 'src/mongo/s/catalog/sharding_catalog_client_impl.cpp')
-rw-r--r-- | src/mongo/s/catalog/sharding_catalog_client_impl.cpp | 71 |
1 files changed, 48 insertions, 23 deletions
diff --git a/src/mongo/s/catalog/sharding_catalog_client_impl.cpp b/src/mongo/s/catalog/sharding_catalog_client_impl.cpp index 27ad059f532..9b36e63d9ce 100644 --- a/src/mongo/s/catalog/sharding_catalog_client_impl.cpp +++ b/src/mongo/s/catalog/sharding_catalog_client_impl.cpp @@ -683,6 +683,8 @@ StatusWith<std::vector<ChunkType>> ShardingCatalogClientImpl::getChunks( const BSONObj& sort, boost::optional<int> limit, OpTime* opTime, + const OID& epoch, + const boost::optional<Timestamp>& timestamp, repl::ReadConcernLevel readConcern, const boost::optional<BSONObj>& hint) { invariant(serverGlobalParams.clusterRole == ClusterRole::ConfigServer || @@ -700,7 +702,7 @@ StatusWith<std::vector<ChunkType>> ShardingCatalogClientImpl::getChunks( std::vector<ChunkType> chunks; for (const BSONObj& obj : chunkDocsOpTimePair.value) { - auto chunkRes = ChunkType::fromConfigBSON(obj); + auto chunkRes = ChunkType::fromConfigBSON(obj, epoch, timestamp); if (!chunkRes.isOK()) { return chunkRes.getStatus().withContext(stream() << "Failed to parse chunk with id " << obj[ChunkType::name()]); @@ -755,34 +757,50 @@ std::pair<CollectionType, std::vector<ChunkType>> ShardingCatalogClientImpl::get stream() << "Collection " << nss.ns() << " not found", !aggResult.empty()); - boost::optional<CollectionType> coll; - std::vector<ChunkType> chunks; - chunks.reserve(aggResult.size() - 1); // The aggregation may return the config.collections document anywhere between the // config.chunks documents. - for (const auto& elem : aggResult) { - const auto chunkElem = elem.getField("chunks"); - if (chunkElem) { - auto chunkRes = uassertStatusOK(ChunkType::fromConfigBSON(chunkElem.Obj())); - chunks.emplace_back(std::move(chunkRes)); - } else { - uassert(5520100, - "Found more than one 'collections' documents in aggregation response", - !coll); - coll.emplace(elem); - - uassert(ErrorCodes::NamespaceNotFound, - str::stream() << "Collection " << nss.ns() << " is dropped.", - !coll->getDropped()); + // 1st: look for the collection since it is needed to properly build the chunks. + boost::optional<CollectionType> coll; + { + for (const auto& elem : aggResult) { + const auto chunkElem = elem.getField("chunks"); + if (!chunkElem) { + coll.emplace(elem); + break; + } } + uassert(5520101, "'collections' document not found in aggregation response", coll); + + uassert(ErrorCodes::NamespaceNotFound, + str::stream() << "Collection " << nss.ns() << " is dropped.", + !coll->getDropped()); } - uassert(5520101, "'collections' document not found in aggregation response", coll); + // 2nd: Traverse all the elements and build the chunks. + std::vector<ChunkType> chunks; + { + chunks.reserve(aggResult.size() - 1); + bool foundCollection = false; + for (const auto& elem : aggResult) { + const auto chunkElem = elem.getField("chunks"); + if (chunkElem) { + auto chunkRes = uassertStatusOK(ChunkType::fromConfigBSON( + chunkElem.Obj(), coll->getEpoch(), coll->getTimestamp())); + chunks.emplace_back(std::move(chunkRes)); + } else { + uassert(5520100, + "Found more than one 'collections' documents in aggregation response", + !foundCollection); + foundCollection = true; + } + } + + uassert(ErrorCodes::ConflictingOperationInProgress, + stream() << "No chunks were found for the collection " << nss, + !chunks.empty()); + } - uassert(ErrorCodes::ConflictingOperationInProgress, - stream() << "No chunks were found for the collection " << nss, - !chunks.empty()); return {std::move(*coll), std::move(chunks)}; }; @@ -1009,7 +1027,14 @@ Status ShardingCatalogClientImpl::applyChunkOpsDeprecated(OperationContext* opCt } else { query.append(ChunkType::ns(), nsOrUUID.nss()->ns()); } - auto chunkWithStatus = getChunks(opCtx, query.obj(), BSONObj(), 1, nullptr, readConcern); + auto chunkWithStatus = getChunks(opCtx, + query.obj(), + BSONObj(), + 1, + nullptr, + lastChunkVersion.epoch(), + lastChunkVersion.getTimestamp(), + readConcern); if (!chunkWithStatus.isOK()) { errMsg = str::stream() |