Diffstat (limited to 'src')
-rw-r--r--   src/mongo/s/d_migrate.cpp | 215
-rw-r--r--   src/mongo/s/d_split.cpp   | 194
-rw-r--r--   src/mongo/s/distlock.cpp  |  34
-rw-r--r--   src/mongo/s/distlock.h    |  30
4 files changed, 218 insertions(+), 255 deletions(-)
diff --git a/src/mongo/s/d_migrate.cpp b/src/mongo/s/d_migrate.cpp
index b6011313603..f0c86759a29 100644
--- a/src/mongo/s/d_migrate.cpp
+++ b/src/mongo/s/d_migrate.cpp
@@ -837,17 +837,14 @@ namespace mongo {
// 1.
string ns = parseNs(dbname, cmdObj);
- string to = cmdObj["to"].str();
- string from = cmdObj["from"].str(); // my public address, a tad redundant, but safe
- // fromShard and toShard needed so that 2.2 mongos can interact with either 2.0 or 2.2 mongod
- if( cmdObj["fromShard"].type() == String ){
- from = cmdObj["fromShard"].String();
- }
+ // The shard addresses; redundant, but they allow for validation
+ string toShardHost = cmdObj["to"].str();
+ string fromShardHost = cmdObj["from"].str();
- if( cmdObj["toShard"].type() == String ){
- to = cmdObj["toShard"].String();
- }
+ // The shard names
+ string toShardName = cmdObj["toShard"].str();
+ string fromShardName = cmdObj["fromShard"].str();
// Process secondary throttle settings and assign defaults if necessary.
BSONObj secThrottleObj;
@@ -894,11 +891,11 @@ namespace mongo {
return false;
}
- if ( to.empty() ) {
+ if ( toShardName.empty() ) {
errmsg = "need to specify shard to move chunk to";
return false;
}
- if ( from.empty() ) {
+ if ( fromShardName.empty() ) {
errmsg = "need to specify shard to move chunk from";
return false;
}
@@ -924,28 +921,36 @@ namespace mongo {
}
const long long maxChunkSize = maxSizeElem.numberLong(); // in bytes
+ // This could be the first call that enables sharding - make sure we initialize the
+ // sharding state for this shard.
if ( ! shardingState.enabled() ) {
if ( cmdObj["configdb"].type() != String ) {
errmsg = "sharding not enabled";
+ warning() << errmsg << endl;
return false;
}
string configdb = cmdObj["configdb"].String();
ShardingState::initialize(configdb);
}
- MoveTimingHelper timing( "from" , ns , min , max , 6 /* steps */ , &errmsg );
+ // Initialize our current shard name in the shard state if needed
+ shardingState.gotShardName(fromShardName);
// Make sure we're as up-to-date as possible with shard information
// This catches the case where we previously had to change a shard's host by
// removing/adding a shard with the same name
Shard::reloadShardInfo();
+ Shard toShard(toShardName);
+ Shard fromShard(fromShardName);
- // So 2.2 mongod can interact with 2.0 mongos, mongod needs to handle either a conn
- // string or a shard in the to/from fields. The Shard constructor handles this,
- // eventually we should break the compatibility.
+ ConnectionString configLoc = ConnectionString::parse(shardingState.getConfigServer(),
+ errmsg);
+ if (!configLoc.isValid()) {
+ warning() << errmsg;
+ return false;
+ }
- Shard fromShard( from );
- Shard toShard( to );
+ MoveTimingHelper timing( "from" , ns , min , max , 6 /* steps */ , &errmsg );
log() << "received moveChunk request: " << cmdObj << migrateLog;
@@ -960,138 +965,88 @@ namespace mongo {
return false;
}
- DistributedLock lockSetup( ConnectionString( shardingState.getConfigServer() , ConnectionString::SYNC ) , ns );
- dist_lock_try dlk;
+ //
+ // Get the distributed lock
+ //
- try{
- dlk = dist_lock_try( &lockSetup , (string)"migrate-" + min.toString(), 30.0 /*timeout*/ );
- }
- catch( LockException& e ){
- errmsg = str::stream() << "error locking distributed lock for migration " << "migrate-" << min.toString() << causedBy( e );
- warning() << errmsg << endl;
- return false;
- }
-
- if ( ! dlk.got() ) {
- errmsg = str::stream() << "the collection metadata could not be locked with lock " << "migrate-" << min.toString();
- warning() << errmsg << endl;
- result.append( "who" , dlk.other() );
- return false;
- }
+ ScopedDistributedLock collLock(configLoc, ns);
+ collLock.setLockMessage(str::stream() << "migrating chunk [" << minKey << ", " << maxKey
+ << ") in " << ns);
- BSONObj chunkInfo = BSON("min" << min << "max" << max << "from" << fromShard.getName() << "to" << toShard.getName() );
- configServer.logChange( "moveChunk.start" , ns , chunkInfo );
+ if (!collLock.tryAcquire(&errmsg)) {
- ChunkVersion maxVersion;
- ChunkVersion startingVersion;
- string myOldShard;
- {
- ScopedDbConnection conn(shardingState.getConfigServer(), 30);
+ errmsg = str::stream() << "could not acquire collection lock for " << ns
+ << " to migrate chunk [" << minKey << "," << maxKey << ")"
+ << causedBy(errmsg);
- BSONObj x;
- BSONObj currChunk;
- try{
- x = conn->findOne(ChunkType::ConfigNS,
- Query(BSON(ChunkType::ns(ns)))
- .sort(BSON(ChunkType::DEPRECATED_lastmod() << -1)));
+ warning() << errmsg;
+ return false;
+ }
- currChunk = conn->findOne(ChunkType::ConfigNS,
- shardId.wrap(ChunkType::name().c_str()));
- }
- catch( DBException& e ){
- errmsg = str::stream() << "aborted moveChunk because could not get chunk data from config server " << shardingState.getConfigServer() << causedBy( e );
- warning() << errmsg << endl;
- return false;
- }
+ BSONObj chunkInfo =
+ BSON("min" << min << "max" << max <<
+ "from" << fromShard.getName() << "to" << toShard.getName());
+ configServer.logChange("moveChunk.start", ns, chunkInfo);
- maxVersion = ChunkVersion::fromBSON(x, ChunkType::DEPRECATED_lastmod());
- verify(currChunk[ChunkType::shard()].type());
- verify(currChunk[ChunkType::min()].type());
- verify(currChunk[ChunkType::max()].type());
- myOldShard = currChunk[ChunkType::shard()].String();
- conn.done();
+ // Always refresh our metadata remotely
+ ChunkVersion origShardVersion;
+ Status refreshStatus = shardingState.refreshMetadataNow(txn, ns, &origShardVersion);
- BSONObj currMin = currChunk[ChunkType::min()].Obj();
- BSONObj currMax = currChunk[ChunkType::max()].Obj();
- if ( currMin.woCompare( min ) || currMax.woCompare( max ) ) {
- errmsg = "boundaries are outdated (likely a split occurred)";
- result.append( "currMin" , currMin );
- result.append( "currMax" , currMax );
- result.append( "requestedMin" , min );
- result.append( "requestedMax" , max );
-
- warning() << "aborted moveChunk because" << errmsg << ": " << min << "->" << max
- << " is now " << currMin << "->" << currMax << migrateLog;
- return false;
- }
+ if (!refreshStatus.isOK()) {
- if ( myOldShard != fromShard.getName() ) {
- errmsg = "location is outdated (likely balance or migrate occurred)";
- result.append( "from" , fromShard.getName() );
- result.append( "official" , myOldShard );
+ errmsg = str::stream() << "moveChunk cannot start migrate of chunk "
+ << "[" << minKey << "," << maxKey << ")"
+ << causedBy(refreshStatus.reason());
- warning() << "aborted moveChunk because " << errmsg << ": chunk is at " << myOldShard
- << " and not at " << fromShard.getName() << migrateLog;
- return false;
- }
+ warning() << errmsg;
+ return false;
+ }
- if ( maxVersion < shardingState.getVersion( ns ) ) {
- errmsg = "official version less than mine?";
- maxVersion.addToBSON( result, "officialVersion" );
- shardingState.getVersion( ns ).addToBSON( result, "myVersion" );
+ if (origShardVersion.majorVersion() == 0) {
- warning() << "aborted moveChunk because " << errmsg << ": official " << maxVersion
- << " mine: " << shardingState.getVersion(ns) << migrateLog;
- return false;
- }
+ // It makes no sense to migrate if our version is zero and we have no chunks
+ errmsg = str::stream() << "moveChunk cannot start migrate of chunk "
+ << "[" << minKey << "," << maxKey << ")"
+ << " with zero shard version";
- // since this could be the first call that enable sharding we also make sure to
- // load the shard's metadata, if we don't have it
- shardingState.gotShardName( myOldShard );
+ warning() << errmsg;
+ return false;
+ }
- // Always refresh our metadata remotely
- // TODO: The above checks should be removed, we should only have one refresh
- // mechanism.
- ChunkVersion startingVersion;
- Status status = shardingState.refreshMetadataNow(txn, ns, &startingVersion );
+ // Get collection metadata
+ const CollectionMetadataPtr origCollMetadata(shardingState.getCollectionMetadata(ns));
+ // With nonzero shard version, we must have metadata
+ invariant(NULL != origCollMetadata);
- if (!status.isOK()) {
- errmsg = str::stream() << "moveChunk cannot start migrate of chunk "
- << "[" << currMin << "," << currMax << ")"
- << causedBy( status.reason() );
+ ChunkVersion origCollVersion = origCollMetadata->getCollVersion();
+ BSONObj shardKeyPattern = origCollMetadata->getKeyPattern();
- warning() << errmsg << endl;
- return false;
- }
+ // With nonzero shard version, we must have a coll version >= our shard version
+ invariant(origCollVersion >= origShardVersion);
+ // With nonzero shard version, we must have a shard key
+ invariant(!shardKeyPattern.isEmpty());
- if (startingVersion.majorVersion() == 0) {
- // It makes no sense to migrate if our version is zero and we have no chunks
- errmsg = str::stream() << "moveChunk cannot start migrate of chunk "
- << "[" << currMin << "," << currMax << ")"
- << " with zero shard version";
+ ChunkType origChunk;
+ if (!origCollMetadata->getNextChunk(min, &origChunk)
+ || origChunk.getMin().woCompare(min) || origChunk.getMax().woCompare(max)) {
- warning() << errmsg << endl;
- return false;
- }
+ // Our boundaries are different from those passed in
+ errmsg = str::stream() << "moveChunk cannot find chunk "
+ << "[" << minKey << "," << maxKey << ")"
+ << " to migrate, the chunk boundaries may be stale";
- log() << "moveChunk request accepted at version " << startingVersion << migrateLog;
+ warning() << errmsg;
+ return false;
}
+ log() << "moveChunk request accepted at version " << origShardVersion;
+
timing.done(2);
MONGO_FP_PAUSE_WHILE(moveChunkHangAtStep2);
// 3.
-
- const CollectionMetadataPtr origCollMetadata( shardingState.getCollectionMetadata( ns ) );
- verify( origCollMetadata != NULL );
- BSONObj shardKeyPattern = origCollMetadata->getKeyPattern();
- if ( shardKeyPattern.isEmpty() ){
- errmsg = "no shard key found";
- warning() << errmsg << endl;
- return false;
- }
-
MigrateStatusHolder statusHolder(txn, ns, min, max, shardKeyPattern);
+
if (statusHolder.isAnotherMigrationActive()) {
errmsg = "moveChunk is already in progress from this shard";
warning() << errmsg << endl;
@@ -1132,7 +1087,7 @@ namespace mongo {
}
catch( DBException& e ){
errmsg = str::stream() << "moveChunk could not contact to: shard "
- << to << " to start transfer" << causedBy( e );
+ << toShardName << " to start transfer" << causedBy( e );
warning() << errmsg << endl;
return false;
}
@@ -1170,7 +1125,7 @@ namespace mongo {
res = res.getOwned();
}
catch( DBException& e ){
- errmsg = str::stream() << "moveChunk could not contact to: shard " << to << " to monitor transfer" << causedBy( e );
+ errmsg = str::stream() << "moveChunk could not contact to: shard " << toShardName << " to monitor transfer" << causedBy( e );
warning() << errmsg << endl;
return false;
}
@@ -1254,7 +1209,7 @@ namespace mongo {
// Ensure distributed lock still held
string lockHeldMsg;
- bool lockHeld = dlk.isLockHeld( 30.0 /* timeout */, &lockHeldMsg );
+ bool lockHeld = collLock.verifyLockHeld(&lockHeldMsg);
if ( !lockHeld ) {
errmsg = str::stream() << "not entering migrate critical section because "
<< lockHeldMsg;
@@ -1269,7 +1224,7 @@ namespace mongo {
// we're under the collection lock here, so no other migrate can change maxVersion
// or CollectionMetadata state
migrateFromStatus.setInCriticalSection( true );
- ChunkVersion myVersion = maxVersion;
+ ChunkVersion myVersion = origCollVersion;
myVersion.incMajor();
{
@@ -1306,7 +1261,7 @@ namespace mongo {
if ( !ok || MONGO_FAIL_POINT(failMigrationCommit) ) {
log() << "moveChunk migrate commit not accepted by TO-shard: " << res
- << " resetting shard version to: " << startingVersion << migrateLog;
+ << " resetting shard version to: " << origShardVersion << migrateLog;
{
Lock::GlobalWrite lk(txn->lockState());
log() << "moveChunk global lock acquired to reset shard version from "
@@ -1426,7 +1381,7 @@ namespace mongo {
{
BSONObjBuilder bb( b.subobjStart( "res" ) );
// TODO: For backwards compatibility, we can't yet require an epoch here
- bb.appendTimestamp(ChunkType::DEPRECATED_lastmod(), maxVersion.toLong());
+ bb.appendTimestamp(ChunkType::DEPRECATED_lastmod(), origCollVersion.toLong());
bb.done();
}
preCond.append( b.obj() );
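
The net effect of the d_migrate.cpp changes above is that moveChunk now takes the collection's distributed lock through the RAII-style ScopedDistributedLock instead of the old DistributedLock/dist_lock_try pair, and derives its versions from freshly refreshed collection metadata rather than a direct config-server query. A minimal sketch of the new locking pattern, using only names that appear in this diff (error handling trimmed, so treat it as an illustration rather than the exact function body):

    // Parse the config server location recorded in the sharding state.
    ConnectionString configLoc = ConnectionString::parse(shardingState.getConfigServer(),
                                                         errmsg);
    if (!configLoc.isValid()) {
        warning() << errmsg;
        return false;
    }

    // Hold the per-collection distributed lock for the duration of the migration.
    ScopedDistributedLock collLock(configLoc, ns);
    collLock.setLockMessage(str::stream() << "migrating chunk [" << minKey << ", "
                                          << maxKey << ") in " << ns);
    if (!collLock.tryAcquire(&errmsg)) {
        warning() << errmsg;
        return false;
    }

    // ... clone and catch-up phases run here ...

    // Before entering the critical section, confirm the lock was not forced away.
    string lockHeldMsg;
    if (!collLock.verifyLockHeld(&lockHeldMsg)) {
        errmsg = str::stream() << "not entering migrate critical section because "
                               << lockHeldMsg;
        return false;
    }
    // The lock is released via unlock(), presumably from ~ScopedDistributedLock when
    // collLock goes out of scope.
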
diff --git a/src/mongo/s/d_split.cpp b/src/mongo/s/d_split.cpp
index 8028a0634c3..254ec7f72bb 100644
--- a/src/mongo/s/d_split.cpp
+++ b/src/mongo/s/d_split.cpp
@@ -533,8 +533,8 @@ namespace mongo {
return false;
}
- const string from = cmdObj["from"].str();
- if ( from.empty() ) {
+ const string shardName = cmdObj["from"].str();
+ if ( shardName.empty() ) {
errmsg = "need specify server to split chunk at";
return false;
}
@@ -556,134 +556,99 @@ namespace mongo {
return false;
}
- // It is possible that this is the first sharded command this mongod is asked to perform. If so,
- // start sharding apparatus. We'd still be missing some more shard-related info but we'll get it
- // in step 2. below.
+ //
+ // Get sharding state up-to-date
+ //
+
+ // This could be the first call that enables sharding - make sure we initialize the
+ // sharding state for this shard.
if ( ! shardingState.enabled() ) {
if ( cmdObj["configdb"].type() != String ) {
errmsg = "sharding not enabled";
+ warning() << errmsg << endl;
return false;
}
string configdb = cmdObj["configdb"].String();
ShardingState::initialize(configdb);
}
- Shard myShard( from );
+ // Initialize our current shard name in the shard state if needed
+ shardingState.gotShardName(shardName);
+
+ ConnectionString configLoc = ConnectionString::parse(shardingState.getConfigServer(),
+ errmsg);
+ if (!configLoc.isValid()) {
+ warning() << errmsg;
+ return false;
+ }
- log() << "received splitChunk request: " << cmdObj << endl;
+ log() << "received splitChunk request: " << cmdObj;
//
// 2. lock the collection's metadata and get highest version for the current shard
//
- DistributedLock lockSetup( ConnectionString( shardingState.getConfigServer() , ConnectionString::SYNC) , ns );
- dist_lock_try dlk;
+ ScopedDistributedLock collLock(configLoc, ns);
+ collLock.setLockMessage(str::stream() << "splitting chunk [" << minKey << ", " << maxKey
+ << ") in " << ns);
- try{
- dlk = dist_lock_try( &lockSetup, string("split-") + min.toString() );
- }
- catch( LockException& e ){
- errmsg = str::stream() << "Error locking distributed lock for split." << causedBy( e );
- return false;
- }
+ if (!collLock.tryAcquire(&errmsg)) {
+
+ errmsg = str::stream() << "could not acquire collection lock for " << ns
+ << " to split chunk [" << minKey << "," << maxKey << ")"
+ << causedBy(errmsg);
- if ( ! dlk.got() ) {
- errmsg = "the collection's metadata lock is taken";
- result.append( "who" , dlk.other() );
+ warning() << errmsg;
return false;
}
- // TODO This is a check migrate does to the letter. Factor it out and share. 2010-10-22
+ // Always check our version remotely
+ ChunkVersion shardVersion;
+ Status refreshStatus = shardingState.refreshMetadataNow(txn, ns, &shardVersion);
- ChunkVersion maxVersion;
- ChunkType origChunk;
- {
- ScopedDbConnection conn(shardingState.getConfigServer(), 30);
+ if (!refreshStatus.isOK()) {
- BSONObj x = conn->findOne(ChunkType::ConfigNS,
- Query(BSON(ChunkType::ns(ns)))
- .sort(BSON(ChunkType::DEPRECATED_lastmod() << -1)));
+ errmsg = str::stream() << "splitChunk cannot split chunk " << "[" << minKey << ","
+ << maxKey << ")" << causedBy(refreshStatus.reason());
- maxVersion = ChunkVersion::fromBSON(x, ChunkType::DEPRECATED_lastmod());
-
- BSONObj currChunk =
- conn->findOne(ChunkType::ConfigNS,
- shardId.wrap(ChunkType::name().c_str())).getOwned();
- conn.done();
-
- if (!origChunk.parseBSON(currChunk, &errmsg)) {
- return false;
- }
-
- const BSONObj currMin = origChunk.getMin();
- const BSONObj currMax = origChunk.getMax();
- if ( currMin.woCompare( min ) || currMax.woCompare( max ) ) {
- errmsg = "chunk boundaries are outdated (likely a split occurred)";
- result.append( "currMin" , currMin );
- result.append( "currMax" , currMax );
- result.append( "requestedMin" , min );
- result.append( "requestedMax" , max );
-
- warning() << "aborted split because " << errmsg << ": " << min << "->" << max
- << " is now " << currMin << "->" << currMax << endl;
- return false;
- }
-
- if (origChunk.getShard() != myShard.getName()) {
- errmsg = "location is outdated (likely balance or migrate occurred)";
- result.append( "from" , myShard.getName() );
- result.append("official", origChunk.getShard());
-
- warning() << "aborted split because " << errmsg
- << ": chunk is at " << origChunk.getShard()
- << " and not at " << myShard.getName() << endl;
- return false;
- }
-
- if ( maxVersion < shardingState.getVersion( ns ) ) {
- errmsg = "official version less than mine?";
- maxVersion.addToBSON( result, "officialVersion" );
- shardingState.getVersion( ns ).addToBSON( result, "myVersion" );
-
- warning() << "aborted split because " << errmsg << ": official " << maxVersion
- << " mine: " << shardingState.getVersion(ns) << endl;
- return false;
- }
+ warning() << errmsg;
+ return false;
+ }
- // since this could be the first call that enable sharding we also make sure to load
- // the shard's metadata
- shardingState.gotShardName(origChunk.getShard());
+ if (shardVersion.majorVersion() == 0) {
- // Always check our version remotely.
- // TODO: Make this less expensive by using the incoming request's shard version.
- // TODO: The above checks should be removed, we should only have one refresh
- // mechanism.
- ChunkVersion shardVersion;
- Status status = shardingState.refreshMetadataNow(txn, ns, &shardVersion);
+ // It makes no sense to split if our version is zero and we have no chunks
+ errmsg = str::stream() << "splitChunk cannot split chunk " << "[" << minKey << ","
+ << maxKey << ")" << " with zero shard version";
- if (!status.isOK()) {
- errmsg = str::stream() << "splitChunk cannot split chunk "
- << "[" << currMin << "," << currMax << ")"
- << causedBy( status.reason() );
+ warning() << errmsg;
+ return false;
+ }
- warning() << errmsg << endl;
- return false;
- }
+ // Get collection metadata
+ const CollectionMetadataPtr collMetadata(shardingState.getCollectionMetadata(ns));
+ // With nonzero shard version, we must have metadata
+ invariant(NULL != collMetadata);
- if ( shardVersion.majorVersion() == 0 ) {
- // It makes no sense to split if our version is zero and we have no chunks
- errmsg = str::stream() << "splitChunk cannot split chunk "
- << "[" << currMin << "," << currMax << ")"
- << " with zero shard version";
+ ChunkVersion collVersion = collMetadata->getCollVersion();
+ // With nonzero shard version, we must have a coll version >= our shard version
+ invariant(collVersion >= shardVersion);
- warning() << errmsg << endl;
- return false;
- }
+ ChunkType origChunk;
+ if (!collMetadata->getNextChunk(min, &origChunk)
+ || origChunk.getMin().woCompare(min) || origChunk.getMax().woCompare(max)) {
- log() << "splitChunk accepted at version " << shardVersion << endl;
+ // Our boundaries are different from those passed in
+ errmsg = str::stream() << "splitChunk cannot find chunk "
+ << "[" << minKey << "," << maxKey << ")"
+ << " to split, the chunk boundaries may be stale";
+ warning() << errmsg;
+ return false;
}
+ log() << "splitChunk accepted at version " << shardVersion;
+
//
// 3. create the batch of updates to metadata ( the new chunks ) to be applied via 'applyOps' command
//
@@ -693,7 +658,7 @@ namespace mongo {
LOG(1) << "before split on " << origChunk << endl;
OwnedPointerVector<ChunkType> newChunks;
- ChunkVersion myVersion = maxVersion;
+ ChunkVersion nextChunkVersion = collVersion;
BSONObj startKey = min;
splitKeys.push_back( max ); // makes it easier to have 'max' in the next loop. remove later.
@@ -712,14 +677,13 @@ namespace mongo {
return false;
}
- CollectionMetadataPtr metadata(shardingState.getCollectionMetadata(ns));
- if (!isShardDocSizeValid(metadata->getKeyPattern(), endKey, &errmsg)) {
+ if (!isShardDocSizeValid(collMetadata->getKeyPattern(), endKey, &errmsg)) {
warning() << errmsg << endl;
return false;
}
// splits only update the 'minor' portion of version
- myVersion.incMinor();
+ nextChunkVersion.incMinor();
// build an update operation against the chunks collection of the config database with
// upsert true
@@ -731,11 +695,11 @@ namespace mongo {
// add the modified (new) chunk information as the update object
BSONObjBuilder n( op.subobjStart( "o" ) );
n.append(ChunkType::name(), Chunk::genID(ns, startKey));
- myVersion.addToBSON(n, ChunkType::DEPRECATED_lastmod());
+ nextChunkVersion.addToBSON(n, ChunkType::DEPRECATED_lastmod());
n.append(ChunkType::ns(), ns);
n.append(ChunkType::min(), startKey);
n.append(ChunkType::max(), endKey);
- n.append(ChunkType::shard(), origChunk.getShard());
+ n.append(ChunkType::shard(), shardName);
n.done();
// add the chunk's _id as the query part of the update statement
@@ -749,13 +713,15 @@ namespace mongo {
auto_ptr<ChunkType> chunk(new ChunkType());
chunk->setMin(startKey);
chunk->setMax(endKey);
- chunk->setVersion(myVersion);
+ chunk->setVersion(nextChunkVersion);
newChunks.push_back(chunk.release());
startKey = endKey;
}
+ splitKeys.pop_back(); // 'max' was used as sentinel
+
updates.done();
{
@@ -767,7 +733,7 @@ namespace mongo {
{
BSONObjBuilder bb( b.subobjStart( "res" ) );
// TODO: For backwards compatibility, we can't yet require an epoch here
- bb.appendTimestamp(ChunkType::DEPRECATED_lastmod(), maxVersion.toLong());
+ bb.appendTimestamp(ChunkType::DEPRECATED_lastmod(), collVersion.toLong());
bb.done();
}
preCond.append( b.obj() );
@@ -800,13 +766,20 @@ namespace mongo {
//
// Install chunk metadata with knowledge about newly split chunks in this shard's state
//
-
- splitKeys.pop_back(); // 'max' was used as sentinel
- maxVersion.incMinor();
{
Lock::DBWrite writeLk(txn->lockState(), ns);
- shardingState.splitChunk(txn, ns, min, max, splitKeys, maxVersion);
+
+ // NOTE: The newShardVersion resulting from this split is higher than any other
+ // chunk version, so it's also implicitly the newCollVersion
+ ChunkVersion newShardVersion = collVersion;
+
+ // Increment the minor version once, shardingState.splitChunk increments once per
+ // split point (resulting in the correct final shard/collection version)
+ // TODO: Revisit this interface, it's a bit clunky
+ newShardVersion.incMinor();
+
+ shardingState.splitChunk(txn, ns, min, max, splitKeys, newShardVersion);
}
//
@@ -880,7 +853,8 @@ namespace mongo {
BSONObjBuilder bb(b);
bb.append(ChunkType::min(), chunk.getMin());
bb.append(ChunkType::max(), chunk.getMax());
- chunk.getVersion().addToBSON(bb, ChunkType::DEPRECATED_lastmod());
+ if (chunk.isVersionSet())
+ chunk.getVersion().addToBSON(bb, ChunkType::DEPRECATED_lastmod());
bb.done();
}
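
A subtle point in the d_split.cpp change above is the version arithmetic: each new chunk document written through applyOps takes the next minor bump of the collection version, while the in-memory shard version handed to shardingState.splitChunk() is bumped once here and once per split point inside splitChunk, so both paths land on the same final value. A standalone, hedged illustration of that arithmetic (version numbers are invented for the example; this is not MongoDB code):

    #include <cstdio>

    int main() {
        // Pretend the collection version before the split is 3|12.
        int major = 3, minor = 12;
        const int kNumSplitPoints = 2;  // two split points -> three new chunks

        // Each new chunk document gets the next minor version: 3|13, 3|14, 3|15.
        for (int i = 0; i <= kNumSplitPoints; ++i) {
            ++minor;
            std::printf("chunk %d -> version %d|%d\n", i + 1, major, minor);
        }

        // splitChunk() starts from the collection version bumped once (3|13) and bumps
        // it once more per split point, so the shard also ends at 3|15.
        std::printf("final shard/collection version -> %d|%d\n", major, minor);
        return 0;
    }
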
diff --git a/src/mongo/s/distlock.cpp b/src/mongo/s/distlock.cpp
index 25823a27253..18d5fdae8c3 100644
--- a/src/mongo/s/distlock.cpp
+++ b/src/mongo/s/distlock.cpp
@@ -542,19 +542,19 @@ namespace mongo {
bool DistributedLock::isLockHeld( double timeout, string* errMsg ) {
- ScopedDbConnection conn(_conn.toString(), timeout );
BSONObj lockObj;
try {
+ ScopedDbConnection conn(_conn.toString(), timeout );
lockObj = conn->findOne( LocksType::ConfigNS,
BSON( LocksType::name(_name) ) ).getOwned();
+ conn.done();
}
catch ( DBException& e ) {
*errMsg = str::stream() << "error checking whether lock " << _name << " is held "
<< causedBy( e );
return false;
}
- conn.done();
if ( lockObj.isEmpty() ) {
*errMsg = str::stream() << "no lock for " << _name << " exists in the locks collection";
@@ -1149,9 +1149,15 @@ namespace mongo {
<< " minutes timeout." << endl;
}
- ScopedDistributedLock::ScopedDistributedLock(const ConnectionString& conn, const string& name) :
- _lock(conn, name), _why(""), _lockTryIntervalMillis(1000), _acquired(false)
- {
+ const long long ScopedDistributedLock::kDefaultLockTryIntervalMillis = 1000;
+ const long long ScopedDistributedLock::kDefaultSocketTimeoutMillis = 30 * 1000;
+
+ ScopedDistributedLock::ScopedDistributedLock(const ConnectionString& conn, const string& name)
+ : _lock(conn, name),
+ _why(""),
+ _lockTryIntervalMillis(kDefaultLockTryIntervalMillis),
+ _socketTimeoutMillis(kDefaultSocketTimeoutMillis),
+ _acquired(false) {
}
ScopedDistributedLock::~ScopedDistributedLock() {
@@ -1162,7 +1168,10 @@ namespace mongo {
bool ScopedDistributedLock::tryAcquire(string* errMsg) {
try {
- _acquired = _lock.lock_try(_why, false, &_other);
+ _acquired = _lock.lock_try(_why,
+ false,
+ &_other,
+ static_cast<double>(_socketTimeoutMillis / 1000));
}
catch (const DBException& e) {
@@ -1224,4 +1233,17 @@ namespace mongo {
return false;
}
+ /**
+ * Returns false if the lock is known _not_ to be held; otherwise issues an 'isLockHeld'
+ * call on the underlying lock and returns whatever that call does.
+ */
+ bool ScopedDistributedLock::verifyLockHeld(std::string* errMsg) {
+ if (!_acquired) {
+ *errMsg = "lock was never acquired";
+ return false;
+ }
+
+ return _lock.isLockHeld(static_cast<double>(_socketTimeoutMillis / 1000), errMsg);
+ }
+
}
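
The distlock.cpp change above moves the ScopedDbConnection construction inside the try block, so a failure to reach the config server is reported through errMsg and a false return instead of escaping as an uncaught DBException; conn.done() likewise moves inside so the connection is only marked good when the query succeeds. Condensed, the resulting shape of the query in isLockHeld is (names as in the diff):

    BSONObj lockObj;
    try {
        // Connecting can throw, so keep it inside the try to turn failures into errMsg.
        ScopedDbConnection conn(_conn.toString(), timeout);
        lockObj = conn->findOne(LocksType::ConfigNS,
                                BSON(LocksType::name(_name))).getOwned();
        conn.done();  // only release the connection as healthy on success
    }
    catch (DBException& e) {
        *errMsg = str::stream() << "error checking whether lock " << _name << " is held "
                                << causedBy(e);
        return false;
    }
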
diff --git a/src/mongo/s/distlock.h b/src/mongo/s/distlock.h
index 8516392df27..5fd57736be5 100644
--- a/src/mongo/s/distlock.h
+++ b/src/mongo/s/distlock.h
@@ -318,25 +318,22 @@ namespace mongo {
class MONGO_CLIENT_API ScopedDistributedLock {
public:
+ static const long long kDefaultLockTryIntervalMillis;
+ static const long long kDefaultSocketTimeoutMillis;
+
ScopedDistributedLock(const ConnectionString& conn, const std::string& name);
- virtual ~ScopedDistributedLock();
+ ~ScopedDistributedLock();
/**
* Tries once to obtain a lock, and can fail with an error message.
- *
- * Subclasses of this lock can override this method (and are also required to call the base
- * in the overridden method).
- *
- * @return if the lock was successfully acquired
+ * Returns true if the lock was successfully acquired.
*/
- virtual bool tryAcquire(std::string* errMsg);
+ bool tryAcquire(std::string* errMsg);
/**
* Tries to unlock the lock if acquired. Cannot report an error or block indefinitely
* (though it may log messages or continue retrying in a non-blocking way).
- *
- * Subclasses should define their own destructor unlockXXX() methods.
*/
void unlock();
@@ -351,6 +348,12 @@ namespace mongo {
*/
bool acquire(long long waitForMillis, std::string* errMsg);
+ /**
+ * If the lock is held, remotely verifies as a sanity check that it has not been forced.
+ * If the lock is not held or cannot be verified, returns false with errMsg.
+ */
+ bool verifyLockHeld(std::string* errMsg);
+
bool isAcquired() const {
return _acquired;
}
@@ -375,10 +378,19 @@ namespace mongo {
return _why;
}
+ void setSocketTimeoutMillis(long long socketTimeoutMillis) {
+ _socketTimeoutMillis = socketTimeoutMillis;
+ }
+
+ long long getSocketTimeoutMillis() const {
+ return _socketTimeoutMillis;
+ }
+
private:
DistributedLock _lock;
std::string _why;
long long _lockTryIntervalMillis;
+ long long _socketTimeoutMillis;
bool _acquired;
BSONObj _other;
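
Taken together, the header now exposes a small non-virtual RAII surface plus a configurable socket timeout. A hedged usage sketch based on the declarations above (the config server string and namespace are placeholders; setLockMessage's declaration falls outside the hunks shown but is used by the callers earlier in this diff):

    string errMsg;

    // Placeholder config server connection string.
    ConnectionString configLoc = ConnectionString::parse("cfg1:27019,cfg2:27019,cfg3:27019",
                                                         errMsg);

    ScopedDistributedLock lock(configLoc, "test.coll");
    lock.setLockMessage("example operation on test.coll");  // recorded as the lock's 'why'
    lock.setSocketTimeoutMillis(10 * 1000);                 // override the 30s default

    // acquire() presumably retries tryAcquire() at _lockTryIntervalMillis intervals
    // until waitForMillis elapses.
    if (!lock.acquire(60 * 1000, &errMsg)) {
        warning() << errMsg;
        return;
    }

    // Before doing anything irreversible, confirm the lock was not forced away.
    if (!lock.verifyLockHeld(&errMsg)) {
        warning() << errMsg;
        return;
    }
    // unlock() may be called explicitly; otherwise cleanup happens when 'lock'
    // goes out of scope.
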