summaryrefslogtreecommitdiff
path: root/qpid/cpp/src
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src')
-rw-r--r--qpid/cpp/src/Makefile.am5
-rw-r--r--qpid/cpp/src/qpid/sys/AsynchIO.h100
-rw-r--r--qpid/cpp/src/qpid/sys/Dispatcher.cpp203
-rw-r--r--qpid/cpp/src/qpid/sys/Dispatcher.h13
-rw-r--r--qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp195
-rw-r--r--qpid/cpp/src/qpid/sys/posix/EventChannelConnection.cpp24
-rw-r--r--qpid/cpp/src/qpid/sys/posix/EventChannelConnection.h10
7 files changed, 506 insertions, 44 deletions
diff --git a/qpid/cpp/src/Makefile.am b/qpid/cpp/src/Makefile.am
index b92f8d85e5..4b3a223e72 100644
--- a/qpid/cpp/src/Makefile.am
+++ b/qpid/cpp/src/Makefile.am
@@ -68,9 +68,11 @@ posix_netio_hdr = \
qpid/sys/posix/EventChannelThreads.h
posix_plat_src = \
- qpid/sys/posix/check.cpp \
+ qpid/sys/Dispatcher.cpp \
qpid/sys/epoll/EpollPoller.cpp \
+ qpid/sys/posix/check.cpp \
qpid/sys/posix/Socket.cpp \
+ qpid/sys/posix/AsynchIO.cpp \
qpid/sys/posix/Time.cpp \
qpid/sys/posix/Thread.cpp
@@ -349,6 +351,7 @@ nobase_include_HEADERS = \
qpid/framing/amqp_types.h \
qpid/framing/amqp_types_full.h \
qpid/sys/Acceptor.h \
+ qpid/sys/AsynchIO.h \
qpid/sys/AtomicCount.h \
qpid/sys/Dispatcher.h \
qpid/sys/Condition.h \
diff --git a/qpid/cpp/src/qpid/sys/AsynchIO.h b/qpid/cpp/src/qpid/sys/AsynchIO.h
new file mode 100644
index 0000000000..b346c11706
--- /dev/null
+++ b/qpid/cpp/src/qpid/sys/AsynchIO.h
@@ -0,0 +1,100 @@
+#ifndef _sys_AsynchIO
+#define _sys_AsynchIO
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "Dispatcher.h"
+
+#include <boost/function.hpp>
+#include <deque>
+
+namespace qpid {
+namespace sys {
+
+/*
+ * Asynchronous acceptor: accepts connections then does a callback with the
+ * accepted fd
+ */
+class AsynchAcceptor {
+public:
+ typedef boost::function1<void, int> Callback;
+
+private:
+ Callback acceptedCallback;
+ DispatchHandle handle;
+
+public:
+ AsynchAcceptor(int fd, Callback callback);
+ void start(Poller::shared_ptr poller);
+
+private:
+ void readable(DispatchHandle& handle);
+};
+
+/*
+ * Asycnchronous reader/writer:
+ * Reader accepts buffers to read into; reads into the provided buffers
+ * and then does a callback with the buffer and amount read. Optionally it can callback
+ * when there is something to read but no buffer to read it into.
+ *
+ * Writer accepts a buffer and queues it for writing; can also be given
+ * a callback for when writing is "idle" (ie fd is writable, but nothing to write)
+ */
+class AsynchIO {
+public:
+ struct Buffer {
+ char* const bytes;
+ const int32_t byteCount;
+
+ Buffer(char* const b, const int32_t s) :
+ bytes(b),
+ byteCount(s)
+ {}
+ };
+
+ typedef boost::function2<void, const Buffer&, int32_t> ReadCallback;
+ typedef boost::function0<void> EofCallback;
+ typedef boost::function0<void> BuffersEmptyCallback;
+ typedef boost::function1<void, int> IdleCallback;
+
+private:
+ ReadCallback readCallback;
+ EofCallback eofCallback;
+ BuffersEmptyCallback emptyCallback;
+ IdleCallback idleCallback;
+ DispatchHandle handle;
+ std::deque<Buffer> bufferQueue;
+ std::deque<Buffer> writeQueue;
+
+public:
+ AsynchIO(int fd, ReadCallback rCb, EofCallback eofCb, BuffersEmptyCallback eCb = 0, IdleCallback iCb = 0);
+ void start(Poller::shared_ptr poller);
+ void QueueReadBuffer(const Buffer& buff);
+ void QueueWrite(const Buffer& buff);
+
+private:
+ void readable(DispatchHandle& handle);
+ void writeable(DispatchHandle& handle);
+};
+
+}}
+
+#endif // _sys_AsynchIO
diff --git a/qpid/cpp/src/qpid/sys/Dispatcher.cpp b/qpid/cpp/src/qpid/sys/Dispatcher.cpp
index 4838e5e4cd..9a20e2c3bc 100644
--- a/qpid/cpp/src/qpid/sys/Dispatcher.cpp
+++ b/qpid/cpp/src/qpid/sys/Dispatcher.cpp
@@ -36,23 +36,20 @@ Dispatcher::~Dispatcher() {
void Dispatcher::run() {
do {
Poller::Event event = poller->wait();
- // Poller::wait guarantees to return an event
DispatchHandle* h = static_cast<DispatchHandle*>(event.handle);
- switch (event.dir) {
- case Poller::IN:
- h->readableCallback(*h);
- break;
- case Poller::OUT:
- h->writableCallback(*h);
- break;
- case Poller::INOUT:
- h->readableCallback(*h);
- h->writableCallback(*h);
- break;
- case Poller::SHUTDOWN:
- goto dispatcher_shutdown;
- default:
- ;
+
+ // If can read/write then dispatch appropriate callbacks
+ if (h) {
+ h->dispatchCallbacks(event.dir);
+ } else {
+ // Handle shutdown
+ switch (event.dir) {
+ case Poller::SHUTDOWN:
+ goto dispatcher_shutdown;
+ default:
+ // This should be impossible
+ assert(false);
+ }
}
} while (true);
@@ -63,11 +60,16 @@ dispatcher_shutdown:
void DispatchHandle::watch(Poller::shared_ptr poller0) {
bool r = readableCallback;
bool w = writableCallback;
-
+
+ ScopedLock<Mutex> lock(stateLock);
+ assert(state == IDLE);
+
// If no callbacks set then do nothing (that is what we were asked to do!)
// TODO: Maybe this should be an assert instead
- if (!r && !w)
+ if (!r && !w) {
+ state = INACTIVE;
return;
+ }
Poller::Direction d = r ?
(w ? Poller::INOUT : Poller::IN) :
@@ -75,16 +77,179 @@ void DispatchHandle::watch(Poller::shared_ptr poller0) {
poller = poller0;
poller->addFd(*this, d);
+
+ state = r ?
+ (w ? ACTIVE_RW : ACTIVE_R) :
+ ACTIVE_W;
}
void DispatchHandle::rewatch() {
assert(poller);
- poller->rearmFd(*this);
+ bool r = readableCallback;
+ bool w = writableCallback;
+
+ ScopedLock<Mutex> lock(stateLock);
+ switch(state) {
+ case DispatchHandle::IDLE:
+ assert(false);
+ break;
+ case DispatchHandle::DELAYED_R:
+ case DispatchHandle::DELAYED_W:
+ case DispatchHandle::CALLBACK:
+ state = r ?
+ (w ? DELAYED_RW : DELAYED_R) :
+ DELAYED_W;
+ break;
+ case DispatchHandle::INACTIVE:
+ case DispatchHandle::ACTIVE_R:
+ case DispatchHandle::ACTIVE_W: {
+ Poller::Direction d = r ?
+ (w ? Poller::INOUT : Poller::IN) :
+ Poller::OUT;
+ poller->modFd(*this, d);
+ state = r ?
+ (w ? ACTIVE_RW : ACTIVE_R) :
+ ACTIVE_W;
+ break;
+ }
+ case DispatchHandle::DELAYED_RW:
+ case DispatchHandle::ACTIVE_RW:
+ // Don't need to do anything already waiting for readable/writable
+ break;
+ }
+}
+
+void DispatchHandle::rewatchRead() {
+ assert(poller);
+ if (!readableCallback) {
+ return;
+ }
+
+ ScopedLock<Mutex> lock(stateLock);
+ switch(state) {
+ case DispatchHandle::IDLE:
+ assert(false);
+ break;
+ case DispatchHandle::DELAYED_R:
+ case DispatchHandle::DELAYED_RW:
+ break;
+ case DispatchHandle::DELAYED_W:
+ state = DELAYED_RW;
+ break;
+ case DispatchHandle::CALLBACK:
+ state = DELAYED_R;
+ break;
+ case DispatchHandle::ACTIVE_R:
+ case DispatchHandle::ACTIVE_RW:
+ // Nothing to do: already wating for readable
+ break;
+ case DispatchHandle::INACTIVE:
+ poller->modFd(*this, Poller::IN);
+ state = ACTIVE_R;
+ break;
+ case DispatchHandle::ACTIVE_W:
+ poller->modFd(*this, Poller::INOUT);
+ state = ACTIVE_RW;
+ break;
+ }
+}
+
+void DispatchHandle::rewatchWrite() {
+ assert(poller);
+ if (!writableCallback) {
+ return;
+ }
+
+ ScopedLock<Mutex> lock(stateLock);
+ switch(state) {
+ case DispatchHandle::IDLE:
+ assert(false);
+ break;
+ case DispatchHandle::DELAYED_W:
+ case DispatchHandle::DELAYED_RW:
+ break;
+ case DispatchHandle::DELAYED_R:
+ state = DELAYED_RW;
+ break;
+ case DispatchHandle::CALLBACK:
+ state = DELAYED_W;
+ break;
+ case DispatchHandle::INACTIVE:
+ poller->modFd(*this, Poller::OUT);
+ state = ACTIVE_W;
+ break;
+ case DispatchHandle::ACTIVE_R:
+ poller->modFd(*this, Poller::INOUT);
+ state = ACTIVE_RW;
+ break;
+ case DispatchHandle::ACTIVE_W:
+ case DispatchHandle::ACTIVE_RW:
+ // Nothing to do: already waiting for writable
+ break;
+ }
}
void DispatchHandle::unwatch() {
+ assert(poller);
+ ScopedLock<Mutex> lock(stateLock);
poller->delFd(*this);
poller.reset();
+ state = IDLE;
+}
+
+void DispatchHandle::dispatchCallbacks(Poller::Direction dir) {
+ // Note that we are now doing the callbacks
+ {
+ ScopedLock<Mutex> lock(stateLock);
+ assert(
+ state == ACTIVE_R ||
+ state == ACTIVE_W ||
+ state == ACTIVE_RW);
+
+ state = CALLBACK;
+ }
+
+ // Do callbacks - whilst we are doing the callbacks we are prevented from processing
+ // the same handle until we re-enable it. To avoid rentering the callbacks for a single
+ // handle re-enabling in the callbacks is actually deferred until they are complete.
+ switch (dir) {
+ case Poller::IN:
+ readableCallback(*this);
+ break;
+ case Poller::OUT:
+ writableCallback(*this);
+ break;
+ case Poller::INOUT:
+ readableCallback(*this);
+ writableCallback(*this);
+ break;
+ default:
+ assert(false);
+ }
+
+ // If any of the callbacks re-enabled reading/writing then actually
+ // do it now
+ ScopedLock<Mutex> lock(stateLock);
+ switch (state) {
+ case DELAYED_R:
+ poller->modFd(*this, Poller::IN);
+ state = ACTIVE_R;
+ break;
+ case DELAYED_W:
+ poller->modFd(*this, Poller::OUT);
+ state = ACTIVE_W;
+ break;
+ case DELAYED_RW:
+ poller->modFd(*this, Poller::INOUT);
+ state = ACTIVE_RW;
+ break;
+ case CALLBACK:
+ state = INACTIVE;
+ break;
+ default:
+ // This should be impossible
+ assert(false);
+ }
}
}}
diff --git a/qpid/cpp/src/qpid/sys/Dispatcher.h b/qpid/cpp/src/qpid/sys/Dispatcher.h
index c0b010eb39..3e43ca3bc1 100644
--- a/qpid/cpp/src/qpid/sys/Dispatcher.h
+++ b/qpid/cpp/src/qpid/sys/Dispatcher.h
@@ -22,9 +22,9 @@
*
*/
-
#include "Poller.h"
#include "Runnable.h"
+#include "Mutex.h"
#include <memory>
#include <boost/function.hpp>
@@ -45,18 +45,25 @@ private:
Callback readableCallback;
Callback writableCallback;
Poller::shared_ptr poller;
+ Mutex stateLock;
+ enum { IDLE, INACTIVE, ACTIVE_R, ACTIVE_W, ACTIVE_RW, CALLBACK, DELAYED_R, DELAYED_W, DELAYED_RW} state;
public:
-
DispatchHandle(int fd, Callback rCb, Callback wCb) :
PollerHandle(fd),
readableCallback(rCb),
- writableCallback(wCb)
+ writableCallback(wCb),
+ state(IDLE)
{}
void watch(Poller::shared_ptr poller);
void rewatch();
+ void rewatchRead();
+ void rewatchWrite();
void unwatch();
+
+private:
+ void dispatchCallbacks(Poller::Direction dir);
};
class Dispatcher : public Runnable {
diff --git a/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp b/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp
new file mode 100644
index 0000000000..400c2080b2
--- /dev/null
+++ b/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp
@@ -0,0 +1,195 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/sys/AsynchIO.h"
+
+#include "check.h"
+
+#include <unistd.h>
+#include <fcntl.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <errno.h>
+
+#include <boost/bind.hpp>
+
+using namespace qpid::sys;
+
+namespace {
+
+/*
+ * Make file descriptor non-blocking
+ */
+void nonblocking(int fd) {
+ QPID_POSIX_CHECK(::fcntl(fd, F_SETFL, O_NONBLOCK));
+}
+
+}
+
+/*
+ * Asynch Acceptor
+ */
+
+AsynchAcceptor::AsynchAcceptor(int fd, Callback callback) :
+ acceptedCallback(callback),
+ handle(fd, boost::bind(&AsynchAcceptor::readable, this, _1), 0) {
+
+ nonblocking(fd);
+}
+
+void AsynchAcceptor::start(Poller::shared_ptr poller) {
+ handle.watch(poller);
+}
+
+/*
+ * We keep on accepting as long as there is something to accept
+ */
+void AsynchAcceptor::readable(DispatchHandle& h) {
+ int afd;
+ do {
+ errno = 0;
+ // TODO: Currently we ignore the peers address, perhaps we should
+ // log it or use it for connection acceptance.
+ afd = ::accept(h.getFD(), 0, 0);
+ if (afd >= 0) {
+ acceptedCallback(afd);
+ } else if (errno == EAGAIN) {
+ break;
+ } else {
+ QPID_POSIX_CHECK(afd);
+ }
+ } while (true);
+
+ h.rewatch();
+}
+
+/*
+ * Asynch reader/writer
+ */
+AsynchIO::AsynchIO(int fd, ReadCallback rCb, EofCallback eofCb, BuffersEmptyCallback eCb, IdleCallback iCb) :
+ readCallback(rCb),
+ eofCallback(eofCb),
+ emptyCallback(eCb),
+ idleCallback(iCb),
+ handle(fd, boost::bind(&AsynchIO::readable, this, _1), boost::bind(&AsynchIO::writeable, this, _1)) {
+
+ nonblocking(fd);
+}
+
+void AsynchIO::start(Poller::shared_ptr poller) {
+ handle.watch(poller);
+}
+
+void AsynchIO::QueueReadBuffer(const Buffer& buff) {
+ bufferQueue.push_front(buff);
+ handle.rewatchRead();
+}
+
+void AsynchIO::QueueWrite(const Buffer& buff) {
+ writeQueue.push_front(buff);
+ handle.rewatchWrite();
+}
+
+/*
+ * We keep on reading as long as we have something to read and a buffer to put
+ * it in
+ */
+void AsynchIO::readable(DispatchHandle& h) {
+ do {
+ // (Try to) get a buffer
+ if (!bufferQueue.empty()) {
+ // Read into buffer
+ Buffer buff = bufferQueue.back();
+ bufferQueue.pop_back();
+ errno = 0;
+ int rc = ::read(h.getFD(), buff.bytes, buff.byteCount);
+ if (rc == 0) {
+ eofCallback();
+ } else if (rc > 0) {
+ readCallback(buff, rc);
+ } else {
+ // Put buffer back
+ bufferQueue.push_back(buff);
+
+ if (errno == EAGAIN) {
+ // We must have just put a buffer back so we know
+ // we can do this
+ h.rewatchRead();
+ return;
+ } else {
+ QPID_POSIX_CHECK(rc);
+ }
+ }
+ } else {
+ // Something to read but no buffer
+ if (emptyCallback) {
+ emptyCallback();
+ }
+ // If we still have no buffers we can't do anything more
+ if (bufferQueue.empty()) {
+ return;
+ }
+
+ }
+ } while (true);
+}
+
+/*
+ * We carry on writing whilst we have data to write and we can write
+ */
+void AsynchIO::writeable(DispatchHandle& h) {
+ do {
+ // See if we've got something to write
+ if (!writeQueue.empty()) {
+ // Write buffer
+ Buffer buff = writeQueue.back();
+ writeQueue.pop_back();
+ errno = 0;
+ int rc = ::write(h.getFD(), buff.bytes, buff.byteCount);
+ if (rc >= 0) {
+ // Recycle the buffer
+ QueueReadBuffer(buff);
+ } else {
+ // Put buffer back
+ writeQueue.push_back(buff);
+
+ if (errno == EAGAIN) {
+ // We have just put a buffer back so we know
+ // we can do this
+ h.rewatchWrite();
+ return;
+ } else {
+ QPID_POSIX_CHECK(rc);
+ }
+ }
+ } else {
+ // Something to read but no buffer
+ if (idleCallback) {
+ idleCallback(h.getFD());
+ }
+ // If we still have no buffers to write we can't do anything more
+ if (writeQueue.empty()) {
+ return;
+ }
+ }
+ } while (true);
+}
+
diff --git a/qpid/cpp/src/qpid/sys/posix/EventChannelConnection.cpp b/qpid/cpp/src/qpid/sys/posix/EventChannelConnection.cpp
index a36f096a4d..f4b6396dd1 100644
--- a/qpid/cpp/src/qpid/sys/posix/EventChannelConnection.cpp
+++ b/qpid/cpp/src/qpid/sys/posix/EventChannelConnection.cpp
@@ -62,11 +62,10 @@ EventChannelConnection::EventChannelConnection(
}
-void EventChannelConnection::send(std::auto_ptr<AMQFrame> frame) {
+void EventChannelConnection::send(AMQFrame& frame) {
{
Monitor::ScopedLock lock(monitor);
- assert(frame.get());
- writeFrames.push_back(frame.release());
+ writeFrames.push_back(frame);
}
closeOnException(&EventChannelConnection::startWrite);
}
@@ -119,7 +118,6 @@ void EventChannelConnection::closeOnException(MemberFnPtr f)
// Called by endWrite and send, but only one thread writes at a time.
//
void EventChannelConnection::startWrite() {
- FrameQueue::auto_type frame;
{
Monitor::ScopedLock lock(monitor);
// Stop if closed or a write event is already in progress.
@@ -130,14 +128,15 @@ void EventChannelConnection::startWrite() {
return;
}
isWriting = true;
- frame = writeFrames.pop_front();
+ AMQFrame& frame = writeFrames.front();
+ writeFrames.pop_front();
+ // No need to lock here - only one thread can be writing at a time.
+ out.clear();
+ if (isTrace)
+ cout << "Send on socket " << writeFd << ": " << frame << endl;
+ frame.encode(out);
+ out.flip();
}
- // No need to lock here - only one thread can be writing at a time.
- out.clear();
- if (isTrace)
- cout << "Send on socket " << writeFd << ": " << *frame << endl;
- frame->encode(out);
- out.flip();
// TODO: AMS 1/6/07 This only works because we already have the correct fd
// in the descriptor - change not to use assigment
writeEvent = WriteEvent(
@@ -225,11 +224,10 @@ void EventChannelConnection::endRead() {
in.flip();
AMQFrame frame;
while (frame.decode(in)) {
- // TODO aconway 2006-11-30: received should take Frame&
if (isTrace)
cout << "Received on socket " << readFd
<< ": " << frame << endl;
- handler->received(&frame);
+ handler->received(frame);
}
in.compact();
startRead();
diff --git a/qpid/cpp/src/qpid/sys/posix/EventChannelConnection.h b/qpid/cpp/src/qpid/sys/posix/EventChannelConnection.h
index 394df55fd9..a4ca5de517 100644
--- a/qpid/cpp/src/qpid/sys/posix/EventChannelConnection.h
+++ b/qpid/cpp/src/qpid/sys/posix/EventChannelConnection.h
@@ -50,17 +50,11 @@ class EventChannelConnection : public ConnectionOutputHandler {
bool isTrace = false
);
- // TODO aconway 2006-11-30: SessionContext::send should take auto_ptr
- virtual void send(qpid::framing::AMQFrame* frame) {
- send(std::auto_ptr<qpid::framing::AMQFrame>(frame));
- }
-
- virtual void send(std::auto_ptr<qpid::framing::AMQFrame> frame);
-
+ virtual void send(qpid::framing::AMQFrame& frame);
virtual void close();
private:
- typedef boost::ptr_deque<qpid::framing::AMQFrame> FrameQueue;
+ typedef std::deque<qpid::framing::AMQFrame> FrameQueue;
typedef void (EventChannelConnection::*MemberFnPtr)();
struct ScopedBusy;