diff options
author | Alan Conway <aconway@apache.org> | 2007-07-26 15:47:23 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2007-07-26 15:47:23 +0000 |
commit | 6d0dba0db1febf5cfda414c8549a09c235a499ab (patch) | |
tree | 5794af5cdb6163a01e8e214a37a3b1e42d7e63a6 /qpid/cpp/src | |
parent | 609cb1359a12e7cc7f476d4a5e8a05bdcb9a3c22 (diff) | |
download | qpid-python-6d0dba0db1febf5cfda414c8549a09c235a499ab.tar.gz |
* README: Instructions for openais install.
* configure.ac: Enable clustering if suitable openais is present.
* src/tests/Cluster.cpp, .h, Cluster_child: Updated for 0-10
* src/qpid/sys/ConcurrentQueue.h: Added waitPop()
* src/Makefile.am, src/qpid/sys/ThreadSafeQueue.h, ProducerConsumer.h:
Removed unused code, ConcurrentQueue provides same functionality.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@559859 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
-rw-r--r-- | qpid/cpp/src/Makefile.am | 3 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/sys/ConcurrentQueue.h | 53 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/sys/ProducerConsumer.cpp | 141 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/sys/ProducerConsumer.h | 165 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/sys/ThreadSafeQueue.h | 98 | ||||
-rw-r--r-- | qpid/cpp/src/tests/Cluster.cpp | 25 | ||||
-rw-r--r-- | qpid/cpp/src/tests/Cluster.h | 30 | ||||
-rw-r--r-- | qpid/cpp/src/tests/Cluster_child.cpp | 21 | ||||
-rw-r--r-- | qpid/cpp/src/tests/Makefile.am | 3 | ||||
-rw-r--r-- | qpid/cpp/src/tests/ProducerConsumerTest.cpp | 284 | ||||
-rw-r--r-- | qpid/cpp/src/tests/cluster.mk | 1 |
11 files changed, 83 insertions, 741 deletions
diff --git a/qpid/cpp/src/Makefile.am b/qpid/cpp/src/Makefile.am index 55032a9eae..f4e807cf66 100644 --- a/qpid/cpp/src/Makefile.am +++ b/qpid/cpp/src/Makefile.am @@ -176,7 +176,6 @@ libqpidcommon_la_SOURCES = \ qpid/sys/Runnable.cpp \ qpid/sys/Shlib.h \ qpid/sys/Shlib.cpp \ - qpid/sys/ProducerConsumer.cpp \ qpid/Options.cpp \ qpid/Options.h \ qpid/log/Options.cpp \ @@ -380,7 +379,6 @@ nobase_include_HEADERS = \ qpid/sys/Monitor.h \ qpid/sys/Mutex.h \ qpid/sys/Poller.h \ - qpid/sys/ProducerConsumer.h \ qpid/sys/Runnable.h \ qpid/sys/ScopedIncrement.h \ qpid/sys/ShutdownHandler.h \ @@ -388,7 +386,6 @@ nobase_include_HEADERS = \ qpid/sys/Thread.h \ qpid/sys/ConcurrentQueue.h \ qpid/sys/Serializer.h \ - qpid/sys/ThreadSafeQueue.h \ qpid/sys/Time.h \ qpid/sys/TimeoutHandler.h \ qpid/Exception.h \ diff --git a/qpid/cpp/src/qpid/sys/ConcurrentQueue.h b/qpid/cpp/src/qpid/sys/ConcurrentQueue.h index dd7689666b..917afc5704 100644 --- a/qpid/cpp/src/qpid/sys/ConcurrentQueue.h +++ b/qpid/cpp/src/qpid/sys/ConcurrentQueue.h @@ -22,7 +22,10 @@ * */ -#include "qpid/sys/Mutex.h" +#include "qpid/sys/Monitor.h" +#include "qpid/sys/ScopedIncrement.h" + +#include <boost/bind.hpp> #include <deque> @@ -33,9 +36,24 @@ namespace sys { * Thread-safe queue that allows threads to push items onto * the queue concurrently with threads popping items off the * queue. + * + * Also allows consuming threads to wait until an item is available. */ template <class T> class ConcurrentQueue { public: + ConcurrentQueue() : waiters(0), shutdown(false) {} + + /** Threads in wait() are woken with ShutdownException before + * destroying the queue. + */ + ~ConcurrentQueue() { + Mutex::ScopedLock l(lock); + shutdown = true; + lock.notifyAll(); + while (waiters > 0) + lock.wait(); + } + /** Push a data item onto the back of the queue */ void push(const T& data) { Mutex::ScopedLock l(lock); @@ -47,6 +65,28 @@ template <class T> class ConcurrentQueue { */ bool pop(T& data) { Mutex::ScopedLock l(lock); + return popInternal(data); + } + + /** Wait up to deadline for a data item to be available. + *@return true if data was available, false if timed out. + *@throws ShutdownException if the queue is destroyed. + */ + bool waitPop(T& data, Duration timeout) { + Mutex::ScopedLock l(lock); + ScopedIncrement<size_t> w( + waiters, boost::bind(&ConcurrentQueue::noWaiters, this)); + AbsTime deadline(now(), timeout); + while (queue.empty() && lock.wait(deadline)) + ; + return popInternal(data); + } + + private: + + bool popInternal(T& data) { + if (shutdown) + throw ShutdownException(); if (queue.empty()) return false; else { @@ -56,9 +96,16 @@ template <class T> class ConcurrentQueue { } } - private: - Mutex lock; + void noWaiters() { + assert(waiters == 0); + if (shutdown) + lock.notify(); // Notify dtor thread. + } + + Monitor lock; std::deque<T> queue; + size_t waiters; + bool shutdown; }; }} // namespace qpid::sys diff --git a/qpid/cpp/src/qpid/sys/ProducerConsumer.cpp b/qpid/cpp/src/qpid/sys/ProducerConsumer.cpp deleted file mode 100644 index e892f60794..0000000000 --- a/qpid/cpp/src/qpid/sys/ProducerConsumer.cpp +++ /dev/null @@ -1,141 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - - -#include "qpid/QpidError.h" -#include "ScopedIncrement.h" -#include "ProducerConsumer.h" - -namespace qpid { -namespace sys { - -// // ================ ProducerConsumer - -ProducerConsumer::ProducerConsumer(size_t init_items) - : items(init_items), waiters(0), shutdownFlag(false) -{} - -void ProducerConsumer::shutdown() { - Mutex::ScopedLock l(monitor); - shutdownFlag = true; - monitor.notifyAll(); - // Wait for waiting consumers to wake up. - while (waiters > 0) - monitor.wait(); -} - -size_t ProducerConsumer::available() const { - Mutex::ScopedLock l(monitor); - return items; -} - -size_t ProducerConsumer::consumers() const { - Mutex::ScopedLock l(monitor); - return waiters; -} - -// ================ Lock - -ProducerConsumer::Lock::Lock(ProducerConsumer& p) - : pc(p), lock(p.monitor), status(INCOMPLETE) {} - -bool ProducerConsumer::Lock::isOk() const { - return !pc.isShutdown() && status==INCOMPLETE; -} - -void ProducerConsumer::Lock::checkOk() const { - assert(!pc.isShutdown()); - assert(status == INCOMPLETE); -} - -ProducerConsumer::Lock::~Lock() { - assert(status != INCOMPLETE || pc.isShutdown()); -} - -void ProducerConsumer::Lock::confirm() { - checkOk(); - status = CONFIRMED; -} - -void ProducerConsumer::Lock::cancel() { - checkOk(); - status = CANCELLED; -} - -// ================ ProducerLock - -ProducerConsumer::ProducerLock::ProducerLock(ProducerConsumer& p) : Lock(p) -{} - - -ProducerConsumer::ProducerLock::~ProducerLock() { - if (status == CONFIRMED) { - pc.items++; - pc.monitor.notify(); // Notify a consumer. - } -} - -// ================ ConsumerLock - -ProducerConsumer::ConsumerLock::ConsumerLock(ProducerConsumer& p) : Lock(p) -{ - if (isOk()) { - ScopedIncrement<size_t> inc(pc.waiters); - while (pc.items == 0 && !pc.shutdownFlag) { - pc.monitor.wait(); - } - } -} - -ProducerConsumer::ConsumerLock::ConsumerLock( - ProducerConsumer& p, const Duration& timeout) : Lock(p) -{ - if (isOk()) { - // Don't wait if timeout==0 - if (timeout == 0) { - if (pc.items == 0) - status = TIMEOUT; - return; - } - else { - AbsTime deadline(now(), timeout); - ScopedIncrement<size_t> inc(pc.waiters); - while (pc.items == 0 && !pc.shutdownFlag) { - if (!pc.monitor.wait(deadline)) { - status = TIMEOUT; - return; - } - } - } - } -} - -ProducerConsumer::ConsumerLock::~ConsumerLock() { - if (pc.isShutdown()) { - if (pc.waiters == 0) - pc.monitor.notifyAll(); // Notify shutdown thread(s) - } - else if (status==CONFIRMED) { - pc.items--; - if (pc.items > 0) - pc.monitor.notify(); // Notify another consumer. - } -} - - -}} // namespace qpid::sys diff --git a/qpid/cpp/src/qpid/sys/ProducerConsumer.h b/qpid/cpp/src/qpid/sys/ProducerConsumer.h deleted file mode 100644 index 2a02dab503..0000000000 --- a/qpid/cpp/src/qpid/sys/ProducerConsumer.h +++ /dev/null @@ -1,165 +0,0 @@ -#ifndef _sys_ProducerConsumer_h -#define _sys_ProducerConsumer_h - -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#include <boost/noncopyable.hpp> -#include "qpid/Exception.h" -#include "Monitor.h" - -namespace qpid { -namespace sys { - -/** - * Producer-consumer synchronisation. - * - * Producers increase the number of available items, consumers reduce it. - * Consumers wait till an item is available. Waiting threads can be - * woken for shutdown using shutdown(). - * - * Note: Currently implements unbounded producer-consumer, i.e. no limit - * to available items, producers never block. Can be extended to support - * bounded PC if required. - * - // TODO aconway 2007-02-13: example, from tests. -*/ -class ProducerConsumer -{ - public: - ProducerConsumer(size_t init_items=0); - - ~ProducerConsumer() { shutdown(); } - - /** - * Wake any threads waiting for ProducerLock or ConsumerLock. - *@post No threads are waiting in Producer or Consumer locks. - */ - void shutdown(); - - /** True if queue is shutdown */ - bool isShutdown() { return shutdownFlag; } - - /** Number of items available for consumers */ - size_t available() const; - - /** Number of consumers waiting for items */ - size_t consumers() const; - - /** True if available == 0 */ - bool empty() const { return available() == 0; } - - /** - * Base class for producer and consumer locks. - */ - class Lock : private boost::noncopyable { - public: - - /** - * You must call isOk() after creating a lock to verify its state. - * - *@return true means the lock succeeded. You MUST call either - *confirm() or cancel() before the lock goes out of scope. - * - * false means the lock failed - timed out or the - * ProducerConsumer is shutdown. You should not do anything in - * the scope of the lock. - */ - bool isOk() const; - - /** - * Confirm that an item was produced/consumed. - *@pre isOk() - */ - void confirm(); - - /** - * Cancel the lock to indicate nothing was produced/consumed. - * Note that locks are not actually released until destroyed. - * - *@pre isOk() - */ - void cancel(); - - /** True if this lock experienced a timeout */ - bool isTimedOut() const { return status == TIMEOUT; } - - /** True if we have been shutdown */ - bool isShutdown() const { return pc.isShutdown(); } - - ProducerConsumer& pc; - - protected: - /** Lock status */ - enum Status { INCOMPLETE, CONFIRMED, CANCELLED, TIMEOUT }; - - Lock(ProducerConsumer& p); - ~Lock(); - void checkOk() const; - Mutex::ScopedLock lock; - Status status; - }; - - /** Lock for code that produces items. */ - struct ProducerLock : public Lock { - /** - * Acquire locks to produce an item. - *@post If isOk() the calling thread has exclusive access - * to produce an item. - */ - ProducerLock(ProducerConsumer& p); - - /** Release locks, signal waiting consumers if confirm() was called. */ - ~ProducerLock(); - }; - - /** Lock for code that consumes items */ - struct ConsumerLock : public Lock { - /** - * Wait for an item to consume and acquire locks. - * - *@post If isOk() there is at least one item available and the - *calling thread has exclusive access to consume it. - */ - ConsumerLock(ProducerConsumer& p); - - /** - * Wait up to timeout to acquire lock. - *@post If isOk() caller has a producer lock. - * If isTimedOut() there was a timeout. - * If neither then we were shutdown. - */ - ConsumerLock(ProducerConsumer& p, const Duration& timeout); - - /** Release locks */ - ~ConsumerLock(); - }; - - private: - mutable Monitor monitor; - size_t items; - size_t waiters; - bool shutdownFlag; - - friend class Lock; - friend class ProducerLock; - friend class ConsumerLock; -}; - -}} // namespace qpid::sys - -#endif /*!_sys_ProducerConsumer_h*/ diff --git a/qpid/cpp/src/qpid/sys/ThreadSafeQueue.h b/qpid/cpp/src/qpid/sys/ThreadSafeQueue.h deleted file mode 100644 index 8f11c42051..0000000000 --- a/qpid/cpp/src/qpid/sys/ThreadSafeQueue.h +++ /dev/null @@ -1,98 +0,0 @@ -#ifndef _sys_ThreadSafeQueue_h -#define _sys_ThreadSafeQueue_h - -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include <deque> -#include "ProducerConsumer.h" -#include "qpid/Exception.h" - -namespace qpid { -namespace sys { - -/** - * A thread safe queue template. - */ -template <class T, class ContainerType=std::deque<T> > -class ThreadSafeQueue -{ - public: - - ThreadSafeQueue() {} - - /** Push a value onto the back of the queue */ - void push(const T& value) { - ProducerConsumer::ProducerLock producer(pc); - if (producer.isOk()) { - producer.confirm(); - container.push_back(value); - } - } - - /** Pop a value from the front of the queue. Waits till value is available. - *@throw ShutdownException if queue is shutdown while waiting. - */ - T pop() { - ProducerConsumer::ConsumerLock consumer(pc); - if (consumer.isOk()) { - consumer.confirm(); - T value(container.front()); - container.pop_front(); - return value; - } - throw ShutdownException(); - } - - /** - * If a value becomes available within the timeout, set outValue - * and return true. Otherwise return false; - */ - bool pop(T& outValue, const Time& timeout) { - ProducerConsumer::ConsumerLock consumer(pc, timeout); - if (consumer.isOk()) { - consumer.confirm(); - outValue = container.front(); - container.pop_front(); - return true; - } - return false; - } - - /** Interrupt threads waiting in pop() */ - void shutdown() { pc.shutdown(); } - - /** True if queue is shutdown */ - bool isShutdown() { return pc.isShutdown(); } - - /** Size of the queue */ - size_t size() { ProducerConsumer::Lock l(pc); return container.size(); } - - /** True if queue is empty */ - bool empty() { ProducerConsumer::Lock l(pc); return container.empty(); } - - private: - ProducerConsumer pc; - ContainerType container; -}; - -}} // namespace qpid::sys - - - -#endif /*!_sys_ThreadSafeQueue_h*/ diff --git a/qpid/cpp/src/tests/Cluster.cpp b/qpid/cpp/src/tests/Cluster.cpp index b22f312038..5ace48b736 100644 --- a/qpid/cpp/src/tests/Cluster.cpp +++ b/qpid/cpp/src/tests/Cluster.cpp @@ -19,8 +19,8 @@ #include "Cluster.h" #include "test_tools.h" -#include "qpid/framing/ChannelPingBody.h" -#include "qpid/framing/ChannelOkBody.h" +#include "qpid/framing/SessionPingBody.h" +#include "qpid/framing/SessionPongBody.h" #include "qpid/cluster/ClassifierHandler.h" #define BOOST_AUTO_TEST_MAIN // Must come before #include<boost/test/*> @@ -33,16 +33,16 @@ static const ProtocolVersion VER; /** Verify membership in a cluster with one member. */ BOOST_AUTO_TEST_CASE(testClusterOne) { TestCluster cluster("clusterOne", "amqp:one:1"); - AMQFrame frame(VER, 1, new ChannelPingBody(VER)); + AMQFrame frame(VER, 1, new SessionPingBody(VER)); Uuid id(true); SessionFrame send(id, frame, true); cluster.handle(send); - BOOST_REQUIRE(cluster.received.waitFor(1)); + SessionFrame sf; + BOOST_REQUIRE(cluster.received.waitPop(sf)); - SessionFrame& sf=cluster.received[0]; BOOST_CHECK(sf.isIncoming); BOOST_CHECK_EQUAL(id, sf.uuid); - BOOST_CHECK_TYPEID_EQUAL(ChannelPingBody, *sf.frame.getBody()); + BOOST_CHECK_TYPEID_EQUAL(SessionPingBody, *sf.frame.getBody()); BOOST_CHECK_EQUAL(1u, cluster.size()); Cluster::MemberList members = cluster.getMembers(); @@ -65,17 +65,18 @@ BOOST_AUTO_TEST_CASE(testClusterTwo) { BOOST_REQUIRE(cluster.waitFor(2)); // Myself and child. // Exchange frames with child. - AMQFrame frame(VER, 1, new ChannelPingBody(VER)); + AMQFrame frame(VER, 1, new SessionPingBody(VER)); Uuid id(true); SessionFrame send(id, frame, true); cluster.handle(send); - BOOST_REQUIRE(cluster.received.waitFor(1)); - SessionFrame& sf=cluster.received[0]; + SessionFrame sf; + BOOST_REQUIRE(cluster.received.waitPop(sf)); BOOST_CHECK_EQUAL(id, sf.uuid); BOOST_CHECK(sf.isIncoming); - BOOST_CHECK_TYPEID_EQUAL(ChannelPingBody, *sf.frame.getBody()); - BOOST_REQUIRE(cluster.received.waitFor(2)); - BOOST_CHECK_TYPEID_EQUAL(ChannelOkBody, *cluster.received[1].frame.getBody()); + BOOST_CHECK_TYPEID_EQUAL(SessionPingBody, *sf.frame.getBody()); + + BOOST_REQUIRE(cluster.received.waitPop(sf)); + BOOST_CHECK_TYPEID_EQUAL(SessionPongBody, *sf.frame.getBody()); if (!nofork) { // Wait for child to exit. diff --git a/qpid/cpp/src/tests/Cluster.h b/qpid/cpp/src/tests/Cluster.h index e896fccafe..02e642f641 100644 --- a/qpid/cpp/src/tests/Cluster.h +++ b/qpid/cpp/src/tests/Cluster.h @@ -20,16 +20,13 @@ */ #include "qpid/cluster/Cluster.h" +#include "qpid/sys/ConcurrentQueue.h" #include "qpid/framing/AMQFrame.h" -#include "qpid/framing/ChannelOkBody.h" -#include "qpid/framing/BasicGetOkBody.h" -#include "qpid/log/Logger.h" #include <boost/bind.hpp> #include <boost/test/test_tools.hpp> #include <iostream> -#include <vector> #include <functional> /** @@ -48,26 +45,12 @@ using namespace boost; void null_deleter(void*) {} template <class T> -class TestHandler : public Handler<T&>, public vector<T> +class TestHandler : public Handler<T&>, public ConcurrentQueue<T> { - Monitor lock; - public: - void handle(T& frame) { - Mutex::ScopedLock l(lock); - push_back(frame); - BOOST_MESSAGE(getpid()<<" TestHandler::handle: " << this->size()); - lock.notifyAll(); - } - - bool waitFor(size_t n) { - Mutex::ScopedLock l(lock); - BOOST_MESSAGE(getpid()<<" TestHandler::waitFor("<<n<<") "<<this->size()); - AbsTime deadline(now(), 2*TIME_SEC); - while (this->size() < n && lock.wait(deadline)) - ; - return this->size() >= n; - } + void handle(T& frame) { push(frame); } + bool waitPop(T& x) { return waitPop(x, TIME_SEC); } + using ConcurrentQueue<T>::waitPop; }; typedef TestHandler<AMQFrame> TestFrameHandler; @@ -83,7 +66,8 @@ struct TestCluster : public Cluster /** Wait for cluster to be of size n. */ bool waitFor(size_t n) { BOOST_CHECKPOINT("About to call Cluster::wait"); - return wait(boost::bind(equal_to<size_t>(), bind(&Cluster::size,this), n)); + return wait(boost::bind( + equal_to<size_t>(), bind(&Cluster::size,this), n)); } TestSessionFrameHandler received; diff --git a/qpid/cpp/src/tests/Cluster_child.cpp b/qpid/cpp/src/tests/Cluster_child.cpp index c509dc1950..fd4eb42e7b 100644 --- a/qpid/cpp/src/tests/Cluster_child.cpp +++ b/qpid/cpp/src/tests/Cluster_child.cpp @@ -20,6 +20,8 @@ #include "Cluster.h" #include "test_tools.h" +#include "qpid/framing/SessionPingBody.h" +#include "qpid/framing/SessionPongBody.h" using namespace std; using namespace qpid; @@ -33,17 +35,18 @@ static const ProtocolVersion VER; /** Chlid part of Cluster::clusterTwo test */ void clusterTwo() { TestCluster cluster("clusterTwo", "amqp:child:2"); - BOOST_REQUIRE(cluster.received.waitFor(1)); // Frame from parent. - BOOST_CHECK(cluster.received[0].isIncoming); - BOOST_CHECK_TYPEID_EQUAL(ChannelPingBody, *cluster.received[0].frame.getBody()); + SessionFrame sf; + BOOST_REQUIRE(cluster.received.waitPop(sf)); // Frame from parent. + BOOST_CHECK(sf.isIncoming); + BOOST_CHECK_TYPEID_EQUAL(SessionPingBody, *sf.frame.getBody()); BOOST_CHECK_EQUAL(2u, cluster.size()); // Me and parent - AMQFrame frame(VER, 1, new ChannelOkBody(VER)); - SessionFrame sf(cluster.received[0].uuid, frame, false); - cluster.handle(sf); - BOOST_REQUIRE(cluster.received.waitFor(2)); - BOOST_CHECK(!cluster.received[1].isIncoming); - BOOST_CHECK_TYPEID_EQUAL(ChannelOkBody, *cluster.received[1].frame.getBody()); + AMQFrame frame(VER, 1, new SessionPongBody(VER)); + SessionFrame sendframe(sf.uuid, frame, false); + cluster.handle(sendframe); + BOOST_REQUIRE(cluster.received.waitPop(sf)); + BOOST_CHECK(!sf.isIncoming); + BOOST_CHECK_TYPEID_EQUAL(SessionPongBody, *sf.frame.getBody()); } int test_main(int, char**) { diff --git a/qpid/cpp/src/tests/Makefile.am b/qpid/cpp/src/tests/Makefile.am index 4f1e7e1ec3..edb4f5b375 100644 --- a/qpid/cpp/src/tests/Makefile.am +++ b/qpid/cpp/src/tests/Makefile.am @@ -87,9 +87,6 @@ framing_unit_tests = \ HeaderTest \ SequenceNumberTest -misc_unit_tests = \ - ProducerConsumerTest - posix_unit_tests = \ EventChannelTest \ EventChannelThreadsTest diff --git a/qpid/cpp/src/tests/ProducerConsumerTest.cpp b/qpid/cpp/src/tests/ProducerConsumerTest.cpp deleted file mode 100644 index 789e365a85..0000000000 --- a/qpid/cpp/src/tests/ProducerConsumerTest.cpp +++ /dev/null @@ -1,284 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <vector> -#include <iostream> - -#include <boost/bind.hpp> - -#include "qpid_test_plugin.h" -#include "InProcessBroker.h" -#include "qpid/sys/ProducerConsumer.h" -#include "qpid/sys/Thread.h" -#include "qpid/framing/AMQP_HighestVersion.h" -#include "qpid/sys/AtomicCount.h" - -using namespace qpid; -using namespace sys; -using namespace framing; -using namespace boost; -using namespace std; - -/** A counter that notifies a monitor when changed */ -class WatchedCounter : public Monitor { - public: - WatchedCounter(int i=0) : count(i) {} - WatchedCounter(const WatchedCounter& c) : Monitor(), count(int(c)) {} - - WatchedCounter& operator=(const WatchedCounter& x) { - return *this = int(x); - } - - WatchedCounter& operator=(int i) { - Lock l(*this); - count = i; - return *this; - } - - int operator++() { - Lock l(*this); - notifyAll(); - return ++count; - } - - int operator++(int) { - Lock l(*this); - notifyAll(); - return count++; - } - - bool operator==(int i) const { - Lock l(const_cast<WatchedCounter&>(*this)); - return i == count; - } - - operator int() const { - Lock l(const_cast<WatchedCounter&>(*this)); - return count; - } - - bool waitFor(int i, Duration timeout=TIME_SEC) { - Lock l(*this); - AbsTime deadline(now(), timeout); - while (count != i) { - if (!wait(deadline)) - return false; - } - assert(count == i); - return true; - } - - private: - typedef Mutex::ScopedLock Lock; - int count; -}; - -class ProducerConsumerTest : public CppUnit::TestCase -{ - CPPUNIT_TEST_SUITE(ProducerConsumerTest); - CPPUNIT_TEST(testProduceConsume); - CPPUNIT_TEST(testTimeout); - CPPUNIT_TEST(testShutdown); - CPPUNIT_TEST(testCancel); - CPPUNIT_TEST_SUITE_END(); - - public: - client::InProcessBrokerClient client; - ProducerConsumer pc; - - WatchedCounter shutdown; - WatchedCounter timeout; - WatchedCounter consumed; - WatchedCounter produced; - - struct ConsumeRunnable : public Runnable { - ProducerConsumerTest& test; - ConsumeRunnable(ProducerConsumerTest& test_) : test(test_) {} - void run() { test.consume(); } - }; - - struct ConsumeTimeoutRunnable : public Runnable { - ProducerConsumerTest& test; - Duration timeout; - ConsumeTimeoutRunnable(ProducerConsumerTest& test_, const Duration& t) - : test(test_), timeout(t) {} - void run() { test.consumeTimeout(timeout); } - }; - - - void consumeInternal(ProducerConsumer::ConsumerLock& consumer) { - if (pc.isShutdown()) { - ++shutdown; - return; - } - if (consumer.isTimedOut()) { - ++timeout; - return; - } - CPPUNIT_ASSERT(consumer.isOk()); - CPPUNIT_ASSERT(pc.available() > 0); - consumer.confirm(); - consumed++; - } - - void consume() { - ProducerConsumer::ConsumerLock consumer(pc); - consumeInternal(consumer); - }; - - void consumeTimeout(const Duration& timeout) { - ProducerConsumer::ConsumerLock consumer(pc, timeout); - consumeInternal(consumer); - }; - - void produce() { - ProducerConsumer::ProducerLock producer(pc); - CPPUNIT_ASSERT(producer.isOk()); - producer.confirm(); - produced++; - } - - void join(vector<Thread>& threads) { - for_each(threads.begin(), threads.end(), bind(&Thread::join,_1)); - } - - vector<Thread> startThreads(size_t n, Runnable& runnable) { - vector<Thread> threads(n); - while (n > 0) - threads[--n] = Thread(runnable); - return threads; - } - -public: - ProducerConsumerTest() : client() {} - - void testProduceConsume() { - ConsumeRunnable runMe(*this); - produce(); - produce(); - CPPUNIT_ASSERT(produced.waitFor(2)); - vector<Thread> threads = startThreads(1, runMe); - CPPUNIT_ASSERT(consumed.waitFor(1)); - join(threads); - - threads = startThreads(1, runMe); - CPPUNIT_ASSERT(consumed.waitFor(2)); - join(threads); - - threads = startThreads(3, runMe); - produce(); - produce(); - CPPUNIT_ASSERT(consumed.waitFor(4)); - produce(); - CPPUNIT_ASSERT(consumed.waitFor(5)); - join(threads); - CPPUNIT_ASSERT_EQUAL(0, int(shutdown)); - } - - void testTimeout() { - try { - // 0 timeout no items available throws exception - ProducerConsumer::ConsumerLock consumer(pc, 0); - CPPUNIT_FAIL("Expected exception"); - } catch(...){} - - produce(); - CPPUNIT_ASSERT(produced.waitFor(1)); - CPPUNIT_ASSERT_EQUAL(1, int(pc.available())); - { - // 0 timeout succeeds if there's an item available. - ProducerConsumer::ConsumerLock consume(pc, 0); - CPPUNIT_ASSERT(consume.isOk()); - consume.confirm(); - } - CPPUNIT_ASSERT_EQUAL(0, int(pc.available())); - - // Produce an item within the timeout. - ConsumeTimeoutRunnable runMe(*this, 2*TIME_SEC); - vector<Thread> threads = startThreads(1, runMe); - produce(); - CPPUNIT_ASSERT(consumed.waitFor(1)); - join(threads); - } - - - void testShutdown() { - ConsumeRunnable runMe(*this); - vector<Thread> threads = startThreads(2, runMe); - while (pc.consumers() != 2) - Thread::yield(); - pc.shutdown(); - CPPUNIT_ASSERT(shutdown.waitFor(2)); - join(threads); - - threads = startThreads(1, runMe); // Should shutdown immediately. - CPPUNIT_ASSERT(shutdown.waitFor(3)); - join(threads); - - // Produce/consume while shutdown should return isShutdown and - // throw on confirm. - try { - ProducerConsumer::ProducerLock p(pc); - CPPUNIT_ASSERT(pc.isShutdown()); - CPPUNIT_FAIL("Expected exception"); - } - catch (...) {} // Expected - try { - ProducerConsumer::ConsumerLock c(pc); - CPPUNIT_ASSERT(pc.isShutdown()); - CPPUNIT_FAIL("Expected exception"); - } - catch (...) {} // Expected - } - - void testCancel() { - CPPUNIT_ASSERT_EQUAL(size_t(0), pc.available()); - { - ProducerConsumer::ProducerLock p(pc); - CPPUNIT_ASSERT(p.isOk()); - p.cancel(); - } - // Nothing was produced. - CPPUNIT_ASSERT_EQUAL(size_t(0), pc.available()); - { - ProducerConsumer::ConsumerLock c(pc, 0); - CPPUNIT_ASSERT(c.isTimedOut()); - } - // Now produce but cancel the consume - { - ProducerConsumer::ProducerLock p(pc); - CPPUNIT_ASSERT(p.isOk()); - p.confirm(); - } - CPPUNIT_ASSERT_EQUAL(size_t(1), pc.available()); - { - ProducerConsumer::ConsumerLock c(pc); - CPPUNIT_ASSERT(c.isOk()); - c.cancel(); - } - CPPUNIT_ASSERT_EQUAL(size_t(1), pc.available()); - } -}; - - -// Make this test suite a plugin. -CPPUNIT_PLUGIN_IMPLEMENT(); -CPPUNIT_TEST_SUITE_REGISTRATION(ProducerConsumerTest); - diff --git a/qpid/cpp/src/tests/cluster.mk b/qpid/cpp/src/tests/cluster.mk index 765aa02eb0..68725758fa 100644 --- a/qpid/cpp/src/tests/cluster.mk +++ b/qpid/cpp/src/tests/cluster.mk @@ -20,6 +20,7 @@ check_PROGRAMS+=Cpg Cpg_SOURCES=Cpg.cpp Cpg_LDADD=$(lib_cluster) -lboost_unit_test_framework +# TODO aconway 2007-07-26: Fix this test. #TESTS+=Cluster check_PROGRAMS+=Cluster Cluster_SOURCES=Cluster.cpp Cluster.h |