diff options
author | Kim van der Riet <kpvdr@apache.org> | 2012-06-15 19:21:07 +0000 |
---|---|---|
committer | Kim van der Riet <kpvdr@apache.org> | 2012-06-15 19:21:07 +0000 |
commit | 58337ca40df3a57a16cdee9b7f6b4fe0361b0018 (patch) | |
tree | 391ad7ad1ea8cd42a7e9d4890c724b186e00f38b /cpp/src/tests | |
parent | 01174a9e568f11cd5aa4f22aaa914e00ab9fe163 (diff) | |
download | qpid-python-58337ca40df3a57a16cdee9b7f6b4fe0361b0018.tar.gz |
QPID-3858: WIP - async txns for msg publish pathway, but there are some race/thread issues to sort out.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1350745 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/tests')
20 files changed, 384 insertions, 509 deletions
diff --git a/cpp/src/tests/CMakeLists.txt b/cpp/src/tests/CMakeLists.txt index 1d04828a49..fbe4cfbd7d 100644 --- a/cpp/src/tests/CMakeLists.txt +++ b/cpp/src/tests/CMakeLists.txt @@ -379,6 +379,7 @@ endif (UNIX) # Async store perf test (asyncPerf) set (asyncStorePerf_SOURCES + storePerftools/asyncPerf/Deliverable.cpp storePerftools/asyncPerf/MessageAsyncContext.cpp storePerftools/asyncPerf/MessageConsumer.cpp storePerftools/asyncPerf/MessageDeque.cpp @@ -388,10 +389,9 @@ set (asyncStorePerf_SOURCES storePerftools/asyncPerf/QueuedMessage.cpp storePerftools/asyncPerf/SimplePersistableMessage.cpp storePerftools/asyncPerf/SimplePersistableQueue.cpp - storePerftools/asyncPerf/SimpleTransactionContext.cpp storePerftools/asyncPerf/TestOptions.cpp storePerftools/asyncPerf/TestResult.cpp - storePerftools/asyncPerf/TransactionAsyncContext.cpp + storePerftools/asyncPerf/TxnPublish.cpp storePerftools/common/Parameters.cpp storePerftools/common/PerftoolError.cpp diff --git a/cpp/src/tests/storePerftools/asyncPerf/AtomicCounter.h b/cpp/src/tests/storePerftools/asyncPerf/AtomicCounter.h deleted file mode 100644 index d354c729dc..0000000000 --- a/cpp/src/tests/storePerftools/asyncPerf/AtomicCounter.h +++ /dev/null @@ -1,90 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/** - * \file AtomicCounter.h - */ - -#ifndef tests_storePerftools_asyncPerf_AtomicCounter_h_ -#define tests_storePerftools_asyncPerf_AtomicCounter_h_ - -#include "qpid/sys/Condition.h" -#include "qpid/sys/Mutex.h" -#include "qpid/sys/Time.h" - -namespace tests { -namespace storePerftools { -namespace asyncPerf { - -template <class T> -class AtomicCounter -{ -public: - AtomicCounter(const T& initValue = T(0)) : - m_cnt(initValue), - m_cntMutex(), - m_cntCondition() - {} - - virtual ~AtomicCounter() - {} - - T& - get() const - { - qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_cntMutex); - return m_cnt; - } - - void - operator++() - { - qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_cntMutex); - ++m_cnt; - } - - void - operator--() - { - qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_cntMutex); - if (--m_cnt == 0) { - m_cntCondition.notify(); - } - } - - void - waitForZero(const qpid::sys::Duration& d) - { - qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_cntMutex); - while (m_cnt != 0) { - m_cntCondition.wait(m_cntMutex, qpid::sys::AbsTime(qpid::sys::AbsTime(), d)); - } - } - -private: - T m_cnt; - mutable qpid::sys::Mutex m_cntMutex; - qpid::sys::Condition m_cntCondition; -}; - -typedef AtomicCounter<uint32_t> AsyncOpCounter; - -}}} // namespace tests::storePerftools::asyncPerf - -#endif // tests_storePerftools_asyncPerf_AtomicCounter_h_ diff --git a/cpp/src/tests/storePerftools/asyncPerf/TransactionAsyncContext.cpp b/cpp/src/tests/storePerftools/asyncPerf/Deliverable.cpp index f150a34027..9da7e348e0 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/TransactionAsyncContext.cpp +++ b/cpp/src/tests/storePerftools/asyncPerf/Deliverable.cpp @@ -18,50 +18,26 @@ */ /** - * \file TransactionAsyncContext.cpp + * \file Deliverable.cpp */ -#include "TransactionAsyncContext.h" - -#include <cassert> +#include "Deliverable.h" namespace tests { namespace storePerftools { namespace asyncPerf { -TransactionAsyncContext::TransactionAsyncContext(boost::shared_ptr<SimpleTransactionContext> tc, - const qpid::asyncStore::AsyncOperation::opCode op): - m_tc(tc), - m_op(op) -{ - assert(m_tc.get() != 0); -} - -TransactionAsyncContext::~TransactionAsyncContext() +Deliverable::Deliverable() : + m_delivered(false) {} -qpid::asyncStore::AsyncOperation::opCode -TransactionAsyncContext::getOpCode() const -{ - return m_op; -} - -const char* -TransactionAsyncContext::getOpStr() const -{ - return qpid::asyncStore::AsyncOperation::getOpStr(m_op); -} - -boost::shared_ptr<SimpleTransactionContext> -TransactionAsyncContext::getTransactionContext() const -{ - return m_tc; -} +Deliverable::~Deliverable() +{} -void -TransactionAsyncContext::destroy() +bool +Deliverable::isDelivered() const { - delete this; + return m_delivered; } }}} // namespace tests::storePerftools::asyncPerf diff --git a/cpp/src/tests/storePerftools/asyncPerf/TransactionAsyncContext.h b/cpp/src/tests/storePerftools/asyncPerf/Deliverable.h index 186a8141cf..57e130eeba 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/TransactionAsyncContext.h +++ b/cpp/src/tests/storePerftools/asyncPerf/Deliverable.h @@ -18,39 +18,37 @@ */ /** - * \file TransactionAsyncContext.h + * \file Deliverable.h */ -#ifndef tests_storePerftools_asyncPerf_TransactionAsyncContext_h_ -#define tests_storePerftools_asyncPerf_TransactionAsyncContext_h_ - -#include "qpid/asyncStore/AsyncOperation.h" -#include "qpid/broker/AsyncStore.h" // qpid::broker::BrokerAsyncContext +#ifndef tests_storePerftools_asyncPerf_Deliverable_h_ +#define tests_storePerftools_asyncPerf_Deliverable_h_ #include <boost/shared_ptr.hpp> +#include <stdint.h> // uint64_t namespace tests { namespace storePerftools { namespace asyncPerf { -class SimpleTransactionContext; +class SimplePersistableMessage; +class SimplePersistableQueue; -class TransactionAsyncContext: public qpid::broker::BrokerAsyncContext +class Deliverable { public: - TransactionAsyncContext(boost::shared_ptr<SimpleTransactionContext> tc, - const qpid::asyncStore::AsyncOperation::opCode op); - virtual ~TransactionAsyncContext(); - qpid::asyncStore::AsyncOperation::opCode getOpCode() const; - const char* getOpStr() const; - boost::shared_ptr<SimpleTransactionContext> getTransactionContext() const; - void destroy(); - -private: - boost::shared_ptr<SimpleTransactionContext> m_tc; - const qpid::asyncStore::AsyncOperation::opCode m_op; + Deliverable(); + virtual ~Deliverable(); + + virtual uint64_t contentSize() = 0; + virtual void deliverTo(const boost::shared_ptr<SimplePersistableQueue>& queue) = 0; + virtual SimplePersistableMessage& getMessage() = 0; + virtual bool isDelivered() const; + +protected: + bool m_delivered; }; }}} // namespace tests::storePerftools::asyncPerf -#endif // tests_storePerftools_asyncPerf_TransactionAsyncContext_h_ +#endif // tests_storePerftools_asyncPerf_Deliverable_h_ diff --git a/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp b/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp index 0a2fd4f333..9b015fc428 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp +++ b/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp @@ -32,8 +32,6 @@ namespace tests { namespace storePerftools { namespace asyncPerf { -class SimpleTransactionContext; - MessageConsumer::MessageConsumer(const TestOptions& perfTestParams, boost::shared_ptr<SimplePersistableQueue> queue) : m_perfTestParams(perfTestParams), @@ -54,7 +52,7 @@ MessageConsumer::runConsumers() ::usleep(1000); // TODO - replace this poller with condition variable } } - return 0; + return reinterpret_cast<void*>(0); } //static diff --git a/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.cpp b/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.cpp index 5530513e12..008ddf33e7 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.cpp +++ b/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.cpp @@ -26,6 +26,10 @@ #include "SimplePersistableMessage.h" #include "SimplePersistableQueue.h" #include "TestOptions.h" +#include "TxnPublish.h" + +#include "qpid/asyncStore/AsyncStoreImpl.h" +#include "qpid/broker/TxnBuffer.h" #include <stdint.h> // uint32_t @@ -33,15 +37,15 @@ namespace tests { namespace storePerftools { namespace asyncPerf { -class SimpleTransactionContext; - MessageProducer::MessageProducer(const TestOptions& perfTestParams, const char* msgData, qpid::asyncStore::AsyncStoreImpl* store, + qpid::broker::AsyncResultQueue& arq, boost::shared_ptr<SimplePersistableQueue> queue) : m_perfTestParams(perfTestParams), m_msgData(msgData), m_store(store), + m_resultQueue(arq), m_queue(queue) {} @@ -51,11 +55,46 @@ MessageProducer::~MessageProducer() void* MessageProducer::runProducers() { + const bool useTxns = m_perfTestParams.m_enqTxnBlockSize > 0U; + uint16_t txnCnt = 0U; + qpid::broker::TxnBuffer* tb = 0; + if (useTxns) { + tb = new qpid::broker::TxnBuffer(m_resultQueue); + } for (uint32_t numMsgs=0; numMsgs<m_perfTestParams.m_numMsgs; ++numMsgs) { boost::intrusive_ptr<SimplePersistableMessage> msg(new SimplePersistableMessage(m_msgData, m_perfTestParams.m_msgSize, m_store)); - m_queue->deliver(msg); + if (useTxns) { + boost::shared_ptr<TxnPublish> op(new TxnPublish(msg)); + op->deliverTo(m_queue); + tb->enlist(op); + if (++txnCnt >= m_perfTestParams.m_enqTxnBlockSize) { + if (m_perfTestParams.m_durable) { + tb->commitLocal(m_store); + + // TxnBuffer instance tb carries async state that precludes it being re-used for the next + // transaction until the current commit cycle completes. So use another instance. This + // instance should auto-delete when the async commit cycle completes. + if (numMsgs<m_perfTestParams.m_numMsgs) { + //tb = boost::shared_ptr<qpid::broker::TxnBuffer>(new qpid::broker::TxnBuffer(m_resultQueue)); + tb = new qpid::broker::TxnBuffer(m_resultQueue); + } + } else { + tb->commit(); + } + txnCnt = 0U; + } + } else { + m_queue->deliver(msg); + } + } + if (txnCnt) { + if (m_perfTestParams.m_durable) { + tb->commitLocal(m_store); + } else { + tb->commit(); + } } - return 0; + return reinterpret_cast<void*>(0); } //static diff --git a/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.h b/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.h index 746247e849..55504164ef 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.h +++ b/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.h @@ -28,9 +28,10 @@ namespace qpid { namespace asyncStore { - class AsyncStoreImpl; - +} +namespace broker { +class AsyncResultQueue; }} namespace tests { @@ -39,6 +40,7 @@ namespace asyncPerf { class SimplePersistableQueue; class TestOptions; +class TxnBuffer; class MessageProducer { @@ -46,6 +48,7 @@ public: MessageProducer(const TestOptions& perfTestParams, const char* msgData, qpid::asyncStore::AsyncStoreImpl* store, + qpid::broker::AsyncResultQueue& arq, boost::shared_ptr<SimplePersistableQueue> queue); virtual ~MessageProducer(); void* runProducers(); @@ -54,6 +57,7 @@ private: const TestOptions& m_perfTestParams; const char* m_msgData; qpid::asyncStore::AsyncStoreImpl* m_store; + qpid::broker::AsyncResultQueue& m_resultQueue; boost::shared_ptr<SimplePersistableQueue> m_queue; }; diff --git a/cpp/src/tests/storePerftools/asyncPerf/PerfTest.cpp b/cpp/src/tests/storePerftools/asyncPerf/PerfTest.cpp index fbfebbac2b..7941e761ca 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/PerfTest.cpp +++ b/cpp/src/tests/storePerftools/asyncPerf/PerfTest.cpp @@ -80,7 +80,7 @@ PerfTest::run() tests::storePerftools::common::ScopedTimer st(m_testResult); for (uint16_t q = 0; q < m_testOpts.m_numQueues; q++) { - boost::shared_ptr<MessageProducer> mp(new MessageProducer(m_testOpts, m_msgData, m_store, m_queueList[q])); + boost::shared_ptr<MessageProducer> mp(new MessageProducer(m_testOpts, m_msgData, m_store, m_resultQueue, m_queueList[q])); m_producers.push_back(mp); for (uint16_t t = 0; t < m_testOpts.m_numEnqThreadsPerQueue; t++) { // TODO - replace with qpid threads boost::shared_ptr<tests::storePerftools::common::Thread> tp(new tests::storePerftools::common::Thread(mp->startProducers, diff --git a/cpp/src/tests/storePerftools/asyncPerf/QueueAsyncContext.cpp b/cpp/src/tests/storePerftools/asyncPerf/QueueAsyncContext.cpp index 4836f9358a..07a80c8a33 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/QueueAsyncContext.cpp +++ b/cpp/src/tests/storePerftools/asyncPerf/QueueAsyncContext.cpp @@ -31,10 +31,12 @@ namespace storePerftools { namespace asyncPerf { QueueAsyncContext::QueueAsyncContext(boost::shared_ptr<SimplePersistableQueue> q, + qpid::broker::TxnHandle& th, const qpid::asyncStore::AsyncOperation::opCode op, qpid::broker::AsyncResultCallback rcb, qpid::broker::AsyncResultQueue* const arq) : m_q(q), + m_th(th), m_op(op), m_rcb(rcb), m_arq(arq) @@ -44,11 +46,13 @@ QueueAsyncContext::QueueAsyncContext(boost::shared_ptr<SimplePersistableQueue> q QueueAsyncContext::QueueAsyncContext(boost::shared_ptr<SimplePersistableQueue> q, boost::intrusive_ptr<SimplePersistableMessage> msg, + qpid::broker::TxnHandle& th, const qpid::asyncStore::AsyncOperation::opCode op, qpid::broker::AsyncResultCallback rcb, qpid::broker::AsyncResultQueue* const arq) : m_q(q), m_msg(msg), + m_th(th), m_op(op), m_rcb(rcb), m_arq(arq) @@ -84,6 +88,12 @@ QueueAsyncContext::getMessage() const return m_msg; } +qpid::broker::TxnHandle +QueueAsyncContext::getTxnHandle() const +{ + return m_th; +} + qpid::broker::AsyncResultQueue* QueueAsyncContext::getAsyncResultQueue() const { @@ -99,7 +109,9 @@ QueueAsyncContext::getAsyncResultCallback() const void QueueAsyncContext::invokeCallback(const qpid::broker::AsyncResultHandle* const arh) const { - m_rcb(arh); + if (m_rcb) { + m_rcb(arh); + } } void diff --git a/cpp/src/tests/storePerftools/asyncPerf/QueueAsyncContext.h b/cpp/src/tests/storePerftools/asyncPerf/QueueAsyncContext.h index df45117fe4..b4d16fe615 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/QueueAsyncContext.h +++ b/cpp/src/tests/storePerftools/asyncPerf/QueueAsyncContext.h @@ -26,6 +26,7 @@ #include "qpid/asyncStore/AsyncOperation.h" #include "qpid/broker/AsyncStore.h" +#include "qpid/broker/TxnHandle.h" #include <boost/intrusive_ptr.hpp> #include <boost/shared_ptr.hpp> @@ -42,11 +43,13 @@ class QueueAsyncContext: public qpid::broker::BrokerAsyncContext { public: QueueAsyncContext(boost::shared_ptr<SimplePersistableQueue> q, + qpid::broker::TxnHandle& th, const qpid::asyncStore::AsyncOperation::opCode op, qpid::broker::AsyncResultCallback rcb, qpid::broker::AsyncResultQueue* const arq); QueueAsyncContext(boost::shared_ptr<SimplePersistableQueue> q, boost::intrusive_ptr<SimplePersistableMessage> msg, + qpid::broker::TxnHandle& th, const qpid::asyncStore::AsyncOperation::opCode op, qpid::broker::AsyncResultCallback rcb, qpid::broker::AsyncResultQueue* const arq); @@ -55,6 +58,7 @@ public: const char* getOpStr() const; boost::shared_ptr<SimplePersistableQueue> getQueue() const; boost::intrusive_ptr<SimplePersistableMessage> getMessage() const; + qpid::broker::TxnHandle getTxnHandle() const; qpid::broker::AsyncResultQueue* getAsyncResultQueue() const; qpid::broker::AsyncResultCallback getAsyncResultCallback() const; void invokeCallback(const qpid::broker::AsyncResultHandle* const arh) const; @@ -63,6 +67,7 @@ public: private: boost::shared_ptr<SimplePersistableQueue> m_q; boost::intrusive_ptr<SimplePersistableMessage> m_msg; + qpid::broker::TxnHandle m_th; const qpid::asyncStore::AsyncOperation::opCode m_op; qpid::broker::AsyncResultCallback m_rcb; qpid::broker::AsyncResultQueue* const m_arq; diff --git a/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.cpp b/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.cpp index dd4de355b3..8ee858587b 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.cpp +++ b/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.cpp @@ -79,4 +79,22 @@ QueuedMessage::enqHandle() return m_enqHandle; } +void +QueuedMessage::prepareEnqueue(qpid::broker::TxnHandle& th) +{ + m_queue->enqueue(th, *this); +} + +void +QueuedMessage::commitEnqueue() +{ + m_queue->process(m_msg); +} + +void +QueuedMessage::abortEnqueue() +{ + m_queue->enqueueAborted(m_msg); +} + }}} // namespace tests::storePerfTools diff --git a/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.h b/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.h index 29e7f792cd..896c53ab5b 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.h +++ b/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.h @@ -28,6 +28,13 @@ #include <boost/intrusive_ptr.hpp> +namespace qpid { +namespace broker { + +class TxnHandle; + +}} + namespace tests { namespace storePerftools { namespace asyncPerf { @@ -48,6 +55,11 @@ public: const qpid::broker::EnqueueHandle& enqHandle() const; qpid::broker::EnqueueHandle& enqHandle(); + // -- Transaction handling --- + void prepareEnqueue(qpid::broker::TxnHandle& th); + void commitEnqueue(); + void abortEnqueue(); + private: SimplePersistableQueue* m_queue; boost::intrusive_ptr<SimplePersistableMessage> m_msg; diff --git a/cpp/src/tests/storePerftools/asyncPerf/SimplePersistableMessage.cpp b/cpp/src/tests/storePerftools/asyncPerf/SimplePersistableMessage.cpp index 58aacabdcd..a9771c1442 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/SimplePersistableMessage.cpp +++ b/cpp/src/tests/storePerftools/asyncPerf/SimplePersistableMessage.cpp @@ -52,6 +52,12 @@ SimplePersistableMessage::getHandle() return m_msgHandle; } +uint64_t +SimplePersistableMessage::contentSize() const +{ + return static_cast<uint64_t>(m_msg.size()); +} + void SimplePersistableMessage::setPersistenceId(uint64_t id) const { diff --git a/cpp/src/tests/storePerftools/asyncPerf/SimplePersistableMessage.h b/cpp/src/tests/storePerftools/asyncPerf/SimplePersistableMessage.h index f6258822ea..7ac54ddea1 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/SimplePersistableMessage.h +++ b/cpp/src/tests/storePerftools/asyncPerf/SimplePersistableMessage.h @@ -51,20 +51,21 @@ public: virtual ~SimplePersistableMessage(); const qpid::broker::MessageHandle& getHandle() const; qpid::broker::MessageHandle& getHandle(); + uint64_t contentSize() const; - // Interface Persistable + // --- Interface Persistable --- virtual void setPersistenceId(uint64_t id) const; virtual uint64_t getPersistenceId() const; virtual void encode(qpid::framing::Buffer& buffer) const; virtual uint32_t encodedSize() const; - // Interface PersistableMessage + // --- Interface PersistableMessage --- virtual void allDequeuesComplete(); virtual uint32_t encodedHeaderSize() const; virtual bool isPersistent() const; - // Interface DataStore - virtual uint64_t getSize(); + // --- Interface DataSource --- + virtual uint64_t getSize(); // <- same as encodedSize()? virtual void write(char* target); private: diff --git a/cpp/src/tests/storePerftools/asyncPerf/SimplePersistableQueue.cpp b/cpp/src/tests/storePerftools/asyncPerf/SimplePersistableQueue.cpp index b34357c144..1a3eae4b43 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/SimplePersistableQueue.cpp +++ b/cpp/src/tests/storePerftools/asyncPerf/SimplePersistableQueue.cpp @@ -25,17 +25,21 @@ #include "MessageDeque.h" #include "SimplePersistableMessage.h" -#include "SimpleTransactionContext.h" #include "QueueAsyncContext.h" #include "QueuedMessage.h" #include "qpid/asyncStore/AsyncStoreImpl.h" #include "qpid/broker/AsyncResultHandle.h" +#include "qpid/broker/TxnHandle.h" namespace tests { namespace storePerftools { namespace asyncPerf { +//static +qpid::broker::TxnHandle SimplePersistableQueue::s_nullTxnHandle; // used for non-txn operations + + SimplePersistableQueue::SimplePersistableQueue(const std::string& name, const qpid::framing::FieldTable& /*args*/, qpid::asyncStore::AsyncStoreImpl* store, @@ -127,6 +131,7 @@ SimplePersistableQueue::asyncCreate() { if (m_store) { boost::shared_ptr<QueueAsyncContext> qac(new QueueAsyncContext(shared_from_this(), + s_nullTxnHandle, qpid::asyncStore::AsyncOperation::QUEUE_CREATE, &handleAsyncResult, &m_resultQueue)); @@ -144,6 +149,7 @@ SimplePersistableQueue::asyncDestroy(const bool deleteQueue) if (m_store) { if (deleteQueue) { boost::shared_ptr<QueueAsyncContext> qac(new QueueAsyncContext(shared_from_this(), + s_nullTxnHandle, qpid::asyncStore::AsyncOperation::QUEUE_DESTROY, &handleAsyncResult, &m_resultQueue)); @@ -159,7 +165,7 @@ void SimplePersistableQueue::deliver(boost::intrusive_ptr<SimplePersistableMessage> msg) { QueuedMessage qm(this, msg); - enqueue((SimpleTransactionContext*)0, qm); + enqueue(s_nullTxnHandle, qm); push(qm); } @@ -168,13 +174,13 @@ SimplePersistableQueue::dispatch() { QueuedMessage qm; if (m_messages->consume(qm)) { - return dequeue((SimpleTransactionContext*)0, qm); + return dequeue(s_nullTxnHandle, qm); } return false; } bool -SimplePersistableQueue::enqueue(SimpleTransactionContext* ctxt, +SimplePersistableQueue::enqueue(qpid::broker::TxnHandle& th, QueuedMessage& qm) { ScopedUse u(m_barrier); @@ -183,13 +189,13 @@ SimplePersistableQueue::enqueue(SimpleTransactionContext* ctxt, } if (qm.payload()->isPersistent() && m_store) { qm.payload()->enqueueAsync(shared_from_this(), m_store); - return asyncEnqueue(ctxt, qm); + return asyncEnqueue(th, qm); } return false; } bool -SimplePersistableQueue::dequeue(SimpleTransactionContext* ctxt, +SimplePersistableQueue::dequeue(qpid::broker::TxnHandle& th, QueuedMessage& qm) { ScopedUse u(m_barrier); @@ -198,12 +204,23 @@ SimplePersistableQueue::dequeue(SimpleTransactionContext* ctxt, } if (qm.payload()->isPersistent() && m_store) { qm.payload()->dequeueAsync(shared_from_this(), m_store); - return asyncDequeue(ctxt, qm); + return asyncDequeue(th, qm); } return true; } void +SimplePersistableQueue::process(boost::intrusive_ptr<SimplePersistableMessage> msg) +{ + QueuedMessage qm(this, msg); + push(qm); +} + +void +SimplePersistableQueue::enqueueAborted(boost::intrusive_ptr<SimplePersistableMessage> /*msg*/) +{} + +void SimplePersistableQueue::encode(qpid::framing::Buffer& buffer) const { buffer.putShortString(m_name); @@ -326,38 +343,46 @@ SimplePersistableQueue::push(QueuedMessage& qm, // private bool -SimplePersistableQueue::asyncEnqueue(SimpleTransactionContext* txn, +SimplePersistableQueue::asyncEnqueue(qpid::broker::TxnHandle& th, QueuedMessage& qm) { qm.payload()->setPersistenceId(m_store->getNextRid()); //std::cout << "QQQ Queue=\"" << m_name << "\": asyncEnqueue() rid=0x" << std::hex << qm.payload()->getPersistenceId() << std::dec << std::endl << std::flush; boost::shared_ptr<QueueAsyncContext> qac(new QueueAsyncContext(shared_from_this(), qm.payload(), + th, qpid::asyncStore::AsyncOperation::MSG_ENQUEUE, &handleAsyncResult, &m_resultQueue)); m_store->submitEnqueue(qm.enqHandle(), - txn->getHandle(), + th, qac); ++m_asyncOpCounter; + if (th.isValid()) { + th.incrOpCnt(); + } return true; } // private bool -SimplePersistableQueue::asyncDequeue(SimpleTransactionContext* txn, +SimplePersistableQueue::asyncDequeue(qpid::broker::TxnHandle& th, QueuedMessage& qm) { //std::cout << "QQQ Queue=\"" << m_name << "\": asyncDequeue() rid=0x" << std::hex << qm.payload()->getPersistenceId() << std::dec << std::endl << std::flush; boost::shared_ptr<QueueAsyncContext> qac(new QueueAsyncContext(shared_from_this(), qm.payload(), + th, qpid::asyncStore::AsyncOperation::MSG_DEQUEUE, &handleAsyncResult, &m_resultQueue)); m_store->submitDequeue(qm.enqHandle(), - txn->getHandle(), + th, qac); ++m_asyncOpCounter; + if (th.isValid()) { + th.incrOpCnt(); + } return true; } @@ -407,6 +432,11 @@ SimplePersistableQueue::enqueueComplete(const boost::shared_ptr<QueueAsyncContex //std::cout << "QQQ Queue name=\"" << qc->getQueue()->getName() << "\": enqueueComplete() rid=0x" << std::hex << qc->getMessage()->getPersistenceId() << std::dec << std::endl << std::flush; assert(qc->getQueue().get() == this); --m_asyncOpCounter; + + qpid::broker::TxnHandle th = qc->getTxnHandle(); + if (th.isValid()) { // transactional enqueue + th.decrOpCnt(); + } } // private @@ -416,6 +446,11 @@ SimplePersistableQueue::dequeueComplete(const boost::shared_ptr<QueueAsyncContex //std::cout << "QQQ Queue name=\"" << qc->getQueue()->getName() << "\": dequeueComplete() rid=0x" << std::hex << qc->getMessage()->getPersistenceId() << std::dec << std::endl << std::flush; assert(qc->getQueue().get() == this); --m_asyncOpCounter; + + qpid::broker::TxnHandle th = qc->getTxnHandle(); + if (th.isValid()) { // transactional enqueue + th.decrOpCnt(); + } } }}} // namespace tests::storePerftools::asyncPerf diff --git a/cpp/src/tests/storePerftools/asyncPerf/SimplePersistableQueue.h b/cpp/src/tests/storePerftools/asyncPerf/SimplePersistableQueue.h index 3bc0972ede..34ef9407ac 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/SimplePersistableQueue.h +++ b/cpp/src/tests/storePerftools/asyncPerf/SimplePersistableQueue.h @@ -24,8 +24,7 @@ #ifndef tests_storePerftools_asyncPerf_SimplePersistableQueue_h_ #define tests_storePerftools_asyncPerf_SimplePersistableQueue_h_ -#include "AtomicCounter.h" // AsyncOpCounter - +#include "qpid/asyncStore/AtomicCounter.h" // AsyncOpCounter #include "qpid/broker/AsyncStore.h" // qpid::broker::DataSource #include "qpid/broker/PersistableQueue.h" #include "qpid/broker/QueueHandle.h" @@ -41,6 +40,7 @@ class AsyncStoreImpl; } namespace broker { class AsyncResultQueue; +class TxnHandle; } namespace framing { class FieldTable; @@ -52,7 +52,6 @@ namespace asyncPerf { class Messages; class SimplePersistableMessage; -class SimpleTransactionContext; class QueueAsyncContext; class QueuedMessage; @@ -78,10 +77,12 @@ public: // --- Methods in msg handling path from qpid::Queue --- void deliver(boost::intrusive_ptr<SimplePersistableMessage> msg); bool dispatch(); // similar to qpid::broker::Queue::distpatch(Consumer&) but without Consumer param - bool enqueue(SimpleTransactionContext* ctxt, + bool enqueue(qpid::broker::TxnHandle& th, QueuedMessage& qm); - bool dequeue(SimpleTransactionContext* ctxt, + bool dequeue(qpid::broker::TxnHandle& th, QueuedMessage& qm); + void process(boost::intrusive_ptr<SimplePersistableMessage> msg); + void enqueueAborted(boost::intrusive_ptr<SimplePersistableMessage> msg); // --- Interface qpid::broker::Persistable --- virtual void encode(qpid::framing::Buffer& buffer) const; @@ -99,10 +100,12 @@ public: virtual void write(char* target); private: + static qpid::broker::TxnHandle s_nullTxnHandle; // used for non-txn operations + const std::string m_name; qpid::asyncStore::AsyncStoreImpl* m_store; qpid::broker::AsyncResultQueue& m_resultQueue; - AsyncOpCounter m_asyncOpCounter; + qpid::asyncStore::AsyncOpCounter m_asyncOpCounter; mutable uint64_t m_persistenceId; std::string m_persistableData; qpid::broker::QueueHandle m_queueHandle; @@ -133,9 +136,9 @@ private: bool isRecovery = false); // -- Async ops --- - bool asyncEnqueue(SimpleTransactionContext* txn, + bool asyncEnqueue(qpid::broker::TxnHandle& th, QueuedMessage& qm); - bool asyncDequeue(SimpleTransactionContext* txn, + bool asyncDequeue(qpid::broker::TxnHandle& th, QueuedMessage& qm); // --- Async op counter --- diff --git a/cpp/src/tests/storePerftools/asyncPerf/SimpleTransactionContext.cpp b/cpp/src/tests/storePerftools/asyncPerf/SimpleTransactionContext.cpp deleted file mode 100644 index 2aea14bc21..0000000000 --- a/cpp/src/tests/storePerftools/asyncPerf/SimpleTransactionContext.cpp +++ /dev/null @@ -1,240 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/** - * \file SimpleTransactionContext.cpp - */ - -#include "SimpleTransactionContext.h" - -#include "TransactionAsyncContext.h" - -#include "qpid/asyncStore/AsyncStoreImpl.h" - -#include <uuid/uuid.h> - -namespace tests { -namespace storePerftools { -namespace asyncPerf { - -SimpleTransactionContext::SimpleTransactionContext(const std::string& xid) : - qpid::broker::TransactionContext(), - m_xid(xid), - m_tpcFlag(!xid.empty()), - m_store(0), - m_txnHandle(0), - m_prepared(false), - m_enqueuedMsgs() -{ - if (!m_tpcFlag) { - setLocalXid(); - } -//std::cout << "*TXN* begin: xid=" << getXid() << "; 2PC=" << (is2pc()?"T":"F") << std::endl; -} - -SimpleTransactionContext::SimpleTransactionContext(qpid::asyncStore::AsyncStoreImpl* store, - const std::string& xid) : - m_store(store), - m_prepared(false), - m_enqueuedMsgs() -{ -//std::cout << "*TXN* begin: xid=" << getXid() << "; 2PC=" << (is2pc()?"T":"F") << std::endl; - if (m_store != 0) { - m_txnHandle = store->createTxnHandle(xid); - } -} - -SimpleTransactionContext::~SimpleTransactionContext() -{} - -// static -/* -void -SimpleTransactionContext::handleAsyncResult(const qpid::broker::AsyncResult* res, - qpid::broker::BrokerAsyncContext* bc) -{ - if (bc && res) { - TransactionAsyncContext* tac = dynamic_cast<TransactionAsyncContext*>(bc); - if (res->errNo) { - // TODO: Handle async failure here - std::cerr << "Transaction xid=\"" << tac->getTransactionContext()->getXid() << "\": Operation " << tac->getOpStr() << ": failure " - << res->errNo << " (" << res->errMsg << ")" << std::endl; - } else { - // Handle async success here - switch(tac->getOpCode()) { - case qpid::asyncStore::AsyncOperation::TXN_PREPARE: - tac->getTransactionContext()->prepareComplete(tac); - break; - case qpid::asyncStore::AsyncOperation::TXN_COMMIT: - tac->getTransactionContext()->commitComplete(tac); - break; - case qpid::asyncStore::AsyncOperation::TXN_ABORT: - tac->getTransactionContext()->abortComplete(tac); - break; - default: - std::ostringstream oss; - oss << "tests::storePerftools::asyncPerf::SimpleTransactionContext::handleAsyncResult(): Unknown async operation: " << tac->getOpCode(); - throw qpid::Exception(oss.str()); - }; - } - } - if (bc) delete bc; - if (res) delete res; -} -*/ - -const qpid::broker::TxnHandle& -SimpleTransactionContext::getHandle() const -{ - return m_txnHandle; -} - -qpid::broker::TxnHandle& -SimpleTransactionContext::getHandle() -{ - return m_txnHandle; -} - -bool -SimpleTransactionContext::is2pc() const -{ - return m_tpcFlag; -} - -const std::string& -SimpleTransactionContext::getXid() const -{ - return m_xid; -} - -void -SimpleTransactionContext::addEnqueuedMsg(QueuedMessage* qm) -{ - qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_enqueuedMsgsMutex); - m_enqueuedMsgs.push_back(qm); -} - -void -SimpleTransactionContext::prepare() -{ - if (m_tpcFlag) { - localPrepare(); - m_prepared = true; - } - std::ostringstream oss; - oss << "SimpleTransactionContext::prepare(): xid=\"" << getXid() - << "\": Transaction Error: called prepare() on local transaction"; - throw qpid::Exception(oss.str()); -} - -void -SimpleTransactionContext::abort() -{ - // TODO: Check the following XA transaction semantics: - // Assuming 2PC aborts can occur without a prepare. Do local prepare if not already prepared. - if (!m_prepared) { - localPrepare(); - } - if (m_store != 0) { -// m_store->submitAbort(m_txnHandle, -// &handleAsyncResult, -// dynamic_cast<qpid::broker::BrokerAsyncContext*>(new TransactionAsyncContext(this, qpid::asyncStore::AsyncOperation::TXN_ABORT))); - } -//std::cout << "*TXN* abort: xid=" << m_txnHandle.getXid() << "; 2PC=" << (m_txnHandle.is2pc()?"T":"F") << std::endl; -} - -void -SimpleTransactionContext::commit() -{ - if (is2pc()) { - if (!m_prepared) { - std::ostringstream oss; - oss << "SimpleTransactionContext::abort(): xid=\"" << getXid() - << "\": Transaction Error: called commit() without prepare() on 2PC transaction"; - throw qpid::Exception(oss.str()); - } - } else { - localPrepare(); - } - if (m_store != 0) { -// m_store->submitCommit(m_txnHandle, -// &handleAsyncResult, -// dynamic_cast<qpid::broker::BrokerAsyncContext*>(new TransactionAsyncContext(this, qpid::asyncStore::AsyncOperation::TXN_COMMIT))); - } -//std::cout << "*TXN* commit: xid=" << m_txnHandle.getXid() << "; 2PC=" << (m_txnHandle.is2pc()?"T":"F") << std::endl; -} - - -// private -void -SimpleTransactionContext::localPrepare() -{ - if (m_store != 0) { -// m_store->submitPrepare(m_txnHandle, -// &handleAsyncResult, -// dynamic_cast<qpid::broker::BrokerAsyncContext*>(new TransactionAsyncContext(this, qpid::asyncStore::AsyncOperation::TXN_PREPARE))); - } -//std::cout << "*TXN* localPrepare: xid=" << m_txnHandle.getXid() << "; 2PC=" << (m_txnHandle.is2pc()?"T":"F") << std::endl; -} - -// private -void -SimpleTransactionContext::setLocalXid() -{ - uuid_t uuid; - // TODO: Valgrind warning: Possible race condition in uuid_generate_random() - is it thread-safe, and if not, does it matter? - // If this race condition affects the randomness of the UUID, then there could be a problem here. - ::uuid_generate_random(uuid); - char uuidStr[37]; // 36-char uuid + trailing '\0' - ::uuid_unparse(uuid, uuidStr); - m_xid.assign(uuidStr); -} - -// private -void -SimpleTransactionContext::prepareComplete(const TransactionAsyncContext* /*tc*/) -{ - qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_enqueuedMsgsMutex); -// while (!m_enqueuedMsgs.empty()) { -// m_enqueuedMsgs.front()->clearTransaction(); -// m_enqueuedMsgs.pop_front(); -// } -//std::cout << "~~~~~ Transaction xid=\"" << tc->m_tc->getXid() << "\": prepareComplete()" << std::endl << std::flush; -// assert(tc->getTransactionContext().get() == this); -} - - -// private -void -SimpleTransactionContext::abortComplete(const TransactionAsyncContext* tc) -{ -//std::cout << "~~~~~ Transaction xid=\"" << tc->m_tc->getXid() << "\": abortComplete()" << std::endl << std::flush; - assert(tc->getTransactionContext().get() == this); -} - - -// private -void -SimpleTransactionContext::commitComplete(const TransactionAsyncContext* tc) -{ -//std::cout << "~~~~~ Transaction xid=\"" << tc->m_tc->getXid() << "\": commitComplete()" << std::endl << std::flush; - assert(tc->getTransactionContext().get() == this); -} - -}}} // namespace tests::storePerftools::asyncPerf diff --git a/cpp/src/tests/storePerftools/asyncPerf/SimpleTransactionContext.h b/cpp/src/tests/storePerftools/asyncPerf/SimpleTransactionContext.h deleted file mode 100644 index 49fee9c720..0000000000 --- a/cpp/src/tests/storePerftools/asyncPerf/SimpleTransactionContext.h +++ /dev/null @@ -1,90 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/** - * \file SimpleTransactionContext.h - */ - -#ifndef tests_storePerftools_asyncPerf_SimpleTransactionContext_h_ -#define tests_storePerftools_asyncPerf_SimpleTransactionContext_h_ - -#include "qpid/broker/TransactionalStore.h" // qpid::broker::TransactionContext -#include "qpid/broker/TxnHandle.h" -#include "qpid/sys/Mutex.h" - -#include <deque> - -namespace qpid { -namespace asyncStore { -class AsyncStoreImpl; -} -namespace broker { -class AsyncResult; -class BrokerAsyncContext; -}} - -namespace tests { -namespace storePerftools { -namespace asyncPerf { - -class QueuedMessage; -class TransactionAsyncContext; - -class SimpleTransactionContext : public qpid::broker::TransactionContext -{ -public: - SimpleTransactionContext(const std::string& xid = std::string()); - SimpleTransactionContext(qpid::asyncStore::AsyncStoreImpl* store, - const std::string& xid = std::string()); - virtual ~SimpleTransactionContext(); -// static void handleAsyncResult(const qpid::broker::AsyncResult* res, -// qpid::broker::BrokerAsyncContext* bc); - - const qpid::broker::TxnHandle& getHandle() const; - qpid::broker::TxnHandle& getHandle(); - bool is2pc() const; - const std::string& getXid() const; - void addEnqueuedMsg(QueuedMessage* qm); - - void prepare(); - void abort(); - void commit(); - -private: - std::string m_xid; - bool m_tpcFlag; - qpid::asyncStore::AsyncStoreImpl* m_store; - qpid::broker::TxnHandle m_txnHandle; - bool m_prepared; - std::deque<QueuedMessage*> m_enqueuedMsgs; - qpid::sys::Mutex m_enqueuedMsgsMutex; - - void localPrepare(); - void setLocalXid(); - - // --- Ascnc op completions (called through handleAsyncResult) --- - void prepareComplete(const TransactionAsyncContext* tc); - void abortComplete(const TransactionAsyncContext* tc); - void commitComplete(const TransactionAsyncContext* tc); - -}; - -}}} // namespace tests:storePerftools::asyncPerf - -#endif // tests_storePerftools_asyncPerf_SimpleTransactionContext_h_ diff --git a/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.cpp b/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.cpp new file mode 100644 index 0000000000..eff7646de6 --- /dev/null +++ b/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.cpp @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file TxnPublish.cpp + */ + +#include "SimplePersistableMessage.h" +#include "SimplePersistableQueue.h" // debug msg +#include "TxnPublish.h" + +#include "QueuedMessage.h" + +namespace tests { +namespace storePerftools { +namespace asyncPerf { + +TxnPublish::TxnPublish(boost::intrusive_ptr<SimplePersistableMessage> msg) : + m_msg(msg) +{ +//std::cout << "TTT new TxnPublish" << std::endl << std::flush; +} + +TxnPublish::~TxnPublish() +{} + +bool +TxnPublish::prepare(qpid::broker::TxnHandle& th) throw() +{ +//std::cout << "TTT TxnPublish::prepare: " << m_queues.size() << " queues" << std::endl << std::flush; + try{ + while (!m_queues.empty()) { + m_queues.front()->prepareEnqueue(th); + m_prepared.push_back(m_queues.front()); + m_queues.pop_front(); + } + return true; + } catch (const std::exception& e) { + std::cerr << "Failed to prepare transaction: " << e.what() << std::endl; + } catch (...) { + std::cerr << "Failed to prepare transaction: (unknown error)" << std::endl; + } + return false; +} + +void +TxnPublish::commit() throw() +{ +//std::cout << "TTT TxnPublish::commit" << std::endl << std::flush; + try { + for (std::list<boost::shared_ptr<QueuedMessage> >::iterator i = m_prepared.begin(); i != m_prepared.end(); ++i) { + (*i)->commitEnqueue(); + } + } catch (const std::exception& e) { + std::cerr << "Failed to commit transaction: " << e.what() << std::endl; + } catch (...) { + std::cerr << "Failed to commit transaction: (unknown error)" << std::endl; + } +} + +void +TxnPublish::rollback() throw() +{ +//std::cout << "TTT TxnPublish::rollback" << std::endl << std::flush; + try { + for (std::list<boost::shared_ptr<QueuedMessage> >::iterator i = m_prepared.begin(); i != m_prepared.end(); ++i) { + (*i)->abortEnqueue(); + } + } catch (const std::exception& e) { + std::cerr << "Failed to rollback transaction: " << e.what() << std::endl; + } catch (...) { + std::cerr << "Failed to rollback transaction: (unknown error)" << std::endl; + } +} + +uint64_t +TxnPublish::contentSize() +{ + return m_msg->contentSize(); +} + +void +TxnPublish::deliverTo(const boost::shared_ptr<SimplePersistableQueue>& queue) +{ +//std::cout << "TTT TxnPublish::deliverTo queue=\"" << queue->getName() << "\"" << std::endl << std::flush; + boost::shared_ptr<QueuedMessage> qm(new QueuedMessage(queue.get(), m_msg)); + m_queues.push_back(qm); + m_delivered = true; +} + +SimplePersistableMessage& +TxnPublish::getMessage() +{ + return *m_msg; +} + +}}} // namespace tests::storePerftools::asyncPerf diff --git a/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.h b/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.h new file mode 100644 index 0000000000..6e97e99349 --- /dev/null +++ b/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.h @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file TxnPublish.h + */ + +#ifndef tests_storePerftools_asyncPerf_TxnPublish_h_ +#define tests_storePerftools_asyncPerf_TxnPublish_h_ + +#include "Deliverable.h" + +#include "qpid/broker/TxnOp.h" + +#include <boost/intrusive_ptr.hpp> +#include <boost/shared_ptr.hpp> +#include <list> + +namespace qpid { +namespace broker { + +class TransactionContext; + +}} + +namespace tests { +namespace storePerftools { +namespace asyncPerf { + +class QueuedMessage; +class SimplePersistableMessage; +class SimplePersistableQueue; + +class TxnPublish : public qpid::broker::TxnOp, + public Deliverable +{ +public: + TxnPublish(boost::intrusive_ptr<SimplePersistableMessage> msg); + virtual ~TxnPublish(); + + // --- Interface TxOp --- + bool prepare(qpid::broker::TxnHandle& th) throw(); + void commit() throw(); + void rollback() throw(); + + // --- Interface Deliverable --- + uint64_t contentSize(); + void deliverTo(const boost::shared_ptr<SimplePersistableQueue>& queue); + SimplePersistableMessage& getMessage(); + +private: + boost::intrusive_ptr<SimplePersistableMessage> m_msg; + std::list<boost::shared_ptr<QueuedMessage> > m_queues; + std::list<boost::shared_ptr<QueuedMessage> > m_prepared; +}; + +}}} // namespace tests::storePerftools::asyncPerf + +#endif // tests_storePerftools_asyncPerf_TxnPublish_h_ |