summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/sys/SslPlugin.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/sys/SslPlugin.cpp')
-rw-r--r--cpp/src/qpid/sys/SslPlugin.cpp192
1 files changed, 92 insertions, 100 deletions
diff --git a/cpp/src/qpid/sys/SslPlugin.cpp b/cpp/src/qpid/sys/SslPlugin.cpp
index 77cda40056..3b56f9788b 100644
--- a/cpp/src/qpid/sys/SslPlugin.cpp
+++ b/cpp/src/qpid/sys/SslPlugin.cpp
@@ -22,20 +22,17 @@
#include "qpid/sys/ProtocolFactory.h"
#include "qpid/Plugin.h"
-#include "qpid/sys/ssl/check.h"
-#include "qpid/sys/ssl/util.h"
-#include "qpid/sys/ssl/SslHandler.h"
+#include "qpid/broker/Broker.h"
+#include "qpid/log/Statement.h"
#include "qpid/sys/AsynchIOHandler.h"
#include "qpid/sys/AsynchIO.h"
-#include "qpid/sys/ssl/SslIo.h"
+#include "qpid/sys/ssl/util.h"
#include "qpid/sys/ssl/SslSocket.h"
#include "qpid/sys/SocketAddress.h"
-#include "qpid/broker/Broker.h"
-#include "qpid/log/Statement.h"
+#include "qpid/sys/Poller.h"
#include <boost/bind.hpp>
-#include <memory>
-
+#include <boost/ptr_container/ptr_vector.hpp>
namespace qpid {
namespace sys {
@@ -65,38 +62,33 @@ struct SslServerOptions : ssl::SslOptions
}
};
-template <class T>
-class SslProtocolFactoryTmpl : public ProtocolFactory {
- private:
-
+class SslProtocolFactory : public ProtocolFactory {
+ boost::ptr_vector<Socket> listeners;
+ boost::ptr_vector<AsynchAcceptor> acceptors;
Timer& brokerTimer;
uint32_t maxNegotiateTime;
+ uint16_t listeningPort;
const bool tcpNoDelay;
- T listener;
- const uint16_t listeningPort;
- std::auto_ptr<SslAcceptor> acceptor;
bool nodict;
public:
- SslProtocolFactoryTmpl(const std::string& host, const std::string& port,
+ SslProtocolFactory(const std::string& host, const std::string& port,
const SslServerOptions&,
int backlog, bool nodelay,
Timer& timer, uint32_t maxTime);
void accept(Poller::shared_ptr, ConnectionCodec::Factory*);
void connect(Poller::shared_ptr, const std::string& host, const std::string& port,
ConnectionCodec::Factory*,
- boost::function2<void, int, std::string> failed);
+ ConnectFailedCallback);
uint16_t getPort() const;
private:
void established(Poller::shared_ptr, const Socket&, ConnectionCodec::Factory*,
bool isClient);
+ void connectFailed(const Socket&, int, const std::string&, ConnectFailedCallback);
};
-typedef SslProtocolFactoryTmpl<SslSocket> SslProtocolFactory;
-typedef SslProtocolFactoryTmpl<SslMuxSocket> SslMuxProtocolFactory;
-
// Static instance to initialise plugin
static struct SslPlugin : public Plugin {
@@ -125,7 +117,7 @@ static struct SslPlugin : public Plugin {
}
}
}
-
+
void initialize(Target& target) {
QPID_LOG(trace, "Initialising SSL plugin");
broker::Broker* broker = dynamic_cast<broker::Broker*>(&target);
@@ -140,12 +132,7 @@ static struct SslPlugin : public Plugin {
const broker::Broker::Options& opts = broker->getOptions();
- ProtocolFactory::shared_ptr protocol(options.multiplex ?
- static_cast<ProtocolFactory*>(new SslMuxProtocolFactory("", boost::lexical_cast<std::string>(options.port),
- options,
- opts.connectionBacklog,
- opts.tcpNoDelay,
- broker->getTimer(), opts.maxNegotiateTime)) :
+ ProtocolFactory::shared_ptr protocol(
static_cast<ProtocolFactory*>(new SslProtocolFactory("", boost::lexical_cast<std::string>(options.port),
options,
opts.connectionBacklog,
@@ -153,7 +140,7 @@ static struct SslPlugin : public Plugin {
broker->getTimer(), opts.maxNegotiateTime)));
QPID_LOG(notice, "Listening for " <<
(options.multiplex ? "SSL or TCP" : "SSL") <<
- " connections on TCP port " <<
+ " connections on TCP/TCP6 port " <<
protocol->getPort());
broker->registerProtocolFactory("ssl", protocol);
} catch (const std::exception& e) {
@@ -164,23 +151,48 @@ static struct SslPlugin : public Plugin {
}
} sslPlugin;
-template <class T>
-SslProtocolFactoryTmpl<T>::SslProtocolFactoryTmpl(const std::string& host, const std::string& port,
+SslProtocolFactory::SslProtocolFactory(const std::string& host, const std::string& port,
const SslServerOptions& options,
int backlog, bool nodelay,
Timer& timer, uint32_t maxTime) :
brokerTimer(timer),
maxNegotiateTime(maxTime),
tcpNoDelay(nodelay),
- listener(options.certName, options.clientAuth),
- listeningPort(listener.listen(SocketAddress(host, port), backlog)),
nodict(options.nodict)
-{}
+{
+ SocketAddress sa(host, port);
+
+ // We must have at least one resolved address
+ QPID_LOG(info, "Listening to: " << sa.asString())
+ Socket* s = options.multiplex ?
+ new SslMuxSocket(options.certName, options.clientAuth) :
+ new SslSocket(options.certName, options.clientAuth);
+ uint16_t lport = s->listen(sa, backlog);
+ QPID_LOG(debug, "Listened to: " << lport);
+ listeners.push_back(s);
+
+ listeningPort = lport;
+
+ // Try any other resolved addresses
+ while (sa.nextAddress()) {
+ // Hack to ensure that all listening connections are on the same port
+ sa.setAddrInfoPort(listeningPort);
+ QPID_LOG(info, "Listening to: " << sa.asString())
+ Socket* s = options.multiplex ?
+ new SslMuxSocket(options.certName, options.clientAuth) :
+ new SslSocket(options.certName, options.clientAuth);
+ uint16_t lport = s->listen(sa, backlog);
+ QPID_LOG(debug, "Listened to: " << lport);
+ listeners.push_back(s);
+ }
+
+}
-void SslEstablished(Poller::shared_ptr poller, const qpid::sys::SslSocket& s,
- ConnectionCodec::Factory* f, bool isClient,
- Timer& timer, uint32_t maxTime, bool tcpNoDelay, bool nodict) {
- qpid::sys::ssl::SslHandler* async = new qpid::sys::ssl::SslHandler(s.getFullAddress(), f, nodict);
+
+void SslProtocolFactory::established(Poller::shared_ptr poller, const Socket& s,
+ ConnectionCodec::Factory* f, bool isClient) {
+
+ AsynchIOHandler* async = new AsynchIOHandler(s.getFullAddress(), f, nodict);
if (tcpNoDelay) {
s.setTcpNoDelay();
@@ -191,76 +203,43 @@ void SslEstablished(Poller::shared_ptr poller, const qpid::sys::SslSocket& s,
async->setClient();
}
- qpid::sys::ssl::SslIO* aio = new qpid::sys::ssl::SslIO(s,
- boost::bind(&qpid::sys::ssl::SslHandler::readbuff, async, _1, _2),
- boost::bind(&qpid::sys::ssl::SslHandler::eof, async, _1),
- boost::bind(&qpid::sys::ssl::SslHandler::disconnect, async, _1),
- boost::bind(&qpid::sys::ssl::SslHandler::closedSocket, async, _1, _2),
- boost::bind(&qpid::sys::ssl::SslHandler::nobuffs, async, _1),
- boost::bind(&qpid::sys::ssl::SslHandler::idle, async, _1));
+ AsynchIO* aio = AsynchIO::create(
+ s,
+ boost::bind(&AsynchIOHandler::readbuff, async, _1, _2),
+ boost::bind(&AsynchIOHandler::eof, async, _1),
+ boost::bind(&AsynchIOHandler::disconnect, async, _1),
+ boost::bind(&AsynchIOHandler::closedSocket, async, _1, _2),
+ boost::bind(&AsynchIOHandler::nobuffs, async, _1),
+ boost::bind(&AsynchIOHandler::idle, async, _1));
- async->init(aio,timer, maxTime);
+ async->init(aio, brokerTimer, maxNegotiateTime);
aio->start(poller);
}
-template <>
-void SslProtocolFactory::established(Poller::shared_ptr poller, const Socket& s,
- ConnectionCodec::Factory* f, bool isClient) {
- const SslSocket *sslSock = dynamic_cast<const SslSocket*>(&s);
-
- SslEstablished(poller, *sslSock, f, isClient, brokerTimer, maxNegotiateTime, tcpNoDelay, nodict);
-}
-
-template <class T>
-uint16_t SslProtocolFactoryTmpl<T>::getPort() const {
+uint16_t SslProtocolFactory::getPort() const {
return listeningPort; // Immutable no need for lock.
}
-template <class T>
-void SslProtocolFactoryTmpl<T>::accept(Poller::shared_ptr poller,
- ConnectionCodec::Factory* fact) {
- acceptor.reset(
- new SslAcceptor(listener,
- boost::bind(&SslProtocolFactoryTmpl<T>::established,
- this, poller, _1, fact, false)));
- acceptor->start(poller);
-}
-
-template <>
-void SslMuxProtocolFactory::established(Poller::shared_ptr poller, const Socket& s,
- ConnectionCodec::Factory* f, bool isClient) {
- const SslSocket *sslSock = dynamic_cast<const SslSocket*>(&s);
-
- if (sslSock) {
- SslEstablished(poller, *sslSock, f, isClient, brokerTimer, maxNegotiateTime, tcpNoDelay, nodict);
- return;
+void SslProtocolFactory::accept(Poller::shared_ptr poller,
+ ConnectionCodec::Factory* fact) {
+ for (unsigned i = 0; i<listeners.size(); ++i) {
+ acceptors.push_back(
+ AsynchAcceptor::create(listeners[i],
+ boost::bind(&SslProtocolFactory::established, this, poller, _1, fact, false)));
+ acceptors[i].start(poller);
}
+}
- AsynchIOHandler* async = new AsynchIOHandler(s.getFullAddress(), f, false);
-
- if (tcpNoDelay) {
- s.setTcpNoDelay();
- QPID_LOG(info, "Set TCP_NODELAY on connection to " << s.getPeerAddress());
- }
-
- if (isClient) {
- async->setClient();
- }
- AsynchIO* aio = AsynchIO::create
- (s,
- boost::bind(&AsynchIOHandler::readbuff, async, _1, _2),
- boost::bind(&AsynchIOHandler::eof, async, _1),
- boost::bind(&AsynchIOHandler::disconnect, async, _1),
- boost::bind(&AsynchIOHandler::closedSocket, async, _1, _2),
- boost::bind(&AsynchIOHandler::nobuffs, async, _1),
- boost::bind(&AsynchIOHandler::idle, async, _1));
-
- async->init(aio, brokerTimer, maxNegotiateTime);
- aio->start(poller);
+void SslProtocolFactory::connectFailed(
+ const Socket& s, int ec, const std::string& emsg,
+ ConnectFailedCallback failedCb)
+{
+ failedCb(ec, emsg);
+ s.close();
+ delete &s;
}
-template <class T>
-void SslProtocolFactoryTmpl<T>::connect(
+void SslProtocolFactory::connect(
Poller::shared_ptr poller,
const std::string& host, const std::string& port,
ConnectionCodec::Factory* fact,
@@ -272,10 +251,23 @@ void SslProtocolFactoryTmpl<T>::connect(
// shutdown. The allocated SslConnector frees itself when it
// is no longer needed.
- qpid::sys::ssl::SslSocket* socket = new qpid::sys::ssl::SslSocket();
- new SslConnector(*socket, poller, host, port,
- boost::bind(&SslProtocolFactoryTmpl<T>::established, this, poller, _1, fact, true),
- failed);
+ Socket* socket = new qpid::sys::ssl::SslSocket();
+ try {
+ AsynchConnector* c = AsynchConnector::create(
+ *socket,
+ host,
+ port,
+ boost::bind(&SslProtocolFactory::established,
+ this, poller, _1, fact, true),
+ boost::bind(&SslProtocolFactory::connectFailed,
+ this, _1, _2, _3, failed));
+ c->start(poller);
+ } catch (std::exception&) {
+ // TODO: Design question - should we do the error callback and also throw?
+ int errCode = socket->getError();
+ connectFailed(*socket, errCode, strError(errCode), failed);
+ throw;
+ }
}
}} // namespace qpid::sys