diff options
Diffstat (limited to 'cpp/src/tests/storePerftools/asyncPerf/PerfTest.cpp')
-rw-r--r-- | cpp/src/tests/storePerftools/asyncPerf/PerfTest.cpp | 48 |
1 files changed, 33 insertions, 15 deletions
diff --git a/cpp/src/tests/storePerftools/asyncPerf/PerfTest.cpp b/cpp/src/tests/storePerftools/asyncPerf/PerfTest.cpp index 7387c348fd..184a899570 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/PerfTest.cpp +++ b/cpp/src/tests/storePerftools/asyncPerf/PerfTest.cpp @@ -23,6 +23,8 @@ #include "PerfTest.h" +#include "MessageConsumer.h" +#include "MessageProducer.h" #include "MockPersistableQueue.h" #include "tests/storePerftools/version.h" @@ -30,6 +32,7 @@ #include "tests/storePerftools/common/Thread.h" #include "qpid/asyncStore/AsyncStoreImpl.h" +#include "qpid/sys/Poller.h" #include <iomanip> @@ -56,8 +59,9 @@ PerfTest::~PerfTest() m_pollingThread.join(); m_queueList.clear(); + m_queueList.clear(); + m_producers.clear(); - if (m_store) delete m_store; delete[] m_msgData; } @@ -69,13 +73,21 @@ PerfTest::prepareStore() } void +PerfTest::destroyStore() +{ + if (m_store) { + delete m_store; + } +} + +void PerfTest::prepareQueues() { for (uint16_t i = 0; i < m_testOpts.m_numQueues; ++i) { std::ostringstream qname; qname << "queue_" << std::setw(4) << std::setfill('0') << i; - MockPersistableQueue::intrusive_ptr mpq(new MockPersistableQueue(qname.str(), m_queueArgs, m_store, m_testOpts, m_msgData)); - mpq->asyncStoreCreate(); + boost::shared_ptr<MockPersistableQueue> mpq(new MockPersistableQueue(qname.str(), m_queueArgs, m_store)); + mpq->asyncCreate(); m_queueList.push_back(mpq); } } @@ -83,32 +95,38 @@ PerfTest::prepareQueues() void PerfTest::destroyQueues() { - for (std::deque<MockPersistableQueue::intrusive_ptr>::iterator i=m_queueList.begin(); i!=m_queueList.end(); ++i) { - (*i)->asyncStoreDestroy(); + while (m_queueList.size() > 0) { + m_queueList.front()->asyncDestroy(m_testOpts.m_destroyQueuesOnCompletion); + m_queueList.pop_front(); } } void PerfTest::run() { - typedef boost::shared_ptr<tests::storePerftools::common::Thread> ThreadPtr; // TODO - replace with qpid threads - - prepareStore(); + if (m_testOpts.m_durable) { + prepareStore(); + } prepareQueues(); - std::deque<ThreadPtr> threads; + // TODO: replace with qpid::sys::Thread + std::deque<boost::shared_ptr<tests::storePerftools::common::Thread> > threads; { // --- Start of timed section --- tests::storePerftools::common::ScopedTimer st(m_testResult); for (uint16_t q = 0; q < m_testOpts.m_numQueues; q++) { + boost::shared_ptr<MessageProducer> mp(new MessageProducer(m_testOpts, m_msgData, m_store, m_queueList[q])); + m_producers.push_back(mp); for (uint16_t t = 0; t < m_testOpts.m_numEnqThreadsPerQueue; t++) { // TODO - replace with qpid threads - ThreadPtr tp(new tests::storePerftools::common::Thread(m_queueList[q]->startEnqueues, - reinterpret_cast<void*>(m_queueList[q].get()))); + boost::shared_ptr<tests::storePerftools::common::Thread> tp(new tests::storePerftools::common::Thread(mp->startProducers, + reinterpret_cast<void*>(mp.get()))); threads.push_back(tp); } + boost::shared_ptr<MessageConsumer> mc(new MessageConsumer(m_testOpts, m_queueList[q])); + m_consumers.push_back(mc); for (uint16_t dt = 0; dt < m_testOpts.m_numDeqThreadsPerQueue; ++dt) { // TODO - replace with qpid threads - ThreadPtr tp(new tests::storePerftools::common::Thread(m_queueList[q]->startDequeues, - reinterpret_cast<void*>(m_queueList[q].get()))); + boost::shared_ptr<tests::storePerftools::common::Thread> tp(new tests::storePerftools::common::Thread(mc->startConsumers, + reinterpret_cast<void*>(mc.get()))); threads.push_back(tp); } } @@ -117,8 +135,8 @@ PerfTest::run() threads.pop_front(); } } // --- End of timed section --- - // TODO: Add test param to allow queues to be destroyed or left when test ends destroyQueues(); + destroyStore(); } void @@ -172,6 +190,6 @@ main(int argc, char** argv) // Print test result std::cout << apt << std::endl; - ::sleep(1); + //::sleep(1); return 0; } |