diff options
author | Kim van der Riet <kpvdr@apache.org> | 2012-06-15 19:21:07 +0000 |
---|---|---|
committer | Kim van der Riet <kpvdr@apache.org> | 2012-06-15 19:21:07 +0000 |
commit | 58337ca40df3a57a16cdee9b7f6b4fe0361b0018 (patch) | |
tree | 391ad7ad1ea8cd42a7e9d4890c724b186e00f38b /cpp/src/qpid/asyncStore | |
parent | 01174a9e568f11cd5aa4f22aaa914e00ab9fe163 (diff) | |
download | qpid-python-58337ca40df3a57a16cdee9b7f6b4fe0361b0018.tar.gz |
QPID-3858: WIP - async txns for msg publish pathway, but there are some race/thread issues to sort out.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1350745 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/asyncStore')
-rw-r--r-- | cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp | 32 | ||||
-rw-r--r-- | cpp/src/qpid/asyncStore/AsyncStoreImpl.h | 5 | ||||
-rw-r--r-- | cpp/src/qpid/asyncStore/AtomicCounter.h | 106 | ||||
-rw-r--r-- | cpp/src/qpid/asyncStore/TxnHandleImpl.cpp | 73 | ||||
-rw-r--r-- | cpp/src/qpid/asyncStore/TxnHandleImpl.h | 24 |
5 files changed, 224 insertions, 16 deletions
diff --git a/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp b/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp index a9fc13363a..9135fcc27e 100644 --- a/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp +++ b/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp @@ -24,6 +24,7 @@ #include "AsyncStoreImpl.h" #include "AsyncOperation.h" +#include "TxnHandleImpl.h" #include "qpid/broker/ConfigHandle.h" #include "qpid/broker/EnqueueHandle.h" @@ -62,6 +63,31 @@ void AsyncStoreImpl::initManagement(qpid::broker::Broker* /*broker*/) {} +qpid::broker::TxnHandle +AsyncStoreImpl::createTxnHandle() +{ + return qpid::broker::TxnHandle(new TxnHandleImpl); +} + +qpid::broker::TxnHandle +AsyncStoreImpl::createTxnHandle(qpid::broker::TxnBuffer* tb) +{ + return qpid::broker::TxnHandle(new TxnHandleImpl(tb)); +} + +qpid::broker::TxnHandle +AsyncStoreImpl::createTxnHandle(const std::string& xid) +{ + return qpid::broker::TxnHandle(new TxnHandleImpl(xid)); +} + +qpid::broker::TxnHandle +AsyncStoreImpl::createTxnHandle(const std::string& xid, + qpid::broker::TxnBuffer* tb) +{ + return qpid::broker::TxnHandle(new TxnHandleImpl(xid, tb)); +} + qpid::broker::ConfigHandle AsyncStoreImpl::createConfigHandle() { @@ -96,12 +122,6 @@ AsyncStoreImpl::createQueueHandle(const std::string& name, return qpid::broker::QueueHandle(new QueueHandleImpl(name, opts)); } -qpid::broker::TxnHandle -AsyncStoreImpl::createTxnHandle(const std::string& xid) -{ - return qpid::broker::TxnHandle(new TxnHandleImpl(xid)); -} - void AsyncStoreImpl::submitPrepare(qpid::broker::TxnHandle& txnHandle, boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) diff --git a/cpp/src/qpid/asyncStore/AsyncStoreImpl.h b/cpp/src/qpid/asyncStore/AsyncStoreImpl.h index 3f2058a94c..a00771abf5 100644 --- a/cpp/src/qpid/asyncStore/AsyncStoreImpl.h +++ b/cpp/src/qpid/asyncStore/AsyncStoreImpl.h @@ -54,6 +54,10 @@ public: // --- Factory methods for creating handles --- + qpid::broker::TxnHandle createTxnHandle(); + qpid::broker::TxnHandle createTxnHandle(qpid::broker::TxnBuffer* tb); + qpid::broker::TxnHandle createTxnHandle(const std::string& xid); + qpid::broker::TxnHandle createTxnHandle(const std::string& xid, qpid::broker::TxnBuffer* tb); qpid::broker::ConfigHandle createConfigHandle(); qpid::broker::EnqueueHandle createEnqueueHandle(qpid::broker::MessageHandle& msgHandle, @@ -63,7 +67,6 @@ public: qpid::broker::MessageHandle createMessageHandle(const qpid::broker::DataSource* const dataSrc); qpid::broker::QueueHandle createQueueHandle(const std::string& name, const qpid::types::Variant::Map& opts); - qpid::broker::TxnHandle createTxnHandle(const std::string& xid=std::string()); // --- Store async interface --- diff --git a/cpp/src/qpid/asyncStore/AtomicCounter.h b/cpp/src/qpid/asyncStore/AtomicCounter.h new file mode 100644 index 0000000000..6987b09384 --- /dev/null +++ b/cpp/src/qpid/asyncStore/AtomicCounter.h @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file AtomicCounter.h + */ + +#ifndef qpid_asyncStore_AtomicCounter_h_ +#define qpid_asyncStore_AtomicCounter_h_ + +#include "qpid/sys/Condition.h" +#include "qpid/sys/Mutex.h" +#include "qpid/sys/Time.h" + +namespace qpid { +namespace asyncStore { + +template <class T> +class AtomicCounter +{ +public: + AtomicCounter(const T& initValue = T(0)) : + m_cnt(initValue), + m_cntMutex(), + m_cntCondition() + {} + + virtual ~AtomicCounter() + {} + + T& + get() const + { + qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_cntMutex); + return m_cnt; + } + + AtomicCounter& + operator++() + { + qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_cntMutex); + ++m_cnt; + return *this; + } + + AtomicCounter& + operator--() + { + qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_cntMutex); + if (--m_cnt == 0) { + m_cntCondition.notify(); + } + return *this; + } + + bool + operator==(const AtomicCounter& rhs) + { + qpid::sys::ScopedLock<qpid::sys::Mutex> l1(m_cntMutex); + qpid::sys::ScopedLock<qpid::sys::Mutex> l2(rhs.m_cntMutex); + return m_cnt == rhs.m_cnt; + } + + bool + operator==(const T rhs) + { + qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_cntMutex); + return m_cnt == rhs; + } + + void + waitForZero(const qpid::sys::Duration& d) + { + qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_cntMutex); + while (m_cnt != 0) { + m_cntCondition.wait(m_cntMutex, qpid::sys::AbsTime(qpid::sys::AbsTime(), d)); + } + } + +private: + T m_cnt; + mutable qpid::sys::Mutex m_cntMutex; + qpid::sys::Condition m_cntCondition; +}; + +typedef AtomicCounter<uint32_t> AsyncOpCounter; + +}} // namespace qpid::asyncStore + +#endif // qpid_asyncStore_AtomicCounter_h_ diff --git a/cpp/src/qpid/asyncStore/TxnHandleImpl.cpp b/cpp/src/qpid/asyncStore/TxnHandleImpl.cpp index 7ce01f881c..945b50861d 100644 --- a/cpp/src/qpid/asyncStore/TxnHandleImpl.cpp +++ b/cpp/src/qpid/asyncStore/TxnHandleImpl.cpp @@ -23,6 +23,8 @@ #include "TxnHandleImpl.h" +#include "qpid/Exception.h" +#include "qpid/broker/TxnBuffer.h" #include "qpid/messaging/PrivateImplRef.h" #include <uuid/uuid.h> @@ -30,16 +32,42 @@ namespace qpid { namespace asyncStore { +TxnHandleImpl::TxnHandleImpl() : + m_tpcFlag(false), + m_asyncOpCnt(0UL), + m_txnBuffer(0) +{ + createLocalXid(); +} + +TxnHandleImpl::TxnHandleImpl(qpid::broker::TxnBuffer* tb) : + m_tpcFlag(false), + m_asyncOpCnt(0UL), + m_txnBuffer(tb) +{ + createLocalXid(); +} + TxnHandleImpl::TxnHandleImpl(const std::string& xid) : m_xid(xid), - m_tpcFlag(!xid.empty()) -{ - if (m_xid.empty()) { // create a local xid from a random uuid - uuid_t uuid; - ::uuid_generate_random(uuid); - char uuidStr[37]; // 36-char uuid + trailing '\0' - ::uuid_unparse(uuid, uuidStr); -// m_xid.assign(uuidStr); + m_tpcFlag(!xid.empty()), + m_asyncOpCnt(0UL), + m_txnBuffer(0) +{ + if (m_xid.empty()) { + createLocalXid(); + } +} + +TxnHandleImpl::TxnHandleImpl(const std::string& xid, + qpid::broker::TxnBuffer* tb) : + m_xid(xid), + m_tpcFlag(!xid.empty()), + m_asyncOpCnt(0UL), + m_txnBuffer(tb) +{ + if (m_xid.empty()) { + createLocalXid(); } } @@ -58,4 +86,33 @@ TxnHandleImpl::is2pc() const return m_tpcFlag; } +void +TxnHandleImpl::incrOpCnt() +{ + ++m_asyncOpCnt; +} + +void +TxnHandleImpl::decrOpCnt() +{ + if (m_asyncOpCnt == 0UL) { + throw qpid::Exception("Transaction async operation count underflow"); + } + if (--m_asyncOpCnt == 0UL && m_txnBuffer) { + m_txnBuffer->asyncLocalCommit(); + } +} + +// private +void +TxnHandleImpl::createLocalXid() +{ + uuid_t uuid; + ::uuid_generate_random(uuid); + char uuidStr[37]; // 36-char uuid + trailing '\0' + ::uuid_unparse(uuid, uuidStr); + m_xid.assign(uuidStr); +//std::cout << "TTT TxnHandleImpl::createLocalXid(): Local XID created: \"" << m_xid << "\"" << std::endl << std::flush; +} + }} // namespace qpid::asyncStore diff --git a/cpp/src/qpid/asyncStore/TxnHandleImpl.h b/cpp/src/qpid/asyncStore/TxnHandleImpl.h index b28eb0cd4b..e357791508 100644 --- a/cpp/src/qpid/asyncStore/TxnHandleImpl.h +++ b/cpp/src/qpid/asyncStore/TxnHandleImpl.h @@ -24,23 +24,45 @@ #ifndef qpid_asyncStore_TxnHandleImpl_h_ #define qpid_asyncStore_TxnHandleImpl_h_ +#include "AtomicCounter.h" + #include "qpid/RefCounted.h" +#include <stdint.h> // uint32_t #include <string> namespace qpid { + +namespace broker { +class TxnBuffer; +} + namespace asyncStore { class TxnHandleImpl : public virtual qpid::RefCounted { public: - TxnHandleImpl(const std::string& xid = std::string()); + TxnHandleImpl(); + TxnHandleImpl(qpid::broker::TxnBuffer* tb); + TxnHandleImpl(const std::string& xid); + TxnHandleImpl(const std::string& xid, qpid::broker::TxnBuffer* tb); virtual ~TxnHandleImpl(); const std::string& getXid() const; bool is2pc() const; + + void submitPrepare(); + void submitCommit(); + void submitAbort(); + + void incrOpCnt(); + void decrOpCnt(); private: std::string m_xid; bool m_tpcFlag; + AsyncOpCounter m_asyncOpCnt; + qpid::broker::TxnBuffer* const m_txnBuffer; + + void createLocalXid(); }; }} // namespace qpid::asyncStore |