summaryrefslogtreecommitdiff
path: root/cpp/common/concurrent
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/common/concurrent')
-rw-r--r--cpp/common/concurrent/inc/APRBase.h63
-rw-r--r--cpp/common/concurrent/inc/APRMonitor.h48
-rw-r--r--cpp/common/concurrent/inc/APRThread.h48
-rw-r--r--cpp/common/concurrent/inc/APRThreadFactory.h44
-rw-r--r--cpp/common/concurrent/inc/APRThreadPool.h67
-rw-r--r--cpp/common/concurrent/inc/LMonitor.h44
-rw-r--r--cpp/common/concurrent/inc/LThreadFactory.h37
-rw-r--r--cpp/common/concurrent/inc/LockedQueue.h68
-rw-r--r--cpp/common/concurrent/inc/Monitor.h59
-rw-r--r--cpp/common/concurrent/inc/MonitorImpl.h57
-rw-r--r--cpp/common/concurrent/inc/Runnable.h34
-rw-r--r--cpp/common/concurrent/inc/TaskQueue.h200
-rw-r--r--cpp/common/concurrent/inc/Thread.h37
-rw-r--r--cpp/common/concurrent/inc/ThreadFactory.h38
-rw-r--r--cpp/common/concurrent/inc/ThreadFactoryImpl.h52
-rw-r--r--cpp/common/concurrent/inc/ThreadPool.h40
-rw-r--r--cpp/common/concurrent/src/APRBase.cpp97
-rw-r--r--cpp/common/concurrent/src/APRMonitor.cpp60
-rw-r--r--cpp/common/concurrent/src/APRThread.cpp50
-rw-r--r--cpp/common/concurrent/src/APRThreadFactory.cpp35
-rw-r--r--cpp/common/concurrent/src/APRThreadPool.cpp85
21 files changed, 1263 insertions, 0 deletions
diff --git a/cpp/common/concurrent/inc/APRBase.h b/cpp/common/concurrent/inc/APRBase.h
new file mode 100644
index 0000000000..e0b526faa1
--- /dev/null
+++ b/cpp/common/concurrent/inc/APRBase.h
@@ -0,0 +1,63 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _APRBase_
+#define _APRBase_
+
+#include <string>
+#include "apr_thread_mutex.h"
+#include "apr_errno.h"
+
+namespace qpid {
+namespace concurrent {
+
+ /**
+ * Use of APR libraries necessitates explicit init and terminate
+ * calls. Any class using APR libs should obtain the reference to
+ * this singleton and increment on construction, decrement on
+ * destruction. This class can then correctly initialise apr
+ * before the first use and terminate after the last use.
+ */
+ class APRBase{
+ static APRBase* instance;
+ apr_pool_t* pool;
+ apr_thread_mutex_t* mutex;
+ int count;
+
+ APRBase();
+ ~APRBase();
+ static APRBase* getInstance();
+ bool _increment();
+ void _decrement();
+ public:
+ static void increment();
+ static void decrement();
+ };
+
+ //this is also a convenient place for a helper function for error checking:
+ void check(apr_status_t status, const std::string& file, const int line);
+ std::string get_desc(apr_status_t status);
+
+#define CHECK_APR_SUCCESS(A) check(A, __FILE__, __LINE__);
+
+}
+}
+
+
+
+
+#endif
diff --git a/cpp/common/concurrent/inc/APRMonitor.h b/cpp/common/concurrent/inc/APRMonitor.h
new file mode 100644
index 0000000000..bf72596564
--- /dev/null
+++ b/cpp/common/concurrent/inc/APRMonitor.h
@@ -0,0 +1,48 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _APRMonitor_
+#define _APRMonitor_
+
+#include "apr_thread_mutex.h"
+#include "apr_thread_cond.h"
+#include "Monitor.h"
+
+namespace qpid {
+namespace concurrent {
+
+ class APRMonitor : public virtual Monitor
+ {
+ apr_pool_t* pool;
+ apr_thread_mutex_t* mutex;
+ apr_thread_cond_t* condition;
+
+ public:
+ APRMonitor();
+ virtual ~APRMonitor();
+ virtual void wait();
+ virtual void wait(u_int64_t time);
+ virtual void notify();
+ virtual void notifyAll();
+ virtual void acquire();
+ virtual void release();
+ };
+}
+}
+
+
+#endif
diff --git a/cpp/common/concurrent/inc/APRThread.h b/cpp/common/concurrent/inc/APRThread.h
new file mode 100644
index 0000000000..d5034ce3b7
--- /dev/null
+++ b/cpp/common/concurrent/inc/APRThread.h
@@ -0,0 +1,48 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _APRThread_
+#define _APRThread_
+
+#include "apr_thread_proc.h"
+#include "APRThread.h"
+#include "Runnable.h"
+#include "Thread.h"
+
+namespace qpid {
+namespace concurrent {
+
+ class APRThread : public virtual Thread
+ {
+ const Runnable* runnable;
+ apr_pool_t* pool;
+ apr_thread_t* runner;
+
+ public:
+ APRThread(apr_pool_t* pool, Runnable* runnable);
+ virtual ~APRThread();
+ virtual void start();
+ virtual void join();
+ virtual void interrupt();
+ static unsigned int currentThread();
+ };
+
+}
+}
+
+
+#endif
diff --git a/cpp/common/concurrent/inc/APRThreadFactory.h b/cpp/common/concurrent/inc/APRThreadFactory.h
new file mode 100644
index 0000000000..87b240025d
--- /dev/null
+++ b/cpp/common/concurrent/inc/APRThreadFactory.h
@@ -0,0 +1,44 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _APRThreadFactory_
+#define _APRThreadFactory_
+
+#include "apr_thread_proc.h"
+
+#include "APRThread.h"
+#include "Thread.h"
+#include "ThreadFactory.h"
+#include "Runnable.h"
+
+namespace qpid {
+namespace concurrent {
+
+ class APRThreadFactory : public virtual ThreadFactory
+ {
+ apr_pool_t* pool;
+ public:
+ APRThreadFactory();
+ virtual ~APRThreadFactory();
+ virtual Thread* create(Runnable* runnable);
+ };
+
+}
+}
+
+
+#endif
diff --git a/cpp/common/concurrent/inc/APRThreadPool.h b/cpp/common/concurrent/inc/APRThreadPool.h
new file mode 100644
index 0000000000..cf6d30774c
--- /dev/null
+++ b/cpp/common/concurrent/inc/APRThreadPool.h
@@ -0,0 +1,67 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _APRThreadPool_
+#define _APRThreadPool_
+
+#include <queue>
+#include <vector>
+#include "APRMonitor.h"
+#include "Thread.h"
+#include "ThreadFactory.h"
+#include "ThreadPool.h"
+#include "Runnable.h"
+
+namespace qpid {
+namespace concurrent {
+
+ class APRThreadPool : public virtual ThreadPool
+ {
+ class Worker : public virtual Runnable{
+ APRThreadPool* pool;
+ public:
+ inline Worker(APRThreadPool* _pool) : pool(_pool){}
+ inline virtual void run(){
+ while(pool->running){
+ pool->runTask();
+ }
+ }
+ };
+ const bool deleteFactory;
+ const int size;
+ ThreadFactory* factory;
+ APRMonitor lock;
+ std::vector<Thread*> threads;
+ std::queue<Runnable*> tasks;
+ Worker* worker;
+ volatile bool running;
+
+ void runTask();
+ public:
+ APRThreadPool(int size);
+ APRThreadPool(int size, ThreadFactory* factory);
+ virtual void start();
+ virtual void stop();
+ virtual void addTask(Runnable* task);
+ virtual ~APRThreadPool();
+ };
+
+}
+}
+
+
+#endif
diff --git a/cpp/common/concurrent/inc/LMonitor.h b/cpp/common/concurrent/inc/LMonitor.h
new file mode 100644
index 0000000000..8e2569921d
--- /dev/null
+++ b/cpp/common/concurrent/inc/LMonitor.h
@@ -0,0 +1,44 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _LMonitor_
+#define _LMonitor_
+
+/* Native Linux Monitor - Based of Kernel patch 19/20 */
+
+#include "Monitor.h"
+
+namespace qpid {
+namespace concurrent {
+
+ class LMonitor : public virtual Monitor
+ {
+
+ public:
+ LMonitor();
+ virtual ~LMonitor();
+ virtual void wait();
+ virtual void notify();
+ virtual void notifyAll();
+ virtual void acquire();
+ virtual void release();
+ };
+}
+}
+
+
+#endif
diff --git a/cpp/common/concurrent/inc/LThreadFactory.h b/cpp/common/concurrent/inc/LThreadFactory.h
new file mode 100644
index 0000000000..4a573d1bd1
--- /dev/null
+++ b/cpp/common/concurrent/inc/LThreadFactory.h
@@ -0,0 +1,37 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _LAPRThreadFactory_
+#define _LAPRThreadFactory_
+
+
+namespace qpid {
+namespace concurrent {
+
+ class LThreadFactory
+ {
+ public:
+ LThreadFactory();
+ virtual ~LThreadFactory();
+ virtual Thread* create(Runnable* runnable);
+ };
+
+}
+}
+
+
+#endif
diff --git a/cpp/common/concurrent/inc/LockedQueue.h b/cpp/common/concurrent/inc/LockedQueue.h
new file mode 100644
index 0000000000..ef3f0b8381
--- /dev/null
+++ b/cpp/common/concurrent/inc/LockedQueue.h
@@ -0,0 +1,68 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _LockedQueue_
+#define _LockedQueue_
+
+#include <queue>
+#include "Monitor.h"
+
+/**
+ * A threadsafe queue abstraction
+ */
+namespace qpid {
+namespace concurrent {
+ template<class T, class L> class LockedQueue
+ {
+ L lock;
+ std::queue<T*> queue;
+
+ public:
+ void put(T* item);
+ T* take();
+ bool empty();
+ };
+
+ template<class T, class L> void LockedQueue<T, L>::put(T* item){
+ lock.acquire();
+ queue.push(item);
+ lock.release();
+ }
+
+ template<class T, class L> T* LockedQueue<T, L>::take(){
+ lock.acquire();
+ T* item = 0;
+ if(!queue.empty()){
+ item = queue.front();
+ queue.pop();
+ }
+ lock.release();
+ return item;
+ }
+
+ template<class T, class L> bool LockedQueue<T, L>::empty(){
+ lock.acquire();
+ bool result = queue.empty();
+ lock.release();
+ return result;
+ }
+
+}
+}
+
+
+#endif
diff --git a/cpp/common/concurrent/inc/Monitor.h b/cpp/common/concurrent/inc/Monitor.h
new file mode 100644
index 0000000000..7f1a299c6a
--- /dev/null
+++ b/cpp/common/concurrent/inc/Monitor.h
@@ -0,0 +1,59 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _Monitor_
+#define _Monitor_
+
+#include "amqp_types.h"
+
+namespace qpid {
+namespace concurrent {
+
+class Monitor
+{
+ public:
+ virtual ~Monitor(){}
+ virtual void wait() = 0;
+ virtual void wait(u_int64_t time) = 0;
+ virtual void notify() = 0;
+ virtual void notifyAll() = 0;
+ virtual void acquire() = 0;
+ virtual void release() = 0;
+};
+
+/**
+ * Scoped locker for a monitor.
+ */
+class Locker
+{
+ public:
+ Locker(Monitor& lock_) : lock(lock_) { lock.acquire(); }
+ ~Locker() { lock.release(); }
+
+ private:
+ Monitor& lock;
+
+ // private and unimplemented to prevent copying
+ Locker(const Locker&);
+ void operator=(const Locker&);
+};
+
+}
+}
+
+
+#endif
diff --git a/cpp/common/concurrent/inc/MonitorImpl.h b/cpp/common/concurrent/inc/MonitorImpl.h
new file mode 100644
index 0000000000..e96e81d795
--- /dev/null
+++ b/cpp/common/concurrent/inc/MonitorImpl.h
@@ -0,0 +1,57 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+
+#ifndef _MonitorImpl_
+#define _MonitorImpl_
+
+#ifdef _USE_APR_IO_
+#include "APRMonitor.h"
+#else /* use POSIX Monitor */
+#include "LMonitor.h"
+#endif
+
+
+namespace qpid {
+namespace concurrent {
+
+#ifdef _USE_APR_IO_
+ class MonitorImpl : public virtual APRMonitor
+ {
+
+ public:
+ MonitorImpl() : APRMonitor(){};
+ virtual ~MonitorImpl(){};
+
+ };
+#else
+ class MonitorImpl : public virtual LMonitor
+ {
+
+ public:
+ MonitorImpl() : LMonitor(){};
+ virtual ~MonitorImpl(){};
+
+ };
+#endif
+
+}
+}
+
+
+#endif
diff --git a/cpp/common/concurrent/inc/Runnable.h b/cpp/common/concurrent/inc/Runnable.h
new file mode 100644
index 0000000000..523ad813f7
--- /dev/null
+++ b/cpp/common/concurrent/inc/Runnable.h
@@ -0,0 +1,34 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _Runnable_
+#define _Runnable_
+
+namespace qpid {
+namespace concurrent {
+
+ class Runnable
+ {
+ public:
+ virtual void run() = 0;
+ };
+
+}
+}
+
+
+#endif
diff --git a/cpp/common/concurrent/inc/TaskQueue.h b/cpp/common/concurrent/inc/TaskQueue.h
new file mode 100644
index 0000000000..e06a3ce069
--- /dev/null
+++ b/cpp/common/concurrent/inc/TaskQueue.h
@@ -0,0 +1,200 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _TaskQueue_
+#define _TaskQueue_
+
+#include <iostream>
+#include <memory>
+#include <queue>
+#include "LockedQueue.h"
+#include "Runnable.h"
+#include "ThreadPool.h"
+
+namespace qpid {
+namespace concurrent {
+ template<class T, class L> class TaskQueue : public virtual Runnable
+ {
+ const int max_iterations_per_run;
+ L lock;
+ //LockedQueue<T, L> queue;
+ std::queue<T*> queue;
+ ThreadPool* const pool;
+ T* work;
+ bool running;
+ volatile bool stopped;
+ TaskQueue<T, L>* next;
+
+ volatile bool inrun;
+
+ bool hasWork();
+ void completed();
+
+ T* take();
+
+ protected:
+ /**
+ * Callback though which the task is executed
+ */
+ virtual void execute(T* t) = 0;
+ /**
+ * Allows a task to be completed asynchronously to the
+ * execute() call if required.
+ */
+ virtual bool isComplete(T* t);
+ /**
+ * Should be called to signal completion of a task that was
+ * signalled as not complete through the isComplete() methods
+ * return value. This will allow normal processing to resume.
+ */
+ virtual void complete();
+
+ public:
+ TaskQueue(ThreadPool* const pool, int max_iterations_per_run = 100);
+ virtual void run();
+ void trigger();
+ bool append(T* t);
+ void stop(bool drain);
+ inline void setNext(TaskQueue<T, L>* next){ this->next = next; }
+ };
+
+ template<class T, class L> TaskQueue<T, L>::TaskQueue(ThreadPool* const _pool, int _max_iterations_per_run) :
+ pool(_pool),
+ max_iterations_per_run(_max_iterations_per_run),
+ work(0),
+ running(false),
+ stopped(false),
+ next(0), inrun(false){
+ }
+
+ template<class T, class L> void TaskQueue<T, L>::run(){
+ if(inrun) std::cout << "Already running" << std::endl;
+ inrun = true;
+
+ bool blocked = false;
+ int count = max_iterations_per_run;
+ while(!blocked && hasWork() && count){
+ execute(work);
+ if(isComplete(work)){
+ completed();
+ }else{
+ blocked = true;
+ }
+ count--;
+ }
+ inrun = false;
+
+ if(!blocked && count == 0){//performed max_iterations_per_run, requeue task to ensure fairness
+ //running will still be true at this point
+ lock.acquire();
+ running = false;
+ if(stopped) lock.notify();
+ lock.release();
+
+ trigger();
+ }else if(hasWork()){//task was added to queue after we exited the loop above; should not need this?
+ trigger();
+ }
+ }
+
+ template<class T, class L> void TaskQueue<T, L>::trigger(){
+ lock.acquire();
+ if(!running){
+ running = true;
+ pool->addTask(this);
+ }
+ lock.release();
+ }
+
+ template<class T, class L> bool TaskQueue<T, L>::hasWork(){
+ lock.acquire();
+ if(!work) work = take();//queue.take();
+ if(!work){
+ running = false;
+ if(stopped) lock.notify();
+ }
+ lock.release();
+ return work;
+ }
+
+ template<class T, class L> bool TaskQueue<T, L>::append(T* item){
+ if(!stopped){
+ lock.acquire();
+
+ //queue.put(item);
+ queue.push(item);
+
+ if(!running){
+ running = true;
+ pool->addTask(this);
+ }
+ lock.release();
+ //}
+ return true;
+ }else{
+ return false;
+ }
+ }
+
+ template<class T, class L> bool TaskQueue<T, L>::isComplete(T* item){
+ return true;//by default assume all tasks are synchronous w.r.t. execute()
+ }
+
+
+ template<class T, class L> void TaskQueue<T, L>::completed(){
+ if(next){
+ if(!next->append(work)){
+ std::cout << "Warning: dropping task as next queue appears to have stopped." << std::endl;
+ }
+ }else{
+ delete work;
+ }
+ work = 0;
+ }
+
+ template<class T, class L> void TaskQueue<T, L>::complete(){
+ completed();
+ lock.acquire();
+ running = false;
+ if(stopped) lock.notify();
+ lock.release();
+ }
+
+ template<class T, class L> void TaskQueue<T, L>::stop(bool drain){
+ //prevent new tasks from being added
+ stopped = true;
+ //wait until no longer running
+ lock.acquire();
+ while(running && (drain && hasWork())){
+ lock.wait();
+ }
+ lock.release();
+ }
+
+ template<class T, class L> T* TaskQueue<T, L>::take(){
+ T* item = 0;
+ if(!queue.empty()){
+ item = queue.front();
+ queue.pop();
+ }
+ return item;
+ }
+}
+}
+
+
+#endif
diff --git a/cpp/common/concurrent/inc/Thread.h b/cpp/common/concurrent/inc/Thread.h
new file mode 100644
index 0000000000..6bd2a379ce
--- /dev/null
+++ b/cpp/common/concurrent/inc/Thread.h
@@ -0,0 +1,37 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _Thread_
+#define _Thread_
+
+namespace qpid {
+namespace concurrent {
+
+ class Thread
+ {
+ public:
+ virtual ~Thread(){}
+ virtual void start() = 0;
+ virtual void join() = 0;
+ virtual void interrupt() = 0;
+ };
+
+}
+}
+
+
+#endif
diff --git a/cpp/common/concurrent/inc/ThreadFactory.h b/cpp/common/concurrent/inc/ThreadFactory.h
new file mode 100644
index 0000000000..53be000ff3
--- /dev/null
+++ b/cpp/common/concurrent/inc/ThreadFactory.h
@@ -0,0 +1,38 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _ThreadFactory_
+#define _ThreadFactory_
+
+#include "Thread.h"
+#include "Runnable.h"
+
+namespace qpid {
+namespace concurrent {
+
+ class ThreadFactory
+ {
+ public:
+ virtual ~ThreadFactory(){}
+ virtual Thread* create(Runnable* runnable) = 0;
+ };
+
+}
+}
+
+
+#endif
diff --git a/cpp/common/concurrent/inc/ThreadFactoryImpl.h b/cpp/common/concurrent/inc/ThreadFactoryImpl.h
new file mode 100644
index 0000000000..a534b3c1e2
--- /dev/null
+++ b/cpp/common/concurrent/inc/ThreadFactoryImpl.h
@@ -0,0 +1,52 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _ThreadFactoryImpl_
+#define _ThreadFactoryImpl_
+
+
+#ifdef _USE_APR_IO_
+#include "APRThreadFactory.h"
+#else
+#include "LThreadFactory.h"
+#endif
+
+
+namespace qpid {
+namespace concurrent {
+
+
+#ifdef _USE_APR_IO_
+ class ThreadFactoryImpl : public virtual APRThreadFactory
+ {
+ public:
+ ThreadFactoryImpl(): APRThreadFactory() {};
+ virtual ~ThreadFactoryImpl() {};
+ };
+#else
+ class ThreadFactoryImpl : public virtual LThreadFactory
+ {
+ public:
+ ThreadFactoryImpl(): LThreadFactory() {};
+ virtual ~ThreadFactoryImpl() {};
+ };
+#endif
+}
+}
+
+
+#endif
diff --git a/cpp/common/concurrent/inc/ThreadPool.h b/cpp/common/concurrent/inc/ThreadPool.h
new file mode 100644
index 0000000000..679c889ff3
--- /dev/null
+++ b/cpp/common/concurrent/inc/ThreadPool.h
@@ -0,0 +1,40 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _ThreadPool_
+#define _ThreadPool_
+
+#include "Thread.h"
+#include "Runnable.h"
+
+namespace qpid {
+namespace concurrent {
+
+ class ThreadPool
+ {
+ public:
+ virtual void start() = 0;
+ virtual void stop() = 0;
+ virtual void addTask(Runnable* runnable) = 0;
+ virtual ~ThreadPool(){}
+ };
+
+}
+}
+
+
+#endif
diff --git a/cpp/common/concurrent/src/APRBase.cpp b/cpp/common/concurrent/src/APRBase.cpp
new file mode 100644
index 0000000000..f87ea9e25f
--- /dev/null
+++ b/cpp/common/concurrent/src/APRBase.cpp
@@ -0,0 +1,97 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include <iostream>
+#include "APRBase.h"
+#include "QpidError.h"
+
+using namespace qpid::concurrent;
+
+APRBase* APRBase::instance = 0;
+
+APRBase* APRBase::getInstance(){
+ if(instance == 0){
+ instance = new APRBase();
+ }
+ return instance;
+}
+
+
+APRBase::APRBase() : count(0){
+ apr_initialize();
+ CHECK_APR_SUCCESS(apr_pool_create(&pool, 0));
+ CHECK_APR_SUCCESS(apr_thread_mutex_create(&mutex, APR_THREAD_MUTEX_NESTED, pool));
+}
+
+APRBase::~APRBase(){
+ CHECK_APR_SUCCESS(apr_thread_mutex_destroy(mutex));
+ apr_pool_destroy(pool);
+ apr_terminate();
+}
+
+bool APRBase::_increment(){
+ bool deleted(false);
+ CHECK_APR_SUCCESS(apr_thread_mutex_lock(mutex));
+ if(this == instance){
+ count++;
+ }else{
+ deleted = true;
+ }
+ CHECK_APR_SUCCESS(apr_thread_mutex_unlock(mutex));
+ return !deleted;
+}
+
+void APRBase::_decrement(){
+ APRBase* copy = 0;
+ CHECK_APR_SUCCESS(apr_thread_mutex_lock(mutex));
+ if(--count == 0){
+ copy = instance;
+ instance = 0;
+ }
+ CHECK_APR_SUCCESS(apr_thread_mutex_unlock(mutex));
+ if(copy != 0){
+ delete copy;
+ }
+}
+
+void APRBase::increment(){
+ int count = 0;
+ while(count++ < 2 && !getInstance()->_increment()){
+ std::cout << "WARNING: APR initialization triggered concurrently with termination." << std::endl;
+ }
+}
+
+void APRBase::decrement(){
+ getInstance()->_decrement();
+}
+
+void qpid::concurrent::check(apr_status_t status, const std::string& file, const int line){
+ if (status != APR_SUCCESS){
+ const int size = 50;
+ char tmp[size];
+ std::string msg(apr_strerror(status, tmp, size));
+ throw QpidError(APR_ERROR + ((int) status), msg, file, line);
+ }
+}
+
+std::string qpid::concurrent::get_desc(apr_status_t status){
+ const int size = 50;
+ char tmp[size];
+ std::string msg(apr_strerror(status, tmp, size));
+ return msg;
+}
+
diff --git a/cpp/common/concurrent/src/APRMonitor.cpp b/cpp/common/concurrent/src/APRMonitor.cpp
new file mode 100644
index 0000000000..428d76dff9
--- /dev/null
+++ b/cpp/common/concurrent/src/APRMonitor.cpp
@@ -0,0 +1,60 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "APRBase.h"
+#include "APRMonitor.h"
+#include <iostream>
+
+qpid::concurrent::APRMonitor::APRMonitor(){
+ APRBase::increment();
+ CHECK_APR_SUCCESS(apr_pool_create(&pool, NULL));
+ CHECK_APR_SUCCESS(apr_thread_mutex_create(&mutex, APR_THREAD_MUTEX_NESTED, pool));
+ CHECK_APR_SUCCESS(apr_thread_cond_create(&condition, pool));
+}
+
+qpid::concurrent::APRMonitor::~APRMonitor(){
+ CHECK_APR_SUCCESS(apr_thread_cond_destroy(condition));
+ CHECK_APR_SUCCESS(apr_thread_mutex_destroy(mutex));
+ apr_pool_destroy(pool);
+ APRBase::decrement();
+}
+
+void qpid::concurrent::APRMonitor::wait(){
+ CHECK_APR_SUCCESS(apr_thread_cond_wait(condition, mutex));
+}
+
+
+void qpid::concurrent::APRMonitor::wait(u_int64_t time){
+ apr_status_t status = apr_thread_cond_timedwait(condition, mutex, time * 1000);
+ if(!status == APR_TIMEUP) CHECK_APR_SUCCESS(status);
+}
+
+void qpid::concurrent::APRMonitor::notify(){
+ CHECK_APR_SUCCESS(apr_thread_cond_signal(condition));
+}
+
+void qpid::concurrent::APRMonitor::notifyAll(){
+ CHECK_APR_SUCCESS(apr_thread_cond_broadcast(condition));
+}
+
+void qpid::concurrent::APRMonitor::acquire(){
+ CHECK_APR_SUCCESS(apr_thread_mutex_lock(mutex));
+}
+
+void qpid::concurrent::APRMonitor::release(){
+ CHECK_APR_SUCCESS(apr_thread_mutex_unlock(mutex));
+}
diff --git a/cpp/common/concurrent/src/APRThread.cpp b/cpp/common/concurrent/src/APRThread.cpp
new file mode 100644
index 0000000000..4202fe81b6
--- /dev/null
+++ b/cpp/common/concurrent/src/APRThread.cpp
@@ -0,0 +1,50 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "APRBase.h"
+#include "APRThread.h"
+#include "apr_portable.h"
+
+using namespace qpid::concurrent;
+
+void* APR_THREAD_FUNC ExecRunnable(apr_thread_t* thread, void *data){
+ ((Runnable*) data)->run();
+ CHECK_APR_SUCCESS(apr_thread_exit(thread, APR_SUCCESS));
+ return NULL;
+}
+
+APRThread::APRThread(apr_pool_t* _pool, Runnable* _runnable) : pool(_pool), runnable(_runnable){}
+
+APRThread::~APRThread(){
+}
+
+void APRThread::start(){
+ CHECK_APR_SUCCESS(apr_thread_create(&runner, NULL, ExecRunnable,(void*) runnable, pool));
+}
+
+void APRThread::join(){
+ apr_status_t status;
+ CHECK_APR_SUCCESS(apr_thread_join(&status, runner));
+}
+
+void APRThread::interrupt(){
+ CHECK_APR_SUCCESS(apr_thread_exit(runner, APR_SUCCESS));
+}
+
+unsigned int qpid::concurrent::APRThread::currentThread(){
+ return apr_os_thread_current();
+}
diff --git a/cpp/common/concurrent/src/APRThreadFactory.cpp b/cpp/common/concurrent/src/APRThreadFactory.cpp
new file mode 100644
index 0000000000..9ba68e9e56
--- /dev/null
+++ b/cpp/common/concurrent/src/APRThreadFactory.cpp
@@ -0,0 +1,35 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "APRBase.h"
+#include "APRThreadFactory.h"
+
+using namespace qpid::concurrent;
+
+APRThreadFactory::APRThreadFactory(){
+ APRBase::increment();
+ CHECK_APR_SUCCESS(apr_pool_create(&pool, NULL));
+}
+
+APRThreadFactory::~APRThreadFactory(){
+ apr_pool_destroy(pool);
+ APRBase::decrement();
+}
+
+Thread* APRThreadFactory::create(Runnable* runnable){
+ return new APRThread(pool, runnable);
+}
diff --git a/cpp/common/concurrent/src/APRThreadPool.cpp b/cpp/common/concurrent/src/APRThreadPool.cpp
new file mode 100644
index 0000000000..e0fcb804e6
--- /dev/null
+++ b/cpp/common/concurrent/src/APRThreadPool.cpp
@@ -0,0 +1,85 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "APRThreadFactory.h"
+#include "APRThreadPool.h"
+#include "QpidError.h"
+#include <iostream>
+
+using namespace qpid::concurrent;
+
+APRThreadPool::APRThreadPool(int _size) : size(_size), factory(new APRThreadFactory()),
+ deleteFactory(true), running(false){
+ worker = new Worker(this);
+}
+
+APRThreadPool::APRThreadPool(int _size, ThreadFactory* _factory) : size(_size), factory(_factory),
+ deleteFactory(false), running(false){
+ worker = new Worker(this);
+}
+
+APRThreadPool::~APRThreadPool(){
+ if(deleteFactory) delete factory;
+}
+
+void APRThreadPool::addTask(Runnable* task){
+ lock.acquire();
+ tasks.push(task);
+ lock.notifyAll();
+ lock.release();
+}
+
+void APRThreadPool::runTask(){
+ lock.acquire();
+ while(tasks.empty()){
+ lock.wait();
+ }
+ Runnable* task = tasks.front();
+ tasks.pop();
+ lock.release();
+ try{
+ task->run();
+ }catch(qpid::QpidError error){
+ std::cout << "Error [" << error.code << "] " << error.msg << " (" << error.file << ":" << error.line << ")" << std::endl;
+ }
+}
+
+void APRThreadPool::start(){
+ if(!running){
+ running = true;
+ for(int i = 0; i < size; i++){
+ Thread* t = factory->create(worker);
+ t->start();
+ threads.push_back(t);
+ }
+ }
+}
+
+void APRThreadPool::stop(){
+ if(!running){
+ running = false;
+ lock.acquire();
+ lock.notifyAll();
+ lock.release();
+ for(int i = 0; i < size; i++){
+ threads[i]->join();
+ delete threads[i];
+ }
+ }
+}
+
+