summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/asyncStore
diff options
context:
space:
mode:
authorKim van der Riet <kpvdr@apache.org>2012-06-15 19:21:07 +0000
committerKim van der Riet <kpvdr@apache.org>2012-06-15 19:21:07 +0000
commit58337ca40df3a57a16cdee9b7f6b4fe0361b0018 (patch)
tree391ad7ad1ea8cd42a7e9d4890c724b186e00f38b /cpp/src/qpid/asyncStore
parent01174a9e568f11cd5aa4f22aaa914e00ab9fe163 (diff)
downloadqpid-python-58337ca40df3a57a16cdee9b7f6b4fe0361b0018.tar.gz
QPID-3858: WIP - async txns for msg publish pathway, but there are some race/thread issues to sort out.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1350745 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/asyncStore')
-rw-r--r--cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp32
-rw-r--r--cpp/src/qpid/asyncStore/AsyncStoreImpl.h5
-rw-r--r--cpp/src/qpid/asyncStore/AtomicCounter.h106
-rw-r--r--cpp/src/qpid/asyncStore/TxnHandleImpl.cpp73
-rw-r--r--cpp/src/qpid/asyncStore/TxnHandleImpl.h24
5 files changed, 224 insertions, 16 deletions
diff --git a/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp b/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp
index a9fc13363a..9135fcc27e 100644
--- a/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp
+++ b/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp
@@ -24,6 +24,7 @@
#include "AsyncStoreImpl.h"
#include "AsyncOperation.h"
+#include "TxnHandleImpl.h"
#include "qpid/broker/ConfigHandle.h"
#include "qpid/broker/EnqueueHandle.h"
@@ -62,6 +63,31 @@ void
AsyncStoreImpl::initManagement(qpid::broker::Broker* /*broker*/)
{}
+qpid::broker::TxnHandle
+AsyncStoreImpl::createTxnHandle()
+{
+ return qpid::broker::TxnHandle(new TxnHandleImpl);
+}
+
+qpid::broker::TxnHandle
+AsyncStoreImpl::createTxnHandle(qpid::broker::TxnBuffer* tb)
+{
+ return qpid::broker::TxnHandle(new TxnHandleImpl(tb));
+}
+
+qpid::broker::TxnHandle
+AsyncStoreImpl::createTxnHandle(const std::string& xid)
+{
+ return qpid::broker::TxnHandle(new TxnHandleImpl(xid));
+}
+
+qpid::broker::TxnHandle
+AsyncStoreImpl::createTxnHandle(const std::string& xid,
+ qpid::broker::TxnBuffer* tb)
+{
+ return qpid::broker::TxnHandle(new TxnHandleImpl(xid, tb));
+}
+
qpid::broker::ConfigHandle
AsyncStoreImpl::createConfigHandle()
{
@@ -96,12 +122,6 @@ AsyncStoreImpl::createQueueHandle(const std::string& name,
return qpid::broker::QueueHandle(new QueueHandleImpl(name, opts));
}
-qpid::broker::TxnHandle
-AsyncStoreImpl::createTxnHandle(const std::string& xid)
-{
- return qpid::broker::TxnHandle(new TxnHandleImpl(xid));
-}
-
void
AsyncStoreImpl::submitPrepare(qpid::broker::TxnHandle& txnHandle,
boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt)
diff --git a/cpp/src/qpid/asyncStore/AsyncStoreImpl.h b/cpp/src/qpid/asyncStore/AsyncStoreImpl.h
index 3f2058a94c..a00771abf5 100644
--- a/cpp/src/qpid/asyncStore/AsyncStoreImpl.h
+++ b/cpp/src/qpid/asyncStore/AsyncStoreImpl.h
@@ -54,6 +54,10 @@ public:
// --- Factory methods for creating handles ---
+ qpid::broker::TxnHandle createTxnHandle();
+ qpid::broker::TxnHandle createTxnHandle(qpid::broker::TxnBuffer* tb);
+ qpid::broker::TxnHandle createTxnHandle(const std::string& xid);
+ qpid::broker::TxnHandle createTxnHandle(const std::string& xid, qpid::broker::TxnBuffer* tb);
qpid::broker::ConfigHandle createConfigHandle();
qpid::broker::EnqueueHandle createEnqueueHandle(qpid::broker::MessageHandle& msgHandle,
@@ -63,7 +67,6 @@ public:
qpid::broker::MessageHandle createMessageHandle(const qpid::broker::DataSource* const dataSrc);
qpid::broker::QueueHandle createQueueHandle(const std::string& name,
const qpid::types::Variant::Map& opts);
- qpid::broker::TxnHandle createTxnHandle(const std::string& xid=std::string());
// --- Store async interface ---
diff --git a/cpp/src/qpid/asyncStore/AtomicCounter.h b/cpp/src/qpid/asyncStore/AtomicCounter.h
new file mode 100644
index 0000000000..6987b09384
--- /dev/null
+++ b/cpp/src/qpid/asyncStore/AtomicCounter.h
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file AtomicCounter.h
+ */
+
+#ifndef qpid_asyncStore_AtomicCounter_h_
+#define qpid_asyncStore_AtomicCounter_h_
+
+#include "qpid/sys/Condition.h"
+#include "qpid/sys/Mutex.h"
+#include "qpid/sys/Time.h"
+
+namespace qpid {
+namespace asyncStore {
+
+template <class T>
+class AtomicCounter
+{
+public:
+ AtomicCounter(const T& initValue = T(0)) :
+ m_cnt(initValue),
+ m_cntMutex(),
+ m_cntCondition()
+ {}
+
+ virtual ~AtomicCounter()
+ {}
+
+ T&
+ get() const
+ {
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_cntMutex);
+ return m_cnt;
+ }
+
+ AtomicCounter&
+ operator++()
+ {
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_cntMutex);
+ ++m_cnt;
+ return *this;
+ }
+
+ AtomicCounter&
+ operator--()
+ {
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_cntMutex);
+ if (--m_cnt == 0) {
+ m_cntCondition.notify();
+ }
+ return *this;
+ }
+
+ bool
+ operator==(const AtomicCounter& rhs)
+ {
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l1(m_cntMutex);
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l2(rhs.m_cntMutex);
+ return m_cnt == rhs.m_cnt;
+ }
+
+ bool
+ operator==(const T rhs)
+ {
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_cntMutex);
+ return m_cnt == rhs;
+ }
+
+ void
+ waitForZero(const qpid::sys::Duration& d)
+ {
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_cntMutex);
+ while (m_cnt != 0) {
+ m_cntCondition.wait(m_cntMutex, qpid::sys::AbsTime(qpid::sys::AbsTime(), d));
+ }
+ }
+
+private:
+ T m_cnt;
+ mutable qpid::sys::Mutex m_cntMutex;
+ qpid::sys::Condition m_cntCondition;
+};
+
+typedef AtomicCounter<uint32_t> AsyncOpCounter;
+
+}} // namespace qpid::asyncStore
+
+#endif // qpid_asyncStore_AtomicCounter_h_
diff --git a/cpp/src/qpid/asyncStore/TxnHandleImpl.cpp b/cpp/src/qpid/asyncStore/TxnHandleImpl.cpp
index 7ce01f881c..945b50861d 100644
--- a/cpp/src/qpid/asyncStore/TxnHandleImpl.cpp
+++ b/cpp/src/qpid/asyncStore/TxnHandleImpl.cpp
@@ -23,6 +23,8 @@
#include "TxnHandleImpl.h"
+#include "qpid/Exception.h"
+#include "qpid/broker/TxnBuffer.h"
#include "qpid/messaging/PrivateImplRef.h"
#include <uuid/uuid.h>
@@ -30,16 +32,42 @@
namespace qpid {
namespace asyncStore {
+TxnHandleImpl::TxnHandleImpl() :
+ m_tpcFlag(false),
+ m_asyncOpCnt(0UL),
+ m_txnBuffer(0)
+{
+ createLocalXid();
+}
+
+TxnHandleImpl::TxnHandleImpl(qpid::broker::TxnBuffer* tb) :
+ m_tpcFlag(false),
+ m_asyncOpCnt(0UL),
+ m_txnBuffer(tb)
+{
+ createLocalXid();
+}
+
TxnHandleImpl::TxnHandleImpl(const std::string& xid) :
m_xid(xid),
- m_tpcFlag(!xid.empty())
-{
- if (m_xid.empty()) { // create a local xid from a random uuid
- uuid_t uuid;
- ::uuid_generate_random(uuid);
- char uuidStr[37]; // 36-char uuid + trailing '\0'
- ::uuid_unparse(uuid, uuidStr);
-// m_xid.assign(uuidStr);
+ m_tpcFlag(!xid.empty()),
+ m_asyncOpCnt(0UL),
+ m_txnBuffer(0)
+{
+ if (m_xid.empty()) {
+ createLocalXid();
+ }
+}
+
+TxnHandleImpl::TxnHandleImpl(const std::string& xid,
+ qpid::broker::TxnBuffer* tb) :
+ m_xid(xid),
+ m_tpcFlag(!xid.empty()),
+ m_asyncOpCnt(0UL),
+ m_txnBuffer(tb)
+{
+ if (m_xid.empty()) {
+ createLocalXid();
}
}
@@ -58,4 +86,33 @@ TxnHandleImpl::is2pc() const
return m_tpcFlag;
}
+void
+TxnHandleImpl::incrOpCnt()
+{
+ ++m_asyncOpCnt;
+}
+
+void
+TxnHandleImpl::decrOpCnt()
+{
+ if (m_asyncOpCnt == 0UL) {
+ throw qpid::Exception("Transaction async operation count underflow");
+ }
+ if (--m_asyncOpCnt == 0UL && m_txnBuffer) {
+ m_txnBuffer->asyncLocalCommit();
+ }
+}
+
+// private
+void
+TxnHandleImpl::createLocalXid()
+{
+ uuid_t uuid;
+ ::uuid_generate_random(uuid);
+ char uuidStr[37]; // 36-char uuid + trailing '\0'
+ ::uuid_unparse(uuid, uuidStr);
+ m_xid.assign(uuidStr);
+//std::cout << "TTT TxnHandleImpl::createLocalXid(): Local XID created: \"" << m_xid << "\"" << std::endl << std::flush;
+}
+
}} // namespace qpid::asyncStore
diff --git a/cpp/src/qpid/asyncStore/TxnHandleImpl.h b/cpp/src/qpid/asyncStore/TxnHandleImpl.h
index b28eb0cd4b..e357791508 100644
--- a/cpp/src/qpid/asyncStore/TxnHandleImpl.h
+++ b/cpp/src/qpid/asyncStore/TxnHandleImpl.h
@@ -24,23 +24,45 @@
#ifndef qpid_asyncStore_TxnHandleImpl_h_
#define qpid_asyncStore_TxnHandleImpl_h_
+#include "AtomicCounter.h"
+
#include "qpid/RefCounted.h"
+#include <stdint.h> // uint32_t
#include <string>
namespace qpid {
+
+namespace broker {
+class TxnBuffer;
+}
+
namespace asyncStore {
class TxnHandleImpl : public virtual qpid::RefCounted
{
public:
- TxnHandleImpl(const std::string& xid = std::string());
+ TxnHandleImpl();
+ TxnHandleImpl(qpid::broker::TxnBuffer* tb);
+ TxnHandleImpl(const std::string& xid);
+ TxnHandleImpl(const std::string& xid, qpid::broker::TxnBuffer* tb);
virtual ~TxnHandleImpl();
const std::string& getXid() const;
bool is2pc() const;
+
+ void submitPrepare();
+ void submitCommit();
+ void submitAbort();
+
+ void incrOpCnt();
+ void decrOpCnt();
private:
std::string m_xid;
bool m_tpcFlag;
+ AsyncOpCounter m_asyncOpCnt;
+ qpid::broker::TxnBuffer* const m_txnBuffer;
+
+ void createLocalXid();
};
}} // namespace qpid::asyncStore