diff options
author | Alan Conway <aconway@apache.org> | 2008-01-10 22:50:23 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2008-01-10 22:50:23 +0000 |
commit | b8f05c981543c406c17cc3aa8362b360449bb5e3 (patch) | |
tree | 24b3cf051fbcc17deb5ec20def42a97bbd7fc0ee /qpid/cpp/src | |
parent | 8d3a4cca33c6dcd7cee82bad259c0a18037eca44 (diff) | |
download | qpid-python-b8f05c981543c406c17cc3aa8362b360449bb5e3.tar.gz |
Client always collects at least an entire frameset into a single buffer
when possible. Based on patch from Gordon Sim.
- Refactor Connector::writebuff, ::send as Connector::Writer
- Collect frames up to EOF notifying AIO write.
- Encode all available complete framesets into buffers as compactly as possible.
- Logging buffer size and frames encoded per write for client and broker.
- framing::Buffer added getPosition(), getSize(), default ctor, copy ctor.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@610972 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
-rw-r--r-- | qpid/cpp/src/qpid/client/Connector.cpp | 175 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/client/Connector.h | 30 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/framing/Buffer.cpp | 4 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/framing/Buffer.h | 11 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp | 246 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp | 59 | ||||
-rw-r--r-- | qpid/cpp/src/tests/perftest.cpp | 2 |
7 files changed, 287 insertions, 240 deletions
diff --git a/qpid/cpp/src/qpid/client/Connector.cpp b/qpid/cpp/src/qpid/client/Connector.cpp index 23d2c3ff8d..0e3afdd3f0 100644 --- a/qpid/cpp/src/qpid/client/Connector.cpp +++ b/qpid/cpp/src/qpid/client/Connector.cpp @@ -48,8 +48,7 @@ Connector::Connector( timeoutHandler(0), shutdownHandler(0), aio(0) -{ -} +{} Connector::~Connector() { close(); @@ -62,12 +61,13 @@ void Connector::connect(const std::string& host, int port){ closed = false; poller = Poller::shared_ptr(new Poller); aio = new AsynchIO(socket, - boost::bind(&Connector::readbuff, this, _1, _2), - boost::bind(&Connector::eof, this, _1), - boost::bind(&Connector::eof, this, _1), - 0, // closed - 0, // nobuffs - boost::bind(&Connector::writebuff, this, _1)); + boost::bind(&Connector::readbuff, this, _1, _2), + boost::bind(&Connector::eof, this, _1), + boost::bind(&Connector::eof, this, _1), + 0, // closed + 0, // nobuffs + boost::bind(&Connector::writebuff, this, _1)); + writer.setAio(aio); } void Connector::init(){ @@ -103,12 +103,8 @@ OutputHandler* Connector::getOutputHandler(){ return this; } -void Connector::send(AMQFrame& frame){ - Mutex::ScopedLock l(writeLock); - writeFrameQueue.push(frame); - aio->notifyPendingWrite(); - - QPID_LOG(trace, "SENT [" << this << "]: " << frame); +void Connector::send(AMQFrame& frame) { + writer.handle(frame); } void Connector::handleClosed() { @@ -165,70 +161,89 @@ void Connector::setTimeoutHandler(TimeoutHandler* handler){ timeoutHandler = handler; } - -// Buffer definition -struct Buff : public AsynchIO::BufferBase { - Buff() : - AsynchIO::BufferBase(new char[65536], 65536) - {} - ~Buff() - { delete [] bytes;} +struct Connector::Buff : public AsynchIO::BufferBase { + Buff() : AsynchIO::BufferBase(new char[65536], 65536) {} + ~Buff() { delete [] bytes;} }; -void Connector::readbuff(AsynchIO& aio, AsynchIO::BufferBase* buff) { - framing::Buffer in(buff->bytes+buff->dataStart, buff->dataCount); +Connector::Writer::Writer() : aio(0), buffer(0), lastEof(frames.begin()) {} + +Connector::Writer::~Writer() { delete buffer; } - AMQFrame frame; - while(frame.decode(in)){ - QPID_LOG(trace, "RECV [" << this << "]: " << frame); - input->received(frame); - } - // TODO: unreading needs to go away, and when we can cope - // with multiple sub-buffers in the general buffer scheme, it will - if (in.available() != 0) { - // Adjust buffer for used bytes and then "unread them" - buff->dataStart += buff->dataCount-in.available(); - buff->dataCount = in.available(); - aio.unread(buff); - } else { - // Give whole buffer back to aio subsystem - aio.queueReadBuffer(buff); - } +void Connector::Writer::setAio(sys::AsynchIO* a) { + Mutex::ScopedLock l(lock); + aio = a; + newBuffer(l); } -void Connector::writebuff(AsynchIO& aio) { - Mutex::ScopedLock l(writeLock); - - if (writeFrameQueue.empty()) { - return; +void Connector::Writer::handle(framing::AMQFrame& frame) { + Mutex::ScopedLock l(lock); + frames.push_back(frame); + if (frame.getEof()) { + lastEof = frames.end(); + aio->notifyPendingWrite(); } + QPID_LOG(trace, "SENT [" << this << "]: " << frame); +} - do { - // Try and get a queued buffer if not then construct new one - AsynchIO::BufferBase* buff = aio.getQueuedBuffer(); - if (!buff) - buff = new Buff; - framing::Buffer out(buff->bytes, buff->byteCount); - int buffUsed = 0; - - framing::AMQFrame frame = writeFrameQueue.front(); - int frameSize = frame.size(); - while (frameSize <= int(out.available())) { - - // Encode output frame - frame.encode(out); - buffUsed += frameSize; - - writeFrameQueue.pop(); - if (writeFrameQueue.empty()) - break; - frame = writeFrameQueue.front(); - frameSize = frame.size(); - } - - buff->dataCount = buffUsed; - aio.queueWrite(buff); - } while (!writeFrameQueue.empty()); +void Connector::Writer::writeOne(const Mutex::ScopedLock& l) { + assert(buffer); + QPID_LOG(trace, "Write buffer " << encode.getPosition() + << " bytes " << framesEncoded << " frames "); + framesEncoded = 0; + + buffer->dataStart = 0; + buffer->dataCount = encode.getPosition(); + aio->queueWrite(buffer); + newBuffer(l); +} + +void Connector::Writer::newBuffer(const Mutex::ScopedLock&) { + buffer = aio->getQueuedBuffer(); + if (!buffer) buffer = new Buff(); + encode = framing::Buffer(buffer->bytes, buffer->byteCount); + framesEncoded = 0; +} + +// Called in IO thread. +void Connector::Writer::write(sys::AsynchIO& aio_) { + Mutex::ScopedLock l(lock); + assert(&aio_ == aio); + assert(buffer); + for (Frames::iterator i = frames.begin(); i != lastEof; ++i) { + if (i->size() > encode.available()) writeOne(l); + assert(i->size() <= encode.available()); + i->encode(encode); + ++framesEncoded; + } + frames.erase(frames.begin(), lastEof); + lastEof = frames.begin(); + if (encode.getPosition() > 0) writeOne(l); +} + +void Connector::readbuff(AsynchIO& aio, AsynchIO::BufferBase* buff) { + framing::Buffer in(buff->bytes+buff->dataStart, buff->dataCount); + + AMQFrame frame; + while(frame.decode(in)){ + QPID_LOG(trace, "RECV [" << this << "]: " << frame); + input->received(frame); + } + // TODO: unreading needs to go away, and when we can cope + // with multiple sub-buffers in the general buffer scheme, it will + if (in.available() != 0) { + // Adjust buffer for used bytes and then "unread them" + buff->dataStart += buff->dataCount-in.available(); + buff->dataCount = in.available(); + aio.unread(buff); + } else { + // Give whole buffer back to aio subsystem + aio.queueReadBuffer(buff); + } +} + +void Connector::writebuff(AsynchIO& aio_) { + writer.write(aio_); } void Connector::writeDataBlock(const AMQDataBlock& data) { @@ -240,24 +255,24 @@ void Connector::writeDataBlock(const AMQDataBlock& data) { } void Connector::eof(AsynchIO&) { - handleClosed(); + handleClosed(); } // TODO: astitcher 20070908 This version of the code can never time out, so the idle processing // will never be called void Connector::run(){ - try { - Dispatcher d(poller); + try { + Dispatcher d(poller); - for (int i = 0; i < 32; i++) { - aio->queueReadBuffer(new Buff); - } + for (int i = 0; i < 32; i++) { + aio->queueReadBuffer(new Buff); + } - aio->start(poller); - d.run(); + aio->start(poller); + d.run(); aio->queueForDeletion(); socket.close(); - } catch (const std::exception& e) { + } catch (const std::exception& e) { QPID_LOG(error, e.what()); handleClosed(); } diff --git a/qpid/cpp/src/qpid/client/Connector.h b/qpid/cpp/src/qpid/client/Connector.h index b1d759569c..9897789901 100644 --- a/qpid/cpp/src/qpid/client/Connector.h +++ b/qpid/cpp/src/qpid/client/Connector.h @@ -45,6 +45,33 @@ namespace client { class Connector : public framing::OutputHandler, private sys::Runnable { + struct Buff; + + /** Batch up frames for writing to aio. */ + class Writer : public framing::FrameHandler { + typedef sys::AsynchIO::BufferBase BufferBase; + typedef std::vector<framing::AMQFrame> Frames; + + sys::Mutex lock; + sys::AsynchIO* aio; + BufferBase* buffer; + Frames frames; + Frames::iterator lastEof; // Points after last EOF in frames + framing::Buffer encode; + size_t framesEncoded; + + void writeOne(const sys::Mutex::ScopedLock&); + void newBuffer(const sys::Mutex::ScopedLock&); + + public: + + Writer(); + ~Writer(); + void setAio(sys::AsynchIO*); + void handle(framing::AMQFrame&); + void write(sys::AsynchIO&); + }; + const bool debug; const int receive_buffer_size; const int send_buffer_size; @@ -65,8 +92,7 @@ class Connector : public framing::OutputHandler, framing::InitiationHandler* initialiser; framing::OutputHandler* output; - sys::Mutex writeLock; - std::queue<framing::AMQFrame> writeFrameQueue; + Writer writer; sys::Thread receiver; diff --git a/qpid/cpp/src/qpid/framing/Buffer.cpp b/qpid/cpp/src/qpid/framing/Buffer.cpp index b42797414f..7eadf377b9 100644 --- a/qpid/cpp/src/qpid/framing/Buffer.cpp +++ b/qpid/cpp/src/qpid/framing/Buffer.cpp @@ -47,10 +47,6 @@ void Buffer::reset(){ position = 0; } -uint32_t Buffer::available(){ - return size - position; -} - /////////////////////////////////////////////////// void Buffer::putOctet(uint8_t i){ diff --git a/qpid/cpp/src/qpid/framing/Buffer.h b/qpid/cpp/src/qpid/framing/Buffer.h index fe33dbd366..5ab897d351 100644 --- a/qpid/cpp/src/qpid/framing/Buffer.h +++ b/qpid/cpp/src/qpid/framing/Buffer.h @@ -34,7 +34,7 @@ class FieldTable; class Buffer { - const uint32_t size; + uint32_t size; char* data; uint32_t position; uint32_t r_position; @@ -43,13 +43,16 @@ class Buffer public: - Buffer(char* data, uint32_t size); + Buffer(char* data=0, uint32_t size=0); void record(); void restore(bool reRecord = false); void reset(); - uint32_t available(); - + + uint32_t available() { return size - position; } + uint32_t getSize() { return size; } + uint32_t getPosition() { return position; } + void putOctet(uint8_t i); void putShort(uint16_t i); void putLong(uint32_t i); diff --git a/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp b/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp index 9ca083354b..c2c4b545f9 100644 --- a/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp +++ b/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp @@ -44,12 +44,12 @@ namespace qpid { namespace sys { class AsynchIOAcceptor : public Acceptor { - Poller::shared_ptr poller; - Socket listener; - int numIOThreads; - const uint16_t listeningPort; + Poller::shared_ptr poller; + Socket listener; + int numIOThreads; + const uint16_t listeningPort; -public: + public: AsynchIOAcceptor(int16_t port, int backlog, int threads); ~AsynchIOAcceptor() {} void run(ConnectionInputHandlerFactory* factory); @@ -58,7 +58,7 @@ public: uint16_t getPort() const; std::string getHost() const; -private: + private: void accepted(Poller::shared_ptr, const Socket&, ConnectionInputHandlerFactory*); }; @@ -69,9 +69,9 @@ Acceptor::shared_ptr Acceptor::create(int16_t port, int backlog, int threads) } AsynchIOAcceptor::AsynchIOAcceptor(int16_t port, int backlog, int threads) : - poller(new Poller), - numIOThreads(threads), - listeningPort(listener.listen(port, backlog)) + poller(new Poller), + numIOThreads(threads), + listeningPort(listener.listen(port, backlog)) {} // Buffer definition @@ -93,53 +93,53 @@ class AsynchIOHandler : public qpid::sys::ConnectionOutputHandler { bool readError; std::string identifier; -public: - AsynchIOHandler() : - inputHandler(0), - frameQueueClosed(false), - initiated(false), - readError(false) - {} + public: + AsynchIOHandler() : + inputHandler(0), + frameQueueClosed(false), + initiated(false), + readError(false) + {} - ~AsynchIOHandler() { - if (inputHandler) - inputHandler->closed(); - delete inputHandler; - } - - void init(AsynchIO* a, ConnectionInputHandler* h) { - aio = a; - inputHandler = h; - } - - // Output side - void send(framing::AMQFrame&); - void close(); - void activateOutput(); - - // Input side - void readbuff(AsynchIO& aio, AsynchIO::BufferBase* buff); - void eof(AsynchIO& aio); - void disconnect(AsynchIO& aio); + ~AsynchIOHandler() { + if (inputHandler) + inputHandler->closed(); + delete inputHandler; + } + + void init(AsynchIO* a, ConnectionInputHandler* h) { + aio = a; + inputHandler = h; + } + + // Output side + void send(framing::AMQFrame&); + void close(); + void activateOutput(); + + // Input side + void readbuff(AsynchIO& aio, AsynchIO::BufferBase* buff); + void eof(AsynchIO& aio); + void disconnect(AsynchIO& aio); - // Notifications - void nobuffs(AsynchIO& aio); - void idle(AsynchIO& aio); - void closedSocket(AsynchIO& aio, const Socket& s); + // Notifications + void nobuffs(AsynchIO& aio); + void idle(AsynchIO& aio); + void closedSocket(AsynchIO& aio, const Socket& s); }; void AsynchIOAcceptor::accepted(Poller::shared_ptr poller, const Socket& s, ConnectionInputHandlerFactory* f) { - AsynchIOHandler* async = new AsynchIOHandler; - ConnectionInputHandler* handler = f->create(async, s); + AsynchIOHandler* async = new AsynchIOHandler; + ConnectionInputHandler* handler = f->create(async, s); AsynchIO* aio = new AsynchIO(s, - boost::bind(&AsynchIOHandler::readbuff, async, _1, _2), - boost::bind(&AsynchIOHandler::eof, async, _1), - boost::bind(&AsynchIOHandler::disconnect, async, _1), - boost::bind(&AsynchIOHandler::closedSocket, async, _1, _2), - boost::bind(&AsynchIOHandler::nobuffs, async, _1), - boost::bind(&AsynchIOHandler::idle, async, _1)); - async->init(aio, handler); + boost::bind(&AsynchIOHandler::readbuff, async, _1, _2), + boost::bind(&AsynchIOHandler::eof, async, _1), + boost::bind(&AsynchIOHandler::disconnect, async, _1), + boost::bind(&AsynchIOHandler::closedSocket, async, _1, _2), + boost::bind(&AsynchIOHandler::nobuffs, async, _1), + boost::bind(&AsynchIOHandler::idle, async, _1)); + async->init(aio, handler); // Give connection some buffers to use for (int i = 0; i < 4; i++) { @@ -158,50 +158,50 @@ std::string AsynchIOAcceptor::getHost() const { } void AsynchIOAcceptor::run(ConnectionInputHandlerFactory* fact) { - Dispatcher d(poller); - AsynchAcceptor - acceptor(listener, - boost::bind(&AsynchIOAcceptor::accepted, this, poller, _1, fact)); - acceptor.start(poller); + Dispatcher d(poller); + AsynchAcceptor + acceptor(listener, + boost::bind(&AsynchIOAcceptor::accepted, this, poller, _1, fact)); + acceptor.start(poller); - std::vector<Thread> t(numIOThreads-1); + std::vector<Thread> t(numIOThreads-1); - // Run n-1 io threads - for (int i=0; i<numIOThreads-1; ++i) - t[i] = Thread(d); + // Run n-1 io threads + for (int i=0; i<numIOThreads-1; ++i) + t[i] = Thread(d); - // Run final thread - d.run(); + // Run final thread + d.run(); - // Now wait for n-1 io threads to exit - for (int i=0; i<numIOThreads-1; ++i) { - t[i].join(); - } + // Now wait for n-1 io threads to exit + for (int i=0; i<numIOThreads-1; ++i) { + t[i].join(); + } } void AsynchIOAcceptor::shutdown() { - poller->shutdown(); + poller->shutdown(); } // Output side void AsynchIOHandler::send(framing::AMQFrame& frame) { - // TODO: Need to find out if we are in the callback context, - // in the callback thread if so we can go further than just queuing the frame - // to be handled later - { + // TODO: Need to find out if we are in the callback context, + // in the callback thread if so we can go further than just queuing the frame + // to be handled later + { ScopedLock<Mutex> l(frameQueueLock); // Ignore anything seen after closing if (!frameQueueClosed) - frameQueue.push(frame); - } + frameQueue.push(frame); + } - // Activate aio for writing here - aio->notifyPendingWrite(); + // Activate aio for writing here + aio->notifyPendingWrite(); } void AsynchIOHandler::close() { - ScopedLock<Mutex> l(frameQueueLock); - frameQueueClosed = true; + ScopedLock<Mutex> l(frameQueueLock); + frameQueueClosed = true; } void AsynchIOHandler::activateOutput() { @@ -218,7 +218,7 @@ void AsynchIOHandler::readbuff(AsynchIO& , AsynchIO::BufferBase* buff) { framing::AMQFrame frame; try{ while(frame.decode(in)) { - QPID_LOG(debug, "RECV [" << identifier << "]: " << frame); + QPID_LOG(trace, "RECV [" << identifier << "]: " << frame); inputHandler->received(frame); } }catch(const std::exception& e){ @@ -249,9 +249,9 @@ void AsynchIOHandler::readbuff(AsynchIO& , AsynchIO::BufferBase* buff) { } void AsynchIOHandler::eof(AsynchIO&) { - QPID_LOG(debug, "DISCONNECTED [" << identifier << "]"); - inputHandler->closed(); - aio->queueWriteClose(); + QPID_LOG(debug, "DISCONNECTED [" << identifier << "]"); + inputHandler->closed(); + aio->queueWriteClose(); } void AsynchIOHandler::closedSocket(AsynchIO&, const Socket& s) { @@ -259,14 +259,14 @@ void AsynchIOHandler::closedSocket(AsynchIO&, const Socket& s) { if (!aio->writeQueueEmpty()) { QPID_LOG(warning, "CLOSING [" << identifier << "] unsent data (probably due to client disconnect)"); } - delete &s; - aio->queueForDeletion(); - delete this; + delete &s; + aio->queueForDeletion(); + delete this; } void AsynchIOHandler::disconnect(AsynchIO& a) { - // treat the same as eof - eof(a); + // treat the same as eof + eof(a); } // Notifications @@ -274,50 +274,54 @@ void AsynchIOHandler::nobuffs(AsynchIO&) { } void AsynchIOHandler::idle(AsynchIO&){ - ScopedLock<Mutex> l(frameQueueLock); + ScopedLock<Mutex> l(frameQueueLock); - if (frameQueue.empty()) { - // At this point we know that we're write idling the connection - // so tell the input handler to queue any available output: - inputHandler->doOutput(); - //if still no frames, theres nothing to do: - if (frameQueue.empty()) return; - } + if (frameQueue.empty()) { + // At this point we know that we're write idling the connection + // so tell the input handler to queue any available output: + inputHandler->doOutput(); + //if still no frames, theres nothing to do: + if (frameQueue.empty()) return; + } - do { - // Try and get a queued buffer if not then construct new one - AsynchIO::BufferBase* buff = aio->getQueuedBuffer(); - if (!buff) - buff = new Buff; - framing::Buffer out(buff->bytes, buff->byteCount); - int buffUsed = 0; + do { + // Try and get a queued buffer if not then construct new one + AsynchIO::BufferBase* buff = aio->getQueuedBuffer(); + if (!buff) + buff = new Buff; + framing::Buffer out(buff->bytes, buff->byteCount); + int buffUsed = 0; - framing::AMQFrame frame = frameQueue.front(); - int frameSize = frame.size(); - while (frameSize <= int(out.available())) { - frameQueue.pop(); + framing::AMQFrame frame = frameQueue.front(); + int frameSize = frame.size(); + int framesEncoded=0; + while (frameSize <= int(out.available())) { + frameQueue.pop(); - // Encode output frame - frame.encode(out); - buffUsed += frameSize; - QPID_LOG(debug, "SENT [" << identifier << "]: " << frame); + // Encode output frame + frame.encode(out); + ++framesEncoded; + buffUsed += frameSize; + QPID_LOG(trace, "SENT [" << identifier << "]: " << frame); - if (frameQueue.empty()) - break; - frame = frameQueue.front(); - frameSize = frame.size(); - } - // If frame was egregiously large complain - if (frameSize > buff->byteCount) - throw framing::ContentTooLargeException(QPID_MSG("Could not write frame, too large for buffer.")); + if (frameQueue.empty()) + break; + frame = frameQueue.front(); + frameSize = frame.size(); + } + QPID_LOG(trace, "Writing buffer: " << buffUsed << " bytes " << framesEncoded << " frames "); + + // If frame was egregiously large complain + if (frameSize > buff->byteCount) + throw framing::ContentTooLargeException(QPID_MSG("Could not write frame, too large for buffer.")); - buff->dataCount = buffUsed; - aio->queueWrite(buff); - } while (!frameQueue.empty()); + buff->dataCount = buffUsed; + aio->queueWrite(buff); + } while (!frameQueue.empty()); - if (frameQueueClosed) { - aio->queueWriteClose(); - } + if (frameQueueClosed) { + aio->queueWriteClose(); + } } }} // namespace qpid::sys diff --git a/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp b/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp index f8aaa38cf5..94c68bd5d0 100644 --- a/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp +++ b/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp @@ -42,7 +42,7 @@ namespace { * pipe/socket (necessary as default action is to terminate process) */ void ignoreSigpipe() { - ::signal(SIGPIPE, SIG_IGN); + ::signal(SIGPIPE, SIG_IGN); } /* @@ -88,7 +88,7 @@ void AsynchAcceptor::readable(DispatchHandle& h) { if (s) { acceptedCallback(*s); } else { - break; + break; } } while (true); @@ -99,13 +99,13 @@ void AsynchAcceptor::readable(DispatchHandle& h) { * Asynch reader/writer */ AsynchIO::AsynchIO(const Socket& s, - ReadCallback rCb, EofCallback eofCb, DisconnectCallback disCb, - ClosedCallback cCb, BuffersEmptyCallback eCb, IdleCallback iCb) : + ReadCallback rCb, EofCallback eofCb, DisconnectCallback disCb, + ClosedCallback cCb, BuffersEmptyCallback eCb, IdleCallback iCb) : DispatchHandle(s, - boost::bind(&AsynchIO::readable, this, _1), - boost::bind(&AsynchIO::writeable, this, _1), - boost::bind(&AsynchIO::disconnected, this, _1)), + boost::bind(&AsynchIO::readable, this, _1), + boost::bind(&AsynchIO::writeable, this, _1), + boost::bind(&AsynchIO::disconnected, this, _1)), readCallback(rCb), eofCallback(eofCb), disCallback(disCb), @@ -120,8 +120,8 @@ AsynchIO::AsynchIO(const Socket& s, struct deleter { - template <typename T> - void operator()(T *ptr){ delete ptr;} + template <typename T> + void operator()(T *ptr){ delete ptr;} }; AsynchIO::~AsynchIO() { @@ -138,7 +138,7 @@ void AsynchIO::start(Poller::shared_ptr poller) { } void AsynchIO::queueReadBuffer(BufferBase* buff) { - assert(buff); + assert(buff); buff->dataStart = 0; buff->dataCount = 0; bufferQueue.push_back(buff); @@ -146,11 +146,11 @@ void AsynchIO::queueReadBuffer(BufferBase* buff) { } void AsynchIO::unread(BufferBase* buff) { - assert(buff); - if (buff->dataStart != 0) { - memmove(buff->bytes, buff->bytes+buff->dataStart, buff->dataCount); - buff->dataStart = 0; - } + assert(buff); + if (buff->dataStart != 0) { + memmove(buff->bytes, buff->bytes+buff->dataStart, buff->dataCount); + buff->dataStart = 0; + } bufferQueue.push_front(buff); DispatchHandle::rewatchRead(); } @@ -182,14 +182,15 @@ void AsynchIO::queueWriteClose() { * to spare */ AsynchIO::BufferBase* AsynchIO::getQueuedBuffer() { - // Always keep at least one buffer (it might have data that was "unread" in it) - if (bufferQueue.size()<=1) - return 0; - BufferBase* buff = bufferQueue.back(); - buff->dataStart = 0; - buff->dataCount = 0; - bufferQueue.pop_back(); - return buff; + // Always keep at least one buffer (it might have data that was "unread" in it) + if (bufferQueue.size()<=1) + return 0; + BufferBase* buff = bufferQueue.back(); + assert(buff); + buff->dataStart = 0; + buff->dataCount = 0; + bufferQueue.pop_back(); + return buff; } /* @@ -204,6 +205,7 @@ void AsynchIO::readable(DispatchHandle& h) { if (!bufferQueue.empty()) { // Read into buffer BufferBase* buff = bufferQueue.front(); + assert(buff); bufferQueue.pop_front(); errno = 0; int readCount = buff->byteCount-buff->dataCount; @@ -227,6 +229,7 @@ void AsynchIO::readable(DispatchHandle& h) { } else { // Put buffer back (at front so it doesn't interfere with unread buffers) bufferQueue.push_front(buff); + assert(buff); // Eof or other side has gone away if (rc == 0 || errno == ECONNRESET) { @@ -352,10 +355,10 @@ void AsynchIO::disconnected(DispatchHandle& h) { * Close the socket and callback to say we've done it */ void AsynchIO::close(DispatchHandle& h) { - h.stopWatch(); - h.getSocket().close(); - if (closedCallback) { - closedCallback(*this, getSocket()); - } + h.stopWatch(); + h.getSocket().close(); + if (closedCallback) { + closedCallback(*this, getSocket()); + } } diff --git a/qpid/cpp/src/tests/perftest.cpp b/qpid/cpp/src/tests/perftest.cpp index ee210891fe..1bd5a963de 100644 --- a/qpid/cpp/src/tests/perftest.cpp +++ b/qpid/cpp/src/tests/perftest.cpp @@ -476,7 +476,7 @@ struct SubscribeThread : public Client { session.close(); } catch (const std::exception& e) { - cout << "Publisher exception: " << e.what() << endl; + cout << "SubscribeThread exception: " << e.what() << endl; exit(1); } } |