diff options
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/qpid/client/Connector.cpp | 92 | ||||
-rw-r--r-- | cpp/src/qpid/client/Connector.h | 11 |
2 files changed, 27 insertions, 76 deletions
diff --git a/cpp/src/qpid/client/Connector.cpp b/cpp/src/qpid/client/Connector.cpp index 08dae4105d..b25f19e4ba 100644 --- a/cpp/src/qpid/client/Connector.cpp +++ b/cpp/src/qpid/client/Connector.cpp @@ -25,12 +25,6 @@ #include "qpid/framing/AMQFrame.h" #include "Connector.h" -#include "qpid/sys/AsynchIO.h" -#include "qpid/sys/Dispatcher.h" -#include "qpid/sys/Poller.h" - -#include <boost/bind.hpp> - namespace qpid { namespace client { @@ -49,6 +43,7 @@ Connector::Connector( idleIn(0), idleOut(0), timeoutHandler(0), shutdownHandler(0), + inbuf(receive_buffer_size), outbuf(send_buffer_size) { } @@ -61,7 +56,6 @@ Connector::~Connector(){ void Connector::connect(const std::string& host, int port){ socket.connect(host, port); closed = false; - poller = Poller::shared_ptr(new Poller); receiver = Thread(this); } @@ -74,7 +68,7 @@ void Connector::init(){ bool Connector::closeInternal() { Mutex::ScopedLock l(closedLock); if (!closed) { - poller->shutdown(); + socket.close(); closed = true; return true; } @@ -97,8 +91,6 @@ OutputHandler* Connector::getOutputHandler(){ return this; } -// TODO: astitcher 20070908: Writing still needs to be transferred to the aynchronous IO -// framework. void Connector::send(AMQFrame& frame){ writeBlock(&frame); QPID_LOG(trace, "SENT: " << frame); @@ -129,10 +121,6 @@ void Connector::handleClosed() { shutdownHandler->shutdown(); } -// TODO: astitcher 20070908: This version of the code can never time out, so the idle processing -// can never be called. The timeut processing needs to be added into the underlying Dispatcher code -// -// TODO: astitcher 20070908: EOF is dealt with separately now via a callback to eof void Connector::checkIdle(ssize_t status){ if(timeoutHandler){ AbsTime t = now(); @@ -178,65 +166,33 @@ void Connector::setTimeoutHandler(TimeoutHandler* handler){ timeoutHandler = handler; } - -// Buffer definition -struct Buff : public AsynchIO::Buffer { - Buff() : - AsynchIO::Buffer(new char[65536], 65536) - {} - ~Buff() - { delete [] bytes;} -}; - -void Connector::readbuff(AsynchIO& aio, AsynchIO::Buffer* buff) { - framing::Buffer in(buff->bytes+buff->dataStart, buff->dataCount); - - AMQFrame frame(version); - while(frame.decode(in)){ - QPID_LOG(trace, "RECV: " << frame); - input->received(frame); - } - // TODO: unreading needs to go away, and when we can cope - // with multiple sub-buffers in the general buffer scheme, it will - if (in.available() != 0) { - // Adjust buffer for used bytes and then "unread them" - buff->dataStart += buff->dataCount-in.available(); - buff->dataCount = in.available(); - aio.unread(buff); - } else { - // Give whole buffer back to aio subsystem - aio.queueReadBuffer(buff); - } -} - -void Connector::eof(AsynchIO&) { - handleClosed(); -} - -// TODO: astitcher 20070908 This version of the code can never time out, so the idle processing -// will never be called void Connector::run(){ - try { - Dispatcher d(poller); - - AsynchIO* aio = new AsynchIO(socket, - boost::bind(&Connector::readbuff, this, _1, _2), - boost::bind(&Connector::eof, this, _1), - boost::bind(&Connector::eof, this, _1)); - - for (int i = 0; i < 32; i++) { - aio->queueReadBuffer(new Buff); + try{ + while(!closed){ + ssize_t available = inbuf.available(); + if(available < 1){ + THROW_QPID_ERROR(INTERNAL_ERROR, "Frame exceeds buffer size."); + } + ssize_t received = socket.recv(inbuf.start(), available); + checkIdle(received); + + if(!closed && received > 0){ + inbuf.move(received); + inbuf.flip();//position = 0, limit = total data read + + AMQFrame frame(version); + while(frame.decode(inbuf)){ + QPID_LOG(trace, "RECV: " << frame); + input->received(frame); + } + //need to compact buffer to preserve any 'extra' data + inbuf.compact(); } - - aio->start(poller); - d.run(); - aio->queueForDeletion(); - socket.close(); - } catch (const std::exception& e) { + } + } catch (const std::exception& e) { QPID_LOG(error, e.what()); handleClosed(); } } - }} // namespace qpid::client diff --git a/cpp/src/qpid/client/Connector.h b/cpp/src/qpid/client/Connector.h index 82c9db2ef1..1577564d57 100644 --- a/cpp/src/qpid/client/Connector.h +++ b/cpp/src/qpid/client/Connector.h @@ -34,10 +34,9 @@ #include "qpid/sys/Monitor.h" #include "qpid/sys/Socket.h" #include "qpid/sys/Time.h" -#include "qpid/sys/AsynchIO.h" namespace qpid { - + namespace client { class Connector : public framing::OutputHandler, @@ -63,6 +62,7 @@ class Connector : public framing::OutputHandler, framing::InitiationHandler* initialiser; framing::OutputHandler* output; + framing::Buffer inbuf; framing::Buffer outbuf; sys::Mutex writeLock; @@ -70,8 +70,6 @@ class Connector : public framing::OutputHandler, sys::Socket socket; - sys::Poller::shared_ptr poller; - void checkIdle(ssize_t status); void writeBlock(framing::AMQDataBlock* data); void writeToSocket(char* data, size_t available); @@ -80,10 +78,7 @@ class Connector : public framing::OutputHandler, void run(); void handleClosed(); bool closeInternal(); - - void readbuff(qpid::sys::AsynchIO&, qpid::sys::AsynchIO::Buffer*); - void eof(qpid::sys::AsynchIO&); - + friend class Channel; public: Connector(framing::ProtocolVersion pVersion, |