/**
* Copyright (C) 2009 10gen Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects
* for all of the code used other than as permitted herein. If you modify
* file(s) with this exception, you may extend this exception to your
* version of the file(s), but you are not obligated to do so. If you do not
* wish to do so, delete this exception statement from your version. If you
* delete this exception statement from all source files in the program,
* then also delete it in the license file.
*/
#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kDefault
#include "mongo/platform/basic.h"
#include "mongo/client/parallel.h"
#include "mongo/db/dbdirectclient.h"
#include "mongo/db/operation_context_impl.h"
#include "mongo/dbtests/config_server_fixture.h"
#include "mongo/dbtests/dbtests.h"
#include "mongo/s/catalog/type_chunk.h"
#include "mongo/s/catalog/type_collection.h"
#include "mongo/s/catalog/type_shard.h"
#include "mongo/s/chunk_diff.h"
#include "mongo/s/chunk_manager.h"
#include "mongo/s/chunk_version.h"
#include "mongo/s/config.h"
namespace ShardingTests {
using std::shared_ptr;
using std::unique_ptr;
using std::make_pair;
using std::map;
using std::pair;
using std::set;
using std::string;
using std::vector;
// Pseudo-random helper for the tests below. With a positive 'max' the result
// is reduced into [0, max); otherwise the raw generator value is returned.
// On POSIX the fixed rand_r() seed makes test runs reproducible; on Windows
// the global ::rand() stream is used instead and the seed is ignored.
static int rand( int max = -1 ){
    static unsigned seed = 1337;

#if !defined(_WIN32)
    const int value = rand_r( &seed );
#else
    const int value = ::rand();  // seed not used in this case
#endif

    // Modding is bad, but don't really care in this case
    if( max <= 0 ) {
        return value;
    }
    return value % max;
}
//
// Converts array of raw BSONObj chunks to a vector of ChunkType
//
void convertBSONArrayToChunkTypes(const BSONArray& chunksArray,
std::vector* chunksVector) {
for (const BSONElement& obj : chunksArray) {
auto chunkTypeRes = ChunkType::fromBSON(obj.Obj());
ASSERT(chunkTypeRes.isOK());
chunksVector->push_back(chunkTypeRes.getValue());
}
}
//
// Sets up a basic environment for loading chunks to/from the direct database connection
// Redirects connections to the direct database for the duration of the test.
//
class ChunkManagerTest : public ConnectionString::ConnectionHook {
public:
// Wipes the config and test collections, installs a dummy shard, and
// redirects all outgoing shard connections to the local direct client.
ChunkManagerTest() : _client(&_txn) {
shardConnectionPool.clear();
DBException::traceExceptions = true;
// Make all connections redirect to the direct client
ConnectionString::setConnectionHook( this );
// Create the default config database before querying, necessary for direct connections
_client.dropDatabase( "config" );
_client.insert( "config.test", BSON( "hello" << "world" ) );
_client.dropCollection( "config.test" );
// Same insert-then-drop trick for the test db: the database ends up
// existing while the collection itself starts empty.
_client.dropDatabase( nsGetDB( collName() ) );
_client.insert( collName(), BSON( "hello" << "world" ) );
_client.dropCollection( collName() );
_shardId = "shard0000";
// Since we've redirected the conns, the host doesn't matter here so long as it's
// prefixed with a "$"
Shard shard(_shardId, ConnectionString(HostAndPort("$hostFooBar:27017")));
// Need to run this to ensure the shard is in the global lookup table
Shard::installShard(_shardId, shard);
// Add dummy shard to config DB
_client.insert(ShardType::ConfigNS,
BSON(ShardType::name() << _shardId <<
ShardType::host() << shard.getConnString().toString()));
// Create an index so that diffing works correctly, otherwise no cursors from S&O
ASSERT_OK(dbtests::createIndex(
&_txn,
ChunkType::ConfigNS,
BSON( ChunkType::ns() << 1 << // br
ChunkType::DEPRECATED_lastmod() << 1 ) ));
}
virtual ~ChunkManagerTest() {
// Reset the redirection
ConnectionString::setConnectionHook( NULL );
}
// Namespace shared by all chunk manager tests below.
string collName(){ return "foo.bar"; }
// ConnectionHook callback: every connection request is answered with a
// fresh direct client bound to this test's operation context.
virtual DBClientBase* connect( const ConnectionString& connStr,
string& errmsg,
double socketTimeout )
{
// Note - must be new, since it gets owned elsewhere
return new CustomDirectClient(&_txn);
}
protected:
OperationContextImpl _txn;
CustomDirectClient _client;
ShardId _shardId;
};
//
// Tests creating a new chunk manager and creating the default chunks
//
class ChunkManagerCreateBasicTest : public ChunkManagerTest {
public:
void run(){
ShardKeyPattern shardKeyPattern(BSON("_id" << 1));
ChunkManager manager(collName(), shardKeyPattern, false);
manager.createFirstChunks(_shardId, NULL, NULL );
BSONObj firstChunk = _client.findOne(ChunkType::ConfigNS, BSONObj()).getOwned();
ASSERT(firstChunk[ChunkType::min()].Obj()[ "_id" ].type() == MinKey );
ASSERT(firstChunk[ChunkType::max()].Obj()[ "_id" ].type() == MaxKey );
ChunkVersion version = ChunkVersion::fromBSON(firstChunk,
ChunkType::DEPRECATED_lastmod());
ASSERT( version.majorVersion() == 1 );
ASSERT( version.minorVersion() == 0 );
ASSERT( version.isEpochSet() );
}
};
//
// Tests creating a new chunk manager with random split points. Creating chunks on multiple shards is not
// tested here since there are unresolved race conditions there and probably should be avoided if at all
// possible.
//
class ChunkManagerCreateFullTest : public ChunkManagerTest {
public:
static const int numSplitPoints = 100;
void genRandomSplitPoints( vector* splitPoints ){
for( int i = 0; i < numSplitPoints; i++ ){
splitPoints->push_back( rand( numSplitPoints * 10 ) );
}
}
void genRandomSplitKeys( const string& keyName, vector* splitKeys ){
vector splitPoints;
genRandomSplitPoints( &splitPoints );
for( vector::iterator it = splitPoints.begin(); it != splitPoints.end(); ++it ){
splitKeys->push_back( BSON( keyName << *it ) );
}
}
// Uses a chunk manager to create chunks
void createChunks( const string& keyName ){
vector splitKeys;
genRandomSplitKeys( keyName, &splitKeys );
ShardKeyPattern shardKeyPattern(BSON(keyName << 1));
ChunkManager manager(collName(), shardKeyPattern, false);
manager.createFirstChunks(_shardId, &splitKeys, NULL );
}
void run(){
string keyName = "_id";
createChunks( keyName );
unique_ptr cursor =
_client.query(ChunkType::ConfigNS, QUERY(ChunkType::ns(collName())));
set minorVersions;
OID epoch;
// Check that all chunks were created with version 1|x with consistent epoch and unique minor versions
while( cursor->more() ){
BSONObj chunk = cursor->next();
ChunkVersion version = ChunkVersion::fromBSON(chunk,
ChunkType::DEPRECATED_lastmod());
ASSERT( version.majorVersion() == 1 );
ASSERT( version.isEpochSet() );
if( ! epoch.isSet() ) epoch = version.epoch();
ASSERT( version.epoch() == epoch );
ASSERT( minorVersions.find( version.minorVersion() ) == minorVersions.end() );
minorVersions.insert( version.minorVersion() );
ASSERT(chunk[ChunkType::shard()].String() == _shardId);
}
}
};
//
// Tests that chunks are loaded correctly from the db with no a-priori info and also that
// they can be reloaded on top of an old chunk manager with changes.
//
class ChunkManagerLoadBasicTest : public ChunkManagerCreateFullTest {
public:
    void run(){
        string keyName = "_id";
        createChunks( keyName );
        int numChunks = static_cast<int>(_client.count(ChunkType::ConfigNS,
                                                       BSON(ChunkType::ns(collName()))));

        BSONObj firstChunk = _client.findOne(ChunkType::ConfigNS, BSONObj()).getOwned();

        ChunkVersion version = ChunkVersion::fromBSON(firstChunk,
                                                      ChunkType::DEPRECATED_lastmod());

        // Make manager load existing chunks
        CollectionType collType;
        collType.setNs(NamespaceString{collName()});
        collType.setEpoch(version.epoch());
        collType.setUpdatedAt(jsTime());
        collType.setKeyPattern(BSON("_id" << 1));
        collType.setUnique(false);
        collType.setDropped(false);

        ChunkManager manager(collType);
        manager.loadExistingRanges(nullptr);

        // Fresh load: same epoch, one chunk per config doc, minor version n-1
        ASSERT(manager.getVersion().epoch() == version.epoch());
        ASSERT(manager.getVersion().minorVersion() == (numChunks - 1));
        ASSERT(static_cast<int>(manager.getChunkMap().size()) == numChunks);

        // Modify chunks collection - push every chunk's lastmod to 2|1
        BSONObjBuilder b;
        ChunkVersion laterVersion = ChunkVersion( 2, 1, version.epoch() );
        laterVersion.addToBSON(b, ChunkType::DEPRECATED_lastmod());

        _client.update(ChunkType::ConfigNS, BSONObj(), BSON( "$set" << b.obj()));

        // Make new manager load chunk diff on top of the old manager
        ChunkManager newManager(manager.getns(),
                                manager.getShardKeyPattern(),
                                manager.isUnique());
        newManager.loadExistingRanges(&manager);

        ASSERT( newManager.getVersion().toLong() == laterVersion.toLong() );
        ASSERT( newManager.getVersion().epoch() == laterVersion.epoch() );
        ASSERT( static_cast<int>( newManager.getChunkMap().size() ) == numChunks );
    }
};
class ChunkDiffUnitTest {
public:
bool _inverse;
typedef map RangeMap;
typedef map VersionMap;
ChunkDiffUnitTest( bool inverse ) : _inverse( inverse ) {}
// The default pass-through adapter for using config diffs
class DefaultDiffAdapter : public ConfigDiffTracker {
public:
DefaultDiffAdapter() {}
virtual ~DefaultDiffAdapter() {}
virtual bool isTracked(const ChunkType& chunk) const { return true; }
virtual pair rangeFor(const ChunkType& chunk) const {
return make_pair(chunk.getMin(), chunk.getMax());
}
virtual string shardFor( const string& name ) const { return name; }
};
// Inverts the storage order for chunks from min to max
class InverseDiffAdapter : public DefaultDiffAdapter {
public:
InverseDiffAdapter() {}
virtual ~InverseDiffAdapter() {}
virtual bool isMinKeyIndexed() const { return false; }
virtual pair rangeFor(const ChunkType& chunk) const {
return make_pair(chunk.getMax(), chunk.getMin());
}
};
// Allow validating with and without ranges (b/c our splits won't actually be updated by the diffs)
void validate(const std::vector& chunks,
ChunkVersion maxVersion,
const VersionMap& maxShardVersions) {
validate(chunks, NULL, maxVersion, maxShardVersions);
}
void validate(const std::vector& chunks,
const RangeMap& ranges,
ChunkVersion maxVersion,
const VersionMap& maxShardVersions) {
validate(chunks, (RangeMap*)&ranges, maxVersion, maxShardVersions);
}
// Validates that the ranges and versions are valid given the chunks
void validate(const std::vector& chunks,
RangeMap* ranges,
ChunkVersion maxVersion,
const VersionMap& maxShardVersions) {
int chunkCount = chunks.size();
ChunkVersion foundMaxVersion;
VersionMap foundMaxShardVersions;
//
// Validate that all the chunks are there and collect versions
//
for (const ChunkType& chunk : chunks) {
if( ranges != NULL ){
// log() << "Validating chunk " << chunkDoc << " size : " << ranges->size() << " vs " << chunkCount << endl;
RangeMap::iterator chunkRange = ranges->find(_inverse ?
chunk.getMax() :
chunk.getMin());
ASSERT( chunkRange != ranges->end() );
ASSERT(chunkRange->second.woCompare(_inverse ?
chunk.getMin() :
chunk.getMax()) == 0);
}
ChunkVersion version =
ChunkVersion::fromBSON(chunk.toBSON()[ChunkType::DEPRECATED_lastmod()]);
if( version > foundMaxVersion ) foundMaxVersion = version;
ChunkVersion shardMaxVersion =
foundMaxShardVersions[chunk.getShard()];
if( version > shardMaxVersion ) {
foundMaxShardVersions[chunk.getShard()] = version;
}
}
// Make sure all chunks are accounted for
if( ranges != NULL ) ASSERT( chunkCount == (int) ranges->size() );
// log() << "Validating that all shard versions are up to date..." << endl;
// Validate that all the versions are the same
ASSERT( foundMaxVersion.equals( maxVersion ) );
for( VersionMap::iterator it = foundMaxShardVersions.begin(); it != foundMaxShardVersions.end(); it++ ){
ChunkVersion foundVersion = it->second;
VersionMap::const_iterator maxIt = maxShardVersions.find( it->first );
ASSERT( maxIt != maxShardVersions.end() );
ASSERT( foundVersion.equals( maxIt->second ) );
}
// Make sure all shards are accounted for
ASSERT( foundMaxShardVersions.size() == maxShardVersions.size() );
}
void run() {
int numShards = 10;
int numInitialChunks = 5;
int maxChunks = 100000; // Needed to not overflow the BSONArray's max bytes
int keySize = 2;
BSONArrayBuilder chunksB;
BSONObj lastSplitPt;
ChunkVersion version( 1, 0, OID() );
//
// Generate numChunks with a given key size over numShards
// All chunks have double key values, so we can split them a bunch
//
for( int i = -1; i < numInitialChunks; i++ ){
BSONObjBuilder splitPtB;
for( int k = 0; k < keySize; k++ ){
string field = string( "k" ) + string( 1, (char)('0' + k) );
if( i < 0 )
splitPtB.appendMinKey( field );
else if( i < numInitialChunks - 1 )
splitPtB.append( field, (double)i );
else
splitPtB.appendMaxKey( field );
}
BSONObj splitPt = splitPtB.obj();
if( i >= 0 ){
BSONObjBuilder chunkB;
chunkB.append(ChunkType::name(), "$dummyname");
chunkB.append(ChunkType::ns(), "$dummyns");
chunkB.append(ChunkType::min(), lastSplitPt );
chunkB.append(ChunkType::max(), splitPt );
int shardNum = rand( numShards );
chunkB.append(ChunkType::shard(),
"shard" + string( 1, (char)('A' + shardNum) ) );
rand( 2 ) ? version.incMajor() : version.incMinor();
version.addToBSON(chunkB, ChunkType::DEPRECATED_lastmod());
chunksB.append( chunkB.obj() );
}
lastSplitPt = splitPt;
}
BSONArray chunks = chunksB.arr();
// log() << "Chunks generated : " << chunks << endl;
// Setup the empty ranges and versions first
RangeMap ranges;
ChunkVersion maxVersion = ChunkVersion( 0, 0, OID() );
VersionMap maxShardVersions;
// Create a differ which will track our progress
std::shared_ptr< DefaultDiffAdapter > differ( _inverse ? new InverseDiffAdapter() : new DefaultDiffAdapter() );
differ->attach( "test", ranges, maxVersion, maxShardVersions );
std::vector chunksVector;
convertBSONArrayToChunkTypes(chunks, &chunksVector);
// Validate initial load
differ->calculateConfigDiff(chunksVector);
validate(chunksVector, ranges, maxVersion, maxShardVersions );
// Generate a lot of diffs, and keep validating that updating from the diffs always
// gives us the right ranges and versions
int numDiffs = 135; // Makes about 100000 chunks overall
int numChunks = numInitialChunks;
for( int i = 0; i < numDiffs; i++ ){
// log() << "Generating new diff... " << i << endl;
BSONArrayBuilder diffsB;
BSONArrayBuilder newChunksB;
BSONObjIterator chunksIt( chunks );
while( chunksIt.more() ){
BSONObj chunk = chunksIt.next().Obj();
int randChoice = rand( 10 );
if( randChoice < 2 && numChunks < maxChunks ){
// Simulate a split
// log() << " ...starting a split with chunk " << chunk << endl;
BSONObjBuilder leftB;
BSONObjBuilder rightB;
BSONObjBuilder midB;
for( int k = 0; k < keySize; k++ ){
string field = string( "k" ) + string( 1, (char)('0' + k) );
BSONType maxType = chunk[ChunkType::max()].Obj()[field].type();
double max = maxType == NumberDouble ? chunk["max"].Obj()[field].Number() : 0.0;
BSONType minType = chunk[ChunkType::min()].Obj()[field].type();
double min = minType == NumberDouble ?
chunk[ChunkType::min()].Obj()[field].Number() :
0.0;
if( minType == MinKey ){
midB.append( field, max - 1.0 );
}
else if( maxType == MaxKey ){
midB.append( field, min + 1.0 );
}
else {
midB.append( field, ( max + min ) / 2.0 );
}
}
BSONObj midPt = midB.obj();
// Only happens if we can't split the min chunk
if( midPt.isEmpty() ) continue;
leftB.append( chunk[ChunkType::min()] );
leftB.append(ChunkType::max(), midPt );
rightB.append(ChunkType::min(), midPt );
rightB.append(chunk[ChunkType::max()] );
// add required fields for ChunkType
leftB.append(chunk[ChunkType::name()]);
leftB.append(chunk[ChunkType::ns()]);
rightB.append(chunk[ChunkType::name()]);
rightB.append(chunk[ChunkType::ns()]);
leftB.append(chunk[ChunkType::shard()] );
rightB.append(chunk[ChunkType::shard()] );
version.incMajor();
version._minor = 0;
version.addToBSON(leftB, ChunkType::DEPRECATED_lastmod());
version.incMinor();
version.addToBSON(rightB, ChunkType::DEPRECATED_lastmod());
BSONObj left = leftB.obj();
BSONObj right = rightB.obj();
// log() << " ... split into " << left << " and " << right << endl;
newChunksB.append( left );
newChunksB.append( right );
diffsB.append( right );
diffsB.append( left );
numChunks++;
}
else if( randChoice < 4 && chunksIt.more() ){
// Simulate a migrate
// log() << " ...starting a migrate with chunk " << chunk << endl;
BSONObj prevShardChunk;
while( chunksIt.more() ){
prevShardChunk = chunksIt.next().Obj();
if( prevShardChunk[ChunkType::shard()].String() ==
chunk[ChunkType::shard()].String() ) break;
// log() << "... appending chunk from diff shard: " << prevShardChunk << endl;
newChunksB.append( prevShardChunk );
prevShardChunk = BSONObj();
}
// We need to move between different shards, hence the weirdness in logic here
if( ! prevShardChunk.isEmpty() ){
BSONObjBuilder newShardB;
BSONObjBuilder prevShardB;
newShardB.append(chunk[ChunkType::min()]);
newShardB.append(chunk[ChunkType::max()]);
prevShardB.append(prevShardChunk[ChunkType::min()]);
prevShardB.append(prevShardChunk[ChunkType::max()]);
// add required fields for ChunkType
newShardB.append(chunk[ChunkType::name()]);
newShardB.append(chunk[ChunkType::ns()]);
prevShardB.append(chunk[ChunkType::name()]);
prevShardB.append(chunk[ChunkType::ns()]);
int shardNum = rand( numShards );
newShardB.append(ChunkType::shard(),
"shard" + string( 1, (char)('A' + shardNum)));
prevShardB.append(prevShardChunk[ChunkType::shard()]);
version.incMajor();
version._minor = 0;
version.addToBSON(newShardB, ChunkType::DEPRECATED_lastmod());
version.incMinor();
version.addToBSON(prevShardB, ChunkType::DEPRECATED_lastmod());
BSONObj newShard = newShardB.obj();
BSONObj prevShard = prevShardB.obj();
// log() << " ... migrated to " << newShard << " and updated " << prevShard << endl;
newChunksB.append( newShard );
newChunksB.append( prevShard );
diffsB.append( newShard );
diffsB.append( prevShard );
}
else{
// log() << "... appending chunk, no more left: " << chunk << endl;
newChunksB.append( chunk );
}
}
else{
// log() << "Appending chunk : " << chunk << endl;
newChunksB.append( chunk );
}
}
BSONArray diffs = diffsB.arr();
chunks = newChunksB.arr();
// log() << "Diffs generated : " << diffs << endl;
// log() << "All chunks : " << chunks << endl;
// Rarely entirely clear out our data
if( rand( 10 ) < 1 ){
diffs = chunks;
ranges.clear();
maxVersion = ChunkVersion( 0, 0, OID() );
maxShardVersions.clear();
}
// log() << "Total number of chunks : " << numChunks << " iteration " << i << endl;
std::vector chunksVector;
convertBSONArrayToChunkTypes(chunks, &chunksVector);
differ->calculateConfigDiff(chunksVector);
validate(chunksVector, ranges, maxVersion, maxShardVersions );
}
}
};
// Runs the chunk diff scenario with ranges stored in the default min -> max order.
class ChunkDiffUnitTestNormal : public ChunkDiffUnitTest {
public:
    ChunkDiffUnitTestNormal() : ChunkDiffUnitTest(false) {}
};
// Runs the chunk diff scenario with ranges stored in inverted max -> min order.
class ChunkDiffUnitTestInverse : public ChunkDiffUnitTest {
public:
    ChunkDiffUnitTestInverse() : ChunkDiffUnitTest(true) {}
};
// Registers every test in this file under the "sharding" dbtests suite.
class All : public Suite {
public:
    All() : Suite( "sharding" ) {}

    void setupTests() {
        add< ChunkManagerCreateBasicTest >();
        add< ChunkManagerCreateFullTest >();
        add< ChunkManagerLoadBasicTest >();
        add< ChunkDiffUnitTestNormal >();
        add< ChunkDiffUnitTestInverse >();
    }
};

// Static registration instance (template argument restored; 'SuiteInstance myall;'
// did not compile).
SuiteInstance<All> myall;
}