summaryrefslogtreecommitdiff
path: root/Final/cpp/lib/common/sys
diff options
context:
space:
mode:
Diffstat (limited to 'Final/cpp/lib/common/sys')
-rw-r--r--Final/cpp/lib/common/sys/Acceptor.h47
-rw-r--r--Final/cpp/lib/common/sys/AtomicCount.h74
-rw-r--r--Final/cpp/lib/common/sys/Module.h163
-rw-r--r--Final/cpp/lib/common/sys/Monitor.h129
-rw-r--r--Final/cpp/lib/common/sys/Mutex.h158
-rw-r--r--Final/cpp/lib/common/sys/Runnable.cpp35
-rw-r--r--Final/cpp/lib/common/sys/Runnable.h50
-rw-r--r--Final/cpp/lib/common/sys/SessionContext.h41
-rw-r--r--Final/cpp/lib/common/sys/SessionHandler.h45
-rw-r--r--Final/cpp/lib/common/sys/SessionHandlerFactory.h46
-rw-r--r--Final/cpp/lib/common/sys/ShutdownHandler.h37
-rw-r--r--Final/cpp/lib/common/sys/Socket.h89
-rw-r--r--Final/cpp/lib/common/sys/Thread.h150
-rw-r--r--Final/cpp/lib/common/sys/Time.cpp60
-rw-r--r--Final/cpp/lib/common/sys/Time.h58
-rw-r--r--Final/cpp/lib/common/sys/TimeoutHandler.h39
-rw-r--r--Final/cpp/lib/common/sys/apr/APRAcceptor.cpp130
-rw-r--r--Final/cpp/lib/common/sys/apr/APRBase.cpp90
-rw-r--r--Final/cpp/lib/common/sys/apr/APRBase.h78
-rw-r--r--Final/cpp/lib/common/sys/apr/APRPool.cpp78
-rw-r--r--Final/cpp/lib/common/sys/apr/APRPool.h61
-rw-r--r--Final/cpp/lib/common/sys/apr/APRSocket.cpp77
-rw-r--r--Final/cpp/lib/common/sys/apr/APRSocket.h48
-rw-r--r--Final/cpp/lib/common/sys/apr/LFProcessor.cpp181
-rw-r--r--Final/cpp/lib/common/sys/apr/LFProcessor.h122
-rw-r--r--Final/cpp/lib/common/sys/apr/LFSessionContext.cpp181
-rw-r--r--Final/cpp/lib/common/sys/apr/LFSessionContext.h90
-rw-r--r--Final/cpp/lib/common/sys/apr/Socket.cpp94
-rw-r--r--Final/cpp/lib/common/sys/apr/Thread.cpp37
-rw-r--r--Final/cpp/lib/common/sys/posix/EventChannel.cpp328
-rw-r--r--Final/cpp/lib/common/sys/posix/EventChannel.h179
-rw-r--r--Final/cpp/lib/common/sys/posix/EventChannelAcceptor.cpp149
-rw-r--r--Final/cpp/lib/common/sys/posix/EventChannelConnection.cpp232
-rw-r--r--Final/cpp/lib/common/sys/posix/EventChannelConnection.h105
-rw-r--r--Final/cpp/lib/common/sys/posix/EventChannelThreads.cpp122
-rw-r--r--Final/cpp/lib/common/sys/posix/EventChannelThreads.h95
-rw-r--r--Final/cpp/lib/common/sys/posix/PosixAcceptor.cpp48
-rw-r--r--Final/cpp/lib/common/sys/posix/Socket.cpp120
-rw-r--r--Final/cpp/lib/common/sys/posix/Thread.cpp31
-rw-r--r--Final/cpp/lib/common/sys/posix/check.cpp39
-rw-r--r--Final/cpp/lib/common/sys/posix/check.h62
41 files changed, 3998 insertions, 0 deletions
diff --git a/Final/cpp/lib/common/sys/Acceptor.h b/Final/cpp/lib/common/sys/Acceptor.h
new file mode 100644
index 0000000000..e6bc27a593
--- /dev/null
+++ b/Final/cpp/lib/common/sys/Acceptor.h
@@ -0,0 +1,47 @@
+#ifndef _sys_Acceptor_h
+#define _sys_Acceptor_h
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <stdint.h>
+#include <SharedObject.h>
+
+namespace qpid {
+namespace sys {
+
+class SessionHandlerFactory;
+
+class Acceptor : public qpid::SharedObject<Acceptor>
+{
+ public:
+ static Acceptor::shared_ptr create(int16_t port, int backlog, int threads, bool trace = false);
+ virtual ~Acceptor() = 0;
+ virtual int16_t getPort() const = 0;
+ virtual void run(qpid::sys::SessionHandlerFactory* factory) = 0;
+ virtual void shutdown() = 0;
+};
+
+}}
+
+
+
+#endif /*!_sys_Acceptor_h*/
diff --git a/Final/cpp/lib/common/sys/AtomicCount.h b/Final/cpp/lib/common/sys/AtomicCount.h
new file mode 100644
index 0000000000..b3d099192f
--- /dev/null
+++ b/Final/cpp/lib/common/sys/AtomicCount.h
@@ -0,0 +1,74 @@
+#ifndef _posix_AtomicCount_h
+#define _posix_AtomicCount_h
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <boost/detail/atomic_count.hpp>
+#include <boost/noncopyable.hpp>
+
+namespace qpid {
+namespace sys {
+
+/**
+ * Atomic counter.
+ */
+class AtomicCount : boost::noncopyable {
+ public:
+ class ScopedDecrement : boost::noncopyable {
+ public:
+ /** Decrement counter in constructor and increment in destructor. */
+ ScopedDecrement(AtomicCount& c) : count(c) { value = --count; }
+ ~ScopedDecrement() { ++count; }
+ /** Return the value returned by the decrement. */
+ operator long() { return value; }
+ private:
+ AtomicCount& count;
+ long value;
+ };
+
+ class ScopedIncrement : boost::noncopyable {
+ public:
+ /** Increment counter in constructor and increment in destructor. */
+ ScopedIncrement(AtomicCount& c) : count(c) { ++count; }
+ ~ScopedIncrement() { --count; }
+ private:
+ AtomicCount& count;
+ };
+
+ AtomicCount(long value = 0) : count(value) {}
+
+ void operator++() { ++count ; }
+
+ long operator--() { return --count; }
+
+ operator long() const { return count; }
+
+
+ private:
+ boost::detail::atomic_count count;
+};
+
+
+}}
+
+
+#endif // _posix_AtomicCount_h
diff --git a/Final/cpp/lib/common/sys/Module.h b/Final/cpp/lib/common/sys/Module.h
new file mode 100644
index 0000000000..64ed309afb
--- /dev/null
+++ b/Final/cpp/lib/common/sys/Module.h
@@ -0,0 +1,163 @@
+#ifndef _sys_Module_h
+#define _sys_Module_h
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <boost/noncopyable.hpp>
+#include <iostream>
+#include <QpidError.h>
+
+namespace qpid {
+namespace sys {
+#if USE_APR
+#include <apr_dso.h>
+ typedef apr_dso_handle_t* dso_handle_t;
+#else
+ typedef void* dso_handle_t;
+#endif
+
+ template <class T> class Module : private boost::noncopyable
+ {
+ typedef T* create_t();
+ typedef void destroy_t(T*);
+
+ dso_handle_t handle;
+ destroy_t* destroy;
+ T* ptr;
+
+ void load(const std::string& name);
+ void unload();
+ void* getSymbol(const std::string& name);
+
+ public:
+ Module(const std::string& name);
+ T* operator->();
+ T* get();
+ ~Module() throw();
+ };
+
+}
+}
+
+using namespace qpid::sys;
+
+template <class T> Module<T>::Module(const std::string& module) : destroy(0), ptr(0)
+{
+ load(module);
+ //TODO: need a better strategy for symbol names to allow multiple
+ //modules to be loaded without clashes...
+
+ //Note: need the double cast to avoid errors in casting from void* to function pointer with -pedantic
+ create_t* create = reinterpret_cast<create_t*>(reinterpret_cast<intptr_t>(getSymbol("create")));
+ destroy = reinterpret_cast<destroy_t*>(reinterpret_cast<intptr_t>(getSymbol("destroy")));
+ ptr = create();
+}
+
+template <class T> T* Module<T>::operator->()
+{
+ return ptr;
+}
+
+template <class T> T* Module<T>::get()
+{
+ return ptr;
+}
+
+template <class T> Module<T>::~Module() throw()
+{
+ try {
+ if (handle && ptr) {
+ destroy(ptr);
+ }
+ if (handle) unload();
+ } catch (std::exception& e) {
+ std::cout << "Error while destroying module: " << e.what() << std::endl;
+ }
+ destroy = 0;
+ handle = 0;
+ ptr = 0;
+}
+
+// APR ================================================================
+#if USE_APR
+
+#include <apr/APRBase.h>
+#include <apr/APRPool.h>
+
+template <class T> void Module<T>::load(const std::string& name)
+{
+ apr_pool_t* pool = APRPool::get();
+ CHECK_APR_SUCCESS(apr_dso_load(&handle, name.c_str(), pool));
+ APRPool::free(pool);
+}
+
+template <class T> void Module<T>::unload()
+{
+ CHECK_APR_SUCCESS(apr_dso_unload(handle));
+}
+
+template <class T> void* Module<T>::getSymbol(const std::string& name)
+{
+ apr_dso_handle_sym_t symbol;
+ CHECK_APR_SUCCESS(apr_dso_sym(&symbol, handle, name.c_str()));
+ return (void*) symbol;
+}
+
+// POSIX================================================================
+#else
+
+#include <dlfcn.h>
+
+template <class T> void Module<T>::load(const std::string& name)
+{
+ dlerror();
+ handle = dlopen(name.c_str(), RTLD_NOW);
+ const char* error = dlerror();
+ if (error) {
+ THROW_QPID_ERROR(INTERNAL_ERROR, error);
+ }
+}
+
+template <class T> void Module<T>::unload()
+{
+ dlerror();
+ dlclose(handle);
+ const char* error = dlerror();
+ if (error) {
+ THROW_QPID_ERROR(INTERNAL_ERROR, error);
+ }
+}
+
+template <class T> void* Module<T>::getSymbol(const std::string& name)
+{
+ dlerror();
+ void* sym = dlsym(handle, name.c_str());
+ const char* error = dlerror();
+ if (error) {
+ THROW_QPID_ERROR(INTERNAL_ERROR, error);
+ }
+ return sym;
+}
+
+#endif //if USE_APR
+
+#endif //ifndef _sys_Module_h
+
diff --git a/Final/cpp/lib/common/sys/Monitor.h b/Final/cpp/lib/common/sys/Monitor.h
new file mode 100644
index 0000000000..c615a97aa3
--- /dev/null
+++ b/Final/cpp/lib/common/sys/Monitor.h
@@ -0,0 +1,129 @@
+#ifndef _sys_Monitor_h
+#define _sys_Monitor_h
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <sys/errno.h>
+#include <boost/noncopyable.hpp>
+#include <sys/Mutex.h>
+#include <sys/Time.h>
+
+#ifdef USE_APR
+# include <apr_thread_cond.h>
+#endif
+
+namespace qpid {
+namespace sys {
+
+/**
+ * A monitor is a condition variable and a mutex
+ */
+class Monitor : public Mutex
+{
+ public:
+ inline Monitor();
+ inline ~Monitor();
+ inline void wait();
+ inline bool wait(const Time& absoluteTime);
+ inline void notify();
+ inline void notifyAll();
+
+ private:
+#ifdef USE_APR
+ apr_thread_cond_t* condition;
+#else
+ pthread_cond_t condition;
+#endif
+};
+
+
+// APR ================================================================
+#ifdef USE_APR
+
+Monitor::Monitor() {
+ apr_pool_t* pool = APRPool::get();
+ CHECK_APR_SUCCESS(apr_thread_cond_create(&condition, pool));
+ APRPool::free(pool);
+}
+
+Monitor::~Monitor() {
+ CHECK_APR_SUCCESS(apr_thread_cond_destroy(condition));
+}
+
+void Monitor::wait() {
+ CHECK_APR_SUCCESS(apr_thread_cond_wait(condition, mutex));
+}
+
+bool Monitor::wait(const Time& absoluteTime){
+ // APR uses microseconds.
+ apr_status_t status =
+ apr_thread_cond_timedwait(condition, mutex, absoluteTime/TIME_USEC);
+ if(status != APR_TIMEUP) CHECK_APR_SUCCESS(status);
+ return status == 0;
+}
+
+void Monitor::notify(){
+ CHECK_APR_SUCCESS(apr_thread_cond_signal(condition));
+}
+
+void Monitor::notifyAll(){
+ CHECK_APR_SUCCESS(apr_thread_cond_broadcast(condition));
+}
+
+#else
+// POSIX ================================================================
+
+Monitor::Monitor() {
+ QPID_POSIX_THROW_IF(pthread_cond_init(&condition, 0));
+}
+
+Monitor::~Monitor() {
+ QPID_POSIX_THROW_IF(pthread_cond_destroy(&condition));
+}
+
+void Monitor::wait() {
+ QPID_POSIX_THROW_IF(pthread_cond_wait(&condition, &mutex));
+}
+
+bool Monitor::wait(const Time& absoluteTime){
+ struct timespec ts;
+ toTimespec(ts, absoluteTime);
+ int status = pthread_cond_timedwait(&condition, &mutex, &ts);
+ if (status != 0) {
+ if (status == ETIMEDOUT) return false;
+ throw QPID_POSIX_ERROR(status);
+ }
+ return true;
+}
+
+void Monitor::notify(){
+ QPID_POSIX_THROW_IF(pthread_cond_signal(&condition));
+}
+
+void Monitor::notifyAll(){
+ QPID_POSIX_THROW_IF(pthread_cond_broadcast(&condition));
+}
+#endif /*USE_APR*/
+
+
+}}
+#endif /*!_sys_Monitor_h*/
diff --git a/Final/cpp/lib/common/sys/Mutex.h b/Final/cpp/lib/common/sys/Mutex.h
new file mode 100644
index 0000000000..ae1d6369af
--- /dev/null
+++ b/Final/cpp/lib/common/sys/Mutex.h
@@ -0,0 +1,158 @@
+#ifndef _sys_Mutex_h
+#define _sys_Mutex_h
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#ifdef USE_APR
+# include <apr_thread_mutex.h>
+# include <apr_pools.h>
+# include "apr/APRBase.h"
+# include "apr/APRPool.h"
+#else
+# include <pthread.h>
+# include <posix/check.h>
+#endif
+#include <boost/noncopyable.hpp>
+
+namespace qpid {
+namespace sys {
+
+/**
+ * Scoped lock template: calls lock() in ctor, unlock() in dtor.
+ * L can be any class with lock() and unlock() functions.
+ */
+template <class L>
+class ScopedLock
+{
+ public:
+ ScopedLock(L& l) : mutex(l) { l.lock(); }
+ ~ScopedLock() { mutex.unlock(); }
+ private:
+ L& mutex;
+};
+
+/**
+ * Mutex lock.
+ */
+class Mutex : private boost::noncopyable {
+ public:
+ typedef ScopedLock<Mutex> ScopedLock;
+
+ inline Mutex();
+ inline ~Mutex();
+ inline void lock();
+ inline void unlock();
+ inline void trylock();
+
+ protected:
+#ifdef USE_APR
+ apr_thread_mutex_t* mutex;
+ apr_pool_t* pool;
+#else
+ pthread_mutex_t mutex;
+#endif
+};
+
+#ifdef USE_APR
+// APR ================================================================
+
+Mutex::Mutex() {
+ pool = APRPool::get();
+ CHECK_APR_SUCCESS(apr_thread_mutex_create(&mutex, APR_THREAD_MUTEX_NESTED, pool));
+}
+
+Mutex::~Mutex(){
+ CHECK_APR_SUCCESS(apr_thread_mutex_destroy(mutex));
+ APRPool::free(pool);
+}
+
+void Mutex::lock() {
+ CHECK_APR_SUCCESS(apr_thread_mutex_lock(mutex));
+}
+void Mutex::unlock() {
+ CHECK_APR_SUCCESS(apr_thread_mutex_unlock(mutex));
+}
+
+void Mutex::trylock() {
+ CHECK_APR_SUCCESS(apr_thread_mutex_trylock(mutex));
+}
+
+#else
+// POSIX ================================================================
+
+/**
+ * PODMutex is a POD, can be static-initialized with
+ * PODMutex m = QPID_PODMUTEX_INITIALIZER
+ */
+struct PODMutex
+{
+ typedef ScopedLock<PODMutex> ScopedLock;
+
+ inline void lock();
+ inline void unlock();
+ inline void trylock();
+
+ // Must be public to be a POD:
+ pthread_mutex_t mutex;
+};
+
+#define QPID_MUTEX_INITIALIZER { PTHREAD_MUTEX_INITIALIZER }
+
+
+void PODMutex::lock() {
+ QPID_POSIX_THROW_IF(pthread_mutex_lock(&mutex));
+}
+void PODMutex::unlock() {
+ QPID_POSIX_THROW_IF(pthread_mutex_unlock(&mutex));
+}
+
+void PODMutex::trylock() {
+ QPID_POSIX_THROW_IF(pthread_mutex_trylock(&mutex));
+}
+
+
+Mutex::Mutex() {
+ QPID_POSIX_THROW_IF(pthread_mutex_init(&mutex, 0));
+}
+
+Mutex::~Mutex(){
+ QPID_POSIX_THROW_IF(pthread_mutex_destroy(&mutex));
+}
+
+void Mutex::lock() {
+ QPID_POSIX_THROW_IF(pthread_mutex_lock(&mutex));
+}
+void Mutex::unlock() {
+ QPID_POSIX_THROW_IF(pthread_mutex_unlock(&mutex));
+}
+
+void Mutex::trylock() {
+ QPID_POSIX_THROW_IF(pthread_mutex_trylock(&mutex));
+}
+
+#endif // USE_APR
+
+}}
+
+
+
+#endif /*!_sys_Mutex_h*/
diff --git a/Final/cpp/lib/common/sys/Runnable.cpp b/Final/cpp/lib/common/sys/Runnable.cpp
new file mode 100644
index 0000000000..c3174b9bc8
--- /dev/null
+++ b/Final/cpp/lib/common/sys/Runnable.cpp
@@ -0,0 +1,35 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "Runnable.h"
+#include <boost/bind.hpp>
+
+namespace qpid {
+namespace sys {
+
+Runnable::~Runnable() {}
+
+Runnable::Functor Runnable::functor()
+{
+ return boost::bind(&Runnable::run, this);
+}
+
+}}
diff --git a/Final/cpp/lib/common/sys/Runnable.h b/Final/cpp/lib/common/sys/Runnable.h
new file mode 100644
index 0000000000..fb3927c612
--- /dev/null
+++ b/Final/cpp/lib/common/sys/Runnable.h
@@ -0,0 +1,50 @@
+#ifndef _Runnable_
+#define _Runnable_
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <boost/function.hpp>
+
+namespace qpid {
+namespace sys {
+
+/**
+ * Interface for objects that can be run, e.g. in a thread.
+ */
+class Runnable
+{
+ public:
+ /** Type to represent a runnable as a Functor */
+ typedef boost::function0<void> Functor;
+
+ virtual ~Runnable();
+
+ /** Derived classes override run(). */
+ virtual void run() = 0;
+
+ /** Create a functor object that will call this->run(). */
+ Functor functor();
+};
+
+}}
+
+
+#endif
diff --git a/Final/cpp/lib/common/sys/SessionContext.h b/Final/cpp/lib/common/sys/SessionContext.h
new file mode 100644
index 0000000000..671e00774f
--- /dev/null
+++ b/Final/cpp/lib/common/sys/SessionContext.h
@@ -0,0 +1,41 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _SessionContext_
+#define _SessionContext_
+
+#include <OutputHandler.h>
+
+namespace qpid {
+namespace sys {
+
+/**
+ * Provides the output handler associated with a connection.
+ */
+class SessionContext : public virtual qpid::framing::OutputHandler
+{
+ public:
+ virtual void close() = 0;
+};
+
+}}
+
+
+#endif
diff --git a/Final/cpp/lib/common/sys/SessionHandler.h b/Final/cpp/lib/common/sys/SessionHandler.h
new file mode 100644
index 0000000000..76f79d421d
--- /dev/null
+++ b/Final/cpp/lib/common/sys/SessionHandler.h
@@ -0,0 +1,45 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _SessionHandler_
+#define _SessionHandler_
+
+#include <InputHandler.h>
+#include <InitiationHandler.h>
+#include <ProtocolInitiation.h>
+#include <sys/TimeoutHandler.h>
+
+namespace qpid {
+namespace sys {
+
+ class SessionHandler :
+ public qpid::framing::InitiationHandler,
+ public qpid::framing::InputHandler,
+ public TimeoutHandler
+ {
+ public:
+ virtual void closed() = 0;
+ };
+
+}
+}
+
+
+#endif
diff --git a/Final/cpp/lib/common/sys/SessionHandlerFactory.h b/Final/cpp/lib/common/sys/SessionHandlerFactory.h
new file mode 100644
index 0000000000..2a01aebcb0
--- /dev/null
+++ b/Final/cpp/lib/common/sys/SessionHandlerFactory.h
@@ -0,0 +1,46 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _SessionHandlerFactory_
+#define _SessionHandlerFactory_
+
+#include <boost/noncopyable.hpp>
+
+namespace qpid {
+namespace sys {
+
+class SessionContext;
+class SessionHandler;
+
+/**
+ * Callback interface used by the Acceptor to
+ * create a SessionHandler for each new connection.
+ */
+class SessionHandlerFactory : private boost::noncopyable
+{
+ public:
+ virtual SessionHandler* create(SessionContext* ctxt) = 0;
+ virtual ~SessionHandlerFactory(){}
+};
+
+}}
+
+
+#endif
diff --git a/Final/cpp/lib/common/sys/ShutdownHandler.h b/Final/cpp/lib/common/sys/ShutdownHandler.h
new file mode 100644
index 0000000000..88baecb5b6
--- /dev/null
+++ b/Final/cpp/lib/common/sys/ShutdownHandler.h
@@ -0,0 +1,37 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _ShutdownHandler_
+#define _ShutdownHandler_
+
+namespace qpid {
+namespace sys {
+
+ class ShutdownHandler
+ {
+ public:
+ virtual void shutdown() = 0;
+ virtual ~ShutdownHandler(){}
+ };
+
+}
+}
+
+#endif
diff --git a/Final/cpp/lib/common/sys/Socket.h b/Final/cpp/lib/common/sys/Socket.h
new file mode 100644
index 0000000000..b5f74847c2
--- /dev/null
+++ b/Final/cpp/lib/common/sys/Socket.h
@@ -0,0 +1,89 @@
+#ifndef _sys_Socket_h
+#define _sys_Socket_h
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <string>
+#include <sys/Time.h>
+
+#ifdef USE_APR
+# include <apr_network_io.h>
+#endif
+
+namespace qpid {
+namespace sys {
+
+class Socket
+{
+ public:
+ /** Create an initialized TCP socket */
+ static Socket createTcp();
+
+ /** Create a socket wrapper for descriptor. */
+#ifdef USE_APR
+ Socket(apr_socket_t* descriptor = 0);
+#else
+ Socket(int descriptor = 0);
+#endif
+
+ /** Set timeout for read and write */
+ void setTimeout(Time interval);
+ void setTcpNoDelay(bool on);
+
+ void connect(const std::string& host, int port);
+
+ void close();
+
+ enum { SOCKET_TIMEOUT=-2, SOCKET_EOF=-3 } ErrorCode;
+
+ /** Returns bytes sent or an ErrorCode value < 0. */
+ ssize_t send(const void* data, size_t size);
+
+ /**
+ * Returns bytes received, an ErrorCode value < 0 or 0
+ * if the connection closed in an orderly manner.
+ */
+ ssize_t recv(void* data, size_t size);
+
+ /** Bind to a port and start listening.
+ *@param port 0 means choose an available port.
+ *@param backlog maximum number of pending connections.
+ *@return The bound port.
+ */
+ int listen(int port = 0, int backlog = 10);
+
+ /** Get file descriptor */
+ int fd();
+
+ private:
+#ifdef USE_APR
+ apr_socket_t* socket;
+#else
+ void init() const;
+ mutable int socket; // Initialized on demand.
+#endif
+};
+
+}}
+
+
+#endif /*!_sys_Socket_h*/
diff --git a/Final/cpp/lib/common/sys/Thread.h b/Final/cpp/lib/common/sys/Thread.h
new file mode 100644
index 0000000000..c14c7cc6ad
--- /dev/null
+++ b/Final/cpp/lib/common/sys/Thread.h
@@ -0,0 +1,150 @@
+#ifndef _sys_Thread_h
+#define _sys_Thread_h
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <sys/Runnable.h>
+
+#ifdef USE_APR
+# include <apr_thread_proc.h>
+# include <apr_portable.h>
+# include <apr/APRPool.h>
+# include <apr/APRBase.h>
+#else
+# include <posix/check.h>
+# include <pthread.h>
+#endif
+
+namespace qpid {
+namespace sys {
+
+class Thread
+{
+ public:
+ inline static Thread current();
+ inline static void yield();
+
+ inline Thread();
+ inline Thread(qpid::sys::Runnable*);
+ inline Thread(qpid::sys::Runnable&);
+ ~Thread();
+ inline void join();
+ inline long id();
+
+ private:
+#ifdef USE_APR
+ static void* APR_THREAD_FUNC runRunnable(apr_thread_t* thread, void *data);
+ inline Thread(apr_thread_t* t);
+ apr_thread_t* thread;
+#else
+ static void* runRunnable(void* runnable);
+ inline Thread(pthread_t);
+ pthread_t thread;
+#endif
+};
+
+
+Thread::Thread() : thread(0) {}
+
+// APR ================================================================
+#ifdef USE_APR
+
+Thread::Thread(Runnable* runnable) {
+ apr_pool_t* tmp_pool = APRPool::get();
+ CHECK_APR_SUCCESS(
+ apr_thread_create(&thread, 0, runRunnable, runnable, tmp_pool));
+ APRPool::free(tmp_pool);
+}
+
+Thread::Thread(Runnable& runnable) {
+ apr_pool_t* tmp_pool = APRPool::get();
+ CHECK_APR_SUCCESS(
+ apr_thread_create(&thread, 0, runRunnable, &runnable, tmp_pool));
+ APRPool::free(tmp_pool);
+}
+
+void Thread::join(){
+ apr_status_t status;
+ if (thread != 0)
+ CHECK_APR_SUCCESS(apr_thread_join(&status, thread));
+}
+
+long Thread::id() {
+ return long(thread);
+}
+
+Thread::Thread(apr_thread_t* t) : thread(t) {}
+
+Thread Thread::current(){
+ apr_pool_t* tmp_pool = APRPool::get();
+ apr_thread_t* thr;
+ apr_os_thread_t osthr = apr_os_thread_current();
+ CHECK_APR_SUCCESS(apr_os_thread_put(&thr, &osthr, tmp_pool));
+ APRPool::free(tmp_pool);
+ return Thread(thr);
+}
+
+void Thread::yield()
+{
+ apr_thread_yield();
+}
+
+
+// POSIX ================================================================
+#else
+
+Thread::Thread(Runnable* runnable) {
+ QPID_POSIX_THROW_IF(pthread_create(&thread, NULL, runRunnable, runnable));
+}
+
+Thread::Thread(Runnable& runnable) {
+ QPID_POSIX_THROW_IF(pthread_create(&thread, NULL, runRunnable, &runnable));
+}
+
+void Thread::join(){
+ QPID_POSIX_THROW_IF(pthread_join(thread, 0));
+}
+
+long Thread::id() {
+ return long(thread);
+}
+
+Thread::~Thread() {
+}
+
+Thread::Thread(pthread_t thr) : thread(thr) {}
+
+Thread Thread::current() {
+ return Thread(pthread_self());
+}
+
+void Thread::yield()
+{
+ QPID_POSIX_THROW_IF(pthread_yield());
+}
+
+
+#endif
+
+}}
+
+#endif /*!_sys_Thread_h*/
diff --git a/Final/cpp/lib/common/sys/Time.cpp b/Final/cpp/lib/common/sys/Time.cpp
new file mode 100644
index 0000000000..ad6185b966
--- /dev/null
+++ b/Final/cpp/lib/common/sys/Time.cpp
@@ -0,0 +1,60 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "Time.h"
+
+namespace qpid {
+namespace sys {
+
+// APR ================================================================
+#if USE_APR
+
+Time now() { return apr_time_now() * TIME_USEC; }
+
+// POSIX================================================================
+#else
+
+Time now() {
+ struct timespec ts;
+ clock_gettime(CLOCK_REALTIME, &ts);
+ return toTime(ts);
+}
+
+struct timespec toTimespec(const Time& t) {
+ struct timespec ts;
+ toTimespec(ts, t);
+ return ts;
+}
+
+struct timespec& toTimespec(struct timespec& ts, const Time& t) {
+ ts.tv_sec = t / TIME_SEC;
+ ts.tv_nsec = t % TIME_SEC;
+ return ts;
+}
+
+Time toTime(const struct timespec& ts) {
+ return ts.tv_sec*TIME_SEC + ts.tv_nsec;
+}
+
+
+#endif
+}}
+
diff --git a/Final/cpp/lib/common/sys/Time.h b/Final/cpp/lib/common/sys/Time.h
new file mode 100644
index 0000000000..3dd46741d8
--- /dev/null
+++ b/Final/cpp/lib/common/sys/Time.h
@@ -0,0 +1,58 @@
+#ifndef _sys_Time_h
+#define _sys_Time_h
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <stdint.h>
+
+#ifdef USE_APR
+# include <apr_time.h>
+#else
+# include <time.h>
+#endif
+
+namespace qpid {
+namespace sys {
+
+/** Time in nanoseconds */
+typedef int64_t Time;
+
+Time now();
+
+/** Nanoseconds per second. */
+const Time TIME_SEC = 1000*1000*1000;
+/** Nanoseconds per millisecond */
+const Time TIME_MSEC = 1000*1000;
+/** Nanoseconds per microseconds. */
+const Time TIME_USEC = 1000;
+/** Nanoseconds per nanosecond. */
+const Time TIME_NSEC = 1;
+
+#ifndef USE_APR
+struct timespec toTimespec(const Time& t);
+struct timespec& toTimespec(struct timespec& ts, const Time& t);
+Time toTime(const struct timespec& ts);
+#endif
+
+}}
+
+#endif /*!_sys_Time_h*/
diff --git a/Final/cpp/lib/common/sys/TimeoutHandler.h b/Final/cpp/lib/common/sys/TimeoutHandler.h
new file mode 100644
index 0000000000..0c10709bbf
--- /dev/null
+++ b/Final/cpp/lib/common/sys/TimeoutHandler.h
@@ -0,0 +1,39 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _TimeoutHandler_
+#define _TimeoutHandler_
+
+namespace qpid {
+namespace sys {
+
+ class TimeoutHandler
+ {
+ public:
+ virtual void idleOut() = 0;
+ virtual void idleIn() = 0;
+ virtual ~TimeoutHandler(){}
+ };
+
+}
+}
+
+
+#endif
diff --git a/Final/cpp/lib/common/sys/apr/APRAcceptor.cpp b/Final/cpp/lib/common/sys/apr/APRAcceptor.cpp
new file mode 100644
index 0000000000..a427542fc3
--- /dev/null
+++ b/Final/cpp/lib/common/sys/apr/APRAcceptor.cpp
@@ -0,0 +1,130 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <sys/Acceptor.h>
+#include <sys/SessionHandlerFactory.h>
+#include <apr_pools.h>
+#include "LFProcessor.h"
+#include "LFSessionContext.h"
+#include "APRBase.h"
+#include "APRPool.h"
+
+namespace qpid {
+namespace sys {
+
+class APRAcceptor : public Acceptor
+{
+ public:
+ APRAcceptor(int16_t port, int backlog, int threads, bool trace);
+ ~APRAcceptor();
+ virtual int16_t getPort() const;
+ virtual void run(qpid::sys::SessionHandlerFactory* factory);
+ virtual void shutdown();
+
+ private:
+ void shutdownImpl();
+
+ private:
+ int16_t port;
+ bool trace;
+ LFProcessor processor;
+ apr_socket_t* socket;
+ volatile bool running;
+ Mutex shutdownLock;
+ apr_pool_t* pool;
+};
+
+// Define generic Acceptor::create() to return APRAcceptor.
+Acceptor::shared_ptr Acceptor::create(int16_t port, int backlog, int threads, bool trace)
+{
+ return Acceptor::shared_ptr(new APRAcceptor(port, backlog, threads, trace));
+}
+// Must define Acceptor virtual dtor.
+Acceptor::~Acceptor() {
+}
+
+APRAcceptor::~APRAcceptor() {
+ APRPool::free(pool);
+}
+
+APRAcceptor::APRAcceptor(int16_t port_, int backlog, int threads, bool trace_) :
+ port(port_),
+ trace(trace_),
+ processor(threads, 1000, 5000000)
+{
+ pool = APRPool::get();
+ apr_sockaddr_t* address;
+ CHECK_APR_SUCCESS(apr_sockaddr_info_get(&address, APR_ANYADDR, APR_UNSPEC, port, APR_IPV4_ADDR_OK, pool));
+ CHECK_APR_SUCCESS(apr_socket_create(&socket, APR_INET, SOCK_STREAM, APR_PROTO_TCP, pool));
+ CHECK_APR_SUCCESS(apr_socket_opt_set(socket, APR_SO_REUSEADDR, 1));
+ CHECK_APR_SUCCESS(apr_socket_bind(socket, address));
+ CHECK_APR_SUCCESS(apr_socket_listen(socket, backlog));
+}
+
+int16_t APRAcceptor::getPort() const {
+ apr_sockaddr_t* address;
+ CHECK_APR_SUCCESS(apr_socket_addr_get(&address, APR_LOCAL, socket));
+ return address->port;
+}
+
+void APRAcceptor::run(SessionHandlerFactory* factory) {
+ running = true;
+ processor.start();
+ std::cout << "Listening on port " << getPort() << "..." << std::endl;
+ while(running){
+ apr_socket_t* client;
+ apr_status_t status = apr_socket_accept(&client, socket, pool);
+ if(status == APR_SUCCESS){
+ //make this socket non-blocking:
+ CHECK_APR_SUCCESS(apr_socket_timeout_set(client, 0));
+ CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_SO_NONBLOCK, 1));
+ CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_TCP_NODELAY, 1));
+ CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_SO_SNDBUF, 32768));
+ CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_SO_RCVBUF, 32768));
+ LFSessionContext* session = new LFSessionContext(client, &processor, trace);
+ session->init(factory->create(session));
+ }else{
+ Mutex::ScopedLock locker(shutdownLock);
+ if(running) {
+ if(status != APR_EINTR){
+ std::cout << "ERROR: " << get_desc(status) << std::endl;
+ }
+ shutdownImpl();
+ }
+ }
+ }
+}
+
+void APRAcceptor::shutdown() {
+ Mutex::ScopedLock locker(shutdownLock);
+ if (running) {
+ shutdownImpl();
+ }
+}
+
+void APRAcceptor::shutdownImpl() {
+ Mutex::ScopedLock locker(shutdownLock);
+ running = false;
+ processor.stop();
+ CHECK_APR_SUCCESS(apr_socket_close(socket));
+}
+
+
+}}
diff --git a/Final/cpp/lib/common/sys/apr/APRBase.cpp b/Final/cpp/lib/common/sys/apr/APRBase.cpp
new file mode 100644
index 0000000000..861071499f
--- /dev/null
+++ b/Final/cpp/lib/common/sys/apr/APRBase.cpp
@@ -0,0 +1,90 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <iostream>
+#include <QpidError.h>
+#include "APRBase.h"
+
+using namespace qpid::sys;
+
+APRBase* APRBase::instance = 0;
+
+APRBase* APRBase::getInstance(){
+ if(instance == 0){
+ instance = new APRBase();
+ }
+ return instance;
+}
+
+
+APRBase::APRBase() : count(0){
+ apr_initialize();
+ CHECK_APR_SUCCESS(apr_pool_create(&pool, 0));
+ CHECK_APR_SUCCESS(apr_thread_mutex_create(&mutex, APR_THREAD_MUTEX_NESTED, pool));
+}
+
+APRBase::~APRBase(){
+ CHECK_APR_SUCCESS(apr_thread_mutex_destroy(mutex));
+ apr_pool_destroy(pool);
+ apr_terminate();
+}
+
+bool APRBase::_increment(){
+ bool deleted(false);
+ CHECK_APR_SUCCESS(apr_thread_mutex_lock(mutex));
+ if(this == instance){
+ count++;
+ }else{
+ deleted = true;
+ }
+ CHECK_APR_SUCCESS(apr_thread_mutex_unlock(mutex));
+ return !deleted;
+}
+
+void APRBase::_decrement(){
+ APRBase* copy = 0;
+ CHECK_APR_SUCCESS(apr_thread_mutex_lock(mutex));
+ if(--count == 0){
+ copy = instance;
+ instance = 0;
+ }
+ CHECK_APR_SUCCESS(apr_thread_mutex_unlock(mutex));
+ if(copy != 0){
+ delete copy;
+ }
+}
+
+void APRBase::increment(){
+ int count = 0;
+ while(count++ < 2 && !getInstance()->_increment()){
+ std::cout << "WARNING: APR initialization triggered concurrently with termination." << std::endl;
+ }
+}
+
+void APRBase::decrement(){
+ getInstance()->_decrement();
+}
+
+std::string qpid::sys::get_desc(apr_status_t status){
+ const int size = 50;
+ char tmp[size];
+ return std::string(apr_strerror(status, tmp, size));
+}
+
diff --git a/Final/cpp/lib/common/sys/apr/APRBase.h b/Final/cpp/lib/common/sys/apr/APRBase.h
new file mode 100644
index 0000000000..6a866a554a
--- /dev/null
+++ b/Final/cpp/lib/common/sys/apr/APRBase.h
@@ -0,0 +1,78 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _APRBase_
+#define _APRBase_
+
+#include <string>
+#include <apr_thread_mutex.h>
+#include <apr_errno.h>
+#include <QpidError.h>
+
+namespace qpid {
+namespace sys {
+
+ /**
+ * Use of APR libraries necessitates explicit init and terminate
+ * calls. Any class using APR libs should obtain the reference to
+ * this singleton and increment on construction, decrement on
+ * destruction. This class can then correctly initialise apr
+ * before the first use and terminate after the last use.
+ */
+ class APRBase{
+ static APRBase* instance;
+ apr_pool_t* pool;
+ apr_thread_mutex_t* mutex;
+ int count;
+
+ APRBase();
+ ~APRBase();
+ static APRBase* getInstance();
+ bool _increment();
+ void _decrement();
+ public:
+ static void increment();
+ static void decrement();
+ };
+
+ //this is also a convenient place for a helper function for error checking:
+ void check(apr_status_t status, const char* file, const int line);
+ std::string get_desc(apr_status_t status);
+
+#define CHECK_APR_SUCCESS(A) qpid::sys::check(A, __FILE__, __LINE__);
+
+}
+}
+
+// Inlined as it is called *a lot*
+void inline qpid::sys::check(apr_status_t status, const char* file, const int line){
+ if (status != APR_SUCCESS){
+ const int size = 50;
+ char tmp[size];
+ std::string msg(apr_strerror(status, tmp, size));
+ throw qpid::QpidError(APR_ERROR + ((int) status), msg,
+ qpid::SrcLine(file, line));
+ }
+}
+
+
+
+
+#endif
diff --git a/Final/cpp/lib/common/sys/apr/APRPool.cpp b/Final/cpp/lib/common/sys/apr/APRPool.cpp
new file mode 100644
index 0000000000..91481faf09
--- /dev/null
+++ b/Final/cpp/lib/common/sys/apr/APRPool.cpp
@@ -0,0 +1,78 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "APRPool.h"
+#include "APRBase.h"
+#include <boost/pool/detail/singleton.hpp>
+#include <iostream>
+#include <sstream>
+
+
+using namespace qpid::sys;
+
+APRPool::APRPool(){
+ APRBase::increment();
+ allocated_pools = new std::stack<apr_pool_t*>();
+ CHECK_APR_SUCCESS(apr_pool_create(&pool, NULL));
+ CHECK_APR_SUCCESS(apr_thread_mutex_create(&poolGuard, APR_THREAD_MUTEX_NESTED, pool));
+}
+
+APRPool::~APRPool(){
+ while(allocated_pools->size() > 0) {
+ apr_pool_t* pool = allocated_pools->top();
+ allocated_pools->pop();
+ apr_pool_destroy(pool);
+ }
+ apr_pool_destroy(pool);
+ apr_thread_mutex_destroy(poolGuard);
+ delete allocated_pools;
+ APRBase::decrement();
+}
+
+void APRPool::free_pool(apr_pool_t* pool) {
+ CHECK_APR_SUCCESS(apr_thread_mutex_lock(poolGuard));
+ allocated_pools->push(pool);
+ CHECK_APR_SUCCESS(apr_thread_mutex_unlock(poolGuard));
+}
+
+apr_pool_t* APRPool::allocate_pool() {
+ CHECK_APR_SUCCESS(apr_thread_mutex_lock(poolGuard));
+ apr_pool_t* retval;
+ if (allocated_pools->size() == 0) {
+ CHECK_APR_SUCCESS(apr_pool_create(&retval, pool));
+ }
+ else {
+ retval = allocated_pools->top();
+ allocated_pools->pop();
+ }
+ CHECK_APR_SUCCESS(apr_thread_mutex_unlock(poolGuard));
+ return retval;
+}
+
+apr_pool_t* APRPool::get() {
+ return
+ boost::details::pool::singleton_default<APRPool>::instance().allocate_pool();
+}
+
+void APRPool::free(apr_pool_t* pool) {
+ boost::details::pool::singleton_default<APRPool>::instance().free_pool(pool);
+}
+
diff --git a/Final/cpp/lib/common/sys/apr/APRPool.h b/Final/cpp/lib/common/sys/apr/APRPool.h
new file mode 100644
index 0000000000..c22338599e
--- /dev/null
+++ b/Final/cpp/lib/common/sys/apr/APRPool.h
@@ -0,0 +1,61 @@
+#ifndef _APRPool_
+#define _APRPool_
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <boost/noncopyable.hpp>
+#include <apr_pools.h>
+#include <apr_thread_mutex.h>
+#include <stack>
+
+namespace qpid {
+namespace sys {
+/**
+ * Singleton APR memory pool.
+ */
+class APRPool : private boost::noncopyable {
+ public:
+ APRPool();
+ ~APRPool();
+
+ apr_pool_t* allocate_pool();
+
+ void free_pool(apr_pool_t* pool);
+
+ /** Allocate pool */
+ static apr_pool_t* get();
+
+ /** Free pool */
+ static void free(apr_pool_t* pool);
+
+ private:
+ apr_pool_t* pool;
+ apr_thread_mutex_t* poolGuard;
+ std::stack<apr_pool_t*>* allocated_pools;
+};
+
+}}
+
+
+
+
+
+#endif /*!_APRPool_*/
diff --git a/Final/cpp/lib/common/sys/apr/APRSocket.cpp b/Final/cpp/lib/common/sys/apr/APRSocket.cpp
new file mode 100644
index 0000000000..f68d51d8e4
--- /dev/null
+++ b/Final/cpp/lib/common/sys/apr/APRSocket.cpp
@@ -0,0 +1,77 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "APRBase.h"
+#include "APRSocket.h"
+#include <assert.h>
+#include <iostream>
+
+using namespace qpid::sys;
+using namespace qpid::framing;
+
+APRSocket::APRSocket(apr_socket_t* _socket) : socket(_socket), closed(false){
+
+}
+
+void APRSocket::read(qpid::framing::Buffer& buffer){
+ apr_size_t bytes;
+ bytes = buffer.available();
+ apr_status_t s = apr_socket_recv(socket, buffer.start(), &bytes);
+ buffer.move(bytes);
+ if(APR_STATUS_IS_TIMEUP(s)){
+ //timed out
+ }else if(APR_STATUS_IS_EOF(s)){
+ close();
+ }
+}
+
+void APRSocket::write(qpid::framing::Buffer& buffer){
+ apr_size_t bytes;
+ do{
+ bytes = buffer.available();
+ apr_socket_send(socket, buffer.start(), &bytes);
+ buffer.move(bytes);
+ }while(bytes > 0);
+}
+
+void APRSocket::close(){
+ if(!closed){
+ CHECK_APR_SUCCESS(apr_socket_close(socket));
+ closed = true;
+ }
+}
+
+bool APRSocket::isOpen(){
+ return !closed;
+}
+
+u_int8_t APRSocket::read(){
+ char data[1];
+ apr_size_t bytes = 1;
+ apr_status_t s = apr_socket_recv(socket, data, &bytes);
+ if(APR_STATUS_IS_EOF(s) || bytes == 0){
+ return 0;
+ }else{
+ return *data;
+ }
+}
+
+APRSocket::~APRSocket(){
+}
diff --git a/Final/cpp/lib/common/sys/apr/APRSocket.h b/Final/cpp/lib/common/sys/apr/APRSocket.h
new file mode 100644
index 0000000000..53f1055c6a
--- /dev/null
+++ b/Final/cpp/lib/common/sys/apr/APRSocket.h
@@ -0,0 +1,48 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _APRSocket_
+#define _APRSocket_
+
+#include <apr_network_io.h>
+#include <Buffer.h>
+
+namespace qpid {
+namespace sys {
+
+ class APRSocket
+ {
+ apr_socket_t* const socket;
+ volatile bool closed;
+ public:
+ APRSocket(apr_socket_t* socket);
+ void read(qpid::framing::Buffer& b);
+ void write(qpid::framing::Buffer& b);
+ void close();
+ bool isOpen();
+ u_int8_t read();
+ ~APRSocket();
+ };
+
+}
+}
+
+
+#endif
diff --git a/Final/cpp/lib/common/sys/apr/LFProcessor.cpp b/Final/cpp/lib/common/sys/apr/LFProcessor.cpp
new file mode 100644
index 0000000000..22b601e688
--- /dev/null
+++ b/Final/cpp/lib/common/sys/apr/LFProcessor.cpp
@@ -0,0 +1,181 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <sstream>
+#include <QpidError.h>
+#include "LFProcessor.h"
+#include "APRBase.h"
+#include "APRPool.h"
+#include "LFSessionContext.h"
+
+using namespace qpid::sys;
+using qpid::QpidError;
+
+// TODO aconway 2006-10-12: stopped is read outside locks.
+//
+
+LFProcessor::LFProcessor(int _workers, int _size, int _timeout) :
+ size(_size),
+ timeout(_timeout),
+ signalledCount(0),
+ current(0),
+ count(0),
+ workerCount(_workers),
+ hasLeader(false),
+ workers(new Thread[_workers]),
+ stopped(false)
+{
+ pool = APRPool::get();
+ CHECK_APR_SUCCESS(apr_pollset_create(&pollset, size, pool, APR_POLLSET_THREADSAFE));
+}
+
+
+LFProcessor::~LFProcessor(){
+ if (!stopped) stop();
+ delete[] workers;
+ CHECK_APR_SUCCESS(apr_pollset_destroy(pollset));
+ APRPool::free(pool);
+}
+
+void LFProcessor::start(){
+ for(int i = 0; i < workerCount; i++){
+ workers[i] = Thread(this);
+ }
+}
+
+void LFProcessor::add(const apr_pollfd_t* const fd){
+ CHECK_APR_SUCCESS(apr_pollset_add(pollset, fd));
+ Monitor::ScopedLock l(countLock);
+ sessions.push_back(reinterpret_cast<LFSessionContext*>(fd->client_data));
+ count++;
+}
+
+void LFProcessor::remove(const apr_pollfd_t* const fd){
+ CHECK_APR_SUCCESS(apr_pollset_remove(pollset, fd));
+ Monitor::ScopedLock l(countLock);
+ sessions.erase(find(sessions.begin(), sessions.end(), reinterpret_cast<LFSessionContext*>(fd->client_data)));
+ count--;
+}
+
+void LFProcessor::reactivate(const apr_pollfd_t* const fd){
+ CHECK_APR_SUCCESS(apr_pollset_add(pollset, fd));
+}
+
+void LFProcessor::deactivate(const apr_pollfd_t* const fd){
+ CHECK_APR_SUCCESS(apr_pollset_remove(pollset, fd));
+}
+
+void LFProcessor::update(const apr_pollfd_t* const fd){
+ CHECK_APR_SUCCESS(apr_pollset_remove(pollset, fd));
+ CHECK_APR_SUCCESS(apr_pollset_add(pollset, fd));
+}
+
+bool LFProcessor::full(){
+ Mutex::ScopedLock locker(countLock);
+ return count == size;
+}
+
+bool LFProcessor::empty(){
+ Mutex::ScopedLock locker(countLock);
+ return count == 0;
+}
+
+void LFProcessor::poll() {
+ apr_status_t status = APR_EGENERAL;
+ do{
+ current = 0;
+ if(!stopped){
+ status = apr_pollset_poll(pollset, timeout, &signalledCount, &signalledFDs);
+ }
+ }while(status != APR_SUCCESS && !stopped);
+}
+
+void LFProcessor::run(){
+ try{
+ while(!stopped){
+ const apr_pollfd_t* event = 0;
+ LFSessionContext* session = 0;
+ {
+ Monitor::ScopedLock l(leadLock);
+ waitToLead();
+ event = getNextEvent();
+ if(!event) return;
+ session = reinterpret_cast<LFSessionContext*>(
+ event->client_data);
+ session->startProcessing();
+ relinquishLead();
+ }
+
+ //process event:
+ if(event->rtnevents & APR_POLLIN) session->read();
+ if(event->rtnevents & APR_POLLOUT) session->write();
+
+ if(session->isClosed()){
+ session->handleClose();
+ Monitor::ScopedLock l(countLock);
+ sessions.erase(find(sessions.begin(),sessions.end(), session));
+ count--;
+ }else{
+ session->stopProcessing();
+ }
+ }
+ }catch(std::exception e){
+ std::cout << e.what() << std::endl;
+ }
+}
+
+void LFProcessor::waitToLead(){
+ while(hasLeader && !stopped) leadLock.wait();
+ hasLeader = !stopped;
+}
+
+void LFProcessor::relinquishLead(){
+ hasLeader = false;
+ leadLock.notify();
+}
+
+const apr_pollfd_t* LFProcessor::getNextEvent(){
+ while(true){
+ if(stopped){
+ return 0;
+ }else if(current < signalledCount){
+ //use result of previous poll if one is available
+ return signalledFDs + (current++);
+ }else{
+ //else poll to get new events
+ poll();
+ }
+ }
+}
+
+void LFProcessor::stop(){
+ stopped = true;
+ {
+ Monitor::ScopedLock l(leadLock);
+ leadLock.notifyAll();
+ }
+ for(int i = 0; i < workerCount; i++){
+ workers[i].join();
+ }
+ for(iterator i = sessions.begin(); i < sessions.end(); i++){
+ (*i)->shutdown();
+ }
+}
+
diff --git a/Final/cpp/lib/common/sys/apr/LFProcessor.h b/Final/cpp/lib/common/sys/apr/LFProcessor.h
new file mode 100644
index 0000000000..0f4850ee08
--- /dev/null
+++ b/Final/cpp/lib/common/sys/apr/LFProcessor.h
@@ -0,0 +1,122 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _LFProcessor_
+#define _LFProcessor_
+
+#include <apr_poll.h>
+#include <iostream>
+#include <vector>
+#include <sys/Monitor.h>
+#include <sys/Runnable.h>
+#include <sys/Thread.h>
+
+namespace qpid {
+namespace sys {
+
+ class LFSessionContext;
+
+ /**
+ * This class processes a poll set using the leaders-followers
+ * pattern for thread synchronization: the leader will poll and on
+ * the poll returning, it will remove a session, promote a
+ * follower to leadership, then process the session.
+ */
+ class LFProcessor : private virtual qpid::sys::Runnable
+ {
+ typedef std::vector<LFSessionContext*>::iterator iterator;
+
+ const int size;
+ const apr_interval_time_t timeout;
+ apr_pollset_t* pollset;
+ int signalledCount;
+ int current;
+ const apr_pollfd_t* signalledFDs;
+ int count;
+ const int workerCount;
+ bool hasLeader;
+ qpid::sys::Thread* workers;
+ qpid::sys::Monitor leadLock;
+ qpid::sys::Mutex countLock;
+ std::vector<LFSessionContext*> sessions;
+ volatile bool stopped;
+ apr_pool_t* pool;
+
+ const apr_pollfd_t* getNextEvent();
+ void waitToLead();
+ void relinquishLead();
+ void poll();
+ virtual void run();
+
+ public:
+ LFProcessor(int workers, int size, int timeout);
+ /**
+ * Add the fd to the poll set. Relies on the client_data being
+ * an instance of LFSessionContext.
+ */
+ void add(const apr_pollfd_t* const fd);
+ /**
+ * Remove the fd from the poll set.
+ */
+ void remove(const apr_pollfd_t* const fd);
+ /**
+ * Signal that the fd passed in, already part of the pollset,
+ * has had its flags altered.
+ */
+ void update(const apr_pollfd_t* const fd);
+ /**
+ * Add an fd back to the poll set after deactivation.
+ */
+ void reactivate(const apr_pollfd_t* const fd);
+ /**
+ * Temporarily remove the fd from the poll set. Called when processing
+ * is about to begin.
+ */
+ void deactivate(const apr_pollfd_t* const fd);
+ /**
+ * Indicates whether the capacity of this processor has been
+ * reached (or whether it can still handle further fd's).
+ */
+ bool full();
+ /**
+ * Indicates whether there are any fd's registered.
+ */
+ bool empty();
+ /**
+ * Stop processing.
+ */
+ void stop();
+ /**
+ * Start processing.
+ */
+ void start();
+ /**
+ * Is processing stopped?
+ */
+ bool isStopped();
+
+ ~LFProcessor();
+ };
+
+}
+}
+
+
+#endif
diff --git a/Final/cpp/lib/common/sys/apr/LFSessionContext.cpp b/Final/cpp/lib/common/sys/apr/LFSessionContext.cpp
new file mode 100644
index 0000000000..8a7ce18136
--- /dev/null
+++ b/Final/cpp/lib/common/sys/apr/LFSessionContext.cpp
@@ -0,0 +1,181 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "LFSessionContext.h"
+#include "APRBase.h"
+#include "APRPool.h"
+#include <QpidError.h>
+#include <assert.h>
+
+using namespace qpid::sys;
+using namespace qpid::sys;
+using namespace qpid::framing;
+
+LFSessionContext::LFSessionContext(apr_socket_t* _socket,
+ LFProcessor* const _processor,
+ bool _debug) :
+ debug(_debug),
+ socket(_socket),
+ initiated(false),
+ in(65536),
+ out(65536),
+ processor(_processor),
+ processing(false),
+ closing(false)
+{
+
+ fd.p = APRPool::get();
+ fd.desc_type = APR_POLL_SOCKET;
+ fd.reqevents = APR_POLLIN;
+ fd.client_data = this;
+ fd.desc.s = _socket;
+
+ out.flip();
+}
+
+LFSessionContext::~LFSessionContext(){
+
+}
+
+void LFSessionContext::read(){
+ socket.read(in);
+ in.flip();
+ if(initiated){
+ AMQFrame frame;
+ try{
+ while(frame.decode(in)){
+ if(debug) log("RECV", &frame);
+ handler->received(&frame);
+ }
+ }catch(QpidError error){
+ std::cout << "Error [" << error.code << "] " << error.msg
+ << " (" << error.location.file << ":" << error.location.line
+ << ")" << std::endl;
+ }
+ }else{
+ ProtocolInitiation protocolInit;
+ if(protocolInit.decode(in)){
+ handler->initiated(&protocolInit);
+ initiated = true;
+ if(debug) std::cout << "INIT [" << &socket << "]" << std::endl;
+ }
+ }
+ in.compact();
+}
+
+void LFSessionContext::write(){
+ bool done = isClosed();
+ while(!done){
+ if(out.available() > 0){
+ socket.write(out);
+ if(out.available() > 0){
+
+ //incomplete write, leave flags to receive notification of readiness to write
+ done = true;//finished processing for now, but write is still in progress
+ }
+ }else{
+ //do we have any frames to write?
+ Mutex::ScopedLock l(writeLock);
+ if(!framesToWrite.empty()){
+ out.clear();
+ bool encoded(false);
+ AMQFrame* frame = framesToWrite.front();
+ while(frame && out.available() >= frame->size()){
+ encoded = true;
+ frame->encode(out);
+ if(debug) log("SENT", frame);
+ delete frame;
+ framesToWrite.pop();
+ frame = framesToWrite.empty() ? 0 : framesToWrite.front();
+ }
+ if(!encoded) THROW_QPID_ERROR(FRAMING_ERROR, "Could not write frame, too large for buffer.");
+ out.flip();
+ }else{
+ //reset flags, don't care about writability anymore
+ fd.reqevents = APR_POLLIN;
+ done = true;
+
+ if(closing){
+ socket.close();
+ }
+ }
+ }
+ }
+}
+
+void LFSessionContext::send(AMQFrame* frame){
+ Mutex::ScopedLock l(writeLock);
+ if(!closing){
+ framesToWrite.push(frame);
+ if(!(fd.reqevents & APR_POLLOUT)){
+ fd.reqevents |= APR_POLLOUT;
+ if(!processing){
+ processor->update(&fd);
+ }
+ }
+ }
+}
+
+void LFSessionContext::startProcessing(){
+ Mutex::ScopedLock l(writeLock);
+ processing = true;
+ processor->deactivate(&fd);
+}
+
+void LFSessionContext::stopProcessing(){
+ Mutex::ScopedLock l(writeLock);
+ processor->reactivate(&fd);
+ processing = false;
+}
+
+void LFSessionContext::close(){
+ Mutex::ScopedLock l(writeLock);
+ closing = true;
+ if(!processing){
+ //allow pending frames to be written to socket
+ fd.reqevents = APR_POLLOUT;
+ processor->update(&fd);
+ }
+}
+
+void LFSessionContext::handleClose(){
+ handler->closed();
+ APRPool::free(fd.p);
+ if (debug) std::cout << "Session closed [" << &socket << "]" << std::endl;
+ delete handler;
+ delete this;
+}
+
+void LFSessionContext::shutdown(){
+ socket.close();
+ handleClose();
+}
+
+void LFSessionContext::init(SessionHandler* _handler){
+ handler = _handler;
+ processor->add(&fd);
+}
+
+void LFSessionContext::log(const std::string& desc, AMQFrame* const frame){
+ Mutex::ScopedLock l(logLock);
+ std::cout << desc << " [" << &socket << "]: " << *frame << std::endl;
+}
+
+Mutex LFSessionContext::logLock;
diff --git a/Final/cpp/lib/common/sys/apr/LFSessionContext.h b/Final/cpp/lib/common/sys/apr/LFSessionContext.h
new file mode 100644
index 0000000000..eeb8279d9a
--- /dev/null
+++ b/Final/cpp/lib/common/sys/apr/LFSessionContext.h
@@ -0,0 +1,90 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _LFSessionContext_
+#define _LFSessionContext_
+
+#include <queue>
+
+#include <apr_network_io.h>
+#include <apr_poll.h>
+#include <apr_time.h>
+
+#include <AMQFrame.h>
+#include <Buffer.h>
+#include <sys/Monitor.h>
+#include <sys/SessionContext.h>
+#include <sys/SessionHandler.h>
+
+#include "APRSocket.h"
+#include "LFProcessor.h"
+
+namespace qpid {
+namespace sys {
+
+
+class LFSessionContext : public virtual qpid::sys::SessionContext
+{
+ const bool debug;
+ APRSocket socket;
+ bool initiated;
+
+ qpid::framing::Buffer in;
+ qpid::framing::Buffer out;
+
+ qpid::sys::SessionHandler* handler;
+ LFProcessor* const processor;
+
+ apr_pollfd_t fd;
+
+ std::queue<qpid::framing::AMQFrame*> framesToWrite;
+ qpid::sys::Mutex writeLock;
+
+ bool processing;
+ bool closing;
+
+ static qpid::sys::Mutex logLock;
+ void log(const std::string& desc,
+ qpid::framing::AMQFrame* const frame);
+
+
+ public:
+ LFSessionContext(apr_socket_t* socket,
+ LFProcessor* const processor,
+ bool debug = false);
+ virtual ~LFSessionContext();
+ virtual void send(qpid::framing::AMQFrame* frame);
+ virtual void close();
+ void read();
+ void write();
+ void init(qpid::sys::SessionHandler* handler);
+ void startProcessing();
+ void stopProcessing();
+ void handleClose();
+ void shutdown();
+ inline apr_pollfd_t* const getFd(){ return &fd; }
+ inline bool isClosed(){ return !socket.isOpen(); }
+};
+
+}
+}
+
+
+#endif
diff --git a/Final/cpp/lib/common/sys/apr/Socket.cpp b/Final/cpp/lib/common/sys/apr/Socket.cpp
new file mode 100644
index 0000000000..c2abf50c5f
--- /dev/null
+++ b/Final/cpp/lib/common/sys/apr/Socket.cpp
@@ -0,0 +1,94 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+
+#include <sys/Socket.h>
+#include <apr/APRBase.h>
+#include <apr/APRPool.h>
+
+#include <iostream>
+
+using namespace qpid::sys;
+
+Socket Socket::createTcp() {
+ Socket s;
+ apr_pool_t* pool = APRPool::get();
+ CHECK_APR_SUCCESS(
+ apr_socket_create(
+ &s.socket, APR_INET, SOCK_STREAM, APR_PROTO_TCP,
+ pool));
+ APRPool::free(pool);
+ return s;
+}
+
+Socket::Socket(apr_socket_t* s) {
+ socket = s;
+}
+
+void Socket::setTimeout(Time interval) {
+ apr_socket_timeout_set(socket, interval/TIME_USEC);
+}
+
+void Socket::connect(const std::string& host, int port) {
+ apr_sockaddr_t* address;
+ apr_pool_t* pool = APRPool::get();
+ CHECK_APR_SUCCESS(
+ apr_sockaddr_info_get(
+ &address, host.c_str(), APR_UNSPEC, port, APR_IPV4_ADDR_OK,
+ pool));
+ CHECK_APR_SUCCESS(apr_socket_connect(socket, address));
+ APRPool::free(pool);
+}
+
+void Socket::close() {
+ if (socket == 0) return;
+ CHECK_APR_SUCCESS(apr_socket_shutdown(socket, APR_SHUTDOWN_READWRITE));
+ CHECK_APR_SUCCESS(apr_socket_close(socket));
+ socket = 0;
+}
+
+ssize_t Socket::send(const void* data, size_t size)
+{
+ apr_size_t sent = size;
+ apr_status_t status =
+ apr_socket_send(socket, reinterpret_cast<const char*>(data), &sent);
+ if (APR_STATUS_IS_TIMEUP(status)) return SOCKET_TIMEOUT;
+ if (APR_STATUS_IS_EOF(status)) return SOCKET_EOF;
+ CHECK_APR_SUCCESS(status);
+ return sent;
+}
+
+ssize_t Socket::recv(void* data, size_t size)
+{
+ apr_size_t received = size;
+ apr_status_t status =
+ apr_socket_recv(socket, reinterpret_cast<char*>(data), &received);
+ if (APR_STATUS_IS_TIMEUP(status)) return SOCKET_TIMEOUT;
+ if (APR_STATUS_IS_EOF(status)) return SOCKET_EOF;
+ CHECK_APR_SUCCESS(status);
+ return received;
+}
+
+void Socket::setTcpNoDelay(bool on)
+{
+ CHECK_APR_SUCCESS(apr_socket_opt_set(socket, APR_TCP_NODELAY, on ? 1 : 0));
+}
+
diff --git a/Final/cpp/lib/common/sys/apr/Thread.cpp b/Final/cpp/lib/common/sys/apr/Thread.cpp
new file mode 100644
index 0000000000..997ff03ab3
--- /dev/null
+++ b/Final/cpp/lib/common/sys/apr/Thread.cpp
@@ -0,0 +1,37 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <sys/Thread.h>
+#include "APRPool.h"
+
+using namespace qpid::sys;
+using qpid::sys::Runnable;
+
+void* APR_THREAD_FUNC Thread::runRunnable(apr_thread_t* thread, void *data) {
+ reinterpret_cast<Runnable*>(data)->run();
+ CHECK_APR_SUCCESS(apr_thread_exit(thread, APR_SUCCESS));
+ return NULL;
+}
+
+Thread::~Thread() {
+}
+
+
diff --git a/Final/cpp/lib/common/sys/posix/EventChannel.cpp b/Final/cpp/lib/common/sys/posix/EventChannel.cpp
new file mode 100644
index 0000000000..8b91fc1ba6
--- /dev/null
+++ b/Final/cpp/lib/common/sys/posix/EventChannel.cpp
@@ -0,0 +1,328 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <mqueue.h>
+#include <string.h>
+#include <iostream>
+
+#include <sys/errno.h>
+#include <sys/socket.h>
+#include <sys/epoll.h>
+
+#include <typeinfo>
+#include <iostream>
+#include <queue>
+
+#include <boost/ptr_container/ptr_map.hpp>
+#include <boost/current_function.hpp>
+
+#include <QpidError.h>
+#include <sys/Monitor.h>
+
+#include "check.h"
+#include "EventChannel.h"
+
+using namespace std;
+
+
+// Convenience template to zero out a struct.
+template <class S> struct ZeroStruct : public S {
+ ZeroStruct() { memset(this, 0, sizeof(*this)); }
+};
+
+namespace qpid {
+namespace sys {
+
+
+/**
+ * EventHandler wraps an epoll file descriptor. Acts as private
+ * interface between EventChannel and subclasses.
+ *
+ * Also implements Event interface for events that are not associated
+ * with a file descriptor and are passed via the message queue.
+ */
+class EventHandler : public Event, private Monitor
+{
+ public:
+ EventHandler(int epollSize = 256);
+ ~EventHandler();
+
+ int getEpollFd() { return epollFd; }
+ void epollAdd(int fd, uint32_t epollEvents, Event* event);
+ void epollMod(int fd, uint32_t epollEvents, Event* event);
+ void epollDel(int fd);
+
+ void mqPut(Event* event);
+ Event* mqGet();
+
+ protected:
+ // Should never be called, only complete.
+ void prepare(EventHandler&) { assert(0); }
+ Event* complete(EventHandler& eh);
+
+ private:
+ int epollFd;
+ std::string mqName;
+ int mqFd;
+ std::queue<Event*> mqEvents;
+};
+
+EventHandler::EventHandler(int epollSize)
+{
+ epollFd = epoll_create(epollSize);
+ if (epollFd < 0) throw QPID_POSIX_ERROR(errno);
+
+ // Create a POSIX message queue for non-fd events.
+ // We write one byte and never read it is always ready for read
+ // when we add it to epoll.
+ //
+ ZeroStruct<struct mq_attr> attr;
+ attr.mq_maxmsg = 1;
+ attr.mq_msgsize = 1;
+ do {
+ char tmpnam[L_tmpnam];
+ tmpnam_r(tmpnam);
+ mqName = tmpnam + 4; // Skip "tmp/"
+ mqFd = mq_open(
+ mqName.c_str(), O_CREAT|O_EXCL|O_RDWR|O_NONBLOCK, S_IRWXU, &attr);
+ if (mqFd < 0) throw QPID_POSIX_ERROR(errno);
+ } while (mqFd == EEXIST); // Name already taken, try again.
+
+ static char zero = '\0';
+ mq_send(mqFd, &zero, 1, 0);
+ epollAdd(mqFd, 0, this);
+}
+
+EventHandler::~EventHandler() {
+ mq_close(mqFd);
+ mq_unlink(mqName.c_str());
+}
+
+void EventHandler::mqPut(Event* event) {
+ ScopedLock l(*this);
+ assert(event != 0);
+ mqEvents.push(event);
+ epollMod(mqFd, EPOLLIN|EPOLLONESHOT, this);
+}
+
+Event* EventHandler::mqGet() {
+ ScopedLock l(*this);
+ if (mqEvents.empty())
+ return 0;
+ Event* event = mqEvents.front();
+ mqEvents.pop();
+ if(!mqEvents.empty())
+ epollMod(mqFd, EPOLLIN|EPOLLONESHOT, this);
+ return event;
+}
+
+void EventHandler::epollAdd(int fd, uint32_t epollEvents, Event* event)
+{
+ ZeroStruct<struct epoll_event> ee;
+ ee.data.ptr = event;
+ ee.events = epollEvents;
+ if (epoll_ctl(epollFd, EPOLL_CTL_ADD, fd, &ee) < 0)
+ throw QPID_POSIX_ERROR(errno);
+}
+
+void EventHandler::epollMod(int fd, uint32_t epollEvents, Event* event)
+{
+ ZeroStruct<struct epoll_event> ee;
+ ee.data.ptr = event;
+ ee.events = epollEvents;
+ if (epoll_ctl(epollFd, EPOLL_CTL_MOD, fd, &ee) < 0)
+ throw QPID_POSIX_ERROR(errno);
+}
+
+void EventHandler::epollDel(int fd) {
+ if (epoll_ctl(epollFd, EPOLL_CTL_DEL, fd, 0) < 0)
+ throw QPID_POSIX_ERROR(errno);
+}
+
+Event* EventHandler::complete(EventHandler& eh)
+{
+ assert(&eh == this);
+ Event* event = mqGet();
+ return event==0 ? 0 : event->complete(eh);
+}
+
+// ================================================================
+// EventChannel
+
+EventChannel::shared_ptr EventChannel::create() {
+ return shared_ptr(new EventChannel());
+}
+
+EventChannel::EventChannel() : handler(new EventHandler()) {}
+
+EventChannel::~EventChannel() {}
+
+void EventChannel::postEvent(Event& e)
+{
+ e.prepare(*handler);
+}
+
+Event* EventChannel::getEvent()
+{
+ static const int infiniteTimeout = -1;
+ ZeroStruct<struct epoll_event> epollEvent;
+
+ // Loop until we can complete the event. Some events may re-post
+ // themselves and return 0 from complete, e.g. partial reads. //
+ Event* event = 0;
+ while (event == 0) {
+ int eventCount = epoll_wait(handler->getEpollFd(),
+ &epollEvent, 1, infiniteTimeout);
+ if (eventCount < 0) {
+ if (errno != EINTR) {
+ // TODO aconway 2006-11-28: Proper handling/logging of errors.
+ cerr << BOOST_CURRENT_FUNCTION << " ignoring error "
+ << PosixError::getMessage(errno) << endl;
+ assert(0);
+ }
+ }
+ else if (eventCount == 1) {
+ event = reinterpret_cast<Event*>(epollEvent.data.ptr);
+ assert(event != 0);
+ try {
+ event = event->complete(*handler);
+ }
+ catch (const Exception& e) {
+ if (event)
+ event->setError(e);
+ }
+ catch (const std::exception& e) {
+ if (event)
+ event->setError(e);
+ }
+ }
+ }
+ return event;
+}
+
+Event::~Event() {}
+
+void Event::prepare(EventHandler& handler)
+{
+ handler.mqPut(this);
+}
+
+bool Event::hasError() const {
+ return error;
+}
+
+void Event::throwIfError() throw (Exception) {
+ if (hasError())
+ error.throwSelf();
+}
+
+Event* Event::complete(EventHandler&)
+{
+ return this;
+}
+
+void Event::dispatch()
+{
+ try {
+ if (!callback.empty())
+ callback();
+ } catch (const std::exception&) {
+ throw;
+ } catch (...) {
+ throw QPID_ERROR(INTERNAL_ERROR, "Unknown exception.");
+ }
+}
+
+void Event::setError(const ExceptionHolder& e) {
+ error = e;
+}
+
+void ReadEvent::prepare(EventHandler& handler)
+{
+ handler.epollAdd(descriptor, EPOLLIN | EPOLLONESHOT, this);
+}
+
+ssize_t ReadEvent::doRead() {
+ ssize_t n = ::read(descriptor, static_cast<char*>(buffer) + received,
+ size - received);
+ if (n > 0) received += n;
+ return n;
+}
+
+Event* ReadEvent::complete(EventHandler& handler)
+{
+ // Read as much as possible without blocking.
+ ssize_t n = doRead();
+ while (n > 0 && received < size) doRead();
+
+ if (received == size) {
+ handler.epollDel(descriptor);
+ received = 0; // Reset for re-use.
+ return this;
+ }
+ else if (n <0 && (errno == EAGAIN)) {
+ // Keep polling for more.
+ handler.epollMod(descriptor, EPOLLIN | EPOLLONESHOT, this);
+ return 0;
+ }
+ else {
+ // Unexpected EOF or error. Throw ENODATA for EOF.
+ handler.epollDel(descriptor);
+ received = 0; // Reset for re-use.
+ throw QPID_POSIX_ERROR((n < 0) ? errno : ENODATA);
+ }
+}
+
+void WriteEvent::prepare(EventHandler& handler)
+{
+ handler.epollAdd(descriptor, EPOLLOUT | EPOLLONESHOT, this);
+}
+
+Event* WriteEvent::complete(EventHandler& handler)
+{
+ ssize_t n = write(descriptor, static_cast<const char*>(buffer) + written,
+ size - written);
+ if (n < 0) throw QPID_POSIX_ERROR(errno);
+ written += n;
+ if(written < size) {
+ // Keep polling.
+ handler.epollMod(descriptor, EPOLLOUT | EPOLLONESHOT, this);
+ return 0;
+ }
+ written = 0; // Reset for re-use.
+ handler.epollDel(descriptor);
+ return this;
+}
+
+void AcceptEvent::prepare(EventHandler& handler)
+{
+ handler.epollAdd(descriptor, EPOLLIN | EPOLLONESHOT, this);
+}
+
+Event* AcceptEvent::complete(EventHandler& handler)
+{
+ handler.epollDel(descriptor);
+ accepted = ::accept(descriptor, 0, 0);
+ if (accepted < 0) throw QPID_POSIX_ERROR(errno);
+ return this;
+}
+
+}}
diff --git a/Final/cpp/lib/common/sys/posix/EventChannel.h b/Final/cpp/lib/common/sys/posix/EventChannel.h
new file mode 100644
index 0000000000..56c1d1549d
--- /dev/null
+++ b/Final/cpp/lib/common/sys/posix/EventChannel.h
@@ -0,0 +1,179 @@
+#ifndef _sys_EventChannel_h
+#define _sys_EventChannel_h
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <SharedObject.h>
+#include <ExceptionHolder.h>
+#include <boost/function.hpp>
+#include <memory>
+
+namespace qpid {
+namespace sys {
+
+class Event;
+class EventHandler;
+class EventChannel;
+
+/**
+ * Base class for all Events.
+ */
+class Event
+{
+ public:
+ /** Type for callback when event is dispatched */
+ typedef boost::function0<void> Callback;
+
+ /**
+ * Create an event with optional callback.
+ * Instances of Event are sent directly through the channel.
+ * Derived classes define additional waiting behaviour.
+ *@param cb A callback functor that is invoked when dispatch() is called.
+ */
+ Event(Callback cb = 0) : callback(cb) {}
+
+ virtual ~Event();
+
+ /** Call the callback provided to the constructor, if any. */
+ void dispatch();
+
+ /** True if there was an error processing this event */
+ bool hasError() const;
+
+ /** If hasError() throw the corresponding exception. */
+ void throwIfError() throw(Exception);
+
+ protected:
+ virtual void prepare(EventHandler&);
+ virtual Event* complete(EventHandler&);
+ void setError(const ExceptionHolder& e);
+
+ Callback callback;
+ ExceptionHolder error;
+
+ friend class EventChannel;
+ friend class EventHandler;
+};
+
+template <class BufT>
+class IOEvent : public Event {
+ public:
+ void getDescriptor() const { return descriptor; }
+ size_t getSize() const { return size; }
+ BufT getBuffer() const { return buffer; }
+
+ protected:
+ IOEvent(int fd, Callback cb, size_t sz, BufT buf) :
+ Event(cb), descriptor(fd), buffer(buf), size(sz) {}
+
+ int descriptor;
+ BufT buffer;
+ size_t size;
+};
+
+/** Asynchronous read event */
+class ReadEvent : public IOEvent<void*>
+{
+ public:
+ explicit ReadEvent(int fd=-1, void* buf=0, size_t sz=0, Callback cb=0) :
+ IOEvent<void*>(fd, cb, sz, buf), received(0) {}
+
+ private:
+ void prepare(EventHandler&);
+ Event* complete(EventHandler&);
+ ssize_t doRead();
+
+ size_t received;
+};
+
+/** Asynchronous write event */
+class WriteEvent : public IOEvent<const void*>
+{
+ public:
+ explicit WriteEvent(int fd=-1, const void* buf=0, size_t sz=0,
+ Callback cb=0) :
+ IOEvent<const void*>(fd, cb, sz, buf), written(0) {}
+
+ protected:
+ void prepare(EventHandler&);
+ Event* complete(EventHandler&);
+
+ private:
+ ssize_t doWrite();
+ size_t written;
+};
+
+/** Asynchronous socket accept event */
+class AcceptEvent : public Event
+{
+ public:
+ /** Accept a connection on fd. */
+ explicit AcceptEvent(int fd=-1, Callback cb=0) :
+ Event(cb), descriptor(fd), accepted(0) {}
+
+ /** Get descriptor for server socket */
+ int getAcceptedDesscriptor() const { return accepted; }
+
+ private:
+ void prepare(EventHandler&);
+ Event* complete(EventHandler&);
+
+ int descriptor;
+ int accepted;
+};
+
+
+class QueueSet;
+
+/**
+ * Channel to post and wait for events.
+ */
+class EventChannel : public qpid::SharedObject<EventChannel>
+{
+ public:
+ static shared_ptr create();
+
+ ~EventChannel();
+
+ /** Post an event to the channel. */
+ void postEvent(Event& event);
+
+ /** Post an event to the channel. Must not be 0. */
+ void postEvent(Event* event) { postEvent(*event); }
+
+ /**
+ * Wait for the next complete event.
+ *@return Pointer to event. Will never return 0.
+ */
+ Event* getEvent();
+
+ private:
+ EventChannel();
+ boost::shared_ptr<EventHandler> handler;
+};
+
+
+}}
+
+
+
+#endif /*!_sys_EventChannel_h*/
diff --git a/Final/cpp/lib/common/sys/posix/EventChannelAcceptor.cpp b/Final/cpp/lib/common/sys/posix/EventChannelAcceptor.cpp
new file mode 100644
index 0000000000..7cd6f60902
--- /dev/null
+++ b/Final/cpp/lib/common/sys/posix/EventChannelAcceptor.cpp
@@ -0,0 +1,149 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <iostream>
+
+#include <boost/assert.hpp>
+#include <boost/ptr_container/ptr_vector.hpp>
+#include <boost/ptr_container/ptr_deque.hpp>
+#include <boost/bind.hpp>
+#include <boost/scoped_ptr.hpp>
+
+#include <sys/SessionContext.h>
+#include <sys/SessionHandler.h>
+#include <sys/SessionHandlerFactory.h>
+#include <sys/Acceptor.h>
+#include <sys/Socket.h>
+#include <framing/Buffer.h>
+#include <framing/AMQFrame.h>
+#include <Exception.h>
+
+#include "EventChannelConnection.h"
+
+namespace qpid {
+namespace sys {
+
+using namespace qpid::framing;
+using namespace std;
+
+class EventChannelAcceptor : public Acceptor {
+ public:
+
+
+ EventChannelAcceptor(
+ int16_t port_, int backlog, int nThreads, bool trace_
+ );
+
+ int getPort() const;
+
+ void run(SessionHandlerFactory& factory);
+
+ void shutdown();
+
+ private:
+
+ void accept();
+
+ Mutex lock;
+ Socket listener;
+ const int port;
+ const bool isTrace;
+ bool isRunning;
+ boost::ptr_vector<EventChannelConnection> connections;
+ AcceptEvent acceptEvent;
+ SessionHandlerFactory* factory;
+ bool isShutdown;
+ EventChannelThreads::shared_ptr threads;
+};
+
+Acceptor::shared_ptr Acceptor::create(
+ int16_t port, int backlog, int threads, bool trace)
+{
+ return Acceptor::shared_ptr(
+ new EventChannelAcceptor(port, backlog, threads, trace));
+}
+
+// Must define Acceptor virtual dtor.
+Acceptor::~Acceptor() {}
+
+EventChannelAcceptor::EventChannelAcceptor(
+ int16_t port_, int backlog, int nThreads, bool trace_
+) : listener(Socket::createTcp()),
+ port(listener.listen(int(port_), backlog)),
+ isTrace(trace_),
+ isRunning(false),
+ acceptEvent(listener.fd(),
+ boost::bind(&EventChannelAcceptor::accept, this)),
+ factory(0),
+ isShutdown(false),
+ threads(EventChannelThreads::create(EventChannel::create(), nThreads))
+{ }
+
+int EventChannelAcceptor::getPort() const {
+ return port; // Immutable no need for lock.
+}
+
+void EventChannelAcceptor::run(SessionHandlerFactory& f) {
+ {
+ Mutex::ScopedLock l(lock);
+ if (!isRunning && !isShutdown) {
+ isRunning = true;
+ factory = &f;
+ threads->post(acceptEvent);
+ }
+ }
+ threads->join(); // Wait for shutdown.
+}
+
+void EventChannelAcceptor::shutdown() {
+ bool doShutdown = false;
+ {
+ Mutex::ScopedLock l(lock);
+ doShutdown = !isShutdown; // I'm the shutdown thread.
+ isShutdown = true;
+ }
+ if (doShutdown) {
+ ::close(acceptEvent.getDescriptor());
+ threads->shutdown();
+ for_each(connections.begin(), connections.end(),
+ boost::bind(&EventChannelConnection::close, _1));
+ }
+ threads->join();
+}
+
+void EventChannelAcceptor::accept()
+{
+ // No lock, we only post one accept event at a time.
+ if (isShutdown)
+ return;
+ if (acceptEvent.getException()) {
+ Exception::log(*acceptEvent.getException(),
+ "EventChannelAcceptor::accept");
+ shutdown();
+ return;
+ }
+ // TODO aconway 2006-11-29: Need to reap closed connections also.
+ int fd = acceptEvent.getAcceptedDesscriptor();
+ connections.push_back(
+ new EventChannelConnection(threads, *factory, fd, fd, isTrace));
+ threads->post(acceptEvent); // Keep accepting.
+}
+
+}} // namespace qpid::sys
diff --git a/Final/cpp/lib/common/sys/posix/EventChannelConnection.cpp b/Final/cpp/lib/common/sys/posix/EventChannelConnection.cpp
new file mode 100644
index 0000000000..85014b7bd4
--- /dev/null
+++ b/Final/cpp/lib/common/sys/posix/EventChannelConnection.cpp
@@ -0,0 +1,232 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <iostream>
+
+#include <boost/bind.hpp>
+#include <boost/assert.hpp>
+
+#include "EventChannelConnection.h"
+#include "sys/SessionHandlerFactory.h"
+#include "QpidError.h"
+
+using namespace std;
+using namespace qpid;
+using namespace qpid::framing;
+
+namespace qpid {
+namespace sys {
+
+const size_t EventChannelConnection::bufferSize = 65536;
+
+EventChannelConnection::EventChannelConnection(
+ EventChannelThreads::shared_ptr threads_,
+ SessionHandlerFactory& factory_,
+ int rfd,
+ int wfd,
+ bool isTrace_
+) :
+ readFd(rfd),
+ writeFd(wfd ? wfd : rfd),
+ readCallback(boost::bind(&EventChannelConnection::closeOnException,
+ this, &EventChannelConnection::endInitRead)),
+
+ isWriting(false),
+ isClosed(false),
+ threads(threads_),
+ handler(factory_.create(this)),
+ in(bufferSize),
+ out(bufferSize),
+ isTrace(isTrace_)
+{
+ BOOST_ASSERT(readFd > 0);
+ BOOST_ASSERT(writeFd > 0);
+ closeOnException(&EventChannelConnection::startRead);
+}
+
+
+void EventChannelConnection::send(std::auto_ptr<AMQFrame> frame) {
+ {
+ Monitor::ScopedLock lock(monitor);
+ assert(frame.get());
+ writeFrames.push_back(frame.release());
+ }
+ closeOnException(&EventChannelConnection::startWrite);
+}
+
+void EventChannelConnection::close() {
+ {
+ Monitor::ScopedLock lock(monitor);
+ if (isClosed)
+ return;
+ isClosed = true;
+ }
+ ::close(readFd);
+ ::close(writeFd);
+ {
+ Monitor::ScopedLock lock(monitor);
+ while (busyThreads > 0)
+ monitor.wait();
+ }
+ handler->closed();
+}
+
+void EventChannelConnection::closeNoThrow() {
+ Exception::tryCatchLog<void>(
+ boost::bind(&EventChannelConnection::close, this),
+ false,
+ "Exception closing channel"
+ );
+}
+
+/**
+ * Call f in a try/catch block and close the connection if
+ * an exception is thrown.
+ */
+void EventChannelConnection::closeOnException(MemberFnPtr f)
+{
+ try {
+ Exception::tryCatchLog<void>(
+ boost::bind(f, this),
+ "Closing connection due to exception"
+ );
+ return;
+ } catch (...) {
+ // Exception was already logged by tryCatchLog
+ closeNoThrow();
+ }
+}
+
+// Post the write event.
+// Always called inside closeOnException.
+// Called by endWrite and send, but only one thread writes at a time.
+//
+void EventChannelConnection::startWrite() {
+ FrameQueue::auto_type frame;
+ {
+ Monitor::ScopedLock lock(monitor);
+ // Stop if closed or a write event is already in progress.
+ if (isClosed || isWriting)
+ return;
+ if (writeFrames.empty()) {
+ isWriting = false;
+ return;
+ }
+ isWriting = true;
+ frame = writeFrames.pop_front();
+ }
+ // No need to lock here - only one thread can be writing at a time.
+ out.clear();
+ if (isTrace)
+ cout << "Send on socket " << writeFd << ": " << *frame << endl;
+ frame->encode(out);
+ out.flip();
+ writeEvent = WriteEvent(
+ writeFd, out.start(), out.available(),
+ boost::bind(&EventChannelConnection::closeOnException,
+ this, &EventChannelConnection::endWrite));
+ threads->post(writeEvent);
+}
+
+// ScopedBusy ctor increments busyThreads.
+// dtor decrements and calls monitor.notifyAll if it reaches 0.
+//
+struct EventChannelConnection::ScopedBusy : public AtomicCount::ScopedIncrement
+{
+ ScopedBusy(EventChannelConnection& ecc)
+ : AtomicCount::ScopedIncrement(
+ ecc.busyThreads, boost::bind(&Monitor::notifyAll, &ecc.monitor))
+ {}
+};
+
+// Write event completed.
+// Always called by a channel thread inside closeOnException.
+//
+void EventChannelConnection::endWrite() {
+ ScopedBusy(*this);
+ {
+ Monitor::ScopedLock lock(monitor);
+ isWriting = false;
+ if (isClosed)
+ return;
+ writeEvent.throwIfException();
+ }
+ // Check if there's more in to write in the write queue.
+ startWrite();
+}
+
+
+// Post the read event.
+// Always called inside closeOnException.
+// Called from ctor and end[Init]Read, so only one call at a time
+// is possible since we only post one read event at a time.
+//
+void EventChannelConnection::startRead() {
+ // Non blocking read, as much as we can swallow.
+ readEvent = ReadEvent(
+ readFd, in.start(), in.available(), readCallback,true);
+ threads->post(readEvent);
+}
+
+// Completion of initial read, expect protocolInit.
+// Always called inside closeOnException in channel thread.
+// Only called by one thread at a time.
+void EventChannelConnection::endInitRead() {
+ ScopedBusy(*this);
+ if (!isClosed) {
+ readEvent.throwIfException();
+ in.move(readEvent.getBytesRead());
+ in.flip();
+ ProtocolInitiation protocolInit;
+ if(protocolInit.decode(in)){
+ handler->initiated(&protocolInit);
+ readCallback = boost::bind(
+ &EventChannelConnection::closeOnException,
+ this, &EventChannelConnection::endRead);
+ }
+ in.compact();
+ // Continue reading.
+ startRead();
+ }
+}
+
+// Normal reads, expect a frame.
+// Always called inside closeOnException in channel thread.
+void EventChannelConnection::endRead() {
+ ScopedBusy(*this);
+ if (!isClosed) {
+ readEvent.throwIfException();
+ in.move(readEvent.getBytesRead());
+ in.flip();
+ AMQFrame frame;
+ while (frame.decode(in)) {
+ // TODO aconway 2006-11-30: received should take Frame&
+ if (isTrace)
+ cout << "Received on socket " << readFd
+ << ": " << frame << endl;
+ handler->received(&frame);
+ }
+ in.compact();
+ startRead();
+ }
+}
+
+}} // namespace qpid::sys
diff --git a/Final/cpp/lib/common/sys/posix/EventChannelConnection.h b/Final/cpp/lib/common/sys/posix/EventChannelConnection.h
new file mode 100644
index 0000000000..da190b0213
--- /dev/null
+++ b/Final/cpp/lib/common/sys/posix/EventChannelConnection.h
@@ -0,0 +1,105 @@
+#ifndef _posix_EventChannelConnection_h
+#define _posix_EventChannelConnection_h
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <boost/ptr_container/ptr_deque.hpp>
+
+#include "EventChannelThreads.h"
+#include "sys/Monitor.h"
+#include "sys/SessionContext.h"
+#include "sys/SessionHandler.h"
+#include "sys/AtomicCount.h"
+#include "framing/AMQFrame.h"
+
+namespace qpid {
+namespace sys {
+
+class SessionHandlerFactory;
+
+/**
+ * Implements SessionContext and delegates to a SessionHandler
+ * for a connection via the EventChannel.
+ *@param readDescriptor file descriptor for reading.
+ *@param writeDescriptor file descriptor for writing,
+ * by default same as readDescriptor
+ */
+class EventChannelConnection : public SessionContext {
+ public:
+ EventChannelConnection(
+ EventChannelThreads::shared_ptr threads,
+ SessionHandlerFactory& factory,
+ int readDescriptor,
+ int writeDescriptor = 0,
+ bool isTrace = false
+ );
+
+ // TODO aconway 2006-11-30: SessionContext::send should take auto_ptr
+ virtual void send(qpid::framing::AMQFrame* frame) {
+ send(std::auto_ptr<qpid::framing::AMQFrame>(frame));
+ }
+
+ virtual void send(std::auto_ptr<qpid::framing::AMQFrame> frame);
+
+ virtual void close();
+
+ private:
+ typedef boost::ptr_deque<qpid::framing::AMQFrame> FrameQueue;
+ typedef void (EventChannelConnection::*MemberFnPtr)();
+ struct ScopedBusy;
+
+ void startWrite();
+ void endWrite();
+ void startRead();
+ void endInitRead();
+ void endRead();
+ void closeNoThrow();
+ void closeOnException(MemberFnPtr);
+ bool shouldContinue(bool& flag);
+
+ static const size_t bufferSize;
+
+ Monitor monitor;
+
+ int readFd, writeFd;
+ ReadEvent readEvent;
+ WriteEvent writeEvent;
+ Event::Callback readCallback;
+ bool isWriting;
+ bool isClosed;
+ AtomicCount busyThreads;
+
+ EventChannelThreads::shared_ptr threads;
+ std::auto_ptr<SessionHandler> handler;
+ qpid::framing::Buffer in, out;
+ FrameQueue writeFrames;
+ bool isTrace;
+
+ friend struct ScopedBusy;
+};
+
+
+}} // namespace qpid::sys
+
+
+
+#endif /*!_posix_EventChannelConnection_h*/
diff --git a/Final/cpp/lib/common/sys/posix/EventChannelThreads.cpp b/Final/cpp/lib/common/sys/posix/EventChannelThreads.cpp
new file mode 100644
index 0000000000..fe8a290729
--- /dev/null
+++ b/Final/cpp/lib/common/sys/posix/EventChannelThreads.cpp
@@ -0,0 +1,122 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "EventChannelThreads.h"
+#include <sys/Runnable.h>
+#include <iostream>
+using namespace std;
+#include <boost/bind.hpp>
+
+namespace qpid {
+namespace sys {
+
+EventChannelThreads::shared_ptr EventChannelThreads::create(
+ EventChannel::shared_ptr ec)
+{
+ return EventChannelThreads::shared_ptr(new EventChannelThreads(ec));
+}
+
+EventChannelThreads::EventChannelThreads(EventChannel::shared_ptr ec) :
+ channel(ec), nWaiting(0), state(RUNNING)
+{
+ // TODO aconway 2006-11-15: Estimate initial threads based on CPUs.
+ addThread();
+}
+
+EventChannelThreads::~EventChannelThreads() {
+ shutdown();
+ join();
+}
+
+void EventChannelThreads::shutdown()
+{
+ ScopedLock lock(*this);
+ if (state != RUNNING) // Already shutting down.
+ return;
+ for (size_t i = 0; i < workers.size(); ++i) {
+ channel->postEvent(terminate);
+ }
+ state = TERMINATE_SENT;
+ notify(); // Wake up one join() thread.
+}
+
+void EventChannelThreads::join()
+{
+ {
+ ScopedLock lock(*this);
+ while (state == RUNNING) // Wait for shutdown to start.
+ wait();
+ if (state == SHUTDOWN) // Shutdown is complete
+ return;
+ if (state == JOINING) {
+ // Someone else is doing the join.
+ while (state != SHUTDOWN)
+ wait();
+ return;
+ }
+ // I'm the joining thread
+ assert(state == TERMINATE_SENT);
+ state = JOINING;
+ } // Drop the lock.
+
+ for (size_t i = 0; i < workers.size(); ++i) {
+ assert(state == JOINING); // Only this thread can change JOINING.
+ workers[i].join();
+ }
+ state = SHUTDOWN;
+ notifyAll(); // Notify other join() threaeds.
+}
+
+void EventChannelThreads::addThread() {
+ ScopedLock l(*this);
+ workers.push_back(Thread(*this));
+}
+
+void EventChannelThreads::run()
+{
+ // Start life waiting. Decrement on exit.
+ AtomicCount::ScopedIncrement inc(nWaiting);
+ try {
+ while (true) {
+ Event* e = channel->getEvent();
+ assert(e != 0);
+ if (e == &terminate) {
+ return;
+ }
+ AtomicCount::ScopedDecrement dec(nWaiting);
+ // I'm no longer waiting, make sure someone is.
+ if (dec == 0)
+ addThread();
+ e->dispatch();
+ }
+ }
+ catch (const std::exception& e) {
+ // TODO aconway 2006-11-15: need better logging across the board.
+ std::cerr << "EventChannelThreads::run() caught: " << e.what()
+ << std::endl;
+ }
+ catch (...) {
+ std::cerr << "EventChannelThreads::run() caught unknown exception."
+ << std::endl;
+ }
+}
+
+}}
diff --git a/Final/cpp/lib/common/sys/posix/EventChannelThreads.h b/Final/cpp/lib/common/sys/posix/EventChannelThreads.h
new file mode 100644
index 0000000000..e28190e0ed
--- /dev/null
+++ b/Final/cpp/lib/common/sys/posix/EventChannelThreads.h
@@ -0,0 +1,95 @@
+#ifndef _posix_EventChannelThreads_h
+#define _sys_EventChannelThreads_h
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <vector>
+
+#include <Exception.h>
+#include <sys/Time.h>
+#include <sys/Monitor.h>
+#include <sys/Thread.h>
+#include <sys/AtomicCount.h>
+#include "EventChannel.h"
+
+namespace qpid {
+namespace sys {
+
+/**
+ Dynamic thread pool serving an EventChannel.
+
+ Threads run a loop { e = getEvent(); e->dispatch(); }
+ The size of the thread pool is automatically adjusted to optimal size.
+*/
+class EventChannelThreads :
+ public qpid::SharedObject<EventChannelThreads>,
+ public sys::Monitor, private sys::Runnable
+{
+ public:
+ /** Create the thread pool and start initial threads. */
+ static EventChannelThreads::shared_ptr create(
+ EventChannel::shared_ptr channel
+ );
+
+ ~EventChannelThreads();
+
+ /** Post event to the underlying channel */
+ void postEvent(Event& event) { channel->postEvent(event); }
+
+ /** Post event to the underlying channel Must not be 0. */
+ void postEvent(Event* event) { channel->postEvent(event); }
+
+ /**
+ * Terminate all threads.
+ *
+ * Returns immediately, use join() to wait till all threads are
+ * shut down.
+ */
+ void shutdown();
+
+ /** Wait for all threads to terminate. */
+ void join();
+
+ private:
+ typedef std::vector<sys::Thread> Threads;
+ typedef enum {
+ RUNNING, TERMINATE_SENT, JOINING, SHUTDOWN
+ } State;
+
+ EventChannelThreads(EventChannel::shared_ptr underlyingChannel);
+ void addThread();
+
+ void run();
+ bool keepRunning();
+ void adjustThreads();
+
+ EventChannel::shared_ptr channel;
+ Threads workers;
+ sys::AtomicCount nWaiting;
+ State state;
+ Event terminate;
+};
+
+
+}}
+
+
+#endif /*!_sys_EventChannelThreads_h*/
diff --git a/Final/cpp/lib/common/sys/posix/PosixAcceptor.cpp b/Final/cpp/lib/common/sys/posix/PosixAcceptor.cpp
new file mode 100644
index 0000000000..842aa76f36
--- /dev/null
+++ b/Final/cpp/lib/common/sys/posix/PosixAcceptor.cpp
@@ -0,0 +1,48 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <sys/Acceptor.h>
+#include <Exception.h>
+
+namespace qpid {
+namespace sys {
+
+namespace {
+void fail() { throw qpid::Exception("PosixAcceptor not implemented"); }
+}
+
+class PosixAcceptor : public Acceptor {
+ public:
+ virtual int16_t getPort() const { fail(); return 0; }
+ virtual void run(qpid::sys::SessionHandlerFactory* ) { fail(); }
+ virtual void shutdown() { fail(); }
+};
+
+// Define generic Acceptor::create() to return APRAcceptor.
+ Acceptor::shared_ptr Acceptor::create(int16_t , int, int, bool)
+{
+ return Acceptor::shared_ptr(new PosixAcceptor());
+}
+
+// Must define Acceptor virtual dtor.
+Acceptor::~Acceptor() {}
+
+}}
diff --git a/Final/cpp/lib/common/sys/posix/Socket.cpp b/Final/cpp/lib/common/sys/posix/Socket.cpp
new file mode 100644
index 0000000000..e27ced9161
--- /dev/null
+++ b/Final/cpp/lib/common/sys/posix/Socket.cpp
@@ -0,0 +1,120 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <sys/socket.h>
+#include <sys/errno.h>
+#include <netinet/in.h>
+#include <netdb.h>
+
+#include <boost/format.hpp>
+
+#include <QpidError.h>
+#include <posix/check.h>
+#include <sys/Socket.h>
+
+using namespace qpid::sys;
+
+Socket Socket::createTcp()
+{
+ int s = ::socket (PF_INET, SOCK_STREAM, 0);
+ if (s < 0) throw QPID_POSIX_ERROR(errno);
+ return s;
+}
+
+Socket::Socket(int descriptor) : socket(descriptor) {}
+
+void Socket::setTimeout(Time interval)
+{
+ struct timeval tv;
+ tv.tv_sec = interval/TIME_SEC;
+ tv.tv_usec = (interval%TIME_SEC)/TIME_USEC;
+ setsockopt(socket, SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv));
+ setsockopt(socket, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv));
+}
+
+void Socket::connect(const std::string& host, int port)
+{
+ struct sockaddr_in name;
+ name.sin_family = AF_INET;
+ name.sin_port = htons(port);
+ struct hostent* hp = gethostbyname ( host.c_str() );
+ if (hp == 0) throw QPID_POSIX_ERROR(errno);
+ memcpy(&name.sin_addr.s_addr, hp->h_addr_list[0], hp->h_length);
+ if (::connect(socket, (struct sockaddr*)(&name), sizeof(name)) < 0)
+ throw QPID_POSIX_ERROR(errno);
+}
+
+void
+Socket::close()
+{
+ if (socket == 0) return;
+ if (::close(socket) < 0) throw QPID_POSIX_ERROR(errno);
+ socket = 0;
+}
+
+ssize_t
+Socket::send(const void* data, size_t size)
+{
+ ssize_t sent = ::send(socket, data, size, 0);
+ if (sent < 0) {
+ if (errno == ECONNRESET) return SOCKET_EOF;
+ if (errno == ETIMEDOUT) return SOCKET_TIMEOUT;
+ throw QPID_POSIX_ERROR(errno);
+ }
+ return sent;
+}
+
+ssize_t
+Socket::recv(void* data, size_t size)
+{
+ ssize_t received = ::recv(socket, data, size, 0);
+ if (received < 0) {
+ if (errno == ETIMEDOUT) return SOCKET_TIMEOUT;
+ throw QPID_POSIX_ERROR(errno);
+ }
+ return received;
+}
+
+int Socket::listen(int port, int backlog)
+{
+ struct sockaddr_in name;
+ name.sin_family = AF_INET;
+ name.sin_port = htons(port);
+ name.sin_addr.s_addr = 0;
+ if (::bind(socket, (struct sockaddr*)&name, sizeof(name)) < 0)
+ throw QPID_POSIX_ERROR(errno);
+ if (::listen(socket, backlog) < 0)
+ throw QPID_POSIX_ERROR(errno);
+
+ socklen_t namelen = sizeof(name);
+ if (::getsockname(socket, (struct sockaddr*)&name, &namelen) < 0)
+ throw QPID_POSIX_ERROR(errno);
+
+ return ntohs(name.sin_port);
+}
+
+
+int Socket::fd()
+{
+ return socket;
+}
+
+void Socket::setTcpNoDelay(bool) {} //not yet implemented
diff --git a/Final/cpp/lib/common/sys/posix/Thread.cpp b/Final/cpp/lib/common/sys/posix/Thread.cpp
new file mode 100644
index 0000000000..7291022dc6
--- /dev/null
+++ b/Final/cpp/lib/common/sys/posix/Thread.cpp
@@ -0,0 +1,31 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <sys/Thread.h>
+
+void* qpid::sys::Thread::runRunnable(void* p)
+{
+ static_cast<Runnable*>(p)->run();
+ return 0;
+}
+
+Thread::~Thread() {
+}
diff --git a/Final/cpp/lib/common/sys/posix/check.cpp b/Final/cpp/lib/common/sys/posix/check.cpp
new file mode 100644
index 0000000000..408679caa8
--- /dev/null
+++ b/Final/cpp/lib/common/sys/posix/check.cpp
@@ -0,0 +1,39 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <cerrno>
+#include "check.h"
+
+namespace qpid {
+namespace sys {
+
+std::string
+PosixError::getMessage(int errNo)
+{
+ char buf[512];
+ return std::string(strerror_r(errNo, buf, sizeof(buf)));
+}
+
+PosixError::PosixError(int errNo, const qpid::SrcLine& loc) throw()
+ : qpid::QpidError(INTERNAL_ERROR + errNo, getMessage(errNo), loc)
+{ }
+
+}}
diff --git a/Final/cpp/lib/common/sys/posix/check.h b/Final/cpp/lib/common/sys/posix/check.h
new file mode 100644
index 0000000000..5afbe8f5a8
--- /dev/null
+++ b/Final/cpp/lib/common/sys/posix/check.h
@@ -0,0 +1,62 @@
+#ifndef _posix_check_h
+#define _posix_check_h
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <cerrno>
+#include <string>
+#include <QpidError.h>
+
+namespace qpid {
+namespace sys {
+
+/**
+ * Exception with message from errno.
+ */
+class PosixError : public qpid::QpidError
+{
+ public:
+ static std::string getMessage(int errNo);
+
+ PosixError(int errNo, const qpid::SrcLine& location) throw();
+
+ ~PosixError() throw() {}
+
+ int getErrNo() { return errNo; }
+
+ Exception* clone() const throw() { return new PosixError(*this); }
+
+ void throwSelf() { throw *this; }
+
+ private:
+ int errNo;
+};
+
+}}
+
+/** Create a PosixError for the current file/line and errno. */
+#define QPID_POSIX_ERROR(errNo) ::qpid::sys::PosixError(errNo, SRCLINE)
+
+/** Throw a posix error if errNo is non-zero */
+#define QPID_POSIX_THROW_IF(ERRNO) \
+ if ((ERRNO) != 0) throw QPID_POSIX_ERROR((ERRNO))
+#endif /*!_posix_check_h*/