diff options
author | Andrew Stitcher <astitcher@apache.org> | 2007-06-18 12:11:32 +0000 |
---|---|---|
committer | Andrew Stitcher <astitcher@apache.org> | 2007-06-18 12:11:32 +0000 |
commit | 45b526ce09daee869ec1313808583f7e05bff7bb (patch) | |
tree | 297d2b1f02b14e1fdffbc1074b3d23670859f602 /cpp/src | |
parent | 41c30308ad435c338633b97405fe7350d515f069 (diff) | |
download | qpid-python-45b526ce09daee869ec1313808583f7e05bff7bb.tar.gz |
Intermediate checkin with preliminary work on epoll based net IO
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@548337 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
22 files changed, 1609 insertions, 364 deletions
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am index f04f750987..0468a2122b 100644 --- a/cpp/src/Makefile.am +++ b/cpp/src/Makefile.am @@ -69,6 +69,7 @@ posix_netio_hdr = \ posix_plat_src = \ qpid/sys/posix/check.cpp \ + qpid/sys/epoll/EpollPoller.cpp \ qpid/sys/posix/Socket.cpp \ qpid/sys/posix/Time.cpp \ qpid/sys/posix/Thread.cpp @@ -155,7 +156,6 @@ libqpidcommon_la_SOURCES = \ gen/qpid/framing/AMQP_MethodVersionMap.cpp \ gen/qpid/framing/AMQP_ServerProxy.cpp \ qpid/Exception.cpp \ - qpid/ExceptionHolder.cpp \ qpid/Url.h \ qpid/Url.cpp \ qpid/QpidError.cpp \ @@ -345,6 +345,7 @@ nobase_include_HEADERS = \ qpid/framing/amqp_types_full.h \ qpid/sys/Acceptor.h \ qpid/sys/AtomicCount.h \ + qpid/sys/Dispatcher.h \ qpid/sys/Condition.h \ qpid/sys/ConnectionInputHandler.h \ qpid/sys/ConnectionInputHandlerFactory.h \ @@ -352,6 +353,7 @@ nobase_include_HEADERS = \ qpid/sys/Module.h \ qpid/sys/Monitor.h \ qpid/sys/Mutex.h \ + qpid/sys/Poller.h \ qpid/sys/ProducerConsumer.h \ qpid/sys/Runnable.h \ qpid/sys/ScopedIncrement.h \ diff --git a/cpp/src/qpid/Exception.cpp b/cpp/src/qpid/Exception.cpp index f4eeb7931b..7e34e49bef 100644 --- a/cpp/src/qpid/Exception.cpp +++ b/cpp/src/qpid/Exception.cpp @@ -42,13 +42,15 @@ Exception::Exception(const std::string& str) throw() Exception::Exception(const char* str) throw() : whatStr(str) { ctorLog(this); } +Exception::Exception(const std::exception& e) throw() : whatStr(e.what()) {} + Exception::~Exception() throw() {} const char* Exception::what() const throw() { return whatStr.c_str(); } std::string Exception::toString() const throw() { return whatStr; } -Exception* Exception::clone() const throw() { return new Exception(*this); } +Exception::auto_ptr Exception::clone() const throw() { return Exception::auto_ptr(new Exception(*this)); } void Exception::throwSelf() const { throw *this; } @@ -56,4 +58,18 @@ ShutdownException::ShutdownException() : Exception("Shut down.") {} EmptyException::EmptyException() : Exception("Empty.") {} +const char* Exception::defaultMessage = "Unexpected exception"; + +void Exception::log(const char* what, const char* message) { + QPID_LOG(error, message << ": " << what); +} + +void Exception::log(const std::exception& e, const char* message) { + log(e.what(), message); +} + +void Exception::logUnknown(const char* message) { + log("unknown exception.", message); +} + } // namespace qpid diff --git a/cpp/src/qpid/Exception.h b/cpp/src/qpid/Exception.h index 13583042a8..24f0efd16b 100644 --- a/cpp/src/qpid/Exception.h +++ b/cpp/src/qpid/Exception.h @@ -21,13 +21,15 @@ * under the License. * */ + +#include "framing/amqp_types.h" + #include <exception> #include <string> #include <memory> #include <boost/shared_ptr.hpp> #include <boost/lexical_cast.hpp> - -#include "framing/amqp_types.h" +#include <boost/function.hpp> namespace qpid { @@ -44,6 +46,10 @@ class Exception : public std::exception std::string whatStr; public: + typedef boost::shared_ptr<Exception> shared_ptr; + typedef boost::shared_ptr<const Exception> shared_ptr_const; + typedef std::auto_ptr<Exception> auto_ptr; + Exception() throw(); Exception(const std::string& str) throw(); Exception(const char* str) throw(); @@ -59,10 +65,61 @@ class Exception : public std::exception virtual const char* what() const throw(); virtual std::string toString() const throw(); - virtual Exception* clone() const throw(); + virtual auto_ptr clone() const throw(); virtual void throwSelf() const; - typedef boost::shared_ptr<Exception> shared_ptr; + /** Default message: "Unknown exception" or something like it. */ + static const char* defaultMessage; + + /** + * Log a message of the form "message: what" + *@param what Exception's what() message. + *@param message Prefix message. + */ + static void log(const char* what, const char* message = defaultMessage); + + /** + * Log an exception. + *@param e Exception to log. + + */ + static void log( + const std::exception& e, const char* message = defaultMessage); + + + /** + * Log an unknown exception - use in catch(...) + *@param message Prefix message. + */ + static void logUnknown(const char* message = defaultMessage); + + /** + * Wrapper template function to call another function inside + * try/catch and log any exception. Use boost::bind to wrap + * member function calls or functions with arguments. + * + *@param f Function to call in try block. + *@param retrhow If true the exception is rethrown. + *@param message Prefix message. + */ + template <class T> + static T tryCatchLog(boost::function0<T> f, bool rethrow=true, + const char* message=defaultMessage) + { + try { + return f(); + } + catch (const std::exception& e) { + log(e, message); + if (rethrow) + throw; + } + catch (...) { + logUnknown(message); + if (rethrow) + throw; + } + } }; struct ChannelException : public Exception { diff --git a/cpp/src/qpid/QpidError.cpp b/cpp/src/qpid/QpidError.cpp index 365c84ea1d..fcd5af47a5 100644 --- a/cpp/src/qpid/QpidError.cpp +++ b/cpp/src/qpid/QpidError.cpp @@ -30,7 +30,7 @@ QpidError::QpidError() : code(0) {} QpidError::~QpidError() throw() {} -Exception* QpidError::clone() const throw() { return new QpidError(*this); } +Exception::auto_ptr QpidError::clone() const throw() { return Exception::auto_ptr(new QpidError(*this)); } void QpidError::throwSelf() const { throw *this; } diff --git a/cpp/src/qpid/QpidError.h b/cpp/src/qpid/QpidError.h index 24c2b74dde..dea0902a0e 100644 --- a/cpp/src/qpid/QpidError.h +++ b/cpp/src/qpid/QpidError.h @@ -52,7 +52,7 @@ class QpidError : public Exception { init(); } ~QpidError() throw(); - Exception* clone() const throw(); + Exception::auto_ptr clone() const throw(); void throwSelf() const; private: diff --git a/cpp/src/qpid/log/Statement.h b/cpp/src/qpid/log/Statement.h index a04755012b..563da3716c 100644 --- a/cpp/src/qpid/log/Statement.h +++ b/cpp/src/qpid/log/Statement.h @@ -19,7 +19,6 @@ * */ -#include "qpid/log/Statement.h" #include <boost/current_function.hpp> #include <sstream> diff --git a/cpp/src/qpid/sys/Dispatcher.cpp b/cpp/src/qpid/sys/Dispatcher.cpp new file mode 100644 index 0000000000..4838e5e4cd --- /dev/null +++ b/cpp/src/qpid/sys/Dispatcher.cpp @@ -0,0 +1,90 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "Dispatcher.h" + +#include <assert.h> + +namespace qpid { +namespace sys { + +Dispatcher::Dispatcher(Poller::shared_ptr poller0) : + poller(poller0) { +} + +Dispatcher::~Dispatcher() { +} + +void Dispatcher::run() { + do { + Poller::Event event = poller->wait(); + // Poller::wait guarantees to return an event + DispatchHandle* h = static_cast<DispatchHandle*>(event.handle); + switch (event.dir) { + case Poller::IN: + h->readableCallback(*h); + break; + case Poller::OUT: + h->writableCallback(*h); + break; + case Poller::INOUT: + h->readableCallback(*h); + h->writableCallback(*h); + break; + case Poller::SHUTDOWN: + goto dispatcher_shutdown; + default: + ; + } + } while (true); + +dispatcher_shutdown: + ; +} + +void DispatchHandle::watch(Poller::shared_ptr poller0) { + bool r = readableCallback; + bool w = writableCallback; + + // If no callbacks set then do nothing (that is what we were asked to do!) + // TODO: Maybe this should be an assert instead + if (!r && !w) + return; + + Poller::Direction d = r ? + (w ? Poller::INOUT : Poller::IN) : + Poller::OUT; + + poller = poller0; + poller->addFd(*this, d); +} + +void DispatchHandle::rewatch() { + assert(poller); + poller->rearmFd(*this); +} + +void DispatchHandle::unwatch() { + poller->delFd(*this); + poller.reset(); +} + +}} diff --git a/cpp/src/qpid/sys/Dispatcher.h b/cpp/src/qpid/sys/Dispatcher.h new file mode 100644 index 0000000000..c0b010eb39 --- /dev/null +++ b/cpp/src/qpid/sys/Dispatcher.h @@ -0,0 +1,74 @@ +#ifndef _sys_Dispatcher_h +#define _sys_Dispatcher_h + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + + +#include "Poller.h" +#include "Runnable.h" + +#include <memory> +#include <boost/function.hpp> + +#include <assert.h> + + +namespace qpid { +namespace sys { + +class Dispatcher; +class DispatchHandle : public PollerHandle { + friend class Dispatcher; +public: + typedef boost::function1<void, DispatchHandle&> Callback; + +private: + Callback readableCallback; + Callback writableCallback; + Poller::shared_ptr poller; + +public: + + DispatchHandle(int fd, Callback rCb, Callback wCb) : + PollerHandle(fd), + readableCallback(rCb), + writableCallback(wCb) + {} + + void watch(Poller::shared_ptr poller); + void rewatch(); + void unwatch(); +}; + +class Dispatcher : public Runnable { + const Poller::shared_ptr poller; + +public: + Dispatcher(Poller::shared_ptr poller); + ~Dispatcher(); + + void run(); +}; + +}} + +#endif // _sys_Dispatcher_h diff --git a/cpp/src/qpid/sys/Poller.h b/cpp/src/qpid/sys/Poller.h new file mode 100644 index 0000000000..6fedd669a0 --- /dev/null +++ b/cpp/src/qpid/sys/Poller.h @@ -0,0 +1,93 @@ +#ifndef _sys_Poller_h +#define _sys_Poller_h + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "Time.h" + +#include <stdint.h> + +#include <boost/shared_ptr.hpp> + +namespace qpid { +namespace sys { + +/** + * Handle class to use for polling + */ +class Poller; +class PollerHandlePrivate; +class PollerHandle { + friend class Poller; + + PollerHandlePrivate* impl; + const int fd; + +public: + PollerHandle(int fd0); + virtual ~PollerHandle(); + + int getFD() const { return fd; } +}; + +/** + * Poller: abstract class to encapsulate a file descriptor poll to be used + * by a reactor + */ +class PollerPrivate; +class Poller { + PollerPrivate* impl; + +public: + typedef boost::shared_ptr<Poller> shared_ptr; + + enum Direction { + NONE, + IN, + OUT, + INOUT, + SHUTDOWN + }; + + struct Event { + PollerHandle* handle; + Direction dir; + + Event(PollerHandle* handle0, Direction dir0) : + handle(handle0), + dir(dir0) { + } + }; + + Poller(); + ~Poller(); + void shutdown(); + + void addFd(PollerHandle& handle, Direction dir); + void delFd(PollerHandle& handle); + void modFd(PollerHandle& handle, Direction dir); + void rearmFd(PollerHandle& handle); + Event wait(Duration timeout = TIME_INFINITE); +}; + +}} +#endif // _sys_Poller_h diff --git a/cpp/src/qpid/sys/ScopedIncrement.h b/cpp/src/qpid/sys/ScopedIncrement.h index f14461ddaf..ba9e89ba5f 100644 --- a/cpp/src/qpid/sys/ScopedIncrement.h +++ b/cpp/src/qpid/sys/ScopedIncrement.h @@ -20,19 +20,27 @@ */ #include <boost/noncopyable.hpp> +#include <boost/function.hpp> namespace qpid { namespace sys { -/** Increment counter in constructor and decrement in destructor. */ +/** + * Increment counter in constructor and decrement in destructor. + * Optionally call a function if the decremented counter value is 0. + * Note the function must not throw, it is called in the destructor. + */ template <class T> class ScopedIncrement : boost::noncopyable { public: - ScopedIncrement(T& c) : count(c) { ++count; } - ~ScopedIncrement() { --count; } + ScopedIncrement(T& c, boost::function0<void> f=0) + : count(c), callback(f) { ++count; } + ~ScopedIncrement() { if (--count == 0 && callback) callback(); } + private: T& count; + boost::function0<void> callback; }; diff --git a/cpp/src/qpid/sys/Time.h b/cpp/src/qpid/sys/Time.h index 4bb65e9f4a..25b1606844 100644 --- a/cpp/src/qpid/sys/Time.h +++ b/cpp/src/qpid/sys/Time.h @@ -97,8 +97,12 @@ const Duration TIME_USEC = 1000; /** Nanoseconds per nanosecond. */ const Duration TIME_NSEC = 1; +/** Value to represent an infinite timeout */ +const Duration TIME_INFINITE = std::numeric_limits<int64_t>::max(); + /** Time greater than any other time */ const AbsTime FAR_FUTURE = AbsTime::FarFuture(); + }} #endif /*!_sys_Time_h*/ diff --git a/cpp/src/qpid/sys/epoll/EpollPoller.cpp b/cpp/src/qpid/sys/epoll/EpollPoller.cpp new file mode 100644 index 0000000000..65b2255023 --- /dev/null +++ b/cpp/src/qpid/sys/epoll/EpollPoller.cpp @@ -0,0 +1,263 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/sys/Poller.h" +#include "qpid/sys/Mutex.h" +#include "qpid/sys/posix/check.h" + +#include <sys/epoll.h> +#include <errno.h> + +#include <assert.h> +#include <vector> +#include <exception> + +namespace qpid { +namespace sys { + +class PollerHandlePrivate { + friend class Poller; + friend class PollerHandle; + + enum FDStat { + ABSENT, + MONITORED, + INACTIVE + }; + + ::__uint32_t events; + FDStat stat; + Mutex lock; + + PollerHandlePrivate() : + events(0), + stat(ABSENT) { + } +}; + +PollerHandle::PollerHandle(int fd0) : + impl(new PollerHandlePrivate), + fd(fd0) +{} + +PollerHandle::~PollerHandle() { + delete impl; +} + +/** + * Concrete implementation of Poller to use the Linux specific epoll + * interface + */ +class PollerPrivate { + friend class Poller; + + static const int DefaultFds = 256; + + struct ReadablePipe { + int fds[2]; + + /** + * This encapsulates an always readable pipe which we can add + * to the epoll set to force epoll_wait to return + */ + ReadablePipe() { + QPID_POSIX_CHECK(::pipe(fds)); + // Just write the pipe's fds to the pipe + QPID_POSIX_CHECK(::write(fds[1], fds, 2)); + } + + ~ReadablePipe() { + ::close(fds[0]); + ::close(fds[1]); + } + + int getFD() { + return fds[0]; + } + }; + + static ReadablePipe alwaysReadable; + + const int epollFd; + bool isShutdown; + + static ::__uint32_t directionToEpollEvent(Poller::Direction dir) { + switch (dir) { + case Poller::IN: return ::EPOLLIN; + case Poller::OUT: return ::EPOLLOUT; + case Poller::INOUT: return ::EPOLLIN | ::EPOLLOUT; + default: return 0; + } + } + + static Poller::Direction epollToDirection(::__uint32_t events) { + ::__uint32_t e = events & (::EPOLLIN | ::EPOLLOUT); + switch (e) { + case ::EPOLLIN: return Poller::IN; + case ::EPOLLOUT: return Poller::OUT; + case ::EPOLLIN | ::EPOLLOUT: return Poller::INOUT; + default: return Poller::NONE; + } + } + + PollerPrivate() : + epollFd(::epoll_create(DefaultFds)), + isShutdown(false) { + QPID_POSIX_CHECK(epollFd); + } + + ~PollerPrivate() { + // It's probably okay to ignore any errors here as there can't be data loss + ::close(epollFd); + } +}; + +PollerPrivate::ReadablePipe PollerPrivate::alwaysReadable; + +void Poller::addFd(PollerHandle& handle, Direction dir) { + PollerHandlePrivate& eh = *handle.impl; + ScopedLock<Mutex> l(eh.lock); + ::epoll_event epe; + int op; + + if (eh.stat == PollerHandlePrivate::ABSENT) { + op = EPOLL_CTL_ADD; + epe.events = PollerPrivate::directionToEpollEvent(dir) | ::EPOLLONESHOT; + } else { + assert(eh.stat == PollerHandlePrivate::MONITORED); + op = EPOLL_CTL_MOD; + epe.events = eh.events | PollerPrivate::directionToEpollEvent(dir); + } + epe.data.ptr = &handle; + + QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, op, handle.getFD(), &epe)); + + // Record monitoring state of this fd + eh.events = epe.events; + eh.stat = PollerHandlePrivate::MONITORED; +} + +void Poller::delFd(PollerHandle& handle) { + PollerHandlePrivate& eh = *handle.impl; + ScopedLock<Mutex> l(eh.lock); + assert(eh.stat != PollerHandlePrivate::ABSENT); + QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_DEL, handle.getFD(), 0)); + eh.stat = PollerHandlePrivate::ABSENT; +} + +// modFd is equivalent to delFd followed by addFd +void Poller::modFd(PollerHandle& handle, Direction dir) { + PollerHandlePrivate& eh = *handle.impl; + ScopedLock<Mutex> l(eh.lock); + assert(eh.stat != PollerHandlePrivate::ABSENT); + + ::epoll_event epe; + epe.events = PollerPrivate::directionToEpollEvent(dir) | ::EPOLLONESHOT; + epe.data.ptr = &handle; + + QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, handle.getFD(), &epe)); + + // Record monitoring state of this fd + eh.events = epe.events; + eh.stat = PollerHandlePrivate::MONITORED; +} + +void Poller::rearmFd(PollerHandle& handle) { + PollerHandlePrivate& eh = *handle.impl; + ScopedLock<Mutex> l(eh.lock); + assert(eh.stat == PollerHandlePrivate::INACTIVE); + + ::epoll_event epe; + epe.events = eh.events; + epe.data.ptr = &handle; + + QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, handle.getFD(), &epe)); + + eh.stat = PollerHandlePrivate::MONITORED; +} + +void Poller::shutdown() { + // Don't use any locking here - isshutdown will be visible to all + // after the epoll_ctl() anyway (it's a memory barrier) + impl->isShutdown = true; + + // Add always readable fd to epoll (not EPOLLONESHOT) + int fd = impl->alwaysReadable.getFD(); + ::epoll_event epe; + epe.events = ::EPOLLIN; + epe.data.ptr = 0; + QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_ADD, fd, &epe)); +} + +Poller::Event Poller::wait(Duration timeout) { + epoll_event epe; + int timeoutMs = (timeout == TIME_INFINITE) ? -1 : timeout / TIME_MSEC; + + // Repeat until we weren't interupted + do { + int rc = ::epoll_wait(impl->epollFd, &epe, 1, timeoutMs); + + if (impl->isShutdown) { + return Event(0, SHUTDOWN); + } + + if (rc ==-1 && errno != EINTR) { + QPID_POSIX_CHECK(rc); + } else if (rc > 0) { + assert(rc == 1); + PollerHandle* handle = static_cast<PollerHandle*>(epe.data.ptr); + PollerHandlePrivate& eh = *handle->impl; + + ScopedLock<Mutex> l(eh.lock); + + // the handle could have gone inactive since we left the epoll_wait + if (eh.stat == PollerHandlePrivate::MONITORED) { + eh.stat = PollerHandlePrivate::INACTIVE; + return Event(handle, PollerPrivate::epollToDirection(epe.events)); + } + } + // We only get here if one of the following: + // * epoll_wait was interrupted by a signal + // * epoll_wait timed out + // * the state of the handle changed after being returned by epoll_wait + // + // The only things we can do here are return a timeout or wait more. + // Obviously if we timed out we return timeout; if the wait was meant to + // be indefinite then we should never return with a time out so we go again. + // If the wait wasn't indefinite, but we were interrupted then we have to return + // with a timeout as we don't know how long we've waited so far and so we can't + // continue the wait. + if (rc == 0 || timeoutMs == -1) { + return Event(0, NONE); + } + } while (true); +} + +// Concrete constructors +Poller::Poller() : + impl(new PollerPrivate()) +{} + +Poller::~Poller() { + delete impl; +} + +}} diff --git a/cpp/src/qpid/sys/posix/EventChannel.cpp b/cpp/src/qpid/sys/posix/EventChannel.cpp index 6db397a165..d35eedf5a5 100644 --- a/cpp/src/qpid/sys/posix/EventChannel.cpp +++ b/cpp/src/qpid/sys/posix/EventChannel.cpp @@ -1,4 +1,4 @@ -/* +/* * * Copyright (c) 2006 The Apache Software Foundation * @@ -16,6 +16,19 @@ * */ +// TODO aconway 2006-12-15: Locking review. + +// TODO aconway 2006-12-15: use Descriptor pointers everywhere, +// get them from channel, pass them to Event constructors. +// Eliminate lookup. + + +#include "EventChannel.h" +#include "check.h" + +#include "qpid/QpidError.h" +#include "qpid/sys/AtomicCount.h" + #include <mqueue.h> #include <string.h> #include <iostream> @@ -29,139 +42,420 @@ #include <queue> #include <boost/ptr_container/ptr_map.hpp> -#include <boost/current_function.hpp> - -#include "qpid/QpidError.h" -#include "qpid/sys/Monitor.h" -#include "qpid/log/Statement.h" - -#include "check.h" -#include "EventChannel.h" +#include <boost/noncopyable.hpp> +#include <boost/bind.hpp> using namespace std; -// Convenience template to zero out a struct. -template <class S> struct ZeroStruct : public S { - ZeroStruct() { memset(this, 0, sizeof(*this)); } -}; - namespace qpid { namespace sys { +// ================================================================ +// Private class declarations + +namespace { + +typedef enum { IN, OUT } Direction; + +typedef std::pair<Event*, Event*> EventPair; + /** - * EventHandler wraps an epoll file descriptor. Acts as private - * interface between EventChannel and subclasses. - * - * Also implements Event interface for events that are not associated - * with a file descriptor and are passed via the message queue. - */ -class EventHandler : public Event, private Monitor + * Template to zero out a C-struct on construction. Avoids uninitialized memory + * warnings from valgrind or other mem checking tool. + */ +template <class T> struct CleanStruct : public T { + CleanStruct() { memset(this, 0, sizeof(*this)); } +}; + +} // namespace + +/** + * Queue of events corresponding to one IO direction (IN or OUT). + * Each Descriptor contains two Queues. + */ +class EventChannel::Queue : private boost::noncopyable { public: - EventHandler(int epollSize = 256); - ~EventHandler(); + Queue(Descriptor& container, Direction dir); - int getEpollFd() { return epollFd; } - void epollAdd(int fd, uint32_t epollEvents, Event* event); - void epollMod(int fd, uint32_t epollEvents, Event* event); - void epollDel(int fd); + /** Called by Event classes in prepare() */ + void push(Event* e); - void mqPut(Event* event); - Event* mqGet(); - - protected: - // Should never be called, only complete. - void prepare(EventHandler&) { assert(0); } - Event* complete(EventHandler& eh); + /** Called when epoll wakes. + *@return The next completed event or 0. + */ + Event* wake(uint32_t epollFlags); + + Event* pop() { Event* e = queue.front(); queue.pop_front(); return e; } + + bool empty() { return queue.empty(); } + + void setBit(uint32_t &epollFlags); + + void shutdown(); private: + typedef std::deque<Event*> EventQ; + + inline bool isMyEvent(uint32_t flags) { return flags | myEvent; } + + Mutex& lock; // Shared with Descriptor. + Descriptor& descriptor; + uint32_t myEvent; // Epoll event flag. + EventQ queue; +}; + + +/** + * Manages a file descriptor in an epoll set. + * + * Can be shutdown and re-activated for the same file descriptor. + */ +class EventChannel::Descriptor : private boost::noncopyable { + public: + explicit Descriptor(int fd) : epollFd(-1), myFd(fd), + inQueue(*this, IN), outQueue(*this, OUT) {} + + void activate(int epollFd_); + + /** Epoll woke up for this descriptor. */ + Event* wake(uint32_t epollEvents); + + /** Shut down: close and remove file descriptor. + * May be re-activated if fd is reused. + */ + void shutdown(); + + // TODO aconway 2006-12-18: Nasty. Need to clean up interaction. + void shutdownUnsafe(); + + bool isShutdown() { return epollFd == -1; } + + Queue& getQueue(Direction d) { return d==IN ? inQueue : outQueue; } + int getFD() const { return myFd; } + + private: + void update(); + void epollCtl(int op, uint32_t events); + Queue* pick(); + + Mutex lock; int epollFd; - std::string mqName; - int mqFd; - std::queue<Event*> mqEvents; + int myFd; + Queue inQueue, outQueue; + bool preferIn; + + friend class Queue; }; -EventHandler::EventHandler(int epollSize) -{ - epollFd = epoll_create(epollSize); - if (epollFd < 0) throw QPID_POSIX_ERROR(errno); + +/** + * Holds a map of Descriptors, which do most of the work. + */ +class EventChannel::Impl { + public: + Impl(int size = 256); + + ~Impl(); + + /** + * Activate descriptor + */ + void activate(Descriptor& d) { + d.activate(epollFd); + } + + /** Wait for an event, return 0 on timeout */ + Event* wait(Duration timeout); - // Create a POSIX message queue for non-fd events. - // We write one byte and never read it is always ready for read - // when we add it to epoll. + void shutdown(); + + private: + + Monitor monitor; + int epollFd; + int shutdownPipe[2]; + AtomicCount nWaiters; + bool isShutdown; +}; + + +// ================================================================ +// EventChannel::Queue::implementation. + +static const char* shutdownMsg = "Event queue shut down."; + +EventChannel::Queue::Queue(Descriptor& d, Direction dir) : + lock(d.lock), descriptor(d), + myEvent(dir==IN ? EPOLLIN : EPOLLOUT) +{} + +void EventChannel::Queue::push(Event* e) { + Mutex::ScopedLock l(lock); + if (descriptor.isShutdown()) + THROW_QPID_ERROR(INTERNAL_ERROR, shutdownMsg); + queue.push_back(e); + descriptor.update(); +} + +void EventChannel::Queue::setBit(uint32_t &epollFlags) { + if (queue.empty()) + epollFlags &= ~myEvent; + else + epollFlags |= myEvent; +} + +// TODO aconway 2006-12-20: REMOVE +Event* EventChannel::Queue::wake(uint32_t epollFlags) { + // Called with lock held. + if (!queue.empty() && (isMyEvent(epollFlags))) { + assert(!queue.empty()); + Event* e = queue.front(); + assert(e); + if (!e->getException()) { + // TODO aconway 2006-12-20: Can/should we move event completion + // out into dispatch() so it doesn't happen in Descriptor locks? + e->complete(descriptor); + } + queue.pop_front(); + return e; + } + return 0; +} + +void EventChannel::Queue::shutdown() { + // Mark all pending events with a shutdown exception. + // The server threads will remove and dispatch the events. // - ZeroStruct<struct mq_attr> attr; - attr.mq_maxmsg = 1; - attr.mq_msgsize = 1; - do { - char tmpnam[L_tmpnam]; - tmpnam_r(tmpnam); - mqName = tmpnam + 4; // Skip "tmp/" - mqFd = mq_open( - mqName.c_str(), O_CREAT|O_EXCL|O_RDWR|O_NONBLOCK, S_IRWXU, &attr); - if (mqFd < 0) throw QPID_POSIX_ERROR(errno); - } while (mqFd == EEXIST); // Name already taken, try again. + qpid::QpidError ex(INTERNAL_ERROR, shutdownMsg, SRCLINE); + for_each(queue.begin(), queue.end(), + boost::bind(&Event::setException, _1, ex)); +} - static char zero = '\0'; - mq_send(mqFd, &zero, 1, 0); - epollAdd(mqFd, 0, this); + +// ================================================================ +// Descriptor + + +void EventChannel::Descriptor::activate(int epollFd_) { + Mutex::ScopedLock l(lock); + if (isShutdown()) { + epollFd = epollFd_; // We're back in business. + epollCtl(EPOLL_CTL_ADD, 0); + } } -EventHandler::~EventHandler() { - mq_close(mqFd); - mq_unlink(mqName.c_str()); +void EventChannel::Descriptor::shutdown() { + Mutex::ScopedLock l(lock); + shutdownUnsafe(); } -void EventHandler::mqPut(Event* event) { - ScopedLock l(*this); - assert(event != 0); - mqEvents.push(event); - epollMod(mqFd, EPOLLIN|EPOLLONESHOT, this); +void EventChannel::Descriptor::shutdownUnsafe() { + // Caller holds lock. + ::close(myFd); + epollFd = -1; // Mark myself as shutdown. + inQueue.shutdown(); + outQueue.shutdown(); +} + +// TODO aconway 2006-12-20: Inline into wake(). +void EventChannel::Descriptor::update() { + // Caller holds lock. + if (isShutdown()) // Nothing to do + return; + uint32_t events = EPOLLONESHOT | EPOLLERR | EPOLLHUP; + inQueue.setBit(events); + outQueue.setBit(events); + epollCtl(EPOLL_CTL_MOD, events); +} + +void EventChannel::Descriptor::epollCtl(int op, uint32_t events) { + // Caller holds lock + assert(!isShutdown()); + CleanStruct<epoll_event> ee; + ee.data.ptr = this; + ee.events = events; + int status = ::epoll_ctl(epollFd, op, myFd, &ee); + if (status < 0) { + if (errno == EEXIST) // It's okay to add an existing fd + return; + else if (errno == EBADF) // FD was closed externally. + shutdownUnsafe(); + else + throw QPID_POSIX_ERROR(errno); + } } + -Event* EventHandler::mqGet() { - ScopedLock l(*this); - if (mqEvents.empty()) +EventChannel::Queue* EventChannel::Descriptor::pick() { + if (inQueue.empty() && outQueue.empty()) return 0; - Event* event = mqEvents.front(); - mqEvents.pop(); - if(!mqEvents.empty()) - epollMod(mqFd, EPOLLIN|EPOLLONESHOT, this); - return event; + if (inQueue.empty() || outQueue.empty()) + return !inQueue.empty() ? &inQueue : &outQueue; + // Neither is empty, pick fairly. + preferIn = !preferIn; + return preferIn ? &inQueue : &outQueue; } -void EventHandler::epollAdd(int fd, uint32_t epollEvents, Event* event) -{ - ZeroStruct<struct epoll_event> ee; - ee.data.ptr = event; - ee.events = epollEvents; - if (epoll_ctl(epollFd, EPOLL_CTL_ADD, fd, &ee) < 0) - throw QPID_POSIX_ERROR(errno); +Event* EventChannel::Descriptor::wake(uint32_t epollEvents) { + Mutex::ScopedLock l(lock); + // On error, shut down the Descriptor and both queues. + if (epollEvents & (EPOLLERR | EPOLLHUP)) { + shutdownUnsafe(); + // TODO aconway 2006-12-20: This error handling models means + // that any error reported by epoll will result in a shutdown + // exception on the events. Can we get more accurate error + // reporting somehow? + } + Queue*q = 0; + bool in = (epollEvents & EPOLLIN); + bool out = (epollEvents & EPOLLOUT); + if ((in && out) || isShutdown()) + q = pick(); // Choose fairly, either non-empty queue. + else if (in) + q = &inQueue; + else if (out) + q = &outQueue; + Event* e = (q && !q->empty()) ? q->pop() : 0; + update(); + if (e) + e->complete(*this); + return e; } -void EventHandler::epollMod(int fd, uint32_t epollEvents, Event* event) + + +// ================================================================ +// EventChannel::Impl + + +EventChannel::Impl::Impl(int epollSize): + epollFd(-1), isShutdown(false) { - ZeroStruct<struct epoll_event> ee; - ee.data.ptr = event; - ee.events = epollEvents; - if (epoll_ctl(epollFd, EPOLL_CTL_MOD, fd, &ee) < 0) - throw QPID_POSIX_ERROR(errno); + // Create the epoll file descriptor. + epollFd = epoll_create(epollSize); + QPID_POSIX_CHECK(epollFd); + + // Create a pipe and write a single byte. The byte is never + // read so the pipes read fd is always ready for read. + // We activate the FD when there are messages in the queue. + QPID_POSIX_CHECK(::pipe(shutdownPipe)); + static char zero = '\0'; + QPID_POSIX_CHECK(::write(shutdownPipe[1], &zero, 1)); } -void EventHandler::epollDel(int fd) { - if (epoll_ctl(epollFd, EPOLL_CTL_DEL, fd, 0) < 0) - throw QPID_POSIX_ERROR(errno); +EventChannel::Impl::~Impl() { + shutdown(); + ::close(epollFd); + ::close(shutdownPipe[0]); + ::close(shutdownPipe[1]); } -Event* EventHandler::complete(EventHandler& eh) -{ - assert(&eh == this); - Event* event = mqGet(); - return event==0 ? 0 : event->complete(eh); + +void EventChannel::Impl::shutdown() { + Monitor::ScopedLock l(monitor); + if (!isShutdown) { // I'm starting shutdown. + isShutdown = true; + if (nWaiters == 0) + return; + + // TODO aconway 2006-12-20: If I just close the epollFd will + // that wake all threads? If so with what? Would be simpler than: + + CleanStruct<epoll_event> ee; + ee.data.ptr = 0; + ee.events = EPOLLIN; + QPID_POSIX_CHECK( + epoll_ctl(epollFd, EPOLL_CTL_ADD, shutdownPipe[0], &ee)); + } + // Wait for nWaiters to get out. + while (nWaiters > 0) { + monitor.wait(); + } +} + +// TODO aconway 2006-12-20: DEBUG remove +struct epoll { + epoll(uint32_t e) : events(e) { } + uint32_t events; +}; + +#define BIT(X) out << ((e.events & X) ? __STRING(X) "." : "") +ostream& operator << (ostream& out, epoll e) { + out << "epoll_event.events: "; + BIT(EPOLLIN); + BIT(EPOLLPRI); + BIT(EPOLLOUT); + BIT(EPOLLRDNORM); + BIT(EPOLLRDBAND); + BIT(EPOLLWRNORM); + BIT(EPOLLWRBAND); + BIT(EPOLLMSG); + BIT(EPOLLERR); + BIT(EPOLLHUP); + BIT(EPOLLONESHOT); + BIT(EPOLLET); + return out; } + + +/** + * Wait for epoll to wake up, return the descriptor or 0 on timeout. + */ +Event* EventChannel::Impl::wait(Duration timeoutNs) +{ + { + Monitor::ScopedLock l(monitor); + if (isShutdown) + throw ShutdownException(); + } + + // Increase nWaiters for the duration, notify the monitor if I'm + // the last one out. + // + AtomicCount::ScopedIncrement si( + nWaiters, boost::bind(&Monitor::notifyAll, &monitor)); + + // No lock, all thread safe calls or local variables: + // + const long timeoutMs = + (timeoutNs == TIME_INFINITE) ? -1 : timeoutNs/TIME_MSEC; + CleanStruct<epoll_event> ee; + Event* event = 0; + + // Loop till we get a completed event. Some events may repost + // themselves and return 0, e.g. incomplete read or write events. + //TODO aconway 2006-12-20: FIX THIS! + while (!event) { + int n = ::epoll_wait(epollFd, &ee, 1, timeoutMs); // Thread safe. + if (n == 0) // Timeout + return 0; + if (n < 0 && errno == EINTR) // Interrupt, ignore it. + continue; + if (n < 0) + throw QPID_POSIX_ERROR(errno); + assert(n == 1); + Descriptor* ed = + reinterpret_cast<Descriptor*>(ee.data.ptr); + if (ed == 0) // We're being shut-down. + throw ShutdownException(); + assert(ed != 0); + event = ed->wake(ee.events); + } + return event; +} + +//EventChannel::Descriptor& EventChannel::Impl::getDescriptor(int fd) { +// Mutex::ScopedLock l(monitor); +// Descriptor& ed = descriptors[fd]; +// ed.activate(epollFd, fd); +// return ed; +//} + + // ================================================================ // EventChannel @@ -169,157 +463,134 @@ EventChannel::shared_ptr EventChannel::create() { return shared_ptr(new EventChannel()); } -EventChannel::EventChannel() : handler(new EventHandler()) {} +EventChannel::EventChannel() : impl(new EventChannel::Impl()) {} EventChannel::~EventChannel() {} -void EventChannel::postEvent(Event& e) +void EventChannel::post(Event& e) { - e.prepare(*handler); + e.prepare(*impl); } -Event* EventChannel::getEvent() +Event* EventChannel::wait(Duration timeoutNs) { - static const int infiniteTimeout = -1; - ZeroStruct<struct epoll_event> epollEvent; + return impl->wait(timeoutNs); +} - // Loop until we can complete the event. Some events may re-post - // themselves and return 0 from complete, e.g. partial reads. // - Event* event = 0; - while (event == 0) { - int eventCount = epoll_wait(handler->getEpollFd(), - &epollEvent, 1, infiniteTimeout); - if (eventCount < 0) { - if (errno != EINTR) { - QPID_LOG(warn, "Ignoring error: " - << PosixError::getMessage(errno)); - assert(0); - } - } - else if (eventCount == 1) { - event = reinterpret_cast<Event*>(epollEvent.data.ptr); - assert(event != 0); - try { - event = event->complete(*handler); - } - catch (const Exception& e) { - if (event) - event->setError(e); - } - catch (const std::exception& e) { - if (event) - event->setError(e); - } - } - } - return event; +void EventChannel::shutdown() { + impl->shutdown(); } + +// ================================================================ +// Event and subclasses. + Event::~Event() {} -void Event::prepare(EventHandler& handler) -{ - handler.mqPut(this); +Exception::shared_ptr_const Event::getException() const { + return exception; } -bool Event::hasError() const { - return error; +void Event::throwIfException() { + if (getException()) + exception->throwSelf(); } -void Event::throwIfError() throw (Exception) { - if (hasError()) - error.throwSelf(); -} - -Event* Event::complete(EventHandler&) +void Event::dispatch() { - return this; + if (!callback.empty()) + callback(); } -void Event::dispatch() -{ +void Event::setException(const std::exception& e) { + const Exception* ex = dynamic_cast<const Exception*>(&e); + if (ex) + exception.reset(ex->clone().release()); + else + exception.reset(new Exception(e)); +#ifndef NDEBUG + // Throw and re-catch the exception. Has no effect on the + // program but it triggers debuggers watching for throw. The + // context that sets the exception is more informative for + // debugging purposes than the one that ultimately throws it. + // try { - if (!callback.empty()) - callback(); - } catch (const std::exception&) { - throw; - } catch (...) { - throw QPID_ERROR(INTERNAL_ERROR, "Unknown exception."); + throwIfException(); } + catch (...) { } // Ignored. +#endif } -void Event::setError(const ExceptionHolder& e) { - error = e; +int FDEvent::getFDescriptor() const { + return descriptor.getFD(); } -void ReadEvent::prepare(EventHandler& handler) -{ - handler.epollAdd(descriptor, EPOLLIN | EPOLLONESHOT, this); +// TODO: AMS 21/12/06 Don't like the inline new, probably cause a memory leak +ReadEvent::ReadEvent(int fd, void* buf, size_t sz,Callback cb, bool noWait) : + IOEvent(cb, *(new EventChannel::Descriptor(fd)), sz, noWait), buffer(buf), bytesRead(0) { } -ssize_t ReadEvent::doRead() { - ssize_t n = ::read(descriptor, static_cast<char*>(buffer) + received, - size - received); - if (n > 0) received += n; - return n; +void ReadEvent::prepare(EventChannel::Impl& impl) { + EventChannel::Descriptor& d = getDescriptor(); + impl.activate(d); + d.getQueue(IN).push(this); } -Event* ReadEvent::complete(EventHandler& handler) +void ReadEvent::complete(EventChannel::Descriptor& ed) { - // Read as much as possible without blocking. - ssize_t n = doRead(); - while (n > 0 && received < size) doRead(); - - if (received == size) { - handler.epollDel(descriptor); - received = 0; // Reset for re-use. - return this; - } - else if (n <0 && (errno == EAGAIN)) { - // Keep polling for more. - handler.epollMod(descriptor, EPOLLIN | EPOLLONESHOT, this); - return 0; - } - else { - // Unexpected EOF or error. Throw ENODATA for EOF. - handler.epollDel(descriptor); - received = 0; // Reset for re-use. - throw QPID_POSIX_ERROR((n < 0) ? errno : ENODATA); + ssize_t n = ::read(getFDescriptor(), + static_cast<char*>(buffer) + bytesRead, + size - bytesRead); + if (n > 0) + bytesRead += n; + if (n == 0 || (n < 0 && errno != EAGAIN)) { + // Use ENODATA for file closed. + setException(QPID_POSIX_ERROR(n == 0 ? ENODATA : errno)); + ed.shutdownUnsafe(); } } -void WriteEvent::prepare(EventHandler& handler) -{ - handler.epollAdd(descriptor, EPOLLOUT | EPOLLONESHOT, this); +WriteEvent::WriteEvent(int fd, const void* buf, size_t sz, Callback cb) : + IOEvent(cb, *(new EventChannel::Descriptor(fd)), sz, noWait), buffer(buf), bytesWritten(0) { +} + +void WriteEvent::prepare(EventChannel::Impl& impl) { + EventChannel::Descriptor& d = getDescriptor(); + impl.activate(d); + d.getQueue(OUT).push(this); } -Event* WriteEvent::complete(EventHandler& handler) + +void WriteEvent::complete(EventChannel::Descriptor& ed) { - ssize_t n = write(descriptor, static_cast<const char*>(buffer) + written, - size - written); - if (n < 0) throw QPID_POSIX_ERROR(errno); - written += n; - if(written < size) { - // Keep polling. - handler.epollMod(descriptor, EPOLLOUT | EPOLLONESHOT, this); - return 0; + ssize_t n = ::write(getFDescriptor(), + static_cast<const char*>(buffer) + bytesWritten, + size - bytesWritten); + if (n > 0) + bytesWritten += n; + if(n < 0 && errno != EAGAIN) { + setException(QPID_POSIX_ERROR(errno)); + ed.shutdownUnsafe(); // Called with lock held. } - written = 0; // Reset for re-use. - handler.epollDel(descriptor); - return this; } -void AcceptEvent::prepare(EventHandler& handler) -{ - handler.epollAdd(descriptor, EPOLLIN | EPOLLONESHOT, this); +AcceptEvent::AcceptEvent(int fd, Callback cb) : + FDEvent(cb, *(new EventChannel::Descriptor(fd))), accepted(0) { +} + +void AcceptEvent::prepare(EventChannel::Impl& impl) { + EventChannel::Descriptor& d = getDescriptor(); + impl.activate(d); + d.getQueue(IN).push(this); } -Event* AcceptEvent::complete(EventHandler& handler) +void AcceptEvent::complete(EventChannel::Descriptor& ed) { - handler.epollDel(descriptor); - accepted = ::accept(descriptor, 0, 0); - if (accepted < 0) throw QPID_POSIX_ERROR(errno); - return this; + accepted = ::accept(getFDescriptor(), 0, 0); + if (accepted < 0) { + setException(QPID_POSIX_ERROR(errno)); + ed.shutdownUnsafe(); // Called with lock held. + } } }} diff --git a/cpp/src/qpid/sys/posix/EventChannel.h b/cpp/src/qpid/sys/posix/EventChannel.h index f465580996..85e121379a 100644 --- a/cpp/src/qpid/sys/posix/EventChannel.h +++ b/cpp/src/qpid/sys/posix/EventChannel.h @@ -20,7 +20,10 @@ */ #include "qpid/SharedObject.h" -#include "qpid/ExceptionHolder.h" +#include "qpid/Exception.h" +#include "qpid/sys/Monitor.h" +#include "qpid/sys/Time.h" + #include <boost/function.hpp> #include <memory> @@ -28,11 +31,57 @@ namespace qpid { namespace sys { class Event; -class EventHandler; -class EventChannel; + +/** + * Channel to post and wait for events. + */ +class EventChannel : public qpid::SharedObject<EventChannel> +{ + public: + static shared_ptr create(); + + /** Exception throw from wait() if channel is shut down. */ + class ShutdownException : public qpid::Exception {}; + + ~EventChannel(); + + /** Post an event to the channel. */ + void post(Event& event); + + /** + * Wait for the next complete event, up to timeout. + *@return Pointer to event or 0 if timeout elapses. + *@exception ShutdownException if the channel is shut down. + */ + Event* wait(Duration timeout = TIME_INFINITE); + + /** + * Shut down the event channel. + * Blocks till all threads have exited wait() + */ + void shutdown(); + + + // Internal classes. + class Impl; + class Queue; + class Descriptor; + + private: + + EventChannel(); + + Mutex lock; + boost::shared_ptr<Impl> impl; +}; /** * Base class for all Events. + * + * Derived classes define events representing various async IO operations. + * When an event is complete, it is returned by the EventChannel to + * a thread calling wait. The thread will call Event::dispatch() to + * execute code associated with event completion. */ class Event { @@ -40,135 +89,124 @@ class Event /** Type for callback when event is dispatched */ typedef boost::function0<void> Callback; - /** - * Create an event with optional callback. - * Instances of Event are sent directly through the channel. - * Derived classes define additional waiting behaviour. - *@param cb A callback functor that is invoked when dispatch() is called. - */ - Event(Callback cb = 0) : callback(cb) {} - virtual ~Event(); /** Call the callback provided to the constructor, if any. */ void dispatch(); - /** True if there was an error processing this event */ - bool hasError() const; + /** + *If there was an exception processing this Event, return it. + *@return 0 if there was no exception. + */ + qpid::Exception::shared_ptr_const getException() const; + + /** If getException() throw the corresponding exception. */ + void throwIfException(); - /** If hasError() throw the corresponding exception. */ - void throwIfError() throw(Exception); + /** Set the dispatch callback. */ + void setCallback(Callback cb) { callback = cb; } + + /** Set the exception. */ + void setException(const std::exception& e); protected: - virtual void prepare(EventHandler&); - virtual Event* complete(EventHandler&); - void setError(const ExceptionHolder& e); + Event(Callback cb=0) : callback(cb) {} + + virtual void prepare(EventChannel::Impl&) = 0; + virtual void complete(EventChannel::Descriptor&) = 0; Callback callback; - ExceptionHolder error; + Exception::shared_ptr_const exception; friend class EventChannel; - friend class EventHandler; + friend class EventChannel::Queue; }; -template <class BufT> -class IOEvent : public Event { +/** Base class for events related to a file descriptor */ +class FDEvent : public Event { + public: + EventChannel::Descriptor& getDescriptor() const { return descriptor; } + int getFDescriptor() const; + + protected: + FDEvent(Callback cb, EventChannel::Descriptor& fd) + : Event(cb), descriptor(fd) {} + // TODO AMS: 1/6/07 I really don't think this is correct, but + // the descriptor is immutable + FDEvent& operator=(const FDEvent& rhs) { Event::operator=(rhs); return *this; } + + private: + EventChannel::Descriptor& descriptor; +}; + +/** Base class for read or write events. */ +class IOEvent : public FDEvent { public: - void getDescriptor() const { return descriptor; } size_t getSize() const { return size; } - BufT getBuffer() const { return buffer; } - + protected: - IOEvent(int fd, Callback cb, size_t sz, BufT buf) : - Event(cb), descriptor(fd), buffer(buf), size(sz) {} + IOEvent(Callback cb, EventChannel::Descriptor& fd, size_t sz, bool noWait_) : + FDEvent(cb, fd), size(sz), noWait(noWait_) {} - int descriptor; - BufT buffer; size_t size; + bool noWait; }; /** Asynchronous read event */ -class ReadEvent : public IOEvent<void*> +class ReadEvent : public IOEvent { public: - explicit ReadEvent(int fd=-1, void* buf=0, size_t sz=0, Callback cb=0) : - IOEvent<void*>(fd, cb, sz, buf), received(0) {} + explicit ReadEvent(int fd, void* buf=0, size_t sz=0,Callback cb=0, bool noWait=false); + void* getBuffer() const { return buffer; } + size_t getBytesRead() const { return bytesRead; } + private: - void prepare(EventHandler&); - Event* complete(EventHandler&); + void prepare(EventChannel::Impl&); + void complete(EventChannel::Descriptor&); ssize_t doRead(); - size_t received; + void* buffer; + size_t bytesRead; }; /** Asynchronous write event */ -class WriteEvent : public IOEvent<const void*> +class WriteEvent : public IOEvent { public: - explicit WriteEvent(int fd=-1, const void* buf=0, size_t sz=0, - Callback cb=0) : - IOEvent<const void*>(fd, cb, sz, buf), written(0) {} + explicit WriteEvent(int fd, const void* buf=0, size_t sz=0, Callback cb=0); - protected: - void prepare(EventHandler&); - Event* complete(EventHandler&); + const void* getBuffer() const { return buffer; } + size_t getBytesWritten() const { return bytesWritten; } private: + void prepare(EventChannel::Impl&); + void complete(EventChannel::Descriptor&); ssize_t doWrite(); - size_t written; + + const void* buffer; + size_t bytesWritten; }; + /** Asynchronous socket accept event */ -class AcceptEvent : public Event +class AcceptEvent : public FDEvent { public: /** Accept a connection on fd. */ - explicit AcceptEvent(int fd=-1, Callback cb=0) : - Event(cb), descriptor(fd), accepted(0) {} - - /** Get descriptor for server socket */ + explicit AcceptEvent(int fd, Callback cb=0); + + /** Get descriptor for accepted server socket */ int getAcceptedDesscriptor() const { return accepted; } private: - void prepare(EventHandler&); - Event* complete(EventHandler&); + void prepare(EventChannel::Impl&); + void complete(EventChannel::Descriptor&); - int descriptor; int accepted; }; -class QueueSet; - -/** - * Channel to post and wait for events. - */ -class EventChannel : public qpid::SharedObject<EventChannel> -{ - public: - static shared_ptr create(); - - ~EventChannel(); - - /** Post an event to the channel. */ - void postEvent(Event& event); - - /** Post an event to the channel. Must not be 0. */ - void postEvent(Event* event) { postEvent(*event); } - - /** - * Wait for the next complete event. - *@return Pointer to event. Will never return 0. - */ - Event* getEvent(); - - private: - EventChannel(); - boost::shared_ptr<EventHandler> handler; -}; - - }} diff --git a/cpp/src/qpid/sys/posix/EventChannelAcceptor.cpp b/cpp/src/qpid/sys/posix/EventChannelAcceptor.cpp index d5a2c238d9..1a5fceb56e 100644 --- a/cpp/src/qpid/sys/posix/EventChannelAcceptor.cpp +++ b/cpp/src/qpid/sys/posix/EventChannelAcceptor.cpp @@ -106,7 +106,7 @@ void EventChannelAcceptor::run(ConnectionInputHandlerFactory* f) { if (!isRunning && !isShutdown) { isRunning = true; factory = f; - threads->postEvent(acceptEvent); + threads->post(acceptEvent); } } threads->join(); // Wait for shutdown. @@ -120,7 +120,7 @@ void EventChannelAcceptor::shutdown() { isShutdown = true; } if (doShutdown) { - ::close(acceptEvent.getDescriptor()); + ::close(acceptEvent.getFDescriptor()); threads->shutdown(); for_each(connections.begin(), connections.end(), boost::bind(&EventChannelConnection::close, _1)); @@ -139,11 +139,11 @@ void EventChannelAcceptor::accept() shutdown(); return; } - // TODO aconway 2006-11-29: Need to reap closed connections also. int fd = acceptEvent.getAcceptedDesscriptor(); + threads->post(acceptEvent); // Keep accepting. + // TODO aconway 2006-11-29: Need to reap closed connections also. connections.push_back( new EventChannelConnection(threads, *factory, fd, fd, isTrace)); - threads->postEvent(acceptEvent); // Keep accepting. } }} // namespace qpid::sys diff --git a/cpp/src/qpid/sys/posix/EventChannelConnection.cpp b/cpp/src/qpid/sys/posix/EventChannelConnection.cpp index 0c1c81b6fe..a36f096a4d 100644 --- a/cpp/src/qpid/sys/posix/EventChannelConnection.cpp +++ b/cpp/src/qpid/sys/posix/EventChannelConnection.cpp @@ -24,7 +24,6 @@ #include "EventChannelConnection.h" #include "qpid/sys/ConnectionInputHandlerFactory.h" #include "qpid/QpidError.h" -#include "qpid/log/Statement.h" using namespace std; using namespace qpid; @@ -44,6 +43,8 @@ EventChannelConnection::EventChannelConnection( ) : readFd(rfd), writeFd(wfd ? wfd : rfd), + readEvent(readFd), + writeEvent(writeFd), readCallback(boost::bind(&EventChannelConnection::closeOnException, this, &EventChannelConnection::endInitRead)), @@ -55,8 +56,8 @@ EventChannelConnection::EventChannelConnection( out(bufferSize), isTrace(isTrace_) { - BOOST_ASSERT(readFd > 0); - BOOST_ASSERT(writeFd > 0); + assert(readFd > 0); + assert(writeFd > 0); closeOnException(&EventChannelConnection::startRead); } @@ -133,14 +134,17 @@ void EventChannelConnection::startWrite() { } // No need to lock here - only one thread can be writing at a time. out.clear(); - QPID_LOG(trace, "Send on socket " << writeFd << ": " << *frame); + if (isTrace) + cout << "Send on socket " << writeFd << ": " << *frame << endl; frame->encode(out); out.flip(); + // TODO: AMS 1/6/07 This only works because we already have the correct fd + // in the descriptor - change not to use assigment writeEvent = WriteEvent( writeFd, out.start(), out.available(), boost::bind(&EventChannelConnection::closeOnException, this, &EventChannelConnection::endWrite)); - threads->postEvent(writeEvent); + threads->post(writeEvent); } // ScopedBusy ctor increments busyThreads. @@ -161,12 +165,18 @@ void EventChannelConnection::endWrite() { ScopedBusy(*this); { Monitor::ScopedLock lock(monitor); + assert(isWriting); isWriting = false; - if (isClosed) + if (isClosed) return; writeEvent.throwIfException(); + if (writeEvent.getBytesWritten() < writeEvent.getSize()) { + // Keep writing the current event till done. + isWriting = true; + threads->post(writeEvent); + } } - // Check if there's more in to write in the write queue. + // Continue writing from writeFrames queue. startWrite(); } @@ -179,8 +189,8 @@ void EventChannelConnection::endWrite() { void EventChannelConnection::startRead() { // Non blocking read, as much as we can swallow. readEvent = ReadEvent( - readFd, in.start(), in.available(), readCallback,true); - threads->postEvent(readEvent); + readFd, in.start(), in.available(), readCallback); + threads->post(readEvent); } // Completion of initial read, expect protocolInit. @@ -194,7 +204,7 @@ void EventChannelConnection::endInitRead() { in.flip(); ProtocolInitiation protocolInit; if(protocolInit.decode(in)){ - handler->initiated(&protocolInit); + handler->initiated(protocolInit); readCallback = boost::bind( &EventChannelConnection::closeOnException, this, &EventChannelConnection::endRead); @@ -215,8 +225,10 @@ void EventChannelConnection::endRead() { in.flip(); AMQFrame frame; while (frame.decode(in)) { - QPID_LOG(trace, "Received on socket " << readFd - << ": " << frame); + // TODO aconway 2006-11-30: received should take Frame& + if (isTrace) + cout << "Received on socket " << readFd + << ": " << frame << endl; handler->received(&frame); } in.compact(); diff --git a/cpp/src/qpid/sys/posix/EventChannelConnection.h b/cpp/src/qpid/sys/posix/EventChannelConnection.h index bd010a4240..394df55fd9 100644 --- a/cpp/src/qpid/sys/posix/EventChannelConnection.h +++ b/cpp/src/qpid/sys/posix/EventChannelConnection.h @@ -34,7 +34,7 @@ namespace sys { class ConnectionInputHandlerFactory; /** - * Implements ConnectionOutputHandler and delegates to a ConnectionInputHandler + * Implements SessionContext and delegates to a SessionHandler * for a connection via the EventChannel. *@param readDescriptor file descriptor for reading. *@param writeDescriptor file descriptor for writing, @@ -50,7 +50,7 @@ class EventChannelConnection : public ConnectionOutputHandler { bool isTrace = false ); - // TODO aconway 2006-11-30: ConnectionOutputHandler::send should take auto_ptr + // TODO aconway 2006-11-30: SessionContext::send should take auto_ptr virtual void send(qpid::framing::AMQFrame* frame) { send(std::auto_ptr<qpid::framing::AMQFrame>(frame)); } diff --git a/cpp/src/qpid/sys/posix/EventChannelThreads.cpp b/cpp/src/qpid/sys/posix/EventChannelThreads.cpp index 68c57405d5..70954d0c16 100644 --- a/cpp/src/qpid/sys/posix/EventChannelThreads.cpp +++ b/cpp/src/qpid/sys/posix/EventChannelThreads.cpp @@ -16,27 +16,40 @@ * */ -#include "EventChannelThreads.h" -#include "qpid/sys/Runnable.h" -#include "qpid/log/Statement.h" #include <iostream> -using namespace std; +#include <limits> + #include <boost/bind.hpp> +#include "qpid/sys/Runnable.h" + +#include "EventChannelThreads.h" + namespace qpid { namespace sys { +const size_t EventChannelThreads::unlimited = + std::numeric_limits<size_t>::max(); + EventChannelThreads::shared_ptr EventChannelThreads::create( - EventChannel::shared_ptr ec) + EventChannel::shared_ptr ec, size_t min, size_t max +) { - return EventChannelThreads::shared_ptr(new EventChannelThreads(ec)); + return EventChannelThreads::shared_ptr( + new EventChannelThreads(ec, min, max)); } -EventChannelThreads::EventChannelThreads(EventChannel::shared_ptr ec) : - channel(ec), nWaiting(0), state(RUNNING) +EventChannelThreads::EventChannelThreads( + EventChannel::shared_ptr ec, size_t min, size_t max) : + minThreads(std::max(size_t(1), min)), + maxThreads(std::min(min, max)), + channel(ec), + nWaiting(0), + state(RUNNING) { - // TODO aconway 2006-11-15: Estimate initial threads based on CPUs. - addThread(); + Monitor::ScopedLock l(monitor); + while (workers.size() < minThreads) + workers.push_back(Thread(*this)); } EventChannelThreads::~EventChannelThreads() { @@ -46,32 +59,30 @@ EventChannelThreads::~EventChannelThreads() { void EventChannelThreads::shutdown() { - ScopedLock lock(*this); + Monitor::ScopedLock lock(monitor); if (state != RUNNING) // Already shutting down. return; - for (size_t i = 0; i < workers.size(); ++i) { - channel->postEvent(terminate); - } - state = TERMINATE_SENT; - notify(); // Wake up one join() thread. + state = TERMINATING; + channel->shutdown(); + monitor.notify(); // Wake up one join() thread. } void EventChannelThreads::join() { { - ScopedLock lock(*this); + Monitor::ScopedLock lock(monitor); while (state == RUNNING) // Wait for shutdown to start. - wait(); + monitor.wait(); if (state == SHUTDOWN) // Shutdown is complete return; if (state == JOINING) { // Someone else is doing the join. while (state != SHUTDOWN) - wait(); + monitor.wait(); return; } // I'm the joining thread - assert(state == TERMINATE_SENT); + assert(state == TERMINATING); state = JOINING; } // Drop the lock. @@ -80,12 +91,13 @@ void EventChannelThreads::join() workers[i].join(); } state = SHUTDOWN; - notifyAll(); // Notify other join() threaeds. + monitor.notifyAll(); // Notify any other join() threads. } void EventChannelThreads::addThread() { - ScopedLock l(*this); - workers.push_back(Thread(*this)); + Monitor::ScopedLock l(monitor); + if (workers.size() < maxThreads) + workers.push_back(Thread(*this)); } void EventChannelThreads::run() @@ -94,23 +106,20 @@ void EventChannelThreads::run() AtomicCount::ScopedIncrement inc(nWaiting); try { while (true) { - Event* e = channel->getEvent(); + Event* e = channel->wait(); assert(e != 0); - if (e == &terminate) { - return; - } AtomicCount::ScopedDecrement dec(nWaiting); - // I'm no longer waiting, make sure someone is. - if (dec == 0) + // Make sure there's at least one waiting thread. + if (dec == 0 && state == RUNNING) addThread(); e->dispatch(); } } - catch (const std::exception& e) { - QPID_LOG(error, e.what()); + catch (const EventChannel::ShutdownException& e) { + return; } - catch (...) { - QPID_LOG(error, "unknown exception"); + catch (const std::exception& e) { + Exception::log(e, "Exception in EventChannelThreads::run()"); } } diff --git a/cpp/src/qpid/sys/posix/EventChannelThreads.h b/cpp/src/qpid/sys/posix/EventChannelThreads.h index 245cefe585..19112cf4db 100644 --- a/cpp/src/qpid/sys/posix/EventChannelThreads.h +++ b/cpp/src/qpid/sys/posix/EventChannelThreads.h @@ -18,14 +18,16 @@ * limitations under the License. * */ -#include <vector> +#include "EventChannel.h" #include "qpid/Exception.h" -#include "qpid/sys/Time.h" +#include "qpid/sys/AtomicCount.h" #include "qpid/sys/Monitor.h" #include "qpid/sys/Thread.h" -#include "qpid/sys/AtomicCount.h" -#include "EventChannel.h" +#include "qpid/sys/Time.h" +#include "qpid/sys/Runnable.h" + +#include <vector> namespace qpid { namespace sys { @@ -33,26 +35,33 @@ namespace sys { /** Dynamic thread pool serving an EventChannel. - Threads run a loop { e = getEvent(); e->dispatch(); } + Threads run a loop { e = wait(); e->dispatch(); } The size of the thread pool is automatically adjusted to optimal size. */ class EventChannelThreads : public qpid::SharedObject<EventChannelThreads>, - public sys::Monitor, private sys::Runnable + private sys::Runnable { public: - /** Create the thread pool and start initial threads. */ + /** Constant to represent an unlimited number of threads */ + static const size_t unlimited; + + /** + * Create the thread pool and start initial threads. + * @param minThreads Pool will initialy contain minThreads threads and + * will never shrink to less until shutdown. + * @param maxThreads Pool will never grow to more than maxThreads. + */ static EventChannelThreads::shared_ptr create( - EventChannel::shared_ptr channel + EventChannel::shared_ptr channel = EventChannel::create(), + size_t minThreads = 1, + size_t maxThreads = unlimited ); ~EventChannelThreads(); /** Post event to the underlying channel */ - void postEvent(Event& event) { channel->postEvent(event); } - - /** Post event to the underlying channel Must not be 0. */ - void postEvent(Event* event) { channel->postEvent(event); } + void post(Event& event) { channel->post(event); } /** * Terminate all threads. @@ -68,21 +77,25 @@ class EventChannelThreads : private: typedef std::vector<sys::Thread> Threads; typedef enum { - RUNNING, TERMINATE_SENT, JOINING, SHUTDOWN + RUNNING, TERMINATING, JOINING, SHUTDOWN } State; - EventChannelThreads(EventChannel::shared_ptr underlyingChannel); + EventChannelThreads( + EventChannel::shared_ptr channel, size_t min, size_t max); + void addThread(); void run(); bool keepRunning(); void adjustThreads(); + Monitor monitor; + size_t minThreads; + size_t maxThreads; EventChannel::shared_ptr channel; Threads workers; sys::AtomicCount nWaiting; State state; - Event terminate; }; diff --git a/cpp/src/qpid/sys/posix/check.h b/cpp/src/qpid/sys/posix/check.h index fe53321e27..7fa7b69d3b 100644 --- a/cpp/src/qpid/sys/posix/check.h +++ b/cpp/src/qpid/sys/posix/check.h @@ -43,7 +43,7 @@ class PosixError : public qpid::QpidError int getErrNo() { return errNo; } - Exception* clone() const throw() { return new PosixError(*this); } + Exception::auto_ptr clone() const throw() { return Exception::auto_ptr(new PosixError(*this)); } void throwSelf() const { throw *this; } @@ -56,6 +56,10 @@ class PosixError : public qpid::QpidError /** Create a PosixError for the current file/line and errno. */ #define QPID_POSIX_ERROR(errNo) ::qpid::sys::PosixError(errNo, SRCLINE) +/** Throw QPID_POSIX_ERROR(errno) if RESULT is less than zero */ +#define QPID_POSIX_CHECK(RESULT) \ + if ((RESULT) < 0) throw QPID_POSIX_ERROR((errno)) + /** Throw a posix error if errNo is non-zero */ #define QPID_POSIX_THROW_IF(ERRNO) \ if ((ERRNO) != 0) throw QPID_POSIX_ERROR((ERRNO)) diff --git a/cpp/src/tests/DispatcherTest.cpp b/cpp/src/tests/DispatcherTest.cpp new file mode 100644 index 0000000000..7631956acc --- /dev/null +++ b/cpp/src/tests/DispatcherTest.cpp @@ -0,0 +1,128 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/sys/Poller.h" +#include "qpid/sys/Dispatcher.h" +#include "qpid/sys/Thread.h" + +#include <sys/types.h> +#include <sys/socket.h> +#include <fcntl.h> +#include <unistd.h> +#include <errno.h> + +#include <iostream> +#include <boost/bind.hpp> + +using namespace std; +using namespace qpid::sys; + +int writeALot(int fd, const string& s) { + int bytesWritten = 0; + do { + errno = 0; + int lastWrite = ::write(fd, s.c_str(), s.size()); + if ( lastWrite >= 0) { + bytesWritten += lastWrite; + } + } while (errno != EAGAIN); + return bytesWritten; +} + +int readALot(int fd) { + int bytesRead = 0; + char buf[10240]; + + do { + errno = 0; + int lastRead = ::read(fd, buf, sizeof(buf)); + if ( lastRead >= 0) { + bytesRead += lastRead; + } + } while (errno != EAGAIN); + return bytesRead; +} + +int64_t writtenBytes = 0; +int64_t readBytes = 0; + +void writer(DispatchHandle& h, int fd, const string& s) { + writtenBytes += writeALot(fd, s); + h.rewatch(); +} + +void reader(DispatchHandle& h, int fd) { + readBytes += readALot(fd); + h.rewatch(); +} + +int main(int argc, char** argv) +{ + // Create poller + Poller::shared_ptr poller(new Poller); + + // Create dispatcher thread + Dispatcher d(poller); + Dispatcher d1(poller); + //Dispatcher d2(poller); + //Dispatcher d3(poller); + Thread dt(d); + Thread dt1(d1); + //Thread dt2(d2); + //Thread dt3(d3); + + // Setup sender and receiver + int sv[2]; + int rc = ::socketpair(AF_LOCAL, SOCK_STREAM, 0, sv); + assert(rc >= 0); + + // Set non-blocking + rc = ::fcntl(sv[0], F_SETFL, O_NONBLOCK); + assert(rc >= 0); + + rc = ::fcntl(sv[1], F_SETFL, O_NONBLOCK); + assert(rc >= 0); + + // Make up a large string + string testString = "This is only a test ... 1,2,3,4,5,6,7,8,9,10;"; + for (int i = 0; i < 8; i++) + testString += testString; + + DispatchHandle rh(sv[0], boost::bind(reader, _1, sv[0]), 0); + DispatchHandle wh(sv[1], 0, boost::bind(writer, _1, sv[1], testString)); + + rh.watch(poller); + wh.watch(poller); + + // wait 2 minutes then shutdown + sleep(60); + + poller->shutdown(); + dt.join(); + dt1.join(); + //dt2.join(); + //dt3.join(); + + cout << "Wrote: " << writtenBytes << "\n"; + cout << "Read: " << readBytes << "\n"; + + return 0; +} diff --git a/cpp/src/tests/PollerTest.cpp b/cpp/src/tests/PollerTest.cpp new file mode 100644 index 0000000000..fcb1d0dadf --- /dev/null +++ b/cpp/src/tests/PollerTest.cpp @@ -0,0 +1,164 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +/** + * Use socketpair to test the poller + */ + +#include "qpid/sys/Poller.h" + +#include <string> +#include <iostream> +#include <memory> +#include <exception> + +#include <assert.h> + +#include <sys/types.h> +#include <sys/socket.h> +#include <fcntl.h> +#include <unistd.h> +#include <errno.h> + +using namespace std; +using namespace qpid::sys; + +int writeALot(int fd, const string& s) { + int bytesWritten = 0; + do { + errno = 0; + int lastWrite = ::write(fd, s.c_str(), s.size()); + if ( lastWrite >= 0) { + bytesWritten += lastWrite; + } + } while (errno != EAGAIN); + return bytesWritten; +} + +int readALot(int fd) { + int bytesRead = 0; + char buf[1024]; + + do { + errno = 0; + int lastRead = ::read(fd, buf, sizeof(buf)); + if ( lastRead >= 0) { + bytesRead += lastRead; + } + } while (errno != EAGAIN); + return bytesRead; +} + +int main(int argc, char** argv) +{ + try + { + int sv[2]; + int rc = ::socketpair(AF_LOCAL, SOCK_STREAM, 0, sv); + assert(rc >= 0); + + // Set non-blocking + rc = ::fcntl(sv[0], F_SETFL, O_NONBLOCK); + assert(rc >= 0); + + rc = ::fcntl(sv[1], F_SETFL, O_NONBLOCK); + assert(rc >= 0); + + // Make up a large string + string testString = "This is only a test ... 1,2,3,4,5,6,7,8,9,10;"; + for (int i = 0; i < 6; i++) + testString += testString; + + // Read as much as we can from socket 0 + int bytesRead = readALot(sv[0]); + assert(bytesRead == 0); + cout << "Read(0): " << bytesRead << " bytes\n"; + + // Write as much as we can to socket 0 + int bytesWritten = writeALot(sv[0], testString); + cout << "Wrote(0): " << bytesWritten << " bytes\n"; + + // Read as much as we can from socket 1 + bytesRead = readALot(sv[1]); + assert(bytesRead == bytesWritten); + cout << "Read(1): " << bytesRead << " bytes\n"; + + auto_ptr<Poller> poller(new Poller); + + PollerHandle h0(sv[0]); + PollerHandle h1(sv[1]); + + poller->addFd(h0, Poller::INOUT); + + // Wait for 500ms - h0 should be writable + Poller::Event event = poller->wait(); + assert(event.handle == &h0); + assert(event.dir == Poller::OUT); + + // Write as much as we can to socket 0 + bytesWritten = writeALot(sv[0], testString); + cout << "Wrote(0): " << bytesWritten << " bytes\n"; + + // Wait for 500ms - h0 no longer writable + poller->rearmFd(h0); + event = poller->wait(500000000); + assert(event.handle == 0); + + // Test we can read it all now + poller->addFd(h1, Poller::INOUT); + event = poller->wait(); + assert(event.handle == &h1); + assert(event.dir == Poller::INOUT); + + bytesRead = readALot(sv[1]); + assert(bytesRead == bytesWritten); + cout << "Read(1): " << bytesRead << " bytes\n"; + + // At this point h1 should have been disabled from the poller + // (as it was just returned) and h0 can write again + event = poller->wait(); + assert(event.handle == &h0); + assert(event.dir == Poller::OUT); + + // Now both the handles should be disabled + event = poller->wait(500000000); + assert(event.handle == 0); + + // Test shutdown + poller->shutdown(); + event = poller->wait(); + assert(event.handle == 0); + assert(event.dir == Poller::SHUTDOWN); + + event = poller->wait(); + assert(event.handle == 0); + assert(event.dir == Poller::SHUTDOWN); + + poller->delFd(h1); + poller->delFd(h0); + + return 0; + } catch (exception& e) { + cout << "Caught exception " << e.what() << "\n"; + } +} + + |