summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/sys
diff options
context:
space:
mode:
authorAndrew Stitcher <astitcher@apache.org>2008-04-17 00:19:14 +0000
committerAndrew Stitcher <astitcher@apache.org>2008-04-17 00:19:14 +0000
commit71d805b6086ae19ed774589c25702d791ee91cf2 (patch)
tree67d99aa0b37a9592786ebd0897b6f631db24ca70 /cpp/src/qpid/sys
parentc6faf3b28d0cf89051b2bff6476f3113285ed9a6 (diff)
downloadqpid-python-71d805b6086ae19ed774589c25702d791ee91cf2.tar.gz
Refactored IO Thread creation so that it happens in the Broker class
- There is now a single Poller created by the Broker class that is passed to the Acceptor for use in network IO. It can also now be passed to anything else that wants to put work in the IO threads - The Broker class itself is now responsible for actually creating the threads git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@648904 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/sys')
-rw-r--r--cpp/src/qpid/sys/Acceptor.h14
-rw-r--r--cpp/src/qpid/sys/AsynchIOAcceptor.cpp60
2 files changed, 25 insertions, 49 deletions
diff --git a/cpp/src/qpid/sys/Acceptor.h b/cpp/src/qpid/sys/Acceptor.h
index 1e7827e60c..243e791eeb 100644
--- a/cpp/src/qpid/sys/Acceptor.h
+++ b/cpp/src/qpid/sys/Acceptor.h
@@ -26,22 +26,24 @@
#include "qpid/SharedObject.h"
#include "ConnectionCodec.h"
+
namespace qpid {
namespace sys {
+class Poller;
+
class Acceptor : public qpid::SharedObject<Acceptor>
{
public:
- static Acceptor::shared_ptr create(int16_t port, int backlog, int threads);
+ static Acceptor::shared_ptr create(int16_t port, int backlog);
virtual ~Acceptor() = 0;
virtual uint16_t getPort() const = 0;
virtual std::string getHost() const = 0;
- virtual void run(ConnectionCodec::Factory*) = 0;
+ virtual void run(boost::shared_ptr<Poller>, ConnectionCodec::Factory*) = 0;
virtual void connect(
- const std::string& host, int16_t port, ConnectionCodec::Factory* codec) = 0;
-
- /** Note: this function is async-signal safe */
- virtual void shutdown() = 0;
+ boost::shared_ptr<Poller>,
+ const std::string& host, int16_t port,
+ ConnectionCodec::Factory* codec) = 0;
};
inline Acceptor::~Acceptor() {}
diff --git a/cpp/src/qpid/sys/AsynchIOAcceptor.cpp b/cpp/src/qpid/sys/AsynchIOAcceptor.cpp
index 43fbfdf7be..5133fde183 100644
--- a/cpp/src/qpid/sys/AsynchIOAcceptor.cpp
+++ b/cpp/src/qpid/sys/AsynchIOAcceptor.cpp
@@ -43,19 +43,15 @@ namespace qpid {
namespace sys {
class AsynchIOAcceptor : public Acceptor {
- Poller::shared_ptr poller;
Socket listener;
- int numIOThreads;
const uint16_t listeningPort;
+ std::auto_ptr<AsynchAcceptor> acceptor;
public:
- AsynchIOAcceptor(int16_t port, int backlog, int threads);
- ~AsynchIOAcceptor() {}
- void run(ConnectionCodec::Factory*);
- void connect(const std::string& host, int16_t port, ConnectionCodec::Factory*);
+ AsynchIOAcceptor(int16_t port, int backlog);
+ void run(Poller::shared_ptr, ConnectionCodec::Factory*);
+ void connect(Poller::shared_ptr, const std::string& host, int16_t port, ConnectionCodec::Factory*);
- void shutdown();
-
uint16_t getPort() const;
std::string getHost() const;
@@ -63,15 +59,14 @@ class AsynchIOAcceptor : public Acceptor {
void accepted(Poller::shared_ptr, const Socket&, ConnectionCodec::Factory*);
};
-Acceptor::shared_ptr Acceptor::create(int16_t port, int backlog, int threads)
+Acceptor::shared_ptr Acceptor::create(int16_t port, int backlog)
{
- return Acceptor::shared_ptr(new AsynchIOAcceptor(port, backlog, threads));
+ return Acceptor::shared_ptr(new AsynchIOAcceptor(port, backlog));
}
-AsynchIOAcceptor::AsynchIOAcceptor(int16_t port, int backlog, int threads) :
- poller(new Poller),
- numIOThreads(threads),
- listeningPort(listener.listen(port, backlog))
+AsynchIOAcceptor::AsynchIOAcceptor(int16_t port, int backlog) :
+ listeningPort(listener.listen(port, backlog)),
+ acceptor(0)
{}
// Buffer definition
@@ -157,30 +152,17 @@ std::string AsynchIOAcceptor::getHost() const {
return listener.getSockname();
}
-void AsynchIOAcceptor::run(ConnectionCodec::Factory* fact) {
- Dispatcher d(poller);
- AsynchAcceptor
- acceptor(listener,
- boost::bind(&AsynchIOAcceptor::accepted, this, poller, _1, fact));
- acceptor.start(poller);
-
- std::vector<Thread> t(numIOThreads-1);
-
- // Run n-1 io threads
- for (int i=0; i<numIOThreads-1; ++i)
- t[i] = Thread(d);
-
- // Run final thread
- d.run();
-
- // Now wait for n-1 io threads to exit
- for (int i=0; i<numIOThreads-1; ++i) {
- t[i].join();
- }
+void AsynchIOAcceptor::run(Poller::shared_ptr poller, ConnectionCodec::Factory* fact) {
+ acceptor.reset(
+ new AsynchAcceptor(listener,
+ boost::bind(&AsynchIOAcceptor::accepted, this, poller, _1, fact)));
+ acceptor->start(poller);
}
void AsynchIOAcceptor::connect(
- const std::string& host, int16_t port, ConnectionCodec::Factory* f)
+ Poller::shared_ptr poller,
+ const std::string& host, int16_t port,
+ ConnectionCodec::Factory* f)
{
Socket* socket = new Socket();//Should be deleted by handle when socket closes
socket->connect(host, port);
@@ -202,14 +184,6 @@ void AsynchIOAcceptor::connect(
aio->start(poller);
}
-
-void AsynchIOAcceptor::shutdown() {
- // NB: this function must be async-signal safe, it must not
- // call any function that is not async-signal safe.
- poller->shutdown();
-}
-
-
void AsynchIOHandler::write(const framing::ProtocolInitiation& data)
{
QPID_LOG(debug, "SENT [" << identifier << "] INIT(" << data << ")");