summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKim van der Riet <kpvdr@apache.org>2012-07-31 13:35:53 +0000
committerKim van der Riet <kpvdr@apache.org>2012-07-31 13:35:53 +0000
commit63c6598f401ac6406e5a31c602c7892b798536fc (patch)
tree73b3c1a519ada213c9e117244aab99d2e64d4f2a
parentb435b07eb8fa9db484f85b39daaf43642dd623ca (diff)
downloadqpid-python-63c6598f401ac6406e5a31c602c7892b798536fc.tar.gz
QPID-3858: WIP: Durable transactions fixed
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1367535 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp8
-rw-r--r--cpp/src/qpid/asyncStore/AsyncStoreImpl.h4
-rw-r--r--cpp/src/qpid/asyncStore/TxnHandleImpl.cpp71
-rw-r--r--cpp/src/qpid/asyncStore/TxnHandleImpl.h14
-rw-r--r--cpp/src/qpid/broker/AsyncStore.h10
-rw-r--r--cpp/src/qpid/broker/ConfigHandle.cpp2
-rw-r--r--cpp/src/qpid/broker/ConfigHandle.h2
-rw-r--r--cpp/src/qpid/broker/EnqueueHandle.cpp2
-rw-r--r--cpp/src/qpid/broker/EnqueueHandle.h2
-rw-r--r--cpp/src/qpid/broker/EventHandle.h2
-rw-r--r--cpp/src/qpid/broker/MessageHandle.cpp2
-rw-r--r--cpp/src/qpid/broker/MessageHandle.h2
-rw-r--r--cpp/src/qpid/broker/QueueAsyncContext.cpp32
-rw-r--r--cpp/src/qpid/broker/QueueAsyncContext.h15
-rw-r--r--cpp/src/qpid/broker/QueueHandle.cpp1
-rw-r--r--cpp/src/qpid/broker/QueueHandle.h2
-rw-r--r--cpp/src/qpid/broker/TxnBuffer.cpp147
-rw-r--r--cpp/src/qpid/broker/TxnBuffer.h21
-rw-r--r--cpp/src/qpid/broker/TxnHandle.cpp23
-rw-r--r--cpp/src/qpid/broker/TxnHandle.h7
-rw-r--r--cpp/src/qpid/broker/TxnOp.h6
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.cpp4
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.h4
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp37
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/MessageProducer.cpp28
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/MessageProducer.h2
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.cpp4
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.h2
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp172
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.h9
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/TxnAccept.cpp15
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/TxnAccept.h6
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/TxnPublish.cpp25
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/TxnPublish.h2
34 files changed, 329 insertions, 356 deletions
diff --git a/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp b/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp
index 4aeab4c7bf..aa66e7adb8 100644
--- a/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp
+++ b/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp
@@ -81,16 +81,18 @@ AsyncStoreImpl::createTxnHandle(qpid::broker::TxnBuffer* tb)
}
qpid::broker::TxnHandle
-AsyncStoreImpl::createTxnHandle(const std::string& xid)
+AsyncStoreImpl::createTxnHandle(const std::string& xid,
+ const bool tpcFlag)
{
- return qpid::broker::TxnHandle(new TxnHandleImpl(xid));
+ return qpid::broker::TxnHandle(new TxnHandleImpl(xid, tpcFlag));
}
qpid::broker::TxnHandle
AsyncStoreImpl::createTxnHandle(const std::string& xid,
+ const bool tpcFlag,
qpid::broker::TxnBuffer* tb)
{
- return qpid::broker::TxnHandle(new TxnHandleImpl(xid, tb));
+ return qpid::broker::TxnHandle(new TxnHandleImpl(xid, tpcFlag, tb));
}
void
diff --git a/cpp/src/qpid/asyncStore/AsyncStoreImpl.h b/cpp/src/qpid/asyncStore/AsyncStoreImpl.h
index 3e29039aea..eb3f090ad7 100644
--- a/cpp/src/qpid/asyncStore/AsyncStoreImpl.h
+++ b/cpp/src/qpid/asyncStore/AsyncStoreImpl.h
@@ -60,8 +60,10 @@ public:
qpid::broker::TxnHandle createTxnHandle();
qpid::broker::TxnHandle createTxnHandle(qpid::broker::TxnBuffer* tb);
- qpid::broker::TxnHandle createTxnHandle(const std::string& xid);
qpid::broker::TxnHandle createTxnHandle(const std::string& xid,
+ const bool tpcFlag);
+ qpid::broker::TxnHandle createTxnHandle(const std::string& xid,
+ const bool tpcFlag,
qpid::broker::TxnBuffer* tb);
void submitPrepare(qpid::broker::TxnHandle& txnHandle,
diff --git a/cpp/src/qpid/asyncStore/TxnHandleImpl.cpp b/cpp/src/qpid/asyncStore/TxnHandleImpl.cpp
index 2b343e9517..dd644b29bd 100644
--- a/cpp/src/qpid/asyncStore/TxnHandleImpl.cpp
+++ b/cpp/src/qpid/asyncStore/TxnHandleImpl.cpp
@@ -23,53 +23,32 @@
#include "TxnHandleImpl.h"
-#include "qpid/Exception.h"
-#include "qpid/broker/TxnBuffer.h"
-#include "qpid/log/Statement.h"
-
-#include <uuid/uuid.h>
-
namespace qpid {
namespace asyncStore {
TxnHandleImpl::TxnHandleImpl() :
m_tpcFlag(false),
- m_asyncOpCnt(0UL),
m_txnBuffer(0)
-{
- createLocalXid();
-}
+{}
TxnHandleImpl::TxnHandleImpl(qpid::broker::TxnBuffer* tb) :
m_tpcFlag(false),
- m_asyncOpCnt(0UL),
m_txnBuffer(tb)
-{
- createLocalXid();
-}
+{}
-TxnHandleImpl::TxnHandleImpl(const std::string& xid) :
+TxnHandleImpl::TxnHandleImpl(const std::string& xid, const bool tpcFlag) :
m_xid(xid),
- m_tpcFlag(!xid.empty()),
- m_asyncOpCnt(0UL),
+ m_tpcFlag(tpcFlag),
m_txnBuffer(0)
-{
- if (m_xid.empty()) {
- createLocalXid();
- }
-}
+{}
TxnHandleImpl::TxnHandleImpl(const std::string& xid,
+ const bool tpcFlag,
qpid::broker::TxnBuffer* tb) :
m_xid(xid),
- m_tpcFlag(!xid.empty()),
- m_asyncOpCnt(0UL),
+ m_tpcFlag(tpcFlag),
m_txnBuffer(tb)
-{
- if (m_xid.empty()) {
- createLocalXid();
- }
-}
+{}
TxnHandleImpl::~TxnHandleImpl()
{}
@@ -86,38 +65,4 @@ TxnHandleImpl::is2pc() const
return m_tpcFlag;
}
-void
-TxnHandleImpl::incrOpCnt()
-{
- qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_asyncOpCntMutex);
- ++m_asyncOpCnt;
-}
-
-void
-TxnHandleImpl::decrOpCnt()
-{
- qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_asyncOpCntMutex);
- if (m_asyncOpCnt == 0UL) {
- throw qpid::Exception("Transaction async operation count underflow");
- }
- if (--m_asyncOpCnt == 0UL && m_txnBuffer) {
- m_txnBuffer->asyncLocalCommit();
- }
-}
-
-// private
-void
-TxnHandleImpl::createLocalXid()
-{
- uuid_t uuid;
-
- // TODO: This call might not be thread safe - Valgrind's helgrind tool emits warnings for this:
- ::uuid_generate_random(uuid);
-
- char uuidStr[37]; // 36-char uuid + trailing '\0'
- ::uuid_unparse(uuid, uuidStr);
- m_xid.assign(uuidStr);
- QPID_LOG(debug, "Local XID created: \"" << m_xid << "\"");
-}
-
}} // namespace qpid::asyncStore
diff --git a/cpp/src/qpid/asyncStore/TxnHandleImpl.h b/cpp/src/qpid/asyncStore/TxnHandleImpl.h
index 9452044d66..e1f8afff3e 100644
--- a/cpp/src/qpid/asyncStore/TxnHandleImpl.h
+++ b/cpp/src/qpid/asyncStore/TxnHandleImpl.h
@@ -43,26 +43,16 @@ class TxnHandleImpl : public virtual qpid::RefCounted
public:
TxnHandleImpl();
TxnHandleImpl(qpid::broker::TxnBuffer* tb);
- TxnHandleImpl(const std::string& xid);
- TxnHandleImpl(const std::string& xid, qpid::broker::TxnBuffer* tb);
+ TxnHandleImpl(const std::string& xid, const bool tpcFlag);
+ TxnHandleImpl(const std::string& xid, const bool tpcFlag, qpid::broker::TxnBuffer* tb);
virtual ~TxnHandleImpl();
const std::string& getXid() const;
bool is2pc() const;
- void submitPrepare();
- void submitCommit();
- void submitAbort();
-
- void incrOpCnt();
- void decrOpCnt();
private:
std::string m_xid;
bool m_tpcFlag;
- uint32_t m_asyncOpCnt;
- qpid::sys::Mutex m_asyncOpCntMutex;
qpid::broker::TxnBuffer* const m_txnBuffer;
-
- void createLocalXid();
};
}} // namespace qpid::asyncStore
diff --git a/cpp/src/qpid/broker/AsyncStore.h b/cpp/src/qpid/broker/AsyncStore.h
index 5fb2e0a1eb..6f1c02e059 100644
--- a/cpp/src/qpid/broker/AsyncStore.h
+++ b/cpp/src/qpid/broker/AsyncStore.h
@@ -36,7 +36,6 @@ class AsyncResultHandle;
class AsyncResultQueue {
public:
virtual ~AsyncResultQueue() {}
- // TODO: Remove boost::shared_ptr<> from this interface
virtual void submit(boost::shared_ptr<AsyncResultHandle>) = 0;
};
@@ -79,11 +78,12 @@ public:
virtual TxnHandle createTxnHandle() = 0;
virtual TxnHandle createTxnHandle(TxnBuffer* tb) = 0;
- virtual TxnHandle createTxnHandle(const std::string& xid) = 0;
virtual TxnHandle createTxnHandle(const std::string& xid,
- TxnBuffer* tb) = 0;
+ const bool tpcFlag) = 0;
+ virtual TxnHandle createTxnHandle(const std::string& xid,
+ const bool tpcFlag,
+ TxnBuffer* tb) = 0;
- // TODO: Remove boost::shared_ptr<BrokerAsyncContext> from this interface
virtual void submitPrepare(TxnHandle&,
boost::shared_ptr<TpcTxnAsyncContext>) = 0; // Distributed txns only
virtual void submitCommit(TxnHandle&,
@@ -112,8 +112,6 @@ public:
// --- Store async interface ---
- // TODO: Remove boost::shared_ptr<BrokerAsyncContext> from this interface
-
// TODO: Switch from BrokerAsyncContext (parent class) to ConfigAsyncContext
// when theses features (and async context classes) are developed.
virtual void submitCreate(ConfigHandle&,
diff --git a/cpp/src/qpid/broker/ConfigHandle.cpp b/cpp/src/qpid/broker/ConfigHandle.cpp
index 0bd65543ae..6ac8ce6ace 100644
--- a/cpp/src/qpid/broker/ConfigHandle.cpp
+++ b/cpp/src/qpid/broker/ConfigHandle.cpp
@@ -55,4 +55,6 @@ ConfigHandle::operator=(const ConfigHandle& r)
return PrivateImpl::assign(*this, r);
}
+// --- ConfigHandleImpl methods ---
+
}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/ConfigHandle.h b/cpp/src/qpid/broker/ConfigHandle.h
index 6bcb3d8ce0..3010018421 100644
--- a/cpp/src/qpid/broker/ConfigHandle.h
+++ b/cpp/src/qpid/broker/ConfigHandle.h
@@ -43,7 +43,7 @@ public:
~ConfigHandle();
ConfigHandle& operator=(const ConfigHandle& r);
- // ConfigHandleImpl methods
+ // --- ConfigHandleImpl methods ---
// <none>
private:
diff --git a/cpp/src/qpid/broker/EnqueueHandle.cpp b/cpp/src/qpid/broker/EnqueueHandle.cpp
index 877eb680a6..aff8673524 100644
--- a/cpp/src/qpid/broker/EnqueueHandle.cpp
+++ b/cpp/src/qpid/broker/EnqueueHandle.cpp
@@ -55,4 +55,6 @@ EnqueueHandle::operator=(const EnqueueHandle& r)
return PrivateImpl::assign(*this, r);
}
+// --- EnqueueHandleImpl methods ---
+
}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/EnqueueHandle.h b/cpp/src/qpid/broker/EnqueueHandle.h
index f869a755b1..63872bb3b3 100644
--- a/cpp/src/qpid/broker/EnqueueHandle.h
+++ b/cpp/src/qpid/broker/EnqueueHandle.h
@@ -43,7 +43,7 @@ public:
~EnqueueHandle();
EnqueueHandle& operator=(const EnqueueHandle& r);
- // EnqueueHandleImpl methods
+ // --- EnqueueHandleImpl methods ---
// <none>
private:
diff --git a/cpp/src/qpid/broker/EventHandle.h b/cpp/src/qpid/broker/EventHandle.h
index 31f0e22dbf..d73cf1e689 100644
--- a/cpp/src/qpid/broker/EventHandle.h
+++ b/cpp/src/qpid/broker/EventHandle.h
@@ -45,7 +45,7 @@ public:
~EventHandle();
EventHandle& operator=(const EventHandle& r);
- // EventHandleImpl methods
+ // --- EventHandleImpl methods ---
const std::string& getKey() const;
private:
diff --git a/cpp/src/qpid/broker/MessageHandle.cpp b/cpp/src/qpid/broker/MessageHandle.cpp
index 2727e74edc..0a9ff50509 100644
--- a/cpp/src/qpid/broker/MessageHandle.cpp
+++ b/cpp/src/qpid/broker/MessageHandle.cpp
@@ -55,4 +55,6 @@ MessageHandle::operator=(const MessageHandle& r)
return PrivateImpl::assign(*this, r);
}
+// --- MessageHandleImpl methods ---
+
}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/MessageHandle.h b/cpp/src/qpid/broker/MessageHandle.h
index e0a68a8878..ab83dd00a0 100644
--- a/cpp/src/qpid/broker/MessageHandle.h
+++ b/cpp/src/qpid/broker/MessageHandle.h
@@ -43,7 +43,7 @@ public:
~MessageHandle();
MessageHandle& operator=(const MessageHandle& r);
- // MessageHandleImpl methods
+ // --- MessageHandleImpl methods ---
// <none>
private:
diff --git a/cpp/src/qpid/broker/QueueAsyncContext.cpp b/cpp/src/qpid/broker/QueueAsyncContext.cpp
index 54a10c9c0e..4bd2d271eb 100644
--- a/cpp/src/qpid/broker/QueueAsyncContext.cpp
+++ b/cpp/src/qpid/broker/QueueAsyncContext.cpp
@@ -29,13 +29,30 @@
namespace qpid {
namespace broker {
+QueueAsyncContext::QueueAsyncContext(boost::shared_ptr<PersistableQueue> q,
+ AsyncResultCallback rcb,
+ AsyncResultQueue* const arq) :
+ m_q(q),
+ m_rcb(rcb),
+ m_arq(arq)
+{}
+
+QueueAsyncContext::QueueAsyncContext(boost::shared_ptr<PersistableQueue> q,
+ boost::intrusive_ptr<PersistableMessage> msg,
+ AsyncResultCallback rcb,
+ AsyncResultQueue* const arq) :
+ m_q(q),
+ m_msg(msg),
+ m_rcb(rcb),
+ m_arq(arq)
+{}
QueueAsyncContext::QueueAsyncContext(boost::shared_ptr<PersistableQueue> q,
- TxnHandle& th,
+ TxnBuffer* tb,
AsyncResultCallback rcb,
AsyncResultQueue* const arq) :
m_q(q),
- m_th(th),
+ m_tb(tb),
m_rcb(rcb),
m_arq(arq)
{
@@ -44,12 +61,12 @@ QueueAsyncContext::QueueAsyncContext(boost::shared_ptr<PersistableQueue> q,
QueueAsyncContext::QueueAsyncContext(boost::shared_ptr<PersistableQueue> q,
boost::intrusive_ptr<PersistableMessage> msg,
- TxnHandle& th,
+ TxnBuffer* tb,
AsyncResultCallback rcb,
AsyncResultQueue* const arq) :
m_q(q),
m_msg(msg),
- m_th(th),
+ m_tb(tb),
m_rcb(rcb),
m_arq(arq)
{
@@ -72,10 +89,9 @@ QueueAsyncContext::getMessage() const
return m_msg;
}
-TxnHandle
-QueueAsyncContext::getTxnHandle() const
-{
- return m_th;
+TxnBuffer*
+QueueAsyncContext::getTxnBuffer() const {
+ return m_tb;
}
AsyncResultQueue*
diff --git a/cpp/src/qpid/broker/QueueAsyncContext.h b/cpp/src/qpid/broker/QueueAsyncContext.h
index 34fd63fd06..e9ba2ebbac 100644
--- a/cpp/src/qpid/broker/QueueAsyncContext.h
+++ b/cpp/src/qpid/broker/QueueAsyncContext.h
@@ -45,18 +45,25 @@ class QueueAsyncContext: public BrokerAsyncContext
{
public:
QueueAsyncContext(boost::shared_ptr<PersistableQueue> q,
- TxnHandle& th,
AsyncResultCallback rcb,
AsyncResultQueue* const arq);
QueueAsyncContext(boost::shared_ptr<PersistableQueue> q,
boost::intrusive_ptr<PersistableMessage> msg,
- TxnHandle& th,
+ AsyncResultCallback rcb,
+ AsyncResultQueue* const arq);
+ QueueAsyncContext(boost::shared_ptr<PersistableQueue> q,
+ TxnBuffer* tb,
+ AsyncResultCallback rcb,
+ AsyncResultQueue* const arq);
+ QueueAsyncContext(boost::shared_ptr<PersistableQueue> q,
+ boost::intrusive_ptr<PersistableMessage> msg,
+ TxnBuffer* tb,
AsyncResultCallback rcb,
AsyncResultQueue* const arq);
virtual ~QueueAsyncContext();
boost::shared_ptr<PersistableQueue> getQueue() const;
boost::intrusive_ptr<PersistableMessage> getMessage() const;
- TxnHandle getTxnHandle() const;
+ TxnBuffer* getTxnBuffer() const;
AsyncResultQueue* getAsyncResultQueue() const;
AsyncResultCallback getAsyncResultCallback() const;
void invokeCallback(const AsyncResultHandle* const arh) const;
@@ -65,7 +72,7 @@ public:
private:
boost::shared_ptr<PersistableQueue> m_q;
boost::intrusive_ptr<PersistableMessage> m_msg;
- TxnHandle m_th; // TODO: get rid of this when tests::storePerftools::asyncPerf::SimpleQueue has solved its TxnHandle issues.
+ TxnBuffer* m_tb;
AsyncResultCallback m_rcb;
AsyncResultQueue* const m_arq;
};
diff --git a/cpp/src/qpid/broker/QueueHandle.cpp b/cpp/src/qpid/broker/QueueHandle.cpp
index 5a5678df5a..dffb262a3b 100644
--- a/cpp/src/qpid/broker/QueueHandle.cpp
+++ b/cpp/src/qpid/broker/QueueHandle.cpp
@@ -56,6 +56,7 @@ QueueHandle::operator=(const QueueHandle& r)
}
// --- QueueHandleImpl methods ---
+
const std::string&
QueueHandle::getName() const
{
diff --git a/cpp/src/qpid/broker/QueueHandle.h b/cpp/src/qpid/broker/QueueHandle.h
index 234c5e15e8..1110367418 100644
--- a/cpp/src/qpid/broker/QueueHandle.h
+++ b/cpp/src/qpid/broker/QueueHandle.h
@@ -45,7 +45,7 @@ public:
~QueueHandle();
QueueHandle& operator=(const QueueHandle& r);
- // QueueHandleImpl methods
+ // --- QueueHandleImpl methods ---
const std::string& getName() const;
private:
diff --git a/cpp/src/qpid/broker/TxnBuffer.cpp b/cpp/src/qpid/broker/TxnBuffer.cpp
index f85301e036..4d6e7b7918 100644
--- a/cpp/src/qpid/broker/TxnBuffer.cpp
+++ b/cpp/src/qpid/broker/TxnBuffer.cpp
@@ -29,31 +29,86 @@
#include "qpid/log/Statement.h"
+#include <uuid/uuid.h>
+
namespace qpid {
namespace broker {
+qpid::sys::Mutex TxnBuffer::s_uuidMutex;
+
TxnBuffer::TxnBuffer(AsyncResultQueue& arq) :
m_store(0),
m_resultQueue(arq),
+ m_tpcFlag(false),
+ m_submitOpCnt(0),
+ m_completeOpCnt(0),
m_state(NONE)
-{}
+{
+ createLocalXid();
+}
-TxnBuffer::~TxnBuffer()
-{}
+TxnBuffer::TxnBuffer(AsyncResultQueue& arq, std::string& xid) :
+ m_store(0),
+ m_resultQueue(arq),
+ m_xid(xid),
+ m_tpcFlag(!xid.empty()),
+ m_submitOpCnt(0),
+ m_completeOpCnt(0),
+ m_state(NONE)
+{
+ if (m_xid.empty()) {
+ createLocalXid();
+ }
+}
+
+TxnBuffer::~TxnBuffer() {}
+
+TxnHandle&
+TxnBuffer::getTxnHandle() {
+ return m_txnHandle;
+}
+
+const std::string&
+TxnBuffer::getXid() const {
+ return m_xid;
+}
+
+bool
+TxnBuffer::is2pc() const {
+ return m_tpcFlag;
+}
void
-TxnBuffer::enlist(boost::shared_ptr<TxnOp> op)
-{
+TxnBuffer::incrOpCnt() {
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_submitOpCntMutex);
+ ++m_submitOpCnt;
+}
+
+void
+TxnBuffer::decrOpCnt() {
+ const uint32_t numOps = getNumOps();
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l2(m_completeOpCntMutex);
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l3(m_submitOpCntMutex);
+ if (m_completeOpCnt == m_submitOpCnt) {
+ throw qpid::Exception("Transaction async operation count underflow");
+ }
+ ++m_completeOpCnt;
+ if (numOps == m_submitOpCnt && numOps == m_completeOpCnt) {
+ asyncLocalCommit();
+ }
+}
+
+void
+TxnBuffer::enlist(boost::shared_ptr<TxnOp> op) {
qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_opsMutex);
m_ops.push_back(op);
}
bool
-TxnBuffer::prepare(TxnHandle& th)
-{
+TxnBuffer::prepare() {
qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_opsMutex);
for(std::vector<boost::shared_ptr<TxnOp> >::iterator i = m_ops.begin(); i != m_ops.end(); ++i) {
- if (!(*i)->prepare(th)) {
+ if (!(*i)->prepare(this)) {
return false;
}
}
@@ -61,8 +116,7 @@ TxnBuffer::prepare(TxnHandle& th)
}
void
-TxnBuffer::commit()
-{
+TxnBuffer::commit() {
qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_opsMutex);
for(std::vector<boost::shared_ptr<TxnOp> >::iterator i = m_ops.begin(); i != m_ops.end(); ++i) {
(*i)->commit();
@@ -71,8 +125,7 @@ TxnBuffer::commit()
}
void
-TxnBuffer::rollback()
-{
+TxnBuffer::rollback() {
qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_opsMutex);
for(std::vector<boost::shared_ptr<TxnOp> >::iterator i = m_ops.begin(); i != m_ops.end(); ++i) {
(*i)->rollback();
@@ -81,45 +134,44 @@ TxnBuffer::rollback()
}
bool
-TxnBuffer::commitLocal(AsyncTransactionalStore* const store)
-{
- if (store) {
- try {
- m_store = store;
- asyncLocalCommit();
- } catch (std::exception& e) {
- QPID_LOG(error, "TxnBuffer::commitLocal: Commit failed: " << e.what());
- } catch (...) {
- QPID_LOG(error, "TxnBuffer::commitLocal: Commit failed (unknown exception)");
- }
+TxnBuffer::commitLocal(AsyncTransactionalStore* const store) {
+ try {
+ m_store = store;
+ asyncLocalCommit();
+ } catch (std::exception& e) {
+ QPID_LOG(error, "TxnBuffer::commitLocal: Commit failed: " << e.what());
+ } catch (...) {
+ QPID_LOG(error, "TxnBuffer::commitLocal: Commit failed (unknown exception)");
}
return false;
}
void
-TxnBuffer::asyncLocalCommit()
-{
- assert(m_store != 0);
+TxnBuffer::asyncLocalCommit() {
switch(m_state) {
case NONE:
m_state = PREPARE;
- m_txnHandle = m_store->createTxnHandle(this);
- prepare(m_txnHandle);
- break;
+ if (m_store) {
+ m_txnHandle = m_store->createTxnHandle(this);
+ }
+ prepare(/*shared_from_this()*/);
+ if (m_store) {
+ break;
+ }
case PREPARE:
m_state = COMMIT;
- {
+ if (m_store) {
boost::shared_ptr<TxnAsyncContext> tac(new TxnAsyncContext(this,
&handleAsyncCommitResult,
&m_resultQueue));
m_store->testOp();
m_store->submitCommit(m_txnHandle, tac);
+ break;
}
- break;
case COMMIT:
commit();
m_state = COMPLETE;
- delete this; // TODO: ugly! Find a better way to handle the life cycle of this class
+ delete this;
break;
case COMPLETE:
default: ;
@@ -142,8 +194,7 @@ TxnBuffer::handleAsyncCommitResult(const AsyncResultHandle* const arh) {
}
void
-TxnBuffer::asyncLocalAbort()
-{
+TxnBuffer::asyncLocalAbort() {
assert(m_store != 0);
switch (m_state) {
case NONE:
@@ -160,7 +211,7 @@ TxnBuffer::asyncLocalAbort()
case ROLLBACK:
rollback();
m_state = COMPLETE;
- delete this; // TODO: ugly! Find a better way to handle the life cycle of this class
+ delete this;
default: ;
}
}
@@ -171,11 +222,33 @@ TxnBuffer::handleAsyncAbortResult(const AsyncResultHandle* const arh) {
if (arh) {
boost::shared_ptr<TxnAsyncContext> tac = boost::dynamic_pointer_cast<TxnAsyncContext>(arh->getBrokerAsyncContext());
if (arh->getErrNo()) {
- QPID_LOG(error, "TxnBuffer::handleAsyncAbortResult: Transactional operation " << tac->getOpStr() << " failed: err=" << arh->getErrNo()
- << " (" << arh->getErrMsg() << ")");
+ QPID_LOG(error, "TxnBuffer::handleAsyncAbortResult: Transactional operation " << tac->getOpStr()
+ << " failed: err=" << arh->getErrNo() << " (" << arh->getErrMsg() << ")");
}
tac->getTxnBuffer()->asyncLocalAbort();
}
}
+// private
+uint32_t
+TxnBuffer::getNumOps() const {
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_opsMutex);
+ return m_ops.size();
+}
+
+// private
+void
+TxnBuffer::createLocalXid()
+{
+ uuid_t uuid;
+ {
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l(s_uuidMutex);
+ ::uuid_generate_random(uuid); // Not thread-safe
+ }
+ char uuidStr[37]; // 36-char uuid + trailing '\0'
+ ::uuid_unparse(uuid, uuidStr);
+ m_xid.assign(uuidStr);
+ QPID_LOG(debug, "Local XID created: \"" << m_xid << "\"");
+}
+
}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/TxnBuffer.h b/cpp/src/qpid/broker/TxnBuffer.h
index 7b85a1b6c4..02569f6545 100644
--- a/cpp/src/qpid/broker/TxnBuffer.h
+++ b/cpp/src/qpid/broker/TxnBuffer.h
@@ -42,10 +42,16 @@ class TxnOp;
class TxnBuffer {
public:
TxnBuffer(AsyncResultQueue& arq);
+ TxnBuffer(AsyncResultQueue& arq, std::string& xid);
virtual ~TxnBuffer();
+ TxnHandle& getTxnHandle();
+ const std::string& getXid() const;
+ bool is2pc() const;
+ void incrOpCnt();
+ void decrOpCnt();
void enlist(boost::shared_ptr<TxnOp> op);
- bool prepare(TxnHandle& th);
+ bool prepare();
void commit();
void rollback();
bool commitLocal(AsyncTransactionalStore* const store);
@@ -57,14 +63,25 @@ public:
static void handleAsyncAbortResult(const AsyncResultHandle* const arh);
private:
+ mutable qpid::sys::Mutex m_opsMutex;
+ mutable qpid::sys::Mutex m_submitOpCntMutex;
+ mutable qpid::sys::Mutex m_completeOpCntMutex;
+ static qpid::sys::Mutex s_uuidMutex;
+
std::vector<boost::shared_ptr<TxnOp> > m_ops;
- qpid::sys::Mutex m_opsMutex;
TxnHandle m_txnHandle;
AsyncTransactionalStore* m_store;
AsyncResultQueue& m_resultQueue;
+ std::string m_xid;
+ bool m_tpcFlag;
+ uint32_t m_submitOpCnt;
+ uint32_t m_completeOpCnt;
typedef enum {NONE = 0, PREPARE, COMMIT, ROLLBACK, COMPLETE} e_txnState;
e_txnState m_state;
+
+ uint32_t getNumOps() const;
+ void createLocalXid();
};
}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/TxnHandle.cpp b/cpp/src/qpid/broker/TxnHandle.cpp
index 58cedd586e..f8977b2132 100644
--- a/cpp/src/qpid/broker/TxnHandle.cpp
+++ b/cpp/src/qpid/broker/TxnHandle.cpp
@@ -57,28 +57,5 @@ TxnHandle::operator=(const TxnHandle& r)
// --- TxnHandleImpl methods ---
-const std::string&
-TxnHandle::getXid() const
-{
- return impl->getXid();
-}
-
-bool
-TxnHandle::is2pc() const
-{
- return impl->is2pc();
-}
-
-void
-TxnHandle::incrOpCnt()
-{
- impl->incrOpCnt();
-}
-
-void
-TxnHandle::decrOpCnt()
-{
- impl->decrOpCnt();
-}
}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/TxnHandle.h b/cpp/src/qpid/broker/TxnHandle.h
index 7302490939..f55d5b01d8 100644
--- a/cpp/src/qpid/broker/TxnHandle.h
+++ b/cpp/src/qpid/broker/TxnHandle.h
@@ -45,11 +45,8 @@ public:
~TxnHandle();
TxnHandle& operator=(const TxnHandle& r);
- // TxnHandleImpl methods
- const std::string& getXid() const;
- bool is2pc() const;
- void incrOpCnt();
- void decrOpCnt();
+ // --- TxnHandleImpl methods ---
+ // <none>
private:
friend class PrivateImplRef<TxnHandle>;
diff --git a/cpp/src/qpid/broker/TxnOp.h b/cpp/src/qpid/broker/TxnOp.h
index 1626e30ccd..bcff87551c 100644
--- a/cpp/src/qpid/broker/TxnOp.h
+++ b/cpp/src/qpid/broker/TxnOp.h
@@ -24,15 +24,17 @@
#ifndef qpid_broker_TxnOp_h_
#define qpid_broker_TxnOp_h_
+#include <boost/shared_ptr.hpp>
+
namespace qpid {
namespace broker {
-class TxnHandle;
+class TxnBuffer;
class TxnOp{
public:
virtual ~TxnOp() {}
- virtual bool prepare(TxnHandle& th) throw() = 0;
+ virtual bool prepare(qpid::broker::TxnBuffer*) throw() = 0;
virtual void commit() throw() = 0;
virtual void rollback() throw() = 0;
};
diff --git a/cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.cpp b/cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.cpp
index e1c67a9547..6f33369a26 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.cpp
+++ b/cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.cpp
@@ -82,9 +82,9 @@ DeliveryRecord::isRedundant() const
}
void
-DeliveryRecord::dequeue(qpid::broker::TxnHandle& txn)
+DeliveryRecord::dequeue(qpid::broker::TxnBuffer* tb)
{
- m_queuedMessage->getQueue()->dequeue(txn, m_queuedMessage);
+ m_queuedMessage->getQueue()->dequeue(tb, m_queuedMessage);
}
void
diff --git a/cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.h b/cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.h
index d4529941e7..6c5d87f374 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.h
+++ b/cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.h
@@ -30,7 +30,7 @@
namespace qpid {
namespace broker {
-class TxnHandle;
+class TxnBuffer;
}}
namespace tests {
@@ -51,7 +51,7 @@ public:
bool setEnded();
bool isEnded() const;
bool isRedundant() const;
- void dequeue(qpid::broker::TxnHandle& txn);
+ void dequeue(qpid::broker::TxnBuffer* tb);
void committed() const;
boost::shared_ptr<QueuedMessage> getQueuedMessage() const;
private:
diff --git a/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp b/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp
index 4a2bc2bf0c..6aa477c470 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp
+++ b/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp
@@ -47,22 +47,18 @@ MessageConsumer::MessageConsumer(const TestOptions& perfTestParams,
m_queue(queue)
{}
-MessageConsumer::~MessageConsumer()
-{}
+MessageConsumer::~MessageConsumer() {}
void
-MessageConsumer::record(boost::shared_ptr<DeliveryRecord> dr)
-{
+MessageConsumer::record(boost::shared_ptr<DeliveryRecord> dr) {
m_unacked.push_back(dr);
}
void
-MessageConsumer::commitComplete()
-{}
+MessageConsumer::commitComplete() {}
void*
-MessageConsumer::runConsumers()
-{
+MessageConsumer::runConsumers() {
const bool useTxns = m_perfTestParams.m_deqTxnBlockSize > 0U;
uint16_t opsInTxnCnt = 0U;
qpid::broker::TxnBuffer* tb = 0;
@@ -78,17 +74,13 @@ MessageConsumer::runConsumers()
++numMsgs;
if (useTxns) {
// --- Transactional dequeue ---
+ boost::shared_ptr<TxnAccept> ta(new TxnAccept(m_unacked));
+ m_unacked.clear();
+ tb->enlist(ta);
if (++opsInTxnCnt >= m_perfTestParams.m_deqTxnBlockSize) {
- if (m_perfTestParams.m_durable) {
- boost::shared_ptr<TxnAccept> ta(new TxnAccept(m_unacked));
- m_unacked.clear();
- tb->enlist(ta);
- tb->commitLocal(m_store);
- if (numMsgs < m_perfTestParams.m_numMsgs) {
- tb = new qpid::broker::TxnBuffer(m_resultQueue);
- }
- } else {
- tb->commit();
+ tb->commitLocal(m_store);
+ if (numMsgs < m_perfTestParams.m_numMsgs) {
+ tb = new qpid::broker::TxnBuffer(m_resultQueue);
}
opsInTxnCnt = 0U;
}
@@ -105,11 +97,7 @@ MessageConsumer::runConsumers()
}
if (opsInTxnCnt) {
- if (m_perfTestParams.m_durable) {
- tb->commitLocal(m_store);
- } else {
- tb->commit();
- }
+ tb->commitLocal(m_store);
}
return reinterpret_cast<void*>(0);
@@ -117,8 +105,7 @@ MessageConsumer::runConsumers()
//static
void*
-MessageConsumer::startConsumers(void* ptr)
-{
+MessageConsumer::startConsumers(void* ptr) {
return reinterpret_cast<MessageConsumer*>(ptr)->runConsumers();
}
diff --git a/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.cpp b/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.cpp
index 7d9aaceb11..974f3f3981 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.cpp
+++ b/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.cpp
@@ -49,12 +49,10 @@ MessageProducer::MessageProducer(const TestOptions& perfTestParams,
m_queue(queue)
{}
-MessageProducer::~MessageProducer()
-{}
+MessageProducer::~MessageProducer() {}
void*
-MessageProducer::runProducers()
-{
+MessageProducer::runProducers() {
const bool useTxns = m_perfTestParams.m_enqTxnBlockSize > 0U;
uint16_t recsInTxnCnt = 0U;
qpid::broker::TxnBuffer* tb = 0;
@@ -68,17 +66,13 @@ MessageProducer::runProducers()
op->deliverTo(m_queue);
tb->enlist(op);
if (++recsInTxnCnt >= m_perfTestParams.m_enqTxnBlockSize) {
- if (m_perfTestParams.m_durable) {
- tb->commitLocal(m_store);
+ tb->commitLocal(m_store);
- // TxnBuffer instance tb carries async state that precludes it being re-used for the next
- // transaction until the current commit cycle completes. So use another instance. This
- // instance should auto-delete when the async commit cycle completes.
- if ((numMsgs + 1) < m_perfTestParams.m_numMsgs) {
- tb = new qpid::broker::TxnBuffer(m_resultQueue);
- }
- } else {
- tb->commit();
+ // TxnBuffer instance tb carries async state that precludes it being re-used for the next
+ // transaction until the current commit cycle completes. So use another instance. This
+ // instance should auto-delete when the async commit cycle completes.
+ if ((numMsgs + 1) < m_perfTestParams.m_numMsgs) {
+ tb = new qpid::broker::TxnBuffer(m_resultQueue);
}
recsInTxnCnt = 0U;
}
@@ -87,11 +81,7 @@ MessageProducer::runProducers()
}
}
if (recsInTxnCnt) {
- if (m_perfTestParams.m_durable) {
- tb->commitLocal(m_store);
- } else {
- tb->commit();
- }
+ tb->commitLocal(m_store);
}
return reinterpret_cast<void*>(0);
}
diff --git a/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.h b/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.h
index 7fa74a2c51..127408e3db 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.h
+++ b/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.h
@@ -32,6 +32,7 @@ class AsyncStoreImpl;
}
namespace broker {
class AsyncResultQueue;
+class TxnBuffer;
}}
namespace tests {
@@ -40,7 +41,6 @@ namespace asyncPerf {
class SimpleQueue;
class TestOptions;
-class TxnBuffer;
class MessageProducer
{
diff --git a/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.cpp b/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.cpp
index 572089faaf..0d16248c7f 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.cpp
+++ b/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.cpp
@@ -89,9 +89,9 @@ QueuedMessage::enqHandle()
}
void
-QueuedMessage::prepareEnqueue(qpid::broker::TxnHandle& th)
+QueuedMessage::prepareEnqueue(qpid::broker::TxnBuffer* tb)
{
- m_queue->enqueue(th, shared_from_this());
+ m_queue->enqueue(tb, shared_from_this());
}
void
diff --git a/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.h b/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.h
index dd10f8b501..630fe1aedc 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.h
+++ b/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.h
@@ -59,7 +59,7 @@ public:
qpid::broker::EnqueueHandle& enqHandle();
// --- Transaction handling ---
- void prepareEnqueue(qpid::broker::TxnHandle& th);
+ void prepareEnqueue(qpid::broker::TxnBuffer* tb);
void commitEnqueue();
void abortEnqueue();
diff --git a/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp b/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp
index f297e83402..06b4e9333f 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp
+++ b/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp
@@ -31,7 +31,7 @@
#include "qpid/broker/AsyncResultHandle.h"
#include "qpid/broker/QueueAsyncContext.h"
-#include "qpid/broker/TxnHandle.h"
+#include "qpid/broker/TxnBuffer.h"
#include <string.h> // memcpy()
@@ -65,33 +65,27 @@ SimpleQueue::SimpleQueue(const std::string& name,
}
}
-SimpleQueue::~SimpleQueue()
-{}
+SimpleQueue::~SimpleQueue() {}
const qpid::broker::QueueHandle&
-SimpleQueue::getHandle() const
-{
+SimpleQueue::getHandle() const {
return m_queueHandle;
}
qpid::broker::QueueHandle&
-SimpleQueue::getHandle()
-{
+SimpleQueue::getHandle() {
return m_queueHandle;
}
qpid::broker::AsyncStore*
-SimpleQueue::getStore()
-{
+SimpleQueue::getStore() {
return m_store;
}
void
-SimpleQueue::asyncCreate()
-{
+SimpleQueue::asyncCreate() {
if (m_store) {
boost::shared_ptr<qpid::broker::QueueAsyncContext> qac(new qpid::broker::QueueAsyncContext(shared_from_this(),
- s_nullTxnHandle,
&handleAsyncCreateResult,
&m_resultQueue));
m_store->submitCreate(m_queueHandle, this, qac);
@@ -123,7 +117,6 @@ SimpleQueue::asyncDestroy(const bool deleteQueue)
if (m_store) {
if (deleteQueue) {
boost::shared_ptr<qpid::broker::QueueAsyncContext> qac(new qpid::broker::QueueAsyncContext(shared_from_this(),
- s_nullTxnHandle,
&handleAsyncDestroyResult,
&m_resultQueue));
m_store->submitDestroy(m_queueHandle, qac);
@@ -151,16 +144,14 @@ SimpleQueue::handleAsyncDestroyResult(const qpid::broker::AsyncResultHandle* con
}
void
-SimpleQueue::deliver(boost::intrusive_ptr<SimpleMessage> msg)
-{
+SimpleQueue::deliver(boost::intrusive_ptr<SimpleMessage> msg) {
boost::shared_ptr<QueuedMessage> qm(boost::shared_ptr<QueuedMessage>(new QueuedMessage(this, msg)));
- enqueue(s_nullTxnHandle, qm);
+ enqueue(qm);
push(qm);
}
bool
-SimpleQueue::dispatch(MessageConsumer& mc)
-{
+SimpleQueue::dispatch(MessageConsumer& mc) {
boost::shared_ptr<QueuedMessage> qm;
if (m_messages->consume(qm)) {
boost::shared_ptr<DeliveryRecord> dr(new DeliveryRecord(qm, mc, false));
@@ -171,110 +162,95 @@ SimpleQueue::dispatch(MessageConsumer& mc)
}
bool
-SimpleQueue::enqueue(boost::shared_ptr<QueuedMessage> qm)
-{
- return enqueue(s_nullTxnHandle, qm);
+SimpleQueue::enqueue(boost::shared_ptr<QueuedMessage> qm) {
+ return enqueue(0, qm);
}
bool
-SimpleQueue::enqueue(qpid::broker::TxnHandle& th,
- boost::shared_ptr<QueuedMessage> qm)
-{
+SimpleQueue::enqueue(qpid::broker::TxnBuffer* tb,
+ boost::shared_ptr<QueuedMessage> qm) {
ScopedUse u(m_barrier);
if (!u.m_acquired) {
return false;
}
if (qm->payload()->isPersistent() && m_store) {
qm->payload()->enqueueAsync(shared_from_this(), m_store);
- return asyncEnqueue(th, qm);
+ return asyncEnqueue(tb, qm);
}
return false;
}
bool
-SimpleQueue::dequeue(boost::shared_ptr<QueuedMessage> qm)
-{
- return dequeue(s_nullTxnHandle, qm);
+SimpleQueue::dequeue(boost::shared_ptr<QueuedMessage> qm) {
+ return dequeue(0, qm);
}
bool
-SimpleQueue::dequeue(qpid::broker::TxnHandle& th,
- boost::shared_ptr<QueuedMessage> qm)
-{
+SimpleQueue::dequeue(qpid::broker::TxnBuffer* tb,
+ boost::shared_ptr<QueuedMessage> qm) {
ScopedUse u(m_barrier);
if (!u.m_acquired) {
return false;
}
if (qm->payload()->isPersistent() && m_store) {
qm->payload()->dequeueAsync(shared_from_this(), m_store);
- return asyncDequeue(th, qm);
+ return asyncDequeue(tb, qm);
}
return true;
}
void
-SimpleQueue::process(boost::intrusive_ptr<SimpleMessage> msg)
-{
+SimpleQueue::process(boost::intrusive_ptr<SimpleMessage> msg) {
push(boost::shared_ptr<QueuedMessage>(new QueuedMessage(this, msg)));
}
void
-SimpleQueue::enqueueAborted(boost::intrusive_ptr<SimpleMessage> /*msg*/)
-{}
+SimpleQueue::enqueueAborted(boost::intrusive_ptr<SimpleMessage>) {}
void
-SimpleQueue::encode(qpid::framing::Buffer& buffer) const
-{
+SimpleQueue::encode(qpid::framing::Buffer& buffer) const {
buffer.putShortString(m_name);
}
uint32_t
-SimpleQueue::encodedSize() const
-{
+SimpleQueue::encodedSize() const {
return m_name.size() + 1;
}
uint64_t
-SimpleQueue::getPersistenceId() const
-{
+SimpleQueue::getPersistenceId() const {
return m_persistenceId;
}
void
-SimpleQueue::setPersistenceId(uint64_t persistenceId) const
-{
+SimpleQueue::setPersistenceId(uint64_t persistenceId) const {
m_persistenceId = persistenceId;
}
void
-SimpleQueue::flush()
-{
+SimpleQueue::flush() {
//if(m_store) m_store->flush(*this);
}
const std::string&
-SimpleQueue::getName() const
-{
+SimpleQueue::getName() const {
return m_name;
}
void
-SimpleQueue::setExternalQueueStore(qpid::broker::ExternalQueueStore* inst)
-{
+SimpleQueue::setExternalQueueStore(qpid::broker::ExternalQueueStore* inst) {
if (externalQueueStore != inst && externalQueueStore)
delete externalQueueStore;
externalQueueStore = inst;
}
uint64_t
-SimpleQueue::getSize()
-{
+SimpleQueue::getSize() {
return m_persistableData.size();
}
void
-SimpleQueue::write(char* target)
-{
+SimpleQueue::write(char* target) {
::memcpy(target, m_persistableData.data(), m_persistableData.size());
}
@@ -344,21 +320,20 @@ SimpleQueue::push(boost::shared_ptr<QueuedMessage> qm,
// private
bool
-SimpleQueue::asyncEnqueue(qpid::broker::TxnHandle& th,
- boost::shared_ptr<QueuedMessage> qm)
-{
+SimpleQueue::asyncEnqueue(qpid::broker::TxnBuffer* tb,
+ boost::shared_ptr<QueuedMessage> qm) {
assert(qm.get());
-// qm.payload()->setPersistenceId(m_store->getNextRid()); // TODO: rid is set by store itself - find way to do this
boost::shared_ptr<qpid::broker::QueueAsyncContext> qac(new qpid::broker::QueueAsyncContext(shared_from_this(),
qm->payload(),
- th,
+ tb,
&handleAsyncEnqueueResult,
&m_resultQueue));
- // TODO : This must be done from inside store, not here (the txn handle is opaque outside the store)
- if (th.isValid()) {
- th.incrOpCnt();
+ if (tb) {
+ tb->incrOpCnt();
+ m_store->submitEnqueue(qm->enqHandle(), tb->getTxnHandle(), qac);
+ } else {
+ m_store->submitEnqueue(qm->enqHandle(), s_nullTxnHandle, qac);
}
- m_store->submitEnqueue(qm->enqHandle(), th, qac);
++m_asyncOpCounter;
return true;
}
@@ -382,22 +357,21 @@ SimpleQueue::handleAsyncEnqueueResult(const qpid::broker::AsyncResultHandle* con
// private
bool
-SimpleQueue::asyncDequeue(qpid::broker::TxnHandle& th,
+SimpleQueue::asyncDequeue(/*boost::shared_ptr<qpid::broker::TxnBuffer>*/qpid::broker::TxnBuffer* tb,
boost::shared_ptr<QueuedMessage> qm)
{
assert(qm.get());
boost::shared_ptr<qpid::broker::QueueAsyncContext> qac(new qpid::broker::QueueAsyncContext(shared_from_this(),
qm->payload(),
- th,
+ tb,
&handleAsyncDequeueResult,
&m_resultQueue));
- // TODO : This must be done from inside store, not here (the txn handle is opaque outside the store)
- if (th.isValid()) {
- th.incrOpCnt();
+ if (tb) {
+ tb->incrOpCnt();
+ m_store->submitDequeue(qm->enqHandle(), tb->getTxnHandle(), qac);
+ } else {
+ m_store->submitDequeue(qm->enqHandle(), s_nullTxnHandle, qac);
}
- m_store->submitDequeue(qm->enqHandle(),
- th,
- qac);
++m_asyncOpCounter;
return true;
}
@@ -420,8 +394,7 @@ SimpleQueue::handleAsyncDequeueResult(const qpid::broker::AsyncResultHandle* con
// private
void
-SimpleQueue::destroyCheck(const std::string& opDescr) const
-{
+SimpleQueue::destroyCheck(const std::string& opDescr) const {
if (m_destroyPending || m_destroyed) {
std::ostringstream oss;
oss << opDescr << " on queue \"" << m_name << "\" after call to destroy";
@@ -431,55 +404,54 @@ SimpleQueue::destroyCheck(const std::string& opDescr) const
// private
void
-SimpleQueue::createComplete(const boost::shared_ptr<qpid::broker::QueueAsyncContext> qc)
-{
- assert(qc->getQueue().get() == this);
+SimpleQueue::createComplete(const boost::shared_ptr<qpid::broker::QueueAsyncContext> qc) {
+ if (qc.get()) {
+ assert(qc->getQueue().get() == this);
+ }
--m_asyncOpCounter;
}
// private
void
-SimpleQueue::flushComplete(const boost::shared_ptr<qpid::broker::QueueAsyncContext> qc)
-{
- assert(qc->getQueue().get() == this);
+SimpleQueue::flushComplete(const boost::shared_ptr<qpid::broker::QueueAsyncContext> qc) {
+ if (qc.get()) {
+ assert(qc->getQueue().get() == this);
+ }
--m_asyncOpCounter;
}
// private
void
-SimpleQueue::destroyComplete(const boost::shared_ptr<qpid::broker::QueueAsyncContext> qc)
-{
- assert(qc->getQueue().get() == this);
+SimpleQueue::destroyComplete(const boost::shared_ptr<qpid::broker::QueueAsyncContext> qc) {
+ if (qc.get()) {
+ assert(qc->getQueue().get() == this);
+ }
--m_asyncOpCounter;
m_destroyed = true;
}
// private
void
-SimpleQueue::enqueueComplete(const boost::shared_ptr<qpid::broker::QueueAsyncContext> qc)
-{
- assert(qc->getQueue().get() == this);
- --m_asyncOpCounter;
-
- // TODO : This must be done from inside store, not here (the txn handle is opaque outside the store)
- qpid::broker::TxnHandle th = qc->getTxnHandle();
- if (th.isValid()) { // transactional enqueue
- th.decrOpCnt();
+SimpleQueue::enqueueComplete(const boost::shared_ptr<qpid::broker::QueueAsyncContext> qc) {
+ if (qc.get()) {
+ assert(qc->getQueue().get() == this);
+ if (qc->getTxnBuffer()) { // transactional enqueue
+ qc->getTxnBuffer()->decrOpCnt();
+ }
}
+ --m_asyncOpCounter;
}
// private
void
-SimpleQueue::dequeueComplete(const boost::shared_ptr<qpid::broker::QueueAsyncContext> qc)
-{
- assert(qc->getQueue().get() == this);
- --m_asyncOpCounter;
-
- // TODO : This must be done from inside store, not here (the txn handle is opaque outside the store)
- qpid::broker::TxnHandle th = qc->getTxnHandle();
- if (th.isValid()) { // transactional enqueue
- th.decrOpCnt();
+SimpleQueue::dequeueComplete(const boost::shared_ptr<qpid::broker::QueueAsyncContext> qc) {
+ if (qc.get()) {
+ assert(qc->getQueue().get() == this);
+ if (qc->getTxnBuffer()) { // transactional enqueue
+ qc->getTxnBuffer()->decrOpCnt();
+ }
}
+ --m_asyncOpCounter;
}
}}} // namespace tests::storePerftools::asyncPerf
diff --git a/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.h b/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.h
index bf88e32345..5f64c9b960 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.h
+++ b/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.h
@@ -38,6 +38,7 @@ namespace qpid {
namespace broker {
class AsyncResultQueue;
class QueueAsyncContext;
+class TxnBuffer;
}
namespace framing {
class FieldTable;
@@ -76,10 +77,10 @@ public:
void deliver(boost::intrusive_ptr<SimpleMessage> msg);
bool dispatch(MessageConsumer& mc);
bool enqueue(boost::shared_ptr<QueuedMessage> qm);
- bool enqueue(qpid::broker::TxnHandle& th,
+ bool enqueue(qpid::broker::TxnBuffer* tb,
boost::shared_ptr<QueuedMessage> qm);
bool dequeue(boost::shared_ptr<QueuedMessage> qm);
- bool dequeue(qpid::broker::TxnHandle& th,
+ bool dequeue(qpid::broker::TxnBuffer* tb,
boost::shared_ptr<QueuedMessage> qm);
void process(boost::intrusive_ptr<SimpleMessage> msg);
void enqueueAborted(boost::intrusive_ptr<SimpleMessage> msg);
@@ -134,10 +135,10 @@ private:
bool isRecovery = false);
// -- Async ops ---
- bool asyncEnqueue(qpid::broker::TxnHandle& th,
+ bool asyncEnqueue(qpid::broker::TxnBuffer* tb,
boost::shared_ptr<QueuedMessage> qm);
static void handleAsyncEnqueueResult(const qpid::broker::AsyncResultHandle* const arh);
- bool asyncDequeue(qpid::broker::TxnHandle& th,
+ bool asyncDequeue(qpid::broker::TxnBuffer* tb,
boost::shared_ptr<QueuedMessage> qm);
static void handleAsyncDequeueResult(const qpid::broker::AsyncResultHandle* const arh);
diff --git a/cpp/src/tests/storePerftools/asyncPerf/TxnAccept.cpp b/cpp/src/tests/storePerftools/asyncPerf/TxnAccept.cpp
index 7bede50272..375cd568d2 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/TxnAccept.cpp
+++ b/cpp/src/tests/storePerftools/asyncPerf/TxnAccept.cpp
@@ -35,18 +35,17 @@ TxnAccept::TxnAccept(std::deque<boost::shared_ptr<DeliveryRecord> >& ops) :
m_ops(ops)
{}
-TxnAccept::~TxnAccept()
-{}
+TxnAccept::~TxnAccept() {}
// --- Interface TxnOp ---
bool
-TxnAccept::prepare(qpid::broker::TxnHandle& th) throw()
-{
+TxnAccept::prepare(qpid::broker::TxnBuffer* tb) throw() {
try {
for (std::deque<boost::shared_ptr<DeliveryRecord> >::iterator i = m_ops.begin(); i != m_ops.end(); ++i) {
- (*i)->dequeue(th);
+ (*i)->dequeue(tb);
}
+ return true;
} catch (const std::exception& e) {
QPID_LOG(error, "TxnAccept: Failed to prepare transaction: " << e.what());
} catch (...) {
@@ -56,8 +55,7 @@ TxnAccept::prepare(qpid::broker::TxnHandle& th) throw()
}
void
-TxnAccept::commit() throw()
-{
+TxnAccept::commit() throw() {
try {
for (std::deque<boost::shared_ptr<DeliveryRecord> >::iterator i=m_ops.begin(); i!=m_ops.end(); ++i) {
(*i)->committed();
@@ -71,7 +69,6 @@ TxnAccept::commit() throw()
}
void
-TxnAccept::rollback() throw()
-{}
+TxnAccept::rollback() throw() {}
}}} // namespace tests::storePerftools::asyncPerf
diff --git a/cpp/src/tests/storePerftools/asyncPerf/TxnAccept.h b/cpp/src/tests/storePerftools/asyncPerf/TxnAccept.h
index 6bc7ff9ccb..5d84289965 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/TxnAccept.h
+++ b/cpp/src/tests/storePerftools/asyncPerf/TxnAccept.h
@@ -41,9 +41,9 @@ public:
virtual ~TxnAccept();
// --- Interface TxnOp ---
- bool prepare(qpid::broker::TxnHandle& th) throw();
- void commit() throw();
- void rollback() throw();
+ bool prepare(qpid::broker::TxnBuffer* tb) throw();
+ void commit() throw();
+ void rollback() throw();
private:
std::deque<boost::shared_ptr<DeliveryRecord> > m_ops;
};
diff --git a/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.cpp b/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.cpp
index 6e15526e8f..cc36a38be7 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.cpp
+++ b/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.cpp
@@ -38,15 +38,13 @@ TxnPublish::TxnPublish(boost::intrusive_ptr<SimpleMessage> msg) :
m_msg(msg)
{}
-TxnPublish::~TxnPublish()
-{}
+TxnPublish::~TxnPublish() {}
bool
-TxnPublish::prepare(qpid::broker::TxnHandle& th) throw()
-{
- try{
+TxnPublish::prepare(qpid::broker::TxnBuffer* tb) throw() {
+ try {
while (!m_queues.empty()) {
- m_queues.front()->prepareEnqueue(th);
+ m_queues.front()->prepareEnqueue(tb);
m_prepared.push_back(m_queues.front());
m_queues.pop_front();
}
@@ -60,8 +58,7 @@ TxnPublish::prepare(qpid::broker::TxnHandle& th) throw()
}
void
-TxnPublish::commit() throw()
-{
+TxnPublish::commit() throw() {
try {
for (std::list<boost::shared_ptr<QueuedMessage> >::iterator i = m_prepared.begin(); i != m_prepared.end(); ++i) {
(*i)->commitEnqueue();
@@ -74,8 +71,7 @@ TxnPublish::commit() throw()
}
void
-TxnPublish::rollback() throw()
-{
+TxnPublish::rollback() throw() {
try {
for (std::list<boost::shared_ptr<QueuedMessage> >::iterator i = m_prepared.begin(); i != m_prepared.end(); ++i) {
(*i)->abortEnqueue();
@@ -88,21 +84,18 @@ TxnPublish::rollback() throw()
}
uint64_t
-TxnPublish::contentSize()
-{
+TxnPublish::contentSize() {
return m_msg->contentSize();
}
void
-TxnPublish::deliverTo(const boost::shared_ptr<SimpleQueue>& queue)
-{
+TxnPublish::deliverTo(const boost::shared_ptr<SimpleQueue>& queue) {
m_queues.push_back(boost::shared_ptr<QueuedMessage>(new QueuedMessage(queue.get(), m_msg)));
m_delivered = true;
}
SimpleMessage&
-TxnPublish::getMessage()
-{
+TxnPublish::getMessage() {
return *m_msg;
}
diff --git a/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.h b/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.h
index 17c3b3778d..eae9ef9c4c 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.h
+++ b/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.h
@@ -48,7 +48,7 @@ public:
virtual ~TxnPublish();
// --- Interface TxOp ---
- bool prepare(qpid::broker::TxnHandle& th) throw();
+ bool prepare(qpid::broker::TxnBuffer* tb) throw();
void commit() throw();
void rollback() throw();