summaryrefslogtreecommitdiff
path: root/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp
diff options
context:
space:
mode:
authorKim van der Riet <kpvdr@apache.org>2012-06-21 12:09:00 +0000
committerKim van der Riet <kpvdr@apache.org>2012-06-21 12:09:00 +0000
commitb95f9427ede4a2045ac6424a6341de9185a13602 (patch)
treeef1950c1d03961fe9a08a2bfdb84355c139d57e4 /cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp
parent5083cef28cd7d1f594a7632ffec109567f5a3b2b (diff)
downloadqpid-python-b95f9427ede4a2045ac6424a6341de9185a13602.tar.gz
QPID-3858: WIP: New classes for transactional consumption of messages
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1352509 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp')
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp27
1 files changed, 25 insertions, 2 deletions
diff --git a/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp b/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp
index 9b015fc428..1859bde947 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp
+++ b/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp
@@ -23,9 +23,12 @@
#include "MessageConsumer.h"
-#include "SimplePersistableQueue.h"
+#include "SimpleQueue.h"
#include "TestOptions.h"
+#include "qpid/asyncStore/AsyncStoreImpl.h"
+#include "qpid/broker/TxnBuffer.h"
+
#include <stdint.h> // uint32_t
namespace tests {
@@ -33,8 +36,12 @@ namespace storePerftools {
namespace asyncPerf {
MessageConsumer::MessageConsumer(const TestOptions& perfTestParams,
- boost::shared_ptr<SimplePersistableQueue> queue) :
+ qpid::asyncStore::AsyncStoreImpl* store,
+ qpid::broker::AsyncResultQueue& arq,
+ boost::shared_ptr<SimpleQueue> queue) :
m_perfTestParams(perfTestParams),
+ m_store(store),
+ m_resultQueue(arq),
m_queue(queue)
{}
@@ -44,6 +51,13 @@ MessageConsumer::~MessageConsumer()
void*
MessageConsumer::runConsumers()
{
+ const bool useTxns = m_perfTestParams.m_deqTxnBlockSize > 0U;
+ uint16_t txnCnt = 0U;
+ qpid::broker::TxnBuffer* tb = 0;
+ if (useTxns) {
+ tb = new qpid::broker::TxnBuffer(m_resultQueue);
+ }
+
uint32_t numMsgs = 0;
while (numMsgs < m_perfTestParams.m_numMsgs) {
if (m_queue->dispatch()) {
@@ -52,6 +66,15 @@ MessageConsumer::runConsumers()
::usleep(1000); // TODO - replace this poller with condition variable
}
}
+
+ if (txnCnt) {
+ if (m_perfTestParams.m_durable) {
+ tb->commitLocal(m_store);
+ } else {
+ tb->commit();
+ }
+ }
+
return reinterpret_cast<void*>(0);
}