summaryrefslogtreecommitdiff
path: root/qpid/cpp/tests/ProducerConsumerTest.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/tests/ProducerConsumerTest.cpp')
-rw-r--r--qpid/cpp/tests/ProducerConsumerTest.cpp283
1 files changed, 283 insertions, 0 deletions
diff --git a/qpid/cpp/tests/ProducerConsumerTest.cpp b/qpid/cpp/tests/ProducerConsumerTest.cpp
new file mode 100644
index 0000000000..e6d4090596
--- /dev/null
+++ b/qpid/cpp/tests/ProducerConsumerTest.cpp
@@ -0,0 +1,283 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <vector>
+#include <iostream>
+
+#include <boost/bind.hpp>
+
+#include <qpid_test_plugin.h>
+#include "InProcessBroker.h"
+#include "sys/ProducerConsumer.h"
+#include "sys/Thread.h"
+#include "AMQP_HighestVersion.h"
+#include "sys/AtomicCount.h"
+
+using namespace qpid::sys;
+using namespace qpid::framing;
+using namespace boost;
+using namespace std;
+
+/** A counter that notifies a monitor when changed */
+class WatchedCounter : public Monitor {
+ public:
+ WatchedCounter(int i=0) : count(i) {}
+ WatchedCounter(const WatchedCounter& c) : Monitor(), count(int(c)) {}
+
+ WatchedCounter& operator=(const WatchedCounter& x) {
+ return *this = int(x);
+ }
+
+ WatchedCounter& operator=(int i) {
+ Lock l(*this);
+ count = i;
+ return *this;
+ }
+
+ int operator++() {
+ Lock l(*this);
+ notifyAll();
+ return ++count;
+ }
+
+ int operator++(int) {
+ Lock l(*this);
+ notifyAll();
+ return count++;
+ }
+
+ bool operator==(int i) const {
+ Lock l(const_cast<WatchedCounter&>(*this));
+ return i == count;
+ }
+
+ operator int() const {
+ Lock l(const_cast<WatchedCounter&>(*this));
+ return count;
+ }
+
+ bool waitFor(int i, Time timeout=TIME_SEC) {
+ Lock l(*this);
+ Time deadline = timeout+now();
+ while (count != i) {
+ if (!wait(deadline))
+ return false;
+ }
+ assert(count == i);
+ return true;
+ }
+
+ private:
+ typedef Mutex::ScopedLock Lock;
+ int count;
+};
+
+class ProducerConsumerTest : public CppUnit::TestCase
+{
+ CPPUNIT_TEST_SUITE(ProducerConsumerTest);
+ CPPUNIT_TEST(testProduceConsume);
+ CPPUNIT_TEST(testTimeout);
+ CPPUNIT_TEST(testStop);
+ CPPUNIT_TEST(testCancel);
+ CPPUNIT_TEST_SUITE_END();
+
+ public:
+ InProcessBrokerClient client;
+ ProducerConsumer pc;
+
+ WatchedCounter stopped;
+ WatchedCounter timeout;
+ WatchedCounter consumed;
+ WatchedCounter produced;
+
+ struct ConsumeRunnable : public Runnable {
+ ProducerConsumerTest& test;
+ ConsumeRunnable(ProducerConsumerTest& test_) : test(test_) {}
+ void run() { test.consume(); }
+ };
+
+ struct ConsumeTimeoutRunnable : public Runnable {
+ ProducerConsumerTest& test;
+ Time timeout;
+ ConsumeTimeoutRunnable(ProducerConsumerTest& test_, const Time& t)
+ : test(test_), timeout(t) {}
+ void run() { test.consumeTimeout(timeout); }
+ };
+
+
+ void consumeInternal(ProducerConsumer::ConsumerLock& consumer) {
+ if (pc.isStopped()) {
+ ++stopped;
+ return;
+ }
+ if (consumer.isTimedOut()) {
+ ++timeout;
+ return;
+ }
+ CPPUNIT_ASSERT(consumer.isOk());
+ CPPUNIT_ASSERT(pc.available() > 0);
+ consumer.confirm();
+ consumed++;
+ }
+
+ void consume() {
+ ProducerConsumer::ConsumerLock consumer(pc);
+ consumeInternal(consumer);
+ };
+
+ void consumeTimeout(const Time& timeout) {
+ ProducerConsumer::ConsumerLock consumer(pc, timeout);
+ consumeInternal(consumer);
+ };
+
+ void produce() {
+ ProducerConsumer::ProducerLock producer(pc);
+ CPPUNIT_ASSERT(producer.isOk());
+ producer.confirm();
+ produced++;
+ }
+
+ void join(vector<Thread>& threads) {
+ for_each(threads.begin(), threads.end(), bind(&Thread::join,_1));
+ }
+
+ vector<Thread> startThreads(size_t n, Runnable& runnable) {
+ vector<Thread> threads(n);
+ while (n > 0)
+ threads[--n] = Thread(runnable);
+ return threads;
+ }
+
+public:
+ ProducerConsumerTest() : client(highestProtocolVersion) {}
+
+ void testProduceConsume() {
+ ConsumeRunnable runMe(*this);
+ produce();
+ produce();
+ CPPUNIT_ASSERT(produced.waitFor(2));
+ vector<Thread> threads = startThreads(1, runMe);
+ CPPUNIT_ASSERT(consumed.waitFor(1));
+ join(threads);
+
+ threads = startThreads(1, runMe);
+ CPPUNIT_ASSERT(consumed.waitFor(2));
+ join(threads);
+
+ threads = startThreads(3, runMe);
+ produce();
+ produce();
+ CPPUNIT_ASSERT(consumed.waitFor(4));
+ produce();
+ CPPUNIT_ASSERT(consumed.waitFor(5));
+ join(threads);
+ CPPUNIT_ASSERT_EQUAL(0, int(stopped));
+ }
+
+ void testTimeout() {
+ try {
+ // 0 timeout no items available throws exception
+ ProducerConsumer::ConsumerLock consumer(pc, 0);
+ CPPUNIT_FAIL("Expected exception");
+ } catch(...){}
+
+ produce();
+ CPPUNIT_ASSERT(produced.waitFor(1));
+ CPPUNIT_ASSERT_EQUAL(1, int(pc.available()));
+ {
+ // 0 timeout succeeds if there's an item available.
+ ProducerConsumer::ConsumerLock consume(pc, 0);
+ CPPUNIT_ASSERT(consume.isOk());
+ consume.confirm();
+ }
+ CPPUNIT_ASSERT_EQUAL(0, int(pc.available()));
+
+ // Produce an item within the timeout.
+ ConsumeTimeoutRunnable runMe(*this, 2*TIME_SEC);
+ vector<Thread> threads = startThreads(1, runMe);
+ produce();
+ CPPUNIT_ASSERT(consumed.waitFor(1));
+ join(threads);
+ }
+
+
+ void testStop() {
+ ConsumeRunnable runMe(*this);
+ vector<Thread> threads = startThreads(2, runMe);
+ while (pc.consumers() != 2)
+ Thread::yield();
+ pc.stop();
+ CPPUNIT_ASSERT(stopped.waitFor(2));
+ join(threads);
+
+ threads = startThreads(1, runMe); // Should stop immediately.
+ CPPUNIT_ASSERT(stopped.waitFor(3));
+ join(threads);
+
+ // Produce/consume while stopped should return isStopped and
+ // throw on confirm.
+ try {
+ ProducerConsumer::ProducerLock p(pc);
+ CPPUNIT_ASSERT(pc.isStopped());
+ CPPUNIT_FAIL("Expected exception");
+ }
+ catch (...) {} // Expected
+ try {
+ ProducerConsumer::ConsumerLock c(pc);
+ CPPUNIT_ASSERT(pc.isStopped());
+ CPPUNIT_FAIL("Expected exception");
+ }
+ catch (...) {} // Expected
+ }
+
+ void testCancel() {
+ CPPUNIT_ASSERT_EQUAL(size_t(0), pc.available());
+ {
+ ProducerConsumer::ProducerLock p(pc);
+ CPPUNIT_ASSERT(p.isOk());
+ p.cancel();
+ }
+ // Nothing was produced.
+ CPPUNIT_ASSERT_EQUAL(size_t(0), pc.available());
+ {
+ ProducerConsumer::ConsumerLock c(pc, 0);
+ CPPUNIT_ASSERT(c.isTimedOut());
+ }
+ // Now produce but cancel the consume
+ {
+ ProducerConsumer::ProducerLock p(pc);
+ CPPUNIT_ASSERT(p.isOk());
+ p.confirm();
+ }
+ CPPUNIT_ASSERT_EQUAL(size_t(1), pc.available());
+ {
+ ProducerConsumer::ConsumerLock c(pc);
+ CPPUNIT_ASSERT(c.isOk());
+ c.cancel();
+ }
+ CPPUNIT_ASSERT_EQUAL(size_t(1), pc.available());
+ }
+};
+
+
+// Make this test suite a plugin.
+CPPUNIT_PLUGIN_IMPLEMENT();
+CPPUNIT_TEST_SUITE_REGISTRATION(ProducerConsumerTest);
+