summaryrefslogtreecommitdiff
path: root/qpid/cpp/src
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2007-07-26 15:47:23 +0000
committerAlan Conway <aconway@apache.org>2007-07-26 15:47:23 +0000
commit6d0dba0db1febf5cfda414c8549a09c235a499ab (patch)
tree5794af5cdb6163a01e8e214a37a3b1e42d7e63a6 /qpid/cpp/src
parent609cb1359a12e7cc7f476d4a5e8a05bdcb9a3c22 (diff)
downloadqpid-python-6d0dba0db1febf5cfda414c8549a09c235a499ab.tar.gz
* README: Instructions for openais install.
* configure.ac: Enable clustering if suitable openais is present. * src/tests/Cluster.cpp, .h, Cluster_child: Updated for 0-10 * src/qpid/sys/ConcurrentQueue.h: Added waitPop() * src/Makefile.am, src/qpid/sys/ThreadSafeQueue.h, ProducerConsumer.h: Removed unused code, ConcurrentQueue provides same functionality. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@559859 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
-rw-r--r--qpid/cpp/src/Makefile.am3
-rw-r--r--qpid/cpp/src/qpid/sys/ConcurrentQueue.h53
-rw-r--r--qpid/cpp/src/qpid/sys/ProducerConsumer.cpp141
-rw-r--r--qpid/cpp/src/qpid/sys/ProducerConsumer.h165
-rw-r--r--qpid/cpp/src/qpid/sys/ThreadSafeQueue.h98
-rw-r--r--qpid/cpp/src/tests/Cluster.cpp25
-rw-r--r--qpid/cpp/src/tests/Cluster.h30
-rw-r--r--qpid/cpp/src/tests/Cluster_child.cpp21
-rw-r--r--qpid/cpp/src/tests/Makefile.am3
-rw-r--r--qpid/cpp/src/tests/ProducerConsumerTest.cpp284
-rw-r--r--qpid/cpp/src/tests/cluster.mk1
11 files changed, 83 insertions, 741 deletions
diff --git a/qpid/cpp/src/Makefile.am b/qpid/cpp/src/Makefile.am
index 55032a9eae..f4e807cf66 100644
--- a/qpid/cpp/src/Makefile.am
+++ b/qpid/cpp/src/Makefile.am
@@ -176,7 +176,6 @@ libqpidcommon_la_SOURCES = \
qpid/sys/Runnable.cpp \
qpid/sys/Shlib.h \
qpid/sys/Shlib.cpp \
- qpid/sys/ProducerConsumer.cpp \
qpid/Options.cpp \
qpid/Options.h \
qpid/log/Options.cpp \
@@ -380,7 +379,6 @@ nobase_include_HEADERS = \
qpid/sys/Monitor.h \
qpid/sys/Mutex.h \
qpid/sys/Poller.h \
- qpid/sys/ProducerConsumer.h \
qpid/sys/Runnable.h \
qpid/sys/ScopedIncrement.h \
qpid/sys/ShutdownHandler.h \
@@ -388,7 +386,6 @@ nobase_include_HEADERS = \
qpid/sys/Thread.h \
qpid/sys/ConcurrentQueue.h \
qpid/sys/Serializer.h \
- qpid/sys/ThreadSafeQueue.h \
qpid/sys/Time.h \
qpid/sys/TimeoutHandler.h \
qpid/Exception.h \
diff --git a/qpid/cpp/src/qpid/sys/ConcurrentQueue.h b/qpid/cpp/src/qpid/sys/ConcurrentQueue.h
index dd7689666b..917afc5704 100644
--- a/qpid/cpp/src/qpid/sys/ConcurrentQueue.h
+++ b/qpid/cpp/src/qpid/sys/ConcurrentQueue.h
@@ -22,7 +22,10 @@
*
*/
-#include "qpid/sys/Mutex.h"
+#include "qpid/sys/Monitor.h"
+#include "qpid/sys/ScopedIncrement.h"
+
+#include <boost/bind.hpp>
#include <deque>
@@ -33,9 +36,24 @@ namespace sys {
* Thread-safe queue that allows threads to push items onto
* the queue concurrently with threads popping items off the
* queue.
+ *
+ * Also allows consuming threads to wait until an item is available.
*/
template <class T> class ConcurrentQueue {
public:
+ ConcurrentQueue() : waiters(0), shutdown(false) {}
+
+ /** Threads in wait() are woken with ShutdownException before
+ * destroying the queue.
+ */
+ ~ConcurrentQueue() {
+ Mutex::ScopedLock l(lock);
+ shutdown = true;
+ lock.notifyAll();
+ while (waiters > 0)
+ lock.wait();
+ }
+
/** Push a data item onto the back of the queue */
void push(const T& data) {
Mutex::ScopedLock l(lock);
@@ -47,6 +65,28 @@ template <class T> class ConcurrentQueue {
*/
bool pop(T& data) {
Mutex::ScopedLock l(lock);
+ return popInternal(data);
+ }
+
+ /** Wait up to deadline for a data item to be available.
+ *@return true if data was available, false if timed out.
+ *@throws ShutdownException if the queue is destroyed.
+ */
+ bool waitPop(T& data, Duration timeout) {
+ Mutex::ScopedLock l(lock);
+ ScopedIncrement<size_t> w(
+ waiters, boost::bind(&ConcurrentQueue::noWaiters, this));
+ AbsTime deadline(now(), timeout);
+ while (queue.empty() && lock.wait(deadline))
+ ;
+ return popInternal(data);
+ }
+
+ private:
+
+ bool popInternal(T& data) {
+ if (shutdown)
+ throw ShutdownException();
if (queue.empty())
return false;
else {
@@ -56,9 +96,16 @@ template <class T> class ConcurrentQueue {
}
}
- private:
- Mutex lock;
+ void noWaiters() {
+ assert(waiters == 0);
+ if (shutdown)
+ lock.notify(); // Notify dtor thread.
+ }
+
+ Monitor lock;
std::deque<T> queue;
+ size_t waiters;
+ bool shutdown;
};
}} // namespace qpid::sys
diff --git a/qpid/cpp/src/qpid/sys/ProducerConsumer.cpp b/qpid/cpp/src/qpid/sys/ProducerConsumer.cpp
deleted file mode 100644
index e892f60794..0000000000
--- a/qpid/cpp/src/qpid/sys/ProducerConsumer.cpp
+++ /dev/null
@@ -1,141 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-
-#include "qpid/QpidError.h"
-#include "ScopedIncrement.h"
-#include "ProducerConsumer.h"
-
-namespace qpid {
-namespace sys {
-
-// // ================ ProducerConsumer
-
-ProducerConsumer::ProducerConsumer(size_t init_items)
- : items(init_items), waiters(0), shutdownFlag(false)
-{}
-
-void ProducerConsumer::shutdown() {
- Mutex::ScopedLock l(monitor);
- shutdownFlag = true;
- monitor.notifyAll();
- // Wait for waiting consumers to wake up.
- while (waiters > 0)
- monitor.wait();
-}
-
-size_t ProducerConsumer::available() const {
- Mutex::ScopedLock l(monitor);
- return items;
-}
-
-size_t ProducerConsumer::consumers() const {
- Mutex::ScopedLock l(monitor);
- return waiters;
-}
-
-// ================ Lock
-
-ProducerConsumer::Lock::Lock(ProducerConsumer& p)
- : pc(p), lock(p.monitor), status(INCOMPLETE) {}
-
-bool ProducerConsumer::Lock::isOk() const {
- return !pc.isShutdown() && status==INCOMPLETE;
-}
-
-void ProducerConsumer::Lock::checkOk() const {
- assert(!pc.isShutdown());
- assert(status == INCOMPLETE);
-}
-
-ProducerConsumer::Lock::~Lock() {
- assert(status != INCOMPLETE || pc.isShutdown());
-}
-
-void ProducerConsumer::Lock::confirm() {
- checkOk();
- status = CONFIRMED;
-}
-
-void ProducerConsumer::Lock::cancel() {
- checkOk();
- status = CANCELLED;
-}
-
-// ================ ProducerLock
-
-ProducerConsumer::ProducerLock::ProducerLock(ProducerConsumer& p) : Lock(p)
-{}
-
-
-ProducerConsumer::ProducerLock::~ProducerLock() {
- if (status == CONFIRMED) {
- pc.items++;
- pc.monitor.notify(); // Notify a consumer.
- }
-}
-
-// ================ ConsumerLock
-
-ProducerConsumer::ConsumerLock::ConsumerLock(ProducerConsumer& p) : Lock(p)
-{
- if (isOk()) {
- ScopedIncrement<size_t> inc(pc.waiters);
- while (pc.items == 0 && !pc.shutdownFlag) {
- pc.monitor.wait();
- }
- }
-}
-
-ProducerConsumer::ConsumerLock::ConsumerLock(
- ProducerConsumer& p, const Duration& timeout) : Lock(p)
-{
- if (isOk()) {
- // Don't wait if timeout==0
- if (timeout == 0) {
- if (pc.items == 0)
- status = TIMEOUT;
- return;
- }
- else {
- AbsTime deadline(now(), timeout);
- ScopedIncrement<size_t> inc(pc.waiters);
- while (pc.items == 0 && !pc.shutdownFlag) {
- if (!pc.monitor.wait(deadline)) {
- status = TIMEOUT;
- return;
- }
- }
- }
- }
-}
-
-ProducerConsumer::ConsumerLock::~ConsumerLock() {
- if (pc.isShutdown()) {
- if (pc.waiters == 0)
- pc.monitor.notifyAll(); // Notify shutdown thread(s)
- }
- else if (status==CONFIRMED) {
- pc.items--;
- if (pc.items > 0)
- pc.monitor.notify(); // Notify another consumer.
- }
-}
-
-
-}} // namespace qpid::sys
diff --git a/qpid/cpp/src/qpid/sys/ProducerConsumer.h b/qpid/cpp/src/qpid/sys/ProducerConsumer.h
deleted file mode 100644
index 2a02dab503..0000000000
--- a/qpid/cpp/src/qpid/sys/ProducerConsumer.h
+++ /dev/null
@@ -1,165 +0,0 @@
-#ifndef _sys_ProducerConsumer_h
-#define _sys_ProducerConsumer_h
-
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#include <boost/noncopyable.hpp>
-#include "qpid/Exception.h"
-#include "Monitor.h"
-
-namespace qpid {
-namespace sys {
-
-/**
- * Producer-consumer synchronisation.
- *
- * Producers increase the number of available items, consumers reduce it.
- * Consumers wait till an item is available. Waiting threads can be
- * woken for shutdown using shutdown().
- *
- * Note: Currently implements unbounded producer-consumer, i.e. no limit
- * to available items, producers never block. Can be extended to support
- * bounded PC if required.
- *
- // TODO aconway 2007-02-13: example, from tests.
-*/
-class ProducerConsumer
-{
- public:
- ProducerConsumer(size_t init_items=0);
-
- ~ProducerConsumer() { shutdown(); }
-
- /**
- * Wake any threads waiting for ProducerLock or ConsumerLock.
- *@post No threads are waiting in Producer or Consumer locks.
- */
- void shutdown();
-
- /** True if queue is shutdown */
- bool isShutdown() { return shutdownFlag; }
-
- /** Number of items available for consumers */
- size_t available() const;
-
- /** Number of consumers waiting for items */
- size_t consumers() const;
-
- /** True if available == 0 */
- bool empty() const { return available() == 0; }
-
- /**
- * Base class for producer and consumer locks.
- */
- class Lock : private boost::noncopyable {
- public:
-
- /**
- * You must call isOk() after creating a lock to verify its state.
- *
- *@return true means the lock succeeded. You MUST call either
- *confirm() or cancel() before the lock goes out of scope.
- *
- * false means the lock failed - timed out or the
- * ProducerConsumer is shutdown. You should not do anything in
- * the scope of the lock.
- */
- bool isOk() const;
-
- /**
- * Confirm that an item was produced/consumed.
- *@pre isOk()
- */
- void confirm();
-
- /**
- * Cancel the lock to indicate nothing was produced/consumed.
- * Note that locks are not actually released until destroyed.
- *
- *@pre isOk()
- */
- void cancel();
-
- /** True if this lock experienced a timeout */
- bool isTimedOut() const { return status == TIMEOUT; }
-
- /** True if we have been shutdown */
- bool isShutdown() const { return pc.isShutdown(); }
-
- ProducerConsumer& pc;
-
- protected:
- /** Lock status */
- enum Status { INCOMPLETE, CONFIRMED, CANCELLED, TIMEOUT };
-
- Lock(ProducerConsumer& p);
- ~Lock();
- void checkOk() const;
- Mutex::ScopedLock lock;
- Status status;
- };
-
- /** Lock for code that produces items. */
- struct ProducerLock : public Lock {
- /**
- * Acquire locks to produce an item.
- *@post If isOk() the calling thread has exclusive access
- * to produce an item.
- */
- ProducerLock(ProducerConsumer& p);
-
- /** Release locks, signal waiting consumers if confirm() was called. */
- ~ProducerLock();
- };
-
- /** Lock for code that consumes items */
- struct ConsumerLock : public Lock {
- /**
- * Wait for an item to consume and acquire locks.
- *
- *@post If isOk() there is at least one item available and the
- *calling thread has exclusive access to consume it.
- */
- ConsumerLock(ProducerConsumer& p);
-
- /**
- * Wait up to timeout to acquire lock.
- *@post If isOk() caller has a producer lock.
- * If isTimedOut() there was a timeout.
- * If neither then we were shutdown.
- */
- ConsumerLock(ProducerConsumer& p, const Duration& timeout);
-
- /** Release locks */
- ~ConsumerLock();
- };
-
- private:
- mutable Monitor monitor;
- size_t items;
- size_t waiters;
- bool shutdownFlag;
-
- friend class Lock;
- friend class ProducerLock;
- friend class ConsumerLock;
-};
-
-}} // namespace qpid::sys
-
-#endif /*!_sys_ProducerConsumer_h*/
diff --git a/qpid/cpp/src/qpid/sys/ThreadSafeQueue.h b/qpid/cpp/src/qpid/sys/ThreadSafeQueue.h
deleted file mode 100644
index 8f11c42051..0000000000
--- a/qpid/cpp/src/qpid/sys/ThreadSafeQueue.h
+++ /dev/null
@@ -1,98 +0,0 @@
-#ifndef _sys_ThreadSafeQueue_h
-#define _sys_ThreadSafeQueue_h
-
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include <deque>
-#include "ProducerConsumer.h"
-#include "qpid/Exception.h"
-
-namespace qpid {
-namespace sys {
-
-/**
- * A thread safe queue template.
- */
-template <class T, class ContainerType=std::deque<T> >
-class ThreadSafeQueue
-{
- public:
-
- ThreadSafeQueue() {}
-
- /** Push a value onto the back of the queue */
- void push(const T& value) {
- ProducerConsumer::ProducerLock producer(pc);
- if (producer.isOk()) {
- producer.confirm();
- container.push_back(value);
- }
- }
-
- /** Pop a value from the front of the queue. Waits till value is available.
- *@throw ShutdownException if queue is shutdown while waiting.
- */
- T pop() {
- ProducerConsumer::ConsumerLock consumer(pc);
- if (consumer.isOk()) {
- consumer.confirm();
- T value(container.front());
- container.pop_front();
- return value;
- }
- throw ShutdownException();
- }
-
- /**
- * If a value becomes available within the timeout, set outValue
- * and return true. Otherwise return false;
- */
- bool pop(T& outValue, const Time& timeout) {
- ProducerConsumer::ConsumerLock consumer(pc, timeout);
- if (consumer.isOk()) {
- consumer.confirm();
- outValue = container.front();
- container.pop_front();
- return true;
- }
- return false;
- }
-
- /** Interrupt threads waiting in pop() */
- void shutdown() { pc.shutdown(); }
-
- /** True if queue is shutdown */
- bool isShutdown() { return pc.isShutdown(); }
-
- /** Size of the queue */
- size_t size() { ProducerConsumer::Lock l(pc); return container.size(); }
-
- /** True if queue is empty */
- bool empty() { ProducerConsumer::Lock l(pc); return container.empty(); }
-
- private:
- ProducerConsumer pc;
- ContainerType container;
-};
-
-}} // namespace qpid::sys
-
-
-
-#endif /*!_sys_ThreadSafeQueue_h*/
diff --git a/qpid/cpp/src/tests/Cluster.cpp b/qpid/cpp/src/tests/Cluster.cpp
index b22f312038..5ace48b736 100644
--- a/qpid/cpp/src/tests/Cluster.cpp
+++ b/qpid/cpp/src/tests/Cluster.cpp
@@ -19,8 +19,8 @@
#include "Cluster.h"
#include "test_tools.h"
-#include "qpid/framing/ChannelPingBody.h"
-#include "qpid/framing/ChannelOkBody.h"
+#include "qpid/framing/SessionPingBody.h"
+#include "qpid/framing/SessionPongBody.h"
#include "qpid/cluster/ClassifierHandler.h"
#define BOOST_AUTO_TEST_MAIN // Must come before #include<boost/test/*>
@@ -33,16 +33,16 @@ static const ProtocolVersion VER;
/** Verify membership in a cluster with one member. */
BOOST_AUTO_TEST_CASE(testClusterOne) {
TestCluster cluster("clusterOne", "amqp:one:1");
- AMQFrame frame(VER, 1, new ChannelPingBody(VER));
+ AMQFrame frame(VER, 1, new SessionPingBody(VER));
Uuid id(true);
SessionFrame send(id, frame, true);
cluster.handle(send);
- BOOST_REQUIRE(cluster.received.waitFor(1));
+ SessionFrame sf;
+ BOOST_REQUIRE(cluster.received.waitPop(sf));
- SessionFrame& sf=cluster.received[0];
BOOST_CHECK(sf.isIncoming);
BOOST_CHECK_EQUAL(id, sf.uuid);
- BOOST_CHECK_TYPEID_EQUAL(ChannelPingBody, *sf.frame.getBody());
+ BOOST_CHECK_TYPEID_EQUAL(SessionPingBody, *sf.frame.getBody());
BOOST_CHECK_EQUAL(1u, cluster.size());
Cluster::MemberList members = cluster.getMembers();
@@ -65,17 +65,18 @@ BOOST_AUTO_TEST_CASE(testClusterTwo) {
BOOST_REQUIRE(cluster.waitFor(2)); // Myself and child.
// Exchange frames with child.
- AMQFrame frame(VER, 1, new ChannelPingBody(VER));
+ AMQFrame frame(VER, 1, new SessionPingBody(VER));
Uuid id(true);
SessionFrame send(id, frame, true);
cluster.handle(send);
- BOOST_REQUIRE(cluster.received.waitFor(1));
- SessionFrame& sf=cluster.received[0];
+ SessionFrame sf;
+ BOOST_REQUIRE(cluster.received.waitPop(sf));
BOOST_CHECK_EQUAL(id, sf.uuid);
BOOST_CHECK(sf.isIncoming);
- BOOST_CHECK_TYPEID_EQUAL(ChannelPingBody, *sf.frame.getBody());
- BOOST_REQUIRE(cluster.received.waitFor(2));
- BOOST_CHECK_TYPEID_EQUAL(ChannelOkBody, *cluster.received[1].frame.getBody());
+ BOOST_CHECK_TYPEID_EQUAL(SessionPingBody, *sf.frame.getBody());
+
+ BOOST_REQUIRE(cluster.received.waitPop(sf));
+ BOOST_CHECK_TYPEID_EQUAL(SessionPongBody, *sf.frame.getBody());
if (!nofork) {
// Wait for child to exit.
diff --git a/qpid/cpp/src/tests/Cluster.h b/qpid/cpp/src/tests/Cluster.h
index e896fccafe..02e642f641 100644
--- a/qpid/cpp/src/tests/Cluster.h
+++ b/qpid/cpp/src/tests/Cluster.h
@@ -20,16 +20,13 @@
*/
#include "qpid/cluster/Cluster.h"
+#include "qpid/sys/ConcurrentQueue.h"
#include "qpid/framing/AMQFrame.h"
-#include "qpid/framing/ChannelOkBody.h"
-#include "qpid/framing/BasicGetOkBody.h"
-#include "qpid/log/Logger.h"
#include <boost/bind.hpp>
#include <boost/test/test_tools.hpp>
#include <iostream>
-#include <vector>
#include <functional>
/**
@@ -48,26 +45,12 @@ using namespace boost;
void null_deleter(void*) {}
template <class T>
-class TestHandler : public Handler<T&>, public vector<T>
+class TestHandler : public Handler<T&>, public ConcurrentQueue<T>
{
- Monitor lock;
-
public:
- void handle(T& frame) {
- Mutex::ScopedLock l(lock);
- push_back(frame);
- BOOST_MESSAGE(getpid()<<" TestHandler::handle: " << this->size());
- lock.notifyAll();
- }
-
- bool waitFor(size_t n) {
- Mutex::ScopedLock l(lock);
- BOOST_MESSAGE(getpid()<<" TestHandler::waitFor("<<n<<") "<<this->size());
- AbsTime deadline(now(), 2*TIME_SEC);
- while (this->size() < n && lock.wait(deadline))
- ;
- return this->size() >= n;
- }
+ void handle(T& frame) { push(frame); }
+ bool waitPop(T& x) { return waitPop(x, TIME_SEC); }
+ using ConcurrentQueue<T>::waitPop;
};
typedef TestHandler<AMQFrame> TestFrameHandler;
@@ -83,7 +66,8 @@ struct TestCluster : public Cluster
/** Wait for cluster to be of size n. */
bool waitFor(size_t n) {
BOOST_CHECKPOINT("About to call Cluster::wait");
- return wait(boost::bind(equal_to<size_t>(), bind(&Cluster::size,this), n));
+ return wait(boost::bind(
+ equal_to<size_t>(), bind(&Cluster::size,this), n));
}
TestSessionFrameHandler received;
diff --git a/qpid/cpp/src/tests/Cluster_child.cpp b/qpid/cpp/src/tests/Cluster_child.cpp
index c509dc1950..fd4eb42e7b 100644
--- a/qpid/cpp/src/tests/Cluster_child.cpp
+++ b/qpid/cpp/src/tests/Cluster_child.cpp
@@ -20,6 +20,8 @@
#include "Cluster.h"
#include "test_tools.h"
+#include "qpid/framing/SessionPingBody.h"
+#include "qpid/framing/SessionPongBody.h"
using namespace std;
using namespace qpid;
@@ -33,17 +35,18 @@ static const ProtocolVersion VER;
/** Chlid part of Cluster::clusterTwo test */
void clusterTwo() {
TestCluster cluster("clusterTwo", "amqp:child:2");
- BOOST_REQUIRE(cluster.received.waitFor(1)); // Frame from parent.
- BOOST_CHECK(cluster.received[0].isIncoming);
- BOOST_CHECK_TYPEID_EQUAL(ChannelPingBody, *cluster.received[0].frame.getBody());
+ SessionFrame sf;
+ BOOST_REQUIRE(cluster.received.waitPop(sf)); // Frame from parent.
+ BOOST_CHECK(sf.isIncoming);
+ BOOST_CHECK_TYPEID_EQUAL(SessionPingBody, *sf.frame.getBody());
BOOST_CHECK_EQUAL(2u, cluster.size()); // Me and parent
- AMQFrame frame(VER, 1, new ChannelOkBody(VER));
- SessionFrame sf(cluster.received[0].uuid, frame, false);
- cluster.handle(sf);
- BOOST_REQUIRE(cluster.received.waitFor(2));
- BOOST_CHECK(!cluster.received[1].isIncoming);
- BOOST_CHECK_TYPEID_EQUAL(ChannelOkBody, *cluster.received[1].frame.getBody());
+ AMQFrame frame(VER, 1, new SessionPongBody(VER));
+ SessionFrame sendframe(sf.uuid, frame, false);
+ cluster.handle(sendframe);
+ BOOST_REQUIRE(cluster.received.waitPop(sf));
+ BOOST_CHECK(!sf.isIncoming);
+ BOOST_CHECK_TYPEID_EQUAL(SessionPongBody, *sf.frame.getBody());
}
int test_main(int, char**) {
diff --git a/qpid/cpp/src/tests/Makefile.am b/qpid/cpp/src/tests/Makefile.am
index 4f1e7e1ec3..edb4f5b375 100644
--- a/qpid/cpp/src/tests/Makefile.am
+++ b/qpid/cpp/src/tests/Makefile.am
@@ -87,9 +87,6 @@ framing_unit_tests = \
HeaderTest \
SequenceNumberTest
-misc_unit_tests = \
- ProducerConsumerTest
-
posix_unit_tests = \
EventChannelTest \
EventChannelThreadsTest
diff --git a/qpid/cpp/src/tests/ProducerConsumerTest.cpp b/qpid/cpp/src/tests/ProducerConsumerTest.cpp
deleted file mode 100644
index 789e365a85..0000000000
--- a/qpid/cpp/src/tests/ProducerConsumerTest.cpp
+++ /dev/null
@@ -1,284 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include <vector>
-#include <iostream>
-
-#include <boost/bind.hpp>
-
-#include "qpid_test_plugin.h"
-#include "InProcessBroker.h"
-#include "qpid/sys/ProducerConsumer.h"
-#include "qpid/sys/Thread.h"
-#include "qpid/framing/AMQP_HighestVersion.h"
-#include "qpid/sys/AtomicCount.h"
-
-using namespace qpid;
-using namespace sys;
-using namespace framing;
-using namespace boost;
-using namespace std;
-
-/** A counter that notifies a monitor when changed */
-class WatchedCounter : public Monitor {
- public:
- WatchedCounter(int i=0) : count(i) {}
- WatchedCounter(const WatchedCounter& c) : Monitor(), count(int(c)) {}
-
- WatchedCounter& operator=(const WatchedCounter& x) {
- return *this = int(x);
- }
-
- WatchedCounter& operator=(int i) {
- Lock l(*this);
- count = i;
- return *this;
- }
-
- int operator++() {
- Lock l(*this);
- notifyAll();
- return ++count;
- }
-
- int operator++(int) {
- Lock l(*this);
- notifyAll();
- return count++;
- }
-
- bool operator==(int i) const {
- Lock l(const_cast<WatchedCounter&>(*this));
- return i == count;
- }
-
- operator int() const {
- Lock l(const_cast<WatchedCounter&>(*this));
- return count;
- }
-
- bool waitFor(int i, Duration timeout=TIME_SEC) {
- Lock l(*this);
- AbsTime deadline(now(), timeout);
- while (count != i) {
- if (!wait(deadline))
- return false;
- }
- assert(count == i);
- return true;
- }
-
- private:
- typedef Mutex::ScopedLock Lock;
- int count;
-};
-
-class ProducerConsumerTest : public CppUnit::TestCase
-{
- CPPUNIT_TEST_SUITE(ProducerConsumerTest);
- CPPUNIT_TEST(testProduceConsume);
- CPPUNIT_TEST(testTimeout);
- CPPUNIT_TEST(testShutdown);
- CPPUNIT_TEST(testCancel);
- CPPUNIT_TEST_SUITE_END();
-
- public:
- client::InProcessBrokerClient client;
- ProducerConsumer pc;
-
- WatchedCounter shutdown;
- WatchedCounter timeout;
- WatchedCounter consumed;
- WatchedCounter produced;
-
- struct ConsumeRunnable : public Runnable {
- ProducerConsumerTest& test;
- ConsumeRunnable(ProducerConsumerTest& test_) : test(test_) {}
- void run() { test.consume(); }
- };
-
- struct ConsumeTimeoutRunnable : public Runnable {
- ProducerConsumerTest& test;
- Duration timeout;
- ConsumeTimeoutRunnable(ProducerConsumerTest& test_, const Duration& t)
- : test(test_), timeout(t) {}
- void run() { test.consumeTimeout(timeout); }
- };
-
-
- void consumeInternal(ProducerConsumer::ConsumerLock& consumer) {
- if (pc.isShutdown()) {
- ++shutdown;
- return;
- }
- if (consumer.isTimedOut()) {
- ++timeout;
- return;
- }
- CPPUNIT_ASSERT(consumer.isOk());
- CPPUNIT_ASSERT(pc.available() > 0);
- consumer.confirm();
- consumed++;
- }
-
- void consume() {
- ProducerConsumer::ConsumerLock consumer(pc);
- consumeInternal(consumer);
- };
-
- void consumeTimeout(const Duration& timeout) {
- ProducerConsumer::ConsumerLock consumer(pc, timeout);
- consumeInternal(consumer);
- };
-
- void produce() {
- ProducerConsumer::ProducerLock producer(pc);
- CPPUNIT_ASSERT(producer.isOk());
- producer.confirm();
- produced++;
- }
-
- void join(vector<Thread>& threads) {
- for_each(threads.begin(), threads.end(), bind(&Thread::join,_1));
- }
-
- vector<Thread> startThreads(size_t n, Runnable& runnable) {
- vector<Thread> threads(n);
- while (n > 0)
- threads[--n] = Thread(runnable);
- return threads;
- }
-
-public:
- ProducerConsumerTest() : client() {}
-
- void testProduceConsume() {
- ConsumeRunnable runMe(*this);
- produce();
- produce();
- CPPUNIT_ASSERT(produced.waitFor(2));
- vector<Thread> threads = startThreads(1, runMe);
- CPPUNIT_ASSERT(consumed.waitFor(1));
- join(threads);
-
- threads = startThreads(1, runMe);
- CPPUNIT_ASSERT(consumed.waitFor(2));
- join(threads);
-
- threads = startThreads(3, runMe);
- produce();
- produce();
- CPPUNIT_ASSERT(consumed.waitFor(4));
- produce();
- CPPUNIT_ASSERT(consumed.waitFor(5));
- join(threads);
- CPPUNIT_ASSERT_EQUAL(0, int(shutdown));
- }
-
- void testTimeout() {
- try {
- // 0 timeout no items available throws exception
- ProducerConsumer::ConsumerLock consumer(pc, 0);
- CPPUNIT_FAIL("Expected exception");
- } catch(...){}
-
- produce();
- CPPUNIT_ASSERT(produced.waitFor(1));
- CPPUNIT_ASSERT_EQUAL(1, int(pc.available()));
- {
- // 0 timeout succeeds if there's an item available.
- ProducerConsumer::ConsumerLock consume(pc, 0);
- CPPUNIT_ASSERT(consume.isOk());
- consume.confirm();
- }
- CPPUNIT_ASSERT_EQUAL(0, int(pc.available()));
-
- // Produce an item within the timeout.
- ConsumeTimeoutRunnable runMe(*this, 2*TIME_SEC);
- vector<Thread> threads = startThreads(1, runMe);
- produce();
- CPPUNIT_ASSERT(consumed.waitFor(1));
- join(threads);
- }
-
-
- void testShutdown() {
- ConsumeRunnable runMe(*this);
- vector<Thread> threads = startThreads(2, runMe);
- while (pc.consumers() != 2)
- Thread::yield();
- pc.shutdown();
- CPPUNIT_ASSERT(shutdown.waitFor(2));
- join(threads);
-
- threads = startThreads(1, runMe); // Should shutdown immediately.
- CPPUNIT_ASSERT(shutdown.waitFor(3));
- join(threads);
-
- // Produce/consume while shutdown should return isShutdown and
- // throw on confirm.
- try {
- ProducerConsumer::ProducerLock p(pc);
- CPPUNIT_ASSERT(pc.isShutdown());
- CPPUNIT_FAIL("Expected exception");
- }
- catch (...) {} // Expected
- try {
- ProducerConsumer::ConsumerLock c(pc);
- CPPUNIT_ASSERT(pc.isShutdown());
- CPPUNIT_FAIL("Expected exception");
- }
- catch (...) {} // Expected
- }
-
- void testCancel() {
- CPPUNIT_ASSERT_EQUAL(size_t(0), pc.available());
- {
- ProducerConsumer::ProducerLock p(pc);
- CPPUNIT_ASSERT(p.isOk());
- p.cancel();
- }
- // Nothing was produced.
- CPPUNIT_ASSERT_EQUAL(size_t(0), pc.available());
- {
- ProducerConsumer::ConsumerLock c(pc, 0);
- CPPUNIT_ASSERT(c.isTimedOut());
- }
- // Now produce but cancel the consume
- {
- ProducerConsumer::ProducerLock p(pc);
- CPPUNIT_ASSERT(p.isOk());
- p.confirm();
- }
- CPPUNIT_ASSERT_EQUAL(size_t(1), pc.available());
- {
- ProducerConsumer::ConsumerLock c(pc);
- CPPUNIT_ASSERT(c.isOk());
- c.cancel();
- }
- CPPUNIT_ASSERT_EQUAL(size_t(1), pc.available());
- }
-};
-
-
-// Make this test suite a plugin.
-CPPUNIT_PLUGIN_IMPLEMENT();
-CPPUNIT_TEST_SUITE_REGISTRATION(ProducerConsumerTest);
-
diff --git a/qpid/cpp/src/tests/cluster.mk b/qpid/cpp/src/tests/cluster.mk
index 765aa02eb0..68725758fa 100644
--- a/qpid/cpp/src/tests/cluster.mk
+++ b/qpid/cpp/src/tests/cluster.mk
@@ -20,6 +20,7 @@ check_PROGRAMS+=Cpg
Cpg_SOURCES=Cpg.cpp
Cpg_LDADD=$(lib_cluster) -lboost_unit_test_framework
+# TODO aconway 2007-07-26: Fix this test.
#TESTS+=Cluster
check_PROGRAMS+=Cluster
Cluster_SOURCES=Cluster.cpp Cluster.h