diff options
Diffstat (limited to 'cpp/src/qpid/sys')
-rw-r--r-- | cpp/src/qpid/sys/AsynchIO.h | 22 | ||||
-rw-r--r-- | cpp/src/qpid/sys/AsynchIOAcceptor.cpp | 10 | ||||
-rw-r--r-- | cpp/src/qpid/sys/posix/AsynchIO.cpp | 14 |
3 files changed, 22 insertions, 24 deletions
diff --git a/cpp/src/qpid/sys/AsynchIO.h b/cpp/src/qpid/sys/AsynchIO.h index 7cc7995ee2..ea2badf456 100644 --- a/cpp/src/qpid/sys/AsynchIO.h +++ b/cpp/src/qpid/sys/AsynchIO.h @@ -63,26 +63,24 @@ private: */ class AsynchIO : private DispatchHandle { public: - struct Buffer { - typedef boost::function1<void, const Buffer&> RecycleStorage; - + struct BufferBase { char* const bytes; const int32_t byteCount; int32_t dataStart; int32_t dataCount; - Buffer(char* const b, const int32_t s) : + BufferBase(char* const b, const int32_t s) : bytes(b), byteCount(s), dataStart(0), dataCount(0) {} - virtual ~Buffer() + virtual ~BufferBase() {} }; - typedef boost::function2<void, AsynchIO&, Buffer*> ReadCallback; + typedef boost::function2<void, AsynchIO&, BufferBase*> ReadCallback; typedef boost::function1<void, AsynchIO&> EofCallback; typedef boost::function1<void, AsynchIO&> DisconnectCallback; typedef boost::function2<void, AsynchIO&, const Socket&> ClosedCallback; @@ -96,8 +94,8 @@ private: ClosedCallback closedCallback; BuffersEmptyCallback emptyCallback; IdleCallback idleCallback; - std::deque<Buffer*> bufferQueue; - std::deque<Buffer*> writeQueue; + std::deque<BufferBase*> bufferQueue; + std::deque<BufferBase*> writeQueue; bool queuedClose; public: @@ -107,11 +105,11 @@ public: void queueForDeletion(); void start(Poller::shared_ptr poller); - void queueReadBuffer(Buffer* buff); - void queueWrite(Buffer* buff = 0); - void unread(Buffer* buff); + void queueReadBuffer(BufferBase* buff); + void queueWrite(BufferBase* buff = 0); + void unread(BufferBase* buff); void queueWriteClose(); - Buffer* getQueuedBuffer(); + BufferBase* getQueuedBuffer(); const Socket& getSocket() const { return DispatchHandle::getSocket(); } private: diff --git a/cpp/src/qpid/sys/AsynchIOAcceptor.cpp b/cpp/src/qpid/sys/AsynchIOAcceptor.cpp index 6cd43dc4f3..0700fff8af 100644 --- a/cpp/src/qpid/sys/AsynchIOAcceptor.cpp +++ b/cpp/src/qpid/sys/AsynchIOAcceptor.cpp @@ -74,9 +74,9 @@ AsynchIOAcceptor::AsynchIOAcceptor(int16_t port, int backlog, int threads) : {} // Buffer definition -struct Buff : public AsynchIO::Buffer { +struct Buff : public AsynchIO::BufferBase { Buff() : - AsynchIO::Buffer(new char[65536], 65536) + AsynchIO::BufferBase(new char[65536], 65536) {} ~Buff() { delete [] bytes;} @@ -113,7 +113,7 @@ public: void close(); // Input side - void readbuff(AsynchIO& aio, AsynchIO::Buffer* buff); + void readbuff(AsynchIO& aio, AsynchIO::BufferBase* buff); void eof(AsynchIO& aio); void disconnect(AsynchIO& aio); @@ -200,7 +200,7 @@ void AsynchIOHandler::close() { } // Input side -void AsynchIOHandler::readbuff(AsynchIO& , AsynchIO::Buffer* buff) { +void AsynchIOHandler::readbuff(AsynchIO& , AsynchIO::BufferBase* buff) { framing::Buffer in(buff->bytes+buff->dataStart, buff->dataCount); if(initiated){ framing::AMQFrame frame; @@ -264,7 +264,7 @@ void AsynchIOHandler::idle(AsynchIO&){ do { // Try and get a queued buffer if not then construct new one - AsynchIO::Buffer* buff = aio->getQueuedBuffer(); + AsynchIO::BufferBase* buff = aio->getQueuedBuffer(); if (!buff) buff = new Buff; framing::Buffer out(buff->bytes, buff->byteCount); diff --git a/cpp/src/qpid/sys/posix/AsynchIO.cpp b/cpp/src/qpid/sys/posix/AsynchIO.cpp index 2b462cbd7a..3512363d46 100644 --- a/cpp/src/qpid/sys/posix/AsynchIO.cpp +++ b/cpp/src/qpid/sys/posix/AsynchIO.cpp @@ -121,7 +121,7 @@ void AsynchIO::start(Poller::shared_ptr poller) { DispatchHandle::startWatch(poller); } -void AsynchIO::queueReadBuffer(Buffer* buff) { +void AsynchIO::queueReadBuffer(BufferBase* buff) { assert(buff); buff->dataStart = 0; buff->dataCount = 0; @@ -129,7 +129,7 @@ void AsynchIO::queueReadBuffer(Buffer* buff) { DispatchHandle::rewatchRead(); } -void AsynchIO::unread(Buffer* buff) { +void AsynchIO::unread(BufferBase* buff) { assert(buff); if (buff->dataStart != 0) { memmove(buff->bytes, buff->bytes+buff->dataStart, buff->dataCount); @@ -141,7 +141,7 @@ void AsynchIO::unread(Buffer* buff) { // Either queue for writing or announce that there is something to write // and we should ask for it -void AsynchIO::queueWrite(Buffer* buff) { +void AsynchIO::queueWrite(BufferBase* buff) { // If no buffer then don't queue anything // (but still wake up for writing) if (buff) { @@ -163,11 +163,11 @@ void AsynchIO::queueWriteClose() { /** Return a queued buffer if there are enough * to spare */ -AsynchIO::Buffer* AsynchIO::getQueuedBuffer() { +AsynchIO::BufferBase* AsynchIO::getQueuedBuffer() { // Always keep at least one buffer (it might have data that was "unread" in it) if (bufferQueue.size()<=1) return 0; - Buffer* buff = bufferQueue.back(); + BufferBase* buff = bufferQueue.back(); buff->dataStart = 0; buff->dataCount = 0; bufferQueue.pop_back(); @@ -183,7 +183,7 @@ void AsynchIO::readable(DispatchHandle& h) { // (Try to) get a buffer if (!bufferQueue.empty()) { // Read into buffer - Buffer* buff = bufferQueue.front(); + BufferBase* buff = bufferQueue.front(); bufferQueue.pop_front(); errno = 0; int readCount = buff->byteCount-buff->dataCount; @@ -239,7 +239,7 @@ void AsynchIO::writeable(DispatchHandle& h) { // See if we've got something to write if (!writeQueue.empty()) { // Write buffer - Buffer* buff = writeQueue.back(); + BufferBase* buff = writeQueue.back(); writeQueue.pop_back(); errno = 0; assert(buff->dataStart+buff->dataCount <= buff->byteCount); |