diff options
author | Rajith Muditha Attapattu <rajith@apache.org> | 2007-11-20 14:01:22 +0000 |
---|---|---|
committer | Rajith Muditha Attapattu <rajith@apache.org> | 2007-11-20 14:01:22 +0000 |
commit | e162ae8c89bfc29293ed84d552dbb373dd1c06f3 (patch) | |
tree | 4d0737f7a4324dfd93c4b519cce1e6998e46cf77 /Final/cpp/lib/common/sys | |
parent | bcd011a10c0db4ffc6f78380c548d673e270e000 (diff) | |
download | qpid-python-e162ae8c89bfc29293ed84d552dbb373dd1c06f3.tar.gz |
Creating tag for M2 final release
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/tags/M2@596674 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'Final/cpp/lib/common/sys')
41 files changed, 3998 insertions, 0 deletions
diff --git a/Final/cpp/lib/common/sys/Acceptor.h b/Final/cpp/lib/common/sys/Acceptor.h new file mode 100644 index 0000000000..e6bc27a593 --- /dev/null +++ b/Final/cpp/lib/common/sys/Acceptor.h @@ -0,0 +1,47 @@ +#ifndef _sys_Acceptor_h +#define _sys_Acceptor_h + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <stdint.h> +#include <SharedObject.h> + +namespace qpid { +namespace sys { + +class SessionHandlerFactory; + +class Acceptor : public qpid::SharedObject<Acceptor> +{ + public: + static Acceptor::shared_ptr create(int16_t port, int backlog, int threads, bool trace = false); + virtual ~Acceptor() = 0; + virtual int16_t getPort() const = 0; + virtual void run(qpid::sys::SessionHandlerFactory* factory) = 0; + virtual void shutdown() = 0; +}; + +}} + + + +#endif /*!_sys_Acceptor_h*/ diff --git a/Final/cpp/lib/common/sys/AtomicCount.h b/Final/cpp/lib/common/sys/AtomicCount.h new file mode 100644 index 0000000000..b3d099192f --- /dev/null +++ b/Final/cpp/lib/common/sys/AtomicCount.h @@ -0,0 +1,74 @@ +#ifndef _posix_AtomicCount_h +#define _posix_AtomicCount_h + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <boost/detail/atomic_count.hpp> +#include <boost/noncopyable.hpp> + +namespace qpid { +namespace sys { + +/** + * Atomic counter. + */ +class AtomicCount : boost::noncopyable { + public: + class ScopedDecrement : boost::noncopyable { + public: + /** Decrement counter in constructor and increment in destructor. */ + ScopedDecrement(AtomicCount& c) : count(c) { value = --count; } + ~ScopedDecrement() { ++count; } + /** Return the value returned by the decrement. */ + operator long() { return value; } + private: + AtomicCount& count; + long value; + }; + + class ScopedIncrement : boost::noncopyable { + public: + /** Increment counter in constructor and increment in destructor. */ + ScopedIncrement(AtomicCount& c) : count(c) { ++count; } + ~ScopedIncrement() { --count; } + private: + AtomicCount& count; + }; + + AtomicCount(long value = 0) : count(value) {} + + void operator++() { ++count ; } + + long operator--() { return --count; } + + operator long() const { return count; } + + + private: + boost::detail::atomic_count count; +}; + + +}} + + +#endif // _posix_AtomicCount_h diff --git a/Final/cpp/lib/common/sys/Module.h b/Final/cpp/lib/common/sys/Module.h new file mode 100644 index 0000000000..64ed309afb --- /dev/null +++ b/Final/cpp/lib/common/sys/Module.h @@ -0,0 +1,163 @@ +#ifndef _sys_Module_h +#define _sys_Module_h + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <boost/noncopyable.hpp> +#include <iostream> +#include <QpidError.h> + +namespace qpid { +namespace sys { +#if USE_APR +#include <apr_dso.h> + typedef apr_dso_handle_t* dso_handle_t; +#else + typedef void* dso_handle_t; +#endif + + template <class T> class Module : private boost::noncopyable + { + typedef T* create_t(); + typedef void destroy_t(T*); + + dso_handle_t handle; + destroy_t* destroy; + T* ptr; + + void load(const std::string& name); + void unload(); + void* getSymbol(const std::string& name); + + public: + Module(const std::string& name); + T* operator->(); + T* get(); + ~Module() throw(); + }; + +} +} + +using namespace qpid::sys; + +template <class T> Module<T>::Module(const std::string& module) : destroy(0), ptr(0) +{ + load(module); + //TODO: need a better strategy for symbol names to allow multiple + //modules to be loaded without clashes... + + //Note: need the double cast to avoid errors in casting from void* to function pointer with -pedantic + create_t* create = reinterpret_cast<create_t*>(reinterpret_cast<intptr_t>(getSymbol("create"))); + destroy = reinterpret_cast<destroy_t*>(reinterpret_cast<intptr_t>(getSymbol("destroy"))); + ptr = create(); +} + +template <class T> T* Module<T>::operator->() +{ + return ptr; +} + +template <class T> T* Module<T>::get() +{ + return ptr; +} + +template <class T> Module<T>::~Module() throw() +{ + try { + if (handle && ptr) { + destroy(ptr); + } + if (handle) unload(); + } catch (std::exception& e) { + std::cout << "Error while destroying module: " << e.what() << std::endl; + } + destroy = 0; + handle = 0; + ptr = 0; +} + +// APR ================================================================ +#if USE_APR + +#include <apr/APRBase.h> +#include <apr/APRPool.h> + +template <class T> void Module<T>::load(const std::string& name) +{ + apr_pool_t* pool = APRPool::get(); + CHECK_APR_SUCCESS(apr_dso_load(&handle, name.c_str(), pool)); + APRPool::free(pool); +} + +template <class T> void Module<T>::unload() +{ + CHECK_APR_SUCCESS(apr_dso_unload(handle)); +} + +template <class T> void* Module<T>::getSymbol(const std::string& name) +{ + apr_dso_handle_sym_t symbol; + CHECK_APR_SUCCESS(apr_dso_sym(&symbol, handle, name.c_str())); + return (void*) symbol; +} + +// POSIX================================================================ +#else + +#include <dlfcn.h> + +template <class T> void Module<T>::load(const std::string& name) +{ + dlerror(); + handle = dlopen(name.c_str(), RTLD_NOW); + const char* error = dlerror(); + if (error) { + THROW_QPID_ERROR(INTERNAL_ERROR, error); + } +} + +template <class T> void Module<T>::unload() +{ + dlerror(); + dlclose(handle); + const char* error = dlerror(); + if (error) { + THROW_QPID_ERROR(INTERNAL_ERROR, error); + } +} + +template <class T> void* Module<T>::getSymbol(const std::string& name) +{ + dlerror(); + void* sym = dlsym(handle, name.c_str()); + const char* error = dlerror(); + if (error) { + THROW_QPID_ERROR(INTERNAL_ERROR, error); + } + return sym; +} + +#endif //if USE_APR + +#endif //ifndef _sys_Module_h + diff --git a/Final/cpp/lib/common/sys/Monitor.h b/Final/cpp/lib/common/sys/Monitor.h new file mode 100644 index 0000000000..c615a97aa3 --- /dev/null +++ b/Final/cpp/lib/common/sys/Monitor.h @@ -0,0 +1,129 @@ +#ifndef _sys_Monitor_h +#define _sys_Monitor_h + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <sys/errno.h> +#include <boost/noncopyable.hpp> +#include <sys/Mutex.h> +#include <sys/Time.h> + +#ifdef USE_APR +# include <apr_thread_cond.h> +#endif + +namespace qpid { +namespace sys { + +/** + * A monitor is a condition variable and a mutex + */ +class Monitor : public Mutex +{ + public: + inline Monitor(); + inline ~Monitor(); + inline void wait(); + inline bool wait(const Time& absoluteTime); + inline void notify(); + inline void notifyAll(); + + private: +#ifdef USE_APR + apr_thread_cond_t* condition; +#else + pthread_cond_t condition; +#endif +}; + + +// APR ================================================================ +#ifdef USE_APR + +Monitor::Monitor() { + apr_pool_t* pool = APRPool::get(); + CHECK_APR_SUCCESS(apr_thread_cond_create(&condition, pool)); + APRPool::free(pool); +} + +Monitor::~Monitor() { + CHECK_APR_SUCCESS(apr_thread_cond_destroy(condition)); +} + +void Monitor::wait() { + CHECK_APR_SUCCESS(apr_thread_cond_wait(condition, mutex)); +} + +bool Monitor::wait(const Time& absoluteTime){ + // APR uses microseconds. + apr_status_t status = + apr_thread_cond_timedwait(condition, mutex, absoluteTime/TIME_USEC); + if(status != APR_TIMEUP) CHECK_APR_SUCCESS(status); + return status == 0; +} + +void Monitor::notify(){ + CHECK_APR_SUCCESS(apr_thread_cond_signal(condition)); +} + +void Monitor::notifyAll(){ + CHECK_APR_SUCCESS(apr_thread_cond_broadcast(condition)); +} + +#else +// POSIX ================================================================ + +Monitor::Monitor() { + QPID_POSIX_THROW_IF(pthread_cond_init(&condition, 0)); +} + +Monitor::~Monitor() { + QPID_POSIX_THROW_IF(pthread_cond_destroy(&condition)); +} + +void Monitor::wait() { + QPID_POSIX_THROW_IF(pthread_cond_wait(&condition, &mutex)); +} + +bool Monitor::wait(const Time& absoluteTime){ + struct timespec ts; + toTimespec(ts, absoluteTime); + int status = pthread_cond_timedwait(&condition, &mutex, &ts); + if (status != 0) { + if (status == ETIMEDOUT) return false; + throw QPID_POSIX_ERROR(status); + } + return true; +} + +void Monitor::notify(){ + QPID_POSIX_THROW_IF(pthread_cond_signal(&condition)); +} + +void Monitor::notifyAll(){ + QPID_POSIX_THROW_IF(pthread_cond_broadcast(&condition)); +} +#endif /*USE_APR*/ + + +}} +#endif /*!_sys_Monitor_h*/ diff --git a/Final/cpp/lib/common/sys/Mutex.h b/Final/cpp/lib/common/sys/Mutex.h new file mode 100644 index 0000000000..ae1d6369af --- /dev/null +++ b/Final/cpp/lib/common/sys/Mutex.h @@ -0,0 +1,158 @@ +#ifndef _sys_Mutex_h +#define _sys_Mutex_h + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#ifdef USE_APR +# include <apr_thread_mutex.h> +# include <apr_pools.h> +# include "apr/APRBase.h" +# include "apr/APRPool.h" +#else +# include <pthread.h> +# include <posix/check.h> +#endif +#include <boost/noncopyable.hpp> + +namespace qpid { +namespace sys { + +/** + * Scoped lock template: calls lock() in ctor, unlock() in dtor. + * L can be any class with lock() and unlock() functions. + */ +template <class L> +class ScopedLock +{ + public: + ScopedLock(L& l) : mutex(l) { l.lock(); } + ~ScopedLock() { mutex.unlock(); } + private: + L& mutex; +}; + +/** + * Mutex lock. + */ +class Mutex : private boost::noncopyable { + public: + typedef ScopedLock<Mutex> ScopedLock; + + inline Mutex(); + inline ~Mutex(); + inline void lock(); + inline void unlock(); + inline void trylock(); + + protected: +#ifdef USE_APR + apr_thread_mutex_t* mutex; + apr_pool_t* pool; +#else + pthread_mutex_t mutex; +#endif +}; + +#ifdef USE_APR +// APR ================================================================ + +Mutex::Mutex() { + pool = APRPool::get(); + CHECK_APR_SUCCESS(apr_thread_mutex_create(&mutex, APR_THREAD_MUTEX_NESTED, pool)); +} + +Mutex::~Mutex(){ + CHECK_APR_SUCCESS(apr_thread_mutex_destroy(mutex)); + APRPool::free(pool); +} + +void Mutex::lock() { + CHECK_APR_SUCCESS(apr_thread_mutex_lock(mutex)); +} +void Mutex::unlock() { + CHECK_APR_SUCCESS(apr_thread_mutex_unlock(mutex)); +} + +void Mutex::trylock() { + CHECK_APR_SUCCESS(apr_thread_mutex_trylock(mutex)); +} + +#else +// POSIX ================================================================ + +/** + * PODMutex is a POD, can be static-initialized with + * PODMutex m = QPID_PODMUTEX_INITIALIZER + */ +struct PODMutex +{ + typedef ScopedLock<PODMutex> ScopedLock; + + inline void lock(); + inline void unlock(); + inline void trylock(); + + // Must be public to be a POD: + pthread_mutex_t mutex; +}; + +#define QPID_MUTEX_INITIALIZER { PTHREAD_MUTEX_INITIALIZER } + + +void PODMutex::lock() { + QPID_POSIX_THROW_IF(pthread_mutex_lock(&mutex)); +} +void PODMutex::unlock() { + QPID_POSIX_THROW_IF(pthread_mutex_unlock(&mutex)); +} + +void PODMutex::trylock() { + QPID_POSIX_THROW_IF(pthread_mutex_trylock(&mutex)); +} + + +Mutex::Mutex() { + QPID_POSIX_THROW_IF(pthread_mutex_init(&mutex, 0)); +} + +Mutex::~Mutex(){ + QPID_POSIX_THROW_IF(pthread_mutex_destroy(&mutex)); +} + +void Mutex::lock() { + QPID_POSIX_THROW_IF(pthread_mutex_lock(&mutex)); +} +void Mutex::unlock() { + QPID_POSIX_THROW_IF(pthread_mutex_unlock(&mutex)); +} + +void Mutex::trylock() { + QPID_POSIX_THROW_IF(pthread_mutex_trylock(&mutex)); +} + +#endif // USE_APR + +}} + + + +#endif /*!_sys_Mutex_h*/ diff --git a/Final/cpp/lib/common/sys/Runnable.cpp b/Final/cpp/lib/common/sys/Runnable.cpp new file mode 100644 index 0000000000..c3174b9bc8 --- /dev/null +++ b/Final/cpp/lib/common/sys/Runnable.cpp @@ -0,0 +1,35 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "Runnable.h" +#include <boost/bind.hpp> + +namespace qpid { +namespace sys { + +Runnable::~Runnable() {} + +Runnable::Functor Runnable::functor() +{ + return boost::bind(&Runnable::run, this); +} + +}} diff --git a/Final/cpp/lib/common/sys/Runnable.h b/Final/cpp/lib/common/sys/Runnable.h new file mode 100644 index 0000000000..fb3927c612 --- /dev/null +++ b/Final/cpp/lib/common/sys/Runnable.h @@ -0,0 +1,50 @@ +#ifndef _Runnable_ +#define _Runnable_ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <boost/function.hpp> + +namespace qpid { +namespace sys { + +/** + * Interface for objects that can be run, e.g. in a thread. + */ +class Runnable +{ + public: + /** Type to represent a runnable as a Functor */ + typedef boost::function0<void> Functor; + + virtual ~Runnable(); + + /** Derived classes override run(). */ + virtual void run() = 0; + + /** Create a functor object that will call this->run(). */ + Functor functor(); +}; + +}} + + +#endif diff --git a/Final/cpp/lib/common/sys/SessionContext.h b/Final/cpp/lib/common/sys/SessionContext.h new file mode 100644 index 0000000000..671e00774f --- /dev/null +++ b/Final/cpp/lib/common/sys/SessionContext.h @@ -0,0 +1,41 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _SessionContext_ +#define _SessionContext_ + +#include <OutputHandler.h> + +namespace qpid { +namespace sys { + +/** + * Provides the output handler associated with a connection. + */ +class SessionContext : public virtual qpid::framing::OutputHandler +{ + public: + virtual void close() = 0; +}; + +}} + + +#endif diff --git a/Final/cpp/lib/common/sys/SessionHandler.h b/Final/cpp/lib/common/sys/SessionHandler.h new file mode 100644 index 0000000000..76f79d421d --- /dev/null +++ b/Final/cpp/lib/common/sys/SessionHandler.h @@ -0,0 +1,45 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _SessionHandler_ +#define _SessionHandler_ + +#include <InputHandler.h> +#include <InitiationHandler.h> +#include <ProtocolInitiation.h> +#include <sys/TimeoutHandler.h> + +namespace qpid { +namespace sys { + + class SessionHandler : + public qpid::framing::InitiationHandler, + public qpid::framing::InputHandler, + public TimeoutHandler + { + public: + virtual void closed() = 0; + }; + +} +} + + +#endif diff --git a/Final/cpp/lib/common/sys/SessionHandlerFactory.h b/Final/cpp/lib/common/sys/SessionHandlerFactory.h new file mode 100644 index 0000000000..2a01aebcb0 --- /dev/null +++ b/Final/cpp/lib/common/sys/SessionHandlerFactory.h @@ -0,0 +1,46 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _SessionHandlerFactory_ +#define _SessionHandlerFactory_ + +#include <boost/noncopyable.hpp> + +namespace qpid { +namespace sys { + +class SessionContext; +class SessionHandler; + +/** + * Callback interface used by the Acceptor to + * create a SessionHandler for each new connection. + */ +class SessionHandlerFactory : private boost::noncopyable +{ + public: + virtual SessionHandler* create(SessionContext* ctxt) = 0; + virtual ~SessionHandlerFactory(){} +}; + +}} + + +#endif diff --git a/Final/cpp/lib/common/sys/ShutdownHandler.h b/Final/cpp/lib/common/sys/ShutdownHandler.h new file mode 100644 index 0000000000..88baecb5b6 --- /dev/null +++ b/Final/cpp/lib/common/sys/ShutdownHandler.h @@ -0,0 +1,37 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _ShutdownHandler_ +#define _ShutdownHandler_ + +namespace qpid { +namespace sys { + + class ShutdownHandler + { + public: + virtual void shutdown() = 0; + virtual ~ShutdownHandler(){} + }; + +} +} + +#endif diff --git a/Final/cpp/lib/common/sys/Socket.h b/Final/cpp/lib/common/sys/Socket.h new file mode 100644 index 0000000000..b5f74847c2 --- /dev/null +++ b/Final/cpp/lib/common/sys/Socket.h @@ -0,0 +1,89 @@ +#ifndef _sys_Socket_h +#define _sys_Socket_h + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <string> +#include <sys/Time.h> + +#ifdef USE_APR +# include <apr_network_io.h> +#endif + +namespace qpid { +namespace sys { + +class Socket +{ + public: + /** Create an initialized TCP socket */ + static Socket createTcp(); + + /** Create a socket wrapper for descriptor. */ +#ifdef USE_APR + Socket(apr_socket_t* descriptor = 0); +#else + Socket(int descriptor = 0); +#endif + + /** Set timeout for read and write */ + void setTimeout(Time interval); + void setTcpNoDelay(bool on); + + void connect(const std::string& host, int port); + + void close(); + + enum { SOCKET_TIMEOUT=-2, SOCKET_EOF=-3 } ErrorCode; + + /** Returns bytes sent or an ErrorCode value < 0. */ + ssize_t send(const void* data, size_t size); + + /** + * Returns bytes received, an ErrorCode value < 0 or 0 + * if the connection closed in an orderly manner. + */ + ssize_t recv(void* data, size_t size); + + /** Bind to a port and start listening. + *@param port 0 means choose an available port. + *@param backlog maximum number of pending connections. + *@return The bound port. + */ + int listen(int port = 0, int backlog = 10); + + /** Get file descriptor */ + int fd(); + + private: +#ifdef USE_APR + apr_socket_t* socket; +#else + void init() const; + mutable int socket; // Initialized on demand. +#endif +}; + +}} + + +#endif /*!_sys_Socket_h*/ diff --git a/Final/cpp/lib/common/sys/Thread.h b/Final/cpp/lib/common/sys/Thread.h new file mode 100644 index 0000000000..c14c7cc6ad --- /dev/null +++ b/Final/cpp/lib/common/sys/Thread.h @@ -0,0 +1,150 @@ +#ifndef _sys_Thread_h +#define _sys_Thread_h + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <sys/Runnable.h> + +#ifdef USE_APR +# include <apr_thread_proc.h> +# include <apr_portable.h> +# include <apr/APRPool.h> +# include <apr/APRBase.h> +#else +# include <posix/check.h> +# include <pthread.h> +#endif + +namespace qpid { +namespace sys { + +class Thread +{ + public: + inline static Thread current(); + inline static void yield(); + + inline Thread(); + inline Thread(qpid::sys::Runnable*); + inline Thread(qpid::sys::Runnable&); + ~Thread(); + inline void join(); + inline long id(); + + private: +#ifdef USE_APR + static void* APR_THREAD_FUNC runRunnable(apr_thread_t* thread, void *data); + inline Thread(apr_thread_t* t); + apr_thread_t* thread; +#else + static void* runRunnable(void* runnable); + inline Thread(pthread_t); + pthread_t thread; +#endif +}; + + +Thread::Thread() : thread(0) {} + +// APR ================================================================ +#ifdef USE_APR + +Thread::Thread(Runnable* runnable) { + apr_pool_t* tmp_pool = APRPool::get(); + CHECK_APR_SUCCESS( + apr_thread_create(&thread, 0, runRunnable, runnable, tmp_pool)); + APRPool::free(tmp_pool); +} + +Thread::Thread(Runnable& runnable) { + apr_pool_t* tmp_pool = APRPool::get(); + CHECK_APR_SUCCESS( + apr_thread_create(&thread, 0, runRunnable, &runnable, tmp_pool)); + APRPool::free(tmp_pool); +} + +void Thread::join(){ + apr_status_t status; + if (thread != 0) + CHECK_APR_SUCCESS(apr_thread_join(&status, thread)); +} + +long Thread::id() { + return long(thread); +} + +Thread::Thread(apr_thread_t* t) : thread(t) {} + +Thread Thread::current(){ + apr_pool_t* tmp_pool = APRPool::get(); + apr_thread_t* thr; + apr_os_thread_t osthr = apr_os_thread_current(); + CHECK_APR_SUCCESS(apr_os_thread_put(&thr, &osthr, tmp_pool)); + APRPool::free(tmp_pool); + return Thread(thr); +} + +void Thread::yield() +{ + apr_thread_yield(); +} + + +// POSIX ================================================================ +#else + +Thread::Thread(Runnable* runnable) { + QPID_POSIX_THROW_IF(pthread_create(&thread, NULL, runRunnable, runnable)); +} + +Thread::Thread(Runnable& runnable) { + QPID_POSIX_THROW_IF(pthread_create(&thread, NULL, runRunnable, &runnable)); +} + +void Thread::join(){ + QPID_POSIX_THROW_IF(pthread_join(thread, 0)); +} + +long Thread::id() { + return long(thread); +} + +Thread::~Thread() { +} + +Thread::Thread(pthread_t thr) : thread(thr) {} + +Thread Thread::current() { + return Thread(pthread_self()); +} + +void Thread::yield() +{ + QPID_POSIX_THROW_IF(pthread_yield()); +} + + +#endif + +}} + +#endif /*!_sys_Thread_h*/ diff --git a/Final/cpp/lib/common/sys/Time.cpp b/Final/cpp/lib/common/sys/Time.cpp new file mode 100644 index 0000000000..ad6185b966 --- /dev/null +++ b/Final/cpp/lib/common/sys/Time.cpp @@ -0,0 +1,60 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "Time.h" + +namespace qpid { +namespace sys { + +// APR ================================================================ +#if USE_APR + +Time now() { return apr_time_now() * TIME_USEC; } + +// POSIX================================================================ +#else + +Time now() { + struct timespec ts; + clock_gettime(CLOCK_REALTIME, &ts); + return toTime(ts); +} + +struct timespec toTimespec(const Time& t) { + struct timespec ts; + toTimespec(ts, t); + return ts; +} + +struct timespec& toTimespec(struct timespec& ts, const Time& t) { + ts.tv_sec = t / TIME_SEC; + ts.tv_nsec = t % TIME_SEC; + return ts; +} + +Time toTime(const struct timespec& ts) { + return ts.tv_sec*TIME_SEC + ts.tv_nsec; +} + + +#endif +}} + diff --git a/Final/cpp/lib/common/sys/Time.h b/Final/cpp/lib/common/sys/Time.h new file mode 100644 index 0000000000..3dd46741d8 --- /dev/null +++ b/Final/cpp/lib/common/sys/Time.h @@ -0,0 +1,58 @@ +#ifndef _sys_Time_h +#define _sys_Time_h + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <stdint.h> + +#ifdef USE_APR +# include <apr_time.h> +#else +# include <time.h> +#endif + +namespace qpid { +namespace sys { + +/** Time in nanoseconds */ +typedef int64_t Time; + +Time now(); + +/** Nanoseconds per second. */ +const Time TIME_SEC = 1000*1000*1000; +/** Nanoseconds per millisecond */ +const Time TIME_MSEC = 1000*1000; +/** Nanoseconds per microseconds. */ +const Time TIME_USEC = 1000; +/** Nanoseconds per nanosecond. */ +const Time TIME_NSEC = 1; + +#ifndef USE_APR +struct timespec toTimespec(const Time& t); +struct timespec& toTimespec(struct timespec& ts, const Time& t); +Time toTime(const struct timespec& ts); +#endif + +}} + +#endif /*!_sys_Time_h*/ diff --git a/Final/cpp/lib/common/sys/TimeoutHandler.h b/Final/cpp/lib/common/sys/TimeoutHandler.h new file mode 100644 index 0000000000..0c10709bbf --- /dev/null +++ b/Final/cpp/lib/common/sys/TimeoutHandler.h @@ -0,0 +1,39 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _TimeoutHandler_ +#define _TimeoutHandler_ + +namespace qpid { +namespace sys { + + class TimeoutHandler + { + public: + virtual void idleOut() = 0; + virtual void idleIn() = 0; + virtual ~TimeoutHandler(){} + }; + +} +} + + +#endif diff --git a/Final/cpp/lib/common/sys/apr/APRAcceptor.cpp b/Final/cpp/lib/common/sys/apr/APRAcceptor.cpp new file mode 100644 index 0000000000..a427542fc3 --- /dev/null +++ b/Final/cpp/lib/common/sys/apr/APRAcceptor.cpp @@ -0,0 +1,130 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <sys/Acceptor.h> +#include <sys/SessionHandlerFactory.h> +#include <apr_pools.h> +#include "LFProcessor.h" +#include "LFSessionContext.h" +#include "APRBase.h" +#include "APRPool.h" + +namespace qpid { +namespace sys { + +class APRAcceptor : public Acceptor +{ + public: + APRAcceptor(int16_t port, int backlog, int threads, bool trace); + ~APRAcceptor(); + virtual int16_t getPort() const; + virtual void run(qpid::sys::SessionHandlerFactory* factory); + virtual void shutdown(); + + private: + void shutdownImpl(); + + private: + int16_t port; + bool trace; + LFProcessor processor; + apr_socket_t* socket; + volatile bool running; + Mutex shutdownLock; + apr_pool_t* pool; +}; + +// Define generic Acceptor::create() to return APRAcceptor. +Acceptor::shared_ptr Acceptor::create(int16_t port, int backlog, int threads, bool trace) +{ + return Acceptor::shared_ptr(new APRAcceptor(port, backlog, threads, trace)); +} +// Must define Acceptor virtual dtor. +Acceptor::~Acceptor() { +} + +APRAcceptor::~APRAcceptor() { + APRPool::free(pool); +} + +APRAcceptor::APRAcceptor(int16_t port_, int backlog, int threads, bool trace_) : + port(port_), + trace(trace_), + processor(threads, 1000, 5000000) +{ + pool = APRPool::get(); + apr_sockaddr_t* address; + CHECK_APR_SUCCESS(apr_sockaddr_info_get(&address, APR_ANYADDR, APR_UNSPEC, port, APR_IPV4_ADDR_OK, pool)); + CHECK_APR_SUCCESS(apr_socket_create(&socket, APR_INET, SOCK_STREAM, APR_PROTO_TCP, pool)); + CHECK_APR_SUCCESS(apr_socket_opt_set(socket, APR_SO_REUSEADDR, 1)); + CHECK_APR_SUCCESS(apr_socket_bind(socket, address)); + CHECK_APR_SUCCESS(apr_socket_listen(socket, backlog)); +} + +int16_t APRAcceptor::getPort() const { + apr_sockaddr_t* address; + CHECK_APR_SUCCESS(apr_socket_addr_get(&address, APR_LOCAL, socket)); + return address->port; +} + +void APRAcceptor::run(SessionHandlerFactory* factory) { + running = true; + processor.start(); + std::cout << "Listening on port " << getPort() << "..." << std::endl; + while(running){ + apr_socket_t* client; + apr_status_t status = apr_socket_accept(&client, socket, pool); + if(status == APR_SUCCESS){ + //make this socket non-blocking: + CHECK_APR_SUCCESS(apr_socket_timeout_set(client, 0)); + CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_SO_NONBLOCK, 1)); + CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_TCP_NODELAY, 1)); + CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_SO_SNDBUF, 32768)); + CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_SO_RCVBUF, 32768)); + LFSessionContext* session = new LFSessionContext(client, &processor, trace); + session->init(factory->create(session)); + }else{ + Mutex::ScopedLock locker(shutdownLock); + if(running) { + if(status != APR_EINTR){ + std::cout << "ERROR: " << get_desc(status) << std::endl; + } + shutdownImpl(); + } + } + } +} + +void APRAcceptor::shutdown() { + Mutex::ScopedLock locker(shutdownLock); + if (running) { + shutdownImpl(); + } +} + +void APRAcceptor::shutdownImpl() { + Mutex::ScopedLock locker(shutdownLock); + running = false; + processor.stop(); + CHECK_APR_SUCCESS(apr_socket_close(socket)); +} + + +}} diff --git a/Final/cpp/lib/common/sys/apr/APRBase.cpp b/Final/cpp/lib/common/sys/apr/APRBase.cpp new file mode 100644 index 0000000000..861071499f --- /dev/null +++ b/Final/cpp/lib/common/sys/apr/APRBase.cpp @@ -0,0 +1,90 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <iostream> +#include <QpidError.h> +#include "APRBase.h" + +using namespace qpid::sys; + +APRBase* APRBase::instance = 0; + +APRBase* APRBase::getInstance(){ + if(instance == 0){ + instance = new APRBase(); + } + return instance; +} + + +APRBase::APRBase() : count(0){ + apr_initialize(); + CHECK_APR_SUCCESS(apr_pool_create(&pool, 0)); + CHECK_APR_SUCCESS(apr_thread_mutex_create(&mutex, APR_THREAD_MUTEX_NESTED, pool)); +} + +APRBase::~APRBase(){ + CHECK_APR_SUCCESS(apr_thread_mutex_destroy(mutex)); + apr_pool_destroy(pool); + apr_terminate(); +} + +bool APRBase::_increment(){ + bool deleted(false); + CHECK_APR_SUCCESS(apr_thread_mutex_lock(mutex)); + if(this == instance){ + count++; + }else{ + deleted = true; + } + CHECK_APR_SUCCESS(apr_thread_mutex_unlock(mutex)); + return !deleted; +} + +void APRBase::_decrement(){ + APRBase* copy = 0; + CHECK_APR_SUCCESS(apr_thread_mutex_lock(mutex)); + if(--count == 0){ + copy = instance; + instance = 0; + } + CHECK_APR_SUCCESS(apr_thread_mutex_unlock(mutex)); + if(copy != 0){ + delete copy; + } +} + +void APRBase::increment(){ + int count = 0; + while(count++ < 2 && !getInstance()->_increment()){ + std::cout << "WARNING: APR initialization triggered concurrently with termination." << std::endl; + } +} + +void APRBase::decrement(){ + getInstance()->_decrement(); +} + +std::string qpid::sys::get_desc(apr_status_t status){ + const int size = 50; + char tmp[size]; + return std::string(apr_strerror(status, tmp, size)); +} + diff --git a/Final/cpp/lib/common/sys/apr/APRBase.h b/Final/cpp/lib/common/sys/apr/APRBase.h new file mode 100644 index 0000000000..6a866a554a --- /dev/null +++ b/Final/cpp/lib/common/sys/apr/APRBase.h @@ -0,0 +1,78 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _APRBase_ +#define _APRBase_ + +#include <string> +#include <apr_thread_mutex.h> +#include <apr_errno.h> +#include <QpidError.h> + +namespace qpid { +namespace sys { + + /** + * Use of APR libraries necessitates explicit init and terminate + * calls. Any class using APR libs should obtain the reference to + * this singleton and increment on construction, decrement on + * destruction. This class can then correctly initialise apr + * before the first use and terminate after the last use. + */ + class APRBase{ + static APRBase* instance; + apr_pool_t* pool; + apr_thread_mutex_t* mutex; + int count; + + APRBase(); + ~APRBase(); + static APRBase* getInstance(); + bool _increment(); + void _decrement(); + public: + static void increment(); + static void decrement(); + }; + + //this is also a convenient place for a helper function for error checking: + void check(apr_status_t status, const char* file, const int line); + std::string get_desc(apr_status_t status); + +#define CHECK_APR_SUCCESS(A) qpid::sys::check(A, __FILE__, __LINE__); + +} +} + +// Inlined as it is called *a lot* +void inline qpid::sys::check(apr_status_t status, const char* file, const int line){ + if (status != APR_SUCCESS){ + const int size = 50; + char tmp[size]; + std::string msg(apr_strerror(status, tmp, size)); + throw qpid::QpidError(APR_ERROR + ((int) status), msg, + qpid::SrcLine(file, line)); + } +} + + + + +#endif diff --git a/Final/cpp/lib/common/sys/apr/APRPool.cpp b/Final/cpp/lib/common/sys/apr/APRPool.cpp new file mode 100644 index 0000000000..91481faf09 --- /dev/null +++ b/Final/cpp/lib/common/sys/apr/APRPool.cpp @@ -0,0 +1,78 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "APRPool.h" +#include "APRBase.h" +#include <boost/pool/detail/singleton.hpp> +#include <iostream> +#include <sstream> + + +using namespace qpid::sys; + +APRPool::APRPool(){ + APRBase::increment(); + allocated_pools = new std::stack<apr_pool_t*>(); + CHECK_APR_SUCCESS(apr_pool_create(&pool, NULL)); + CHECK_APR_SUCCESS(apr_thread_mutex_create(&poolGuard, APR_THREAD_MUTEX_NESTED, pool)); +} + +APRPool::~APRPool(){ + while(allocated_pools->size() > 0) { + apr_pool_t* pool = allocated_pools->top(); + allocated_pools->pop(); + apr_pool_destroy(pool); + } + apr_pool_destroy(pool); + apr_thread_mutex_destroy(poolGuard); + delete allocated_pools; + APRBase::decrement(); +} + +void APRPool::free_pool(apr_pool_t* pool) { + CHECK_APR_SUCCESS(apr_thread_mutex_lock(poolGuard)); + allocated_pools->push(pool); + CHECK_APR_SUCCESS(apr_thread_mutex_unlock(poolGuard)); +} + +apr_pool_t* APRPool::allocate_pool() { + CHECK_APR_SUCCESS(apr_thread_mutex_lock(poolGuard)); + apr_pool_t* retval; + if (allocated_pools->size() == 0) { + CHECK_APR_SUCCESS(apr_pool_create(&retval, pool)); + } + else { + retval = allocated_pools->top(); + allocated_pools->pop(); + } + CHECK_APR_SUCCESS(apr_thread_mutex_unlock(poolGuard)); + return retval; +} + +apr_pool_t* APRPool::get() { + return + boost::details::pool::singleton_default<APRPool>::instance().allocate_pool(); +} + +void APRPool::free(apr_pool_t* pool) { + boost::details::pool::singleton_default<APRPool>::instance().free_pool(pool); +} + diff --git a/Final/cpp/lib/common/sys/apr/APRPool.h b/Final/cpp/lib/common/sys/apr/APRPool.h new file mode 100644 index 0000000000..c22338599e --- /dev/null +++ b/Final/cpp/lib/common/sys/apr/APRPool.h @@ -0,0 +1,61 @@ +#ifndef _APRPool_ +#define _APRPool_ + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <boost/noncopyable.hpp> +#include <apr_pools.h> +#include <apr_thread_mutex.h> +#include <stack> + +namespace qpid { +namespace sys { +/** + * Singleton APR memory pool. + */ +class APRPool : private boost::noncopyable { + public: + APRPool(); + ~APRPool(); + + apr_pool_t* allocate_pool(); + + void free_pool(apr_pool_t* pool); + + /** Allocate pool */ + static apr_pool_t* get(); + + /** Free pool */ + static void free(apr_pool_t* pool); + + private: + apr_pool_t* pool; + apr_thread_mutex_t* poolGuard; + std::stack<apr_pool_t*>* allocated_pools; +}; + +}} + + + + + +#endif /*!_APRPool_*/ diff --git a/Final/cpp/lib/common/sys/apr/APRSocket.cpp b/Final/cpp/lib/common/sys/apr/APRSocket.cpp new file mode 100644 index 0000000000..f68d51d8e4 --- /dev/null +++ b/Final/cpp/lib/common/sys/apr/APRSocket.cpp @@ -0,0 +1,77 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "APRBase.h" +#include "APRSocket.h" +#include <assert.h> +#include <iostream> + +using namespace qpid::sys; +using namespace qpid::framing; + +APRSocket::APRSocket(apr_socket_t* _socket) : socket(_socket), closed(false){ + +} + +void APRSocket::read(qpid::framing::Buffer& buffer){ + apr_size_t bytes; + bytes = buffer.available(); + apr_status_t s = apr_socket_recv(socket, buffer.start(), &bytes); + buffer.move(bytes); + if(APR_STATUS_IS_TIMEUP(s)){ + //timed out + }else if(APR_STATUS_IS_EOF(s)){ + close(); + } +} + +void APRSocket::write(qpid::framing::Buffer& buffer){ + apr_size_t bytes; + do{ + bytes = buffer.available(); + apr_socket_send(socket, buffer.start(), &bytes); + buffer.move(bytes); + }while(bytes > 0); +} + +void APRSocket::close(){ + if(!closed){ + CHECK_APR_SUCCESS(apr_socket_close(socket)); + closed = true; + } +} + +bool APRSocket::isOpen(){ + return !closed; +} + +u_int8_t APRSocket::read(){ + char data[1]; + apr_size_t bytes = 1; + apr_status_t s = apr_socket_recv(socket, data, &bytes); + if(APR_STATUS_IS_EOF(s) || bytes == 0){ + return 0; + }else{ + return *data; + } +} + +APRSocket::~APRSocket(){ +} diff --git a/Final/cpp/lib/common/sys/apr/APRSocket.h b/Final/cpp/lib/common/sys/apr/APRSocket.h new file mode 100644 index 0000000000..53f1055c6a --- /dev/null +++ b/Final/cpp/lib/common/sys/apr/APRSocket.h @@ -0,0 +1,48 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _APRSocket_ +#define _APRSocket_ + +#include <apr_network_io.h> +#include <Buffer.h> + +namespace qpid { +namespace sys { + + class APRSocket + { + apr_socket_t* const socket; + volatile bool closed; + public: + APRSocket(apr_socket_t* socket); + void read(qpid::framing::Buffer& b); + void write(qpid::framing::Buffer& b); + void close(); + bool isOpen(); + u_int8_t read(); + ~APRSocket(); + }; + +} +} + + +#endif diff --git a/Final/cpp/lib/common/sys/apr/LFProcessor.cpp b/Final/cpp/lib/common/sys/apr/LFProcessor.cpp new file mode 100644 index 0000000000..22b601e688 --- /dev/null +++ b/Final/cpp/lib/common/sys/apr/LFProcessor.cpp @@ -0,0 +1,181 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <sstream> +#include <QpidError.h> +#include "LFProcessor.h" +#include "APRBase.h" +#include "APRPool.h" +#include "LFSessionContext.h" + +using namespace qpid::sys; +using qpid::QpidError; + +// TODO aconway 2006-10-12: stopped is read outside locks. +// + +LFProcessor::LFProcessor(int _workers, int _size, int _timeout) : + size(_size), + timeout(_timeout), + signalledCount(0), + current(0), + count(0), + workerCount(_workers), + hasLeader(false), + workers(new Thread[_workers]), + stopped(false) +{ + pool = APRPool::get(); + CHECK_APR_SUCCESS(apr_pollset_create(&pollset, size, pool, APR_POLLSET_THREADSAFE)); +} + + +LFProcessor::~LFProcessor(){ + if (!stopped) stop(); + delete[] workers; + CHECK_APR_SUCCESS(apr_pollset_destroy(pollset)); + APRPool::free(pool); +} + +void LFProcessor::start(){ + for(int i = 0; i < workerCount; i++){ + workers[i] = Thread(this); + } +} + +void LFProcessor::add(const apr_pollfd_t* const fd){ + CHECK_APR_SUCCESS(apr_pollset_add(pollset, fd)); + Monitor::ScopedLock l(countLock); + sessions.push_back(reinterpret_cast<LFSessionContext*>(fd->client_data)); + count++; +} + +void LFProcessor::remove(const apr_pollfd_t* const fd){ + CHECK_APR_SUCCESS(apr_pollset_remove(pollset, fd)); + Monitor::ScopedLock l(countLock); + sessions.erase(find(sessions.begin(), sessions.end(), reinterpret_cast<LFSessionContext*>(fd->client_data))); + count--; +} + +void LFProcessor::reactivate(const apr_pollfd_t* const fd){ + CHECK_APR_SUCCESS(apr_pollset_add(pollset, fd)); +} + +void LFProcessor::deactivate(const apr_pollfd_t* const fd){ + CHECK_APR_SUCCESS(apr_pollset_remove(pollset, fd)); +} + +void LFProcessor::update(const apr_pollfd_t* const fd){ + CHECK_APR_SUCCESS(apr_pollset_remove(pollset, fd)); + CHECK_APR_SUCCESS(apr_pollset_add(pollset, fd)); +} + +bool LFProcessor::full(){ + Mutex::ScopedLock locker(countLock); + return count == size; +} + +bool LFProcessor::empty(){ + Mutex::ScopedLock locker(countLock); + return count == 0; +} + +void LFProcessor::poll() { + apr_status_t status = APR_EGENERAL; + do{ + current = 0; + if(!stopped){ + status = apr_pollset_poll(pollset, timeout, &signalledCount, &signalledFDs); + } + }while(status != APR_SUCCESS && !stopped); +} + +void LFProcessor::run(){ + try{ + while(!stopped){ + const apr_pollfd_t* event = 0; + LFSessionContext* session = 0; + { + Monitor::ScopedLock l(leadLock); + waitToLead(); + event = getNextEvent(); + if(!event) return; + session = reinterpret_cast<LFSessionContext*>( + event->client_data); + session->startProcessing(); + relinquishLead(); + } + + //process event: + if(event->rtnevents & APR_POLLIN) session->read(); + if(event->rtnevents & APR_POLLOUT) session->write(); + + if(session->isClosed()){ + session->handleClose(); + Monitor::ScopedLock l(countLock); + sessions.erase(find(sessions.begin(),sessions.end(), session)); + count--; + }else{ + session->stopProcessing(); + } + } + }catch(std::exception e){ + std::cout << e.what() << std::endl; + } +} + +void LFProcessor::waitToLead(){ + while(hasLeader && !stopped) leadLock.wait(); + hasLeader = !stopped; +} + +void LFProcessor::relinquishLead(){ + hasLeader = false; + leadLock.notify(); +} + +const apr_pollfd_t* LFProcessor::getNextEvent(){ + while(true){ + if(stopped){ + return 0; + }else if(current < signalledCount){ + //use result of previous poll if one is available + return signalledFDs + (current++); + }else{ + //else poll to get new events + poll(); + } + } +} + +void LFProcessor::stop(){ + stopped = true; + { + Monitor::ScopedLock l(leadLock); + leadLock.notifyAll(); + } + for(int i = 0; i < workerCount; i++){ + workers[i].join(); + } + for(iterator i = sessions.begin(); i < sessions.end(); i++){ + (*i)->shutdown(); + } +} + diff --git a/Final/cpp/lib/common/sys/apr/LFProcessor.h b/Final/cpp/lib/common/sys/apr/LFProcessor.h new file mode 100644 index 0000000000..0f4850ee08 --- /dev/null +++ b/Final/cpp/lib/common/sys/apr/LFProcessor.h @@ -0,0 +1,122 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _LFProcessor_ +#define _LFProcessor_ + +#include <apr_poll.h> +#include <iostream> +#include <vector> +#include <sys/Monitor.h> +#include <sys/Runnable.h> +#include <sys/Thread.h> + +namespace qpid { +namespace sys { + + class LFSessionContext; + + /** + * This class processes a poll set using the leaders-followers + * pattern for thread synchronization: the leader will poll and on + * the poll returning, it will remove a session, promote a + * follower to leadership, then process the session. + */ + class LFProcessor : private virtual qpid::sys::Runnable + { + typedef std::vector<LFSessionContext*>::iterator iterator; + + const int size; + const apr_interval_time_t timeout; + apr_pollset_t* pollset; + int signalledCount; + int current; + const apr_pollfd_t* signalledFDs; + int count; + const int workerCount; + bool hasLeader; + qpid::sys::Thread* workers; + qpid::sys::Monitor leadLock; + qpid::sys::Mutex countLock; + std::vector<LFSessionContext*> sessions; + volatile bool stopped; + apr_pool_t* pool; + + const apr_pollfd_t* getNextEvent(); + void waitToLead(); + void relinquishLead(); + void poll(); + virtual void run(); + + public: + LFProcessor(int workers, int size, int timeout); + /** + * Add the fd to the poll set. Relies on the client_data being + * an instance of LFSessionContext. + */ + void add(const apr_pollfd_t* const fd); + /** + * Remove the fd from the poll set. + */ + void remove(const apr_pollfd_t* const fd); + /** + * Signal that the fd passed in, already part of the pollset, + * has had its flags altered. + */ + void update(const apr_pollfd_t* const fd); + /** + * Add an fd back to the poll set after deactivation. + */ + void reactivate(const apr_pollfd_t* const fd); + /** + * Temporarily remove the fd from the poll set. Called when processing + * is about to begin. + */ + void deactivate(const apr_pollfd_t* const fd); + /** + * Indicates whether the capacity of this processor has been + * reached (or whether it can still handle further fd's). + */ + bool full(); + /** + * Indicates whether there are any fd's registered. + */ + bool empty(); + /** + * Stop processing. + */ + void stop(); + /** + * Start processing. + */ + void start(); + /** + * Is processing stopped? + */ + bool isStopped(); + + ~LFProcessor(); + }; + +} +} + + +#endif diff --git a/Final/cpp/lib/common/sys/apr/LFSessionContext.cpp b/Final/cpp/lib/common/sys/apr/LFSessionContext.cpp new file mode 100644 index 0000000000..8a7ce18136 --- /dev/null +++ b/Final/cpp/lib/common/sys/apr/LFSessionContext.cpp @@ -0,0 +1,181 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "LFSessionContext.h" +#include "APRBase.h" +#include "APRPool.h" +#include <QpidError.h> +#include <assert.h> + +using namespace qpid::sys; +using namespace qpid::sys; +using namespace qpid::framing; + +LFSessionContext::LFSessionContext(apr_socket_t* _socket, + LFProcessor* const _processor, + bool _debug) : + debug(_debug), + socket(_socket), + initiated(false), + in(65536), + out(65536), + processor(_processor), + processing(false), + closing(false) +{ + + fd.p = APRPool::get(); + fd.desc_type = APR_POLL_SOCKET; + fd.reqevents = APR_POLLIN; + fd.client_data = this; + fd.desc.s = _socket; + + out.flip(); +} + +LFSessionContext::~LFSessionContext(){ + +} + +void LFSessionContext::read(){ + socket.read(in); + in.flip(); + if(initiated){ + AMQFrame frame; + try{ + while(frame.decode(in)){ + if(debug) log("RECV", &frame); + handler->received(&frame); + } + }catch(QpidError error){ + std::cout << "Error [" << error.code << "] " << error.msg + << " (" << error.location.file << ":" << error.location.line + << ")" << std::endl; + } + }else{ + ProtocolInitiation protocolInit; + if(protocolInit.decode(in)){ + handler->initiated(&protocolInit); + initiated = true; + if(debug) std::cout << "INIT [" << &socket << "]" << std::endl; + } + } + in.compact(); +} + +void LFSessionContext::write(){ + bool done = isClosed(); + while(!done){ + if(out.available() > 0){ + socket.write(out); + if(out.available() > 0){ + + //incomplete write, leave flags to receive notification of readiness to write + done = true;//finished processing for now, but write is still in progress + } + }else{ + //do we have any frames to write? + Mutex::ScopedLock l(writeLock); + if(!framesToWrite.empty()){ + out.clear(); + bool encoded(false); + AMQFrame* frame = framesToWrite.front(); + while(frame && out.available() >= frame->size()){ + encoded = true; + frame->encode(out); + if(debug) log("SENT", frame); + delete frame; + framesToWrite.pop(); + frame = framesToWrite.empty() ? 0 : framesToWrite.front(); + } + if(!encoded) THROW_QPID_ERROR(FRAMING_ERROR, "Could not write frame, too large for buffer."); + out.flip(); + }else{ + //reset flags, don't care about writability anymore + fd.reqevents = APR_POLLIN; + done = true; + + if(closing){ + socket.close(); + } + } + } + } +} + +void LFSessionContext::send(AMQFrame* frame){ + Mutex::ScopedLock l(writeLock); + if(!closing){ + framesToWrite.push(frame); + if(!(fd.reqevents & APR_POLLOUT)){ + fd.reqevents |= APR_POLLOUT; + if(!processing){ + processor->update(&fd); + } + } + } +} + +void LFSessionContext::startProcessing(){ + Mutex::ScopedLock l(writeLock); + processing = true; + processor->deactivate(&fd); +} + +void LFSessionContext::stopProcessing(){ + Mutex::ScopedLock l(writeLock); + processor->reactivate(&fd); + processing = false; +} + +void LFSessionContext::close(){ + Mutex::ScopedLock l(writeLock); + closing = true; + if(!processing){ + //allow pending frames to be written to socket + fd.reqevents = APR_POLLOUT; + processor->update(&fd); + } +} + +void LFSessionContext::handleClose(){ + handler->closed(); + APRPool::free(fd.p); + if (debug) std::cout << "Session closed [" << &socket << "]" << std::endl; + delete handler; + delete this; +} + +void LFSessionContext::shutdown(){ + socket.close(); + handleClose(); +} + +void LFSessionContext::init(SessionHandler* _handler){ + handler = _handler; + processor->add(&fd); +} + +void LFSessionContext::log(const std::string& desc, AMQFrame* const frame){ + Mutex::ScopedLock l(logLock); + std::cout << desc << " [" << &socket << "]: " << *frame << std::endl; +} + +Mutex LFSessionContext::logLock; diff --git a/Final/cpp/lib/common/sys/apr/LFSessionContext.h b/Final/cpp/lib/common/sys/apr/LFSessionContext.h new file mode 100644 index 0000000000..eeb8279d9a --- /dev/null +++ b/Final/cpp/lib/common/sys/apr/LFSessionContext.h @@ -0,0 +1,90 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _LFSessionContext_ +#define _LFSessionContext_ + +#include <queue> + +#include <apr_network_io.h> +#include <apr_poll.h> +#include <apr_time.h> + +#include <AMQFrame.h> +#include <Buffer.h> +#include <sys/Monitor.h> +#include <sys/SessionContext.h> +#include <sys/SessionHandler.h> + +#include "APRSocket.h" +#include "LFProcessor.h" + +namespace qpid { +namespace sys { + + +class LFSessionContext : public virtual qpid::sys::SessionContext +{ + const bool debug; + APRSocket socket; + bool initiated; + + qpid::framing::Buffer in; + qpid::framing::Buffer out; + + qpid::sys::SessionHandler* handler; + LFProcessor* const processor; + + apr_pollfd_t fd; + + std::queue<qpid::framing::AMQFrame*> framesToWrite; + qpid::sys::Mutex writeLock; + + bool processing; + bool closing; + + static qpid::sys::Mutex logLock; + void log(const std::string& desc, + qpid::framing::AMQFrame* const frame); + + + public: + LFSessionContext(apr_socket_t* socket, + LFProcessor* const processor, + bool debug = false); + virtual ~LFSessionContext(); + virtual void send(qpid::framing::AMQFrame* frame); + virtual void close(); + void read(); + void write(); + void init(qpid::sys::SessionHandler* handler); + void startProcessing(); + void stopProcessing(); + void handleClose(); + void shutdown(); + inline apr_pollfd_t* const getFd(){ return &fd; } + inline bool isClosed(){ return !socket.isOpen(); } +}; + +} +} + + +#endif diff --git a/Final/cpp/lib/common/sys/apr/Socket.cpp b/Final/cpp/lib/common/sys/apr/Socket.cpp new file mode 100644 index 0000000000..c2abf50c5f --- /dev/null +++ b/Final/cpp/lib/common/sys/apr/Socket.cpp @@ -0,0 +1,94 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + + +#include <sys/Socket.h> +#include <apr/APRBase.h> +#include <apr/APRPool.h> + +#include <iostream> + +using namespace qpid::sys; + +Socket Socket::createTcp() { + Socket s; + apr_pool_t* pool = APRPool::get(); + CHECK_APR_SUCCESS( + apr_socket_create( + &s.socket, APR_INET, SOCK_STREAM, APR_PROTO_TCP, + pool)); + APRPool::free(pool); + return s; +} + +Socket::Socket(apr_socket_t* s) { + socket = s; +} + +void Socket::setTimeout(Time interval) { + apr_socket_timeout_set(socket, interval/TIME_USEC); +} + +void Socket::connect(const std::string& host, int port) { + apr_sockaddr_t* address; + apr_pool_t* pool = APRPool::get(); + CHECK_APR_SUCCESS( + apr_sockaddr_info_get( + &address, host.c_str(), APR_UNSPEC, port, APR_IPV4_ADDR_OK, + pool)); + CHECK_APR_SUCCESS(apr_socket_connect(socket, address)); + APRPool::free(pool); +} + +void Socket::close() { + if (socket == 0) return; + CHECK_APR_SUCCESS(apr_socket_shutdown(socket, APR_SHUTDOWN_READWRITE)); + CHECK_APR_SUCCESS(apr_socket_close(socket)); + socket = 0; +} + +ssize_t Socket::send(const void* data, size_t size) +{ + apr_size_t sent = size; + apr_status_t status = + apr_socket_send(socket, reinterpret_cast<const char*>(data), &sent); + if (APR_STATUS_IS_TIMEUP(status)) return SOCKET_TIMEOUT; + if (APR_STATUS_IS_EOF(status)) return SOCKET_EOF; + CHECK_APR_SUCCESS(status); + return sent; +} + +ssize_t Socket::recv(void* data, size_t size) +{ + apr_size_t received = size; + apr_status_t status = + apr_socket_recv(socket, reinterpret_cast<char*>(data), &received); + if (APR_STATUS_IS_TIMEUP(status)) return SOCKET_TIMEOUT; + if (APR_STATUS_IS_EOF(status)) return SOCKET_EOF; + CHECK_APR_SUCCESS(status); + return received; +} + +void Socket::setTcpNoDelay(bool on) +{ + CHECK_APR_SUCCESS(apr_socket_opt_set(socket, APR_TCP_NODELAY, on ? 1 : 0)); +} + diff --git a/Final/cpp/lib/common/sys/apr/Thread.cpp b/Final/cpp/lib/common/sys/apr/Thread.cpp new file mode 100644 index 0000000000..997ff03ab3 --- /dev/null +++ b/Final/cpp/lib/common/sys/apr/Thread.cpp @@ -0,0 +1,37 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <sys/Thread.h> +#include "APRPool.h" + +using namespace qpid::sys; +using qpid::sys::Runnable; + +void* APR_THREAD_FUNC Thread::runRunnable(apr_thread_t* thread, void *data) { + reinterpret_cast<Runnable*>(data)->run(); + CHECK_APR_SUCCESS(apr_thread_exit(thread, APR_SUCCESS)); + return NULL; +} + +Thread::~Thread() { +} + + diff --git a/Final/cpp/lib/common/sys/posix/EventChannel.cpp b/Final/cpp/lib/common/sys/posix/EventChannel.cpp new file mode 100644 index 0000000000..8b91fc1ba6 --- /dev/null +++ b/Final/cpp/lib/common/sys/posix/EventChannel.cpp @@ -0,0 +1,328 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <mqueue.h> +#include <string.h> +#include <iostream> + +#include <sys/errno.h> +#include <sys/socket.h> +#include <sys/epoll.h> + +#include <typeinfo> +#include <iostream> +#include <queue> + +#include <boost/ptr_container/ptr_map.hpp> +#include <boost/current_function.hpp> + +#include <QpidError.h> +#include <sys/Monitor.h> + +#include "check.h" +#include "EventChannel.h" + +using namespace std; + + +// Convenience template to zero out a struct. +template <class S> struct ZeroStruct : public S { + ZeroStruct() { memset(this, 0, sizeof(*this)); } +}; + +namespace qpid { +namespace sys { + + +/** + * EventHandler wraps an epoll file descriptor. Acts as private + * interface between EventChannel and subclasses. + * + * Also implements Event interface for events that are not associated + * with a file descriptor and are passed via the message queue. + */ +class EventHandler : public Event, private Monitor +{ + public: + EventHandler(int epollSize = 256); + ~EventHandler(); + + int getEpollFd() { return epollFd; } + void epollAdd(int fd, uint32_t epollEvents, Event* event); + void epollMod(int fd, uint32_t epollEvents, Event* event); + void epollDel(int fd); + + void mqPut(Event* event); + Event* mqGet(); + + protected: + // Should never be called, only complete. + void prepare(EventHandler&) { assert(0); } + Event* complete(EventHandler& eh); + + private: + int epollFd; + std::string mqName; + int mqFd; + std::queue<Event*> mqEvents; +}; + +EventHandler::EventHandler(int epollSize) +{ + epollFd = epoll_create(epollSize); + if (epollFd < 0) throw QPID_POSIX_ERROR(errno); + + // Create a POSIX message queue for non-fd events. + // We write one byte and never read it is always ready for read + // when we add it to epoll. + // + ZeroStruct<struct mq_attr> attr; + attr.mq_maxmsg = 1; + attr.mq_msgsize = 1; + do { + char tmpnam[L_tmpnam]; + tmpnam_r(tmpnam); + mqName = tmpnam + 4; // Skip "tmp/" + mqFd = mq_open( + mqName.c_str(), O_CREAT|O_EXCL|O_RDWR|O_NONBLOCK, S_IRWXU, &attr); + if (mqFd < 0) throw QPID_POSIX_ERROR(errno); + } while (mqFd == EEXIST); // Name already taken, try again. + + static char zero = '\0'; + mq_send(mqFd, &zero, 1, 0); + epollAdd(mqFd, 0, this); +} + +EventHandler::~EventHandler() { + mq_close(mqFd); + mq_unlink(mqName.c_str()); +} + +void EventHandler::mqPut(Event* event) { + ScopedLock l(*this); + assert(event != 0); + mqEvents.push(event); + epollMod(mqFd, EPOLLIN|EPOLLONESHOT, this); +} + +Event* EventHandler::mqGet() { + ScopedLock l(*this); + if (mqEvents.empty()) + return 0; + Event* event = mqEvents.front(); + mqEvents.pop(); + if(!mqEvents.empty()) + epollMod(mqFd, EPOLLIN|EPOLLONESHOT, this); + return event; +} + +void EventHandler::epollAdd(int fd, uint32_t epollEvents, Event* event) +{ + ZeroStruct<struct epoll_event> ee; + ee.data.ptr = event; + ee.events = epollEvents; + if (epoll_ctl(epollFd, EPOLL_CTL_ADD, fd, &ee) < 0) + throw QPID_POSIX_ERROR(errno); +} + +void EventHandler::epollMod(int fd, uint32_t epollEvents, Event* event) +{ + ZeroStruct<struct epoll_event> ee; + ee.data.ptr = event; + ee.events = epollEvents; + if (epoll_ctl(epollFd, EPOLL_CTL_MOD, fd, &ee) < 0) + throw QPID_POSIX_ERROR(errno); +} + +void EventHandler::epollDel(int fd) { + if (epoll_ctl(epollFd, EPOLL_CTL_DEL, fd, 0) < 0) + throw QPID_POSIX_ERROR(errno); +} + +Event* EventHandler::complete(EventHandler& eh) +{ + assert(&eh == this); + Event* event = mqGet(); + return event==0 ? 0 : event->complete(eh); +} + +// ================================================================ +// EventChannel + +EventChannel::shared_ptr EventChannel::create() { + return shared_ptr(new EventChannel()); +} + +EventChannel::EventChannel() : handler(new EventHandler()) {} + +EventChannel::~EventChannel() {} + +void EventChannel::postEvent(Event& e) +{ + e.prepare(*handler); +} + +Event* EventChannel::getEvent() +{ + static const int infiniteTimeout = -1; + ZeroStruct<struct epoll_event> epollEvent; + + // Loop until we can complete the event. Some events may re-post + // themselves and return 0 from complete, e.g. partial reads. // + Event* event = 0; + while (event == 0) { + int eventCount = epoll_wait(handler->getEpollFd(), + &epollEvent, 1, infiniteTimeout); + if (eventCount < 0) { + if (errno != EINTR) { + // TODO aconway 2006-11-28: Proper handling/logging of errors. + cerr << BOOST_CURRENT_FUNCTION << " ignoring error " + << PosixError::getMessage(errno) << endl; + assert(0); + } + } + else if (eventCount == 1) { + event = reinterpret_cast<Event*>(epollEvent.data.ptr); + assert(event != 0); + try { + event = event->complete(*handler); + } + catch (const Exception& e) { + if (event) + event->setError(e); + } + catch (const std::exception& e) { + if (event) + event->setError(e); + } + } + } + return event; +} + +Event::~Event() {} + +void Event::prepare(EventHandler& handler) +{ + handler.mqPut(this); +} + +bool Event::hasError() const { + return error; +} + +void Event::throwIfError() throw (Exception) { + if (hasError()) + error.throwSelf(); +} + +Event* Event::complete(EventHandler&) +{ + return this; +} + +void Event::dispatch() +{ + try { + if (!callback.empty()) + callback(); + } catch (const std::exception&) { + throw; + } catch (...) { + throw QPID_ERROR(INTERNAL_ERROR, "Unknown exception."); + } +} + +void Event::setError(const ExceptionHolder& e) { + error = e; +} + +void ReadEvent::prepare(EventHandler& handler) +{ + handler.epollAdd(descriptor, EPOLLIN | EPOLLONESHOT, this); +} + +ssize_t ReadEvent::doRead() { + ssize_t n = ::read(descriptor, static_cast<char*>(buffer) + received, + size - received); + if (n > 0) received += n; + return n; +} + +Event* ReadEvent::complete(EventHandler& handler) +{ + // Read as much as possible without blocking. + ssize_t n = doRead(); + while (n > 0 && received < size) doRead(); + + if (received == size) { + handler.epollDel(descriptor); + received = 0; // Reset for re-use. + return this; + } + else if (n <0 && (errno == EAGAIN)) { + // Keep polling for more. + handler.epollMod(descriptor, EPOLLIN | EPOLLONESHOT, this); + return 0; + } + else { + // Unexpected EOF or error. Throw ENODATA for EOF. + handler.epollDel(descriptor); + received = 0; // Reset for re-use. + throw QPID_POSIX_ERROR((n < 0) ? errno : ENODATA); + } +} + +void WriteEvent::prepare(EventHandler& handler) +{ + handler.epollAdd(descriptor, EPOLLOUT | EPOLLONESHOT, this); +} + +Event* WriteEvent::complete(EventHandler& handler) +{ + ssize_t n = write(descriptor, static_cast<const char*>(buffer) + written, + size - written); + if (n < 0) throw QPID_POSIX_ERROR(errno); + written += n; + if(written < size) { + // Keep polling. + handler.epollMod(descriptor, EPOLLOUT | EPOLLONESHOT, this); + return 0; + } + written = 0; // Reset for re-use. + handler.epollDel(descriptor); + return this; +} + +void AcceptEvent::prepare(EventHandler& handler) +{ + handler.epollAdd(descriptor, EPOLLIN | EPOLLONESHOT, this); +} + +Event* AcceptEvent::complete(EventHandler& handler) +{ + handler.epollDel(descriptor); + accepted = ::accept(descriptor, 0, 0); + if (accepted < 0) throw QPID_POSIX_ERROR(errno); + return this; +} + +}} diff --git a/Final/cpp/lib/common/sys/posix/EventChannel.h b/Final/cpp/lib/common/sys/posix/EventChannel.h new file mode 100644 index 0000000000..56c1d1549d --- /dev/null +++ b/Final/cpp/lib/common/sys/posix/EventChannel.h @@ -0,0 +1,179 @@ +#ifndef _sys_EventChannel_h +#define _sys_EventChannel_h + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <SharedObject.h> +#include <ExceptionHolder.h> +#include <boost/function.hpp> +#include <memory> + +namespace qpid { +namespace sys { + +class Event; +class EventHandler; +class EventChannel; + +/** + * Base class for all Events. + */ +class Event +{ + public: + /** Type for callback when event is dispatched */ + typedef boost::function0<void> Callback; + + /** + * Create an event with optional callback. + * Instances of Event are sent directly through the channel. + * Derived classes define additional waiting behaviour. + *@param cb A callback functor that is invoked when dispatch() is called. + */ + Event(Callback cb = 0) : callback(cb) {} + + virtual ~Event(); + + /** Call the callback provided to the constructor, if any. */ + void dispatch(); + + /** True if there was an error processing this event */ + bool hasError() const; + + /** If hasError() throw the corresponding exception. */ + void throwIfError() throw(Exception); + + protected: + virtual void prepare(EventHandler&); + virtual Event* complete(EventHandler&); + void setError(const ExceptionHolder& e); + + Callback callback; + ExceptionHolder error; + + friend class EventChannel; + friend class EventHandler; +}; + +template <class BufT> +class IOEvent : public Event { + public: + void getDescriptor() const { return descriptor; } + size_t getSize() const { return size; } + BufT getBuffer() const { return buffer; } + + protected: + IOEvent(int fd, Callback cb, size_t sz, BufT buf) : + Event(cb), descriptor(fd), buffer(buf), size(sz) {} + + int descriptor; + BufT buffer; + size_t size; +}; + +/** Asynchronous read event */ +class ReadEvent : public IOEvent<void*> +{ + public: + explicit ReadEvent(int fd=-1, void* buf=0, size_t sz=0, Callback cb=0) : + IOEvent<void*>(fd, cb, sz, buf), received(0) {} + + private: + void prepare(EventHandler&); + Event* complete(EventHandler&); + ssize_t doRead(); + + size_t received; +}; + +/** Asynchronous write event */ +class WriteEvent : public IOEvent<const void*> +{ + public: + explicit WriteEvent(int fd=-1, const void* buf=0, size_t sz=0, + Callback cb=0) : + IOEvent<const void*>(fd, cb, sz, buf), written(0) {} + + protected: + void prepare(EventHandler&); + Event* complete(EventHandler&); + + private: + ssize_t doWrite(); + size_t written; +}; + +/** Asynchronous socket accept event */ +class AcceptEvent : public Event +{ + public: + /** Accept a connection on fd. */ + explicit AcceptEvent(int fd=-1, Callback cb=0) : + Event(cb), descriptor(fd), accepted(0) {} + + /** Get descriptor for server socket */ + int getAcceptedDesscriptor() const { return accepted; } + + private: + void prepare(EventHandler&); + Event* complete(EventHandler&); + + int descriptor; + int accepted; +}; + + +class QueueSet; + +/** + * Channel to post and wait for events. + */ +class EventChannel : public qpid::SharedObject<EventChannel> +{ + public: + static shared_ptr create(); + + ~EventChannel(); + + /** Post an event to the channel. */ + void postEvent(Event& event); + + /** Post an event to the channel. Must not be 0. */ + void postEvent(Event* event) { postEvent(*event); } + + /** + * Wait for the next complete event. + *@return Pointer to event. Will never return 0. + */ + Event* getEvent(); + + private: + EventChannel(); + boost::shared_ptr<EventHandler> handler; +}; + + +}} + + + +#endif /*!_sys_EventChannel_h*/ diff --git a/Final/cpp/lib/common/sys/posix/EventChannelAcceptor.cpp b/Final/cpp/lib/common/sys/posix/EventChannelAcceptor.cpp new file mode 100644 index 0000000000..7cd6f60902 --- /dev/null +++ b/Final/cpp/lib/common/sys/posix/EventChannelAcceptor.cpp @@ -0,0 +1,149 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <iostream> + +#include <boost/assert.hpp> +#include <boost/ptr_container/ptr_vector.hpp> +#include <boost/ptr_container/ptr_deque.hpp> +#include <boost/bind.hpp> +#include <boost/scoped_ptr.hpp> + +#include <sys/SessionContext.h> +#include <sys/SessionHandler.h> +#include <sys/SessionHandlerFactory.h> +#include <sys/Acceptor.h> +#include <sys/Socket.h> +#include <framing/Buffer.h> +#include <framing/AMQFrame.h> +#include <Exception.h> + +#include "EventChannelConnection.h" + +namespace qpid { +namespace sys { + +using namespace qpid::framing; +using namespace std; + +class EventChannelAcceptor : public Acceptor { + public: + + + EventChannelAcceptor( + int16_t port_, int backlog, int nThreads, bool trace_ + ); + + int getPort() const; + + void run(SessionHandlerFactory& factory); + + void shutdown(); + + private: + + void accept(); + + Mutex lock; + Socket listener; + const int port; + const bool isTrace; + bool isRunning; + boost::ptr_vector<EventChannelConnection> connections; + AcceptEvent acceptEvent; + SessionHandlerFactory* factory; + bool isShutdown; + EventChannelThreads::shared_ptr threads; +}; + +Acceptor::shared_ptr Acceptor::create( + int16_t port, int backlog, int threads, bool trace) +{ + return Acceptor::shared_ptr( + new EventChannelAcceptor(port, backlog, threads, trace)); +} + +// Must define Acceptor virtual dtor. +Acceptor::~Acceptor() {} + +EventChannelAcceptor::EventChannelAcceptor( + int16_t port_, int backlog, int nThreads, bool trace_ +) : listener(Socket::createTcp()), + port(listener.listen(int(port_), backlog)), + isTrace(trace_), + isRunning(false), + acceptEvent(listener.fd(), + boost::bind(&EventChannelAcceptor::accept, this)), + factory(0), + isShutdown(false), + threads(EventChannelThreads::create(EventChannel::create(), nThreads)) +{ } + +int EventChannelAcceptor::getPort() const { + return port; // Immutable no need for lock. +} + +void EventChannelAcceptor::run(SessionHandlerFactory& f) { + { + Mutex::ScopedLock l(lock); + if (!isRunning && !isShutdown) { + isRunning = true; + factory = &f; + threads->post(acceptEvent); + } + } + threads->join(); // Wait for shutdown. +} + +void EventChannelAcceptor::shutdown() { + bool doShutdown = false; + { + Mutex::ScopedLock l(lock); + doShutdown = !isShutdown; // I'm the shutdown thread. + isShutdown = true; + } + if (doShutdown) { + ::close(acceptEvent.getDescriptor()); + threads->shutdown(); + for_each(connections.begin(), connections.end(), + boost::bind(&EventChannelConnection::close, _1)); + } + threads->join(); +} + +void EventChannelAcceptor::accept() +{ + // No lock, we only post one accept event at a time. + if (isShutdown) + return; + if (acceptEvent.getException()) { + Exception::log(*acceptEvent.getException(), + "EventChannelAcceptor::accept"); + shutdown(); + return; + } + // TODO aconway 2006-11-29: Need to reap closed connections also. + int fd = acceptEvent.getAcceptedDesscriptor(); + connections.push_back( + new EventChannelConnection(threads, *factory, fd, fd, isTrace)); + threads->post(acceptEvent); // Keep accepting. +} + +}} // namespace qpid::sys diff --git a/Final/cpp/lib/common/sys/posix/EventChannelConnection.cpp b/Final/cpp/lib/common/sys/posix/EventChannelConnection.cpp new file mode 100644 index 0000000000..85014b7bd4 --- /dev/null +++ b/Final/cpp/lib/common/sys/posix/EventChannelConnection.cpp @@ -0,0 +1,232 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <iostream> + +#include <boost/bind.hpp> +#include <boost/assert.hpp> + +#include "EventChannelConnection.h" +#include "sys/SessionHandlerFactory.h" +#include "QpidError.h" + +using namespace std; +using namespace qpid; +using namespace qpid::framing; + +namespace qpid { +namespace sys { + +const size_t EventChannelConnection::bufferSize = 65536; + +EventChannelConnection::EventChannelConnection( + EventChannelThreads::shared_ptr threads_, + SessionHandlerFactory& factory_, + int rfd, + int wfd, + bool isTrace_ +) : + readFd(rfd), + writeFd(wfd ? wfd : rfd), + readCallback(boost::bind(&EventChannelConnection::closeOnException, + this, &EventChannelConnection::endInitRead)), + + isWriting(false), + isClosed(false), + threads(threads_), + handler(factory_.create(this)), + in(bufferSize), + out(bufferSize), + isTrace(isTrace_) +{ + BOOST_ASSERT(readFd > 0); + BOOST_ASSERT(writeFd > 0); + closeOnException(&EventChannelConnection::startRead); +} + + +void EventChannelConnection::send(std::auto_ptr<AMQFrame> frame) { + { + Monitor::ScopedLock lock(monitor); + assert(frame.get()); + writeFrames.push_back(frame.release()); + } + closeOnException(&EventChannelConnection::startWrite); +} + +void EventChannelConnection::close() { + { + Monitor::ScopedLock lock(monitor); + if (isClosed) + return; + isClosed = true; + } + ::close(readFd); + ::close(writeFd); + { + Monitor::ScopedLock lock(monitor); + while (busyThreads > 0) + monitor.wait(); + } + handler->closed(); +} + +void EventChannelConnection::closeNoThrow() { + Exception::tryCatchLog<void>( + boost::bind(&EventChannelConnection::close, this), + false, + "Exception closing channel" + ); +} + +/** + * Call f in a try/catch block and close the connection if + * an exception is thrown. + */ +void EventChannelConnection::closeOnException(MemberFnPtr f) +{ + try { + Exception::tryCatchLog<void>( + boost::bind(f, this), + "Closing connection due to exception" + ); + return; + } catch (...) { + // Exception was already logged by tryCatchLog + closeNoThrow(); + } +} + +// Post the write event. +// Always called inside closeOnException. +// Called by endWrite and send, but only one thread writes at a time. +// +void EventChannelConnection::startWrite() { + FrameQueue::auto_type frame; + { + Monitor::ScopedLock lock(monitor); + // Stop if closed or a write event is already in progress. + if (isClosed || isWriting) + return; + if (writeFrames.empty()) { + isWriting = false; + return; + } + isWriting = true; + frame = writeFrames.pop_front(); + } + // No need to lock here - only one thread can be writing at a time. + out.clear(); + if (isTrace) + cout << "Send on socket " << writeFd << ": " << *frame << endl; + frame->encode(out); + out.flip(); + writeEvent = WriteEvent( + writeFd, out.start(), out.available(), + boost::bind(&EventChannelConnection::closeOnException, + this, &EventChannelConnection::endWrite)); + threads->post(writeEvent); +} + +// ScopedBusy ctor increments busyThreads. +// dtor decrements and calls monitor.notifyAll if it reaches 0. +// +struct EventChannelConnection::ScopedBusy : public AtomicCount::ScopedIncrement +{ + ScopedBusy(EventChannelConnection& ecc) + : AtomicCount::ScopedIncrement( + ecc.busyThreads, boost::bind(&Monitor::notifyAll, &ecc.monitor)) + {} +}; + +// Write event completed. +// Always called by a channel thread inside closeOnException. +// +void EventChannelConnection::endWrite() { + ScopedBusy(*this); + { + Monitor::ScopedLock lock(monitor); + isWriting = false; + if (isClosed) + return; + writeEvent.throwIfException(); + } + // Check if there's more in to write in the write queue. + startWrite(); +} + + +// Post the read event. +// Always called inside closeOnException. +// Called from ctor and end[Init]Read, so only one call at a time +// is possible since we only post one read event at a time. +// +void EventChannelConnection::startRead() { + // Non blocking read, as much as we can swallow. + readEvent = ReadEvent( + readFd, in.start(), in.available(), readCallback,true); + threads->post(readEvent); +} + +// Completion of initial read, expect protocolInit. +// Always called inside closeOnException in channel thread. +// Only called by one thread at a time. +void EventChannelConnection::endInitRead() { + ScopedBusy(*this); + if (!isClosed) { + readEvent.throwIfException(); + in.move(readEvent.getBytesRead()); + in.flip(); + ProtocolInitiation protocolInit; + if(protocolInit.decode(in)){ + handler->initiated(&protocolInit); + readCallback = boost::bind( + &EventChannelConnection::closeOnException, + this, &EventChannelConnection::endRead); + } + in.compact(); + // Continue reading. + startRead(); + } +} + +// Normal reads, expect a frame. +// Always called inside closeOnException in channel thread. +void EventChannelConnection::endRead() { + ScopedBusy(*this); + if (!isClosed) { + readEvent.throwIfException(); + in.move(readEvent.getBytesRead()); + in.flip(); + AMQFrame frame; + while (frame.decode(in)) { + // TODO aconway 2006-11-30: received should take Frame& + if (isTrace) + cout << "Received on socket " << readFd + << ": " << frame << endl; + handler->received(&frame); + } + in.compact(); + startRead(); + } +} + +}} // namespace qpid::sys diff --git a/Final/cpp/lib/common/sys/posix/EventChannelConnection.h b/Final/cpp/lib/common/sys/posix/EventChannelConnection.h new file mode 100644 index 0000000000..da190b0213 --- /dev/null +++ b/Final/cpp/lib/common/sys/posix/EventChannelConnection.h @@ -0,0 +1,105 @@ +#ifndef _posix_EventChannelConnection_h +#define _posix_EventChannelConnection_h + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <boost/ptr_container/ptr_deque.hpp> + +#include "EventChannelThreads.h" +#include "sys/Monitor.h" +#include "sys/SessionContext.h" +#include "sys/SessionHandler.h" +#include "sys/AtomicCount.h" +#include "framing/AMQFrame.h" + +namespace qpid { +namespace sys { + +class SessionHandlerFactory; + +/** + * Implements SessionContext and delegates to a SessionHandler + * for a connection via the EventChannel. + *@param readDescriptor file descriptor for reading. + *@param writeDescriptor file descriptor for writing, + * by default same as readDescriptor + */ +class EventChannelConnection : public SessionContext { + public: + EventChannelConnection( + EventChannelThreads::shared_ptr threads, + SessionHandlerFactory& factory, + int readDescriptor, + int writeDescriptor = 0, + bool isTrace = false + ); + + // TODO aconway 2006-11-30: SessionContext::send should take auto_ptr + virtual void send(qpid::framing::AMQFrame* frame) { + send(std::auto_ptr<qpid::framing::AMQFrame>(frame)); + } + + virtual void send(std::auto_ptr<qpid::framing::AMQFrame> frame); + + virtual void close(); + + private: + typedef boost::ptr_deque<qpid::framing::AMQFrame> FrameQueue; + typedef void (EventChannelConnection::*MemberFnPtr)(); + struct ScopedBusy; + + void startWrite(); + void endWrite(); + void startRead(); + void endInitRead(); + void endRead(); + void closeNoThrow(); + void closeOnException(MemberFnPtr); + bool shouldContinue(bool& flag); + + static const size_t bufferSize; + + Monitor monitor; + + int readFd, writeFd; + ReadEvent readEvent; + WriteEvent writeEvent; + Event::Callback readCallback; + bool isWriting; + bool isClosed; + AtomicCount busyThreads; + + EventChannelThreads::shared_ptr threads; + std::auto_ptr<SessionHandler> handler; + qpid::framing::Buffer in, out; + FrameQueue writeFrames; + bool isTrace; + + friend struct ScopedBusy; +}; + + +}} // namespace qpid::sys + + + +#endif /*!_posix_EventChannelConnection_h*/ diff --git a/Final/cpp/lib/common/sys/posix/EventChannelThreads.cpp b/Final/cpp/lib/common/sys/posix/EventChannelThreads.cpp new file mode 100644 index 0000000000..fe8a290729 --- /dev/null +++ b/Final/cpp/lib/common/sys/posix/EventChannelThreads.cpp @@ -0,0 +1,122 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "EventChannelThreads.h" +#include <sys/Runnable.h> +#include <iostream> +using namespace std; +#include <boost/bind.hpp> + +namespace qpid { +namespace sys { + +EventChannelThreads::shared_ptr EventChannelThreads::create( + EventChannel::shared_ptr ec) +{ + return EventChannelThreads::shared_ptr(new EventChannelThreads(ec)); +} + +EventChannelThreads::EventChannelThreads(EventChannel::shared_ptr ec) : + channel(ec), nWaiting(0), state(RUNNING) +{ + // TODO aconway 2006-11-15: Estimate initial threads based on CPUs. + addThread(); +} + +EventChannelThreads::~EventChannelThreads() { + shutdown(); + join(); +} + +void EventChannelThreads::shutdown() +{ + ScopedLock lock(*this); + if (state != RUNNING) // Already shutting down. + return; + for (size_t i = 0; i < workers.size(); ++i) { + channel->postEvent(terminate); + } + state = TERMINATE_SENT; + notify(); // Wake up one join() thread. +} + +void EventChannelThreads::join() +{ + { + ScopedLock lock(*this); + while (state == RUNNING) // Wait for shutdown to start. + wait(); + if (state == SHUTDOWN) // Shutdown is complete + return; + if (state == JOINING) { + // Someone else is doing the join. + while (state != SHUTDOWN) + wait(); + return; + } + // I'm the joining thread + assert(state == TERMINATE_SENT); + state = JOINING; + } // Drop the lock. + + for (size_t i = 0; i < workers.size(); ++i) { + assert(state == JOINING); // Only this thread can change JOINING. + workers[i].join(); + } + state = SHUTDOWN; + notifyAll(); // Notify other join() threaeds. +} + +void EventChannelThreads::addThread() { + ScopedLock l(*this); + workers.push_back(Thread(*this)); +} + +void EventChannelThreads::run() +{ + // Start life waiting. Decrement on exit. + AtomicCount::ScopedIncrement inc(nWaiting); + try { + while (true) { + Event* e = channel->getEvent(); + assert(e != 0); + if (e == &terminate) { + return; + } + AtomicCount::ScopedDecrement dec(nWaiting); + // I'm no longer waiting, make sure someone is. + if (dec == 0) + addThread(); + e->dispatch(); + } + } + catch (const std::exception& e) { + // TODO aconway 2006-11-15: need better logging across the board. + std::cerr << "EventChannelThreads::run() caught: " << e.what() + << std::endl; + } + catch (...) { + std::cerr << "EventChannelThreads::run() caught unknown exception." + << std::endl; + } +} + +}} diff --git a/Final/cpp/lib/common/sys/posix/EventChannelThreads.h b/Final/cpp/lib/common/sys/posix/EventChannelThreads.h new file mode 100644 index 0000000000..e28190e0ed --- /dev/null +++ b/Final/cpp/lib/common/sys/posix/EventChannelThreads.h @@ -0,0 +1,95 @@ +#ifndef _posix_EventChannelThreads_h +#define _sys_EventChannelThreads_h + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <vector> + +#include <Exception.h> +#include <sys/Time.h> +#include <sys/Monitor.h> +#include <sys/Thread.h> +#include <sys/AtomicCount.h> +#include "EventChannel.h" + +namespace qpid { +namespace sys { + +/** + Dynamic thread pool serving an EventChannel. + + Threads run a loop { e = getEvent(); e->dispatch(); } + The size of the thread pool is automatically adjusted to optimal size. +*/ +class EventChannelThreads : + public qpid::SharedObject<EventChannelThreads>, + public sys::Monitor, private sys::Runnable +{ + public: + /** Create the thread pool and start initial threads. */ + static EventChannelThreads::shared_ptr create( + EventChannel::shared_ptr channel + ); + + ~EventChannelThreads(); + + /** Post event to the underlying channel */ + void postEvent(Event& event) { channel->postEvent(event); } + + /** Post event to the underlying channel Must not be 0. */ + void postEvent(Event* event) { channel->postEvent(event); } + + /** + * Terminate all threads. + * + * Returns immediately, use join() to wait till all threads are + * shut down. + */ + void shutdown(); + + /** Wait for all threads to terminate. */ + void join(); + + private: + typedef std::vector<sys::Thread> Threads; + typedef enum { + RUNNING, TERMINATE_SENT, JOINING, SHUTDOWN + } State; + + EventChannelThreads(EventChannel::shared_ptr underlyingChannel); + void addThread(); + + void run(); + bool keepRunning(); + void adjustThreads(); + + EventChannel::shared_ptr channel; + Threads workers; + sys::AtomicCount nWaiting; + State state; + Event terminate; +}; + + +}} + + +#endif /*!_sys_EventChannelThreads_h*/ diff --git a/Final/cpp/lib/common/sys/posix/PosixAcceptor.cpp b/Final/cpp/lib/common/sys/posix/PosixAcceptor.cpp new file mode 100644 index 0000000000..842aa76f36 --- /dev/null +++ b/Final/cpp/lib/common/sys/posix/PosixAcceptor.cpp @@ -0,0 +1,48 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <sys/Acceptor.h> +#include <Exception.h> + +namespace qpid { +namespace sys { + +namespace { +void fail() { throw qpid::Exception("PosixAcceptor not implemented"); } +} + +class PosixAcceptor : public Acceptor { + public: + virtual int16_t getPort() const { fail(); return 0; } + virtual void run(qpid::sys::SessionHandlerFactory* ) { fail(); } + virtual void shutdown() { fail(); } +}; + +// Define generic Acceptor::create() to return APRAcceptor. + Acceptor::shared_ptr Acceptor::create(int16_t , int, int, bool) +{ + return Acceptor::shared_ptr(new PosixAcceptor()); +} + +// Must define Acceptor virtual dtor. +Acceptor::~Acceptor() {} + +}} diff --git a/Final/cpp/lib/common/sys/posix/Socket.cpp b/Final/cpp/lib/common/sys/posix/Socket.cpp new file mode 100644 index 0000000000..e27ced9161 --- /dev/null +++ b/Final/cpp/lib/common/sys/posix/Socket.cpp @@ -0,0 +1,120 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <sys/socket.h> +#include <sys/errno.h> +#include <netinet/in.h> +#include <netdb.h> + +#include <boost/format.hpp> + +#include <QpidError.h> +#include <posix/check.h> +#include <sys/Socket.h> + +using namespace qpid::sys; + +Socket Socket::createTcp() +{ + int s = ::socket (PF_INET, SOCK_STREAM, 0); + if (s < 0) throw QPID_POSIX_ERROR(errno); + return s; +} + +Socket::Socket(int descriptor) : socket(descriptor) {} + +void Socket::setTimeout(Time interval) +{ + struct timeval tv; + tv.tv_sec = interval/TIME_SEC; + tv.tv_usec = (interval%TIME_SEC)/TIME_USEC; + setsockopt(socket, SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv)); + setsockopt(socket, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)); +} + +void Socket::connect(const std::string& host, int port) +{ + struct sockaddr_in name; + name.sin_family = AF_INET; + name.sin_port = htons(port); + struct hostent* hp = gethostbyname ( host.c_str() ); + if (hp == 0) throw QPID_POSIX_ERROR(errno); + memcpy(&name.sin_addr.s_addr, hp->h_addr_list[0], hp->h_length); + if (::connect(socket, (struct sockaddr*)(&name), sizeof(name)) < 0) + throw QPID_POSIX_ERROR(errno); +} + +void +Socket::close() +{ + if (socket == 0) return; + if (::close(socket) < 0) throw QPID_POSIX_ERROR(errno); + socket = 0; +} + +ssize_t +Socket::send(const void* data, size_t size) +{ + ssize_t sent = ::send(socket, data, size, 0); + if (sent < 0) { + if (errno == ECONNRESET) return SOCKET_EOF; + if (errno == ETIMEDOUT) return SOCKET_TIMEOUT; + throw QPID_POSIX_ERROR(errno); + } + return sent; +} + +ssize_t +Socket::recv(void* data, size_t size) +{ + ssize_t received = ::recv(socket, data, size, 0); + if (received < 0) { + if (errno == ETIMEDOUT) return SOCKET_TIMEOUT; + throw QPID_POSIX_ERROR(errno); + } + return received; +} + +int Socket::listen(int port, int backlog) +{ + struct sockaddr_in name; + name.sin_family = AF_INET; + name.sin_port = htons(port); + name.sin_addr.s_addr = 0; + if (::bind(socket, (struct sockaddr*)&name, sizeof(name)) < 0) + throw QPID_POSIX_ERROR(errno); + if (::listen(socket, backlog) < 0) + throw QPID_POSIX_ERROR(errno); + + socklen_t namelen = sizeof(name); + if (::getsockname(socket, (struct sockaddr*)&name, &namelen) < 0) + throw QPID_POSIX_ERROR(errno); + + return ntohs(name.sin_port); +} + + +int Socket::fd() +{ + return socket; +} + +void Socket::setTcpNoDelay(bool) {} //not yet implemented diff --git a/Final/cpp/lib/common/sys/posix/Thread.cpp b/Final/cpp/lib/common/sys/posix/Thread.cpp new file mode 100644 index 0000000000..7291022dc6 --- /dev/null +++ b/Final/cpp/lib/common/sys/posix/Thread.cpp @@ -0,0 +1,31 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <sys/Thread.h> + +void* qpid::sys::Thread::runRunnable(void* p) +{ + static_cast<Runnable*>(p)->run(); + return 0; +} + +Thread::~Thread() { +} diff --git a/Final/cpp/lib/common/sys/posix/check.cpp b/Final/cpp/lib/common/sys/posix/check.cpp new file mode 100644 index 0000000000..408679caa8 --- /dev/null +++ b/Final/cpp/lib/common/sys/posix/check.cpp @@ -0,0 +1,39 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <cerrno> +#include "check.h" + +namespace qpid { +namespace sys { + +std::string +PosixError::getMessage(int errNo) +{ + char buf[512]; + return std::string(strerror_r(errNo, buf, sizeof(buf))); +} + +PosixError::PosixError(int errNo, const qpid::SrcLine& loc) throw() + : qpid::QpidError(INTERNAL_ERROR + errNo, getMessage(errNo), loc) +{ } + +}} diff --git a/Final/cpp/lib/common/sys/posix/check.h b/Final/cpp/lib/common/sys/posix/check.h new file mode 100644 index 0000000000..5afbe8f5a8 --- /dev/null +++ b/Final/cpp/lib/common/sys/posix/check.h @@ -0,0 +1,62 @@ +#ifndef _posix_check_h +#define _posix_check_h + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <cerrno> +#include <string> +#include <QpidError.h> + +namespace qpid { +namespace sys { + +/** + * Exception with message from errno. + */ +class PosixError : public qpid::QpidError +{ + public: + static std::string getMessage(int errNo); + + PosixError(int errNo, const qpid::SrcLine& location) throw(); + + ~PosixError() throw() {} + + int getErrNo() { return errNo; } + + Exception* clone() const throw() { return new PosixError(*this); } + + void throwSelf() { throw *this; } + + private: + int errNo; +}; + +}} + +/** Create a PosixError for the current file/line and errno. */ +#define QPID_POSIX_ERROR(errNo) ::qpid::sys::PosixError(errNo, SRCLINE) + +/** Throw a posix error if errNo is non-zero */ +#define QPID_POSIX_THROW_IF(ERRNO) \ + if ((ERRNO) != 0) throw QPID_POSIX_ERROR((ERRNO)) +#endif /*!_posix_check_h*/ |