summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/sys/posix/AsynchIO.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/sys/posix/AsynchIO.cpp')
-rw-r--r--cpp/src/qpid/sys/posix/AsynchIO.cpp59
1 files changed, 31 insertions, 28 deletions
diff --git a/cpp/src/qpid/sys/posix/AsynchIO.cpp b/cpp/src/qpid/sys/posix/AsynchIO.cpp
index f8aaa38cf5..94c68bd5d0 100644
--- a/cpp/src/qpid/sys/posix/AsynchIO.cpp
+++ b/cpp/src/qpid/sys/posix/AsynchIO.cpp
@@ -42,7 +42,7 @@ namespace {
* pipe/socket (necessary as default action is to terminate process)
*/
void ignoreSigpipe() {
- ::signal(SIGPIPE, SIG_IGN);
+ ::signal(SIGPIPE, SIG_IGN);
}
/*
@@ -88,7 +88,7 @@ void AsynchAcceptor::readable(DispatchHandle& h) {
if (s) {
acceptedCallback(*s);
} else {
- break;
+ break;
}
} while (true);
@@ -99,13 +99,13 @@ void AsynchAcceptor::readable(DispatchHandle& h) {
* Asynch reader/writer
*/
AsynchIO::AsynchIO(const Socket& s,
- ReadCallback rCb, EofCallback eofCb, DisconnectCallback disCb,
- ClosedCallback cCb, BuffersEmptyCallback eCb, IdleCallback iCb) :
+ ReadCallback rCb, EofCallback eofCb, DisconnectCallback disCb,
+ ClosedCallback cCb, BuffersEmptyCallback eCb, IdleCallback iCb) :
DispatchHandle(s,
- boost::bind(&AsynchIO::readable, this, _1),
- boost::bind(&AsynchIO::writeable, this, _1),
- boost::bind(&AsynchIO::disconnected, this, _1)),
+ boost::bind(&AsynchIO::readable, this, _1),
+ boost::bind(&AsynchIO::writeable, this, _1),
+ boost::bind(&AsynchIO::disconnected, this, _1)),
readCallback(rCb),
eofCallback(eofCb),
disCallback(disCb),
@@ -120,8 +120,8 @@ AsynchIO::AsynchIO(const Socket& s,
struct deleter
{
- template <typename T>
- void operator()(T *ptr){ delete ptr;}
+ template <typename T>
+ void operator()(T *ptr){ delete ptr;}
};
AsynchIO::~AsynchIO() {
@@ -138,7 +138,7 @@ void AsynchIO::start(Poller::shared_ptr poller) {
}
void AsynchIO::queueReadBuffer(BufferBase* buff) {
- assert(buff);
+ assert(buff);
buff->dataStart = 0;
buff->dataCount = 0;
bufferQueue.push_back(buff);
@@ -146,11 +146,11 @@ void AsynchIO::queueReadBuffer(BufferBase* buff) {
}
void AsynchIO::unread(BufferBase* buff) {
- assert(buff);
- if (buff->dataStart != 0) {
- memmove(buff->bytes, buff->bytes+buff->dataStart, buff->dataCount);
- buff->dataStart = 0;
- }
+ assert(buff);
+ if (buff->dataStart != 0) {
+ memmove(buff->bytes, buff->bytes+buff->dataStart, buff->dataCount);
+ buff->dataStart = 0;
+ }
bufferQueue.push_front(buff);
DispatchHandle::rewatchRead();
}
@@ -182,14 +182,15 @@ void AsynchIO::queueWriteClose() {
* to spare
*/
AsynchIO::BufferBase* AsynchIO::getQueuedBuffer() {
- // Always keep at least one buffer (it might have data that was "unread" in it)
- if (bufferQueue.size()<=1)
- return 0;
- BufferBase* buff = bufferQueue.back();
- buff->dataStart = 0;
- buff->dataCount = 0;
- bufferQueue.pop_back();
- return buff;
+ // Always keep at least one buffer (it might have data that was "unread" in it)
+ if (bufferQueue.size()<=1)
+ return 0;
+ BufferBase* buff = bufferQueue.back();
+ assert(buff);
+ buff->dataStart = 0;
+ buff->dataCount = 0;
+ bufferQueue.pop_back();
+ return buff;
}
/*
@@ -204,6 +205,7 @@ void AsynchIO::readable(DispatchHandle& h) {
if (!bufferQueue.empty()) {
// Read into buffer
BufferBase* buff = bufferQueue.front();
+ assert(buff);
bufferQueue.pop_front();
errno = 0;
int readCount = buff->byteCount-buff->dataCount;
@@ -227,6 +229,7 @@ void AsynchIO::readable(DispatchHandle& h) {
} else {
// Put buffer back (at front so it doesn't interfere with unread buffers)
bufferQueue.push_front(buff);
+ assert(buff);
// Eof or other side has gone away
if (rc == 0 || errno == ECONNRESET) {
@@ -352,10 +355,10 @@ void AsynchIO::disconnected(DispatchHandle& h) {
* Close the socket and callback to say we've done it
*/
void AsynchIO::close(DispatchHandle& h) {
- h.stopWatch();
- h.getSocket().close();
- if (closedCallback) {
- closedCallback(*this, getSocket());
- }
+ h.stopWatch();
+ h.getSocket().close();
+ if (closedCallback) {
+ closedCallback(*this, getSocket());
+ }
}