diff options
author | Kim van der Riet <kpvdr@apache.org> | 2012-06-15 19:21:07 +0000 |
---|---|---|
committer | Kim van der Riet <kpvdr@apache.org> | 2012-06-15 19:21:07 +0000 |
commit | 58337ca40df3a57a16cdee9b7f6b4fe0361b0018 (patch) | |
tree | 391ad7ad1ea8cd42a7e9d4890c724b186e00f38b | |
parent | 01174a9e568f11cd5aa4f22aaa914e00ab9fe163 (diff) | |
download | qpid-python-58337ca40df3a57a16cdee9b7f6b4fe0361b0018.tar.gz |
QPID-3858: WIP - async txns for msg publish pathway, but there are some race/thread issues to sort out.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1350745 13f79535-47bb-0310-9956-ffa450edef68
34 files changed, 985 insertions, 469 deletions
diff --git a/cpp/src/CMakeLists.txt b/cpp/src/CMakeLists.txt index 575c78e320..6077bade08 100644 --- a/cpp/src/CMakeLists.txt +++ b/cpp/src/CMakeLists.txt @@ -1076,10 +1076,6 @@ set (qpidbroker_SOURCES ${qpidbroker_platform_SOURCES} qpid/amqp_0_10/Connection.h qpid/amqp_0_10/Connection.cpp - qpid/broker/AsyncResultHandle.cpp - qpid/broker/AsyncResultHandleImpl.cpp - qpid/broker/AsyncResultQueueImpl.cpp - qpid/broker/AsyncStore.cpp qpid/broker/Broker.cpp qpid/broker/Credit.cpp qpid/broker/Exchange.cpp @@ -1503,12 +1499,15 @@ set (asyncStore_SOURCES qpid/asyncStore/TxnHandleImpl.cpp qpid/broker/AsyncResultHandle.cpp qpid/broker/AsyncResultHandleImpl.cpp + qpid/broker/AsyncResultQueueImpl.cpp qpid/broker/ConfigHandle.cpp qpid/broker/EnqueueHandle.cpp qpid/broker/EventHandle.cpp qpid/broker/IdHandle.cpp qpid/broker/MessageHandle.cpp qpid/broker/QueueHandle.cpp + qpid/broker/TxnAsyncContext.cpp + qpid/broker/TxnBuffer.cpp qpid/broker/TxnHandle.cpp ) diff --git a/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp b/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp index a9fc13363a..9135fcc27e 100644 --- a/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp +++ b/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp @@ -24,6 +24,7 @@ #include "AsyncStoreImpl.h" #include "AsyncOperation.h" +#include "TxnHandleImpl.h" #include "qpid/broker/ConfigHandle.h" #include "qpid/broker/EnqueueHandle.h" @@ -62,6 +63,31 @@ void AsyncStoreImpl::initManagement(qpid::broker::Broker* /*broker*/) {} +qpid::broker::TxnHandle +AsyncStoreImpl::createTxnHandle() +{ + return qpid::broker::TxnHandle(new TxnHandleImpl); +} + +qpid::broker::TxnHandle +AsyncStoreImpl::createTxnHandle(qpid::broker::TxnBuffer* tb) +{ + return qpid::broker::TxnHandle(new TxnHandleImpl(tb)); +} + +qpid::broker::TxnHandle +AsyncStoreImpl::createTxnHandle(const std::string& xid) +{ + return qpid::broker::TxnHandle(new TxnHandleImpl(xid)); +} + +qpid::broker::TxnHandle +AsyncStoreImpl::createTxnHandle(const std::string& xid, + qpid::broker::TxnBuffer* tb) +{ + return qpid::broker::TxnHandle(new TxnHandleImpl(xid, tb)); +} + qpid::broker::ConfigHandle AsyncStoreImpl::createConfigHandle() { @@ -96,12 +122,6 @@ AsyncStoreImpl::createQueueHandle(const std::string& name, return qpid::broker::QueueHandle(new QueueHandleImpl(name, opts)); } -qpid::broker::TxnHandle -AsyncStoreImpl::createTxnHandle(const std::string& xid) -{ - return qpid::broker::TxnHandle(new TxnHandleImpl(xid)); -} - void AsyncStoreImpl::submitPrepare(qpid::broker::TxnHandle& txnHandle, boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) diff --git a/cpp/src/qpid/asyncStore/AsyncStoreImpl.h b/cpp/src/qpid/asyncStore/AsyncStoreImpl.h index 3f2058a94c..a00771abf5 100644 --- a/cpp/src/qpid/asyncStore/AsyncStoreImpl.h +++ b/cpp/src/qpid/asyncStore/AsyncStoreImpl.h @@ -54,6 +54,10 @@ public: // --- Factory methods for creating handles --- + qpid::broker::TxnHandle createTxnHandle(); + qpid::broker::TxnHandle createTxnHandle(qpid::broker::TxnBuffer* tb); + qpid::broker::TxnHandle createTxnHandle(const std::string& xid); + qpid::broker::TxnHandle createTxnHandle(const std::string& xid, qpid::broker::TxnBuffer* tb); qpid::broker::ConfigHandle createConfigHandle(); qpid::broker::EnqueueHandle createEnqueueHandle(qpid::broker::MessageHandle& msgHandle, @@ -63,7 +67,6 @@ public: qpid::broker::MessageHandle createMessageHandle(const qpid::broker::DataSource* const dataSrc); qpid::broker::QueueHandle createQueueHandle(const std::string& name, const qpid::types::Variant::Map& opts); - qpid::broker::TxnHandle createTxnHandle(const std::string& xid=std::string()); // --- Store async interface --- diff --git a/cpp/src/tests/storePerftools/asyncPerf/AtomicCounter.h b/cpp/src/qpid/asyncStore/AtomicCounter.h index d354c729dc..6987b09384 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/AtomicCounter.h +++ b/cpp/src/qpid/asyncStore/AtomicCounter.h @@ -21,16 +21,15 @@ * \file AtomicCounter.h */ -#ifndef tests_storePerftools_asyncPerf_AtomicCounter_h_ -#define tests_storePerftools_asyncPerf_AtomicCounter_h_ +#ifndef qpid_asyncStore_AtomicCounter_h_ +#define qpid_asyncStore_AtomicCounter_h_ #include "qpid/sys/Condition.h" #include "qpid/sys/Mutex.h" #include "qpid/sys/Time.h" -namespace tests { -namespace storePerftools { -namespace asyncPerf { +namespace qpid { +namespace asyncStore { template <class T> class AtomicCounter @@ -52,20 +51,37 @@ public: return m_cnt; } - void + AtomicCounter& operator++() { qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_cntMutex); ++m_cnt; + return *this; } - void + AtomicCounter& operator--() { qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_cntMutex); if (--m_cnt == 0) { m_cntCondition.notify(); } + return *this; + } + + bool + operator==(const AtomicCounter& rhs) + { + qpid::sys::ScopedLock<qpid::sys::Mutex> l1(m_cntMutex); + qpid::sys::ScopedLock<qpid::sys::Mutex> l2(rhs.m_cntMutex); + return m_cnt == rhs.m_cnt; + } + + bool + operator==(const T rhs) + { + qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_cntMutex); + return m_cnt == rhs; } void @@ -85,6 +101,6 @@ private: typedef AtomicCounter<uint32_t> AsyncOpCounter; -}}} // namespace tests::storePerftools::asyncPerf +}} // namespace qpid::asyncStore -#endif // tests_storePerftools_asyncPerf_AtomicCounter_h_ +#endif // qpid_asyncStore_AtomicCounter_h_ diff --git a/cpp/src/qpid/asyncStore/TxnHandleImpl.cpp b/cpp/src/qpid/asyncStore/TxnHandleImpl.cpp index 7ce01f881c..945b50861d 100644 --- a/cpp/src/qpid/asyncStore/TxnHandleImpl.cpp +++ b/cpp/src/qpid/asyncStore/TxnHandleImpl.cpp @@ -23,6 +23,8 @@ #include "TxnHandleImpl.h" +#include "qpid/Exception.h" +#include "qpid/broker/TxnBuffer.h" #include "qpid/messaging/PrivateImplRef.h" #include <uuid/uuid.h> @@ -30,16 +32,42 @@ namespace qpid { namespace asyncStore { +TxnHandleImpl::TxnHandleImpl() : + m_tpcFlag(false), + m_asyncOpCnt(0UL), + m_txnBuffer(0) +{ + createLocalXid(); +} + +TxnHandleImpl::TxnHandleImpl(qpid::broker::TxnBuffer* tb) : + m_tpcFlag(false), + m_asyncOpCnt(0UL), + m_txnBuffer(tb) +{ + createLocalXid(); +} + TxnHandleImpl::TxnHandleImpl(const std::string& xid) : m_xid(xid), - m_tpcFlag(!xid.empty()) -{ - if (m_xid.empty()) { // create a local xid from a random uuid - uuid_t uuid; - ::uuid_generate_random(uuid); - char uuidStr[37]; // 36-char uuid + trailing '\0' - ::uuid_unparse(uuid, uuidStr); -// m_xid.assign(uuidStr); + m_tpcFlag(!xid.empty()), + m_asyncOpCnt(0UL), + m_txnBuffer(0) +{ + if (m_xid.empty()) { + createLocalXid(); + } +} + +TxnHandleImpl::TxnHandleImpl(const std::string& xid, + qpid::broker::TxnBuffer* tb) : + m_xid(xid), + m_tpcFlag(!xid.empty()), + m_asyncOpCnt(0UL), + m_txnBuffer(tb) +{ + if (m_xid.empty()) { + createLocalXid(); } } @@ -58,4 +86,33 @@ TxnHandleImpl::is2pc() const return m_tpcFlag; } +void +TxnHandleImpl::incrOpCnt() +{ + ++m_asyncOpCnt; +} + +void +TxnHandleImpl::decrOpCnt() +{ + if (m_asyncOpCnt == 0UL) { + throw qpid::Exception("Transaction async operation count underflow"); + } + if (--m_asyncOpCnt == 0UL && m_txnBuffer) { + m_txnBuffer->asyncLocalCommit(); + } +} + +// private +void +TxnHandleImpl::createLocalXid() +{ + uuid_t uuid; + ::uuid_generate_random(uuid); + char uuidStr[37]; // 36-char uuid + trailing '\0' + ::uuid_unparse(uuid, uuidStr); + m_xid.assign(uuidStr); +//std::cout << "TTT TxnHandleImpl::createLocalXid(): Local XID created: \"" << m_xid << "\"" << std::endl << std::flush; +} + }} // namespace qpid::asyncStore diff --git a/cpp/src/qpid/asyncStore/TxnHandleImpl.h b/cpp/src/qpid/asyncStore/TxnHandleImpl.h index b28eb0cd4b..e357791508 100644 --- a/cpp/src/qpid/asyncStore/TxnHandleImpl.h +++ b/cpp/src/qpid/asyncStore/TxnHandleImpl.h @@ -24,23 +24,45 @@ #ifndef qpid_asyncStore_TxnHandleImpl_h_ #define qpid_asyncStore_TxnHandleImpl_h_ +#include "AtomicCounter.h" + #include "qpid/RefCounted.h" +#include <stdint.h> // uint32_t #include <string> namespace qpid { + +namespace broker { +class TxnBuffer; +} + namespace asyncStore { class TxnHandleImpl : public virtual qpid::RefCounted { public: - TxnHandleImpl(const std::string& xid = std::string()); + TxnHandleImpl(); + TxnHandleImpl(qpid::broker::TxnBuffer* tb); + TxnHandleImpl(const std::string& xid); + TxnHandleImpl(const std::string& xid, qpid::broker::TxnBuffer* tb); virtual ~TxnHandleImpl(); const std::string& getXid() const; bool is2pc() const; + + void submitPrepare(); + void submitCommit(); + void submitAbort(); + + void incrOpCnt(); + void decrOpCnt(); private: std::string m_xid; bool m_tpcFlag; + AsyncOpCounter m_asyncOpCnt; + qpid::broker::TxnBuffer* const m_txnBuffer; + + void createLocalXid(); }; }} // namespace qpid::asyncStore diff --git a/cpp/src/qpid/broker/AsyncResultQueueImpl.cpp b/cpp/src/qpid/broker/AsyncResultQueueImpl.cpp index ab391146f9..9e3298bb4e 100644 --- a/cpp/src/qpid/broker/AsyncResultQueueImpl.cpp +++ b/cpp/src/qpid/broker/AsyncResultQueueImpl.cpp @@ -49,7 +49,6 @@ AsyncResultQueueImpl::submit(boost::shared_ptr<AsyncResultHandle> arh) AsyncResultQueueImpl::ResultQueue::Batch::const_iterator AsyncResultQueueImpl::handle(const ResultQueue::Batch& e) { - for (ResultQueue::Batch::const_iterator i = e.begin(); i != e.end(); ++i) { //std::cout << "<== AsyncResultQueueImpl::handle() errNo=" << (*i)->getErrNo() << " errMsg=\"" << (*i)->getErrMsg() << "\"" << std::endl << std::flush; if ((*i)->isValid()) { diff --git a/cpp/src/qpid/broker/AsyncStore.h b/cpp/src/qpid/broker/AsyncStore.h index 7e2ee81620..2f73ec8f3e 100644 --- a/cpp/src/qpid/broker/AsyncStore.h +++ b/cpp/src/qpid/broker/AsyncStore.h @@ -37,7 +37,7 @@ class AsyncResultHandle; // Broker to subclass as a pollable queue class AsyncResultQueue { public: - virtual ~AsyncResultQueue(); + virtual ~AsyncResultQueue() {} // TODO: Remove boost::shared_ptr<> from this interface virtual void submit(boost::shared_ptr<AsyncResultHandle>) = 0; }; @@ -45,14 +45,14 @@ public: // Subclass this for specific contexts class BrokerAsyncContext { public: - virtual ~BrokerAsyncContext(); + virtual ~BrokerAsyncContext() {} virtual AsyncResultQueue* getAsyncResultQueue() const = 0; virtual void invokeCallback(const AsyncResultHandle* const) const = 0; }; class DataSource { public: - virtual ~DataSource(); + virtual ~DataSource() {} virtual uint64_t getSize() = 0; virtual void write(char* target) = 0; }; @@ -65,13 +65,28 @@ class EnqueueHandle; class EventHandle; class MessageHandle; class QueueHandle; +class TxnBuffer; class TxnHandle; +class AsyncTransactionalStore { +public: + virtual ~AsyncTransactionalStore() {} + + virtual TxnHandle createTxnHandle() = 0; + virtual TxnHandle createTxnHandle(TxnBuffer* tb) = 0; + virtual TxnHandle createTxnHandle(const std::string& xid) = 0; + virtual TxnHandle createTxnHandle(const std::string& xid, TxnBuffer* tb) = 0; + + // TODO: Remove boost::shared_ptr<> from this interface + virtual void submitPrepare(TxnHandle&, boost::shared_ptr<BrokerAsyncContext>) = 0; // Distributed txns only + virtual void submitCommit(TxnHandle&, boost::shared_ptr<BrokerAsyncContext>) = 0; + virtual void submitAbort(TxnHandle&, boost::shared_ptr<BrokerAsyncContext>) = 0; +}; // Subclassed by store: -class AsyncStore { +class AsyncStore : public AsyncTransactionalStore { public: - virtual ~AsyncStore(); + virtual ~AsyncStore() {} // --- Factory methods for creating handles --- @@ -80,16 +95,11 @@ public: virtual EventHandle createEventHandle(QueueHandle&, const std::string& key=std::string()) = 0; virtual MessageHandle createMessageHandle(const DataSource* const) = 0; virtual QueueHandle createQueueHandle(const std::string& name, const qpid::types::Variant::Map& opts) = 0; - virtual TxnHandle createTxnHandle(const std::string& xid=std::string()) = 0; // Distr. txns must supply xid // --- Store async interface --- // TODO: Remove boost::shared_ptr<> from this interface - virtual void submitPrepare(TxnHandle&, boost::shared_ptr<BrokerAsyncContext>) = 0; // Distributed txns only - virtual void submitCommit(TxnHandle&, boost::shared_ptr<BrokerAsyncContext>) = 0; - virtual void submitAbort(TxnHandle&, boost::shared_ptr<BrokerAsyncContext>) = 0; - virtual void submitCreate(ConfigHandle&, const DataSource* const, boost::shared_ptr<BrokerAsyncContext>) = 0; virtual void submitDestroy(ConfigHandle&, boost::shared_ptr<BrokerAsyncContext>) = 0; diff --git a/cpp/src/qpid/broker/TxnAsyncContext.cpp b/cpp/src/qpid/broker/TxnAsyncContext.cpp new file mode 100644 index 0000000000..e8abe99dab --- /dev/null +++ b/cpp/src/qpid/broker/TxnAsyncContext.cpp @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file TransactionAsyncContext.cpp + */ + +#include "TxnAsyncContext.h" + +#include <cassert> + +namespace qpid { +namespace broker { + +TxnAsyncContext::TxnAsyncContext(TxnBuffer* const tb, + TxnHandle& th, + const qpid::asyncStore::AsyncOperation::opCode op, + qpid::broker::AsyncResultCallback rcb, + qpid::broker::AsyncResultQueue* const arq): + m_tb(tb), + m_th(th), + m_op(op), + m_rcb(rcb), + m_arq(arq) +{ + assert(m_th.isValid()); +} + +TxnAsyncContext::~TxnAsyncContext() +{} + +TxnBuffer* +TxnAsyncContext::getTxnBuffer() const +{ + return m_tb; +} + +qpid::asyncStore::AsyncOperation::opCode +TxnAsyncContext::getOpCode() const +{ + return m_op; +} + +const char* +TxnAsyncContext::getOpStr() const +{ + return qpid::asyncStore::AsyncOperation::getOpStr(m_op); +} + +TxnHandle +TxnAsyncContext::getTransactionContext() const +{ + return m_th; +} + +AsyncResultQueue* +TxnAsyncContext::getAsyncResultQueue() const +{ + return m_arq; +} + +void +TxnAsyncContext::invokeCallback(const AsyncResultHandle* const arh) const +{ + if (m_rcb) { + m_rcb(arh); + } +} + +}} // namespace qpid::broker diff --git a/cpp/src/tests/storePerftools/asyncPerf/TransactionAsyncContext.h b/cpp/src/qpid/broker/TxnAsyncContext.h index 186a8141cf..9bdd8f7188 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/TransactionAsyncContext.h +++ b/cpp/src/qpid/broker/TxnAsyncContext.h @@ -18,39 +18,48 @@ */ /** - * \file TransactionAsyncContext.h + * \file TxnAsyncContext.h */ -#ifndef tests_storePerftools_asyncPerf_TransactionAsyncContext_h_ -#define tests_storePerftools_asyncPerf_TransactionAsyncContext_h_ +#ifndef qpid_broker_TxnAsyncContext_h_ +#define qpid_broker_TxnAsyncContext_h_ + +#include "AsyncStore.h" // qpid::broker::BrokerAsyncContext +#include "TxnHandle.h" #include "qpid/asyncStore/AsyncOperation.h" -#include "qpid/broker/AsyncStore.h" // qpid::broker::BrokerAsyncContext #include <boost/shared_ptr.hpp> -namespace tests { -namespace storePerftools { -namespace asyncPerf { - -class SimpleTransactionContext; +namespace qpid { +namespace broker { -class TransactionAsyncContext: public qpid::broker::BrokerAsyncContext +class TxnAsyncContext: public BrokerAsyncContext { public: - TransactionAsyncContext(boost::shared_ptr<SimpleTransactionContext> tc, - const qpid::asyncStore::AsyncOperation::opCode op); - virtual ~TransactionAsyncContext(); + TxnAsyncContext(TxnBuffer* const tb, + TxnHandle& th, + const qpid::asyncStore::AsyncOperation::opCode op, + qpid::broker::AsyncResultCallback rcb, + qpid::broker::AsyncResultQueue* const arq); + virtual ~TxnAsyncContext(); + TxnBuffer* getTxnBuffer() const; qpid::asyncStore::AsyncOperation::opCode getOpCode() const; const char* getOpStr() const; - boost::shared_ptr<SimpleTransactionContext> getTransactionContext() const; - void destroy(); + TxnHandle getTransactionContext() const; + + // --- Interface BrokerAsyncContext --- + AsyncResultQueue* getAsyncResultQueue() const; + void invokeCallback(const AsyncResultHandle* const) const; private: - boost::shared_ptr<SimpleTransactionContext> m_tc; + TxnBuffer* const m_tb; + TxnHandle m_th; const qpid::asyncStore::AsyncOperation::opCode m_op; + AsyncResultCallback m_rcb; + AsyncResultQueue* const m_arq; }; -}}} // namespace tests::storePerftools::asyncPerf +}} // namespace qpid::broker -#endif // tests_storePerftools_asyncPerf_TransactionAsyncContext_h_ +#endif // qpid_broker_TxnAsyncContext_h_ diff --git a/cpp/src/qpid/broker/TxnBuffer.cpp b/cpp/src/qpid/broker/TxnBuffer.cpp new file mode 100644 index 0000000000..2af8bd4da3 --- /dev/null +++ b/cpp/src/qpid/broker/TxnBuffer.cpp @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file TxnBuffer.cpp + */ + +#include "TxnBuffer.h" + +#include "AsyncResultHandle.h" +#include "AsyncStore.h" +#include "TxnAsyncContext.h" +#include "TxnOp.h" + +#include "qpid/Exception.h" + +#include <boost/shared_ptr.hpp> + +namespace qpid { +namespace broker { + +TxnBuffer::TxnBuffer(AsyncResultQueue& arq) : + m_store(0), + m_resultQueue(arq), + m_state(NONE) +{} + +TxnBuffer::~TxnBuffer() +{} + +void +TxnBuffer::enlist(boost::shared_ptr<TxnOp> op) +{ +//std::cout << "TTT TxnBuffer::enlist" << std::endl << std::flush; + m_ops.push_back(op); +} + +bool +TxnBuffer::prepare(TxnHandle& th) +{ +//std::cout << "TTT TxnBuffer::prepare" << std::endl << std::flush; + for(std::vector<boost::shared_ptr<TxnOp> >::iterator i = m_ops.begin(); i != m_ops.end(); ++i) { + if (!(*i)->prepare(th)) { + return false; + } + } + return true; +} + +void +TxnBuffer::commit() +{ +//std::cout << "TTT TxnBuffer::commit" << std::endl << std::flush; + for(std::vector<boost::shared_ptr<TxnOp> >::iterator i = m_ops.begin(); i != m_ops.end(); ++i) { + (*i)->commit(); + } + m_ops.clear(); +} + +void +TxnBuffer::rollback() +{ +//std::cout << "TTT TxnBuffer::rollback" << std::endl << std::flush; + for(std::vector<boost::shared_ptr<TxnOp> >::iterator i = m_ops.begin(); i != m_ops.end(); ++i) { + (*i)->rollback(); + } + m_ops.clear(); +} + +bool +TxnBuffer::commitLocal(AsyncTransactionalStore* const store) +{ +//std::cout << "TTT TxnBuffer::commitLocal" << std::endl << std::flush; + if (store) { + try { + m_store = store; + asyncLocalCommit(); + } catch (std::exception& e) { + std::cerr << "Commit failed: " << e.what() << std::endl; + } catch (...) { + std::cerr << "Commit failed (unknown exception)" << std::endl; + } + } + return false; +} + +// static +void +TxnBuffer::handleAsyncResult(const AsyncResultHandle* const arh) +{ + if (arh) { + boost::shared_ptr<TxnAsyncContext> tac = boost::dynamic_pointer_cast<TxnAsyncContext>(arh->getBrokerAsyncContext()); + if (arh->getErrNo()) { + tac->getTxnBuffer()->asyncLocalAbort(); + std::cerr << "Transaction xid=\"" << tac->getTransactionContext().getXid() << "\": Operation " << tac->getOpStr() << ": failure " + << arh->getErrNo() << " (" << arh->getErrMsg() << ")" << std::endl; + tac->getTxnBuffer()->asyncLocalAbort(); + } else { +//std::cout << "TTT TxnBuffer::handleAsyncResult() op=" << tac->getOpStr() << std::endl << std::flush; + if (tac->getOpCode() == qpid::asyncStore::AsyncOperation::TXN_ABORT) { + tac->getTxnBuffer()->asyncLocalAbort(); + } else { + tac->getTxnBuffer()->asyncLocalCommit(); + } + } + } +} + +void +TxnBuffer::asyncLocalCommit() +{ + assert(m_store != 0); + switch(m_state) { + case NONE: +//std::cout << "TTT TxnBuffer::asyncLocalCommit: NONE->PREPARE" << std::endl << std::flush; + m_state = PREPARE; + m_txnHandle = m_store->createTxnHandle(this); + prepare(m_txnHandle); + break; + case PREPARE: +//std::cout << "TTT TxnBuffer::asyncLocalCommit: PREPARE->COMMIT" << std::endl << std::flush; + m_state = COMMIT; + { + boost::shared_ptr<TxnAsyncContext> tac(new TxnAsyncContext(this, + m_txnHandle, + qpid::asyncStore::AsyncOperation::TXN_COMMIT, + &handleAsyncResult, + &m_resultQueue)); + m_store->submitCommit(m_txnHandle, tac); + } + break; + case COMMIT: +//std::cout << "TTT TxnBuffer:asyncLocalCommit: COMMIT->COMPLETE" << std::endl << std::flush; + commit(); + m_state = COMPLETE; + //delete this; // TODO: ugly! Find a better way to handle the life cycle of this class + break; + default: ; +//std::cout << "TTT TxnBuffer:asyncLocalCommit: Unexpected state " << m_state << std::endl << std::flush; + } +} + +void +TxnBuffer::asyncLocalAbort() +{ + assert(m_store != 0); + switch (m_state) { + case NONE: + case PREPARE: + case COMMIT: +//std::cout << "TTT TxnBuffer::asyncRollback: xxx->ROLLBACK" << std::endl << std::flush; + m_state = ROLLBACK; + { + boost::shared_ptr<TxnAsyncContext> tac(new TxnAsyncContext(this, + m_txnHandle, + qpid::asyncStore::AsyncOperation::TXN_ABORT, + &handleAsyncResult, + &m_resultQueue)); + m_store->submitCommit(m_txnHandle, tac); + } + break; + case ROLLBACK: +//std::cout << "TTT TxnBuffer:asyncRollback: ROLLBACK->COMPLETE" << std::endl << std::flush; + rollback(); + m_state = COMPLETE; + //delete this; // TODO: ugly! Find a better way to handle the life cycle of this class + default: ; +//std::cout << "TTT TxnBuffer:asyncRollback: Unexpected state " << m_state << std::endl << std::flush; + } +} + +}} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/TxnBuffer.h b/cpp/src/qpid/broker/TxnBuffer.h new file mode 100644 index 0000000000..308a91aa03 --- /dev/null +++ b/cpp/src/qpid/broker/TxnBuffer.h @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file TxnBuffer.h + */ + +#ifndef qpid_broker_TxnBuffer_h_ +#define qpid_broker_TxnBuffer_h_ + +#include "TxnHandle.h" + +//#include <boost/enable_shared_from_this.hpp> +#include <boost/shared_ptr.hpp> +#include <vector> + +namespace qpid { +namespace broker { + +class AsyncResultHandle; +class AsyncResultQueue; +class AsyncTransactionalStore; +class TxnOp; + +class TxnBuffer /*: public boost::enable_shared_from_this<TxnBuffer>*/ { +public: + TxnBuffer(AsyncResultQueue& arq); + virtual ~TxnBuffer(); + + void enlist(boost::shared_ptr<TxnOp> op); + bool prepare(TxnHandle& th); + void commit(); + void rollback(); + bool commitLocal(AsyncTransactionalStore* const store); + + // --- Async operations --- + static void handleAsyncResult(const AsyncResultHandle* const arh); + void asyncLocalCommit(); + void asyncLocalAbort(); + +private: + std::vector<boost::shared_ptr<TxnOp> > m_ops; + TxnHandle m_txnHandle; + AsyncTransactionalStore* m_store; + AsyncResultQueue& m_resultQueue; + + typedef enum {NONE = 0, PREPARE, COMMIT, ROLLBACK, COMPLETE} e_txnState; + e_txnState m_state; +}; + +}} // namespace qpid::broker + +#endif // qpid_broker_TxnBuffer_h_ diff --git a/cpp/src/qpid/broker/TxnHandle.cpp b/cpp/src/qpid/broker/TxnHandle.cpp index 1e6cd2ac6f..07d46b4235 100644 --- a/cpp/src/qpid/broker/TxnHandle.cpp +++ b/cpp/src/qpid/broker/TxnHandle.cpp @@ -69,4 +69,16 @@ TxnHandle::is2pc() const return impl->is2pc(); } +void +TxnHandle::incrOpCnt() +{ + impl->incrOpCnt(); +} + +void +TxnHandle::decrOpCnt() +{ + impl->decrOpCnt(); +} + }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/TxnHandle.h b/cpp/src/qpid/broker/TxnHandle.h index 34a9c69434..8bed14e16a 100644 --- a/cpp/src/qpid/broker/TxnHandle.h +++ b/cpp/src/qpid/broker/TxnHandle.h @@ -44,6 +44,8 @@ public: // TxnHandleImpl methods const std::string& getXid() const; bool is2pc() const; + void incrOpCnt(); + void decrOpCnt(); private: friend class qpid::messaging::PrivateImplRef<TxnHandle>; diff --git a/cpp/src/qpid/broker/AsyncStore.cpp b/cpp/src/qpid/broker/TxnOp.h index 10cb3d27cf..e98429535e 100644 --- a/cpp/src/qpid/broker/AsyncStore.cpp +++ b/cpp/src/qpid/broker/TxnOp.h @@ -17,21 +17,24 @@ * under the License. */ -#include "AsyncStore.h" +/** + * \file TxnOp.h + */ + +#ifndef qpid_broker_TxnOp_h_ +#define qpid_broker_TxnOp_h_ namespace qpid { namespace broker { -AsyncResultQueue::~AsyncResultQueue() -{} - -BrokerAsyncContext::~BrokerAsyncContext() -{} - -DataSource::~DataSource() -{} - -AsyncStore::~AsyncStore() -{} +class TxnOp{ +public: + virtual ~TxnOp() {} + virtual bool prepare(TxnHandle& th) throw() = 0; + virtual void commit() throw() = 0; + virtual void rollback() throw() = 0; +}; }} // namespace qpid::broker + +#endif // qpid_broker_TxnOp_h_ diff --git a/cpp/src/tests/CMakeLists.txt b/cpp/src/tests/CMakeLists.txt index 1d04828a49..fbe4cfbd7d 100644 --- a/cpp/src/tests/CMakeLists.txt +++ b/cpp/src/tests/CMakeLists.txt @@ -379,6 +379,7 @@ endif (UNIX) # Async store perf test (asyncPerf) set (asyncStorePerf_SOURCES + storePerftools/asyncPerf/Deliverable.cpp storePerftools/asyncPerf/MessageAsyncContext.cpp storePerftools/asyncPerf/MessageConsumer.cpp storePerftools/asyncPerf/MessageDeque.cpp @@ -388,10 +389,9 @@ set (asyncStorePerf_SOURCES storePerftools/asyncPerf/QueuedMessage.cpp storePerftools/asyncPerf/SimplePersistableMessage.cpp storePerftools/asyncPerf/SimplePersistableQueue.cpp - storePerftools/asyncPerf/SimpleTransactionContext.cpp storePerftools/asyncPerf/TestOptions.cpp storePerftools/asyncPerf/TestResult.cpp - storePerftools/asyncPerf/TransactionAsyncContext.cpp + storePerftools/asyncPerf/TxnPublish.cpp storePerftools/common/Parameters.cpp storePerftools/common/PerftoolError.cpp diff --git a/cpp/src/tests/storePerftools/asyncPerf/TransactionAsyncContext.cpp b/cpp/src/tests/storePerftools/asyncPerf/Deliverable.cpp index f150a34027..9da7e348e0 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/TransactionAsyncContext.cpp +++ b/cpp/src/tests/storePerftools/asyncPerf/Deliverable.cpp @@ -18,50 +18,26 @@ */ /** - * \file TransactionAsyncContext.cpp + * \file Deliverable.cpp */ -#include "TransactionAsyncContext.h" - -#include <cassert> +#include "Deliverable.h" namespace tests { namespace storePerftools { namespace asyncPerf { -TransactionAsyncContext::TransactionAsyncContext(boost::shared_ptr<SimpleTransactionContext> tc, - const qpid::asyncStore::AsyncOperation::opCode op): - m_tc(tc), - m_op(op) -{ - assert(m_tc.get() != 0); -} - -TransactionAsyncContext::~TransactionAsyncContext() +Deliverable::Deliverable() : + m_delivered(false) {} -qpid::asyncStore::AsyncOperation::opCode -TransactionAsyncContext::getOpCode() const -{ - return m_op; -} - -const char* -TransactionAsyncContext::getOpStr() const -{ - return qpid::asyncStore::AsyncOperation::getOpStr(m_op); -} - -boost::shared_ptr<SimpleTransactionContext> -TransactionAsyncContext::getTransactionContext() const -{ - return m_tc; -} +Deliverable::~Deliverable() +{} -void -TransactionAsyncContext::destroy() +bool +Deliverable::isDelivered() const { - delete this; + return m_delivered; } }}} // namespace tests::storePerftools::asyncPerf diff --git a/cpp/src/tests/storePerftools/asyncPerf/Deliverable.h b/cpp/src/tests/storePerftools/asyncPerf/Deliverable.h new file mode 100644 index 0000000000..57e130eeba --- /dev/null +++ b/cpp/src/tests/storePerftools/asyncPerf/Deliverable.h @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file Deliverable.h + */ + +#ifndef tests_storePerftools_asyncPerf_Deliverable_h_ +#define tests_storePerftools_asyncPerf_Deliverable_h_ + +#include <boost/shared_ptr.hpp> +#include <stdint.h> // uint64_t + +namespace tests { +namespace storePerftools { +namespace asyncPerf { + +class SimplePersistableMessage; +class SimplePersistableQueue; + +class Deliverable +{ +public: + Deliverable(); + virtual ~Deliverable(); + + virtual uint64_t contentSize() = 0; + virtual void deliverTo(const boost::shared_ptr<SimplePersistableQueue>& queue) = 0; + virtual SimplePersistableMessage& getMessage() = 0; + virtual bool isDelivered() const; + +protected: + bool m_delivered; +}; + +}}} // namespace tests::storePerftools::asyncPerf + +#endif // tests_storePerftools_asyncPerf_Deliverable_h_ diff --git a/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp b/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp index 0a2fd4f333..9b015fc428 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp +++ b/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp @@ -32,8 +32,6 @@ namespace tests { namespace storePerftools { namespace asyncPerf { -class SimpleTransactionContext; - MessageConsumer::MessageConsumer(const TestOptions& perfTestParams, boost::shared_ptr<SimplePersistableQueue> queue) : m_perfTestParams(perfTestParams), @@ -54,7 +52,7 @@ MessageConsumer::runConsumers() ::usleep(1000); // TODO - replace this poller with condition variable } } - return 0; + return reinterpret_cast<void*>(0); } //static diff --git a/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.cpp b/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.cpp index 5530513e12..008ddf33e7 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.cpp +++ b/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.cpp @@ -26,6 +26,10 @@ #include "SimplePersistableMessage.h" #include "SimplePersistableQueue.h" #include "TestOptions.h" +#include "TxnPublish.h" + +#include "qpid/asyncStore/AsyncStoreImpl.h" +#include "qpid/broker/TxnBuffer.h" #include <stdint.h> // uint32_t @@ -33,15 +37,15 @@ namespace tests { namespace storePerftools { namespace asyncPerf { -class SimpleTransactionContext; - MessageProducer::MessageProducer(const TestOptions& perfTestParams, const char* msgData, qpid::asyncStore::AsyncStoreImpl* store, + qpid::broker::AsyncResultQueue& arq, boost::shared_ptr<SimplePersistableQueue> queue) : m_perfTestParams(perfTestParams), m_msgData(msgData), m_store(store), + m_resultQueue(arq), m_queue(queue) {} @@ -51,11 +55,46 @@ MessageProducer::~MessageProducer() void* MessageProducer::runProducers() { + const bool useTxns = m_perfTestParams.m_enqTxnBlockSize > 0U; + uint16_t txnCnt = 0U; + qpid::broker::TxnBuffer* tb = 0; + if (useTxns) { + tb = new qpid::broker::TxnBuffer(m_resultQueue); + } for (uint32_t numMsgs=0; numMsgs<m_perfTestParams.m_numMsgs; ++numMsgs) { boost::intrusive_ptr<SimplePersistableMessage> msg(new SimplePersistableMessage(m_msgData, m_perfTestParams.m_msgSize, m_store)); - m_queue->deliver(msg); + if (useTxns) { + boost::shared_ptr<TxnPublish> op(new TxnPublish(msg)); + op->deliverTo(m_queue); + tb->enlist(op); + if (++txnCnt >= m_perfTestParams.m_enqTxnBlockSize) { + if (m_perfTestParams.m_durable) { + tb->commitLocal(m_store); + + // TxnBuffer instance tb carries async state that precludes it being re-used for the next + // transaction until the current commit cycle completes. So use another instance. This + // instance should auto-delete when the async commit cycle completes. + if (numMsgs<m_perfTestParams.m_numMsgs) { + //tb = boost::shared_ptr<qpid::broker::TxnBuffer>(new qpid::broker::TxnBuffer(m_resultQueue)); + tb = new qpid::broker::TxnBuffer(m_resultQueue); + } + } else { + tb->commit(); + } + txnCnt = 0U; + } + } else { + m_queue->deliver(msg); + } + } + if (txnCnt) { + if (m_perfTestParams.m_durable) { + tb->commitLocal(m_store); + } else { + tb->commit(); + } } - return 0; + return reinterpret_cast<void*>(0); } //static diff --git a/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.h b/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.h index 746247e849..55504164ef 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.h +++ b/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.h @@ -28,9 +28,10 @@ namespace qpid { namespace asyncStore { - class AsyncStoreImpl; - +} +namespace broker { +class AsyncResultQueue; }} namespace tests { @@ -39,6 +40,7 @@ namespace asyncPerf { class SimplePersistableQueue; class TestOptions; +class TxnBuffer; class MessageProducer { @@ -46,6 +48,7 @@ public: MessageProducer(const TestOptions& perfTestParams, const char* msgData, qpid::asyncStore::AsyncStoreImpl* store, + qpid::broker::AsyncResultQueue& arq, boost::shared_ptr<SimplePersistableQueue> queue); virtual ~MessageProducer(); void* runProducers(); @@ -54,6 +57,7 @@ private: const TestOptions& m_perfTestParams; const char* m_msgData; qpid::asyncStore::AsyncStoreImpl* m_store; + qpid::broker::AsyncResultQueue& m_resultQueue; boost::shared_ptr<SimplePersistableQueue> m_queue; }; diff --git a/cpp/src/tests/storePerftools/asyncPerf/PerfTest.cpp b/cpp/src/tests/storePerftools/asyncPerf/PerfTest.cpp index fbfebbac2b..7941e761ca 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/PerfTest.cpp +++ b/cpp/src/tests/storePerftools/asyncPerf/PerfTest.cpp @@ -80,7 +80,7 @@ PerfTest::run() tests::storePerftools::common::ScopedTimer st(m_testResult); for (uint16_t q = 0; q < m_testOpts.m_numQueues; q++) { - boost::shared_ptr<MessageProducer> mp(new MessageProducer(m_testOpts, m_msgData, m_store, m_queueList[q])); + boost::shared_ptr<MessageProducer> mp(new MessageProducer(m_testOpts, m_msgData, m_store, m_resultQueue, m_queueList[q])); m_producers.push_back(mp); for (uint16_t t = 0; t < m_testOpts.m_numEnqThreadsPerQueue; t++) { // TODO - replace with qpid threads boost::shared_ptr<tests::storePerftools::common::Thread> tp(new tests::storePerftools::common::Thread(mp->startProducers, diff --git a/cpp/src/tests/storePerftools/asyncPerf/QueueAsyncContext.cpp b/cpp/src/tests/storePerftools/asyncPerf/QueueAsyncContext.cpp index 4836f9358a..07a80c8a33 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/QueueAsyncContext.cpp +++ b/cpp/src/tests/storePerftools/asyncPerf/QueueAsyncContext.cpp @@ -31,10 +31,12 @@ namespace storePerftools { namespace asyncPerf { QueueAsyncContext::QueueAsyncContext(boost::shared_ptr<SimplePersistableQueue> q, + qpid::broker::TxnHandle& th, const qpid::asyncStore::AsyncOperation::opCode op, qpid::broker::AsyncResultCallback rcb, qpid::broker::AsyncResultQueue* const arq) : m_q(q), + m_th(th), m_op(op), m_rcb(rcb), m_arq(arq) @@ -44,11 +46,13 @@ QueueAsyncContext::QueueAsyncContext(boost::shared_ptr<SimplePersistableQueue> q QueueAsyncContext::QueueAsyncContext(boost::shared_ptr<SimplePersistableQueue> q, boost::intrusive_ptr<SimplePersistableMessage> msg, + qpid::broker::TxnHandle& th, const qpid::asyncStore::AsyncOperation::opCode op, qpid::broker::AsyncResultCallback rcb, qpid::broker::AsyncResultQueue* const arq) : m_q(q), m_msg(msg), + m_th(th), m_op(op), m_rcb(rcb), m_arq(arq) @@ -84,6 +88,12 @@ QueueAsyncContext::getMessage() const return m_msg; } +qpid::broker::TxnHandle +QueueAsyncContext::getTxnHandle() const +{ + return m_th; +} + qpid::broker::AsyncResultQueue* QueueAsyncContext::getAsyncResultQueue() const { @@ -99,7 +109,9 @@ QueueAsyncContext::getAsyncResultCallback() const void QueueAsyncContext::invokeCallback(const qpid::broker::AsyncResultHandle* const arh) const { - m_rcb(arh); + if (m_rcb) { + m_rcb(arh); + } } void diff --git a/cpp/src/tests/storePerftools/asyncPerf/QueueAsyncContext.h b/cpp/src/tests/storePerftools/asyncPerf/QueueAsyncContext.h index df45117fe4..b4d16fe615 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/QueueAsyncContext.h +++ b/cpp/src/tests/storePerftools/asyncPerf/QueueAsyncContext.h @@ -26,6 +26,7 @@ #include "qpid/asyncStore/AsyncOperation.h" #include "qpid/broker/AsyncStore.h" +#include "qpid/broker/TxnHandle.h" #include <boost/intrusive_ptr.hpp> #include <boost/shared_ptr.hpp> @@ -42,11 +43,13 @@ class QueueAsyncContext: public qpid::broker::BrokerAsyncContext { public: QueueAsyncContext(boost::shared_ptr<SimplePersistableQueue> q, + qpid::broker::TxnHandle& th, const qpid::asyncStore::AsyncOperation::opCode op, qpid::broker::AsyncResultCallback rcb, qpid::broker::AsyncResultQueue* const arq); QueueAsyncContext(boost::shared_ptr<SimplePersistableQueue> q, boost::intrusive_ptr<SimplePersistableMessage> msg, + qpid::broker::TxnHandle& th, const qpid::asyncStore::AsyncOperation::opCode op, qpid::broker::AsyncResultCallback rcb, qpid::broker::AsyncResultQueue* const arq); @@ -55,6 +58,7 @@ public: const char* getOpStr() const; boost::shared_ptr<SimplePersistableQueue> getQueue() const; boost::intrusive_ptr<SimplePersistableMessage> getMessage() const; + qpid::broker::TxnHandle getTxnHandle() const; qpid::broker::AsyncResultQueue* getAsyncResultQueue() const; qpid::broker::AsyncResultCallback getAsyncResultCallback() const; void invokeCallback(const qpid::broker::AsyncResultHandle* const arh) const; @@ -63,6 +67,7 @@ public: private: boost::shared_ptr<SimplePersistableQueue> m_q; boost::intrusive_ptr<SimplePersistableMessage> m_msg; + qpid::broker::TxnHandle m_th; const qpid::asyncStore::AsyncOperation::opCode m_op; qpid::broker::AsyncResultCallback m_rcb; qpid::broker::AsyncResultQueue* const m_arq; diff --git a/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.cpp b/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.cpp index dd4de355b3..8ee858587b 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.cpp +++ b/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.cpp @@ -79,4 +79,22 @@ QueuedMessage::enqHandle() return m_enqHandle; } +void +QueuedMessage::prepareEnqueue(qpid::broker::TxnHandle& th) +{ + m_queue->enqueue(th, *this); +} + +void +QueuedMessage::commitEnqueue() +{ + m_queue->process(m_msg); +} + +void +QueuedMessage::abortEnqueue() +{ + m_queue->enqueueAborted(m_msg); +} + }}} // namespace tests::storePerfTools diff --git a/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.h b/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.h index 29e7f792cd..896c53ab5b 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.h +++ b/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.h @@ -28,6 +28,13 @@ #include <boost/intrusive_ptr.hpp> +namespace qpid { +namespace broker { + +class TxnHandle; + +}} + namespace tests { namespace storePerftools { namespace asyncPerf { @@ -48,6 +55,11 @@ public: const qpid::broker::EnqueueHandle& enqHandle() const; qpid::broker::EnqueueHandle& enqHandle(); + // -- Transaction handling --- + void prepareEnqueue(qpid::broker::TxnHandle& th); + void commitEnqueue(); + void abortEnqueue(); + private: SimplePersistableQueue* m_queue; boost::intrusive_ptr<SimplePersistableMessage> m_msg; diff --git a/cpp/src/tests/storePerftools/asyncPerf/SimplePersistableMessage.cpp b/cpp/src/tests/storePerftools/asyncPerf/SimplePersistableMessage.cpp index 58aacabdcd..a9771c1442 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/SimplePersistableMessage.cpp +++ b/cpp/src/tests/storePerftools/asyncPerf/SimplePersistableMessage.cpp @@ -52,6 +52,12 @@ SimplePersistableMessage::getHandle() return m_msgHandle; } +uint64_t +SimplePersistableMessage::contentSize() const +{ + return static_cast<uint64_t>(m_msg.size()); +} + void SimplePersistableMessage::setPersistenceId(uint64_t id) const { diff --git a/cpp/src/tests/storePerftools/asyncPerf/SimplePersistableMessage.h b/cpp/src/tests/storePerftools/asyncPerf/SimplePersistableMessage.h index f6258822ea..7ac54ddea1 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/SimplePersistableMessage.h +++ b/cpp/src/tests/storePerftools/asyncPerf/SimplePersistableMessage.h @@ -51,20 +51,21 @@ public: virtual ~SimplePersistableMessage(); const qpid::broker::MessageHandle& getHandle() const; qpid::broker::MessageHandle& getHandle(); + uint64_t contentSize() const; - // Interface Persistable + // --- Interface Persistable --- virtual void setPersistenceId(uint64_t id) const; virtual uint64_t getPersistenceId() const; virtual void encode(qpid::framing::Buffer& buffer) const; virtual uint32_t encodedSize() const; - // Interface PersistableMessage + // --- Interface PersistableMessage --- virtual void allDequeuesComplete(); virtual uint32_t encodedHeaderSize() const; virtual bool isPersistent() const; - // Interface DataStore - virtual uint64_t getSize(); + // --- Interface DataSource --- + virtual uint64_t getSize(); // <- same as encodedSize()? virtual void write(char* target); private: diff --git a/cpp/src/tests/storePerftools/asyncPerf/SimplePersistableQueue.cpp b/cpp/src/tests/storePerftools/asyncPerf/SimplePersistableQueue.cpp index b34357c144..1a3eae4b43 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/SimplePersistableQueue.cpp +++ b/cpp/src/tests/storePerftools/asyncPerf/SimplePersistableQueue.cpp @@ -25,17 +25,21 @@ #include "MessageDeque.h" #include "SimplePersistableMessage.h" -#include "SimpleTransactionContext.h" #include "QueueAsyncContext.h" #include "QueuedMessage.h" #include "qpid/asyncStore/AsyncStoreImpl.h" #include "qpid/broker/AsyncResultHandle.h" +#include "qpid/broker/TxnHandle.h" namespace tests { namespace storePerftools { namespace asyncPerf { +//static +qpid::broker::TxnHandle SimplePersistableQueue::s_nullTxnHandle; // used for non-txn operations + + SimplePersistableQueue::SimplePersistableQueue(const std::string& name, const qpid::framing::FieldTable& /*args*/, qpid::asyncStore::AsyncStoreImpl* store, @@ -127,6 +131,7 @@ SimplePersistableQueue::asyncCreate() { if (m_store) { boost::shared_ptr<QueueAsyncContext> qac(new QueueAsyncContext(shared_from_this(), + s_nullTxnHandle, qpid::asyncStore::AsyncOperation::QUEUE_CREATE, &handleAsyncResult, &m_resultQueue)); @@ -144,6 +149,7 @@ SimplePersistableQueue::asyncDestroy(const bool deleteQueue) if (m_store) { if (deleteQueue) { boost::shared_ptr<QueueAsyncContext> qac(new QueueAsyncContext(shared_from_this(), + s_nullTxnHandle, qpid::asyncStore::AsyncOperation::QUEUE_DESTROY, &handleAsyncResult, &m_resultQueue)); @@ -159,7 +165,7 @@ void SimplePersistableQueue::deliver(boost::intrusive_ptr<SimplePersistableMessage> msg) { QueuedMessage qm(this, msg); - enqueue((SimpleTransactionContext*)0, qm); + enqueue(s_nullTxnHandle, qm); push(qm); } @@ -168,13 +174,13 @@ SimplePersistableQueue::dispatch() { QueuedMessage qm; if (m_messages->consume(qm)) { - return dequeue((SimpleTransactionContext*)0, qm); + return dequeue(s_nullTxnHandle, qm); } return false; } bool -SimplePersistableQueue::enqueue(SimpleTransactionContext* ctxt, +SimplePersistableQueue::enqueue(qpid::broker::TxnHandle& th, QueuedMessage& qm) { ScopedUse u(m_barrier); @@ -183,13 +189,13 @@ SimplePersistableQueue::enqueue(SimpleTransactionContext* ctxt, } if (qm.payload()->isPersistent() && m_store) { qm.payload()->enqueueAsync(shared_from_this(), m_store); - return asyncEnqueue(ctxt, qm); + return asyncEnqueue(th, qm); } return false; } bool -SimplePersistableQueue::dequeue(SimpleTransactionContext* ctxt, +SimplePersistableQueue::dequeue(qpid::broker::TxnHandle& th, QueuedMessage& qm) { ScopedUse u(m_barrier); @@ -198,12 +204,23 @@ SimplePersistableQueue::dequeue(SimpleTransactionContext* ctxt, } if (qm.payload()->isPersistent() && m_store) { qm.payload()->dequeueAsync(shared_from_this(), m_store); - return asyncDequeue(ctxt, qm); + return asyncDequeue(th, qm); } return true; } void +SimplePersistableQueue::process(boost::intrusive_ptr<SimplePersistableMessage> msg) +{ + QueuedMessage qm(this, msg); + push(qm); +} + +void +SimplePersistableQueue::enqueueAborted(boost::intrusive_ptr<SimplePersistableMessage> /*msg*/) +{} + +void SimplePersistableQueue::encode(qpid::framing::Buffer& buffer) const { buffer.putShortString(m_name); @@ -326,38 +343,46 @@ SimplePersistableQueue::push(QueuedMessage& qm, // private bool -SimplePersistableQueue::asyncEnqueue(SimpleTransactionContext* txn, +SimplePersistableQueue::asyncEnqueue(qpid::broker::TxnHandle& th, QueuedMessage& qm) { qm.payload()->setPersistenceId(m_store->getNextRid()); //std::cout << "QQQ Queue=\"" << m_name << "\": asyncEnqueue() rid=0x" << std::hex << qm.payload()->getPersistenceId() << std::dec << std::endl << std::flush; boost::shared_ptr<QueueAsyncContext> qac(new QueueAsyncContext(shared_from_this(), qm.payload(), + th, qpid::asyncStore::AsyncOperation::MSG_ENQUEUE, &handleAsyncResult, &m_resultQueue)); m_store->submitEnqueue(qm.enqHandle(), - txn->getHandle(), + th, qac); ++m_asyncOpCounter; + if (th.isValid()) { + th.incrOpCnt(); + } return true; } // private bool -SimplePersistableQueue::asyncDequeue(SimpleTransactionContext* txn, +SimplePersistableQueue::asyncDequeue(qpid::broker::TxnHandle& th, QueuedMessage& qm) { //std::cout << "QQQ Queue=\"" << m_name << "\": asyncDequeue() rid=0x" << std::hex << qm.payload()->getPersistenceId() << std::dec << std::endl << std::flush; boost::shared_ptr<QueueAsyncContext> qac(new QueueAsyncContext(shared_from_this(), qm.payload(), + th, qpid::asyncStore::AsyncOperation::MSG_DEQUEUE, &handleAsyncResult, &m_resultQueue)); m_store->submitDequeue(qm.enqHandle(), - txn->getHandle(), + th, qac); ++m_asyncOpCounter; + if (th.isValid()) { + th.incrOpCnt(); + } return true; } @@ -407,6 +432,11 @@ SimplePersistableQueue::enqueueComplete(const boost::shared_ptr<QueueAsyncContex //std::cout << "QQQ Queue name=\"" << qc->getQueue()->getName() << "\": enqueueComplete() rid=0x" << std::hex << qc->getMessage()->getPersistenceId() << std::dec << std::endl << std::flush; assert(qc->getQueue().get() == this); --m_asyncOpCounter; + + qpid::broker::TxnHandle th = qc->getTxnHandle(); + if (th.isValid()) { // transactional enqueue + th.decrOpCnt(); + } } // private @@ -416,6 +446,11 @@ SimplePersistableQueue::dequeueComplete(const boost::shared_ptr<QueueAsyncContex //std::cout << "QQQ Queue name=\"" << qc->getQueue()->getName() << "\": dequeueComplete() rid=0x" << std::hex << qc->getMessage()->getPersistenceId() << std::dec << std::endl << std::flush; assert(qc->getQueue().get() == this); --m_asyncOpCounter; + + qpid::broker::TxnHandle th = qc->getTxnHandle(); + if (th.isValid()) { // transactional enqueue + th.decrOpCnt(); + } } }}} // namespace tests::storePerftools::asyncPerf diff --git a/cpp/src/tests/storePerftools/asyncPerf/SimplePersistableQueue.h b/cpp/src/tests/storePerftools/asyncPerf/SimplePersistableQueue.h index 3bc0972ede..34ef9407ac 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/SimplePersistableQueue.h +++ b/cpp/src/tests/storePerftools/asyncPerf/SimplePersistableQueue.h @@ -24,8 +24,7 @@ #ifndef tests_storePerftools_asyncPerf_SimplePersistableQueue_h_ #define tests_storePerftools_asyncPerf_SimplePersistableQueue_h_ -#include "AtomicCounter.h" // AsyncOpCounter - +#include "qpid/asyncStore/AtomicCounter.h" // AsyncOpCounter #include "qpid/broker/AsyncStore.h" // qpid::broker::DataSource #include "qpid/broker/PersistableQueue.h" #include "qpid/broker/QueueHandle.h" @@ -41,6 +40,7 @@ class AsyncStoreImpl; } namespace broker { class AsyncResultQueue; +class TxnHandle; } namespace framing { class FieldTable; @@ -52,7 +52,6 @@ namespace asyncPerf { class Messages; class SimplePersistableMessage; -class SimpleTransactionContext; class QueueAsyncContext; class QueuedMessage; @@ -78,10 +77,12 @@ public: // --- Methods in msg handling path from qpid::Queue --- void deliver(boost::intrusive_ptr<SimplePersistableMessage> msg); bool dispatch(); // similar to qpid::broker::Queue::distpatch(Consumer&) but without Consumer param - bool enqueue(SimpleTransactionContext* ctxt, + bool enqueue(qpid::broker::TxnHandle& th, QueuedMessage& qm); - bool dequeue(SimpleTransactionContext* ctxt, + bool dequeue(qpid::broker::TxnHandle& th, QueuedMessage& qm); + void process(boost::intrusive_ptr<SimplePersistableMessage> msg); + void enqueueAborted(boost::intrusive_ptr<SimplePersistableMessage> msg); // --- Interface qpid::broker::Persistable --- virtual void encode(qpid::framing::Buffer& buffer) const; @@ -99,10 +100,12 @@ public: virtual void write(char* target); private: + static qpid::broker::TxnHandle s_nullTxnHandle; // used for non-txn operations + const std::string m_name; qpid::asyncStore::AsyncStoreImpl* m_store; qpid::broker::AsyncResultQueue& m_resultQueue; - AsyncOpCounter m_asyncOpCounter; + qpid::asyncStore::AsyncOpCounter m_asyncOpCounter; mutable uint64_t m_persistenceId; std::string m_persistableData; qpid::broker::QueueHandle m_queueHandle; @@ -133,9 +136,9 @@ private: bool isRecovery = false); // -- Async ops --- - bool asyncEnqueue(SimpleTransactionContext* txn, + bool asyncEnqueue(qpid::broker::TxnHandle& th, QueuedMessage& qm); - bool asyncDequeue(SimpleTransactionContext* txn, + bool asyncDequeue(qpid::broker::TxnHandle& th, QueuedMessage& qm); // --- Async op counter --- diff --git a/cpp/src/tests/storePerftools/asyncPerf/SimpleTransactionContext.cpp b/cpp/src/tests/storePerftools/asyncPerf/SimpleTransactionContext.cpp deleted file mode 100644 index 2aea14bc21..0000000000 --- a/cpp/src/tests/storePerftools/asyncPerf/SimpleTransactionContext.cpp +++ /dev/null @@ -1,240 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/** - * \file SimpleTransactionContext.cpp - */ - -#include "SimpleTransactionContext.h" - -#include "TransactionAsyncContext.h" - -#include "qpid/asyncStore/AsyncStoreImpl.h" - -#include <uuid/uuid.h> - -namespace tests { -namespace storePerftools { -namespace asyncPerf { - -SimpleTransactionContext::SimpleTransactionContext(const std::string& xid) : - qpid::broker::TransactionContext(), - m_xid(xid), - m_tpcFlag(!xid.empty()), - m_store(0), - m_txnHandle(0), - m_prepared(false), - m_enqueuedMsgs() -{ - if (!m_tpcFlag) { - setLocalXid(); - } -//std::cout << "*TXN* begin: xid=" << getXid() << "; 2PC=" << (is2pc()?"T":"F") << std::endl; -} - -SimpleTransactionContext::SimpleTransactionContext(qpid::asyncStore::AsyncStoreImpl* store, - const std::string& xid) : - m_store(store), - m_prepared(false), - m_enqueuedMsgs() -{ -//std::cout << "*TXN* begin: xid=" << getXid() << "; 2PC=" << (is2pc()?"T":"F") << std::endl; - if (m_store != 0) { - m_txnHandle = store->createTxnHandle(xid); - } -} - -SimpleTransactionContext::~SimpleTransactionContext() -{} - -// static -/* -void -SimpleTransactionContext::handleAsyncResult(const qpid::broker::AsyncResult* res, - qpid::broker::BrokerAsyncContext* bc) -{ - if (bc && res) { - TransactionAsyncContext* tac = dynamic_cast<TransactionAsyncContext*>(bc); - if (res->errNo) { - // TODO: Handle async failure here - std::cerr << "Transaction xid=\"" << tac->getTransactionContext()->getXid() << "\": Operation " << tac->getOpStr() << ": failure " - << res->errNo << " (" << res->errMsg << ")" << std::endl; - } else { - // Handle async success here - switch(tac->getOpCode()) { - case qpid::asyncStore::AsyncOperation::TXN_PREPARE: - tac->getTransactionContext()->prepareComplete(tac); - break; - case qpid::asyncStore::AsyncOperation::TXN_COMMIT: - tac->getTransactionContext()->commitComplete(tac); - break; - case qpid::asyncStore::AsyncOperation::TXN_ABORT: - tac->getTransactionContext()->abortComplete(tac); - break; - default: - std::ostringstream oss; - oss << "tests::storePerftools::asyncPerf::SimpleTransactionContext::handleAsyncResult(): Unknown async operation: " << tac->getOpCode(); - throw qpid::Exception(oss.str()); - }; - } - } - if (bc) delete bc; - if (res) delete res; -} -*/ - -const qpid::broker::TxnHandle& -SimpleTransactionContext::getHandle() const -{ - return m_txnHandle; -} - -qpid::broker::TxnHandle& -SimpleTransactionContext::getHandle() -{ - return m_txnHandle; -} - -bool -SimpleTransactionContext::is2pc() const -{ - return m_tpcFlag; -} - -const std::string& -SimpleTransactionContext::getXid() const -{ - return m_xid; -} - -void -SimpleTransactionContext::addEnqueuedMsg(QueuedMessage* qm) -{ - qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_enqueuedMsgsMutex); - m_enqueuedMsgs.push_back(qm); -} - -void -SimpleTransactionContext::prepare() -{ - if (m_tpcFlag) { - localPrepare(); - m_prepared = true; - } - std::ostringstream oss; - oss << "SimpleTransactionContext::prepare(): xid=\"" << getXid() - << "\": Transaction Error: called prepare() on local transaction"; - throw qpid::Exception(oss.str()); -} - -void -SimpleTransactionContext::abort() -{ - // TODO: Check the following XA transaction semantics: - // Assuming 2PC aborts can occur without a prepare. Do local prepare if not already prepared. - if (!m_prepared) { - localPrepare(); - } - if (m_store != 0) { -// m_store->submitAbort(m_txnHandle, -// &handleAsyncResult, -// dynamic_cast<qpid::broker::BrokerAsyncContext*>(new TransactionAsyncContext(this, qpid::asyncStore::AsyncOperation::TXN_ABORT))); - } -//std::cout << "*TXN* abort: xid=" << m_txnHandle.getXid() << "; 2PC=" << (m_txnHandle.is2pc()?"T":"F") << std::endl; -} - -void -SimpleTransactionContext::commit() -{ - if (is2pc()) { - if (!m_prepared) { - std::ostringstream oss; - oss << "SimpleTransactionContext::abort(): xid=\"" << getXid() - << "\": Transaction Error: called commit() without prepare() on 2PC transaction"; - throw qpid::Exception(oss.str()); - } - } else { - localPrepare(); - } - if (m_store != 0) { -// m_store->submitCommit(m_txnHandle, -// &handleAsyncResult, -// dynamic_cast<qpid::broker::BrokerAsyncContext*>(new TransactionAsyncContext(this, qpid::asyncStore::AsyncOperation::TXN_COMMIT))); - } -//std::cout << "*TXN* commit: xid=" << m_txnHandle.getXid() << "; 2PC=" << (m_txnHandle.is2pc()?"T":"F") << std::endl; -} - - -// private -void -SimpleTransactionContext::localPrepare() -{ - if (m_store != 0) { -// m_store->submitPrepare(m_txnHandle, -// &handleAsyncResult, -// dynamic_cast<qpid::broker::BrokerAsyncContext*>(new TransactionAsyncContext(this, qpid::asyncStore::AsyncOperation::TXN_PREPARE))); - } -//std::cout << "*TXN* localPrepare: xid=" << m_txnHandle.getXid() << "; 2PC=" << (m_txnHandle.is2pc()?"T":"F") << std::endl; -} - -// private -void -SimpleTransactionContext::setLocalXid() -{ - uuid_t uuid; - // TODO: Valgrind warning: Possible race condition in uuid_generate_random() - is it thread-safe, and if not, does it matter? - // If this race condition affects the randomness of the UUID, then there could be a problem here. - ::uuid_generate_random(uuid); - char uuidStr[37]; // 36-char uuid + trailing '\0' - ::uuid_unparse(uuid, uuidStr); - m_xid.assign(uuidStr); -} - -// private -void -SimpleTransactionContext::prepareComplete(const TransactionAsyncContext* /*tc*/) -{ - qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_enqueuedMsgsMutex); -// while (!m_enqueuedMsgs.empty()) { -// m_enqueuedMsgs.front()->clearTransaction(); -// m_enqueuedMsgs.pop_front(); -// } -//std::cout << "~~~~~ Transaction xid=\"" << tc->m_tc->getXid() << "\": prepareComplete()" << std::endl << std::flush; -// assert(tc->getTransactionContext().get() == this); -} - - -// private -void -SimpleTransactionContext::abortComplete(const TransactionAsyncContext* tc) -{ -//std::cout << "~~~~~ Transaction xid=\"" << tc->m_tc->getXid() << "\": abortComplete()" << std::endl << std::flush; - assert(tc->getTransactionContext().get() == this); -} - - -// private -void -SimpleTransactionContext::commitComplete(const TransactionAsyncContext* tc) -{ -//std::cout << "~~~~~ Transaction xid=\"" << tc->m_tc->getXid() << "\": commitComplete()" << std::endl << std::flush; - assert(tc->getTransactionContext().get() == this); -} - -}}} // namespace tests::storePerftools::asyncPerf diff --git a/cpp/src/tests/storePerftools/asyncPerf/SimpleTransactionContext.h b/cpp/src/tests/storePerftools/asyncPerf/SimpleTransactionContext.h deleted file mode 100644 index 49fee9c720..0000000000 --- a/cpp/src/tests/storePerftools/asyncPerf/SimpleTransactionContext.h +++ /dev/null @@ -1,90 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/** - * \file SimpleTransactionContext.h - */ - -#ifndef tests_storePerftools_asyncPerf_SimpleTransactionContext_h_ -#define tests_storePerftools_asyncPerf_SimpleTransactionContext_h_ - -#include "qpid/broker/TransactionalStore.h" // qpid::broker::TransactionContext -#include "qpid/broker/TxnHandle.h" -#include "qpid/sys/Mutex.h" - -#include <deque> - -namespace qpid { -namespace asyncStore { -class AsyncStoreImpl; -} -namespace broker { -class AsyncResult; -class BrokerAsyncContext; -}} - -namespace tests { -namespace storePerftools { -namespace asyncPerf { - -class QueuedMessage; -class TransactionAsyncContext; - -class SimpleTransactionContext : public qpid::broker::TransactionContext -{ -public: - SimpleTransactionContext(const std::string& xid = std::string()); - SimpleTransactionContext(qpid::asyncStore::AsyncStoreImpl* store, - const std::string& xid = std::string()); - virtual ~SimpleTransactionContext(); -// static void handleAsyncResult(const qpid::broker::AsyncResult* res, -// qpid::broker::BrokerAsyncContext* bc); - - const qpid::broker::TxnHandle& getHandle() const; - qpid::broker::TxnHandle& getHandle(); - bool is2pc() const; - const std::string& getXid() const; - void addEnqueuedMsg(QueuedMessage* qm); - - void prepare(); - void abort(); - void commit(); - -private: - std::string m_xid; - bool m_tpcFlag; - qpid::asyncStore::AsyncStoreImpl* m_store; - qpid::broker::TxnHandle m_txnHandle; - bool m_prepared; - std::deque<QueuedMessage*> m_enqueuedMsgs; - qpid::sys::Mutex m_enqueuedMsgsMutex; - - void localPrepare(); - void setLocalXid(); - - // --- Ascnc op completions (called through handleAsyncResult) --- - void prepareComplete(const TransactionAsyncContext* tc); - void abortComplete(const TransactionAsyncContext* tc); - void commitComplete(const TransactionAsyncContext* tc); - -}; - -}}} // namespace tests:storePerftools::asyncPerf - -#endif // tests_storePerftools_asyncPerf_SimpleTransactionContext_h_ diff --git a/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.cpp b/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.cpp new file mode 100644 index 0000000000..eff7646de6 --- /dev/null +++ b/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.cpp @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file TxnPublish.cpp + */ + +#include "SimplePersistableMessage.h" +#include "SimplePersistableQueue.h" // debug msg +#include "TxnPublish.h" + +#include "QueuedMessage.h" + +namespace tests { +namespace storePerftools { +namespace asyncPerf { + +TxnPublish::TxnPublish(boost::intrusive_ptr<SimplePersistableMessage> msg) : + m_msg(msg) +{ +//std::cout << "TTT new TxnPublish" << std::endl << std::flush; +} + +TxnPublish::~TxnPublish() +{} + +bool +TxnPublish::prepare(qpid::broker::TxnHandle& th) throw() +{ +//std::cout << "TTT TxnPublish::prepare: " << m_queues.size() << " queues" << std::endl << std::flush; + try{ + while (!m_queues.empty()) { + m_queues.front()->prepareEnqueue(th); + m_prepared.push_back(m_queues.front()); + m_queues.pop_front(); + } + return true; + } catch (const std::exception& e) { + std::cerr << "Failed to prepare transaction: " << e.what() << std::endl; + } catch (...) { + std::cerr << "Failed to prepare transaction: (unknown error)" << std::endl; + } + return false; +} + +void +TxnPublish::commit() throw() +{ +//std::cout << "TTT TxnPublish::commit" << std::endl << std::flush; + try { + for (std::list<boost::shared_ptr<QueuedMessage> >::iterator i = m_prepared.begin(); i != m_prepared.end(); ++i) { + (*i)->commitEnqueue(); + } + } catch (const std::exception& e) { + std::cerr << "Failed to commit transaction: " << e.what() << std::endl; + } catch (...) { + std::cerr << "Failed to commit transaction: (unknown error)" << std::endl; + } +} + +void +TxnPublish::rollback() throw() +{ +//std::cout << "TTT TxnPublish::rollback" << std::endl << std::flush; + try { + for (std::list<boost::shared_ptr<QueuedMessage> >::iterator i = m_prepared.begin(); i != m_prepared.end(); ++i) { + (*i)->abortEnqueue(); + } + } catch (const std::exception& e) { + std::cerr << "Failed to rollback transaction: " << e.what() << std::endl; + } catch (...) { + std::cerr << "Failed to rollback transaction: (unknown error)" << std::endl; + } +} + +uint64_t +TxnPublish::contentSize() +{ + return m_msg->contentSize(); +} + +void +TxnPublish::deliverTo(const boost::shared_ptr<SimplePersistableQueue>& queue) +{ +//std::cout << "TTT TxnPublish::deliverTo queue=\"" << queue->getName() << "\"" << std::endl << std::flush; + boost::shared_ptr<QueuedMessage> qm(new QueuedMessage(queue.get(), m_msg)); + m_queues.push_back(qm); + m_delivered = true; +} + +SimplePersistableMessage& +TxnPublish::getMessage() +{ + return *m_msg; +} + +}}} // namespace tests::storePerftools::asyncPerf diff --git a/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.h b/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.h new file mode 100644 index 0000000000..6e97e99349 --- /dev/null +++ b/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.h @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file TxnPublish.h + */ + +#ifndef tests_storePerftools_asyncPerf_TxnPublish_h_ +#define tests_storePerftools_asyncPerf_TxnPublish_h_ + +#include "Deliverable.h" + +#include "qpid/broker/TxnOp.h" + +#include <boost/intrusive_ptr.hpp> +#include <boost/shared_ptr.hpp> +#include <list> + +namespace qpid { +namespace broker { + +class TransactionContext; + +}} + +namespace tests { +namespace storePerftools { +namespace asyncPerf { + +class QueuedMessage; +class SimplePersistableMessage; +class SimplePersistableQueue; + +class TxnPublish : public qpid::broker::TxnOp, + public Deliverable +{ +public: + TxnPublish(boost::intrusive_ptr<SimplePersistableMessage> msg); + virtual ~TxnPublish(); + + // --- Interface TxOp --- + bool prepare(qpid::broker::TxnHandle& th) throw(); + void commit() throw(); + void rollback() throw(); + + // --- Interface Deliverable --- + uint64_t contentSize(); + void deliverTo(const boost::shared_ptr<SimplePersistableQueue>& queue); + SimplePersistableMessage& getMessage(); + +private: + boost::intrusive_ptr<SimplePersistableMessage> m_msg; + std::list<boost::shared_ptr<QueuedMessage> > m_queues; + std::list<boost::shared_ptr<QueuedMessage> > m_prepared; +}; + +}}} // namespace tests::storePerftools::asyncPerf + +#endif // tests_storePerftools_asyncPerf_TxnPublish_h_ |