29 files changed, 1704 insertions, 1553 deletions
diff --git a/src/mongo/SConscript b/src/mongo/SConscript
index 310c30eb41a..f9bd4148699 100644
--- a/src/mongo/SConscript
+++ b/src/mongo/SConscript
@@ -792,6 +792,7 @@ env.Library('coreshard', [# This is only here temporarily for auto-split logic i
 's/config.cpp',
 's/grid.cpp',
 's/chunk.cpp',
+ 's/chunk_manager.cpp', # No good reason to be here other than chunk.cpp needs this.
 's/config_server_checker_service.cpp',
 's/shard.cpp',
diff --git a/src/mongo/client/parallel.cpp b/src/mongo/client/parallel.cpp
index df1ef53eb51..bd73e4dbdc3 100644
--- a/src/mongo/client/parallel.cpp
+++ b/src/mongo/client/parallel.cpp
@@ -42,8 +42,7 @@
 #include "mongo/client/replica_set_monitor.h"
 #include "mongo/db/dbmessage.h"
 #include "mongo/db/query/lite_parsed_query.h"
-#include "mongo/s/chunk.h"
-#include "mongo/s/chunk_version.h"
+#include "mongo/s/chunk_manager.h"
 #include "mongo/s/config.h"
 #include "mongo/s/grid.h"
 #include "mongo/s/shard.h"
diff --git a/src/mongo/db/commands/mr.cpp b/src/mongo/db/commands/mr.cpp
index d8e15b96dad..cd0be7674c7 100644
--- a/src/mongo/db/commands/mr.cpp
+++ b/src/mongo/db/commands/mr.cpp
@@ -1,5 +1,3 @@
-// mr.cpp
-
 /**
 * Copyright (C) 2012 10gen Inc.
 *
@@ -58,6 +56,7 @@
 #include "mongo/db/operation_context_impl.h"
 #include "mongo/db/storage_options.h"
 #include "mongo/scripting/engine.h"
+#include "mongo/s/chunk_manager.h"
 #include "mongo/s/collection_metadata.h"
 #include "mongo/s/d_state.h"
 #include "mongo/s/grid.h"
diff --git a/src/mongo/dbtests/chunktests.cpp b/src/mongo/dbtests/chunktests.cpp
index 0f44ff7a856..f4530d9cee3 100644
--- a/src/mongo/dbtests/chunktests.cpp
+++ b/src/mongo/dbtests/chunktests.cpp
@@ -32,7 +32,7 @@
 #include "mongo/db/json.h"
 #include "mongo/dbtests/dbtests.h"
-#include "mongo/s/chunk.h"
+#include "mongo/s/chunk_manager.h"
 namespace mongo {
diff --git a/src/mongo/dbtests/sharding.cpp b/src/mongo/dbtests/sharding.cpp
index 552999c7882..afbd6ab2880 100644
--- a/src/mongo/dbtests/sharding.cpp
+++ b/src/mongo/dbtests/sharding.cpp
@@ -39,6 +39,7 @@
 #include "mongo/dbtests/config_server_fixture.h"
 #include "mongo/dbtests/dbtests.h"
 #include "mongo/s/chunk_diff.h"
+#include "mongo/s/chunk_manager.h"
 #include "mongo/s/chunk_version.h"
 #include "mongo/s/config.h"
 #include "mongo/s/type_chunk.h"
diff --git a/src/mongo/s/balance.cpp b/src/mongo/s/balance.cpp
index 418f63a7197..7b34b2269bc 100644
--- a/src/mongo/s/balance.cpp
+++ b/src/mongo/s/balance.cpp
@@ -37,9 +37,10 @@
 #include "mongo/base/owned_pointer_map.h"
 #include "mongo/client/dbclientcursor.h"
 #include "mongo/db/jsobj.h"
+#include "mongo/db/server_options.h"
 #include "mongo/db/write_concern.h"
 #include "mongo/db/write_concern_options.h"
-#include "mongo/s/chunk.h"
+#include "mongo/s/chunk_manager.h"
 #include "mongo/s/cluster_write.h"
 #include "mongo/s/config.h"
 #include "mongo/s/config_server_checker_service.h"
diff --git a/src/mongo/s/balance.h b/src/mongo/s/balance.h
index 8ed8662b5a9..8161af39293 100644
--- a/src/mongo/s/balance.h
+++ b/src/mongo/s/balance.h
@@ -30,8 +30,6 @@
 #pragma once
-#include "mongo/platform/basic.h"
-
 #include <boost/scoped_ptr.hpp>
 #include <boost/shared_ptr.hpp>
diff --git a/src/mongo/s/balancer_policy.cpp b/src/mongo/s/balancer_policy.cpp
index 70e3d24e404..bd23ddf4e25 100644
--- a/src/mongo/s/balancer_policy.cpp
+++ b/src/mongo/s/balancer_policy.cpp
@@ -35,7 +35,7 @@
 #include "mongo/client/connpool.h"
 #include "mongo/s/balancer_policy.h"
-#include "mongo/s/chunk.h"
+#include "mongo/s/chunk_manager.h"
 #include "mongo/s/config.h"
 #include "mongo/s/type_shard.h"
#include "mongo/s/type_tags.h" diff --git a/src/mongo/s/chunk.cpp b/src/mongo/s/chunk.cpp index a99a32829e1..ff1ba674c37 100644 --- a/src/mongo/s/chunk.cpp +++ b/src/mongo/s/chunk.cpp @@ -34,83 +34,51 @@ #include "mongo/s/chunk.h" -#include <boost/shared_ptr.hpp> #include <iostream> #include "mongo/base/owned_pointer_map.h" #include "mongo/client/connpool.h" #include "mongo/client/dbclientcursor.h" -#include "mongo/db/query/lite_parsed_query.h" -#include "mongo/db/index_names.h" #include "mongo/db/lasterror.h" -#include "mongo/db/write_concern.h" +#include "mongo/db/query/query_solution.h" #include "mongo/db/server_parameters.h" +#include "mongo/db/write_concern.h" +#include "mongo/db/write_concern_options.h" #include "mongo/platform/random.h" #include "mongo/s/balancer_policy.h" -#include "mongo/s/chunk_diff.h" -#include "mongo/s/chunk_version.h" +#include "mongo/s/chunk_manager.h" #include "mongo/s/client_info.h" #include "mongo/s/cluster_write.h" #include "mongo/s/config.h" #include "mongo/s/config_server_checker_service.h" #include "mongo/s/cursors.h" -#include "mongo/s/distlock.h" #include "mongo/s/grid.h" -#include "mongo/s/strategy.h" -#include "mongo/s/type_collection.h" #include "mongo/s/type_settings.h" #include "mongo/util/concurrency/ticketholder.h" #include "mongo/util/log.h" #include "mongo/util/print.h" -#include "mongo/util/startup_test.h" -#include "mongo/util/timer.h" -#include "mongo/db/query/canonical_query.h" -#include "mongo/db/query/query_planner.h" -#include "mongo/db/query/query_planner_common.h" -#include "mongo/db/query/index_bounds_builder.h" -#include "mongo/db/write_concern_options.h" namespace mongo { using boost::shared_ptr; using std::auto_ptr; - using std::cout; - using std::endl; - using std::pair; - using std::make_pair; using std::map; - using std::max; using std::ostringstream; using std::set; using std::string; using std::stringstream; using std::vector; - inline bool allOfType(BSONType type, const BSONObj& o) { - BSONObjIterator it(o); - while(it.more()) { - if (it.next().type() != type) - return false; - } - return true; - } - - static const int kTooManySplitPoints = 4; - - // ------- Shard -------- +namespace { - long long Chunk::MaxChunkSize = 1024 * 1024 * 64; - int Chunk::MaxObjectPerChunk = 250000; - - // Can be overridden from command line - bool Chunk::ShouldAutoSplit = true; + const int kTooManySplitPoints = 4; /** * Attempts to move the given chunk to another shard. * * Returns true if the chunk was actually moved. */ - static bool tryMoveToOtherShard(const ChunkManager& manager, const ChunkType& chunk) { + bool tryMoveToOtherShard(const ChunkManager& manager, const ChunkType& chunk) { // reload sharding metadata before starting migration ChunkManagerPtr chunkMgr = manager.reload(false /* just reloaded in mulitsplit */); @@ -124,7 +92,7 @@ namespace mongo { } if (shardInfo.size() < 2) { - LOG(0) << "no need to move top chunk since there's only 1 shard" << endl; + LOG(0) << "no need to move top chunk since there's only 1 shard"; return false; } @@ -142,7 +110,7 @@ namespace mongo { chunk); if (!tagStatus.isOK()) { warning() << "Not auto-moving chunk because of an error encountered while " - << "checking tag for chunk: " << tagStatus.getStatus() << endl; + << "checking tag for chunk: " << tagStatus.getStatus(); return false; } @@ -158,7 +126,7 @@ namespace mongo { if (chunk.getShard() == newLocation) { // if this is the best shard, then we shouldn't do anything. 
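The chunk.cpp hunk above moves file-local helpers such as kTooManySplitPoints and tryMoveToOtherShard out of file-level statics and into an unnamed namespace, which gives them internal linkage without the static keyword. A minimal standalone sketch of that pattern, with illustrative names that are not taken from the patch:

// --- sketch (illustrative only, not part of the patch) ---
#include <iostream>

namespace {

    // Internal linkage, like kTooManySplitPoints above.
    const int kRetryLimit = 3;

    // File-local helper: visible only in this translation unit, no 'static' needed.
    bool shouldRetry(int attempt) {
        return attempt < kRetryLimit;
    }

} // namespace

int main() {
    int attempt = 0;
    while (shouldRetry(attempt)) {
        std::cout << "attempt " << attempt << std::endl;
        ++attempt;
    }
    return 0;
}
// --- end sketch ---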
LOG(1) << "recently split chunk: " << chunk - << " already in the best shard" << endl; + << " already in the best shard"; return false; } @@ -166,11 +134,11 @@ namespace mongo { if (!(toMove->getMin() == chunk.getMin() && toMove->getMax() == chunk.getMax())) { LOG(1) << "recently split chunk: " << chunk - << " modified before we could migrate " << toMove->toString() << endl; + << " modified before we could migrate " << toMove->toString(); return false; } - log() << "moving chunk (auto): " << toMove->toString() << " to: " << newLocation << endl; + log() << "moving chunk (auto): " << toMove->toString() << " to: " << newLocation; BSONObj res; @@ -190,6 +158,14 @@ namespace mongo { return true; } +} // namespace + + long long Chunk::MaxChunkSize = 1024 * 1024 * 64; + int Chunk::MaxObjectPerChunk = 250000; + + // Can be overridden from command line + bool Chunk::ShouldAutoSplit = true; + Chunk::Chunk(const ChunkManager * manager, BSONObj from) : _manager(manager), _lastmod(0, 0, OID()), _dataWritten(mkDataWritten()) { @@ -236,14 +212,14 @@ namespace mongo { return getMin().woCompare( shardKey ) <= 0 && shardKey.woCompare( getMax() ) < 0; } - bool Chunk::minIsInf() const { + bool Chunk::_minIsInf() const { return 0 == - _manager->getShardKeyPattern().getKeyPattern().globalMin().woCompare( getMin() ); + _manager->getShardKeyPattern().getKeyPattern().globalMin().woCompare(getMin()); } - bool Chunk::maxIsInf() const { + bool Chunk::_maxIsInf() const { return 0 == - _manager->getShardKeyPattern().getKeyPattern().globalMax().woCompare( getMax() ); + _manager->getShardKeyPattern().getKeyPattern().globalMax().woCompare(getMax()); } BSONObj Chunk::_getExtremeKey(bool doSplitAtLower) const { @@ -353,7 +329,7 @@ namespace mongo { conn.done(); } - void Chunk::determineSplitPoints(bool atMedian, std::vector<BSONObj>* splitPoints) const { + void Chunk::determineSplitPoints(bool atMedian, vector<BSONObj>* splitPoints) const { // if splitting is not obligatory we may return early if there are not enough data // we cap the number of objects that would fall in the first half (before the split point) // the rationale is we'll find a split point without traversing all the data @@ -406,7 +382,7 @@ namespace mongo { msg = "chunk not full enough to trigger auto-split"; } - LOG(1) << msg << endl; + LOG(1) << msg; return Status(ErrorCodes::CannotSplit, msg); } @@ -418,13 +394,13 @@ namespace mongo { if (mode == Chunk::autoSplitInternal && KeyPattern::isOrderedKeyPattern(_manager->getShardKeyPattern().toBSON())) { - if (minIsInf()) { + if (_minIsInf()) { BSONObj key = _getExtremeKey(true); if (!key.isEmpty()) { splitPoints[0] = key.getOwned(); } } - else if (maxIsInf()) { + else if (_maxIsInf()) { BSONObj key = _getExtremeKey(false); if (!key.isEmpty()) { splitPoints.pop_back(); @@ -439,7 +415,7 @@ namespace mongo { string msg(str::stream() << "not splitting chunk " << toString() << ", split point " << splitPoints.front() << " is exactly on chunk bounds"); - log() << msg << endl; + log() << msg; return Status(ErrorCodes::CannotSplit, msg); } @@ -447,7 +423,7 @@ namespace mongo { string msg(str::stream() << "not splitting chunk " << toString() << ", split point " << splitPoints.back() << " is exactly on chunk bounds"); - log() << msg << endl; + log() << msg; return Status(ErrorCodes::CannotSplit, msg); } @@ -486,7 +462,7 @@ namespace mongo { if (!conn->runCommand("admin", cmdObj, *res)) { string msg(str::stream() << "splitChunk failed - cmd: " << cmdObj << " result: " << *res); - warning() << msg << endl; + warning() << 
msg; conn.done(); return Status(ErrorCodes::SplitFailed, msg); @@ -509,7 +485,7 @@ namespace mongo { uassert( 10167 , "can't move shard to its current location!" , getShard() != to ); log() << "moving chunk ns: " << _manager->getns() << " moving ( " << toString() << ") " - << _shard.toString() << " -> " << to.toString() << endl; + << _shard.toString() << " -> " << to.toString(); Shard from = _shard; ScopedDbConnection fromconn(from.getConnString()); @@ -549,7 +525,7 @@ namespace mongo { bool worked = fromconn->runCommand("admin", builder.done(), res); fromconn.done(); - LOG( worked ? 1 : 0 ) << "moveChunk result: " << res << endl; + LOG( worked ? 1 : 0 ) << "moveChunk result: " << res; // if succeeded, needs to reload to pick up the new location // if failed, mongos may be stale @@ -566,15 +542,15 @@ namespace mongo { try { _dataWritten += dataWritten; int splitThreshold = getManager()->getCurrentDesiredChunkSize(); - if ( minIsInf() || maxIsInf() ) { - splitThreshold = (int) ((double)splitThreshold * .9); + if (_minIsInf() || _maxIsInf()) { + splitThreshold = (int)((double)splitThreshold * .9); } if ( _dataWritten < splitThreshold / ChunkManager::SplitHeuristics::splitTestFactor ) return false; if ( ! getManager()->_splitHeuristics._splitTickets.tryAcquire() ) { - LOG(1) << "won't auto split because not enough tickets: " << getManager()->getns() << endl; + LOG(1) << "won't auto split because not enough tickets: " << getManager()->getns(); return false; } TicketHolderReleaser releaser( &(getManager()->_splitHeuristics._splitTickets) ); @@ -596,28 +572,30 @@ namespace mongo { if ( !isConfigServerConsistent() ) { RARELY warning() << "will not perform auto-split because " - << "config servers are inconsistent" << endl; + << "config servers are inconsistent"; return false; } - LOG(1) << "about to initiate autosplit: " << *this << " dataWritten: " << _dataWritten << " splitThreshold: " << splitThreshold << endl; + LOG(1) << "about to initiate autosplit: " << *this << " dataWritten: " << _dataWritten << " splitThreshold: " << splitThreshold; BSONObj res; size_t splitCount = 0; Status status = split(Chunk::autoSplitInternal, &splitCount, &res); - if ( !status.isOK() ) { - // split would have issued a message if we got here - _dataWritten = 0; // this means there wasn't enough data to split, so don't want to try again until considerable more data + if (!status.isOK()) { + // Split would have issued a message if we got here. This means there wasn't enough + // data to split, so don't want to try again until considerable more data + _dataWritten = 0; return false; } - if ( maxIsInf() || minIsInf() ) { + if (_maxIsInf() || _minIsInf()) { // we don't want to reset _dataWritten since we kind of want to check the other side right away } else { - _dataWritten = 0; // we're splitting, so should wait a bit + // we're splitting, so should wait a bit + _dataWritten = 0; } const bool shouldBalance = grid.getConfigShouldBalance() && @@ -631,7 +609,7 @@ namespace mongo { << " size: " << getPhysicalSize() // slow - but can be useful when debugging #endif << ( res["shouldMigrate"].eoo() ? "" : (string)" (migrate suggested" + - ( shouldBalance ? ")" : ", but no migrations allowed)" ) ) << endl; + ( shouldBalance ? ")" : ", but no migrations allowed)" ) ); // Top chunk optimization - try to move the top chunk out of this shard // to prevent the hot spot from staying on a single shard. This is based on @@ -658,7 +636,7 @@ namespace mongo { _dataWritten = mkDataWritten(); // if the collection lock is taken (e.g. 
we're migrating), it is fine for the split to fail. - warning() << "could not autosplit collection " << _manager->getns() << causedBy( e ) << endl; + warning() << "could not autosplit collection " << _manager->getns() << causedBy( e ); return false; } } @@ -753,7 +731,7 @@ namespace mongo { if ( !result.isOK() ) { warning() << "couldn't set jumbo for chunk: " - << genID() << result.reason() << endl; + << genID() << result.reason(); } } @@ -769,19 +747,19 @@ namespace mongo { // validate chunksize before proceeding if ( csize == 0 ) { // setting was not modified; mark as such - log() << "warning: invalid chunksize (" << csize << ") ignored" << endl; + log() << "warning: invalid chunksize (" << csize << ") ignored"; return; } - LOG(1) << "Refreshing MaxChunkSize: " << csize << "MB" << endl; + LOG(1) << "Refreshing MaxChunkSize: " << csize << "MB"; if (csize != Chunk::MaxChunkSize/(1024*1024)) { log() << "MaxChunkSize changing from " << Chunk::MaxChunkSize/(1024*1024) << "MB" - << " to " << csize << "MB" << endl; + << " to " << csize << "MB"; } if ( !setMaxChunkSizeSizeMB( csize ) ) { - warning() << "invalid MaxChunkSize: " << csize << endl; + warning() << "invalid MaxChunkSize: " << csize; } } @@ -794,823 +772,4 @@ namespace mongo { return true; } - // ------- ChunkManager -------- - - AtomicUInt32 ChunkManager::NextSequenceNumber(1U); - - ChunkManager::ChunkManager( const string& ns, const ShardKeyPattern& pattern , bool unique ) : - _ns( ns ), - _keyPattern( pattern.getKeyPattern() ), - _unique( unique ), - _chunkRanges(), - _mutex("ChunkManager"), - _sequenceNumber(NextSequenceNumber.addAndFetch(1)) - { - // - // Sets up a chunk manager from new data - // - } - - ChunkManager::ChunkManager( const BSONObj& collDoc ) : - // Need the ns early, to construct the lock - // TODO: Construct lock on demand? Not sure why we need to keep it around - _ns(collDoc[CollectionType::ns()].type() == String ? - collDoc[CollectionType::ns()].String() : - ""), - _keyPattern(collDoc[CollectionType::keyPattern()].type() == Object ? - collDoc[CollectionType::keyPattern()].Obj().getOwned() : - BSONObj()), - _unique(collDoc[CollectionType::unique()].trueValue()), - _chunkRanges(), - _mutex("ChunkManager"), - // The shard versioning mechanism hinges on keeping track of the number of times we reloaded ChunkManager's. - // Increasing this number here will prompt checkShardVersion() to refresh the connection-level versions to - // the most up to date value. - _sequenceNumber(NextSequenceNumber.addAndFetch(1)) - { - - // - // Sets up a chunk manager from an existing sharded collection document - // - - verify( _ns != "" ); - verify( ! _keyPattern.toBSON().isEmpty() ); - - _version = ChunkVersion::fromBSON( collDoc ); - } - - void ChunkManager::loadExistingRanges( const string& config, const ChunkManager* oldManager ){ - - int tries = 3; - while (tries--) { - ChunkMap chunkMap; - set<Shard> shards; - ShardVersionMap shardVersions; - Timer t; - - bool success = _load(config, chunkMap, shards, shardVersions, oldManager); - - if( success ){ - { - int ms = t.millis(); - log() << "ChunkManager: time to load chunks for " << _ns << ": " << ms << "ms" - << " sequenceNumber: " << _sequenceNumber - << " version: " << _version.toString() - << " based on: " << - (oldManager ? oldManager->getVersion().toString() : "(empty)") - << endl; - } - - // TODO: Merge into diff code above, so we validate in one place - if (_isValid(chunkMap)) { - // These variables are const for thread-safety. 
Since the - // constructor can only be called from one thread, we don't have - // to worry about that here. - const_cast<ChunkMap&>(_chunkMap).swap(chunkMap); - const_cast<set<Shard>&>(_shards).swap(shards); - const_cast<ShardVersionMap&>(_shardVersions).swap(shardVersions); - const_cast<ChunkRangeManager&>(_chunkRanges).reloadAll(_chunkMap); - - return; - } - } - - if (_chunkMap.size() < 10) { - _printChunks(); - } - - warning() << "ChunkManager loaded an invalid config for " << _ns - << ", trying again" << endl; - - sleepmillis(10 * (3-tries)); - } - - // this will abort construction so we should never have a reference to an invalid config - msgasserted(13282, "Couldn't load a valid config for " + _ns + " after 3 attempts. Please try again."); - } - - - /** - * This is an adapter so we can use config diffs - mongos and mongod do them slightly - * differently - * - * The mongos adapter here tracks all shards, and stores ranges by (max, Chunk) in the map. - */ - class CMConfigDiffTracker : public ConfigDiffTracker<ChunkPtr, std::string> { - public: - CMConfigDiffTracker( ChunkManager* manager ) : _manager( manager ) {} - - virtual bool isTracked( const BSONObj& chunkDoc ) const { - // Mongos tracks all shards - return true; - } - - virtual BSONObj minFrom( const ChunkPtr& val ) const { - return val.get()->getMin(); - } - - virtual bool isMinKeyIndexed() const { return false; } - - virtual pair<BSONObj,ChunkPtr> rangeFor( const BSONObj& chunkDoc, const BSONObj& min, const BSONObj& max ) const { - ChunkPtr c( new Chunk( _manager, chunkDoc ) ); - return make_pair( max, c ); - } - - virtual string shardFor(const string& hostName) const { - Shard shard = Shard::make(hostName); - return shard.getName(); - } - - ChunkManager* _manager; - - }; - - bool ChunkManager::_load(const string& config, - ChunkMap& chunkMap, - set<Shard>& shards, - ShardVersionMap& shardVersions, - const ChunkManager* oldManager) - { - - // Reset the max version, but not the epoch, when we aren't loading from the oldManager - _version = ChunkVersion( 0, 0, _version.epoch() ); - - // If we have a previous version of the ChunkManager to work from, use that info to reduce - // our config query - if( oldManager && oldManager->getVersion().isSet() ){ - - // Get the old max version - _version = oldManager->getVersion(); - // Load a copy of the old versions - shardVersions = oldManager->_shardVersions; - - // Load a copy of the chunk map, replacing the chunk manager with our own - const ChunkMap& oldChunkMap = oldManager->getChunkMap(); - - // Could be v.expensive - // TODO: If chunks were immutable and didn't reference the manager, we could do more - // interesting things here - for( ChunkMap::const_iterator it = oldChunkMap.begin(); it != oldChunkMap.end(); it++ ){ - - ChunkPtr oldC = it->second; - ChunkPtr c( new Chunk( this, oldC->getMin(), - oldC->getMax(), - oldC->getShard(), - oldC->getLastmod() ) ); - - c->setBytesWritten( oldC->getBytesWritten() ); - - chunkMap.insert( make_pair( oldC->getMax(), c ) ); - } - - LOG(2) << "loading chunk manager for collection " << _ns - << " using old chunk manager w/ version " << _version.toString() - << " and " << oldChunkMap.size() << " chunks" << endl; - } - - // Attach a diff tracker for the versioned chunk data - CMConfigDiffTracker differ( this ); - differ.attach( _ns, chunkMap, _version, shardVersions ); - - // Diff tracker should *always* find at least one chunk if collection exists - int diffsApplied = differ.calculateConfigDiff(config); - if( diffsApplied > 0 ){ - - LOG(2) << 
"loaded " << diffsApplied << " chunks into new chunk manager for " << _ns - << " with version " << _version << endl; - - // Add all the shards we find to the shards set - for( ShardVersionMap::iterator it = shardVersions.begin(); it != shardVersions.end(); it++ ){ - shards.insert( it->first ); - } - - return true; - } - else if( diffsApplied == 0 ){ - - // No chunks were found for the ns - warning() << "no chunks found when reloading " << _ns - << ", previous version was " << _version << endl; - - // Set all our data to empty - chunkMap.clear(); - shardVersions.clear(); - _version = ChunkVersion( 0, 0, OID() ); - - return true; - } - else { // diffsApplied < 0 - - bool allInconsistent = differ.numValidDiffs() == 0; - - if( allInconsistent ){ - // All versions are different, this can be normal - warning() << "major change in chunk information found when reloading " - << _ns << ", previous version was " << _version << endl; - } - else { - // Inconsistent load halfway through (due to yielding cursor during load) - // should be rare - warning() << "inconsistent chunks found when reloading " - << _ns << ", previous version was " << _version - << ", this should be rare" << endl; - } - - // Set all our data to empty to be extra safe - chunkMap.clear(); - shardVersions.clear(); - _version = ChunkVersion( 0, 0, OID() ); - - return allInconsistent; - } - - } - - ChunkManagerPtr ChunkManager::reload(bool force) const { - return grid.getDBConfig(getns())->getChunkManager(getns(), force); - } - - bool ChunkManager::_isValid(const ChunkMap& chunkMap) { -#define ENSURE(x) do { if(!(x)) { log() << "ChunkManager::_isValid failed: " #x << endl; return false; } } while(0) - - if (chunkMap.empty()) - return true; - - // Check endpoints - ENSURE(allOfType(MinKey, chunkMap.begin()->second->getMin())); - ENSURE(allOfType(MaxKey, boost::prior(chunkMap.end())->second->getMax())); - - // Make sure there are no gaps or overlaps - for (ChunkMap::const_iterator it=boost::next(chunkMap.begin()), end=chunkMap.end(); it != end; ++it) { - ChunkMap::const_iterator last = boost::prior(it); - - if (!(it->second->getMin() == last->second->getMax())) { - PRINT(last->second->toString()); - PRINT(it->second->toString()); - PRINT(it->second->getMin()); - PRINT(last->second->getMax()); - } - ENSURE(it->second->getMin() == last->second->getMax()); - } - - return true; - -#undef ENSURE - } - - void ChunkManager::_printChunks() const { - for (ChunkMap::const_iterator it=_chunkMap.begin(), end=_chunkMap.end(); it != end; ++it) { - log() << *it->second << endl; - } - } - - void ChunkManager::calcInitSplitsAndShards( const Shard& primary, - const vector<BSONObj>* initPoints, - const vector<Shard>* initShards, - vector<BSONObj>* splitPoints, - vector<Shard>* shards ) const - { - verify( _chunkMap.size() == 0 ); - - unsigned long long numObjects = 0; - Chunk c(this, _keyPattern.getKeyPattern().globalMin(), - _keyPattern.getKeyPattern().globalMax(), primary); - - if ( !initPoints || !initPoints->size() ) { - // discover split points - { - // get stats to see if there is any data - ScopedDbConnection shardConn(primary.getConnString()); - - numObjects = shardConn->count( getns() ); - shardConn.done(); - } - - if ( numObjects > 0 ) - c.pickSplitVector( *splitPoints , Chunk::MaxChunkSize ); - - // since docs alread exists, must use primary shard - shards->push_back( primary ); - } else { - // make sure points are unique and ordered - set<BSONObj> orderedPts; - for ( unsigned i = 0; i < initPoints->size(); ++i ) { - BSONObj pt = 
(*initPoints)[i]; - orderedPts.insert( pt ); - } - for ( set<BSONObj>::iterator it = orderedPts.begin(); it != orderedPts.end(); ++it ) { - splitPoints->push_back( *it ); - } - - if ( !initShards || !initShards->size() ) { - // If not specified, only use the primary shard (note that it's not safe for mongos - // to put initial chunks on other shards without the primary mongod knowing). - shards->push_back( primary ); - } else { - std::copy( initShards->begin() , initShards->end() , std::back_inserter(*shards) ); - } - } - } - - void ChunkManager::createFirstChunks( const string& config, - const Shard& primary, - const vector<BSONObj>* initPoints, - const vector<Shard>* initShards ) - { - // TODO distlock? - // TODO: Race condition if we shard the collection and insert data while we split across - // the non-primary shard. - - vector<BSONObj> splitPoints; - vector<Shard> shards; - - calcInitSplitsAndShards( primary, initPoints, initShards, - &splitPoints, &shards ); - - // this is the first chunk; start the versioning from scratch - ChunkVersion version; - version.incEpoch(); - version.incMajor(); - - log() << "going to create " << splitPoints.size() + 1 << " chunk(s) for: " << _ns - << " using new epoch " << version.epoch() << endl; - - ScopedDbConnection conn(config, 30); - - // Make sure we don't have any chunks that already exist here - unsigned long long existingChunks = - conn->count(ChunkType::ConfigNS, BSON(ChunkType::ns(_ns))); - - uassert( 13449 , str::stream() << "collection " << _ns << " already sharded with " - << existingChunks << " chunks", existingChunks == 0 ); - conn.done(); - - for ( unsigned i=0; i<=splitPoints.size(); i++ ) { - BSONObj min = i == 0 ? _keyPattern.getKeyPattern().globalMin() : splitPoints[i-1]; - BSONObj max = i < splitPoints.size() ? - splitPoints[i] : _keyPattern.getKeyPattern().globalMax(); - - Chunk temp( this , min , max , shards[ i % shards.size() ], version ); - - BSONObjBuilder chunkBuilder; - temp.serialize( chunkBuilder ); - BSONObj chunkObj = chunkBuilder.obj(); - - Status result = clusterUpdate( ChunkType::ConfigNS, - BSON(ChunkType::name(temp.genID())), - chunkObj, - true, // upsert - false, // multi - NULL ); - - version.incMinor(); - - if ( !result.isOK() ) { - string ss = str::stream() << "creating first chunks failed. 
result: " - << result.reason(); - error() << ss << endl; - msgasserted( 15903 , ss ); - } - } - - _version = ChunkVersion( 0, 0, version.epoch() ); - } - - ChunkPtr ChunkManager::findIntersectingChunk( const BSONObj& shardKey ) const { - { - BSONObj chunkMin; - ChunkPtr chunk; - { - ChunkMap::const_iterator it = _chunkMap.upper_bound( shardKey ); - if (it != _chunkMap.end()) { - chunkMin = it->first; - chunk = it->second; - } - } - - if ( chunk ) { - if ( chunk->containsKey( shardKey ) ){ - return chunk; - } - - PRINT(chunkMin); - PRINT(*chunk); - PRINT( shardKey ); - - reload(); - massert(13141, "Chunk map pointed to incorrect chunk", false); - } - } - - msgasserted( 8070 , - str::stream() << "couldn't find a chunk intersecting: " << shardKey - << " for ns: " << _ns - << " at version: " << _version.toString() - << ", number of chunks: " << _chunkMap.size() ); - } - - void ChunkManager::getShardsForQuery( set<Shard>& shards , const BSONObj& query ) const { - CanonicalQuery* canonicalQuery = NULL; - Status status = CanonicalQuery::canonicalize( - _ns, - query, - &canonicalQuery, - WhereCallbackNoop()); - - boost::scoped_ptr<CanonicalQuery> canonicalQueryPtr(canonicalQuery); - - uassert(status.code(), status.reason(), status.isOK()); - - // Query validation - if (QueryPlannerCommon::hasNode(canonicalQuery->root(), MatchExpression::GEO_NEAR)) { - uassert(13501, "use geoNear command rather than $near query", false); - } - - // Transforms query into bounds for each field in the shard key - // for example : - // Key { a: 1, b: 1 }, - // Query { a : { $gte : 1, $lt : 2 }, - // b : { $gte : 3, $lt : 4 } } - // => Bounds { a : [1, 2), b : [3, 4) } - IndexBounds bounds = getIndexBoundsForQuery(_keyPattern.toBSON(), canonicalQuery); - - // Transforms bounds for each shard key field into full shard key ranges - // for example : - // Key { a : 1, b : 1 } - // Bounds { a : [1, 2), b : [3, 4) } - // => Ranges { a : 1, b : 3 } => { a : 2, b : 4 } - BoundList ranges = _keyPattern.flattenBounds(bounds); - - for (BoundList::const_iterator it = ranges.begin(); it != ranges.end(); - ++it) { - - getShardsForRange(shards, it->first /*min*/, it->second /*max*/); - - // once we know we need to visit all shards no need to keep looping - if( shards.size() == _shards.size() ) break; - } - - // SERVER-4914 Some clients of getShardsForQuery() assume at least one shard will be - // returned. For now, we satisfy that assumption by adding a shard with no matches rather - // than return an empty set of shards. 
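findIntersectingChunk above relies on the chunk map being keyed by each chunk's exclusive max bound, so a single std::map::upper_bound(shardKey) call lands on the only chunk that can contain the key. A self-contained sketch of that lookup idiom, with integer keys standing in for shard keys (illustrative types only):

// --- sketch (illustrative only, not part of the patch) ---
#include <cassert>
#include <map>
#include <string>

struct Range {
    int min;            // inclusive lower bound
    int max;            // exclusive upper bound
    std::string shard;
};

int main() {
    // The map is keyed by each range's exclusive max, so ranges sort by upper bound.
    std::map<int, Range> byMax;
    Range r0 = { 0, 10, "shard0000" };
    Range r1 = { 10, 20, "shard0001" };
    Range r2 = { 20, 30, "shard0000" };
    byMax[r0.max] = r0;
    byMax[r1.max] = r1;
    byMax[r2.max] = r2;

    const int key = 15;

    // upper_bound returns the first entry whose max is strictly greater than the key,
    // which is exactly the one range that can contain it.
    std::map<int, Range>::const_iterator it = byMax.upper_bound(key);
    assert(it != byMax.end());
    assert(it->second.min <= key && key < it->second.max);
    assert(it->second.shard == "shard0001");
    return 0;
}
// --- end sketch ---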
- if ( shards.empty() ) { - massert( 16068, "no chunk ranges available", !_chunkRanges.ranges().empty() ); - shards.insert( _chunkRanges.ranges().begin()->second->getShard() ); - } - } - - void ChunkManager::getShardsForRange( set<Shard>& shards, - const BSONObj& min, - const BSONObj& max ) const { - - ChunkRangeMap::const_iterator it = _chunkRanges.upper_bound(min); - ChunkRangeMap::const_iterator end = _chunkRanges.upper_bound(max); - - massert( 13507 , str::stream() << "no chunks found between bounds " << min << " and " << max , it != _chunkRanges.ranges().end() ); - - if( end != _chunkRanges.ranges().end() ) ++end; - - for( ; it != end; ++it ){ - shards.insert(it->second->getShard()); - - // once we know we need to visit all shards no need to keep looping - if (shards.size() == _shards.size()) break; - } - } - - void ChunkManager::getAllShards( set<Shard>& all ) const { - all.insert(_shards.begin(), _shards.end()); - } - - IndexBounds ChunkManager::getIndexBoundsForQuery(const BSONObj& key, const CanonicalQuery* canonicalQuery) { - // $text is not allowed in planning since we don't have text index on mongos. - // - // TODO: Treat $text query as a no-op in planning. So with shard key {a: 1}, - // the query { a: 2, $text: { ... } } will only target to {a: 2}. - if (QueryPlannerCommon::hasNode(canonicalQuery->root(), MatchExpression::TEXT)) { - IndexBounds bounds; - IndexBoundsBuilder::allValuesBounds(key, &bounds); // [minKey, maxKey] - return bounds; - } - - // Consider shard key as an index - string accessMethod = IndexNames::findPluginName(key); - dassert(accessMethod == IndexNames::BTREE || accessMethod == IndexNames::HASHED); - - // Use query framework to generate index bounds - QueryPlannerParams plannerParams; - // Must use "shard key" index - plannerParams.options = QueryPlannerParams::NO_TABLE_SCAN; - IndexEntry indexEntry(key, accessMethod, false /* multiKey */, false /* sparse */, - false /* unique */, "shardkey", BSONObj()); - plannerParams.indices.push_back(indexEntry); - - OwnedPointerVector<QuerySolution> solutions; - Status status = QueryPlanner::plan(*canonicalQuery, plannerParams, &solutions.mutableVector()); - uassert(status.code(), status.reason(), status.isOK()); - - IndexBounds bounds; - - for (vector<QuerySolution*>::const_iterator it = solutions.begin(); - bounds.size() == 0 && it != solutions.end(); it++) { - // Try next solution if we failed to generate index bounds, i.e. bounds.size() == 0 - bounds = collapseQuerySolution((*it)->root.get()); - } - - if (bounds.size() == 0) { - // We cannot plan the query without collection scan, so target to all shards. - IndexBoundsBuilder::allValuesBounds(key, &bounds); // [minKey, maxKey] - } - return bounds; - } - - IndexBounds ChunkManager::collapseQuerySolution( const QuerySolutionNode* node ) { - if (node->children.size() == 0) { - invariant(node->getType() == STAGE_IXSCAN); - - const IndexScanNode* ixNode = static_cast<const IndexScanNode*>( node ); - return ixNode->bounds; - } - - if (node->children.size() == 1) { - // e.g. FETCH -> IXSCAN - return collapseQuerySolution( node->children.front() ); - } - - // children.size() > 1, assert it's OR / SORT_MERGE. - if ( node->getType() != STAGE_OR && node->getType() != STAGE_SORT_MERGE ) { - // Unexpected node. We should never reach here. - error() << "could not generate index bounds on query solution tree: " << node->toString(); - dassert(false); // We'd like to know this error in testing. - - // Bail out with all shards in production, since this isn't a fatal error. 
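For OR and SORT_MERGE nodes, collapseQuerySolution merges the children's bounds field by field and then unions the accumulated intervals (IndexBoundsBuilder::unionize). A much-simplified sketch of that union step over plain closed integer intervals, not the real IndexBounds types:

// --- sketch (illustrative only, not part of the patch) ---
#include <algorithm>
#include <iostream>
#include <utility>
#include <vector>

typedef std::pair<int, int> Interval;   // [first, second], closed

std::vector<Interval> unionize(std::vector<Interval> intervals) {
    std::sort(intervals.begin(), intervals.end());

    std::vector<Interval> merged;
    for (size_t i = 0; i < intervals.size(); i++) {
        if (!merged.empty() && intervals[i].first <= merged.back().second) {
            // Overlapping or touching the previous interval: extend it.
            merged.back().second = std::max(merged.back().second, intervals[i].second);
        }
        else {
            merged.push_back(intervals[i]);
        }
    }
    return merged;
}

int main() {
    // Bounds contributed by the children of an OR node for one shard key field.
    std::vector<Interval> bounds;
    bounds.push_back(Interval(0, 1));
    bounds.push_back(Interval(2, 3));
    bounds.push_back(Interval(1, 2));   // bridges the two intervals above

    std::vector<Interval> merged = unionize(bounds);
    for (size_t i = 0; i < merged.size(); i++) {
        std::cout << "[" << merged[i].first << ", " << merged[i].second << "]" << std::endl;
    }
    // Prints a single interval: [0, 3]
    return 0;
}
// --- end sketch ---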
- return IndexBounds(); - } - - IndexBounds bounds; - for ( vector<QuerySolutionNode*>::const_iterator it = node->children.begin(); - it != node->children.end(); it++ ) - { - // The first branch under OR - if ( it == node->children.begin() ) { - invariant(bounds.size() == 0); - bounds = collapseQuerySolution( *it ); - if (bounds.size() == 0) { // Got unexpected node in query solution tree - return IndexBounds(); - } - continue; - } - - IndexBounds childBounds = collapseQuerySolution( *it ); - if (childBounds.size() == 0) { // Got unexpected node in query solution tree - return IndexBounds(); - } - - invariant(childBounds.size() == bounds.size()); - for ( size_t i = 0; i < bounds.size(); i++ ) { - bounds.fields[i].intervals.insert( bounds.fields[i].intervals.end(), - childBounds.fields[i].intervals.begin(), - childBounds.fields[i].intervals.end() ); - } - } - - for ( size_t i = 0; i < bounds.size(); i++ ) { - IndexBoundsBuilder::unionize( &bounds.fields[i] ); - } - - return bounds; - } - - bool ChunkManager::compatibleWith(const ChunkManager& other, const string& shardName) const { - // Return true if the shard version is the same in the two chunk managers - // TODO: This doesn't need to be so strong, just major vs - return other.getVersion(shardName).equals(getVersion(shardName)); - } - - void ChunkManager::drop( ChunkManagerPtr me ) const { - scoped_lock lk( _mutex ); - - configServer.logChange( "dropCollection.start" , _ns , BSONObj() ); - - DistributedLock nsLock( ConnectionString( configServer.modelServer(), - ConnectionString::SYNC ), - _ns ); - - dist_lock_try dlk; - try{ - dlk = dist_lock_try( &nsLock , "drop" ); - } - catch( LockException& e ){ - uassert( 14022, str::stream() << "Error locking distributed lock for chunk drop." << causedBy( e ), false); - } - - uassert( 13331 , "collection's metadata is undergoing changes. Please try again." 
, dlk.got() ); - - uassert(10174, "config servers not all up", configServer.allUp(false)); - - set<Shard> seen; - - LOG(1) << "ChunkManager::drop : " << _ns << endl; - - // lock all shards so no one can do a split/migrate - for ( ChunkMap::const_iterator i=_chunkMap.begin(); i!=_chunkMap.end(); ++i ) { - ChunkPtr c = i->second; - seen.insert( c->getShard() ); - } - - LOG(1) << "ChunkManager::drop : " << _ns << "\t all locked" << endl; - - map<string,BSONObj> errors; - // delete data from mongod - for ( set<Shard>::iterator i=seen.begin(); i!=seen.end(); i++ ) { - ScopedDbConnection conn(i->getConnString()); - BSONObj info; - if ( !conn->dropCollection( _ns, &info ) ) { - errors[ i->getConnString() ] = info; - } - conn.done(); - } - if ( !errors.empty() ) { - stringstream ss; - ss << "Dropping collection failed on the following hosts: "; - for ( map<string,BSONObj>::const_iterator it = errors.begin(); it != errors.end(); ) { - ss << it->first << ": " << it->second; - ++it; - if ( it != errors.end() ) { - ss << ", "; - } - } - uasserted( 16338, ss.str() ); - } - - LOG(1) << "ChunkManager::drop : " << _ns << "\t removed shard data" << endl; - - // remove chunk data - Status result = clusterDelete( ChunkType::ConfigNS, - BSON(ChunkType::ns(_ns)), - 0 /* limit */, - NULL ); - - // Make sure we're dropped on the config - if ( !result.isOK() ) { - uasserted( 17001, str::stream() << "could not drop chunks for " << _ns - << ": " << result.reason() ); - } - - LOG(1) << "ChunkManager::drop : " << _ns << "\t removed chunk data" << endl; - - for ( set<Shard>::iterator i=seen.begin(); i!=seen.end(); i++ ) { - ScopedDbConnection conn(i->getConnString()); - BSONObj res; - - // this is horrible - // we need a special command for dropping on the d side - // this hack works for the moment - - if (!setShardVersion(conn.conn(), - _ns, - configServer.modelServer(), - ChunkVersion(0, 0, OID()), - NULL, - true, - res)) { - - uasserted(8071, str::stream() << "cleaning up after drop failed: " << res); - } - - conn->simpleCommand( "admin", 0, "unsetSharding" ); - conn.done(); - } - - LOG(1) << "ChunkManager::drop : " << _ns << "\t DONE" << endl; - configServer.logChange( "dropCollection" , _ns , BSONObj() ); - } - - ChunkVersion ChunkManager::getVersion(const std::string& shardName) const { - ShardVersionMap::const_iterator i = _shardVersions.find(shardName); - if ( i == _shardVersions.end() ) { - // Shards without explicitly tracked shard versions (meaning they have - // no chunks) always have a version of (0, 0, epoch). Note this is - // *different* from the dropped chunk version of (0, 0, OID(000...)). - // See s/chunk_version.h. 
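The comment above distinguishes two zero versions: a shard that owns no chunks reports (0, 0, <collection epoch>), while (0, 0, <null OID>) marks a dropped collection. A small sketch of that lookup-with-default behaviour, using illustrative stand-in types rather than ChunkVersion:

// --- sketch (illustrative only, not part of the patch) ---
#include <cassert>
#include <map>
#include <string>

struct Version {
    int majorVersion;
    int minorVersion;
    std::string epoch;   // stands in for the OID epoch
};

int main() {
    const std::string collectionEpoch = "epoch-A";   // placeholder value

    std::map<std::string, Version> shardVersions;
    Version owned;
    owned.majorVersion = 3;
    owned.minorVersion = 1;
    owned.epoch = collectionEpoch;
    shardVersions["shard0000"] = owned;

    // A shard with no tracked chunks falls back to (0, 0, epoch) -- not the
    // (0, 0, null) sentinel used after a drop.
    std::map<std::string, Version>::const_iterator it = shardVersions.find("shard0001");
    Version v;
    if (it == shardVersions.end()) {
        v.majorVersion = 0;
        v.minorVersion = 0;
        v.epoch = collectionEpoch;
    }
    else {
        v = it->second;
    }

    assert(v.majorVersion == 0 && v.epoch == collectionEpoch);
    return 0;
}
// --- end sketch ---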
- return ChunkVersion( 0, 0, _version.epoch() ); - } - return i->second; - } - - ChunkVersion ChunkManager::getVersion() const { - return _version; - } - - void ChunkManager::getInfo( BSONObjBuilder& b ) const { - b.append(CollectionType::keyPattern(), _keyPattern.toBSON()); - b.appendBool(CollectionType::unique(), _unique); - _version.addEpochToBSON(b, CollectionType::DEPRECATED_lastmod()); - } - - string ChunkManager::toString() const { - stringstream ss; - ss << "ChunkManager: " << _ns << " key:" << _keyPattern.toString() << '\n'; - for ( ChunkMap::const_iterator i=_chunkMap.begin(); i!=_chunkMap.end(); ++i ) { - const ChunkPtr c = i->second; - ss << "\t" << c->toString() << '\n'; - } - return ss.str(); - } - - void ChunkRangeManager::assertValid() const { - if (_ranges.empty()) - return; - - try { - // No Nulls - for (ChunkRangeMap::const_iterator it=_ranges.begin(), end=_ranges.end(); it != end; ++it) { - verify(it->second); - } - - // Check endpoints - verify(allOfType(MinKey, _ranges.begin()->second->getMin())); - verify(allOfType(MaxKey, boost::prior(_ranges.end())->second->getMax())); - - // Make sure there are no gaps or overlaps - for (ChunkRangeMap::const_iterator it=boost::next(_ranges.begin()), end=_ranges.end(); it != end; ++it) { - ChunkRangeMap::const_iterator last = boost::prior(it); - verify(it->second->getMin() == last->second->getMax()); - } - - // Check Map keys - for (ChunkRangeMap::const_iterator it=_ranges.begin(), end=_ranges.end(); it != end; ++it) { - verify(it->first == it->second->getMax()); - } - - // Make sure we match the original chunks - const ChunkMap chunks = _ranges.begin()->second->getManager()->_chunkMap; - for ( ChunkMap::const_iterator i=chunks.begin(); i!=chunks.end(); ++i ) { - const ChunkPtr chunk = i->second; - - ChunkRangeMap::const_iterator min = _ranges.upper_bound(chunk->getMin()); - ChunkRangeMap::const_iterator max = _ranges.lower_bound(chunk->getMax()); - - verify(min != _ranges.end()); - verify(max != _ranges.end()); - verify(min == max); - verify(min->second->getShard() == chunk->getShard()); - verify(min->second->containsKey( chunk->getMin() )); - verify(min->second->containsKey( chunk->getMax() ) || (min->second->getMax() == chunk->getMax())); - } - - } - catch (...) { - error() << "\t invalid ChunkRangeMap! 
printing ranges:" << endl; - - for (ChunkRangeMap::const_iterator it=_ranges.begin(), end=_ranges.end(); it != end; ++it) - cout << it->first << ": " << *it->second << endl; - - throw; - } - } - - void ChunkRangeManager::reloadAll(const ChunkMap& chunks) { - _ranges.clear(); - _insertRange(chunks.begin(), chunks.end()); - - DEV assertValid(); - } - - void ChunkRangeManager::_insertRange(ChunkMap::const_iterator begin, const ChunkMap::const_iterator end) { - while (begin != end) { - ChunkMap::const_iterator first = begin; - Shard shard = first->second->getShard(); - while (begin != end && (begin->second->getShard() == shard)) - ++begin; - - shared_ptr<ChunkRange> cr (new ChunkRange(first, begin)); - _ranges[cr->getMax()] = cr; - } - } - - int ChunkManager::getCurrentDesiredChunkSize() const { - // split faster in early chunks helps spread out an initial load better - const int minChunkSize = 1 << 20; // 1 MBytes - - int splitThreshold = Chunk::MaxChunkSize; - - int nc = numChunks(); - - if ( nc <= 1 ) { - return 1024; - } - else if ( nc < 3 ) { - return minChunkSize / 2; - } - else if ( nc < 10 ) { - splitThreshold = max( splitThreshold / 4 , minChunkSize ); - } - else if ( nc < 20 ) { - splitThreshold = max( splitThreshold / 2 , minChunkSize ); - } - - return splitThreshold; - } - } // namespace mongo diff --git a/src/mongo/s/chunk.h b/src/mongo/s/chunk.h index a8101136160..cead91be27b 100644 --- a/src/mongo/s/chunk.h +++ b/src/mongo/s/chunk.h @@ -30,12 +30,9 @@ #pragma once -#include <boost/next_prior.hpp> #include <boost/shared_ptr.hpp> -#include "mongo/base/string_data.h" #include "mongo/db/keypattern.h" -#include "mongo/db/query/query_solution.h" #include "mongo/platform/atomic_word.h" #include "mongo/s/chunk_version.h" #include "mongo/s/shard.h" @@ -46,21 +43,9 @@ namespace mongo { - class DBConfig; - class Chunk; - class ChunkRange; class ChunkManager; - class ChunkObjUnitTest; struct WriteConcernOptions; - typedef boost::shared_ptr<const Chunk> ChunkPtr; - - // key is max for each Chunk or ChunkRange - typedef std::map<BSONObj,ChunkPtr,BSONObjCmp> ChunkMap; - typedef std::map<BSONObj,boost::shared_ptr<ChunkRange>,BSONObjCmp> ChunkRangeMap; - - typedef boost::shared_ptr<ChunkManager> ChunkManagerPtr; - /** config.chunks { ns : "alleyinsider.fs.chunks" , min : {} , max : {} , server : "localhost:30001" } @@ -105,10 +90,6 @@ namespace mongo { const BSONObj& getMin() const { return _min; } const BSONObj& getMax() const { return _max; } - // if min/max key is pos/neg infinity - bool minIsInf() const; - bool maxIsInf() const; - // Returns true if this chunk contains the given shard key, and false otherwise // // Note: this function takes an extracted *key*, not an original document @@ -259,10 +240,12 @@ namespace mongo { private: + // if min/max key is pos/neg infinity + bool _minIsInf() const; + bool _maxIsInf() const; - // main shard info - - const ChunkManager * _manager; + // The chunk manager, which owns this chunk. Not owned by the chunk. 
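The new comment above spells out the ownership direction: the ChunkManager owns its chunks, and each chunk only keeps a non-owning back-pointer to the manager that created it. A compact sketch of that arrangement using boost::shared_ptr, with illustrative class names rather than the mongos types:

// --- sketch (illustrative only, not part of the patch) ---
#include <boost/shared_ptr.hpp>
#include <iostream>
#include <string>
#include <vector>

class Manager;

class Item {
public:
    Item(const Manager* manager, const std::string& name)
        : _manager(manager), _name(name) {}

    const Manager* manager() const { return _manager; }   // non-owning back-pointer
    const std::string& name() const { return _name; }

private:
    const Manager* _manager;   // lifetime is controlled by Manager; never deleted here
    std::string _name;
};

class Manager {
public:
    void add(const std::string& name) {
        _items.push_back(boost::shared_ptr<Item>(new Item(this, name)));
    }

    size_t size() const { return _items.size(); }

private:
    std::vector<boost::shared_ptr<Item> > _items;   // owning references to the items
};

int main() {
    Manager m;
    m.add("chunk-a");
    m.add("chunk-b");
    std::cout << m.size() << " items owned by the manager" << std::endl;
    return 0;
}
// --- end sketch ---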
+ const ChunkManager* _manager; BSONObj _min; BSONObj _max; @@ -301,268 +284,6 @@ namespace mongo { static int mkDataWritten(); }; - class ChunkRange { - public: - const ChunkManager* getManager() const { return _manager; } - Shard getShard() const { return _shard; } - - const BSONObj& getMin() const { return _min; } - const BSONObj& getMax() const { return _max; } - - // clones of Chunk methods - // Returns true if this ChunkRange contains the given shard key, and false otherwise - // - // Note: this function takes an extracted *key*, not an original document - // (the point may be computed by, say, hashing a given field or projecting - // to a subset of fields). - bool containsKey( const BSONObj& shardKey ) const; - - ChunkRange(ChunkMap::const_iterator begin, const ChunkMap::const_iterator end) - : _manager(begin->second->getManager()) - , _shard(begin->second->getShard()) - , _min(begin->second->getMin()) - , _max(boost::prior(end)->second->getMax()) { - verify( begin != end ); - - DEV while (begin != end) { - verify(begin->second->getManager() == _manager); - verify(begin->second->getShard() == _shard); - ++begin; - } - } - - // Merge min and max (must be adjacent ranges) - ChunkRange(const ChunkRange& min, const ChunkRange& max) - : _manager(min.getManager()) - , _shard(min.getShard()) - , _min(min.getMin()) - , _max(max.getMax()) { - verify(min.getShard() == max.getShard()); - verify(min.getManager() == max.getManager()); - verify(min.getMax() == max.getMin()); - } - - friend std::ostream& operator<<(std::ostream& out, const ChunkRange& cr) { - return (out << "ChunkRange(min=" << cr._min << ", max=" << cr._max << ", shard=" << cr._shard <<")"); - } - - private: - const ChunkManager* _manager; - const Shard _shard; - const BSONObj _min; - const BSONObj _max; - }; - - - class ChunkRangeManager { - public: - const ChunkRangeMap& ranges() const { return _ranges; } - - void clear() { _ranges.clear(); } - - void reloadAll(const ChunkMap& chunks); - - // Slow operation -- wrap with DEV - void assertValid() const; - - ChunkRangeMap::const_iterator upper_bound(const BSONObj& o) const { return _ranges.upper_bound(o); } - ChunkRangeMap::const_iterator lower_bound(const BSONObj& o) const { return _ranges.lower_bound(o); } - - private: - // assumes nothing in this range exists in _ranges - void _insertRange(ChunkMap::const_iterator begin, const ChunkMap::const_iterator end); - - ChunkRangeMap _ranges; - }; - - /* config.sharding - { ns: 'alleyinsider.fs.chunks' , - key: { ts : 1 } , - shards: [ { min: 1, max: 100, server: a } , { min: 101, max: 200 , server : b } ] - } - */ - class ChunkManager { - public: - typedef std::map<std::string, ChunkVersion> ShardVersionMap; - - // Loads a new chunk manager from a collection document - ChunkManager( const BSONObj& collDoc ); - - // Creates an empty chunk manager for the namespace - ChunkManager( const std::string& ns, const ShardKeyPattern& pattern, bool unique ); - - std::string getns() const { return _ns; } - - const ShardKeyPattern& getShardKeyPattern() const { return _keyPattern; } - - bool isUnique() const { return _unique; } - - /** - * this is just an increasing number of how many ChunkManagers we have so we know if something has been updated - */ - unsigned long long getSequenceNumber() const { return _sequenceNumber; } - - // - // After constructor is invoked, we need to call loadExistingRanges. If this is a new - // sharded collection, we can call createFirstChunks first. 
- // - - // Creates new chunks based on info in chunk manager - void createFirstChunks( const std::string& config, - const Shard& primary, - const std::vector<BSONObj>* initPoints, - const std::vector<Shard>* initShards ); - - // Loads existing ranges based on info in chunk manager - void loadExistingRanges(const std::string& config, const ChunkManager* oldManager); - - - // Helpers for load - void calcInitSplitsAndShards( const Shard& primary, - const std::vector<BSONObj>* initPoints, - const std::vector<Shard>* initShards, - std::vector<BSONObj>* splitPoints, - std::vector<Shard>* shards ) const; - - // - // Methods to use once loaded / created - // - - int numChunks() const { return _chunkMap.size(); } - - /** - * Given a key that has been extracted from a document, returns the - * chunk that contains that key. - * - * For instance, to locate the chunk for document {a : "foo" , b : "bar"} - * when the shard key is {a : "hashed"}, you can call - * findIntersectingChunk() on {a : hash("foo") } - */ - ChunkPtr findIntersectingChunk( const BSONObj& shardKey ) const; - - void getShardsForQuery( std::set<Shard>& shards , const BSONObj& query ) const; - void getAllShards( std::set<Shard>& all ) const; - /** @param shards set to the shards covered by the interval [min, max], see SERVER-4791 */ - void getShardsForRange( std::set<Shard>& shards, const BSONObj& min, const BSONObj& max ) const; - - // Transforms query into bounds for each field in the shard key - // for example : - // Key { a: 1, b: 1 }, - // Query { a : { $gte : 1, $lt : 2 }, - // b : { $gte : 3, $lt : 4 } } - // => Bounds { a : [1, 2), b : [3, 4) } - static IndexBounds getIndexBoundsForQuery(const BSONObj& key, const CanonicalQuery* canonicalQuery); - - // Collapse query solution tree. - // - // If it has OR node, the result could be a superset of the index bounds generated. - // Since to give a single IndexBounds, this gives the union of bounds on each field. - // for example: - // OR: { a: (0, 1), b: (0, 1) }, - // { a: (2, 3), b: (2, 3) } - // => { a: (0, 1), (2, 3), b: (0, 1), (2, 3) } - static IndexBounds collapseQuerySolution( const QuerySolutionNode* node ); - - const ChunkMap& getChunkMap() const { return _chunkMap; } - - /** - * Returns true if, for this shard, the chunks are identical in both chunk managers - */ - bool compatibleWith(const ChunkManager& other, const std::string& shard) const; - - std::string toString() const; - - ChunkVersion getVersion(const std::string& shardName) const; - ChunkVersion getVersion() const; - - void getInfo( BSONObjBuilder& b ) const; - - /** - * @param me - so i don't get deleted before i'm done - */ - void drop( ChunkManagerPtr me ) const; - - void _printChunks() const; - - int getCurrentDesiredChunkSize() const; - - ChunkManagerPtr reload(bool force=true) const; // doesn't modify self! 
- - void markMinorForReload( ChunkVersion majorVersion ) const; - void getMarkedMinorVersions( std::set<ChunkVersion>& minorVersions ) const; - - private: - - // helpers for loading - - // returns true if load was consistent - bool _load(const std::string& config, - ChunkMap& chunks, - std::set<Shard>& shards, - ShardVersionMap& shardVersions, - const ChunkManager* oldManager); - static bool _isValid(const ChunkMap& chunks); - - // end helpers - - // All members should be const for thread-safety - const std::string _ns; - const ShardKeyPattern _keyPattern; - const bool _unique; - - const ChunkMap _chunkMap; - const ChunkRangeManager _chunkRanges; - - const std::set<Shard> _shards; - - const ShardVersionMap _shardVersions; // max version per shard - - // max version of any chunk - ChunkVersion _version; - - mutable mutex _mutex; // only used with _nsLock - - const unsigned long long _sequenceNumber; - - // - // Split Heuristic info - // - - - class SplitHeuristics { - public: - - SplitHeuristics() - : _splitTickets(maxParallelSplits) { - } - - TicketHolder _splitTickets; - - // Test whether we should split once data * splitTestFactor > chunkSize (approximately) - static const int splitTestFactor = 5; - // Maximum number of parallel threads requesting a split - static const int maxParallelSplits = 5; - - // The idea here is that we're over-aggressive on split testing by a factor of - // splitTestFactor, so we can safely wait until we get to splitTestFactor invalid splits - // before changing. Unfortunately, we also potentially over-request the splits by a - // factor of maxParallelSplits, but since the factors are identical it works out - // (for now) for parallel or sequential oversplitting. - // TODO: Make splitting a separate thread with notifications? - static const int staleMinorReloadThreshold = maxParallelSplits; - }; - - mutable SplitHeuristics _splitHeuristics; - - // - // End split heuristics - // - - friend class Chunk; - friend class ChunkRangeManager; // only needed for CRM::assertValid() - static AtomicUInt32 NextSequenceNumber; - - friend class TestableChunkManager; - }; + typedef boost::shared_ptr<const Chunk> ChunkPtr; } // namespace mongo diff --git a/src/mongo/s/chunk_manager.cpp b/src/mongo/s/chunk_manager.cpp new file mode 100644 index 00000000000..0270fbe2ba6 --- /dev/null +++ b/src/mongo/s/chunk_manager.cpp @@ -0,0 +1,931 @@ +/** + * Copyright (C) 2015 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. 
If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding + +#include "mongo/platform/basic.h" + +#include "mongo/s/chunk_manager.h" + +#include <map> +#include <set> + +#include "mongo/db/query/index_bounds_builder.h" +#include "mongo/db/query/query_planner.h" +#include "mongo/db/query/query_planner_common.h" +#include "mongo/s/chunk_diff.h" +#include "mongo/s/client/shard_connection.h" +#include "mongo/s/cluster_write.h" +#include "mongo/s/distlock.h" +#include "mongo/s/grid.h" +#include "mongo/s/type_collection.h" +#include "mongo/util/log.h" +#include "mongo/util/timer.h" + +namespace mongo { + + using boost::shared_ptr; + + using std::make_pair; + using std::map; + using std::max; + using std::pair; + using std::set; + using std::string; + using std::vector; + +namespace { + + /** + * This is an adapter so we can use config diffs - mongos and mongod do them slightly + * differently + * + * The mongos adapter here tracks all shards, and stores ranges by (max, Chunk) in the map. + */ + class CMConfigDiffTracker : public ConfigDiffTracker<ChunkPtr, std::string> { + public: + CMConfigDiffTracker( ChunkManager* manager ) : _manager( manager ) { } + + virtual bool isTracked( const BSONObj& chunkDoc ) const { + // Mongos tracks all shards + return true; + } + + virtual BSONObj minFrom( const ChunkPtr& val ) const { + return val.get()->getMin(); + } + + virtual bool isMinKeyIndexed() const { return false; } + + virtual pair<BSONObj,ChunkPtr> rangeFor( const BSONObj& chunkDoc, const BSONObj& min, const BSONObj& max ) const { + ChunkPtr c( new Chunk( _manager, chunkDoc ) ); + return make_pair( max, c ); + } + + virtual string shardFor(const string& hostName) const { + Shard shard = Shard::make(hostName); + return shard.getName(); + } + + private: + ChunkManager* _manager; + }; + + + bool allOfType(BSONType type, const BSONObj& o) { + BSONObjIterator it(o); + while(it.more()) { + if (it.next().type() != type) { + return false; + } + } + return true; + } + +} // namespace + + AtomicUInt32 ChunkManager::NextSequenceNumber(1U); + + ChunkManager::ChunkManager( const string& ns, const ShardKeyPattern& pattern , bool unique ) : + _ns( ns ), + _keyPattern( pattern.getKeyPattern() ), + _unique( unique ), + _chunkRanges(), + _mutex("ChunkManager"), + _sequenceNumber(NextSequenceNumber.addAndFetch(1)) + { + // + // Sets up a chunk manager from new data + // + } + + ChunkManager::ChunkManager( const BSONObj& collDoc ) : + // Need the ns early, to construct the lock + // TODO: Construct lock on demand? Not sure why we need to keep it around + _ns(collDoc[CollectionType::ns()].type() == String ? + collDoc[CollectionType::ns()].String() : + ""), + _keyPattern(collDoc[CollectionType::keyPattern()].type() == Object ? + collDoc[CollectionType::keyPattern()].Obj().getOwned() : + BSONObj()), + _unique(collDoc[CollectionType::unique()].trueValue()), + _chunkRanges(), + _mutex("ChunkManager"), + // The shard versioning mechanism hinges on keeping track of the number of times we reloaded ChunkManager's. 
+ // Increasing this number here will prompt checkShardVersion() to refresh the connection-level versions to + // the most up to date value. + _sequenceNumber(NextSequenceNumber.addAndFetch(1)) + { + + // + // Sets up a chunk manager from an existing sharded collection document + // + + verify( _ns != "" ); + verify( ! _keyPattern.toBSON().isEmpty() ); + + _version = ChunkVersion::fromBSON( collDoc ); + } + + void ChunkManager::loadExistingRanges( const string& config, const ChunkManager* oldManager ) { + int tries = 3; + while (tries--) { + ChunkMap chunkMap; + set<Shard> shards; + ShardVersionMap shardVersions; + Timer t; + + bool success = _load(config, chunkMap, shards, shardVersions, oldManager); + + if( success ){ + { + int ms = t.millis(); + log() << "ChunkManager: time to load chunks for " << _ns << ": " << ms << "ms" + << " sequenceNumber: " << _sequenceNumber + << " version: " << _version.toString() + << " based on: " << + (oldManager ? oldManager->getVersion().toString() : "(empty)");; + } + + // TODO: Merge into diff code above, so we validate in one place + if (_isValid(chunkMap)) { + // These variables are const for thread-safety. Since the + // constructor can only be called from one thread, we don't have + // to worry about that here. + const_cast<ChunkMap&>(_chunkMap).swap(chunkMap); + const_cast<set<Shard>&>(_shards).swap(shards); + const_cast<ShardVersionMap&>(_shardVersions).swap(shardVersions); + const_cast<ChunkRangeManager&>(_chunkRanges).reloadAll(_chunkMap); + + return; + } + } + + if (_chunkMap.size() < 10) { + _printChunks(); + } + + warning() << "ChunkManager loaded an invalid config for " << _ns + << ", trying again"; + + sleepmillis(10 * (3-tries)); + } + + // this will abort construction so we should never have a reference to an invalid config + msgasserted(13282, "Couldn't load a valid config for " + _ns + " after 3 attempts. 
Please try again."); + } + + bool ChunkManager::_load(const string& config, + ChunkMap& chunkMap, + set<Shard>& shards, + ShardVersionMap& shardVersions, + const ChunkManager* oldManager) + { + + // Reset the max version, but not the epoch, when we aren't loading from the oldManager + _version = ChunkVersion( 0, 0, _version.epoch() ); + + // If we have a previous version of the ChunkManager to work from, use that info to reduce + // our config query + if( oldManager && oldManager->getVersion().isSet() ){ + + // Get the old max version + _version = oldManager->getVersion(); + // Load a copy of the old versions + shardVersions = oldManager->_shardVersions; + + // Load a copy of the chunk map, replacing the chunk manager with our own + const ChunkMap& oldChunkMap = oldManager->getChunkMap(); + + // Could be v.expensive + // TODO: If chunks were immutable and didn't reference the manager, we could do more + // interesting things here + for( ChunkMap::const_iterator it = oldChunkMap.begin(); it != oldChunkMap.end(); it++ ){ + + ChunkPtr oldC = it->second; + ChunkPtr c( new Chunk( this, oldC->getMin(), + oldC->getMax(), + oldC->getShard(), + oldC->getLastmod() ) ); + + c->setBytesWritten( oldC->getBytesWritten() ); + + chunkMap.insert( make_pair( oldC->getMax(), c ) ); + } + + LOG(2) << "loading chunk manager for collection " << _ns + << " using old chunk manager w/ version " << _version.toString() + << " and " << oldChunkMap.size() << " chunks"; + } + + // Attach a diff tracker for the versioned chunk data + CMConfigDiffTracker differ( this ); + differ.attach( _ns, chunkMap, _version, shardVersions ); + + // Diff tracker should *always* find at least one chunk if collection exists + int diffsApplied = differ.calculateConfigDiff(config); + if( diffsApplied > 0 ){ + + LOG(2) << "loaded " << diffsApplied << " chunks into new chunk manager for " << _ns + << " with version " << _version; + + // Add all the shards we find to the shards set + for( ShardVersionMap::iterator it = shardVersions.begin(); it != shardVersions.end(); it++ ){ + shards.insert( it->first ); + } + + return true; + } + else if( diffsApplied == 0 ){ + + // No chunks were found for the ns + warning() << "no chunks found when reloading " << _ns + << ", previous version was " << _version; + + // Set all our data to empty + chunkMap.clear(); + shardVersions.clear(); + _version = ChunkVersion( 0, 0, OID() ); + + return true; + } + else { // diffsApplied < 0 + + bool allInconsistent = differ.numValidDiffs() == 0; + + if( allInconsistent ){ + // All versions are different, this can be normal + warning() << "major change in chunk information found when reloading " + << _ns << ", previous version was " << _version; + } + else { + // Inconsistent load halfway through (due to yielding cursor during load) + // should be rare + warning() << "inconsistent chunks found when reloading " + << _ns << ", previous version was " << _version + << ", this should be rare"; + } + + // Set all our data to empty to be extra safe + chunkMap.clear(); + shardVersions.clear(); + _version = ChunkVersion( 0, 0, OID() ); + + return allInconsistent; + } + + } + + ChunkManagerPtr ChunkManager::reload(bool force) const { + return grid.getDBConfig(getns())->getChunkManager(getns(), force); + } + + bool ChunkManager::_isValid(const ChunkMap& chunkMap) { +#define ENSURE(x) do { if(!(x)) { log() << "ChunkManager::_isValid failed: " #x; return false; } } while(0) + + if (chunkMap.empty()) + return true; + + // Check endpoints + ENSURE(allOfType(MinKey, 
chunkMap.begin()->second->getMin())); + ENSURE(allOfType(MaxKey, boost::prior(chunkMap.end())->second->getMax())); + + // Make sure there are no gaps or overlaps + for (ChunkMap::const_iterator it=boost::next(chunkMap.begin()), end=chunkMap.end(); it != end; ++it) { + ChunkMap::const_iterator last = boost::prior(it); + + if (!(it->second->getMin() == last->second->getMax())) { + PRINT(last->second->toString()); + PRINT(it->second->toString()); + PRINT(it->second->getMin()); + PRINT(last->second->getMax()); + } + ENSURE(it->second->getMin() == last->second->getMax()); + } + + return true; + +#undef ENSURE + } + + void ChunkManager::_printChunks() const { + for (ChunkMap::const_iterator it=_chunkMap.begin(), end=_chunkMap.end(); it != end; ++it) { + log() << *it->second ; + } + } + + void ChunkManager::calcInitSplitsAndShards( const Shard& primary, + const vector<BSONObj>* initPoints, + const vector<Shard>* initShards, + vector<BSONObj>* splitPoints, + vector<Shard>* shards ) const + { + verify( _chunkMap.size() == 0 ); + + unsigned long long numObjects = 0; + Chunk c(this, _keyPattern.getKeyPattern().globalMin(), + _keyPattern.getKeyPattern().globalMax(), primary); + + if ( !initPoints || !initPoints->size() ) { + // discover split points + { + // get stats to see if there is any data + ScopedDbConnection shardConn(primary.getConnString()); + + numObjects = shardConn->count( getns() ); + shardConn.done(); + } + + if ( numObjects > 0 ) + c.pickSplitVector( *splitPoints , Chunk::MaxChunkSize ); + + // since docs alread exists, must use primary shard + shards->push_back( primary ); + } else { + // make sure points are unique and ordered + set<BSONObj> orderedPts; + for ( unsigned i = 0; i < initPoints->size(); ++i ) { + BSONObj pt = (*initPoints)[i]; + orderedPts.insert( pt ); + } + for ( set<BSONObj>::iterator it = orderedPts.begin(); it != orderedPts.end(); ++it ) { + splitPoints->push_back( *it ); + } + + if ( !initShards || !initShards->size() ) { + // If not specified, only use the primary shard (note that it's not safe for mongos + // to put initial chunks on other shards without the primary mongod knowing). + shards->push_back( primary ); + } else { + std::copy( initShards->begin() , initShards->end() , std::back_inserter(*shards) ); + } + } + } + + void ChunkManager::createFirstChunks( const string& config, + const Shard& primary, + const vector<BSONObj>* initPoints, + const vector<Shard>* initShards ) + { + // TODO distlock? + // TODO: Race condition if we shard the collection and insert data while we split across + // the non-primary shard. + + vector<BSONObj> splitPoints; + vector<Shard> shards; + + calcInitSplitsAndShards( primary, initPoints, initShards, + &splitPoints, &shards ); + + // this is the first chunk; start the versioning from scratch + ChunkVersion version; + version.incEpoch(); + version.incMajor(); + + log() << "going to create " << splitPoints.size() + 1 << " chunk(s) for: " << _ns + << " using new epoch " << version.epoch() ; + + ScopedDbConnection conn(config, 30); + + // Make sure we don't have any chunks that already exist here + unsigned long long existingChunks = + conn->count(ChunkType::ConfigNS, BSON(ChunkType::ns(_ns))); + + uassert( 13449 , str::stream() << "collection " << _ns << " already sharded with " + << existingChunks << " chunks", existingChunks == 0 ); + conn.done(); + + for ( unsigned i=0; i<=splitPoints.size(); i++ ) { + BSONObj min = i == 0 ? _keyPattern.getKeyPattern().globalMin() : splitPoints[i-1]; + BSONObj max = i < splitPoints.size() ? 
+ splitPoints[i] : _keyPattern.getKeyPattern().globalMax(); + + Chunk temp( this , min , max , shards[ i % shards.size() ], version ); + + BSONObjBuilder chunkBuilder; + temp.serialize( chunkBuilder ); + BSONObj chunkObj = chunkBuilder.obj(); + + Status result = clusterUpdate( ChunkType::ConfigNS, + BSON(ChunkType::name(temp.genID())), + chunkObj, + true, // upsert + false, // multi + NULL ); + + version.incMinor(); + + if ( !result.isOK() ) { + string ss = str::stream() << "creating first chunks failed. result: " + << result.reason(); + error() << ss ; + msgasserted( 15903 , ss ); + } + } + + _version = ChunkVersion( 0, 0, version.epoch() ); + } + + ChunkPtr ChunkManager::findIntersectingChunk( const BSONObj& shardKey ) const { + { + BSONObj chunkMin; + ChunkPtr chunk; + { + ChunkMap::const_iterator it = _chunkMap.upper_bound( shardKey ); + if (it != _chunkMap.end()) { + chunkMin = it->first; + chunk = it->second; + } + } + + if ( chunk ) { + if ( chunk->containsKey( shardKey ) ){ + return chunk; + } + + PRINT(chunkMin); + PRINT(*chunk); + PRINT( shardKey ); + + reload(); + massert(13141, "Chunk map pointed to incorrect chunk", false); + } + } + + msgasserted( 8070 , + str::stream() << "couldn't find a chunk intersecting: " << shardKey + << " for ns: " << _ns + << " at version: " << _version.toString() + << ", number of chunks: " << _chunkMap.size() ); + } + + void ChunkManager::getShardsForQuery( set<Shard>& shards , const BSONObj& query ) const { + CanonicalQuery* canonicalQuery = NULL; + Status status = CanonicalQuery::canonicalize( + _ns, + query, + &canonicalQuery, + WhereCallbackNoop()); + + boost::scoped_ptr<CanonicalQuery> canonicalQueryPtr(canonicalQuery); + + uassert(status.code(), status.reason(), status.isOK()); + + // Query validation + if (QueryPlannerCommon::hasNode(canonicalQuery->root(), MatchExpression::GEO_NEAR)) { + uassert(13501, "use geoNear command rather than $near query", false); + } + + // Transforms query into bounds for each field in the shard key + // for example : + // Key { a: 1, b: 1 }, + // Query { a : { $gte : 1, $lt : 2 }, + // b : { $gte : 3, $lt : 4 } } + // => Bounds { a : [1, 2), b : [3, 4) } + IndexBounds bounds = getIndexBoundsForQuery(_keyPattern.toBSON(), canonicalQuery); + + // Transforms bounds for each shard key field into full shard key ranges + // for example : + // Key { a : 1, b : 1 } + // Bounds { a : [1, 2), b : [3, 4) } + // => Ranges { a : 1, b : 3 } => { a : 2, b : 4 } + BoundList ranges = _keyPattern.flattenBounds(bounds); + + for (BoundList::const_iterator it = ranges.begin(); it != ranges.end(); + ++it) { + + getShardsForRange(shards, it->first /*min*/, it->second /*max*/); + + // once we know we need to visit all shards no need to keep looping + if( shards.size() == _shards.size() ) break; + } + + // SERVER-4914 Some clients of getShardsForQuery() assume at least one shard will be + // returned. For now, we satisfy that assumption by adding a shard with no matches rather + // than return an empty set of shards. 
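A note for readers following the lookups in this file: both ChunkMap and ChunkRangeMap are keyed by each chunk's exclusive max key (see the typedef comments in chunk_manager.h later in this diff), which is why a single upper_bound() call finds the candidate chunk in findIntersectingChunk() and the first covering chunk in getShardsForRange(). Below is a minimal self-contained sketch of that convention, with plain ints standing in for BSON shard keys and strings for shards; all names are illustrative, not MongoDB's API.

    #include <cassert>
    #include <iostream>
    #include <map>
    #include <string>

    // A chunk owns the half-open key range [min, max) and lives on one shard.
    struct ChunkSketch {
        int min;
        int max;
        std::string shard;
        bool containsKey(int key) const { return key >= min && key < max; }
    };

    // Keyed by the chunk's exclusive max, mirroring the ChunkMap convention.
    typedef std::map<int, ChunkSketch> ChunkMapSketch;

    // Find the chunk owning `key`: the first chunk whose max is strictly greater
    // than the key is the only candidate, as in findIntersectingChunk().
    const ChunkSketch* findIntersecting(const ChunkMapSketch& chunks, int key) {
        ChunkMapSketch::const_iterator it = chunks.upper_bound(key);
        if (it != chunks.end() && it->second.containsKey(key)) {
            return &it->second;
        }
        return 0;  // no chunk covers this key (map is empty or has a gap)
    }

    int main() {
        ChunkMapSketch chunks;
        chunks[10] = ChunkSketch{0, 10, "shard0000"};   // [0, 10)  -> shard0000
        chunks[20] = ChunkSketch{10, 20, "shard0001"};  // [10, 20) -> shard0001

        assert(findIntersecting(chunks, 5)->shard == "shard0000");
        assert(findIntersecting(chunks, 10)->shard == "shard0001");
        assert(findIntersecting(chunks, 25) == 0);
        std::cout << "key 15 lives on " << findIntersecting(chunks, 15)->shard << "\n";
        return 0;
    }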
+ if ( shards.empty() ) { + massert( 16068, "no chunk ranges available", !_chunkRanges.ranges().empty() ); + shards.insert( _chunkRanges.ranges().begin()->second->getShard() ); + } + } + + void ChunkManager::getShardsForRange( set<Shard>& shards, + const BSONObj& min, + const BSONObj& max ) const { + + ChunkRangeMap::const_iterator it = _chunkRanges.upper_bound(min); + ChunkRangeMap::const_iterator end = _chunkRanges.upper_bound(max); + + massert( 13507 , str::stream() << "no chunks found between bounds " << min << " and " << max , it != _chunkRanges.ranges().end() ); + + if( end != _chunkRanges.ranges().end() ) ++end; + + for( ; it != end; ++it ){ + shards.insert(it->second->getShard()); + + // once we know we need to visit all shards no need to keep looping + if (shards.size() == _shards.size()) break; + } + } + + void ChunkManager::getAllShards( set<Shard>& all ) const { + all.insert(_shards.begin(), _shards.end()); + } + + IndexBounds ChunkManager::getIndexBoundsForQuery(const BSONObj& key, const CanonicalQuery* canonicalQuery) { + // $text is not allowed in planning since we don't have text index on mongos. + // + // TODO: Treat $text query as a no-op in planning. So with shard key {a: 1}, + // the query { a: 2, $text: { ... } } will only target to {a: 2}. + if (QueryPlannerCommon::hasNode(canonicalQuery->root(), MatchExpression::TEXT)) { + IndexBounds bounds; + IndexBoundsBuilder::allValuesBounds(key, &bounds); // [minKey, maxKey] + return bounds; + } + + // Consider shard key as an index + string accessMethod = IndexNames::findPluginName(key); + dassert(accessMethod == IndexNames::BTREE || accessMethod == IndexNames::HASHED); + + // Use query framework to generate index bounds + QueryPlannerParams plannerParams; + // Must use "shard key" index + plannerParams.options = QueryPlannerParams::NO_TABLE_SCAN; + IndexEntry indexEntry(key, accessMethod, false /* multiKey */, false /* sparse */, + false /* unique */, "shardkey", BSONObj()); + plannerParams.indices.push_back(indexEntry); + + OwnedPointerVector<QuerySolution> solutions; + Status status = QueryPlanner::plan(*canonicalQuery, plannerParams, &solutions.mutableVector()); + uassert(status.code(), status.reason(), status.isOK()); + + IndexBounds bounds; + + for (vector<QuerySolution*>::const_iterator it = solutions.begin(); + bounds.size() == 0 && it != solutions.end(); it++) { + // Try next solution if we failed to generate index bounds, i.e. bounds.size() == 0 + bounds = collapseQuerySolution((*it)->root.get()); + } + + if (bounds.size() == 0) { + // We cannot plan the query without collection scan, so target to all shards. + IndexBoundsBuilder::allValuesBounds(key, &bounds); // [minKey, maxKey] + } + return bounds; + } + + IndexBounds ChunkManager::collapseQuerySolution( const QuerySolutionNode* node ) { + if (node->children.size() == 0) { + invariant(node->getType() == STAGE_IXSCAN); + + const IndexScanNode* ixNode = static_cast<const IndexScanNode*>( node ); + return ixNode->bounds; + } + + if (node->children.size() == 1) { + // e.g. FETCH -> IXSCAN + return collapseQuerySolution( node->children.front() ); + } + + // children.size() > 1, assert it's OR / SORT_MERGE. + if ( node->getType() != STAGE_OR && node->getType() != STAGE_SORT_MERGE ) { + // Unexpected node. We should never reach here. + error() << "could not generate index bounds on query solution tree: " << node->toString(); + dassert(false); // We'd like to know this error in testing. + + // Bail out with all shards in production, since this isn't a fatal error. 
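The OR handling that follows unions the child branches' bounds field by field and then coalesces overlapping intervals into a single bounds object. Here is a small standalone sketch of that idea using simple integer intervals rather than the real IndexBounds/Interval types; the names are illustrative, and the sketch assumes both branches cover the same shard key fields.

    #include <algorithm>
    #include <iostream>
    #include <utility>
    #include <vector>

    typedef std::pair<int, int> Interval;              // closed interval [first, second]
    typedef std::vector<Interval> FieldIntervals;      // intervals for one shard key field
    typedef std::vector<FieldIntervals> BoundsSketch;  // one entry per shard key field

    // Union the bounds of two OR branches field by field, then coalesce overlaps,
    // mirroring what collapseQuerySolution() does for STAGE_OR / STAGE_SORT_MERGE.
    // Assumes a and b describe the same fields in the same order.
    BoundsSketch unionBranches(BoundsSketch a, const BoundsSketch& b) {
        for (size_t field = 0; field < a.size(); ++field) {
            a[field].insert(a[field].end(), b[field].begin(), b[field].end());
            std::sort(a[field].begin(), a[field].end());

            FieldIntervals merged;
            for (size_t i = 0; i < a[field].size(); ++i) {
                if (!merged.empty() && a[field][i].first <= merged.back().second) {
                    merged.back().second = std::max(merged.back().second, a[field][i].second);
                } else {
                    merged.push_back(a[field][i]);
                }
            }
            a[field] = merged;
        }
        return a;
    }

    int main() {
        // OR branch 1: { a: [0, 1], b: [0, 1] }   OR branch 2: { a: [2, 3], b: [1, 4] }
        BoundsSketch left, right;
        left.push_back(FieldIntervals(1, Interval(0, 1)));
        left.push_back(FieldIntervals(1, Interval(0, 1)));
        right.push_back(FieldIntervals(1, Interval(2, 3)));
        right.push_back(FieldIntervals(1, Interval(1, 4)));

        BoundsSketch all = unionBranches(left, right);
        // Field a keeps two disjoint intervals; field b collapses to a single [0, 4].
        for (size_t f = 0; f < all.size(); ++f) {
            for (size_t i = 0; i < all[f].size(); ++i) {
                std::cout << "field " << f << ": [" << all[f][i].first
                          << ", " << all[f][i].second << "]\n";
            }
        }
        return 0;
    }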
+ return IndexBounds(); + } + + IndexBounds bounds; + for ( vector<QuerySolutionNode*>::const_iterator it = node->children.begin(); + it != node->children.end(); it++ ) + { + // The first branch under OR + if ( it == node->children.begin() ) { + invariant(bounds.size() == 0); + bounds = collapseQuerySolution( *it ); + if (bounds.size() == 0) { // Got unexpected node in query solution tree + return IndexBounds(); + } + continue; + } + + IndexBounds childBounds = collapseQuerySolution( *it ); + if (childBounds.size() == 0) { // Got unexpected node in query solution tree + return IndexBounds(); + } + + invariant(childBounds.size() == bounds.size()); + for ( size_t i = 0; i < bounds.size(); i++ ) { + bounds.fields[i].intervals.insert( bounds.fields[i].intervals.end(), + childBounds.fields[i].intervals.begin(), + childBounds.fields[i].intervals.end() ); + } + } + + for ( size_t i = 0; i < bounds.size(); i++ ) { + IndexBoundsBuilder::unionize( &bounds.fields[i] ); + } + + return bounds; + } + + bool ChunkManager::compatibleWith(const ChunkManager& other, const string& shardName) const { + // Return true if the shard version is the same in the two chunk managers + // TODO: This doesn't need to be so strong, just major vs + return other.getVersion(shardName).equals(getVersion(shardName)); + } + + void ChunkManager::drop() const { + scoped_lock lk( _mutex ); + + configServer.logChange( "dropCollection.start" , _ns , BSONObj() ); + + DistributedLock nsLock( ConnectionString( configServer.modelServer(), + ConnectionString::SYNC ), + _ns ); + + dist_lock_try dlk; + try{ + dlk = dist_lock_try( &nsLock , "drop" ); + } + catch( LockException& e ){ + uassert( 14022, str::stream() << "Error locking distributed lock for chunk drop." << causedBy( e ), false); + } + + uassert( 13331 , "collection's metadata is undergoing changes. Please try again." 
, dlk.got() ); + + uassert(10174, "config servers not all up", configServer.allUp(false)); + + set<Shard> seen; + + LOG(1) << "ChunkManager::drop : " << _ns ; + + // lock all shards so no one can do a split/migrate + for ( ChunkMap::const_iterator i=_chunkMap.begin(); i!=_chunkMap.end(); ++i ) { + ChunkPtr c = i->second; + seen.insert( c->getShard() ); + } + + LOG(1) << "ChunkManager::drop : " << _ns << "\t all locked"; + + map<string,BSONObj> errors; + // delete data from mongod + for ( set<Shard>::iterator i=seen.begin(); i!=seen.end(); i++ ) { + ScopedDbConnection conn(i->getConnString()); + BSONObj info; + if ( !conn->dropCollection( _ns, &info ) ) { + errors[ i->getConnString() ] = info; + } + conn.done(); + } + if ( !errors.empty() ) { + StringBuilder sb; + sb << "Dropping collection failed on the following hosts: "; + + for (map<string, BSONObj>::const_iterator it = errors.begin(); it != errors.end();) { + sb << it->first << ": " << it->second; + ++it; + if (it != errors.end()) { + sb << ", "; + } + } + + uasserted(16338, sb.str()); + } + + LOG(1) << "ChunkManager::drop : " << _ns << "\t removed shard data"; + + // remove chunk data + Status result = clusterDelete( ChunkType::ConfigNS, + BSON(ChunkType::ns(_ns)), + 0 /* limit */, + NULL ); + + // Make sure we're dropped on the config + if ( !result.isOK() ) { + uasserted( 17001, str::stream() << "could not drop chunks for " << _ns + << ": " << result.reason() ); + } + + LOG(1) << "ChunkManager::drop : " << _ns << "\t removed chunk data"; + + for ( set<Shard>::iterator i=seen.begin(); i!=seen.end(); i++ ) { + ScopedDbConnection conn(i->getConnString()); + BSONObj res; + + // this is horrible + // we need a special command for dropping on the d side + // this hack works for the moment + + if (!setShardVersion(conn.conn(), + _ns, + configServer.modelServer(), + ChunkVersion(0, 0, OID()), + NULL, + true, + res)) { + + uasserted(8071, str::stream() << "cleaning up after drop failed: " << res); + } + + conn->simpleCommand( "admin", 0, "unsetSharding" ); + conn.done(); + } + + LOG(1) << "ChunkManager::drop : " << _ns << "\t DONE"; + configServer.logChange( "dropCollection" , _ns , BSONObj() ); + } + + ChunkVersion ChunkManager::getVersion(const std::string& shardName) const { + ShardVersionMap::const_iterator i = _shardVersions.find(shardName); + if ( i == _shardVersions.end() ) { + // Shards without explicitly tracked shard versions (meaning they have + // no chunks) always have a version of (0, 0, epoch). Note this is + // *different* from the dropped chunk version of (0, 0, OID(000...)). + // See s/chunk_version.h. 
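To illustrate the comment above: a shard that owns no chunks reports (0, 0, collection epoch), which still matches the collection's epoch, while the dropped sentinel carries a null epoch and is therefore not comparable. A minimal stand-in sketch follows; these types are simplified illustrations, not the real ChunkVersion.

    #include <cassert>
    #include <map>
    #include <string>

    // Simplified stand-in for ChunkVersion: (major, minor, epoch).
    struct VersionSketch {
        unsigned major;
        unsigned minor;
        std::string epoch;  // empty string stands in for the null OID of a dropped collection

        bool hasEqualEpoch(const VersionSketch& other) const { return epoch == other.epoch; }
        bool isSet() const { return major != 0 || minor != 0; }
    };

    typedef std::map<std::string, VersionSketch> ShardVersionMapSketch;

    // Mirrors ChunkManager::getVersion(shardName): shards with no tracked chunks
    // default to (0, 0, collection epoch) rather than the dropped (0, 0, null) version.
    VersionSketch getShardVersion(const ShardVersionMapSketch& versions,
                                  const std::string& shardName,
                                  const std::string& collectionEpoch) {
        ShardVersionMapSketch::const_iterator it = versions.find(shardName);
        if (it == versions.end()) {
            VersionSketch v = {0, 0, collectionEpoch};
            return v;
        }
        return it->second;
    }

    int main() {
        const std::string epoch = "epochA";  // illustrative epoch value
        ShardVersionMapSketch versions;
        VersionSketch v = {3, 1, epoch};
        versions["shard0000"] = v;

        VersionSketch tracked = getShardVersion(versions, "shard0000", epoch);
        VersionSketch empty = getShardVersion(versions, "shard0001", epoch);

        assert(tracked.isSet());
        assert(!empty.isSet());
        assert(empty.hasEqualEpoch(tracked));  // same collection, just no chunks on shard0001

        VersionSketch dropped = {0, 0, ""};   // the dropped sentinel uses a null epoch
        assert(!dropped.hasEqualEpoch(tracked));
        return 0;
    }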
+ return ChunkVersion( 0, 0, _version.epoch() ); + } + return i->second; + } + + ChunkVersion ChunkManager::getVersion() const { + return _version; + } + + void ChunkManager::getInfo( BSONObjBuilder& b ) const { + b.append(CollectionType::keyPattern(), _keyPattern.toBSON()); + b.appendBool(CollectionType::unique(), _unique); + _version.addEpochToBSON(b, CollectionType::DEPRECATED_lastmod()); + } + + string ChunkManager::toString() const { + StringBuilder sb; + sb << "ChunkManager: " << _ns << " key:" << _keyPattern.toString() << '\n'; + + for (ChunkMap::const_iterator i = _chunkMap.begin(); i != _chunkMap.end(); ++i) { + sb << "\t" << i->second->toString() << '\n'; + } + + return sb.str(); + } + + + ChunkRange::ChunkRange(ChunkMap::const_iterator begin, const ChunkMap::const_iterator end) + : _manager(begin->second->getManager()), + _shard(begin->second->getShard()), + _min(begin->second->getMin()), + _max(boost::prior(end)->second->getMax()) { + + invariant(begin != end); + + DEV while (begin != end) { + dassert(begin->second->getManager() == _manager); + dassert(begin->second->getShard() == _shard); + ++begin; + } + } + + ChunkRange::ChunkRange(const ChunkRange& min, const ChunkRange& max) + : _manager(min.getManager()), + _shard(min.getShard()), + _min(min.getMin()), + _max(max.getMax()) { + + invariant(min.getShard() == max.getShard()); + invariant(min.getManager() == max.getManager()); + invariant(min.getMax() == max.getMin()); + } + + string ChunkRange::toString() const { + StringBuilder sb; + sb << "ChunkRange(min=" << _min << ", max=" << _max + << ", shard=" << _shard.toString() << ")"; + + return sb.str(); + } + + + void ChunkRangeManager::assertValid() const { + if (_ranges.empty()) + return; + + try { + // No Nulls + for (ChunkRangeMap::const_iterator it=_ranges.begin(), end=_ranges.end(); it != end; ++it) { + verify(it->second); + } + + // Check endpoints + verify(allOfType(MinKey, _ranges.begin()->second->getMin())); + verify(allOfType(MaxKey, boost::prior(_ranges.end())->second->getMax())); + + // Make sure there are no gaps or overlaps + for (ChunkRangeMap::const_iterator it=boost::next(_ranges.begin()), end=_ranges.end(); it != end; ++it) { + ChunkRangeMap::const_iterator last = boost::prior(it); + verify(it->second->getMin() == last->second->getMax()); + } + + // Check Map keys + for (ChunkRangeMap::const_iterator it=_ranges.begin(), end=_ranges.end(); it != end; ++it) { + verify(it->first == it->second->getMax()); + } + + // Make sure we match the original chunks + const ChunkMap chunks = _ranges.begin()->second->getManager()->_chunkMap; + for ( ChunkMap::const_iterator i=chunks.begin(); i!=chunks.end(); ++i ) { + const ChunkPtr chunk = i->second; + + ChunkRangeMap::const_iterator min = _ranges.upper_bound(chunk->getMin()); + ChunkRangeMap::const_iterator max = _ranges.lower_bound(chunk->getMax()); + + verify(min != _ranges.end()); + verify(max != _ranges.end()); + verify(min == max); + verify(min->second->getShard() == chunk->getShard()); + verify(min->second->containsKey( chunk->getMin() )); + verify(min->second->containsKey( chunk->getMax() ) || (min->second->getMax() == chunk->getMax())); + } + + } + catch (...) { + error() << "\t invalid ChunkRangeMap! 
printing ranges:"; + + for (ChunkRangeMap::const_iterator it = _ranges.begin(), end = _ranges.end(); it != end; ++it) { + log() << it->first << ": " << it->second->toString(); + } + + throw; + } + } + + void ChunkRangeManager::reloadAll(const ChunkMap& chunks) { + _ranges.clear(); + _insertRange(chunks.begin(), chunks.end()); + + DEV assertValid(); + } + + void ChunkRangeManager::_insertRange(ChunkMap::const_iterator begin, const ChunkMap::const_iterator end) { + while (begin != end) { + ChunkMap::const_iterator first = begin; + Shard shard = first->second->getShard(); + while (begin != end && (begin->second->getShard() == shard)) + ++begin; + + shared_ptr<ChunkRange> cr (new ChunkRange(first, begin)); + _ranges[cr->getMax()] = cr; + } + } + + int ChunkManager::getCurrentDesiredChunkSize() const { + // split faster in early chunks helps spread out an initial load better + const int minChunkSize = 1 << 20; // 1 MBytes + + int splitThreshold = Chunk::MaxChunkSize; + + int nc = numChunks(); + + if ( nc <= 1 ) { + return 1024; + } + else if ( nc < 3 ) { + return minChunkSize / 2; + } + else if ( nc < 10 ) { + splitThreshold = max( splitThreshold / 4 , minChunkSize ); + } + else if ( nc < 20 ) { + splitThreshold = max( splitThreshold / 2 , minChunkSize ); + } + + return splitThreshold; + } + +} // namespace mongo diff --git a/src/mongo/s/chunk_manager.h b/src/mongo/s/chunk_manager.h new file mode 100644 index 00000000000..c15fb7dbaa1 --- /dev/null +++ b/src/mongo/s/chunk_manager.h @@ -0,0 +1,293 @@ +/** + * Copyright (C) 2014 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. 
+ */ + +#pragma once + +#include <boost/next_prior.hpp> +#include <boost/shared_ptr.hpp> +#include <map> +#include <string> +#include <vector> + +#include "mongo/s/chunk.h" + +namespace mongo { + + class CanonicalQuery; + class ChunkManager; + struct QuerySolutionNode; + + typedef boost::shared_ptr<ChunkManager> ChunkManagerPtr; + + // The key for the map is max for each Chunk or ChunkRange + typedef std::map<BSONObj, boost::shared_ptr<const Chunk>, BSONObjCmp> ChunkMap; + + + class ChunkRange { + public: + ChunkRange(ChunkMap::const_iterator begin, const ChunkMap::const_iterator end); + + // Merge min and max (must be adjacent ranges) + ChunkRange(const ChunkRange& min, const ChunkRange& max); + + const ChunkManager* getManager() const { return _manager; } + Shard getShard() const { return _shard; } + + const BSONObj& getMin() const { return _min; } + const BSONObj& getMax() const { return _max; } + + // clones of Chunk methods + // Returns true if this ChunkRange contains the given shard key, and false otherwise + // + // Note: this function takes an extracted *key*, not an original document + // (the point may be computed by, say, hashing a given field or projecting + // to a subset of fields). + bool containsKey( const BSONObj& shardKey ) const; + + std::string toString() const; + + private: + const ChunkManager* _manager; + const Shard _shard; + const BSONObj _min; + const BSONObj _max; + }; + + typedef std::map<BSONObj, boost::shared_ptr<ChunkRange>, BSONObjCmp> ChunkRangeMap; + + + class ChunkRangeManager { + public: + const ChunkRangeMap& ranges() const { return _ranges; } + + void clear() { _ranges.clear(); } + + void reloadAll(const ChunkMap& chunks); + + // Slow operation -- wrap with DEV + void assertValid() const; + + ChunkRangeMap::const_iterator upper_bound(const BSONObj& o) const { return _ranges.upper_bound(o); } + ChunkRangeMap::const_iterator lower_bound(const BSONObj& o) const { return _ranges.lower_bound(o); } + + private: + // assumes nothing in this range exists in _ranges + void _insertRange(ChunkMap::const_iterator begin, const ChunkMap::const_iterator end); + + ChunkRangeMap _ranges; + }; + + + /* config.sharding + { ns: 'alleyinsider.fs.chunks' , + key: { ts : 1 } , + shards: [ { min: 1, max: 100, server: a } , { min: 101, max: 200 , server : b } ] + } + */ + class ChunkManager { + public: + typedef std::map<std::string, ChunkVersion> ShardVersionMap; + + // Loads a new chunk manager from a collection document + ChunkManager( const BSONObj& collDoc ); + + // Creates an empty chunk manager for the namespace + ChunkManager( const std::string& ns, const ShardKeyPattern& pattern, bool unique ); + + std::string getns() const { return _ns; } + + const ShardKeyPattern& getShardKeyPattern() const { return _keyPattern; } + + bool isUnique() const { return _unique; } + + /** + * this is just an increasing number of how many ChunkManagers we have so we know if something has been updated + */ + unsigned long long getSequenceNumber() const { return _sequenceNumber; } + + // + // After constructor is invoked, we need to call loadExistingRanges. If this is a new + // sharded collection, we can call createFirstChunks first. 
+ // + + // Creates new chunks based on info in chunk manager + void createFirstChunks( const std::string& config, + const Shard& primary, + const std::vector<BSONObj>* initPoints, + const std::vector<Shard>* initShards ); + + // Loads existing ranges based on info in chunk manager + void loadExistingRanges(const std::string& config, const ChunkManager* oldManager); + + + // Helpers for load + void calcInitSplitsAndShards( const Shard& primary, + const std::vector<BSONObj>* initPoints, + const std::vector<Shard>* initShards, + std::vector<BSONObj>* splitPoints, + std::vector<Shard>* shards ) const; + + // + // Methods to use once loaded / created + // + + int numChunks() const { return _chunkMap.size(); } + + /** + * Given a key that has been extracted from a document, returns the + * chunk that contains that key. + * + * For instance, to locate the chunk for document {a : "foo" , b : "bar"} + * when the shard key is {a : "hashed"}, you can call + * findIntersectingChunk() on {a : hash("foo") } + */ + ChunkPtr findIntersectingChunk( const BSONObj& shardKey ) const; + + void getShardsForQuery( std::set<Shard>& shards , const BSONObj& query ) const; + void getAllShards( std::set<Shard>& all ) const; + /** @param shards set to the shards covered by the interval [min, max], see SERVER-4791 */ + void getShardsForRange( std::set<Shard>& shards, const BSONObj& min, const BSONObj& max ) const; + + // Transforms query into bounds for each field in the shard key + // for example : + // Key { a: 1, b: 1 }, + // Query { a : { $gte : 1, $lt : 2 }, + // b : { $gte : 3, $lt : 4 } } + // => Bounds { a : [1, 2), b : [3, 4) } + static IndexBounds getIndexBoundsForQuery(const BSONObj& key, const CanonicalQuery* canonicalQuery); + + // Collapse query solution tree. + // + // If it has OR node, the result could be a superset of the index bounds generated. + // Since to give a single IndexBounds, this gives the union of bounds on each field. + // for example: + // OR: { a: (0, 1), b: (0, 1) }, + // { a: (2, 3), b: (2, 3) } + // => { a: (0, 1), (2, 3), b: (0, 1), (2, 3) } + static IndexBounds collapseQuerySolution( const QuerySolutionNode* node ); + + const ChunkMap& getChunkMap() const { return _chunkMap; } + + /** + * Returns true if, for this shard, the chunks are identical in both chunk managers + */ + bool compatibleWith(const ChunkManager& other, const std::string& shard) const; + + std::string toString() const; + + ChunkVersion getVersion(const std::string& shardName) const; + ChunkVersion getVersion() const; + + void getInfo( BSONObjBuilder& b ) const; + + void drop() const; + + void _printChunks() const; + + int getCurrentDesiredChunkSize() const; + + ChunkManagerPtr reload(bool force=true) const; // doesn't modify self! 
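For context on the getCurrentDesiredChunkSize() declaration above (its definition appears earlier in this diff): the split threshold ramps up from a tiny value while the collection has only a few chunks, so early inserts split quickly and spread an initial load, and it only reaches the configured maximum once the collection has roughly twenty chunks. Below is a rough standalone sketch of that ramp, with Chunk::MaxChunkSize passed in explicitly as a parameter.

    #include <algorithm>
    #include <iostream>

    // Rough sketch of the early-split ramp: tiny thresholds while a collection has
    // only a handful of chunks, growing toward the configured max chunk size.
    // maxChunkSizeBytes stands in for Chunk::MaxChunkSize.
    int desiredChunkSizeBytes(int numChunks, int maxChunkSizeBytes) {
        const int minChunkSize = 1 << 20;  // 1 MB floor once there are a few chunks

        if (numChunks <= 1) return 1024;                                       // split almost immediately
        if (numChunks < 3)  return minChunkSize / 2;                           // 512 KB
        if (numChunks < 10) return std::max(maxChunkSizeBytes / 4, minChunkSize);
        if (numChunks < 20) return std::max(maxChunkSizeBytes / 2, minChunkSize);
        return maxChunkSizeBytes;
    }

    int main() {
        const int maxChunkSize = 64 * 1024 * 1024;  // e.g. a 64 MB max chunk size
        int counts[] = {1, 2, 5, 15, 100};
        for (int i = 0; i < 5; ++i) {
            std::cout << counts[i] << " chunk(s) -> threshold "
                      << desiredChunkSizeBytes(counts[i], maxChunkSize) << " bytes\n";
        }
        return 0;
    }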
+ + void markMinorForReload( ChunkVersion majorVersion ) const; + void getMarkedMinorVersions( std::set<ChunkVersion>& minorVersions ) const; + + private: + + // helpers for loading + + // returns true if load was consistent + bool _load(const std::string& config, + ChunkMap& chunks, + std::set<Shard>& shards, + ShardVersionMap& shardVersions, + const ChunkManager* oldManager); + static bool _isValid(const ChunkMap& chunks); + + // end helpers + + // All members should be const for thread-safety + const std::string _ns; + const ShardKeyPattern _keyPattern; + const bool _unique; + + const ChunkMap _chunkMap; + const ChunkRangeManager _chunkRanges; + + const std::set<Shard> _shards; + + const ShardVersionMap _shardVersions; // max version per shard + + // max version of any chunk + ChunkVersion _version; + + mutable mutex _mutex; // only used with _nsLock + + const unsigned long long _sequenceNumber; + + // + // Split Heuristic info + // + + + class SplitHeuristics { + public: + + SplitHeuristics() + : _splitTickets(maxParallelSplits) { + } + + TicketHolder _splitTickets; + + // Test whether we should split once data * splitTestFactor > chunkSize (approximately) + static const int splitTestFactor = 5; + // Maximum number of parallel threads requesting a split + static const int maxParallelSplits = 5; + + // The idea here is that we're over-aggressive on split testing by a factor of + // splitTestFactor, so we can safely wait until we get to splitTestFactor invalid splits + // before changing. Unfortunately, we also potentially over-request the splits by a + // factor of maxParallelSplits, but since the factors are identical it works out + // (for now) for parallel or sequential oversplitting. + // TODO: Make splitting a separate thread with notifications? + static const int staleMinorReloadThreshold = maxParallelSplits; + }; + + mutable SplitHeuristics _splitHeuristics; + + // + // End split heuristics + // + + friend class Chunk; + friend class ChunkRangeManager; // only needed for CRM::assertValid() + static AtomicUInt32 NextSequenceNumber; + + friend class TestableChunkManager; + }; + +} // namespace mongo diff --git a/src/mongo/s/chunk_manager_targeter.cpp b/src/mongo/s/chunk_manager_targeter.cpp index 1981d9dfb6c..553b509a17c 100644 --- a/src/mongo/s/chunk_manager_targeter.cpp +++ b/src/mongo/s/chunk_manager_targeter.cpp @@ -28,8 +28,11 @@ #define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding +#include "mongo/platform/basic.h" + #include "mongo/s/chunk_manager_targeter.h" +#include "mongo/s/chunk_manager.h" #include "mongo/s/config.h" #include "mongo/s/grid.h" #include "mongo/util/log.h" @@ -37,7 +40,6 @@ namespace mongo { - using std::endl; using std::map; using std::set; using std::string; @@ -45,21 +47,249 @@ namespace mongo { using mongoutils::str::stream; +namespace { + + enum UpdateType { + UpdateType_Replacement, UpdateType_OpStyle, UpdateType_Unknown + }; + + enum CompareResult { + CompareResult_Unknown, CompareResult_GTE, CompareResult_LT + }; + + const ShardKeyPattern virtualIdShardKey(BSON("_id" << 1)); + + // To match legacy reload behavior, we have to backoff on config reload per-thread + // TODO: Centralize this behavior better by refactoring config reload in mongos + boost::thread_specific_ptr<Backoff> perThreadBackoff; + const int maxWaitMillis = 500; + /** * Helper to get the DBConfigPtr object in an exception-safe way. 
*/ - static bool getDBConfigSafe( StringData db, DBConfigPtr& config, string* errMsg ) { + bool getDBConfigSafe(StringData db, DBConfigPtr& config, string* errMsg) { try { - config = grid.getDBConfig( db, true ); - if ( !config ) *errMsg = stream() << "could not load or create database " << db; + config = grid.getDBConfig(db, true); + if (config) { + return true; + } + + *errMsg = stream() << "could not load or create database " << db; } - catch ( const DBException& ex ) { + catch (const DBException& ex) { *errMsg = ex.toString(); } - return config.get(); + return false; + } + + /** + * There are two styles of update expressions: + * + * Replacement style: coll.update({ x : 1 }, { y : 2 }) + * OpStyle: coll.update({ x : 1 }, { $set : { y : 2 } }) + */ + UpdateType getUpdateExprType(const BSONObj& updateExpr) { + // Empty update is replacement-style, by default + if (updateExpr.isEmpty()) { + return UpdateType_Replacement; + } + + UpdateType updateType = UpdateType_Unknown; + + BSONObjIterator it(updateExpr); + while (it.more()) { + BSONElement next = it.next(); + + if (next.fieldName()[0] == '$') { + if (updateType == UpdateType_Unknown) { + updateType = UpdateType_OpStyle; + } + else if (updateType == UpdateType_Replacement) { + return UpdateType_Unknown; + } + } + else { + if (updateType == UpdateType_Unknown) { + updateType = UpdateType_Replacement; + } + else if (updateType == UpdateType_OpStyle) { + return UpdateType_Unknown; + } + } + } + + return updateType; + } + + /** + * This returns "does the query have an _id field" and "is the _id field querying for a direct + * value like _id : 3 and not _id : { $gt : 3 }" + * + * Ex: { _id : 1 } => true + * { foo : <anything>, _id : 1 } => true + * { _id : { $lt : 30 } } => false + * { foo : <anything> } => false + */ + bool isExactIdQuery(const BSONObj& query) { + StatusWith<BSONObj> status = virtualIdShardKey.extractShardKeyFromQuery(query); + if (!status.isOK()) { + return false; + } + + return !status.getValue()["_id"].eoo(); + } + + void refreshBackoff() { + if (!perThreadBackoff.get()) { + perThreadBackoff.reset(new Backoff(maxWaitMillis, maxWaitMillis * 2)); + } + + perThreadBackoff.get()->nextSleepMillis(); + } + + + // + // Utilities to compare shard versions + // + + /** + * Returns the relationship of two shard versions. Shard versions of a collection that has not + * been dropped and recreated and where there is at least one chunk on a shard are comparable, + * otherwise the result is ambiguous. + */ + CompareResult compareShardVersions(const ChunkVersion& shardVersionA, + const ChunkVersion& shardVersionB) { + + // Collection may have been dropped + if (!shardVersionA.hasEqualEpoch(shardVersionB)) { + return CompareResult_Unknown; + } + + // Zero shard versions are only comparable to themselves + if (!shardVersionA.isSet() || !shardVersionB.isSet()) { + // If both are zero... + if (!shardVersionA.isSet() && !shardVersionB.isSet()) { + return CompareResult_GTE; + } + + return CompareResult_Unknown; + } + + if (shardVersionA < shardVersionB) { + return CompareResult_LT; + } + + else return CompareResult_GTE; + } + + ChunkVersion getShardVersion(StringData shardName, + const ChunkManager* manager, + const Shard* primary) { + + dassert(!(manager && primary)); + dassert(manager || primary); + + if (primary) { + return ChunkVersion::UNSHARDED(); + } + + return manager->getVersion(shardName.toString()); + } + + /** + * Returns the relationship between two maps of shard versions. 
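The getUpdateExprType() helper above classifies an update document by its top-level field names: all $-prefixed fields means op-style, none means replacement, and any mix is rejected as unknown. Here is a minimal standalone sketch of the same rule, operating on a list of field names instead of a real BSONObj; the names are illustrative only.

    #include <cassert>
    #include <string>
    #include <vector>

    enum UpdateTypeSketch { Replacement, OpStyle, Unknown };

    // Mirrors getUpdateExprType(): all top-level fields must agree on whether they
    // are $-operators (op-style) or plain fields (replacement), otherwise Unknown.
    UpdateTypeSketch classifyUpdate(const std::vector<std::string>& topLevelFields) {
        if (topLevelFields.empty()) return Replacement;  // empty update is replacement-style

        UpdateTypeSketch type = Unknown;
        for (size_t i = 0; i < topLevelFields.size(); ++i) {
            const bool isOperator = !topLevelFields[i].empty() && topLevelFields[i][0] == '$';

            if (isOperator) {
                if (type == Unknown) type = OpStyle;
                else if (type == Replacement) return Unknown;  // mixed styles
            } else {
                if (type == Unknown) type = Replacement;
                else if (type == OpStyle) return Unknown;      // mixed styles
            }
        }
        return type;
    }

    int main() {
        std::vector<std::string> replacement;  // { y: 2 }
        replacement.push_back("y");

        std::vector<std::string> opStyle;      // { $set: {...}, $inc: {...} }
        opStyle.push_back("$set");
        opStyle.push_back("$inc");

        std::vector<std::string> mixed;        // { $set: {...}, y: 2 }
        mixed.push_back("$set");
        mixed.push_back("y");

        assert(classifyUpdate(replacement) == Replacement);
        assert(classifyUpdate(opStyle) == OpStyle);
        assert(classifyUpdate(mixed) == Unknown);
        return 0;
    }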
As above, these maps are often + * comparable when the collection has not been dropped and there is at least one chunk on the + * shards. If any versions in the maps are not comparable, the result is _Unknown. + * + * If any versions in the first map (cached) are _LT the versions in the second map (remote), + * the first (cached) versions are _LT the second (remote) versions. + * + * Note that the signature here is weird since our cached map of chunk versions is stored in a + * ChunkManager or is implicit in the primary shard of the collection. + */ + CompareResult compareAllShardVersions(const ChunkManager* cachedChunkManager, + const Shard* cachedPrimary, + const map<string, ChunkVersion>& remoteShardVersions) { + + CompareResult finalResult = CompareResult_GTE; + + for (map<string, ChunkVersion>::const_iterator it = remoteShardVersions.begin(); + it != remoteShardVersions.end(); + ++it) { + + // Get the remote and cached version for the next shard + const string& shardName = it->first; + const ChunkVersion& remoteShardVersion = it->second; + + ChunkVersion cachedShardVersion; + + try { + // Throws b/c shard constructor throws + cachedShardVersion = getShardVersion(shardName, + cachedChunkManager, + cachedPrimary); + } + catch (const DBException& ex) { + warning() << "could not lookup shard " << shardName + << " in local cache, shard metadata may have changed" + << " or be unavailable" << causedBy(ex); + + return CompareResult_Unknown; + } + + // Compare the remote and cached versions + CompareResult result = compareShardVersions(cachedShardVersion, remoteShardVersion); + + if (result == CompareResult_Unknown) return result; + if (result == CompareResult_LT) finalResult = CompareResult_LT; + + // Note that we keep going after _LT b/c there could be more _Unknowns. + } + + return finalResult; + } + + /** + * Whether or not the manager/primary pair is different from the other manager/primary pair. + */ + bool isMetadataDifferent(const ChunkManagerPtr& managerA, + const ShardPtr& primaryA, + const ChunkManagerPtr& managerB, + const ShardPtr& primaryB) { + + if ((managerA && !managerB) || (!managerA && managerB) || (primaryA && !primaryB) || (!primaryA && primaryB)) return true; + + if (managerA) { + return !managerA->getVersion().isStrictlyEqualTo(managerB->getVersion()); + } + + dassert(NULL != primaryA.get()); + return primaryA->getName() != primaryB->getName(); } + /** + * Whether or not the manager/primary pair was changed or refreshed from a previous version + * of the metadata. 
+ */ + bool wasMetadataRefreshed(const ChunkManagerPtr& managerA, + const ShardPtr& primaryA, + const ChunkManagerPtr& managerB, + const ShardPtr& primaryB) { + + if (isMetadataDifferent(managerA, primaryA, managerB, primaryB)) + return true; + + if (managerA) { + dassert(managerB.get()); // otherwise metadata would be different + return managerA->getSequenceNumber() != managerB->getSequenceNumber(); + } + + return false; + } + +} // namespace + ChunkManagerTargeter::ChunkManagerTargeter(const NamespaceString& nss) : _nss(nss), _needsTargetingRefresh(false) { @@ -70,12 +300,12 @@ namespace mongo { DBConfigPtr config; string errMsg; - if ( !getDBConfigSafe( _nss.db(), config, &errMsg ) ) { - return Status( ErrorCodes::DatabaseNotFound, errMsg ); + if (!getDBConfigSafe(_nss.db(), config, &errMsg)) { + return Status(ErrorCodes::DatabaseNotFound, errMsg); } // Get either the chunk manager or primary shard - config->getChunkManagerOrPrimary( _nss.ns(), _manager, _primary ); + config->getChunkManagerOrPrimary(_nss.ns(), _manager, _primary); return Status::OK(); } @@ -130,69 +360,6 @@ namespace mongo { } } - namespace { - - // TODO: Expose these for unit testing via dbtests - - enum UpdateType { - UpdateType_Replacement, UpdateType_OpStyle, UpdateType_Unknown - }; - - /** - * There are two styles of update expressions: - * coll.update({ x : 1 }, { y : 2 }) // Replacement style - * coll.update({ x : 1 }, { $set : { y : 2 } }) // OpStyle - */ - UpdateType getUpdateExprType( const BSONObj& updateExpr ) { - - UpdateType updateType = UpdateType_Unknown; - - // Empty update is replacement-style, by default - if ( updateExpr.isEmpty() ) return UpdateType_Replacement; - - BSONObjIterator it( updateExpr ); - while ( it.more() ) { - BSONElement next = it.next(); - - if ( next.fieldName()[0] == '$' ) { - if ( updateType == UpdateType_Unknown ) { - updateType = UpdateType_OpStyle; - } - else if ( updateType == UpdateType_Replacement ) { - return UpdateType_Unknown; - } - } - else { - if ( updateType == UpdateType_Unknown ) { - updateType = UpdateType_Replacement; - } - else if ( updateType == UpdateType_OpStyle ) { - return UpdateType_Unknown; - } - } - } - - return updateType; - } - - /** - * This returns "does the query have an _id field" and "is the _id field - * querying for a direct value like _id : 3 and not _id : { $gt : 3 }" - * - * Ex: { _id : 1 } => true - * { foo : <anything>, _id : 1 } => true - * { _id : { $lt : 30 } } => false - * { foo : <anything> } => false - */ - bool isExactIdQuery(const BSONObj& query) { - static const ShardKeyPattern virtualIdShardKey(BSON("_id" << 1)); - StatusWith<BSONObj> status = virtualIdShardKey.extractShardKeyFromQuery(query); - if (!status.isOK()) - return false; - return !status.getValue()["_id"].eoo(); - } - } - Status ChunkManagerTargeter::targetUpdate( const BatchedUpdateDocument& updateDoc, vector<ShardEndpoint*>* endpoints ) const { @@ -357,10 +524,9 @@ namespace mongo { vector<ShardEndpoint*>* endpoints ) const { if ( !_primary && !_manager ) { - return Status( ErrorCodes::NamespaceNotFound, - str::stream() << "could not target query in " - << getNS().ns() - << "; no metadata found" ); + return Status(ErrorCodes::NamespaceNotFound, + stream() << "could not target query in " + << getNS().ns() << "; no metadata found"); } set<Shard> shards; @@ -454,153 +620,6 @@ namespace mongo { return Status::OK(); } - namespace { - - // - // Utilities to compare shard versions - // - - enum CompareResult { - CompareResult_Unknown, CompareResult_GTE, CompareResult_LT - }; 
- - /** - * Returns the relationship of two shard versions. Shard versions of a collection that has - * not been dropped and recreated and where there is at least one chunk on a shard are - * comparable, otherwise the result is ambiguous. - */ - CompareResult compareShardVersions( const ChunkVersion& shardVersionA, - const ChunkVersion& shardVersionB ) { - - // Collection may have been dropped - if ( !shardVersionA.hasEqualEpoch( shardVersionB ) ) return CompareResult_Unknown; - - // Zero shard versions are only comparable to themselves - if ( !shardVersionA.isSet() || !shardVersionB.isSet() ) { - // If both are zero... - if ( !shardVersionA.isSet() && !shardVersionB.isSet() ) return CompareResult_GTE; - // Otherwise... - return CompareResult_Unknown; - } - - if ( shardVersionA < shardVersionB ) return CompareResult_LT; - else return CompareResult_GTE; - } - - ChunkVersion getShardVersion( StringData shardName, - const ChunkManagerPtr& manager, - const ShardPtr& primary ) { - - dassert( !( manager && primary ) ); - dassert( manager || primary ); - - if ( primary ) return ChunkVersion::UNSHARDED(); - - return manager->getVersion(shardName.toString()); - } - - /** - * Returns the relationship between two maps of shard versions. As above, these maps are - * often comparable when the collection has not been dropped and there is at least one - * chunk on the shards. - * - * If any versions in the maps are not comparable, the result is _Unknown. - * - * If any versions in the first map (cached) are _LT the versions in the second map - * (remote), the first (cached) versions are _LT the second (remote) versions. - * - * Note that the signature here is weird since our cached map of chunk versions is - * stored in a ChunkManager or is implicit in the primary shard of the collection. - */ - CompareResult // - compareAllShardVersions( const ChunkManagerPtr& cachedShardVersions, - const ShardPtr& cachedPrimary, - const map<string, ChunkVersion>& remoteShardVersions ) { - - CompareResult finalResult = CompareResult_GTE; - - for ( map<string, ChunkVersion>::const_iterator it = remoteShardVersions.begin(); - it != remoteShardVersions.end(); ++it ) { - - // - // Get the remote and cached version for the next shard - // - - const string& shardName = it->first; - const ChunkVersion& remoteShardVersion = it->second; - ChunkVersion cachedShardVersion; - - try { - // Throws b/c shard constructor throws - cachedShardVersion = getShardVersion( shardName, - cachedShardVersions, - cachedPrimary ); - } - catch ( const DBException& ex ) { - - warning() << "could not lookup shard " << shardName - << " in local cache, shard metadata may have changed" - << " or be unavailable" << causedBy( ex ) << endl; - - return CompareResult_Unknown; - } - - // - // Compare the remote and cached versions - // - - CompareResult result = compareShardVersions( cachedShardVersion, - remoteShardVersion ); - - if ( result == CompareResult_Unknown ) return result; - if ( result == CompareResult_LT ) finalResult = CompareResult_LT; - - // Note that we keep going after _LT b/c there could be more _Unknowns. 
- } - - return finalResult; - } - - /** - * Whether or not the manager/primary pair is different from the other manager/primary pair - */ - bool isMetadataDifferent( const ChunkManagerPtr& managerA, - const ShardPtr& primaryA, - const ChunkManagerPtr& managerB, - const ShardPtr& primaryB ) { - - if ( ( managerA && !managerB ) || ( !managerA && managerB ) || ( primaryA && !primaryB ) - || ( !primaryA && primaryB ) ) return true; - - if ( managerA ) { - return !managerA->getVersion().isStrictlyEqualTo( managerB->getVersion() ); - } - - dassert( NULL != primaryA.get() ); - return primaryA->getName() != primaryB->getName(); - } - - /** - * Whether or not the manager/primary pair was changed or refreshed from a previous version - * of the metadata. - */ - bool wasMetadataRefreshed( const ChunkManagerPtr& managerA, - const ShardPtr& primaryA, - const ChunkManagerPtr& managerB, - const ShardPtr& primaryB ) { - - if ( isMetadataDifferent( managerA, primaryA, managerB, primaryB ) ) - return true; - - if ( managerA ) { - dassert( managerB.get() ); // otherwise metadata would be different - return managerA->getSequenceNumber() != managerB->getSequenceNumber(); - } - - return false; - } - } - void ChunkManagerTargeter::noteStaleResponse( const ShardEndpoint& endpoint, const BSONObj& staleInfo ) { dassert( !_needsTargetingRefresh ); @@ -609,7 +628,8 @@ namespace mongo { if ( staleInfo["vWanted"].eoo() ) { // If we don't have a vWanted sent, assume the version is higher than our current // version. - remoteShardVersion = getShardVersion( endpoint.shardName, _manager, _primary ); + remoteShardVersion = + getShardVersion(endpoint.shardName, _manager.get(), _primary.get()); remoteShardVersion.incMajor(); } else { @@ -712,9 +732,9 @@ namespace mongo { // If we got stale shard versions from remote shards, we may need to refresh // NOTE: Not sure yet if this can happen simultaneously with targeting issues - CompareResult result = compareAllShardVersions( _manager, - _primary, - _remoteShardVersions ); + CompareResult result = compareAllShardVersions(_manager.get(), + _primary.get(), + _remoteShardVersions); // Reset the versions _remoteShardVersions.clear(); @@ -736,19 +756,7 @@ namespace mongo { return Status::OK(); } - // To match legacy reload behavior, we have to backoff on config reload per-thread - // TODO: Centralize this behavior better by refactoring config reload in mongos - static const int maxWaitMillis = 500; - static boost::thread_specific_ptr<Backoff> perThreadBackoff; - - static void refreshBackoff() { - if ( !perThreadBackoff.get() ) - perThreadBackoff.reset( new Backoff( maxWaitMillis, maxWaitMillis * 2 ) ); - perThreadBackoff.get()->nextSleepMillis(); - } - Status ChunkManagerTargeter::refreshNow( RefreshType refreshType ) { - DBConfigPtr config; string errMsg; @@ -781,6 +789,7 @@ namespace mongo { catch ( const DBException& ex ) { return Status( ErrorCodes::UnknownError, ex.toString() ); } + config->getChunkManagerOrPrimary( _nss.ns(), _manager, _primary ); } diff --git a/src/mongo/s/chunk_manager_targeter.h b/src/mongo/s/chunk_manager_targeter.h index 7b8a7592b15..30d715e329d 100644 --- a/src/mongo/s/chunk_manager_targeter.h +++ b/src/mongo/s/chunk_manager_targeter.h @@ -28,18 +28,19 @@ #pragma once -#include <boost/scoped_ptr.hpp> +#include <boost/shared_ptr.hpp> #include <map> #include "mongo/bson/bsonobj.h" #include "mongo/db/namespace_string.h" -#include "mongo/s/chunk.h" -#include "mongo/s/shard.h" -#include "mongo/s/chunk_version.h" #include "mongo/s/ns_targeter.h" namespace mongo { 
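The header hunk that follows replaces full includes of chunk.h, shard.h, and chunk_version.h with forward declarations plus boost::shared_ptr members; because the header only stores pointers, the complete types are needed only in the .cpp. A small self-contained sketch of that pattern is below; all names are hypothetical, and it assumes Boost headers are available as they are in this tree.

    #include <boost/shared_ptr.hpp>
    #include <iostream>

    // What would normally live in the header: a forward declaration is enough,
    // because the class only holds the type through a shared_ptr.
    class HeavyManagerSketch;

    class TargeterSketch {
    public:
        TargeterSketch() {}  // _manager starts out empty; no complete type needed yet
        void setManager(boost::shared_ptr<HeavyManagerSketch> m) { _manager = m; }
        bool hasManager() const { return _manager.get() != 0; }
    private:
        boost::shared_ptr<HeavyManagerSketch> _manager;  // pointer member: incomplete type is fine
    };

    // What would normally live in the .cpp (or another header): the full definition.
    class HeavyManagerSketch {
    public:
        int numChunks() const { return 42; }
    };

    int main() {
        TargeterSketch targeter;
        std::cout << "before: hasManager = " << targeter.hasManager() << "\n";

        boost::shared_ptr<HeavyManagerSketch> manager(new HeavyManagerSketch());
        targeter.setManager(manager);
        std::cout << "after:  hasManager = " << targeter.hasManager() << "\n";
        return 0;
    }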
+ class ChunkManager; + struct ChunkVersion; + class Shard; + struct TargeterStats { // Map of chunk shard minKey -> approximate delta. This is used for deciding // whether a chunk might need splitting or not. @@ -54,7 +55,6 @@ namespace mongo { */ class ChunkManagerTargeter : public NSTargeter { public: - ChunkManagerTargeter(const NamespaceString& nss); /** @@ -103,7 +103,6 @@ namespace mongo { const TargeterStats* getStats() const; private: - // Different ways we can refresh metadata enum RefreshType { // No refresh is needed @@ -157,8 +156,8 @@ namespace mongo { // Zero or one of these are filled at all times // If sharded, _manager, if unsharded, _primary, on error, neither - ChunkManagerPtr _manager; - ShardPtr _primary; + boost::shared_ptr<ChunkManager> _manager; + boost::shared_ptr<Shard> _primary; // Map of shard->remote shard version reported from stale errors ShardVersionMap _remoteShardVersions; diff --git a/src/mongo/s/chunk_manager_targeter_test.cpp b/src/mongo/s/chunk_manager_targeter_test.cpp index 30bd0589cbd..fe798dcfa92 100644 --- a/src/mongo/s/chunk_manager_targeter_test.cpp +++ b/src/mongo/s/chunk_manager_targeter_test.cpp @@ -28,10 +28,12 @@ #define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding +#include "mongo/platform/basic.h" + #include "mongo/db/json.h" #include "mongo/db/namespace_string.h" -#include "mongo/db/query/interval.h" -#include "mongo/s/chunk.h" +#include "mongo/db/query/canonical_query.h" +#include "mongo/s/chunk_manager.h" #include "mongo/s/shard_key_pattern.h" #include "mongo/unittest/unittest.h" #include "mongo/util/log.h" @@ -42,6 +44,7 @@ namespace { using std::auto_ptr; using std::make_pair; + /** * ChunkManager targeting test * @@ -529,4 +532,4 @@ namespace { CheckBoundList(list, expectedList); } -} // end namespace +} // namespace diff --git a/src/mongo/s/client/shard_connection.cpp b/src/mongo/s/client/shard_connection.cpp index 7f2ba85a63a..c1357835bd2 100644 --- a/src/mongo/s/client/shard_connection.cpp +++ b/src/mongo/s/client/shard_connection.cpp @@ -36,7 +36,7 @@ #include "mongo/db/commands.h" #include "mongo/db/lasterror.h" -#include "mongo/s/chunk.h" +#include "mongo/s/chunk_manager.h" #include "mongo/s/request.h" #include "mongo/s/shard.h" #include "mongo/s/stale_exception.h" diff --git a/src/mongo/s/cluster_write.cpp b/src/mongo/s/cluster_write.cpp index c34189df99a..cd2eb01eb4f 100644 --- a/src/mongo/s/cluster_write.cpp +++ b/src/mongo/s/cluster_write.cpp @@ -38,6 +38,7 @@ #include "mongo/base/init.h" #include "mongo/base/status.h" #include "mongo/db/write_concern_options.h" +#include "mongo/s/chunk_manager.h" #include "mongo/s/chunk_manager_targeter.h" #include "mongo/s/config.h" #include "mongo/s/dbclient_multi_command.h" diff --git a/src/mongo/s/commands/cluster_merge_chunks_cmd.cpp b/src/mongo/s/commands/cluster_merge_chunks_cmd.cpp index 5e872b5a80c..91dd0fecf2c 100644 --- a/src/mongo/s/commands/cluster_merge_chunks_cmd.cpp +++ b/src/mongo/s/commands/cluster_merge_chunks_cmd.cpp @@ -26,6 +26,8 @@ * then also delete it in the license file. 
*/ +#include "mongo/platform/basic.h" + #include "mongo/base/init.h" #include "mongo/client/connpool.h" #include "mongo/db/auth/action_type.h" @@ -34,7 +36,8 @@ #include "mongo/db/commands.h" #include "mongo/db/field_parser.h" #include "mongo/db/namespace_string.h" -#include "mongo/s/config.h" // For config server and DBConfig and version refresh +#include "mongo/s/chunk_manager.h" +#include "mongo/s/config.h" #include "mongo/s/grid.h" #include "mongo/s/shard.h" diff --git a/src/mongo/s/commands_admin.cpp b/src/mongo/s/commands_admin.cpp index 170e43eb77f..1d4918656b3 100644 --- a/src/mongo/s/commands_admin.cpp +++ b/src/mongo/s/commands_admin.cpp @@ -55,7 +55,7 @@ #include "mongo/db/wire_version.h" #include "mongo/db/write_concern.h" #include "mongo/db/write_concern_options.h" -#include "mongo/s/chunk.h" +#include "mongo/s/chunk_manager.h" #include "mongo/s/client_info.h" #include "mongo/s/client/shard_connection.h" #include "mongo/s/cluster_write.h" diff --git a/src/mongo/s/commands_public.cpp b/src/mongo/s/commands_public.cpp index 29cbb1b5413..fcf2d97f2f3 100644 --- a/src/mongo/s/commands_public.cpp +++ b/src/mongo/s/commands_public.cpp @@ -58,7 +58,7 @@ #include "mongo/platform/atomic_word.h" #include "mongo/s/client_info.h" #include "mongo/s/cluster_explain.h" -#include "mongo/s/chunk.h" +#include "mongo/s/chunk_manager.h" #include "mongo/s/config.h" #include "mongo/s/cursors.h" #include "mongo/s/distlock.h" @@ -742,7 +742,7 @@ namespace mongo { return passthrough( conf , cmdObj , result ); } - cm->drop( cm ); + cm->drop(); if( ! conf->removeSharding( fullns ) ){ warning() << "collection " << fullns diff --git a/src/mongo/s/config.cpp b/src/mongo/s/config.cpp index d8fa0fdba0a..645e9c72cc1 100644 --- a/src/mongo/s/config.cpp +++ b/src/mongo/s/config.cpp @@ -32,19 +32,21 @@ #include "mongo/platform/basic.h" +#include "mongo/s/config.h" + #include <boost/scoped_ptr.hpp> -#include "pcrecpp.h" +#include <pcrecpp.h> #include "mongo/client/connpool.h" #include "mongo/client/dbclientcursor.h" #include "mongo/db/client.h" #include "mongo/db/lasterror.h" +#include "mongo/db/server_options.h" #include "mongo/db/write_concern.h" -#include "mongo/s/chunk.h" +#include "mongo/s/chunk_manager.h" #include "mongo/s/chunk_version.h" #include "mongo/s/client/shard_connection.h" #include "mongo/s/cluster_write.h" -#include "mongo/s/config.h" #include "mongo/s/grid.h" #include "mongo/s/server.h" #include "mongo/s/type_changelog.h" @@ -74,21 +76,30 @@ namespace mongo { int ConfigServer::VERSION = 3; Shard Shard::EMPTY; - /* --- DBConfig --- */ DBConfig::CollectionInfo::CollectionInfo( const BSONObj& in ) { _dirty = false; _dropped = in[CollectionType::dropped()].trueValue(); - if ( in[CollectionType::keyPattern()].isABSONObj() ) { - shard( new ChunkManager( in ) ); + if (in[CollectionType::keyPattern()].isABSONObj()) { + shard(new ChunkManager(in)); } _dirty = false; } - - void DBConfig::CollectionInfo::shard( ChunkManager* manager ){ + DBConfig::CollectionInfo::~CollectionInfo() { + + } + + void DBConfig::CollectionInfo::resetCM(ChunkManager* cm) { + invariant(cm); + invariant(_cm); + + _cm.reset(cm); + } + + void DBConfig::CollectionInfo::shard(ChunkManager* manager){ // Do this *first* so we're invisible to everyone else manager->loadExistingRanges(configServer.getPrimary().getConnString(), NULL); @@ -97,8 +108,8 @@ namespace mongo { // This helps prevent errors when dropping in a different process // - if( manager->numChunks() != 0 ){ - _cm = ChunkManagerPtr( manager ); + if (manager->numChunks() != 
0){ + _cm = ChunkManagerPtr(manager); _key = manager->getShardKeyPattern().toBSON().getOwned(); _unqiue = manager->isUnique(); _dirty = true; @@ -106,7 +117,7 @@ namespace mongo { } else{ warning() << "no chunks found for collection " << manager->getns() - << ", assuming unsharded" << endl; + << ", assuming unsharded"; unshard(); } } @@ -149,6 +160,21 @@ namespace mongo { _dirty = false; } + + DBConfig::DBConfig(std::string name) + : _name(name), + _primary("config", "", 0 /* maxSize */, false /* draining */), + _shardingEnabled(false), + _lock("DBConfig"), + _hitConfigServerLock("DBConfig::_hitConfigServerLock") { + + invariant(!_name.empty()); + } + + DBConfig::~DBConfig() { + + } + bool DBConfig::isSharded( const string& ns ) { if ( ! _shardingEnabled ) return false; @@ -157,11 +183,15 @@ namespace mongo { } bool DBConfig::_isSharded( const string& ns ) { - if ( ! _shardingEnabled ) + if (!_shardingEnabled) { return false; - Collections::iterator i = _collections.find( ns ); - if ( i == _collections.end() ) + } + + CollectionInfoMap::iterator i = _collections.find( ns ); + if (i == _collections.end()) { return false; + } + return i->second.isSharded(); } @@ -290,7 +320,7 @@ namespace mongo { scoped_lock lk( _lock ); - Collections::iterator i = _collections.find( ns ); + CollectionInfoMap::iterator i = _collections.find( ns ); if ( i == _collections.end() ) return false; @@ -321,7 +351,7 @@ namespace mongo { { scoped_lock lk( _lock ); - Collections::iterator i = _collections.find( ns ); + CollectionInfoMap::iterator i = _collections.find( ns ); // No namespace if( i == _collections.end() ){ @@ -612,7 +642,7 @@ namespace mongo { if( coll ){ - for ( Collections::iterator i=_collections.begin(); i!=_collections.end(); ++i ) { + for ( CollectionInfoMap::iterator i=_collections.begin(); i!=_collections.end(); ++i ) { if ( ! i->second.isDirty() ) continue; i->second.save( i->first ); @@ -725,7 +755,7 @@ namespace mongo { num = 0; set<string> seen; while ( true ) { - Collections::iterator i = _collections.begin(); + CollectionInfoMap::iterator i = _collections.begin(); for ( ; i != _collections.end(); ++i ) { // log() << "coll : " << i->first << " and " << i->second.isSharded() << endl; if ( i->second.isSharded() ) @@ -744,7 +774,7 @@ namespace mongo { LOG(1) << "\t dropping sharded collection: " << i->first << endl; i->second.getCM()->getAllShards( allServers ); - i->second.getCM()->drop( i->second.getCM() ); + i->second.getCM()->drop(); // We should warn, but it's not a fatal error if someone else reloaded the db/coll as // unsharded in the meantime @@ -765,7 +795,7 @@ namespace mongo { void DBConfig::getAllShards(set<Shard>& shards) const { scoped_lock lk( _lock ); shards.insert(getPrimary()); - for (Collections::const_iterator it(_collections.begin()), end(_collections.end()); it != end; ++it) { + for (CollectionInfoMap::const_iterator it(_collections.begin()), end(_collections.end()); it != end; ++it) { if (it->second.isSharded()) { it->second.getCM()->getAllShards(shards); } // TODO: handle collections on non-primary shard @@ -776,7 +806,7 @@ namespace mongo { scoped_lock lk( _lock ); - for( Collections::const_iterator i = _collections.begin(); i != _collections.end(); i++ ) { + for( CollectionInfoMap::const_iterator i = _collections.begin(); i != _collections.end(); i++ ) { log() << "Coll : " << i->first << " sharded? 
" << i->second.isSharded() << endl; if( i->second.isSharded() ) namespaces.insert( i->first ); } diff --git a/src/mongo/s/config.h b/src/mongo/s/config.h index d3626baada5..f71e0e26b8e 100644 --- a/src/mongo/s/config.h +++ b/src/mongo/s/config.h @@ -38,15 +38,15 @@ #include <boost/shared_ptr.hpp> #include "mongo/client/dbclient_rs.h" -#include "mongo/s/chunk.h" #include "mongo/s/shard.h" #include "mongo/s/shard_key_pattern.h" namespace mongo { + class ChunkManager; class ConfigServer; - class DBConfig; + typedef boost::shared_ptr<DBConfig> DBConfigPtr; extern DBConfigPtr configServerPtr; @@ -56,65 +56,9 @@ namespace mongo { * top level configuration for a database */ class DBConfig { - - struct CollectionInfo { - CollectionInfo() { - _dirty = false; - _dropped = false; - } - - CollectionInfo( const BSONObj& in ); - - bool isSharded() const { - return _cm.get(); - } - - ChunkManagerPtr getCM() const { - return _cm; - } - - void resetCM( ChunkManager * cm ) { - verify(cm); - verify(_cm); // this has to be already sharded - _cm.reset( cm ); - } - - void shard( ChunkManager* cm ); - void unshard(); - - bool isDirty() const { return _dirty; } - bool wasDropped() const { return _dropped; } - - void save( const std::string& ns ); - - bool unique() const { return _unqiue; } - BSONObj key() const { return _key; } - - - private: - BSONObj _key; - bool _unqiue; - ChunkManagerPtr _cm; - bool _dirty; - bool _dropped; - }; - - typedef std::map<std::string,CollectionInfo> Collections; - public: - - DBConfig( std::string name ) - : _name( name ) , - _primary("config", - "", - 0 /* maxSize */, - false /* draining */), - _shardingEnabled(false), - _lock("DBConfig") , - _hitConfigServerLock( "DBConfig::_hitConfigServerLock" ) { - verify( name.size() ); - } - virtual ~DBConfig() {} + DBConfig(std::string name); + virtual ~DBConfig(); std::string getName() const { return _name; }; @@ -135,11 +79,11 @@ namespace mongo { * WARNING: It's not safe to place initial chunks onto non-primary shards using this method. * The initShards parameter allows legacy behavior expected by map-reduce. */ - ChunkManagerPtr shardCollection(const std::string& ns, - const ShardKeyPattern& fieldsAndOrder, - bool unique, - std::vector<BSONObj>* initPoints, - std::vector<Shard>* initShards = NULL); + boost::shared_ptr<ChunkManager> shardCollection(const std::string& ns, + const ShardKeyPattern& fieldsAndOrder, + bool unique, + std::vector<BSONObj>* initPoints, + std::vector<Shard>* initShards = NULL); /** @return true if there was sharding info to remove @@ -153,10 +97,10 @@ namespace mongo { // Atomically returns *either* the chunk manager *or* the primary shard for the collection, // neither if the collection doesn't exist. 
- void getChunkManagerOrPrimary( const std::string& ns, ChunkManagerPtr& manager, ShardPtr& primary ); + void getChunkManagerOrPrimary(const std::string& ns, boost::shared_ptr<ChunkManager>& manager, ShardPtr& primary); - ChunkManagerPtr getChunkManager( const std::string& ns , bool reload = false, bool forceReload = false ); - ChunkManagerPtr getChunkManagerIfExists( const std::string& ns , bool reload = false, bool forceReload = false ); + boost::shared_ptr<ChunkManager> getChunkManager(const std::string& ns, bool reload = false, bool forceReload = false); + boost::shared_ptr<ChunkManager> getChunkManagerIfExists(const std::string& ns, bool reload = false, bool forceReload = false); const Shard& getShard( const std::string& ns ); /** @@ -189,6 +133,47 @@ namespace mongo { void getAllShardedCollections(std::set<std::string>& namespaces) const; protected: + struct CollectionInfo { + CollectionInfo() { + _dirty = false; + _dropped = false; + } + + CollectionInfo(const BSONObj& in); + ~CollectionInfo(); + + bool isSharded() const { + return _cm.get(); + } + + boost::shared_ptr<ChunkManager> getCM() const { + return _cm; + } + + void resetCM(ChunkManager * cm); + + void shard(ChunkManager* cm); + void unshard(); + + bool isDirty() const { return _dirty; } + bool wasDropped() const { return _dropped; } + + void save(const std::string& ns); + + bool unique() const { return _unqiue; } + BSONObj key() const { return _key; } + + + private: + BSONObj _key; + bool _unqiue; + boost::shared_ptr<ChunkManager> _cm; + bool _dirty; + bool _dropped; + }; + + typedef std::map<std::string, CollectionInfo> CollectionInfoMap; + /** lockless @@ -201,14 +186,13 @@ namespace mongo { bool _reload(); void _save( bool db = true, bool coll = true ); - std::string _name; // e.g. "alleyinsider" + + const std::string _name; // e.g. "alleyinsider" + Shard _primary; // e.g. localhost , mongo.foo.com:9999 bool _shardingEnabled; - //map<std::string,CollectionInfo> _sharded; // { "alleyinsider.blog.posts" : { ts : 1 } , ... ] - all ns that are sharded - //map<std::string,ChunkManagerPtr> _shards; // this will only have entries for things that have been looked at - - Collections _collections; + CollectionInfoMap _collections; mutable mongo::mutex _lock; // TODO: change to r/w lock ?? mutable mongo::mutex _hitConfigServerLock; diff --git a/src/mongo/s/config_server_checker_service.cpp b/src/mongo/s/config_server_checker_service.cpp index 8bec6b1ee2d..6f80a7004a1 100644 --- a/src/mongo/s/config_server_checker_service.cpp +++ b/src/mongo/s/config_server_checker_service.cpp @@ -26,11 +26,14 @@ * then also delete it in the license file. 
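The config.h hunks above replace the chunk.h include with a class ChunkManager; forward declaration and spell out boost::shared_ptr<ChunkManager> in the public signatures, so callers of DBConfig no longer pull the chunk headers into every translation unit. For readers following the routing path, a minimal call-site sketch of getChunkManagerOrPrimary() is given below; the setup (a DBConfigPtr from grid.getDBConfig(), the namespace string 'ns') is assumed, and the sketch is illustrative rather than code from this patch.

    // Illustrative call-site sketch (not part of the patch). Assumes a valid
    // DBConfigPtr 'conf', e.g. obtained via grid.getDBConfig(ns), and a
    // fully-qualified namespace string 'ns'.
    boost::shared_ptr<ChunkManager> manager;
    ShardPtr primary;

    // Per the comment in config.h: at most one of 'manager'/'primary' is set
    // if the collection exists, neither if it does not.
    conf->getChunkManagerOrPrimary(ns, manager, primary);

    if (manager) {
        // Sharded collection: route by shard key / scatter-gather over chunks.
    }
    else if (primary) {
        // Unsharded collection: forward the operation to its primary shard.
    }
    else {
        // Collection does not exist.
    }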
*/ +#include "mongo/platform/basic.h" + +#include "mongo/s/config_server_checker_service.h" + #include <boost/scoped_ptr.hpp> #include <boost/thread/thread.hpp> #include "mongo/s/config.h" -#include "mongo/s/config_server_checker_service.h" #include "mongo/util/concurrency/mutex.h" #include "mongo/util/exit.h" @@ -72,4 +75,3 @@ namespace mongo { return _checkerThread != NULL; } } - diff --git a/src/mongo/s/grid.cpp b/src/mongo/s/grid.cpp index 2e44fec54f4..852d1c353d3 100644 --- a/src/mongo/s/grid.cpp +++ b/src/mongo/s/grid.cpp @@ -34,8 +34,8 @@ #include "mongo/s/grid.h" -#include "pcrecpp.h" #include <iomanip> +#include <pcrecpp.h> #include "mongo/client/connpool.h" #include "mongo/client/replica_set_monitor.h" @@ -67,6 +67,16 @@ namespace mongo { MONGO_FP_DECLARE(neverBalance); + Grid::Grid() + : _lock("Grid"), + _allowLocalShard(true) { + + } + + Grid::~Grid() { + + } + DBConfigPtr Grid::getDBConfig( StringData ns , bool create , const string& shardNameHint ) { string database = nsToDatabase( ns ); @@ -474,8 +484,8 @@ namespace mongo { return ! shard.isEmpty(); } - bool Grid::_getNewShardName( string* name ) const { - DEV verify( name ); + bool Grid::_getNewShardName(string* name) const { + invariant(name); bool ok = false; int count = 0; @@ -484,18 +494,20 @@ namespace mongo { BSONObj o = conn->findOne(ShardType::ConfigNS, Query(fromjson("{" + ShardType::name() + ": /^shard/}")) .sort(BSON(ShardType::name() << -1 ))); - if ( ! o.isEmpty() ) { + if (!o.isEmpty()) { string last = o[ShardType::name()].String(); - istringstream is( last.substr( 5 ) ); + istringstream is(last.substr(5)); is >> count; count++; } + if (count < 9999) { stringstream ss; ss << "shard" << setfill('0') << setw(4) << count; *name = ss.str(); ok = true; } + conn.done(); return ok; diff --git a/src/mongo/s/grid.h b/src/mongo/s/grid.h index a63d0e970d9..1543e354567 100644 --- a/src/mongo/s/grid.h +++ b/src/mongo/s/grid.h @@ -35,7 +35,7 @@ #include "mongo/util/time_support.h" #include "mongo/util/concurrency/mutex.h" -#include "mongo/s/config.h" // DBConfigPtr +#include "mongo/s/config.h" namespace mongo { @@ -47,7 +47,8 @@ namespace mongo { */ class Grid { public: - Grid() : _lock( "Grid" ) , _allowLocalShard( true ) { } + Grid(); + ~Grid(); /** * gets the config the db. @@ -156,10 +157,6 @@ namespace mongo { static bool _inBalancingWindow( const BSONObj& balancerDoc , const boost::posix_time::ptime& now ); private: - mongo::mutex _lock; // protects _databases; TODO: change to r/w lock ?? - std::map<std::string, DBConfigPtr > _databases; // maps ns to DBConfig's - bool _allowLocalShard; // can 'localhost' be used in shard addresses? - /** * @param name is the chose name for the shard. Parameter is mandatory. * @return true if it managed to generate a shard name. May return false if (currently) @@ -171,6 +168,14 @@ namespace mongo { * @return whether a give dbname is used for shard "local" databases (e.g., admin or local) */ static bool _isSpecialLocalDB( const std::string& dbName ); + + + // Databases catalog map and mutex to protect it + mongo::mutex _lock; + std::map<std::string, DBConfigPtr> _databases; + + // can 'localhost' be used in shard addresses? + bool _allowLocalShard; }; extern Grid grid; diff --git a/src/mongo/s/mongos_options.cpp b/src/mongo/s/mongos_options.cpp index b84cd44365f..566ac2e033a 100644 --- a/src/mongo/s/mongos_options.cpp +++ b/src/mongo/s/mongos_options.cpp @@ -1,4 +1,4 @@ -/* +/** * Copyright (C) 2013 10gen Inc. 
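The grid.cpp hunk keeps the existing shard-naming scheme while tightening the parameter check from DEV verify to invariant: the highest existing "shardNNNN" entry is read from the config server, its numeric suffix is parsed and incremented, and the result is zero-padded to four digits, refusing to go past shard9999. A standalone sketch of that numbering logic follows, with the config-server lookup replaced by a plain string parameter (a hypothetical helper, not code from the patch):

    // Hypothetical standalone helper mirroring Grid::_getNewShardName()'s
    // naming scheme; the real code reads 'lastName' from the config server's
    // shards collection rather than taking it as a parameter.
    #include <iomanip>
    #include <sstream>
    #include <string>

    bool nextShardName(const std::string& lastName, std::string* name) {
        int count = 0;
        if (!lastName.empty()) {
            // Existing names look like "shard0007": skip the "shard" prefix,
            // parse the numeric suffix, then move one past it.
            std::istringstream is(lastName.substr(5));
            is >> count;
            count++;
        }

        if (count >= 9999) {
            // The four-digit, zero-padded scheme stops at "shard9999".
            return false;
        }

        std::ostringstream ss;
        ss << "shard" << std::setfill('0') << std::setw(4) << count;
        *name = ss.str();
        return true;
    }

Called with "shard0007" this yields "shard0008"; called with an empty string (no numbered shards yet) it yields "shard0000", matching the behaviour of the hunk above.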
* * This program is free software: you can redistribute it and/or modify diff --git a/src/mongo/s/server.cpp b/src/mongo/s/server.cpp index eda8e400ad8..a8458f71c1f 100644 --- a/src/mongo/s/server.cpp +++ b/src/mongo/s/server.cpp @@ -58,7 +58,7 @@ #include "mongo/db/startup_warnings_common.h" #include "mongo/platform/process_id.h" #include "mongo/s/balance.h" -#include "mongo/s/chunk.h" +#include "mongo/s/chunk_manager.h" #include "mongo/s/client_info.h" #include "mongo/s/config.h" #include "mongo/s/config_server_checker_service.h" diff --git a/src/mongo/s/strategy.cpp b/src/mongo/s/strategy.cpp index 2cb49dc4e3a..c841c2b2cab 100644 --- a/src/mongo/s/strategy.cpp +++ b/src/mongo/s/strategy.cpp @@ -52,7 +52,7 @@ #include "mongo/s/chunk_manager_targeter.h" #include "mongo/s/client_info.h" #include "mongo/s/cluster_write.h" -#include "mongo/s/chunk.h" +#include "mongo/s/chunk_manager.h" #include "mongo/s/chunk_version.h" #include "mongo/s/cursors.h" #include "mongo/s/dbclient_shard_resolver.h" diff --git a/src/mongo/s/version_manager.cpp b/src/mongo/s/version_manager.cpp index f71b51bc89f..9396f12a8a3 100644 --- a/src/mongo/s/version_manager.cpp +++ b/src/mongo/s/version_manager.cpp @@ -34,7 +34,7 @@ #include "mongo/s/version_manager.h" -#include "mongo/s/chunk.h" +#include "mongo/s/chunk_manager.h" #include "mongo/s/chunk_version.h" #include "mongo/s/client/shard_connection.h" #include "mongo/s/config.h"
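The remaining hunks (server.cpp, strategy.cpp, version_manager.cpp) are the mechanical side of the chunk.h / chunk_manager.h split: translation units that actually use ChunkManager now include mongo/s/chunk_manager.h, while headers such as config.h make do with a forward declaration. The pattern, sketched with hypothetical file and class names (not code from the patch), is the usual forward-declare-in-the-header, include-in-the-.cpp arrangement:

    // some_user.h -- holds ChunkManager only by shared_ptr, so a forward
    // declaration suffices and chunk_manager.h stays out of the header.
    #include <boost/shared_ptr.hpp>

    namespace mongo {
        class ChunkManager;

        class SomeUser {
        public:
            boost::shared_ptr<ChunkManager> manager() const { return _cm; }
        private:
            boost::shared_ptr<ChunkManager> _cm;
        };
    }

    // some_user.cpp -- only the definitions that call into ChunkManager
    // include the full header:
    //   #include "mongo/s/chunk_manager.h"

Keeping the heavy header out of config.h means a change to ChunkManager's internals no longer forces a rebuild of everything that merely routes through DBConfig.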