summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/concurrent
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/concurrent')
-rw-r--r--cpp/src/qpid/concurrent/APRMonitor.h48
-rw-r--r--cpp/src/qpid/concurrent/APRThread.h48
-rw-r--r--cpp/src/qpid/concurrent/APRThreadFactory.h44
-rw-r--r--cpp/src/qpid/concurrent/APRThreadPool.h67
-rw-r--r--cpp/src/qpid/concurrent/LMonitor.h44
-rw-r--r--cpp/src/qpid/concurrent/LThreadFactory.h37
-rw-r--r--cpp/src/qpid/concurrent/Monitor.cpp (renamed from cpp/src/qpid/concurrent/APRMonitor.cpp)18
-rw-r--r--cpp/src/qpid/concurrent/Monitor.h41
-rw-r--r--cpp/src/qpid/concurrent/MonitorImpl.h57
-rw-r--r--cpp/src/qpid/concurrent/Thread.cpp (renamed from cpp/src/qpid/concurrent/APRThread.cpp)14
-rw-r--r--cpp/src/qpid/concurrent/Thread.h19
-rw-r--r--cpp/src/qpid/concurrent/ThreadFactory.cpp (renamed from cpp/src/qpid/concurrent/APRThreadFactory.cpp)10
-rw-r--r--cpp/src/qpid/concurrent/ThreadFactory.h10
-rw-r--r--cpp/src/qpid/concurrent/ThreadFactoryImpl.h52
-rw-r--r--cpp/src/qpid/concurrent/ThreadPool.cpp (renamed from cpp/src/qpid/concurrent/APRThreadPool.cpp)18
-rw-r--r--cpp/src/qpid/concurrent/ThreadPool.h35
16 files changed, 103 insertions, 459 deletions
diff --git a/cpp/src/qpid/concurrent/APRMonitor.h b/cpp/src/qpid/concurrent/APRMonitor.h
deleted file mode 100644
index a396beab50..0000000000
--- a/cpp/src/qpid/concurrent/APRMonitor.h
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#ifndef _APRMonitor_
-#define _APRMonitor_
-
-#include "apr-1/apr_thread_mutex.h"
-#include "apr-1/apr_thread_cond.h"
-#include "qpid/concurrent/Monitor.h"
-
-namespace qpid {
-namespace concurrent {
-
- class APRMonitor : public virtual Monitor
- {
- apr_pool_t* pool;
- apr_thread_mutex_t* mutex;
- apr_thread_cond_t* condition;
-
- public:
- APRMonitor();
- virtual ~APRMonitor();
- virtual void wait();
- virtual void wait(u_int64_t time);
- virtual void notify();
- virtual void notifyAll();
- virtual void acquire();
- virtual void release();
- };
-}
-}
-
-
-#endif
diff --git a/cpp/src/qpid/concurrent/APRThread.h b/cpp/src/qpid/concurrent/APRThread.h
deleted file mode 100644
index 6328765a06..0000000000
--- a/cpp/src/qpid/concurrent/APRThread.h
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#ifndef _APRThread_
-#define _APRThread_
-
-#include "apr-1/apr_thread_proc.h"
-#include "qpid/concurrent/APRThread.h"
-#include "qpid/concurrent/Runnable.h"
-#include "qpid/concurrent/Thread.h"
-
-namespace qpid {
-namespace concurrent {
-
- class APRThread : public Thread
- {
- const Runnable* runnable;
- apr_pool_t* pool;
- apr_thread_t* runner;
-
- public:
- APRThread(apr_pool_t* pool, Runnable* runnable);
- virtual ~APRThread();
- virtual void start();
- virtual void join();
- virtual void interrupt();
- static unsigned int currentThread();
- };
-
-}
-}
-
-
-#endif
diff --git a/cpp/src/qpid/concurrent/APRThreadFactory.h b/cpp/src/qpid/concurrent/APRThreadFactory.h
deleted file mode 100644
index 40e96fc2d1..0000000000
--- a/cpp/src/qpid/concurrent/APRThreadFactory.h
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#ifndef _APRThreadFactory_
-#define _APRThreadFactory_
-
-#include "apr-1/apr_thread_proc.h"
-
-#include "qpid/concurrent/APRThread.h"
-#include "qpid/concurrent/Thread.h"
-#include "qpid/concurrent/ThreadFactory.h"
-#include "qpid/concurrent/Runnable.h"
-
-namespace qpid {
-namespace concurrent {
-
- class APRThreadFactory : public virtual ThreadFactory
- {
- apr_pool_t* pool;
- public:
- APRThreadFactory();
- virtual ~APRThreadFactory();
- virtual Thread* create(Runnable* runnable);
- };
-
-}
-}
-
-
-#endif
diff --git a/cpp/src/qpid/concurrent/APRThreadPool.h b/cpp/src/qpid/concurrent/APRThreadPool.h
deleted file mode 100644
index cab5bcc9ce..0000000000
--- a/cpp/src/qpid/concurrent/APRThreadPool.h
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#ifndef _APRThreadPool_
-#define _APRThreadPool_
-
-#include <queue>
-#include <vector>
-#include "qpid/concurrent/APRMonitor.h"
-#include "qpid/concurrent/Thread.h"
-#include "qpid/concurrent/ThreadFactory.h"
-#include "qpid/concurrent/ThreadPool.h"
-#include "qpid/concurrent/Runnable.h"
-
-namespace qpid {
-namespace concurrent {
-
- class APRThreadPool : public virtual ThreadPool
- {
- class Worker : public virtual Runnable{
- APRThreadPool* pool;
- public:
- inline Worker(APRThreadPool* _pool) : pool(_pool){}
- inline virtual void run(){
- while(pool->running){
- pool->runTask();
- }
- }
- };
- const bool deleteFactory;
- const int size;
- ThreadFactory* factory;
- APRMonitor lock;
- std::vector<Thread*> threads;
- std::queue<Runnable*> tasks;
- Worker* worker;
- volatile bool running;
-
- void runTask();
- public:
- APRThreadPool(int size);
- APRThreadPool(int size, ThreadFactory* factory);
- virtual void start();
- virtual void stop();
- virtual void addTask(Runnable* task);
- virtual ~APRThreadPool();
- };
-
-}
-}
-
-
-#endif
diff --git a/cpp/src/qpid/concurrent/LMonitor.h b/cpp/src/qpid/concurrent/LMonitor.h
deleted file mode 100644
index 70e99b9807..0000000000
--- a/cpp/src/qpid/concurrent/LMonitor.h
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#ifndef _LMonitor_
-#define _LMonitor_
-
-/* Native Linux Monitor - Based of Kernel patch 19/20 */
-
-#include "qpid/concurrent/Monitor.h"
-
-namespace qpid {
-namespace concurrent {
-
- class LMonitor : public virtual Monitor
- {
-
- public:
- LMonitor();
- virtual ~LMonitor();
- virtual void wait();
- virtual void notify();
- virtual void notifyAll();
- virtual void acquire();
- virtual void release();
- };
-}
-}
-
-
-#endif
diff --git a/cpp/src/qpid/concurrent/LThreadFactory.h b/cpp/src/qpid/concurrent/LThreadFactory.h
deleted file mode 100644
index 4a573d1bd1..0000000000
--- a/cpp/src/qpid/concurrent/LThreadFactory.h
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#ifndef _LAPRThreadFactory_
-#define _LAPRThreadFactory_
-
-
-namespace qpid {
-namespace concurrent {
-
- class LThreadFactory
- {
- public:
- LThreadFactory();
- virtual ~LThreadFactory();
- virtual Thread* create(Runnable* runnable);
- };
-
-}
-}
-
-
-#endif
diff --git a/cpp/src/qpid/concurrent/APRMonitor.cpp b/cpp/src/qpid/concurrent/Monitor.cpp
index cc5eda800f..ae68cf8751 100644
--- a/cpp/src/qpid/concurrent/APRMonitor.cpp
+++ b/cpp/src/qpid/concurrent/Monitor.cpp
@@ -16,45 +16,45 @@
*
*/
#include "qpid/concurrent/APRBase.h"
-#include "qpid/concurrent/APRMonitor.h"
+#include "qpid/concurrent/Monitor.h"
#include <iostream>
-qpid::concurrent::APRMonitor::APRMonitor(){
+qpid::concurrent::Monitor::Monitor(){
APRBase::increment();
CHECK_APR_SUCCESS(apr_pool_create(&pool, NULL));
CHECK_APR_SUCCESS(apr_thread_mutex_create(&mutex, APR_THREAD_MUTEX_NESTED, pool));
CHECK_APR_SUCCESS(apr_thread_cond_create(&condition, pool));
}
-qpid::concurrent::APRMonitor::~APRMonitor(){
+qpid::concurrent::Monitor::~Monitor(){
CHECK_APR_SUCCESS(apr_thread_cond_destroy(condition));
CHECK_APR_SUCCESS(apr_thread_mutex_destroy(mutex));
apr_pool_destroy(pool);
APRBase::decrement();
}
-void qpid::concurrent::APRMonitor::wait(){
+void qpid::concurrent::Monitor::wait(){
CHECK_APR_SUCCESS(apr_thread_cond_wait(condition, mutex));
}
-void qpid::concurrent::APRMonitor::wait(u_int64_t time){
+void qpid::concurrent::Monitor::wait(u_int64_t time){
apr_status_t status = apr_thread_cond_timedwait(condition, mutex, time * 1000);
if(!status == APR_TIMEUP) CHECK_APR_SUCCESS(status);
}
-void qpid::concurrent::APRMonitor::notify(){
+void qpid::concurrent::Monitor::notify(){
CHECK_APR_SUCCESS(apr_thread_cond_signal(condition));
}
-void qpid::concurrent::APRMonitor::notifyAll(){
+void qpid::concurrent::Monitor::notifyAll(){
CHECK_APR_SUCCESS(apr_thread_cond_broadcast(condition));
}
-void qpid::concurrent::APRMonitor::acquire(){
+void qpid::concurrent::Monitor::acquire(){
CHECK_APR_SUCCESS(apr_thread_mutex_lock(mutex));
}
-void qpid::concurrent::APRMonitor::release(){
+void qpid::concurrent::Monitor::release(){
CHECK_APR_SUCCESS(apr_thread_mutex_unlock(mutex));
}
diff --git a/cpp/src/qpid/concurrent/Monitor.h b/cpp/src/qpid/concurrent/Monitor.h
index 42e88c0a48..a2777cb2f1 100644
--- a/cpp/src/qpid/concurrent/Monitor.h
+++ b/cpp/src/qpid/concurrent/Monitor.h
@@ -18,42 +18,39 @@
#ifndef _Monitor_
#define _Monitor_
-#include "qpid/framing/amqp_types.h"
+#include "apr-1/apr_thread_mutex.h"
+#include "apr-1/apr_thread_cond.h"
+#include "qpid/concurrent/Monitor.h"
namespace qpid {
namespace concurrent {
class Monitor
{
+ apr_pool_t* pool;
+ apr_thread_mutex_t* mutex;
+ apr_thread_cond_t* condition;
+
public:
- virtual ~Monitor(){}
- virtual void wait() = 0;
- virtual void wait(u_int64_t time) = 0;
- virtual void notify() = 0;
- virtual void notifyAll() = 0;
- virtual void acquire() = 0;
- virtual void release() = 0;
+ Monitor();
+ virtual ~Monitor();
+ virtual void wait();
+ virtual void wait(u_int64_t time);
+ virtual void notify();
+ virtual void notifyAll();
+ virtual void acquire();
+ virtual void release();
};
-/**
- * Scoped locker for a monitor.
- */
class Locker
{
public:
- Locker(Monitor& lock_) : lock(lock_) { lock.acquire(); }
- ~Locker() { lock.release(); }
-
+ Locker(Monitor& monitor_) : monitor(monitor_) { monitor.acquire(); }
+ ~Locker() { monitor.release(); }
private:
- Monitor& lock;
-
- // private and unimplemented to prevent copying
- Locker(const Locker&);
- void operator=(const Locker&);
+ Monitor& monitor;
};
-
-}
-}
+}}
#endif
diff --git a/cpp/src/qpid/concurrent/MonitorImpl.h b/cpp/src/qpid/concurrent/MonitorImpl.h
deleted file mode 100644
index 258ad140b3..0000000000
--- a/cpp/src/qpid/concurrent/MonitorImpl.h
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-
-#ifndef _MonitorImpl_
-#define _MonitorImpl_
-
-#ifdef _USE_APR_IO_
-#include "qpid/concurrent/APRMonitor.h"
-#else /* use POSIX Monitor */
-#include "qpid/concurrent/LMonitor.h"
-#endif
-
-
-namespace qpid {
-namespace concurrent {
-
-#ifdef _USE_APR_IO_
- class MonitorImpl : public virtual APRMonitor
- {
-
- public:
- MonitorImpl() : APRMonitor(){};
- virtual ~MonitorImpl(){};
-
- };
-#else
- class MonitorImpl : public virtual LMonitor
- {
-
- public:
- MonitorImpl() : LMonitor(){};
- virtual ~MonitorImpl(){};
-
- };
-#endif
-
-}
-}
-
-
-#endif
diff --git a/cpp/src/qpid/concurrent/APRThread.cpp b/cpp/src/qpid/concurrent/Thread.cpp
index d4d073cac6..9bbc2f8131 100644
--- a/cpp/src/qpid/concurrent/APRThread.cpp
+++ b/cpp/src/qpid/concurrent/Thread.cpp
@@ -16,7 +16,7 @@
*
*/
#include "qpid/concurrent/APRBase.h"
-#include "qpid/concurrent/APRThread.h"
+#include "qpid/concurrent/Thread.h"
#include "apr-1/apr_portable.h"
using namespace qpid::concurrent;
@@ -27,24 +27,24 @@ void* APR_THREAD_FUNC ExecRunnable(apr_thread_t* thread, void *data){
return NULL;
}
-APRThread::APRThread(apr_pool_t* _pool, Runnable* _runnable) : runnable(_runnable), pool(_pool), runner(0) {}
+Thread::Thread(apr_pool_t* _pool, Runnable* _runnable) : runnable(_runnable), pool(_pool), runner(0) {}
-APRThread::~APRThread(){
+Thread::~Thread(){
}
-void APRThread::start(){
+void Thread::start(){
CHECK_APR_SUCCESS(apr_thread_create(&runner, NULL, ExecRunnable,(void*) runnable, pool));
}
-void APRThread::join(){
+void Thread::join(){
apr_status_t status;
if (runner) CHECK_APR_SUCCESS(apr_thread_join(&status, runner));
}
-void APRThread::interrupt(){
+void Thread::interrupt(){
if (runner) CHECK_APR_SUCCESS(apr_thread_exit(runner, APR_SUCCESS));
}
-unsigned int qpid::concurrent::APRThread::currentThread(){
+unsigned int qpid::concurrent::Thread::currentThread(){
return apr_os_thread_current();
}
diff --git a/cpp/src/qpid/concurrent/Thread.h b/cpp/src/qpid/concurrent/Thread.h
index 6bd2a379ce..d18bc153bf 100644
--- a/cpp/src/qpid/concurrent/Thread.h
+++ b/cpp/src/qpid/concurrent/Thread.h
@@ -18,16 +18,27 @@
#ifndef _Thread_
#define _Thread_
+#include "apr-1/apr_thread_proc.h"
+#include "qpid/concurrent/Thread.h"
+#include "qpid/concurrent/Runnable.h"
+#include "qpid/concurrent/Thread.h"
+
namespace qpid {
namespace concurrent {
class Thread
{
+ const Runnable* runnable;
+ apr_pool_t* pool;
+ apr_thread_t* runner;
+
public:
- virtual ~Thread(){}
- virtual void start() = 0;
- virtual void join() = 0;
- virtual void interrupt() = 0;
+ Thread(apr_pool_t* pool, Runnable* runnable);
+ virtual ~Thread();
+ virtual void start();
+ virtual void join();
+ virtual void interrupt();
+ static unsigned int currentThread();
};
}
diff --git a/cpp/src/qpid/concurrent/APRThreadFactory.cpp b/cpp/src/qpid/concurrent/ThreadFactory.cpp
index 1c99a3da33..b20f9f2b04 100644
--- a/cpp/src/qpid/concurrent/APRThreadFactory.cpp
+++ b/cpp/src/qpid/concurrent/ThreadFactory.cpp
@@ -16,20 +16,20 @@
*
*/
#include "qpid/concurrent/APRBase.h"
-#include "qpid/concurrent/APRThreadFactory.h"
+#include "qpid/concurrent/ThreadFactory.h"
using namespace qpid::concurrent;
-APRThreadFactory::APRThreadFactory(){
+ThreadFactory::ThreadFactory(){
APRBase::increment();
CHECK_APR_SUCCESS(apr_pool_create(&pool, NULL));
}
-APRThreadFactory::~APRThreadFactory(){
+ThreadFactory::~ThreadFactory(){
apr_pool_destroy(pool);
APRBase::decrement();
}
-Thread* APRThreadFactory::create(Runnable* runnable){
- return new APRThread(pool, runnable);
+Thread* ThreadFactory::create(Runnable* runnable){
+ return new Thread(pool, runnable);
}
diff --git a/cpp/src/qpid/concurrent/ThreadFactory.h b/cpp/src/qpid/concurrent/ThreadFactory.h
index 60c8ad2556..572419cae6 100644
--- a/cpp/src/qpid/concurrent/ThreadFactory.h
+++ b/cpp/src/qpid/concurrent/ThreadFactory.h
@@ -18,7 +18,11 @@
#ifndef _ThreadFactory_
#define _ThreadFactory_
+#include "apr-1/apr_thread_proc.h"
+
+#include "qpid/concurrent/Thread.h"
#include "qpid/concurrent/Thread.h"
+#include "qpid/concurrent/ThreadFactory.h"
#include "qpid/concurrent/Runnable.h"
namespace qpid {
@@ -26,9 +30,11 @@ namespace concurrent {
class ThreadFactory
{
+ apr_pool_t* pool;
public:
- virtual ~ThreadFactory(){}
- virtual Thread* create(Runnable* runnable) = 0;
+ ThreadFactory();
+ virtual ~ThreadFactory();
+ virtual Thread* create(Runnable* runnable);
};
}
diff --git a/cpp/src/qpid/concurrent/ThreadFactoryImpl.h b/cpp/src/qpid/concurrent/ThreadFactoryImpl.h
deleted file mode 100644
index 352b77ac21..0000000000
--- a/cpp/src/qpid/concurrent/ThreadFactoryImpl.h
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#ifndef _ThreadFactoryImpl_
-#define _ThreadFactoryImpl_
-
-
-#ifdef _USE_APR_IO_
-#include "qpid/concurrent/APRThreadFactory.h"
-#else
-#include "qpid/concurrent/LThreadFactory.h"
-#endif
-
-
-namespace qpid {
-namespace concurrent {
-
-
-#ifdef _USE_APR_IO_
- class ThreadFactoryImpl : public virtual APRThreadFactory
- {
- public:
- ThreadFactoryImpl(): APRThreadFactory() {};
- virtual ~ThreadFactoryImpl() {};
- };
-#else
- class ThreadFactoryImpl : public virtual LThreadFactory
- {
- public:
- ThreadFactoryImpl(): LThreadFactory() {};
- virtual ~ThreadFactoryImpl() {};
- };
-#endif
-}
-}
-
-
-#endif
diff --git a/cpp/src/qpid/concurrent/APRThreadPool.cpp b/cpp/src/qpid/concurrent/ThreadPool.cpp
index 3222c71b0c..5da19745a7 100644
--- a/cpp/src/qpid/concurrent/APRThreadPool.cpp
+++ b/cpp/src/qpid/concurrent/ThreadPool.cpp
@@ -15,33 +15,33 @@
* limitations under the License.
*
*/
-#include "qpid/concurrent/APRThreadFactory.h"
-#include "qpid/concurrent/APRThreadPool.h"
+#include "qpid/concurrent/ThreadFactory.h"
+#include "qpid/concurrent/ThreadPool.h"
#include "qpid/QpidError.h"
#include <iostream>
using namespace qpid::concurrent;
-APRThreadPool::APRThreadPool(int _size) : deleteFactory(true), size(_size), factory(new APRThreadFactory()), running(false){
+ThreadPool::ThreadPool(int _size) : deleteFactory(true), size(_size), factory(new ThreadFactory()), running(false){
worker = new Worker(this);
}
-APRThreadPool::APRThreadPool(int _size, ThreadFactory* _factory) : deleteFactory(false), size(_size), factory(_factory), running(false){
+ThreadPool::ThreadPool(int _size, ThreadFactory* _factory) : deleteFactory(false), size(_size), factory(_factory), running(false){
worker = new Worker(this);
}
-APRThreadPool::~APRThreadPool(){
+ThreadPool::~ThreadPool(){
if(deleteFactory) delete factory;
}
-void APRThreadPool::addTask(Runnable* task){
+void ThreadPool::addTask(Runnable* task){
lock.acquire();
tasks.push(task);
lock.notifyAll();
lock.release();
}
-void APRThreadPool::runTask(){
+void ThreadPool::runTask(){
lock.acquire();
while(tasks.empty()){
lock.wait();
@@ -56,7 +56,7 @@ void APRThreadPool::runTask(){
}
}
-void APRThreadPool::start(){
+void ThreadPool::start(){
if(!running){
running = true;
for(int i = 0; i < size; i++){
@@ -67,7 +67,7 @@ void APRThreadPool::start(){
}
}
-void APRThreadPool::stop(){
+void ThreadPool::stop(){
if(!running){
running = false;
lock.acquire();
diff --git a/cpp/src/qpid/concurrent/ThreadPool.h b/cpp/src/qpid/concurrent/ThreadPool.h
index 925faa76de..11f0cc364f 100644
--- a/cpp/src/qpid/concurrent/ThreadPool.h
+++ b/cpp/src/qpid/concurrent/ThreadPool.h
@@ -18,7 +18,12 @@
#ifndef _ThreadPool_
#define _ThreadPool_
+#include <queue>
+#include <vector>
+#include "qpid/concurrent/Monitor.h"
#include "qpid/concurrent/Thread.h"
+#include "qpid/concurrent/ThreadFactory.h"
+#include "qpid/concurrent/ThreadPool.h"
#include "qpid/concurrent/Runnable.h"
namespace qpid {
@@ -26,11 +31,33 @@ namespace concurrent {
class ThreadPool
{
+ class Worker : public virtual Runnable{
+ ThreadPool* pool;
+ public:
+ inline Worker(ThreadPool* _pool) : pool(_pool){}
+ inline virtual void run(){
+ while(pool->running){
+ pool->runTask();
+ }
+ }
+ };
+ const bool deleteFactory;
+ const int size;
+ ThreadFactory* factory;
+ Monitor lock;
+ std::vector<Thread*> threads;
+ std::queue<Runnable*> tasks;
+ Worker* worker;
+ volatile bool running;
+
+ void runTask();
public:
- virtual void start() = 0;
- virtual void stop() = 0;
- virtual void addTask(Runnable* runnable) = 0;
- virtual ~ThreadPool(){}
+ ThreadPool(int size);
+ ThreadPool(int size, ThreadFactory* factory);
+ virtual void start();
+ virtual void stop();
+ virtual void addTask(Runnable* task);
+ virtual ~ThreadPool();
};
}