Diffstat (limited to 'src/mongo/s/d_merge.cpp')
-rw-r--r--  src/mongo/s/d_merge.cpp  504
1 files changed, 241 insertions, 263 deletions
diff --git a/src/mongo/s/d_merge.cpp b/src/mongo/s/d_merge.cpp
index 897b3dfd32f..d14c036b0e1 100644
--- a/src/mongo/s/d_merge.cpp
+++ b/src/mongo/s/d_merge.cpp
@@ -47,344 +47,322 @@
namespace mongo {
- using std::endl;
- using std::string;
- using mongoutils::str::stream;
-
- static Status runApplyOpsCmd(const std::vector<ChunkType>&,
- const ChunkVersion&,
- const ChunkVersion&);
-
- static BSONObj buildMergeLogEntry(const std::vector<ChunkType>&,
- const ChunkVersion&,
- const ChunkVersion&);
-
- static bool isEmptyChunk( const ChunkType& );
-
- bool mergeChunks( OperationContext* txn,
- const NamespaceString& nss,
- const BSONObj& minKey,
- const BSONObj& maxKey,
- const OID& epoch,
- string* errMsg ) {
-
- //
- // Get sharding state up-to-date
- //
-
- ConnectionString configLoc = ConnectionString::parse( shardingState.getConfigServer(),
- *errMsg );
- if ( !configLoc.isValid() ){
- warning() << *errMsg << endl;
- return false;
- }
-
- //
- // Get the distributed lock
- //
-
- string whyMessage = stream() << "merging chunks in " << nss.ns()
- << " from " << minKey << " to " << maxKey;
- auto scopedDistLock = grid.catalogManager()->getDistLockManager()->lock(
- nss.ns(), whyMessage);
-
- if (!scopedDistLock.isOK()) {
- *errMsg = stream() << "could not acquire collection lock for " << nss.ns()
- << " to merge chunks in [" << minKey << "," << maxKey << ")"
- << causedBy(scopedDistLock.getStatus());
-
- warning() << *errMsg << endl;
- return false;
- }
-
- //
- // We now have the collection lock, refresh metadata to latest version and sanity check
- //
+using std::endl;
+using std::string;
+using mongoutils::str::stream;
+
+static Status runApplyOpsCmd(const std::vector<ChunkType>&,
+ const ChunkVersion&,
+ const ChunkVersion&);
+
+static BSONObj buildMergeLogEntry(const std::vector<ChunkType>&,
+ const ChunkVersion&,
+ const ChunkVersion&);
+
+static bool isEmptyChunk(const ChunkType&);
+
+bool mergeChunks(OperationContext* txn,
+ const NamespaceString& nss,
+ const BSONObj& minKey,
+ const BSONObj& maxKey,
+ const OID& epoch,
+ string* errMsg) {
+ //
+ // Get sharding state up-to-date
+ //
- ChunkVersion shardVersion;
- Status status = shardingState.refreshMetadataNow(txn, nss.ns(), &shardVersion);
+ ConnectionString configLoc = ConnectionString::parse(shardingState.getConfigServer(), *errMsg);
+ if (!configLoc.isValid()) {
+ warning() << *errMsg << endl;
+ return false;
+ }
- if ( !status.isOK() ) {
+ //
+ // Get the distributed lock
+ //
- *errMsg = str::stream() << "could not merge chunks, failed to refresh metadata for "
- << nss.ns() << causedBy( status.reason() );
+ string whyMessage = stream() << "merging chunks in " << nss.ns() << " from " << minKey << " to "
+ << maxKey;
+ auto scopedDistLock = grid.catalogManager()->getDistLockManager()->lock(nss.ns(), whyMessage);
- warning() << *errMsg << endl;
- return false;
- }
+ if (!scopedDistLock.isOK()) {
+ *errMsg = stream() << "could not acquire collection lock for " << nss.ns()
+ << " to merge chunks in [" << minKey << "," << maxKey << ")"
+ << causedBy(scopedDistLock.getStatus());
- if ( epoch.isSet() && shardVersion.epoch() != epoch ) {
+ warning() << *errMsg << endl;
+ return false;
+ }
- *errMsg = stream() << "could not merge chunks, collection " << nss.ns()
- << " has changed" << " since merge was sent" << "(sent epoch : "
- << epoch.toString()
- << ", current epoch : " << shardVersion.epoch().toString() << ")";
+ //
+ // We now have the collection lock, refresh metadata to latest version and sanity check
+ //
- warning() << *errMsg << endl;
- return false;
- }
+ ChunkVersion shardVersion;
+ Status status = shardingState.refreshMetadataNow(txn, nss.ns(), &shardVersion);
- CollectionMetadataPtr metadata = shardingState.getCollectionMetadata( nss.ns() );
+ if (!status.isOK()) {
+ *errMsg = str::stream() << "could not merge chunks, failed to refresh metadata for "
+ << nss.ns() << causedBy(status.reason());
- if ( !metadata || metadata->getKeyPattern().isEmpty() ) {
+ warning() << *errMsg << endl;
+ return false;
+ }
- *errMsg = stream() << "could not merge chunks, collection " << nss.ns()
- << " is not sharded";
+ if (epoch.isSet() && shardVersion.epoch() != epoch) {
+ *errMsg = stream() << "could not merge chunks, collection " << nss.ns() << " has changed"
+ << " since merge was sent"
+ << "(sent epoch : " << epoch.toString()
+ << ", current epoch : " << shardVersion.epoch().toString() << ")";
- warning() << *errMsg << endl;
- return false;
- }
+ warning() << *errMsg << endl;
+ return false;
+ }
- dassert( metadata->getShardVersion().equals( shardVersion ) );
+ CollectionMetadataPtr metadata = shardingState.getCollectionMetadata(nss.ns());
- if ( !metadata->isValidKey( minKey ) || !metadata->isValidKey( maxKey ) ) {
+ if (!metadata || metadata->getKeyPattern().isEmpty()) {
+ *errMsg = stream() << "could not merge chunks, collection " << nss.ns()
+ << " is not sharded";
- *errMsg = stream() << "could not merge chunks, the range "
- << rangeToString( minKey, maxKey ) << " is not valid"
- << " for collection " << nss.ns() << " with key pattern "
- << metadata->getKeyPattern();
+ warning() << *errMsg << endl;
+ return false;
+ }
- warning() << *errMsg << endl;
- return false;
- }
+ dassert(metadata->getShardVersion().equals(shardVersion));
- //
- // Get merged chunk information
- //
+ if (!metadata->isValidKey(minKey) || !metadata->isValidKey(maxKey)) {
+ *errMsg = stream() << "could not merge chunks, the range " << rangeToString(minKey, maxKey)
+ << " is not valid"
+ << " for collection " << nss.ns() << " with key pattern "
+ << metadata->getKeyPattern();
- ChunkVersion mergeVersion = metadata->getCollVersion();
- mergeVersion.incMinor();
+ warning() << *errMsg << endl;
+ return false;
+ }
- std::vector<ChunkType> chunksToMerge;
+ //
+ // Get merged chunk information
+ //
- ChunkType itChunk;
- itChunk.setMin( minKey );
- itChunk.setMax( minKey );
- itChunk.setNS( nss.ns() );
- itChunk.setShard( shardingState.getShardName() );
+ ChunkVersion mergeVersion = metadata->getCollVersion();
+ mergeVersion.incMinor();
- while (itChunk.getMax().woCompare(maxKey) < 0 &&
- metadata->getNextChunk(itChunk.getMax(), &itChunk)) {
- chunksToMerge.push_back(itChunk);
- }
+ std::vector<ChunkType> chunksToMerge;
- if ( chunksToMerge.empty() ) {
+ ChunkType itChunk;
+ itChunk.setMin(minKey);
+ itChunk.setMax(minKey);
+ itChunk.setNS(nss.ns());
+ itChunk.setShard(shardingState.getShardName());
- *errMsg = stream() << "could not merge chunks, collection " << nss.ns()
- << " range starting at " << minKey
- << " and ending at " << maxKey
- << " does not belong to shard " << shardingState.getShardName();
+ while (itChunk.getMax().woCompare(maxKey) < 0 &&
+ metadata->getNextChunk(itChunk.getMax(), &itChunk)) {
+ chunksToMerge.push_back(itChunk);
+ }
- warning() << *errMsg << endl;
- return false;
- }
+ if (chunksToMerge.empty()) {
+ *errMsg = stream() << "could not merge chunks, collection " << nss.ns()
+ << " range starting at " << minKey << " and ending at " << maxKey
+ << " does not belong to shard " << shardingState.getShardName();
- //
- // Validate the range starts and ends at chunks and has no holes, error if not valid
- //
+ warning() << *errMsg << endl;
+ return false;
+ }
- BSONObj firstDocMin = chunksToMerge.front().getMin();
- BSONObj firstDocMax = chunksToMerge.front().getMax();
- // minKey is inclusive
- bool minKeyInRange = rangeContains( firstDocMin, firstDocMax, minKey );
+ //
+ // Validate the range starts and ends at chunks and has no holes, error if not valid
+ //
- if ( !minKeyInRange ) {
+ BSONObj firstDocMin = chunksToMerge.front().getMin();
+ BSONObj firstDocMax = chunksToMerge.front().getMax();
+ // minKey is inclusive
+ bool minKeyInRange = rangeContains(firstDocMin, firstDocMax, minKey);
- *errMsg = stream() << "could not merge chunks, collection " << nss.ns()
- << " range starting at " << minKey
- << " does not belong to shard " << shardingState.getShardName();
+ if (!minKeyInRange) {
+ *errMsg = stream() << "could not merge chunks, collection " << nss.ns()
+ << " range starting at " << minKey << " does not belong to shard "
+ << shardingState.getShardName();
- warning() << *errMsg << endl;
- return false;
- }
+ warning() << *errMsg << endl;
+ return false;
+ }
- BSONObj lastDocMin = chunksToMerge.back().getMin();
- BSONObj lastDocMax = chunksToMerge.back().getMax();
- // maxKey is exclusive
- bool maxKeyInRange = lastDocMin.woCompare( maxKey ) < 0 &&
- lastDocMax.woCompare( maxKey ) >= 0;
+ BSONObj lastDocMin = chunksToMerge.back().getMin();
+ BSONObj lastDocMax = chunksToMerge.back().getMax();
+ // maxKey is exclusive
+ bool maxKeyInRange = lastDocMin.woCompare(maxKey) < 0 && lastDocMax.woCompare(maxKey) >= 0;
- if ( !maxKeyInRange ) {
- *errMsg = stream() << "could not merge chunks, collection " << nss.ns()
- << " range ending at " << maxKey
- << " does not belong to shard " << shardingState.getShardName();
+ if (!maxKeyInRange) {
+ *errMsg = stream() << "could not merge chunks, collection " << nss.ns()
+ << " range ending at " << maxKey << " does not belong to shard "
+ << shardingState.getShardName();
- warning() << *errMsg << endl;
- return false;
- }
+ warning() << *errMsg << endl;
+ return false;
+ }
- bool validRangeStartKey = firstDocMin.woCompare( minKey ) == 0;
- bool validRangeEndKey = lastDocMax.woCompare( maxKey ) == 0;
+ bool validRangeStartKey = firstDocMin.woCompare(minKey) == 0;
+ bool validRangeEndKey = lastDocMax.woCompare(maxKey) == 0;
- if ( !validRangeStartKey || !validRangeEndKey ) {
+ if (!validRangeStartKey || !validRangeEndKey) {
+ *errMsg = stream() << "could not merge chunks, collection " << nss.ns()
+ << " does not contain a chunk "
+ << (!validRangeStartKey ? "starting at " + minKey.toString() : "")
+ << (!validRangeStartKey && !validRangeEndKey ? " or " : "")
+ << (!validRangeEndKey ? "ending at " + maxKey.toString() : "");
- *errMsg = stream() << "could not merge chunks, collection " << nss.ns()
- << " does not contain a chunk "
- << ( !validRangeStartKey ? "starting at " + minKey.toString() : "" )
- << ( !validRangeStartKey && !validRangeEndKey ? " or " : "" )
- << ( !validRangeEndKey ? "ending at " + maxKey.toString() : "" );
+ warning() << *errMsg << endl;
+ return false;
+ }
- warning() << *errMsg << endl;
- return false;
- }
+ if (chunksToMerge.size() == 1) {
+ *errMsg = stream() << "could not merge chunks, collection " << nss.ns()
+ << " already contains chunk for " << rangeToString(minKey, maxKey);
- if ( chunksToMerge.size() == 1 ) {
+ warning() << *errMsg << endl;
+ return false;
+ }
- *errMsg = stream() << "could not merge chunks, collection " << nss.ns()
- << " already contains chunk for " << rangeToString( minKey, maxKey );
+ // Look for hole in range
+ for (size_t i = 1; i < chunksToMerge.size(); ++i) {
+ if (chunksToMerge[i - 1].getMax().woCompare(chunksToMerge[i].getMin()) != 0) {
+ *errMsg =
+ stream() << "could not merge chunks, collection " << nss.ns()
+ << " has a hole in the range " << rangeToString(minKey, maxKey) << " at "
+ << rangeToString(chunksToMerge[i - 1].getMax(), chunksToMerge[i].getMin());
warning() << *errMsg << endl;
return false;
}
+ }
- // Look for hole in range
- for (size_t i = 1; i < chunksToMerge.size(); ++i) {
- if (chunksToMerge[i-1].getMax().woCompare(chunksToMerge[i].getMin()) != 0) {
- *errMsg = stream() << "could not merge chunks, collection " << nss.ns()
- << " has a hole in the range " << rangeToString(minKey, maxKey)
- << " at " << rangeToString(chunksToMerge[i-1].getMax(),
- chunksToMerge[i].getMin());
-
- warning() << *errMsg << endl;
- return false;
- }
- }
-
- //
- // Run apply ops command
- //
- Status applyOpsStatus = runApplyOpsCmd(chunksToMerge, shardVersion, mergeVersion);
- if (!applyOpsStatus.isOK()) {
- warning() << applyOpsStatus;
- return false;
- }
-
- //
- // Install merged chunk metadata
- //
-
- {
- ScopedTransaction transaction(txn, MODE_IX);
- Lock::DBLock writeLk(txn->lockState(), nss.db(), MODE_IX);
- Lock::CollectionLock collLock(txn->lockState(), nss.ns(), MODE_X);
- shardingState.mergeChunks(txn, nss.ns(), minKey, maxKey, mergeVersion);
- }
-
- //
- // Log change
- //
-
- BSONObj mergeLogEntry = buildMergeLogEntry( chunksToMerge,
- shardVersion,
- mergeVersion );
+ //
+ // Run apply ops command
+ //
+ Status applyOpsStatus = runApplyOpsCmd(chunksToMerge, shardVersion, mergeVersion);
+ if (!applyOpsStatus.isOK()) {
+ warning() << applyOpsStatus;
+ return false;
+ }
- grid.catalogManager()->logChange(txn, "merge", nss.ns(), mergeLogEntry);
+ //
+ // Install merged chunk metadata
+ //
- return true;
+ {
+ ScopedTransaction transaction(txn, MODE_IX);
+ Lock::DBLock writeLk(txn->lockState(), nss.db(), MODE_IX);
+ Lock::CollectionLock collLock(txn->lockState(), nss.ns(), MODE_X);
+ shardingState.mergeChunks(txn, nss.ns(), minKey, maxKey, mergeVersion);
}
//
- // Utilities for building BSONObjs for applyOps and change logging
+ // Log change
//
- BSONObj buildMergeLogEntry(const std::vector<ChunkType>& chunksToMerge,
- const ChunkVersion& currShardVersion,
- const ChunkVersion& newMergedVersion) {
+ BSONObj mergeLogEntry = buildMergeLogEntry(chunksToMerge, shardVersion, mergeVersion);
- BSONObjBuilder logDetailB;
+ grid.catalogManager()->logChange(txn, "merge", nss.ns(), mergeLogEntry);
- BSONArrayBuilder mergedB( logDetailB.subarrayStart( "merged" ) );
+ return true;
+}
- for (const ChunkType& chunkToMerge : chunksToMerge) {
- mergedB.append(chunkToMerge.toBSON());
- }
+//
+// Utilities for building BSONObjs for applyOps and change logging
+//
- mergedB.done();
+BSONObj buildMergeLogEntry(const std::vector<ChunkType>& chunksToMerge,
+ const ChunkVersion& currShardVersion,
+ const ChunkVersion& newMergedVersion) {
+ BSONObjBuilder logDetailB;
- currShardVersion.addToBSON( logDetailB, "prevShardVersion" );
- newMergedVersion.addToBSON( logDetailB, "mergedVersion" );
+ BSONArrayBuilder mergedB(logDetailB.subarrayStart("merged"));
- return logDetailB.obj();
+ for (const ChunkType& chunkToMerge : chunksToMerge) {
+ mergedB.append(chunkToMerge.toBSON());
}
+ mergedB.done();
- BSONObj buildOpMergeChunk( const ChunkType& mergedChunk ) {
+ currShardVersion.addToBSON(logDetailB, "prevShardVersion");
+ newMergedVersion.addToBSON(logDetailB, "mergedVersion");
- BSONObjBuilder opB;
+ return logDetailB.obj();
+}
- // Op basics
- opB.append( "op" , "u" );
- opB.appendBool( "b" , false ); // no upserting
- opB.append( "ns" , ChunkType::ConfigNS );
- // New object
- opB.append( "o", mergedChunk.toBSON() );
+BSONObj buildOpMergeChunk(const ChunkType& mergedChunk) {
+ BSONObjBuilder opB;
- // Query object
- opB.append( "o2",
- BSON( ChunkType::name( mergedChunk.getName() ) ) );
+ // Op basics
+ opB.append("op", "u");
+ opB.appendBool("b", false); // no upserting
+ opB.append("ns", ChunkType::ConfigNS);
- return opB.obj();
- }
+ // New object
+ opB.append("o", mergedChunk.toBSON());
- BSONObj buildOpRemoveChunk( const ChunkType& chunkToRemove ) {
+ // Query object
+ opB.append("o2", BSON(ChunkType::name(mergedChunk.getName())));
- BSONObjBuilder opB;
-
- // Op basics
- opB.append( "op", "d" ); // delete
- opB.append( "ns", ChunkType::ConfigNS );
+ return opB.obj();
+}
- opB.append( "o", BSON( ChunkType::name( chunkToRemove.getName() ) ) );
+BSONObj buildOpRemoveChunk(const ChunkType& chunkToRemove) {
+ BSONObjBuilder opB;
- return opB.obj();
- }
+ // Op basics
+ opB.append("op", "d"); // delete
+ opB.append("ns", ChunkType::ConfigNS);
- BSONArray buildOpPrecond(const string& ns,
- const string& shardName,
- const ChunkVersion& shardVersion) {
- BSONArrayBuilder preCond;
- BSONObjBuilder condB;
- condB.append( "ns", ChunkType::ConfigNS );
- condB.append( "q", BSON( "query" << BSON( ChunkType::ns( ns ) )
- << "orderby" << BSON( ChunkType::DEPRECATED_lastmod() << -1 ) ) );
- {
- BSONObjBuilder resB( condB.subobjStart( "res" ) );
- shardVersion.addToBSON( resB, ChunkType::DEPRECATED_lastmod() );
- resB.done();
- }
- preCond.append(condB.obj());
- return preCond.arr();
- }
+ opB.append("o", BSON(ChunkType::name(chunkToRemove.getName())));
- Status runApplyOpsCmd(const std::vector<ChunkType>& chunksToMerge,
- const ChunkVersion& currShardVersion,
- const ChunkVersion& newMergedVersion) {
+ return opB.obj();
+}
- BSONArrayBuilder updatesB;
+BSONArray buildOpPrecond(const string& ns,
+ const string& shardName,
+ const ChunkVersion& shardVersion) {
+ BSONArrayBuilder preCond;
+ BSONObjBuilder condB;
+ condB.append("ns", ChunkType::ConfigNS);
+ condB.append("q",
+ BSON("query" << BSON(ChunkType::ns(ns)) << "orderby"
+ << BSON(ChunkType::DEPRECATED_lastmod() << -1)));
+ {
+ BSONObjBuilder resB(condB.subobjStart("res"));
+ shardVersion.addToBSON(resB, ChunkType::DEPRECATED_lastmod());
+ resB.done();
+ }
+ preCond.append(condB.obj());
+ return preCond.arr();
+}
- // The chunk we'll be "expanding" is the first chunk
- const ChunkType& firstChunk = chunksToMerge.front();
+Status runApplyOpsCmd(const std::vector<ChunkType>& chunksToMerge,
+ const ChunkVersion& currShardVersion,
+ const ChunkVersion& newMergedVersion) {
+ BSONArrayBuilder updatesB;
- // Fill in details not tracked by metadata
- ChunkType mergedChunk(firstChunk);
- mergedChunk.setName(Chunk::genID(firstChunk.getNS(), firstChunk.getMin()));
- mergedChunk.setMax(chunksToMerge.back().getMax());
- mergedChunk.setVersion(newMergedVersion);
+ // The chunk we'll be "expanding" is the first chunk
+ const ChunkType& firstChunk = chunksToMerge.front();
- updatesB.append(buildOpMergeChunk(mergedChunk));
+ // Fill in details not tracked by metadata
+ ChunkType mergedChunk(firstChunk);
+ mergedChunk.setName(Chunk::genID(firstChunk.getNS(), firstChunk.getMin()));
+ mergedChunk.setMax(chunksToMerge.back().getMax());
+ mergedChunk.setVersion(newMergedVersion);
- // Don't remove chunk we're expanding
- for (size_t i = 1; i < chunksToMerge.size(); ++i) {
- ChunkType chunkToMerge(chunksToMerge[i]);
- chunkToMerge.setName(Chunk::genID(chunkToMerge.getNS(), chunkToMerge.getMin()));
- updatesB.append(buildOpRemoveChunk(chunkToMerge));
- }
+ updatesB.append(buildOpMergeChunk(mergedChunk));
- BSONArray preCond = buildOpPrecond(firstChunk.getNS(),
- firstChunk.getShard(),
- currShardVersion);
- return grid.catalogManager()->applyChunkOpsDeprecated(updatesB.arr(), preCond);
+ // Don't remove chunk we're expanding
+ for (size_t i = 1; i < chunksToMerge.size(); ++i) {
+ ChunkType chunkToMerge(chunksToMerge[i]);
+ chunkToMerge.setName(Chunk::genID(chunkToMerge.getNS(), chunkToMerge.getMin()));
+ updatesB.append(buildOpRemoveChunk(chunkToMerge));
}
+ BSONArray preCond = buildOpPrecond(firstChunk.getNS(), firstChunk.getShard(), currShardVersion);
+ return grid.catalogManager()->applyChunkOpsDeprecated(updatesB.arr(), preCond);
+}
}
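
As a rough illustration of the applyOps payload that the new runApplyOpsCmd assembles (the op structure follows buildOpMergeChunk, buildOpRemoveChunk, and buildOpPrecond in the diff above; the concrete config.chunks field names, example shard key bounds, and version values here are illustrative assumptions, not taken from this commit), a minimal standalone C++ sketch that prints the shape of the command:

// Minimal standalone sketch: no MongoDB headers, hardcoded example values.
// One "u" op expands the first chunk over the whole merged range, "d" ops remove
// the chunks it absorbs, and a precondition pins the newest lastmod for the
// collection to the shard version observed before the merge.
#include <iostream>
#include <string>
#include <vector>

// Hypothetical stand-in for ChunkType: only the fields the merge logic touches.
struct Chunk {
    std::string name;  // chunk id, e.g. "test.foo-x_0"
    std::string min;   // rendered lower bound, e.g. "{ x: 0 }"
    std::string max;   // rendered upper bound (exclusive)
};

int main() {
    // Three contiguous chunks being merged; values are illustrative only.
    std::vector<Chunk> chunksToMerge = {
        {"test.foo-x_0", "{ x: 0 }", "{ x: 10 }"},
        {"test.foo-x_10", "{ x: 10 }", "{ x: 20 }"},
        {"test.foo-x_20", "{ x: 20 }", "{ x: 30 }"},
    };
    const std::string currShardVersion = "Timestamp(1, 4)";
    const std::string newMergedVersion = "Timestamp(1, 5)";

    std::cout << "{ applyOps: [\n";

    // Like buildOpMergeChunk: a "u" op (b: false, so no upsert) rewriting the first
    // chunk so its max becomes the last chunk's max and its lastmod the new version.
    const Chunk& first = chunksToMerge.front();
    std::cout << "  { op: \"u\", b: false, ns: \"config.chunks\",\n"
              << "    o: { _id: \"" << first.name << "\", ns: \"test.foo\", min: " << first.min
              << ", max: " << chunksToMerge.back().max
              << ", lastmod: " << newMergedVersion << " },\n"
              << "    o2: { _id: \"" << first.name << "\" } },\n";

    // Like buildOpRemoveChunk: a "d" op for every chunk the merged chunk absorbs.
    for (size_t i = 1; i < chunksToMerge.size(); ++i) {
        std::cout << "  { op: \"d\", ns: \"config.chunks\", o: { _id: \""
                  << chunksToMerge[i].name << "\" } },\n";
    }

    // Like buildOpPrecond: abort unless the newest lastmod for the collection in
    // config.chunks still equals the shard version observed before building the ops.
    std::cout << "], preCondition: [ { ns: \"config.chunks\",\n"
              << "  q: { query: { ns: \"test.foo\" }, orderby: { lastmod: -1 } },\n"
              << "  res: { lastmod: " << currShardVersion << " } } ] }\n";
    return 0;
}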