summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/sys
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2006-11-29 14:36:08 +0000
committerAlan Conway <aconway@apache.org>2006-11-29 14:36:08 +0000
commitb13e1a24fcca8797b7be5a242f164afbe17ec4f6 (patch)
treeef0362e52c125bc75b07ef3e374dabfa52254e98 /cpp/src/qpid/sys
parent16d818e749462daf5e0e43079b2e48991646c619 (diff)
downloadqpid-python-b13e1a24fcca8797b7be5a242f164afbe17ec4f6.tar.gz
Posix EventChannel implementation using epoll. Placeholder for kevents.
Dynamic thread pool EventChannelThreads to serve EventChannel. Misc cleanup/enhancements. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@480582 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/sys')
-rw-r--r--cpp/src/qpid/sys/AtomicCount.h71
-rw-r--r--cpp/src/qpid/sys/EventChannel.h239
-rw-r--r--cpp/src/qpid/sys/Monitor.h153
-rw-r--r--cpp/src/qpid/sys/Mutex.h151
-rw-r--r--cpp/src/qpid/sys/Runnable.cpp32
-rw-r--r--cpp/src/qpid/sys/Runnable.h20
-rw-r--r--cpp/src/qpid/sys/Socket.h30
-rw-r--r--cpp/src/qpid/sys/Thread.h42
-rw-r--r--cpp/src/qpid/sys/Time.cpp39
-rw-r--r--cpp/src/qpid/sys/Time.h46
10 files changed, 391 insertions, 432 deletions
diff --git a/cpp/src/qpid/sys/AtomicCount.h b/cpp/src/qpid/sys/AtomicCount.h
new file mode 100644
index 0000000000..b625b2c9b0
--- /dev/null
+++ b/cpp/src/qpid/sys/AtomicCount.h
@@ -0,0 +1,71 @@
+#ifndef _posix_AtomicCount_h
+#define _posix_AtomicCount_h
+
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <boost/detail/atomic_count.hpp>
+#include <boost/noncopyable.hpp>
+
+namespace qpid {
+namespace sys {
+
+/**
+ * Atomic counter.
+ */
+class AtomicCount : boost::noncopyable {
+ public:
+ class ScopedDecrement : boost::noncopyable {
+ public:
+ /** Decrement counter in constructor and increment in destructor. */
+ ScopedDecrement(AtomicCount& c) : count(c) { value = --count; }
+ ~ScopedDecrement() { ++count; }
+ /** Return the value returned by the decrement. */
+ operator long() { return value; }
+ private:
+ AtomicCount& count;
+ long value;
+ };
+
+ class ScopedIncrement : boost::noncopyable {
+ public:
+ /** Increment counter in constructor and increment in destructor. */
+ ScopedIncrement(AtomicCount& c) : count(c) { ++count; }
+ ~ScopedIncrement() { --count; }
+ private:
+ AtomicCount& count;
+ };
+
+ AtomicCount(long value = 0) : count(value) {}
+
+ void operator++() { ++count ; }
+
+ long operator--() { return --count; }
+
+ operator long() const { return count; }
+
+
+ private:
+ boost::detail::atomic_count count;
+};
+
+
+}}
+
+
+#endif // _posix_AtomicCount_h
diff --git a/cpp/src/qpid/sys/EventChannel.h b/cpp/src/qpid/sys/EventChannel.h
deleted file mode 100644
index dd857c02c7..0000000000
--- a/cpp/src/qpid/sys/EventChannel.h
+++ /dev/null
@@ -1,239 +0,0 @@
-#ifndef _sys_EventChannel_h
-#define _sys_EventChannel_h
-
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include <qpid/SharedObject.h>
-#include <qpid/Exception.h>
-#include <qpid/sys/Time.h>
-
-namespace qpid {
-namespace sys {
-
-class EventChannel;
-
-class Event;
-class ReadEvent;
-class WriteEvent;
-class AcceptEvent;
-class NotifyEvent;
-
-/**
- Active event channel. Events represent async IO requests or
- inter-task synchronization. Posting an Event registers interest in
- the IO or sync event. When it occurs the posted Event is
- corresponding IO or sync event occurs they are returned to one
- of the threads waiting on the channel. For more details see
- the Event hierarchy.
-*/
-class EventChannel : public qpid::SharedObject<EventChannel>
-{
- public:
- static EventChannel::shared_ptr create();
-
- virtual ~EventChannel() {}
-
- virtual void post(ReadEvent& event) = 0;
- virtual void post(WriteEvent& event) = 0;
- virtual void post(AcceptEvent& event) = 0;
- virtual void post(NotifyEvent& event) = 0;
-
- inline void post(Event& event);
-
- /**
- * Wait for the next completed event.
- * @return An Event or 0 to indicate the calling thread should shut down.
- */
- virtual Event* getEvent() = 0;
-
- /** Dispose of a system-allocated buffer. Called by ReadEvent */
- virtual void dispose(void* buffer, size_t size) = 0;
-
- protected:
- EventChannel() {}
-};
-
-
-/**
- * Base class for all events. There are two possible styles of use:
- *
- * Task style: the event is allocated as a local variable on the initiating
- * task, which blocks in wait(). Event::dispatch() resumes that task
- * with the event data available.
- *
- * Proactor style: Threads post events but do not
- * wait. Event::dispatch() processes the event in the dispatching
- * thread and then deletes itself.
- *
- * Tasks give less kernel context switching and blocking AND simpler
- * coding. Tasks can call any number of pseudo-blocking opereations
- * that are actually event post/wait pairs. At each such point the
- * current thread can continue with the task or switch to another task
- * to minimise blocking.
- *
- * With Proactor style dispatch() is an atomic unit of work as far as
- * the EventChannel is concerned. To avoid such blocking the
- * application has to be written as a collection of non-blocking
- * dispatch() callbacks, which is more complex than tasks that can
- * call pseudo-blocking operations.
- */
-class Event : private boost::noncopyable
-{
- public:
- virtual ~Event() {}
-
- /** Post this event to the channel */
- virtual void post(EventChannel& channel) = 0;
-
- /**
- * Block till the event is delivered.
- * At most one task can wait on an event.
- */
- virtual void wait() const = 0;
-
- /**
- * Dispatch the event. Runs some event-specific code, may switch
- * context to resume a waiting task.
- */
- virtual void dispatch() = 0;
-};
-
-
-/**
- * Base class for asynchronous request events, provides exception
- * handling.
- */
-class RequestEvent : public Event
-{
- public:
- /** True if the async request failed */
- bool hasException() const { return ex.get(); }
-
- const qpid::Exception& getException() const { return *ex; }
-
- void setException(std::auto_ptr<qpid::Exception>& e) { ex = e; }
-
- /** If the event has an exception throw it, else do nothing */
- void verify() const { if (ex.get()) throw *ex; }
-
- void post(EventChannel& channel) { channel.post(*this); }
-
- private:
- qpid::HeapException ex;
-};
-
-
-/** An asynchronous read event. */
-class ReadEvent : public RequestEvent {
- public:
- /**
- * Read data from fd.
- */
- ReadEvent(int fileDescriptor, void* buffer, size_t bytesToRead) :
- fd(fileDescriptor), data(buffer), size(bytesToRead) {}
-
- /** Number of bytes read. */
- size_t getBytesRead() const { verify(); return size; }
-
- /**
- * If the system supports direct access to DMA buffers then
- * it may provide a direct pointer to such a buffer to avoid
- * a copy into the user buffer.
- * @return true if getData() is returning a system-supplied buffer.
- */
- bool isSystemData() const { verify(); return channel != 0; }
-
- /**
- * Pointer to data read. Note if isSystemData() is true then this
- * is NOT the same buffer that was supplied to the constructor.
- * The user buffer is untouched. See dispose().
- */
- void* getData() const { verify(); return data; }
-
- /** Called by the event channel on completion. */
- void complete(EventChannel::shared_ptr ec, void* _data, size_t _size) {
- if (data != _data) channel = ec; data = _data; size = _size;
- }
-
- /**
- * Dispose of system-provided data buffer, if any. This is
- * automatically called by the destructor.
- */
- void dispose() { if(channel && data) channel->dispose(data,size); data=0; }
-
- ~ReadEvent() { dispose(); }
-
- void post(EventChannel& channel) { channel.post(*this); }
-
- private:
- int fd;
- void* data;
- size_t size;
- EventChannel::shared_ptr channel;
-};
-
-/** Asynchronous write event */
-class WriteEvent : public RequestEvent {
- public:
- WriteEvent(int fileDescriptor, void* buffer, size_t bytesToWrite) :
- fd(fileDescriptor), data(buffer), size(bytesToWrite) {}
-
- /** Number of bytes written */
- size_t getBytesWritten() const { verify(); return size; }
-
- void post(EventChannel& channel) { channel.post(*this); }
-
- private:
- int fd;
- void* data;
- size_t size;
-};
-
-/** Asynchronous socket accept event */
-class AcceptEvent : public RequestEvent {
- public:
- /** Accept a connection on listeningFd */
- AcceptEvent(int listeningFd) : listen(listeningFd) {}
-
- /** Get accepted file descriptor */
- int getAcceptedFd() const { verify(); return accepted; }
-
- void post(EventChannel& channel) { channel.post(*this); }
-
- private:
- int listen;
- int accepted;
-};
-
-/**
- * NotifyEvent is delievered immediately to be dispatched by an
- * EventChannel thread.
- */
-class NotifyEvent : public RequestEvent {
- public:
- void post(EventChannel& channel) { channel.post(*this); }
-};
-
-
-inline void EventChannel::post(Event& event) { event.post(*this); }
-
-}}
-
-
-#endif /*!_sys_EventChannel_h*/
diff --git a/cpp/src/qpid/sys/Monitor.h b/cpp/src/qpid/sys/Monitor.h
index 59e1e74b57..bbe126cecb 100644
--- a/cpp/src/qpid/sys/Monitor.h
+++ b/cpp/src/qpid/sys/Monitor.h
@@ -22,60 +22,28 @@
*
*/
+#include <sys/errno.h>
#include <boost/noncopyable.hpp>
+#include <qpid/sys/Mutex.h>
+#include <qpid/sys/Time.h>
#ifdef USE_APR
-# include <apr_thread_mutex.h>
-# include <apr_thread_cond.h>
-# include <qpid/apr/APRBase.h>
-# include <qpid/apr/APRPool.h>
-#else
-# include <pthread.h>
-# include <qpid/sys/Time.h>
-# include <qpid/posix/check.h>
+# include <apr-1/apr_thread_cond.h>
#endif
namespace qpid {
namespace sys {
-template <class L>
-class ScopedLock
-{
- public:
- ScopedLock(L& l) : mutex(l) { l.lock(); }
- ~ScopedLock() { mutex.unlock(); }
- private:
- L& mutex;
-};
-
-
-class Mutex : private boost::noncopyable
-{
- public:
- typedef ScopedLock<Mutex> ScopedLock;
-
- inline Mutex();
- inline ~Mutex();
- inline void lock();
- inline void unlock();
- inline void trylock();
-
- protected:
-#ifdef USE_APR
- apr_thread_mutex_t* mutex;
-#else
- pthread_mutex_t mutex;
-#endif
-};
-
-/** A condition variable and a mutex */
+/**
+ * A monitor is a condition variable and a mutex
+ */
class Monitor : public Mutex
{
public:
inline Monitor();
inline ~Monitor();
inline void wait();
- inline bool wait(int64_t nsecs);
+ inline bool wait(const Time& absoluteTime);
inline void notify();
inline void notifyAll();
@@ -91,25 +59,6 @@ class Monitor : public Mutex
// APR ================================================================
#ifdef USE_APR
-Mutex::Mutex() {
- CHECK_APR_SUCCESS(apr_thread_mutex_create(&mutex, APR_THREAD_MUTEX_NESTED, APRPool::get()));
-}
-
-Mutex::~Mutex(){
- CHECK_APR_SUCCESS(apr_thread_mutex_destroy(mutex));
-}
-
-void Mutex::lock() {
- CHECK_APR_SUCCESS(apr_thread_mutex_lock(mutex));
-}
-void Mutex::unlock() {
- CHECK_APR_SUCCESS(apr_thread_mutex_unlock(mutex));
-}
-
-void Mutex::trylock() {
- CHECK_APR_SUCCESS(apr_thread_mutex_trylock(mutex));
-}
-
Monitor::Monitor() {
CHECK_APR_SUCCESS(apr_thread_cond_create(&condition, APRPool::get()));
}
@@ -122,10 +71,10 @@ void Monitor::wait() {
CHECK_APR_SUCCESS(apr_thread_cond_wait(condition, mutex));
}
-bool Monitor::wait(int64_t nsecs){
+bool Monitor::wait(const Time& absoluteTime){
// APR uses microseconds.
- apr_status_t status = apr_thread_cond_timedwait(
- condition, mutex, nsecs/1000);
+ apr_status_t status =
+ apr_thread_cond_timedwait(condition, mutex, absoluteTime/TIME_USEC);
if(status != APR_TIMEUP) CHECK_APR_SUCCESS(status);
return status == 0;
}
@@ -138,93 +87,41 @@ void Monitor::notifyAll(){
CHECK_APR_SUCCESS(apr_thread_cond_broadcast(condition));
}
-
-}}
-
-
-// POSIX ================================================================
#else
-/**
- * PODMutex is a POD, can be static-initialized with
- * PODMutex m = QPID_PODMUTEX_INITIALIZER
- */
-struct PODMutex
-{
- typedef ScopedLock<PODMutex> ScopedLock;
-
- inline void lock();
- inline void unlock();
- inline void trylock();
-
- // Must be public to be a POD:
- pthread_mutex_t mutex;
-};
-
-#define QPID_MUTEX_INITIALIZER { PTHREAD_MUTEX_INITIALIZER }
-
-
-void PODMutex::lock() {
- CHECK0(pthread_mutex_lock(&mutex));
-}
-void PODMutex::unlock() {
- CHECK0(pthread_mutex_unlock(&mutex));
-}
-
-void PODMutex::trylock() {
- CHECK0(pthread_mutex_trylock(&mutex));
-}
-
-
-Mutex::Mutex() {
- CHECK0(pthread_mutex_init(&mutex, 0));
-}
-
-Mutex::~Mutex(){
- CHECK0(pthread_mutex_destroy(&mutex));
-}
-
-void Mutex::lock() {
- CHECK0(pthread_mutex_lock(&mutex));
-}
-void Mutex::unlock() {
- CHECK0(pthread_mutex_unlock(&mutex));
-}
-
-void Mutex::trylock() {
- CHECK0(pthread_mutex_trylock(&mutex));
-}
+// POSIX ================================================================
Monitor::Monitor() {
- CHECK0(pthread_cond_init(&condition, 0));
+ QPID_POSIX_THROW_IF(pthread_cond_init(&condition, 0));
}
Monitor::~Monitor() {
- CHECK0(pthread_cond_destroy(&condition));
+ QPID_POSIX_THROW_IF(pthread_cond_destroy(&condition));
}
void Monitor::wait() {
- CHECK0(pthread_cond_wait(&condition, &mutex));
+ QPID_POSIX_THROW_IF(pthread_cond_wait(&condition, &mutex));
}
-bool Monitor::wait(int64_t nsecs){
- Time t(nsecs);
- int status = pthread_cond_timedwait(&condition, &mutex, &t.getTimespec());
- if(status != 0) {
- if (errno == ETIMEDOUT) return false;
- CHECK0(status);
+bool Monitor::wait(const Time& absoluteTime){
+ struct timespec ts;
+ toTimespec(ts, absoluteTime);
+ int status = pthread_cond_timedwait(&condition, &mutex, &ts);
+ if (status != 0) {
+ if (status == ETIMEDOUT) return false;
+ throw QPID_POSIX_ERROR(status);
}
return true;
}
void Monitor::notify(){
- CHECK0(pthread_cond_signal(&condition));
+ QPID_POSIX_THROW_IF(pthread_cond_signal(&condition));
}
void Monitor::notifyAll(){
- CHECK0(pthread_cond_broadcast(&condition));
+ QPID_POSIX_THROW_IF(pthread_cond_broadcast(&condition));
}
+#endif /*USE_APR*/
}}
-#endif /*USE_APR*/
#endif /*!_sys_Monitor_h*/
diff --git a/cpp/src/qpid/sys/Mutex.h b/cpp/src/qpid/sys/Mutex.h
new file mode 100644
index 0000000000..3ada2e98b7
--- /dev/null
+++ b/cpp/src/qpid/sys/Mutex.h
@@ -0,0 +1,151 @@
+#ifndef _sys_Mutex_h
+#define _sys_Mutex_h
+
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#ifdef USE_APR
+# include <apr-1/apr_thread_mutex.h>
+# include <qpid/apr/APRBase.h>
+# include <qpid/apr/APRPool.h>
+#else
+# include <pthread.h>
+# include <qpid/posix/check.h>
+#endif
+#include <boost/noncopyable.hpp>
+
+namespace qpid {
+namespace sys {
+
+/**
+ * Scoped lock template: calls lock() in ctor, unlock() in dtor.
+ * L can be any class with lock() and unlock() functions.
+ */
+template <class L>
+class ScopedLock
+{
+ public:
+ ScopedLock(L& l) : mutex(l) { l.lock(); }
+ ~ScopedLock() { mutex.unlock(); }
+ private:
+ L& mutex;
+};
+
+/**
+ * Mutex lock.
+ */
+class Mutex : private boost::noncopyable {
+ public:
+ typedef ScopedLock<Mutex> ScopedLock;
+
+ inline Mutex();
+ inline ~Mutex();
+ inline void lock();
+ inline void unlock();
+ inline void trylock();
+
+ protected:
+#ifdef USE_APR
+ apr_thread_mutex_t* mutex;
+#else
+ pthread_mutex_t mutex;
+#endif
+};
+
+#ifdef USE_APR
+// APR ================================================================
+
+Mutex::Mutex() {
+ CHECK_APR_SUCCESS(apr_thread_mutex_create(&mutex, APR_THREAD_MUTEX_NESTED, APRPool::get()));
+}
+
+Mutex::~Mutex(){
+ CHECK_APR_SUCCESS(apr_thread_mutex_destroy(mutex));
+}
+
+void Mutex::lock() {
+ CHECK_APR_SUCCESS(apr_thread_mutex_lock(mutex));
+}
+void Mutex::unlock() {
+ CHECK_APR_SUCCESS(apr_thread_mutex_unlock(mutex));
+}
+
+void Mutex::trylock() {
+ CHECK_APR_SUCCESS(apr_thread_mutex_trylock(mutex));
+}
+
+#else
+// POSIX ================================================================
+
+/**
+ * PODMutex is a POD, can be static-initialized with
+ * PODMutex m = QPID_PODMUTEX_INITIALIZER
+ */
+struct PODMutex
+{
+ typedef ScopedLock<PODMutex> ScopedLock;
+
+ inline void lock();
+ inline void unlock();
+ inline void trylock();
+
+ // Must be public to be a POD:
+ pthread_mutex_t mutex;
+};
+
+#define QPID_MUTEX_INITIALIZER { PTHREAD_MUTEX_INITIALIZER }
+
+
+void PODMutex::lock() {
+ QPID_POSIX_THROW_IF(pthread_mutex_lock(&mutex));
+}
+void PODMutex::unlock() {
+ QPID_POSIX_THROW_IF(pthread_mutex_unlock(&mutex));
+}
+
+void PODMutex::trylock() {
+ QPID_POSIX_THROW_IF(pthread_mutex_trylock(&mutex));
+}
+
+
+Mutex::Mutex() {
+ QPID_POSIX_THROW_IF(pthread_mutex_init(&mutex, 0));
+}
+
+Mutex::~Mutex(){
+ QPID_POSIX_THROW_IF(pthread_mutex_destroy(&mutex));
+}
+
+void Mutex::lock() {
+ QPID_POSIX_THROW_IF(pthread_mutex_lock(&mutex));
+}
+void Mutex::unlock() {
+ QPID_POSIX_THROW_IF(pthread_mutex_unlock(&mutex));
+}
+
+void Mutex::trylock() {
+ QPID_POSIX_THROW_IF(pthread_mutex_trylock(&mutex));
+}
+
+#endif // USE_APR
+
+}}
+
+
+
+#endif /*!_sys_Mutex_h*/
diff --git a/cpp/src/qpid/sys/Runnable.cpp b/cpp/src/qpid/sys/Runnable.cpp
new file mode 100644
index 0000000000..30122c682f
--- /dev/null
+++ b/cpp/src/qpid/sys/Runnable.cpp
@@ -0,0 +1,32 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "Runnable.h"
+#include <boost/bind.hpp>
+
+namespace qpid {
+namespace sys {
+
+Runnable::~Runnable() {}
+
+Runnable::Functor Runnable::functor()
+{
+ return boost::bind(&Runnable::run, this);
+}
+
+}}
diff --git a/cpp/src/qpid/sys/Runnable.h b/cpp/src/qpid/sys/Runnable.h
index 8379afb2f9..fb3927c612 100644
--- a/cpp/src/qpid/sys/Runnable.h
+++ b/cpp/src/qpid/sys/Runnable.h
@@ -1,3 +1,5 @@
+#ifndef _Runnable_
+#define _Runnable_
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -18,18 +20,28 @@
* under the License.
*
*/
-#ifndef _Runnable_
-#define _Runnable_
+
+#include <boost/function.hpp>
namespace qpid {
namespace sys {
-/** Base class for classes that run in a thread. */
+/**
+ * Interface for objects that can be run, e.g. in a thread.
+ */
class Runnable
{
public:
- virtual ~Runnable() {}
+ /** Type to represent a runnable as a Functor */
+ typedef boost::function0<void> Functor;
+
+ virtual ~Runnable();
+
+ /** Derived classes override run(). */
virtual void run() = 0;
+
+ /** Create a functor object that will call this->run(). */
+ Functor functor();
};
}}
diff --git a/cpp/src/qpid/sys/Socket.h b/cpp/src/qpid/sys/Socket.h
index cf757e7a27..d3e8c1af48 100644
--- a/cpp/src/qpid/sys/Socket.h
+++ b/cpp/src/qpid/sys/Socket.h
@@ -23,6 +23,7 @@
*/
#include <string>
+#include <qpid/sys/Time.h>
#ifdef USE_APR
# include <apr_network_io.h>
@@ -34,10 +35,18 @@ namespace sys {
class Socket
{
public:
- Socket();
+ /** Create an initialized TCP socket */
+ static Socket createTcp();
+ /** Create a socket wrapper for descriptor. */
+#ifdef USE_APR
+ Socket(apr_socket_t* descriptor = 0);
+#else
+ Socket(int descriptor = 0);
+#endif
+
/** Set timeout for read and write */
- void setTimeout(long msecs);
+ void setTimeout(Time interval);
void connect(const std::string& host, int port);
@@ -46,19 +55,30 @@ class Socket
enum { SOCKET_TIMEOUT=-2, SOCKET_EOF=-3 } ErrorCode;
/** Returns bytes sent or an ErrorCode value < 0. */
- ssize_t send(const char* data, size_t size);
+ ssize_t send(const void* data, size_t size);
/**
* Returns bytes received, an ErrorCode value < 0 or 0
* if the connection closed in an orderly manner.
*/
- ssize_t recv(char* data, size_t size);
+ ssize_t recv(void* data, size_t size);
+
+ /** Bind to a port and start listening.
+ *@param port 0 means choose an available port.
+ *@param backlog maximum number of pending connections.
+ *@return The bound port.
+ */
+ int listen(int port = 0, int backlog = 10);
+ /** Get file descriptor */
+ int fd();
+
private:
#ifdef USE_APR
apr_socket_t* socket;
#else
- int socket;
+ void init() const;
+ mutable int socket; // Initialized on demand.
#endif
};
diff --git a/cpp/src/qpid/sys/Thread.h b/cpp/src/qpid/sys/Thread.h
index 2aad7c24d7..37f714dd6c 100644
--- a/cpp/src/qpid/sys/Thread.h
+++ b/cpp/src/qpid/sys/Thread.h
@@ -40,11 +40,17 @@ namespace sys {
class Thread
{
public:
+ inline static Thread current();
+ inline static void yield();
+
inline Thread();
inline explicit Thread(qpid::sys::Runnable*);
+ inline explicit Thread(qpid::sys::Runnable&);
+
inline void join();
- inline static Thread current();
+ inline long id();
+
private:
#ifdef USE_APR
static void* APR_THREAD_FUNC runRunnable(apr_thread_t* thread, void *data);
@@ -68,12 +74,21 @@ Thread::Thread(Runnable* runnable) {
apr_thread_create(&thread, 0, runRunnable, runnable, APRPool::get()));
}
+Thread::Thread(Runnable& runnable) {
+ CHECK_APR_SUCCESS(
+ apr_thread_create(&thread, 0, runRunnable, &runnable, APRPool::get()));
+}
+
void Thread::join(){
apr_status_t status;
if (thread != 0)
CHECK_APR_SUCCESS(apr_thread_join(&status, thread));
}
+long Thread::id() {
+ return long(thread);
+}
+
Thread::Thread(apr_thread_t* t) : thread(t) {}
Thread Thread::current(){
@@ -83,15 +98,29 @@ Thread Thread::current(){
return Thread(thr);
}
+void Thread::yield()
+{
+ apr_thread_yield();
+}
+
+
// POSIX ================================================================
#else
Thread::Thread(Runnable* runnable) {
- CHECK0(pthread_create(&thread, NULL, runRunnable, runnable));
+ QPID_POSIX_THROW_IF(pthread_create(&thread, NULL, runRunnable, runnable));
+}
+
+Thread::Thread(Runnable& runnable) {
+ QPID_POSIX_THROW_IF(pthread_create(&thread, NULL, runRunnable, &runnable));
}
void Thread::join(){
- if (thread != 0) CHECK0(pthread_join(thread, 0));
+ QPID_POSIX_THROW_IF(pthread_join(thread, 0));
+}
+
+long Thread::id() {
+ return long(thread);
}
Thread::Thread(pthread_t thr) : thread(thr) {}
@@ -99,6 +128,13 @@ Thread::Thread(pthread_t thr) : thread(thr) {}
Thread Thread::current() {
return Thread(pthread_self());
}
+
+void Thread::yield()
+{
+ QPID_POSIX_THROW_IF(pthread_yield());
+}
+
+
#endif
}}
diff --git a/cpp/src/qpid/sys/Time.cpp b/cpp/src/qpid/sys/Time.cpp
index 3971297ec2..ad6185b966 100644
--- a/cpp/src/qpid/sys/Time.cpp
+++ b/cpp/src/qpid/sys/Time.cpp
@@ -27,37 +27,34 @@ namespace sys {
// APR ================================================================
#if USE_APR
-Time Time::now() {
- return Time(apr_time_now(), NSEC_PER_USEC);
-}
-
-void Time::set(int64_t ticks, long nsec_per_tick) {
- time = (ticks * nsec_per_tick) / NSEC_PER_USEC;
-}
-
-int64_t Time::nsecs() const {
- return time * NSEC_PER_USEC;
-}
+Time now() { return apr_time_now() * TIME_USEC; }
// POSIX================================================================
#else
-Time Time::now() {
- Time t;
- clock_gettime(CLOCK_REALTIME, &t.time);
- return t;
+Time now() {
+ struct timespec ts;
+ clock_gettime(CLOCK_REALTIME, &ts);
+ return toTime(ts);
+}
+
+struct timespec toTimespec(const Time& t) {
+ struct timespec ts;
+ toTimespec(ts, t);
+ return ts;
}
-void Time::set(int64_t ticks, long nsec_per_tick) {
- int64_t ns = ticks * nsec_per_tick;
- time.tv_sec = ns / NSEC_PER_SEC;
- time.tv_nsec = ns % NSEC_PER_SEC;
+struct timespec& toTimespec(struct timespec& ts, const Time& t) {
+ ts.tv_sec = t / TIME_SEC;
+ ts.tv_nsec = t % TIME_SEC;
+ return ts;
}
-int64_t Time::nsecs() const {
- return time.tv_sec * NSEC_PER_SEC + time.tv_nsec;
+Time toTime(const struct timespec& ts) {
+ return ts.tv_sec*TIME_SEC + ts.tv_nsec;
}
+
#endif
}}
diff --git a/cpp/src/qpid/sys/Time.h b/cpp/src/qpid/sys/Time.h
index a569c90780..3dd46741d8 100644
--- a/cpp/src/qpid/sys/Time.h
+++ b/cpp/src/qpid/sys/Time.h
@@ -33,43 +33,25 @@
namespace qpid {
namespace sys {
-class Time
-{
- public:
- static Time now();
-
- enum {
- NSEC_PER_SEC=1000*1000*1000,
- NSEC_PER_MSEC=1000*1000,
- NSEC_PER_USEC=1000
- };
+/** Time in nanoseconds */
+typedef int64_t Time;
- inline Time(int64_t ticks=0, long nsec_per_tick=1);
+Time now();
- void set(int64_t ticks, long nsec_per_tick=1);
-
- inline int64_t msecs() const;
- inline int64_t usecs() const;
- int64_t nsecs() const;
+/** Nanoseconds per second. */
+const Time TIME_SEC = 1000*1000*1000;
+/** Nanoseconds per millisecond */
+const Time TIME_MSEC = 1000*1000;
+/** Nanoseconds per microseconds. */
+const Time TIME_USEC = 1000;
+/** Nanoseconds per nanosecond. */
+const Time TIME_NSEC = 1;
#ifndef USE_APR
- const struct timespec& getTimespec() const { return time; }
- struct timespec& getTimespec() { return time; }
+struct timespec toTimespec(const Time& t);
+struct timespec& toTimespec(struct timespec& ts, const Time& t);
+Time toTime(const struct timespec& ts);
#endif
-
- private:
-#ifdef USE_APR
- apr_time_t time;
-#else
- struct timespec time;
-#endif
-};
-
-Time::Time(int64_t ticks, long nsec_per_tick) { set(ticks, nsec_per_tick); }
-
-int64_t Time::msecs() const { return nsecs() / NSEC_PER_MSEC; }
-
-int64_t Time::usecs() const { return nsecs() / NSEC_PER_USEC; }
}}