summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2006-12-20 22:57:54 +0000
committerAlan Conway <aconway@apache.org>2006-12-20 22:57:54 +0000
commit6e148b97b5c57655366e6a431fc3d52c1bc4a360 (patch)
tree0cef8b74342e43a67ed262741abe3fa42696de7a
parent3a6a3b7522ec1e6e51061458efed774150420ec4 (diff)
downloadqpid-python-6e148b97b5c57655366e6a431fc3d52c1bc4a360.tar.gz
EventChannel.h/cpp: Simpler event completion, events are always returned on
first wakeup even if they have not read/written all required data. Application is responsible for re-posting the event or posting new events as required. EventChannelConnection::endWrite(): re-posts writeEvent till entire frame is written (required by simplificationa above.) Added EventChannel::shutdown(): causes EventChannel::wait() to throw an EventChannel::ShutdownException() to all waiting threads. Simplified EventChannelThreads::shutdown() to take advantage of this. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/event-queue-2006-12-20@489218 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--cpp/lib/common/sys/posix/EventChannel.cpp176
-rw-r--r--cpp/lib/common/sys/posix/EventChannel.h27
-rw-r--r--cpp/lib/common/sys/posix/EventChannelConnection.cpp16
-rw-r--r--cpp/lib/common/sys/posix/EventChannelThreads.cpp15
-rw-r--r--cpp/tests/EventChannelTest.cpp2
5 files changed, 160 insertions, 76 deletions
diff --git a/cpp/lib/common/sys/posix/EventChannel.cpp b/cpp/lib/common/sys/posix/EventChannel.cpp
index ee6b6bbe2b..37e3c2257a 100644
--- a/cpp/lib/common/sys/posix/EventChannel.cpp
+++ b/cpp/lib/common/sys/posix/EventChannel.cpp
@@ -43,6 +43,7 @@
#include "check.h"
#include "EventChannel.h"
+#include "sys/AtomicCount.h"
using namespace std;
@@ -57,7 +58,17 @@ namespace sys {
namespace {
typedef enum { IN, OUT } Direction;
+
typedef std::pair<Event*, Event*> EventPair;
+
+/**
+ * Template to zero out a C-struct on construction. Avoids uninitialized memory
+ * warnings from valgrind or other mem checking tool.
+ */
+template <class T> struct CleanStruct : public T {
+ CleanStruct() { memset(this, 0, sizeof(*this)); }
+};
+
} // namespace
/**
@@ -153,25 +164,29 @@ class EventChannel::Impl {
Queue& getDispatchQueue() { return *dispatchQueue; }
+ void shutdown();
+
private:
typedef boost::ptr_map<int, Descriptor> DescriptorMap;
- Mutex lock;
+ Monitor monitor;
int epollFd;
DescriptorMap descriptors;
int pipe[2];
+ AtomicCount nWaiters;
Queue* dispatchQueue;
+ bool isShutdown;
};
-
// ================================================================
// EventChannel::Queue::implementation.
static const char* shutdownMsg = "Event queue shut down.";
-EventChannel::Queue::Queue(Descriptor& d, Direction dir) : lock(d.lock), descriptor(d),
+EventChannel::Queue::Queue(Descriptor& d, Direction dir) :
+ lock(d.lock), descriptor(d),
myEvent(dir==IN ? EPOLLIN : EPOLLOUT)
{}
@@ -193,11 +208,14 @@ void EventChannel::Queue::setBit(uint32_t &epollFlags) {
Event* EventChannel::Queue::wake(uint32_t epollFlags) {
// Called with lock held.
if (!queue.empty() && (isMyEvent(epollFlags))) {
- Event* e = queue.front()->complete(descriptor);
- if (e) {
- queue.pop_front();
- return e;
- }
+ assert(!queue.empty());
+ Event* e = queue.front();
+ assert(e);
+ // TODO aconway 2006-12-20: Can/should we move event completion
+ // out into dispatch() so it doesn't happen in Descriptor locks?
+ e->complete(descriptor);
+ queue.pop_front();
+ return e;
}
return 0;
}
@@ -250,9 +268,7 @@ void EventChannel::Descriptor::update() {
void EventChannel::Descriptor::epollCtl(int op, uint32_t events) {
// Caller holds lock
- assert(!isShutdown());
- struct epoll_event ee;
- memset(&ee, 0, sizeof(ee));
+ CleanStruct<epoll_event> ee;
ee.data.ptr = this;
ee.events = events;
int status = ::epoll_ctl(epollFd, op, myFd, &ee);
@@ -263,7 +279,6 @@ void EventChannel::Descriptor::epollCtl(int op, uint32_t events) {
EventPair EventChannel::Descriptor::wake(uint32_t epollEvents) {
Mutex::ScopedLock l(lock);
- cout << "DEBUG: " << std::hex << epollEvents << std::dec << endl;
// If we have an error:
if (epollEvents & (EPOLLERR | EPOLLHUP)) {
shutdownUnsafe();
@@ -282,7 +297,7 @@ EventPair EventChannel::Descriptor::wake(uint32_t epollEvents) {
EventChannel::Impl::Impl(int epollSize):
- epollFd(-1), dispatchQueue(0)
+ epollFd(-1), dispatchQueue(0), isShutdown(false)
{
// Create the epoll file descriptor.
epollFd = epoll_create(epollSize);
@@ -298,22 +313,90 @@ EventChannel::Impl::Impl(int epollSize):
}
EventChannel::Impl::~Impl() {
- close(epollFd);
- close(pipe[0]);
- close(pipe[1]);
+ shutdown();
+ ::close(epollFd);
+ ::close(pipe[0]);
+ ::close(pipe[1]);
}
+void EventChannel::Impl::shutdown() {
+ Monitor::ScopedLock l(monitor);
+ if (!isShutdown) { // I'm starting shutdown.
+ isShutdown = true;
+ if (nWaiters == 0)
+ return;
+
+ // TODO aconway 2006-12-20: If I just close the epollFd will
+ // that wake all threads? If so with what? Would be simpler than:
+
+ // Create a pipe and write a single byte. The byte is never
+ // read so the pipes read fd is always ready for read.
+ // Since we use level-triggered epoll this will wake up all
+ // wait() threads.
+ //
+ QPID_POSIX_CHECK(::pipe(pipe));
+ static char zero = '\0';
+ QPID_POSIX_CHECK(::write(pipe[1], &zero, 1));
+ CleanStruct<epoll_event> ee;
+ ee.data.ptr = 0;
+ ee.events = EPOLLIN;
+ QPID_POSIX_CHECK(epoll_ctl(epollFd, EPOLL_CTL_ADD, pipe[0], &ee));
+ }
+ // Wait for nWaiters to get out.
+ while (nWaiters > 0) {
+ monitor.wait();
+ }
+}
+
+// TODO aconway 2006-12-20: DEBUG remove
+struct epoll {
+ epoll(uint32_t e) : events(e) { }
+ uint32_t events;
+};
+
+#define BIT(X) out << ((e.events & X) ? __STRING(X) "." : "")
+ostream& operator << (ostream& out, epoll e) {
+ out << "epoll_event.events: ";
+ BIT(EPOLLIN);
+ BIT(EPOLLPRI);
+ BIT(EPOLLOUT);
+ BIT(EPOLLRDNORM);
+ BIT(EPOLLRDBAND);
+ BIT(EPOLLWRNORM);
+ BIT(EPOLLWRBAND);
+ BIT(EPOLLMSG);
+ BIT(EPOLLERR);
+ BIT(EPOLLHUP);
+ BIT(EPOLLONESHOT);
+ BIT(EPOLLET);
+ return out;
+}
+
+
+
/**
* Wait for epoll to wake up, return the descriptor or 0 on timeout.
*/
Event* EventChannel::Impl::wait(Time timeoutNs)
{
+ {
+ Monitor::ScopedLock l(monitor);
+ if (isShutdown)
+ throw ShutdownException();
+ }
+
+ // Increase nWaiters for the duration, notify the monitor if I'm
+ // the last one out.
+ //
+ AtomicCount::ScopedIncrement si(
+ nWaiters, boost::bind(&Monitor::notifyAll, &monitor));
+
// No lock, all thread safe calls or local variables:
//
const long timeoutMs =
(timeoutNs == TIME_INFINITE) ? -1 : timeoutNs/TIME_MSEC;
- struct epoll_event ee;
+ CleanStruct<epoll_event> ee;
Event* event = 0;
bool doSwap = true;
@@ -324,14 +407,18 @@ Event* EventChannel::Impl::wait(Time timeoutNs)
int n = epoll_wait(epollFd, &ee, 1, timeoutMs); // Thread safe.
if (n == 0) // Timeout
return 0;
- if (n < 0 && errno != EINTR) // Interrupt, ignore it.
+ if (n < 0 && errno == EINTR) // Interrupt, ignore it.
continue;
if (n < 0)
throw QPID_POSIX_ERROR(errno);
assert(n == 1);
Descriptor* ed =
reinterpret_cast<Descriptor*>(ee.data.ptr);
- assert(ed);
+ if (ed == 0) // We're being shut-down.
+ throw ShutdownException();
+ assert(ed != 0);
+ // TODO aconway 2006-12-20: DEBUG
+ cout << endl << epoll(ee.events) << endl;
EventPair ready = ed->wake(ee.events);
// We can only return one event so if both completed push one
@@ -352,7 +439,7 @@ Event* EventChannel::Impl::wait(Time timeoutNs)
}
EventChannel::Descriptor& EventChannel::Impl::getDescriptor(int fd) {
- Mutex::ScopedLock l(lock);
+ Mutex::ScopedLock l(monitor);
Descriptor& ed = descriptors[fd];
ed.activate(epollFd, fd);
return ed;
@@ -385,6 +472,10 @@ Event* EventChannel::wait(Time timeoutNs)
return impl->wait(timeoutNs);
}
+void EventChannel::shutdown() {
+ impl->shutdown();
+}
+
// ================================================================
// Event and subclasses.
@@ -430,31 +521,18 @@ void ReadEvent::prepare(EventChannel::Impl& impl) {
impl.getDescriptor(descriptor).getQueue(IN).push(this);
}
-Event* ReadEvent::complete(EventChannel::Descriptor& ed)
+void ReadEvent::complete(EventChannel::Descriptor& ed)
{
ssize_t n = ::read(descriptor,
static_cast<char*>(buffer) + bytesRead,
size - bytesRead);
-
- if (n < 0 && errno != EAGAIN) { // Error
- setException(QPID_POSIX_ERROR(errno));
- ed.shutdownUnsafe(); // Called with lock held.
- }
- else if (n == 0) { // End of file
- // TODO aconway 2006-12-13: Don't treat EOF as exception
- // unless we're partway thru a !noWait read.
- setException(QPID_POSIX_ERROR(ENODATA));
- ed.shutdownUnsafe(); // Called with lock held.
+ if (n > 0)
+ bytesRead += n;
+ if (n == 0 || (n < 0 && errno != EAGAIN)) {
+ // Use ENODATA for file closed.
+ setException(QPID_POSIX_ERROR(n == 0 ? ENODATA : errno));
+ ed.shutdownUnsafe();
}
- else {
- if (n > 0) // possible that n < 0 && errno == EAGAIN
- bytesRead += n;
- if (bytesRead < size && !noWait) {
- // Continue reading, not enough data.
- return 0;
- }
- }
- return this;
}
@@ -463,42 +541,36 @@ void WriteEvent::prepare(EventChannel::Impl& impl) {
}
-Event* WriteEvent::complete(EventChannel::Descriptor& ed)
+void WriteEvent::complete(EventChannel::Descriptor& ed)
{
ssize_t n = ::write(descriptor,
static_cast<const char*>(buffer) + bytesWritten,
size - bytesWritten);
- if(n < 0 && errno == EAGAIN && noWait) {
- return 0;
- }
- if (n < 0 || (bytesWritten += n) < size) {
+ if (n > 0)
+ bytesWritten += n;
+ if(n < 0 && errno != EAGAIN) {
setException(QPID_POSIX_ERROR(errno));
ed.shutdownUnsafe(); // Called with lock held.
}
- return this;
}
void AcceptEvent::prepare(EventChannel::Impl& impl) {
impl.getDescriptor(descriptor).getQueue(IN).push(this);
}
-Event* AcceptEvent::complete(EventChannel::Descriptor& ed)
+void AcceptEvent::complete(EventChannel::Descriptor& ed)
{
accepted = ::accept(descriptor, 0, 0);
if (accepted < 0) {
setException(QPID_POSIX_ERROR(errno));
ed.shutdownUnsafe(); // Called with lock held.
}
- return this;
}
void DispatchEvent::prepare(EventChannel::Impl& impl) {
impl.getDispatchQueue().push(this);
}
-Event* DispatchEvent::complete(EventChannel::Descriptor&)
-{
- return this;
-}
+void DispatchEvent::complete(EventChannel::Descriptor&) {}
}}
diff --git a/cpp/lib/common/sys/posix/EventChannel.h b/cpp/lib/common/sys/posix/EventChannel.h
index 60c4026fbc..0181a9c780 100644
--- a/cpp/lib/common/sys/posix/EventChannel.h
+++ b/cpp/lib/common/sys/posix/EventChannel.h
@@ -40,6 +40,9 @@ class EventChannel : public qpid::SharedObject<EventChannel>
public:
static shared_ptr create();
+ /** Exception throw from wait() if channel is shut down. */
+ class ShutdownException : public qpid::Exception {};
+
~EventChannel();
/** Post an event to the channel. */
@@ -51,9 +54,18 @@ class EventChannel : public qpid::SharedObject<EventChannel>
/**
* Wait for the next complete event, up to timeout.
*@return Pointer to event or 0 if timeout elapses.
+ *@exception ShutdownException if the channel is shut down.
*/
Event* wait(Time timeout = TIME_INFINITE);
+ /**
+ * Shut down the event channel.
+ * Blocks till all threads have exited wait()
+ */
+ void shutdown();
+
+
+ // Internal classes.
class Impl;
class Queue;
class Descriptor;
@@ -68,6 +80,7 @@ class EventChannel : public qpid::SharedObject<EventChannel>
/**
* Base class for all Events.
+ *
* Derived classes define events representing various async IO operations.
* When an event is complete, it is returned by the EventChannel to
* a thread calling wait. The thread will call Event::dispatch() to
@@ -86,7 +99,7 @@ class Event
/**
*If there was an exception processing this Event, return it.
- *@return 0 if there was no exception. Caller must not delete.
+ *@return 0 if there was no exception.
*/
qpid::Exception::shared_ptr_const getException() const;
@@ -103,7 +116,7 @@ class Event
Event(Callback cb=0) : callback(cb) {}
virtual void prepare(EventChannel::Impl&) = 0;
- virtual Event* complete(EventChannel::Descriptor&) = 0;
+ virtual void complete(EventChannel::Descriptor&) = 0;
Callback callback;
Exception::shared_ptr_const exception;
@@ -122,7 +135,7 @@ class DispatchEvent : public Event {
protected:
void prepare(EventChannel::Impl&);
- Event* complete(EventChannel::Descriptor&);
+ void complete(EventChannel::Descriptor&);
};
// Utility base class.
@@ -148,7 +161,7 @@ class IOEvent : public FDEvent {
size_t size;
bool noWait;
};
-
+
/** Asynchronous read event */
class ReadEvent : public IOEvent
{
@@ -163,7 +176,7 @@ class ReadEvent : public IOEvent
private:
void prepare(EventChannel::Impl&);
- Event* complete(EventChannel::Descriptor&);
+ void complete(EventChannel::Descriptor&);
ssize_t doRead();
void* buffer;
@@ -183,7 +196,7 @@ class WriteEvent : public IOEvent
private:
void prepare(EventChannel::Impl&);
- Event* complete(EventChannel::Descriptor&);
+ void complete(EventChannel::Descriptor&);
ssize_t doWrite();
const void* buffer;
@@ -204,7 +217,7 @@ class AcceptEvent : public FDEvent
private:
void prepare(EventChannel::Impl&);
- Event* complete(EventChannel::Descriptor&);
+ void complete(EventChannel::Descriptor&);
int accepted;
};
diff --git a/cpp/lib/common/sys/posix/EventChannelConnection.cpp b/cpp/lib/common/sys/posix/EventChannelConnection.cpp
index 196dde5af8..5227ef8190 100644
--- a/cpp/lib/common/sys/posix/EventChannelConnection.cpp
+++ b/cpp/lib/common/sys/posix/EventChannelConnection.cpp
@@ -54,8 +54,8 @@ EventChannelConnection::EventChannelConnection(
out(bufferSize),
isTrace(isTrace_)
{
- BOOST_ASSERT(readFd > 0);
- BOOST_ASSERT(writeFd > 0);
+ assert(readFd > 0);
+ assert(writeFd > 0);
closeOnException(&EventChannelConnection::startRead);
}
@@ -161,12 +161,18 @@ void EventChannelConnection::endWrite() {
ScopedBusy(*this);
{
Monitor::ScopedLock lock(monitor);
+ assert(isWriting);
isWriting = false;
- if (isClosed)
+ if (isClosed)
return;
writeEvent.throwIfException();
+ if (writeEvent.getBytesWritten() < writeEvent.getSize()) {
+ // Keep writing the current event till done.
+ isWriting = true;
+ threads->post(writeEvent);
+ }
}
- // Check if there's more in to write in the write queue.
+ // Continue writing from writeFrames queue.
startWrite();
}
@@ -179,7 +185,7 @@ void EventChannelConnection::endWrite() {
void EventChannelConnection::startRead() {
// Non blocking read, as much as we can swallow.
readEvent = ReadEvent(
- readFd, in.start(), in.available(), readCallback,true);
+ readFd, in.start(), in.available(), readCallback);
threads->post(readEvent);
}
diff --git a/cpp/lib/common/sys/posix/EventChannelThreads.cpp b/cpp/lib/common/sys/posix/EventChannelThreads.cpp
index 787da72ffa..d762a600c5 100644
--- a/cpp/lib/common/sys/posix/EventChannelThreads.cpp
+++ b/cpp/lib/common/sys/posix/EventChannelThreads.cpp
@@ -57,18 +57,13 @@ EventChannelThreads::~EventChannelThreads() {
join();
}
-// Termination marker event.
-static DispatchEvent terminate;
-
void EventChannelThreads::shutdown()
{
Monitor::ScopedLock lock(monitor);
if (state != RUNNING) // Already shutting down.
return;
state = TERMINATING;
- for (size_t i = 0; i < workers.size(); ++i) {
- channel->post(terminate);
- }
+ channel->shutdown();
monitor.notify(); // Wake up one join() thread.
}
@@ -113,8 +108,6 @@ void EventChannelThreads::run()
while (true) {
Event* e = channel->wait();
assert(e != 0);
- if (e == &terminate)
- return;
AtomicCount::ScopedDecrement dec(nWaiting);
// Make sure there's at least one waiting thread.
if (dec == 0 && state == RUNNING)
@@ -122,12 +115,12 @@ void EventChannelThreads::run()
e->dispatch();
}
}
+ catch (const EventChannel::ShutdownException& e) {
+ return;
+ }
catch (const std::exception& e) {
Exception::log(e, "Exception in EventChannelThreads::run()");
}
- catch (...) {
- Exception::logUnknown("Exception in EventChannelThreads::run()");
- }
}
}}
diff --git a/cpp/tests/EventChannelTest.cpp b/cpp/tests/EventChannelTest.cpp
index 67b8b03ce2..bba0a9cd9b 100644
--- a/cpp/tests/EventChannelTest.cpp
+++ b/cpp/tests/EventChannelTest.cpp
@@ -108,7 +108,7 @@ class EventChannelTest : public CppUnit::TestCase
}
void testPartialRead() {
- ReadEvent re(pipe[0], readBuf, size, 0, true);
+ ReadEvent re(pipe[0], readBuf, size, 0);
ec->post(re);
CPPUNIT_ASSERT_EQUAL(ssize_t(size/2), ::write(pipe[1], hello, size/2));
CPPUNIT_ASSERT(isNextEventOk(re));