diff options
Diffstat (limited to 'src/mongo/client/syncclusterconnection.cpp')
-rw-r--r-- | src/mongo/client/syncclusterconnection.cpp | 963 |
1 files changed, 483 insertions, 480 deletions
diff --git a/src/mongo/client/syncclusterconnection.cpp b/src/mongo/client/syncclusterconnection.cpp index f70b982b291..250a8a0a365 100644 --- a/src/mongo/client/syncclusterconnection.cpp +++ b/src/mongo/client/syncclusterconnection.cpp @@ -44,605 +44,608 @@ namespace mongo { - using std::unique_ptr; - using std::endl; - using std::list; - using std::map; - using std::string; - using std::stringstream; - using std::vector; - - SyncClusterConnection::SyncClusterConnection( const list<HostAndPort> & L, double socketTimeout) - : _socketTimeout(socketTimeout) { - - { - stringstream s; - int n=0; - for( list<HostAndPort>::const_iterator i = L.begin(); i != L.end(); i++ ) { - if( ++n > 1 ) s << ','; - s << i->toString(); - } - _address = s.str(); +using std::unique_ptr; +using std::endl; +using std::list; +using std::map; +using std::string; +using std::stringstream; +using std::vector; + +SyncClusterConnection::SyncClusterConnection(const list<HostAndPort>& L, double socketTimeout) + : _socketTimeout(socketTimeout) { + { + stringstream s; + int n = 0; + for (list<HostAndPort>::const_iterator i = L.begin(); i != L.end(); i++) { + if (++n > 1) + s << ','; + s << i->toString(); } - for( list<HostAndPort>::const_iterator i = L.begin(); i != L.end(); i++ ) - _connect( i->toString() ); + _address = s.str(); } + for (list<HostAndPort>::const_iterator i = L.begin(); i != L.end(); i++) + _connect(i->toString()); +} - SyncClusterConnection::SyncClusterConnection( string commaSeparated, double socketTimeout) : - _socketTimeout( socketTimeout ) { - - _address = commaSeparated; - string::size_type idx; - while ( ( idx = commaSeparated.find( ',' ) ) != string::npos ) { - string h = commaSeparated.substr( 0 , idx ); - commaSeparated = commaSeparated.substr( idx + 1 ); - _connect( h ); - } - _connect( commaSeparated ); - uassert( 8004 , "SyncClusterConnection needs 3 servers" , _conns.size() == 3 ); - } +SyncClusterConnection::SyncClusterConnection(string commaSeparated, double socketTimeout) + : _socketTimeout(socketTimeout) { + _address = commaSeparated; + string::size_type idx; + while ((idx = commaSeparated.find(',')) != string::npos) { + string h = commaSeparated.substr(0, idx); + commaSeparated = commaSeparated.substr(idx + 1); + _connect(h); + } + _connect(commaSeparated); + uassert(8004, "SyncClusterConnection needs 3 servers", _conns.size() == 3); +} - SyncClusterConnection::SyncClusterConnection( - const std::string& a, - const std::string& b, - const std::string& c, - double socketTimeout) : _socketTimeout( socketTimeout ) { - - _address = a + "," + b + "," + c; - // connect to all even if not working - _connect( a ); - _connect( b ); - _connect( c ); - } +SyncClusterConnection::SyncClusterConnection(const std::string& a, + const std::string& b, + const std::string& c, + double socketTimeout) + : _socketTimeout(socketTimeout) { + _address = a + "," + b + "," + c; + // connect to all even if not working + _connect(a); + _connect(b); + _connect(c); +} - SyncClusterConnection::SyncClusterConnection( SyncClusterConnection& prev, double socketTimeout) - : _socketTimeout(socketTimeout) { - verify(0); - } +SyncClusterConnection::SyncClusterConnection(SyncClusterConnection& prev, double socketTimeout) + : _socketTimeout(socketTimeout) { + verify(0); +} - SyncClusterConnection::~SyncClusterConnection() { - for ( size_t i=0; i<_conns.size(); i++ ) - delete _conns[i]; - _conns.clear(); - } +SyncClusterConnection::~SyncClusterConnection() { + for (size_t i = 0; i < _conns.size(); i++) + delete _conns[i]; + _conns.clear(); +} - bool SyncClusterConnection::prepare(string& errmsg) { - _lastErrors.clear(); +bool SyncClusterConnection::prepare(string& errmsg) { + _lastErrors.clear(); - bool ok = true; - errmsg = ""; + bool ok = true; + errmsg = ""; - for (size_t i = 0; i < _conns.size(); i++) { - string singleErr; - try { - _conns[i]->simpleCommand("admin", NULL, "resetError"); - singleErr = _conns[i]->getLastError(true); + for (size_t i = 0; i < _conns.size(); i++) { + string singleErr; + try { + _conns[i]->simpleCommand("admin", NULL, "resetError"); + singleErr = _conns[i]->getLastError(true); - if (singleErr.size() == 0) - continue; + if (singleErr.size() == 0) + continue; - } - catch (DBException& e) { - singleErr = e.toString(); - } - ok = false; - errmsg += " " + _conns[i]->toString() + ":" + singleErr; + } catch (DBException& e) { + singleErr = e.toString(); } - - return ok; + ok = false; + errmsg += " " + _conns[i]->toString() + ":" + singleErr; } - void SyncClusterConnection::_checkLast() { - _lastErrors.clear(); - vector<string> errors; - - for ( size_t i=0; i<_conns.size(); i++ ) { - BSONObj res; - string err; - try { - if ( ! _conns[i]->runCommand( "admin" , BSON( "getlasterror" << 1 << "fsync" << 1 ) , res ) ) - err = "cmd failed: "; - } - catch ( std::exception& e ) { - err += e.what(); - } - catch ( ... ) { - err += "unknown failure"; - } - _lastErrors.push_back( res.getOwned() ); - errors.push_back( err ); - } - - verify( _lastErrors.size() == errors.size() && _lastErrors.size() == _conns.size() ); - - stringstream err; - bool ok = true; + return ok; +} - for ( size_t i = 0; i<_conns.size(); i++ ) { - BSONObj res = _lastErrors[i]; - if ( res["ok"].trueValue() && (res["fsyncFiles"].numberInt() > 0 || - res.hasElement("waited") || - res["syncMillis"].numberInt() >= 0 ) ) - continue; - ok = false; - err << _conns[i]->toString() << ": " << res << " " << errors[i]; +void SyncClusterConnection::_checkLast() { + _lastErrors.clear(); + vector<string> errors; + + for (size_t i = 0; i < _conns.size(); i++) { + BSONObj res; + string err; + try { + if (!_conns[i]->runCommand("admin", BSON("getlasterror" << 1 << "fsync" << 1), res)) + err = "cmd failed: "; + } catch (std::exception& e) { + err += e.what(); + } catch (...) { + err += "unknown failure"; } - - if ( ok ) - return; - throw UserException( 8001 , (string)"SyncClusterConnection write op failed: " + err.str() ); + _lastErrors.push_back(res.getOwned()); + errors.push_back(err); } - BSONObj SyncClusterConnection::getLastErrorDetailed(bool fsync, bool j, int w, int wtimeout) { - return getLastErrorDetailed("admin", fsync, j, w, wtimeout); - } + verify(_lastErrors.size() == errors.size() && _lastErrors.size() == _conns.size()); - BSONObj SyncClusterConnection::getLastErrorDetailed(const std::string& db, - bool fsync, - bool j, - int w, - int wtimeout) { - if ( _lastErrors.size() ) - return _lastErrors[0]; - return DBClientBase::getLastErrorDetailed(db,fsync,j,w,wtimeout); - } + stringstream err; + bool ok = true; - void SyncClusterConnection::_connect( const std::string& host ) { - log() << "SyncClusterConnection connecting to [" << host << "]" << endl; - DBClientConnection * c = new DBClientConnection( true ); - c->setRequestMetadataWriter(getRequestMetadataWriter()); - c->setReplyMetadataReader(getReplyMetadataReader()); - c->setSoTimeout( _socketTimeout ); - string errmsg; - if ( ! c->connect( HostAndPort(host), errmsg ) ) - log() << "SyncClusterConnection connect fail to: " << host << " errmsg: " << errmsg << endl; - _connAddresses.push_back( host ); - _conns.push_back( c ); + for (size_t i = 0; i < _conns.size(); i++) { + BSONObj res = _lastErrors[i]; + if (res["ok"].trueValue() && + (res["fsyncFiles"].numberInt() > 0 || res.hasElement("waited") || + res["syncMillis"].numberInt() >= 0)) + continue; + ok = false; + err << _conns[i]->toString() << ": " << res << " " << errors[i]; } - bool SyncClusterConnection::callRead( Message& toSend , Message& response ) { - // TODO: need to save state of which one to go back to somehow... - return _conns[0]->callRead( toSend , response ); - } - - bool SyncClusterConnection::runCommand(const std::string& dbname, - const BSONObj& cmd, - BSONObj& info, - int options) { - - std::string ns = dbname + ".$cmd"; - BSONObj interposedCmd = cmd; + if (ok) + return; + throw UserException(8001, (string) "SyncClusterConnection write op failed: " + err.str()); +} - if (getRequestMetadataWriter()) { - // We have a metadata writer. We need to upconvert the metadata, write to it, - // Then downconvert it again. This unfortunate, but this code is going to be - // removed anyway as part of CSRS. +BSONObj SyncClusterConnection::getLastErrorDetailed(bool fsync, bool j, int w, int wtimeout) { + return getLastErrorDetailed("admin", fsync, j, w, wtimeout); +} - BSONObj upconvertedCommand; - BSONObj upconvertedMetadata; +BSONObj SyncClusterConnection::getLastErrorDetailed( + const std::string& db, bool fsync, bool j, int w, int wtimeout) { + if (_lastErrors.size()) + return _lastErrors[0]; + return DBClientBase::getLastErrorDetailed(db, fsync, j, w, wtimeout); +} - std::tie(upconvertedCommand, upconvertedMetadata) = uassertStatusOK( - rpc::upconvertRequestMetadata(cmd, options) - ); +void SyncClusterConnection::_connect(const std::string& host) { + log() << "SyncClusterConnection connecting to [" << host << "]" << endl; + DBClientConnection* c = new DBClientConnection(true); + c->setRequestMetadataWriter(getRequestMetadataWriter()); + c->setReplyMetadataReader(getReplyMetadataReader()); + c->setSoTimeout(_socketTimeout); + string errmsg; + if (!c->connect(HostAndPort(host), errmsg)) + log() << "SyncClusterConnection connect fail to: " << host << " errmsg: " << errmsg << endl; + _connAddresses.push_back(host); + _conns.push_back(c); +} - BSONObjBuilder metadataBob; - metadataBob.appendElements(upconvertedMetadata); +bool SyncClusterConnection::callRead(Message& toSend, Message& response) { + // TODO: need to save state of which one to go back to somehow... + return _conns[0]->callRead(toSend, response); +} - uassertStatusOK(getRequestMetadataWriter()(&metadataBob)); +bool SyncClusterConnection::runCommand(const std::string& dbname, + const BSONObj& cmd, + BSONObj& info, + int options) { + std::string ns = dbname + ".$cmd"; + BSONObj interposedCmd = cmd; - std::tie(interposedCmd, options) = uassertStatusOK( - rpc::downconvertRequestMetadata(std::move(upconvertedCommand), metadataBob.done()) - ); - } - - BSONObj legacyResult = findOne(ns, Query(interposedCmd), 0, options); + if (getRequestMetadataWriter()) { + // We have a metadata writer. We need to upconvert the metadata, write to it, + // Then downconvert it again. This unfortunate, but this code is going to be + // removed anyway as part of CSRS. + BSONObj upconvertedCommand; BSONObj upconvertedMetadata; - BSONObj upconvertedReply; - std::tie(upconvertedReply, upconvertedMetadata) = uassertStatusOK( - rpc::upconvertReplyMetadata(legacyResult) - ); + std::tie(upconvertedCommand, upconvertedMetadata) = + uassertStatusOK(rpc::upconvertRequestMetadata(cmd, options)); - if (getReplyMetadataReader()) { - // TODO: what does getServerAddress() actually mean here as this connection - // represents a connection to 1 or 3 config servers... - uassertStatusOK(getReplyMetadataReader()(upconvertedReply, getServerAddress())); - } + BSONObjBuilder metadataBob; + metadataBob.appendElements(upconvertedMetadata); - info = upconvertedReply; + uassertStatusOK(getRequestMetadataWriter()(&metadataBob)); - return isOk(info); + std::tie(interposedCmd, options) = uassertStatusOK( + rpc::downconvertRequestMetadata(std::move(upconvertedCommand), metadataBob.done())); } - BSONObj SyncClusterConnection::findOne(const string &ns, const Query& query, const BSONObj *fieldsToReturn, int queryOptions) { - - if ( ns.find( ".$cmd" ) != string::npos ) { - string cmdName = query.obj.firstElementFieldName(); - - int lockType = _lockType( cmdName ); - - if ( lockType > 0 ) { // write $cmd - string errmsg; - if ( ! prepare( errmsg ) ) - throw UserException( PrepareConfigsFailedCode , (string)"SyncClusterConnection::findOne prepare failed: " + errmsg ); - - vector<BSONObj> all; - for ( size_t i=0; i<_conns.size(); i++ ) { - all.push_back( _conns[i]->findOne( ns , query , 0 , queryOptions ).getOwned() ); - } - - _checkLast(); - - for ( size_t i=0; i<all.size(); i++ ) { - BSONObj temp = all[i]; - if ( isOk( temp ) ) - continue; - stringstream ss; - ss << "write $cmd failed on a node: " << temp.jsonString(); - ss << " " << _conns[i]->toString(); - ss << " ns: " << ns; - ss << " cmd: " << query.toString(); - throw UserException( 13105 , ss.str() ); - } - - return all[0]; - } - } + BSONObj legacyResult = findOne(ns, Query(interposedCmd), 0, options); - return DBClientBase::findOne( ns , query , fieldsToReturn , queryOptions ); - } + BSONObj upconvertedMetadata; + BSONObj upconvertedReply; - void SyncClusterConnection::_auth(const BSONObj& params) { - // A SCC is authenticated if any connection has been authenticated - // Credentials are stored in the auto-reconnect connections. + std::tie(upconvertedReply, upconvertedMetadata) = + uassertStatusOK(rpc::upconvertReplyMetadata(legacyResult)); - bool authedOnce = false; - vector<string> errors; + if (getReplyMetadataReader()) { + // TODO: what does getServerAddress() actually mean here as this connection + // represents a connection to 1 or 3 config servers... + uassertStatusOK(getReplyMetadataReader()(upconvertedReply, getServerAddress())); + } - for ( vector<DBClientConnection*>::iterator it = _conns.begin(); it < _conns.end(); ++it ) { + info = upconvertedReply; - massert( 15848, "sync cluster of sync clusters?", - (*it)->type() != ConnectionString::SYNC ); + return isOk(info); +} - // Authenticate or collect the error message - string lastErrmsg; - bool authed; - try { - // Auth errors can manifest either as exceptions or as false results - // TODO: Make this better - (*it)->auth(params); - authed = true; +BSONObj SyncClusterConnection::findOne(const string& ns, + const Query& query, + const BSONObj* fieldsToReturn, + int queryOptions) { + if (ns.find(".$cmd") != string::npos) { + string cmdName = query.obj.firstElementFieldName(); + + int lockType = _lockType(cmdName); + + if (lockType > 0) { // write $cmd + string errmsg; + if (!prepare(errmsg)) + throw UserException(PrepareConfigsFailedCode, + (string) "SyncClusterConnection::findOne prepare failed: " + + errmsg); + + vector<BSONObj> all; + for (size_t i = 0; i < _conns.size(); i++) { + all.push_back(_conns[i]->findOne(ns, query, 0, queryOptions).getOwned()); } - catch ( const DBException& e ) { - // auth will be retried on reconnect - lastErrmsg = e.what(); - authed = false; - } - - if ( ! authed ) { - // Since we're using auto-reconnect connections, we're sure the auth info has been - // stored if needed for later + _checkLast(); - lastErrmsg = str::stream() << "auth error on " << (*it)->getServerAddress() - << causedBy( lastErrmsg ); - - LOG(1) << lastErrmsg << endl; - errors.push_back( lastErrmsg ); + for (size_t i = 0; i < all.size(); i++) { + BSONObj temp = all[i]; + if (isOk(temp)) + continue; + stringstream ss; + ss << "write $cmd failed on a node: " << temp.jsonString(); + ss << " " << _conns[i]->toString(); + ss << " ns: " << ns; + ss << " cmd: " << query.toString(); + throw UserException(13105, ss.str()); } - authedOnce = authedOnce || authed; + return all[0]; } + } - if( authedOnce ) return; + return DBClientBase::findOne(ns, query, fieldsToReturn, queryOptions); +} - // Assemble the error message - str::stream errStream; - for( vector<string>::iterator it = errors.begin(); it != errors.end(); ++it ){ - if( it != errors.begin() ) errStream << " ::and:: "; - errStream << *it; +void SyncClusterConnection::_auth(const BSONObj& params) { + // A SCC is authenticated if any connection has been authenticated + // Credentials are stored in the auto-reconnect connections. + + bool authedOnce = false; + vector<string> errors; + + for (vector<DBClientConnection*>::iterator it = _conns.begin(); it < _conns.end(); ++it) { + massert(15848, "sync cluster of sync clusters?", (*it)->type() != ConnectionString::SYNC); + + // Authenticate or collect the error message + string lastErrmsg; + bool authed; + try { + // Auth errors can manifest either as exceptions or as false results + // TODO: Make this better + (*it)->auth(params); + authed = true; + } catch (const DBException& e) { + // auth will be retried on reconnect + lastErrmsg = e.what(); + authed = false; } - uasserted(ErrorCodes::AuthenticationFailed, errStream); - } + if (!authed) { + // Since we're using auto-reconnect connections, we're sure the auth info has been + // stored if needed for later - // TODO: logout is required for use of this class outside of a cluster environment + lastErrmsg = str::stream() << "auth error on " << (*it)->getServerAddress() + << causedBy(lastErrmsg); - unique_ptr<DBClientCursor> SyncClusterConnection::query(const string &ns, Query query, int nToReturn, int nToSkip, - const BSONObj *fieldsToReturn, int queryOptions, int batchSize ) { - _lastErrors.clear(); - if ( ns.find( ".$cmd" ) != string::npos ) { - string cmdName = query.obj.firstElementFieldName(); - int lockType = _lockType( cmdName ); - uassert( 13054 , (string)"write $cmd not supported in SyncClusterConnection::query for:" + cmdName , lockType <= 0 ); + LOG(1) << lastErrmsg << endl; + errors.push_back(lastErrmsg); } - return _queryOnActive( ns , query , nToReturn , nToSkip , fieldsToReturn , queryOptions , batchSize ); + authedOnce = authedOnce || authed; } - bool SyncClusterConnection::_commandOnActive(const string &dbname, const BSONObj& cmd, BSONObj &info, int options ) { - unique_ptr<DBClientCursor> cursor = _queryOnActive(dbname + ".$cmd", cmd, 1, 0, 0, options, 0); - if ( cursor->more() ) - info = cursor->next().copy(); - else - info = BSONObj(); - return isOk( info ); - } + if (authedOnce) + return; - void SyncClusterConnection::attachQueryHandler( QueryHandler* handler ) { - _customQueryHandler.reset( handler ); + // Assemble the error message + str::stream errStream; + for (vector<string>::iterator it = errors.begin(); it != errors.end(); ++it) { + if (it != errors.begin()) + errStream << " ::and:: "; + errStream << *it; } - unique_ptr<DBClientCursor> SyncClusterConnection::_queryOnActive(const string &ns, Query query, int nToReturn, int nToSkip, - const BSONObj *fieldsToReturn, int queryOptions, int batchSize ) { - - if ( _customQueryHandler && _customQueryHandler->canHandleQuery( ns, query ) ) { - - LOG( 2 ) << "custom query handler used for query on " << ns << ": " - << query.toString() << endl; - - return _customQueryHandler->handleQuery( _connAddresses, - ns, - query, - nToReturn, - nToSkip, - fieldsToReturn, - queryOptions, - batchSize ); - } + uasserted(ErrorCodes::AuthenticationFailed, errStream); +} - for ( size_t i=0; i<_conns.size(); i++ ) { - try { - unique_ptr<DBClientCursor> cursor = - _conns[i]->query( ns , query , nToReturn , nToSkip , fieldsToReturn , queryOptions , batchSize ); - if ( cursor.get() ) - return cursor; +// TODO: logout is required for use of this class outside of a cluster environment + +unique_ptr<DBClientCursor> SyncClusterConnection::query(const string& ns, + Query query, + int nToReturn, + int nToSkip, + const BSONObj* fieldsToReturn, + int queryOptions, + int batchSize) { + _lastErrors.clear(); + if (ns.find(".$cmd") != string::npos) { + string cmdName = query.obj.firstElementFieldName(); + int lockType = _lockType(cmdName); + uassert(13054, + (string) "write $cmd not supported in SyncClusterConnection::query for:" + cmdName, + lockType <= 0); + } + + return _queryOnActive(ns, query, nToReturn, nToSkip, fieldsToReturn, queryOptions, batchSize); +} - log() << "query on " << ns << ": " << query.toString() << " failed to: " - << _conns[i]->toString() << " no data" << endl; - } - catch ( std::exception& e ) { +bool SyncClusterConnection::_commandOnActive(const string& dbname, + const BSONObj& cmd, + BSONObj& info, + int options) { + unique_ptr<DBClientCursor> cursor = _queryOnActive(dbname + ".$cmd", cmd, 1, 0, 0, options, 0); + if (cursor->more()) + info = cursor->next().copy(); + else + info = BSONObj(); + return isOk(info); +} - log() << "query on " << ns << ": " << query.toString() << " failed to: " - << _conns[i]->toString() << " exception: " << e.what() << endl; - } - catch ( ... ) { +void SyncClusterConnection::attachQueryHandler(QueryHandler* handler) { + _customQueryHandler.reset(handler); +} - log() << "query on " << ns << ": " << query.toString() << " failed to: " - << _conns[i]->toString() << " exception" << endl; - } +unique_ptr<DBClientCursor> SyncClusterConnection::_queryOnActive(const string& ns, + Query query, + int nToReturn, + int nToSkip, + const BSONObj* fieldsToReturn, + int queryOptions, + int batchSize) { + if (_customQueryHandler && _customQueryHandler->canHandleQuery(ns, query)) { + LOG(2) << "custom query handler used for query on " << ns << ": " << query.toString() + << endl; + + return _customQueryHandler->handleQuery( + _connAddresses, ns, query, nToReturn, nToSkip, fieldsToReturn, queryOptions, batchSize); + } + + for (size_t i = 0; i < _conns.size(); i++) { + try { + unique_ptr<DBClientCursor> cursor = _conns[i]->query( + ns, query, nToReturn, nToSkip, fieldsToReturn, queryOptions, batchSize); + if (cursor.get()) + return cursor; + + log() << "query on " << ns << ": " << query.toString() + << " failed to: " << _conns[i]->toString() << " no data" << endl; + } catch (std::exception& e) { + log() << "query on " << ns << ": " << query.toString() + << " failed to: " << _conns[i]->toString() << " exception: " << e.what() << endl; + } catch (...) { + log() << "query on " << ns << ": " << query.toString() + << " failed to: " << _conns[i]->toString() << " exception" << endl; } - throw UserException( 8002 , str::stream() << "all servers down/unreachable when querying: " << _address ); } + throw UserException( + 8002, str::stream() << "all servers down/unreachable when querying: " << _address); +} - unique_ptr<DBClientCursor> SyncClusterConnection::getMore( const string &ns, long long cursorId, int nToReturn, int options ) { - uassert( 10022 , "SyncClusterConnection::getMore not supported yet" , 0); - unique_ptr<DBClientCursor> c; - return c; - } +unique_ptr<DBClientCursor> SyncClusterConnection::getMore(const string& ns, + long long cursorId, + int nToReturn, + int options) { + uassert(10022, "SyncClusterConnection::getMore not supported yet", 0); + unique_ptr<DBClientCursor> c; + return c; +} - void SyncClusterConnection::insert( const string &ns, BSONObj obj , int flags) { +void SyncClusterConnection::insert(const string& ns, BSONObj obj, int flags) { + uassert(13119, + str::stream() << "SyncClusterConnection::insert obj has to have an _id: " << obj, + nsToCollectionSubstring(ns) == "system.indexes" || obj["_id"].type()); - uassert(13119, - str::stream() << "SyncClusterConnection::insert obj has to have an _id: " << obj, - nsToCollectionSubstring(ns) == "system.indexes" || obj["_id"].type()); + string errmsg; + if (!prepare(errmsg)) + throw UserException(8003, + (string) "SyncClusterConnection::insert prepare failed: " + errmsg); - string errmsg; - if ( ! prepare( errmsg ) ) - throw UserException( 8003 , (string)"SyncClusterConnection::insert prepare failed: " + errmsg ); + for (size_t i = 0; i < _conns.size(); i++) { + _conns[i]->insert(ns, obj, flags); + } - for ( size_t i=0; i<_conns.size(); i++ ) { - _conns[i]->insert( ns , obj , flags); - } + _checkLast(); +} - _checkLast(); +void SyncClusterConnection::insert(const string& ns, const vector<BSONObj>& v, int flags) { + if (v.size() == 1) { + insert(ns, v[0], flags); + return; } - void SyncClusterConnection::insert( const string &ns, const vector< BSONObj >& v , int flags) { - if (v.size() == 1){ - insert(ns, v[0], flags); - return; - } - - for (vector<BSONObj>::const_iterator it = v.begin(); it != v.end(); ++it ) { - BSONObj obj = *it; - if ( obj["_id"].type() == EOO ) { - string assertMsg = "SyncClusterConnection::insert (batched) obj misses an _id: "; - uasserted( 16743, assertMsg + obj.jsonString() ); - } + for (vector<BSONObj>::const_iterator it = v.begin(); it != v.end(); ++it) { + BSONObj obj = *it; + if (obj["_id"].type() == EOO) { + string assertMsg = "SyncClusterConnection::insert (batched) obj misses an _id: "; + uasserted(16743, assertMsg + obj.jsonString()); } + } - // fsync all connections before starting the batch. - string errmsg; - if ( ! prepare( errmsg ) ) { - string assertMsg = "SyncClusterConnection::insert (batched) prepare failed: "; - throw UserException( 16744, assertMsg + errmsg ); - } + // fsync all connections before starting the batch. + string errmsg; + if (!prepare(errmsg)) { + string assertMsg = "SyncClusterConnection::insert (batched) prepare failed: "; + throw UserException(16744, assertMsg + errmsg); + } - // We still want one getlasterror per document, even if they're batched. - for ( size_t i=0; i<_conns.size(); i++ ) { - for ( vector<BSONObj>::const_iterator it = v.begin(); it != v.end(); ++it ) { - _conns[i]->insert( ns, *it, flags ); - _conns[i]->getLastErrorDetailed(); - } + // We still want one getlasterror per document, even if they're batched. + for (size_t i = 0; i < _conns.size(); i++) { + for (vector<BSONObj>::const_iterator it = v.begin(); it != v.end(); ++it) { + _conns[i]->insert(ns, *it, flags); + _conns[i]->getLastErrorDetailed(); } - - // We issue a final getlasterror, but this time with an fsync. - _checkLast(); } - void SyncClusterConnection::remove( const string &ns , Query query, int flags ) { - string errmsg; - if ( ! prepare( errmsg ) ) - throw UserException( 8020 , (string)"SyncClusterConnection::remove prepare failed: " + errmsg ); + // We issue a final getlasterror, but this time with an fsync. + _checkLast(); +} - for ( size_t i=0; i<_conns.size(); i++ ) { - _conns[i]->remove( ns , query , flags ); - } +void SyncClusterConnection::remove(const string& ns, Query query, int flags) { + string errmsg; + if (!prepare(errmsg)) + throw UserException(8020, + (string) "SyncClusterConnection::remove prepare failed: " + errmsg); - _checkLast(); + for (size_t i = 0; i < _conns.size(); i++) { + _conns[i]->remove(ns, query, flags); } - void SyncClusterConnection::update(const string &ns, Query query, BSONObj obj, int flags) { - if ( flags & UpdateOption_Upsert ) { - uassert(13120, - "SyncClusterConnection::update upsert query needs _id", - query.obj["_id"].type()); - } + _checkLast(); +} - string errmsg; - if (!prepare(errmsg)) { - throw UserException(8005, - str::stream() << "SyncClusterConnection::update prepare failed: " - << errmsg); - } +void SyncClusterConnection::update(const string& ns, Query query, BSONObj obj, int flags) { + if (flags & UpdateOption_Upsert) { + uassert( + 13120, "SyncClusterConnection::update upsert query needs _id", query.obj["_id"].type()); + } - for (size_t i = 0; i < _conns.size(); i++) { - _conns[i]->update(ns, query, obj, flags); - } + string errmsg; + if (!prepare(errmsg)) { + throw UserException( + 8005, str::stream() << "SyncClusterConnection::update prepare failed: " << errmsg); + } - _checkLast(); - invariant(_lastErrors.size() > 1); + for (size_t i = 0; i < _conns.size(); i++) { + _conns[i]->update(ns, query, obj, flags); + } - const int a = _lastErrors[0]["n"].numberInt(); + _checkLast(); + invariant(_lastErrors.size() > 1); - for (unsigned i = 1; i < _lastErrors.size(); i++) { - int b = _lastErrors[i]["n"].numberInt(); + const int a = _lastErrors[0]["n"].numberInt(); - if (a == b) - continue; + for (unsigned i = 1; i < _lastErrors.size(); i++) { + int b = _lastErrors[i]["n"].numberInt(); - throw UpdateNotTheSame(8017, - str::stream() << "update not consistent " - << " ns: " << ns - << " query: " << query.toString() - << " update: " << obj - << " gle1: " << _lastErrors[0] - << " gle2: " << _lastErrors[i], - _connAddresses, - _lastErrors); - } + if (a == b) + continue; + + throw UpdateNotTheSame(8017, + str::stream() << "update not consistent " + << " ns: " << ns << " query: " << query.toString() + << " update: " << obj << " gle1: " << _lastErrors[0] + << " gle2: " << _lastErrors[i], + _connAddresses, + _lastErrors); } +} - string SyncClusterConnection::_toString() const { - stringstream ss; - ss << "SyncClusterConnection "; - ss << " ["; - for ( size_t i = 0; i < _conns.size(); i++ ) { - if ( i != 0 ) ss << ","; - if ( _conns[i] ) { - ss << _conns[i]->toString(); - } - else { - ss << "(no conn)"; - } +string SyncClusterConnection::_toString() const { + stringstream ss; + ss << "SyncClusterConnection "; + ss << " ["; + for (size_t i = 0; i < _conns.size(); i++) { + if (i != 0) + ss << ","; + if (_conns[i]) { + ss << _conns[i]->toString(); + } else { + ss << "(no conn)"; } - ss << "]"; - return ss.str(); } + ss << "]"; + return ss.str(); +} - bool SyncClusterConnection::call( Message &toSend, Message &response, bool assertOk , string * actualServer ) { - uassert( 8006 , "SyncClusterConnection::call can only be used directly for dbQuery" , - toSend.operation() == dbQuery ); - - DbMessage d( toSend ); - uassert( 8007 , "SyncClusterConnection::call can't handle $cmd" , strstr( d.getns(), "$cmd" ) == 0 ); - - for ( size_t i=0; i<_conns.size(); i++ ) { - try { - bool ok = _conns[i]->call( toSend , response , assertOk ); - if ( ok ) { - if ( actualServer ) - *actualServer = _connAddresses[i]; - return ok; - } - log() << "call failed to: " << _conns[i]->toString() << " no data" << endl; - } - catch ( ... ) { - log() << "call failed to: " << _conns[i]->toString() << " exception" << endl; +bool SyncClusterConnection::call(Message& toSend, + Message& response, + bool assertOk, + string* actualServer) { + uassert(8006, + "SyncClusterConnection::call can only be used directly for dbQuery", + toSend.operation() == dbQuery); + + DbMessage d(toSend); + uassert(8007, "SyncClusterConnection::call can't handle $cmd", strstr(d.getns(), "$cmd") == 0); + + for (size_t i = 0; i < _conns.size(); i++) { + try { + bool ok = _conns[i]->call(toSend, response, assertOk); + if (ok) { + if (actualServer) + *actualServer = _connAddresses[i]; + return ok; } + log() << "call failed to: " << _conns[i]->toString() << " no data" << endl; + } catch (...) { + log() << "call failed to: " << _conns[i]->toString() << " exception" << endl; } - throw UserException( 8008 , str::stream() << "all servers down/unreachable: " << _address ); } + throw UserException(8008, str::stream() << "all servers down/unreachable: " << _address); +} - void SyncClusterConnection::say( Message &toSend, bool isRetry , string * actualServer ) { - string errmsg; - if ( ! prepare( errmsg ) ) - throw UserException( 13397 , (string)"SyncClusterConnection::say prepare failed: " + errmsg ); - - for ( size_t i=0; i<_conns.size(); i++ ) { - _conns[i]->say( toSend ); - } - - // TODO: should we set actualServer?? +void SyncClusterConnection::say(Message& toSend, bool isRetry, string* actualServer) { + string errmsg; + if (!prepare(errmsg)) + throw UserException(13397, (string) "SyncClusterConnection::say prepare failed: " + errmsg); - _checkLast(); + for (size_t i = 0; i < _conns.size(); i++) { + _conns[i]->say(toSend); } - void SyncClusterConnection::sayPiggyBack( Message &toSend ) { - verify(0); - } + // TODO: should we set actualServer?? - int SyncClusterConnection::_lockType( const string& name ) { - { - stdx::lock_guard<stdx::mutex> lk(_mutex); - map<string,int>::iterator i = _lockTypes.find( name ); - if ( i != _lockTypes.end() ) - return i->second; - } - - BSONObj info; - uassert( 13053 , str::stream() << "help failed: " << info , _commandOnActive( "admin" , BSON( name << "1" << "help" << 1 ) , info ) ); + _checkLast(); +} - int lockType = info["lockType"].numberInt(); +void SyncClusterConnection::sayPiggyBack(Message& toSend) { + verify(0); +} +int SyncClusterConnection::_lockType(const string& name) { + { stdx::lock_guard<stdx::mutex> lk(_mutex); - _lockTypes[name] = lockType; - return lockType; + map<string, int>::iterator i = _lockTypes.find(name); + if (i != _lockTypes.end()) + return i->second; } - void SyncClusterConnection::killCursor( long long cursorID ) { - // should never need to do this - verify(0); - } + BSONObj info; + uassert(13053, + str::stream() << "help failed: " << info, + _commandOnActive("admin", + BSON(name << "1" + << "help" << 1), + info)); - // A SCC should be reused only if all the existing connections haven't been broken in the - // background. - // Note: an SCC may have missing connections if a config server is temporarily offline, - // but reading from the others is still allowed. - bool SyncClusterConnection::isStillConnected() { - for ( size_t i = 0; i < _conns.size(); i++ ) { - if ( _conns[i] && !_conns[i]->isStillConnected() ) return false; + int lockType = info["lockType"].numberInt(); - } - return true; - } + stdx::lock_guard<stdx::mutex> lk(_mutex); + _lockTypes[name] = lockType; + return lockType; +} - void SyncClusterConnection::setAllSoTimeouts( double socketTimeout ){ - _socketTimeout = socketTimeout; - for ( size_t i=0; i<_conns.size(); i++ ) +void SyncClusterConnection::killCursor(long long cursorID) { + // should never need to do this + verify(0); +} - if( _conns[i] ) _conns[i]->setSoTimeout( socketTimeout ); +// A SCC should be reused only if all the existing connections haven't been broken in the +// background. +// Note: an SCC may have missing connections if a config server is temporarily offline, +// but reading from the others is still allowed. +bool SyncClusterConnection::isStillConnected() { + for (size_t i = 0; i < _conns.size(); i++) { + if (_conns[i] && !_conns[i]->isStillConnected()) + return false; } + return true; +} - void SyncClusterConnection::setRequestMetadataWriter(rpc::RequestMetadataWriter writer) { - // Set the hooks in both our sub-connections and in ourselves. - for (size_t i = 0; i < _conns.size(); ++i) { - if (_conns[i]) { - _conns[i]->setRequestMetadataWriter(writer); - } +void SyncClusterConnection::setAllSoTimeouts(double socketTimeout) { + _socketTimeout = socketTimeout; + for (size_t i = 0; i < _conns.size(); i++) + + if (_conns[i]) + _conns[i]->setSoTimeout(socketTimeout); +} + +void SyncClusterConnection::setRequestMetadataWriter(rpc::RequestMetadataWriter writer) { + // Set the hooks in both our sub-connections and in ourselves. + for (size_t i = 0; i < _conns.size(); ++i) { + if (_conns[i]) { + _conns[i]->setRequestMetadataWriter(writer); } - DBClientWithCommands::setRequestMetadataWriter(std::move(writer)); } + DBClientWithCommands::setRequestMetadataWriter(std::move(writer)); +} - void SyncClusterConnection::setReplyMetadataReader(rpc::ReplyMetadataReader reader) { - // Set the hooks in both our sub-connections and in ourselves. - for (size_t i = 0; i < _conns.size(); ++i) { - if (_conns[i]) { - _conns[i]->setReplyMetadataReader(reader); - } +void SyncClusterConnection::setReplyMetadataReader(rpc::ReplyMetadataReader reader) { + // Set the hooks in both our sub-connections and in ourselves. + for (size_t i = 0; i < _conns.size(); ++i) { + if (_conns[i]) { + _conns[i]->setReplyMetadataReader(reader); } - DBClientWithCommands::setReplyMetadataReader(std::move(reader)); } + DBClientWithCommands::setReplyMetadataReader(std::move(reader)); +} } |