// syncclusterconnection.cpp
/*
* Copyright 2010 10gen Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects
* for all of the code used other than as permitted herein. If you modify
* file(s) with this exception, you may extend this exception to your
* version of the file(s), but you are not obligated to do so. If you do not
* wish to do so, delete this exception statement from your version. If you
* delete this exception statement from all source files in the program,
* then also delete it in the license file.
*/
#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kNetwork
#include "mongo/platform/basic.h"
#include "mongo/client/syncclusterconnection.h"
#include "mongo/client/dbclientcursor.h"
#include "mongo/client/dbclientinterface.h"
#include "mongo/db/dbmessage.h"
#include "mongo/db/namespace_string.h"
#include "mongo/util/log.h"
// error codes 8000-8009 (this file also raises codes outside that range, e.g. 8017, 8020, 10022, 13xxx, 15848, 1674x)
namespace mongo {
using std::unique_ptr;
using std::endl;
using std::list;
using std::map;
using std::string;
using std::stringstream;
using std::vector;
// Builds a sync cluster from an explicit host list. The hosts are first recorded as one
// comma-separated address string (in list order), then one sub-connection is opened per
// host. Unlike the comma-separated constructor, no host-count check is made here.
SyncClusterConnection::SyncClusterConnection( const list<HostAndPort> & L, double socketTimeout)
    : _socketTimeout(socketTimeout) {
    string joined;
    bool firstHost = true;
    for ( list<HostAndPort>::const_iterator host = L.begin(); host != L.end(); ++host ) {
        if ( !firstHost )
            joined += ',';
        joined += host->toString();
        firstHost = false;
    }
    _address = joined;

    for ( list<HostAndPort>::const_iterator host = L.begin(); host != L.end(); ++host ) {
        _connect( host->toString() );
    }
}
// Builds a sync cluster from "host1,host2,host3". Exactly three hosts are required
// (uassert 8004).
// The count is checked *before* any connection is opened. A constructor that throws
// never runs the destructor (which is what deletes the entries of _conns), so
// sub-connections created by _connect() ahead of a failed check would be leaked.
SyncClusterConnection::SyncClusterConnection( string commaSeparated, double socketTimeout) :
    _socketTimeout( socketTimeout ) {
    _address = commaSeparated;

    // Split on every ','; empty segments are kept as (empty) host strings.
    std::vector<std::string> hosts;
    string::size_type start = 0;
    string::size_type comma;
    while ( ( comma = commaSeparated.find( ',', start ) ) != string::npos ) {
        hosts.push_back( commaSeparated.substr( start, comma - start ) );
        start = comma + 1;
    }
    hosts.push_back( commaSeparated.substr( start ) );

    uassert( 8004 , "SyncClusterConnection needs 3 servers" , hosts.size() == 3 );

    for ( size_t i = 0; i < hosts.size(); i++ )
        _connect( hosts[i] );
}
// Builds a sync cluster from three explicit host strings. All three connections are
// attempted even if some hosts are unreachable; _connect() only logs a failed connect.
// NOTE(review): if _connect() throws part-way, connections opened by earlier calls would
// not be deleted, because the destructor does not run for a constructor that throws.
SyncClusterConnection::SyncClusterConnection(
        const std::string& a,
        const std::string& b,
        const std::string& c,
        double socketTimeout) : _socketTimeout( socketTimeout ) {
    const std::string* const hosts[] = { &a, &b, &c };

    _address = a;
    _address += ",";
    _address += b;
    _address += ",";
    _address += c;

    for ( size_t i = 0; i < 3; ++i )
        _connect( *hosts[i] );
}
// Copy-style constructor placeholder: it unconditionally fails via verify(), so it must
// never be called.
SyncClusterConnection::SyncClusterConnection( SyncClusterConnection& prev, double socketTimeout)
    : _socketTimeout(socketTimeout) {
    verify( false );
}
// Deletes every sub-connection created by _connect(); this object owns them.
SyncClusterConnection::~SyncClusterConnection() {
    for ( auto* conn : _conns ) {
        delete conn;
    }
    _conns.clear();
}
// Clears the cached getlasterror replies, then on every node issues "resetError"
// followed by getLastError(true).
// Returns true only when every node succeeded and reported an empty error. Otherwise
// errmsg receives " <node>:<error>" for each failing node. Exceptions are caught per node
// and reported the same way.
bool SyncClusterConnection::prepare(string& errmsg) {
    _lastErrors.clear();
    errmsg = "";

    bool allClean = true;
    for (size_t node = 0; node < _conns.size(); ++node) {
        string nodeErr;
        bool nodeFailed;
        try {
            _conns[node]->simpleCommand("admin", NULL, "resetError");
            nodeErr = _conns[node]->getLastError(true);
            nodeFailed = !nodeErr.empty();
        }
        catch (DBException& e) {
            nodeErr = e.toString();
            nodeFailed = true;
        }

        if (!nodeFailed)
            continue;

        allClean = false;
        errmsg += " " + _conns[node]->toString() + ":" + nodeErr;
    }
    return allClean;
}
void SyncClusterConnection::_checkLast() {
_lastErrors.clear();
vector errors;
for ( size_t i=0; i<_conns.size(); i++ ) {
BSONObj res;
string err;
try {
if ( ! _conns[i]->runCommand( "admin" , BSON( "getlasterror" << 1 << "fsync" << 1 ) , res ) )
err = "cmd failed: ";
}
catch ( std::exception& e ) {
err += e.what();
}
catch ( ... ) {
err += "unknown failure";
}
_lastErrors.push_back( res.getOwned() );
errors.push_back( err );
}
verify( _lastErrors.size() == errors.size() && _lastErrors.size() == _conns.size() );
stringstream err;
bool ok = true;
for ( size_t i = 0; i<_conns.size(); i++ ) {
BSONObj res = _lastErrors[i];
if ( res["ok"].trueValue() && (res["fsyncFiles"].numberInt() > 0 ||
res.hasElement("waited") ||
res["syncMillis"].numberInt() >= 0 ) )
continue;
ok = false;
err << _conns[i]->toString() << ": " << res << " " << errors[i];
}
if ( ok )
return;
throw UserException( 8001 , (string)"SyncClusterConnection write op failed: " + err.str() );
}
// Convenience overload: forwards to the (db, fsync, j, w, wtimeout) overload with db "admin".
BSONObj SyncClusterConnection::getLastErrorDetailed(bool fsync, bool j, int w, int wtimeout) {
    const std::string adminDb = "admin";
    return getLastErrorDetailed(adminDb, fsync, j, w, wtimeout);
}
// Returns the first node's reply cached in _lastErrors (populated by _checkLast()) when
// there is one; the db/fsync/j/w/wtimeout arguments are ignored in that case. With no
// cached reply, it falls back to DBClientBase's live getlasterror.
BSONObj SyncClusterConnection::getLastErrorDetailed(const std::string& db,
                                                    bool fsync,
                                                    bool j,
                                                    int w,
                                                    int wtimeout) {
    if ( _lastErrors.empty() )
        return DBClientBase::getLastErrorDetailed(db, fsync, j, w, wtimeout);
    return _lastErrors.front();
}
void SyncClusterConnection::_connect( const std::string& host ) {
log() << "SyncClusterConnection connecting to [" << host << "]" << endl;
DBClientConnection * c = new DBClientConnection( true );
c->setRunCommandHook(_runCommandHook);
c->setPostRunCommandHook(_postRunCommandHook);
c->setSoTimeout( _socketTimeout );
string errmsg;
if ( ! c->connect( HostAndPort(host), errmsg ) )
log() << "SyncClusterConnection connect fail to: " << host << " errmsg: " << errmsg << endl;
_connAddresses.push_back( host );
_conns.push_back( c );
}
// Forwards a read to the first sub-connection only; the other nodes are not consulted.
bool SyncClusterConnection::callRead( Message& toSend , Message& response ) {
    // TODO: need to save state of which one to go back to somehow...
    return _conns.front()->callRead( toSend , response );
}
bool SyncClusterConnection::runCommand(const std::string& dbname,
const BSONObj& cmd,
BSONObj& info,
int options) {
std::string ns = dbname + ".$cmd";
BSONObj interposedCmd = cmd;
if (_runCommandHook) {
BSONObjBuilder cmdObjBob;
cmdObjBob.appendElements(cmd);
_runCommandHook(&cmdObjBob);
interposedCmd = cmdObjBob.obj();
}
info = findOne(ns, Query(interposedCmd), 0, options);
if (_postRunCommandHook) {
_postRunCommandHook(info, getServerAddress());
}
return isOk(info);
}
// Runs a query and returns its first result.
// A command on a ".$cmd" namespace whose _lockType() is positive is treated as a write.
// After prepare(), it is sent to every node and _checkLast() verifies the fsync'd
// getlasterror. Every node's reply must then be ok, otherwise UserException 13105 names
// the first failing node. The first node's reply is returned.
// Everything else (plain reads and read-only commands) goes through DBClientBase::findOne.
// Note: fieldsToReturn is not applied to write commands.
BSONObj SyncClusterConnection::findOne(const string &ns, const Query& query, const BSONObj *fieldsToReturn, int queryOptions) {
    if ( ns.find( ".$cmd" ) != string::npos ) {
        string cmdName = query.obj.firstElementFieldName();
        int lockType = _lockType( cmdName );
        if ( lockType > 0 ) { // write $cmd
            string errmsg;
            if ( ! prepare( errmsg ) )
                throw UserException( PrepareConfigsFailedCode , (string)"SyncClusterConnection::findOne prepare failed: " + errmsg );

            vector<BSONObj> all;
            for ( size_t i=0; i<_conns.size(); i++ ) {
                all.push_back( _conns[i]->findOne( ns , query , 0 , queryOptions ).getOwned() );
            }

            _checkLast();

            // Every node must have accepted the command; report the first one that did not.
            for ( size_t i=0; i<all.size(); i++ ) {
                BSONObj temp = all[i];
                if ( isOk( temp ) )
                    continue;
                stringstream ss;
                ss << "write $cmd failed on a node: " << temp.jsonString();
                ss << " " << _conns[i]->toString();
                ss << " ns: " << ns;
                ss << " cmd: " << query.toString();
                throw UserException( 13105 , ss.str() );
            }

            return all[0];
        }
    }
    return DBClientBase::findOne( ns , query , fieldsToReturn , queryOptions );
}
void SyncClusterConnection::_auth(const BSONObj& params) {
// A SCC is authenticated if any connection has been authenticated
// Credentials are stored in the auto-reconnect connections.
bool authedOnce = false;
vector errors;
for ( vector::iterator it = _conns.begin(); it < _conns.end(); ++it ) {
massert( 15848, "sync cluster of sync clusters?",
(*it)->type() != ConnectionString::SYNC );
// Authenticate or collect the error message
string lastErrmsg;
bool authed;
try {
// Auth errors can manifest either as exceptions or as false results
// TODO: Make this better
(*it)->auth(params);
authed = true;
}
catch ( const DBException& e ) {
// auth will be retried on reconnect
lastErrmsg = e.what();
authed = false;
}
if ( ! authed ) {
// Since we're using auto-reconnect connections, we're sure the auth info has been
// stored if needed for later
lastErrmsg = str::stream() << "auth error on " << (*it)->getServerAddress()
<< causedBy( lastErrmsg );
LOG(1) << lastErrmsg << endl;
errors.push_back( lastErrmsg );
}
authedOnce = authedOnce || authed;
}
if( authedOnce ) return;
// Assemble the error message
str::stream errStream;
for( vector::iterator it = errors.begin(); it != errors.end(); ++it ){
if( it != errors.begin() ) errStream << " ::and:: ";
errStream << *it;
}
uasserted(ErrorCodes::AuthenticationFailed, errStream);
}
// TODO: logout is required for use of this class outside of a cluster environment
// Cursor query against the cluster, served by _queryOnActive(). Clears the cached
// getlasterror replies first.
// Write commands (positive _lockType) are rejected with uassert 13054; findOne() is the
// path that handles them.
unique_ptr<DBClientCursor> SyncClusterConnection::query(const string &ns, Query query, int nToReturn, int nToSkip,
                                                        const BSONObj *fieldsToReturn, int queryOptions, int batchSize ) {
    _lastErrors.clear();

    const bool isCommand = ns.find( ".$cmd" ) != string::npos;
    if ( isCommand ) {
        const string cmdName = query.obj.firstElementFieldName();
        const int lockType = _lockType( cmdName );
        uassert( 13054 , (string)"write $cmd not supported in SyncClusterConnection::query for:" + cmdName , lockType <= 0 );
    }

    return _queryOnActive( ns , query , nToReturn , nToSkip , fieldsToReturn , queryOptions , batchSize );
}
bool SyncClusterConnection::_commandOnActive(const string &dbname, const BSONObj& cmd, BSONObj &info, int options ) {
unique_ptr cursor = _queryOnActive(dbname + ".$cmd", cmd, 1, 0, 0, options, 0);
if ( cursor->more() )
info = cursor->next().copy();
else
info = BSONObj();
return isOk( info );
}
// Installs `handler` (via reset(), replacing any previous one) as the custom handler
// that _queryOnActive() consults before querying nodes directly.
void SyncClusterConnection::attachQueryHandler( QueryHandler* handler ) {
    _customQueryHandler.reset( handler );
}
// Runs a query against the cluster.
// If a custom query handler accepts the query, its result is returned. Otherwise the
// nodes are tried in order and the first non-null cursor is returned.
// A per-node failure (null cursor or any exception) is logged and skipped. If every node
// fails, UserException 8002 is thrown.
unique_ptr<DBClientCursor> SyncClusterConnection::_queryOnActive(const string &ns, Query query, int nToReturn, int nToSkip,
                                                                 const BSONObj *fieldsToReturn, int queryOptions, int batchSize ) {
    const bool useCustomHandler =
        _customQueryHandler && _customQueryHandler->canHandleQuery( ns, query );
    if ( useCustomHandler ) {
        LOG( 2 ) << "custom query handler used for query on " << ns << ": "
                 << query.toString() << endl;
        return _customQueryHandler->handleQuery( _connAddresses, ns, query, nToReturn, nToSkip,
                                                 fieldsToReturn, queryOptions, batchSize );
    }

    for ( size_t idx = 0; idx < _conns.size(); ++idx ) {
        auto* node = _conns[idx];
        try {
            unique_ptr<DBClientCursor> cursor =
                node->query( ns , query , nToReturn , nToSkip , fieldsToReturn , queryOptions , batchSize );
            if ( cursor.get() )
                return cursor;
            log() << "query on " << ns << ": " << query.toString() << " failed to: "
                  << node->toString() << " no data" << endl;
        }
        catch ( std::exception& e ) {
            log() << "query on " << ns << ": " << query.toString() << " failed to: "
                  << node->toString() << " exception: " << e.what() << endl;
        }
        catch ( ... ) {
            log() << "query on " << ns << ": " << query.toString() << " failed to: "
                  << node->toString() << " exception" << endl;
        }
    }

    throw UserException( 8002 , str::stream() << "all servers down/unreachable when querying: " << _address );
}
// Not implemented for sync clusters: always fails with uassert 10022. The trailing
// return is never reached.
unique_ptr<DBClientCursor> SyncClusterConnection::getMore( const string &ns, long long cursorId, int nToReturn, int options ) {
    uassert( 10022 , "SyncClusterConnection::getMore not supported yet" , false );
    return unique_ptr<DBClientCursor>();
}
// Inserts one document on every node, bracketed by prepare() and the fsync'd
// getlasterror check in _checkLast().
// The document must carry an _id (uassert 13119) unless the collection is
// system.indexes — presumably so every node stores the same _id.
void SyncClusterConnection::insert( const string &ns, BSONObj obj , int flags) {
    const bool isIndexSpec = nsToCollectionSubstring(ns) == "system.indexes";
    uassert(13119,
            str::stream() << "SyncClusterConnection::insert obj has to have an _id: " << obj,
            isIndexSpec || obj["_id"].type());

    string errmsg;
    if ( !prepare( errmsg ) )
        throw UserException( 8003 , string( "SyncClusterConnection::insert prepare failed: " ) + errmsg );

    for ( size_t node = 0; node < _conns.size(); ++node )
        _conns[node]->insert( ns , obj , flags );

    _checkLast();
}
void SyncClusterConnection::insert( const string &ns, const vector< BSONObj >& v , int flags) {
if (v.size() == 1){
insert(ns, v[0], flags);
return;
}
for (vector::const_iterator it = v.begin(); it != v.end(); ++it ) {
BSONObj obj = *it;
if ( obj["_id"].type() == EOO ) {
string assertMsg = "SyncClusterConnection::insert (batched) obj misses an _id: ";
uasserted( 16743, assertMsg + obj.jsonString() );
}
}
// fsync all connections before starting the batch.
string errmsg;
if ( ! prepare( errmsg ) ) {
string assertMsg = "SyncClusterConnection::insert (batched) prepare failed: ";
throw UserException( 16744, assertMsg + errmsg );
}
// We still want one getlasterror per document, even if they're batched.
for ( size_t i=0; i<_conns.size(); i++ ) {
for ( vector::const_iterator it = v.begin(); it != v.end(); ++it ) {
_conns[i]->insert( ns, *it, flags );
_conns[i]->getLastErrorDetailed();
}
}
// We issue a final getlasterror, but this time with an fsync.
_checkLast();
}
// Removes matching documents on every node, bracketed by prepare() and the fsync'd
// getlasterror check in _checkLast().
void SyncClusterConnection::remove( const string &ns , Query query, int flags ) {
    string errmsg;
    if ( !prepare( errmsg ) ) {
        throw UserException( 8020 , string( "SyncClusterConnection::remove prepare failed: " ) + errmsg );
    }

    for ( size_t node = 0; node < _conns.size(); ++node )
        _conns[node]->remove( ns , query , flags );

    _checkLast();
}
// Applies an update on every node, bracketed by prepare() and _checkLast().
// An upsert's query must contain _id (uassert 13120).
// Afterwards every node's getlasterror "n" must match the first node's. On the first
// mismatch, UpdateNotTheSame (8017) is thrown with all addresses and cached replies.
void SyncClusterConnection::update(const string &ns, Query query, BSONObj obj, int flags) {
    const bool isUpsert = (flags & UpdateOption_Upsert) != 0;
    if (isUpsert) {
        uassert(13120,
                "SyncClusterConnection::update upsert query needs _id",
                query.obj["_id"].type());
    }

    string errmsg;
    if (!prepare(errmsg)) {
        throw UserException(8005,
                            str::stream() << "SyncClusterConnection::update prepare failed: "
                                          << errmsg);
    }

    for (size_t node = 0; node < _conns.size(); ++node)
        _conns[node]->update(ns, query, obj, flags);

    _checkLast();

    invariant(_lastErrors.size() > 1);
    const int expectedN = _lastErrors[0]["n"].numberInt();
    for (size_t i = 1; i < _lastErrors.size(); ++i) {
        if (_lastErrors[i]["n"].numberInt() == expectedN)
            continue;
        throw UpdateNotTheSame(8017,
                               str::stream() << "update not consistent "
                                             << " ns: " << ns
                                             << " query: " << query.toString()
                                             << " update: " << obj
                                             << " gle1: " << _lastErrors[0]
                                             << " gle2: " << _lastErrors[i],
                               _connAddresses,
                               _lastErrors);
    }
}
// Human-readable description of the form "SyncClusterConnection  [node1,node2,...]",
// with "(no conn)" standing in for any null slot.
string SyncClusterConnection::_toString() const {
    string desc = "SyncClusterConnection ";
    desc += " [";
    for ( size_t i = 0; i < _conns.size(); ++i ) {
        if ( i > 0 )
            desc += ",";
        desc += _conns[i] ? _conns[i]->toString() : string( "(no conn)" );
    }
    desc += "]";
    return desc;
}
// Sends a dbQuery message to the nodes in order, rejecting other operations (8006) and
// $cmd namespaces (8007).
// Returns true from the first node whose call() succeeds and, if actualServer is
// non-null, stores that node's address in it.
// A per-node failure is logged and skipped. If none succeeds, UserException 8008 is thrown.
bool SyncClusterConnection::call( Message &toSend, Message &response, bool assertOk , string * actualServer ) {
    uassert( 8006 , "SyncClusterConnection::call can only be used directly for dbQuery" ,
             toSend.operation() == dbQuery );

    DbMessage parsed( toSend );
    uassert( 8007 , "SyncClusterConnection::call can't handle $cmd" , strstr( parsed.getns(), "$cmd" ) == 0 );

    for ( size_t node = 0; node < _conns.size(); ++node ) {
        try {
            if ( _conns[node]->call( toSend , response , assertOk ) ) {
                if ( actualServer != NULL )
                    *actualServer = _connAddresses[node];
                return true;
            }
            log() << "call failed to: " << _conns[node]->toString() << " no data" << endl;
        }
        catch ( ... ) {
            log() << "call failed to: " << _conns[node]->toString() << " exception" << endl;
        }
    }

    throw UserException( 8008 , str::stream() << "all servers down/unreachable: " << _address );
}
// Broadcasts a message to every node, bracketed by prepare() and the fsync'd
// getlasterror check in _checkLast(). isRetry and actualServer are not used.
void SyncClusterConnection::say( Message &toSend, bool isRetry , string * actualServer ) {
    string errmsg;
    if ( !prepare( errmsg ) ) {
        throw UserException( 13397 , string( "SyncClusterConnection::say prepare failed: " ) + errmsg );
    }

    for ( size_t node = 0; node < _conns.size(); ++node )
        _conns[node]->say( toSend );

    // TODO: should we set actualServer??
    _checkLast();
}
// Always fails via verify(); must never be called on a sync cluster.
void SyncClusterConnection::sayPiggyBack( Message &toSend ) {
    verify( false );
}
// Returns the server-reported "lockType" for command `name`. findOne() treats a value
// > 0 as a write $cmd, and query() rejects such commands.
// Results are memoized in _lockTypes. The cache is read and written under _mutex, but the
// server round-trip happens outside the lock. Concurrent callers may therefore both query
// the server for the same name (presumably storing the same value).
int SyncClusterConnection::_lockType( const string& name ) {
{
boost::lock_guard lk(_mutex);
map::iterator i = _lockTypes.find( name );
if ( i != _lockTypes.end() )
return i->second;
}
// Cache miss: run { <name>: "1", help: 1 } through _commandOnActive() and read the
// "lockType" field of the reply. uassert 13053 fires if the command is not ok; its
// message includes the reply, which is only built after the command has run.
BSONObj info;
uassert( 13053 , str::stream() << "help failed: " << info , _commandOnActive( "admin" , BSON( name << "1" << "help" << 1 ) , info ) );
int lockType = info["lockType"].numberInt();
boost::lock_guard lk(_mutex);
_lockTypes[name] = lockType;
return lockType;
}
// Never expected to be needed for a sync cluster; always fails via verify().
void SyncClusterConnection::killCursor( long long cursorID ) {
    verify( false );
}
// A SCC should be reused only if all the existing connections haven't been broken in the
// background.
// Note: an SCC may have missing connections if a config server is temporarily offline,
// but reading from the others is still allowed.
bool SyncClusterConnection::isStillConnected() {
for ( size_t i = 0; i < _conns.size(); i++ ) {
if ( _conns[i] && !_conns[i]->isStillConnected() ) return false;
}
return true;
}
void SyncClusterConnection::setAllSoTimeouts( double socketTimeout ){
_socketTimeout = socketTimeout;
for ( size_t i=0; i<_conns.size(); i++ )
if( _conns[i] ) _conns[i]->setSoTimeout( socketTimeout );
}
// Installs the run-command hook on every existing non-null sub-connection, then on this
// object. _connect() passes the stored hook to connections opened afterwards.
void SyncClusterConnection::setRunCommandHook(DBClientWithCommands::RunCommandHookFunc func) {
    for (auto* conn : _conns) {
        if (conn != nullptr)
            conn->setRunCommandHook(func);
    }
    _runCommandHook = func;
}
// Installs the post-run-command hook on every existing non-null sub-connection, then on
// this object. _connect() passes the stored hook to connections opened afterwards.
void SyncClusterConnection::setPostRunCommandHook
        (DBClientWithCommands::PostRunCommandHookFunc func) {
    for (auto* conn : _conns) {
        if (conn != nullptr)
            conn->setPostRunCommandHook(func);
    }
    _postRunCommandHook = func;
}
}