// @file syncclusterconnection.h /* * Copyright 2010 10gen Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License, version 3, * as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . * * As a special exception, the copyright holders give permission to link the * code of portions of this program with the OpenSSL library under certain * conditions as described in each individual source file and distribute * linked combinations including the program with the OpenSSL library. You * must comply with the GNU Affero General Public License in all respects * for all of the code used other than as permitted herein. If you modify * file(s) with this exception, you may extend this exception to your * version of the file(s), but you are not obligated to do so. If you do not * wish to do so, delete this exception statement from your version. If you * delete this exception statement from all source files in the program, * then also delete it in the license file. */ #pragma once #include "mongo/bson/bsonelement.h" #include "mongo/bson/bsonobj.h" #include "mongo/client/dbclientinterface.h" #include "mongo/util/concurrency/mutex.h" namespace mongo { /** * This is a connection to a cluster of servers that operate as one * for super high durability. * * Write operations are two-phase. First, all nodes are asked to fsync. If successful * everywhere, the write is sent everywhere and then followed by an fsync. There is no * rollback if a problem occurs during the second phase. Naturally, with all these fsyncs, * these operations will be quite slow -- use sparingly. * * Read operations are sent to a single random node. * * The class checks if a command is read or write style, and sends to a single * node if a read lock command and to all in two phases with a write style command. */ class SyncClusterConnection : public DBClientBase { public: using DBClientBase::query; using DBClientBase::update; using DBClientBase::remove; class QueryHandler; /** * @param commaSeparated should be 3 hosts comma separated */ SyncClusterConnection( const std::list &, double socketTimeout = 0); SyncClusterConnection( std::string commaSeparated, double socketTimeout = 0); SyncClusterConnection( const std::string& a, const std::string& b, const std::string& c, double socketTimeout = 0 ); ~SyncClusterConnection(); /** * @return true if all servers are up and ready for writes */ bool prepare( std::string& errmsg ); // --- from DBClientInterface virtual BSONObj findOne(const std::string &ns, const Query& query, const BSONObj *fieldsToReturn, int queryOptions); virtual std::unique_ptr query(const std::string &ns, Query query, int nToReturn, int nToSkip, const BSONObj *fieldsToReturn, int queryOptions, int batchSize ); virtual std::unique_ptr getMore( const std::string &ns, long long cursorId, int nToReturn, int options ); virtual void insert( const std::string &ns, BSONObj obj, int flags=0); virtual void insert( const std::string &ns, const std::vector< BSONObj >& v, int flags=0); virtual void remove( const std::string &ns , Query query, int flags ); virtual void update( const std::string &ns , Query query , BSONObj obj , int flags ); virtual bool call( Message &toSend, Message &response, bool assertOk , std::string * actualServer ); virtual void say( Message &toSend, bool isRetry = false , std::string * actualServer = 0 ); virtual void sayPiggyBack( Message &toSend ); virtual void killCursor( long long cursorID ); virtual std::string getServerAddress() const { return _address; } virtual bool isFailed() const { return false; } virtual bool isStillConnected(); virtual std::string toString() const { return _toString(); } virtual BSONObj getLastErrorDetailed(const std::string& db, bool fsync=false, bool j=false, int w=0, int wtimeout=0); virtual BSONObj getLastErrorDetailed(bool fsync=false, bool j=false, int w=0, int wtimeout=0); virtual bool callRead( Message& toSend , Message& response ); virtual ConnectionString::ConnectionType type() const { return ConnectionString::SYNC; } void setAllSoTimeouts( double socketTimeout ); double getSoTimeout() const { return _socketTimeout; } virtual bool lazySupported() const { return false; } // This override of runCommand intentionally calls findOne to construct a legacy // OP_QUERY command. The reason for this is that delicate logic for targeting/locking // config servers is in SyncClusterConnection::findOne, and refactoring that logic // is both risky and of dubious value as we move to config server replica sets (CSRS). bool runCommand(const std::string& dbname, const BSONObj& cmd, BSONObj& info, int options) final; void setRequestMetadataWriter(rpc::RequestMetadataWriter writer) final; void setReplyMetadataReader(rpc::ReplyMetadataReader reader) final; /** * Allow custom query processing through an external (e.g. mongos-only) service. * * Takes ownership of attached handler. */ void attachQueryHandler( QueryHandler* handler ); protected: virtual void _auth(const BSONObj& params); private: SyncClusterConnection( SyncClusterConnection& prev, double socketTimeout = 0 ); std::string _toString() const; bool _commandOnActive(const std::string &dbname, const BSONObj& cmd, BSONObj &info, int options=0); std::unique_ptr _queryOnActive(const std::string &ns, Query query, int nToReturn, int nToSkip, const BSONObj *fieldsToReturn, int queryOptions, int batchSize ); int _lockType( const std::string& name ); void _checkLast(); void _connect( const std::string& host ); std::string _address; std::vector _connAddresses; std::vector _conns; std::vector _lastErrors; // Optionally attached by user std::unique_ptr _customQueryHandler; mongo::mutex _mutex; std::map _lockTypes; // End mutex double _socketTimeout; }; /** * Interface for custom query processing for the SCC. * Allows plugging different host query behaviors for different types of queries. */ class SyncClusterConnection::QueryHandler { public: virtual ~QueryHandler() {}; /** * Returns true if the query can be processed using this handler. */ virtual bool canHandleQuery( const std::string& ns, Query query ) = 0; /** * Returns a cursor on one of the hosts with the desired results for the query. * May throw or return an empty unique_ptr on failure. */ virtual std::unique_ptr handleQuery( const std::vector& hosts, const std::string &ns, Query query, int nToReturn, int nToSkip, const BSONObj *fieldsToReturn, int queryOptions, int batchSize ) = 0; }; class UpdateNotTheSame : public UserException { public: UpdateNotTheSame( int code , const std::string& msg , const std::vector& addrs , const std::vector& lastErrors ) : UserException( code , msg ) , _addrs( addrs ) , _lastErrors( lastErrors ) { verify( _addrs.size() == _lastErrors.size() ); } virtual ~UpdateNotTheSame() throw() { } unsigned size() const { return _addrs.size(); } std::pair operator[](unsigned i) const { return std::make_pair( _addrs[i] , _lastErrors[i] ); } private: std::vector _addrs; std::vector _lastErrors; }; };