diff options
author | Misha Tyulenev <misha@mongodb.com> | 2016-03-10 17:30:44 -0500 |
---|---|---|
committer | Misha Tyulenev <misha@mongodb.com> | 2016-03-10 18:18:38 -0500 |
commit | eaef6254d3bcb27657de670ec1b9c797965e7c82 (patch) | |
tree | 09d29bd80943025732ebd3139deeea0191d8e171 /src | |
parent | 41a738c7629cd52b357b0bce6650182219ae9088 (diff) | |
download | mongo-eaef6254d3bcb27657de670ec1b9c797965e7c82.tar.gz |
SERVER-22320 remove SYNC option and SyncClusterConnection
Diffstat (limited to 'src')
26 files changed, 119 insertions, 1612 deletions
diff --git a/src/mongo/client/SConscript b/src/mongo/client/SConscript index 7aeab208ca4..71a65c735a2 100644 --- a/src/mongo/client/SConscript +++ b/src/mongo/client/SConscript @@ -131,7 +131,6 @@ env.Library( 'global_conn_pool.cpp', 'replica_set_monitor.cpp', 'replica_set_monitor_manager.cpp', - 'syncclusterconnection.cpp', ], LIBDEPS=[ '$BUILD_DIR/mongo/db/auth/authcommon', diff --git a/src/mongo/client/connection_string.cpp b/src/mongo/client/connection_string.cpp index 69b3f952039..18ee8ea1234 100644 --- a/src/mongo/client/connection_string.cpp +++ b/src/mongo/client/connection_string.cpp @@ -47,6 +47,7 @@ ConnectionString::ConnectionString(StringData setName, std::vector<HostAndPort> _finishInit(); } +// TODO: unify c-tors ConnectionString::ConnectionString(ConnectionType type, const std::string& s, const std::string& setName) { @@ -63,18 +64,9 @@ ConnectionString::ConnectionString(ConnectionType type, _finishInit(); } -ConnectionString::ConnectionString(const std::string& s, ConnectionType favoredMultipleType) { +ConnectionString::ConnectionString(const std::string& s, ConnectionType connType) + : _type(connType) { _fillServers(s); - - if (_type != INVALID) { - // set already - } else if (_servers.size() == 1) { - _type = MASTER; - } else { - _type = favoredMultipleType; - verify(_type == SET || _type == SYNC); - } - _finishInit(); } @@ -83,6 +75,7 @@ ConnectionString ConnectionString::forReplicaSet(StringData setName, return ConnectionString(setName, std::move(servers)); } +// TODO: rewrite parsing make it more reliable void ConnectionString::_fillServers(std::string s) { // // Custom-handled servers/replica sets start with '$' @@ -90,34 +83,38 @@ void ConnectionString::_fillServers(std::string s) { // (also disallows $replicaSetName hosts) // - if (s.find('$') == 0) + if (s.find('$') == 0) { _type = CUSTOM; + } - { - std::string::size_type idx = s.find('/'); - if (idx != std::string::npos) { - _setName = s.substr(0, idx); - s = s.substr(idx + 1); - if (_type != CUSTOM) - _type = SET; - } + std::string::size_type idx = s.find('/'); + if (idx != std::string::npos) { + _setName = s.substr(0, idx); + s = s.substr(idx + 1); + if (_type != CUSTOM) + _type = SET; } - std::string::size_type idx; while ((idx = s.find(',')) != std::string::npos) { _servers.push_back(HostAndPort(s.substr(0, idx))); s = s.substr(idx + 1); } + _servers.push_back(HostAndPort(s)); + + if (_servers.size() == 1 && _type == INVALID) { + _type = MASTER; + } } void ConnectionString::_finishInit() { switch (_type) { case MASTER: + verify(_setName.empty()); verify(_servers.size() == 1); break; case SET: - verify(_setName.size()); + verify(!_setName.empty()); verify(_servers.size() >= 1); // 1 is ok since we can derive break; default: @@ -149,6 +146,7 @@ void ConnectionString::_finishInit() { _string = ss.str(); } +// TODO: SERVER-23014 bool ConnectionString::sameLogicalEndpoint(const ConnectionString& other) const { if (_type != other._type) { return false; @@ -161,26 +159,6 @@ bool ConnectionString::sameLogicalEndpoint(const ConnectionString& other) const return _servers[0] == other._servers[0]; case SET: return _setName == other._setName; - case SYNC: - // The servers all have to be the same in each, but not in the same order. - if (_servers.size() != other._servers.size()) { - return false; - } - - for (unsigned i = 0; i < _servers.size(); i++) { - bool found = false; - for (unsigned j = 0; j < other._servers.size(); j++) { - if (_servers[i] == other._servers[j]) { - found = true; - break; - } - } - - if (!found) - return false; - } - - return true; case CUSTOM: return _string == other._string; } @@ -209,9 +187,11 @@ StatusWith<ConnectionString> ConnectionString::parse(const std::string& url) { return ConnectionString(singleHost); } - // Sharding config server if (numCommas == 2) { - return ConnectionString(SYNC, url, ""); + return Status(ErrorCodes::FailedToParse, + str::stream() << "mirrored config server connections are not supported; for " + "config server replica sets be sure to use the replica set " + "connection string"); } return Status(ErrorCodes::FailedToParse, str::stream() << "invalid url [" << url << "]"); @@ -225,8 +205,6 @@ std::string ConnectionString::typeToString(ConnectionType type) { return "master"; case SET: return "set"; - case SYNC: - return "sync"; case CUSTOM: return "custom"; } diff --git a/src/mongo/client/connection_string.h b/src/mongo/client/connection_string.h index 83e20906ed3..fbb29a829f6 100644 --- a/src/mongo/client/connection_string.h +++ b/src/mongo/client/connection_string.h @@ -47,10 +47,6 @@ class DBClientBase; * server * server:port * foo/server:port,server:port SET - * server,server,server SYNC - * Warning - you usually don't want "SYNC", it's used - * for some special things such as sharding config servers. - * See syncclusterconnection.h for more info. * * Typical use: * @@ -60,7 +56,7 @@ class DBClientBase; */ class ConnectionString { public: - enum ConnectionType { INVALID, MASTER, SET, SYNC, CUSTOM }; + enum ConnectionType { INVALID, MASTER, SET, CUSTOM }; ConnectionString() = default; @@ -86,7 +82,7 @@ public: std::vector<HostAndPort> servers, const std::string& setName); - ConnectionString(const std::string& s, ConnectionType favoredMultipleType); + ConnectionString(const std::string& s, ConnectionType connType); bool isValid() const { return _type != INVALID; diff --git a/src/mongo/client/connection_string_connect.cpp b/src/mongo/client/connection_string_connect.cpp index efca34792e6..3a8e0287656 100644 --- a/src/mongo/client/connection_string_connect.cpp +++ b/src/mongo/client/connection_string_connect.cpp @@ -36,7 +36,6 @@ #include "mongo/client/dbclient_rs.h" #include "mongo/client/dbclientinterface.h" -#include "mongo/client/syncclusterconnection.h" #include "mongo/stdx/memory.h" #include "mongo/util/assert_util.h" #include "mongo/util/log.h" @@ -69,15 +68,6 @@ DBClientBase* ConnectionString::connect(std::string& errmsg, double socketTimeou return set.release(); } - case SYNC: { - // TODO , don't copy - std::list<HostAndPort> l; - for (unsigned i = 0; i < _servers.size(); i++) - l.push_back(_servers[i]); - SyncClusterConnection* c = new SyncClusterConnection(l, socketTimeout); - return c; - } - case CUSTOM: { // Lock in case other things are modifying this at the same time stdx::lock_guard<stdx::mutex> lk(_connectHookMutex); diff --git a/src/mongo/client/connection_string_test.cpp b/src/mongo/client/connection_string_test.cpp index 4a1dfd81384..655ff17c207 100644 --- a/src/mongo/client/connection_string_test.cpp +++ b/src/mongo/client/connection_string_test.cpp @@ -35,7 +35,8 @@ namespace { using namespace mongo; - +/* + TODO: SERVER-23014 SYNC is gone but this can be a good check for equality TEST(ConnectionString, EqualitySync) { ConnectionString cs(ConnectionString::SYNC, "a,b,c", ""); @@ -45,5 +46,5 @@ TEST(ConnectionString, EqualitySync) { ASSERT(!cs.sameLogicalEndpoint(ConnectionString(ConnectionString::SYNC, "d,a,b", ""))); } - +*/ } // namespace diff --git a/src/mongo/client/connpool.cpp b/src/mongo/client/connpool.cpp index 2a019db9787..4bdc263cf33 100644 --- a/src/mongo/client/connpool.cpp +++ b/src/mongo/client/connpool.cpp @@ -41,7 +41,6 @@ #include "mongo/client/connection_string.h" #include "mongo/client/global_conn_pool.h" #include "mongo/client/replica_set_monitor.h" -#include "mongo/client/syncclusterconnection.h" #include "mongo/executor/connection_pool_stats.h" #include "mongo/util/exit.h" #include "mongo/util/log.h" @@ -474,10 +473,9 @@ void ScopedDbConnection::done() { void ScopedDbConnection::_setSocketTimeout() { if (!_conn) return; + if (_conn->type() == ConnectionString::MASTER) - ((DBClientConnection*)_conn)->setSoTimeout(_socketTimeout); - else if (_conn->type() == ConnectionString::SYNC) - ((SyncClusterConnection*)_conn)->setAllSoTimeouts(_socketTimeout); + static_cast<DBClientConnection*>(_conn)->setSoTimeout(_socketTimeout); } ScopedDbConnection::~ScopedDbConnection() { diff --git a/src/mongo/client/dbclientcursor.cpp b/src/mongo/client/dbclientcursor.cpp index 2e0a5bdade3..489f647a2ca 100644 --- a/src/mongo/client/dbclientcursor.cpp +++ b/src/mongo/client/dbclientcursor.cpp @@ -432,8 +432,7 @@ void DBClientCursor::attach(AScopedConnection* conn) { verify(conn); verify(conn->get()); - if (conn->get()->type() == ConnectionString::SET || - conn->get()->type() == ConnectionString::SYNC) { + if (conn->get()->type() == ConnectionString::SET) { if (_lazyHost.size() > 0) _scopedHost = _lazyHost; else if (_client) diff --git a/src/mongo/client/remote_command_targeter_factory_impl.cpp b/src/mongo/client/remote_command_targeter_factory_impl.cpp index 338cde7c504..3c3b1974eac 100644 --- a/src/mongo/client/remote_command_targeter_factory_impl.cpp +++ b/src/mongo/client/remote_command_targeter_factory_impl.cpp @@ -54,7 +54,6 @@ std::unique_ptr<RemoteCommandTargeter> RemoteCommandTargeterFactoryImpl::create( return stdx::make_unique<RemoteCommandTargeterRS>(connStr.getSetName(), connStr.getServers()); case ConnectionString::INVALID: - case ConnectionString::SYNC: // These connections should never be seen break; } diff --git a/src/mongo/client/syncclusterconnection.cpp b/src/mongo/client/syncclusterconnection.cpp deleted file mode 100644 index 52ed06592ec..00000000000 --- a/src/mongo/client/syncclusterconnection.cpp +++ /dev/null @@ -1,689 +0,0 @@ -// syncclusterconnection.cpp -/* - * Copyright 2010 10gen Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects - * for all of the code used other than as permitted herein. If you modify - * file(s) with this exception, you may extend this exception to your - * version of the file(s), but you are not obligated to do so. If you do not - * wish to do so, delete this exception statement from your version. If you - * delete this exception statement from all source files in the program, - * then also delete it in the license file. - */ - - -#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kNetwork - -#include "mongo/platform/basic.h" - -#include "mongo/client/syncclusterconnection.h" - -#include "mongo/client/dbclientcursor.h" -#include "mongo/client/dbclientinterface.h" -#include "mongo/db/dbmessage.h" -#include "mongo/db/namespace_string.h" -#include "mongo/rpc/get_status_from_command_result.h" -#include "mongo/util/log.h" - -namespace mongo { - -using std::unique_ptr; -using std::endl; -using std::list; -using std::map; -using std::string; -using std::stringstream; -using std::vector; - -namespace { -SyncClusterConnection::ConnectionValidationHook connectionHook; -} // namespace - -SyncClusterConnection::SyncClusterConnection(const list<HostAndPort>& L, double socketTimeout) - : _socketTimeout(socketTimeout) { - { - stringstream s; - int n = 0; - for (list<HostAndPort>::const_iterator i = L.begin(); i != L.end(); i++) { - if (++n > 1) - s << ','; - s << i->toString(); - } - _address = s.str(); - } - for (list<HostAndPort>::const_iterator i = L.begin(); i != L.end(); i++) - _connect(i->toString()); -} - -SyncClusterConnection::SyncClusterConnection(string commaSeparated, double socketTimeout) - : _socketTimeout(socketTimeout) { - _address = commaSeparated; - string::size_type idx; - while ((idx = commaSeparated.find(',')) != string::npos) { - string h = commaSeparated.substr(0, idx); - commaSeparated = commaSeparated.substr(idx + 1); - _connect(h); - } - _connect(commaSeparated); - uassert(8004, "SyncClusterConnection needs 3 servers", _conns.size() == 3); -} - -SyncClusterConnection::SyncClusterConnection(const std::string& a, - const std::string& b, - const std::string& c, - double socketTimeout) - : _socketTimeout(socketTimeout) { - _address = a + "," + b + "," + c; - // connect to all even if not working - _connect(a); - _connect(b); - _connect(c); -} - -SyncClusterConnection::SyncClusterConnection(SyncClusterConnection& prev, double socketTimeout) - : _socketTimeout(socketTimeout) { - verify(0); -} - -SyncClusterConnection::~SyncClusterConnection() { - for (size_t i = 0; i < _conns.size(); i++) - delete _conns[i]; - _conns.clear(); -} - -void SyncClusterConnection::setConnectionValidationHook(ConnectionValidationHook hook) { - connectionHook = std::move(hook); -} - -bool SyncClusterConnection::prepare(string& errmsg) { - _lastErrors.clear(); - - bool ok = true; - errmsg = ""; - - for (size_t i = 0; i < _conns.size(); i++) { - string singleErr; - try { - _conns[i]->simpleCommand("admin", NULL, "resetError"); - singleErr = _conns[i]->getLastError(true); - - if (singleErr.size() == 0) - continue; - - } catch (const DBException& e) { - if (e.getCode() == ErrorCodes::IncompatibleCatalogManager) { - throw; - } - singleErr = e.toString(); - } - ok = false; - errmsg += " " + _conns[i]->toString() + ":" + singleErr; - } - - return ok; -} - -void SyncClusterConnection::_checkLast() { - _lastErrors.clear(); - vector<string> errors; - - for (size_t i = 0; i < _conns.size(); i++) { - BSONObj res; - string err; - try { - if (!_conns[i]->runCommand("admin", BSON("getlasterror" << 1 << "fsync" << 1), res)) - err = "cmd failed: "; - } catch (std::exception& e) { - err += e.what(); - } catch (...) { - err += "unknown failure"; - } - _lastErrors.push_back(res.getOwned()); - errors.push_back(err); - } - - verify(_lastErrors.size() == errors.size() && _lastErrors.size() == _conns.size()); - - stringstream err; - bool ok = true; - - for (size_t i = 0; i < _conns.size(); i++) { - BSONObj res = _lastErrors[i]; - if (res["ok"].trueValue() && - (res["fsyncFiles"].numberInt() > 0 || res.hasElement("waited") || - res["syncMillis"].numberInt() >= 0)) - continue; - ok = false; - err << _conns[i]->toString() << ": " << res << " " << errors[i]; - } - - if (ok) - return; - throw UserException(8001, (string) "SyncClusterConnection write op failed: " + err.str()); -} - -BSONObj SyncClusterConnection::getLastErrorDetailed(bool fsync, bool j, int w, int wtimeout) { - return getLastErrorDetailed("admin", fsync, j, w, wtimeout); -} - -BSONObj SyncClusterConnection::getLastErrorDetailed( - const std::string& db, bool fsync, bool j, int w, int wtimeout) { - if (_lastErrors.size()) - return _lastErrors[0]; - return DBClientBase::getLastErrorDetailed(db, fsync, j, w, wtimeout); -} - -void SyncClusterConnection::_connect(const std::string& hostStr) { - log() << "SyncClusterConnection connecting to [" << hostStr << "]" << endl; - const HostAndPort host(hostStr); - DBClientConnection* c; - if (connectionHook) { - c = new DBClientConnection( - true, // auto reconnect - 0, // socket timeout - [this, host](const executor::RemoteCommandResponse& isMasterReply) { - return connectionHook(host, isMasterReply); - }); - } else { - c = new DBClientConnection(true); - } - - c->setRequestMetadataWriter(getRequestMetadataWriter()); - c->setReplyMetadataReader(getReplyMetadataReader()); - c->setSoTimeout(_socketTimeout); - Status status = c->connect(host); - if (!status.isOK()) { - log() << "SyncClusterConnection connect fail to: " << hostStr << causedBy(status); - if (status == ErrorCodes::IncompatibleCatalogManager) { - // Make sure to propagate IncompatibleCatalogManager errors to trigger catalog manager - // swapping. - uassertStatusOK(status); - } - } - _connAddresses.push_back(hostStr); - _conns.push_back(c); -} - -bool SyncClusterConnection::runCommand(const std::string& dbname, - const BSONObj& cmd, - BSONObj& info, - int options) { - std::string ns = dbname + ".$cmd"; - BSONObj interposedCmd = cmd; - - if (getRequestMetadataWriter()) { - // We have a metadata writer. We need to upconvert the metadata, write to it, - // Then downconvert it again. This unfortunate, but this code is going to be - // removed anyway as part of CSRS. - - BSONObj upconvertedCommand; - BSONObj upconvertedMetadata; - - std::tie(upconvertedCommand, upconvertedMetadata) = - uassertStatusOK(rpc::upconvertRequestMetadata(cmd, options)); - - BSONObjBuilder metadataBob; - metadataBob.appendElements(upconvertedMetadata); - - uassertStatusOK(getRequestMetadataWriter()(&metadataBob, getServerAddress())); - - std::tie(interposedCmd, options) = uassertStatusOK( - rpc::downconvertRequestMetadata(std::move(upconvertedCommand), metadataBob.done())); - } - - BSONObj legacyResult = findOne(ns, Query(interposedCmd), 0, options); - - BSONObj upconvertedMetadata; - BSONObj upconvertedReply; - - std::tie(upconvertedReply, upconvertedMetadata) = - uassertStatusOK(rpc::upconvertReplyMetadata(legacyResult)); - - if (getReplyMetadataReader()) { - // TODO: what does getServerAddress() actually mean here as this connection - // represents a connection to 1 or 3 config servers... - uassertStatusOK(getReplyMetadataReader()(upconvertedReply, getServerAddress())); - } - - info = upconvertedReply; - - return isOk(info); -} - -BSONObj SyncClusterConnection::findOne(const string& ns, - const Query& query, - const BSONObj* fieldsToReturn, - int queryOptions) { - if (ns.find(".$cmd") != string::npos) { - string cmdName = query.obj.firstElementFieldName(); - - int lockType = _lockType(cmdName); - - if (lockType > 0) { // write $cmd - string errmsg; - if (!prepare(errmsg)) - throw UserException(ErrorCodes::PrepareConfigsFailed, - (string) "SyncClusterConnection::findOne prepare failed: " + - errmsg); - - vector<BSONObj> all; - for (size_t i = 0; i < _conns.size(); i++) { - all.push_back(_conns[i]->findOne(ns, query, 0, queryOptions).getOwned()); - } - - _checkLast(); - - for (size_t i = 0; i < all.size(); i++) { - Status status = getStatusFromCommandResult(all[i]); - if (status.isOK()) { - continue; - } - - stringstream ss; - ss << "write $cmd failed on a node: " << status.toString(); - ss << " " << _conns[i]->toString(); - ss << " ns: " << ns; - ss << " cmd: " << query.toString(); - throw UserException(status.code(), ss.str()); - } - - return all[0]; - } - } - - return DBClientBase::findOne(ns, query, fieldsToReturn, queryOptions); -} - -void SyncClusterConnection::_auth(const BSONObj& params) { - // A SCC is authenticated if any connection has been authenticated - // Credentials are stored in the auto-reconnect connections. - - bool authedOnce = false; - vector<string> errors; - - for (vector<DBClientConnection*>::iterator it = _conns.begin(); it < _conns.end(); ++it) { - massert(15848, "sync cluster of sync clusters?", (*it)->type() != ConnectionString::SYNC); - - // Authenticate or collect the error message - string lastErrmsg; - bool authed; - try { - // Auth errors can manifest either as exceptions or as false results - // TODO: Make this better - (*it)->auth(params); - authed = true; - } catch (const DBException& e) { - // auth will be retried on reconnect - lastErrmsg = e.what(); - authed = false; - } - - if (!authed) { - // Since we're using auto-reconnect connections, we're sure the auth info has been - // stored if needed for later - - lastErrmsg = str::stream() << "auth error on " << (*it)->getServerAddress() - << causedBy(lastErrmsg); - - LOG(1) << lastErrmsg << endl; - errors.push_back(lastErrmsg); - } - - authedOnce = authedOnce || authed; - } - - if (authedOnce) - return; - - // Assemble the error message - str::stream errStream; - for (vector<string>::iterator it = errors.begin(); it != errors.end(); ++it) { - if (it != errors.begin()) - errStream << " ::and:: "; - errStream << *it; - } - - uasserted(ErrorCodes::AuthenticationFailed, errStream); -} - -// TODO: logout is required for use of this class outside of a cluster environment - -unique_ptr<DBClientCursor> SyncClusterConnection::query(const string& ns, - Query query, - int nToReturn, - int nToSkip, - const BSONObj* fieldsToReturn, - int queryOptions, - int batchSize) { - _lastErrors.clear(); - if (ns.find(".$cmd") != string::npos) { - string cmdName = query.obj.firstElementFieldName(); - int lockType = _lockType(cmdName); - uassert(13054, - (string) "write $cmd not supported in SyncClusterConnection::query for:" + cmdName, - lockType <= 0); - } - - return _queryOnActive(ns, query, nToReturn, nToSkip, fieldsToReturn, queryOptions, batchSize); -} - -bool SyncClusterConnection::_commandOnActive(const string& dbname, - const BSONObj& cmd, - BSONObj& info, - int options) { - unique_ptr<DBClientCursor> cursor = _queryOnActive(dbname + ".$cmd", cmd, 1, 0, 0, options, 0); - if (cursor->more()) - info = cursor->next().copy(); - else - info = BSONObj(); - return isOk(info); -} - -void SyncClusterConnection::attachQueryHandler(QueryHandler* handler) { - _customQueryHandler.reset(handler); -} - -unique_ptr<DBClientCursor> SyncClusterConnection::_queryOnActive(const string& ns, - Query query, - int nToReturn, - int nToSkip, - const BSONObj* fieldsToReturn, - int queryOptions, - int batchSize) { - if (_customQueryHandler && _customQueryHandler->canHandleQuery(ns, query)) { - LOG(2) << "custom query handler used for query on " << ns << ": " << query.toString() - << endl; - - return _customQueryHandler->handleQuery( - _connAddresses, ns, query, nToReturn, nToSkip, fieldsToReturn, queryOptions, batchSize); - } - - for (size_t i = 0; i < _conns.size(); i++) { - try { - unique_ptr<DBClientCursor> cursor = _conns[i]->query( - ns, query, nToReturn, nToSkip, fieldsToReturn, queryOptions, batchSize); - if (cursor.get()) - return cursor; - - log() << "query on " << ns << ": " << query.toString() - << " failed to: " << _conns[i]->toString() << " no data" << endl; - } catch (std::exception& e) { - log() << "query on " << ns << ": " << query.toString() - << " failed to: " << _conns[i]->toString() << " exception: " << e.what() << endl; - } catch (...) { - log() << "query on " << ns << ": " << query.toString() - << " failed to: " << _conns[i]->toString() << " exception" << endl; - } - } - throw UserException(ErrorCodes::HostUnreachable, - str::stream() - << "all servers down/unreachable when querying: " << _address); -} - -unique_ptr<DBClientCursor> SyncClusterConnection::getMore(const string& ns, - long long cursorId, - int nToReturn, - int options) { - uassert(10022, "SyncClusterConnection::getMore not supported yet", 0); - unique_ptr<DBClientCursor> c; - return c; -} - -void SyncClusterConnection::insert(const string& ns, BSONObj obj, int flags) { - uassert(13119, - str::stream() << "SyncClusterConnection::insert obj has to have an _id: " << obj, - nsToCollectionSubstring(ns) == "system.indexes" || obj["_id"].type()); - - string errmsg; - if (!prepare(errmsg)) - throw UserException(8003, - (string) "SyncClusterConnection::insert prepare failed: " + errmsg); - - for (size_t i = 0; i < _conns.size(); i++) { - _conns[i]->insert(ns, obj, flags); - } - - _checkLast(); -} - -void SyncClusterConnection::insert(const string& ns, const vector<BSONObj>& v, int flags) { - if (v.size() == 1) { - insert(ns, v[0], flags); - return; - } - - for (vector<BSONObj>::const_iterator it = v.begin(); it != v.end(); ++it) { - BSONObj obj = *it; - if (obj["_id"].type() == EOO) { - string assertMsg = "SyncClusterConnection::insert (batched) obj misses an _id: "; - uasserted(16743, assertMsg + obj.jsonString()); - } - } - - // fsync all connections before starting the batch. - string errmsg; - if (!prepare(errmsg)) { - string assertMsg = "SyncClusterConnection::insert (batched) prepare failed: "; - throw UserException(16744, assertMsg + errmsg); - } - - // We still want one getlasterror per document, even if they're batched. - for (size_t i = 0; i < _conns.size(); i++) { - for (vector<BSONObj>::const_iterator it = v.begin(); it != v.end(); ++it) { - _conns[i]->insert(ns, *it, flags); - _conns[i]->getLastErrorDetailed(); - } - } - - // We issue a final getlasterror, but this time with an fsync. - _checkLast(); -} - -void SyncClusterConnection::remove(const string& ns, Query query, int flags) { - string errmsg; - if (!prepare(errmsg)) - throw UserException(8020, - (string) "SyncClusterConnection::remove prepare failed: " + errmsg); - - for (size_t i = 0; i < _conns.size(); i++) { - _conns[i]->remove(ns, query, flags); - } - - _checkLast(); -} - -void SyncClusterConnection::update(const string& ns, Query query, BSONObj obj, int flags) { - if (flags & UpdateOption_Upsert) { - uassert( - 13120, "SyncClusterConnection::update upsert query needs _id", query.obj["_id"].type()); - } - - string errmsg; - if (!prepare(errmsg)) { - throw UserException( - 8005, str::stream() << "SyncClusterConnection::update prepare failed: " << errmsg); - } - - for (size_t i = 0; i < _conns.size(); i++) { - _conns[i]->update(ns, query, obj, flags); - } - - _checkLast(); - invariant(_lastErrors.size() > 1); - - const int a = _lastErrors[0]["n"].numberInt(); - - for (unsigned i = 1; i < _lastErrors.size(); i++) { - int b = _lastErrors[i]["n"].numberInt(); - - if (a == b) - continue; - - throw UpdateNotTheSame(8017, - str::stream() << "update not consistent " - << " ns: " << ns << " query: " << query.toString() - << " update: " << obj << " gle1: " << _lastErrors[0] - << " gle2: " << _lastErrors[i], - _connAddresses, - _lastErrors); - } -} - -string SyncClusterConnection::_toString() const { - stringstream ss; - ss << "SyncClusterConnection "; - ss << " ["; - for (size_t i = 0; i < _conns.size(); i++) { - if (i != 0) - ss << ","; - if (_conns[i]) { - ss << _conns[i]->toString(); - } else { - ss << "(no conn)"; - } - } - ss << "]"; - return ss.str(); -} - -bool SyncClusterConnection::call(Message& toSend, - Message& response, - bool assertOk, - string* actualServer) { - uassert(8006, - "SyncClusterConnection::call can only be used directly for dbQuery", - toSend.operation() == dbQuery); - - DbMessage d(toSend); - uassert(8007, "SyncClusterConnection::call can't handle $cmd", strstr(d.getns(), "$cmd") == 0); - - for (size_t i = 0; i < _conns.size(); i++) { - try { - bool ok = _conns[i]->call(toSend, response, assertOk, nullptr); - if (ok) { - if (actualServer) - *actualServer = _connAddresses[i]; - return ok; - } - log() << "call failed to: " << _conns[i]->toString() << " no data" << endl; - } catch (...) { - log() << "call failed to: " << _conns[i]->toString() << " exception" << endl; - } - } - throw UserException(8008, str::stream() << "all servers down/unreachable: " << _address); -} - -void SyncClusterConnection::say(Message& toSend, bool isRetry, string* actualServer) { - string errmsg; - if (!prepare(errmsg)) - throw UserException(13397, (string) "SyncClusterConnection::say prepare failed: " + errmsg); - - for (size_t i = 0; i < _conns.size(); i++) { - _conns[i]->say(toSend); - } - - // TODO: should we set actualServer?? - - _checkLast(); -} - -int SyncClusterConnection::_lockType(const string& name) { - { - stdx::lock_guard<stdx::mutex> lk(_mutex); - map<string, int>::iterator i = _lockTypes.find(name); - if (i != _lockTypes.end()) - return i->second; - } - - BSONObj info; - uassert(13053, - str::stream() << "help failed: " << info, - _commandOnActive("admin", - BSON(name << "1" - << "help" << 1), - info)); - - int lockType = info["lockType"].numberInt(); - - stdx::lock_guard<stdx::mutex> lk(_mutex); - _lockTypes[name] = lockType; - return lockType; -} - -void SyncClusterConnection::killCursor(long long cursorID) { - // should never need to do this - verify(0); -} - -// A SCC should be reused only if all the existing connections haven't been broken in the -// background. -// Note: an SCC may have missing connections if a config server is temporarily offline, -// but reading from the others is still allowed. -bool SyncClusterConnection::isStillConnected() { - for (size_t i = 0; i < _conns.size(); i++) { - if (_conns[i] && !_conns[i]->isStillConnected()) - return false; - } - return true; -} - -int SyncClusterConnection::getMinWireVersion() { - int minVersion = 0; - for (const auto& host : _conns) { - minVersion = std::max(minVersion, host->getMinWireVersion()); - } - return minVersion; -} - -int SyncClusterConnection::getMaxWireVersion() { - int maxVersion = std::numeric_limits<int>::max(); - for (const auto& host : _conns) { - maxVersion = std::min(maxVersion, host->getMaxWireVersion()); - } - return maxVersion; -} - -void SyncClusterConnection::setAllSoTimeouts(double socketTimeout) { - _socketTimeout = socketTimeout; - for (size_t i = 0; i < _conns.size(); i++) - - if (_conns[i]) - _conns[i]->setSoTimeout(socketTimeout); -} - -void SyncClusterConnection::setRequestMetadataWriter(rpc::RequestMetadataWriter writer) { - // Set the hooks in both our sub-connections and in ourselves. - for (size_t i = 0; i < _conns.size(); ++i) { - if (_conns[i]) { - _conns[i]->setRequestMetadataWriter(writer); - } - } - DBClientWithCommands::setRequestMetadataWriter(std::move(writer)); -} - -void SyncClusterConnection::setReplyMetadataReader(rpc::ReplyMetadataReader reader) { - // Set the hooks in both our sub-connections and in ourselves. - for (size_t i = 0; i < _conns.size(); ++i) { - if (_conns[i]) { - _conns[i]->setReplyMetadataReader(reader); - } - } - DBClientWithCommands::setReplyMetadataReader(std::move(reader)); -} -} diff --git a/src/mongo/client/syncclusterconnection.h b/src/mongo/client/syncclusterconnection.h deleted file mode 100644 index 40373d4ae19..00000000000 --- a/src/mongo/client/syncclusterconnection.h +++ /dev/null @@ -1,272 +0,0 @@ -// @file syncclusterconnection.h - -/* - * Copyright 2010 10gen Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects - * for all of the code used other than as permitted herein. If you modify - * file(s) with this exception, you may extend this exception to your - * version of the file(s), but you are not obligated to do so. If you do not - * wish to do so, delete this exception statement from your version. If you - * delete this exception statement from all source files in the program, - * then also delete it in the license file. - */ - -#pragma once - - -#include "mongo/bson/bsonelement.h" -#include "mongo/bson/bsonobj.h" -#include "mongo/client/dbclientinterface.h" -#include "mongo/stdx/mutex.h" -#include "mongo/stdx/functional.h" -#include "mongo/util/concurrency/mutex.h" - -namespace mongo { - -/** - * This is a connection to a cluster of servers that operate as one - * for super high durability. - * - * Write operations are two-phase. First, all nodes are asked to fsync. If successful - * everywhere, the write is sent everywhere and then followed by an fsync. There is no - * rollback if a problem occurs during the second phase. Naturally, with all these fsyncs, - * these operations will be quite slow -- use sparingly. - * - * Read operations are sent to a single random node. - * - * The class checks if a command is read or write style, and sends to a single - * node if a read lock command and to all in two phases with a write style command. - */ -class SyncClusterConnection : public DBClientBase { -public: - using DBClientBase::query; - using DBClientBase::update; - using DBClientBase::remove; - - class QueryHandler; - - /** - * Hook run on the response of an isMaster request. Run on the DBClientConnections that - * SyncClusterConnection uses under the hood. Used to detect and signal when the catalog - * manager needs swapping. - */ - using ConnectionValidationHook = stdx::function<Status( - const HostAndPort& host, const executor::RemoteCommandResponse& isMasterReply)>; - - - /** - * @param commaSeparated should be 3 hosts comma separated - */ - SyncClusterConnection(const std::list<HostAndPort>&, double socketTimeout = 0); - SyncClusterConnection(std::string commaSeparated, double socketTimeout = 0); - SyncClusterConnection(const std::string& a, - const std::string& b, - const std::string& c, - double socketTimeout = 0); - ~SyncClusterConnection(); - - /** - * Sets the ConnectionValidationHook that will be used for the DBClientConnections created by - * all future constructed SyncClusterConnections. - */ - static void setConnectionValidationHook(ConnectionValidationHook hook); - - /** - * @return true if all servers are up and ready for writes - */ - bool prepare(std::string& errmsg); - - // --- from DBClientInterface - - virtual BSONObj findOne(const std::string& ns, - const Query& query, - const BSONObj* fieldsToReturn, - int queryOptions); - - virtual std::unique_ptr<DBClientCursor> query(const std::string& ns, - Query query, - int nToReturn, - int nToSkip, - const BSONObj* fieldsToReturn, - int queryOptions, - int batchSize); - - virtual std::unique_ptr<DBClientCursor> getMore(const std::string& ns, - long long cursorId, - int nToReturn, - int options); - - virtual void insert(const std::string& ns, BSONObj obj, int flags = 0); - - virtual void insert(const std::string& ns, const std::vector<BSONObj>& v, int flags = 0); - - virtual void remove(const std::string& ns, Query query, int flags); - - virtual void update(const std::string& ns, Query query, BSONObj obj, int flags); - - virtual bool call(Message& toSend, Message& response, bool assertOk, std::string* actualServer); - virtual void say(Message& toSend, bool isRetry = false, std::string* actualServer = 0); - - virtual void killCursor(long long cursorID); - - virtual std::string getServerAddress() const { - return _address; - } - virtual bool isFailed() const { - return false; - } - virtual bool isStillConnected(); - - int getMinWireVersion() final; - int getMaxWireVersion() final; - - virtual std::string toString() const { - return _toString(); - } - - virtual BSONObj getLastErrorDetailed( - const std::string& db, bool fsync = false, bool j = false, int w = 0, int wtimeout = 0); - virtual BSONObj getLastErrorDetailed(bool fsync = false, - bool j = false, - int w = 0, - int wtimeout = 0); - - virtual ConnectionString::ConnectionType type() const { - return ConnectionString::SYNC; - } - - void setAllSoTimeouts(double socketTimeout); - double getSoTimeout() const { - return _socketTimeout; - } - - - virtual bool lazySupported() const { - return false; - } - - // This override of runCommand intentionally calls findOne to construct a legacy - // OP_QUERY command. The reason for this is that delicate logic for targeting/locking - // config servers is in SyncClusterConnection::findOne, and refactoring that logic - // is both risky and of dubious value as we move to config server replica sets (CSRS). - bool runCommand(const std::string& dbname, - const BSONObj& cmd, - BSONObj& info, - int options) final; - - void setRequestMetadataWriter(rpc::RequestMetadataWriter writer) final; - void setReplyMetadataReader(rpc::ReplyMetadataReader reader) final; - - /** - * Allow custom query processing through an external (e.g. mongos-only) service. - * - * Takes ownership of attached handler. - */ - void attachQueryHandler(QueryHandler* handler); - -protected: - virtual void _auth(const BSONObj& params); - -private: - SyncClusterConnection(SyncClusterConnection& prev, double socketTimeout = 0); - std::string _toString() const; - bool _commandOnActive(const std::string& dbname, - const BSONObj& cmd, - BSONObj& info, - int options = 0); - std::unique_ptr<DBClientCursor> _queryOnActive(const std::string& ns, - Query query, - int nToReturn, - int nToSkip, - const BSONObj* fieldsToReturn, - int queryOptions, - int batchSize); - int _lockType(const std::string& name); - void _checkLast(); - void _connect(const std::string& host); - - std::string _address; - std::vector<std::string> _connAddresses; - std::vector<DBClientConnection*> _conns; - - std::vector<BSONObj> _lastErrors; - - // Optionally attached by user - std::unique_ptr<QueryHandler> _customQueryHandler; - - stdx::mutex _mutex; - std::map<std::string, int> _lockTypes; - // End mutex - - double _socketTimeout; -}; - -/** - * Interface for custom query processing for the SCC. - * Allows plugging different host query behaviors for different types of queries. - */ -class SyncClusterConnection::QueryHandler { -public: - virtual ~QueryHandler(){}; - - /** - * Returns true if the query can be processed using this handler. - */ - virtual bool canHandleQuery(const std::string& ns, Query query) = 0; - - /** - * Returns a cursor on one of the hosts with the desired results for the query. - * May throw or return an empty unique_ptr on failure. - */ - virtual std::unique_ptr<DBClientCursor> handleQuery(const std::vector<std::string>& hosts, - const std::string& ns, - Query query, - int nToReturn, - int nToSkip, - const BSONObj* fieldsToReturn, - int queryOptions, - int batchSize) = 0; -}; - -class UpdateNotTheSame : public UserException { -public: - UpdateNotTheSame(int code, - const std::string& msg, - const std::vector<std::string>& addrs, - const std::vector<BSONObj>& lastErrors) - : UserException(code, msg), _addrs(addrs), _lastErrors(lastErrors) { - verify(_addrs.size() == _lastErrors.size()); - } - - virtual ~UpdateNotTheSame() throw() {} - - unsigned size() const { - return _addrs.size(); - } - - std::pair<std::string, BSONObj> operator[](unsigned i) const { - return std::make_pair(_addrs[i], _lastErrors[i]); - } - -private: - std::vector<std::string> _addrs; - std::vector<BSONObj> _lastErrors; -}; -}; diff --git a/src/mongo/db/s/set_shard_version_command.cpp b/src/mongo/db/s/set_shard_version_command.cpp index e0e691117fe..4606720cb33 100644 --- a/src/mongo/db/s/set_shard_version_command.cpp +++ b/src/mongo/db/s/set_shard_version_command.cpp @@ -363,18 +363,6 @@ private: return true; } - if (givenConnStr.type() == ConnectionString::SET && - storedConnStr.type() == ConnectionString::SYNC) { - log() << "Detected upgrade from mirrored (SCCC) config servers to " - "replica set (CSRS) config servers. setShardVersion was given: " - << givenConnStr - << " for the config server connection string, but has stored: " - << storedConnStr; - storedConnStr = givenConnStr; - - return true; - } - const auto& storedRawConfigString = storedConnStr.toString(); if (storedRawConfigString == configdb) { return true; diff --git a/src/mongo/s/catalog/catalog_manager_common.cpp b/src/mongo/s/catalog/catalog_manager_common.cpp index a7e81687631..dad99c9f3ee 100644 --- a/src/mongo/s/catalog/catalog_manager_common.cpp +++ b/src/mongo/s/catalog/catalog_manager_common.cpp @@ -95,10 +95,8 @@ StatusWith<ShardType> validateHostAsShard(OperationContext* txn, ShardRegistry* shardRegistry, const ConnectionString& connectionString, const std::string* shardProposedName) { - if (connectionString.type() == ConnectionString::SYNC) { - return {ErrorCodes::BadValue, - "can't use sync cluster as a shard; for a replica set, " - "you have to use <setname>/<server1>,<server2>,..."}; + if (connectionString.type() == ConnectionString::INVALID) { + return {ErrorCodes::BadValue, "Invalid connection string"}; } if (shardProposedName && shardProposedName->empty()) { diff --git a/src/mongo/s/catalog/legacy/config_upgrade.cpp b/src/mongo/s/catalog/legacy/config_upgrade.cpp index e1ff04131bc..ea09db1dc00 100644 --- a/src/mongo/s/catalog/legacy/config_upgrade.cpp +++ b/src/mongo/s/catalog/legacy/config_upgrade.cpp @@ -34,7 +34,6 @@ #include "mongo/client/connpool.h" #include "mongo/client/dbclientcursor.h" -#include "mongo/client/syncclusterconnection.h" #include "mongo/rpc/get_status_from_command_result.h" #include "mongo/s/catalog/config_server_version.h" #include "mongo/s/catalog/dist_lock_manager.h" @@ -168,24 +167,10 @@ VersionStatus isConfigVersionCompatible(const VersionType& versionInfo, string* Status _checkConfigServersAlive(const ConnectionString& configLoc) { BSONObj result; try { - if (configLoc.type() == ConnectionString::SYNC) { - ScopedDbConnection conn(configLoc, 30); - // TODO: Dynamic cast is bad, we need a better way of managing this op - // via the heirarchy (or not) - SyncClusterConnection* scc = dynamic_cast<SyncClusterConnection*>(conn.get()); - fassert(16729, scc != NULL); - std::string errMsg; - if (!scc->prepare(errMsg)) { - return {ErrorCodes::HostUnreachable, errMsg}; - } - conn.done(); - return Status::OK(); - } else { - ScopedDbConnection conn(configLoc, 30); - conn->runCommand("admin", BSON("fsync" << 1), result); - conn.done(); - return getStatusFromCommandResult(result); - } + ScopedDbConnection conn(configLoc, 30); + conn->runCommand("admin", BSON("fsync" << 1), result); + conn.done(); + return getStatusFromCommandResult(result); } catch (const DBException& e) { return e.toStatus(); } diff --git a/src/mongo/s/catalog/replset/catalog_manager_replica_set_add_shard_test.cpp b/src/mongo/s/catalog/replset/catalog_manager_replica_set_add_shard_test.cpp index 0d8cbfa7aae..86887c4826e 100644 --- a/src/mongo/s/catalog/replset/catalog_manager_replica_set_add_shard_test.cpp +++ b/src/mongo/s/catalog/replset/catalog_manager_replica_set_add_shard_test.cpp @@ -433,15 +433,15 @@ TEST_F(AddShardTest, StandaloneGenerateName) { TEST_F(AddShardTest, AddSCCCConnectionStringAsShard) { std::unique_ptr<RemoteCommandTargeterMock> targeter( stdx::make_unique<RemoteCommandTargeterMock>()); - auto scccConn = - ConnectionString(ConnectionString::SYNC, "host1:12345,host2:12345,host3:12345", ""); - targeter->setConnectionStringReturnValue(scccConn); + auto invalidConn = + ConnectionString("host1:12345,host2:12345,host3:12345", ConnectionString::INVALID); + targeter->setConnectionStringReturnValue(invalidConn); - auto future = launchAsync([this, scccConn] { + auto future = launchAsync([this, invalidConn] { const std::string shardName("StandaloneShard"); - auto status = catalogManager()->addShard(operationContext(), &shardName, scccConn, 100); + auto status = catalogManager()->addShard(operationContext(), &shardName, invalidConn, 100); ASSERT_EQUALS(ErrorCodes::BadValue, status); - ASSERT_STRING_CONTAINS(status.getStatus().reason(), "can't use sync cluster as a shard"); + ASSERT_STRING_CONTAINS(status.getStatus().reason(), "Invalid connection string"); }); future.timed_get(kFutureTimeout); diff --git a/src/mongo/s/client/SConscript b/src/mongo/s/client/SConscript index 2c7069db671..a72d106d893 100644 --- a/src/mongo/s/client/SConscript +++ b/src/mongo/s/client/SConscript @@ -31,7 +31,6 @@ env.Library( target='sharding_connection_hook', source=[ 'multi_host_query.cpp', - 'scc_fast_query_handler.cpp', 'sharding_connection_hook.cpp', 'sharding_network_connection_hook.cpp', ], diff --git a/src/mongo/s/client/scc_fast_query_handler.cpp b/src/mongo/s/client/scc_fast_query_handler.cpp deleted file mode 100644 index c0871b9310f..00000000000 --- a/src/mongo/s/client/scc_fast_query_handler.cpp +++ /dev/null @@ -1,237 +0,0 @@ -/** - * Copyright (C) 2014 MongoDB Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding - -#include "mongo/platform/basic.h" - -#include "mongo/s/client/scc_fast_query_handler.h" - -#include <vector> - -#include "mongo/base/init.h" -#include "mongo/bson/util/builder.h" -#include "mongo/client/connpool.h" -#include "mongo/client/dbclientcursor.h" -#include "mongo/client/dbclientinterface.h" -#include "mongo/db/namespace_string.h" -#include "mongo/db/server_parameters.h" -#include "mongo/util/log.h" - -namespace mongo { - -using std::unique_ptr; -using std::endl; -using std::string; -using std::vector; - -/** - * This parameter turns on fastest config reads for auth data only - *.system.users collections - * and the "usersInfo" command. This should be enough to prevent a non-responsive config from - * hanging other operations in a cluster. - */ -MONGO_EXPORT_SERVER_PARAMETER(internalSCCAllowFastestAuthConfigReads, bool, false); - -/** - * TESTING ONLY. - * - * This parameter turns on fastest config reads for *all* config.* collections except those - * deemed extremely critical (config.version, config.locks). - * - * NOT FOR PRODUCTION USE. - */ -MONGO_EXPORT_SERVER_PARAMETER(internalSCCAllowFastestMetadataConfigReads, bool, false); - -// -// The shared environment for MultiHostQueries -// - -namespace { - -class MultiQueryEnv : public MultiHostQueryOp::SystemEnv { -public: - virtual ~MultiQueryEnv() {} - - Date_t currentTimeMillis(); - - StatusWith<DBClientCursor*> doBlockingQuery(const ConnectionString& host, - const QuerySpec& query); -}; - -StatusWith<DBClientCursor*> MultiQueryEnv::doBlockingQuery(const ConnectionString& host, - const QuerySpec& query) { - // - // Note that this function may be active during shutdown. This means that we must - // handle connection pool shutdown exceptions (uasserts). The results of this - // operation must then be correctly discarded by the calling thread. - // - - unique_ptr<DBClientCursor> cursor; - - try { - ScopedDbConnection conn(host, 30.0 /* timeout secs */); - - cursor = conn->query(query.ns(), - query.filter(), - query.ntoreturn(), - query.ntoskip(), - query.fieldsPtr(), - query.options(), - 0); - - if (NULL == cursor.get()) { - // Confusingly, exceptions here are written to the log, not thrown - - StringBuilder builder; - builder << "error querying server " << host.toString() - << ", could not create cursor for query"; - - warning() << builder.str() << endl; - return StatusWith<DBClientCursor*>(ErrorCodes::HostUnreachable, builder.str()); - } - - // Confusingly, this *detaches* the cursor from the connection it was established - // on, further getMore calls will use a (potentially different) ScopedDbConnection. - // - // Calls done() too. - cursor->attach(&conn); - } catch (const DBException& ex) { - return StatusWith<DBClientCursor*>(ex.toStatus()); - } - - return StatusWith<DBClientCursor*>(cursor.release()); -} - -Date_t MultiQueryEnv::currentTimeMillis() { - return jsTime(); -} -} - -// Shared networking environment which executes queries. -// NOTE: This environment must stay in scope as long as per-host threads are executing queries - -// i.e. for the lifetime of the server. -// Once the thread pools are disposed and connections shut down, the per-host threads should -// be self-contained and correctly shut down after discarding the results. -static MultiQueryEnv* _multiQueryEnv; - -namespace { - -// -// Create the shared multi-query environment at startup -// - -MONGO_INITIALIZER(InitMultiQueryEnv)(InitializerContext* context) { - // Leaked intentionally - _multiQueryEnv = new MultiQueryEnv(); - return Status::OK(); -} -} - -// -// Per-SCC handling of queries -// - -SCCFastQueryHandler::SCCFastQueryHandler() : _queryThreads(1, false) {} - -bool SCCFastQueryHandler::canHandleQuery(const string& ns, Query query) { - if (!internalSCCAllowFastestAuthConfigReads && !internalSCCAllowFastestMetadataConfigReads) { - return false; - } - - // - // More operations can be added here - // - // NOTE: Not all operations actually pass through the SCC _queryOnActive path - notable - // exceptions include anything related to direct query ops and direct operations for - // connection maintenance. - // - - NamespaceString nss(ns); - if (nss.isCommand()) { - BSONObj cmdObj = query.getFilter(); - string cmdName = cmdObj.firstElement().fieldName(); - if (cmdName == "usersInfo") - return true; - } else if (nss.coll() == "system.users") { - return true; - } - - // - // Allow fastest config reads for all collections except for those involved in locks and - // cluster versioning. - // - - if (!internalSCCAllowFastestMetadataConfigReads) - return false; - - if (nss.db() != "config") - return false; - - if (nss.coll() != "version" && nss.coll() != "locks" && nss.coll() != "lockpings") { - return true; - } - - return false; -} - -static vector<ConnectionString> getHosts(const vector<string> hostStrings) { - vector<ConnectionString> hosts; - for (const auto& host : hostStrings) { - hosts.push_back(fassertStatusOK(28738, ConnectionString::parse(host))); - } - - return hosts; -} - -unique_ptr<DBClientCursor> SCCFastQueryHandler::handleQuery(const vector<string>& hostStrings, - const string& ns, - Query query, - int nToReturn, - int nToSkip, - const BSONObj* fieldsToReturn, - int queryOptions, - int batchSize) { - MultiHostQueryOp queryOp(_multiQueryEnv, &_queryThreads); - - QuerySpec querySpec(ns, - query.obj, - fieldsToReturn ? *fieldsToReturn : BSONObj(), - nToSkip, - nToReturn, - queryOptions); - - // TODO: Timeout must be passed down here as well - 30s timeout may not be applicable for - // all operations handled. - StatusWith<DBClientCursor*> status = - queryOp.queryAny(getHosts(hostStrings), querySpec, 30 * 1000); - uassertStatusOK(status.getStatus()); - - unique_ptr<DBClientCursor> cursor(status.getValue()); - return cursor; -} -} diff --git a/src/mongo/s/client/scc_fast_query_handler.h b/src/mongo/s/client/scc_fast_query_handler.h deleted file mode 100644 index c2079e717ff..00000000000 --- a/src/mongo/s/client/scc_fast_query_handler.h +++ /dev/null @@ -1,72 +0,0 @@ -/** - * Copyright (C) 2014 MongoDB Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#pragma once - -#include "mongo/client/syncclusterconnection.h" -#include "mongo/s/client/multi_host_query.h" - -namespace mongo { - -/** - * Query handler that plugs in to a SyncClusterConnection and allows query on fastest host - * (if enabled). - * - * Glue code which shields the MultiHostQuery and server parameters from the separate client - * module which knows about neither. - * - * There is a *single* SCCFastQueryHandler for every SCC. Each SCCFastQueryHandler contains - * its own thread pool (lazily initialized) so that at maximum there is a thread-per-SCC-host - * and this thread may have an open connection to the host until it times out. - * If using the "fastestConfigReads" options, clients must be ready for the additional thread - * and connection load when configs are slow. - */ -class SCCFastQueryHandler : public SyncClusterConnection::QueryHandler { -public: - SCCFastQueryHandler(); - - virtual ~SCCFastQueryHandler() {} - - virtual bool canHandleQuery(const std::string& ns, Query query); - - virtual std::unique_ptr<DBClientCursor> handleQuery(const std::vector<std::string>& hostStrings, - const std::string& ns, - Query query, - int nToReturn, - int nToSkip, - const BSONObj* fieldsToReturn, - int queryOptions, - int batchSize); - -private: - // The thread pool itself is scoped to the handler and SCC, and lazily creates threads - // per-host as needed. This ensures query starvation cannot occur due to other active - // client threads - though a thread must be created for every client. - HostThreadPools _queryThreads; -}; -} diff --git a/src/mongo/s/client/shard_registry.cpp b/src/mongo/s/client/shard_registry.cpp index 4965f58da85..8cd38e66d9e 100644 --- a/src/mongo/s/client/shard_registry.cpp +++ b/src/mongo/s/client/shard_registry.cpp @@ -423,11 +423,7 @@ void ShardRegistry::appendConnectionStats(executor::ConnectionPoolStats* stats) } void ShardRegistry::_addConfigShard_inlock() { - _addShard_inlock("config", - _configServerCS, - _configServerCS.type() == ConnectionString::SYNC - ? nullptr - : _targeterFactory->create(_configServerCS)); + _addShard_inlock("config", _configServerCS, _targeterFactory->create(_configServerCS)); } void ShardRegistry::updateReplSetHosts(const ConnectionString& newConnString) { diff --git a/src/mongo/s/client/sharding_connection_hook.cpp b/src/mongo/s/client/sharding_connection_hook.cpp index 7f325611db9..d3cbd242c04 100644 --- a/src/mongo/s/client/sharding_connection_hook.cpp +++ b/src/mongo/s/client/sharding_connection_hook.cpp @@ -43,7 +43,6 @@ #include "mongo/rpc/get_status_from_command_result.h" #include "mongo/rpc/metadata/audit_metadata.h" #include "mongo/rpc/metadata/config_server_metadata.h" -#include "mongo/s/client/scc_fast_query_handler.h" #include "mongo/s/client/shard_registry.h" #include "mongo/s/cluster_last_error_info.h" #include "mongo/s/grid.h" @@ -115,6 +114,11 @@ ShardingConnectionHook::ShardingConnectionHook(bool shardedConnections) : _shardedConnections(shardedConnections) {} void ShardingConnectionHook::onCreate(DBClientBase* conn) { + if (conn->type() == ConnectionString::INVALID) { + throw UserException(ErrorCodes::BadValue, + str::stream() << "Unrecognized connection string."); + } + // Authenticate as the first thing we do // NOTE: Replica set authentication allows authentication against *any* online host if (getGlobalAuthorizationManager()->isAuthEnabled()) { @@ -136,12 +140,6 @@ void ShardingConnectionHook::onCreate(DBClientBase* conn) { return _shardingRequestMetadataWriter(_shardedConnections, metadataBob, hostStringData); }); - if (conn->type() == ConnectionString::SYNC) { - throw UserException(ErrorCodes::UnsupportedFormat, - str::stream() << "Unrecognized connection string type: " << conn->type() - << "."); - } - if (conn->type() == ConnectionString::MASTER) { BSONObj isMasterResponse; if (!conn->runCommand("admin", BSON("ismaster" << 1), isMasterResponse)) { diff --git a/src/mongo/s/mongos_options.cpp b/src/mongo/s/mongos_options.cpp index 526cf3f6925..17e85baff74 100644 --- a/src/mongo/s/mongos_options.cpp +++ b/src/mongo/s/mongos_options.cpp @@ -254,41 +254,32 @@ Status storeMongosOptions(const moe::Environment& params, const std::vector<std: return Status(ErrorCodes::BadValue, "error: no args for --configdb"); } - { - std::string configdbString = params["sharding.configDB"].as<std::string>(); - - auto configdbConnectionString = ConnectionString::parse(configdbString); - if (!configdbConnectionString.isOK()) { - return Status(ErrorCodes::BadValue, - str::stream() << "Invalid configdb connection string: " - << configdbConnectionString.getStatus().toString()); - } - - std::vector<HostAndPort> seedServers; - for (const auto& host : configdbConnectionString.getValue().getServers()) { - seedServers.push_back(host); - if (!seedServers.back().hasPort()) { - seedServers.back() = HostAndPort{host.host(), ServerGlobalParams::ConfigServerPort}; - } - } + std::string configdbString = params["sharding.configDB"].as<std::string>(); - mongosGlobalParams.configdbs = - ConnectionString{configdbConnectionString.getValue().type(), - seedServers, - configdbConnectionString.getValue().getSetName()}; + auto configdbConnectionString = ConnectionString::parse(configdbString); + if (!configdbConnectionString.isOK()) { + return configdbConnectionString.getStatus(); } - std::vector<HostAndPort> configServers = mongosGlobalParams.configdbs.getServers(); - - if (mongosGlobalParams.configdbs.type() != ConnectionString::SYNC && - mongosGlobalParams.configdbs.type() != ConnectionString::SET && - mongosGlobalParams.configdbs.type() != ConnectionString::MASTER) { + if (configdbConnectionString.getValue().type() != ConnectionString::SET) { return Status(ErrorCodes::BadValue, - str::stream() << "Invalid config server value " - << mongosGlobalParams.configdbs.toString()); + str::stream() << "configdb supports only replica set connection string"); } - if (configServers.size() < 3) { + std::vector<HostAndPort> seedServers; + for (const auto& host : configdbConnectionString.getValue().getServers()) { + seedServers.push_back(host); + if (!seedServers.back().hasPort()) { + seedServers.back() = HostAndPort{host.host(), ServerGlobalParams::ConfigServerPort}; + } + } + + mongosGlobalParams.configdbs = + ConnectionString{configdbConnectionString.getValue().type(), + seedServers, + configdbConnectionString.getValue().getSetName()}; + + if (mongosGlobalParams.configdbs.getServers().size() < 3) { warning() << "Running a sharded cluster with fewer than 3 config servers should only be " "done for testing purposes and is not recommended for production."; } diff --git a/src/mongo/s/query/cluster_find.cpp b/src/mongo/s/query/cluster_find.cpp index 948e12b2af6..8f697a37595 100644 --- a/src/mongo/s/query/cluster_find.cpp +++ b/src/mongo/s/query/cluster_find.cpp @@ -190,7 +190,7 @@ StatusWith<CursorId> runQueryWithoutRetrying(OperationContext* txn, // Use read pref to target a particular host from each shard. Also construct the find command // that we will forward to each shard. for (const auto& shard : shards) { - invariant(!shard->isConfig() || shard->getConnString().type() != ConnectionString::SYNC); + invariant(!shard->isConfig() || shard->getConnString().type() != ConnectionString::INVALID); // Build the find command, and attach shard version if necessary. BSONObjBuilder cmdBuilder; diff --git a/src/mongo/s/sharding_initialization.cpp b/src/mongo/s/sharding_initialization.cpp index 0ad8aa554f5..437e11e4e48 100644 --- a/src/mongo/s/sharding_initialization.cpp +++ b/src/mongo/s/sharding_initialization.cpp @@ -36,7 +36,6 @@ #include "mongo/base/status.h" #include "mongo/client/remote_command_targeter_factory_impl.h" -#include "mongo/client/syncclusterconnection.h" #include "mongo/db/audit.h" #include "mongo/db/server_options.h" #include "mongo/db/service_context.h" @@ -177,15 +176,10 @@ std::unique_ptr<TaskExecutorPool> makeTaskExecutorPool(std::unique_ptr<NetworkIn Status initializeGlobalShardingState(OperationContext* txn, const ConnectionString& configCS, bool allowNetworking) { - if (configCS.type() == ConnectionString::SYNC) { - return {ErrorCodes::UnsupportedFormat, - "SYNC config server connection string is not allowed."}; + if (configCS.type() == ConnectionString::INVALID) { + return {ErrorCodes::BadValue, "Unrecognized connection string."}; } - SyncClusterConnection::setConnectionValidationHook( - [](const HostAndPort& target, const executor::RemoteCommandResponse& isMasterReply) { - return ShardingNetworkConnectionHook::validateHostImpl(target, isMasterReply); - }); auto network = executor::makeNetworkInterface("NetworkInterfaceASIO-ShardRegistry", stdx::make_unique<ShardingNetworkConnectionHook>(), diff --git a/src/mongo/s/version_manager.cpp b/src/mongo/s/version_manager.cpp index 4fb45bdbccb..7cbb2787e82 100644 --- a/src/mongo/s/version_manager.cpp +++ b/src/mongo/s/version_manager.cpp @@ -161,12 +161,6 @@ DBClientBase* getVersionable(DBClientBase* conn) { return nullptr; case ConnectionString::MASTER: return conn; - case ConnectionString::SYNC: - massert(15906, - str::stream() << "cannot set version or shard on sync connection " - << conn->toString(), - false); - return nullptr; case ConnectionString::CUSTOM: massert(16334, str::stream() << "cannot set version or shard on custom connection " diff --git a/src/mongo/shell/servers_misc.js b/src/mongo/shell/servers_misc.js index f27cd754b58..8f3bff9d9e0 100644 --- a/src/mongo/shell/servers_misc.js +++ b/src/mongo/shell/servers_misc.js @@ -201,53 +201,7 @@ allocatePorts = function(numPorts) { return ports; }; -SyncCCTest = function(testName, extraMongodOptions) { - this._testName = testName; - this._connections = []; - - for (var i = 0; i < 3; i++) { - this._connections.push(MongoRunner.runMongod(extraMongodOptions)); - } - - this.url = this._connections.map(function(z) { - return z.name; - }).join(","); - this.conn = new Mongo(this.url); -}; - -SyncCCTest.prototype.stop = function() { - for (var i = 0; i < this._connections.length; i++) { - _stopMongoProgram(30000 + i); - } - - print('*** ' + this._testName + " completed successfully ***"); -}; - -SyncCCTest.prototype.checkHashes = function(dbname, msg) { - var hashes = this._connections.map(function(z) { - return z.getDB(dbname).runCommand("dbhash"); - }); - - for (var i = 1; i < hashes.length; i++) { - assert.eq(hashes[0].md5, - hashes[i].md5, - "checkHash on " + dbname + " " + msg + "\n" + tojson(hashes)); - } -}; - -SyncCCTest.prototype.tempKill = function(num) { - num = num || 0; - MongoRunner.stopMongod(this._connections[num]); -}; - -SyncCCTest.prototype.tempStart = function(num) { - num = num || 0; - var old = this._connections[num]; - this._connections[num] = MongoRunner.runMongod( - {restart: true, cleanData: false, port: old.port, dbpath: old.dbpath}); -}; - -function startParallelShell(jsCode, port, noConnect) { +function startParallelShell( jsCode, port, noConnect ) { var args = ["mongo"]; if (typeof db == "object") { diff --git a/src/mongo/shell/shardingtest.js b/src/mongo/shell/shardingtest.js index 6651057143e..a167ac1c8e5 100644 --- a/src/mongo/shell/shardingtest.js +++ b/src/mongo/shell/shardingtest.js @@ -30,9 +30,8 @@ * configuration object(s)(*). @see MongoRunner.runMongod * * config {number|Object|Array.<Object>}: number of config server or - * config server configuration object(s)(*). If this field has 3 or - * more members, it implies other.sync = true. @see MongoRunner.runMongod - * + * config server configuration object(s)(*). @see MongoRunner.runMongod + * * (*) There are two ways For multiple configuration objects. * (1) Using the object format. Example: * @@ -58,9 +57,7 @@ * * shardOptions {Object}: same as the shards property above. * Can be used to specify options that are common all shards. - * - * sync {boolean}: Use SyncClusterConnection, and readies - * 1 or 3 config servers, based on the value of numConfigs. + * * configOptions {Object}: same as the config property above. * Can be used to specify options that are common all config servers. * mongosOptions {Object}: same as the mongos property above. @@ -1182,133 +1179,57 @@ var ShardingTest = function(params) { rsConn.rs = rs; } - // Default to using 3-node legacy config servers if jsTestOptions().useLegacyOptions is true - // and the user didn't explicity specify a different config server configuration - if (jsTestOptions().useLegacyConfigServers && otherParams.sync !== false && - (typeof(otherParams.config) === 'undefined' || numConfigs === 3)) { - otherParams.sync = true; - } - this._configServers = []; - // Start the config servers - if (otherParams.sync) { - if (numConfigs !== 1 && numConfigs !== 3) { - throw Error('Sync config servers only supported with 1 or 3 nodes'); - } - - var configNames = []; - for (var i = 0; i < numConfigs; i++) { - var options = { - useHostname: otherParams.useHostname, - noJournalPrealloc: otherParams.nopreallocj, - pathOpts: Object.merge(pathOpts, {config: i}), - dbpath: "$testName-config$config", - keyFile: keyFile, - // Ensure that journaling is always enabled for config servers. - journal: "", - configsvr: "" - }; - - if (otherParams.configOptions && otherParams.configOptions.binVersion) { - otherParams.configOptions.binVersion = - MongoRunner.versionIterator(otherParams.configOptions.binVersion); - } - - options = Object.merge(options, otherParams.configOptions); - options = Object.merge(options, otherParams["c" + i]); - - options.port = options.port || allocatePort(); - - if (otherParams.useBridge) { - var bridgeOptions = - Object.merge(otherParams.bridgeOptions, options.bridgeOptions || {}); - bridgeOptions = Object.merge( - bridgeOptions, - { - hostName: otherParams.useHostname ? hostName : "localhost", - // The mongod processes identify themselves to mongobridge as host:port, where - // the host is the actual hostname of the machine and not localhost. - dest: hostName + ":" + options.port, - }); - - var bridge = new MongoBridge(bridgeOptions); - } - - var conn = MongoRunner.runMongod(options); - if (!conn) { - throw new Error("Failed to start config server " + i); - } - - if (otherParams.useBridge) { - bridge.connectToBridge(); - this._configServers.push(bridge); - unbridgedConfigServers.push(conn); - configNames.push(bridge.host); - } else { - this._configServers.push(conn); - configNames.push(conn.name); - } - - _alldbpaths.push(testName + "-config" + i); - this["config" + i] = this._configServers[i]; - this["c" + i] = this._configServers[i]; - } - - this._configDB = configNames.join(','); - } else { - // Using replica set for config servers - var rstOptions = { - useHostName: otherParams.useHostname, - useBridge: otherParams.useBridge, - bridgeOptions: otherParams.bridgeOptions, - keyFile: keyFile, - name: testName + "-configRS", - }; - - // when using CSRS, always use wiredTiger as the storage engine - var startOptions = { - pathOpts: pathOpts, - // Ensure that journaling is always enabled for config servers. - journal: "", - configsvr: "", - noJournalPrealloc: otherParams.nopreallocj, - storageEngine: "wiredTiger", - }; - - if (otherParams.configOptions && otherParams.configOptions.binVersion) { - otherParams.configOptions.binVersion = - MongoRunner.versionIterator(otherParams.configOptions.binVersion); - } + // Using replica set for config servers + var rstOptions = { useHostName : otherParams.useHostname, + useBridge : otherParams.useBridge, + bridgeOptions : otherParams.bridgeOptions, + keyFile : keyFile, + name: testName + "-configRS", + }; + + // when using CSRS, always use wiredTiger as the storage engine + var startOptions = { pathOpts: pathOpts, + // Ensure that journaling is always enabled for config servers. + journal : "", + configsvr : "", + noJournalPrealloc : otherParams.nopreallocj, + storageEngine : "wiredTiger", + }; + + if (otherParams.configOptions && otherParams.configOptions.binVersion) { + otherParams.configOptions.binVersion = + MongoRunner.versionIterator(otherParams.configOptions.binVersion); + } - startOptions = Object.merge(startOptions, otherParams.configOptions); - rstOptions = Object.merge(rstOptions, otherParams.configReplSetTestOptions); + startOptions = Object.merge(startOptions, otherParams.configOptions); + rstOptions = Object.merge(rstOptions, otherParams.configReplSetTestOptions); - var nodeOptions = []; - for (var i = 0; i < numConfigs; ++i) { - nodeOptions.push(otherParams["c" + i] || {}); - } + var nodeOptions = []; + for (var i = 0; i < numConfigs; ++i) { + nodeOptions.push(otherParams["c" + i] || {}); + } - rstOptions.nodes = nodeOptions; + rstOptions.nodes = nodeOptions; - this.configRS = new ReplSetTest(rstOptions); - this.configRS.startSet(startOptions); + this.configRS = new ReplSetTest(rstOptions); + this.configRS.startSet(startOptions); - var config = this.configRS.getReplSetConfig(); - config.configsvr = true; - config.settings = config.settings || {}; - var initiateTimeout = otherParams.rsOptions && otherParams.rsOptions.initiateTimeout; - this.configRS.initiate(config, null, initiateTimeout); + var config = this.configRS.getReplSetConfig(); + config.configsvr = true; + config.settings = config.settings || {}; + var initiateTimeout = otherParams.rsOptions && otherParams.rsOptions.initiateTimeout; + this.configRS.initiate(config, null, initiateTimeout); - this.configRS.getPrimary(); // Wait for master to be elected before starting mongos + this.configRS.getPrimary(); // Wait for master to be elected before starting mongos - this._configDB = this.configRS.getURL(); - this._configServers = this.configRS.nodes; - for (var i = 0; i < numConfigs; ++i) { - var conn = this._configServers[i]; - this["config" + i] = conn; - this["c" + i] = conn; - } + this._configDB = this.configRS.getURL(); + this._configServers = this.configRS.nodes; + for (var i = 0; i < numConfigs; ++i) { + var conn = this._configServers[i]; + this["config" + i] = conn; + this["c" + i] = conn; } printjson("config servers: " + this._configDB); diff --git a/src/mongo/shell/utils.js b/src/mongo/shell/utils.js index 75bf3b9f7f8..2cb54bb1d2a 100644 --- a/src/mongo/shell/utils.js +++ b/src/mongo/shell/utils.js @@ -186,7 +186,6 @@ jsTestOptions = function() { authMechanism: TestData.authMechanism, adminUser: TestData.adminUser || "admin", adminPassword: TestData.adminPassword || "password", - useLegacyConfigServers: TestData.useLegacyConfigServers || false, useLegacyReplicationProtocol: TestData.useLegacyReplicationProtocol || false, enableMajorityReadConcern: TestData.enableMajorityReadConcern, enableEncryption: TestData.enableEncryption, |