summaryrefslogtreecommitdiff
path: root/qpid/cpp/lib/common/sys/posix/EventChannel.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/lib/common/sys/posix/EventChannel.cpp')
-rw-r--r--qpid/cpp/lib/common/sys/posix/EventChannel.cpp325
1 files changed, 0 insertions, 325 deletions
diff --git a/qpid/cpp/lib/common/sys/posix/EventChannel.cpp b/qpid/cpp/lib/common/sys/posix/EventChannel.cpp
deleted file mode 100644
index 16c7ec9c3f..0000000000
--- a/qpid/cpp/lib/common/sys/posix/EventChannel.cpp
+++ /dev/null
@@ -1,325 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include <mqueue.h>
-#include <string.h>
-#include <iostream>
-
-#include <sys/errno.h>
-#include <sys/socket.h>
-#include <sys/epoll.h>
-
-#include <typeinfo>
-#include <iostream>
-#include <queue>
-
-#include <boost/ptr_container/ptr_map.hpp>
-#include <boost/current_function.hpp>
-
-#include <QpidError.h>
-#include <sys/Monitor.h>
-
-#include "check.h"
-#include "EventChannel.h"
-
-using namespace std;
-
-
-// Convenience template to zero out a struct.
-template <class S> struct ZeroStruct : public S {
- ZeroStruct() { memset(this, 0, sizeof(*this)); }
-};
-
-namespace qpid {
-namespace sys {
-
-
-/**
- * EventHandler wraps an epoll file descriptor. Acts as private
- * interface between EventChannel and subclasses.
- *
- * Also implements Event interface for events that are not associated
- * with a file descriptor and are passed via the message queue.
- */
-class EventHandler : public Event, private Monitor
-{
- public:
- EventHandler(int epollSize = 256);
- ~EventHandler();
-
- int getEpollFd() { return epollFd; }
- void epollAdd(int fd, uint32_t epollEvents, Event* event);
- void epollMod(int fd, uint32_t epollEvents, Event* event);
- void epollDel(int fd);
-
- void mqPut(Event* event);
- Event* mqGet();
-
- protected:
- // Should never be called, only complete.
- void prepare(EventHandler&) { assert(0); }
- Event* complete(EventHandler& eh);
-
- private:
- int epollFd;
- std::string mqName;
- int mqFd;
- std::queue<Event*> mqEvents;
-};
-
-EventHandler::EventHandler(int epollSize)
-{
- epollFd = epoll_create(epollSize);
- if (epollFd < 0) throw QPID_POSIX_ERROR(errno);
-
- // Create a POSIX message queue for non-fd events.
- // We write one byte and never read it is always ready for read
- // when we add it to epoll.
- //
- ZeroStruct<struct mq_attr> attr;
- attr.mq_maxmsg = 1;
- attr.mq_msgsize = 1;
- do {
- char tmpnam[L_tmpnam];
- tmpnam_r(tmpnam);
- mqName = tmpnam + 4; // Skip "tmp/"
- mqFd = mq_open(
- mqName.c_str(), O_CREAT|O_EXCL|O_RDWR|O_NONBLOCK, S_IRWXU, &attr);
- if (mqFd < 0) throw QPID_POSIX_ERROR(errno);
- } while (mqFd == EEXIST); // Name already taken, try again.
-
- static char zero = '\0';
- mq_send(mqFd, &zero, 1, 0);
- epollAdd(mqFd, 0, this);
-}
-
-EventHandler::~EventHandler() {
- mq_close(mqFd);
- mq_unlink(mqName.c_str());
-}
-
-void EventHandler::mqPut(Event* event) {
- ScopedLock l(*this);
- assert(event != 0);
- mqEvents.push(event);
- epollMod(mqFd, EPOLLIN|EPOLLONESHOT, this);
-}
-
-Event* EventHandler::mqGet() {
- ScopedLock l(*this);
- if (mqEvents.empty())
- return 0;
- Event* event = mqEvents.front();
- mqEvents.pop();
- if(!mqEvents.empty())
- epollMod(mqFd, EPOLLIN|EPOLLONESHOT, this);
- return event;
-}
-
-void EventHandler::epollAdd(int fd, uint32_t epollEvents, Event* event)
-{
- ZeroStruct<struct epoll_event> ee;
- ee.data.ptr = event;
- ee.events = epollEvents;
- if (epoll_ctl(epollFd, EPOLL_CTL_ADD, fd, &ee) < 0)
- throw QPID_POSIX_ERROR(errno);
-}
-
-void EventHandler::epollMod(int fd, uint32_t epollEvents, Event* event)
-{
- ZeroStruct<struct epoll_event> ee;
- ee.data.ptr = event;
- ee.events = epollEvents;
- if (epoll_ctl(epollFd, EPOLL_CTL_MOD, fd, &ee) < 0)
- throw QPID_POSIX_ERROR(errno);
-}
-
-void EventHandler::epollDel(int fd) {
- if (epoll_ctl(epollFd, EPOLL_CTL_DEL, fd, 0) < 0)
- throw QPID_POSIX_ERROR(errno);
-}
-
-Event* EventHandler::complete(EventHandler& eh)
-{
- assert(&eh == this);
- Event* event = mqGet();
- return event==0 ? 0 : event->complete(eh);
-}
-
-// ================================================================
-// EventChannel
-
-EventChannel::shared_ptr EventChannel::create() {
- return shared_ptr(new EventChannel());
-}
-
-EventChannel::EventChannel() : handler(new EventHandler()) {}
-
-EventChannel::~EventChannel() {}
-
-void EventChannel::postEvent(Event& e)
-{
- e.prepare(*handler);
-}
-
-Event* EventChannel::getEvent()
-{
- static const int infiniteTimeout = -1;
- ZeroStruct<struct epoll_event> epollEvent;
-
- // Loop until we can complete the event. Some events may re-post
- // themselves and return 0 from complete, e.g. partial reads. //
- Event* event = 0;
- while (event == 0) {
- int eventCount = epoll_wait(handler->getEpollFd(),
- &epollEvent, 1, infiniteTimeout);
- if (eventCount < 0) {
- if (errno != EINTR) {
- // TODO aconway 2006-11-28: Proper handling/logging of errors.
- cerr << BOOST_CURRENT_FUNCTION << " ignoring error "
- << PosixError::getMessage(errno) << endl;
- assert(0);
- }
- }
- else if (eventCount == 1) {
- event = reinterpret_cast<Event*>(epollEvent.data.ptr);
- assert(event != 0);
- try {
- event = event->complete(*handler);
- }
- catch (const Exception& e) {
- if (event)
- event->setError(e);
- }
- catch (const std::exception& e) {
- if (event)
- event->setError(e);
- }
- }
- }
- return event;
-}
-
-Event::~Event() {}
-
-void Event::prepare(EventHandler& handler)
-{
- handler.mqPut(this);
-}
-
-bool Event::hasError() const {
- return error;
-}
-
-void Event::throwIfError() throw (Exception) {
- if (hasError())
- error.throwSelf();
-}
-
-Event* Event::complete(EventHandler&)
-{
- return this;
-}
-
-void Event::dispatch()
-{
- try {
- if (!callback.empty())
- callback();
- } catch (const std::exception&) {
- throw;
- } catch (...) {
- throw QPID_ERROR(INTERNAL_ERROR, "Unknown exception.");
- }
-}
-
-void Event::setError(const ExceptionHolder& e) {
- error = e;
-}
-
-void ReadEvent::prepare(EventHandler& handler)
-{
- handler.epollAdd(descriptor, EPOLLIN | EPOLLONESHOT, this);
-}
-
-ssize_t ReadEvent::doRead() {
- ssize_t n = ::read(descriptor, static_cast<char*>(buffer) + received,
- size - received);
- if (n > 0) received += n;
- return n;
-}
-
-Event* ReadEvent::complete(EventHandler& handler)
-{
- // Read as much as possible without blocking.
- ssize_t n = doRead();
- while (n > 0 && received < size) doRead();
-
- if (received == size) {
- handler.epollDel(descriptor);
- received = 0; // Reset for re-use.
- return this;
- }
- else if (n <0 && (errno == EAGAIN)) {
- // Keep polling for more.
- handler.epollMod(descriptor, EPOLLIN | EPOLLONESHOT, this);
- return 0;
- }
- else {
- // Unexpected EOF or error. Throw ENODATA for EOF.
- handler.epollDel(descriptor);
- received = 0; // Reset for re-use.
- throw QPID_POSIX_ERROR((n < 0) ? errno : ENODATA);
- }
-}
-
-void WriteEvent::prepare(EventHandler& handler)
-{
- handler.epollAdd(descriptor, EPOLLOUT | EPOLLONESHOT, this);
-}
-
-Event* WriteEvent::complete(EventHandler& handler)
-{
- ssize_t n = write(descriptor, static_cast<const char*>(buffer) + written,
- size - written);
- if (n < 0) throw QPID_POSIX_ERROR(errno);
- written += n;
- if(written < size) {
- // Keep polling.
- handler.epollMod(descriptor, EPOLLOUT | EPOLLONESHOT, this);
- return 0;
- }
- written = 0; // Reset for re-use.
- handler.epollDel(descriptor);
- return this;
-}
-
-void AcceptEvent::prepare(EventHandler& handler)
-{
- handler.epollAdd(descriptor, EPOLLIN | EPOLLONESHOT, this);
-}
-
-Event* AcceptEvent::complete(EventHandler& handler)
-{
- handler.epollDel(descriptor);
- accepted = ::accept(descriptor, 0, 0);
- if (accepted < 0) throw QPID_POSIX_ERROR(errno);
- return this;
-}
-
-}}