summaryrefslogtreecommitdiff
path: root/cpp/src/tests
diff options
context:
space:
mode:
authorKim van der Riet <kpvdr@apache.org>2012-07-31 13:35:53 +0000
committerKim van der Riet <kpvdr@apache.org>2012-07-31 13:35:53 +0000
commit63c6598f401ac6406e5a31c602c7892b798536fc (patch)
tree73b3c1a519ada213c9e117244aab99d2e64d4f2a /cpp/src/tests
parentb435b07eb8fa9db484f85b39daaf43642dd623ca (diff)
downloadqpid-python-63c6598f401ac6406e5a31c602c7892b798536fc.tar.gz
QPID-3858: WIP: Durable transactions fixed
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1367535 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/tests')
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.cpp4
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.h4
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp37
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/MessageProducer.cpp28
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/MessageProducer.h2
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.cpp4
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.h2
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp172
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.h9
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/TxnAccept.cpp15
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/TxnAccept.h6
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/TxnPublish.cpp25
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/TxnPublish.h2
13 files changed, 125 insertions, 185 deletions
diff --git a/cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.cpp b/cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.cpp
index e1c67a9547..6f33369a26 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.cpp
+++ b/cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.cpp
@@ -82,9 +82,9 @@ DeliveryRecord::isRedundant() const
}
void
-DeliveryRecord::dequeue(qpid::broker::TxnHandle& txn)
+DeliveryRecord::dequeue(qpid::broker::TxnBuffer* tb)
{
- m_queuedMessage->getQueue()->dequeue(txn, m_queuedMessage);
+ m_queuedMessage->getQueue()->dequeue(tb, m_queuedMessage);
}
void
diff --git a/cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.h b/cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.h
index d4529941e7..6c5d87f374 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.h
+++ b/cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.h
@@ -30,7 +30,7 @@
namespace qpid {
namespace broker {
-class TxnHandle;
+class TxnBuffer;
}}
namespace tests {
@@ -51,7 +51,7 @@ public:
bool setEnded();
bool isEnded() const;
bool isRedundant() const;
- void dequeue(qpid::broker::TxnHandle& txn);
+ void dequeue(qpid::broker::TxnBuffer* tb);
void committed() const;
boost::shared_ptr<QueuedMessage> getQueuedMessage() const;
private:
diff --git a/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp b/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp
index 4a2bc2bf0c..6aa477c470 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp
+++ b/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp
@@ -47,22 +47,18 @@ MessageConsumer::MessageConsumer(const TestOptions& perfTestParams,
m_queue(queue)
{}
-MessageConsumer::~MessageConsumer()
-{}
+MessageConsumer::~MessageConsumer() {}
void
-MessageConsumer::record(boost::shared_ptr<DeliveryRecord> dr)
-{
+MessageConsumer::record(boost::shared_ptr<DeliveryRecord> dr) {
m_unacked.push_back(dr);
}
void
-MessageConsumer::commitComplete()
-{}
+MessageConsumer::commitComplete() {}
void*
-MessageConsumer::runConsumers()
-{
+MessageConsumer::runConsumers() {
const bool useTxns = m_perfTestParams.m_deqTxnBlockSize > 0U;
uint16_t opsInTxnCnt = 0U;
qpid::broker::TxnBuffer* tb = 0;
@@ -78,17 +74,13 @@ MessageConsumer::runConsumers()
++numMsgs;
if (useTxns) {
// --- Transactional dequeue ---
+ boost::shared_ptr<TxnAccept> ta(new TxnAccept(m_unacked));
+ m_unacked.clear();
+ tb->enlist(ta);
if (++opsInTxnCnt >= m_perfTestParams.m_deqTxnBlockSize) {
- if (m_perfTestParams.m_durable) {
- boost::shared_ptr<TxnAccept> ta(new TxnAccept(m_unacked));
- m_unacked.clear();
- tb->enlist(ta);
- tb->commitLocal(m_store);
- if (numMsgs < m_perfTestParams.m_numMsgs) {
- tb = new qpid::broker::TxnBuffer(m_resultQueue);
- }
- } else {
- tb->commit();
+ tb->commitLocal(m_store);
+ if (numMsgs < m_perfTestParams.m_numMsgs) {
+ tb = new qpid::broker::TxnBuffer(m_resultQueue);
}
opsInTxnCnt = 0U;
}
@@ -105,11 +97,7 @@ MessageConsumer::runConsumers()
}
if (opsInTxnCnt) {
- if (m_perfTestParams.m_durable) {
- tb->commitLocal(m_store);
- } else {
- tb->commit();
- }
+ tb->commitLocal(m_store);
}
return reinterpret_cast<void*>(0);
@@ -117,8 +105,7 @@ MessageConsumer::runConsumers()
//static
void*
-MessageConsumer::startConsumers(void* ptr)
-{
+MessageConsumer::startConsumers(void* ptr) {
return reinterpret_cast<MessageConsumer*>(ptr)->runConsumers();
}
diff --git a/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.cpp b/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.cpp
index 7d9aaceb11..974f3f3981 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.cpp
+++ b/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.cpp
@@ -49,12 +49,10 @@ MessageProducer::MessageProducer(const TestOptions& perfTestParams,
m_queue(queue)
{}
-MessageProducer::~MessageProducer()
-{}
+MessageProducer::~MessageProducer() {}
void*
-MessageProducer::runProducers()
-{
+MessageProducer::runProducers() {
const bool useTxns = m_perfTestParams.m_enqTxnBlockSize > 0U;
uint16_t recsInTxnCnt = 0U;
qpid::broker::TxnBuffer* tb = 0;
@@ -68,17 +66,13 @@ MessageProducer::runProducers()
op->deliverTo(m_queue);
tb->enlist(op);
if (++recsInTxnCnt >= m_perfTestParams.m_enqTxnBlockSize) {
- if (m_perfTestParams.m_durable) {
- tb->commitLocal(m_store);
+ tb->commitLocal(m_store);
- // TxnBuffer instance tb carries async state that precludes it being re-used for the next
- // transaction until the current commit cycle completes. So use another instance. This
- // instance should auto-delete when the async commit cycle completes.
- if ((numMsgs + 1) < m_perfTestParams.m_numMsgs) {
- tb = new qpid::broker::TxnBuffer(m_resultQueue);
- }
- } else {
- tb->commit();
+ // TxnBuffer instance tb carries async state that precludes it being re-used for the next
+ // transaction until the current commit cycle completes. So use another instance. This
+ // instance should auto-delete when the async commit cycle completes.
+ if ((numMsgs + 1) < m_perfTestParams.m_numMsgs) {
+ tb = new qpid::broker::TxnBuffer(m_resultQueue);
}
recsInTxnCnt = 0U;
}
@@ -87,11 +81,7 @@ MessageProducer::runProducers()
}
}
if (recsInTxnCnt) {
- if (m_perfTestParams.m_durable) {
- tb->commitLocal(m_store);
- } else {
- tb->commit();
- }
+ tb->commitLocal(m_store);
}
return reinterpret_cast<void*>(0);
}
diff --git a/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.h b/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.h
index 7fa74a2c51..127408e3db 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.h
+++ b/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.h
@@ -32,6 +32,7 @@ class AsyncStoreImpl;
}
namespace broker {
class AsyncResultQueue;
+class TxnBuffer;
}}
namespace tests {
@@ -40,7 +41,6 @@ namespace asyncPerf {
class SimpleQueue;
class TestOptions;
-class TxnBuffer;
class MessageProducer
{
diff --git a/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.cpp b/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.cpp
index 572089faaf..0d16248c7f 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.cpp
+++ b/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.cpp
@@ -89,9 +89,9 @@ QueuedMessage::enqHandle()
}
void
-QueuedMessage::prepareEnqueue(qpid::broker::TxnHandle& th)
+QueuedMessage::prepareEnqueue(qpid::broker::TxnBuffer* tb)
{
- m_queue->enqueue(th, shared_from_this());
+ m_queue->enqueue(tb, shared_from_this());
}
void
diff --git a/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.h b/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.h
index dd10f8b501..630fe1aedc 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.h
+++ b/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.h
@@ -59,7 +59,7 @@ public:
qpid::broker::EnqueueHandle& enqHandle();
// --- Transaction handling ---
- void prepareEnqueue(qpid::broker::TxnHandle& th);
+ void prepareEnqueue(qpid::broker::TxnBuffer* tb);
void commitEnqueue();
void abortEnqueue();
diff --git a/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp b/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp
index f297e83402..06b4e9333f 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp
+++ b/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp
@@ -31,7 +31,7 @@
#include "qpid/broker/AsyncResultHandle.h"
#include "qpid/broker/QueueAsyncContext.h"
-#include "qpid/broker/TxnHandle.h"
+#include "qpid/broker/TxnBuffer.h"
#include <string.h> // memcpy()
@@ -65,33 +65,27 @@ SimpleQueue::SimpleQueue(const std::string& name,
}
}
-SimpleQueue::~SimpleQueue()
-{}
+SimpleQueue::~SimpleQueue() {}
const qpid::broker::QueueHandle&
-SimpleQueue::getHandle() const
-{
+SimpleQueue::getHandle() const {
return m_queueHandle;
}
qpid::broker::QueueHandle&
-SimpleQueue::getHandle()
-{
+SimpleQueue::getHandle() {
return m_queueHandle;
}
qpid::broker::AsyncStore*
-SimpleQueue::getStore()
-{
+SimpleQueue::getStore() {
return m_store;
}
void
-SimpleQueue::asyncCreate()
-{
+SimpleQueue::asyncCreate() {
if (m_store) {
boost::shared_ptr<qpid::broker::QueueAsyncContext> qac(new qpid::broker::QueueAsyncContext(shared_from_this(),
- s_nullTxnHandle,
&handleAsyncCreateResult,
&m_resultQueue));
m_store->submitCreate(m_queueHandle, this, qac);
@@ -123,7 +117,6 @@ SimpleQueue::asyncDestroy(const bool deleteQueue)
if (m_store) {
if (deleteQueue) {
boost::shared_ptr<qpid::broker::QueueAsyncContext> qac(new qpid::broker::QueueAsyncContext(shared_from_this(),
- s_nullTxnHandle,
&handleAsyncDestroyResult,
&m_resultQueue));
m_store->submitDestroy(m_queueHandle, qac);
@@ -151,16 +144,14 @@ SimpleQueue::handleAsyncDestroyResult(const qpid::broker::AsyncResultHandle* con
}
void
-SimpleQueue::deliver(boost::intrusive_ptr<SimpleMessage> msg)
-{
+SimpleQueue::deliver(boost::intrusive_ptr<SimpleMessage> msg) {
boost::shared_ptr<QueuedMessage> qm(boost::shared_ptr<QueuedMessage>(new QueuedMessage(this, msg)));
- enqueue(s_nullTxnHandle, qm);
+ enqueue(qm);
push(qm);
}
bool
-SimpleQueue::dispatch(MessageConsumer& mc)
-{
+SimpleQueue::dispatch(MessageConsumer& mc) {
boost::shared_ptr<QueuedMessage> qm;
if (m_messages->consume(qm)) {
boost::shared_ptr<DeliveryRecord> dr(new DeliveryRecord(qm, mc, false));
@@ -171,110 +162,95 @@ SimpleQueue::dispatch(MessageConsumer& mc)
}
bool
-SimpleQueue::enqueue(boost::shared_ptr<QueuedMessage> qm)
-{
- return enqueue(s_nullTxnHandle, qm);
+SimpleQueue::enqueue(boost::shared_ptr<QueuedMessage> qm) {
+ return enqueue(0, qm);
}
bool
-SimpleQueue::enqueue(qpid::broker::TxnHandle& th,
- boost::shared_ptr<QueuedMessage> qm)
-{
+SimpleQueue::enqueue(qpid::broker::TxnBuffer* tb,
+ boost::shared_ptr<QueuedMessage> qm) {
ScopedUse u(m_barrier);
if (!u.m_acquired) {
return false;
}
if (qm->payload()->isPersistent() && m_store) {
qm->payload()->enqueueAsync(shared_from_this(), m_store);
- return asyncEnqueue(th, qm);
+ return asyncEnqueue(tb, qm);
}
return false;
}
bool
-SimpleQueue::dequeue(boost::shared_ptr<QueuedMessage> qm)
-{
- return dequeue(s_nullTxnHandle, qm);
+SimpleQueue::dequeue(boost::shared_ptr<QueuedMessage> qm) {
+ return dequeue(0, qm);
}
bool
-SimpleQueue::dequeue(qpid::broker::TxnHandle& th,
- boost::shared_ptr<QueuedMessage> qm)
-{
+SimpleQueue::dequeue(qpid::broker::TxnBuffer* tb,
+ boost::shared_ptr<QueuedMessage> qm) {
ScopedUse u(m_barrier);
if (!u.m_acquired) {
return false;
}
if (qm->payload()->isPersistent() && m_store) {
qm->payload()->dequeueAsync(shared_from_this(), m_store);
- return asyncDequeue(th, qm);
+ return asyncDequeue(tb, qm);
}
return true;
}
void
-SimpleQueue::process(boost::intrusive_ptr<SimpleMessage> msg)
-{
+SimpleQueue::process(boost::intrusive_ptr<SimpleMessage> msg) {
push(boost::shared_ptr<QueuedMessage>(new QueuedMessage(this, msg)));
}
void
-SimpleQueue::enqueueAborted(boost::intrusive_ptr<SimpleMessage> /*msg*/)
-{}
+SimpleQueue::enqueueAborted(boost::intrusive_ptr<SimpleMessage>) {}
void
-SimpleQueue::encode(qpid::framing::Buffer& buffer) const
-{
+SimpleQueue::encode(qpid::framing::Buffer& buffer) const {
buffer.putShortString(m_name);
}
uint32_t
-SimpleQueue::encodedSize() const
-{
+SimpleQueue::encodedSize() const {
return m_name.size() + 1;
}
uint64_t
-SimpleQueue::getPersistenceId() const
-{
+SimpleQueue::getPersistenceId() const {
return m_persistenceId;
}
void
-SimpleQueue::setPersistenceId(uint64_t persistenceId) const
-{
+SimpleQueue::setPersistenceId(uint64_t persistenceId) const {
m_persistenceId = persistenceId;
}
void
-SimpleQueue::flush()
-{
+SimpleQueue::flush() {
//if(m_store) m_store->flush(*this);
}
const std::string&
-SimpleQueue::getName() const
-{
+SimpleQueue::getName() const {
return m_name;
}
void
-SimpleQueue::setExternalQueueStore(qpid::broker::ExternalQueueStore* inst)
-{
+SimpleQueue::setExternalQueueStore(qpid::broker::ExternalQueueStore* inst) {
if (externalQueueStore != inst && externalQueueStore)
delete externalQueueStore;
externalQueueStore = inst;
}
uint64_t
-SimpleQueue::getSize()
-{
+SimpleQueue::getSize() {
return m_persistableData.size();
}
void
-SimpleQueue::write(char* target)
-{
+SimpleQueue::write(char* target) {
::memcpy(target, m_persistableData.data(), m_persistableData.size());
}
@@ -344,21 +320,20 @@ SimpleQueue::push(boost::shared_ptr<QueuedMessage> qm,
// private
bool
-SimpleQueue::asyncEnqueue(qpid::broker::TxnHandle& th,
- boost::shared_ptr<QueuedMessage> qm)
-{
+SimpleQueue::asyncEnqueue(qpid::broker::TxnBuffer* tb,
+ boost::shared_ptr<QueuedMessage> qm) {
assert(qm.get());
-// qm.payload()->setPersistenceId(m_store->getNextRid()); // TODO: rid is set by store itself - find way to do this
boost::shared_ptr<qpid::broker::QueueAsyncContext> qac(new qpid::broker::QueueAsyncContext(shared_from_this(),
qm->payload(),
- th,
+ tb,
&handleAsyncEnqueueResult,
&m_resultQueue));
- // TODO : This must be done from inside store, not here (the txn handle is opaque outside the store)
- if (th.isValid()) {
- th.incrOpCnt();
+ if (tb) {
+ tb->incrOpCnt();
+ m_store->submitEnqueue(qm->enqHandle(), tb->getTxnHandle(), qac);
+ } else {
+ m_store->submitEnqueue(qm->enqHandle(), s_nullTxnHandle, qac);
}
- m_store->submitEnqueue(qm->enqHandle(), th, qac);
++m_asyncOpCounter;
return true;
}
@@ -382,22 +357,21 @@ SimpleQueue::handleAsyncEnqueueResult(const qpid::broker::AsyncResultHandle* con
// private
bool
-SimpleQueue::asyncDequeue(qpid::broker::TxnHandle& th,
+SimpleQueue::asyncDequeue(/*boost::shared_ptr<qpid::broker::TxnBuffer>*/qpid::broker::TxnBuffer* tb,
boost::shared_ptr<QueuedMessage> qm)
{
assert(qm.get());
boost::shared_ptr<qpid::broker::QueueAsyncContext> qac(new qpid::broker::QueueAsyncContext(shared_from_this(),
qm->payload(),
- th,
+ tb,
&handleAsyncDequeueResult,
&m_resultQueue));
- // TODO : This must be done from inside store, not here (the txn handle is opaque outside the store)
- if (th.isValid()) {
- th.incrOpCnt();
+ if (tb) {
+ tb->incrOpCnt();
+ m_store->submitDequeue(qm->enqHandle(), tb->getTxnHandle(), qac);
+ } else {
+ m_store->submitDequeue(qm->enqHandle(), s_nullTxnHandle, qac);
}
- m_store->submitDequeue(qm->enqHandle(),
- th,
- qac);
++m_asyncOpCounter;
return true;
}
@@ -420,8 +394,7 @@ SimpleQueue::handleAsyncDequeueResult(const qpid::broker::AsyncResultHandle* con
// private
void
-SimpleQueue::destroyCheck(const std::string& opDescr) const
-{
+SimpleQueue::destroyCheck(const std::string& opDescr) const {
if (m_destroyPending || m_destroyed) {
std::ostringstream oss;
oss << opDescr << " on queue \"" << m_name << "\" after call to destroy";
@@ -431,55 +404,54 @@ SimpleQueue::destroyCheck(const std::string& opDescr) const
// private
void
-SimpleQueue::createComplete(const boost::shared_ptr<qpid::broker::QueueAsyncContext> qc)
-{
- assert(qc->getQueue().get() == this);
+SimpleQueue::createComplete(const boost::shared_ptr<qpid::broker::QueueAsyncContext> qc) {
+ if (qc.get()) {
+ assert(qc->getQueue().get() == this);
+ }
--m_asyncOpCounter;
}
// private
void
-SimpleQueue::flushComplete(const boost::shared_ptr<qpid::broker::QueueAsyncContext> qc)
-{
- assert(qc->getQueue().get() == this);
+SimpleQueue::flushComplete(const boost::shared_ptr<qpid::broker::QueueAsyncContext> qc) {
+ if (qc.get()) {
+ assert(qc->getQueue().get() == this);
+ }
--m_asyncOpCounter;
}
// private
void
-SimpleQueue::destroyComplete(const boost::shared_ptr<qpid::broker::QueueAsyncContext> qc)
-{
- assert(qc->getQueue().get() == this);
+SimpleQueue::destroyComplete(const boost::shared_ptr<qpid::broker::QueueAsyncContext> qc) {
+ if (qc.get()) {
+ assert(qc->getQueue().get() == this);
+ }
--m_asyncOpCounter;
m_destroyed = true;
}
// private
void
-SimpleQueue::enqueueComplete(const boost::shared_ptr<qpid::broker::QueueAsyncContext> qc)
-{
- assert(qc->getQueue().get() == this);
- --m_asyncOpCounter;
-
- // TODO : This must be done from inside store, not here (the txn handle is opaque outside the store)
- qpid::broker::TxnHandle th = qc->getTxnHandle();
- if (th.isValid()) { // transactional enqueue
- th.decrOpCnt();
+SimpleQueue::enqueueComplete(const boost::shared_ptr<qpid::broker::QueueAsyncContext> qc) {
+ if (qc.get()) {
+ assert(qc->getQueue().get() == this);
+ if (qc->getTxnBuffer()) { // transactional enqueue
+ qc->getTxnBuffer()->decrOpCnt();
+ }
}
+ --m_asyncOpCounter;
}
// private
void
-SimpleQueue::dequeueComplete(const boost::shared_ptr<qpid::broker::QueueAsyncContext> qc)
-{
- assert(qc->getQueue().get() == this);
- --m_asyncOpCounter;
-
- // TODO : This must be done from inside store, not here (the txn handle is opaque outside the store)
- qpid::broker::TxnHandle th = qc->getTxnHandle();
- if (th.isValid()) { // transactional enqueue
- th.decrOpCnt();
+SimpleQueue::dequeueComplete(const boost::shared_ptr<qpid::broker::QueueAsyncContext> qc) {
+ if (qc.get()) {
+ assert(qc->getQueue().get() == this);
+ if (qc->getTxnBuffer()) { // transactional enqueue
+ qc->getTxnBuffer()->decrOpCnt();
+ }
}
+ --m_asyncOpCounter;
}
}}} // namespace tests::storePerftools::asyncPerf
diff --git a/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.h b/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.h
index bf88e32345..5f64c9b960 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.h
+++ b/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.h
@@ -38,6 +38,7 @@ namespace qpid {
namespace broker {
class AsyncResultQueue;
class QueueAsyncContext;
+class TxnBuffer;
}
namespace framing {
class FieldTable;
@@ -76,10 +77,10 @@ public:
void deliver(boost::intrusive_ptr<SimpleMessage> msg);
bool dispatch(MessageConsumer& mc);
bool enqueue(boost::shared_ptr<QueuedMessage> qm);
- bool enqueue(qpid::broker::TxnHandle& th,
+ bool enqueue(qpid::broker::TxnBuffer* tb,
boost::shared_ptr<QueuedMessage> qm);
bool dequeue(boost::shared_ptr<QueuedMessage> qm);
- bool dequeue(qpid::broker::TxnHandle& th,
+ bool dequeue(qpid::broker::TxnBuffer* tb,
boost::shared_ptr<QueuedMessage> qm);
void process(boost::intrusive_ptr<SimpleMessage> msg);
void enqueueAborted(boost::intrusive_ptr<SimpleMessage> msg);
@@ -134,10 +135,10 @@ private:
bool isRecovery = false);
// -- Async ops ---
- bool asyncEnqueue(qpid::broker::TxnHandle& th,
+ bool asyncEnqueue(qpid::broker::TxnBuffer* tb,
boost::shared_ptr<QueuedMessage> qm);
static void handleAsyncEnqueueResult(const qpid::broker::AsyncResultHandle* const arh);
- bool asyncDequeue(qpid::broker::TxnHandle& th,
+ bool asyncDequeue(qpid::broker::TxnBuffer* tb,
boost::shared_ptr<QueuedMessage> qm);
static void handleAsyncDequeueResult(const qpid::broker::AsyncResultHandle* const arh);
diff --git a/cpp/src/tests/storePerftools/asyncPerf/TxnAccept.cpp b/cpp/src/tests/storePerftools/asyncPerf/TxnAccept.cpp
index 7bede50272..375cd568d2 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/TxnAccept.cpp
+++ b/cpp/src/tests/storePerftools/asyncPerf/TxnAccept.cpp
@@ -35,18 +35,17 @@ TxnAccept::TxnAccept(std::deque<boost::shared_ptr<DeliveryRecord> >& ops) :
m_ops(ops)
{}
-TxnAccept::~TxnAccept()
-{}
+TxnAccept::~TxnAccept() {}
// --- Interface TxnOp ---
bool
-TxnAccept::prepare(qpid::broker::TxnHandle& th) throw()
-{
+TxnAccept::prepare(qpid::broker::TxnBuffer* tb) throw() {
try {
for (std::deque<boost::shared_ptr<DeliveryRecord> >::iterator i = m_ops.begin(); i != m_ops.end(); ++i) {
- (*i)->dequeue(th);
+ (*i)->dequeue(tb);
}
+ return true;
} catch (const std::exception& e) {
QPID_LOG(error, "TxnAccept: Failed to prepare transaction: " << e.what());
} catch (...) {
@@ -56,8 +55,7 @@ TxnAccept::prepare(qpid::broker::TxnHandle& th) throw()
}
void
-TxnAccept::commit() throw()
-{
+TxnAccept::commit() throw() {
try {
for (std::deque<boost::shared_ptr<DeliveryRecord> >::iterator i=m_ops.begin(); i!=m_ops.end(); ++i) {
(*i)->committed();
@@ -71,7 +69,6 @@ TxnAccept::commit() throw()
}
void
-TxnAccept::rollback() throw()
-{}
+TxnAccept::rollback() throw() {}
}}} // namespace tests::storePerftools::asyncPerf
diff --git a/cpp/src/tests/storePerftools/asyncPerf/TxnAccept.h b/cpp/src/tests/storePerftools/asyncPerf/TxnAccept.h
index 6bc7ff9ccb..5d84289965 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/TxnAccept.h
+++ b/cpp/src/tests/storePerftools/asyncPerf/TxnAccept.h
@@ -41,9 +41,9 @@ public:
virtual ~TxnAccept();
// --- Interface TxnOp ---
- bool prepare(qpid::broker::TxnHandle& th) throw();
- void commit() throw();
- void rollback() throw();
+ bool prepare(qpid::broker::TxnBuffer* tb) throw();
+ void commit() throw();
+ void rollback() throw();
private:
std::deque<boost::shared_ptr<DeliveryRecord> > m_ops;
};
diff --git a/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.cpp b/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.cpp
index 6e15526e8f..cc36a38be7 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.cpp
+++ b/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.cpp
@@ -38,15 +38,13 @@ TxnPublish::TxnPublish(boost::intrusive_ptr<SimpleMessage> msg) :
m_msg(msg)
{}
-TxnPublish::~TxnPublish()
-{}
+TxnPublish::~TxnPublish() {}
bool
-TxnPublish::prepare(qpid::broker::TxnHandle& th) throw()
-{
- try{
+TxnPublish::prepare(qpid::broker::TxnBuffer* tb) throw() {
+ try {
while (!m_queues.empty()) {
- m_queues.front()->prepareEnqueue(th);
+ m_queues.front()->prepareEnqueue(tb);
m_prepared.push_back(m_queues.front());
m_queues.pop_front();
}
@@ -60,8 +58,7 @@ TxnPublish::prepare(qpid::broker::TxnHandle& th) throw()
}
void
-TxnPublish::commit() throw()
-{
+TxnPublish::commit() throw() {
try {
for (std::list<boost::shared_ptr<QueuedMessage> >::iterator i = m_prepared.begin(); i != m_prepared.end(); ++i) {
(*i)->commitEnqueue();
@@ -74,8 +71,7 @@ TxnPublish::commit() throw()
}
void
-TxnPublish::rollback() throw()
-{
+TxnPublish::rollback() throw() {
try {
for (std::list<boost::shared_ptr<QueuedMessage> >::iterator i = m_prepared.begin(); i != m_prepared.end(); ++i) {
(*i)->abortEnqueue();
@@ -88,21 +84,18 @@ TxnPublish::rollback() throw()
}
uint64_t
-TxnPublish::contentSize()
-{
+TxnPublish::contentSize() {
return m_msg->contentSize();
}
void
-TxnPublish::deliverTo(const boost::shared_ptr<SimpleQueue>& queue)
-{
+TxnPublish::deliverTo(const boost::shared_ptr<SimpleQueue>& queue) {
m_queues.push_back(boost::shared_ptr<QueuedMessage>(new QueuedMessage(queue.get(), m_msg)));
m_delivered = true;
}
SimpleMessage&
-TxnPublish::getMessage()
-{
+TxnPublish::getMessage() {
return *m_msg;
}
diff --git a/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.h b/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.h
index 17c3b3778d..eae9ef9c4c 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.h
+++ b/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.h
@@ -48,7 +48,7 @@ public:
virtual ~TxnPublish();
// --- Interface TxOp ---
- bool prepare(qpid::broker::TxnHandle& th) throw();
+ bool prepare(qpid::broker::TxnBuffer* tb) throw();
void commit() throw();
void rollback() throw();