// dbclient.cpp - connect to a Mongo database as a database, from C++
/* Copyright 2009 10gen Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects
* for all of the code used other than as permitted herein. If you modify
* file(s) with this exception, you may extend this exception to your
* version of the file(s), but you are not obligated to do so. If you do not
* wish to do so, delete this exception statement from your version. If you
* delete this exception statement from all source files in the program,
* then also delete it in the license file.
*/
#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kNetwork
#include "mongo/platform/basic.h"
#include "mongo/bson/util/bson_extract.h"
#include "mongo/bson/util/builder.h"
#include "mongo/client/constants.h"
#include "mongo/client/dbclient_rs.h"
#include "mongo/client/dbclientcursor.h"
#include "mongo/client/sasl_client_authenticate.h"
#include "mongo/client/syncclusterconnection.h"
#include "mongo/db/auth/internal_user_auth.h"
#include "mongo/db/jsobj.h"
#include "mongo/db/json.h"
#include "mongo/db/namespace_string.h"
#include "mongo/s/stale_exception.h" // for RecvStaleConfigException
#include "mongo/util/assert_util.h"
#include "mongo/util/debug_util.h"
#include "mongo/util/log.h"
#include "mongo/util/net/ssl_manager.h"
#include "mongo/util/net/ssl_options.h"
#include "mongo/util/password_digest.h"
namespace mongo {
using std::auto_ptr;
using std::endl;
using std::list;
using std::map;
using std::string;
using std::stringstream;
using std::vector;
// Storage for the static DBClientBase::ConnectionIdSequence member.
AtomicInt64 DBClientBase::ConnectionIdSequence;
// Name of the "userSource" auth parameter. _auth() below accepts it as an alternative to
// saslCommandUserDBFieldName when picking the database to authenticate against.
const char* const saslCommandUserSourceFieldName = "userSource";
/**
 * Parses 's' into _setName / _servers and adjusts _type.
 *
 * - A leading '$' marks a custom-handled connection (type CUSTOM). Per the original note,
 *   RFC-1123/952 hostnames cannot start with '$', so this does not clash with real hosts
 *   (it also rules out replica set names that start with '$').
 * - Everything before the first '/' becomes the set name; the type becomes SET unless it
 *   is already CUSTOM.
 * - The remainder is split on ',' and every piece is appended to _servers.
 */
void ConnectionString::_fillServers( string s ) {
    const bool customPrefixed = ( s.find( '$' ) == 0 );
    if ( customPrefixed ) {
        _type = CUSTOM;
    }

    const string::size_type slashPos = s.find( '/' );
    if ( slashPos != string::npos ) {
        _setName = s.substr( 0, slashPos );
        s = s.substr( slashPos + 1 );
        if ( _type != CUSTOM ) {
            _type = SET;
        }
    }

    // Peel off one comma-separated host at a time; whatever is left is the last host.
    for ( string::size_type commaPos = s.find( ',' );
          commaPos != string::npos;
          commaPos = s.find( ',' ) ) {
        _servers.push_back( HostAndPort( s.substr( 0, commaPos ) ) );
        s = s.substr( commaPos + 1 );
    }
    _servers.push_back( HostAndPort( s ) );
}
void ConnectionString::_finishInit() {
// Needed here as well b/c the parsing logic isn't used in all constructors
// TODO: Refactor so that the parsing logic *is* used in all constructors
if ( _type == MASTER && _servers.size() > 0 ){
if( _servers[0].host().find( '$' ) == 0 ){
_type = CUSTOM;
}
}
stringstream ss;
if ( _type == SET )
ss << _setName << "/";
for ( unsigned i=0; i<_servers.size(); i++ ) {
if ( i > 0 )
ss << ",";
ss << _servers[i].toString();
}
_string = ss.str();
}
// Mutex held by connect() while it reads and invokes _connectHook.
mutex ConnectionString::_connectHookMutex;
// Hook used by connect() for CUSTOM connection strings; NULL until one is installed.
ConnectionString::ConnectionHook* ConnectionString::_connectHook = NULL;
/**
 * Creates a client object appropriate for this connection string's type and connects it.
 *
 * @param errmsg         set to a description when a replica set connect fails; also handed
 *                       to DBClientConnection::connect and to the custom hook.
 * @param socketTimeout  socket timeout forwarded to the created client / hook.
 * @return a client allocated here with new (or whatever the hook returns for CUSTOM),
 *         or 0 when a MASTER or SET/PAIR connect attempt fails. Presumably the caller
 *         owns the returned object -- confirm against callers.
 * @throws UserException 13421 for an INVALID connection string; uasserts 16335 when the
 *         type is CUSTOM and no hook is installed.
 *
 * NOTE(review): in the MASTER and SET branches the raw pointer is only deleted when
 * connect() returns false; if connect() can throw, the object would leak -- confirm.
 * NOTE(review): the template arguments of 'list' and 'boost::lock_guard' are not visible
 * in this view of the file -- confirm against the original source.
 */
DBClientBase* ConnectionString::connect( string& errmsg, double socketTimeout ) const {
switch ( _type ) {
case MASTER: {
// Single server: plain connection constructed with 'true' as its argument.
DBClientConnection * c = new DBClientConnection(true);
c->setSoTimeout( socketTimeout );
LOG(1) << "creating new connection to:" << _servers[0] << endl;
if ( ! c->connect( _servers[0] , errmsg ) ) {
delete c;
return 0;
}
LOG(1) << "connected connection!" << endl;
return c;
}
// PAIR is handled exactly like SET.
case PAIR:
case SET: {
DBClientReplicaSet * set = new DBClientReplicaSet( _setName , _servers , socketTimeout );
if( ! set->connect() ) {
delete set;
errmsg = "connect failed to replica set ";
errmsg += toString();
return 0;
}
return set;
}
case SYNC: {
// TODO , don't copy
list l;
for ( unsigned i=0; i<_servers.size(); i++ )
l.push_back( _servers[i] );
// No explicit connect() call is made here; the object is returned as constructed.
SyncClusterConnection* c = new SyncClusterConnection( l, socketTimeout );
return c;
}
case CUSTOM: {
// Lock in case other things are modifying this at the same time
boost::lock_guard lk( _connectHookMutex );
// Allow the replacement of connections with other connections - useful for testing.
uassert( 16335, "custom connection to " + this->toString() +
" specified with no connection hook", _connectHook );
// The hook is invoked while the mutex is held (a plain lock, taken unconditionally).
// Per the original note this path is not expected to be active during normal operation.
DBClientBase* replacementConn = _connectHook->connect( *this, errmsg, socketTimeout );
log() << "replacing connection to " << this->toString() << " with "
<< ( replacementConn ? replacementConn->getServerAddress() : "(empty)" ) << endl;
return replacementConn;
}
case INVALID:
throw UserException( 13421 , "trying to connect to invalid ConnectionString" );
break;
}
// Only reached when _type holds a value outside the enum cases above.
verify( 0 );
return 0;
}
/**
 * Returns true when this connection string and 'other' refer to the same logical endpoint.
 *
 * The types must match. Beyond that:
 *  - INVALID: always equal.
 *  - MASTER:  the first server must be equal.
 *  - PAIR:    the two servers must be equal, in either order.
 *  - SET:     only the set names are compared.
 *  - SYNC:    same number of servers, and every server of this string appears in 'other'.
 *  - CUSTOM:  the full string forms must be equal.
 */
bool ConnectionString::sameLogicalEndpoint( const ConnectionString& other ) const {
    if ( _type != other._type )
        return false;

    switch ( _type ) {
    case INVALID:
        return true;
    case MASTER:
        return _servers[0] == other._servers[0];
    case PAIR:
        if ( _servers[0] == other._servers[0] )
            return _servers[1] == other._servers[1];
        return
            ( _servers[0] == other._servers[1] ) &&
            ( _servers[1] == other._servers[0] );
    case SET:
        return _setName == other._setName;
    case SYNC:
        // The servers all have to be the same in each, but not in the same order.
        if ( _servers.size() != other._servers.size() )
            return false;
        for ( unsigned i = 0; i < _servers.size(); i++ ) {
            bool found = false;
            for ( unsigned j = 0; j < other._servers.size(); j++ ) {
                if ( _servers[i] == other._servers[j] ) {
                    found = true;
                    break;
                }
            }
            if ( ! found )
                return false;
        }
        return true;
    case CUSTOM:
        return _string == other._string;
    }

    // Only reached for a _type value outside the enum. The explicit return keeps every
    // path of this non-void function returning a value, as connect() and typeToString()
    // already do after their verify() calls.
    verify( false );
    return false;
}
/**
 * Builds a ConnectionString from its textual form.
 *
 * "name/hosts" (with the '/' not in first position) is a replica set; otherwise the
 * number of commas selects the type: none -> single host, one -> PAIR, two -> SYNC.
 * Anything else sets 'errmsg' and yields a default-constructed (INVALID) string.
 */
ConnectionString ConnectionString::parse( const string& host , string& errmsg ) {
    const string::size_type slashPos = host.find( '/' );
    const bool hasSetName = ( slashPos != string::npos ) && ( slashPos != 0 );
    if ( hasSetName ) {
        // replica set: servers follow the slash, set name precedes it
        return ConnectionString( SET,
                                 host.substr( slashPos + 1 ),
                                 host.substr( 0, slashPos ) );
    }

    const int commaCount = str::count( host, ',' );
    switch ( commaCount ) {
    case 0:
        return ConnectionString( HostAndPort( host ) );
    case 1:
        return ConnectionString( PAIR, host );
    case 2:
        return ConnectionString( SYNC, host );
    default:
        break;
    }

    errmsg = string( "invalid hostname [" ) + host + "]";
    return ConnectionString(); // INVALID
}
/**
 * Maps a ConnectionType to its lowercase name ("invalid", "master", "pair", "set",
 * "sync", "custom"). A value outside the enum trips verify(0).
 */
string ConnectionString::typeToString( ConnectionType type ) {
    const char* label = NULL;
    switch ( type ) {
    case INVALID: label = "invalid"; break;
    case MASTER:  label = "master";  break;
    case PAIR:    label = "pair";    break;
    case SET:     label = "set";     break;
    case SYNC:    label = "sync";    break;
    case CUSTOM:  label = "custom";  break;
    }

    if ( label != NULL ) {
        return label;
    }

    verify(0);
    return "";
}
// Field descriptors used by Query::readPref() / Query::hasReadPreference():
// "$readPreference" names the read-preference sub-document, "mode" and "tags" its members.
// NOTE(review): the BSONField template arguments are not visible in this view of the
// file -- confirm against the original source.
const BSONField Query::ReadPrefField("$readPreference");
const BSONField Query::ReadPrefModeField("mode");
const BSONField Query::ReadPrefTagsField("tags");
/** Builds a query whose object is the result of parsing 'json' with fromjson(). */
Query::Query( const string &json )
    : obj( fromjson( json ) ) {
}

/** Same as above, for a C string. */
Query::Query( const char *json )
    : obj( fromjson( json ) ) {
}

/** Parses 'jsonKeyPatt' with fromjson() and forwards to hint(BSONObj). */
Query& Query::hint(const string &jsonKeyPatt) {
    const BSONObj parsedKeyPattern = fromjson( jsonKeyPatt );
    return hint( parsedKeyPattern );
}
/**
 * Adds a where clause (JavaScript 'jscode' with 'scope') to the existing query elements.
 * Must be called before sort(), hint() and explain(): the query may not be in its
 * "complex" (wrapped) form yet, otherwise verify() fires.
 */
Query& Query::where(const string &jscode, BSONObj scope) {
    verify( ! isComplex() );

    BSONObjBuilder withWhere;
    withWhere.appendElements( obj );
    withWhere.appendWhere( jscode, scope );
    obj = withWhere.obj();

    return *this;
}
void Query::makeComplex() {
if ( isComplex() )
return;
BSONObjBuilder b;
b.append( "query", obj );
obj = b.obj();
}
// Each modifier below records one option on the query through appendComplex() and
// returns *this so calls can be chained.

/** Stores 's' under "orderby". */
Query& Query::sort(const BSONObj& s) {
    appendComplex( "orderby", s );
    return *this;
}

/** Stores 'keyPattern' under "$hint". */
Query& Query::hint(BSONObj keyPattern) {
    appendComplex( "$hint", keyPattern );
    return *this;
}

/** Sets "$explain" to true. */
Query& Query::explain() {
    appendComplex( "$explain", true );
    return *this;
}

/** Sets "$snapshot" to true. */
Query& Query::snapshot() {
    appendComplex( "$snapshot", true );
    return *this;
}

/** Stores 'val' under "$min". */
Query& Query::minKey( const BSONObj &val ) {
    appendComplex( "$min", val );
    return *this;
}

/** Stores 'val' under "$max". */
Query& Query::maxKey( const BSONObj &val ) {
    appendComplex( "$max", val );
    return *this;
}
/**
 * Returns true when 'obj' has a "query" or a "$query" element.
 *
 * @param hasDollar  optional out-parameter; when non-null and the result is true it is
 *                   set to false for "query" and to true for "$query". It is left
 *                   untouched when the result is false.
 */
bool Query::isComplex(const BSONObj& obj, bool* hasDollar) {
    const bool plainForm = obj.hasElement("query");
    if (!plainForm && !obj.hasElement("$query")) {
        return false;
    }

    if (hasDollar) {
        *hasDollar = !plainForm;
    }
    return true;
}
/**
 * Attaches a read preference document { mode: <name>[, tags: <tags>] } to the query under
 * the ReadPrefField name. 'tags' is only included when it is not empty. A 'pref' value
 * not listed in the switch leaves the mode string empty.
 */
Query& Query::readPref(ReadPreference pref, const BSONArray& tags) {
    string mode;
    switch (pref) {
    case ReadPreference_PrimaryOnly:        mode = "primary";            break;
    case ReadPreference_PrimaryPreferred:   mode = "primaryPreferred";   break;
    case ReadPreference_SecondaryOnly:      mode = "secondary";          break;
    case ReadPreference_SecondaryPreferred: mode = "secondaryPreferred"; break;
    case ReadPreference_Nearest:            mode = "nearest";            break;
    }

    BSONObjBuilder prefDoc;
    prefDoc << ReadPrefModeField(mode);
    if (!tags.isEmpty()) {
        prefDoc << ReadPrefTagsField(tags);
    }

    appendComplex(ReadPrefField.name().c_str(), prefDoc.done());
    return *this;
}
/** Member convenience form: applies the static isComplex() check to this query's object. */
bool Query::isComplex( bool * hasDollar ) const {
    const bool complexForm = isComplex( obj, hasDollar );
    return complexForm;
}
bool Query::hasReadPreference(const BSONObj& queryObj) {
const bool hasReadPrefOption = queryObj["$queryOptions"].isABSONObj() &&
queryObj["$queryOptions"].Obj().hasField(ReadPrefField.name());
return (Query::isComplex(queryObj) &&
queryObj.hasField(ReadPrefField.name())) ||
hasReadPrefOption;
}
/**
 * Returns the filter part of the query: the whole object for a simple query, otherwise
 * the object stored under "$query" or "query", whichever form isComplex() reports.
 */
BSONObj Query::getFilter() const {
    bool dollarPrefixed = false;
    if ( isComplex( &dollarPrefixed ) ) {
        const char* filterField = dollarPrefixed ? "$query" : "query";
        return obj.getObjectField( filterField );
    }
    return obj;
}
/**
 * Returns the sort specification of a complex query: the "orderby" object, or the
 * "$orderby" object when "orderby" is empty. Simple queries yield an empty object.
 */
BSONObj Query::getSort() const {
    if ( ! isComplex() ) {
        return BSONObj();
    }

    const BSONObj plainSort = obj.getObjectField( "orderby" );
    if ( ! plainSort.isEmpty() ) {
        return plainSort;
    }
    return obj.getObjectField( "$orderby" );
}
/** Returns the "$hint" object of a complex query, or an empty object for a simple one. */
BSONObj Query::getHint() const {
    if ( isComplex() ) {
        return obj.getObjectField( "$hint" );
    }
    return BSONObj();
}

/** True when the query is complex and its "$explain" boolean field is set. */
bool Query::isExplain() const {
    if ( ! isComplex() ) {
        return false;
    }
    return obj.getBoolField( "$explain" );
}

/** String form of the underlying query object. */
string Query::toString() const {
    return obj.toString();
}
/* --- dbclientcommands --- */
bool DBClientWithCommands::isOk(const BSONObj& o) {
return o["ok"].trueValue();
}
bool DBClientWithCommands::isNotMasterErrorString( const BSONElement& e ) {
return e.type() == String && str::contains( e.valuestr() , "not master" );
}
/**
 * Returns the query options reported by _lookupAvailableOptions(). The lookup runs on
 * the first call only; the result is cached in _cachedAvailableOptions afterwards.
 */
enum QueryOptions DBClientWithCommands::availableOptions() {
    if ( _haveCachedAvailableOptions ) {
        return _cachedAvailableOptions;
    }

