summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2007-07-13 15:53:10 +0000
committerAlan Conway <aconway@apache.org>2007-07-13 15:53:10 +0000
commit2b345eb0d9f03db3a0b14fe759dc696e508b6c01 (patch)
tree57172971526ffa7e9f172ec99d4bc2f7b460b41b
parentc01f57205a289c5af528e7401c1dd93d2cff18fe (diff)
downloadqpid-python-2b345eb0d9f03db3a0b14fe759dc696e508b6c01.tar.gz
* src/qpid/sys/ConcurrentQueue.h: Thread-safe queue with atomic pop()
* src/tests/ConcurrentQueue.cpp: Experimental code to compare a dual-vector, dual-lock implementation with a simple locked deque. Results indicate the more complex design does not perform any better, so ConcurrentQueue.h uses the simpler design. Not part of default test harness, run by hand to see results. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@556045 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--cpp/src/qpid/sys/ConcurrentQueue.h67
-rw-r--r--cpp/src/tests/ConcurrentQueue.cpp208
-rw-r--r--cpp/src/tests/Makefile.am4
3 files changed, 279 insertions, 0 deletions
diff --git a/cpp/src/qpid/sys/ConcurrentQueue.h b/cpp/src/qpid/sys/ConcurrentQueue.h
new file mode 100644
index 0000000000..dd7689666b
--- /dev/null
+++ b/cpp/src/qpid/sys/ConcurrentQueue.h
@@ -0,0 +1,67 @@
+#ifndef QPID_SYS_CONCURRENTQUEUE_H
+#define QPID_SYS_CONCURRENTQUEUE_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/sys/Mutex.h"
+
+#include <deque>
+
+namespace qpid {
+namespace sys {
+
+/**
+ * Thread-safe queue that allows threads to push items onto
+ * the queue concurrently with threads popping items off the
+ * queue.
+ */
+template <class T> class ConcurrentQueue {
+ public:
+ /** Push a data item onto the back of the queue */
+ void push(const T& data) {
+ Mutex::ScopedLock l(lock);
+ queue.push_back(data);
+ }
+
+ /** If the queue is non-empty, pop the front item into data and
+ * return true. If the queue is empty, return false
+ */
+ bool pop(T& data) {
+ Mutex::ScopedLock l(lock);
+ if (queue.empty())
+ return false;
+ else {
+ data = queue.front();
+ queue.pop_front();
+ return true;
+ }
+ }
+
+ private:
+ Mutex lock;
+ std::deque<T> queue;
+};
+
+}} // namespace qpid::sys
+
+
+#endif /*!QPID_SYS_CONCURRENTQUEUE_H*/
diff --git a/cpp/src/tests/ConcurrentQueue.cpp b/cpp/src/tests/ConcurrentQueue.cpp
new file mode 100644
index 0000000000..e1adcce0f9
--- /dev/null
+++ b/cpp/src/tests/ConcurrentQueue.cpp
@@ -0,0 +1,208 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/**@file
+ * Compare alternative implementations for ConcurrentQueue.
+ */
+
+#include "qpid/sys/ConcurrentQueue.h"
+#include "qpid/sys/Thread.h"
+#include "qpid/sys/Monitor.h"
+#include "qpid/sys/Runnable.h"
+#include "qpid/sys/Time.h"
+
+#include <boost/test/test_tools.hpp>
+#include <boost/bind.hpp>
+
+#include <deque>
+#include <vector>
+#include <iostream>
+
+#include "time.h"
+
+using namespace qpid::sys;
+using namespace std;
+
+template <class T> class DualVectorDualLockQueue {
+ public:
+ /** Optionally specify initial capacity of the queue to minimize
+ * re-allocation.
+ */
+ DualVectorDualLockQueue(size_t capacity=16) {
+ pushVec.reserve(capacity);
+ popVec.reserve(capacity);
+ popIter = popVec.end();
+ }
+
+ /** Push a data item onto the back of the queue */
+ void push(const T& data) {
+ Mutex::ScopedLock l(pushLock);
+ pushVec.push_back(data);
+ }
+
+ /** If the queue is non-empty, pop the front item into data and
+ * return true. If the queue is empty, return false
+ */
+ bool pop(T& data) {
+ Mutex::ScopedLock l(popLock);
+ if (popIter == popVec.end()) {
+ popVec.clear();
+ Mutex::ScopedLock l(pushLock);
+ pushVec.swap(popVec);
+ popIter = popVec.begin();
+ }
+ if (popIter == popVec.end())
+ return false;
+ else {
+ data = *popIter++;
+ return true;
+ }
+ }
+
+ private:
+ Mutex pushLock, popLock;
+ std::vector<T> pushVec, popVec;
+ typename std::vector<T>::iterator popIter;
+};
+
+template <class T> struct LockedDequeQueue : public ConcurrentQueue<T> {
+ /** size_t ignored, can't pre-allocate space in a dequeue */
+ LockedDequeQueue(size_t=0) {};
+};
+
+// ================ Test code.
+
+/** Pause by sleeping */
+void nsleep(const Duration& delay) {
+ static Monitor m;
+ AbsTime stop(now(), delay);
+ while (now() < stop)
+ m.wait(stop);
+}
+
+/** Pause by spinning */
+void nspin(const Duration& delay) {
+ AbsTime stop(now(), delay);
+ while (now() < stop)
+ ;
+}
+
+/** Unlocked fake queue for comparison */
+struct NullQueue {
+ NullQueue(int items=0) : npush(items), npop(items) {}
+ void push(int) { --npush; }
+ bool pop(int& n) {
+ if (npop == 0)
+ return false;
+ else {
+ n=npop--;
+ return true;
+ }
+ }
+ volatile int npush, npop;
+};
+
+
+// Global test parameters.
+int items;
+Duration delay(0);
+boost::function<void()> npause;
+
+template <class Q>
+struct Pusher : public Runnable {
+ Pusher(Q& q) : queue(q) {}
+ void run() {
+ for (int i=items; i > 0; i--) {
+ queue.push(i);
+ npause();
+ }
+ }
+ Q& queue;
+};
+
+template <class Q>
+struct Popper : public Runnable {
+ Popper(Q& q) : queue(q) {}
+ void run() {
+ for (int i=items; i > 0; i--) {
+ int n;
+ if (queue.pop(n))
+ BOOST_REQUIRE_EQUAL(i,n);
+ npause();
+ }
+ }
+ Q& queue;
+};
+
+ostream& operator<<(ostream& out, const Duration& d) {
+ return out << double(d)/TIME_MSEC << " msecs";
+}
+
+void report(const char* s, const Duration &d) {
+ cout << s << ": " << d
+ << " (" << (double(items)*TIME_SEC)/d << " push-pops/sec" << ")"
+ << endl;
+}
+
+template <class Q, class PusherT=Pusher<Q>, class PopperT=Popper<Q> >
+struct Timer {
+ static Duration time() {
+ cout << endl << "==" << typeid(Q).name() << endl;
+
+ Q queue(items);
+ PusherT pusher(queue);
+ PopperT popper(queue);
+
+ // Serial
+ AbsTime start=now();
+ pusher.run();
+ popper.run();
+ Duration serial(start,now());
+ report ("Serial", serial);
+
+ // Concurrent
+ start=now();
+ Thread pushThread(pusher);
+ Thread popThread(popper);
+ pushThread.join();
+ popThread.join();
+ Duration concurrent(start,now());
+ report ("Concurrent", concurrent);
+
+ cout << "Serial/concurrent: " << double(serial)/concurrent << endl;
+ return concurrent;
+ }
+};
+
+int test_main(int argc, char** argv) {
+ items = (argc > 1) ? atoi(argv[1]) : 250*1000;
+ delay = (argc > 2) ? atoi(argv[2]) : 4*1000;
+ npause=boost::bind(nspin, delay);
+
+ cout << "Push/pop " << items << " items, delay=" << delay << endl;
+ Timer<NullQueue>::time();
+ Duration dv = Timer<DualVectorDualLockQueue<int> >::time();
+ Duration d = Timer<LockedDequeQueue<int> >::time();
+ cout << endl;
+ cout << "Ratio deque/dual vector=" << double(d)/dv << endl;
+ return 0;
+}
+// namespace
diff --git a/cpp/src/tests/Makefile.am b/cpp/src/tests/Makefile.am
index ee1a7317e0..64543268ee 100644
--- a/cpp/src/tests/Makefile.am
+++ b/cpp/src/tests/Makefile.am
@@ -42,6 +42,10 @@ check_PROGRAMS+=Shlib
Shlib_SOURCES=Shlib.cpp
Shlib_LDADD=-lboost_unit_test_framework $(lib_common)
+check_PROGRAMS+=ConcurrentQueue
+ConcurrentQueue_SOURCES=ConcurrentQueue.cpp
+ConcurrentQueue_LDADD=-lboost_test_exec_monitor $(lib_common)
+
include cluster.mk
# NB: CppUnit test libraries below will be migrated to boost test programs.