/* Copyright 2009 10gen Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License, version 3, * as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . * * As a special exception, the copyright holders give permission to link the * code of portions of this program with the OpenSSL library under certain * conditions as described in each individual source file and distribute * linked combinations including the program with the OpenSSL library. You * must comply with the GNU Affero General Public License in all respects * for all of the code used other than as permitted herein. If you modify * file(s) with this exception, you may extend this exception to your * version of the file(s), but you are not obligated to do so. If you do not * wish to do so, delete this exception statement from your version. If you * delete this exception statement from all source files in the program, * then also delete it in the license file. */ /** tools for working in parallel/sharded/clustered environment */ #pragma once #include #include "mongo/db/namespace_string.h" #include "mongo/s/client/shard.h" #include "mongo/s/client/shard_connection.h" namespace mongo { class DBClientCursorHolder; class StaleConfigException; class ParallelConnectionMetadata; class CommandInfo { public: std::string versionedNS; BSONObj cmdFilter; CommandInfo() {} CommandInfo( const std::string& vns, const BSONObj& filter ) : versionedNS( vns ), cmdFilter( filter ) {} bool isEmpty(){ return versionedNS.size() == 0; } std::string toString() const { return str::stream() << "CInfo " << BSON( "v_ns" << versionedNS << "filter" << cmdFilter ); } }; class DBClientCursor; typedef boost::shared_ptr DBClientCursorPtr; class ParallelConnectionState { public: ParallelConnectionState() : count( 0 ), done( false ) { } // Please do not reorder. cursor destructor can use conn. // On a related note, never attempt to cleanup these pointers manually. ShardConnectionPtr conn; DBClientCursorPtr cursor; // Version information ChunkManagerPtr manager; boost::shared_ptr primary; // Cursor status information long long count; bool done; BSONObj toBSON() const; std::string toString() const { return str::stream() << "PCState : " << toBSON(); } }; typedef ParallelConnectionState PCState; typedef boost::shared_ptr PCStatePtr; class ParallelConnectionMetadata { public: ParallelConnectionMetadata() : retryNext( false ), initialized( false ), finished( false ), completed( false ), errored( false ) { } ~ParallelConnectionMetadata(){ cleanup( true ); } void cleanup( bool full = true ); PCStatePtr pcState; bool retryNext; bool initialized; bool finished; bool completed; bool errored; BSONObj toBSON() const; std::string toString() const { return str::stream() << "PCMData : " << toBSON(); } }; typedef ParallelConnectionMetadata PCMData; typedef boost::shared_ptr PCMDataPtr; /** * Runs a query in parallel across N servers, enforcing compatible chunk versions for queries * across all shards. * * If CommandInfo is provided, the ParallelCursor does not use the direct .$cmd namespace in the * query spec, but instead enforces versions across another namespace specified by CommandInfo. * This is to support commands like: * db.runCommand({ fileMD5 : "" }) * * There is a deprecated legacy mode as well which effectively does a merge-sort across a number * of servers, but does not correctly enforce versioning (used only in mapreduce). */ class ParallelSortClusteredCursor { public: ParallelSortClusteredCursor( const QuerySpec& qSpec, const CommandInfo& cInfo = CommandInfo() ); // DEPRECATED legacy constructor for pure mergesort functionality - do not use ParallelSortClusteredCursor( const std::set& servers , const std::string& ns , const Query& q , int options=0, const BSONObj& fields=BSONObj() ); ~ParallelSortClusteredCursor(); std::string getNS(); /** call before using */ void init(); bool more(); BSONObj next(); std::string type() const { return "ParallelSort"; } void fullInit(); void startInit(); void finishInit(); bool isCommand(){ return NamespaceString( _qSpec.ns() ).isCommand(); } bool isExplain(){ return _qSpec.isExplain(); } /** * Sets the batch size on all underlying cursors to 'newBatchSize'. */ void setBatchSize(int newBatchSize); /** * Returns whether the collection was sharded when the cursors were established. */ bool isSharded(); /** * Returns the number of shards with open cursors. */ int getNumQueryShards(); /** * Returns the set of shards with open cursors. */ void getQueryShardIds(std::set& shardIds); /** * Returns the single shard with an open cursor. * It is an error to call this if getNumQueryShards() > 1 */ boost::shared_ptr getQueryShard(); /** * Returns primary shard with an open cursor. * It is an error to call this if the collection is sharded. */ boost::shared_ptr getPrimary(); DBClientCursorPtr getShardCursor(const ShardId& shardId); BSONObj toBSON() const; std::string toString() const; void explain(BSONObjBuilder& b); private: void _finishCons(); void _explain( std::map< std::string,std::list >& out ); void _markStaleNS( const NamespaceString& staleNS, const StaleConfigException& e, bool& forceReload, bool& fullReload ); void _handleStaleNS( const NamespaceString& staleNS, bool forceReload, bool fullReload ); bool _didInit; bool _done; QuerySpec _qSpec; CommandInfo _cInfo; // Count round-trips req'd for namespaces and total std::map _staleNSMap; int _totalTries; std::map _cursorMap; // LEGACY BELOW int _numServers; int _lastFrom; std::set _servers; BSONObj _sortKey; DBClientCursorHolder * _cursors; int _needToSkip; /** * Setups the shard version of the connection. When using a replica * set connection and the primary cannot be reached, the version * will not be set if the slaveOk flag is set. */ void setupVersionAndHandleSlaveOk(PCStatePtr state /* in & out */, const ShardId& shardId, boost::shared_ptr primary /* in */, const NamespaceString& ns, const std::string& vinfo, ChunkManagerPtr manager /* in */ ); // LEGACY init - Needed for map reduce void _oldInit(); // LEGACY - Needed ONLY for _oldInit std::string _ns; BSONObj _query; int _options; BSONObj _fields; int _batchSize; }; /** * Helper class to manage ownership of opened cursors while merging results. * * TODO: Choose one set of ownership semantics so that this isn't needed - merge sort via * mapreduce is the main issue since it has no metadata and this holder owns the cursors. */ class DBClientCursorHolder { public: DBClientCursorHolder() {} ~DBClientCursorHolder() {} void reset(DBClientCursor* cursor, ParallelConnectionMetadata* pcmData) { _cursor.reset(cursor); _pcmData.reset(pcmData); } DBClientCursor* get() { return _cursor.get(); } ParallelConnectionMetadata* getMData() { return _pcmData.get(); } void release() { _cursor.release(); _pcmData.release(); } private: std::unique_ptr _cursor; std::unique_ptr _pcmData; }; /** * Generally clients should be using Strategy::commandOp() wherever possible - the Future API * does not handle versioning. * * tools for doing asynchronous operations * right now uses underlying sync network ops and uses another thread * should be changed to use non-blocking io */ class Future { public: class CommandResult { public: std::string getServer() const { return _server; } bool isDone() const { return _done; } bool ok() const { verify( _done ); return _ok; } BSONObj result() const { verify( _done ); return _res; } /** blocks until command is done returns ok() */ bool join( int maxRetries = 1 ); private: CommandResult( const std::string& server, const std::string& db, const BSONObj& cmd, int options, DBClientBase * conn, bool useShardedConn ); void init(); std::string _server; std::string _db; int _options; BSONObj _cmd; DBClientBase * _conn; std::unique_ptr _connHolder; // used if not provided a connection bool _useShardConn; std::unique_ptr _cursor; BSONObj _res; bool _ok; bool _done; friend class Future; }; /** * @param server server name * @param db db name * @param cmd cmd to exec * @param conn optional connection to use. will use standard pooled if non-specified * @param useShardConn use ShardConnection */ static boost::shared_ptr spawnCommand( const std::string& server, const std::string& db, const BSONObj& cmd, int options, DBClientBase * conn = 0, bool useShardConn = false ); }; }