diff options
author | Alan Conway <aconway@apache.org> | 2006-11-29 14:36:08 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2006-11-29 14:36:08 +0000 |
commit | b13e1a24fcca8797b7be5a242f164afbe17ec4f6 (patch) | |
tree | ef0362e52c125bc75b07ef3e374dabfa52254e98 /cpp/src/qpid/posix/EventChannel.cpp | |
parent | 16d818e749462daf5e0e43079b2e48991646c619 (diff) | |
download | qpid-python-b13e1a24fcca8797b7be5a242f164afbe17ec4f6.tar.gz |
Posix EventChannel implementation using epoll. Placeholder for kevents.
Dynamic thread pool EventChannelThreads to serve EventChannel.
Misc cleanup/enhancements.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@480582 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/posix/EventChannel.cpp')
-rw-r--r-- | cpp/src/qpid/posix/EventChannel.cpp | 325 |
1 files changed, 325 insertions, 0 deletions
diff --git a/cpp/src/qpid/posix/EventChannel.cpp b/cpp/src/qpid/posix/EventChannel.cpp new file mode 100644 index 0000000000..9d0819f206 --- /dev/null +++ b/cpp/src/qpid/posix/EventChannel.cpp @@ -0,0 +1,325 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include <mqueue.h> +#include <string.h> +#include <iostream> + +#include <sys/errno.h> +#include <sys/socket.h> +#include <sys/epoll.h> + +#include <typeinfo> +#include <iostream> +#include <queue> + +#include <boost/ptr_container/ptr_map.hpp> +#include <boost/current_function.hpp> + +#include <qpid/QpidError.h> +#include <qpid/sys/Monitor.h> + +#include "check.h" +#include "EventChannel.h" + +using namespace std; + + +// Convenience template to zero out a struct. +template <class S> struct ZeroStruct : public S { + ZeroStruct() { memset(this, 0, sizeof(*this)); } +}; + +namespace qpid { +namespace sys { + + +/** + * EventHandler wraps an epoll file descriptor. Acts as private + * interface between EventChannel and subclasses. + * + * Also implements Event interface for events that are not associated + * with a file descriptor and are passed via the message queue. + */ +class EventHandler : public Event, private Monitor +{ + public: + EventHandler(int epollSize = 256); + ~EventHandler(); + + int getEpollFd() { return epollFd; } + void epollAdd(int fd, uint32_t epollEvents, Event* event); + void epollMod(int fd, uint32_t epollEvents, Event* event); + void epollDel(int fd); + + void mqPut(Event* event); + Event* mqGet(); + + protected: + // Should never be called, only complete. + void prepare(EventHandler&) { assert(0); } + Event* complete(EventHandler& eh); + + private: + int epollFd; + std::string mqName; + int mqFd; + std::queue<Event*> mqEvents; +}; + +EventHandler::EventHandler(int epollSize) +{ + epollFd = epoll_create(epollSize); + if (epollFd < 0) throw QPID_POSIX_ERROR(errno); + + // Create a POSIX message queue for non-fd events. + // We write one byte and never read it is always ready for read + // when we add it to epoll. + // + ZeroStruct<struct mq_attr> attr; + attr.mq_maxmsg = 1; + attr.mq_msgsize = 1; + do { + char tmpnam[L_tmpnam]; + tmpnam_r(tmpnam); + mqName = tmpnam + 4; // Skip "tmp/" + mqFd = mq_open( + mqName.c_str(), O_CREAT|O_EXCL|O_RDWR|O_NONBLOCK, S_IRWXU, &attr); + if (mqFd < 0) throw QPID_POSIX_ERROR(errno); + } while (mqFd == EEXIST); // Name already taken, try again. + + static char zero = '\0'; + mq_send(mqFd, &zero, 1, 0); + epollAdd(mqFd, 0, this); +} + +EventHandler::~EventHandler() { + mq_close(mqFd); + mq_unlink(mqName.c_str()); +} + +void EventHandler::mqPut(Event* event) { + ScopedLock l(*this); + assert(event != 0); + mqEvents.push(event); + epollMod(mqFd, EPOLLIN|EPOLLONESHOT, this); +} + +Event* EventHandler::mqGet() { + ScopedLock l(*this); + if (mqEvents.empty()) + return 0; + Event* event = mqEvents.front(); + mqEvents.pop(); + if(!mqEvents.empty()) + epollMod(mqFd, EPOLLIN|EPOLLONESHOT, this); + return event; +} + +void EventHandler::epollAdd(int fd, uint32_t epollEvents, Event* event) +{ + ZeroStruct<struct epoll_event> ee; + ee.data.ptr = event; + ee.events = epollEvents; + if (epoll_ctl(epollFd, EPOLL_CTL_ADD, fd, &ee) < 0) + throw QPID_POSIX_ERROR(errno); +} + +void EventHandler::epollMod(int fd, uint32_t epollEvents, Event* event) +{ + ZeroStruct<struct epoll_event> ee; + ee.data.ptr = event; + ee.events = epollEvents; + if (epoll_ctl(epollFd, EPOLL_CTL_MOD, fd, &ee) < 0) + throw QPID_POSIX_ERROR(errno); +} + +void EventHandler::epollDel(int fd) { + if (epoll_ctl(epollFd, EPOLL_CTL_DEL, fd, 0) < 0) + throw QPID_POSIX_ERROR(errno); +} + +Event* EventHandler::complete(EventHandler& eh) +{ + assert(&eh == this); + Event* event = mqGet(); + return event==0 ? 0 : event->complete(eh); +} + +// ================================================================ +// EventChannel + +EventChannel::shared_ptr EventChannel::create() { + return shared_ptr(new EventChannel()); +} + +EventChannel::EventChannel() : handler(new EventHandler()) {} + +EventChannel::~EventChannel() {} + +void EventChannel::postEvent(Event& e) +{ + e.prepare(*handler); +} + +Event* EventChannel::getEvent() +{ + static const int infiniteTimeout = -1; + ZeroStruct<struct epoll_event> epollEvent; + + // Loop until we can complete the event. Some events may re-post + // themselves and return 0 from complete, e.g. partial reads. // + Event* event = 0; + while (event == 0) { + int eventCount = epoll_wait(handler->getEpollFd(), + &epollEvent, 1, infiniteTimeout); + if (eventCount < 0) { + if (errno != EINTR) { + // TODO aconway 2006-11-28: Proper handling/logging of errors. + cerr << BOOST_CURRENT_FUNCTION << " ignoring error " + << PosixError::getMessage(errno) << endl; + assert(0); + } + } + else if (eventCount == 1) { + event = reinterpret_cast<Event*>(epollEvent.data.ptr); + assert(event != 0); + try { + event = event->complete(*handler); + } + catch (const Exception& e) { + if (event) + event->setError(e); + } + catch (const std::exception& e) { + if (event) + event->setError(e); + } + } + } + return event; +} + +Event::~Event() {} + +void Event::prepare(EventHandler& handler) +{ + handler.mqPut(this); +} + +bool Event::hasError() const { + return error; +} + +void Event::throwIfError() throw (Exception) { + if (hasError()) + error.throwSelf(); +} + +Event* Event::complete(EventHandler&) +{ + return this; +} + +void Event::dispatch() +{ + try { + if (!callback.empty()) + callback(); + } catch (const std::exception&) { + throw; + } catch (...) { + throw QPID_ERROR(INTERNAL_ERROR, "Unknown exception."); + } +} + +void Event::setError(const ExceptionHolder& e) { + error = e; +} + +void ReadEvent::prepare(EventHandler& handler) +{ + handler.epollAdd(descriptor, EPOLLIN | EPOLLONESHOT, this); +} + +ssize_t ReadEvent::doRead() { + ssize_t n = ::read(descriptor, static_cast<char*>(buffer) + received, + size - received); + if (n > 0) received += n; + return n; +} + +Event* ReadEvent::complete(EventHandler& handler) +{ + // Read as much as possible without blocking. + ssize_t n = doRead(); + while (n > 0 && received < size) doRead(); + + if (received == size) { + handler.epollDel(descriptor); + received = 0; // Reset for re-use. + return this; + } + else if (n <0 && (errno == EAGAIN)) { + // Keep polling for more. + handler.epollMod(descriptor, EPOLLIN | EPOLLONESHOT, this); + return 0; + } + else { + // Unexpected EOF or error. Throw ENODATA for EOF. + handler.epollDel(descriptor); + received = 0; // Reset for re-use. + throw QPID_POSIX_ERROR((n < 0) ? errno : ENODATA); + } +} + +void WriteEvent::prepare(EventHandler& handler) +{ + handler.epollAdd(descriptor, EPOLLOUT | EPOLLONESHOT, this); +} + +Event* WriteEvent::complete(EventHandler& handler) +{ + ssize_t n = write(descriptor, static_cast<const char*>(buffer) + written, + size - written); + if (n < 0) throw QPID_POSIX_ERROR(errno); + written += n; + if(written < size) { + // Keep polling. + handler.epollMod(descriptor, EPOLLOUT | EPOLLONESHOT, this); + return 0; + } + written = 0; // Reset for re-use. + handler.epollDel(descriptor); + return this; +} + +void AcceptEvent::prepare(EventHandler& handler) +{ + handler.epollAdd(descriptor, EPOLLIN | EPOLLONESHOT, this); +} + +Event* AcceptEvent::complete(EventHandler& handler) +{ + handler.epollDel(descriptor); + accepted = ::accept(descriptor, 0, 0); + if (accepted < 0) throw QPID_POSIX_ERROR(errno); + return this; +} + +}} |