/* Copyright 2009 10gen Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects
* for all of the code used other than as permitted herein. If you modify
* file(s) with this exception, you may extend this exception to your
* version of the file(s), but you are not obligated to do so. If you do not
* wish to do so, delete this exception statement from your version. If you
* delete this exception statement from all source files in the program,
* then also delete it in the license file.
*/
/**
Tools for working in a parallel/sharded/clustered environment.
*/
#pragma once
#include "mongo/db/namespace_string.h"
#include "mongo/s/client/shard.h"
#include "mongo/s/client/shard_connection.h"
namespace mongo {
// Forward declarations of class types that this header refers to by pointer or reference.
class DBClientCursorHolder;
class StaleConfigException;
class ParallelConnectionMetadata;
/**
 * Pairs a namespace with a filter object. A default-constructed CommandInfo has an empty
 * namespace and reports isEmpty() == true.
 */
class CommandInfo {
public:
    // Namespace carried by this CommandInfo; empty when default-constructed.
    std::string versionedNS;
    // Filter object carried alongside the namespace.
    BSONObj cmdFilter;

    /** Creates an empty CommandInfo (isEmpty() returns true). */
    CommandInfo() {}

    /**
     * @param vns    value stored in versionedNS
     * @param filter value stored in cmdFilter
     */
    CommandInfo( const std::string& vns, const BSONObj& filter ) : versionedNS( vns ), cmdFilter( filter ) {}

    /**
     * Returns true when versionedNS is the empty string.
     * Declared const (it mutates nothing) so it can be called through a const reference,
     * matching toString() below.
     */
    bool isEmpty() const {
        return versionedNS.empty();
    }

    /** Returns "CInfo " followed by a BSON rendering of the "v_ns" and "filter" fields. */
    std::string toString() const {
        return str::stream() << "CInfo " << BSON( "v_ns" << versionedNS << "filter" << cmdFilter );
    }
};
class DBClientCursor;
// Shared-ownership handle to a DBClientCursor. The template argument is required:
// a bare 'std::shared_ptr' is not a type and cannot be the target of a typedef.
typedef std::shared_ptr<DBClientCursor> DBClientCursorPtr;
// State kept for one connection: the connection and cursor handles, version-related handles,
// and cursor status (a count and a done flag).
class ParallelConnectionState {
public:
// Starts with count == 0 and done == false; the handle members are default-constructed.
ParallelConnectionState() :
count( 0 ), done( false ) { }
// Please do not reorder. cursor destructor can use conn.
// (Non-static members are destroyed in reverse declaration order, so 'cursor' is destroyed
// before 'conn'.)
// On a related note, never attempt to cleanup these pointers manually.
ShardConnectionPtr conn;
DBClientCursorPtr cursor;
// Version information
ChunkManagerPtr manager;
// NOTE(review): the template argument of std::shared_ptr is missing as shown here; presumably
// std::shared_ptr<Shard> (mongo/s/client/shard.h is included by this header) - confirm against
// the original header.
std::shared_ptr primary;
// Cursor status information
long long count;
bool done;
// Declared here, defined out of line.
BSONObj toBSON() const;
// Returns "PCState : " followed by the stream rendering of toBSON().
std::string toString() const {
return str::stream() << "PCState : " << toBSON();
}
};
typedef ParallelConnectionState PCState;
typedef std::shared_ptr PCStatePtr;
/**
 * Holds a shared handle to a connection's state together with a set of boolean status flags.
 * Destroying the object always runs a full cleanup.
 */
class ParallelConnectionMetadata {
public:
    // All flags start out false via the member initializers below; pcState is
    // default-constructed.
    ParallelConnectionMetadata() {}

    // Always requests the "full" variant of cleanup().
    ~ParallelConnectionMetadata() {
        cleanup( true );
    }

    // Defined out of line. 'full' defaults to true.
    void cleanup( bool full = true );

    // Handle to the associated connection state.
    PCStatePtr pcState;

