diff options
author | Alan Conway <aconway@apache.org> | 2006-11-01 01:19:12 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2006-11-01 01:19:12 +0000 |
commit | dda71d21e76e01918ebec2d80dd8e077f94216e0 (patch) | |
tree | 79283c295e00de1eee8d98d4fd9b781db8497c28 /cpp/src | |
parent | 9094d2b10ecadd66fa3b22169183e7573cc79629 (diff) | |
download | qpid-python-dda71d21e76e01918ebec2d80dd8e077f94216e0.tar.gz |
Moved APR specific sources into src_apr.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@469738 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
24 files changed, 27 insertions, 1807 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index 88dad7aaf9..d671cea9a5 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -38,7 +38,7 @@ Queue::Queue(const string& _name, bool _durable, u_int32_t _autodelete, lastUsed(0), exclusive(0) { - if(autodelete) lastUsed = apr_time_as_msec(apr_time_now()); + if(autodelete) lastUsed = Time::now().msecs(); } Queue::~Queue(){ @@ -128,7 +128,7 @@ void Queue::consume(Consumer* c, bool requestExclusive){ void Queue::cancel(Consumer* c){ Locker locker(lock); consumers.erase(find(consumers.begin(), consumers.end(), c)); - if(autodelete && consumers.empty()) lastUsed = apr_time_as_msec(apr_time_now()); + if(autodelete && consumers.empty()) lastUsed = Time::now().msecs(); if(exclusive == c) exclusive = 0; } @@ -161,7 +161,7 @@ u_int32_t Queue::getConsumerCount() const{ bool Queue::canAutoDelete() const{ Locker locker(lock); - return lastUsed && ((apr_time_as_msec(apr_time_now()) - lastUsed) > autodelete); + return lastUsed && (Time::now().msecs() - lastUsed > autodelete); } void Queue::enqueue(Message::shared_ptr& msg, const string * const xid){ diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h index f954e48c20..edc7c99b4f 100644 --- a/cpp/src/qpid/broker/Queue.h +++ b/cpp/src/qpid/broker/Queue.h @@ -21,13 +21,13 @@ #include <vector> #include <queue> #include <boost/shared_ptr.hpp> -#include "apr-1/apr_time.h" #include "qpid/framing/amqp_types.h" #include "qpid/broker/Binding.h" #include "qpid/broker/ConnectionToken.h" #include "qpid/broker/Consumer.h" #include "qpid/broker/Message.h" #include "qpid/concurrent/Monitor.h" +#include "qpid/concurrent/Time.h" namespace qpid { namespace broker { @@ -57,7 +57,7 @@ namespace qpid { bool dispatching; int next; mutable qpid::concurrent::Monitor lock; - apr_time_t lastUsed; + int64_t lastUsed; Consumer* exclusive; bool startDispatching(); diff --git a/cpp/src/qpid/concurrent/APRBase.cpp b/cpp/src/qpid/concurrent/APRBase.cpp deleted file mode 100644 index 514c4d1048..0000000000 --- a/cpp/src/qpid/concurrent/APRBase.cpp +++ /dev/null @@ -1,96 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#include <iostream> -#include "qpid/concurrent/APRBase.h" -#include "qpid/QpidError.h" - -using namespace qpid::concurrent; - -APRBase* APRBase::instance = 0; - -APRBase* APRBase::getInstance(){ - if(instance == 0){ - instance = new APRBase(); - } - return instance; -} - - -APRBase::APRBase() : count(0){ - apr_initialize(); - CHECK_APR_SUCCESS(apr_pool_create(&pool, 0)); - CHECK_APR_SUCCESS(apr_thread_mutex_create(&mutex, APR_THREAD_MUTEX_NESTED, pool)); -} - -APRBase::~APRBase(){ - CHECK_APR_SUCCESS(apr_thread_mutex_destroy(mutex)); - apr_pool_destroy(pool); - apr_terminate(); -} - -bool APRBase::_increment(){ - bool deleted(false); - CHECK_APR_SUCCESS(apr_thread_mutex_lock(mutex)); - if(this == instance){ - count++; - }else{ - deleted = true; - } - CHECK_APR_SUCCESS(apr_thread_mutex_unlock(mutex)); - return !deleted; -} - -void APRBase::_decrement(){ - APRBase* copy = 0; - CHECK_APR_SUCCESS(apr_thread_mutex_lock(mutex)); - if(--count == 0){ - copy = instance; - instance = 0; - } - CHECK_APR_SUCCESS(apr_thread_mutex_unlock(mutex)); - if(copy != 0){ - delete copy; - } -} - -void APRBase::increment(){ - int count = 0; - while(count++ < 2 && !getInstance()->_increment()){ - std::cout << "WARNING: APR initialization triggered concurrently with termination." << std::endl; - } -} - -void APRBase::decrement(){ - getInstance()->_decrement(); -} - -void qpid::concurrent::check(apr_status_t status, const std::string& file, const int line){ - if (status != APR_SUCCESS){ - const int size = 50; - char tmp[size]; - std::string msg(apr_strerror(status, tmp, size)); - throw QpidError(APR_ERROR + ((int) status), msg, file, line); - } -} - -std::string qpid::concurrent::get_desc(apr_status_t status){ - const int size = 50; - char tmp[size]; - return std::string(apr_strerror(status, tmp, size)); -} - diff --git a/cpp/src/qpid/concurrent/APRBase.h b/cpp/src/qpid/concurrent/APRBase.h deleted file mode 100644 index f3ff0f89c1..0000000000 --- a/cpp/src/qpid/concurrent/APRBase.h +++ /dev/null @@ -1,63 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#ifndef _APRBase_ -#define _APRBase_ - -#include <string> -#include "apr-1/apr_thread_mutex.h" -#include "apr-1/apr_errno.h" - -namespace qpid { -namespace concurrent { - - /** - * Use of APR libraries necessitates explicit init and terminate - * calls. Any class using APR libs should obtain the reference to - * this singleton and increment on construction, decrement on - * destruction. This class can then correctly initialise apr - * before the first use and terminate after the last use. - */ - class APRBase{ - static APRBase* instance; - apr_pool_t* pool; - apr_thread_mutex_t* mutex; - int count; - - APRBase(); - ~APRBase(); - static APRBase* getInstance(); - bool _increment(); - void _decrement(); - public: - static void increment(); - static void decrement(); - }; - - //this is also a convenient place for a helper function for error checking: - void check(apr_status_t status, const std::string& file, const int line); - std::string get_desc(apr_status_t status); - -#define CHECK_APR_SUCCESS(A) check(A, __FILE__, __LINE__); - -} -} - - - - -#endif diff --git a/cpp/src/qpid/concurrent/Monitor.cpp b/cpp/src/qpid/concurrent/Monitor.cpp deleted file mode 100644 index ae68cf8751..0000000000 --- a/cpp/src/qpid/concurrent/Monitor.cpp +++ /dev/null @@ -1,60 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#include "qpid/concurrent/APRBase.h" -#include "qpid/concurrent/Monitor.h" -#include <iostream> - -qpid::concurrent::Monitor::Monitor(){ - APRBase::increment(); - CHECK_APR_SUCCESS(apr_pool_create(&pool, NULL)); - CHECK_APR_SUCCESS(apr_thread_mutex_create(&mutex, APR_THREAD_MUTEX_NESTED, pool)); - CHECK_APR_SUCCESS(apr_thread_cond_create(&condition, pool)); -} - -qpid::concurrent::Monitor::~Monitor(){ - CHECK_APR_SUCCESS(apr_thread_cond_destroy(condition)); - CHECK_APR_SUCCESS(apr_thread_mutex_destroy(mutex)); - apr_pool_destroy(pool); - APRBase::decrement(); -} - -void qpid::concurrent::Monitor::wait(){ - CHECK_APR_SUCCESS(apr_thread_cond_wait(condition, mutex)); -} - - -void qpid::concurrent::Monitor::wait(u_int64_t time){ - apr_status_t status = apr_thread_cond_timedwait(condition, mutex, time * 1000); - if(!status == APR_TIMEUP) CHECK_APR_SUCCESS(status); -} - -void qpid::concurrent::Monitor::notify(){ - CHECK_APR_SUCCESS(apr_thread_cond_signal(condition)); -} - -void qpid::concurrent::Monitor::notifyAll(){ - CHECK_APR_SUCCESS(apr_thread_cond_broadcast(condition)); -} - -void qpid::concurrent::Monitor::acquire(){ - CHECK_APR_SUCCESS(apr_thread_mutex_lock(mutex)); -} - -void qpid::concurrent::Monitor::release(){ - CHECK_APR_SUCCESS(apr_thread_mutex_unlock(mutex)); -} diff --git a/cpp/src/qpid/concurrent/Monitor.h b/cpp/src/qpid/concurrent/Monitor.h deleted file mode 100644 index a2777cb2f1..0000000000 --- a/cpp/src/qpid/concurrent/Monitor.h +++ /dev/null @@ -1,56 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#ifndef _Monitor_ -#define _Monitor_ - -#include "apr-1/apr_thread_mutex.h" -#include "apr-1/apr_thread_cond.h" -#include "qpid/concurrent/Monitor.h" - -namespace qpid { -namespace concurrent { - -class Monitor -{ - apr_pool_t* pool; - apr_thread_mutex_t* mutex; - apr_thread_cond_t* condition; - - public: - Monitor(); - virtual ~Monitor(); - virtual void wait(); - virtual void wait(u_int64_t time); - virtual void notify(); - virtual void notifyAll(); - virtual void acquire(); - virtual void release(); -}; - -class Locker -{ - public: - Locker(Monitor& monitor_) : monitor(monitor_) { monitor.acquire(); } - ~Locker() { monitor.release(); } - private: - Monitor& monitor; -}; -}} - - -#endif diff --git a/cpp/src/qpid/concurrent/Thread.cpp b/cpp/src/qpid/concurrent/Thread.cpp deleted file mode 100644 index 9bbc2f8131..0000000000 --- a/cpp/src/qpid/concurrent/Thread.cpp +++ /dev/null @@ -1,50 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#include "qpid/concurrent/APRBase.h" -#include "qpid/concurrent/Thread.h" -#include "apr-1/apr_portable.h" - -using namespace qpid::concurrent; - -void* APR_THREAD_FUNC ExecRunnable(apr_thread_t* thread, void *data){ - ((Runnable*) data)->run(); - CHECK_APR_SUCCESS(apr_thread_exit(thread, APR_SUCCESS)); - return NULL; -} - -Thread::Thread(apr_pool_t* _pool, Runnable* _runnable) : runnable(_runnable), pool(_pool), runner(0) {} - -Thread::~Thread(){ -} - -void Thread::start(){ - CHECK_APR_SUCCESS(apr_thread_create(&runner, NULL, ExecRunnable,(void*) runnable, pool)); -} - -void Thread::join(){ - apr_status_t status; - if (runner) CHECK_APR_SUCCESS(apr_thread_join(&status, runner)); -} - -void Thread::interrupt(){ - if (runner) CHECK_APR_SUCCESS(apr_thread_exit(runner, APR_SUCCESS)); -} - -unsigned int qpid::concurrent::Thread::currentThread(){ - return apr_os_thread_current(); -} diff --git a/cpp/src/qpid/concurrent/Thread.h b/cpp/src/qpid/concurrent/Thread.h deleted file mode 100644 index d18bc153bf..0000000000 --- a/cpp/src/qpid/concurrent/Thread.h +++ /dev/null @@ -1,48 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#ifndef _Thread_ -#define _Thread_ - -#include "apr-1/apr_thread_proc.h" -#include "qpid/concurrent/Thread.h" -#include "qpid/concurrent/Runnable.h" -#include "qpid/concurrent/Thread.h" - -namespace qpid { -namespace concurrent { - - class Thread - { - const Runnable* runnable; - apr_pool_t* pool; - apr_thread_t* runner; - - public: - Thread(apr_pool_t* pool, Runnable* runnable); - virtual ~Thread(); - virtual void start(); - virtual void join(); - virtual void interrupt(); - static unsigned int currentThread(); - }; - -} -} - - -#endif diff --git a/cpp/src/qpid/concurrent/ThreadFactory.cpp b/cpp/src/qpid/concurrent/ThreadFactory.cpp deleted file mode 100644 index b20f9f2b04..0000000000 --- a/cpp/src/qpid/concurrent/ThreadFactory.cpp +++ /dev/null @@ -1,35 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#include "qpid/concurrent/APRBase.h" -#include "qpid/concurrent/ThreadFactory.h" - -using namespace qpid::concurrent; - -ThreadFactory::ThreadFactory(){ - APRBase::increment(); - CHECK_APR_SUCCESS(apr_pool_create(&pool, NULL)); -} - -ThreadFactory::~ThreadFactory(){ - apr_pool_destroy(pool); - APRBase::decrement(); -} - -Thread* ThreadFactory::create(Runnable* runnable){ - return new Thread(pool, runnable); -} diff --git a/cpp/src/qpid/concurrent/ThreadFactory.h b/cpp/src/qpid/concurrent/ThreadFactory.h deleted file mode 100644 index 572419cae6..0000000000 --- a/cpp/src/qpid/concurrent/ThreadFactory.h +++ /dev/null @@ -1,44 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#ifndef _ThreadFactory_ -#define _ThreadFactory_ - -#include "apr-1/apr_thread_proc.h" - -#include "qpid/concurrent/Thread.h" -#include "qpid/concurrent/Thread.h" -#include "qpid/concurrent/ThreadFactory.h" -#include "qpid/concurrent/Runnable.h" - -namespace qpid { -namespace concurrent { - - class ThreadFactory - { - apr_pool_t* pool; - public: - ThreadFactory(); - virtual ~ThreadFactory(); - virtual Thread* create(Runnable* runnable); - }; - -} -} - - -#endif diff --git a/cpp/src/qpid/concurrent/ThreadPool.cpp b/cpp/src/qpid/concurrent/ThreadPool.cpp deleted file mode 100644 index 5da19745a7..0000000000 --- a/cpp/src/qpid/concurrent/ThreadPool.cpp +++ /dev/null @@ -1,83 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#include "qpid/concurrent/ThreadFactory.h" -#include "qpid/concurrent/ThreadPool.h" -#include "qpid/QpidError.h" -#include <iostream> - -using namespace qpid::concurrent; - -ThreadPool::ThreadPool(int _size) : deleteFactory(true), size(_size), factory(new ThreadFactory()), running(false){ - worker = new Worker(this); -} - -ThreadPool::ThreadPool(int _size, ThreadFactory* _factory) : deleteFactory(false), size(_size), factory(_factory), running(false){ - worker = new Worker(this); -} - -ThreadPool::~ThreadPool(){ - if(deleteFactory) delete factory; -} - -void ThreadPool::addTask(Runnable* task){ - lock.acquire(); - tasks.push(task); - lock.notifyAll(); - lock.release(); -} - -void ThreadPool::runTask(){ - lock.acquire(); - while(tasks.empty()){ - lock.wait(); - } - Runnable* task = tasks.front(); - tasks.pop(); - lock.release(); - try{ - task->run(); - }catch(qpid::QpidError error){ - std::cout << "Error [" << error.code << "] " << error.msg << " (" << error.file << ":" << error.line << ")" << std::endl; - } -} - -void ThreadPool::start(){ - if(!running){ - running = true; - for(int i = 0; i < size; i++){ - Thread* t = factory->create(worker); - t->start(); - threads.push_back(t); - } - } -} - -void ThreadPool::stop(){ - if(!running){ - running = false; - lock.acquire(); - lock.notifyAll(); - lock.release(); - for(int i = 0; i < size; i++){ - threads[i]->join(); - delete threads[i]; - } - } -} - - diff --git a/cpp/src/qpid/concurrent/ThreadPool.h b/cpp/src/qpid/concurrent/ThreadPool.h deleted file mode 100644 index 11f0cc364f..0000000000 --- a/cpp/src/qpid/concurrent/ThreadPool.h +++ /dev/null @@ -1,67 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#ifndef _ThreadPool_ -#define _ThreadPool_ - -#include <queue> -#include <vector> -#include "qpid/concurrent/Monitor.h" -#include "qpid/concurrent/Thread.h" -#include "qpid/concurrent/ThreadFactory.h" -#include "qpid/concurrent/ThreadPool.h" -#include "qpid/concurrent/Runnable.h" - -namespace qpid { -namespace concurrent { - - class ThreadPool - { - class Worker : public virtual Runnable{ - ThreadPool* pool; - public: - inline Worker(ThreadPool* _pool) : pool(_pool){} - inline virtual void run(){ - while(pool->running){ - pool->runTask(); - } - } - }; - const bool deleteFactory; - const int size; - ThreadFactory* factory; - Monitor lock; - std::vector<Thread*> threads; - std::queue<Runnable*> tasks; - Worker* worker; - volatile bool running; - - void runTask(); - public: - ThreadPool(int size); - ThreadPool(int size, ThreadFactory* factory); - virtual void start(); - virtual void stop(); - virtual void addTask(Runnable* task); - virtual ~ThreadPool(); - }; - -} -} - - -#endif diff --git a/cpp/src/qpid/io/APRPool.h b/cpp/src/qpid/concurrent/Time.h index 063eedf1ee..ec64ce8a85 100644 --- a/cpp/src/qpid/io/APRPool.h +++ b/cpp/src/qpid/concurrent/Time.h @@ -1,5 +1,5 @@ -#ifndef _APRPool_ -#define _APRPool_ +#ifndef _concurrent_Time_h +#define _concurrent_Time_h /* * @@ -18,30 +18,35 @@ * limitations under the License. * */ -#include <boost/noncopyable.hpp> -#include <apr-1/apr_pools.h> + +#include <stdint.h> namespace qpid { -namespace io { +namespace concurrent { + /** - * Singleton APR memory pool. + * Time since the epoch. */ -class APRPool : private boost::noncopyable { +class Time +{ public: - APRPool(); - ~APRPool(); + static const int64_t NANOS = 1000000000; + static const int64_t MICROS = 1000000; + static const int64_t MILLIS = 1000; + + static Time now(); + + Time(int64_t nsecs_) : ticks(nsecs_) {} - /** Get singleton instance */ - static apr_pool_t* get(); + int64_t nsecs() const { return ticks; } + int64_t usecs() const { return nsecs()/1000; } + int64_t msecs() const { return usecs()/1000; } + int64_t secs() const { return msecs()/1000; } private: - apr_pool_t* pool; + int64_t ticks; }; }} - - - - -#endif /*!_APRPool_*/ +#endif /*!_concurrent_Time_h*/ diff --git a/cpp/src/qpid/io/APRPool.cpp b/cpp/src/qpid/io/APRPool.cpp deleted file mode 100644 index edd434f16c..0000000000 --- a/cpp/src/qpid/io/APRPool.cpp +++ /dev/null @@ -1,39 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include "APRPool.h" -#include "qpid/concurrent/APRBase.h" -#include <boost/pool/singleton_pool.hpp> - -using namespace qpid::io; -using namespace qpid::concurrent; - -APRPool::APRPool(){ - APRBase::increment(); - CHECK_APR_SUCCESS(apr_pool_create(&pool, NULL)); -} - -APRPool::~APRPool(){ - apr_pool_destroy(pool); - APRBase::decrement(); -} - -apr_pool_t* APRPool::get() { - return boost::details::pool::singleton_default<APRPool>::instance().pool; -} - diff --git a/cpp/src/qpid/io/APRSocket.cpp b/cpp/src/qpid/io/APRSocket.cpp deleted file mode 100644 index 824c376c3b..0000000000 --- a/cpp/src/qpid/io/APRSocket.cpp +++ /dev/null @@ -1,76 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#include "qpid/concurrent/APRBase.h" -#include "qpid/io/APRSocket.h" -#include <assert.h> -#include <iostream> - -using namespace qpid::io; -using namespace qpid::framing; -using namespace qpid::concurrent; - -APRSocket::APRSocket(apr_socket_t* _socket) : socket(_socket), closed(false){ - -} - -void APRSocket::read(qpid::framing::Buffer& buffer){ - apr_size_t bytes; - bytes = buffer.available(); - apr_status_t s = apr_socket_recv(socket, buffer.start(), &bytes); - buffer.move(bytes); - if(APR_STATUS_IS_TIMEUP(s)){ - //timed out - }else if(APR_STATUS_IS_EOF(s)){ - close(); - } -} - -void APRSocket::write(qpid::framing::Buffer& buffer){ - apr_size_t bytes; - do{ - bytes = buffer.available(); - apr_socket_send(socket, buffer.start(), &bytes); - buffer.move(bytes); - }while(bytes > 0); -} - -void APRSocket::close(){ - if(!closed){ - std::cout << "Closing socket " << socket << "@" << this << std::endl; - CHECK_APR_SUCCESS(apr_socket_close(socket)); - closed = true; - } -} - -bool APRSocket::isOpen(){ - return !closed; -} - -u_int8_t APRSocket::read(){ - char data[1]; - apr_size_t bytes = 1; - apr_status_t s = apr_socket_recv(socket, data, &bytes); - if(APR_STATUS_IS_EOF(s) || bytes == 0){ - return 0; - }else{ - return *data; - } -} - -APRSocket::~APRSocket(){ -} diff --git a/cpp/src/qpid/io/APRSocket.h b/cpp/src/qpid/io/APRSocket.h deleted file mode 100644 index 0b6644dfb6..0000000000 --- a/cpp/src/qpid/io/APRSocket.h +++ /dev/null @@ -1,45 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#ifndef _APRSocket_ -#define _APRSocket_ - -#include "apr-1/apr_network_io.h" -#include "qpid/framing/Buffer.h" - -namespace qpid { -namespace io { - - class APRSocket - { - apr_socket_t* const socket; - volatile bool closed; - public: - APRSocket(apr_socket_t* socket); - void read(qpid::framing::Buffer& b); - void write(qpid::framing::Buffer& b); - void close(); - bool isOpen(); - u_int8_t read(); - ~APRSocket(); - }; - -} -} - - -#endif diff --git a/cpp/src/qpid/io/Acceptor.cpp b/cpp/src/qpid/io/Acceptor.cpp deleted file mode 100644 index f95d9448cf..0000000000 --- a/cpp/src/qpid/io/Acceptor.cpp +++ /dev/null @@ -1,78 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#include "qpid/io/Acceptor.h" -#include "qpid/concurrent/APRBase.h" -#include "APRPool.h" - -using namespace qpid::concurrent; -using namespace qpid::io; - -Acceptor::Acceptor(int16_t port_, int backlog, int threads) : - port(port_), - processor(APRPool::get(), threads, 1000, 5000000) -{ - apr_sockaddr_t* address; - CHECK_APR_SUCCESS(apr_sockaddr_info_get(&address, APR_ANYADDR, APR_UNSPEC, port, APR_IPV4_ADDR_OK, APRPool::get())); - CHECK_APR_SUCCESS(apr_socket_create(&socket, APR_INET, SOCK_STREAM, APR_PROTO_TCP, APRPool::get())); - CHECK_APR_SUCCESS(apr_socket_opt_set(socket, APR_SO_REUSEADDR, 1)); - CHECK_APR_SUCCESS(apr_socket_bind(socket, address)); - CHECK_APR_SUCCESS(apr_socket_listen(socket, backlog)); -} - -int16_t Acceptor::getPort() const { - apr_sockaddr_t* address; - CHECK_APR_SUCCESS(apr_socket_addr_get(&address, APR_LOCAL, socket)); - return address->port; -} - -void Acceptor::run(SessionHandlerFactory* factory) { - running = true; - processor.start(); - std::cout << "Listening on port " << getPort() << "..." << std::endl; - while(running){ - apr_socket_t* client; - apr_status_t status = apr_socket_accept(&client, socket, APRPool::get()); - if(status == APR_SUCCESS){ - //make this socket non-blocking: - CHECK_APR_SUCCESS(apr_socket_timeout_set(client, 0)); - CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_SO_NONBLOCK, 1)); - CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_TCP_NODELAY, 1)); - CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_SO_SNDBUF, 32768)); - CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_SO_RCVBUF, 32768)); - LFSessionContext* session = new LFSessionContext(APRPool::get(), client, &processor, false); - session->init(factory->create(session)); - }else{ - running = false; - if(status != APR_EINTR){ - std::cout << "ERROR: " << get_desc(status) << std::endl; - } - } - } - shutdown(); -} - -void Acceptor::shutdown() { - // TODO aconway 2006-10-12: Cleanup, this is not thread safe. - if (running) { - running = false; - processor.stop(); - CHECK_APR_SUCCESS(apr_socket_close(socket)); - } -} - - diff --git a/cpp/src/qpid/io/Acceptor.h b/cpp/src/qpid/io/Acceptor.h deleted file mode 100644 index bc189f7f6e..0000000000 --- a/cpp/src/qpid/io/Acceptor.h +++ /dev/null @@ -1,60 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#ifndef _LFAcceptor_ -#define _LFAcceptor_ - -#include "apr-1/apr_network_io.h" -#include "apr-1/apr_poll.h" -#include "apr-1/apr_time.h" - -#include "qpid/io/Acceptor.h" -#include "qpid/concurrent/Monitor.h" -#include "qpid/concurrent/ThreadFactory.h" -#include "qpid/concurrent/ThreadPool.h" -#include "qpid/io/LFProcessor.h" -#include "qpid/io/LFSessionContext.h" -#include "qpid/concurrent/Runnable.h" -#include "qpid/io/SessionContext.h" -#include "qpid/io/SessionHandlerFactory.h" -#include "qpid/concurrent/Thread.h" -#include <qpid/SharedObject.h> - -namespace qpid { -namespace io { - -/** APR Acceptor. */ -class Acceptor : public qpid::SharedObject<Acceptor> -{ - public: - Acceptor(int16_t port, int backlog, int threads); - virtual int16_t getPort() const; - virtual void run(SessionHandlerFactory* factory); - virtual void shutdown(); - - private: - int16_t port; - LFProcessor processor; - apr_socket_t* socket; - volatile bool running; -}; - -} -} - - -#endif diff --git a/cpp/src/qpid/io/Connector.cpp b/cpp/src/qpid/io/Connector.cpp deleted file mode 100644 index ca487deb86..0000000000 --- a/cpp/src/qpid/io/Connector.cpp +++ /dev/null @@ -1,201 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#include <iostream> -#include "qpid/concurrent/APRBase.h" -#include "qpid/io/Connector.h" -#include "qpid/concurrent/ThreadFactory.h" -#include "qpid/QpidError.h" - -using namespace qpid::io; -using namespace qpid::concurrent; -using namespace qpid::framing; -using qpid::QpidError; - -Connector::Connector(bool _debug, u_int32_t buffer_size) : - debug(_debug), - receive_buffer_size(buffer_size), - send_buffer_size(buffer_size), - closed(true), - lastIn(0), lastOut(0), - timeout(0), - idleIn(0), idleOut(0), - timeoutHandler(0), - shutdownHandler(0), - inbuf(receive_buffer_size), - outbuf(send_buffer_size){ - - APRBase::increment(); - - CHECK_APR_SUCCESS(apr_pool_create(&pool, NULL)); - CHECK_APR_SUCCESS(apr_socket_create(&socket, APR_INET, SOCK_STREAM, APR_PROTO_TCP, pool)); - - threadFactory = new ThreadFactory(); - writeLock = new Monitor(); -} - -Connector::~Connector(){ - delete receiver; - delete writeLock; - delete threadFactory; - apr_pool_destroy(pool); - - APRBase::decrement(); -} - -void Connector::connect(const std::string& host, int port){ - apr_sockaddr_t* address; - CHECK_APR_SUCCESS(apr_sockaddr_info_get(&address, host.c_str(), APR_UNSPEC, port, APR_IPV4_ADDR_OK, pool)); - CHECK_APR_SUCCESS(apr_socket_connect(socket, address)); - closed = false; - - receiver = threadFactory->create(this); - receiver->start(); -} - -void Connector::init(ProtocolInitiation* header){ - writeBlock(header); - delete header; -} - -void Connector::close(){ - closed = true; - CHECK_APR_SUCCESS(apr_socket_close(socket)); - receiver->join(); -} - -void Connector::setInputHandler(InputHandler* handler){ - input = handler; -} - -void Connector::setShutdownHandler(ShutdownHandler* handler){ - shutdownHandler = handler; -} - -OutputHandler* Connector::getOutputHandler(){ - return this; -} - -void Connector::send(AMQFrame* frame){ - writeBlock(frame); - if(debug) std::cout << "SENT: " << *frame << std::endl; - delete frame; -} - -void Connector::writeBlock(AMQDataBlock* data){ - writeLock->acquire(); - data->encode(outbuf); - - //transfer data to wire - outbuf.flip(); - writeToSocket(outbuf.start(), outbuf.available()); - outbuf.clear(); - writeLock->release(); -} - -void Connector::writeToSocket(char* data, size_t available){ - apr_size_t bytes(available); - apr_size_t written(0); - while(written < available && !closed){ - apr_status_t status = apr_socket_send(socket, data + written, &bytes); - if(status == APR_TIMEUP){ - std::cout << "Write request timed out." << std::endl; - } - if(bytes == 0){ - std::cout << "Write request wrote 0 bytes." << std::endl; - } - lastOut = apr_time_as_msec(apr_time_now()); - written += bytes; - bytes = available - written; - } -} - -void Connector::checkIdle(apr_status_t status){ - if(timeoutHandler){ - apr_time_t now = apr_time_as_msec(apr_time_now()); - if(APR_STATUS_IS_TIMEUP(status)){ - if(idleIn && (now - lastIn > idleIn)){ - timeoutHandler->idleIn(); - } - }else if(APR_STATUS_IS_EOF(status)){ - closed = true; - CHECK_APR_SUCCESS(apr_socket_close(socket)); - if(shutdownHandler) shutdownHandler->shutdown(); - }else{ - lastIn = now; - } - if(idleOut && (now - lastOut > idleOut)){ - timeoutHandler->idleOut(); - } - } -} - -void Connector::setReadTimeout(u_int16_t t){ - idleIn = t * 1000;//t is in secs - if(idleIn && (!timeout || idleIn < timeout)){ - timeout = idleIn; - setSocketTimeout(); - } - -} - -void Connector::setWriteTimeout(u_int16_t t){ - idleOut = t * 1000;//t is in secs - if(idleOut && (!timeout || idleOut < timeout)){ - timeout = idleOut; - setSocketTimeout(); - } -} - -void Connector::setSocketTimeout(){ - //interval is in microseconds, timeout in milliseconds - //want the interval to be a bit shorter than the timeout, hence multiply - //by 800 rather than 1000. - apr_interval_time_t interval(timeout * 800); - apr_socket_timeout_set(socket, interval); -} - -void Connector::setTimeoutHandler(TimeoutHandler* handler){ - timeoutHandler = handler; -} - -void Connector::run(){ - try{ - while(!closed){ - apr_size_t bytes(inbuf.available()); - if(bytes < 1){ - THROW_QPID_ERROR(INTERNAL_ERROR, "Frame exceeds buffer size."); - } - checkIdle(apr_socket_recv(socket, inbuf.start(), &bytes)); - - if(bytes > 0){ - inbuf.move(bytes); - inbuf.flip();//position = 0, limit = total data read - - AMQFrame frame; - while(frame.decode(inbuf)){ - if(debug) std::cout << "RECV: " << frame << std::endl; - input->received(&frame); - } - //need to compact buffer to preserve any 'extra' data - inbuf.compact(); - } - } - }catch(QpidError error){ - std::cout << "Error [" << error.code << "] " << error.msg << " (" << error.file << ":" << error.line << ")" << std::endl; - } -} diff --git a/cpp/src/qpid/io/Connector.h b/cpp/src/qpid/io/Connector.h deleted file mode 100644 index 7c52f7e87b..0000000000 --- a/cpp/src/qpid/io/Connector.h +++ /dev/null @@ -1,95 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#ifndef _Connector_ -#define _Connector_ - -#include "apr-1/apr_network_io.h" -#include "apr-1/apr_time.h" - -#include "qpid/framing/InputHandler.h" -#include "qpid/framing/OutputHandler.h" -#include "qpid/framing/InitiationHandler.h" -#include "qpid/framing/ProtocolInitiation.h" -#include "qpid/io/ShutdownHandler.h" -#include "qpid/io/TimeoutHandler.h" -#include "qpid/concurrent/Thread.h" -#include "qpid/concurrent/ThreadFactory.h" -#include "qpid/io/Connector.h" -#include "qpid/concurrent/Monitor.h" - -namespace qpid { -namespace io { - - class Connector : public virtual qpid::framing::OutputHandler, - private virtual qpid::concurrent::Runnable - { - const bool debug; - const int receive_buffer_size; - const int send_buffer_size; - - bool closed; - - apr_time_t lastIn; - apr_time_t lastOut; - apr_interval_time_t timeout; - u_int32_t idleIn; - u_int32_t idleOut; - - TimeoutHandler* timeoutHandler; - ShutdownHandler* shutdownHandler; - qpid::framing::InputHandler* input; - qpid::framing::InitiationHandler* initialiser; - qpid::framing::OutputHandler* output; - - qpid::framing::Buffer inbuf; - qpid::framing::Buffer outbuf; - - qpid::concurrent::Monitor* writeLock; - qpid::concurrent::ThreadFactory* threadFactory; - qpid::concurrent::Thread* receiver; - - apr_pool_t* pool; - apr_socket_t* socket; - - void checkIdle(apr_status_t status); - void writeBlock(qpid::framing::AMQDataBlock* data); - void writeToSocket(char* data, size_t available); - void setSocketTimeout(); - - void run(); - - public: - Connector(bool debug = false, u_int32_t buffer_size = 1024); - virtual ~Connector(); - virtual void connect(const std::string& host, int port); - virtual void init(qpid::framing::ProtocolInitiation* header); - virtual void close(); - virtual void setInputHandler(qpid::framing::InputHandler* handler); - virtual void setTimeoutHandler(TimeoutHandler* handler); - virtual void setShutdownHandler(ShutdownHandler* handler); - virtual qpid::framing::OutputHandler* getOutputHandler(); - virtual void send(qpid::framing::AMQFrame* frame); - virtual void setReadTimeout(u_int16_t timeout); - virtual void setWriteTimeout(u_int16_t timeout); - }; - -} -} - - -#endif diff --git a/cpp/src/qpid/io/LFProcessor.cpp b/cpp/src/qpid/io/LFProcessor.cpp deleted file mode 100644 index dabbdbecae..0000000000 --- a/cpp/src/qpid/io/LFProcessor.cpp +++ /dev/null @@ -1,193 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#include "qpid/io/LFProcessor.h" -#include "qpid/concurrent/APRBase.h" -#include "qpid/io/LFSessionContext.h" -#include "qpid/QpidError.h" -#include <sstream> - -using namespace qpid::io; -using namespace qpid::concurrent; -using qpid::QpidError; - -// TODO aconway 2006-10-12: stopped is read outside locks. -// - -LFProcessor::LFProcessor(apr_pool_t* pool, int _workers, int _size, int _timeout) : - size(_size), - timeout(_timeout), - signalledCount(0), - current(0), - count(0), - workerCount(_workers), - hasLeader(false), - workers(new Thread*[_workers]), - stopped(false) -{ - - CHECK_APR_SUCCESS(apr_pollset_create(&pollset, size, pool, APR_POLLSET_THREADSAFE)); - //create & start the required number of threads - for(int i = 0; i < workerCount; i++){ - workers[i] = factory.create(this); - } -} - - -LFProcessor::~LFProcessor(){ - if (!stopped) stop(); - for(int i = 0; i < workerCount; i++){ - delete workers[i]; - } - delete[] workers; - CHECK_APR_SUCCESS(apr_pollset_destroy(pollset)); -} - -void LFProcessor::start(){ - for(int i = 0; i < workerCount; i++){ - workers[i]->start(); - } -} - -void LFProcessor::add(const apr_pollfd_t* const fd){ - CHECK_APR_SUCCESS(apr_pollset_add(pollset, fd)); - countLock.acquire(); - sessions.push_back(reinterpret_cast<LFSessionContext*>(fd->client_data)); - count++; - countLock.release(); -} - -void LFProcessor::remove(const apr_pollfd_t* const fd){ - CHECK_APR_SUCCESS(apr_pollset_remove(pollset, fd)); - countLock.acquire(); - sessions.erase(find(sessions.begin(), sessions.end(), reinterpret_cast<LFSessionContext*>(fd->client_data))); - count--; - countLock.release(); -} - -void LFProcessor::reactivate(const apr_pollfd_t* const fd){ - CHECK_APR_SUCCESS(apr_pollset_add(pollset, fd)); -} - -void LFProcessor::deactivate(const apr_pollfd_t* const fd){ - CHECK_APR_SUCCESS(apr_pollset_remove(pollset, fd)); -} - -void LFProcessor::update(const apr_pollfd_t* const fd){ - CHECK_APR_SUCCESS(apr_pollset_remove(pollset, fd)); - CHECK_APR_SUCCESS(apr_pollset_add(pollset, fd)); -} - -bool LFProcessor::full(){ - Locker locker(countLock); - return count == size; -} - -bool LFProcessor::empty(){ - Locker locker(countLock); - return count == 0; -} - -void LFProcessor::poll() { - apr_status_t status = APR_EGENERAL; - do{ - current = 0; - if(!stopped){ - status = apr_pollset_poll(pollset, timeout, &signalledCount, &signalledFDs); - } - }while(status != APR_SUCCESS && !stopped); -} - -void LFProcessor::run(){ - try{ - while(!stopped){ - leadLock.acquire(); - waitToLead(); - if(!stopped){ - const apr_pollfd_t* evt = getNextEvent(); - if(evt){ - LFSessionContext* session = reinterpret_cast<LFSessionContext*>(evt->client_data); - session->startProcessing(); - - relinquishLead(); - leadLock.release(); - - //process event: - if(evt->rtnevents & APR_POLLIN) session->read(); - if(evt->rtnevents & APR_POLLOUT) session->write(); - - if(session->isClosed()){ - session->handleClose(); - countLock.acquire(); - sessions.erase(find(sessions.begin(), sessions.end(), session)); - count--; - countLock.release(); - }else{ - session->stopProcessing(); - } - - }else{ - leadLock.release(); - } - }else{ - leadLock.release(); - } - } - }catch(QpidError error){ - std::cout << "Error [" << error.code << "] " << error.msg << " (" << error.file << ":" << error.line << ")" << std::endl; - } -} - -void LFProcessor::waitToLead(){ - while(hasLeader && !stopped) leadLock.wait(); - hasLeader = !stopped; -} - -void LFProcessor::relinquishLead(){ - hasLeader = false; - leadLock.notify(); -} - -const apr_pollfd_t* LFProcessor::getNextEvent(){ - while(true){ - if(stopped){ - return 0; - }else if(current < signalledCount){ - //use result of previous poll if one is available - return signalledFDs + (current++); - }else{ - //else poll to get new events - poll(); - } - } -} - -void LFProcessor::stop(){ - stopped = true; - leadLock.acquire(); - leadLock.notifyAll(); - leadLock.release(); - - for(int i = 0; i < workerCount; i++){ - workers[i]->join(); - } - - for(iterator i = sessions.begin(); i < sessions.end(); i++){ - (*i)->shutdown(); - } -} - diff --git a/cpp/src/qpid/io/LFProcessor.h b/cpp/src/qpid/io/LFProcessor.h deleted file mode 100644 index 5b61f444af..0000000000 --- a/cpp/src/qpid/io/LFProcessor.h +++ /dev/null @@ -1,119 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#ifndef _LFProcessor_ -#define _LFProcessor_ - -#include "apr-1/apr_poll.h" -#include <iostream> -#include <vector> -#include "qpid/concurrent/Monitor.h" -#include "qpid/concurrent/ThreadFactory.h" -#include "qpid/concurrent/Runnable.h" - -namespace qpid { -namespace io { - - class LFSessionContext; - - /** - * This class processes a poll set using the leaders-followers - * pattern for thread synchronization: the leader will poll and on - * the poll returning, it will remove a session, promote a - * follower to leadership, then process the session. - */ - class LFProcessor : private virtual qpid::concurrent::Runnable - { - typedef std::vector<LFSessionContext*>::iterator iterator; - - const int size; - const apr_interval_time_t timeout; - apr_pollset_t* pollset; - int signalledCount; - int current; - const apr_pollfd_t* signalledFDs; - int count; - const int workerCount; - bool hasLeader; - qpid::concurrent::Thread** const workers; - qpid::concurrent::Monitor leadLock; - qpid::concurrent::Monitor countLock; - qpid::concurrent::ThreadFactory factory; - std::vector<LFSessionContext*> sessions; - volatile bool stopped; - - const apr_pollfd_t* getNextEvent(); - void waitToLead(); - void relinquishLead(); - void poll(); - virtual void run(); - - public: - LFProcessor(apr_pool_t* pool, int workers, int size, int timeout); - /** - * Add the fd to the poll set. Relies on the client_data being - * an instance of LFSessionContext. - */ - void add(const apr_pollfd_t* const fd); - /** - * Remove the fd from the poll set. - */ - void remove(const apr_pollfd_t* const fd); - /** - * Signal that the fd passed in, already part of the pollset, - * has had its flags altered. - */ - void update(const apr_pollfd_t* const fd); - /** - * Add an fd back to the poll set after deactivation. - */ - void reactivate(const apr_pollfd_t* const fd); - /** - * Temporarily remove the fd from the poll set. Called when processing - * is about to begin. - */ - void deactivate(const apr_pollfd_t* const fd); - /** - * Indicates whether the capacity of this processor has been - * reached (or whether it can still handle further fd's). - */ - bool full(); - /** - * Indicates whether there are any fd's registered. - */ - bool empty(); - /** - * Stop processing. - */ - void stop(); - /** - * Start processing. - */ - void start(); - /** - * Is processing stopped? - */ - bool isStopped(); - - ~LFProcessor(); - }; - -} -} - - -#endif diff --git a/cpp/src/qpid/io/LFSessionContext.cpp b/cpp/src/qpid/io/LFSessionContext.cpp deleted file mode 100644 index ca1e6431a6..0000000000 --- a/cpp/src/qpid/io/LFSessionContext.cpp +++ /dev/null @@ -1,189 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#include "qpid/io/LFSessionContext.h" -#include "qpid/concurrent/APRBase.h" -#include "qpid/QpidError.h" -#include <assert.h> - -using namespace qpid::concurrent; -using namespace qpid::io; -using namespace qpid::framing; - -LFSessionContext::LFSessionContext(apr_pool_t* _pool, apr_socket_t* _socket, - LFProcessor* const _processor, - bool _debug) : - debug(_debug), - socket(_socket), - initiated(false), - in(32768), - out(32768), - processor(_processor), - processing(false), - closing(false), - reading(0), - writing(0) -{ - - fd.p = _pool; - fd.desc_type = APR_POLL_SOCKET; - fd.reqevents = APR_POLLIN; - fd.client_data = this; - fd.desc.s = _socket; - - out.flip(); -} - -LFSessionContext::~LFSessionContext(){ - -} - -void LFSessionContext::read(){ - assert(!reading); // No concurrent read. - reading = Thread::currentThread(); - - socket.read(in); - in.flip(); - if(initiated){ - AMQFrame frame; - while(frame.decode(in)){ - if(debug) log("RECV", &frame); - handler->received(&frame); - } - }else{ - ProtocolInitiation protocolInit; - if(protocolInit.decode(in)){ - handler->initiated(&protocolInit); - initiated = true; - if(debug) std::cout << "INIT [" << &socket << "]" << std::endl; - } - } - in.compact(); - - reading = 0; -} - -void LFSessionContext::write(){ - assert(!writing); // No concurrent writes. - writing = Thread::currentThread(); - - bool done = isClosed(); - while(!done){ - if(out.available() > 0){ - socket.write(out); - if(out.available() > 0){ - writing = 0; - - //incomplete write, leave flags to receive notification of readiness to write - done = true;//finished processing for now, but write is still in progress - } - }else{ - //do we have any frames to write? - writeLock.acquire(); - if(!framesToWrite.empty()){ - out.clear(); - bool encoded(false); - AMQFrame* frame = framesToWrite.front(); - while(frame && out.available() >= frame->size()){ - encoded = true; - frame->encode(out); - if(debug) log("SENT", frame); - delete frame; - framesToWrite.pop(); - frame = framesToWrite.empty() ? 0 : framesToWrite.front(); - } - if(!encoded) THROW_QPID_ERROR(FRAMING_ERROR, "Could not write frame, too large for buffer."); - out.flip(); - }else{ - //reset flags, don't care about writability anymore - fd.reqevents = APR_POLLIN; - done = true; - - writing = 0; - - if(closing){ - socket.close(); - } - } - writeLock.release(); - } - } -} - -void LFSessionContext::send(AMQFrame* frame){ - writeLock.acquire(); - if(!closing){ - framesToWrite.push(frame); - if(!(fd.reqevents & APR_POLLOUT)){ - fd.reqevents |= APR_POLLOUT; - if(!processing){ - processor->update(&fd); - } - } - } - writeLock.release(); -} - -void LFSessionContext::startProcessing(){ - writeLock.acquire(); - processing = true; - processor->deactivate(&fd); - writeLock.release(); -} - -void LFSessionContext::stopProcessing(){ - writeLock.acquire(); - processor->reactivate(&fd); - processing = false; - writeLock.release(); -} - -void LFSessionContext::close(){ - closing = true; - writeLock.acquire(); - if(!processing){ - //allow pending frames to be written to socket - fd.reqevents = APR_POLLOUT; - processor->update(&fd); - } - writeLock.release(); -} - -void LFSessionContext::handleClose(){ - handler->closed(); - std::cout << "Session closed [" << &socket << "]" << std::endl; - delete handler; - delete this; -} - -void LFSessionContext::shutdown(){ - socket.close(); - handleClose(); -} - -void LFSessionContext::init(SessionHandler* _handler){ - handler = _handler; - processor->add(&fd); -} - -void LFSessionContext::log(const std::string& desc, AMQFrame* const frame){ - logLock.acquire(); - std::cout << desc << " [" << &socket << "]: " << *frame << std::endl; - logLock.release(); -} - -Monitor LFSessionContext::logLock; diff --git a/cpp/src/qpid/io/LFSessionContext.h b/cpp/src/qpid/io/LFSessionContext.h deleted file mode 100644 index 8d30b54204..0000000000 --- a/cpp/src/qpid/io/LFSessionContext.h +++ /dev/null @@ -1,88 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#ifndef _LFSessionContext_ -#define _LFSessionContext_ - -#include <queue> - -#include "apr-1/apr_network_io.h" -#include "apr-1/apr_poll.h" -#include "apr-1/apr_time.h" - -#include "qpid/framing/AMQFrame.h" -#include "qpid/concurrent/Monitor.h" -#include "qpid/io/APRSocket.h" -#include "qpid/framing/Buffer.h" -#include "qpid/io/LFProcessor.h" -#include "qpid/io/SessionContext.h" -#include "qpid/io/SessionHandler.h" - -namespace qpid { -namespace io { - - - class LFSessionContext : public virtual SessionContext - { - const bool debug; - APRSocket socket; - bool initiated; - - qpid::framing::Buffer in; - qpid::framing::Buffer out; - - SessionHandler* handler; - LFProcessor* const processor; - - apr_pollfd_t fd; - - std::queue<qpid::framing::AMQFrame*> framesToWrite; - qpid::concurrent::Monitor writeLock; - - bool processing; - bool closing; - - //these are just for debug, as a crude way of detecting concurrent access - volatile unsigned int reading; - volatile unsigned int writing; - - static qpid::concurrent::Monitor logLock; - void log(const std::string& desc, qpid::framing::AMQFrame* const frame); - - public: - LFSessionContext(apr_pool_t* pool, apr_socket_t* socket, - LFProcessor* const processor, - bool debug = false); - ~LFSessionContext(); - virtual void send(qpid::framing::AMQFrame* frame); - virtual void close(); - void read(); - void write(); - void init(SessionHandler* handler); - void startProcessing(); - void stopProcessing(); - void handleClose(); - void shutdown(); - inline apr_pollfd_t* const getFd(){ return &fd; } - inline bool isClosed(){ return !socket.isOpen(); } - }; - -} -} - - -#endif |