summaryrefslogtreecommitdiff
path: root/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp
diff options
context:
space:
mode:
authorKim van der Riet <kpvdr@apache.org>2012-06-25 19:11:55 +0000
committerKim van der Riet <kpvdr@apache.org>2012-06-25 19:11:55 +0000
commitcbd4f9c22974db5f53b42a4326486ec8325b79cc (patch)
tree95986a4f10104ea2b9cc79c7463d0bc9ab451bcf /cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp
parentb95f9427ede4a2045ac6424a6341de9185a13602 (diff)
downloadqpid-python-cbd4f9c22974db5f53b42a4326486ec8325b79cc.tar.gz
WIP - transactional consume path completed, still some testing to be done.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1353703 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp')
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp49
1 files changed, 45 insertions, 4 deletions
diff --git a/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp b/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp
index 1859bde947..3ac6867ce1 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp
+++ b/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp
@@ -23,8 +23,10 @@
#include "MessageConsumer.h"
+#include "DeliveryRecord.h"
#include "SimpleQueue.h"
#include "TestOptions.h"
+#include "TxnAccept.h"
#include "qpid/asyncStore/AsyncStoreImpl.h"
#include "qpid/broker/TxnBuffer.h"
@@ -48,26 +50,65 @@ MessageConsumer::MessageConsumer(const TestOptions& perfTestParams,
MessageConsumer::~MessageConsumer()
{}
+void
+MessageConsumer::record(boost::shared_ptr<DeliveryRecord> dr)
+{
+ // TODO: May need a lock?
+ m_unacked.push_back(dr);
+}
+
+void
+MessageConsumer::dequeueComplete()
+{
+//std::cout << "MessageConsumer::dequeueComplete()" << std::endl << std::flush;
+ // TODO: May need a lock
+ //++m_numMsgs;
+}
+
void*
MessageConsumer::runConsumers()
{
const bool useTxns = m_perfTestParams.m_deqTxnBlockSize > 0U;
- uint16_t txnCnt = 0U;
+ uint16_t opsInTxnCnt = 0U;
qpid::broker::TxnBuffer* tb = 0;
if (useTxns) {
tb = new qpid::broker::TxnBuffer(m_resultQueue);
}
- uint32_t numMsgs = 0;
+ uint32_t numMsgs = 0UL;
while (numMsgs < m_perfTestParams.m_numMsgs) {
- if (m_queue->dispatch()) {
+ if (m_queue->dispatch(*this)) {
++numMsgs;
+ if (useTxns) {
+ // --- Transactional dequeue ---
+ if (++opsInTxnCnt >= m_perfTestParams.m_deqTxnBlockSize) {
+ if (m_perfTestParams.m_durable) {
+ boost::shared_ptr<TxnAccept> ta(new TxnAccept(m_unacked));
+ m_unacked.clear();
+ tb->enlist(ta);
+ tb->commitLocal(m_store);
+ if (numMsgs < m_perfTestParams.m_numMsgs) {
+ tb = new qpid::broker::TxnBuffer(m_resultQueue);
+ }
+ } else {
+ tb->commit();
+ }
+ opsInTxnCnt = 0U;
+ }
+ } else {
+ // --- Non-transactional dequeue ---
+ for (std::deque<boost::shared_ptr<DeliveryRecord> >::iterator i = m_unacked.begin(); i != m_unacked.end(); ++i) {
+ (*i)->accept();
+ }
+ m_unacked.clear();
+ //++numMsgs;
+ }
} else {
::usleep(1000); // TODO - replace this poller with condition variable
}
}
- if (txnCnt) {
+ if (opsInTxnCnt) {
if (m_perfTestParams.m_durable) {
tb->commitLocal(m_store);
} else {