summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/sys/AsynchIOAcceptor.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/sys/AsynchIOAcceptor.cpp')
-rw-r--r--cpp/src/qpid/sys/AsynchIOAcceptor.cpp60
1 files changed, 17 insertions, 43 deletions
diff --git a/cpp/src/qpid/sys/AsynchIOAcceptor.cpp b/cpp/src/qpid/sys/AsynchIOAcceptor.cpp
index 43fbfdf7be..5133fde183 100644
--- a/cpp/src/qpid/sys/AsynchIOAcceptor.cpp
+++ b/cpp/src/qpid/sys/AsynchIOAcceptor.cpp
@@ -43,19 +43,15 @@ namespace qpid {
namespace sys {
class AsynchIOAcceptor : public Acceptor {
- Poller::shared_ptr poller;
Socket listener;
- int numIOThreads;
const uint16_t listeningPort;
+ std::auto_ptr<AsynchAcceptor> acceptor;
public:
- AsynchIOAcceptor(int16_t port, int backlog, int threads);
- ~AsynchIOAcceptor() {}
- void run(ConnectionCodec::Factory*);
- void connect(const std::string& host, int16_t port, ConnectionCodec::Factory*);
+ AsynchIOAcceptor(int16_t port, int backlog);
+ void run(Poller::shared_ptr, ConnectionCodec::Factory*);
+ void connect(Poller::shared_ptr, const std::string& host, int16_t port, ConnectionCodec::Factory*);
- void shutdown();
-
uint16_t getPort() const;
std::string getHost() const;
@@ -63,15 +59,14 @@ class AsynchIOAcceptor : public Acceptor {
void accepted(Poller::shared_ptr, const Socket&, ConnectionCodec::Factory*);
};
-Acceptor::shared_ptr Acceptor::create(int16_t port, int backlog, int threads)
+Acceptor::shared_ptr Acceptor::create(int16_t port, int backlog)
{
- return Acceptor::shared_ptr(new AsynchIOAcceptor(port, backlog, threads));
+ return Acceptor::shared_ptr(new AsynchIOAcceptor(port, backlog));
}
-AsynchIOAcceptor::AsynchIOAcceptor(int16_t port, int backlog, int threads) :
- poller(new Poller),
- numIOThreads(threads),
- listeningPort(listener.listen(port, backlog))
+AsynchIOAcceptor::AsynchIOAcceptor(int16_t port, int backlog) :
+ listeningPort(listener.listen(port, backlog)),
+ acceptor(0)
{}
// Buffer definition
@@ -157,30 +152,17 @@ std::string AsynchIOAcceptor::getHost() const {
return listener.getSockname();
}
-void AsynchIOAcceptor::run(ConnectionCodec::Factory* fact) {
- Dispatcher d(poller);
- AsynchAcceptor
- acceptor(listener,
- boost::bind(&AsynchIOAcceptor::accepted, this, poller, _1, fact));
- acceptor.start(poller);
-
- std::vector<Thread> t(numIOThreads-1);
-
- // Run n-1 io threads
- for (int i=0; i<numIOThreads-1; ++i)
- t[i] = Thread(d);
-
- // Run final thread
- d.run();
-
- // Now wait for n-1 io threads to exit
- for (int i=0; i<numIOThreads-1; ++i) {
- t[i].join();
- }
+void AsynchIOAcceptor::run(Poller::shared_ptr poller, ConnectionCodec::Factory* fact) {
+ acceptor.reset(
+ new AsynchAcceptor(listener,
+ boost::bind(&AsynchIOAcceptor::accepted, this, poller, _1, fact)));
+ acceptor->start(poller);
}
void AsynchIOAcceptor::connect(
- const std::string& host, int16_t port, ConnectionCodec::Factory* f)
+ Poller::shared_ptr poller,
+ const std::string& host, int16_t port,
+ ConnectionCodec::Factory* f)
{
Socket* socket = new Socket();//Should be deleted by handle when socket closes
socket->connect(host, port);
@@ -202,14 +184,6 @@ void AsynchIOAcceptor::connect(
aio->start(poller);
}
-
-void AsynchIOAcceptor::shutdown() {
- // NB: this function must be async-signal safe, it must not
- // call any function that is not async-signal safe.
- poller->shutdown();
-}
-
-
void AsynchIOHandler::write(const framing::ProtocolInitiation& data)
{
QPID_LOG(debug, "SENT [" << identifier << "] INIT(" << data << ")");