// @file chunk.h /** * Copyright (C) 2008 10gen Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License, version 3, * as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . * * As a special exception, the copyright holders give permission to link the * code of portions of this program with the OpenSSL library under certain * conditions as described in each individual source file and distribute * linked combinations including the program with the OpenSSL library. You * must comply with the GNU Affero General Public License in all respects * for all of the code used other than as permitted herein. If you modify * file(s) with this exception, you may extend this exception to your * version of the file(s), but you are not obligated to do so. If you do not * wish to do so, delete this exception statement from your version. If you * delete this exception statement from all source files in the program, * then also delete it in the license file. */ #pragma once #include "mongo/base/string_data.h" #include "mongo/bson/util/atomic_int.h" #include "mongo/s/chunk_version.h" #include "mongo/s/distlock.h" #include "mongo/s/shard.h" #include "mongo/s/shardkey.h" #include "mongo/util/concurrency/ticketholder.h" #include "mongo/db/query/query_solution.h" namespace mongo { class DBConfig; class Chunk; class ChunkRange; class ChunkManager; class ChunkObjUnitTest; typedef shared_ptr ChunkPtr; // key is max for each Chunk or ChunkRange typedef std::map ChunkMap; typedef std::map,BSONObjCmp> ChunkRangeMap; typedef shared_ptr ChunkManagerPtr; /** config.chunks { ns : "alleyinsider.fs.chunks" , min : {} , max : {} , server : "localhost:30001" } x is in a shard iff min <= x < max */ class Chunk : boost::noncopyable { public: Chunk( const ChunkManager * info , BSONObj from); Chunk( const ChunkManager * info , const BSONObj& min, const BSONObj& max, const Shard& shard, ChunkVersion lastmod = ChunkVersion() ); // // serialization support // void serialize(BSONObjBuilder& to, ChunkVersion myLastMod=ChunkVersion(0,OID())); // // chunk boundary support // const BSONObj& getMin() const { return _min; } const BSONObj& getMax() const { return _max; } // if min/max key is pos/neg infinity bool minIsInf() const; bool maxIsInf() const; // Returns true if this chunk contains the given point, and false otherwise // // Note: this function takes an extracted *key*, not an original document // (the point may be computed by, say, hashing a given field or projecting // to a subset of fields). bool containsPoint( const BSONObj& point ) const; std::string genID() const; static std::string genID( const std::string& ns , const BSONObj& min ); // // chunk version support // void appendShortVersion( const char * name , BSONObjBuilder& b ) const; ChunkVersion getLastmod() const { return _lastmod; } void setLastmod( ChunkVersion v ) { _lastmod = v; } // // split support // long getBytesWritten() const { return _dataWritten; } // Const since _dataWritten is mutable and a heuristic // TODO: Split data tracking and chunk information void setBytesWritten( long bytesWritten ) const { _dataWritten = bytesWritten; } /** * if the amount of data written nears the max size of a shard * then we check the real size, and if its too big, we split * @return if something was split */ bool splitIfShould( long dataWritten ) const; /** * Splits this chunk at a non-specificed split key to be chosen by the mongod holding this chunk. * * @param atMedian if set to true, will split the chunk at the middle regardless if * the split is really necessary size wise. If set to false, will only split if * the chunk has reached the currently desired maximum size. Setting to false also * has the effect of splitting the chunk such that the resulting chunks will never * be greater than the current chunk size setting. * @param res the object containing details about the split execution * @param resultingSplits the number of resulting split points. Set to NULL to ignore. * * @throws UserException */ Status split( bool atMedian, size_t* resultingSplits ) const; /** * Splits this chunk at the given key (or keys) * * @param splitPoints the vector of keys that should be used to divide this chunk * @param res the object containing details about the split execution * * @throws UserException */ Status multiSplit( const std::vector& splitPoints ) const; /** * Asks the mongod holding this chunk to find a key that approximately divides this chunk in two * * @param medianKey the key that divides this chunk, if there is one, or empty */ void pickMedianKey( BSONObj& medianKey ) const; /** * @param splitPoints vector to be filled in * @param chunkSize chunk size to target in bytes * @param maxPoints limits the number of split points that are needed, zero is max (optional) * @param maxObjs limits the number of objects in each chunk, zero is as max (optional) */ void pickSplitVector( std::vector& splitPoints , int chunkSize , int maxPoints = 0, int maxObjs = 0) const; // // migration support // /** * Issues a migrate request for this chunk * * @param to shard to move this chunk to * @param chunSize maximum number of bytes beyond which the migrate should no go trhough * @param secondaryThrottle whether during migrate all writes should block for repl * @param waitForDelete whether chunk move should wait for cleanup or return immediately * @param maxTimeMS max time for the migrate request * @param res the object containing details about the migrate execution * @return true if move was successful */ bool moveAndCommit(const Shard& to, long long chunkSize, bool secondaryThrottle, bool waitForDelete, int maxTimeMS, BSONObj& res) const; /** * @return size of shard in bytes * talks to mongod to do this */ long getPhysicalSize() const; /** * marks this chunk as a jumbo chunk * that means the chunk will be inelligble for migrates */ void markAsJumbo() const; bool isJumbo() const { return _jumbo; } /** * Attempt to refresh maximum chunk size from config. */ static void refreshChunkSize(); /** * sets MaxChunkSize * 1 <= newMaxChunkSize <= 1024 * @return true if newMaxChunkSize is valid and was set */ static bool setMaxChunkSizeSizeMB( int newMaxChunkSize ); // // public constants // static int MaxChunkSize; static int MaxObjectPerChunk; static bool ShouldAutoSplit; // // accessors and helpers // std::string toString() const; friend std::ostream& operator << (std::ostream& out, const Chunk& c) { return (out << c.toString()); } // chunk equality is determined by comparing the min and max bounds of the chunk bool operator==(const Chunk& s) const; bool operator!=(const Chunk& s) const { return ! ( *this == s ); } std::string getns() const; Shard getShard() const { return _shard; } const ChunkManager* getManager() const { return _manager; } private: // main shard info const ChunkManager * _manager; BSONObj _min; BSONObj _max; Shard _shard; ChunkVersion _lastmod; mutable bool _jumbo; // transient stuff mutable long _dataWritten; // methods, etc.. /** Returns the highest or lowest existing value in the shard-key space. * Warning: this assumes that the shard key is not "special"- that is, the shardKeyPattern * is simply an ordered list of ascending/descending field names. Examples: * {a : 1, b : -1} is not special. {a : "hashed"} is. * * if sort 1, return lowest key * if sort -1, return highest key * will return empty object if have none */ BSONObj _getExtremeKey( int sort ) const; /** * Determines the appropriate split points for this chunk. * * @param atMedian perform a single split at the middle of this chunk. * @param splitPoints out parameter containing the chosen split points. Can be empty. */ void determineSplitPoints(bool atMedian, std::vector* splitPoints) const; /** initializes _dataWritten with a random value so that a mongos restart wouldn't cause delay in splitting */ static int mkDataWritten(); ShardKeyPattern skey() const; }; class ChunkRange { public: const ChunkManager* getManager() const { return _manager; } Shard getShard() const { return _shard; } const BSONObj& getMin() const { return _min; } const BSONObj& getMax() const { return _max; } // clones of Chunk methods // Returns true if this ChunkRange contains the given point, and false otherwise // // Note: this function takes an extracted *key*, not an original document // (the point may be computed by, say, hashing a given field or projecting // to a subset of fields). bool containsPoint( const BSONObj& point ) const; ChunkRange(ChunkMap::const_iterator begin, const ChunkMap::const_iterator end) : _manager(begin->second->getManager()) , _shard(begin->second->getShard()) , _min(begin->second->getMin()) , _max(boost::prior(end)->second->getMax()) { verify( begin != end ); DEV while (begin != end) { verify(begin->second->getManager() == _manager); verify(begin->second->getShard() == _shard); ++begin; } } // Merge min and max (must be adjacent ranges) ChunkRange(const ChunkRange& min, const ChunkRange& max) : _manager(min.getManager()) , _shard(min.getShard()) , _min(min.getMin()) , _max(max.getMax()) { verify(min.getShard() == max.getShard()); verify(min.getManager() == max.getManager()); verify(min.getMax() == max.getMin()); } friend std::ostream& operator<<(std::ostream& out, const ChunkRange& cr) { return (out << "ChunkRange(min=" << cr._min << ", max=" << cr._max << ", shard=" << cr._shard <<")"); } private: const ChunkManager* _manager; const Shard _shard; const BSONObj _min; const BSONObj _max; }; class ChunkRangeManager { public: const ChunkRangeMap& ranges() const { return _ranges; } void clear() { _ranges.clear(); } void reloadAll(const ChunkMap& chunks); // Slow operation -- wrap with DEV void assertValid() const; ChunkRangeMap::const_iterator upper_bound(const BSONObj& o) const { return _ranges.upper_bound(o); } ChunkRangeMap::const_iterator lower_bound(const BSONObj& o) const { return _ranges.lower_bound(o); } private: // assumes nothing in this range exists in _ranges void _insertRange(ChunkMap::const_iterator begin, const ChunkMap::const_iterator end); ChunkRangeMap _ranges; }; /* config.sharding { ns: 'alleyinsider.fs.chunks' , key: { ts : 1 } , shards: [ { min: 1, max: 100, server: a } , { min: 101, max: 200 , server : b } ] } */ class ChunkManager { public: typedef std::map ShardVersionMap; // Loads a new chunk manager from a collection document ChunkManager( const BSONObj& collDoc ); // Creates an empty chunk manager for the namespace ChunkManager( const std::string& ns, const ShardKeyPattern& pattern, bool unique ); // Updates a chunk manager based on an older manager ChunkManager( ChunkManagerPtr oldManager ); std::string getns() const { return _ns; } const ShardKeyPattern& getShardKey() const { return _key; } bool hasShardKey( const BSONObj& obj ) const; bool isUnique() const { return _unique; } /** * this is just an increasing number of how many ChunkManagers we have so we know if something has been updated */ unsigned long long getSequenceNumber() const { return _sequenceNumber; } // // After constructor is invoked, we need to call loadExistingRanges. If this is a new // sharded collection, we can call createFirstChunks first. // // Creates new chunks based on info in chunk manager void createFirstChunks( const std::string& config, const Shard& primary, const std::vector* initPoints, const std::vector* initShards ); // Loads existing ranges based on info in chunk manager void loadExistingRanges( const std::string& config ); // Helpers for load void calcInitSplitsAndShards( const Shard& primary, const std::vector* initPoints, const std::vector* initShards, std::vector* splitPoints, std::vector* shards ) const; // // Methods to use once loaded / created // int numChunks() const { return _chunkMap.size(); } /** Given a document, returns the chunk which contains that document. * This works by extracting the shard key part of the given document, then * calling findIntersectingChunk() on the extracted key. * * See also the description for findIntersectingChunk(). */ ChunkPtr findChunkForDoc( const BSONObj& doc ) const; /** Given a key that has been extracted from a document, returns the * chunk that contains that key. * * For instance, to locate the chunk for document {a : "foo" , b : "bar"} * when the shard key is {a : "hashed"}, you can call * findChunkForDoc() on {a : "foo" , b : "bar"}, or * findIntersectingChunk() on {a : hash("foo") } */ ChunkPtr findIntersectingChunk( const BSONObj& point ) const; ChunkPtr findChunkOnServer( const Shard& shard ) const; void getShardsForQuery( std::set& shards , const BSONObj& query ) const; void getAllShards( std::set& all ) const; /** @param shards set to the shards covered by the interval [min, max], see SERVER-4791 */ void getShardsForRange( std::set& shards, const BSONObj& min, const BSONObj& max ) const; // Transforms query into bounds for each field in the shard key // for example : // Key { a: 1, b: 1 }, // Query { a : { $gte : 1, $lt : 2 }, // b : { $gte : 3, $lt : 4 } } // => Bounds { a : [1, 2), b : [3, 4) } static IndexBounds getIndexBoundsForQuery(const BSONObj& key, const CanonicalQuery* canonicalQuery); // Collapse query solution tree. // // If it has OR node, the result could be a superset of the index bounds generated. // Since to give a single IndexBounds, this gives the union of bounds on each field. // for example: // OR: { a: (0, 1), b: (0, 1) }, // { a: (2, 3), b: (2, 3) } // => { a: (0, 1), (2, 3), b: (0, 1), (2, 3) } static IndexBounds collapseQuerySolution( const QuerySolutionNode* node ); ChunkMap getChunkMap() const { return _chunkMap; } /** * Returns true if, for this shard, the chunks are identical in both chunk managers */ bool compatibleWith( const ChunkManager& other, const Shard& shard ) const; bool compatibleWith( ChunkManagerPtr other, const Shard& shard ) const { if( ! other ) return false; return compatibleWith( *other, shard ); } bool compatibleWith( const Chunk& other ) const; bool compatibleWith( ChunkPtr other ) const { if( ! other ) return false; return compatibleWith( *other ); } std::string toString() const; ChunkVersion getVersion( const StringData& shardName ) const; ChunkVersion getVersion( const Shard& shard ) const; ChunkVersion getVersion() const; void getInfo( BSONObjBuilder& b ) const; /** * @param me - so i don't get deleted before i'm done */ void drop( ChunkManagerPtr me ) const; void _printChunks() const; int getCurrentDesiredChunkSize() const; ChunkManagerPtr reload(bool force=true) const; // doesn't modify self! void markMinorForReload( ChunkVersion majorVersion ) const; void getMarkedMinorVersions( std::set& minorVersions ) const; private: // helpers for loading // returns true if load was consistent bool _load( const std::string& config, ChunkMap& chunks, std::set& shards, ShardVersionMap& shardVersions, ChunkManagerPtr oldManager); static bool _isValid(const ChunkMap& chunks); // end helpers // All members should be const for thread-safety const std::string _ns; const ShardKeyPattern _key; const bool _unique; const ChunkMap _chunkMap; const ChunkRangeManager _chunkRanges; const std::set _shards; const ShardVersionMap _shardVersions; // max version per shard // max version of any chunk ChunkVersion _version; // the previous manager this was based on // cleared after loading chunks ChunkManagerPtr _oldManager; mutable mutex _mutex; // only used with _nsLock const unsigned long long _sequenceNumber; // // Split Heuristic info // class SplitHeuristics { public: SplitHeuristics() : _splitTickets( maxParallelSplits ), _staleMinorSetMutex( "SplitHeuristics::staleMinorSet" ), _staleMinorCount( 0 ) {} void markMinorForReload( const std::string& ns, ChunkVersion majorVersion ); void getMarkedMinorVersions( std::set& minorVersions ); TicketHolder _splitTickets; mutex _staleMinorSetMutex; // mutex protects below int _staleMinorCount; std::set _staleMinorSet; // Test whether we should split once data * splitTestFactor > chunkSize (approximately) static const int splitTestFactor = 5; // Maximum number of parallel threads requesting a split static const int maxParallelSplits = 5; // The idea here is that we're over-aggressive on split testing by a factor of // splitTestFactor, so we can safely wait until we get to splitTestFactor invalid splits // before changing. Unfortunately, we also potentially over-request the splits by a // factor of maxParallelSplits, but since the factors are identical it works out // (for now) for parallel or sequential oversplitting. // TODO: Make splitting a separate thread with notifications? static const int staleMinorReloadThreshold = maxParallelSplits; }; mutable SplitHeuristics _splitHeuristics; // // End split heuristics // friend class Chunk; friend class ChunkRangeManager; // only needed for CRM::assertValid() static AtomicUInt NextSequenceNumber; /** Just for testing */ friend class TestableChunkManager; ChunkManager(); }; // like BSONObjCmp. for use as an STL comparison functor // key-order in "order" argument must match key-order in shardkey class ChunkCmp { public: ChunkCmp( const BSONObj &order = BSONObj() ) : _cmp( order ) {} bool operator()( const Chunk &l, const Chunk &r ) const { return _cmp(l.getMin(), r.getMin()); } bool operator()( const ptr l, const ptr r ) const { return operator()(*l, *r); } // Also support ChunkRanges bool operator()( const ChunkRange &l, const ChunkRange &r ) const { return _cmp(l.getMin(), r.getMin()); } bool operator()( const shared_ptr l, const shared_ptr r ) const { return operator()(*l, *r); } private: BSONObjCmp _cmp; }; /* struct chunk_lock { chunk_lock( const Chunk* c ){ } Chunk _c; }; */ inline std::string Chunk::genID() const { return genID(_manager->getns(), _min); } bool setShardVersion( DBClientBase & conn, const std::string& ns, ChunkVersion version, ChunkManagerPtr manager, bool authoritative, BSONObj& result ); } // namespace mongo