summaryrefslogtreecommitdiff
path: root/cpp/src/tests/storePerftools/asyncPerf/SimplePersistableQueue.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/tests/storePerftools/asyncPerf/SimplePersistableQueue.cpp')
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/SimplePersistableQueue.cpp57
1 files changed, 46 insertions, 11 deletions
diff --git a/cpp/src/tests/storePerftools/asyncPerf/SimplePersistableQueue.cpp b/cpp/src/tests/storePerftools/asyncPerf/SimplePersistableQueue.cpp
index b34357c144..1a3eae4b43 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/SimplePersistableQueue.cpp
+++ b/cpp/src/tests/storePerftools/asyncPerf/SimplePersistableQueue.cpp
@@ -25,17 +25,21 @@
#include "MessageDeque.h"
#include "SimplePersistableMessage.h"
-#include "SimpleTransactionContext.h"
#include "QueueAsyncContext.h"
#include "QueuedMessage.h"
#include "qpid/asyncStore/AsyncStoreImpl.h"
#include "qpid/broker/AsyncResultHandle.h"
+#include "qpid/broker/TxnHandle.h"
namespace tests {
namespace storePerftools {
namespace asyncPerf {
+//static
+qpid::broker::TxnHandle SimplePersistableQueue::s_nullTxnHandle; // used for non-txn operations
+
+
SimplePersistableQueue::SimplePersistableQueue(const std::string& name,
const qpid::framing::FieldTable& /*args*/,
qpid::asyncStore::AsyncStoreImpl* store,
@@ -127,6 +131,7 @@ SimplePersistableQueue::asyncCreate()
{
if (m_store) {
boost::shared_ptr<QueueAsyncContext> qac(new QueueAsyncContext(shared_from_this(),
+ s_nullTxnHandle,
qpid::asyncStore::AsyncOperation::QUEUE_CREATE,
&handleAsyncResult,
&m_resultQueue));
@@ -144,6 +149,7 @@ SimplePersistableQueue::asyncDestroy(const bool deleteQueue)
if (m_store) {
if (deleteQueue) {
boost::shared_ptr<QueueAsyncContext> qac(new QueueAsyncContext(shared_from_this(),
+ s_nullTxnHandle,
qpid::asyncStore::AsyncOperation::QUEUE_DESTROY,
&handleAsyncResult,
&m_resultQueue));
@@ -159,7 +165,7 @@ void
SimplePersistableQueue::deliver(boost::intrusive_ptr<SimplePersistableMessage> msg)
{
QueuedMessage qm(this, msg);
- enqueue((SimpleTransactionContext*)0, qm);
+ enqueue(s_nullTxnHandle, qm);
push(qm);
}
@@ -168,13 +174,13 @@ SimplePersistableQueue::dispatch()
{
QueuedMessage qm;
if (m_messages->consume(qm)) {
- return dequeue((SimpleTransactionContext*)0, qm);
+ return dequeue(s_nullTxnHandle, qm);
}
return false;
}
bool
-SimplePersistableQueue::enqueue(SimpleTransactionContext* ctxt,
+SimplePersistableQueue::enqueue(qpid::broker::TxnHandle& th,
QueuedMessage& qm)
{
ScopedUse u(m_barrier);
@@ -183,13 +189,13 @@ SimplePersistableQueue::enqueue(SimpleTransactionContext* ctxt,
}
if (qm.payload()->isPersistent() && m_store) {
qm.payload()->enqueueAsync(shared_from_this(), m_store);
- return asyncEnqueue(ctxt, qm);
+ return asyncEnqueue(th, qm);
}
return false;
}
bool
-SimplePersistableQueue::dequeue(SimpleTransactionContext* ctxt,
+SimplePersistableQueue::dequeue(qpid::broker::TxnHandle& th,
QueuedMessage& qm)
{
ScopedUse u(m_barrier);
@@ -198,12 +204,23 @@ SimplePersistableQueue::dequeue(SimpleTransactionContext* ctxt,
}
if (qm.payload()->isPersistent() && m_store) {
qm.payload()->dequeueAsync(shared_from_this(), m_store);
- return asyncDequeue(ctxt, qm);
+ return asyncDequeue(th, qm);
}
return true;
}
void
+SimplePersistableQueue::process(boost::intrusive_ptr<SimplePersistableMessage> msg)
+{
+ QueuedMessage qm(this, msg);
+ push(qm);
+}
+
+void
+SimplePersistableQueue::enqueueAborted(boost::intrusive_ptr<SimplePersistableMessage> /*msg*/)
+{}
+
+void
SimplePersistableQueue::encode(qpid::framing::Buffer& buffer) const
{
buffer.putShortString(m_name);
@@ -326,38 +343,46 @@ SimplePersistableQueue::push(QueuedMessage& qm,
// private
bool
-SimplePersistableQueue::asyncEnqueue(SimpleTransactionContext* txn,
+SimplePersistableQueue::asyncEnqueue(qpid::broker::TxnHandle& th,
QueuedMessage& qm)
{
qm.payload()->setPersistenceId(m_store->getNextRid());
//std::cout << "QQQ Queue=\"" << m_name << "\": asyncEnqueue() rid=0x" << std::hex << qm.payload()->getPersistenceId() << std::dec << std::endl << std::flush;
boost::shared_ptr<QueueAsyncContext> qac(new QueueAsyncContext(shared_from_this(),
qm.payload(),
+ th,
qpid::asyncStore::AsyncOperation::MSG_ENQUEUE,
&handleAsyncResult,
&m_resultQueue));
m_store->submitEnqueue(qm.enqHandle(),
- txn->getHandle(),
+ th,
qac);
++m_asyncOpCounter;
+ if (th.isValid()) {
+ th.incrOpCnt();
+ }
return true;
}
// private
bool
-SimplePersistableQueue::asyncDequeue(SimpleTransactionContext* txn,
+SimplePersistableQueue::asyncDequeue(qpid::broker::TxnHandle& th,
QueuedMessage& qm)
{
//std::cout << "QQQ Queue=\"" << m_name << "\": asyncDequeue() rid=0x" << std::hex << qm.payload()->getPersistenceId() << std::dec << std::endl << std::flush;
boost::shared_ptr<QueueAsyncContext> qac(new QueueAsyncContext(shared_from_this(),
qm.payload(),
+ th,
qpid::asyncStore::AsyncOperation::MSG_DEQUEUE,
&handleAsyncResult,
&m_resultQueue));
m_store->submitDequeue(qm.enqHandle(),
- txn->getHandle(),
+ th,
qac);
++m_asyncOpCounter;
+ if (th.isValid()) {
+ th.incrOpCnt();
+ }
return true;
}
@@ -407,6 +432,11 @@ SimplePersistableQueue::enqueueComplete(const boost::shared_ptr<QueueAsyncContex
//std::cout << "QQQ Queue name=\"" << qc->getQueue()->getName() << "\": enqueueComplete() rid=0x" << std::hex << qc->getMessage()->getPersistenceId() << std::dec << std::endl << std::flush;
assert(qc->getQueue().get() == this);
--m_asyncOpCounter;
+
+ qpid::broker::TxnHandle th = qc->getTxnHandle();
+ if (th.isValid()) { // transactional enqueue
+ th.decrOpCnt();
+ }
}
// private
@@ -416,6 +446,11 @@ SimplePersistableQueue::dequeueComplete(const boost::shared_ptr<QueueAsyncContex
//std::cout << "QQQ Queue name=\"" << qc->getQueue()->getName() << "\": dequeueComplete() rid=0x" << std::hex << qc->getMessage()->getPersistenceId() << std::dec << std::endl << std::flush;
assert(qc->getQueue().get() == this);
--m_asyncOpCounter;
+
+ qpid::broker::TxnHandle th = qc->getTxnHandle();
+ if (th.isValid()) { // transactional enqueue
+ th.decrOpCnt();
+ }
}
}}} // namespace tests::storePerftools::asyncPerf