summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/Makefile.am1
-rw-r--r--cpp/src/qpid/sys/AsynchIO.h43
-rw-r--r--cpp/src/qpid/sys/Dispatcher.cpp256
-rw-r--r--cpp/src/qpid/sys/Dispatcher.h21
-rw-r--r--cpp/src/qpid/sys/Poller.h21
-rw-r--r--cpp/src/qpid/sys/epoll/EpollPoller.cpp92
-rw-r--r--cpp/src/qpid/sys/posix/AsynchIO.cpp120
7 files changed, 431 insertions, 123 deletions
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am
index bbb1c6655e..3d97436db6 100644
--- a/cpp/src/Makefile.am
+++ b/cpp/src/Makefile.am
@@ -121,6 +121,7 @@ libLogger_la_CXXFLAGS=$(AM_CXXFLAGS) -Wno-unused-parameter
libqpidcommon_la_LIBADD = \
-lboost_program_options \
+ -luuid \
libLogger.la \
$(APR_LIBS) \
$(LIB_DLOPEN) \
diff --git a/cpp/src/qpid/sys/AsynchIO.h b/cpp/src/qpid/sys/AsynchIO.h
index b346c11706..7accde17b0 100644
--- a/cpp/src/qpid/sys/AsynchIO.h
+++ b/cpp/src/qpid/sys/AsynchIO.h
@@ -57,42 +57,61 @@ private:
*
* Writer accepts a buffer and queues it for writing; can also be given
* a callback for when writing is "idle" (ie fd is writable, but nothing to write)
+ *
+ * The class is implemented in terms of DispatchHandle to allow it to be deleted by deleting
+ * the contained DispatchHandle
*/
-class AsynchIO {
+class AsynchIO : private DispatchHandle {
public:
struct Buffer {
+ typedef boost::function1<void, const Buffer&> RecycleStorage;
+
char* const bytes;
const int32_t byteCount;
+ int32_t dataStart;
+ int32_t dataCount;
Buffer(char* const b, const int32_t s) :
bytes(b),
- byteCount(s)
+ byteCount(s),
+ dataStart(0),
+ dataCount(s)
+ {}
+
+ virtual ~Buffer()
{}
};
- typedef boost::function2<void, const Buffer&, int32_t> ReadCallback;
- typedef boost::function0<void> EofCallback;
- typedef boost::function0<void> BuffersEmptyCallback;
- typedef boost::function1<void, int> IdleCallback;
+ typedef boost::function2<void, AsynchIO&, Buffer*> ReadCallback;
+ typedef boost::function1<void, AsynchIO&> EofCallback;
+ typedef boost::function1<void, AsynchIO&> DisconnectCallback;
+ typedef boost::function1<void, AsynchIO&> BuffersEmptyCallback;
+ typedef boost::function1<void, AsynchIO&> IdleCallback;
private:
ReadCallback readCallback;
EofCallback eofCallback;
+ DisconnectCallback disCallback;
BuffersEmptyCallback emptyCallback;
IdleCallback idleCallback;
- DispatchHandle handle;
- std::deque<Buffer> bufferQueue;
- std::deque<Buffer> writeQueue;
+ std::deque<Buffer*> bufferQueue;
+ std::deque<Buffer*> writeQueue;
public:
- AsynchIO(int fd, ReadCallback rCb, EofCallback eofCb, BuffersEmptyCallback eCb = 0, IdleCallback iCb = 0);
+ AsynchIO(int fd,
+ ReadCallback rCb, EofCallback eofCb, DisconnectCallback disCb,
+ BuffersEmptyCallback eCb = 0, IdleCallback iCb = 0);
+ void queueForDeletion();
+
void start(Poller::shared_ptr poller);
- void QueueReadBuffer(const Buffer& buff);
- void QueueWrite(const Buffer& buff);
+ void queueReadBuffer(Buffer* buff);
+ void queueWrite(Buffer* buff);
private:
+ ~AsynchIO();
void readable(DispatchHandle& handle);
void writeable(DispatchHandle& handle);
+ void disconnected(DispatchHandle& handle);
};
}}
diff --git a/cpp/src/qpid/sys/Dispatcher.cpp b/cpp/src/qpid/sys/Dispatcher.cpp
index 9a20e2c3bc..3a1da13bd0 100644
--- a/cpp/src/qpid/sys/Dispatcher.cpp
+++ b/cpp/src/qpid/sys/Dispatcher.cpp
@@ -40,10 +40,10 @@ void Dispatcher::run() {
// If can read/write then dispatch appropriate callbacks
if (h) {
- h->dispatchCallbacks(event.dir);
+ h->dispatchCallbacks(event.type);
} else {
// Handle shutdown
- switch (event.dir) {
+ switch (event.type) {
case Poller::SHUTDOWN:
goto dispatcher_shutdown;
default:
@@ -57,7 +57,11 @@ dispatcher_shutdown:
;
}
-void DispatchHandle::watch(Poller::shared_ptr poller0) {
+DispatchHandle::~DispatchHandle() {
+ stopWatch();
+}
+
+void DispatchHandle::startWatch(Poller::shared_ptr poller0) {
bool r = readableCallback;
bool w = writableCallback;
@@ -84,25 +88,26 @@ void DispatchHandle::watch(Poller::shared_ptr poller0) {
}
void DispatchHandle::rewatch() {
- assert(poller);
bool r = readableCallback;
bool w = writableCallback;
ScopedLock<Mutex> lock(stateLock);
switch(state) {
- case DispatchHandle::IDLE:
- assert(false);
+ case IDLE:
break;
- case DispatchHandle::DELAYED_R:
- case DispatchHandle::DELAYED_W:
- case DispatchHandle::CALLBACK:
+ case DELAYED_R:
+ case DELAYED_W:
+ case CALLBACK:
state = r ?
(w ? DELAYED_RW : DELAYED_R) :
DELAYED_W;
break;
- case DispatchHandle::INACTIVE:
- case DispatchHandle::ACTIVE_R:
- case DispatchHandle::ACTIVE_W: {
+ case DELAYED_DELETE:
+ break;
+ case INACTIVE:
+ case ACTIVE_R:
+ case ACTIVE_W: {
+ assert(poller);
Poller::Direction d = r ?
(w ? Poller::INOUT : Poller::IN) :
Poller::OUT;
@@ -112,42 +117,43 @@ void DispatchHandle::rewatch() {
ACTIVE_W;
break;
}
- case DispatchHandle::DELAYED_RW:
- case DispatchHandle::ACTIVE_RW:
+ case DELAYED_RW:
+ case ACTIVE_RW:
// Don't need to do anything already waiting for readable/writable
break;
}
}
void DispatchHandle::rewatchRead() {
- assert(poller);
if (!readableCallback) {
return;
}
ScopedLock<Mutex> lock(stateLock);
switch(state) {
- case DispatchHandle::IDLE:
- assert(false);
+ case IDLE:
break;
- case DispatchHandle::DELAYED_R:
- case DispatchHandle::DELAYED_RW:
+ case DELAYED_R:
+ case DELAYED_RW:
+ case DELAYED_DELETE:
break;
- case DispatchHandle::DELAYED_W:
+ case DELAYED_W:
state = DELAYED_RW;
break;
- case DispatchHandle::CALLBACK:
+ case CALLBACK:
state = DELAYED_R;
break;
- case DispatchHandle::ACTIVE_R:
- case DispatchHandle::ACTIVE_RW:
- // Nothing to do: already wating for readable
+ case ACTIVE_R:
+ case ACTIVE_RW:
+ // Nothing to do: already waiting for readable
break;
- case DispatchHandle::INACTIVE:
+ case INACTIVE:
+ assert(poller);
poller->modFd(*this, Poller::IN);
state = ACTIVE_R;
break;
- case DispatchHandle::ACTIVE_W:
+ case ACTIVE_W:
+ assert(poller);
poller->modFd(*this, Poller::INOUT);
state = ACTIVE_RW;
break;
@@ -155,101 +161,245 @@ void DispatchHandle::rewatchRead() {
}
void DispatchHandle::rewatchWrite() {
- assert(poller);
if (!writableCallback) {
return;
}
ScopedLock<Mutex> lock(stateLock);
switch(state) {
- case DispatchHandle::IDLE:
- assert(false);
+ case IDLE:
break;
- case DispatchHandle::DELAYED_W:
- case DispatchHandle::DELAYED_RW:
+ case DELAYED_W:
+ case DELAYED_RW:
+ case DELAYED_DELETE:
break;
- case DispatchHandle::DELAYED_R:
+ case DELAYED_R:
state = DELAYED_RW;
break;
- case DispatchHandle::CALLBACK:
+ case CALLBACK:
state = DELAYED_W;
break;
- case DispatchHandle::INACTIVE:
+ case INACTIVE:
+ assert(poller);
poller->modFd(*this, Poller::OUT);
state = ACTIVE_W;
break;
- case DispatchHandle::ACTIVE_R:
+ case ACTIVE_R:
+ assert(poller);
poller->modFd(*this, Poller::INOUT);
state = ACTIVE_RW;
break;
- case DispatchHandle::ACTIVE_W:
- case DispatchHandle::ACTIVE_RW:
+ case ACTIVE_W:
+ case ACTIVE_RW:
// Nothing to do: already waiting for writable
break;
}
}
+void DispatchHandle::unwatchRead() {
+ if (!readableCallback) {
+ return;
+ }
+
+ ScopedLock<Mutex> lock(stateLock);
+ switch(state) {
+ case IDLE:
+ break;
+ case DELAYED_R:
+ state = CALLBACK;
+ break;
+ case DELAYED_RW:
+ state = DELAYED_W;
+ break;
+ case DELAYED_W:
+ case CALLBACK:
+ case DELAYED_DELETE:
+ break;
+ case ACTIVE_R:
+ assert(poller);
+ poller->modFd(*this, Poller::NONE);
+ state = INACTIVE;
+ break;
+ case ACTIVE_RW:
+ assert(poller);
+ poller->modFd(*this, Poller::OUT);
+ state = ACTIVE_W;
+ break;
+ case ACTIVE_W:
+ case INACTIVE:
+ break;
+ }
+}
+
+void DispatchHandle::unwatchWrite() {
+ if (!writableCallback) {
+ return;
+ }
+
+ ScopedLock<Mutex> lock(stateLock);
+ switch(state) {
+ case IDLE:
+ break;
+ case DELAYED_W:
+ state = CALLBACK;
+ break;
+ case DELAYED_RW:
+ state = DELAYED_R;
+ break;
+ case DELAYED_R:
+ case CALLBACK:
+ case DELAYED_DELETE:
+ break;
+ case ACTIVE_W:
+ assert(poller);
+ poller->modFd(*this, Poller::NONE);
+ state = INACTIVE;
+ break;
+ case ACTIVE_RW:
+ assert(poller);
+ poller->modFd(*this, Poller::IN);
+ state = ACTIVE_R;
+ break;
+ case ACTIVE_R:
+ case INACTIVE:
+ break;
+ }
+}
+
void DispatchHandle::unwatch() {
- assert(poller);
ScopedLock<Mutex> lock(stateLock);
+ switch (state) {
+ case IDLE:
+ break;
+ case DELAYED_R:
+ case DELAYED_W:
+ case DELAYED_RW:
+ case CALLBACK:
+ state = CALLBACK;
+ break;
+ case DELAYED_DELETE:
+ break;
+ default:
+ assert(poller);
+ poller->modFd(*this, Poller::NONE);
+ state = INACTIVE;
+ break;
+ }
+}
+
+void DispatchHandle::stopWatch() {
+ ScopedLock<Mutex> lock(stateLock);
+ if ( state == IDLE) {
+ return;
+ }
+ assert(poller);
poller->delFd(*this);
poller.reset();
state = IDLE;
}
-void DispatchHandle::dispatchCallbacks(Poller::Direction dir) {
- // Note that we are now doing the callbacks
+// The slightly strange switch structure
+// is to ensure that the lock is released before
+// we do the delete
+void DispatchHandle::doDelete() {
+ // If we're in the middle of a callback defer the delete
{
ScopedLock<Mutex> lock(stateLock);
- assert(
- state == ACTIVE_R ||
- state == ACTIVE_W ||
- state == ACTIVE_RW);
+ switch (state) {
+ case DELAYED_R:
+ case DELAYED_W:
+ case DELAYED_RW:
+ case CALLBACK:
+ case DELAYED_DELETE:
+ state = DELAYED_DELETE;
+ return;
+ default:
+ break;
+ }
+ }
+ // If we're not then do it right away
+ delete this;
+}
- state = CALLBACK;
+void DispatchHandle::dispatchCallbacks(Poller::EventType type) {
+ // Note that we are now doing the callbacks
+ {
+ ScopedLock<Mutex> lock(stateLock);
+
+ // Set up to wait for same events next time unless reset
+ switch(state) {
+ case ACTIVE_R:
+ state = DELAYED_R;
+ break;
+ case ACTIVE_W:
+ state = DELAYED_W;
+ break;
+ case ACTIVE_RW:
+ state = DELAYED_RW;
+ break;
+ default:
+ assert(false);
+ }
}
// Do callbacks - whilst we are doing the callbacks we are prevented from processing
// the same handle until we re-enable it. To avoid rentering the callbacks for a single
// handle re-enabling in the callbacks is actually deferred until they are complete.
- switch (dir) {
- case Poller::IN:
+ switch (type) {
+ case Poller::READABLE:
readableCallback(*this);
break;
- case Poller::OUT:
+ case Poller::WRITABLE:
writableCallback(*this);
break;
- case Poller::INOUT:
+ case Poller::READ_WRITABLE:
readableCallback(*this);
writableCallback(*this);
break;
+ case Poller::DISCONNECTED:
+ {
+ ScopedLock<Mutex> lock(stateLock);
+ state = CALLBACK;
+ }
+ if (disconnectedCallback) {
+ disconnectedCallback(*this);
+ }
+ break;
default:
assert(false);
}
// If any of the callbacks re-enabled reading/writing then actually
// do it now
+ {
ScopedLock<Mutex> lock(stateLock);
switch (state) {
case DELAYED_R:
poller->modFd(*this, Poller::IN);
state = ACTIVE_R;
- break;
+ return;
case DELAYED_W:
poller->modFd(*this, Poller::OUT);
state = ACTIVE_W;
- break;
+ return;
case DELAYED_RW:
poller->modFd(*this, Poller::INOUT);
state = ACTIVE_RW;
- break;
+ return;
case CALLBACK:
state = INACTIVE;
- break;
+ return;
+ case IDLE:
+ return;
default:
// This should be impossible
assert(false);
+ return;
+ case DELAYED_DELETE:
+ break;
+ }
}
+ delete this;
}
}}
diff --git a/cpp/src/qpid/sys/Dispatcher.h b/cpp/src/qpid/sys/Dispatcher.h
index 3e43ca3bc1..60e9522bc1 100644
--- a/cpp/src/qpid/sys/Dispatcher.h
+++ b/cpp/src/qpid/sys/Dispatcher.h
@@ -44,26 +44,39 @@ public:
private:
Callback readableCallback;
Callback writableCallback;
+ Callback disconnectedCallback;
Poller::shared_ptr poller;
Mutex stateLock;
- enum { IDLE, INACTIVE, ACTIVE_R, ACTIVE_W, ACTIVE_RW, CALLBACK, DELAYED_R, DELAYED_W, DELAYED_RW} state;
+ enum {
+ IDLE, INACTIVE, ACTIVE_R, ACTIVE_W, ACTIVE_RW,
+ CALLBACK, DELAYED_R, DELAYED_W, DELAYED_RW, DELAYED_DELETE
+ } state;
public:
- DispatchHandle(int fd, Callback rCb, Callback wCb) :
+ DispatchHandle(int fd, Callback rCb, Callback wCb, Callback dCb) :
PollerHandle(fd),
readableCallback(rCb),
writableCallback(wCb),
+ disconnectedCallback(dCb),
state(IDLE)
{}
- void watch(Poller::shared_ptr poller);
+ ~DispatchHandle();
+
+ void startWatch(Poller::shared_ptr poller);
void rewatch();
void rewatchRead();
void rewatchWrite();
void unwatch();
+ void unwatchRead();
+ void unwatchWrite();
+ void stopWatch();
+
+protected:
+ void doDelete();
private:
- void dispatchCallbacks(Poller::Direction dir);
+ void dispatchCallbacks(Poller::EventType dir);
};
class Dispatcher : public Runnable {
diff --git a/cpp/src/qpid/sys/Poller.h b/cpp/src/qpid/sys/Poller.h
index 6fedd669a0..55fead55aa 100644
--- a/cpp/src/qpid/sys/Poller.h
+++ b/cpp/src/qpid/sys/Poller.h
@@ -61,20 +61,29 @@ public:
typedef boost::shared_ptr<Poller> shared_ptr;
enum Direction {
- NONE,
+ NONE = 0,
IN,
OUT,
- INOUT,
- SHUTDOWN
+ INOUT
+ };
+
+ enum EventType {
+ INVALID = 0,
+ READABLE,
+ WRITABLE,
+ READ_WRITABLE,
+ DISCONNECTED,
+ SHUTDOWN,
+ TIMEOUT
};
struct Event {
PollerHandle* handle;
- Direction dir;
+ EventType type;
- Event(PollerHandle* handle0, Direction dir0) :
+ Event(PollerHandle* handle0, EventType type0) :
handle(handle0),
- dir(dir0) {
+ type(type0) {
}
};
diff --git a/cpp/src/qpid/sys/epoll/EpollPoller.cpp b/cpp/src/qpid/sys/epoll/EpollPoller.cpp
index 65b2255023..8c3bdbc7d5 100644
--- a/cpp/src/qpid/sys/epoll/EpollPoller.cpp
+++ b/cpp/src/qpid/sys/epoll/EpollPoller.cpp
@@ -40,7 +40,9 @@ class PollerHandlePrivate {
enum FDStat {
ABSENT,
MONITORED,
- INACTIVE
+ INACTIVE,
+ HUNGUP,
+ MONITORED_HUNGUP
};
::__uint32_t events;
@@ -51,6 +53,39 @@ class PollerHandlePrivate {
events(0),
stat(ABSENT) {
}
+
+ bool isActive() const {
+ return stat == MONITORED || stat == MONITORED_HUNGUP;
+ }
+
+ void setActive() {
+ stat = (stat == HUNGUP) ? MONITORED_HUNGUP : MONITORED;
+ }
+
+ bool isInactive() const {
+ return stat == INACTIVE || stat == HUNGUP;
+ }
+
+ void setInactive() {
+ stat = INACTIVE;
+ }
+
+ bool isIdle() const {
+ return stat == ABSENT;
+ }
+
+ void setIdle() {
+ stat = ABSENT;
+ }
+
+ bool isHungup() const {
+ return stat == MONITORED_HUNGUP || stat == HUNGUP;
+ }
+
+ void setHungup() {
+ assert(stat == MONITORED);
+ stat = HUNGUP;
+ }
};
PollerHandle::PollerHandle(int fd0) :
@@ -108,13 +143,16 @@ class PollerPrivate {
}
}
- static Poller::Direction epollToDirection(::__uint32_t events) {
+ static Poller::EventType epollToDirection(::__uint32_t events) {
+ // POLLOUT & POLLHUP are mutually exclusive really, but at least socketpairs
+ // can give you both!
+ events = (events & ::EPOLLHUP) ? events & ~::EPOLLOUT : events;
::__uint32_t e = events & (::EPOLLIN | ::EPOLLOUT);
switch (e) {
- case ::EPOLLIN: return Poller::IN;
- case ::EPOLLOUT: return Poller::OUT;
- case ::EPOLLIN | ::EPOLLOUT: return Poller::INOUT;
- default: return Poller::NONE;
+ case ::EPOLLIN: return Poller::READABLE;
+ case ::EPOLLOUT: return Poller::WRITABLE;
+ case ::EPOLLIN | ::EPOLLOUT: return Poller::READ_WRITABLE;
+ default: return (events & ::EPOLLHUP) ? Poller::DISCONNECTED : Poller::INVALID;
}
}
@@ -138,11 +176,11 @@ void Poller::addFd(PollerHandle& handle, Direction dir) {
::epoll_event epe;
int op;
- if (eh.stat == PollerHandlePrivate::ABSENT) {
+ if (eh.isIdle()) {
op = EPOLL_CTL_ADD;
epe.events = PollerPrivate::directionToEpollEvent(dir) | ::EPOLLONESHOT;
} else {
- assert(eh.stat == PollerHandlePrivate::MONITORED);
+ assert(eh.isActive());
op = EPOLL_CTL_MOD;
epe.events = eh.events | PollerPrivate::directionToEpollEvent(dir);
}
@@ -152,22 +190,27 @@ void Poller::addFd(PollerHandle& handle, Direction dir) {
// Record monitoring state of this fd
eh.events = epe.events;
- eh.stat = PollerHandlePrivate::MONITORED;
+ eh.setActive();
}
void Poller::delFd(PollerHandle& handle) {
PollerHandlePrivate& eh = *handle.impl;
ScopedLock<Mutex> l(eh.lock);
- assert(eh.stat != PollerHandlePrivate::ABSENT);
- QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_DEL, handle.getFD(), 0));
- eh.stat = PollerHandlePrivate::ABSENT;
+ assert(!eh.isIdle());
+ int rc = ::epoll_ctl(impl->epollFd, EPOLL_CTL_DEL, handle.getFD(), 0);
+ // Ignore EBADF since deleting a nonexistent fd has the overall required result!
+ // And allows the case where a sloppy program closes the fd and then does the delFd()
+ if (rc == -1 && errno != EBADF) {
+ QPID_POSIX_CHECK(rc);
+ }
+ eh.setIdle();
}
// modFd is equivalent to delFd followed by addFd
void Poller::modFd(PollerHandle& handle, Direction dir) {
PollerHandlePrivate& eh = *handle.impl;
ScopedLock<Mutex> l(eh.lock);
- assert(eh.stat != PollerHandlePrivate::ABSENT);
+ assert(!eh.isIdle());
::epoll_event epe;
epe.events = PollerPrivate::directionToEpollEvent(dir) | ::EPOLLONESHOT;
@@ -177,13 +220,13 @@ void Poller::modFd(PollerHandle& handle, Direction dir) {
// Record monitoring state of this fd
eh.events = epe.events;
- eh.stat = PollerHandlePrivate::MONITORED;
+ eh.setActive();
}
void Poller::rearmFd(PollerHandle& handle) {
PollerHandlePrivate& eh = *handle.impl;
ScopedLock<Mutex> l(eh.lock);
- assert(eh.stat == PollerHandlePrivate::INACTIVE);
+ assert(eh.isInactive());
::epoll_event epe;
epe.events = eh.events;
@@ -191,7 +234,7 @@ void Poller::rearmFd(PollerHandle& handle) {
QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, handle.getFD(), &epe));
- eh.stat = PollerHandlePrivate::MONITORED;
+ eh.setActive();
}
void Poller::shutdown() {
@@ -229,8 +272,17 @@ Poller::Event Poller::wait(Duration timeout) {
ScopedLock<Mutex> l(eh.lock);
// the handle could have gone inactive since we left the epoll_wait
- if (eh.stat == PollerHandlePrivate::MONITORED) {
- eh.stat = PollerHandlePrivate::INACTIVE;
+ if (eh.isActive()) {
+ // If the connection has been hungup we could still be readable
+ // (just not writable), allow us to readable until we get here again
+ if (epe.events & ::EPOLLHUP) {
+ if (eh.isHungup()) {
+ return Event(handle, DISCONNECTED);
+ }
+ eh.setHungup();
+ } else {
+ eh.setInactive();
+ }
return Event(handle, PollerPrivate::epollToDirection(epe.events));
}
}
@@ -245,8 +297,8 @@ Poller::Event Poller::wait(Duration timeout) {
// If the wait wasn't indefinite, but we were interrupted then we have to return
// with a timeout as we don't know how long we've waited so far and so we can't
// continue the wait.
- if (rc == 0 || timeoutMs == -1) {
- return Event(0, NONE);
+ if (rc == 0 || timeoutMs != -1) {
+ return Event(0, TIMEOUT);
}
} while (true);
}
diff --git a/cpp/src/qpid/sys/posix/AsynchIO.cpp b/cpp/src/qpid/sys/posix/AsynchIO.cpp
index 400c2080b2..473ef7936f 100644
--- a/cpp/src/qpid/sys/posix/AsynchIO.cpp
+++ b/cpp/src/qpid/sys/posix/AsynchIO.cpp
@@ -27,6 +27,7 @@
#include <fcntl.h>
#include <sys/types.h>
#include <sys/socket.h>
+#include <signal.h>
#include <errno.h>
#include <boost/bind.hpp>
@@ -42,6 +43,14 @@ void nonblocking(int fd) {
QPID_POSIX_CHECK(::fcntl(fd, F_SETFL, O_NONBLOCK));
}
+/*
+ * Make *process* not generate SIGPIPE when writing to closed
+ * pipe/socket (necessary as default action is to terminate process)
+ */
+void ignoreSigpipe() {
+ ::signal(SIGPIPE, SIG_IGN);
+}
+
}
/*
@@ -50,13 +59,14 @@ void nonblocking(int fd) {
AsynchAcceptor::AsynchAcceptor(int fd, Callback callback) :
acceptedCallback(callback),
- handle(fd, boost::bind(&AsynchAcceptor::readable, this, _1), 0) {
+ handle(fd, boost::bind(&AsynchAcceptor::readable, this, _1), 0, 0) {
nonblocking(fd);
+ ignoreSigpipe();
}
void AsynchAcceptor::start(Poller::shared_ptr poller) {
- handle.watch(poller);
+ handle.startWatch(poller);
}
/*
@@ -84,28 +94,50 @@ void AsynchAcceptor::readable(DispatchHandle& h) {
/*
* Asynch reader/writer
*/
-AsynchIO::AsynchIO(int fd, ReadCallback rCb, EofCallback eofCb, BuffersEmptyCallback eCb, IdleCallback iCb) :
+AsynchIO::AsynchIO(int fd,
+ ReadCallback rCb, EofCallback eofCb, DisconnectCallback disCb,
+ BuffersEmptyCallback eCb, IdleCallback iCb) :
+
+ DispatchHandle(fd,
+ boost::bind(&AsynchIO::readable, this, _1),
+ boost::bind(&AsynchIO::writeable, this, _1),
+ boost::bind(&AsynchIO::disconnected, this, _1)),
readCallback(rCb),
eofCallback(eofCb),
+ disCallback(disCb),
emptyCallback(eCb),
- idleCallback(iCb),
- handle(fd, boost::bind(&AsynchIO::readable, this, _1), boost::bind(&AsynchIO::writeable, this, _1)) {
+ idleCallback(iCb) {
nonblocking(fd);
}
+struct deleter
+{
+ template <typename T>
+ void operator()(T *ptr){ delete ptr;}
+};
+
+AsynchIO::~AsynchIO() {
+ std::for_each( bufferQueue.begin(), bufferQueue.end(), deleter());
+ std::for_each( writeQueue.begin(), writeQueue.end(), deleter());
+}
+
+void AsynchIO::queueForDeletion() {
+ DispatchHandle::doDelete();
+}
+
void AsynchIO::start(Poller::shared_ptr poller) {
- handle.watch(poller);
+ DispatchHandle::startWatch(poller);
}
-void AsynchIO::QueueReadBuffer(const Buffer& buff) {
+void AsynchIO::queueReadBuffer(Buffer* buff) {
bufferQueue.push_front(buff);
- handle.rewatchRead();
+ DispatchHandle::rewatchRead();
}
-void AsynchIO::QueueWrite(const Buffer& buff) {
+void AsynchIO::queueWrite(Buffer* buff) {
writeQueue.push_front(buff);
- handle.rewatchWrite();
+ DispatchHandle::rewatchWrite();
}
/*
@@ -117,22 +149,34 @@ void AsynchIO::readable(DispatchHandle& h) {
// (Try to) get a buffer
if (!bufferQueue.empty()) {
// Read into buffer
- Buffer buff = bufferQueue.back();
+ Buffer* buff = bufferQueue.back();
bufferQueue.pop_back();
errno = 0;
- int rc = ::read(h.getFD(), buff.bytes, buff.byteCount);
+ int rc = ::read(h.getFD(), buff->bytes, buff->byteCount);
if (rc == 0) {
- eofCallback();
+ eofCallback(*this);
+ h.unwatchRead();
+ return;
} else if (rc > 0) {
- readCallback(buff, rc);
+ buff->dataStart = 0;
+ buff->dataCount = rc;
+ readCallback(*this, buff);
+ if (rc != buff->byteCount) {
+ // If we didn't fill the read buffer then time to stop reading
+ return;
+ }
} else {
// Put buffer back
bufferQueue.push_back(buff);
- if (errno == EAGAIN) {
- // We must have just put a buffer back so we know
- // we can do this
- h.rewatchRead();
+ // This is effectively the same as eof
+ if (errno == ECONNRESET) {
+ eofCallback(*this);
+ h.unwatchRead();
+ return;
+ } else if (errno == EAGAIN) {
+ // We have just put a buffer back so we know
+ // we can carry on watching for reads
return;
} else {
QPID_POSIX_CHECK(rc);
@@ -141,10 +185,11 @@ void AsynchIO::readable(DispatchHandle& h) {
} else {
// Something to read but no buffer
if (emptyCallback) {
- emptyCallback();
+ emptyCallback(*this);
}
// If we still have no buffers we can't do anything more
if (bufferQueue.empty()) {
+ h.unwatchRead();
return;
}
@@ -160,36 +205,55 @@ void AsynchIO::writeable(DispatchHandle& h) {
// See if we've got something to write
if (!writeQueue.empty()) {
// Write buffer
- Buffer buff = writeQueue.back();
+ Buffer* buff = writeQueue.back();
writeQueue.pop_back();
errno = 0;
- int rc = ::write(h.getFD(), buff.bytes, buff.byteCount);
+ assert(buff->dataStart+buff->dataCount <= buff->byteCount);
+ int rc = ::write(h.getFD(), buff->bytes+buff->dataStart, buff->dataCount);
if (rc >= 0) {
+ // If we didn't write full buffer put rest back
+ if (rc != buff->dataCount) {
+ buff->dataStart += rc;
+ buff->dataCount -= rc;
+ writeQueue.push_back(buff);
+ return;
+ }
+
// Recycle the buffer
- QueueReadBuffer(buff);
+ queueReadBuffer(buff);
} else {
// Put buffer back
writeQueue.push_back(buff);
-
- if (errno == EAGAIN) {
+ if (errno == ECONNRESET || errno == EPIPE) {
+ // Just stop watching for write here - we'll get a
+ // disconnect callback soon enough
+ h.unwatchWrite();
+ return;
+ } else if (errno == EAGAIN) {
// We have just put a buffer back so we know
- // we can do this
- h.rewatchWrite();
+ // we can carry on watching for writes
return;
} else {
QPID_POSIX_CHECK(rc);
}
}
} else {
- // Something to read but no buffer
+ // Fd is writable, but nothing to write
if (idleCallback) {
- idleCallback(h.getFD());
+ idleCallback(*this);
}
// If we still have no buffers to write we can't do anything more
if (writeQueue.empty()) {
+ h.unwatchWrite();
return;
}
}
} while (true);
}
+void AsynchIO::disconnected(DispatchHandle& h) {
+ if (disCallback) {
+ disCallback(*this);
+ h.unwatch();
+ }
+}