diff options
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/qpid/asyncStore/OperationQueue.cpp | 22 | ||||
-rw-r--r-- | cpp/src/qpid/asyncStore/TxnHandleImpl.cpp | 2 | ||||
-rw-r--r-- | cpp/src/qpid/asyncStore/TxnHandleImpl.h | 6 | ||||
-rw-r--r-- | cpp/src/qpid/broker/AsyncResultQueueImpl.cpp | 12 | ||||
-rw-r--r-- | cpp/src/qpid/broker/QueueEvents.cpp | 2 | ||||
-rw-r--r-- | cpp/src/qpid/broker/TxnBuffer.cpp | 22 | ||||
-rw-r--r-- | cpp/src/qpid/broker/TxnBuffer.h | 4 | ||||
-rw-r--r-- | cpp/src/tests/storePerftools/asyncPerf/SimplePersistableQueue.cpp | 12 |
8 files changed, 61 insertions, 21 deletions
diff --git a/cpp/src/qpid/asyncStore/OperationQueue.cpp b/cpp/src/qpid/asyncStore/OperationQueue.cpp index 4eabf82004..0b7f58bd6c 100644 --- a/cpp/src/qpid/asyncStore/OperationQueue.cpp +++ b/cpp/src/qpid/asyncStore/OperationQueue.cpp @@ -50,17 +50,23 @@ OperationQueue::submit(boost::shared_ptr<const AsyncOperation> op) OperationQueue::OpQueue::Batch::const_iterator OperationQueue::handle(const OperationQueue::OpQueue::Batch& e) { - for (OpQueue::Batch::const_iterator i = e.begin(); i != e.end(); ++i) { + try { + for (OpQueue::Batch::const_iterator i = e.begin(); i != e.end(); ++i) { //std::cout << "<-- OperationQueue::handle() Op=" << (*i)->getOpStr() << std::endl << std::flush; - boost::shared_ptr<qpid::broker::BrokerAsyncContext> bc = (*i)->getBrokerContext(); - if (bc) { - qpid::broker::AsyncResultQueue* const arq = bc->getAsyncResultQueue(); - if (arq) { - qpid::broker::AsyncResultHandleImpl* arhi = new qpid::broker::AsyncResultHandleImpl(bc); - boost::shared_ptr<qpid::broker::AsyncResultHandle> arh(new qpid::broker::AsyncResultHandle(arhi)); - arq->submit(arh); + boost::shared_ptr<qpid::broker::BrokerAsyncContext> bc = (*i)->getBrokerContext(); + if (bc) { + qpid::broker::AsyncResultQueue* const arq = bc->getAsyncResultQueue(); + if (arq) { + qpid::broker::AsyncResultHandleImpl* arhi = new qpid::broker::AsyncResultHandleImpl(bc); + boost::shared_ptr<qpid::broker::AsyncResultHandle> arh(new qpid::broker::AsyncResultHandle(arhi)); + arq->submit(arh); + } } } + } catch (const std::exception& e) { + std::cerr << "qpid::asyncStore::OperationQueue: Exception thrown processing async op: " << e.what() << std::endl; + } catch (...) { + std::cerr << "qpid::asyncStore::OperationQueue: Unknown exception thrown processing async op" << std::endl; } return e.end(); } diff --git a/cpp/src/qpid/asyncStore/TxnHandleImpl.cpp b/cpp/src/qpid/asyncStore/TxnHandleImpl.cpp index 945b50861d..af3a8f01cf 100644 --- a/cpp/src/qpid/asyncStore/TxnHandleImpl.cpp +++ b/cpp/src/qpid/asyncStore/TxnHandleImpl.cpp @@ -89,12 +89,14 @@ TxnHandleImpl::is2pc() const void TxnHandleImpl::incrOpCnt() { + qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_asyncOpCntMutex); ++m_asyncOpCnt; } void TxnHandleImpl::decrOpCnt() { + qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_asyncOpCntMutex); if (m_asyncOpCnt == 0UL) { throw qpid::Exception("Transaction async operation count underflow"); } diff --git a/cpp/src/qpid/asyncStore/TxnHandleImpl.h b/cpp/src/qpid/asyncStore/TxnHandleImpl.h index e357791508..9452044d66 100644 --- a/cpp/src/qpid/asyncStore/TxnHandleImpl.h +++ b/cpp/src/qpid/asyncStore/TxnHandleImpl.h @@ -24,9 +24,8 @@ #ifndef qpid_asyncStore_TxnHandleImpl_h_ #define qpid_asyncStore_TxnHandleImpl_h_ -#include "AtomicCounter.h" - #include "qpid/RefCounted.h" +#include "qpid/sys/Mutex.h" #include <stdint.h> // uint32_t #include <string> @@ -59,7 +58,8 @@ public: private: std::string m_xid; bool m_tpcFlag; - AsyncOpCounter m_asyncOpCnt; + uint32_t m_asyncOpCnt; + qpid::sys::Mutex m_asyncOpCntMutex; qpid::broker::TxnBuffer* const m_txnBuffer; void createLocalXid(); diff --git a/cpp/src/qpid/broker/AsyncResultQueueImpl.cpp b/cpp/src/qpid/broker/AsyncResultQueueImpl.cpp index 9e3298bb4e..62c7ed33d9 100644 --- a/cpp/src/qpid/broker/AsyncResultQueueImpl.cpp +++ b/cpp/src/qpid/broker/AsyncResultQueueImpl.cpp @@ -49,11 +49,17 @@ AsyncResultQueueImpl::submit(boost::shared_ptr<AsyncResultHandle> arh) AsyncResultQueueImpl::ResultQueue::Batch::const_iterator AsyncResultQueueImpl::handle(const ResultQueue::Batch& e) { - for (ResultQueue::Batch::const_iterator i = e.begin(); i != e.end(); ++i) { + try { + for (ResultQueue::Batch::const_iterator i = e.begin(); i != e.end(); ++i) { //std::cout << "<== AsyncResultQueueImpl::handle() errNo=" << (*i)->getErrNo() << " errMsg=\"" << (*i)->getErrMsg() << "\"" << std::endl << std::flush; - if ((*i)->isValid()) { - (*i)->invokeAsyncResultCallback(); + if ((*i)->isValid()) { + (*i)->invokeAsyncResultCallback(); + } } + } catch (const std::exception& e) { + std::cerr << "qpid::broker::AsyncResultQueueImpl: Exception thrown processing async result: " << e.what() << std::endl; + } catch (...) { + std::cerr << "qpid::broker::AsyncResultQueueImpl: Unknown exception thrown processing async result" << std::endl; } return e.end(); } diff --git a/cpp/src/qpid/broker/QueueEvents.cpp b/cpp/src/qpid/broker/QueueEvents.cpp index c66bdabf0f..b102a8554d 100644 --- a/cpp/src/qpid/broker/QueueEvents.cpp +++ b/cpp/src/qpid/broker/QueueEvents.cpp @@ -28,7 +28,7 @@ namespace qpid { namespace broker { QueueEvents::QueueEvents(const boost::shared_ptr<sys::Poller>& poller, bool isSync) : - eventQueue(boost::bind(&QueueEvents::handle, this, _1), poller), enabled(true), sync(isSync) + eventQueue(boost::bind(&QueueEvents::handle, this, _1), poller), enabled(true), sync(isSync) { if (!sync) eventQueue.start(); } diff --git a/cpp/src/qpid/broker/TxnBuffer.cpp b/cpp/src/qpid/broker/TxnBuffer.cpp index 2af8bd4da3..d6662e5001 100644 --- a/cpp/src/qpid/broker/TxnBuffer.cpp +++ b/cpp/src/qpid/broker/TxnBuffer.cpp @@ -48,6 +48,7 @@ void TxnBuffer::enlist(boost::shared_ptr<TxnOp> op) { //std::cout << "TTT TxnBuffer::enlist" << std::endl << std::flush; + qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_opsMutex); m_ops.push_back(op); } @@ -55,6 +56,7 @@ bool TxnBuffer::prepare(TxnHandle& th) { //std::cout << "TTT TxnBuffer::prepare" << std::endl << std::flush; + qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_opsMutex); for(std::vector<boost::shared_ptr<TxnOp> >::iterator i = m_ops.begin(); i != m_ops.end(); ++i) { if (!(*i)->prepare(th)) { return false; @@ -67,6 +69,7 @@ void TxnBuffer::commit() { //std::cout << "TTT TxnBuffer::commit" << std::endl << std::flush; + qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_opsMutex); for(std::vector<boost::shared_ptr<TxnOp> >::iterator i = m_ops.begin(); i != m_ops.end(); ++i) { (*i)->commit(); } @@ -77,6 +80,7 @@ void TxnBuffer::rollback() { //std::cout << "TTT TxnBuffer::rollback" << std::endl << std::flush; + qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_opsMutex); for(std::vector<boost::shared_ptr<TxnOp> >::iterator i = m_ops.begin(); i != m_ops.end(); ++i) { (*i)->rollback(); } @@ -185,4 +189,22 @@ TxnBuffer::asyncLocalAbort() } } +// for debugging +/* +void +TxnBuffer::printState(std::ostream& os) +{ + os << "state="; + switch(m_state) { + case NONE: os << "NONE"; break; + case PREPARE: os << "PREPARE"; break; + case COMMIT: os << "COMMIT"; break; + case ROLLBACK: os << "ROLLBACK"; break; + case COMPLETE: os << "COMPLETE"; break; + default: os << m_state << "(unknown)"; + } + os << "; " << m_ops.size() << "; store=" << m_store; +} +*/ + }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/TxnBuffer.h b/cpp/src/qpid/broker/TxnBuffer.h index 308a91aa03..ea8b73407f 100644 --- a/cpp/src/qpid/broker/TxnBuffer.h +++ b/cpp/src/qpid/broker/TxnBuffer.h @@ -54,8 +54,12 @@ public: void asyncLocalCommit(); void asyncLocalAbort(); + // --- Debug --- + //void printState(std::ostream& os); + private: std::vector<boost::shared_ptr<TxnOp> > m_ops; + qpid::sys::Mutex m_opsMutex; TxnHandle m_txnHandle; AsyncTransactionalStore* m_store; AsyncResultQueue& m_resultQueue; diff --git a/cpp/src/tests/storePerftools/asyncPerf/SimplePersistableQueue.cpp b/cpp/src/tests/storePerftools/asyncPerf/SimplePersistableQueue.cpp index 1a3eae4b43..be2b4c891b 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/SimplePersistableQueue.cpp +++ b/cpp/src/tests/storePerftools/asyncPerf/SimplePersistableQueue.cpp @@ -354,13 +354,13 @@ SimplePersistableQueue::asyncEnqueue(qpid::broker::TxnHandle& th, qpid::asyncStore::AsyncOperation::MSG_ENQUEUE, &handleAsyncResult, &m_resultQueue)); + if (th.isValid()) { + th.incrOpCnt(); + } m_store->submitEnqueue(qm.enqHandle(), th, qac); ++m_asyncOpCounter; - if (th.isValid()) { - th.incrOpCnt(); - } return true; } @@ -376,13 +376,13 @@ SimplePersistableQueue::asyncDequeue(qpid::broker::TxnHandle& th, qpid::asyncStore::AsyncOperation::MSG_DEQUEUE, &handleAsyncResult, &m_resultQueue)); + if (th.isValid()) { + th.incrOpCnt(); + } m_store->submitDequeue(qm.enqHandle(), th, qac); ++m_asyncOpCounter; - if (th.isValid()) { - th.incrOpCnt(); - } return true; } |