summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/tests/ProducerConsumerTest.cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2007-07-26 15:47:23 +0000
committerAlan Conway <aconway@apache.org>2007-07-26 15:47:23 +0000
commit6d0dba0db1febf5cfda414c8549a09c235a499ab (patch)
tree5794af5cdb6163a01e8e214a37a3b1e42d7e63a6 /qpid/cpp/src/tests/ProducerConsumerTest.cpp
parent609cb1359a12e7cc7f476d4a5e8a05bdcb9a3c22 (diff)
downloadqpid-python-6d0dba0db1febf5cfda414c8549a09c235a499ab.tar.gz
* README: Instructions for openais install.
* configure.ac: Enable clustering if suitable openais is present. * src/tests/Cluster.cpp, .h, Cluster_child: Updated for 0-10 * src/qpid/sys/ConcurrentQueue.h: Added waitPop() * src/Makefile.am, src/qpid/sys/ThreadSafeQueue.h, ProducerConsumer.h: Removed unused code, ConcurrentQueue provides same functionality. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@559859 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/tests/ProducerConsumerTest.cpp')
-rw-r--r--qpid/cpp/src/tests/ProducerConsumerTest.cpp284
1 files changed, 0 insertions, 284 deletions
diff --git a/qpid/cpp/src/tests/ProducerConsumerTest.cpp b/qpid/cpp/src/tests/ProducerConsumerTest.cpp
deleted file mode 100644
index 789e365a85..0000000000
--- a/qpid/cpp/src/tests/ProducerConsumerTest.cpp
+++ /dev/null
@@ -1,284 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include <vector>
-#include <iostream>
-
-#include <boost/bind.hpp>
-
-#include "qpid_test_plugin.h"
-#include "InProcessBroker.h"
-#include "qpid/sys/ProducerConsumer.h"
-#include "qpid/sys/Thread.h"
-#include "qpid/framing/AMQP_HighestVersion.h"
-#include "qpid/sys/AtomicCount.h"
-
-using namespace qpid;
-using namespace sys;
-using namespace framing;
-using namespace boost;
-using namespace std;
-
-/** A counter that notifies a monitor when changed */
-class WatchedCounter : public Monitor {
- public:
- WatchedCounter(int i=0) : count(i) {}
- WatchedCounter(const WatchedCounter& c) : Monitor(), count(int(c)) {}
-
- WatchedCounter& operator=(const WatchedCounter& x) {
- return *this = int(x);
- }
-
- WatchedCounter& operator=(int i) {
- Lock l(*this);
- count = i;
- return *this;
- }
-
- int operator++() {
- Lock l(*this);
- notifyAll();
- return ++count;
- }
-
- int operator++(int) {
- Lock l(*this);
- notifyAll();
- return count++;
- }
-
- bool operator==(int i) const {
- Lock l(const_cast<WatchedCounter&>(*this));
- return i == count;
- }
-
- operator int() const {
- Lock l(const_cast<WatchedCounter&>(*this));
- return count;
- }
-
- bool waitFor(int i, Duration timeout=TIME_SEC) {
- Lock l(*this);
- AbsTime deadline(now(), timeout);
- while (count != i) {
- if (!wait(deadline))
- return false;
- }
- assert(count == i);
- return true;
- }
-
- private:
- typedef Mutex::ScopedLock Lock;
- int count;
-};
-
-class ProducerConsumerTest : public CppUnit::TestCase
-{
- CPPUNIT_TEST_SUITE(ProducerConsumerTest);
- CPPUNIT_TEST(testProduceConsume);
- CPPUNIT_TEST(testTimeout);
- CPPUNIT_TEST(testShutdown);
- CPPUNIT_TEST(testCancel);
- CPPUNIT_TEST_SUITE_END();
-
- public:
- client::InProcessBrokerClient client;
- ProducerConsumer pc;
-
- WatchedCounter shutdown;
- WatchedCounter timeout;
- WatchedCounter consumed;
- WatchedCounter produced;
-
- struct ConsumeRunnable : public Runnable {
- ProducerConsumerTest& test;
- ConsumeRunnable(ProducerConsumerTest& test_) : test(test_) {}
- void run() { test.consume(); }
- };
-
- struct ConsumeTimeoutRunnable : public Runnable {
- ProducerConsumerTest& test;
- Duration timeout;
- ConsumeTimeoutRunnable(ProducerConsumerTest& test_, const Duration& t)
- : test(test_), timeout(t) {}
- void run() { test.consumeTimeout(timeout); }
- };
-
-
- void consumeInternal(ProducerConsumer::ConsumerLock& consumer) {
- if (pc.isShutdown()) {
- ++shutdown;
- return;
- }
- if (consumer.isTimedOut()) {
- ++timeout;
- return;
- }
- CPPUNIT_ASSERT(consumer.isOk());
- CPPUNIT_ASSERT(pc.available() > 0);
- consumer.confirm();
- consumed++;
- }
-
- void consume() {
- ProducerConsumer::ConsumerLock consumer(pc);
- consumeInternal(consumer);
- };
-
- void consumeTimeout(const Duration& timeout) {
- ProducerConsumer::ConsumerLock consumer(pc, timeout);
- consumeInternal(consumer);
- };
-
- void produce() {
- ProducerConsumer::ProducerLock producer(pc);
- CPPUNIT_ASSERT(producer.isOk());
- producer.confirm();
- produced++;
- }
-
- void join(vector<Thread>& threads) {
- for_each(threads.begin(), threads.end(), bind(&Thread::join,_1));
- }
-
- vector<Thread> startThreads(size_t n, Runnable& runnable) {
- vector<Thread> threads(n);
- while (n > 0)
- threads[--n] = Thread(runnable);
- return threads;
- }
-
-public:
- ProducerConsumerTest() : client() {}
-
- void testProduceConsume() {
- ConsumeRunnable runMe(*this);
- produce();
- produce();
- CPPUNIT_ASSERT(produced.waitFor(2));
- vector<Thread> threads = startThreads(1, runMe);
- CPPUNIT_ASSERT(consumed.waitFor(1));
- join(threads);
-
- threads = startThreads(1, runMe);
- CPPUNIT_ASSERT(consumed.waitFor(2));
- join(threads);
-
- threads = startThreads(3, runMe);
- produce();
- produce();
- CPPUNIT_ASSERT(consumed.waitFor(4));
- produce();
- CPPUNIT_ASSERT(consumed.waitFor(5));
- join(threads);
- CPPUNIT_ASSERT_EQUAL(0, int(shutdown));
- }
-
- void testTimeout() {
- try {
- // 0 timeout no items available throws exception
- ProducerConsumer::ConsumerLock consumer(pc, 0);
- CPPUNIT_FAIL("Expected exception");
- } catch(...){}
-
- produce();
- CPPUNIT_ASSERT(produced.waitFor(1));
- CPPUNIT_ASSERT_EQUAL(1, int(pc.available()));
- {
- // 0 timeout succeeds if there's an item available.
- ProducerConsumer::ConsumerLock consume(pc, 0);
- CPPUNIT_ASSERT(consume.isOk());
- consume.confirm();
- }
- CPPUNIT_ASSERT_EQUAL(0, int(pc.available()));
-
- // Produce an item within the timeout.
- ConsumeTimeoutRunnable runMe(*this, 2*TIME_SEC);
- vector<Thread> threads = startThreads(1, runMe);
- produce();
- CPPUNIT_ASSERT(consumed.waitFor(1));
- join(threads);
- }
-
-
- void testShutdown() {
- ConsumeRunnable runMe(*this);
- vector<Thread> threads = startThreads(2, runMe);
- while (pc.consumers() != 2)
- Thread::yield();
- pc.shutdown();
- CPPUNIT_ASSERT(shutdown.waitFor(2));
- join(threads);
-
- threads = startThreads(1, runMe); // Should shutdown immediately.
- CPPUNIT_ASSERT(shutdown.waitFor(3));
- join(threads);
-
- // Produce/consume while shutdown should return isShutdown and
- // throw on confirm.
- try {
- ProducerConsumer::ProducerLock p(pc);
- CPPUNIT_ASSERT(pc.isShutdown());
- CPPUNIT_FAIL("Expected exception");
- }
- catch (...) {} // Expected
- try {
- ProducerConsumer::ConsumerLock c(pc);
- CPPUNIT_ASSERT(pc.isShutdown());
- CPPUNIT_FAIL("Expected exception");
- }
- catch (...) {} // Expected
- }
-
- void testCancel() {
- CPPUNIT_ASSERT_EQUAL(size_t(0), pc.available());
- {
- ProducerConsumer::ProducerLock p(pc);
- CPPUNIT_ASSERT(p.isOk());
- p.cancel();
- }
- // Nothing was produced.
- CPPUNIT_ASSERT_EQUAL(size_t(0), pc.available());
- {
- ProducerConsumer::ConsumerLock c(pc, 0);
- CPPUNIT_ASSERT(c.isTimedOut());
- }
- // Now produce but cancel the consume
- {
- ProducerConsumer::ProducerLock p(pc);
- CPPUNIT_ASSERT(p.isOk());
- p.confirm();
- }
- CPPUNIT_ASSERT_EQUAL(size_t(1), pc.available());
- {
- ProducerConsumer::ConsumerLock c(pc);
- CPPUNIT_ASSERT(c.isOk());
- c.cancel();
- }
- CPPUNIT_ASSERT_EQUAL(size_t(1), pc.available());
- }
-};
-
-
-// Make this test suite a plugin.
-CPPUNIT_PLUGIN_IMPLEMENT();
-CPPUNIT_TEST_SUITE_REGISTRATION(ProducerConsumerTest);
-