diff options
Diffstat (limited to 'Final/cpp/lib/common/sys/posix/EventChannel.cpp')
-rw-r--r-- | Final/cpp/lib/common/sys/posix/EventChannel.cpp | 328 |
1 files changed, 328 insertions, 0 deletions
diff --git a/Final/cpp/lib/common/sys/posix/EventChannel.cpp b/Final/cpp/lib/common/sys/posix/EventChannel.cpp new file mode 100644 index 0000000000..8b91fc1ba6 --- /dev/null +++ b/Final/cpp/lib/common/sys/posix/EventChannel.cpp @@ -0,0 +1,328 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <mqueue.h> +#include <string.h> +#include <iostream> + +#include <sys/errno.h> +#include <sys/socket.h> +#include <sys/epoll.h> + +#include <typeinfo> +#include <iostream> +#include <queue> + +#include <boost/ptr_container/ptr_map.hpp> +#include <boost/current_function.hpp> + +#include <QpidError.h> +#include <sys/Monitor.h> + +#include "check.h" +#include "EventChannel.h" + +using namespace std; + + +// Convenience template to zero out a struct. +template <class S> struct ZeroStruct : public S { + ZeroStruct() { memset(this, 0, sizeof(*this)); } +}; + +namespace qpid { +namespace sys { + + +/** + * EventHandler wraps an epoll file descriptor. Acts as private + * interface between EventChannel and subclasses. + * + * Also implements Event interface for events that are not associated + * with a file descriptor and are passed via the message queue. + */ +class EventHandler : public Event, private Monitor +{ + public: + EventHandler(int epollSize = 256); + ~EventHandler(); + + int getEpollFd() { return epollFd; } + void epollAdd(int fd, uint32_t epollEvents, Event* event); + void epollMod(int fd, uint32_t epollEvents, Event* event); + void epollDel(int fd); + + void mqPut(Event* event); + Event* mqGet(); + + protected: + // Should never be called, only complete. + void prepare(EventHandler&) { assert(0); } + Event* complete(EventHandler& eh); + + private: + int epollFd; + std::string mqName; + int mqFd; + std::queue<Event*> mqEvents; +}; + +EventHandler::EventHandler(int epollSize) +{ + epollFd = epoll_create(epollSize); + if (epollFd < 0) throw QPID_POSIX_ERROR(errno); + + // Create a POSIX message queue for non-fd events. + // We write one byte and never read it is always ready for read + // when we add it to epoll. + // + ZeroStruct<struct mq_attr> attr; + attr.mq_maxmsg = 1; + attr.mq_msgsize = 1; + do { + char tmpnam[L_tmpnam]; + tmpnam_r(tmpnam); + mqName = tmpnam + 4; // Skip "tmp/" + mqFd = mq_open( + mqName.c_str(), O_CREAT|O_EXCL|O_RDWR|O_NONBLOCK, S_IRWXU, &attr); + if (mqFd < 0) throw QPID_POSIX_ERROR(errno); + } while (mqFd == EEXIST); // Name already taken, try again. + + static char zero = '\0'; + mq_send(mqFd, &zero, 1, 0); + epollAdd(mqFd, 0, this); +} + +EventHandler::~EventHandler() { + mq_close(mqFd); + mq_unlink(mqName.c_str()); +} + +void EventHandler::mqPut(Event* event) { + ScopedLock l(*this); + assert(event != 0); + mqEvents.push(event); + epollMod(mqFd, EPOLLIN|EPOLLONESHOT, this); +} + +Event* EventHandler::mqGet() { + ScopedLock l(*this); + if (mqEvents.empty()) + return 0; + Event* event = mqEvents.front(); + mqEvents.pop(); + if(!mqEvents.empty()) + epollMod(mqFd, EPOLLIN|EPOLLONESHOT, this); + return event; +} + +void EventHandler::epollAdd(int fd, uint32_t epollEvents, Event* event) +{ + ZeroStruct<struct epoll_event> ee; + ee.data.ptr = event; + ee.events = epollEvents; + if (epoll_ctl(epollFd, EPOLL_CTL_ADD, fd, &ee) < 0) + throw QPID_POSIX_ERROR(errno); +} + +void EventHandler::epollMod(int fd, uint32_t epollEvents, Event* event) +{ + ZeroStruct<struct epoll_event> ee; + ee.data.ptr = event; + ee.events = epollEvents; + if (epoll_ctl(epollFd, EPOLL_CTL_MOD, fd, &ee) < 0) + throw QPID_POSIX_ERROR(errno); +} + +void EventHandler::epollDel(int fd) { + if (epoll_ctl(epollFd, EPOLL_CTL_DEL, fd, 0) < 0) + throw QPID_POSIX_ERROR(errno); +} + +Event* EventHandler::complete(EventHandler& eh) +{ + assert(&eh == this); + Event* event = mqGet(); + return event==0 ? 0 : event->complete(eh); +} + +// ================================================================ +// EventChannel + +EventChannel::shared_ptr EventChannel::create() { + return shared_ptr(new EventChannel()); +} + +EventChannel::EventChannel() : handler(new EventHandler()) {} + +EventChannel::~EventChannel() {} + +void EventChannel::postEvent(Event& e) +{ + e.prepare(*handler); +} + +Event* EventChannel::getEvent() +{ + static const int infiniteTimeout = -1; + ZeroStruct<struct epoll_event> epollEvent; + + // Loop until we can complete the event. Some events may re-post + // themselves and return 0 from complete, e.g. partial reads. // + Event* event = 0; + while (event == 0) { + int eventCount = epoll_wait(handler->getEpollFd(), + &epollEvent, 1, infiniteTimeout); + if (eventCount < 0) { + if (errno != EINTR) { + // TODO aconway 2006-11-28: Proper handling/logging of errors. + cerr << BOOST_CURRENT_FUNCTION << " ignoring error " + << PosixError::getMessage(errno) << endl; + assert(0); + } + } + else if (eventCount == 1) { + event = reinterpret_cast<Event*>(epollEvent.data.ptr); + assert(event != 0); + try { + event = event->complete(*handler); + } + catch (const Exception& e) { + if (event) + event->setError(e); + } + catch (const std::exception& e) { + if (event) + event->setError(e); + } + } + } + return event; +} + +Event::~Event() {} + +void Event::prepare(EventHandler& handler) +{ + handler.mqPut(this); +} + +bool Event::hasError() const { + return error; +} + +void Event::throwIfError() throw (Exception) { + if (hasError()) + error.throwSelf(); +} + +Event* Event::complete(EventHandler&) +{ + return this; +} + +void Event::dispatch() +{ + try { + if (!callback.empty()) + callback(); + } catch (const std::exception&) { + throw; + } catch (...) { + throw QPID_ERROR(INTERNAL_ERROR, "Unknown exception."); + } +} + +void Event::setError(const ExceptionHolder& e) { + error = e; +} + +void ReadEvent::prepare(EventHandler& handler) +{ + handler.epollAdd(descriptor, EPOLLIN | EPOLLONESHOT, this); +} + +ssize_t ReadEvent::doRead() { + ssize_t n = ::read(descriptor, static_cast<char*>(buffer) + received, + size - received); + if (n > 0) received += n; + return n; +} + +Event* ReadEvent::complete(EventHandler& handler) +{ + // Read as much as possible without blocking. + ssize_t n = doRead(); + while (n > 0 && received < size) doRead(); + + if (received == size) { + handler.epollDel(descriptor); + received = 0; // Reset for re-use. + return this; + } + else if (n <0 && (errno == EAGAIN)) { + // Keep polling for more. + handler.epollMod(descriptor, EPOLLIN | EPOLLONESHOT, this); + return 0; + } + else { + // Unexpected EOF or error. Throw ENODATA for EOF. + handler.epollDel(descriptor); + received = 0; // Reset for re-use. + throw QPID_POSIX_ERROR((n < 0) ? errno : ENODATA); + } +} + +void WriteEvent::prepare(EventHandler& handler) +{ + handler.epollAdd(descriptor, EPOLLOUT | EPOLLONESHOT, this); +} + +Event* WriteEvent::complete(EventHandler& handler) +{ + ssize_t n = write(descriptor, static_cast<const char*>(buffer) + written, + size - written); + if (n < 0) throw QPID_POSIX_ERROR(errno); + written += n; + if(written < size) { + // Keep polling. + handler.epollMod(descriptor, EPOLLOUT | EPOLLONESHOT, this); + return 0; + } + written = 0; // Reset for re-use. + handler.epollDel(descriptor); + return this; +} + +void AcceptEvent::prepare(EventHandler& handler) +{ + handler.epollAdd(descriptor, EPOLLIN | EPOLLONESHOT, this); +} + +Event* AcceptEvent::complete(EventHandler& handler) +{ + handler.epollDel(descriptor); + accepted = ::accept(descriptor, 0, 0); + if (accepted < 0) throw QPID_POSIX_ERROR(errno); + return this; +} + +}} |