summaryrefslogtreecommitdiff
path: root/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp')
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp49
1 files changed, 45 insertions, 4 deletions
diff --git a/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp b/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp
index 1859bde947..3ac6867ce1 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp
+++ b/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp
@@ -23,8 +23,10 @@
#include "MessageConsumer.h"
+#include "DeliveryRecord.h"
#include "SimpleQueue.h"
#include "TestOptions.h"
+#include "TxnAccept.h"
#include "qpid/asyncStore/AsyncStoreImpl.h"
#include "qpid/broker/TxnBuffer.h"
@@ -48,26 +50,65 @@ MessageConsumer::MessageConsumer(const TestOptions& perfTestParams,
MessageConsumer::~MessageConsumer()
{}
+void
+MessageConsumer::record(boost::shared_ptr<DeliveryRecord> dr)
+{
+ // TODO: May need a lock?
+ m_unacked.push_back(dr);
+}
+
+void
+MessageConsumer::dequeueComplete()
+{
+//std::cout << "MessageConsumer::dequeueComplete()" << std::endl << std::flush;
+ // TODO: May need a lock
+ //++m_numMsgs;
+}
+
void*
MessageConsumer::runConsumers()
{
const bool useTxns = m_perfTestParams.m_deqTxnBlockSize > 0U;
- uint16_t txnCnt = 0U;
+ uint16_t opsInTxnCnt = 0U;
qpid::broker::TxnBuffer* tb = 0;
if (useTxns) {
tb = new qpid::broker::TxnBuffer(m_resultQueue);
}
- uint32_t numMsgs = 0;
+ uint32_t numMsgs = 0UL;
while (numMsgs < m_perfTestParams.m_numMsgs) {
- if (m_queue->dispatch()) {
+ if (m_queue->dispatch(*this)) {
++numMsgs;
+ if (useTxns) {
+ // --- Transactional dequeue ---
+ if (++opsInTxnCnt >= m_perfTestParams.m_deqTxnBlockSize) {
+ if (m_perfTestParams.m_durable) {
+ boost::shared_ptr<TxnAccept> ta(new TxnAccept(m_unacked));
+ m_unacked.clear();
+ tb->enlist(ta);
+ tb->commitLocal(m_store);
+ if (numMsgs < m_perfTestParams.m_numMsgs) {
+ tb = new qpid::broker::TxnBuffer(m_resultQueue);
+ }
+ } else {
+ tb->commit();
+ }
+ opsInTxnCnt = 0U;
+ }
+ } else {
+ // --- Non-transactional dequeue ---
+ for (std::deque<boost::shared_ptr<DeliveryRecord> >::iterator i = m_unacked.begin(); i != m_unacked.end(); ++i) {
+ (*i)->accept();
+ }
+ m_unacked.clear();
+ //++numMsgs;
+ }
} else {
::usleep(1000); // TODO - replace this poller with condition variable
}
}
- if (txnCnt) {
+ if (opsInTxnCnt) {
if (m_perfTestParams.m_durable) {
tb->commitLocal(m_store);
} else {