summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/sys
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/sys')
-rw-r--r--cpp/src/qpid/sys/AsynchIO.h22
-rw-r--r--cpp/src/qpid/sys/AsynchIOAcceptor.cpp10
-rw-r--r--cpp/src/qpid/sys/posix/AsynchIO.cpp14
3 files changed, 22 insertions, 24 deletions
diff --git a/cpp/src/qpid/sys/AsynchIO.h b/cpp/src/qpid/sys/AsynchIO.h
index 7cc7995ee2..ea2badf456 100644
--- a/cpp/src/qpid/sys/AsynchIO.h
+++ b/cpp/src/qpid/sys/AsynchIO.h
@@ -63,26 +63,24 @@ private:
*/
class AsynchIO : private DispatchHandle {
public:
- struct Buffer {
- typedef boost::function1<void, const Buffer&> RecycleStorage;
-
+ struct BufferBase {
char* const bytes;
const int32_t byteCount;
int32_t dataStart;
int32_t dataCount;
- Buffer(char* const b, const int32_t s) :
+ BufferBase(char* const b, const int32_t s) :
bytes(b),
byteCount(s),
dataStart(0),
dataCount(0)
{}
- virtual ~Buffer()
+ virtual ~BufferBase()
{}
};
- typedef boost::function2<void, AsynchIO&, Buffer*> ReadCallback;
+ typedef boost::function2<void, AsynchIO&, BufferBase*> ReadCallback;
typedef boost::function1<void, AsynchIO&> EofCallback;
typedef boost::function1<void, AsynchIO&> DisconnectCallback;
typedef boost::function2<void, AsynchIO&, const Socket&> ClosedCallback;
@@ -96,8 +94,8 @@ private:
ClosedCallback closedCallback;
BuffersEmptyCallback emptyCallback;
IdleCallback idleCallback;
- std::deque<Buffer*> bufferQueue;
- std::deque<Buffer*> writeQueue;
+ std::deque<BufferBase*> bufferQueue;
+ std::deque<BufferBase*> writeQueue;
bool queuedClose;
public:
@@ -107,11 +105,11 @@ public:
void queueForDeletion();
void start(Poller::shared_ptr poller);
- void queueReadBuffer(Buffer* buff);
- void queueWrite(Buffer* buff = 0);
- void unread(Buffer* buff);
+ void queueReadBuffer(BufferBase* buff);
+ void queueWrite(BufferBase* buff = 0);
+ void unread(BufferBase* buff);
void queueWriteClose();
- Buffer* getQueuedBuffer();
+ BufferBase* getQueuedBuffer();
const Socket& getSocket() const { return DispatchHandle::getSocket(); }
private:
diff --git a/cpp/src/qpid/sys/AsynchIOAcceptor.cpp b/cpp/src/qpid/sys/AsynchIOAcceptor.cpp
index 6cd43dc4f3..0700fff8af 100644
--- a/cpp/src/qpid/sys/AsynchIOAcceptor.cpp
+++ b/cpp/src/qpid/sys/AsynchIOAcceptor.cpp
@@ -74,9 +74,9 @@ AsynchIOAcceptor::AsynchIOAcceptor(int16_t port, int backlog, int threads) :
{}
// Buffer definition
-struct Buff : public AsynchIO::Buffer {
+struct Buff : public AsynchIO::BufferBase {
Buff() :
- AsynchIO::Buffer(new char[65536], 65536)
+ AsynchIO::BufferBase(new char[65536], 65536)
{}
~Buff()
{ delete [] bytes;}
@@ -113,7 +113,7 @@ public:
void close();
// Input side
- void readbuff(AsynchIO& aio, AsynchIO::Buffer* buff);
+ void readbuff(AsynchIO& aio, AsynchIO::BufferBase* buff);
void eof(AsynchIO& aio);
void disconnect(AsynchIO& aio);
@@ -200,7 +200,7 @@ void AsynchIOHandler::close() {
}
// Input side
-void AsynchIOHandler::readbuff(AsynchIO& , AsynchIO::Buffer* buff) {
+void AsynchIOHandler::readbuff(AsynchIO& , AsynchIO::BufferBase* buff) {
framing::Buffer in(buff->bytes+buff->dataStart, buff->dataCount);
if(initiated){
framing::AMQFrame frame;
@@ -264,7 +264,7 @@ void AsynchIOHandler::idle(AsynchIO&){
do {
// Try and get a queued buffer if not then construct new one
- AsynchIO::Buffer* buff = aio->getQueuedBuffer();
+ AsynchIO::BufferBase* buff = aio->getQueuedBuffer();
if (!buff)
buff = new Buff;
framing::Buffer out(buff->bytes, buff->byteCount);
diff --git a/cpp/src/qpid/sys/posix/AsynchIO.cpp b/cpp/src/qpid/sys/posix/AsynchIO.cpp
index 2b462cbd7a..3512363d46 100644
--- a/cpp/src/qpid/sys/posix/AsynchIO.cpp
+++ b/cpp/src/qpid/sys/posix/AsynchIO.cpp
@@ -121,7 +121,7 @@ void AsynchIO::start(Poller::shared_ptr poller) {
DispatchHandle::startWatch(poller);
}
-void AsynchIO::queueReadBuffer(Buffer* buff) {
+void AsynchIO::queueReadBuffer(BufferBase* buff) {
assert(buff);
buff->dataStart = 0;
buff->dataCount = 0;
@@ -129,7 +129,7 @@ void AsynchIO::queueReadBuffer(Buffer* buff) {
DispatchHandle::rewatchRead();
}
-void AsynchIO::unread(Buffer* buff) {
+void AsynchIO::unread(BufferBase* buff) {
assert(buff);
if (buff->dataStart != 0) {
memmove(buff->bytes, buff->bytes+buff->dataStart, buff->dataCount);
@@ -141,7 +141,7 @@ void AsynchIO::unread(Buffer* buff) {
// Either queue for writing or announce that there is something to write
// and we should ask for it
-void AsynchIO::queueWrite(Buffer* buff) {
+void AsynchIO::queueWrite(BufferBase* buff) {
// If no buffer then don't queue anything
// (but still wake up for writing)
if (buff) {
@@ -163,11 +163,11 @@ void AsynchIO::queueWriteClose() {
/** Return a queued buffer if there are enough
* to spare
*/
-AsynchIO::Buffer* AsynchIO::getQueuedBuffer() {
+AsynchIO::BufferBase* AsynchIO::getQueuedBuffer() {
// Always keep at least one buffer (it might have data that was "unread" in it)
if (bufferQueue.size()<=1)
return 0;
- Buffer* buff = bufferQueue.back();
+ BufferBase* buff = bufferQueue.back();
buff->dataStart = 0;
buff->dataCount = 0;
bufferQueue.pop_back();
@@ -183,7 +183,7 @@ void AsynchIO::readable(DispatchHandle& h) {
// (Try to) get a buffer
if (!bufferQueue.empty()) {
// Read into buffer
- Buffer* buff = bufferQueue.front();
+ BufferBase* buff = bufferQueue.front();
bufferQueue.pop_front();
errno = 0;
int readCount = buff->byteCount-buff->dataCount;
@@ -239,7 +239,7 @@ void AsynchIO::writeable(DispatchHandle& h) {
// See if we've got something to write
if (!writeQueue.empty()) {
// Write buffer
- Buffer* buff = writeQueue.back();
+ BufferBase* buff = writeQueue.back();
writeQueue.pop_back();
errno = 0;
assert(buff->dataStart+buff->dataCount <= buff->byteCount);