summaryrefslogtreecommitdiff
path: root/src/mongo/client/syncclusterconnection.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/client/syncclusterconnection.cpp')
-rw-r--r--src/mongo/client/syncclusterconnection.cpp963
1 files changed, 483 insertions, 480 deletions
diff --git a/src/mongo/client/syncclusterconnection.cpp b/src/mongo/client/syncclusterconnection.cpp
index f70b982b291..250a8a0a365 100644
--- a/src/mongo/client/syncclusterconnection.cpp
+++ b/src/mongo/client/syncclusterconnection.cpp
@@ -44,605 +44,608 @@
namespace mongo {
- using std::unique_ptr;
- using std::endl;
- using std::list;
- using std::map;
- using std::string;
- using std::stringstream;
- using std::vector;
-
- SyncClusterConnection::SyncClusterConnection( const list<HostAndPort> & L, double socketTimeout)
- : _socketTimeout(socketTimeout) {
-
- {
- stringstream s;
- int n=0;
- for( list<HostAndPort>::const_iterator i = L.begin(); i != L.end(); i++ ) {
- if( ++n > 1 ) s << ',';
- s << i->toString();
- }
- _address = s.str();
+using std::unique_ptr;
+using std::endl;
+using std::list;
+using std::map;
+using std::string;
+using std::stringstream;
+using std::vector;
+
+SyncClusterConnection::SyncClusterConnection(const list<HostAndPort>& L, double socketTimeout)
+ : _socketTimeout(socketTimeout) {
+ {
+ stringstream s;
+ int n = 0;
+ for (list<HostAndPort>::const_iterator i = L.begin(); i != L.end(); i++) {
+ if (++n > 1)
+ s << ',';
+ s << i->toString();
}
- for( list<HostAndPort>::const_iterator i = L.begin(); i != L.end(); i++ )
- _connect( i->toString() );
+ _address = s.str();
}
+ for (list<HostAndPort>::const_iterator i = L.begin(); i != L.end(); i++)
+ _connect(i->toString());
+}
- SyncClusterConnection::SyncClusterConnection( string commaSeparated, double socketTimeout) :
- _socketTimeout( socketTimeout ) {
-
- _address = commaSeparated;
- string::size_type idx;
- while ( ( idx = commaSeparated.find( ',' ) ) != string::npos ) {
- string h = commaSeparated.substr( 0 , idx );
- commaSeparated = commaSeparated.substr( idx + 1 );
- _connect( h );
- }
- _connect( commaSeparated );
- uassert( 8004 , "SyncClusterConnection needs 3 servers" , _conns.size() == 3 );
- }
+SyncClusterConnection::SyncClusterConnection(string commaSeparated, double socketTimeout)
+ : _socketTimeout(socketTimeout) {
+ _address = commaSeparated;
+ string::size_type idx;
+ while ((idx = commaSeparated.find(',')) != string::npos) {
+ string h = commaSeparated.substr(0, idx);
+ commaSeparated = commaSeparated.substr(idx + 1);
+ _connect(h);
+ }
+ _connect(commaSeparated);
+ uassert(8004, "SyncClusterConnection needs 3 servers", _conns.size() == 3);
+}
- SyncClusterConnection::SyncClusterConnection(
- const std::string& a,
- const std::string& b,
- const std::string& c,
- double socketTimeout) : _socketTimeout( socketTimeout ) {
-
- _address = a + "," + b + "," + c;
- // connect to all even if not working
- _connect( a );
- _connect( b );
- _connect( c );
- }
+SyncClusterConnection::SyncClusterConnection(const std::string& a,
+ const std::string& b,
+ const std::string& c,
+ double socketTimeout)
+ : _socketTimeout(socketTimeout) {
+ _address = a + "," + b + "," + c;
+ // connect to all even if not working
+ _connect(a);
+ _connect(b);
+ _connect(c);
+}
- SyncClusterConnection::SyncClusterConnection( SyncClusterConnection& prev, double socketTimeout)
- : _socketTimeout(socketTimeout) {
- verify(0);
- }
+SyncClusterConnection::SyncClusterConnection(SyncClusterConnection& prev, double socketTimeout)
+ : _socketTimeout(socketTimeout) {
+ verify(0);
+}
- SyncClusterConnection::~SyncClusterConnection() {
- for ( size_t i=0; i<_conns.size(); i++ )
- delete _conns[i];
- _conns.clear();
- }
+SyncClusterConnection::~SyncClusterConnection() {
+ for (size_t i = 0; i < _conns.size(); i++)
+ delete _conns[i];
+ _conns.clear();
+}
- bool SyncClusterConnection::prepare(string& errmsg) {
- _lastErrors.clear();
+bool SyncClusterConnection::prepare(string& errmsg) {
+ _lastErrors.clear();
- bool ok = true;
- errmsg = "";
+ bool ok = true;
+ errmsg = "";
- for (size_t i = 0; i < _conns.size(); i++) {
- string singleErr;
- try {
- _conns[i]->simpleCommand("admin", NULL, "resetError");
- singleErr = _conns[i]->getLastError(true);
+ for (size_t i = 0; i < _conns.size(); i++) {
+ string singleErr;
+ try {
+ _conns[i]->simpleCommand("admin", NULL, "resetError");
+ singleErr = _conns[i]->getLastError(true);
- if (singleErr.size() == 0)
- continue;
+ if (singleErr.size() == 0)
+ continue;
- }
- catch (DBException& e) {
- singleErr = e.toString();
- }
- ok = false;
- errmsg += " " + _conns[i]->toString() + ":" + singleErr;
+ } catch (DBException& e) {
+ singleErr = e.toString();
}
-
- return ok;
+ ok = false;
+ errmsg += " " + _conns[i]->toString() + ":" + singleErr;
}
- void SyncClusterConnection::_checkLast() {
- _lastErrors.clear();
- vector<string> errors;
-
- for ( size_t i=0; i<_conns.size(); i++ ) {
- BSONObj res;
- string err;
- try {
- if ( ! _conns[i]->runCommand( "admin" , BSON( "getlasterror" << 1 << "fsync" << 1 ) , res ) )
- err = "cmd failed: ";
- }
- catch ( std::exception& e ) {
- err += e.what();
- }
- catch ( ... ) {
- err += "unknown failure";
- }
- _lastErrors.push_back( res.getOwned() );
- errors.push_back( err );
- }
-
- verify( _lastErrors.size() == errors.size() && _lastErrors.size() == _conns.size() );
-
- stringstream err;
- bool ok = true;
+ return ok;
+}
- for ( size_t i = 0; i<_conns.size(); i++ ) {
- BSONObj res = _lastErrors[i];
- if ( res["ok"].trueValue() && (res["fsyncFiles"].numberInt() > 0 ||
- res.hasElement("waited") ||
- res["syncMillis"].numberInt() >= 0 ) )
- continue;
- ok = false;
- err << _conns[i]->toString() << ": " << res << " " << errors[i];
+void SyncClusterConnection::_checkLast() {
+ _lastErrors.clear();
+ vector<string> errors;
+
+ for (size_t i = 0; i < _conns.size(); i++) {
+ BSONObj res;
+ string err;
+ try {
+ if (!_conns[i]->runCommand("admin", BSON("getlasterror" << 1 << "fsync" << 1), res))
+ err = "cmd failed: ";
+ } catch (std::exception& e) {
+ err += e.what();
+ } catch (...) {
+ err += "unknown failure";
}
-
- if ( ok )
- return;
- throw UserException( 8001 , (string)"SyncClusterConnection write op failed: " + err.str() );
+ _lastErrors.push_back(res.getOwned());
+ errors.push_back(err);
}
- BSONObj SyncClusterConnection::getLastErrorDetailed(bool fsync, bool j, int w, int wtimeout) {
- return getLastErrorDetailed("admin", fsync, j, w, wtimeout);
- }
+ verify(_lastErrors.size() == errors.size() && _lastErrors.size() == _conns.size());
- BSONObj SyncClusterConnection::getLastErrorDetailed(const std::string& db,
- bool fsync,
- bool j,
- int w,
- int wtimeout) {
- if ( _lastErrors.size() )
- return _lastErrors[0];
- return DBClientBase::getLastErrorDetailed(db,fsync,j,w,wtimeout);
- }
+ stringstream err;
+ bool ok = true;
- void SyncClusterConnection::_connect( const std::string& host ) {
- log() << "SyncClusterConnection connecting to [" << host << "]" << endl;
- DBClientConnection * c = new DBClientConnection( true );
- c->setRequestMetadataWriter(getRequestMetadataWriter());
- c->setReplyMetadataReader(getReplyMetadataReader());
- c->setSoTimeout( _socketTimeout );
- string errmsg;
- if ( ! c->connect( HostAndPort(host), errmsg ) )
- log() << "SyncClusterConnection connect fail to: " << host << " errmsg: " << errmsg << endl;
- _connAddresses.push_back( host );
- _conns.push_back( c );
+ for (size_t i = 0; i < _conns.size(); i++) {
+ BSONObj res = _lastErrors[i];
+ if (res["ok"].trueValue() &&
+ (res["fsyncFiles"].numberInt() > 0 || res.hasElement("waited") ||
+ res["syncMillis"].numberInt() >= 0))
+ continue;
+ ok = false;
+ err << _conns[i]->toString() << ": " << res << " " << errors[i];
}
- bool SyncClusterConnection::callRead( Message& toSend , Message& response ) {
- // TODO: need to save state of which one to go back to somehow...
- return _conns[0]->callRead( toSend , response );
- }
-
- bool SyncClusterConnection::runCommand(const std::string& dbname,
- const BSONObj& cmd,
- BSONObj& info,
- int options) {
-
- std::string ns = dbname + ".$cmd";
- BSONObj interposedCmd = cmd;
+ if (ok)
+ return;
+ throw UserException(8001, (string) "SyncClusterConnection write op failed: " + err.str());
+}
- if (getRequestMetadataWriter()) {
- // We have a metadata writer. We need to upconvert the metadata, write to it,
- // Then downconvert it again. This unfortunate, but this code is going to be
- // removed anyway as part of CSRS.
+BSONObj SyncClusterConnection::getLastErrorDetailed(bool fsync, bool j, int w, int wtimeout) {
+ return getLastErrorDetailed("admin", fsync, j, w, wtimeout);
+}
- BSONObj upconvertedCommand;
- BSONObj upconvertedMetadata;
+BSONObj SyncClusterConnection::getLastErrorDetailed(
+ const std::string& db, bool fsync, bool j, int w, int wtimeout) {
+ if (_lastErrors.size())
+ return _lastErrors[0];
+ return DBClientBase::getLastErrorDetailed(db, fsync, j, w, wtimeout);
+}
- std::tie(upconvertedCommand, upconvertedMetadata) = uassertStatusOK(
- rpc::upconvertRequestMetadata(cmd, options)
- );
+void SyncClusterConnection::_connect(const std::string& host) {
+ log() << "SyncClusterConnection connecting to [" << host << "]" << endl;
+ DBClientConnection* c = new DBClientConnection(true);
+ c->setRequestMetadataWriter(getRequestMetadataWriter());
+ c->setReplyMetadataReader(getReplyMetadataReader());
+ c->setSoTimeout(_socketTimeout);
+ string errmsg;
+ if (!c->connect(HostAndPort(host), errmsg))
+ log() << "SyncClusterConnection connect fail to: " << host << " errmsg: " << errmsg << endl;
+ _connAddresses.push_back(host);
+ _conns.push_back(c);
+}
- BSONObjBuilder metadataBob;
- metadataBob.appendElements(upconvertedMetadata);
+bool SyncClusterConnection::callRead(Message& toSend, Message& response) {
+ // TODO: need to save state of which one to go back to somehow...
+ return _conns[0]->callRead(toSend, response);
+}
- uassertStatusOK(getRequestMetadataWriter()(&metadataBob));
+bool SyncClusterConnection::runCommand(const std::string& dbname,
+ const BSONObj& cmd,
+ BSONObj& info,
+ int options) {
+ std::string ns = dbname + ".$cmd";
+ BSONObj interposedCmd = cmd;
- std::tie(interposedCmd, options) = uassertStatusOK(
- rpc::downconvertRequestMetadata(std::move(upconvertedCommand), metadataBob.done())
- );
- }
-
- BSONObj legacyResult = findOne(ns, Query(interposedCmd), 0, options);
+ if (getRequestMetadataWriter()) {
+ // We have a metadata writer. We need to upconvert the metadata, write to it,
+ // Then downconvert it again. This unfortunate, but this code is going to be
+ // removed anyway as part of CSRS.
+ BSONObj upconvertedCommand;
BSONObj upconvertedMetadata;
- BSONObj upconvertedReply;
- std::tie(upconvertedReply, upconvertedMetadata) = uassertStatusOK(
- rpc::upconvertReplyMetadata(legacyResult)
- );
+ std::tie(upconvertedCommand, upconvertedMetadata) =
+ uassertStatusOK(rpc::upconvertRequestMetadata(cmd, options));
- if (getReplyMetadataReader()) {
- // TODO: what does getServerAddress() actually mean here as this connection
- // represents a connection to 1 or 3 config servers...
- uassertStatusOK(getReplyMetadataReader()(upconvertedReply, getServerAddress()));
- }
+ BSONObjBuilder metadataBob;
+ metadataBob.appendElements(upconvertedMetadata);
- info = upconvertedReply;
+ uassertStatusOK(getRequestMetadataWriter()(&metadataBob));
- return isOk(info);
+ std::tie(interposedCmd, options) = uassertStatusOK(
+ rpc::downconvertRequestMetadata(std::move(upconvertedCommand), metadataBob.done()));
}
- BSONObj SyncClusterConnection::findOne(const string &ns, const Query& query, const BSONObj *fieldsToReturn, int queryOptions) {
-
- if ( ns.find( ".$cmd" ) != string::npos ) {
- string cmdName = query.obj.firstElementFieldName();
-
- int lockType = _lockType( cmdName );
-
- if ( lockType > 0 ) { // write $cmd
- string errmsg;
- if ( ! prepare( errmsg ) )
- throw UserException( PrepareConfigsFailedCode , (string)"SyncClusterConnection::findOne prepare failed: " + errmsg );
-
- vector<BSONObj> all;
- for ( size_t i=0; i<_conns.size(); i++ ) {
- all.push_back( _conns[i]->findOne( ns , query , 0 , queryOptions ).getOwned() );
- }
-
- _checkLast();
-
- for ( size_t i=0; i<all.size(); i++ ) {
- BSONObj temp = all[i];
- if ( isOk( temp ) )
- continue;
- stringstream ss;
- ss << "write $cmd failed on a node: " << temp.jsonString();
- ss << " " << _conns[i]->toString();
- ss << " ns: " << ns;
- ss << " cmd: " << query.toString();
- throw UserException( 13105 , ss.str() );
- }
-
- return all[0];
- }
- }
+ BSONObj legacyResult = findOne(ns, Query(interposedCmd), 0, options);
- return DBClientBase::findOne( ns , query , fieldsToReturn , queryOptions );
- }
+ BSONObj upconvertedMetadata;
+ BSONObj upconvertedReply;
- void SyncClusterConnection::_auth(const BSONObj& params) {
- // A SCC is authenticated if any connection has been authenticated
- // Credentials are stored in the auto-reconnect connections.
+ std::tie(upconvertedReply, upconvertedMetadata) =
+ uassertStatusOK(rpc::upconvertReplyMetadata(legacyResult));
- bool authedOnce = false;
- vector<string> errors;
+ if (getReplyMetadataReader()) {
+ // TODO: what does getServerAddress() actually mean here as this connection
+ // represents a connection to 1 or 3 config servers...
+ uassertStatusOK(getReplyMetadataReader()(upconvertedReply, getServerAddress()));
+ }
- for ( vector<DBClientConnection*>::iterator it = _conns.begin(); it < _conns.end(); ++it ) {
+ info = upconvertedReply;
- massert( 15848, "sync cluster of sync clusters?",
- (*it)->type() != ConnectionString::SYNC );
+ return isOk(info);
+}
- // Authenticate or collect the error message
- string lastErrmsg;
- bool authed;
- try {
- // Auth errors can manifest either as exceptions or as false results
- // TODO: Make this better
- (*it)->auth(params);
- authed = true;
+BSONObj SyncClusterConnection::findOne(const string& ns,
+ const Query& query,
+ const BSONObj* fieldsToReturn,
+ int queryOptions) {
+ if (ns.find(".$cmd") != string::npos) {
+ string cmdName = query.obj.firstElementFieldName();
+
+ int lockType = _lockType(cmdName);
+
+ if (lockType > 0) { // write $cmd
+ string errmsg;
+ if (!prepare(errmsg))
+ throw UserException(PrepareConfigsFailedCode,
+ (string) "SyncClusterConnection::findOne prepare failed: " +
+ errmsg);
+
+ vector<BSONObj> all;
+ for (size_t i = 0; i < _conns.size(); i++) {
+ all.push_back(_conns[i]->findOne(ns, query, 0, queryOptions).getOwned());
}
- catch ( const DBException& e ) {
- // auth will be retried on reconnect
- lastErrmsg = e.what();
- authed = false;
- }
-
- if ( ! authed ) {
- // Since we're using auto-reconnect connections, we're sure the auth info has been
- // stored if needed for later
+ _checkLast();
- lastErrmsg = str::stream() << "auth error on " << (*it)->getServerAddress()
- << causedBy( lastErrmsg );
-
- LOG(1) << lastErrmsg << endl;
- errors.push_back( lastErrmsg );
+ for (size_t i = 0; i < all.size(); i++) {
+ BSONObj temp = all[i];
+ if (isOk(temp))
+ continue;
+ stringstream ss;
+ ss << "write $cmd failed on a node: " << temp.jsonString();
+ ss << " " << _conns[i]->toString();
+ ss << " ns: " << ns;
+ ss << " cmd: " << query.toString();
+ throw UserException(13105, ss.str());
}
- authedOnce = authedOnce || authed;
+ return all[0];
}
+ }
- if( authedOnce ) return;
+ return DBClientBase::findOne(ns, query, fieldsToReturn, queryOptions);
+}
- // Assemble the error message
- str::stream errStream;
- for( vector<string>::iterator it = errors.begin(); it != errors.end(); ++it ){
- if( it != errors.begin() ) errStream << " ::and:: ";
- errStream << *it;
+void SyncClusterConnection::_auth(const BSONObj& params) {
+ // A SCC is authenticated if any connection has been authenticated
+ // Credentials are stored in the auto-reconnect connections.
+
+ bool authedOnce = false;
+ vector<string> errors;
+
+ for (vector<DBClientConnection*>::iterator it = _conns.begin(); it < _conns.end(); ++it) {
+ massert(15848, "sync cluster of sync clusters?", (*it)->type() != ConnectionString::SYNC);
+
+ // Authenticate or collect the error message
+ string lastErrmsg;
+ bool authed;
+ try {
+ // Auth errors can manifest either as exceptions or as false results
+ // TODO: Make this better
+ (*it)->auth(params);
+ authed = true;
+ } catch (const DBException& e) {
+ // auth will be retried on reconnect
+ lastErrmsg = e.what();
+ authed = false;
}
- uasserted(ErrorCodes::AuthenticationFailed, errStream);
- }
+ if (!authed) {
+ // Since we're using auto-reconnect connections, we're sure the auth info has been
+ // stored if needed for later
- // TODO: logout is required for use of this class outside of a cluster environment
+ lastErrmsg = str::stream() << "auth error on " << (*it)->getServerAddress()
+ << causedBy(lastErrmsg);
- unique_ptr<DBClientCursor> SyncClusterConnection::query(const string &ns, Query query, int nToReturn, int nToSkip,
- const BSONObj *fieldsToReturn, int queryOptions, int batchSize ) {
- _lastErrors.clear();
- if ( ns.find( ".$cmd" ) != string::npos ) {
- string cmdName = query.obj.firstElementFieldName();
- int lockType = _lockType( cmdName );
- uassert( 13054 , (string)"write $cmd not supported in SyncClusterConnection::query for:" + cmdName , lockType <= 0 );
+ LOG(1) << lastErrmsg << endl;
+ errors.push_back(lastErrmsg);
}
- return _queryOnActive( ns , query , nToReturn , nToSkip , fieldsToReturn , queryOptions , batchSize );
+ authedOnce = authedOnce || authed;
}
- bool SyncClusterConnection::_commandOnActive(const string &dbname, const BSONObj& cmd, BSONObj &info, int options ) {
- unique_ptr<DBClientCursor> cursor = _queryOnActive(dbname + ".$cmd", cmd, 1, 0, 0, options, 0);
- if ( cursor->more() )
- info = cursor->next().copy();
- else
- info = BSONObj();
- return isOk( info );
- }
+ if (authedOnce)
+ return;
- void SyncClusterConnection::attachQueryHandler( QueryHandler* handler ) {
- _customQueryHandler.reset( handler );
+ // Assemble the error message
+ str::stream errStream;
+ for (vector<string>::iterator it = errors.begin(); it != errors.end(); ++it) {
+ if (it != errors.begin())
+ errStream << " ::and:: ";
+ errStream << *it;
}
- unique_ptr<DBClientCursor> SyncClusterConnection::_queryOnActive(const string &ns, Query query, int nToReturn, int nToSkip,
- const BSONObj *fieldsToReturn, int queryOptions, int batchSize ) {
-
- if ( _customQueryHandler && _customQueryHandler->canHandleQuery( ns, query ) ) {
-
- LOG( 2 ) << "custom query handler used for query on " << ns << ": "
- << query.toString() << endl;
-
- return _customQueryHandler->handleQuery( _connAddresses,
- ns,
- query,
- nToReturn,
- nToSkip,
- fieldsToReturn,
- queryOptions,
- batchSize );
- }
+ uasserted(ErrorCodes::AuthenticationFailed, errStream);
+}
- for ( size_t i=0; i<_conns.size(); i++ ) {
- try {
- unique_ptr<DBClientCursor> cursor =
- _conns[i]->query( ns , query , nToReturn , nToSkip , fieldsToReturn , queryOptions , batchSize );
- if ( cursor.get() )
- return cursor;
+// TODO: logout is required for use of this class outside of a cluster environment
+
+unique_ptr<DBClientCursor> SyncClusterConnection::query(const string& ns,
+ Query query,
+ int nToReturn,
+ int nToSkip,
+ const BSONObj* fieldsToReturn,
+ int queryOptions,
+ int batchSize) {
+ _lastErrors.clear();
+ if (ns.find(".$cmd") != string::npos) {
+ string cmdName = query.obj.firstElementFieldName();
+ int lockType = _lockType(cmdName);
+ uassert(13054,
+ (string) "write $cmd not supported in SyncClusterConnection::query for:" + cmdName,
+ lockType <= 0);
+ }
+
+ return _queryOnActive(ns, query, nToReturn, nToSkip, fieldsToReturn, queryOptions, batchSize);
+}
- log() << "query on " << ns << ": " << query.toString() << " failed to: "
- << _conns[i]->toString() << " no data" << endl;
- }
- catch ( std::exception& e ) {
+bool SyncClusterConnection::_commandOnActive(const string& dbname,
+ const BSONObj& cmd,
+ BSONObj& info,
+ int options) {
+ unique_ptr<DBClientCursor> cursor = _queryOnActive(dbname + ".$cmd", cmd, 1, 0, 0, options, 0);
+ if (cursor->more())
+ info = cursor->next().copy();
+ else
+ info = BSONObj();
+ return isOk(info);
+}
- log() << "query on " << ns << ": " << query.toString() << " failed to: "
- << _conns[i]->toString() << " exception: " << e.what() << endl;
- }
- catch ( ... ) {
+void SyncClusterConnection::attachQueryHandler(QueryHandler* handler) {
+ _customQueryHandler.reset(handler);
+}
- log() << "query on " << ns << ": " << query.toString() << " failed to: "
- << _conns[i]->toString() << " exception" << endl;
- }
+unique_ptr<DBClientCursor> SyncClusterConnection::_queryOnActive(const string& ns,
+ Query query,
+ int nToReturn,
+ int nToSkip,
+ const BSONObj* fieldsToReturn,
+ int queryOptions,
+ int batchSize) {
+ if (_customQueryHandler && _customQueryHandler->canHandleQuery(ns, query)) {
+ LOG(2) << "custom query handler used for query on " << ns << ": " << query.toString()
+ << endl;
+
+ return _customQueryHandler->handleQuery(
+ _connAddresses, ns, query, nToReturn, nToSkip, fieldsToReturn, queryOptions, batchSize);
+ }
+
+ for (size_t i = 0; i < _conns.size(); i++) {
+ try {
+ unique_ptr<DBClientCursor> cursor = _conns[i]->query(
+ ns, query, nToReturn, nToSkip, fieldsToReturn, queryOptions, batchSize);
+ if (cursor.get())
+ return cursor;
+
+ log() << "query on " << ns << ": " << query.toString()
+ << " failed to: " << _conns[i]->toString() << " no data" << endl;
+ } catch (std::exception& e) {
+ log() << "query on " << ns << ": " << query.toString()
+ << " failed to: " << _conns[i]->toString() << " exception: " << e.what() << endl;
+ } catch (...) {
+ log() << "query on " << ns << ": " << query.toString()
+ << " failed to: " << _conns[i]->toString() << " exception" << endl;
}
- throw UserException( 8002 , str::stream() << "all servers down/unreachable when querying: " << _address );
}
+ throw UserException(
+ 8002, str::stream() << "all servers down/unreachable when querying: " << _address);
+}
- unique_ptr<DBClientCursor> SyncClusterConnection::getMore( const string &ns, long long cursorId, int nToReturn, int options ) {
- uassert( 10022 , "SyncClusterConnection::getMore not supported yet" , 0);
- unique_ptr<DBClientCursor> c;
- return c;
- }
+unique_ptr<DBClientCursor> SyncClusterConnection::getMore(const string& ns,
+ long long cursorId,
+ int nToReturn,
+ int options) {
+ uassert(10022, "SyncClusterConnection::getMore not supported yet", 0);
+ unique_ptr<DBClientCursor> c;
+ return c;
+}
- void SyncClusterConnection::insert( const string &ns, BSONObj obj , int flags) {
+void SyncClusterConnection::insert(const string& ns, BSONObj obj, int flags) {
+ uassert(13119,
+ str::stream() << "SyncClusterConnection::insert obj has to have an _id: " << obj,
+ nsToCollectionSubstring(ns) == "system.indexes" || obj["_id"].type());
- uassert(13119,
- str::stream() << "SyncClusterConnection::insert obj has to have an _id: " << obj,
- nsToCollectionSubstring(ns) == "system.indexes" || obj["_id"].type());
+ string errmsg;
+ if (!prepare(errmsg))
+ throw UserException(8003,
+ (string) "SyncClusterConnection::insert prepare failed: " + errmsg);
- string errmsg;
- if ( ! prepare( errmsg ) )
- throw UserException( 8003 , (string)"SyncClusterConnection::insert prepare failed: " + errmsg );
+ for (size_t i = 0; i < _conns.size(); i++) {
+ _conns[i]->insert(ns, obj, flags);
+ }
- for ( size_t i=0; i<_conns.size(); i++ ) {
- _conns[i]->insert( ns , obj , flags);
- }
+ _checkLast();
+}
- _checkLast();
+void SyncClusterConnection::insert(const string& ns, const vector<BSONObj>& v, int flags) {
+ if (v.size() == 1) {
+ insert(ns, v[0], flags);
+ return;
}
- void SyncClusterConnection::insert( const string &ns, const vector< BSONObj >& v , int flags) {
- if (v.size() == 1){
- insert(ns, v[0], flags);
- return;
- }
-
- for (vector<BSONObj>::const_iterator it = v.begin(); it != v.end(); ++it ) {
- BSONObj obj = *it;
- if ( obj["_id"].type() == EOO ) {
- string assertMsg = "SyncClusterConnection::insert (batched) obj misses an _id: ";
- uasserted( 16743, assertMsg + obj.jsonString() );
- }
+ for (vector<BSONObj>::const_iterator it = v.begin(); it != v.end(); ++it) {
+ BSONObj obj = *it;
+ if (obj["_id"].type() == EOO) {
+ string assertMsg = "SyncClusterConnection::insert (batched) obj misses an _id: ";
+ uasserted(16743, assertMsg + obj.jsonString());
}
+ }
- // fsync all connections before starting the batch.
- string errmsg;
- if ( ! prepare( errmsg ) ) {
- string assertMsg = "SyncClusterConnection::insert (batched) prepare failed: ";
- throw UserException( 16744, assertMsg + errmsg );
- }
+ // fsync all connections before starting the batch.
+ string errmsg;
+ if (!prepare(errmsg)) {
+ string assertMsg = "SyncClusterConnection::insert (batched) prepare failed: ";
+ throw UserException(16744, assertMsg + errmsg);
+ }
- // We still want one getlasterror per document, even if they're batched.
- for ( size_t i=0; i<_conns.size(); i++ ) {
- for ( vector<BSONObj>::const_iterator it = v.begin(); it != v.end(); ++it ) {
- _conns[i]->insert( ns, *it, flags );
- _conns[i]->getLastErrorDetailed();
- }
+ // We still want one getlasterror per document, even if they're batched.
+ for (size_t i = 0; i < _conns.size(); i++) {
+ for (vector<BSONObj>::const_iterator it = v.begin(); it != v.end(); ++it) {
+ _conns[i]->insert(ns, *it, flags);
+ _conns[i]->getLastErrorDetailed();
}
-
- // We issue a final getlasterror, but this time with an fsync.
- _checkLast();
}
- void SyncClusterConnection::remove( const string &ns , Query query, int flags ) {
- string errmsg;
- if ( ! prepare( errmsg ) )
- throw UserException( 8020 , (string)"SyncClusterConnection::remove prepare failed: " + errmsg );
+ // We issue a final getlasterror, but this time with an fsync.
+ _checkLast();
+}
- for ( size_t i=0; i<_conns.size(); i++ ) {
- _conns[i]->remove( ns , query , flags );
- }
+void SyncClusterConnection::remove(const string& ns, Query query, int flags) {
+ string errmsg;
+ if (!prepare(errmsg))
+ throw UserException(8020,
+ (string) "SyncClusterConnection::remove prepare failed: " + errmsg);
- _checkLast();
+ for (size_t i = 0; i < _conns.size(); i++) {
+ _conns[i]->remove(ns, query, flags);
}
- void SyncClusterConnection::update(const string &ns, Query query, BSONObj obj, int flags) {
- if ( flags & UpdateOption_Upsert ) {
- uassert(13120,
- "SyncClusterConnection::update upsert query needs _id",
- query.obj["_id"].type());
- }
+ _checkLast();
+}
- string errmsg;
- if (!prepare(errmsg)) {
- throw UserException(8005,
- str::stream() << "SyncClusterConnection::update prepare failed: "
- << errmsg);
- }
+void SyncClusterConnection::update(const string& ns, Query query, BSONObj obj, int flags) {
+ if (flags & UpdateOption_Upsert) {
+ uassert(
+ 13120, "SyncClusterConnection::update upsert query needs _id", query.obj["_id"].type());
+ }
- for (size_t i = 0; i < _conns.size(); i++) {
- _conns[i]->update(ns, query, obj, flags);
- }
+ string errmsg;
+ if (!prepare(errmsg)) {
+ throw UserException(
+ 8005, str::stream() << "SyncClusterConnection::update prepare failed: " << errmsg);
+ }
- _checkLast();
- invariant(_lastErrors.size() > 1);
+ for (size_t i = 0; i < _conns.size(); i++) {
+ _conns[i]->update(ns, query, obj, flags);
+ }
- const int a = _lastErrors[0]["n"].numberInt();
+ _checkLast();
+ invariant(_lastErrors.size() > 1);
- for (unsigned i = 1; i < _lastErrors.size(); i++) {
- int b = _lastErrors[i]["n"].numberInt();
+ const int a = _lastErrors[0]["n"].numberInt();
- if (a == b)
- continue;
+ for (unsigned i = 1; i < _lastErrors.size(); i++) {
+ int b = _lastErrors[i]["n"].numberInt();
- throw UpdateNotTheSame(8017,
- str::stream() << "update not consistent "
- << " ns: " << ns
- << " query: " << query.toString()
- << " update: " << obj
- << " gle1: " << _lastErrors[0]
- << " gle2: " << _lastErrors[i],
- _connAddresses,
- _lastErrors);
- }
+ if (a == b)
+ continue;
+
+ throw UpdateNotTheSame(8017,
+ str::stream() << "update not consistent "
+ << " ns: " << ns << " query: " << query.toString()
+ << " update: " << obj << " gle1: " << _lastErrors[0]
+ << " gle2: " << _lastErrors[i],
+ _connAddresses,
+ _lastErrors);
}
+}
- string SyncClusterConnection::_toString() const {
- stringstream ss;
- ss << "SyncClusterConnection ";
- ss << " [";
- for ( size_t i = 0; i < _conns.size(); i++ ) {
- if ( i != 0 ) ss << ",";
- if ( _conns[i] ) {
- ss << _conns[i]->toString();
- }
- else {
- ss << "(no conn)";
- }
+string SyncClusterConnection::_toString() const {
+ stringstream ss;
+ ss << "SyncClusterConnection ";
+ ss << " [";
+ for (size_t i = 0; i < _conns.size(); i++) {
+ if (i != 0)
+ ss << ",";
+ if (_conns[i]) {
+ ss << _conns[i]->toString();
+ } else {
+ ss << "(no conn)";
}
- ss << "]";
- return ss.str();
}
+ ss << "]";
+ return ss.str();
+}
- bool SyncClusterConnection::call( Message &toSend, Message &response, bool assertOk , string * actualServer ) {
- uassert( 8006 , "SyncClusterConnection::call can only be used directly for dbQuery" ,
- toSend.operation() == dbQuery );
-
- DbMessage d( toSend );
- uassert( 8007 , "SyncClusterConnection::call can't handle $cmd" , strstr( d.getns(), "$cmd" ) == 0 );
-
- for ( size_t i=0; i<_conns.size(); i++ ) {
- try {
- bool ok = _conns[i]->call( toSend , response , assertOk );
- if ( ok ) {
- if ( actualServer )
- *actualServer = _connAddresses[i];
- return ok;
- }
- log() << "call failed to: " << _conns[i]->toString() << " no data" << endl;
- }
- catch ( ... ) {
- log() << "call failed to: " << _conns[i]->toString() << " exception" << endl;
+bool SyncClusterConnection::call(Message& toSend,
+ Message& response,
+ bool assertOk,
+ string* actualServer) {
+ uassert(8006,
+ "SyncClusterConnection::call can only be used directly for dbQuery",
+ toSend.operation() == dbQuery);
+
+ DbMessage d(toSend);
+ uassert(8007, "SyncClusterConnection::call can't handle $cmd", strstr(d.getns(), "$cmd") == 0);
+
+ for (size_t i = 0; i < _conns.size(); i++) {
+ try {
+ bool ok = _conns[i]->call(toSend, response, assertOk);
+ if (ok) {
+ if (actualServer)
+ *actualServer = _connAddresses[i];
+ return ok;
}
+ log() << "call failed to: " << _conns[i]->toString() << " no data" << endl;
+ } catch (...) {
+ log() << "call failed to: " << _conns[i]->toString() << " exception" << endl;
}
- throw UserException( 8008 , str::stream() << "all servers down/unreachable: " << _address );
}
+ throw UserException(8008, str::stream() << "all servers down/unreachable: " << _address);
+}
- void SyncClusterConnection::say( Message &toSend, bool isRetry , string * actualServer ) {
- string errmsg;
- if ( ! prepare( errmsg ) )
- throw UserException( 13397 , (string)"SyncClusterConnection::say prepare failed: " + errmsg );
-
- for ( size_t i=0; i<_conns.size(); i++ ) {
- _conns[i]->say( toSend );
- }
-
- // TODO: should we set actualServer??
+void SyncClusterConnection::say(Message& toSend, bool isRetry, string* actualServer) {
+ string errmsg;
+ if (!prepare(errmsg))
+ throw UserException(13397, (string) "SyncClusterConnection::say prepare failed: " + errmsg);
- _checkLast();
+ for (size_t i = 0; i < _conns.size(); i++) {
+ _conns[i]->say(toSend);
}
- void SyncClusterConnection::sayPiggyBack( Message &toSend ) {
- verify(0);
- }
+ // TODO: should we set actualServer??
- int SyncClusterConnection::_lockType( const string& name ) {
- {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- map<string,int>::iterator i = _lockTypes.find( name );
- if ( i != _lockTypes.end() )
- return i->second;
- }
-
- BSONObj info;
- uassert( 13053 , str::stream() << "help failed: " << info , _commandOnActive( "admin" , BSON( name << "1" << "help" << 1 ) , info ) );
+ _checkLast();
+}
- int lockType = info["lockType"].numberInt();
+void SyncClusterConnection::sayPiggyBack(Message& toSend) {
+ verify(0);
+}
+int SyncClusterConnection::_lockType(const string& name) {
+ {
stdx::lock_guard<stdx::mutex> lk(_mutex);
- _lockTypes[name] = lockType;
- return lockType;
+ map<string, int>::iterator i = _lockTypes.find(name);
+ if (i != _lockTypes.end())
+ return i->second;
}
- void SyncClusterConnection::killCursor( long long cursorID ) {
- // should never need to do this
- verify(0);
- }
+ BSONObj info;
+ uassert(13053,
+ str::stream() << "help failed: " << info,
+ _commandOnActive("admin",
+ BSON(name << "1"
+ << "help" << 1),
+ info));
- // A SCC should be reused only if all the existing connections haven't been broken in the
- // background.
- // Note: an SCC may have missing connections if a config server is temporarily offline,
- // but reading from the others is still allowed.
- bool SyncClusterConnection::isStillConnected() {
- for ( size_t i = 0; i < _conns.size(); i++ ) {
- if ( _conns[i] && !_conns[i]->isStillConnected() ) return false;
+ int lockType = info["lockType"].numberInt();
- }
- return true;
- }
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ _lockTypes[name] = lockType;
+ return lockType;
+}
- void SyncClusterConnection::setAllSoTimeouts( double socketTimeout ){
- _socketTimeout = socketTimeout;
- for ( size_t i=0; i<_conns.size(); i++ )
+void SyncClusterConnection::killCursor(long long cursorID) {
+ // should never need to do this
+ verify(0);
+}
- if( _conns[i] ) _conns[i]->setSoTimeout( socketTimeout );
+// A SCC should be reused only if all the existing connections haven't been broken in the
+// background.
+// Note: an SCC may have missing connections if a config server is temporarily offline,
+// but reading from the others is still allowed.
+bool SyncClusterConnection::isStillConnected() {
+ for (size_t i = 0; i < _conns.size(); i++) {
+ if (_conns[i] && !_conns[i]->isStillConnected())
+ return false;
}
+ return true;
+}
- void SyncClusterConnection::setRequestMetadataWriter(rpc::RequestMetadataWriter writer) {
- // Set the hooks in both our sub-connections and in ourselves.
- for (size_t i = 0; i < _conns.size(); ++i) {
- if (_conns[i]) {
- _conns[i]->setRequestMetadataWriter(writer);
- }
+void SyncClusterConnection::setAllSoTimeouts(double socketTimeout) {
+ _socketTimeout = socketTimeout;
+ for (size_t i = 0; i < _conns.size(); i++)
+
+ if (_conns[i])
+ _conns[i]->setSoTimeout(socketTimeout);
+}
+
+void SyncClusterConnection::setRequestMetadataWriter(rpc::RequestMetadataWriter writer) {
+ // Set the hooks in both our sub-connections and in ourselves.
+ for (size_t i = 0; i < _conns.size(); ++i) {
+ if (_conns[i]) {
+ _conns[i]->setRequestMetadataWriter(writer);
}
- DBClientWithCommands::setRequestMetadataWriter(std::move(writer));
}
+ DBClientWithCommands::setRequestMetadataWriter(std::move(writer));
+}
- void SyncClusterConnection::setReplyMetadataReader(rpc::ReplyMetadataReader reader) {
- // Set the hooks in both our sub-connections and in ourselves.
- for (size_t i = 0; i < _conns.size(); ++i) {
- if (_conns[i]) {
- _conns[i]->setReplyMetadataReader(reader);
- }
+void SyncClusterConnection::setReplyMetadataReader(rpc::ReplyMetadataReader reader) {
+ // Set the hooks in both our sub-connections and in ourselves.
+ for (size_t i = 0; i < _conns.size(); ++i) {
+ if (_conns[i]) {
+ _conns[i]->setReplyMetadataReader(reader);
}
- DBClientWithCommands::setReplyMetadataReader(std::move(reader));
}
+ DBClientWithCommands::setReplyMetadataReader(std::move(reader));
+}
}