summaryrefslogtreecommitdiff
path: root/src/mongo/client
diff options
context:
space:
mode:
authorMisha Tyulenev <misha@mongodb.com>2016-03-10 17:30:44 -0500
committerMisha Tyulenev <misha@mongodb.com>2016-03-10 18:18:38 -0500
commiteaef6254d3bcb27657de670ec1b9c797965e7c82 (patch)
tree09d29bd80943025732ebd3139deeea0191d8e171 /src/mongo/client
parent41a738c7629cd52b357b0bce6650182219ae9088 (diff)
downloadmongo-eaef6254d3bcb27657de670ec1b9c797965e7c82.tar.gz
SERVER-22320 remove SYNC option and SyncClusterConnection
Diffstat (limited to 'src/mongo/client')
-rw-r--r--src/mongo/client/SConscript1
-rw-r--r--src/mongo/client/connection_string.cpp70
-rw-r--r--src/mongo/client/connection_string.h8
-rw-r--r--src/mongo/client/connection_string_connect.cpp10
-rw-r--r--src/mongo/client/connection_string_test.cpp5
-rw-r--r--src/mongo/client/connpool.cpp6
-rw-r--r--src/mongo/client/dbclientcursor.cpp3
-rw-r--r--src/mongo/client/remote_command_targeter_factory_impl.cpp1
-rw-r--r--src/mongo/client/syncclusterconnection.cpp689
-rw-r--r--src/mongo/client/syncclusterconnection.h272
10 files changed, 32 insertions, 1033 deletions
diff --git a/src/mongo/client/SConscript b/src/mongo/client/SConscript
index 7aeab208ca4..71a65c735a2 100644
--- a/src/mongo/client/SConscript
+++ b/src/mongo/client/SConscript
@@ -131,7 +131,6 @@ env.Library(
'global_conn_pool.cpp',
'replica_set_monitor.cpp',
'replica_set_monitor_manager.cpp',
- 'syncclusterconnection.cpp',
],
LIBDEPS=[
'$BUILD_DIR/mongo/db/auth/authcommon',
diff --git a/src/mongo/client/connection_string.cpp b/src/mongo/client/connection_string.cpp
index 69b3f952039..18ee8ea1234 100644
--- a/src/mongo/client/connection_string.cpp
+++ b/src/mongo/client/connection_string.cpp
@@ -47,6 +47,7 @@ ConnectionString::ConnectionString(StringData setName, std::vector<HostAndPort>
_finishInit();
}
+// TODO: unify c-tors
ConnectionString::ConnectionString(ConnectionType type,
const std::string& s,
const std::string& setName) {
@@ -63,18 +64,9 @@ ConnectionString::ConnectionString(ConnectionType type,
_finishInit();
}
-ConnectionString::ConnectionString(const std::string& s, ConnectionType favoredMultipleType) {
+ConnectionString::ConnectionString(const std::string& s, ConnectionType connType)
+ : _type(connType) {
_fillServers(s);
-
- if (_type != INVALID) {
- // set already
- } else if (_servers.size() == 1) {
- _type = MASTER;
- } else {
- _type = favoredMultipleType;
- verify(_type == SET || _type == SYNC);
- }
-
_finishInit();
}
@@ -83,6 +75,7 @@ ConnectionString ConnectionString::forReplicaSet(StringData setName,
return ConnectionString(setName, std::move(servers));
}
+// TODO: rewrite parsing make it more reliable
void ConnectionString::_fillServers(std::string s) {
//
// Custom-handled servers/replica sets start with '$'
@@ -90,34 +83,38 @@ void ConnectionString::_fillServers(std::string s) {
// (also disallows $replicaSetName hosts)
//
- if (s.find('$') == 0)
+ if (s.find('$') == 0) {
_type = CUSTOM;
+ }
- {
- std::string::size_type idx = s.find('/');
- if (idx != std::string::npos) {
- _setName = s.substr(0, idx);
- s = s.substr(idx + 1);
- if (_type != CUSTOM)
- _type = SET;
- }
+ std::string::size_type idx = s.find('/');
+ if (idx != std::string::npos) {
+ _setName = s.substr(0, idx);
+ s = s.substr(idx + 1);
+ if (_type != CUSTOM)
+ _type = SET;
}
- std::string::size_type idx;
while ((idx = s.find(',')) != std::string::npos) {
_servers.push_back(HostAndPort(s.substr(0, idx)));
s = s.substr(idx + 1);
}
+
_servers.push_back(HostAndPort(s));
+
+ if (_servers.size() == 1 && _type == INVALID) {
+ _type = MASTER;
+ }
}
void ConnectionString::_finishInit() {
switch (_type) {
case MASTER:
+ verify(_setName.empty());
verify(_servers.size() == 1);
break;
case SET:
- verify(_setName.size());
+ verify(!_setName.empty());
verify(_servers.size() >= 1); // 1 is ok since we can derive
break;
default:
@@ -149,6 +146,7 @@ void ConnectionString::_finishInit() {
_string = ss.str();
}
+// TODO: SERVER-23014
bool ConnectionString::sameLogicalEndpoint(const ConnectionString& other) const {
if (_type != other._type) {
return false;
@@ -161,26 +159,6 @@ bool ConnectionString::sameLogicalEndpoint(const ConnectionString& other) const
return _servers[0] == other._servers[0];
case SET:
return _setName == other._setName;
- case SYNC:
- // The servers all have to be the same in each, but not in the same order.
- if (_servers.size() != other._servers.size()) {
- return false;
- }
-
- for (unsigned i = 0; i < _servers.size(); i++) {
- bool found = false;
- for (unsigned j = 0; j < other._servers.size(); j++) {
- if (_servers[i] == other._servers[j]) {
- found = true;
- break;
- }
- }
-
- if (!found)
- return false;
- }
-
- return true;
case CUSTOM:
return _string == other._string;
}
@@ -209,9 +187,11 @@ StatusWith<ConnectionString> ConnectionString::parse(const std::string& url) {
return ConnectionString(singleHost);
}
- // Sharding config server
if (numCommas == 2) {
- return ConnectionString(SYNC, url, "");
+ return Status(ErrorCodes::FailedToParse,
+ str::stream() << "mirrored config server connections are not supported; for "
+ "config server replica sets be sure to use the replica set "
+ "connection string");
}
return Status(ErrorCodes::FailedToParse, str::stream() << "invalid url [" << url << "]");
@@ -225,8 +205,6 @@ std::string ConnectionString::typeToString(ConnectionType type) {
return "master";
case SET:
return "set";
- case SYNC:
- return "sync";
case CUSTOM:
return "custom";
}
diff --git a/src/mongo/client/connection_string.h b/src/mongo/client/connection_string.h
index 83e20906ed3..fbb29a829f6 100644
--- a/src/mongo/client/connection_string.h
+++ b/src/mongo/client/connection_string.h
@@ -47,10 +47,6 @@ class DBClientBase;
* server
* server:port
* foo/server:port,server:port SET
- * server,server,server SYNC
- * Warning - you usually don't want "SYNC", it's used
- * for some special things such as sharding config servers.
- * See syncclusterconnection.h for more info.
*
* Typical use:
*
@@ -60,7 +56,7 @@ class DBClientBase;
*/
class ConnectionString {
public:
- enum ConnectionType { INVALID, MASTER, SET, SYNC, CUSTOM };
+ enum ConnectionType { INVALID, MASTER, SET, CUSTOM };
ConnectionString() = default;
@@ -86,7 +82,7 @@ public:
std::vector<HostAndPort> servers,
const std::string& setName);
- ConnectionString(const std::string& s, ConnectionType favoredMultipleType);
+ ConnectionString(const std::string& s, ConnectionType connType);
bool isValid() const {
return _type != INVALID;
diff --git a/src/mongo/client/connection_string_connect.cpp b/src/mongo/client/connection_string_connect.cpp
index efca34792e6..3a8e0287656 100644
--- a/src/mongo/client/connection_string_connect.cpp
+++ b/src/mongo/client/connection_string_connect.cpp
@@ -36,7 +36,6 @@
#include "mongo/client/dbclient_rs.h"
#include "mongo/client/dbclientinterface.h"
-#include "mongo/client/syncclusterconnection.h"
#include "mongo/stdx/memory.h"
#include "mongo/util/assert_util.h"
#include "mongo/util/log.h"
@@ -69,15 +68,6 @@ DBClientBase* ConnectionString::connect(std::string& errmsg, double socketTimeou
return set.release();
}
- case SYNC: {
- // TODO , don't copy
- std::list<HostAndPort> l;
- for (unsigned i = 0; i < _servers.size(); i++)
- l.push_back(_servers[i]);
- SyncClusterConnection* c = new SyncClusterConnection(l, socketTimeout);
- return c;
- }
-
case CUSTOM: {
// Lock in case other things are modifying this at the same time
stdx::lock_guard<stdx::mutex> lk(_connectHookMutex);
diff --git a/src/mongo/client/connection_string_test.cpp b/src/mongo/client/connection_string_test.cpp
index 4a1dfd81384..655ff17c207 100644
--- a/src/mongo/client/connection_string_test.cpp
+++ b/src/mongo/client/connection_string_test.cpp
@@ -35,7 +35,8 @@
namespace {
using namespace mongo;
-
+/*
+ TODO: SERVER-23014 SYNC is gone but this can be a good check for equality
TEST(ConnectionString, EqualitySync) {
ConnectionString cs(ConnectionString::SYNC, "a,b,c", "");
@@ -45,5 +46,5 @@ TEST(ConnectionString, EqualitySync) {
ASSERT(!cs.sameLogicalEndpoint(ConnectionString(ConnectionString::SYNC, "d,a,b", "")));
}
-
+*/
} // namespace
diff --git a/src/mongo/client/connpool.cpp b/src/mongo/client/connpool.cpp
index 2a019db9787..4bdc263cf33 100644
--- a/src/mongo/client/connpool.cpp
+++ b/src/mongo/client/connpool.cpp
@@ -41,7 +41,6 @@
#include "mongo/client/connection_string.h"
#include "mongo/client/global_conn_pool.h"
#include "mongo/client/replica_set_monitor.h"
-#include "mongo/client/syncclusterconnection.h"
#include "mongo/executor/connection_pool_stats.h"
#include "mongo/util/exit.h"
#include "mongo/util/log.h"
@@ -474,10 +473,9 @@ void ScopedDbConnection::done() {
void ScopedDbConnection::_setSocketTimeout() {
if (!_conn)
return;
+
if (_conn->type() == ConnectionString::MASTER)
- ((DBClientConnection*)_conn)->setSoTimeout(_socketTimeout);
- else if (_conn->type() == ConnectionString::SYNC)
- ((SyncClusterConnection*)_conn)->setAllSoTimeouts(_socketTimeout);
+ static_cast<DBClientConnection*>(_conn)->setSoTimeout(_socketTimeout);
}
ScopedDbConnection::~ScopedDbConnection() {
diff --git a/src/mongo/client/dbclientcursor.cpp b/src/mongo/client/dbclientcursor.cpp
index 2e0a5bdade3..489f647a2ca 100644
--- a/src/mongo/client/dbclientcursor.cpp
+++ b/src/mongo/client/dbclientcursor.cpp
@@ -432,8 +432,7 @@ void DBClientCursor::attach(AScopedConnection* conn) {
verify(conn);
verify(conn->get());
- if (conn->get()->type() == ConnectionString::SET ||
- conn->get()->type() == ConnectionString::SYNC) {
+ if (conn->get()->type() == ConnectionString::SET) {
if (_lazyHost.size() > 0)
_scopedHost = _lazyHost;
else if (_client)
diff --git a/src/mongo/client/remote_command_targeter_factory_impl.cpp b/src/mongo/client/remote_command_targeter_factory_impl.cpp
index 338cde7c504..3c3b1974eac 100644
--- a/src/mongo/client/remote_command_targeter_factory_impl.cpp
+++ b/src/mongo/client/remote_command_targeter_factory_impl.cpp
@@ -54,7 +54,6 @@ std::unique_ptr<RemoteCommandTargeter> RemoteCommandTargeterFactoryImpl::create(
return stdx::make_unique<RemoteCommandTargeterRS>(connStr.getSetName(),
connStr.getServers());
case ConnectionString::INVALID:
- case ConnectionString::SYNC:
// These connections should never be seen
break;
}
diff --git a/src/mongo/client/syncclusterconnection.cpp b/src/mongo/client/syncclusterconnection.cpp
deleted file mode 100644
index 52ed06592ec..00000000000
--- a/src/mongo/client/syncclusterconnection.cpp
+++ /dev/null
@@ -1,689 +0,0 @@
-// syncclusterconnection.cpp
-/*
- * Copyright 2010 10gen Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License, version 3,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the GNU Affero General Public License in all respects
- * for all of the code used other than as permitted herein. If you modify
- * file(s) with this exception, you may extend this exception to your
- * version of the file(s), but you are not obligated to do so. If you do not
- * wish to do so, delete this exception statement from your version. If you
- * delete this exception statement from all source files in the program,
- * then also delete it in the license file.
- */
-
-
-#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kNetwork
-
-#include "mongo/platform/basic.h"
-
-#include "mongo/client/syncclusterconnection.h"
-
-#include "mongo/client/dbclientcursor.h"
-#include "mongo/client/dbclientinterface.h"
-#include "mongo/db/dbmessage.h"
-#include "mongo/db/namespace_string.h"
-#include "mongo/rpc/get_status_from_command_result.h"
-#include "mongo/util/log.h"
-
-namespace mongo {
-
-using std::unique_ptr;
-using std::endl;
-using std::list;
-using std::map;
-using std::string;
-using std::stringstream;
-using std::vector;
-
-namespace {
-SyncClusterConnection::ConnectionValidationHook connectionHook;
-} // namespace
-
-SyncClusterConnection::SyncClusterConnection(const list<HostAndPort>& L, double socketTimeout)
- : _socketTimeout(socketTimeout) {
- {
- stringstream s;
- int n = 0;
- for (list<HostAndPort>::const_iterator i = L.begin(); i != L.end(); i++) {
- if (++n > 1)
- s << ',';
- s << i->toString();
- }
- _address = s.str();
- }
- for (list<HostAndPort>::const_iterator i = L.begin(); i != L.end(); i++)
- _connect(i->toString());
-}
-
-SyncClusterConnection::SyncClusterConnection(string commaSeparated, double socketTimeout)
- : _socketTimeout(socketTimeout) {
- _address = commaSeparated;
- string::size_type idx;
- while ((idx = commaSeparated.find(',')) != string::npos) {
- string h = commaSeparated.substr(0, idx);
- commaSeparated = commaSeparated.substr(idx + 1);
- _connect(h);
- }
- _connect(commaSeparated);
- uassert(8004, "SyncClusterConnection needs 3 servers", _conns.size() == 3);
-}
-
-SyncClusterConnection::SyncClusterConnection(const std::string& a,
- const std::string& b,
- const std::string& c,
- double socketTimeout)
- : _socketTimeout(socketTimeout) {
- _address = a + "," + b + "," + c;
- // connect to all even if not working
- _connect(a);
- _connect(b);
- _connect(c);
-}
-
-SyncClusterConnection::SyncClusterConnection(SyncClusterConnection& prev, double socketTimeout)
- : _socketTimeout(socketTimeout) {
- verify(0);
-}
-
-SyncClusterConnection::~SyncClusterConnection() {
- for (size_t i = 0; i < _conns.size(); i++)
- delete _conns[i];
- _conns.clear();
-}
-
-void SyncClusterConnection::setConnectionValidationHook(ConnectionValidationHook hook) {
- connectionHook = std::move(hook);
-}
-
-bool SyncClusterConnection::prepare(string& errmsg) {
- _lastErrors.clear();
-
- bool ok = true;
- errmsg = "";
-
- for (size_t i = 0; i < _conns.size(); i++) {
- string singleErr;
- try {
- _conns[i]->simpleCommand("admin", NULL, "resetError");
- singleErr = _conns[i]->getLastError(true);
-
- if (singleErr.size() == 0)
- continue;
-
- } catch (const DBException& e) {
- if (e.getCode() == ErrorCodes::IncompatibleCatalogManager) {
- throw;
- }
- singleErr = e.toString();
- }
- ok = false;
- errmsg += " " + _conns[i]->toString() + ":" + singleErr;
- }
-
- return ok;
-}
-
-void SyncClusterConnection::_checkLast() {
- _lastErrors.clear();
- vector<string> errors;
-
- for (size_t i = 0; i < _conns.size(); i++) {
- BSONObj res;
- string err;
- try {
- if (!_conns[i]->runCommand("admin", BSON("getlasterror" << 1 << "fsync" << 1), res))
- err = "cmd failed: ";
- } catch (std::exception& e) {
- err += e.what();
- } catch (...) {
- err += "unknown failure";
- }
- _lastErrors.push_back(res.getOwned());
- errors.push_back(err);
- }
-
- verify(_lastErrors.size() == errors.size() && _lastErrors.size() == _conns.size());
-
- stringstream err;
- bool ok = true;
-
- for (size_t i = 0; i < _conns.size(); i++) {
- BSONObj res = _lastErrors[i];
- if (res["ok"].trueValue() &&
- (res["fsyncFiles"].numberInt() > 0 || res.hasElement("waited") ||
- res["syncMillis"].numberInt() >= 0))
- continue;
- ok = false;
- err << _conns[i]->toString() << ": " << res << " " << errors[i];
- }
-
- if (ok)
- return;
- throw UserException(8001, (string) "SyncClusterConnection write op failed: " + err.str());
-}
-
-BSONObj SyncClusterConnection::getLastErrorDetailed(bool fsync, bool j, int w, int wtimeout) {
- return getLastErrorDetailed("admin", fsync, j, w, wtimeout);
-}
-
-BSONObj SyncClusterConnection::getLastErrorDetailed(
- const std::string& db, bool fsync, bool j, int w, int wtimeout) {
- if (_lastErrors.size())
- return _lastErrors[0];
- return DBClientBase::getLastErrorDetailed(db, fsync, j, w, wtimeout);
-}
-
-void SyncClusterConnection::_connect(const std::string& hostStr) {
- log() << "SyncClusterConnection connecting to [" << hostStr << "]" << endl;
- const HostAndPort host(hostStr);
- DBClientConnection* c;
- if (connectionHook) {
- c = new DBClientConnection(
- true, // auto reconnect
- 0, // socket timeout
- [this, host](const executor::RemoteCommandResponse& isMasterReply) {
- return connectionHook(host, isMasterReply);
- });
- } else {
- c = new DBClientConnection(true);
- }
-
- c->setRequestMetadataWriter(getRequestMetadataWriter());
- c->setReplyMetadataReader(getReplyMetadataReader());
- c->setSoTimeout(_socketTimeout);
- Status status = c->connect(host);
- if (!status.isOK()) {
- log() << "SyncClusterConnection connect fail to: " << hostStr << causedBy(status);
- if (status == ErrorCodes::IncompatibleCatalogManager) {
- // Make sure to propagate IncompatibleCatalogManager errors to trigger catalog manager
- // swapping.
- uassertStatusOK(status);
- }
- }
- _connAddresses.push_back(hostStr);
- _conns.push_back(c);
-}
-
-bool SyncClusterConnection::runCommand(const std::string& dbname,
- const BSONObj& cmd,
- BSONObj& info,
- int options) {
- std::string ns = dbname + ".$cmd";
- BSONObj interposedCmd = cmd;
-
- if (getRequestMetadataWriter()) {
- // We have a metadata writer. We need to upconvert the metadata, write to it,
- // Then downconvert it again. This unfortunate, but this code is going to be
- // removed anyway as part of CSRS.
-
- BSONObj upconvertedCommand;
- BSONObj upconvertedMetadata;
-
- std::tie(upconvertedCommand, upconvertedMetadata) =
- uassertStatusOK(rpc::upconvertRequestMetadata(cmd, options));
-
- BSONObjBuilder metadataBob;
- metadataBob.appendElements(upconvertedMetadata);
-
- uassertStatusOK(getRequestMetadataWriter()(&metadataBob, getServerAddress()));
-
- std::tie(interposedCmd, options) = uassertStatusOK(
- rpc::downconvertRequestMetadata(std::move(upconvertedCommand), metadataBob.done()));
- }
-
- BSONObj legacyResult = findOne(ns, Query(interposedCmd), 0, options);
-
- BSONObj upconvertedMetadata;
- BSONObj upconvertedReply;
-
- std::tie(upconvertedReply, upconvertedMetadata) =
- uassertStatusOK(rpc::upconvertReplyMetadata(legacyResult));
-
- if (getReplyMetadataReader()) {
- // TODO: what does getServerAddress() actually mean here as this connection
- // represents a connection to 1 or 3 config servers...
- uassertStatusOK(getReplyMetadataReader()(upconvertedReply, getServerAddress()));
- }
-
- info = upconvertedReply;
-
- return isOk(info);
-}
-
-BSONObj SyncClusterConnection::findOne(const string& ns,
- const Query& query,
- const BSONObj* fieldsToReturn,
- int queryOptions) {
- if (ns.find(".$cmd") != string::npos) {
- string cmdName = query.obj.firstElementFieldName();
-
- int lockType = _lockType(cmdName);
-
- if (lockType > 0) { // write $cmd
- string errmsg;
- if (!prepare(errmsg))
- throw UserException(ErrorCodes::PrepareConfigsFailed,
- (string) "SyncClusterConnection::findOne prepare failed: " +
- errmsg);
-
- vector<BSONObj> all;
- for (size_t i = 0; i < _conns.size(); i++) {
- all.push_back(_conns[i]->findOne(ns, query, 0, queryOptions).getOwned());
- }
-
- _checkLast();
-
- for (size_t i = 0; i < all.size(); i++) {
- Status status = getStatusFromCommandResult(all[i]);
- if (status.isOK()) {
- continue;
- }
-
- stringstream ss;
- ss << "write $cmd failed on a node: " << status.toString();
- ss << " " << _conns[i]->toString();
- ss << " ns: " << ns;
- ss << " cmd: " << query.toString();
- throw UserException(status.code(), ss.str());
- }
-
- return all[0];
- }
- }
-
- return DBClientBase::findOne(ns, query, fieldsToReturn, queryOptions);
-}
-
-void SyncClusterConnection::_auth(const BSONObj& params) {
- // A SCC is authenticated if any connection has been authenticated
- // Credentials are stored in the auto-reconnect connections.
-
- bool authedOnce = false;
- vector<string> errors;
-
- for (vector<DBClientConnection*>::iterator it = _conns.begin(); it < _conns.end(); ++it) {
- massert(15848, "sync cluster of sync clusters?", (*it)->type() != ConnectionString::SYNC);
-
- // Authenticate or collect the error message
- string lastErrmsg;
- bool authed;
- try {
- // Auth errors can manifest either as exceptions or as false results
- // TODO: Make this better
- (*it)->auth(params);
- authed = true;
- } catch (const DBException& e) {
- // auth will be retried on reconnect
- lastErrmsg = e.what();
- authed = false;
- }
-
- if (!authed) {
- // Since we're using auto-reconnect connections, we're sure the auth info has been
- // stored if needed for later
-
- lastErrmsg = str::stream() << "auth error on " << (*it)->getServerAddress()
- << causedBy(lastErrmsg);
-
- LOG(1) << lastErrmsg << endl;
- errors.push_back(lastErrmsg);
- }
-
- authedOnce = authedOnce || authed;
- }
-
- if (authedOnce)
- return;
-
- // Assemble the error message
- str::stream errStream;
- for (vector<string>::iterator it = errors.begin(); it != errors.end(); ++it) {
- if (it != errors.begin())
- errStream << " ::and:: ";
- errStream << *it;
- }
-
- uasserted(ErrorCodes::AuthenticationFailed, errStream);
-}
-
-// TODO: logout is required for use of this class outside of a cluster environment
-
-unique_ptr<DBClientCursor> SyncClusterConnection::query(const string& ns,
- Query query,
- int nToReturn,
- int nToSkip,
- const BSONObj* fieldsToReturn,
- int queryOptions,
- int batchSize) {
- _lastErrors.clear();
- if (ns.find(".$cmd") != string::npos) {
- string cmdName = query.obj.firstElementFieldName();
- int lockType = _lockType(cmdName);
- uassert(13054,
- (string) "write $cmd not supported in SyncClusterConnection::query for:" + cmdName,
- lockType <= 0);
- }
-
- return _queryOnActive(ns, query, nToReturn, nToSkip, fieldsToReturn, queryOptions, batchSize);
-}
-
-bool SyncClusterConnection::_commandOnActive(const string& dbname,
- const BSONObj& cmd,
- BSONObj& info,
- int options) {
- unique_ptr<DBClientCursor> cursor = _queryOnActive(dbname + ".$cmd", cmd, 1, 0, 0, options, 0);
- if (cursor->more())
- info = cursor->next().copy();
- else
- info = BSONObj();
- return isOk(info);
-}
-
-void SyncClusterConnection::attachQueryHandler(QueryHandler* handler) {
- _customQueryHandler.reset(handler);
-}
-
-unique_ptr<DBClientCursor> SyncClusterConnection::_queryOnActive(const string& ns,
- Query query,
- int nToReturn,
- int nToSkip,
- const BSONObj* fieldsToReturn,
- int queryOptions,
- int batchSize) {
- if (_customQueryHandler && _customQueryHandler->canHandleQuery(ns, query)) {
- LOG(2) << "custom query handler used for query on " << ns << ": " << query.toString()
- << endl;
-
- return _customQueryHandler->handleQuery(
- _connAddresses, ns, query, nToReturn, nToSkip, fieldsToReturn, queryOptions, batchSize);
- }
-
- for (size_t i = 0; i < _conns.size(); i++) {
- try {
- unique_ptr<DBClientCursor> cursor = _conns[i]->query(
- ns, query, nToReturn, nToSkip, fieldsToReturn, queryOptions, batchSize);
- if (cursor.get())
- return cursor;
-
- log() << "query on " << ns << ": " << query.toString()
- << " failed to: " << _conns[i]->toString() << " no data" << endl;
- } catch (std::exception& e) {
- log() << "query on " << ns << ": " << query.toString()
- << " failed to: " << _conns[i]->toString() << " exception: " << e.what() << endl;
- } catch (...) {
- log() << "query on " << ns << ": " << query.toString()
- << " failed to: " << _conns[i]->toString() << " exception" << endl;
- }
- }
- throw UserException(ErrorCodes::HostUnreachable,
- str::stream()
- << "all servers down/unreachable when querying: " << _address);
-}
-
-unique_ptr<DBClientCursor> SyncClusterConnection::getMore(const string& ns,
- long long cursorId,
- int nToReturn,
- int options) {
- uassert(10022, "SyncClusterConnection::getMore not supported yet", 0);
- unique_ptr<DBClientCursor> c;
- return c;
-}
-
-void SyncClusterConnection::insert(const string& ns, BSONObj obj, int flags) {
- uassert(13119,
- str::stream() << "SyncClusterConnection::insert obj has to have an _id: " << obj,
- nsToCollectionSubstring(ns) == "system.indexes" || obj["_id"].type());
-
- string errmsg;
- if (!prepare(errmsg))
- throw UserException(8003,
- (string) "SyncClusterConnection::insert prepare failed: " + errmsg);
-
- for (size_t i = 0; i < _conns.size(); i++) {
- _conns[i]->insert(ns, obj, flags);
- }
-
- _checkLast();
-}
-
-void SyncClusterConnection::insert(const string& ns, const vector<BSONObj>& v, int flags) {
- if (v.size() == 1) {
- insert(ns, v[0], flags);
- return;
- }
-
- for (vector<BSONObj>::const_iterator it = v.begin(); it != v.end(); ++it) {
- BSONObj obj = *it;
- if (obj["_id"].type() == EOO) {
- string assertMsg = "SyncClusterConnection::insert (batched) obj misses an _id: ";
- uasserted(16743, assertMsg + obj.jsonString());
- }
- }
-
- // fsync all connections before starting the batch.
- string errmsg;
- if (!prepare(errmsg)) {
- string assertMsg = "SyncClusterConnection::insert (batched) prepare failed: ";
- throw UserException(16744, assertMsg + errmsg);
- }
-
- // We still want one getlasterror per document, even if they're batched.
- for (size_t i = 0; i < _conns.size(); i++) {
- for (vector<BSONObj>::const_iterator it = v.begin(); it != v.end(); ++it) {
- _conns[i]->insert(ns, *it, flags);
- _conns[i]->getLastErrorDetailed();
- }
- }
-
- // We issue a final getlasterror, but this time with an fsync.
- _checkLast();
-}
-
-void SyncClusterConnection::remove(const string& ns, Query query, int flags) {
- string errmsg;
- if (!prepare(errmsg))
- throw UserException(8020,
- (string) "SyncClusterConnection::remove prepare failed: " + errmsg);
-
- for (size_t i = 0; i < _conns.size(); i++) {
- _conns[i]->remove(ns, query, flags);
- }
-
- _checkLast();
-}
-
-void SyncClusterConnection::update(const string& ns, Query query, BSONObj obj, int flags) {
- if (flags & UpdateOption_Upsert) {
- uassert(
- 13120, "SyncClusterConnection::update upsert query needs _id", query.obj["_id"].type());
- }
-
- string errmsg;
- if (!prepare(errmsg)) {
- throw UserException(
- 8005, str::stream() << "SyncClusterConnection::update prepare failed: " << errmsg);
- }
-
- for (size_t i = 0; i < _conns.size(); i++) {
- _conns[i]->update(ns, query, obj, flags);
- }
-
- _checkLast();
- invariant(_lastErrors.size() > 1);
-
- const int a = _lastErrors[0]["n"].numberInt();
-
- for (unsigned i = 1; i < _lastErrors.size(); i++) {
- int b = _lastErrors[i]["n"].numberInt();
-
- if (a == b)
- continue;
-
- throw UpdateNotTheSame(8017,
- str::stream() << "update not consistent "
- << " ns: " << ns << " query: " << query.toString()
- << " update: " << obj << " gle1: " << _lastErrors[0]
- << " gle2: " << _lastErrors[i],
- _connAddresses,
- _lastErrors);
- }
-}
-
-string SyncClusterConnection::_toString() const {
- stringstream ss;
- ss << "SyncClusterConnection ";
- ss << " [";
- for (size_t i = 0; i < _conns.size(); i++) {
- if (i != 0)
- ss << ",";
- if (_conns[i]) {
- ss << _conns[i]->toString();
- } else {
- ss << "(no conn)";
- }
- }
- ss << "]";
- return ss.str();
-}
-
-bool SyncClusterConnection::call(Message& toSend,
- Message& response,
- bool assertOk,
- string* actualServer) {
- uassert(8006,
- "SyncClusterConnection::call can only be used directly for dbQuery",
- toSend.operation() == dbQuery);
-
- DbMessage d(toSend);
- uassert(8007, "SyncClusterConnection::call can't handle $cmd", strstr(d.getns(), "$cmd") == 0);
-
- for (size_t i = 0; i < _conns.size(); i++) {
- try {
- bool ok = _conns[i]->call(toSend, response, assertOk, nullptr);
- if (ok) {
- if (actualServer)
- *actualServer = _connAddresses[i];
- return ok;
- }
- log() << "call failed to: " << _conns[i]->toString() << " no data" << endl;
- } catch (...) {
- log() << "call failed to: " << _conns[i]->toString() << " exception" << endl;
- }
- }
- throw UserException(8008, str::stream() << "all servers down/unreachable: " << _address);
-}
-
-void SyncClusterConnection::say(Message& toSend, bool isRetry, string* actualServer) {
- string errmsg;
- if (!prepare(errmsg))
- throw UserException(13397, (string) "SyncClusterConnection::say prepare failed: " + errmsg);
-
- for (size_t i = 0; i < _conns.size(); i++) {
- _conns[i]->say(toSend);
- }
-
- // TODO: should we set actualServer??
-
- _checkLast();
-}
-
-int SyncClusterConnection::_lockType(const string& name) {
- {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- map<string, int>::iterator i = _lockTypes.find(name);
- if (i != _lockTypes.end())
- return i->second;
- }
-
- BSONObj info;
- uassert(13053,
- str::stream() << "help failed: " << info,
- _commandOnActive("admin",
- BSON(name << "1"
- << "help" << 1),
- info));
-
- int lockType = info["lockType"].numberInt();
-
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- _lockTypes[name] = lockType;
- return lockType;
-}
-
-void SyncClusterConnection::killCursor(long long cursorID) {
- // should never need to do this
- verify(0);
-}
-
-// A SCC should be reused only if all the existing connections haven't been broken in the
-// background.
-// Note: an SCC may have missing connections if a config server is temporarily offline,
-// but reading from the others is still allowed.
-bool SyncClusterConnection::isStillConnected() {
- for (size_t i = 0; i < _conns.size(); i++) {
- if (_conns[i] && !_conns[i]->isStillConnected())
- return false;
- }
- return true;
-}
-
-int SyncClusterConnection::getMinWireVersion() {
- int minVersion = 0;
- for (const auto& host : _conns) {
- minVersion = std::max(minVersion, host->getMinWireVersion());
- }
- return minVersion;
-}
-
-int SyncClusterConnection::getMaxWireVersion() {
- int maxVersion = std::numeric_limits<int>::max();
- for (const auto& host : _conns) {
- maxVersion = std::min(maxVersion, host->getMaxWireVersion());
- }
- return maxVersion;
-}
-
-void SyncClusterConnection::setAllSoTimeouts(double socketTimeout) {
- _socketTimeout = socketTimeout;
- for (size_t i = 0; i < _conns.size(); i++)
-
- if (_conns[i])
- _conns[i]->setSoTimeout(socketTimeout);
-}
-
-void SyncClusterConnection::setRequestMetadataWriter(rpc::RequestMetadataWriter writer) {
- // Set the hooks in both our sub-connections and in ourselves.
- for (size_t i = 0; i < _conns.size(); ++i) {
- if (_conns[i]) {
- _conns[i]->setRequestMetadataWriter(writer);
- }
- }
- DBClientWithCommands::setRequestMetadataWriter(std::move(writer));
-}
-
-void SyncClusterConnection::setReplyMetadataReader(rpc::ReplyMetadataReader reader) {
- // Set the hooks in both our sub-connections and in ourselves.
- for (size_t i = 0; i < _conns.size(); ++i) {
- if (_conns[i]) {
- _conns[i]->setReplyMetadataReader(reader);
- }
- }
- DBClientWithCommands::setReplyMetadataReader(std::move(reader));
-}
-}
diff --git a/src/mongo/client/syncclusterconnection.h b/src/mongo/client/syncclusterconnection.h
deleted file mode 100644
index 40373d4ae19..00000000000
--- a/src/mongo/client/syncclusterconnection.h
+++ /dev/null
@@ -1,272 +0,0 @@
-// @file syncclusterconnection.h
-
-/*
- * Copyright 2010 10gen Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License, version 3,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the GNU Affero General Public License in all respects
- * for all of the code used other than as permitted herein. If you modify
- * file(s) with this exception, you may extend this exception to your
- * version of the file(s), but you are not obligated to do so. If you do not
- * wish to do so, delete this exception statement from your version. If you
- * delete this exception statement from all source files in the program,
- * then also delete it in the license file.
- */
-
-#pragma once
-
-
-#include "mongo/bson/bsonelement.h"
-#include "mongo/bson/bsonobj.h"
-#include "mongo/client/dbclientinterface.h"
-#include "mongo/stdx/mutex.h"
-#include "mongo/stdx/functional.h"
-#include "mongo/util/concurrency/mutex.h"
-
-namespace mongo {
-
-/**
- * This is a connection to a cluster of servers that operate as one
- * for super high durability.
- *
- * Write operations are two-phase. First, all nodes are asked to fsync. If successful
- * everywhere, the write is sent everywhere and then followed by an fsync. There is no
- * rollback if a problem occurs during the second phase. Naturally, with all these fsyncs,
- * these operations will be quite slow -- use sparingly.
- *
- * Read operations are sent to a single random node.
- *
- * The class checks if a command is read or write style, and sends to a single
- * node if a read lock command and to all in two phases with a write style command.
- */
-class SyncClusterConnection : public DBClientBase {
-public:
- using DBClientBase::query;
- using DBClientBase::update;
- using DBClientBase::remove;
-
- class QueryHandler;
-
- /**
- * Hook run on the response of an isMaster request. Run on the DBClientConnections that
- * SyncClusterConnection uses under the hood. Used to detect and signal when the catalog
- * manager needs swapping.
- */
- using ConnectionValidationHook = stdx::function<Status(
- const HostAndPort& host, const executor::RemoteCommandResponse& isMasterReply)>;
-
-
- /**
- * @param commaSeparated should be 3 hosts comma separated
- */
- SyncClusterConnection(const std::list<HostAndPort>&, double socketTimeout = 0);
- SyncClusterConnection(std::string commaSeparated, double socketTimeout = 0);
- SyncClusterConnection(const std::string& a,
- const std::string& b,
- const std::string& c,
- double socketTimeout = 0);
- ~SyncClusterConnection();
-
- /**
- * Sets the ConnectionValidationHook that will be used for the DBClientConnections created by
- * all future constructed SyncClusterConnections.
- */
- static void setConnectionValidationHook(ConnectionValidationHook hook);
-
- /**
- * @return true if all servers are up and ready for writes
- */
- bool prepare(std::string& errmsg);
-
- // --- from DBClientInterface
-
- virtual BSONObj findOne(const std::string& ns,
- const Query& query,
- const BSONObj* fieldsToReturn,
- int queryOptions);
-
- virtual std::unique_ptr<DBClientCursor> query(const std::string& ns,
- Query query,
- int nToReturn,
- int nToSkip,
- const BSONObj* fieldsToReturn,
- int queryOptions,
- int batchSize);
-
- virtual std::unique_ptr<DBClientCursor> getMore(const std::string& ns,
- long long cursorId,
- int nToReturn,
- int options);
-
- virtual void insert(const std::string& ns, BSONObj obj, int flags = 0);
-
- virtual void insert(const std::string& ns, const std::vector<BSONObj>& v, int flags = 0);
-
- virtual void remove(const std::string& ns, Query query, int flags);
-
- virtual void update(const std::string& ns, Query query, BSONObj obj, int flags);
-
- virtual bool call(Message& toSend, Message& response, bool assertOk, std::string* actualServer);
- virtual void say(Message& toSend, bool isRetry = false, std::string* actualServer = 0);
-
- virtual void killCursor(long long cursorID);
-
- virtual std::string getServerAddress() const {
- return _address;
- }
- virtual bool isFailed() const {
- return false;
- }
- virtual bool isStillConnected();
-
- int getMinWireVersion() final;
- int getMaxWireVersion() final;
-
- virtual std::string toString() const {
- return _toString();
- }
-
- virtual BSONObj getLastErrorDetailed(
- const std::string& db, bool fsync = false, bool j = false, int w = 0, int wtimeout = 0);
- virtual BSONObj getLastErrorDetailed(bool fsync = false,
- bool j = false,
- int w = 0,
- int wtimeout = 0);
-
- virtual ConnectionString::ConnectionType type() const {
- return ConnectionString::SYNC;
- }
-
- void setAllSoTimeouts(double socketTimeout);
- double getSoTimeout() const {
- return _socketTimeout;
- }
-
-
- virtual bool lazySupported() const {
- return false;
- }
-
- // This override of runCommand intentionally calls findOne to construct a legacy
- // OP_QUERY command. The reason for this is that delicate logic for targeting/locking
- // config servers is in SyncClusterConnection::findOne, and refactoring that logic
- // is both risky and of dubious value as we move to config server replica sets (CSRS).
- bool runCommand(const std::string& dbname,
- const BSONObj& cmd,
- BSONObj& info,
- int options) final;
-
- void setRequestMetadataWriter(rpc::RequestMetadataWriter writer) final;
- void setReplyMetadataReader(rpc::ReplyMetadataReader reader) final;
-
- /**
- * Allow custom query processing through an external (e.g. mongos-only) service.
- *
- * Takes ownership of attached handler.
- */
- void attachQueryHandler(QueryHandler* handler);
-
-protected:
- virtual void _auth(const BSONObj& params);
-
-private:
- SyncClusterConnection(SyncClusterConnection& prev, double socketTimeout = 0);
- std::string _toString() const;
- bool _commandOnActive(const std::string& dbname,
- const BSONObj& cmd,
- BSONObj& info,
- int options = 0);
- std::unique_ptr<DBClientCursor> _queryOnActive(const std::string& ns,
- Query query,
- int nToReturn,
- int nToSkip,
- const BSONObj* fieldsToReturn,
- int queryOptions,
- int batchSize);
- int _lockType(const std::string& name);
- void _checkLast();
- void _connect(const std::string& host);
-
- std::string _address;
- std::vector<std::string> _connAddresses;
- std::vector<DBClientConnection*> _conns;
-
- std::vector<BSONObj> _lastErrors;
-
- // Optionally attached by user
- std::unique_ptr<QueryHandler> _customQueryHandler;
-
- stdx::mutex _mutex;
- std::map<std::string, int> _lockTypes;
- // End mutex
-
- double _socketTimeout;
-};
-
-/**
- * Interface for custom query processing for the SCC.
- * Allows plugging different host query behaviors for different types of queries.
- */
-class SyncClusterConnection::QueryHandler {
-public:
- virtual ~QueryHandler(){};
-
- /**
- * Returns true if the query can be processed using this handler.
- */
- virtual bool canHandleQuery(const std::string& ns, Query query) = 0;
-
- /**
- * Returns a cursor on one of the hosts with the desired results for the query.
- * May throw or return an empty unique_ptr on failure.
- */
- virtual std::unique_ptr<DBClientCursor> handleQuery(const std::vector<std::string>& hosts,
- const std::string& ns,
- Query query,
- int nToReturn,
- int nToSkip,
- const BSONObj* fieldsToReturn,
- int queryOptions,
- int batchSize) = 0;
-};
-
-class UpdateNotTheSame : public UserException {
-public:
- UpdateNotTheSame(int code,
- const std::string& msg,
- const std::vector<std::string>& addrs,
- const std::vector<BSONObj>& lastErrors)
- : UserException(code, msg), _addrs(addrs), _lastErrors(lastErrors) {
- verify(_addrs.size() == _lastErrors.size());
- }
-
- virtual ~UpdateNotTheSame() throw() {}
-
- unsigned size() const {
- return _addrs.size();
- }
-
- std::pair<std::string, BSONObj> operator[](unsigned i) const {
- return std::make_pair(_addrs[i], _lastErrors[i]);
- }
-
-private:
- std::vector<std::string> _addrs;
- std::vector<BSONObj> _lastErrors;
-};
-};