diff options
Diffstat (limited to 'cpp')
-rw-r--r-- | cpp/src/Makefile.am | 2 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Broker.cpp | 36 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Broker.h | 14 | ||||
-rw-r--r-- | cpp/src/qpid/sys/ProtocolFactory.h (renamed from cpp/src/qpid/sys/Acceptor.h) | 14 | ||||
-rw-r--r-- | cpp/src/qpid/sys/TCPIOPlugin.cpp | 50 |
5 files changed, 50 insertions, 66 deletions
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am index 608120a7e7..597e566049 100644 --- a/cpp/src/Makefile.am +++ b/cpp/src/Makefile.am @@ -473,7 +473,6 @@ nobase_include_HEADERS = \ qpid/management/ManagementAgent.h \ qpid/management/ManagementExchange.h \ qpid/management/ManagementObject.h \ - qpid/sys/Acceptor.h \ qpid/sys/AggregateOutput.h \ qpid/sys/AsynchIO.h \ qpid/sys/AsynchIOHandler.h \ @@ -493,6 +492,7 @@ nobase_include_HEADERS = \ qpid/sys/OutputControl.h \ qpid/sys/OutputTask.h \ qpid/sys/Poller.h \ + qpid/sys/ProtocolFactory.h \ qpid/sys/Runnable.h \ qpid/sys/ScopedIncrement.h \ qpid/sys/Semaphore.h \ diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp index ec690a8acb..6cbd9bf343 100644 --- a/cpp/src/qpid/broker/Broker.cpp +++ b/cpp/src/qpid/broker/Broker.cpp @@ -35,7 +35,7 @@ #include "qpid/log/Statement.h" #include "qpid/framing/AMQFrame.h" #include "qpid/framing/ProtocolInitiation.h" -#include "qpid/sys/Acceptor.h" +#include "qpid/sys/ProtocolFactory.h" #include "qpid/sys/Poller.h" #include "qpid/sys/Dispatcher.h" #include "qpid/sys/Thread.h" @@ -54,7 +54,7 @@ #include <sasl/sasl.h> #endif -using qpid::sys::Acceptor; +using qpid::sys::ProtocolFactory; using qpid::sys::Poller; using qpid::sys::Dispatcher; using qpid::sys::Thread; @@ -334,41 +334,33 @@ Manageable::status_t Broker::ManagementMethod (uint32_t methodId, return status; } -boost::shared_ptr<Acceptor> Broker::getAcceptor() const { - assert(acceptors.size() > 0); -#if 0 - if (!acceptor) { - const_cast<Acceptor::shared_ptr&>(acceptor) = - Acceptor::create(config.port, - config.connectionBacklog); - QPID_LOG(info, "Listening on port " << getPort()); - } -#endif - return acceptors[0]; +boost::shared_ptr<ProtocolFactory> Broker::getProtocolFactory() const { + assert(protocolFactories.size() > 0); + return protocolFactories[0]; } -void Broker::registerAccepter(Acceptor::shared_ptr acceptor) { - acceptors.push_back(acceptor); +void Broker::registerProtocolFactory(ProtocolFactory::shared_ptr protocolFactory) { + protocolFactories.push_back(protocolFactory); } -// TODO: This can only work if there is only one acceptor +// TODO: This can only work if there is only one protocolFactory uint16_t Broker::getPort() const { - return getAcceptor()->getPort(); + return getProtocolFactory()->getPort(); } -// TODO: This should iterate over all acceptors +// TODO: This should iterate over all protocolFactories void Broker::accept() { - for (unsigned int i = 0; i < acceptors.size(); ++i) - acceptors[i]->run(poller, &factory); + for (unsigned int i = 0; i < protocolFactories.size(); ++i) + protocolFactories[i]->accept(poller, &factory); } -// TODO: How to chose the acceptor to use for the connection +// TODO: How to chose the protocolFactory to use for the connection void Broker::connect( const std::string& host, uint16_t port, sys::ConnectionCodec::Factory* f) { - getAcceptor()->connect(poller, host, port, f ? f : &factory); + getProtocolFactory()->connect(poller, host, port, f ? f : &factory); } void Broker::connect( diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h index 02f34ff3ba..fa66061fd0 100644 --- a/cpp/src/qpid/broker/Broker.h +++ b/cpp/src/qpid/broker/Broker.h @@ -50,7 +50,7 @@ namespace qpid { namespace sys { - class Acceptor; + class ProtocolFactory; class Poller; } @@ -124,8 +124,8 @@ class Broker : public sys::Runnable, public Plugin::Target, management::Manageable::status_t ManagementMethod (uint32_t methodId, management::Args& args); - /** Add to the broker's acceptors */ - void registerAccepter(boost::shared_ptr<sys::Acceptor>); + /** Add to the broker's protocolFactorys */ + void registerProtocolFactory(boost::shared_ptr<sys::ProtocolFactory>); /** Accept connections */ void accept(); @@ -139,7 +139,7 @@ class Broker : public sys::Runnable, public Plugin::Target, private: boost::shared_ptr<sys::Poller> poller; Options config; - std::vector< boost::shared_ptr<sys::Acceptor> > acceptors; + std::vector< boost::shared_ptr<sys::ProtocolFactory> > protocolFactories; MessageStore* store; DataDir dataDir; @@ -154,9 +154,9 @@ class Broker : public sys::Runnable, public Plugin::Target, Vhost::shared_ptr vhostObject; System::shared_ptr systemObject; - // TODO: There is no longer a single acceptor so the use of the following needs to be fixed - // For the present just return the first acceptor registered. - boost::shared_ptr<sys::Acceptor> getAcceptor() const; + // TODO: There isn't a single ProtocolFactory so the use of the following needs to be fixed + // For the present just return the first ProtocolFactory registered. + boost::shared_ptr<sys::ProtocolFactory> getProtocolFactory() const; void declareStandardExchange(const std::string& name, const std::string& type); }; diff --git a/cpp/src/qpid/sys/Acceptor.h b/cpp/src/qpid/sys/ProtocolFactory.h index 69a6eb8d7c..5f80771e49 100644 --- a/cpp/src/qpid/sys/Acceptor.h +++ b/cpp/src/qpid/sys/ProtocolFactory.h @@ -1,5 +1,5 @@ -#ifndef _sys_Acceptor_h -#define _sys_Acceptor_h +#ifndef _sys_ProtocolFactory_h +#define _sys_ProtocolFactory_h /* * @@ -32,23 +32,23 @@ namespace sys { class Poller; -class Acceptor : public qpid::SharedObject<Acceptor> +class ProtocolFactory : public qpid::SharedObject<ProtocolFactory> { public: - virtual ~Acceptor() = 0; + virtual ~ProtocolFactory() = 0; virtual uint16_t getPort() const = 0; virtual std::string getHost() const = 0; - virtual void run(boost::shared_ptr<Poller>, ConnectionCodec::Factory*) = 0; + virtual void accept(boost::shared_ptr<Poller>, ConnectionCodec::Factory*) = 0; virtual void connect( boost::shared_ptr<Poller>, const std::string& host, int16_t port, ConnectionCodec::Factory* codec) = 0; }; -inline Acceptor::~Acceptor() {} +inline ProtocolFactory::~ProtocolFactory() {} }} -#endif /*!_sys_Acceptor_h*/ +#endif //!_sys_ProtocolFactory_h diff --git a/cpp/src/qpid/sys/TCPIOPlugin.cpp b/cpp/src/qpid/sys/TCPIOPlugin.cpp index eb6bcb3dee..65ea380b07 100644 --- a/cpp/src/qpid/sys/TCPIOPlugin.cpp +++ b/cpp/src/qpid/sys/TCPIOPlugin.cpp @@ -19,7 +19,7 @@ * */ -#include "Acceptor.h" +#include "ProtocolFactory.h" #include "AsynchIOHandler.h" #include "AsynchIO.h" @@ -33,21 +33,21 @@ namespace qpid { namespace sys { -class AsynchIOAcceptor : public Acceptor { +class AsynchIOProtocolFactory : public ProtocolFactory { Socket listener; const uint16_t listeningPort; std::auto_ptr<AsynchAcceptor> acceptor; public: - AsynchIOAcceptor(int16_t port, int backlog); - void run(Poller::shared_ptr, ConnectionCodec::Factory*); + AsynchIOProtocolFactory(int16_t port, int backlog); + void accept(Poller::shared_ptr, ConnectionCodec::Factory*); void connect(Poller::shared_ptr, const std::string& host, int16_t port, ConnectionCodec::Factory*); uint16_t getPort() const; std::string getHost() const; private: - void accepted(Poller::shared_ptr, const Socket&, ConnectionCodec::Factory*); + void established(Poller::shared_ptr, const Socket&, ConnectionCodec::Factory*, bool isClient); }; // Static instance to initialise plugin @@ -56,24 +56,26 @@ static class TCPIOPlugin : public Plugin { } void initialize(Target& target) { - broker::Broker* broker = dynamic_cast<broker::Broker*>(&target); // Only provide to a Broker if (broker) { const broker::Broker::Options& opts = broker->getOptions(); - Acceptor::shared_ptr acceptor(new AsynchIOAcceptor(opts.port, opts.connectionBacklog)); - QPID_LOG(info, "Listening on TCP port " << acceptor->getPort()); - broker->registerAccepter(acceptor); + ProtocolFactory::shared_ptr protocol(new AsynchIOProtocolFactory(opts.port, opts.connectionBacklog)); + QPID_LOG(info, "Listening on TCP port " << protocol->getPort()); + broker->registerProtocolFactory(protocol); } } -} acceptor; +} tcpPlugin; -AsynchIOAcceptor::AsynchIOAcceptor(int16_t port, int backlog) : +AsynchIOProtocolFactory::AsynchIOProtocolFactory(int16_t port, int backlog) : listeningPort(listener.listen(port, backlog)) {} -void AsynchIOAcceptor::accepted(Poller::shared_ptr poller, const Socket& s, ConnectionCodec::Factory* f) { +void AsynchIOProtocolFactory::established(Poller::shared_ptr poller, const Socket& s, + ConnectionCodec::Factory* f, bool isClient) { AsynchIOHandler* async = new AsynchIOHandler(s.getPeerAddress(), f); + if (isClient) + async->setClient(); AsynchIO* aio = new AsynchIO(s, boost::bind(&AsynchIOHandler::readbuff, async, _1, _2), boost::bind(&AsynchIOHandler::eof, async, _1), @@ -85,40 +87,30 @@ void AsynchIOAcceptor::accepted(Poller::shared_ptr poller, const Socket& s, Conn aio->start(poller); } - -uint16_t AsynchIOAcceptor::getPort() const { +uint16_t AsynchIOProtocolFactory::getPort() const { return listeningPort; // Immutable no need for lock. } -std::string AsynchIOAcceptor::getHost() const { +std::string AsynchIOProtocolFactory::getHost() const { return listener.getSockname(); } -void AsynchIOAcceptor::run(Poller::shared_ptr poller, ConnectionCodec::Factory* fact) { +void AsynchIOProtocolFactory::accept(Poller::shared_ptr poller, ConnectionCodec::Factory* fact) { acceptor.reset( new AsynchAcceptor(listener, - boost::bind(&AsynchIOAcceptor::accepted, this, poller, _1, fact))); + boost::bind(&AsynchIOProtocolFactory::established, this, poller, _1, fact, false))); acceptor->start(poller); } -void AsynchIOAcceptor::connect( +void AsynchIOProtocolFactory::connect( Poller::shared_ptr poller, const std::string& host, int16_t port, ConnectionCodec::Factory* f) { Socket* socket = new Socket();//Should be deleted by handle when socket closes socket->connect(host, port); - AsynchIOHandler* async = new AsynchIOHandler(socket->getPeerAddress(), f); - async->setClient(); - AsynchIO* aio = new AsynchIO(*socket, - boost::bind(&AsynchIOHandler::readbuff, async, _1, _2), - boost::bind(&AsynchIOHandler::eof, async, _1), - boost::bind(&AsynchIOHandler::disconnect, async, _1), - boost::bind(&AsynchIOHandler::closedSocket, async, _1, _2), - boost::bind(&AsynchIOHandler::nobuffs, async, _1), - boost::bind(&AsynchIOHandler::idle, async, _1)); - async->init(aio, 4); - aio->start(poller); + + established(poller, *socket, f, true); } }} // namespace qpid::sys |