diff options
Diffstat (limited to 'cpp/src/qpid/sys/AsynchIOAcceptor.cpp')
| -rw-r--r-- | cpp/src/qpid/sys/AsynchIOAcceptor.cpp | 60 |
1 files changed, 17 insertions, 43 deletions
diff --git a/cpp/src/qpid/sys/AsynchIOAcceptor.cpp b/cpp/src/qpid/sys/AsynchIOAcceptor.cpp index 43fbfdf7be..5133fde183 100644 --- a/cpp/src/qpid/sys/AsynchIOAcceptor.cpp +++ b/cpp/src/qpid/sys/AsynchIOAcceptor.cpp @@ -43,19 +43,15 @@ namespace qpid { namespace sys { class AsynchIOAcceptor : public Acceptor { - Poller::shared_ptr poller; Socket listener; - int numIOThreads; const uint16_t listeningPort; + std::auto_ptr<AsynchAcceptor> acceptor; public: - AsynchIOAcceptor(int16_t port, int backlog, int threads); - ~AsynchIOAcceptor() {} - void run(ConnectionCodec::Factory*); - void connect(const std::string& host, int16_t port, ConnectionCodec::Factory*); + AsynchIOAcceptor(int16_t port, int backlog); + void run(Poller::shared_ptr, ConnectionCodec::Factory*); + void connect(Poller::shared_ptr, const std::string& host, int16_t port, ConnectionCodec::Factory*); - void shutdown(); - uint16_t getPort() const; std::string getHost() const; @@ -63,15 +59,14 @@ class AsynchIOAcceptor : public Acceptor { void accepted(Poller::shared_ptr, const Socket&, ConnectionCodec::Factory*); }; -Acceptor::shared_ptr Acceptor::create(int16_t port, int backlog, int threads) +Acceptor::shared_ptr Acceptor::create(int16_t port, int backlog) { - return Acceptor::shared_ptr(new AsynchIOAcceptor(port, backlog, threads)); + return Acceptor::shared_ptr(new AsynchIOAcceptor(port, backlog)); } -AsynchIOAcceptor::AsynchIOAcceptor(int16_t port, int backlog, int threads) : - poller(new Poller), - numIOThreads(threads), - listeningPort(listener.listen(port, backlog)) +AsynchIOAcceptor::AsynchIOAcceptor(int16_t port, int backlog) : + listeningPort(listener.listen(port, backlog)), + acceptor(0) {} // Buffer definition @@ -157,30 +152,17 @@ std::string AsynchIOAcceptor::getHost() const { return listener.getSockname(); } -void AsynchIOAcceptor::run(ConnectionCodec::Factory* fact) { - Dispatcher d(poller); - AsynchAcceptor - acceptor(listener, - boost::bind(&AsynchIOAcceptor::accepted, this, poller, _1, fact)); - acceptor.start(poller); - - std::vector<Thread> t(numIOThreads-1); - - // Run n-1 io threads - for (int i=0; i<numIOThreads-1; ++i) - t[i] = Thread(d); - - // Run final thread - d.run(); - - // Now wait for n-1 io threads to exit - for (int i=0; i<numIOThreads-1; ++i) { - t[i].join(); - } +void AsynchIOAcceptor::run(Poller::shared_ptr poller, ConnectionCodec::Factory* fact) { + acceptor.reset( + new AsynchAcceptor(listener, + boost::bind(&AsynchIOAcceptor::accepted, this, poller, _1, fact))); + acceptor->start(poller); } void AsynchIOAcceptor::connect( - const std::string& host, int16_t port, ConnectionCodec::Factory* f) + Poller::shared_ptr poller, + const std::string& host, int16_t port, + ConnectionCodec::Factory* f) { Socket* socket = new Socket();//Should be deleted by handle when socket closes socket->connect(host, port); @@ -202,14 +184,6 @@ void AsynchIOAcceptor::connect( aio->start(poller); } - -void AsynchIOAcceptor::shutdown() { - // NB: this function must be async-signal safe, it must not - // call any function that is not async-signal safe. - poller->shutdown(); -} - - void AsynchIOHandler::write(const framing::ProtocolInitiation& data) { QPID_LOG(debug, "SENT [" << identifier << "] INIT(" << data << ")"); |
