summaryrefslogtreecommitdiff
path: root/cpp/src/tests/storePerftools/asyncPerf/PerfTest.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/tests/storePerftools/asyncPerf/PerfTest.cpp')
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/PerfTest.cpp48
1 files changed, 33 insertions, 15 deletions
diff --git a/cpp/src/tests/storePerftools/asyncPerf/PerfTest.cpp b/cpp/src/tests/storePerftools/asyncPerf/PerfTest.cpp
index 7387c348fd..184a899570 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/PerfTest.cpp
+++ b/cpp/src/tests/storePerftools/asyncPerf/PerfTest.cpp
@@ -23,6 +23,8 @@
#include "PerfTest.h"
+#include "MessageConsumer.h"
+#include "MessageProducer.h"
#include "MockPersistableQueue.h"
#include "tests/storePerftools/version.h"
@@ -30,6 +32,7 @@
#include "tests/storePerftools/common/Thread.h"
#include "qpid/asyncStore/AsyncStoreImpl.h"
+#include "qpid/sys/Poller.h"
#include <iomanip>
@@ -56,8 +59,9 @@ PerfTest::~PerfTest()
m_pollingThread.join();
m_queueList.clear();
+ m_queueList.clear();
+ m_producers.clear();
- if (m_store) delete m_store;
delete[] m_msgData;
}
@@ -69,13 +73,21 @@ PerfTest::prepareStore()
}
void
+PerfTest::destroyStore()
+{
+ if (m_store) {
+ delete m_store;
+ }
+}
+
+void
PerfTest::prepareQueues()
{
for (uint16_t i = 0; i < m_testOpts.m_numQueues; ++i) {
std::ostringstream qname;
qname << "queue_" << std::setw(4) << std::setfill('0') << i;
- MockPersistableQueue::intrusive_ptr mpq(new MockPersistableQueue(qname.str(), m_queueArgs, m_store, m_testOpts, m_msgData));
- mpq->asyncStoreCreate();
+ boost::shared_ptr<MockPersistableQueue> mpq(new MockPersistableQueue(qname.str(), m_queueArgs, m_store));
+ mpq->asyncCreate();
m_queueList.push_back(mpq);
}
}
@@ -83,32 +95,38 @@ PerfTest::prepareQueues()
void
PerfTest::destroyQueues()
{
- for (std::deque<MockPersistableQueue::intrusive_ptr>::iterator i=m_queueList.begin(); i!=m_queueList.end(); ++i) {
- (*i)->asyncStoreDestroy();
+ while (m_queueList.size() > 0) {
+ m_queueList.front()->asyncDestroy(m_testOpts.m_destroyQueuesOnCompletion);
+ m_queueList.pop_front();
}
}
void
PerfTest::run()
{
- typedef boost::shared_ptr<tests::storePerftools::common::Thread> ThreadPtr; // TODO - replace with qpid threads
-
- prepareStore();
+ if (m_testOpts.m_durable) {
+ prepareStore();
+ }
prepareQueues();
- std::deque<ThreadPtr> threads;
+ // TODO: replace with qpid::sys::Thread
+ std::deque<boost::shared_ptr<tests::storePerftools::common::Thread> > threads;
{ // --- Start of timed section ---
tests::storePerftools::common::ScopedTimer st(m_testResult);
for (uint16_t q = 0; q < m_testOpts.m_numQueues; q++) {
+ boost::shared_ptr<MessageProducer> mp(new MessageProducer(m_testOpts, m_msgData, m_store, m_queueList[q]));
+ m_producers.push_back(mp);
for (uint16_t t = 0; t < m_testOpts.m_numEnqThreadsPerQueue; t++) { // TODO - replace with qpid threads
- ThreadPtr tp(new tests::storePerftools::common::Thread(m_queueList[q]->startEnqueues,
- reinterpret_cast<void*>(m_queueList[q].get())));
+ boost::shared_ptr<tests::storePerftools::common::Thread> tp(new tests::storePerftools::common::Thread(mp->startProducers,
+ reinterpret_cast<void*>(mp.get())));
threads.push_back(tp);
}
+ boost::shared_ptr<MessageConsumer> mc(new MessageConsumer(m_testOpts, m_queueList[q]));
+ m_consumers.push_back(mc);
for (uint16_t dt = 0; dt < m_testOpts.m_numDeqThreadsPerQueue; ++dt) { // TODO - replace with qpid threads
- ThreadPtr tp(new tests::storePerftools::common::Thread(m_queueList[q]->startDequeues,
- reinterpret_cast<void*>(m_queueList[q].get())));
+ boost::shared_ptr<tests::storePerftools::common::Thread> tp(new tests::storePerftools::common::Thread(mc->startConsumers,
+ reinterpret_cast<void*>(mc.get())));
threads.push_back(tp);
}
}
@@ -117,8 +135,8 @@ PerfTest::run()
threads.pop_front();
}
} // --- End of timed section ---
- // TODO: Add test param to allow queues to be destroyed or left when test ends
destroyQueues();
+ destroyStore();
}
void
@@ -172,6 +190,6 @@ main(int argc, char** argv)
// Print test result
std::cout << apt << std::endl;
- ::sleep(1);
+ //::sleep(1);
return 0;
}