From 63c6598f401ac6406e5a31c602c7892b798536fc Mon Sep 17 00:00:00 2001 From: Kim van der Riet Date: Tue, 31 Jul 2012 13:35:53 +0000 Subject: QPID-3858: WIP: Durable transactions fixed git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1367535 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp | 8 +- cpp/src/qpid/asyncStore/AsyncStoreImpl.h | 4 +- cpp/src/qpid/asyncStore/TxnHandleImpl.cpp | 71 +-------- cpp/src/qpid/asyncStore/TxnHandleImpl.h | 14 +- cpp/src/qpid/broker/AsyncStore.h | 10 +- cpp/src/qpid/broker/ConfigHandle.cpp | 2 + cpp/src/qpid/broker/ConfigHandle.h | 2 +- cpp/src/qpid/broker/EnqueueHandle.cpp | 2 + cpp/src/qpid/broker/EnqueueHandle.h | 2 +- cpp/src/qpid/broker/EventHandle.h | 2 +- cpp/src/qpid/broker/MessageHandle.cpp | 2 + cpp/src/qpid/broker/MessageHandle.h | 2 +- cpp/src/qpid/broker/QueueAsyncContext.cpp | 32 +++- cpp/src/qpid/broker/QueueAsyncContext.h | 15 +- cpp/src/qpid/broker/QueueHandle.cpp | 1 + cpp/src/qpid/broker/QueueHandle.h | 2 +- cpp/src/qpid/broker/TxnBuffer.cpp | 147 +++++++++++++----- cpp/src/qpid/broker/TxnBuffer.h | 21 ++- cpp/src/qpid/broker/TxnHandle.cpp | 23 --- cpp/src/qpid/broker/TxnHandle.h | 7 +- cpp/src/qpid/broker/TxnOp.h | 6 +- .../storePerftools/asyncPerf/DeliveryRecord.cpp | 4 +- .../storePerftools/asyncPerf/DeliveryRecord.h | 4 +- .../storePerftools/asyncPerf/MessageConsumer.cpp | 37 ++--- .../storePerftools/asyncPerf/MessageProducer.cpp | 28 ++-- .../storePerftools/asyncPerf/MessageProducer.h | 2 +- .../storePerftools/asyncPerf/QueuedMessage.cpp | 4 +- .../tests/storePerftools/asyncPerf/QueuedMessage.h | 2 +- .../tests/storePerftools/asyncPerf/SimpleQueue.cpp | 172 +++++++++------------ .../tests/storePerftools/asyncPerf/SimpleQueue.h | 9 +- .../tests/storePerftools/asyncPerf/TxnAccept.cpp | 15 +- cpp/src/tests/storePerftools/asyncPerf/TxnAccept.h | 6 +- .../tests/storePerftools/asyncPerf/TxnPublish.cpp | 25 ++- .../tests/storePerftools/asyncPerf/TxnPublish.h | 2 +- 34 files changed, 329 insertions(+), 356 deletions(-) diff --git a/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp b/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp index 4aeab4c7bf..aa66e7adb8 100644 --- a/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp +++ b/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp @@ -81,16 +81,18 @@ AsyncStoreImpl::createTxnHandle(qpid::broker::TxnBuffer* tb) } qpid::broker::TxnHandle -AsyncStoreImpl::createTxnHandle(const std::string& xid) +AsyncStoreImpl::createTxnHandle(const std::string& xid, + const bool tpcFlag) { - return qpid::broker::TxnHandle(new TxnHandleImpl(xid)); + return qpid::broker::TxnHandle(new TxnHandleImpl(xid, tpcFlag)); } qpid::broker::TxnHandle AsyncStoreImpl::createTxnHandle(const std::string& xid, + const bool tpcFlag, qpid::broker::TxnBuffer* tb) { - return qpid::broker::TxnHandle(new TxnHandleImpl(xid, tb)); + return qpid::broker::TxnHandle(new TxnHandleImpl(xid, tpcFlag, tb)); } void diff --git a/cpp/src/qpid/asyncStore/AsyncStoreImpl.h b/cpp/src/qpid/asyncStore/AsyncStoreImpl.h index 3e29039aea..eb3f090ad7 100644 --- a/cpp/src/qpid/asyncStore/AsyncStoreImpl.h +++ b/cpp/src/qpid/asyncStore/AsyncStoreImpl.h @@ -60,8 +60,10 @@ public: qpid::broker::TxnHandle createTxnHandle(); qpid::broker::TxnHandle createTxnHandle(qpid::broker::TxnBuffer* tb); - qpid::broker::TxnHandle createTxnHandle(const std::string& xid); qpid::broker::TxnHandle createTxnHandle(const std::string& xid, + const bool tpcFlag); + qpid::broker::TxnHandle createTxnHandle(const std::string& xid, + const bool tpcFlag, qpid::broker::TxnBuffer* tb); void submitPrepare(qpid::broker::TxnHandle& txnHandle, diff --git a/cpp/src/qpid/asyncStore/TxnHandleImpl.cpp b/cpp/src/qpid/asyncStore/TxnHandleImpl.cpp index 2b343e9517..dd644b29bd 100644 --- a/cpp/src/qpid/asyncStore/TxnHandleImpl.cpp +++ b/cpp/src/qpid/asyncStore/TxnHandleImpl.cpp @@ -23,53 +23,32 @@ #include "TxnHandleImpl.h" -#include "qpid/Exception.h" -#include "qpid/broker/TxnBuffer.h" -#include "qpid/log/Statement.h" - -#include - namespace qpid { namespace asyncStore { TxnHandleImpl::TxnHandleImpl() : m_tpcFlag(false), - m_asyncOpCnt(0UL), m_txnBuffer(0) -{ - createLocalXid(); -} +{} TxnHandleImpl::TxnHandleImpl(qpid::broker::TxnBuffer* tb) : m_tpcFlag(false), - m_asyncOpCnt(0UL), m_txnBuffer(tb) -{ - createLocalXid(); -} +{} -TxnHandleImpl::TxnHandleImpl(const std::string& xid) : +TxnHandleImpl::TxnHandleImpl(const std::string& xid, const bool tpcFlag) : m_xid(xid), - m_tpcFlag(!xid.empty()), - m_asyncOpCnt(0UL), + m_tpcFlag(tpcFlag), m_txnBuffer(0) -{ - if (m_xid.empty()) { - createLocalXid(); - } -} +{} TxnHandleImpl::TxnHandleImpl(const std::string& xid, + const bool tpcFlag, qpid::broker::TxnBuffer* tb) : m_xid(xid), - m_tpcFlag(!xid.empty()), - m_asyncOpCnt(0UL), + m_tpcFlag(tpcFlag), m_txnBuffer(tb) -{ - if (m_xid.empty()) { - createLocalXid(); - } -} +{} TxnHandleImpl::~TxnHandleImpl() {} @@ -86,38 +65,4 @@ TxnHandleImpl::is2pc() const return m_tpcFlag; } -void -TxnHandleImpl::incrOpCnt() -{ - qpid::sys::ScopedLock l(m_asyncOpCntMutex); - ++m_asyncOpCnt; -} - -void -TxnHandleImpl::decrOpCnt() -{ - qpid::sys::ScopedLock l(m_asyncOpCntMutex); - if (m_asyncOpCnt == 0UL) { - throw qpid::Exception("Transaction async operation count underflow"); - } - if (--m_asyncOpCnt == 0UL && m_txnBuffer) { - m_txnBuffer->asyncLocalCommit(); - } -} - -// private -void -TxnHandleImpl::createLocalXid() -{ - uuid_t uuid; - - // TODO: This call might not be thread safe - Valgrind's helgrind tool emits warnings for this: - ::uuid_generate_random(uuid); - - char uuidStr[37]; // 36-char uuid + trailing '\0' - ::uuid_unparse(uuid, uuidStr); - m_xid.assign(uuidStr); - QPID_LOG(debug, "Local XID created: \"" << m_xid << "\""); -} - }} // namespace qpid::asyncStore diff --git a/cpp/src/qpid/asyncStore/TxnHandleImpl.h b/cpp/src/qpid/asyncStore/TxnHandleImpl.h index 9452044d66..e1f8afff3e 100644 --- a/cpp/src/qpid/asyncStore/TxnHandleImpl.h +++ b/cpp/src/qpid/asyncStore/TxnHandleImpl.h @@ -43,26 +43,16 @@ class TxnHandleImpl : public virtual qpid::RefCounted public: TxnHandleImpl(); TxnHandleImpl(qpid::broker::TxnBuffer* tb); - TxnHandleImpl(const std::string& xid); - TxnHandleImpl(const std::string& xid, qpid::broker::TxnBuffer* tb); + TxnHandleImpl(const std::string& xid, const bool tpcFlag); + TxnHandleImpl(const std::string& xid, const bool tpcFlag, qpid::broker::TxnBuffer* tb); virtual ~TxnHandleImpl(); const std::string& getXid() const; bool is2pc() const; - void submitPrepare(); - void submitCommit(); - void submitAbort(); - - void incrOpCnt(); - void decrOpCnt(); private: std::string m_xid; bool m_tpcFlag; - uint32_t m_asyncOpCnt; - qpid::sys::Mutex m_asyncOpCntMutex; qpid::broker::TxnBuffer* const m_txnBuffer; - - void createLocalXid(); }; }} // namespace qpid::asyncStore diff --git a/cpp/src/qpid/broker/AsyncStore.h b/cpp/src/qpid/broker/AsyncStore.h index 5fb2e0a1eb..6f1c02e059 100644 --- a/cpp/src/qpid/broker/AsyncStore.h +++ b/cpp/src/qpid/broker/AsyncStore.h @@ -36,7 +36,6 @@ class AsyncResultHandle; class AsyncResultQueue { public: virtual ~AsyncResultQueue() {} - // TODO: Remove boost::shared_ptr<> from this interface virtual void submit(boost::shared_ptr) = 0; }; @@ -79,11 +78,12 @@ public: virtual TxnHandle createTxnHandle() = 0; virtual TxnHandle createTxnHandle(TxnBuffer* tb) = 0; - virtual TxnHandle createTxnHandle(const std::string& xid) = 0; virtual TxnHandle createTxnHandle(const std::string& xid, - TxnBuffer* tb) = 0; + const bool tpcFlag) = 0; + virtual TxnHandle createTxnHandle(const std::string& xid, + const bool tpcFlag, + TxnBuffer* tb) = 0; - // TODO: Remove boost::shared_ptr from this interface virtual void submitPrepare(TxnHandle&, boost::shared_ptr) = 0; // Distributed txns only virtual void submitCommit(TxnHandle&, @@ -112,8 +112,6 @@ public: // --- Store async interface --- - // TODO: Remove boost::shared_ptr from this interface - // TODO: Switch from BrokerAsyncContext (parent class) to ConfigAsyncContext // when theses features (and async context classes) are developed. virtual void submitCreate(ConfigHandle&, diff --git a/cpp/src/qpid/broker/ConfigHandle.cpp b/cpp/src/qpid/broker/ConfigHandle.cpp index 0bd65543ae..6ac8ce6ace 100644 --- a/cpp/src/qpid/broker/ConfigHandle.cpp +++ b/cpp/src/qpid/broker/ConfigHandle.cpp @@ -55,4 +55,6 @@ ConfigHandle::operator=(const ConfigHandle& r) return PrivateImpl::assign(*this, r); } +// --- ConfigHandleImpl methods --- + }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/ConfigHandle.h b/cpp/src/qpid/broker/ConfigHandle.h index 6bcb3d8ce0..3010018421 100644 --- a/cpp/src/qpid/broker/ConfigHandle.h +++ b/cpp/src/qpid/broker/ConfigHandle.h @@ -43,7 +43,7 @@ public: ~ConfigHandle(); ConfigHandle& operator=(const ConfigHandle& r); - // ConfigHandleImpl methods + // --- ConfigHandleImpl methods --- // private: diff --git a/cpp/src/qpid/broker/EnqueueHandle.cpp b/cpp/src/qpid/broker/EnqueueHandle.cpp index 877eb680a6..aff8673524 100644 --- a/cpp/src/qpid/broker/EnqueueHandle.cpp +++ b/cpp/src/qpid/broker/EnqueueHandle.cpp @@ -55,4 +55,6 @@ EnqueueHandle::operator=(const EnqueueHandle& r) return PrivateImpl::assign(*this, r); } +// --- EnqueueHandleImpl methods --- + }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/EnqueueHandle.h b/cpp/src/qpid/broker/EnqueueHandle.h index f869a755b1..63872bb3b3 100644 --- a/cpp/src/qpid/broker/EnqueueHandle.h +++ b/cpp/src/qpid/broker/EnqueueHandle.h @@ -43,7 +43,7 @@ public: ~EnqueueHandle(); EnqueueHandle& operator=(const EnqueueHandle& r); - // EnqueueHandleImpl methods + // --- EnqueueHandleImpl methods --- // private: diff --git a/cpp/src/qpid/broker/EventHandle.h b/cpp/src/qpid/broker/EventHandle.h index 31f0e22dbf..d73cf1e689 100644 --- a/cpp/src/qpid/broker/EventHandle.h +++ b/cpp/src/qpid/broker/EventHandle.h @@ -45,7 +45,7 @@ public: ~EventHandle(); EventHandle& operator=(const EventHandle& r); - // EventHandleImpl methods + // --- EventHandleImpl methods --- const std::string& getKey() const; private: diff --git a/cpp/src/qpid/broker/MessageHandle.cpp b/cpp/src/qpid/broker/MessageHandle.cpp index 2727e74edc..0a9ff50509 100644 --- a/cpp/src/qpid/broker/MessageHandle.cpp +++ b/cpp/src/qpid/broker/MessageHandle.cpp @@ -55,4 +55,6 @@ MessageHandle::operator=(const MessageHandle& r) return PrivateImpl::assign(*this, r); } +// --- MessageHandleImpl methods --- + }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/MessageHandle.h b/cpp/src/qpid/broker/MessageHandle.h index e0a68a8878..ab83dd00a0 100644 --- a/cpp/src/qpid/broker/MessageHandle.h +++ b/cpp/src/qpid/broker/MessageHandle.h @@ -43,7 +43,7 @@ public: ~MessageHandle(); MessageHandle& operator=(const MessageHandle& r); - // MessageHandleImpl methods + // --- MessageHandleImpl methods --- // private: diff --git a/cpp/src/qpid/broker/QueueAsyncContext.cpp b/cpp/src/qpid/broker/QueueAsyncContext.cpp index 54a10c9c0e..4bd2d271eb 100644 --- a/cpp/src/qpid/broker/QueueAsyncContext.cpp +++ b/cpp/src/qpid/broker/QueueAsyncContext.cpp @@ -29,13 +29,30 @@ namespace qpid { namespace broker { +QueueAsyncContext::QueueAsyncContext(boost::shared_ptr q, + AsyncResultCallback rcb, + AsyncResultQueue* const arq) : + m_q(q), + m_rcb(rcb), + m_arq(arq) +{} + +QueueAsyncContext::QueueAsyncContext(boost::shared_ptr q, + boost::intrusive_ptr msg, + AsyncResultCallback rcb, + AsyncResultQueue* const arq) : + m_q(q), + m_msg(msg), + m_rcb(rcb), + m_arq(arq) +{} QueueAsyncContext::QueueAsyncContext(boost::shared_ptr q, - TxnHandle& th, + TxnBuffer* tb, AsyncResultCallback rcb, AsyncResultQueue* const arq) : m_q(q), - m_th(th), + m_tb(tb), m_rcb(rcb), m_arq(arq) { @@ -44,12 +61,12 @@ QueueAsyncContext::QueueAsyncContext(boost::shared_ptr q, QueueAsyncContext::QueueAsyncContext(boost::shared_ptr q, boost::intrusive_ptr msg, - TxnHandle& th, + TxnBuffer* tb, AsyncResultCallback rcb, AsyncResultQueue* const arq) : m_q(q), m_msg(msg), - m_th(th), + m_tb(tb), m_rcb(rcb), m_arq(arq) { @@ -72,10 +89,9 @@ QueueAsyncContext::getMessage() const return m_msg; } -TxnHandle -QueueAsyncContext::getTxnHandle() const -{ - return m_th; +TxnBuffer* +QueueAsyncContext::getTxnBuffer() const { + return m_tb; } AsyncResultQueue* diff --git a/cpp/src/qpid/broker/QueueAsyncContext.h b/cpp/src/qpid/broker/QueueAsyncContext.h index 34fd63fd06..e9ba2ebbac 100644 --- a/cpp/src/qpid/broker/QueueAsyncContext.h +++ b/cpp/src/qpid/broker/QueueAsyncContext.h @@ -45,18 +45,25 @@ class QueueAsyncContext: public BrokerAsyncContext { public: QueueAsyncContext(boost::shared_ptr q, - TxnHandle& th, AsyncResultCallback rcb, AsyncResultQueue* const arq); QueueAsyncContext(boost::shared_ptr q, boost::intrusive_ptr msg, - TxnHandle& th, + AsyncResultCallback rcb, + AsyncResultQueue* const arq); + QueueAsyncContext(boost::shared_ptr q, + TxnBuffer* tb, + AsyncResultCallback rcb, + AsyncResultQueue* const arq); + QueueAsyncContext(boost::shared_ptr q, + boost::intrusive_ptr msg, + TxnBuffer* tb, AsyncResultCallback rcb, AsyncResultQueue* const arq); virtual ~QueueAsyncContext(); boost::shared_ptr getQueue() const; boost::intrusive_ptr getMessage() const; - TxnHandle getTxnHandle() const; + TxnBuffer* getTxnBuffer() const; AsyncResultQueue* getAsyncResultQueue() const; AsyncResultCallback getAsyncResultCallback() const; void invokeCallback(const AsyncResultHandle* const arh) const; @@ -65,7 +72,7 @@ public: private: boost::shared_ptr m_q; boost::intrusive_ptr m_msg; - TxnHandle m_th; // TODO: get rid of this when tests::storePerftools::asyncPerf::SimpleQueue has solved its TxnHandle issues. + TxnBuffer* m_tb; AsyncResultCallback m_rcb; AsyncResultQueue* const m_arq; }; diff --git a/cpp/src/qpid/broker/QueueHandle.cpp b/cpp/src/qpid/broker/QueueHandle.cpp index 5a5678df5a..dffb262a3b 100644 --- a/cpp/src/qpid/broker/QueueHandle.cpp +++ b/cpp/src/qpid/broker/QueueHandle.cpp @@ -56,6 +56,7 @@ QueueHandle::operator=(const QueueHandle& r) } // --- QueueHandleImpl methods --- + const std::string& QueueHandle::getName() const { diff --git a/cpp/src/qpid/broker/QueueHandle.h b/cpp/src/qpid/broker/QueueHandle.h index 234c5e15e8..1110367418 100644 --- a/cpp/src/qpid/broker/QueueHandle.h +++ b/cpp/src/qpid/broker/QueueHandle.h @@ -45,7 +45,7 @@ public: ~QueueHandle(); QueueHandle& operator=(const QueueHandle& r); - // QueueHandleImpl methods + // --- QueueHandleImpl methods --- const std::string& getName() const; private: diff --git a/cpp/src/qpid/broker/TxnBuffer.cpp b/cpp/src/qpid/broker/TxnBuffer.cpp index f85301e036..4d6e7b7918 100644 --- a/cpp/src/qpid/broker/TxnBuffer.cpp +++ b/cpp/src/qpid/broker/TxnBuffer.cpp @@ -29,31 +29,86 @@ #include "qpid/log/Statement.h" +#include + namespace qpid { namespace broker { +qpid::sys::Mutex TxnBuffer::s_uuidMutex; + TxnBuffer::TxnBuffer(AsyncResultQueue& arq) : m_store(0), m_resultQueue(arq), + m_tpcFlag(false), + m_submitOpCnt(0), + m_completeOpCnt(0), m_state(NONE) -{} +{ + createLocalXid(); +} -TxnBuffer::~TxnBuffer() -{} +TxnBuffer::TxnBuffer(AsyncResultQueue& arq, std::string& xid) : + m_store(0), + m_resultQueue(arq), + m_xid(xid), + m_tpcFlag(!xid.empty()), + m_submitOpCnt(0), + m_completeOpCnt(0), + m_state(NONE) +{ + if (m_xid.empty()) { + createLocalXid(); + } +} + +TxnBuffer::~TxnBuffer() {} + +TxnHandle& +TxnBuffer::getTxnHandle() { + return m_txnHandle; +} + +const std::string& +TxnBuffer::getXid() const { + return m_xid; +} + +bool +TxnBuffer::is2pc() const { + return m_tpcFlag; +} void -TxnBuffer::enlist(boost::shared_ptr op) -{ +TxnBuffer::incrOpCnt() { + qpid::sys::ScopedLock l(m_submitOpCntMutex); + ++m_submitOpCnt; +} + +void +TxnBuffer::decrOpCnt() { + const uint32_t numOps = getNumOps(); + qpid::sys::ScopedLock l2(m_completeOpCntMutex); + qpid::sys::ScopedLock l3(m_submitOpCntMutex); + if (m_completeOpCnt == m_submitOpCnt) { + throw qpid::Exception("Transaction async operation count underflow"); + } + ++m_completeOpCnt; + if (numOps == m_submitOpCnt && numOps == m_completeOpCnt) { + asyncLocalCommit(); + } +} + +void +TxnBuffer::enlist(boost::shared_ptr op) { qpid::sys::ScopedLock l(m_opsMutex); m_ops.push_back(op); } bool -TxnBuffer::prepare(TxnHandle& th) -{ +TxnBuffer::prepare() { qpid::sys::ScopedLock l(m_opsMutex); for(std::vector >::iterator i = m_ops.begin(); i != m_ops.end(); ++i) { - if (!(*i)->prepare(th)) { + if (!(*i)->prepare(this)) { return false; } } @@ -61,8 +116,7 @@ TxnBuffer::prepare(TxnHandle& th) } void -TxnBuffer::commit() -{ +TxnBuffer::commit() { qpid::sys::ScopedLock l(m_opsMutex); for(std::vector >::iterator i = m_ops.begin(); i != m_ops.end(); ++i) { (*i)->commit(); @@ -71,8 +125,7 @@ TxnBuffer::commit() } void -TxnBuffer::rollback() -{ +TxnBuffer::rollback() { qpid::sys::ScopedLock l(m_opsMutex); for(std::vector >::iterator i = m_ops.begin(); i != m_ops.end(); ++i) { (*i)->rollback(); @@ -81,45 +134,44 @@ TxnBuffer::rollback() } bool -TxnBuffer::commitLocal(AsyncTransactionalStore* const store) -{ - if (store) { - try { - m_store = store; - asyncLocalCommit(); - } catch (std::exception& e) { - QPID_LOG(error, "TxnBuffer::commitLocal: Commit failed: " << e.what()); - } catch (...) { - QPID_LOG(error, "TxnBuffer::commitLocal: Commit failed (unknown exception)"); - } +TxnBuffer::commitLocal(AsyncTransactionalStore* const store) { + try { + m_store = store; + asyncLocalCommit(); + } catch (std::exception& e) { + QPID_LOG(error, "TxnBuffer::commitLocal: Commit failed: " << e.what()); + } catch (...) { + QPID_LOG(error, "TxnBuffer::commitLocal: Commit failed (unknown exception)"); } return false; } void -TxnBuffer::asyncLocalCommit() -{ - assert(m_store != 0); +TxnBuffer::asyncLocalCommit() { switch(m_state) { case NONE: m_state = PREPARE; - m_txnHandle = m_store->createTxnHandle(this); - prepare(m_txnHandle); - break; + if (m_store) { + m_txnHandle = m_store->createTxnHandle(this); + } + prepare(/*shared_from_this()*/); + if (m_store) { + break; + } case PREPARE: m_state = COMMIT; - { + if (m_store) { boost::shared_ptr tac(new TxnAsyncContext(this, &handleAsyncCommitResult, &m_resultQueue)); m_store->testOp(); m_store->submitCommit(m_txnHandle, tac); + break; } - break; case COMMIT: commit(); m_state = COMPLETE; - delete this; // TODO: ugly! Find a better way to handle the life cycle of this class + delete this; break; case COMPLETE: default: ; @@ -142,8 +194,7 @@ TxnBuffer::handleAsyncCommitResult(const AsyncResultHandle* const arh) { } void -TxnBuffer::asyncLocalAbort() -{ +TxnBuffer::asyncLocalAbort() { assert(m_store != 0); switch (m_state) { case NONE: @@ -160,7 +211,7 @@ TxnBuffer::asyncLocalAbort() case ROLLBACK: rollback(); m_state = COMPLETE; - delete this; // TODO: ugly! Find a better way to handle the life cycle of this class + delete this; default: ; } } @@ -171,11 +222,33 @@ TxnBuffer::handleAsyncAbortResult(const AsyncResultHandle* const arh) { if (arh) { boost::shared_ptr tac = boost::dynamic_pointer_cast(arh->getBrokerAsyncContext()); if (arh->getErrNo()) { - QPID_LOG(error, "TxnBuffer::handleAsyncAbortResult: Transactional operation " << tac->getOpStr() << " failed: err=" << arh->getErrNo() - << " (" << arh->getErrMsg() << ")"); + QPID_LOG(error, "TxnBuffer::handleAsyncAbortResult: Transactional operation " << tac->getOpStr() + << " failed: err=" << arh->getErrNo() << " (" << arh->getErrMsg() << ")"); } tac->getTxnBuffer()->asyncLocalAbort(); } } +// private +uint32_t +TxnBuffer::getNumOps() const { + qpid::sys::ScopedLock l(m_opsMutex); + return m_ops.size(); +} + +// private +void +TxnBuffer::createLocalXid() +{ + uuid_t uuid; + { + qpid::sys::ScopedLock l(s_uuidMutex); + ::uuid_generate_random(uuid); // Not thread-safe + } + char uuidStr[37]; // 36-char uuid + trailing '\0' + ::uuid_unparse(uuid, uuidStr); + m_xid.assign(uuidStr); + QPID_LOG(debug, "Local XID created: \"" << m_xid << "\""); +} + }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/TxnBuffer.h b/cpp/src/qpid/broker/TxnBuffer.h index 7b85a1b6c4..02569f6545 100644 --- a/cpp/src/qpid/broker/TxnBuffer.h +++ b/cpp/src/qpid/broker/TxnBuffer.h @@ -42,10 +42,16 @@ class TxnOp; class TxnBuffer { public: TxnBuffer(AsyncResultQueue& arq); + TxnBuffer(AsyncResultQueue& arq, std::string& xid); virtual ~TxnBuffer(); + TxnHandle& getTxnHandle(); + const std::string& getXid() const; + bool is2pc() const; + void incrOpCnt(); + void decrOpCnt(); void enlist(boost::shared_ptr op); - bool prepare(TxnHandle& th); + bool prepare(); void commit(); void rollback(); bool commitLocal(AsyncTransactionalStore* const store); @@ -57,14 +63,25 @@ public: static void handleAsyncAbortResult(const AsyncResultHandle* const arh); private: + mutable qpid::sys::Mutex m_opsMutex; + mutable qpid::sys::Mutex m_submitOpCntMutex; + mutable qpid::sys::Mutex m_completeOpCntMutex; + static qpid::sys::Mutex s_uuidMutex; + std::vector > m_ops; - qpid::sys::Mutex m_opsMutex; TxnHandle m_txnHandle; AsyncTransactionalStore* m_store; AsyncResultQueue& m_resultQueue; + std::string m_xid; + bool m_tpcFlag; + uint32_t m_submitOpCnt; + uint32_t m_completeOpCnt; typedef enum {NONE = 0, PREPARE, COMMIT, ROLLBACK, COMPLETE} e_txnState; e_txnState m_state; + + uint32_t getNumOps() const; + void createLocalXid(); }; }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/TxnHandle.cpp b/cpp/src/qpid/broker/TxnHandle.cpp index 58cedd586e..f8977b2132 100644 --- a/cpp/src/qpid/broker/TxnHandle.cpp +++ b/cpp/src/qpid/broker/TxnHandle.cpp @@ -57,28 +57,5 @@ TxnHandle::operator=(const TxnHandle& r) // --- TxnHandleImpl methods --- -const std::string& -TxnHandle::getXid() const -{ - return impl->getXid(); -} - -bool -TxnHandle::is2pc() const -{ - return impl->is2pc(); -} - -void -TxnHandle::incrOpCnt() -{ - impl->incrOpCnt(); -} - -void -TxnHandle::decrOpCnt() -{ - impl->decrOpCnt(); -} }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/TxnHandle.h b/cpp/src/qpid/broker/TxnHandle.h index 7302490939..f55d5b01d8 100644 --- a/cpp/src/qpid/broker/TxnHandle.h +++ b/cpp/src/qpid/broker/TxnHandle.h @@ -45,11 +45,8 @@ public: ~TxnHandle(); TxnHandle& operator=(const TxnHandle& r); - // TxnHandleImpl methods - const std::string& getXid() const; - bool is2pc() const; - void incrOpCnt(); - void decrOpCnt(); + // --- TxnHandleImpl methods --- + // private: friend class PrivateImplRef; diff --git a/cpp/src/qpid/broker/TxnOp.h b/cpp/src/qpid/broker/TxnOp.h index 1626e30ccd..bcff87551c 100644 --- a/cpp/src/qpid/broker/TxnOp.h +++ b/cpp/src/qpid/broker/TxnOp.h @@ -24,15 +24,17 @@ #ifndef qpid_broker_TxnOp_h_ #define qpid_broker_TxnOp_h_ +#include + namespace qpid { namespace broker { -class TxnHandle; +class TxnBuffer; class TxnOp{ public: virtual ~TxnOp() {} - virtual bool prepare(TxnHandle& th) throw() = 0; + virtual bool prepare(qpid::broker::TxnBuffer*) throw() = 0; virtual void commit() throw() = 0; virtual void rollback() throw() = 0; }; diff --git a/cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.cpp b/cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.cpp index e1c67a9547..6f33369a26 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.cpp +++ b/cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.cpp @@ -82,9 +82,9 @@ DeliveryRecord::isRedundant() const } void -DeliveryRecord::dequeue(qpid::broker::TxnHandle& txn) +DeliveryRecord::dequeue(qpid::broker::TxnBuffer* tb) { - m_queuedMessage->getQueue()->dequeue(txn, m_queuedMessage); + m_queuedMessage->getQueue()->dequeue(tb, m_queuedMessage); } void diff --git a/cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.h b/cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.h index d4529941e7..6c5d87f374 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.h +++ b/cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.h @@ -30,7 +30,7 @@ namespace qpid { namespace broker { -class TxnHandle; +class TxnBuffer; }} namespace tests { @@ -51,7 +51,7 @@ public: bool setEnded(); bool isEnded() const; bool isRedundant() const; - void dequeue(qpid::broker::TxnHandle& txn); + void dequeue(qpid::broker::TxnBuffer* tb); void committed() const; boost::shared_ptr getQueuedMessage() const; private: diff --git a/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp b/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp index 4a2bc2bf0c..6aa477c470 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp +++ b/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp @@ -47,22 +47,18 @@ MessageConsumer::MessageConsumer(const TestOptions& perfTestParams, m_queue(queue) {} -MessageConsumer::~MessageConsumer() -{} +MessageConsumer::~MessageConsumer() {} void -MessageConsumer::record(boost::shared_ptr dr) -{ +MessageConsumer::record(boost::shared_ptr dr) { m_unacked.push_back(dr); } void -MessageConsumer::commitComplete() -{} +MessageConsumer::commitComplete() {} void* -MessageConsumer::runConsumers() -{ +MessageConsumer::runConsumers() { const bool useTxns = m_perfTestParams.m_deqTxnBlockSize > 0U; uint16_t opsInTxnCnt = 0U; qpid::broker::TxnBuffer* tb = 0; @@ -78,17 +74,13 @@ MessageConsumer::runConsumers() ++numMsgs; if (useTxns) { // --- Transactional dequeue --- + boost::shared_ptr ta(new TxnAccept(m_unacked)); + m_unacked.clear(); + tb->enlist(ta); if (++opsInTxnCnt >= m_perfTestParams.m_deqTxnBlockSize) { - if (m_perfTestParams.m_durable) { - boost::shared_ptr ta(new TxnAccept(m_unacked)); - m_unacked.clear(); - tb->enlist(ta); - tb->commitLocal(m_store); - if (numMsgs < m_perfTestParams.m_numMsgs) { - tb = new qpid::broker::TxnBuffer(m_resultQueue); - } - } else { - tb->commit(); + tb->commitLocal(m_store); + if (numMsgs < m_perfTestParams.m_numMsgs) { + tb = new qpid::broker::TxnBuffer(m_resultQueue); } opsInTxnCnt = 0U; } @@ -105,11 +97,7 @@ MessageConsumer::runConsumers() } if (opsInTxnCnt) { - if (m_perfTestParams.m_durable) { - tb->commitLocal(m_store); - } else { - tb->commit(); - } + tb->commitLocal(m_store); } return reinterpret_cast(0); @@ -117,8 +105,7 @@ MessageConsumer::runConsumers() //static void* -MessageConsumer::startConsumers(void* ptr) -{ +MessageConsumer::startConsumers(void* ptr) { return reinterpret_cast(ptr)->runConsumers(); } diff --git a/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.cpp b/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.cpp index 7d9aaceb11..974f3f3981 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.cpp +++ b/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.cpp @@ -49,12 +49,10 @@ MessageProducer::MessageProducer(const TestOptions& perfTestParams, m_queue(queue) {} -MessageProducer::~MessageProducer() -{} +MessageProducer::~MessageProducer() {} void* -MessageProducer::runProducers() -{ +MessageProducer::runProducers() { const bool useTxns = m_perfTestParams.m_enqTxnBlockSize > 0U; uint16_t recsInTxnCnt = 0U; qpid::broker::TxnBuffer* tb = 0; @@ -68,17 +66,13 @@ MessageProducer::runProducers() op->deliverTo(m_queue); tb->enlist(op); if (++recsInTxnCnt >= m_perfTestParams.m_enqTxnBlockSize) { - if (m_perfTestParams.m_durable) { - tb->commitLocal(m_store); + tb->commitLocal(m_store); - // TxnBuffer instance tb carries async state that precludes it being re-used for the next - // transaction until the current commit cycle completes. So use another instance. This - // instance should auto-delete when the async commit cycle completes. - if ((numMsgs + 1) < m_perfTestParams.m_numMsgs) { - tb = new qpid::broker::TxnBuffer(m_resultQueue); - } - } else { - tb->commit(); + // TxnBuffer instance tb carries async state that precludes it being re-used for the next + // transaction until the current commit cycle completes. So use another instance. This + // instance should auto-delete when the async commit cycle completes. + if ((numMsgs + 1) < m_perfTestParams.m_numMsgs) { + tb = new qpid::broker::TxnBuffer(m_resultQueue); } recsInTxnCnt = 0U; } @@ -87,11 +81,7 @@ MessageProducer::runProducers() } } if (recsInTxnCnt) { - if (m_perfTestParams.m_durable) { - tb->commitLocal(m_store); - } else { - tb->commit(); - } + tb->commitLocal(m_store); } return reinterpret_cast(0); } diff --git a/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.h b/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.h index 7fa74a2c51..127408e3db 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.h +++ b/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.h @@ -32,6 +32,7 @@ class AsyncStoreImpl; } namespace broker { class AsyncResultQueue; +class TxnBuffer; }} namespace tests { @@ -40,7 +41,6 @@ namespace asyncPerf { class SimpleQueue; class TestOptions; -class TxnBuffer; class MessageProducer { diff --git a/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.cpp b/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.cpp index 572089faaf..0d16248c7f 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.cpp +++ b/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.cpp @@ -89,9 +89,9 @@ QueuedMessage::enqHandle() } void -QueuedMessage::prepareEnqueue(qpid::broker::TxnHandle& th) +QueuedMessage::prepareEnqueue(qpid::broker::TxnBuffer* tb) { - m_queue->enqueue(th, shared_from_this()); + m_queue->enqueue(tb, shared_from_this()); } void diff --git a/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.h b/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.h index dd10f8b501..630fe1aedc 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.h +++ b/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.h @@ -59,7 +59,7 @@ public: qpid::broker::EnqueueHandle& enqHandle(); // --- Transaction handling --- - void prepareEnqueue(qpid::broker::TxnHandle& th); + void prepareEnqueue(qpid::broker::TxnBuffer* tb); void commitEnqueue(); void abortEnqueue(); diff --git a/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp b/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp index f297e83402..06b4e9333f 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp +++ b/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp @@ -31,7 +31,7 @@ #include "qpid/broker/AsyncResultHandle.h" #include "qpid/broker/QueueAsyncContext.h" -#include "qpid/broker/TxnHandle.h" +#include "qpid/broker/TxnBuffer.h" #include // memcpy() @@ -65,33 +65,27 @@ SimpleQueue::SimpleQueue(const std::string& name, } } -SimpleQueue::~SimpleQueue() -{} +SimpleQueue::~SimpleQueue() {} const qpid::broker::QueueHandle& -SimpleQueue::getHandle() const -{ +SimpleQueue::getHandle() const { return m_queueHandle; } qpid::broker::QueueHandle& -SimpleQueue::getHandle() -{ +SimpleQueue::getHandle() { return m_queueHandle; } qpid::broker::AsyncStore* -SimpleQueue::getStore() -{ +SimpleQueue::getStore() { return m_store; } void -SimpleQueue::asyncCreate() -{ +SimpleQueue::asyncCreate() { if (m_store) { boost::shared_ptr qac(new qpid::broker::QueueAsyncContext(shared_from_this(), - s_nullTxnHandle, &handleAsyncCreateResult, &m_resultQueue)); m_store->submitCreate(m_queueHandle, this, qac); @@ -123,7 +117,6 @@ SimpleQueue::asyncDestroy(const bool deleteQueue) if (m_store) { if (deleteQueue) { boost::shared_ptr qac(new qpid::broker::QueueAsyncContext(shared_from_this(), - s_nullTxnHandle, &handleAsyncDestroyResult, &m_resultQueue)); m_store->submitDestroy(m_queueHandle, qac); @@ -151,16 +144,14 @@ SimpleQueue::handleAsyncDestroyResult(const qpid::broker::AsyncResultHandle* con } void -SimpleQueue::deliver(boost::intrusive_ptr msg) -{ +SimpleQueue::deliver(boost::intrusive_ptr msg) { boost::shared_ptr qm(boost::shared_ptr(new QueuedMessage(this, msg))); - enqueue(s_nullTxnHandle, qm); + enqueue(qm); push(qm); } bool -SimpleQueue::dispatch(MessageConsumer& mc) -{ +SimpleQueue::dispatch(MessageConsumer& mc) { boost::shared_ptr qm; if (m_messages->consume(qm)) { boost::shared_ptr dr(new DeliveryRecord(qm, mc, false)); @@ -171,110 +162,95 @@ SimpleQueue::dispatch(MessageConsumer& mc) } bool -SimpleQueue::enqueue(boost::shared_ptr qm) -{ - return enqueue(s_nullTxnHandle, qm); +SimpleQueue::enqueue(boost::shared_ptr qm) { + return enqueue(0, qm); } bool -SimpleQueue::enqueue(qpid::broker::TxnHandle& th, - boost::shared_ptr qm) -{ +SimpleQueue::enqueue(qpid::broker::TxnBuffer* tb, + boost::shared_ptr qm) { ScopedUse u(m_barrier); if (!u.m_acquired) { return false; } if (qm->payload()->isPersistent() && m_store) { qm->payload()->enqueueAsync(shared_from_this(), m_store); - return asyncEnqueue(th, qm); + return asyncEnqueue(tb, qm); } return false; } bool -SimpleQueue::dequeue(boost::shared_ptr qm) -{ - return dequeue(s_nullTxnHandle, qm); +SimpleQueue::dequeue(boost::shared_ptr qm) { + return dequeue(0, qm); } bool -SimpleQueue::dequeue(qpid::broker::TxnHandle& th, - boost::shared_ptr qm) -{ +SimpleQueue::dequeue(qpid::broker::TxnBuffer* tb, + boost::shared_ptr qm) { ScopedUse u(m_barrier); if (!u.m_acquired) { return false; } if (qm->payload()->isPersistent() && m_store) { qm->payload()->dequeueAsync(shared_from_this(), m_store); - return asyncDequeue(th, qm); + return asyncDequeue(tb, qm); } return true; } void -SimpleQueue::process(boost::intrusive_ptr msg) -{ +SimpleQueue::process(boost::intrusive_ptr msg) { push(boost::shared_ptr(new QueuedMessage(this, msg))); } void -SimpleQueue::enqueueAborted(boost::intrusive_ptr /*msg*/) -{} +SimpleQueue::enqueueAborted(boost::intrusive_ptr) {} void -SimpleQueue::encode(qpid::framing::Buffer& buffer) const -{ +SimpleQueue::encode(qpid::framing::Buffer& buffer) const { buffer.putShortString(m_name); } uint32_t -SimpleQueue::encodedSize() const -{ +SimpleQueue::encodedSize() const { return m_name.size() + 1; } uint64_t -SimpleQueue::getPersistenceId() const -{ +SimpleQueue::getPersistenceId() const { return m_persistenceId; } void -SimpleQueue::setPersistenceId(uint64_t persistenceId) const -{ +SimpleQueue::setPersistenceId(uint64_t persistenceId) const { m_persistenceId = persistenceId; } void -SimpleQueue::flush() -{ +SimpleQueue::flush() { //if(m_store) m_store->flush(*this); } const std::string& -SimpleQueue::getName() const -{ +SimpleQueue::getName() const { return m_name; } void -SimpleQueue::setExternalQueueStore(qpid::broker::ExternalQueueStore* inst) -{ +SimpleQueue::setExternalQueueStore(qpid::broker::ExternalQueueStore* inst) { if (externalQueueStore != inst && externalQueueStore) delete externalQueueStore; externalQueueStore = inst; } uint64_t -SimpleQueue::getSize() -{ +SimpleQueue::getSize() { return m_persistableData.size(); } void -SimpleQueue::write(char* target) -{ +SimpleQueue::write(char* target) { ::memcpy(target, m_persistableData.data(), m_persistableData.size()); } @@ -344,21 +320,20 @@ SimpleQueue::push(boost::shared_ptr qm, // private bool -SimpleQueue::asyncEnqueue(qpid::broker::TxnHandle& th, - boost::shared_ptr qm) -{ +SimpleQueue::asyncEnqueue(qpid::broker::TxnBuffer* tb, + boost::shared_ptr qm) { assert(qm.get()); -// qm.payload()->setPersistenceId(m_store->getNextRid()); // TODO: rid is set by store itself - find way to do this boost::shared_ptr qac(new qpid::broker::QueueAsyncContext(shared_from_this(), qm->payload(), - th, + tb, &handleAsyncEnqueueResult, &m_resultQueue)); - // TODO : This must be done from inside store, not here (the txn handle is opaque outside the store) - if (th.isValid()) { - th.incrOpCnt(); + if (tb) { + tb->incrOpCnt(); + m_store->submitEnqueue(qm->enqHandle(), tb->getTxnHandle(), qac); + } else { + m_store->submitEnqueue(qm->enqHandle(), s_nullTxnHandle, qac); } - m_store->submitEnqueue(qm->enqHandle(), th, qac); ++m_asyncOpCounter; return true; } @@ -382,22 +357,21 @@ SimpleQueue::handleAsyncEnqueueResult(const qpid::broker::AsyncResultHandle* con // private bool -SimpleQueue::asyncDequeue(qpid::broker::TxnHandle& th, +SimpleQueue::asyncDequeue(/*boost::shared_ptr*/qpid::broker::TxnBuffer* tb, boost::shared_ptr qm) { assert(qm.get()); boost::shared_ptr qac(new qpid::broker::QueueAsyncContext(shared_from_this(), qm->payload(), - th, + tb, &handleAsyncDequeueResult, &m_resultQueue)); - // TODO : This must be done from inside store, not here (the txn handle is opaque outside the store) - if (th.isValid()) { - th.incrOpCnt(); + if (tb) { + tb->incrOpCnt(); + m_store->submitDequeue(qm->enqHandle(), tb->getTxnHandle(), qac); + } else { + m_store->submitDequeue(qm->enqHandle(), s_nullTxnHandle, qac); } - m_store->submitDequeue(qm->enqHandle(), - th, - qac); ++m_asyncOpCounter; return true; } @@ -420,8 +394,7 @@ SimpleQueue::handleAsyncDequeueResult(const qpid::broker::AsyncResultHandle* con // private void -SimpleQueue::destroyCheck(const std::string& opDescr) const -{ +SimpleQueue::destroyCheck(const std::string& opDescr) const { if (m_destroyPending || m_destroyed) { std::ostringstream oss; oss << opDescr << " on queue \"" << m_name << "\" after call to destroy"; @@ -431,55 +404,54 @@ SimpleQueue::destroyCheck(const std::string& opDescr) const // private void -SimpleQueue::createComplete(const boost::shared_ptr qc) -{ - assert(qc->getQueue().get() == this); +SimpleQueue::createComplete(const boost::shared_ptr qc) { + if (qc.get()) { + assert(qc->getQueue().get() == this); + } --m_asyncOpCounter; } // private void -SimpleQueue::flushComplete(const boost::shared_ptr qc) -{ - assert(qc->getQueue().get() == this); +SimpleQueue::flushComplete(const boost::shared_ptr qc) { + if (qc.get()) { + assert(qc->getQueue().get() == this); + } --m_asyncOpCounter; } // private void -SimpleQueue::destroyComplete(const boost::shared_ptr qc) -{ - assert(qc->getQueue().get() == this); +SimpleQueue::destroyComplete(const boost::shared_ptr qc) { + if (qc.get()) { + assert(qc->getQueue().get() == this); + } --m_asyncOpCounter; m_destroyed = true; } // private void -SimpleQueue::enqueueComplete(const boost::shared_ptr qc) -{ - assert(qc->getQueue().get() == this); - --m_asyncOpCounter; - - // TODO : This must be done from inside store, not here (the txn handle is opaque outside the store) - qpid::broker::TxnHandle th = qc->getTxnHandle(); - if (th.isValid()) { // transactional enqueue - th.decrOpCnt(); +SimpleQueue::enqueueComplete(const boost::shared_ptr qc) { + if (qc.get()) { + assert(qc->getQueue().get() == this); + if (qc->getTxnBuffer()) { // transactional enqueue + qc->getTxnBuffer()->decrOpCnt(); + } } + --m_asyncOpCounter; } // private void -SimpleQueue::dequeueComplete(const boost::shared_ptr qc) -{ - assert(qc->getQueue().get() == this); - --m_asyncOpCounter; - - // TODO : This must be done from inside store, not here (the txn handle is opaque outside the store) - qpid::broker::TxnHandle th = qc->getTxnHandle(); - if (th.isValid()) { // transactional enqueue - th.decrOpCnt(); +SimpleQueue::dequeueComplete(const boost::shared_ptr qc) { + if (qc.get()) { + assert(qc->getQueue().get() == this); + if (qc->getTxnBuffer()) { // transactional enqueue + qc->getTxnBuffer()->decrOpCnt(); + } } + --m_asyncOpCounter; } }}} // namespace tests::storePerftools::asyncPerf diff --git a/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.h b/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.h index bf88e32345..5f64c9b960 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.h +++ b/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.h @@ -38,6 +38,7 @@ namespace qpid { namespace broker { class AsyncResultQueue; class QueueAsyncContext; +class TxnBuffer; } namespace framing { class FieldTable; @@ -76,10 +77,10 @@ public: void deliver(boost::intrusive_ptr msg); bool dispatch(MessageConsumer& mc); bool enqueue(boost::shared_ptr qm); - bool enqueue(qpid::broker::TxnHandle& th, + bool enqueue(qpid::broker::TxnBuffer* tb, boost::shared_ptr qm); bool dequeue(boost::shared_ptr qm); - bool dequeue(qpid::broker::TxnHandle& th, + bool dequeue(qpid::broker::TxnBuffer* tb, boost::shared_ptr qm); void process(boost::intrusive_ptr msg); void enqueueAborted(boost::intrusive_ptr msg); @@ -134,10 +135,10 @@ private: bool isRecovery = false); // -- Async ops --- - bool asyncEnqueue(qpid::broker::TxnHandle& th, + bool asyncEnqueue(qpid::broker::TxnBuffer* tb, boost::shared_ptr qm); static void handleAsyncEnqueueResult(const qpid::broker::AsyncResultHandle* const arh); - bool asyncDequeue(qpid::broker::TxnHandle& th, + bool asyncDequeue(qpid::broker::TxnBuffer* tb, boost::shared_ptr qm); static void handleAsyncDequeueResult(const qpid::broker::AsyncResultHandle* const arh); diff --git a/cpp/src/tests/storePerftools/asyncPerf/TxnAccept.cpp b/cpp/src/tests/storePerftools/asyncPerf/TxnAccept.cpp index 7bede50272..375cd568d2 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/TxnAccept.cpp +++ b/cpp/src/tests/storePerftools/asyncPerf/TxnAccept.cpp @@ -35,18 +35,17 @@ TxnAccept::TxnAccept(std::deque >& ops) : m_ops(ops) {} -TxnAccept::~TxnAccept() -{} +TxnAccept::~TxnAccept() {} // --- Interface TxnOp --- bool -TxnAccept::prepare(qpid::broker::TxnHandle& th) throw() -{ +TxnAccept::prepare(qpid::broker::TxnBuffer* tb) throw() { try { for (std::deque >::iterator i = m_ops.begin(); i != m_ops.end(); ++i) { - (*i)->dequeue(th); + (*i)->dequeue(tb); } + return true; } catch (const std::exception& e) { QPID_LOG(error, "TxnAccept: Failed to prepare transaction: " << e.what()); } catch (...) { @@ -56,8 +55,7 @@ TxnAccept::prepare(qpid::broker::TxnHandle& th) throw() } void -TxnAccept::commit() throw() -{ +TxnAccept::commit() throw() { try { for (std::deque >::iterator i=m_ops.begin(); i!=m_ops.end(); ++i) { (*i)->committed(); @@ -71,7 +69,6 @@ TxnAccept::commit() throw() } void -TxnAccept::rollback() throw() -{} +TxnAccept::rollback() throw() {} }}} // namespace tests::storePerftools::asyncPerf diff --git a/cpp/src/tests/storePerftools/asyncPerf/TxnAccept.h b/cpp/src/tests/storePerftools/asyncPerf/TxnAccept.h index 6bc7ff9ccb..5d84289965 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/TxnAccept.h +++ b/cpp/src/tests/storePerftools/asyncPerf/TxnAccept.h @@ -41,9 +41,9 @@ public: virtual ~TxnAccept(); // --- Interface TxnOp --- - bool prepare(qpid::broker::TxnHandle& th) throw(); - void commit() throw(); - void rollback() throw(); + bool prepare(qpid::broker::TxnBuffer* tb) throw(); + void commit() throw(); + void rollback() throw(); private: std::deque > m_ops; }; diff --git a/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.cpp b/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.cpp index 6e15526e8f..cc36a38be7 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.cpp +++ b/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.cpp @@ -38,15 +38,13 @@ TxnPublish::TxnPublish(boost::intrusive_ptr msg) : m_msg(msg) {} -TxnPublish::~TxnPublish() -{} +TxnPublish::~TxnPublish() {} bool -TxnPublish::prepare(qpid::broker::TxnHandle& th) throw() -{ - try{ +TxnPublish::prepare(qpid::broker::TxnBuffer* tb) throw() { + try { while (!m_queues.empty()) { - m_queues.front()->prepareEnqueue(th); + m_queues.front()->prepareEnqueue(tb); m_prepared.push_back(m_queues.front()); m_queues.pop_front(); } @@ -60,8 +58,7 @@ TxnPublish::prepare(qpid::broker::TxnHandle& th) throw() } void -TxnPublish::commit() throw() -{ +TxnPublish::commit() throw() { try { for (std::list >::iterator i = m_prepared.begin(); i != m_prepared.end(); ++i) { (*i)->commitEnqueue(); @@ -74,8 +71,7 @@ TxnPublish::commit() throw() } void -TxnPublish::rollback() throw() -{ +TxnPublish::rollback() throw() { try { for (std::list >::iterator i = m_prepared.begin(); i != m_prepared.end(); ++i) { (*i)->abortEnqueue(); @@ -88,21 +84,18 @@ TxnPublish::rollback() throw() } uint64_t -TxnPublish::contentSize() -{ +TxnPublish::contentSize() { return m_msg->contentSize(); } void -TxnPublish::deliverTo(const boost::shared_ptr& queue) -{ +TxnPublish::deliverTo(const boost::shared_ptr& queue) { m_queues.push_back(boost::shared_ptr(new QueuedMessage(queue.get(), m_msg))); m_delivered = true; } SimpleMessage& -TxnPublish::getMessage() -{ +TxnPublish::getMessage() { return *m_msg; } diff --git a/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.h b/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.h index 17c3b3778d..eae9ef9c4c 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.h +++ b/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.h @@ -48,7 +48,7 @@ public: virtual ~TxnPublish(); // --- Interface TxOp --- - bool prepare(qpid::broker::TxnHandle& th) throw(); + bool prepare(qpid::broker::TxnBuffer* tb) throw(); void commit() throw(); void rollback() throw(); -- cgit v1.2.1