/**
 * Copyright (C) 2009 10gen Inc.
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License, version 3,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 *
 * As a special exception, the copyright holders give permission to link the
 * code of portions of this program with the OpenSSL library under certain
 * conditions as described in each individual source file and distribute
 * linked combinations including the program with the OpenSSL library. You
 * must comply with the GNU Affero General Public License in all respects
 * for all of the code used other than as permitted herein. If you modify
 * file(s) with this exception, you may extend this exception to your
 * version of the file(s), but you are not obligated to do so. If you do not
 * wish to do so, delete this exception statement from your version. If you
 * delete this exception statement from all source files in the program,
 * then also delete it in the license file.
 */

#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kDefault

#include "mongo/platform/basic.h"

#include <boost/shared_ptr.hpp>

#include "mongo/client/dbclientmockcursor.h"
#include "mongo/client/parallel.h"
#include "mongo/db/dbdirectclient.h"
#include "mongo/db/operation_context_impl.h"
#include "mongo/dbtests/config_server_fixture.h"
#include "mongo/dbtests/dbtests.h"
#include "mongo/s/chunk_diff.h"
#include "mongo/s/chunk_version.h"
#include "mongo/s/config.h"
#include "mongo/s/type_chunk.h"
#include "mongo/s/type_collection.h"
#include "mongo/util/log.h"

namespace ShardingTests {

using boost::shared_ptr;
using std::auto_ptr;
using std::make_pair;
using std::map;
using std::pair;
using std::set;
using std::string;
using std::vector;

namespace serverandquerytests {

class test1 {
public:
    void run() {
        ServerAndQuery a("foo:1", BSON("a" << GT << 0 << LTE << 100));
        ServerAndQuery b("foo:1", BSON("a" << GT << 200 << LTE << 1000));

        ASSERT(a < b);
        ASSERT(!(b < a));

        set<ServerAndQuery> s;
        s.insert(a);
        s.insert(b);

        ASSERT_EQUALS((unsigned int)2, s.size());
    }
};

}  // namespace serverandquerytests

static int rand(int max = -1) {
    static unsigned seed = 1337;

#if !defined(_WIN32)
    int r = rand_r(&seed);
#else
    int r = ::rand();  // seed not used in this case
#endif

    // Modding is bad, but we don't really care in this case
    return max > 0 ? r % max : r;
}
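//
// For reference, a minimal sketch (not used by any test) of the connection-hook pattern the
// fixture below relies on: a ConnectionString::ConnectionHook intercepts every connection
// request and may hand back any DBClientBase implementation. Names here are illustrative only.
//
//   class RedirectingHook : public ConnectionString::ConnectionHook {
//   public:
//       virtual DBClientBase* connect(const ConnectionString& connStr,
//                                     std::string& errmsg,
//                                     double socketTimeout) {
//           return new CustomDirectClient(txn);  // must be heap-allocated; caller takes ownership
//       }
//   };
//   ConnectionString::setConnectionHook(new RedirectingHook());
//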
//
// Sets up a basic environment for loading chunks to/from the direct database connection.
// Redirects connections to the direct database for the duration of the test.
//
class ChunkManagerTest : public ConnectionString::ConnectionHook {
public:
    ChunkManagerTest() : _client(&_txn) {
        shardConnectionPool.clear();

        DBException::traceExceptions = true;

        // Make all connections redirect to the direct client
        ConnectionString::setConnectionHook(this);

        // Create the default config database before querying, necessary for direct connections
        _client.dropDatabase("config");
        _client.insert("config.test",
                       BSON("hello"
                            << "world"));
        _client.dropCollection("config.test");

        _client.dropDatabase(nsGetDB(collName()));
        _client.insert(collName(),
                       BSON("hello"
                            << "world"));
        _client.dropCollection(collName());

        // Since we've redirected the conns, the host doesn't matter here so long as it's
        // prefixed with a "$"
        _shard = Shard("shard0000", "$hostFooBar:27017", 0 /* maxSize */, false /* draining */);

        // Need to run this to ensure the shard is in the global lookup table
        Shard::installShard(_shard.getName(), _shard);

        // Create an index so that diffing works correctly, otherwise no cursors from S&O
        ASSERT_OK(dbtests::createIndex(&_txn,
                                       ChunkType::ConfigNS,
                                       BSON(ChunkType::ns() << 1 <<  // br
                                            ChunkType::DEPRECATED_lastmod() << 1)));

        configServer.init("$dummy:1000");
    }

    virtual ~ChunkManagerTest() {
        // Reset the redirection
        ConnectionString::setConnectionHook(NULL);
        ScopedDbConnection::clearPool();
        ShardConnection::clearPool();
    }

    string collName() {
        return "foo.bar";
    }

    Shard& shard() {
        return _shard;
    }

    virtual DBClientBase* connect(const ConnectionString& connStr,
                                  string& errmsg,
                                  double socketTimeout) {
        // Note - must be new, since it gets owned elsewhere
        return new CustomDirectClient(&_txn);
    }

protected:
    OperationContextImpl _txn;
    CustomDirectClient _client;
    Shard _shard;
};

//
// Tests creating a new chunk manager and creating the default chunks
//
class ChunkManagerCreateBasicTest : public ChunkManagerTest {
public:
    void run() {
        ShardKeyPattern shardKeyPattern(BSON("_id" << 1));
        ChunkManager manager(collName(), shardKeyPattern, false);
        manager.createFirstChunks(shard().getConnString(), shard(), NULL, NULL);

        BSONObj firstChunk = _client.findOne(ChunkType::ConfigNS, BSONObj()).getOwned();

        ASSERT(firstChunk[ChunkType::min()].Obj()["_id"].type() == MinKey);
        ASSERT(firstChunk[ChunkType::max()].Obj()["_id"].type() == MaxKey);

        ChunkVersion version = ChunkVersion::fromBSON(firstChunk, ChunkType::DEPRECATED_lastmod());

        ASSERT(version.majorVersion() == 1);
        ASSERT(version.minorVersion() == 0);
        ASSERT(version.isEpochSet());
    }
};
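//
// A note on chunk versions, as exercised above (a sketch of the semantics, not authoritative):
// a ChunkVersion is a (major, minor) pair plus an OID epoch identifying the incarnation of the
// collection. A newly sharded collection starts at version 1|0, and the chunks of an initial
// presplit all share major version 1 with distinct minor versions, e.g.:
//
//   ChunkVersion v(1, 0, OID::gen());  // 1|0 with a fresh epoch
//   v.incMinor();                      // -> 1|1
//   v.incMajor();                      // -> a new major version
//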
//
// Tests creating a new chunk manager with random split points. Creating chunks on multiple
// shards is not tested here since there are unresolved race conditions there, and it probably
// should be avoided if at all possible.
//
class ChunkManagerCreateFullTest : public ChunkManagerTest {
public:
    static const int numSplitPoints = 100;

    void genRandomSplitPoints(vector<int>* splitPoints) {
        for (int i = 0; i < numSplitPoints; i++) {
            splitPoints->push_back(rand(numSplitPoints * 10));
        }
    }

    void genRandomSplitKeys(const string& keyName, vector<BSONObj>* splitKeys) {
        vector<int> splitPoints;
        genRandomSplitPoints(&splitPoints);

        for (vector<int>::iterator it = splitPoints.begin(); it != splitPoints.end(); ++it) {
            splitKeys->push_back(BSON(keyName << *it));
        }
    }

    // Uses a chunk manager to create chunks
    void createChunks(const string& keyName) {
        vector<BSONObj> splitKeys;
        genRandomSplitKeys(keyName, &splitKeys);

        ShardKeyPattern shardKeyPattern(BSON(keyName << 1));
        ChunkManager manager(collName(), shardKeyPattern, false);

        manager.createFirstChunks(shard().getConnString(), shard(), &splitKeys, NULL);
    }

    void run() {
        string keyName = "_id";
        createChunks(keyName);

        auto_ptr<DBClientCursor> cursor =
            _client.query(ChunkType::ConfigNS, QUERY(ChunkType::ns(collName())));

        set<int> minorVersions;
        OID epoch;

        // Check that all chunks were created with version 1|x with consistent epoch and unique
        // minor versions
        while (cursor->more()) {
            BSONObj chunk = cursor->next();

            ChunkVersion version = ChunkVersion::fromBSON(chunk, ChunkType::DEPRECATED_lastmod());

            ASSERT(version.majorVersion() == 1);
            ASSERT(version.isEpochSet());

            if (!epoch.isSet())
                epoch = version.epoch();
            ASSERT(version.epoch() == epoch);

            ASSERT(minorVersions.find(version.minorVersion()) == minorVersions.end());
            minorVersions.insert(version.minorVersion());

            ASSERT(chunk[ChunkType::shard()].String() == shard().getName());
        }
    }
};
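//
// For orientation, the config.chunks documents the tests above read and write look roughly
// like the following (field names from ChunkType; values purely illustrative):
//
//   { "ns"      : "foo.bar",
//     "min"     : { "_id" : MinKey },
//     "max"     : { "_id" : 0 },
//     "shard"   : "shard0000",
//     "lastmod" : Timestamp 1|0,          // encodes the ChunkVersion major|minor
//     "lastmodEpoch" : ObjectId("...") }
//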
//
// Tests that chunks are loaded correctly from the db with no a priori info, and also that they
// can be reloaded on top of an old chunk manager with changes.
//
class ChunkManagerLoadBasicTest : public ChunkManagerCreateFullTest {
public:
    void run() {
        string keyName = "_id";
        createChunks(keyName);

        int numChunks =
            static_cast<int>(_client.count(ChunkType::ConfigNS, BSON(ChunkType::ns(collName()))));

        BSONObj firstChunk = _client.findOne(ChunkType::ConfigNS, BSONObj()).getOwned();

        ChunkVersion version = ChunkVersion::fromBSON(firstChunk, ChunkType::DEPRECATED_lastmod());

        // Make the manager load the existing chunks
        BSONObjBuilder collDocBuilder;
        collDocBuilder << CollectionType::ns(collName());
        collDocBuilder << CollectionType::keyPattern(BSON("_id" << 1));
        collDocBuilder << CollectionType::unique(false);
        collDocBuilder << CollectionType::dropped(false);
        collDocBuilder << CollectionType::DEPRECATED_lastmod(jsTime());
        collDocBuilder << CollectionType::DEPRECATED_lastmodEpoch(version.epoch());

        BSONObj collDoc(collDocBuilder.done());

        ChunkManager manager(collDoc);
        manager.loadExistingRanges(shard().getConnString(), NULL);

        ASSERT(manager.getVersion().epoch() == version.epoch());
        ASSERT(manager.getVersion().minorVersion() == (numChunks - 1));
        ASSERT(static_cast<int>(manager.getChunkMap().size()) == numChunks);

        // Modify the chunks collection
        BSONObjBuilder b;
        ChunkVersion laterVersion = ChunkVersion(2, 1, version.epoch());
        laterVersion.addToBSON(b, ChunkType::DEPRECATED_lastmod());

        _client.update(ChunkType::ConfigNS, BSONObj(), BSON("$set" << b.obj()));

        // Make a new manager load the chunk diff
        ChunkManager newManager(manager.getns(), manager.getShardKeyPattern(), manager.isUnique());
        newManager.loadExistingRanges(shard().getConnString(), &manager);

        ASSERT(newManager.getVersion().toLong() == laterVersion.toLong());
        ASSERT(newManager.getVersion().epoch() == laterVersion.epoch());
        ASSERT(static_cast<int>(newManager.getChunkMap().size()) == numChunks);
    }
};

class ChunkDiffUnitTest {
public:
    bool _inverse;

    typedef map<BSONObj, BSONObj, BSONObjCmp> RangeMap;
    typedef map<string, ChunkVersion> VersionMap;

    ChunkDiffUnitTest(bool inverse) : _inverse(inverse) {}

    // The default pass-through adapter for using config diffs
    class DefaultDiffAdapter : public ConfigDiffTracker<BSONObj, string> {
    public:
        DefaultDiffAdapter() {}
        virtual ~DefaultDiffAdapter() {}

        virtual bool isTracked(const BSONObj& chunkDoc) const {
            return true;
        }

        virtual BSONObj maxFrom(const BSONObj& max) const {
            return max;
        }

        virtual pair<BSONObj, BSONObj> rangeFor(const BSONObj& chunkDoc,
                                                const BSONObj& min,
                                                const BSONObj& max) const {
            return make_pair(min, max);
        }

        virtual string shardFor(const string& name) const {
            return name;
        }
    };

    // Inverts the storage order for chunks from min to max
    class InverseDiffAdapter : public DefaultDiffAdapter {
    public:
        InverseDiffAdapter() {}
        virtual ~InverseDiffAdapter() {}

        // Disable
        virtual BSONObj maxFrom(const BSONObj& max) const {
            ASSERT(false);
            return max;
        }

        virtual BSONObj minFrom(const BSONObj& min) const {
            return min;
        }

        virtual bool isMinKeyIndexed() const {
            return false;
        }

        virtual pair<BSONObj, BSONObj> rangeFor(const BSONObj& chunkDoc,
                                                const BSONObj& min,
                                                const BSONObj& max) const {
            return make_pair(max, min);
        }
    };
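    //
    // The typical wiring of a diff tracker, mirroring what run() below does (a summary, not
    // additional test code): attach the maps to track, then feed batches of chunk documents;
    // calculateConfigDiff() folds any document newer than the tracked maxVersion into the maps.
    //
    //   RangeMap ranges;
    //   ChunkVersion maxVersion(0, 0, OID());
    //   VersionMap maxShardVersions;
    //
    //   DefaultDiffAdapter differ;
    //   differ.attach("test", ranges, maxVersion, maxShardVersions);
    //   differ.calculateConfigDiff(chunksCursor);  // "chunksCursor" is an illustrative name
    //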
    // Allow validating with and without ranges (b/c our splits won't actually be updated by the
    // diffs)
    void validate(BSONArray chunks, ChunkVersion maxVersion, const VersionMap& maxShardVersions) {
        validate(chunks, NULL, maxVersion, maxShardVersions);
    }

    void validate(BSONArray chunks,
                  const RangeMap& ranges,
                  ChunkVersion maxVersion,
                  const VersionMap& maxShardVersions) {
        validate(chunks, (RangeMap*)&ranges, maxVersion, maxShardVersions);
    }

    // Validates that the ranges and versions are valid given the chunks
    void validate(const BSONArray& chunks,
                  RangeMap* ranges,
                  ChunkVersion maxVersion,
                  const VersionMap& maxShardVersions) {
        BSONObjIterator it(chunks);
        int chunkCount = 0;
        ChunkVersion foundMaxVersion;
        VersionMap foundMaxShardVersions;

        //
        // Validate that all the chunks are there and collect versions
        //

        while (it.more()) {
            BSONObj chunkDoc = it.next().Obj();
            chunkCount++;

            if (ranges != NULL) {
                // log() << "Validating chunk " << chunkDoc << " size : " << ranges->size()
                //       << " vs " << chunkCount << endl;

                RangeMap::iterator chunkRange =
                    ranges->find(_inverse ? chunkDoc["max"].Obj() : chunkDoc["min"].Obj());

                ASSERT(chunkRange != ranges->end());
                ASSERT(chunkRange->second.woCompare(_inverse ? chunkDoc["min"].Obj()
                                                             : chunkDoc["max"].Obj()) == 0);
            }

            ChunkVersion version =
                ChunkVersion::fromBSON(chunkDoc[ChunkType::DEPRECATED_lastmod()]);
            if (version > foundMaxVersion)
                foundMaxVersion = version;

            ChunkVersion shardMaxVersion =
                foundMaxShardVersions[chunkDoc[ChunkType::shard()].String()];
            if (version > shardMaxVersion) {
                foundMaxShardVersions[chunkDoc[ChunkType::shard()].String()] = version;
            }
        }

        // Make sure all chunks are accounted for
        if (ranges != NULL)
            ASSERT(chunkCount == (int)ranges->size());

        // log() << "Validating that all shard versions are up to date..." << endl;

        // Validate that all the versions are the same
        ASSERT(foundMaxVersion.equals(maxVersion));

        for (VersionMap::iterator it = foundMaxShardVersions.begin();
             it != foundMaxShardVersions.end();
             it++) {
            ChunkVersion foundVersion = it->second;
            VersionMap::const_iterator maxIt = maxShardVersions.find(it->first);

            ASSERT(maxIt != maxShardVersions.end());
            ASSERT(foundVersion.equals(maxIt->second));
        }

        // Make sure all shards are accounted for
        ASSERT(foundMaxShardVersions.size() == maxShardVersions.size());
    }
    void run() {
        int numShards = 10;
        int numInitialChunks = 5;
        int maxChunks = 100000;  // Needed to not overflow the BSONArray's max bytes
        int keySize = 2;

        BSONArrayBuilder chunksB;

        BSONObj lastSplitPt;
        ChunkVersion version(1, 0, OID());

        //
        // Generate numChunks with a given key size over numShards.
        // All chunks have double key values, so we can split them a bunch.
        //

        for (int i = -1; i < numInitialChunks; i++) {
            BSONObjBuilder splitPtB;
            for (int k = 0; k < keySize; k++) {
                string field = string("k") + string(1, (char)('0' + k));
                if (i < 0)
                    splitPtB.appendMinKey(field);
                else if (i < numInitialChunks - 1)
                    splitPtB.append(field, (double)i);
                else
                    splitPtB.appendMaxKey(field);
            }
            BSONObj splitPt = splitPtB.obj();

            if (i >= 0) {
                BSONObjBuilder chunkB;

                chunkB.append(ChunkType::min(), lastSplitPt);
                chunkB.append(ChunkType::max(), splitPt);

                int shardNum = rand(numShards);
                chunkB.append(ChunkType::shard(), "shard" + string(1, (char)('A' + shardNum)));

                rand(2) ? version.incMajor() : version.incMinor();
                version.addToBSON(chunkB, ChunkType::DEPRECATED_lastmod());

                chunksB.append(chunkB.obj());
            }

            lastSplitPt = splitPt;
        }

        BSONArray chunks = chunksB.arr();

        // log() << "Chunks generated : " << chunks << endl;

        DBClientMockCursor chunksCursor(chunks);

        // Set up the empty ranges and versions first
        RangeMap ranges;
        ChunkVersion maxVersion = ChunkVersion(0, 0, OID());
        VersionMap maxShardVersions;

        // Create a differ which will track our progress
        boost::shared_ptr<DefaultDiffAdapter> differ(_inverse ? new InverseDiffAdapter()
                                                              : new DefaultDiffAdapter());
        differ->attach("test", ranges, maxVersion, maxShardVersions);

        // Validate the initial load
        differ->calculateConfigDiff(chunksCursor);
        validate(chunks, ranges, maxVersion, maxShardVersions);

        // Generate a lot of diffs, and keep validating that updating from the diffs always
        // gives us the right ranges and versions
        int numDiffs = 135;  // Makes about 100000 chunks overall
        int numChunks = numInitialChunks;

        for (int i = 0; i < numDiffs; i++) {
            // log() << "Generating new diff... " << i << endl;

            BSONArrayBuilder diffsB;
            BSONArrayBuilder newChunksB;
            BSONObjIterator chunksIt(chunks);

            while (chunksIt.more()) {
                BSONObj chunk = chunksIt.next().Obj();

                int randChoice = rand(10);

                if (randChoice < 2 && numChunks < maxChunks) {
                    // Simulate a split

                    // log() << " ...starting a split with chunk " << chunk << endl;

                    BSONObjBuilder leftB;
                    BSONObjBuilder rightB;
                    BSONObjBuilder midB;

                    for (int k = 0; k < keySize; k++) {
                        string field = string("k") + string(1, (char)('0' + k));

                        BSONType maxType = chunk[ChunkType::max()].Obj()[field].type();
                        double max =
                            maxType == NumberDouble ? chunk["max"].Obj()[field].Number() : 0.0;
                        BSONType minType = chunk[ChunkType::min()].Obj()[field].type();
                        double min = minType == NumberDouble
                            ? chunk[ChunkType::min()].Obj()[field].Number()
                            : 0.0;

                        if (minType == MinKey) {
                            midB.append(field, max - 1.0);
                        } else if (maxType == MaxKey) {
                            midB.append(field, min + 1.0);
                        } else {
                            midB.append(field, (max + min) / 2.0);
                        }
                    }

                    BSONObj midPt = midB.obj();

                    // Only happens if we can't split the min chunk
                    if (midPt.isEmpty())
                        continue;

                    leftB.append(chunk[ChunkType::min()]);
                    leftB.append(ChunkType::max(), midPt);
                    rightB.append(ChunkType::min(), midPt);
                    rightB.append(chunk[ChunkType::max()]);

                    leftB.append(chunk[ChunkType::shard()]);
                    rightB.append(chunk[ChunkType::shard()]);

                    version.incMajor();
                    version._minor = 0;
                    version.addToBSON(leftB, ChunkType::DEPRECATED_lastmod());
                    version.incMinor();
                    version.addToBSON(rightB, ChunkType::DEPRECATED_lastmod());

                    BSONObj left = leftB.obj();
                    BSONObj right = rightB.obj();

                    // log() << " ... split into " << left << " and " << right << endl;

                    newChunksB.append(left);
                    newChunksB.append(right);

                    diffsB.append(right);
                    diffsB.append(left);

                    numChunks++;
                } else if (randChoice < 4 && chunksIt.more()) {
                    // Simulate a migrate

                    // log() << " ...starting a migrate with chunk " << chunk << endl;

                    BSONObj prevShardChunk;
                    while (chunksIt.more()) {
                        prevShardChunk = chunksIt.next().Obj();
                        if (prevShardChunk[ChunkType::shard()].String() ==
                            chunk[ChunkType::shard()].String())
                            break;

                        // log() << "... appending chunk from diff shard: " << prevShardChunk
                        //       << endl;
                        newChunksB.append(prevShardChunk);

                        prevShardChunk = BSONObj();
                    }

                    // We need to move between different shards, hence the weirdness in logic here
                    if (!prevShardChunk.isEmpty()) {
                        BSONObjBuilder newShardB;
                        BSONObjBuilder prevShardB;

                        newShardB.append(chunk[ChunkType::min()]);
                        newShardB.append(chunk[ChunkType::max()]);
                        prevShardB.append(prevShardChunk[ChunkType::min()]);
                        prevShardB.append(prevShardChunk[ChunkType::max()]);

                        int shardNum = rand(numShards);
                        newShardB.append(ChunkType::shard(),
                                         "shard" + string(1, (char)('A' + shardNum)));
                        prevShardB.append(prevShardChunk[ChunkType::shard()]);

                        version.incMajor();
                        version._minor = 0;
                        version.addToBSON(newShardB, ChunkType::DEPRECATED_lastmod());
                        version.incMinor();
                        version.addToBSON(prevShardB, ChunkType::DEPRECATED_lastmod());

                        BSONObj newShard = newShardB.obj();
                        BSONObj prevShard = prevShardB.obj();

                        // log() << " ... migrated to " << newShard << " and updated "
                        //       << prevShard << endl;

                        newChunksB.append(newShard);
                        newChunksB.append(prevShard);

                        diffsB.append(newShard);
                        diffsB.append(prevShard);
                    } else {
                        // log() << "... appending chunk, no more left: " << chunk << endl;
                        newChunksB.append(chunk);
                    }
                } else {
                    // log() << "Appending chunk : " << chunk << endl;
                    newChunksB.append(chunk);
                }
            }
            BSONArray diffs = diffsB.arr();
            chunks = newChunksB.arr();

            // log() << "Diffs generated : " << diffs << endl;
            // log() << "All chunks : " << chunks << endl;

            // Rarely entirely clear out our data
            if (rand(10) < 1) {
                diffs = chunks;

                ranges.clear();
                maxVersion = ChunkVersion(0, 0, OID());
                maxShardVersions.clear();
            }

            // log() << "Total number of chunks : " << numChunks << " iteration " << i << endl;

            DBClientMockCursor diffCursor(diffs);

            differ->calculateConfigDiff(diffCursor);

            validate(chunks, ranges, maxVersion, maxShardVersions);
        }
    }
};

class ChunkDiffUnitTestNormal : public ChunkDiffUnitTest {
public:
    ChunkDiffUnitTestNormal() : ChunkDiffUnitTest(false) {}
};

class ChunkDiffUnitTestInverse : public ChunkDiffUnitTest {
public:
    ChunkDiffUnitTestInverse() : ChunkDiffUnitTest(true) {}
};

class All : public Suite {
public:
    All() : Suite("sharding") {}

    void setupTests() {
        add<serverandquerytests::test1>();
        add<ChunkManagerCreateBasicTest>();
        add<ChunkManagerCreateFullTest>();
        add<ChunkManagerLoadBasicTest>();
        add<ChunkDiffUnitTestNormal>();
        add<ChunkDiffUnitTestInverse>();
    }
};

SuiteInstance<All> myall;

}  // namespace ShardingTests