summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/qpid/asyncStore/OperationQueue.cpp22
-rw-r--r--cpp/src/qpid/asyncStore/TxnHandleImpl.cpp2
-rw-r--r--cpp/src/qpid/asyncStore/TxnHandleImpl.h6
-rw-r--r--cpp/src/qpid/broker/AsyncResultQueueImpl.cpp12
-rw-r--r--cpp/src/qpid/broker/QueueEvents.cpp2
-rw-r--r--cpp/src/qpid/broker/TxnBuffer.cpp22
-rw-r--r--cpp/src/qpid/broker/TxnBuffer.h4
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/SimplePersistableQueue.cpp12
8 files changed, 61 insertions, 21 deletions
diff --git a/cpp/src/qpid/asyncStore/OperationQueue.cpp b/cpp/src/qpid/asyncStore/OperationQueue.cpp
index 4eabf82004..0b7f58bd6c 100644
--- a/cpp/src/qpid/asyncStore/OperationQueue.cpp
+++ b/cpp/src/qpid/asyncStore/OperationQueue.cpp
@@ -50,17 +50,23 @@ OperationQueue::submit(boost::shared_ptr<const AsyncOperation> op)
OperationQueue::OpQueue::Batch::const_iterator
OperationQueue::handle(const OperationQueue::OpQueue::Batch& e)
{
- for (OpQueue::Batch::const_iterator i = e.begin(); i != e.end(); ++i) {
+ try {
+ for (OpQueue::Batch::const_iterator i = e.begin(); i != e.end(); ++i) {
//std::cout << "<-- OperationQueue::handle() Op=" << (*i)->getOpStr() << std::endl << std::flush;
- boost::shared_ptr<qpid::broker::BrokerAsyncContext> bc = (*i)->getBrokerContext();
- if (bc) {
- qpid::broker::AsyncResultQueue* const arq = bc->getAsyncResultQueue();
- if (arq) {
- qpid::broker::AsyncResultHandleImpl* arhi = new qpid::broker::AsyncResultHandleImpl(bc);
- boost::shared_ptr<qpid::broker::AsyncResultHandle> arh(new qpid::broker::AsyncResultHandle(arhi));
- arq->submit(arh);
+ boost::shared_ptr<qpid::broker::BrokerAsyncContext> bc = (*i)->getBrokerContext();
+ if (bc) {
+ qpid::broker::AsyncResultQueue* const arq = bc->getAsyncResultQueue();
+ if (arq) {
+ qpid::broker::AsyncResultHandleImpl* arhi = new qpid::broker::AsyncResultHandleImpl(bc);
+ boost::shared_ptr<qpid::broker::AsyncResultHandle> arh(new qpid::broker::AsyncResultHandle(arhi));
+ arq->submit(arh);
+ }
}
}
+ } catch (const std::exception& e) {
+ std::cerr << "qpid::asyncStore::OperationQueue: Exception thrown processing async op: " << e.what() << std::endl;
+ } catch (...) {
+ std::cerr << "qpid::asyncStore::OperationQueue: Unknown exception thrown processing async op" << std::endl;
}
return e.end();
}
diff --git a/cpp/src/qpid/asyncStore/TxnHandleImpl.cpp b/cpp/src/qpid/asyncStore/TxnHandleImpl.cpp
index 945b50861d..af3a8f01cf 100644
--- a/cpp/src/qpid/asyncStore/TxnHandleImpl.cpp
+++ b/cpp/src/qpid/asyncStore/TxnHandleImpl.cpp
@@ -89,12 +89,14 @@ TxnHandleImpl::is2pc() const
void
TxnHandleImpl::incrOpCnt()
{
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_asyncOpCntMutex);
++m_asyncOpCnt;
}
void
TxnHandleImpl::decrOpCnt()
{
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_asyncOpCntMutex);
if (m_asyncOpCnt == 0UL) {
throw qpid::Exception("Transaction async operation count underflow");
}
diff --git a/cpp/src/qpid/asyncStore/TxnHandleImpl.h b/cpp/src/qpid/asyncStore/TxnHandleImpl.h
index e357791508..9452044d66 100644
--- a/cpp/src/qpid/asyncStore/TxnHandleImpl.h
+++ b/cpp/src/qpid/asyncStore/TxnHandleImpl.h
@@ -24,9 +24,8 @@
#ifndef qpid_asyncStore_TxnHandleImpl_h_
#define qpid_asyncStore_TxnHandleImpl_h_
-#include "AtomicCounter.h"
-
#include "qpid/RefCounted.h"
+#include "qpid/sys/Mutex.h"
#include <stdint.h> // uint32_t
#include <string>
@@ -59,7 +58,8 @@ public:
private:
std::string m_xid;
bool m_tpcFlag;
- AsyncOpCounter m_asyncOpCnt;
+ uint32_t m_asyncOpCnt;
+ qpid::sys::Mutex m_asyncOpCntMutex;
qpid::broker::TxnBuffer* const m_txnBuffer;
void createLocalXid();
diff --git a/cpp/src/qpid/broker/AsyncResultQueueImpl.cpp b/cpp/src/qpid/broker/AsyncResultQueueImpl.cpp
index 9e3298bb4e..62c7ed33d9 100644
--- a/cpp/src/qpid/broker/AsyncResultQueueImpl.cpp
+++ b/cpp/src/qpid/broker/AsyncResultQueueImpl.cpp
@@ -49,11 +49,17 @@ AsyncResultQueueImpl::submit(boost::shared_ptr<AsyncResultHandle> arh)
AsyncResultQueueImpl::ResultQueue::Batch::const_iterator
AsyncResultQueueImpl::handle(const ResultQueue::Batch& e)
{
- for (ResultQueue::Batch::const_iterator i = e.begin(); i != e.end(); ++i) {
+ try {
+ for (ResultQueue::Batch::const_iterator i = e.begin(); i != e.end(); ++i) {
//std::cout << "<== AsyncResultQueueImpl::handle() errNo=" << (*i)->getErrNo() << " errMsg=\"" << (*i)->getErrMsg() << "\"" << std::endl << std::flush;
- if ((*i)->isValid()) {
- (*i)->invokeAsyncResultCallback();
+ if ((*i)->isValid()) {
+ (*i)->invokeAsyncResultCallback();
+ }
}
+ } catch (const std::exception& e) {
+ std::cerr << "qpid::broker::AsyncResultQueueImpl: Exception thrown processing async result: " << e.what() << std::endl;
+ } catch (...) {
+ std::cerr << "qpid::broker::AsyncResultQueueImpl: Unknown exception thrown processing async result" << std::endl;
}
return e.end();
}
diff --git a/cpp/src/qpid/broker/QueueEvents.cpp b/cpp/src/qpid/broker/QueueEvents.cpp
index c66bdabf0f..b102a8554d 100644
--- a/cpp/src/qpid/broker/QueueEvents.cpp
+++ b/cpp/src/qpid/broker/QueueEvents.cpp
@@ -28,7 +28,7 @@ namespace qpid {
namespace broker {
QueueEvents::QueueEvents(const boost::shared_ptr<sys::Poller>& poller, bool isSync) :
- eventQueue(boost::bind(&QueueEvents::handle, this, _1), poller), enabled(true), sync(isSync)
+ eventQueue(boost::bind(&QueueEvents::handle, this, _1), poller), enabled(true), sync(isSync)
{
if (!sync) eventQueue.start();
}
diff --git a/cpp/src/qpid/broker/TxnBuffer.cpp b/cpp/src/qpid/broker/TxnBuffer.cpp
index 2af8bd4da3..d6662e5001 100644
--- a/cpp/src/qpid/broker/TxnBuffer.cpp
+++ b/cpp/src/qpid/broker/TxnBuffer.cpp
@@ -48,6 +48,7 @@ void
TxnBuffer::enlist(boost::shared_ptr<TxnOp> op)
{
//std::cout << "TTT TxnBuffer::enlist" << std::endl << std::flush;
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_opsMutex);
m_ops.push_back(op);
}
@@ -55,6 +56,7 @@ bool
TxnBuffer::prepare(TxnHandle& th)
{
//std::cout << "TTT TxnBuffer::prepare" << std::endl << std::flush;
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_opsMutex);
for(std::vector<boost::shared_ptr<TxnOp> >::iterator i = m_ops.begin(); i != m_ops.end(); ++i) {
if (!(*i)->prepare(th)) {
return false;
@@ -67,6 +69,7 @@ void
TxnBuffer::commit()
{
//std::cout << "TTT TxnBuffer::commit" << std::endl << std::flush;
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_opsMutex);
for(std::vector<boost::shared_ptr<TxnOp> >::iterator i = m_ops.begin(); i != m_ops.end(); ++i) {
(*i)->commit();
}
@@ -77,6 +80,7 @@ void
TxnBuffer::rollback()
{
//std::cout << "TTT TxnBuffer::rollback" << std::endl << std::flush;
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_opsMutex);
for(std::vector<boost::shared_ptr<TxnOp> >::iterator i = m_ops.begin(); i != m_ops.end(); ++i) {
(*i)->rollback();
}
@@ -185,4 +189,22 @@ TxnBuffer::asyncLocalAbort()
}
}
+// for debugging
+/*
+void
+TxnBuffer::printState(std::ostream& os)
+{
+ os << "state=";
+ switch(m_state) {
+ case NONE: os << "NONE"; break;
+ case PREPARE: os << "PREPARE"; break;
+ case COMMIT: os << "COMMIT"; break;
+ case ROLLBACK: os << "ROLLBACK"; break;
+ case COMPLETE: os << "COMPLETE"; break;
+ default: os << m_state << "(unknown)";
+ }
+ os << "; " << m_ops.size() << "; store=" << m_store;
+}
+*/
+
}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/TxnBuffer.h b/cpp/src/qpid/broker/TxnBuffer.h
index 308a91aa03..ea8b73407f 100644
--- a/cpp/src/qpid/broker/TxnBuffer.h
+++ b/cpp/src/qpid/broker/TxnBuffer.h
@@ -54,8 +54,12 @@ public:
void asyncLocalCommit();
void asyncLocalAbort();
+ // --- Debug ---
+ //void printState(std::ostream& os);
+
private:
std::vector<boost::shared_ptr<TxnOp> > m_ops;
+ qpid::sys::Mutex m_opsMutex;
TxnHandle m_txnHandle;
AsyncTransactionalStore* m_store;
AsyncResultQueue& m_resultQueue;
diff --git a/cpp/src/tests/storePerftools/asyncPerf/SimplePersistableQueue.cpp b/cpp/src/tests/storePerftools/asyncPerf/SimplePersistableQueue.cpp
index 1a3eae4b43..be2b4c891b 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/SimplePersistableQueue.cpp
+++ b/cpp/src/tests/storePerftools/asyncPerf/SimplePersistableQueue.cpp
@@ -354,13 +354,13 @@ SimplePersistableQueue::asyncEnqueue(qpid::broker::TxnHandle& th,
qpid::asyncStore::AsyncOperation::MSG_ENQUEUE,
&handleAsyncResult,
&m_resultQueue));
+ if (th.isValid()) {
+ th.incrOpCnt();
+ }
m_store->submitEnqueue(qm.enqHandle(),
th,
qac);
++m_asyncOpCounter;
- if (th.isValid()) {
- th.incrOpCnt();
- }
return true;
}
@@ -376,13 +376,13 @@ SimplePersistableQueue::asyncDequeue(qpid::broker::TxnHandle& th,
qpid::asyncStore::AsyncOperation::MSG_DEQUEUE,
&handleAsyncResult,
&m_resultQueue));
+ if (th.isValid()) {
+ th.incrOpCnt();
+ }
m_store->submitDequeue(qm.enqHandle(),
th,
qac);
++m_asyncOpCounter;
- if (th.isValid()) {
- th.incrOpCnt();
- }
return true;
}