summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKim van der Riet <kpvdr@apache.org>2012-06-15 19:21:07 +0000
committerKim van der Riet <kpvdr@apache.org>2012-06-15 19:21:07 +0000
commit58337ca40df3a57a16cdee9b7f6b4fe0361b0018 (patch)
tree391ad7ad1ea8cd42a7e9d4890c724b186e00f38b
parent01174a9e568f11cd5aa4f22aaa914e00ab9fe163 (diff)
downloadqpid-python-58337ca40df3a57a16cdee9b7f6b4fe0361b0018.tar.gz
QPID-3858: WIP - async txns for msg publish pathway, but there are some race/thread issues to sort out.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1350745 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--cpp/src/CMakeLists.txt7
-rw-r--r--cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp32
-rw-r--r--cpp/src/qpid/asyncStore/AsyncStoreImpl.h5
-rw-r--r--cpp/src/qpid/asyncStore/AtomicCounter.h (renamed from cpp/src/tests/storePerftools/asyncPerf/AtomicCounter.h)34
-rw-r--r--cpp/src/qpid/asyncStore/TxnHandleImpl.cpp73
-rw-r--r--cpp/src/qpid/asyncStore/TxnHandleImpl.h24
-rw-r--r--cpp/src/qpid/broker/AsyncResultQueueImpl.cpp1
-rw-r--r--cpp/src/qpid/broker/AsyncStore.h30
-rw-r--r--cpp/src/qpid/broker/TxnAsyncContext.cpp86
-rw-r--r--cpp/src/qpid/broker/TxnAsyncContext.h (renamed from cpp/src/tests/storePerftools/asyncPerf/TransactionAsyncContext.h)45
-rw-r--r--cpp/src/qpid/broker/TxnBuffer.cpp188
-rw-r--r--cpp/src/qpid/broker/TxnBuffer.h69
-rw-r--r--cpp/src/qpid/broker/TxnHandle.cpp12
-rw-r--r--cpp/src/qpid/broker/TxnHandle.h2
-rw-r--r--cpp/src/qpid/broker/TxnOp.h (renamed from cpp/src/qpid/broker/AsyncStore.cpp)27
-rw-r--r--cpp/src/tests/CMakeLists.txt4
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/Deliverable.cpp (renamed from cpp/src/tests/storePerftools/asyncPerf/TransactionAsyncContext.cpp)42
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/Deliverable.h54
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp4
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/MessageProducer.cpp47
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/MessageProducer.h8
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/PerfTest.cpp2
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/QueueAsyncContext.cpp14
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/QueueAsyncContext.h5
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.cpp18
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.h12
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/SimplePersistableMessage.cpp6
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/SimplePersistableMessage.h9
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/SimplePersistableQueue.cpp57
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/SimplePersistableQueue.h19
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/SimpleTransactionContext.cpp240
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/SimpleTransactionContext.h90
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/TxnPublish.cpp113
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/TxnPublish.h75
34 files changed, 985 insertions, 469 deletions
diff --git a/cpp/src/CMakeLists.txt b/cpp/src/CMakeLists.txt
index 575c78e320..6077bade08 100644
--- a/cpp/src/CMakeLists.txt
+++ b/cpp/src/CMakeLists.txt
@@ -1076,10 +1076,6 @@ set (qpidbroker_SOURCES
${qpidbroker_platform_SOURCES}
qpid/amqp_0_10/Connection.h
qpid/amqp_0_10/Connection.cpp
- qpid/broker/AsyncResultHandle.cpp
- qpid/broker/AsyncResultHandleImpl.cpp
- qpid/broker/AsyncResultQueueImpl.cpp
- qpid/broker/AsyncStore.cpp
qpid/broker/Broker.cpp
qpid/broker/Credit.cpp
qpid/broker/Exchange.cpp
@@ -1503,12 +1499,15 @@ set (asyncStore_SOURCES
qpid/asyncStore/TxnHandleImpl.cpp
qpid/broker/AsyncResultHandle.cpp
qpid/broker/AsyncResultHandleImpl.cpp
+ qpid/broker/AsyncResultQueueImpl.cpp
qpid/broker/ConfigHandle.cpp
qpid/broker/EnqueueHandle.cpp
qpid/broker/EventHandle.cpp
qpid/broker/IdHandle.cpp
qpid/broker/MessageHandle.cpp
qpid/broker/QueueHandle.cpp
+ qpid/broker/TxnAsyncContext.cpp
+ qpid/broker/TxnBuffer.cpp
qpid/broker/TxnHandle.cpp
)
diff --git a/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp b/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp
index a9fc13363a..9135fcc27e 100644
--- a/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp
+++ b/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp
@@ -24,6 +24,7 @@
#include "AsyncStoreImpl.h"
#include "AsyncOperation.h"
+#include "TxnHandleImpl.h"
#include "qpid/broker/ConfigHandle.h"
#include "qpid/broker/EnqueueHandle.h"
@@ -62,6 +63,31 @@ void
AsyncStoreImpl::initManagement(qpid::broker::Broker* /*broker*/)
{}
+qpid::broker::TxnHandle
+AsyncStoreImpl::createTxnHandle()
+{
+ return qpid::broker::TxnHandle(new TxnHandleImpl);
+}
+
+qpid::broker::TxnHandle
+AsyncStoreImpl::createTxnHandle(qpid::broker::TxnBuffer* tb)
+{
+ return qpid::broker::TxnHandle(new TxnHandleImpl(tb));
+}
+
+qpid::broker::TxnHandle
+AsyncStoreImpl::createTxnHandle(const std::string& xid)
+{
+ return qpid::broker::TxnHandle(new TxnHandleImpl(xid));
+}
+
+qpid::broker::TxnHandle
+AsyncStoreImpl::createTxnHandle(const std::string& xid,
+ qpid::broker::TxnBuffer* tb)
+{
+ return qpid::broker::TxnHandle(new TxnHandleImpl(xid, tb));
+}
+
qpid::broker::ConfigHandle
AsyncStoreImpl::createConfigHandle()
{
@@ -96,12 +122,6 @@ AsyncStoreImpl::createQueueHandle(const std::string& name,
return qpid::broker::QueueHandle(new QueueHandleImpl(name, opts));
}
-qpid::broker::TxnHandle
-AsyncStoreImpl::createTxnHandle(const std::string& xid)
-{
- return qpid::broker::TxnHandle(new TxnHandleImpl(xid));
-}
-
void
AsyncStoreImpl::submitPrepare(qpid::broker::TxnHandle& txnHandle,
boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt)
diff --git a/cpp/src/qpid/asyncStore/AsyncStoreImpl.h b/cpp/src/qpid/asyncStore/AsyncStoreImpl.h
index 3f2058a94c..a00771abf5 100644
--- a/cpp/src/qpid/asyncStore/AsyncStoreImpl.h
+++ b/cpp/src/qpid/asyncStore/AsyncStoreImpl.h
@@ -54,6 +54,10 @@ public:
// --- Factory methods for creating handles ---
+ qpid::broker::TxnHandle createTxnHandle();
+ qpid::broker::TxnHandle createTxnHandle(qpid::broker::TxnBuffer* tb);
+ qpid::broker::TxnHandle createTxnHandle(const std::string& xid);
+ qpid::broker::TxnHandle createTxnHandle(const std::string& xid, qpid::broker::TxnBuffer* tb);
qpid::broker::ConfigHandle createConfigHandle();
qpid::broker::EnqueueHandle createEnqueueHandle(qpid::broker::MessageHandle& msgHandle,
@@ -63,7 +67,6 @@ public:
qpid::broker::MessageHandle createMessageHandle(const qpid::broker::DataSource* const dataSrc);
qpid::broker::QueueHandle createQueueHandle(const std::string& name,
const qpid::types::Variant::Map& opts);
- qpid::broker::TxnHandle createTxnHandle(const std::string& xid=std::string());
// --- Store async interface ---
diff --git a/cpp/src/tests/storePerftools/asyncPerf/AtomicCounter.h b/cpp/src/qpid/asyncStore/AtomicCounter.h
index d354c729dc..6987b09384 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/AtomicCounter.h
+++ b/cpp/src/qpid/asyncStore/AtomicCounter.h
@@ -21,16 +21,15 @@
* \file AtomicCounter.h
*/
-#ifndef tests_storePerftools_asyncPerf_AtomicCounter_h_
-#define tests_storePerftools_asyncPerf_AtomicCounter_h_
+#ifndef qpid_asyncStore_AtomicCounter_h_
+#define qpid_asyncStore_AtomicCounter_h_
#include "qpid/sys/Condition.h"
#include "qpid/sys/Mutex.h"
#include "qpid/sys/Time.h"
-namespace tests {
-namespace storePerftools {
-namespace asyncPerf {
+namespace qpid {
+namespace asyncStore {
template <class T>
class AtomicCounter
@@ -52,20 +51,37 @@ public:
return m_cnt;
}
- void
+ AtomicCounter&
operator++()
{
qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_cntMutex);
++m_cnt;
+ return *this;
}
- void
+ AtomicCounter&
operator--()
{
qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_cntMutex);
if (--m_cnt == 0) {
m_cntCondition.notify();
}
+ return *this;
+ }
+
+ bool
+ operator==(const AtomicCounter& rhs)
+ {
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l1(m_cntMutex);
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l2(rhs.m_cntMutex);
+ return m_cnt == rhs.m_cnt;
+ }
+
+ bool
+ operator==(const T rhs)
+ {
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_cntMutex);
+ return m_cnt == rhs;
}
void
@@ -85,6 +101,6 @@ private:
typedef AtomicCounter<uint32_t> AsyncOpCounter;
-}}} // namespace tests::storePerftools::asyncPerf
+}} // namespace qpid::asyncStore
-#endif // tests_storePerftools_asyncPerf_AtomicCounter_h_
+#endif // qpid_asyncStore_AtomicCounter_h_
diff --git a/cpp/src/qpid/asyncStore/TxnHandleImpl.cpp b/cpp/src/qpid/asyncStore/TxnHandleImpl.cpp
index 7ce01f881c..945b50861d 100644
--- a/cpp/src/qpid/asyncStore/TxnHandleImpl.cpp
+++ b/cpp/src/qpid/asyncStore/TxnHandleImpl.cpp
@@ -23,6 +23,8 @@
#include "TxnHandleImpl.h"
+#include "qpid/Exception.h"
+#include "qpid/broker/TxnBuffer.h"
#include "qpid/messaging/PrivateImplRef.h"
#include <uuid/uuid.h>
@@ -30,16 +32,42 @@
namespace qpid {
namespace asyncStore {
+TxnHandleImpl::TxnHandleImpl() :
+ m_tpcFlag(false),
+ m_asyncOpCnt(0UL),
+ m_txnBuffer(0)
+{
+ createLocalXid();
+}
+
+TxnHandleImpl::TxnHandleImpl(qpid::broker::TxnBuffer* tb) :
+ m_tpcFlag(false),
+ m_asyncOpCnt(0UL),
+ m_txnBuffer(tb)
+{
+ createLocalXid();
+}
+
TxnHandleImpl::TxnHandleImpl(const std::string& xid) :
m_xid(xid),
- m_tpcFlag(!xid.empty())
-{
- if (m_xid.empty()) { // create a local xid from a random uuid
- uuid_t uuid;
- ::uuid_generate_random(uuid);
- char uuidStr[37]; // 36-char uuid + trailing '\0'
- ::uuid_unparse(uuid, uuidStr);
-// m_xid.assign(uuidStr);
+ m_tpcFlag(!xid.empty()),
+ m_asyncOpCnt(0UL),
+ m_txnBuffer(0)
+{
+ if (m_xid.empty()) {
+ createLocalXid();
+ }
+}
+
+TxnHandleImpl::TxnHandleImpl(const std::string& xid,
+ qpid::broker::TxnBuffer* tb) :
+ m_xid(xid),
+ m_tpcFlag(!xid.empty()),
+ m_asyncOpCnt(0UL),
+ m_txnBuffer(tb)
+{
+ if (m_xid.empty()) {
+ createLocalXid();
}
}
@@ -58,4 +86,33 @@ TxnHandleImpl::is2pc() const
return m_tpcFlag;
}
+void
+TxnHandleImpl::incrOpCnt()
+{
+ ++m_asyncOpCnt;
+}
+
+void
+TxnHandleImpl::decrOpCnt()
+{
+ if (m_asyncOpCnt == 0UL) {
+ throw qpid::Exception("Transaction async operation count underflow");
+ }
+ if (--m_asyncOpCnt == 0UL && m_txnBuffer) {
+ m_txnBuffer->asyncLocalCommit();
+ }
+}
+
+// private
+void
+TxnHandleImpl::createLocalXid()
+{
+ uuid_t uuid;
+ ::uuid_generate_random(uuid);
+ char uuidStr[37]; // 36-char uuid + trailing '\0'
+ ::uuid_unparse(uuid, uuidStr);
+ m_xid.assign(uuidStr);
+//std::cout << "TTT TxnHandleImpl::createLocalXid(): Local XID created: \"" << m_xid << "\"" << std::endl << std::flush;
+}
+
}} // namespace qpid::asyncStore
diff --git a/cpp/src/qpid/asyncStore/TxnHandleImpl.h b/cpp/src/qpid/asyncStore/TxnHandleImpl.h
index b28eb0cd4b..e357791508 100644
--- a/cpp/src/qpid/asyncStore/TxnHandleImpl.h
+++ b/cpp/src/qpid/asyncStore/TxnHandleImpl.h
@@ -24,23 +24,45 @@
#ifndef qpid_asyncStore_TxnHandleImpl_h_
#define qpid_asyncStore_TxnHandleImpl_h_
+#include "AtomicCounter.h"
+
#include "qpid/RefCounted.h"
+#include <stdint.h> // uint32_t
#include <string>
namespace qpid {
+
+namespace broker {
+class TxnBuffer;
+}
+
namespace asyncStore {
class TxnHandleImpl : public virtual qpid::RefCounted
{
public:
- TxnHandleImpl(const std::string& xid = std::string());
+ TxnHandleImpl();
+ TxnHandleImpl(qpid::broker::TxnBuffer* tb);
+ TxnHandleImpl(const std::string& xid);
+ TxnHandleImpl(const std::string& xid, qpid::broker::TxnBuffer* tb);
virtual ~TxnHandleImpl();
const std::string& getXid() const;
bool is2pc() const;
+
+ void submitPrepare();
+ void submitCommit();
+ void submitAbort();
+
+ void incrOpCnt();
+ void decrOpCnt();
private:
std::string m_xid;
bool m_tpcFlag;
+ AsyncOpCounter m_asyncOpCnt;
+ qpid::broker::TxnBuffer* const m_txnBuffer;
+
+ void createLocalXid();
};
}} // namespace qpid::asyncStore
diff --git a/cpp/src/qpid/broker/AsyncResultQueueImpl.cpp b/cpp/src/qpid/broker/AsyncResultQueueImpl.cpp
index ab391146f9..9e3298bb4e 100644
--- a/cpp/src/qpid/broker/AsyncResultQueueImpl.cpp
+++ b/cpp/src/qpid/broker/AsyncResultQueueImpl.cpp
@@ -49,7 +49,6 @@ AsyncResultQueueImpl::submit(boost::shared_ptr<AsyncResultHandle> arh)
AsyncResultQueueImpl::ResultQueue::Batch::const_iterator
AsyncResultQueueImpl::handle(const ResultQueue::Batch& e)
{
-
for (ResultQueue::Batch::const_iterator i = e.begin(); i != e.end(); ++i) {
//std::cout << "<== AsyncResultQueueImpl::handle() errNo=" << (*i)->getErrNo() << " errMsg=\"" << (*i)->getErrMsg() << "\"" << std::endl << std::flush;
if ((*i)->isValid()) {
diff --git a/cpp/src/qpid/broker/AsyncStore.h b/cpp/src/qpid/broker/AsyncStore.h
index 7e2ee81620..2f73ec8f3e 100644
--- a/cpp/src/qpid/broker/AsyncStore.h
+++ b/cpp/src/qpid/broker/AsyncStore.h
@@ -37,7 +37,7 @@ class AsyncResultHandle;
// Broker to subclass as a pollable queue
class AsyncResultQueue {
public:
- virtual ~AsyncResultQueue();
+ virtual ~AsyncResultQueue() {}
// TODO: Remove boost::shared_ptr<> from this interface
virtual void submit(boost::shared_ptr<AsyncResultHandle>) = 0;
};
@@ -45,14 +45,14 @@ public:
// Subclass this for specific contexts
class BrokerAsyncContext {
public:
- virtual ~BrokerAsyncContext();
+ virtual ~BrokerAsyncContext() {}
virtual AsyncResultQueue* getAsyncResultQueue() const = 0;
virtual void invokeCallback(const AsyncResultHandle* const) const = 0;
};
class DataSource {
public:
- virtual ~DataSource();
+ virtual ~DataSource() {}
virtual uint64_t getSize() = 0;
virtual void write(char* target) = 0;
};
@@ -65,13 +65,28 @@ class EnqueueHandle;
class EventHandle;
class MessageHandle;
class QueueHandle;
+class TxnBuffer;
class TxnHandle;
+class AsyncTransactionalStore {
+public:
+ virtual ~AsyncTransactionalStore() {}
+
+ virtual TxnHandle createTxnHandle() = 0;
+ virtual TxnHandle createTxnHandle(TxnBuffer* tb) = 0;
+ virtual TxnHandle createTxnHandle(const std::string& xid) = 0;
+ virtual TxnHandle createTxnHandle(const std::string& xid, TxnBuffer* tb) = 0;
+
+ // TODO: Remove boost::shared_ptr<> from this interface
+ virtual void submitPrepare(TxnHandle&, boost::shared_ptr<BrokerAsyncContext>) = 0; // Distributed txns only
+ virtual void submitCommit(TxnHandle&, boost::shared_ptr<BrokerAsyncContext>) = 0;
+ virtual void submitAbort(TxnHandle&, boost::shared_ptr<BrokerAsyncContext>) = 0;
+};
// Subclassed by store:
-class AsyncStore {
+class AsyncStore : public AsyncTransactionalStore {
public:
- virtual ~AsyncStore();
+ virtual ~AsyncStore() {}
// --- Factory methods for creating handles ---
@@ -80,16 +95,11 @@ public:
virtual EventHandle createEventHandle(QueueHandle&, const std::string& key=std::string()) = 0;
virtual MessageHandle createMessageHandle(const DataSource* const) = 0;
virtual QueueHandle createQueueHandle(const std::string& name, const qpid::types::Variant::Map& opts) = 0;
- virtual TxnHandle createTxnHandle(const std::string& xid=std::string()) = 0; // Distr. txns must supply xid
// --- Store async interface ---
// TODO: Remove boost::shared_ptr<> from this interface
- virtual void submitPrepare(TxnHandle&, boost::shared_ptr<BrokerAsyncContext>) = 0; // Distributed txns only
- virtual void submitCommit(TxnHandle&, boost::shared_ptr<BrokerAsyncContext>) = 0;
- virtual void submitAbort(TxnHandle&, boost::shared_ptr<BrokerAsyncContext>) = 0;
-
virtual void submitCreate(ConfigHandle&, const DataSource* const, boost::shared_ptr<BrokerAsyncContext>) = 0;
virtual void submitDestroy(ConfigHandle&, boost::shared_ptr<BrokerAsyncContext>) = 0;
diff --git a/cpp/src/qpid/broker/TxnAsyncContext.cpp b/cpp/src/qpid/broker/TxnAsyncContext.cpp
new file mode 100644
index 0000000000..e8abe99dab
--- /dev/null
+++ b/cpp/src/qpid/broker/TxnAsyncContext.cpp
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file TransactionAsyncContext.cpp
+ */
+
+#include "TxnAsyncContext.h"
+
+#include <cassert>
+
+namespace qpid {
+namespace broker {
+
+TxnAsyncContext::TxnAsyncContext(TxnBuffer* const tb,
+ TxnHandle& th,
+ const qpid::asyncStore::AsyncOperation::opCode op,
+ qpid::broker::AsyncResultCallback rcb,
+ qpid::broker::AsyncResultQueue* const arq):
+ m_tb(tb),
+ m_th(th),
+ m_op(op),
+ m_rcb(rcb),
+ m_arq(arq)
+{
+ assert(m_th.isValid());
+}
+
+TxnAsyncContext::~TxnAsyncContext()
+{}
+
+TxnBuffer*
+TxnAsyncContext::getTxnBuffer() const
+{
+ return m_tb;
+}
+
+qpid::asyncStore::AsyncOperation::opCode
+TxnAsyncContext::getOpCode() const
+{
+ return m_op;
+}
+
+const char*
+TxnAsyncContext::getOpStr() const
+{
+ return qpid::asyncStore::AsyncOperation::getOpStr(m_op);
+}
+
+TxnHandle
+TxnAsyncContext::getTransactionContext() const
+{
+ return m_th;
+}
+
+AsyncResultQueue*
+TxnAsyncContext::getAsyncResultQueue() const
+{
+ return m_arq;
+}
+
+void
+TxnAsyncContext::invokeCallback(const AsyncResultHandle* const arh) const
+{
+ if (m_rcb) {
+ m_rcb(arh);
+ }
+}
+
+}} // namespace qpid::broker
diff --git a/cpp/src/tests/storePerftools/asyncPerf/TransactionAsyncContext.h b/cpp/src/qpid/broker/TxnAsyncContext.h
index 186a8141cf..9bdd8f7188 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/TransactionAsyncContext.h
+++ b/cpp/src/qpid/broker/TxnAsyncContext.h
@@ -18,39 +18,48 @@
*/
/**
- * \file TransactionAsyncContext.h
+ * \file TxnAsyncContext.h
*/
-#ifndef tests_storePerftools_asyncPerf_TransactionAsyncContext_h_
-#define tests_storePerftools_asyncPerf_TransactionAsyncContext_h_
+#ifndef qpid_broker_TxnAsyncContext_h_
+#define qpid_broker_TxnAsyncContext_h_
+
+#include "AsyncStore.h" // qpid::broker::BrokerAsyncContext
+#include "TxnHandle.h"
#include "qpid/asyncStore/AsyncOperation.h"
-#include "qpid/broker/AsyncStore.h" // qpid::broker::BrokerAsyncContext
#include <boost/shared_ptr.hpp>
-namespace tests {
-namespace storePerftools {
-namespace asyncPerf {
-
-class SimpleTransactionContext;
+namespace qpid {
+namespace broker {
-class TransactionAsyncContext: public qpid::broker::BrokerAsyncContext
+class TxnAsyncContext: public BrokerAsyncContext
{
public:
- TransactionAsyncContext(boost::shared_ptr<SimpleTransactionContext> tc,
- const qpid::asyncStore::AsyncOperation::opCode op);
- virtual ~TransactionAsyncContext();
+ TxnAsyncContext(TxnBuffer* const tb,
+ TxnHandle& th,
+ const qpid::asyncStore::AsyncOperation::opCode op,
+ qpid::broker::AsyncResultCallback rcb,
+ qpid::broker::AsyncResultQueue* const arq);
+ virtual ~TxnAsyncContext();
+ TxnBuffer* getTxnBuffer() const;
qpid::asyncStore::AsyncOperation::opCode getOpCode() const;
const char* getOpStr() const;
- boost::shared_ptr<SimpleTransactionContext> getTransactionContext() const;
- void destroy();
+ TxnHandle getTransactionContext() const;
+
+ // --- Interface BrokerAsyncContext ---
+ AsyncResultQueue* getAsyncResultQueue() const;
+ void invokeCallback(const AsyncResultHandle* const) const;
private:
- boost::shared_ptr<SimpleTransactionContext> m_tc;
+ TxnBuffer* const m_tb;
+ TxnHandle m_th;
const qpid::asyncStore::AsyncOperation::opCode m_op;
+ AsyncResultCallback m_rcb;
+ AsyncResultQueue* const m_arq;
};
-}}} // namespace tests::storePerftools::asyncPerf
+}} // namespace qpid::broker
-#endif // tests_storePerftools_asyncPerf_TransactionAsyncContext_h_
+#endif // qpid_broker_TxnAsyncContext_h_
diff --git a/cpp/src/qpid/broker/TxnBuffer.cpp b/cpp/src/qpid/broker/TxnBuffer.cpp
new file mode 100644
index 0000000000..2af8bd4da3
--- /dev/null
+++ b/cpp/src/qpid/broker/TxnBuffer.cpp
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file TxnBuffer.cpp
+ */
+
+#include "TxnBuffer.h"
+
+#include "AsyncResultHandle.h"
+#include "AsyncStore.h"
+#include "TxnAsyncContext.h"
+#include "TxnOp.h"
+
+#include "qpid/Exception.h"
+
+#include <boost/shared_ptr.hpp>
+
+namespace qpid {
+namespace broker {
+
+TxnBuffer::TxnBuffer(AsyncResultQueue& arq) :
+ m_store(0),
+ m_resultQueue(arq),
+ m_state(NONE)
+{}
+
+TxnBuffer::~TxnBuffer()
+{}
+
+void
+TxnBuffer::enlist(boost::shared_ptr<TxnOp> op)
+{
+//std::cout << "TTT TxnBuffer::enlist" << std::endl << std::flush;
+ m_ops.push_back(op);
+}
+
+bool
+TxnBuffer::prepare(TxnHandle& th)
+{
+//std::cout << "TTT TxnBuffer::prepare" << std::endl << std::flush;
+ for(std::vector<boost::shared_ptr<TxnOp> >::iterator i = m_ops.begin(); i != m_ops.end(); ++i) {
+ if (!(*i)->prepare(th)) {
+ return false;
+ }
+ }
+ return true;
+}
+
+void
+TxnBuffer::commit()
+{
+//std::cout << "TTT TxnBuffer::commit" << std::endl << std::flush;
+ for(std::vector<boost::shared_ptr<TxnOp> >::iterator i = m_ops.begin(); i != m_ops.end(); ++i) {
+ (*i)->commit();
+ }
+ m_ops.clear();
+}
+
+void
+TxnBuffer::rollback()
+{
+//std::cout << "TTT TxnBuffer::rollback" << std::endl << std::flush;
+ for(std::vector<boost::shared_ptr<TxnOp> >::iterator i = m_ops.begin(); i != m_ops.end(); ++i) {
+ (*i)->rollback();
+ }
+ m_ops.clear();
+}
+
+bool
+TxnBuffer::commitLocal(AsyncTransactionalStore* const store)
+{
+//std::cout << "TTT TxnBuffer::commitLocal" << std::endl << std::flush;
+ if (store) {
+ try {
+ m_store = store;
+ asyncLocalCommit();
+ } catch (std::exception& e) {
+ std::cerr << "Commit failed: " << e.what() << std::endl;
+ } catch (...) {
+ std::cerr << "Commit failed (unknown exception)" << std::endl;
+ }
+ }
+ return false;
+}
+
+// static
+void
+TxnBuffer::handleAsyncResult(const AsyncResultHandle* const arh)
+{
+ if (arh) {
+ boost::shared_ptr<TxnAsyncContext> tac = boost::dynamic_pointer_cast<TxnAsyncContext>(arh->getBrokerAsyncContext());
+ if (arh->getErrNo()) {
+ tac->getTxnBuffer()->asyncLocalAbort();
+ std::cerr << "Transaction xid=\"" << tac->getTransactionContext().getXid() << "\": Operation " << tac->getOpStr() << ": failure "
+ << arh->getErrNo() << " (" << arh->getErrMsg() << ")" << std::endl;
+ tac->getTxnBuffer()->asyncLocalAbort();
+ } else {
+//std::cout << "TTT TxnBuffer::handleAsyncResult() op=" << tac->getOpStr() << std::endl << std::flush;
+ if (tac->getOpCode() == qpid::asyncStore::AsyncOperation::TXN_ABORT) {
+ tac->getTxnBuffer()->asyncLocalAbort();
+ } else {
+ tac->getTxnBuffer()->asyncLocalCommit();
+ }
+ }
+ }
+}
+
+void
+TxnBuffer::asyncLocalCommit()
+{
+ assert(m_store != 0);
+ switch(m_state) {
+ case NONE:
+//std::cout << "TTT TxnBuffer::asyncLocalCommit: NONE->PREPARE" << std::endl << std::flush;
+ m_state = PREPARE;
+ m_txnHandle = m_store->createTxnHandle(this);
+ prepare(m_txnHandle);
+ break;
+ case PREPARE:
+//std::cout << "TTT TxnBuffer::asyncLocalCommit: PREPARE->COMMIT" << std::endl << std::flush;
+ m_state = COMMIT;
+ {
+ boost::shared_ptr<TxnAsyncContext> tac(new TxnAsyncContext(this,
+ m_txnHandle,
+ qpid::asyncStore::AsyncOperation::TXN_COMMIT,
+ &handleAsyncResult,
+ &m_resultQueue));
+ m_store->submitCommit(m_txnHandle, tac);
+ }
+ break;
+ case COMMIT:
+//std::cout << "TTT TxnBuffer:asyncLocalCommit: COMMIT->COMPLETE" << std::endl << std::flush;
+ commit();
+ m_state = COMPLETE;
+ //delete this; // TODO: ugly! Find a better way to handle the life cycle of this class
+ break;
+ default: ;
+//std::cout << "TTT TxnBuffer:asyncLocalCommit: Unexpected state " << m_state << std::endl << std::flush;
+ }
+}
+
+void
+TxnBuffer::asyncLocalAbort()
+{
+ assert(m_store != 0);
+ switch (m_state) {
+ case NONE:
+ case PREPARE:
+ case COMMIT:
+//std::cout << "TTT TxnBuffer::asyncRollback: xxx->ROLLBACK" << std::endl << std::flush;
+ m_state = ROLLBACK;
+ {
+ boost::shared_ptr<TxnAsyncContext> tac(new TxnAsyncContext(this,
+ m_txnHandle,
+ qpid::asyncStore::AsyncOperation::TXN_ABORT,
+ &handleAsyncResult,
+ &m_resultQueue));
+ m_store->submitCommit(m_txnHandle, tac);
+ }
+ break;
+ case ROLLBACK:
+//std::cout << "TTT TxnBuffer:asyncRollback: ROLLBACK->COMPLETE" << std::endl << std::flush;
+ rollback();
+ m_state = COMPLETE;
+ //delete this; // TODO: ugly! Find a better way to handle the life cycle of this class
+ default: ;
+//std::cout << "TTT TxnBuffer:asyncRollback: Unexpected state " << m_state << std::endl << std::flush;
+ }
+}
+
+}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/TxnBuffer.h b/cpp/src/qpid/broker/TxnBuffer.h
new file mode 100644
index 0000000000..308a91aa03
--- /dev/null
+++ b/cpp/src/qpid/broker/TxnBuffer.h
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file TxnBuffer.h
+ */
+
+#ifndef qpid_broker_TxnBuffer_h_
+#define qpid_broker_TxnBuffer_h_
+
+#include "TxnHandle.h"
+
+//#include <boost/enable_shared_from_this.hpp>
+#include <boost/shared_ptr.hpp>
+#include <vector>
+
+namespace qpid {
+namespace broker {
+
+class AsyncResultHandle;
+class AsyncResultQueue;
+class AsyncTransactionalStore;
+class TxnOp;
+
+class TxnBuffer /*: public boost::enable_shared_from_this<TxnBuffer>*/ {
+public:
+ TxnBuffer(AsyncResultQueue& arq);
+ virtual ~TxnBuffer();
+
+ void enlist(boost::shared_ptr<TxnOp> op);
+ bool prepare(TxnHandle& th);
+ void commit();
+ void rollback();
+ bool commitLocal(AsyncTransactionalStore* const store);
+
+ // --- Async operations ---
+ static void handleAsyncResult(const AsyncResultHandle* const arh);
+ void asyncLocalCommit();
+ void asyncLocalAbort();
+
+private:
+ std::vector<boost::shared_ptr<TxnOp> > m_ops;
+ TxnHandle m_txnHandle;
+ AsyncTransactionalStore* m_store;
+ AsyncResultQueue& m_resultQueue;
+
+ typedef enum {NONE = 0, PREPARE, COMMIT, ROLLBACK, COMPLETE} e_txnState;
+ e_txnState m_state;
+};
+
+}} // namespace qpid::broker
+
+#endif // qpid_broker_TxnBuffer_h_
diff --git a/cpp/src/qpid/broker/TxnHandle.cpp b/cpp/src/qpid/broker/TxnHandle.cpp
index 1e6cd2ac6f..07d46b4235 100644
--- a/cpp/src/qpid/broker/TxnHandle.cpp
+++ b/cpp/src/qpid/broker/TxnHandle.cpp
@@ -69,4 +69,16 @@ TxnHandle::is2pc() const
return impl->is2pc();
}
+void
+TxnHandle::incrOpCnt()
+{
+ impl->incrOpCnt();
+}
+
+void
+TxnHandle::decrOpCnt()
+{
+ impl->decrOpCnt();
+}
+
}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/TxnHandle.h b/cpp/src/qpid/broker/TxnHandle.h
index 34a9c69434..8bed14e16a 100644
--- a/cpp/src/qpid/broker/TxnHandle.h
+++ b/cpp/src/qpid/broker/TxnHandle.h
@@ -44,6 +44,8 @@ public:
// TxnHandleImpl methods
const std::string& getXid() const;
bool is2pc() const;
+ void incrOpCnt();
+ void decrOpCnt();
private:
friend class qpid::messaging::PrivateImplRef<TxnHandle>;
diff --git a/cpp/src/qpid/broker/AsyncStore.cpp b/cpp/src/qpid/broker/TxnOp.h
index 10cb3d27cf..e98429535e 100644
--- a/cpp/src/qpid/broker/AsyncStore.cpp
+++ b/cpp/src/qpid/broker/TxnOp.h
@@ -17,21 +17,24 @@
* under the License.
*/
-#include "AsyncStore.h"
+/**
+ * \file TxnOp.h
+ */
+
+#ifndef qpid_broker_TxnOp_h_
+#define qpid_broker_TxnOp_h_
namespace qpid {
namespace broker {
-AsyncResultQueue::~AsyncResultQueue()
-{}
-
-BrokerAsyncContext::~BrokerAsyncContext()
-{}
-
-DataSource::~DataSource()
-{}
-
-AsyncStore::~AsyncStore()
-{}
+class TxnOp{
+public:
+ virtual ~TxnOp() {}
+ virtual bool prepare(TxnHandle& th) throw() = 0;
+ virtual void commit() throw() = 0;
+ virtual void rollback() throw() = 0;
+};
}} // namespace qpid::broker
+
+#endif // qpid_broker_TxnOp_h_
diff --git a/cpp/src/tests/CMakeLists.txt b/cpp/src/tests/CMakeLists.txt
index 1d04828a49..fbe4cfbd7d 100644
--- a/cpp/src/tests/CMakeLists.txt
+++ b/cpp/src/tests/CMakeLists.txt
@@ -379,6 +379,7 @@ endif (UNIX)
# Async store perf test (asyncPerf)
set (asyncStorePerf_SOURCES
+ storePerftools/asyncPerf/Deliverable.cpp
storePerftools/asyncPerf/MessageAsyncContext.cpp
storePerftools/asyncPerf/MessageConsumer.cpp
storePerftools/asyncPerf/MessageDeque.cpp
@@ -388,10 +389,9 @@ set (asyncStorePerf_SOURCES
storePerftools/asyncPerf/QueuedMessage.cpp
storePerftools/asyncPerf/SimplePersistableMessage.cpp
storePerftools/asyncPerf/SimplePersistableQueue.cpp
- storePerftools/asyncPerf/SimpleTransactionContext.cpp
storePerftools/asyncPerf/TestOptions.cpp
storePerftools/asyncPerf/TestResult.cpp
- storePerftools/asyncPerf/TransactionAsyncContext.cpp
+ storePerftools/asyncPerf/TxnPublish.cpp
storePerftools/common/Parameters.cpp
storePerftools/common/PerftoolError.cpp
diff --git a/cpp/src/tests/storePerftools/asyncPerf/TransactionAsyncContext.cpp b/cpp/src/tests/storePerftools/asyncPerf/Deliverable.cpp
index f150a34027..9da7e348e0 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/TransactionAsyncContext.cpp
+++ b/cpp/src/tests/storePerftools/asyncPerf/Deliverable.cpp
@@ -18,50 +18,26 @@
*/
/**
- * \file TransactionAsyncContext.cpp
+ * \file Deliverable.cpp
*/
-#include "TransactionAsyncContext.h"
-
-#include <cassert>
+#include "Deliverable.h"
namespace tests {
namespace storePerftools {
namespace asyncPerf {
-TransactionAsyncContext::TransactionAsyncContext(boost::shared_ptr<SimpleTransactionContext> tc,
- const qpid::asyncStore::AsyncOperation::opCode op):
- m_tc(tc),
- m_op(op)
-{
- assert(m_tc.get() != 0);
-}
-
-TransactionAsyncContext::~TransactionAsyncContext()
+Deliverable::Deliverable() :
+ m_delivered(false)
{}
-qpid::asyncStore::AsyncOperation::opCode
-TransactionAsyncContext::getOpCode() const
-{
- return m_op;
-}
-
-const char*
-TransactionAsyncContext::getOpStr() const
-{
- return qpid::asyncStore::AsyncOperation::getOpStr(m_op);
-}
-
-boost::shared_ptr<SimpleTransactionContext>
-TransactionAsyncContext::getTransactionContext() const
-{
- return m_tc;
-}
+Deliverable::~Deliverable()
+{}
-void
-TransactionAsyncContext::destroy()
+bool
+Deliverable::isDelivered() const
{
- delete this;
+ return m_delivered;
}
}}} // namespace tests::storePerftools::asyncPerf
diff --git a/cpp/src/tests/storePerftools/asyncPerf/Deliverable.h b/cpp/src/tests/storePerftools/asyncPerf/Deliverable.h
new file mode 100644
index 0000000000..57e130eeba
--- /dev/null
+++ b/cpp/src/tests/storePerftools/asyncPerf/Deliverable.h
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file Deliverable.h
+ */
+
+#ifndef tests_storePerftools_asyncPerf_Deliverable_h_
+#define tests_storePerftools_asyncPerf_Deliverable_h_
+
+#include <boost/shared_ptr.hpp>
+#include <stdint.h> // uint64_t
+
+namespace tests {
+namespace storePerftools {
+namespace asyncPerf {
+
+class SimplePersistableMessage;
+class SimplePersistableQueue;
+
+class Deliverable
+{
+public:
+ Deliverable();
+ virtual ~Deliverable();
+
+ virtual uint64_t contentSize() = 0;
+ virtual void deliverTo(const boost::shared_ptr<SimplePersistableQueue>& queue) = 0;
+ virtual SimplePersistableMessage& getMessage() = 0;
+ virtual bool isDelivered() const;
+
+protected:
+ bool m_delivered;
+};
+
+}}} // namespace tests::storePerftools::asyncPerf
+
+#endif // tests_storePerftools_asyncPerf_Deliverable_h_
diff --git a/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp b/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp
index 0a2fd4f333..9b015fc428 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp
+++ b/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp
@@ -32,8 +32,6 @@ namespace tests {
namespace storePerftools {
namespace asyncPerf {
-class SimpleTransactionContext;
-
MessageConsumer::MessageConsumer(const TestOptions& perfTestParams,
boost::shared_ptr<SimplePersistableQueue> queue) :
m_perfTestParams(perfTestParams),
@@ -54,7 +52,7 @@ MessageConsumer::runConsumers()
::usleep(1000); // TODO - replace this poller with condition variable
}
}
- return 0;
+ return reinterpret_cast<void*>(0);
}
//static
diff --git a/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.cpp b/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.cpp
index 5530513e12..008ddf33e7 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.cpp
+++ b/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.cpp
@@ -26,6 +26,10 @@
#include "SimplePersistableMessage.h"
#include "SimplePersistableQueue.h"
#include "TestOptions.h"
+#include "TxnPublish.h"
+
+#include "qpid/asyncStore/AsyncStoreImpl.h"
+#include "qpid/broker/TxnBuffer.h"
#include <stdint.h> // uint32_t
@@ -33,15 +37,15 @@ namespace tests {
namespace storePerftools {
namespace asyncPerf {
-class SimpleTransactionContext;
-
MessageProducer::MessageProducer(const TestOptions& perfTestParams,
const char* msgData,
qpid::asyncStore::AsyncStoreImpl* store,
+ qpid::broker::AsyncResultQueue& arq,
boost::shared_ptr<SimplePersistableQueue> queue) :
m_perfTestParams(perfTestParams),
m_msgData(msgData),
m_store(store),
+ m_resultQueue(arq),
m_queue(queue)
{}
@@ -51,11 +55,46 @@ MessageProducer::~MessageProducer()
void*
MessageProducer::runProducers()
{
+ const bool useTxns = m_perfTestParams.m_enqTxnBlockSize > 0U;
+ uint16_t txnCnt = 0U;
+ qpid::broker::TxnBuffer* tb = 0;
+ if (useTxns) {
+ tb = new qpid::broker::TxnBuffer(m_resultQueue);
+ }
for (uint32_t numMsgs=0; numMsgs<m_perfTestParams.m_numMsgs; ++numMsgs) {
boost::intrusive_ptr<SimplePersistableMessage> msg(new SimplePersistableMessage(m_msgData, m_perfTestParams.m_msgSize, m_store));
- m_queue->deliver(msg);
+ if (useTxns) {
+ boost::shared_ptr<TxnPublish> op(new TxnPublish(msg));
+ op->deliverTo(m_queue);
+ tb->enlist(op);
+ if (++txnCnt >= m_perfTestParams.m_enqTxnBlockSize) {
+ if (m_perfTestParams.m_durable) {
+ tb->commitLocal(m_store);
+
+ // TxnBuffer instance tb carries async state that precludes it being re-used for the next
+ // transaction until the current commit cycle completes. So use another instance. This
+ // instance should auto-delete when the async commit cycle completes.
+ if (numMsgs<m_perfTestParams.m_numMsgs) {
+ //tb = boost::shared_ptr<qpid::broker::TxnBuffer>(new qpid::broker::TxnBuffer(m_resultQueue));
+ tb = new qpid::broker::TxnBuffer(m_resultQueue);
+ }
+ } else {
+ tb->commit();
+ }
+ txnCnt = 0U;
+ }
+ } else {
+ m_queue->deliver(msg);
+ }
+ }
+ if (txnCnt) {
+ if (m_perfTestParams.m_durable) {
+ tb->commitLocal(m_store);
+ } else {
+ tb->commit();
+ }
}
- return 0;
+ return reinterpret_cast<void*>(0);
}
//static
diff --git a/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.h b/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.h
index 746247e849..55504164ef 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.h
+++ b/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.h
@@ -28,9 +28,10 @@
namespace qpid {
namespace asyncStore {
-
class AsyncStoreImpl;
-
+}
+namespace broker {
+class AsyncResultQueue;
}}
namespace tests {
@@ -39,6 +40,7 @@ namespace asyncPerf {
class SimplePersistableQueue;
class TestOptions;
+class TxnBuffer;
class MessageProducer
{
@@ -46,6 +48,7 @@ public:
MessageProducer(const TestOptions& perfTestParams,
const char* msgData,
qpid::asyncStore::AsyncStoreImpl* store,
+ qpid::broker::AsyncResultQueue& arq,
boost::shared_ptr<SimplePersistableQueue> queue);
virtual ~MessageProducer();
void* runProducers();
@@ -54,6 +57,7 @@ private:
const TestOptions& m_perfTestParams;
const char* m_msgData;
qpid::asyncStore::AsyncStoreImpl* m_store;
+ qpid::broker::AsyncResultQueue& m_resultQueue;
boost::shared_ptr<SimplePersistableQueue> m_queue;
};
diff --git a/cpp/src/tests/storePerftools/asyncPerf/PerfTest.cpp b/cpp/src/tests/storePerftools/asyncPerf/PerfTest.cpp
index fbfebbac2b..7941e761ca 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/PerfTest.cpp
+++ b/cpp/src/tests/storePerftools/asyncPerf/PerfTest.cpp
@@ -80,7 +80,7 @@ PerfTest::run()
tests::storePerftools::common::ScopedTimer st(m_testResult);
for (uint16_t q = 0; q < m_testOpts.m_numQueues; q++) {
- boost::shared_ptr<MessageProducer> mp(new MessageProducer(m_testOpts, m_msgData, m_store, m_queueList[q]));
+ boost::shared_ptr<MessageProducer> mp(new MessageProducer(m_testOpts, m_msgData, m_store, m_resultQueue, m_queueList[q]));
m_producers.push_back(mp);
for (uint16_t t = 0; t < m_testOpts.m_numEnqThreadsPerQueue; t++) { // TODO - replace with qpid threads
boost::shared_ptr<tests::storePerftools::common::Thread> tp(new tests::storePerftools::common::Thread(mp->startProducers,
diff --git a/cpp/src/tests/storePerftools/asyncPerf/QueueAsyncContext.cpp b/cpp/src/tests/storePerftools/asyncPerf/QueueAsyncContext.cpp
index 4836f9358a..07a80c8a33 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/QueueAsyncContext.cpp
+++ b/cpp/src/tests/storePerftools/asyncPerf/QueueAsyncContext.cpp
@@ -31,10 +31,12 @@ namespace storePerftools {
namespace asyncPerf {
QueueAsyncContext::QueueAsyncContext(boost::shared_ptr<SimplePersistableQueue> q,
+ qpid::broker::TxnHandle& th,
const qpid::asyncStore::AsyncOperation::opCode op,
qpid::broker::AsyncResultCallback rcb,
qpid::broker::AsyncResultQueue* const arq) :
m_q(q),
+ m_th(th),
m_op(op),
m_rcb(rcb),
m_arq(arq)
@@ -44,11 +46,13 @@ QueueAsyncContext::QueueAsyncContext(boost::shared_ptr<SimplePersistableQueue> q
QueueAsyncContext::QueueAsyncContext(boost::shared_ptr<SimplePersistableQueue> q,
boost::intrusive_ptr<SimplePersistableMessage> msg,
+ qpid::broker::TxnHandle& th,
const qpid::asyncStore::AsyncOperation::opCode op,
qpid::broker::AsyncResultCallback rcb,
qpid::broker::AsyncResultQueue* const arq) :
m_q(q),
m_msg(msg),
+ m_th(th),
m_op(op),
m_rcb(rcb),
m_arq(arq)
@@ -84,6 +88,12 @@ QueueAsyncContext::getMessage() const
return m_msg;
}
+qpid::broker::TxnHandle
+QueueAsyncContext::getTxnHandle() const
+{
+ return m_th;
+}
+
qpid::broker::AsyncResultQueue*
QueueAsyncContext::getAsyncResultQueue() const
{
@@ -99,7 +109,9 @@ QueueAsyncContext::getAsyncResultCallback() const
void
QueueAsyncContext::invokeCallback(const qpid::broker::AsyncResultHandle* const arh) const
{
- m_rcb(arh);
+ if (m_rcb) {
+ m_rcb(arh);
+ }
}
void
diff --git a/cpp/src/tests/storePerftools/asyncPerf/QueueAsyncContext.h b/cpp/src/tests/storePerftools/asyncPerf/QueueAsyncContext.h
index df45117fe4..b4d16fe615 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/QueueAsyncContext.h
+++ b/cpp/src/tests/storePerftools/asyncPerf/QueueAsyncContext.h
@@ -26,6 +26,7 @@
#include "qpid/asyncStore/AsyncOperation.h"
#include "qpid/broker/AsyncStore.h"
+#include "qpid/broker/TxnHandle.h"
#include <boost/intrusive_ptr.hpp>
#include <boost/shared_ptr.hpp>
@@ -42,11 +43,13 @@ class QueueAsyncContext: public qpid::broker::BrokerAsyncContext
{
public:
QueueAsyncContext(boost::shared_ptr<SimplePersistableQueue> q,
+ qpid::broker::TxnHandle& th,
const qpid::asyncStore::AsyncOperation::opCode op,
qpid::broker::AsyncResultCallback rcb,
qpid::broker::AsyncResultQueue* const arq);
QueueAsyncContext(boost::shared_ptr<SimplePersistableQueue> q,
boost::intrusive_ptr<SimplePersistableMessage> msg,
+ qpid::broker::TxnHandle& th,
const qpid::asyncStore::AsyncOperation::opCode op,
qpid::broker::AsyncResultCallback rcb,
qpid::broker::AsyncResultQueue* const arq);
@@ -55,6 +58,7 @@ public:
const char* getOpStr() const;
boost::shared_ptr<SimplePersistableQueue> getQueue() const;
boost::intrusive_ptr<SimplePersistableMessage> getMessage() const;
+ qpid::broker::TxnHandle getTxnHandle() const;
qpid::broker::AsyncResultQueue* getAsyncResultQueue() const;
qpid::broker::AsyncResultCallback getAsyncResultCallback() const;
void invokeCallback(const qpid::broker::AsyncResultHandle* const arh) const;
@@ -63,6 +67,7 @@ public:
private:
boost::shared_ptr<SimplePersistableQueue> m_q;
boost::intrusive_ptr<SimplePersistableMessage> m_msg;
+ qpid::broker::TxnHandle m_th;
const qpid::asyncStore::AsyncOperation::opCode m_op;
qpid::broker::AsyncResultCallback m_rcb;
qpid::broker::AsyncResultQueue* const m_arq;
diff --git a/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.cpp b/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.cpp
index dd4de355b3..8ee858587b 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.cpp
+++ b/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.cpp
@@ -79,4 +79,22 @@ QueuedMessage::enqHandle()
return m_enqHandle;
}
+void
+QueuedMessage::prepareEnqueue(qpid::broker::TxnHandle& th)
+{
+ m_queue->enqueue(th, *this);
+}
+
+void
+QueuedMessage::commitEnqueue()
+{
+ m_queue->process(m_msg);
+}
+
+void
+QueuedMessage::abortEnqueue()
+{
+ m_queue->enqueueAborted(m_msg);
+}
+
}}} // namespace tests::storePerfTools
diff --git a/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.h b/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.h
index 29e7f792cd..896c53ab5b 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.h
+++ b/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.h
@@ -28,6 +28,13 @@
#include <boost/intrusive_ptr.hpp>
+namespace qpid {
+namespace broker {
+
+class TxnHandle;
+
+}}
+
namespace tests {
namespace storePerftools {
namespace asyncPerf {
@@ -48,6 +55,11 @@ public:
const qpid::broker::EnqueueHandle& enqHandle() const;
qpid::broker::EnqueueHandle& enqHandle();
+ // -- Transaction handling ---
+ void prepareEnqueue(qpid::broker::TxnHandle& th);
+ void commitEnqueue();
+ void abortEnqueue();
+
private:
SimplePersistableQueue* m_queue;
boost::intrusive_ptr<SimplePersistableMessage> m_msg;
diff --git a/cpp/src/tests/storePerftools/asyncPerf/SimplePersistableMessage.cpp b/cpp/src/tests/storePerftools/asyncPerf/SimplePersistableMessage.cpp
index 58aacabdcd..a9771c1442 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/SimplePersistableMessage.cpp
+++ b/cpp/src/tests/storePerftools/asyncPerf/SimplePersistableMessage.cpp
@@ -52,6 +52,12 @@ SimplePersistableMessage::getHandle()
return m_msgHandle;
}
+uint64_t
+SimplePersistableMessage::contentSize() const
+{
+ return static_cast<uint64_t>(m_msg.size());
+}
+
void
SimplePersistableMessage::setPersistenceId(uint64_t id) const
{
diff --git a/cpp/src/tests/storePerftools/asyncPerf/SimplePersistableMessage.h b/cpp/src/tests/storePerftools/asyncPerf/SimplePersistableMessage.h
index f6258822ea..7ac54ddea1 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/SimplePersistableMessage.h
+++ b/cpp/src/tests/storePerftools/asyncPerf/SimplePersistableMessage.h
@@ -51,20 +51,21 @@ public:
virtual ~SimplePersistableMessage();
const qpid::broker::MessageHandle& getHandle() const;
qpid::broker::MessageHandle& getHandle();
+ uint64_t contentSize() const;
- // Interface Persistable
+ // --- Interface Persistable ---
virtual void setPersistenceId(uint64_t id) const;
virtual uint64_t getPersistenceId() const;
virtual void encode(qpid::framing::Buffer& buffer) const;
virtual uint32_t encodedSize() const;
- // Interface PersistableMessage
+ // --- Interface PersistableMessage ---
virtual void allDequeuesComplete();
virtual uint32_t encodedHeaderSize() const;
virtual bool isPersistent() const;
- // Interface DataStore
- virtual uint64_t getSize();
+ // --- Interface DataSource ---
+ virtual uint64_t getSize(); // <- same as encodedSize()?
virtual void write(char* target);
private:
diff --git a/cpp/src/tests/storePerftools/asyncPerf/SimplePersistableQueue.cpp b/cpp/src/tests/storePerftools/asyncPerf/SimplePersistableQueue.cpp
index b34357c144..1a3eae4b43 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/SimplePersistableQueue.cpp
+++ b/cpp/src/tests/storePerftools/asyncPerf/SimplePersistableQueue.cpp
@@ -25,17 +25,21 @@
#include "MessageDeque.h"
#include "SimplePersistableMessage.h"
-#include "SimpleTransactionContext.h"
#include "QueueAsyncContext.h"
#include "QueuedMessage.h"
#include "qpid/asyncStore/AsyncStoreImpl.h"
#include "qpid/broker/AsyncResultHandle.h"
+#include "qpid/broker/TxnHandle.h"
namespace tests {
namespace storePerftools {
namespace asyncPerf {
+//static
+qpid::broker::TxnHandle SimplePersistableQueue::s_nullTxnHandle; // used for non-txn operations
+
+
SimplePersistableQueue::SimplePersistableQueue(const std::string& name,
const qpid::framing::FieldTable& /*args*/,
qpid::asyncStore::AsyncStoreImpl* store,
@@ -127,6 +131,7 @@ SimplePersistableQueue::asyncCreate()
{
if (m_store) {
boost::shared_ptr<QueueAsyncContext> qac(new QueueAsyncContext(shared_from_this(),
+ s_nullTxnHandle,
qpid::asyncStore::AsyncOperation::QUEUE_CREATE,
&handleAsyncResult,
&m_resultQueue));
@@ -144,6 +149,7 @@ SimplePersistableQueue::asyncDestroy(const bool deleteQueue)
if (m_store) {
if (deleteQueue) {
boost::shared_ptr<QueueAsyncContext> qac(new QueueAsyncContext(shared_from_this(),
+ s_nullTxnHandle,
qpid::asyncStore::AsyncOperation::QUEUE_DESTROY,
&handleAsyncResult,
&m_resultQueue));
@@ -159,7 +165,7 @@ void
SimplePersistableQueue::deliver(boost::intrusive_ptr<SimplePersistableMessage> msg)
{
QueuedMessage qm(this, msg);
- enqueue((SimpleTransactionContext*)0, qm);
+ enqueue(s_nullTxnHandle, qm);
push(qm);
}
@@ -168,13 +174,13 @@ SimplePersistableQueue::dispatch()
{
QueuedMessage qm;
if (m_messages->consume(qm)) {
- return dequeue((SimpleTransactionContext*)0, qm);
+ return dequeue(s_nullTxnHandle, qm);
}
return false;
}
bool
-SimplePersistableQueue::enqueue(SimpleTransactionContext* ctxt,
+SimplePersistableQueue::enqueue(qpid::broker::TxnHandle& th,
QueuedMessage& qm)
{
ScopedUse u(m_barrier);
@@ -183,13 +189,13 @@ SimplePersistableQueue::enqueue(SimpleTransactionContext* ctxt,
}
if (qm.payload()->isPersistent() && m_store) {
qm.payload()->enqueueAsync(shared_from_this(), m_store);
- return asyncEnqueue(ctxt, qm);
+ return asyncEnqueue(th, qm);
}
return false;
}
bool
-SimplePersistableQueue::dequeue(SimpleTransactionContext* ctxt,
+SimplePersistableQueue::dequeue(qpid::broker::TxnHandle& th,
QueuedMessage& qm)
{
ScopedUse u(m_barrier);
@@ -198,12 +204,23 @@ SimplePersistableQueue::dequeue(SimpleTransactionContext* ctxt,
}
if (qm.payload()->isPersistent() && m_store) {
qm.payload()->dequeueAsync(shared_from_this(), m_store);
- return asyncDequeue(ctxt, qm);
+ return asyncDequeue(th, qm);
}
return true;
}
void
+SimplePersistableQueue::process(boost::intrusive_ptr<SimplePersistableMessage> msg)
+{
+ QueuedMessage qm(this, msg);
+ push(qm);
+}
+
+void
+SimplePersistableQueue::enqueueAborted(boost::intrusive_ptr<SimplePersistableMessage> /*msg*/)
+{}
+
+void
SimplePersistableQueue::encode(qpid::framing::Buffer& buffer) const
{
buffer.putShortString(m_name);
@@ -326,38 +343,46 @@ SimplePersistableQueue::push(QueuedMessage& qm,
// private
bool
-SimplePersistableQueue::asyncEnqueue(SimpleTransactionContext* txn,
+SimplePersistableQueue::asyncEnqueue(qpid::broker::TxnHandle& th,
QueuedMessage& qm)
{
qm.payload()->setPersistenceId(m_store->getNextRid());
//std::cout << "QQQ Queue=\"" << m_name << "\": asyncEnqueue() rid=0x" << std::hex << qm.payload()->getPersistenceId() << std::dec << std::endl << std::flush;
boost::shared_ptr<QueueAsyncContext> qac(new QueueAsyncContext(shared_from_this(),
qm.payload(),
+ th,
qpid::asyncStore::AsyncOperation::MSG_ENQUEUE,
&handleAsyncResult,
&m_resultQueue));
m_store->submitEnqueue(qm.enqHandle(),
- txn->getHandle(),
+ th,
qac);
++m_asyncOpCounter;
+ if (th.isValid()) {
+ th.incrOpCnt();
+ }
return true;
}
// private
bool
-SimplePersistableQueue::asyncDequeue(SimpleTransactionContext* txn,
+SimplePersistableQueue::asyncDequeue(qpid::broker::TxnHandle& th,
QueuedMessage& qm)
{
//std::cout << "QQQ Queue=\"" << m_name << "\": asyncDequeue() rid=0x" << std::hex << qm.payload()->getPersistenceId() << std::dec << std::endl << std::flush;
boost::shared_ptr<QueueAsyncContext> qac(new QueueAsyncContext(shared_from_this(),
qm.payload(),
+ th,
qpid::asyncStore::AsyncOperation::MSG_DEQUEUE,
&handleAsyncResult,
&m_resultQueue));
m_store->submitDequeue(qm.enqHandle(),
- txn->getHandle(),
+ th,
qac);
++m_asyncOpCounter;
+ if (th.isValid()) {
+ th.incrOpCnt();
+ }
return true;
}
@@ -407,6 +432,11 @@ SimplePersistableQueue::enqueueComplete(const boost::shared_ptr<QueueAsyncContex
//std::cout << "QQQ Queue name=\"" << qc->getQueue()->getName() << "\": enqueueComplete() rid=0x" << std::hex << qc->getMessage()->getPersistenceId() << std::dec << std::endl << std::flush;
assert(qc->getQueue().get() == this);
--m_asyncOpCounter;
+
+ qpid::broker::TxnHandle th = qc->getTxnHandle();
+ if (th.isValid()) { // transactional enqueue
+ th.decrOpCnt();
+ }
}
// private
@@ -416,6 +446,11 @@ SimplePersistableQueue::dequeueComplete(const boost::shared_ptr<QueueAsyncContex
//std::cout << "QQQ Queue name=\"" << qc->getQueue()->getName() << "\": dequeueComplete() rid=0x" << std::hex << qc->getMessage()->getPersistenceId() << std::dec << std::endl << std::flush;
assert(qc->getQueue().get() == this);
--m_asyncOpCounter;
+
+ qpid::broker::TxnHandle th = qc->getTxnHandle();
+ if (th.isValid()) { // transactional enqueue
+ th.decrOpCnt();
+ }
}
}}} // namespace tests::storePerftools::asyncPerf
diff --git a/cpp/src/tests/storePerftools/asyncPerf/SimplePersistableQueue.h b/cpp/src/tests/storePerftools/asyncPerf/SimplePersistableQueue.h
index 3bc0972ede..34ef9407ac 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/SimplePersistableQueue.h
+++ b/cpp/src/tests/storePerftools/asyncPerf/SimplePersistableQueue.h
@@ -24,8 +24,7 @@
#ifndef tests_storePerftools_asyncPerf_SimplePersistableQueue_h_
#define tests_storePerftools_asyncPerf_SimplePersistableQueue_h_
-#include "AtomicCounter.h" // AsyncOpCounter
-
+#include "qpid/asyncStore/AtomicCounter.h" // AsyncOpCounter
#include "qpid/broker/AsyncStore.h" // qpid::broker::DataSource
#include "qpid/broker/PersistableQueue.h"
#include "qpid/broker/QueueHandle.h"
@@ -41,6 +40,7 @@ class AsyncStoreImpl;
}
namespace broker {
class AsyncResultQueue;
+class TxnHandle;
}
namespace framing {
class FieldTable;
@@ -52,7 +52,6 @@ namespace asyncPerf {
class Messages;
class SimplePersistableMessage;
-class SimpleTransactionContext;
class QueueAsyncContext;
class QueuedMessage;
@@ -78,10 +77,12 @@ public:
// --- Methods in msg handling path from qpid::Queue ---
void deliver(boost::intrusive_ptr<SimplePersistableMessage> msg);
bool dispatch(); // similar to qpid::broker::Queue::distpatch(Consumer&) but without Consumer param
- bool enqueue(SimpleTransactionContext* ctxt,
+ bool enqueue(qpid::broker::TxnHandle& th,
QueuedMessage& qm);
- bool dequeue(SimpleTransactionContext* ctxt,
+ bool dequeue(qpid::broker::TxnHandle& th,
QueuedMessage& qm);
+ void process(boost::intrusive_ptr<SimplePersistableMessage> msg);
+ void enqueueAborted(boost::intrusive_ptr<SimplePersistableMessage> msg);
// --- Interface qpid::broker::Persistable ---
virtual void encode(qpid::framing::Buffer& buffer) const;
@@ -99,10 +100,12 @@ public:
virtual void write(char* target);
private:
+ static qpid::broker::TxnHandle s_nullTxnHandle; // used for non-txn operations
+
const std::string m_name;
qpid::asyncStore::AsyncStoreImpl* m_store;
qpid::broker::AsyncResultQueue& m_resultQueue;
- AsyncOpCounter m_asyncOpCounter;
+ qpid::asyncStore::AsyncOpCounter m_asyncOpCounter;
mutable uint64_t m_persistenceId;
std::string m_persistableData;
qpid::broker::QueueHandle m_queueHandle;
@@ -133,9 +136,9 @@ private:
bool isRecovery = false);
// -- Async ops ---
- bool asyncEnqueue(SimpleTransactionContext* txn,
+ bool asyncEnqueue(qpid::broker::TxnHandle& th,
QueuedMessage& qm);
- bool asyncDequeue(SimpleTransactionContext* txn,
+ bool asyncDequeue(qpid::broker::TxnHandle& th,
QueuedMessage& qm);
// --- Async op counter ---
diff --git a/cpp/src/tests/storePerftools/asyncPerf/SimpleTransactionContext.cpp b/cpp/src/tests/storePerftools/asyncPerf/SimpleTransactionContext.cpp
deleted file mode 100644
index 2aea14bc21..0000000000
--- a/cpp/src/tests/storePerftools/asyncPerf/SimpleTransactionContext.cpp
+++ /dev/null
@@ -1,240 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/**
- * \file SimpleTransactionContext.cpp
- */
-
-#include "SimpleTransactionContext.h"
-
-#include "TransactionAsyncContext.h"
-
-#include "qpid/asyncStore/AsyncStoreImpl.h"
-
-#include <uuid/uuid.h>
-
-namespace tests {
-namespace storePerftools {
-namespace asyncPerf {
-
-SimpleTransactionContext::SimpleTransactionContext(const std::string& xid) :
- qpid::broker::TransactionContext(),
- m_xid(xid),
- m_tpcFlag(!xid.empty()),
- m_store(0),
- m_txnHandle(0),
- m_prepared(false),
- m_enqueuedMsgs()
-{
- if (!m_tpcFlag) {
- setLocalXid();
- }
-//std::cout << "*TXN* begin: xid=" << getXid() << "; 2PC=" << (is2pc()?"T":"F") << std::endl;
-}
-
-SimpleTransactionContext::SimpleTransactionContext(qpid::asyncStore::AsyncStoreImpl* store,
- const std::string& xid) :
- m_store(store),
- m_prepared(false),
- m_enqueuedMsgs()
-{
-//std::cout << "*TXN* begin: xid=" << getXid() << "; 2PC=" << (is2pc()?"T":"F") << std::endl;
- if (m_store != 0) {
- m_txnHandle = store->createTxnHandle(xid);
- }
-}
-
-SimpleTransactionContext::~SimpleTransactionContext()
-{}
-
-// static
-/*
-void
-SimpleTransactionContext::handleAsyncResult(const qpid::broker::AsyncResult* res,
- qpid::broker::BrokerAsyncContext* bc)
-{
- if (bc && res) {
- TransactionAsyncContext* tac = dynamic_cast<TransactionAsyncContext*>(bc);
- if (res->errNo) {
- // TODO: Handle async failure here
- std::cerr << "Transaction xid=\"" << tac->getTransactionContext()->getXid() << "\": Operation " << tac->getOpStr() << ": failure "
- << res->errNo << " (" << res->errMsg << ")" << std::endl;
- } else {
- // Handle async success here
- switch(tac->getOpCode()) {
- case qpid::asyncStore::AsyncOperation::TXN_PREPARE:
- tac->getTransactionContext()->prepareComplete(tac);
- break;
- case qpid::asyncStore::AsyncOperation::TXN_COMMIT:
- tac->getTransactionContext()->commitComplete(tac);
- break;
- case qpid::asyncStore::AsyncOperation::TXN_ABORT:
- tac->getTransactionContext()->abortComplete(tac);
- break;
- default:
- std::ostringstream oss;
- oss << "tests::storePerftools::asyncPerf::SimpleTransactionContext::handleAsyncResult(): Unknown async operation: " << tac->getOpCode();
- throw qpid::Exception(oss.str());
- };
- }
- }
- if (bc) delete bc;
- if (res) delete res;
-}
-*/
-
-const qpid::broker::TxnHandle&
-SimpleTransactionContext::getHandle() const
-{
- return m_txnHandle;
-}
-
-qpid::broker::TxnHandle&
-SimpleTransactionContext::getHandle()
-{
- return m_txnHandle;
-}
-
-bool
-SimpleTransactionContext::is2pc() const
-{
- return m_tpcFlag;
-}
-
-const std::string&
-SimpleTransactionContext::getXid() const
-{
- return m_xid;
-}
-
-void
-SimpleTransactionContext::addEnqueuedMsg(QueuedMessage* qm)
-{
- qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_enqueuedMsgsMutex);
- m_enqueuedMsgs.push_back(qm);
-}
-
-void
-SimpleTransactionContext::prepare()
-{
- if (m_tpcFlag) {
- localPrepare();
- m_prepared = true;
- }
- std::ostringstream oss;
- oss << "SimpleTransactionContext::prepare(): xid=\"" << getXid()
- << "\": Transaction Error: called prepare() on local transaction";
- throw qpid::Exception(oss.str());
-}
-
-void
-SimpleTransactionContext::abort()
-{
- // TODO: Check the following XA transaction semantics:
- // Assuming 2PC aborts can occur without a prepare. Do local prepare if not already prepared.
- if (!m_prepared) {
- localPrepare();
- }
- if (m_store != 0) {
-// m_store->submitAbort(m_txnHandle,
-// &handleAsyncResult,
-// dynamic_cast<qpid::broker::BrokerAsyncContext*>(new TransactionAsyncContext(this, qpid::asyncStore::AsyncOperation::TXN_ABORT)));
- }
-//std::cout << "*TXN* abort: xid=" << m_txnHandle.getXid() << "; 2PC=" << (m_txnHandle.is2pc()?"T":"F") << std::endl;
-}
-
-void
-SimpleTransactionContext::commit()
-{
- if (is2pc()) {
- if (!m_prepared) {
- std::ostringstream oss;
- oss << "SimpleTransactionContext::abort(): xid=\"" << getXid()
- << "\": Transaction Error: called commit() without prepare() on 2PC transaction";
- throw qpid::Exception(oss.str());
- }
- } else {
- localPrepare();
- }
- if (m_store != 0) {
-// m_store->submitCommit(m_txnHandle,
-// &handleAsyncResult,
-// dynamic_cast<qpid::broker::BrokerAsyncContext*>(new TransactionAsyncContext(this, qpid::asyncStore::AsyncOperation::TXN_COMMIT)));
- }
-//std::cout << "*TXN* commit: xid=" << m_txnHandle.getXid() << "; 2PC=" << (m_txnHandle.is2pc()?"T":"F") << std::endl;
-}
-
-
-// private
-void
-SimpleTransactionContext::localPrepare()
-{
- if (m_store != 0) {
-// m_store->submitPrepare(m_txnHandle,
-// &handleAsyncResult,
-// dynamic_cast<qpid::broker::BrokerAsyncContext*>(new TransactionAsyncContext(this, qpid::asyncStore::AsyncOperation::TXN_PREPARE)));
- }
-//std::cout << "*TXN* localPrepare: xid=" << m_txnHandle.getXid() << "; 2PC=" << (m_txnHandle.is2pc()?"T":"F") << std::endl;
-}
-
-// private
-void
-SimpleTransactionContext::setLocalXid()
-{
- uuid_t uuid;
- // TODO: Valgrind warning: Possible race condition in uuid_generate_random() - is it thread-safe, and if not, does it matter?
- // If this race condition affects the randomness of the UUID, then there could be a problem here.
- ::uuid_generate_random(uuid);
- char uuidStr[37]; // 36-char uuid + trailing '\0'
- ::uuid_unparse(uuid, uuidStr);
- m_xid.assign(uuidStr);
-}
-
-// private
-void
-SimpleTransactionContext::prepareComplete(const TransactionAsyncContext* /*tc*/)
-{
- qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_enqueuedMsgsMutex);
-// while (!m_enqueuedMsgs.empty()) {
-// m_enqueuedMsgs.front()->clearTransaction();
-// m_enqueuedMsgs.pop_front();
-// }
-//std::cout << "~~~~~ Transaction xid=\"" << tc->m_tc->getXid() << "\": prepareComplete()" << std::endl << std::flush;
-// assert(tc->getTransactionContext().get() == this);
-}
-
-
-// private
-void
-SimpleTransactionContext::abortComplete(const TransactionAsyncContext* tc)
-{
-//std::cout << "~~~~~ Transaction xid=\"" << tc->m_tc->getXid() << "\": abortComplete()" << std::endl << std::flush;
- assert(tc->getTransactionContext().get() == this);
-}
-
-
-// private
-void
-SimpleTransactionContext::commitComplete(const TransactionAsyncContext* tc)
-{
-//std::cout << "~~~~~ Transaction xid=\"" << tc->m_tc->getXid() << "\": commitComplete()" << std::endl << std::flush;
- assert(tc->getTransactionContext().get() == this);
-}
-
-}}} // namespace tests::storePerftools::asyncPerf
diff --git a/cpp/src/tests/storePerftools/asyncPerf/SimpleTransactionContext.h b/cpp/src/tests/storePerftools/asyncPerf/SimpleTransactionContext.h
deleted file mode 100644
index 49fee9c720..0000000000
--- a/cpp/src/tests/storePerftools/asyncPerf/SimpleTransactionContext.h
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/**
- * \file SimpleTransactionContext.h
- */
-
-#ifndef tests_storePerftools_asyncPerf_SimpleTransactionContext_h_
-#define tests_storePerftools_asyncPerf_SimpleTransactionContext_h_
-
-#include "qpid/broker/TransactionalStore.h" // qpid::broker::TransactionContext
-#include "qpid/broker/TxnHandle.h"
-#include "qpid/sys/Mutex.h"
-
-#include <deque>
-
-namespace qpid {
-namespace asyncStore {
-class AsyncStoreImpl;
-}
-namespace broker {
-class AsyncResult;
-class BrokerAsyncContext;
-}}
-
-namespace tests {
-namespace storePerftools {
-namespace asyncPerf {
-
-class QueuedMessage;
-class TransactionAsyncContext;
-
-class SimpleTransactionContext : public qpid::broker::TransactionContext
-{
-public:
- SimpleTransactionContext(const std::string& xid = std::string());
- SimpleTransactionContext(qpid::asyncStore::AsyncStoreImpl* store,
- const std::string& xid = std::string());
- virtual ~SimpleTransactionContext();
-// static void handleAsyncResult(const qpid::broker::AsyncResult* res,
-// qpid::broker::BrokerAsyncContext* bc);
-
- const qpid::broker::TxnHandle& getHandle() const;
- qpid::broker::TxnHandle& getHandle();
- bool is2pc() const;
- const std::string& getXid() const;
- void addEnqueuedMsg(QueuedMessage* qm);
-
- void prepare();
- void abort();
- void commit();
-
-private:
- std::string m_xid;
- bool m_tpcFlag;
- qpid::asyncStore::AsyncStoreImpl* m_store;
- qpid::broker::TxnHandle m_txnHandle;
- bool m_prepared;
- std::deque<QueuedMessage*> m_enqueuedMsgs;
- qpid::sys::Mutex m_enqueuedMsgsMutex;
-
- void localPrepare();
- void setLocalXid();
-
- // --- Ascnc op completions (called through handleAsyncResult) ---
- void prepareComplete(const TransactionAsyncContext* tc);
- void abortComplete(const TransactionAsyncContext* tc);
- void commitComplete(const TransactionAsyncContext* tc);
-
-};
-
-}}} // namespace tests:storePerftools::asyncPerf
-
-#endif // tests_storePerftools_asyncPerf_SimpleTransactionContext_h_
diff --git a/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.cpp b/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.cpp
new file mode 100644
index 0000000000..eff7646de6
--- /dev/null
+++ b/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.cpp
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file TxnPublish.cpp
+ */
+
+#include "SimplePersistableMessage.h"
+#include "SimplePersistableQueue.h" // debug msg
+#include "TxnPublish.h"
+
+#include "QueuedMessage.h"
+
+namespace tests {
+namespace storePerftools {
+namespace asyncPerf {
+
+TxnPublish::TxnPublish(boost::intrusive_ptr<SimplePersistableMessage> msg) :
+ m_msg(msg)
+{
+//std::cout << "TTT new TxnPublish" << std::endl << std::flush;
+}
+
+TxnPublish::~TxnPublish()
+{}
+
+bool
+TxnPublish::prepare(qpid::broker::TxnHandle& th) throw()
+{
+//std::cout << "TTT TxnPublish::prepare: " << m_queues.size() << " queues" << std::endl << std::flush;
+ try{
+ while (!m_queues.empty()) {
+ m_queues.front()->prepareEnqueue(th);
+ m_prepared.push_back(m_queues.front());
+ m_queues.pop_front();
+ }
+ return true;
+ } catch (const std::exception& e) {
+ std::cerr << "Failed to prepare transaction: " << e.what() << std::endl;
+ } catch (...) {
+ std::cerr << "Failed to prepare transaction: (unknown error)" << std::endl;
+ }
+ return false;
+}
+
+void
+TxnPublish::commit() throw()
+{
+//std::cout << "TTT TxnPublish::commit" << std::endl << std::flush;
+ try {
+ for (std::list<boost::shared_ptr<QueuedMessage> >::iterator i = m_prepared.begin(); i != m_prepared.end(); ++i) {
+ (*i)->commitEnqueue();
+ }
+ } catch (const std::exception& e) {
+ std::cerr << "Failed to commit transaction: " << e.what() << std::endl;
+ } catch (...) {
+ std::cerr << "Failed to commit transaction: (unknown error)" << std::endl;
+ }
+}
+
+void
+TxnPublish::rollback() throw()
+{
+//std::cout << "TTT TxnPublish::rollback" << std::endl << std::flush;
+ try {
+ for (std::list<boost::shared_ptr<QueuedMessage> >::iterator i = m_prepared.begin(); i != m_prepared.end(); ++i) {
+ (*i)->abortEnqueue();
+ }
+ } catch (const std::exception& e) {
+ std::cerr << "Failed to rollback transaction: " << e.what() << std::endl;
+ } catch (...) {
+ std::cerr << "Failed to rollback transaction: (unknown error)" << std::endl;
+ }
+}
+
+uint64_t
+TxnPublish::contentSize()
+{
+ return m_msg->contentSize();
+}
+
+void
+TxnPublish::deliverTo(const boost::shared_ptr<SimplePersistableQueue>& queue)
+{
+//std::cout << "TTT TxnPublish::deliverTo queue=\"" << queue->getName() << "\"" << std::endl << std::flush;
+ boost::shared_ptr<QueuedMessage> qm(new QueuedMessage(queue.get(), m_msg));
+ m_queues.push_back(qm);
+ m_delivered = true;
+}
+
+SimplePersistableMessage&
+TxnPublish::getMessage()
+{
+ return *m_msg;
+}
+
+}}} // namespace tests::storePerftools::asyncPerf
diff --git a/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.h b/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.h
new file mode 100644
index 0000000000..6e97e99349
--- /dev/null
+++ b/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.h
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file TxnPublish.h
+ */
+
+#ifndef tests_storePerftools_asyncPerf_TxnPublish_h_
+#define tests_storePerftools_asyncPerf_TxnPublish_h_
+
+#include "Deliverable.h"
+
+#include "qpid/broker/TxnOp.h"
+
+#include <boost/intrusive_ptr.hpp>
+#include <boost/shared_ptr.hpp>
+#include <list>
+
+namespace qpid {
+namespace broker {
+
+class TransactionContext;
+
+}}
+
+namespace tests {
+namespace storePerftools {
+namespace asyncPerf {
+
+class QueuedMessage;
+class SimplePersistableMessage;
+class SimplePersistableQueue;
+
+class TxnPublish : public qpid::broker::TxnOp,
+ public Deliverable
+{
+public:
+ TxnPublish(boost::intrusive_ptr<SimplePersistableMessage> msg);
+ virtual ~TxnPublish();
+
+ // --- Interface TxOp ---
+ bool prepare(qpid::broker::TxnHandle& th) throw();
+ void commit() throw();
+ void rollback() throw();
+
+ // --- Interface Deliverable ---
+ uint64_t contentSize();
+ void deliverTo(const boost::shared_ptr<SimplePersistableQueue>& queue);
+ SimplePersistableMessage& getMessage();
+
+private:
+ boost::intrusive_ptr<SimplePersistableMessage> m_msg;
+ std::list<boost::shared_ptr<QueuedMessage> > m_queues;
+ std::list<boost::shared_ptr<QueuedMessage> > m_prepared;
+};
+
+}}} // namespace tests::storePerftools::asyncPerf
+
+#endif // tests_storePerftools_asyncPerf_TxnPublish_h_