    _cachedAvailableOptions = _lookupAvailableOptions();
    _haveCachedAvailableOptions = true;
    return _cachedAvailableOptions;
}
enum QueryOptions DBClientWithCommands::_lookupAvailableOptions() {
BSONObj ret;
if ( runCommand( "admin", BSON( "availablequeryoptions" << 1 ), ret ) ) {
return QueryOptions( ret.getIntField( "options" ) );
}
return QueryOptions(0);
}
/** Installs the hook that runCommand() calls on the command builder before sending. */
void DBClientWithCommands::setRunCommandHook(RunCommandHookFunc func) {
    _runCommandHook = func;
}

/** Installs the hook that runCommand() calls with the reply and the server address. */
void DBClientWithCommands::setPostRunCommandHook(PostRunCommandHookFunc func) {
    _postRunCommandHook = func;
}
bool DBClientWithCommands::runCommand(const string &dbname,
const BSONObj& cmd,
BSONObj &info,
int options) {
string ns = dbname + ".$cmd";
if (_runCommandHook) {
BSONObjBuilder cmdObj;
cmdObj.appendElements(cmd);
_runCommandHook(&cmdObj);
info = findOne(ns, cmdObj.done(), 0 , options);
}
else {
info = findOne(ns, cmd, 0 , options);
}
if (_postRunCommandHook) {
_postRunCommandHook(info, getServerAddress());
}
return isOk(info);
}
/* note - we build a bson obj here -- for something that is super common like getlasterror you
should have that object prebuilt as that would be faster.
*/
bool DBClientWithCommands::simpleCommand(const string &dbname, BSONObj *info, const string &command) {
BSONObj o;
if ( info == 0 )
info = &o;
BSONObjBuilder b;
b.append(command, 1);
return runCommand(dbname, b.done(), *info);
}
unsigned long long DBClientWithCommands::count(const string &myns, const BSONObj& query, int options, int limit, int skip ) {
BSONObj cmd = _countCmd( myns , query , options , limit , skip );
BSONObj res;
if( !runCommand(nsToDatabase(myns), cmd, res, options) )
uasserted(11010,string("count fails:") + res.toString());
return res["n"].numberLong();
}
/**
 * Builds the count command { count: <collection>, query: <query>[, limit][, skip] }.
 * "limit" and "skip" are only included when non-zero. 'options' is not used here.
 */
BSONObj DBClientWithCommands::_countCmd(const string &myns, const BSONObj& query, int options, int limit, int skip ) {
    const NamespaceString nss(myns);

    BSONObjBuilder cmd;
    cmd.append( "count" , nss.coll() );
    cmd.append( "query" , query );
    if ( limit != 0 ) {
        cmd.append( "limit" , limit );
    }
    if ( skip != 0 ) {
        cmd.append( "skip" , skip );
    }
    return cmd.obj();
}
/** Convenience overload: runs getlasterror against the "admin" database. */
BSONObj DBClientWithCommands::getLastErrorDetailed(bool fsync, bool j, int w, int wtimeout) {
    return getLastErrorDetailed("admin", fsync, j, w, wtimeout);
}

/**
 * Runs { getlasterror: 1, ... } against 'db' and returns the raw reply.
 *
 * @param fsync     adds fsync: 1 when true.
 * @param j         adds j: 1 when true.
 * @param w         w >= 1 is sent as a number, w == -1 is sent as "majority"; any other
 *                  value omits the field.
 * @param wtimeout  sent only when greater than zero.
 */
BSONObj DBClientWithCommands::getLastErrorDetailed(const std::string& db,
                                                   bool fsync,
                                                   bool j,
                                                   int w,
                                                   int wtimeout) {
    BSONObjBuilder gleCmd;
    gleCmd.append( "getlasterror", 1 );

    if ( fsync ) {
        gleCmd.append( "fsync", 1 );
    }
    if ( j ) {
        gleCmd.append( "j", 1 );
    }

    // only affects request when greater than one node
    if ( w >= 1 ) {
        gleCmd.append( "w", w );
    }
    else if ( w == -1 ) {
        gleCmd.append( "w", "majority" );
    }

    if ( wtimeout > 0 ) {
        gleCmd.append( "wtimeout", wtimeout );
    }

    BSONObj reply;
    runCommand( db, gleCmd.obj(), reply );
    return reply;
}
/** Convenience overload: last error string for the "admin" database. */
string DBClientWithCommands::getLastError(bool fsync, bool j, int w, int wtimeout) {
    return getLastError("admin", fsync, j, w, wtimeout);
}

/**
 * Runs getLastErrorDetailed() against 'db' and condenses the reply into a string with
 * getLastErrorString() (empty string when there is no error).
 */
string DBClientWithCommands::getLastError(const std::string& db,
                                          bool fsync,
                                          bool j,
                                          int w,
                                          int wtimeout) {
    return getLastErrorString( getLastErrorDetailed(db, fsync, j, w, wtimeout) );
}
/**
 * Extracts an error string from a getlasterror reply.
 *
 * When "ok" is true the "err" element is used; otherwise the command itself failed and
 * the "errmsg" element is used, prefixed with "getLastError command failed: ".
 * A missing element yields an empty string; an object element is rendered with
 * toString(), anything else with str().
 */
string DBClientWithCommands::getLastErrorString(const BSONObj& info) {
    const bool commandOk = info["ok"].trueValue();
    const BSONElement errElem = commandOk ? info["err"] : info["errmsg"];
    if (errElem.eoo()) {
        return "";
    }

    const string text = (errElem.type() == Object) ? errElem.toString() : errElem.str();
    if (commandOk) {
        return text;
    }
    // command failure
    return "getLastError command failed: " + text;
}
// Prebuilt { getpreverror: 1 } command object, parsed once at static-initialization time.
const BSONObj getpreverrorcmdobj = fromjson("{getpreverror:1}");

/** Runs getpreverror against "admin" and returns the raw reply. */
BSONObj DBClientWithCommands::getPrevError() {
    BSONObj reply;
    runCommand( "admin", getpreverrorcmdobj, reply );
    return reply;
}
// Prebuilt { getnonce: 1 } command object, used by _authMongoCR().
BSONObj getnoncecmdobj = fromjson("{getnonce:1}");

/** Thin wrapper that forwards to the namespace-level mongo::createPasswordDigest(). */
string DBClientWithCommands::createPasswordDigest( const string & username , const string & clearTextPassword ) {
    return mongo::createPasswordDigest( username, clearTextPassword );
}
namespace {
    /**
     * Scope guard that temporarily replaces a client's run-command hook.
     * The constructor remembers the current hook and installs 'hookFunc'; the destructor
     * puts the remembered hook back.
     */
    class RunCommandHookOverrideGuard {
        MONGO_DISALLOW_COPYING(RunCommandHookOverrideGuard);
    public:
        RunCommandHookOverrideGuard(DBClientWithCommands* cli,
                                    const DBClientWithCommands::RunCommandHookFunc& hookFunc)
            : _cli(cli),
              _oldHookFunc(cli->getRunCommandHook()) {
            _cli->setRunCommandHook(hookFunc);
        }

