diff options
author | Greg Studer <greg@10gen.com> | 2014-05-29 15:11:56 -0400 |
---|---|---|
committer | Greg Studer <greg@10gen.com> | 2014-08-20 13:44:01 -0400 |
commit | 88e30849b29c0e2050bba768545daefa170be927 (patch) | |
tree | 867109e30b50b22ca342d783cc385158fbd6b45b /src | |
parent | fb1e82a243bc7c2b96ea1cfa78370f08a4c59bc6 (diff) | |
download | mongo-88e30849b29c0e2050bba768545daefa170be927.tar.gz |
SERVER-13474 remove redundant split and migrate metadata validation steps
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/s/d_migrate.cpp | 215 | ||||
-rw-r--r-- | src/mongo/s/d_split.cpp | 194 | ||||
-rw-r--r-- | src/mongo/s/distlock.cpp | 34 | ||||
-rw-r--r-- | src/mongo/s/distlock.h | 30 |
4 files changed, 218 insertions, 255 deletions
diff --git a/src/mongo/s/d_migrate.cpp b/src/mongo/s/d_migrate.cpp index b6011313603..f0c86759a29 100644 --- a/src/mongo/s/d_migrate.cpp +++ b/src/mongo/s/d_migrate.cpp @@ -837,17 +837,14 @@ namespace mongo { // 1. string ns = parseNs(dbname, cmdObj); - string to = cmdObj["to"].str(); - string from = cmdObj["from"].str(); // my public address, a tad redundant, but safe - // fromShard and toShard needed so that 2.2 mongos can interact with either 2.0 or 2.2 mongod - if( cmdObj["fromShard"].type() == String ){ - from = cmdObj["fromShard"].String(); - } + // The shard addresses, redundant, but allows for validation + string toShardHost = cmdObj["to"].str(); + string fromShardHost = cmdObj["from"].str(); - if( cmdObj["toShard"].type() == String ){ - to = cmdObj["toShard"].String(); - } + // The shard names + string toShardName = cmdObj["toShard"].str(); + string fromShardName = cmdObj["fromShard"].str(); // Process secondary throttle settings and assign defaults if necessary. BSONObj secThrottleObj; @@ -894,11 +891,11 @@ namespace mongo { return false; } - if ( to.empty() ) { + if ( toShardName.empty() ) { errmsg = "need to specify shard to move chunk to"; return false; } - if ( from.empty() ) { + if ( fromShardName.empty() ) { errmsg = "need to specify shard to move chunk from"; return false; } @@ -924,28 +921,36 @@ namespace mongo { } const long long maxChunkSize = maxSizeElem.numberLong(); // in bytes + // This could be the first call that enables sharding - make sure we initialize the + // sharding state for this shard. if ( ! shardingState.enabled() ) { if ( cmdObj["configdb"].type() != String ) { errmsg = "sharding not enabled"; + warning() << errmsg << endl; return false; } string configdb = cmdObj["configdb"].String(); ShardingState::initialize(configdb); } - MoveTimingHelper timing( "from" , ns , min , max , 6 /* steps */ , &errmsg ); + // Initialize our current shard name in the shard state if needed + shardingState.gotShardName(fromShardName); // Make sure we're as up-to-date as possible with shard information // This catches the case where we had to previously changed a shard's host by // removing/adding a shard with the same name Shard::reloadShardInfo(); + Shard toShard(toShardName); + Shard fromShard(fromShardName); - // So 2.2 mongod can interact with 2.0 mongos, mongod needs to handle either a conn - // string or a shard in the to/from fields. The Shard constructor handles this, - // eventually we should break the compatibility. + ConnectionString configLoc = ConnectionString::parse(shardingState.getConfigServer(), + errmsg); + if (!configLoc.isValid()) { + warning() << errmsg; + return false; + } - Shard fromShard( from ); - Shard toShard( to ); + MoveTimingHelper timing( "from" , ns , min , max , 6 /* steps */ , &errmsg ); log() << "received moveChunk request: " << cmdObj << migrateLog; @@ -960,138 +965,88 @@ namespace mongo { return false; } - DistributedLock lockSetup( ConnectionString( shardingState.getConfigServer() , ConnectionString::SYNC ) , ns ); - dist_lock_try dlk; + // + // Get the distributed lock + // - try{ - dlk = dist_lock_try( &lockSetup , (string)"migrate-" + min.toString(), 30.0 /*timeout*/ ); - } - catch( LockException& e ){ - errmsg = str::stream() << "error locking distributed lock for migration " << "migrate-" << min.toString() << causedBy( e ); - warning() << errmsg << endl; - return false; - } - - if ( ! dlk.got() ) { - errmsg = str::stream() << "the collection metadata could not be locked with lock " << "migrate-" << min.toString(); - warning() << errmsg << endl; - result.append( "who" , dlk.other() ); - return false; - } + ScopedDistributedLock collLock(configLoc, ns); + collLock.setLockMessage(str::stream() << "migrating chunk [" << minKey << ", " << maxKey + << ") in " << ns); - BSONObj chunkInfo = BSON("min" << min << "max" << max << "from" << fromShard.getName() << "to" << toShard.getName() ); - configServer.logChange( "moveChunk.start" , ns , chunkInfo ); + if (!collLock.tryAcquire(&errmsg)) { - ChunkVersion maxVersion; - ChunkVersion startingVersion; - string myOldShard; - { - ScopedDbConnection conn(shardingState.getConfigServer(), 30); + errmsg = str::stream() << "could not acquire collection lock for " << ns + << " to migrate chunk [" << minKey << "," << maxKey << ")" + << causedBy(errmsg); - BSONObj x; - BSONObj currChunk; - try{ - x = conn->findOne(ChunkType::ConfigNS, - Query(BSON(ChunkType::ns(ns))) - .sort(BSON(ChunkType::DEPRECATED_lastmod() << -1))); + warning() << errmsg; + return false; + } - currChunk = conn->findOne(ChunkType::ConfigNS, - shardId.wrap(ChunkType::name().c_str())); - } - catch( DBException& e ){ - errmsg = str::stream() << "aborted moveChunk because could not get chunk data from config server " << shardingState.getConfigServer() << causedBy( e ); - warning() << errmsg << endl; - return false; - } + BSONObj chunkInfo = + BSON("min" << min << "max" << max << + "from" << fromShard.getName() << "to" << toShard.getName()); + configServer.logChange("moveChunk.start", ns, chunkInfo); - maxVersion = ChunkVersion::fromBSON(x, ChunkType::DEPRECATED_lastmod()); - verify(currChunk[ChunkType::shard()].type()); - verify(currChunk[ChunkType::min()].type()); - verify(currChunk[ChunkType::max()].type()); - myOldShard = currChunk[ChunkType::shard()].String(); - conn.done(); + // Always refresh our metadata remotely + ChunkVersion origShardVersion; + Status refreshStatus = shardingState.refreshMetadataNow(txn, ns, &origShardVersion); - BSONObj currMin = currChunk[ChunkType::min()].Obj(); - BSONObj currMax = currChunk[ChunkType::max()].Obj(); - if ( currMin.woCompare( min ) || currMax.woCompare( max ) ) { - errmsg = "boundaries are outdated (likely a split occurred)"; - result.append( "currMin" , currMin ); - result.append( "currMax" , currMax ); - result.append( "requestedMin" , min ); - result.append( "requestedMax" , max ); - - warning() << "aborted moveChunk because" << errmsg << ": " << min << "->" << max - << " is now " << currMin << "->" << currMax << migrateLog; - return false; - } + if (!refreshStatus.isOK()) { - if ( myOldShard != fromShard.getName() ) { - errmsg = "location is outdated (likely balance or migrate occurred)"; - result.append( "from" , fromShard.getName() ); - result.append( "official" , myOldShard ); + errmsg = str::stream() << "moveChunk cannot start migrate of chunk " + << "[" << minKey << "," << maxKey << ")" + << causedBy(refreshStatus.reason()); - warning() << "aborted moveChunk because " << errmsg << ": chunk is at " << myOldShard - << " and not at " << fromShard.getName() << migrateLog; - return false; - } + warning() << errmsg; + return false; + } - if ( maxVersion < shardingState.getVersion( ns ) ) { - errmsg = "official version less than mine?"; - maxVersion.addToBSON( result, "officialVersion" ); - shardingState.getVersion( ns ).addToBSON( result, "myVersion" ); + if (origShardVersion.majorVersion() == 0) { - warning() << "aborted moveChunk because " << errmsg << ": official " << maxVersion - << " mine: " << shardingState.getVersion(ns) << migrateLog; - return false; - } + // It makes no sense to migrate if our version is zero and we have no chunks + errmsg = str::stream() << "moveChunk cannot start migrate of chunk " + << "[" << minKey << "," << maxKey << ")" + << " with zero shard version"; - // since this could be the first call that enable sharding we also make sure to - // load the shard's metadata, if we don't have it - shardingState.gotShardName( myOldShard ); + warning() << errmsg; + return false; + } - // Always refresh our metadata remotely - // TODO: The above checks should be removed, we should only have one refresh - // mechanism. - ChunkVersion startingVersion; - Status status = shardingState.refreshMetadataNow(txn, ns, &startingVersion ); + // Get collection metadata + const CollectionMetadataPtr origCollMetadata(shardingState.getCollectionMetadata(ns)); + // With nonzero shard version, we must have metadata + invariant(NULL != origCollMetadata); - if (!status.isOK()) { - errmsg = str::stream() << "moveChunk cannot start migrate of chunk " - << "[" << currMin << "," << currMax << ")" - << causedBy( status.reason() ); + ChunkVersion origCollVersion = origCollMetadata->getCollVersion(); + BSONObj shardKeyPattern = origCollMetadata->getKeyPattern(); - warning() << errmsg << endl; - return false; - } + // With nonzero shard version, we must have a coll version >= our shard version + invariant(origCollVersion >= origShardVersion); + // With nonzero shard version, we must have a shard key + invariant(!shardKeyPattern.isEmpty()); - if (startingVersion.majorVersion() == 0) { - // It makes no sense to migrate if our version is zero and we have no chunks - errmsg = str::stream() << "moveChunk cannot start migrate of chunk " - << "[" << currMin << "," << currMax << ")" - << " with zero shard version"; + ChunkType origChunk; + if (!origCollMetadata->getNextChunk(min, &origChunk) + || origChunk.getMin().woCompare(min) || origChunk.getMax().woCompare(max)) { - warning() << errmsg << endl; - return false; - } + // Our boundaries are different from those passed in + errmsg = str::stream() << "moveChunk cannot find chunk " + << "[" << minKey << "," << maxKey << ")" + << " to migrate, the chunk boundaries may be stale"; - log() << "moveChunk request accepted at version " << startingVersion << migrateLog; + warning() << errmsg; + return false; } + log() << "moveChunk request accepted at version " << origShardVersion; + timing.done(2); MONGO_FP_PAUSE_WHILE(moveChunkHangAtStep2); // 3. - - const CollectionMetadataPtr origCollMetadata( shardingState.getCollectionMetadata( ns ) ); - verify( origCollMetadata != NULL ); - BSONObj shardKeyPattern = origCollMetadata->getKeyPattern(); - if ( shardKeyPattern.isEmpty() ){ - errmsg = "no shard key found"; - warning() << errmsg << endl; - return false; - } - MigrateStatusHolder statusHolder(txn, ns, min, max, shardKeyPattern); + if (statusHolder.isAnotherMigrationActive()) { errmsg = "moveChunk is already in progress from this shard"; warning() << errmsg << endl; @@ -1132,7 +1087,7 @@ namespace mongo { } catch( DBException& e ){ errmsg = str::stream() << "moveChunk could not contact to: shard " - << to << " to start transfer" << causedBy( e ); + << toShardName << " to start transfer" << causedBy( e ); warning() << errmsg << endl; return false; } @@ -1170,7 +1125,7 @@ namespace mongo { res = res.getOwned(); } catch( DBException& e ){ - errmsg = str::stream() << "moveChunk could not contact to: shard " << to << " to monitor transfer" << causedBy( e ); + errmsg = str::stream() << "moveChunk could not contact to: shard " << toShardName << " to monitor transfer" << causedBy( e ); warning() << errmsg << endl; return false; } @@ -1254,7 +1209,7 @@ namespace mongo { // Ensure distributed lock still held string lockHeldMsg; - bool lockHeld = dlk.isLockHeld( 30.0 /* timeout */, &lockHeldMsg ); + bool lockHeld = collLock.verifyLockHeld(&lockHeldMsg); if ( !lockHeld ) { errmsg = str::stream() << "not entering migrate critical section because " << lockHeldMsg; @@ -1269,7 +1224,7 @@ namespace mongo { // we're under the collection lock here, so no other migrate can change maxVersion // or CollectionMetadata state migrateFromStatus.setInCriticalSection( true ); - ChunkVersion myVersion = maxVersion; + ChunkVersion myVersion = origCollVersion; myVersion.incMajor(); { @@ -1306,7 +1261,7 @@ namespace mongo { if ( !ok || MONGO_FAIL_POINT(failMigrationCommit) ) { log() << "moveChunk migrate commit not accepted by TO-shard: " << res - << " resetting shard version to: " << startingVersion << migrateLog; + << " resetting shard version to: " << origShardVersion << migrateLog; { Lock::GlobalWrite lk(txn->lockState()); log() << "moveChunk global lock acquired to reset shard version from " @@ -1426,7 +1381,7 @@ namespace mongo { { BSONObjBuilder bb( b.subobjStart( "res" ) ); // TODO: For backwards compatibility, we can't yet require an epoch here - bb.appendTimestamp(ChunkType::DEPRECATED_lastmod(), maxVersion.toLong()); + bb.appendTimestamp(ChunkType::DEPRECATED_lastmod(), origCollVersion.toLong()); bb.done(); } preCond.append( b.obj() ); diff --git a/src/mongo/s/d_split.cpp b/src/mongo/s/d_split.cpp index 8028a0634c3..254ec7f72bb 100644 --- a/src/mongo/s/d_split.cpp +++ b/src/mongo/s/d_split.cpp @@ -533,8 +533,8 @@ namespace mongo { return false; } - const string from = cmdObj["from"].str(); - if ( from.empty() ) { + const string shardName = cmdObj["from"].str(); + if ( shardName.empty() ) { errmsg = "need specify server to split chunk at"; return false; } @@ -556,134 +556,99 @@ namespace mongo { return false; } - // It is possible that this is the first sharded command this mongod is asked to perform. If so, - // start sharding apparatus. We'd still be missing some more shard-related info but we'll get it - // in step 2. below. + // + // Get sharding state up-to-date + // + + // This could be the first call that enables sharding - make sure we initialize the + // sharding state for this shard. if ( ! shardingState.enabled() ) { if ( cmdObj["configdb"].type() != String ) { errmsg = "sharding not enabled"; + warning() << errmsg << endl; return false; } string configdb = cmdObj["configdb"].String(); ShardingState::initialize(configdb); } - Shard myShard( from ); + // Initialize our current shard name in the shard state if needed + shardingState.gotShardName(shardName); + + ConnectionString configLoc = ConnectionString::parse(shardingState.getConfigServer(), + errmsg); + if (!configLoc.isValid()) { + warning() << errmsg; + return false; + } - log() << "received splitChunk request: " << cmdObj << endl; + log() << "received splitChunk request: " << cmdObj; // // 2. lock the collection's metadata and get highest version for the current shard // - DistributedLock lockSetup( ConnectionString( shardingState.getConfigServer() , ConnectionString::SYNC) , ns ); - dist_lock_try dlk; + ScopedDistributedLock collLock(configLoc, ns); + collLock.setLockMessage(str::stream() << "splitting chunk [" << minKey << ", " << maxKey + << ") in " << ns); - try{ - dlk = dist_lock_try( &lockSetup, string("split-") + min.toString() ); - } - catch( LockException& e ){ - errmsg = str::stream() << "Error locking distributed lock for split." << causedBy( e ); - return false; - } + if (!collLock.tryAcquire(&errmsg)) { + + errmsg = str::stream() << "could not acquire collection lock for " << ns + << " to split chunk [" << minKey << "," << maxKey << ")" + << causedBy(errmsg); - if ( ! dlk.got() ) { - errmsg = "the collection's metadata lock is taken"; - result.append( "who" , dlk.other() ); + warning() << errmsg; return false; } - // TODO This is a check migrate does to the letter. Factor it out and share. 2010-10-22 + // Always check our version remotely + ChunkVersion shardVersion; + Status refreshStatus = shardingState.refreshMetadataNow(txn, ns, &shardVersion); - ChunkVersion maxVersion; - ChunkType origChunk; - { - ScopedDbConnection conn(shardingState.getConfigServer(), 30); + if (!refreshStatus.isOK()) { - BSONObj x = conn->findOne(ChunkType::ConfigNS, - Query(BSON(ChunkType::ns(ns))) - .sort(BSON(ChunkType::DEPRECATED_lastmod() << -1))); + errmsg = str::stream() << "splitChunk cannot split chunk " << "[" << minKey << "," + << maxKey << ")" << causedBy(refreshStatus.reason()); - maxVersion = ChunkVersion::fromBSON(x, ChunkType::DEPRECATED_lastmod()); - - BSONObj currChunk = - conn->findOne(ChunkType::ConfigNS, - shardId.wrap(ChunkType::name().c_str())).getOwned(); - conn.done(); - - if (!origChunk.parseBSON(currChunk, &errmsg)) { - return false; - } - - const BSONObj currMin = origChunk.getMin(); - const BSONObj currMax = origChunk.getMax(); - if ( currMin.woCompare( min ) || currMax.woCompare( max ) ) { - errmsg = "chunk boundaries are outdated (likely a split occurred)"; - result.append( "currMin" , currMin ); - result.append( "currMax" , currMax ); - result.append( "requestedMin" , min ); - result.append( "requestedMax" , max ); - - warning() << "aborted split because " << errmsg << ": " << min << "->" << max - << " is now " << currMin << "->" << currMax << endl; - return false; - } - - if (origChunk.getShard() != myShard.getName()) { - errmsg = "location is outdated (likely balance or migrate occurred)"; - result.append( "from" , myShard.getName() ); - result.append("official", origChunk.getShard()); - - warning() << "aborted split because " << errmsg - << ": chunk is at " << origChunk.getShard() - << " and not at " << myShard.getName() << endl; - return false; - } - - if ( maxVersion < shardingState.getVersion( ns ) ) { - errmsg = "official version less than mine?"; - maxVersion.addToBSON( result, "officialVersion" ); - shardingState.getVersion( ns ).addToBSON( result, "myVersion" ); - - warning() << "aborted split because " << errmsg << ": official " << maxVersion - << " mine: " << shardingState.getVersion(ns) << endl; - return false; - } + warning() << errmsg; + return false; + } - // since this could be the first call that enable sharding we also make sure to load - // the shard's metadata - shardingState.gotShardName(origChunk.getShard()); + if (shardVersion.majorVersion() == 0) { - // Always check our version remotely. - // TODO: Make this less expensive by using the incoming request's shard version. - // TODO: The above checks should be removed, we should only have one refresh - // mechanism. - ChunkVersion shardVersion; - Status status = shardingState.refreshMetadataNow(txn, ns, &shardVersion); + // It makes no sense to split if our version is zero and we have no chunks + errmsg = str::stream() << "splitChunk cannot split chunk " << "[" << minKey << "," + << maxKey << ")" << " with zero shard version"; - if (!status.isOK()) { - errmsg = str::stream() << "splitChunk cannot split chunk " - << "[" << currMin << "," << currMax << ")" - << causedBy( status.reason() ); + warning() << errmsg; + return false; + } - warning() << errmsg << endl; - return false; - } + // Get collection metadata + const CollectionMetadataPtr collMetadata(shardingState.getCollectionMetadata(ns)); + // With nonzero shard version, we must have metadata + invariant(NULL != collMetadata); - if ( shardVersion.majorVersion() == 0 ) { - // It makes no sense to split if our version is zero and we have no chunks - errmsg = str::stream() << "splitChunk cannot split chunk " - << "[" << currMin << "," << currMax << ")" - << " with zero shard version"; + ChunkVersion collVersion = collMetadata->getCollVersion(); + // With nonzero shard version, we must have a coll version >= our shard version + invariant(collVersion >= shardVersion); - warning() << errmsg << endl; - return false; - } + ChunkType origChunk; + if (!collMetadata->getNextChunk(min, &origChunk) + || origChunk.getMin().woCompare(min) || origChunk.getMax().woCompare(max)) { - log() << "splitChunk accepted at version " << shardVersion << endl; + // Our boundaries are different from those passed in + errmsg = str::stream() << "splitChunk cannot find chunk " + << "[" << minKey << "," << maxKey << ")" + << " to split, the chunk boundaries may be stale"; + warning() << errmsg; + return false; } + log() << "splitChunk accepted at version " << shardVersion; + // // 3. create the batch of updates to metadata ( the new chunks ) to be applied via 'applyOps' command // @@ -693,7 +658,7 @@ namespace mongo { LOG(1) << "before split on " << origChunk << endl; OwnedPointerVector<ChunkType> newChunks; - ChunkVersion myVersion = maxVersion; + ChunkVersion nextChunkVersion = collVersion; BSONObj startKey = min; splitKeys.push_back( max ); // makes it easier to have 'max' in the next loop. remove later. @@ -712,14 +677,13 @@ namespace mongo { return false; } - CollectionMetadataPtr metadata(shardingState.getCollectionMetadata(ns)); - if (!isShardDocSizeValid(metadata->getKeyPattern(), endKey, &errmsg)) { + if (!isShardDocSizeValid(collMetadata->getKeyPattern(), endKey, &errmsg)) { warning() << errmsg << endl; return false; } // splits only update the 'minor' portion of version - myVersion.incMinor(); + nextChunkVersion.incMinor(); // build an update operation against the chunks collection of the config database with // upsert true @@ -731,11 +695,11 @@ namespace mongo { // add the modified (new) chunk information as the update object BSONObjBuilder n( op.subobjStart( "o" ) ); n.append(ChunkType::name(), Chunk::genID(ns, startKey)); - myVersion.addToBSON(n, ChunkType::DEPRECATED_lastmod()); + nextChunkVersion.addToBSON(n, ChunkType::DEPRECATED_lastmod()); n.append(ChunkType::ns(), ns); n.append(ChunkType::min(), startKey); n.append(ChunkType::max(), endKey); - n.append(ChunkType::shard(), origChunk.getShard()); + n.append(ChunkType::shard(), shardName); n.done(); // add the chunk's _id as the query part of the update statement @@ -749,13 +713,15 @@ namespace mongo { auto_ptr<ChunkType> chunk(new ChunkType()); chunk->setMin(startKey); chunk->setMax(endKey); - chunk->setVersion(myVersion); + chunk->setVersion(nextChunkVersion); newChunks.push_back(chunk.release()); startKey = endKey; } + splitKeys.pop_back(); // 'max' was used as sentinel + updates.done(); { @@ -767,7 +733,7 @@ namespace mongo { { BSONObjBuilder bb( b.subobjStart( "res" ) ); // TODO: For backwards compatibility, we can't yet require an epoch here - bb.appendTimestamp(ChunkType::DEPRECATED_lastmod(), maxVersion.toLong()); + bb.appendTimestamp(ChunkType::DEPRECATED_lastmod(), collVersion.toLong()); bb.done(); } preCond.append( b.obj() ); @@ -800,13 +766,20 @@ namespace mongo { // // Install chunk metadata with knowledge about newly split chunks in this shard's state // - - splitKeys.pop_back(); // 'max' was used as sentinel - maxVersion.incMinor(); { Lock::DBWrite writeLk(txn->lockState(), ns); - shardingState.splitChunk(txn, ns, min, max, splitKeys, maxVersion); + + // NOTE: The newShardVersion resulting from this split is higher than any other + // chunk version, so it's also implicitly the newCollVersion + ChunkVersion newShardVersion = collVersion; + + // Increment the minor version once, shardingState.splitChunk increments once per + // split point (resulting in the correct final shard/collection version) + // TODO: Revisit this interface, it's a bit clunky + newShardVersion.incMinor(); + + shardingState.splitChunk(txn, ns, min, max, splitKeys, newShardVersion); } // @@ -880,7 +853,8 @@ namespace mongo { BSONObjBuilder bb(b); bb.append(ChunkType::min(), chunk.getMin()); bb.append(ChunkType::max(), chunk.getMax()); - chunk.getVersion().addToBSON(bb, ChunkType::DEPRECATED_lastmod()); + if (chunk.isVersionSet()) + chunk.getVersion().addToBSON(bb, ChunkType::DEPRECATED_lastmod()); bb.done(); } diff --git a/src/mongo/s/distlock.cpp b/src/mongo/s/distlock.cpp index 25823a27253..18d5fdae8c3 100644 --- a/src/mongo/s/distlock.cpp +++ b/src/mongo/s/distlock.cpp @@ -542,19 +542,19 @@ namespace mongo { bool DistributedLock::isLockHeld( double timeout, string* errMsg ) { - ScopedDbConnection conn(_conn.toString(), timeout ); BSONObj lockObj; try { + ScopedDbConnection conn(_conn.toString(), timeout ); lockObj = conn->findOne( LocksType::ConfigNS, BSON( LocksType::name(_name) ) ).getOwned(); + conn.done(); } catch ( DBException& e ) { *errMsg = str::stream() << "error checking whether lock " << _name << " is held " << causedBy( e ); return false; } - conn.done(); if ( lockObj.isEmpty() ) { *errMsg = str::stream() << "no lock for " << _name << " exists in the locks collection"; @@ -1149,9 +1149,15 @@ namespace mongo { << " minutes timeout." << endl; } - ScopedDistributedLock::ScopedDistributedLock(const ConnectionString& conn, const string& name) : - _lock(conn, name), _why(""), _lockTryIntervalMillis(1000), _acquired(false) - { + const long long ScopedDistributedLock::kDefaultLockTryIntervalMillis = 1000; + const long long ScopedDistributedLock::kDefaultSocketTimeoutMillis = 30 * 1000; + + ScopedDistributedLock::ScopedDistributedLock(const ConnectionString& conn, const string& name) + : _lock(conn, name), + _why(""), + _lockTryIntervalMillis(kDefaultLockTryIntervalMillis), + _socketTimeoutMillis(kDefaultSocketTimeoutMillis), + _acquired(false) { } ScopedDistributedLock::~ScopedDistributedLock() { @@ -1162,7 +1168,10 @@ namespace mongo { bool ScopedDistributedLock::tryAcquire(string* errMsg) { try { - _acquired = _lock.lock_try(_why, false, &_other); + _acquired = _lock.lock_try(_why, + false, + &_other, + static_cast<double>(_socketTimeoutMillis / 1000)); } catch (const DBException& e) { @@ -1224,4 +1233,17 @@ namespace mongo { return false; } + /** + * Returns false if the lock is known _not_ to be held, otherwise asks the underlying + * lock to issue a 'isLockHeld' call and returns whatever that calls does. + */ + bool ScopedDistributedLock::verifyLockHeld(std::string* errMsg) { + if (!_acquired) { + *errMsg = "lock was never acquired"; + return false; + } + + return _lock.isLockHeld(static_cast<double>(_socketTimeoutMillis / 1000), errMsg); + } + } diff --git a/src/mongo/s/distlock.h b/src/mongo/s/distlock.h index 8516392df27..5fd57736be5 100644 --- a/src/mongo/s/distlock.h +++ b/src/mongo/s/distlock.h @@ -318,25 +318,22 @@ namespace mongo { class MONGO_CLIENT_API ScopedDistributedLock { public: + static const long long kDefaultLockTryIntervalMillis; + static const long long kDefaultSocketTimeoutMillis; + ScopedDistributedLock(const ConnectionString& conn, const std::string& name); - virtual ~ScopedDistributedLock(); + ~ScopedDistributedLock(); /** * Tries once to obtain a lock, and can fail with an error message. - * - * Subclasses of this lock can override this method (and are also required to call the base - * in the overridden method). - * - * @return if the lock was successfully acquired + * Returns true if the lock was successfully acquired. */ - virtual bool tryAcquire(std::string* errMsg); + bool tryAcquire(std::string* errMsg); /** * Tries to unlock the lock if acquired. Cannot report an error or block indefinitely * (though it may log messages or continue retrying in a non-blocking way). - * - * Subclasses should define their own destructor unlockXXX() methods. */ void unlock(); @@ -351,6 +348,12 @@ namespace mongo { */ bool acquire(long long waitForMillis, std::string* errMsg); + /** + * If lock is held, remotely verifies that the lock has not been forced as a sanity check. + * If the lock is not held or cannot be verified, returns false with errMsg. + */ + bool verifyLockHeld(std::string* errMsg); + bool isAcquired() const { return _acquired; } @@ -375,10 +378,19 @@ namespace mongo { return _why; } + void setSocketTimeoutMillis(long long socketTimeoutMillis) { + _socketTimeoutMillis = socketTimeoutMillis; + } + + long long getSocketTimeoutMillis() const { + return _socketTimeoutMillis; + } + private: DistributedLock _lock; std::string _why; long long _lockTryIntervalMillis; + long long _socketTimeoutMillis; bool _acquired; BSONObj _other; |