From cbd4f9c22974db5f53b42a4326486ec8325b79cc Mon Sep 17 00:00:00 2001 From: Kim van der Riet Date: Mon, 25 Jun 2012 19:11:55 +0000 Subject: WIP - transactional consume path completed, still some testing to be done. git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1353703 13f79535-47bb-0310-9956-ffa450edef68 --- .../storePerftools/asyncPerf/MessageConsumer.cpp | 49 ++++++++++++++++++++-- 1 file changed, 45 insertions(+), 4 deletions(-) (limited to 'cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp') diff --git a/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp b/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp index 1859bde947..3ac6867ce1 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp +++ b/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp @@ -23,8 +23,10 @@ #include "MessageConsumer.h" +#include "DeliveryRecord.h" #include "SimpleQueue.h" #include "TestOptions.h" +#include "TxnAccept.h" #include "qpid/asyncStore/AsyncStoreImpl.h" #include "qpid/broker/TxnBuffer.h" @@ -48,26 +50,65 @@ MessageConsumer::MessageConsumer(const TestOptions& perfTestParams, MessageConsumer::~MessageConsumer() {} +void +MessageConsumer::record(boost::shared_ptr dr) +{ + // TODO: May need a lock? + m_unacked.push_back(dr); +} + +void +MessageConsumer::dequeueComplete() +{ +//std::cout << "MessageConsumer::dequeueComplete()" << std::endl << std::flush; + // TODO: May need a lock + //++m_numMsgs; +} + void* MessageConsumer::runConsumers() { const bool useTxns = m_perfTestParams.m_deqTxnBlockSize > 0U; - uint16_t txnCnt = 0U; + uint16_t opsInTxnCnt = 0U; qpid::broker::TxnBuffer* tb = 0; if (useTxns) { tb = new qpid::broker::TxnBuffer(m_resultQueue); } - uint32_t numMsgs = 0; + uint32_t numMsgs = 0UL; while (numMsgs < m_perfTestParams.m_numMsgs) { - if (m_queue->dispatch()) { + if (m_queue->dispatch(*this)) { ++numMsgs; + if (useTxns) { + // --- Transactional dequeue --- + if (++opsInTxnCnt >= m_perfTestParams.m_deqTxnBlockSize) { + if (m_perfTestParams.m_durable) { + boost::shared_ptr ta(new TxnAccept(m_unacked)); + m_unacked.clear(); + tb->enlist(ta); + tb->commitLocal(m_store); + if (numMsgs < m_perfTestParams.m_numMsgs) { + tb = new qpid::broker::TxnBuffer(m_resultQueue); + } + } else { + tb->commit(); + } + opsInTxnCnt = 0U; + } + } else { + // --- Non-transactional dequeue --- + for (std::deque >::iterator i = m_unacked.begin(); i != m_unacked.end(); ++i) { + (*i)->accept(); + } + m_unacked.clear(); + //++numMsgs; + } } else { ::usleep(1000); // TODO - replace this poller with condition variable } } - if (txnCnt) { + if (opsInTxnCnt) { if (m_perfTestParams.m_durable) { tb->commitLocal(m_store); } else { -- cgit v1.2.1