summaryrefslogtreecommitdiff
path: root/cpp/src/tests/ConcurrentQueue.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/tests/ConcurrentQueue.cpp')
-rw-r--r--cpp/src/tests/ConcurrentQueue.cpp208
1 files changed, 208 insertions, 0 deletions
diff --git a/cpp/src/tests/ConcurrentQueue.cpp b/cpp/src/tests/ConcurrentQueue.cpp
new file mode 100644
index 0000000000..e1adcce0f9
--- /dev/null
+++ b/cpp/src/tests/ConcurrentQueue.cpp
@@ -0,0 +1,208 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/**@file
+ * Compare alternative implementations for ConcurrentQueue.
+ */
+
+#include "qpid/sys/ConcurrentQueue.h"
+#include "qpid/sys/Thread.h"
+#include "qpid/sys/Monitor.h"
+#include "qpid/sys/Runnable.h"
+#include "qpid/sys/Time.h"
+
+#include <boost/test/test_tools.hpp>
+#include <boost/bind.hpp>
+
+#include <deque>
+#include <vector>
+#include <iostream>
+
+#include "time.h"
+
+using namespace qpid::sys;
+using namespace std;
+
+template <class T> class DualVectorDualLockQueue {
+ public:
+ /** Optionally specify initial capacity of the queue to minimize
+ * re-allocation.
+ */
+ DualVectorDualLockQueue(size_t capacity=16) {
+ pushVec.reserve(capacity);
+ popVec.reserve(capacity);
+ popIter = popVec.end();
+ }
+
+ /** Push a data item onto the back of the queue */
+ void push(const T& data) {
+ Mutex::ScopedLock l(pushLock);
+ pushVec.push_back(data);
+ }
+
+ /** If the queue is non-empty, pop the front item into data and
+ * return true. If the queue is empty, return false
+ */
+ bool pop(T& data) {
+ Mutex::ScopedLock l(popLock);
+ if (popIter == popVec.end()) {
+ popVec.clear();
+ Mutex::ScopedLock l(pushLock);
+ pushVec.swap(popVec);
+ popIter = popVec.begin();
+ }
+ if (popIter == popVec.end())
+ return false;
+ else {
+ data = *popIter++;
+ return true;
+ }
+ }
+
+ private:
+ Mutex pushLock, popLock;
+ std::vector<T> pushVec, popVec;
+ typename std::vector<T>::iterator popIter;
+};
+
+template <class T> struct LockedDequeQueue : public ConcurrentQueue<T> {
+ /** size_t ignored, can't pre-allocate space in a dequeue */
+ LockedDequeQueue(size_t=0) {};
+};
+
+// ================ Test code.
+
+/** Pause by sleeping */
+void nsleep(const Duration& delay) {
+ static Monitor m;
+ AbsTime stop(now(), delay);
+ while (now() < stop)
+ m.wait(stop);
+}
+
+/** Pause by spinning */
+void nspin(const Duration& delay) {
+ AbsTime stop(now(), delay);
+ while (now() < stop)
+ ;
+}
+
+/** Unlocked fake queue for comparison */
+struct NullQueue {
+ NullQueue(int items=0) : npush(items), npop(items) {}
+ void push(int) { --npush; }
+ bool pop(int& n) {
+ if (npop == 0)
+ return false;
+ else {
+ n=npop--;
+ return true;
+ }
+ }
+ volatile int npush, npop;
+};
+
+
+// Global test parameters.
+int items;
+Duration delay(0);
+boost::function<void()> npause;
+
+template <class Q>
+struct Pusher : public Runnable {
+ Pusher(Q& q) : queue(q) {}
+ void run() {
+ for (int i=items; i > 0; i--) {
+ queue.push(i);
+ npause();
+ }
+ }
+ Q& queue;
+};
+
+template <class Q>
+struct Popper : public Runnable {
+ Popper(Q& q) : queue(q) {}
+ void run() {
+ for (int i=items; i > 0; i--) {
+ int n;
+ if (queue.pop(n))
+ BOOST_REQUIRE_EQUAL(i,n);
+ npause();
+ }
+ }
+ Q& queue;
+};
+
+ostream& operator<<(ostream& out, const Duration& d) {
+ return out << double(d)/TIME_MSEC << " msecs";
+}
+
+void report(const char* s, const Duration &d) {
+ cout << s << ": " << d
+ << " (" << (double(items)*TIME_SEC)/d << " push-pops/sec" << ")"
+ << endl;
+}
+
+template <class Q, class PusherT=Pusher<Q>, class PopperT=Popper<Q> >
+struct Timer {
+ static Duration time() {
+ cout << endl << "==" << typeid(Q).name() << endl;
+
+ Q queue(items);
+ PusherT pusher(queue);
+ PopperT popper(queue);
+
+ // Serial
+ AbsTime start=now();
+ pusher.run();
+ popper.run();
+ Duration serial(start,now());
+ report ("Serial", serial);
+
+ // Concurrent
+ start=now();
+ Thread pushThread(pusher);
+ Thread popThread(popper);
+ pushThread.join();
+ popThread.join();
+ Duration concurrent(start,now());
+ report ("Concurrent", concurrent);
+
+ cout << "Serial/concurrent: " << double(serial)/concurrent << endl;
+ return concurrent;
+ }
+};
+
+int test_main(int argc, char** argv) {
+ items = (argc > 1) ? atoi(argv[1]) : 250*1000;
+ delay = (argc > 2) ? atoi(argv[2]) : 4*1000;
+ npause=boost::bind(nspin, delay);
+
+ cout << "Push/pop " << items << " items, delay=" << delay << endl;
+ Timer<NullQueue>::time();
+ Duration dv = Timer<DualVectorDualLockQueue<int> >::time();
+ Duration d = Timer<LockedDequeQueue<int> >::time();
+ cout << endl;
+ cout << "Ratio deque/dual vector=" << double(d)/dv << endl;
+ return 0;
+}
+// namespace