diff options
Diffstat (limited to 'src/mongo/client/parallel.h')
-rw-r--r-- | src/mongo/client/parallel.h | 517 |
1 files changed, 264 insertions, 253 deletions
diff --git a/src/mongo/client/parallel.h b/src/mongo/client/parallel.h index 3e97b6c23ea..ae778334fb8 100644 --- a/src/mongo/client/parallel.h +++ b/src/mongo/client/parallel.h @@ -38,329 +38,340 @@ namespace mongo { - class DBClientCursorHolder; - class StaleConfigException; - class ParallelConnectionMetadata; +class DBClientCursorHolder; +class StaleConfigException; +class ParallelConnectionMetadata; - class CommandInfo { - public: - std::string versionedNS; - BSONObj cmdFilter; - - CommandInfo() {} - CommandInfo( const std::string& vns, const BSONObj& filter ) : versionedNS( vns ), cmdFilter( filter ) {} +class CommandInfo { +public: + std::string versionedNS; + BSONObj cmdFilter; - bool isEmpty(){ - return versionedNS.size() == 0; - } + CommandInfo() {} + CommandInfo(const std::string& vns, const BSONObj& filter) + : versionedNS(vns), cmdFilter(filter) {} - std::string toString() const { - return str::stream() << "CInfo " << BSON( "v_ns" << versionedNS << "filter" << cmdFilter ); - } - }; + bool isEmpty() { + return versionedNS.size() == 0; + } - class DBClientCursor; - typedef std::shared_ptr<DBClientCursor> DBClientCursorPtr; + std::string toString() const { + return str::stream() << "CInfo " << BSON("v_ns" << versionedNS << "filter" << cmdFilter); + } +}; - class ParallelConnectionState { - public: +class DBClientCursor; +typedef std::shared_ptr<DBClientCursor> DBClientCursorPtr; - ParallelConnectionState() : - count( 0 ), done( false ) { } +class ParallelConnectionState { +public: + ParallelConnectionState() : count(0), done(false) {} - // Please do not reorder. cursor destructor can use conn. - // On a related note, never attempt to cleanup these pointers manually. - ShardConnectionPtr conn; - DBClientCursorPtr cursor; + // Please do not reorder. cursor destructor can use conn. + // On a related note, never attempt to cleanup these pointers manually. + ShardConnectionPtr conn; + DBClientCursorPtr cursor; - // Version information - ChunkManagerPtr manager; - std::shared_ptr<Shard> primary; + // Version information + ChunkManagerPtr manager; + std::shared_ptr<Shard> primary; - // Cursor status information - long long count; - bool done; + // Cursor status information + long long count; + bool done; - BSONObj toBSON() const; + BSONObj toBSON() const; - std::string toString() const { - return str::stream() << "PCState : " << toBSON(); - } - }; + std::string toString() const { + return str::stream() << "PCState : " << toBSON(); + } +}; - typedef ParallelConnectionState PCState; - typedef std::shared_ptr<PCState> PCStatePtr; +typedef ParallelConnectionState PCState; +typedef std::shared_ptr<PCState> PCStatePtr; - class ParallelConnectionMetadata { - public: +class ParallelConnectionMetadata { +public: + ParallelConnectionMetadata() + : retryNext(false), initialized(false), finished(false), completed(false), errored(false) {} - ParallelConnectionMetadata() : - retryNext( false ), initialized( false ), finished( false ), completed( false ), errored( false ) { } + ~ParallelConnectionMetadata() { + cleanup(true); + } - ~ParallelConnectionMetadata(){ - cleanup( true ); - } + void cleanup(bool full = true); - void cleanup( bool full = true ); + PCStatePtr pcState; - PCStatePtr pcState; + bool retryNext; - bool retryNext; + bool initialized; + bool finished; + bool completed; - bool initialized; - bool finished; - bool completed; + bool errored; - bool errored; + BSONObj toBSON() const; - BSONObj toBSON() const; + std::string toString() const { + return str::stream() << "PCMData : " << toBSON(); + } +}; - std::string toString() const { - return str::stream() << "PCMData : " << toBSON(); - } - }; +typedef ParallelConnectionMetadata PCMData; +typedef std::shared_ptr<PCMData> PCMDataPtr; - typedef ParallelConnectionMetadata PCMData; - typedef std::shared_ptr<PCMData> PCMDataPtr; - - /** - * Runs a query in parallel across N servers, enforcing compatible chunk versions for queries - * across all shards. - * - * If CommandInfo is provided, the ParallelCursor does not use the direct .$cmd namespace in the - * query spec, but instead enforces versions across another namespace specified by CommandInfo. - * This is to support commands like: - * db.runCommand({ fileMD5 : "<coll name>" }) - * - * There is a deprecated legacy mode as well which effectively does a merge-sort across a number - * of servers, but does not correctly enforce versioning (used only in mapreduce). - */ - class ParallelSortClusteredCursor { - public: - - ParallelSortClusteredCursor( const QuerySpec& qSpec, const CommandInfo& cInfo = CommandInfo() ); - - // DEPRECATED legacy constructor for pure mergesort functionality - do not use - ParallelSortClusteredCursor( const std::set<std::string>& servers , const std::string& ns , - const Query& q , int options=0, const BSONObj& fields=BSONObj() ); - - ~ParallelSortClusteredCursor(); +/** + * Runs a query in parallel across N servers, enforcing compatible chunk versions for queries + * across all shards. + * + * If CommandInfo is provided, the ParallelCursor does not use the direct .$cmd namespace in the + * query spec, but instead enforces versions across another namespace specified by CommandInfo. + * This is to support commands like: + * db.runCommand({ fileMD5 : "<coll name>" }) + * + * There is a deprecated legacy mode as well which effectively does a merge-sort across a number + * of servers, but does not correctly enforce versioning (used only in mapreduce). + */ +class ParallelSortClusteredCursor { +public: + ParallelSortClusteredCursor(const QuerySpec& qSpec, const CommandInfo& cInfo = CommandInfo()); - std::string getNS(); + // DEPRECATED legacy constructor for pure mergesort functionality - do not use + ParallelSortClusteredCursor(const std::set<std::string>& servers, + const std::string& ns, + const Query& q, + int options = 0, + const BSONObj& fields = BSONObj()); - /** call before using */ - void init(); + ~ParallelSortClusteredCursor(); - bool more(); - BSONObj next(); - std::string type() const { return "ParallelSort"; } + std::string getNS(); - void fullInit(); - void startInit(); - void finishInit(); + /** call before using */ + void init(); - bool isCommand(){ return NamespaceString( _qSpec.ns() ).isCommand(); } - bool isExplain(){ return _qSpec.isExplain(); } + bool more(); + BSONObj next(); + std::string type() const { + return "ParallelSort"; + } - /** - * Sets the batch size on all underlying cursors to 'newBatchSize'. - */ - void setBatchSize(int newBatchSize); + void fullInit(); + void startInit(); + void finishInit(); - /** - * Returns whether the collection was sharded when the cursors were established. - */ - bool isSharded(); + bool isCommand() { + return NamespaceString(_qSpec.ns()).isCommand(); + } + bool isExplain() { + return _qSpec.isExplain(); + } - /** - * Returns the number of shards with open cursors. - */ - int getNumQueryShards(); + /** + * Sets the batch size on all underlying cursors to 'newBatchSize'. + */ + void setBatchSize(int newBatchSize); - /** - * Returns the set of shards with open cursors. - */ - void getQueryShardIds(std::set<ShardId>& shardIds); + /** + * Returns whether the collection was sharded when the cursors were established. + */ + bool isSharded(); - /** - * Returns the single shard with an open cursor. - * It is an error to call this if getNumQueryShards() > 1 - */ - std::shared_ptr<Shard> getQueryShard(); + /** + * Returns the number of shards with open cursors. + */ + int getNumQueryShards(); - /** - * Returns primary shard with an open cursor. - * It is an error to call this if the collection is sharded. - */ - std::shared_ptr<Shard> getPrimary(); + /** + * Returns the set of shards with open cursors. + */ + void getQueryShardIds(std::set<ShardId>& shardIds); - DBClientCursorPtr getShardCursor(const ShardId& shardId); + /** + * Returns the single shard with an open cursor. + * It is an error to call this if getNumQueryShards() > 1 + */ + std::shared_ptr<Shard> getQueryShard(); - BSONObj toBSON() const; - std::string toString() const; + /** + * Returns primary shard with an open cursor. + * It is an error to call this if the collection is sharded. + */ + std::shared_ptr<Shard> getPrimary(); - void explain(BSONObjBuilder& b); + DBClientCursorPtr getShardCursor(const ShardId& shardId); - private: - void _finishCons(); + BSONObj toBSON() const; + std::string toString() const; - void _explain( std::map< std::string,std::list<BSONObj> >& out ); + void explain(BSONObjBuilder& b); - void _markStaleNS( const NamespaceString& staleNS, const StaleConfigException& e, bool& forceReload, bool& fullReload ); - void _handleStaleNS( const NamespaceString& staleNS, bool forceReload, bool fullReload ); +private: + void _finishCons(); - bool _didInit; - bool _done; + void _explain(std::map<std::string, std::list<BSONObj>>& out); - QuerySpec _qSpec; - CommandInfo _cInfo; + void _markStaleNS(const NamespaceString& staleNS, + const StaleConfigException& e, + bool& forceReload, + bool& fullReload); + void _handleStaleNS(const NamespaceString& staleNS, bool forceReload, bool fullReload); - // Count round-trips req'd for namespaces and total - std::map<std::string,int> _staleNSMap; - int _totalTries; + bool _didInit; + bool _done; - std::map<ShardId, PCMData> _cursorMap; + QuerySpec _qSpec; + CommandInfo _cInfo; - // LEGACY BELOW - int _numServers; - int _lastFrom; - std::set<std::string> _servers; - BSONObj _sortKey; + // Count round-trips req'd for namespaces and total + std::map<std::string, int> _staleNSMap; + int _totalTries; - DBClientCursorHolder * _cursors; - int _needToSkip; + std::map<ShardId, PCMData> _cursorMap; - /** - * Setups the shard version of the connection. When using a replica - * set connection and the primary cannot be reached, the version - * will not be set if the slaveOk flag is set. - */ - void setupVersionAndHandleSlaveOk(PCStatePtr state /* in & out */, - const ShardId& shardId, - std::shared_ptr<Shard> primary /* in */, - const NamespaceString& ns, - const std::string& vinfo, - ChunkManagerPtr manager /* in */ ); - - // LEGACY init - Needed for map reduce - void _oldInit(); - - // LEGACY - Needed ONLY for _oldInit - std::string _ns; - BSONObj _query; - int _options; - BSONObj _fields; - int _batchSize; - }; + // LEGACY BELOW + int _numServers; + int _lastFrom; + std::set<std::string> _servers; + BSONObj _sortKey; + DBClientCursorHolder* _cursors; + int _needToSkip; /** - * Helper class to manage ownership of opened cursors while merging results. - * - * TODO: Choose one set of ownership semantics so that this isn't needed - merge sort via - * mapreduce is the main issue since it has no metadata and this holder owns the cursors. + * Setups the shard version of the connection. When using a replica + * set connection and the primary cannot be reached, the version + * will not be set if the slaveOk flag is set. */ - class DBClientCursorHolder { - public: + void setupVersionAndHandleSlaveOk(PCStatePtr state /* in & out */, + const ShardId& shardId, + std::shared_ptr<Shard> primary /* in */, + const NamespaceString& ns, + const std::string& vinfo, + ChunkManagerPtr manager /* in */); - DBClientCursorHolder() {} - ~DBClientCursorHolder() {} + // LEGACY init - Needed for map reduce + void _oldInit(); - void reset(DBClientCursor* cursor, ParallelConnectionMetadata* pcmData) { - _cursor.reset(cursor); - _pcmData.reset(pcmData); - } - - DBClientCursor* get() { return _cursor.get(); } - ParallelConnectionMetadata* getMData() { return _pcmData.get(); } + // LEGACY - Needed ONLY for _oldInit + std::string _ns; + BSONObj _query; + int _options; + BSONObj _fields; + int _batchSize; +}; - void release() { - _cursor.release(); - _pcmData.release(); - } - private: - - std::unique_ptr<DBClientCursor> _cursor; - std::unique_ptr<ParallelConnectionMetadata> _pcmData; - }; +/** + * Helper class to manage ownership of opened cursors while merging results. + * + * TODO: Choose one set of ownership semantics so that this isn't needed - merge sort via + * mapreduce is the main issue since it has no metadata and this holder owns the cursors. + */ +class DBClientCursorHolder { +public: + DBClientCursorHolder() {} + ~DBClientCursorHolder() {} + + void reset(DBClientCursor* cursor, ParallelConnectionMetadata* pcmData) { + _cursor.reset(cursor); + _pcmData.reset(pcmData); + } + + DBClientCursor* get() { + return _cursor.get(); + } + ParallelConnectionMetadata* getMData() { + return _pcmData.get(); + } + + void release() { + _cursor.release(); + _pcmData.release(); + } + +private: + std::unique_ptr<DBClientCursor> _cursor; + std::unique_ptr<ParallelConnectionMetadata> _pcmData; +}; - /** - * Generally clients should be using Strategy::commandOp() wherever possible - the Future API - * does not handle versioning. - * - * tools for doing asynchronous operations - * right now uses underlying sync network ops and uses another thread - * should be changed to use non-blocking io - */ - class Future { +/** + * Generally clients should be using Strategy::commandOp() wherever possible - the Future API + * does not handle versioning. + * + * tools for doing asynchronous operations + * right now uses underlying sync network ops and uses another thread + * should be changed to use non-blocking io + */ +class Future { +public: + class CommandResult { public: - class CommandResult { - public: - - std::string getServer() const { return _server; } - - bool isDone() const { return _done; } + std::string getServer() const { + return _server; + } - bool ok() const { - verify( _done ); - return _ok; - } + bool isDone() const { + return _done; + } - BSONObj result() const { - verify( _done ); - return _res; - } + bool ok() const { + verify(_done); + return _ok; + } - /** - blocks until command is done - returns ok() - */ - bool join( int maxRetries = 1 ); + BSONObj result() const { + verify(_done); + return _res; + } - private: + /** + blocks until command is done + returns ok() + */ + bool join(int maxRetries = 1); - CommandResult( const std::string& server, - const std::string& db, - const BSONObj& cmd, - int options, - DBClientBase * conn, - bool useShardedConn ); - void init(); + private: + CommandResult(const std::string& server, + const std::string& db, + const BSONObj& cmd, + int options, + DBClientBase* conn, + bool useShardedConn); + void init(); - std::string _server; - std::string _db; - int _options; - BSONObj _cmd; - DBClientBase * _conn; - std::unique_ptr<AScopedConnection> _connHolder; // used if not provided a connection - bool _useShardConn; + std::string _server; + std::string _db; + int _options; + BSONObj _cmd; + DBClientBase* _conn; + std::unique_ptr<AScopedConnection> _connHolder; // used if not provided a connection + bool _useShardConn; - std::unique_ptr<DBClientCursor> _cursor; + std::unique_ptr<DBClientCursor> _cursor; - BSONObj _res; - bool _ok; - bool _done; + BSONObj _res; + bool _ok; + bool _done; - friend class Future; - }; + friend class Future; + }; - /** - * @param server server name - * @param db db name - * @param cmd cmd to exec - * @param conn optional connection to use. will use standard pooled if non-specified - * @param useShardConn use ShardConnection - */ - static std::shared_ptr<CommandResult> spawnCommand( const std::string& server, + /** + * @param server server name + * @param db db name + * @param cmd cmd to exec + * @param conn optional connection to use. will use standard pooled if non-specified + * @param useShardConn use ShardConnection + */ + static std::shared_ptr<CommandResult> spawnCommand(const std::string& server, const std::string& db, const BSONObj& cmd, int options, - DBClientBase * conn = 0, - bool useShardConn = false ); - }; - + DBClientBase* conn = 0, + bool useShardConn = false); +}; } - |