summaryrefslogtreecommitdiff
path: root/src/mongo/client/syncclusterconnection.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/client/syncclusterconnection.cpp')
-rw-r--r--src/mongo/client/syncclusterconnection.cpp689
1 files changed, 0 insertions, 689 deletions
diff --git a/src/mongo/client/syncclusterconnection.cpp b/src/mongo/client/syncclusterconnection.cpp
deleted file mode 100644
index 52ed06592ec..00000000000
--- a/src/mongo/client/syncclusterconnection.cpp
+++ /dev/null
@@ -1,689 +0,0 @@
-// syncclusterconnection.cpp
-/*
- * Copyright 2010 10gen Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License, version 3,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the GNU Affero General Public License in all respects
- * for all of the code used other than as permitted herein. If you modify
- * file(s) with this exception, you may extend this exception to your
- * version of the file(s), but you are not obligated to do so. If you do not
- * wish to do so, delete this exception statement from your version. If you
- * delete this exception statement from all source files in the program,
- * then also delete it in the license file.
- */
-
-
-#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kNetwork
-
-#include "mongo/platform/basic.h"
-
-#include "mongo/client/syncclusterconnection.h"
-
-#include "mongo/client/dbclientcursor.h"
-#include "mongo/client/dbclientinterface.h"
-#include "mongo/db/dbmessage.h"
-#include "mongo/db/namespace_string.h"
-#include "mongo/rpc/get_status_from_command_result.h"
-#include "mongo/util/log.h"
-
-namespace mongo {
-
-using std::unique_ptr;
-using std::endl;
-using std::list;
-using std::map;
-using std::string;
-using std::stringstream;
-using std::vector;
-
-namespace {
-SyncClusterConnection::ConnectionValidationHook connectionHook;
-} // namespace
-
-SyncClusterConnection::SyncClusterConnection(const list<HostAndPort>& L, double socketTimeout)
- : _socketTimeout(socketTimeout) {
- {
- stringstream s;
- int n = 0;
- for (list<HostAndPort>::const_iterator i = L.begin(); i != L.end(); i++) {
- if (++n > 1)
- s << ',';
- s << i->toString();
- }
- _address = s.str();
- }
- for (list<HostAndPort>::const_iterator i = L.begin(); i != L.end(); i++)
- _connect(i->toString());
-}
-
-SyncClusterConnection::SyncClusterConnection(string commaSeparated, double socketTimeout)
- : _socketTimeout(socketTimeout) {
- _address = commaSeparated;
- string::size_type idx;
- while ((idx = commaSeparated.find(',')) != string::npos) {
- string h = commaSeparated.substr(0, idx);
- commaSeparated = commaSeparated.substr(idx + 1);
- _connect(h);
- }
- _connect(commaSeparated);
- uassert(8004, "SyncClusterConnection needs 3 servers", _conns.size() == 3);
-}
-
-SyncClusterConnection::SyncClusterConnection(const std::string& a,
- const std::string& b,
- const std::string& c,
- double socketTimeout)
- : _socketTimeout(socketTimeout) {
- _address = a + "," + b + "," + c;
- // connect to all even if not working
- _connect(a);
- _connect(b);
- _connect(c);
-}
-
-SyncClusterConnection::SyncClusterConnection(SyncClusterConnection& prev, double socketTimeout)
- : _socketTimeout(socketTimeout) {
- verify(0);
-}
-
-SyncClusterConnection::~SyncClusterConnection() {
- for (size_t i = 0; i < _conns.size(); i++)
- delete _conns[i];
- _conns.clear();
-}
-
-void SyncClusterConnection::setConnectionValidationHook(ConnectionValidationHook hook) {
- connectionHook = std::move(hook);
-}
-
-bool SyncClusterConnection::prepare(string& errmsg) {
- _lastErrors.clear();
-
- bool ok = true;
- errmsg = "";
-
- for (size_t i = 0; i < _conns.size(); i++) {
- string singleErr;
- try {
- _conns[i]->simpleCommand("admin", NULL, "resetError");
- singleErr = _conns[i]->getLastError(true);
-
- if (singleErr.size() == 0)
- continue;
-
- } catch (const DBException& e) {
- if (e.getCode() == ErrorCodes::IncompatibleCatalogManager) {
- throw;
- }
- singleErr = e.toString();
- }
- ok = false;
- errmsg += " " + _conns[i]->toString() + ":" + singleErr;
- }
-
- return ok;
-}
-
-void SyncClusterConnection::_checkLast() {
- _lastErrors.clear();
- vector<string> errors;
-
- for (size_t i = 0; i < _conns.size(); i++) {
- BSONObj res;
- string err;
- try {
- if (!_conns[i]->runCommand("admin", BSON("getlasterror" << 1 << "fsync" << 1), res))
- err = "cmd failed: ";
- } catch (std::exception& e) {
- err += e.what();
- } catch (...) {
- err += "unknown failure";
- }
- _lastErrors.push_back(res.getOwned());
- errors.push_back(err);
- }
-
- verify(_lastErrors.size() == errors.size() && _lastErrors.size() == _conns.size());
-
- stringstream err;
- bool ok = true;
-
- for (size_t i = 0; i < _conns.size(); i++) {
- BSONObj res = _lastErrors[i];
- if (res["ok"].trueValue() &&
- (res["fsyncFiles"].numberInt() > 0 || res.hasElement("waited") ||
- res["syncMillis"].numberInt() >= 0))
- continue;
- ok = false;
- err << _conns[i]->toString() << ": " << res << " " << errors[i];
- }
-
- if (ok)
- return;
- throw UserException(8001, (string) "SyncClusterConnection write op failed: " + err.str());
-}
-
-BSONObj SyncClusterConnection::getLastErrorDetailed(bool fsync, bool j, int w, int wtimeout) {
- return getLastErrorDetailed("admin", fsync, j, w, wtimeout);
-}
-
-BSONObj SyncClusterConnection::getLastErrorDetailed(
- const std::string& db, bool fsync, bool j, int w, int wtimeout) {
- if (_lastErrors.size())
- return _lastErrors[0];
- return DBClientBase::getLastErrorDetailed(db, fsync, j, w, wtimeout);
-}
-
-void SyncClusterConnection::_connect(const std::string& hostStr) {
- log() << "SyncClusterConnection connecting to [" << hostStr << "]" << endl;
- const HostAndPort host(hostStr);
- DBClientConnection* c;
- if (connectionHook) {
- c = new DBClientConnection(
- true, // auto reconnect
- 0, // socket timeout
- [this, host](const executor::RemoteCommandResponse& isMasterReply) {
- return connectionHook(host, isMasterReply);
- });
- } else {
- c = new DBClientConnection(true);
- }
-
- c->setRequestMetadataWriter(getRequestMetadataWriter());
- c->setReplyMetadataReader(getReplyMetadataReader());
- c->setSoTimeout(_socketTimeout);
- Status status = c->connect(host);
- if (!status.isOK()) {
- log() << "SyncClusterConnection connect fail to: " << hostStr << causedBy(status);
- if (status == ErrorCodes::IncompatibleCatalogManager) {
- // Make sure to propagate IncompatibleCatalogManager errors to trigger catalog manager
- // swapping.
- uassertStatusOK(status);
- }
- }
- _connAddresses.push_back(hostStr);
- _conns.push_back(c);
-}
-
-bool SyncClusterConnection::runCommand(const std::string& dbname,
- const BSONObj& cmd,
- BSONObj& info,
- int options) {
- std::string ns = dbname + ".$cmd";
- BSONObj interposedCmd = cmd;
-
- if (getRequestMetadataWriter()) {
- // We have a metadata writer. We need to upconvert the metadata, write to it,
- // Then downconvert it again. This unfortunate, but this code is going to be
- // removed anyway as part of CSRS.
-
- BSONObj upconvertedCommand;
- BSONObj upconvertedMetadata;
-
- std::tie(upconvertedCommand, upconvertedMetadata) =
- uassertStatusOK(rpc::upconvertRequestMetadata(cmd, options));
-
- BSONObjBuilder metadataBob;
- metadataBob.appendElements(upconvertedMetadata);
-
- uassertStatusOK(getRequestMetadataWriter()(&metadataBob, getServerAddress()));
-
- std::tie(interposedCmd, options) = uassertStatusOK(
- rpc::downconvertRequestMetadata(std::move(upconvertedCommand), metadataBob.done()));
- }
-
- BSONObj legacyResult = findOne(ns, Query(interposedCmd), 0, options);
-
- BSONObj upconvertedMetadata;
- BSONObj upconvertedReply;
-
- std::tie(upconvertedReply, upconvertedMetadata) =
- uassertStatusOK(rpc::upconvertReplyMetadata(legacyResult));
-
- if (getReplyMetadataReader()) {
- // TODO: what does getServerAddress() actually mean here as this connection
- // represents a connection to 1 or 3 config servers...
- uassertStatusOK(getReplyMetadataReader()(upconvertedReply, getServerAddress()));
- }
-
- info = upconvertedReply;
-
- return isOk(info);
-}
-
-BSONObj SyncClusterConnection::findOne(const string& ns,
- const Query& query,
- const BSONObj* fieldsToReturn,
- int queryOptions) {
- if (ns.find(".$cmd") != string::npos) {
- string cmdName = query.obj.firstElementFieldName();
-
- int lockType = _lockType(cmdName);
-
- if (lockType > 0) { // write $cmd
- string errmsg;
- if (!prepare(errmsg))
- throw UserException(ErrorCodes::PrepareConfigsFailed,
- (string) "SyncClusterConnection::findOne prepare failed: " +
- errmsg);
-
- vector<BSONObj> all;
- for (size_t i = 0; i < _conns.size(); i++) {
- all.push_back(_conns[i]->findOne(ns, query, 0, queryOptions).getOwned());
- }
-
- _checkLast();
-
- for (size_t i = 0; i < all.size(); i++) {
- Status status = getStatusFromCommandResult(all[i]);
- if (status.isOK()) {
- continue;
- }
-
- stringstream ss;
- ss << "write $cmd failed on a node: " << status.toString();
- ss << " " << _conns[i]->toString();
- ss << " ns: " << ns;
- ss << " cmd: " << query.toString();
- throw UserException(status.code(), ss.str());
- }
-
- return all[0];
- }
- }
-
- return DBClientBase::findOne(ns, query, fieldsToReturn, queryOptions);
-}
-
-void SyncClusterConnection::_auth(const BSONObj& params) {
- // A SCC is authenticated if any connection has been authenticated
- // Credentials are stored in the auto-reconnect connections.
-
- bool authedOnce = false;
- vector<string> errors;
-
- for (vector<DBClientConnection*>::iterator it = _conns.begin(); it < _conns.end(); ++it) {
- massert(15848, "sync cluster of sync clusters?", (*it)->type() != ConnectionString::SYNC);
-
- // Authenticate or collect the error message
- string lastErrmsg;
- bool authed;
- try {
- // Auth errors can manifest either as exceptions or as false results
- // TODO: Make this better
- (*it)->auth(params);
- authed = true;
- } catch (const DBException& e) {
- // auth will be retried on reconnect
- lastErrmsg = e.what();
- authed = false;
- }
-
- if (!authed) {
- // Since we're using auto-reconnect connections, we're sure the auth info has been
- // stored if needed for later
-
- lastErrmsg = str::stream() << "auth error on " << (*it)->getServerAddress()
- << causedBy(lastErrmsg);
-
- LOG(1) << lastErrmsg << endl;
- errors.push_back(lastErrmsg);
- }
-
- authedOnce = authedOnce || authed;
- }
-
- if (authedOnce)
- return;
-
- // Assemble the error message
- str::stream errStream;
- for (vector<string>::iterator it = errors.begin(); it != errors.end(); ++it) {
- if (it != errors.begin())
- errStream << " ::and:: ";
- errStream << *it;
- }
-
- uasserted(ErrorCodes::AuthenticationFailed, errStream);
-}
-
-// TODO: logout is required for use of this class outside of a cluster environment
-
-unique_ptr<DBClientCursor> SyncClusterConnection::query(const string& ns,
- Query query,
- int nToReturn,
- int nToSkip,
- const BSONObj* fieldsToReturn,
- int queryOptions,
- int batchSize) {
- _lastErrors.clear();
- if (ns.find(".$cmd") != string::npos) {
- string cmdName = query.obj.firstElementFieldName();
- int lockType = _lockType(cmdName);
- uassert(13054,
- (string) "write $cmd not supported in SyncClusterConnection::query for:" + cmdName,
- lockType <= 0);
- }
-
- return _queryOnActive(ns, query, nToReturn, nToSkip, fieldsToReturn, queryOptions, batchSize);
-}
-
-bool SyncClusterConnection::_commandOnActive(const string& dbname,
- const BSONObj& cmd,
- BSONObj& info,
- int options) {
- unique_ptr<DBClientCursor> cursor = _queryOnActive(dbname + ".$cmd", cmd, 1, 0, 0, options, 0);
- if (cursor->more())
- info = cursor->next().copy();
- else
- info = BSONObj();
- return isOk(info);
-}
-
-void SyncClusterConnection::attachQueryHandler(QueryHandler* handler) {
- _customQueryHandler.reset(handler);
-}
-
-unique_ptr<DBClientCursor> SyncClusterConnection::_queryOnActive(const string& ns,
- Query query,
- int nToReturn,
- int nToSkip,
- const BSONObj* fieldsToReturn,
- int queryOptions,
- int batchSize) {
- if (_customQueryHandler && _customQueryHandler->canHandleQuery(ns, query)) {
- LOG(2) << "custom query handler used for query on " << ns << ": " << query.toString()
- << endl;
-
- return _customQueryHandler->handleQuery(
- _connAddresses, ns, query, nToReturn, nToSkip, fieldsToReturn, queryOptions, batchSize);
- }
-
- for (size_t i = 0; i < _conns.size(); i++) {
- try {
- unique_ptr<DBClientCursor> cursor = _conns[i]->query(
- ns, query, nToReturn, nToSkip, fieldsToReturn, queryOptions, batchSize);
- if (cursor.get())
- return cursor;
-
- log() << "query on " << ns << ": " << query.toString()
- << " failed to: " << _conns[i]->toString() << " no data" << endl;
- } catch (std::exception& e) {
- log() << "query on " << ns << ": " << query.toString()
- << " failed to: " << _conns[i]->toString() << " exception: " << e.what() << endl;
- } catch (...) {
- log() << "query on " << ns << ": " << query.toString()
- << " failed to: " << _conns[i]->toString() << " exception" << endl;
- }
- }
- throw UserException(ErrorCodes::HostUnreachable,
- str::stream()
- << "all servers down/unreachable when querying: " << _address);
-}
-
-unique_ptr<DBClientCursor> SyncClusterConnection::getMore(const string& ns,
- long long cursorId,
- int nToReturn,
- int options) {
- uassert(10022, "SyncClusterConnection::getMore not supported yet", 0);
- unique_ptr<DBClientCursor> c;
- return c;
-}
-
-void SyncClusterConnection::insert(const string& ns, BSONObj obj, int flags) {
- uassert(13119,
- str::stream() << "SyncClusterConnection::insert obj has to have an _id: " << obj,
- nsToCollectionSubstring(ns) == "system.indexes" || obj["_id"].type());
-
- string errmsg;
- if (!prepare(errmsg))
- throw UserException(8003,
- (string) "SyncClusterConnection::insert prepare failed: " + errmsg);
-
- for (size_t i = 0; i < _conns.size(); i++) {
- _conns[i]->insert(ns, obj, flags);
- }
-
- _checkLast();
-}
-
-void SyncClusterConnection::insert(const string& ns, const vector<BSONObj>& v, int flags) {
- if (v.size() == 1) {
- insert(ns, v[0], flags);
- return;
- }
-
- for (vector<BSONObj>::const_iterator it = v.begin(); it != v.end(); ++it) {
- BSONObj obj = *it;
- if (obj["_id"].type() == EOO) {
- string assertMsg = "SyncClusterConnection::insert (batched) obj misses an _id: ";
- uasserted(16743, assertMsg + obj.jsonString());
- }
- }
-
- // fsync all connections before starting the batch.
- string errmsg;
- if (!prepare(errmsg)) {
- string assertMsg = "SyncClusterConnection::insert (batched) prepare failed: ";
- throw UserException(16744, assertMsg + errmsg);
- }
-
- // We still want one getlasterror per document, even if they're batched.
- for (size_t i = 0; i < _conns.size(); i++) {
- for (vector<BSONObj>::const_iterator it = v.begin(); it != v.end(); ++it) {
- _conns[i]->insert(ns, *it, flags);
- _conns[i]->getLastErrorDetailed();
- }
- }
-
- // We issue a final getlasterror, but this time with an fsync.
- _checkLast();
-}
-
-void SyncClusterConnection::remove(const string& ns, Query query, int flags) {
- string errmsg;
- if (!prepare(errmsg))
- throw UserException(8020,
- (string) "SyncClusterConnection::remove prepare failed: " + errmsg);
-
- for (size_t i = 0; i < _conns.size(); i++) {
- _conns[i]->remove(ns, query, flags);
- }
-
- _checkLast();
-}
-
-void SyncClusterConnection::update(const string& ns, Query query, BSONObj obj, int flags) {
- if (flags & UpdateOption_Upsert) {
- uassert(
- 13120, "SyncClusterConnection::update upsert query needs _id", query.obj["_id"].type());
- }
-
- string errmsg;
- if (!prepare(errmsg)) {
- throw UserException(
- 8005, str::stream() << "SyncClusterConnection::update prepare failed: " << errmsg);
- }
-
- for (size_t i = 0; i < _conns.size(); i++) {
- _conns[i]->update(ns, query, obj, flags);
- }
-
- _checkLast();
- invariant(_lastErrors.size() > 1);
-
- const int a = _lastErrors[0]["n"].numberInt();
-
- for (unsigned i = 1; i < _lastErrors.size(); i++) {
- int b = _lastErrors[i]["n"].numberInt();
-
- if (a == b)
- continue;
-
- throw UpdateNotTheSame(8017,
- str::stream() << "update not consistent "
- << " ns: " << ns << " query: " << query.toString()
- << " update: " << obj << " gle1: " << _lastErrors[0]
- << " gle2: " << _lastErrors[i],
- _connAddresses,
- _lastErrors);
- }
-}
-
-string SyncClusterConnection::_toString() const {
- stringstream ss;
- ss << "SyncClusterConnection ";
- ss << " [";
- for (size_t i = 0; i < _conns.size(); i++) {
- if (i != 0)
- ss << ",";
- if (_conns[i]) {
- ss << _conns[i]->toString();
- } else {
- ss << "(no conn)";
- }
- }
- ss << "]";
- return ss.str();
-}
-
-bool SyncClusterConnection::call(Message& toSend,
- Message& response,
- bool assertOk,
- string* actualServer) {
- uassert(8006,
- "SyncClusterConnection::call can only be used directly for dbQuery",
- toSend.operation() == dbQuery);
-
- DbMessage d(toSend);
- uassert(8007, "SyncClusterConnection::call can't handle $cmd", strstr(d.getns(), "$cmd") == 0);
-
- for (size_t i = 0; i < _conns.size(); i++) {
- try {
- bool ok = _conns[i]->call(toSend, response, assertOk, nullptr);
- if (ok) {
- if (actualServer)
- *actualServer = _connAddresses[i];
- return ok;
- }
- log() << "call failed to: " << _conns[i]->toString() << " no data" << endl;
- } catch (...) {
- log() << "call failed to: " << _conns[i]->toString() << " exception" << endl;
- }
- }
- throw UserException(8008, str::stream() << "all servers down/unreachable: " << _address);
-}
-
-void SyncClusterConnection::say(Message& toSend, bool isRetry, string* actualServer) {
- string errmsg;
- if (!prepare(errmsg))
- throw UserException(13397, (string) "SyncClusterConnection::say prepare failed: " + errmsg);
-
- for (size_t i = 0; i < _conns.size(); i++) {
- _conns[i]->say(toSend);
- }
-
- // TODO: should we set actualServer??
-
- _checkLast();
-}
-
-int SyncClusterConnection::_lockType(const string& name) {
- {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- map<string, int>::iterator i = _lockTypes.find(name);
- if (i != _lockTypes.end())
- return i->second;
- }
-
- BSONObj info;
- uassert(13053,
- str::stream() << "help failed: " << info,
- _commandOnActive("admin",
- BSON(name << "1"
- << "help" << 1),
- info));
-
- int lockType = info["lockType"].numberInt();
-
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- _lockTypes[name] = lockType;
- return lockType;
-}
-
-void SyncClusterConnection::killCursor(long long cursorID) {
- // should never need to do this
- verify(0);
-}
-
-// A SCC should be reused only if all the existing connections haven't been broken in the
-// background.
-// Note: an SCC may have missing connections if a config server is temporarily offline,
-// but reading from the others is still allowed.
-bool SyncClusterConnection::isStillConnected() {
- for (size_t i = 0; i < _conns.size(); i++) {
- if (_conns[i] && !_conns[i]->isStillConnected())
- return false;
- }
- return true;
-}
-
-int SyncClusterConnection::getMinWireVersion() {
- int minVersion = 0;
- for (const auto& host : _conns) {
- minVersion = std::max(minVersion, host->getMinWireVersion());
- }
- return minVersion;
-}
-
-int SyncClusterConnection::getMaxWireVersion() {
- int maxVersion = std::numeric_limits<int>::max();
- for (const auto& host : _conns) {
- maxVersion = std::min(maxVersion, host->getMaxWireVersion());
- }
- return maxVersion;
-}
-
-void SyncClusterConnection::setAllSoTimeouts(double socketTimeout) {
- _socketTimeout = socketTimeout;
- for (size_t i = 0; i < _conns.size(); i++)
-
- if (_conns[i])
- _conns[i]->setSoTimeout(socketTimeout);
-}
-
-void SyncClusterConnection::setRequestMetadataWriter(rpc::RequestMetadataWriter writer) {
- // Set the hooks in both our sub-connections and in ourselves.
- for (size_t i = 0; i < _conns.size(); ++i) {
- if (_conns[i]) {
- _conns[i]->setRequestMetadataWriter(writer);
- }
- }
- DBClientWithCommands::setRequestMetadataWriter(std::move(writer));
-}
-
-void SyncClusterConnection::setReplyMetadataReader(rpc::ReplyMetadataReader reader) {
- // Set the hooks in both our sub-connections and in ourselves.
- for (size_t i = 0; i < _conns.size(); ++i) {
- if (_conns[i]) {
- _conns[i]->setReplyMetadataReader(reader);
- }
- }
- DBClientWithCommands::setReplyMetadataReader(std::move(reader));
-}
-}