    // Status flags.
    bool retryNext = false;
    bool initialized = false;
    bool finished = false;
    bool completed = false;
    bool errored = false;

    // Defined out of line.
    BSONObj toBSON() const;

    // Produces "PCMData : " followed by the stream rendering of toBSON().
    std::string toString() const {
        return str::stream() << "PCMData : " << toBSON();
    }
};
typedef ParallelConnectionMetadata PCMData;
typedef std::shared_ptr PCMDataPtr;
/**
* Runs a query in parallel across N servers, enforcing compatible chunk versions for queries
* across all shards.
*
* If CommandInfo is provided, the ParallelCursor does not use the direct .$cmd namespace in the
* query spec, but instead enforces versions across another namespace specified by CommandInfo.
* This is to support commands like:
* db.runCommand({ fileMD5 : "" })
*
* There is a deprecated legacy mode as well which effectively does a merge-sort across a number
* of servers, but does not correctly enforce versioning (used only in mapreduce).
*/
// NOTE(review): several declarations in this class name a standard template (std::set, std::map,
// std::list, std::shared_ptr) without template arguments as shown here. The arguments appear to
// have been lost and must be restored from the original header; they are deliberately not
// guessed at in this class.
class ParallelSortClusteredCursor {
public:
// Constructs from a query spec; 'cInfo' defaults to a default-constructed CommandInfo.
ParallelSortClusteredCursor( const QuerySpec& qSpec, const CommandInfo& cInfo = CommandInfo() );
// DEPRECATED legacy constructor for pure mergesort functionality - do not use
// 'options' defaults to 0 and 'fields' to an empty BSONObj.
ParallelSortClusteredCursor( const std::set& servers , const std::string& ns ,
const Query& q , int options=0, const BSONObj& fields=BSONObj() );
~ParallelSortClusteredCursor();
std::string getNS();
/** call before using */
void init();
bool more();
BSONObj next();
// Always returns the literal "ParallelSort".
std::string type() const { return "ParallelSort"; }
// Initialization entry points, defined out of line.
// NOTE(review): presumably fullInit() is startInit() followed by finishInit() - confirm in the
// implementation file.
void fullInit();
void startInit();
void finishInit();
// True when the query spec's namespace is a command namespace per NamespaceString::isCommand().
bool isCommand(){ return NamespaceString( _qSpec.ns() ).isCommand(); }
// Forwards to the query spec's isExplain().
bool isExplain(){ return _qSpec.isExplain(); }
/**
* Sets the batch size on all underlying cursors to 'newBatchSize'.
*/
void setBatchSize(int newBatchSize);
/**
* Returns whether the collection was sharded when the cursors were established.
*/
bool isSharded();
/**
* Returns the number of shards with open cursors.
*/
int getNumQueryShards();
/**
* Returns the set of shards with open cursors.
* The function itself returns void; the result is delivered through the 'shardIds'
* reference parameter.
*/
void getQueryShardIds(std::set& shardIds);
/**
* Returns the single shard with an open cursor.
* It is an error to call this if getNumQueryShards() > 1
*/
std::shared_ptr getQueryShard();
/**
* Returns primary shard with an open cursor.
* It is an error to call this if the collection is sharded.
*/
std::shared_ptr getPrimary();
DBClientCursorPtr getShardCursor(const ShardId& shardId);
BSONObj toBSON() const;
std::string toString() const;
void explain(BSONObjBuilder& b);
private:
void _finishCons();
void _explain( std::map< std::string,std::list >& out );
// 'forceReload' and 'fullReload' are non-const references, i.e. they can be written by the call.
void _markStaleNS( const NamespaceString& staleNS, const StaleConfigException& e, bool& forceReload, bool& fullReload );
void _handleStaleNS( const NamespaceString& staleNS, bool forceReload, bool fullReload );
bool _didInit;
bool _done;
QuerySpec _qSpec;
CommandInfo _cInfo;
// Count round-trips req'd for namespaces and total
std::map _staleNSMap;
int _totalTries;
std::map _cursorMap;
// LEGACY BELOW
int _numServers;
int _lastFrom;
std::set _servers;
BSONObj _sortKey;
// Raw pointer to DBClientCursorHolder, which is only forward-declared at this point.
// NOTE(review): ownership and element count are not visible in this header - confirm in the
// implementation file.
DBClientCursorHolder * _cursors;
int _needToSkip;
/**
* Sets up the shard version of the connection. When using a replica
* set connection and the primary cannot be reached, the version
* will not be set if the slaveOk flag is set.
*/
void setupVersionAndHandleSlaveOk(PCStatePtr state /* in & out */,
const ShardId& shardId,
std::shared_ptr primary /* in */,
const NamespaceString& ns,
const std::string& vinfo,
ChunkManagerPtr manager /* in */ );
// LEGACY init - Needed for map reduce
void _oldInit();
// LEGACY - Needed ONLY for _oldInit
std::string _ns;
BSONObj _query;
int _options;
BSONObj _fields;
int _batchSize;
};
/**
* Helper class to manage ownership of opened cursors while merging results.
*
* TODO: Choose one set of ownership semantics so that this isn't needed - merge sort via
* mapreduce is the main issue since it has no metadata and this holder owns the cursors.
*/
class DBClientCursorHolder {
public:
DBClientCursorHolder() {}
~DBClientCursorHolder() {}
void reset(DBClientCursor* cursor, ParallelConnectionMetadata* pcmData) {
_cursor.reset(cursor);
_pcmData.reset(pcmData);
}
DBClientCursor* get() { return _cursor.get(); }
ParallelConnectionMetadata* getMData() { return _pcmData.get(); }
void release() {
_cursor.release();
_pcmData.release();
}
private:
std::unique_ptr _cursor;
std::unique_ptr _pcmData;
};
/**
* Generally clients should be using Strategy::commandOp() wherever possible - the Future API
* does not handle versioning.
*
* Tools for doing asynchronous operations.
* Right now this uses the underlying synchronous network ops on another thread;
* it should be changed to use non-blocking I/O.
*/
// NOTE(review): three declarations in this class use std::unique_ptr / std::shared_ptr without
// template arguments as shown here (_connHolder, _cursor, and the return type of spawnCommand).
// The arguments appear to have been lost and must be restored from the original header.
class Future {
public:
// Result handle for one command. The constructor is private and Future is a friend, so
// instances are created by Future rather than directly by callers.
class CommandResult {
public:
std::string getServer() const { return _server; }
bool isDone() const { return _done; }
// Only valid once the command is done; verify() checks _done before the value is returned.
bool ok() const {
verify( _done );
return _ok;
}
// Only valid once the command is done; verify() checks _done before the value is returned.
BSONObj result() const {
verify( _done );
return _res;
}
/**
Blocks until the command is done.
Returns ok().
'maxRetries' defaults to 1.
*/
bool join( int maxRetries = 1 );
private:
CommandResult( const std::string& server,
const std::string& db,
const BSONObj& cmd,
int options,
DBClientBase * conn,
bool useShardedConn );
void init();
std::string _server;
std::string _db;
int _options;
BSONObj _cmd;
DBClientBase * _conn;
std::unique_ptr _connHolder; // used if not provided a connection
bool _useShardConn;
std::unique_ptr _cursor;
BSONObj _res;
bool _ok;
bool _done;
friend class Future;
};
/**
* @param server server name
* @param db db name
* @param cmd cmd to exec
* @param options integer options for the command (meaning not visible in this header)
* @param conn optional connection to use. will use standard pooled if non-specified
* @param useShardConn use ShardConnection
*/
static std::shared_ptr spawnCommand( const std::string& server,
const std::string& db,
const BSONObj& cmd,
int options,
DBClientBase * conn = 0,
bool useShardConn = false );
};
}