diff options
author | Misha Tyulenev <misha@mongodb.com> | 2016-03-10 17:30:44 -0500 |
---|---|---|
committer | Misha Tyulenev <misha@mongodb.com> | 2016-03-10 18:18:38 -0500 |
commit | eaef6254d3bcb27657de670ec1b9c797965e7c82 (patch) | |
tree | 09d29bd80943025732ebd3139deeea0191d8e171 /src/mongo/client | |
parent | 41a738c7629cd52b357b0bce6650182219ae9088 (diff) | |
download | mongo-eaef6254d3bcb27657de670ec1b9c797965e7c82.tar.gz |
SERVER-22320 remove SYNC option and SyncClusterConnection
Diffstat (limited to 'src/mongo/client')
-rw-r--r-- | src/mongo/client/SConscript | 1 | ||||
-rw-r--r-- | src/mongo/client/connection_string.cpp | 70 | ||||
-rw-r--r-- | src/mongo/client/connection_string.h | 8 | ||||
-rw-r--r-- | src/mongo/client/connection_string_connect.cpp | 10 | ||||
-rw-r--r-- | src/mongo/client/connection_string_test.cpp | 5 | ||||
-rw-r--r-- | src/mongo/client/connpool.cpp | 6 | ||||
-rw-r--r-- | src/mongo/client/dbclientcursor.cpp | 3 | ||||
-rw-r--r-- | src/mongo/client/remote_command_targeter_factory_impl.cpp | 1 | ||||
-rw-r--r-- | src/mongo/client/syncclusterconnection.cpp | 689 | ||||
-rw-r--r-- | src/mongo/client/syncclusterconnection.h | 272 |
10 files changed, 32 insertions, 1033 deletions
diff --git a/src/mongo/client/SConscript b/src/mongo/client/SConscript index 7aeab208ca4..71a65c735a2 100644 --- a/src/mongo/client/SConscript +++ b/src/mongo/client/SConscript @@ -131,7 +131,6 @@ env.Library( 'global_conn_pool.cpp', 'replica_set_monitor.cpp', 'replica_set_monitor_manager.cpp', - 'syncclusterconnection.cpp', ], LIBDEPS=[ '$BUILD_DIR/mongo/db/auth/authcommon', diff --git a/src/mongo/client/connection_string.cpp b/src/mongo/client/connection_string.cpp index 69b3f952039..18ee8ea1234 100644 --- a/src/mongo/client/connection_string.cpp +++ b/src/mongo/client/connection_string.cpp @@ -47,6 +47,7 @@ ConnectionString::ConnectionString(StringData setName, std::vector<HostAndPort> _finishInit(); } +// TODO: unify c-tors ConnectionString::ConnectionString(ConnectionType type, const std::string& s, const std::string& setName) { @@ -63,18 +64,9 @@ ConnectionString::ConnectionString(ConnectionType type, _finishInit(); } -ConnectionString::ConnectionString(const std::string& s, ConnectionType favoredMultipleType) { +ConnectionString::ConnectionString(const std::string& s, ConnectionType connType) + : _type(connType) { _fillServers(s); - - if (_type != INVALID) { - // set already - } else if (_servers.size() == 1) { - _type = MASTER; - } else { - _type = favoredMultipleType; - verify(_type == SET || _type == SYNC); - } - _finishInit(); } @@ -83,6 +75,7 @@ ConnectionString ConnectionString::forReplicaSet(StringData setName, return ConnectionString(setName, std::move(servers)); } +// TODO: rewrite parsing make it more reliable void ConnectionString::_fillServers(std::string s) { // // Custom-handled servers/replica sets start with '$' @@ -90,34 +83,38 @@ void ConnectionString::_fillServers(std::string s) { // (also disallows $replicaSetName hosts) // - if (s.find('$') == 0) + if (s.find('$') == 0) { _type = CUSTOM; + } - { - std::string::size_type idx = s.find('/'); - if (idx != std::string::npos) { - _setName = s.substr(0, idx); - s = s.substr(idx + 1); - if (_type != CUSTOM) - _type = SET; - } + std::string::size_type idx = s.find('/'); + if (idx != std::string::npos) { + _setName = s.substr(0, idx); + s = s.substr(idx + 1); + if (_type != CUSTOM) + _type = SET; } - std::string::size_type idx; while ((idx = s.find(',')) != std::string::npos) { _servers.push_back(HostAndPort(s.substr(0, idx))); s = s.substr(idx + 1); } + _servers.push_back(HostAndPort(s)); + + if (_servers.size() == 1 && _type == INVALID) { + _type = MASTER; + } } void ConnectionString::_finishInit() { switch (_type) { case MASTER: + verify(_setName.empty()); verify(_servers.size() == 1); break; case SET: - verify(_setName.size()); + verify(!_setName.empty()); verify(_servers.size() >= 1); // 1 is ok since we can derive break; default: @@ -149,6 +146,7 @@ void ConnectionString::_finishInit() { _string = ss.str(); } +// TODO: SERVER-23014 bool ConnectionString::sameLogicalEndpoint(const ConnectionString& other) const { if (_type != other._type) { return false; @@ -161,26 +159,6 @@ bool ConnectionString::sameLogicalEndpoint(const ConnectionString& other) const return _servers[0] == other._servers[0]; case SET: return _setName == other._setName; - case SYNC: - // The servers all have to be the same in each, but not in the same order. - if (_servers.size() != other._servers.size()) { - return false; - } - - for (unsigned i = 0; i < _servers.size(); i++) { - bool found = false; - for (unsigned j = 0; j < other._servers.size(); j++) { - if (_servers[i] == other._servers[j]) { - found = true; - break; - } - } - - if (!found) - return false; - } - - return true; case CUSTOM: return _string == other._string; } @@ -209,9 +187,11 @@ StatusWith<ConnectionString> ConnectionString::parse(const std::string& url) { return ConnectionString(singleHost); } - // Sharding config server if (numCommas == 2) { - return ConnectionString(SYNC, url, ""); + return Status(ErrorCodes::FailedToParse, + str::stream() << "mirrored config server connections are not supported; for " + "config server replica sets be sure to use the replica set " + "connection string"); } return Status(ErrorCodes::FailedToParse, str::stream() << "invalid url [" << url << "]"); @@ -225,8 +205,6 @@ std::string ConnectionString::typeToString(ConnectionType type) { return "master"; case SET: return "set"; - case SYNC: - return "sync"; case CUSTOM: return "custom"; } diff --git a/src/mongo/client/connection_string.h b/src/mongo/client/connection_string.h index 83e20906ed3..fbb29a829f6 100644 --- a/src/mongo/client/connection_string.h +++ b/src/mongo/client/connection_string.h @@ -47,10 +47,6 @@ class DBClientBase; * server * server:port * foo/server:port,server:port SET - * server,server,server SYNC - * Warning - you usually don't want "SYNC", it's used - * for some special things such as sharding config servers. - * See syncclusterconnection.h for more info. * * Typical use: * @@ -60,7 +56,7 @@ class DBClientBase; */ class ConnectionString { public: - enum ConnectionType { INVALID, MASTER, SET, SYNC, CUSTOM }; + enum ConnectionType { INVALID, MASTER, SET, CUSTOM }; ConnectionString() = default; @@ -86,7 +82,7 @@ public: std::vector<HostAndPort> servers, const std::string& setName); - ConnectionString(const std::string& s, ConnectionType favoredMultipleType); + ConnectionString(const std::string& s, ConnectionType connType); bool isValid() const { return _type != INVALID; diff --git a/src/mongo/client/connection_string_connect.cpp b/src/mongo/client/connection_string_connect.cpp index efca34792e6..3a8e0287656 100644 --- a/src/mongo/client/connection_string_connect.cpp +++ b/src/mongo/client/connection_string_connect.cpp @@ -36,7 +36,6 @@ #include "mongo/client/dbclient_rs.h" #include "mongo/client/dbclientinterface.h" -#include "mongo/client/syncclusterconnection.h" #include "mongo/stdx/memory.h" #include "mongo/util/assert_util.h" #include "mongo/util/log.h" @@ -69,15 +68,6 @@ DBClientBase* ConnectionString::connect(std::string& errmsg, double socketTimeou return set.release(); } - case SYNC: { - // TODO , don't copy - std::list<HostAndPort> l; - for (unsigned i = 0; i < _servers.size(); i++) - l.push_back(_servers[i]); - SyncClusterConnection* c = new SyncClusterConnection(l, socketTimeout); - return c; - } - case CUSTOM: { // Lock in case other things are modifying this at the same time stdx::lock_guard<stdx::mutex> lk(_connectHookMutex); diff --git a/src/mongo/client/connection_string_test.cpp b/src/mongo/client/connection_string_test.cpp index 4a1dfd81384..655ff17c207 100644 --- a/src/mongo/client/connection_string_test.cpp +++ b/src/mongo/client/connection_string_test.cpp @@ -35,7 +35,8 @@ namespace { using namespace mongo; - +/* + TODO: SERVER-23014 SYNC is gone but this can be a good check for equality TEST(ConnectionString, EqualitySync) { ConnectionString cs(ConnectionString::SYNC, "a,b,c", ""); @@ -45,5 +46,5 @@ TEST(ConnectionString, EqualitySync) { ASSERT(!cs.sameLogicalEndpoint(ConnectionString(ConnectionString::SYNC, "d,a,b", ""))); } - +*/ } // namespace diff --git a/src/mongo/client/connpool.cpp b/src/mongo/client/connpool.cpp index 2a019db9787..4bdc263cf33 100644 --- a/src/mongo/client/connpool.cpp +++ b/src/mongo/client/connpool.cpp @@ -41,7 +41,6 @@ #include "mongo/client/connection_string.h" #include "mongo/client/global_conn_pool.h" #include "mongo/client/replica_set_monitor.h" -#include "mongo/client/syncclusterconnection.h" #include "mongo/executor/connection_pool_stats.h" #include "mongo/util/exit.h" #include "mongo/util/log.h" @@ -474,10 +473,9 @@ void ScopedDbConnection::done() { void ScopedDbConnection::_setSocketTimeout() { if (!_conn) return; + if (_conn->type() == ConnectionString::MASTER) - ((DBClientConnection*)_conn)->setSoTimeout(_socketTimeout); - else if (_conn->type() == ConnectionString::SYNC) - ((SyncClusterConnection*)_conn)->setAllSoTimeouts(_socketTimeout); + static_cast<DBClientConnection*>(_conn)->setSoTimeout(_socketTimeout); } ScopedDbConnection::~ScopedDbConnection() { diff --git a/src/mongo/client/dbclientcursor.cpp b/src/mongo/client/dbclientcursor.cpp index 2e0a5bdade3..489f647a2ca 100644 --- a/src/mongo/client/dbclientcursor.cpp +++ b/src/mongo/client/dbclientcursor.cpp @@ -432,8 +432,7 @@ void DBClientCursor::attach(AScopedConnection* conn) { verify(conn); verify(conn->get()); - if (conn->get()->type() == ConnectionString::SET || - conn->get()->type() == ConnectionString::SYNC) { + if (conn->get()->type() == ConnectionString::SET) { if (_lazyHost.size() > 0) _scopedHost = _lazyHost; else if (_client) diff --git a/src/mongo/client/remote_command_targeter_factory_impl.cpp b/src/mongo/client/remote_command_targeter_factory_impl.cpp index 338cde7c504..3c3b1974eac 100644 --- a/src/mongo/client/remote_command_targeter_factory_impl.cpp +++ b/src/mongo/client/remote_command_targeter_factory_impl.cpp @@ -54,7 +54,6 @@ std::unique_ptr<RemoteCommandTargeter> RemoteCommandTargeterFactoryImpl::create( return stdx::make_unique<RemoteCommandTargeterRS>(connStr.getSetName(), connStr.getServers()); case ConnectionString::INVALID: - case ConnectionString::SYNC: // These connections should never be seen break; } diff --git a/src/mongo/client/syncclusterconnection.cpp b/src/mongo/client/syncclusterconnection.cpp deleted file mode 100644 index 52ed06592ec..00000000000 --- a/src/mongo/client/syncclusterconnection.cpp +++ /dev/null @@ -1,689 +0,0 @@ -// syncclusterconnection.cpp -/* - * Copyright 2010 10gen Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects - * for all of the code used other than as permitted herein. If you modify - * file(s) with this exception, you may extend this exception to your - * version of the file(s), but you are not obligated to do so. If you do not - * wish to do so, delete this exception statement from your version. If you - * delete this exception statement from all source files in the program, - * then also delete it in the license file. - */ - - -#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kNetwork - -#include "mongo/platform/basic.h" - -#include "mongo/client/syncclusterconnection.h" - -#include "mongo/client/dbclientcursor.h" -#include "mongo/client/dbclientinterface.h" -#include "mongo/db/dbmessage.h" -#include "mongo/db/namespace_string.h" -#include "mongo/rpc/get_status_from_command_result.h" -#include "mongo/util/log.h" - -namespace mongo { - -using std::unique_ptr; -using std::endl; -using std::list; -using std::map; -using std::string; -using std::stringstream; -using std::vector; - -namespace { -SyncClusterConnection::ConnectionValidationHook connectionHook; -} // namespace - -SyncClusterConnection::SyncClusterConnection(const list<HostAndPort>& L, double socketTimeout) - : _socketTimeout(socketTimeout) { - { - stringstream s; - int n = 0; - for (list<HostAndPort>::const_iterator i = L.begin(); i != L.end(); i++) { - if (++n > 1) - s << ','; - s << i->toString(); - } - _address = s.str(); - } - for (list<HostAndPort>::const_iterator i = L.begin(); i != L.end(); i++) - _connect(i->toString()); -} - -SyncClusterConnection::SyncClusterConnection(string commaSeparated, double socketTimeout) - : _socketTimeout(socketTimeout) { - _address = commaSeparated; - string::size_type idx; - while ((idx = commaSeparated.find(',')) != string::npos) { - string h = commaSeparated.substr(0, idx); - commaSeparated = commaSeparated.substr(idx + 1); - _connect(h); - } - _connect(commaSeparated); - uassert(8004, "SyncClusterConnection needs 3 servers", _conns.size() == 3); -} - -SyncClusterConnection::SyncClusterConnection(const std::string& a, - const std::string& b, - const std::string& c, - double socketTimeout) - : _socketTimeout(socketTimeout) { - _address = a + "," + b + "," + c; - // connect to all even if not working - _connect(a); - _connect(b); - _connect(c); -} - -SyncClusterConnection::SyncClusterConnection(SyncClusterConnection& prev, double socketTimeout) - : _socketTimeout(socketTimeout) { - verify(0); -} - -SyncClusterConnection::~SyncClusterConnection() { - for (size_t i = 0; i < _conns.size(); i++) - delete _conns[i]; - _conns.clear(); -} - -void SyncClusterConnection::setConnectionValidationHook(ConnectionValidationHook hook) { - connectionHook = std::move(hook); -} - -bool SyncClusterConnection::prepare(string& errmsg) { - _lastErrors.clear(); - - bool ok = true; - errmsg = ""; - - for (size_t i = 0; i < _conns.size(); i++) { - string singleErr; - try { - _conns[i]->simpleCommand("admin", NULL, "resetError"); - singleErr = _conns[i]->getLastError(true); - - if (singleErr.size() == 0) - continue; - - } catch (const DBException& e) { - if (e.getCode() == ErrorCodes::IncompatibleCatalogManager) { - throw; - } - singleErr = e.toString(); - } - ok = false; - errmsg += " " + _conns[i]->toString() + ":" + singleErr; - } - - return ok; -} - -void SyncClusterConnection::_checkLast() { - _lastErrors.clear(); - vector<string> errors; - - for (size_t i = 0; i < _conns.size(); i++) { - BSONObj res; - string err; - try { - if (!_conns[i]->runCommand("admin", BSON("getlasterror" << 1 << "fsync" << 1), res)) - err = "cmd failed: "; - } catch (std::exception& e) { - err += e.what(); - } catch (...) { - err += "unknown failure"; - } - _lastErrors.push_back(res.getOwned()); - errors.push_back(err); - } - - verify(_lastErrors.size() == errors.size() && _lastErrors.size() == _conns.size()); - - stringstream err; - bool ok = true; - - for (size_t i = 0; i < _conns.size(); i++) { - BSONObj res = _lastErrors[i]; - if (res["ok"].trueValue() && - (res["fsyncFiles"].numberInt() > 0 || res.hasElement("waited") || - res["syncMillis"].numberInt() >= 0)) - continue; - ok = false; - err << _conns[i]->toString() << ": " << res << " " << errors[i]; - } - - if (ok) - return; - throw UserException(8001, (string) "SyncClusterConnection write op failed: " + err.str()); -} - -BSONObj SyncClusterConnection::getLastErrorDetailed(bool fsync, bool j, int w, int wtimeout) { - return getLastErrorDetailed("admin", fsync, j, w, wtimeout); -} - -BSONObj SyncClusterConnection::getLastErrorDetailed( - const std::string& db, bool fsync, bool j, int w, int wtimeout) { - if (_lastErrors.size()) - return _lastErrors[0]; - return DBClientBase::getLastErrorDetailed(db, fsync, j, w, wtimeout); -} - -void SyncClusterConnection::_connect(const std::string& hostStr) { - log() << "SyncClusterConnection connecting to [" << hostStr << "]" << endl; - const HostAndPort host(hostStr); - DBClientConnection* c; - if (connectionHook) { - c = new DBClientConnection( - true, // auto reconnect - 0, // socket timeout - [this, host](const executor::RemoteCommandResponse& isMasterReply) { - return connectionHook(host, isMasterReply); - }); - } else { - c = new DBClientConnection(true); - } - - c->setRequestMetadataWriter(getRequestMetadataWriter()); - c->setReplyMetadataReader(getReplyMetadataReader()); - c->setSoTimeout(_socketTimeout); - Status status = c->connect(host); - if (!status.isOK()) { - log() << "SyncClusterConnection connect fail to: " << hostStr << causedBy(status); - if (status == ErrorCodes::IncompatibleCatalogManager) { - // Make sure to propagate IncompatibleCatalogManager errors to trigger catalog manager - // swapping. - uassertStatusOK(status); - } - } - _connAddresses.push_back(hostStr); - _conns.push_back(c); -} - -bool SyncClusterConnection::runCommand(const std::string& dbname, - const BSONObj& cmd, - BSONObj& info, - int options) { - std::string ns = dbname + ".$cmd"; - BSONObj interposedCmd = cmd; - - if (getRequestMetadataWriter()) { - // We have a metadata writer. We need to upconvert the metadata, write to it, - // Then downconvert it again. This unfortunate, but this code is going to be - // removed anyway as part of CSRS. - - BSONObj upconvertedCommand; - BSONObj upconvertedMetadata; - - std::tie(upconvertedCommand, upconvertedMetadata) = - uassertStatusOK(rpc::upconvertRequestMetadata(cmd, options)); - - BSONObjBuilder metadataBob; - metadataBob.appendElements(upconvertedMetadata); - - uassertStatusOK(getRequestMetadataWriter()(&metadataBob, getServerAddress())); - - std::tie(interposedCmd, options) = uassertStatusOK( - rpc::downconvertRequestMetadata(std::move(upconvertedCommand), metadataBob.done())); - } - - BSONObj legacyResult = findOne(ns, Query(interposedCmd), 0, options); - - BSONObj upconvertedMetadata; - BSONObj upconvertedReply; - - std::tie(upconvertedReply, upconvertedMetadata) = - uassertStatusOK(rpc::upconvertReplyMetadata(legacyResult)); - - if (getReplyMetadataReader()) { - // TODO: what does getServerAddress() actually mean here as this connection - // represents a connection to 1 or 3 config servers... - uassertStatusOK(getReplyMetadataReader()(upconvertedReply, getServerAddress())); - } - - info = upconvertedReply; - - return isOk(info); -} - -BSONObj SyncClusterConnection::findOne(const string& ns, - const Query& query, - const BSONObj* fieldsToReturn, - int queryOptions) { - if (ns.find(".$cmd") != string::npos) { - string cmdName = query.obj.firstElementFieldName(); - - int lockType = _lockType(cmdName); - - if (lockType > 0) { // write $cmd - string errmsg; - if (!prepare(errmsg)) - throw UserException(ErrorCodes::PrepareConfigsFailed, - (string) "SyncClusterConnection::findOne prepare failed: " + - errmsg); - - vector<BSONObj> all; - for (size_t i = 0; i < _conns.size(); i++) { - all.push_back(_conns[i]->findOne(ns, query, 0, queryOptions).getOwned()); - } - - _checkLast(); - - for (size_t i = 0; i < all.size(); i++) { - Status status = getStatusFromCommandResult(all[i]); - if (status.isOK()) { - continue; - } - - stringstream ss; - ss << "write $cmd failed on a node: " << status.toString(); - ss << " " << _conns[i]->toString(); - ss << " ns: " << ns; - ss << " cmd: " << query.toString(); - throw UserException(status.code(), ss.str()); - } - - return all[0]; - } - } - - return DBClientBase::findOne(ns, query, fieldsToReturn, queryOptions); -} - -void SyncClusterConnection::_auth(const BSONObj& params) { - // A SCC is authenticated if any connection has been authenticated - // Credentials are stored in the auto-reconnect connections. - - bool authedOnce = false; - vector<string> errors; - - for (vector<DBClientConnection*>::iterator it = _conns.begin(); it < _conns.end(); ++it) { - massert(15848, "sync cluster of sync clusters?", (*it)->type() != ConnectionString::SYNC); - - // Authenticate or collect the error message - string lastErrmsg; - bool authed; - try { - // Auth errors can manifest either as exceptions or as false results - // TODO: Make this better - (*it)->auth(params); - authed = true; - } catch (const DBException& e) { - // auth will be retried on reconnect - lastErrmsg = e.what(); - authed = false; - } - - if (!authed) { - // Since we're using auto-reconnect connections, we're sure the auth info has been - // stored if needed for later - - lastErrmsg = str::stream() << "auth error on " << (*it)->getServerAddress() - << causedBy(lastErrmsg); - - LOG(1) << lastErrmsg << endl; - errors.push_back(lastErrmsg); - } - - authedOnce = authedOnce || authed; - } - - if (authedOnce) - return; - - // Assemble the error message - str::stream errStream; - for (vector<string>::iterator it = errors.begin(); it != errors.end(); ++it) { - if (it != errors.begin()) - errStream << " ::and:: "; - errStream << *it; - } - - uasserted(ErrorCodes::AuthenticationFailed, errStream); -} - -// TODO: logout is required for use of this class outside of a cluster environment - -unique_ptr<DBClientCursor> SyncClusterConnection::query(const string& ns, - Query query, - int nToReturn, - int nToSkip, - const BSONObj* fieldsToReturn, - int queryOptions, - int batchSize) { - _lastErrors.clear(); - if (ns.find(".$cmd") != string::npos) { - string cmdName = query.obj.firstElementFieldName(); - int lockType = _lockType(cmdName); - uassert(13054, - (string) "write $cmd not supported in SyncClusterConnection::query for:" + cmdName, - lockType <= 0); - } - - return _queryOnActive(ns, query, nToReturn, nToSkip, fieldsToReturn, queryOptions, batchSize); -} - -bool SyncClusterConnection::_commandOnActive(const string& dbname, - const BSONObj& cmd, - BSONObj& info, - int options) { - unique_ptr<DBClientCursor> cursor = _queryOnActive(dbname + ".$cmd", cmd, 1, 0, 0, options, 0); - if (cursor->more()) - info = cursor->next().copy(); - else - info = BSONObj(); - return isOk(info); -} - -void SyncClusterConnection::attachQueryHandler(QueryHandler* handler) { - _customQueryHandler.reset(handler); -} - -unique_ptr<DBClientCursor> SyncClusterConnection::_queryOnActive(const string& ns, - Query query, - int nToReturn, - int nToSkip, - const BSONObj* fieldsToReturn, - int queryOptions, - int batchSize) { - if (_customQueryHandler && _customQueryHandler->canHandleQuery(ns, query)) { - LOG(2) << "custom query handler used for query on " << ns << ": " << query.toString() - << endl; - - return _customQueryHandler->handleQuery( - _connAddresses, ns, query, nToReturn, nToSkip, fieldsToReturn, queryOptions, batchSize); - } - - for (size_t i = 0; i < _conns.size(); i++) { - try { - unique_ptr<DBClientCursor> cursor = _conns[i]->query( - ns, query, nToReturn, nToSkip, fieldsToReturn, queryOptions, batchSize); - if (cursor.get()) - return cursor; - - log() << "query on " << ns << ": " << query.toString() - << " failed to: " << _conns[i]->toString() << " no data" << endl; - } catch (std::exception& e) { - log() << "query on " << ns << ": " << query.toString() - << " failed to: " << _conns[i]->toString() << " exception: " << e.what() << endl; - } catch (...) { - log() << "query on " << ns << ": " << query.toString() - << " failed to: " << _conns[i]->toString() << " exception" << endl; - } - } - throw UserException(ErrorCodes::HostUnreachable, - str::stream() - << "all servers down/unreachable when querying: " << _address); -} - -unique_ptr<DBClientCursor> SyncClusterConnection::getMore(const string& ns, - long long cursorId, - int nToReturn, - int options) { - uassert(10022, "SyncClusterConnection::getMore not supported yet", 0); - unique_ptr<DBClientCursor> c; - return c; -} - -void SyncClusterConnection::insert(const string& ns, BSONObj obj, int flags) { - uassert(13119, - str::stream() << "SyncClusterConnection::insert obj has to have an _id: " << obj, - nsToCollectionSubstring(ns) == "system.indexes" || obj["_id"].type()); - - string errmsg; - if (!prepare(errmsg)) - throw UserException(8003, - (string) "SyncClusterConnection::insert prepare failed: " + errmsg); - - for (size_t i = 0; i < _conns.size(); i++) { - _conns[i]->insert(ns, obj, flags); - } - - _checkLast(); -} - -void SyncClusterConnection::insert(const string& ns, const vector<BSONObj>& v, int flags) { - if (v.size() == 1) { - insert(ns, v[0], flags); - return; - } - - for (vector<BSONObj>::const_iterator it = v.begin(); it != v.end(); ++it) { - BSONObj obj = *it; - if (obj["_id"].type() == EOO) { - string assertMsg = "SyncClusterConnection::insert (batched) obj misses an _id: "; - uasserted(16743, assertMsg + obj.jsonString()); - } - } - - // fsync all connections before starting the batch. - string errmsg; - if (!prepare(errmsg)) { - string assertMsg = "SyncClusterConnection::insert (batched) prepare failed: "; - throw UserException(16744, assertMsg + errmsg); - } - - // We still want one getlasterror per document, even if they're batched. - for (size_t i = 0; i < _conns.size(); i++) { - for (vector<BSONObj>::const_iterator it = v.begin(); it != v.end(); ++it) { - _conns[i]->insert(ns, *it, flags); - _conns[i]->getLastErrorDetailed(); - } - } - - // We issue a final getlasterror, but this time with an fsync. - _checkLast(); -} - -void SyncClusterConnection::remove(const string& ns, Query query, int flags) { - string errmsg; - if (!prepare(errmsg)) - throw UserException(8020, - (string) "SyncClusterConnection::remove prepare failed: " + errmsg); - - for (size_t i = 0; i < _conns.size(); i++) { - _conns[i]->remove(ns, query, flags); - } - - _checkLast(); -} - -void SyncClusterConnection::update(const string& ns, Query query, BSONObj obj, int flags) { - if (flags & UpdateOption_Upsert) { - uassert( - 13120, "SyncClusterConnection::update upsert query needs _id", query.obj["_id"].type()); - } - - string errmsg; - if (!prepare(errmsg)) { - throw UserException( - 8005, str::stream() << "SyncClusterConnection::update prepare failed: " << errmsg); - } - - for (size_t i = 0; i < _conns.size(); i++) { - _conns[i]->update(ns, query, obj, flags); - } - - _checkLast(); - invariant(_lastErrors.size() > 1); - - const int a = _lastErrors[0]["n"].numberInt(); - - for (unsigned i = 1; i < _lastErrors.size(); i++) { - int b = _lastErrors[i]["n"].numberInt(); - - if (a == b) - continue; - - throw UpdateNotTheSame(8017, - str::stream() << "update not consistent " - << " ns: " << ns << " query: " << query.toString() - << " update: " << obj << " gle1: " << _lastErrors[0] - << " gle2: " << _lastErrors[i], - _connAddresses, - _lastErrors); - } -} - -string SyncClusterConnection::_toString() const { - stringstream ss; - ss << "SyncClusterConnection "; - ss << " ["; - for (size_t i = 0; i < _conns.size(); i++) { - if (i != 0) - ss << ","; - if (_conns[i]) { - ss << _conns[i]->toString(); - } else { - ss << "(no conn)"; - } - } - ss << "]"; - return ss.str(); -} - -bool SyncClusterConnection::call(Message& toSend, - Message& response, - bool assertOk, - string* actualServer) { - uassert(8006, - "SyncClusterConnection::call can only be used directly for dbQuery", - toSend.operation() == dbQuery); - - DbMessage d(toSend); - uassert(8007, "SyncClusterConnection::call can't handle $cmd", strstr(d.getns(), "$cmd") == 0); - - for (size_t i = 0; i < _conns.size(); i++) { - try { - bool ok = _conns[i]->call(toSend, response, assertOk, nullptr); - if (ok) { - if (actualServer) - *actualServer = _connAddresses[i]; - return ok; - } - log() << "call failed to: " << _conns[i]->toString() << " no data" << endl; - } catch (...) { - log() << "call failed to: " << _conns[i]->toString() << " exception" << endl; - } - } - throw UserException(8008, str::stream() << "all servers down/unreachable: " << _address); -} - -void SyncClusterConnection::say(Message& toSend, bool isRetry, string* actualServer) { - string errmsg; - if (!prepare(errmsg)) - throw UserException(13397, (string) "SyncClusterConnection::say prepare failed: " + errmsg); - - for (size_t i = 0; i < _conns.size(); i++) { - _conns[i]->say(toSend); - } - - // TODO: should we set actualServer?? - - _checkLast(); -} - -int SyncClusterConnection::_lockType(const string& name) { - { - stdx::lock_guard<stdx::mutex> lk(_mutex); - map<string, int>::iterator i = _lockTypes.find(name); - if (i != _lockTypes.end()) - return i->second; - } - - BSONObj info; - uassert(13053, - str::stream() << "help failed: " << info, - _commandOnActive("admin", - BSON(name << "1" - << "help" << 1), - info)); - - int lockType = info["lockType"].numberInt(); - - stdx::lock_guard<stdx::mutex> lk(_mutex); - _lockTypes[name] = lockType; - return lockType; -} - -void SyncClusterConnection::killCursor(long long cursorID) { - // should never need to do this - verify(0); -} - -// A SCC should be reused only if all the existing connections haven't been broken in the -// background. -// Note: an SCC may have missing connections if a config server is temporarily offline, -// but reading from the others is still allowed. -bool SyncClusterConnection::isStillConnected() { - for (size_t i = 0; i < _conns.size(); i++) { - if (_conns[i] && !_conns[i]->isStillConnected()) - return false; - } - return true; -} - -int SyncClusterConnection::getMinWireVersion() { - int minVersion = 0; - for (const auto& host : _conns) { - minVersion = std::max(minVersion, host->getMinWireVersion()); - } - return minVersion; -} - -int SyncClusterConnection::getMaxWireVersion() { - int maxVersion = std::numeric_limits<int>::max(); - for (const auto& host : _conns) { - maxVersion = std::min(maxVersion, host->getMaxWireVersion()); - } - return maxVersion; -} - -void SyncClusterConnection::setAllSoTimeouts(double socketTimeout) { - _socketTimeout = socketTimeout; - for (size_t i = 0; i < _conns.size(); i++) - - if (_conns[i]) - _conns[i]->setSoTimeout(socketTimeout); -} - -void SyncClusterConnection::setRequestMetadataWriter(rpc::RequestMetadataWriter writer) { - // Set the hooks in both our sub-connections and in ourselves. - for (size_t i = 0; i < _conns.size(); ++i) { - if (_conns[i]) { - _conns[i]->setRequestMetadataWriter(writer); - } - } - DBClientWithCommands::setRequestMetadataWriter(std::move(writer)); -} - -void SyncClusterConnection::setReplyMetadataReader(rpc::ReplyMetadataReader reader) { - // Set the hooks in both our sub-connections and in ourselves. - for (size_t i = 0; i < _conns.size(); ++i) { - if (_conns[i]) { - _conns[i]->setReplyMetadataReader(reader); - } - } - DBClientWithCommands::setReplyMetadataReader(std::move(reader)); -} -} diff --git a/src/mongo/client/syncclusterconnection.h b/src/mongo/client/syncclusterconnection.h deleted file mode 100644 index 40373d4ae19..00000000000 --- a/src/mongo/client/syncclusterconnection.h +++ /dev/null @@ -1,272 +0,0 @@ -// @file syncclusterconnection.h - -/* - * Copyright 2010 10gen Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects - * for all of the code used other than as permitted herein. If you modify - * file(s) with this exception, you may extend this exception to your - * version of the file(s), but you are not obligated to do so. If you do not - * wish to do so, delete this exception statement from your version. If you - * delete this exception statement from all source files in the program, - * then also delete it in the license file. - */ - -#pragma once - - -#include "mongo/bson/bsonelement.h" -#include "mongo/bson/bsonobj.h" -#include "mongo/client/dbclientinterface.h" -#include "mongo/stdx/mutex.h" -#include "mongo/stdx/functional.h" -#include "mongo/util/concurrency/mutex.h" - -namespace mongo { - -/** - * This is a connection to a cluster of servers that operate as one - * for super high durability. - * - * Write operations are two-phase. First, all nodes are asked to fsync. If successful - * everywhere, the write is sent everywhere and then followed by an fsync. There is no - * rollback if a problem occurs during the second phase. Naturally, with all these fsyncs, - * these operations will be quite slow -- use sparingly. - * - * Read operations are sent to a single random node. - * - * The class checks if a command is read or write style, and sends to a single - * node if a read lock command and to all in two phases with a write style command. - */ -class SyncClusterConnection : public DBClientBase { -public: - using DBClientBase::query; - using DBClientBase::update; - using DBClientBase::remove; - - class QueryHandler; - - /** - * Hook run on the response of an isMaster request. Run on the DBClientConnections that - * SyncClusterConnection uses under the hood. Used to detect and signal when the catalog - * manager needs swapping. - */ - using ConnectionValidationHook = stdx::function<Status( - const HostAndPort& host, const executor::RemoteCommandResponse& isMasterReply)>; - - - /** - * @param commaSeparated should be 3 hosts comma separated - */ - SyncClusterConnection(const std::list<HostAndPort>&, double socketTimeout = 0); - SyncClusterConnection(std::string commaSeparated, double socketTimeout = 0); - SyncClusterConnection(const std::string& a, - const std::string& b, - const std::string& c, - double socketTimeout = 0); - ~SyncClusterConnection(); - - /** - * Sets the ConnectionValidationHook that will be used for the DBClientConnections created by - * all future constructed SyncClusterConnections. - */ - static void setConnectionValidationHook(ConnectionValidationHook hook); - - /** - * @return true if all servers are up and ready for writes - */ - bool prepare(std::string& errmsg); - - // --- from DBClientInterface - - virtual BSONObj findOne(const std::string& ns, - const Query& query, - const BSONObj* fieldsToReturn, - int queryOptions); - - virtual std::unique_ptr<DBClientCursor> query(const std::string& ns, - Query query, - int nToReturn, - int nToSkip, - const BSONObj* fieldsToReturn, - int queryOptions, - int batchSize); - - virtual std::unique_ptr<DBClientCursor> getMore(const std::string& ns, - long long cursorId, - int nToReturn, - int options); - - virtual void insert(const std::string& ns, BSONObj obj, int flags = 0); - - virtual void insert(const std::string& ns, const std::vector<BSONObj>& v, int flags = 0); - - virtual void remove(const std::string& ns, Query query, int flags); - - virtual void update(const std::string& ns, Query query, BSONObj obj, int flags); - - virtual bool call(Message& toSend, Message& response, bool assertOk, std::string* actualServer); - virtual void say(Message& toSend, bool isRetry = false, std::string* actualServer = 0); - - virtual void killCursor(long long cursorID); - - virtual std::string getServerAddress() const { - return _address; - } - virtual bool isFailed() const { - return false; - } - virtual bool isStillConnected(); - - int getMinWireVersion() final; - int getMaxWireVersion() final; - - virtual std::string toString() const { - return _toString(); - } - - virtual BSONObj getLastErrorDetailed( - const std::string& db, bool fsync = false, bool j = false, int w = 0, int wtimeout = 0); - virtual BSONObj getLastErrorDetailed(bool fsync = false, - bool j = false, - int w = 0, - int wtimeout = 0); - - virtual ConnectionString::ConnectionType type() const { - return ConnectionString::SYNC; - } - - void setAllSoTimeouts(double socketTimeout); - double getSoTimeout() const { - return _socketTimeout; - } - - - virtual bool lazySupported() const { - return false; - } - - // This override of runCommand intentionally calls findOne to construct a legacy - // OP_QUERY command. The reason for this is that delicate logic for targeting/locking - // config servers is in SyncClusterConnection::findOne, and refactoring that logic - // is both risky and of dubious value as we move to config server replica sets (CSRS). - bool runCommand(const std::string& dbname, - const BSONObj& cmd, - BSONObj& info, - int options) final; - - void setRequestMetadataWriter(rpc::RequestMetadataWriter writer) final; - void setReplyMetadataReader(rpc::ReplyMetadataReader reader) final; - - /** - * Allow custom query processing through an external (e.g. mongos-only) service. - * - * Takes ownership of attached handler. - */ - void attachQueryHandler(QueryHandler* handler); - -protected: - virtual void _auth(const BSONObj& params); - -private: - SyncClusterConnection(SyncClusterConnection& prev, double socketTimeout = 0); - std::string _toString() const; - bool _commandOnActive(const std::string& dbname, - const BSONObj& cmd, - BSONObj& info, - int options = 0); - std::unique_ptr<DBClientCursor> _queryOnActive(const std::string& ns, - Query query, - int nToReturn, - int nToSkip, - const BSONObj* fieldsToReturn, - int queryOptions, - int batchSize); - int _lockType(const std::string& name); - void _checkLast(); - void _connect(const std::string& host); - - std::string _address; - std::vector<std::string> _connAddresses; - std::vector<DBClientConnection*> _conns; - - std::vector<BSONObj> _lastErrors; - - // Optionally attached by user - std::unique_ptr<QueryHandler> _customQueryHandler; - - stdx::mutex _mutex; - std::map<std::string, int> _lockTypes; - // End mutex - - double _socketTimeout; -}; - -/** - * Interface for custom query processing for the SCC. - * Allows plugging different host query behaviors for different types of queries. - */ -class SyncClusterConnection::QueryHandler { -public: - virtual ~QueryHandler(){}; - - /** - * Returns true if the query can be processed using this handler. - */ - virtual bool canHandleQuery(const std::string& ns, Query query) = 0; - - /** - * Returns a cursor on one of the hosts with the desired results for the query. - * May throw or return an empty unique_ptr on failure. - */ - virtual std::unique_ptr<DBClientCursor> handleQuery(const std::vector<std::string>& hosts, - const std::string& ns, - Query query, - int nToReturn, - int nToSkip, - const BSONObj* fieldsToReturn, - int queryOptions, - int batchSize) = 0; -}; - -class UpdateNotTheSame : public UserException { -public: - UpdateNotTheSame(int code, - const std::string& msg, - const std::vector<std::string>& addrs, - const std::vector<BSONObj>& lastErrors) - : UserException(code, msg), _addrs(addrs), _lastErrors(lastErrors) { - verify(_addrs.size() == _lastErrors.size()); - } - - virtual ~UpdateNotTheSame() throw() {} - - unsigned size() const { - return _addrs.size(); - } - - std::pair<std::string, BSONObj> operator[](unsigned i) const { - return std::make_pair(_addrs[i], _lastErrors[i]); - } - -private: - std::vector<std::string> _addrs; - std::vector<BSONObj> _lastErrors; -}; -}; |