diff options
author | Alan Conway <aconway@apache.org> | 2006-11-29 14:36:08 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2006-11-29 14:36:08 +0000 |
commit | b13e1a24fcca8797b7be5a242f164afbe17ec4f6 (patch) | |
tree | ef0362e52c125bc75b07ef3e374dabfa52254e98 /cpp/src/qpid/sys | |
parent | 16d818e749462daf5e0e43079b2e48991646c619 (diff) | |
download | qpid-python-b13e1a24fcca8797b7be5a242f164afbe17ec4f6.tar.gz |
Posix EventChannel implementation using epoll. Placeholder for kevents.
Dynamic thread pool EventChannelThreads to serve EventChannel.
Misc cleanup/enhancements.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@480582 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/sys')
-rw-r--r-- | cpp/src/qpid/sys/AtomicCount.h | 71 | ||||
-rw-r--r-- | cpp/src/qpid/sys/EventChannel.h | 239 | ||||
-rw-r--r-- | cpp/src/qpid/sys/Monitor.h | 153 | ||||
-rw-r--r-- | cpp/src/qpid/sys/Mutex.h | 151 | ||||
-rw-r--r-- | cpp/src/qpid/sys/Runnable.cpp | 32 | ||||
-rw-r--r-- | cpp/src/qpid/sys/Runnable.h | 20 | ||||
-rw-r--r-- | cpp/src/qpid/sys/Socket.h | 30 | ||||
-rw-r--r-- | cpp/src/qpid/sys/Thread.h | 42 | ||||
-rw-r--r-- | cpp/src/qpid/sys/Time.cpp | 39 | ||||
-rw-r--r-- | cpp/src/qpid/sys/Time.h | 46 |
10 files changed, 391 insertions, 432 deletions
diff --git a/cpp/src/qpid/sys/AtomicCount.h b/cpp/src/qpid/sys/AtomicCount.h new file mode 100644 index 0000000000..b625b2c9b0 --- /dev/null +++ b/cpp/src/qpid/sys/AtomicCount.h @@ -0,0 +1,71 @@ +#ifndef _posix_AtomicCount_h +#define _posix_AtomicCount_h + +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include <boost/detail/atomic_count.hpp> +#include <boost/noncopyable.hpp> + +namespace qpid { +namespace sys { + +/** + * Atomic counter. + */ +class AtomicCount : boost::noncopyable { + public: + class ScopedDecrement : boost::noncopyable { + public: + /** Decrement counter in constructor and increment in destructor. */ + ScopedDecrement(AtomicCount& c) : count(c) { value = --count; } + ~ScopedDecrement() { ++count; } + /** Return the value returned by the decrement. */ + operator long() { return value; } + private: + AtomicCount& count; + long value; + }; + + class ScopedIncrement : boost::noncopyable { + public: + /** Increment counter in constructor and increment in destructor. */ + ScopedIncrement(AtomicCount& c) : count(c) { ++count; } + ~ScopedIncrement() { --count; } + private: + AtomicCount& count; + }; + + AtomicCount(long value = 0) : count(value) {} + + void operator++() { ++count ; } + + long operator--() { return --count; } + + operator long() const { return count; } + + + private: + boost::detail::atomic_count count; +}; + + +}} + + +#endif // _posix_AtomicCount_h diff --git a/cpp/src/qpid/sys/EventChannel.h b/cpp/src/qpid/sys/EventChannel.h deleted file mode 100644 index dd857c02c7..0000000000 --- a/cpp/src/qpid/sys/EventChannel.h +++ /dev/null @@ -1,239 +0,0 @@ -#ifndef _sys_EventChannel_h -#define _sys_EventChannel_h - -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include <qpid/SharedObject.h> -#include <qpid/Exception.h> -#include <qpid/sys/Time.h> - -namespace qpid { -namespace sys { - -class EventChannel; - -class Event; -class ReadEvent; -class WriteEvent; -class AcceptEvent; -class NotifyEvent; - -/** - Active event channel. Events represent async IO requests or - inter-task synchronization. Posting an Event registers interest in - the IO or sync event. When it occurs the posted Event is - corresponding IO or sync event occurs they are returned to one - of the threads waiting on the channel. For more details see - the Event hierarchy. -*/ -class EventChannel : public qpid::SharedObject<EventChannel> -{ - public: - static EventChannel::shared_ptr create(); - - virtual ~EventChannel() {} - - virtual void post(ReadEvent& event) = 0; - virtual void post(WriteEvent& event) = 0; - virtual void post(AcceptEvent& event) = 0; - virtual void post(NotifyEvent& event) = 0; - - inline void post(Event& event); - - /** - * Wait for the next completed event. - * @return An Event or 0 to indicate the calling thread should shut down. - */ - virtual Event* getEvent() = 0; - - /** Dispose of a system-allocated buffer. Called by ReadEvent */ - virtual void dispose(void* buffer, size_t size) = 0; - - protected: - EventChannel() {} -}; - - -/** - * Base class for all events. There are two possible styles of use: - * - * Task style: the event is allocated as a local variable on the initiating - * task, which blocks in wait(). Event::dispatch() resumes that task - * with the event data available. - * - * Proactor style: Threads post events but do not - * wait. Event::dispatch() processes the event in the dispatching - * thread and then deletes itself. - * - * Tasks give less kernel context switching and blocking AND simpler - * coding. Tasks can call any number of pseudo-blocking opereations - * that are actually event post/wait pairs. At each such point the - * current thread can continue with the task or switch to another task - * to minimise blocking. - * - * With Proactor style dispatch() is an atomic unit of work as far as - * the EventChannel is concerned. To avoid such blocking the - * application has to be written as a collection of non-blocking - * dispatch() callbacks, which is more complex than tasks that can - * call pseudo-blocking operations. - */ -class Event : private boost::noncopyable -{ - public: - virtual ~Event() {} - - /** Post this event to the channel */ - virtual void post(EventChannel& channel) = 0; - - /** - * Block till the event is delivered. - * At most one task can wait on an event. - */ - virtual void wait() const = 0; - - /** - * Dispatch the event. Runs some event-specific code, may switch - * context to resume a waiting task. - */ - virtual void dispatch() = 0; -}; - - -/** - * Base class for asynchronous request events, provides exception - * handling. - */ -class RequestEvent : public Event -{ - public: - /** True if the async request failed */ - bool hasException() const { return ex.get(); } - - const qpid::Exception& getException() const { return *ex; } - - void setException(std::auto_ptr<qpid::Exception>& e) { ex = e; } - - /** If the event has an exception throw it, else do nothing */ - void verify() const { if (ex.get()) throw *ex; } - - void post(EventChannel& channel) { channel.post(*this); } - - private: - qpid::HeapException ex; -}; - - -/** An asynchronous read event. */ -class ReadEvent : public RequestEvent { - public: - /** - * Read data from fd. - */ - ReadEvent(int fileDescriptor, void* buffer, size_t bytesToRead) : - fd(fileDescriptor), data(buffer), size(bytesToRead) {} - - /** Number of bytes read. */ - size_t getBytesRead() const { verify(); return size; } - - /** - * If the system supports direct access to DMA buffers then - * it may provide a direct pointer to such a buffer to avoid - * a copy into the user buffer. - * @return true if getData() is returning a system-supplied buffer. - */ - bool isSystemData() const { verify(); return channel != 0; } - - /** - * Pointer to data read. Note if isSystemData() is true then this - * is NOT the same buffer that was supplied to the constructor. - * The user buffer is untouched. See dispose(). - */ - void* getData() const { verify(); return data; } - - /** Called by the event channel on completion. */ - void complete(EventChannel::shared_ptr ec, void* _data, size_t _size) { - if (data != _data) channel = ec; data = _data; size = _size; - } - - /** - * Dispose of system-provided data buffer, if any. This is - * automatically called by the destructor. - */ - void dispose() { if(channel && data) channel->dispose(data,size); data=0; } - - ~ReadEvent() { dispose(); } - - void post(EventChannel& channel) { channel.post(*this); } - - private: - int fd; - void* data; - size_t size; - EventChannel::shared_ptr channel; -}; - -/** Asynchronous write event */ -class WriteEvent : public RequestEvent { - public: - WriteEvent(int fileDescriptor, void* buffer, size_t bytesToWrite) : - fd(fileDescriptor), data(buffer), size(bytesToWrite) {} - - /** Number of bytes written */ - size_t getBytesWritten() const { verify(); return size; } - - void post(EventChannel& channel) { channel.post(*this); } - - private: - int fd; - void* data; - size_t size; -}; - -/** Asynchronous socket accept event */ -class AcceptEvent : public RequestEvent { - public: - /** Accept a connection on listeningFd */ - AcceptEvent(int listeningFd) : listen(listeningFd) {} - - /** Get accepted file descriptor */ - int getAcceptedFd() const { verify(); return accepted; } - - void post(EventChannel& channel) { channel.post(*this); } - - private: - int listen; - int accepted; -}; - -/** - * NotifyEvent is delievered immediately to be dispatched by an - * EventChannel thread. - */ -class NotifyEvent : public RequestEvent { - public: - void post(EventChannel& channel) { channel.post(*this); } -}; - - -inline void EventChannel::post(Event& event) { event.post(*this); } - -}} - - -#endif /*!_sys_EventChannel_h*/ diff --git a/cpp/src/qpid/sys/Monitor.h b/cpp/src/qpid/sys/Monitor.h index 59e1e74b57..bbe126cecb 100644 --- a/cpp/src/qpid/sys/Monitor.h +++ b/cpp/src/qpid/sys/Monitor.h @@ -22,60 +22,28 @@ * */ +#include <sys/errno.h> #include <boost/noncopyable.hpp> +#include <qpid/sys/Mutex.h> +#include <qpid/sys/Time.h> #ifdef USE_APR -# include <apr_thread_mutex.h> -# include <apr_thread_cond.h> -# include <qpid/apr/APRBase.h> -# include <qpid/apr/APRPool.h> -#else -# include <pthread.h> -# include <qpid/sys/Time.h> -# include <qpid/posix/check.h> +# include <apr-1/apr_thread_cond.h> #endif namespace qpid { namespace sys { -template <class L> -class ScopedLock -{ - public: - ScopedLock(L& l) : mutex(l) { l.lock(); } - ~ScopedLock() { mutex.unlock(); } - private: - L& mutex; -}; - - -class Mutex : private boost::noncopyable -{ - public: - typedef ScopedLock<Mutex> ScopedLock; - - inline Mutex(); - inline ~Mutex(); - inline void lock(); - inline void unlock(); - inline void trylock(); - - protected: -#ifdef USE_APR - apr_thread_mutex_t* mutex; -#else - pthread_mutex_t mutex; -#endif -}; - -/** A condition variable and a mutex */ +/** + * A monitor is a condition variable and a mutex + */ class Monitor : public Mutex { public: inline Monitor(); inline ~Monitor(); inline void wait(); - inline bool wait(int64_t nsecs); + inline bool wait(const Time& absoluteTime); inline void notify(); inline void notifyAll(); @@ -91,25 +59,6 @@ class Monitor : public Mutex // APR ================================================================ #ifdef USE_APR -Mutex::Mutex() { - CHECK_APR_SUCCESS(apr_thread_mutex_create(&mutex, APR_THREAD_MUTEX_NESTED, APRPool::get())); -} - -Mutex::~Mutex(){ - CHECK_APR_SUCCESS(apr_thread_mutex_destroy(mutex)); -} - -void Mutex::lock() { - CHECK_APR_SUCCESS(apr_thread_mutex_lock(mutex)); -} -void Mutex::unlock() { - CHECK_APR_SUCCESS(apr_thread_mutex_unlock(mutex)); -} - -void Mutex::trylock() { - CHECK_APR_SUCCESS(apr_thread_mutex_trylock(mutex)); -} - Monitor::Monitor() { CHECK_APR_SUCCESS(apr_thread_cond_create(&condition, APRPool::get())); } @@ -122,10 +71,10 @@ void Monitor::wait() { CHECK_APR_SUCCESS(apr_thread_cond_wait(condition, mutex)); } -bool Monitor::wait(int64_t nsecs){ +bool Monitor::wait(const Time& absoluteTime){ // APR uses microseconds. - apr_status_t status = apr_thread_cond_timedwait( - condition, mutex, nsecs/1000); + apr_status_t status = + apr_thread_cond_timedwait(condition, mutex, absoluteTime/TIME_USEC); if(status != APR_TIMEUP) CHECK_APR_SUCCESS(status); return status == 0; } @@ -138,93 +87,41 @@ void Monitor::notifyAll(){ CHECK_APR_SUCCESS(apr_thread_cond_broadcast(condition)); } - -}} - - -// POSIX ================================================================ #else -/** - * PODMutex is a POD, can be static-initialized with - * PODMutex m = QPID_PODMUTEX_INITIALIZER - */ -struct PODMutex -{ - typedef ScopedLock<PODMutex> ScopedLock; - - inline void lock(); - inline void unlock(); - inline void trylock(); - - // Must be public to be a POD: - pthread_mutex_t mutex; -}; - -#define QPID_MUTEX_INITIALIZER { PTHREAD_MUTEX_INITIALIZER } - - -void PODMutex::lock() { - CHECK0(pthread_mutex_lock(&mutex)); -} -void PODMutex::unlock() { - CHECK0(pthread_mutex_unlock(&mutex)); -} - -void PODMutex::trylock() { - CHECK0(pthread_mutex_trylock(&mutex)); -} - - -Mutex::Mutex() { - CHECK0(pthread_mutex_init(&mutex, 0)); -} - -Mutex::~Mutex(){ - CHECK0(pthread_mutex_destroy(&mutex)); -} - -void Mutex::lock() { - CHECK0(pthread_mutex_lock(&mutex)); -} -void Mutex::unlock() { - CHECK0(pthread_mutex_unlock(&mutex)); -} - -void Mutex::trylock() { - CHECK0(pthread_mutex_trylock(&mutex)); -} +// POSIX ================================================================ Monitor::Monitor() { - CHECK0(pthread_cond_init(&condition, 0)); + QPID_POSIX_THROW_IF(pthread_cond_init(&condition, 0)); } Monitor::~Monitor() { - CHECK0(pthread_cond_destroy(&condition)); + QPID_POSIX_THROW_IF(pthread_cond_destroy(&condition)); } void Monitor::wait() { - CHECK0(pthread_cond_wait(&condition, &mutex)); + QPID_POSIX_THROW_IF(pthread_cond_wait(&condition, &mutex)); } -bool Monitor::wait(int64_t nsecs){ - Time t(nsecs); - int status = pthread_cond_timedwait(&condition, &mutex, &t.getTimespec()); - if(status != 0) { - if (errno == ETIMEDOUT) return false; - CHECK0(status); +bool Monitor::wait(const Time& absoluteTime){ + struct timespec ts; + toTimespec(ts, absoluteTime); + int status = pthread_cond_timedwait(&condition, &mutex, &ts); + if (status != 0) { + if (status == ETIMEDOUT) return false; + throw QPID_POSIX_ERROR(status); } return true; } void Monitor::notify(){ - CHECK0(pthread_cond_signal(&condition)); + QPID_POSIX_THROW_IF(pthread_cond_signal(&condition)); } void Monitor::notifyAll(){ - CHECK0(pthread_cond_broadcast(&condition)); + QPID_POSIX_THROW_IF(pthread_cond_broadcast(&condition)); } +#endif /*USE_APR*/ }} -#endif /*USE_APR*/ #endif /*!_sys_Monitor_h*/ diff --git a/cpp/src/qpid/sys/Mutex.h b/cpp/src/qpid/sys/Mutex.h new file mode 100644 index 0000000000..3ada2e98b7 --- /dev/null +++ b/cpp/src/qpid/sys/Mutex.h @@ -0,0 +1,151 @@ +#ifndef _sys_Mutex_h +#define _sys_Mutex_h + +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifdef USE_APR +# include <apr-1/apr_thread_mutex.h> +# include <qpid/apr/APRBase.h> +# include <qpid/apr/APRPool.h> +#else +# include <pthread.h> +# include <qpid/posix/check.h> +#endif +#include <boost/noncopyable.hpp> + +namespace qpid { +namespace sys { + +/** + * Scoped lock template: calls lock() in ctor, unlock() in dtor. + * L can be any class with lock() and unlock() functions. + */ +template <class L> +class ScopedLock +{ + public: + ScopedLock(L& l) : mutex(l) { l.lock(); } + ~ScopedLock() { mutex.unlock(); } + private: + L& mutex; +}; + +/** + * Mutex lock. + */ +class Mutex : private boost::noncopyable { + public: + typedef ScopedLock<Mutex> ScopedLock; + + inline Mutex(); + inline ~Mutex(); + inline void lock(); + inline void unlock(); + inline void trylock(); + + protected: +#ifdef USE_APR + apr_thread_mutex_t* mutex; +#else + pthread_mutex_t mutex; +#endif +}; + +#ifdef USE_APR +// APR ================================================================ + +Mutex::Mutex() { + CHECK_APR_SUCCESS(apr_thread_mutex_create(&mutex, APR_THREAD_MUTEX_NESTED, APRPool::get())); +} + +Mutex::~Mutex(){ + CHECK_APR_SUCCESS(apr_thread_mutex_destroy(mutex)); +} + +void Mutex::lock() { + CHECK_APR_SUCCESS(apr_thread_mutex_lock(mutex)); +} +void Mutex::unlock() { + CHECK_APR_SUCCESS(apr_thread_mutex_unlock(mutex)); +} + +void Mutex::trylock() { + CHECK_APR_SUCCESS(apr_thread_mutex_trylock(mutex)); +} + +#else +// POSIX ================================================================ + +/** + * PODMutex is a POD, can be static-initialized with + * PODMutex m = QPID_PODMUTEX_INITIALIZER + */ +struct PODMutex +{ + typedef ScopedLock<PODMutex> ScopedLock; + + inline void lock(); + inline void unlock(); + inline void trylock(); + + // Must be public to be a POD: + pthread_mutex_t mutex; +}; + +#define QPID_MUTEX_INITIALIZER { PTHREAD_MUTEX_INITIALIZER } + + +void PODMutex::lock() { + QPID_POSIX_THROW_IF(pthread_mutex_lock(&mutex)); +} +void PODMutex::unlock() { + QPID_POSIX_THROW_IF(pthread_mutex_unlock(&mutex)); +} + +void PODMutex::trylock() { + QPID_POSIX_THROW_IF(pthread_mutex_trylock(&mutex)); +} + + +Mutex::Mutex() { + QPID_POSIX_THROW_IF(pthread_mutex_init(&mutex, 0)); +} + +Mutex::~Mutex(){ + QPID_POSIX_THROW_IF(pthread_mutex_destroy(&mutex)); +} + +void Mutex::lock() { + QPID_POSIX_THROW_IF(pthread_mutex_lock(&mutex)); +} +void Mutex::unlock() { + QPID_POSIX_THROW_IF(pthread_mutex_unlock(&mutex)); +} + +void Mutex::trylock() { + QPID_POSIX_THROW_IF(pthread_mutex_trylock(&mutex)); +} + +#endif // USE_APR + +}} + + + +#endif /*!_sys_Mutex_h*/ diff --git a/cpp/src/qpid/sys/Runnable.cpp b/cpp/src/qpid/sys/Runnable.cpp new file mode 100644 index 0000000000..30122c682f --- /dev/null +++ b/cpp/src/qpid/sys/Runnable.cpp @@ -0,0 +1,32 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "Runnable.h" +#include <boost/bind.hpp> + +namespace qpid { +namespace sys { + +Runnable::~Runnable() {} + +Runnable::Functor Runnable::functor() +{ + return boost::bind(&Runnable::run, this); +} + +}} diff --git a/cpp/src/qpid/sys/Runnable.h b/cpp/src/qpid/sys/Runnable.h index 8379afb2f9..fb3927c612 100644 --- a/cpp/src/qpid/sys/Runnable.h +++ b/cpp/src/qpid/sys/Runnable.h @@ -1,3 +1,5 @@ +#ifndef _Runnable_ +#define _Runnable_ /* * * Licensed to the Apache Software Foundation (ASF) under one @@ -18,18 +20,28 @@ * under the License. * */ -#ifndef _Runnable_ -#define _Runnable_ + +#include <boost/function.hpp> namespace qpid { namespace sys { -/** Base class for classes that run in a thread. */ +/** + * Interface for objects that can be run, e.g. in a thread. + */ class Runnable { public: - virtual ~Runnable() {} + /** Type to represent a runnable as a Functor */ + typedef boost::function0<void> Functor; + + virtual ~Runnable(); + + /** Derived classes override run(). */ virtual void run() = 0; + + /** Create a functor object that will call this->run(). */ + Functor functor(); }; }} diff --git a/cpp/src/qpid/sys/Socket.h b/cpp/src/qpid/sys/Socket.h index cf757e7a27..d3e8c1af48 100644 --- a/cpp/src/qpid/sys/Socket.h +++ b/cpp/src/qpid/sys/Socket.h @@ -23,6 +23,7 @@ */ #include <string> +#include <qpid/sys/Time.h> #ifdef USE_APR # include <apr_network_io.h> @@ -34,10 +35,18 @@ namespace sys { class Socket { public: - Socket(); + /** Create an initialized TCP socket */ + static Socket createTcp(); + /** Create a socket wrapper for descriptor. */ +#ifdef USE_APR + Socket(apr_socket_t* descriptor = 0); +#else + Socket(int descriptor = 0); +#endif + /** Set timeout for read and write */ - void setTimeout(long msecs); + void setTimeout(Time interval); void connect(const std::string& host, int port); @@ -46,19 +55,30 @@ class Socket enum { SOCKET_TIMEOUT=-2, SOCKET_EOF=-3 } ErrorCode; /** Returns bytes sent or an ErrorCode value < 0. */ - ssize_t send(const char* data, size_t size); + ssize_t send(const void* data, size_t size); /** * Returns bytes received, an ErrorCode value < 0 or 0 * if the connection closed in an orderly manner. */ - ssize_t recv(char* data, size_t size); + ssize_t recv(void* data, size_t size); + + /** Bind to a port and start listening. + *@param port 0 means choose an available port. + *@param backlog maximum number of pending connections. + *@return The bound port. + */ + int listen(int port = 0, int backlog = 10); + /** Get file descriptor */ + int fd(); + private: #ifdef USE_APR apr_socket_t* socket; #else - int socket; + void init() const; + mutable int socket; // Initialized on demand. #endif }; diff --git a/cpp/src/qpid/sys/Thread.h b/cpp/src/qpid/sys/Thread.h index 2aad7c24d7..37f714dd6c 100644 --- a/cpp/src/qpid/sys/Thread.h +++ b/cpp/src/qpid/sys/Thread.h @@ -40,11 +40,17 @@ namespace sys { class Thread { public: + inline static Thread current(); + inline static void yield(); + inline Thread(); inline explicit Thread(qpid::sys::Runnable*); + inline explicit Thread(qpid::sys::Runnable&); + inline void join(); - inline static Thread current(); + inline long id(); + private: #ifdef USE_APR static void* APR_THREAD_FUNC runRunnable(apr_thread_t* thread, void *data); @@ -68,12 +74,21 @@ Thread::Thread(Runnable* runnable) { apr_thread_create(&thread, 0, runRunnable, runnable, APRPool::get())); } +Thread::Thread(Runnable& runnable) { + CHECK_APR_SUCCESS( + apr_thread_create(&thread, 0, runRunnable, &runnable, APRPool::get())); +} + void Thread::join(){ apr_status_t status; if (thread != 0) CHECK_APR_SUCCESS(apr_thread_join(&status, thread)); } +long Thread::id() { + return long(thread); +} + Thread::Thread(apr_thread_t* t) : thread(t) {} Thread Thread::current(){ @@ -83,15 +98,29 @@ Thread Thread::current(){ return Thread(thr); } +void Thread::yield() +{ + apr_thread_yield(); +} + + // POSIX ================================================================ #else Thread::Thread(Runnable* runnable) { - CHECK0(pthread_create(&thread, NULL, runRunnable, runnable)); + QPID_POSIX_THROW_IF(pthread_create(&thread, NULL, runRunnable, runnable)); +} + +Thread::Thread(Runnable& runnable) { + QPID_POSIX_THROW_IF(pthread_create(&thread, NULL, runRunnable, &runnable)); } void Thread::join(){ - if (thread != 0) CHECK0(pthread_join(thread, 0)); + QPID_POSIX_THROW_IF(pthread_join(thread, 0)); +} + +long Thread::id() { + return long(thread); } Thread::Thread(pthread_t thr) : thread(thr) {} @@ -99,6 +128,13 @@ Thread::Thread(pthread_t thr) : thread(thr) {} Thread Thread::current() { return Thread(pthread_self()); } + +void Thread::yield() +{ + QPID_POSIX_THROW_IF(pthread_yield()); +} + + #endif }} diff --git a/cpp/src/qpid/sys/Time.cpp b/cpp/src/qpid/sys/Time.cpp index 3971297ec2..ad6185b966 100644 --- a/cpp/src/qpid/sys/Time.cpp +++ b/cpp/src/qpid/sys/Time.cpp @@ -27,37 +27,34 @@ namespace sys { // APR ================================================================ #if USE_APR -Time Time::now() { - return Time(apr_time_now(), NSEC_PER_USEC); -} - -void Time::set(int64_t ticks, long nsec_per_tick) { - time = (ticks * nsec_per_tick) / NSEC_PER_USEC; -} - -int64_t Time::nsecs() const { - return time * NSEC_PER_USEC; -} +Time now() { return apr_time_now() * TIME_USEC; } // POSIX================================================================ #else -Time Time::now() { - Time t; - clock_gettime(CLOCK_REALTIME, &t.time); - return t; +Time now() { + struct timespec ts; + clock_gettime(CLOCK_REALTIME, &ts); + return toTime(ts); +} + +struct timespec toTimespec(const Time& t) { + struct timespec ts; + toTimespec(ts, t); + return ts; } -void Time::set(int64_t ticks, long nsec_per_tick) { - int64_t ns = ticks * nsec_per_tick; - time.tv_sec = ns / NSEC_PER_SEC; - time.tv_nsec = ns % NSEC_PER_SEC; +struct timespec& toTimespec(struct timespec& ts, const Time& t) { + ts.tv_sec = t / TIME_SEC; + ts.tv_nsec = t % TIME_SEC; + return ts; } -int64_t Time::nsecs() const { - return time.tv_sec * NSEC_PER_SEC + time.tv_nsec; +Time toTime(const struct timespec& ts) { + return ts.tv_sec*TIME_SEC + ts.tv_nsec; } + #endif }} diff --git a/cpp/src/qpid/sys/Time.h b/cpp/src/qpid/sys/Time.h index a569c90780..3dd46741d8 100644 --- a/cpp/src/qpid/sys/Time.h +++ b/cpp/src/qpid/sys/Time.h @@ -33,43 +33,25 @@ namespace qpid { namespace sys { -class Time -{ - public: - static Time now(); - - enum { - NSEC_PER_SEC=1000*1000*1000, - NSEC_PER_MSEC=1000*1000, - NSEC_PER_USEC=1000 - }; +/** Time in nanoseconds */ +typedef int64_t Time; - inline Time(int64_t ticks=0, long nsec_per_tick=1); +Time now(); - void set(int64_t ticks, long nsec_per_tick=1); - - inline int64_t msecs() const; - inline int64_t usecs() const; - int64_t nsecs() const; +/** Nanoseconds per second. */ +const Time TIME_SEC = 1000*1000*1000; +/** Nanoseconds per millisecond */ +const Time TIME_MSEC = 1000*1000; +/** Nanoseconds per microseconds. */ +const Time TIME_USEC = 1000; +/** Nanoseconds per nanosecond. */ +const Time TIME_NSEC = 1; #ifndef USE_APR - const struct timespec& getTimespec() const { return time; } - struct timespec& getTimespec() { return time; } +struct timespec toTimespec(const Time& t); +struct timespec& toTimespec(struct timespec& ts, const Time& t); +Time toTime(const struct timespec& ts); #endif - - private: -#ifdef USE_APR - apr_time_t time; -#else - struct timespec time; -#endif -}; - -Time::Time(int64_t ticks, long nsec_per_tick) { set(ticks, nsec_per_tick); } - -int64_t Time::msecs() const { return nsecs() / NSEC_PER_MSEC; } - -int64_t Time::usecs() const { return nsecs() / NSEC_PER_USEC; } }} |