summaryrefslogtreecommitdiff
path: root/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp')
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp14
1 files changed, 5 insertions, 9 deletions
diff --git a/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp b/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp
index 3ac6867ce1..4a2bc2bf0c 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp
+++ b/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp
@@ -53,17 +53,12 @@ MessageConsumer::~MessageConsumer()
void
MessageConsumer::record(boost::shared_ptr<DeliveryRecord> dr)
{
- // TODO: May need a lock?
m_unacked.push_back(dr);
}
void
-MessageConsumer::dequeueComplete()
-{
-//std::cout << "MessageConsumer::dequeueComplete()" << std::endl << std::flush;
- // TODO: May need a lock
- //++m_numMsgs;
-}
+MessageConsumer::commitComplete()
+{}
void*
MessageConsumer::runConsumers()
@@ -75,8 +70,10 @@ MessageConsumer::runConsumers()
tb = new qpid::broker::TxnBuffer(m_resultQueue);
}
+ uint32_t msgsPerConsumer = m_perfTestParams.m_numEnqThreadsPerQueue * m_perfTestParams.m_numMsgs /
+ m_perfTestParams.m_numDeqThreadsPerQueue;
uint32_t numMsgs = 0UL;
- while (numMsgs < m_perfTestParams.m_numMsgs) {
+ while (numMsgs < msgsPerConsumer) {
if (m_queue->dispatch(*this)) {
++numMsgs;
if (useTxns) {
@@ -101,7 +98,6 @@ MessageConsumer::runConsumers()
(*i)->accept();
}
m_unacked.clear();
- //++numMsgs;
}
} else {
::usleep(1000); // TODO - replace this poller with condition variable