Diffstat (limited to 'src/mongo/s/d_merge.cpp')
-rw-r--r--  src/mongo/s/d_merge.cpp  504
1 file changed, 241 insertions(+), 263 deletions(-)
diff --git a/src/mongo/s/d_merge.cpp b/src/mongo/s/d_merge.cpp
index 897b3dfd32f..d14c036b0e1 100644
--- a/src/mongo/s/d_merge.cpp
+++ b/src/mongo/s/d_merge.cpp
@@ -47,344 +47,322 @@ namespace mongo {
-    using std::endl;
-    using std::string;
-    using mongoutils::str::stream;
-
-    static Status runApplyOpsCmd(const std::vector<ChunkType>&,
-                                 const ChunkVersion&,
-                                 const ChunkVersion&);
-
-    static BSONObj buildMergeLogEntry(const std::vector<ChunkType>&,
-                                      const ChunkVersion&,
-                                      const ChunkVersion&);
-
-    static bool isEmptyChunk( const ChunkType& );
-
-    bool mergeChunks( OperationContext* txn,
-                      const NamespaceString& nss,
-                      const BSONObj& minKey,
-                      const BSONObj& maxKey,
-                      const OID& epoch,
-                      string* errMsg ) {
-
-        //
-        // Get sharding state up-to-date
-        //
-
-        ConnectionString configLoc = ConnectionString::parse( shardingState.getConfigServer(),
-                                                              *errMsg );
-        if ( !configLoc.isValid() ){
-            warning() << *errMsg << endl;
-            return false;
-        }
-
-        //
-        // Get the distributed lock
-        //
-
-        string whyMessage = stream() << "merging chunks in " << nss.ns()
-                                     << " from " << minKey << " to " << maxKey;
-        auto scopedDistLock = grid.catalogManager()->getDistLockManager()->lock(
-            nss.ns(), whyMessage);
-
-        if (!scopedDistLock.isOK()) {
-            *errMsg = stream() << "could not acquire collection lock for " << nss.ns()
-                               << " to merge chunks in [" << minKey << "," << maxKey << ")"
-                               << causedBy(scopedDistLock.getStatus());
-
-            warning() << *errMsg << endl;
-            return false;
-        }
-
-        //
-        // We now have the collection lock, refresh metadata to latest version and sanity check
-        //
+using std::endl;
+using std::string;
+using mongoutils::str::stream;
+
+static Status runApplyOpsCmd(const std::vector<ChunkType>&,
+                             const ChunkVersion&,
+                             const ChunkVersion&);
+
+static BSONObj buildMergeLogEntry(const std::vector<ChunkType>&,
+                                  const ChunkVersion&,
+                                  const ChunkVersion&);
+
+static bool isEmptyChunk(const ChunkType&);
+
+bool mergeChunks(OperationContext* txn,
+                 const NamespaceString& nss,
+                 const BSONObj& minKey,
+                 const BSONObj& maxKey,
+                 const OID& epoch,
+                 string* errMsg) {
+    //
+    // Get sharding state up-to-date
+    //

-        ChunkVersion shardVersion;
-        Status status = shardingState.refreshMetadataNow(txn, nss.ns(), &shardVersion);
+    ConnectionString configLoc = ConnectionString::parse(shardingState.getConfigServer(), *errMsg);
+    if (!configLoc.isValid()) {
+        warning() << *errMsg << endl;
+        return false;
+    }

-        if ( !status.isOK() ) {
+    //
+    // Get the distributed lock
+    //

-            *errMsg = str::stream() << "could not merge chunks, failed to refresh metadata for "
-                                    << nss.ns() << causedBy( status.reason() );
+    string whyMessage = stream() << "merging chunks in " << nss.ns() << " from " << minKey << " to "
+                                 << maxKey;
+    auto scopedDistLock = grid.catalogManager()->getDistLockManager()->lock(nss.ns(), whyMessage);

-            warning() << *errMsg << endl;
-            return false;
-        }
+    if (!scopedDistLock.isOK()) {
+        *errMsg = stream() << "could not acquire collection lock for " << nss.ns()
+                           << " to merge chunks in [" << minKey << "," << maxKey << ")"
+                           << causedBy(scopedDistLock.getStatus());

-        if ( epoch.isSet() && shardVersion.epoch() != epoch ) {
+        warning() << *errMsg << endl;
+        return false;
+    }

-            *errMsg = stream() << "could not merge chunks, collection " << nss.ns()
-                               << " has changed" << " since merge was sent" << "(sent epoch : "
-                               << epoch.toString()
-                               << ", current epoch : " << shardVersion.epoch().toString() << ")";
+    //
+    // We now have the collection lock, refresh metadata to latest version and sanity check
+    //

-            warning() << *errMsg << endl;
-            return false;
-        }
+    ChunkVersion shardVersion;
+    Status status = shardingState.refreshMetadataNow(txn, nss.ns(), &shardVersion);

-        CollectionMetadataPtr metadata = shardingState.getCollectionMetadata( nss.ns() );
+    if (!status.isOK()) {
+        *errMsg = str::stream() << "could not merge chunks, failed to refresh metadata for "
+                                << nss.ns() << causedBy(status.reason());

-        if ( !metadata || metadata->getKeyPattern().isEmpty() ) {
+        warning() << *errMsg << endl;
+        return false;
+    }

-            *errMsg = stream() << "could not merge chunks, collection " << nss.ns()
-                               << " is not sharded";
+    if (epoch.isSet() && shardVersion.epoch() != epoch) {
+        *errMsg = stream() << "could not merge chunks, collection " << nss.ns() << " has changed"
+                           << " since merge was sent"
+                           << "(sent epoch : " << epoch.toString()
+                           << ", current epoch : " << shardVersion.epoch().toString() << ")";

-            warning() << *errMsg << endl;
-            return false;
-        }
+        warning() << *errMsg << endl;
+        return false;
+    }

-        dassert( metadata->getShardVersion().equals( shardVersion ) );
+    CollectionMetadataPtr metadata = shardingState.getCollectionMetadata(nss.ns());

-        if ( !metadata->isValidKey( minKey ) || !metadata->isValidKey( maxKey ) ) {
+    if (!metadata || metadata->getKeyPattern().isEmpty()) {
+        *errMsg = stream() << "could not merge chunks, collection " << nss.ns()
+                           << " is not sharded";

-            *errMsg = stream() << "could not merge chunks, the range "
-                               << rangeToString( minKey, maxKey ) << " is not valid"
-                               << " for collection " << nss.ns() << " with key pattern "
-                               << metadata->getKeyPattern();
+        warning() << *errMsg << endl;
+        return false;
+    }

-            warning() << *errMsg << endl;
-            return false;
-        }
+    dassert(metadata->getShardVersion().equals(shardVersion));

-        //
-        // Get merged chunk information
-        //
+    if (!metadata->isValidKey(minKey) || !metadata->isValidKey(maxKey)) {
+        *errMsg = stream() << "could not merge chunks, the range " << rangeToString(minKey, maxKey)
+                           << " is not valid"
+                           << " for collection " << nss.ns() << " with key pattern "
+                           << metadata->getKeyPattern();

-        ChunkVersion mergeVersion = metadata->getCollVersion();
-        mergeVersion.incMinor();
+        warning() << *errMsg << endl;
+        return false;
+    }

-        std::vector<ChunkType> chunksToMerge;
+    //
+    // Get merged chunk information
+    //

-        ChunkType itChunk;
-        itChunk.setMin( minKey );
-        itChunk.setMax( minKey );
-        itChunk.setNS( nss.ns() );
-        itChunk.setShard( shardingState.getShardName() );
+    ChunkVersion mergeVersion = metadata->getCollVersion();
+    mergeVersion.incMinor();

-        while (itChunk.getMax().woCompare(maxKey) < 0 &&
-               metadata->getNextChunk(itChunk.getMax(), &itChunk)) {
-            chunksToMerge.push_back(itChunk);
-        }
+    std::vector<ChunkType> chunksToMerge;

-        if ( chunksToMerge.empty() ) {
+    ChunkType itChunk;
+    itChunk.setMin(minKey);
+    itChunk.setMax(minKey);
+    itChunk.setNS(nss.ns());
+    itChunk.setShard(shardingState.getShardName());

-            *errMsg = stream() << "could not merge chunks, collection " << nss.ns()
-                               << " range starting at " << minKey
-                               << " and ending at " << maxKey
-                               << " does not belong to shard " << shardingState.getShardName();
+    while (itChunk.getMax().woCompare(maxKey) < 0 &&
+           metadata->getNextChunk(itChunk.getMax(), &itChunk)) {
+        chunksToMerge.push_back(itChunk);
+    }

-            warning() << *errMsg << endl;
-            return false;
-        }
+    if (chunksToMerge.empty()) {
+        *errMsg = stream() << "could not merge chunks, collection " << nss.ns()
+                           << " range starting at " << minKey << " and ending at " << maxKey
+                           << " does not belong to shard " << shardingState.getShardName();

-        //
-        // Validate the range starts and ends at chunks and has no holes, error if not valid
-        //
+        warning() << *errMsg << endl;
+        return false;
+    }

-        BSONObj firstDocMin = chunksToMerge.front().getMin();
-        BSONObj firstDocMax = chunksToMerge.front().getMax();
-        // minKey is inclusive
-        bool minKeyInRange = rangeContains( firstDocMin, firstDocMax, minKey );
+    //
+    // Validate the range starts and ends at chunks and has no holes, error if not valid
+    //

-        if ( !minKeyInRange ) {
+    BSONObj firstDocMin = chunksToMerge.front().getMin();
+    BSONObj firstDocMax = chunksToMerge.front().getMax();
+    // minKey is inclusive
+    bool minKeyInRange = rangeContains(firstDocMin, firstDocMax, minKey);

-            *errMsg = stream() << "could not merge chunks, collection " << nss.ns()
-                               << " range starting at " << minKey
-                               << " does not belong to shard " << shardingState.getShardName();
+    if (!minKeyInRange) {
+        *errMsg = stream() << "could not merge chunks, collection " << nss.ns()
+                           << " range starting at " << minKey << " does not belong to shard "
+                           << shardingState.getShardName();

-            warning() << *errMsg << endl;
-            return false;
-        }
+        warning() << *errMsg << endl;
+        return false;
+    }

-        BSONObj lastDocMin = chunksToMerge.back().getMin();
-        BSONObj lastDocMax = chunksToMerge.back().getMax();
-        // maxKey is exclusive
-        bool maxKeyInRange = lastDocMin.woCompare( maxKey ) < 0 &&
-                             lastDocMax.woCompare( maxKey ) >= 0;
+    BSONObj lastDocMin = chunksToMerge.back().getMin();
+    BSONObj lastDocMax = chunksToMerge.back().getMax();
+    // maxKey is exclusive
+    bool maxKeyInRange = lastDocMin.woCompare(maxKey) < 0 && lastDocMax.woCompare(maxKey) >= 0;

-        if ( !maxKeyInRange ) {
-            *errMsg = stream() << "could not merge chunks, collection " << nss.ns()
-                               << " range ending at " << maxKey
-                               << " does not belong to shard " << shardingState.getShardName();
+    if (!maxKeyInRange) {
+        *errMsg = stream() << "could not merge chunks, collection " << nss.ns()
+                           << " range ending at " << maxKey << " does not belong to shard "
+                           << shardingState.getShardName();

-            warning() << *errMsg << endl;
-            return false;
-        }
+        warning() << *errMsg << endl;
+        return false;
+    }

-        bool validRangeStartKey = firstDocMin.woCompare( minKey ) == 0;
-        bool validRangeEndKey = lastDocMax.woCompare( maxKey ) == 0;
+    bool validRangeStartKey = firstDocMin.woCompare(minKey) == 0;
+    bool validRangeEndKey = lastDocMax.woCompare(maxKey) == 0;

-        if ( !validRangeStartKey || !validRangeEndKey ) {
+    if (!validRangeStartKey || !validRangeEndKey) {
+        *errMsg = stream() << "could not merge chunks, collection " << nss.ns()
+                           << " does not contain a chunk "
+                           << (!validRangeStartKey ? "starting at " + minKey.toString() : "")
+                           << (!validRangeStartKey && !validRangeEndKey ? " or " : "")
+                           << (!validRangeEndKey ? "ending at " + maxKey.toString() : "");

-            *errMsg = stream() << "could not merge chunks, collection " << nss.ns()
-                               << " does not contain a chunk "
-                               << ( !validRangeStartKey ? "starting at " + minKey.toString() : "" )
-                               << ( !validRangeStartKey && !validRangeEndKey ? " or " : "" )
-                               << ( !validRangeEndKey ? "ending at " + maxKey.toString() : "" );
+        warning() << *errMsg << endl;
+        return false;
+    }

-            warning() << *errMsg << endl;
-            return false;
-        }
+    if (chunksToMerge.size() == 1) {
+        *errMsg = stream() << "could not merge chunks, collection " << nss.ns()
+                           << " already contains chunk for " << rangeToString(minKey, maxKey);

-        if ( chunksToMerge.size() == 1 ) {
+        warning() << *errMsg << endl;
+        return false;
+    }

-            *errMsg = stream() << "could not merge chunks, collection " << nss.ns()
-                               << " already contains chunk for " << rangeToString( minKey, maxKey );
+    // Look for hole in range
+    for (size_t i = 1; i < chunksToMerge.size(); ++i) {
+        if (chunksToMerge[i - 1].getMax().woCompare(chunksToMerge[i].getMin()) != 0) {
+            *errMsg =
+                stream() << "could not merge chunks, collection " << nss.ns()
+                         << " has a hole in the range " << rangeToString(minKey, maxKey) << " at "
+                         << rangeToString(chunksToMerge[i - 1].getMax(), chunksToMerge[i].getMin());

             warning() << *errMsg << endl;
             return false;
         }
+    }

-        // Look for hole in range
-        for (size_t i = 1; i < chunksToMerge.size(); ++i) {
-            if (chunksToMerge[i-1].getMax().woCompare(chunksToMerge[i].getMin()) != 0) {
-                *errMsg = stream() << "could not merge chunks, collection " << nss.ns()
-                                   << " has a hole in the range " << rangeToString(minKey, maxKey)
-                                   << " at " << rangeToString(chunksToMerge[i-1].getMax(),
-                                                              chunksToMerge[i].getMin());
-
-                warning() << *errMsg << endl;
-                return false;
-            }
-        }
-
-        //
-        // Run apply ops command
-        //
-        Status applyOpsStatus = runApplyOpsCmd(chunksToMerge, shardVersion, mergeVersion);
-        if (!applyOpsStatus.isOK()) {
-            warning() << applyOpsStatus;
-            return false;
-        }
-
-        //
-        // Install merged chunk metadata
-        //
-
-        {
-            ScopedTransaction transaction(txn, MODE_IX);
-            Lock::DBLock writeLk(txn->lockState(), nss.db(), MODE_IX);
-            Lock::CollectionLock collLock(txn->lockState(), nss.ns(), MODE_X);
-            shardingState.mergeChunks(txn, nss.ns(), minKey, maxKey, mergeVersion);
-        }
-
-        //
-        // Log change
-        //
-
-        BSONObj mergeLogEntry = buildMergeLogEntry( chunksToMerge,
-                                                    shardVersion,
-                                                    mergeVersion );
+    //
+    // Run apply ops command
+    //
+    Status applyOpsStatus = runApplyOpsCmd(chunksToMerge, shardVersion, mergeVersion);
+    if (!applyOpsStatus.isOK()) {
+        warning() << applyOpsStatus;
+        return false;
+    }

-        grid.catalogManager()->logChange(txn, "merge", nss.ns(), mergeLogEntry);
+    //
+    // Install merged chunk metadata
+    //

-        return true;
+    {
+        ScopedTransaction transaction(txn, MODE_IX);
+        Lock::DBLock writeLk(txn->lockState(), nss.db(), MODE_IX);
+        Lock::CollectionLock collLock(txn->lockState(), nss.ns(), MODE_X);
+        shardingState.mergeChunks(txn, nss.ns(), minKey, maxKey, mergeVersion);
     }

     //
-    // Utilities for building BSONObjs for applyOps and change logging
+    // Log change
     //

-    BSONObj buildMergeLogEntry(const std::vector<ChunkType>& chunksToMerge,
-                               const ChunkVersion& currShardVersion,
-                               const ChunkVersion& newMergedVersion) {
+    BSONObj mergeLogEntry = buildMergeLogEntry(chunksToMerge, shardVersion, mergeVersion);

-        BSONObjBuilder logDetailB;
+    grid.catalogManager()->logChange(txn, "merge", nss.ns(), mergeLogEntry);

-        BSONArrayBuilder mergedB( logDetailB.subarrayStart( "merged" ) );
+    return true;
+}

-        for (const ChunkType& chunkToMerge : chunksToMerge) {
-            mergedB.append(chunkToMerge.toBSON());
-        }
+//
+// Utilities for building BSONObjs for applyOps and change logging
+//

-        mergedB.done();
+BSONObj buildMergeLogEntry(const std::vector<ChunkType>& chunksToMerge,
+                           const ChunkVersion& currShardVersion,
+                           const ChunkVersion& newMergedVersion) {
+    BSONObjBuilder logDetailB;

-        currShardVersion.addToBSON( logDetailB, "prevShardVersion" );
-        newMergedVersion.addToBSON( logDetailB, "mergedVersion" );
+    BSONArrayBuilder mergedB(logDetailB.subarrayStart("merged"));

-        return logDetailB.obj();
+    for (const ChunkType& chunkToMerge : chunksToMerge) {
+        mergedB.append(chunkToMerge.toBSON());
     }

+    mergedB.done();

-    BSONObj buildOpMergeChunk( const ChunkType& mergedChunk ) {
+    currShardVersion.addToBSON(logDetailB, "prevShardVersion");
+    newMergedVersion.addToBSON(logDetailB, "mergedVersion");

-        BSONObjBuilder opB;
+    return logDetailB.obj();
+}

-        // Op basics
-        opB.append( "op" , "u" );
-        opB.appendBool( "b" , false ); // no upserting
-        opB.append( "ns" , ChunkType::ConfigNS );
-        // New object
-        opB.append( "o", mergedChunk.toBSON() );
+BSONObj buildOpMergeChunk(const ChunkType& mergedChunk) {
+    BSONObjBuilder opB;

-        // Query object
-        opB.append( "o2",
-                    BSON( ChunkType::name( mergedChunk.getName() ) ) );
+    // Op basics
+    opB.append("op", "u");
+    opB.appendBool("b", false);  // no upserting
+    opB.append("ns", ChunkType::ConfigNS);

-        return opB.obj();
-    }
+    // New object
+    opB.append("o", mergedChunk.toBSON());

-    BSONObj buildOpRemoveChunk( const ChunkType& chunkToRemove ) {
+    // Query object
+    opB.append("o2", BSON(ChunkType::name(mergedChunk.getName())));

-        BSONObjBuilder opB;
-
-        // Op basics
-        opB.append( "op", "d" ); // delete
-        opB.append( "ns", ChunkType::ConfigNS );
+    return opB.obj();
+}

-        opB.append( "o", BSON( ChunkType::name( chunkToRemove.getName() ) ) );
+BSONObj buildOpRemoveChunk(const ChunkType& chunkToRemove) {
+    BSONObjBuilder opB;

-        return opB.obj();
-    }
+    // Op basics
+    opB.append("op", "d");  // delete
+    opB.append("ns", ChunkType::ConfigNS);

-    BSONArray buildOpPrecond(const string& ns,
-                             const string& shardName,
-                             const ChunkVersion& shardVersion) {
-        BSONArrayBuilder preCond;
-        BSONObjBuilder condB;
-        condB.append( "ns", ChunkType::ConfigNS );
-        condB.append( "q", BSON( "query" << BSON( ChunkType::ns( ns ) )
-                                 << "orderby" << BSON( ChunkType::DEPRECATED_lastmod() << -1 ) ) );
-        {
-            BSONObjBuilder resB( condB.subobjStart( "res" ) );
-            shardVersion.addToBSON( resB, ChunkType::DEPRECATED_lastmod() );
-            resB.done();
-        }
-        preCond.append(condB.obj());
-        return preCond.arr();
-    }
+    opB.append("o", BSON(ChunkType::name(chunkToRemove.getName())));

-    Status runApplyOpsCmd(const std::vector<ChunkType>& chunksToMerge,
-                          const ChunkVersion& currShardVersion,
-                          const ChunkVersion& newMergedVersion) {
+    return opB.obj();
+}

-        BSONArrayBuilder updatesB;
+BSONArray buildOpPrecond(const string& ns,
+                         const string& shardName,
+                         const ChunkVersion& shardVersion) {
+    BSONArrayBuilder preCond;
+    BSONObjBuilder condB;
+    condB.append("ns", ChunkType::ConfigNS);
+    condB.append("q",
+                 BSON("query" << BSON(ChunkType::ns(ns)) << "orderby"
+                              << BSON(ChunkType::DEPRECATED_lastmod() << -1)));
+    {
+        BSONObjBuilder resB(condB.subobjStart("res"));
+        shardVersion.addToBSON(resB, ChunkType::DEPRECATED_lastmod());
+        resB.done();
+    }
+    preCond.append(condB.obj());
+    return preCond.arr();
+}

-        // The chunk we'll be "expanding" is the first chunk
-        const ChunkType& firstChunk = chunksToMerge.front();
+Status runApplyOpsCmd(const std::vector<ChunkType>& chunksToMerge,
+                      const ChunkVersion& currShardVersion,
+                      const ChunkVersion& newMergedVersion) {
+    BSONArrayBuilder updatesB;

-        // Fill in details not tracked by metadata
-        ChunkType mergedChunk(firstChunk);
-        mergedChunk.setName(Chunk::genID(firstChunk.getNS(), firstChunk.getMin()));
-        mergedChunk.setMax(chunksToMerge.back().getMax());
-        mergedChunk.setVersion(newMergedVersion);
+    // The chunk we'll be "expanding" is the first chunk
+    const ChunkType& firstChunk = chunksToMerge.front();

-        updatesB.append(buildOpMergeChunk(mergedChunk));
+    // Fill in details not tracked by metadata
+    ChunkType mergedChunk(firstChunk);
+    mergedChunk.setName(Chunk::genID(firstChunk.getNS(), firstChunk.getMin()));
+    mergedChunk.setMax(chunksToMerge.back().getMax());
+    mergedChunk.setVersion(newMergedVersion);

-        // Don't remove chunk we're expanding
-        for (size_t i = 1; i < chunksToMerge.size(); ++i) {
-            ChunkType chunkToMerge(chunksToMerge[i]);
-            chunkToMerge.setName(Chunk::genID(chunkToMerge.getNS(), chunkToMerge.getMin()));
-            updatesB.append(buildOpRemoveChunk(chunkToMerge));
-        }
+    updatesB.append(buildOpMergeChunk(mergedChunk));

-        BSONArray preCond = buildOpPrecond(firstChunk.getNS(),
-                                           firstChunk.getShard(),
-                                           currShardVersion);
-        return grid.catalogManager()->applyChunkOpsDeprecated(updatesB.arr(), preCond);
+    // Don't remove chunk we're expanding
+    for (size_t i = 1; i < chunksToMerge.size(); ++i) {
+        ChunkType chunkToMerge(chunksToMerge[i]);
+        chunkToMerge.setName(Chunk::genID(chunkToMerge.getNS(), chunkToMerge.getMin()));
+        updatesB.append(buildOpRemoveChunk(chunkToMerge));
     }

+    BSONArray preCond = buildOpPrecond(firstChunk.getNS(), firstChunk.getShard(), currShardVersion);
+    return grid.catalogManager()->applyChunkOpsDeprecated(updatesB.arr(), preCond);
+}
 }
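The diff above is formatting-only, so the logic of mergeChunks is unchanged: it is essentially a pipeline of validation steps (range belongs to the shard, range starts and ends on chunk boundaries, no holes) followed by the applyOps update. The following self-contained C++ sketch mirrors just the range-validation portion, with plain integer bounds standing in for BSONObj shard keys; Chunk and validateMergeRange are invented names for illustration and are not part of the MongoDB source.

#include <iostream>
#include <string>
#include <vector>

// Simplified stand-in for ChunkType: integer key bounds instead of BSONObj.
struct Chunk {
    int min;  // inclusive lower bound
    int max;  // exclusive upper bound
};

// Returns an empty string if `chunks` exactly covers [minKey, maxKey) with no
// holes, otherwise a description of the first problem found. This mirrors, in
// simplified form, the checks in mergeChunks (empty range, start/end
// boundaries, single chunk, hole detection).
std::string validateMergeRange(const std::vector<Chunk>& chunks, int minKey, int maxKey) {
    if (chunks.empty())
        return "range does not belong to this shard";
    if (chunks.size() == 1)
        return "range is already a single chunk";
    if (chunks.front().min != minKey)
        return "no chunk starting at minKey";
    if (chunks.back().max != maxKey)
        return "no chunk ending at maxKey";
    // Look for a hole between adjacent chunks, as the loop in mergeChunks does.
    for (size_t i = 1; i < chunks.size(); ++i) {
        if (chunks[i - 1].max != chunks[i].min)
            return "hole between chunk " + std::to_string(i - 1) + " and chunk " +
                std::to_string(i);
    }
    return "";
}

int main() {
    std::vector<Chunk> contiguous = {{0, 10}, {10, 20}, {20, 30}};
    std::vector<Chunk> withHole = {{0, 10}, {15, 30}};

    std::string err = validateMergeRange(contiguous, 0, 30);
    std::cout << (err.empty() ? "contiguous range: ok" : err) << "\n";
    std::cout << validateMergeRange(withHole, 0, 30) << "\n";  // reports the hole
}

The real function performs these checks against refreshed collection metadata while holding a distributed lock, reports failures through *errMsg, and only then builds the applyOps payload: one update that expands the first chunk, deletes for the remaining chunks, and a precondition on the current shard version.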