/**
 *    Copyright (C) 2009 10gen Inc.
 *
 *    This program is free software: you can redistribute it and/or modify
 *    it under the terms of the GNU Affero General Public License, version 3,
 *    as published by the Free Software Foundation.
 *
 *    This program is distributed in the hope that it will be useful,
 *    but WITHOUT ANY WARRANTY; without even the implied warranty of
 *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *    GNU Affero General Public License for more details.
 *
 *    You should have received a copy of the GNU Affero General Public License
 *    along with this program.  If not, see <http://www.gnu.org/licenses/>.
 *
 *    As a special exception, the copyright holders give permission to link the
 *    code of portions of this program with the OpenSSL library under certain
 *    conditions as described in each individual source file and distribute
 *    linked combinations including the program with the OpenSSL library. You
 *    must comply with the GNU Affero General Public License in all respects
 *    for all of the code used other than as permitted herein. If you modify
 *    file(s) with this exception, you may extend this exception to your
 *    version of the file(s), but you are not obligated to do so. If you do not
 *    wish to do so, delete this exception statement from your version. If you
 *    delete this exception statement from all source files in the program,
 *    then also delete it in the license file.
 */

#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kDefault

#include "mongo/platform/basic.h"

#include "mongo/client/parallel.h"
#include "mongo/db/dbdirectclient.h"
#include "mongo/db/operation_context_impl.h"
#include "mongo/dbtests/config_server_fixture.h"
#include "mongo/dbtests/dbtests.h"
#include "mongo/s/catalog/type_chunk.h"
#include "mongo/s/catalog/type_collection.h"
#include "mongo/s/catalog/type_shard.h"
#include "mongo/s/chunk_diff.h"
#include "mongo/s/chunk_manager.h"
#include "mongo/s/chunk_version.h"
#include "mongo/s/config.h"

namespace ShardingTests {

using std::shared_ptr;
using std::unique_ptr;
using std::make_pair;
using std::map;
using std::pair;
using std::set;
using std::string;
using std::vector;

static int rand( int max = -1 ) {
    static unsigned seed = 1337;

#if !defined(_WIN32)
    int r = rand_r( &seed );
#else
    int r = ::rand();  // seed not used in this case
#endif

    // Modding is bad, but we don't really care in this case
    return max > 0 ? r % max : r;
}

//
// Converts an array of raw BSONObj chunks to a vector of ChunkType
//
void convertBSONArrayToChunkTypes(const BSONArray& chunksArray,
                                  std::vector<ChunkType>* chunksVector) {
    for (const BSONElement& obj : chunksArray) {
        auto chunkTypeRes = ChunkType::fromBSON(obj.Obj());
        ASSERT(chunkTypeRes.isOK());
        chunksVector->push_back(chunkTypeRes.getValue());
    }
}
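// For orientation: the chunk documents these tests read and write follow the config.chunks
// schema, roughly as below (illustrative values only, not produced by any particular test):
//   { _id: "foo.bar-_id_MinKey", ns: "foo.bar",
//     min: { _id: MinKey }, max: { _id: 0 },
//     shard: "shard0000", lastmod: Timestamp(1, 0) /* i.e. version 1|0 */,
//     lastmodEpoch: ObjectId("...") }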
//
// Sets up a basic environment for loading chunks to/from the direct database connection.
// Redirects connections to the direct database for the duration of the test.
//
class ChunkManagerTest : public ConnectionString::ConnectionHook {
public:
    ChunkManagerTest() : _client(&_txn) {
        shardConnectionPool.clear();

        DBException::traceExceptions = true;

        // Make all connections redirect to the direct client
        ConnectionString::setConnectionHook( this );

        // Create the default config database before querying, necessary for direct connections
        _client.dropDatabase( "config" );
        _client.insert( "config.test", BSON( "hello" << "world" ) );
        _client.dropCollection( "config.test" );

        _client.dropDatabase( nsGetDB( collName() ) );
        _client.insert( collName(), BSON( "hello" << "world" ) );
        _client.dropCollection( collName() );

        _shardId = "shard0000";

        // Since we've redirected the conns, the host doesn't matter here so long as it's
        // prefixed with a "$"
        Shard shard(_shardId, ConnectionString(HostAndPort("$hostFooBar:27017")));
        // Need to run this to ensure the shard is in the global lookup table
        Shard::installShard(_shardId, shard);

        // Add dummy shard to config DB
        _client.insert(ShardType::ConfigNS,
                       BSON(ShardType::name() << _shardId << ShardType::host()
                                              << shard.getConnString().toString()));

        // Create an index so that diffing works correctly, otherwise no cursors from S&O
        ASSERT_OK(dbtests::createIndex(&_txn,
                                       ChunkType::ConfigNS,
                                       BSON(ChunkType::ns() << 1 <<  // br
                                            ChunkType::DEPRECATED_lastmod() << 1)));
    }

    virtual ~ChunkManagerTest() {
        // Reset the redirection
        ConnectionString::setConnectionHook( NULL );
    }

    string collName() {
        return "foo.bar";
    }

    virtual DBClientBase* connect( const ConnectionString& connStr,
                                   string& errmsg,
                                   double socketTimeout ) {
        // Note - must be new, since it gets owned elsewhere
        return new CustomDirectClient(&_txn);
    }

protected:
    OperationContextImpl _txn;
    CustomDirectClient _client;
    ShardId _shardId;
};

//
// Tests creating a new chunk manager and creating the default chunks
//
class ChunkManagerCreateBasicTest : public ChunkManagerTest {
public:
    void run() {
        ShardKeyPattern shardKeyPattern(BSON("_id" << 1));
        ChunkManager manager(collName(), shardKeyPattern, false);
        manager.createFirstChunks(_shardId, NULL, NULL);

        BSONObj firstChunk = _client.findOne(ChunkType::ConfigNS, BSONObj()).getOwned();

        ASSERT(firstChunk[ChunkType::min()].Obj()[ "_id" ].type() == MinKey);
        ASSERT(firstChunk[ChunkType::max()].Obj()[ "_id" ].type() == MaxKey);

        ChunkVersion version =
            ChunkVersion::fromBSON(firstChunk, ChunkType::DEPRECATED_lastmod());

        ASSERT( version.majorVersion() == 1 );
        ASSERT( version.minorVersion() == 0 );
        ASSERT( version.isEpochSet() );
    }
};
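// For reference, createFirstChunks() with n split points [p1, ..., pn] produces n + 1
// chunks covering [MinKey, p1), [p1, p2), ..., [pn, MaxKey), all on the given shard.
// E.g., hypothetical split points 10 and 20 on key _id would yield:
//   { min: { _id: MinKey }, max: { _id: 10 } }
//   { min: { _id: 10 },     max: { _id: 20 } }
//   { min: { _id: 20 },     max: { _id: MaxKey } }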
//
// Tests creating a new chunk manager with random split points. Creating chunks on multiple
// shards is not tested here, since there are unresolved race conditions there and it should
// probably be avoided if at all possible.
//
class ChunkManagerCreateFullTest : public ChunkManagerTest {
public:
    static const int numSplitPoints = 100;

    void genRandomSplitPoints( vector<int>* splitPoints ) {
        for( int i = 0; i < numSplitPoints; i++ ) {
            splitPoints->push_back( rand( numSplitPoints * 10 ) );
        }
    }

    void genRandomSplitKeys( const string& keyName, vector<BSONObj>* splitKeys ) {
        vector<int> splitPoints;
        genRandomSplitPoints( &splitPoints );

        for( vector<int>::iterator it = splitPoints.begin(); it != splitPoints.end(); ++it ) {
            splitKeys->push_back( BSON( keyName << *it ) );
        }
    }

    // Uses a chunk manager to create chunks
    void createChunks( const string& keyName ) {
        vector<BSONObj> splitKeys;
        genRandomSplitKeys( keyName, &splitKeys );

        ShardKeyPattern shardKeyPattern(BSON(keyName << 1));
        ChunkManager manager(collName(), shardKeyPattern, false);

        manager.createFirstChunks(_shardId, &splitKeys, NULL);
    }

    void run() {
        string keyName = "_id";
        createChunks( keyName );

        unique_ptr<DBClientCursor> cursor =
            _client.query(ChunkType::ConfigNS, QUERY(ChunkType::ns(collName())));

        set<int> minorVersions;
        OID epoch;

        // Check that all chunks were created with version 1|x, with a consistent epoch
        // and unique minor versions
        while( cursor->more() ) {
            BSONObj chunk = cursor->next();

            ChunkVersion version =
                ChunkVersion::fromBSON(chunk, ChunkType::DEPRECATED_lastmod());

            ASSERT( version.majorVersion() == 1 );
            ASSERT( version.isEpochSet() );

            if( ! epoch.isSet() )
                epoch = version.epoch();
            ASSERT( version.epoch() == epoch );

            ASSERT( minorVersions.find( version.minorVersion() ) == minorVersions.end() );
            minorVersions.insert( version.minorVersion() );

            ASSERT(chunk[ChunkType::shard()].String() == _shardId);
        }
    }
};

//
// Tests that chunks are loaded correctly from the db with no a-priori info, and also that
// they can be reloaded on top of an old chunk manager with changes.
//
class ChunkManagerLoadBasicTest : public ChunkManagerCreateFullTest {
public:
    void run() {
        string keyName = "_id";
        createChunks( keyName );
        int numChunks = static_cast<int>(_client.count(ChunkType::ConfigNS,
                                                       BSON(ChunkType::ns(collName()))));

        BSONObj firstChunk = _client.findOne(ChunkType::ConfigNS, BSONObj()).getOwned();

        ChunkVersion version =
            ChunkVersion::fromBSON(firstChunk, ChunkType::DEPRECATED_lastmod());

        // Make the manager load the existing chunks
        CollectionType collType;
        collType.setNs(NamespaceString{collName()});
        collType.setEpoch(version.epoch());
        collType.setUpdatedAt(jsTime());
        collType.setKeyPattern(BSON("_id" << 1));
        collType.setUnique(false);
        collType.setDropped(false);

        ChunkManager manager(collType);
        manager.loadExistingRanges(nullptr);

        ASSERT(manager.getVersion().epoch() == version.epoch());
        ASSERT(manager.getVersion().minorVersion() == (numChunks - 1));
        ASSERT(static_cast<int>(manager.getChunkMap().size()) == numChunks);

        // Modify the chunks collection
        BSONObjBuilder b;
        ChunkVersion laterVersion = ChunkVersion( 2, 1, version.epoch() );
        laterVersion.addToBSON(b, ChunkType::DEPRECATED_lastmod());

        _client.update(ChunkType::ConfigNS, BSONObj(), BSON( "$set" << b.obj()));

        // Make a new manager load the chunk diff
        ChunkManager newManager(manager.getns(),
                                manager.getShardKeyPattern(),
                                manager.isUnique());
        newManager.loadExistingRanges(&manager);

        ASSERT( newManager.getVersion().toLong() == laterVersion.toLong() );
        ASSERT( newManager.getVersion().epoch() == laterVersion.epoch() );
        ASSERT( static_cast<int>( newManager.getChunkMap().size() ) == numChunks );
    }
};

class ChunkDiffUnitTest {
public:
    bool _inverse;

    typedef map<BSONObj, BSONObj, BSONObjCmp> RangeMap;
    typedef map<string, ChunkVersion> VersionMap;

    ChunkDiffUnitTest( bool inverse ) : _inverse( inverse ) {}
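    // Note: in the default (forward) configuration, RangeMap is keyed by chunk min and maps
    // to chunk max; the inverse configuration keys by chunk max instead, which is why
    // validate() below looks ranges up by getMax() when _inverse is set.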
    // The default pass-through adapter for using config diffs
    class DefaultDiffAdapter : public ConfigDiffTracker<string> {
    public:
        DefaultDiffAdapter() {}
        virtual ~DefaultDiffAdapter() {}

        virtual bool isTracked(const ChunkType& chunk) const {
            return true;
        }

        virtual pair<BSONObj, BSONObj> rangeFor(const ChunkType& chunk) const {
            return make_pair(chunk.getMin(), chunk.getMax());
        }

        virtual string shardFor( const string& name ) const {
            return name;
        }
    };

    // Inverts the storage order for chunks from min to max
    class InverseDiffAdapter : public DefaultDiffAdapter {
    public:
        InverseDiffAdapter() {}
        virtual ~InverseDiffAdapter() {}

        virtual bool isMinKeyIndexed() const {
            return false;
        }

        virtual pair<BSONObj, BSONObj> rangeFor(const ChunkType& chunk) const {
            return make_pair(chunk.getMax(), chunk.getMin());
        }
    };

    // Allow validating with and without ranges (b/c our splits won't actually be updated
    // by the diffs)
    void validate(const std::vector<ChunkType>& chunks,
                  ChunkVersion maxVersion,
                  const VersionMap& maxShardVersions) {
        validate(chunks, NULL, maxVersion, maxShardVersions);
    }

    void validate(const std::vector<ChunkType>& chunks,
                  const RangeMap& ranges,
                  ChunkVersion maxVersion,
                  const VersionMap& maxShardVersions) {
        validate(chunks, (RangeMap*)&ranges, maxVersion, maxShardVersions);
    }

    // Validates that the ranges and versions are valid given the chunks
    void validate(const std::vector<ChunkType>& chunks,
                  RangeMap* ranges,
                  ChunkVersion maxVersion,
                  const VersionMap& maxShardVersions) {
        int chunkCount = chunks.size();
        ChunkVersion foundMaxVersion;
        VersionMap foundMaxShardVersions;

        //
        // Validate that all the chunks are there and collect versions
        //

        for (const ChunkType& chunk : chunks) {
            if( ranges != NULL ) {
                // log() << "Validating chunk " << chunkDoc << " size : " << ranges->size()
                //       << " vs " << chunkCount << endl;

                RangeMap::iterator chunkRange =
                    ranges->find(_inverse ? chunk.getMax() : chunk.getMin());

                ASSERT( chunkRange != ranges->end() );
                ASSERT(chunkRange->second.woCompare(_inverse ? chunk.getMin()
                                                             : chunk.getMax()) == 0);
            }

            ChunkVersion version =
                ChunkVersion::fromBSON(chunk.toBSON()[ChunkType::DEPRECATED_lastmod()]);
            if( version > foundMaxVersion )
                foundMaxVersion = version;

            ChunkVersion shardMaxVersion = foundMaxShardVersions[chunk.getShard()];
            if( version > shardMaxVersion ) {
                foundMaxShardVersions[chunk.getShard()] = version;
            }
        }

        // Make sure all chunks are accounted for
        if( ranges != NULL )
            ASSERT( chunkCount == (int) ranges->size() );

        // log() << "Validating that all shard versions are up to date..." << endl;

        // Validate that all the versions are the same
        ASSERT( foundMaxVersion.equals( maxVersion ) );

        for( VersionMap::iterator it = foundMaxShardVersions.begin();
             it != foundMaxShardVersions.end();
             it++ ) {
            ChunkVersion foundVersion = it->second;
            VersionMap::const_iterator maxIt = maxShardVersions.find( it->first );

            ASSERT( maxIt != maxShardVersions.end() );
            ASSERT( foundVersion.equals( maxIt->second ) );
        }

        // Make sure all shards are accounted for
        ASSERT( foundMaxShardVersions.size() == maxShardVersions.size() );
    }

    void run() {
        int numShards = 10;
        int numInitialChunks = 5;
        int maxChunks = 100000;  // Needed to not overflow the BSONArray's max bytes
        int keySize = 2;

        BSONArrayBuilder chunksB;

        BSONObj lastSplitPt;
        ChunkVersion version( 1, 0, OID() );

        //
        // Generate numInitialChunks with a given key size over numShards.
        // All chunks have double key values, so we can split them a bunch.
        //

        for( int i = -1; i < numInitialChunks; i++ ) {
            BSONObjBuilder splitPtB;
            for( int k = 0; k < keySize; k++ ) {
                string field = string( "k" ) + string( 1, (char)('0' + k) );
                if( i < 0 )
                    splitPtB.appendMinKey( field );
                else if( i < numInitialChunks - 1 )
                    splitPtB.append( field, (double)i );
                else
                    splitPtB.appendMaxKey( field );
            }
            BSONObj splitPt = splitPtB.obj();

            if( i >= 0 ) {
                BSONObjBuilder chunkB;

                chunkB.append(ChunkType::name(), "$dummyname");
                chunkB.append(ChunkType::ns(), "$dummyns");

                chunkB.append(ChunkType::min(), lastSplitPt );
                chunkB.append(ChunkType::max(), splitPt );

                int shardNum = rand( numShards );
                chunkB.append(ChunkType::shard(),
                              "shard" + string( 1, (char)('A' + shardNum) ) );

                rand( 2 ) ? version.incMajor() : version.incMinor();
                version.addToBSON(chunkB, ChunkType::DEPRECATED_lastmod());

                chunksB.append( chunkB.obj() );
            }

            lastSplitPt = splitPt;
        }

        BSONArray chunks = chunksB.arr();

        // log() << "Chunks generated : " << chunks << endl;

        // Set up the empty ranges and versions first
        RangeMap ranges;
        ChunkVersion maxVersion = ChunkVersion( 0, 0, OID() );
        VersionMap maxShardVersions;

        // Create a differ which will track our progress
        std::shared_ptr<DefaultDiffAdapter> differ( _inverse ? new InverseDiffAdapter()
                                                             : new DefaultDiffAdapter() );
        differ->attach( "test", ranges, maxVersion, maxShardVersions );

        std::vector<ChunkType> chunksVector;
        convertBSONArrayToChunkTypes(chunks, &chunksVector);

        // Validate the initial load
        differ->calculateConfigDiff(chunksVector);
        validate(chunksVector, ranges, maxVersion, maxShardVersions);

        // Generate a lot of diffs, and keep validating that updating from the diffs always
        // gives us the right ranges and versions

        int numDiffs = 135;  // Makes about 100000 chunks overall
        int numChunks = numInitialChunks;
        for( int i = 0; i < numDiffs; i++ ) {
            // log() << "Generating new diff... " << i << endl;

            BSONArrayBuilder diffsB;
            BSONArrayBuilder newChunksB;
            BSONObjIterator chunksIt( chunks );

            while( chunksIt.more() ) {
                BSONObj chunk = chunksIt.next().Obj();

                int randChoice = rand( 10 );

                if( randChoice < 2 && numChunks < maxChunks ) {
                    // Simulate a split
                    // log() << " ...starting a split with chunk " << chunk << endl;

                    BSONObjBuilder leftB;
                    BSONObjBuilder rightB;
                    BSONObjBuilder midB;

                    for( int k = 0; k < keySize; k++ ) {
                        string field = string( "k" ) + string( 1, (char)('0' + k) );

                        BSONType maxType = chunk[ChunkType::max()].Obj()[field].type();
                        double max = maxType == NumberDouble ?
                            chunk["max"].Obj()[field].Number() : 0.0;
                        BSONType minType = chunk[ChunkType::min()].Obj()[field].type();
                        double min = minType == NumberDouble ?
                            chunk[ChunkType::min()].Obj()[field].Number() : 0.0;
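                        // Pick a split point strictly inside the chunk: just below max if
                        // min is MinKey, just above min if max is MaxKey, otherwise the
                        // midpoint of the two bounds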
                        if( minType == MinKey ) {
                            midB.append( field, max - 1.0 );
                        }
                        else if( maxType == MaxKey ) {
                            midB.append( field, min + 1.0 );
                        }
                        else {
                            midB.append( field, ( max + min ) / 2.0 );
                        }
                    }

                    BSONObj midPt = midB.obj();

                    // Only happens if we can't split the min chunk
                    if( midPt.isEmpty() )
                        continue;

                    leftB.append( chunk[ChunkType::min()] );
                    leftB.append(ChunkType::max(), midPt );
                    rightB.append(ChunkType::min(), midPt );
                    rightB.append(chunk[ChunkType::max()] );

                    // Add required fields for ChunkType
                    leftB.append(chunk[ChunkType::name()]);
                    leftB.append(chunk[ChunkType::ns()]);
                    rightB.append(chunk[ChunkType::name()]);
                    rightB.append(chunk[ChunkType::ns()]);

                    leftB.append(chunk[ChunkType::shard()] );
                    rightB.append(chunk[ChunkType::shard()] );

                    version.incMajor();
                    version._minor = 0;
                    version.addToBSON(leftB, ChunkType::DEPRECATED_lastmod());
                    version.incMinor();
                    version.addToBSON(rightB, ChunkType::DEPRECATED_lastmod());

                    BSONObj left = leftB.obj();
                    BSONObj right = rightB.obj();

                    // log() << " ... split into " << left << " and " << right << endl;

                    newChunksB.append( left );
                    newChunksB.append( right );

                    diffsB.append( right );
                    diffsB.append( left );

                    numChunks++;
                }
                else if( randChoice < 4 && chunksIt.more() ) {
                    // Simulate a migrate
                    // log() << " ...starting a migrate with chunk " << chunk << endl;

                    BSONObj prevShardChunk;
                    while( chunksIt.more() ) {
                        prevShardChunk = chunksIt.next().Obj();

                        if( prevShardChunk[ChunkType::shard()].String() ==
                            chunk[ChunkType::shard()].String() )
                            break;

                        // log() << "... appending chunk from diff shard: " << prevShardChunk
                        //       << endl;
                        newChunksB.append( prevShardChunk );

                        prevShardChunk = BSONObj();
                    }

                    // We need to move between different shards, hence the weirdness in the
                    // logic here
                    if( ! prevShardChunk.isEmpty() ) {
                        BSONObjBuilder newShardB;
                        BSONObjBuilder prevShardB;

                        newShardB.append(chunk[ChunkType::min()]);
                        newShardB.append(chunk[ChunkType::max()]);
                        prevShardB.append(prevShardChunk[ChunkType::min()]);
                        prevShardB.append(prevShardChunk[ChunkType::max()]);

                        // Add required fields for ChunkType
                        newShardB.append(chunk[ChunkType::name()]);
                        newShardB.append(chunk[ChunkType::ns()]);
                        prevShardB.append(chunk[ChunkType::name()]);
                        prevShardB.append(chunk[ChunkType::ns()]);

                        int shardNum = rand( numShards );
                        newShardB.append(ChunkType::shard(),
                                         "shard" + string( 1, (char)('A' + shardNum)));
                        prevShardB.append(prevShardChunk[ChunkType::shard()]);

                        version.incMajor();
                        version._minor = 0;
                        version.addToBSON(newShardB, ChunkType::DEPRECATED_lastmod());
                        version.incMinor();
                        version.addToBSON(prevShardB, ChunkType::DEPRECATED_lastmod());

                        BSONObj newShard = newShardB.obj();
                        BSONObj prevShard = prevShardB.obj();

                        // log() << " ... migrated to " << newShard << " and updated "
                        //       << prevShard << endl;

                        newChunksB.append( newShard );
                        newChunksB.append( prevShard );

                        diffsB.append( newShard );
                        diffsB.append( prevShard );
                    }
                    else {
                        // log() << "... appending chunk, no more left: " << chunk << endl;
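                        // No remaining chunk on the same shard was found to pair with, so
                        // keep this chunk as-is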
                        newChunksB.append( chunk );
                    }
                }
                else {
                    // log() << "Appending chunk : " << chunk << endl;
                    newChunksB.append( chunk );
                }
            }

            BSONArray diffs = diffsB.arr();
            chunks = newChunksB.arr();

            // log() << "Diffs generated : " << diffs << endl;
            // log() << "All chunks : " << chunks << endl;

            // Rarely, entirely clear out our data
            if( rand( 10 ) < 1 ) {
                diffs = chunks;
                ranges.clear();
                maxVersion = ChunkVersion( 0, 0, OID() );
                maxShardVersions.clear();
            }

            // log() << "Total number of chunks : " << numChunks << " iteration " << i << endl;

            std::vector<ChunkType> chunksVector;
            convertBSONArrayToChunkTypes(chunks, &chunksVector);

            differ->calculateConfigDiff(chunksVector);
            validate(chunksVector, ranges, maxVersion, maxShardVersions);
        }
    }
};

class ChunkDiffUnitTestNormal : public ChunkDiffUnitTest {
public:
    ChunkDiffUnitTestNormal() : ChunkDiffUnitTest( false ) {}
};

class ChunkDiffUnitTestInverse : public ChunkDiffUnitTest {
public:
    ChunkDiffUnitTestInverse() : ChunkDiffUnitTest( true ) {}
};

class All : public Suite {
public:
    All() : Suite( "sharding" ) {}

    void setupTests() {
        add< ChunkManagerCreateBasicTest >();
        add< ChunkManagerCreateFullTest >();
        add< ChunkManagerLoadBasicTest >();
        add< ChunkDiffUnitTestNormal >();
        add< ChunkDiffUnitTestInverse >();
    }
};

SuiteInstance<All> myall;

}  // namespace ShardingTests
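// Note: "sharding" is the suite name registered with the dbtest framework above; these
// tests are presumably selected by that name when invoking the dbtest binary (the exact
// command line depends on the build's test runner).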