/**
* Copyright (C) 2009 10gen Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects
* for all of the code used other than as permitted herein. If you modify
* file(s) with this exception, you may extend this exception to your
* version of the file(s), but you are not obligated to do so. If you do not
* wish to do so, delete this exception statement from your version. If you
* delete this exception statement from all source files in the program,
* then also delete it in the license file.
*/
#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kDefault
#include "mongo/platform/basic.h"
#include <boost/shared_ptr.hpp>
#include "mongo/client/dbclientmockcursor.h"
#include "mongo/client/parallel.h"
#include "mongo/db/dbdirectclient.h"
#include "mongo/db/operation_context_impl.h"
#include "mongo/dbtests/config_server_fixture.h"
#include "mongo/dbtests/dbtests.h"
#include "mongo/s/chunk_diff.h"
#include "mongo/s/chunk_version.h"
#include "mongo/s/config.h"
#include "mongo/s/type_chunk.h"
#include "mongo/s/type_collection.h"
#include "mongo/util/log.h"
namespace ShardingTests {
using boost::shared_ptr;
using std::auto_ptr;
using std::make_pair;
using std::map;
using std::pair;
using std::set;
using std::string;
using std::vector;
namespace serverandquerytests {
class test1 {
public:
void run() {
ServerAndQuery a("foo:1", BSON("a" << GT << 0 << LTE << 100));
ServerAndQuery b("foo:1", BSON("a" << GT << 200 << LTE << 1000));
ASSERT(a < b);
ASSERT(!(b < a));
set<ServerAndQuery> s;
s.insert(a);
s.insert(b);
ASSERT_EQUALS((unsigned int)2, s.size());
}
};
}
static int rand(int max = -1) {
static unsigned seed = 1337;
#if !defined(_WIN32)
int r = rand_r(&seed);
#else
int r = ::rand(); // seed not used in this case
#endif
// Modding is bad, but don't really care in this case
return max > 0 ? r % max : r;
}
//
// Sets up a basic environment for loading chunks to/from the direct database connection
// Redirects connections to the direct database for the duration of the test.
//
class ChunkManagerTest : public ConnectionString::ConnectionHook {
public:
ChunkManagerTest() : _client(&_txn) {
shardConnectionPool.clear();
DBException::traceExceptions = true;
// Make all connections redirect to the direct client
ConnectionString::setConnectionHook(this);
// Create the default config database before querying, necessary for direct connections
_client.dropDatabase("config");
_client.insert("config.test",
BSON("hello"
<< "world"));
_client.dropCollection("config.test");
_client.dropDatabase(nsGetDB(collName()));
_client.insert(collName(),
BSON("hello"
<< "world"));
_client.dropCollection(collName());
// Since we've redirected the conns, the host doesn't matter here so long as it's
// prefixed with a "$"
_shard = Shard("shard0000", "$hostFooBar:27017", 0 /* maxSize */, false /* draining */);
// Need to run this to ensure the shard is in the global lookup table
Shard::installShard(_shard.getName(), _shard);
// Create an index so that diffing works correctly, otherwise no cursors from S&O
ASSERT_OK(dbtests::createIndex(&_txn,
ChunkType::ConfigNS,
BSON(ChunkType::ns() << 1 << // br
ChunkType::DEPRECATED_lastmod() << 1)));
configServer.init("$dummy:1000");
}
virtual ~ChunkManagerTest() {
// Reset the redirection
ConnectionString::setConnectionHook(NULL);
ScopedDbConnection::clearPool();
ShardConnection::clearPool();
}
string collName() {
return "foo.bar";
}
Shard& shard() {
return _shard;
}
virtual DBClientBase* connect(const ConnectionString& connStr,
string& errmsg,
double socketTimeout) {
// Note - must be new, since it gets owned elsewhere
return new CustomDirectClient(&_txn);
}
protected:
OperationContextImpl _txn;
CustomDirectClient _client;
Shard _shard;
};
//
// Tests creating a new chunk manager and creating the default chunks
//
class ChunkManagerCreateBasicTest : public ChunkManagerTest {
public:
void run() {
ShardKeyPattern shardKeyPattern(BSON("_id" << 1));
ChunkManager manager(collName(), shardKeyPattern, false);
manager.createFirstChunks(shard().getConnString(), shard(), NULL, NULL);
BSONObj firstChunk = _client.findOne(ChunkType::ConfigNS, BSONObj()).getOwned();
ASSERT(firstChunk[ChunkType::min()].Obj()["_id"].type() == MinKey);
ASSERT(firstChunk[ChunkType::max()].Obj()["_id"].type() == MaxKey);
ChunkVersion version = ChunkVersion::fromBSON(firstChunk, ChunkType::DEPRECATED_lastmod());
ASSERT(version.majorVersion() == 1);
ASSERT(version.minorVersion() == 0);
ASSERT(version.isEpochSet());
}
};
//
// Tests creating a new chunk manager with random split points. Creating chunks on multiple shards
// is not tested here since there are unresolved race conditions there, and it probably should
// be avoided if at all possible.
//
class ChunkManagerCreateFullTest : public ChunkManagerTest {
public:
static const int numSplitPoints = 100;
void genRandomSplitPoints(vector<int>* splitPoints) {
for (int i = 0; i < numSplitPoints; i++) {
splitPoints->push_back(rand(numSplitPoints * 10));
}
}
void genRandomSplitKeys(const string& keyName, vector* splitKeys) {
vector splitPoints;
genRandomSplitPoints(&splitPoints);
for (vector<int>::iterator it = splitPoints.begin(); it != splitPoints.end(); ++it) {
splitKeys->push_back(BSON(keyName << *it));
}
}
// Uses a chunk manager to create chunks
void createChunks(const string& keyName) {
vector<BSONObj> splitKeys;
genRandomSplitKeys(keyName, &splitKeys);
ShardKeyPattern shardKeyPattern(BSON(keyName << 1));
ChunkManager manager(collName(), shardKeyPattern, false);
manager.createFirstChunks(shard().getConnString(), shard(), &splitKeys, NULL);
}
void run() {
string keyName = "_id";
createChunks(keyName);
auto_ptr<DBClientCursor> cursor =
_client.query(ChunkType::ConfigNS, QUERY(ChunkType::ns(collName())));
set<int> minorVersions;
OID epoch;
// Check that all chunks were created with version 1|x with consistent epoch and unique
// minor versions
while (cursor->more()) {
BSONObj chunk = cursor->next();
ChunkVersion version = ChunkVersion::fromBSON(chunk, ChunkType::DEPRECATED_lastmod());
ASSERT(version.majorVersion() == 1);
ASSERT(version.isEpochSet());
if (!epoch.isSet())
epoch = version.epoch();
ASSERT(version.epoch() == epoch);
ASSERT(minorVersions.find(version.minorVersion()) == minorVersions.end());
minorVersions.insert(version.minorVersion());
ASSERT(chunk[ChunkType::shard()].String() == shard().getName());
}
}
};
//
// Tests that chunks are loaded correctly from the db with no a priori info and also that they can
// be reloaded on top of an old chunk manager with changes.
//
class ChunkManagerLoadBasicTest : public ChunkManagerCreateFullTest {
public:
void run() {
string keyName = "_id";
createChunks(keyName);
int numChunks =
static_cast<int>(_client.count(ChunkType::ConfigNS, BSON(ChunkType::ns(collName()))));
BSONObj firstChunk = _client.findOne(ChunkType::ConfigNS, BSONObj()).getOwned();
ChunkVersion version = ChunkVersion::fromBSON(firstChunk, ChunkType::DEPRECATED_lastmod());
// Make manager load existing chunks
BSONObjBuilder collDocBuilder;
collDocBuilder << CollectionType::ns(collName());
collDocBuilder << CollectionType::keyPattern(BSON("_id" << 1));
collDocBuilder << CollectionType::unique(false);
collDocBuilder << CollectionType::dropped(false);
collDocBuilder << CollectionType::DEPRECATED_lastmod(jsTime());
collDocBuilder << CollectionType::DEPRECATED_lastmodEpoch(version.epoch());
BSONObj collDoc(collDocBuilder.done());
ChunkManager manager(collDoc);
manager.loadExistingRanges(shard().getConnString(), NULL);
ASSERT(manager.getVersion().epoch() == version.epoch());
ASSERT(manager.getVersion().minorVersion() == (numChunks - 1));
ASSERT(static_cast<int>(manager.getChunkMap().size()) == numChunks);
// Modify chunks collection
BSONObjBuilder b;
ChunkVersion laterVersion = ChunkVersion(2, 1, version.epoch());
laterVersion.addToBSON(b, ChunkType::DEPRECATED_lastmod());
_client.update(ChunkType::ConfigNS, BSONObj(), BSON("$set" << b.obj()));
// Make new manager load chunk diff
ChunkManager newManager(manager.getns(), manager.getShardKeyPattern(), manager.isUnique());
newManager.loadExistingRanges(shard().getConnString(), &manager);
ASSERT(newManager.getVersion().toLong() == laterVersion.toLong());
ASSERT(newManager.getVersion().epoch() == laterVersion.epoch());
ASSERT(static_cast<int>(newManager.getChunkMap().size()) == numChunks);
}
};
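//
// Exercises ConfigDiffTracker against a simulated config.chunks collection: chunks are generated,
// then repeatedly split and migrated at random, and after each round the resulting diffs are fed
// through calculateConfigDiff() and the tracked ranges and versions are validated. When run with
// inverse == true the tracked ranges are keyed by chunk max instead of chunk min.
//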
class ChunkDiffUnitTest {
public:
bool _inverse;
typedef map<BSONObj, BSONObj, BSONObjCmp> RangeMap;
typedef map<string, ChunkVersion> VersionMap;
ChunkDiffUnitTest(bool inverse) : _inverse(inverse) {}
// The default pass-through adapter for using config diffs
class DefaultDiffAdapter : public ConfigDiffTracker<BSONObj, string> {
public:
DefaultDiffAdapter() {}
virtual ~DefaultDiffAdapter() {}
virtual bool isTracked(const BSONObj& chunkDoc) const {
return true;
}
virtual BSONObj maxFrom(const BSONObj& max) const {
return max;
}
virtual pair rangeFor(const BSONObj& chunkDoc,
const BSONObj& min,
const BSONObj& max) const {
return make_pair(min, max);
}
virtual string shardFor(const string& name) const {
return name;
}
};
// Inverts the storage order for chunks from min to max
class InverseDiffAdapter : public DefaultDiffAdapter {
public:
InverseDiffAdapter() {}
virtual ~InverseDiffAdapter() {}
// Disable
virtual BSONObj maxFrom(const BSONObj& max) const {
ASSERT(false);
return max;
}
virtual BSONObj minFrom(const BSONObj& min) const {
return min;
}
virtual bool isMinKeyIndexed() const {
return false;
}
virtual pair rangeFor(const BSONObj& chunkDoc,
const BSONObj& min,
const BSONObj& max) const {
return make_pair(max, min);
}
};
// Allow validating with and without ranges (b/c our splits won't actually be updated by the
// diffs)
void validate(BSONArray chunks, ChunkVersion maxVersion, const VersionMap& maxShardVersions) {
validate(chunks, NULL, maxVersion, maxShardVersions);
}
void validate(BSONArray chunks,
const RangeMap& ranges,
ChunkVersion maxVersion,
const VersionMap& maxShardVersions) {
validate(chunks, (RangeMap*)&ranges, maxVersion, maxShardVersions);
}
// Validates that the ranges and versions are valid given the chunks
void validate(const BSONArray& chunks,
RangeMap* ranges,
ChunkVersion maxVersion,
const VersionMap& maxShardVersions) {
BSONObjIterator it(chunks);
int chunkCount = 0;
ChunkVersion foundMaxVersion;
VersionMap foundMaxShardVersions;
//
// Validate that all the chunks are there and collect versions
//
while (it.more()) {
BSONObj chunkDoc = it.next().Obj();
chunkCount++;
if (ranges != NULL) {
// log() << "Validating chunk " << chunkDoc << " size : " << ranges->size() << " vs
// " << chunkCount << endl;
RangeMap::iterator chunkRange =
ranges->find(_inverse ? chunkDoc["max"].Obj() : chunkDoc["min"].Obj());
ASSERT(chunkRange != ranges->end());
ASSERT(chunkRange->second.woCompare(_inverse ? chunkDoc["min"].Obj()
: chunkDoc["max"].Obj()) == 0);
}
ChunkVersion version =
ChunkVersion::fromBSON(chunkDoc[ChunkType::DEPRECATED_lastmod()]);
if (version > foundMaxVersion)
foundMaxVersion = version;
ChunkVersion shardMaxVersion =
foundMaxShardVersions[chunkDoc[ChunkType::shard()].String()];
if (version > shardMaxVersion) {
foundMaxShardVersions[chunkDoc[ChunkType::shard()].String()] = version;
}
}
// Make sure all chunks are accounted for
if (ranges != NULL)
ASSERT(chunkCount == (int)ranges->size());
// log() << "Validating that all shard versions are up to date..." << endl;
// Validate that all the versions are the same
ASSERT(foundMaxVersion.equals(maxVersion));
for (VersionMap::iterator it = foundMaxShardVersions.begin();
it != foundMaxShardVersions.end();
it++) {
ChunkVersion foundVersion = it->second;
VersionMap::const_iterator maxIt = maxShardVersions.find(it->first);
ASSERT(maxIt != maxShardVersions.end());
ASSERT(foundVersion.equals(maxIt->second));
}
// Make sure all shards are accounted for
ASSERT(foundMaxShardVersions.size() == maxShardVersions.size());
}
void run() {
int numShards = 10;
int numInitialChunks = 5;
int maxChunks = 100000; // Needed to not overflow the BSONArray's max bytes
int keySize = 2;
BSONArrayBuilder chunksB;
BSONObj lastSplitPt;
ChunkVersion version(1, 0, OID());
//
// Generate numChunks with a given key size over numShards
// All chunks have double key values, so we can split them a bunch
//
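// The loop starts at -1 so the first chunk's min is MinKey and the last chunk's max is MaxKey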
for (int i = -1; i < numInitialChunks; i++) {
BSONObjBuilder splitPtB;
for (int k = 0; k < keySize; k++) {
string field = string("k") + string(1, (char)('0' + k));
if (i < 0)
splitPtB.appendMinKey(field);
else if (i < numInitialChunks - 1)
splitPtB.append(field, (double)i);
else
splitPtB.appendMaxKey(field);
}
BSONObj splitPt = splitPtB.obj();
if (i >= 0) {
BSONObjBuilder chunkB;
chunkB.append(ChunkType::min(), lastSplitPt);
chunkB.append(ChunkType::max(), splitPt);
int shardNum = rand(numShards);
chunkB.append(ChunkType::shard(), "shard" + string(1, (char)('A' + shardNum)));
rand(2) ? version.incMajor() : version.incMinor();
version.addToBSON(chunkB, ChunkType::DEPRECATED_lastmod());
chunksB.append(chunkB.obj());
}
lastSplitPt = splitPt;
}
BSONArray chunks = chunksB.arr();
// log() << "Chunks generated : " << chunks << endl;
DBClientMockCursor chunksCursor(chunks);
// Setup the empty ranges and versions first
RangeMap ranges;
ChunkVersion maxVersion = ChunkVersion(0, 0, OID());
VersionMap maxShardVersions;
// Create a differ which will track our progress
boost::shared_ptr<DefaultDiffAdapter> differ(_inverse ? new InverseDiffAdapter()
: new DefaultDiffAdapter());
differ->attach("test", ranges, maxVersion, maxShardVersions);
// Validate initial load
differ->calculateConfigDiff(chunksCursor);
validate(chunks, ranges, maxVersion, maxShardVersions);
// Generate a lot of diffs, and keep validating that updating from the diffs always
// gives us the right ranges and versions
int numDiffs = 135; // Makes about 100000 chunks overall
int numChunks = numInitialChunks;
for (int i = 0; i < numDiffs; i++) {
// log() << "Generating new diff... " << i << endl;
BSONArrayBuilder diffsB;
BSONArrayBuilder newChunksB;
BSONObjIterator chunksIt(chunks);
while (chunksIt.more()) {
BSONObj chunk = chunksIt.next().Obj();
int randChoice = rand(10);
if (randChoice < 2 && numChunks < maxChunks) {
// Simulate a split
// log() << " ...starting a split with chunk " << chunk << endl;
BSONObjBuilder leftB;
BSONObjBuilder rightB;
BSONObjBuilder midB;
for (int k = 0; k < keySize; k++) {
string field = string("k") + string(1, (char)('0' + k));
BSONType maxType = chunk[ChunkType::max()].Obj()[field].type();
double max =
maxType == NumberDouble ? chunk["max"].Obj()[field].Number() : 0.0;
BSONType minType = chunk[ChunkType::min()].Obj()[field].type();
double min = minType == NumberDouble
? chunk[ChunkType::min()].Obj()[field].Number()
: 0.0;
if (minType == MinKey) {
midB.append(field, max - 1.0);
} else if (maxType == MaxKey) {
midB.append(field, min + 1.0);
} else {
midB.append(field, (max + min) / 2.0);
}
}
BSONObj midPt = midB.obj();
// Only happens if we can't split the min chunk
if (midPt.isEmpty())
continue;
leftB.append(chunk[ChunkType::min()]);
leftB.append(ChunkType::max(), midPt);
rightB.append(ChunkType::min(), midPt);
rightB.append(chunk[ChunkType::max()]);
leftB.append(chunk[ChunkType::shard()]);
rightB.append(chunk[ChunkType::shard()]);
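// A split bumps the major version; the two resulting chunks get minor versions 0 and 1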
version.incMajor();
version._minor = 0;
version.addToBSON(leftB, ChunkType::DEPRECATED_lastmod());
version.incMinor();
version.addToBSON(rightB, ChunkType::DEPRECATED_lastmod());
BSONObj left = leftB.obj();
BSONObj right = rightB.obj();
// log() << " ... split into " << left << " and " << right << endl;
newChunksB.append(left);
newChunksB.append(right);
diffsB.append(right);
diffsB.append(left);
numChunks++;
} else if (randChoice < 4 && chunksIt.more()) {
// Simulate a migrate
// log() << " ...starting a migrate with chunk " << chunk << endl;
BSONObj prevShardChunk;
while (chunksIt.more()) {
prevShardChunk = chunksIt.next().Obj();
if (prevShardChunk[ChunkType::shard()].String() ==
chunk[ChunkType::shard()].String())
break;
// log() << "... appending chunk from diff shard: " << prevShardChunk <<
// endl;
newChunksB.append(prevShardChunk);
prevShardChunk = BSONObj();
}
// We need to move between different shards, hence the weirdness in logic here
if (!prevShardChunk.isEmpty()) {
BSONObjBuilder newShardB;
BSONObjBuilder prevShardB;
newShardB.append(chunk[ChunkType::min()]);
newShardB.append(chunk[ChunkType::max()]);
prevShardB.append(prevShardChunk[ChunkType::min()]);
prevShardB.append(prevShardChunk[ChunkType::max()]);
int shardNum = rand(numShards);
newShardB.append(ChunkType::shard(),
"shard" + string(1, (char)('A' + shardNum)));
prevShardB.append(prevShardChunk[ChunkType::shard()]);
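// A migrate bumps the major version; the moved chunk gets minor 0 on its new shard and the
// remaining chunk on the donor shard gets minor 1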
version.incMajor();
version._minor = 0;
version.addToBSON(newShardB, ChunkType::DEPRECATED_lastmod());
version.incMinor();
version.addToBSON(prevShardB, ChunkType::DEPRECATED_lastmod());
BSONObj newShard = newShardB.obj();
BSONObj prevShard = prevShardB.obj();
// log() << " ... migrated to " << newShard << " and updated " << prevShard
// << endl;
newChunksB.append(newShard);
newChunksB.append(prevShard);
diffsB.append(newShard);
diffsB.append(prevShard);
} else {
// log() << "... appending chunk, no more left: " << chunk << endl;
newChunksB.append(chunk);
}
} else {
// log() << "Appending chunk : " << chunk << endl;
newChunksB.append(chunk);
}
}
BSONArray diffs = diffsB.arr();
chunks = newChunksB.arr();
// log() << "Diffs generated : " << diffs << endl;
// log() << "All chunks : " << chunks << endl;
// Rarely entirely clear out our data
if (rand(10) < 1) {
diffs = chunks;
ranges.clear();
maxVersion = ChunkVersion(0, 0, OID());
maxShardVersions.clear();
}
// log() << "Total number of chunks : " << numChunks << " iteration " << i << endl;
DBClientMockCursor diffCursor(diffs);
differ->calculateConfigDiff(diffCursor);
validate(chunks, ranges, maxVersion, maxShardVersions);
}
}
};
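// Concrete variants: "Normal" tracks ranges keyed by chunk min, "Inverse" keys them by chunk max
// via the InverseDiffAdapter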
class ChunkDiffUnitTestNormal : public ChunkDiffUnitTest {
public:
ChunkDiffUnitTestNormal() : ChunkDiffUnitTest(false) {}
};
class ChunkDiffUnitTestInverse : public ChunkDiffUnitTest {
public:
ChunkDiffUnitTestInverse() : ChunkDiffUnitTest(true) {}
};
class All : public Suite {
public:
All() : Suite("sharding") {}
void setupTests() {
add<serverandquerytests::test1>();
add<ChunkManagerCreateBasicTest>();
add<ChunkManagerCreateFullTest>();
add<ChunkManagerLoadBasicTest>();
add<ChunkDiffUnitTestNormal>();
add<ChunkDiffUnitTestInverse>();
}
};
SuiteInstance<All> myall;
}