diff options
Diffstat (limited to 'cpp/src/qpid/client/Connector.cpp')
-rw-r--r-- | cpp/src/qpid/client/Connector.cpp | 175 |
1 files changed, 95 insertions, 80 deletions
diff --git a/cpp/src/qpid/client/Connector.cpp b/cpp/src/qpid/client/Connector.cpp index 23d2c3ff8d..0e3afdd3f0 100644 --- a/cpp/src/qpid/client/Connector.cpp +++ b/cpp/src/qpid/client/Connector.cpp @@ -48,8 +48,7 @@ Connector::Connector( timeoutHandler(0), shutdownHandler(0), aio(0) -{ -} +{} Connector::~Connector() { close(); @@ -62,12 +61,13 @@ void Connector::connect(const std::string& host, int port){ closed = false; poller = Poller::shared_ptr(new Poller); aio = new AsynchIO(socket, - boost::bind(&Connector::readbuff, this, _1, _2), - boost::bind(&Connector::eof, this, _1), - boost::bind(&Connector::eof, this, _1), - 0, // closed - 0, // nobuffs - boost::bind(&Connector::writebuff, this, _1)); + boost::bind(&Connector::readbuff, this, _1, _2), + boost::bind(&Connector::eof, this, _1), + boost::bind(&Connector::eof, this, _1), + 0, // closed + 0, // nobuffs + boost::bind(&Connector::writebuff, this, _1)); + writer.setAio(aio); } void Connector::init(){ @@ -103,12 +103,8 @@ OutputHandler* Connector::getOutputHandler(){ return this; } -void Connector::send(AMQFrame& frame){ - Mutex::ScopedLock l(writeLock); - writeFrameQueue.push(frame); - aio->notifyPendingWrite(); - - QPID_LOG(trace, "SENT [" << this << "]: " << frame); +void Connector::send(AMQFrame& frame) { + writer.handle(frame); } void Connector::handleClosed() { @@ -165,70 +161,89 @@ void Connector::setTimeoutHandler(TimeoutHandler* handler){ timeoutHandler = handler; } - -// Buffer definition -struct Buff : public AsynchIO::BufferBase { - Buff() : - AsynchIO::BufferBase(new char[65536], 65536) - {} - ~Buff() - { delete [] bytes;} +struct Connector::Buff : public AsynchIO::BufferBase { + Buff() : AsynchIO::BufferBase(new char[65536], 65536) {} + ~Buff() { delete [] bytes;} }; -void Connector::readbuff(AsynchIO& aio, AsynchIO::BufferBase* buff) { - framing::Buffer in(buff->bytes+buff->dataStart, buff->dataCount); +Connector::Writer::Writer() : aio(0), buffer(0), lastEof(frames.begin()) {} + +Connector::Writer::~Writer() { delete buffer; } - AMQFrame frame; - while(frame.decode(in)){ - QPID_LOG(trace, "RECV [" << this << "]: " << frame); - input->received(frame); - } - // TODO: unreading needs to go away, and when we can cope - // with multiple sub-buffers in the general buffer scheme, it will - if (in.available() != 0) { - // Adjust buffer for used bytes and then "unread them" - buff->dataStart += buff->dataCount-in.available(); - buff->dataCount = in.available(); - aio.unread(buff); - } else { - // Give whole buffer back to aio subsystem - aio.queueReadBuffer(buff); - } +void Connector::Writer::setAio(sys::AsynchIO* a) { + Mutex::ScopedLock l(lock); + aio = a; + newBuffer(l); } -void Connector::writebuff(AsynchIO& aio) { - Mutex::ScopedLock l(writeLock); - - if (writeFrameQueue.empty()) { - return; +void Connector::Writer::handle(framing::AMQFrame& frame) { + Mutex::ScopedLock l(lock); + frames.push_back(frame); + if (frame.getEof()) { + lastEof = frames.end(); + aio->notifyPendingWrite(); } + QPID_LOG(trace, "SENT [" << this << "]: " << frame); +} - do { - // Try and get a queued buffer if not then construct new one - AsynchIO::BufferBase* buff = aio.getQueuedBuffer(); - if (!buff) - buff = new Buff; - framing::Buffer out(buff->bytes, buff->byteCount); - int buffUsed = 0; - - framing::AMQFrame frame = writeFrameQueue.front(); - int frameSize = frame.size(); - while (frameSize <= int(out.available())) { - - // Encode output frame - frame.encode(out); - buffUsed += frameSize; - - writeFrameQueue.pop(); - if (writeFrameQueue.empty()) - break; - frame = writeFrameQueue.front(); - frameSize = frame.size(); - } - - buff->dataCount = buffUsed; - aio.queueWrite(buff); - } while (!writeFrameQueue.empty()); +void Connector::Writer::writeOne(const Mutex::ScopedLock& l) { + assert(buffer); + QPID_LOG(trace, "Write buffer " << encode.getPosition() + << " bytes " << framesEncoded << " frames "); + framesEncoded = 0; + + buffer->dataStart = 0; + buffer->dataCount = encode.getPosition(); + aio->queueWrite(buffer); + newBuffer(l); +} + +void Connector::Writer::newBuffer(const Mutex::ScopedLock&) { + buffer = aio->getQueuedBuffer(); + if (!buffer) buffer = new Buff(); + encode = framing::Buffer(buffer->bytes, buffer->byteCount); + framesEncoded = 0; +} + +// Called in IO thread. +void Connector::Writer::write(sys::AsynchIO& aio_) { + Mutex::ScopedLock l(lock); + assert(&aio_ == aio); + assert(buffer); + for (Frames::iterator i = frames.begin(); i != lastEof; ++i) { + if (i->size() > encode.available()) writeOne(l); + assert(i->size() <= encode.available()); + i->encode(encode); + ++framesEncoded; + } + frames.erase(frames.begin(), lastEof); + lastEof = frames.begin(); + if (encode.getPosition() > 0) writeOne(l); +} + +void Connector::readbuff(AsynchIO& aio, AsynchIO::BufferBase* buff) { + framing::Buffer in(buff->bytes+buff->dataStart, buff->dataCount); + + AMQFrame frame; + while(frame.decode(in)){ + QPID_LOG(trace, "RECV [" << this << "]: " << frame); + input->received(frame); + } + // TODO: unreading needs to go away, and when we can cope + // with multiple sub-buffers in the general buffer scheme, it will + if (in.available() != 0) { + // Adjust buffer for used bytes and then "unread them" + buff->dataStart += buff->dataCount-in.available(); + buff->dataCount = in.available(); + aio.unread(buff); + } else { + // Give whole buffer back to aio subsystem + aio.queueReadBuffer(buff); + } +} + +void Connector::writebuff(AsynchIO& aio_) { + writer.write(aio_); } void Connector::writeDataBlock(const AMQDataBlock& data) { @@ -240,24 +255,24 @@ void Connector::writeDataBlock(const AMQDataBlock& data) { } void Connector::eof(AsynchIO&) { - handleClosed(); + handleClosed(); } // TODO: astitcher 20070908 This version of the code can never time out, so the idle processing // will never be called void Connector::run(){ - try { - Dispatcher d(poller); + try { + Dispatcher d(poller); - for (int i = 0; i < 32; i++) { - aio->queueReadBuffer(new Buff); - } + for (int i = 0; i < 32; i++) { + aio->queueReadBuffer(new Buff); + } - aio->start(poller); - d.run(); + aio->start(poller); + d.run(); aio->queueForDeletion(); socket.close(); - } catch (const std::exception& e) { + } catch (const std::exception& e) { QPID_LOG(error, e.what()); handleClosed(); } |