summaryrefslogtreecommitdiff
path: root/cpp/src/qpid
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid')
-rw-r--r--cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp32
-rw-r--r--cpp/src/qpid/asyncStore/AsyncStoreImpl.h5
-rw-r--r--cpp/src/qpid/asyncStore/AtomicCounter.h106
-rw-r--r--cpp/src/qpid/asyncStore/TxnHandleImpl.cpp73
-rw-r--r--cpp/src/qpid/asyncStore/TxnHandleImpl.h24
-rw-r--r--cpp/src/qpid/broker/AsyncResultQueueImpl.cpp1
-rw-r--r--cpp/src/qpid/broker/AsyncStore.h30
-rw-r--r--cpp/src/qpid/broker/TxnAsyncContext.cpp86
-rw-r--r--cpp/src/qpid/broker/TxnAsyncContext.h65
-rw-r--r--cpp/src/qpid/broker/TxnBuffer.cpp188
-rw-r--r--cpp/src/qpid/broker/TxnBuffer.h69
-rw-r--r--cpp/src/qpid/broker/TxnHandle.cpp12
-rw-r--r--cpp/src/qpid/broker/TxnHandle.h2
-rw-r--r--cpp/src/qpid/broker/TxnOp.h (renamed from cpp/src/qpid/broker/AsyncStore.cpp)27
14 files changed, 681 insertions, 39 deletions
diff --git a/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp b/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp
index a9fc13363a..9135fcc27e 100644
--- a/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp
+++ b/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp
@@ -24,6 +24,7 @@
#include "AsyncStoreImpl.h"
#include "AsyncOperation.h"
+#include "TxnHandleImpl.h"
#include "qpid/broker/ConfigHandle.h"
#include "qpid/broker/EnqueueHandle.h"
@@ -62,6 +63,31 @@ void
AsyncStoreImpl::initManagement(qpid::broker::Broker* /*broker*/)
{}
+qpid::broker::TxnHandle
+AsyncStoreImpl::createTxnHandle()
+{
+ return qpid::broker::TxnHandle(new TxnHandleImpl);
+}
+
+qpid::broker::TxnHandle
+AsyncStoreImpl::createTxnHandle(qpid::broker::TxnBuffer* tb)
+{
+ return qpid::broker::TxnHandle(new TxnHandleImpl(tb));
+}
+
+qpid::broker::TxnHandle
+AsyncStoreImpl::createTxnHandle(const std::string& xid)
+{
+ return qpid::broker::TxnHandle(new TxnHandleImpl(xid));
+}
+
+qpid::broker::TxnHandle
+AsyncStoreImpl::createTxnHandle(const std::string& xid,
+ qpid::broker::TxnBuffer* tb)
+{
+ return qpid::broker::TxnHandle(new TxnHandleImpl(xid, tb));
+}
+
qpid::broker::ConfigHandle
AsyncStoreImpl::createConfigHandle()
{
@@ -96,12 +122,6 @@ AsyncStoreImpl::createQueueHandle(const std::string& name,
return qpid::broker::QueueHandle(new QueueHandleImpl(name, opts));
}
-qpid::broker::TxnHandle
-AsyncStoreImpl::createTxnHandle(const std::string& xid)
-{
- return qpid::broker::TxnHandle(new TxnHandleImpl(xid));
-}
-
void
AsyncStoreImpl::submitPrepare(qpid::broker::TxnHandle& txnHandle,
boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt)
diff --git a/cpp/src/qpid/asyncStore/AsyncStoreImpl.h b/cpp/src/qpid/asyncStore/AsyncStoreImpl.h
index 3f2058a94c..a00771abf5 100644
--- a/cpp/src/qpid/asyncStore/AsyncStoreImpl.h
+++ b/cpp/src/qpid/asyncStore/AsyncStoreImpl.h
@@ -54,6 +54,10 @@ public:
// --- Factory methods for creating handles ---
+ qpid::broker::TxnHandle createTxnHandle();
+ qpid::broker::TxnHandle createTxnHandle(qpid::broker::TxnBuffer* tb);
+ qpid::broker::TxnHandle createTxnHandle(const std::string& xid);
+ qpid::broker::TxnHandle createTxnHandle(const std::string& xid, qpid::broker::TxnBuffer* tb);
qpid::broker::ConfigHandle createConfigHandle();
qpid::broker::EnqueueHandle createEnqueueHandle(qpid::broker::MessageHandle& msgHandle,
@@ -63,7 +67,6 @@ public:
qpid::broker::MessageHandle createMessageHandle(const qpid::broker::DataSource* const dataSrc);
qpid::broker::QueueHandle createQueueHandle(const std::string& name,
const qpid::types::Variant::Map& opts);
- qpid::broker::TxnHandle createTxnHandle(const std::string& xid=std::string());
// --- Store async interface ---
diff --git a/cpp/src/qpid/asyncStore/AtomicCounter.h b/cpp/src/qpid/asyncStore/AtomicCounter.h
new file mode 100644
index 0000000000..6987b09384
--- /dev/null
+++ b/cpp/src/qpid/asyncStore/AtomicCounter.h
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file AtomicCounter.h
+ */
+
+#ifndef qpid_asyncStore_AtomicCounter_h_
+#define qpid_asyncStore_AtomicCounter_h_
+
+#include "qpid/sys/Condition.h"
+#include "qpid/sys/Mutex.h"
+#include "qpid/sys/Time.h"
+
+namespace qpid {
+namespace asyncStore {
+
+template <class T>
+class AtomicCounter
+{
+public:
+ AtomicCounter(const T& initValue = T(0)) :
+ m_cnt(initValue),
+ m_cntMutex(),
+ m_cntCondition()
+ {}
+
+ virtual ~AtomicCounter()
+ {}
+
+ T&
+ get() const
+ {
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_cntMutex);
+ return m_cnt;
+ }
+
+ AtomicCounter&
+ operator++()
+ {
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_cntMutex);
+ ++m_cnt;
+ return *this;
+ }
+
+ AtomicCounter&
+ operator--()
+ {
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_cntMutex);
+ if (--m_cnt == 0) {
+ m_cntCondition.notify();
+ }
+ return *this;
+ }
+
+ bool
+ operator==(const AtomicCounter& rhs)
+ {
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l1(m_cntMutex);
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l2(rhs.m_cntMutex);
+ return m_cnt == rhs.m_cnt;
+ }
+
+ bool
+ operator==(const T rhs)
+ {
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_cntMutex);
+ return m_cnt == rhs;
+ }
+
+ void
+ waitForZero(const qpid::sys::Duration& d)
+ {
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_cntMutex);
+ while (m_cnt != 0) {
+ m_cntCondition.wait(m_cntMutex, qpid::sys::AbsTime(qpid::sys::AbsTime(), d));
+ }
+ }
+
+private:
+ T m_cnt;
+ mutable qpid::sys::Mutex m_cntMutex;
+ qpid::sys::Condition m_cntCondition;
+};
+
+typedef AtomicCounter<uint32_t> AsyncOpCounter;
+
+}} // namespace qpid::asyncStore
+
+#endif // qpid_asyncStore_AtomicCounter_h_
diff --git a/cpp/src/qpid/asyncStore/TxnHandleImpl.cpp b/cpp/src/qpid/asyncStore/TxnHandleImpl.cpp
index 7ce01f881c..945b50861d 100644
--- a/cpp/src/qpid/asyncStore/TxnHandleImpl.cpp
+++ b/cpp/src/qpid/asyncStore/TxnHandleImpl.cpp
@@ -23,6 +23,8 @@
#include "TxnHandleImpl.h"
+#include "qpid/Exception.h"
+#include "qpid/broker/TxnBuffer.h"
#include "qpid/messaging/PrivateImplRef.h"
#include <uuid/uuid.h>
@@ -30,16 +32,42 @@
namespace qpid {
namespace asyncStore {
+TxnHandleImpl::TxnHandleImpl() :
+ m_tpcFlag(false),
+ m_asyncOpCnt(0UL),
+ m_txnBuffer(0)
+{
+ createLocalXid();
+}
+
+TxnHandleImpl::TxnHandleImpl(qpid::broker::TxnBuffer* tb) :
+ m_tpcFlag(false),
+ m_asyncOpCnt(0UL),
+ m_txnBuffer(tb)
+{
+ createLocalXid();
+}
+
TxnHandleImpl::TxnHandleImpl(const std::string& xid) :
m_xid(xid),
- m_tpcFlag(!xid.empty())
-{
- if (m_xid.empty()) { // create a local xid from a random uuid
- uuid_t uuid;
- ::uuid_generate_random(uuid);
- char uuidStr[37]; // 36-char uuid + trailing '\0'
- ::uuid_unparse(uuid, uuidStr);
-// m_xid.assign(uuidStr);
+ m_tpcFlag(!xid.empty()),
+ m_asyncOpCnt(0UL),
+ m_txnBuffer(0)
+{
+ if (m_xid.empty()) {
+ createLocalXid();
+ }
+}
+
+TxnHandleImpl::TxnHandleImpl(const std::string& xid,
+ qpid::broker::TxnBuffer* tb) :
+ m_xid(xid),
+ m_tpcFlag(!xid.empty()),
+ m_asyncOpCnt(0UL),
+ m_txnBuffer(tb)
+{
+ if (m_xid.empty()) {
+ createLocalXid();
}
}
@@ -58,4 +86,33 @@ TxnHandleImpl::is2pc() const
return m_tpcFlag;
}
+void
+TxnHandleImpl::incrOpCnt()
+{
+ ++m_asyncOpCnt;
+}
+
+void
+TxnHandleImpl::decrOpCnt()
+{
+ if (m_asyncOpCnt == 0UL) {
+ throw qpid::Exception("Transaction async operation count underflow");
+ }
+ if (--m_asyncOpCnt == 0UL && m_txnBuffer) {
+ m_txnBuffer->asyncLocalCommit();
+ }
+}
+
+// private
+void
+TxnHandleImpl::createLocalXid()
+{
+ uuid_t uuid;
+ ::uuid_generate_random(uuid);
+ char uuidStr[37]; // 36-char uuid + trailing '\0'
+ ::uuid_unparse(uuid, uuidStr);
+ m_xid.assign(uuidStr);
+//std::cout << "TTT TxnHandleImpl::createLocalXid(): Local XID created: \"" << m_xid << "\"" << std::endl << std::flush;
+}
+
}} // namespace qpid::asyncStore
diff --git a/cpp/src/qpid/asyncStore/TxnHandleImpl.h b/cpp/src/qpid/asyncStore/TxnHandleImpl.h
index b28eb0cd4b..e357791508 100644
--- a/cpp/src/qpid/asyncStore/TxnHandleImpl.h
+++ b/cpp/src/qpid/asyncStore/TxnHandleImpl.h
@@ -24,23 +24,45 @@
#ifndef qpid_asyncStore_TxnHandleImpl_h_
#define qpid_asyncStore_TxnHandleImpl_h_
+#include "AtomicCounter.h"
+
#include "qpid/RefCounted.h"
+#include <stdint.h> // uint32_t
#include <string>
namespace qpid {
+
+namespace broker {
+class TxnBuffer;
+}
+
namespace asyncStore {
class TxnHandleImpl : public virtual qpid::RefCounted
{
public:
- TxnHandleImpl(const std::string& xid = std::string());
+ TxnHandleImpl();
+ TxnHandleImpl(qpid::broker::TxnBuffer* tb);
+ TxnHandleImpl(const std::string& xid);
+ TxnHandleImpl(const std::string& xid, qpid::broker::TxnBuffer* tb);
virtual ~TxnHandleImpl();
const std::string& getXid() const;
bool is2pc() const;
+
+ void submitPrepare();
+ void submitCommit();
+ void submitAbort();
+
+ void incrOpCnt();
+ void decrOpCnt();
private:
std::string m_xid;
bool m_tpcFlag;
+ AsyncOpCounter m_asyncOpCnt;
+ qpid::broker::TxnBuffer* const m_txnBuffer;
+
+ void createLocalXid();
};
}} // namespace qpid::asyncStore
diff --git a/cpp/src/qpid/broker/AsyncResultQueueImpl.cpp b/cpp/src/qpid/broker/AsyncResultQueueImpl.cpp
index ab391146f9..9e3298bb4e 100644
--- a/cpp/src/qpid/broker/AsyncResultQueueImpl.cpp
+++ b/cpp/src/qpid/broker/AsyncResultQueueImpl.cpp
@@ -49,7 +49,6 @@ AsyncResultQueueImpl::submit(boost::shared_ptr<AsyncResultHandle> arh)
AsyncResultQueueImpl::ResultQueue::Batch::const_iterator
AsyncResultQueueImpl::handle(const ResultQueue::Batch& e)
{
-
for (ResultQueue::Batch::const_iterator i = e.begin(); i != e.end(); ++i) {
//std::cout << "<== AsyncResultQueueImpl::handle() errNo=" << (*i)->getErrNo() << " errMsg=\"" << (*i)->getErrMsg() << "\"" << std::endl << std::flush;
if ((*i)->isValid()) {
diff --git a/cpp/src/qpid/broker/AsyncStore.h b/cpp/src/qpid/broker/AsyncStore.h
index 7e2ee81620..2f73ec8f3e 100644
--- a/cpp/src/qpid/broker/AsyncStore.h
+++ b/cpp/src/qpid/broker/AsyncStore.h
@@ -37,7 +37,7 @@ class AsyncResultHandle;
// Broker to subclass as a pollable queue
class AsyncResultQueue {
public:
- virtual ~AsyncResultQueue();
+ virtual ~AsyncResultQueue() {}
// TODO: Remove boost::shared_ptr<> from this interface
virtual void submit(boost::shared_ptr<AsyncResultHandle>) = 0;
};
@@ -45,14 +45,14 @@ public:
// Subclass this for specific contexts
class BrokerAsyncContext {
public:
- virtual ~BrokerAsyncContext();
+ virtual ~BrokerAsyncContext() {}
virtual AsyncResultQueue* getAsyncResultQueue() const = 0;
virtual void invokeCallback(const AsyncResultHandle* const) const = 0;
};
class DataSource {
public:
- virtual ~DataSource();
+ virtual ~DataSource() {}
virtual uint64_t getSize() = 0;
virtual void write(char* target) = 0;
};
@@ -65,13 +65,28 @@ class EnqueueHandle;
class EventHandle;
class MessageHandle;
class QueueHandle;
+class TxnBuffer;
class TxnHandle;
+class AsyncTransactionalStore {
+public:
+ virtual ~AsyncTransactionalStore() {}
+
+ virtual TxnHandle createTxnHandle() = 0;
+ virtual TxnHandle createTxnHandle(TxnBuffer* tb) = 0;
+ virtual TxnHandle createTxnHandle(const std::string& xid) = 0;
+ virtual TxnHandle createTxnHandle(const std::string& xid, TxnBuffer* tb) = 0;
+
+ // TODO: Remove boost::shared_ptr<> from this interface
+ virtual void submitPrepare(TxnHandle&, boost::shared_ptr<BrokerAsyncContext>) = 0; // Distributed txns only
+ virtual void submitCommit(TxnHandle&, boost::shared_ptr<BrokerAsyncContext>) = 0;
+ virtual void submitAbort(TxnHandle&, boost::shared_ptr<BrokerAsyncContext>) = 0;
+};
// Subclassed by store:
-class AsyncStore {
+class AsyncStore : public AsyncTransactionalStore {
public:
- virtual ~AsyncStore();
+ virtual ~AsyncStore() {}
// --- Factory methods for creating handles ---
@@ -80,16 +95,11 @@ public:
virtual EventHandle createEventHandle(QueueHandle&, const std::string& key=std::string()) = 0;
virtual MessageHandle createMessageHandle(const DataSource* const) = 0;
virtual QueueHandle createQueueHandle(const std::string& name, const qpid::types::Variant::Map& opts) = 0;
- virtual TxnHandle createTxnHandle(const std::string& xid=std::string()) = 0; // Distr. txns must supply xid
// --- Store async interface ---
// TODO: Remove boost::shared_ptr<> from this interface
- virtual void submitPrepare(TxnHandle&, boost::shared_ptr<BrokerAsyncContext>) = 0; // Distributed txns only
- virtual void submitCommit(TxnHandle&, boost::shared_ptr<BrokerAsyncContext>) = 0;
- virtual void submitAbort(TxnHandle&, boost::shared_ptr<BrokerAsyncContext>) = 0;
-
virtual void submitCreate(ConfigHandle&, const DataSource* const, boost::shared_ptr<BrokerAsyncContext>) = 0;
virtual void submitDestroy(ConfigHandle&, boost::shared_ptr<BrokerAsyncContext>) = 0;
diff --git a/cpp/src/qpid/broker/TxnAsyncContext.cpp b/cpp/src/qpid/broker/TxnAsyncContext.cpp
new file mode 100644
index 0000000000..e8abe99dab
--- /dev/null
+++ b/cpp/src/qpid/broker/TxnAsyncContext.cpp
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file TransactionAsyncContext.cpp
+ */
+
+#include "TxnAsyncContext.h"
+
+#include <cassert>
+
+namespace qpid {
+namespace broker {
+
+TxnAsyncContext::TxnAsyncContext(TxnBuffer* const tb,
+ TxnHandle& th,
+ const qpid::asyncStore::AsyncOperation::opCode op,
+ qpid::broker::AsyncResultCallback rcb,
+ qpid::broker::AsyncResultQueue* const arq):
+ m_tb(tb),
+ m_th(th),
+ m_op(op),
+ m_rcb(rcb),
+ m_arq(arq)
+{
+ assert(m_th.isValid());
+}
+
+TxnAsyncContext::~TxnAsyncContext()
+{}
+
+TxnBuffer*
+TxnAsyncContext::getTxnBuffer() const
+{
+ return m_tb;
+}
+
+qpid::asyncStore::AsyncOperation::opCode
+TxnAsyncContext::getOpCode() const
+{
+ return m_op;
+}
+
+const char*
+TxnAsyncContext::getOpStr() const
+{
+ return qpid::asyncStore::AsyncOperation::getOpStr(m_op);
+}
+
+TxnHandle
+TxnAsyncContext::getTransactionContext() const
+{
+ return m_th;
+}
+
+AsyncResultQueue*
+TxnAsyncContext::getAsyncResultQueue() const
+{
+ return m_arq;
+}
+
+void
+TxnAsyncContext::invokeCallback(const AsyncResultHandle* const arh) const
+{
+ if (m_rcb) {
+ m_rcb(arh);
+ }
+}
+
+}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/TxnAsyncContext.h b/cpp/src/qpid/broker/TxnAsyncContext.h
new file mode 100644
index 0000000000..9bdd8f7188
--- /dev/null
+++ b/cpp/src/qpid/broker/TxnAsyncContext.h
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file TxnAsyncContext.h
+ */
+
+#ifndef qpid_broker_TxnAsyncContext_h_
+#define qpid_broker_TxnAsyncContext_h_
+
+#include "AsyncStore.h" // qpid::broker::BrokerAsyncContext
+#include "TxnHandle.h"
+
+#include "qpid/asyncStore/AsyncOperation.h"
+
+#include <boost/shared_ptr.hpp>
+
+namespace qpid {
+namespace broker {
+
+class TxnAsyncContext: public BrokerAsyncContext
+{
+public:
+ TxnAsyncContext(TxnBuffer* const tb,
+ TxnHandle& th,
+ const qpid::asyncStore::AsyncOperation::opCode op,
+ qpid::broker::AsyncResultCallback rcb,
+ qpid::broker::AsyncResultQueue* const arq);
+ virtual ~TxnAsyncContext();
+ TxnBuffer* getTxnBuffer() const;
+ qpid::asyncStore::AsyncOperation::opCode getOpCode() const;
+ const char* getOpStr() const;
+ TxnHandle getTransactionContext() const;
+
+ // --- Interface BrokerAsyncContext ---
+ AsyncResultQueue* getAsyncResultQueue() const;
+ void invokeCallback(const AsyncResultHandle* const) const;
+
+private:
+ TxnBuffer* const m_tb;
+ TxnHandle m_th;
+ const qpid::asyncStore::AsyncOperation::opCode m_op;
+ AsyncResultCallback m_rcb;
+ AsyncResultQueue* const m_arq;
+};
+
+}} // namespace qpid::broker
+
+#endif // qpid_broker_TxnAsyncContext_h_
diff --git a/cpp/src/qpid/broker/TxnBuffer.cpp b/cpp/src/qpid/broker/TxnBuffer.cpp
new file mode 100644
index 0000000000..2af8bd4da3
--- /dev/null
+++ b/cpp/src/qpid/broker/TxnBuffer.cpp
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file TxnBuffer.cpp
+ */
+
+#include "TxnBuffer.h"
+
+#include "AsyncResultHandle.h"
+#include "AsyncStore.h"
+#include "TxnAsyncContext.h"
+#include "TxnOp.h"
+
+#include "qpid/Exception.h"
+
+#include <boost/shared_ptr.hpp>
+
+namespace qpid {
+namespace broker {
+
+TxnBuffer::TxnBuffer(AsyncResultQueue& arq) :
+ m_store(0),
+ m_resultQueue(arq),
+ m_state(NONE)
+{}
+
+TxnBuffer::~TxnBuffer()
+{}
+
+void
+TxnBuffer::enlist(boost::shared_ptr<TxnOp> op)
+{
+//std::cout << "TTT TxnBuffer::enlist" << std::endl << std::flush;
+ m_ops.push_back(op);
+}
+
+bool
+TxnBuffer::prepare(TxnHandle& th)
+{
+//std::cout << "TTT TxnBuffer::prepare" << std::endl << std::flush;
+ for(std::vector<boost::shared_ptr<TxnOp> >::iterator i = m_ops.begin(); i != m_ops.end(); ++i) {
+ if (!(*i)->prepare(th)) {
+ return false;
+ }
+ }
+ return true;
+}
+
+void
+TxnBuffer::commit()
+{
+//std::cout << "TTT TxnBuffer::commit" << std::endl << std::flush;
+ for(std::vector<boost::shared_ptr<TxnOp> >::iterator i = m_ops.begin(); i != m_ops.end(); ++i) {
+ (*i)->commit();
+ }
+ m_ops.clear();
+}
+
+void
+TxnBuffer::rollback()
+{
+//std::cout << "TTT TxnBuffer::rollback" << std::endl << std::flush;
+ for(std::vector<boost::shared_ptr<TxnOp> >::iterator i = m_ops.begin(); i != m_ops.end(); ++i) {
+ (*i)->rollback();
+ }
+ m_ops.clear();
+}
+
+bool
+TxnBuffer::commitLocal(AsyncTransactionalStore* const store)
+{
+//std::cout << "TTT TxnBuffer::commitLocal" << std::endl << std::flush;
+ if (store) {
+ try {
+ m_store = store;
+ asyncLocalCommit();
+ } catch (std::exception& e) {
+ std::cerr << "Commit failed: " << e.what() << std::endl;
+ } catch (...) {
+ std::cerr << "Commit failed (unknown exception)" << std::endl;
+ }
+ }
+ return false;
+}
+
+// static
+void
+TxnBuffer::handleAsyncResult(const AsyncResultHandle* const arh)
+{
+ if (arh) {
+ boost::shared_ptr<TxnAsyncContext> tac = boost::dynamic_pointer_cast<TxnAsyncContext>(arh->getBrokerAsyncContext());
+ if (arh->getErrNo()) {
+ tac->getTxnBuffer()->asyncLocalAbort();
+ std::cerr << "Transaction xid=\"" << tac->getTransactionContext().getXid() << "\": Operation " << tac->getOpStr() << ": failure "
+ << arh->getErrNo() << " (" << arh->getErrMsg() << ")" << std::endl;
+ tac->getTxnBuffer()->asyncLocalAbort();
+ } else {
+//std::cout << "TTT TxnBuffer::handleAsyncResult() op=" << tac->getOpStr() << std::endl << std::flush;
+ if (tac->getOpCode() == qpid::asyncStore::AsyncOperation::TXN_ABORT) {
+ tac->getTxnBuffer()->asyncLocalAbort();
+ } else {
+ tac->getTxnBuffer()->asyncLocalCommit();
+ }
+ }
+ }
+}
+
+void
+TxnBuffer::asyncLocalCommit()
+{
+ assert(m_store != 0);
+ switch(m_state) {
+ case NONE:
+//std::cout << "TTT TxnBuffer::asyncLocalCommit: NONE->PREPARE" << std::endl << std::flush;
+ m_state = PREPARE;
+ m_txnHandle = m_store->createTxnHandle(this);
+ prepare(m_txnHandle);
+ break;
+ case PREPARE:
+//std::cout << "TTT TxnBuffer::asyncLocalCommit: PREPARE->COMMIT" << std::endl << std::flush;
+ m_state = COMMIT;
+ {
+ boost::shared_ptr<TxnAsyncContext> tac(new TxnAsyncContext(this,
+ m_txnHandle,
+ qpid::asyncStore::AsyncOperation::TXN_COMMIT,
+ &handleAsyncResult,
+ &m_resultQueue));
+ m_store->submitCommit(m_txnHandle, tac);
+ }
+ break;
+ case COMMIT:
+//std::cout << "TTT TxnBuffer:asyncLocalCommit: COMMIT->COMPLETE" << std::endl << std::flush;
+ commit();
+ m_state = COMPLETE;
+ //delete this; // TODO: ugly! Find a better way to handle the life cycle of this class
+ break;
+ default: ;
+//std::cout << "TTT TxnBuffer:asyncLocalCommit: Unexpected state " << m_state << std::endl << std::flush;
+ }
+}
+
+void
+TxnBuffer::asyncLocalAbort()
+{
+ assert(m_store != 0);
+ switch (m_state) {
+ case NONE:
+ case PREPARE:
+ case COMMIT:
+//std::cout << "TTT TxnBuffer::asyncRollback: xxx->ROLLBACK" << std::endl << std::flush;
+ m_state = ROLLBACK;
+ {
+ boost::shared_ptr<TxnAsyncContext> tac(new TxnAsyncContext(this,
+ m_txnHandle,
+ qpid::asyncStore::AsyncOperation::TXN_ABORT,
+ &handleAsyncResult,
+ &m_resultQueue));
+ m_store->submitCommit(m_txnHandle, tac);
+ }
+ break;
+ case ROLLBACK:
+//std::cout << "TTT TxnBuffer:asyncRollback: ROLLBACK->COMPLETE" << std::endl << std::flush;
+ rollback();
+ m_state = COMPLETE;
+ //delete this; // TODO: ugly! Find a better way to handle the life cycle of this class
+ default: ;
+//std::cout << "TTT TxnBuffer:asyncRollback: Unexpected state " << m_state << std::endl << std::flush;
+ }
+}
+
+}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/TxnBuffer.h b/cpp/src/qpid/broker/TxnBuffer.h
new file mode 100644
index 0000000000..308a91aa03
--- /dev/null
+++ b/cpp/src/qpid/broker/TxnBuffer.h
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file TxnBuffer.h
+ */
+
+#ifndef qpid_broker_TxnBuffer_h_
+#define qpid_broker_TxnBuffer_h_
+
+#include "TxnHandle.h"
+
+//#include <boost/enable_shared_from_this.hpp>
+#include <boost/shared_ptr.hpp>
+#include <vector>
+
+namespace qpid {
+namespace broker {
+
+class AsyncResultHandle;
+class AsyncResultQueue;
+class AsyncTransactionalStore;
+class TxnOp;
+
+class TxnBuffer /*: public boost::enable_shared_from_this<TxnBuffer>*/ {
+public:
+ TxnBuffer(AsyncResultQueue& arq);
+ virtual ~TxnBuffer();
+
+ void enlist(boost::shared_ptr<TxnOp> op);
+ bool prepare(TxnHandle& th);
+ void commit();
+ void rollback();
+ bool commitLocal(AsyncTransactionalStore* const store);
+
+ // --- Async operations ---
+ static void handleAsyncResult(const AsyncResultHandle* const arh);
+ void asyncLocalCommit();
+ void asyncLocalAbort();
+
+private:
+ std::vector<boost::shared_ptr<TxnOp> > m_ops;
+ TxnHandle m_txnHandle;
+ AsyncTransactionalStore* m_store;
+ AsyncResultQueue& m_resultQueue;
+
+ typedef enum {NONE = 0, PREPARE, COMMIT, ROLLBACK, COMPLETE} e_txnState;
+ e_txnState m_state;
+};
+
+}} // namespace qpid::broker
+
+#endif // qpid_broker_TxnBuffer_h_
diff --git a/cpp/src/qpid/broker/TxnHandle.cpp b/cpp/src/qpid/broker/TxnHandle.cpp
index 1e6cd2ac6f..07d46b4235 100644
--- a/cpp/src/qpid/broker/TxnHandle.cpp
+++ b/cpp/src/qpid/broker/TxnHandle.cpp
@@ -69,4 +69,16 @@ TxnHandle::is2pc() const
return impl->is2pc();
}
+void
+TxnHandle::incrOpCnt()
+{
+ impl->incrOpCnt();
+}
+
+void
+TxnHandle::decrOpCnt()
+{
+ impl->decrOpCnt();
+}
+
}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/TxnHandle.h b/cpp/src/qpid/broker/TxnHandle.h
index 34a9c69434..8bed14e16a 100644
--- a/cpp/src/qpid/broker/TxnHandle.h
+++ b/cpp/src/qpid/broker/TxnHandle.h
@@ -44,6 +44,8 @@ public:
// TxnHandleImpl methods
const std::string& getXid() const;
bool is2pc() const;
+ void incrOpCnt();
+ void decrOpCnt();
private:
friend class qpid::messaging::PrivateImplRef<TxnHandle>;
diff --git a/cpp/src/qpid/broker/AsyncStore.cpp b/cpp/src/qpid/broker/TxnOp.h
index 10cb3d27cf..e98429535e 100644
--- a/cpp/src/qpid/broker/AsyncStore.cpp
+++ b/cpp/src/qpid/broker/TxnOp.h
@@ -17,21 +17,24 @@
* under the License.
*/
-#include "AsyncStore.h"
+/**
+ * \file TxnOp.h
+ */
+
+#ifndef qpid_broker_TxnOp_h_
+#define qpid_broker_TxnOp_h_
namespace qpid {
namespace broker {
-AsyncResultQueue::~AsyncResultQueue()
-{}
-
-BrokerAsyncContext::~BrokerAsyncContext()
-{}
-
-DataSource::~DataSource()
-{}
-
-AsyncStore::~AsyncStore()
-{}
+class TxnOp{
+public:
+ virtual ~TxnOp() {}
+ virtual bool prepare(TxnHandle& th) throw() = 0;
+ virtual void commit() throw() = 0;
+ virtual void rollback() throw() = 0;
+};
}} // namespace qpid::broker
+
+#endif // qpid_broker_TxnOp_h_