diff options
Diffstat (limited to 'src/mongo/client/syncclusterconnection.cpp')
-rw-r--r-- | src/mongo/client/syncclusterconnection.cpp | 410 |
1 files changed, 410 insertions, 0 deletions
diff --git a/src/mongo/client/syncclusterconnection.cpp b/src/mongo/client/syncclusterconnection.cpp new file mode 100644 index 00000000000..601cdcbd758 --- /dev/null +++ b/src/mongo/client/syncclusterconnection.cpp @@ -0,0 +1,410 @@ +// syncclusterconnection.cpp +/* + * Copyright 2010 10gen Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +#include "pch.h" +#include "syncclusterconnection.h" +#include "../db/dbmessage.h" + +// error codes 8000-8009 + +namespace mongo { + + SyncClusterConnection::SyncClusterConnection( const list<HostAndPort> & L, double socketTimeout) : _mutex("SyncClusterConnection"), _socketTimeout( socketTimeout ) { + { + stringstream s; + int n=0; + for( list<HostAndPort>::const_iterator i = L.begin(); i != L.end(); i++ ) { + if( ++n > 1 ) s << ','; + s << i->toString(); + } + _address = s.str(); + } + for( list<HostAndPort>::const_iterator i = L.begin(); i != L.end(); i++ ) + _connect( i->toString() ); + } + + SyncClusterConnection::SyncClusterConnection( string commaSeperated, double socketTimeout) : _mutex("SyncClusterConnection"), _socketTimeout( socketTimeout ) { + _address = commaSeperated; + string::size_type idx; + while ( ( idx = commaSeperated.find( ',' ) ) != string::npos ) { + string h = commaSeperated.substr( 0 , idx ); + commaSeperated = commaSeperated.substr( idx + 1 ); + _connect( h ); + } + _connect( commaSeperated ); + uassert( 8004 , "SyncClusterConnection needs 3 servers" , _conns.size() == 3 ); + } + + SyncClusterConnection::SyncClusterConnection( string a , string b , string c, double socketTimeout) : _mutex("SyncClusterConnection"), _socketTimeout( socketTimeout ) { + _address = a + "," + b + "," + c; + // connect to all even if not working + _connect( a ); + _connect( b ); + _connect( c ); + } + + SyncClusterConnection::SyncClusterConnection( SyncClusterConnection& prev, double socketTimeout) : _mutex("SyncClusterConnection"), _socketTimeout( socketTimeout ) { + assert(0); + } + + SyncClusterConnection::~SyncClusterConnection() { + for ( size_t i=0; i<_conns.size(); i++ ) + delete _conns[i]; + _conns.clear(); + } + + bool SyncClusterConnection::prepare( string& errmsg ) { + _lastErrors.clear(); + return fsync( errmsg ); + } + + bool SyncClusterConnection::fsync( string& errmsg ) { + bool ok = true; + errmsg = ""; + for ( size_t i=0; i<_conns.size(); i++ ) { + BSONObj res; + try { + if ( _conns[i]->simpleCommand( "admin" , &res , "fsync" ) ) + continue; + } + catch ( DBException& e ) { + errmsg += e.toString(); + } + catch ( std::exception& e ) { + errmsg += e.what(); + } + catch ( ... ) { + } + ok = false; + errmsg += " " + _conns[i]->toString() + ":" + res.toString(); + } + return ok; + } + + void SyncClusterConnection::_checkLast() { + _lastErrors.clear(); + vector<string> errors; + + for ( size_t i=0; i<_conns.size(); i++ ) { + BSONObj res; + string err; + try { + if ( ! _conns[i]->runCommand( "admin" , BSON( "getlasterror" << 1 << "fsync" << 1 ) , res ) ) + err = "cmd failed: "; + } + catch ( std::exception& e ) { + err += e.what(); + } + catch ( ... ) { + err += "unknown failure"; + } + _lastErrors.push_back( res.getOwned() ); + errors.push_back( err ); + } + + assert( _lastErrors.size() == errors.size() && _lastErrors.size() == _conns.size() ); + + stringstream err; + bool ok = true; + + for ( size_t i = 0; i<_conns.size(); i++ ) { + BSONObj res = _lastErrors[i]; + if ( res["ok"].trueValue() && (res["fsyncFiles"].numberInt() > 0 || res.hasElement("waited"))) + continue; + ok = false; + err << _conns[i]->toString() << ": " << res << " " << errors[i]; + } + + if ( ok ) + return; + throw UserException( 8001 , (string)"SyncClusterConnection write op failed: " + err.str() ); + } + + BSONObj SyncClusterConnection::getLastErrorDetailed(bool fsync, bool j, int w, int wtimeout) { + if ( _lastErrors.size() ) + return _lastErrors[0]; + return DBClientBase::getLastErrorDetailed(fsync,j,w,wtimeout); + } + + void SyncClusterConnection::_connect( string host ) { + log() << "SyncClusterConnection connecting to [" << host << "]" << endl; + DBClientConnection * c = new DBClientConnection( true ); + c->setSoTimeout( _socketTimeout ); + string errmsg; + if ( ! c->connect( host , errmsg ) ) + log() << "SyncClusterConnection connect fail to: " << host << " errmsg: " << errmsg << endl; + _connAddresses.push_back( host ); + _conns.push_back( c ); + } + + bool SyncClusterConnection::callRead( Message& toSend , Message& response ) { + // TODO: need to save state of which one to go back to somehow... + return _conns[0]->callRead( toSend , response ); + } + + BSONObj SyncClusterConnection::findOne(const string &ns, const Query& query, const BSONObj *fieldsToReturn, int queryOptions) { + + if ( ns.find( ".$cmd" ) != string::npos ) { + string cmdName = query.obj.firstElementFieldName(); + + int lockType = _lockType( cmdName ); + + if ( lockType > 0 ) { // write $cmd + string errmsg; + if ( ! prepare( errmsg ) ) + throw UserException( 13104 , (string)"SyncClusterConnection::findOne prepare failed: " + errmsg ); + + vector<BSONObj> all; + for ( size_t i=0; i<_conns.size(); i++ ) { + all.push_back( _conns[i]->findOne( ns , query , 0 , queryOptions ).getOwned() ); + } + + _checkLast(); + + for ( size_t i=0; i<all.size(); i++ ) { + BSONObj temp = all[i]; + if ( isOk( temp ) ) + continue; + stringstream ss; + ss << "write $cmd failed on a node: " << temp.jsonString(); + ss << " " << _conns[i]->toString(); + ss << " ns: " << ns; + ss << " cmd: " << query.toString(); + throw UserException( 13105 , ss.str() ); + } + + return all[0]; + } + } + + return DBClientBase::findOne( ns , query , fieldsToReturn , queryOptions ); + } + + bool SyncClusterConnection::auth(const string &dbname, const string &username, const string &password_text, string& errmsg, bool digestPassword) { + for (vector<DBClientConnection*>::iterator it = _conns.begin(); it < _conns.end(); it++) { + massert( 15848, "sync cluster of sync clusters?", (*it)->type() != ConnectionString::SYNC); + + if (!(*it)->auth(dbname, username, password_text, errmsg, digestPassword)) { + return false; + } + } + return true; + } + + auto_ptr<DBClientCursor> SyncClusterConnection::query(const string &ns, Query query, int nToReturn, int nToSkip, + const BSONObj *fieldsToReturn, int queryOptions, int batchSize ) { + _lastErrors.clear(); + if ( ns.find( ".$cmd" ) != string::npos ) { + string cmdName = query.obj.firstElementFieldName(); + int lockType = _lockType( cmdName ); + uassert( 13054 , (string)"write $cmd not supported in SyncClusterConnection::query for:" + cmdName , lockType <= 0 ); + } + + return _queryOnActive( ns , query , nToReturn , nToSkip , fieldsToReturn , queryOptions , batchSize ); + } + + bool SyncClusterConnection::_commandOnActive(const string &dbname, const BSONObj& cmd, BSONObj &info, int options ) { + auto_ptr<DBClientCursor> cursor = _queryOnActive( dbname + ".$cmd" , cmd , 1 , 0 , 0 , options , 0 ); + if ( cursor->more() ) + info = cursor->next().copy(); + else + info = BSONObj(); + return isOk( info ); + } + + auto_ptr<DBClientCursor> SyncClusterConnection::_queryOnActive(const string &ns, Query query, int nToReturn, int nToSkip, + const BSONObj *fieldsToReturn, int queryOptions, int batchSize ) { + + for ( size_t i=0; i<_conns.size(); i++ ) { + try { + auto_ptr<DBClientCursor> cursor = + _conns[i]->query( ns , query , nToReturn , nToSkip , fieldsToReturn , queryOptions , batchSize ); + if ( cursor.get() ) + return cursor; + log() << "query failed to: " << _conns[i]->toString() << " no data" << endl; + } + catch ( std::exception& e ) { + log() << "query failed to: " << _conns[i]->toString() << " exception: " << e.what() << endl; + } + catch ( ... ) { + log() << "query failed to: " << _conns[i]->toString() << " exception" << endl; + } + } + throw UserException( 8002 , "all servers down!" ); + } + + auto_ptr<DBClientCursor> SyncClusterConnection::getMore( const string &ns, long long cursorId, int nToReturn, int options ) { + uassert( 10022 , "SyncClusterConnection::getMore not supported yet" , 0); + auto_ptr<DBClientCursor> c; + return c; + } + + void SyncClusterConnection::insert( const string &ns, BSONObj obj , int flags) { + + uassert( 13119 , (string)"SyncClusterConnection::insert obj has to have an _id: " + obj.jsonString() , + ns.find( ".system.indexes" ) != string::npos || obj["_id"].type() ); + + string errmsg; + if ( ! prepare( errmsg ) ) + throw UserException( 8003 , (string)"SyncClusterConnection::insert prepare failed: " + errmsg ); + + for ( size_t i=0; i<_conns.size(); i++ ) { + _conns[i]->insert( ns , obj , flags); + } + + _checkLast(); + } + + void SyncClusterConnection::insert( const string &ns, const vector< BSONObj >& v , int flags) { + uassert( 10023 , "SyncClusterConnection bulk insert not implemented" , 0); + } + + void SyncClusterConnection::remove( const string &ns , Query query, bool justOne ) { + string errmsg; + if ( ! prepare( errmsg ) ) + throw UserException( 8020 , (string)"SyncClusterConnection::remove prepare failed: " + errmsg ); + + for ( size_t i=0; i<_conns.size(); i++ ) { + _conns[i]->remove( ns , query , justOne ); + } + + _checkLast(); + } + + void SyncClusterConnection::update( const string &ns , Query query , BSONObj obj , bool upsert , bool multi ) { + + if ( upsert ) { + uassert( 13120 , "SyncClusterConnection::update upsert query needs _id" , query.obj["_id"].type() ); + } + + if ( _writeConcern ) { + string errmsg; + if ( ! prepare( errmsg ) ) + throw UserException( 8005 , (string)"SyncClusterConnection::udpate prepare failed: " + errmsg ); + } + + for ( size_t i = 0; i < _conns.size(); i++ ) { + try { + _conns[i]->update( ns , query , obj , upsert , multi ); + } + catch ( std::exception& e ) { + if ( _writeConcern ) + throw e; + } + } + + if ( _writeConcern ) { + _checkLast(); + assert( _lastErrors.size() > 1 ); + + int a = _lastErrors[0]["n"].numberInt(); + for ( unsigned i=1; i<_lastErrors.size(); i++ ) { + int b = _lastErrors[i]["n"].numberInt(); + if ( a == b ) + continue; + + throw UpdateNotTheSame( 8017 , + str::stream() + << "update not consistent " + << " ns: " << ns + << " query: " << query.toString() + << " update: " << obj + << " gle1: " << _lastErrors[0] + << " gle2: " << _lastErrors[i] , + _connAddresses , _lastErrors ); + } + } + } + + string SyncClusterConnection::_toString() const { + stringstream ss; + ss << "SyncClusterConnection [" << _address << "]"; + return ss.str(); + } + + bool SyncClusterConnection::call( Message &toSend, Message &response, bool assertOk , string * actualServer ) { + uassert( 8006 , "SyncClusterConnection::call can only be used directly for dbQuery" , + toSend.operation() == dbQuery ); + + DbMessage d( toSend ); + uassert( 8007 , "SyncClusterConnection::call can't handle $cmd" , strstr( d.getns(), "$cmd" ) == 0 ); + + for ( size_t i=0; i<_conns.size(); i++ ) { + try { + bool ok = _conns[i]->call( toSend , response , assertOk ); + if ( ok ) { + if ( actualServer ) + *actualServer = _connAddresses[i]; + return ok; + } + log() << "call failed to: " << _conns[i]->toString() << " no data" << endl; + } + catch ( ... ) { + log() << "call failed to: " << _conns[i]->toString() << " exception" << endl; + } + } + throw UserException( 8008 , "all servers down!" ); + } + + void SyncClusterConnection::say( Message &toSend, bool isRetry ) { + string errmsg; + if ( ! prepare( errmsg ) ) + throw UserException( 13397 , (string)"SyncClusterConnection::say prepare failed: " + errmsg ); + + for ( size_t i=0; i<_conns.size(); i++ ) { + _conns[i]->say( toSend ); + } + + _checkLast(); + } + + void SyncClusterConnection::sayPiggyBack( Message &toSend ) { + assert(0); + } + + int SyncClusterConnection::_lockType( const string& name ) { + { + scoped_lock lk(_mutex); + map<string,int>::iterator i = _lockTypes.find( name ); + if ( i != _lockTypes.end() ) + return i->second; + } + + BSONObj info; + uassert( 13053 , str::stream() << "help failed: " << info , _commandOnActive( "admin" , BSON( name << "1" << "help" << 1 ) , info ) ); + + int lockType = info["lockType"].numberInt(); + + scoped_lock lk(_mutex); + _lockTypes[name] = lockType; + return lockType; + } + + void SyncClusterConnection::killCursor( long long cursorID ) { + // should never need to do this + assert(0); + } + + void SyncClusterConnection::setAllSoTimeouts( double socketTimeout ){ + _socketTimeout = socketTimeout; + for ( size_t i=0; i<_conns.size(); i++ ) + + if( _conns[i] ) _conns[i]->setSoTimeout( socketTimeout ); + } + +} |