diff options
Diffstat (limited to 'cpp/src/qpid')
-rw-r--r-- | cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp | 32 | ||||
-rw-r--r-- | cpp/src/qpid/asyncStore/AsyncStoreImpl.h | 5 | ||||
-rw-r--r-- | cpp/src/qpid/asyncStore/AtomicCounter.h | 106 | ||||
-rw-r--r-- | cpp/src/qpid/asyncStore/TxnHandleImpl.cpp | 73 | ||||
-rw-r--r-- | cpp/src/qpid/asyncStore/TxnHandleImpl.h | 24 | ||||
-rw-r--r-- | cpp/src/qpid/broker/AsyncResultQueueImpl.cpp | 1 | ||||
-rw-r--r-- | cpp/src/qpid/broker/AsyncStore.h | 30 | ||||
-rw-r--r-- | cpp/src/qpid/broker/TxnAsyncContext.cpp | 86 | ||||
-rw-r--r-- | cpp/src/qpid/broker/TxnAsyncContext.h | 65 | ||||
-rw-r--r-- | cpp/src/qpid/broker/TxnBuffer.cpp | 188 | ||||
-rw-r--r-- | cpp/src/qpid/broker/TxnBuffer.h | 69 | ||||
-rw-r--r-- | cpp/src/qpid/broker/TxnHandle.cpp | 12 | ||||
-rw-r--r-- | cpp/src/qpid/broker/TxnHandle.h | 2 | ||||
-rw-r--r-- | cpp/src/qpid/broker/TxnOp.h (renamed from cpp/src/qpid/broker/AsyncStore.cpp) | 27 |
14 files changed, 681 insertions, 39 deletions
diff --git a/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp b/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp index a9fc13363a..9135fcc27e 100644 --- a/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp +++ b/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp @@ -24,6 +24,7 @@ #include "AsyncStoreImpl.h" #include "AsyncOperation.h" +#include "TxnHandleImpl.h" #include "qpid/broker/ConfigHandle.h" #include "qpid/broker/EnqueueHandle.h" @@ -62,6 +63,31 @@ void AsyncStoreImpl::initManagement(qpid::broker::Broker* /*broker*/) {} +qpid::broker::TxnHandle +AsyncStoreImpl::createTxnHandle() +{ + return qpid::broker::TxnHandle(new TxnHandleImpl); +} + +qpid::broker::TxnHandle +AsyncStoreImpl::createTxnHandle(qpid::broker::TxnBuffer* tb) +{ + return qpid::broker::TxnHandle(new TxnHandleImpl(tb)); +} + +qpid::broker::TxnHandle +AsyncStoreImpl::createTxnHandle(const std::string& xid) +{ + return qpid::broker::TxnHandle(new TxnHandleImpl(xid)); +} + +qpid::broker::TxnHandle +AsyncStoreImpl::createTxnHandle(const std::string& xid, + qpid::broker::TxnBuffer* tb) +{ + return qpid::broker::TxnHandle(new TxnHandleImpl(xid, tb)); +} + qpid::broker::ConfigHandle AsyncStoreImpl::createConfigHandle() { @@ -96,12 +122,6 @@ AsyncStoreImpl::createQueueHandle(const std::string& name, return qpid::broker::QueueHandle(new QueueHandleImpl(name, opts)); } -qpid::broker::TxnHandle -AsyncStoreImpl::createTxnHandle(const std::string& xid) -{ - return qpid::broker::TxnHandle(new TxnHandleImpl(xid)); -} - void AsyncStoreImpl::submitPrepare(qpid::broker::TxnHandle& txnHandle, boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) diff --git a/cpp/src/qpid/asyncStore/AsyncStoreImpl.h b/cpp/src/qpid/asyncStore/AsyncStoreImpl.h index 3f2058a94c..a00771abf5 100644 --- a/cpp/src/qpid/asyncStore/AsyncStoreImpl.h +++ b/cpp/src/qpid/asyncStore/AsyncStoreImpl.h @@ -54,6 +54,10 @@ public: // --- Factory methods for creating handles --- + qpid::broker::TxnHandle createTxnHandle(); + qpid::broker::TxnHandle createTxnHandle(qpid::broker::TxnBuffer* tb); + qpid::broker::TxnHandle createTxnHandle(const std::string& xid); + qpid::broker::TxnHandle createTxnHandle(const std::string& xid, qpid::broker::TxnBuffer* tb); qpid::broker::ConfigHandle createConfigHandle(); qpid::broker::EnqueueHandle createEnqueueHandle(qpid::broker::MessageHandle& msgHandle, @@ -63,7 +67,6 @@ public: qpid::broker::MessageHandle createMessageHandle(const qpid::broker::DataSource* const dataSrc); qpid::broker::QueueHandle createQueueHandle(const std::string& name, const qpid::types::Variant::Map& opts); - qpid::broker::TxnHandle createTxnHandle(const std::string& xid=std::string()); // --- Store async interface --- diff --git a/cpp/src/qpid/asyncStore/AtomicCounter.h b/cpp/src/qpid/asyncStore/AtomicCounter.h new file mode 100644 index 0000000000..6987b09384 --- /dev/null +++ b/cpp/src/qpid/asyncStore/AtomicCounter.h @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file AtomicCounter.h + */ + +#ifndef qpid_asyncStore_AtomicCounter_h_ +#define qpid_asyncStore_AtomicCounter_h_ + +#include "qpid/sys/Condition.h" +#include "qpid/sys/Mutex.h" +#include "qpid/sys/Time.h" + +namespace qpid { +namespace asyncStore { + +template <class T> +class AtomicCounter +{ +public: + AtomicCounter(const T& initValue = T(0)) : + m_cnt(initValue), + m_cntMutex(), + m_cntCondition() + {} + + virtual ~AtomicCounter() + {} + + T& + get() const + { + qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_cntMutex); + return m_cnt; + } + + AtomicCounter& + operator++() + { + qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_cntMutex); + ++m_cnt; + return *this; + } + + AtomicCounter& + operator--() + { + qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_cntMutex); + if (--m_cnt == 0) { + m_cntCondition.notify(); + } + return *this; + } + + bool + operator==(const AtomicCounter& rhs) + { + qpid::sys::ScopedLock<qpid::sys::Mutex> l1(m_cntMutex); + qpid::sys::ScopedLock<qpid::sys::Mutex> l2(rhs.m_cntMutex); + return m_cnt == rhs.m_cnt; + } + + bool + operator==(const T rhs) + { + qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_cntMutex); + return m_cnt == rhs; + } + + void + waitForZero(const qpid::sys::Duration& d) + { + qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_cntMutex); + while (m_cnt != 0) { + m_cntCondition.wait(m_cntMutex, qpid::sys::AbsTime(qpid::sys::AbsTime(), d)); + } + } + +private: + T m_cnt; + mutable qpid::sys::Mutex m_cntMutex; + qpid::sys::Condition m_cntCondition; +}; + +typedef AtomicCounter<uint32_t> AsyncOpCounter; + +}} // namespace qpid::asyncStore + +#endif // qpid_asyncStore_AtomicCounter_h_ diff --git a/cpp/src/qpid/asyncStore/TxnHandleImpl.cpp b/cpp/src/qpid/asyncStore/TxnHandleImpl.cpp index 7ce01f881c..945b50861d 100644 --- a/cpp/src/qpid/asyncStore/TxnHandleImpl.cpp +++ b/cpp/src/qpid/asyncStore/TxnHandleImpl.cpp @@ -23,6 +23,8 @@ #include "TxnHandleImpl.h" +#include "qpid/Exception.h" +#include "qpid/broker/TxnBuffer.h" #include "qpid/messaging/PrivateImplRef.h" #include <uuid/uuid.h> @@ -30,16 +32,42 @@ namespace qpid { namespace asyncStore { +TxnHandleImpl::TxnHandleImpl() : + m_tpcFlag(false), + m_asyncOpCnt(0UL), + m_txnBuffer(0) +{ + createLocalXid(); +} + +TxnHandleImpl::TxnHandleImpl(qpid::broker::TxnBuffer* tb) : + m_tpcFlag(false), + m_asyncOpCnt(0UL), + m_txnBuffer(tb) +{ + createLocalXid(); +} + TxnHandleImpl::TxnHandleImpl(const std::string& xid) : m_xid(xid), - m_tpcFlag(!xid.empty()) -{ - if (m_xid.empty()) { // create a local xid from a random uuid - uuid_t uuid; - ::uuid_generate_random(uuid); - char uuidStr[37]; // 36-char uuid + trailing '\0' - ::uuid_unparse(uuid, uuidStr); -// m_xid.assign(uuidStr); + m_tpcFlag(!xid.empty()), + m_asyncOpCnt(0UL), + m_txnBuffer(0) +{ + if (m_xid.empty()) { + createLocalXid(); + } +} + +TxnHandleImpl::TxnHandleImpl(const std::string& xid, + qpid::broker::TxnBuffer* tb) : + m_xid(xid), + m_tpcFlag(!xid.empty()), + m_asyncOpCnt(0UL), + m_txnBuffer(tb) +{ + if (m_xid.empty()) { + createLocalXid(); } } @@ -58,4 +86,33 @@ TxnHandleImpl::is2pc() const return m_tpcFlag; } +void +TxnHandleImpl::incrOpCnt() +{ + ++m_asyncOpCnt; +} + +void +TxnHandleImpl::decrOpCnt() +{ + if (m_asyncOpCnt == 0UL) { + throw qpid::Exception("Transaction async operation count underflow"); + } + if (--m_asyncOpCnt == 0UL && m_txnBuffer) { + m_txnBuffer->asyncLocalCommit(); + } +} + +// private +void +TxnHandleImpl::createLocalXid() +{ + uuid_t uuid; + ::uuid_generate_random(uuid); + char uuidStr[37]; // 36-char uuid + trailing '\0' + ::uuid_unparse(uuid, uuidStr); + m_xid.assign(uuidStr); +//std::cout << "TTT TxnHandleImpl::createLocalXid(): Local XID created: \"" << m_xid << "\"" << std::endl << std::flush; +} + }} // namespace qpid::asyncStore diff --git a/cpp/src/qpid/asyncStore/TxnHandleImpl.h b/cpp/src/qpid/asyncStore/TxnHandleImpl.h index b28eb0cd4b..e357791508 100644 --- a/cpp/src/qpid/asyncStore/TxnHandleImpl.h +++ b/cpp/src/qpid/asyncStore/TxnHandleImpl.h @@ -24,23 +24,45 @@ #ifndef qpid_asyncStore_TxnHandleImpl_h_ #define qpid_asyncStore_TxnHandleImpl_h_ +#include "AtomicCounter.h" + #include "qpid/RefCounted.h" +#include <stdint.h> // uint32_t #include <string> namespace qpid { + +namespace broker { +class TxnBuffer; +} + namespace asyncStore { class TxnHandleImpl : public virtual qpid::RefCounted { public: - TxnHandleImpl(const std::string& xid = std::string()); + TxnHandleImpl(); + TxnHandleImpl(qpid::broker::TxnBuffer* tb); + TxnHandleImpl(const std::string& xid); + TxnHandleImpl(const std::string& xid, qpid::broker::TxnBuffer* tb); virtual ~TxnHandleImpl(); const std::string& getXid() const; bool is2pc() const; + + void submitPrepare(); + void submitCommit(); + void submitAbort(); + + void incrOpCnt(); + void decrOpCnt(); private: std::string m_xid; bool m_tpcFlag; + AsyncOpCounter m_asyncOpCnt; + qpid::broker::TxnBuffer* const m_txnBuffer; + + void createLocalXid(); }; }} // namespace qpid::asyncStore diff --git a/cpp/src/qpid/broker/AsyncResultQueueImpl.cpp b/cpp/src/qpid/broker/AsyncResultQueueImpl.cpp index ab391146f9..9e3298bb4e 100644 --- a/cpp/src/qpid/broker/AsyncResultQueueImpl.cpp +++ b/cpp/src/qpid/broker/AsyncResultQueueImpl.cpp @@ -49,7 +49,6 @@ AsyncResultQueueImpl::submit(boost::shared_ptr<AsyncResultHandle> arh) AsyncResultQueueImpl::ResultQueue::Batch::const_iterator AsyncResultQueueImpl::handle(const ResultQueue::Batch& e) { - for (ResultQueue::Batch::const_iterator i = e.begin(); i != e.end(); ++i) { //std::cout << "<== AsyncResultQueueImpl::handle() errNo=" << (*i)->getErrNo() << " errMsg=\"" << (*i)->getErrMsg() << "\"" << std::endl << std::flush; if ((*i)->isValid()) { diff --git a/cpp/src/qpid/broker/AsyncStore.h b/cpp/src/qpid/broker/AsyncStore.h index 7e2ee81620..2f73ec8f3e 100644 --- a/cpp/src/qpid/broker/AsyncStore.h +++ b/cpp/src/qpid/broker/AsyncStore.h @@ -37,7 +37,7 @@ class AsyncResultHandle; // Broker to subclass as a pollable queue class AsyncResultQueue { public: - virtual ~AsyncResultQueue(); + virtual ~AsyncResultQueue() {} // TODO: Remove boost::shared_ptr<> from this interface virtual void submit(boost::shared_ptr<AsyncResultHandle>) = 0; }; @@ -45,14 +45,14 @@ public: // Subclass this for specific contexts class BrokerAsyncContext { public: - virtual ~BrokerAsyncContext(); + virtual ~BrokerAsyncContext() {} virtual AsyncResultQueue* getAsyncResultQueue() const = 0; virtual void invokeCallback(const AsyncResultHandle* const) const = 0; }; class DataSource { public: - virtual ~DataSource(); + virtual ~DataSource() {} virtual uint64_t getSize() = 0; virtual void write(char* target) = 0; }; @@ -65,13 +65,28 @@ class EnqueueHandle; class EventHandle; class MessageHandle; class QueueHandle; +class TxnBuffer; class TxnHandle; +class AsyncTransactionalStore { +public: + virtual ~AsyncTransactionalStore() {} + + virtual TxnHandle createTxnHandle() = 0; + virtual TxnHandle createTxnHandle(TxnBuffer* tb) = 0; + virtual TxnHandle createTxnHandle(const std::string& xid) = 0; + virtual TxnHandle createTxnHandle(const std::string& xid, TxnBuffer* tb) = 0; + + // TODO: Remove boost::shared_ptr<> from this interface + virtual void submitPrepare(TxnHandle&, boost::shared_ptr<BrokerAsyncContext>) = 0; // Distributed txns only + virtual void submitCommit(TxnHandle&, boost::shared_ptr<BrokerAsyncContext>) = 0; + virtual void submitAbort(TxnHandle&, boost::shared_ptr<BrokerAsyncContext>) = 0; +}; // Subclassed by store: -class AsyncStore { +class AsyncStore : public AsyncTransactionalStore { public: - virtual ~AsyncStore(); + virtual ~AsyncStore() {} // --- Factory methods for creating handles --- @@ -80,16 +95,11 @@ public: virtual EventHandle createEventHandle(QueueHandle&, const std::string& key=std::string()) = 0; virtual MessageHandle createMessageHandle(const DataSource* const) = 0; virtual QueueHandle createQueueHandle(const std::string& name, const qpid::types::Variant::Map& opts) = 0; - virtual TxnHandle createTxnHandle(const std::string& xid=std::string()) = 0; // Distr. txns must supply xid // --- Store async interface --- // TODO: Remove boost::shared_ptr<> from this interface - virtual void submitPrepare(TxnHandle&, boost::shared_ptr<BrokerAsyncContext>) = 0; // Distributed txns only - virtual void submitCommit(TxnHandle&, boost::shared_ptr<BrokerAsyncContext>) = 0; - virtual void submitAbort(TxnHandle&, boost::shared_ptr<BrokerAsyncContext>) = 0; - virtual void submitCreate(ConfigHandle&, const DataSource* const, boost::shared_ptr<BrokerAsyncContext>) = 0; virtual void submitDestroy(ConfigHandle&, boost::shared_ptr<BrokerAsyncContext>) = 0; diff --git a/cpp/src/qpid/broker/TxnAsyncContext.cpp b/cpp/src/qpid/broker/TxnAsyncContext.cpp new file mode 100644 index 0000000000..e8abe99dab --- /dev/null +++ b/cpp/src/qpid/broker/TxnAsyncContext.cpp @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file TransactionAsyncContext.cpp + */ + +#include "TxnAsyncContext.h" + +#include <cassert> + +namespace qpid { +namespace broker { + +TxnAsyncContext::TxnAsyncContext(TxnBuffer* const tb, + TxnHandle& th, + const qpid::asyncStore::AsyncOperation::opCode op, + qpid::broker::AsyncResultCallback rcb, + qpid::broker::AsyncResultQueue* const arq): + m_tb(tb), + m_th(th), + m_op(op), + m_rcb(rcb), + m_arq(arq) +{ + assert(m_th.isValid()); +} + +TxnAsyncContext::~TxnAsyncContext() +{} + +TxnBuffer* +TxnAsyncContext::getTxnBuffer() const +{ + return m_tb; +} + +qpid::asyncStore::AsyncOperation::opCode +TxnAsyncContext::getOpCode() const +{ + return m_op; +} + +const char* +TxnAsyncContext::getOpStr() const +{ + return qpid::asyncStore::AsyncOperation::getOpStr(m_op); +} + +TxnHandle +TxnAsyncContext::getTransactionContext() const +{ + return m_th; +} + +AsyncResultQueue* +TxnAsyncContext::getAsyncResultQueue() const +{ + return m_arq; +} + +void +TxnAsyncContext::invokeCallback(const AsyncResultHandle* const arh) const +{ + if (m_rcb) { + m_rcb(arh); + } +} + +}} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/TxnAsyncContext.h b/cpp/src/qpid/broker/TxnAsyncContext.h new file mode 100644 index 0000000000..9bdd8f7188 --- /dev/null +++ b/cpp/src/qpid/broker/TxnAsyncContext.h @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file TxnAsyncContext.h + */ + +#ifndef qpid_broker_TxnAsyncContext_h_ +#define qpid_broker_TxnAsyncContext_h_ + +#include "AsyncStore.h" // qpid::broker::BrokerAsyncContext +#include "TxnHandle.h" + +#include "qpid/asyncStore/AsyncOperation.h" + +#include <boost/shared_ptr.hpp> + +namespace qpid { +namespace broker { + +class TxnAsyncContext: public BrokerAsyncContext +{ +public: + TxnAsyncContext(TxnBuffer* const tb, + TxnHandle& th, + const qpid::asyncStore::AsyncOperation::opCode op, + qpid::broker::AsyncResultCallback rcb, + qpid::broker::AsyncResultQueue* const arq); + virtual ~TxnAsyncContext(); + TxnBuffer* getTxnBuffer() const; + qpid::asyncStore::AsyncOperation::opCode getOpCode() const; + const char* getOpStr() const; + TxnHandle getTransactionContext() const; + + // --- Interface BrokerAsyncContext --- + AsyncResultQueue* getAsyncResultQueue() const; + void invokeCallback(const AsyncResultHandle* const) const; + +private: + TxnBuffer* const m_tb; + TxnHandle m_th; + const qpid::asyncStore::AsyncOperation::opCode m_op; + AsyncResultCallback m_rcb; + AsyncResultQueue* const m_arq; +}; + +}} // namespace qpid::broker + +#endif // qpid_broker_TxnAsyncContext_h_ diff --git a/cpp/src/qpid/broker/TxnBuffer.cpp b/cpp/src/qpid/broker/TxnBuffer.cpp new file mode 100644 index 0000000000..2af8bd4da3 --- /dev/null +++ b/cpp/src/qpid/broker/TxnBuffer.cpp @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file TxnBuffer.cpp + */ + +#include "TxnBuffer.h" + +#include "AsyncResultHandle.h" +#include "AsyncStore.h" +#include "TxnAsyncContext.h" +#include "TxnOp.h" + +#include "qpid/Exception.h" + +#include <boost/shared_ptr.hpp> + +namespace qpid { +namespace broker { + +TxnBuffer::TxnBuffer(AsyncResultQueue& arq) : + m_store(0), + m_resultQueue(arq), + m_state(NONE) +{} + +TxnBuffer::~TxnBuffer() +{} + +void +TxnBuffer::enlist(boost::shared_ptr<TxnOp> op) +{ +//std::cout << "TTT TxnBuffer::enlist" << std::endl << std::flush; + m_ops.push_back(op); +} + +bool +TxnBuffer::prepare(TxnHandle& th) +{ +//std::cout << "TTT TxnBuffer::prepare" << std::endl << std::flush; + for(std::vector<boost::shared_ptr<TxnOp> >::iterator i = m_ops.begin(); i != m_ops.end(); ++i) { + if (!(*i)->prepare(th)) { + return false; + } + } + return true; +} + +void +TxnBuffer::commit() +{ +//std::cout << "TTT TxnBuffer::commit" << std::endl << std::flush; + for(std::vector<boost::shared_ptr<TxnOp> >::iterator i = m_ops.begin(); i != m_ops.end(); ++i) { + (*i)->commit(); + } + m_ops.clear(); +} + +void +TxnBuffer::rollback() +{ +//std::cout << "TTT TxnBuffer::rollback" << std::endl << std::flush; + for(std::vector<boost::shared_ptr<TxnOp> >::iterator i = m_ops.begin(); i != m_ops.end(); ++i) { + (*i)->rollback(); + } + m_ops.clear(); +} + +bool +TxnBuffer::commitLocal(AsyncTransactionalStore* const store) +{ +//std::cout << "TTT TxnBuffer::commitLocal" << std::endl << std::flush; + if (store) { + try { + m_store = store; + asyncLocalCommit(); + } catch (std::exception& e) { + std::cerr << "Commit failed: " << e.what() << std::endl; + } catch (...) { + std::cerr << "Commit failed (unknown exception)" << std::endl; + } + } + return false; +} + +// static +void +TxnBuffer::handleAsyncResult(const AsyncResultHandle* const arh) +{ + if (arh) { + boost::shared_ptr<TxnAsyncContext> tac = boost::dynamic_pointer_cast<TxnAsyncContext>(arh->getBrokerAsyncContext()); + if (arh->getErrNo()) { + tac->getTxnBuffer()->asyncLocalAbort(); + std::cerr << "Transaction xid=\"" << tac->getTransactionContext().getXid() << "\": Operation " << tac->getOpStr() << ": failure " + << arh->getErrNo() << " (" << arh->getErrMsg() << ")" << std::endl; + tac->getTxnBuffer()->asyncLocalAbort(); + } else { +//std::cout << "TTT TxnBuffer::handleAsyncResult() op=" << tac->getOpStr() << std::endl << std::flush; + if (tac->getOpCode() == qpid::asyncStore::AsyncOperation::TXN_ABORT) { + tac->getTxnBuffer()->asyncLocalAbort(); + } else { + tac->getTxnBuffer()->asyncLocalCommit(); + } + } + } +} + +void +TxnBuffer::asyncLocalCommit() +{ + assert(m_store != 0); + switch(m_state) { + case NONE: +//std::cout << "TTT TxnBuffer::asyncLocalCommit: NONE->PREPARE" << std::endl << std::flush; + m_state = PREPARE; + m_txnHandle = m_store->createTxnHandle(this); + prepare(m_txnHandle); + break; + case PREPARE: +//std::cout << "TTT TxnBuffer::asyncLocalCommit: PREPARE->COMMIT" << std::endl << std::flush; + m_state = COMMIT; + { + boost::shared_ptr<TxnAsyncContext> tac(new TxnAsyncContext(this, + m_txnHandle, + qpid::asyncStore::AsyncOperation::TXN_COMMIT, + &handleAsyncResult, + &m_resultQueue)); + m_store->submitCommit(m_txnHandle, tac); + } + break; + case COMMIT: +//std::cout << "TTT TxnBuffer:asyncLocalCommit: COMMIT->COMPLETE" << std::endl << std::flush; + commit(); + m_state = COMPLETE; + //delete this; // TODO: ugly! Find a better way to handle the life cycle of this class + break; + default: ; +//std::cout << "TTT TxnBuffer:asyncLocalCommit: Unexpected state " << m_state << std::endl << std::flush; + } +} + +void +TxnBuffer::asyncLocalAbort() +{ + assert(m_store != 0); + switch (m_state) { + case NONE: + case PREPARE: + case COMMIT: +//std::cout << "TTT TxnBuffer::asyncRollback: xxx->ROLLBACK" << std::endl << std::flush; + m_state = ROLLBACK; + { + boost::shared_ptr<TxnAsyncContext> tac(new TxnAsyncContext(this, + m_txnHandle, + qpid::asyncStore::AsyncOperation::TXN_ABORT, + &handleAsyncResult, + &m_resultQueue)); + m_store->submitCommit(m_txnHandle, tac); + } + break; + case ROLLBACK: +//std::cout << "TTT TxnBuffer:asyncRollback: ROLLBACK->COMPLETE" << std::endl << std::flush; + rollback(); + m_state = COMPLETE; + //delete this; // TODO: ugly! Find a better way to handle the life cycle of this class + default: ; +//std::cout << "TTT TxnBuffer:asyncRollback: Unexpected state " << m_state << std::endl << std::flush; + } +} + +}} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/TxnBuffer.h b/cpp/src/qpid/broker/TxnBuffer.h new file mode 100644 index 0000000000..308a91aa03 --- /dev/null +++ b/cpp/src/qpid/broker/TxnBuffer.h @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file TxnBuffer.h + */ + +#ifndef qpid_broker_TxnBuffer_h_ +#define qpid_broker_TxnBuffer_h_ + +#include "TxnHandle.h" + +//#include <boost/enable_shared_from_this.hpp> +#include <boost/shared_ptr.hpp> +#include <vector> + +namespace qpid { +namespace broker { + +class AsyncResultHandle; +class AsyncResultQueue; +class AsyncTransactionalStore; +class TxnOp; + +class TxnBuffer /*: public boost::enable_shared_from_this<TxnBuffer>*/ { +public: + TxnBuffer(AsyncResultQueue& arq); + virtual ~TxnBuffer(); + + void enlist(boost::shared_ptr<TxnOp> op); + bool prepare(TxnHandle& th); + void commit(); + void rollback(); + bool commitLocal(AsyncTransactionalStore* const store); + + // --- Async operations --- + static void handleAsyncResult(const AsyncResultHandle* const arh); + void asyncLocalCommit(); + void asyncLocalAbort(); + +private: + std::vector<boost::shared_ptr<TxnOp> > m_ops; + TxnHandle m_txnHandle; + AsyncTransactionalStore* m_store; + AsyncResultQueue& m_resultQueue; + + typedef enum {NONE = 0, PREPARE, COMMIT, ROLLBACK, COMPLETE} e_txnState; + e_txnState m_state; +}; + +}} // namespace qpid::broker + +#endif // qpid_broker_TxnBuffer_h_ diff --git a/cpp/src/qpid/broker/TxnHandle.cpp b/cpp/src/qpid/broker/TxnHandle.cpp index 1e6cd2ac6f..07d46b4235 100644 --- a/cpp/src/qpid/broker/TxnHandle.cpp +++ b/cpp/src/qpid/broker/TxnHandle.cpp @@ -69,4 +69,16 @@ TxnHandle::is2pc() const return impl->is2pc(); } +void +TxnHandle::incrOpCnt() +{ + impl->incrOpCnt(); +} + +void +TxnHandle::decrOpCnt() +{ + impl->decrOpCnt(); +} + }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/TxnHandle.h b/cpp/src/qpid/broker/TxnHandle.h index 34a9c69434..8bed14e16a 100644 --- a/cpp/src/qpid/broker/TxnHandle.h +++ b/cpp/src/qpid/broker/TxnHandle.h @@ -44,6 +44,8 @@ public: // TxnHandleImpl methods const std::string& getXid() const; bool is2pc() const; + void incrOpCnt(); + void decrOpCnt(); private: friend class qpid::messaging::PrivateImplRef<TxnHandle>; diff --git a/cpp/src/qpid/broker/AsyncStore.cpp b/cpp/src/qpid/broker/TxnOp.h index 10cb3d27cf..e98429535e 100644 --- a/cpp/src/qpid/broker/AsyncStore.cpp +++ b/cpp/src/qpid/broker/TxnOp.h @@ -17,21 +17,24 @@ * under the License. */ -#include "AsyncStore.h" +/** + * \file TxnOp.h + */ + +#ifndef qpid_broker_TxnOp_h_ +#define qpid_broker_TxnOp_h_ namespace qpid { namespace broker { -AsyncResultQueue::~AsyncResultQueue() -{} - -BrokerAsyncContext::~BrokerAsyncContext() -{} - -DataSource::~DataSource() -{} - -AsyncStore::~AsyncStore() -{} +class TxnOp{ +public: + virtual ~TxnOp() {} + virtual bool prepare(TxnHandle& th) throw() = 0; + virtual void commit() throw() = 0; + virtual void rollback() throw() = 0; +}; }} // namespace qpid::broker + +#endif // qpid_broker_TxnOp_h_ |