diff options
author | Andrew Stitcher <astitcher@apache.org> | 2007-05-22 15:18:08 +0000 |
---|---|---|
committer | Andrew Stitcher <astitcher@apache.org> | 2007-05-22 15:18:08 +0000 |
commit | f646350b5e59ccf49f1253bd55f98d062769f2ee (patch) | |
tree | ba8143aa842ced96eaa450cc236a96abdd8b9c05 /cpp/src | |
parent | b8f00ac2a358a02d0cdae2dc098f2bacb2af44d5 (diff) | |
download | qpid-python-f646350b5e59ccf49f1253bd55f98d062769f2ee.tar.gz |
* Split apart platform (threading etc.) from network io
you can now use a posix platform implementation by configuring
--disable-apr-platform
* Changed Time classes to distinguish between absolute times (AbsTime)
and durations (Duration). This should avoid bugs caused by confusing
the two types of time.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@540608 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
43 files changed, 1233 insertions, 647 deletions
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am index 87087ef369..cb17425369 100644 --- a/cpp/src/Makefile.am +++ b/cpp/src/Makefile.am @@ -30,44 +30,75 @@ qpidd_LDADD = \ sbin_PROGRAMS = qpidd qpidd_SOURCES = qpidd.cpp -apr_src = \ +apr_netio_src = \ qpid/sys/apr/APRAcceptor.cpp \ qpid/sys/apr/APRBase.cpp \ qpid/sys/apr/APRPool.cpp \ qpid/sys/apr/APRSocket.cpp \ qpid/sys/apr/LFProcessor.cpp \ - qpid/sys/apr/LFSessionContext.cpp \ - qpid/sys/apr/Socket.cpp \ - qpid/sys/apr/Thread.cpp + qpid/sys/apr/LFSessionContext.cpp -apr_hdr = \ +apr_netio_hdr = \ qpid/sys/apr/APRBase.h \ qpid/sys/apr/APRPool.h \ qpid/sys/apr/APRSocket.h \ qpid/sys/apr/LFProcessor.h \ qpid/sys/apr/LFSessionContext.h -posix_src = \ - qpid/sys/posix/PosixAcceptor.cpp \ - qpid/sys/posix/Socket.cpp \ - qpid/sys/posix/Thread.cpp \ - qpid/sys/posix/check.cpp \ +apr_plat_src = \ + qpid/sys/apr/Socket.cpp \ + qpid/sys/apr/Time.cpp \ + qpid/sys/apr/Thread.cpp + +apr_plat_hdr = \ + qpid/sys/apr/Condition.h \ + qpid/sys/apr/Module.h \ + qpid/sys/apr/Mutex.h \ + qpid/sys/apr/Socket.h \ + qpid/sys/apr/Thread.h + +posix_netio_src = \ qpid/sys/posix/EventChannel.cpp \ + qpid/sys/posix/EventChannelAcceptor.cpp \ + qpid/sys/posix/EventChannelConnection.cpp \ qpid/sys/posix/EventChannelThreads.cpp -posix_hdr = \ - qpid/sys/posix/check.h \ +posix_netio_hdr = \ qpid/sys/posix/EventChannel.h \ qpid/sys/posix/EventChannelThreads.h -if USE_APR - platform_dist=$(posix_src) $(posix_hdr) - platform_src = $(apr_src) - platform_hdr = $(apr_hdr) +posix_plat_src = \ + qpid/sys/posix/check.cpp \ + qpid/sys/posix/Socket.cpp \ + qpid/sys/posix/Time.cpp \ + qpid/sys/posix/Thread.cpp + +posix_plat_hdr = \ + qpid/sys/posix/check.h \ + qpid/sys/posix/Condition.h \ + qpid/sys/posix/Module.h \ + qpid/sys/posix/Mutex.h \ + qpid/sys/posix/Socket.h \ + qpid/sys/posix/Thread.h + +if USE_APR_NETIO + platform_dist=$(posix_netio_src) $(posix_netio_hdr) + platform_src = $(apr_netio_src) + platform_hdr = $(apr_netio_hdr) +else + platform_dist=$(apr_netio_src) $(apr_netio_hdr) + platform_src = $(posix_netio_src) + platform_hdr = $(posix_netio_hdr) +endif + +if USE_APR_PLATFORM + platform_dist+=$(posix_plat_src) $(posix_plat_hdr) + platform_src += $(apr_plat_src) + platform_hdr += $(apr_plat_hdr) else - platform_dist =$(apr_src) $(apr_hdr) $(generated_cpp) $(generated_h) - platform_src = $(posix_src) - platform_hdr = $(posix_hdr) + platform_dist+=$(apr_plat_src) $(apr_plat_hdr) + platform_src += $(posix_plat_src) + platform_hdr += $(posix_plat_hdr) endif lib_LTLIBRARIES = libqpidcommon.la libqpidbroker.la libqpidclient.la @@ -111,7 +142,6 @@ libqpidcommon_la_SOURCES = \ qpid/ExceptionHolder.cpp \ qpid/QpidError.cpp \ qpid/sys/Runnable.cpp \ - qpid/sys/Time.cpp \ qpid/sys/ProducerConsumer.cpp \ qpid/CommonOptions.cpp diff --git a/cpp/src/qpid/broker/AutoDelete.cpp b/cpp/src/qpid/broker/AutoDelete.cpp index bfdb8decd2..8f1f38b184 100644 --- a/cpp/src/qpid/broker/AutoDelete.cpp +++ b/cpp/src/qpid/broker/AutoDelete.cpp @@ -25,7 +25,7 @@ using namespace qpid::broker; using namespace qpid::sys; AutoDelete::AutoDelete(QueueRegistry* const _registry, uint32_t _period) - : registry(_registry), period(_period), stopped(true) { } + : registry(_registry), period(_period*TIME_MSEC), stopped(true) { } void AutoDelete::add(Queue::shared_ptr const queue){ Mutex::ScopedLock l(lock); @@ -63,7 +63,7 @@ void AutoDelete::run(){ Monitor::ScopedLock l(monitor); while(!stopped){ process(); - monitor.wait(period*TIME_MSEC); + monitor.wait(AbsTime(now(), period)); } } diff --git a/cpp/src/qpid/broker/AutoDelete.h b/cpp/src/qpid/broker/AutoDelete.h index 24e9c18a68..d4d0f84f4b 100644 --- a/cpp/src/qpid/broker/AutoDelete.h +++ b/cpp/src/qpid/broker/AutoDelete.h @@ -26,6 +26,7 @@ #include "BrokerQueue.h" #include "QueueRegistry.h" #include "qpid/sys/Thread.h" +#include "qpid/sys/Runnable.h" namespace qpid { namespace broker{ @@ -34,7 +35,7 @@ namespace qpid { qpid::sys::Monitor monitor; std::queue<Queue::shared_ptr> queues; QueueRegistry* const registry; - uint32_t period; + sys::Duration period; volatile bool stopped; qpid::sys::Thread runner; diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp index c5e5f82613..de2f2c55f4 100644 --- a/cpp/src/qpid/broker/Broker.cpp +++ b/cpp/src/qpid/broker/Broker.cpp @@ -18,8 +18,8 @@ * under the License. * */ -#include <iostream> -#include <memory> + +#include "Broker.h" #include "qpid/framing/AMQFrame.h" #include "DirectExchange.h" @@ -31,11 +31,15 @@ #include "qpid/framing/ProtocolInitiation.h" #include "RecoveryManagerImpl.h" #include "Connection.h" +#include "qpid/sys/Acceptor.h" #include "qpid/sys/ConnectionInputHandler.h" #include "qpid/sys/ConnectionInputHandlerFactory.h" #include "qpid/sys/TimeoutHandler.h" -#include "Broker.h" +#include <iostream> +#include <memory> + +using qpid::sys::Acceptor; namespace qpid { namespace broker { diff --git a/cpp/src/qpid/broker/BrokerQueue.cpp b/cpp/src/qpid/broker/BrokerQueue.cpp index 34c8d79e76..0fbff0f8fd 100644 --- a/cpp/src/qpid/broker/BrokerQueue.cpp +++ b/cpp/src/qpid/broker/BrokerQueue.cpp @@ -44,11 +44,10 @@ Queue::Queue(const string& _name, uint32_t _autodelete, queueing(false), dispatching(false), next(0), - lastUsed(0), exclusive(0), persistenceId(0) { - if(autodelete) lastUsed = now()/TIME_MSEC; + if(autodelete) lastUsed = now(); } Queue::~Queue(){} @@ -135,7 +134,7 @@ void Queue::consume(Consumer* c, bool requestExclusive){ "Exclusive access denied.") %getName()); exclusive = c; } - if(autodelete && consumers.empty()) lastUsed = 0; + if(autodelete && consumers.empty()) lastUsed = FAR_FUTURE; consumers.push_back(c); } @@ -144,7 +143,7 @@ void Queue::cancel(Consumer* c){ Consumers::iterator i = std::find(consumers.begin(), consumers.end(), c); if (i != consumers.end()) consumers.erase(i); - if(autodelete && consumers.empty()) lastUsed = now()*TIME_MSEC; + if(autodelete && consumers.empty()) lastUsed = now(); if(exclusive == c) exclusive = 0; } @@ -193,7 +192,7 @@ uint32_t Queue::getConsumerCount() const{ bool Queue::canAutoDelete() const{ Mutex::ScopedLock locker(lock); - return lastUsed && (now()*TIME_MSEC - lastUsed > autodelete); + return Duration(lastUsed, now()) > autodelete; } void Queue::enqueue(TransactionContext* ctxt, Message::shared_ptr& msg) diff --git a/cpp/src/qpid/broker/BrokerQueue.h b/cpp/src/qpid/broker/BrokerQueue.h index c45b35566e..e1e69cbc60 100644 --- a/cpp/src/qpid/broker/BrokerQueue.h +++ b/cpp/src/qpid/broker/BrokerQueue.h @@ -58,7 +58,7 @@ namespace qpid { typedef std::queue<Message::shared_ptr> Messages; const string name; - const uint32_t autodelete; + const sys::Duration autodelete; MessageStore* const store; const ConnectionToken* const owner; Consumers consumers; @@ -67,7 +67,7 @@ namespace qpid { bool dispatching; int next; mutable qpid::sys::Mutex lock; - int64_t lastUsed; + sys::AbsTime lastUsed; Consumer* exclusive; mutable uint64_t persistenceId; framing::FieldTable settings; diff --git a/cpp/src/qpid/client/Connector.cpp b/cpp/src/qpid/client/Connector.cpp index 8e2ff9a09c..3447467fda 100644 --- a/cpp/src/qpid/client/Connector.cpp +++ b/cpp/src/qpid/client/Connector.cpp @@ -37,7 +37,6 @@ Connector::Connector( send_buffer_size(buffer_size), version(ver), closed(true), - lastIn(0), lastOut(0), timeout(0), idleIn(0), idleOut(0), timeoutHandler(0), @@ -100,7 +99,7 @@ void Connector::writeToSocket(char* data, size_t available){ while(written < available && !closed){ ssize_t sent = socket.send(data + written, available-written); if(sent > 0) { - lastOut = now() * TIME_MSEC; + lastOut = now(); written += sent; } } @@ -124,9 +123,9 @@ bool Connector::markClosed(){ void Connector::checkIdle(ssize_t status){ if(timeoutHandler){ - Time t = now() * TIME_MSEC; + AbsTime t = now(); if(status == Socket::SOCKET_TIMEOUT) { - if(idleIn && (t - lastIn > idleIn)){ + if(idleIn && (Duration(lastIn, t) > idleIn)){ timeoutHandler->idleIn(); } } @@ -136,14 +135,14 @@ void Connector::checkIdle(ssize_t status){ else { lastIn = t; } - if(idleOut && (t - lastOut > idleOut)){ + if(idleOut && (Duration(lastOut, t) > idleOut)){ timeoutHandler->idleOut(); } } } void Connector::setReadTimeout(uint16_t t){ - idleIn = t * 1000;//t is in secs + idleIn = t * TIME_SEC;//t is in secs if(idleIn && (!timeout || idleIn < timeout)){ timeout = idleIn; setSocketTimeout(); @@ -152,7 +151,7 @@ void Connector::setReadTimeout(uint16_t t){ } void Connector::setWriteTimeout(uint16_t t){ - idleOut = t * 1000;//t is in secs + idleOut = t * TIME_SEC;//t is in secs if(idleOut && (!timeout || idleOut < timeout)){ timeout = idleOut; setSocketTimeout(); @@ -160,7 +159,7 @@ void Connector::setWriteTimeout(uint16_t t){ } void Connector::setSocketTimeout(){ - socket.setTimeout(timeout*TIME_MSEC); + socket.setTimeout(timeout); } void Connector::setTimeoutHandler(TimeoutHandler* handler){ diff --git a/cpp/src/qpid/client/Connector.h b/cpp/src/qpid/client/Connector.h index 10bde1b8ea..56eea95dd1 100644 --- a/cpp/src/qpid/client/Connector.h +++ b/cpp/src/qpid/client/Connector.h @@ -30,8 +30,10 @@ #include "qpid/sys/ShutdownHandler.h" #include "qpid/sys/TimeoutHandler.h" #include "qpid/sys/Thread.h" +#include "qpid/sys/Runnable.h" #include "qpid/sys/Monitor.h" #include "qpid/sys/Socket.h" +#include "qpid/sys/Time.h" namespace qpid { @@ -48,11 +50,11 @@ class Connector : public framing::OutputHandler, bool closed; sys::Mutex closedLock; - int64_t lastIn; - int64_t lastOut; - int64_t timeout; - uint32_t idleIn; - uint32_t idleOut; + sys::AbsTime lastIn; + sys::AbsTime lastOut; + sys::Duration timeout; + sys::Duration idleIn; + sys::Duration idleOut; sys::TimeoutHandler* timeoutHandler; sys::ShutdownHandler* shutdownHandler; diff --git a/cpp/src/qpid/sys/Condition.h b/cpp/src/qpid/sys/Condition.h index 455b179683..961c15e1ee 100644 --- a/cpp/src/qpid/sys/Condition.h +++ b/cpp/src/qpid/sys/Condition.h @@ -22,107 +22,10 @@ * */ -#include <sys/errno.h> -#include <boost/noncopyable.hpp> -#include "Mutex.h" -#include "Time.h" - -#ifdef USE_APR -#include <apr_thread_cond.h> -#endif - -namespace qpid { -namespace sys { - -/** - * A condition variable for thread synchronization. - */ -class Condition -{ - public: - inline Condition(); - inline ~Condition(); - inline void wait(Mutex&); - inline bool wait(Mutex&, const Time& absoluteTime); - inline void notify(); - inline void notifyAll(); - - private: -#ifdef USE_APR - apr_thread_cond_t* condition; +#ifdef USE_APR_PLATFORM +#include "apr/Condition.h" #else - pthread_cond_t condition; +#include "posix/Condition.h" #endif -}; - - -// APR ================================================================ -#ifdef USE_APR - -Condition::Condition() { - CHECK_APR_SUCCESS(apr_thread_cond_create(&condition, APRPool::get())); -} - -Condition::~Condition() { - CHECK_APR_SUCCESS(apr_thread_cond_destroy(condition)); -} - -void Condition::wait(Mutex& mutex) { - CHECK_APR_SUCCESS(apr_thread_cond_wait(condition, mutex.mutex)); -} - -bool Condition::wait(Mutex& mutex, const Time& absoluteTime){ - // APR uses microseconds. - apr_status_t status = - apr_thread_cond_timedwait( - condition, mutex.mutex, absoluteTime/TIME_USEC); - if(status != APR_TIMEUP) CHECK_APR_SUCCESS(status); - return status == 0; -} - -void Condition::notify(){ - CHECK_APR_SUCCESS(apr_thread_cond_signal(condition)); -} - -void Condition::notifyAll(){ - CHECK_APR_SUCCESS(apr_thread_cond_broadcast(condition)); -} - -#else -// POSIX ================================================================ - -Condition::Condition() { - QPID_POSIX_THROW_IF(pthread_cond_init(&condition, 0)); -} - -Condition::~Condition() { - QPID_POSIX_THROW_IF(pthread_cond_destroy(&condition)); -} - -void Condition::wait(Mutex& mutex) { - QPID_POSIX_THROW_IF(pthread_cond_wait(&condition, &mutex.mutex)); -} - -bool Condition::wait(Mutex& mutex, const Time& absoluteTime){ - struct timespec ts; - toTimespec(ts, absoluteTime); - int status = pthread_cond_timedwait(&condition, &mutex.mutex, &ts); - if (status != 0) { - if (status == ETIMEDOUT) return false; - throw QPID_POSIX_ERROR(status); - } - return true; -} - -void Condition::notify(){ - QPID_POSIX_THROW_IF(pthread_cond_signal(&condition)); -} - -void Condition::notifyAll(){ - QPID_POSIX_THROW_IF(pthread_cond_broadcast(&condition)); -} -#endif /*USE_APR*/ - -}} #endif /*!_sys_Condition_h*/ diff --git a/cpp/src/qpid/sys/Module.h b/cpp/src/qpid/sys/Module.h index cb31018fdc..89cea36a74 100644 --- a/cpp/src/qpid/sys/Module.h +++ b/cpp/src/qpid/sys/Module.h @@ -21,141 +21,12 @@ * under the License. * */ -#include <boost/noncopyable.hpp> -#include <iostream> -#include "qpid/QpidError.h" - -namespace qpid { -namespace sys { -#if USE_APR -#include <apr_dso.h> - typedef apr_dso_handle_t* dso_handle_t; + +#ifdef USE_APR_PLATFORM +#include "apr/Module.h" #else - typedef void* dso_handle_t; +#include "posix/Module.h" #endif - template <class T> class Module : private boost::noncopyable - { - typedef T* create_t(); - typedef void destroy_t(T*); - - dso_handle_t handle; - destroy_t* destroy; - T* ptr; - - void load(const std::string& name); - void unload(); - void* getSymbol(const std::string& name); - - public: - Module(const std::string& name); - T* operator->(); - T* get(); - ~Module() throw(); - }; - -} -} - -using namespace qpid::sys; - -template <class T> Module<T>::Module(const std::string& module) : destroy(0), ptr(0) -{ - load(module); - //TODO: need a better strategy for symbol names to allow multiple - //modules to be loaded without clashes... - - //Note: need the double cast to avoid errors in casting from void* to function pointer with -pedantic - create_t* create = reinterpret_cast<create_t*>(reinterpret_cast<intptr_t>(getSymbol("create"))); - destroy = reinterpret_cast<destroy_t*>(reinterpret_cast<intptr_t>(getSymbol("destroy"))); - ptr = create(); -} - -template <class T> T* Module<T>::operator->() -{ - return ptr; -} - -template <class T> T* Module<T>::get() -{ - return ptr; -} - -template <class T> Module<T>::~Module() throw() -{ - try { - if (handle && ptr) { - destroy(ptr); - } - if (handle) unload(); - } catch (std::exception& e) { - std::cout << "Error while destroying module: " << e.what() << std::endl; - } - destroy = 0; - handle = 0; - ptr = 0; -} - -// APR ================================================================ -#if USE_APR - -#include "apr/APRBase.h" -#include "apr/APRPool.h" - -template <class T> void Module<T>::load(const std::string& name) -{ - CHECK_APR_SUCCESS(apr_dso_load(&handle, name.c_str(), APRPool::get())); -} - -template <class T> void Module<T>::unload() -{ - CHECK_APR_SUCCESS(apr_dso_unload(handle)); -} - -template <class T> void* Module<T>::getSymbol(const std::string& name) -{ - apr_dso_handle_sym_t symbol; - CHECK_APR_SUCCESS(apr_dso_sym(&symbol, handle, name.c_str())); - return (void*) symbol; -} - -// POSIX================================================================ -#else - -#include <dlfcn.h> - -template <class T> void Module<T>::load(const std::string& name) -{ - dlerror(); - handle = dlopen(name.c_str(), RTLD_NOW); - const char* error = dlerror(); - if (error) { - THROW_QPID_ERROR(INTERNAL_ERROR, error); - } -} - -template <class T> void Module<T>::unload() -{ - dlerror(); - dlclose(handle); - const char* error = dlerror(); - if (error) { - THROW_QPID_ERROR(INTERNAL_ERROR, error); - } -} - -template <class T> void* Module<T>::getSymbol(const std::string& name) -{ - dlerror(); - void* sym = dlsym(handle, name.c_str()); - const char* error = dlerror(); - if (error) { - THROW_QPID_ERROR(INTERNAL_ERROR, error); - } - return sym; -} - -#endif //if USE_APR - #endif //ifndef _sys_Module_h diff --git a/cpp/src/qpid/sys/Monitor.h b/cpp/src/qpid/sys/Monitor.h index 1b8ae1a527..1d9835675c 100644 --- a/cpp/src/qpid/sys/Monitor.h +++ b/cpp/src/qpid/sys/Monitor.h @@ -25,10 +25,6 @@ #include <sys/errno.h> #include "Condition.h" -#ifdef USE_APR -#include <apr_thread_cond.h> -#endif - namespace qpid { namespace sys { @@ -39,7 +35,7 @@ class Monitor : public Mutex, public Condition { public: using Condition::wait; inline void wait(); - inline bool wait(const Time& absoluteTime); + inline bool wait(const AbsTime& absoluteTime); }; @@ -47,7 +43,7 @@ void Monitor::wait() { Condition::wait(*this); } -bool Monitor::wait(const Time& absoluteTime) { +bool Monitor::wait(const AbsTime& absoluteTime) { return Condition::wait(*this, absoluteTime); } diff --git a/cpp/src/qpid/sys/Mutex.h b/cpp/src/qpid/sys/Mutex.h index 825b519039..4eff6078ae 100644 --- a/cpp/src/qpid/sys/Mutex.h +++ b/cpp/src/qpid/sys/Mutex.h @@ -19,21 +19,9 @@ * */ -#ifdef USE_APR -#include <apr_thread_mutex.h> -#include "apr/APRBase.h" -#include "apr/APRPool.h" -#else -#include <pthread.h> -#include "posix/check.h" -#endif -#include <boost/noncopyable.hpp> - namespace qpid { namespace sys { -class Condition; - /** * Scoped lock template: calls lock() in ctor, unlock() in dtor. * L can be any class with lock() and unlock() functions. @@ -57,109 +45,13 @@ class ScopedUnlock private: L& mutex; }; - -/** - * Mutex lock. - */ -class Mutex : private boost::noncopyable { - public: - typedef ScopedLock<Mutex> ScopedLock; - typedef ScopedUnlock<Mutex> ScopedUnlock; - - inline Mutex(); - inline ~Mutex(); - inline void lock(); - inline void unlock(); - inline void trylock(); - protected: -#ifdef USE_APR - apr_thread_mutex_t* mutex; +}} + +#ifdef USE_APR_PLATFORM +#include "apr/Mutex.h" #else - pthread_mutex_t mutex; +#include "posix/Mutex.h" #endif - friend class Condition; -}; - -#ifdef USE_APR -// APR ================================================================ - -Mutex::Mutex() { - CHECK_APR_SUCCESS(apr_thread_mutex_create(&mutex, APR_THREAD_MUTEX_NESTED, APRPool::get())); -} - -Mutex::~Mutex(){ - CHECK_APR_SUCCESS(apr_thread_mutex_destroy(mutex)); -} - -void Mutex::lock() { - CHECK_APR_SUCCESS(apr_thread_mutex_lock(mutex)); -} -void Mutex::unlock() { - CHECK_APR_SUCCESS(apr_thread_mutex_unlock(mutex)); -} - -void Mutex::trylock() { - CHECK_APR_SUCCESS(apr_thread_mutex_trylock(mutex)); -} - -#else -// POSIX ================================================================ - -/** - * PODMutex is a POD, can be static-initialized with - * PODMutex m = QPID_PODMUTEX_INITIALIZER - */ -struct PODMutex -{ - typedef ScopedLock<PODMutex> ScopedLock; - - inline void lock(); - inline void unlock(); - inline void trylock(); - - // Must be public to be a POD: - pthread_mutex_t mutex; -}; - -#define QPID_MUTEX_INITIALIZER { PTHREAD_MUTEX_INITIALIZER } - - -void PODMutex::lock() { - QPID_POSIX_THROW_IF(pthread_mutex_lock(&mutex)); -} -void PODMutex::unlock() { - QPID_POSIX_THROW_IF(pthread_mutex_unlock(&mutex)); -} - -void PODMutex::trylock() { - QPID_POSIX_THROW_IF(pthread_mutex_trylock(&mutex)); -} - - -Mutex::Mutex() { - QPID_POSIX_THROW_IF(pthread_mutex_init(&mutex, 0)); -} - -Mutex::~Mutex(){ - QPID_POSIX_THROW_IF(pthread_mutex_destroy(&mutex)); -} - -void Mutex::lock() { - QPID_POSIX_THROW_IF(pthread_mutex_lock(&mutex)); -} -void Mutex::unlock() { - QPID_POSIX_THROW_IF(pthread_mutex_unlock(&mutex)); -} - -void Mutex::trylock() { - QPID_POSIX_THROW_IF(pthread_mutex_trylock(&mutex)); -} - -#endif // USE_APR - -}} - - #endif /*!_sys_Mutex_h*/ diff --git a/cpp/src/qpid/sys/ProducerConsumer.cpp b/cpp/src/qpid/sys/ProducerConsumer.cpp index 56b4c0cdb9..e892f60794 100644 --- a/cpp/src/qpid/sys/ProducerConsumer.cpp +++ b/cpp/src/qpid/sys/ProducerConsumer.cpp @@ -103,7 +103,7 @@ ProducerConsumer::ConsumerLock::ConsumerLock(ProducerConsumer& p) : Lock(p) } ProducerConsumer::ConsumerLock::ConsumerLock( - ProducerConsumer& p, const Time& timeout) : Lock(p) + ProducerConsumer& p, const Duration& timeout) : Lock(p) { if (isOk()) { // Don't wait if timeout==0 @@ -113,7 +113,7 @@ ProducerConsumer::ConsumerLock::ConsumerLock( return; } else { - Time deadline = now() + timeout; + AbsTime deadline(now(), timeout); ScopedIncrement<size_t> inc(pc.waiters); while (pc.items == 0 && !pc.shutdownFlag) { if (!pc.monitor.wait(deadline)) { diff --git a/cpp/src/qpid/sys/ProducerConsumer.h b/cpp/src/qpid/sys/ProducerConsumer.h index fc6434ef48..2a02dab503 100644 --- a/cpp/src/qpid/sys/ProducerConsumer.h +++ b/cpp/src/qpid/sys/ProducerConsumer.h @@ -143,7 +143,7 @@ class ProducerConsumer * If isTimedOut() there was a timeout. * If neither then we were shutdown. */ - ConsumerLock(ProducerConsumer& p, const Time& timeout); + ConsumerLock(ProducerConsumer& p, const Duration& timeout); /** Release locks */ ~ConsumerLock(); diff --git a/cpp/src/qpid/sys/Socket.h b/cpp/src/qpid/sys/Socket.h index ea03222aed..bb1ef27e65 100644 --- a/cpp/src/qpid/sys/Socket.h +++ b/cpp/src/qpid/sys/Socket.h @@ -22,67 +22,10 @@ * */ -#include <string> -#include "Time.h" - -#ifdef USE_APR -#include <apr_network_io.h> -#endif - -namespace qpid { -namespace sys { - -class Socket -{ - public: - /** Create an initialized TCP socket */ - static Socket createTcp(); - - /** Create a socket wrapper for descriptor. */ -#ifdef USE_APR - Socket(apr_socket_t* descriptor = 0); -#else - Socket(int descriptor = 0); -#endif - - /** Set timeout for read and write */ - void setTimeout(Time interval); - - void connect(const std::string& host, int port); - - void close(); - - enum { SOCKET_TIMEOUT=-2, SOCKET_EOF=-3 } ErrorCode; - - /** Returns bytes sent or an ErrorCode value < 0. */ - ssize_t send(const void* data, size_t size); - - /** - * Returns bytes received, an ErrorCode value < 0 or 0 - * if the connection closed in an orderly manner. - */ - ssize_t recv(void* data, size_t size); - - /** Bind to a port and start listening. - *@param port 0 means choose an available port. - *@param backlog maximum number of pending connections. - *@return The bound port. - */ - int listen(int port = 0, int backlog = 10); - - /** Get file descriptor */ - int fd(); - - private: -#ifdef USE_APR - apr_socket_t* socket; +#ifdef USE_APR_PLATFORM +#include "apr/Socket.h" #else - void init() const; - mutable int socket; // Initialized on demand. +#include "posix/Socket.h" #endif -}; - -}} - #endif /*!_sys_Socket_h*/ diff --git a/cpp/src/qpid/sys/Thread.h b/cpp/src/qpid/sys/Thread.h index e52f2a1b3e..fd9be5617e 100644 --- a/cpp/src/qpid/sys/Thread.h +++ b/cpp/src/qpid/sys/Thread.h @@ -22,121 +22,10 @@ * */ -#include "Runnable.h" - -#ifdef USE_APR -#include <apr_thread_proc.h> -#include <apr_portable.h> -#include "apr/APRPool.h" -#include "apr/APRBase.h" -#else -#include "posix/check.h" -#include <pthread.h> -#endif - -namespace qpid { -namespace sys { - -class Thread -{ - public: - inline static Thread current(); - inline static void yield(); - - inline Thread(); - inline explicit Thread(qpid::sys::Runnable*); - inline explicit Thread(qpid::sys::Runnable&); - - inline void join(); - - inline long id(); - - private: -#ifdef USE_APR - static void* APR_THREAD_FUNC runRunnable(apr_thread_t* thread, void *data); - inline Thread(apr_thread_t* t); - apr_thread_t* thread; -#else - static void* runRunnable(void* runnable); - inline Thread(pthread_t); - pthread_t thread; -#endif -}; - - -Thread::Thread() : thread(0) {} - -// APR ================================================================ -#ifdef USE_APR - -Thread::Thread(Runnable* runnable) { - CHECK_APR_SUCCESS( - apr_thread_create(&thread, 0, runRunnable, runnable, APRPool::get())); -} - -Thread::Thread(Runnable& runnable) { - CHECK_APR_SUCCESS( - apr_thread_create(&thread, 0, runRunnable, &runnable, APRPool::get())); -} - -void Thread::join(){ - apr_status_t status; - if (thread != 0) - CHECK_APR_SUCCESS(apr_thread_join(&status, thread)); -} - -long Thread::id() { - return long(thread); -} - -Thread::Thread(apr_thread_t* t) : thread(t) {} - -Thread Thread::current(){ - apr_thread_t* thr; - apr_os_thread_t osthr = apr_os_thread_current(); - CHECK_APR_SUCCESS(apr_os_thread_put(&thr, &osthr, APRPool::get())); - return Thread(thr); -} - -void Thread::yield() -{ - apr_thread_yield(); -} - - -// POSIX ================================================================ +#ifdef USE_APR_PLATFORM +#include "apr/Thread.h" #else - -Thread::Thread(Runnable* runnable) { - QPID_POSIX_THROW_IF(pthread_create(&thread, NULL, runRunnable, runnable)); -} - -Thread::Thread(Runnable& runnable) { - QPID_POSIX_THROW_IF(pthread_create(&thread, NULL, runRunnable, &runnable)); -} - -void Thread::join(){ - QPID_POSIX_THROW_IF(pthread_join(thread, 0)); -} - -long Thread::id() { - return long(thread); -} - -Thread::Thread(pthread_t thr) : thread(thr) {} - -Thread Thread::current() { - return Thread(pthread_self()); -} - -void Thread::yield() -{ - QPID_POSIX_THROW_IF(pthread_yield()); -} - - +#include "posix/Thread.h" #endif -}} - #endif /*!_sys_Thread_h*/ diff --git a/cpp/src/qpid/sys/Time.h b/cpp/src/qpid/sys/Time.h index 47609d51df..314f4b5bbb 100644 --- a/cpp/src/qpid/sys/Time.h +++ b/cpp/src/qpid/sys/Time.h @@ -23,36 +23,76 @@ */ #include <stdint.h> - -#ifdef USE_APR -#include <apr_time.h> -#else -#include <time.h> -#endif +#include <limits> namespace qpid { namespace sys { -/** Time in nanoseconds */ -typedef int64_t Time; +class Duration; + +/** Times in nanoseconds */ +class AbsTime { + int64_t time_ns; + + friend class Duration; + +public: + inline AbsTime() {} + inline AbsTime(const AbsTime& time0, const Duration& duration); + // Default asignment operation fine + // Default copy constructor fine + + static AbsTime now(); + inline static AbsTime FarFuture(); +}; + +class Duration { + int64_t nanosecs; + + friend class AbsTime; + +public: + inline Duration(int64_t time0); + inline explicit Duration(const AbsTime& time0); + inline explicit Duration(const AbsTime& start, const AbsTime& finish); + inline operator int64_t() const; +}; + -Time now(); +AbsTime::AbsTime(const AbsTime& time0, const Duration& duration0) : + time_ns(time0.time_ns+duration0.nanosecs) +{} + +AbsTime AbsTime::FarFuture() { AbsTime ff; ff.time_ns = std::numeric_limits<int64_t>::max(); return ff;} + +inline AbsTime now() { return AbsTime::now(); } + +Duration::Duration(int64_t time0) : + nanosecs(time0) +{} + +Duration::Duration(const AbsTime& time0) : + nanosecs(time0.time_ns) +{} + +Duration::Duration(const AbsTime& start, const AbsTime& finish) : + nanosecs(finish.time_ns - start.time_ns) +{} + +Duration::operator int64_t() const +{ return nanosecs; } /** Nanoseconds per second. */ -const Time TIME_SEC = 1000*1000*1000; +const Duration TIME_SEC = 1000*1000*1000; /** Nanoseconds per millisecond */ -const Time TIME_MSEC = 1000*1000; +const Duration TIME_MSEC = 1000*1000; /** Nanoseconds per microseconds. */ -const Time TIME_USEC = 1000; +const Duration TIME_USEC = 1000; /** Nanoseconds per nanosecond. */ -const Time TIME_NSEC = 1; - -#ifndef USE_APR -struct timespec toTimespec(const Time& t); -struct timespec& toTimespec(struct timespec& ts, const Time& t); -Time toTime(const struct timespec& ts); -#endif +const Duration TIME_NSEC = 1; +/** Time greater than any other time */ +const AbsTime FAR_FUTURE = AbsTime::FarFuture(); }} #endif /*!_sys_Time_h*/ diff --git a/cpp/src/qpid/sys/apr/APRAcceptor.cpp b/cpp/src/qpid/sys/apr/APRAcceptor.cpp index dc021d2a3f..e9ce24ac2d 100644 --- a/cpp/src/qpid/sys/apr/APRAcceptor.cpp +++ b/cpp/src/qpid/sys/apr/APRAcceptor.cpp @@ -20,6 +20,7 @@ */ #include "qpid/sys/Acceptor.h" #include "qpid/sys/ConnectionInputHandlerFactory.h" +#include "qpid/sys/Mutex.h" #include "LFProcessor.h" #include "LFSessionContext.h" #include "APRBase.h" diff --git a/cpp/src/qpid/sys/apr/Condition.h b/cpp/src/qpid/sys/apr/Condition.h new file mode 100644 index 0000000000..5e544219ab --- /dev/null +++ b/cpp/src/qpid/sys/apr/Condition.h @@ -0,0 +1,84 @@ +#ifndef _sys_apr_Condition_h +#define _sys_apr_Condition_h + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "APRPool.h" +#include "qpid/sys/Mutex.h" +#include "qpid/sys/Time.h" + +#include <sys/errno.h> +#include <boost/noncopyable.hpp> +#include <apr_thread_cond.h> + +namespace qpid { +namespace sys { + +/** + * A condition variable for thread synchronization. + */ +class Condition +{ + public: + inline Condition(); + inline ~Condition(); + inline void wait(Mutex&); + inline bool wait(Mutex&, const AbsTime& absoluteTime); + inline void notify(); + inline void notifyAll(); + + private: + apr_thread_cond_t* condition; +}; + + +Condition::Condition() { + CHECK_APR_SUCCESS(apr_thread_cond_create(&condition, APRPool::get())); +} + +Condition::~Condition() { + CHECK_APR_SUCCESS(apr_thread_cond_destroy(condition)); +} + +void Condition::wait(Mutex& mutex) { + CHECK_APR_SUCCESS(apr_thread_cond_wait(condition, mutex.mutex)); +} + +bool Condition::wait(Mutex& mutex, const AbsTime& absoluteTime){ + // APR uses microseconds. + apr_status_t status = + apr_thread_cond_timedwait( + condition, mutex.mutex, Duration(now(), absoluteTime)/TIME_USEC); + if(status != APR_TIMEUP) CHECK_APR_SUCCESS(status); + return status == 0; +} + +void Condition::notify(){ + CHECK_APR_SUCCESS(apr_thread_cond_signal(condition)); +} + +void Condition::notifyAll(){ + CHECK_APR_SUCCESS(apr_thread_cond_broadcast(condition)); +} + +}} +#endif /*!_sys_apr_Condition_h*/ diff --git a/cpp/src/qpid/sys/apr/LFProcessor.cpp b/cpp/src/qpid/sys/apr/LFProcessor.cpp index 0d8ac425fe..9e139c874c 100644 --- a/cpp/src/qpid/sys/apr/LFProcessor.cpp +++ b/cpp/src/qpid/sys/apr/LFProcessor.cpp @@ -20,6 +20,7 @@ */ #include <sstream> #include "qpid/QpidError.h" +#include "qpid/sys/Mutex.h" #include "LFProcessor.h" #include "APRBase.h" #include "LFSessionContext.h" diff --git a/cpp/src/qpid/sys/apr/LFSessionContext.h b/cpp/src/qpid/sys/apr/LFSessionContext.h index ed97b23645..3c90c4a381 100644 --- a/cpp/src/qpid/sys/apr/LFSessionContext.h +++ b/cpp/src/qpid/sys/apr/LFSessionContext.h @@ -30,6 +30,7 @@ #include "qpid/framing/AMQFrame.h" #include "qpid/framing/Buffer.h" #include "qpid/sys/Monitor.h" +#include "qpid/sys/Mutex.h" #include "qpid/sys/ConnectionOutputHandler.h" #include "qpid/sys/ConnectionInputHandler.h" diff --git a/cpp/src/qpid/sys/apr/Module.h b/cpp/src/qpid/sys/apr/Module.h new file mode 100644 index 0000000000..d77cc0f388 --- /dev/null +++ b/cpp/src/qpid/sys/apr/Module.h @@ -0,0 +1,114 @@ +#ifndef _sys_apr_Module_h +#define _sys_apr_Module_h + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/QpidError.h" +#include "APRBase.h" +#include "APRPool.h" + +#include <boost/noncopyable.hpp> +#include <iostream> +#include <apr_dso.h> + +namespace qpid { +namespace sys { + +typedef apr_dso_handle_t* dso_handle_t; + +template <class T> class Module : private boost::noncopyable +{ + typedef T* create_t(); + typedef void destroy_t(T*); + + dso_handle_t handle; + destroy_t* destroy; + T* ptr; + + void load(const std::string& name); + void unload(); + void* getSymbol(const std::string& name); + +public: + Module(const std::string& name); + T* operator->(); + T* get(); + ~Module() throw(); +}; + +template <class T> Module<T>::Module(const std::string& module) : destroy(0), ptr(0) +{ + load(module); + //TODO: need a better strategy for symbol names to allow multiple + //modules to be loaded without clashes... + + //Note: need the double cast to avoid errors in casting from void* to function pointer with -pedantic + create_t* create = reinterpret_cast<create_t*>(reinterpret_cast<intptr_t>(getSymbol("create"))); + destroy = reinterpret_cast<destroy_t*>(reinterpret_cast<intptr_t>(getSymbol("destroy"))); + ptr = create(); +} + +template <class T> T* Module<T>::operator->() +{ + return ptr; +} + +template <class T> T* Module<T>::get() +{ + return ptr; +} + +template <class T> Module<T>::~Module() throw() +{ + try { + if (handle && ptr) { + destroy(ptr); + } + if (handle) unload(); + } catch (std::exception& e) { + std::cout << "Error while destroying module: " << e.what() << std::endl; + } + destroy = 0; + handle = 0; + ptr = 0; +} + +template <class T> void Module<T>::load(const std::string& name) +{ + CHECK_APR_SUCCESS(apr_dso_load(&handle, name.c_str(), APRPool::get())); +} + +template <class T> void Module<T>::unload() +{ + CHECK_APR_SUCCESS(apr_dso_unload(handle)); +} + +template <class T> void* Module<T>::getSymbol(const std::string& name) +{ + apr_dso_handle_sym_t symbol; + CHECK_APR_SUCCESS(apr_dso_sym(&symbol, handle, name.c_str())); + return (void*) symbol; +} + +}} +#endif //ifndef _sys_apr_Module_h + diff --git a/cpp/src/qpid/sys/apr/Mutex.h b/cpp/src/qpid/sys/apr/Mutex.h new file mode 100644 index 0000000000..700b5b910b --- /dev/null +++ b/cpp/src/qpid/sys/apr/Mutex.h @@ -0,0 +1,72 @@ +#ifndef _sys_apr_Mutex_h +#define _sys_apr_Mutex_h + +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "APRBase.h" +#include "APRPool.h" + +#include <boost/noncopyable.hpp> +#include <apr_thread_mutex.h> + +namespace qpid { +namespace sys { + +class Condition; + +/** + * Mutex lock. + */ +class Mutex : private boost::noncopyable { + public: + typedef ScopedLock<Mutex> ScopedLock; + typedef ScopedUnlock<Mutex> ScopedUnlock; + + inline Mutex(); + inline ~Mutex(); + inline void lock(); + inline void unlock(); + inline void trylock(); + + protected: + apr_thread_mutex_t* mutex; + friend class Condition; +}; + +Mutex::Mutex() { + CHECK_APR_SUCCESS(apr_thread_mutex_create(&mutex, APR_THREAD_MUTEX_NESTED, APRPool::get())); +} + +Mutex::~Mutex(){ + CHECK_APR_SUCCESS(apr_thread_mutex_destroy(mutex)); +} + +void Mutex::lock() { + CHECK_APR_SUCCESS(apr_thread_mutex_lock(mutex)); +} +void Mutex::unlock() { + CHECK_APR_SUCCESS(apr_thread_mutex_unlock(mutex)); +} + +void Mutex::trylock() { + CHECK_APR_SUCCESS(apr_thread_mutex_trylock(mutex)); +} + +}} +#endif /*!_sys_apr_Mutex_h*/ diff --git a/cpp/src/qpid/sys/apr/Socket.cpp b/cpp/src/qpid/sys/apr/Socket.cpp index 8cd98161d2..6e64d656d2 100644 --- a/cpp/src/qpid/sys/apr/Socket.cpp +++ b/cpp/src/qpid/sys/apr/Socket.cpp @@ -20,7 +20,7 @@ */ -#include "qpid/sys/Socket.h" +#include "Socket.h" #include "APRBase.h" #include "APRPool.h" @@ -40,7 +40,7 @@ Socket::Socket(apr_socket_t* s) { socket = s; } -void Socket::setTimeout(Time interval) { +void Socket::setTimeout(const Duration& interval) { apr_socket_timeout_set(socket, interval/TIME_USEC); } diff --git a/cpp/src/qpid/sys/apr/Socket.h b/cpp/src/qpid/sys/apr/Socket.h new file mode 100644 index 0000000000..c20c36dcd9 --- /dev/null +++ b/cpp/src/qpid/sys/apr/Socket.h @@ -0,0 +1,75 @@ +#ifndef _sys_apr_Socket_h +#define _sys_apr_Socket_h + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <string> +#include "qpid/sys/Time.h" + +#include <apr_network_io.h> + +namespace qpid { +namespace sys { + +class Socket +{ + public: + /** Create an initialized TCP socket */ + static Socket createTcp(); + + /** Create a socket wrapper for descriptor. */ + Socket(apr_socket_t* descriptor = 0); + + /** Set timeout for read and write */ + void setTimeout(const Duration& interval); + + void connect(const std::string& host, int port); + + void close(); + + enum { SOCKET_TIMEOUT=-2, SOCKET_EOF=-3 } ErrorCode; + + /** Returns bytes sent or an ErrorCode value < 0. */ + ssize_t send(const void* data, size_t size); + + /** + * Returns bytes received, an ErrorCode value < 0 or 0 + * if the connection closed in an orderly manner. + */ + ssize_t recv(void* data, size_t size); + + /** Bind to a port and start listening. + *@param port 0 means choose an available port. + *@param backlog maximum number of pending connections. + *@return The bound port. + */ + int listen(int port = 0, int backlog = 10); + + /** Get file descriptor */ + int fd(); + + private: + apr_socket_t* socket; +}; + +}} +#endif /*!_sys_Socket_h*/ diff --git a/cpp/src/qpid/sys/apr/Thread.cpp b/cpp/src/qpid/sys/apr/Thread.cpp index 0f9aeca186..3369ef7eb1 100644 --- a/cpp/src/qpid/sys/apr/Thread.cpp +++ b/cpp/src/qpid/sys/apr/Thread.cpp @@ -19,7 +19,8 @@ * */ -#include "qpid/sys/Thread.h" +#include "Thread.h" +#include "qpid/sys/Runnable.h" using namespace qpid::sys; using qpid::sys::Runnable; diff --git a/cpp/src/qpid/sys/apr/Thread.h b/cpp/src/qpid/sys/apr/Thread.h new file mode 100644 index 0000000000..ce876efbdf --- /dev/null +++ b/cpp/src/qpid/sys/apr/Thread.h @@ -0,0 +1,93 @@ +#ifndef _sys_apr_Thread_h +#define _sys_apr_Thread_h + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "APRPool.h" +#include "APRBase.h" + +#include <apr_thread_proc.h> +#include <apr_portable.h> + +namespace qpid { +namespace sys { + +class Runnable; + +class Thread +{ + public: + inline static Thread current(); + inline static void yield(); + + inline Thread(); + inline explicit Thread(qpid::sys::Runnable*); + inline explicit Thread(qpid::sys::Runnable&); + + inline void join(); + + inline long id(); + + private: + static void* APR_THREAD_FUNC runRunnable(apr_thread_t* thread, void *data); + inline Thread(apr_thread_t* t); + apr_thread_t* thread; +}; + +Thread::Thread() : thread(0) {} + +Thread::Thread(Runnable* runnable) { + CHECK_APR_SUCCESS( + apr_thread_create(&thread, 0, runRunnable, runnable, APRPool::get())); +} + +Thread::Thread(Runnable& runnable) { + CHECK_APR_SUCCESS( + apr_thread_create(&thread, 0, runRunnable, &runnable, APRPool::get())); +} + +void Thread::join(){ + apr_status_t status; + if (thread != 0) + CHECK_APR_SUCCESS(apr_thread_join(&status, thread)); +} + +long Thread::id() { + return long(thread); +} + +Thread::Thread(apr_thread_t* t) : thread(t) {} + +Thread Thread::current(){ + apr_thread_t* thr; + apr_os_thread_t osthr = apr_os_thread_current(); + CHECK_APR_SUCCESS(apr_os_thread_put(&thr, &osthr, APRPool::get())); + return Thread(thr); +} + +void Thread::yield() +{ + apr_thread_yield(); +} + +}} +#endif /*!_sys_apr_Thread_h*/ diff --git a/cpp/src/qpid/sys/apr/Time.cpp b/cpp/src/qpid/sys/apr/Time.cpp new file mode 100644 index 0000000000..34e740b144 --- /dev/null +++ b/cpp/src/qpid/sys/apr/Time.cpp @@ -0,0 +1,36 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/sys/Time.h" + +#include <apr_time.h> + +namespace qpid { +namespace sys { + +AbsTime AbsTime::now() { + AbsTime time_now; + time_now.time_ns = apr_time_now() * TIME_USEC; + return time_now; +} + +}} + diff --git a/cpp/src/qpid/sys/posix/Condition.h b/cpp/src/qpid/sys/posix/Condition.h new file mode 100644 index 0000000000..1c8d1a80b1 --- /dev/null +++ b/cpp/src/qpid/sys/posix/Condition.h @@ -0,0 +1,86 @@ +#ifndef _sys_posix_Condition_h +#define _sys_posix_Condition_h + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "PrivatePosix.h" + +#include "qpid/sys/Mutex.h" +#include "qpid/sys/Time.h" + +#include <time.h> +#include <sys/errno.h> +#include <boost/noncopyable.hpp> + +namespace qpid { +namespace sys { + +/** + * A condition variable for thread synchronization. + */ +class Condition +{ + public: + inline Condition(); + inline ~Condition(); + inline void wait(Mutex&); + inline bool wait(Mutex&, const AbsTime& absoluteTime); + inline void notify(); + inline void notifyAll(); + + private: + pthread_cond_t condition; +}; + +Condition::Condition() { + QPID_POSIX_THROW_IF(pthread_cond_init(&condition, 0)); +} + +Condition::~Condition() { + QPID_POSIX_THROW_IF(pthread_cond_destroy(&condition)); +} + +void Condition::wait(Mutex& mutex) { + QPID_POSIX_THROW_IF(pthread_cond_wait(&condition, &mutex.mutex)); +} + +bool Condition::wait(Mutex& mutex, const AbsTime& absoluteTime){ + struct timespec ts; + toTimespec(ts, Duration(absoluteTime)); + int status = pthread_cond_timedwait(&condition, &mutex.mutex, &ts); + if (status != 0) { + if (status == ETIMEDOUT) return false; + throw QPID_POSIX_ERROR(status); + } + return true; +} + +void Condition::notify(){ + QPID_POSIX_THROW_IF(pthread_cond_signal(&condition)); +} + +void Condition::notifyAll(){ + QPID_POSIX_THROW_IF(pthread_cond_broadcast(&condition)); +} + +}} +#endif /*!_sys_posix_Condition_h*/ diff --git a/cpp/src/qpid/sys/posix/EventChannelAcceptor.cpp b/cpp/src/qpid/sys/posix/EventChannelAcceptor.cpp index 2676985cc6..d5a2c238d9 100644 --- a/cpp/src/qpid/sys/posix/EventChannelAcceptor.cpp +++ b/cpp/src/qpid/sys/posix/EventChannelAcceptor.cpp @@ -51,9 +51,9 @@ class EventChannelAcceptor : public Acceptor { int16_t port_, int backlog, int nThreads, bool trace_ ); - int getPort() const; + uint16_t getPort() const; - void run(ConnectionInputHandlerFactory& factory); + void run(ConnectionInputHandlerFactory* factory); void shutdown(); @@ -96,17 +96,17 @@ EventChannelAcceptor::EventChannelAcceptor( threads(EventChannelThreads::create(EventChannel::create(), nThreads)) { } -int EventChannelAcceptor::getPort() const { +uint16_t EventChannelAcceptor::getPort() const { return port; // Immutable no need for lock. } -void EventChannelAcceptor::run(ConnectionInputHandlerFactory& f) { +void EventChannelAcceptor::run(ConnectionInputHandlerFactory* f) { { Mutex::ScopedLock l(lock); if (!isRunning && !isShutdown) { isRunning = true; - factory = &f; - threads->post(acceptEvent); + factory = f; + threads->postEvent(acceptEvent); } } threads->join(); // Wait for shutdown. @@ -143,7 +143,7 @@ void EventChannelAcceptor::accept() int fd = acceptEvent.getAcceptedDesscriptor(); connections.push_back( new EventChannelConnection(threads, *factory, fd, fd, isTrace)); - threads->post(acceptEvent); // Keep accepting. + threads->postEvent(acceptEvent); // Keep accepting. } }} // namespace qpid::sys diff --git a/cpp/src/qpid/sys/posix/EventChannelConnection.cpp b/cpp/src/qpid/sys/posix/EventChannelConnection.cpp index 9a7d53fa8a..73e617ea83 100644 --- a/cpp/src/qpid/sys/posix/EventChannelConnection.cpp +++ b/cpp/src/qpid/sys/posix/EventChannelConnection.cpp @@ -140,7 +140,7 @@ void EventChannelConnection::startWrite() { writeFd, out.start(), out.available(), boost::bind(&EventChannelConnection::closeOnException, this, &EventChannelConnection::endWrite)); - threads->post(writeEvent); + threads->postEvent(writeEvent); } // ScopedBusy ctor increments busyThreads. @@ -180,7 +180,7 @@ void EventChannelConnection::startRead() { // Non blocking read, as much as we can swallow. readEvent = ReadEvent( readFd, in.start(), in.available(), readCallback,true); - threads->post(readEvent); + threads->postEvent(readEvent); } // Completion of initial read, expect protocolInit. diff --git a/cpp/src/qpid/sys/posix/Module.h b/cpp/src/qpid/sys/posix/Module.h new file mode 100644 index 0000000000..af3d6ac6ef --- /dev/null +++ b/cpp/src/qpid/sys/posix/Module.h @@ -0,0 +1,126 @@ +#ifndef _sys_posix_Module_h +#define _sys_posix_Module_h + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/QpidError.h" + +#include <boost/noncopyable.hpp> +#include <iostream> +#include <dlfcn.h> + +namespace qpid { +namespace sys { + +typedef void* dso_handle_t; + +template <class T> class Module : private boost::noncopyable +{ + typedef T* create_t(); + typedef void destroy_t(T*); + + dso_handle_t handle; + destroy_t* destroy; + T* ptr; + + void load(const std::string& name); + void unload(); + void* getSymbol(const std::string& name); + +public: + Module(const std::string& name); + T* operator->(); + T* get(); + ~Module() throw(); +}; + +template <class T> Module<T>::Module(const std::string& module) : destroy(0), ptr(0) +{ + load(module); + //TODO: need a better strategy for symbol names to allow multiple + //modules to be loaded without clashes... + + //Note: need the double cast to avoid errors in casting from void* to function pointer with -pedantic + create_t* create = reinterpret_cast<create_t*>(reinterpret_cast<intptr_t>(getSymbol("create"))); + destroy = reinterpret_cast<destroy_t*>(reinterpret_cast<intptr_t>(getSymbol("destroy"))); + ptr = create(); +} + +template <class T> T* Module<T>::operator->() +{ + return ptr; +} + +template <class T> T* Module<T>::get() +{ + return ptr; +} + +template <class T> Module<T>::~Module() throw() +{ + try { + if (handle && ptr) { + destroy(ptr); + } + if (handle) unload(); + } catch (std::exception& e) { + std::cout << "Error while destroying module: " << e.what() << std::endl; + } + destroy = 0; + handle = 0; + ptr = 0; +} + +template <class T> void Module<T>::load(const std::string& name) +{ + dlerror(); + handle = dlopen(name.c_str(), RTLD_NOW); + const char* error = dlerror(); + if (error) { + THROW_QPID_ERROR(INTERNAL_ERROR, error); + } +} + +template <class T> void Module<T>::unload() +{ + dlerror(); + dlclose(handle); + const char* error = dlerror(); + if (error) { + THROW_QPID_ERROR(INTERNAL_ERROR, error); + } +} + +template <class T> void* Module<T>::getSymbol(const std::string& name) +{ + dlerror(); + void* sym = dlsym(handle, name.c_str()); + const char* error = dlerror(); + if (error) { + THROW_QPID_ERROR(INTERNAL_ERROR, error); + } + return sym; +} + +}} +#endif //ifndef _sys_posix_Module_h + diff --git a/cpp/src/qpid/sys/posix/Mutex.h b/cpp/src/qpid/sys/posix/Mutex.h new file mode 100644 index 0000000000..b278c6b14a --- /dev/null +++ b/cpp/src/qpid/sys/posix/Mutex.h @@ -0,0 +1,127 @@ +#ifndef _sys_posix_Mutex_h +#define _sys_posix_Mutex_h + +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "check.h" + +#include <pthread.h> +#include <boost/noncopyable.hpp> + +namespace qpid { +namespace sys { + +class Condition; + +/** + * Mutex lock. + */ +class Mutex : private boost::noncopyable { + friend class Condition; + +public: + typedef ScopedLock<Mutex> ScopedLock; + typedef ScopedUnlock<Mutex> ScopedUnlock; + + inline Mutex(); + inline ~Mutex(); + inline void lock(); + inline void unlock(); + inline void trylock(); + +protected: + pthread_mutex_t mutex; +}; + +/** + * Initialise a recursive mutex attr for use in creating mutexes later + * (we use pthread_once to make sure it is initialised exactly once) + */ +namespace { + pthread_once_t onceControl = PTHREAD_ONCE_INIT; + pthread_mutexattr_t mutexattr; + + void initMutexattr() { + pthread_mutexattr_init(&mutexattr); + pthread_mutexattr_settype(&mutexattr, PTHREAD_MUTEX_RECURSIVE); + } + + struct RecursiveMutexattr { + RecursiveMutexattr() { + pthread_once(&onceControl, initMutexattr); + } + + operator const pthread_mutexattr_t*() const { + return &mutexattr; + } + }; + + RecursiveMutexattr recursiveMutexattr; +} + +/** + * PODMutex is a POD, can be static-initialized with + * PODMutex m = QPID_PODMUTEX_INITIALIZER + */ +struct PODMutex +{ + typedef ScopedLock<PODMutex> ScopedLock; + + inline void lock(); + inline void unlock(); + inline void trylock(); + + // Must be public to be a POD: + pthread_mutex_t mutex; +}; + +#define QPID_MUTEX_INITIALIZER { PTHREAD_MUTEX_INITIALIZER } + +void PODMutex::lock() { + QPID_POSIX_THROW_IF(pthread_mutex_lock(&mutex)); +} +void PODMutex::unlock() { + QPID_POSIX_THROW_IF(pthread_mutex_unlock(&mutex)); +} + +void PODMutex::trylock() { + QPID_POSIX_THROW_IF(pthread_mutex_trylock(&mutex)); +} + +Mutex::Mutex() { + QPID_POSIX_THROW_IF(pthread_mutex_init(&mutex, recursiveMutexattr)); +} + +Mutex::~Mutex(){ + QPID_POSIX_THROW_IF(pthread_mutex_destroy(&mutex)); +} + +void Mutex::lock() { + QPID_POSIX_THROW_IF(pthread_mutex_lock(&mutex)); +} +void Mutex::unlock() { + QPID_POSIX_THROW_IF(pthread_mutex_unlock(&mutex)); +} + +void Mutex::trylock() { + QPID_POSIX_THROW_IF(pthread_mutex_trylock(&mutex)); +} + +}} +#endif /*!_sys_posix_Mutex_h*/ diff --git a/cpp/src/qpid/sys/posix/PrivatePosix.h b/cpp/src/qpid/sys/posix/PrivatePosix.h new file mode 100644 index 0000000000..2707057ef0 --- /dev/null +++ b/cpp/src/qpid/sys/posix/PrivatePosix.h @@ -0,0 +1,39 @@ +#ifndef _sys_posix_PrivatePosix_h +#define _sys_posix_PrivatePosix_h + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/sys/Time.h" + +struct timespec; +struct timeval; + +namespace qpid { +namespace sys { + +struct timespec& toTimespec(struct timespec& ts, const Duration& t); +struct timeval& toTimeval(struct timeval& tv, const Duration& t); +Duration toTime(const struct timespec& ts); + +}} + +#endif /*!_sys_posix_PrivatePosix_h*/ diff --git a/cpp/src/qpid/sys/posix/Socket.cpp b/cpp/src/qpid/sys/posix/Socket.cpp index e9bd2eeb6b..39651fa821 100644 --- a/cpp/src/qpid/sys/posix/Socket.cpp +++ b/cpp/src/qpid/sys/posix/Socket.cpp @@ -19,6 +19,12 @@ * */ +#include "qpid/sys/Socket.h" + +#include "qpid/QpidError.h" +#include "check.h" +#include "PrivatePosix.h" + #include <sys/socket.h> #include <sys/errno.h> #include <netinet/in.h> @@ -26,10 +32,6 @@ #include <boost/format.hpp> -#include "qpid/QpidError.h" -#include "check.h" -#include "qpid/sys/Socket.h" - using namespace qpid::sys; Socket Socket::createTcp() @@ -41,11 +43,10 @@ Socket Socket::createTcp() Socket::Socket(int descriptor) : socket(descriptor) {} -void Socket::setTimeout(Time interval) +void Socket::setTimeout(const Duration& interval) { struct timeval tv; - tv.tv_sec = interval/TIME_SEC; - tv.tv_usec = (interval%TIME_SEC)/TIME_USEC; + toTimeval(tv, interval); setsockopt(socket, SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv)); setsockopt(socket, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)); } diff --git a/cpp/src/qpid/sys/posix/Socket.h b/cpp/src/qpid/sys/posix/Socket.h new file mode 100644 index 0000000000..614221354f --- /dev/null +++ b/cpp/src/qpid/sys/posix/Socket.h @@ -0,0 +1,76 @@ +#ifndef _sys_posix_Socket_h +#define _sys_posix_Socket_h + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <string> +#include "qpid/sys/Time.h" + +namespace qpid { +namespace sys { + +class Socket +{ + public: + /** Create an initialized TCP socket */ + static Socket createTcp(); + + /** Create a socket wrapper for descriptor. */ + Socket(int descriptor = 0); + + /** Set timeout for read and write */ + void setTimeout(const Duration& interval); + + void connect(const std::string& host, int port); + + void close(); + + enum { SOCKET_TIMEOUT=-2, SOCKET_EOF=-3 } ErrorCode; + + /** Returns bytes sent or an ErrorCode value < 0. */ + ssize_t send(const void* data, size_t size); + + /** + * Returns bytes received, an ErrorCode value < 0 or 0 + * if the connection closed in an orderly manner. + */ + ssize_t recv(void* data, size_t size); + + /** Bind to a port and start listening. + *@param port 0 means choose an available port. + *@param backlog maximum number of pending connections. + *@return The bound port. + */ + int listen(int port = 0, int backlog = 10); + + /** Get file descriptor */ + int fd(); + + private: + void init() const; + mutable int socket; // Initialized on demand. +}; + +}} + + +#endif /*!_sys_Socket_h*/ diff --git a/cpp/src/qpid/sys/posix/Thread.cpp b/cpp/src/qpid/sys/posix/Thread.cpp index fc85e35028..dc9b21448f 100644 --- a/cpp/src/qpid/sys/posix/Thread.cpp +++ b/cpp/src/qpid/sys/posix/Thread.cpp @@ -19,7 +19,8 @@ * */ -#include "qpid/sys/Thread.h" +#include "Thread.h" +#include "qpid/sys/Runnable.h" void* qpid::sys::Thread::runRunnable(void* p) { diff --git a/cpp/src/qpid/sys/posix/Thread.h b/cpp/src/qpid/sys/posix/Thread.h new file mode 100644 index 0000000000..9de7299f5a --- /dev/null +++ b/cpp/src/qpid/sys/posix/Thread.h @@ -0,0 +1,86 @@ +#ifndef _sys_posix_Thread_h +#define _sys_posix_Thread_h + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "check.h" +#include <pthread.h> + +namespace qpid { +namespace sys { + +class Runnable; + +class Thread +{ + public: + inline static Thread current(); + inline static void yield(); + + inline Thread(); + inline explicit Thread(qpid::sys::Runnable*); + inline explicit Thread(qpid::sys::Runnable&); + + inline void join(); + + inline long id(); + + private: + static void* runRunnable(void* runnable); + inline Thread(pthread_t); + pthread_t thread; +}; + + +Thread::Thread() : thread(0) {} + +Thread::Thread(Runnable* runnable) { + QPID_POSIX_THROW_IF(pthread_create(&thread, NULL, runRunnable, runnable)); +} + +Thread::Thread(Runnable& runnable) { + QPID_POSIX_THROW_IF(pthread_create(&thread, NULL, runRunnable, &runnable)); +} + +void Thread::join(){ + if (thread != 0) + QPID_POSIX_THROW_IF(pthread_join(thread, 0)); +} + +long Thread::id() { + return long(thread); +} + +Thread::Thread(pthread_t thr) : thread(thr) {} + +Thread Thread::current() { + return Thread(pthread_self()); +} + +void Thread::yield() +{ + QPID_POSIX_THROW_IF(pthread_yield()); +} + + +}} +#endif /*!_sys_posix_Thread_h*/ diff --git a/cpp/src/qpid/sys/Time.cpp b/cpp/src/qpid/sys/posix/Time.cpp index ad6185b966..2228caea58 100644 --- a/cpp/src/qpid/sys/Time.cpp +++ b/cpp/src/qpid/sys/posix/Time.cpp @@ -19,42 +19,39 @@ * */ -#include "Time.h" +#include "PrivatePosix.h" -namespace qpid { -namespace sys { - -// APR ================================================================ -#if USE_APR +#include "qpid/sys/Time.h" -Time now() { return apr_time_now() * TIME_USEC; } +#include <time.h> +#include <sys/time.h> -// POSIX================================================================ -#else +namespace qpid { +namespace sys { -Time now() { +AbsTime AbsTime::now() { struct timespec ts; clock_gettime(CLOCK_REALTIME, &ts); - return toTime(ts); + AbsTime time_now; + time_now.time_ns = toTime(ts).nanosecs; + return time_now; } -struct timespec toTimespec(const Time& t) { - struct timespec ts; - toTimespec(ts, t); +struct timespec& toTimespec(struct timespec& ts, const Duration& t) { + ts.tv_sec = t / TIME_SEC; + ts.tv_nsec = t % TIME_SEC; return ts; } -struct timespec& toTimespec(struct timespec& ts, const Time& t) { - ts.tv_sec = t / TIME_SEC; - ts.tv_nsec = t % TIME_SEC; - return ts; +struct timeval& toTimeval(struct timeval& tv, const Duration& t) { + tv.tv_sec = t/TIME_SEC; + tv.tv_usec = (t%TIME_SEC)/TIME_USEC; + return tv; } -Time toTime(const struct timespec& ts) { +Duration toTime(const struct timespec& ts) { return ts.tv_sec*TIME_SEC + ts.tv_nsec; } - -#endif }} diff --git a/cpp/src/tests/Makefile.am b/cpp/src/tests/Makefile.am index 74e5863613..198c9ce7ef 100644 --- a/cpp/src/tests/Makefile.am +++ b/cpp/src/tests/Makefile.am @@ -1,5 +1,5 @@ AM_CXXFLAGS = $(WARNING_CFLAGS) $(CPPUNIT_CXXFLAGS) $(APR_CXXFLAGS) -INCLUDES = -I$(srcdir)/.. -I$(srcdir)/../gen -I$(builddir)/../gen +INCLUDES = -I$(srcdir)/.. -I$(srcdir)/../gen -I$(top_builddir)/src/gen # Unit tests broker_unit_tests = \ diff --git a/cpp/src/tests/ProducerConsumerTest.cpp b/cpp/src/tests/ProducerConsumerTest.cpp index 410a846b8b..789e365a85 100644 --- a/cpp/src/tests/ProducerConsumerTest.cpp +++ b/cpp/src/tests/ProducerConsumerTest.cpp @@ -74,9 +74,9 @@ class WatchedCounter : public Monitor { return count; } - bool waitFor(int i, Time timeout=TIME_SEC) { + bool waitFor(int i, Duration timeout=TIME_SEC) { Lock l(*this); - Time deadline = timeout+now(); + AbsTime deadline(now(), timeout); while (count != i) { if (!wait(deadline)) return false; @@ -116,8 +116,8 @@ class ProducerConsumerTest : public CppUnit::TestCase struct ConsumeTimeoutRunnable : public Runnable { ProducerConsumerTest& test; - Time timeout; - ConsumeTimeoutRunnable(ProducerConsumerTest& test_, const Time& t) + Duration timeout; + ConsumeTimeoutRunnable(ProducerConsumerTest& test_, const Duration& t) : test(test_), timeout(t) {} void run() { test.consumeTimeout(timeout); } }; @@ -143,7 +143,7 @@ class ProducerConsumerTest : public CppUnit::TestCase consumeInternal(consumer); }; - void consumeTimeout(const Time& timeout) { + void consumeTimeout(const Duration& timeout) { ProducerConsumer::ConsumerLock consumer(pc, timeout); consumeInternal(consumer); }; diff --git a/cpp/src/tests/topic_listener.cpp b/cpp/src/tests/topic_listener.cpp index aa51dd27d4..8af8f007d9 100644 --- a/cpp/src/tests/topic_listener.cpp +++ b/cpp/src/tests/topic_listener.cpp @@ -56,7 +56,7 @@ class Listener : public MessageListener{ const bool transactional; bool init; int count; - Time start; + AbsTime start; void shutdown(); void report(); @@ -159,8 +159,8 @@ void Listener::shutdown(){ } void Listener::report(){ - Time finish = now(); - Time time = finish - start; + AbsTime finish = now(); + Duration time(start, finish); stringstream reportstr; reportstr << "Received " << count << " messages in " << time/TIME_MSEC << " ms."; diff --git a/cpp/src/tests/topic_publisher.cpp b/cpp/src/tests/topic_publisher.cpp index 1c92f75cc8..7c16dd71f3 100644 --- a/cpp/src/tests/topic_publisher.cpp +++ b/cpp/src/tests/topic_publisher.cpp @@ -183,7 +183,7 @@ void Publisher::waitForCompletion(int msgs){ int64_t Publisher::publish(int msgs, int listeners, int size){ Message msg; msg.setData(generateData(size)); - Time start = now(); + AbsTime start = now(); { Monitor::ScopedLock l(monitor); for(int i = 0; i < msgs; i++){ @@ -201,8 +201,8 @@ int64_t Publisher::publish(int msgs, int listeners, int size){ waitForCompletion(listeners); } - Time finish = now(); - return finish - start; + AbsTime finish = now(); + return Duration(start, finish); } string Publisher::generateData(int size){ |