summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/sys/SslPlugin.cpp
diff options
context:
space:
mode:
authorAndrew Stitcher <astitcher@apache.org>2013-03-01 00:21:12 +0000
committerAndrew Stitcher <astitcher@apache.org>2013-03-01 00:21:12 +0000
commitd039b79e62bdb4f6f9aad9803ba9cef7c053dcdf (patch)
tree59abcc008d50b11f940bb234e2cbf1083159621e /cpp/src/qpid/sys/SslPlugin.cpp
parent7e2379e6c5158ed4771e6d06df698d6b56c92305 (diff)
downloadqpid-python-d039b79e62bdb4f6f9aad9803ba9cef7c053dcdf.tar.gz
QPID-4610: Remove duplicated transport code from C++ broker
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1451443 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/sys/SslPlugin.cpp')
-rw-r--r--cpp/src/qpid/sys/SslPlugin.cpp222
1 files changed, 28 insertions, 194 deletions
diff --git a/cpp/src/qpid/sys/SslPlugin.cpp b/cpp/src/qpid/sys/SslPlugin.cpp
index a40da24eb8..20ca9256fc 100644
--- a/cpp/src/qpid/sys/SslPlugin.cpp
+++ b/cpp/src/qpid/sys/SslPlugin.cpp
@@ -19,22 +19,17 @@
*
*/
-#include "qpid/sys/ProtocolFactory.h"
+#include "qpid/sys/TransportFactory.h"
#include "qpid/Plugin.h"
#include "qpid/broker/Broker.h"
-#include "qpid/broker/NameGenerator.h"
#include "qpid/log/Statement.h"
-#include "qpid/sys/AsynchIOHandler.h"
#include "qpid/sys/AsynchIO.h"
+#include "qpid/sys/SocketTransport.h"
#include "qpid/sys/ssl/util.h"
#include "qpid/sys/ssl/SslSocket.h"
-#include "qpid/sys/SocketAddress.h"
-#include "qpid/sys/SystemInfo.h"
-#include "qpid/sys/Poller.h"
#include <boost/bind.hpp>
-#include <boost/ptr_container/ptr_vector.hpp>
namespace qpid {
namespace sys {
@@ -64,32 +59,20 @@ struct SslServerOptions : ssl::SslOptions
}
};
-class SslProtocolFactory : public ProtocolFactory {
- boost::ptr_vector<Socket> listeners;
- boost::ptr_vector<AsynchAcceptor> acceptors;
- Timer& brokerTimer;
- uint32_t maxNegotiateTime;
- uint16_t listeningPort;
- const bool tcpNoDelay;
- bool nodict;
-
- public:
- SslProtocolFactory(const qpid::broker::Broker::Options& opts, const SslServerOptions& options,
- Timer& timer);
- void accept(Poller::shared_ptr, ConnectionCodec::Factory*);
- void connect(Poller::shared_ptr, const std::string& name, const std::string& host, const std::string& port,
- ConnectionCodec::Factory*,
- ConnectFailedCallback);
+namespace {
+ Socket* createServerSSLSocket(const SslServerOptions& options) {
+ return new SslSocket(options.certName, options.clientAuth);
+ }
- uint16_t getPort() const;
+ Socket* createServerSSLMuxSocket(const SslServerOptions& options) {
+ return new SslMuxSocket(options.certName, options.clientAuth);
+ }
- private:
- void establishedIncoming(Poller::shared_ptr, const Socket&, ConnectionCodec::Factory*);
- void establishedOutgoing(Poller::shared_ptr, const Socket&, ConnectionCodec::Factory*, const std::string&);
- void establishedCommon(AsynchIOHandler*, Poller::shared_ptr , const Socket&);
- void connectFailed(const Socket&, int, const std::string&, ConnectFailedCallback);
-};
+ Socket* createClientSSLSocket() {
+ return new SslSocket();
+ }
+}
// Static instance to initialise plugin
static struct SslPlugin : public Plugin {
@@ -104,7 +87,7 @@ static struct SslPlugin : public Plugin {
void earlyInitialize(Target& target) {
broker::Broker* broker = dynamic_cast<broker::Broker*>(&target);
if (broker && !options.certDbPath.empty()) {
- const broker::Broker::Options& opts = broker->getOptions();
+ broker::Broker::Options& opts = broker->getOptions();
if (opts.port == options.port && // AMQP & AMQPS ports are the same
opts.port != 0) {
@@ -132,18 +115,25 @@ static struct SslPlugin : public Plugin {
nssInitialized = true;
const broker::Broker::Options& opts = broker->getOptions();
-
- ProtocolFactory::shared_ptr protocol(
- static_cast<ProtocolFactory*>(new SslProtocolFactory(opts, options, broker->getTimer())));
-
- if (protocol->getPort()!=0 ) {
+ TransportAcceptor::shared_ptr ta;
+ SocketAcceptor* sa =
+ new SocketAcceptor(opts.tcpNoDelay, options.nodict, opts.maxNegotiateTime, broker->getTimer());
+ uint16_t port = sa->listen(opts.listenInterfaces, boost::lexical_cast<std::string>(options.port), opts.connectionBacklog,
+ options.multiplex ?
+ boost::bind(&createServerSSLMuxSocket, options) :
+ boost::bind(&createServerSSLSocket, options));
+ if ( port!=0 ) {
+ ta.reset(sa);
QPID_LOG(notice, "Listening for " <<
(options.multiplex ? "SSL or TCP" : "SSL") <<
" connections on TCP/TCP6 port " <<
- protocol->getPort());
+ port);
}
- broker->registerProtocolFactory("ssl", protocol);
+ TransportConnector::shared_ptr tc(
+ new SocketConnector(opts.tcpNoDelay, options.nodict, opts.maxNegotiateTime, broker->getTimer(),
+ &createClientSSLSocket));
+ broker->registerTransport("ssl", ta, tc, port);
} catch (const std::exception& e) {
QPID_LOG(error, "Failed to initialise SSL plugin: " << e.what());
}
@@ -152,160 +142,4 @@ static struct SslPlugin : public Plugin {
}
} sslPlugin;
-namespace {
- // Expand list of Interfaces and addresses to a list of addresses
- std::vector<std::string> expandInterfaces(const std::vector<std::string>& interfaces) {
- std::vector<std::string> addresses;
- // If there are no specific interfaces listed use a single "" to listen on every interface
- if (interfaces.empty()) {
- addresses.push_back("");
- return addresses;
- }
- for (unsigned i = 0; i < interfaces.size(); ++i) {
- const std::string& interface = interfaces[i];
- if (!(SystemInfo::getInterfaceAddresses(interface, addresses))) {
- // We don't have an interface of that name -
- // Check for IPv6 ('[' ']') brackets and remove them
- // then pass to be looked up directly
- if (interface[0]=='[' && interface[interface.size()-1]==']') {
- addresses.push_back(interface.substr(1, interface.size()-2));
- } else {
- addresses.push_back(interface);
- }
- }
- }
- return addresses;
- }
-}
-
-SslProtocolFactory::SslProtocolFactory(const qpid::broker::Broker::Options& opts, const SslServerOptions& options,
- Timer& timer) :
- brokerTimer(timer),
- maxNegotiateTime(opts.maxNegotiateTime),
- tcpNoDelay(opts.tcpNoDelay),
- nodict(options.nodict)
-{
- std::vector<std::string> addresses = expandInterfaces(opts.listenInterfaces);
- if (addresses.empty()) {
- // We specified some interfaces, but couldn't find addresses for them
- QPID_LOG(warning, "SSL: No specified network interfaces found: Not Listening");
- listeningPort = 0;
- }
-
- for (unsigned i = 0; i<addresses.size(); ++i) {
- QPID_LOG(debug, "Using interface: " << addresses[i]);
- SocketAddress sa(addresses[i], boost::lexical_cast<std::string>(options.port));
-
- // We must have at least one resolved address
- QPID_LOG(info, "Listening to: " << sa.asString())
- Socket* s = options.multiplex ?
- new SslMuxSocket(options.certName, options.clientAuth) :
- new SslSocket(options.certName, options.clientAuth);
- uint16_t lport = s->listen(sa, opts.connectionBacklog);
- QPID_LOG(debug, "Listened to: " << lport);
- listeners.push_back(s);
-
- listeningPort = lport;
-
- // Try any other resolved addresses
- while (sa.nextAddress()) {
- // Hack to ensure that all listening connections are on the same port
- sa.setAddrInfoPort(listeningPort);
- QPID_LOG(info, "Listening to: " << sa.asString())
- Socket* s = options.multiplex ?
- new SslMuxSocket(options.certName, options.clientAuth) :
- new SslSocket(options.certName, options.clientAuth);
- uint16_t lport = s->listen(sa, opts.connectionBacklog);
- QPID_LOG(debug, "Listened to: " << lport);
- listeners.push_back(s);
- }
- }
-}
-
-void SslProtocolFactory::establishedIncoming(Poller::shared_ptr poller, const Socket& s,
- ConnectionCodec::Factory* f) {
- AsynchIOHandler* async = new AsynchIOHandler(broker::QPID_NAME_PREFIX+s.getFullAddress(), f, false, false);
- establishedCommon(async, poller, s);
-}
-
-void SslProtocolFactory::establishedOutgoing(Poller::shared_ptr poller, const Socket& s,
- ConnectionCodec::Factory* f, const std::string& name) {
- AsynchIOHandler* async = new AsynchIOHandler(name, f, true, false);
- establishedCommon(async, poller, s);
-}
-
-void SslProtocolFactory::establishedCommon(AsynchIOHandler* async, Poller::shared_ptr poller, const Socket& s) {
- if (tcpNoDelay) {
- s.setTcpNoDelay();
- QPID_LOG(info, "Set TCP_NODELAY on connection to " << s.getPeerAddress());
- }
-
- AsynchIO* aio = AsynchIO::create(
- s,
- boost::bind(&AsynchIOHandler::readbuff, async, _1, _2),
- boost::bind(&AsynchIOHandler::eof, async, _1),
- boost::bind(&AsynchIOHandler::disconnect, async, _1),
- boost::bind(&AsynchIOHandler::closedSocket, async, _1, _2),
- boost::bind(&AsynchIOHandler::nobuffs, async, _1),
- boost::bind(&AsynchIOHandler::idle, async, _1));
-
- async->init(aio, brokerTimer, maxNegotiateTime);
- aio->start(poller);
-}
-
-uint16_t SslProtocolFactory::getPort() const {
- return listeningPort; // Immutable no need for lock.
-}
-
-void SslProtocolFactory::accept(Poller::shared_ptr poller,
- ConnectionCodec::Factory* fact) {
- for (unsigned i = 0; i<listeners.size(); ++i) {
- acceptors.push_back(
- AsynchAcceptor::create(listeners[i],
- boost::bind(&SslProtocolFactory::establishedIncoming, this, poller, _1, fact)));
- acceptors[i].start(poller);
- }
-}
-
-void SslProtocolFactory::connectFailed(
- const Socket& s, int ec, const std::string& emsg,
- ConnectFailedCallback failedCb)
-{
- failedCb(ec, emsg);
- s.close();
- delete &s;
-}
-
-void SslProtocolFactory::connect(
- Poller::shared_ptr poller,
- const std::string& name,
- const std::string& host, const std::string& port,
- ConnectionCodec::Factory* fact,
- ConnectFailedCallback failed)
-{
- // Note that the following logic does not cause a memory leak.
- // The allocated Socket is freed either by the SslConnector
- // upon connection failure or by the SslIoHandle upon connection
- // shutdown. The allocated SslConnector frees itself when it
- // is no longer needed.
-
- Socket* socket = new qpid::sys::ssl::SslSocket();
- try {
- AsynchConnector* c = AsynchConnector::create(
- *socket,
- host,
- port,
- boost::bind(&SslProtocolFactory::establishedOutgoing,
- this, poller, _1, fact, name),
- boost::bind(&SslProtocolFactory::connectFailed,
- this, _1, _2, _3, failed));
- c->start(poller);
- } catch (std::exception&) {
- // TODO: Design question - should we do the error callback and also throw?
- int errCode = socket->getError();
- connectFailed(*socket, errCode, strError(errCode), failed);
- throw;
- }
-}
-
}} // namespace qpid::sys