summaryrefslogtreecommitdiff
path: root/src/mongo/s/chunk_diff-inl.h
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/s/chunk_diff-inl.h')
-rw-r--r--  src/mongo/s/chunk_diff-inl.h  386
1 files changed, 187 insertions, 199 deletions
diff --git a/src/mongo/s/chunk_diff-inl.h b/src/mongo/s/chunk_diff-inl.h
index 0f88f2517b7..0c89d417000 100644
--- a/src/mongo/s/chunk_diff-inl.h
+++ b/src/mongo/s/chunk_diff-inl.h
@@ -39,229 +39,217 @@
namespace mongo {
- template < class ValType, class ShardType >
- bool ConfigDiffTracker<ValType,ShardType>::
- isOverlapping( const BSONObj& min, const BSONObj& max )
- {
- RangeOverlap overlap = overlappingRange( min, max );
-
- return overlap.first != overlap.second;
+template <class ValType, class ShardType>
+bool ConfigDiffTracker<ValType, ShardType>::isOverlapping(const BSONObj& min, const BSONObj& max) {
+ RangeOverlap overlap = overlappingRange(min, max);
+
+ return overlap.first != overlap.second;
+}
+
+template <class ValType, class ShardType>
+void ConfigDiffTracker<ValType, ShardType>::removeOverlapping(const BSONObj& min,
+ const BSONObj& max) {
+ verifyAttached();
+
+ RangeOverlap overlap = overlappingRange(min, max);
+
+ _currMap->erase(overlap.first, overlap.second);
+}
+
+template <class ValType, class ShardType>
+typename ConfigDiffTracker<ValType, ShardType>::RangeOverlap
+ConfigDiffTracker<ValType, ShardType>::overlappingRange(const BSONObj& min, const BSONObj& max) {
+ verifyAttached();
+
+ typename RangeMap::iterator low;
+ typename RangeMap::iterator high;
+
+ if (isMinKeyIndexed()) {
+ // Returns the first chunk with a min key that is >= min - implies the
+ // previous chunk cannot overlap min
+ low = _currMap->lower_bound(min);
+ // Returns the first chunk with a min key that is >= max - implies the
+ // chunk does not overlap max
+ high = _currMap->lower_bound(max);
+ } else {
+ // Returns the first chunk with a max key that is > min - implies that
+ // the chunk overlaps min
+ low = _currMap->upper_bound(min);
+ // Returns the first chunk with a max key that is > max - implies that
+        // the next chunk cannot overlap max
+ high = _currMap->upper_bound(max);
}
- template < class ValType, class ShardType >
- void ConfigDiffTracker<ValType,ShardType>::
- removeOverlapping( const BSONObj& min, const BSONObj& max )
- {
- verifyAttached();
-
- RangeOverlap overlap = overlappingRange( min, max );
+ return RangeOverlap(low, high);
+}
- _currMap->erase( overlap.first, overlap.second );
- }
+template <class ValType, class ShardType>
+int ConfigDiffTracker<ValType, ShardType>::calculateConfigDiff(const std::string& config) {
+ verifyAttached();
- template < class ValType, class ShardType >
- typename ConfigDiffTracker<ValType,ShardType>::RangeOverlap ConfigDiffTracker<ValType,ShardType>::
- overlappingRange( const BSONObj& min, const BSONObj& max )
- {
- verifyAttached();
-
- typename RangeMap::iterator low;
- typename RangeMap::iterator high;
-
- if( isMinKeyIndexed() ){
- // Returns the first chunk with a min key that is >= min - implies the
- // previous chunk cannot overlap min
- low = _currMap->lower_bound( min );
- // Returns the first chunk with a min key that is >= max - implies the
- // chunk does not overlap max
- high = _currMap->lower_bound( max );
- }
- else{
- // Returns the first chunk with a max key that is > min - implies that
- // the chunk overlaps min
- low = _currMap->upper_bound( min );
- // Returns the first chunk with a max key that is > max - implies that
- // the next chunk cannot not overlap max
- high = _currMap->upper_bound( max );
- }
+ // Get the diff query required
+ Query diffQuery = configDiffQuery();
- return RangeOverlap( low, high );
- }
+ ScopedDbConnection conn(config, 30.0);
- template<class ValType, class ShardType>
- int ConfigDiffTracker<ValType, ShardType>::calculateConfigDiff(const std::string& config) {
- verifyAttached();
+ try {
+ // Open a cursor for the diff chunks
+ std::auto_ptr<DBClientCursor> cursor =
+ conn->query(ChunkType::ConfigNS, diffQuery, 0, 0, 0, 0, (DEBUG_BUILD ? 2 : 1000000));
+ verify(cursor.get());
- // Get the diff query required
- Query diffQuery = configDiffQuery();
+ int diff = calculateConfigDiff(*cursor.get());
- ScopedDbConnection conn(config, 30.0);
+ conn.done();
- try {
+ return diff;
+ } catch (DBException& e) {
+ // Should only happen on connection errors
+ e.addContext(str::stream() << "could not calculate config difference for ns " << _ns
+ << " on " << config);
+ throw;
+ }
+}
+
+template <class ValType, class ShardType>
+int ConfigDiffTracker<ValType, ShardType>::calculateConfigDiff(
+ DBClientCursorInterface& diffCursor) {
+ verifyAttached();
+
+ // Apply the chunk changes to the ranges and versions
+
+ //
+ // Overall idea here is to work in two steps :
+ // 1. For all the new chunks we find, increment the maximum version per-shard and
+ // per-collection, and remove any conflicting chunks from the ranges
+ // 2. For all the new chunks we're interested in (all of them for mongos, just chunks on the
+ // shard for mongod) add them to the ranges
+ //
+
+ std::vector<BSONObj> newTracked;
+ // Store epoch now so it doesn't change when we change max
+ OID currEpoch = _maxVersion->epoch();
+
+ _validDiffs = 0;
+ while (diffCursor.more()) {
+ BSONObj diffChunkDoc = diffCursor.next();
+
+ ChunkVersion chunkVersion =
+ ChunkVersion::fromBSON(diffChunkDoc, ChunkType::DEPRECATED_lastmod());
+
+ if (diffChunkDoc[ChunkType::min()].type() != Object ||
+ diffChunkDoc[ChunkType::max()].type() != Object ||
+ diffChunkDoc[ChunkType::shard()].type() != String) {
+ using namespace logger;
+ LogstreamBuilder(
+ globalLogDomain(), getThreadName(), LogSeverity::Warning(), LogComponent::kSharding)
+ << "got invalid chunk document " << diffChunkDoc
+ << " when trying to load differing chunks" << std::endl;
+ continue;
+ }
- // Open a cursor for the diff chunks
- std::auto_ptr<DBClientCursor> cursor = conn->query(
- ChunkType::ConfigNS, diffQuery, 0, 0, 0, 0, ( DEBUG_BUILD ? 2 : 1000000 ) );
- verify( cursor.get() );
+ if (!chunkVersion.isSet() || !chunkVersion.hasEqualEpoch(currEpoch)) {
+ using namespace logger;
+ LogstreamBuilder(
+ globalLogDomain(), getThreadName(), LogSeverity::Warning(), LogComponent::kSharding)
+ << "got invalid chunk version " << chunkVersion << " in document " << diffChunkDoc
+ << " when trying to load differing chunks at version "
+ << ChunkVersion(_maxVersion->majorVersion(), _maxVersion->minorVersion(), currEpoch)
+ << std::endl;
- int diff = calculateConfigDiff( *cursor.get() );
+ // Don't keep loading, since we know we'll be broken here
+ return -1;
+ }
- conn.done();
+ _validDiffs++;
- return diff;
- }
- catch( DBException& e ){
- // Should only happen on connection errors
- e.addContext( str::stream() << "could not calculate config difference for ns " << _ns << " on " << config );
- throw;
- }
- }
+ // Get max changed version and chunk version
+ if (chunkVersion > *_maxVersion)
+ *_maxVersion = chunkVersion;
- template < class ValType, class ShardType >
- int ConfigDiffTracker<ValType,ShardType>::
- calculateConfigDiff( DBClientCursorInterface& diffCursor )
- {
- verifyAttached();
-
- // Apply the chunk changes to the ranges and versions
-
- //
- // Overall idea here is to work in two steps :
- // 1. For all the new chunks we find, increment the maximum version per-shard and
- // per-collection, and remove any conflicting chunks from the ranges
- // 2. For all the new chunks we're interested in (all of them for mongos, just chunks on the
- // shard for mongod) add them to the ranges
- //
-
- std::vector<BSONObj> newTracked;
- // Store epoch now so it doesn't change when we change max
- OID currEpoch = _maxVersion->epoch();
-
- _validDiffs = 0;
- while( diffCursor.more() ){
-
- BSONObj diffChunkDoc = diffCursor.next();
-
- ChunkVersion chunkVersion = ChunkVersion::fromBSON(diffChunkDoc, ChunkType::DEPRECATED_lastmod());
-
- if( diffChunkDoc[ChunkType::min()].type() != Object ||
- diffChunkDoc[ChunkType::max()].type() != Object ||
- diffChunkDoc[ChunkType::shard()].type() != String )
- {
- using namespace logger;
- LogstreamBuilder(globalLogDomain(), getThreadName(), LogSeverity::Warning(),
- LogComponent::kSharding)
- << "got invalid chunk document " << diffChunkDoc
- << " when trying to load differing chunks" << std::endl;
- continue;
- }
-
- if( ! chunkVersion.isSet() || ! chunkVersion.hasEqualEpoch( currEpoch ) ){
-
- using namespace logger;
- LogstreamBuilder(globalLogDomain(), getThreadName(), LogSeverity::Warning(),
- LogComponent::kSharding)
- << "got invalid chunk version " << chunkVersion << " in document " << diffChunkDoc
- << " when trying to load differing chunks at version "
- << ChunkVersion( _maxVersion->majorVersion(),
- _maxVersion->minorVersion(),
- currEpoch ) << std::endl;
-
- // Don't keep loading, since we know we'll be broken here
- return -1;
- }
-
- _validDiffs++;
-
- // Get max changed version and chunk version
- if( chunkVersion > *_maxVersion ) *_maxVersion = chunkVersion;
-
- // Chunk version changes
- ShardType shard = shardFor( diffChunkDoc[ChunkType::shard()].String() );
- typename std::map<ShardType, ChunkVersion>::iterator shardVersionIt = _maxShardVersions->find( shard );
- if( shardVersionIt == _maxShardVersions->end() || shardVersionIt->second < chunkVersion ){
- (*_maxShardVersions)[ shard ] = chunkVersion;
- }
-
- // See if we need to remove any chunks we are currently tracking b/c of this chunk's changes
- removeOverlapping(diffChunkDoc[ChunkType::min()].Obj(),
- diffChunkDoc[ChunkType::max()].Obj());
-
- // Figure out which of the new chunks we need to track
- // Important - we need to actually own this doc, in case the cursor decides to getMore or unbuffer
- if( isTracked( diffChunkDoc ) ) newTracked.push_back( diffChunkDoc.getOwned() );
+ // Chunk version changes
+ ShardType shard = shardFor(diffChunkDoc[ChunkType::shard()].String());
+ typename std::map<ShardType, ChunkVersion>::iterator shardVersionIt =
+ _maxShardVersions->find(shard);
+ if (shardVersionIt == _maxShardVersions->end() || shardVersionIt->second < chunkVersion) {
+ (*_maxShardVersions)[shard] = chunkVersion;
}
- using namespace logger;
- if (globalLogDomain()->shouldLog(LogComponent::kSharding, LogSeverity::Debug(3))) {
- LogstreamBuilder(globalLogDomain(), getThreadName(), LogSeverity::Debug(3),
- LogComponent::kSharding)
- << "found " << _validDiffs
- << " new chunks for collection " << _ns
- << " (tracking " << newTracked.size()
- << "), new version is " << *_maxVersion
- << std::endl;
- }
+ // See if we need to remove any chunks we are currently tracking b/c of this chunk's changes
+ removeOverlapping(diffChunkDoc[ChunkType::min()].Obj(),
+ diffChunkDoc[ChunkType::max()].Obj());
- for( std::vector<BSONObj>::iterator it = newTracked.begin(); it != newTracked.end(); it++ ){
+ // Figure out which of the new chunks we need to track
+ // Important - we need to actually own this doc, in case the cursor decides to getMore or unbuffer
+ if (isTracked(diffChunkDoc))
+ newTracked.push_back(diffChunkDoc.getOwned());
+ }
- BSONObj chunkDoc = *it;
+ using namespace logger;
+ if (globalLogDomain()->shouldLog(LogComponent::kSharding, LogSeverity::Debug(3))) {
+ LogstreamBuilder(
+ globalLogDomain(), getThreadName(), LogSeverity::Debug(3), LogComponent::kSharding)
+ << "found " << _validDiffs << " new chunks for collection " << _ns << " (tracking "
+ << newTracked.size() << "), new version is " << *_maxVersion << std::endl;
+ }
- // Important - we need to make sure we actually own the min and max here
- BSONObj min = chunkDoc[ChunkType::min()].Obj().getOwned();
- BSONObj max = chunkDoc[ChunkType::max()].Obj().getOwned();
+ for (std::vector<BSONObj>::iterator it = newTracked.begin(); it != newTracked.end(); it++) {
+ BSONObj chunkDoc = *it;
- // Invariant enforced by sharding
- // It's possible to read inconsistent state b/c of getMore() and yielding, so we want
- // to detect as early as possible.
- // TODO: This checks for overlap, we also should check for holes here iff we're tracking
- // all chunks
- if( isOverlapping( min, max ) ) return -1;
+ // Important - we need to make sure we actually own the min and max here
+ BSONObj min = chunkDoc[ChunkType::min()].Obj().getOwned();
+ BSONObj max = chunkDoc[ChunkType::max()].Obj().getOwned();
- _currMap->insert( rangeFor( chunkDoc, min, max ) );
- }
+ // Invariant enforced by sharding
+ // It's possible to read inconsistent state b/c of getMore() and yielding, so we want
+ // to detect as early as possible.
+ // TODO: This checks for overlap, we also should check for holes here iff we're tracking
+ // all chunks
+ if (isOverlapping(min, max))
+ return -1;
- return _validDiffs;
+ _currMap->insert(rangeFor(chunkDoc, min, max));
}
- template<class ValType, class ShardType>
- Query ConfigDiffTracker<ValType, ShardType>::configDiffQuery() const {
-
- verifyAttached();
-
- //
- // Basic idea behind the query is to find all the chunks $gte the current max version.
- // Currently, any splits and merges will increment the current max version.
- //
-
- BSONObjBuilder queryB;
- queryB.append(ChunkType::ns(), _ns);
- BSONObjBuilder tsBuilder(queryB.subobjStart(ChunkType::DEPRECATED_lastmod()));
- tsBuilder.appendTimestamp( "$gte", _maxVersion->toLong() );
- tsBuilder.done();
- BSONObj query = queryB.obj();
-
- //
- // NOTE: IT IS IMPORTANT FOR CONSISTENCY THAT WE SORT BY ASC VERSION, TO HANDLE
- // CURSOR YIELDING BETWEEN CHUNKS BEING MIGRATED.
- //
- // This ensures that changes to chunk version (which will always be higher) will always
- // come *after* our current position in the chunk cursor.
- //
-
- Query queryObj(query);
- queryObj.sort(BSON( "lastmod" << 1 ));
-
- using namespace logger;
- if (globalLogDomain()->shouldLog(LogComponent::kSharding, LogSeverity::Debug(2))) {
- LogstreamBuilder(globalLogDomain(), getThreadName(), LogSeverity::Debug(2),
- LogComponent::kSharding)
- << "major version query from " << *_maxVersion << " and over "
- << _maxShardVersions->size() << " shards is " << queryObj << std::endl;
- }
-
- return queryObj;
+ return _validDiffs;
+}
+
+template <class ValType, class ShardType>
+Query ConfigDiffTracker<ValType, ShardType>::configDiffQuery() const {
+ verifyAttached();
+
+ //
+ // Basic idea behind the query is to find all the chunks $gte the current max version.
+ // Currently, any splits and merges will increment the current max version.
+ //
+
+ BSONObjBuilder queryB;
+ queryB.append(ChunkType::ns(), _ns);
+ BSONObjBuilder tsBuilder(queryB.subobjStart(ChunkType::DEPRECATED_lastmod()));
+ tsBuilder.appendTimestamp("$gte", _maxVersion->toLong());
+ tsBuilder.done();
+ BSONObj query = queryB.obj();
+
+ //
+ // NOTE: IT IS IMPORTANT FOR CONSISTENCY THAT WE SORT BY ASC VERSION, TO HANDLE
+ // CURSOR YIELDING BETWEEN CHUNKS BEING MIGRATED.
+ //
+ // This ensures that changes to chunk version (which will always be higher) will always
+ // come *after* our current position in the chunk cursor.
+ //
+
+ Query queryObj(query);
+ queryObj.sort(BSON("lastmod" << 1));
+
+ using namespace logger;
+ if (globalLogDomain()->shouldLog(LogComponent::kSharding, LogSeverity::Debug(2))) {
+ LogstreamBuilder(
+ globalLogDomain(), getThreadName(), LogSeverity::Debug(2), LogComponent::kSharding)
+ << "major version query from " << *_maxVersion << " and over "
+ << _maxShardVersions->size() << " shards is " << queryObj << std::endl;
}
-} // namespace mongo
+ return queryObj;
+}
+} // namespace mongo