diff options
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/qpid/broker/Broker.cpp | 3 | ||||
-rw-r--r-- | cpp/src/qpid/client/Connector.cpp | 92 | ||||
-rw-r--r-- | cpp/src/qpid/client/Connector.h | 11 | ||||
-rw-r--r-- | cpp/src/qpid/sys/Acceptor.h | 2 | ||||
-rw-r--r-- | cpp/src/qpid/sys/AsynchIOAcceptor.cpp | 8 |
5 files changed, 82 insertions, 34 deletions
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp index 98487631cf..bfb0a09bf0 100644 --- a/cpp/src/qpid/broker/Broker.cpp +++ b/cpp/src/qpid/broker/Broker.cpp @@ -153,8 +153,7 @@ Acceptor& Broker::getAcceptor() const { const_cast<Acceptor::shared_ptr&>(acceptor) = Acceptor::create(config.port, config.connectionBacklog, - config.workerThreads, - false); + config.workerThreads); QPID_LOG(info, "Listening on port " << getPort()); } return *acceptor; diff --git a/cpp/src/qpid/client/Connector.cpp b/cpp/src/qpid/client/Connector.cpp index b25f19e4ba..08dae4105d 100644 --- a/cpp/src/qpid/client/Connector.cpp +++ b/cpp/src/qpid/client/Connector.cpp @@ -25,6 +25,12 @@ #include "qpid/framing/AMQFrame.h" #include "Connector.h" +#include "qpid/sys/AsynchIO.h" +#include "qpid/sys/Dispatcher.h" +#include "qpid/sys/Poller.h" + +#include <boost/bind.hpp> + namespace qpid { namespace client { @@ -43,7 +49,6 @@ Connector::Connector( idleIn(0), idleOut(0), timeoutHandler(0), shutdownHandler(0), - inbuf(receive_buffer_size), outbuf(send_buffer_size) { } @@ -56,6 +61,7 @@ Connector::~Connector(){ void Connector::connect(const std::string& host, int port){ socket.connect(host, port); closed = false; + poller = Poller::shared_ptr(new Poller); receiver = Thread(this); } @@ -68,7 +74,7 @@ void Connector::init(){ bool Connector::closeInternal() { Mutex::ScopedLock l(closedLock); if (!closed) { - socket.close(); + poller->shutdown(); closed = true; return true; } @@ -91,6 +97,8 @@ OutputHandler* Connector::getOutputHandler(){ return this; } +// TODO: astitcher 20070908: Writing still needs to be transferred to the aynchronous IO +// framework. void Connector::send(AMQFrame& frame){ writeBlock(&frame); QPID_LOG(trace, "SENT: " << frame); @@ -121,6 +129,10 @@ void Connector::handleClosed() { shutdownHandler->shutdown(); } +// TODO: astitcher 20070908: This version of the code can never time out, so the idle processing +// can never be called. The timeut processing needs to be added into the underlying Dispatcher code +// +// TODO: astitcher 20070908: EOF is dealt with separately now via a callback to eof void Connector::checkIdle(ssize_t status){ if(timeoutHandler){ AbsTime t = now(); @@ -166,33 +178,65 @@ void Connector::setTimeoutHandler(TimeoutHandler* handler){ timeoutHandler = handler; } + +// Buffer definition +struct Buff : public AsynchIO::Buffer { + Buff() : + AsynchIO::Buffer(new char[65536], 65536) + {} + ~Buff() + { delete [] bytes;} +}; + +void Connector::readbuff(AsynchIO& aio, AsynchIO::Buffer* buff) { + framing::Buffer in(buff->bytes+buff->dataStart, buff->dataCount); + + AMQFrame frame(version); + while(frame.decode(in)){ + QPID_LOG(trace, "RECV: " << frame); + input->received(frame); + } + // TODO: unreading needs to go away, and when we can cope + // with multiple sub-buffers in the general buffer scheme, it will + if (in.available() != 0) { + // Adjust buffer for used bytes and then "unread them" + buff->dataStart += buff->dataCount-in.available(); + buff->dataCount = in.available(); + aio.unread(buff); + } else { + // Give whole buffer back to aio subsystem + aio.queueReadBuffer(buff); + } +} + +void Connector::eof(AsynchIO&) { + handleClosed(); +} + +// TODO: astitcher 20070908 This version of the code can never time out, so the idle processing +// will never be called void Connector::run(){ - try{ - while(!closed){ - ssize_t available = inbuf.available(); - if(available < 1){ - THROW_QPID_ERROR(INTERNAL_ERROR, "Frame exceeds buffer size."); - } - ssize_t received = socket.recv(inbuf.start(), available); - checkIdle(received); - - if(!closed && received > 0){ - inbuf.move(received); - inbuf.flip();//position = 0, limit = total data read - - AMQFrame frame(version); - while(frame.decode(inbuf)){ - QPID_LOG(trace, "RECV: " << frame); - input->received(frame); - } - //need to compact buffer to preserve any 'extra' data - inbuf.compact(); + try { + Dispatcher d(poller); + + AsynchIO* aio = new AsynchIO(socket, + boost::bind(&Connector::readbuff, this, _1, _2), + boost::bind(&Connector::eof, this, _1), + boost::bind(&Connector::eof, this, _1)); + + for (int i = 0; i < 32; i++) { + aio->queueReadBuffer(new Buff); } - } - } catch (const std::exception& e) { + + aio->start(poller); + d.run(); + aio->queueForDeletion(); + socket.close(); + } catch (const std::exception& e) { QPID_LOG(error, e.what()); handleClosed(); } } + }} // namespace qpid::client diff --git a/cpp/src/qpid/client/Connector.h b/cpp/src/qpid/client/Connector.h index 1577564d57..82c9db2ef1 100644 --- a/cpp/src/qpid/client/Connector.h +++ b/cpp/src/qpid/client/Connector.h @@ -34,9 +34,10 @@ #include "qpid/sys/Monitor.h" #include "qpid/sys/Socket.h" #include "qpid/sys/Time.h" +#include "qpid/sys/AsynchIO.h" namespace qpid { - + namespace client { class Connector : public framing::OutputHandler, @@ -62,7 +63,6 @@ class Connector : public framing::OutputHandler, framing::InitiationHandler* initialiser; framing::OutputHandler* output; - framing::Buffer inbuf; framing::Buffer outbuf; sys::Mutex writeLock; @@ -70,6 +70,8 @@ class Connector : public framing::OutputHandler, sys::Socket socket; + sys::Poller::shared_ptr poller; + void checkIdle(ssize_t status); void writeBlock(framing::AMQDataBlock* data); void writeToSocket(char* data, size_t available); @@ -78,7 +80,10 @@ class Connector : public framing::OutputHandler, void run(); void handleClosed(); bool closeInternal(); - + + void readbuff(qpid::sys::AsynchIO&, qpid::sys::AsynchIO::Buffer*); + void eof(qpid::sys::AsynchIO&); + friend class Channel; public: Connector(framing::ProtocolVersion pVersion, diff --git a/cpp/src/qpid/sys/Acceptor.h b/cpp/src/qpid/sys/Acceptor.h index 8ad3186d07..2eee8b4abd 100644 --- a/cpp/src/qpid/sys/Acceptor.h +++ b/cpp/src/qpid/sys/Acceptor.h @@ -33,7 +33,7 @@ class ConnectionInputHandlerFactory; class Acceptor : public qpid::SharedObject<Acceptor> { public: - static Acceptor::shared_ptr create(int16_t port, int backlog, int threads, bool trace = false); + static Acceptor::shared_ptr create(int16_t port, int backlog, int threads); virtual ~Acceptor() = 0; virtual uint16_t getPort() const = 0; virtual std::string getHost() const = 0; diff --git a/cpp/src/qpid/sys/AsynchIOAcceptor.cpp b/cpp/src/qpid/sys/AsynchIOAcceptor.cpp index 93cf35a043..6cd43dc4f3 100644 --- a/cpp/src/qpid/sys/AsynchIOAcceptor.cpp +++ b/cpp/src/qpid/sys/AsynchIOAcceptor.cpp @@ -49,7 +49,7 @@ class AsynchIOAcceptor : public Acceptor { const uint16_t listeningPort; public: - AsynchIOAcceptor(int16_t port, int backlog, int threads, bool trace); + AsynchIOAcceptor(int16_t port, int backlog, int threads); ~AsynchIOAcceptor() {} void run(ConnectionInputHandlerFactory* factory); void shutdown(); @@ -61,13 +61,13 @@ private: void accepted(Poller::shared_ptr, const Socket&, ConnectionInputHandlerFactory*); }; -Acceptor::shared_ptr Acceptor::create(int16_t port, int backlog, int threads, bool trace) +Acceptor::shared_ptr Acceptor::create(int16_t port, int backlog, int threads) { return - Acceptor::shared_ptr(new AsynchIOAcceptor(port, backlog, threads, trace)); + Acceptor::shared_ptr(new AsynchIOAcceptor(port, backlog, threads)); } -AsynchIOAcceptor::AsynchIOAcceptor(int16_t port, int backlog, int threads, bool) : +AsynchIOAcceptor::AsynchIOAcceptor(int16_t port, int backlog, int threads) : poller(new Poller), numIOThreads(threads), listeningPort(listener.listen(port, backlog)) |