diff options
Diffstat (limited to 'src/mongo/s/chunk_diff-inl.h')
-rw-r--r-- | src/mongo/s/chunk_diff-inl.h | 386 |
1 files changed, 187 insertions, 199 deletions
diff --git a/src/mongo/s/chunk_diff-inl.h b/src/mongo/s/chunk_diff-inl.h index 0f88f2517b7..0c89d417000 100644 --- a/src/mongo/s/chunk_diff-inl.h +++ b/src/mongo/s/chunk_diff-inl.h @@ -39,229 +39,217 @@ namespace mongo { - template < class ValType, class ShardType > - bool ConfigDiffTracker<ValType,ShardType>:: - isOverlapping( const BSONObj& min, const BSONObj& max ) - { - RangeOverlap overlap = overlappingRange( min, max ); - - return overlap.first != overlap.second; +template <class ValType, class ShardType> +bool ConfigDiffTracker<ValType, ShardType>::isOverlapping(const BSONObj& min, const BSONObj& max) { + RangeOverlap overlap = overlappingRange(min, max); + + return overlap.first != overlap.second; +} + +template <class ValType, class ShardType> +void ConfigDiffTracker<ValType, ShardType>::removeOverlapping(const BSONObj& min, + const BSONObj& max) { + verifyAttached(); + + RangeOverlap overlap = overlappingRange(min, max); + + _currMap->erase(overlap.first, overlap.second); +} + +template <class ValType, class ShardType> +typename ConfigDiffTracker<ValType, ShardType>::RangeOverlap +ConfigDiffTracker<ValType, ShardType>::overlappingRange(const BSONObj& min, const BSONObj& max) { + verifyAttached(); + + typename RangeMap::iterator low; + typename RangeMap::iterator high; + + if (isMinKeyIndexed()) { + // Returns the first chunk with a min key that is >= min - implies the + // previous chunk cannot overlap min + low = _currMap->lower_bound(min); + // Returns the first chunk with a min key that is >= max - implies the + // chunk does not overlap max + high = _currMap->lower_bound(max); + } else { + // Returns the first chunk with a max key that is > min - implies that + // the chunk overlaps min + low = _currMap->upper_bound(min); + // Returns the first chunk with a max key that is > max - implies that + // the next chunk cannot not overlap max + high = _currMap->upper_bound(max); } - template < class ValType, class ShardType > - void ConfigDiffTracker<ValType,ShardType>:: - removeOverlapping( const BSONObj& min, const BSONObj& max ) - { - verifyAttached(); - - RangeOverlap overlap = overlappingRange( min, max ); + return RangeOverlap(low, high); +} - _currMap->erase( overlap.first, overlap.second ); - } +template <class ValType, class ShardType> +int ConfigDiffTracker<ValType, ShardType>::calculateConfigDiff(const std::string& config) { + verifyAttached(); - template < class ValType, class ShardType > - typename ConfigDiffTracker<ValType,ShardType>::RangeOverlap ConfigDiffTracker<ValType,ShardType>:: - overlappingRange( const BSONObj& min, const BSONObj& max ) - { - verifyAttached(); - - typename RangeMap::iterator low; - typename RangeMap::iterator high; - - if( isMinKeyIndexed() ){ - // Returns the first chunk with a min key that is >= min - implies the - // previous chunk cannot overlap min - low = _currMap->lower_bound( min ); - // Returns the first chunk with a min key that is >= max - implies the - // chunk does not overlap max - high = _currMap->lower_bound( max ); - } - else{ - // Returns the first chunk with a max key that is > min - implies that - // the chunk overlaps min - low = _currMap->upper_bound( min ); - // Returns the first chunk with a max key that is > max - implies that - // the next chunk cannot not overlap max - high = _currMap->upper_bound( max ); - } + // Get the diff query required + Query diffQuery = configDiffQuery(); - return RangeOverlap( low, high ); - } + ScopedDbConnection conn(config, 30.0); - template<class ValType, class ShardType> - int ConfigDiffTracker<ValType, ShardType>::calculateConfigDiff(const std::string& config) { - verifyAttached(); + try { + // Open a cursor for the diff chunks + std::auto_ptr<DBClientCursor> cursor = + conn->query(ChunkType::ConfigNS, diffQuery, 0, 0, 0, 0, (DEBUG_BUILD ? 2 : 1000000)); + verify(cursor.get()); - // Get the diff query required - Query diffQuery = configDiffQuery(); + int diff = calculateConfigDiff(*cursor.get()); - ScopedDbConnection conn(config, 30.0); + conn.done(); - try { + return diff; + } catch (DBException& e) { + // Should only happen on connection errors + e.addContext(str::stream() << "could not calculate config difference for ns " << _ns + << " on " << config); + throw; + } +} + +template <class ValType, class ShardType> +int ConfigDiffTracker<ValType, ShardType>::calculateConfigDiff( + DBClientCursorInterface& diffCursor) { + verifyAttached(); + + // Apply the chunk changes to the ranges and versions + + // + // Overall idea here is to work in two steps : + // 1. For all the new chunks we find, increment the maximum version per-shard and + // per-collection, and remove any conflicting chunks from the ranges + // 2. For all the new chunks we're interested in (all of them for mongos, just chunks on the + // shard for mongod) add them to the ranges + // + + std::vector<BSONObj> newTracked; + // Store epoch now so it doesn't change when we change max + OID currEpoch = _maxVersion->epoch(); + + _validDiffs = 0; + while (diffCursor.more()) { + BSONObj diffChunkDoc = diffCursor.next(); + + ChunkVersion chunkVersion = + ChunkVersion::fromBSON(diffChunkDoc, ChunkType::DEPRECATED_lastmod()); + + if (diffChunkDoc[ChunkType::min()].type() != Object || + diffChunkDoc[ChunkType::max()].type() != Object || + diffChunkDoc[ChunkType::shard()].type() != String) { + using namespace logger; + LogstreamBuilder( + globalLogDomain(), getThreadName(), LogSeverity::Warning(), LogComponent::kSharding) + << "got invalid chunk document " << diffChunkDoc + << " when trying to load differing chunks" << std::endl; + continue; + } - // Open a cursor for the diff chunks - std::auto_ptr<DBClientCursor> cursor = conn->query( - ChunkType::ConfigNS, diffQuery, 0, 0, 0, 0, ( DEBUG_BUILD ? 2 : 1000000 ) ); - verify( cursor.get() ); + if (!chunkVersion.isSet() || !chunkVersion.hasEqualEpoch(currEpoch)) { + using namespace logger; + LogstreamBuilder( + globalLogDomain(), getThreadName(), LogSeverity::Warning(), LogComponent::kSharding) + << "got invalid chunk version " << chunkVersion << " in document " << diffChunkDoc + << " when trying to load differing chunks at version " + << ChunkVersion(_maxVersion->majorVersion(), _maxVersion->minorVersion(), currEpoch) + << std::endl; - int diff = calculateConfigDiff( *cursor.get() ); + // Don't keep loading, since we know we'll be broken here + return -1; + } - conn.done(); + _validDiffs++; - return diff; - } - catch( DBException& e ){ - // Should only happen on connection errors - e.addContext( str::stream() << "could not calculate config difference for ns " << _ns << " on " << config ); - throw; - } - } + // Get max changed version and chunk version + if (chunkVersion > *_maxVersion) + *_maxVersion = chunkVersion; - template < class ValType, class ShardType > - int ConfigDiffTracker<ValType,ShardType>:: - calculateConfigDiff( DBClientCursorInterface& diffCursor ) - { - verifyAttached(); - - // Apply the chunk changes to the ranges and versions - - // - // Overall idea here is to work in two steps : - // 1. For all the new chunks we find, increment the maximum version per-shard and - // per-collection, and remove any conflicting chunks from the ranges - // 2. For all the new chunks we're interested in (all of them for mongos, just chunks on the - // shard for mongod) add them to the ranges - // - - std::vector<BSONObj> newTracked; - // Store epoch now so it doesn't change when we change max - OID currEpoch = _maxVersion->epoch(); - - _validDiffs = 0; - while( diffCursor.more() ){ - - BSONObj diffChunkDoc = diffCursor.next(); - - ChunkVersion chunkVersion = ChunkVersion::fromBSON(diffChunkDoc, ChunkType::DEPRECATED_lastmod()); - - if( diffChunkDoc[ChunkType::min()].type() != Object || - diffChunkDoc[ChunkType::max()].type() != Object || - diffChunkDoc[ChunkType::shard()].type() != String ) - { - using namespace logger; - LogstreamBuilder(globalLogDomain(), getThreadName(), LogSeverity::Warning(), - LogComponent::kSharding) - << "got invalid chunk document " << diffChunkDoc - << " when trying to load differing chunks" << std::endl; - continue; - } - - if( ! chunkVersion.isSet() || ! chunkVersion.hasEqualEpoch( currEpoch ) ){ - - using namespace logger; - LogstreamBuilder(globalLogDomain(), getThreadName(), LogSeverity::Warning(), - LogComponent::kSharding) - << "got invalid chunk version " << chunkVersion << " in document " << diffChunkDoc - << " when trying to load differing chunks at version " - << ChunkVersion( _maxVersion->majorVersion(), - _maxVersion->minorVersion(), - currEpoch ) << std::endl; - - // Don't keep loading, since we know we'll be broken here - return -1; - } - - _validDiffs++; - - // Get max changed version and chunk version - if( chunkVersion > *_maxVersion ) *_maxVersion = chunkVersion; - - // Chunk version changes - ShardType shard = shardFor( diffChunkDoc[ChunkType::shard()].String() ); - typename std::map<ShardType, ChunkVersion>::iterator shardVersionIt = _maxShardVersions->find( shard ); - if( shardVersionIt == _maxShardVersions->end() || shardVersionIt->second < chunkVersion ){ - (*_maxShardVersions)[ shard ] = chunkVersion; - } - - // See if we need to remove any chunks we are currently tracking b/c of this chunk's changes - removeOverlapping(diffChunkDoc[ChunkType::min()].Obj(), - diffChunkDoc[ChunkType::max()].Obj()); - - // Figure out which of the new chunks we need to track - // Important - we need to actually own this doc, in case the cursor decides to getMore or unbuffer - if( isTracked( diffChunkDoc ) ) newTracked.push_back( diffChunkDoc.getOwned() ); + // Chunk version changes + ShardType shard = shardFor(diffChunkDoc[ChunkType::shard()].String()); + typename std::map<ShardType, ChunkVersion>::iterator shardVersionIt = + _maxShardVersions->find(shard); + if (shardVersionIt == _maxShardVersions->end() || shardVersionIt->second < chunkVersion) { + (*_maxShardVersions)[shard] = chunkVersion; } - using namespace logger; - if (globalLogDomain()->shouldLog(LogComponent::kSharding, LogSeverity::Debug(3))) { - LogstreamBuilder(globalLogDomain(), getThreadName(), LogSeverity::Debug(3), - LogComponent::kSharding) - << "found " << _validDiffs - << " new chunks for collection " << _ns - << " (tracking " << newTracked.size() - << "), new version is " << *_maxVersion - << std::endl; - } + // See if we need to remove any chunks we are currently tracking b/c of this chunk's changes + removeOverlapping(diffChunkDoc[ChunkType::min()].Obj(), + diffChunkDoc[ChunkType::max()].Obj()); - for( std::vector<BSONObj>::iterator it = newTracked.begin(); it != newTracked.end(); it++ ){ + // Figure out which of the new chunks we need to track + // Important - we need to actually own this doc, in case the cursor decides to getMore or unbuffer + if (isTracked(diffChunkDoc)) + newTracked.push_back(diffChunkDoc.getOwned()); + } - BSONObj chunkDoc = *it; + using namespace logger; + if (globalLogDomain()->shouldLog(LogComponent::kSharding, LogSeverity::Debug(3))) { + LogstreamBuilder( + globalLogDomain(), getThreadName(), LogSeverity::Debug(3), LogComponent::kSharding) + << "found " << _validDiffs << " new chunks for collection " << _ns << " (tracking " + << newTracked.size() << "), new version is " << *_maxVersion << std::endl; + } - // Important - we need to make sure we actually own the min and max here - BSONObj min = chunkDoc[ChunkType::min()].Obj().getOwned(); - BSONObj max = chunkDoc[ChunkType::max()].Obj().getOwned(); + for (std::vector<BSONObj>::iterator it = newTracked.begin(); it != newTracked.end(); it++) { + BSONObj chunkDoc = *it; - // Invariant enforced by sharding - // It's possible to read inconsistent state b/c of getMore() and yielding, so we want - // to detect as early as possible. - // TODO: This checks for overlap, we also should check for holes here iff we're tracking - // all chunks - if( isOverlapping( min, max ) ) return -1; + // Important - we need to make sure we actually own the min and max here + BSONObj min = chunkDoc[ChunkType::min()].Obj().getOwned(); + BSONObj max = chunkDoc[ChunkType::max()].Obj().getOwned(); - _currMap->insert( rangeFor( chunkDoc, min, max ) ); - } + // Invariant enforced by sharding + // It's possible to read inconsistent state b/c of getMore() and yielding, so we want + // to detect as early as possible. + // TODO: This checks for overlap, we also should check for holes here iff we're tracking + // all chunks + if (isOverlapping(min, max)) + return -1; - return _validDiffs; + _currMap->insert(rangeFor(chunkDoc, min, max)); } - template<class ValType, class ShardType> - Query ConfigDiffTracker<ValType, ShardType>::configDiffQuery() const { - - verifyAttached(); - - // - // Basic idea behind the query is to find all the chunks $gte the current max version. - // Currently, any splits and merges will increment the current max version. - // - - BSONObjBuilder queryB; - queryB.append(ChunkType::ns(), _ns); - BSONObjBuilder tsBuilder(queryB.subobjStart(ChunkType::DEPRECATED_lastmod())); - tsBuilder.appendTimestamp( "$gte", _maxVersion->toLong() ); - tsBuilder.done(); - BSONObj query = queryB.obj(); - - // - // NOTE: IT IS IMPORTANT FOR CONSISTENCY THAT WE SORT BY ASC VERSION, TO HANDLE - // CURSOR YIELDING BETWEEN CHUNKS BEING MIGRATED. - // - // This ensures that changes to chunk version (which will always be higher) will always - // come *after* our current position in the chunk cursor. - // - - Query queryObj(query); - queryObj.sort(BSON( "lastmod" << 1 )); - - using namespace logger; - if (globalLogDomain()->shouldLog(LogComponent::kSharding, LogSeverity::Debug(2))) { - LogstreamBuilder(globalLogDomain(), getThreadName(), LogSeverity::Debug(2), - LogComponent::kSharding) - << "major version query from " << *_maxVersion << " and over " - << _maxShardVersions->size() << " shards is " << queryObj << std::endl; - } - - return queryObj; + return _validDiffs; +} + +template <class ValType, class ShardType> +Query ConfigDiffTracker<ValType, ShardType>::configDiffQuery() const { + verifyAttached(); + + // + // Basic idea behind the query is to find all the chunks $gte the current max version. + // Currently, any splits and merges will increment the current max version. + // + + BSONObjBuilder queryB; + queryB.append(ChunkType::ns(), _ns); + BSONObjBuilder tsBuilder(queryB.subobjStart(ChunkType::DEPRECATED_lastmod())); + tsBuilder.appendTimestamp("$gte", _maxVersion->toLong()); + tsBuilder.done(); + BSONObj query = queryB.obj(); + + // + // NOTE: IT IS IMPORTANT FOR CONSISTENCY THAT WE SORT BY ASC VERSION, TO HANDLE + // CURSOR YIELDING BETWEEN CHUNKS BEING MIGRATED. + // + // This ensures that changes to chunk version (which will always be higher) will always + // come *after* our current position in the chunk cursor. + // + + Query queryObj(query); + queryObj.sort(BSON("lastmod" << 1)); + + using namespace logger; + if (globalLogDomain()->shouldLog(LogComponent::kSharding, LogSeverity::Debug(2))) { + LogstreamBuilder( + globalLogDomain(), getThreadName(), LogSeverity::Debug(2), LogComponent::kSharding) + << "major version query from " << *_maxVersion << " and over " + << _maxShardVersions->size() << " shards is " << queryObj << std::endl; } -} // namespace mongo + return queryObj; +} +} // namespace mongo |