summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2006-11-01 01:19:12 +0000
committerAlan Conway <aconway@apache.org>2006-11-01 01:19:12 +0000
commitdda71d21e76e01918ebec2d80dd8e077f94216e0 (patch)
tree79283c295e00de1eee8d98d4fd9b781db8497c28 /cpp/src
parent9094d2b10ecadd66fa3b22169183e7573cc79629 (diff)
downloadqpid-python-dda71d21e76e01918ebec2d80dd8e077f94216e0.tar.gz
Moved APR specific sources into src_apr.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@469738 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/qpid/broker/Queue.cpp6
-rw-r--r--cpp/src/qpid/broker/Queue.h4
-rw-r--r--cpp/src/qpid/concurrent/APRBase.cpp96
-rw-r--r--cpp/src/qpid/concurrent/APRBase.h63
-rw-r--r--cpp/src/qpid/concurrent/Monitor.cpp60
-rw-r--r--cpp/src/qpid/concurrent/Monitor.h56
-rw-r--r--cpp/src/qpid/concurrent/Thread.cpp50
-rw-r--r--cpp/src/qpid/concurrent/Thread.h48
-rw-r--r--cpp/src/qpid/concurrent/ThreadFactory.cpp35
-rw-r--r--cpp/src/qpid/concurrent/ThreadFactory.h44
-rw-r--r--cpp/src/qpid/concurrent/ThreadPool.cpp83
-rw-r--r--cpp/src/qpid/concurrent/ThreadPool.h67
-rw-r--r--cpp/src/qpid/concurrent/Time.h (renamed from cpp/src/qpid/io/APRPool.h)39
-rw-r--r--cpp/src/qpid/io/APRPool.cpp39
-rw-r--r--cpp/src/qpid/io/APRSocket.cpp76
-rw-r--r--cpp/src/qpid/io/APRSocket.h45
-rw-r--r--cpp/src/qpid/io/Acceptor.cpp78
-rw-r--r--cpp/src/qpid/io/Acceptor.h60
-rw-r--r--cpp/src/qpid/io/Connector.cpp201
-rw-r--r--cpp/src/qpid/io/Connector.h95
-rw-r--r--cpp/src/qpid/io/LFProcessor.cpp193
-rw-r--r--cpp/src/qpid/io/LFProcessor.h119
-rw-r--r--cpp/src/qpid/io/LFSessionContext.cpp189
-rw-r--r--cpp/src/qpid/io/LFSessionContext.h88
24 files changed, 27 insertions, 1807 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index 88dad7aaf9..d671cea9a5 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -38,7 +38,7 @@ Queue::Queue(const string& _name, bool _durable, u_int32_t _autodelete,
lastUsed(0),
exclusive(0)
{
- if(autodelete) lastUsed = apr_time_as_msec(apr_time_now());
+ if(autodelete) lastUsed = Time::now().msecs();
}
Queue::~Queue(){
@@ -128,7 +128,7 @@ void Queue::consume(Consumer* c, bool requestExclusive){
void Queue::cancel(Consumer* c){
Locker locker(lock);
consumers.erase(find(consumers.begin(), consumers.end(), c));
- if(autodelete && consumers.empty()) lastUsed = apr_time_as_msec(apr_time_now());
+ if(autodelete && consumers.empty()) lastUsed = Time::now().msecs();
if(exclusive == c) exclusive = 0;
}
@@ -161,7 +161,7 @@ u_int32_t Queue::getConsumerCount() const{
bool Queue::canAutoDelete() const{
Locker locker(lock);
- return lastUsed && ((apr_time_as_msec(apr_time_now()) - lastUsed) > autodelete);
+ return lastUsed && (Time::now().msecs() - lastUsed > autodelete);
}
void Queue::enqueue(Message::shared_ptr& msg, const string * const xid){
diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h
index f954e48c20..edc7c99b4f 100644
--- a/cpp/src/qpid/broker/Queue.h
+++ b/cpp/src/qpid/broker/Queue.h
@@ -21,13 +21,13 @@
#include <vector>
#include <queue>
#include <boost/shared_ptr.hpp>
-#include "apr-1/apr_time.h"
#include "qpid/framing/amqp_types.h"
#include "qpid/broker/Binding.h"
#include "qpid/broker/ConnectionToken.h"
#include "qpid/broker/Consumer.h"
#include "qpid/broker/Message.h"
#include "qpid/concurrent/Monitor.h"
+#include "qpid/concurrent/Time.h"
namespace qpid {
namespace broker {
@@ -57,7 +57,7 @@ namespace qpid {
bool dispatching;
int next;
mutable qpid::concurrent::Monitor lock;
- apr_time_t lastUsed;
+ int64_t lastUsed;
Consumer* exclusive;
bool startDispatching();
diff --git a/cpp/src/qpid/concurrent/APRBase.cpp b/cpp/src/qpid/concurrent/APRBase.cpp
deleted file mode 100644
index 514c4d1048..0000000000
--- a/cpp/src/qpid/concurrent/APRBase.cpp
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#include <iostream>
-#include "qpid/concurrent/APRBase.h"
-#include "qpid/QpidError.h"
-
-using namespace qpid::concurrent;
-
-APRBase* APRBase::instance = 0;
-
-APRBase* APRBase::getInstance(){
- if(instance == 0){
- instance = new APRBase();
- }
- return instance;
-}
-
-
-APRBase::APRBase() : count(0){
- apr_initialize();
- CHECK_APR_SUCCESS(apr_pool_create(&pool, 0));
- CHECK_APR_SUCCESS(apr_thread_mutex_create(&mutex, APR_THREAD_MUTEX_NESTED, pool));
-}
-
-APRBase::~APRBase(){
- CHECK_APR_SUCCESS(apr_thread_mutex_destroy(mutex));
- apr_pool_destroy(pool);
- apr_terminate();
-}
-
-bool APRBase::_increment(){
- bool deleted(false);
- CHECK_APR_SUCCESS(apr_thread_mutex_lock(mutex));
- if(this == instance){
- count++;
- }else{
- deleted = true;
- }
- CHECK_APR_SUCCESS(apr_thread_mutex_unlock(mutex));
- return !deleted;
-}
-
-void APRBase::_decrement(){
- APRBase* copy = 0;
- CHECK_APR_SUCCESS(apr_thread_mutex_lock(mutex));
- if(--count == 0){
- copy = instance;
- instance = 0;
- }
- CHECK_APR_SUCCESS(apr_thread_mutex_unlock(mutex));
- if(copy != 0){
- delete copy;
- }
-}
-
-void APRBase::increment(){
- int count = 0;
- while(count++ < 2 && !getInstance()->_increment()){
- std::cout << "WARNING: APR initialization triggered concurrently with termination." << std::endl;
- }
-}
-
-void APRBase::decrement(){
- getInstance()->_decrement();
-}
-
-void qpid::concurrent::check(apr_status_t status, const std::string& file, const int line){
- if (status != APR_SUCCESS){
- const int size = 50;
- char tmp[size];
- std::string msg(apr_strerror(status, tmp, size));
- throw QpidError(APR_ERROR + ((int) status), msg, file, line);
- }
-}
-
-std::string qpid::concurrent::get_desc(apr_status_t status){
- const int size = 50;
- char tmp[size];
- return std::string(apr_strerror(status, tmp, size));
-}
-
diff --git a/cpp/src/qpid/concurrent/APRBase.h b/cpp/src/qpid/concurrent/APRBase.h
deleted file mode 100644
index f3ff0f89c1..0000000000
--- a/cpp/src/qpid/concurrent/APRBase.h
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#ifndef _APRBase_
-#define _APRBase_
-
-#include <string>
-#include "apr-1/apr_thread_mutex.h"
-#include "apr-1/apr_errno.h"
-
-namespace qpid {
-namespace concurrent {
-
- /**
- * Use of APR libraries necessitates explicit init and terminate
- * calls. Any class using APR libs should obtain the reference to
- * this singleton and increment on construction, decrement on
- * destruction. This class can then correctly initialise apr
- * before the first use and terminate after the last use.
- */
- class APRBase{
- static APRBase* instance;
- apr_pool_t* pool;
- apr_thread_mutex_t* mutex;
- int count;
-
- APRBase();
- ~APRBase();
- static APRBase* getInstance();
- bool _increment();
- void _decrement();
- public:
- static void increment();
- static void decrement();
- };
-
- //this is also a convenient place for a helper function for error checking:
- void check(apr_status_t status, const std::string& file, const int line);
- std::string get_desc(apr_status_t status);
-
-#define CHECK_APR_SUCCESS(A) check(A, __FILE__, __LINE__);
-
-}
-}
-
-
-
-
-#endif
diff --git a/cpp/src/qpid/concurrent/Monitor.cpp b/cpp/src/qpid/concurrent/Monitor.cpp
deleted file mode 100644
index ae68cf8751..0000000000
--- a/cpp/src/qpid/concurrent/Monitor.cpp
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#include "qpid/concurrent/APRBase.h"
-#include "qpid/concurrent/Monitor.h"
-#include <iostream>
-
-qpid::concurrent::Monitor::Monitor(){
- APRBase::increment();
- CHECK_APR_SUCCESS(apr_pool_create(&pool, NULL));
- CHECK_APR_SUCCESS(apr_thread_mutex_create(&mutex, APR_THREAD_MUTEX_NESTED, pool));
- CHECK_APR_SUCCESS(apr_thread_cond_create(&condition, pool));
-}
-
-qpid::concurrent::Monitor::~Monitor(){
- CHECK_APR_SUCCESS(apr_thread_cond_destroy(condition));
- CHECK_APR_SUCCESS(apr_thread_mutex_destroy(mutex));
- apr_pool_destroy(pool);
- APRBase::decrement();
-}
-
-void qpid::concurrent::Monitor::wait(){
- CHECK_APR_SUCCESS(apr_thread_cond_wait(condition, mutex));
-}
-
-
-void qpid::concurrent::Monitor::wait(u_int64_t time){
- apr_status_t status = apr_thread_cond_timedwait(condition, mutex, time * 1000);
- if(!status == APR_TIMEUP) CHECK_APR_SUCCESS(status);
-}
-
-void qpid::concurrent::Monitor::notify(){
- CHECK_APR_SUCCESS(apr_thread_cond_signal(condition));
-}
-
-void qpid::concurrent::Monitor::notifyAll(){
- CHECK_APR_SUCCESS(apr_thread_cond_broadcast(condition));
-}
-
-void qpid::concurrent::Monitor::acquire(){
- CHECK_APR_SUCCESS(apr_thread_mutex_lock(mutex));
-}
-
-void qpid::concurrent::Monitor::release(){
- CHECK_APR_SUCCESS(apr_thread_mutex_unlock(mutex));
-}
diff --git a/cpp/src/qpid/concurrent/Monitor.h b/cpp/src/qpid/concurrent/Monitor.h
deleted file mode 100644
index a2777cb2f1..0000000000
--- a/cpp/src/qpid/concurrent/Monitor.h
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#ifndef _Monitor_
-#define _Monitor_
-
-#include "apr-1/apr_thread_mutex.h"
-#include "apr-1/apr_thread_cond.h"
-#include "qpid/concurrent/Monitor.h"
-
-namespace qpid {
-namespace concurrent {
-
-class Monitor
-{
- apr_pool_t* pool;
- apr_thread_mutex_t* mutex;
- apr_thread_cond_t* condition;
-
- public:
- Monitor();
- virtual ~Monitor();
- virtual void wait();
- virtual void wait(u_int64_t time);
- virtual void notify();
- virtual void notifyAll();
- virtual void acquire();
- virtual void release();
-};
-
-class Locker
-{
- public:
- Locker(Monitor& monitor_) : monitor(monitor_) { monitor.acquire(); }
- ~Locker() { monitor.release(); }
- private:
- Monitor& monitor;
-};
-}}
-
-
-#endif
diff --git a/cpp/src/qpid/concurrent/Thread.cpp b/cpp/src/qpid/concurrent/Thread.cpp
deleted file mode 100644
index 9bbc2f8131..0000000000
--- a/cpp/src/qpid/concurrent/Thread.cpp
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#include "qpid/concurrent/APRBase.h"
-#include "qpid/concurrent/Thread.h"
-#include "apr-1/apr_portable.h"
-
-using namespace qpid::concurrent;
-
-void* APR_THREAD_FUNC ExecRunnable(apr_thread_t* thread, void *data){
- ((Runnable*) data)->run();
- CHECK_APR_SUCCESS(apr_thread_exit(thread, APR_SUCCESS));
- return NULL;
-}
-
-Thread::Thread(apr_pool_t* _pool, Runnable* _runnable) : runnable(_runnable), pool(_pool), runner(0) {}
-
-Thread::~Thread(){
-}
-
-void Thread::start(){
- CHECK_APR_SUCCESS(apr_thread_create(&runner, NULL, ExecRunnable,(void*) runnable, pool));
-}
-
-void Thread::join(){
- apr_status_t status;
- if (runner) CHECK_APR_SUCCESS(apr_thread_join(&status, runner));
-}
-
-void Thread::interrupt(){
- if (runner) CHECK_APR_SUCCESS(apr_thread_exit(runner, APR_SUCCESS));
-}
-
-unsigned int qpid::concurrent::Thread::currentThread(){
- return apr_os_thread_current();
-}
diff --git a/cpp/src/qpid/concurrent/Thread.h b/cpp/src/qpid/concurrent/Thread.h
deleted file mode 100644
index d18bc153bf..0000000000
--- a/cpp/src/qpid/concurrent/Thread.h
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#ifndef _Thread_
-#define _Thread_
-
-#include "apr-1/apr_thread_proc.h"
-#include "qpid/concurrent/Thread.h"
-#include "qpid/concurrent/Runnable.h"
-#include "qpid/concurrent/Thread.h"
-
-namespace qpid {
-namespace concurrent {
-
- class Thread
- {
- const Runnable* runnable;
- apr_pool_t* pool;
- apr_thread_t* runner;
-
- public:
- Thread(apr_pool_t* pool, Runnable* runnable);
- virtual ~Thread();
- virtual void start();
- virtual void join();
- virtual void interrupt();
- static unsigned int currentThread();
- };
-
-}
-}
-
-
-#endif
diff --git a/cpp/src/qpid/concurrent/ThreadFactory.cpp b/cpp/src/qpid/concurrent/ThreadFactory.cpp
deleted file mode 100644
index b20f9f2b04..0000000000
--- a/cpp/src/qpid/concurrent/ThreadFactory.cpp
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#include "qpid/concurrent/APRBase.h"
-#include "qpid/concurrent/ThreadFactory.h"
-
-using namespace qpid::concurrent;
-
-ThreadFactory::ThreadFactory(){
- APRBase::increment();
- CHECK_APR_SUCCESS(apr_pool_create(&pool, NULL));
-}
-
-ThreadFactory::~ThreadFactory(){
- apr_pool_destroy(pool);
- APRBase::decrement();
-}
-
-Thread* ThreadFactory::create(Runnable* runnable){
- return new Thread(pool, runnable);
-}
diff --git a/cpp/src/qpid/concurrent/ThreadFactory.h b/cpp/src/qpid/concurrent/ThreadFactory.h
deleted file mode 100644
index 572419cae6..0000000000
--- a/cpp/src/qpid/concurrent/ThreadFactory.h
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#ifndef _ThreadFactory_
-#define _ThreadFactory_
-
-#include "apr-1/apr_thread_proc.h"
-
-#include "qpid/concurrent/Thread.h"
-#include "qpid/concurrent/Thread.h"
-#include "qpid/concurrent/ThreadFactory.h"
-#include "qpid/concurrent/Runnable.h"
-
-namespace qpid {
-namespace concurrent {
-
- class ThreadFactory
- {
- apr_pool_t* pool;
- public:
- ThreadFactory();
- virtual ~ThreadFactory();
- virtual Thread* create(Runnable* runnable);
- };
-
-}
-}
-
-
-#endif
diff --git a/cpp/src/qpid/concurrent/ThreadPool.cpp b/cpp/src/qpid/concurrent/ThreadPool.cpp
deleted file mode 100644
index 5da19745a7..0000000000
--- a/cpp/src/qpid/concurrent/ThreadPool.cpp
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#include "qpid/concurrent/ThreadFactory.h"
-#include "qpid/concurrent/ThreadPool.h"
-#include "qpid/QpidError.h"
-#include <iostream>
-
-using namespace qpid::concurrent;
-
-ThreadPool::ThreadPool(int _size) : deleteFactory(true), size(_size), factory(new ThreadFactory()), running(false){
- worker = new Worker(this);
-}
-
-ThreadPool::ThreadPool(int _size, ThreadFactory* _factory) : deleteFactory(false), size(_size), factory(_factory), running(false){
- worker = new Worker(this);
-}
-
-ThreadPool::~ThreadPool(){
- if(deleteFactory) delete factory;
-}
-
-void ThreadPool::addTask(Runnable* task){
- lock.acquire();
- tasks.push(task);
- lock.notifyAll();
- lock.release();
-}
-
-void ThreadPool::runTask(){
- lock.acquire();
- while(tasks.empty()){
- lock.wait();
- }
- Runnable* task = tasks.front();
- tasks.pop();
- lock.release();
- try{
- task->run();
- }catch(qpid::QpidError error){
- std::cout << "Error [" << error.code << "] " << error.msg << " (" << error.file << ":" << error.line << ")" << std::endl;
- }
-}
-
-void ThreadPool::start(){
- if(!running){
- running = true;
- for(int i = 0; i < size; i++){
- Thread* t = factory->create(worker);
- t->start();
- threads.push_back(t);
- }
- }
-}
-
-void ThreadPool::stop(){
- if(!running){
- running = false;
- lock.acquire();
- lock.notifyAll();
- lock.release();
- for(int i = 0; i < size; i++){
- threads[i]->join();
- delete threads[i];
- }
- }
-}
-
-
diff --git a/cpp/src/qpid/concurrent/ThreadPool.h b/cpp/src/qpid/concurrent/ThreadPool.h
deleted file mode 100644
index 11f0cc364f..0000000000
--- a/cpp/src/qpid/concurrent/ThreadPool.h
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#ifndef _ThreadPool_
-#define _ThreadPool_
-
-#include <queue>
-#include <vector>
-#include "qpid/concurrent/Monitor.h"
-#include "qpid/concurrent/Thread.h"
-#include "qpid/concurrent/ThreadFactory.h"
-#include "qpid/concurrent/ThreadPool.h"
-#include "qpid/concurrent/Runnable.h"
-
-namespace qpid {
-namespace concurrent {
-
- class ThreadPool
- {
- class Worker : public virtual Runnable{
- ThreadPool* pool;
- public:
- inline Worker(ThreadPool* _pool) : pool(_pool){}
- inline virtual void run(){
- while(pool->running){
- pool->runTask();
- }
- }
- };
- const bool deleteFactory;
- const int size;
- ThreadFactory* factory;
- Monitor lock;
- std::vector<Thread*> threads;
- std::queue<Runnable*> tasks;
- Worker* worker;
- volatile bool running;
-
- void runTask();
- public:
- ThreadPool(int size);
- ThreadPool(int size, ThreadFactory* factory);
- virtual void start();
- virtual void stop();
- virtual void addTask(Runnable* task);
- virtual ~ThreadPool();
- };
-
-}
-}
-
-
-#endif
diff --git a/cpp/src/qpid/io/APRPool.h b/cpp/src/qpid/concurrent/Time.h
index 063eedf1ee..ec64ce8a85 100644
--- a/cpp/src/qpid/io/APRPool.h
+++ b/cpp/src/qpid/concurrent/Time.h
@@ -1,5 +1,5 @@
-#ifndef _APRPool_
-#define _APRPool_
+#ifndef _concurrent_Time_h
+#define _concurrent_Time_h
/*
*
@@ -18,30 +18,35 @@
* limitations under the License.
*
*/
-#include <boost/noncopyable.hpp>
-#include <apr-1/apr_pools.h>
+
+#include <stdint.h>
namespace qpid {
-namespace io {
+namespace concurrent {
+
/**
- * Singleton APR memory pool.
+ * Time since the epoch.
*/
-class APRPool : private boost::noncopyable {
+class Time
+{
public:
- APRPool();
- ~APRPool();
+ static const int64_t NANOS = 1000000000;
+ static const int64_t MICROS = 1000000;
+ static const int64_t MILLIS = 1000;
+
+ static Time now();
+
+ Time(int64_t nsecs_) : ticks(nsecs_) {}
- /** Get singleton instance */
- static apr_pool_t* get();
+ int64_t nsecs() const { return ticks; }
+ int64_t usecs() const { return nsecs()/1000; }
+ int64_t msecs() const { return usecs()/1000; }
+ int64_t secs() const { return msecs()/1000; }
private:
- apr_pool_t* pool;
+ int64_t ticks;
};
}}
-
-
-
-
-#endif /*!_APRPool_*/
+#endif /*!_concurrent_Time_h*/
diff --git a/cpp/src/qpid/io/APRPool.cpp b/cpp/src/qpid/io/APRPool.cpp
deleted file mode 100644
index edd434f16c..0000000000
--- a/cpp/src/qpid/io/APRPool.cpp
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include "APRPool.h"
-#include "qpid/concurrent/APRBase.h"
-#include <boost/pool/singleton_pool.hpp>
-
-using namespace qpid::io;
-using namespace qpid::concurrent;
-
-APRPool::APRPool(){
- APRBase::increment();
- CHECK_APR_SUCCESS(apr_pool_create(&pool, NULL));
-}
-
-APRPool::~APRPool(){
- apr_pool_destroy(pool);
- APRBase::decrement();
-}
-
-apr_pool_t* APRPool::get() {
- return boost::details::pool::singleton_default<APRPool>::instance().pool;
-}
-
diff --git a/cpp/src/qpid/io/APRSocket.cpp b/cpp/src/qpid/io/APRSocket.cpp
deleted file mode 100644
index 824c376c3b..0000000000
--- a/cpp/src/qpid/io/APRSocket.cpp
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#include "qpid/concurrent/APRBase.h"
-#include "qpid/io/APRSocket.h"
-#include <assert.h>
-#include <iostream>
-
-using namespace qpid::io;
-using namespace qpid::framing;
-using namespace qpid::concurrent;
-
-APRSocket::APRSocket(apr_socket_t* _socket) : socket(_socket), closed(false){
-
-}
-
-void APRSocket::read(qpid::framing::Buffer& buffer){
- apr_size_t bytes;
- bytes = buffer.available();
- apr_status_t s = apr_socket_recv(socket, buffer.start(), &bytes);
- buffer.move(bytes);
- if(APR_STATUS_IS_TIMEUP(s)){
- //timed out
- }else if(APR_STATUS_IS_EOF(s)){
- close();
- }
-}
-
-void APRSocket::write(qpid::framing::Buffer& buffer){
- apr_size_t bytes;
- do{
- bytes = buffer.available();
- apr_socket_send(socket, buffer.start(), &bytes);
- buffer.move(bytes);
- }while(bytes > 0);
-}
-
-void APRSocket::close(){
- if(!closed){
- std::cout << "Closing socket " << socket << "@" << this << std::endl;
- CHECK_APR_SUCCESS(apr_socket_close(socket));
- closed = true;
- }
-}
-
-bool APRSocket::isOpen(){
- return !closed;
-}
-
-u_int8_t APRSocket::read(){
- char data[1];
- apr_size_t bytes = 1;
- apr_status_t s = apr_socket_recv(socket, data, &bytes);
- if(APR_STATUS_IS_EOF(s) || bytes == 0){
- return 0;
- }else{
- return *data;
- }
-}
-
-APRSocket::~APRSocket(){
-}
diff --git a/cpp/src/qpid/io/APRSocket.h b/cpp/src/qpid/io/APRSocket.h
deleted file mode 100644
index 0b6644dfb6..0000000000
--- a/cpp/src/qpid/io/APRSocket.h
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#ifndef _APRSocket_
-#define _APRSocket_
-
-#include "apr-1/apr_network_io.h"
-#include "qpid/framing/Buffer.h"
-
-namespace qpid {
-namespace io {
-
- class APRSocket
- {
- apr_socket_t* const socket;
- volatile bool closed;
- public:
- APRSocket(apr_socket_t* socket);
- void read(qpid::framing::Buffer& b);
- void write(qpid::framing::Buffer& b);
- void close();
- bool isOpen();
- u_int8_t read();
- ~APRSocket();
- };
-
-}
-}
-
-
-#endif
diff --git a/cpp/src/qpid/io/Acceptor.cpp b/cpp/src/qpid/io/Acceptor.cpp
deleted file mode 100644
index f95d9448cf..0000000000
--- a/cpp/src/qpid/io/Acceptor.cpp
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#include "qpid/io/Acceptor.h"
-#include "qpid/concurrent/APRBase.h"
-#include "APRPool.h"
-
-using namespace qpid::concurrent;
-using namespace qpid::io;
-
-Acceptor::Acceptor(int16_t port_, int backlog, int threads) :
- port(port_),
- processor(APRPool::get(), threads, 1000, 5000000)
-{
- apr_sockaddr_t* address;
- CHECK_APR_SUCCESS(apr_sockaddr_info_get(&address, APR_ANYADDR, APR_UNSPEC, port, APR_IPV4_ADDR_OK, APRPool::get()));
- CHECK_APR_SUCCESS(apr_socket_create(&socket, APR_INET, SOCK_STREAM, APR_PROTO_TCP, APRPool::get()));
- CHECK_APR_SUCCESS(apr_socket_opt_set(socket, APR_SO_REUSEADDR, 1));
- CHECK_APR_SUCCESS(apr_socket_bind(socket, address));
- CHECK_APR_SUCCESS(apr_socket_listen(socket, backlog));
-}
-
-int16_t Acceptor::getPort() const {
- apr_sockaddr_t* address;
- CHECK_APR_SUCCESS(apr_socket_addr_get(&address, APR_LOCAL, socket));
- return address->port;
-}
-
-void Acceptor::run(SessionHandlerFactory* factory) {
- running = true;
- processor.start();
- std::cout << "Listening on port " << getPort() << "..." << std::endl;
- while(running){
- apr_socket_t* client;
- apr_status_t status = apr_socket_accept(&client, socket, APRPool::get());
- if(status == APR_SUCCESS){
- //make this socket non-blocking:
- CHECK_APR_SUCCESS(apr_socket_timeout_set(client, 0));
- CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_SO_NONBLOCK, 1));
- CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_TCP_NODELAY, 1));
- CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_SO_SNDBUF, 32768));
- CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_SO_RCVBUF, 32768));
- LFSessionContext* session = new LFSessionContext(APRPool::get(), client, &processor, false);
- session->init(factory->create(session));
- }else{
- running = false;
- if(status != APR_EINTR){
- std::cout << "ERROR: " << get_desc(status) << std::endl;
- }
- }
- }
- shutdown();
-}
-
-void Acceptor::shutdown() {
- // TODO aconway 2006-10-12: Cleanup, this is not thread safe.
- if (running) {
- running = false;
- processor.stop();
- CHECK_APR_SUCCESS(apr_socket_close(socket));
- }
-}
-
-
diff --git a/cpp/src/qpid/io/Acceptor.h b/cpp/src/qpid/io/Acceptor.h
deleted file mode 100644
index bc189f7f6e..0000000000
--- a/cpp/src/qpid/io/Acceptor.h
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#ifndef _LFAcceptor_
-#define _LFAcceptor_
-
-#include "apr-1/apr_network_io.h"
-#include "apr-1/apr_poll.h"
-#include "apr-1/apr_time.h"
-
-#include "qpid/io/Acceptor.h"
-#include "qpid/concurrent/Monitor.h"
-#include "qpid/concurrent/ThreadFactory.h"
-#include "qpid/concurrent/ThreadPool.h"
-#include "qpid/io/LFProcessor.h"
-#include "qpid/io/LFSessionContext.h"
-#include "qpid/concurrent/Runnable.h"
-#include "qpid/io/SessionContext.h"
-#include "qpid/io/SessionHandlerFactory.h"
-#include "qpid/concurrent/Thread.h"
-#include <qpid/SharedObject.h>
-
-namespace qpid {
-namespace io {
-
-/** APR Acceptor. */
-class Acceptor : public qpid::SharedObject<Acceptor>
-{
- public:
- Acceptor(int16_t port, int backlog, int threads);
- virtual int16_t getPort() const;
- virtual void run(SessionHandlerFactory* factory);
- virtual void shutdown();
-
- private:
- int16_t port;
- LFProcessor processor;
- apr_socket_t* socket;
- volatile bool running;
-};
-
-}
-}
-
-
-#endif
diff --git a/cpp/src/qpid/io/Connector.cpp b/cpp/src/qpid/io/Connector.cpp
deleted file mode 100644
index ca487deb86..0000000000
--- a/cpp/src/qpid/io/Connector.cpp
+++ /dev/null
@@ -1,201 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#include <iostream>
-#include "qpid/concurrent/APRBase.h"
-#include "qpid/io/Connector.h"
-#include "qpid/concurrent/ThreadFactory.h"
-#include "qpid/QpidError.h"
-
-using namespace qpid::io;
-using namespace qpid::concurrent;
-using namespace qpid::framing;
-using qpid::QpidError;
-
-Connector::Connector(bool _debug, u_int32_t buffer_size) :
- debug(_debug),
- receive_buffer_size(buffer_size),
- send_buffer_size(buffer_size),
- closed(true),
- lastIn(0), lastOut(0),
- timeout(0),
- idleIn(0), idleOut(0),
- timeoutHandler(0),
- shutdownHandler(0),
- inbuf(receive_buffer_size),
- outbuf(send_buffer_size){
-
- APRBase::increment();
-
- CHECK_APR_SUCCESS(apr_pool_create(&pool, NULL));
- CHECK_APR_SUCCESS(apr_socket_create(&socket, APR_INET, SOCK_STREAM, APR_PROTO_TCP, pool));
-
- threadFactory = new ThreadFactory();
- writeLock = new Monitor();
-}
-
-Connector::~Connector(){
- delete receiver;
- delete writeLock;
- delete threadFactory;
- apr_pool_destroy(pool);
-
- APRBase::decrement();
-}
-
-void Connector::connect(const std::string& host, int port){
- apr_sockaddr_t* address;
- CHECK_APR_SUCCESS(apr_sockaddr_info_get(&address, host.c_str(), APR_UNSPEC, port, APR_IPV4_ADDR_OK, pool));
- CHECK_APR_SUCCESS(apr_socket_connect(socket, address));
- closed = false;
-
- receiver = threadFactory->create(this);
- receiver->start();
-}
-
-void Connector::init(ProtocolInitiation* header){
- writeBlock(header);
- delete header;
-}
-
-void Connector::close(){
- closed = true;
- CHECK_APR_SUCCESS(apr_socket_close(socket));
- receiver->join();
-}
-
-void Connector::setInputHandler(InputHandler* handler){
- input = handler;
-}
-
-void Connector::setShutdownHandler(ShutdownHandler* handler){
- shutdownHandler = handler;
-}
-
-OutputHandler* Connector::getOutputHandler(){
- return this;
-}
-
-void Connector::send(AMQFrame* frame){
- writeBlock(frame);
- if(debug) std::cout << "SENT: " << *frame << std::endl;
- delete frame;
-}
-
-void Connector::writeBlock(AMQDataBlock* data){
- writeLock->acquire();
- data->encode(outbuf);
-
- //transfer data to wire
- outbuf.flip();
- writeToSocket(outbuf.start(), outbuf.available());
- outbuf.clear();
- writeLock->release();
-}
-
-void Connector::writeToSocket(char* data, size_t available){
- apr_size_t bytes(available);
- apr_size_t written(0);
- while(written < available && !closed){
- apr_status_t status = apr_socket_send(socket, data + written, &bytes);
- if(status == APR_TIMEUP){
- std::cout << "Write request timed out." << std::endl;
- }
- if(bytes == 0){
- std::cout << "Write request wrote 0 bytes." << std::endl;
- }
- lastOut = apr_time_as_msec(apr_time_now());
- written += bytes;
- bytes = available - written;
- }
-}
-
-void Connector::checkIdle(apr_status_t status){
- if(timeoutHandler){
- apr_time_t now = apr_time_as_msec(apr_time_now());
- if(APR_STATUS_IS_TIMEUP(status)){
- if(idleIn && (now - lastIn > idleIn)){
- timeoutHandler->idleIn();
- }
- }else if(APR_STATUS_IS_EOF(status)){
- closed = true;
- CHECK_APR_SUCCESS(apr_socket_close(socket));
- if(shutdownHandler) shutdownHandler->shutdown();
- }else{
- lastIn = now;
- }
- if(idleOut && (now - lastOut > idleOut)){
- timeoutHandler->idleOut();
- }
- }
-}
-
-void Connector::setReadTimeout(u_int16_t t){
- idleIn = t * 1000;//t is in secs
- if(idleIn && (!timeout || idleIn < timeout)){
- timeout = idleIn;
- setSocketTimeout();
- }
-
-}
-
-void Connector::setWriteTimeout(u_int16_t t){
- idleOut = t * 1000;//t is in secs
- if(idleOut && (!timeout || idleOut < timeout)){
- timeout = idleOut;
- setSocketTimeout();
- }
-}
-
-void Connector::setSocketTimeout(){
- //interval is in microseconds, timeout in milliseconds
- //want the interval to be a bit shorter than the timeout, hence multiply
- //by 800 rather than 1000.
- apr_interval_time_t interval(timeout * 800);
- apr_socket_timeout_set(socket, interval);
-}
-
-void Connector::setTimeoutHandler(TimeoutHandler* handler){
- timeoutHandler = handler;
-}
-
-void Connector::run(){
- try{
- while(!closed){
- apr_size_t bytes(inbuf.available());
- if(bytes < 1){
- THROW_QPID_ERROR(INTERNAL_ERROR, "Frame exceeds buffer size.");
- }
- checkIdle(apr_socket_recv(socket, inbuf.start(), &bytes));
-
- if(bytes > 0){
- inbuf.move(bytes);
- inbuf.flip();//position = 0, limit = total data read
-
- AMQFrame frame;
- while(frame.decode(inbuf)){
- if(debug) std::cout << "RECV: " << frame << std::endl;
- input->received(&frame);
- }
- //need to compact buffer to preserve any 'extra' data
- inbuf.compact();
- }
- }
- }catch(QpidError error){
- std::cout << "Error [" << error.code << "] " << error.msg << " (" << error.file << ":" << error.line << ")" << std::endl;
- }
-}
diff --git a/cpp/src/qpid/io/Connector.h b/cpp/src/qpid/io/Connector.h
deleted file mode 100644
index 7c52f7e87b..0000000000
--- a/cpp/src/qpid/io/Connector.h
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#ifndef _Connector_
-#define _Connector_
-
-#include "apr-1/apr_network_io.h"
-#include "apr-1/apr_time.h"
-
-#include "qpid/framing/InputHandler.h"
-#include "qpid/framing/OutputHandler.h"
-#include "qpid/framing/InitiationHandler.h"
-#include "qpid/framing/ProtocolInitiation.h"
-#include "qpid/io/ShutdownHandler.h"
-#include "qpid/io/TimeoutHandler.h"
-#include "qpid/concurrent/Thread.h"
-#include "qpid/concurrent/ThreadFactory.h"
-#include "qpid/io/Connector.h"
-#include "qpid/concurrent/Monitor.h"
-
-namespace qpid {
-namespace io {
-
- class Connector : public virtual qpid::framing::OutputHandler,
- private virtual qpid::concurrent::Runnable
- {
- const bool debug;
- const int receive_buffer_size;
- const int send_buffer_size;
-
- bool closed;
-
- apr_time_t lastIn;
- apr_time_t lastOut;
- apr_interval_time_t timeout;
- u_int32_t idleIn;
- u_int32_t idleOut;
-
- TimeoutHandler* timeoutHandler;
- ShutdownHandler* shutdownHandler;
- qpid::framing::InputHandler* input;
- qpid::framing::InitiationHandler* initialiser;
- qpid::framing::OutputHandler* output;
-
- qpid::framing::Buffer inbuf;
- qpid::framing::Buffer outbuf;
-
- qpid::concurrent::Monitor* writeLock;
- qpid::concurrent::ThreadFactory* threadFactory;
- qpid::concurrent::Thread* receiver;
-
- apr_pool_t* pool;
- apr_socket_t* socket;
-
- void checkIdle(apr_status_t status);
- void writeBlock(qpid::framing::AMQDataBlock* data);
- void writeToSocket(char* data, size_t available);
- void setSocketTimeout();
-
- void run();
-
- public:
- Connector(bool debug = false, u_int32_t buffer_size = 1024);
- virtual ~Connector();
- virtual void connect(const std::string& host, int port);
- virtual void init(qpid::framing::ProtocolInitiation* header);
- virtual void close();
- virtual void setInputHandler(qpid::framing::InputHandler* handler);
- virtual void setTimeoutHandler(TimeoutHandler* handler);
- virtual void setShutdownHandler(ShutdownHandler* handler);
- virtual qpid::framing::OutputHandler* getOutputHandler();
- virtual void send(qpid::framing::AMQFrame* frame);
- virtual void setReadTimeout(u_int16_t timeout);
- virtual void setWriteTimeout(u_int16_t timeout);
- };
-
-}
-}
-
-
-#endif
diff --git a/cpp/src/qpid/io/LFProcessor.cpp b/cpp/src/qpid/io/LFProcessor.cpp
deleted file mode 100644
index dabbdbecae..0000000000
--- a/cpp/src/qpid/io/LFProcessor.cpp
+++ /dev/null
@@ -1,193 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#include "qpid/io/LFProcessor.h"
-#include "qpid/concurrent/APRBase.h"
-#include "qpid/io/LFSessionContext.h"
-#include "qpid/QpidError.h"
-#include <sstream>
-
-using namespace qpid::io;
-using namespace qpid::concurrent;
-using qpid::QpidError;
-
-// TODO aconway 2006-10-12: stopped is read outside locks.
-//
-
-LFProcessor::LFProcessor(apr_pool_t* pool, int _workers, int _size, int _timeout) :
- size(_size),
- timeout(_timeout),
- signalledCount(0),
- current(0),
- count(0),
- workerCount(_workers),
- hasLeader(false),
- workers(new Thread*[_workers]),
- stopped(false)
-{
-
- CHECK_APR_SUCCESS(apr_pollset_create(&pollset, size, pool, APR_POLLSET_THREADSAFE));
- //create & start the required number of threads
- for(int i = 0; i < workerCount; i++){
- workers[i] = factory.create(this);
- }
-}
-
-
-LFProcessor::~LFProcessor(){
- if (!stopped) stop();
- for(int i = 0; i < workerCount; i++){
- delete workers[i];
- }
- delete[] workers;
- CHECK_APR_SUCCESS(apr_pollset_destroy(pollset));
-}
-
-void LFProcessor::start(){
- for(int i = 0; i < workerCount; i++){
- workers[i]->start();
- }
-}
-
-void LFProcessor::add(const apr_pollfd_t* const fd){
- CHECK_APR_SUCCESS(apr_pollset_add(pollset, fd));
- countLock.acquire();
- sessions.push_back(reinterpret_cast<LFSessionContext*>(fd->client_data));
- count++;
- countLock.release();
-}
-
-void LFProcessor::remove(const apr_pollfd_t* const fd){
- CHECK_APR_SUCCESS(apr_pollset_remove(pollset, fd));
- countLock.acquire();
- sessions.erase(find(sessions.begin(), sessions.end(), reinterpret_cast<LFSessionContext*>(fd->client_data)));
- count--;
- countLock.release();
-}
-
-void LFProcessor::reactivate(const apr_pollfd_t* const fd){
- CHECK_APR_SUCCESS(apr_pollset_add(pollset, fd));
-}
-
-void LFProcessor::deactivate(const apr_pollfd_t* const fd){
- CHECK_APR_SUCCESS(apr_pollset_remove(pollset, fd));
-}
-
-void LFProcessor::update(const apr_pollfd_t* const fd){
- CHECK_APR_SUCCESS(apr_pollset_remove(pollset, fd));
- CHECK_APR_SUCCESS(apr_pollset_add(pollset, fd));
-}
-
-bool LFProcessor::full(){
- Locker locker(countLock);
- return count == size;
-}
-
-bool LFProcessor::empty(){
- Locker locker(countLock);
- return count == 0;
-}
-
-void LFProcessor::poll() {
- apr_status_t status = APR_EGENERAL;
- do{
- current = 0;
- if(!stopped){
- status = apr_pollset_poll(pollset, timeout, &signalledCount, &signalledFDs);
- }
- }while(status != APR_SUCCESS && !stopped);
-}
-
-void LFProcessor::run(){
- try{
- while(!stopped){
- leadLock.acquire();
- waitToLead();
- if(!stopped){
- const apr_pollfd_t* evt = getNextEvent();
- if(evt){
- LFSessionContext* session = reinterpret_cast<LFSessionContext*>(evt->client_data);
- session->startProcessing();
-
- relinquishLead();
- leadLock.release();
-
- //process event:
- if(evt->rtnevents & APR_POLLIN) session->read();
- if(evt->rtnevents & APR_POLLOUT) session->write();
-
- if(session->isClosed()){
- session->handleClose();
- countLock.acquire();
- sessions.erase(find(sessions.begin(), sessions.end(), session));
- count--;
- countLock.release();
- }else{
- session->stopProcessing();
- }
-
- }else{
- leadLock.release();
- }
- }else{
- leadLock.release();
- }
- }
- }catch(QpidError error){
- std::cout << "Error [" << error.code << "] " << error.msg << " (" << error.file << ":" << error.line << ")" << std::endl;
- }
-}
-
-void LFProcessor::waitToLead(){
- while(hasLeader && !stopped) leadLock.wait();
- hasLeader = !stopped;
-}
-
-void LFProcessor::relinquishLead(){
- hasLeader = false;
- leadLock.notify();
-}
-
-const apr_pollfd_t* LFProcessor::getNextEvent(){
- while(true){
- if(stopped){
- return 0;
- }else if(current < signalledCount){
- //use result of previous poll if one is available
- return signalledFDs + (current++);
- }else{
- //else poll to get new events
- poll();
- }
- }
-}
-
-void LFProcessor::stop(){
- stopped = true;
- leadLock.acquire();
- leadLock.notifyAll();
- leadLock.release();
-
- for(int i = 0; i < workerCount; i++){
- workers[i]->join();
- }
-
- for(iterator i = sessions.begin(); i < sessions.end(); i++){
- (*i)->shutdown();
- }
-}
-
diff --git a/cpp/src/qpid/io/LFProcessor.h b/cpp/src/qpid/io/LFProcessor.h
deleted file mode 100644
index 5b61f444af..0000000000
--- a/cpp/src/qpid/io/LFProcessor.h
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#ifndef _LFProcessor_
-#define _LFProcessor_
-
-#include "apr-1/apr_poll.h"
-#include <iostream>
-#include <vector>
-#include "qpid/concurrent/Monitor.h"
-#include "qpid/concurrent/ThreadFactory.h"
-#include "qpid/concurrent/Runnable.h"
-
-namespace qpid {
-namespace io {
-
- class LFSessionContext;
-
- /**
- * This class processes a poll set using the leaders-followers
- * pattern for thread synchronization: the leader will poll and on
- * the poll returning, it will remove a session, promote a
- * follower to leadership, then process the session.
- */
- class LFProcessor : private virtual qpid::concurrent::Runnable
- {
- typedef std::vector<LFSessionContext*>::iterator iterator;
-
- const int size;
- const apr_interval_time_t timeout;
- apr_pollset_t* pollset;
- int signalledCount;
- int current;
- const apr_pollfd_t* signalledFDs;
- int count;
- const int workerCount;
- bool hasLeader;
- qpid::concurrent::Thread** const workers;
- qpid::concurrent::Monitor leadLock;
- qpid::concurrent::Monitor countLock;
- qpid::concurrent::ThreadFactory factory;
- std::vector<LFSessionContext*> sessions;
- volatile bool stopped;
-
- const apr_pollfd_t* getNextEvent();
- void waitToLead();
- void relinquishLead();
- void poll();
- virtual void run();
-
- public:
- LFProcessor(apr_pool_t* pool, int workers, int size, int timeout);
- /**
- * Add the fd to the poll set. Relies on the client_data being
- * an instance of LFSessionContext.
- */
- void add(const apr_pollfd_t* const fd);
- /**
- * Remove the fd from the poll set.
- */
- void remove(const apr_pollfd_t* const fd);
- /**
- * Signal that the fd passed in, already part of the pollset,
- * has had its flags altered.
- */
- void update(const apr_pollfd_t* const fd);
- /**
- * Add an fd back to the poll set after deactivation.
- */
- void reactivate(const apr_pollfd_t* const fd);
- /**
- * Temporarily remove the fd from the poll set. Called when processing
- * is about to begin.
- */
- void deactivate(const apr_pollfd_t* const fd);
- /**
- * Indicates whether the capacity of this processor has been
- * reached (or whether it can still handle further fd's).
- */
- bool full();
- /**
- * Indicates whether there are any fd's registered.
- */
- bool empty();
- /**
- * Stop processing.
- */
- void stop();
- /**
- * Start processing.
- */
- void start();
- /**
- * Is processing stopped?
- */
- bool isStopped();
-
- ~LFProcessor();
- };
-
-}
-}
-
-
-#endif
diff --git a/cpp/src/qpid/io/LFSessionContext.cpp b/cpp/src/qpid/io/LFSessionContext.cpp
deleted file mode 100644
index ca1e6431a6..0000000000
--- a/cpp/src/qpid/io/LFSessionContext.cpp
+++ /dev/null
@@ -1,189 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#include "qpid/io/LFSessionContext.h"
-#include "qpid/concurrent/APRBase.h"
-#include "qpid/QpidError.h"
-#include <assert.h>
-
-using namespace qpid::concurrent;
-using namespace qpid::io;
-using namespace qpid::framing;
-
-LFSessionContext::LFSessionContext(apr_pool_t* _pool, apr_socket_t* _socket,
- LFProcessor* const _processor,
- bool _debug) :
- debug(_debug),
- socket(_socket),
- initiated(false),
- in(32768),
- out(32768),
- processor(_processor),
- processing(false),
- closing(false),
- reading(0),
- writing(0)
-{
-
- fd.p = _pool;
- fd.desc_type = APR_POLL_SOCKET;
- fd.reqevents = APR_POLLIN;
- fd.client_data = this;
- fd.desc.s = _socket;
-
- out.flip();
-}
-
-LFSessionContext::~LFSessionContext(){
-
-}
-
-void LFSessionContext::read(){
- assert(!reading); // No concurrent read.
- reading = Thread::currentThread();
-
- socket.read(in);
- in.flip();
- if(initiated){
- AMQFrame frame;
- while(frame.decode(in)){
- if(debug) log("RECV", &frame);
- handler->received(&frame);
- }
- }else{
- ProtocolInitiation protocolInit;
- if(protocolInit.decode(in)){
- handler->initiated(&protocolInit);
- initiated = true;
- if(debug) std::cout << "INIT [" << &socket << "]" << std::endl;
- }
- }
- in.compact();
-
- reading = 0;
-}
-
-void LFSessionContext::write(){
- assert(!writing); // No concurrent writes.
- writing = Thread::currentThread();
-
- bool done = isClosed();
- while(!done){
- if(out.available() > 0){
- socket.write(out);
- if(out.available() > 0){
- writing = 0;
-
- //incomplete write, leave flags to receive notification of readiness to write
- done = true;//finished processing for now, but write is still in progress
- }
- }else{
- //do we have any frames to write?
- writeLock.acquire();
- if(!framesToWrite.empty()){
- out.clear();
- bool encoded(false);
- AMQFrame* frame = framesToWrite.front();
- while(frame && out.available() >= frame->size()){
- encoded = true;
- frame->encode(out);
- if(debug) log("SENT", frame);
- delete frame;
- framesToWrite.pop();
- frame = framesToWrite.empty() ? 0 : framesToWrite.front();
- }
- if(!encoded) THROW_QPID_ERROR(FRAMING_ERROR, "Could not write frame, too large for buffer.");
- out.flip();
- }else{
- //reset flags, don't care about writability anymore
- fd.reqevents = APR_POLLIN;
- done = true;
-
- writing = 0;
-
- if(closing){
- socket.close();
- }
- }
- writeLock.release();
- }
- }
-}
-
-void LFSessionContext::send(AMQFrame* frame){
- writeLock.acquire();
- if(!closing){
- framesToWrite.push(frame);
- if(!(fd.reqevents & APR_POLLOUT)){
- fd.reqevents |= APR_POLLOUT;
- if(!processing){
- processor->update(&fd);
- }
- }
- }
- writeLock.release();
-}
-
-void LFSessionContext::startProcessing(){
- writeLock.acquire();
- processing = true;
- processor->deactivate(&fd);
- writeLock.release();
-}
-
-void LFSessionContext::stopProcessing(){
- writeLock.acquire();
- processor->reactivate(&fd);
- processing = false;
- writeLock.release();
-}
-
-void LFSessionContext::close(){
- closing = true;
- writeLock.acquire();
- if(!processing){
- //allow pending frames to be written to socket
- fd.reqevents = APR_POLLOUT;
- processor->update(&fd);
- }
- writeLock.release();
-}
-
-void LFSessionContext::handleClose(){
- handler->closed();
- std::cout << "Session closed [" << &socket << "]" << std::endl;
- delete handler;
- delete this;
-}
-
-void LFSessionContext::shutdown(){
- socket.close();
- handleClose();
-}
-
-void LFSessionContext::init(SessionHandler* _handler){
- handler = _handler;
- processor->add(&fd);
-}
-
-void LFSessionContext::log(const std::string& desc, AMQFrame* const frame){
- logLock.acquire();
- std::cout << desc << " [" << &socket << "]: " << *frame << std::endl;
- logLock.release();
-}
-
-Monitor LFSessionContext::logLock;
diff --git a/cpp/src/qpid/io/LFSessionContext.h b/cpp/src/qpid/io/LFSessionContext.h
deleted file mode 100644
index 8d30b54204..0000000000
--- a/cpp/src/qpid/io/LFSessionContext.h
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#ifndef _LFSessionContext_
-#define _LFSessionContext_
-
-#include <queue>
-
-#include "apr-1/apr_network_io.h"
-#include "apr-1/apr_poll.h"
-#include "apr-1/apr_time.h"
-
-#include "qpid/framing/AMQFrame.h"
-#include "qpid/concurrent/Monitor.h"
-#include "qpid/io/APRSocket.h"
-#include "qpid/framing/Buffer.h"
-#include "qpid/io/LFProcessor.h"
-#include "qpid/io/SessionContext.h"
-#include "qpid/io/SessionHandler.h"
-
-namespace qpid {
-namespace io {
-
-
- class LFSessionContext : public virtual SessionContext
- {
- const bool debug;
- APRSocket socket;
- bool initiated;
-
- qpid::framing::Buffer in;
- qpid::framing::Buffer out;
-
- SessionHandler* handler;
- LFProcessor* const processor;
-
- apr_pollfd_t fd;
-
- std::queue<qpid::framing::AMQFrame*> framesToWrite;
- qpid::concurrent::Monitor writeLock;
-
- bool processing;
- bool closing;
-
- //these are just for debug, as a crude way of detecting concurrent access
- volatile unsigned int reading;
- volatile unsigned int writing;
-
- static qpid::concurrent::Monitor logLock;
- void log(const std::string& desc, qpid::framing::AMQFrame* const frame);
-
- public:
- LFSessionContext(apr_pool_t* pool, apr_socket_t* socket,
- LFProcessor* const processor,
- bool debug = false);
- ~LFSessionContext();
- virtual void send(qpid::framing::AMQFrame* frame);
- virtual void close();
- void read();
- void write();
- void init(SessionHandler* handler);
- void startProcessing();
- void stopProcessing();
- void handleClose();
- void shutdown();
- inline apr_pollfd_t* const getFd(){ return &fd; }
- inline bool isClosed(){ return !socket.isOpen(); }
- };
-
-}
-}
-
-
-#endif