summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/sys/SslPlugin.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/sys/SslPlugin.cpp')
-rw-r--r--cpp/src/qpid/sys/SslPlugin.cpp289
1 files changed, 153 insertions, 136 deletions
diff --git a/cpp/src/qpid/sys/SslPlugin.cpp b/cpp/src/qpid/sys/SslPlugin.cpp
index 069e97758e..a40da24eb8 100644
--- a/cpp/src/qpid/sys/SslPlugin.cpp
+++ b/cpp/src/qpid/sys/SslPlugin.cpp
@@ -22,19 +22,19 @@
#include "qpid/sys/ProtocolFactory.h"
#include "qpid/Plugin.h"
-#include "qpid/sys/ssl/check.h"
-#include "qpid/sys/ssl/util.h"
-#include "qpid/sys/ssl/SslHandler.h"
+#include "qpid/broker/Broker.h"
+#include "qpid/broker/NameGenerator.h"
+#include "qpid/log/Statement.h"
#include "qpid/sys/AsynchIOHandler.h"
#include "qpid/sys/AsynchIO.h"
-#include "qpid/sys/ssl/SslIo.h"
+#include "qpid/sys/ssl/util.h"
#include "qpid/sys/ssl/SslSocket.h"
-#include "qpid/broker/Broker.h"
-#include "qpid/log/Statement.h"
+#include "qpid/sys/SocketAddress.h"
+#include "qpid/sys/SystemInfo.h"
+#include "qpid/sys/Poller.h"
#include <boost/bind.hpp>
-#include <memory>
-
+#include <boost/ptr_container/ptr_vector.hpp>
namespace qpid {
namespace sys {
@@ -64,38 +64,32 @@ struct SslServerOptions : ssl::SslOptions
}
};
-template <class T>
-class SslProtocolFactoryTmpl : public ProtocolFactory {
- private:
-
- typedef SslAcceptorTmpl<T> SslAcceptor;
-
+class SslProtocolFactory : public ProtocolFactory {
+ boost::ptr_vector<Socket> listeners;
+ boost::ptr_vector<AsynchAcceptor> acceptors;
Timer& brokerTimer;
uint32_t maxNegotiateTime;
+ uint16_t listeningPort;
const bool tcpNoDelay;
- T listener;
- const uint16_t listeningPort;
- std::auto_ptr<SslAcceptor> acceptor;
bool nodict;
public:
- SslProtocolFactoryTmpl(const SslServerOptions&, int backlog, bool nodelay, Timer& timer, uint32_t maxTime);
+ SslProtocolFactory(const qpid::broker::Broker::Options& opts, const SslServerOptions& options,
+ Timer& timer);
void accept(Poller::shared_ptr, ConnectionCodec::Factory*);
- void connect(Poller::shared_ptr, const std::string& host, const std::string& port,
+ void connect(Poller::shared_ptr, const std::string& name, const std::string& host, const std::string& port,
ConnectionCodec::Factory*,
- boost::function2<void, int, std::string> failed);
+ ConnectFailedCallback);
uint16_t getPort() const;
- bool supports(const std::string& capability);
private:
- void established(Poller::shared_ptr, const Socket&, ConnectionCodec::Factory*,
- bool isClient);
+ void establishedIncoming(Poller::shared_ptr, const Socket&, ConnectionCodec::Factory*);
+ void establishedOutgoing(Poller::shared_ptr, const Socket&, ConnectionCodec::Factory*, const std::string&);
+ void establishedCommon(AsynchIOHandler*, Poller::shared_ptr , const Socket&);
+ void connectFailed(const Socket&, int, const std::string&, ConnectFailedCallback);
};
-typedef SslProtocolFactoryTmpl<SslSocket> SslProtocolFactory;
-typedef SslProtocolFactoryTmpl<SslMuxSocket> SslMuxProtocolFactory;
-
// Static instance to initialise plugin
static struct SslPlugin : public Plugin {
@@ -124,7 +118,7 @@ static struct SslPlugin : public Plugin {
}
}
}
-
+
void initialize(Target& target) {
QPID_LOG(trace, "Initialising SSL plugin");
broker::Broker* broker = dynamic_cast<broker::Broker*>(&target);
@@ -139,19 +133,16 @@ static struct SslPlugin : public Plugin {
const broker::Broker::Options& opts = broker->getOptions();
- ProtocolFactory::shared_ptr protocol(options.multiplex ?
- static_cast<ProtocolFactory*>(new SslMuxProtocolFactory(options,
- opts.connectionBacklog,
- opts.tcpNoDelay,
- broker->getTimer(), opts.maxNegotiateTime)) :
- static_cast<ProtocolFactory*>(new SslProtocolFactory(options,
- opts.connectionBacklog,
- opts.tcpNoDelay,
- broker->getTimer(), opts.maxNegotiateTime)));
- QPID_LOG(notice, "Listening for " <<
- (options.multiplex ? "SSL or TCP" : "SSL") <<
- " connections on TCP port " <<
- protocol->getPort());
+ ProtocolFactory::shared_ptr protocol(
+ static_cast<ProtocolFactory*>(new SslProtocolFactory(opts, options, broker->getTimer())));
+
+ if (protocol->getPort()!=0 ) {
+ QPID_LOG(notice, "Listening for " <<
+ (options.multiplex ? "SSL or TCP" : "SSL") <<
+ " connections on TCP/TCP6 port " <<
+ protocol->getPort());
+ }
+
broker->registerProtocolFactory("ssl", protocol);
} catch (const std::exception& e) {
QPID_LOG(error, "Failed to initialise SSL plugin: " << e.what());
@@ -161,99 +152,133 @@ static struct SslPlugin : public Plugin {
}
} sslPlugin;
-template <class T>
-SslProtocolFactoryTmpl<T>::SslProtocolFactoryTmpl(const SslServerOptions& options, int backlog, bool nodelay, Timer& timer, uint32_t maxTime) :
+namespace {
+ // Expand list of Interfaces and addresses to a list of addresses
+ std::vector<std::string> expandInterfaces(const std::vector<std::string>& interfaces) {
+ std::vector<std::string> addresses;
+ // If there are no specific interfaces listed use a single "" to listen on every interface
+ if (interfaces.empty()) {
+ addresses.push_back("");
+ return addresses;
+ }
+ for (unsigned i = 0; i < interfaces.size(); ++i) {
+ const std::string& interface = interfaces[i];
+ if (!(SystemInfo::getInterfaceAddresses(interface, addresses))) {
+ // We don't have an interface of that name -
+ // Check for IPv6 ('[' ']') brackets and remove them
+ // then pass to be looked up directly
+ if (interface[0]=='[' && interface[interface.size()-1]==']') {
+ addresses.push_back(interface.substr(1, interface.size()-2));
+ } else {
+ addresses.push_back(interface);
+ }
+ }
+ }
+ return addresses;
+ }
+}
+
+SslProtocolFactory::SslProtocolFactory(const qpid::broker::Broker::Options& opts, const SslServerOptions& options,
+ Timer& timer) :
brokerTimer(timer),
- maxNegotiateTime(maxTime),
- tcpNoDelay(nodelay), listeningPort(listener.listen(options.port, backlog, options.certName, options.clientAuth)),
+ maxNegotiateTime(opts.maxNegotiateTime),
+ tcpNoDelay(opts.tcpNoDelay),
nodict(options.nodict)
-{}
-
-void SslEstablished(Poller::shared_ptr poller, const qpid::sys::SslSocket& s,
- ConnectionCodec::Factory* f, bool isClient,
- Timer& timer, uint32_t maxTime, bool tcpNoDelay, bool nodict) {
- qpid::sys::ssl::SslHandler* async = new qpid::sys::ssl::SslHandler(s.getFullAddress(), f, nodict);
-
- if (tcpNoDelay) {
- s.setTcpNoDelay(tcpNoDelay);
- QPID_LOG(info, "Set TCP_NODELAY on connection to " << s.getPeerAddress());
+{
+ std::vector<std::string> addresses = expandInterfaces(opts.listenInterfaces);
+ if (addresses.empty()) {
+ // We specified some interfaces, but couldn't find addresses for them
+ QPID_LOG(warning, "SSL: No specified network interfaces found: Not Listening");
+ listeningPort = 0;
}
- if (isClient) {
- async->setClient();
+ for (unsigned i = 0; i<addresses.size(); ++i) {
+ QPID_LOG(debug, "Using interface: " << addresses[i]);
+ SocketAddress sa(addresses[i], boost::lexical_cast<std::string>(options.port));
+
+ // We must have at least one resolved address
+ QPID_LOG(info, "Listening to: " << sa.asString())
+ Socket* s = options.multiplex ?
+ new SslMuxSocket(options.certName, options.clientAuth) :
+ new SslSocket(options.certName, options.clientAuth);
+ uint16_t lport = s->listen(sa, opts.connectionBacklog);
+ QPID_LOG(debug, "Listened to: " << lport);
+ listeners.push_back(s);
+
+ listeningPort = lport;
+
+ // Try any other resolved addresses
+ while (sa.nextAddress()) {
+ // Hack to ensure that all listening connections are on the same port
+ sa.setAddrInfoPort(listeningPort);
+ QPID_LOG(info, "Listening to: " << sa.asString())
+ Socket* s = options.multiplex ?
+ new SslMuxSocket(options.certName, options.clientAuth) :
+ new SslSocket(options.certName, options.clientAuth);
+ uint16_t lport = s->listen(sa, opts.connectionBacklog);
+ QPID_LOG(debug, "Listened to: " << lport);
+ listeners.push_back(s);
+ }
}
-
- qpid::sys::ssl::SslIO* aio = new qpid::sys::ssl::SslIO(s,
- boost::bind(&qpid::sys::ssl::SslHandler::readbuff, async, _1, _2),
- boost::bind(&qpid::sys::ssl::SslHandler::eof, async, _1),
- boost::bind(&qpid::sys::ssl::SslHandler::disconnect, async, _1),
- boost::bind(&qpid::sys::ssl::SslHandler::closedSocket, async, _1, _2),
- boost::bind(&qpid::sys::ssl::SslHandler::nobuffs, async, _1),
- boost::bind(&qpid::sys::ssl::SslHandler::idle, async, _1));
-
- async->init(aio,timer, maxTime);
- aio->start(poller);
-}
-
-template <>
-void SslProtocolFactory::established(Poller::shared_ptr poller, const Socket& s,
- ConnectionCodec::Factory* f, bool isClient) {
- const SslSocket *sslSock = dynamic_cast<const SslSocket*>(&s);
-
- SslEstablished(poller, *sslSock, f, isClient, brokerTimer, maxNegotiateTime, tcpNoDelay, nodict);
}
-template <class T>
-uint16_t SslProtocolFactoryTmpl<T>::getPort() const {
- return listeningPort; // Immutable no need for lock.
+void SslProtocolFactory::establishedIncoming(Poller::shared_ptr poller, const Socket& s,
+ ConnectionCodec::Factory* f) {
+ AsynchIOHandler* async = new AsynchIOHandler(broker::QPID_NAME_PREFIX+s.getFullAddress(), f, false, false);
+ establishedCommon(async, poller, s);
}
-template <class T>
-void SslProtocolFactoryTmpl<T>::accept(Poller::shared_ptr poller,
- ConnectionCodec::Factory* fact) {
- acceptor.reset(
- new SslAcceptor(listener,
- boost::bind(&SslProtocolFactoryTmpl<T>::established,
- this, poller, _1, fact, false)));
- acceptor->start(poller);
+void SslProtocolFactory::establishedOutgoing(Poller::shared_ptr poller, const Socket& s,
+ ConnectionCodec::Factory* f, const std::string& name) {
+ AsynchIOHandler* async = new AsynchIOHandler(name, f, true, false);
+ establishedCommon(async, poller, s);
}
-template <>
-void SslMuxProtocolFactory::established(Poller::shared_ptr poller, const Socket& s,
- ConnectionCodec::Factory* f, bool isClient) {
- const SslSocket *sslSock = dynamic_cast<const SslSocket*>(&s);
-
- if (sslSock) {
- SslEstablished(poller, *sslSock, f, isClient, brokerTimer, maxNegotiateTime, tcpNoDelay, nodict);
- return;
- }
-
- AsynchIOHandler* async = new AsynchIOHandler(s.getFullAddress(), f);
-
+void SslProtocolFactory::establishedCommon(AsynchIOHandler* async, Poller::shared_ptr poller, const Socket& s) {
if (tcpNoDelay) {
s.setTcpNoDelay();
QPID_LOG(info, "Set TCP_NODELAY on connection to " << s.getPeerAddress());
}
- if (isClient) {
- async->setClient();
- }
- AsynchIO* aio = AsynchIO::create
- (s,
- boost::bind(&AsynchIOHandler::readbuff, async, _1, _2),
- boost::bind(&AsynchIOHandler::eof, async, _1),
- boost::bind(&AsynchIOHandler::disconnect, async, _1),
- boost::bind(&AsynchIOHandler::closedSocket, async, _1, _2),
- boost::bind(&AsynchIOHandler::nobuffs, async, _1),
- boost::bind(&AsynchIOHandler::idle, async, _1));
+ AsynchIO* aio = AsynchIO::create(
+ s,
+ boost::bind(&AsynchIOHandler::readbuff, async, _1, _2),
+ boost::bind(&AsynchIOHandler::eof, async, _1),
+ boost::bind(&AsynchIOHandler::disconnect, async, _1),
+ boost::bind(&AsynchIOHandler::closedSocket, async, _1, _2),
+ boost::bind(&AsynchIOHandler::nobuffs, async, _1),
+ boost::bind(&AsynchIOHandler::idle, async, _1));
async->init(aio, brokerTimer, maxNegotiateTime);
aio->start(poller);
}
-template <class T>
-void SslProtocolFactoryTmpl<T>::connect(
+uint16_t SslProtocolFactory::getPort() const {
+ return listeningPort; // Immutable no need for lock.
+}
+
+void SslProtocolFactory::accept(Poller::shared_ptr poller,
+ ConnectionCodec::Factory* fact) {
+ for (unsigned i = 0; i<listeners.size(); ++i) {
+ acceptors.push_back(
+ AsynchAcceptor::create(listeners[i],
+ boost::bind(&SslProtocolFactory::establishedIncoming, this, poller, _1, fact)));
+ acceptors[i].start(poller);
+ }
+}
+
+void SslProtocolFactory::connectFailed(
+ const Socket& s, int ec, const std::string& emsg,
+ ConnectFailedCallback failedCb)
+{
+ failedCb(ec, emsg);
+ s.close();
+ delete &s;
+}
+
+void SslProtocolFactory::connect(
Poller::shared_ptr poller,
+ const std::string& name,
const std::string& host, const std::string& port,
ConnectionCodec::Factory* fact,
ConnectFailedCallback failed)
@@ -264,31 +289,23 @@ void SslProtocolFactoryTmpl<T>::connect(
// shutdown. The allocated SslConnector frees itself when it
// is no longer needed.
- qpid::sys::ssl::SslSocket* socket = new qpid::sys::ssl::SslSocket();
- new SslConnector(*socket, poller, host, port,
- boost::bind(&SslProtocolFactoryTmpl<T>::established, this, poller, _1, fact, true),
- failed);
-}
-
-namespace
-{
-const std::string SSL = "ssl";
-}
-
-template <>
-bool SslProtocolFactory::supports(const std::string& capability)
-{
- std::string s = capability;
- transform(s.begin(), s.end(), s.begin(), tolower);
- return s == SSL;
-}
-
-template <>
-bool SslMuxProtocolFactory::supports(const std::string& capability)
-{
- std::string s = capability;
- transform(s.begin(), s.end(), s.begin(), tolower);
- return s == SSL || s == "tcp";
+ Socket* socket = new qpid::sys::ssl::SslSocket();
+ try {
+ AsynchConnector* c = AsynchConnector::create(
+ *socket,
+ host,
+ port,
+ boost::bind(&SslProtocolFactory::establishedOutgoing,
+ this, poller, _1, fact, name),
+ boost::bind(&SslProtocolFactory::connectFailed,
+ this, _1, _2, _3, failed));
+ c->start(poller);
+ } catch (std::exception&) {
+ // TODO: Design question - should we do the error callback and also throw?
+ int errCode = socket->getError();
+ connectFailed(*socket, errCode, strError(errCode), failed);
+ throw;
+ }
}
}} // namespace qpid::sys