diff options
author | Andrew Stitcher <astitcher@apache.org> | 2008-04-17 00:19:14 +0000 |
---|---|---|
committer | Andrew Stitcher <astitcher@apache.org> | 2008-04-17 00:19:14 +0000 |
commit | 71d805b6086ae19ed774589c25702d791ee91cf2 (patch) | |
tree | 67d99aa0b37a9592786ebd0897b6f631db24ca70 /cpp/src/qpid/sys | |
parent | c6faf3b28d0cf89051b2bff6476f3113285ed9a6 (diff) | |
download | qpid-python-71d805b6086ae19ed774589c25702d791ee91cf2.tar.gz |
Refactored IO Thread creation so that it happens in the Broker class
- There is now a single Poller created by the Broker class that is
passed to the Acceptor for use in network IO. It can also now be passed
to anything else that wants to put work in the IO threads
- The Broker class itself is now responsible for actually creating the
threads
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@648904 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/sys')
-rw-r--r-- | cpp/src/qpid/sys/Acceptor.h | 14 | ||||
-rw-r--r-- | cpp/src/qpid/sys/AsynchIOAcceptor.cpp | 60 |
2 files changed, 25 insertions, 49 deletions
diff --git a/cpp/src/qpid/sys/Acceptor.h b/cpp/src/qpid/sys/Acceptor.h index 1e7827e60c..243e791eeb 100644 --- a/cpp/src/qpid/sys/Acceptor.h +++ b/cpp/src/qpid/sys/Acceptor.h @@ -26,22 +26,24 @@ #include "qpid/SharedObject.h" #include "ConnectionCodec.h" + namespace qpid { namespace sys { +class Poller; + class Acceptor : public qpid::SharedObject<Acceptor> { public: - static Acceptor::shared_ptr create(int16_t port, int backlog, int threads); + static Acceptor::shared_ptr create(int16_t port, int backlog); virtual ~Acceptor() = 0; virtual uint16_t getPort() const = 0; virtual std::string getHost() const = 0; - virtual void run(ConnectionCodec::Factory*) = 0; + virtual void run(boost::shared_ptr<Poller>, ConnectionCodec::Factory*) = 0; virtual void connect( - const std::string& host, int16_t port, ConnectionCodec::Factory* codec) = 0; - - /** Note: this function is async-signal safe */ - virtual void shutdown() = 0; + boost::shared_ptr<Poller>, + const std::string& host, int16_t port, + ConnectionCodec::Factory* codec) = 0; }; inline Acceptor::~Acceptor() {} diff --git a/cpp/src/qpid/sys/AsynchIOAcceptor.cpp b/cpp/src/qpid/sys/AsynchIOAcceptor.cpp index 43fbfdf7be..5133fde183 100644 --- a/cpp/src/qpid/sys/AsynchIOAcceptor.cpp +++ b/cpp/src/qpid/sys/AsynchIOAcceptor.cpp @@ -43,19 +43,15 @@ namespace qpid { namespace sys { class AsynchIOAcceptor : public Acceptor { - Poller::shared_ptr poller; Socket listener; - int numIOThreads; const uint16_t listeningPort; + std::auto_ptr<AsynchAcceptor> acceptor; public: - AsynchIOAcceptor(int16_t port, int backlog, int threads); - ~AsynchIOAcceptor() {} - void run(ConnectionCodec::Factory*); - void connect(const std::string& host, int16_t port, ConnectionCodec::Factory*); + AsynchIOAcceptor(int16_t port, int backlog); + void run(Poller::shared_ptr, ConnectionCodec::Factory*); + void connect(Poller::shared_ptr, const std::string& host, int16_t port, ConnectionCodec::Factory*); - void shutdown(); - uint16_t getPort() const; std::string getHost() const; @@ -63,15 +59,14 @@ class AsynchIOAcceptor : public Acceptor { void accepted(Poller::shared_ptr, const Socket&, ConnectionCodec::Factory*); }; -Acceptor::shared_ptr Acceptor::create(int16_t port, int backlog, int threads) +Acceptor::shared_ptr Acceptor::create(int16_t port, int backlog) { - return Acceptor::shared_ptr(new AsynchIOAcceptor(port, backlog, threads)); + return Acceptor::shared_ptr(new AsynchIOAcceptor(port, backlog)); } -AsynchIOAcceptor::AsynchIOAcceptor(int16_t port, int backlog, int threads) : - poller(new Poller), - numIOThreads(threads), - listeningPort(listener.listen(port, backlog)) +AsynchIOAcceptor::AsynchIOAcceptor(int16_t port, int backlog) : + listeningPort(listener.listen(port, backlog)), + acceptor(0) {} // Buffer definition @@ -157,30 +152,17 @@ std::string AsynchIOAcceptor::getHost() const { return listener.getSockname(); } -void AsynchIOAcceptor::run(ConnectionCodec::Factory* fact) { - Dispatcher d(poller); - AsynchAcceptor - acceptor(listener, - boost::bind(&AsynchIOAcceptor::accepted, this, poller, _1, fact)); - acceptor.start(poller); - - std::vector<Thread> t(numIOThreads-1); - - // Run n-1 io threads - for (int i=0; i<numIOThreads-1; ++i) - t[i] = Thread(d); - - // Run final thread - d.run(); - - // Now wait for n-1 io threads to exit - for (int i=0; i<numIOThreads-1; ++i) { - t[i].join(); - } +void AsynchIOAcceptor::run(Poller::shared_ptr poller, ConnectionCodec::Factory* fact) { + acceptor.reset( + new AsynchAcceptor(listener, + boost::bind(&AsynchIOAcceptor::accepted, this, poller, _1, fact))); + acceptor->start(poller); } void AsynchIOAcceptor::connect( - const std::string& host, int16_t port, ConnectionCodec::Factory* f) + Poller::shared_ptr poller, + const std::string& host, int16_t port, + ConnectionCodec::Factory* f) { Socket* socket = new Socket();//Should be deleted by handle when socket closes socket->connect(host, port); @@ -202,14 +184,6 @@ void AsynchIOAcceptor::connect( aio->start(poller); } - -void AsynchIOAcceptor::shutdown() { - // NB: this function must be async-signal safe, it must not - // call any function that is not async-signal safe. - poller->shutdown(); -} - - void AsynchIOHandler::write(const framing::ProtocolInitiation& data) { QPID_LOG(debug, "SENT [" << identifier << "] INIT(" << data << ")"); |