summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/sys
diff options
context:
space:
mode:
authorAndrew Stitcher <astitcher@apache.org>2013-03-01 00:21:12 +0000
committerAndrew Stitcher <astitcher@apache.org>2013-03-01 00:21:12 +0000
commitd039b79e62bdb4f6f9aad9803ba9cef7c053dcdf (patch)
tree59abcc008d50b11f940bb234e2cbf1083159621e /cpp/src/qpid/sys
parent7e2379e6c5158ed4771e6d06df698d6b56c92305 (diff)
downloadqpid-python-d039b79e62bdb4f6f9aad9803ba9cef7c053dcdf.tar.gz
QPID-4610: Remove duplicated transport code from C++ broker
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1451443 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/sys')
-rw-r--r--cpp/src/qpid/sys/RdmaIOPlugin.cpp11
-rw-r--r--cpp/src/qpid/sys/SocketTransport.cpp209
-rw-r--r--cpp/src/qpid/sys/SocketTransport.h91
-rw-r--r--cpp/src/qpid/sys/SslPlugin.cpp222
-rw-r--r--cpp/src/qpid/sys/TCPIOPlugin.cpp208
-rw-r--r--cpp/src/qpid/sys/TransportFactory.h (renamed from cpp/src/qpid/sys/ProtocolFactory.h)34
-rw-r--r--cpp/src/qpid/sys/windows/WinSocket.cpp4
7 files changed, 370 insertions, 409 deletions
diff --git a/cpp/src/qpid/sys/RdmaIOPlugin.cpp b/cpp/src/qpid/sys/RdmaIOPlugin.cpp
index 51cc0ed109..8655a8baa3 100644
--- a/cpp/src/qpid/sys/RdmaIOPlugin.cpp
+++ b/cpp/src/qpid/sys/RdmaIOPlugin.cpp
@@ -19,7 +19,7 @@
*
*/
-#include "qpid/sys/ProtocolFactory.h"
+#include "qpid/sys/TransportFactory.h"
#include "qpid/Plugin.h"
#include "qpid/broker/Broker.h"
@@ -239,7 +239,7 @@ void RdmaIOHandler::initProtocolIn(Rdma::Buffer* buff) {
}
}
-class RdmaIOProtocolFactory : public ProtocolFactory {
+class RdmaIOProtocolFactory : public TransportAcceptor, public TransportConnector {
auto_ptr<Rdma::Listener> listener;
const uint16_t listeningPort;
@@ -275,9 +275,10 @@ static class RdmaIOPlugin : public Plugin {
// Only provide to a Broker
if (broker) {
const broker::Broker::Options& opts = broker->getOptions();
- ProtocolFactory::shared_ptr protocol(new RdmaIOProtocolFactory(opts.port, opts.connectionBacklog));
- QPID_LOG(notice, "Rdma: Listening on RDMA port " << protocol->getPort());
- broker->registerProtocolFactory("rdma", protocol);
+ boost::shared_ptr<RdmaIOProtocolFactory> protocol(new RdmaIOProtocolFactory(opts.port, opts.connectionBacklog));
+ uint16_t port = protocol->getPort();
+ QPID_LOG(notice, "Rdma: Listening on RDMA port " << port);
+ broker->registerTransport("rdma", protocol, protocol, port);
}
}
} rdmaPlugin;
diff --git a/cpp/src/qpid/sys/SocketTransport.cpp b/cpp/src/qpid/sys/SocketTransport.cpp
new file mode 100644
index 0000000000..091851713e
--- /dev/null
+++ b/cpp/src/qpid/sys/SocketTransport.cpp
@@ -0,0 +1,209 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/sys/SocketTransport.h"
+
+#include "qpid/broker/NameGenerator.h"
+#include "qpid/log/Statement.h"
+#include "qpid/sys/AsynchIOHandler.h"
+#include "qpid/sys/AsynchIO.h"
+#include "qpid/sys/Socket.h"
+#include "qpid/sys/SocketAddress.h"
+#include "qpid/sys/SystemInfo.h"
+
+#include <boost/bind.hpp>
+
+namespace qpid {
+namespace sys {
+
+namespace {
+ void establishedCommon(
+ AsynchIOHandler* async,
+ boost::shared_ptr<Poller> poller, const SocketTransportOptions& opts, Timer* timer,
+ const Socket& s)
+ {
+ if (opts.tcpNoDelay) {
+ s.setTcpNoDelay();
+ QPID_LOG(info, "Set TCP_NODELAY on connection to " << s.getPeerAddress());
+ }
+
+ AsynchIO* aio = AsynchIO::create
+ (s,
+ boost::bind(&AsynchIOHandler::readbuff, async, _1, _2),
+ boost::bind(&AsynchIOHandler::eof, async, _1),
+ boost::bind(&AsynchIOHandler::disconnect, async, _1),
+ boost::bind(&AsynchIOHandler::closedSocket, async, _1, _2),
+ boost::bind(&AsynchIOHandler::nobuffs, async, _1),
+ boost::bind(&AsynchIOHandler::idle, async, _1));
+
+ async->init(aio, *timer, opts.maxNegotiateTime);
+ aio->start(poller);
+ }
+
+ void establishedIncoming(
+ boost::shared_ptr<Poller> poller, const SocketTransportOptions& opts, Timer* timer,
+ const Socket& s, ConnectionCodec::Factory* f)
+ {
+ AsynchIOHandler* async = new AsynchIOHandler(broker::QPID_NAME_PREFIX+s.getFullAddress(), f, false, opts.nodict);
+ establishedCommon(async, poller, opts, timer, s);
+ }
+
+ void establishedOutgoing(
+ boost::shared_ptr<Poller> poller, const SocketTransportOptions& opts, Timer* timer,
+ const Socket& s, ConnectionCodec::Factory* f, const std::string& name)
+ {
+ AsynchIOHandler* async = new AsynchIOHandler(name, f, true, opts.nodict);
+ establishedCommon(async, poller, opts, timer, s);
+ }
+
+ void connectFailed(
+ const Socket& s, int ec, const std::string& emsg,
+ SocketConnector::ConnectFailedCallback failedCb)
+ {
+ failedCb(ec, emsg);
+ s.close();
+ delete &s;
+ }
+
+ // Expand list of Interfaces and addresses to a list of addresses
+ std::vector<std::string> expandInterfaces(const std::vector<std::string>& interfaces) {
+ std::vector<std::string> addresses;
+ // If there are no specific interfaces listed use a single "" to listen on every interface
+ if (interfaces.empty()) {
+ addresses.push_back("");
+ return addresses;
+ }
+ for (unsigned i = 0; i < interfaces.size(); ++i) {
+ const std::string& interface = interfaces[i];
+ if (!(SystemInfo::getInterfaceAddresses(interface, addresses))) {
+ // We don't have an interface of that name -
+ // Check for IPv6 ('[' ']') brackets and remove them
+ // then pass to be looked up directly
+ if (interface[0]=='[' && interface[interface.size()-1]==']') {
+ addresses.push_back(interface.substr(1, interface.size()-2));
+ } else {
+ addresses.push_back(interface);
+ }
+ }
+ }
+ return addresses;
+ }
+}
+
+SocketAcceptor::SocketAcceptor(bool tcpNoDelay, bool nodict, uint32_t maxNegotiateTime, Timer& timer0) :
+ timer(timer0),
+ options(tcpNoDelay, nodict, maxNegotiateTime),
+ established(boost::bind(&establishedIncoming, _1, options, &timer, _2, _3))
+{}
+
+SocketAcceptor::SocketAcceptor(bool tcpNoDelay, bool nodict, uint32_t maxNegotiateTime, Timer& timer0, const EstablishedCallback& established0) :
+ timer(timer0),
+ options(tcpNoDelay, nodict, maxNegotiateTime),
+ established(established0)
+{}
+
+void SocketAcceptor::addListener(Socket* socket)
+{
+ listeners.push_back(socket);
+}
+
+uint16_t SocketAcceptor::listen(const std::vector<std::string>& interfaces, const std::string& port, int backlog, const SocketFactory& factory)
+{
+ std::vector<std::string> addresses = expandInterfaces(interfaces);
+ if (addresses.empty()) {
+ // We specified some interfaces, but couldn't find addresses for them
+ QPID_LOG(warning, "TCP/TCP6: No specified network interfaces found: Not Listening");
+ return 0;
+ }
+
+ int listeningPort = 0;
+ for (unsigned i = 0; i<addresses.size(); ++i) {
+ QPID_LOG(debug, "Using interface: " << addresses[i]);
+ SocketAddress sa(addresses[i], port);
+
+ // We must have at least one resolved address
+ QPID_LOG(info, "Listening to: " << sa.asString())
+ Socket* s = factory();
+ uint16_t lport = s->listen(sa, backlog);
+ QPID_LOG(debug, "Listened to: " << lport);
+ addListener(s);
+
+ listeningPort = lport;
+
+ // Try any other resolved addresses
+ while (sa.nextAddress()) {
+ // Hack to ensure that all listening connections are on the same port
+ sa.setAddrInfoPort(listeningPort);
+ QPID_LOG(info, "Listening to: " << sa.asString())
+ Socket* s = factory();
+ uint16_t lport = s->listen(sa, backlog);
+ QPID_LOG(debug, "Listened to: " << lport);
+ addListener(s);
+ }
+ }
+ return listeningPort;
+}
+
+void SocketAcceptor::accept(boost::shared_ptr<Poller> poller, ConnectionCodec::Factory* f)
+{
+ for (unsigned i = 0; i<listeners.size(); ++i) {
+ acceptors.push_back(
+ AsynchAcceptor::create(listeners[i], boost::bind(established, poller, _1, f)));
+ acceptors[i].start(poller);
+ }
+}
+
+SocketConnector::SocketConnector(bool tcpNoDelay, bool nodict, uint32_t maxNegotiateTime, Timer& timer0, const SocketFactory& factory0) :
+ timer(timer0),
+ factory(factory0),
+ options(tcpNoDelay, nodict, maxNegotiateTime)
+{}
+
+void SocketConnector::connect(
+ boost::shared_ptr<Poller> poller,
+ const std::string& name,
+ const std::string& host, const std::string& port,
+ ConnectionCodec::Factory* fact,
+ ConnectFailedCallback failed)
+{
+ // Note that the following logic does not cause a memory leak.
+ // The allocated Socket is freed either by the AsynchConnector
+ // upon connection failure or by the AsynchIO upon connection
+ // shutdown. The allocated AsynchConnector frees itself when it
+ // is no longer needed.
+ Socket* socket = factory();
+ try {
+ AsynchConnector* c = AsynchConnector::create(
+ *socket,
+ host,
+ port,
+ boost::bind(&establishedOutgoing, poller, options, &timer, _1, fact, name),
+ boost::bind(&connectFailed, _1, _2, _3, failed));
+ c->start(poller);
+ } catch (std::exception&) {
+ // TODO: Design question - should we do the error callback and also throw?
+ int errCode = socket->getError();
+ connectFailed(*socket, errCode, strError(errCode), failed);
+ throw;
+ }
+}
+
+}} // namespace qpid::sys
diff --git a/cpp/src/qpid/sys/SocketTransport.h b/cpp/src/qpid/sys/SocketTransport.h
new file mode 100644
index 0000000000..b2f1e72907
--- /dev/null
+++ b/cpp/src/qpid/sys/SocketTransport.h
@@ -0,0 +1,91 @@
+#ifndef QPID_SYS_SOCKETTRANSPORT_H
+#define QPID_SYS_SOCKETTRANSPORT_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/sys/TransportFactory.h"
+
+#include "qpid/sys/IntegerTypes.h"
+#include "qpid/sys/ConnectionCodec.h"
+#include <boost/ptr_container/ptr_vector.hpp>
+#include <boost/function.hpp>
+
+namespace qpid {
+namespace sys {
+
+class AsynchAcceptor;
+class Poller;
+class Timer;
+class Socket;
+typedef boost::function0<Socket*> SocketFactory;
+typedef boost::function3<void, boost::shared_ptr<Poller>, const Socket&, ConnectionCodec::Factory*> EstablishedCallback;
+
+struct SocketTransportOptions {
+ bool tcpNoDelay;
+ bool nodict;
+ uint32_t maxNegotiateTime;
+
+ SocketTransportOptions(bool t, bool d, uint32_t m) :
+ tcpNoDelay(t),
+ nodict(d),
+ maxNegotiateTime(m)
+ {}
+};
+
+class SocketAcceptor : public TransportAcceptor {
+ boost::ptr_vector<Socket> listeners;
+ boost::ptr_vector<AsynchAcceptor> acceptors;
+ Timer& timer;
+ SocketTransportOptions options;
+ const EstablishedCallback established;
+
+public:
+ SocketAcceptor(bool tcpNoDelay, bool nodict, uint32_t maxNegotiateTime, Timer& timer);
+ SocketAcceptor(bool tcpNoDelay, bool nodict, uint32_t maxNegotiateTime, Timer& timer, const EstablishedCallback& established);
+
+ // Create sockets from list of interfaces and listen to them
+ uint16_t listen(const std::vector<std::string>& interfaces, const std::string& port, int backlog, const SocketFactory& factory);
+
+ // Import sockets that are already being listened to
+ void addListener(Socket* socket);
+
+ void accept(boost::shared_ptr<Poller> poller, ConnectionCodec::Factory* f);
+};
+
+class SocketConnector : public TransportConnector {
+ Timer& timer;
+ const SocketFactory factory;
+ SocketTransportOptions options;
+
+public:
+ SocketConnector(bool tcpNoDelay, bool nodict, uint32_t maxNegotiateTime, Timer& timer, const SocketFactory& factory);
+
+ void connect(boost::shared_ptr<Poller> poller,
+ const std::string& name,
+ const std::string& host, const std::string& port,
+ ConnectionCodec::Factory* f,
+ ConnectFailedCallback failed);
+};
+
+}}
+
+#endif
diff --git a/cpp/src/qpid/sys/SslPlugin.cpp b/cpp/src/qpid/sys/SslPlugin.cpp
index a40da24eb8..20ca9256fc 100644
--- a/cpp/src/qpid/sys/SslPlugin.cpp
+++ b/cpp/src/qpid/sys/SslPlugin.cpp
@@ -19,22 +19,17 @@
*
*/
-#include "qpid/sys/ProtocolFactory.h"
+#include "qpid/sys/TransportFactory.h"
#include "qpid/Plugin.h"
#include "qpid/broker/Broker.h"
-#include "qpid/broker/NameGenerator.h"
#include "qpid/log/Statement.h"
-#include "qpid/sys/AsynchIOHandler.h"
#include "qpid/sys/AsynchIO.h"
+#include "qpid/sys/SocketTransport.h"
#include "qpid/sys/ssl/util.h"
#include "qpid/sys/ssl/SslSocket.h"
-#include "qpid/sys/SocketAddress.h"
-#include "qpid/sys/SystemInfo.h"
-#include "qpid/sys/Poller.h"
#include <boost/bind.hpp>
-#include <boost/ptr_container/ptr_vector.hpp>
namespace qpid {
namespace sys {
@@ -64,32 +59,20 @@ struct SslServerOptions : ssl::SslOptions
}
};
-class SslProtocolFactory : public ProtocolFactory {
- boost::ptr_vector<Socket> listeners;
- boost::ptr_vector<AsynchAcceptor> acceptors;
- Timer& brokerTimer;
- uint32_t maxNegotiateTime;
- uint16_t listeningPort;
- const bool tcpNoDelay;
- bool nodict;
-
- public:
- SslProtocolFactory(const qpid::broker::Broker::Options& opts, const SslServerOptions& options,
- Timer& timer);
- void accept(Poller::shared_ptr, ConnectionCodec::Factory*);
- void connect(Poller::shared_ptr, const std::string& name, const std::string& host, const std::string& port,
- ConnectionCodec::Factory*,
- ConnectFailedCallback);
+namespace {
+ Socket* createServerSSLSocket(const SslServerOptions& options) {
+ return new SslSocket(options.certName, options.clientAuth);
+ }
- uint16_t getPort() const;
+ Socket* createServerSSLMuxSocket(const SslServerOptions& options) {
+ return new SslMuxSocket(options.certName, options.clientAuth);
+ }
- private:
- void establishedIncoming(Poller::shared_ptr, const Socket&, ConnectionCodec::Factory*);
- void establishedOutgoing(Poller::shared_ptr, const Socket&, ConnectionCodec::Factory*, const std::string&);
- void establishedCommon(AsynchIOHandler*, Poller::shared_ptr , const Socket&);
- void connectFailed(const Socket&, int, const std::string&, ConnectFailedCallback);
-};
+ Socket* createClientSSLSocket() {
+ return new SslSocket();
+ }
+}
// Static instance to initialise plugin
static struct SslPlugin : public Plugin {
@@ -104,7 +87,7 @@ static struct SslPlugin : public Plugin {
void earlyInitialize(Target& target) {
broker::Broker* broker = dynamic_cast<broker::Broker*>(&target);
if (broker && !options.certDbPath.empty()) {
- const broker::Broker::Options& opts = broker->getOptions();
+ broker::Broker::Options& opts = broker->getOptions();
if (opts.port == options.port && // AMQP & AMQPS ports are the same
opts.port != 0) {
@@ -132,18 +115,25 @@ static struct SslPlugin : public Plugin {
nssInitialized = true;
const broker::Broker::Options& opts = broker->getOptions();
-
- ProtocolFactory::shared_ptr protocol(
- static_cast<ProtocolFactory*>(new SslProtocolFactory(opts, options, broker->getTimer())));
-
- if (protocol->getPort()!=0 ) {
+ TransportAcceptor::shared_ptr ta;
+ SocketAcceptor* sa =
+ new SocketAcceptor(opts.tcpNoDelay, options.nodict, opts.maxNegotiateTime, broker->getTimer());
+ uint16_t port = sa->listen(opts.listenInterfaces, boost::lexical_cast<std::string>(options.port), opts.connectionBacklog,
+ options.multiplex ?
+ boost::bind(&createServerSSLMuxSocket, options) :
+ boost::bind(&createServerSSLSocket, options));
+ if ( port!=0 ) {
+ ta.reset(sa);
QPID_LOG(notice, "Listening for " <<
(options.multiplex ? "SSL or TCP" : "SSL") <<
" connections on TCP/TCP6 port " <<
- protocol->getPort());
+ port);
}
- broker->registerProtocolFactory("ssl", protocol);
+ TransportConnector::shared_ptr tc(
+ new SocketConnector(opts.tcpNoDelay, options.nodict, opts.maxNegotiateTime, broker->getTimer(),
+ &createClientSSLSocket));
+ broker->registerTransport("ssl", ta, tc, port);
} catch (const std::exception& e) {
QPID_LOG(error, "Failed to initialise SSL plugin: " << e.what());
}
@@ -152,160 +142,4 @@ static struct SslPlugin : public Plugin {
}
} sslPlugin;
-namespace {
- // Expand list of Interfaces and addresses to a list of addresses
- std::vector<std::string> expandInterfaces(const std::vector<std::string>& interfaces) {
- std::vector<std::string> addresses;
- // If there are no specific interfaces listed use a single "" to listen on every interface
- if (interfaces.empty()) {
- addresses.push_back("");
- return addresses;
- }
- for (unsigned i = 0; i < interfaces.size(); ++i) {
- const std::string& interface = interfaces[i];
- if (!(SystemInfo::getInterfaceAddresses(interface, addresses))) {
- // We don't have an interface of that name -
- // Check for IPv6 ('[' ']') brackets and remove them
- // then pass to be looked up directly
- if (interface[0]=='[' && interface[interface.size()-1]==']') {
- addresses.push_back(interface.substr(1, interface.size()-2));
- } else {
- addresses.push_back(interface);
- }
- }
- }
- return addresses;
- }
-}
-
-SslProtocolFactory::SslProtocolFactory(const qpid::broker::Broker::Options& opts, const SslServerOptions& options,
- Timer& timer) :
- brokerTimer(timer),
- maxNegotiateTime(opts.maxNegotiateTime),
- tcpNoDelay(opts.tcpNoDelay),
- nodict(options.nodict)
-{
- std::vector<std::string> addresses = expandInterfaces(opts.listenInterfaces);
- if (addresses.empty()) {
- // We specified some interfaces, but couldn't find addresses for them
- QPID_LOG(warning, "SSL: No specified network interfaces found: Not Listening");
- listeningPort = 0;
- }
-
- for (unsigned i = 0; i<addresses.size(); ++i) {
- QPID_LOG(debug, "Using interface: " << addresses[i]);
- SocketAddress sa(addresses[i], boost::lexical_cast<std::string>(options.port));
-
- // We must have at least one resolved address
- QPID_LOG(info, "Listening to: " << sa.asString())
- Socket* s = options.multiplex ?
- new SslMuxSocket(options.certName, options.clientAuth) :
- new SslSocket(options.certName, options.clientAuth);
- uint16_t lport = s->listen(sa, opts.connectionBacklog);
- QPID_LOG(debug, "Listened to: " << lport);
- listeners.push_back(s);
-
- listeningPort = lport;
-
- // Try any other resolved addresses
- while (sa.nextAddress()) {
- // Hack to ensure that all listening connections are on the same port
- sa.setAddrInfoPort(listeningPort);
- QPID_LOG(info, "Listening to: " << sa.asString())
- Socket* s = options.multiplex ?
- new SslMuxSocket(options.certName, options.clientAuth) :
- new SslSocket(options.certName, options.clientAuth);
- uint16_t lport = s->listen(sa, opts.connectionBacklog);
- QPID_LOG(debug, "Listened to: " << lport);
- listeners.push_back(s);
- }
- }
-}
-
-void SslProtocolFactory::establishedIncoming(Poller::shared_ptr poller, const Socket& s,
- ConnectionCodec::Factory* f) {
- AsynchIOHandler* async = new AsynchIOHandler(broker::QPID_NAME_PREFIX+s.getFullAddress(), f, false, false);
- establishedCommon(async, poller, s);
-}
-
-void SslProtocolFactory::establishedOutgoing(Poller::shared_ptr poller, const Socket& s,
- ConnectionCodec::Factory* f, const std::string& name) {
- AsynchIOHandler* async = new AsynchIOHandler(name, f, true, false);
- establishedCommon(async, poller, s);
-}
-
-void SslProtocolFactory::establishedCommon(AsynchIOHandler* async, Poller::shared_ptr poller, const Socket& s) {
- if (tcpNoDelay) {
- s.setTcpNoDelay();
- QPID_LOG(info, "Set TCP_NODELAY on connection to " << s.getPeerAddress());
- }
-
- AsynchIO* aio = AsynchIO::create(
- s,
- boost::bind(&AsynchIOHandler::readbuff, async, _1, _2),
- boost::bind(&AsynchIOHandler::eof, async, _1),
- boost::bind(&AsynchIOHandler::disconnect, async, _1),
- boost::bind(&AsynchIOHandler::closedSocket, async, _1, _2),
- boost::bind(&AsynchIOHandler::nobuffs, async, _1),
- boost::bind(&AsynchIOHandler::idle, async, _1));
-
- async->init(aio, brokerTimer, maxNegotiateTime);
- aio->start(poller);
-}
-
-uint16_t SslProtocolFactory::getPort() const {
- return listeningPort; // Immutable no need for lock.
-}
-
-void SslProtocolFactory::accept(Poller::shared_ptr poller,
- ConnectionCodec::Factory* fact) {
- for (unsigned i = 0; i<listeners.size(); ++i) {
- acceptors.push_back(
- AsynchAcceptor::create(listeners[i],
- boost::bind(&SslProtocolFactory::establishedIncoming, this, poller, _1, fact)));
- acceptors[i].start(poller);
- }
-}
-
-void SslProtocolFactory::connectFailed(
- const Socket& s, int ec, const std::string& emsg,
- ConnectFailedCallback failedCb)
-{
- failedCb(ec, emsg);
- s.close();
- delete &s;
-}
-
-void SslProtocolFactory::connect(
- Poller::shared_ptr poller,
- const std::string& name,
- const std::string& host, const std::string& port,
- ConnectionCodec::Factory* fact,
- ConnectFailedCallback failed)
-{
- // Note that the following logic does not cause a memory leak.
- // The allocated Socket is freed either by the SslConnector
- // upon connection failure or by the SslIoHandle upon connection
- // shutdown. The allocated SslConnector frees itself when it
- // is no longer needed.
-
- Socket* socket = new qpid::sys::ssl::SslSocket();
- try {
- AsynchConnector* c = AsynchConnector::create(
- *socket,
- host,
- port,
- boost::bind(&SslProtocolFactory::establishedOutgoing,
- this, poller, _1, fact, name),
- boost::bind(&SslProtocolFactory::connectFailed,
- this, _1, _2, _3, failed));
- c->start(poller);
- } catch (std::exception&) {
- // TODO: Design question - should we do the error callback and also throw?
- int errCode = socket->getError();
- connectFailed(*socket, errCode, strError(errCode), failed);
- throw;
- }
-}
-
}} // namespace qpid::sys
diff --git a/cpp/src/qpid/sys/TCPIOPlugin.cpp b/cpp/src/qpid/sys/TCPIOPlugin.cpp
index 1ef8708cd0..f9be1043f8 100644
--- a/cpp/src/qpid/sys/TCPIOPlugin.cpp
+++ b/cpp/src/qpid/sys/TCPIOPlugin.cpp
@@ -19,52 +19,18 @@
*
*/
-#include "qpid/sys/ProtocolFactory.h"
+#include "qpid/sys/TransportFactory.h"
#include "qpid/Plugin.h"
#include "qpid/broker/Broker.h"
-#include "qpid/broker/NameGenerator.h"
#include "qpid/log/Statement.h"
-#include "qpid/sys/AsynchIOHandler.h"
#include "qpid/sys/AsynchIO.h"
#include "qpid/sys/Socket.h"
-#include "qpid/sys/SocketAddress.h"
-#include "qpid/sys/SystemInfo.h"
-#include "qpid/sys/Poller.h"
-
-#include <boost/bind.hpp>
-#include <boost/ptr_container/ptr_vector.hpp>
+#include "qpid/sys/SocketTransport.h"
namespace qpid {
namespace sys {
-class Timer;
-
-class AsynchIOProtocolFactory : public ProtocolFactory {
- boost::ptr_vector<Socket> listeners;
- boost::ptr_vector<AsynchAcceptor> acceptors;
- Timer& brokerTimer;
- uint32_t maxNegotiateTime;
- uint16_t listeningPort;
- const bool tcpNoDelay;
-
- public:
- AsynchIOProtocolFactory(const qpid::broker::Broker::Options& opts, Timer& timer, bool shouldListen);
- void accept(Poller::shared_ptr, ConnectionCodec::Factory*);
- void connect(Poller::shared_ptr, const std::string& name,
- const std::string& host, const std::string& port,
- ConnectionCodec::Factory*,
- ConnectFailedCallback);
-
- uint16_t getPort() const;
-
- private:
- void establishedIncoming(Poller::shared_ptr, const Socket&, ConnectionCodec::Factory*);
- void establishedOutgoing(Poller::shared_ptr, const Socket&, ConnectionCodec::Factory*, const std::string&);
- void establishedCommon(AsynchIOHandler*, Poller::shared_ptr , const Socket&);
- void connectFailed(const Socket&, int, const std::string&, ConnectFailedCallback);
-};
-
static bool sslMultiplexEnabled(void)
{
Options o;
@@ -93,170 +59,22 @@ static class TCPIOPlugin : public Plugin {
// Check for SSL on the same port
bool shouldListen = !sslMultiplexEnabled();
- ProtocolFactory::shared_ptr protocolt(
- new AsynchIOProtocolFactory(opts, broker->getTimer(),shouldListen));
-
- if (shouldListen && protocolt->getPort()!=0 ) {
- QPID_LOG(notice, "Listening on TCP/TCP6 port " << protocolt->getPort());
- }
-
- broker->registerProtocolFactory("tcp", protocolt);
- }
- }
-} tcpPlugin;
-
-namespace {
- // Expand list of Interfaces and addresses to a list of addresses
- std::vector<std::string> expandInterfaces(const std::vector<std::string>& interfaces) {
- std::vector<std::string> addresses;
- // If there are no specific interfaces listed use a single "" to listen on every interface
- if (interfaces.empty()) {
- addresses.push_back("");
- return addresses;
- }
- for (unsigned i = 0; i < interfaces.size(); ++i) {
- const std::string& interface = interfaces[i];
- if (!(SystemInfo::getInterfaceAddresses(interface, addresses))) {
- // We don't have an interface of that name -
- // Check for IPv6 ('[' ']') brackets and remove them
- // then pass to be looked up directly
- if (interface[0]=='[' && interface[interface.size()-1]==']') {
- addresses.push_back(interface.substr(1, interface.size()-2));
- } else {
- addresses.push_back(interface);
+ uint16_t port = opts.port;
+ TransportAcceptor::shared_ptr ta;
+ if (shouldListen) {
+ SocketAcceptor* aa = new SocketAcceptor(opts.tcpNoDelay, false, opts.maxNegotiateTime, broker->getTimer());
+ ta.reset(aa);
+ port = aa->listen(opts.listenInterfaces, boost::lexical_cast<std::string>(opts.port), opts.connectionBacklog, &createSocket);
+ if ( port!=0 ) {
+ QPID_LOG(notice, "Listening on TCP/TCP6 port " << port);
}
}
- }
- return addresses;
- }
-}
-
-AsynchIOProtocolFactory::AsynchIOProtocolFactory(const qpid::broker::Broker::Options& opts, Timer& timer, bool shouldListen) :
- brokerTimer(timer),
- maxNegotiateTime(opts.maxNegotiateTime),
- tcpNoDelay(opts.tcpNoDelay)
-{
- if (!shouldListen) {
- listeningPort = boost::lexical_cast<uint16_t>(opts.port);
- return;
- }
-
- std::vector<std::string> addresses = expandInterfaces(opts.listenInterfaces);
- if (addresses.empty()) {
- // We specified some interfaces, but couldn't find addresses for them
- QPID_LOG(warning, "TCP/TCP6: No specified network interfaces found: Not Listening");
- listeningPort = 0;
- }
-
- for (unsigned i = 0; i<addresses.size(); ++i) {
- QPID_LOG(debug, "Using interface: " << addresses[i]);
- SocketAddress sa(addresses[i], boost::lexical_cast<std::string>(opts.port));
-
- // We must have at least one resolved address
- QPID_LOG(info, "Listening to: " << sa.asString())
- Socket* s = createSocket();
- uint16_t lport = s->listen(sa, opts.connectionBacklog);
- QPID_LOG(debug, "Listened to: " << lport);
- listeners.push_back(s);
- listeningPort = lport;
+ TransportConnector::shared_ptr tc(new SocketConnector(opts.tcpNoDelay, false, opts.maxNegotiateTime, broker->getTimer(), &createSocket));
- // Try any other resolved addresses
- while (sa.nextAddress()) {
- // Hack to ensure that all listening connections are on the same port
- sa.setAddrInfoPort(listeningPort);
- QPID_LOG(info, "Listening to: " << sa.asString())
- Socket* s = createSocket();
- uint16_t lport = s->listen(sa, opts.connectionBacklog);
- QPID_LOG(debug, "Listened to: " << lport);
- listeners.push_back(s);
+ broker->registerTransport("tcp", ta, tc, port);
}
}
-}
-
-void AsynchIOProtocolFactory::establishedIncoming(Poller::shared_ptr poller, const Socket& s,
- ConnectionCodec::Factory* f) {
- AsynchIOHandler* async = new AsynchIOHandler(broker::QPID_NAME_PREFIX+s.getFullAddress(), f, false, false);
- establishedCommon(async, poller, s);
-}
-
-void AsynchIOProtocolFactory::establishedOutgoing(Poller::shared_ptr poller, const Socket& s,
- ConnectionCodec::Factory* f, const std::string& name) {
- AsynchIOHandler* async = new AsynchIOHandler(name, f, true, false);
- establishedCommon(async, poller, s);
-}
-
-void AsynchIOProtocolFactory::establishedCommon(AsynchIOHandler* async, Poller::shared_ptr poller, const Socket& s) {
- if (tcpNoDelay) {
- s.setTcpNoDelay();
- QPID_LOG(info, "Set TCP_NODELAY on connection to " << s.getPeerAddress());
- }
-
- AsynchIO* aio = AsynchIO::create
- (s,
- boost::bind(&AsynchIOHandler::readbuff, async, _1, _2),
- boost::bind(&AsynchIOHandler::eof, async, _1),
- boost::bind(&AsynchIOHandler::disconnect, async, _1),
- boost::bind(&AsynchIOHandler::closedSocket, async, _1, _2),
- boost::bind(&AsynchIOHandler::nobuffs, async, _1),
- boost::bind(&AsynchIOHandler::idle, async, _1));
-
- async->init(aio, brokerTimer, maxNegotiateTime);
- aio->start(poller);
-}
-
-uint16_t AsynchIOProtocolFactory::getPort() const {
- return listeningPort; // Immutable no need for lock.
-}
-
-void AsynchIOProtocolFactory::accept(Poller::shared_ptr poller,
- ConnectionCodec::Factory* fact) {
- for (unsigned i = 0; i<listeners.size(); ++i) {
- acceptors.push_back(
- AsynchAcceptor::create(listeners[i],
- boost::bind(&AsynchIOProtocolFactory::establishedIncoming, this, poller, _1, fact)));
- acceptors[i].start(poller);
- }
-}
-
-void AsynchIOProtocolFactory::connectFailed(
- const Socket& s, int ec, const std::string& emsg,
- ConnectFailedCallback failedCb)
-{
- failedCb(ec, emsg);
- s.close();
- delete &s;
-}
-
-void AsynchIOProtocolFactory::connect(
- Poller::shared_ptr poller,
- const std::string& name,
- const std::string& host, const std::string& port,
- ConnectionCodec::Factory* fact,
- ConnectFailedCallback failed)
-{
- // Note that the following logic does not cause a memory leak.
- // The allocated Socket is freed either by the AsynchConnector
- // upon connection failure or by the AsynchIO upon connection
- // shutdown. The allocated AsynchConnector frees itself when it
- // is no longer needed.
- Socket* socket = createSocket();
- try {
- AsynchConnector* c = AsynchConnector::create(
- *socket,
- host,
- port,
- boost::bind(&AsynchIOProtocolFactory::establishedOutgoing,
- this, poller, _1, fact, name),
- boost::bind(&AsynchIOProtocolFactory::connectFailed,
- this, _1, _2, _3, failed));
- c->start(poller);
- } catch (std::exception&) {
- // TODO: Design question - should we do the error callback and also throw?
- int errCode = socket->getError();
- connectFailed(*socket, errCode, strError(errCode), failed);
- throw;
- }
-}
+} tcpPlugin;
}} // namespace qpid::sys
diff --git a/cpp/src/qpid/sys/ProtocolFactory.h b/cpp/src/qpid/sys/TransportFactory.h
index 236398c111..06aa168024 100644
--- a/cpp/src/qpid/sys/ProtocolFactory.h
+++ b/cpp/src/qpid/sys/TransportFactory.h
@@ -1,5 +1,5 @@
-#ifndef _sys_ProtocolFactory_h
-#define _sys_ProtocolFactory_h
+#ifndef QPID_SYS_TRANSPORTFACTORY_H
+#define QPID_SYS_TRANSPORTFACTORY_H
/*
*
@@ -10,9 +10,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -22,24 +22,34 @@
*
*/
-#include "qpid/sys/IntegerTypes.h"
#include "qpid/SharedObject.h"
#include "qpid/sys/ConnectionCodec.h"
+#include <string>
#include <boost/function.hpp>
+#include <boost/shared_ptr.hpp>
namespace qpid {
namespace sys {
+class AsynchAcceptor;
class Poller;
+class Timer;
-class ProtocolFactory : public qpid::SharedObject<ProtocolFactory>
+class TransportAcceptor : public qpid::SharedObject<TransportAcceptor>
{
public:
+ virtual ~TransportAcceptor() = 0;
+ virtual void accept(boost::shared_ptr<Poller>, ConnectionCodec::Factory*) = 0;
+};
+
+inline TransportAcceptor::~TransportAcceptor() {}
+
+class TransportConnector : public qpid::SharedObject<TransportConnector>
+{
+public:
typedef boost::function2<void, int, std::string> ConnectFailedCallback;
- virtual ~ProtocolFactory() = 0;
- virtual uint16_t getPort() const = 0;
- virtual void accept(boost::shared_ptr<Poller>, ConnectionCodec::Factory*) = 0;
+ virtual ~TransportConnector() = 0;
virtual void connect(
boost::shared_ptr<Poller>,
const std::string& name,
@@ -48,10 +58,8 @@ class ProtocolFactory : public qpid::SharedObject<ProtocolFactory>
ConnectFailedCallback failed) = 0;
};
-inline ProtocolFactory::~ProtocolFactory() {}
+inline TransportConnector::~TransportConnector() {}
}}
-
-
-#endif //!_sys_ProtocolFactory_h
+#endif
diff --git a/cpp/src/qpid/sys/windows/WinSocket.cpp b/cpp/src/qpid/sys/windows/WinSocket.cpp
index b2d2d79c63..080600adcd 100644
--- a/cpp/src/qpid/sys/windows/WinSocket.cpp
+++ b/cpp/src/qpid/sys/windows/WinSocket.cpp
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY