diff options
author | Alan Conway <aconway@apache.org> | 2006-12-20 22:57:54 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2006-12-20 22:57:54 +0000 |
commit | 6e148b97b5c57655366e6a431fc3d52c1bc4a360 (patch) | |
tree | 0cef8b74342e43a67ed262741abe3fa42696de7a | |
parent | 3a6a3b7522ec1e6e51061458efed774150420ec4 (diff) | |
download | qpid-python-6e148b97b5c57655366e6a431fc3d52c1bc4a360.tar.gz |
EventChannel.h/cpp: Simpler event completion, events are always returned on
first wakeup even if they have not read/written all required data. Application
is responsible for re-posting the event or posting new events as required.
EventChannelConnection::endWrite(): re-posts writeEvent till entire
frame is written (required by simplificationa above.)
Added EventChannel::shutdown(): causes EventChannel::wait() to throw
an EventChannel::ShutdownException() to all waiting threads. Simplified
EventChannelThreads::shutdown() to take advantage of this.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/event-queue-2006-12-20@489218 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | cpp/lib/common/sys/posix/EventChannel.cpp | 176 | ||||
-rw-r--r-- | cpp/lib/common/sys/posix/EventChannel.h | 27 | ||||
-rw-r--r-- | cpp/lib/common/sys/posix/EventChannelConnection.cpp | 16 | ||||
-rw-r--r-- | cpp/lib/common/sys/posix/EventChannelThreads.cpp | 15 | ||||
-rw-r--r-- | cpp/tests/EventChannelTest.cpp | 2 |
5 files changed, 160 insertions, 76 deletions
diff --git a/cpp/lib/common/sys/posix/EventChannel.cpp b/cpp/lib/common/sys/posix/EventChannel.cpp index ee6b6bbe2b..37e3c2257a 100644 --- a/cpp/lib/common/sys/posix/EventChannel.cpp +++ b/cpp/lib/common/sys/posix/EventChannel.cpp @@ -43,6 +43,7 @@ #include "check.h" #include "EventChannel.h" +#include "sys/AtomicCount.h" using namespace std; @@ -57,7 +58,17 @@ namespace sys { namespace { typedef enum { IN, OUT } Direction; + typedef std::pair<Event*, Event*> EventPair; + +/** + * Template to zero out a C-struct on construction. Avoids uninitialized memory + * warnings from valgrind or other mem checking tool. + */ +template <class T> struct CleanStruct : public T { + CleanStruct() { memset(this, 0, sizeof(*this)); } +}; + } // namespace /** @@ -153,25 +164,29 @@ class EventChannel::Impl { Queue& getDispatchQueue() { return *dispatchQueue; } + void shutdown(); + private: typedef boost::ptr_map<int, Descriptor> DescriptorMap; - Mutex lock; + Monitor monitor; int epollFd; DescriptorMap descriptors; int pipe[2]; + AtomicCount nWaiters; Queue* dispatchQueue; + bool isShutdown; }; - // ================================================================ // EventChannel::Queue::implementation. static const char* shutdownMsg = "Event queue shut down."; -EventChannel::Queue::Queue(Descriptor& d, Direction dir) : lock(d.lock), descriptor(d), +EventChannel::Queue::Queue(Descriptor& d, Direction dir) : + lock(d.lock), descriptor(d), myEvent(dir==IN ? EPOLLIN : EPOLLOUT) {} @@ -193,11 +208,14 @@ void EventChannel::Queue::setBit(uint32_t &epollFlags) { Event* EventChannel::Queue::wake(uint32_t epollFlags) { // Called with lock held. if (!queue.empty() && (isMyEvent(epollFlags))) { - Event* e = queue.front()->complete(descriptor); - if (e) { - queue.pop_front(); - return e; - } + assert(!queue.empty()); + Event* e = queue.front(); + assert(e); + // TODO aconway 2006-12-20: Can/should we move event completion + // out into dispatch() so it doesn't happen in Descriptor locks? + e->complete(descriptor); + queue.pop_front(); + return e; } return 0; } @@ -250,9 +268,7 @@ void EventChannel::Descriptor::update() { void EventChannel::Descriptor::epollCtl(int op, uint32_t events) { // Caller holds lock - assert(!isShutdown()); - struct epoll_event ee; - memset(&ee, 0, sizeof(ee)); + CleanStruct<epoll_event> ee; ee.data.ptr = this; ee.events = events; int status = ::epoll_ctl(epollFd, op, myFd, &ee); @@ -263,7 +279,6 @@ void EventChannel::Descriptor::epollCtl(int op, uint32_t events) { EventPair EventChannel::Descriptor::wake(uint32_t epollEvents) { Mutex::ScopedLock l(lock); - cout << "DEBUG: " << std::hex << epollEvents << std::dec << endl; // If we have an error: if (epollEvents & (EPOLLERR | EPOLLHUP)) { shutdownUnsafe(); @@ -282,7 +297,7 @@ EventPair EventChannel::Descriptor::wake(uint32_t epollEvents) { EventChannel::Impl::Impl(int epollSize): - epollFd(-1), dispatchQueue(0) + epollFd(-1), dispatchQueue(0), isShutdown(false) { // Create the epoll file descriptor. epollFd = epoll_create(epollSize); @@ -298,22 +313,90 @@ EventChannel::Impl::Impl(int epollSize): } EventChannel::Impl::~Impl() { - close(epollFd); - close(pipe[0]); - close(pipe[1]); + shutdown(); + ::close(epollFd); + ::close(pipe[0]); + ::close(pipe[1]); } +void EventChannel::Impl::shutdown() { + Monitor::ScopedLock l(monitor); + if (!isShutdown) { // I'm starting shutdown. + isShutdown = true; + if (nWaiters == 0) + return; + + // TODO aconway 2006-12-20: If I just close the epollFd will + // that wake all threads? If so with what? Would be simpler than: + + // Create a pipe and write a single byte. The byte is never + // read so the pipes read fd is always ready for read. + // Since we use level-triggered epoll this will wake up all + // wait() threads. + // + QPID_POSIX_CHECK(::pipe(pipe)); + static char zero = '\0'; + QPID_POSIX_CHECK(::write(pipe[1], &zero, 1)); + CleanStruct<epoll_event> ee; + ee.data.ptr = 0; + ee.events = EPOLLIN; + QPID_POSIX_CHECK(epoll_ctl(epollFd, EPOLL_CTL_ADD, pipe[0], &ee)); + } + // Wait for nWaiters to get out. + while (nWaiters > 0) { + monitor.wait(); + } +} + +// TODO aconway 2006-12-20: DEBUG remove +struct epoll { + epoll(uint32_t e) : events(e) { } + uint32_t events; +}; + +#define BIT(X) out << ((e.events & X) ? __STRING(X) "." : "") +ostream& operator << (ostream& out, epoll e) { + out << "epoll_event.events: "; + BIT(EPOLLIN); + BIT(EPOLLPRI); + BIT(EPOLLOUT); + BIT(EPOLLRDNORM); + BIT(EPOLLRDBAND); + BIT(EPOLLWRNORM); + BIT(EPOLLWRBAND); + BIT(EPOLLMSG); + BIT(EPOLLERR); + BIT(EPOLLHUP); + BIT(EPOLLONESHOT); + BIT(EPOLLET); + return out; +} + + + /** * Wait for epoll to wake up, return the descriptor or 0 on timeout. */ Event* EventChannel::Impl::wait(Time timeoutNs) { + { + Monitor::ScopedLock l(monitor); + if (isShutdown) + throw ShutdownException(); + } + + // Increase nWaiters for the duration, notify the monitor if I'm + // the last one out. + // + AtomicCount::ScopedIncrement si( + nWaiters, boost::bind(&Monitor::notifyAll, &monitor)); + // No lock, all thread safe calls or local variables: // const long timeoutMs = (timeoutNs == TIME_INFINITE) ? -1 : timeoutNs/TIME_MSEC; - struct epoll_event ee; + CleanStruct<epoll_event> ee; Event* event = 0; bool doSwap = true; @@ -324,14 +407,18 @@ Event* EventChannel::Impl::wait(Time timeoutNs) int n = epoll_wait(epollFd, &ee, 1, timeoutMs); // Thread safe. if (n == 0) // Timeout return 0; - if (n < 0 && errno != EINTR) // Interrupt, ignore it. + if (n < 0 && errno == EINTR) // Interrupt, ignore it. continue; if (n < 0) throw QPID_POSIX_ERROR(errno); assert(n == 1); Descriptor* ed = reinterpret_cast<Descriptor*>(ee.data.ptr); - assert(ed); + if (ed == 0) // We're being shut-down. + throw ShutdownException(); + assert(ed != 0); + // TODO aconway 2006-12-20: DEBUG + cout << endl << epoll(ee.events) << endl; EventPair ready = ed->wake(ee.events); // We can only return one event so if both completed push one @@ -352,7 +439,7 @@ Event* EventChannel::Impl::wait(Time timeoutNs) } EventChannel::Descriptor& EventChannel::Impl::getDescriptor(int fd) { - Mutex::ScopedLock l(lock); + Mutex::ScopedLock l(monitor); Descriptor& ed = descriptors[fd]; ed.activate(epollFd, fd); return ed; @@ -385,6 +472,10 @@ Event* EventChannel::wait(Time timeoutNs) return impl->wait(timeoutNs); } +void EventChannel::shutdown() { + impl->shutdown(); +} + // ================================================================ // Event and subclasses. @@ -430,31 +521,18 @@ void ReadEvent::prepare(EventChannel::Impl& impl) { impl.getDescriptor(descriptor).getQueue(IN).push(this); } -Event* ReadEvent::complete(EventChannel::Descriptor& ed) +void ReadEvent::complete(EventChannel::Descriptor& ed) { ssize_t n = ::read(descriptor, static_cast<char*>(buffer) + bytesRead, size - bytesRead); - - if (n < 0 && errno != EAGAIN) { // Error - setException(QPID_POSIX_ERROR(errno)); - ed.shutdownUnsafe(); // Called with lock held. - } - else if (n == 0) { // End of file - // TODO aconway 2006-12-13: Don't treat EOF as exception - // unless we're partway thru a !noWait read. - setException(QPID_POSIX_ERROR(ENODATA)); - ed.shutdownUnsafe(); // Called with lock held. + if (n > 0) + bytesRead += n; + if (n == 0 || (n < 0 && errno != EAGAIN)) { + // Use ENODATA for file closed. + setException(QPID_POSIX_ERROR(n == 0 ? ENODATA : errno)); + ed.shutdownUnsafe(); } - else { - if (n > 0) // possible that n < 0 && errno == EAGAIN - bytesRead += n; - if (bytesRead < size && !noWait) { - // Continue reading, not enough data. - return 0; - } - } - return this; } @@ -463,42 +541,36 @@ void WriteEvent::prepare(EventChannel::Impl& impl) { } -Event* WriteEvent::complete(EventChannel::Descriptor& ed) +void WriteEvent::complete(EventChannel::Descriptor& ed) { ssize_t n = ::write(descriptor, static_cast<const char*>(buffer) + bytesWritten, size - bytesWritten); - if(n < 0 && errno == EAGAIN && noWait) { - return 0; - } - if (n < 0 || (bytesWritten += n) < size) { + if (n > 0) + bytesWritten += n; + if(n < 0 && errno != EAGAIN) { setException(QPID_POSIX_ERROR(errno)); ed.shutdownUnsafe(); // Called with lock held. } - return this; } void AcceptEvent::prepare(EventChannel::Impl& impl) { impl.getDescriptor(descriptor).getQueue(IN).push(this); } -Event* AcceptEvent::complete(EventChannel::Descriptor& ed) +void AcceptEvent::complete(EventChannel::Descriptor& ed) { accepted = ::accept(descriptor, 0, 0); if (accepted < 0) { setException(QPID_POSIX_ERROR(errno)); ed.shutdownUnsafe(); // Called with lock held. } - return this; } void DispatchEvent::prepare(EventChannel::Impl& impl) { impl.getDispatchQueue().push(this); } -Event* DispatchEvent::complete(EventChannel::Descriptor&) -{ - return this; -} +void DispatchEvent::complete(EventChannel::Descriptor&) {} }} diff --git a/cpp/lib/common/sys/posix/EventChannel.h b/cpp/lib/common/sys/posix/EventChannel.h index 60c4026fbc..0181a9c780 100644 --- a/cpp/lib/common/sys/posix/EventChannel.h +++ b/cpp/lib/common/sys/posix/EventChannel.h @@ -40,6 +40,9 @@ class EventChannel : public qpid::SharedObject<EventChannel> public: static shared_ptr create(); + /** Exception throw from wait() if channel is shut down. */ + class ShutdownException : public qpid::Exception {}; + ~EventChannel(); /** Post an event to the channel. */ @@ -51,9 +54,18 @@ class EventChannel : public qpid::SharedObject<EventChannel> /** * Wait for the next complete event, up to timeout. *@return Pointer to event or 0 if timeout elapses. + *@exception ShutdownException if the channel is shut down. */ Event* wait(Time timeout = TIME_INFINITE); + /** + * Shut down the event channel. + * Blocks till all threads have exited wait() + */ + void shutdown(); + + + // Internal classes. class Impl; class Queue; class Descriptor; @@ -68,6 +80,7 @@ class EventChannel : public qpid::SharedObject<EventChannel> /** * Base class for all Events. + * * Derived classes define events representing various async IO operations. * When an event is complete, it is returned by the EventChannel to * a thread calling wait. The thread will call Event::dispatch() to @@ -86,7 +99,7 @@ class Event /** *If there was an exception processing this Event, return it. - *@return 0 if there was no exception. Caller must not delete. + *@return 0 if there was no exception. */ qpid::Exception::shared_ptr_const getException() const; @@ -103,7 +116,7 @@ class Event Event(Callback cb=0) : callback(cb) {} virtual void prepare(EventChannel::Impl&) = 0; - virtual Event* complete(EventChannel::Descriptor&) = 0; + virtual void complete(EventChannel::Descriptor&) = 0; Callback callback; Exception::shared_ptr_const exception; @@ -122,7 +135,7 @@ class DispatchEvent : public Event { protected: void prepare(EventChannel::Impl&); - Event* complete(EventChannel::Descriptor&); + void complete(EventChannel::Descriptor&); }; // Utility base class. @@ -148,7 +161,7 @@ class IOEvent : public FDEvent { size_t size; bool noWait; }; - + /** Asynchronous read event */ class ReadEvent : public IOEvent { @@ -163,7 +176,7 @@ class ReadEvent : public IOEvent private: void prepare(EventChannel::Impl&); - Event* complete(EventChannel::Descriptor&); + void complete(EventChannel::Descriptor&); ssize_t doRead(); void* buffer; @@ -183,7 +196,7 @@ class WriteEvent : public IOEvent private: void prepare(EventChannel::Impl&); - Event* complete(EventChannel::Descriptor&); + void complete(EventChannel::Descriptor&); ssize_t doWrite(); const void* buffer; @@ -204,7 +217,7 @@ class AcceptEvent : public FDEvent private: void prepare(EventChannel::Impl&); - Event* complete(EventChannel::Descriptor&); + void complete(EventChannel::Descriptor&); int accepted; }; diff --git a/cpp/lib/common/sys/posix/EventChannelConnection.cpp b/cpp/lib/common/sys/posix/EventChannelConnection.cpp index 196dde5af8..5227ef8190 100644 --- a/cpp/lib/common/sys/posix/EventChannelConnection.cpp +++ b/cpp/lib/common/sys/posix/EventChannelConnection.cpp @@ -54,8 +54,8 @@ EventChannelConnection::EventChannelConnection( out(bufferSize), isTrace(isTrace_) { - BOOST_ASSERT(readFd > 0); - BOOST_ASSERT(writeFd > 0); + assert(readFd > 0); + assert(writeFd > 0); closeOnException(&EventChannelConnection::startRead); } @@ -161,12 +161,18 @@ void EventChannelConnection::endWrite() { ScopedBusy(*this); { Monitor::ScopedLock lock(monitor); + assert(isWriting); isWriting = false; - if (isClosed) + if (isClosed) return; writeEvent.throwIfException(); + if (writeEvent.getBytesWritten() < writeEvent.getSize()) { + // Keep writing the current event till done. + isWriting = true; + threads->post(writeEvent); + } } - // Check if there's more in to write in the write queue. + // Continue writing from writeFrames queue. startWrite(); } @@ -179,7 +185,7 @@ void EventChannelConnection::endWrite() { void EventChannelConnection::startRead() { // Non blocking read, as much as we can swallow. readEvent = ReadEvent( - readFd, in.start(), in.available(), readCallback,true); + readFd, in.start(), in.available(), readCallback); threads->post(readEvent); } diff --git a/cpp/lib/common/sys/posix/EventChannelThreads.cpp b/cpp/lib/common/sys/posix/EventChannelThreads.cpp index 787da72ffa..d762a600c5 100644 --- a/cpp/lib/common/sys/posix/EventChannelThreads.cpp +++ b/cpp/lib/common/sys/posix/EventChannelThreads.cpp @@ -57,18 +57,13 @@ EventChannelThreads::~EventChannelThreads() { join(); } -// Termination marker event. -static DispatchEvent terminate; - void EventChannelThreads::shutdown() { Monitor::ScopedLock lock(monitor); if (state != RUNNING) // Already shutting down. return; state = TERMINATING; - for (size_t i = 0; i < workers.size(); ++i) { - channel->post(terminate); - } + channel->shutdown(); monitor.notify(); // Wake up one join() thread. } @@ -113,8 +108,6 @@ void EventChannelThreads::run() while (true) { Event* e = channel->wait(); assert(e != 0); - if (e == &terminate) - return; AtomicCount::ScopedDecrement dec(nWaiting); // Make sure there's at least one waiting thread. if (dec == 0 && state == RUNNING) @@ -122,12 +115,12 @@ void EventChannelThreads::run() e->dispatch(); } } + catch (const EventChannel::ShutdownException& e) { + return; + } catch (const std::exception& e) { Exception::log(e, "Exception in EventChannelThreads::run()"); } - catch (...) { - Exception::logUnknown("Exception in EventChannelThreads::run()"); - } } }} diff --git a/cpp/tests/EventChannelTest.cpp b/cpp/tests/EventChannelTest.cpp index 67b8b03ce2..bba0a9cd9b 100644 --- a/cpp/tests/EventChannelTest.cpp +++ b/cpp/tests/EventChannelTest.cpp @@ -108,7 +108,7 @@ class EventChannelTest : public CppUnit::TestCase } void testPartialRead() { - ReadEvent re(pipe[0], readBuf, size, 0, true); + ReadEvent re(pipe[0], readBuf, size, 0); ec->post(re); CPPUNIT_ASSERT_EQUAL(ssize_t(size/2), ::write(pipe[1], hello, size/2)); CPPUNIT_ASSERT(isNextEventOk(re)); |