diff options
Diffstat (limited to 'cpp/src/tests/storePerftools/asyncPerf/SimplePersistableQueue.cpp')
-rw-r--r-- | cpp/src/tests/storePerftools/asyncPerf/SimplePersistableQueue.cpp | 57 |
1 files changed, 46 insertions, 11 deletions
diff --git a/cpp/src/tests/storePerftools/asyncPerf/SimplePersistableQueue.cpp b/cpp/src/tests/storePerftools/asyncPerf/SimplePersistableQueue.cpp index b34357c144..1a3eae4b43 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/SimplePersistableQueue.cpp +++ b/cpp/src/tests/storePerftools/asyncPerf/SimplePersistableQueue.cpp @@ -25,17 +25,21 @@ #include "MessageDeque.h" #include "SimplePersistableMessage.h" -#include "SimpleTransactionContext.h" #include "QueueAsyncContext.h" #include "QueuedMessage.h" #include "qpid/asyncStore/AsyncStoreImpl.h" #include "qpid/broker/AsyncResultHandle.h" +#include "qpid/broker/TxnHandle.h" namespace tests { namespace storePerftools { namespace asyncPerf { +//static +qpid::broker::TxnHandle SimplePersistableQueue::s_nullTxnHandle; // used for non-txn operations + + SimplePersistableQueue::SimplePersistableQueue(const std::string& name, const qpid::framing::FieldTable& /*args*/, qpid::asyncStore::AsyncStoreImpl* store, @@ -127,6 +131,7 @@ SimplePersistableQueue::asyncCreate() { if (m_store) { boost::shared_ptr<QueueAsyncContext> qac(new QueueAsyncContext(shared_from_this(), + s_nullTxnHandle, qpid::asyncStore::AsyncOperation::QUEUE_CREATE, &handleAsyncResult, &m_resultQueue)); @@ -144,6 +149,7 @@ SimplePersistableQueue::asyncDestroy(const bool deleteQueue) if (m_store) { if (deleteQueue) { boost::shared_ptr<QueueAsyncContext> qac(new QueueAsyncContext(shared_from_this(), + s_nullTxnHandle, qpid::asyncStore::AsyncOperation::QUEUE_DESTROY, &handleAsyncResult, &m_resultQueue)); @@ -159,7 +165,7 @@ void SimplePersistableQueue::deliver(boost::intrusive_ptr<SimplePersistableMessage> msg) { QueuedMessage qm(this, msg); - enqueue((SimpleTransactionContext*)0, qm); + enqueue(s_nullTxnHandle, qm); push(qm); } @@ -168,13 +174,13 @@ SimplePersistableQueue::dispatch() { QueuedMessage qm; if (m_messages->consume(qm)) { - return dequeue((SimpleTransactionContext*)0, qm); + return dequeue(s_nullTxnHandle, qm); } return false; } bool -SimplePersistableQueue::enqueue(SimpleTransactionContext* ctxt, +SimplePersistableQueue::enqueue(qpid::broker::TxnHandle& th, QueuedMessage& qm) { ScopedUse u(m_barrier); @@ -183,13 +189,13 @@ SimplePersistableQueue::enqueue(SimpleTransactionContext* ctxt, } if (qm.payload()->isPersistent() && m_store) { qm.payload()->enqueueAsync(shared_from_this(), m_store); - return asyncEnqueue(ctxt, qm); + return asyncEnqueue(th, qm); } return false; } bool -SimplePersistableQueue::dequeue(SimpleTransactionContext* ctxt, +SimplePersistableQueue::dequeue(qpid::broker::TxnHandle& th, QueuedMessage& qm) { ScopedUse u(m_barrier); @@ -198,12 +204,23 @@ SimplePersistableQueue::dequeue(SimpleTransactionContext* ctxt, } if (qm.payload()->isPersistent() && m_store) { qm.payload()->dequeueAsync(shared_from_this(), m_store); - return asyncDequeue(ctxt, qm); + return asyncDequeue(th, qm); } return true; } void +SimplePersistableQueue::process(boost::intrusive_ptr<SimplePersistableMessage> msg) +{ + QueuedMessage qm(this, msg); + push(qm); +} + +void +SimplePersistableQueue::enqueueAborted(boost::intrusive_ptr<SimplePersistableMessage> /*msg*/) +{} + +void SimplePersistableQueue::encode(qpid::framing::Buffer& buffer) const { buffer.putShortString(m_name); @@ -326,38 +343,46 @@ SimplePersistableQueue::push(QueuedMessage& qm, // private bool -SimplePersistableQueue::asyncEnqueue(SimpleTransactionContext* txn, +SimplePersistableQueue::asyncEnqueue(qpid::broker::TxnHandle& th, QueuedMessage& qm) { qm.payload()->setPersistenceId(m_store->getNextRid()); //std::cout << "QQQ Queue=\"" << m_name << "\": asyncEnqueue() rid=0x" << std::hex << qm.payload()->getPersistenceId() << std::dec << std::endl << std::flush; boost::shared_ptr<QueueAsyncContext> qac(new QueueAsyncContext(shared_from_this(), qm.payload(), + th, qpid::asyncStore::AsyncOperation::MSG_ENQUEUE, &handleAsyncResult, &m_resultQueue)); m_store->submitEnqueue(qm.enqHandle(), - txn->getHandle(), + th, qac); ++m_asyncOpCounter; + if (th.isValid()) { + th.incrOpCnt(); + } return true; } // private bool -SimplePersistableQueue::asyncDequeue(SimpleTransactionContext* txn, +SimplePersistableQueue::asyncDequeue(qpid::broker::TxnHandle& th, QueuedMessage& qm) { //std::cout << "QQQ Queue=\"" << m_name << "\": asyncDequeue() rid=0x" << std::hex << qm.payload()->getPersistenceId() << std::dec << std::endl << std::flush; boost::shared_ptr<QueueAsyncContext> qac(new QueueAsyncContext(shared_from_this(), qm.payload(), + th, qpid::asyncStore::AsyncOperation::MSG_DEQUEUE, &handleAsyncResult, &m_resultQueue)); m_store->submitDequeue(qm.enqHandle(), - txn->getHandle(), + th, qac); ++m_asyncOpCounter; + if (th.isValid()) { + th.incrOpCnt(); + } return true; } @@ -407,6 +432,11 @@ SimplePersistableQueue::enqueueComplete(const boost::shared_ptr<QueueAsyncContex //std::cout << "QQQ Queue name=\"" << qc->getQueue()->getName() << "\": enqueueComplete() rid=0x" << std::hex << qc->getMessage()->getPersistenceId() << std::dec << std::endl << std::flush; assert(qc->getQueue().get() == this); --m_asyncOpCounter; + + qpid::broker::TxnHandle th = qc->getTxnHandle(); + if (th.isValid()) { // transactional enqueue + th.decrOpCnt(); + } } // private @@ -416,6 +446,11 @@ SimplePersistableQueue::dequeueComplete(const boost::shared_ptr<QueueAsyncContex //std::cout << "QQQ Queue name=\"" << qc->getQueue()->getName() << "\": dequeueComplete() rid=0x" << std::hex << qc->getMessage()->getPersistenceId() << std::dec << std::endl << std::flush; assert(qc->getQueue().get() == this); --m_asyncOpCounter; + + qpid::broker::TxnHandle th = qc->getTxnHandle(); + if (th.isValid()) { // transactional enqueue + th.decrOpCnt(); + } } }}} // namespace tests::storePerftools::asyncPerf |