diff options
Diffstat (limited to 'cpp/src/tests')
13 files changed, 125 insertions, 185 deletions
diff --git a/cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.cpp b/cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.cpp index e1c67a9547..6f33369a26 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.cpp +++ b/cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.cpp @@ -82,9 +82,9 @@ DeliveryRecord::isRedundant() const } void -DeliveryRecord::dequeue(qpid::broker::TxnHandle& txn) +DeliveryRecord::dequeue(qpid::broker::TxnBuffer* tb) { - m_queuedMessage->getQueue()->dequeue(txn, m_queuedMessage); + m_queuedMessage->getQueue()->dequeue(tb, m_queuedMessage); } void diff --git a/cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.h b/cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.h index d4529941e7..6c5d87f374 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.h +++ b/cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.h @@ -30,7 +30,7 @@ namespace qpid { namespace broker { -class TxnHandle; +class TxnBuffer; }} namespace tests { @@ -51,7 +51,7 @@ public: bool setEnded(); bool isEnded() const; bool isRedundant() const; - void dequeue(qpid::broker::TxnHandle& txn); + void dequeue(qpid::broker::TxnBuffer* tb); void committed() const; boost::shared_ptr<QueuedMessage> getQueuedMessage() const; private: diff --git a/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp b/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp index 4a2bc2bf0c..6aa477c470 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp +++ b/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp @@ -47,22 +47,18 @@ MessageConsumer::MessageConsumer(const TestOptions& perfTestParams, m_queue(queue) {} -MessageConsumer::~MessageConsumer() -{} +MessageConsumer::~MessageConsumer() {} void -MessageConsumer::record(boost::shared_ptr<DeliveryRecord> dr) -{ +MessageConsumer::record(boost::shared_ptr<DeliveryRecord> dr) { m_unacked.push_back(dr); } void -MessageConsumer::commitComplete() -{} +MessageConsumer::commitComplete() {} void* -MessageConsumer::runConsumers() -{ +MessageConsumer::runConsumers() { const bool useTxns = m_perfTestParams.m_deqTxnBlockSize > 0U; uint16_t opsInTxnCnt = 0U; qpid::broker::TxnBuffer* tb = 0; @@ -78,17 +74,13 @@ MessageConsumer::runConsumers() ++numMsgs; if (useTxns) { // --- Transactional dequeue --- + boost::shared_ptr<TxnAccept> ta(new TxnAccept(m_unacked)); + m_unacked.clear(); + tb->enlist(ta); if (++opsInTxnCnt >= m_perfTestParams.m_deqTxnBlockSize) { - if (m_perfTestParams.m_durable) { - boost::shared_ptr<TxnAccept> ta(new TxnAccept(m_unacked)); - m_unacked.clear(); - tb->enlist(ta); - tb->commitLocal(m_store); - if (numMsgs < m_perfTestParams.m_numMsgs) { - tb = new qpid::broker::TxnBuffer(m_resultQueue); - } - } else { - tb->commit(); + tb->commitLocal(m_store); + if (numMsgs < m_perfTestParams.m_numMsgs) { + tb = new qpid::broker::TxnBuffer(m_resultQueue); } opsInTxnCnt = 0U; } @@ -105,11 +97,7 @@ MessageConsumer::runConsumers() } if (opsInTxnCnt) { - if (m_perfTestParams.m_durable) { - tb->commitLocal(m_store); - } else { - tb->commit(); - } + tb->commitLocal(m_store); } return reinterpret_cast<void*>(0); @@ -117,8 +105,7 @@ MessageConsumer::runConsumers() //static void* -MessageConsumer::startConsumers(void* ptr) -{ +MessageConsumer::startConsumers(void* ptr) { return reinterpret_cast<MessageConsumer*>(ptr)->runConsumers(); } diff --git a/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.cpp b/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.cpp index 7d9aaceb11..974f3f3981 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.cpp +++ b/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.cpp @@ -49,12 +49,10 @@ MessageProducer::MessageProducer(const TestOptions& perfTestParams, m_queue(queue) {} -MessageProducer::~MessageProducer() -{} +MessageProducer::~MessageProducer() {} void* -MessageProducer::runProducers() -{ +MessageProducer::runProducers() { const bool useTxns = m_perfTestParams.m_enqTxnBlockSize > 0U; uint16_t recsInTxnCnt = 0U; qpid::broker::TxnBuffer* tb = 0; @@ -68,17 +66,13 @@ MessageProducer::runProducers() op->deliverTo(m_queue); tb->enlist(op); if (++recsInTxnCnt >= m_perfTestParams.m_enqTxnBlockSize) { - if (m_perfTestParams.m_durable) { - tb->commitLocal(m_store); + tb->commitLocal(m_store); - // TxnBuffer instance tb carries async state that precludes it being re-used for the next - // transaction until the current commit cycle completes. So use another instance. This - // instance should auto-delete when the async commit cycle completes. - if ((numMsgs + 1) < m_perfTestParams.m_numMsgs) { - tb = new qpid::broker::TxnBuffer(m_resultQueue); - } - } else { - tb->commit(); + // TxnBuffer instance tb carries async state that precludes it being re-used for the next + // transaction until the current commit cycle completes. So use another instance. This + // instance should auto-delete when the async commit cycle completes. + if ((numMsgs + 1) < m_perfTestParams.m_numMsgs) { + tb = new qpid::broker::TxnBuffer(m_resultQueue); } recsInTxnCnt = 0U; } @@ -87,11 +81,7 @@ MessageProducer::runProducers() } } if (recsInTxnCnt) { - if (m_perfTestParams.m_durable) { - tb->commitLocal(m_store); - } else { - tb->commit(); - } + tb->commitLocal(m_store); } return reinterpret_cast<void*>(0); } diff --git a/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.h b/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.h index 7fa74a2c51..127408e3db 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.h +++ b/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.h @@ -32,6 +32,7 @@ class AsyncStoreImpl; } namespace broker { class AsyncResultQueue; +class TxnBuffer; }} namespace tests { @@ -40,7 +41,6 @@ namespace asyncPerf { class SimpleQueue; class TestOptions; -class TxnBuffer; class MessageProducer { diff --git a/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.cpp b/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.cpp index 572089faaf..0d16248c7f 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.cpp +++ b/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.cpp @@ -89,9 +89,9 @@ QueuedMessage::enqHandle() } void -QueuedMessage::prepareEnqueue(qpid::broker::TxnHandle& th) +QueuedMessage::prepareEnqueue(qpid::broker::TxnBuffer* tb) { - m_queue->enqueue(th, shared_from_this()); + m_queue->enqueue(tb, shared_from_this()); } void diff --git a/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.h b/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.h index dd10f8b501..630fe1aedc 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.h +++ b/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.h @@ -59,7 +59,7 @@ public: qpid::broker::EnqueueHandle& enqHandle(); // --- Transaction handling --- - void prepareEnqueue(qpid::broker::TxnHandle& th); + void prepareEnqueue(qpid::broker::TxnBuffer* tb); void commitEnqueue(); void abortEnqueue(); diff --git a/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp b/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp index f297e83402..06b4e9333f 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp +++ b/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp @@ -31,7 +31,7 @@ #include "qpid/broker/AsyncResultHandle.h" #include "qpid/broker/QueueAsyncContext.h" -#include "qpid/broker/TxnHandle.h" +#include "qpid/broker/TxnBuffer.h" #include <string.h> // memcpy() @@ -65,33 +65,27 @@ SimpleQueue::SimpleQueue(const std::string& name, } } -SimpleQueue::~SimpleQueue() -{} +SimpleQueue::~SimpleQueue() {} const qpid::broker::QueueHandle& -SimpleQueue::getHandle() const -{ +SimpleQueue::getHandle() const { return m_queueHandle; } qpid::broker::QueueHandle& -SimpleQueue::getHandle() -{ +SimpleQueue::getHandle() { return m_queueHandle; } qpid::broker::AsyncStore* -SimpleQueue::getStore() -{ +SimpleQueue::getStore() { return m_store; } void -SimpleQueue::asyncCreate() -{ +SimpleQueue::asyncCreate() { if (m_store) { boost::shared_ptr<qpid::broker::QueueAsyncContext> qac(new qpid::broker::QueueAsyncContext(shared_from_this(), - s_nullTxnHandle, &handleAsyncCreateResult, &m_resultQueue)); m_store->submitCreate(m_queueHandle, this, qac); @@ -123,7 +117,6 @@ SimpleQueue::asyncDestroy(const bool deleteQueue) if (m_store) { if (deleteQueue) { boost::shared_ptr<qpid::broker::QueueAsyncContext> qac(new qpid::broker::QueueAsyncContext(shared_from_this(), - s_nullTxnHandle, &handleAsyncDestroyResult, &m_resultQueue)); m_store->submitDestroy(m_queueHandle, qac); @@ -151,16 +144,14 @@ SimpleQueue::handleAsyncDestroyResult(const qpid::broker::AsyncResultHandle* con } void -SimpleQueue::deliver(boost::intrusive_ptr<SimpleMessage> msg) -{ +SimpleQueue::deliver(boost::intrusive_ptr<SimpleMessage> msg) { boost::shared_ptr<QueuedMessage> qm(boost::shared_ptr<QueuedMessage>(new QueuedMessage(this, msg))); - enqueue(s_nullTxnHandle, qm); + enqueue(qm); push(qm); } bool -SimpleQueue::dispatch(MessageConsumer& mc) -{ +SimpleQueue::dispatch(MessageConsumer& mc) { boost::shared_ptr<QueuedMessage> qm; if (m_messages->consume(qm)) { boost::shared_ptr<DeliveryRecord> dr(new DeliveryRecord(qm, mc, false)); @@ -171,110 +162,95 @@ SimpleQueue::dispatch(MessageConsumer& mc) } bool -SimpleQueue::enqueue(boost::shared_ptr<QueuedMessage> qm) -{ - return enqueue(s_nullTxnHandle, qm); +SimpleQueue::enqueue(boost::shared_ptr<QueuedMessage> qm) { + return enqueue(0, qm); } bool -SimpleQueue::enqueue(qpid::broker::TxnHandle& th, - boost::shared_ptr<QueuedMessage> qm) -{ +SimpleQueue::enqueue(qpid::broker::TxnBuffer* tb, + boost::shared_ptr<QueuedMessage> qm) { ScopedUse u(m_barrier); if (!u.m_acquired) { return false; } if (qm->payload()->isPersistent() && m_store) { qm->payload()->enqueueAsync(shared_from_this(), m_store); - return asyncEnqueue(th, qm); + return asyncEnqueue(tb, qm); } return false; } bool -SimpleQueue::dequeue(boost::shared_ptr<QueuedMessage> qm) -{ - return dequeue(s_nullTxnHandle, qm); +SimpleQueue::dequeue(boost::shared_ptr<QueuedMessage> qm) { + return dequeue(0, qm); } bool -SimpleQueue::dequeue(qpid::broker::TxnHandle& th, - boost::shared_ptr<QueuedMessage> qm) -{ +SimpleQueue::dequeue(qpid::broker::TxnBuffer* tb, + boost::shared_ptr<QueuedMessage> qm) { ScopedUse u(m_barrier); if (!u.m_acquired) { return false; } if (qm->payload()->isPersistent() && m_store) { qm->payload()->dequeueAsync(shared_from_this(), m_store); - return asyncDequeue(th, qm); + return asyncDequeue(tb, qm); } return true; } void -SimpleQueue::process(boost::intrusive_ptr<SimpleMessage> msg) -{ +SimpleQueue::process(boost::intrusive_ptr<SimpleMessage> msg) { push(boost::shared_ptr<QueuedMessage>(new QueuedMessage(this, msg))); } void -SimpleQueue::enqueueAborted(boost::intrusive_ptr<SimpleMessage> /*msg*/) -{} +SimpleQueue::enqueueAborted(boost::intrusive_ptr<SimpleMessage>) {} void -SimpleQueue::encode(qpid::framing::Buffer& buffer) const -{ +SimpleQueue::encode(qpid::framing::Buffer& buffer) const { buffer.putShortString(m_name); } uint32_t -SimpleQueue::encodedSize() const -{ +SimpleQueue::encodedSize() const { return m_name.size() + 1; } uint64_t -SimpleQueue::getPersistenceId() const -{ +SimpleQueue::getPersistenceId() const { return m_persistenceId; } void -SimpleQueue::setPersistenceId(uint64_t persistenceId) const -{ +SimpleQueue::setPersistenceId(uint64_t persistenceId) const { m_persistenceId = persistenceId; } void -SimpleQueue::flush() -{ +SimpleQueue::flush() { //if(m_store) m_store->flush(*this); } const std::string& -SimpleQueue::getName() const -{ +SimpleQueue::getName() const { return m_name; } void -SimpleQueue::setExternalQueueStore(qpid::broker::ExternalQueueStore* inst) -{ +SimpleQueue::setExternalQueueStore(qpid::broker::ExternalQueueStore* inst) { if (externalQueueStore != inst && externalQueueStore) delete externalQueueStore; externalQueueStore = inst; } uint64_t -SimpleQueue::getSize() -{ +SimpleQueue::getSize() { return m_persistableData.size(); } void -SimpleQueue::write(char* target) -{ +SimpleQueue::write(char* target) { ::memcpy(target, m_persistableData.data(), m_persistableData.size()); } @@ -344,21 +320,20 @@ SimpleQueue::push(boost::shared_ptr<QueuedMessage> qm, // private bool -SimpleQueue::asyncEnqueue(qpid::broker::TxnHandle& th, - boost::shared_ptr<QueuedMessage> qm) -{ +SimpleQueue::asyncEnqueue(qpid::broker::TxnBuffer* tb, + boost::shared_ptr<QueuedMessage> qm) { assert(qm.get()); -// qm.payload()->setPersistenceId(m_store->getNextRid()); // TODO: rid is set by store itself - find way to do this boost::shared_ptr<qpid::broker::QueueAsyncContext> qac(new qpid::broker::QueueAsyncContext(shared_from_this(), qm->payload(), - th, + tb, &handleAsyncEnqueueResult, &m_resultQueue)); - // TODO : This must be done from inside store, not here (the txn handle is opaque outside the store) - if (th.isValid()) { - th.incrOpCnt(); + if (tb) { + tb->incrOpCnt(); + m_store->submitEnqueue(qm->enqHandle(), tb->getTxnHandle(), qac); + } else { + m_store->submitEnqueue(qm->enqHandle(), s_nullTxnHandle, qac); } - m_store->submitEnqueue(qm->enqHandle(), th, qac); ++m_asyncOpCounter; return true; } @@ -382,22 +357,21 @@ SimpleQueue::handleAsyncEnqueueResult(const qpid::broker::AsyncResultHandle* con // private bool -SimpleQueue::asyncDequeue(qpid::broker::TxnHandle& th, +SimpleQueue::asyncDequeue(/*boost::shared_ptr<qpid::broker::TxnBuffer>*/qpid::broker::TxnBuffer* tb, boost::shared_ptr<QueuedMessage> qm) { assert(qm.get()); boost::shared_ptr<qpid::broker::QueueAsyncContext> qac(new qpid::broker::QueueAsyncContext(shared_from_this(), qm->payload(), - th, + tb, &handleAsyncDequeueResult, &m_resultQueue)); - // TODO : This must be done from inside store, not here (the txn handle is opaque outside the store) - if (th.isValid()) { - th.incrOpCnt(); + if (tb) { + tb->incrOpCnt(); + m_store->submitDequeue(qm->enqHandle(), tb->getTxnHandle(), qac); + } else { + m_store->submitDequeue(qm->enqHandle(), s_nullTxnHandle, qac); } - m_store->submitDequeue(qm->enqHandle(), - th, - qac); ++m_asyncOpCounter; return true; } @@ -420,8 +394,7 @@ SimpleQueue::handleAsyncDequeueResult(const qpid::broker::AsyncResultHandle* con // private void -SimpleQueue::destroyCheck(const std::string& opDescr) const -{ +SimpleQueue::destroyCheck(const std::string& opDescr) const { if (m_destroyPending || m_destroyed) { std::ostringstream oss; oss << opDescr << " on queue \"" << m_name << "\" after call to destroy"; @@ -431,55 +404,54 @@ SimpleQueue::destroyCheck(const std::string& opDescr) const // private void -SimpleQueue::createComplete(const boost::shared_ptr<qpid::broker::QueueAsyncContext> qc) -{ - assert(qc->getQueue().get() == this); +SimpleQueue::createComplete(const boost::shared_ptr<qpid::broker::QueueAsyncContext> qc) { + if (qc.get()) { + assert(qc->getQueue().get() == this); + } --m_asyncOpCounter; } // private void -SimpleQueue::flushComplete(const boost::shared_ptr<qpid::broker::QueueAsyncContext> qc) -{ - assert(qc->getQueue().get() == this); +SimpleQueue::flushComplete(const boost::shared_ptr<qpid::broker::QueueAsyncContext> qc) { + if (qc.get()) { + assert(qc->getQueue().get() == this); + } --m_asyncOpCounter; } // private void -SimpleQueue::destroyComplete(const boost::shared_ptr<qpid::broker::QueueAsyncContext> qc) -{ - assert(qc->getQueue().get() == this); +SimpleQueue::destroyComplete(const boost::shared_ptr<qpid::broker::QueueAsyncContext> qc) { + if (qc.get()) { + assert(qc->getQueue().get() == this); + } --m_asyncOpCounter; m_destroyed = true; } // private void -SimpleQueue::enqueueComplete(const boost::shared_ptr<qpid::broker::QueueAsyncContext> qc) -{ - assert(qc->getQueue().get() == this); - --m_asyncOpCounter; - - // TODO : This must be done from inside store, not here (the txn handle is opaque outside the store) - qpid::broker::TxnHandle th = qc->getTxnHandle(); - if (th.isValid()) { // transactional enqueue - th.decrOpCnt(); +SimpleQueue::enqueueComplete(const boost::shared_ptr<qpid::broker::QueueAsyncContext> qc) { + if (qc.get()) { + assert(qc->getQueue().get() == this); + if (qc->getTxnBuffer()) { // transactional enqueue + qc->getTxnBuffer()->decrOpCnt(); + } } + --m_asyncOpCounter; } // private void -SimpleQueue::dequeueComplete(const boost::shared_ptr<qpid::broker::QueueAsyncContext> qc) -{ - assert(qc->getQueue().get() == this); - --m_asyncOpCounter; - - // TODO : This must be done from inside store, not here (the txn handle is opaque outside the store) - qpid::broker::TxnHandle th = qc->getTxnHandle(); - if (th.isValid()) { // transactional enqueue - th.decrOpCnt(); +SimpleQueue::dequeueComplete(const boost::shared_ptr<qpid::broker::QueueAsyncContext> qc) { + if (qc.get()) { + assert(qc->getQueue().get() == this); + if (qc->getTxnBuffer()) { // transactional enqueue + qc->getTxnBuffer()->decrOpCnt(); + } } + --m_asyncOpCounter; } }}} // namespace tests::storePerftools::asyncPerf diff --git a/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.h b/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.h index bf88e32345..5f64c9b960 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.h +++ b/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.h @@ -38,6 +38,7 @@ namespace qpid { namespace broker { class AsyncResultQueue; class QueueAsyncContext; +class TxnBuffer; } namespace framing { class FieldTable; @@ -76,10 +77,10 @@ public: void deliver(boost::intrusive_ptr<SimpleMessage> msg); bool dispatch(MessageConsumer& mc); bool enqueue(boost::shared_ptr<QueuedMessage> qm); - bool enqueue(qpid::broker::TxnHandle& th, + bool enqueue(qpid::broker::TxnBuffer* tb, boost::shared_ptr<QueuedMessage> qm); bool dequeue(boost::shared_ptr<QueuedMessage> qm); - bool dequeue(qpid::broker::TxnHandle& th, + bool dequeue(qpid::broker::TxnBuffer* tb, boost::shared_ptr<QueuedMessage> qm); void process(boost::intrusive_ptr<SimpleMessage> msg); void enqueueAborted(boost::intrusive_ptr<SimpleMessage> msg); @@ -134,10 +135,10 @@ private: bool isRecovery = false); // -- Async ops --- - bool asyncEnqueue(qpid::broker::TxnHandle& th, + bool asyncEnqueue(qpid::broker::TxnBuffer* tb, boost::shared_ptr<QueuedMessage> qm); static void handleAsyncEnqueueResult(const qpid::broker::AsyncResultHandle* const arh); - bool asyncDequeue(qpid::broker::TxnHandle& th, + bool asyncDequeue(qpid::broker::TxnBuffer* tb, boost::shared_ptr<QueuedMessage> qm); static void handleAsyncDequeueResult(const qpid::broker::AsyncResultHandle* const arh); diff --git a/cpp/src/tests/storePerftools/asyncPerf/TxnAccept.cpp b/cpp/src/tests/storePerftools/asyncPerf/TxnAccept.cpp index 7bede50272..375cd568d2 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/TxnAccept.cpp +++ b/cpp/src/tests/storePerftools/asyncPerf/TxnAccept.cpp @@ -35,18 +35,17 @@ TxnAccept::TxnAccept(std::deque<boost::shared_ptr<DeliveryRecord> >& ops) : m_ops(ops) {} -TxnAccept::~TxnAccept() -{} +TxnAccept::~TxnAccept() {} // --- Interface TxnOp --- bool -TxnAccept::prepare(qpid::broker::TxnHandle& th) throw() -{ +TxnAccept::prepare(qpid::broker::TxnBuffer* tb) throw() { try { for (std::deque<boost::shared_ptr<DeliveryRecord> >::iterator i = m_ops.begin(); i != m_ops.end(); ++i) { - (*i)->dequeue(th); + (*i)->dequeue(tb); } + return true; } catch (const std::exception& e) { QPID_LOG(error, "TxnAccept: Failed to prepare transaction: " << e.what()); } catch (...) { @@ -56,8 +55,7 @@ TxnAccept::prepare(qpid::broker::TxnHandle& th) throw() } void -TxnAccept::commit() throw() -{ +TxnAccept::commit() throw() { try { for (std::deque<boost::shared_ptr<DeliveryRecord> >::iterator i=m_ops.begin(); i!=m_ops.end(); ++i) { (*i)->committed(); @@ -71,7 +69,6 @@ TxnAccept::commit() throw() } void -TxnAccept::rollback() throw() -{} +TxnAccept::rollback() throw() {} }}} // namespace tests::storePerftools::asyncPerf diff --git a/cpp/src/tests/storePerftools/asyncPerf/TxnAccept.h b/cpp/src/tests/storePerftools/asyncPerf/TxnAccept.h index 6bc7ff9ccb..5d84289965 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/TxnAccept.h +++ b/cpp/src/tests/storePerftools/asyncPerf/TxnAccept.h @@ -41,9 +41,9 @@ public: virtual ~TxnAccept(); // --- Interface TxnOp --- - bool prepare(qpid::broker::TxnHandle& th) throw(); - void commit() throw(); - void rollback() throw(); + bool prepare(qpid::broker::TxnBuffer* tb) throw(); + void commit() throw(); + void rollback() throw(); private: std::deque<boost::shared_ptr<DeliveryRecord> > m_ops; }; diff --git a/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.cpp b/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.cpp index 6e15526e8f..cc36a38be7 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.cpp +++ b/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.cpp @@ -38,15 +38,13 @@ TxnPublish::TxnPublish(boost::intrusive_ptr<SimpleMessage> msg) : m_msg(msg) {} -TxnPublish::~TxnPublish() -{} +TxnPublish::~TxnPublish() {} bool -TxnPublish::prepare(qpid::broker::TxnHandle& th) throw() -{ - try{ +TxnPublish::prepare(qpid::broker::TxnBuffer* tb) throw() { + try { while (!m_queues.empty()) { - m_queues.front()->prepareEnqueue(th); + m_queues.front()->prepareEnqueue(tb); m_prepared.push_back(m_queues.front()); m_queues.pop_front(); } @@ -60,8 +58,7 @@ TxnPublish::prepare(qpid::broker::TxnHandle& th) throw() } void -TxnPublish::commit() throw() -{ +TxnPublish::commit() throw() { try { for (std::list<boost::shared_ptr<QueuedMessage> >::iterator i = m_prepared.begin(); i != m_prepared.end(); ++i) { (*i)->commitEnqueue(); @@ -74,8 +71,7 @@ TxnPublish::commit() throw() } void -TxnPublish::rollback() throw() -{ +TxnPublish::rollback() throw() { try { for (std::list<boost::shared_ptr<QueuedMessage> >::iterator i = m_prepared.begin(); i != m_prepared.end(); ++i) { (*i)->abortEnqueue(); @@ -88,21 +84,18 @@ TxnPublish::rollback() throw() } uint64_t -TxnPublish::contentSize() -{ +TxnPublish::contentSize() { return m_msg->contentSize(); } void -TxnPublish::deliverTo(const boost::shared_ptr<SimpleQueue>& queue) -{ +TxnPublish::deliverTo(const boost::shared_ptr<SimpleQueue>& queue) { m_queues.push_back(boost::shared_ptr<QueuedMessage>(new QueuedMessage(queue.get(), m_msg))); m_delivered = true; } SimpleMessage& -TxnPublish::getMessage() -{ +TxnPublish::getMessage() { return *m_msg; } diff --git a/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.h b/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.h index 17c3b3778d..eae9ef9c4c 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.h +++ b/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.h @@ -48,7 +48,7 @@ public: virtual ~TxnPublish(); // --- Interface TxOp --- - bool prepare(qpid::broker::TxnHandle& th) throw(); + bool prepare(qpid::broker::TxnBuffer* tb) throw(); void commit() throw(); void rollback() throw(); |