summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/Makefile.am2
-rw-r--r--cpp/src/qpid/broker/Broker.cpp36
-rw-r--r--cpp/src/qpid/broker/Broker.h14
-rw-r--r--cpp/src/qpid/sys/ProtocolFactory.h (renamed from cpp/src/qpid/sys/Acceptor.h)14
-rw-r--r--cpp/src/qpid/sys/TCPIOPlugin.cpp50
5 files changed, 50 insertions, 66 deletions
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am
index 608120a7e7..597e566049 100644
--- a/cpp/src/Makefile.am
+++ b/cpp/src/Makefile.am
@@ -473,7 +473,6 @@ nobase_include_HEADERS = \
qpid/management/ManagementAgent.h \
qpid/management/ManagementExchange.h \
qpid/management/ManagementObject.h \
- qpid/sys/Acceptor.h \
qpid/sys/AggregateOutput.h \
qpid/sys/AsynchIO.h \
qpid/sys/AsynchIOHandler.h \
@@ -493,6 +492,7 @@ nobase_include_HEADERS = \
qpid/sys/OutputControl.h \
qpid/sys/OutputTask.h \
qpid/sys/Poller.h \
+ qpid/sys/ProtocolFactory.h \
qpid/sys/Runnable.h \
qpid/sys/ScopedIncrement.h \
qpid/sys/Semaphore.h \
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp
index ec690a8acb..6cbd9bf343 100644
--- a/cpp/src/qpid/broker/Broker.cpp
+++ b/cpp/src/qpid/broker/Broker.cpp
@@ -35,7 +35,7 @@
#include "qpid/log/Statement.h"
#include "qpid/framing/AMQFrame.h"
#include "qpid/framing/ProtocolInitiation.h"
-#include "qpid/sys/Acceptor.h"
+#include "qpid/sys/ProtocolFactory.h"
#include "qpid/sys/Poller.h"
#include "qpid/sys/Dispatcher.h"
#include "qpid/sys/Thread.h"
@@ -54,7 +54,7 @@
#include <sasl/sasl.h>
#endif
-using qpid::sys::Acceptor;
+using qpid::sys::ProtocolFactory;
using qpid::sys::Poller;
using qpid::sys::Dispatcher;
using qpid::sys::Thread;
@@ -334,41 +334,33 @@ Manageable::status_t Broker::ManagementMethod (uint32_t methodId,
return status;
}
-boost::shared_ptr<Acceptor> Broker::getAcceptor() const {
- assert(acceptors.size() > 0);
-#if 0
- if (!acceptor) {
- const_cast<Acceptor::shared_ptr&>(acceptor) =
- Acceptor::create(config.port,
- config.connectionBacklog);
- QPID_LOG(info, "Listening on port " << getPort());
- }
-#endif
- return acceptors[0];
+boost::shared_ptr<ProtocolFactory> Broker::getProtocolFactory() const {
+ assert(protocolFactories.size() > 0);
+ return protocolFactories[0];
}
-void Broker::registerAccepter(Acceptor::shared_ptr acceptor) {
- acceptors.push_back(acceptor);
+void Broker::registerProtocolFactory(ProtocolFactory::shared_ptr protocolFactory) {
+ protocolFactories.push_back(protocolFactory);
}
-// TODO: This can only work if there is only one acceptor
+// TODO: This can only work if there is only one protocolFactory
uint16_t Broker::getPort() const {
- return getAcceptor()->getPort();
+ return getProtocolFactory()->getPort();
}
-// TODO: This should iterate over all acceptors
+// TODO: This should iterate over all protocolFactories
void Broker::accept() {
- for (unsigned int i = 0; i < acceptors.size(); ++i)
- acceptors[i]->run(poller, &factory);
+ for (unsigned int i = 0; i < protocolFactories.size(); ++i)
+ protocolFactories[i]->accept(poller, &factory);
}
-// TODO: How to chose the acceptor to use for the connection
+// TODO: How to chose the protocolFactory to use for the connection
void Broker::connect(
const std::string& host, uint16_t port,
sys::ConnectionCodec::Factory* f)
{
- getAcceptor()->connect(poller, host, port, f ? f : &factory);
+ getProtocolFactory()->connect(poller, host, port, f ? f : &factory);
}
void Broker::connect(
diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h
index 02f34ff3ba..fa66061fd0 100644
--- a/cpp/src/qpid/broker/Broker.h
+++ b/cpp/src/qpid/broker/Broker.h
@@ -50,7 +50,7 @@
namespace qpid {
namespace sys {
- class Acceptor;
+ class ProtocolFactory;
class Poller;
}
@@ -124,8 +124,8 @@ class Broker : public sys::Runnable, public Plugin::Target,
management::Manageable::status_t
ManagementMethod (uint32_t methodId, management::Args& args);
- /** Add to the broker's acceptors */
- void registerAccepter(boost::shared_ptr<sys::Acceptor>);
+ /** Add to the broker's protocolFactorys */
+ void registerProtocolFactory(boost::shared_ptr<sys::ProtocolFactory>);
/** Accept connections */
void accept();
@@ -139,7 +139,7 @@ class Broker : public sys::Runnable, public Plugin::Target,
private:
boost::shared_ptr<sys::Poller> poller;
Options config;
- std::vector< boost::shared_ptr<sys::Acceptor> > acceptors;
+ std::vector< boost::shared_ptr<sys::ProtocolFactory> > protocolFactories;
MessageStore* store;
DataDir dataDir;
@@ -154,9 +154,9 @@ class Broker : public sys::Runnable, public Plugin::Target,
Vhost::shared_ptr vhostObject;
System::shared_ptr systemObject;
- // TODO: There is no longer a single acceptor so the use of the following needs to be fixed
- // For the present just return the first acceptor registered.
- boost::shared_ptr<sys::Acceptor> getAcceptor() const;
+ // TODO: There isn't a single ProtocolFactory so the use of the following needs to be fixed
+ // For the present just return the first ProtocolFactory registered.
+ boost::shared_ptr<sys::ProtocolFactory> getProtocolFactory() const;
void declareStandardExchange(const std::string& name, const std::string& type);
};
diff --git a/cpp/src/qpid/sys/Acceptor.h b/cpp/src/qpid/sys/ProtocolFactory.h
index 69a6eb8d7c..5f80771e49 100644
--- a/cpp/src/qpid/sys/Acceptor.h
+++ b/cpp/src/qpid/sys/ProtocolFactory.h
@@ -1,5 +1,5 @@
-#ifndef _sys_Acceptor_h
-#define _sys_Acceptor_h
+#ifndef _sys_ProtocolFactory_h
+#define _sys_ProtocolFactory_h
/*
*
@@ -32,23 +32,23 @@ namespace sys {
class Poller;
-class Acceptor : public qpid::SharedObject<Acceptor>
+class ProtocolFactory : public qpid::SharedObject<ProtocolFactory>
{
public:
- virtual ~Acceptor() = 0;
+ virtual ~ProtocolFactory() = 0;
virtual uint16_t getPort() const = 0;
virtual std::string getHost() const = 0;
- virtual void run(boost::shared_ptr<Poller>, ConnectionCodec::Factory*) = 0;
+ virtual void accept(boost::shared_ptr<Poller>, ConnectionCodec::Factory*) = 0;
virtual void connect(
boost::shared_ptr<Poller>,
const std::string& host, int16_t port,
ConnectionCodec::Factory* codec) = 0;
};
-inline Acceptor::~Acceptor() {}
+inline ProtocolFactory::~ProtocolFactory() {}
}}
-#endif /*!_sys_Acceptor_h*/
+#endif //!_sys_ProtocolFactory_h
diff --git a/cpp/src/qpid/sys/TCPIOPlugin.cpp b/cpp/src/qpid/sys/TCPIOPlugin.cpp
index eb6bcb3dee..65ea380b07 100644
--- a/cpp/src/qpid/sys/TCPIOPlugin.cpp
+++ b/cpp/src/qpid/sys/TCPIOPlugin.cpp
@@ -19,7 +19,7 @@
*
*/
-#include "Acceptor.h"
+#include "ProtocolFactory.h"
#include "AsynchIOHandler.h"
#include "AsynchIO.h"
@@ -33,21 +33,21 @@
namespace qpid {
namespace sys {
-class AsynchIOAcceptor : public Acceptor {
+class AsynchIOProtocolFactory : public ProtocolFactory {
Socket listener;
const uint16_t listeningPort;
std::auto_ptr<AsynchAcceptor> acceptor;
public:
- AsynchIOAcceptor(int16_t port, int backlog);
- void run(Poller::shared_ptr, ConnectionCodec::Factory*);
+ AsynchIOProtocolFactory(int16_t port, int backlog);
+ void accept(Poller::shared_ptr, ConnectionCodec::Factory*);
void connect(Poller::shared_ptr, const std::string& host, int16_t port, ConnectionCodec::Factory*);
uint16_t getPort() const;
std::string getHost() const;
private:
- void accepted(Poller::shared_ptr, const Socket&, ConnectionCodec::Factory*);
+ void established(Poller::shared_ptr, const Socket&, ConnectionCodec::Factory*, bool isClient);
};
// Static instance to initialise plugin
@@ -56,24 +56,26 @@ static class TCPIOPlugin : public Plugin {
}
void initialize(Target& target) {
-
broker::Broker* broker = dynamic_cast<broker::Broker*>(&target);
// Only provide to a Broker
if (broker) {
const broker::Broker::Options& opts = broker->getOptions();
- Acceptor::shared_ptr acceptor(new AsynchIOAcceptor(opts.port, opts.connectionBacklog));
- QPID_LOG(info, "Listening on TCP port " << acceptor->getPort());
- broker->registerAccepter(acceptor);
+ ProtocolFactory::shared_ptr protocol(new AsynchIOProtocolFactory(opts.port, opts.connectionBacklog));
+ QPID_LOG(info, "Listening on TCP port " << protocol->getPort());
+ broker->registerProtocolFactory(protocol);
}
}
-} acceptor;
+} tcpPlugin;
-AsynchIOAcceptor::AsynchIOAcceptor(int16_t port, int backlog) :
+AsynchIOProtocolFactory::AsynchIOProtocolFactory(int16_t port, int backlog) :
listeningPort(listener.listen(port, backlog))
{}
-void AsynchIOAcceptor::accepted(Poller::shared_ptr poller, const Socket& s, ConnectionCodec::Factory* f) {
+void AsynchIOProtocolFactory::established(Poller::shared_ptr poller, const Socket& s,
+ ConnectionCodec::Factory* f, bool isClient) {
AsynchIOHandler* async = new AsynchIOHandler(s.getPeerAddress(), f);
+ if (isClient)
+ async->setClient();
AsynchIO* aio = new AsynchIO(s,
boost::bind(&AsynchIOHandler::readbuff, async, _1, _2),
boost::bind(&AsynchIOHandler::eof, async, _1),
@@ -85,40 +87,30 @@ void AsynchIOAcceptor::accepted(Poller::shared_ptr poller, const Socket& s, Conn
aio->start(poller);
}
-
-uint16_t AsynchIOAcceptor::getPort() const {
+uint16_t AsynchIOProtocolFactory::getPort() const {
return listeningPort; // Immutable no need for lock.
}
-std::string AsynchIOAcceptor::getHost() const {
+std::string AsynchIOProtocolFactory::getHost() const {
return listener.getSockname();
}
-void AsynchIOAcceptor::run(Poller::shared_ptr poller, ConnectionCodec::Factory* fact) {
+void AsynchIOProtocolFactory::accept(Poller::shared_ptr poller, ConnectionCodec::Factory* fact) {
acceptor.reset(
new AsynchAcceptor(listener,
- boost::bind(&AsynchIOAcceptor::accepted, this, poller, _1, fact)));
+ boost::bind(&AsynchIOProtocolFactory::established, this, poller, _1, fact, false)));
acceptor->start(poller);
}
-void AsynchIOAcceptor::connect(
+void AsynchIOProtocolFactory::connect(
Poller::shared_ptr poller,
const std::string& host, int16_t port,
ConnectionCodec::Factory* f)
{
Socket* socket = new Socket();//Should be deleted by handle when socket closes
socket->connect(host, port);
- AsynchIOHandler* async = new AsynchIOHandler(socket->getPeerAddress(), f);
- async->setClient();
- AsynchIO* aio = new AsynchIO(*socket,
- boost::bind(&AsynchIOHandler::readbuff, async, _1, _2),
- boost::bind(&AsynchIOHandler::eof, async, _1),
- boost::bind(&AsynchIOHandler::disconnect, async, _1),
- boost::bind(&AsynchIOHandler::closedSocket, async, _1, _2),
- boost::bind(&AsynchIOHandler::nobuffs, async, _1),
- boost::bind(&AsynchIOHandler::idle, async, _1));
- async->init(aio, 4);
- aio->start(poller);
+
+ established(poller, *socket, f, true);
}
}} // namespace qpid::sys