diff options
Diffstat (limited to 'cpp/src/tests/storePerftools/asyncPerf/MessageProducer.cpp')
-rw-r--r-- | cpp/src/tests/storePerftools/asyncPerf/MessageProducer.cpp | 47 |
1 files changed, 43 insertions, 4 deletions
diff --git a/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.cpp b/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.cpp index 5530513e12..008ddf33e7 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.cpp +++ b/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.cpp @@ -26,6 +26,10 @@ #include "SimplePersistableMessage.h" #include "SimplePersistableQueue.h" #include "TestOptions.h" +#include "TxnPublish.h" + +#include "qpid/asyncStore/AsyncStoreImpl.h" +#include "qpid/broker/TxnBuffer.h" #include <stdint.h> // uint32_t @@ -33,15 +37,15 @@ namespace tests { namespace storePerftools { namespace asyncPerf { -class SimpleTransactionContext; - MessageProducer::MessageProducer(const TestOptions& perfTestParams, const char* msgData, qpid::asyncStore::AsyncStoreImpl* store, + qpid::broker::AsyncResultQueue& arq, boost::shared_ptr<SimplePersistableQueue> queue) : m_perfTestParams(perfTestParams), m_msgData(msgData), m_store(store), + m_resultQueue(arq), m_queue(queue) {} @@ -51,11 +55,46 @@ MessageProducer::~MessageProducer() void* MessageProducer::runProducers() { + const bool useTxns = m_perfTestParams.m_enqTxnBlockSize > 0U; + uint16_t txnCnt = 0U; + qpid::broker::TxnBuffer* tb = 0; + if (useTxns) { + tb = new qpid::broker::TxnBuffer(m_resultQueue); + } for (uint32_t numMsgs=0; numMsgs<m_perfTestParams.m_numMsgs; ++numMsgs) { boost::intrusive_ptr<SimplePersistableMessage> msg(new SimplePersistableMessage(m_msgData, m_perfTestParams.m_msgSize, m_store)); - m_queue->deliver(msg); + if (useTxns) { + boost::shared_ptr<TxnPublish> op(new TxnPublish(msg)); + op->deliverTo(m_queue); + tb->enlist(op); + if (++txnCnt >= m_perfTestParams.m_enqTxnBlockSize) { + if (m_perfTestParams.m_durable) { + tb->commitLocal(m_store); + + // TxnBuffer instance tb carries async state that precludes it being re-used for the next + // transaction until the current commit cycle completes. So use another instance. This + // instance should auto-delete when the async commit cycle completes. + if (numMsgs<m_perfTestParams.m_numMsgs) { + //tb = boost::shared_ptr<qpid::broker::TxnBuffer>(new qpid::broker::TxnBuffer(m_resultQueue)); + tb = new qpid::broker::TxnBuffer(m_resultQueue); + } + } else { + tb->commit(); + } + txnCnt = 0U; + } + } else { + m_queue->deliver(msg); + } + } + if (txnCnt) { + if (m_perfTestParams.m_durable) { + tb->commitLocal(m_store); + } else { + tb->commit(); + } } - return 0; + return reinterpret_cast<void*>(0); } //static |