summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/tests/Cluster.h
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/tests/Cluster.h')
-rw-r--r--qpid/cpp/src/tests/Cluster.h30
1 files changed, 7 insertions, 23 deletions
diff --git a/qpid/cpp/src/tests/Cluster.h b/qpid/cpp/src/tests/Cluster.h
index e896fccafe..02e642f641 100644
--- a/qpid/cpp/src/tests/Cluster.h
+++ b/qpid/cpp/src/tests/Cluster.h
@@ -20,16 +20,13 @@
*/
#include "qpid/cluster/Cluster.h"
+#include "qpid/sys/ConcurrentQueue.h"
#include "qpid/framing/AMQFrame.h"
-#include "qpid/framing/ChannelOkBody.h"
-#include "qpid/framing/BasicGetOkBody.h"
-#include "qpid/log/Logger.h"
#include <boost/bind.hpp>
#include <boost/test/test_tools.hpp>
#include <iostream>
-#include <vector>
#include <functional>
/**
@@ -48,26 +45,12 @@ using namespace boost;
void null_deleter(void*) {}
template <class T>
-class TestHandler : public Handler<T&>, public vector<T>
+class TestHandler : public Handler<T&>, public ConcurrentQueue<T>
{
- Monitor lock;
-
public:
- void handle(T& frame) {
- Mutex::ScopedLock l(lock);
- push_back(frame);
- BOOST_MESSAGE(getpid()<<" TestHandler::handle: " << this->size());
- lock.notifyAll();
- }
-
- bool waitFor(size_t n) {
- Mutex::ScopedLock l(lock);
- BOOST_MESSAGE(getpid()<<" TestHandler::waitFor("<<n<<") "<<this->size());
- AbsTime deadline(now(), 2*TIME_SEC);
- while (this->size() < n && lock.wait(deadline))
- ;
- return this->size() >= n;
- }
+ void handle(T& frame) { push(frame); }
+ bool waitPop(T& x) { return waitPop(x, TIME_SEC); }
+ using ConcurrentQueue<T>::waitPop;
};
typedef TestHandler<AMQFrame> TestFrameHandler;
@@ -83,7 +66,8 @@ struct TestCluster : public Cluster
/** Wait for cluster to be of size n. */
bool waitFor(size_t n) {
BOOST_CHECKPOINT("About to call Cluster::wait");
- return wait(boost::bind(equal_to<size_t>(), bind(&Cluster::size,this), n));
+ return wait(boost::bind(
+ equal_to<size_t>(), bind(&Cluster::size,this), n));
}
TestSessionFrameHandler received;