diff options
Diffstat (limited to 'cpp/src/qpid/sys/TCPIOPlugin.cpp')
-rw-r--r-- | cpp/src/qpid/sys/TCPIOPlugin.cpp | 50 |
1 files changed, 21 insertions, 29 deletions
diff --git a/cpp/src/qpid/sys/TCPIOPlugin.cpp b/cpp/src/qpid/sys/TCPIOPlugin.cpp index eb6bcb3dee..65ea380b07 100644 --- a/cpp/src/qpid/sys/TCPIOPlugin.cpp +++ b/cpp/src/qpid/sys/TCPIOPlugin.cpp @@ -19,7 +19,7 @@ * */ -#include "Acceptor.h" +#include "ProtocolFactory.h" #include "AsynchIOHandler.h" #include "AsynchIO.h" @@ -33,21 +33,21 @@ namespace qpid { namespace sys { -class AsynchIOAcceptor : public Acceptor { +class AsynchIOProtocolFactory : public ProtocolFactory { Socket listener; const uint16_t listeningPort; std::auto_ptr<AsynchAcceptor> acceptor; public: - AsynchIOAcceptor(int16_t port, int backlog); - void run(Poller::shared_ptr, ConnectionCodec::Factory*); + AsynchIOProtocolFactory(int16_t port, int backlog); + void accept(Poller::shared_ptr, ConnectionCodec::Factory*); void connect(Poller::shared_ptr, const std::string& host, int16_t port, ConnectionCodec::Factory*); uint16_t getPort() const; std::string getHost() const; private: - void accepted(Poller::shared_ptr, const Socket&, ConnectionCodec::Factory*); + void established(Poller::shared_ptr, const Socket&, ConnectionCodec::Factory*, bool isClient); }; // Static instance to initialise plugin @@ -56,24 +56,26 @@ static class TCPIOPlugin : public Plugin { } void initialize(Target& target) { - broker::Broker* broker = dynamic_cast<broker::Broker*>(&target); // Only provide to a Broker if (broker) { const broker::Broker::Options& opts = broker->getOptions(); - Acceptor::shared_ptr acceptor(new AsynchIOAcceptor(opts.port, opts.connectionBacklog)); - QPID_LOG(info, "Listening on TCP port " << acceptor->getPort()); - broker->registerAccepter(acceptor); + ProtocolFactory::shared_ptr protocol(new AsynchIOProtocolFactory(opts.port, opts.connectionBacklog)); + QPID_LOG(info, "Listening on TCP port " << protocol->getPort()); + broker->registerProtocolFactory(protocol); } } -} acceptor; +} tcpPlugin; -AsynchIOAcceptor::AsynchIOAcceptor(int16_t port, int backlog) : +AsynchIOProtocolFactory::AsynchIOProtocolFactory(int16_t port, int backlog) : listeningPort(listener.listen(port, backlog)) {} -void AsynchIOAcceptor::accepted(Poller::shared_ptr poller, const Socket& s, ConnectionCodec::Factory* f) { +void AsynchIOProtocolFactory::established(Poller::shared_ptr poller, const Socket& s, + ConnectionCodec::Factory* f, bool isClient) { AsynchIOHandler* async = new AsynchIOHandler(s.getPeerAddress(), f); + if (isClient) + async->setClient(); AsynchIO* aio = new AsynchIO(s, boost::bind(&AsynchIOHandler::readbuff, async, _1, _2), boost::bind(&AsynchIOHandler::eof, async, _1), @@ -85,40 +87,30 @@ void AsynchIOAcceptor::accepted(Poller::shared_ptr poller, const Socket& s, Conn aio->start(poller); } - -uint16_t AsynchIOAcceptor::getPort() const { +uint16_t AsynchIOProtocolFactory::getPort() const { return listeningPort; // Immutable no need for lock. } -std::string AsynchIOAcceptor::getHost() const { +std::string AsynchIOProtocolFactory::getHost() const { return listener.getSockname(); } -void AsynchIOAcceptor::run(Poller::shared_ptr poller, ConnectionCodec::Factory* fact) { +void AsynchIOProtocolFactory::accept(Poller::shared_ptr poller, ConnectionCodec::Factory* fact) { acceptor.reset( new AsynchAcceptor(listener, - boost::bind(&AsynchIOAcceptor::accepted, this, poller, _1, fact))); + boost::bind(&AsynchIOProtocolFactory::established, this, poller, _1, fact, false))); acceptor->start(poller); } -void AsynchIOAcceptor::connect( +void AsynchIOProtocolFactory::connect( Poller::shared_ptr poller, const std::string& host, int16_t port, ConnectionCodec::Factory* f) { Socket* socket = new Socket();//Should be deleted by handle when socket closes socket->connect(host, port); - AsynchIOHandler* async = new AsynchIOHandler(socket->getPeerAddress(), f); - async->setClient(); - AsynchIO* aio = new AsynchIO(*socket, - boost::bind(&AsynchIOHandler::readbuff, async, _1, _2), - boost::bind(&AsynchIOHandler::eof, async, _1), - boost::bind(&AsynchIOHandler::disconnect, async, _1), - boost::bind(&AsynchIOHandler::closedSocket, async, _1, _2), - boost::bind(&AsynchIOHandler::nobuffs, async, _1), - boost::bind(&AsynchIOHandler::idle, async, _1)); - async->init(aio, 4); - aio->start(poller); + + established(poller, *socket, f, true); } }} // namespace qpid::sys |