diff options
Diffstat (limited to 'cpp/src/qpid/sys/SslPlugin.cpp')
-rw-r--r-- | cpp/src/qpid/sys/SslPlugin.cpp | 192 |
1 files changed, 92 insertions, 100 deletions
diff --git a/cpp/src/qpid/sys/SslPlugin.cpp b/cpp/src/qpid/sys/SslPlugin.cpp index 77cda40056..3b56f9788b 100644 --- a/cpp/src/qpid/sys/SslPlugin.cpp +++ b/cpp/src/qpid/sys/SslPlugin.cpp @@ -22,20 +22,17 @@ #include "qpid/sys/ProtocolFactory.h" #include "qpid/Plugin.h" -#include "qpid/sys/ssl/check.h" -#include "qpid/sys/ssl/util.h" -#include "qpid/sys/ssl/SslHandler.h" +#include "qpid/broker/Broker.h" +#include "qpid/log/Statement.h" #include "qpid/sys/AsynchIOHandler.h" #include "qpid/sys/AsynchIO.h" -#include "qpid/sys/ssl/SslIo.h" +#include "qpid/sys/ssl/util.h" #include "qpid/sys/ssl/SslSocket.h" #include "qpid/sys/SocketAddress.h" -#include "qpid/broker/Broker.h" -#include "qpid/log/Statement.h" +#include "qpid/sys/Poller.h" #include <boost/bind.hpp> -#include <memory> - +#include <boost/ptr_container/ptr_vector.hpp> namespace qpid { namespace sys { @@ -65,38 +62,33 @@ struct SslServerOptions : ssl::SslOptions } }; -template <class T> -class SslProtocolFactoryTmpl : public ProtocolFactory { - private: - +class SslProtocolFactory : public ProtocolFactory { + boost::ptr_vector<Socket> listeners; + boost::ptr_vector<AsynchAcceptor> acceptors; Timer& brokerTimer; uint32_t maxNegotiateTime; + uint16_t listeningPort; const bool tcpNoDelay; - T listener; - const uint16_t listeningPort; - std::auto_ptr<SslAcceptor> acceptor; bool nodict; public: - SslProtocolFactoryTmpl(const std::string& host, const std::string& port, + SslProtocolFactory(const std::string& host, const std::string& port, const SslServerOptions&, int backlog, bool nodelay, Timer& timer, uint32_t maxTime); void accept(Poller::shared_ptr, ConnectionCodec::Factory*); void connect(Poller::shared_ptr, const std::string& host, const std::string& port, ConnectionCodec::Factory*, - boost::function2<void, int, std::string> failed); + ConnectFailedCallback); uint16_t getPort() const; private: void established(Poller::shared_ptr, const Socket&, ConnectionCodec::Factory*, bool isClient); + void connectFailed(const Socket&, int, const std::string&, ConnectFailedCallback); }; -typedef SslProtocolFactoryTmpl<SslSocket> SslProtocolFactory; -typedef SslProtocolFactoryTmpl<SslMuxSocket> SslMuxProtocolFactory; - // Static instance to initialise plugin static struct SslPlugin : public Plugin { @@ -125,7 +117,7 @@ static struct SslPlugin : public Plugin { } } } - + void initialize(Target& target) { QPID_LOG(trace, "Initialising SSL plugin"); broker::Broker* broker = dynamic_cast<broker::Broker*>(&target); @@ -140,12 +132,7 @@ static struct SslPlugin : public Plugin { const broker::Broker::Options& opts = broker->getOptions(); - ProtocolFactory::shared_ptr protocol(options.multiplex ? - static_cast<ProtocolFactory*>(new SslMuxProtocolFactory("", boost::lexical_cast<std::string>(options.port), - options, - opts.connectionBacklog, - opts.tcpNoDelay, - broker->getTimer(), opts.maxNegotiateTime)) : + ProtocolFactory::shared_ptr protocol( static_cast<ProtocolFactory*>(new SslProtocolFactory("", boost::lexical_cast<std::string>(options.port), options, opts.connectionBacklog, @@ -153,7 +140,7 @@ static struct SslPlugin : public Plugin { broker->getTimer(), opts.maxNegotiateTime))); QPID_LOG(notice, "Listening for " << (options.multiplex ? "SSL or TCP" : "SSL") << - " connections on TCP port " << + " connections on TCP/TCP6 port " << protocol->getPort()); broker->registerProtocolFactory("ssl", protocol); } catch (const std::exception& e) { @@ -164,23 +151,48 @@ static struct SslPlugin : public Plugin { } } sslPlugin; -template <class T> -SslProtocolFactoryTmpl<T>::SslProtocolFactoryTmpl(const std::string& host, const std::string& port, +SslProtocolFactory::SslProtocolFactory(const std::string& host, const std::string& port, const SslServerOptions& options, int backlog, bool nodelay, Timer& timer, uint32_t maxTime) : brokerTimer(timer), maxNegotiateTime(maxTime), tcpNoDelay(nodelay), - listener(options.certName, options.clientAuth), - listeningPort(listener.listen(SocketAddress(host, port), backlog)), nodict(options.nodict) -{} +{ + SocketAddress sa(host, port); + + // We must have at least one resolved address + QPID_LOG(info, "Listening to: " << sa.asString()) + Socket* s = options.multiplex ? + new SslMuxSocket(options.certName, options.clientAuth) : + new SslSocket(options.certName, options.clientAuth); + uint16_t lport = s->listen(sa, backlog); + QPID_LOG(debug, "Listened to: " << lport); + listeners.push_back(s); + + listeningPort = lport; + + // Try any other resolved addresses + while (sa.nextAddress()) { + // Hack to ensure that all listening connections are on the same port + sa.setAddrInfoPort(listeningPort); + QPID_LOG(info, "Listening to: " << sa.asString()) + Socket* s = options.multiplex ? + new SslMuxSocket(options.certName, options.clientAuth) : + new SslSocket(options.certName, options.clientAuth); + uint16_t lport = s->listen(sa, backlog); + QPID_LOG(debug, "Listened to: " << lport); + listeners.push_back(s); + } + +} -void SslEstablished(Poller::shared_ptr poller, const qpid::sys::SslSocket& s, - ConnectionCodec::Factory* f, bool isClient, - Timer& timer, uint32_t maxTime, bool tcpNoDelay, bool nodict) { - qpid::sys::ssl::SslHandler* async = new qpid::sys::ssl::SslHandler(s.getFullAddress(), f, nodict); + +void SslProtocolFactory::established(Poller::shared_ptr poller, const Socket& s, + ConnectionCodec::Factory* f, bool isClient) { + + AsynchIOHandler* async = new AsynchIOHandler(s.getFullAddress(), f, nodict); if (tcpNoDelay) { s.setTcpNoDelay(); @@ -191,76 +203,43 @@ void SslEstablished(Poller::shared_ptr poller, const qpid::sys::SslSocket& s, async->setClient(); } - qpid::sys::ssl::SslIO* aio = new qpid::sys::ssl::SslIO(s, - boost::bind(&qpid::sys::ssl::SslHandler::readbuff, async, _1, _2), - boost::bind(&qpid::sys::ssl::SslHandler::eof, async, _1), - boost::bind(&qpid::sys::ssl::SslHandler::disconnect, async, _1), - boost::bind(&qpid::sys::ssl::SslHandler::closedSocket, async, _1, _2), - boost::bind(&qpid::sys::ssl::SslHandler::nobuffs, async, _1), - boost::bind(&qpid::sys::ssl::SslHandler::idle, async, _1)); + AsynchIO* aio = AsynchIO::create( + s, + boost::bind(&AsynchIOHandler::readbuff, async, _1, _2), + boost::bind(&AsynchIOHandler::eof, async, _1), + boost::bind(&AsynchIOHandler::disconnect, async, _1), + boost::bind(&AsynchIOHandler::closedSocket, async, _1, _2), + boost::bind(&AsynchIOHandler::nobuffs, async, _1), + boost::bind(&AsynchIOHandler::idle, async, _1)); - async->init(aio,timer, maxTime); + async->init(aio, brokerTimer, maxNegotiateTime); aio->start(poller); } -template <> -void SslProtocolFactory::established(Poller::shared_ptr poller, const Socket& s, - ConnectionCodec::Factory* f, bool isClient) { - const SslSocket *sslSock = dynamic_cast<const SslSocket*>(&s); - - SslEstablished(poller, *sslSock, f, isClient, brokerTimer, maxNegotiateTime, tcpNoDelay, nodict); -} - -template <class T> -uint16_t SslProtocolFactoryTmpl<T>::getPort() const { +uint16_t SslProtocolFactory::getPort() const { return listeningPort; // Immutable no need for lock. } -template <class T> -void SslProtocolFactoryTmpl<T>::accept(Poller::shared_ptr poller, - ConnectionCodec::Factory* fact) { - acceptor.reset( - new SslAcceptor(listener, - boost::bind(&SslProtocolFactoryTmpl<T>::established, - this, poller, _1, fact, false))); - acceptor->start(poller); -} - -template <> -void SslMuxProtocolFactory::established(Poller::shared_ptr poller, const Socket& s, - ConnectionCodec::Factory* f, bool isClient) { - const SslSocket *sslSock = dynamic_cast<const SslSocket*>(&s); - - if (sslSock) { - SslEstablished(poller, *sslSock, f, isClient, brokerTimer, maxNegotiateTime, tcpNoDelay, nodict); - return; +void SslProtocolFactory::accept(Poller::shared_ptr poller, + ConnectionCodec::Factory* fact) { + for (unsigned i = 0; i<listeners.size(); ++i) { + acceptors.push_back( + AsynchAcceptor::create(listeners[i], + boost::bind(&SslProtocolFactory::established, this, poller, _1, fact, false))); + acceptors[i].start(poller); } +} - AsynchIOHandler* async = new AsynchIOHandler(s.getFullAddress(), f, false); - - if (tcpNoDelay) { - s.setTcpNoDelay(); - QPID_LOG(info, "Set TCP_NODELAY on connection to " << s.getPeerAddress()); - } - - if (isClient) { - async->setClient(); - } - AsynchIO* aio = AsynchIO::create - (s, - boost::bind(&AsynchIOHandler::readbuff, async, _1, _2), - boost::bind(&AsynchIOHandler::eof, async, _1), - boost::bind(&AsynchIOHandler::disconnect, async, _1), - boost::bind(&AsynchIOHandler::closedSocket, async, _1, _2), - boost::bind(&AsynchIOHandler::nobuffs, async, _1), - boost::bind(&AsynchIOHandler::idle, async, _1)); - - async->init(aio, brokerTimer, maxNegotiateTime); - aio->start(poller); +void SslProtocolFactory::connectFailed( + const Socket& s, int ec, const std::string& emsg, + ConnectFailedCallback failedCb) +{ + failedCb(ec, emsg); + s.close(); + delete &s; } -template <class T> -void SslProtocolFactoryTmpl<T>::connect( +void SslProtocolFactory::connect( Poller::shared_ptr poller, const std::string& host, const std::string& port, ConnectionCodec::Factory* fact, @@ -272,10 +251,23 @@ void SslProtocolFactoryTmpl<T>::connect( // shutdown. The allocated SslConnector frees itself when it // is no longer needed. - qpid::sys::ssl::SslSocket* socket = new qpid::sys::ssl::SslSocket(); - new SslConnector(*socket, poller, host, port, - boost::bind(&SslProtocolFactoryTmpl<T>::established, this, poller, _1, fact, true), - failed); + Socket* socket = new qpid::sys::ssl::SslSocket(); + try { + AsynchConnector* c = AsynchConnector::create( + *socket, + host, + port, + boost::bind(&SslProtocolFactory::established, + this, poller, _1, fact, true), + boost::bind(&SslProtocolFactory::connectFailed, + this, _1, _2, _3, failed)); + c->start(poller); + } catch (std::exception&) { + // TODO: Design question - should we do the error callback and also throw? + int errCode = socket->getError(); + connectFailed(*socket, errCode, strError(errCode), failed); + throw; + } } }} // namespace qpid::sys |