summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/sys/TCPIOPlugin.cpp
diff options
context:
space:
mode:
authorAndrew Stitcher <astitcher@apache.org>2013-03-01 00:21:12 +0000
committerAndrew Stitcher <astitcher@apache.org>2013-03-01 00:21:12 +0000
commitd039b79e62bdb4f6f9aad9803ba9cef7c053dcdf (patch)
tree59abcc008d50b11f940bb234e2cbf1083159621e /cpp/src/qpid/sys/TCPIOPlugin.cpp
parent7e2379e6c5158ed4771e6d06df698d6b56c92305 (diff)
downloadqpid-python-d039b79e62bdb4f6f9aad9803ba9cef7c053dcdf.tar.gz
QPID-4610: Remove duplicated transport code from C++ broker
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1451443 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/sys/TCPIOPlugin.cpp')
-rw-r--r--cpp/src/qpid/sys/TCPIOPlugin.cpp208
1 files changed, 13 insertions, 195 deletions
diff --git a/cpp/src/qpid/sys/TCPIOPlugin.cpp b/cpp/src/qpid/sys/TCPIOPlugin.cpp
index 1ef8708cd0..f9be1043f8 100644
--- a/cpp/src/qpid/sys/TCPIOPlugin.cpp
+++ b/cpp/src/qpid/sys/TCPIOPlugin.cpp
@@ -19,52 +19,18 @@
*
*/
-#include "qpid/sys/ProtocolFactory.h"
+#include "qpid/sys/TransportFactory.h"
#include "qpid/Plugin.h"
#include "qpid/broker/Broker.h"
-#include "qpid/broker/NameGenerator.h"
#include "qpid/log/Statement.h"
-#include "qpid/sys/AsynchIOHandler.h"
#include "qpid/sys/AsynchIO.h"
#include "qpid/sys/Socket.h"
-#include "qpid/sys/SocketAddress.h"
-#include "qpid/sys/SystemInfo.h"
-#include "qpid/sys/Poller.h"
-
-#include <boost/bind.hpp>
-#include <boost/ptr_container/ptr_vector.hpp>
+#include "qpid/sys/SocketTransport.h"
namespace qpid {
namespace sys {
-class Timer;
-
-class AsynchIOProtocolFactory : public ProtocolFactory {
- boost::ptr_vector<Socket> listeners;
- boost::ptr_vector<AsynchAcceptor> acceptors;
- Timer& brokerTimer;
- uint32_t maxNegotiateTime;
- uint16_t listeningPort;
- const bool tcpNoDelay;
-
- public:
- AsynchIOProtocolFactory(const qpid::broker::Broker::Options& opts, Timer& timer, bool shouldListen);
- void accept(Poller::shared_ptr, ConnectionCodec::Factory*);
- void connect(Poller::shared_ptr, const std::string& name,
- const std::string& host, const std::string& port,
- ConnectionCodec::Factory*,
- ConnectFailedCallback);
-
- uint16_t getPort() const;
-
- private:
- void establishedIncoming(Poller::shared_ptr, const Socket&, ConnectionCodec::Factory*);
- void establishedOutgoing(Poller::shared_ptr, const Socket&, ConnectionCodec::Factory*, const std::string&);
- void establishedCommon(AsynchIOHandler*, Poller::shared_ptr , const Socket&);
- void connectFailed(const Socket&, int, const std::string&, ConnectFailedCallback);
-};
-
static bool sslMultiplexEnabled(void)
{
Options o;
@@ -93,170 +59,22 @@ static class TCPIOPlugin : public Plugin {
// Check for SSL on the same port
bool shouldListen = !sslMultiplexEnabled();
- ProtocolFactory::shared_ptr protocolt(
- new AsynchIOProtocolFactory(opts, broker->getTimer(),shouldListen));
-
- if (shouldListen && protocolt->getPort()!=0 ) {
- QPID_LOG(notice, "Listening on TCP/TCP6 port " << protocolt->getPort());
- }
-
- broker->registerProtocolFactory("tcp", protocolt);
- }
- }
-} tcpPlugin;
-
-namespace {
- // Expand list of Interfaces and addresses to a list of addresses
- std::vector<std::string> expandInterfaces(const std::vector<std::string>& interfaces) {
- std::vector<std::string> addresses;
- // If there are no specific interfaces listed use a single "" to listen on every interface
- if (interfaces.empty()) {
- addresses.push_back("");
- return addresses;
- }
- for (unsigned i = 0; i < interfaces.size(); ++i) {
- const std::string& interface = interfaces[i];
- if (!(SystemInfo::getInterfaceAddresses(interface, addresses))) {
- // We don't have an interface of that name -
- // Check for IPv6 ('[' ']') brackets and remove them
- // then pass to be looked up directly
- if (interface[0]=='[' && interface[interface.size()-1]==']') {
- addresses.push_back(interface.substr(1, interface.size()-2));
- } else {
- addresses.push_back(interface);
+ uint16_t port = opts.port;
+ TransportAcceptor::shared_ptr ta;
+ if (shouldListen) {
+ SocketAcceptor* aa = new SocketAcceptor(opts.tcpNoDelay, false, opts.maxNegotiateTime, broker->getTimer());
+ ta.reset(aa);
+ port = aa->listen(opts.listenInterfaces, boost::lexical_cast<std::string>(opts.port), opts.connectionBacklog, &createSocket);
+ if ( port!=0 ) {
+ QPID_LOG(notice, "Listening on TCP/TCP6 port " << port);
}
}
- }
- return addresses;
- }
-}
-
-AsynchIOProtocolFactory::AsynchIOProtocolFactory(const qpid::broker::Broker::Options& opts, Timer& timer, bool shouldListen) :
- brokerTimer(timer),
- maxNegotiateTime(opts.maxNegotiateTime),
- tcpNoDelay(opts.tcpNoDelay)
-{
- if (!shouldListen) {
- listeningPort = boost::lexical_cast<uint16_t>(opts.port);
- return;
- }
-
- std::vector<std::string> addresses = expandInterfaces(opts.listenInterfaces);
- if (addresses.empty()) {
- // We specified some interfaces, but couldn't find addresses for them
- QPID_LOG(warning, "TCP/TCP6: No specified network interfaces found: Not Listening");
- listeningPort = 0;
- }
-
- for (unsigned i = 0; i<addresses.size(); ++i) {
- QPID_LOG(debug, "Using interface: " << addresses[i]);
- SocketAddress sa(addresses[i], boost::lexical_cast<std::string>(opts.port));
-
- // We must have at least one resolved address
- QPID_LOG(info, "Listening to: " << sa.asString())
- Socket* s = createSocket();
- uint16_t lport = s->listen(sa, opts.connectionBacklog);
- QPID_LOG(debug, "Listened to: " << lport);
- listeners.push_back(s);
- listeningPort = lport;
+ TransportConnector::shared_ptr tc(new SocketConnector(opts.tcpNoDelay, false, opts.maxNegotiateTime, broker->getTimer(), &createSocket));
- // Try any other resolved addresses
- while (sa.nextAddress()) {
- // Hack to ensure that all listening connections are on the same port
- sa.setAddrInfoPort(listeningPort);
- QPID_LOG(info, "Listening to: " << sa.asString())
- Socket* s = createSocket();
- uint16_t lport = s->listen(sa, opts.connectionBacklog);
- QPID_LOG(debug, "Listened to: " << lport);
- listeners.push_back(s);
+ broker->registerTransport("tcp", ta, tc, port);
}
}
-}
-
-void AsynchIOProtocolFactory::establishedIncoming(Poller::shared_ptr poller, const Socket& s,
- ConnectionCodec::Factory* f) {
- AsynchIOHandler* async = new AsynchIOHandler(broker::QPID_NAME_PREFIX+s.getFullAddress(), f, false, false);
- establishedCommon(async, poller, s);
-}
-
-void AsynchIOProtocolFactory::establishedOutgoing(Poller::shared_ptr poller, const Socket& s,
- ConnectionCodec::Factory* f, const std::string& name) {
- AsynchIOHandler* async = new AsynchIOHandler(name, f, true, false);
- establishedCommon(async, poller, s);
-}
-
-void AsynchIOProtocolFactory::establishedCommon(AsynchIOHandler* async, Poller::shared_ptr poller, const Socket& s) {
- if (tcpNoDelay) {
- s.setTcpNoDelay();
- QPID_LOG(info, "Set TCP_NODELAY on connection to " << s.getPeerAddress());
- }
-
- AsynchIO* aio = AsynchIO::create
- (s,
- boost::bind(&AsynchIOHandler::readbuff, async, _1, _2),
- boost::bind(&AsynchIOHandler::eof, async, _1),
- boost::bind(&AsynchIOHandler::disconnect, async, _1),
- boost::bind(&AsynchIOHandler::closedSocket, async, _1, _2),
- boost::bind(&AsynchIOHandler::nobuffs, async, _1),
- boost::bind(&AsynchIOHandler::idle, async, _1));
-
- async->init(aio, brokerTimer, maxNegotiateTime);
- aio->start(poller);
-}
-
-uint16_t AsynchIOProtocolFactory::getPort() const {
- return listeningPort; // Immutable no need for lock.
-}
-
-void AsynchIOProtocolFactory::accept(Poller::shared_ptr poller,
- ConnectionCodec::Factory* fact) {
- for (unsigned i = 0; i<listeners.size(); ++i) {
- acceptors.push_back(
- AsynchAcceptor::create(listeners[i],
- boost::bind(&AsynchIOProtocolFactory::establishedIncoming, this, poller, _1, fact)));
- acceptors[i].start(poller);
- }
-}
-
-void AsynchIOProtocolFactory::connectFailed(
- const Socket& s, int ec, const std::string& emsg,
- ConnectFailedCallback failedCb)
-{
- failedCb(ec, emsg);
- s.close();
- delete &s;
-}
-
-void AsynchIOProtocolFactory::connect(
- Poller::shared_ptr poller,
- const std::string& name,
- const std::string& host, const std::string& port,
- ConnectionCodec::Factory* fact,
- ConnectFailedCallback failed)
-{
- // Note that the following logic does not cause a memory leak.
- // The allocated Socket is freed either by the AsynchConnector
- // upon connection failure or by the AsynchIO upon connection
- // shutdown. The allocated AsynchConnector frees itself when it
- // is no longer needed.
- Socket* socket = createSocket();
- try {
- AsynchConnector* c = AsynchConnector::create(
- *socket,
- host,
- port,
- boost::bind(&AsynchIOProtocolFactory::establishedOutgoing,
- this, poller, _1, fact, name),
- boost::bind(&AsynchIOProtocolFactory::connectFailed,
- this, _1, _2, _3, failed));
- c->start(poller);
- } catch (std::exception&) {
- // TODO: Design question - should we do the error callback and also throw?
- int errCode = socket->getError();
- connectFailed(*socket, errCode, strError(errCode), failed);
- throw;
- }
-}
+} tcpPlugin;
}} // namespace qpid::sys