summary refs log tree commit diff
path: root/src/mongo/client/parallel.h
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/client/parallel.h')
-rw-r--r-- src/mongo/client/parallel.h 517
1 files changed, 264 insertions, 253 deletions
diff --git a/src/mongo/client/parallel.h b/src/mongo/client/parallel.h
index 3e97b6c23ea..ae778334fb8 100644
--- a/src/mongo/client/parallel.h
+++ b/src/mongo/client/parallel.h
@@ -38,329 +38,340 @@
namespace mongo {
- class DBClientCursorHolder;
- class StaleConfigException;
- class ParallelConnectionMetadata;
+class DBClientCursorHolder;
+class StaleConfigException;
+class ParallelConnectionMetadata;
- class CommandInfo {
- public:
- std::string versionedNS;
- BSONObj cmdFilter;
-
- CommandInfo() {}
- CommandInfo( const std::string& vns, const BSONObj& filter ) : versionedNS( vns ), cmdFilter( filter ) {}
+class CommandInfo {
+public:
+ std::string versionedNS;
+ BSONObj cmdFilter;
- bool isEmpty(){
- return versionedNS.size() == 0;
- }
+ CommandInfo() {}
+ CommandInfo(const std::string& vns, const BSONObj& filter)
+ : versionedNS(vns), cmdFilter(filter) {}
- std::string toString() const {
- return str::stream() << "CInfo " << BSON( "v_ns" << versionedNS << "filter" << cmdFilter );
- }
- };
+ bool isEmpty() {
+ return versionedNS.size() == 0;
+ }
- class DBClientCursor;
- typedef std::shared_ptr<DBClientCursor> DBClientCursorPtr;
+ std::string toString() const {
+ return str::stream() << "CInfo " << BSON("v_ns" << versionedNS << "filter" << cmdFilter);
+ }
+};
- class ParallelConnectionState {
- public:
+class DBClientCursor;
+typedef std::shared_ptr<DBClientCursor> DBClientCursorPtr;
- ParallelConnectionState() :
- count( 0 ), done( false ) { }
+class ParallelConnectionState {
+public:
+ ParallelConnectionState() : count(0), done(false) {}
- // Please do not reorder. cursor destructor can use conn.
- // On a related note, never attempt to cleanup these pointers manually.
- ShardConnectionPtr conn;
- DBClientCursorPtr cursor;
+ // Please do not reorder. cursor destructor can use conn.
+ // On a related note, never attempt to cleanup these pointers manually.
+ ShardConnectionPtr conn;
+ DBClientCursorPtr cursor;
- // Version information
- ChunkManagerPtr manager;
- std::shared_ptr<Shard> primary;
+ // Version information
+ ChunkManagerPtr manager;
+ std::shared_ptr<Shard> primary;
- // Cursor status information
- long long count;
- bool done;
+ // Cursor status information
+ long long count;
+ bool done;
- BSONObj toBSON() const;
+ BSONObj toBSON() const;
- std::string toString() const {
- return str::stream() << "PCState : " << toBSON();
- }
- };
+ std::string toString() const {
+ return str::stream() << "PCState : " << toBSON();
+ }
+};
- typedef ParallelConnectionState PCState;
- typedef std::shared_ptr<PCState> PCStatePtr;
+typedef ParallelConnectionState PCState;
+typedef std::shared_ptr<PCState> PCStatePtr;
- class ParallelConnectionMetadata {
- public:
+class ParallelConnectionMetadata {
+public:
+ ParallelConnectionMetadata()
+ : retryNext(false), initialized(false), finished(false), completed(false), errored(false) {}
- ParallelConnectionMetadata() :
- retryNext( false ), initialized( false ), finished( false ), completed( false ), errored( false ) { }
+ ~ParallelConnectionMetadata() {
+ cleanup(true);
+ }
- ~ParallelConnectionMetadata(){
- cleanup( true );
- }
+ void cleanup(bool full = true);
- void cleanup( bool full = true );
+ PCStatePtr pcState;
- PCStatePtr pcState;
+ bool retryNext;
- bool retryNext;
+ bool initialized;
+ bool finished;
+ bool completed;
- bool initialized;
- bool finished;
- bool completed;
+ bool errored;
- bool errored;
+ BSONObj toBSON() const;
- BSONObj toBSON() const;
+ std::string toString() const {
+ return str::stream() << "PCMData : " << toBSON();
+ }
+};
- std::string toString() const {
- return str::stream() << "PCMData : " << toBSON();
- }
- };
+typedef ParallelConnectionMetadata PCMData;
+typedef std::shared_ptr<PCMData> PCMDataPtr;
- typedef ParallelConnectionMetadata PCMData;
- typedef std::shared_ptr<PCMData> PCMDataPtr;
-
- /**
- * Runs a query in parallel across N servers, enforcing compatible chunk versions for queries
- * across all shards.
- *
- * If CommandInfo is provided, the ParallelCursor does not use the direct .$cmd namespace in the
- * query spec, but instead enforces versions across another namespace specified by CommandInfo.
- * This is to support commands like:
- * db.runCommand({ fileMD5 : "<coll name>" })
- *
- * There is a deprecated legacy mode as well which effectively does a merge-sort across a number
- * of servers, but does not correctly enforce versioning (used only in mapreduce).
- */
- class ParallelSortClusteredCursor {
- public:
-
- ParallelSortClusteredCursor( const QuerySpec& qSpec, const CommandInfo& cInfo = CommandInfo() );
-
- // DEPRECATED legacy constructor for pure mergesort functionality - do not use
- ParallelSortClusteredCursor( const std::set<std::string>& servers , const std::string& ns ,
- const Query& q , int options=0, const BSONObj& fields=BSONObj() );
-
- ~ParallelSortClusteredCursor();
+/**
+ * Runs a query in parallel across N servers, enforcing compatible chunk versions for queries
+ * across all shards.
+ *
+ * If CommandInfo is provided, the ParallelCursor does not use the direct .$cmd namespace in the
+ * query spec, but instead enforces versions across another namespace specified by CommandInfo.
+ * This is to support commands like:
+ * db.runCommand({ fileMD5 : "<coll name>" })
+ *
+ * There is a deprecated legacy mode as well which effectively does a merge-sort across a number
+ * of servers, but does not correctly enforce versioning (used only in mapreduce).
+ */
+class ParallelSortClusteredCursor {
+public:
+ ParallelSortClusteredCursor(const QuerySpec& qSpec, const CommandInfo& cInfo = CommandInfo());
- std::string getNS();
+ // DEPRECATED legacy constructor for pure mergesort functionality - do not use
+ ParallelSortClusteredCursor(const std::set<std::string>& servers,
+ const std::string& ns,
+ const Query& q,
+ int options = 0,
+ const BSONObj& fields = BSONObj());
- /** call before using */
- void init();
+ ~ParallelSortClusteredCursor();
- bool more();
- BSONObj next();
- std::string type() const { return "ParallelSort"; }
+ std::string getNS();
- void fullInit();
- void startInit();
- void finishInit();
+ /** call before using */
+ void init();
- bool isCommand(){ return NamespaceString( _qSpec.ns() ).isCommand(); }
- bool isExplain(){ return _qSpec.isExplain(); }
+ bool more();
+ BSONObj next();
+ std::string type() const {
+ return "ParallelSort";
+ }
- /**
- * Sets the batch size on all underlying cursors to 'newBatchSize'.
- */
- void setBatchSize(int newBatchSize);
+ void fullInit();
+ void startInit();
+ void finishInit();
- /**
- * Returns whether the collection was sharded when the cursors were established.
- */
- bool isSharded();
+ bool isCommand() {
+ return NamespaceString(_qSpec.ns()).isCommand();
+ }
+ bool isExplain() {
+ return _qSpec.isExplain();
+ }
- /**
- * Returns the number of shards with open cursors.
- */
- int getNumQueryShards();
+ /**
+ * Sets the batch size on all underlying cursors to 'newBatchSize'.
+ */
+ void setBatchSize(int newBatchSize);
- /**
- * Returns the set of shards with open cursors.
- */
- void getQueryShardIds(std::set<ShardId>& shardIds);
+ /**
+ * Returns whether the collection was sharded when the cursors were established.
+ */
+ bool isSharded();
- /**
- * Returns the single shard with an open cursor.
- * It is an error to call this if getNumQueryShards() > 1
- */
- std::shared_ptr<Shard> getQueryShard();
+ /**
+ * Returns the number of shards with open cursors.
+ */
+ int getNumQueryShards();
- /**
- * Returns primary shard with an open cursor.
- * It is an error to call this if the collection is sharded.
- */
- std::shared_ptr<Shard> getPrimary();
+ /**
+ * Returns the set of shards with open cursors.
+ */
+ void getQueryShardIds(std::set<ShardId>& shardIds);
- DBClientCursorPtr getShardCursor(const ShardId& shardId);
+ /**
+ * Returns the single shard with an open cursor.
+ * It is an error to call this if getNumQueryShards() > 1
+ */
+ std::shared_ptr<Shard> getQueryShard();
- BSONObj toBSON() const;
- std::string toString() const;
+ /**
+ * Returns primary shard with an open cursor.
+ * It is an error to call this if the collection is sharded.
+ */
+ std::shared_ptr<Shard> getPrimary();
- void explain(BSONObjBuilder& b);
+ DBClientCursorPtr getShardCursor(const ShardId& shardId);
- private:
- void _finishCons();
+ BSONObj toBSON() const;
+ std::string toString() const;
- void _explain( std::map< std::string,std::list<BSONObj> >& out );
+ void explain(BSONObjBuilder& b);
- void _markStaleNS( const NamespaceString& staleNS, const StaleConfigException& e, bool& forceReload, bool& fullReload );
- void _handleStaleNS( const NamespaceString& staleNS, bool forceReload, bool fullReload );
+private:
+ void _finishCons();
- bool _didInit;
- bool _done;
+ void _explain(std::map<std::string, std::list<BSONObj>>& out);
- QuerySpec _qSpec;
- CommandInfo _cInfo;
+ void _markStaleNS(const NamespaceString& staleNS,
+ const StaleConfigException& e,
+ bool& forceReload,
+ bool& fullReload);
+ void _handleStaleNS(const NamespaceString& staleNS, bool forceReload, bool fullReload);
- // Count round-trips req'd for namespaces and total
- std::map<std::string,int> _staleNSMap;
- int _totalTries;
+ bool _didInit;
+ bool _done;
- std::map<ShardId, PCMData> _cursorMap;
+ QuerySpec _qSpec;
+ CommandInfo _cInfo;
- // LEGACY BELOW
- int _numServers;
- int _lastFrom;
- std::set<std::string> _servers;
- BSONObj _sortKey;
+ // Count round-trips req'd for namespaces and total
+ std::map<std::string, int> _staleNSMap;
+ int _totalTries;
- DBClientCursorHolder * _cursors;
- int _needToSkip;
+ std::map<ShardId, PCMData> _cursorMap;
- /**
- * Setups the shard version of the connection. When using a replica
- * set connection and the primary cannot be reached, the version
- * will not be set if the slaveOk flag is set.
- */
- void setupVersionAndHandleSlaveOk(PCStatePtr state /* in & out */,
- const ShardId& shardId,
- std::shared_ptr<Shard> primary /* in */,
- const NamespaceString& ns,
- const std::string& vinfo,
- ChunkManagerPtr manager /* in */ );
-
- // LEGACY init - Needed for map reduce
- void _oldInit();
-
- // LEGACY - Needed ONLY for _oldInit
- std::string _ns;
- BSONObj _query;
- int _options;
- BSONObj _fields;
- int _batchSize;
- };
+ // LEGACY BELOW
+ int _numServers;
+ int _lastFrom;
+ std::set<std::string> _servers;
+ BSONObj _sortKey;
+ DBClientCursorHolder* _cursors;
+ int _needToSkip;
/**
- * Helper class to manage ownership of opened cursors while merging results.
- *
- * TODO: Choose one set of ownership semantics so that this isn't needed - merge sort via
- * mapreduce is the main issue since it has no metadata and this holder owns the cursors.
+ * Setups the shard version of the connection. When using a replica
+ * set connection and the primary cannot be reached, the version
+ * will not be set if the slaveOk flag is set.
*/
- class DBClientCursorHolder {
- public:
+ void setupVersionAndHandleSlaveOk(PCStatePtr state /* in & out */,
+ const ShardId& shardId,
+ std::shared_ptr<Shard> primary /* in */,
+ const NamespaceString& ns,
+ const std::string& vinfo,
+ ChunkManagerPtr manager /* in */);
- DBClientCursorHolder() {}
- ~DBClientCursorHolder() {}
+ // LEGACY init - Needed for map reduce
+ void _oldInit();
- void reset(DBClientCursor* cursor, ParallelConnectionMetadata* pcmData) {
- _cursor.reset(cursor);
- _pcmData.reset(pcmData);
- }
-
- DBClientCursor* get() { return _cursor.get(); }
- ParallelConnectionMetadata* getMData() { return _pcmData.get(); }
+ // LEGACY - Needed ONLY for _oldInit
+ std::string _ns;
+ BSONObj _query;
+ int _options;
+ BSONObj _fields;
+ int _batchSize;
+};
- void release() {
- _cursor.release();
- _pcmData.release();
- }
- private:
-
- std::unique_ptr<DBClientCursor> _cursor;
- std::unique_ptr<ParallelConnectionMetadata> _pcmData;
- };
+/**
+ * Helper class to manage ownership of opened cursors while merging results.
+ *
+ * TODO: Choose one set of ownership semantics so that this isn't needed - merge sort via
+ * mapreduce is the main issue since it has no metadata and this holder owns the cursors.
+ */
+class DBClientCursorHolder {
+public:
+ DBClientCursorHolder() {}
+ ~DBClientCursorHolder() {}
+
+ void reset(DBClientCursor* cursor, ParallelConnectionMetadata* pcmData) {
+ _cursor.reset(cursor);
+ _pcmData.reset(pcmData);
+ }
+
+ DBClientCursor* get() {
+ return _cursor.get();
+ }
+ ParallelConnectionMetadata* getMData() {
+ return _pcmData.get();
+ }
+
+ void release() {
+ _cursor.release();
+ _pcmData.release();
+ }
+
+private:
+ std::unique_ptr<DBClientCursor> _cursor;
+ std::unique_ptr<ParallelConnectionMetadata> _pcmData;
+};
- /**
- * Generally clients should be using Strategy::commandOp() wherever possible - the Future API
- * does not handle versioning.
- *
- * tools for doing asynchronous operations
- * right now uses underlying sync network ops and uses another thread
- * should be changed to use non-blocking io
- */
- class Future {
+/**
+ * Generally clients should be using Strategy::commandOp() wherever possible - the Future API
+ * does not handle versioning.
+ *
+ * tools for doing asynchronous operations
+ * right now uses underlying sync network ops and uses another thread
+ * should be changed to use non-blocking io
+ */
+class Future {
+public:
+ class CommandResult {
public:
- class CommandResult {
- public:
-
- std::string getServer() const { return _server; }
-
- bool isDone() const { return _done; }
+ std::string getServer() const {
+ return _server;
+ }
- bool ok() const {
- verify( _done );
- return _ok;
- }
+ bool isDone() const {
+ return _done;
+ }
- BSONObj result() const {
- verify( _done );
- return _res;
- }
+ bool ok() const {
+ verify(_done);
+ return _ok;
+ }
- /**
- blocks until command is done
- returns ok()
- */
- bool join( int maxRetries = 1 );
+ BSONObj result() const {
+ verify(_done);
+ return _res;
+ }
- private:
+ /**
+ blocks until command is done
+ returns ok()
+ */
+ bool join(int maxRetries = 1);
- CommandResult( const std::string& server,
- const std::string& db,
- const BSONObj& cmd,
- int options,
- DBClientBase * conn,
- bool useShardedConn );
- void init();
+ private:
+ CommandResult(const std::string& server,
+ const std::string& db,
+ const BSONObj& cmd,
+ int options,
+ DBClientBase* conn,
+ bool useShardedConn);
+ void init();
- std::string _server;
- std::string _db;
- int _options;
- BSONObj _cmd;
- DBClientBase * _conn;
- std::unique_ptr<AScopedConnection> _connHolder; // used if not provided a connection
- bool _useShardConn;
+ std::string _server;
+ std::string _db;
+ int _options;
+ BSONObj _cmd;
+ DBClientBase* _conn;
+ std::unique_ptr<AScopedConnection> _connHolder; // used if not provided a connection
+ bool _useShardConn;
- std::unique_ptr<DBClientCursor> _cursor;
+ std::unique_ptr<DBClientCursor> _cursor;
- BSONObj _res;
- bool _ok;
- bool _done;
+ BSONObj _res;
+ bool _ok;
+ bool _done;
- friend class Future;
- };
+ friend class Future;
+ };
- /**
- * @param server server name
- * @param db db name
- * @param cmd cmd to exec
- * @param conn optional connection to use. will use standard pooled if non-specified
- * @param useShardConn use ShardConnection
- */
- static std::shared_ptr<CommandResult> spawnCommand( const std::string& server,
+ /**
+ * @param server server name
+ * @param db db name
+ * @param cmd cmd to exec
+ * @param conn optional connection to use. will use standard pooled if non-specified
+ * @param useShardConn use ShardConnection
+ */
+ static std::shared_ptr<CommandResult> spawnCommand(const std::string& server,
const std::string& db,
const BSONObj& cmd,
int options,
- DBClientBase * conn = 0,
- bool useShardConn = false );
- };
-
+ DBClientBase* conn = 0,
+ bool useShardConn = false);
+};
}
-