diff options
| author | Andrew Stitcher <astitcher@apache.org> | 2013-03-01 00:21:12 +0000 |
|---|---|---|
| committer | Andrew Stitcher <astitcher@apache.org> | 2013-03-01 00:21:12 +0000 |
| commit | d039b79e62bdb4f6f9aad9803ba9cef7c053dcdf (patch) | |
| tree | 59abcc008d50b11f940bb234e2cbf1083159621e /cpp/src/qpid/sys/SslPlugin.cpp | |
| parent | 7e2379e6c5158ed4771e6d06df698d6b56c92305 (diff) | |
| download | qpid-python-d039b79e62bdb4f6f9aad9803ba9cef7c053dcdf.tar.gz | |
QPID-4610: Remove duplicated transport code from C++ broker
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1451443 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/sys/SslPlugin.cpp')
| -rw-r--r-- | cpp/src/qpid/sys/SslPlugin.cpp | 222 |
1 files changed, 28 insertions, 194 deletions
diff --git a/cpp/src/qpid/sys/SslPlugin.cpp b/cpp/src/qpid/sys/SslPlugin.cpp index a40da24eb8..20ca9256fc 100644 --- a/cpp/src/qpid/sys/SslPlugin.cpp +++ b/cpp/src/qpid/sys/SslPlugin.cpp @@ -19,22 +19,17 @@ * */ -#include "qpid/sys/ProtocolFactory.h" +#include "qpid/sys/TransportFactory.h" #include "qpid/Plugin.h" #include "qpid/broker/Broker.h" -#include "qpid/broker/NameGenerator.h" #include "qpid/log/Statement.h" -#include "qpid/sys/AsynchIOHandler.h" #include "qpid/sys/AsynchIO.h" +#include "qpid/sys/SocketTransport.h" #include "qpid/sys/ssl/util.h" #include "qpid/sys/ssl/SslSocket.h" -#include "qpid/sys/SocketAddress.h" -#include "qpid/sys/SystemInfo.h" -#include "qpid/sys/Poller.h" #include <boost/bind.hpp> -#include <boost/ptr_container/ptr_vector.hpp> namespace qpid { namespace sys { @@ -64,32 +59,20 @@ struct SslServerOptions : ssl::SslOptions } }; -class SslProtocolFactory : public ProtocolFactory { - boost::ptr_vector<Socket> listeners; - boost::ptr_vector<AsynchAcceptor> acceptors; - Timer& brokerTimer; - uint32_t maxNegotiateTime; - uint16_t listeningPort; - const bool tcpNoDelay; - bool nodict; - - public: - SslProtocolFactory(const qpid::broker::Broker::Options& opts, const SslServerOptions& options, - Timer& timer); - void accept(Poller::shared_ptr, ConnectionCodec::Factory*); - void connect(Poller::shared_ptr, const std::string& name, const std::string& host, const std::string& port, - ConnectionCodec::Factory*, - ConnectFailedCallback); +namespace { + Socket* createServerSSLSocket(const SslServerOptions& options) { + return new SslSocket(options.certName, options.clientAuth); + } - uint16_t getPort() const; + Socket* createServerSSLMuxSocket(const SslServerOptions& options) { + return new SslMuxSocket(options.certName, options.clientAuth); + } - private: - void establishedIncoming(Poller::shared_ptr, const Socket&, ConnectionCodec::Factory*); - void establishedOutgoing(Poller::shared_ptr, const Socket&, ConnectionCodec::Factory*, const std::string&); - void establishedCommon(AsynchIOHandler*, Poller::shared_ptr , const Socket&); - void connectFailed(const Socket&, int, const std::string&, ConnectFailedCallback); -}; + Socket* createClientSSLSocket() { + return new SslSocket(); + } +} // Static instance to initialise plugin static struct SslPlugin : public Plugin { @@ -104,7 +87,7 @@ static struct SslPlugin : public Plugin { void earlyInitialize(Target& target) { broker::Broker* broker = dynamic_cast<broker::Broker*>(&target); if (broker && !options.certDbPath.empty()) { - const broker::Broker::Options& opts = broker->getOptions(); + broker::Broker::Options& opts = broker->getOptions(); if (opts.port == options.port && // AMQP & AMQPS ports are the same opts.port != 0) { @@ -132,18 +115,25 @@ static struct SslPlugin : public Plugin { nssInitialized = true; const broker::Broker::Options& opts = broker->getOptions(); - - ProtocolFactory::shared_ptr protocol( - static_cast<ProtocolFactory*>(new SslProtocolFactory(opts, options, broker->getTimer()))); - - if (protocol->getPort()!=0 ) { + TransportAcceptor::shared_ptr ta; + SocketAcceptor* sa = + new SocketAcceptor(opts.tcpNoDelay, options.nodict, opts.maxNegotiateTime, broker->getTimer()); + uint16_t port = sa->listen(opts.listenInterfaces, boost::lexical_cast<std::string>(options.port), opts.connectionBacklog, + options.multiplex ? + boost::bind(&createServerSSLMuxSocket, options) : + boost::bind(&createServerSSLSocket, options)); + if ( port!=0 ) { + ta.reset(sa); QPID_LOG(notice, "Listening for " << (options.multiplex ? "SSL or TCP" : "SSL") << " connections on TCP/TCP6 port " << - protocol->getPort()); + port); } - broker->registerProtocolFactory("ssl", protocol); + TransportConnector::shared_ptr tc( + new SocketConnector(opts.tcpNoDelay, options.nodict, opts.maxNegotiateTime, broker->getTimer(), + &createClientSSLSocket)); + broker->registerTransport("ssl", ta, tc, port); } catch (const std::exception& e) { QPID_LOG(error, "Failed to initialise SSL plugin: " << e.what()); } @@ -152,160 +142,4 @@ static struct SslPlugin : public Plugin { } } sslPlugin; -namespace { - // Expand list of Interfaces and addresses to a list of addresses - std::vector<std::string> expandInterfaces(const std::vector<std::string>& interfaces) { - std::vector<std::string> addresses; - // If there are no specific interfaces listed use a single "" to listen on every interface - if (interfaces.empty()) { - addresses.push_back(""); - return addresses; - } - for (unsigned i = 0; i < interfaces.size(); ++i) { - const std::string& interface = interfaces[i]; - if (!(SystemInfo::getInterfaceAddresses(interface, addresses))) { - // We don't have an interface of that name - - // Check for IPv6 ('[' ']') brackets and remove them - // then pass to be looked up directly - if (interface[0]=='[' && interface[interface.size()-1]==']') { - addresses.push_back(interface.substr(1, interface.size()-2)); - } else { - addresses.push_back(interface); - } - } - } - return addresses; - } -} - -SslProtocolFactory::SslProtocolFactory(const qpid::broker::Broker::Options& opts, const SslServerOptions& options, - Timer& timer) : - brokerTimer(timer), - maxNegotiateTime(opts.maxNegotiateTime), - tcpNoDelay(opts.tcpNoDelay), - nodict(options.nodict) -{ - std::vector<std::string> addresses = expandInterfaces(opts.listenInterfaces); - if (addresses.empty()) { - // We specified some interfaces, but couldn't find addresses for them - QPID_LOG(warning, "SSL: No specified network interfaces found: Not Listening"); - listeningPort = 0; - } - - for (unsigned i = 0; i<addresses.size(); ++i) { - QPID_LOG(debug, "Using interface: " << addresses[i]); - SocketAddress sa(addresses[i], boost::lexical_cast<std::string>(options.port)); - - // We must have at least one resolved address - QPID_LOG(info, "Listening to: " << sa.asString()) - Socket* s = options.multiplex ? - new SslMuxSocket(options.certName, options.clientAuth) : - new SslSocket(options.certName, options.clientAuth); - uint16_t lport = s->listen(sa, opts.connectionBacklog); - QPID_LOG(debug, "Listened to: " << lport); - listeners.push_back(s); - - listeningPort = lport; - - // Try any other resolved addresses - while (sa.nextAddress()) { - // Hack to ensure that all listening connections are on the same port - sa.setAddrInfoPort(listeningPort); - QPID_LOG(info, "Listening to: " << sa.asString()) - Socket* s = options.multiplex ? - new SslMuxSocket(options.certName, options.clientAuth) : - new SslSocket(options.certName, options.clientAuth); - uint16_t lport = s->listen(sa, opts.connectionBacklog); - QPID_LOG(debug, "Listened to: " << lport); - listeners.push_back(s); - } - } -} - -void SslProtocolFactory::establishedIncoming(Poller::shared_ptr poller, const Socket& s, - ConnectionCodec::Factory* f) { - AsynchIOHandler* async = new AsynchIOHandler(broker::QPID_NAME_PREFIX+s.getFullAddress(), f, false, false); - establishedCommon(async, poller, s); -} - -void SslProtocolFactory::establishedOutgoing(Poller::shared_ptr poller, const Socket& s, - ConnectionCodec::Factory* f, const std::string& name) { - AsynchIOHandler* async = new AsynchIOHandler(name, f, true, false); - establishedCommon(async, poller, s); -} - -void SslProtocolFactory::establishedCommon(AsynchIOHandler* async, Poller::shared_ptr poller, const Socket& s) { - if (tcpNoDelay) { - s.setTcpNoDelay(); - QPID_LOG(info, "Set TCP_NODELAY on connection to " << s.getPeerAddress()); - } - - AsynchIO* aio = AsynchIO::create( - s, - boost::bind(&AsynchIOHandler::readbuff, async, _1, _2), - boost::bind(&AsynchIOHandler::eof, async, _1), - boost::bind(&AsynchIOHandler::disconnect, async, _1), - boost::bind(&AsynchIOHandler::closedSocket, async, _1, _2), - boost::bind(&AsynchIOHandler::nobuffs, async, _1), - boost::bind(&AsynchIOHandler::idle, async, _1)); - - async->init(aio, brokerTimer, maxNegotiateTime); - aio->start(poller); -} - -uint16_t SslProtocolFactory::getPort() const { - return listeningPort; // Immutable no need for lock. -} - -void SslProtocolFactory::accept(Poller::shared_ptr poller, - ConnectionCodec::Factory* fact) { - for (unsigned i = 0; i<listeners.size(); ++i) { - acceptors.push_back( - AsynchAcceptor::create(listeners[i], - boost::bind(&SslProtocolFactory::establishedIncoming, this, poller, _1, fact))); - acceptors[i].start(poller); - } -} - -void SslProtocolFactory::connectFailed( - const Socket& s, int ec, const std::string& emsg, - ConnectFailedCallback failedCb) -{ - failedCb(ec, emsg); - s.close(); - delete &s; -} - -void SslProtocolFactory::connect( - Poller::shared_ptr poller, - const std::string& name, - const std::string& host, const std::string& port, - ConnectionCodec::Factory* fact, - ConnectFailedCallback failed) -{ - // Note that the following logic does not cause a memory leak. - // The allocated Socket is freed either by the SslConnector - // upon connection failure or by the SslIoHandle upon connection - // shutdown. The allocated SslConnector frees itself when it - // is no longer needed. - - Socket* socket = new qpid::sys::ssl::SslSocket(); - try { - AsynchConnector* c = AsynchConnector::create( - *socket, - host, - port, - boost::bind(&SslProtocolFactory::establishedOutgoing, - this, poller, _1, fact, name), - boost::bind(&SslProtocolFactory::connectFailed, - this, _1, _2, _3, failed)); - c->start(poller); - } catch (std::exception&) { - // TODO: Design question - should we do the error callback and also throw? - int errCode = socket->getError(); - connectFailed(*socket, errCode, strError(errCode), failed); - throw; - } -} - }} // namespace qpid::sys |