        ~RunCommandHookOverrideGuard() {
            _cli->setRunCommandHook(_oldHookFunc);
        }

    private:
        // Client whose hook is being overridden.
        DBClientWithCommands* const _cli;
        // Hook that was installed before the override; restored on destruction.
        DBClientWithCommands::RunCommandHookFunc const _oldHookFunc;
    };
} // namespace
/**
 * Authenticates using the mechanism named in 'params'.
 *
 * - "MONGODB-CR": handled by _authMongoCR().
 * - "MONGODB-X509" (only when built with MONGO_SSL): handled by _authX509().
 * - anything else: delegated to saslClientAuthenticate when that pointer is non-NULL,
 *   otherwise fails with ErrorCodes::BadValue.
 *
 * The database comes from the "userSource" field when present, else from the user-db
 * field; specifying both is rejected (17232). Failures are reported by throwing through
 * uassert / uassertStatusOK / uasserted.
 */
void DBClientWithCommands::_auth(const BSONObj& params) {
// Replace the run-command hook with an empty one for the duration of this call; the
// guard's destructor puts the previous hook back.
RunCommandHookOverrideGuard hookGuard(this, RunCommandHookFunc());
std::string mechanism;
uassertStatusOK(bsonExtractStringField(params,
saslCommandMechanismFieldName,
&mechanism));
uassert(17232, "You cannot specify both 'db' and 'userSource'. Please use only 'db'.",
!(params.hasField(saslCommandUserDBFieldName)
&& params.hasField(saslCommandUserSourceFieldName)));
if (mechanism == StringData("MONGODB-CR", StringData::LiteralTag())) {
std::string db;
// Prefer "userSource" when given, otherwise fall back to the user-db field.
if (params.hasField(saslCommandUserSourceFieldName)) {
uassertStatusOK(bsonExtractStringField(params,
saslCommandUserSourceFieldName,
&db));
}
else {
uassertStatusOK(bsonExtractStringField(params,
saslCommandUserDBFieldName,
&db));
}
std::string user;
uassertStatusOK(bsonExtractStringField(params,
saslCommandUserFieldName,
&user));
std::string password;
uassertStatusOK(bsonExtractStringField(params,
saslCommandPasswordFieldName,
&password));
// Password digesting defaults to true when the field is absent.
bool digestPassword;
uassertStatusOK(bsonExtractBooleanFieldWithDefault(params,
saslCommandDigestPasswordFieldName,
true,
&digestPassword));
BSONObj result;
// NOTE(review): 'result' is empty until _authMongoCR fills it in, so this relies on
// uassert evaluating its code/message arguments only after the condition has been
// evaluated and found false -- confirm against the uassert macro definition.
uassert(result["code"].Int(),
result.toString(),
_authMongoCR(db, user, password, &result, digestPassword));
}
#ifdef MONGO_SSL
else if (mechanism == StringData("MONGODB-X509", StringData::LiteralTag())){
std::string db;
// Same database selection rule as for MONGODB-CR above.
if (params.hasField(saslCommandUserSourceFieldName)) {
uassertStatusOK(bsonExtractStringField(params,
saslCommandUserSourceFieldName,
&db));
}
else {
uassertStatusOK(bsonExtractStringField(params,
saslCommandUserDBFieldName,
&db));
}
std::string user;
uassertStatusOK(bsonExtractStringField(params,
saslCommandUserFieldName,
&user));
// X509 auth needs an SSL manager, and the requested user must equal the subject name
// of the configured client certificate.
uassert(ErrorCodes::AuthenticationFailed,
"Please enable SSL on the client-side to use the MONGODB-X509 "
"authentication mechanism.",
getSSLManager() != NULL);
uassert(ErrorCodes::AuthenticationFailed,
"Username \"" + user +
"\" does not match the provided client certificate user \"" +
getSSLManager()->getSSLConfiguration().clientSubjectName + "\"",
user == getSSLManager()->getSSLConfiguration().clientSubjectName);
BSONObj result;
// Same evaluation-order caveat as the MONGODB-CR uassert above.
uassert(result["code"].Int(),
result.toString(),
_authX509(db, user, &result));
}
#endif
else if (saslClientAuthenticate != NULL) {
uassertStatusOK(saslClientAuthenticate(this, params));
}
else {
uasserted(ErrorCodes::BadValue,
mechanism + " mechanism support not compiled into client library.");
}
};
/**
 * Authenticates with 'params', falling back to the parameters returned by
 * getFallbackAuthParams() when the first attempt fails because the mechanism is not
 * supported.
 *
 * The fallback is only attempted when fallback parameters exist (non-empty) and the
 * first failure carried ErrorCodes::BadValue or ErrorCodes::CommandNotFound; any other
 * failure is propagated unchanged.
 *
 * @throws UserException (or a subclass) when authentication fails.
 */
void DBClientWithCommands::auth(const BSONObj& params) {
    try {
        _auth(params);
        return;
    } catch(const UserException& ex) {
        if (getFallbackAuthParams(params).isEmpty() ||
            (ex.getCode() != ErrorCodes::BadValue &&
             ex.getCode() != ErrorCodes::CommandNotFound)) {
            // Rethrow the in-flight exception itself. 'throw ex;' would throw a copy
            // whose type is UserException, slicing off any derived exception type.
            throw;
        }
    }

    // BadValue or CommandNotFound indicates unsupported auth mechanism so fall back to
    // MONGODB-CR for 2.6 compatibility.
    _auth(getFallbackAuthParams(params));
}
/**
 * Username/password authentication. Builds a SCRAM-SHA-1 parameter document and calls
 * auth(const BSONObj&).
 *
 * @param errmsg  receives the exception text when authentication is rejected.
 * @return true on success, false when the failure code is AuthenticationFailed.
 * @throws any UserException whose code is not ErrorCodes::AuthenticationFailed.
 */
bool DBClientWithCommands::auth(const string &dbname,
                                const string &username,
                                const string &password_text,
                                string& errmsg,
                                bool digestPassword) {
    try {
        auth(BSON(saslCommandMechanismFieldName << "SCRAM-SHA-1" <<
                  saslCommandUserDBFieldName << dbname <<
                  saslCommandUserFieldName << username <<
                  saslCommandPasswordFieldName << password_text <<
                  saslCommandDigestPasswordFieldName << digestPassword));
    } catch(const UserException& ex) {
        if (ex.getCode() == ErrorCodes::AuthenticationFailed) {
            errmsg = ex.what();
            return false;
        }
        throw;
    }
    return true;
}
bool DBClientWithCommands::_authMongoCR(const string &dbname,
const string &username,
const string &password_text,
BSONObj *info,
bool digestPassword) {
string password = password_text;
if( digestPassword )
password = createPasswordDigest( username , password_text );
string nonce;
if( !runCommand(dbname, getnoncecmdobj, *info) ) {
return false;
}
{
BSONElement e = info->getField("nonce");
verify( e.type() == String );
nonce = e.valuestr();
}
BSONObj authCmd;
BSONObjBuilder b;
{
b << "authenticate" << 1 << "nonce" << nonce << "user" << username;
md5digest d;
{
md5_state_t st;
md5_init(&st);
md5_append(&st, (const md5_byte_t *) nonce.c_str(), nonce.size() );
md5_append(&st, (const md5_byte_t *) username.data(), username.length());
md5_append(&st, (const md5_byte_t *) password.c_str(), password.size() );
md5_finish(&st, d);
}
b << "key" << digestToString( d );
authCmd = b.done();
}
if( runCommand(dbname, authCmd, *info) ) {
return true;
}
return false;
}
bool DBClientWithCommands::_authX509(const string&dbname,
const string &username,
BSONObj *info){
BSONObj authCmd;
BSONObjBuilder cmdBuilder;
cmdBuilder << "authenticate" << 1 << "mechanism" << "MONGODB-X509" << "user" << username;
authCmd = cmdBuilder.done();
if( runCommand(dbname, authCmd, *info) ) {
return true;
}
return false;
}
void DBClientWithCommands::logout(const string &dbname, BSONObj& info) {
runCommand(dbname, BSON("logout" << 1), info);
}
BSONObj ismastercmdobj = fromjson("{\"ismaster\":1}");
bool DBClientWithCommands::isMaster(bool& isMaster, BSONObj *info) {
BSONObj o;
if ( info == 0 )
info = &o;
bool ok = runCommand("admin", ismastercmdobj, *info);
isMaster = info->getField("ismaster").trueValue();
return ok;
}
bool DBClientWithCommands::createCollection(const string &ns, long long size, bool capped, int max, BSONObj *info) {
verify(!capped||size);
BSONObj o;
if ( info == 0 ) info = &o;
BSONObjBuilder b;
string db = nsToDatabase(ns);
b.append("create", ns.c_str() + db.length() + 1);
if ( size ) b.append("size", size);
if ( capped ) b.append("capped", true);
if ( max ) b.append("max", max);
return runCommand(db.c_str(), b.done(), *info);
}
bool DBClientWithCommands::copyDatabase(const string &fromdb, const string &todb, const string &fromhost, BSONObj *info) {
BSONObj o;
if ( info == 0 ) info = &o;
BSONObjBuilder b;
b.append("copydb", 1);
b.append("fromhost", fromhost);
b.append("fromdb", fromdb);
b.append("todb", todb);
return runCommand("admin", b.done(), *info);
}
bool DBClientWithCommands::setDbProfilingLevel(const string &dbname, ProfilingLevel level, BSONObj *info ) {
BSONObj o;
if ( info == 0 ) info = &o;
if ( level ) {
// Create system.profile collection. If it already exists this does nothing.
// TODO: move this into the db instead of here so that all
// drivers don't have to do this.
string ns = dbname + ".system.profile";
createCollection(ns.c_str(), 1024 * 1024, true, 0, info);
}
BSONObjBuilder b;
b.append("profile", (int) level);
return runCommand(dbname, b.done(), *info);
}
BSONObj getprofilingcmdobj = fromjson("{\"profile\":-1}");
bool DBClientWithCommands::getDbProfilingLevel(const string &dbname, ProfilingLevel& level, BSONObj *info) {
BSONObj o;
if ( info == 0 ) info = &o;
if ( runCommand(dbname, getprofilingcmdobj, *info) ) {
level = (ProfilingLevel) info->getIntField("was");
return true;
}
return false;
}
// Predefined map/reduce output specification { inline: 1 }.
DBClientWithCommands::MROutput DBClientWithCommands::MRInline (BSON("inline" << 1));

/**
 * Runs a mapreduce command on the collection of 'ns' with the given map and reduce
 * JavaScript functions. 'query' is only sent when non-empty; 'output.out' is sent as "out".
 *
 * @return the raw command reply (success is not checked here).
 */
BSONObj DBClientWithCommands::mapreduce(const string &ns, const string &jsmapf, const string &jsreducef, BSONObj query, MROutput output) {
    BSONObjBuilder mrCmd;
    mrCmd.append( "mapreduce", nsGetCollection(ns) );
    mrCmd.appendCode( "map", jsmapf );
    mrCmd.appendCode( "reduce", jsreducef );
    if ( !query.isEmpty() ) {
        mrCmd.append( "query", query );
    }
    mrCmd.append( "out", output.out );

    BSONObj reply;
    runCommand( nsGetDB(ns), mrCmd.done(), reply );
    return reply;
}
bool DBClientWithCommands::eval(const string &dbname, const string &jscode, BSONObj& info, BSONElement& retValue, BSONObj *args) {
BSONObjBuilder b;
b.appendCode("$eval", jscode);
if ( args )
b.appendArray("args", *args);
bool ok = runCommand(dbname, b.done(), info);
if ( ok )
retValue = info.getField("retval");
return ok;
}
bool DBClientWithCommands::eval(const string &dbname, const string &jscode) {
BSONObj info;
BSONElement retValue;
return eval(dbname, jscode, info, retValue);
}
/**
 * Returns the names of the databases reported by the listDatabases command, which is run
 * against "admin" with QueryOption_SlaveOk.
 * uasserts 10005 when the command fails and 10006 when the reply's "databases" field is
 * not an array.
 * NOTE(review): the element type of 'list' is not visible in this view of the file --
 * confirm against the original source.
 */
list DBClientWithCommands::getDatabaseNames() {
BSONObj info;
uassert(10005, "listdatabases failed", runCommand("admin",
BSON("listDatabases" << 1),
info,
QueryOption_SlaveOk));
uassert( 10006 , "listDatabases.databases not array" , info["databases"].type() == Array );
list names;
// Each array entry is an object; its "name" field is the database name.
BSONObjIterator i( info["databases"].embeddedObjectUserCheck() );
while ( i.more() ) {
names.push_back( i.next().embeddedObjectUserCheck()["name"].valuestr() );
}
return names;
}
/**
 * Returns the fully qualified names ("<db>.<collection>") of the collections in 'db',
 * built from the "name" field of each entry returned by getCollectionInfos().
 * NOTE(review): the element types of 'list' are not visible in this view of the file --
 * confirm against the original source.
 */
list DBClientWithCommands::getCollectionNames( const string& db ) {
list infos = getCollectionInfos( db );
list names;
for ( list::iterator it = infos.begin(); it != infos.end(); ++it ) {
names.push_back( db + "." + (*it)["name"].valuestr() );
}
return names;
}
/**
 * Returns one info object per collection in 'db' that matches 'filter'.
 *
 * The listCollections command is tried first. When it is not available (error code
 * CommandNotFound or an errmsg containing "no such cmd"), the function falls back to
 * querying "<db>.system.namespaces". Any other command failure uasserts 18630.
 *
 * NOTE(review): the template arguments of 'list' and 'auto_ptr' are not visible in this
 * view of the file -- confirm against the original source.
 */
list DBClientWithCommands::getCollectionInfos( const string& db,
const BSONObj& filter ) {
list infos;
// first we're going to try the command
// it was only added in 3.0, so if we're talking to an older server
// we'll fail back to querying system.namespaces
// TODO(spencer): remove fallback behavior after 3.0
{
BSONObj res;
if (runCommand(db,
BSON("listCollections" << 1 << "filter" << filter
<< "cursor" << BSONObj()),
res,
QueryOption_SlaveOk)) {
// The reply carries a cursor document: the first batch inline plus a cursor id.
BSONObj cursorObj = res["cursor"].Obj();
BSONObj collections = cursorObj["firstBatch"].Obj();
BSONObjIterator it( collections );
while ( it.more() ) {
BSONElement e = it.next();
// getOwned(): keep a copy that does not depend on the reply buffer.
infos.push_back( e.Obj().getOwned() );
}
// A non-zero cursor id means there are more results to fetch with getMore.
const long long id = cursorObj["id"].Long();
if ( id != 0 ) {
const std::string ns = cursorObj["ns"].String();
// NOTE(review): getMore() returns a null auto_ptr when the cursor fails to
// initialize, and the result is dereferenced here without a check -- confirm
// whether a uassert (as in the fallback path below) is needed.
auto_ptr cursor = getMore(ns, id, 0, 0);
while ( cursor->more() ) {
infos.push_back(cursor->nextSafe().getOwned());
}
}
return infos;
}
// command failed
int code = res["code"].numberInt();
string errmsg = res["errmsg"].valuestrsafe();
if ( code == ErrorCodes::CommandNotFound ||
errmsg.find( "no such cmd" ) != string::npos ) {
// old version of server, ok, fall through to old code
}
else {
uasserted( 18630, str::stream() << "listCollections failed: " << res );
}
}
// SERVER-14951 filter for old version fallback needs to db qualify the 'name' element
BSONObjBuilder fallbackFilter;
if ( filter.hasField( "name" ) && filter["name"].type() == String ) {
fallbackFilter.append( "name", db + "." + filter["name"].str() );
}
// Copy the remaining filter fields; a "name" already added above is not duplicated.
fallbackFilter.appendElementsUnique( filter );
string ns = db + ".system.namespaces";
auto_ptr c = query(
ns.c_str(), fallbackFilter.obj(), 0, 0, 0, QueryOption_SlaveOk);
uassert(28611, str::stream() << "listCollections failed querying " << ns, c.get());
while ( c->more() ) {
BSONObj obj = c->nextSafe();
// Note: this 'ns' shadows the outer 'ns'; it holds the namespace found in this entry.
string ns = obj["name"].valuestr();
// Entries whose name contains '$' are skipped.
if ( ns.find( "$" ) != string::npos )
continue;
// Rewrite "name" without the "<db>." prefix, then copy the other fields unchanged.
BSONObjBuilder b;
b.append( "name", ns.substr( db.size() + 1 ) );
b.appendElementsUnique( obj );
infos.push_back( b.obj() );
}
return infos;
}
/**
 * Returns true when getCollectionInfos() finds an entry in the database of 'ns' whose
 * "name" equals the collection part of 'ns'.
 * NOTE(review): the element type of 'list' is not visible in this view of the file --
 * confirm against the original source.
 */
bool DBClientWithCommands::exists( const string& ns ) {
BSONObj filter = BSON( "name" << nsToCollectionSubstring( ns ) );
list results = getCollectionInfos( nsToDatabase( ns ), filter );
return !results.empty();
}
/* --- dbclientconnection --- */
void DBClientConnection::_auth(const BSONObj& params) {
if( autoReconnect ) {
/* note we remember the auth info before we attempt to auth -- if the connection is broken, we will
then have it for the next autoreconnect attempt.
*/
authCache[params[saslCommandUserDBFieldName].str()] = params.getOwned();
}
DBClientBase::_auth(params);
}
/**
 * Queries up to 'nToReturn' objects from 'ns' into 'out'. Makes sense mostly when you
 * want a small number of results; for a huge number, use query() and iterate the cursor.
 *
 * uasserts 10276 when query() returns no cursor, and throws RecvStaleConfigException when
 * the cursor reports ResultFlag_ShardConfigStale.
 *
 * NOTE(review): 'nToReturn' is used both for reserve() and as the loop bound, so a value
 * <= 0 returns nothing from this loop and a negative value is handed to reserve() --
 * confirm callers only pass positive values.
 * NOTE(review): the template arguments of 'vector' and 'auto_ptr' are not visible in
 * this view of the file -- confirm against the original source.
 */
void DBClientInterface::findN(vector& out, const string& ns, Query query, int nToReturn, int nToSkip, const BSONObj *fieldsToReturn, int queryOptions) {
out.reserve(nToReturn);
auto_ptr c =
this->query(ns, query, nToReturn, nToSkip, fieldsToReturn, queryOptions);
uassert( 10276 , str::stream() << "DBClientBase::findN: transport error: " << getServerAddress() << " ns: " << ns << " query: " << query.toString(), c.get() );
if ( c->hasResultFlag( ResultFlag_ShardConfigStale ) ){
BSONObj error;
c->peekError( &error );
throw RecvStaleConfigException( "findN stale config", error );
}
// Copy at most nToReturn results; stop early when the cursor is exhausted.
for( int i = 0; i < nToReturn; i++ ) {
if ( !c->more() )
break;
out.push_back( c->nextSafe().copy() );
}
}
/**
 * Returns the first object matching 'query' in 'ns' (via findN with a limit of 1), or an
 * empty BSONObj when nothing matches.
 * NOTE(review): the element type of 'vector' is not visible in this view of the file --
 * confirm against the original source.
 */
BSONObj DBClientInterface::findOne(const string &ns, const Query& query, const BSONObj *fieldsToReturn, int queryOptions) {
vector v;
findN(v, ns, query, 1, 0, fieldsToReturn, queryOptions);
return v.empty() ? BSONObj() : v[0];
}
/**
 * Records 'server' as the target of this connection and attempts to connect to it.
 *
 * @param errmsg  receives a description when the attempt fails.
 * @return the result of _connect().
 */
bool DBClientConnection::connect(const HostAndPort& server, string& errmsg) {
    _server = server;
    // _connect() recomputes _serverString from _server as well.
    _serverString = _server.toString();

    const bool connected = _connect( errmsg );
    return connected;
}
/**
 * Opens the socket connection to _server.
 *
 * Fails (returning false with 'errmsg' set) when the address is invalid, the host is
 * empty, the address is "0.0.0.0", or the connect attempt itself fails. Only the last
 * case sets _failed. When built with MONGO_SSL and the global SSL mode is preferSSL or
 * requireSSL, the result of securing the port is returned instead of true.
 *
 * NOTE(review): the template argument of 'std::auto_ptr' is not visible in this view of
 * the file -- confirm against the original source.
 */
bool DBClientConnection::_connect( string& errmsg ) {
_serverString = _server.toString();
_serverAddrString.clear();
// we keep around SockAddr for connection life -- maybe MessagingPort
// requires that?
std::auto_ptr serverSockAddr(new SockAddr(_server.host().c_str(),
_server.port()));
if (!serverSockAddr->isValid()) {
errmsg = str::stream() << "couldn't initialize connection to host "
<< _server.host().c_str() << ", address is invalid";
return false;
}
// Hand the address over to the 'server' member and create a fresh messaging port.
server.reset(serverSockAddr.release());
p.reset(new MessagingPort( _so_timeout, _logLevel ));
if (_server.host().empty() ) {
errmsg = str::stream() << "couldn't connect to server " << toString()
<< ", host is empty";
return false;
}
_serverAddrString = server->getAddr();
if ( _serverAddrString == "0.0.0.0" ) {
errmsg = str::stream() << "couldn't connect to server " << toString()
<< ", address resolved to 0.0.0.0";
return false;
}
if ( !p->connect(*server) ) {
errmsg = str::stream() << "couldn't connect to server " << toString()
<< ", connection attempt failed";
_failed = true;
return false;
}
else {
LOG( 1 ) << "connected to server " << toString() << endl;
}
#ifdef MONGO_SSL
// Secure the port when SSL is preferred or required.
int sslModeVal = sslGlobalParams.sslMode.load();
if (sslModeVal == SSLGlobalParams::SSLMode_preferSSL ||
sslModeVal == SSLGlobalParams::SSLMode_requireSSL) {
return p->secure( sslManager(), _server.host() );
}
#endif
return true;
}
void DBClientConnection::logout(const string& dbname, BSONObj& info){
authCache.erase(dbname);
runCommand(dbname, BSON("logout" << 1), info);
}
/**
 * Runs the command through DBClientWithCommands::runCommand(). When it fails with a
 * "not master" errmsg and a replica set client has been registered (clientSet), that
 * client is notified through isntMaster().
 *
 * @return whether the command succeeded.
 */
bool DBClientConnection::runCommand(const string &dbname,
                                    const BSONObj& cmd,
                                    BSONObj &info,
                                    int options) {
    const bool ok = DBClientWithCommands::runCommand( dbname, cmd, info, options );

    if ( !ok && clientSet && isNotMasterErrorString( info["errmsg"] ) ) {
        clientSet->isntMaster();
    }
    return ok;
}
/**
 * Re-establishes the connection if it is marked as failed.
 *
 * Does nothing when _failed is not set. Without autoReconnect a SocketException
 * (FAILED_STATE) is thrown. Otherwise a reconnect is attempted; if it fails, _failed is
 * set again and a SocketException (CONNECT_ERROR) is thrown. After a successful
 * reconnect every entry of authCache is re-authenticated; AuthenticationFailed errors
 * are logged and skipped, other UserExceptions propagate.
 *
 * NOTE(review): the template arguments of 'map' are not visible in this view of the
 * file -- confirm against the original source.
 */
void DBClientConnection::_checkConnection() {
if ( !_failed )
return;
if ( !autoReconnect )
throw SocketException( SocketException::FAILED_STATE , toString() );
// Don't hammer reconnects, backoff if needed
// NOTE(review): presumably this call itself blocks for the backoff interval -- confirm.
autoReconnectBackoff.nextSleepMillis();
LOG(_logLevel) << "trying reconnect to " << toString() << endl;
string errmsg;
// Cleared optimistically; set again right below when the reconnect fails.
_failed = false;
if ( ! _connect(errmsg) ) {
_failed = true;
LOG(_logLevel) << "reconnect " << toString() << " failed " << errmsg << endl;
throw SocketException( SocketException::CONNECT_ERROR , toString() );
}
LOG(_logLevel) << "reconnect " << toString() << " ok" << endl;
// Replay the credentials remembered by _auth().
for( map::const_iterator i = authCache.begin(); i != authCache.end(); i++ ) {
try {
DBClientConnection::_auth(i->second);
} catch (UserException& ex) {
if (ex.getCode() != ErrorCodes::AuthenticationFailed)
throw;
LOG(_logLevel) << "reconnect: auth failed " <<
i->second[saslCommandUserDBFieldName] <<
i->second[saslCommandUserFieldName] << ' ' <<
ex.what() << std::endl;
}
}
}
/**
 * Stores the socket timeout and, when a messaging port already exists, applies it to
 * that port right away.
 */
void DBClientConnection::setSoTimeout(double timeout) {
    _so_timeout = timeout;
    if (!p) {
        return;
    }
    p->setSocketTimeout(timeout);
}
/**
 * Returns the socket creation time reported by the messaging port, or
 * INVALID_SOCK_CREATION_TIME when no port exists yet.
 */
uint64_t DBClientConnection::getSockCreationMicroSec() const {
    if (!p) {
        return INVALID_SOCK_CREATION_TIME;
    }
    return p->getSockCreationMicroSec();
}
// Sentinel returned by getSockCreationMicroSec() when there is no socket: all 64 bits set.
// The cast target is spelled out; it matches the declared type of the constant.
const uint64_t DBClientBase::INVALID_SOCK_CREATION_TIME =
    static_cast<uint64_t>(0xFFFFFFFFFFFFFFFFULL);
/**
 * Creates a cursor for 'query' on 'ns' and initializes it.
 *
 * @return the initialized cursor, or a null auto_ptr when DBClientCursor::init() fails.
 *         Callers must check for null before use.
 */
auto_ptr<DBClientCursor> DBClientBase::query(const string &ns, Query query, int nToReturn,
                                             int nToSkip, const BSONObj *fieldsToReturn, int queryOptions , int batchSize ) {
    // The template argument is spelled out on every auto_ptr, matching the null return below.
    auto_ptr<DBClientCursor> c( new DBClientCursor( this,
                                                    ns, query.obj, nToReturn, nToSkip,
                                                    fieldsToReturn, queryOptions , batchSize ) );
    if ( c->init() )
        return c;
    return auto_ptr< DBClientCursor >( 0 );
}
/**
 * Creates a cursor object for an existing server-side cursor id and initializes it.
 *
 * @return the initialized cursor, or a null auto_ptr when DBClientCursor::init() fails.
 *         Callers must check for null before use.
 */
auto_ptr<DBClientCursor> DBClientBase::getMore( const string &ns, long long cursorId, int nToReturn, int options ) {
    // The template argument is spelled out on every auto_ptr, matching the null return below.
    auto_ptr<DBClientCursor> c( new DBClientCursor( this, ns, cursorId, nToReturn, options ) );
    if ( c->init() )
        return c;
    return auto_ptr< DBClientCursor >( 0 );
}
/**
 * Adapter that turns a per-document callback (_f) into a per-batch callback: when invoked
 * with a batch iterator it calls _f on every document left in the current batch.
 * NOTE(review): the signature argument of 'stdx::function' is not visible in this view
 * of the file -- confirm against the original source.
 */
struct DBClientFunConvertor {
void operator()( DBClientCursorBatchIterator &i ) {
while( i.moreInCurrentBatch() ) {
_f( i.nextSafe() );
}
}
// Per-document callback to apply.
stdx::function _f;
};
/**
 * Runs 'query' on 'ns' and calls 'f' for every resulting document, by wrapping 'f' in a
 * DBClientFunConvertor and delegating to the batch-iterator overload of query().
 * Returns whatever that overload returns.
 * NOTE(review): the signature arguments of 'stdx::function' are not visible in this view
 * of the file -- confirm against the original source.
 */
unsigned long long DBClientBase::query( stdx::function f, const string& ns, Query query, const BSONObj *fieldsToReturn, int queryOptions ) {
DBClientFunConvertor fun;
fun._f = f;
stdx::function ptr( fun );
return this->query( ptr, ns, query, fieldsToReturn, queryOptions );
}
/**
 * Runs 'query' on 'ns' and calls 'f' with a batch iterator while the cursor has more
 * results. Only the NoCursorTimeout and SlaveOk bits of 'queryOptions' are kept.
 * uasserts 16090 when no cursor could be created.
 *
 * @return the sum of the n() values reported by the batch iterators.
 * NOTE(review): the template arguments of 'stdx::function' and 'auto_ptr' are not
 * visible in this view of the file -- confirm against the original source.
 */
unsigned long long DBClientBase::query(
stdx::function f,
const string& ns,
Query query,
const BSONObj *fieldsToReturn,
int queryOptions ) {
// mask options
queryOptions &= (int)( QueryOption_NoCursorTimeout | QueryOption_SlaveOk );
auto_ptr c( this->query(ns, query, 0, 0, fieldsToReturn, queryOptions) );
uassert( 16090, "socket error for mapping query", c.get() );
unsigned long long n = 0;
while ( c->more() ) {
// A fresh iterator per pass; its n() is added to the running total afterwards.
DBClientCursorBatchIterator i( *c );
f( i );
n += i.n();
}
return n;
}
/**
 * Remembers the replica-set client to notify from checkResponse().
 *
 * @param rsClient pointer stored in `clientSet`; checkResponse() treats a null
 *                 value as "no callback".
 */
void DBClientConnection::setReplSetClientCallback( DBClientReplicaSet* rsClient )
{
    clientSet = rsClient;
}
unsigned long long DBClientConnection::query(
stdx::function f,
const string& ns,
Query query,
const BSONObj *fieldsToReturn,
int queryOptions ) {
if ( ! ( availableOptions() & QueryOption_Exhaust ) ) {
return DBClientBase::query( f, ns, query, fieldsToReturn, queryOptions );
}
// mask options
queryOptions &= (int)( QueryOption_NoCursorTimeout | QueryOption_SlaveOk );
queryOptions |= (int)QueryOption_Exhaust;
auto_ptr c( this->query(ns, query, 0, 0, fieldsToReturn, queryOptions) );
uassert( 13386, "socket error for mapping query", c.get() );
unsigned long long n = 0;
try {
while( 1 ) {
while( c->moreInCurrentBatch() ) {
DBClientCursorBatchIterator i( *c );
f( i );
n += i.n();
}
if( c->getCursorId() == 0 )
break;
c->exhaustReceiveMore();
}
}
catch(std::exception&) {
/* connection CANNOT be used anymore as more data may be on the way from the server.
we have to reconnect.
*/
_failed = true;
p->shutdown();
throw;
}
return n;
}
void DBClientBase::insert( const string & ns , BSONObj obj , int flags) {
Message toSend;
BufBuilder b;
int reservedFlags = 0;
if( flags & InsertOption_ContinueOnError )
reservedFlags |= Reserved_InsertOption_ContinueOnError;
if( flags & WriteOption_FromWriteback )
reservedFlags |= Reserved_FromWriteback;
b.appendNum( reservedFlags );
b.appendStr( ns );
obj.appendSelfToBufBuilder( b );
toSend.setData( dbInsert , b.buf() , b.len() );
say( toSend );
}
// TODO: Merge with other insert implementation?
/**
 * Sends one insert message containing every document in `v`, in order.
 *
 * @param ns    target namespace.
 * @param v     documents to insert.
 * @param flags caller option bits; InsertOption_ContinueOnError and
 *              WriteOption_FromWriteback are mapped onto the corresponding
 *              Reserved_* bits written at the start of the message.
 */
void DBClientBase::insert( const string & ns , const vector< BSONObj > &v , int flags) {
    Message toSend;
    BufBuilder b;
    int reservedFlags = 0;
    if( flags & InsertOption_ContinueOnError )
        reservedFlags |= Reserved_InsertOption_ContinueOnError;
    // `flags` itself is never written to the message, so there is no need to
    // clear the writeback bit from it after translating.
    if( flags & WriteOption_FromWriteback )
        reservedFlags |= Reserved_FromWriteback;
    // Payload layout: reserved flags, namespace string, then each document.
    b.appendNum( reservedFlags );
    b.appendStr( ns );
    for( vector< BSONObj >::const_iterator i = v.begin(); i != v.end(); ++i )
        i->appendSelfToBufBuilder( b );
    toSend.setData( dbInsert, b.buf(), b.len() );
    say( toSend );
}
/**
 * Convenience overload: converts `justOne` into RemoveOption_JustOne and
 * delegates to the flag-based remove().
 */
void DBClientBase::remove( const string & ns , Query obj , bool justOne ) {
    // Keep this an int so the call below selects the flags overload.
    const int flags = justOne ? RemoveOption_JustOne : 0;
    remove( ns, obj, flags );
}
void DBClientBase::remove( const string & ns , Query obj , int flags ) {
Message toSend;
BufBuilder b;
int reservedFlags = 0;
if( flags & WriteOption_FromWriteback ){
reservedFlags |= WriteOption_FromWriteback;
flags ^= WriteOption_FromWriteback;
}
b.appendNum( reservedFlags );
b.appendStr( ns );
b.appendNum( flags );
obj.obj.appendSelfToBufBuilder( b );
toSend.setData( dbDelete , b.buf() , b.len() );
say( toSend );
}
/**
 * Convenience overload: folds `upsert` and `multi` into UpdateOption_* bits
 * and delegates to the flag-based update().
 */
void DBClientBase::update( const string & ns , Query query , BSONObj obj , bool upsert, bool multi ) {
    const int upsertBit = upsert ? UpdateOption_Upsert : 0;
    const int multiBit = multi ? UpdateOption_Multi : 0;
    // Keep this an int so the call below selects the flags overload.
    const int flags = upsertBit | multiBit;
    update( ns, query, obj, flags );
}
void DBClientBase::update( const string & ns , Query query , BSONObj obj , int flags ) {
BufBuilder b;
int reservedFlags = 0;
if( flags & WriteOption_FromWriteback ){
reservedFlags |= Reserved_FromWriteback;
flags ^= WriteOption_FromWriteback;
}
b.appendNum( reservedFlags ); // reserved
b.appendStr( ns );
b.appendNum( flags );
query.obj.appendSelfToBufBuilder( b );
obj.appendSelfToBufBuilder( b );
Message toSend;
toSend.setData( dbUpdate , b.buf() , b.len() );
say( toSend );
}
/**
 * Returns the index specifications for collection `ns`.
 *
 * First tries the `listIndexes` command, reading the first batch from the
 * reply and any remaining specs through getMore(). If the server reports the
 * command as unknown, falls back to querying the system indexes collection.
 *
 * @param ns      full namespace of the collection.
 * @param options option bits passed to runCommand() / query().
 * @return owned copies of every index spec; empty when the server reports
 *         NamespaceNotFound.
 * @throws via uasserted 18631 when listIndexes fails for any other reason,
 *         via uassert 28612 when the fallback query cannot be issued, and via
 *         verify() when the follow-up getMore cursor cannot be created.
 */
list<BSONObj> DBClientWithCommands::getIndexSpecs( const string &ns, int options ) {
    list<BSONObj> specs;
    {
        BSONObj cmd = BSON(
            "listIndexes" << nsToCollectionSubstring( ns ) <<
            "cursor" << BSONObj()
        );
        BSONObj res;
        if ( runCommand( nsToDatabase( ns ), cmd, res, options ) ) {
            BSONObj cursorObj = res["cursor"].Obj();
            BSONObjIterator i( cursorObj["firstBatch"].Obj() );
            while ( i.more() ) {
                specs.push_back( i.next().Obj().getOwned() );
            }
            const long long id = cursorObj["id"].Long();
            if ( id != 0 ) {
                // A non-zero id means more specs remain on the server-side cursor.
                const std::string cursorNs = cursorObj["ns"].String();
                auto_ptr<DBClientCursor> cursor = getMore(cursorNs, id, 0, 0);
                // getMore() hands back a null pointer when the cursor fails to
                // initialize; fail loudly instead of dereferencing it.
                verify( cursor.get() );
                while ( cursor->more() ) {
                    specs.push_back(cursor->nextSafe().getOwned());
                }
            }
            return specs;
        }
        int code = res["code"].numberInt();
        string errmsg = res["errmsg"].valuestrsafe();
        if ( code == ErrorCodes::CommandNotFound ||
             errmsg.find( "no such cmd" ) != string::npos ) {
            // old version of server, ok, fall through to old code
        }
        else if ( code == ErrorCodes::NamespaceNotFound ) {
            return specs;
        }
        else {
            uasserted( 18631, str::stream() << "listIndexes failed: " << res );
        }
    }
    // fallback to querying system.indexes
    // TODO(spencer): Remove fallback behavior after 3.0
    auto_ptr<DBClientCursor> cursor = query(NamespaceString(ns).getSystemIndexesCollection(),
                                            BSON("ns" << ns), 0, 0, 0, options);
    uassert(28612, str::stream() << "listIndexes failed querying " << ns, cursor.get());
    while ( cursor->more() ) {
        BSONObj spec = cursor->nextSafe();
        specs.push_back( spec.getOwned() );
    }
    return specs;
}
void DBClientWithCommands::dropIndex( const string& ns , BSONObj keys ) {
dropIndex( ns , genIndexName( keys ) );
}
/**
 * Drops the index named `indexName` on collection `ns` using the
 * `deleteIndexes` command, then clears the local index cache.
 *
 * @throws via uassert 10007 when the command fails (the reply is logged first).
 */
void DBClientWithCommands::dropIndex( const string& ns , const string& indexName ) {
    BSONObj info;
    const bool dropped =
        runCommand( nsToDatabase( ns ) ,
                    BSON( "deleteIndexes" << nsToCollectionSubstring(ns) << "index" << indexName ) ,
                    info );
    if ( !dropped ) {
        LOG(_logLevel) << "dropIndex failed: " << info << endl;
        uassert( 10007 , "dropIndex failed" , false );
    }
    // Only reached on success: forget which indexes have been seen.
    resetIndexCache();
}
/**
 * Issues `deleteIndexes` with index "*" for collection `ns`, then clears the
 * local index cache.
 *
 * @throws via uassert 10008 when the command fails.
 */
void DBClientWithCommands::dropIndexes( const string& ns ) {
    BSONObj info;
    const bool dropped =
        runCommand( nsToDatabase( ns ),
                    BSON( "deleteIndexes" << nsToCollectionSubstring(ns) << "index" << "*"),
                    info );
    uassert( 10008, "dropIndexes failed", dropped );
    resetIndexCache();
}
/**
 * Clears the local index cache and runs the `reIndex` command on `ns`.
 *
 * @throws via uassert 18908, with the server reply in the message, when the
 *         command fails.
 */
void DBClientWithCommands::reIndex( const string& ns ) {
    resetIndexCache();

    BSONObj info;
    // Run the command first so `info` holds the reply when the message is built.
    const bool ok = runCommand(nsToDatabase(ns),
                               BSON("reIndex" << nsToCollectionSubstring(ns)),
                               info);
    uassert(18908, str::stream() << "reIndex failed: " << info, ok);
}
/**
 * Builds an index name from a key pattern.
 *
 * Each field contributes "<fieldName>_<value>", where the value is the
 * integer form for numeric elements and the string form otherwise; the
 * per-field pieces are joined with "_".
 *
 * @param keys key pattern to derive the name from.
 * @return the generated name (empty for an empty pattern).
 */
string DBClientWithCommands::genIndexName( const BSONObj& keys ) {
    stringstream out;
    bool needSeparator = false;

    BSONObjIterator it( keys );
    while ( it.more() ) {
        BSONElement field = it.next();

        if ( needSeparator ) {
            out << "_";
        }
        needSeparator = true;

        out << field.fieldName() << "_";
        if ( field.isNumber() ) {
            out << field.numberInt();
        }
        else {
            out << field.str(); //this should match up with shell command
        }
    }
    return out.str();
}
bool DBClientWithCommands::ensureIndex( const string &ns,
BSONObj keys,
bool unique,
const string & name,
bool cache,
bool background,
int version,
int ttl ) {
BSONObjBuilder toSave;
toSave.append( "ns" , ns );
toSave.append( "key" , keys );
string cacheKey(ns);
cacheKey += "--";
if ( name != "" ) {
toSave.append( "name" , name );
cacheKey += name;
}
else {
string nn = genIndexName( keys );
toSave.append( "name" , nn );
cacheKey += nn;
}
if( version >= 0 )
toSave.append("v", version);
if ( unique )
toSave.appendBool( "unique", unique );
if( background )
toSave.appendBool( "background", true );
if ( _seenIndexes.count( cacheKey ) )
return 0;
if ( cache )
_seenIndexes.insert( cacheKey );
if ( ttl > 0 )
toSave.append( "expireAfterSeconds", ttl );
insert( NamespaceString( ns ).getSystemIndexesCollection() , toSave.obj() );
return 1;
}
/**
 * Empties the set of cache keys that ensureIndex() consults, so subsequent
 * ensureIndex() calls send their specs again.
 */
void DBClientWithCommands::resetIndexCache()
{
    _seenIndexes.clear();
}
/* -- DBClientCursor ---------------------------------------------- */
void assembleRequest( const string &ns, BSONObj query, int nToReturn, int nToSkip, const BSONObj *fieldsToReturn, int queryOptions, Message &toSend ) {
if (kDebugBuild) {
massert( 10337 , (string)"object not valid assembleRequest query" , query.isValid() );
}
// see query.h for the protocol we are using here.
BufBuilder b;
int opts = queryOptions;
b.appendNum(opts);
b.appendStr(ns);
b.appendNum(nToSkip);
b.appendNum(nToReturn);
query.appendSelfToBufBuilder(b);
if ( fieldsToReturn )
fieldsToReturn->appendSelfToBufBuilder(b);
toSend.setData(dbQuery, b.buf(), b.len());
}
/**
 * Sends `toSend` over this connection's port after checkConnection().
 *
 * `isRetry` and `actualServer` are not used by this implementation.
 *
 * @throws rethrows any SocketException from the port after marking the
 *         connection as failed.
 */
void DBClientConnection::say( Message &toSend, bool isRetry , string * actualServer ) {
    checkConnection();

    try {
        port().say( toSend );
    }
    catch ( SocketException & ) {
        // Remember the failure before propagating the error.
        _failed = true;
        throw;
    }
}
/**
 * Hands `toSend` to the port's piggyBack() instead of say(). Unlike say(),
 * this does not call checkConnection() or touch `_failed`.
 */
void DBClientConnection::sayPiggyBack( Message &toSend )
{
    port().piggyBack( toSend );
}
/**
 * Receives a message from the port into `m`.
 *
 * @return true when the port reports success; false otherwise, in which case
 *         the connection is also marked as failed.
 */
bool DBClientConnection::recv( Message &m ) {
    const bool received = port().recv(m);
    if ( !received ) {
        _failed = true;
    }
    return received;
}
/**
 * Sends `toSend` and waits for `response` on this connection's port.
 *
 * `actualServer` is not used by this implementation.
 *
 * @param assertOk when true, a failed call raises uasserted 10278 instead of
 *                 returning false.
 * @return true on success; false when the port call fails and `assertOk` is
 *         false.
 * @throws rethrows any SocketException from the port. In every failure case
 *         the connection is marked as failed first.
 */
bool DBClientConnection::call( Message &toSend, Message &response, bool assertOk , string * actualServer ) {
    /* todo: this is very ugly messagingport::call returns an error code AND can throw
             an exception.  we should make it return void and just throw an exception anytime
             it fails
    */
    checkConnection();

    try {
        const bool ok = port().call(toSend, response);
        if ( ok ) {
            return true;
        }

        _failed = true;
        if ( assertOk ) {
            uasserted( 10278 , str::stream() << "dbclient error communicating with server: " << getServerAddress() );
        }
        return false;
    }
    catch ( SocketException & ) {
        _failed = true;
        throw;
    }
}
/**
 * Looks up the "$err" element of `o`.
 *
 * The first element is checked directly; if it is not "$err", the object is
 * searched by name, and a warning assertion fires when "$err" is found
 * anywhere other than first position.
 *
 * @return the "$err" element, or an EOO element when the object has none.
 */
BSONElement getErrField(const BSONObj& o) {
    BSONElement head = o.firstElement();
    if ( strcmp(head.fieldName(), "$err") == 0 ) {
        return head;
    }

    // temp - will be DEV only later
    BSONElement byName = o["$err"];
    if ( !byName.eoo() ) {
        wassert(false);
    }
    return byName;
}
bool hasErrField( const BSONObj& o ){
return ! getErrField( o ).eoo();
}
void DBClientConnection::checkResponse( const char *data, int nReturned, bool* retry, string* host ) {
/* check for errors. the only one we really care about at
* this stage is "not master"
*/
*retry = false;
*host = _serverString;
if ( clientSet && nReturned ) {
verify(data);
BSONObj o(data);
if ( isNotMasterErrorString( getErrField(o) ) ) {
clientSet->isntMaster();
}
}
}
void DBClientConnection::killCursor( long long cursorId ) {
StackBufBuilder b;
b.appendNum( (int)0 ); // reserved
b.appendNum( (int)1 ); // number
b.appendNum( cursorId );
Message m;
m.setData( dbKillCursors , b.buf() , b.len() );
if ( _lazyKillCursor )
sayPiggyBack( m );
else
say(m);
}
#ifdef MONGO_SSL
// Guards s_sslMgr.
static SimpleMutex s_mtx("SSLManager");
// Cached result of getSSLManager(); NULL until a call stores a non-NULL value.
static SSLManagerInterface* s_sslMgr(NULL);

/**
 * Returns the cached SSL manager, fetching it with getSSLManager() under
 * s_mtx when nothing has been cached yet.
 */
SSLManagerInterface* DBClientConnection::sslManager() {
    SimpleMutex::scoped_lock lk(s_mtx);
    if (!s_sslMgr) {
        s_sslMgr = getSSLManager();
    }
    return s_sslMgr;
}
#endif
// Definition of the static atomic counter declared on DBClientConnection.
AtomicInt32 DBClientConnection::_numConnections;
// When true (the initial value), killCursor() sends its message through
// sayPiggyBack(); when false it uses say().
bool DBClientConnection::_lazyKillCursor = true;
/**
 * Checks whether the server at `uri` can be connected to and answers the
 * "ping" command against the "admin" database.
 *
 * @return true only when both the connect and the ping succeed.
 */
bool serverAlive( const string &uri ) {
    DBClientConnection c( false, 0, 20 ); // potentially the connection to server could fail while we're checking if it's alive - so use timeouts
    string err;
    // The ping is only attempted once the connect has succeeded.
    return c.connect( HostAndPort(uri), err )
        && c.simpleCommand( "admin", 0, "ping" );
}
/**
 * @return the database name portion of an ns string: everything before the
 *         first '.', or the whole string when it contains no '.'.
 */
string nsGetDB( const string &ns ) {
    const string::size_type dot = ns.find( '.' );
    return ( dot == string::npos ) ? ns : ns.substr( 0 , dot );
}
/**
 * @return the collection name portion of an ns string: everything after the
 *         first '.', or an empty string when it contains no '.'.
 */
string nsGetCollection( const string &ns ) {
    const string::size_type dot = ns.find( '.' );
    return ( dot == string::npos ) ? string( "" ) : ns.substr( dot + 1 );
}
} // namespace mongo