diff options
Diffstat (limited to 'cpp/lib/common')
26 files changed, 825 insertions, 573 deletions
diff --git a/cpp/lib/common/Exception.cpp b/cpp/lib/common/Exception.cpp index 0161518011..ef88c5cb74 100644 --- a/cpp/lib/common/Exception.cpp +++ b/cpp/lib/common/Exception.cpp @@ -20,6 +20,7 @@ */ #include <Exception.h> +#include <iostream> namespace qpid { @@ -29,14 +30,32 @@ Exception::Exception(const std::string& str) throw() : whatStr(str) {} Exception::Exception(const char* str) throw() : whatStr(str) {} +Exception::Exception(const std::exception& e) throw() : whatStr(e.what()) {} + Exception::~Exception() throw() {} const char* Exception::what() const throw() { return whatStr.c_str(); } std::string Exception::toString() const throw() { return whatStr; } -Exception* Exception::clone() const throw() { return new Exception(*this); } +Exception::auto_ptr Exception::clone() const throw() { + return Exception::auto_ptr(new Exception(*this)); +} void Exception::throwSelf() const { throw *this; } +const char* Exception::defaultMessage = "Unexpected exception"; + +void Exception::log(const char* what, const char* message) { + std::cout << message << ": " << what << std::endl; +} + +void Exception::log(const std::exception& e, const char* message) { + log(e.what(), message); +} + +void Exception::logUnknown(const char* message) { + log("unknown exception.", message); +} + } // namespace qpid diff --git a/cpp/lib/common/Exception.h b/cpp/lib/common/Exception.h index f35d427bb0..185c395283 100644 --- a/cpp/lib/common/Exception.h +++ b/cpp/lib/common/Exception.h @@ -26,6 +26,7 @@ #include <string> #include <memory> #include <boost/shared_ptr.hpp> +#include <boost/function.hpp> namespace qpid { @@ -38,6 +39,10 @@ class Exception : public std::exception std::string whatStr; public: + typedef boost::shared_ptr<Exception> shared_ptr; + typedef boost::shared_ptr<const Exception> shared_ptr_const; + typedef std::auto_ptr<Exception> auto_ptr; + Exception() throw(); Exception(const std::string& str) throw(); Exception(const char* str) throw(); @@ -48,14 +53,65 @@ class Exception : public std::exception virtual const char* what() const throw(); virtual std::string toString() const throw(); - virtual Exception* clone() const throw(); + virtual std::auto_ptr<Exception> clone() const throw(); virtual void throwSelf() const; - typedef boost::shared_ptr<Exception> shared_ptr; -}; + /** Default message: "Unknown exception" or something like it. */ + static const char* defaultMessage; + /** + * Log a message of the form "message: what" + *@param what Exception's what() message. + *@param message Prefix message. + */ + static void log(const char* what, const char* message = defaultMessage); -} + /** + * Log an exception. + *@param e Exception to log. + + */ + static void log( + const std::exception& e, const char* message = defaultMessage); + + + /** + * Log an unknown exception - use in catch(...) + *@param message Prefix message. + */ + static void logUnknown(const char* message = defaultMessage); + + /** + * Wrapper template function to call another function inside + * try/catch and log any exception. Use boost::bind to wrap + * member function calls or functions with arguments. + * + *@param f Function to call in try block. + *@param retrhow If true the exception is rethrown. + *@param message Prefix message. + */ + template <class T> + static T tryCatchLog(boost::function0<T> f, bool rethrow=true, + const char* message=defaultMessage) + { + try { + return f(); + } + catch (const std::exception& e) { + log(e, message); + if (rethrow) + throw; + } + catch (...) { + logUnknown(message); + if (rethrow) + throw; + } + } + +}; + +} // namespace qpid #endif /*!_Exception_*/ diff --git a/cpp/lib/common/ExceptionHolder.cpp b/cpp/lib/common/ExceptionHolder.cpp index de8d7b2487..e69de29bb2 100644 --- a/cpp/lib/common/ExceptionHolder.cpp +++ b/cpp/lib/common/ExceptionHolder.cpp @@ -1,32 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include "ExceptionHolder.h" - -namespace qpid { - -ExceptionHolder::ExceptionHolder(const std::exception& e) { - const Exception* ex = dynamic_cast<const Exception*>(&e); - if (ex) { - reset(ex->clone()); - } else { - reset(new Exception(e.what())); - } -} - -} diff --git a/cpp/lib/common/ExceptionHolder.h b/cpp/lib/common/ExceptionHolder.h index 83d0884be9..e69de29bb2 100644 --- a/cpp/lib/common/ExceptionHolder.h +++ b/cpp/lib/common/ExceptionHolder.h @@ -1,62 +0,0 @@ -#ifndef _qpid_ExceptionHolder_h -#define _qpid_ExceptionHolder_h - -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include <Exception.h> -#include <boost/shared_ptr.hpp> - -namespace qpid { - -/** - * Holder for a heap-allocated exc eption that can be stack allocated - * and thrown safely. - * - * Basically this is a shared_ptr with the Exception functions added - * so the catcher need not be aware that it is a pointer rather than a - * reference. - * - * shared_ptr is chosen over auto_ptr because it has normal - * copy semantics. - */ -class ExceptionHolder : public Exception, public boost::shared_ptr<Exception> -{ - public: - typedef boost::shared_ptr<Exception> shared_ptr; - - ExceptionHolder() throw() {} - ExceptionHolder(Exception* p) throw() : shared_ptr(p) {} - ExceptionHolder(shared_ptr p) throw() : shared_ptr(p) {} - - ExceptionHolder(const Exception& e) throw() : shared_ptr(e.clone()) {} - ExceptionHolder(const std::exception& e); - - ~ExceptionHolder() throw() {} - - const char* what() const throw() { return (*this)->what(); } - std::string toString() const throw() { return (*this)->toString(); } - virtual Exception* clone() const throw() { return (*this)->clone(); } - virtual void throwSelf() const { (*this)->throwSelf(); } -}; - -} // namespace qpid - - - -#endif /*!_qpid_ExceptionHolder_h*/ diff --git a/cpp/lib/common/Makefile.am b/cpp/lib/common/Makefile.am index e1f7503282..997558f3da 100644 --- a/cpp/lib/common/Makefile.am +++ b/cpp/lib/common/Makefile.am @@ -24,21 +24,23 @@ apr_hdr = \ $(apr)/LFSessionContext.h posix = sys/posix -posix_src = \ - $(posix)/PosixAcceptor.cpp \ - $(posix)/Socket.cpp \ - $(posix)/Thread.cpp \ - $(posix)/check.cpp \ - $(posix)/EventChannel.cpp \ - $(posix)/EventChannelThreads.cpp -posix_hdr = \ - $(posix)/check.h \ - $(posix)/EventChannel.h \ - $(posix)/EventChannelThreads.h +posix_src = \ + $(posix)/EventChannelAcceptor.cpp \ + $(posix)/Socket.cpp \ + $(posix)/Thread.cpp \ + $(posix)/check.cpp \ + $(posix)/EventChannel.cpp \ + $(posix)/EventChannelThreads.cpp \ + $(posix)/EventChannelConnection.cpp +posix_hdr = \ + $(posix)/check.h \ + $(posix)/EventChannel.h \ + $(posix)/EventChannelThreads.h \ + $(posix)/EventChannelConnection.h -EXTRA_DIST=$(posix_src) $(posix_hdr) -platform_src = $(apr_src) -platform_hdr = $(apr_hdr) +EXTRA_DIST=$(apr_src) $(apr_hdr) +platform_src = $(posix_src) +platform_hdr = $(posix_hdr) framing = framing gen = $(srcdir)/../../gen @@ -76,7 +78,6 @@ libqpidcommon_la_SOURCES = \ $(gen)/AMQP_MethodVersionMap.cpp \ $(gen)/AMQP_ServerProxy.cpp \ Exception.cpp \ - ExceptionHolder.cpp \ QpidError.cpp \ sys/Runnable.cpp \ sys/Time.cpp @@ -107,7 +108,6 @@ nobase_pkginclude_HEADERS = \ $(framing)/amqp_types.h \ $(framing)/AMQP_HighestVersion.h \ Exception.h \ - ExceptionHolder.h \ QpidError.h \ SharedObject.h \ sys/Acceptor.h \ diff --git a/cpp/lib/common/QpidError.cpp b/cpp/lib/common/QpidError.cpp index 7f4f9e2f34..9cbd66c841 100644 --- a/cpp/lib/common/QpidError.cpp +++ b/cpp/lib/common/QpidError.cpp @@ -22,23 +22,41 @@ #include <QpidError.h> #include <sstream> -using namespace qpid; +namespace qpid { QpidError::QpidError() : code(0) {} -QpidError::QpidError(int _code, const std::string& _msg, - const SrcLine& _loc) throw() +QpidError::QpidError( + int _code, const std::string& _msg, Location _loc) throw() : code(_code), msg(_msg), location(_loc) { - std::ostringstream os; - os << "Error [" << code << "] " << msg << " (" - << location.file << ":" << location.line << ")"; - whatStr = os.str(); + setWhat(); +} + +QpidError::QpidError( + int _code, const char* _msg, Location _loc) throw() + : code(_code), msg(_msg), location(_loc) +{ + setWhat(); } QpidError::~QpidError() throw() {} -Exception* QpidError::clone() const throw() { return new QpidError(*this); } +Exception::auto_ptr QpidError::clone() const throw() { + return Exception::auto_ptr(new QpidError(*this)); +} void QpidError::throwSelf() const { throw *this; } +void QpidError::setWhat() { + std::ostringstream os; + os << "Error [" << code << "] " << msg; + if (location.file) { + os << " (" ; + os << location.file << ":" << location.line; + os << ")"; + } + whatStr = os.str(); +} + +} // namespace qpid diff --git a/cpp/lib/common/QpidError.h b/cpp/lib/common/QpidError.h index 30d9d27076..9a47aa5e00 100644 --- a/cpp/lib/common/QpidError.h +++ b/cpp/lib/common/QpidError.h @@ -24,37 +24,45 @@ #include <memory> #include <ostream> #include <Exception.h> +#include <boost/current_function.hpp> namespace qpid { -struct SrcLine { - public: - SrcLine(const std::string& file_="", int line_=0) : - file(file_), line(line_) {} - - std::string file; - int line; -}; - class QpidError : public Exception { public: + // Use macro QPID_LOCATION to construct a location. + struct Location { + Location(const char* function_=0, const char* file_=0, int line_=0) : + function(function_), file(file_), line(line_) {} + const char* function; + const char* file; + int line; + }; + const int code; const std::string msg; - const SrcLine location; + const Location location; QpidError(); - QpidError(int _code, const std::string& _msg, const SrcLine& _loc) throw(); + QpidError(int _code, const char* _msg, const Location _loc) throw(); + QpidError(int _code, const std::string& _msg, const Location _loc) throw(); + ~QpidError() throw(); - Exception* clone() const throw(); + Exception::auto_ptr clone() const throw(); void throwSelf() const; + + private: + void setWhat(); }; } // namespace qpid -#define SRCLINE ::qpid::SrcLine(__FILE__, __LINE__) +#define QPID_ERROR_LOCATION \ + ::qpid::QpidError::Location(BOOST_CURRENT_FUNCTION, __FILE__, __LINE__) -#define QPID_ERROR(CODE, MESSAGE) ::qpid::QpidError((CODE), (MESSAGE), SRCLINE) +#define QPID_ERROR(CODE, MESSAGE) \ + ::qpid::QpidError((CODE), (MESSAGE), QPID_ERROR_LOCATION) #define THROW_QPID_ERROR(CODE, MESSAGE) throw QPID_ERROR(CODE,MESSAGE) diff --git a/cpp/lib/common/framing/AMQFrame.h b/cpp/lib/common/framing/AMQFrame.h index bec1946fb7..1ff3ff191f 100644 --- a/cpp/lib/common/framing/AMQFrame.h +++ b/cpp/lib/common/framing/AMQFrame.h @@ -40,9 +40,9 @@ namespace qpid { static AMQP_MethodVersionMap versionMap; u_int16_t channel; - u_int8_t type;//used if the body is decoded separately from the 'head' + u_int8_t type;//used if body decoded separately from 'head' AMQBody::shared_ptr body; - AMQBody::shared_ptr createMethodBody(Buffer& buffer); + AMQBody::shared_ptr createMethodBody(Buffer& buffer); public: AMQFrame(); diff --git a/cpp/lib/common/sys/Acceptor.h b/cpp/lib/common/sys/Acceptor.h index e6bc27a593..7aed068dd0 100644 --- a/cpp/lib/common/sys/Acceptor.h +++ b/cpp/lib/common/sys/Acceptor.h @@ -33,10 +33,11 @@ class SessionHandlerFactory; class Acceptor : public qpid::SharedObject<Acceptor> { public: - static Acceptor::shared_ptr create(int16_t port, int backlog, int threads, bool trace = false); - virtual ~Acceptor() = 0; - virtual int16_t getPort() const = 0; - virtual void run(qpid::sys::SessionHandlerFactory* factory) = 0; + static Acceptor::shared_ptr create( + int16_t port, int backlog, int threads, bool trace = false); + virtual ~Acceptor(); + virtual int getPort() const = 0; + virtual void run(SessionHandlerFactory& factory) = 0; virtual void shutdown() = 0; }; diff --git a/cpp/lib/common/sys/AtomicCount.h b/cpp/lib/common/sys/AtomicCount.h index b625b2c9b0..7a9555480f 100644 --- a/cpp/lib/common/sys/AtomicCount.h +++ b/cpp/lib/common/sys/AtomicCount.h @@ -21,36 +21,52 @@ #include <boost/detail/atomic_count.hpp> #include <boost/noncopyable.hpp> +#include <boost/function.hpp> namespace qpid { namespace sys { /** - * Atomic counter. + * Increment counter in constructor and decrement in destructor. + * Optionally call a function if the decremented counter value is 0. + * Note the function must not throw, it is called in the destructor. */ -class AtomicCount : boost::noncopyable { +template <class Count> +class ScopedIncrement : boost::noncopyable { public: - class ScopedDecrement : boost::noncopyable { - public: - /** Decrement counter in constructor and increment in destructor. */ - ScopedDecrement(AtomicCount& c) : count(c) { value = --count; } - ~ScopedDecrement() { ++count; } - /** Return the value returned by the decrement. */ - operator long() { return value; } - private: - AtomicCount& count; - long value; - }; + ScopedIncrement(Count& c, boost::function0<void> f=0) + : count(c), callback(f) { ++count; } + ~ScopedIncrement() { if (--count == 0 && callback) callback(); } - class ScopedIncrement : boost::noncopyable { - public: - /** Increment counter in constructor and increment in destructor. */ - ScopedIncrement(AtomicCount& c) : count(c) { ++count; } - ~ScopedIncrement() { --count; } - private: - AtomicCount& count; - }; + private: + Count& count; + boost::function0<void> callback; +}; +/** Decrement counter in constructor and increment in destructor. */ +template <class Count> +class ScopedDecrement : boost::noncopyable { + public: + ScopedDecrement(Count& c) : count(c) { value = --count; } + ~ScopedDecrement() { ++count; } + + /** Return the value after the decrement. */ + operator long() { return value; } + + private: + Count& count; + long value; +}; + + +/** + * Atomic counter. + */ +class AtomicCount : boost::noncopyable { + public: + typedef ScopedIncrement<AtomicCount> ScopedIncrement; + typedef ScopedDecrement<AtomicCount> ScopedDecrement; + AtomicCount(long value = 0) : count(value) {} void operator++() { ++count ; } diff --git a/cpp/lib/common/sys/Runnable.cpp b/cpp/lib/common/sys/Runnable.cpp index 30122c682f..5d4f48a373 100644 --- a/cpp/lib/common/sys/Runnable.cpp +++ b/cpp/lib/common/sys/Runnable.cpp @@ -29,4 +29,8 @@ Runnable::Functor Runnable::functor() return boost::bind(&Runnable::run, this); } +void FunctorRunnable::run() { + f(); +} + }} diff --git a/cpp/lib/common/sys/Runnable.h b/cpp/lib/common/sys/Runnable.h index fb3927c612..ef18897b09 100644 --- a/cpp/lib/common/sys/Runnable.h +++ b/cpp/lib/common/sys/Runnable.h @@ -44,7 +44,16 @@ class Runnable Functor functor(); }; -}} +/** Runnable wrapper for a functor */ +class FunctorRunnable : public Runnable { + public: + explicit FunctorRunnable(const Runnable::Functor& runMe) : f(runMe) {} + void run(); + private: + Runnable::Functor f; +}; + +}} // namespace qpid::sys #endif diff --git a/cpp/lib/common/sys/Socket.h b/cpp/lib/common/sys/Socket.h index d793a240c6..e35ed5b07c 100644 --- a/cpp/lib/common/sys/Socket.h +++ b/cpp/lib/common/sys/Socket.h @@ -70,8 +70,11 @@ class Socket */ int listen(int port = 0, int backlog = 10); + /** Accept a connection. This socket must be listening */ + Socket accept(); + /** Get file descriptor */ - int fd(); + int fd() const; private: #ifdef USE_APR diff --git a/cpp/lib/common/sys/Thread.h b/cpp/lib/common/sys/Thread.h index 47b95b6234..9647dc2414 100644 --- a/cpp/lib/common/sys/Thread.h +++ b/cpp/lib/common/sys/Thread.h @@ -116,7 +116,8 @@ Thread::Thread(Runnable& runnable) { } void Thread::join(){ - QPID_POSIX_THROW_IF(pthread_join(thread, 0)); + if (thread != 0) + QPID_POSIX_THROW_IF(pthread_join(thread, 0)); } long Thread::id() { diff --git a/cpp/lib/common/sys/Time.h b/cpp/lib/common/sys/Time.h index 3dd46741d8..4c6951b429 100644 --- a/cpp/lib/common/sys/Time.h +++ b/cpp/lib/common/sys/Time.h @@ -22,6 +22,7 @@ * */ +#include <limits> #include <stdint.h> #ifdef USE_APR @@ -33,7 +34,7 @@ namespace qpid { namespace sys { -/** Time in nanoseconds */ +/** Time in nanoseconds. */ typedef int64_t Time; Time now(); @@ -47,6 +48,9 @@ const Time TIME_USEC = 1000; /** Nanoseconds per nanosecond. */ const Time TIME_NSEC = 1; +/** Value to represent an infinite timeout */ +const Time TIME_INFINITE = std::numeric_limits<Time>::max(); + #ifndef USE_APR struct timespec toTimespec(const Time& t); struct timespec& toTimespec(struct timespec& ts, const Time& t); diff --git a/cpp/lib/common/sys/apr/APRAcceptor.cpp b/cpp/lib/common/sys/apr/APRAcceptor.cpp index 6853833797..1bd23819f4 100644 --- a/cpp/lib/common/sys/apr/APRAcceptor.cpp +++ b/cpp/lib/common/sys/apr/APRAcceptor.cpp @@ -32,20 +32,16 @@ class APRAcceptor : public Acceptor { public: APRAcceptor(int16_t port, int backlog, int threads, bool trace); - virtual int16_t getPort() const; - virtual void run(qpid::sys::SessionHandlerFactory* factory); + virtual int getPort() const; + virtual void run(qpid::sys::SessionHandlerFactory& factory); virtual void shutdown(); private: - void shutdownImpl(); - - private: int16_t port; bool trace; LFProcessor processor; apr_socket_t* socket; volatile bool running; - Mutex shutdownLock; }; // Define generic Acceptor::create() to return APRAcceptor. @@ -69,13 +65,13 @@ Acceptor::~Acceptor() {} CHECK_APR_SUCCESS(apr_socket_listen(socket, backlog)); } -int16_t APRAcceptor::getPort() const { +int APRAcceptor::getPort() const { apr_sockaddr_t* address; CHECK_APR_SUCCESS(apr_socket_addr_get(&address, APR_LOCAL, socket)); return address->port; } -void APRAcceptor::run(SessionHandlerFactory* factory) { +void APRAcceptor::run(SessionHandlerFactory& factory) { running = true; processor.start(); std::cout << "Listening on port " << getPort() << "..." << std::endl; @@ -90,32 +86,24 @@ void APRAcceptor::run(SessionHandlerFactory* factory) { CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_SO_SNDBUF, 32768)); CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_SO_RCVBUF, 32768)); LFSessionContext* session = new LFSessionContext(APRPool::get(), client, &processor, trace); - session->init(factory->create(session)); + session->init(factory.create(session)); }else{ - Mutex::ScopedLock locker(shutdownLock); - if(running) { - if(status != APR_EINTR){ - std::cout << "ERROR: " << get_desc(status) << std::endl; - } - shutdownImpl(); + running = false; + if(status != APR_EINTR){ + std::cout << "ERROR: " << get_desc(status) << std::endl; } } } + shutdown(); } void APRAcceptor::shutdown() { - Mutex::ScopedLock locker(shutdownLock); if (running) { - shutdownImpl(); + running = false; + processor.stop(); + CHECK_APR_SUCCESS(apr_socket_close(socket)); } } -void APRAcceptor::shutdownImpl() { - Mutex::ScopedLock locker(shutdownLock); - running = false; - processor.stop(); - CHECK_APR_SUCCESS(apr_socket_close(socket)); -} - }} diff --git a/cpp/lib/common/sys/apr/LFProcessor.cpp b/cpp/lib/common/sys/apr/LFProcessor.cpp index 2b6fc92623..f5d59e31d7 100644 --- a/cpp/lib/common/sys/apr/LFProcessor.cpp +++ b/cpp/lib/common/sys/apr/LFProcessor.cpp @@ -27,8 +27,6 @@ using namespace qpid::sys; using qpid::QpidError; -// TODO aconway 2006-10-12: stopped is read outside locks. -// LFProcessor::LFProcessor(apr_pool_t* pool, int _workers, int _size, int _timeout) : size(_size), diff --git a/cpp/lib/common/sys/posix/EventChannel.cpp b/cpp/lib/common/sys/posix/EventChannel.cpp index 16c7ec9c3f..860ecd6b07 100644 --- a/cpp/lib/common/sys/posix/EventChannel.cpp +++ b/cpp/lib/common/sys/posix/EventChannel.cpp @@ -1,4 +1,4 @@ -/* +/* * * Copyright (c) 2006 The Apache Software Foundation * @@ -16,6 +16,13 @@ * */ +// TODO aconway 2006-12-15: Locking review. + +// TODO aconway 2006-12-15: use Descriptor pointers everywhere, +// get them from channel, pass them to Event constructors. +// Eliminate lookup. + + #include <mqueue.h> #include <string.h> #include <iostream> @@ -29,10 +36,10 @@ #include <queue> #include <boost/ptr_container/ptr_map.hpp> -#include <boost/current_function.hpp> +#include <boost/noncopyable.hpp> +#include <boost/bind.hpp> #include <QpidError.h> -#include <sys/Monitor.h> #include "check.h" #include "EventChannel.h" @@ -40,127 +47,319 @@ using namespace std; -// Convenience template to zero out a struct. -template <class S> struct ZeroStruct : public S { - ZeroStruct() { memset(this, 0, sizeof(*this)); } -}; - namespace qpid { namespace sys { +// ================================================================ +// Private class declarations + +namespace { + +typedef enum { IN, OUT } Direction; +typedef std::pair<Event*, Event*> EventPair; +} // namespace + /** - * EventHandler wraps an epoll file descriptor. Acts as private - * interface between EventChannel and subclasses. - * - * Also implements Event interface for events that are not associated - * with a file descriptor and are passed via the message queue. - */ -class EventHandler : public Event, private Monitor + * Queue of events corresponding to one IO direction (IN or OUT). + * Each Descriptor contains two Queues. + */ +class EventChannel::Queue : private boost::noncopyable { public: - EventHandler(int epollSize = 256); - ~EventHandler(); + Queue(Descriptor& container, Direction dir); - int getEpollFd() { return epollFd; } - void epollAdd(int fd, uint32_t epollEvents, Event* event); - void epollMod(int fd, uint32_t epollEvents, Event* event); - void epollDel(int fd); + /** Called by Event classes in prepare() */ + void push(Event* e); - void mqPut(Event* event); - Event* mqGet(); - - protected: - // Should never be called, only complete. - void prepare(EventHandler&) { assert(0); } - Event* complete(EventHandler& eh); + /** Called when epoll wakes. + *@return The next completed event or 0. + */ + Event* wake(uint32_t epollFlags); + + void setBit(uint32_t &epollFlags); + + void shutdown(); private: + typedef std::deque<Event*> EventQ; + + inline bool isMyEvent(uint32_t flags) { return flags | myEvent; } + + Mutex& lock; // Shared with Descriptor. + Descriptor& descriptor; + uint32_t myEvent; // Epoll event flag. + EventQ queue; +}; + + +/** + * Manages a file descriptor in an epoll set. + * + * Can be shutdown and re-activated for the same file descriptor. + */ +class EventChannel::Descriptor : private boost::noncopyable { + public: + Descriptor() : epollFd(-1), myFd(-1), + inQueue(*this, IN), outQueue(*this, OUT) {} + + void activate(int epollFd_, int myFd_); + + /** Epoll woke up for this descriptor. */ + EventPair wake(uint32_t epollEvents); + + /** Shut down: close and remove file descriptor. + * May be re-activated if fd is reused. + */ + void shutdown(); + + // TODO aconway 2006-12-18: Nasty. Need to clean up interaction. + void shutdownUnsafe(); + + bool isShutdown() { return epollFd == -1; } + + Queue& getQueue(Direction d) { return d==IN ? inQueue : outQueue; } + + private: + void update(); + void epollCtl(int op, uint32_t events); + + Mutex lock; int epollFd; - std::string mqName; - int mqFd; - std::queue<Event*> mqEvents; + int myFd; + Queue inQueue, outQueue; + + friend class Queue; }; -EventHandler::EventHandler(int epollSize) -{ - epollFd = epoll_create(epollSize); - if (epollFd < 0) throw QPID_POSIX_ERROR(errno); + +/** + * Holds the epoll fd, Descriptor map and dispatch queue. + * Most of the epoll work is done by the Descriptors. + */ +class EventChannel::Impl { + public: + Impl(int size = 256); - // Create a POSIX message queue for non-fd events. - // We write one byte and never read it is always ready for read - // when we add it to epoll. - // - ZeroStruct<struct mq_attr> attr; - attr.mq_maxmsg = 1; - attr.mq_msgsize = 1; - do { - char tmpnam[L_tmpnam]; - tmpnam_r(tmpnam); - mqName = tmpnam + 4; // Skip "tmp/" - mqFd = mq_open( - mqName.c_str(), O_CREAT|O_EXCL|O_RDWR|O_NONBLOCK, S_IRWXU, &attr); - if (mqFd < 0) throw QPID_POSIX_ERROR(errno); - } while (mqFd == EEXIST); // Name already taken, try again. + ~Impl(); - static char zero = '\0'; - mq_send(mqFd, &zero, 1, 0); - epollAdd(mqFd, 0, this); + /** + * Registers fd if not already registered. + */ + Descriptor& getDescriptor(int fd); + + /** Wait for an event, return 0 on timeout */ + Event* wait(Time timeout); + + Queue& getDispatchQueue() { return *dispatchQueue; } + + private: + + typedef boost::ptr_map<int, Descriptor> DescriptorMap; + + Mutex lock; + int epollFd; + DescriptorMap descriptors; + int pipe[2]; + Queue* dispatchQueue; +}; + + + +// ================================================================ +// EventChannel::Queue::implementation. + +static const char* shutdownMsg = "Event queue shut down."; + +EventChannel::Queue::Queue(Descriptor& d, Direction dir) : lock(d.lock), descriptor(d), + myEvent(dir==IN ? EPOLLIN : EPOLLOUT) +{} + +void EventChannel::Queue::push(Event* e) { + Mutex::ScopedLock l(lock); + if (descriptor.isShutdown()) + THROW_QPID_ERROR(INTERNAL_ERROR, shutdownMsg); + queue.push_back(e); + descriptor.update(); } -EventHandler::~EventHandler() { - mq_close(mqFd); - mq_unlink(mqName.c_str()); +void EventChannel::Queue::setBit(uint32_t &epollFlags) { + if (queue.empty()) + epollFlags &= ~myEvent; + else + epollFlags |= myEvent; } -void EventHandler::mqPut(Event* event) { - ScopedLock l(*this); - assert(event != 0); - mqEvents.push(event); - epollMod(mqFd, EPOLLIN|EPOLLONESHOT, this); +Event* EventChannel::Queue::wake(uint32_t epollFlags) { + // Called with lock held. + if (!queue.empty() && (isMyEvent(epollFlags))) { + Event* e = queue.front()->complete(descriptor); + if (e) { + queue.pop_front(); + return e; + } + } + return 0; +} + +void EventChannel::Queue::shutdown() { + // Mark all pending events with a shutdown exception. + // The server threads will remove and dispatch the events. + // + qpid::QpidError ex(INTERNAL_ERROR, shutdownMsg, QPID_ERROR_LOCATION); + for_each(queue.begin(), queue.end(), + boost::bind(&Event::setException, _1, ex)); } -Event* EventHandler::mqGet() { - ScopedLock l(*this); - if (mqEvents.empty()) - return 0; - Event* event = mqEvents.front(); - mqEvents.pop(); - if(!mqEvents.empty()) - epollMod(mqFd, EPOLLIN|EPOLLONESHOT, this); - return event; + +// ================================================================ +// Descriptor + + +void EventChannel::Descriptor::activate(int epollFd_, int myFd_) { + Mutex::ScopedLock l(lock); + assert(myFd < 0 || (myFd == myFd_)); // Can't change fd. + if (epollFd < 0) { // Means we're not polling. + epollFd = epollFd_; + myFd = myFd_; + epollCtl(EPOLL_CTL_ADD, 0); + } } -void EventHandler::epollAdd(int fd, uint32_t epollEvents, Event* event) -{ - ZeroStruct<struct epoll_event> ee; - ee.data.ptr = event; - ee.events = epollEvents; - if (epoll_ctl(epollFd, EPOLL_CTL_ADD, fd, &ee) < 0) +void EventChannel::Descriptor::shutdown() { + Mutex::ScopedLock l(lock); + shutdownUnsafe(); +} + +void EventChannel::Descriptor::shutdownUnsafe() { + // Caller holds lock. + ::close(myFd); + epollFd = -1; // Indicate we are not polling. + inQueue.shutdown(); + outQueue.shutdown(); + epollCtl(EPOLL_CTL_DEL, 0); +} + +void EventChannel::Descriptor::update() { + // Caller holds lock. + uint32_t events = EPOLLONESHOT | EPOLLERR | EPOLLHUP; + inQueue.setBit(events); + outQueue.setBit(events); + epollCtl(EPOLL_CTL_MOD, events); +} + +void EventChannel::Descriptor::epollCtl(int op, uint32_t events) { + // Caller holds lock + assert(!isShutdown()); + struct epoll_event ee; + memset(&ee, 0, sizeof(ee)); + ee.data.ptr = this; + ee.events = events; + int status = ::epoll_ctl(epollFd, op, myFd, &ee); + if (status < 0) throw QPID_POSIX_ERROR(errno); + } +} + + +EventPair EventChannel::Descriptor::wake(uint32_t epollEvents) { + Mutex::ScopedLock l(lock); + cout << "DEBUG: " << std::hex << epollEvents << std::dec << endl; + // If we have an error: + if (epollEvents & (EPOLLERR | EPOLLHUP)) { + shutdownUnsafe(); + // Complete both sides on error so the event can fail and + // mark itself with an exception. + epollEvents |= EPOLLIN | EPOLLOUT; + } + EventPair ready(inQueue.wake(epollEvents), outQueue.wake(epollEvents)); + update(); + return ready; } -void EventHandler::epollMod(int fd, uint32_t epollEvents, Event* event) + +// ================================================================ +// EventChannel::Impl + + +EventChannel::Impl::Impl(int epollSize): + epollFd(-1), dispatchQueue(0) { - ZeroStruct<struct epoll_event> ee; - ee.data.ptr = event; - ee.events = epollEvents; - if (epoll_ctl(epollFd, EPOLL_CTL_MOD, fd, &ee) < 0) - throw QPID_POSIX_ERROR(errno); + // Create the epoll file descriptor. + epollFd = epoll_create(epollSize); + QPID_POSIX_CHECK(epollFd); + + // Create a pipe and write a single byte. The byte is never + // read so the pipes read fd is always ready for read. + // We activate the FD when there are messages in the queue. + QPID_POSIX_CHECK(::pipe(pipe)); + static char zero = '\0'; + QPID_POSIX_CHECK(::write(pipe[1], &zero, 1)); + dispatchQueue = &getDescriptor(pipe[0]).getQueue(IN); } -void EventHandler::epollDel(int fd) { - if (epoll_ctl(epollFd, EPOLL_CTL_DEL, fd, 0) < 0) - throw QPID_POSIX_ERROR(errno); +EventChannel::Impl::~Impl() { + close(epollFd); + close(pipe[0]); + close(pipe[1]); } -Event* EventHandler::complete(EventHandler& eh) + +/** + * Wait for epoll to wake up, return the descriptor or 0 on timeout. + */ +Event* EventChannel::Impl::wait(Time timeoutNs) { - assert(&eh == this); - Event* event = mqGet(); - return event==0 ? 0 : event->complete(eh); + // No lock, all thread safe calls or local variables: + // + const long timeoutMs = + (timeoutNs == TIME_INFINITE) ? -1 : timeoutNs/TIME_MSEC; + struct epoll_event ee; + Event* event = 0; + bool doSwap = true; + + // Loop till we get a completed event. Some events may repost + // themselves and return 0, e.g. incomplete read or write events. + // + while (!event) { + int n = epoll_wait(epollFd, &ee, 1, timeoutMs); // Thread safe. + if (n == 0) // Timeout + return 0; + if (n < 0 && errno != EINTR) // Interrupt, ignore it. + continue; + if (n < 0) + throw QPID_POSIX_ERROR(errno); + assert(n == 1); + Descriptor* ed = + reinterpret_cast<Descriptor*>(ee.data.ptr); + assert(ed); + EventPair ready = ed->wake(ee.events); + + // We can only return one event so if both completed push one + // onto the dispatch queue to be dispatched in another thread. + if (ready.first && ready.second) { + // Keep it fair: in & out take turns to be returned first. + if (doSwap) + swap(ready.first, ready.second); + doSwap = !doSwap; + event = ready.first; + dispatchQueue->push(ready.second); + } + else { + event = ready.first ? ready.first : ready.second; + } + } + return event; } - + +EventChannel::Descriptor& EventChannel::Impl::getDescriptor(int fd) { + Mutex::ScopedLock l(lock); + Descriptor& ed = descriptors[fd]; + ed.activate(epollFd, fd); + return ed; +} + + // ================================================================ // EventChannel @@ -168,157 +367,138 @@ EventChannel::shared_ptr EventChannel::create() { return shared_ptr(new EventChannel()); } -EventChannel::EventChannel() : handler(new EventHandler()) {} +EventChannel::EventChannel() : impl(new EventChannel::Impl()) {} EventChannel::~EventChannel() {} -void EventChannel::postEvent(Event& e) +void EventChannel::post(Event& e) { - e.prepare(*handler); + e.prepare(*impl); } -Event* EventChannel::getEvent() -{ - static const int infiniteTimeout = -1; - ZeroStruct<struct epoll_event> epollEvent; - - // Loop until we can complete the event. Some events may re-post - // themselves and return 0 from complete, e.g. partial reads. // - Event* event = 0; - while (event == 0) { - int eventCount = epoll_wait(handler->getEpollFd(), - &epollEvent, 1, infiniteTimeout); - if (eventCount < 0) { - if (errno != EINTR) { - // TODO aconway 2006-11-28: Proper handling/logging of errors. - cerr << BOOST_CURRENT_FUNCTION << " ignoring error " - << PosixError::getMessage(errno) << endl; - assert(0); - } - } - else if (eventCount == 1) { - event = reinterpret_cast<Event*>(epollEvent.data.ptr); - assert(event != 0); - try { - event = event->complete(*handler); - } - catch (const Exception& e) { - if (event) - event->setError(e); - } - catch (const std::exception& e) { - if (event) - event->setError(e); - } - } - } - return event; +void EventChannel::post(Event* e) { + assert(e); + post(*e); } -Event::~Event() {} - -void Event::prepare(EventHandler& handler) +Event* EventChannel::wait(Time timeoutNs) { - handler.mqPut(this); + return impl->wait(timeoutNs); } -bool Event::hasError() const { - return error; -} -void Event::throwIfError() throw (Exception) { - if (hasError()) - error.throwSelf(); +// ================================================================ +// Event and subclasses. + +Event::~Event() {} + +Exception::shared_ptr_const Event::getException() const { + return exception; } -Event* Event::complete(EventHandler&) -{ - return this; +void Event::throwIfException() { + if (getException()) + exception->throwSelf(); } void Event::dispatch() { + if (!callback.empty()) + callback(); +} + +void Event::setException(const std::exception& e) { + const Exception* ex = dynamic_cast<const Exception*>(&e); + if (ex) + exception.reset(ex->clone().release()); + else + exception.reset(new Exception(e)); +#ifndef NDEBUG + // Throw and re-catch the exception. Has no effect on the + // program but it triggers debuggers watching for throw. The + // context that sets the exception is more informative for + // debugging purposes than the one that ultimately throws it. + // try { - if (!callback.empty()) - callback(); - } catch (const std::exception&) { - throw; - } catch (...) { - throw QPID_ERROR(INTERNAL_ERROR, "Unknown exception."); + throwIfException(); } + catch (...) { } // Ignored. +#endif } -void Event::setError(const ExceptionHolder& e) { - error = e; + +void ReadEvent::prepare(EventChannel::Impl& impl) { + impl.getDescriptor(descriptor).getQueue(IN).push(this); } -void ReadEvent::prepare(EventHandler& handler) +Event* ReadEvent::complete(EventChannel::Descriptor& ed) { - handler.epollAdd(descriptor, EPOLLIN | EPOLLONESHOT, this); + ssize_t n = ::read(descriptor, + static_cast<char*>(buffer) + bytesRead, + size - bytesRead); + + if (n < 0 && errno != EAGAIN) { // Error + setException(QPID_POSIX_ERROR(errno)); + ed.shutdownUnsafe(); // Called with lock held. + } + else if (n == 0) { // End of file + // TODO aconway 2006-12-13: Don't treat EOF as exception + // unless we're partway thru a !noWait read. + setException(QPID_POSIX_ERROR(ENODATA)); + ed.shutdownUnsafe(); // Called with lock held. + } + else { + if (n > 0) // possible that n < 0 && errno == EAGAIN + bytesRead += n; + if (bytesRead < size && !noWait) { + // Continue reading, not enough data. + return 0; + } + } + return this; } -ssize_t ReadEvent::doRead() { - ssize_t n = ::read(descriptor, static_cast<char*>(buffer) + received, - size - received); - if (n > 0) received += n; - return n; + +void WriteEvent::prepare(EventChannel::Impl& impl) { + impl.getDescriptor(descriptor).getQueue(OUT).push(this); } -Event* ReadEvent::complete(EventHandler& handler) + +Event* WriteEvent::complete(EventChannel::Descriptor& ed) { - // Read as much as possible without blocking. - ssize_t n = doRead(); - while (n > 0 && received < size) doRead(); - - if (received == size) { - handler.epollDel(descriptor); - received = 0; // Reset for re-use. - return this; - } - else if (n <0 && (errno == EAGAIN)) { - // Keep polling for more. - handler.epollMod(descriptor, EPOLLIN | EPOLLONESHOT, this); + ssize_t n = ::write(descriptor, + static_cast<const char*>(buffer) + bytesWritten, + size - bytesWritten); + if(n < 0 && errno == EAGAIN && noWait) { return 0; } - else { - // Unexpected EOF or error. Throw ENODATA for EOF. - handler.epollDel(descriptor); - received = 0; // Reset for re-use. - throw QPID_POSIX_ERROR((n < 0) ? errno : ENODATA); + if (n < 0 || (bytesWritten += n) < size) { + setException(QPID_POSIX_ERROR(errno)); + ed.shutdownUnsafe(); // Called with lock held. } + return this; } -void WriteEvent::prepare(EventHandler& handler) -{ - handler.epollAdd(descriptor, EPOLLOUT | EPOLLONESHOT, this); +void AcceptEvent::prepare(EventChannel::Impl& impl) { + impl.getDescriptor(descriptor).getQueue(IN).push(this); } -Event* WriteEvent::complete(EventHandler& handler) +Event* AcceptEvent::complete(EventChannel::Descriptor& ed) { - ssize_t n = write(descriptor, static_cast<const char*>(buffer) + written, - size - written); - if (n < 0) throw QPID_POSIX_ERROR(errno); - written += n; - if(written < size) { - // Keep polling. - handler.epollMod(descriptor, EPOLLOUT | EPOLLONESHOT, this); - return 0; + accepted = ::accept(descriptor, 0, 0); + if (accepted < 0) { + setException(QPID_POSIX_ERROR(errno)); + ed.shutdownUnsafe(); // Called with lock held. } - written = 0; // Reset for re-use. - handler.epollDel(descriptor); return this; } -void AcceptEvent::prepare(EventHandler& handler) -{ - handler.epollAdd(descriptor, EPOLLIN | EPOLLONESHOT, this); +void DispatchEvent::prepare(EventChannel::Impl& impl) { + impl.getDispatchQueue().push(this); } -Event* AcceptEvent::complete(EventHandler& handler) +Event* DispatchEvent::complete(EventChannel::Descriptor&) { - handler.epollDel(descriptor); - accepted = ::accept(descriptor, 0, 0); - if (accepted < 0) throw QPID_POSIX_ERROR(errno); return this; } diff --git a/cpp/lib/common/sys/posix/EventChannel.h b/cpp/lib/common/sys/posix/EventChannel.h index 49c7fce740..60c4026fbc 100644 --- a/cpp/lib/common/sys/posix/EventChannel.h +++ b/cpp/lib/common/sys/posix/EventChannel.h @@ -19,8 +19,11 @@ * */ -#include <SharedObject.h> -#include <ExceptionHolder.h> +#include "SharedObject.h" +#include "Exception.h" +#include "sys/Monitor.h" +#include "sys/Time.h" + #include <boost/function.hpp> #include <memory> @@ -28,11 +31,47 @@ namespace qpid { namespace sys { class Event; -class EventHandler; -class EventChannel; + +/** + * Channel to post and wait for events. + */ +class EventChannel : public qpid::SharedObject<EventChannel> +{ + public: + static shared_ptr create(); + + ~EventChannel(); + + /** Post an event to the channel. */ + void post(Event& event); + + /** Post an event to the channel. Must not be 0. */ + void post(Event* event); + + /** + * Wait for the next complete event, up to timeout. + *@return Pointer to event or 0 if timeout elapses. + */ + Event* wait(Time timeout = TIME_INFINITE); + + class Impl; + class Queue; + class Descriptor; + + private: + + EventChannel(); + + Mutex lock; + boost::shared_ptr<Impl> impl; +}; /** * Base class for all Events. + * Derived classes define events representing various async IO operations. + * When an event is complete, it is returned by the EventChannel to + * a thread calling wait. The thread will call Event::dispatch() to + * execute code associated with event completion. */ class Event { @@ -40,135 +79,137 @@ class Event /** Type for callback when event is dispatched */ typedef boost::function0<void> Callback; - /** - * Create an event with optional callback. - * Instances of Event are sent directly through the channel. - * Derived classes define additional waiting behaviour. - *@param cb A callback functor that is invoked when dispatch() is called. - */ - Event(Callback cb = 0) : callback(cb) {} - virtual ~Event(); /** Call the callback provided to the constructor, if any. */ void dispatch(); - /** True if there was an error processing this event */ - bool hasError() const; + /** + *If there was an exception processing this Event, return it. + *@return 0 if there was no exception. Caller must not delete. + */ + qpid::Exception::shared_ptr_const getException() const; + + /** If getException() throw the corresponding exception. */ + void throwIfException(); + + /** Set the dispatch callback. */ + void setCallback(Callback cb) { callback = cb; } - /** If hasError() throw the corresponding exception. */ - void throwIfError() throw(Exception); + /** Set the exception. */ + void setException(const std::exception& e); protected: - virtual void prepare(EventHandler&); - virtual Event* complete(EventHandler&); - void setError(const ExceptionHolder& e); + Event(Callback cb=0) : callback(cb) {} + + virtual void prepare(EventChannel::Impl&) = 0; + virtual Event* complete(EventChannel::Descriptor&) = 0; Callback callback; - ExceptionHolder error; + Exception::shared_ptr_const exception; friend class EventChannel; - friend class EventHandler; + friend class EventChannel::Queue; }; -template <class BufT> -class IOEvent : public Event { +/** + * An event that does not wait for anything, it is processed + * immediately by one of the channel threads. + */ +class DispatchEvent : public Event { public: - void getDescriptor() const { return descriptor; } - size_t getSize() const { return size; } - BufT getBuffer() const { return buffer; } - + DispatchEvent(Callback cb=0) : Event(cb) {} + protected: - IOEvent(int fd, Callback cb, size_t sz, BufT buf) : - Event(cb), descriptor(fd), buffer(buf), size(sz) {} + void prepare(EventChannel::Impl&); + Event* complete(EventChannel::Descriptor&); +}; + +// Utility base class. +class FDEvent : public Event { + public: + int getDescriptor() const { return descriptor; } + protected: + FDEvent(Callback cb = 0, int fd = 0) + : Event(cb), descriptor(fd) {} int descriptor; - BufT buffer; - size_t size; }; +// Utility base class +class IOEvent : public FDEvent { + public: + size_t getSize() const { return size; } + + protected: + IOEvent(Callback cb, int fd, size_t sz, bool noWait_) : + FDEvent(cb, fd), size(sz), noWait(noWait_) {} + + size_t size; + bool noWait; +}; + /** Asynchronous read event */ -class ReadEvent : public IOEvent<void*> +class ReadEvent : public IOEvent { public: - explicit ReadEvent(int fd=-1, void* buf=0, size_t sz=0, Callback cb=0) : - IOEvent<void*>(fd, cb, sz, buf), received(0) {} + explicit ReadEvent( + int fd=-1, void* buf=0, size_t sz=0, + Callback cb=0, bool noWait=false + ) : IOEvent(cb, fd, sz, noWait), buffer(buf), bytesRead(0) {} + void* getBuffer() const { return buffer; } + size_t getBytesRead() const { return bytesRead; } + private: - void prepare(EventHandler&); - Event* complete(EventHandler&); + void prepare(EventChannel::Impl&); + Event* complete(EventChannel::Descriptor&); ssize_t doRead(); - size_t received; + void* buffer; + size_t bytesRead; }; /** Asynchronous write event */ -class WriteEvent : public IOEvent<const void*> +class WriteEvent : public IOEvent { public: explicit WriteEvent(int fd=-1, const void* buf=0, size_t sz=0, Callback cb=0) : - IOEvent<const void*>(fd, cb, sz, buf), written(0) {} + IOEvent(cb, fd, sz, noWait), buffer(buf), bytesWritten(0) {} - protected: - void prepare(EventHandler&); - Event* complete(EventHandler&); + const void* getBuffer() const { return buffer; } + size_t getBytesWritten() const { return bytesWritten; } private: + void prepare(EventChannel::Impl&); + Event* complete(EventChannel::Descriptor&); ssize_t doWrite(); - size_t written; + + const void* buffer; + size_t bytesWritten; }; + /** Asynchronous socket accept event */ -class AcceptEvent : public Event +class AcceptEvent : public FDEvent { public: /** Accept a connection on fd. */ explicit AcceptEvent(int fd=-1, Callback cb=0) : - Event(cb), descriptor(fd), accepted(0) {} - - /** Get descriptor for server socket */ + FDEvent(cb, fd), accepted(0) {} + + /** Get descriptor for accepted server socket */ int getAcceptedDesscriptor() const { return accepted; } private: - void prepare(EventHandler&); - Event* complete(EventHandler&); + void prepare(EventChannel::Impl&); + Event* complete(EventChannel::Descriptor&); - int descriptor; int accepted; }; -class QueueSet; - -/** - * Channel to post and wait for events. - */ -class EventChannel : public qpid::SharedObject<EventChannel> -{ - public: - static shared_ptr create(); - - ~EventChannel(); - - /** Post an event to the channel. */ - void postEvent(Event& event); - - /** Post an event to the channel. Must not be 0. */ - void postEvent(Event* event) { postEvent(*event); } - - /** - * Wait for the next complete event. - *@return Pointer to event. Will never return 0. - */ - Event* getEvent(); - - private: - EventChannel(); - boost::shared_ptr<EventHandler> handler; -}; - - }} diff --git a/cpp/lib/common/sys/posix/EventChannelAcceptor.cpp b/cpp/lib/common/sys/posix/EventChannelAcceptor.cpp index 7cd6f60902..28f9beb44e 100644 --- a/cpp/lib/common/sys/posix/EventChannelAcceptor.cpp +++ b/cpp/lib/common/sys/posix/EventChannelAcceptor.cpp @@ -139,11 +139,11 @@ void EventChannelAcceptor::accept() shutdown(); return; } - // TODO aconway 2006-11-29: Need to reap closed connections also. int fd = acceptEvent.getAcceptedDesscriptor(); + threads->post(acceptEvent); // Keep accepting. + // TODO aconway 2006-11-29: Need to reap closed connections also. connections.push_back( new EventChannelConnection(threads, *factory, fd, fd, isTrace)); - threads->post(acceptEvent); // Keep accepting. } }} // namespace qpid::sys diff --git a/cpp/lib/common/sys/posix/EventChannelThreads.cpp b/cpp/lib/common/sys/posix/EventChannelThreads.cpp index 95e699e0b0..787da72ffa 100644 --- a/cpp/lib/common/sys/posix/EventChannelThreads.cpp +++ b/cpp/lib/common/sys/posix/EventChannelThreads.cpp @@ -16,26 +16,40 @@ * */ -#include "EventChannelThreads.h" -#include <sys/Runnable.h> #include <iostream> -using namespace std; +#include <limits> + #include <boost/bind.hpp> +#include <sys/Runnable.h> + +#include "EventChannelThreads.h" + namespace qpid { namespace sys { +const size_t EventChannelThreads::unlimited = + std::numeric_limits<size_t>::max(); + EventChannelThreads::shared_ptr EventChannelThreads::create( - EventChannel::shared_ptr ec) + EventChannel::shared_ptr ec, size_t min, size_t max +) { - return EventChannelThreads::shared_ptr(new EventChannelThreads(ec)); + return EventChannelThreads::shared_ptr( + new EventChannelThreads(ec, min, max)); } -EventChannelThreads::EventChannelThreads(EventChannel::shared_ptr ec) : - channel(ec), nWaiting(0), state(RUNNING) +EventChannelThreads::EventChannelThreads( + EventChannel::shared_ptr ec, size_t min, size_t max) : + minThreads(std::max(size_t(1), min)), + maxThreads(std::min(min, max)), + channel(ec), + nWaiting(0), + state(RUNNING) { - // TODO aconway 2006-11-15: Estimate initial threads based on CPUs. - addThread(); + Monitor::ScopedLock l(monitor); + while (workers.size() < minThreads) + workers.push_back(Thread(*this)); } EventChannelThreads::~EventChannelThreads() { @@ -43,34 +57,37 @@ EventChannelThreads::~EventChannelThreads() { join(); } +// Termination marker event. +static DispatchEvent terminate; + void EventChannelThreads::shutdown() { - ScopedLock lock(*this); + Monitor::ScopedLock lock(monitor); if (state != RUNNING) // Already shutting down. return; + state = TERMINATING; for (size_t i = 0; i < workers.size(); ++i) { - channel->postEvent(terminate); + channel->post(terminate); } - state = TERMINATE_SENT; - notify(); // Wake up one join() thread. + monitor.notify(); // Wake up one join() thread. } void EventChannelThreads::join() { { - ScopedLock lock(*this); + Monitor::ScopedLock lock(monitor); while (state == RUNNING) // Wait for shutdown to start. - wait(); + monitor.wait(); if (state == SHUTDOWN) // Shutdown is complete return; if (state == JOINING) { // Someone else is doing the join. while (state != SHUTDOWN) - wait(); + monitor.wait(); return; } // I'm the joining thread - assert(state == TERMINATE_SENT); + assert(state == TERMINATING); state = JOINING; } // Drop the lock. @@ -79,12 +96,13 @@ void EventChannelThreads::join() workers[i].join(); } state = SHUTDOWN; - notifyAll(); // Notify other join() threaeds. + monitor.notifyAll(); // Notify any other join() threads. } void EventChannelThreads::addThread() { - ScopedLock l(*this); - workers.push_back(Thread(*this)); + Monitor::ScopedLock l(monitor); + if (workers.size() < maxThreads) + workers.push_back(Thread(*this)); } void EventChannelThreads::run() @@ -93,26 +111,22 @@ void EventChannelThreads::run() AtomicCount::ScopedIncrement inc(nWaiting); try { while (true) { - Event* e = channel->getEvent(); + Event* e = channel->wait(); assert(e != 0); - if (e == &terminate) { + if (e == &terminate) return; - } AtomicCount::ScopedDecrement dec(nWaiting); - // I'm no longer waiting, make sure someone is. - if (dec == 0) + // Make sure there's at least one waiting thread. + if (dec == 0 && state == RUNNING) addThread(); e->dispatch(); } } catch (const std::exception& e) { - // TODO aconway 2006-11-15: need better logging across the board. - std::cerr << "EventChannelThreads::run() caught: " << e.what() - << std::endl; + Exception::log(e, "Exception in EventChannelThreads::run()"); } catch (...) { - std::cerr << "EventChannelThreads::run() caught unknown exception." - << std::endl; + Exception::logUnknown("Exception in EventChannelThreads::run()"); } } diff --git a/cpp/lib/common/sys/posix/EventChannelThreads.h b/cpp/lib/common/sys/posix/EventChannelThreads.h index 98403c0869..721a5e9d24 100644 --- a/cpp/lib/common/sys/posix/EventChannelThreads.h +++ b/cpp/lib/common/sys/posix/EventChannelThreads.h @@ -20,11 +20,12 @@ */ #include <vector> -#include <Exception.h> -#include <sys/Time.h> -#include <sys/Monitor.h> -#include <sys/Thread.h> -#include <sys/AtomicCount.h> +#include "Exception.h" +#include "sys/AtomicCount.h" +#include "sys/Monitor.h" +#include "sys/Thread.h" +#include "sys/Time.h" + #include "EventChannel.h" namespace qpid { @@ -33,26 +34,36 @@ namespace sys { /** Dynamic thread pool serving an EventChannel. - Threads run a loop { e = getEvent(); e->dispatch(); } + Threads run a loop { e = wait(); e->dispatch(); } The size of the thread pool is automatically adjusted to optimal size. */ class EventChannelThreads : public qpid::SharedObject<EventChannelThreads>, - public sys::Monitor, private sys::Runnable + private sys::Runnable { public: - /** Create the thread pool and start initial threads. */ + /** Constant to represent an unlimited number of threads */ + static const size_t unlimited; + + /** + * Create the thread pool and start initial threads. + * @param minThreads Pool will initialy contain minThreads threads and + * will never shrink to less until shutdown. + * @param maxThreads Pool will never grow to more than maxThreads. + */ static EventChannelThreads::shared_ptr create( - EventChannel::shared_ptr channel + EventChannel::shared_ptr channel = EventChannel::create(), + size_t minThreads = 1, + size_t maxThreads = unlimited ); ~EventChannelThreads(); /** Post event to the underlying channel */ - void postEvent(Event& event) { channel->postEvent(event); } + void post(Event& event) { channel->post(event); } /** Post event to the underlying channel Must not be 0. */ - void postEvent(Event* event) { channel->postEvent(event); } + void post(Event* event) { channel->post(event); } /** * Terminate all threads. @@ -68,21 +79,25 @@ class EventChannelThreads : private: typedef std::vector<sys::Thread> Threads; typedef enum { - RUNNING, TERMINATE_SENT, JOINING, SHUTDOWN + RUNNING, TERMINATING, JOINING, SHUTDOWN } State; - EventChannelThreads(EventChannel::shared_ptr underlyingChannel); + EventChannelThreads( + EventChannel::shared_ptr channel, size_t min, size_t max); + void addThread(); void run(); bool keepRunning(); void adjustThreads(); + Monitor monitor; + size_t minThreads; + size_t maxThreads; EventChannel::shared_ptr channel; Threads workers; sys::AtomicCount nWaiting; State state; - Event terminate; }; diff --git a/cpp/lib/common/sys/posix/PosixAcceptor.cpp b/cpp/lib/common/sys/posix/PosixAcceptor.cpp index 842aa76f36..e69de29bb2 100644 --- a/cpp/lib/common/sys/posix/PosixAcceptor.cpp +++ b/cpp/lib/common/sys/posix/PosixAcceptor.cpp @@ -1,48 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include <sys/Acceptor.h> -#include <Exception.h> - -namespace qpid { -namespace sys { - -namespace { -void fail() { throw qpid::Exception("PosixAcceptor not implemented"); } -} - -class PosixAcceptor : public Acceptor { - public: - virtual int16_t getPort() const { fail(); return 0; } - virtual void run(qpid::sys::SessionHandlerFactory* ) { fail(); } - virtual void shutdown() { fail(); } -}; - -// Define generic Acceptor::create() to return APRAcceptor. - Acceptor::shared_ptr Acceptor::create(int16_t , int, int, bool) -{ - return Acceptor::shared_ptr(new PosixAcceptor()); -} - -// Must define Acceptor virtual dtor. -Acceptor::~Acceptor() {} - -}} diff --git a/cpp/lib/common/sys/posix/Socket.cpp b/cpp/lib/common/sys/posix/Socket.cpp index 5bd13742f6..fc82b4e7e5 100644 --- a/cpp/lib/common/sys/posix/Socket.cpp +++ b/cpp/lib/common/sys/posix/Socket.cpp @@ -96,6 +96,8 @@ Socket::recv(void* data, size_t size) int Socket::listen(int port, int backlog) { struct sockaddr_in name; + static const int ON = 1; + setsockopt(socket, SOL_SOCKET, SO_REUSEADDR, &ON, sizeof(ON)); name.sin_family = AF_INET; name.sin_port = htons(port); name.sin_addr.s_addr = 0; @@ -111,8 +113,15 @@ int Socket::listen(int port, int backlog) return ntohs(name.sin_port); } +Socket Socket::accept() { + int accepted = ::accept(socket, 0, 0); + if (accepted < 0) + throw (QPID_POSIX_ERROR(errno)); + return Socket(accepted); +} + -int Socket::fd() +int Socket::fd()const { return socket; } diff --git a/cpp/lib/common/sys/posix/check.cpp b/cpp/lib/common/sys/posix/check.cpp index 408679caa8..4ddacb3fbd 100644 --- a/cpp/lib/common/sys/posix/check.cpp +++ b/cpp/lib/common/sys/posix/check.cpp @@ -32,8 +32,8 @@ PosixError::getMessage(int errNo) return std::string(strerror_r(errNo, buf, sizeof(buf))); } -PosixError::PosixError(int errNo, const qpid::SrcLine& loc) throw() - : qpid::QpidError(INTERNAL_ERROR + errNo, getMessage(errNo), loc) +PosixError::PosixError(int errNo, const qpid::QpidError::Location& l) throw() + : qpid::QpidError(INTERNAL_ERROR + errNo, getMessage(errNo), l) { } }} diff --git a/cpp/lib/common/sys/posix/check.h b/cpp/lib/common/sys/posix/check.h index 5afbe8f5a8..052fb08580 100644 --- a/cpp/lib/common/sys/posix/check.h +++ b/cpp/lib/common/sys/posix/check.h @@ -37,13 +37,15 @@ class PosixError : public qpid::QpidError public: static std::string getMessage(int errNo); - PosixError(int errNo, const qpid::SrcLine& location) throw(); + PosixError(int errNo, const qpid::QpidError::Location& location) throw(); ~PosixError() throw() {} int getErrNo() { return errNo; } - Exception* clone() const throw() { return new PosixError(*this); } + Exception::auto_ptr clone() const throw() { + return Exception::auto_ptr(new PosixError(*this)); + } void throwSelf() { throw *this; } @@ -54,9 +56,17 @@ class PosixError : public qpid::QpidError }} /** Create a PosixError for the current file/line and errno. */ -#define QPID_POSIX_ERROR(errNo) ::qpid::sys::PosixError(errNo, SRCLINE) +#define QPID_POSIX_ERROR(ERRNO) \ + ::qpid::sys::PosixError((ERRNO), QPID_ERROR_LOCATION) -/** Throw a posix error if errNo is non-zero */ -#define QPID_POSIX_THROW_IF(ERRNO) \ - if ((ERRNO) != 0) throw QPID_POSIX_ERROR((ERRNO)) +/** Throw QPID_POSIX_ERROR(errno) if RESULT is less than zero */ +#define QPID_POSIX_CHECK(RESULT) \ + if ((RESULT) < 0) throw QPID_POSIX_ERROR((errno)) + +/** Throw QPID_POSIX_ERROR(ERRNO) if ERRNO is non zero */ +#define QPID_POSIX_THROW_IF(ERRNO) \ + do { int e = (ERRNO); if (e) throw QPID_POSIX_ERROR(e); } while(0) + + + #endif /*!_posix_check_h*/ |