diff options
Diffstat (limited to 'cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp')
-rw-r--r-- | cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp | 49 |
1 files changed, 45 insertions, 4 deletions
diff --git a/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp b/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp index 1859bde947..3ac6867ce1 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp +++ b/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp @@ -23,8 +23,10 @@ #include "MessageConsumer.h" +#include "DeliveryRecord.h" #include "SimpleQueue.h" #include "TestOptions.h" +#include "TxnAccept.h" #include "qpid/asyncStore/AsyncStoreImpl.h" #include "qpid/broker/TxnBuffer.h" @@ -48,26 +50,65 @@ MessageConsumer::MessageConsumer(const TestOptions& perfTestParams, MessageConsumer::~MessageConsumer() {} +void +MessageConsumer::record(boost::shared_ptr<DeliveryRecord> dr) +{ + // TODO: May need a lock? + m_unacked.push_back(dr); +} + +void +MessageConsumer::dequeueComplete() +{ +//std::cout << "MessageConsumer::dequeueComplete()" << std::endl << std::flush; + // TODO: May need a lock + //++m_numMsgs; +} + void* MessageConsumer::runConsumers() { const bool useTxns = m_perfTestParams.m_deqTxnBlockSize > 0U; - uint16_t txnCnt = 0U; + uint16_t opsInTxnCnt = 0U; qpid::broker::TxnBuffer* tb = 0; if (useTxns) { tb = new qpid::broker::TxnBuffer(m_resultQueue); } - uint32_t numMsgs = 0; + uint32_t numMsgs = 0UL; while (numMsgs < m_perfTestParams.m_numMsgs) { - if (m_queue->dispatch()) { + if (m_queue->dispatch(*this)) { ++numMsgs; + if (useTxns) { + // --- Transactional dequeue --- + if (++opsInTxnCnt >= m_perfTestParams.m_deqTxnBlockSize) { + if (m_perfTestParams.m_durable) { + boost::shared_ptr<TxnAccept> ta(new TxnAccept(m_unacked)); + m_unacked.clear(); + tb->enlist(ta); + tb->commitLocal(m_store); + if (numMsgs < m_perfTestParams.m_numMsgs) { + tb = new qpid::broker::TxnBuffer(m_resultQueue); + } + } else { + tb->commit(); + } + opsInTxnCnt = 0U; + } + } else { + // --- Non-transactional dequeue --- + for (std::deque<boost::shared_ptr<DeliveryRecord> >::iterator i = m_unacked.begin(); i != m_unacked.end(); ++i) { + (*i)->accept(); + } + m_unacked.clear(); + //++numMsgs; + } } else { ::usleep(1000); // TODO - replace this poller with condition variable } } - if (txnCnt) { + if (opsInTxnCnt) { if (m_perfTestParams.m_durable) { tb->commitLocal(m_store); } else { |